[kepler-dev] Threaded actors in the DE Domain

Christopher Brooks cxh at eecs.berkeley.edu
Tue Sep 11 13:45:47 PDT 2007


Hi Ian,

Are you running from the latest Ptolemy II sources?
In December, 2006, there were changes that helped address this issue.

An internal report for a project said:
>  * We addressed concurrency errors in the graph package.
>  The graph package was not designed for a multithreaded environment.
>  A simple test that executes multiple models resulted in
>  ConcurrentModificationException was created by Raymond Cardillo
>  and Brian Hudson.  We added a modified version of that test to
>  our nightly build, where the test fails under Solaris with two
>  processors, but passes under Windows with one processor.
>
>  Our workaround is to make all public methods in TypeLattice
>  synchronized to the class.  This workaround could result in
>  slowness.

_Christopher

--------

    Hi, I have an source actor which uses a thread to read from a TCP/IP
    socket. When data is received, it fires an event. The overall model
    consists of two of these sources which are connected to a finite state
    machine. This FSM is a modal model although none of the states actually
    have any refinements.
    
    The problem we are having is that occasionally we will get a graph
    construction exception. I think the problem is that when a state transition
    occurs the DAG is rebuilt. During this process a fire event can happen and
    you end up with 2 threads both trying to rebuild the DAG at the same time.
    This is a bit of a guess - but it certainly appears to be what is
    happening.
    How do I synchronise the fireAt() call so that this does not happen? I seem
    to remember reading something somewhere about thread restrictions in
    fireAt() but cannot find the reference any more. Can anyone help?
    
    The exception I am getting is as follows:
    
    ptolemy.graph.GraphConstructionException: Attempt to add a node that is
    already contained in the graph.
    Dumps of the offending node and graph follow.
    The offending node:
    ptolemy.actor.TypedIOPort
    {.masterModel6ib.meanReversionParent.meanRevision.ArrayVariance.in}
    The offending graph:
    {ptolemy.graph.DirectedGraph
    Node Set:
    0: ptolemy.actor.TypedIOPort
    {.masterModel6ib.meanReversionParent.meanRevision2.positionTrigger2.trigger
   }
    1: ptolemy.actor.TypedIOPort
    {.masterModel6ib.meanReversionParent.meanRevision2.positionTrigger2.output}
    2: ptolemy.actor.TypedIOPort
    {.masterModel6ib.meanReversionParent.meanRevision2.orderBuilder.Const2.trig




    227: (ptolemy.actor.TypedIOPort
    {.masterModel6ib.orderManager.CompositeActor.FindOrderWS.status},
    ptolemy.actor.TypedIOPort {.masterModel6ib.orderManager.Display.input})
    228: (ptolemy.actor.TypedIOPort
    {.masterModel6ib.orderManager.CompositeActor.FindOrderWS.status},
    ptolemy.actor.TypedIOPort
    {.masterModel6ib.orderManager.CompositeActor.TicCount2.input})
    }
    
    
          at ptolemy.graph.Graph.addNode(Graph.java:308)
          at ptolemy.graph.Graph.addNodes(Graph.java:356)
          at ptolemy.graph.Graph.addGraph(Graph.java:284)
          at
    ptolemy.actor.util.FunctionDependencyOfCompositeActor._mergeActorsGraph(Fun
   ctionDependencyOfCompositeActor.java:377)
          at
    ptolemy.actor.util.FunctionDependencyOfCompositeActor._constructDetailedDep
   endencyGraph(FunctionDependencyOfCompositeActor.java:225)
          at
    ptolemy.actor.util.FunctionDependencyOfCompositeActor._constructDependencyG
   raph(FunctionDependencyOfCompositeActor.java:146)
          at
    ptolemy.actor.util.FunctionDependency._validate(FunctionDependency.java:273
   )
          at
    ptolemy.actor.util.FunctionDependencyOfCompositeActor.getCycleNodes(Functio
   nDependencyOfCompositeActor.java:116)
          at
    ptolemy.domains.de.kernel.DEDirector._constructDirectedGraph(DEDirector.jav
   a:1617)
          at
    ptolemy.domains.de.kernel.DEDirector._computePortDepth(DEDirector.java:1386
   )
          at
    ptolemy.domains.de.kernel.DEDirector._getDepthOfIOPort(DEDirector.java:1691
   )
          at
    ptolemy.domains.de.kernel.DEDirector._enqueueTriggerEvent(DEDirector.java:1
   260)
          at ptolemy.domains.de.kernel.DEReceiver.put(DEReceiver.java:150)
          at ptolemy.actor.AbstractReceiver.putToAll(AbstractReceiver.java:315)
          at ptolemy.actor.IOPort.broadcast(IOPort.java:279)
          at ptolemy.actor.TypedIOPort.broadcast(TypedIOPort.java:236)
          at
    com.hsbc.IMTActors.CurrencyPairListener.fire(CurrencyPairListener.java:122)
          at ptolemy.domains.de.kernel.DEDirector.fire(DEDirector.java:505)
          at ptolemy.actor.CompositeActor.fire(CompositeActor.java:370)
          at ptolemy.actor.Manager.iterate(Manager.java:681)
          at ptolemy.actor.Manager.execute(Manager.java:331)
          at ptolemy.actor.Manager.run(Manager.java:1064)
          at ptolemy.actor.Manager$3.run(Manager.java:1105)
    
    
    
    My actor source class is implemented as follows:
    
    
    /* An actor that asynchronously reads currency pair updates.
     */
    
    package com.hsbc.IMTActors;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.Socket;
    
    import ptolemy.actor.TypedAtomicActor;
    import ptolemy.actor.TypedIOPort;
    import ptolemy.data.ArrayToken;
    import ptolemy.data.BooleanToken;
    import ptolemy.data.DoubleToken;
    import ptolemy.data.IntToken;
    import ptolemy.data.StringToken;
    import ptolemy.data.Token;
    import ptolemy.data.UnsignedByteToken;
    import ptolemy.data.expr.Parameter;
    import ptolemy.data.type.ArrayType;
    import ptolemy.data.type.BaseType;
    import ptolemy.kernel.CompositeEntity;
    import ptolemy.kernel.util.Attribute;
    import ptolemy.kernel.util.IllegalActionException;
    import ptolemy.kernel.util.InternalErrorException;
    import ptolemy.kernel.util.NameDuplicationException;
    import ptolemy.domains.de.kernel.DEDirector;
    
    //////////////////////////////////////////////////////////////////////////
    
    /**
        an actor to subscribe to the IMT data feed and provide top-of-book
        price data for a single currency-pair instrument.
          @author Ian Brown
     */
    public class CurrencyPairListener extends TypedAtomicActor {
        /** Construct an actor with the given container and name.
         *  @param container The container.
         *  @param name The name of this actor.
         *  @exception IllegalActionException If the actor cannot be contained
         *   by the proposed container.
         *  @exception NameDuplicationException If the container already has an
         *   actor with this name.
         */
        public CurrencyPairListener(CompositeEntity container, String name)
                throws NameDuplicationException, IllegalActionException {
            super(container, name);
    
            // ports - Ordering here sets the order they show up in Vergil
    
            bid = new TypedIOPort(this, "bid");
            bid.setTypeEquals(BaseType.DOUBLE);
            bid.setOutput(true);
    
            ask = new TypedIOPort(this, "ask");
            ask.setTypeEquals(BaseType.DOUBLE);
            ask.setOutput(true);
    
            // parameters - Ordering here sets the order they show up in Vergil
            portNumberParam = new Parameter(this, "portNumber");
            portNumberParam.setTypeEquals(BaseType.INT);
            portNumberParam.setToken(new IntToken(12345));
    
            serverAddressParam = new Parameter(this, "serverAddress");
            serverAddressParam.setTypeEquals(BaseType.STRING);
            serverAddressParam.setToken(new StringToken("128.8.120.18"));
    
            instrumentParam = new Parameter(this, "Instrument");
            instrumentParam.setTypeEquals(BaseType.STRING);
            instrumentParam.setToken(new StringToken("EURGBP"));
    
        }
    
        ///////////////////////////////////////////////////////////////////
        ////                     ports and parameters                  ////
        // Ports and parameters are in the same order hare as in the
    constructor.
    
        /** This port outputs the latest bid price. */
        public TypedIOPort bid;
    
        /** This port outputs the latest ask price. */
        public TypedIOPort ask;
    
        /** The port number to connect to. */
        public Parameter portNumberParam;
    
        /** The IP address to connect to. */
        public Parameter serverAddressParam;
    
          /** The instrument we're watching. */
          public Parameter instrumentParam;
    
        ///////////////////////////////////////////////////////////////////
        ////                         public methods                    ////
    
          /**
         *  @exception IllegalActionException If the data cannot be
         *  converted into a token of the same type as the configured type
         *  of the output port.
         */
        public void fire() throws IllegalActionException {
            super.fire();
    
            // The part dependent on the packet's contents must be synchronized
            // to ensure that the thread does not mess with it while it is in
    use
            // here.
            synchronized (_syncFireAndThread) {
                      if (_cur_bid > 0.){
                            _bidToken = new DoubleToken(_cur_bid);
                      } else {
                            _bidToken = null;
                      }
                      if (_cur_ask > 0.){
                            _askToken = new DoubleToken(_cur_ask);
                      } else {
                            _askToken = null;
                      }
                _syncFireAndThread.notifyAll();
    
    
                      if (_bidToken != null) bid.broadcast(_bidToken);
                      if (_askToken != null) ask.broadcast(_askToken);
                } // sync
        }
    
        /** Initialize this actor, including the creation of an evaluation
         *  variable for the Ptolemy parser, a DatagramSocket for
         *  receiving datagrams, and a SocketReadingThread for blocking in
         *  the DatagramSocket.receive() method call.  This method is used
         *  as a bookend with wrapup() being the other end.  Resources
         *  created/allocated here are released in wrapup().
         *  @exception IllegalActionException If the
         *  <i>localSocketNumber</i> parameter has a value outside 0..65535
         * or a socket could not be created.  */
        public void initialize() throws IllegalActionException {
            super.initialize();
    
            _cur_bid = -1.0; _cur_ask = -1.0;
    
            int portNumber = ((IntToken)
    (portNumberParam.getToken())).intValue();
    
            if ((portNumber < 0) || (portNumber > 65535)) {
                throw new IllegalActionException(this, portNumberParam
                        + " is outside the required 0..65535 range");
            }
    
            String addr =
    ((StringToken)(serverAddressParam.getToken())).stringValue();
    
            if (_debugging) {
                _debug(this + "portNumber = " + portNumber + ", Address = " +
    addr);
            }
    
    
            // Allocate a new socket.
            try {
                if (_debugging) {
                    _debug("Trying to create a new socket on port "
                            + portNumber);
                }
    
                _socket = new Socket(addr, portNumber);
                if (_debugging) {
                    _debug("Socket created successfully!");
                }
            } catch (Exception ex) {
                throw new IllegalActionException(this, ex,
                        "Failed to create a new socket on port " + portNumber);
            }
    
            // Allocate & start a thread to read from the socket.
            _socketReadingThread = new SocketReadingThread();
            _socketReadingThread.start();
        }
    
        /** Override the setContainer() method to call wrapup() if the
         *  actor is deleted while the model is running.  Wrapup() then
         *  releases resources acquired by initialize().
         */
        public void setContainer(CompositeEntity container)
                throws IllegalActionException, NameDuplicationException {
            // FIXME: Is this necessary?
            if (container != getContainer()) {
                wrapup();
            }
            super.setContainer(container);
        }
    
        /** Release resources acquired in the initialize() method.
         */
        public void wrapup() throws IllegalActionException {
            if (_socketReadingThread != null) {
                _socketReadingThread.interrupt();
                _socketReadingThread = null;
            } else {
                if (_debugging) {
                    _debug("socketReadingThread null at wrapup!?");
                }
            }
    
            if (_socket != null) {
                try {
                    _socket.close();
                    _socket = null;
                } catch (IOException ex) {
                    throw new IllegalActionException(this, ex, "Failed to close
    socket");
                }
            } else {
                if (_debugging) {
                    _debug("Socket null at wrapup!?");
                }
            }
        }
    
        ///////////////////////////////////////////////////////////////////
        ////                         private variables                 ////
        // Synchronization objects.  Used for synchronization between the
    fire() call and the reading thread.
        private Object _syncFireAndThread = new Object();
    
          // The socket we are reading from
        private Socket _socket;
    
        private SocketReadingThread _socketReadingThread;
    
        private DoubleToken _bidToken;
        private DoubleToken _askToken;
    
        private double _cur_bid = -1;
        private double _cur_ask = -1;
    
        ///////////////////////////////////////////////////////////////////
        ////                        private inner class                ////
        private class SocketReadingThread extends Thread {
            /** Constructor.  Create a new thread to listen for packets
             *  at the socket opened by the actor's [pre]initialize method.
             */
            public SocketReadingThread() {
            }
    
            /** Run.  Run the thread.  This begins running when .start()
             *  is called on the thread.
             */
            public void run() {
                      String instrument;
                      try {
                            instrument =
    ((StringToken)(instrumentParam.getToken())).stringValue();
                  } catch (Exception ex) {
                      throw new RuntimeException("failed to get the currency
    instrument", ex);
                  }
                      InputStream _socketStream;
                      try {
                            _socketStream = _socket.getInputStream();
                } catch (IOException ex) {
                            if (_debugging){
                                  _debug("IOException: " + ex);
                            }
                            return;
                      }
    
                      // Attempt to receive something from the socket!
                      boolean receiving = true;
    
                      StringBuffer sb = new StringBuffer(64);
                      try {
                            while (true) {
                                  // NOTE: The following call may block.
                                  int c = _socketStream.read();
    //                                  if (_debugging){
    //                                        _debug(Integer.toString(c));
    //                                  }
                                  if (c > 0x0A){
                                        sb.append((char)c);
                                  } else if (0 == c){
                                        // A packet was successfully received!
                                        if (_debugging){
                                              _debug(sb.toString());
                                        }
                                        if (processInput(sb.toString(),
    instrument)){
                                              try {
                                                    // fireAtCurrentTime has
    the result that all events are called at model
                                                    // time zero with an
    incrementing microstep. Whilst this will work for most
                                                    // scenarios, it has the
    effect that the timed plotter just displays a vertical
                                                    // line. For that reason,
    we need to figure out the correct time to fire the event at.
    
    //getDirector().fireAtCurrentTime(CurrencyPairListener.this);
                                                    //
                                                    DEDirector director =
    (DEDirector)(getDirector());
                                                    long start_time =
    director.getRealStartTimeMillis();
                                                    long time_now =
    java.lang.System.currentTimeMillis();
                                                    double model_time =
    (time_now - start_time) / 1000.0;
    
    director.fireAt(CurrencyPairListener.this, new
    ptolemy.actor.util.Time(director, model_time));
                                              } catch (IllegalActionException
    ex) {
                                                    throw new
    RuntimeException("fireAtCurrentTime() "
                                                                + "threw an
    exception", ex);
                                              }
                                        }
                                        sb.setLength(0);
                                  }
                            }
                      } catch (IOException ex) {
                            if (_debugging){
                                  _debug("IOException: " + ex);
                            }
                      } catch (NullPointerException ex) {
                            if (_debugging) {
                                  _debug("--!!--" + (_socket == null));
                            }
                      }
            }
    
                // process the raw input from the TCP/IP stream
                // @return true if the price has changed
                //
                private boolean processInput(String in_string, String
    instrument){
                      boolean retval = false;
                      String[] s = in_string.split(":");
                      if (s.length == 5 && s[0].equals("CY")){
                            if (s[1].equals(instrument)){
                                  try {
                                        Double new_val = new Double(s[3]);
    
                                      synchronized (_syncFireAndThread) {
                                              // set the output value
                                              if(s[2].equals("S")){
                                                    _cur_ask =
    new_val.doubleValue();
                                              } else {
                                                    _cur_bid =
    new_val.doubleValue();
                                              }
                                              retval = true;
                                        }
                                  } catch (NumberFormatException nfe){
                                      if (_debugging) {
                                  _debug(s[3] + " is not a valid price");
                              }
                                  }
                            }
                      } else {
                            if (_debugging){
                                  _debug("length = " + s.length + " s[0] = " +
    s[0]);
                            }
                      }
                      return retval;
                }
        }
    }
    
    
    
    Thanks,
    
    Ian
    
--------


More information about the Kepler-dev mailing list