1 | # This program is public domain |
---|
2 | """ |
---|
3 | Asynchronous message streams. |
---|
4 | |
---|
5 | When you have multiple listeners to a process, some of which can connect and |
---|
6 | disconnect at different times, you need to dispatch the incoming messages to |
---|
7 | all the listeners. When a listener joins a running stream, they need to get |
---|
8 | an immediate status update for the computation. This is stored as the |
---|
9 | stream header. Then, without dropping any messages, the listeners should |
---|
10 | receive all subsequent messages in the stream in order. |
---|
11 | |
---|
12 | The message stream is multi-channelled, and indexed by a key. Within |
---|
13 | the service framework the key is likely to be the jobid associated with |
---|
14 | the message stream. |
---|
15 | |
---|
16 | The contents of the message stream are expected to be monitor messages |
---|
17 | (see `park.monitor` for details), with the stream header being a |
---|
18 | `park.monitor.Join` message. When a new listener is registered, |
---|
19 | the header is immediately put on the queue (if there is one), then all |
---|
20 | subsequent message are sent until the listener calls hangup(). |
---|
21 | |
---|
22 | To attach to a message stream you need an object which accepts put(). |
---|
23 | An asynchronous queue object is a good choice since it allows you to |
---|
24 | buffer the messages in one thread and pull them off as needed in another. |
---|
25 | You can also use a `park.monitor.Monitor` object to process the messages |
---|
26 | directly. |
---|
27 | """ |
---|
28 | from __future__ import with_statement |
---|
29 | |
---|
30 | __all__ = ['message_stream'] |
---|
31 | |
---|
32 | import thread |
---|
33 | |
---|
34 | |
---|
35 | # Design note: we cannot buffer the messages in the stream. The only |
---|
36 | # logical buffer size in this case would be the entire stream history |
---|
37 | # and that may large. Furthermore, leading messages in the stream are |
---|
38 | # not of value to the computation. The final result will be available |
---|
39 | # for as long as the job is stored on the server, which is a lifetime |
---|
40 | # much longer than the message queue. |
---|
41 | |
---|
42 | class MessageStream: |
---|
43 | """ |
---|
44 | Message streams. |
---|
45 | |
---|
46 | For each active job on the system there is a message stream |
---|
47 | containing the job header and a list of listening objects. |
---|
48 | Listeners should accept a put() message to process the next |
---|
49 | item in the stream. |
---|
50 | """ |
---|
51 | def __init__(self): |
---|
52 | self._lock = thread.allocate_lock() |
---|
53 | self.listeners = {} |
---|
54 | self.headers = {} |
---|
55 | |
---|
56 | def __getitem__(self, stream): |
---|
57 | """Get the header for message stream 'stream'.""" |
---|
58 | with self._lock: |
---|
59 | return self.headers.get(stream, None) |
---|
60 | def __setitem__(self, stream, message): |
---|
61 | """Set the header for message stream 'stream'.""" |
---|
62 | with self._lock: |
---|
63 | self.headers[stream] = message |
---|
64 | def __delitem__(self, stream): |
---|
65 | """Delete the message stream 'stream'.""" |
---|
66 | self.headers.pop(stream,None) |
---|
67 | self.listeners.pop(stream,None) |
---|
68 | |
---|
69 | def put(self, stream, message): |
---|
70 | """ |
---|
71 | Put a message on stream 'stream', transfering it to all listening queues. |
---|
72 | """ |
---|
73 | #print "post message on",stream,message |
---|
74 | with self._lock: |
---|
75 | #print "post",stream,message,"->",self.listeners.get(stream,[]) |
---|
76 | for queue in self.listeners.get(stream,[]): |
---|
77 | queue.put(message) |
---|
78 | |
---|
79 | def listen(self, stream, queue): |
---|
80 | """ |
---|
81 | Listen to message stream 'stream', adding all new messages to queue. |
---|
82 | |
---|
83 | The stream header will be the first message queued. |
---|
84 | """ |
---|
85 | #print "listening on",stream |
---|
86 | with self._lock: |
---|
87 | queues = self.listeners.setdefault(stream,[]) |
---|
88 | queues.append(queue) |
---|
89 | #print "listen",stream,"->",self.listeners.get(stream,[]) |
---|
90 | # Make sure that the Join header is the first item in the queue. |
---|
91 | header = self.headers.get(stream, None) |
---|
92 | if header is not None: |
---|
93 | queue.put(header) |
---|
94 | |
---|
95 | def hangup(self, stream, queue): |
---|
96 | """ |
---|
97 | Stop listening to message stream 'stream' with queue. |
---|
98 | |
---|
99 | If stream is None then remove queue from all message streams. |
---|
100 | """ |
---|
101 | with self._lock: |
---|
102 | if stream is not None: |
---|
103 | try: |
---|
104 | queuelist = self.listeners[stream] |
---|
105 | queuelist.remove(queue) |
---|
106 | if queuelist == []: del self.listeners[stream] |
---|
107 | except KeyError: |
---|
108 | pass |
---|
109 | except ValueError: |
---|
110 | pass |
---|
111 | else: |
---|
112 | purge = [] |
---|
113 | for stream,queuelist in self.listeners.iteritems(): |
---|
114 | try: |
---|
115 | queuelist.remove(queue) |
---|
116 | if queuelist == []: purge.append(stream) |
---|
117 | except ValueError: |
---|
118 | pass |
---|
119 | for stream in purge: del self.listeners[stream] |
---|
120 | |
---|
121 | message_stream = MessageStream() |
---|
122 | stream = message_stream # so message.stream works |
---|
123 | |
---|
124 | |
---|
125 | def demo(): |
---|
126 | import Queue |
---|
127 | import time |
---|
128 | t0 = time.time() |
---|
129 | class NamedQueue(Queue.Queue): |
---|
130 | def __init__(self, name): |
---|
131 | Queue.Queue.__init__(self) |
---|
132 | self.name = name |
---|
133 | def __str__(self): return self.name |
---|
134 | def __repr__(self): return "Queue('%s')"%self.name |
---|
135 | def process_queue(queue): |
---|
136 | print ">>>",queue |
---|
137 | while True: |
---|
138 | value = queue.get() |
---|
139 | print "recv",queue,":",value[2],"<-",value[:2],"time",time.time()-t0 |
---|
140 | def post(id, stream, deltas): |
---|
141 | for k,t in enumerate(deltas): |
---|
142 | time.sleep(t) |
---|
143 | print " send ",id,":",stream,"<-",(id,k) |
---|
144 | message_stream.put(stream,(id,k,stream)) |
---|
145 | def listen(stream,queue): |
---|
146 | print "+++",queue,":",stream |
---|
147 | message_stream.listen(stream,queue) |
---|
148 | def hangup(stream,queue): |
---|
149 | print "---",queue,":",stream |
---|
150 | message_stream.hangup(stream,queue) |
---|
151 | |
---|
152 | q1 = NamedQueue('Q1') |
---|
153 | q2 = NamedQueue('Q2') |
---|
154 | q3 = NamedQueue('Q3') |
---|
155 | for q in [q1,q2,q3]: thread.start_new_thread(process_queue,(q,)) |
---|
156 | message_queue['M2'] = ('H2',0,'M2') |
---|
157 | thread.start_new_thread(post,('S1','M1',[1,2,1,2,3,2])) |
---|
158 | thread.start_new_thread(post,('S2','M1',[1,2,1,2,3,2])) |
---|
159 | thread.start_new_thread(post,('S3','M2',[1,2,1,2,3,2])) |
---|
160 | thread.start_new_thread(post,('S4','M2',[1,2,1,2,3,2])) |
---|
161 | listen('M1',q1) |
---|
162 | listen('M1',q3) |
---|
163 | listen('M2',q3) |
---|
164 | time.sleep(5) |
---|
165 | hangup(None,q3) |
---|
166 | hangup('M1',q1) |
---|
167 | listen('M2',q2) |
---|
168 | time.sleep(5) |
---|
169 | time.sleep(15) |
---|
170 | |
---|
171 | if __name__ == "__main__": demo() |
---|