[kepler-dev] Threaded actors in the DE Domain

ian.brown at hsbcib.com
Wed Sep 12 09:25:50 PDT 2007


Edward, the code for the actor was at the bottom of the original mail. I
have chopped out the exception report and left the code at the bottom of
this mail, so it should be easier to see.

The actor is based on the Datagram Reader actor - but it reads from a
connected TCP/IP socket rather than from a UDP port.
There is only 1 DEDirector in the model and 2 of my source actors. This
means that the DEDirector.fireAt() method will be called from more than one
thread (in the same manner as the Datagram reader). Given that events will
arrive asynchronously, this would appear to be the only way to do it. I could
synchronise the calls to fireAt(), but I would need to know which object to
synchronise them on...
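
One possible answer, offered only as a sketch: since every source actor
shares the single director, the director object itself is a lock that all
reader threads can agree on. The example below uses ToyDirector, a
hypothetical stand-in with a non-thread-safe queue; it is not the Ptolemy
DEDirector API, and whether the real director's own thread would honour
the same lock is an assumption that would need checking.

```java
import java.util.PriorityQueue;

/** Hypothetical stand-in for a director; NOT the Ptolemy DEDirector API. */
final class ToyDirector {
    // PriorityQueue is not thread-safe, so concurrent unsynchronized adds
    // could corrupt it.
    private final PriorityQueue<Double> eventQueue = new PriorityQueue<>();

    void fireAt(double modelTime) {
        eventQueue.add(modelTime);
    }

    int pendingEvents() {
        return eventQueue.size();
    }
}

final class FireAtSketch {
    /** Start reader threads that each request perThread firings. */
    static int runDemo(int threads, int perThread) {
        final ToyDirector director = new ToyDirector();
        final long startMillis = System.currentTimeMillis();
        Thread[] readers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            readers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    // Elapsed wall-clock milliseconds converted to seconds,
                    // as in the actor's model_time computation.
                    double modelTime =
                            (System.currentTimeMillis() - startMillis) / 1000.0;
                    // The shared director is the common lock object.
                    synchronized (director) {
                        director.fireAt(modelTime);
                    }
                }
            });
            readers[i].start();
        }
        try {
            for (Thread t : readers) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return director.pendingEvents();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(2, 1000)); // prints 2000
    }
}
```

Note that this only serialises the callers that take the same lock. Any
code that reads the queue without holding it is still unprotected.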

I'll take a look at the RealTimeComposite now...

Thanks,

Ian


"Edward A. Lee" <eal at eecs.berkeley.edu> wrote on 12/09/2007 17:00:
To:      ian.brown at hsbcib.com
cc:      kepler-dev at ecoinformatics.org
Subject: Re: [kepler-dev] Threaded actors in the DE Domain

Ian,

I'm not sure what you mean by "it fires an event".
This is fairly central... To get this exception, it would seem
that you have to be executing DEDirector methods in two separate
threads, which almost certainly won't work.  Can you explain
exactly what you mean by this phrase?
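
A common general way to keep scheduler methods on a single thread is to
have the reader threads hand their data over through a thread-safe queue
and let only the scheduling thread drain it. The sketch below illustrates
that pattern with plain java.util.concurrent classes. HandoffSketch and
Update are hypothetical names, and nothing here is taken from the Ptolemy
sources or describes how RealTimeComposite is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class HandoffSketch {
    /** Hypothetical price update record; not a Ptolemy type. */
    static final class Update {
        final String side;   // illustrative side label
        final double price;

        Update(String side, double price) {
            this.side = side;
            this.price = price;
        }
    }

    /** Reader threads only enqueue; the calling thread alone drains. */
    static List<Update> runDemo(int readers, int perReader) {
        final BlockingQueue<Update> inbox = new LinkedBlockingQueue<>();
        Thread[] threads = new Thread[readers];
        for (int i = 0; i < readers; i++) {
            final String side = (i % 2 == 0) ? "B" : "S";
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perReader; j++) {
                    // Thread-safe handoff; no scheduler state is touched here.
                    inbox.add(new Update(side, 1.0 + j));
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        // Only this thread would go on to touch scheduling state.
        List<Update> handled = new ArrayList<>();
        inbox.drainTo(handled);
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(2, 500).size()); // prints 1000
    }
}
```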

You may want to take a look at a "work in progress" that I've
been working on:  ptolemy.actor.lib.hoc.RealTimeComposite.
This actor runs its contained composite actor in a separate thread
from the container that fires this actor.  It is meant for use
in DE.

Edward


At 05:43 AM 9/11/2007, ian.brown at hsbcib.com wrote:
>
>My actor source class is implemented as follows:
>
>
>/* An actor that asynchronously reads currency pair updates.
> */
>
>package com.hsbc.IMTActors;
>
>import java.io.IOException;
>import java.io.InputStream;
>import java.io.InputStreamReader;
>import java.net.Socket;
>
>import ptolemy.actor.TypedAtomicActor;
>import ptolemy.actor.TypedIOPort;
>import ptolemy.data.ArrayToken;
>import ptolemy.data.BooleanToken;
>import ptolemy.data.DoubleToken;
>import ptolemy.data.IntToken;
>import ptolemy.data.StringToken;
>import ptolemy.data.Token;
>import ptolemy.data.UnsignedByteToken;
>import ptolemy.data.expr.Parameter;
>import ptolemy.data.type.ArrayType;
>import ptolemy.data.type.BaseType;
>import ptolemy.kernel.CompositeEntity;
>import ptolemy.kernel.util.Attribute;
>import ptolemy.kernel.util.IllegalActionException;
>import ptolemy.kernel.util.InternalErrorException;
>import ptolemy.kernel.util.NameDuplicationException;
>import ptolemy.domains.de.kernel.DEDirector;
>
>//////////////////////////////////////////////////////////////////////////
>
>/**
>    an actor to subscribe to the IMT data feed and provide top-of-book
>    price data for a single currency-pair instrument.
>      @author Ian Brown
> */
>public class CurrencyPairListener extends TypedAtomicActor {
>    /** Construct an actor with the given container and name.
>     *  @param container The container.
>     *  @param name The name of this actor.
>     *  @exception IllegalActionException If the actor cannot be contained
>     *   by the proposed container.
>     *  @exception NameDuplicationException If the container already has
>     *   an actor with this name.
>     */
>    public CurrencyPairListener(CompositeEntity container, String name)
>            throws NameDuplicationException, IllegalActionException {
>        super(container, name);
>
>        // ports - Ordering here sets the order they show up in Vergil
>
>        bid = new TypedIOPort(this, "bid");
>        bid.setTypeEquals(BaseType.DOUBLE);
>        bid.setOutput(true);
>
>        ask = new TypedIOPort(this, "ask");
>        ask.setTypeEquals(BaseType.DOUBLE);
>        ask.setOutput(true);
>
>        // parameters - Ordering here sets the order they show up in Vergil
>        portNumberParam = new Parameter(this, "portNumber");
>        portNumberParam.setTypeEquals(BaseType.INT);
>        portNumberParam.setToken(new IntToken(12345));
>
>        serverAddressParam = new Parameter(this, "serverAddress");
>        serverAddressParam.setTypeEquals(BaseType.STRING);
>        serverAddressParam.setToken(new StringToken("128.8.120.18"));
>
>        instrumentParam = new Parameter(this, "Instrument");
>        instrumentParam.setTypeEquals(BaseType.STRING);
>        instrumentParam.setToken(new StringToken("EURGBP"));
>
>    }
>
>    ///////////////////////////////////////////////////////////////////
>    ////                     ports and parameters                  ////
>    // Ports and parameters are in the same order here as in the constructor.
>
>    /** This port outputs the latest bid price. */
>    public TypedIOPort bid;
>
>    /** This port outputs the latest ask price. */
>    public TypedIOPort ask;
>
>    /** The port number to connect to. */
>    public Parameter portNumberParam;
>
>    /** The IP address to connect to. */
>    public Parameter serverAddressParam;
>
>    /** The instrument we're watching. */
>    public Parameter instrumentParam;
>
>    ///////////////////////////////////////////////////////////////////
>    ////                         public methods                    ////
>
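>    /** Broadcast the most recent bid and ask prices, if any have been
>     *  received, on the corresponding output ports.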
>     *  @exception IllegalActionException If the data cannot be
>     *  converted into a token of the same type as the configured type
>     *  of the output port.
>     */
>    public void fire() throws IllegalActionException {
>        super.fire();
>
>        // The part dependent on the packet's contents must be synchronized
>        // to ensure that the thread does not mess with it while it is in
>        // use here.
>        synchronized (_syncFireAndThread) {
>            if (_cur_bid > 0.) {
>                _bidToken = new DoubleToken(_cur_bid);
>            } else {
>                _bidToken = null;
>            }
>            if (_cur_ask > 0.) {
>                _askToken = new DoubleToken(_cur_ask);
>            } else {
>                _askToken = null;
>            }
>            _syncFireAndThread.notifyAll();
>
>            if (_bidToken != null) bid.broadcast(_bidToken);
>            if (_askToken != null) ask.broadcast(_askToken);
>        } // sync
>    }
>
>    /** Initialize this actor, including the creation of a TCP Socket
>     *  connected to the server and a SocketReadingThread that blocks
>     *  reading from that socket.  This method is used
>     *  as a bookend with wrapup() being the other end.  Resources
>     *  created/allocated here are released in wrapup().
>     *  @exception IllegalActionException If the
>     *  <i>portNumber</i> parameter has a value outside 0..65535
>     *  or a socket could not be created.
>     */
>    public void initialize() throws IllegalActionException {
>        super.initialize();
>
>        _cur_bid = -1.0; _cur_ask = -1.0;
>
>        int portNumber =
>                ((IntToken) (portNumberParam.getToken())).intValue();
>
>        if ((portNumber < 0) || (portNumber > 65535)) {
>            throw new IllegalActionException(this, portNumberParam
>                    + " is outside the required 0..65535 range");
>        }
>
>        String addr =
>                ((StringToken) (serverAddressParam.getToken())).stringValue();
>
>        if (_debugging) {
>            _debug(this + "portNumber = " + portNumber
>                    + ", Address = " + addr);
>        }
>
>
>        // Allocate a new socket.
>        try {
>            if (_debugging) {
>                _debug("Trying to create a new socket on port "
>                        + portNumber);
>            }
>
>            _socket = new Socket(addr, portNumber);
>            if (_debugging) {
>                _debug("Socket created successfully!");
>            }
>        } catch (Exception ex) {
>            throw new IllegalActionException(this, ex,
>                    "Failed to create a new socket on port " + portNumber);
>        }
>
>        // Allocate & start a thread to read from the socket.
>        _socketReadingThread = new SocketReadingThread();
>        _socketReadingThread.start();
>    }
>
>    /** Override the setContainer() method to call wrapup() if the
>     *  actor is deleted while the model is running.  Wrapup() then
>     *  releases resources acquired by initialize().
>     */
>    public void setContainer(CompositeEntity container)
>            throws IllegalActionException, NameDuplicationException {
>        // FIXME: Is this necessary?
>        if (container != getContainer()) {
>            wrapup();
>        }
>        super.setContainer(container);
>    }
>
>    /** Release resources acquired in the initialize() method.
>     */
>    public void wrapup() throws IllegalActionException {
>        if (_socketReadingThread != null) {
>            _socketReadingThread.interrupt();
>            _socketReadingThread = null;
>        } else {
>            if (_debugging) {
>                _debug("socketReadingThread null at wrapup!?");
>            }
>        }
>
>        if (_socket != null) {
>            try {
>                _socket.close();
>                _socket = null;
>            } catch (IOException ex) {
>                throw new IllegalActionException(this, ex,
>                        "Failed to close socket");
>            }
>        } else {
>            if (_debugging) {
>                _debug("Socket null at wrapup!?");
>            }
>        }
>    }
>
>    ///////////////////////////////////////////////////////////////////
>    ////                         private variables                 ////
>    // Synchronization object.  Used for synchronization between the
>    // fire() call and the reading thread.
>    private Object _syncFireAndThread = new Object();
>
>    // The socket we are reading from.
>    private Socket _socket;
>
>    private SocketReadingThread _socketReadingThread;
>
>    private DoubleToken _bidToken;
>    private DoubleToken _askToken;
>
>    private double _cur_bid = -1;
>    private double _cur_ask = -1;
>
>    ///////////////////////////////////////////////////////////////////
>    ////                        private inner class                ////
>    private class SocketReadingThread extends Thread {
>        /** Constructor.  Create a new thread to listen for packets
>         *  at the socket opened by the actor's [pre]initialize method.
>         */
>        public SocketReadingThread() {
>        }
>
>        /** Run.  Run the thread.  This begins running when .start()
>         *  is called on the thread.
>         */
>        public void run() {
>            String instrument;
>            try {
>                instrument = ((StringToken) (instrumentParam.getToken()))
>                        .stringValue();
>            } catch (Exception ex) {
>                throw new RuntimeException(
>                        "failed to get the currency instrument", ex);
>            }
>            InputStream _socketStream;
>            try {
>                _socketStream = _socket.getInputStream();
>            } catch (IOException ex) {
>                if (_debugging) {
>                    _debug("IOException: " + ex);
>                }
>                return;
>            }
>
>            // Attempt to receive something from the socket!
>            boolean receiving = true;
>
>            StringBuffer sb = new StringBuffer(64);
>            try {
>                while (true) {
>                    // NOTE: The following call may block.
>                    int c = _socketStream.read();
>                    // if (_debugging) {
>                    //     _debug(Integer.toString(c));
>                    // }
>                    if (c < 0) {
>                        // End of stream: read() keeps returning -1, so
>                        // stop instead of spinning.
>                        break;
>                    }
>                    if (c > 0x0A) {
>                        sb.append((char) c);
>                    } else if (0 == c) {
>                        // A packet was successfully received!
>                        if (_debugging) {
>                            _debug(sb.toString());
>                        }
>                        if (processInput(sb.toString(), instrument)) {
>                            try {
>                                // fireAtCurrentTime has the result that all
>                                // events are called at model time zero with
>                                // an incrementing microstep. Whilst this will
>                                // work for most scenarios, it has the effect
>                                // that the timed plotter just displays a
>                                // vertical line. For that reason, we need to
>                                // figure out the correct time to fire the
>                                // event at.
>                                // getDirector().fireAtCurrentTime(
>                                //         CurrencyPairListener.this);
>                                DEDirector director =
>                                        (DEDirector) (getDirector());
>                                long start_time =
>                                        director.getRealStartTimeMillis();
>                                long time_now =
>                                        java.lang.System.currentTimeMillis();
>                                double model_time =
>                                        (time_now - start_time) / 1000.0;
>
>                                director.fireAt(CurrencyPairListener.this,
>                                        new ptolemy.actor.util.Time(director,
>                                                model_time));
>                            } catch (IllegalActionException ex) {
>                                throw new RuntimeException(
>                                        "fireAt() threw an exception", ex);
>                            }
>                        }
>                        sb.setLength(0);
>                    }
>                }
>            } catch (IOException ex) {
>                if (_debugging) {
>                    _debug("IOException: " + ex);
>                }
>            } catch (NullPointerException ex) {
>                if (_debugging) {
>                    _debug("--!!--" + (_socket == null));
>                }
>            }
>        }
>
>        // Process the raw input from the TCP/IP stream.
>        // @return true if the price has changed
>        //
>        private boolean processInput(String in_string, String instrument) {
>            boolean retval = false;
>            String[] s = in_string.split(":");
>            if (s.length == 5 && s[0].equals("CY")) {
>                if (s[1].equals(instrument)) {
>                    try {
>                        Double new_val = new Double(s[3]);
>
>                        synchronized (_syncFireAndThread) {
>                            // set the output value
>                            if (s[2].equals("S")) {
>                                _cur_ask = new_val.doubleValue();
>                            } else {
>                                _cur_bid = new_val.doubleValue();
>                            }
>                            retval = true;
>                        }
>                    } catch (NumberFormatException nfe) {
>                        if (_debugging) {
>                            _debug(s[3] + " is not a valid price");
>                        }
>                    }
>                }
>            } else {
>                if (_debugging) {
>                    _debug("length = " + s.length + " s[0] = " + s[0]);
>                }
>            }
>            return retval;
>        }
>    }
>}
>
>
>
>Thanks,
>
>Ian
>
>
>************************************************************
>HSBC Bank plc may be solicited in the course of its placement efforts for a
>new issue, by investment clients of the firm for whom the Bank as a firm
>already provides other services. It may equally decide to allocate to its
>own proprietary book or with an associate of HSBC Group. This represents a
>potential conflict of interest. HSBC Bank plc has internal arrangements
>designed to ensure that the firm would give unbiased and full advice to the
>corporate finance client about the valuation and pricing of the offering as
>well as internal systems, controls and procedures to identify and manage
>conflicts of interest.
>
>HSBC Bank plc
>Registered Office: 8 Canada Square, London E14 5HQ, United Kingdom
>Registered in England - Number 14259
>Authorised and regulated by the Financial Services Authority.
>************************************************************
>
>-----------------------------------------
>SAVE PAPER - THINK BEFORE YOU PRINT!
>
>This transmission has been issued by a member of the HSBC Group
>"HSBC" for the information of the addressee only and should not be
>reproduced and/or distributed to any other person. Each page
>attached hereto must be read in conjunction with any disclaimer
>which forms part of it. Unless otherwise stated, this transmission
>is neither an offer nor the solicitation of an offer to sell or
>purchase any investment. Its contents are based on information
>obtained from sources believed to be reliable but HSBC makes no
>representation and accepts no responsibility or liability as to its
>completeness or accuracy.
>_______________________________________________
>Kepler-dev mailing list
>Kepler-dev at ecoinformatics.org
>http://mercury.nceas.ucsb.edu/ecoinformatics/mailman/listinfo/kepler-dev

------------
Edward A. Lee
Chair of EECS and Robert S. Pepper Distinguished Professor
231 Cory Hall, UC Berkeley, Berkeley, CA 94720-1770
phone: 510-642-0253, fax: 510-642-2845
eal at eecs.Berkeley.EDU, http://ptolemy.eecs.berkeley.edu/~eal






