source: sasview/park-1.2.1/park/pmap.py @ a8d882a

ESS_GUIESS_GUI_DocsESS_GUI_batch_fittingESS_GUI_bumps_abstractionESS_GUI_iss1116ESS_GUI_iss879ESS_GUI_iss959ESS_GUI_openclESS_GUI_orderingESS_GUI_sync_sascalccostrafo411magnetic_scattrelease-4.1.1release-4.1.2release-4.2.2release_4.0.1ticket-1009ticket-1094-headlessticket-1242-2d-resolutionticket-1243ticket-1249ticket885unittest-saveload
Last change on this file since a8d882a was 9d6d5ba, checked in by pkienzle, 11 years ago

code cleanup

  • Property mode set to 100644
File size: 7.0 KB
Line 
1"""
2Parallel map-reduce implementation using threads.
3"""
4
5import traceback
6import thread
7
class Collector(object):
    """
    Interface for the accumulator (reduce) side of a map-reduce job.

    Every method here is a placeholder that does nothing and returns
    None; a concrete collector supplies its own versions.
    """

    def __call__(self, part):
        """Accept the next part and fold it into the accumulated result."""
        return None

    def finalize(self):
        """Signal that every part has been accumulated."""
        return None

    def error(self, part, msg):
        """
        Report an exception seen while executing map or reduce.

        The collector may adjust its accumulated result so that it
        reflects the failure.
        """
        return None
22
class Mapper(object):
    """
    Interface for the map side of a map-reduce job.

    Both methods are placeholders that do nothing and return None; a
    concrete mapper supplies its own versions.
    """

    def __call__(self, value):
        """Evaluate one part."""
        return None

    def abort(self):
        """Ask the mapper to stop."""
        return None
31
def pmap(mapper, inputs):
    """
    Lazily apply mapper to each element of inputs.

    This is the serial stand-in for a parallel iterator: each result is
    yielded as soon as it has been computed.  A parallel version gives
    no guarantee that results come back in input order, so callers must
    not rely on the ordering.
    """
    for value in inputs:
        result = mapper(value)
        yield result
43
def preduce(collector, outputs):
    """
    Hand every item of outputs to collector, one call per item.

    Returns the collector itself once the sequence is exhausted.
    """
    for result in outputs:
        collector(result)
    else:
        # Reached on normal exhaustion (the loop never breaks).
        return collector
51
def _pmapreduce_thread(fn, collector, inputs):
    """
    Run one complete map-reduce pass.

    Maps fn over inputs, passes each result to collector, then calls
    collector.finalize().  If KeyboardInterrupt is raised during the
    pass, fn.abort() is called and the interrupt is forwarded to the
    main thread.  No other exception is handled here.
    """
    try:
        outputs = pmap(fn, inputs)
        preduce(collector, outputs)
        collector.finalize()
    except KeyboardInterrupt:
        fn.abort()
        thread.interrupt_main()
    # NOTE: reporting other exceptions to the collector (formatting the
    # traceback and passing it to collector.error) was sketched here but
    # is disabled, so they propagate out of this function.
63
def _pmapreduce_profile(fn, collector, inputs):
    """
    Run the map-reduce pass under cProfile and print the statistics.

    Takes the same arguments as _pmapreduce_thread, which does the
    actual work.  Once the pass returns, the statistics are printed to
    standard output sorted by call count.  If the pass raises, the
    exception propagates and nothing is printed.

    The profile data is held in memory instead of being dumped to a
    fixed 'mapr.out' file in the working directory.  With a fixed name,
    two profiled jobs running at the same time overwrote each other's
    data, and the file was left behind whenever the pass raised.
    """
    import cProfile
    import pstats

    profiler = cProfile.Profile()
    profiler.runcall(_pmapreduce_thread, fn, collector, inputs)

    # pstats.Stats accepts a profiler object directly, so no intermediate
    # file is needed.
    stats = pstats.Stats(profiler)
    #stats.sort_stats('time')
    stats.sort_stats('calls')
    stats.print_stats()
74
# Module-level switch.  pmapreduce() reads it on every call to choose between
# the profiling worker (_pmapreduce_profile) and the plain worker
# (_pmapreduce_thread).
profile_mapper = False
"""True if the mapper cost should be profiled."""
77
def pmapreduce(mapper, collector, inputs):
    """
    Map mapper over inputs on a new thread, reducing into collector.

    collector is called with the result of mapper(item) for each item
    of inputs.  There is no guarantee that the results are received in
    order.

    The work is started in a separate thread, so this function returns
    to the caller immediately.  When the module-level flag
    profile_mapper is true, the work is run under the profiler.
    """
    # The flag is only read, so no global declaration is needed.
    if profile_mapper:
        worker = _pmapreduce_profile
    else:
        worker = _pmapreduce_thread
    thread.start_new_thread(worker, (mapper, collector, inputs))
92
93def main():
94    import time,numpy
95    class TestCollector(object):
96        def __init__(self):
97            self.done = False
98        def __call__(self, part):
99            print "collecting",part,'for',id(self)
100        def finalize(self):
101            self.done = True
102            print "finalizing"
103        def abort(self):
104            self.done = True
105            print "aborting"
106        def error(self,msg):
107            print "error",msg
108   
109    class TestMapper(object):
110        def __call__(self, x): 
111            print "mapping",x,'for',id(self)
112            if x == 8: raise Exception('x is 8')
113            time.sleep(4*numpy.random.rand())
114            return x
115
116    collector1 = TestCollector()
117    collector2 = TestCollector()
118    pmapreduce(TestMapper(), collector1, [1,2,3,4,5])
119    pmapreduce(TestMapper(), collector2, [1,2,3,8])
120    while not collector1.done and not collector2.done:
121        time.sleep(1)
122if __name__ == "__main__": main()
123
124   
# Design notes and an unfinished sketch of a priority-based job server.  The
# text is held in a string bound to the throwaway name `_`, so none of it is
# executed when the module is imported or run.
_ = '''
# The choice of job to do next is complicated.
# 1. Strongly prefer a job of the same type as is already running
# on the node.  If this job requires significant resources (e.g.,
# a large file transfer) increase that preference.
# 2. Strongly prefer sending a user's own job to their own machine.
# That way at least they can make progress even if the cluster is busy.
# 3. Try to start each job as soon as possible.  That way if there are
# errors, then the user gets feedback early in the process.
# 4. Try to balance the load across users.  Rather than first come
# first serve, jobs use round robin amongst users.
# 5. Prefer high priority jobs.


def map(apply,collect,items,priority=1):
    mapper = MapJob(apply, items, collect, priority)
   
class MapJob(object):
    """
    Keep track of which jobs have been submitted and which are complete
    """
    def __init__(self, workfn, worklist, manager, priority):
        self.workfn = workfn
        self.worklist = worklist
        self.manager = manager
        self.priority = priority
        self._priority_edge = 0
    def next_work(self):
       

class MapServer(object):
    """
    Keep track of work units.
    """
    def __init__(self):
        self.workingset = {}
       
    def add_work(self, workfn, worklist, manager, priority):
        """
        Add a new work list to distributed to worker objects.  The manager
        gathers the results of the work.  Work is assigned from the queue
        based on priority.
        """
        job = MapJob(workfn, worklist, manager, priority)

        # add work to the queue in priority order
        for i,job in enumerate(self.jobs):
            if job.priority < priority: break
        self.jobs.insert(i,job)
           
        # Create an entry in a persistent store for each job to
        # capture completed work units and to recover from server
        # reboot.

        # Assign _priority_edge to cumsum(priority)/total_priority.
        # This allows us to select the next job according to priority
        # with a random number generator.
        # NOTE: scalability is a bit of a concern --- the lookup
        # operation is linear in the number of active jobs.  This
        # can be mitigated by limiting the number of active jobs.
        total_priority = 0.
        for job in self.jobs: total_priority += job.priority
        edge = 0.
        for job in self.jobs:
            edge += job.priority/total_priority
            self.job._priority_edge = edge
       
   
    def register(self, pool=None):
        """
        Called by a worker when they are registering for work.
       
        Returns the next piece of work.
        """
        P = numpy.random.rand()
        for job in self.jobs:
            if P < j._priority_edge:
                return job.new_work()

        return NoWork

    def update(self, jobid, result):
        """
        Called by a worker when the work unit is complete.
       
        Returns the next piece of work.
        """
        current_job = self.lookup(jobid)
        current_job.reduce(result)
        if numpy.random.rand() < current_job.switch_probability:
            return current_job.next_work()
       
        P = numpy.random.rand()
        for job in self.jobs:
            if P < job._priority_edge:
                if job == current_job:
                    return curent_job.next_work()
                else:
                    return job.new_work()
       
        return NoWork
'''
Note: See TracBrowser for help on using the repository browser.