Parallelism
This package provides a parallel work mapping function called
parallel_map() to allow for stream
parallelism of arbitrary work functions using multi-threading or
multi-processing.
While this method of parallelism is not universally the most efficient, it is
intended to operate at its best in situations where work functions are
dynamic or parallelism with streaming input is required.
Reference
- smqtk_descriptors.utils.parallel.parallel_map(work_func: Callable[[...], T_co], *sequences: Iterable, **kwargs: Any) -> ParallelResultsIterator[T_co]
Generalized local parallelization helper for executing embarrassingly parallel functions on an iterable of input data. This function then yields work results for data, optionally in the order that they were provided to this function.
By default, we act like itertools.izip (the built-in zip in Python 3) with regard to input sequences, whereby we stop performing work as soon as one of the input sequences is exhausted. The optional keyword argument fill_void may be specified to enable sequence handling like itertools.zip_longest, where the longest sequence determines what is iterated and the value given to fill_void is used as the fill value.
This is intended to be able to replace multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool uses, with the added benefits of:
- No set-up or clean-up needed.
- No performance loss compared to multiprocessing.pool classes for non-trivial work functions (like IO operations).
- We can iterate results as they are ready (optionally in order of input).
- Lambda or on-the-fly functions can be provided as the work function when using multiprocessing.
- Buffered input/output queues so that mapping work of a very large input set doesn’t overrun your memory (e.g. iterating over many large vectors/matrices).
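The difference between the default sequence handling and the fill_void handling described above can be illustrated with the standard library alone. This is a minimal sketch of the two iteration rules, not a call into parallel_map itself; the sample lists and the FILL name are assumptions made for the illustration.

```python
from itertools import zip_longest

letters = ["a", "b", "c"]
numbers = [1, 2]

# Analogue of the default behavior: stop at the shortest sequence.
default_pairs = list(zip(letters, numbers))

# Analogue of passing fill_void: run to the longest sequence and pad
# the shorter one with the fill value.
FILL = None  # hypothetical fill value chosen for this illustration
filled_pairs = list(zip_longest(letters, numbers, fillvalue=FILL))

print(default_pairs)  # [('a', 1), ('b', 2)]
print(filled_pairs)   # [('a', 1), ('b', 2), ('c', None)]
```

Each tuple corresponds to one argument list that a work function would receive, so a work function used with a fill value should be prepared to handle that value.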
This function is, however, slower than multiprocessing.pool classes for trivial functions, like using the function ord over a set of characters.
Input data given to sequences must be picklable in order to transport to worker threads/processes.
Input Iteration and Results Buffering
The buffer factor, F, operates with the number of utilized cores, C, to create an upper bound on the number of times the input sequences are iterated and the number of work function outputs held in memory at any given time.
The maximum number of input sequence items loaded at a time is floor(C * F) + C. This is due to the input work queue maxsize being set to floor(C * F) while there can be C workers utilizing their inputs to complete their work instances.
The maximum number of results queued is floor(C * F) + C. This is similarly due to the output result queue maxsize being set to floor(C * F) while there can be C workers blocked on putting values into a full results queue.
Sometimes it’s important to know how much farther ahead the input iterator(s) have yielded compared to the number of output results from the ParallelResultsIterator. For some yielded result at index N, the input iterator(s) next yielded item should be their index N + (2 * floor(C * F) + C). This is derived from the input work and output result queues each being filled with at most floor(C * F) items and there being C workers working on, or attempting to queue results for, their current inputs. For example, if we use C=4 and F=1.5 and result index N has just been yielded, then the input iterators are ready to yield their N + 16-th indexed item (2 * floor(4 * 1.5) + 4 = 2 * 6 + 4 = 16).
The above is only guaranteed when the ordered option is False; otherwise non-determinism in processing order can cause results for input items to return out of order, causing additional buffering in the heap used to ensure ordered output which, necessarily, has no size limit so as to not dead-lock.
- param work_func:
Function that performs some work on input data, resulting in some returned value.
When in multiprocessing mode, this cannot be a local function or a transport error will occur when trying to move the function to the worker process.
- param sequences:
Input data to apply to the given work_func function. If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. While we expect Iterable types to be provided here, they will only be observed by zip/zip_longest at most once. See iteration rules for zip/zip_longest for details.
- param kwargs:
Optionally available keyword arguments are as follows:
- fill_void
Optional value that, if specified, activates sequence handling like that of itertools.zip_longest, using the provided value as a fill-in for shorter sequences until the longest sequence is exhausted.
type: Any
default: No default
- ordered
If results for input elements should be yielded in the same order as input elements. If False, we yield results as soon as they are collected.
type: bool
default: True
- buffer_factor
Multiplier against the number of processes used to limit the growth size of the result queue coming from worker processes (int(cores * buffer_factor)). This is utilized so we don’t overrun our RAM buffering results.
type: float
default: 2.0
- cores
Optional specification of the number of threads/cores to use. If None, we will attempt to use all available threads/cores.
type: None | int | long
default: None
- use_multiprocessing
Whether or not to use discrete processes as the parallelization agent vs python threads.
type: bool
default: False
- heart_beat
Interval at which workers check for operational messages while waiting on locks (e.g. waiting to push or pull messages). This ensures that workers are not left hanging, or hang the program, when an error or interruption occurs, or when waiting on a full edge. This must be >0.
type: float
default: 0.001
- name
Optional string name for identifying workers and logging messages. None means no names are added.
type: str
default: None
- daemon
Optional flag for whether started threads/processes are flagged as daemonic. This should nearly always be True (the default), otherwise related threads/processes can hang the main process.
type: bool
default: True
- return:
A new parallel results iterator that starts work on the input iterable when iterated.
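The buffering bounds that follow from cores (C) and buffer_factor (F) can be checked with a few lines of arithmetic. The sketch below only restates the formulas floor(C * F) + C and 2 * floor(C * F) + C from the buffering discussion; the helper names max_buffered_items and input_lead are hypothetical and are not part of this package.

```python
import math


def max_buffered_items(cores: int, buffer_factor: float) -> int:
    """Upper bound on items held on one side (input or output): floor(C * F) + C."""
    return math.floor(cores * buffer_factor) + cores


def input_lead(cores: int, buffer_factor: float) -> int:
    """How far past the last yielded result index the inputs may be: 2 * floor(C * F) + C."""
    return 2 * math.floor(cores * buffer_factor) + cores


# Worked example from the text: C=4, F=1.5.
print(max_buffered_items(4, 1.5))  # 10
print(input_lead(4, 1.5))          # 16
```

As stated in the buffering discussion, these bounds only hold when ordered is False, since ordered output may buffer additional results without a size limit.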
Example
>>> import math
>>> from smqtk_descriptors.utils.parallel import parallel_map
>>> result_iter = parallel_map(math.factorial, range(10),
...                            use_multiprocessing=True)
>>> sorted(result_iter)
[1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880]
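The example above passes math.factorial, a module-level function, as the work function in multiprocessing mode. As a rough pre-check for whether a function or input item can be transported, one can try serializing it with the standard pickle module. This is only an approximation under the assumption that transport relies on pickle; whether a given function actually fails in practice may depend on the platform and process start method. The helpers can_pickle and make_local_function are hypothetical names for this sketch.

```python
import math
import pickle


def can_pickle(obj) -> bool:
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_local_function():
    # A function defined inside another function cannot be pickled by reference.
    def local_double(x):
        return 2 * x
    return local_double


print(can_pickle(math.factorial))         # True
print(can_pickle(make_local_function()))  # False
print(can_pickle(lambda x: x + 1))        # False
```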
- class smqtk_descriptors.utils.parallel.ParallelResultsIterator(name: str | None, ordered: bool, is_multiprocessing: bool, heart_beat: float, work_queue: Queue | Queue, results_queue: Queue | Queue, feeder_thread: _FeedQueueThread, workers: Sequence[_WorkerThread | _WorkerProcess], daemon: bool)
Iterator returned from a parallel mapping job, managing workers and output results queue consumption.
A parallel work mapping job may be canceled through this object.
- Parameters:
name – String name to attribute to this iterator. May be None.
ordered – If this results iterator should yield results in a congruent order to the input parameter sequences. If this is false then this results iterator will yield results as soon as they are available regardless of the input parameter sequence order.
is_multiprocessing – If workers are processes vs. threads. When this is true, extra steps are taken to appropriately shut down processes.
heart_beat – How long in seconds we wait when polling for data on the results queue before momentarily giving up to allow a cycle of the loop. This is important in allowing an external signal to indicate we should stop iterating (prevents hanging on getting the next result value).
work_queue – Queue into which work is placed by the feeder thread. This object is responsible for cleaning up this queue, if applicable, upon iteration termination.
results_queue – Queue from which work results are pulled. This object is responsible for cleaning up this queue, if applicable, upon iteration termination.
feeder_thread – Thread for feeding the input queue for this iterator to manage starting and stopping appropriately.
workers – Sequence of worker threads/processes for this iterator to manage starting and stopping appropriately.
daemon – If the managed threads/processes should be started as daemons.
- next() -> T_co
Return the next item from the iterator. When exhausted, raise StopIteration.
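When ordered is true, results that finish early have to wait until every earlier result has been yielded. The general idea of such a heap-based reorder buffer can be sketched as follows. This is an illustration of the technique only, not this class's actual implementation; it assumes results arrive as (index, value) pairs, and in_input_order is a hypothetical name.

```python
import heapq
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def in_input_order(indexed_results: Iterable[Tuple[int, T]]) -> Iterator[T]:
    """Yield values in index order from (index, value) pairs arriving in any order."""
    heap: list = []   # unbounded buffer of results that arrived early
    next_index = 0    # index of the next result we are allowed to yield
    for index, value in indexed_results:
        heapq.heappush(heap, (index, value))
        # Drain every buffered result that is now next in line.
        while heap and heap[0][0] == next_index:
            yield heapq.heappop(heap)[1]
            next_index += 1


arrival_order = [(2, "c"), (0, "a"), (1, "b"), (4, "e"), (3, "d")]
print(list(in_input_order(arrival_order)))  # ['a', 'b', 'c', 'd', 'e']
```

The heap in this sketch has no size limit, which mirrors the note in the buffering discussion that ordered output can hold more results in memory than the queue bounds alone would suggest.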