[kepler-dev] Threaded actors in the DE Domain

Edward A. Lee eal at eecs.berkeley.edu
Wed Sep 12 16:55:47 PDT 2007


I see... Yes, as you discovered, this code won't work.
There are other problems lurking besides the exception you saw...

I would suggest that your thread should call fireAt() inside the
body of a change request.  Something like:

   ChangeRequest request = new ChangeRequest(suitable args) {
      protected void _execute() {
         get all the time information you need
         fireAt(...);
      }
   }
   requestChange(request);

The _execute() method will be evaluated by the DEDirector at a
point where current time is stable, it's not in the middle of doing
causality analysis (the cause for the error you saw), and no
actor is in the middle of firing.

The reason the error went away when you changed the FSM actor
to not allow modifications is simply that you removed a very
costly computation that the DEDirector was doing on each FSM
transition (you reported this problem before, and it's on my
"to do" list). However, all you did was reduce the size of
the target for concurrency errors. You didn't eliminate the
concurrency errors...

Edward


At 09:25 AM 9/12/2007, ian.brown at hsbcib.com wrote:
>Edward, the code for the actor was at the bottom of the original mail - I
>have chopped out the exception report and left it at the bottom of this
>mail so it should be easier to see.
>
>The actor is based on the Datagram Reader actor - but it reads from a
>connected TCP/IP socket rather than from a UDP port.
>There is only 1 DEDirector in the model and 2 of my source actors. This
>means that the DEDirector.fireAt() method will be called from more than one
>thread (in the same manner as the Datagram reader). Given that events will
>arrive asynchronously, this would appear the only way to do it. I could
>synchronise the calls to fireAt() but I would need to know what to
>synchronise it against...
>
>I'll take a look at the RealTimeComposte now...
>
>Thanks,
>
>Ian
>
>
>
>                                                                           
>             "Edward A. Lee"                                               
>             <eal at eecs.berkele                                             
>             y.edu>                                                     To 
>                                       ian.brown at hsbcib.com                
>             12/09/2007 17:00                                           cc 
>             Mail Size: 92009          kepler-dev at ecoinformatics.org       
>                                                                   Subject 
>                                       Re: [kepler-dev] Threaded actors in 
>                                       the DE Domain                       
>                                                                    Entity 
>                                       Investment Banking Europe - IBEU    
>                                                                           
>                                                                           
>                                                                           
>                                                                           
>                                                                           
>                                                                           
>
>
>
>
>
>Ian,
>
>I'm not sure what you mean by "it fires an event".
>This is fairly central... To get this exception, it would seem
>that you have to be executing DEDirector methods in two separate
>threads, which almost certainly won't work.  Can you explain
>exactly what you mean by this phrase?
>
>You may want to take a look at a "work in progress" that I've
>been working on:  ptolemy.actor.lib.hoc.RealTimeComposite.
>This actor runs its contained composite actor in a separate thread
>from the container that fires this actor.  It is meant for use
>in DE.
>
>Edward
>
>
>At 05:43 AM 9/11/2007, ian.brown at hsbcib.com wrote:
>>
>>My actor source class is implemented as follows:
>>
>>
>>/* An actor that asynchronously reads currency pair updates.
>> */
>>
>>package com.hsbc.IMTActors;
>>
>>import java.io.IOException;
>>import java.io.InputStream;
>>import java.io.InputStreamReader;
>>import java.net.Socket;
>>
>>import ptolemy.actor.TypedAtomicActor;
>>import ptolemy.actor.TypedIOPort;
>>import ptolemy.data.ArrayToken;
>>import ptolemy.data.BooleanToken;
>>import ptolemy.data.DoubleToken;
>>import ptolemy.data.IntToken;
>>import ptolemy.data.StringToken;
>>import ptolemy.data.Token;
>>import ptolemy.data.UnsignedByteToken;
>>import ptolemy.data.expr.Parameter;
>>import ptolemy.data.type.ArrayType;
>>import ptolemy.data.type.BaseType;
>>import ptolemy.kernel.CompositeEntity;
>>import ptolemy.kernel.util.Attribute;
>>import ptolemy.kernel.util.IllegalActionException;
>>import ptolemy.kernel.util.InternalErrorException;
>>import ptolemy.kernel.util.NameDuplicationException;
>>import ptolemy.domains.de.kernel.DEDirector;
>>
>>//////////////////////////////////////////////////////////////////////////
>>
>>/**
>>    an actor to subscribe to the IMT data feed and provide top-of-book
>>    price data for a single currency-pair instrument.
>>      @author Ian Brown
>> */
>>public class CurrencyPairListener extends TypedAtomicActor {
>>    /** Construct an actor with the given container and name.
>>     *  @param container The container.
>>     *  @param name The name of this actor.
>>     *  @exception IllegalActionException If the actor cannot be contained
>>     *   by the proposed container.
>>     *  @exception NameDuplicationException If the container already has
>an
>>     *   actor with this name.
>>     */
>>    public CurrencyPairListener(CompositeEntity container, String name)
>>            throws NameDuplicationException, IllegalActionException {
>>        super(container, name);
>>
>>        // ports - Ordering here sets the order they show up in Vergil
>>
>>        bid = new TypedIOPort(this, "bid");
>>        bid.setTypeEquals(BaseType.DOUBLE);
>>        bid.setOutput(true);
>>
>>        ask = new TypedIOPort(this, "ask");
>>        ask.setTypeEquals(BaseType.DOUBLE);
>>        ask.setOutput(true);
>>
>>        // parameters - Ordering here sets the order they show up in
>Vergil
>>        portNumberParam = new Parameter(this, "portNumber");
>>        portNumberParam.setTypeEquals(BaseType.INT);
>>        portNumberParam.setToken(new IntToken(12345));
>>
>>        serverAddressParam = new Parameter(this, "serverAddress");
>>        serverAddressParam.setTypeEquals(BaseType.STRING);
>>        serverAddressParam.setToken(new StringToken("128.8.120.18"));
>>
>>        instrumentParam = new Parameter(this, "Instrument");
>>        instrumentParam.setTypeEquals(BaseType.STRING);
>>        instrumentParam.setToken(new StringToken("EURGBP"));
>>
>>    }
>>
>>    ///////////////////////////////////////////////////////////////////
>>    ////                     ports and parameters                  ////
>>    // Ports and parameters are in the same order hare as in the
>>constructor.
>>
>>    /** This port outputs the latest bid price. */
>>    public TypedIOPort bid;
>>
>>    /** This port outputs the latest ask price. */
>>    public TypedIOPort ask;
>>
>>    /** The port number to connect to. */
>>    public Parameter portNumberParam;
>>
>>    /** The IP address to connect to. */
>>    public Parameter serverAddressParam;
>>
>>      /** The instrument we're watching. */
>>      public Parameter instrumentParam;
>>
>>    ///////////////////////////////////////////////////////////////////
>>    ////                         public methods                    ////
>>
>>      /**
>>     *  @exception IllegalActionException If the data cannot be
>>     *  converted into a token of the same type as the configured type
>>     *  of the output port.
>>     */
>>    public void fire() throws IllegalActionException {
>>        super.fire();
>>
>>        // The part dependent on the packet's contents must be
>synchronized
>>        // to ensure that the thread does not mess with it while it is in
>>use
>>        // here.
>>        synchronized (_syncFireAndThread) {
>>                  if (_cur_bid > 0.){
>>                        _bidToken = new DoubleToken(_cur_bid);
>>                  } else {
>>                        _bidToken = null;
>>                  }
>>                  if (_cur_ask > 0.){
>>                        _askToken = new DoubleToken(_cur_ask);
>>                  } else {
>>                        _askToken = null;
>>                  }
>>            _syncFireAndThread.notifyAll();
>>
>>
>>                  if (_bidToken != null) bid.broadcast(_bidToken);
>>                  if (_askToken != null) ask.broadcast(_askToken);
>>            } // sync
>>    }
>>
>>    /** Initialize this actor, including the creation of an evaluation
>>     *  variable for the Ptolemy parser, a DatagramSocket for
>>     *  receiving datagrams, and a SocketReadingThread for blocking in
>>     *  the DatagramSocket.receive() method call.  This method is used
>>     *  as a bookend with wrapup() being the other end.  Resources
>>     *  created/allocated here are released in wrapup().
>>     *  @exception IllegalActionException If the
>>     *  <i>localSocketNumber</i> parameter has a value outside 0..65535
>>     * or a socket could not be created.  */
>>    public void initialize() throws IllegalActionException {
>>        super.initialize();
>>
>>        _cur_bid = -1.0; _cur_ask = -1.0;
>>
>>        int portNumber = ((IntToken)
>>(portNumberParam.getToken())).intValue();
>>
>>        if ((portNumber < 0) || (portNumber > 65535)) {
>>            throw new IllegalActionException(this, portNumberParam
>>                    + " is outside the required 0..65535 range");
>>        }
>>
>>        String addr =
>>((StringToken)(serverAddressParam.getToken())).stringValue();
>>
>>        if (_debugging) {
>>            _debug(this + "portNumber = " + portNumber + ", Address = " +
>>addr);
>>        }
>>
>>
>>        // Allocate a new socket.
>>        try {
>>            if (_debugging) {
>>                _debug("Trying to create a new socket on port "
>>                        + portNumber);
>>            }
>>
>>            _socket = new Socket(addr, portNumber);
>>            if (_debugging) {
>>                _debug("Socket created successfully!");
>>            }
>>        } catch (Exception ex) {
>>            throw new IllegalActionException(this, ex,
>>                    "Failed to create a new socket on port " +
>portNumber);
>>        }
>>
>>        // Allocate & start a thread to read from the socket.
>>        _socketReadingThread = new SocketReadingThread();
>>        _socketReadingThread.start();
>>    }
>>
>>    /** Override the setContainer() method to call wrapup() if the
>>     *  actor is deleted while the model is running.  Wrapup() then
>>     *  releases resources acquired by initialize().
>>     */
>>    public void setContainer(CompositeEntity container)
>>            throws IllegalActionException, NameDuplicationException {
>>        // FIXME: Is this necessary?
>>        if (container != getContainer()) {
>>            wrapup();
>>        }
>>        super.setContainer(container);
>>    }
>>
>>    /** Release resources acquired in the initialize() method.
>>     */
>>    public void wrapup() throws IllegalActionException {
>>        if (_socketReadingThread != null) {
>>            _socketReadingThread.interrupt();
>>            _socketReadingThread = null;
>>        } else {
>>            if (_debugging) {
>>                _debug("socketReadingThread null at wrapup!?");
>>            }
>>        }
>>
>>        if (_socket != null) {
>>            try {
>>                _socket.close();
>>                _socket = null;
>>            } catch (IOException ex) {
>>                throw new IllegalActionException(this, ex, "Failed to
>close
>>socket");
>>            }
>>        } else {
>>            if (_debugging) {
>>                _debug("Socket null at wrapup!?");
>>            }
>>        }
>>    }
>>
>>    ///////////////////////////////////////////////////////////////////
>>    ////                         private variables                 ////
>>    // Synchronization objects.  Used for synchronization between the
>>fire() call and the reading thread.
>>    private Object _syncFireAndThread = new Object();
>>
>>      // The socket we are reading from
>>    private Socket _socket;
>>
>>    private SocketReadingThread _socketReadingThread;
>>
>>    private DoubleToken _bidToken;
>>    private DoubleToken _askToken;
>>
>>    private double _cur_bid = -1;
>>    private double _cur_ask = -1;
>>
>>    ///////////////////////////////////////////////////////////////////
>>    ////                        private inner class                ////
>>    private class SocketReadingThread extends Thread {
>>        /** Constructor.  Create a new thread to listen for packets
>>         *  at the socket opened by the actor's [pre]initialize method.
>>         */
>>        public SocketReadingThread() {
>>        }
>>
>>        /** Run.  Run the thread.  This begins running when .start()
>>         *  is called on the thread.
>>         */
>>        public void run() {
>>                  String instrument;
>>                  try {
>>                        instrument =
>>((StringToken)(instrumentParam.getToken())).stringValue();
>>              } catch (Exception ex) {
>>                  throw new RuntimeException("failed to get the currency
>>instrument", ex);
>>              }
>>                  InputStream _socketStream;
>>                  try {
>>                        _socketStream = _socket.getInputStream();
>>            } catch (IOException ex) {
>>                        if (_debugging){
>>                              _debug("IOException: " + ex);
>>                        }
>>                        return;
>>                  }
>>
>>                  // Attempt to receive something from the socket!
>>                  boolean receiving = true;
>>
>>                  StringBuffer sb = new StringBuffer(64);
>>                  try {
>>                        while (true) {
>>                              // NOTE: The following call may block.
>>                              int c = _socketStream.read();
>>//                                  if (_debugging){
>>//                                        _debug(Integer.toString(c));
>>//                                  }
>>                              if (c > 0x0A){
>>                                    sb.append((char)c);
>>                              } else if (0 == c){
>>                                    // A packet was successfully received!
>>                                    if (_debugging){
>>                                          _debug(sb.toString());
>>                                    }
>>                                    if (processInput(sb.toString(),
>>instrument)){
>>                                          try {
>>                                                // fireAtCurrentTime has
>>the result that all events are called at model
>>                                                // time zero with an
>>incrementing microstep. Whilst this will work for most
>>                                                // scenarios, it has the
>>effect that the timed plotter just displays a vertical
>>                                                // line. For that reason,
>>we need to figure out the correct time to fire the event at.
>>
>>//getDirector().fireAtCurrentTime(CurrencyPairListener.this);
>>                                                //
>>                                                DEDirector director =
>>(DEDirector)(getDirector());
>>                                                long start_time =
>>director.getRealStartTimeMillis();
>>                                                long time_now =
>>java.lang.System.currentTimeMillis();
>>                                                double model_time =
>>(time_now - start_time) / 1000.0;
>>
>>director.fireAt(CurrencyPairListener.this, new
>>ptolemy.actor.util.Time(director, model_time));
>>                                          } catch (IllegalActionException
>>ex) {
>>                                                throw new
>>RuntimeException("fireAtCurrentTime() "
>>                                                            + "threw an
>>exception", ex);
>>                                          }
>>                                    }
>>                                    sb.setLength(0);
>>                              }
>>                        }
>>                  } catch (IOException ex) {
>>                        if (_debugging){
>>                              _debug("IOException: " + ex);
>>                        }
>>                  } catch (NullPointerException ex) {
>>                        if (_debugging) {
>>                              _debug("--!!--" + (_socket == null));
>>                        }
>>                  }
>>        }
>>
>>            // process the raw input from the TCP/IP stream
>>            // @return true if the price has changed
>>            //
>>            private boolean processInput(String in_string, String
>>instrument){
>>                  boolean retval = false;
>>                  String[] s = in_string.split(":");
>>                  if (s.length == 5 && s[0].equals("CY")){
>>                        if (s[1].equals(instrument)){
>>                              try {
>>                                    Double new_val = new Double(s[3]);
>>
>>                                  synchronized (_syncFireAndThread) {
>>                                          // set the output value
>>                                          if(s[2].equals("S")){
>>                                                _cur_ask =
>>new_val.doubleValue();
>>                                          } else {
>>                                                _cur_bid =
>>new_val.doubleValue();
>>                                          }
>>                                          retval = true;
>>                                    }
>>                              } catch (NumberFormatException nfe){
>>                                  if (_debugging) {
>>                              _debug(s[3] + " is not a valid price");
>>                          }
>>                              }
>>                        }
>>                  } else {
>>                        if (_debugging){
>>                              _debug("length = " + s.length + " s[0] = " +
>>s[0]);
>>                        }
>>                  }
>>                  return retval;
>>            }
>>    }
>>}
>>
>>
>>
>>Thanks,
>>
>>Ian
>>
>>
>>************************************************************
>>HSBC Bank plc may be solicited in the course of its placement efforts for
>a
>>new issue, by investment clients of the firm for whom the Bank as a firm
>>already provides other services. It may equally decide to allocate to its
>>own proprietary book or with an associate of HSBC Group. This represents a
>>potential conflict of interest. HSBC Bank plc has internal arrangements
>>designed to ensure that the firm would give unbiased and full advice to
>the
>>corporate finance client about the valuation and pricing of the offering
>as
>>well as internal systems, controls and procedures to identify and manage
>>conflicts of interest.
>>
>>HSBC Bank plc
>>Registered Office: 8 Canada Square, London E14 5HQ, United Kingdom
>>Registered in England - Number 14259
>>Authorised and regulated by the Financial Services Authority.
>>************************************************************
>>
>>-----------------------------------------
>>SAVE PAPER - THINK BEFORE YOU PRINT!
>>
>>This transmission has been issued by a member of the HSBC Group
>>"HSBC" for the information of the addressee only and should not be
>>reproduced and/or distributed to any other person. Each page
>>attached hereto must be read in conjunction with any disclaimer
>>which forms part of it. Unless otherwise stated, this transmission
>>is neither an offer nor the solicitation of an offer to sell or
>>purchase any investment. Its contents are based on information
>>obtained from sources believed to be reliable but HSBC makes no
>>representation and accepts no responsibility or liability as to its
>>completeness or accuracy.
>>_______________________________________________
>>Kepler-dev mailing list
>>Kepler-dev at ecoinformatics.org
>>http://mercury.nceas.ucsb.edu/ecoinformatics/mailman/listinfo/kepler-dev
>
>------------
>Edward A. Lee
>Chair of EECS and Robert S. Pepper Distinguished Professor
>231 Cory Hall, UC Berkeley, Berkeley, CA 94720-1770
>phone: 510-642-0253, fax: 510-642-2845
>eal at eecs.Berkeley.EDU, http://ptolemy.eecs.berkeley.edu/~eal
>
>
>
>
>************************************************************
>HSBC Bank plc may be solicited in the course of its placement efforts for a
>new issue, by investment clients of the firm for whom the Bank as a firm
>already provides other services. It may equally decide to allocate to its
>own proprietary book or with an associate of HSBC Group. This represents a
>potential conflict of interest. HSBC Bank plc has internal arrangements
>designed to ensure that the firm would give unbiased and full advice to the
>corporate finance client about the valuation and pricing of the offering as
>well as internal systems, controls and procedures to identify and manage
>conflicts of interest.
>
>HSBC Bank plc
>Registered Office: 8 Canada Square, London E14 5HQ, United Kingdom
>Registered in England - Number 14259
>Authorised and regulated by the Financial Services Authority.
>************************************************************

------------ 
Edward A. Lee
Chair of EECS and Robert S. Pepper Distinguished Professor
231 Cory Hall, UC Berkeley, Berkeley, CA 94720-1770
phone: 510-642-0253, fax: 510-642-2845
eal at eecs.Berkeley.EDU, http://ptolemy.eecs.berkeley.edu/~eal  



More information about the Kepler-dev mailing list