source: sasview/park-1.2.1/park/pmap.py @ 92a52ff2

ESS_GUIESS_GUI_DocsESS_GUI_batch_fittingESS_GUI_bumps_abstractionESS_GUI_iss1116ESS_GUI_iss879ESS_GUI_iss959ESS_GUI_openclESS_GUI_orderingESS_GUI_sync_sascalccostrafo411magnetic_scattrelease-4.1.1release-4.1.2release-4.2.2release_4.0.1ticket-1009ticket-1094-headlessticket-1242-2d-resolutionticket-1243ticket-1249ticket885unittest-saveload
Last change on this file since 92a52ff2 was 3570545, checked in by Mathieu Doucet <doucetm@…>, 13 years ago

Adding park Part 2

  • Property mode set to 100644
File size: 7.0 KB
Line 
1"""
2Parallel map-reduce implementation using threads.
3"""
4
5import traceback
6import thread
7
class Collector(object):
    """
    Abstract interface to the map-reduce accumulator function.

    Every method defined here does nothing and returns None; concrete
    collectors override the ones they need.
    """

    def __call__(self, part):
        """Receive the next part and store it in the accumulated result."""
        return None

    def finalize(self):
        """Called when all parts have been accumulated."""
        return None

    def error(self, part, msg):
        """
        Report an exception seen while executing map or reduce.

        The collector can adjust the accumulated result appropriately to
        reflect the error.
        """
        return None
22
class Mapper(object):
    """
    Abstract interface to the map-reduce mapper function.

    Both methods defined here do nothing and return None; concrete mappers
    override them.
    """

    def __call__(self, value):
        """Evaluate one part."""
        return None

    def abort(self):
        """Stop the mapper."""
        return None
31
def pmap(mapper, inputs):
    """
    Lazily apply mapper to each element of inputs.

    This is the serial stand-in for a parallel iterator: each mapped value
    is yielded as soon as it has been computed.  A parallel version would
    not guarantee that results come back in input order, so callers should
    not depend on the ordering.
    """
    pending = iter(inputs)
    while True:
        try:
            value = next(pending)
        except StopIteration:
            return
        # The mapper runs outside the try so that only exhaustion of the
        # input ends the generator.
        yield mapper(value)
43
def preduce(collector, outputs):
    """
    Drain outputs into collector.

    collector(part) is invoked once per element, in iteration order, and
    the collector itself is handed back when the sequence is exhausted.
    """
    accumulate = collector
    for part in outputs:
        accumulate(part)
    return accumulate
51
def _pmapreduce_thread(fn, collector, inputs):
    """
    Run one complete map-reduce job.

    Maps fn over inputs, passes every result to collector and then calls
    collector.finalize().  A KeyboardInterrupt raised anywhere in that
    sequence aborts the mapper via fn.abort() and is forwarded to the main
    thread with thread.interrupt_main().
    """
    try:
        mapped = pmap(fn, inputs)
        preduce(collector, mapped)
        collector.finalize()
    except KeyboardInterrupt:
        fn.abort()
        thread.interrupt_main()
    # NOTE: no other exception is handled here; it propagates out of this
    # function and collector.error() is not called.
63
def _pmapreduce_profile(fn, collector, inputs):
    """
    Run the map-reduce job under cProfile and print the statistics.

    The job itself is executed by _pmapreduce_thread(fn, collector, inputs).
    When it returns, the collected profile is printed to standard output,
    sorted by call count.

    The profile is kept in memory instead of being written to a fixed
    'mapr.out' file in the working directory.  A shared file name lets
    concurrently profiled jobs overwrite or delete each other's data, and
    the file was left behind whenever the job raised.
    """
    import cProfile
    import pstats

    profiler = cProfile.Profile()
    try:
        profiler.runcall(_pmapreduce_thread, fn, collector, inputs)
    except SystemExit:
        # cProfile.runctx(), used previously, swallowed SystemExit and
        # still reported the statistics; keep that behaviour.
        pass
    # Any other exception propagates without printing, as before.
    stats = pstats.Stats(profiler)
    stats.sort_stats('calls')  # use 'time' to rank by internal time instead
    stats.print_stats()
74
# Module-level switch consulted by pmapreduce() each time it is called:
# a true value selects the profiling worker, otherwise the plain worker.
profile_mapper = False
"""True if the mapper cost should be profiled."""
77
def pmapreduce(mapper, collector, inputs):
    """
    Map inputs through mapper and accumulate the results in collector.

    collector is called with mapper(item) for each item of inputs.  There
    is no guarantee that the outputs will be received in order.

    The work is started on a new thread, so this function returns to the
    caller immediately.  The module-level profile_mapper flag, read at
    call time, selects whether the job runs under the profiler.
    """
    if profile_mapper:
        worker = _pmapreduce_profile
    else:
        worker = _pmapreduce_thread
    job_args = (mapper, collector, inputs)
    thread.start_new_thread(worker, job_args)
92
93
def main():
    """
    Demo: start two map-reduce jobs on separate threads and poll until one
    of the collectors reports that it is done.

    The second job's mapper deliberately raises on the input 8 to show
    what happens when a job fails part way through.
    """
    import time
    import numpy

    class TestCollector(object):
        """Collector that prints every event it receives."""
        def __init__(self):
            self.done = False

        def __call__(self, part):
            print("collecting %s for %s" % (part, id(self)))

        def finalize(self):
            self.done = True
            print("finalizing")

        def abort(self):
            self.done = True
            print("aborting")

        def error(self, part, msg):
            # Signature follows Collector.error(part, msg).
            print("error %s" % (msg,))

    class TestMapper(object):
        """Mapper that echoes its input after a random delay."""
        def __call__(self, x):
            print("mapping %s for %s" % (x, id(self)))
            if x == 8:
                raise Exception('x is 8')
            time.sleep(4*numpy.random.rand())
            return x

        def abort(self):
            # The worker calls abort() on the mapper when it sees a
            # KeyboardInterrupt, so the mapper must provide it.
            print("aborting mapper %s" % (id(self),))

    collector1 = TestCollector()
    collector2 = TestCollector()
    pmapreduce(TestMapper(), collector1, [1, 2, 3, 4, 5])
    pmapreduce(TestMapper(), collector2, [1, 2, 3, 8])
    # Stop polling as soon as either job has finished.  The second job's
    # mapper raises before its collector is finalized, so waiting for both
    # flags could block forever.
    while not collector1.done and not collector2.done:
        time.sleep(1)


if __name__ == "__main__":
    main()
124
125   
# Design sketch for a priority-based job scheduler.  It is stored in a
# string bound to the throwaway name `_`, so none of it is executed, and
# `_` is not referenced elsewhere in the visible file.  The text is not
# valid Python as written (for example MapJob.next_work has no body).
_ = '''
# The choice of job to do next is complicated.
# 1. Strongly prefer a job of the same type as is already running
# on the node.  If this job requires significant resources (e.g.,
# a large file transfer) increase that preference.
# 2. Strongly prefer sending a user's own job to their own machine.
# That way at least they can make progress even if the cluster is busy.
# 3. Try to start each job as soon as possible.  That way if there are
# errors, then the user gets feedback early in the process.
# 4. Try to balance the load across users.  Rather than first come
# first serve, jobs use round robin amongst users.
# 5. Prefer high priority jobs.


def map(apply,collect,items,priority=1):
    mapper = MapJob(apply, items, collect, priority)
   
class MapJob(object):
    """
    Keep track of which jobs have been submitted and which are complete
    """
    def __init__(self, workfn, worklist, manager, priority):
        self.workfn = workfn
        self.worklist = worklist
        self.manager = manager
        self.priority = priority
        self._priority_edge = 0
    def next_work(self):
       

class MapServer(object):
    """
    Keep track of work units.
    """
    def __init__(self):
        self.workingset = {}
       
    def add_work(self, workfn, worklist, manager, priority):
        """
        Add a new work list to distributed to worker objects.  The manager
        gathers the results of the work.  Work is assigned from the queue
        based on priority.
        """
        job = MapJob(workfn, worklist, manager, priority)

        # add work to the queue in priority order
        for i,job in enumerate(self.jobs):
            if job.priority < priority: break
        self.jobs.insert(i,job)
           
        # Create an entry in a persistent store for each job to
        # capture completed work units and to recover from server
        # reboot.

        # Assign _priority_edge to cumsum(priority)/total_priority.
        # This allows us to select the next job according to priority
        # with a random number generator.
        # NOTE: scalability is a bit of a concern --- the lookup
        # operation is linear in the number of active jobs.  This
        # can be mitigated by limiting the number of active jobs.
        total_priority = 0.
        for job in self.jobs: total_priority += job.priority
        edge = 0.
        for job in self.jobs:
            edge += job.priority/total_priority
            self.job._priority_edge = edge
       
   
    def register(self, pool=None):
        """
        Called by a worker when they are registering for work.
       
        Returns the next piece of work.
        """
        P = numpy.random.rand()
        for job in self.jobs:
            if P < j._priority_edge:
                return job.new_work()

        return NoWork

    def update(self, jobid, result):
        """
        Called by a worker when the work unit is complete.
       
        Returns the next piece of work.
        """
        current_job = self.lookup(jobid)
        current_job.reduce(result)
        if numpy.random.rand() < current_job.switch_probability:
            return current_job.next_work()
       
        P = numpy.random.rand()
        for job in self.jobs:
            if P < job._priority_edge:
                if job == current_job:
                    return curent_job.next_work()
                else:
                    return job.new_work()
       
        return NoWork
'''
Note: See TracBrowser for help on using the repository browser.