eolearn.core.extra.ray

Module containing integrations with the Ray framework.

To use this module, you have to install the ray Python package.

class eolearn.core.extra.ray.RayExecutor(workflow, execution_kwargs, *, execution_names=None, save_logs=False, logs_folder='.', filesystem=None, logs_filter=None, logs_handler_factory=<class 'logging.FileHandler'>, raise_on_temporal_mismatch=False, ray_remote_kwargs=None)

Bases: EOExecutor

A special type of EOExecutor that works with the Ray framework.

Parameters:
  • workflow (EOWorkflow) – A prepared instance of the EOWorkflow class.

  • execution_kwargs (Sequence[dict[EONode, dict[str, object]]]) – A list of dictionaries, where each dictionary represents execution inputs for the workflow. EOExecutor will execute the workflow once for each dictionary in the list. The content of each dictionary is used as the input_kwargs parameter of the EOWorkflow.execute method; see EOWorkflow.execute for the definition of the dictionary structure.

  • execution_names (list[str] | None) – A list of execution names, which will be shown in the execution report.

  • save_logs (bool) – Flag used to specify if execution log files should be saved locally on disk

  • logs_folder (str) – A folder where logs and the execution report should be saved. If the filesystem parameter is defined, the folder path should be relative to that filesystem.

  • filesystem (FS | None) – A filesystem object for saving logs and a report.

  • logs_filter (Filter | None) – An instance of a custom filter object that prevents certain log records from being written to the logs. It works only if the save_logs parameter is set to True.

  • logs_handler_factory (_HandlerFactoryType) –

    A callable class or function that initializes an instance of a logging Handler object. Its signature should support one of the following options:

    • A single parameter describing a full path to the log file.

    • Two parameters, path and filesystem, where path is the path to the log file relative to the given filesystem object.

    The second option is chosen only if a filesystem parameter exists in the signature.

  • raise_on_temporal_mismatch (bool) – Treat TemporalDimensionWarning as an exception.

  • ray_remote_kwargs (dict[str, Any] | None) – Keyword arguments passed to ray.remote.
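The logs_filter and logs_handler_factory parameters take standard logging objects. The sketch below shows a hypothetical filter and one hypothetical factory for each of the two supported signatures. The helper build_handler is also hypothetical: it only mirrors the documented selection rule (a filesystem parameter in the signature selects the second option) and is not eo-learn's internal code. The second factory assumes a PyFilesystem2-style object whose open(path, mode) method returns a writable text stream.

```python
import inspect
import logging
from typing import Any, Callable


class InfoAndAboveFilter(logging.Filter):
    """Hypothetical logs_filter: drop DEBUG records from the execution logs."""

    def filter(self, record: logging.LogRecord) -> bool:
        return record.levelno >= logging.INFO


def path_handler_factory(path: str) -> logging.Handler:
    """Option 1: a single parameter with the full path to the log file."""
    handler = logging.FileHandler(path, encoding="utf-8")
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    return handler


def filesystem_handler_factory(path: str, filesystem: Any) -> logging.Handler:
    """Option 2: `path` is relative to `filesystem`.

    Assumes a PyFilesystem2-style object whose open(path, mode) returns a text stream.
    Note that StreamHandler.close() does not close this stream.
    """
    return logging.StreamHandler(filesystem.open(path, "w"))


def build_handler(
    factory: Callable[..., logging.Handler], path: str, filesystem: Any = None
) -> logging.Handler:
    """Hypothetical helper mirroring the documented rule for choosing a signature."""
    if "filesystem" in inspect.signature(factory).parameters:
        return factory(path, filesystem=filesystem)
    # Covers the default logging.FileHandler, whose signature has no `filesystem`.
    return factory(path)
```

These objects would then be passed as, for example, RayExecutor(workflow, execution_kwargs, save_logs=True, logs_filter=InfoAndAboveFilter(), logs_handler_factory=path_handler_factory).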

run(**tqdm_kwargs)

Runs the executor using a Ray cluster.

Before calling this method, make sure to initialize a Ray cluster using ray.init.

Parameters:

tqdm_kwargs (Any) – Keyword arguments that will be propagated to tqdm progress bar.

Returns:

A list of EOWorkflow results

Return type:

list[eolearn.core.eoworkflow.WorkflowResults]
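The following sketch shows how execution_kwargs could be built for several areas and handed to RayExecutor. The input node, the area objects, and the task parameter names bbox and time_interval are assumptions for illustration; adapt them to the tasks in your own workflow. The helper names build_execution_kwargs and run_on_ray are hypothetical.

```python
from collections.abc import Hashable, Sequence
from typing import Any


def build_execution_kwargs(
    input_node: Hashable, bboxes: Sequence[Any], time_interval: tuple[str, str]
) -> list[dict[Hashable, dict[str, object]]]:
    """One dictionary per execution, shaped {EONode: {parameter name: value}}.

    The parameter names "bbox" and "time_interval" are assumptions about the input task.
    """
    return [{input_node: {"bbox": bbox, "time_interval": time_interval}} for bbox in bboxes]


def run_on_ray(
    workflow: Any, input_node: Any, bboxes: Sequence[Any], time_interval: tuple[str, str]
) -> list[Any]:
    # ray is an optional dependency, so it is imported lazily together with the executor.
    import ray
    from eolearn.core.extra.ray import RayExecutor

    if not ray.is_initialized():
        ray.init()  # Ray must be initialized before run() is called

    executor = RayExecutor(
        workflow,
        build_execution_kwargs(input_node, bboxes, time_interval),
        execution_names=[f"tile_{i}" for i in range(len(bboxes))],
        save_logs=True,
        logs_folder="./eoexecution-logs",
    )
    # Extra keyword arguments go to tqdm; `desc` labels the progress bar.
    return executor.run(desc="Processing tiles")
```

Keeping the construction of execution_kwargs in a separate pure function makes it easy to inspect the inputs before any work is submitted to the cluster.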

eolearn.core.extra.ray.parallelize_with_ray(function, *params, ray_remote_kwargs=None, **tqdm_kwargs)

Parallelizes function execution with Ray.

Note that this function automatically connects to a Ray cluster if a connection does not exist yet, but it does not shut the connection down afterwards.

Parameters:
  • function (Callable[[InputType], OutputType]) – A plain function that is not yet decorated with ray.remote.

  • params (Iterable[InputType]) – Iterables of parameters that will be used with the given function.

  • ray_remote_kwargs (dict[str, Any] | None) – Keyword arguments passed to ray.remote.

  • tqdm_kwargs (Any) – Keyword arguments that will be propagated to tqdm progress bar.

Returns:

A list of results whose order corresponds to the order of the given input params.

Return type:

list[OutputType]
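A minimal usage sketch follows. It assumes that, as with the built-in map, the i-th call receives the i-th element of each iterable in params; the function names tile_area and tile_areas_with_ray are hypothetical, and num_cpus is a standard ray.remote option used here only as an example of ray_remote_kwargs.

```python
def tile_area(width: float, height: float) -> float:
    """A plain function, not decorated with ray.remote, as the parameter description requires."""
    return width * height


def tile_areas_with_ray(widths: list[float], heights: list[float]) -> list[float]:
    # Imported lazily because ray is an optional dependency.
    import ray
    from eolearn.core.extra.ray import parallelize_with_ray

    try:
        # Assumption: the i-th call receives widths[i] and heights[i], as with map().
        return parallelize_with_ray(
            tile_area,
            widths,
            heights,
            ray_remote_kwargs={"num_cpus": 1},  # forwarded to ray.remote
            desc="Computing areas",  # forwarded to tqdm
        )
    finally:
        # parallelize_with_ray may open a Ray connection but does not close it.
        ray.shutdown()
```

Shutting Ray down in the finally clause suits a one-off script; a long-lived process that keeps submitting work would usually leave the connection open and reuse it.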

eolearn.core.extra.ray.join_ray_futures(futures, **tqdm_kwargs)

Resolves futures, monitors progress, and returns a list of results.

Parameters:
  • futures (list[ray.ObjectRef]) – A list of futures to be joined. Note that, as a side effect, this function empties the list. This lets Ray release the future objects already during execution and frees memory in the Ray Plasma store. This works only if the future objects are not referenced anywhere outside the futures list.

  • tqdm_kwargs (Any) – Keyword arguments that will be propagated to tqdm progress bar.

Returns:

A list of results whose order corresponds to the order of the given input futures.

Return type:

list[Any]
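The side effect on the futures list can be illustrated without a Ray cluster. The sketch below is a standard-library analogue built on concurrent.futures, not the eo-learn implementation: join_futures_draining is a hypothetical name, and it omits the tqdm progress bar. Like join_ray_futures, it returns results in input order and leaves the caller's list empty.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any


def join_futures_draining(futures: list[Future]) -> list[Any]:
    """Stdlib analogue of the join_ray_futures contract (not eo-learn's code).

    Results come back in input order, and the input list is emptied so that
    each resolved future is no longer referenced by it.
    """
    futures.reverse()  # pop() from the end is O(1); reversing first keeps input order
    results: list[Any] = []
    while futures:
        future = futures.pop()  # the list drops its reference here
        results.append(future.result())
    return results


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Only `pending` refers to the futures, so draining it releases them.
        pending = [pool.submit(pow, x, 2) for x in range(5)]
        print(join_futures_draining(pending))  # [0, 1, 4, 9, 16]
        print(pending)  # []
```

The same caveat applies with Ray: if the futures are also stored elsewhere, for example in a second list or a dictionary, emptying the list passed to join_ray_futures does not release them.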

eolearn.core.extra.ray.join_ray_futures_iter(futures, update_interval=0.5, **tqdm_kwargs)

Resolves futures, monitors progress, and serves as an iterator over results.

Parameters:
  • futures (list[ray.ObjectRef]) – A list of futures to be joined. Note that, as a side effect, this function empties the list. This lets Ray release the future objects already during execution and frees memory in the Ray Plasma store. This works only if the future objects are not referenced anywhere outside the futures list.

  • update_interval (float) – The number of seconds to wait between consecutive updates of the progress bar.

  • tqdm_kwargs (Any) – Keyword arguments that will be propagated to tqdm progress bar.

Returns:

A generator that yields pairs (index, result), where index is the position, in the original futures list, of the future to which result belongs.

Return type:

Generator[tuple[int, Any], None, None]
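Because join_ray_futures_iter yields (index, result) pairs, a consumer can handle each result inside its loop and still recover the original order from the index, whatever order the pairs arrive in. A minimal sketch with the hypothetical helper results_in_input_order:

```python
from collections.abc import Iterable
from typing import Any


def results_in_input_order(pairs: Iterable[tuple[int, Any]], count: int) -> list[Any]:
    """Place every (index, result) pair at its original position."""
    ordered: list[Any] = [None] * count
    for index, result in pairs:
        ordered[index] = result  # each result could also be processed or saved here
    return ordered


# Typical use with Ray (not run here; `futures` is a list of ray.ObjectRef):
#     count = len(futures)  # read the length first, because the list gets emptied
#     results = results_in_input_order(join_ray_futures_iter(futures, update_interval=1.0), count)
```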