Source code for eolearn.core.eoexecution

"""
The module handles execution and monitoring of workflows. It enables executing a workflow multiple times and in
parallel. It monitors execution times and handles any errors that might occur in the process. At the end it generates
a report which contains a summary of the workflow and of the execution process.

All this is implemented in the EOExecutor class.

Copyright (c) 2017- Sinergise and contributors
For the full list of contributors, see the CREDITS file in the root directory of this source tree.

This source code is licensed under the MIT license, see the LICENSE file in the root directory of this source tree.
"""

from __future__ import annotations

import concurrent.futures
import datetime as dt
import inspect
import itertools as it
import logging
import threading
import warnings
from dataclasses import dataclass
from logging import FileHandler, Filter, Handler, Logger
from typing import Any, Callable, Iterable, Protocol, Union

import fs
from fs.base import FS
from typing_extensions import deprecated

from .eonode import EONode
from .eoworkflow import EOWorkflow, WorkflowResults
from .exceptions import EODeprecationWarning, EORuntimeWarning, TemporalDimensionWarning
from .utils.fs import get_base_filesystem_and_path, get_full_path, pickle_fs, unpickle_fs
from .utils.logging import LogFileFilter
from .utils.parallelize import parallelize


class _HandlerWithFsFactoryType(Protocol):
    """Type definition for a callable that accepts a path and a filesystem object.

    A callable matching this protocol takes a log file `path`, a `filesystem` object and optional keyword arguments,
    and returns a logging `Handler`.
    """

    def __call__(self, path: str, filesystem: FS, **kwargs: Any) -> Handler: ...


# A log handler factory is either a callable that takes a single path string, or a callable that takes a path
# together with a `filesystem` object (`_HandlerWithFsFactoryType`). Both kinds return a logging `Handler`.
# pylint: disable=invalid-name
_HandlerFactoryType = Union[Callable[[str], Handler], _HandlerWithFsFactoryType]


@dataclass(frozen=True)
class _ProcessingData:
    """Data to be used in EOExecutor processing. This will be passed to a process pool, so everything has to be
    serializable with pickle.

    One instance is created per workflow execution and describes everything needed to run that single execution.
    """

    # The workflow to execute.
    workflow: EOWorkflow
    # Input arguments of this particular execution, passed to `workflow.execute`.
    workflow_kwargs: dict[EONode, dict[str, object]]
    # Filesystem for log files, serialized with `pickle_fs` and restored with `unpickle_fs` when building a handler.
    pickled_filesystem: bytes
    # Path of the log file relative to the filesystem, or `None` if logs are not being saved.
    log_path: str | None
    # If `True` a `LogFileFilter` bound to the name of the executing thread is added to the log handler.
    filter_logs_by_thread: bool
    # Optional user-defined filter that is added to the log handler.
    logs_filter: Filter | None
    # Callable that creates the log handler, either from a full path or from a path and a filesystem object.
    logs_handler_factory: _HandlerFactoryType
    # If `True` a `TemporalDimensionWarning` is turned into an error during the execution.
    raise_on_temporal_mismatch: bool


@dataclass(frozen=True)
class _ExecutionRunParams:
    """Parameters that are used during execution run.

    They are collected in `EOExecutor.run` and forwarded to `parallelize` by `EOExecutor._run_execution`.
    """

    # Maximum number of parallel workers, forwarded as `workers` to `parallelize`. May be `None`.
    workers: int | None
    # Forwarded as `multiprocess` to `parallelize`.
    multiprocess: bool
    # Additional keyword arguments, unpacked into the `parallelize` call.
    tqdm_kwargs: dict[str, Any]


class EOExecutor:
    """Simultaneously executes a workflow with different input arguments. In the process it monitors execution and
    handles errors. It can also save logs and create a html report about each execution.
    """

    REPORT_FILENAME = "report.html"
    STATS_START_TIME = "start_time"
    STATS_END_TIME = "end_time"

    def __init__(
        self,
        workflow: EOWorkflow,
        execution_kwargs: Iterable[dict[EONode, dict[str, object]]],
        *,
        execution_names: Iterable[str] | None = None,
        save_logs: bool = False,
        logs_folder: str = ".",
        filesystem: FS | None = None,
        logs_filter: Filter | None = None,
        logs_handler_factory: _HandlerFactoryType = FileHandler,
        raise_on_temporal_mismatch: bool = False,
    ):
        """
        :param workflow: A prepared instance of EOWorkflow class
        :param execution_kwargs: An iterable of dictionaries where each dictionary represents execution inputs for
            the workflow. `EOExecutor` will execute the workflow for each of the given dictionaries. The content of
            such dictionary will be used as the input arguments of `EOWorkflow.execute` method. Check
            `EOWorkflow.execute` for definition of a dictionary structure.
        :param execution_names: A list of execution names, which will be shown in execution report
        :param save_logs: Flag used to specify if execution log files should be saved locally on disk
        :param logs_folder: A folder where logs and execution report should be saved. If `filesystem` parameter is
            defined the folder path should be relative to the filesystem.
        :param filesystem: A filesystem object for saving logs and a report.
        :param logs_filter: An instance of a custom filter object that will filter certain logs from being written
            into logs. It works only if save_logs parameter is set to True.
        :param logs_handler_factory: A callable class or function that initializes an instance of a logging `Handler`
            object. Its signature should support one of the following options:

            - A single parameter describing a full path to the log file.
            - Parameters `path` and `filesystem` where path to the log file is relative to the given `filesystem`
              object.

            The 2nd option is chosen only if `filesystem` parameter exists in the signature.
        :param raise_on_temporal_mismatch: Treat `TemporalDimensionWarning` as an exception.
        :raises ValueError: If `execution_names` is given and its size differs from the size of `execution_kwargs`.
        """
        self.workflow = workflow
        self.execution_kwargs = self._parse_and_validate_execution_kwargs(execution_kwargs)
        self.execution_names = self._parse_execution_names(execution_names, self.execution_kwargs)
        self.save_logs = save_logs
        self.filesystem, self.logs_folder = self._parse_logs_filesystem(filesystem, logs_folder)
        self.logs_filter = logs_filter
        self.logs_handler_factory = logs_handler_factory
        self.raise_on_temporal_mismatch = raise_on_temporal_mismatch

        # The following attributes are populated by the `run` method.
        self.start_time: dt.datetime | None = None
        self.report_folder: str | None = None
        self.general_stats: dict[str, object] = {}
        self.execution_results: list[WorkflowResults] = []

    @staticmethod
    def _parse_and_validate_execution_kwargs(
        execution_kwargs: Iterable[dict[EONode, dict[str, object]]]
    ) -> list[dict[EONode, dict[str, object]]]:
        """Parses and validates execution arguments provided by user and raises an error if something is wrong.

        :param execution_kwargs: An iterable of input arguments, one item per workflow execution.
        :return: A new list with all the given items, each validated with `EOWorkflow.validate_input_kwargs`.
        """
        # Materialize first: a one-shot iterable (e.g. a generator) would otherwise be exhausted by the validation
        # loop and the returned list would be empty.
        parsed_kwargs = list(execution_kwargs)
        for input_kwargs in parsed_kwargs:
            EOWorkflow.validate_input_kwargs(input_kwargs)
        return parsed_kwargs

    @staticmethod
    def _parse_execution_names(execution_names: Iterable[str] | None, execution_kwargs: list) -> list[str]:
        """Parses a list of execution names.

        :param execution_names: User-defined names or `None`.
        :param execution_kwargs: Already parsed list of execution arguments.
        :return: A list of names. If no names are given, consecutive numbers starting with 1 are used.
        :raises ValueError: If the number of names differs from the number of executions.
        """
        if execution_names is None:
            return [str(num) for num in range(1, len(execution_kwargs) + 1)]

        execution_names = list(execution_names)
        if len(execution_names) != len(execution_kwargs):
            raise ValueError("Parameter 'execution_names' has to be of the same size as `execution_kwargs`.")
        return execution_names

    @staticmethod
    def _parse_logs_filesystem(filesystem: FS | None, logs_folder: str) -> tuple[FS, str]:
        """Ensures a filesystem and a file path relative to it.

        :param filesystem: A filesystem object or `None`.
        :param logs_folder: A path to the logs folder.
        :return: The given filesystem and folder, or the result of `get_base_filesystem_and_path(logs_folder)` if no
            filesystem is given.
        """
        if filesystem is None:
            return get_base_filesystem_and_path(logs_folder)
        return filesystem, logs_folder

    def run(self, workers: int | None = 1, multiprocess: bool = True, **tqdm_kwargs: Any) -> list[WorkflowResults]:
        """Runs the executor with n workers.

        :param workers: Maximum number of workflows which will be executed in parallel. Default value is `1` which
            will execute workflows consecutively. If set to `None` the number of workers will be the number of
            processors of the system.
        :param multiprocess: If `True` it will use `concurrent.futures.ProcessPoolExecutor` which will distribute
            workflow executions among multiple processors. If `False` it will use
            `concurrent.futures.ThreadPoolExecutor` which will distribute workflow among multiple threads.
            However, even when `multiprocess=False`, tasks from workflow could still be using multiple processors.
            This parameter is used especially because certain task cannot run with
            `concurrent.futures.ProcessPoolExecutor`.
            In case of `workers=1` this parameter is ignored and workflows will be executed consecutively.
        :param tqdm_kwargs: Keyword arguments that will be propagated to `tqdm` progress bar.
        :return: A list of EOWorkflow results
        """
        self.start_time = dt.datetime.now()
        self.report_folder = fs.path.combine(
            self.logs_folder, f'eoexecution-report-{self.start_time.strftime("%Y_%m_%d-%H_%M_%S")}'
        )
        if self.save_logs:
            self.filesystem.makedirs(self.report_folder, recreate=True)

        # Without saved logs every execution gets `None` as its log path.
        log_paths = self.get_log_paths(full_path=False) if self.save_logs else it.repeat(None)

        # Logs have to be separated by thread only if multiple executions share a single process.
        filter_logs_by_thread = not multiprocess and workers is not None and workers > 1

        # The filesystem is the same for every execution, therefore it is serialized only once.
        pickled_filesystem = pickle_fs(self.filesystem)

        processing_args = [
            _ProcessingData(
                workflow=self.workflow,
                workflow_kwargs=workflow_kwargs,
                pickled_filesystem=pickled_filesystem,
                log_path=log_path,
                filter_logs_by_thread=filter_logs_by_thread,
                logs_filter=self.logs_filter,
                logs_handler_factory=self.logs_handler_factory,
                raise_on_temporal_mismatch=self.raise_on_temporal_mismatch,
            )
            for workflow_kwargs, log_path in zip(self.execution_kwargs, log_paths)
        ]
        run_params = _ExecutionRunParams(workers=workers, multiprocess=multiprocess, tqdm_kwargs=tqdm_kwargs)

        full_execution_results = self._run_execution(processing_args, run_params)

        # The executor itself keeps only results without outputs, the full results are returned to the caller.
        self.execution_results = [results.drop_outputs() for results in full_execution_results]
        self.general_stats = self._prepare_general_stats(workers)

        return full_execution_results

    def _run_execution(
        self, processing_args: list[_ProcessingData], run_params: _ExecutionRunParams
    ) -> list[WorkflowResults]:
        """Parallelizes the execution for each item of processing_args list.

        :param processing_args: Data for each of the workflow executions.
        :param run_params: Parameters that are forwarded to `parallelize`.
        :return: Results of `parallelize` call over `_execute_workflow`.
        """
        return parallelize(
            self._execute_workflow,
            processing_args,
            workers=run_params.workers,
            multiprocess=run_params.multiprocess,
            **run_params.tqdm_kwargs,
        )

    @classmethod
    def _try_add_logging(
        cls,
        log_path: str | None,
        pickled_filesystem: bytes,
        filter_logs_by_thread: bool,
        logs_filter: Filter | None,
        logs_handler_factory: _HandlerFactoryType,
    ) -> tuple[Logger | None, Handler | None]:
        """Adds a handler to a logger and returns them both. In case this fails it shows a warning.

        :return: The root logger and the newly added handler, or a pair `(None, None)` if `log_path` is empty or
            logging could not be started.
        """
        if log_path:
            try:
                # The handler is attached to the root logger, which is also set to the DEBUG level.
                logger = logging.getLogger()
                logger.setLevel(logging.DEBUG)
                handler = cls._build_log_handler(
                    log_path, pickled_filesystem, filter_logs_by_thread, logs_filter, logs_handler_factory
                )
                logger.addHandler(handler)
                return logger, handler
            except BaseException as exception:
                # Logging is best-effort: a failure is reported as a warning and the execution continues.
                warnings.warn(f"Failed to start logging with exception: {exception!r}", category=EORuntimeWarning)

        return None, None

    @classmethod
    def _try_remove_logging(cls, log_path: str | None, logger: Logger | None, handler: Handler | None) -> None:
        """Removes a handler from a logger in case that handler exists. In case this fails it shows a warning."""
        if log_path and logger and handler:
            try:
                # Detach first, so that a failure while closing cannot leave the handler attached to the logger.
                logger.removeHandler(handler)
                handler.close()
            except BaseException as exception:
                warnings.warn(f"Failed to end logging with exception: {exception!r}", category=EORuntimeWarning)

    @classmethod
    def _execute_workflow(cls, data: _ProcessingData) -> WorkflowResults:
        """Handles a single execution of a workflow.

        :param data: Everything required for a single execution.
        :return: Results of the workflow execution.
        """
        logger, handler = cls._try_add_logging(
            data.log_path,
            data.pickled_filesystem,
            data.filter_logs_by_thread,
            data.logs_filter,
            data.logs_handler_factory,
        )

        try:
            with warnings.catch_warnings():
                if data.raise_on_temporal_mismatch:
                    warnings.simplefilter("error", TemporalDimensionWarning)

                return data.workflow.execute(data.workflow_kwargs, raise_errors=False)
        finally:
            # Always detach the handler, even if an exception escapes the workflow execution. Otherwise the handler
            # would stay attached to the root logger and collect logs of subsequent executions.
            cls._try_remove_logging(data.log_path, logger, handler)

    @staticmethod
    def _build_log_handler(
        log_path: str,
        pickled_filesystem: bytes,
        filter_logs_by_thread: bool,
        logs_filter: Filter | None,
        logs_handler_factory: _HandlerFactoryType,
    ) -> Handler:
        """Provides object which handles logs.

        :param log_path: A path to the log file, relative to the filesystem.
        :param pickled_filesystem: A filesystem object serialized with `pickle_fs`.
        :param filter_logs_by_thread: If `True` a `LogFileFilter` for the current thread is added to the handler.
        :param logs_filter: An optional additional filter.
        :param logs_handler_factory: A callable that creates the handler.
        :return: A configured handler.
        """
        filesystem = unpickle_fs(pickled_filesystem)

        # The factory's signature decides whether it gets a relative path with a filesystem or just a full path.
        factory_signature = inspect.signature(logs_handler_factory)
        if "filesystem" in factory_signature.parameters:
            handler = logs_handler_factory(log_path, filesystem=filesystem)  # type: ignore[call-arg]
        else:
            full_path = get_full_path(filesystem, log_path)
            handler = logs_handler_factory(full_path)  # type: ignore[call-arg]

        # A formatter that the factory might have set is not overridden.
        if not handler.formatter:
            formatter = logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s")
            handler.setFormatter(formatter)

        if filter_logs_by_thread:
            handler.addFilter(LogFileFilter(threading.current_thread().name))

        if logs_filter:
            handler.addFilter(logs_filter)

        return handler

    def _prepare_general_stats(self, workers: int | None) -> dict[str, object]:
        """Prepares a dictionary with a general statistics about executions.

        :param workers: The number of workers that was requested for the run.
        :return: A dictionary with start and end time, counts of finished and failed executions and the number of
            workers.
        """
        failed_count = sum(results.workflow_failed() for results in self.execution_results)
        return {
            self.STATS_START_TIME: self.start_time,
            self.STATS_END_TIME: dt.datetime.now(),
            "finished": len(self.execution_results) - failed_count,
            "failed": failed_count,
            "workers": workers,
        }

    def get_successful_executions(self) -> list[int]:
        """Returns a list of IDs of successful executions. The IDs are integers from interval
        `[0, len(execution_kwargs) - 1]`, sorted in increasing order.
        """
        return [idx for idx, results in enumerate(self.execution_results) if not results.workflow_failed()]

    def get_failed_executions(self) -> list[int]:
        """Returns a list of IDs of failed executions. The IDs are integers from interval
        `[0, len(execution_kwargs) - 1]`, sorted in increasing order.
        """
        return [idx for idx, results in enumerate(self.execution_results) if results.workflow_failed()]

    def get_report_path(self, full_path: bool = True) -> str:
        """Returns the filename and file path of the report.

        :param full_path: Return full absolute paths or paths relative to the filesystem object.
        :return: Report filename
        :raises RuntimeError: If the executor has not been run yet.
        """
        if self.report_folder is None:
            raise RuntimeError("Executor has to be run before the report path is created.")
        report_path = fs.path.combine(self.report_folder, self.REPORT_FILENAME)
        return get_full_path(self.filesystem, report_path) if full_path else report_path

    def make_report(self, include_logs: bool = True) -> None:
        """Makes a html report and saves it into the same folder where logs are stored.

        :param include_logs: If `True` log files will be loaded into the report file. If `False` they will be just
            referenced with a link to a log file. In case of a very large number of executions it is recommended that
            this parameter is set to `False` to avoid compiling a too large report file.
        :raises RuntimeError: If the visualization module cannot be imported.
        """
        # pylint: disable=import-outside-toplevel
        try:
            from eolearn.visualization.eoexecutor import EOExecutorVisualization
        except ImportError as exception:
            # The original import error is chained so that the actual missing dependency stays visible.
            raise RuntimeError(
                "Dependencies `eo-learn[VISUALIZATION]` have to be installed in order to create EOExecutor reports."
            ) from exception

        return EOExecutorVisualization(self).make_report(include_logs=include_logs)

    def get_log_paths(self, full_path: bool = True) -> list[str]:
        """Returns a list of file paths containing logs.

        :param full_path: Return full absolute paths or paths relative to the filesystem object.
        :return: A list of paths to log files.
        :raises RuntimeError: If the executor has not been run yet.
        """
        if self.report_folder is None:
            raise RuntimeError("Executor has to be run before log paths are created.")
        log_paths = [fs.path.combine(self.report_folder, f"eoexecution-{name}.log") for name in self.execution_names]
        return [get_full_path(self.filesystem, path) for path in log_paths] if full_path else log_paths

    @deprecated("The method `read_logs` has been deprecated.", category=EODeprecationWarning)
    def read_logs(self) -> list[str | None]:
        """Loads the content of log files if logs have been saved.

        :return: Content of each log file, or a list of `None` values if logs have not been saved.
        """
        if not self.save_logs:
            return [None] * len(self.execution_kwargs)

        log_paths = self.get_log_paths(full_path=False)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            return list(executor.map(self._read_log_file, log_paths))

    def _read_log_file(self, log_path: str) -> str:
        """Read a content of a log file.

        :param log_path: A path to the log file, relative to the filesystem.
        :return: Content of the file. If reading fails a warning is shown and a placeholder message is returned.
        """
        try:
            with self.filesystem.open(log_path, "r") as file_handle:
                return file_handle.read()
        except BaseException as exception:
            warnings.warn(f"Failed to load logs with exception: {exception!r}", category=EORuntimeWarning)
            return "Failed to load logs"