parallelize
parallelize(func, iterarg=None, iterkwargs=None, args=None, kwargs=None, ncpus=None, maxcpu=None, maxmem=None, interval=None, parallelizer=None, serial=False, progress=False, callback=None, globaldict=None, capture=False, die=True, lbkwargs=None, **func_kwargs)
Execute a function in parallel.
Most simply, sc.parallelize() acts as a shortcut for using pool.map. However, it also provides flexibility in how arguments are passed to the function, load balancing, etc.

Either or both of iterarg and iterkwargs can be used. iterarg can be an iterable or an integer; if the latter, it will run the function that number of times and not pass the argument to the function (which may be useful for running “embarrassingly parallel” simulations). iterkwargs is a dict of iterables (or, equivalently, a list of dicts; see Example 3); each iterable must be the same length (and the same length as iterarg, if it exists), and each dict key will be used as a kwarg to the called function. Any other kwargs passed to sc.parallelize() will also be passed to the function.

This function can either use a fixed number of CPUs or allocate them dynamically based on load. If ncpus is None, then it will allocate the number of CPUs dynamically. Memory (maxmem) and CPU load (maxcpu) limits can also be specified.

- Parameters:
func (func) – the function to parallelize
iterarg (list/int) – the variable(s) to provide to each process, or the number of times to run the function (see examples below)
iterkwargs (dict) – another way of providing variables to each process (see examples below)
args (list) – positional arguments for each process, the same for all processes
kwargs (dict) – keyword arguments for each process, the same for all processes
ncpus (int/float) – number of CPUs to use (if <1, treat as a fraction of the total available; if None, use loadbalancer)
maxcpu (float) – maximum CPU load as a fraction (e.g. 0.8); if it is exceeded, delay the start of the next process (not used if ncpus is specified)
maxmem (float) – maximum fraction of virtual memory (RAM); if it is exceeded, delay the start of the next process
interval (float) – number of seconds to pause between starting processes for checking load
parallelizer (str/func) – parallelization function; default ‘multiprocess’ (see below for details)
serial (bool) – whether to skip parallelization and run in serial (useful for debugging; equivalent to parallelizer='serial')
progress (bool) – whether to show a progress bar
callback (func) – an optional function to call from each worker
globaldict (dict) – an optional global dictionary to pass to each worker via the kwarg “globaldict” (note: may not update properly with low task latency)
capture (bool) – if True, capture the output of the task rather than printing it
die (bool) – whether to stop immediately if an exception is encountered (otherwise, store the exception as the result)
lbkwargs (dict) – if provided, passed to sc.loadbalancer()
func_kwargs (dict) – merged with kwargs (see above)
- Returns:
List of outputs from each process
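To make the argument-passing rules above concrete, here is a minimal serial sketch of how each call could be assembled from iterarg, args, kwargs and func_kwargs, and how die=False could store an exception as that task's result. The name serial_model is hypothetical and this is not the Sciris implementation. It assumes, as Examples 3 and 4 suggest, that the iterated value comes first and that a tuple in iterarg is unpacked into several positional arguments.

```python
from typing import Any, Callable, Iterable, Optional, Union


def serial_model(
    func: Callable[..., Any],
    iterarg: Union[int, Iterable[Any], None] = None,
    args: Optional[tuple] = None,
    kwargs: Optional[dict] = None,
    die: bool = True,
    **func_kwargs: Any,
) -> list:
    """Hypothetical serial model of how each call's arguments are assembled."""
    const_args = tuple(args or ())
    const_kwargs = {**(kwargs or {}), **func_kwargs}  # func_kwargs are merged with kwargs

    if isinstance(iterarg, int):
        # Integer: call func that many times, without passing the iterated value
        calls = [const_args] * iterarg
    else:
        calls = []
        for val in ([] if iterarg is None else iterarg):
            # Assumption: a tuple supplies several leading positional arguments
            leading = val if isinstance(val, tuple) else (val,)
            calls.append(leading + const_args)

    results = []
    for call_args in calls:
        try:
            results.append(func(*call_args, **const_kwargs))
        except Exception as exc:
            if die:
                raise  # die=True: stop at the first exception
            results.append(exc)  # die=False: store the exception as this task's result
    return results


def f(x, y):
    return x * y

print(serial_model(f, [(1, 2), (2, 3), (3, 4)]))  # [2, 6, 12]
print(serial_model(f, [1, 2], kwargs={"y": 10}))  # [10, 20]
print(serial_model(lambda: 7, 3))                 # [7, 7, 7]
print(serial_model(lambda x: 1 / x, [1, 0], die=False))  # second entry is a ZeroDivisionError
```

A serial model like this is also what serial=True is useful for in practice: the same semantics without process start-up, which makes tracebacks easier to read.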
Example 1 – simple usage as a shortcut to multiprocess.map():
```python
import sciris as sc

def f(x):
    return x*x

results = sc.parallelize(f, [1,2,3]) # [1, 4, 9]
```
Example 2 – simple usage for “embarrassingly parallel” processing:
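```python
import numpy as np
import sciris as sc

def rnd():
    np.random.seed() # Reseed from OS entropy so each worker draws different numbers
    return np.random.random()

results = sc.parallelize(rnd, 10, ncpus=4) # Runs rnd() 10 times; no argument is passed
```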
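Why Example 2 calls np.random.seed() inside rnd(): a worker created by forking inherits the parent's random-number-generator state, so without reseeding, several workers can return identical “random” numbers. The sketch below simulates this with the standard library's random.Random by copying one generator state into several hypothetical “workers”. It illustrates the effect only; it is not how Sciris starts processes.

```python
import random


def simulate_workers(n_workers: int, reseed: bool) -> list:
    """Simulate forked workers that each start from a copy of the parent's RNG state."""
    parent = random.Random(42)
    inherited_state = parent.getstate()  # what each forked child would inherit

    draws = []
    for _ in range(n_workers):
        child = random.Random()
        child.setstate(inherited_state)  # identical starting state in every "worker"
        if reseed:
            child.seed()  # like np.random.seed() with no argument: reseed from OS entropy
        draws.append(child.random())
    return draws


print(simulate_workers(4, reseed=False))  # four identical values
print(simulate_workers(4, reseed=True))   # (almost certainly) four different values
```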
Example 3 – three different equivalent ways to use multiple arguments:
```python
import sciris as sc

def f(x,y):
    return x*y

results1 = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)]) # Each tuple is unpacked into (x, y)
results2 = sc.parallelize(func=f, iterkwargs={'x':[1,2,3], 'y':[2,3,4]}) # Dict of equal-length lists
results3 = sc.parallelize(func=f, iterkwargs=[{'x':1, 'y':2}, {'x':2, 'y':3}, {'x':3, 'y':4}]) # List of dicts
assert results1 == results2 == results3
```
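The iterkwargs forms in Example 3 are interchangeable because a dict of equal-length iterables can be turned into a list of per-call kwarg dicts. A minimal sketch of that normalization is shown below. It includes the same-length checks described above (between iterkwargs entries, and against iterarg if given). The function name normalize_iterkwargs is hypothetical and not part of Sciris.

```python
from typing import Any, Dict, Iterable, List, Optional, Union

IterKwargs = Union[Dict[str, Iterable[Any]], List[Dict[str, Any]]]


def normalize_iterkwargs(iterkwargs: IterKwargs, n_iterarg: Optional[int] = None) -> List[Dict[str, Any]]:
    """Convert iterkwargs to a list of per-call kwarg dicts (hypothetical helper)."""
    if isinstance(iterkwargs, dict):
        columns = {key: list(values) for key, values in iterkwargs.items()}
        lengths = {len(values) for values in columns.values()}
        if len(lengths) > 1:
            raise ValueError(f"iterkwargs values must all be the same length, not {sorted(lengths)}")
        n = lengths.pop() if lengths else 0
        # Row i takes the i-th element of every column
        rows = [{key: values[i] for key, values in columns.items()} for i in range(n)]
    else:
        rows = [dict(row) for row in iterkwargs]  # already a list of dicts
    if n_iterarg is not None and len(rows) != n_iterarg:
        raise ValueError(f"iterkwargs has {len(rows)} entries but iterarg has {n_iterarg}")
    return rows


as_dict = normalize_iterkwargs({"x": [1, 2, 3], "y": [2, 3, 4]})
as_list = normalize_iterkwargs([{"x": 1, "y": 2}, {"x": 2, "y": 3}, {"x": 3, "y": 4}])
assert as_dict == as_list

def f(x, y):
    return x * y

print([f(**kw) for kw in as_dict])  # [2, 6, 12]
```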
Example 4 – using non-iterated arguments and dynamic load balancing:
```python
import numpy as np
import matplotlib.pyplot as plt
import sciris as sc

def myfunc(i, x, y):
    np.random.seed() # Reseed in each worker
    xy = [x+i*np.random.randn(100), y+i*np.random.randn(100)]
    return xy

xylist1 = sc.parallelize(myfunc, iterarg=range(5), kwargs={'x':3, 'y':8}, maxcpu=0.8, interval=0.2) # Use kwargs dict
xylist2 = sc.parallelize(myfunc, x=5, y=10, iterarg=[0,1,2], parallelizer='multiprocessing') # Supply kwargs directly and use a different parallelizer

for p,xylist in enumerate([xylist1, xylist2]):
    plt.subplot(2,1,p+1)
    for i,xy in reversed(list(enumerate(xylist))): # Plot the widest run first, keeping each label matched to its run
        plt.scatter(xy[0], xy[1], label='Run %i'%i)
    plt.legend()
```
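Example 4 relies on dynamic load balancing. With maxcpu=0.8 and interval=0.2, the start of each new process is delayed while CPU load is above 80%, and the load is rechecked every 0.2 seconds. The sketch below shows that polling idea on its own. It is a hypothetical helper, not sc.loadbalancer(). The load probes are passed in as functions returning fractions; in practice they might wrap psutil.cpu_percent() and psutil.virtual_memory().percent divided by 100. The max_wait cap is an added assumption so the loop cannot block forever.

```python
import time
from typing import Callable, Optional


def wait_for_capacity(
    get_cpu: Callable[[], float],
    get_mem: Callable[[], float],
    maxcpu: Optional[float] = 0.8,
    maxmem: Optional[float] = None,
    interval: float = 0.2,
    max_wait: float = 60.0,
) -> int:
    """Block until CPU and memory load (fractions 0-1) are under their limits.

    Returns the number of polls that found the machine overloaded.
    A limit of None disables that check. Gives up after max_wait seconds.
    """
    waited = 0
    deadline = time.monotonic() + max_wait
    while True:
        cpu_ok = maxcpu is None or get_cpu() <= maxcpu
        mem_ok = maxmem is None or get_mem() <= maxmem
        if cpu_ok and mem_ok:
            return waited
        if time.monotonic() >= deadline:
            return waited  # give up waiting; the caller starts the task anyway
        waited += 1
        time.sleep(interval)  # pause before re-checking the load


# Usage with fake probes: CPU load drops from 95% to 50% after two polls
cpu_readings = iter([0.95, 0.90, 0.50])
polls = wait_for_capacity(lambda: next(cpu_readings), lambda: 0.3, maxcpu=0.8, maxmem=0.9, interval=0.01)
print(polls)  # 2
```

A loop like this would run once before each task is started, which is why a fixed ncpus makes maxcpu unnecessary.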
Example 5 – using a custom parallelization function:
```python
import multiprocessing as mp
import sciris as sc

def f(x,y):
    return [x]*y

pool = mp.Pool(processes=2)
results = sc.parallelize(f, iterkwargs=dict(x=[1,2,3], y=[4,5,6]), parallelizer=pool.map) # Note: parallelizer is pool.map, not pool
pool.close() # Release the worker processes when done
pool.join()
```
Example 6 – using Sciris as an interface to Dask:
```python
import sciris as sc

def f(x,y):
    return [x]*y

def dask_map(task, argslist):
    import dask
    queued = [dask.delayed(task)(args) for args in argslist] # Build lazy tasks
    return list(dask.compute(*queued)) # Execute them and collect the results in order

results = sc.parallelize(f, iterkwargs=dict(x=[1,2,3], y=[4,5,6]), parallelizer=dask_map)
```
Note 1: the default parallelizer "multiprocess" uses dill for pickling, so it is the most versatile (e.g., it can pickle non-top-level functions). However, it is also the slowest for passing large amounts of data. To trade robustness for speed, use parallelizer='fast' (concurrent.futures); to use the robust default explicitly, use parallelizer='robust' (multiprocess).

The parallelizer argument accepts a wide range of parallelizers (including several aliases for each), and also supports user-supplied ones. In most cases the default parallelizer will suffice. The full list of options is:
- None, 'default', 'robust', 'multiprocess': the slow but robust dill-based parallelizer, multiprocess
- 'fast', 'concurrent', 'concurrent.futures': the faster but more fragile pickle-based Python default parallelizer, concurrent.futures
- 'multiprocessing': the previous pickle-based Python default parallelizer, multiprocessing
- 'serial', 'serial-copy': no parallelization (single-threaded); with “-copy”, force pickling
- 'thread', 'threadpool', 'thread-copy': thread- rather than process-based parallelization (“-copy” as above)
- User supplied: any map()-like function that takes in a function and an argument list
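The last option means any callable with the shape parallelizer(task, argslist) -> list can be plugged in, as Examples 5 and 6 do with pool.map and Dask. As a hedged sketch, here is a hypothetical thread-based map built on concurrent.futures.ThreadPoolExecutor. It assumes, as in those examples, that results must come back in the same order as argslist; Executor.map guarantees that order.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def thread_map(task: Callable[[Any], Any], argslist: Iterable[Any]) -> List[Any]:
    """Hypothetical map()-like parallelizer: run task on each item using threads."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Executor.map yields results in input order, like the built-in map()
        return list(executor.map(task, argslist))


# It could then be passed as, e.g., sc.parallelize(f, [1, 2, 3], parallelizer=thread_map)
print(thread_map(lambda x: x * x, [1, 2, 3]))  # [1, 4, 9]
```

Threads share one interpreter, so this choice mainly helps I/O-bound tasks or code that releases the GIL; for CPU-bound pure-Python work, the process-based options above are usually faster.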
Note 2: if parallelizing figure generation, use a non-interactive backend, or make sure that (a) the figure is closed inside the function call, and (b) the figure object is not returned. Otherwise, parallelization won’t increase speed (and might even be slower than serial!).
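Following Note 2, a worker that generates a figure can select the non-interactive Agg backend, save the figure to disk, close it, and return only the file path. A minimal sketch, assuming matplotlib is installed. The function name make_plot and the temporary output directory are hypothetical, and the calls are run serially here for demonstration.

```python
import os
import tempfile

import matplotlib
matplotlib.use("Agg")  # non-interactive backend: renders to files only
import matplotlib.pyplot as plt


def make_plot(i: int, outdir: str) -> str:
    """Draw one figure, save it, close it, and return the path (not the figure)."""
    fig, ax = plt.subplots()
    ax.plot(range(10), [i * x for x in range(10)])
    ax.set_title(f"Run {i}")
    path = os.path.join(outdir, f"run_{i}.png")
    fig.savefig(path)
    plt.close(fig)  # (a) close the figure inside the worker
    return path     # (b) return a path, not the Figure object


outdir = tempfile.mkdtemp()
# With Sciris, this loop might become sc.parallelize(make_plot, iterarg=range(3), outdir=outdir)
paths = [make_plot(i, outdir) for i in range(3)]
print(paths)
```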
Note 3: to use on Windows (or on macOS, whose default start method is also “spawn”), parallel calls must be contained within an if __name__ == '__main__' block. For example:

```python
import sciris as sc

def f(x,y):
    return x*y

if __name__ == '__main__':
    results = sc.parallelize(func=f, iterarg=[(1,2),(2,3),(3,4)])
    print(results)
```
Note 4: In Python 3.14, the default process start method on Linux was changed from “fork” to “forkserver”. This does not use copy-on-write to share memory with worker processes but rather behaves more like “spawn” on Mac/Windows. It can also result in an EOFError when using WSL on Windows. To restore the previous behaviour, after importing sciris, set the start method to “fork” as follows:
```python
import sciris as sc # Import sciris first
import multiprocessing
multiprocessing.set_start_method("fork", force=True)
```
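On platforms that do not offer “fork” (such as Windows), the call above raises a ValueError. A hedged variant only switches when “fork” is available. It also skips macOS, where Python's documentation considers “fork” unsafe even though it is listed. The helper name use_fork_if_available is hypothetical.

```python
import multiprocessing
import sys


def use_fork_if_available() -> str:
    """Switch the global start method to 'fork' where it is supported; return the method in use."""
    # Call this after importing sciris, as described in the note above
    if "fork" in multiprocessing.get_all_start_methods() and sys.platform != "darwin":
        multiprocessing.set_start_method("fork", force=True)
    # Otherwise (e.g. Windows, macOS) the platform default is left in place
    return multiprocessing.get_start_method()


print(use_fork_if_available())  # typically 'fork' on Linux, 'spawn' on Windows and macOS
```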
New in version 1.1.1: “serial” argument
New in version 2.0.0: changed default parallelizer from multiprocess.Pool to concurrent.futures.ProcessPoolExecutor; replaced maxload with maxcpu/maxmem; added returnpool argument
New in version 2.0.4: added “die” argument; changed exception handling
New in version 3.0.0: new Parallel class; propagated “die” to jobs
New in version 3.1.0: new “globaldict” argument
New in version 3.2.5: “capture” and “lbkwargs” arguments