Source code for park.pmap

"""
Parallel map-reduce implementation using threads.
"""

import traceback
import thread

class Collector(object):
    """
    Abstract interface to map-reduce accumulator function.
    """
    def __call__(self, part):
        """Receive next part, storing it in the accumulated result"""

    def finalize(self):
        """Called when all parts have been accumulated"""

    def error(self, part, msg):
        """
        Exception seen on executing map or reduce.  The collector can
        adjust the accumulated result appropriately to reflect the error.
        """
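
# Illustrative sketch, not part of the original module: a minimal concrete
# Collector that sums numeric parts.  SumCollector and its attributes are
# assumed names for this example only.
class SumCollector(Collector):
    """Accumulate the sum of all mapped parts."""
    def __init__(self):
        self.total = 0
        self.done = False
    def __call__(self, part):
        self.total += part
    def finalize(self):
        self.done = True
    def error(self, part, msg):
        # Record the failure; the partial sum remains available.
        self.done = True
        self.message = msg
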
class Mapper(object):
    """
    Abstract interface to map-reduce mapper function.
    """
    def __call__(self, value):
        """Evaluate part"""

    def abort(self):
        """Stop the mapper"""
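
# Illustrative sketch, not part of the original module: a minimal concrete
# Mapper with a cooperative abort flag.  SquareMapper is a hypothetical name;
# abort() here simply makes later calls fail fast.
class SquareMapper(Mapper):
    """Square each value, stopping early once abort() has been called."""
    def __init__(self):
        self._aborted = False
    def __call__(self, value):
        if self._aborted:
            raise RuntimeError("mapper aborted")
        return value*value
    def abort(self):
        self._aborted = True
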
def pmap(mapper, inputs):
    """
    Apply function mapper to all inputs.

    This is the serial version of a parallel iterator, yielding the next
    sequence value as soon as it is available.  There is no guarantee
    that the order of the inputs will be preserved in the parallel
    version, so don't depend on it!
    """
    for item in inputs:
        yield mapper(item)
def preduce(collector, outputs):
    """
    Collect all outputs, calling collector(item) for each item
    in the sequence.
    """
    for item in outputs:
        collector(item)
    return collector
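
# Illustrative sketch, not part of the original module: a serial usage example
# showing how pmap and preduce compose.  Any callable can serve as the
# collector; here a bound list.append gathers the mapped values.
def _example_serial(values):
    """Return the squares of values, collected through pmap/preduce."""
    results = []
    preduce(results.append, pmap(lambda x: x*x, values))
    return results   # e.g. [1, 4, 9] for values = [1, 2, 3]
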
def _pmapreduce_thread(fn, collector, inputs):
    try:
        preduce(collector, pmap(fn, inputs))
        collector.finalize()
    except KeyboardInterrupt:
        fn.abort()
        thread.interrupt_main()
    #except:
    #    raise
    #    msg = traceback.format_exc()
    #    collector.error(msg)

def _pmapreduce_profile(fn, collector, inputs):
    import cProfile, pstats, os
    def mapr():
        _pmapreduce_thread(fn, collector, inputs)
    cProfile.runctx('mapr()', dict(mapr=mapr), {}, 'mapr.out')
    stats = pstats.Stats('mapr.out')
    #stats.sort_stats('time')
    stats.sort_stats('calls')
    stats.print_stats()
    os.unlink('mapr.out')

profile_mapper = False
"""True if the mapper cost should be profiled."""
def pmapreduce(mapper, collector, inputs):
    """
    Apply function mapper to inputs, accumulating the results in collector.

    Collector is a function which accepts the result of mapper(item) for
    each item of inputs.  There is no guarantee that the outputs will be
    received in order.

    The map is executed in a separate thread so the function returns
    to the caller immediately.
    """
    global profile_mapper
    fn = _pmapreduce_profile if profile_mapper else _pmapreduce_thread
    thread.start_new_thread(fn, (mapper, collector, inputs))
def main():
    import time, numpy

    class TestCollector(object):
        def __init__(self):
            self.done = False
        def __call__(self, part):
            print "collecting", part, 'for', id(self)
        def finalize(self):
            self.done = True
            print "finalizing"
        def abort(self):
            self.done = True
            print "aborting"
        def error(self, msg):
            print "error", msg

    class TestMapper(object):
        def __call__(self, x):
            print "mapping", x, 'for', id(self)
            if x == 8:
                raise Exception('x is 8')
            time.sleep(4*numpy.random.rand())
            return x

    collector1 = TestCollector()
    collector2 = TestCollector()
    pmapreduce(TestMapper(), collector1, [1, 2, 3, 4, 5])
    pmapreduce(TestMapper(), collector2, [1, 2, 3, 8])
    while not collector1.done and not collector2.done:
        time.sleep(1)
if __name__ == "__main__":
    main()

_ = '''
# The choice of job to do next is complicated.
# 1. Strongly prefer a job of the same type as is already running
#    on the node.  If this job requires significant resources (e.g.,
#    a large file transfer) increase that preference.
# 2. Strongly prefer sending a user's own job to their own machine.
#    That way at least they can make progress even if the cluster is busy.
# 3. Try to start each job as soon as possible.  That way if there are
#    errors, then the user gets feedback early in the process.
# 4. Try to balance the load across users.  Rather than first come
#    first serve, jobs use round robin amongst users.
# 5. Prefer high priority jobs.

def map(apply, collect, items, priority=1):
    mapper = MapJob(apply, items, collect, priority)

class MapJob(object):
    """
    Keep track of which jobs have been submitted and which are complete.
    """
    def __init__(self, workfn, worklist, manager, priority):
        self.workfn = workfn
        self.worklist = worklist
        self.manager = manager
        self.priority = priority
        self._priority_edge = 0
    def next_work(self):
        pass

class MapServer(object):
    """
    Keep track of work units.
    """
    def __init__(self):
        self.workingset = {}

    def add_work(self, workfn, worklist, manager, priority):
        """
        Add a new work list to be distributed to worker objects.  The
        manager gathers the results of the work.

        Work is assigned from the queue based on priority.
        """
        job = MapJob(workfn, worklist, manager, priority)
        # add work to the queue in priority order
        for i, queued in enumerate(self.jobs):
            if queued.priority < priority:
                break
        self.jobs.insert(i, job)
        # Create an entry in a persistent store for each job to
        # capture completed work units and to recover from server
        # reboot.

        # Assign _priority_edge to cumsum(priority)/total_priority.
        # This allows us to select the next job according to priority
        # with a random number generator.
        # NOTE: scalability is a bit of a concern --- the lookup
        # operation is linear in the number of active jobs.  This
        # can be mitigated by limiting the number of active jobs.
        total_priority = 0.
        for job in self.jobs:
            total_priority += job.priority
        edge = 0.
        for job in self.jobs:
            edge += job.priority/total_priority
            job._priority_edge = edge

    def register(self, pool=None):
        """
        Called by a worker when they are registering for work.
        Returns the next piece of work.
        """
        P = numpy.random.rand()
        for job in self.jobs:
            if P < job._priority_edge:
                return job.new_work()
        return NoWork

    def update(self, jobid, result):
        """
        Called by a worker when the work unit is complete.
        Returns the next piece of work.
        """
        current_job = self.lookup(jobid)
        current_job.reduce(result)
        if numpy.random.rand() < current_job.switch_probability:
            return current_job.next_work()
        P = numpy.random.rand()
        for job in self.jobs:
            if P < job._priority_edge:
                if job == current_job:
                    return current_job.next_work()
                else:
                    return job.new_work()
        return NoWork
'''
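
# Illustrative sketch, not part of the module: the priority-edge selection
# scheme described in the notes above.  Each job gets a cumulative edge equal
# to cumsum(priority)/total_priority, and a uniform random draw picks the
# first job whose edge exceeds it, so jobs are chosen in proportion to their
# priority.  The jobs argument and its .priority attribute follow the sketch
# above, not any real park API.
def _choose_by_priority(jobs):
    """Pick a job at random, weighted by job.priority."""
    import numpy
    total_priority = 0.
    for job in jobs:
        total_priority += job.priority
    P = numpy.random.rand()
    edge = 0.
    for job in jobs:
        edge += job.priority/total_priority
        if P < edge:
            return job
    return None   # no jobs available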