Source code for eolearn.io.raster_io

"""
Module containing tasks used for reading and writing to disk

Copyright (c) 2017- Sinergise and contributors
For the full list of contributors, see the CREDITS file in the root directory of this source tree.

This source code is licensed under the MIT license, see the LICENSE file in the root directory of this source tree.
"""

from __future__ import annotations

import datetime as dt
import functools
import logging
import warnings
from abc import ABCMeta

import fs
import numpy as np
import rasterio
import rasterio.warp
from affine import Affine
from fs.base import FS
from fs_s3fs import S3FS
from rasterio.io import DatasetReader
from rasterio.session import AWSSession
from rasterio.windows import Window, from_bounds

from sentinelhub import CRS, BBox, SHConfig, parse_time_interval

from eolearn.core import EOPatch
from eolearn.core.core_tasks import IOTask
from eolearn.core.exceptions import EODeprecationWarning, EORuntimeWarning
from eolearn.core.types import Feature
from eolearn.core.utils.fs import get_base_filesystem_and_path, get_full_path

LOGGER = logging.getLogger(__name__)


[docs]class BaseRasterIoTask(IOTask, metaclass=ABCMeta): """Base abstract class for raster IO tasks""" def __init__( self, feature: Feature, path: str = ..., # type: ignore[assignment] *, filesystem: FS | None = None, image_dtype: np.dtype | type | None = None, no_data_value: float | None = None, create: bool = False, config: SHConfig | None = None, folder: str | None = None, ): """ :param feature: Feature which will be exported or imported :param path: Path pointing to an image file or to a directory of image files. If `filesystem` is not defined, absolute paths should be provided. :param filesystem: A filesystem object. If not given it will be initialized according to `folder` parameter. :param image_dtype: A data type of data in exported images or data imported from images. :param no_data_value: When exporting this is the `NoData` value of pixels in exported images. When importing this value is assigned to the pixels with `NoData`. :param create: If the filesystem path doesn't exist this flag indicates to either create it or raise an error. :param config: A configuration object with AWS credentials. By default, is set to None and in this case the default configuration will be taken. """ if folder is not None: warnings.warn("The parameter `folder` has been renamed to `path`.", EODeprecationWarning, stacklevel=3) path = folder if path is ...: # type: ignore[comparison-overlap] raise TypeError(f"{type(self).__name__} is missing a required positional argument 'path'.") self.feature = self.parse_feature(feature) self.image_dtype = image_dtype self.no_data_value = no_data_value if filesystem is None: filesystem, path = get_base_filesystem_and_path(path, create=create, config=config) # the super-class takes care of filesystem pickling super().__init__(path, filesystem=filesystem, create=create, config=config) def _get_filename_paths( self, filename_template: str | list[str], timestamps: list[dt.datetime] | None ) -> list[str]: """From a filename "template" and base path on the filesystem it generates full paths to tiff files. The paths are still relative to the filesystem object. If the file extension is not provided, it will default to `.tif`. If a "*" wildcard or a datetime format substring (e.g. "%Y%m%dT%H%M%S") is provided in the template, it returns multiple GeoTIFF paths where each one will correspond to a single timestamp. Alternatively, a list of paths can be provided, one for each timestamp. """ if isinstance(filename_template, str): filename_path = fs.path.join(self.filesystem_path, filename_template) filename_paths = self._generate_paths(filename_path, timestamps) elif isinstance(filename_template, list): if timestamps and len(filename_template) != len(timestamps): raise ValueError("The number of provided timestamps does not match the number of provided filenames.") filenames = [] for idx, path in enumerate(filename_template): timestamps = [] if not timestamps else [timestamps[idx]] filenames.extend(self._generate_paths(path, timestamps)) filename_paths = [fs.path.join(self.filesystem_path, path) for path in filenames] else: raise TypeError(f"The `filename` parameter must be a list or a string, but {filename_template} found") if self._create_path: unique_folder_paths = {fs.path.dirname(filename_path) for filename_path in filename_paths} for folder_path in unique_folder_paths: self.filesystem.makedirs(folder_path, recreate=True) return filename_paths @classmethod def _generate_paths(cls, path_template: str, timestamps: list[dt.datetime] | None) -> list[str]: """Uses a filename path template to create a list of actual filename paths.""" if not path_template.lower().endswith((".tif", ".tiff")): path_template = f"{path_template}.tif" if not timestamps: return [path_template] path_template = path_template.replace("*", "%Y%m%dT%H%M%S") if timestamps[0].strftime(path_template) == path_template: # unaffected by timestamps return [path_template] return [timestamp.strftime(path_template) for timestamp in timestamps]
[docs]class ExportToTiffTask(BaseRasterIoTask): """Task exports specified feature to GeoTIFF. The task can export also features with sizes of both time and band dimension greater than `1`. When exporting only multiple times or only multiple bands, the GeoTIFF channels are in the expected order. However, when exporting multiple times and bands, the order obeys the following pattern: T(1)B(1), T(1)B(2), ..., T(1)B(N), T(2)B(1), T(2)B(2), ..., T(2)B(N), ..., ..., T(M)B(N) where T and B are the time and band indices of the array, and M and N are the lengths of these indices, respectively. """ def __init__( self, feature: Feature, path: str = ..., # type: ignore[assignment] *, date_indices: list[int] | tuple[int, int] | tuple[dt.datetime, dt.datetime] | tuple[str, str] | None = None, band_indices: list[int] | tuple[int, int] | None = None, crs: CRS | int | str | None = None, fail_on_missing: bool = True, compress: str | None = None, filesystem: FS | None = None, image_dtype: np.dtype | type | None = None, no_data_value: float | None = None, config: SHConfig | None = None, folder: str | None = None, ): """ :param feature: A feature to be exported. :param path: Path pointing to an image file or to a directory of image files to be exported. If `filesystem` is not defined, absolute paths should be provided. :param date_indices: Indices of those time frames from the give feature that will be exported to a tiff image. It can be either a list of indices or a tuple of `2` indices defining an interval of indices or a tuple of `2` datetime object also defining a time interval. By default, all time frames will be exported. :param band_indices: Indices of those bands from the given feature that will be exported to a tiff image. It can be either a list of indices or a tuple of `2` indices defining an interval of indices. By default, all bands will be exported. :param crs: A CRS in which to reproject the feature before writing it to GeoTIFF. By default, no reprojection will be done. :param fail_on_missing: A flag to specify if the task should fail if a feature is missing or if it should just show a warning. :param compress: A type of compression that rasterio should apply to an exported image. :param filesystem: A filesystem object. If not given it will be initialized according to `folder` parameter. :param image_dtype: A data type of data in exported images or data imported from images. :param no_data_value: The `NoData` value of pixels in exported images. :param config: A configuration object with AWS credentials. By default, is set to None and in this case the default configuration will be taken. """ super().__init__( feature, path=path, create=True, filesystem=filesystem, image_dtype=image_dtype, no_data_value=no_data_value, config=config, folder=folder, ) self.date_indices = date_indices self.band_indices = band_indices self.crs = None if crs is None else CRS(crs) self.fail_on_missing = fail_on_missing self.compress = compress def _prepare_image_array(self, eopatch: EOPatch, feature: Feature) -> np.ndarray: """Collects a feature from EOPatch and prepares the array of an image which will be rasterized. The resulting array has shape (channels, height, width) and is of correct dtype. """ data_array = self._reduce_by_bands(eopatch[feature]) feature_type, _ = feature if feature_type.is_temporal(): data_array = self._reduce_by_time(data_array, eopatch.get_timestamps()) else: data_array = np.expand_dims(data_array, axis=0) if not feature_type.is_spatial(): # add height and width dimensions data_array = np.expand_dims(np.expand_dims(data_array, axis=1), axis=1) data_array = self._set_export_dtype(data_array, feature) time_dim, height, width, band_dim = data_array.shape new_shape = (time_dim * band_dim, height, width) return np.moveaxis(data_array, -1, 1).reshape(new_shape) def _reduce_by_bands(self, array: np.ndarray) -> np.ndarray: """Reduces the array by selecting a subset of bands.""" if self.band_indices is None: return array if isinstance(self.band_indices, list): if [band for band in self.band_indices if not isinstance(band, int)]: raise ValueError(f"Invalid format in {self.band_indices} list, expected integers") return array[..., self.band_indices] if isinstance(self.band_indices, tuple): if tuple(map(type, self.band_indices)) != (int, int): raise ValueError(f"Invalid format in {self.band_indices} tuple, expected integers") return array[..., self.band_indices[0] : self.band_indices[1] + 1] raise ValueError(f"Invalid format in {self.band_indices}, expected tuple or list") def _reduce_by_time(self, data_array: np.ndarray, timestamps: list[dt.datetime]) -> np.ndarray: """Reduce array by selecting a subset of times.""" if self.date_indices is None: return data_array if isinstance(self.date_indices, list): if [date for date in self.date_indices if not isinstance(date, int)]: raise ValueError(f"Invalid format in {self.date_indices} list, expected integers") return data_array[np.array(self.date_indices), ...] if isinstance(self.date_indices, tuple): dates = np.array(timestamps) start_idx, end_idx = self.date_indices if isinstance(start_idx, int) and isinstance(end_idx, int): start_date, end_date = dates[start_idx], dates[end_idx] elif isinstance(start_idx, str) and isinstance(end_idx, str): start_date, end_date = parse_time_interval((start_idx, end_idx)) elif isinstance(start_idx, dt.datetime) and isinstance(end_idx, dt.datetime): start_date, end_date = start_idx, end_idx else: raise ValueError(f"Invalid format in {self.date_indices} tuple, expected ints, strings, or datetimes") return data_array[np.nonzero(np.where((dates >= start_date) & (dates <= end_date), dates, 0))[0]] raise ValueError(f"Invalid format in {self.date_indices}, expected tuple or list") def _set_export_dtype(self, data_array: np.ndarray, feature: Feature) -> np.ndarray: """To a given array it sets a dtype in which data will be exported""" image_dtype = data_array.dtype if self.image_dtype is None else self.image_dtype if image_dtype == np.int64: image_dtype = np.int32 warnings.warn( f"Cannot export {feature} with dtype numpy.int64. Will export as numpy.int32 instead", EORuntimeWarning ) return data_array.astype(image_dtype) if image_dtype != data_array.dtype else data_array def _get_source_and_destination_params( self, data_array: np.ndarray, bbox: BBox ) -> tuple[tuple[str, Affine], tuple[str, Affine], tuple[int, int]]: """Calculates source and destination CRS and transforms. Also returns destination height and width.""" _, height, width = data_array.shape src_crs = bbox.crs.ogc_string() src_transform = rasterio.transform.from_bounds(*bbox, width=width, height=height) if self.crs is None: return (src_crs, src_transform), (src_crs, src_transform), (height, width) dst_crs = self.crs.ogc_string() dst_transform, dst_width, dst_height = rasterio.warp.calculate_default_transform( src_crs, dst_crs, width, height, *bbox ) return (src_crs, src_transform), (dst_crs, dst_transform), (dst_height, dst_width) def _export_tiff( self, image_array: np.ndarray, filesystem: FS, path: str, channel_count: int, *, dst_crs: str, dst_transform: Affine, dst_height: int, dst_width: int, src_crs: str, src_transform: Affine, ) -> None: """Export an EOPatch feature to tiff based on input channel range.""" with rasterio.Env(), filesystem.openbin(path, "w") as file_handle: # noqa: SIM117 with rasterio.open( file_handle, "w", driver="GTiff", width=dst_width, height=dst_height, count=channel_count, dtype=image_array.dtype, nodata=self.no_data_value, transform=dst_transform, crs=dst_crs, compress=self.compress, ) as dst: if dst_crs == src_crs: dst.write(image_array) else: for idx in range(channel_count): rasterio.warp.reproject( source=image_array[idx, ...], destination=rasterio.band(dst, idx + 1), src_transform=src_transform, src_crs=src_crs, dst_transform=dst_transform, dst_crs=dst_crs, resampling=rasterio.warp.Resampling.nearest, )
[docs] def execute(self, eopatch: EOPatch, *, filename: str | list[str] | None = "") -> EOPatch: """Execute method :param eopatch: An input EOPatch :param filename: A filename with a path to a tiff file. The path has to be a relative to the filesystem object and path from `folder` initialization parameter. If the file extension of the image file is not provided, it will default to `.tif`. If a "*" wildcard or a datetime.strftime substring (e.g. "%Y%m%dT%H%M%S") is provided in the path, an EOPatch feature will be split over multiple GeoTIFFs where each one will correspond to a single timestamp. Alternatively, a list of paths can be provided, one for each timestamp. Set this parameter to `None` to disable exporting. :return: Unchanged input EOPatch """ if filename is None: return eopatch if self.feature not in eopatch: error_msg = f"Feature {self.feature} was not found in EOPatch" LOGGER.warning(error_msg) if self.fail_on_missing: raise ValueError(error_msg) return eopatch if eopatch.bbox is None: raise ValueError("EOPatch without a bounding box encountered, cannot export to GeoTIFF") image_array = self._prepare_image_array(eopatch, self.feature) src_info, dst_info, (dst_height, dst_width) = self._get_source_and_destination_params(image_array, eopatch.bbox) src_crs, src_transform = src_info dst_crs, dst_transform = dst_info filename_paths = self._get_filename_paths(filename, eopatch.timestamps) with self.filesystem as filesystem: # no worries about `close`, filesystem is freshly unpickled by the property export_function = functools.partial( self._export_tiff, filesystem=filesystem, dst_crs=dst_crs, dst_transform=dst_transform, dst_height=dst_height, dst_width=dst_width, src_crs=src_crs, src_transform=src_transform, ) channel_count = image_array.shape[0] if len(filename_paths) > 1: single_channel_count = channel_count // len(filename_paths) for timestamp_index, path in enumerate(filename_paths): time_slice_array = image_array[ timestamp_index * single_channel_count : (timestamp_index + 1) * single_channel_count, ... ] export_function(image_array=time_slice_array, path=path, channel_count=single_channel_count) else: export_function(image_array=image_array, path=filename_paths[0], channel_count=channel_count) return eopatch
[docs]class ImportFromTiffTask(BaseRasterIoTask): """Task for importing data from a GeoTIFF file into an EOPatch The task can take an existing EOPatch and read the part of GeoTIFF image, which intersects with its bounding box, into a new feature. But if no EOPatch is given it will create a new EOPatch, read entire GeoTIFF image into a feature and set a bounding box of the new EOPatch. Note that if GeoTIFF file is not completely spatially aligned with location of given EOPatch it will try to fit it as good as possible. However, it will not do any spatial resampling or interpolation on GeoTIFF data. """ def __init__( self, feature: Feature, path: str = ..., # type: ignore[assignment] *, use_vsi: bool | None = None, timestamp_size: int | None = None, filesystem: FS | None = None, image_dtype: np.dtype | type | None = None, no_data_value: float | None = None, config: SHConfig | None = None, folder: str | None = None, ): """ :param feature: EOPatch feature into which data will be imported :param path: Path pointing to an image file or to a directory of image files to be imported. If `filesystem` is not defined, absolute paths should be provided. :param use_vsi: Deprecated. :param timestamp_size: In case data will be imported into a time-dependant feature this parameter can be used to specify time dimension. If not specified, time dimension will be the same as size of the timestamps attribute. If timestamps do not exist this value will be set to 1. When converting data into a feature channels of given tiff image should be in order T(1)B(1), T(1)B(2), ..., T(1)B(N), T(2)B(1), T(2)B(2), ..., T(2)B(N), ..., ..., T(M)B(N) where T and B are the time and band indices. :param filesystem: A filesystem object. If not given it will be initialized according to `folder` parameter. :param image_dtype: A data type of data in exported images or data imported from images. :param no_data_value: When exporting this is the NoData value of pixels in exported images. When importing this value is assigned to the pixels with NoData. :param config: A configuration object with AWS credentials. By default, is set to None and in this case the default configuration will be taken. """ super().__init__( feature, path=path, filesystem=filesystem, image_dtype=image_dtype, no_data_value=no_data_value, config=config, folder=folder, ) if use_vsi is not None: warnings.warn( "The parameter `use_vsi` has been deprecated and has no effect.", EODeprecationWarning, stacklevel=2 ) self.timestamp_size = timestamp_size def _load_from_image(self, path: str, filesystem: FS, bbox: BBox | None) -> tuple[np.ndarray, BBox | None]: """The method decides in what way data will be loaded from the image.""" full_path = get_full_path(filesystem, path) env_kwargs = {} if isinstance(filesystem, S3FS): env_kwargs["session"] = AWSSession( aws_access_key_id=filesystem.aws_access_key_id, aws_secret_access_key=filesystem.aws_secret_access_key, aws_session_token=filesystem.aws_session_token, region_name=filesystem.region, endpoint_url=filesystem.endpoint_url, ) with rasterio.Env(**env_kwargs), rasterio.open(full_path) as src: read_window, read_bbox = self._get_reading_window_and_bbox(src, bbox) boundless_reading = read_window is not None return src.read(window=read_window, boundless=boundless_reading, fill_value=self.no_data_value), read_bbox @staticmethod def _get_reading_window_and_bbox(reader: DatasetReader, bbox: BBox | None) -> tuple[Window | None, BBox | None]: """Provides a reading window for which data will be read from image. If it returns `None` this means that the whole image should be read. Those cases are when bbox is not defined, image is not geo-referenced, or bbox coordinates exactly match image coordinates. Additionally, it provides a bounding box of reading window if an image is geo-referenced.""" image_crs = reader.crs image_transform = reader.transform image_bounds = reader.bounds if image_crs is None or image_transform is None or image_bounds is None: return None, bbox min_x, min_y, max_x, max_y = reader.bounds image_bbox = BBox((min_x, min_y, max_x, max_y), crs=image_crs.to_epsg()) if bbox is None: return None, image_bbox original_bbox = bbox if bbox.crs is not image_bbox.crs: bbox = bbox.transform(image_crs.to_epsg()) if bbox == image_bbox: return None, original_bbox return from_bounds(*iter(bbox), transform=image_transform), original_bbox def _load_data(self, filename_paths: list[str], initial_bbox: BBox | None) -> tuple[np.ndarray, BBox | None]: """Load data from images, join them, and provide their bounding box.""" data_per_path = [] final_bbox: BBox | None = None with self.filesystem as filesystem: for path in filename_paths: data, bbox = self._load_from_image(path, filesystem, initial_bbox) data_per_path.append(data) if bbox is None: continue if final_bbox and bbox != final_bbox: raise RuntimeError( "Given images have different geo-references. They can't be imported into an EOPatch that" " doesn't have a bounding box." ) final_bbox = bbox return np.concatenate(data_per_path, axis=0), final_bbox
[docs] def execute(self, eopatch: EOPatch | None = None, *, filename: str | None = "") -> EOPatch: """Execute method which adds a new feature to the EOPatch. :param eopatch: input EOPatch or None if a new EOPatch should be created :param filename: filename of tiff file or None if entire path has already been specified in `folder` parameter of task initialization. """ if filename is None: if eopatch is None: raise ValueError("Both eopatch and filename parameters cannot be set to None") return eopatch feature_type, feature_name = self.feature loading_bbox = eopatch.bbox if eopatch is not None else None loading_timestamps = eopatch.timestamps if eopatch is not None else [] filename_paths = self._get_filename_paths(filename, loading_timestamps) data, bbox = self._load_data(filename_paths, loading_bbox) if eopatch is None: eopatch = EOPatch(bbox=bbox) if eopatch.bbox is None: eopatch.bbox = bbox if self.image_dtype is not None: data = data.astype(self.image_dtype) if not feature_type.is_spatial(): data = data.flatten() if feature_type.is_timeless(): data = np.moveaxis(data, 0, -1) else: channels = data.shape[0] times = self.timestamp_size if times is None: times = len(eopatch.timestamps) if eopatch.timestamps else 1 if channels % times != 0: raise ValueError( "Cannot import as a time-dependant feature because the number of tiff image channels " "is not divisible by the number of timestamps" ) data = data.reshape((times, channels // times) + data.shape[1:]) data = np.moveaxis(data, 1, -1) eopatch[feature_type][feature_name] = data return eopatch