[3570545] | 1 | # This program is public domain |
---|
| 2 | """ |
---|
| 3 | Asynchronous message streams. |
---|
| 4 | |
---|
| 5 | When you have multiple listeners to a process, some of which can connect and |
---|
| 6 | disconnect at different times, you need to dispatch the incoming messages to |
---|
| 7 | all the listeners. When a listener joins a running stream, they need to get |
---|
| 8 | an immediate status update for the computation. This is stored as the |
---|
| 9 | stream header. Then, without dropping any messages, the listeners should |
---|
| 10 | receive all subsequent messages in the stream in order. |
---|
| 11 | |
---|
| 12 | The message stream is multi-channelled, and indexed by a key. Within |
---|
| 13 | the service framework the key is likely to be the jobid associated with |
---|
| 14 | the message stream. |
---|
| 15 | |
---|
| 16 | The contents of the message stream are expected to be monitor messages |
---|
| 17 | (see `park.monitor` for details), with the stream header being a |
---|
| 18 | `park.monitor.Join` message. When a new listener is registered, |
---|
| 19 | the header is immediately put on the queue (if there is one), then all |
---|
| 20 | subsequent message are sent until the listener calls hangup(). |
---|
| 21 | |
---|
| 22 | To attach to a message stream you need an object which accepts put(). |
---|
| 23 | An asynchronous queue object is a good choice since it allows you to |
---|
| 24 | buffer the messages in one thread and pull them off as needed in another. |
---|
| 25 | You can also use a `park.monitor.Monitor` object to process the messages |
---|
| 26 | directly. |
---|
| 27 | """ |
---|
| 28 | from __future__ import with_statement |
---|
| 29 | |
---|
| 30 | __all__ = ['message_stream'] |
---|
| 31 | |
---|
| 32 | import thread |
---|
| 33 | |
---|
| 34 | |
---|
| 35 | # Design note: we cannot buffer the messages in the stream. The only |
---|
| 36 | # logical buffer size in this case would be the entire stream history |
---|
| 37 | # and that may large. Furthermore, leading messages in the stream are |
---|
| 38 | # not of value to the computation. The final result will be available |
---|
| 39 | # for as long as the job is stored on the server, which is a lifetime |
---|
| 40 | # much longer than the message queue. |
---|
| 41 | |
---|
| 42 | class MessageStream: |
---|
| 43 | """ |
---|
| 44 | Message streams. |
---|
| 45 | |
---|
| 46 | For each active job on the system there is a message stream |
---|
| 47 | containing the job header and a list of listening objects. |
---|
| 48 | Listeners should accept a put() message to process the next |
---|
| 49 | item in the stream. |
---|
| 50 | """ |
---|
| 51 | def __init__(self): |
---|
| 52 | self._lock = thread.allocate_lock() |
---|
| 53 | self.listeners = {} |
---|
| 54 | self.headers = {} |
---|
| 55 | |
---|
| 56 | def __getitem__(self, stream): |
---|
| 57 | """Get the header for message stream 'stream'.""" |
---|
| 58 | with self._lock: |
---|
| 59 | return self.headers.get(stream, None) |
---|
| 60 | def __setitem__(self, stream, message): |
---|
| 61 | """Set the header for message stream 'stream'.""" |
---|
| 62 | with self._lock: |
---|
| 63 | self.headers[stream] = message |
---|
| 64 | def __delitem__(self, stream): |
---|
| 65 | """Delete the message stream 'stream'.""" |
---|
| 66 | self.headers.pop(stream,None) |
---|
| 67 | self.listeners.pop(stream,None) |
---|
| 68 | |
---|
| 69 | def put(self, stream, message): |
---|
| 70 | """ |
---|
| 71 | Put a message on stream 'stream', transfering it to all listening queues. |
---|
| 72 | """ |
---|
| 73 | #print "post message on",stream,message |
---|
| 74 | with self._lock: |
---|
| 75 | #print "post",stream,message,"->",self.listeners.get(stream,[]) |
---|
| 76 | for queue in self.listeners.get(stream,[]): |
---|
| 77 | queue.put(message) |
---|
| 78 | |
---|
| 79 | def listen(self, stream, queue): |
---|
| 80 | """ |
---|
| 81 | Listen to message stream 'stream', adding all new messages to queue. |
---|
| 82 | |
---|
| 83 | The stream header will be the first message queued. |
---|
| 84 | """ |
---|
| 85 | #print "listening on",stream |
---|
| 86 | with self._lock: |
---|
| 87 | queues = self.listeners.setdefault(stream,[]) |
---|
| 88 | queues.append(queue) |
---|
| 89 | #print "listen",stream,"->",self.listeners.get(stream,[]) |
---|
| 90 | # Make sure that the Join header is the first item in the queue. |
---|
| 91 | header = self.headers.get(stream, None) |
---|
| 92 | if header is not None: |
---|
| 93 | queue.put(header) |
---|
| 94 | |
---|
| 95 | def hangup(self, stream, queue): |
---|
| 96 | """ |
---|
| 97 | Stop listening to message stream 'stream' with queue. |
---|
| 98 | |
---|
| 99 | If stream is None then remove queue from all message streams. |
---|
| 100 | """ |
---|
| 101 | with self._lock: |
---|
| 102 | if stream is not None: |
---|
| 103 | try: |
---|
| 104 | queuelist = self.listeners[stream] |
---|
| 105 | queuelist.remove(queue) |
---|
| 106 | if queuelist == []: del self.listeners[stream] |
---|
| 107 | except KeyError: |
---|
| 108 | pass |
---|
| 109 | except ValueError: |
---|
| 110 | pass |
---|
| 111 | else: |
---|
| 112 | purge = [] |
---|
| 113 | for stream,queuelist in self.listeners.iteritems(): |
---|
| 114 | try: |
---|
| 115 | queuelist.remove(queue) |
---|
| 116 | if queuelist == []: purge.append(stream) |
---|
| 117 | except ValueError: |
---|
| 118 | pass |
---|
| 119 | for stream in purge: del self.listeners[stream] |
---|
| 120 | |
---|
| 121 | message_stream = MessageStream() |
---|
| 122 | stream = message_stream # so message.stream works |
---|
| 123 | |
---|
| 124 | |
---|
| 125 | def demo(): |
---|
| 126 | import Queue |
---|
| 127 | import time |
---|
| 128 | t0 = time.time() |
---|
| 129 | class NamedQueue(Queue.Queue): |
---|
| 130 | def __init__(self, name): |
---|
| 131 | Queue.Queue.__init__(self) |
---|
| 132 | self.name = name |
---|
| 133 | def __str__(self): return self.name |
---|
| 134 | def __repr__(self): return "Queue('%s')"%self.name |
---|
| 135 | def process_queue(queue): |
---|
| 136 | print ">>>",queue |
---|
| 137 | while True: |
---|
| 138 | value = queue.get() |
---|
| 139 | print "recv",queue,":",value[2],"<-",value[:2],"time",time.time()-t0 |
---|
| 140 | def post(id, stream, deltas): |
---|
| 141 | for k,t in enumerate(deltas): |
---|
| 142 | time.sleep(t) |
---|
| 143 | print " send ",id,":",stream,"<-",(id,k) |
---|
| 144 | message_stream.put(stream,(id,k,stream)) |
---|
| 145 | def listen(stream,queue): |
---|
| 146 | print "+++",queue,":",stream |
---|
| 147 | message_stream.listen(stream,queue) |
---|
| 148 | def hangup(stream,queue): |
---|
| 149 | print "---",queue,":",stream |
---|
| 150 | message_stream.hangup(stream,queue) |
---|
| 151 | |
---|
| 152 | q1 = NamedQueue('Q1') |
---|
| 153 | q2 = NamedQueue('Q2') |
---|
| 154 | q3 = NamedQueue('Q3') |
---|
| 155 | for q in [q1,q2,q3]: thread.start_new_thread(process_queue,(q,)) |
---|
| 156 | message_queue['M2'] = ('H2',0,'M2') |
---|
| 157 | thread.start_new_thread(post,('S1','M1',[1,2,1,2,3,2])) |
---|
| 158 | thread.start_new_thread(post,('S2','M1',[1,2,1,2,3,2])) |
---|
| 159 | thread.start_new_thread(post,('S3','M2',[1,2,1,2,3,2])) |
---|
| 160 | thread.start_new_thread(post,('S4','M2',[1,2,1,2,3,2])) |
---|
| 161 | listen('M1',q1) |
---|
| 162 | listen('M1',q3) |
---|
| 163 | listen('M2',q3) |
---|
| 164 | time.sleep(5) |
---|
| 165 | hangup(None,q3) |
---|
| 166 | hangup('M1',q1) |
---|
| 167 | listen('M2',q2) |
---|
| 168 | time.sleep(5) |
---|
| 169 | time.sleep(15) |
---|
| 170 | |
---|
| 171 | if __name__ == "__main__": demo() |
---|