source: sasview/src/sas/sascalc/data_util/calcthread.py @ 463e7ffc

Branches: ESS_GUI, ESS_GUI_Docs, ESS_GUI_batch_fitting, ESS_GUI_bumps_abstraction, ESS_GUI_iss1116, ESS_GUI_iss879, ESS_GUI_iss959, ESS_GUI_opencl, ESS_GUI_ordering, ESS_GUI_sync_sascalc, costrafo411, magnetic_scatt, release-4.2.2, ticket-1009, ticket-1094-headless, ticket-1242-2d-resolution, ticket-1243, ticket-1249, ticket885, unittest-saveload
Last change on this file since 463e7ffc was 463e7ffc, checked in by Ricardo Ferraz Leal <ricleal@…>, 7 years ago

getLogger with module name

  • Property mode set to 100644
File size: 12.3 KB
Line 
1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6
7import thread
8import traceback
9import sys
10import logging
11
# Select the clock() and sleep() helpers used by CalcThread for scheduling.
if sys.platform.count("darwin") > 0:
    # On OS X, measure elapsed wall-clock time relative to module import.
    import time
    stime = time.time()

    def clock():
        """Return the seconds of wall-clock time elapsed since import."""
        return time.time() - stime

    def sleep(t):
        """Suspend the calling thread for *t* seconds."""
        return time.sleep(t)
else:
    try:
        from time import clock
    except ImportError:
        # time.clock was removed in Python 3.8; perf_counter is the
        # documented replacement for measuring elapsed intervals.
        from time import perf_counter as clock
    from time import sleep

logger = logging.getLogger(__name__)
26
27
class CalcThread:
    """Threaded calculation class.  Inherit from here and specialize
    the compute() method to perform the appropriate operations for the
    class.

    If you specialize the __init__ method be sure to call
    CalcThread.__init__, passing it the keyword arguments for
    yieldtime, worktime, update and complete.

    When defining the compute() method you need to include code which
    allows the GUI to run.  They are as follows: ::

        self.isquit()          # call frequently to check for interrupts
        self.update(kw=...)    # call when the GUI could be updated
        self.complete(kw=...)  # call before exiting compute()

    The update() and complete() calls accept field=value keyword
    arguments which are passed to the called function.  complete()
    should be called before exiting the GUI function.  A KeyboardInterrupt
    event is triggered if the GUI signals that the computation should
    be halted.

    The following documentation should be included in the description
    of the derived class.

    The user of this class will call the following: ::

        thread = Work(...,kw=...)  # prepare the work thread.
        thread.queue(...,kw=...)   # queue a work unit
        thread.requeue(...,kw=...) # replace work unit on the end of queue
        thread.reset(...,kw=...)   # reset the queue to the given work unit
        thread.stop()              # clear the queue and halt
        thread.interrupt()         # halt the current work unit but continue
        thread.ready(delay=0.)     # request an update signal after delay
        thread.isrunning()         # returns true if compute() is running

    Use queue() when all work must be done.  Use requeue() when intermediate
    work items don't need to be done (e.g., in response to a mouse move
    event).  Use reset() when the current item doesn't need to be completed
    before the new event (e.g., in response to a mouse release event).  Use
    stop() to halt the current and pending computations (e.g., in response to
    a stop button).

    The methods queue(), requeue() and reset() are proxies for the compute()
    method in the subclass.  Look there for a description of the arguments.
    The compute() method can be called directly to run the computation in
    the main thread, but it should not be called if isrunning() returns true.

    The constructor accepts additional keywords yieldtime=0.01 and
    worktime=0.01 which determine the cooperative multitasking
    behaviour.  Yield time is the duration of the sleep period
    required to give other processes a chance to run.  Work time
    is the duration between sleep periods.

    Notifying the GUI thread of work in progress and work complete
    is done with updatefn=updatefn and completefn=completefn arguments
    to the constructor.  Details of the parameters to the functions
    depend on the particular calculation class, but they will all
    be passed as keyword arguments.  Details of how the functions
    should be implemented vary from framework to framework.

    An optional exception_handler=fn argument is called with the
    (type, value, traceback) triple from sys.exc_info() when compute()
    raises anything other than KeyboardInterrupt.

    For wx, something like the following is needed::

        import wx, wx.lib.newevent
        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()

        # methods in the main window class of your application
        def __init__():
            ...
            # Prepare the calculation in the GUI thread.
            self.work = Work(completefn=self.CalcComplete)
            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
            ...
            # Bind work queue to a menu event.
            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
            ...

        def OnCalcStart(self,event):
            # Start the work thread from the GUI thread.
            self.work.queue(...work unit parameters...)

        def CalcComplete(self,**kwargs):
            # Generate CalcComplete event in the calculation thread.
            # kwargs contains field1, field2, etc. as defined by
            # the Work thread class.
            event = CalcCompleteEvent(**kwargs)
            wx.PostEvent(self, event)

        def OnCalcComplete(self,event):
            # Process CalcComplete event in GUI thread.
            # Use values from event.field1, event.field2 etc. as
            # defined by the Work thread class to show the results.
            ...
    """

    def __init__(self, completefn=None, updatefn=None,
                 yieldtime=0.01, worktime=0.01,
                 exception_handler=None):
        """Prepare the calculator.

        completefn and updatefn are the callbacks invoked (with keyword
        arguments) by complete() and update().  yieldtime is the length
        of each sleep and worktime the interval between sleeps.
        exception_handler, if given, receives sys.exc_info() when
        compute() fails.
        """
        self.yieldtime = yieldtime
        self.worktime = worktime
        self.completefn = completefn
        self.updatefn = updatefn
        self.exception_handler = exception_handler
        self._interrupting = False
        self._running = False
        self._queue = []
        self._lock = thread.allocate_lock()
        self._delay = 1e6
        # Initialise the timers here so that update()/isquit() also work
        # when compute() is called directly rather than via queue().
        self._time_for_update = clock() + 1e6
        self._time_for_nap = clock() + worktime

    def queue(self, *args, **kwargs):
        """Add a work unit to the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue.append((args, kwargs))
            # Claim the worker slot while holding the lock so that two
            # rapid queue() calls cannot both start a worker thread.
            start_worker = not self._running
            if start_worker:
                self._running = True
                self._time_for_update = clock() + 1e6
        # Cannot do start_new_thread call within the lock
        if start_worker:
            try:
                thread.start_new_thread(self._run, ())
            except Exception:
                # No worker was started, so give the slot back.
                with self._lock:
                    self._running = False
                raise

    def requeue(self, *args, **kwargs):
        """Replace the work unit on the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue = self._queue[:-1]
        self.queue(*args, **kwargs)

    def reset(self, *args, **kwargs):
        """Clear the queue and start a new work unit.  See the compute()
        method for details of the arguments to the work unit."""
        self.stop()
        self.queue(*args, **kwargs)

    def stop(self):
        """Clear the queue and stop the thread.  New items may be
        queued after stop.  To stop just the current work item, and
        continue the rest of the queue call the interrupt method"""
        with self._lock:
            self._interrupting = True
            self._queue = []

    def interrupt(self):
        """Stop the current work item.  To clear the work queue as
        well call the stop() method."""
        with self._lock:
            self._interrupting = True

    def isrunning(self):
        """Return True while work is queued for, or being processed by,
        the worker thread."""
        return self._running

    def ready(self, delay=0.):
        """Ready for another update after delay=t seconds.  Call
        this for threads which can show intermediate results from
        long calculations."""
        with self._lock:
            self._delay = delay
            self._time_for_update = clock() + delay

    def isquit(self):
        """Check for interrupts.  Should be called frequently to
        provide user responsiveness.  Also yields to other running
        threads, which is required for good performance on OS X.

        Raises KeyboardInterrupt if stop() or interrupt() was called.
        """
        # Only called from within the running thread so no need to lock
        if self._running and self.yieldtime > 0 \
                and clock() > self._time_for_nap:
            sleep(self.yieldtime)
            self._time_for_nap = clock() + self.worktime
        if self._interrupting:
            raise KeyboardInterrupt

    def update(self, **kwargs):
        """Update GUI with the latest results from the current work unit.

        The update function is only called once the time requested by
        ready() has passed; otherwise this behaves like isquit().
        """
        if self.updatefn is not None and clock() > self._time_for_update:
            with self._lock:
                # Push the next update far into the future: no more
                # updates are delivered until ready() is called again.
                # Done in one step under the lock so a concurrent ready()
                # cannot be overwritten.
                self._time_for_update = clock() + self._delay + 1e6

            self.updatefn(**kwargs)
            sleep(self.yieldtime)
            if self._interrupting:
                raise KeyboardInterrupt
        else:
            self.isquit()
        return

    def complete(self, **kwargs):
        """Update the GUI with the completed results from a work unit."""
        if self.completefn is not None:
            self.completefn(**kwargs)
            sleep(self.yieldtime)
        return

    def compute(self, *args, **kwargs):
        """Perform a work unit.  The subclass will provide details of
        the arguments.

        Raises NotImplementedError unless overridden by the subclass.
        """
        raise NotImplementedError("Calculation thread needs compute method")

    def exception(self):
        """
        An exception occurred during computation, so call the exception handler
        if there is one.  If not, then log the exception and continue.
        """
        # If we have an exception handler, let it try to handle the exception.
        # If it fails fall through to log the failure to handle the exception
        # (the original exception will be lost).  If there is no exception
        # handler, just log the exception in compute that we are responding to.
        if self.exception_handler:
            try:
                self.exception_handler(*sys.exc_info())
                return
            except Exception:
                pass
        logger.error(traceback.format_exc())

    def _run(self):
        """Internal function to manage the thread."""
        # The code for condition wait in the threading package is
        # implemented using polling.  I'll accept for now that the
        # authors of this code are clever enough that polling is
        # difficult to avoid.  Rather than polling, I will exit the
        # thread when the queue is empty and start a new thread when
        # there is more work to be done.
        while True:
            with self._lock:
                self._time_for_nap = clock() + self.worktime
                self._running = True
                if not self._queue:
                    # Mark the worker idle while the lock is still held,
                    # then leave; the lock is released on the way out so
                    # later queue()/stop()/ready() calls do not block.
                    self._running = False
                    return
                self._interrupting = False
                args, kwargs = self._queue.pop(0)
            try:
                self.compute(*args, **kwargs)
            except KeyboardInterrupt:
                # Raised by isquit()/update() after stop() or interrupt().
                pass
            except:
                # Top-level boundary of the worker thread: report the
                # failure and carry on with the next work unit.
                self.exception()
277
278
279# ======================================================================
280# Demonstration of calcthread in action
class CalcDemo(CalcThread):
    """Sample calculation thread: sums 0..n-1, n times over."""
    def compute(self, n):
        """Accumulate j for every (i, j) pair in an n-by-n double loop.

        update(i=...) is offered once per outer pass, isquit() is polled
        on every inner step, and the sum is reported via complete(total=...).
        """
        accumulated = 0.
        for outer in range(n):
            # Give the GUI a chance to show progress for this pass.
            self.update(i=outer)
            for inner in range(n):
                # Poll for interrupts before each unit of work.
                self.isquit()
                accumulated = accumulated + inner
        self.complete(total=accumulated)
291
292
class CalcCommandline:
    """
        Command-line driver that runs three CalcDemo workers and prints
        their update/complete notifications alongside main-thread ticks.
    """
    def __init__(self, n=20000):
        """Queue a work unit of size *n* on each worker and report from
        the main thread once a second until a worker completes."""
        print(thread.get_ident())
        self.starttime = clock()
        self.done = False
        callbacks = dict(completefn=self.complete, updatefn=self.update)
        self.work = CalcDemo(yieldtime=0.001, **callbacks)
        self.work2 = CalcDemo(**callbacks)
        self.work3 = CalcDemo(**callbacks)
        for worker in (self.work, self.work2, self.work3):
            worker.queue(n)
        print("Expect updates from Main every second and from thread every 2.5 seconds")
        print("")
        self.work.ready(.5)
        while not self.done:
            sleep(1)
            ident = thread.get_ident()
            elapsed = clock() - self.starttime
            print("Main thread %d at %.2f" % (ident, elapsed))

    def update(self, i=0):
        """Print the progress index, then ask the first worker for the
        next update 2.5 seconds from now."""
        ident = thread.get_ident()
        elapsed = clock() - self.starttime
        print("Update i=%d from thread %d at %.2f" % (i, ident, elapsed))
        self.work.ready(2.5)

    def complete(self, total=0.0):
        """Print the final total and flag the main loop to finish."""
        ident = thread.get_ident()
        elapsed = clock() - self.starttime
        print("Complete total=%g from thread %d at %.2f" % (total, ident,
                                                            elapsed))
        self.done = True
Note: See TracBrowser for help on using the repository browser.