# Source code for eolearn.core.utils.parallelize

"""
Utilities for a basic Python parallelization that build on top of `concurrent.futures` module from Standard Python
library.

Copyright (c) 2017- Sinergise and contributors
For the full list of contributors, see the CREDITS file in the root directory of this source tree.

This source code is licensed under the MIT license, see the LICENSE file in the root directory of this source tree.
"""

from __future__ import annotations

import concurrent.futures
import multiprocessing
from concurrent.futures import FIRST_COMPLETED, Executor, Future, ProcessPoolExecutor, ThreadPoolExecutor
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Collection, Generator, Iterable, List, TypeVar, cast

from tqdm.auto import tqdm

if TYPE_CHECKING:
    from threading import Lock

    # Module-level lock used by `execute_with_mp_lock`. It is set by `parallelize` for the duration of a
    # multiprocessing run and reset to `None` afterwards; it stays `None` whenever no such run is active.
    MULTIPROCESSING_LOCK: Lock | None = None
else:
    MULTIPROCESSING_LOCK = None

# pylint: disable=invalid-name
T = TypeVar("T")  # generic item type for list utilities
FutureType = TypeVar("FutureType")  # a future-like object (e.g. `concurrent.futures.Future` or a Ray ref)
OutputType = TypeVar("OutputType")  # return type of the parallelized function


class _ProcessingType(Enum):
    """Type of EOExecutor processing"""

    SINGLE_PROCESS = "single process"  # everything runs consecutively in the current thread and process
    MULTIPROCESSING = "multiprocessing"  # work is distributed over a `ProcessPoolExecutor`
    MULTITHREADING = "multithreading"  # work is distributed over a `ThreadPoolExecutor`
    RAY = "ray"  # not produced by `_decide_processing_type` in this module — presumably set elsewhere; verify


def _decide_processing_type(workers: int | None, multiprocess: bool) -> _ProcessingType:
    """Chooses how the work should be executed for the given parameters.

    :param workers: A number of workers to be used (either threads or processes). If a single worker is given it will
        always use the current thread and process.
    :param multiprocess: A flag to decide between multiple processes and multiple threads in a single process.
    :return: An enum defining a type of processing.
    """
    # A single worker means there is nothing to parallelize — run everything in the current thread.
    if workers == 1:
        return _ProcessingType.SINGLE_PROCESS
    if multiprocess:
        return _ProcessingType.MULTIPROCESSING
    return _ProcessingType.MULTITHREADING


def parallelize(
    function: Callable[..., OutputType],
    *params: Iterable[Any],
    workers: int | None,
    multiprocess: bool = True,
    **tqdm_kwargs: Any,
) -> list[OutputType]:
    """Parallelizes the function on given parameters using the specified number of workers.

    :param function: A function to be parallelized.
    :param params: Sequences of parameters to be given to the function. It uses the same logic as Python `map`
        function.
    :param workers: Maximum number of time the function will be executed in parallel.
    :param multiprocess: If `True` it will use `concurrent.futures.ProcessPoolExecutor` which will distribute workflow
        executions among multiple processors. If `False` it will use `concurrent.futures.ThreadPoolExecutor` which will
        distribute workflow among multiple threads. In case of `workers=1` this parameter is ignored and workflows will
        be executed consecutively.
    :param tqdm_kwargs: Keyword arguments that will be propagated to `tqdm` progress bar.
    :return: A list of function results.
    """
    if not params:
        return []
    processing_type = _decide_processing_type(workers=workers, multiprocess=multiprocess)

    if processing_type is _ProcessingType.SINGLE_PROCESS:
        # BUGFIX: if `params[0]` is a one-shot iterator, materialize it once and keep the materialized list.
        # Previously the list was built only to take its length and then discarded, which exhausted the
        # iterator before `map` could consume it, silently producing an empty result.
        first = params[0] if isinstance(params[0], Collection) else list(params[0])
        params = (first, *params[1:])
        return list(tqdm(map(function, *params), total=len(first), **tqdm_kwargs))

    if processing_type is _ProcessingType.MULTITHREADING:
        with ThreadPoolExecutor(max_workers=workers) as thread_executor:
            return submit_and_monitor_execution(thread_executor, function, *params, **tqdm_kwargs)

    # Multiprocessing: expose a manager-backed lock (usable across processes) for `execute_with_mp_lock`
    # while the pool is alive, and always clear it afterwards.
    # pylint: disable=global-statement
    global MULTIPROCESSING_LOCK
    try:
        MULTIPROCESSING_LOCK = multiprocessing.Manager().Lock()
        with ProcessPoolExecutor(max_workers=workers) as process_executor:
            return submit_and_monitor_execution(process_executor, function, *params, **tqdm_kwargs)
    finally:
        MULTIPROCESSING_LOCK = None
def execute_with_mp_lock(function: Callable[..., OutputType], *args: Any, **kwargs: Any) -> OutputType:
    """A helper utility function that executes a given function with multiprocessing lock if the process is being
    executed in a multiprocessing mode.

    :param function: A function
    :param args: Function's positional arguments
    :param kwargs: Function's keyword arguments
    """
    # In the main process, or when no multiprocessing run is active, no synchronization is needed.
    in_main_process = multiprocessing.current_process().name == "MainProcess"
    if in_main_process or MULTIPROCESSING_LOCK is None:
        return function(*args, **kwargs)

    # pylint: disable=not-context-manager
    with MULTIPROCESSING_LOCK:
        return function(*args, **kwargs)
def submit_and_monitor_execution(
    executor: Executor,
    function: Callable[..., OutputType],
    *params: Iterable[Any],
    **tqdm_kwargs: Any,
) -> list[OutputType]:
    """Performs the execution parallelization and monitors the process using a progress bar.

    :param executor: An object that performs parallelization.
    :param function: A function to be parallelized.
    :param params: Each element in a sequence are parameters for a single call of `function`.
    :return: A list of results in the same order as input parameters given by `executor_params`.
    """
    # One future per zipped parameter tuple, preserving input order.
    futures = []
    for call_args in zip(*params):
        futures.append(executor.submit(function, *call_args))
    return join_futures(futures, **tqdm_kwargs)
def join_futures(futures: list[Future], **tqdm_kwargs: Any) -> list[Any]:
    """Resolves futures, monitors progress, and returns a list of results.

    :param futures: A list of futures to be joined. Note that this list will be reduced into an empty list as a side
        effect of this function. This way future objects will get cleared from memory already during the execution
        which will free some extra memory. But this can be achieved only if future objects aren't kept in memory
        outside `futures` list.
    :param tqdm_kwargs: Keyword arguments that will be propagated to `tqdm` progress bar.
    :return: A list of results in the order that corresponds with the order of the given input `futures`.
    """
    # Pre-size the output so results can be placed at their original positions as they complete.
    collected: list[Any | None] = [None] * len(futures)
    for index, value in join_futures_iter(futures, **tqdm_kwargs):
        collected[index] = value
    return cast(List[Any], collected)
def join_futures_iter(
    futures: list[Future], update_interval: float = 0.5, **tqdm_kwargs: Any
) -> Generator[tuple[int, Any], None, None]:
    """Resolves futures, monitors progress, and serves as an iterator over results.

    :param futures: A list of futures to be joined. Note that this list will be reduced into an empty list as a side
        effect of this function. This way future objects will get cleared from memory already during the execution
        which will free some extra memory. But this can be achieved only if future objects aren't kept in memory
        outside `futures` list.
    :param update_interval: A number of seconds to wait between consecutive updates of a progress bar.
    :param tqdm_kwargs: Keyword arguments that will be propagated to `tqdm` progress bar.
    :return: A generator that will be returning pairs `(index, result)` where `index` will define the position of
        future in the original list to which `result` belongs to.
    """

    def _wait_for_next(pending: Collection[Future]) -> tuple[Collection[Future], Collection[Future]]:
        # Wake up at least every `update_interval` seconds so the progress bar stays responsive.
        return concurrent.futures.wait(pending, timeout=float(update_interval), return_when=FIRST_COMPLETED)

    return _base_join_futures_iter(_wait_for_next, lambda future: future.result(), futures, **tqdm_kwargs)
def _base_join_futures_iter( wait_function: Callable[[Collection[FutureType]], tuple[Collection[FutureType], Collection[FutureType]]], get_result_function: Callable[[FutureType], OutputType], futures: list[FutureType], **tqdm_kwargs: Any, ) -> Generator[tuple[int, OutputType], None, None]: """A generalized utility function that resolves futures, monitors progress, and serves as an iterator over results.""" remaining_futures: Collection[FutureType] = _make_copy_and_empty_given(futures) id_to_position_map = {id(future): index for index, future in enumerate(remaining_futures)} with tqdm(total=len(remaining_futures), **tqdm_kwargs) as pbar: while remaining_futures: done, remaining_futures = wait_function(remaining_futures) for future in done: result = get_result_function(future) pbar.update(1) yield id_to_position_map[id(future)], result def _make_copy_and_empty_given(items: list[T]) -> list[T]: """Removes items from the given list and returns its copy. The side effect of removing items is intentional.""" items_copy = items[:] while items: items.pop() return items_copy