source: sasview/_modules/park/pmap.html @ a462c6a

gh-pages
Last change on this file since a462c6a was a462c6a, checked in by ajj, 9 years ago

Rebuild to fix index and modules docs

  • Property mode set to 100644
File size: 24.6 KB
Line 
1<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
2  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
3
4
5<html xmlns="http://www.w3.org/1999/xhtml">
6  <head>
7    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
8   
9    <title>park.pmap &mdash; SasView 3.0.0 documentation</title>
10   
11    <link rel="stylesheet" href="../../_static/default.css" type="text/css" />
12    <link rel="stylesheet" href="../../_static/pygments.css" type="text/css" />
13   
14    <script type="text/javascript">
15      var DOCUMENTATION_OPTIONS = {
16        URL_ROOT:    '../../',
17        VERSION:     '3.0.0',
18        COLLAPSE_INDEX: false,
19        FILE_SUFFIX: '.html',
20        HAS_SOURCE:  true
21      };
22    </script>
23    <script type="text/javascript" src="../../_static/jquery.js"></script>
24    <script type="text/javascript" src="../../_static/underscore.js"></script>
25    <script type="text/javascript" src="../../_static/doctools.js"></script>
26    <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.7/MathJax.js?config=TeX-AMS-MML_HTMLorMML"></script> <!-- cdn.mathjax.org was shut down in 2017; MathJax 2 is loaded from cdnjs over HTTPS with the same config -->
27    <link rel="top" title="SasView 3.0.0 documentation" href="../../index.html" />
28    <link rel="up" title="Module code" href="../index.html" /> 
29  </head>
30  <body>
31    <div class="related">
32      <h3>Navigation</h3>
33      <ul>
34        <li class="right" style="margin-right: 10px">
35          <a href="../../genindex.html" title="General Index"
36             accesskey="I">index</a></li>
37        <li class="right" >
38          <a href="../../py-modindex.html" title="Python Module Index"
39             >modules</a> |</li>
40        <li><a href="../../index.html">SasView 3.0.0 documentation</a> &raquo;</li>
41          <li><a href="../index.html" accesskey="U">Module code</a> &raquo;</li> 
42      </ul>
43    </div> 
44
45    <div class="document">
46      <div class="documentwrapper">
47        <div class="bodywrapper">
48          <div class="body">
49           
50  <h1>Source code for park.pmap</h1><div class="highlight"><pre>
51<span class="sd">&quot;&quot;&quot;</span>
52<span class="sd">Parallel map-reduce implementation using threads.</span>
53<span class="sd">&quot;&quot;&quot;</span>
54
55<span class="kn">import</span> <span class="nn">traceback</span>
56<span class="kn">import</span> <span class="nn">thread</span>
57
58<div class="viewcode-block" id="Collector"><a class="viewcode-back" href="../../dev/api/park.html#park.pmap.Collector">[docs]</a><span class="k">class</span> <span class="nc">Collector</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
59    <span class="sd">&quot;&quot;&quot;</span>
60<span class="sd">    Abstract interface to map-reduce accumulator function.</span>
61<span class="sd">    &quot;&quot;&quot;</span>
62    <span class="k">def</span> <span class="nf">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">part</span><span class="p">):</span>
63        <span class="sd">&quot;&quot;&quot;Receive next part, storing it in the accumulated result&quot;&quot;&quot;</span>
64<div class="viewcode-block" id="Collector.finalize"><a class="viewcode-back" href="../../dev/api/park.html#park.pmap.Collector.finalize">[docs]</a>    <span class="k">def</span> <span class="nf">finalize</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
65        <span class="sd">&quot;&quot;&quot;Called when all parts have been accumulated&quot;&quot;&quot;</span></div>
66<div class="viewcode-block" id="Collector.error"><a class="viewcode-back" href="../../dev/api/park.html#park.pmap.Collector.error">[docs]</a>    <span class="k">def</span> <span class="nf">error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">part</span><span class="p">,</span> <span class="n">msg</span><span class="p">):</span>
67        <span class="sd">&quot;&quot;&quot;</span>
68<span class="sd">        Exception seen on executing map or reduce.  The collector</span>
69<span class="sd">        can adjust the accumulated result appropriately to reflect</span>
70<span class="sd">        the error.</span>
71<span class="sd">        &quot;&quot;&quot;</span>
72</div></div>
73<div class="viewcode-block" id="Mapper"><a class="viewcode-back" href="../../dev/api/park.html#park.pmap.Mapper">[docs]</a><span class="k">class</span> <span class="nc">Mapper</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
74    <span class="sd">&quot;&quot;&quot;</span>
75<span class="sd">    Abstract interface to map-reduce mapper function.</span>
76<span class="sd">    &quot;&quot;&quot;</span>
77    <span class="k">def</span> <span class="nf">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
78        <span class="sd">&quot;&quot;&quot;Evaluate part&quot;&quot;&quot;</span>
79<div class="viewcode-block" id="Mapper.abort"><a class="viewcode-back" href="../../dev/api/park.html#park.pmap.Mapper.abort">[docs]</a>    <span class="k">def</span> <span class="nf">abort</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
80        <span class="sd">&quot;&quot;&quot;Stop the mapper&quot;&quot;&quot;</span>
81</div></div>
82<div class="viewcode-block" id="pmap"><a class="viewcode-back" href="../../dev/api/park.html#park.pmap.pmap">[docs]</a><span class="k">def</span> <span class="nf">pmap</span><span class="p">(</span><span class="n">mapper</span><span class="p">,</span> <span class="n">inputs</span><span class="p">):</span>
83    <span class="sd">&quot;&quot;&quot;</span>
84<span class="sd">    Apply function mapper to all inputs.</span>
85<span class="sd">    </span>
86<span class="sd">    This is the serial version of a parallel iterator, yielding the next </span>
87<span class="sd">    sequence value as soon as it is available.  There is no guarantee </span>
88<span class="sd">    that the order of the inputs will be preserved in the parallel</span>
89<span class="sd">    version, so don&#39;t depend on it!</span>
90<span class="sd">    &quot;&quot;&quot;</span>
91    <span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="n">inputs</span><span class="p">:</span>
92        <span class="k">yield</span> <span class="n">mapper</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
93</div>
94<div class="viewcode-block" id="preduce"><a class="viewcode-back" href="../../dev/api/park.html#park.pmap.preduce">[docs]</a><span class="k">def</span> <span class="nf">preduce</span><span class="p">(</span><span class="n">collector</span><span class="p">,</span> <span class="n">outputs</span><span class="p">):</span>
95    <span class="sd">&quot;&quot;&quot;</span>
96<span class="sd">    Collect all outputs, calling collector(item) for each item in the sequence.</span>
97<span class="sd">    &quot;&quot;&quot;</span>
98    <span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="n">outputs</span><span class="p">:</span>
99        <span class="n">collector</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
100    <span class="k">return</span> <span class="n">collector</span>
101</div>
102<span class="k">def</span> <span class="nf">_pmapreduce_thread</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">collector</span><span class="p">,</span> <span class="n">inputs</span><span class="p">):</span>
103    <span class="k">try</span><span class="p">:</span>
104        <span class="n">preduce</span><span class="p">(</span><span class="n">collector</span><span class="p">,</span> <span class="n">pmap</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span><span class="n">inputs</span><span class="p">))</span>
105        <span class="n">collector</span><span class="o">.</span><span class="n">finalize</span><span class="p">()</span>
106    <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span>
107        <span class="n">fn</span><span class="o">.</span><span class="n">abort</span><span class="p">()</span>
108        <span class="n">thread</span><span class="o">.</span><span class="n">interrupt_main</span><span class="p">()</span>
109    <span class="c">#except:</span>
110    <span class="c">#    raise</span>
111    <span class="c">#    msg = traceback.format_exc()</span>
112    <span class="c">#    collector.error(msg)</span>
113
114<span class="k">def</span> <span class="nf">_pmapreduce_profile</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">collector</span><span class="p">,</span> <span class="n">inputs</span><span class="p">):</span>
115    <span class="kn">import</span> <span class="nn">cProfile</span><span class="o">,</span> <span class="nn">pstats</span><span class="o">,</span> <span class="nn">os</span>
116    <span class="k">def</span> <span class="nf">mapr</span><span class="p">():</span>
117        <span class="n">_pmapreduce_thread</span><span class="p">(</span><span class="n">fn</span><span class="p">,</span> <span class="n">collector</span><span class="p">,</span> <span class="n">inputs</span><span class="p">)</span>
118    <span class="n">cProfile</span><span class="o">.</span><span class="n">runctx</span><span class="p">(</span><span class="s">&#39;mapr()&#39;</span><span class="p">,</span> <span class="nb">dict</span><span class="p">(</span><span class="n">mapr</span><span class="o">=</span><span class="n">mapr</span><span class="p">),</span> <span class="p">{},</span> <span class="s">&#39;mapr.out&#39;</span><span class="p">)</span>
119    <span class="n">stats</span> <span class="o">=</span> <span class="n">pstats</span><span class="o">.</span><span class="n">Stats</span><span class="p">(</span><span class="s">&#39;mapr.out&#39;</span><span class="p">)</span>
120    <span class="c">#stats.sort_stats(&#39;time&#39;)</span>
121    <span class="n">stats</span><span class="o">.</span><span class="n">sort_stats</span><span class="p">(</span><span class="s">&#39;calls&#39;</span><span class="p">)</span>
122    <span class="n">stats</span><span class="o">.</span><span class="n">print_stats</span><span class="p">()</span>
123    <span class="n">os</span><span class="o">.</span><span class="n">unlink</span><span class="p">(</span><span class="s">&#39;mapr.out&#39;</span><span class="p">)</span>
124
125<span class="n">profile_mapper</span> <span class="o">=</span> <span class="bp">False</span>
126<span class="sd">&quot;&quot;&quot;True if the mapper cost should be profiled.&quot;&quot;&quot;</span>
127
128<div class="viewcode-block" id="pmapreduce"><a class="viewcode-back" href="../../dev/api/park.html#park.pmap.pmapreduce">[docs]</a><span class="k">def</span> <span class="nf">pmapreduce</span><span class="p">(</span><span class="n">mapper</span><span class="p">,</span> <span class="n">collector</span><span class="p">,</span> <span class="n">inputs</span><span class="p">):</span>
129    <span class="sd">&quot;&quot;&quot;</span>
130<span class="sd">    Apply function mapper to inputs, accumulating the results in collector.</span>
131<span class="sd">    </span>
132<span class="sd">    Collector is a function which accepts the result of mapper(item) for </span>
133<span class="sd">    each item of inputs.  There is no guarantee that the outputs will be</span>
134<span class="sd">    received in order.</span>
135<span class="sd">    </span>
136<span class="sd">    The map is executed in a separate thread so the function returns</span>
137<span class="sd">    to the caller immediately.</span>
138<span class="sd">    &quot;&quot;&quot;</span>
139    <span class="k">global</span> <span class="n">profile_mapper</span>
140    <span class="n">fn</span> <span class="o">=</span> <span class="n">_pmapreduce_profile</span> <span class="k">if</span> <span class="n">profile_mapper</span> <span class="k">else</span> <span class="n">_pmapreduce_thread</span>       
141    <span class="n">thread</span><span class="o">.</span><span class="n">start_new_thread</span><span class="p">(</span><span class="n">fn</span><span class="p">,(</span><span class="n">mapper</span><span class="p">,</span><span class="n">collector</span><span class="p">,</span> <span class="n">inputs</span><span class="p">))</span>
142</div>
143<div class="viewcode-block" id="main"><a class="viewcode-back" href="../../dev/api/park.html#park.pmap.main">[docs]</a><span class="k">def</span> <span class="nf">main</span><span class="p">():</span>
144    <span class="kn">import</span> <span class="nn">time</span><span class="o">,</span><span class="nn">numpy</span>
145    <span class="k">class</span> <span class="nc">TestCollector</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
146        <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
147            <span class="bp">self</span><span class="o">.</span><span class="n">done</span> <span class="o">=</span> <span class="bp">False</span>
148        <span class="k">def</span> <span class="nf">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">part</span><span class="p">):</span>
149            <span class="k">print</span> <span class="s">&quot;collecting&quot;</span><span class="p">,</span><span class="n">part</span><span class="p">,</span><span class="s">&#39;for&#39;</span><span class="p">,</span><span class="nb">id</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
150        <span class="k">def</span> <span class="nf">finalize</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
151            <span class="bp">self</span><span class="o">.</span><span class="n">done</span> <span class="o">=</span> <span class="bp">True</span>
152            <span class="k">print</span> <span class="s">&quot;finalizing&quot;</span>
153        <span class="k">def</span> <span class="nf">abort</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
154            <span class="bp">self</span><span class="o">.</span><span class="n">done</span> <span class="o">=</span> <span class="bp">True</span>
155            <span class="k">print</span> <span class="s">&quot;aborting&quot;</span>
156        <span class="k">def</span> <span class="nf">error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span><span class="n">msg</span><span class="p">):</span>
157            <span class="k">print</span> <span class="s">&quot;error&quot;</span><span class="p">,</span><span class="n">msg</span>
158   
159    <span class="k">class</span> <span class="nc">TestMapper</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
160        <span class="k">def</span> <span class="nf">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">x</span><span class="p">):</span> 
161            <span class="k">print</span> <span class="s">&quot;mapping&quot;</span><span class="p">,</span><span class="n">x</span><span class="p">,</span><span class="s">&#39;for&#39;</span><span class="p">,</span><span class="nb">id</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
162            <span class="k">if</span> <span class="n">x</span> <span class="o">==</span> <span class="mi">8</span><span class="p">:</span> <span class="k">raise</span> <span class="ne">Exception</span><span class="p">(</span><span class="s">&#39;x is 8&#39;</span><span class="p">)</span>
163            <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">4</span><span class="o">*</span><span class="n">numpy</span><span class="o">.</span><span class="n">random</span><span class="o">.</span><span class="n">rand</span><span class="p">())</span>
164            <span class="k">return</span> <span class="n">x</span>
165
166    <span class="n">collector1</span> <span class="o">=</span> <span class="n">TestCollector</span><span class="p">()</span>
167    <span class="n">collector2</span> <span class="o">=</span> <span class="n">TestCollector</span><span class="p">()</span>
168    <span class="n">pmapreduce</span><span class="p">(</span><span class="n">TestMapper</span><span class="p">(),</span> <span class="n">collector1</span><span class="p">,</span> <span class="p">[</span><span class="mi">1</span><span class="p">,</span><span class="mi">2</span><span class="p">,</span><span class="mi">3</span><span class="p">,</span><span class="mi">4</span><span class="p">,</span><span class="mi">5</span><span class="p">])</span>
169    <span class="n">pmapreduce</span><span class="p">(</span><span class="n">TestMapper</span><span class="p">(),</span> <span class="n">collector2</span><span class="p">,</span> <span class="p">[</span><span class="mi">1</span><span class="p">,</span><span class="mi">2</span><span class="p">,</span><span class="mi">3</span><span class="p">,</span><span class="mi">8</span><span class="p">])</span>
170    <span class="k">while</span> <span class="ow">not</span> <span class="n">collector1</span><span class="o">.</span><span class="n">done</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">collector2</span><span class="o">.</span><span class="n">done</span><span class="p">:</span>
171        <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span></div>
172<span class="k">if</span> <span class="n">__name__</span> <span class="o">==</span> <span class="s">&quot;__main__&quot;</span><span class="p">:</span> <span class="n">main</span><span class="p">()</span>
173
174   
175<span class="n">_</span> <span class="o">=</span> <span class="s">&#39;&#39;&#39;</span>
176<span class="s"># The choice of job to do next is complicated.</span>
177<span class="s"># 1. Strongly prefer a job of the same type as is already running</span>
178<span class="s"># on the node.  If this job requires significant resources (e.g.,</span>
179<span class="s"># a large file transfer) increase that preference.</span>
180<span class="s"># 2. Strongly prefer sending a user&#39;s own job to their own machine.</span>
181<span class="s"># That way at least they can make progress even if the cluster is busy.</span>
182<span class="s"># 3. Try to start each job as soon as possible.  That way if there are</span>
183<span class="s"># errors, then the user gets feedback early in the process.</span>
184<span class="s"># 4. Try to balance the load across users.  Rather than first come</span>
185<span class="s"># first serve, jobs use round robin amongst users.</span>
186<span class="s"># 5. Prefer high priority jobs.</span>
187
188
189<span class="s">def map(apply,collect,items,priority=1):</span>
190<span class="s">    mapper = MapJob(apply, items, collect, priority)</span>
191<span class="s">    </span>
192<span class="s">class MapJob(object):</span>
193<span class="s">    &quot;&quot;&quot;</span>
194<span class="s">    Keep track of which jobs have been submitted and which are complete</span>
195<span class="s">    &quot;&quot;&quot;</span>
196<span class="s">    def __init__(self, workfn, worklist, manager, priority):</span>
197<span class="s">        self.workfn = workfn</span>
198<span class="s">        self.worklist = worklist</span>
199<span class="s">        self.manager = manager</span>
200<span class="s">        self.priority = priority</span>
201<span class="s">        self._priority_edge = 0</span>
202<span class="s">    def next_work(self):</span>
203<span class="s">        </span>
204
205<span class="s">class MapServer(object):</span>
206<span class="s">    &quot;&quot;&quot;</span>
207<span class="s">    Keep track of work units.</span>
208<span class="s">    &quot;&quot;&quot;</span>
209<span class="s">    def __init__(self):</span>
210<span class="s">        self.workingset = {}</span>
211<span class="s">        </span>
212<span class="s">    def add_work(self, workfn, worklist, manager, priority):</span>
213<span class="s">        &quot;&quot;&quot;</span>
214<span class="s">        Add a new work list to distributed to worker objects.  The manager</span>
215<span class="s">        gathers the results of the work.  Work is assigned from the queue</span>
216<span class="s">        based on priority.</span>
217<span class="s">        &quot;&quot;&quot;</span>
218<span class="s">        job = MapJob(workfn, worklist, manager, priority)</span>
219
220<span class="s">        # add work to the queue in priority order</span>
221<span class="s">        for i,job in enumerate(self.jobs):</span>
222<span class="s">            if job.priority &lt; priority: break</span>
223<span class="s">        self.jobs.insert(i,job)</span>
224<span class="s">            </span>
225<span class="s">        # Create an entry in a persistent store for each job to</span>
226<span class="s">        # capture completed work units and to recover from server</span>
227<span class="s">        # reboot.</span>
228
229<span class="s">        # Assign _priority_edge to cumsum(priority)/total_priority.</span>
230<span class="s">        # This allows us to select the next job according to priority</span>
231<span class="s">        # with a random number generator.</span>
232<span class="s">        # NOTE: scalability is a bit of a concern --- the lookup</span>
233<span class="s">        # operation is linear in the number of active jobs.  This</span>
234<span class="s">        # can be mitigated by limiting the number of active jobs.</span>
235<span class="s">        total_priority = 0.</span>
236<span class="s">        for job in self.jobs: total_priority += job.priority</span>
237<span class="s">        edge = 0.</span>
238<span class="s">        for job in self.jobs:</span>
239<span class="s">            edge += job.priority/total_priority</span>
240<span class="s">            self.job._priority_edge = edge</span>
241<span class="s">        </span>
242<span class="s">    </span>
243<span class="s">    def register(self, pool=None):</span>
244<span class="s">        &quot;&quot;&quot;</span>
245<span class="s">        Called by a worker when they are registering for work.</span>
246<span class="s">        </span>
247<span class="s">        Returns the next piece of work.</span>
248<span class="s">        &quot;&quot;&quot;</span>
249<span class="s">        P = numpy.random.rand()</span>
250<span class="s">        for job in self.jobs:</span>
251<span class="s">            if P &lt; j._priority_edge:</span>
252<span class="s">                return job.new_work()</span>
253
254<span class="s">        return NoWork</span>
255
256<span class="s">    def update(self, jobid, result):</span>
257<span class="s">        &quot;&quot;&quot;</span>
258<span class="s">        Called by a worker when the work unit is complete.</span>
259<span class="s">        </span>
260<span class="s">        Returns the next piece of work.</span>
261<span class="s">        &quot;&quot;&quot;</span>
262<span class="s">        current_job = self.lookup(jobid)</span>
263<span class="s">        current_job.reduce(result)</span>
264<span class="s">        if numpy.random.rand() &lt; current_job.switch_probability:</span>
265<span class="s">            return current_job.next_work()</span>
266<span class="s">        </span>
267<span class="s">        P = numpy.random.rand()</span>
268<span class="s">        for job in self.jobs:</span>
269<span class="s">            if P &lt; job._priority_edge:</span>
270<span class="s">                if job == current_job:</span>
271<span class="s">                    return curent_job.next_work()</span>
272<span class="s">                else:</span>
273<span class="s">                    return job.new_work()</span>
274<span class="s">        </span>
275<span class="s">        return NoWork</span>
276<span class="s">&#39;&#39;&#39;</span>
277</pre></div>
278
279          </div>
280        </div>
281      </div>
282      <div class="sphinxsidebar">
283        <div class="sphinxsidebarwrapper">
284<div id="searchbox" style="display: none"> <!-- hidden by default; revealed by the inline script that follows this block -->
285  <h3><label for="searchbox-q">Quick search</label></h3> <!-- the label gives the text field below an accessible name -->
286    <form class="search" action="../../search.html" method="get">
287      <input type="text" name="q" id="searchbox-q" />
288      <input type="submit" value="Go" />
289      <input type="hidden" name="check_keywords" value="yes" />
290      <input type="hidden" name="area" value="default" />
291    </form>
292    <p class="searchtip" style="font-size: 90%">
293    Enter search terms or a module, class or function name.
294    </p>
295</div>
296<script type="text/javascript">$('#searchbox').show(0);</script>
297        </div>
298      </div>
299      <div class="clearer"></div>
300    </div>
301    <div class="related">
302      <h3>Navigation</h3>
303      <ul>
304        <li class="right" style="margin-right: 10px">
305          <a href="../../genindex.html" title="General Index"
306             >index</a></li>
307        <li class="right" >
308          <a href="../../py-modindex.html" title="Python Module Index"
309             >modules</a> |</li>
310        <li><a href="../../index.html">SasView 3.0.0 documentation</a> &raquo;</li>
311          <li><a href="../index.html" >Module code</a> &raquo;</li> 
312      </ul>
313    </div>
314    <div class="footer">
315        &copy; Copyright 2013, The SasView Project.
316      Created using <a href="http://sphinx-doc.org/">Sphinx</a> 1.2.3.
317    </div>
318  </body>
319</html>
Note: See TracBrowser for help on using the repository browser.