source: sasview/src/sas/sascalc/data_util/calcthread.py @ 44a698c

ESS_GUI_sync_sascalc
Last change on this file since 44a698c was 44a698c, checked in by Piotr Rozyczko <piotr.rozyczko@…>, 5 years ago

Cherry-picked changes from py37-sascalc branch.

  • Property mode set to 100644
File size: 12.4 KB
Line 
1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6from __future__ import print_function
7
8import sys
9import logging
10import traceback
11from time import sleep
12
13try:
14    import _thread as thread
15except ImportError: # CRUFT: python 2 support
16    import thread
17try:
18    from time import perf_counter as clock
19except ImportError: # CRUFT: python < 3.3
20    if sys.platform.count("darwin") > 0:
21        from time import time as clock
22    else:
23        from time import clock
24
25logger = logging.getLogger(__name__)
26
class CalcThread:
    """Threaded calculation class.  Inherit from here and specialize
    the compute() method to perform the appropriate operations for the
    class.

    If you specialize the __init__ method be sure to call
    CalcThread.__init__, passing it the keyword arguments for
    completefn, updatefn, yieldtime, worktime and exception_handler.

    When defining the compute() method you need to include code which
    allows the GUI to run.  They are as follows: ::

        self.isquit()          # call frequently to check for interrupts
        self.update(kw=...)    # call when the GUI could be updated
        self.complete(kw=...)  # call before exiting compute()

    The update() and complete() calls accept field=value keyword
    arguments which are passed to the called function.  complete()
    should be called before exiting compute().  A KeyboardInterrupt
    is raised inside isquit()/update() if the GUI signals that the
    computation should be halted.

    The following documentation should be included in the description
    of the derived class.

    The user of this class will call the following: ::

        thread = Work(...,kw=...)  # prepare the work thread.
        thread.queue(...,kw=...)   # queue a work unit
        thread.requeue(...,kw=...) # replace work unit on the end of queue
        thread.reset(...,kw=...)   # reset the queue to the given work unit
        thread.stop()              # clear the queue and halt
        thread.interrupt()         # halt the current work unit but continue
        thread.ready(delay=0.)     # request an update signal after delay
        thread.isrunning()         # returns true while a worker thread is
                                   # pending or processing the queue

    Use queue() when all work must be done.  Use requeue() when intermediate
    work items don't need to be done (e.g., in response to a mouse move
    event).  Use reset() when the current item doesn't need to be completed
    before the new event (e.g., in response to a mouse release event).  Use
    stop() to halt the current and pending computations (e.g., in response to
    a stop button).

    The methods queue(), requeue() and reset() are proxies for the compute()
    method in the subclass.  Look there for a description of the arguments.
    The compute() method can be called directly to run the computation in
    the main thread, but it should not be called if isrunning() returns true.

    The constructor accepts additional keywords yieldtime=0.01 and
    worktime=0.01 which determine the cooperative multitasking
    behaviour.  Yield time is the duration of the sleep period
    required to give other processes a chance to run.  Work time
    is the duration between sleep periods.

    Notifying the GUI thread of work in progress and work complete
    is done with updatefn=updatefn and completefn=completefn arguments
    to the constructor.  Details of the parameters to the functions
    depend on the particular calculation class, but they will all
    be passed as keyword arguments.  Details of how the functions
    should be implemented vary from framework to framework.

    If exception_handler is given, it is called with the
    (type, value, traceback) triple from sys.exc_info() whenever compute()
    raises something other than KeyboardInterrupt.  Without a handler (or
    if the handler itself fails) the traceback is logged instead.

    For wx, something like the following is needed::

        import wx, wx.lib.newevent
        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()

        # methods in the main window class of your application
        def __init__(self):
            ...
            # Prepare the calculation in the GUI thread.
            self.work = Work(completefn=self.CalcComplete)
            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
            ...
            # Bind work queue to a menu event.
            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
            ...

        def OnCalcStart(self,event):
            # Start the work thread from the GUI thread.
            self.work.queue(...work unit parameters...)

        def CalcComplete(self,**kwargs):
            # Generate CalcComplete event in the calculation thread.
            # kwargs contains field1, field2, etc. as defined by
            # the Work thread class.
            event = CalcCompleteEvent(**kwargs)
            wx.PostEvent(self, event)

        def OnCalcComplete(self,event):
            # Process CalcComplete event in GUI thread.
            # Use values from event.field1, event.field2 etc. as
            # defined by the Work thread class to show the results.
            ...
    """

    def __init__(self, completefn=None, updatefn=None,
                 yieldtime=0.01, worktime=0.01,
                 exception_handler=None):
        """Prepare the calculator.

        *completefn* and *updatefn* are optional callables invoked with
        keyword arguments by complete() and update().  *yieldtime* is the
        length of each cooperative sleep and *worktime* the interval between
        sleeps, both in seconds.  *exception_handler*, if given, receives
        ``sys.exc_info()`` unpacked as three positional arguments.
        """
        self.yieldtime = yieldtime
        self.worktime = worktime
        self.completefn = completefn
        self.updatefn = updatefn
        self.exception_handler = exception_handler
        self._interrupting = False
        self._running = False
        self._queue = []
        self._lock = thread.allocate_lock()
        self._delay = 1e6
        # Give the timers a value up front so that compute() can be called
        # directly (without queue()) and still use update()/isquit().
        # Updates stay disabled until ready() is called, exactly as they do
        # for a freshly started worker thread.
        now = clock()
        self._time_for_update = now + self._delay
        self._time_for_nap = now + self.worktime

    def queue(self, *args, **kwargs):
        """Add a work unit to the end of the queue.  See the compute()
        method for details of the arguments to the work unit.

        A worker thread is started if none is currently active."""
        with self._lock:
            self._queue.append((args, kwargs))
            # Decide whether a worker is needed while holding the lock, and
            # claim the "running" flag immediately so that a second queue()
            # call cannot start a second worker before the first is scheduled.
            start_worker = not self._running
            if start_worker:
                self._running = True
                self._time_for_update = clock() + 1e6
        # Cannot do start_new_thread call within the lock
        if start_worker:
            try:
                thread.start_new_thread(self._run, ())
            except BaseException:
                # No worker exists, so give the flag back before reporting
                # the failure; otherwise later queue() calls would never
                # try to start one.
                with self._lock:
                    self._running = False
                raise

    def requeue(self, *args, **kwargs):
        """Replace the work unit on the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            # Only units still waiting can be replaced; one that has already
            # been handed to compute() is no longer in the queue.
            del self._queue[-1:]
        self.queue(*args, **kwargs)

    def reset(self, *args, **kwargs):
        """Clear the queue and start a new work unit.  See the compute()
        method for details of the arguments to the work unit."""
        self.stop()
        self.queue(*args, **kwargs)

    def stop(self):
        """Clear the queue and stop the thread.  New items may be
        queued after stop.  To stop just the current work item, and
        continue the rest of the queue call the interrupt method"""
        with self._lock:
            self._interrupting = True
            self._queue = []

    def interrupt(self):
        """Stop the current work item.  To clear the work queue as
        well call the stop() method."""
        with self._lock:
            self._interrupting = True

    def isrunning(self):
        """Return True while a worker thread is pending or processing
        the queue."""
        return self._running

    def ready(self, delay=0.):
        """Ready for another update after delay=t seconds.  Call
        this for threads which can show intermediate results from
        long calculations."""
        with self._lock:
            self._delay = delay
            self._time_for_update = clock() + delay

    def isquit(self):
        """Check for interrupts.  Should be called frequently to
        provide user responsiveness.  Also yields to other running
        threads, which is required for good performance on OS X.

        Raises KeyboardInterrupt if stop() or interrupt() was requested."""

        # Only called from within the running thread so no need to lock
        if self._running and self.yieldtime > 0 \
                and clock() > self._time_for_nap:
            sleep(self.yieldtime)
            self._time_for_nap = clock() + self.worktime
        if self._interrupting:
            raise KeyboardInterrupt

    def update(self, **kwargs):
        """Update GUI with the latest results from the current work unit.

        The update function is only called if one was supplied and the time
        requested through ready() has passed; otherwise this just performs
        the isquit() check.  Raises KeyboardInterrupt if an interrupt was
        requested."""
        if self.updatefn is not None and clock() > self._time_for_update:
            with self._lock:
                # Push the next update far into the future: no more updates
                # until the receiver calls ready() again.  Done in a single
                # locked assignment so a concurrent ready() is not clobbered.
                self._time_for_update = clock() + self._delay + 1e6

            # The lock must be released here: updatefn commonly calls ready().
            self.updatefn(**kwargs)
            sleep(self.yieldtime)
            if self._interrupting:
                raise KeyboardInterrupt
        else:
            self.isquit()
        return

    def complete(self, **kwargs):
        """Update the GUI with the completed results from a work unit."""
        if self.completefn is not None:
            self.completefn(**kwargs)
            sleep(self.yieldtime)
        return

    def compute(self, *args, **kwargs):
        """Perform a work unit.  The subclass will provide details of
        the arguments.

        Raises NotImplementedError unless overridden."""
        raise NotImplementedError("Calculation thread needs compute method")

    def exception(self):
        """
        An exception occurred during computation, so call the exception handler
        if there is one.  If not, then log the exception and continue.
        """
        # If we have an exception handler, let it try to handle the exception.
        # If it fails fall through to log the failure to handle the exception
        # (the original exception will be lost).  If there is no exception
        # handler, just log the exception in compute that we are responding to.
        if self.exception_handler:
            try:
                self.exception_handler(*sys.exc_info())
                return
            except Exception:
                # Deliberate fall-through: the handler's own failure is what
                # gets logged below.
                pass
        logger.error(traceback.format_exc())

    def _run(self):
        """Internal function to manage the thread."""
        # The code for condition wait in the threading package is
        # implemented using polling.  I'll accept for now that the
        # authors of this code are clever enough that polling is
        # difficult to avoid.  Rather than polling, I will exit the
        # thread when the queue is empty and start a new thread when
        # there is more work to be done.
        while True:
            with self._lock:
                if not self._queue:
                    # Clear the flag while still holding the lock so that
                    # queue() either sees a live worker that will pick up
                    # its item, or sees no worker and starts a new one.
                    self._running = False
                    return
                self._running = True
                self._time_for_nap = clock() + self.worktime
                self._interrupting = False
                args, kwargs = self._queue.pop(0)
            try:
                self.compute(*args, **kwargs)
            except KeyboardInterrupt:
                # Raised by isquit()/update() after stop() or interrupt().
                pass
            except BaseException:
                # Catch everything so one failing work unit does not kill
                # the worker and strand the remaining queue entries.
                self.exception()
275
276
277# ======================================================================
278# Demonstration of calcthread in action
class CalcDemo(CalcThread):
    """Demonstration worker built on CalcThread."""

    def compute(self, n):
        """Add up the integers ``0 .. n-1`` a total of ``n`` times.

        update(i=...) is called once per outer pass and isquit() once per
        inner step; the final sum is handed to complete(total=...).
        """
        running_sum = 0.
        for outer in range(n):
            # Offer a progress report at the start of every outer pass.
            self.update(i=outer)
            for inner in range(n):
                # Poll for an interrupt request before each addition.
                self.isquit()
                running_sum = running_sum + inner
        self.complete(total=running_sum)
289
290
class CalcCommandline:
    """
        Console driver for CalcDemo.

        Constructing an instance queues one work unit on each of three
        CalcDemo workers and then loops in the calling thread, printing a
        status line every second, until a completion callback sets
        ``self.done``.
    """
    def __init__(self, n=20000):
        print(thread.get_ident())
        self.starttime = clock()
        self.done = False

        # All three workers report back through the same pair of callbacks;
        # only the first one overrides the default yield time.
        callbacks = dict(completefn=self.complete, updatefn=self.update)
        self.work = CalcDemo(yieldtime=0.001, **callbacks)
        self.work2 = CalcDemo(**callbacks)
        self.work3 = CalcDemo(**callbacks)
        for worker in (self.work, self.work2, self.work3):
            worker.queue(n)

        print("Expect updates from Main every second and from thread every 2.5 seconds")
        print("")
        # Ask the first worker for its first update after half a second.
        self.work.ready(.5)
        while not self.done:
            sleep(1)
            ident = thread.get_ident()
            elapsed = clock() - self.starttime
            print("Main thread %d at %.2f" % (ident, elapsed))

    def update(self, i=0):
        """Progress callback: print the report and re-arm the first worker."""
        ident = thread.get_ident()
        elapsed = clock() - self.starttime
        print("Update i=%d from thread %d at %.2f" % (i, ident, elapsed))
        self.work.ready(2.5)

    def complete(self, total=0.0):
        """Completion callback: print the result and end the main loop."""
        ident = thread.get_ident()
        elapsed = clock() - self.starttime
        print("Complete total=%g from thread %d at %.2f" % (total, ident,
                                                            elapsed))
        self.done = True
Note: See TracBrowser for help on using the repository browser.