Source code for smqtk_descriptors.utils.parallel

import abc
from itertools import zip_longest
import heapq
import logging
import multiprocessing
import multiprocessing.queues
import multiprocessing.synchronize
import queue
import sys
import threading
import traceback
from typing import Any, Callable, Iterable, Iterator, List, Optional, Sequence, Type, Union, TypeVar


LOG = logging.getLogger(__name__)
T_co = TypeVar("T_co", covariant=True)


[docs]def parallel_map( work_func: Callable[..., T_co], *sequences: Iterable, **kwargs: Any ) -> "ParallelResultsIterator[T_co]": """ Generalized local parallelization helper for executing embarrassingly parallel functions on an iterable of input data. This function then yields work results for data, optionally in the order that they were provided to this function. By default, we act like ``itertools.izip`` in regards to input sequences, whereby we stop performing work as soon as one of the input sequences is exhausted. The optional keyword argument ``fill_void`` may be specified to enable sequence handling like ``itertools.zip_longest`` where the longest sequence determines what is iterated, and the value given to ``fill_void`` is used as the fill value. This is intended to be able to replace ``multiprocessing.pool.Pool`` and ``multiprocessing.pool.ThreadPool`` uses with the added benefit of: - No set-up or clean-up needed - No performance loss compared to ``multiprocessing.pool`` classes for non-trivial work functions (like IO operations). - We can iterate results as they are ready (optionally in order of input) - Lambda or on-the-fly function can be provided as the work function when using multiprocessing. - Buffered input/output queues so that mapping work of a very large input set doesn't overrun your memory (e.g. iterating over many large vectors/matrices). This function is, however, slower than multiprocessing.pool classes for trivial functions, like using the function ``ord`` over a set of characters. Input data given to ``sequences`` must be picklable in order to transport to worker threads/processes. Input Iteration and Results Buffering ------------------------------------- The buffer factor, ``F``, operates with the number of utilized cores, ``C``, to create an upper bound on the times the input sequences are iterated and the number of work function outputs are held in memory at any given time. The maximum number of input sequence items loaded at a time is ``floor(C * F) + C``. This is due to the input work queue ``maxsize`` being set to ``floor(C*F)`` while there can be ``C`` workers could be utilizing their inputs to complete their work instances. The maximum number of results queued is ``floor(C * F) + C``. This is similarly due to the output result queue maxsize being set to ``floor(C * F)`` while there can be ``C`` workers blocked on putting values into a full results queue. Sometimes its important to know how much farther ahead the input iterator(s) have yielded compared to the number of output results from the ``ParallelResultsIterator``. For some yielded result at index ``N``, the input iterator(s) next yielded item should be their index ``N + (2 * floor(C * F) + C)``. This is derived from the input work and output result queues maximally filled with at most ``floor(C * F)`` items and there being ``C`` workers working on, or attempting to queue results for, their current inputs. For example, if we have use ``C=4`` and ``F=1.5``, if result index N has just been yielded, then the input iterators are ready to yield their ``N + 16``-th indexed item (``2 * floor(4*1.5) + 4 = 2 * 6 + 4 = 16``). The above is only guaranteed no the ``ordered`` option is ``False``, otherwise non-determinism in processing order can cause results for input items to return out of order, causing additional buffering in the heap used to ensure ordered output which, necessarily, has no size limits so as to not dead-lock. :param work_func: Function that performs some work on input data, resulting in some returned value. When in multiprocessing mode, this cannot be a local function or a transport error will occur when trying to move the function to the worker process. :param sequences: Input data to apply to the given ``work_func`` function. If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. While we expect Iterable types to be provided here, they will only be observed by `zip`/`zip_longest` at most once. See iteration rules for `zip`/`zip_longest` for details. :param kwargs: Optionally available keyword arguments are as follows: - fill_void - Optional value that, if specified, activates sequence handling like that of ``itertools.zip_longest``, using the provided value as a fill-in for shorter sequences until the longest sequence is exhausted. - type: Any - default: No default - ordered - If results for input elements should be yielded in the same order as input elements. If False, we yield results as soon as they are collected. - type: bool - default: True - buffer_factor - Multiplier against the number of processes used to limit the growth size of the result queue coming from worker processes (``int(cores * buffer_factor)``). This is utilized so we don't overrun our RAM buffering results. - type: float - default: 2.0 - cores - Optional specification of the number of threads/cores to use. If None, we will attempt to use all available threads/cores. - type: None | int | long - default: None - use_multiprocessing - Whether or not to use discrete processes as the parallelization agent vs python threads. - type: bool - default: False - heart_beat - Interval at which workers check for operational messages while waiting on locks (e.g. waiting to push or pull messages). This ensures that workers are not left hanging, or hang the program, when and error or interruption occurs, or when waiting on an full edge. This must be >0. - type: float - default: 0.001 - name - Optional string name for identifying workers and logging messages. ``None`` means no names are added. - type: str - default: None - daemon - Optional flag for if started threads/processes are flagged as daemonic. This is should probably nearly always be True (the default) otherwise related threads/processes can hang the main process. - type: bool - default: True :return: A new parallel results iterator that starts work on the input iterable when iterated. Example ------- >>> import math >>> result_iter = parallel_map(math.factorial, range(10), ... use_multiprocessing=True) >>> sorted(result_iter) [1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880] """ # kwargs cores: Optional[int] = kwargs.get('cores', None) ordered = kwargs.get('ordered', True) buffer_factor = kwargs.get('buffer_factor', 2.0) use_multiprocessing = kwargs.get('use_multiprocessing', False) heart_beat = kwargs.get('heart_beat', 0.001) fill_activate = 'fill_void' in kwargs fill_value = kwargs.get('fill_void', None) name = kwargs.get('name', None) daemon = kwargs.get('daemon', True) if name: log = logging.getLogger(__name__ + '[%s]' % name) else: log = logging.getLogger(__name__) if heart_beat <= 0: raise ValueError("heart_beat must be >0.") if cores is None or cores <= 0: cores = multiprocessing.cpu_count() log.debug("Using all cores (%d)", cores) else: log.debug("Only using %d cores", cores) # Choose parallel types queue_t: Union[Type[multiprocessing.Queue], Type[queue.Queue]] worker_t: Type[Union[_WorkerThread, _WorkerProcess]] if use_multiprocessing: queue_t = multiprocessing.Queue worker_t = _WorkerProcess else: queue_t = queue.Queue worker_t = _WorkerThread # Type ignoring these calls due to a mypy issue where it's deducing the # type to `<nothing>` for some reason. queue_work = queue_t(maxsize=int(cores * buffer_factor)) # type: ignore queue_results = queue_t(maxsize=int(cores * buffer_factor)) # type: ignore log.log(1, "Constructing worker processes") workers = [worker_t(name, i, work_func, queue_work, queue_results, heart_beat) for i in range(cores)] log.log(1, "Constructing feeder thread") feeder_thread = _FeedQueueThread(name, sequences, queue_work, len(workers), heart_beat, fill_activate, fill_value) return ParallelResultsIterator(name, ordered, use_multiprocessing, heart_beat, queue_work, queue_results, feeder_thread, workers, daemon)
class _TerminalPacket (object): """ Signals a terminal message """ def _is_terminal(p: Any) -> bool: """ Check if a given packet is a terminal element. :param p: element to check :return: If ``p`` is a terminal element """ return isinstance(p, _TerminalPacket)
[docs]class ParallelResultsIterator (Iterator[T_co]): """ Iterator return from a parallel mapping job, managing workers and output results queue consumption. A parallel work mapping jobs may be canceled through this object. :param name: String name to attribute to this iterator. May be None. :param ordered: If this results iterator should yield results in a congruent order to the input parameter sequences. If this is `false` then this results iterator will yield results as soon as they are available regardless of the input parameter sequence order. :param is_multiprocessing: If workers are processes vs. threads. When this is true, extra steps are taken to appropriately shutdown processes. :param heart_beat: How long in seconds we wait when polling for data on the results queue before momentarily giving up to allow a cycle of the loop. This is important in allowing an external signal to indicate we should stop iterating (prevents hanging on getting the next result value). :param work_queue: Queue into which work is placed by the feeder thread. This object is responsible for cleaning up this queue, if applicable, upon iteration termination. :param results_queue: Queue from which work results are pulled. This object is responsible for cleaning up this queue, if applicable, upon iteration termination. :param feeder_thread: Thread for feeding the input queue for this iterator to manage starting and stopping appropriately. :param workers: Sequence of worker threads/processes for this iterator to manage starting and stopping appropriately. :param daemon: If the managed threads/processes should be started as daemons. """ def __init__( self, name: Optional[str], ordered: bool, is_multiprocessing: bool, heart_beat: float, work_queue: Union[queue.Queue, multiprocessing.Queue], results_queue: Union[queue.Queue, multiprocessing.Queue], feeder_thread: "_FeedQueueThread", workers: Sequence[Union["_WorkerThread", "_WorkerProcess"]], daemon: bool ): self.name = name self._l_prefix: str = f"[PRI{(name and f'::{name}') or ''}]" self.ordered = ordered if self.ordered: LOG.debug(f"{self._l_prefix} Maintaining result iteration order " f"based on input order") self.heart_beat = heart_beat self.is_multiprocessing = is_multiprocessing self.work_queue = work_queue self.results_queue = results_queue self.feeder_thread = feeder_thread self.workers = workers self.daemon = daemon self.has_started_workers = False self.has_cleaned_up = False self.found_terminals = 0 self.result_heap: List = [] self.next_index = 0 self.stop_event = threading.Event() self.stop_event_lock = threading.Lock() def __repr__(self) -> str: sfx = '' if self.name: sfx = '[' + self.name + ']' return "<%(module)s.%(class)s%(sfx)s at %(address)s>" % { "module": self.__module__, "class": self.__class__.__name__, "sfx": sfx, "address": hex(id(self)), } def __next__(self) -> T_co: # TORCH DETAIL -- Only allow worker shutdown *AFTER* results yielded or # error has occurred. # - leave feeder and workers alive when they are done, let this class # manage stopping/joining them when *this* is done. # - Achieved via added "master_stop" event in feeder/workers. l_prefix = self._l_prefix try: if not self.has_started_workers: self.start_workers() while (self.found_terminals < len(self.workers) and not self.stopped()): packet = self.results_q_get() if _is_terminal(packet): LOG.log(1, f'{l_prefix} Found terminal') self.found_terminals += 1 elif isinstance(packet[0], BaseException): ex, formatted_exc = packet LOG.warning(f'{l_prefix} Received exception: ' f'{ex}\n{formatted_exc}') raise ex else: i, result = packet if self.ordered: heapq.heappush(self.result_heap, (i, result)) if self.result_heap[0][0] == self.next_index: _, result = heapq.heappop(self.result_heap) self.next_index += 1 return result else: return result # Go through heap if there's anything in it if self.result_heap: _, result = heapq.heappop(self.result_heap) return result # Nothing left if not self.stopped(): LOG.log(1, f"{l_prefix} Asserting empty queues on what looks " f"like a full iteration.") self.assert_queues_empty() raise StopIteration() # If anything bad happens, stop iteration and workers. # - Using BaseException to also catch things like KeyboardInterrupt # and other exceptions that do not descend from Exception. # - This also catches the in-due-course StopIteration exception, thus # this is also the "normal" stop route. except BaseException as ex: LOG.log(1, f"{l_prefix} Stopping iteration due to exception: " f"({type(ex)}) {str(ex)}") self.stop() raise next = __next__
[docs] def start_workers(self) -> None: """ Start worker threads/processes. """ LOG.log(1, f"{self._l_prefix} Starting worker processes") for w in self.workers: w.daemon = self.daemon w.start() LOG.log(1, f"{self._l_prefix} Starting feeder thread") self.feeder_thread.daemon = self.daemon self.feeder_thread.start() self.has_started_workers = True
[docs] def clean_up(self) -> None: """ Clean up any live resources if we haven't done so already. """ if self.has_started_workers and not self.has_cleaned_up: l_prefix = self._l_prefix LOG.log(1, f"{l_prefix} Stopping feeder thread") self.feeder_thread.stop() self.feeder_thread.master_stop() self.feeder_thread.join() LOG.log(1, f"{l_prefix} Stopping workers") for w in self.workers: w.stop() w.master_stop() w.join() if self.is_multiprocessing: LOG.log(1, f"{l_prefix} Closing/Joining process queues") for q in (self.work_queue, self.results_queue): assert isinstance(q, multiprocessing.queues.Queue) q.close() q.join_thread() self.has_cleaned_up = True
[docs] def stop(self) -> None: """ Stop this iterator. This does not clean up resources (see ``clean_up`` for that). """ with self.stop_event_lock: self.stop_event.set() self.clean_up()
[docs] def stopped(self) -> bool: """ :return: if this iterator has been stopped """ return self.stop_event.is_set()
[docs] def results_q_get(self) -> Any: """ Attempts to get something from the results queue. :raises StopIteration: when we've been told to stop. :returns: Single result from the results queue. """ while not self.stopped(): try: return self.results_queue.get(timeout=self.heart_beat) except queue.Empty: pass raise StopIteration()
def assert_queues_empty(self) -> None: # All work should be exhausted at this point if self.is_multiprocessing and sys.platform == 'darwin': # multiprocessing.Queue.qsize doesn't work on OSX # - Try to get something from each queue, expecting an empty # exception. # - multiprocessing shares the same exception as the queue module. try: self.work_queue.get(block=False) except queue.Empty: pass else: raise AssertionError("In queue not empty") try: self.results_queue.get(block=False) except queue.Empty: pass else: raise AssertionError("Out queue not empty") else: assert self.work_queue.qsize() == 0, \ "In queue not empty (%d)" % self.work_queue.qsize() assert self.results_queue.qsize() == 0, \ "Out queue not empty (%d)" % self.results_queue.qsize()
class _FeedQueueThread (threading.Thread): """ Helper thread for putting data into the work queue """ def __init__( self, name: Optional[str], arg_sequences: Sequence[Iterable], q: Union[queue.Queue, multiprocessing.Queue], num_terminal_packets: int, heart_beat: float, do_fill: bool, fill_value: Any ): """ :param name: Optional name for this feed queue thread. :param arg_sequences: Sequence of iterators that will feed input work arguments. While we expect Iterable types to be provided here, they will only be observed by `zip`/`zip_longest` at most once. See iteration rules for `zip`/`zip_longest` for details. :param q: Queue to put work argument sets into. :param num_terminal_packets: Number of terminal packets to put into the work queue upon completion of submitting real work. This should be the same number of workers feeding off of `q`. :param heart_beat: How long in seconds we wait for an individual put attempt into `q` before momentarily giving up to allow a cycle of the loop. This is important in allowing an external signal to indicate we should stop feeding efforts (prevents hanging on pushing values into `q`). :param do_fill: If we should fill in a certain value for the shorter input sequences along the same rules for `itertools.zip_longest`. :param fill_value: The value to fill with if `do_fill` is True. """ super().__init__(name=name) self._l_prefix: str = f"[FQT{(name and f'::{name}') or ''}]" self.arg_sequences = arg_sequences self.q = q self.num_terminal_packets = num_terminal_packets self.heart_beat = heart_beat self.do_fill = do_fill self.fill_value = fill_value self._stop_event = threading.Event() # Event marking actual close of the thread. # This should be only invoked by the ParallelResultsIterator when # performing resource cleanup. self._master_stop_event = threading.Event() def stop(self) -> None: self._stop_event.set() def master_stop(self) -> None: """ Actually flag the thread for runtime completion. """ self._master_stop_event.set() def stopped(self) -> bool: return self._stop_event.is_set() def run(self) -> None: l_prefix = self._l_prefix LOG.log(1, f"{l_prefix} Starting") if self.do_fill: _zip = zip_longest _zip_kwds = {'fillvalue': self.fill_value} else: # Ignoring that the callable here doesn't *exactly* match # zip_longest based on stubs: They match in the way that we use it # here. _zip = zip # type: ignore _zip_kwds = {} try: r = 0 for args in _zip(*self.arg_sequences, **_zip_kwds): self.q_put((r, args)) r += 1 # If we're told to stop, immediately quit out of processing if self.stopped(): LOG.log(1, f"{l_prefix} Told to stop prematurely") break # Transport back any exceptions raised except (Exception, KeyboardInterrupt) as ex: LOG.warning(f"{l_prefix} Caught exception {str(ex)}") self.q_put((ex, traceback.format_exc())) self.stop() else: LOG.log(1, f"{l_prefix} Sending in-queue terminal packets") for _ in range(self.num_terminal_packets): self.q_put(_TerminalPacket()) finally: # Explicitly stop any nested parallel maps for s in self.arg_sequences: if isinstance(s, ParallelResultsIterator): LOG.log(1, f"{l_prefix} Stopping nested parallel map: {s}") s.stop() # Await master stop LOG.log(1, f"{l_prefix} Waiting for master stop...") self._master_stop_event.wait() LOG.log(1, f"{l_prefix} Closing") def q_put(self, val: Any) -> None: """ Try to put the given value into the output queue until it is inserted (if it was previously full), or the stop signal was given. :param val: value to put into the output queue. """ put = False while not put and not self.stopped(): try: self.q.put(val, timeout=self.heart_beat) put = True except queue.Full: pass class _Worker(metaclass=abc.ABCMeta): def __init__( self, name: Optional[str], i: int, work_function: Callable, in_q: Union[queue.Queue, multiprocessing.Queue], out_q: Union[queue.Queue, multiprocessing.Queue], heart_beat: float ): """ Individual worker agent. :param name: Optional name for this worker. :param i: The integer index, >= 0, of this worker among active workers for this parallel iteration task. :param work_function: Callable function to invoke which generates some result value. :param in_q: Queue to draw work function input parameters from. :param out_q: Queue to output work results, or triggered exceptions, to. :param heart_beat: How long in seconds we wait when polling for data on the `in_q`, as well as for result put attempts into `out_q`, before momentarily giving up to allow a cycle of the loop. This is important in allowing an external signal to indicate we should stop working (prevents hanging on queue interactions). """ self._l_prefix: str = f"[Worker{(name and f'::{name}') or ''}::#{int(i)}]" self.i = i self.work_function = work_function self.in_q = in_q self.out_q = out_q self.heart_beat = heart_beat LOG.log(1, f"{self._l_prefix} Making process worker ({str(in_q)}, {str(out_q)})") self._stop_event = self._make_event() # Event marking actual close of the thread. # This should be only invoked by the ParallelResultsIterator when # performing resource cleanup. self._master_stop_event = self._make_event() @classmethod @abc.abstractmethod def _make_event(cls) -> Union[threading.Event, multiprocessing.synchronize.Event]: """ Generate an event type instance appropriate for the type of worker sub-classed. """ raise NotImplementedError() def stop(self) -> None: self._stop_event.set() def master_stop(self) -> None: """ Actually flag the worker for runtime completion. """ self._master_stop_event.set() def stopped(self) -> bool: return self._stop_event.is_set() def run(self) -> None: """ Perform work function on available data in the input queue. """ l_prefix = self._l_prefix try: packet = self.q_get() while not self.stopped(): if _is_terminal(packet): LOG.log(1, f"{l_prefix} sending terminal") self.q_put(packet) self.stop() elif isinstance(packet[0], Exception): # Pass exception along self.q_put(packet) self.stop() else: i, args = packet result = self.work_function(*args) self.q_put((i, result)) packet = self.q_get() # Transport back any exceptions raised except (Exception, KeyboardInterrupt) as ex: LOG.warning(f"{l_prefix} Caught exception {type(ex)}") self.q_put((ex, traceback.format_exc())) self.stop() except BaseException as ex: # Some exotic error occurred (can only be systemExit at this # point?). Register stopping and re-raise. LOG.log(1, f"Exotic error {type(ex)}: {ex}") self.stop() raise finally: LOG.log(1, f"{l_prefix} Waiting for master stop...") self._master_stop_event.wait() LOG.log(1, f"{l_prefix} Closing") def q_get(self) -> Any: """ Try to get a value from the queue while keeping an eye out for an exit request. :return: next value on the input queue """ while not self.stopped(): try: return self.in_q.get(timeout=self.heart_beat) except queue.Empty: pass def q_put(self, val: Any) -> None: """ Try to put the given value into the output queue while keeping an eye out for an exit request. :param val: value to put into the output queue. """ put = False while not put and not self.stopped(): try: self.out_q.put(val, timeout=self.heart_beat) put = True except queue.Full: pass class _WorkerProcess (_Worker, multiprocessing.Process): def __init__( self, name: Optional[str], i: int, work_function: Callable, in_q: Union[queue.Queue, multiprocessing.Queue], out_q: Union[queue.Queue, multiprocessing.Queue], heart_beat: float ): """ Constructor override to include multiprocessing.Process constructor super construction. See `_Worker` constructor doc-string for parameter documentation. """ multiprocessing.Process.__init__(self) _Worker.__init__(self, name, i, work_function, in_q, out_q, heart_beat) @classmethod def _make_event(cls) -> multiprocessing.synchronize.Event: return multiprocessing.Event() # The inheritance order should be sufficient to ensure the `_Worker.run` # method is used instead of `multiprocessing.Process.run`, but we are # explicit here just to be sure. run = _Worker.run class _WorkerThread (_Worker, threading.Thread): def __init__( self, name: Optional[str], i: int, work_function: Callable, in_q: Union[queue.Queue, multiprocessing.Queue], out_q: Union[queue.Queue, multiprocessing.Queue], heart_beat: float ): """ Constructor override to include threading.Thread constructor super construction. See `_Worker` constructor doc-string for parameter documentation. """ threading.Thread.__init__(self) _Worker.__init__(self, name, i, work_function, in_q, out_q, heart_beat) @classmethod def _make_event(cls) -> threading.Event: return threading.Event() # The inheritance order should be sufficient to ensure the `_Worker.run` # method is used instead of `threading.Thread.run`, but we are explicit # here just to be sure. run = _Worker.run