[kepler-dev] Threaded actors in the DE Domain
ian.brown at hsbcib.com
Wed Sep 12 00:57:44 PDT 2007
Hi Christopher, I'm running with the nightly source drop from 9/9/2007 - so
that's pretty recent :)
As a hack at the moment, I've set the _mutationEnabled member in
FSMDirector to false in FSMDirector::initialize(). This is okay for our
particular model (no refinements in any states) but it is obviously a
temporary hack. It would be better to understand how I need to synchronise
the fireAt() call from the Actor.
Ian
From: "Christopher Brooks" <cxh at eecs.berkeley.edu>
Sent by: cxh at EECS.Berkeley.EDU
Date: 11/09/2007 21:45
To: ian.brown at hsbcib.com
cc: kepler-dev at ecoinformatics.org
Subject: Re: [kepler-dev] Threaded actors in the DE Domain
Hi Ian,
Are you running from the latest Ptolemy II sources?
In December, 2006, there were changes that helped address this issue.
An internal report for a project said:
> * We addressed concurrency errors in the graph package.
> The graph package was not designed for a multithreaded environment.
> A simple test that executes multiple models resulted in
> ConcurrentModificationException was created by Raymond Cardillo
> and Brian Hudson. We added a modified version of that test to
> our nightly build, where the test fails under Solaris with two
> processors, but passes under Windows with one processor.
>
> Our workaround is to make all public methods in TypeLattice
> synchronized to the class. This workaround could result in
> slowness.
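As a rough illustration of what "synchronized to the class" means in Java, the sketch below gives every public static method the same class-wide lock, so two threads can never be inside the shared structure at once. SharedLattice and its methods are hypothetical stand-ins written for this example; they are not the actual TypeLattice code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a class with shared, non-thread-safe state. */
final class SharedLattice {
    // A plain ArrayList is not safe for concurrent modification on its own.
    private static final List<String> NODES = new ArrayList<>();

    private SharedLattice() {
    }

    /** A static synchronized method locks on SharedLattice.class. */
    public static synchronized void addNode(String name) {
        if (!NODES.contains(name)) {
            NODES.add(name);
        }
    }

    /** Readers take the same lock, so they never see a half-updated list. */
    public static synchronized int size() {
        return NODES.size();
    }

    /** Empty the shared list, again under the class-wide lock. */
    public static synchronized void clear() {
        NODES.clear();
    }

    /** Start several threads that all add the same set of node names. */
    static int runConcurrentAdds(int threads, int namesPerThread) {
        clear();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < namesPerThread; i++) {
                    addNode("node" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return size();
    }

    public static void main(String[] args) {
        System.out.println("distinct nodes: " + runConcurrentAdds(4, 1000));
    }
}
```

The cost mentioned in the report follows from the design: every caller queues on one lock, even when the calls would not have conflicted.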
_Christopher
--------
Hi, I have a source actor which uses a thread to read from a TCP/IP
socket. When data is received, it fires an event. The overall model
consists of two of these sources which are connected to a finite state
machine. This FSM is a modal model although none of the states actually
have any refinements.
The problem we are having is that occasionally we will get a graph
construction exception. I think the problem is that when a state transition
occurs the DAG is rebuilt. During this process a fire event can happen and
you end up with 2 threads both trying to rebuild the DAG at the same time.
This is a bit of a guess - but it certainly appears to be what is happening.
How do I synchronise the fireAt() call so that this does not happen? I seem
to remember reading something somewhere about thread restrictions in
fireAt() but cannot find the reference any more. Can anyone help?
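To make the suspected race concrete, here is a minimal sketch that assumes the guess above is right: two threads invalidate and rebuild one graph, and a single lock covers both the "is it stale?" check and the rebuild. GuardedGraph, its methods and the port names are hypothetical and only mimic the duplicate-node check; this is not Ptolemy II code and says nothing about how DEDirector itself should be locked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical graph that rejects duplicate nodes while being rebuilt. */
final class GuardedGraph {
    private final List<String> nodes = new ArrayList<>();
    private final Object lock = new Object();
    private boolean valid = false;

    /** Mark the graph stale, as a state transition might. */
    void invalidate() {
        synchronized (lock) {
            valid = false;
        }
    }

    /** Rebuild only if stale; the check and the rebuild share one lock. */
    void validate(List<String> ports) {
        synchronized (lock) {
            if (valid) {
                return;
            }
            nodes.clear();
            for (String port : ports) {
                if (nodes.contains(port)) {
                    throw new IllegalStateException(
                            "Attempt to add a node that is already contained: " + port);
                }
                nodes.add(port);
            }
            valid = true;
        }
    }

    /** Two threads invalidate and rebuild in a loop; returns the failure count. */
    static int stress(int rounds) {
        final GuardedGraph graph = new GuardedGraph();
        final List<String> ports = Arrays.asList("bid", "ask", "trigger", "output");
        final AtomicInteger failures = new AtomicInteger();
        Runnable task = () -> {
            for (int i = 0; i < rounds; i++) {
                try {
                    graph.invalidate();
                    graph.validate(ports);
                } catch (RuntimeException ex) {
                    failures.incrementAndGet();
                }
            }
        };
        Thread a = new Thread(task, "model");
        Thread b = new Thread(task, "socket-reader");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures: " + stress(10000));
    }
}
```

Removing the synchronized blocks lets one thread clear the list while the other is still adding to it, which is the kind of interleaving that could produce an "already contained" error.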
The exception I am getting is as follows:
ptolemy.graph.GraphConstructionException: Attempt to add a node that is already contained in the graph.
Dumps of the offending node and graph follow.
The offending node:
ptolemy.actor.TypedIOPort {.masterModel6ib.meanReversionParent.meanRevision.ArrayVariance.in}
The offending graph:
{ptolemy.graph.DirectedGraph
Node Set:
0: ptolemy.actor.TypedIOPort {.masterModel6ib.meanReversionParent.meanRevision2.positionTrigger2.trigger}
1: ptolemy.actor.TypedIOPort {.masterModel6ib.meanReversionParent.meanRevision2.positionTrigger2.output}
2: ptolemy.actor.TypedIOPort
{.masterModel6ib.meanReversionParent.meanRevision2.orderBuilder.Const2.trig
227: (ptolemy.actor.TypedIOPort {.masterModel6ib.orderManager.CompositeActor.FindOrderWS.status}, ptolemy.actor.TypedIOPort {.masterModel6ib.orderManager.Display.input})
228: (ptolemy.actor.TypedIOPort {.masterModel6ib.orderManager.CompositeActor.FindOrderWS.status}, ptolemy.actor.TypedIOPort {.masterModel6ib.orderManager.CompositeActor.TicCount2.input})
}
at ptolemy.graph.Graph.addNode(Graph.java:308)
at ptolemy.graph.Graph.addNodes(Graph.java:356)
at ptolemy.graph.Graph.addGraph(Graph.java:284)
at ptolemy.actor.util.FunctionDependencyOfCompositeActor._mergeActorsGraph(FunctionDependencyOfCompositeActor.java:377)
at ptolemy.actor.util.FunctionDependencyOfCompositeActor._constructDetailedDependencyGraph(FunctionDependencyOfCompositeActor.java:225)
at ptolemy.actor.util.FunctionDependencyOfCompositeActor._constructDependencyGraph(FunctionDependencyOfCompositeActor.java:146)
at ptolemy.actor.util.FunctionDependency._validate(FunctionDependency.java:273)
at ptolemy.actor.util.FunctionDependencyOfCompositeActor.getCycleNodes(FunctionDependencyOfCompositeActor.java:116)
at ptolemy.domains.de.kernel.DEDirector._constructDirectedGraph(DEDirector.java:1617)
at ptolemy.domains.de.kernel.DEDirector._computePortDepth(DEDirector.java:1386)
at ptolemy.domains.de.kernel.DEDirector._getDepthOfIOPort(DEDirector.java:1691)
at ptolemy.domains.de.kernel.DEDirector._enqueueTriggerEvent(DEDirector.java:1260)
at ptolemy.domains.de.kernel.DEReceiver.put(DEReceiver.java:150)
at ptolemy.actor.AbstractReceiver.putToAll(AbstractReceiver.java:315)
at ptolemy.actor.IOPort.broadcast(IOPort.java:279)
at ptolemy.actor.TypedIOPort.broadcast(TypedIOPort.java:236)
at com.hsbc.IMTActors.CurrencyPairListener.fire(CurrencyPairListener.java:122)
at ptolemy.domains.de.kernel.DEDirector.fire(DEDirector.java:505)
at ptolemy.actor.CompositeActor.fire(CompositeActor.java:370)
at ptolemy.actor.Manager.iterate(Manager.java:681)
at ptolemy.actor.Manager.execute(Manager.java:331)
at ptolemy.actor.Manager.run(Manager.java:1064)
at ptolemy.actor.Manager$3.run(Manager.java:1105)
My actor source class is implemented as follows:
/* An actor that asynchronously reads currency pair updates.
*/
package com.hsbc.IMTActors;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.data.ArrayToken;
import ptolemy.data.BooleanToken;
import ptolemy.data.DoubleToken;
import ptolemy.data.IntToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.UnsignedByteToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.ArrayType;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.domains.de.kernel.DEDirector;
//////////////////////////////////////////////////////////////////////////
/**
an actor to subscribe to the IMT data feed and provide top-of-book
price data for a single currency-pair instrument.
@author Ian Brown
*/
public class CurrencyPairListener extends TypedAtomicActor {
/** Construct an actor with the given container and name.
* @param container The container.
* @param name The name of this actor.
* @exception IllegalActionException If the actor cannot be
* contained by the proposed container.
* @exception NameDuplicationException If the container already
* has an actor with this name.
*/
public CurrencyPairListener(CompositeEntity container, String name)
throws NameDuplicationException, IllegalActionException {
super(container, name);
// ports - Ordering here sets the order they show up in Vergil
bid = new TypedIOPort(this, "bid");
bid.setTypeEquals(BaseType.DOUBLE);
bid.setOutput(true);
ask = new TypedIOPort(this, "ask");
ask.setTypeEquals(BaseType.DOUBLE);
ask.setOutput(true);
// parameters - Ordering here sets the order they show up in
// Vergil
portNumberParam = new Parameter(this, "portNumber");
portNumberParam.setTypeEquals(BaseType.INT);
portNumberParam.setToken(new IntToken(12345));
serverAddressParam = new Parameter(this, "serverAddress");
serverAddressParam.setTypeEquals(BaseType.STRING);
serverAddressParam.setToken(new StringToken("128.8.120.18"));
instrumentParam = new Parameter(this, "Instrument");
instrumentParam.setTypeEquals(BaseType.STRING);
instrumentParam.setToken(new StringToken("EURGBP"));
}
///////////////////////////////////////////////////////////////////
//// ports and parameters ////
// Ports and parameters are in the same order here as in the
// constructor.
/** This port outputs the latest bid price. */
public TypedIOPort bid;
/** This port outputs the latest ask price. */
public TypedIOPort ask;
/** The port number to connect to. */
public Parameter portNumberParam;
/** The IP address to connect to. */
public Parameter serverAddressParam;
/** The instrument we're watching. */
public Parameter instrumentParam;
///////////////////////////////////////////////////////////////////
//// public methods ////
/**
* @exception IllegalActionException If the data cannot be
* converted into a token of the same type as the configured type
* of the output port.
*/
public void fire() throws IllegalActionException {
super.fire();
// The part dependent on the packet's contents must be
// synchronized to ensure that the thread does not mess with it
// while it is in use here.
synchronized (_syncFireAndThread) {
if (_cur_bid > 0.){
_bidToken = new DoubleToken(_cur_bid);
} else {
_bidToken = null;
}
if (_cur_ask > 0.){
_askToken = new DoubleToken(_cur_ask);
} else {
_askToken = null;
}
_syncFireAndThread.notifyAll();
if (_bidToken != null) bid.broadcast(_bidToken);
if (_askToken != null) ask.broadcast(_askToken);
} // sync
}
/** Initialize this actor: validate the port number, open a TCP
* Socket to the configured server address and port, and start a
* SocketReadingThread that blocks reading from that socket. This
* method is used as a bookend with wrapup() being the other end.
* Resources created/allocated here are released in wrapup().
* @exception IllegalActionException If the <i>portNumber</i>
* parameter has a value outside 0..65535 or a socket could not be
* created. */
public void initialize() throws IllegalActionException {
super.initialize();
_cur_bid = -1.0; _cur_ask = -1.0;
int portNumber = ((IntToken)
(portNumberParam.getToken())).intValue();
if ((portNumber < 0) || (portNumber > 65535)) {
throw new IllegalActionException(this, portNumberParam
+ " is outside the required 0..65535 range");
}
String addr =
((StringToken)(serverAddressParam.getToken())).stringValue();
if (_debugging) {
_debug(this + "portNumber = " + portNumber + ", Address = "
+ addr);
}
// Allocate a new socket.
try {
if (_debugging) {
_debug("Trying to create a new socket on port "
+ portNumber);
}
_socket = new Socket(addr, portNumber);
if (_debugging) {
_debug("Socket created successfully!");
}
} catch (Exception ex) {
throw new IllegalActionException(this, ex,
"Failed to create a new socket on port " +
portNumber);
}
// Allocate & start a thread to read from the socket.
_socketReadingThread = new SocketReadingThread();
_socketReadingThread.start();
}
/** Override the setContainer() method to call wrapup() if the
* actor is deleted while the model is running. Wrapup() then
* releases resources acquired by initialize().
*/
public void setContainer(CompositeEntity container)
throws IllegalActionException, NameDuplicationException {
// FIXME: Is this necessary?
if (container != getContainer()) {
wrapup();
}
super.setContainer(container);
}
/** Release resources acquired in the initialize() method.
*/
public void wrapup() throws IllegalActionException {
if (_socketReadingThread != null) {
_socketReadingThread.interrupt();
_socketReadingThread = null;
} else {
if (_debugging) {
_debug("socketReadingThread null at wrapup!?");
}
}
if (_socket != null) {
try {
_socket.close();
_socket = null;
} catch (IOException ex) {
throw new IllegalActionException(this, ex,
"Failed to close socket");
}
} else {
if (_debugging) {
_debug("Socket null at wrapup!?");
}
}
}
///////////////////////////////////////////////////////////////////
//// private variables ////
// Synchronization objects. Used for synchronization between the
// fire() call and the reading thread.
private Object _syncFireAndThread = new Object();
// The socket we are reading from
private Socket _socket;
private SocketReadingThread _socketReadingThread;
private DoubleToken _bidToken;
private DoubleToken _askToken;
private double _cur_bid = -1;
private double _cur_ask = -1;
///////////////////////////////////////////////////////////////////
//// private inner class ////
private class SocketReadingThread extends Thread {
/** Constructor. Create a new thread to listen for packets
* at the socket opened by the actor's [pre]initialize method.
*/
public SocketReadingThread() {
}
/** Run. Run the thread. This begins running when .start()
* is called on the thread.
*/
public void run() {
String instrument;
try {
instrument =
((StringToken)(instrumentParam.getToken())).stringValue();
} catch (Exception ex) {
throw new RuntimeException(
"failed to get the currency instrument", ex);
}
InputStream _socketStream;
try {
_socketStream = _socket.getInputStream();
} catch (IOException ex) {
if (_debugging){
_debug("IOException: " + ex);
}
return;
}
// Attempt to receive something from the socket!
boolean receiving = true;
StringBuffer sb = new StringBuffer(64);
try {
while (true) {
// NOTE: The following call may block.
int c = _socketStream.read();
// if (_debugging){
// _debug(Integer.toString(c));
// }
if (c < 0) {
// End of stream: the peer closed the connection, so stop
// reading instead of spinning on read() returning -1.
break;
} else if (c > 0x0A){
sb.append((char)c);
} else if (0 == c){
// A packet was successfully received!
if (_debugging){
_debug(sb.toString());
}
if (processInput(sb.toString(), instrument)){
try {
// fireAtCurrentTime has the result that all events
// are called at model time zero with an incrementing
// microstep. Whilst this will work for most
// scenarios, it has the effect that the timed
// plotter just displays a vertical line. For that
// reason, we need to figure out the correct time to
// fire the event at.
//getDirector().fireAtCurrentTime(CurrencyPairListener.this);
//
DEDirector director = (DEDirector)(getDirector());
long start_time = director.getRealStartTimeMillis();
long time_now = java.lang.System.currentTimeMillis();
double model_time = (time_now - start_time) / 1000.0;
director.fireAt(CurrencyPairListener.this,
new ptolemy.actor.util.Time(director, model_time));
} catch (IllegalActionException ex) {
throw new RuntimeException("fireAt() "
+ "threw an exception", ex);
}
}
sb.setLength(0);
}
}
} catch (IOException ex) {
if (_debugging){
_debug("IOException: " + ex);
}
} catch (NullPointerException ex) {
if (_debugging) {
_debug("--!!--" + (_socket == null));
}
}
}
// process the raw input from the TCP/IP stream
// @return true if the price has changed
//
private boolean processInput(String in_string, String
instrument){
boolean retval = false;
String[] s = in_string.split(":");
if (s.length == 5 && s[0].equals("CY")){
if (s[1].equals(instrument)){
try {
Double new_val = new Double(s[3]);
synchronized (_syncFireAndThread) {
// set the output value
if(s[2].equals("S")){
_cur_ask =
new_val.doubleValue();
} else {
_cur_bid =
new_val.doubleValue();
}
retval = true;
}
} catch (NumberFormatException nfe){
if (_debugging) {
_debug(s[3] + " is not a valid price");
}
}
}
} else {
if (_debugging){
_debug("length = " + s.length + " s[0] = " + s[0]);
}
}
return retval;
}
}
}
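One possible variation on the listing above, offered only as a sketch: instead of sharing _cur_bid and _cur_ask under a lock that fire() holds while it broadcasts, the reading thread could queue parsed updates and fire() could drain the queue before sending anything. PriceHandoff, Update, onPacket() and drain() are hypothetical names. The packet layout is inferred from processInput(): five colon-separated fields, with "S" taken to mean ask and any other side value treated as bid. The fifth field is a placeholder whose meaning is not given here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical hand-off between a socket-reading thread and fire(). */
final class PriceHandoff {
    /** One parsed update; ask is true for side "S", false otherwise. */
    static final class Update {
        final boolean ask;
        final double price;

        Update(boolean ask, double price) {
            this.ask = ask;
            this.price = price;
        }
    }

    // Thread-safe queue: the reader offers, the model thread polls.
    private final ConcurrentLinkedQueue<Update> pending =
            new ConcurrentLinkedQueue<>();

    /**
     * Parse "CY:instrument:side:price:field" the way processInput() does.
     * Returns null if the packet is malformed or for another instrument.
     */
    static Update parse(String packet, String instrument) {
        String[] s = packet.split(":");
        if (s.length != 5 || !s[0].equals("CY") || !s[1].equals(instrument)) {
            return null;
        }
        try {
            return new Update(s[2].equals("S"), Double.parseDouble(s[3]));
        } catch (NumberFormatException ex) {
            return null;
        }
    }

    /** Called on the reading thread; returns true if an update was queued. */
    boolean onPacket(String packet, String instrument) {
        Update update = parse(packet, instrument);
        if (update == null) {
            return false;
        }
        pending.offer(update);
        return true;
    }

    /** Called on the model thread: take everything queued so far. */
    List<Update> drain() {
        List<Update> out = new ArrayList<>();
        Update next;
        while ((next = pending.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        PriceHandoff handoff = new PriceHandoff();
        handoff.onPacket("CY:EURGBP:S:0.6789:0", "EURGBP");
        handoff.onPacket("CY:EURGBP:B:0.6785:0", "EURGBP");
        for (Update u : handoff.drain()) {
            System.out.println((u.ask ? "ask " : "bid ") + u.price);
        }
    }
}
```

With this shape no lock is held while tokens are sent downstream. It only changes how data moves between the two threads and does not by itself address the graph rebuild in the director.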
Thanks,
Ian