source: sasview/src/sas/sascalc/data_util/calcthread.py @ a7067ef2

Last change on this file since a7067ef2 was 574adc7, checked in by Paul Kienzle <pkienzle@…>, 7 years ago

convert sascalc to python 2/3 syntax

  • Property mode set to 100644
File size: 12.4 KB
Line 
1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6from __future__ import print_function
7
8import traceback
9import sys
10import logging
11try:
12    import _thread as thread
13except ImportError: # CRUFT: python 2 support
14    import thread
15
# Timing helpers used by the calculation threads.
#
# ``time.clock`` was deprecated in Python 3.3 and removed in Python 3.8, so
# importing it unconditionally breaks the module on current interpreters.
# A wall-clock timer is what the scheduling code needs on every platform
# (the threads spend part of their time sleeping, which CPU-time clocks do
# not count), so use one implementation everywhere.
import time

# perf_counter is monotonic and high resolution; fall back to time.time on
# interpreters that predate it (python 2).
_timer = getattr(time, "perf_counter", time.time)
_start_time = _timer()


def clock():
    """Return the number of seconds elapsed since this module was imported."""
    return _timer() - _start_time


def sleep(t):
    """Suspend the calling thread for *t* seconds."""
    return time.sleep(t)
28
29logger = logging.getLogger(__name__)
30
31
class CalcThread:
    """Threaded calculation class.  Inherit from here and specialize
    the compute() method to perform the appropriate operations for the
    class.

    If you specialize the __init__ method be sure to call
    CalcThread.__init__, passing it the keyword arguments for
    yieldtime, worktime, update and complete.

    When defining the compute() method you need to include code which
    allows the GUI to run.  They are as follows: ::

        self.isquit()          # call frequently to check for interrupts
        self.update(kw=...)    # call when the GUI could be updated
        self.complete(kw=...)  # call before exiting compute()

    The update() and complete() calls accept field=value keyword
    arguments which are passed to the called function.  complete()
    should be called before exiting the GUI function.  A KeyboardInterrupt
    event is triggered if the GUI signals that the computation should
    be halted.

    The following documentation should be included in the description
    of the derived class.

    The user of this class will call the following: ::

        thread = Work(...,kw=...)  # prepare the work thread.
        thread.queue(...,kw=...)   # queue a work unit
        thread.requeue(...,kw=...) # replace work unit on the end of queue
        thread.reset(...,kw=...)   # reset the queue to the given work unit
        thread.stop()              # clear the queue and halt
        thread.interrupt()         # halt the current work unit but continue
        thread.ready(delay=0.)     # request an update signal after delay
        thread.isrunning()         # returns true if compute() is running

    Use queue() when all work must be done.  Use requeue() when intermediate
    work items don't need to be done (e.g., in response to a mouse move
    event).  Use reset() when the current item doesn't need to be completed
    before the new event (e.g., in response to a mouse release event).  Use
    stop() to halt the current and pending computations (e.g., in response to
    a stop button).

    The methods queue(), requeue() and reset() are proxies for the compute()
    method in the subclass.  Look there for a description of the arguments.
    The compute() method can be called directly to run the computation in
    the main thread, but it should not be called if isrunning() returns true.

    The constructor accepts additional keywords yieldtime=0.01 and
    worktime=0.01 which determine the cooperative multitasking
    behaviour.  Yield time is the duration of the sleep period
    required to give other processes a chance to run.  Work time
    is the duration between sleep periods.

    Notifying the GUI thread of work in progress and work complete
    is done with updatefn=updatefn and completefn=completefn arguments
    to the constructor.  Details of the parameters to the functions
    depend on the particular calculation class, but they will all
    be passed as keyword arguments.  Details of how the functions
    should be implemented vary from framework to framework.

    For wx, something like the following is needed::

        import wx, wx.lib.newevent
        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()

        # methods in the main window class of your application
        def __init__():
            ...
            # Prepare the calculation in the GUI thread.
            self.work = Work(completefn=self.CalcComplete)
            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
            ...
            # Bind work queue to a menu event.
            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
            ...

        def OnCalcStart(self,event):
            # Start the work thread from the GUI thread.
            self.work.queue(...work unit parameters...)

        def CalcComplete(self,**kwargs):
            # Generate CalcComplete event in the calculation thread.
            # kwargs contains field1, field2, etc. as defined by
            # the Work thread class.
            event = CalcCompleteEvent(**kwargs)
            wx.PostEvent(self, event)

        def OnCalcComplete(self,event):
            # Process CalcComplete event in GUI thread.
            # Use values from event.field1, event.field2 etc. as
            # defined by the Work thread class to show the results.
            ...
    """

    def __init__(self, completefn=None, updatefn=None,
                 yieldtime=0.01, worktime=0.01,
                 exception_handler=None):
        """Prepare the calculator.

        *completefn* and *updatefn* are optional callables invoked with
        keyword arguments by complete() and update().  *yieldtime* is the
        length of each cooperative sleep and *worktime* the interval between
        sleeps, both in seconds.  *exception_handler*, if given, is called
        with the ``sys.exc_info()`` triple when compute() raises.
        """
        self.yieldtime = yieldtime
        self.worktime = worktime
        self.completefn = completefn
        self.updatefn = updatefn
        self.exception_handler = exception_handler
        self._interrupting = False
        self._running = False
        self._queue = []
        self._lock = thread.allocate_lock()
        self._delay = 1e6
        # queue(), ready() and _run() normally set these timers, but compute()
        # may also be called directly in the main thread; define them up
        # front so update() does not fail with AttributeError in that case.
        now = clock()
        self._time_for_update = now + 1e6  # no updates until ready()
        self._time_for_nap = now + worktime

    def queue(self, *args, **kwargs):
        """Add a work unit to the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue.append((args, kwargs))
            # Decide whether a worker is needed while holding the lock, and
            # claim the running flag immediately so that two rapid calls to
            # queue() cannot both start a worker thread.
            start_worker = not self._running
            if start_worker:
                self._running = True
                self._time_for_update = clock() + 1e6
        # Cannot do start_new_thread call within the lock
        if start_worker:
            try:
                thread.start_new_thread(self._run, ())
            except Exception:
                # No worker was started; undo the claim so a later queue()
                # can try again.
                with self._lock:
                    self._running = False
                raise

    def requeue(self, *args, **kwargs):
        """Replace the work unit on the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue = self._queue[:-1]
        self.queue(*args, **kwargs)

    def reset(self, *args, **kwargs):
        """Clear the queue and start a new work unit.  See the compute()
        method for details of the arguments to the work unit."""
        self.stop()
        self.queue(*args, **kwargs)

    def stop(self):
        """Clear the queue and stop the thread.  New items may be
        queued after stop.  To stop just the current work item, and
        continue the rest of the queue call the interrupt method"""
        with self._lock:
            self._interrupting = True
            self._queue = []

    def interrupt(self):
        """Stop the current work item.  To clear the work queue as
        well call the stop() method."""
        with self._lock:
            self._interrupting = True

    def isrunning(self):
        """Return True while a worker thread is active for this object."""
        return self._running

    def ready(self, delay=0.):
        """Ready for another update after delay=t seconds.  Call
        this for threads which can show intermediate results from
        long calculations."""
        with self._lock:
            self._delay = delay
            self._time_for_update = clock() + delay

    def isquit(self):
        """Check for interrupts.  Should be called frequently to
        provide user responsiveness.  Also yields to other running
        threads, which is required for good performance on OS X.

        Raises KeyboardInterrupt if stop() or interrupt() was requested.
        """
        # Only called from within the running thread so no need to lock
        if (self._running and self.yieldtime > 0
                and clock() > self._time_for_nap):
            sleep(self.yieldtime)
            self._time_for_nap = clock() + self.worktime
        if self._interrupting:
            raise KeyboardInterrupt

    def update(self, **kwargs):
        """Update GUI with the latest results from the current work unit.

        The update function is only called if one was supplied and the
        time requested by ready() has passed; otherwise this behaves like
        isquit().  Raises KeyboardInterrupt if the work unit was interrupted.
        """
        if self.updatefn is not None and clock() > self._time_for_update:
            with self._lock:
                # Push the next update far into the future: no more updates
                # are sent until the receiver calls ready() again.
                self._time_for_update = clock() + self._delay + 1e6

            self.updatefn(**kwargs)
            sleep(self.yieldtime)
            if self._interrupting:
                raise KeyboardInterrupt
        else:
            self.isquit()
        return

    def complete(self, **kwargs):
        """Update the GUI with the completed results from a work unit."""
        if self.completefn is not None:
            self.completefn(**kwargs)
            sleep(self.yieldtime)
        return

    def compute(self, *args, **kwargs):
        """Perform a work unit.  The subclass will provide details of
        the arguments.

        Raises NotImplementedError unless overridden by the subclass.
        """
        # NotImplemented is a comparison sentinel, not an exception class;
        # calling it raises TypeError instead of the intended error.
        raise NotImplementedError("Calculation thread needs compute method")

    def exception(self):
        """
        An exception occurred during computation, so call the exception handler
        if there is one.  If not, then log the exception and continue.
        """
        # If we have an exception handler, let it try to handle the exception.
        # If it fails fall through to log the failure to handle the exception
        # (the original exception will be lost).  If there is no exception
        # handler, just log the exception in compute that we are responding to.
        if self.exception_handler:
            try:
                self.exception_handler(*sys.exc_info())
                return
            except Exception:
                pass
        logger.error(traceback.format_exc())

    def _run(self):
        """Internal function to manage the thread."""
        # The code for condition wait in the threading package is
        # implemented using polling.  I'll accept for now that the
        # authors of this code are clever enough that polling is
        # difficult to avoid.  Rather than polling, I will exit the
        # thread when the queue is empty and start a new thread when
        # there is more work to be done.
        while True:
            with self._lock:
                self._time_for_nap = clock() + self.worktime
                if not self._queue:
                    # Clear the running flag while still holding the lock so
                    # that queue() either sees a live worker that will pick
                    # up its item, or sees no worker and starts a new one.
                    # Leaving through the with-block also guarantees the
                    # lock is released on exit.
                    self._running = False
                    return
                self._running = True
                self._interrupting = False
                args, kwargs = self._queue.pop(0)
            try:
                self.compute(*args, **kwargs)
            except KeyboardInterrupt:
                # Raised by isquit()/update() when the work unit is cancelled.
                pass
            except BaseException:
                # Deliberate catch-all at the thread boundary: report the
                # failure and keep servicing the queue.
                self.exception()
280
281
282# ======================================================================
283# Demonstration of calcthread in action
class CalcDemo(CalcThread):
    """Sample calculation thread used to demonstrate CalcThread."""
    def compute(self, n):
        """Sum 0..n-1 a total of n times, reporting progress along the way.

        update(i=...) is called once per outer pass, isquit() once per
        inner step, and complete(total=...) with the final sum.
        """
        running_sum = 0.
        outer = 0
        while outer < n:
            self.update(i=outer)
            inner = 0
            while inner < n:
                # Give the GUI a chance to interrupt before each addition.
                self.isquit()
                running_sum += inner
                inner += 1
            outer += 1
        self.complete(total=running_sum)
294
295
class CalcCommandline:
    """
        Console test driver: starts three CalcDemo workers and prints
        progress from the main thread until one of them completes.
    """
    def __init__(self, n=20000):
        """Queue a work unit of size *n* on each worker and wait for
        the first completion, reporting once per second."""
        print(thread.get_ident())
        self.starttime = clock()
        self.done = False
        callbacks = dict(completefn=self.complete, updatefn=self.update)
        self.work = CalcDemo(yieldtime=0.001, **callbacks)
        self.work2 = CalcDemo(**callbacks)
        self.work3 = CalcDemo(**callbacks)
        for worker in (self.work, self.work2, self.work3):
            worker.queue(n)
        print("Expect updates from Main every second and from thread every 2.5 seconds")
        print("")
        self.work.ready(.5)
        while True:
            if self.done:
                break
            sleep(1)
            ident = thread.get_ident()
            print("Main thread %d at %.2f" % (ident, self._elapsed()))

    def _elapsed(self):
        """Seconds since this driver was created."""
        return clock() - self.starttime

    def update(self, i=0):
        """Progress callback: print the step and re-arm the first worker."""
        ident = thread.get_ident()
        print("Update i=%d from thread %d at %.2f" % (i, ident,
                                                      self._elapsed()))
        self.work.ready(2.5)

    def complete(self, total=0.0):
        """Completion callback: print the result and end the main loop."""
        ident = thread.get_ident()
        print("Complete total=%g from thread %d at %.2f" % (total, ident,
                                                            self._elapsed()))
        self.done = True
Note: See TracBrowser for help on using the repository browser.