source: sasview/src/data_util/calcthread.py @ a7d2e42

ESS_GUI, ESS_GUI_Docs, ESS_GUI_batch_fitting, ESS_GUI_bumps_abstraction, ESS_GUI_iss1116, ESS_GUI_iss879, ESS_GUI_iss959, ESS_GUI_opencl, ESS_GUI_ordering, ESS_GUI_sync_sascalc, costrafo411, magnetic_scatt, release-4.1.1, release-4.1.2, release-4.2.2, release_4.0.1, ticket-1009, ticket-1094-headless, ticket-1242-2d-resolution, ticket-1243, ticket-1249, ticket885, unittest-saveload
Last change on this file since a7d2e42 was a7d2e42, checked in by Mathieu Doucet <doucetm@…>, 11 years ago

Move data_util module so directory struct matches import

  • Property mode set to 100644
File size: 11.4 KB
Line 
1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6
7import thread
8import traceback
9import sys
10
# Select the timer used to schedule yields and GUI updates.  clock() returns
# seconds as a float; only the difference between two readings is meaningful.
try:
    # Preferred where available (Python 3.3+).  time.clock no longer exists
    # on Python 3.8+, so importing it unconditionally would fail there.
    from time import perf_counter as clock
except ImportError:
    if sys.platform.count("darwin") > 0:
        import time
        stime = time.time()

        def clock():
            """Return the seconds elapsed since this module was imported."""
            return time.time() - stime
    else:
        from time import clock

# sleep() is the standard library function on every platform.
from time import sleep
23
24
class CalcThread:
    """Threaded calculation class.  Inherit from here and specialize
    the compute() method to perform the appropriate operations for the
    class.

    If you specialize the __init__ method be sure to call
    CalcThread.__init__, passing it the keyword arguments for
    completefn, updatefn, yieldtime and worktime.

    When defining the compute() method you need to include code which
    allows the GUI to run.  They are as follows:
        self.isquit()          call frequently to check for interrupts
        self.update(kw=...)    call when the GUI could be updated
        self.complete(kw=...)  call before exiting compute()
    The update() and complete() calls accept field=value keyword
    arguments which are passed to the called function.  complete()
    should be called before exiting the GUI function.  A KeyboardInterrupt
    event is triggered if the GUI signals that the computation should
    be halted.

    The following documentation should be included in the description
    of the derived class.

    The user of this class will call the following:

        thread = Work(...,kw=...)  prepare the work thread.
        thread.queue(...,kw=...)   queue a work unit
        thread.requeue(...,kw=...) replace work unit on the end of queue
        thread.reset(...,kw=...)   reset the queue to the given work unit
        thread.stop()              clear the queue and halt
        thread.interrupt()         halt the current work unit but continue
        thread.ready(delay=0.)     request an update signal after delay
        thread.isrunning()         returns true while a worker is active

    Use queue() when all work must be done.  Use requeue() when intermediate
    work items don't need to be done (e.g., in response to a mouse move
    event).  Use reset() when the current item doesn't need to be completed
    before the new event (e.g., in response to a mouse release event).  Use
    stop() to halt the current and pending computations (e.g., in response to
    a stop button).

    The methods queue(), requeue() and reset() are proxies for the compute()
    method in the subclass.  Look there for a description of the arguments.
    The compute() method can be called directly to run the computation in
    the main thread, but it should not be called if isrunning() returns true.

    The constructor accepts additional keywords yieldtime=0.01 and
    worktime=0.01 which determine the cooperative multitasking
    behaviour.  Yield time is the duration of the sleep period
    required to give other processes a chance to run.  Work time
    is the duration between sleep periods.

    Notifying the GUI thread of work in progress and work complete
    is done with updatefn=updatefn and completefn=completefn arguments
    to the constructor.  Details of the parameters to the functions
    depend on the particular calculation class, but they will all
    be passed as keyword arguments.  Details of how the functions
    should be implemented vary from framework to framework.

    For wx, something like the following is needed:

        import wx, wx.lib.newevent
        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()

        # methods in the main window class of your application
        def __init__():
            ...
            # Prepare the calculation in the GUI thread.
            self.work = Work(completefn=self.CalcComplete)
            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
            ...
            # Bind work queue to a menu event.
            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
            ...

        def OnCalcStart(self,event):
            # Start the work thread from the GUI thread.
            self.work.queue(...work unit parameters...)

        def CalcComplete(self,**kwargs):
            # Generate CalcComplete event in the calculation thread.
            # kwargs contains field1, field2, etc. as defined by
            # the Work thread class.
            event = CalcCompleteEvent(**kwargs)
            wx.PostEvent(self, event)

        def OnCalcComplete(self,event):
            # Process CalcComplete event in GUI thread.
            # Use values from event.field1, event.field2 etc. as
            # defined by the Work thread class to show the results.
            ...
    """

    def __init__(self, completefn=None, updatefn=None,
                 yieldtime=0.01, worktime=0.01):
        """Prepare the calculator.

        completefn -- callable invoked by complete() with its keyword
                      arguments, or None to skip completion notices
        updatefn   -- callable invoked by update() with its keyword
                      arguments, or None to skip progress notices
        yieldtime  -- seconds to sleep each time the worker yields
        worktime   -- seconds of work between two yields
        """
        self.yieldtime = yieldtime
        self.worktime = worktime
        self.completefn = completefn
        self.updatefn = updatefn
        self._interrupting = False
        self._running = False
        self._queue = []
        self._lock = thread.allocate_lock()
        self._delay = 1e6
        # Define the scheduling times up front so that update() and isquit()
        # also work when compute() is called directly, before anything has
        # been queued.  No update is sent until ready() asks for one.
        self._time_for_update = clock() + 1e6
        self._time_for_nap = clock() + worktime

    def queue(self, *args, **kwargs):
        """Add a work unit to the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            self._queue.append((args, kwargs))
            # Claim the worker slot while holding the lock so that two
            # callers cannot both decide to start a thread, and so that a
            # worker which is about to exit cannot strand this work unit.
            start_worker = not self._running
            if start_worker:
                self._running = True
                self._time_for_update = clock() + 1e6
        # Cannot do start_new_thread call within the lock
        if start_worker:
            try:
                thread.start_new_thread(self._run, ())
            except Exception:
                # The worker never started; give the slot back so that a
                # later queue() call can try again.
                with self._lock:
                    self._running = False
                raise

    def requeue(self, *args, **kwargs):
        """Replace the work unit on the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        with self._lock:
            # Drop the last pending unit, if there is one.
            del self._queue[-1:]
        self.queue(*args, **kwargs)

    def reset(self, *args, **kwargs):
        """Clear the queue and start a new work unit.  See the compute()
        method for details of the arguments to the work unit."""
        self.stop()
        self.queue(*args, **kwargs)

    def stop(self):
        """Clear the queue and stop the thread.  New items may be
        queued after stop.  To stop just the current work item, and
        continue the rest of the queue call the interrupt method"""
        with self._lock:
            self._interrupting = True
            self._queue = []

    def interrupt(self):
        """Stop the current work item.  To clear the work queue as
        well call the stop() method."""
        with self._lock:
            self._interrupting = True

    def isrunning(self):
        """Return True while a worker thread is claimed or active."""
        return self._running

    def ready(self, delay=0.):
        """Ready for another update after delay=t seconds.  Call
        this for threads which can show intermediate results from
        long calculations."""
        with self._lock:
            self._delay = delay
            self._time_for_update = clock() + delay

    def isquit(self):
        """Check for interrupts.  Should be called frequently to
        provide user responsiveness.  Also yields to other running
        threads, which is required for good performance on OS X.

        Raises KeyboardInterrupt if stop() or interrupt() was requested.
        """
        # Only called from within the running thread so no need to lock
        if self._running and self.yieldtime > 0 \
                and clock() > self._time_for_nap:
            sleep(self.yieldtime)
            self._time_for_nap = clock() + self.worktime
        if self._interrupting:
            raise KeyboardInterrupt

    def update(self, **kwargs):
        """Update GUI with the latest results from the current work unit.

        updatefn is only called when one is set and the time requested
        through ready() has passed; otherwise this just behaves like
        isquit().  Raises KeyboardInterrupt if an interrupt was requested.
        """
        if self.updatefn is not None and clock() > self._time_for_update:
            with self._lock:
                # No more updates until ready() is called again.  Done in
                # one step under the lock so that a concurrent ready() is
                # never pushed back by a late increment.
                self._time_for_update = clock() + self._delay + 1e6

            self.updatefn(**kwargs)
            sleep(self.yieldtime)
            if self._interrupting:
                raise KeyboardInterrupt
        else:
            self.isquit()
        return

    def complete(self, **kwargs):
        """Update the GUI with the completed results from a work unit."""
        if self.completefn is not None:
            self.completefn(**kwargs)
            sleep(self.yieldtime)
        return

    def compute(self, *args, **kwargs):
        """Perform a work unit.  The subclass will provide details of
        the arguments.

        Raises NotImplementedError unless overridden by the subclass.
        """
        raise NotImplementedError("Calculation thread needs compute method")

    def _run(self):
        """Internal function to manage the thread."""
        # The code for condition wait in the threading package is
        # implemented using polling.  I'll accept for now that the
        # authors of this code are clever enough that polling is
        # difficult to avoid.  Rather than polling, I will exit the
        # thread when the queue is empty and start a new thread when
        # there is more work to be done.
        while True:
            with self._lock:
                self._time_for_nap = clock() + self.worktime
                self._running = True
                if not self._queue:
                    # Go idle while still holding the lock: queue() then
                    # either sees a live worker before this check or starts
                    # a fresh one after it.  Leaving through the with block
                    # also guarantees that the lock is released.
                    self._running = False
                    return
                self._interrupting = False
                args, kwargs = self._queue.pop(0)
            try:
                self.compute(*args, **kwargs)
            except KeyboardInterrupt:
                # Raised by isquit()/update() on stop() or interrupt();
                # abandon this unit and move on to the next one.
                pass
            except:
                # Deliberately broad: report the failure and keep the
                # worker alive for the remaining work units.
                traceback.print_exc()
253
254
255# ======================================================================
256# Demonstration of calcthread in action
class CalcDemo(CalcThread):
    """Sample worker used to exercise the CalcThread machinery."""
    def compute(self, n):
        """Add up 0..n-1, n times over, and hand the sum to complete().

        update(i=...) is called once per outer pass and isquit() once per
        inner step, so the work unit can report progress and be interrupted.
        """
        running_sum = 0.
        for outer in range(n):
            # Offer a progress report at the start of every outer pass.
            self.update(i=outer)
            for inner in range(n):
                # Check for an interrupt before each addition.
                self.isquit()
                running_sum = running_sum + inner
        self.complete(total=running_sum)
267
268
class CalcCommandline:
    """
        Command-line exercise of CalcDemo: queues one work unit on each of
        three workers and prints progress until the first one completes.
    """
    def __init__(self, n=20000):
        """Start the workers with problem size n and wait for completion."""
        print(thread.get_ident())
        self.starttime = clock()
        self.done = False
        # All three workers report through the same pair of callbacks.
        callbacks = dict(completefn=self.complete, updatefn=self.update)
        self.work = CalcDemo(yieldtime=0.001, **callbacks)
        self.work2 = CalcDemo(**callbacks)
        self.work3 = CalcDemo(**callbacks)
        for worker in (self.work, self.work2, self.work3):
            worker.queue(n)
        print("Expect updates from Main every second and from thread every 2.5 seconds")
        print("")
        self.work.ready(.5)
        # complete() sets self.done from a worker thread.
        while not self.done:
            sleep(1)
            ident = thread.get_ident()
            print("Main thread %d at %.2f" % (ident, clock() - self.starttime))

    def update(self, i=0):
        """Print a progress line and ask self.work for another update."""
        ident = thread.get_ident()
        elapsed = clock() - self.starttime
        print("Update i=%d from thread %d at %.2f" % (i, ident, elapsed))
        self.work.ready(2.5)

    def complete(self, total=0.0):
        """Print the final total and let the waiting loop finish."""
        ident = thread.get_ident()
        elapsed = clock() - self.starttime
        print("Complete total=%g from thread %d at %.2f" % (total, ident, elapsed))
        self.done = True
Note: See TracBrowser for help on using the repository browser.