Source code for eolearn.io.sentinelhub_process

""" Input tasks that collect data from `Sentinel-Hub Process API
<https://docs.sentinel-hub.com/api/latest/api/process/>`__

Copyright (c) 2017- Sinergise and contributors
For the full list of contributors, see the CREDITS file in the root directory of this source tree.

This source code is licensed under the MIT license, see the LICENSE file in the root directory of this source tree.
"""

from __future__ import annotations

import datetime as dt
import logging
from abc import ABCMeta, abstractmethod
from typing import Any, Callable, Iterable, Tuple, cast

import numpy as np

from sentinelhub import (
    BBox,
    DataCollection,
    Geometry,
    MimeType,
    MosaickingOrder,
    ResamplingType,
    SentinelHubDownloadClient,
    SentinelHubRequest,
    SentinelHubSession,
    SHConfig,
    bbox_to_dimensions,
    filter_times,
    parse_time_interval,
)
from sentinelhub.api.catalog import get_available_timestamps
from sentinelhub.evalscript import generate_evalscript, parse_data_collection_bands
from sentinelhub.types import JsonDict, RawTimeIntervalType

from eolearn.core import EOPatch, EOTask, FeatureType
from eolearn.core.types import Feature, FeaturesSpecification
from eolearn.core.utils.parsing import parse_renamed_feature, parse_renamed_features

LOGGER = logging.getLogger(__name__)


[docs]class SentinelHubInputBaseTask(EOTask, metaclass=ABCMeta): """Base class for Processing API input tasks""" def __init__( self, data_collection: DataCollection, size: tuple[int, int] | None = None, resolution: float | tuple[float, float] | None = None, cache_folder: str | None = None, config: SHConfig | None = None, max_threads: int | None = None, upsampling: ResamplingType | None = None, downsampling: ResamplingType | None = None, session_loader: Callable[[], SentinelHubSession] | None = None, ): """ :param data_collection: A collection of requested satellite data. :param size: Number of pixels in x and y dimension. :param resolution: Resolution in meters, passed as a single number or a tuple of two numbers - resolution in horizontal and resolution in vertical direction. :param cache_folder: Path to cache_folder. If set to None (default) requests will not be cached. :param config: An instance of SHConfig defining the service :param max_threads: Maximum threads to be used when downloading data. :param upsampling: A type of upsampling to apply on data :param downsampling: A type of downsampling to apply on data :param session_loader: A callable that returns a valid SentinelHubSession, used for session sharing. Creates a new session if set to `None`, which should be avoided in large scale parallelization. """ if (size is None) == (resolution is None): raise ValueError("Exactly one of the parameters 'size' and 'resolution' should be given.") self.size = size self.resolution = resolution self.config = config or SHConfig() self.max_threads = max_threads self.data_collection: DataCollection = DataCollection(data_collection) self.cache_folder = cache_folder self.session_loader = session_loader self.upsampling = upsampling self.downsampling = downsampling
[docs] def execute( self, eopatch: EOPatch | None = None, bbox: BBox | None = None, time_interval: RawTimeIntervalType | None = None, # should be kept at this to prevent code-breaks geometry: Geometry | None = None, ) -> EOPatch: """Main execute method for the Process API tasks. The `geometry` is used only in conjunction with the `bbox` and does not act as a replacement.""" eopatch_bbox = eopatch.bbox if eopatch is not None else None area_bbox = self._consolidate_bbox(bbox, eopatch_bbox) eopatch = eopatch or EOPatch(bbox=area_bbox) eopatch.bbox = area_bbox size_x, size_y = self._get_size(area_bbox) if time_interval: time_interval = parse_time_interval(time_interval) timestamps = self._get_timestamps(time_interval, area_bbox) timestamps = [time_point.replace(tzinfo=None) for time_point in timestamps] elif self.data_collection.is_timeless: timestamps = None # should this be [] to match next branch in case of a fresh eopatch? else: timestamps = eopatch.timestamps if timestamps is not None: if not eopatch.timestamps: eopatch.timestamps = timestamps elif timestamps != eopatch.timestamps: raise ValueError("Trying to write data to an existing EOPatch with a different timestamp.") sh_requests = self._build_requests(area_bbox, size_x, size_y, timestamps, time_interval, geometry) requests = [request.download_list[0] for request in sh_requests] LOGGER.debug("Downloading %d requests of type %s", len(requests), str(self.data_collection)) session = None if self.session_loader is None else self.session_loader() client = SentinelHubDownloadClient(config=self.config, session=session) responses = client.download(requests, max_threads=self.max_threads) LOGGER.debug("Downloads complete") temporal_dim = 1 if timestamps is None else len(timestamps) shape = temporal_dim, size_y, size_x self._extract_data(eopatch, responses, shape) return eopatch
def _get_size(self, bbox: BBox) -> tuple[int, int]: """Get the size (width, height) for the request either from inputs, or from the (existing) eopatch""" if self.size is not None: return self.size if self.resolution is not None: return bbox_to_dimensions(bbox, self.resolution) raise ValueError("Size or resolution for the requests should be provided!") @staticmethod def _consolidate_bbox(bbox: BBox | None, eopatch_bbox: BBox | None) -> BBox: if eopatch_bbox is None: if bbox is None: raise ValueError("Either the eopatch or the task must provide valid bbox.") return bbox if bbox is None or eopatch_bbox == bbox: return eopatch_bbox raise ValueError("Either the eopatch or the task must provide bbox, or they must be the same.") @abstractmethod def _extract_data(self, eopatch: EOPatch, responses: list[Any], shape: tuple[int, ...]) -> EOPatch: """Extract data from the received images and assign them to eopatch features""" @abstractmethod def _build_requests( self, bbox: BBox | None, size_x: int, size_y: int, timestamps: list[dt.datetime] | None, time_interval: RawTimeIntervalType | None, geometry: Geometry | None, ) -> list[SentinelHubRequest]: """Build requests""" @abstractmethod def _get_timestamps(self, time_interval: RawTimeIntervalType | None, bbox: BBox) -> list[dt.datetime]: """Get the timestamp array needed as a parameter for downloading the images"""
[docs]class SentinelHubEvalscriptTask(SentinelHubInputBaseTask): """Process API task to download data using evalscript""" # pylint: disable=too-many-arguments def __init__( self, features: FeaturesSpecification, evalscript: str, data_collection: DataCollection, size: tuple[int, int] | None = None, resolution: float | tuple[float, float] | None = None, maxcc: float | None = None, time_difference: dt.timedelta | None = None, mosaicking_order: str | MosaickingOrder | None = None, cache_folder: str | None = None, config: SHConfig | None = None, max_threads: int | None = None, upsampling: ResamplingType | None = None, downsampling: ResamplingType | None = None, aux_request_args: dict | None = None, session_loader: Callable[[], SentinelHubSession] | None = None, timestamp_filter: Callable[[list[dt.datetime], dt.timedelta], list[dt.datetime]] = filter_times, ): """ :param features: Features to construct from the evalscript. :param evalscript: Evalscript for the request. Beware that all outputs from SentinelHub services should be named and should have the same name as corresponding feature :param data_collection: Source of requested satellite data. :param size: Number of pixels in x and y dimension. :param resolution: Resolution in meters, passed as a single number or a tuple of two numbers - resolution in horizontal and resolution in vertical direction. :param maxcc: Maximum cloud coverage, a float in interval [0, 1] :param time_difference: Minimum allowed time difference, used when filtering dates. Also used by the service for mosaicking, timestamps might be misleading for large values. :param cache_folder: Path to cache_folder. If set to None (default) requests will not be cached. :param config: An instance of SHConfig defining the service :param max_threads: Maximum threads to be used when downloading data. :param upsampling: A type of upsampling to apply on data :param downsampling: A type of downsampling to apply on data :param mosaicking_order: Mosaicking order, which has to be either 'mostRecent', 'leastRecent' or 'leastCC'. :param aux_request_args: a dictionary with auxiliary information for the input_data part of the SH request :param session_loader: A callable that returns a valid SentinelHubSession, used for session sharing. Creates a new session if set to `None`, which should be avoided in large scale parallelization. :param timestamp_filter: A function that performs the final filtering of timestamps, usually to remove multiple occurrences within the time_difference window. Check `get_available_timestamps` for more info. """ super().__init__( data_collection=data_collection, size=size, resolution=resolution, cache_folder=cache_folder, config=config, max_threads=max_threads, upsampling=upsampling, downsampling=downsampling, session_loader=session_loader, ) self.features = self._parse_and_validate_features(features) self.responses = self._create_response_objects() self.evalscript = evalscript if maxcc and isinstance(maxcc, (int, float)) and (maxcc < 0 or maxcc > 1): raise ValueError("maxcc should be a float on an interval [0, 1]") self.maxcc = maxcc self.time_difference = time_difference or dt.timedelta(seconds=1) self.timestamp_filter = timestamp_filter self.mosaicking_order = None if mosaicking_order is None else MosaickingOrder(mosaicking_order) self.aux_request_args = aux_request_args def _parse_and_validate_features(self, features: FeaturesSpecification) -> list[tuple[FeatureType, str, str]]: _features = parse_renamed_features( features, allowed_feature_types=lambda fty: fty.is_array() or fty == FeatureType.META_INFO ) ftr_data_types = {ft for ft, _, _ in _features if not ft.is_meta()} if all(ft.is_timeless() for ft in ftr_data_types) or all(ft.is_temporal() for ft in ftr_data_types): return _features raise ValueError("Cannot mix time dependent and timeless requests!") def _create_response_objects(self) -> list[JsonDict]: """Construct SentinelHubRequest output_responses from features""" responses = [] for feat_type, feat_name, _ in self.features: if feat_type.is_array(): responses.append(SentinelHubRequest.output_response(feat_name, MimeType.TIFF)) elif feat_type.is_meta(): responses.append(SentinelHubRequest.output_response("userdata", MimeType.JSON)) else: # should not happen as features have already been validated raise ValueError(f"{feat_type} not supported!") return responses def _get_timestamps(self, time_interval: RawTimeIntervalType | None, bbox: BBox) -> list[dt.datetime]: """Get the timestamp array needed as a parameter for downloading the images""" if any(feat_type.is_timeless() for feat_type, _, _ in self.features if feat_type.is_array()): return [] timestamps = get_available_timestamps( bbox=bbox, time_interval=time_interval, data_collection=self.data_collection, maxcc=self.maxcc, config=self.config, ) return self.timestamp_filter(timestamps, self.time_difference) def _build_requests( self, bbox: BBox | None, size_x: int, size_y: int, timestamps: list[dt.datetime] | None, time_interval: RawTimeIntervalType | None, geometry: Geometry | None, ) -> list[SentinelHubRequest]: """Defines request timestamps and builds requests. In case `timestamps` is either `None` or an empty list it still has to create at least one request in order to obtain back number of bands of responses.""" dates: list[tuple[dt.datetime | None, dt.datetime | None] | None] if timestamps: dates = [(date - self.time_difference, date + self.time_difference) for date in timestamps] elif timestamps is None: dates = [None] else: dates = [parse_time_interval(time_interval, allow_undefined=True)] return [self._create_sh_request(date, bbox, size_x, size_y, geometry) for date in dates] def _create_sh_request( self, time_interval: RawTimeIntervalType | None, bbox: BBox | None, size_x: int, size_y: int, geometry: Geometry | None, ) -> SentinelHubRequest: """Create an instance of SentinelHubRequest""" return SentinelHubRequest( evalscript=self.evalscript, input_data=[ SentinelHubRequest.input_data( data_collection=self.data_collection, mosaicking_order=self.mosaicking_order, time_interval=time_interval, maxcc=self.maxcc, upsampling=self.upsampling, downsampling=self.downsampling, other_args=self.aux_request_args, ) ], responses=self.responses, bbox=bbox, geometry=geometry, size=(size_x, size_y), data_folder=self.cache_folder, config=self.config, ) def _extract_data(self, eopatch: EOPatch, responses: list[Any], shape: tuple[int, ...]) -> EOPatch: """Extract data from the received images and assign them to eopatch features""" # pylint: disable=arguments-renamed if len(self.features) == 1: ftype, fname, _ = self.features[0] extension = "json" if ftype.is_meta() else "tif" responses = [{f"{fname}.{extension}": data} for data in responses] for ftype, fname, new_fname in self.features: if ftype.is_meta(): eopatch[ftype][new_fname] = [data["userdata.json"] for data in responses] elif ftype.is_temporal(): data = np.asarray([data[f"{fname}.tif"] for data in responses]) data = data[..., np.newaxis] if data.ndim == 3 else data time_dim = shape[0] eopatch[ftype][new_fname] = data[:time_dim] if time_dim != data.shape[0] else data else: eopatch[ftype][new_fname] = np.asarray(responses[0][f"{fname}.tif"])[..., np.newaxis] return eopatch
[docs]class SentinelHubInputTask(SentinelHubInputBaseTask): """Process API input task that loads 16bit integer data and converts it to a 32bit float feature.""" # pylint: disable=too-many-arguments # pylint: disable=too-many-locals def __init__( self, data_collection: DataCollection, size: tuple[int, int] | None = None, resolution: float | tuple[float, float] | None = None, bands_feature: Feature | None = None, bands: list[str] | None = None, additional_data: list[Feature] | None = None, evalscript: str | None = None, maxcc: float | None = None, time_difference: dt.timedelta | None = None, cache_folder: str | None = None, config: SHConfig | None = None, max_threads: int | None = None, bands_dtype: None | np.dtype | type = None, single_scene: bool = False, mosaicking_order: str | MosaickingOrder | None = None, upsampling: ResamplingType | None = None, downsampling: ResamplingType | None = None, aux_request_args: dict | None = None, session_loader: Callable[[], SentinelHubSession] | None = None, timestamp_filter: Callable[[list[dt.datetime], dt.timedelta], list[dt.datetime]] = filter_times, ): """ :param data_collection: Source of requested satellite data. :param size: Number of pixels in x and y dimension. :param resolution: Resolution in meters, passed as a single number or a tuple of two numbers - resolution in horizontal and resolution in vertical direction. :param bands_feature: A target feature into which to save the downloaded images. :param bands: An array of band names. If not specified it will download all bands specified for a given data collection. :param additional_data: A list of additional data to be downloaded, such as SCL, SNW, dataMask, etc. :param evalscript: An optional parameter to override an evalscript that is generated by default :param maxcc: Maximum cloud coverage. :param time_difference: Minimum allowed time difference, used when filtering dates. Also used by the service for mosaicking, timestamps might be misleading for large values. :param cache_folder: Path to cache_folder. If set to None (default) requests will not be cached. :param config: An instance of SHConfig defining the service :param max_threads: Maximum threads to be used when downloading data. :param bands_dtype: output type of the bands array, if set to None the default is used :param single_scene: If true, the service will compute a single image for the given time interval using mosaicking. :param mosaicking_order: Mosaicking order, which has to be either 'mostRecent', 'leastRecent' or 'leastCC'. :param upsampling: A type of upsampling to apply on data :param downsampling: A type of downsampling to apply on data :param aux_request_args: a dictionary with auxiliary information for the input_data part of the SH request :param session_loader: A callable that returns a valid SentinelHubSession, used for session sharing. Creates a new session if set to `None`, which should be avoided in large scale parallelization. :param timestamp_filter: A callable that performs the final filtering of timestamps, usually to remove multiple occurrences within the time_difference window. Check `get_available_timestamps` for more info. """ super().__init__( data_collection=data_collection, size=size, resolution=resolution, cache_folder=cache_folder, config=config, max_threads=max_threads, upsampling=upsampling, downsampling=downsampling, session_loader=session_loader, ) self.evalscript = evalscript self.maxcc = maxcc self.time_difference = time_difference or dt.timedelta(seconds=1) self.timestamp_filter = timestamp_filter self.single_scene = single_scene self.bands_dtype = bands_dtype self.mosaicking_order = None if mosaicking_order is None else MosaickingOrder(mosaicking_order) self.aux_request_args = aux_request_args self.bands_feature = None self.requested_bands = [] if bands_feature: self.bands_feature = self.parse_feature(bands_feature, allowed_feature_types=[FeatureType.DATA]) bands = bands if bands is not None else [band.name for band in data_collection.bands] self.requested_bands = parse_data_collection_bands(data_collection, bands) self.requested_additional_bands = [] self.additional_data: list[tuple[FeatureType, str, str]] | None = None if additional_data is not None: self.additional_data = parse_renamed_features(additional_data) additional_bands = [band for _, band, _ in self.additional_data] self.requested_additional_bands = parse_data_collection_bands(data_collection, additional_bands) def _get_timestamps(self, time_interval: RawTimeIntervalType | None, bbox: BBox) -> list[dt.datetime]: """Get the timestamp array needed as a parameter for downloading the images""" if self.single_scene: return [time_interval[0]] # type: ignore[index, list-item] timestamps = get_available_timestamps( bbox=bbox, time_interval=time_interval, data_collection=self.data_collection, maxcc=self.maxcc, config=self.config, ) return self.timestamp_filter(timestamps, self.time_difference) def _build_requests( self, bbox: BBox | None, size_x: int, size_y: int, timestamps: list[dt.datetime] | None, time_interval: RawTimeIntervalType | None, geometry: Geometry | None, ) -> list[SentinelHubRequest]: """Build requests""" if timestamps is None: intervals: list[RawTimeIntervalType | None] = [None] elif self.single_scene: intervals = [parse_time_interval(time_interval)] else: intervals = [(date - self.time_difference, date + self.time_difference) for date in timestamps] return [self._create_sh_request(time_interval, bbox, size_x, size_y, geometry) for time_interval in intervals] def _create_sh_request( self, time_interval: RawTimeIntervalType | None, bbox: BBox | None, size_x: int, size_y: int, geometry: Geometry | None, ) -> SentinelHubRequest: """Create an instance of SentinelHubRequest""" responses = [ SentinelHubRequest.output_response(band.name, MimeType.TIFF) for band in self.requested_bands + self.requested_additional_bands ] evalscript = generate_evalscript( data_collection=self.data_collection, bands=[band.name for band in self.requested_bands], meta_bands=[band.name for band in self.requested_additional_bands], prioritize_dn=not np.issubdtype(self.bands_dtype, np.floating), ) return SentinelHubRequest( evalscript=self.evalscript or evalscript, input_data=[ SentinelHubRequest.input_data( data_collection=self.data_collection, time_interval=time_interval, mosaicking_order=self.mosaicking_order, maxcc=self.maxcc, upsampling=self.upsampling, downsampling=self.downsampling, other_args=self.aux_request_args, ) ], responses=responses, bbox=bbox, geometry=geometry, size=(size_x, size_y), data_folder=self.cache_folder, config=self.config, ) def _extract_data(self, eopatch: EOPatch, responses: list[Any], shape: tuple[int, ...]) -> EOPatch: """Extract data from the received images and assign them to eopatch features""" if len(self.requested_bands) + len(self.requested_additional_bands) == 1: # if only one band is requested the response is not a tar so we reshape it only_band = (self.requested_bands + self.requested_additional_bands)[0] responses = [{only_band.name + ".tif": image} for image in responses] if self.additional_data: self._extract_additional_features(eopatch, responses, shape) if self.bands_feature: self._extract_bands_feature(eopatch, responses, shape) return eopatch def _extract_additional_features( self, eopatch: EOPatch, images: Iterable[np.ndarray], shape: tuple[int, ...] ) -> None: """Extracts additional features from response into an EOPatch""" if self.additional_data is not None: for (ftype, _, new_name), band_info in zip(self.additional_data, self.requested_additional_bands): tiffs = [tar[band_info.name + ".tif"] for tar in images] eopatch[ftype, new_name] = self._extract_array(tiffs, 0, shape, band_info.output_types[0]) def _extract_bands_feature(self, eopatch: EOPatch, images: Iterable[np.ndarray], shape: tuple[int, ...]) -> None: """Extract the bands feature arrays and concatenate them along the last axis""" processed_bands = [] for band_info in self.requested_bands: tiffs = [tar[band_info.name + ".tif"] for tar in images] dtype = self.bands_dtype or band_info.output_types[0] processed_bands.append(self._extract_array(tiffs, 0, shape, dtype)) bands_feature = cast(Tuple[FeatureType, str], self.bands_feature) # verified by `if` in _extract_data eopatch[bands_feature] = np.concatenate(processed_bands, axis=-1) @staticmethod def _extract_array(tiffs: list[np.ndarray], idx: int, shape: tuple[int, ...], dtype: type | np.dtype) -> np.ndarray: """Extract a numpy array from the received tiffs""" feature_arrays = (np.atleast_3d(img)[..., idx] for img in tiffs) return np.asarray(list(feature_arrays), dtype=dtype).reshape(*shape, 1)
[docs]class SentinelHubDemTask(SentinelHubEvalscriptTask): """ Adds DEM data (one of the `collections <https://docs.sentinel-hub.com/api/latest/data/dem/#deminstance>`__) to DATA_TIMELESS EOPatch feature. """ def __init__( self, feature: None | str | Feature = None, data_collection: DataCollection = DataCollection.DEM, **kwargs: Any, ): dem_band = data_collection.bands[0].name renamed_feature: tuple[FeatureType, str, str] if feature is None: renamed_feature = (FeatureType.DATA_TIMELESS, dem_band, dem_band) elif isinstance(feature, str): renamed_feature = (FeatureType.DATA_TIMELESS, dem_band, feature) else: ftype, _, fname = parse_renamed_feature(feature, allowed_feature_types=lambda ftype: ftype.is_timeless()) renamed_feature = (ftype, dem_band, fname or dem_band) evalscript = generate_evalscript(data_collection=data_collection, bands=[dem_band]) super().__init__(evalscript=evalscript, features=[renamed_feature], data_collection=data_collection, **kwargs)