[kepler-dev] Threaded actors in the DE Domain

Christopher Brooks cxh at eecs.berkeley.edu
Tue Sep 11 13:45:47 PDT 2007

Hi Ian,

Are you running from the latest Ptolemy II sources?
In December, 2006, there were changes that helped address this issue.

An internal report for a project said:
>  * We addressed concurrency errors in the graph package.
>  The graph package was not designed for a multithreaded environment.
>  A simple test that executes multiple models resulted in
>  ConcurrentModificationException was created by Raymond Cardillo
>  and Brian Hudson.  We added a modified version of that test to
>  our nightly build, where the test fails under Solaris with two
>  processors, but passes under Windows with one processor.
>  Our workaround is to make all public methods in TypeLattice
>  synchronized to the class.  This workaround could result in
>  slowness.



    Hi, I have an source actor which uses a thread to read from a TCP/IP
    socket. When data is received, it fires an event. The overall model
    consists of two of these sources which are connected to a finite state
    machine. This FSM is a modal model although none of the states actually
    have any refinements.
    The problem we are having is that occasionally we will get a graph
    construction exception. I think the problem is that when a state transition
    occurs the DAG is rebuilt. During this process a fire event can happen and
    you end up with 2 threads both trying to rebuild the DAG at the same time.
    This is a bit of a guess - but it certainly appears to be what is
    How do I synchronise the fireAt() call so that this does not happen? I seem
    to remember reading something somewhere about thread restrictions in
    fireAt() but cannot find the reference any more. Can anyone help?
    The exception I am getting is as follows:
    ptolemy.graph.GraphConstructionException: Attempt to add a node that is
    already contained in the graph.
    Dumps of the offending node and graph follow.
    The offending node:
    The offending graph:
    Node Set:
    0: ptolemy.actor.TypedIOPort
    1: ptolemy.actor.TypedIOPort
    2: ptolemy.actor.TypedIOPort

    227: (ptolemy.actor.TypedIOPort
    ptolemy.actor.TypedIOPort {.masterModel6ib.orderManager.Display.input})
    228: (ptolemy.actor.TypedIOPort
          at ptolemy.graph.Graph.addNode(Graph.java:308)
          at ptolemy.graph.Graph.addNodes(Graph.java:356)
          at ptolemy.graph.Graph.addGraph(Graph.java:284)
          at ptolemy.domains.de.kernel.DEReceiver.put(DEReceiver.java:150)
          at ptolemy.actor.AbstractReceiver.putToAll(AbstractReceiver.java:315)
          at ptolemy.actor.IOPort.broadcast(IOPort.java:279)
          at ptolemy.actor.TypedIOPort.broadcast(TypedIOPort.java:236)
          at ptolemy.domains.de.kernel.DEDirector.fire(DEDirector.java:505)
          at ptolemy.actor.CompositeActor.fire(CompositeActor.java:370)
          at ptolemy.actor.Manager.iterate(Manager.java:681)
          at ptolemy.actor.Manager.execute(Manager.java:331)
          at ptolemy.actor.Manager.run(Manager.java:1064)
          at ptolemy.actor.Manager$3.run(Manager.java:1105)
    My actor source class is implemented as follows:
    /* An actor that asynchronously reads currency pair updates.
    package com.hsbc.IMTActors;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.Socket;
    import ptolemy.actor.TypedAtomicActor;
    import ptolemy.actor.TypedIOPort;
    import ptolemy.data.ArrayToken;
    import ptolemy.data.BooleanToken;
    import ptolemy.data.DoubleToken;
    import ptolemy.data.IntToken;
    import ptolemy.data.StringToken;
    import ptolemy.data.Token;
    import ptolemy.data.UnsignedByteToken;
    import ptolemy.data.expr.Parameter;
    import ptolemy.data.type.ArrayType;
    import ptolemy.data.type.BaseType;
    import ptolemy.kernel.CompositeEntity;
    import ptolemy.kernel.util.Attribute;
    import ptolemy.kernel.util.IllegalActionException;
    import ptolemy.kernel.util.InternalErrorException;
    import ptolemy.kernel.util.NameDuplicationException;
    import ptolemy.domains.de.kernel.DEDirector;
        an actor to subscribe to the IMT data feed and provide top-of-book
        price data for a single currency-pair instrument.
          @author Ian Brown
    public class CurrencyPairListener extends TypedAtomicActor {
        /** Construct an actor with the given container and name.
         *  @param container The container.
         *  @param name The name of this actor.
         *  @exception IllegalActionException If the actor cannot be contained
         *   by the proposed container.
         *  @exception NameDuplicationException If the container already has an
         *   actor with this name.
        public CurrencyPairListener(CompositeEntity container, String name)
                throws NameDuplicationException, IllegalActionException {
            super(container, name);
            // ports - Ordering here sets the order they show up in Vergil
            bid = new TypedIOPort(this, "bid");
            ask = new TypedIOPort(this, "ask");
            // parameters - Ordering here sets the order they show up in Vergil
            portNumberParam = new Parameter(this, "portNumber");
            portNumberParam.setToken(new IntToken(12345));
            serverAddressParam = new Parameter(this, "serverAddress");
            serverAddressParam.setToken(new StringToken(""));
            instrumentParam = new Parameter(this, "Instrument");
            instrumentParam.setToken(new StringToken("EURGBP"));
        ////                     ports and parameters                  ////
        // Ports and parameters are in the same order hare as in the
        /** This port outputs the latest bid price. */
        public TypedIOPort bid;
        /** This port outputs the latest ask price. */
        public TypedIOPort ask;
        /** The port number to connect to. */
        public Parameter portNumberParam;
        /** The IP address to connect to. */
        public Parameter serverAddressParam;
          /** The instrument we're watching. */
          public Parameter instrumentParam;
        ////                         public methods                    ////
         *  @exception IllegalActionException If the data cannot be
         *  converted into a token of the same type as the configured type
         *  of the output port.
        public void fire() throws IllegalActionException {
            // The part dependent on the packet's contents must be synchronized
            // to ensure that the thread does not mess with it while it is in
            // here.
            synchronized (_syncFireAndThread) {
                      if (_cur_bid > 0.){
                            _bidToken = new DoubleToken(_cur_bid);
                      } else {
                            _bidToken = null;
                      if (_cur_ask > 0.){
                            _askToken = new DoubleToken(_cur_ask);
                      } else {
                            _askToken = null;
                      if (_bidToken != null) bid.broadcast(_bidToken);
                      if (_askToken != null) ask.broadcast(_askToken);
                } // sync
        /** Initialize this actor, including the creation of an evaluation
         *  variable for the Ptolemy parser, a DatagramSocket for
         *  receiving datagrams, and a SocketReadingThread for blocking in
         *  the DatagramSocket.receive() method call.  This method is used
         *  as a bookend with wrapup() being the other end.  Resources
         *  created/allocated here are released in wrapup().
         *  @exception IllegalActionException If the
         *  <i>localSocketNumber</i> parameter has a value outside 0..65535
         * or a socket could not be created.  */
        public void initialize() throws IllegalActionException {
            _cur_bid = -1.0; _cur_ask = -1.0;
            int portNumber = ((IntToken)
            if ((portNumber < 0) || (portNumber > 65535)) {
                throw new IllegalActionException(this, portNumberParam
                        + " is outside the required 0..65535 range");
            String addr =
            if (_debugging) {
                _debug(this + "portNumber = " + portNumber + ", Address = " +
            // Allocate a new socket.
            try {
                if (_debugging) {
                    _debug("Trying to create a new socket on port "
                            + portNumber);
                _socket = new Socket(addr, portNumber);
                if (_debugging) {
                    _debug("Socket created successfully!");
            } catch (Exception ex) {
                throw new IllegalActionException(this, ex,
                        "Failed to create a new socket on port " + portNumber);
            // Allocate & start a thread to read from the socket.
            _socketReadingThread = new SocketReadingThread();
        /** Override the setContainer() method to call wrapup() if the
         *  actor is deleted while the model is running.  Wrapup() then
         *  releases resources acquired by initialize().
        public void setContainer(CompositeEntity container)
                throws IllegalActionException, NameDuplicationException {
            // FIXME: Is this necessary?
            if (container != getContainer()) {
        /** Release resources acquired in the initialize() method.
        public void wrapup() throws IllegalActionException {
            if (_socketReadingThread != null) {
                _socketReadingThread = null;
            } else {
                if (_debugging) {
                    _debug("socketReadingThread null at wrapup!?");
            if (_socket != null) {
                try {
                    _socket = null;
                } catch (IOException ex) {
                    throw new IllegalActionException(this, ex, "Failed to close
            } else {
                if (_debugging) {
                    _debug("Socket null at wrapup!?");
        ////                         private variables                 ////
        // Synchronization objects.  Used for synchronization between the
    fire() call and the reading thread.
        private Object _syncFireAndThread = new Object();
          // The socket we are reading from
        private Socket _socket;
        private SocketReadingThread _socketReadingThread;
        private DoubleToken _bidToken;
        private DoubleToken _askToken;
        private double _cur_bid = -1;
        private double _cur_ask = -1;
        ////                        private inner class                ////
        private class SocketReadingThread extends Thread {
            /** Constructor.  Create a new thread to listen for packets
             *  at the socket opened by the actor's [pre]initialize method.
            public SocketReadingThread() {
            /** Run.  Run the thread.  This begins running when .start()
             *  is called on the thread.
            public void run() {
                      String instrument;
                      try {
                            instrument =
                  } catch (Exception ex) {
                      throw new RuntimeException("failed to get the currency
    instrument", ex);
                      InputStream _socketStream;
                      try {
                            _socketStream = _socket.getInputStream();
                } catch (IOException ex) {
                            if (_debugging){
                                  _debug("IOException: " + ex);
                      // Attempt to receive something from the socket!
                      boolean receiving = true;
                      StringBuffer sb = new StringBuffer(64);
                      try {
                            while (true) {
                                  // NOTE: The following call may block.
                                  int c = _socketStream.read();
    //                                  if (_debugging){
    //                                        _debug(Integer.toString(c));
    //                                  }
                                  if (c > 0x0A){
                                  } else if (0 == c){
                                        // A packet was successfully received!
                                        if (_debugging){
                                        if (processInput(sb.toString(),
                                              try {
                                                    // fireAtCurrentTime has
    the result that all events are called at model
                                                    // time zero with an
    incrementing microstep. Whilst this will work for most
                                                    // scenarios, it has the
    effect that the timed plotter just displays a vertical
                                                    // line. For that reason,
    we need to figure out the correct time to fire the event at.
                                                    DEDirector director =
                                                    long start_time =
                                                    long time_now =
                                                    double model_time =
    (time_now - start_time) / 1000.0;
    director.fireAt(CurrencyPairListener.this, new
    ptolemy.actor.util.Time(director, model_time));
                                              } catch (IllegalActionException
    ex) {
                                                    throw new
    RuntimeException("fireAtCurrentTime() "
                                                                + "threw an
    exception", ex);
                      } catch (IOException ex) {
                            if (_debugging){
                                  _debug("IOException: " + ex);
                      } catch (NullPointerException ex) {
                            if (_debugging) {
                                  _debug("--!!--" + (_socket == null));
                // process the raw input from the TCP/IP stream
                // @return true if the price has changed
                private boolean processInput(String in_string, String
                      boolean retval = false;
                      String[] s = in_string.split(":");
                      if (s.length == 5 && s[0].equals("CY")){
                            if (s[1].equals(instrument)){
                                  try {
                                        Double new_val = new Double(s[3]);
                                      synchronized (_syncFireAndThread) {
                                              // set the output value
                                                    _cur_ask =
                                              } else {
                                                    _cur_bid =
                                              retval = true;
                                  } catch (NumberFormatException nfe){
                                      if (_debugging) {
                                  _debug(s[3] + " is not a valid price");
                      } else {
                            if (_debugging){
                                  _debug("length = " + s.length + " s[0] = " +
                      return retval;

More information about the Kepler-dev mailing list