source: sasview/sansview/perspectives/fitting/calcthread.py @ cfc0913

ESS_GUIESS_GUI_DocsESS_GUI_batch_fittingESS_GUI_bumps_abstractionESS_GUI_iss1116ESS_GUI_iss879ESS_GUI_iss959ESS_GUI_openclESS_GUI_orderingESS_GUI_sync_sascalccostrafo411magnetic_scattrelease-4.1.1release-4.1.2release-4.2.2release_4.0.1ticket-1009ticket-1094-headlessticket-1242-2d-resolutionticket-1243ticket-1249ticket885unittest-saveload
Last change on this file since cfc0913 was bb18ef1, checked in by Gervaise Alina <gervyh@…>, 15 years ago

some changes of sansview review

  • Property mode set to 100644
File size: 11.3 KB
Line 
1# This program is public domain
2
3## \file
4#  \brief Abstract class for defining calculation threads.
5#
6
7import thread, traceback
8
9import sys
10if sys.platform.count("darwin")>0:
11    import mactime as time
12else:
13    import time
14
class CalcThread:
    """Threaded calculation class.  Inherit from here and specialize
    the compute() method to perform the appropriate operations for the
    class.

    If you specialize the __init__ method be sure to call
    CalcThread.__init__, passing it the keyword arguments for
    yieldtime, worktime, update and complete.

    When defining the compute() method you need to include code which
    allows the GUI to run.  They are as follows:
        self.isquit()          call frequently to check for interrupts
        self.update(kw=...)    call when the GUI could be updated
        self.complete(kw=...)  call before exiting compute()
    The update() and complete() calls accept field=value keyword
    arguments which are passed to the called function.  complete()
    should be called before exiting compute().  A KeyboardInterrupt
    is raised inside compute() (by isquit() or update()) if the GUI
    signals that the computation should be halted.

    The following documentation should be included in the description
    of the derived class.

    The user of this class will call the following:

        thread = Work(...,kw=...)  prepare the work thread.
        thread.queue(...,kw=...)   queue a work unit
        thread.requeue(...,kw=...) replace work unit on the end of queue
        thread.reset(...,kw=...)   reset the queue to the given work unit
        thread.stop()              clear the queue and halt
        thread.interrupt()         halt the current work unit but continue
        thread.ready(delay=0.)     request an update signal after delay
        thread.isrunning()         returns true if compute() is running

    Use queue() when all work must be done.  Use requeue() when intermediate
    work items don't need to be done (e.g., in response to a mouse move
    event).  Use reset() when the current item doesn't need to be completed
    before the new event (e.g., in response to a mouse release event).  Use
    stop() to halt the current and pending computations (e.g., in response to
    a stop button).

    The methods queue(), requeue() and reset() are proxies for the compute()
    method in the subclass.  Look there for a description of the arguments.
    The compute() method can be called directly to run the computation in
    the main thread, but it should not be called if isrunning() returns true.

    The constructor accepts additional keywords yieldtime=0.01 and
    worktime=0.01 which determine the cooperative multitasking
    behaviour.  Yield time is the duration of the sleep period
    required to give other processes a chance to run.  Work time
    is the duration between sleep periods.

    Notifying the GUI thread of work in progress and work complete
    is done with updatefn=updatefn and completefn=completefn arguments
    to the constructor.  Details of the parameters to the functions
    depend on the particular calculation class, but they will all
    be passed as keyword arguments.  Details of how the functions
    should be implemented vary from framework to framework.

    For wx, something like the following is needed:

        import wx, wx.lib.newevent
        (CalcCompleteEvent, EVT_CALC_COMPLETE) = wx.lib.newevent.NewEvent()

        # methods in the main window class of your application
        def __init__():
            ...
            # Prepare the calculation in the GUI thread.
            self.work = Work(completefn=self.CalcComplete)
            self.Bind(EVT_CALC_COMPLETE, self.OnCalcComplete)
            ...
            # Bind work queue to a menu event.
            self.Bind(wx.EVT_MENU, self.OnCalcStart, id=idCALCSTART)
            ...

        def OnCalcStart(self,event):
            # Start the work thread from the GUI thread.
            self.work.queue(...work unit parameters...)

        def CalcComplete(self,**kwargs):
            # Generate CalcComplete event in the calculation thread.
            # kwargs contains field1, field2, etc. as defined by
            # the Work thread class.
            event = CalcCompleteEvent(**kwargs)
            wx.PostEvent(self, event)

        def OnCalcComplete(self,event):
            # Process CalcComplete event in GUI thread.
            # Use values from event.field1, event.field2 etc. as
            # defined by the Work thread class to show the results.
            ...
    """

    def __init__(self,
                 completefn=None,
                 updatefn=None,
                 yieldtime=0.01,
                 worktime=0.01
                 ):
        """Prepare the calculator.

        completefn -- callable invoked by complete() with keyword arguments,
                      or None for no notification.
        updatefn   -- callable invoked by update() with keyword arguments,
                      or None for no notification.
        yieldtime  -- seconds to sleep when yielding to other threads.
        worktime   -- seconds of work between two yields.
        """
        self.yieldtime = yieldtime
        self.worktime = worktime
        self.completefn = completefn
        self.updatefn = updatefn
        self._interrupting = False
        self._running = False
        self._queue = []
        self._lock = thread.allocate_lock()
        self._delay = 1e6
        # Initialise both timers here so that update() and isquit() also
        # work when compute() is called directly, without queue()/_run().
        now = time.clock()
        self._time_for_update = now + 1e6  # no updates until ready()
        self._time_for_nap = now + worktime

    def queue(self, *args, **kwargs):
        """Add a work unit to the end of the queue.  See the compute()
        method for details of the arguments to the work unit.

        A worker thread is started if none is currently running."""
        self._lock.acquire()
        try:
            self._queue.append((args, kwargs))
            # Decide under the lock whether a worker is needed and claim
            # the running flag right away, so that two quick calls to
            # queue() cannot both start a worker.
            start_worker = not self._running
            if start_worker:
                self._running = True
                self._time_for_update = time.clock() + 1e6
        finally:
            self._lock.release()
        # Cannot do start_new_thread call within the lock
        if start_worker:
            try:
                thread.start_new_thread(self._run, ())
            except Exception:
                # No worker exists; let a later queue() call try again.
                self._running = False
                raise

    def requeue(self, *args, **kwargs):
        """Replace the work unit on the end of the queue.  See the compute()
        method for details of the arguments to the work unit."""
        self._lock.acquire()
        try:
            self._queue = self._queue[:-1]
        finally:
            self._lock.release()
        self.queue(*args, **kwargs)

    def reset(self, *args, **kwargs):
        """Clear the queue and start a new work unit.  See the compute()
        method for details of the arguments to the work unit."""
        self.stop()
        self.queue(*args, **kwargs)

    def stop(self):
        """Clear the queue and stop the thread.  New items may be
        queued after stop.  To stop just the current work item, and
        continue the rest of the queue call the interrupt method"""
        self._lock.acquire()
        try:
            self._interrupting = True
            self._queue = []
        finally:
            self._lock.release()

    def interrupt(self):
        """Stop the current work item.  To clear the work queue as
        well call the stop() method."""
        self._lock.acquire()
        try:
            self._interrupting = True
        finally:
            self._lock.release()

    def isrunning(self):
        """Return true while a worker thread is processing the queue."""
        return self._running

    def ready(self, delay=0.):
        """Ready for another update after delay=t seconds.  Call
        this for threads which can show intermediate results from
        long calculations."""
        self._lock.acquire()
        try:
            self._delay = delay
            self._time_for_update = time.clock() + delay
        finally:
            self._lock.release()

    def isquit(self):
        """Check for interrupts.  Should be called frequently to
        provide user responsiveness.  Also yields to other running
        threads, which is required for good performance on OS X.

        Raises KeyboardInterrupt if stop() or interrupt() was called."""
        # Only called from within the running thread so no need to lock
        if (self._running and self.yieldtime > 0
                and time.clock() > self._time_for_nap):
            time.sleep(self.yieldtime)
            self._time_for_nap = time.clock() + self.worktime
        if self._interrupting:
            raise KeyboardInterrupt

    def update(self, **kwargs):
        """Update GUI with the latest results from the current work unit.

        updatefn is only called once the time requested through ready()
        has passed; otherwise this just checks for interrupts.  Raises
        KeyboardInterrupt if stop() or interrupt() was called."""
        if self.updatefn is not None and time.clock() > self._time_for_update:
            self._lock.acquire()
            try:
                # Push the next update far into the future: no more
                # updates until ready() is called again.  Done in a single
                # assignment under the lock so that a concurrent ready()
                # cannot be overwritten half-way.
                self._time_for_update = time.clock() + self._delay + 1e6
            finally:
                self._lock.release()
            # The lock must not be held here: updatefn may call ready().
            self.updatefn(**kwargs)
            time.sleep(self.yieldtime)
            if self._interrupting:
                raise KeyboardInterrupt
        else:
            self.isquit()
        return

    def complete(self, **kwargs):
        """Update the GUI with the completed results from a work unit."""
        if self.completefn is not None:
            self.completefn(**kwargs)
            time.sleep(self.yieldtime)
        return

    def compute(self, *args, **kwargs):
        """Perform a work unit.  The subclass will provide details of
        the arguments.

        Raises NotImplementedError unless overridden."""
        raise NotImplementedError("Calculation thread needs compute method")

    def _run(self):
        """Internal function to manage the thread."""
        # The code for condition wait in the threading package is
        # implemented using polling.  I'll accept for now that the
        # authors of this code are clever enough that polling is
        # difficult to avoid.  Rather than polling, I will exit the
        # thread when the queue is empty and start a new thread when
        # there is more work to be done.
        while True:
            self._lock.acquire()
            try:
                self._time_for_nap = time.clock() + self.worktime
                if not self._queue:
                    # Clear the flag while still holding the lock so that
                    # queue() reliably sees that a new worker is needed;
                    # the finally clause releases the lock on the way out.
                    self._running = False
                    return
                self._running = True
                self._interrupting = False
                args, kwargs = self._queue[0]
                self._queue = self._queue[1:]
            finally:
                self._lock.release()
            try:
                self.compute(*args, **kwargs)
            except KeyboardInterrupt:
                # Raised by isquit()/update() on stop() or interrupt().
                pass
            except:
                # Thread boundary: report the failure and keep serving
                # the remaining work units.
                traceback.print_exc()
245
246# ======================================================================
247# Demonstration of calcthread in action
class CalcDemo(CalcThread):
    """Demonstration worker: sums 0..n-1 a total of n times.

    update(i=...) is offered once per outer pass, isquit() is polled on
    every inner step, and the float sum is reported through
    complete(total=...).
    """
    def compute(self, n):
        """Run the n-by-n summation for one work unit."""
        running_sum = 0.
        for outer in range(n):
            # Offer the GUI an intermediate result for this pass.
            self.update(i=outer)
            for term in range(n):
                # Stay responsive to stop()/interrupt() requests.
                self.isquit()
                running_sum += term
        self.complete(total=running_sum)
258
class CalcCommandline:
    """Command-line driver that runs three CalcDemo workers.

    The constructor queues one work unit of size n on each worker and
    then prints a heartbeat from the calling thread once a second until
    one of the workers reports completion.
    """
    def __init__(self, n=20000):
        """Start the workers and block until the first one completes."""
        print(thread.get_ident())
        self.starttime = time.clock()
        self.done = False

        # All three workers report back through the same pair of callbacks.
        callbacks = dict(completefn=self.complete, updatefn=self.update)
        self.work = CalcDemo(yieldtime=0.001, **callbacks)
        self.work2 = CalcDemo(**callbacks)
        self.work3 = CalcDemo(**callbacks)
        for worker in (self.work, self.work2, self.work3):
            worker.queue(n)

        print("Expect updates from Main every second and from thread every 2.5 seconds")
        print("")
        self.work.ready(.5)
        while not self.done:
            time.sleep(1)
            ident = thread.get_ident()
            elapsed = time.clock() - self.starttime
            print("Main thread %d at %.2f" % (ident, elapsed))

    def update(self, i=0):
        """Print an intermediate result and ask for the next one in 2.5 s."""
        ident = thread.get_ident()
        elapsed = time.clock() - self.starttime
        print("Update i=%d from thread %d at %.2f" % (i, ident, elapsed))
        self.work.ready(2.5)

    def complete(self, total=0.0):
        """Print the final total and let the main loop finish."""
        ident = thread.get_ident()
        elapsed = time.clock() - self.starttime
        print("Complete total=%g from thread %d at %.2f" % (total, ident, elapsed))
        self.done = True
287
# Run the command-line demonstration when this file is executed as a
# script; the CalcCommandline constructor blocks until a worker completes.
if __name__ == "__main__":
    CalcCommandline()

# version (revision-control keyword string for this file)
__id__ = "$Id: calcthread.py 249 2007-06-15 17:03:01Z ziwen $"
293
294# End of file
Note: See TracBrowser for help on using the repository browser.