[kepler-code] r28740 - trunk/modules/actors/src/org/kepler/data/netcdf
crawl at ecoinformatics.org
crawl at ecoinformatics.org
Mon Oct 3 11:43:25 PDT 2011
Author: crawl
Date: 2011-10-03 11:43:24 -0700 (Mon, 03 Oct 2011)
New Revision: 28740
Added:
trunk/modules/actors/src/org/kepler/data/netcdf/NetCDFReader.java
Log:
an actor to read netcdf files
Added: trunk/modules/actors/src/org/kepler/data/netcdf/NetCDFReader.java
===================================================================
--- trunk/modules/actors/src/org/kepler/data/netcdf/NetCDFReader.java (rev 0)
+++ trunk/modules/actors/src/org/kepler/data/netcdf/NetCDFReader.java 2011-10-03 18:43:24 UTC (rev 28740)
@@ -0,0 +1,670 @@
+/* An actor that reads NetCDF files.
+ *
+ * Copyright (c) 2011 The Regents of the University of California.
+ * All rights reserved.
+ *
+ * '$Author$'
+ * '$Date$'
+ * '$Revision$'
+ *
+ * Permission is hereby granted, without written agreement and without
+ * license or royalty fees, to use, copy, modify, and distribute this
+ * software and its documentation for any purpose, provided that the above
+ * copyright notice and the following two paragraphs appear in all copies
+ * of this software.
+ *
+ * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
+ * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
+ * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
+ * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
+ * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
+ * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
+ * ENHANCEMENTS, OR MODIFICATIONS.
+ *
+ */
+package org.kepler.data.netcdf;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import ptolemy.actor.TypedIOPort;
+import ptolemy.actor.lib.LimitedFiringSource;
+import ptolemy.actor.parameters.FilePortParameter;
+import ptolemy.actor.parameters.PortParameter;
+import ptolemy.data.ArrayToken;
+import ptolemy.data.BooleanToken;
+import ptolemy.data.DoubleMatrixToken;
+import ptolemy.data.DoubleToken;
+import ptolemy.data.FloatToken;
+import ptolemy.data.IntMatrixToken;
+import ptolemy.data.IntToken;
+import ptolemy.data.LongToken;
+import ptolemy.data.ShortToken;
+import ptolemy.data.StringToken;
+import ptolemy.data.Token;
+import ptolemy.data.type.ArrayType;
+import ptolemy.data.type.BaseType;
+import ptolemy.data.type.Type;
+import ptolemy.kernel.CompositeEntity;
+import ptolemy.kernel.util.Attribute;
+import ptolemy.kernel.util.IllegalActionException;
+import ptolemy.kernel.util.NameDuplicationException;
+import ptolemy.kernel.util.Workspace;
+import ptolemy.math.DoubleMatrixMath;
+import ptolemy.math.IntegerMatrixMath;
+import ucar.ma2.Array;
+import ucar.ma2.DataType;
+import ucar.ma2.InvalidRangeException;
+import ucar.ma2.Range;
+import ucar.ma2.Section;
+import ucar.nc2.NetcdfFile;
+import ucar.nc2.Variable;
+
+/**
+ * An actor that reads NetCDF files.
+ *
+ *
+ * @author Daniel Crawl
+ * @version $Id$
+ *
+ * TODO
+ *
+ * create kar file
+ * test reading hdf5, other types
+ *
+ */
+public class NetCDFReader extends LimitedFiringSource {
+
+ public NetCDFReader(CompositeEntity container, String name)
+ throws NameDuplicationException, IllegalActionException {
+ super(container, name);
+
+ filename = new FilePortParameter(this, "filename");
+
+ constraint = new PortParameter(this, "constraint");
+ constraint.setStringMode(true);
+ constraint.getPort().setTypeEquals(BaseType.STRING);
+
+ // hide output port name
+ new Attribute(output, "_hide");
+ }
+
+ /** React to a change in an attribute. */
+ public void attributeChanged(Attribute attribute) throws IllegalActionException
+ {
+ if(attribute == filename)
+ {
+ _updateFileName();
+ _updateOutputPorts();
+ }
+ else if(attribute == constraint)
+ {
+ _parseConstraintExpression();
+ _updateOutputPorts();
+ }
+ else
+ {
+ super.attributeChanged(attribute);
+ }
+ }
+
+ /** Clone this actor into the specified workspace.
+ * @param workspace The workspace for the cloned object.
+ * @exception CloneNotSupportedException If cloned ports cannot have
+ * as their container the cloned entity (this should not occur), or
+ * if one of the attributes cannot be cloned.
+ * @return A new NetCDFReader.
+ */
+ public Object clone(Workspace workspace) throws CloneNotSupportedException
+ {
+ NetCDFReader newObject = (NetCDFReader) super.clone(workspace);
+ newObject._constraint = null;
+ newObject._constraintMap = new HashMap<String,Section>();
+ newObject._filenameStr = null;
+ newObject._ncFile = null;
+ return newObject;
+ }
+
+ public void fire() throws IllegalActionException
+ {
+ // read tokens in port parameters
+ constraint.update();
+ filename.update();
+
+ // output the variables for each connected output port.
+ for(Object object : outputPortList())
+ {
+ TypedIOPort port = (TypedIOPort)object;
+ if(port.numberOfSinks() > 0)
+ {
+ _outputData(port);
+ }
+ }
+ }
+
+ public void initialize() throws IllegalActionException
+ {
+ super.initialize();
+ _amFiring = true;
+
+ if(_ncFile == null) {
+ _openFile();
+ }
+ }
+
+ /** Close the NetCDF file. */
+ public void wrapup() throws IllegalActionException
+ {
+ _amFiring = false;
+ _closeFile();
+ super.wrapup();
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ //// public fields ////
+
+ /** NetCDF file name. */
+ public FilePortParameter filename;
+
+ /** Space-separated list of variables and constraints.
+ * constraints are start:end:stride
+ */
+ public PortParameter constraint;
+
+ ///////////////////////////////////////////////////////////////////
+ //// private methods ////
+
+ /** Close the NetCDF file. */
+ private void _closeFile() throws IllegalActionException
+ {
+ if(_ncFile != null) {
+ try {
+ _ncFile.close();
+ _ncFile = null;
+ } catch (IOException e) {
+ throw new IllegalActionException(this, e, "Error closing " + _filenameStr);
+ }
+ }
+
+ }
+
+ /** Open the NetCDF file. */
+ private void _openFile() throws IllegalActionException
+ {
+ if(_filenameStr != null && !_filenameStr.isEmpty())
+ {
+ try {
+ _ncFile = NetcdfFile.open(_filenameStr);
+ } catch (IOException e) {
+ throw new IllegalActionException(this, e, "Error opening " + _filenameStr);
+ }
+ }
+ }
+
+ /** Read a token from the file name port parameter. */
+ private void _updateFileName() throws IllegalActionException
+ {
+ Token token = filename.getToken();
+ if(token != null)
+ {
+ String fileStr = ((StringToken)token).stringValue();
+ if(_filenameStr == null || !_filenameStr.equals(fileStr))
+ {
+ _filenameStr = fileStr;
+
+ // if the old file is open, close and open the new one
+ if(_ncFile != null) {
+ _closeFile();
+ }
+
+ if(_ncFile == null) {
+ _openFile();
+ }
+ }
+ }
+ }
+
+ private void _updateOutputPorts() throws IllegalActionException
+ {
+ if(!_amFiring && _filenameStr != null && !_filenameStr.isEmpty() && _constraint != null)
+ {
+
+ if(_ncFile == null) {
+ _openFile();
+ }
+
+ Set<String> variableNames = new HashSet<String>();
+ List<Variable> variables = null;
+
+ // see if there are constraints
+ if(_constraintMap.size() > 0)
+ {
+ // add all the variables in the constraints
+ variableNames.addAll(_constraintMap.keySet());
+ variables = new LinkedList<Variable>();
+ for(String name : variableNames)
+ {
+ Variable variable = _ncFile.findVariable(name);
+ if(variable == null)
+ {
+ throw new IllegalActionException(this, "Variable " + name + " is in constraint " +
+ "expression, but not found in file " + _filenameStr);
+ }
+ variables.add(variable);
+ }
+ }
+ else
+ {
+ // add all the variables in the file
+ variables = _ncFile.getVariables();
+ for(Variable variable : variables)
+ {
+ variableNames.add(variable.getFullName());
+ }
+ }
+
+ // add ports and set types
+ for(Variable variable : variables)
+ {
+ String name = variable.getFullName();
+
+ // see if we need to add the port
+ TypedIOPort port = (TypedIOPort) getPort(name);
+ if(port == null)
+ {
+ try
+ {
+ port = new TypedIOPort(this, name, false, true);
+ }
+ catch (NameDuplicationException e)
+ {
+ throw new IllegalActionException(this, e, "Error creating port " + name);
+ }
+ }
+ // see if variable is named "output". we already have port called output,
+ // so unhide it if it is hidden.
+ else if(name.equals("output"))
+ {
+ Attribute attribute = port.getAttribute("_hide");
+ if(attribute != null)
+ {
+ try
+ {
+ attribute.setContainer(null);
+ }
+ catch (NameDuplicationException e)
+ {
+ throw new IllegalActionException(this, e, "Unable to show output port.");
+ }
+ }
+ }
+
+ Type oldType = port.getType();
+
+ // set the port type based on the netcdf type and decimation
+ Type newType = _getTokenTypeForVariable(variable);
+
+ if(!oldType.equals(newType))
+ {
+ port.setTypeEquals(newType);
+ //System.out.println("setting type for " + name);
+ }
+ }
+
+ // delete ports for non-existing variables
+ for(Object obj : outputPortList())
+ {
+ TypedIOPort port = (TypedIOPort) obj;
+ String portName = port.getName();
+ if(!variableNames.contains(portName))
+ {
+ // can't delete output port since belongs to parent class
+ if(portName.equals("output"))
+ {
+ if(port.getAttribute("_hide") == null)
+ {
+ // hide output port
+ try
+ {
+ new Attribute(port, "_hide");
+ }
+ catch (NameDuplicationException e)
+ {
+ throw new IllegalActionException(this, e, "Unable to hide output port.");
+ }
+ }
+ }
+ else
+ {
+ // remove port
+ try
+ {
+ port.setContainer(null);
+ }
+ catch (NameDuplicationException e)
+ {
+ throw new IllegalActionException(this, e, "Error deleting " + port.getName());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /** Get the number of dimensions of a variable after decimating. */
+ private int _getDimensionsRemaining(Variable variable)
+ {
+ int decimationAmount = 0;
+
+ // see if this variable was decimated in the constraint expression
+ Section section = _constraintMap.get(variable.getFullName());
+
+ if(section != null)
+ {
+ for(Range range : section.getRanges())
+ {
+ // NOTE: range can be null if decimation specified as [:]
+ if(range != null && range.length() == 1)
+ {
+ decimationAmount++;
+ }
+ }
+ }
+
+ return variable.getShape().length - decimationAmount;
+ }
+
+ private Type _getTokenTypeForVariable(Variable variable) throws IllegalActionException
+ {
+
+ Type retval = null;
+
+ if(variable.isMetadata())
+ {
+ throw new IllegalActionException(this, "variable " + variable.getFullName() + " is metadata.");
+ }
+
+ String name = variable.getFullName();
+
+ DataType dataType = variable.getDataType();
+
+ int dimensionsRemaining = _getDimensionsRemaining(variable);
+
+ //System.out.println("dim rem for " + name + " is " + dimensionsRemaining);
+
+ if(dimensionsRemaining < 0)
+ {
+ throw new IllegalActionException(this, "Variable " + name + " has " +
+ variable.getShape().length + " dimension(s), but more " +
+ " dimensions have been constrained.");
+ }
+ else if(dimensionsRemaining < 2)
+ {
+ switch(dataType)
+ {
+ case DOUBLE:
+ retval = BaseType.DOUBLE;
+ break;
+ case FLOAT:
+ retval = BaseType.FLOAT;
+ break;
+ case SHORT:
+ retval = BaseType.SHORT;
+ break;
+ case INT:
+ retval = BaseType.INT;
+ break;
+ case LONG:
+ retval = BaseType.LONG;
+ break;
+ case BOOLEAN:
+ retval = BaseType.BOOLEAN;
+ break;
+ default:
+ throw new IllegalActionException(this, "Variable " + name +
+ "has unsupported data type: " + dataType);
+ }
+
+ if(dimensionsRemaining == 1)
+ {
+ retval = new ArrayType(retval);
+ }
+ }
+ else if(dimensionsRemaining == 2)
+ {
+ switch(dataType)
+ {
+ case DOUBLE:
+ case FLOAT:
+ retval = BaseType.DOUBLE_MATRIX;
+ break;
+ case INT:
+ retval = BaseType.INT_MATRIX;
+ break;
+ case LONG:
+ retval = BaseType.LONG_MATRIX;
+ break;
+ case BOOLEAN:
+ retval = BaseType.BOOLEAN_MATRIX;
+ break;
+ default:
+ throw new IllegalActionException(this, "Unsupported matrix " +
+ "type for variable " + name + "(" + dataType + ")");
+ }
+ }
+ else if(dimensionsRemaining > 2)
+ {
+ throw new IllegalActionException(this, "Variable " + name + " has" +
+ " been decimated to have more than two dimensions, which is" +
+ " currently not supported.");
+ }
+
+ return retval;
+ }
+
+ /** Write the data from the file to an output port. */
+ private void _outputData(TypedIOPort port) throws IllegalActionException
+ {
+ Token token = null;
+
+ String name = port.getName();
+ Variable variable = _ncFile.findVariable(name);
+ if(variable == null)
+ {
+ throw new IllegalActionException("Could not find variable " + name + " in file " + _filenameStr);
+ }
+
+ int dimensionsRemaing = _getDimensionsRemaining(variable);
+
+ Array array;
+ try {
+ Section section = _constraintMap.get(name);
+ if(section != null) {
+ array = variable.read(section);
+ } else {
+ array = variable.read(null, variable.getShape());
+ }
+ } catch (Exception e) {
+ throw new IllegalActionException(this, e, "Unable to read variable " + name);
+ }
+
+ // get the shape from the read array
+ int[] readShape = array.getShape();
+
+ Object arrayStorage = array.getStorage();
+
+ DataType dataType = variable.getDataType();
+
+ if(dimensionsRemaing == 0)
+ {
+ switch(dataType)
+ {
+ case DOUBLE:
+ token = new DoubleToken(((double[])arrayStorage)[0]);
+ break;
+ case FLOAT:
+ token = new FloatToken(((float[])arrayStorage)[0]);
+ break;
+ case SHORT:
+ token = new ShortToken(((short[])arrayStorage)[0]);
+ break;
+ case INT:
+ token = new IntToken(((int[])arrayStorage)[0]);
+ break;
+ case LONG:
+ token = new LongToken(((long[])arrayStorage)[0]);
+ break;
+ case BOOLEAN:
+ token = new BooleanToken(((boolean[])arrayStorage)[0]);
+ break;
+ default:
+ throw new IllegalActionException(this, "Variable " + name +
+ " has unsupported data type: " + dataType);
+ }
+ }
+ else if(dimensionsRemaing == 1)
+ {
+ int length = java.lang.reflect.Array.getLength(arrayStorage);
+ StringBuilder arrayStr = new StringBuilder("{");
+ for(int i = 0; i < length - 1; i++)
+ {
+ arrayStr.append(java.lang.reflect.Array.get(arrayStorage, i));
+ arrayStr.append(",");
+ }
+ arrayStr.append(java.lang.reflect.Array.get(arrayStorage, length - 1));
+ arrayStr.append("}");
+ token = new ArrayToken(arrayStr.toString());
+ }
+ else // dimensionsRemaing == 2
+ {
+ switch(dataType)
+ {
+ case DOUBLE:
+
+ /*
+ double[][] data = new double[shape[0]][shape[1]];
+ for(int i = 0; i < shape[0]; i++)
+ {
+ for(int j = 0; j < shape[1]; j++)
+ {
+ data[i][j] = arrayDouble.get(i,j);
+ }
+ }
+ */
+
+ token = new DoubleMatrixToken((double[])array.getStorage(),
+ readShape[0], readShape[1]);
+
+ // XXX take the transpose to get elements in correct position
+ token = new DoubleMatrixToken(
+ DoubleMatrixMath.transpose(((DoubleMatrixToken)token).doubleMatrix()));
+
+
+ //System.out.println(getName() + ": array 5000,73 = " + arrayDouble.get(5000, 73));
+ //System.out.println("rows = " + ((DoubleMatrixToken)token).getRowCount());
+ //System.out.println("cols = " + ((DoubleMatrixToken)token).getColumnCount());
+ //System.out.println(getName() + ": token 5000,73 = " +
+ //((DoubleMatrixToken)token).getElementAt(73, 5000));
+
+
+ break;
+
+ case INT:
+
+ token = new IntMatrixToken((int[])array.getStorage(),
+ readShape[0], readShape[1]);
+
+ // XXX take the transpose to get elements in correct position
+ token = new IntMatrixToken(
+ IntegerMatrixMath.transpose(((IntMatrixToken)token).intMatrix()));
+
+ break;
+
+ default:
+ throw new IllegalActionException(this, "Variable " + name +
+ " has unsupported data type: " + dataType);
+ }
+ }
+
+ if(token != null)
+ {
+ //System.out.println(getFullName() + " output token type " + token.getType());
+ port.broadcast(token);
+ }
+ }
+
+ private void _parseConstraintExpression() throws IllegalActionException
+ {
+ String origConstraintStr = ((StringToken)constraint.getToken()).stringValue();
+ // see if constraint expression has changed
+ if(_constraint == null || !_constraint.equals(origConstraintStr))
+ {
+ _constraint = origConstraintStr.trim();
+ _constraintMap.clear();
+
+ String constraintStr = _constraint;
+ while(constraintStr.length() > 0)
+ {
+ Matcher matcher = _VARIABLE_PATTERN.matcher(constraintStr);
+ if(!matcher.matches())
+ {
+ throw new IllegalActionException(this, "Bad constraint: " + origConstraintStr);
+ }
+ String variableName = matcher.group(1);
+
+ Section section = null;
+ String sectionStr = "";
+ if(matcher.groupCount() > 1)
+ {
+ sectionStr = matcher.group(2);
+ String formattedSectionStr =
+ sectionStr.replaceAll("\\[", "").replaceAll("\\]", ",").replaceAll(",$", "");
+
+ try {
+ section = new Section(formattedSectionStr);
+ } catch (InvalidRangeException e) {
+ throw new IllegalActionException(this, e, "Invalid decimation " + sectionStr);
+ }
+ }
+
+ _constraintMap.put(variableName, section);
+
+ constraintStr = constraintStr.substring(variableName.length() + sectionStr.length());
+ }
+ }
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ //// private fields ////
+
+ /** Variable pattern: variable name optionally followed by dimension(s). */
+ private static final Pattern _VARIABLE_PATTERN =
+ Pattern.compile("(\\w+)([\\d\\:\\[\\]]*).*");
+
+ /** The name of the NetCDF file. */
+ private String _filenameStr;
+
+ /** The constraint expression. */
+ private String _constraint;
+
+ /** A map of variable name to dimension decimation. */
+ private Map<String,Section> _constraintMap = new HashMap<String,Section>();
+
+ /** NetCDF file object. */
+ private NetcdfFile _ncFile;
+
+ /** If true, workflow is executing. */
+ private boolean _amFiring;
+}
Property changes on: trunk/modules/actors/src/org/kepler/data/netcdf/NetCDFReader.java
___________________________________________________________________
Added: svn:keywords
+ Author Date Id Revision
Added: svn:eol-style
+ native
More information about the Kepler-cvs
mailing list