001    package org.cocome.tradingsystem.testdriver;
002    
003    import java.io.Serializable;
004    import java.util.concurrent.TimeoutException;
005    
006    import javax.jms.JMSException;
007    import javax.jms.Message;
008    import javax.jms.MessageListener;
009    import javax.jms.ObjectMessage;
010    import javax.jms.TopicSubscriber;
011    
012    import org.cocome.tradingsystem.systests.interfaces.IUpdateReceiver;
013    
014    /**
015     * This is the base for all classes implementing the IUpdateReceiver. The idea
016     * is to listen on a specified channel and keep only messages of certain types.
017     * 
018     * @author Benjamin Hummel
019     * @author $Author: hummel $
020     * @version $Rev: 63 $
021     * @levd.rating GREEN Rev: 63
022     */
023    public class UpdateReceiver implements IUpdateReceiver, MessageListener {
024    
025            /** The last message (of those we are interested in) received. */
026            private Serializable lastRelevantMessage = null;
027    
028            /**
029             * A flag indicating if this is a new message or if this has been read
030             * before.
031             */
032            private boolean isNewMessage = false;
033    
034            /** The array of message types we are interested in. */
035            private final Class<?>[] relevantMessages;
036    
037            /**
038             * Creates a new update receiver.
039             * 
040             * @param subscriber
041             *            the subscriber to get the messages from.
042             * @param relevantMessageClasses
043             *            the classes of messages we are interested in.
044             */
045            public UpdateReceiver(TopicSubscriber subscriber,
046                            Class<?>... relevantMessageClasses) throws JMSException {
047                    subscriber.setMessageListener(this);
048                    this.relevantMessages = relevantMessageClasses;
049            }
050    
051            /** {@inheritDoc} */
052            public synchronized void waitForUpdate(int maxMilliseconds)
053                            throws TimeoutException {
054    
055                    try {
056                            // multiplying by 10 makes the tests slightly slower, however I have
057                            // a chance of getting them work on my system (even with so many
058                            // processes running).
059                            this.wait(10 * maxMilliseconds);
060                    } catch (InterruptedException e) {
061                            // should not happen
062                    }
063    
064                    if (!isNewMessage) {
065                            throw new TimeoutException();
066                    }
067            }
068    
069            /**
070             * Signals that the next expected message should be of the given type. This
071             * checks if such a message is available, and waits again for a short time
072             * to get the chance of receiving the missing message. This is mostly used
073             * to circumvent situations where two messages are sent at the (nearly) same
074             * time.
075             * 
076             * @param messageType
077             *            the type of the message expected.
078             */
079            protected void expectMessage(Class<? extends Serializable> messageType) {
080                    if (lastRelevantMessage == null) {
081                            return;
082                    }
083    
084                    // This could lead to an infinite loop, if there are too many messages.
085                    // However in the cash desk (where this is used) the
086                    // number of messages is small.
087                    while (!messageType.isAssignableFrom(lastRelevantMessage.getClass())) {
088                            // throw away old message
089                            isNewMessage = false;
090                            try {
091                                    waitForUpdate(100);
092                            } catch (TimeoutException e) {
093                                    // there are no more messages, so stop now
094                                    return;
095                            }
096                    }
097            }
098    
099            /** Returns the last relevant message received. */
100            protected synchronized Serializable getLastRelevantMessage() {
101                    isNewMessage = false;
102                    return lastRelevantMessage;
103            }
104    
105            /** {@inheritDoc} */
106            public synchronized void onMessage(Message message) {
107    
108                    if (!(message instanceof ObjectMessage)) {
109                            return;
110                    }
111    
112                    ObjectMessage objMessage = (ObjectMessage) message;
113                    Serializable eventObject;
114                    try {
115                            eventObject = objMessage.getObject();
116                    } catch (JMSException e) {
117                            // nothing much we can do here
118                            return;
119                    }
120    
121                    for (Class<?> relevantMessage : relevantMessages) {
122                            if (relevantMessage.isAssignableFrom(eventObject.getClass())) {
123                                    isNewMessage = true;
124                                    lastRelevantMessage = eventObject;
125                                    this.notify();
126                                    return;
127                            }
128                    }
129            }
130    }