source: sasview/src/sas/sascalc/data_util/calcthread.py @ 0d93464

magnetic_scatt, release-4.2.2, ticket-1009, ticket-1094-headless, ticket-1242-2d-resolution, ticket-1243, ticket-1249, unittest-saveload
Last change on this file since 0d93464 was a1b8fee, checked in by andyfaff, 8 years ago

MAINT: from future import print_function

  • Property mode set to 100644
File size: 12.4 KB
Line 
1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6from __future__ import print_function
7
8import thread
9import traceback
10import sys
11import logging
12
# Provide module-level clock() and sleep().  clock() returns a float number
# of seconds; the rest of this module only compares clock() values with each
# other, so the origin of the timer does not matter.
if sys.platform.count("darwin") > 0:
    import time
    stime = time.time()

    def clock():
        """Return the wall-clock seconds elapsed since this module loaded."""
        return time.time() - stime

    def sleep(t):
        """Suspend the calling thread for *t* seconds."""
        return time.sleep(t)
else:
    try:
        from time import clock
    except ImportError:
        # time.clock was removed in Python 3.8; perf_counter is the
        # documented replacement for measuring elapsed intervals.
        from time import perf_counter as clock
    from time import sleep

logger = logging.getLogger(__name__)
27
28
class CalcThread:
    """Threaded calculation class.  Inherit from here and specialize
    the compute() method to perform the appropriate operations for the
    class.

    If you specialize the __init__ method be sure to call
    CalcThread.__init__, passing it the keyword arguments for
    yieldtime, worktime, updatefn, completefn and exception_handler.

    When defining the compute() method you need to include code which
    allows the GUI to run.  They are as follows: ::

        self.isquit()          # call frequently to check for interrupts
        self.update(kw=...)    # call when the GUI could be updated
        self.complete(kw=...)  # call before exiting compute()

    The update() and complete() calls accept field=value keyword
    arguments which are passed to the called function.  complete()
    should be called before exiting compute().  A KeyboardInterrupt
    is raised by isquit() and update() if the GUI signals that the
    computation should be halted.

    The following documentation should be included in the description
    of the derived class.

    The user of this class will call the following: ::

        thread = Work(...,kw=...)  # prepare the work thread.
        thread.queue(...,kw=...)   # queue a work unit
        thread.requeue(...,kw=...) # replace work unit on the end of queue
        thread.reset(...,kw=...)   # reset the queue to the given work unit
        thread.stop()              # clear the queue and halt
        thread.interrupt()         # halt the current work unit but continue
        thread.ready(delay=0.)     # request an update signal after delay
        thread.isrunning()         # returns true if compute() is running

    Use queue() when all work must be done.  Use requeue() when intermediate
    work items don't need to be done (e.g., in response to a mouse move
    event).  Use reset() when the current item doesn't need to be completed
    before the new event (e.g., in response to a mouse release event).  Use
    stop() to halt the current and pending computations (e.g., in response to
    a stop button).

    The methods queue(), requeue() and reset() are proxies for the compute()
    method in the subclass.  Look there for a description of the arguments.
    The compute() method can be called directly to run the computation in
    the main thread, but it should not be called if isrunning() returns true.

    The constructor accepts additional keywords yieldtime=0.01 and
    worktime=0.01 which determine the cooperative multitasking
    behaviour.  Yield time is the duration of the sleep period
    required to give other processes a chance to run.  Work time
    is the duration between sleep periods.

    Notifying the GUI thread of work in progress and work complete
    is done with updatefn=updatefn and completefn=completefn arguments
    to the constructor.  Details of the parameters to the functions
    depend on the particular calculation class, but they will all
    be passed as keyword arguments.  Details of how the functions
    should be implemented vary from framework to framework.

    For wx, something like the following is needed::

        import wx, wx.lib.newevent
        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()

        # methods in the main window class of your application
        def __init__():
            ...
            # Prepare the calculation in the GUI thread.
            self.work = Work(completefn=self.CalcComplete)
            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
            ...
            # Bind work queue to a menu event.
            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
            ...

        def OnCalcStart(self,event):
            # Start the work thread from the GUI thread.
            self.work.queue(...work unit parameters...)

        def CalcComplete(self,**kwargs):
            # Generate CalcComplete event in the calculation thread.
            # kwargs contains field1, field2, etc. as defined by
            # the Work thread class.
            event = CalcCompleteEvent(**kwargs)
            wx.PostEvent(self, event)

        def OnCalcComplete(self,event):
            # Process CalcComplete event in GUI thread.
            # Use values from event.field1, event.field2 etc. as
            # defined by the Work thread class to show the results.
            ...
    """

    def __init__(self, completefn=None, updatefn=None,
                 yieldtime=0.01, worktime=0.01,
                 exception_handler=None):
        """Prepare the calculator.

        :param completefn: callable invoked by complete() with the keyword
            arguments it receives, or None for no notification.
        :param updatefn: callable invoked by update() with the keyword
            arguments it receives, or None for no notification.
        :param yieldtime: seconds to sleep each time the worker yields.
        :param worktime: seconds of work allowed between yields.
        :param exception_handler: callable invoked by exception() with the
            (type, value, traceback) triple from sys.exc_info(), or None
            to log the exception instead.
        """
        self.yieldtime = yieldtime
        self.worktime = worktime
        self.completefn = completefn
        self.updatefn = updatefn
        self.exception_handler = exception_handler
        self._interrupting = False
        self._running = False
        self._queue = []
        self._lock = thread.allocate_lock()
        self._delay = 1e6
        # Initialise the timers here so that update() and isquit() also work
        # when compute() is called directly, without queue() or _run()
        # having set them first.
        now = clock()
        self._time_for_update = now + 1e6
        self._time_for_nap = now + worktime

    def queue(self, *args, **kwargs):
        """Add a work unit to the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue.append((args, kwargs))
            # Decide whether a worker is needed while still holding the
            # lock, and claim the running flag immediately, so that two
            # concurrent callers cannot both start a worker and an exiting
            # worker cannot strand the item just queued.
            start_worker = not self._running
            if start_worker:
                self._running = True
                self._time_for_update = clock() + 1e6
        # Cannot do start_new_thread call within the lock
        if start_worker:
            try:
                thread.start_new_thread(self._run, ())
            except Exception:
                # No worker was created; clear the flag so that a later
                # queue() call can try again.
                with self._lock:
                    self._running = False
                raise

    def requeue(self, *args, **kwargs):
        """Replace the work unit on the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue = self._queue[:-1]
        self.queue(*args, **kwargs)

    def reset(self, *args, **kwargs):
        """Clear the queue and start a new work unit.  See the compute()
        method for details of the arguments to the work unit."""
        self.stop()
        self.queue(*args, **kwargs)

    def stop(self):
        """Clear the queue and stop the thread.  New items may be
        queued after stop.  To stop just the current work item, and
        continue the rest of the queue call the interrupt method"""
        with self._lock:
            self._interrupting = True
            self._queue = []

    def interrupt(self):
        """Stop the current work item.  To clear the work queue as
        well call the stop() method."""
        with self._lock:
            self._interrupting = True

    def isrunning(self):
        """Return True while a worker thread is active or being started."""
        return self._running

    def ready(self, delay=0.):
        """Ready for another update after delay=t seconds.  Call
        this for threads which can show intermediate results from
        long calculations."""
        with self._lock:
            self._delay = delay
            self._time_for_update = clock() + delay

    def isquit(self):
        """Check for interrupts.  Should be called frequently to
        provide user responsiveness.  Also yields to other running
        threads, which is required for good performance on OS X.

        :raises KeyboardInterrupt: if stop() or interrupt() was called
            for the current work unit.
        """
        # Only called from within the running thread so no need to lock
        if (self._running and self.yieldtime > 0
                and clock() > self._time_for_nap):
            sleep(self.yieldtime)
            self._time_for_nap = clock() + self.worktime
        if self._interrupting:
            raise KeyboardInterrupt

    def update(self, **kwargs):
        """Update GUI with the latest results from the current work unit.

        updatefn is only called when one is set and the time requested by
        ready() has passed; otherwise this just behaves like isquit().

        :raises KeyboardInterrupt: if stop() or interrupt() was called
            for the current work unit.
        """
        if self.updatefn is not None and clock() > self._time_for_update:
            with self._lock:
                # The extra 1e6 seconds suppresses further updates until
                # ready() is called again.  It is applied in the same locked
                # assignment so it cannot overwrite a concurrent ready().
                self._time_for_update = clock() + self._delay + 1e6
            # The lock is released before the callback because updatefn may
            # itself call ready(), which acquires the same lock.
            self.updatefn(**kwargs)
            sleep(self.yieldtime)
            if self._interrupting:
                raise KeyboardInterrupt
        else:
            self.isquit()
        return

    def complete(self, **kwargs):
        """Update the GUI with the completed results from a work unit."""
        if self.completefn is not None:
            self.completefn(**kwargs)
            sleep(self.yieldtime)
        return

    def compute(self, *args, **kwargs):
        """Perform a work unit.  The subclass will provide details of
        the arguments.

        :raises NotImplementedError: always, in this base class.
        """
        # NotImplemented is a non-callable constant; the exception class
        # is NotImplementedError.
        raise NotImplementedError("Calculation thread needs compute method")

    def exception(self):
        """
        An exception occurred during computation, so call the exception handler
        if there is one.  If not, then log the exception and continue.
        """
        # Capture the traceback of the exception being handled before the
        # handler runs, so that it can still be logged if the handler fails.
        original = traceback.format_exc()
        if self.exception_handler:
            try:
                self.exception_handler(*sys.exc_info())
                return
            except Exception:
                # The handler itself failed: log its failure, then fall
                # through to log the exception it was asked to handle.
                logger.error(traceback.format_exc())
        logger.error(original)

    def _run(self):
        """Internal function to manage the thread."""
        # The code for condition wait in the threading package is
        # implemented using polling.  I'll accept for now that the
        # authors of this code are clever enough that polling is
        # difficult to avoid.  Rather than polling, I will exit the
        # thread when the queue is empty and start a new thread when
        # there is more work to be done.
        while True:
            with self._lock:
                self._time_for_nap = clock() + self.worktime
                if not self._queue:
                    # Clear the running flag while still holding the lock
                    # so queue() sees a consistent state, and leave through
                    # the with-block so the lock is released on exit.
                    self._running = False
                    return
                self._running = True
                self._interrupting = False
                args, kwargs = self._queue[0]
                self._queue = self._queue[1:]
            try:
                self.compute(*args, **kwargs)
            except KeyboardInterrupt:
                # Raised by isquit()/update() after stop() or interrupt();
                # drop this work unit and move on to the next one.
                pass
            except:
                # Deliberately catches everything so that one failing work
                # unit cannot kill the worker and strand the queue.
                self.exception()
277
278
279# ======================================================================
280# Demonstration of calcthread in action
class CalcDemo(CalcThread):
    """Sample worker used to exercise the CalcThread machinery."""
    def compute(self, n):
        """Add up 0..n-1 a total of n times, reporting progress on the way.

        update(i=...) is called once per outer pass, isquit() once per
        inner step, and complete(total=...) once with the final sum.
        """
        running_sum = 0.
        for outer in range(n):
            # Offer the GUI a progress report at the start of each pass.
            self.update(i=outer)
            for term in range(n):
                # Poll for interrupts before every addition.
                self.isquit()
                running_sum = running_sum + term
        self.complete(total=running_sum)
291
292
293class CalcCommandline:
294    """
295        Test method
296    """
297    def __init__(self, n=20000):
298        print(thread.get_ident())
299        self.starttime = clock()
300        self.done = False
301        self.work = CalcDemo(completefn=self.complete,
302                             updatefn=self.update, yieldtime=0.001)
303        self.work2 = CalcDemo(completefn=self.complete,
304                             updatefn=self.update)
305        self.work3 = CalcDemo(completefn=self.complete,
306                             updatefn=self.update)
307        self.work.queue(n)
308        self.work2.queue(n)
309        self.work3.queue(n)
310        print("Expect updates from Main every second and from thread every 2.5 seconds")
311        print("")
312        self.work.ready(.5)
313        while not self.done:
314            sleep(1)
315            print("Main thread %d at %.2f" % (thread.get_ident(),
316                                              clock() - self.starttime))
317
318    def update(self, i=0):
319        print("Update i=%d from thread %d at %.2f" % (i, thread.get_ident(),
320                                                      clock() - self.starttime))
321        self.work.ready(2.5)
322
323    def complete(self, total=0.0):
324        print("Complete total=%g from thread %d at %.2f" % (total,
325                                                    thread.get_ident(),
326                                                    clock() - self.starttime))
327        self.done = True
Note: See TracBrowser for help on using the repository browser.