source: sasview/src/sas/sascalc/data_util/calcthread.py @ 9dcd5ed

ESS_GUIESS_GUI_DocsESS_GUI_batch_fittingESS_GUI_bumps_abstractionESS_GUI_iss1116ESS_GUI_iss879ESS_GUI_iss959ESS_GUI_openclESS_GUI_orderingESS_GUI_sync_sascalccostrafo411magnetic_scattrelease-4.1.1release-4.1.2release-4.2.2release_4.0.1ticket-1009ticket-1094-headlessticket-1242-2d-resolutionticket-1243ticket-1249ticket885unittest-saveload
Last change on this file since 9dcd5ed was b699768, checked in by Piotr Rozyczko <piotr.rozyczko@…>, 9 years ago

Initial commit of the refactored SasCalc? module.

  • Property mode set to 100644
File size: 11.5 KB
Line 
1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6
7import thread
8import traceback
9import sys
10
11if sys.platform.count("darwin") > 0:
12    import time
13    stime = time.time()
14   
15    def clock():
16        return time.time() - stime
17   
18    def sleep(t):
19        return time.sleep(t)
20else:
21    from time import clock
22    from time import sleep
23
24
25class CalcThread:
26    """Threaded calculation class.  Inherit from here and specialize
27    the compute() method to perform the appropriate operations for the
28    class.
29
30    If you specialize the __init__ method be sure to call
31    CalcThread.__init__, passing it the keyword arguments for
32    yieldtime, worktime, update and complete.
33   
34    When defining the compute() method you need to include code which
35    allows the GUI to run.  They are as follows: ::
36
37        self.isquit()          # call frequently to check for interrupts
38        self.update(kw=...)    # call when the GUI could be updated
39        self.complete(kw=...)  # call before exiting compute()
40
41    The update() and complete() calls accept field=value keyword
42    arguments which are passed to the called function.  complete()
43    should be called before exiting the GUI function.  A KeyboardInterrupt
44    event is triggered if the GUI signals that the computation should
45    be halted.
46
47    The following documentation should be included in the description
48    of the derived class.
49
50    The user of this class will call the following: ::
51
52        thread = Work(...,kw=...)  # prepare the work thread.
53        thread.queue(...,kw=...)   # queue a work unit
54        thread.requeue(...,kw=...) # replace work unit on the end of queue
55        thread.reset(...,kw=...)   # reset the queue to the given work unit
56        thread.stop()              # clear the queue and halt
57        thread.interrupt()         # halt the current work unit but continue
58        thread.ready(delay=0.)     # request an update signal after delay
59        thread.isrunning()         # returns true if compute() is running
60
61    Use queue() when all work must be done.  Use requeue() when intermediate
62    work items don't need to be done (e.g., in response to a mouse move
63    event).  Use reset() when the current item doesn't need to be completed
64    before the new event (e.g., in response to a mouse release event).  Use
65    stop() to halt the current and pending computations (e.g., in response to
66    a stop button).
67
68    The methods queue(), requeue() and reset() are proxies for the compute()
69    method in the subclass.  Look there for a description of the arguments.
70    The compute() method can be called directly to run the computation in
71    the main thread, but it should not be called if isrunning() returns true.
72
73    The constructor accepts additional keywords yieldtime=0.01 and
74    worktime=0.01 which determine the cooperative multitasking
75    behaviour.  Yield time is the duration of the sleep period
76    required to give other processes a chance to run.  Work time
77    is the duration between sleep periods.
78
79    Notifying the GUI thread of work in progress and work complete
80    is done with updatefn=updatefn and completefn=completefn arguments
81    to the constructor.  Details of the parameters to the functions
82    depend on the particular calculation class, but they will all
83    be passed as keyword arguments.  Details of how the functions
84    should be implemented vary from framework to framework.
85
86    For wx, something like the following is needed::
87
88        import wx, wx.lib.newevent
89        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()
90
91        # methods in the main window class of your application
92        def __init__():
93            ...
94            # Prepare the calculation in the GUI thread.
95            self.work = Work(completefn=self.CalcComplete)
96            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
97            ...
98            # Bind work queue to a menu event.
99            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
100            ...
101
102        def OnCalcStart(self,event):
103            # Start the work thread from the GUI thread.
104            self.work.queue(...work unit parameters...)
105
106        def CalcComplete(self,**kwargs):
107            # Generate CalcComplete event in the calculation thread.
108            # kwargs contains field1, field2, etc. as defined by
109            # the Work thread class.
110            event = CalcCompleteEvent(**kwargs)
111            wx.PostEvent(self, event)
112
113        def OnCalcComplete(self,event):
114            # Process CalcComplete event in GUI thread.
115            # Use values from event.field1, event.field2 etc. as
116            # defined by the Work thread class to show the results.
117            ...
118    """
119
120    def __init__(self, completefn=None, updatefn=None,
121                 yieldtime=0.01, worktime=0.01):
122        """Prepare the calculator"""
123        self.yieldtime     = yieldtime
124        self.worktime      = worktime
125        self.completefn    = completefn
126        self.updatefn      = updatefn
127        self._interrupting = False
128        self._running      = False
129        self._queue        = []
130        self._lock         = thread.allocate_lock()
131        self._delay        = 1e6
132
133    def queue(self,*args,**kwargs):
134        """Add a work unit to the end of the queue.  See the compute()
135        method for details of the arguments to the work unit."""
136        self._lock.acquire()
137        self._queue.append((args, kwargs))
138        # Cannot do start_new_thread call within the lock
139        self._lock.release()
140        if not self._running:
141            self._time_for_update = clock() + 1e6
142            thread.start_new_thread(self._run, ())
143
144    def requeue(self, *args, **kwargs):
145        """Replace the work unit on the end of the queue.  See the compute()
146        method for details of the arguments to the work unit."""
147        self._lock.acquire()
148        self._queue = self._queue[:-1]
149        self._lock.release()
150        self.queue(*args, **kwargs)
151
152    def reset(self, *args, **kwargs):
153        """Clear the queue and start a new work unit.  See the compute()
154        method for details of the arguments to the work unit."""
155        self.stop()
156        self.queue(*args, **kwargs)
157
158    def stop(self):
159        """Clear the queue and stop the thread.  New items may be
160        queued after stop.  To stop just the current work item, and
161        continue the rest of the queue call the interrupt method"""
162        self._lock.acquire()
163        self._interrupting = True
164        self._queue = []
165        self._lock.release()
166
167    def interrupt(self):
168        """Stop the current work item.  To clear the work queue as
169        well call the stop() method."""
170        self._lock.acquire()
171        self._interrupting = True
172        self._lock.release()
173
174    def isrunning(self):
175        return self._running
176
177    def ready(self, delay=0.):
178        """Ready for another update after delay=t seconds.  Call
179        this for threads which can show intermediate results from
180        long calculations."""
181        self._delay = delay
182        self._lock.acquire()
183        self._time_for_update = clock() + delay
184        # print "setting _time_for_update to ",self._time_for_update
185        self._lock.release()
186
187    def isquit(self):
188        """Check for interrupts.  Should be called frequently to
189        provide user responsiveness.  Also yields to other running
190        threads, which is required for good performance on OS X."""
191
192        # Only called from within the running thread so no need to lock
193        if self._running and self.yieldtime > 0 \
194            and clock() > self._time_for_nap:
195            sleep(self.yieldtime)
196            self._time_for_nap = clock() + self.worktime
197        if self._interrupting:
198            raise KeyboardInterrupt
199
200    def update(self, **kwargs):
201
202        """Update GUI with the lastest results from the current work unit."""
203        if self.updatefn != None and clock() > self._time_for_update:
204            self._lock.acquire()
205            self._time_for_update = clock() + self._delay
206            self._lock.release()
207            self._time_for_update += 1e6  # No more updates
208           
209            self.updatefn(**kwargs)
210            sleep(self.yieldtime)
211            if self._interrupting:
212                raise KeyboardInterrupt
213        else:
214            self.isquit()
215        return
216
217    def complete(self, **kwargs):
218        """Update the GUI with the completed results from a work unit."""
219        if self.completefn != None:
220            self.completefn(**kwargs)
221            sleep(self.yieldtime)
222        return
223
224    def compute(self, *args, **kwargs):
225        """Perform a work unit.  The subclass will provide details of
226        the arguments."""
227        raise NotImplemented, "Calculation thread needs compute method"
228
229    def _run(self):
230        """Internal function to manage the thread."""
231        # The code for condition wait in the threading package is
232        # implemented using polling.  I'll accept for now that the
233        # authors of this code are clever enough that polling is
234        # difficult to avoid.  Rather than polling, I will exit the
235        # thread when the queue is empty and start a new thread when
236        # there is more work to be done.
237        while 1:
238            self._lock.acquire()
239            self._time_for_nap = clock() + self.worktime
240            self._running = True
241            if self._queue == []:
242                break
243            self._interrupting = False
244            args, kwargs = self._queue[0]
245            self._queue = self._queue[1:]
246            self._lock.release()
247            try:
248                self.compute(*args, **kwargs)
249            except KeyboardInterrupt:
250                pass
251            except:
252                traceback.print_exc()
253                #print 'CalcThread exception',
254        self._running = False
255
256
257# ======================================================================
258# Demonstration of calcthread in action
259class CalcDemo(CalcThread):
260    """Example of a calculation thread."""
261    def compute(self, n):
262        total = 0.
263        for i in range(n):
264            self.update(i=i)
265            for j in range(n):
266                self.isquit()
267                total += j
268        self.complete(total=total)
269
270
271class CalcCommandline:
272    """
273        Test method
274    """
275    def __init__(self, n=20000):
276        print thread.get_ident()
277        self.starttime = clock()
278        self.done = False
279        self.work = CalcDemo(completefn=self.complete,
280                             updatefn=self.update, yieldtime=0.001)
281        self.work2 = CalcDemo(completefn=self.complete,
282                             updatefn=self.update)
283        self.work3 = CalcDemo(completefn=self.complete,
284                             updatefn=self.update)
285        self.work.queue(n)
286        self.work2.queue(n)
287        self.work3.queue(n)
288        print "Expect updates from Main every second and from thread every 2.5 seconds"
289        print ""
290        self.work.ready(.5)
291        while not self.done:
292            sleep(1)
293            print "Main thread %d at %.2f" % (thread.get_ident(),
294                                              clock() - self.starttime)
295
296    def update(self, i=0):
297        print "Update i=%d from thread %d at %.2f" % (i, thread.get_ident(),
298                                                      clock() - self.starttime)
299        self.work.ready(2.5)
300
301    def complete(self, total=0.0):
302        print "Complete total=%g from thread %d at %.2f" % (total,
303                                                    thread.get_ident(),
304                                                    clock() - self.starttime)
305        self.done = True
Note: See TracBrowser for help on using the repository browser.