source: sasview/src/sas/sascalc/data_util/calcthread.py @ 25dd9c9

Last change on this file since 25dd9c9 was 7432acb, checked in by andyfaff, 8 years ago

MAINT: search+replace '!= None' by 'is not None'

  • Property mode set to 100644
File size: 12.3 KB
Line 
1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6
7import thread
8import traceback
9import sys
10import logging
11
# Select the clock()/sleep() pair used for all scheduling in this module.
if sys.platform.count("darwin") > 0:
    import time
    # On darwin platforms, time is measured as wall-clock seconds elapsed
    # since this module was imported.
    stime = time.time()

    def clock():
        """Return the wall-clock seconds elapsed since module import."""
        return time.time() - stime

    def sleep(t):
        """Suspend the calling thread for *t* seconds."""
        return time.sleep(t)
else:
    try:
        from time import clock
    except ImportError:
        # time.clock no longer exists on recent interpreters (removed in
        # Python 3.8); perf_counter is the interval timer to use instead.
        from time import perf_counter as clock
    from time import sleep

logger = logging.getLogger(__name__)
26
27
class CalcThread:
    """Threaded calculation class.  Inherit from here and specialize
    the compute() method to perform the appropriate operations for the
    class.

    If you specialize the __init__ method be sure to call
    CalcThread.__init__, passing it the keyword arguments for
    yieldtime, worktime, updatefn and completefn.

    When defining the compute() method you need to include code which
    allows the GUI to run.  They are as follows: ::

        self.isquit()          # call frequently to check for interrupts
        self.update(kw=...)    # call when the GUI could be updated
        self.complete(kw=...)  # call before exiting compute()

    The update() and complete() calls accept field=value keyword
    arguments which are passed to the called function.  complete()
    should be called before compute() returns.  A KeyboardInterrupt
    is raised inside compute() (by isquit() or update()) if the GUI
    signals that the computation should be halted.

    The following documentation should be included in the description
    of the derived class.

    The user of this class will call the following: ::

        thread = Work(...,kw=...)  # prepare the work thread.
        thread.queue(...,kw=...)   # queue a work unit
        thread.requeue(...,kw=...) # replace work unit on the end of queue
        thread.reset(...,kw=...)   # reset the queue to the given work unit
        thread.stop()              # clear the queue and halt
        thread.interrupt()         # halt the current work unit but continue
        thread.ready(delay=0.)     # request an update signal after delay
        thread.isrunning()         # returns true if compute() is running

    Use queue() when all work must be done.  Use requeue() when intermediate
    work items don't need to be done (e.g., in response to a mouse move
    event).  Use reset() when the current item doesn't need to be completed
    before the new event (e.g., in response to a mouse release event).  Use
    stop() to halt the current and pending computations (e.g., in response to
    a stop button).

    The methods queue(), requeue() and reset() are proxies for the compute()
    method in the subclass.  Look there for a description of the arguments.
    The compute() method can be called directly to run the computation in
    the main thread, but it should not be called if isrunning() returns true.

    The constructor accepts additional keywords yieldtime=0.01 and
    worktime=0.01 which determine the cooperative multitasking
    behaviour.  Yield time is the duration of the sleep period
    required to give other processes a chance to run.  Work time
    is the duration between sleep periods.

    Notifying the GUI thread of work in progress and work complete
    is done with updatefn=updatefn and completefn=completefn arguments
    to the constructor.  Details of the parameters to the functions
    depend on the particular calculation class, but they will all
    be passed as keyword arguments.  Details of how the functions
    should be implemented vary from framework to framework.

    For wx, something like the following is needed::

        import wx, wx.lib.newevent
        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()

        # methods in the main window class of your application
        def __init__():
            ...
            # Prepare the calculation in the GUI thread.
            self.work = Work(completefn=self.CalcComplete)
            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
            ...
            # Bind work queue to a menu event.
            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
            ...

        def OnCalcStart(self,event):
            # Start the work thread from the GUI thread.
            self.work.queue(...work unit parameters...)

        def CalcComplete(self,**kwargs):
            # Generate CalcComplete event in the calculation thread.
            # kwargs contains field1, field2, etc. as defined by
            # the Work thread class.
            event = CalcCompleteEvent(**kwargs)
            wx.PostEvent(self, event)

        def OnCalcComplete(self,event):
            # Process CalcComplete event in GUI thread.
            # Use values from event.field1, event.field2 etc. as
            # defined by the Work thread class to show the results.
            ...
    """

    def __init__(self, completefn=None, updatefn=None,
                 yieldtime=0.01, worktime=0.01,
                 exception_handler=None):
        """Prepare the calculator.

        *completefn* and *updatefn* are optional callables invoked with
        keyword arguments by complete() and update().  *yieldtime* is the
        length of each sleep and *worktime* the interval between sleeps.
        *exception_handler*, if given, is called with the three values of
        sys.exc_info() when compute() raises.
        """
        self.yieldtime = yieldtime
        self.worktime = worktime
        self.completefn = completefn
        self.updatefn = updatefn
        self.exception_handler = exception_handler
        self._interrupting = False
        self._running = False
        self._queue = []
        self._lock = thread.allocate_lock()
        self._delay = 1e6
        # Define the scheduling timestamps up front so that update() and
        # isquit() also work when compute() is called directly, without
        # queue() or _run() having initialised them.  As in queue(), no
        # update is due until ready() is called.
        self._time_for_update = clock() + 1e6
        self._time_for_nap = clock() + worktime

    def queue(self, *args, **kwargs):
        """Add a work unit to the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue.append((args, kwargs))
            # Decide whether a worker is needed while holding the lock, and
            # claim the running flag immediately, so that two quick calls
            # cannot both start a worker and an exiting worker cannot miss
            # the item that was just appended.
            start_worker = not self._running
            if start_worker:
                self._running = True
                self._time_for_update = clock() + 1e6
        # Cannot do start_new_thread call within the lock
        if start_worker:
            try:
                thread.start_new_thread(self._run, ())
            except Exception:
                # No worker was started, so give the flag back before
                # reporting the failure to the caller.
                with self._lock:
                    self._running = False
                raise

    def requeue(self, *args, **kwargs):
        """Replace the work unit on the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            # Slice deletion is a no-op on an empty queue.
            del self._queue[-1:]
        self.queue(*args, **kwargs)

    def reset(self, *args, **kwargs):
        """Clear the queue and start a new work unit.  See the compute()
        method for details of the arguments to the work unit."""
        self.stop()
        self.queue(*args, **kwargs)

    def stop(self):
        """Clear the queue and stop the thread.  New items may be
        queued after stop.  To stop just the current work item, and
        continue the rest of the queue call the interrupt method"""
        with self._lock:
            self._interrupting = True
            self._queue = []

    def interrupt(self):
        """Stop the current work item.  To clear the work queue as
        well call the stop() method."""
        with self._lock:
            self._interrupting = True

    def isrunning(self):
        """Return True while a worker thread is started or running."""
        return self._running

    def ready(self, delay=0.):
        """Ready for another update after delay=t seconds.  Call
        this for threads which can show intermediate results from
        long calculations."""
        with self._lock:
            self._delay = delay
            self._time_for_update = clock() + delay

    def isquit(self):
        """Check for interrupts.  Should be called frequently to
        provide user responsiveness.  Also yields to other running
        threads, which is required for good performance on OS X.

        Raises KeyboardInterrupt if stop() or interrupt() was called.
        """
        # Only called from within the running thread so no need to lock
        if self._running and self.yieldtime > 0 \
                and clock() > self._time_for_nap:
            sleep(self.yieldtime)
            self._time_for_nap = clock() + self.worktime
        if self._interrupting:
            raise KeyboardInterrupt

    def update(self, **kwargs):
        """Update GUI with the latest results from the current work unit.

        The update function is only called when one has been requested via
        ready() and the requested delay has elapsed; otherwise this just
        behaves like isquit().  Raises KeyboardInterrupt if the work unit
        has been interrupted.
        """
        if self.updatefn is not None and clock() > self._time_for_update:
            with self._lock:
                # Push the next update far into the future: no more updates
                # are sent until ready() is called again.
                self._time_for_update = clock() + self._delay + 1e6

            self.updatefn(**kwargs)
            sleep(self.yieldtime)
            if self._interrupting:
                raise KeyboardInterrupt
        else:
            self.isquit()
        return

    def complete(self, **kwargs):
        """Update the GUI with the completed results from a work unit."""
        if self.completefn is not None:
            self.completefn(**kwargs)
            sleep(self.yieldtime)
        return

    def compute(self, *args, **kwargs):
        """Perform a work unit.  The subclass will provide details of
        the arguments.

        Raises NotImplementedError unless overridden by the subclass.
        """
        raise NotImplementedError("Calculation thread needs compute method")

    def exception(self):
        """
        An exception occurred during computation, so call the exception handler
        if there is one.  If not, then log the exception and continue.
        """
        # If we have an exception handler, let it try to handle the exception.
        # If it fails fall through to log the failure to handle the exception
        # (the original exception will be lost).  If there is no exception
        # handler, just log the exception in compute that we are responding to.
        if self.exception_handler:
            try:
                self.exception_handler(*sys.exc_info())
                return
            except Exception:
                pass
        logger.error(traceback.format_exc())

    def _run(self):
        """Internal function to manage the thread."""
        # The code for condition wait in the threading package is
        # implemented using polling.  I'll accept for now that the
        # authors of this code are clever enough that polling is
        # difficult to avoid.  Rather than polling, I will exit the
        # thread when the queue is empty and start a new thread when
        # there is more work to be done.
        while True:
            with self._lock:
                self._time_for_nap = clock() + self.worktime
                self._running = True
                if not self._queue:
                    # Clear the flag while still holding the lock so that
                    # queue() sees a consistent state, and let the lock be
                    # released on the way out so later calls do not block.
                    self._running = False
                    return
                self._interrupting = False
                args, kwargs = self._queue.pop(0)
            try:
                self.compute(*args, **kwargs)
            except KeyboardInterrupt:
                # Raised by isquit()/update() after stop() or interrupt();
                # abandon this work unit and move on to the next one.
                pass
            except:
                # Deliberately broad: nothing may escape the worker thread,
                # so every other failure is routed through exception().
                self.exception()
276
277
278# ======================================================================
279# Demonstration of calcthread in action
class CalcDemo(CalcThread):
    """Sample calculation thread used to exercise CalcThread."""
    def compute(self, n):
        """Accumulate 0 + 1 + ... + (n-1), n times over, and report it.

        update() is offered the outer index once per outer pass, isquit()
        is polled on every inner step, and the grand total is handed to
        complete() at the end.
        """
        running_sum = 0.
        for outer in range(n):
            # Give the GUI a chance to show progress for this pass.
            self.update(i=outer)
            for inner in range(n):
                # Poll for interrupts before each unit of work.
                self.isquit()
                running_sum += inner
        self.complete(total=running_sum)
290
291
class CalcCommandline:
    """
        Command-line driver that runs three CalcDemo workers and prints
        their update and completion notifications.
    """
    def __init__(self, n=20000):
        """Start the demo workers on *n* and report until one completes."""
        print(thread.get_ident())
        self.starttime = clock()
        self.done = False

        # All three workers report back to this object's callbacks; only
        # the first one uses a shorter yield time.
        callbacks = dict(completefn=self.complete, updatefn=self.update)
        self.work = CalcDemo(yieldtime=0.001, **callbacks)
        self.work2 = CalcDemo(**callbacks)
        self.work3 = CalcDemo(**callbacks)
        for worker in (self.work, self.work2, self.work3):
            worker.queue(n)

        print("Expect updates from Main every second and from thread every 2.5 seconds")
        print("")
        self.work.ready(.5)

        # Report from the calling thread once per second until a worker
        # signals completion.
        while not self.done:
            sleep(1)
            status = "Main thread %d at %.2f" % (thread.get_ident(),
                                                 clock() - self.starttime)
            print(status)

    def update(self, i=0):
        """Print a progress line, then ask the first worker for another
        update in 2.5 seconds."""
        report = "Update i=%d from thread %d at %.2f" % (
            i, thread.get_ident(), clock() - self.starttime)
        print(report)
        self.work.ready(2.5)

    def complete(self, total=0.0):
        """Print the final total and flag the reporting loop to stop."""
        report = "Complete total=%g from thread %d at %.2f" % (
            total, thread.get_ident(), clock() - self.starttime)
        print(report)
        self.done = True
Note: See TracBrowser for help on using the repository browser.