source: sasview/sansutil/calcthread.py @ dac6869

ESS_GUIESS_GUI_DocsESS_GUI_batch_fittingESS_GUI_bumps_abstractionESS_GUI_iss1116ESS_GUI_iss879ESS_GUI_iss959ESS_GUI_openclESS_GUI_orderingESS_GUI_sync_sascalccostrafo411magnetic_scattrelease-4.1.1release-4.1.2release-4.2.2release_4.0.1ticket-1009ticket-1094-headlessticket-1242-2d-resolutionticket-1243ticket-1249ticket885unittest-saveload
Last change on this file since dac6869 was 4025019, checked in by Mathieu Doucet <doucetm@…>, 13 years ago

Adding util and park

  • Property mode set to 100644
File size: 11.4 KB
Line 
1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6
7import thread, traceback
8
9import sys
10
11if sys.platform.count("darwin")>0:
12    import time
13    stime = time.time()
14    def clock():
15        return time.time()-stime
16    def sleep(t):
17        return time.sleep(t)   
18else:
19    from time import clock
20    from time import sleep
21
22class CalcThread:
23    """Threaded calculation class.  Inherit from here and specialize
24    the compute() method to perform the appropriate operations for the
25    class.
26
27    If you specialize the __init__ method be sure to call
28    CalcThread.__init__, passing it the keyword arguments for
29    yieldtime, worktime, update and complete.
30   
31    When defining the compute() method you need to include code which
32    allows the GUI to run.  They are as follows:
33        self.isquit()          call frequently to check for interrupts
34        self.update(kw=...)    call when the GUI could be updated
35        self.complete(kw=...)  call before exiting compute()
36    The update() and complete() calls accept field=value keyword
37    arguments which are passed to the called function.  complete()
38    should be called before exiting the GUI function.  A KeyboardInterrupt
39    event is triggered if the GUI signals that the computation should
40    be halted.
41
42    The following documentation should be included in the description
43    of the derived class.
44
45    The user of this class will call the following:
46
47        thread = Work(...,kw=...)  prepare the work thread.
48        thread.queue(...,kw=...)   queue a work unit
49        thread.requeue(...,kw=...) replace work unit on the end of queue
50        thread.reset(...,kw=...)   reset the queue to the given work unit
51        thread.stop()              clear the queue and halt
52        thread.interrupt()         halt the current work unit but continue
53        thread.ready(delay=0.)     request an update signal after delay
54        thread.isrunning()         returns true if compute() is running
55
56    Use queue() when all work must be done.  Use requeue() when intermediate
57    work items don't need to be done (e.g., in response to a mouse move
58    event).  Use reset() when the current item doesn't need to be completed
59    before the new event (e.g., in response to a mouse release event).  Use
60    stop() to halt the current and pending computations (e.g., in response to
61    a stop button).
62   
63    The methods queue(), requeue() and reset() are proxies for the compute()
64    method in the subclass.  Look there for a description of the arguments.
65    The compute() method can be called directly to run the computation in
66    the main thread, but it should not be called if isrunning() returns true.
67
68    The constructor accepts additional keywords yieldtime=0.01 and
69    worktime=0.01 which determine the cooperative multitasking
70    behaviour.  Yield time is the duration of the sleep period
71    required to give other processes a chance to run.  Work time
72    is the duration between sleep periods.
73
74    Notifying the GUI thread of work in progress and work complete
75    is done with updatefn=updatefn and completefn=completefn arguments
76    to the constructor.  Details of the parameters to the functions
77    depend on the particular calculation class, but they will all
78    be passed as keyword arguments.  Details of how the functions
79    should be implemented vary from framework to framework.
80
81    For wx, something like the following is needed:
82
83        import wx, wx.lib.newevent
84        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()
85
86        # methods in the main window class of your application
87        def __init__():
88            ...
89            # Prepare the calculation in the GUI thread.
90            self.work = Work(completefn=self.CalcComplete)
91            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
92            ...
93            # Bind work queue to a menu event.
94            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
95            ...
96
97        def OnCalcStart(self,event):
98            # Start the work thread from the GUI thread.
99            self.work.queue(...work unit parameters...)
100
101        def CalcComplete(self,**kwargs):
102            # Generate CalcComplete event in the calculation thread.
103            # kwargs contains field1, field2, etc. as defined by
104            # the Work thread class.
105            event = CalcCompleteEvent(**kwargs)
106            wx.PostEvent(self, event)
107
108        def OnCalcComplete(self,event):
109            # Process CalcComplete event in GUI thread.
110            # Use values from event.field1, event.field2 etc. as
111            # defined by the Work thread class to show the results.
112            ...
113    """
114
115    def __init__(self,
116                 completefn = None,
117                 updatefn   = None,
118                 yieldtime  = 0.01,
119                 worktime   = 0.01
120                 ):
121        """Prepare the calculator"""
122        self.yieldtime     = yieldtime
123        self.worktime      = worktime
124        self.completefn    = completefn
125        self.updatefn      = updatefn
126        self._interrupting = False
127        self._running      = False
128        self._queue        = []
129        self._lock         = thread.allocate_lock()
130        self._delay        = 1e6
131
132    def queue(self,*args,**kwargs):
133        """Add a work unit to the end of the queue.  See the compute()
134        method for details of the arguments to the work unit."""
135        self._lock.acquire()
136        self._queue.append((args,kwargs))
137        # Cannot do start_new_thread call within the lock
138        self._lock.release()
139        if not self._running:
140            self._time_for_update = clock()+1e6
141            thread.start_new_thread(self._run,())
142
143    def requeue(self,*args,**kwargs):
144        """Replace the work unit on the end of the queue.  See the compute()
145        method for details of the arguments to the work unit."""
146        self._lock.acquire()
147        self._queue = self._queue[:-1]
148        self._lock.release()
149        self.queue(*args,**kwargs)
150
151    def reset(self,*args,**kwargs):
152        """Clear the queue and start a new work unit.  See the compute()
153        method for details of the arguments to the work unit."""
154        self.stop()
155        self.queue(*args,**kwargs)
156
157    def stop(self):
158        """Clear the queue and stop the thread.  New items may be
159        queued after stop.  To stop just the current work item, and
160        continue the rest of the queue call the interrupt method"""
161        self._lock.acquire()
162        self._interrupting = True
163        self._queue = []
164        self._lock.release()
165
166    def interrupt(self):
167        """Stop the current work item.  To clear the work queue as
168        well call the stop() method."""
169        self._lock.acquire()
170        self._interrupting = True
171        self._lock.release()
172
173    def isrunning(self): return self._running
174
175    def ready(self, delay=0.):
176        """Ready for another update after delay=t seconds.  Call
177        this for threads which can show intermediate results from
178        long calculations."""
179        self._delay = delay
180        self._lock.acquire()
181        self._time_for_update = clock() + delay
182        # print "setting _time_for_update to ",self._time_for_update
183        self._lock.release()
184
185    def isquit(self):
186        """Check for interrupts.  Should be called frequently to
187        provide user responsiveness.  Also yields to other running
188        threads, which is required for good performance on OS X."""
189
190        # Only called from within the running thread so no need to lock
191        if self._running and self.yieldtime>0 and clock()>self._time_for_nap:
192            # print "sharing"
193            sleep(self.yieldtime)
194            self._time_for_nap = clock() + self.worktime
195        if self._interrupting: raise KeyboardInterrupt
196
197    def update(self,**kwargs):
198
199        """Update GUI with the lastest results from the current work unit."""
200        if self.updatefn != None and clock() > self._time_for_update:
201            self._lock.acquire()
202            self._time_for_update = clock() + self._delay
203            self._lock.release()
204            self._time_for_update += 1e6  # No more updates
205           
206            self.updatefn(**kwargs)
207            sleep(self.yieldtime)
208            if self._interrupting: raise KeyboardInterrupt
209        else:
210            self.isquit()
211        return
212
213    def complete(self,**kwargs):
214        """Update the GUI with the completed results from a work unit."""
215        if self.completefn != None:
216            self.completefn(**kwargs)
217            sleep(self.yieldtime)
218        return
219
220    def compute(self,*args,**kwargs):
221        """Perform a work unit.  The subclass will provide details of
222        the arguments."""
223        raise NotImplemented, "Calculation thread needs compute method"
224
225    def _run(self):
226        """Internal function to manage the thread."""
227        # The code for condition wait in the threading package is
228        # implemented using polling.  I'll accept for now that the
229        # authors of this code are clever enough that polling is
230        # difficult to avoid.  Rather than polling, I will exit the
231        # thread when the queue is empty and start a new thread when
232        # there is more work to be done.
233        while 1:
234            self._lock.acquire()
235            # print "lock aquired"
236            self._time_for_nap = clock()+self.worktime
237            self._running = True
238            if self._queue == []: break
239            self._interrupting = False
240            args,kwargs = self._queue[0]
241            self._queue = self._queue[1:]
242            self._lock.release()
243            # print "lock released"
244            try:
245                self.compute(*args,**kwargs)
246            except KeyboardInterrupt:
247                pass
248            except:
249                traceback.print_exc()
250                #print 'CalcThread exception',
251        self._running = False
252
253# ======================================================================
254# Demonstration of calcthread in action
255class CalcDemo(CalcThread):
256    """Example of a calculation thread."""
257    def compute(self,n):
258        total = 0.
259        for i in range(n):
260            self.update(i=i)
261            for j in range(n):
262                self.isquit()
263                total += j
264        self.complete(total=total)
265
266class CalcCommandline:
267    def __init__(self, n=20000):
268        print thread.get_ident()
269        self.starttime = clock()
270        self.done = False
271        self.work = CalcDemo(completefn=self.complete,
272                             updatefn=self.update, yieldtime=0.001)
273        self.work2 = CalcDemo(completefn=self.complete,
274                             updatefn=self.update)
275        self.work3 = CalcDemo(completefn=self.complete,
276                             updatefn=self.update)
277        self.work.queue(n)
278        self.work2.queue(n)
279        self.work3.queue(n)
280        print "Expect updates from Main every second and from thread every 2.5 seconds"
281        print ""
282        self.work.ready(.5)
283        while not self.done:
284            sleep(1)
285            print "Main thread %d at %.2f"%(thread.get_ident(),clock()-self.starttime)
286
287    def update(self,i=0):
288        print "Update i=%d from thread %d at %.2f"%(i,thread.get_ident(),clock()-self.starttime)
289        self.work.ready(2.5)
290
291    def complete(self,total=0.0):
292        print "Complete total=%g from thread %d at %.2f"%(total,thread.get_ident(),clock()-self.starttime)
293        self.done = True
294
295if __name__ == "__main__":
296    CalcCommandline()
297
298# version
299__id__ = "$Id: calcthread.py 249 2007-06-15 17:03:01Z ziwen $"
300
301# End of file
Note: See TracBrowser for help on using the repository browser.