[kepler-dev] Threaded actors in the DE Domain
Christopher Brooks
cxh at eecs.berkeley.edu
Tue Sep 11 13:45:47 PDT 2007
Hi Ian,
Are you running from the latest Ptolemy II sources?
In December, 2006, there were changes that helped address this issue.
An internal report for a project said:
> * We addressed concurrency errors in the graph package.
> The graph package was not designed for a multithreaded environment.
> A simple test that executes multiple models resulted in
> ConcurrentModificationException was created by Raymond Cardillo
> and Brian Hudson. We added a modified version of that test to
> our nightly build, where the test fails under Solaris with two
> processors, but passes under Windows with one processor.
>
> Our workaround is to make all public methods in TypeLattice
> synchronized to the class. This workaround could result in
> slowness.
_Christopher
--------
Hi, I have an source actor which uses a thread to read from a TCP/IP
socket. When data is received, it fires an event. The overall model
consists of two of these sources which are connected to a finite state
machine. This FSM is a modal model although none of the states actually
have any refinements.
The problem we are having is that occasionally we will get a graph
construction exception. I think the problem is that when a state transition
occurs the DAG is rebuilt. During this process a fire event can happen and
you end up with 2 threads both trying to rebuild the DAG at the same time.
This is a bit of a guess - but it certainly appears to be what is
happening.
How do I synchronise the fireAt() call so that this does not happen? I seem
to remember reading something somewhere about thread restrictions in
fireAt() but cannot find the reference any more. Can anyone help?
The exception I am getting is as follows:
ptolemy.graph.GraphConstructionException: Attempt to add a node that is
already contained in the graph.
Dumps of the offending node and graph follow.
The offending node:
ptolemy.actor.TypedIOPort
{.masterModel6ib.meanReversionParent.meanRevision.ArrayVariance.in}
The offending graph:
{ptolemy.graph.DirectedGraph
Node Set:
0: ptolemy.actor.TypedIOPort
{.masterModel6ib.meanReversionParent.meanRevision2.positionTrigger2.trigger
}
1: ptolemy.actor.TypedIOPort
{.masterModel6ib.meanReversionParent.meanRevision2.positionTrigger2.output}
2: ptolemy.actor.TypedIOPort
{.masterModel6ib.meanReversionParent.meanRevision2.orderBuilder.Const2.trig
227: (ptolemy.actor.TypedIOPort
{.masterModel6ib.orderManager.CompositeActor.FindOrderWS.status},
ptolemy.actor.TypedIOPort {.masterModel6ib.orderManager.Display.input})
228: (ptolemy.actor.TypedIOPort
{.masterModel6ib.orderManager.CompositeActor.FindOrderWS.status},
ptolemy.actor.TypedIOPort
{.masterModel6ib.orderManager.CompositeActor.TicCount2.input})
}
at ptolemy.graph.Graph.addNode(Graph.java:308)
at ptolemy.graph.Graph.addNodes(Graph.java:356)
at ptolemy.graph.Graph.addGraph(Graph.java:284)
at
ptolemy.actor.util.FunctionDependencyOfCompositeActor._mergeActorsGraph(Fun
ctionDependencyOfCompositeActor.java:377)
at
ptolemy.actor.util.FunctionDependencyOfCompositeActor._constructDetailedDep
endencyGraph(FunctionDependencyOfCompositeActor.java:225)
at
ptolemy.actor.util.FunctionDependencyOfCompositeActor._constructDependencyG
raph(FunctionDependencyOfCompositeActor.java:146)
at
ptolemy.actor.util.FunctionDependency._validate(FunctionDependency.java:273
)
at
ptolemy.actor.util.FunctionDependencyOfCompositeActor.getCycleNodes(Functio
nDependencyOfCompositeActor.java:116)
at
ptolemy.domains.de.kernel.DEDirector._constructDirectedGraph(DEDirector.jav
a:1617)
at
ptolemy.domains.de.kernel.DEDirector._computePortDepth(DEDirector.java:1386
)
at
ptolemy.domains.de.kernel.DEDirector._getDepthOfIOPort(DEDirector.java:1691
)
at
ptolemy.domains.de.kernel.DEDirector._enqueueTriggerEvent(DEDirector.java:1
260)
at ptolemy.domains.de.kernel.DEReceiver.put(DEReceiver.java:150)
at ptolemy.actor.AbstractReceiver.putToAll(AbstractReceiver.java:315)
at ptolemy.actor.IOPort.broadcast(IOPort.java:279)
at ptolemy.actor.TypedIOPort.broadcast(TypedIOPort.java:236)
at
com.hsbc.IMTActors.CurrencyPairListener.fire(CurrencyPairListener.java:122)
at ptolemy.domains.de.kernel.DEDirector.fire(DEDirector.java:505)
at ptolemy.actor.CompositeActor.fire(CompositeActor.java:370)
at ptolemy.actor.Manager.iterate(Manager.java:681)
at ptolemy.actor.Manager.execute(Manager.java:331)
at ptolemy.actor.Manager.run(Manager.java:1064)
at ptolemy.actor.Manager$3.run(Manager.java:1105)
My actor source class is implemented as follows:
/* An actor that asynchronously reads currency pair updates.
*/
package com.hsbc.IMTActors;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.data.ArrayToken;
import ptolemy.data.BooleanToken;
import ptolemy.data.DoubleToken;
import ptolemy.data.IntToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.UnsignedByteToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.ArrayType;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.domains.de.kernel.DEDirector;
//////////////////////////////////////////////////////////////////////////
/**
an actor to subscribe to the IMT data feed and provide top-of-book
price data for a single currency-pair instrument.
@author Ian Brown
*/
public class CurrencyPairListener extends TypedAtomicActor {
/** Construct an actor with the given container and name.
* @param container The container.
* @param name The name of this actor.
* @exception IllegalActionException If the actor cannot be contained
* by the proposed container.
* @exception NameDuplicationException If the container already has an
* actor with this name.
*/
public CurrencyPairListener(CompositeEntity container, String name)
throws NameDuplicationException, IllegalActionException {
super(container, name);
// ports - Ordering here sets the order they show up in Vergil
bid = new TypedIOPort(this, "bid");
bid.setTypeEquals(BaseType.DOUBLE);
bid.setOutput(true);
ask = new TypedIOPort(this, "ask");
ask.setTypeEquals(BaseType.DOUBLE);
ask.setOutput(true);
// parameters - Ordering here sets the order they show up in Vergil
portNumberParam = new Parameter(this, "portNumber");
portNumberParam.setTypeEquals(BaseType.INT);
portNumberParam.setToken(new IntToken(12345));
serverAddressParam = new Parameter(this, "serverAddress");
serverAddressParam.setTypeEquals(BaseType.STRING);
serverAddressParam.setToken(new StringToken("128.8.120.18"));
instrumentParam = new Parameter(this, "Instrument");
instrumentParam.setTypeEquals(BaseType.STRING);
instrumentParam.setToken(new StringToken("EURGBP"));
}
///////////////////////////////////////////////////////////////////
//// ports and parameters ////
// Ports and parameters are in the same order hare as in the
constructor.
/** This port outputs the latest bid price. */
public TypedIOPort bid;
/** This port outputs the latest ask price. */
public TypedIOPort ask;
/** The port number to connect to. */
public Parameter portNumberParam;
/** The IP address to connect to. */
public Parameter serverAddressParam;
/** The instrument we're watching. */
public Parameter instrumentParam;
///////////////////////////////////////////////////////////////////
//// public methods ////
/**
* @exception IllegalActionException If the data cannot be
* converted into a token of the same type as the configured type
* of the output port.
*/
public void fire() throws IllegalActionException {
super.fire();
// The part dependent on the packet's contents must be synchronized
// to ensure that the thread does not mess with it while it is in
use
// here.
synchronized (_syncFireAndThread) {
if (_cur_bid > 0.){
_bidToken = new DoubleToken(_cur_bid);
} else {
_bidToken = null;
}
if (_cur_ask > 0.){
_askToken = new DoubleToken(_cur_ask);
} else {
_askToken = null;
}
_syncFireAndThread.notifyAll();
if (_bidToken != null) bid.broadcast(_bidToken);
if (_askToken != null) ask.broadcast(_askToken);
} // sync
}
/** Initialize this actor, including the creation of an evaluation
* variable for the Ptolemy parser, a DatagramSocket for
* receiving datagrams, and a SocketReadingThread for blocking in
* the DatagramSocket.receive() method call. This method is used
* as a bookend with wrapup() being the other end. Resources
* created/allocated here are released in wrapup().
* @exception IllegalActionException If the
* <i>localSocketNumber</i> parameter has a value outside 0..65535
* or a socket could not be created. */
public void initialize() throws IllegalActionException {
super.initialize();
_cur_bid = -1.0; _cur_ask = -1.0;
int portNumber = ((IntToken)
(portNumberParam.getToken())).intValue();
if ((portNumber < 0) || (portNumber > 65535)) {
throw new IllegalActionException(this, portNumberParam
+ " is outside the required 0..65535 range");
}
String addr =
((StringToken)(serverAddressParam.getToken())).stringValue();
if (_debugging) {
_debug(this + "portNumber = " + portNumber + ", Address = " +
addr);
}
// Allocate a new socket.
try {
if (_debugging) {
_debug("Trying to create a new socket on port "
+ portNumber);
}
_socket = new Socket(addr, portNumber);
if (_debugging) {
_debug("Socket created successfully!");
}
} catch (Exception ex) {
throw new IllegalActionException(this, ex,
"Failed to create a new socket on port " + portNumber);
}
// Allocate & start a thread to read from the socket.
_socketReadingThread = new SocketReadingThread();
_socketReadingThread.start();
}
/** Override the setContainer() method to call wrapup() if the
* actor is deleted while the model is running. Wrapup() then
* releases resources acquired by initialize().
*/
public void setContainer(CompositeEntity container)
throws IllegalActionException, NameDuplicationException {
// FIXME: Is this necessary?
if (container != getContainer()) {
wrapup();
}
super.setContainer(container);
}
/** Release resources acquired in the initialize() method.
*/
public void wrapup() throws IllegalActionException {
if (_socketReadingThread != null) {
_socketReadingThread.interrupt();
_socketReadingThread = null;
} else {
if (_debugging) {
_debug("socketReadingThread null at wrapup!?");
}
}
if (_socket != null) {
try {
_socket.close();
_socket = null;
} catch (IOException ex) {
throw new IllegalActionException(this, ex, "Failed to close
socket");
}
} else {
if (_debugging) {
_debug("Socket null at wrapup!?");
}
}
}
///////////////////////////////////////////////////////////////////
//// private variables ////
// Synchronization objects. Used for synchronization between the
fire() call and the reading thread.
private Object _syncFireAndThread = new Object();
// The socket we are reading from
private Socket _socket;
private SocketReadingThread _socketReadingThread;
private DoubleToken _bidToken;
private DoubleToken _askToken;
private double _cur_bid = -1;
private double _cur_ask = -1;
///////////////////////////////////////////////////////////////////
//// private inner class ////
private class SocketReadingThread extends Thread {
/** Constructor. Create a new thread to listen for packets
* at the socket opened by the actor's [pre]initialize method.
*/
public SocketReadingThread() {
}
/** Run. Run the thread. This begins running when .start()
* is called on the thread.
*/
public void run() {
String instrument;
try {
instrument =
((StringToken)(instrumentParam.getToken())).stringValue();
} catch (Exception ex) {
throw new RuntimeException("failed to get the currency
instrument", ex);
}
InputStream _socketStream;
try {
_socketStream = _socket.getInputStream();
} catch (IOException ex) {
if (_debugging){
_debug("IOException: " + ex);
}
return;
}
// Attempt to receive something from the socket!
boolean receiving = true;
StringBuffer sb = new StringBuffer(64);
try {
while (true) {
// NOTE: The following call may block.
int c = _socketStream.read();
// if (_debugging){
// _debug(Integer.toString(c));
// }
if (c > 0x0A){
sb.append((char)c);
} else if (0 == c){
// A packet was successfully received!
if (_debugging){
_debug(sb.toString());
}
if (processInput(sb.toString(),
instrument)){
try {
// fireAtCurrentTime has
the result that all events are called at model
// time zero with an
incrementing microstep. Whilst this will work for most
// scenarios, it has the
effect that the timed plotter just displays a vertical
// line. For that reason,
we need to figure out the correct time to fire the event at.
//getDirector().fireAtCurrentTime(CurrencyPairListener.this);
//
DEDirector director =
(DEDirector)(getDirector());
long start_time =
director.getRealStartTimeMillis();
long time_now =
java.lang.System.currentTimeMillis();
double model_time =
(time_now - start_time) / 1000.0;
director.fireAt(CurrencyPairListener.this, new
ptolemy.actor.util.Time(director, model_time));
} catch (IllegalActionException
ex) {
throw new
RuntimeException("fireAtCurrentTime() "
+ "threw an
exception", ex);
}
}
sb.setLength(0);
}
}
} catch (IOException ex) {
if (_debugging){
_debug("IOException: " + ex);
}
} catch (NullPointerException ex) {
if (_debugging) {
_debug("--!!--" + (_socket == null));
}
}
}
// process the raw input from the TCP/IP stream
// @return true if the price has changed
//
private boolean processInput(String in_string, String
instrument){
boolean retval = false;
String[] s = in_string.split(":");
if (s.length == 5 && s[0].equals("CY")){
if (s[1].equals(instrument)){
try {
Double new_val = new Double(s[3]);
synchronized (_syncFireAndThread) {
// set the output value
if(s[2].equals("S")){
_cur_ask =
new_val.doubleValue();
} else {
_cur_bid =
new_val.doubleValue();
}
retval = true;
}
} catch (NumberFormatException nfe){
if (_debugging) {
_debug(s[3] + " is not a valid price");
}
}
}
} else {
if (_debugging){
_debug("length = " + s.length + " s[0] = " +
s[0]);
}
}
return retval;
}
}
}
Thanks,
Ian
--------
More information about the Kepler-dev
mailing list