[kepler-dev] Threaded actors in the DE Domain

ian.brown@hsbcib.com ian.brown at hsbcib.com
Wed Sep 12 00:57:44 PDT 2007


Hi Christopher, I'm running with the nightly source drop from 9/9/2007 - so
that's pretty recent :)
As a hack at the moment, I've set the _mutationEnabled member in
FSMDirector to false in FSMDirector::initialize(). This is okay for our
particular model (no refinements in any states) but it is obviously a
temporary hack. It would be better to understand how I need to synchronise
the fireAt() call from the Actor.

Ian



"Christopher Brooks" <cxh at eecs.berkeley.edu>
Sent by: cxh at EECS.Berkeley.EDU
11/09/2007 21:45
To: ian.brown at hsbcib.com
cc: kepler-dev at ecoinformatics.org
Subject: Re: [kepler-dev] Threaded actors in the DE Domain




Hi Ian,

Are you running from the latest Ptolemy II sources?
In December, 2006, there were changes that helped address this issue.

An internal report for a project said:
>  * We addressed concurrency errors in the graph package.
>  The graph package was not designed for a multithreaded environment.
>  A simple test that executes multiple models and results in a
>  ConcurrentModificationException was created by Raymond Cardillo
>  and Brian Hudson.  We added a modified version of that test to
>  our nightly build, where the test fails under Solaris with two
>  processors, but passes under Windows with one processor.
>
>  Our workaround is to make all public methods in TypeLattice
>  synchronized to the class.  This workaround could result in
>  slowness.
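
For readers unfamiliar with the phrase, "synchronized to the class" can be
illustrated with a minimal Java sketch. The SharedLattice class and its
methods below are hypothetical stand-ins, not Ptolemy II's TypeLattice; the
sketch only shows the general technique of guarding shared static state
with the class-level lock, assuming that is what the report means.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a class with shared static state. */
final class SharedLattice {
    // Plain HashMap: not safe for concurrent use without external locking.
    private static final Map<String, Integer> RANKS = new HashMap<>();

    private SharedLattice() {
    }

    /** A static synchronized method locks on SharedLattice.class. */
    public static synchronized int rankOf(String typeName) {
        Integer rank = RANKS.get(typeName);
        if (rank == null) {
            rank = RANKS.size();
            RANKS.put(typeName, rank);
        }
        return rank;
    }

    /** Uses the same lock as rankOf(), so it never sees a partial update. */
    public static synchronized int size() {
        return RANKS.size();
    }

    /** Run two threads against the shared map and return its final size. */
    public static int runTwoThreads() {
        Runnable work = () -> {
            for (int i = 0; i < 1000; i++) {
                rankOf("type" + (i % 50));
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", ex);
        }
        return size();
    }

    public static void main(String[] args) {
        System.out.println("distinct names: " + runTwoThreads());
    }
}
```

Because every public method takes the same lock, callers are serialized,
which is the likely source of the slowness the report mentions.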

_Christopher

--------

    Hi, I have a source actor which uses a thread to read from a TCP/IP
    socket. When data is received, it fires an event. The overall model
    consists of two of these sources which are connected to a finite state
    machine. This FSM is a modal model although none of the states actually
    have any refinements.

    The problem we are having is that occasionally we will get a graph
    construction exception. I think the problem is that when a state
    transition occurs the DAG is rebuilt. During this process a fire event
    can happen and you end up with 2 threads both trying to rebuild the DAG
    at the same time. This is a bit of a guess - but it certainly appears to
    be what is happening.
    How do I synchronise the fireAt() call so that this does not happen? I
    seem to remember reading something somewhere about thread restrictions
    in fireAt() but cannot find the reference any more. Can anyone help?
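
The suspected race can be illustrated outside Ptolemy II with a small Java
sketch. Everything here is hypothetical (GuardedGraph, rebuild(), stress());
it is not the ptolemy.graph API. It assumes a graph that refuses duplicate
nodes, as in the exception below, and shows that putting the whole rebuild
under one lock keeps two threads from interleaving their clear and add
steps.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical graph whose rebuild is guarded by a single lock. */
final class GuardedGraph {
    private final Set<String> nodes = new HashSet<>();
    private final Object lock = new Object();

    /** Rebuild from scratch; a duplicate node means an inconsistent build. */
    void rebuild(List<String> names) {
        synchronized (lock) {
            nodes.clear();
            for (String name : names) {
                if (!nodes.add(name)) {
                    throw new IllegalStateException(
                            "node already in graph: " + name);
                }
            }
        }
    }

    int size() {
        synchronized (lock) {
            return nodes.size();
        }
    }

    /** Two threads rebuild the same graph; returns the number of failures. */
    static int stress(int rounds) {
        GuardedGraph graph = new GuardedGraph();
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            names.add("port" + i);
        }
        AtomicInteger failures = new AtomicInteger();
        Runnable work = () -> {
            for (int i = 0; i < rounds; i++) {
                try {
                    graph.rebuild(names);
                } catch (RuntimeException ex) {
                    failures.incrementAndGet();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", ex);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures: " + stress(200));
    }
}
```

Removing the synchronized block from rebuild() makes the sketch unsafe in
the same way the guess above describes, although whether a failure actually
shows up on a given run depends on thread timing.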

    The exception I am getting is as follows:

    ptolemy.graph.GraphConstructionException: Attempt to add a node that is
    already contained in the graph.
    Dumps of the offending node and graph follow.
    The offending node:
    ptolemy.actor.TypedIOPort
    {.masterModel6ib.meanReversionParent.meanRevision.ArrayVariance.in}
    The offending graph:
    {ptolemy.graph.DirectedGraph
    Node Set:
    0: ptolemy.actor.TypedIOPort
    {.masterModel6ib.meanReversionParent.meanRevision2.positionTrigger2.trigger}
    1: ptolemy.actor.TypedIOPort
    {.masterModel6ib.meanReversionParent.meanRevision2.positionTrigger2.output}
    2: ptolemy.actor.TypedIOPort
    {.masterModel6ib.meanReversionParent.meanRevision2.orderBuilder.Const2.trig




    227: (ptolemy.actor.TypedIOPort
    {.masterModel6ib.orderManager.CompositeActor.FindOrderWS.status},
    ptolemy.actor.TypedIOPort {.masterModel6ib.orderManager.Display.input})
    228: (ptolemy.actor.TypedIOPort
    {.masterModel6ib.orderManager.CompositeActor.FindOrderWS.status},
    ptolemy.actor.TypedIOPort
    {.masterModel6ib.orderManager.CompositeActor.TicCount2.input})
    }


          at ptolemy.graph.Graph.addNode(Graph.java:308)
          at ptolemy.graph.Graph.addNodes(Graph.java:356)
          at ptolemy.graph.Graph.addGraph(Graph.java:284)
          at ptolemy.actor.util.FunctionDependencyOfCompositeActor._mergeActorsGraph(FunctionDependencyOfCompositeActor.java:377)
          at ptolemy.actor.util.FunctionDependencyOfCompositeActor._constructDetailedDependencyGraph(FunctionDependencyOfCompositeActor.java:225)
          at ptolemy.actor.util.FunctionDependencyOfCompositeActor._constructDependencyGraph(FunctionDependencyOfCompositeActor.java:146)
          at ptolemy.actor.util.FunctionDependency._validate(FunctionDependency.java:273)
          at ptolemy.actor.util.FunctionDependencyOfCompositeActor.getCycleNodes(FunctionDependencyOfCompositeActor.java:116)
          at ptolemy.domains.de.kernel.DEDirector._constructDirectedGraph(DEDirector.java:1617)
          at ptolemy.domains.de.kernel.DEDirector._computePortDepth(DEDirector.java:1386)
          at ptolemy.domains.de.kernel.DEDirector._getDepthOfIOPort(DEDirector.java:1691)
          at ptolemy.domains.de.kernel.DEDirector._enqueueTriggerEvent(DEDirector.java:1260)
          at ptolemy.domains.de.kernel.DEReceiver.put(DEReceiver.java:150)
          at ptolemy.actor.AbstractReceiver.putToAll(AbstractReceiver.java:315)
          at ptolemy.actor.IOPort.broadcast(IOPort.java:279)
          at ptolemy.actor.TypedIOPort.broadcast(TypedIOPort.java:236)
          at com.hsbc.IMTActors.CurrencyPairListener.fire(CurrencyPairListener.java:122)
          at ptolemy.domains.de.kernel.DEDirector.fire(DEDirector.java:505)
          at ptolemy.actor.CompositeActor.fire(CompositeActor.java:370)
          at ptolemy.actor.Manager.iterate(Manager.java:681)
          at ptolemy.actor.Manager.execute(Manager.java:331)
          at ptolemy.actor.Manager.run(Manager.java:1064)
          at ptolemy.actor.Manager$3.run(Manager.java:1105)



    My actor source class is implemented as follows:


    /* An actor that asynchronously reads currency pair updates.
     */

    package com.hsbc.IMTActors;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.Socket;

    import ptolemy.actor.TypedAtomicActor;
    import ptolemy.actor.TypedIOPort;
    import ptolemy.data.ArrayToken;
    import ptolemy.data.BooleanToken;
    import ptolemy.data.DoubleToken;
    import ptolemy.data.IntToken;
    import ptolemy.data.StringToken;
    import ptolemy.data.Token;
    import ptolemy.data.UnsignedByteToken;
    import ptolemy.data.expr.Parameter;
    import ptolemy.data.type.ArrayType;
    import ptolemy.data.type.BaseType;
    import ptolemy.kernel.CompositeEntity;
    import ptolemy.kernel.util.Attribute;
    import ptolemy.kernel.util.IllegalActionException;
    import ptolemy.kernel.util.InternalErrorException;
    import ptolemy.kernel.util.NameDuplicationException;
    import ptolemy.domains.de.kernel.DEDirector;


//////////////////////////////////////////////////////////////////////////

    /**
        an actor to subscribe to the IMT data feed and provide top-of-book
        price data for a single currency-pair instrument.
          @author Ian Brown
     */
    public class CurrencyPairListener extends TypedAtomicActor {
        /** Construct an actor with the given container and name.
         *  @param container The container.
         *  @param name The name of this actor.
         *  @exception IllegalActionException If the actor cannot be
         *   contained by the proposed container.
         *  @exception NameDuplicationException If the container already
         *   has an actor with this name.
         */
        public CurrencyPairListener(CompositeEntity container, String name)
                throws NameDuplicationException, IllegalActionException {
            super(container, name);

            // ports - Ordering here sets the order they show up in Vergil

            bid = new TypedIOPort(this, "bid");
            bid.setTypeEquals(BaseType.DOUBLE);
            bid.setOutput(true);

            ask = new TypedIOPort(this, "ask");
            ask.setTypeEquals(BaseType.DOUBLE);
            ask.setOutput(true);

            // parameters - Ordering here sets the order they show up in Vergil
            portNumberParam = new Parameter(this, "portNumber");
            portNumberParam.setTypeEquals(BaseType.INT);
            portNumberParam.setToken(new IntToken(12345));

            serverAddressParam = new Parameter(this, "serverAddress");
            serverAddressParam.setTypeEquals(BaseType.STRING);
            serverAddressParam.setToken(new StringToken("128.8.120.18"));

            instrumentParam = new Parameter(this, "Instrument");
            instrumentParam.setTypeEquals(BaseType.STRING);
            instrumentParam.setToken(new StringToken("EURGBP"));

        }

        ///////////////////////////////////////////////////////////////////
        ////                     ports and parameters                  ////
        // Ports and parameters are in the same order here as in the
        // constructor.

        /** This port outputs the latest bid price. */
        public TypedIOPort bid;

        /** This port outputs the latest ask price. */
        public TypedIOPort ask;

        /** The port number to connect to. */
        public Parameter portNumberParam;

        /** The IP address to connect to. */
        public Parameter serverAddressParam;

          /** The instrument we're watching. */
          public Parameter instrumentParam;

        ///////////////////////////////////////////////////////////////////
        ////                         public methods                    ////

          /**
         *  @exception IllegalActionException If the data cannot be
         *  converted into a token of the same type as the configured type
         *  of the output port.
         */
        public void fire() throws IllegalActionException {
            super.fire();

            // The part dependent on the packet's contents must be
            // synchronized to ensure that the thread does not mess with it
            // while it is in use here.
            synchronized (_syncFireAndThread) {
                if (_cur_bid > 0.) {
                    _bidToken = new DoubleToken(_cur_bid);
                } else {
                    _bidToken = null;
                }
                if (_cur_ask > 0.) {
                    _askToken = new DoubleToken(_cur_ask);
                } else {
                    _askToken = null;
                }
                _syncFireAndThread.notifyAll();

                if (_bidToken != null) bid.broadcast(_bidToken);
                if (_askToken != null) ask.broadcast(_askToken);
            } // sync
        }

        /** Initialize this actor, including the creation of a TCP Socket
         *  connected to the configured server and a SocketReadingThread
         *  that blocks reading from that socket.  This method is used
         *  as a bookend with wrapup() being the other end.  Resources
         *  created/allocated here are released in wrapup().
         *  @exception IllegalActionException If the
         *  <i>portNumber</i> parameter has a value outside 0..65535
         *  or a socket could not be created.  */
        public void initialize() throws IllegalActionException {
            super.initialize();

            _cur_bid = -1.0; _cur_ask = -1.0;

            int portNumber = ((IntToken) (portNumberParam.getToken()))
                    .intValue();

            if ((portNumber < 0) || (portNumber > 65535)) {
                throw new IllegalActionException(this, portNumberParam
                        + " is outside the required 0..65535 range");
            }

            String addr = ((StringToken) (serverAddressParam.getToken()))
                    .stringValue();

            if (_debugging) {
                _debug(this + "portNumber = " + portNumber + ", Address = "
                        + addr);
            }


            // Allocate a new socket.
            try {
                if (_debugging) {
                    _debug("Trying to create a new socket on port "
                            + portNumber);
                }

                _socket = new Socket(addr, portNumber);
                if (_debugging) {
                    _debug("Socket created successfully!");
                }
            } catch (Exception ex) {
                throw new IllegalActionException(this, ex,
                        "Failed to create a new socket on port "
                                + portNumber);
            }

            // Allocate & start a thread to read from the socket.
            _socketReadingThread = new SocketReadingThread();
            _socketReadingThread.start();
        }

        /** Override the setContainer() method to call wrapup() if the
         *  actor is deleted while the model is running.  Wrapup() then
         *  releases resources acquired by initialize().
         */
        public void setContainer(CompositeEntity container)
                throws IllegalActionException, NameDuplicationException {
            // FIXME: Is this necessary?
            if (container != getContainer()) {
                wrapup();
            }
            super.setContainer(container);
        }

        /** Release resources acquired in the initialize() method.
         */
        public void wrapup() throws IllegalActionException {
            if (_socketReadingThread != null) {
                _socketReadingThread.interrupt();
                _socketReadingThread = null;
            } else {
                if (_debugging) {
                    _debug("socketReadingThread null at wrapup!?");
                }
            }

            if (_socket != null) {
                try {
                    _socket.close();
                    _socket = null;
                } catch (IOException ex) {
                    throw new IllegalActionException(this, ex,
                            "Failed to close socket");
                }
            } else {
                if (_debugging) {
                    _debug("Socket null at wrapup!?");
                }
            }
        }

        ///////////////////////////////////////////////////////////////////
        ////                         private variables                 ////
        // Synchronization objects.  Used for synchronization between the
        // fire() call and the reading thread.
        private Object _syncFireAndThread = new Object();

          // The socket we are reading from
        private Socket _socket;

        private SocketReadingThread _socketReadingThread;

        private DoubleToken _bidToken;
        private DoubleToken _askToken;

        private double _cur_bid = -1;
        private double _cur_ask = -1;

        ///////////////////////////////////////////////////////////////////
        ////                        private inner class                ////
        private class SocketReadingThread extends Thread {
            /** Constructor.  Create a new thread to listen for packets
             *  at the socket opened by the actor's [pre]initialize method.
             */
            public SocketReadingThread() {
            }

            /** Run.  Run the thread.  This begins running when .start()
             *  is called on the thread.
             */
            public void run() {
                String instrument;
                try {
                    instrument = ((StringToken) (instrumentParam.getToken()))
                            .stringValue();
                } catch (Exception ex) {
                    throw new RuntimeException(
                            "failed to get the currency instrument", ex);
                }
                InputStream _socketStream;
                try {
                    _socketStream = _socket.getInputStream();
                } catch (IOException ex) {
                    if (_debugging) {
                        _debug("IOException: " + ex);
                    }
                    return;
                }

                // Attempt to receive something from the socket!
                boolean receiving = true;

                StringBuffer sb = new StringBuffer(64);
                try {
                    while (true) {
                        // NOTE: The following call may block.
                        int c = _socketStream.read();
                        if (c < 0) {
                            // End of stream: stop reading instead of
                            // spinning on -1 forever.
                            break;
                        }
    //                  if (_debugging){
    //                      _debug(Integer.toString(c));
    //                  }
                        if (c > 0x0A) {
                            sb.append((char) c);
                        } else if (0 == c) {
                            // A packet was successfully received!
                            if (_debugging) {
                                _debug(sb.toString());
                            }
                            if (processInput(sb.toString(), instrument)) {
                                try {
                                    // fireAtCurrentTime has the result that
                                    // all events are called at model time
                                    // zero with an incrementing microstep.
                                    // Whilst this will work for most
                                    // scenarios, it has the effect that the
                                    // timed plotter just displays a vertical
                                    // line. For that reason, we need to
                                    // figure out the correct time to fire
                                    // the event at.
                                    //getDirector().fireAtCurrentTime(CurrencyPairListener.this);
                                    DEDirector director = (DEDirector) (getDirector());
                                    long start_time = director.getRealStartTimeMillis();
                                    long time_now = java.lang.System.currentTimeMillis();
                                    double model_time = (time_now - start_time) / 1000.0;

                                    director.fireAt(CurrencyPairListener.this,
                                            new ptolemy.actor.util.Time(director, model_time));
                                } catch (IllegalActionException ex) {
                                    throw new RuntimeException(
                                            "fireAt() threw an exception", ex);
                                }
                            }
                            sb.setLength(0);
                        }
                    }
                } catch (IOException ex) {
                    if (_debugging) {
                        _debug("IOException: " + ex);
                    }
                } catch (NullPointerException ex) {
                    if (_debugging) {
                        _debug("--!!--" + (_socket == null));
                    }
                }
            }

            // process the raw input from the TCP/IP stream
            // @return true if the price has changed
            //
            private boolean processInput(String in_string, String instrument) {
                boolean retval = false;
                String[] s = in_string.split(":");
                if (s.length == 5 && s[0].equals("CY")) {
                    if (s[1].equals(instrument)) {
                        try {
                            Double new_val = new Double(s[3]);

                            synchronized (_syncFireAndThread) {
                                // set the output value
                                if (s[2].equals("S")) {
                                    _cur_ask = new_val.doubleValue();
                                } else {
                                    _cur_bid = new_val.doubleValue();
                                }
                                retval = true;
                            }
                        } catch (NumberFormatException nfe) {
                            if (_debugging) {
                                _debug(s[3] + " is not a valid price");
                            }
                        }
                    }
                } else {
                    if (_debugging) {
                        _debug("length = " + s.length + " s[0] = " + s[0]);
                    }
                }
                return retval;
            }
        }
    }
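
One general way to keep a reader thread away from shared model state is to
have it only enqueue updates and let the model thread drain them. The sketch
below is a minimal illustration using java.util.concurrent only.
PriceHandoff, Update, publish() and drain() are hypothetical names, not part
of Ptolemy II or of the actor above, and the sketch does not by itself make
the fireAt() call safe. It also shows the wall-clock to model-time mapping
used in the actor, (now - start) / 1000.0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical hand-off between a reader thread and the model thread. */
final class PriceHandoff {
    /** One immutable price update. */
    static final class Update {
        final String side;
        final double price;
        final double modelTime;

        Update(String side, double price, double modelTime) {
            this.side = side;
            this.price = price;
            this.modelTime = modelTime;
        }
    }

    // Thread-safe queue: the reader thread adds, the model thread polls.
    private final ConcurrentLinkedQueue<Update> pending =
            new ConcurrentLinkedQueue<>();
    private final long startMillis;

    PriceHandoff(long startMillis) {
        this.startMillis = startMillis;
    }

    /** Convert wall-clock milliseconds to model time in seconds. */
    static double toModelTime(long startMillis, long nowMillis) {
        return (nowMillis - startMillis) / 1000.0;
    }

    /** Called from the reader thread; touches nothing but the queue. */
    void publish(String side, double price, long nowMillis) {
        pending.add(new Update(side, price,
                toModelTime(startMillis, nowMillis)));
    }

    /** Called from the model thread; removes everything queued so far. */
    List<Update> drain() {
        List<Update> out = new ArrayList<>();
        Update next;
        while ((next = pending.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        PriceHandoff handoff = new PriceHandoff(1000L);
        handoff.publish("S", 0.6789, 3500L);
        for (Update u : handoff.drain()) {
            System.out.println(u.side + " " + u.price + " @ " + u.modelTime);
        }
    }
}
```

With this shape, the only object both threads touch is the queue, so no
lock has to be held while tokens are sent downstream.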



    Thanks,

    Ian

--------



