source: sasview/park-1.2.1/park/message.py @ d8c4019

ESS_GUIESS_GUI_DocsESS_GUI_batch_fittingESS_GUI_bumps_abstractionESS_GUI_iss1116ESS_GUI_iss879ESS_GUI_iss959ESS_GUI_openclESS_GUI_orderingESS_GUI_sync_sascalccostrafo411magnetic_scattrelease-4.1.1release-4.1.2release-4.2.2release_4.0.1ticket-1009ticket-1094-headlessticket-1242-2d-resolutionticket-1243ticket-1249ticket885unittest-saveload
Last change on this file since d8c4019 was 3570545, checked in by Mathieu Doucet <doucetm@…>, 13 years ago

Adding park Part 2

  • Property mode set to 100644
File size: 6.1 KB
Line 
1# This program is public domain
2"""
3Asynchronous message streams.
4
5When you have multiple listeners to a process, some of which can connect and
6disconnect at different times, you need to dispatch the incoming messages to
7all the listeners.  When a listener joins a running stream, they need to get
8an immediate status update for the computation.  This is stored as the
9stream header. Then, without dropping any messages, the listeners should
10receive all subsequent messages in the stream in order.
11
12The message stream is multi-channelled, and indexed by a key.  Within
13the service framework the key is likely to be the jobid associated with
14the message stream.
15
16The contents of the message stream are expected to be monitor messages
17(see `park.monitor` for details), with the stream header being a
18`park.monitor.Join` message.  When a new listener is registered,
19the header is immediately put on the queue (if there is one), then all
20subsequent message are sent until the listener calls hangup().
21
22To attach to a message stream you need an object which accepts put().
23An asynchronous queue object is a good choice since it allows you to
24buffer the messages in one thread and pull them off as needed in another.
25You can also use a `park.monitor.Monitor` object to process the messages
26directly.
27"""
28from __future__ import with_statement
29
30__all__ = ['message_stream']
31
32import thread
33
34
35# Design note: we cannot buffer the messages in the stream.  The only
36# logical buffer size in this case would be the entire stream history
37# and that may large.  Furthermore, leading messages in the stream are
38# not of value to the computation.  The final result will be available
39# for as long as the job is stored on the server, which is a lifetime
40# much longer than the message queue.
41
42class MessageStream:
43    """
44    Message streams.
45
46    For each active job on the system there is a message stream
47    containing the job header and a list of listening objects.
48    Listeners should accept a put() message to process the next
49    item in the stream.
50    """
51    def __init__(self):
52        self._lock = thread.allocate_lock()
53        self.listeners = {}
54        self.headers = {}
55
56    def __getitem__(self, stream):
57        """Get the header for message stream 'stream'."""
58        with self._lock:
59            return self.headers.get(stream, None)
60    def __setitem__(self, stream, message):
61        """Set the header for message stream 'stream'."""
62        with self._lock:
63            self.headers[stream] = message
64    def __delitem__(self, stream):
65        """Delete the message stream 'stream'."""
66        self.headers.pop(stream,None)
67        self.listeners.pop(stream,None)
68
69    def put(self, stream, message):
70        """
71        Put a message on stream 'stream', transfering it to all listening queues.
72        """
73        #print "post message on",stream,message
74        with self._lock:
75            #print "post",stream,message,"->",self.listeners.get(stream,[])
76            for queue in self.listeners.get(stream,[]):
77                queue.put(message)
78
79    def listen(self, stream, queue):
80        """
81        Listen to message stream 'stream', adding all new messages to queue.
82
83        The stream header will be the first message queued.
84        """
85        #print "listening on",stream
86        with self._lock:
87            queues = self.listeners.setdefault(stream,[])
88            queues.append(queue)
89            #print "listen",stream,"->",self.listeners.get(stream,[])
90            # Make sure that the Join header is the first item in the queue.
91            header = self.headers.get(stream, None)
92            if header is not None:
93                queue.put(header)
94
95    def hangup(self, stream, queue):
96        """
97        Stop listening to message stream 'stream' with queue.
98
99        If stream is None then remove queue from all message streams.
100        """
101        with self._lock:
102            if stream is not None:
103                try:
104                    queuelist = self.listeners[stream]
105                    queuelist.remove(queue)
106                    if queuelist == []: del self.listeners[stream]
107                except KeyError:
108                    pass
109                except ValueError:
110                    pass
111            else:
112                purge = []
113                for stream,queuelist in self.listeners.iteritems():
114                    try:
115                        queuelist.remove(queue)
116                        if queuelist == []: purge.append(stream)
117                    except ValueError:
118                        pass
119                for stream in purge: del self.listeners[stream]
120
121message_stream = MessageStream()
122stream = message_stream # so message.stream works
123
124
125def demo():
126    import Queue
127    import time
128    t0 = time.time()
129    class NamedQueue(Queue.Queue):
130        def __init__(self, name):
131            Queue.Queue.__init__(self)
132            self.name = name
133        def __str__(self): return self.name
134        def __repr__(self): return "Queue('%s')"%self.name
135    def process_queue(queue):
136        print ">>>",queue
137        while True:
138            value = queue.get()
139            print "recv",queue,":",value[2],"<-",value[:2],"time",time.time()-t0
140    def post(id, stream, deltas):
141        for k,t in enumerate(deltas):
142            time.sleep(t)
143            print "       send ",id,":",stream,"<-",(id,k)
144            message_stream.put(stream,(id,k,stream))
145    def listen(stream,queue):
146        print "+++",queue,":",stream
147        message_stream.listen(stream,queue)
148    def hangup(stream,queue):
149        print "---",queue,":",stream
150        message_stream.hangup(stream,queue)
151
152    q1 = NamedQueue('Q1')
153    q2 = NamedQueue('Q2')
154    q3 = NamedQueue('Q3')
155    for q in [q1,q2,q3]: thread.start_new_thread(process_queue,(q,))
156    message_queue['M2'] = ('H2',0,'M2')
157    thread.start_new_thread(post,('S1','M1',[1,2,1,2,3,2]))
158    thread.start_new_thread(post,('S2','M1',[1,2,1,2,3,2]))
159    thread.start_new_thread(post,('S3','M2',[1,2,1,2,3,2]))
160    thread.start_new_thread(post,('S4','M2',[1,2,1,2,3,2]))
161    listen('M1',q1)
162    listen('M1',q3)
163    listen('M2',q3)
164    time.sleep(5)
165    hangup(None,q3)
166    hangup('M1',q1)
167    listen('M2',q2)
168    time.sleep(5)
169    time.sleep(15)
170
171if __name__ == "__main__": demo()
Note: See TracBrowser for help on using the repository browser.