Parallelism

This package provides a parallel work mapping function, parallel_map(), that enables stream parallelism of arbitrary work functions using multi-threading or multi-processing. While this method of parallelism is not universally the most efficient, it is intended to operate at its best in situations where work functions are dynamic or parallelism over streaming input is required.

Reference

smqtk_descriptors.utils.parallel.parallel_map(work_func: Callable[[...], T_co], *sequences: Iterable, **kwargs: Any) → ParallelResultsIterator[T_co]

Generalized local parallelization helper for executing embarrassingly parallel functions on an iterable of input data. This function then yields work results for data, optionally in the order that they were provided to this function.

By default, we act like zip (itertools.izip in Python 2) with regard to input sequences, whereby we stop performing work as soon as one of the input sequences is exhausted. The optional keyword argument fill_void may be specified to enable sequence handling like itertools.zip_longest, where the longest sequence determines what is iterated and the value given to fill_void is used as the fill value.
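The two sequence-handling modes described above mirror standard library behavior. The following sketch uses only zip and itertools.zip_longest to show the difference; the variable names are illustrative and parallel_map itself is not called.

```python
from itertools import zip_longest

letters = ["a", "b", "c"]
numbers = [1, 2]

# Default-style handling: iteration stops once the shortest sequence is exhausted.
shortest = list(zip(letters, numbers))

# fill_void-style handling: iteration runs to the longest sequence,
# padding the shorter one with the chosen fill value (None here).
longest = list(zip_longest(letters, numbers, fillvalue=None))

print(shortest)
print(longest)
```

In the second mode the work function must be prepared to receive the fill value as an argument.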

This is intended to be able to replace uses of multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool, with the added benefits of:

  • No set-up or clean-up needed

  • No performance loss compared to multiprocessing.pool classes for non-trivial work functions (like IO operations).

  • We can iterate results as they are ready (optionally in order of input)

  • Lambda or on-the-fly function can be provided as the work function when using multiprocessing.

  • Buffered input/output queues so that mapping work of a very large input set doesn’t overrun your memory (e.g. iterating over many large vectors/matrices).

This function is, however, slower than multiprocessing.pool classes for trivial functions, like using the function ord over a set of characters.

Input data given to sequences must be picklable in order to transport to worker threads/processes.

Input Iteration and Results Buffering

The buffer factor, F, operates with the number of utilized cores, C, to create an upper bound on how far ahead the input sequences are iterated and on the number of work function outputs held in memory at any given time.

The maximum number of input sequence items loaded at a time is floor(C * F) + C. This is due to the input work queue maxsize being set to floor(C * F) while up to C workers may be using their inputs to complete their work instances.

The maximum number of results queued is floor(C * F) + C. This is similarly due to the output result queue maxsize being set to floor(C * F) while there can be C workers blocked on putting values into a full results queue.
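The bounded work and results queues described above can be sketched with the standard library alone. This is a minimal illustration of the buffering idea, not the library's implementation: buffered_thread_map and the _DONE sentinel are hypothetical names, only threads are used, results come back in completion order, and an exception in the work function is not handled.

```python
import queue
import threading

_DONE = object()  # hypothetical sentinel marking the end of a stream


def buffered_thread_map(work_func, iterable, cores=2, buffer_factor=2.0):
    """Yield work_func(item) for each item, in completion order."""
    # Both queues share the same bound; note that a bound of 0 would mean
    # "unbounded" for queue.Queue.
    maxsize = int(cores * buffer_factor)
    work_q = queue.Queue(maxsize=maxsize)
    results_q = queue.Queue(maxsize=maxsize)

    def feed():
        for item in iterable:
            work_q.put(item)  # blocks while the work queue is full
        for _ in range(cores):
            work_q.put(_DONE)  # one end marker per worker

    def work():
        while True:
            item = work_q.get()
            if item is _DONE:
                results_q.put(_DONE)
                return
            results_q.put(work_func(item))  # blocks while the results queue is full

    threads = [threading.Thread(target=feed, daemon=True)]
    threads += [threading.Thread(target=work, daemon=True) for _ in range(cores)]
    for t in threads:
        t.start()

    finished = 0
    while finished < cores:
        result = results_q.get()
        if result is _DONE:
            finished += 1
        else:
            yield result


squares = sorted(buffered_thread_map(lambda x: x * x, range(20), cores=3))
```

Because both queues block when full, the feeder can only run a bounded distance ahead of the consumer, which is what keeps memory use in check for very large inputs.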

Sometimes it's important to know how much farther ahead the input iterator(s) have yielded compared to the number of output results from the ParallelResultsIterator. For some yielded result at index N, the next item yielded by the input iterator(s) should be at index N + (2 * floor(C * F) + C). This is derived from the input work and output result queues each being filled with at most floor(C * F) items while C workers are working on, or attempting to queue results for, their current inputs. For example, if we use C=4 and F=1.5 and result index N has just been yielded, then the input iterators are ready to yield their N + 16-th indexed item (2 * floor(4 * 1.5) + 4 = 2 * 6 + 4 = 16).
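The bounds above are simple arithmetic and can be checked with a small helper. The function name buffer_bounds is hypothetical and not part of the package; it only restates the formulas from the text.

```python
import math


def buffer_bounds(cores, buffer_factor):
    """Return (max items loaded or queued, input look-ahead) for C cores and factor F."""
    queue_size = math.floor(cores * buffer_factor)  # maxsize of each queue
    max_loaded = queue_size + cores                 # floor(C * F) + C
    lookahead = 2 * queue_size + cores              # 2 * floor(C * F) + C
    return max_loaded, lookahead


# Values from the worked example in the text: C=4, F=1.5.
example_bounds = buffer_bounds(4, 1.5)
print(example_bounds)
```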

The above is only guaranteed when the ordered option is False. Otherwise, non-determinism in processing order can cause results to arrive out of input order, which causes additional buffering in the heap used to ensure ordered output. That heap necessarily has no size limit so as not to deadlock.
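To see why ordered output needs an unbounded buffer, consider the following sketch of heap-based reordering. It assumes results arrive as (index, value) pairs in arbitrary order; the reorder function is a hypothetical illustration and not the library's code.

```python
import heapq


def reorder(indexed_results):
    """Yield values in index order from (index, value) pairs arriving in any order."""
    heap = []
    next_index = 0
    for index, value in indexed_results:
        heapq.heappush(heap, (index, value))
        # Release every buffered result that is next in line.
        while heap and heap[0][0] == next_index:
            yield heapq.heappop(heap)[1]
            next_index += 1


ordered = list(reorder([(2, "c"), (0, "a"), (1, "b"), (3, "d")]))
print(ordered)
```

If the result for index 0 is slow to arrive, every later result has to wait in the heap, so capping the heap's size could block the very worker that would unblock it.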

param work_func:

Function that performs some work on input data, resulting in some returned value.

When in multiprocessing mode, this cannot be a local function or a transport error will occur when trying to move the function to the worker process.
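The transport restriction can be illustrated with the standard pickle module, which is what Python's multiprocessing uses to move objects between processes. Whether parallel_map serializes the work function in exactly this way is an assumption; is_picklable and make_local_func are hypothetical helpers for this sketch.

```python
import math
import pickle


def make_local_func():
    # A function defined inside another function cannot be looked up by
    # name on import, so pickle cannot serialize it.
    def add_one(x):
        return x + 1
    return add_one


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


importable_ok = is_picklable(math.factorial)   # importable by module and name
local_ok = is_picklable(make_local_func())     # local function
print(importable_ok, local_ok)
```

A quick check like this before starting a multiprocessing job can surface the problem earlier than a failure inside a worker.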

param sequences:

Input data to apply to the given work_func function. If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. While we expect Iterable types to be provided here, they will only be observed by zip/zip_longest at most once. See iteration rules for zip/zip_longest for details.

param kwargs:

Optionally available keyword arguments are as follows:

  • fill_void
    • Optional value that, if specified, activates sequence handling like that of itertools.zip_longest, using the provided value as a fill-in for shorter sequences until the longest sequence is exhausted.

    • type: Any

    • default: No default

  • ordered
    • Whether results for input elements should be yielded in the same order as the input elements. If False, we yield results as soon as they are collected.

    • type: bool

    • default: True

  • buffer_factor
    • Multiplier against the number of processes used to limit the growth size of the result queue coming from worker processes (int(cores * buffer_factor)). This is utilized so we don’t overrun our RAM buffering results.

    • type: float

    • default: 2.0

  • cores
    • Optional specification of the number of threads/cores to use. If None, we will attempt to use all available threads/cores.

    • type: None | int | long

    • default: None

  • use_multiprocessing
    • Whether or not to use discrete processes as the parallelization agent vs python threads.

    • type: bool

    • default: False

  • heart_beat
    • Interval at which workers check for operational messages while waiting on locks (e.g. waiting to push or pull messages). This ensures that workers are not left hanging, and do not hang the program, when an error or interruption occurs, or when waiting on a full edge. This must be >0.

    • type: float

    • default: 0.001

  • name
    • Optional string name for identifying workers and logging messages. None means no names are added.

    • type: str

    • default: None

  • daemon
    • Optional flag for whether started threads/processes are flagged as daemonic. This should nearly always be True (the default); otherwise related threads/processes can hang the main process.

    • type: bool

    • default: True

return:

A new parallel results iterator that starts work on the input iterable when iterated.

Example

>>> import math
>>> from smqtk_descriptors.utils.parallel import parallel_map
>>> result_iter = parallel_map(math.factorial, range(10),
...                            use_multiprocessing=True)
>>> sorted(result_iter)
[1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880]

class smqtk_descriptors.utils.parallel.ParallelResultsIterator(name: str | None, ordered: bool, is_multiprocessing: bool, heart_beat: float, work_queue: Queue | Queue, results_queue: Queue | Queue, feeder_thread: _FeedQueueThread, workers: Sequence[_WorkerThread | _WorkerProcess], daemon: bool)

Iterator returned from a parallel mapping job, managing workers and output results queue consumption.

A parallel work mapping job may be canceled through this object.

Parameters:
  • name – String name to attribute to this iterator. May be None.

  • ordered – If this results iterator should yield results in a congruent order to the input parameter sequences. If this is false then this results iterator will yield results as soon as they are available regardless of the input parameter sequence order.

  • is_multiprocessing – If workers are processes vs. threads. When this is true, extra steps are taken to appropriately shutdown processes.

  • heart_beat – How long in seconds we wait when polling for data on the results queue before momentarily giving up to allow a cycle of the loop. This is important in allowing an external signal to indicate we should stop iterating (prevents hanging on getting the next result value).

  • work_queue – Queue into which work is placed by the feeder thread. This object is responsible for cleaning up this queue, if applicable, upon iteration termination.

  • results_queue – Queue from which work results are pulled. This object is responsible for cleaning up this queue, if applicable, upon iteration termination.

  • feeder_thread – Thread for feeding the input queue for this iterator to manage starting and stopping appropriately.

  • workers – Sequence of worker threads/processes for this iterator to manage starting and stopping appropriately.

  • daemon – If the managed threads/processes should be started as daemons.

clean_up() → None

Clean up any live resources if we haven’t done so already.

next() → T_co

Return the next item from the iterator. When exhausted, raise StopIteration.

results_q_get() → Any

Attempts to get something from the results queue.

Raises:

StopIteration – when we’ve been told to stop.

Returns:

Single result from the results queue.
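The heart-beat polling behavior described for the results queue can be sketched as a timed get inside a loop that re-checks a stop flag. This is an illustration under assumptions, not the library's code: poll_get is a hypothetical name and a threading.Event stands in for whatever stop signal the iterator actually uses.

```python
import queue
import threading


def poll_get(results_q, stop_event, heart_beat=0.001):
    """Return one item from results_q, or raise StopIteration once stopped."""
    while not stop_event.is_set():
        try:
            # Wait at most heart_beat seconds so the stop flag is re-checked often.
            return results_q.get(timeout=heart_beat)
        except queue.Empty:
            continue
    raise StopIteration("told to stop")


q = queue.Queue()
q.put(5)
stop = threading.Event()
first = poll_get(q, stop)

stop.set()
try:
    poll_get(q, stop)
    stopped_cleanly = False
except StopIteration:
    stopped_cleanly = True
```

A smaller heart_beat reacts to a stop request sooner at the cost of more wake-ups while idle.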

start_workers() → None

Start worker threads/processes.

stop() → None

Stop this iterator.

This does not clean up resources (see clean_up for that).

stopped() → bool

Returns:

True if this iterator has been stopped.