source: sasview/src/sas/sascalc/data_util/calcthread.py @ 4320104

Last change on this file since 4320104 was 934ce649, checked in by Paul Kienzle <pkienzle@…>, 9 years ago

make sure errors in compute get reported to user

  • Property mode set to 100644
File size: 12.3 KB
RevLine 
[4bae1ef]1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6
7import thread
8import traceback
9import sys
10
# Select the clock() and sleep() implementations used by the calculation
# threads.  clock() returns a time in seconds as a float; only differences
# between two readings are meaningful.
if "darwin" in sys.platform:
    import time
    # Reference point so that clock() reports time elapsed since import.
    stime = time.time()

    def clock():
        """Return the wall-clock seconds elapsed since this module loaded."""
        return time.time() - stime

    def sleep(t):
        """Suspend the calling thread for *t* seconds."""
        return time.sleep(t)
else:
    try:
        from time import clock
    except ImportError:
        # time.clock() no longer exists (removed in Python 3.8); fall back
        # to the high resolution timer so the module can still be imported.
        from time import perf_counter as clock
    from time import sleep
23
24
class CalcThread:
    """Threaded calculation class.  Inherit from here and specialize
    the compute() method to perform the appropriate operations for the
    class.

    If you specialize the __init__ method be sure to call
    CalcThread.__init__, passing it the keyword arguments for
    yieldtime, worktime, updatefn, completefn and exception_handler.

    When defining the compute() method you need to include code which
    allows the GUI to run.  They are as follows: ::

        self.isquit()          # call frequently to check for interrupts
        self.update(kw=...)    # call when the GUI could be updated
        self.complete(kw=...)  # call before exiting compute()

    The update() and complete() calls accept field=value keyword
    arguments which are passed to the called function.  complete()
    should be called before exiting the compute() function.  A
    KeyboardInterrupt event is triggered if the GUI signals that the
    computation should be halted.

    The following documentation should be included in the description
    of the derived class.

    The user of this class will call the following: ::

        thread = Work(...,kw=...)  # prepare the work thread.
        thread.queue(...,kw=...)   # queue a work unit
        thread.requeue(...,kw=...) # replace work unit on the end of queue
        thread.reset(...,kw=...)   # reset the queue to the given work unit
        thread.stop()              # clear the queue and halt
        thread.interrupt()         # halt the current work unit but continue
        thread.ready(delay=0.)     # request an update signal after delay
        thread.isrunning()         # returns true if compute() is running

    Use queue() when all work must be done.  Use requeue() when intermediate
    work items don't need to be done (e.g., in response to a mouse move
    event).  Use reset() when the current item doesn't need to be completed
    before the new event (e.g., in response to a mouse release event).  Use
    stop() to halt the current and pending computations (e.g., in response to
    a stop button).

    The methods queue(), requeue() and reset() are proxies for the compute()
    method in the subclass.  Look there for a description of the arguments.
    The compute() method can be called directly to run the computation in
    the main thread, but it should not be called if isrunning() returns true.

    The constructor accepts additional keywords yieldtime=0.01 and
    worktime=0.01 which determine the cooperative multitasking
    behaviour.  Yield time is the duration of the sleep period
    required to give other processes a chance to run.  Work time
    is the duration between sleep periods.

    Notifying the GUI thread of work in progress and work complete
    is done with updatefn=updatefn and completefn=completefn arguments
    to the constructor.  Details of the parameters to the functions
    depend on the particular calculation class, but they will all
    be passed as keyword arguments.  Details of how the functions
    should be implemented vary from framework to framework.

    Errors raised by compute() are passed to exception_handler, if one
    was given to the constructor, as the (type, value, traceback) triple
    from sys.exc_info(); otherwise they are logged.

    For wx, something like the following is needed::

        import wx, wx.lib.newevent
        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()

        # methods in the main window class of your application
        def __init__():
            ...
            # Prepare the calculation in the GUI thread.
            self.work = Work(completefn=self.CalcComplete)
            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
            ...
            # Bind work queue to a menu event.
            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
            ...

        def OnCalcStart(self,event):
            # Start the work thread from the GUI thread.
            self.work.queue(...work unit parameters...)

        def CalcComplete(self,**kwargs):
            # Generate CalcComplete event in the calculation thread.
            # kwargs contains field1, field2, etc. as defined by
            # the Work thread class.
            event = CalcCompleteEvent(**kwargs)
            wx.PostEvent(self, event)

        def OnCalcComplete(self,event):
            # Process CalcComplete event in GUI thread.
            # Use values from event.field1, event.field2 etc. as
            # defined by the Work thread class to show the results.
            ...
    """

    def __init__(self, completefn=None, updatefn=None,
                 yieldtime=0.01, worktime=0.01,
                 exception_handler=None):
        """Prepare the calculator.

        *completefn* and *updatefn* are optional callbacks invoked with
        keyword arguments by complete() and update().  *yieldtime* is the
        length of each sleep and *worktime* the interval between sleeps
        taken by isquit().  *exception_handler*, if given, is called with
        the sys.exc_info() triple when compute() fails.
        """
        self.yieldtime = yieldtime
        self.worktime = worktime
        self.completefn = completefn
        self.updatefn = updatefn
        self.exception_handler = exception_handler
        self._interrupting = False
        self._running = False
        self._queue = []
        self._lock = thread.allocate_lock()
        self._delay = 1e6
        # Give the timers a value up front so that update() and isquit()
        # also work when compute() is called directly rather than through
        # queue().  Updates stay disabled until ready() is called.
        now = clock()
        self._time_for_update = now + 1e6
        self._time_for_nap = now + worktime

    def queue(self, *args, **kwargs):
        """Add a work unit to the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue.append((args, kwargs))
            # Decide whether a worker is needed while holding the lock so
            # that two callers cannot both start one, and so that a worker
            # which is shutting down cannot strand this work unit.
            start_worker = not self._running
            if start_worker:
                self._running = True
                self._time_for_update = clock() + 1e6
        # Cannot do start_new_thread call within the lock
        if start_worker:
            try:
                thread.start_new_thread(self._run, ())
            except Exception:
                # No worker exists; do not leave the object marked busy.
                with self._lock:
                    self._running = False
                raise

    def requeue(self, *args, **kwargs):
        """Replace the work unit on the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue = self._queue[:-1]
        self.queue(*args, **kwargs)

    def reset(self, *args, **kwargs):
        """Clear the queue and start a new work unit.  See the compute()
        method for details of the arguments to the work unit."""
        self.stop()
        self.queue(*args, **kwargs)

    def stop(self):
        """Clear the queue and stop the thread.  New items may be
        queued after stop.  To stop just the current work item, and
        continue the rest of the queue call the interrupt method"""
        with self._lock:
            self._interrupting = True
            self._queue = []

    def interrupt(self):
        """Stop the current work item.  To clear the work queue as
        well call the stop() method."""
        with self._lock:
            self._interrupting = True

    def isrunning(self):
        """Return True while a worker is active or about to be started."""
        return self._running

    def ready(self, delay=0.):
        """Ready for another update after delay=t seconds.  Call
        this for threads which can show intermediate results from
        long calculations."""
        self._delay = delay
        with self._lock:
            self._time_for_update = clock() + delay

    def isquit(self):
        """Check for interrupts.  Should be called frequently to
        provide user responsiveness.  Also yields to other running
        threads, which is required for good performance on OS X.

        Raises KeyboardInterrupt if stop() or interrupt() was called."""

        # Only called from within the running thread so no need to lock
        if self._running and self.yieldtime > 0 \
                and clock() > self._time_for_nap:
            sleep(self.yieldtime)
            self._time_for_nap = clock() + self.worktime
        if self._interrupting:
            raise KeyboardInterrupt

    def update(self, **kwargs):
        """Update GUI with the latest results from the current work unit.

        updatefn is only called once the time requested through ready()
        has passed; otherwise this just performs the isquit() check.
        Raises KeyboardInterrupt if the work unit has been interrupted."""
        if self.updatefn is not None and clock() > self._time_for_update:
            with self._lock:
                # The extra 1e6 seconds means no more updates are sent
                # until ready() is called again.
                self._time_for_update = clock() + self._delay + 1e6

            self.updatefn(**kwargs)
            sleep(self.yieldtime)
            if self._interrupting:
                raise KeyboardInterrupt
        else:
            self.isquit()
        return

    def complete(self, **kwargs):
        """Update the GUI with the completed results from a work unit."""
        if self.completefn is not None:
            self.completefn(**kwargs)
            sleep(self.yieldtime)
        return

    def compute(self, *args, **kwargs):
        """Perform a work unit.  The subclass will provide details of
        the arguments.

        Raises NotImplementedError unless overridden by the subclass."""
        raise NotImplementedError("Calculation thread needs compute method")

    def exception(self):
        """
        An exception occurred during computation, so call the exception handler
        if there is one.  If not, then log the exception and continue.
        """
        import logging
        # Capture the failure from compute() first so that it can still be
        # reported if the handler itself raises.
        exc_info = sys.exc_info()
        if self.exception_handler:
            try:
                self.exception_handler(*exc_info)
                return
            except Exception:
                # The handler failed: report that, then fall through and
                # log the exception from compute() that it could not handle.
                logging.error("CalcThread exception handler failed:\n"
                              + traceback.format_exc())
        logging.error("".join(traceback.format_exception(*exc_info)))

    def _run(self):
        """Internal function to manage the thread."""
        # The code for condition wait in the threading package is
        # implemented using polling.  I'll accept for now that the
        # authors of this code are clever enough that polling is
        # difficult to avoid.  Rather than polling, I will exit the
        # thread when the queue is empty and start a new thread when
        # there is more work to be done.
        while True:
            with self._lock:
                self._time_for_nap = clock() + self.worktime
                self._running = True
                if not self._queue:
                    # Clear the flag before the lock is released so that
                    # a concurrent queue() sees that it must start a new
                    # worker; leaving via 'with' releases the lock.
                    self._running = False
                    return
                self._interrupting = False
                args, kwargs = self._queue.pop(0)
            try:
                self.compute(*args, **kwargs)
            except KeyboardInterrupt:
                # Raised by isquit()/update() in response to stop() or
                # interrupt(); move on to the next work unit.
                pass
            except:
                # Top of the worker thread: every other failure must be
                # reported rather than silently ending the thread.
                self.exception()
274
275
276# ======================================================================
277# Demonstration of calcthread in action
class CalcDemo(CalcThread):
    """Sample work thread: an n-by-n nested loop that sums the inner index."""
    def compute(self, n):
        """Accumulate 0 + 1 + ... + (n-1) a total of n times.

        update() is offered the outer index once per outer pass, isquit()
        is polled on every inner step, and the float sum is handed to
        complete() as the keyword ``total``.
        """
        accumulated = 0.
        for outer in range(n):
            # Offer progress to the GUI once per outer pass.
            self.update(i=outer)
            for inner in range(n):
                # Poll for interrupts on every inner step.
                self.isquit()
                accumulated = accumulated + inner
        self.complete(total=accumulated)
288
289
class CalcCommandline:
    """
        Console demonstration: starts three CalcDemo workers and prints
        progress from the main thread until one of them completes.
    """
    def __init__(self, n=20000):
        """Queue one work unit of size *n* on each of three workers, then
        report from the main thread once a second until complete() fires."""
        print(thread.get_ident())
        self.starttime = clock()
        self.done = False
        callbacks = {"completefn": self.complete, "updatefn": self.update}
        # Only the first worker uses a non-default yield time.
        self.work = CalcDemo(yieldtime=0.001, **callbacks)
        self.work2 = CalcDemo(**callbacks)
        self.work3 = CalcDemo(**callbacks)
        for worker in (self.work, self.work2, self.work3):
            worker.queue(n)
        print("Expect updates from Main every second and from thread every 2.5 seconds")
        print("")
        self.work.ready(.5)
        while not self.done:
            sleep(1)
            ident = thread.get_ident()
            elapsed = clock() - self.starttime
            print("Main thread %d at %.2f" % (ident, elapsed))

    def update(self, i=0):
        """Progress callback: print the index and ask the first worker for
        its next update in 2.5 seconds."""
        ident = thread.get_ident()
        elapsed = clock() - self.starttime
        print("Update i=%d from thread %d at %.2f" % (i, ident, elapsed))
        self.work.ready(2.5)

    def complete(self, total=0.0):
        """Completion callback: print the result and end the main loop."""
        ident = thread.get_ident()
        elapsed = clock() - self.starttime
        print("Complete total=%g from thread %d at %.2f" % (total, ident, elapsed))
        self.done = True
Note: See TracBrowser for help on using the repository browser.