[kepler-code] r28738 - in trunk/modules/actors/src/org/kepler/data: . netcdf

crawl at ecoinformatics.org crawl at ecoinformatics.org
Mon Oct 3 10:59:44 PDT 2011


Author: crawl
Date: 2011-10-03 10:59:44 -0700 (Mon, 03 Oct 2011)
New Revision: 28738

Added:
   trunk/modules/actors/src/org/kepler/data/netcdf/
   trunk/modules/actors/src/org/kepler/data/netcdf/NetCDFWriter.java
Log:
an actor to write netcdf files


Added: trunk/modules/actors/src/org/kepler/data/netcdf/NetCDFWriter.java
===================================================================
--- trunk/modules/actors/src/org/kepler/data/netcdf/NetCDFWriter.java	                        (rev 0)
+++ trunk/modules/actors/src/org/kepler/data/netcdf/NetCDFWriter.java	2011-10-03 17:59:44 UTC (rev 28738)
@@ -0,0 +1,609 @@
+/* An actor that writes NetCDF files.
+ * 
+ * Copyright (c) 2011 The Regents of the University of California.
+ * All rights reserved.
+ *
+ * '$Author: crawl $'
+ * '$Date: 2011-07-05 10:53:41 -0700 (Tue, 05 Jul 2011) $' 
+ * '$Revision: 27792 $'
+ * 
+ * Permission is hereby granted, without written agreement and without
+ * license or royalty fees, to use, copy, modify, and distribute this
+ * software and its documentation for any purpose, provided that the above
+ * copyright notice and the following two paragraphs appear in all copies
+ * of this software.
+ *
+ * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
+ * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
+ * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
+ * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
+ * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
+ * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
+ * ENHANCEMENTS, OR MODIFICATIONS.
+ *
+ */
+package org.kepler.data.netcdf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import ptolemy.actor.IOPort;
+import ptolemy.actor.TypedAtomicActor;
+import ptolemy.actor.TypedIOPort;
+import ptolemy.data.BooleanToken;
+import ptolemy.data.DoubleToken;
+import ptolemy.data.FloatToken;
+import ptolemy.data.IntToken;
+import ptolemy.data.LongToken;
+import ptolemy.data.ShortToken;
+import ptolemy.data.StringToken;
+import ptolemy.data.Token;
+import ptolemy.data.expr.Parameter;
+import ptolemy.data.expr.FileParameter;
+import ptolemy.data.expr.StringParameter;
+import ptolemy.data.type.BaseType;
+import ptolemy.data.type.Type;
+import ptolemy.kernel.CompositeEntity;
+import ptolemy.kernel.Port;
+import ptolemy.kernel.util.Attribute;
+import ptolemy.kernel.util.IllegalActionException;
+import ptolemy.kernel.util.NameDuplicationException;
+import ptolemy.kernel.util.Workspace;
+import ucar.ma2.Array;
+import ucar.ma2.ArrayBoolean;
+import ucar.ma2.ArrayDouble;
+import ucar.ma2.ArrayFloat;
+import ucar.ma2.ArrayInt;
+import ucar.ma2.ArrayLong;
+import ucar.ma2.ArrayShort;
+import ucar.ma2.DataType;
+import ucar.ma2.Index;
+import ucar.nc2.Dimension;
+import ucar.nc2.NetcdfFileWriteable;
+
+/**
+ * An actor that writes NetCDF files.
+ * 
+ * 
+ * @author Daniel Crawl
+ * @version $Id$
+ * 
+ * TODO
+ * 
+ *  deletes/recreates variable port on wf load
+ *  add parameter to overwrite file
+ *  write each element to disk instead of keeping in memory until wrapup
+ *  create kar
+ */
+
+public class NetCDFWriter extends TypedAtomicActor
+{
+    /** Construct a new NetCDFWriter in a container with a name. */
+    public NetCDFWriter(CompositeEntity container, String name)
+        throws IllegalActionException, NameDuplicationException
+    {
+        super(container, name);
+
+        filename = new FileParameter(this, "filename");
+        _filenameStr = "";
+        
+        variable = new StringParameter(this, "inputVariable");
+        dimensions = new StringParameter(this, "dimensions");
+        
+        writeOnFinish = new Parameter(this, "writeOnFinish");
+        writeOnFinish.setTypeEquals(BaseType.BOOLEAN);
+        writeOnFinish.setToken(BooleanToken.FALSE);
+    }
+
+    /** React to a change in an attribute. */
+    public void attributeChanged(Attribute attribute) throws IllegalActionException
+    {
+        if(attribute == filename)
+        {
+            Token token = filename.getToken();
+            if(token != null)
+            {
+                _filenameStr = ((StringToken)token).stringValue();
+            }
+        }
+        else if(attribute == variable)
+        {
+            String varStr = variable.stringValue();
+            if(!varStr.isEmpty() && (_variableName == null || !_variableName.equals(varStr)))
+            {                
+                // add the port if not already there
+                Port port = getPort(varStr);
+                if(port == null)
+                {
+                    try
+                    {
+                        port = new TypedIOPort(this, varStr, true, false);
+                        new Attribute(port, "_showName");
+                    }
+                    catch (NameDuplicationException e)
+                    {
+                        throw new IllegalActionException(this, e, "Error adding port " + varStr);
+                    }
+                }
+                _variableName = varStr;
+                _removeOldOutputPorts();
+            }
+        }
+        else if(attribute == dimensions)
+        {
+            String dimStr = dimensions.stringValue();
+            if(!dimStr.isEmpty() && (_dimensions == null || !_dimensions.equals(dimStr)))
+            {
+                _dimensions = dimStr;                
+                _parseDimensions();
+                _removeOldOutputPorts();
+            }
+        }
+        else if(attribute == writeOnFinish)
+        {
+            boolean val = ((BooleanToken)writeOnFinish.getToken()).booleanValue();
+            if(val != _writeOnFinish)
+            {
+                _writeOnFinish = val;
+                // if turned off, reparse dimensions to make sure each dimension has a length. 
+                if(!_writeOnFinish && _dimensions != null)
+                {
+                    _parseDimensions();
+                }
+            }
+        }
+        else
+        {
+            super.attributeChanged(attribute);
+        }
+    }
+        
+    /** Clone this actor into the specified workspace.
+     *  @param workspace The workspace for the cloned object.
+     *  @exception CloneNotSupportedException If cloned ports cannot have
+     *   as their container the cloned entity (this should not occur), or
+     *   if one of the attributes cannot be cloned.
+     *  @return A new NetCDFWriter.
+     */
+    public Object clone(Workspace workspace) throws CloneNotSupportedException
+    {        
+        NetCDFWriter newObject = (NetCDFWriter) super.clone(workspace);
+        newObject._array = null;
+        newObject._datatype = null;
+        newObject._dimensionMap = new HashMap<String,Integer>();
+        newObject._dimensions = null;
+        newObject._filenameStr = null;
+        newObject._maxValue = new HashMap<String,Integer>();
+        newObject._ncFile = null;
+        newObject._savedIndexes = new LinkedList<Map<String,Integer>>();
+        newObject._savedValues = new LinkedList<Token>();
+        newObject._variableName = null;
+        newObject._writeOnFinish = false;
+        return newObject;
+    }
+
+    /** Read the value and indexes, and update the array. */
+    public void fire() throws IllegalActionException
+    {      
+        // read value and add to array
+        final IOPort valuePort = (IOPort) getPort(_variableName);
+        final Token valueToken = valuePort.get(0);
+
+        if(_writeOnFinish)
+        {
+            final Map<String,Integer> map = new HashMap<String,Integer>();
+            for(String dimName : _dimensionMap.keySet())
+            {
+                final IOPort port = (IOPort) getPort(dimName);
+                final int val = ((IntToken)port.get(0)).intValue();
+                map.put(dimName, val);
+                
+                // see if we need to update max values.
+                final Integer max = _maxValue.get(dimName);
+                if(max == null || max < val)
+                {
+                    _maxValue.put(dimName, val);
+                }
+            }
+            
+            _savedValues.add(valueToken);
+            _savedIndexes.add(map);
+            
+        }
+        else
+        {
+            
+            // read dimension indexes
+            final Index index = _array.getIndex();
+            int i = 0;
+            for(Map.Entry<String, Integer> entry : _dimensionMap.entrySet())
+            {            
+                final String dimName = entry.getKey();
+                final int length = entry.getValue();
+                
+                final IOPort port = (IOPort) getPort(dimName);
+                final int val = ((IntToken)port.get(0)).intValue();
+                if(val >= length)
+                {
+                    throw new IllegalActionException(this, "Invalid value " +
+                        val + " for dimension " + dimName +
+                        " whose length is " + length);
+                }
+                
+                index.setDim(i, val);
+                i++;
+            }
+            
+            _writeOneValue(valueToken, index);
+        }
+        
+    }
+         
+    /** Create the NetCDF file, add the header information, and initialize the array. */
+    public void initialize() throws IllegalActionException
+    {
+        super.initialize();
+        
+        _datatype = null;
+        
+        _maxValue.clear();
+        _savedValues.clear();
+        _savedIndexes.clear();
+
+        // sanity checks
+
+        if(_variableName.isEmpty())
+        {
+            throw new IllegalActionException(this, "No variable specified.");
+        }
+
+        if(_dimensionMap.isEmpty())
+        {
+            throw new IllegalActionException(this, "No dimensions specified.");
+        }
+
+        if(!_writeOnFinish)
+        {
+            _openFile();
+        }
+    }        
+
+    /** Write the array to the file and close it. */
+    public void wrapup() throws IllegalActionException
+    {
+        
+        if(_writeOnFinish)
+        {
+            _openFileAndWriteToArray();
+        }
+        
+        // write array to file.
+        if(_ncFile != null && _array != null)
+        {
+            try
+            {
+                _ncFile.write(_variableName, _array);
+            }
+            catch (Exception e)
+            {
+                throw new IllegalActionException(this, e, "Error writing array data to file.");
+            }
+        }
+        
+        _closeFile();
+        
+        super.wrapup();
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    ////                         public fields                     ////
+
+    /** The name of the NetCDF file. */
+    public FileParameter filename;
+    
+    /** The name of the variable. */
+    public StringParameter variable;
+    
+    /** A space-separated list of dimensions and their length, e.g., x[10] y[4]. */
+    public StringParameter dimensions;
+    
+    /** If true, write data to the NetCDF file when the workflow is finished.
+     *  Set this to true if the length of the dimensions are not known before
+     *  the workflow starts. (A length number is still required for each
+     *  dimension in the dimensions parameter, but the value is ignored).
+     */
+    public Parameter writeOnFinish;
+    
+    ///////////////////////////////////////////////////////////////////
+    ////                         private methods                   ////
+
+    /** Close the NetCDF file. */
+    private void _closeFile() throws IllegalActionException
+    {
+        _array = null;
+        if(_ncFile != null) {
+            try {
+                _ncFile.close();
+            } catch (IOException e) {
+                throw new IllegalActionException(this, e, "Error closing file.");
+            }
+        }
+    }
+
+    /** Close the NetCDF file ignoring any exception thrown. */
+    private void _closeFileIgnoreException()
+    {
+        try {
+            _closeFile();
+        } catch(Throwable t) {
+            System.err.println("Error closing file: " + t.getMessage());
+        }
+    }
+    
+    /** Open the NetCDF file and initialize the data array. */
+    private void _openFile() throws IllegalActionException
+    {
+        try
+        {   
+            _ncFile = NetcdfFileWriteable.createNew(_filenameStr);
+            
+            // add the dimensions
+            
+            List<Dimension> dimensions = new ArrayList<Dimension>();
+            for(Map.Entry<String, Integer> entry : _dimensionMap.entrySet())
+            {
+                dimensions.add(_ncFile.addDimension(entry.getKey(), entry.getValue()));
+            }
+            
+            // add the variable
+            
+            TypedIOPort port = (TypedIOPort) getPort(_variableName);
+            _datatype = _getNetCDFType(port.getType());
+            _ncFile.addVariable(_variableName, _datatype, dimensions);
+            
+            // create the file and leave define mode
+            _ncFile.create();
+            
+            // create the array            
+            int[] dim = new int[_dimensionMap.size()];
+            int i = 0;
+            for(Integer length : _dimensionMap.values())
+            {
+                dim[i] = length;
+                i++;
+            }
+
+            if(_datatype == DataType.DOUBLE) {
+                _array = new ArrayDouble(dim);
+            } else if(_datatype == DataType.FLOAT) {
+                _array = new ArrayFloat(dim);
+            } else if(_datatype == DataType.SHORT) {
+                _array = new ArrayShort(dim);
+            } else if(_datatype == DataType.INT) {
+                _array = new ArrayInt(dim);
+            } else if(_datatype == DataType.LONG) {
+                _array = new ArrayLong(dim);
+            } else if(_datatype == DataType.BOOLEAN) {
+                _array = new ArrayBoolean(dim);
+            }
+        }
+        catch(IOException e)
+        {
+            _closeFileIgnoreException();
+            throw new IllegalActionException(this, e, "Error creating netcdf file.");
+        }
+    }
+    
+    /** Parse the dimensions parameter and change input ports accordingly. */
+    private void _parseDimensions() throws IllegalActionException
+    {              
+       // use LinkedHashMap for predictable iteration order
+       _dimensionMap.clear();
+       
+       String[] dimArray = _dimensions.split("\\s+");
+       for(String dimStr : dimArray)
+       {
+           Matcher matcher = DIMENSION_PATTERN.matcher(dimStr);
+       
+           if(!matcher.matches())
+           {
+               throw new IllegalActionException(this, "Dimension not formatted correctly: " + dimStr);
+           }
+           
+           String name = matcher.group(1);
+           int length = Integer.valueOf(matcher.group(2));
+           
+           // length must be >= 1 unless we write the data when finishing
+           if(length < 1 && !_writeOnFinish)
+           {
+               throw new IllegalActionException(this, "Dimension " + name + " must have length >= 1.");
+           }
+           
+           _dimensionMap.put(name, length);
+           
+           // add input port if not there
+           TypedIOPort port = (TypedIOPort) getPort(name);
+           if(port == null)
+           {
+               try {
+                   port = new TypedIOPort(this, name, true, false);
+                   new Attribute(port, "_showName");
+               } catch(NameDuplicationException e) {
+                   throw new IllegalActionException(this, e, "Error creating port for dimension " + name);
+               }
+           }
+           port.setTypeEquals(BaseType.INT);
+           
+       }
+    }
+    
+    /** Remove ports that are not named after the variable or one
+     *  of the dimensions.
+     */
+    private void _removeOldOutputPorts() throws IllegalActionException
+    {
+       // remove output ports whose names are not dimensions
+       List<?> inputPorts = inputPortList();
+       for(Object object : inputPorts)
+       {
+           TypedIOPort port = (TypedIOPort)object;
+           String name = port.getName();
+           // make sure both variable and dimensions have been set before removing
+           if(_variableName != null && !name.equals(_variableName) &&
+               !_dimensionMap.isEmpty() && !_dimensionMap.containsKey(name))
+           {
+               try
+               {
+                   port.setContainer(null);
+               } 
+               catch (NameDuplicationException e)
+               {
+                   throw new IllegalActionException(this, e, "Error removing " + name);
+               }
+           }
+       }
+    }
+        
+    /** Get the corresponding NetCDF type for the Ptolemy type. */
+    private DataType _getNetCDFType(Type type) throws IllegalActionException
+    {
+        if(type == BaseType.DOUBLE) {
+            return DataType.DOUBLE;
+        } else if(type == BaseType.FLOAT) {
+            return DataType.FLOAT;
+        } else if(type == BaseType.SHORT) {
+            return DataType.SHORT;
+        } else if(type == BaseType.INT) {
+            return DataType.INT;
+        } else if(type == BaseType.LONG) {
+            return DataType.LONG;
+        } else if(type == BaseType.BOOLEAN) {
+            return DataType.BOOLEAN;
+        } else {
+            throw new IllegalActionException(this, "Unsupported conversion to NetCDF type : " + type);
+        }
+    }
+
+    /** Write data collected during fire() to the array. */
+    private void _openFileAndWriteToArray() throws IllegalActionException
+    {
+        
+        // add length for each dimension
+        Set<String> dimensionNames = new HashSet<String>(_dimensionMap.keySet());
+        for(String name : dimensionNames)
+        {
+            Integer max = _maxValue.get(name);
+            
+            if(max == null)
+            {
+                throw new IllegalActionException("Could not find values for dimension " + name);
+            }
+            
+            // add one since dimensions start at 1, not 0
+            _dimensionMap.put(name, max + 1);
+        }
+        
+        // open the file for writing
+        _openFile();
+                
+        // write each value
+        while(!_savedValues.isEmpty())
+        {
+            Token valueToken = _savedValues.remove(0);
+            Map<String,Integer> dimMap = _savedIndexes.remove(0);
+            
+            final Index index = _array.getIndex();
+            int i = 0;
+
+            for(String name : _dimensionMap.keySet())
+            {
+                final Integer dimVal = dimMap.get(name);
+                
+                if(dimVal == null)
+                {
+                    throw new IllegalActionException("No value for dimension " + name);
+                }
+                
+                index.setDim(i, dimVal);
+                i++;
+            }        
+            _writeOneValue(valueToken, index);
+        }
+    }
+    
+    /** Write a single value in a token to the array. */
+    private void _writeOneValue(Token valueToken, Index index)
+    {
+        //System.out.println(getName() + " index " + index);
+        
+        if(_datatype == DataType.DOUBLE) {
+            _array.setDouble(index, ((DoubleToken)valueToken).doubleValue());
+        } else if(_datatype == DataType.FLOAT) {
+            _array.setFloat(index, ((FloatToken)valueToken).floatValue());
+        } else if(_datatype == DataType.SHORT) {
+            _array.setShort(index, ((ShortToken)valueToken).shortValue());
+        } else if(_datatype == DataType.INT) {
+            _array.setInt(index, ((IntToken)valueToken).intValue());
+        } else if(_datatype == DataType.LONG) {
+            _array.setLong(index, ((LongToken)valueToken).longValue());
+        } else if(_datatype == DataType.BOOLEAN) {
+            _array.setBoolean(index, ((BooleanToken)valueToken).booleanValue());
+        }   
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    ////                         private fields                    ////
+
+    /** The name of the NetCDF file. */
+    private String _filenameStr;
+    
+    /** The name of the variable. */
+    private String _variableName;
+    
+    /** A space-separate list of dimensions and their lengths. */
+    private String _dimensions;
+    
+    /** A map of dimension name to its length. Use LinkedHashMap 
+     *  for predictable iteration order.
+     */
+    private Map<String,Integer> _dimensionMap = new LinkedHashMap<String,Integer>();
+
+    /** The NetCDF file object. */
+    private NetcdfFileWriteable _ncFile;
+    
+    /** The NetCDF type of the variable. */
+    private DataType _datatype;
+    
+    /** The array containing the values of the variable. */
+    private Array _array;
+
+    /** Regular expression of a dimension. */
+    private final static Pattern DIMENSION_PATTERN = Pattern.compile("(\\w+)\\[(\\d+)\\]");
+
+    /** If true, write data to the NetCDF file when the workflow is finished. */
+    private boolean _writeOnFinish;
+    
+    /** The maximum value seen for each dimension. */
+    private Map<String,Integer> _maxValue = new HashMap<String,Integer>();
+
+    /** A list of values collected during fire(). */
+    private List<Token> _savedValues = new LinkedList<Token>();
+    
+    /** A list of index names and values collected during fire(). */
+    private List<Map<String,Integer>> _savedIndexes = new LinkedList<Map<String,Integer>>();
+    
+}



More information about the Kepler-cvs mailing list