Home AMX User Forum Duet/Cafe Duet

Multithreaded device interaction

I've been playing round with some ideas for ways to better handle device communication from within Duet. The default pattern that the SDK steers you towards (at least in my experience) is handling the communication to, and data coming back from the device discreetly. This works if you are dealing with a protocol that responds with a uniquely identifiable state update whenever things change, however, in devices that only provide an ACK or value response to a request things get messy rather quickly.

In a lot of instances it seems it would be much nicer to to be able to send some data to the device and be returned with the response, the ideal API being something along the lines of:
/**
 * Sends command data to the device and provides the device response. This
 * method will block until the device responds or the buffer response
 * timeout is reached.
 * 
 * @param data
 *            a string containing the command to send
 * @return a byte array containing the device reponse, null if no response
 */
byte[] send(byte[] data)

If you're dealing with IP based comms this is nice and easy as the socket communication is handled in its own thread so the above method can be implemented as such:
  1. open up the InputStream / clear an already open one
  2. write the data to and flush your OutputStream
  3. call read / readln on your InputStream which will block until data is available or your socket timeout is reached
  4. return the response

Serial devices are a different story though. I've written some classes that abstract my serial device communication into an extended StreamConnection interface and provide some buffering and command spacing, however, as the only way to get incoming data is via a handleDataEvent(Event) method of an IDataListener class the implementation above breaks in normal use.

This is because to implement blocking read methods in my NetLinx device InputStream there is a need to wait if no data is available then be notified from within handleDataEvent(Event) and return. The problem is handleDataEvent(Event) is called from EventRouter.processEvent(Event) in the SWROUTER thread, which happens to be the same thread that calls any of the control / device interaction methods you override within a Module.

The solution I've been able to find at this point is to handle each device interaction in its own thread using a little helper class similar to the following:
abstract class DeviceInteraction implements Runnable {

    private DeviceInteraction() {
        new Thread(this).start();
    }

    public final void run() {
        action();
    }

    abstract void action();

}

which is used like so:
new DeviceInteraction() {

    void action() {
        String response = send("foo");
        if (response.equals("bar")) {
            // do stuff
        }
    }

};

This is functional, but its far from simple or elegant. How have others approached this problem? Any insight would be greatly appreciated and will be rewarded with immense amounts of cash and a free holiday*.



[size=-2]* no cash, or free holiday will actually be rewarded.[/size]

Comments

  • mighemighe Posts: 39
    I start one thread for every IP connection, so I can usa them as an Asynchronous connection, just like the serials.

    My send function is buffered with a message queue, so
    public void send(byte[] data, int delayAfterAck) {
      queue.add(data, delayAfterAck);
    }
    

    Another thread sends those messages
    public void run() {
      while(true) {
        send(message);
        waitForAckOrTimeout();
        dequeue(message);
      }
    }
    

    and the callback for received data is:
    public void signalString(String receivedData) {
      signalToListeners(lastMessage + receivedData);
    }
    

    I semplified very much the code, ignoring all synchronization issue.
    This strategy works very well when every sent message will be ack-ed or we get a response.
  • PhreaKPhreaK Posts: 966
    Nice. That's not a bad way of approaching it - returning the original message with the response. Though its probably worth returning an object that contains the rx and tx as separate entities rather than concatenating them into String.

    I've had a bit more time to experiment with ideas and utilizing a thread for each interaction as originally proposed is perhaps the most inefficient, expensive way to do it. As it's only possible to perform a single device interaction at any time it instead makes far more sense to utilize a single 'device interaction' thread for each device. This can then draw from a set of queued interactions or enter a wait state if there's none left to perform.

    I've implemented this in my DeviceInteraction class. When a new interaction is defined it will queue it with the DeviceBuffer which, in turn, handles the executing of the queued interaction tasks (well, it will eventually, at the time of writing it still kicks off a single thread for each but I'll implement this tomorrow). Additionally the the DeviceBuffer provides the ability to add RXListeners which are alerted of any incoming data and as such can be used for processing asynchronous communication.
  • mighemighe Posts: 39
    PhreaK wrote: »
    Nice. That's not a bad way of approaching it - returning the original message with the response. Though its probably worth returning an object that contains the rx and tx as separate entities rather than concatenating them into String.
    In the production code you can pass an object to the connection that defines what to do with the feedback and the original message. I often use a simple concatenation like:
    lastMessage + "XXXX" + received
    using XXXX as separator.
    PhreaK wrote: »
    I've had a bit more time to experiment with ideas and utilizing a thread for each interaction as originally proposed is perhaps the most inefficient, expensive way to do it. As it's only possible to perform a single device interaction at any time it instead makes far more sense to utilize a single 'device interaction' thread for each device. This can then draw from a set of queued interactions or enter a wait state if there's none left to perform.

    I've implemented this in my DeviceInteraction class. When a new interaction is defined it will queue it with the DeviceBuffer which, in turn, handles the executing of the queued interaction tasks (well, it will eventually, at the time of writing it still kicks off a single thread for each but I'll implement this tomorrow). Additionally the the DeviceBuffer provides the ability to add RXListeners which are alerted of any incoming data and as such can be used for processing asynchronous communication.
        public DeviceInteraction(DeviceBuffer deviceBuffer) {
            buffer = deviceBuffer;
            deviceBuffer.enqueue(this);
        }
    

    This constructor is very "dangerous" and can lead to unexpected errors, beacuse it enqueues an object that cannot be fully initialized under some JVM if deviceBuffer start before the constructor end.
    A better approach is using a factory method, for example:
        public static DeviceInteraction newInstance(DeviceBuffer deviceBuffer) {
            DeviceInteraction instance = new DeviceInteraction(deviceBuffer);
            deviceBuffer.enqueue(instance);
            return instance;
        }
    
        private DeviceInteraction(DeviceBuffer deviceBuffer) {
            buffer = deviceBuffer;
        }
    

    Multithreading under Java, especially for versions <= 1.4 is very very nasty :-(
  • PhreaKPhreaK Posts: 966
    mighe wrote: »
    This constructor is very "dangerous" and can lead to unexpected errors, beacuse it enqueues an object that cannot be fully initialized under some JVM if deviceBuffer start before the constructor end.
    Cheers for catching that. It's not possible to utilize a factory method with the pattern I'm trying to use as each DeviceInteraction must implement what the interaction involves. Instead I've added an execute() method which will enqueue the interaction.

    mighe wrote: »
    Multithreading under Java, especially for versions <= 1.4 is very very nasty :-(
    Tell me about it. Oh what I'd give for java.util.concurrent.
Sign In or Register to comment.