Source code for eolearn.geometry.transformations

"""
Transformations between vector and raster formats of data

Copyright (c) 2017- Sinergise and contributors
For the full list of contributors, see the CREDITS file in the root directory of this source tree.

This source code is licensed under the MIT license, see the LICENSE file in the root directory of this source tree.
"""

from __future__ import annotations

import datetime as dt
import itertools as it
import logging
import warnings
from functools import partial
from typing import Any, Callable, ClassVar, Iterator, Tuple, cast

import numpy as np
import pandas as pd
import rasterio.features
import rasterio.transform
import shapely.geometry
import shapely.ops
import shapely.wkt
from affine import Affine
from geopandas import GeoDataFrame, GeoSeries
from shapely.geometry.base import BaseGeometry

from sentinelhub import CRS, BBox, bbox_to_dimensions, parse_time

from eolearn.core import EOPatch, EOTask, FeatureType
from eolearn.core.constants import TIMESTAMP_COLUMN
from eolearn.core.exceptions import EORuntimeWarning
from eolearn.core.types import Feature, FeaturesSpecification, SingleFeatureSpec

LOGGER = logging.getLogger(__name__)

ShapeIterator = Iterator[Tuple[BaseGeometry, float]]


[docs]class VectorToRasterTask(EOTask): """A task for transforming a vector feature into a raster feature Vector data can be given as an EOPatch feature or as an independent geopandas `GeoDataFrame`. In the background it uses `rasterio.features.rasterize`, documented in `rasterio documentation <https://rasterio.readthedocs.io/en/stable/api/rasterio.features.html#rasterio.features.rasterize>`__. """ # A mapping between types that are not supported by rasterio into types that are. After rasterization the task # will cast results back into the original dtype. _RASTERIO_DTYPES_MAP: ClassVar[dict[type | np.dtype, type]] = { bool: np.uint8, np.dtype(bool): np.uint8, np.int8: np.int16, np.dtype(np.int8): np.int16, float: np.float64, np.dtype(float): np.float64, } def __init__( self, vector_input: GeoDataFrame | Feature, raster_feature: Feature, *, values: None | float | list[float] = None, values_column: str | None = None, raster_shape: None | tuple[int, int] | Feature = None, raster_resolution: None | float | tuple[float, float] = None, raster_dtype: np.dtype | type = np.uint8, no_data_value: float = 0, write_to_existing: bool = False, overlap_value: float | None = None, buffer: float = 0, **rasterio_params: Any, ): """ :param vector_input: Vector data to be used for rasterization. It can be given as a feature in `EOPatch` or as a geopandas `GeoDataFrame`. :param raster_feature: New or existing raster feature into which data will be written. If existing raster feature is given it will by default take existing values and write over them. :param values: If `values_column` parameter is specified then only polygons which have one of these specified values in `values_column` will be rasterized. It can be also left to `None`. If `values_column` parameter is not specified `values` parameter has to be a single number into which everything will be rasterized. :param values_column: A column in given dataframe where values, into which polygons will be rasterized, are stored. If it is left to `None` then `values` parameter should be a single number into which everything will be rasterized. :param raster_shape: Can be a tuple in form of (height, width) or an existing feature from which the spatial dimensions will be taken. Parameter `raster_resolution` can be specified instead of `raster_shape`. :param raster_resolution: Resolution of raster into which data will be rasterized. Has to be given as a number or a tuple of numbers in meters. Parameter `raster_shape` can be specified instead or `raster_resolution`. :param raster_dtype: Data type of the obtained raster array, default is `numpy.uint8`. :param no_data_value: Default value of all other raster pixels into which no value will be rasterized. :param write_to_existing: If `True` it will write to existing raster array and overwrite parts of its values. If `False` it will create a new raster array and remove the old one. Default is `False`. :param overlap_value: A value to override parts of raster where polygons of different classes overlap. If None, rasterization overlays polygon as it is the default behavior of `rasterio.features.rasterize`. :param buffer: Buffer value passed to vector_data.buffer() before rasterization. If 0, no buffering is done. :param: rasterio_params: Additional parameters to be passed to `rasterio.features.rasterize`. Currently, available parameters are `all_touched` and `merge_alg` """ self.vector_input, self.raster_feature = self._parse_main_params(vector_input, raster_feature) self._rasterize_per_timestamp = self.raster_feature[0].is_temporal() if _vector_is_timeless(self.vector_input) and not self.raster_feature[0].is_timeless(): raise ValueError("Vector input has no time-dependence but a time-dependent output feature was selected") self.values = values self.values_column = values_column if values_column is None and (values is None or not isinstance(values, (int, float))): raise ValueError("One of parameters 'values' and 'values_column' is missing") self.raster_shape = raster_shape self.raster_resolution = raster_resolution if (raster_shape is None) == (raster_resolution is None): raise ValueError("Exactly one of parameters 'raster_shape' and 'raster_resolution' has to be specified") self.raster_dtype = raster_dtype self.no_data_value = no_data_value self.write_to_existing = write_to_existing self.rasterio_params = rasterio_params self.overlap_value = overlap_value self.buffer = buffer def _parse_main_params( self, vector_input: GeoDataFrame | SingleFeatureSpec, raster_feature: SingleFeatureSpec ) -> tuple[GeoDataFrame | Feature, Feature]: """Parsing first 2 parameters - what vector data will be used and in which raster feature it will be saved""" if not _is_geopandas_object(vector_input): vector_input = self.parse_feature(vector_input, allowed_feature_types=lambda fty: fty.is_vector()) parsed_raster_feature = self.parse_feature(raster_feature, allowed_feature_types=lambda fty: fty.is_image()) return vector_input, parsed_raster_feature def _get_vector_data_iterator( self, eopatch: EOPatch, join_per_value: bool ) -> Iterator[tuple[dt.datetime | None, ShapeIterator]]: """Collects and prepares vector shapes for rasterization. It works as an iterator that returns pairs of `(timestamp or None, <iterator over shapes and values>)` :param eopatch: An EOPatch from where geometries will be obtained :param join_per_value: If `True` it will join geometries with the same value using a cascaded union """ vector_data = self.vector_input if _is_geopandas_object(self.vector_input) else eopatch[self.vector_input] # EOPatch has a bbox, verified in execute vector_data = self._preprocess_vector_data(vector_data, cast(BBox, eopatch.bbox), eopatch.timestamps) if self._rasterize_per_timestamp: for timestamp, data_for_time in vector_data.groupby(TIMESTAMP_COLUMN): if not data_for_time.empty: yield timestamp.to_pydatetime(), self._vector_data_to_shape_iterator(data_for_time, join_per_value) elif not vector_data.empty: yield None, self._vector_data_to_shape_iterator(vector_data, join_per_value) def _preprocess_vector_data( self, vector_data: GeoDataFrame, bbox: BBox, timestamps: list[dt.datetime] | None ) -> GeoDataFrame: """Applies preprocessing steps on a dataframe with geometries and potential values and timestamps""" vector_data = self._reduce_vector_data(vector_data, timestamps) if bbox.crs is not CRS(vector_data.crs.to_epsg()): warnings.warn( "Vector data is not in the same CRS as EOPatch, the task will re-project vectors for each execution", EORuntimeWarning, ) vector_data = vector_data.to_crs(bbox.crs.pyproj_crs()) bbox_poly = bbox.geometry vector_data = vector_data[vector_data.geometry.intersects(bbox_poly)].copy(deep=True) if self.buffer: vector_data.geometry = vector_data.geometry.buffer(self.buffer) vector_data = vector_data[~vector_data.is_empty] if not vector_data.geometry.is_valid.all(): warnings.warn("Given vector polygons contain some invalid geometries, attempting to fix", EORuntimeWarning) vector_data.geometry = vector_data.geometry.buffer(0) if vector_data.geometry.has_z.any(): warnings.warn("Polygons contain 3D geometries, they will be projected to 2D", EORuntimeWarning) vector_data.geometry = vector_data.geometry.map(partial(shapely.ops.transform, lambda *args: args[:2])) return vector_data def _reduce_vector_data(self, vector_data: GeoDataFrame, timestamps: list[dt.datetime] | None) -> GeoDataFrame: """Removes all redundant columns and rows.""" columns_to_keep = ["geometry"] if self._rasterize_per_timestamp: columns_to_keep.append(TIMESTAMP_COLUMN) if self.values_column is not None: columns_to_keep.append(self.values_column) vector_data = vector_data[columns_to_keep] if self._rasterize_per_timestamp: vector_data[TIMESTAMP_COLUMN] = vector_data[TIMESTAMP_COLUMN].apply(parse_time) vector_data = vector_data[vector_data[TIMESTAMP_COLUMN].isin(timestamps)] if self.values_column is not None and self.values is not None: values = [self.values] if isinstance(self.values, (int, float)) else self.values vector_data = vector_data[vector_data[self.values_column].isin(values)] return vector_data def _vector_data_to_shape_iterator(self, vector_data: GeoDataFrame, join_per_value: bool) -> ShapeIterator: if self.values_column is None: value = cast(float, self.values) # cast is checked at init return zip(vector_data.geometry, it.repeat(value)) values = vector_data[self.values_column] if join_per_value: groups = {val: vector_data.geometry[values == val] for val in np.unique(values)} join_function = shapely.ops.unary_union if shapely.__version__ >= "1.8.0" else shapely.ops.cascaded_union return ((join_function(group), val) for val, group in groups.items()) return zip(vector_data.geometry, values) def _get_raster_shape(self, eopatch: EOPatch) -> tuple[int, int]: """Determines the shape of new raster feature, returns a pair (height, width)""" if isinstance(self.raster_shape, (tuple, list)) and len(self.raster_shape) == 2: if isinstance(self.raster_shape[0], int) and isinstance(self.raster_shape[1], int): return self.raster_shape ftype, fname = self.parse_feature(self.raster_shape, allowed_feature_types=lambda fty: fty.is_array()) return eopatch.get_spatial_dimension(ftype, fname) if self.raster_resolution: # parsing from strings is not denoted in types, so an explicit upcast is required raw_resolution: str | float | tuple[float, float] = self.raster_resolution resolution = float(raw_resolution.strip("m")) if isinstance(raw_resolution, str) else raw_resolution width, height = bbox_to_dimensions(cast(BBox, eopatch.bbox), resolution) # cast verified in execute return height, width raise ValueError("Could not determine shape of the raster image") def _get_raster(self, eopatch: EOPatch, height: int, width: int) -> np.ndarray: """Provides raster into which data will be written""" raster_shape = ( (len(eopatch.get_timestamps()), height, width) if self._rasterize_per_timestamp else (height, width) ) if self.write_to_existing and self.raster_feature in eopatch: raster = eopatch[self.raster_feature] expected_full_shape = (*raster_shape, 1) if raster.shape != expected_full_shape: msg = ( f"The existing raster feature {self.raster_feature} has a shape {raster.shape} but the expected" f" shape is {expected_full_shape}. This might cause errors or unexpected results." ) warnings.warn(msg, EORuntimeWarning) return raster.squeeze(axis=-1) return np.full(raster_shape, self.no_data_value, dtype=self.raster_dtype) def _get_rasterization_function(self, bbox: BBox, height: int, width: int) -> Callable: """Provides a function that rasterizes shapes into output raster and already contains all optional parameters""" affine_transform = rasterio.transform.from_bounds(*bbox, width=width, height=height) rasterize_params = dict(self.rasterio_params, transform=affine_transform) base_rasterize_func = rasterio.features.rasterize if self.overlap_value is None else self.rasterize_overlapped return partial(base_rasterize_func, **rasterize_params)
[docs] def rasterize_overlapped(self, shapes: ShapeIterator, out: np.ndarray, **rasterize_args: Any) -> None: """Rasterize overlapped classes. :param shapes: Shapes to be rasterized. :param out: A numpy array to which to rasterize polygon classes. :param rasterize_args: Keyword arguments to be passed to `rasterio.features.rasterize`. """ rasters = [rasterio.features.rasterize([shape], out=np.copy(out), **rasterize_args) for shape in shapes] overlap_mask = np.zeros(out.shape, dtype=bool) no_data = self.no_data_value out[:] = rasters[0][:] for raster in rasters[1:]: overlap_mask[(out != no_data) & (raster != no_data) & (raster != out)] = True out[raster != no_data] = raster[raster != no_data] out[overlap_mask] = self.overlap_value
[docs] def execute(self, eopatch: EOPatch) -> EOPatch: """Execute method :param eopatch: input EOPatch :return: New EOPatch with vector data transformed into a raster feature """ if eopatch.bbox is None: raise ValueError("EOPatch has to have a bounding box") height, width = self._get_raster_shape(eopatch) rasterize_func = self._get_rasterization_function(eopatch.bbox, height=height, width=width) vector_data_iterator = self._get_vector_data_iterator(eopatch, join_per_value=self.overlap_value is not None) raster = self._get_raster(eopatch, height, width) original_dtype = raster.dtype if original_dtype in self._RASTERIO_DTYPES_MAP: rasterio_dtype = self._RASTERIO_DTYPES_MAP[original_dtype] raster = raster.astype(rasterio_dtype) timestamps = eopatch.timestamps or [] timestamp_to_index = {timestamp: index for index, timestamp in enumerate(timestamps)} for timestamp, shape_iterator in vector_data_iterator: if timestamp is None: rasterize_func(shape_iterator, out=raster) else: time_index = timestamp_to_index[timestamp] rasterize_func(shape_iterator, out=raster[time_index, ...]) if original_dtype is not raster.dtype: raster = raster.astype(original_dtype) eopatch[self.raster_feature] = raster[..., np.newaxis] return eopatch
[docs]class RasterToVectorTask(EOTask): """Task for transforming raster mask feature into vector feature. Each connected component with the same value on the raster mask is turned into a shapely polygon. Polygon are returned as a geometry column in a ``geopandas.GeoDataFrame`` structure together with a column `VALUE` with values of each polygon. If raster mask feature has time component, vector feature will also have a column `TIMESTAMP` with timestamps to which raster image each polygon belongs to. If raster mask has multiple channels each of them will be vectorized separately but polygons will be in the same vector feature """ def __init__( self, features: FeaturesSpecification, *, values: list[int] | None = None, values_column: str = "VALUE", raster_dtype: None | np.dtype | type = None, **rasterio_params: Any, ): """ :param features: One or more raster mask features which will be vectorized together with an optional new name of vector feature. If no new name is given the same name will be used. Examples: - `features=(FeatureType.MASK, 'CLOUD_MASK', 'VECTOR_CLOUD_MASK')` - `features=[(FeatureType.MASK_TIMELESS, 'CLASSIFICATION'), (FeatureType.MASK, 'TEMPORAL_CLASSIFICATION')]` :param values: List of values which will be vectorized. By default, is set to ``None`` and all values will be vectorized :param values_column: Name of the column in vector feature where raster values will be written :param raster_dtype: If raster feature mask is of type which is not supported by ``rasterio.features.shapes`` (e.g. ``numpy.int64``) this parameter is used to cast the mask into a different type (``numpy.int16``, ``numpy.int32``, ``numpy.uint8``, ``numpy.uint16`` or ``numpy.float32``). By default, value of the parameter is ``None`` and no casting is done. :param: rasterio_params: Additional parameters to be passed to `rasterio.features.shapes`. Currently, available is parameter `connectivity`. """ self.feature_parser = self.get_feature_parser(features, allowed_feature_types=lambda fty: fty.is_discrete()) self.values = values self.values_column = values_column self.raster_dtype = raster_dtype self.rasterio_params = rasterio_params def _vectorize_single_raster( self, raster: np.ndarray, affine_transform: Affine, crs: CRS, timestamps: dt.datetime | None = None ) -> GeoDataFrame: """Vectorizes a data slice of a single time component :param raster: Numpy array or shape (height, width, channels) :param affine_transform: Object holding a transform vector (i.e. geographical location vector) of the raster :param crs: Coordinate reference system :param timestamp: Time of the data slice :return: Vectorized data """ mask = np.isin(raster, self.values) if self.values is not None else None geo_list = [] value_list = [] for idx in range(raster.shape[-1]): idx_mask = None if mask is None else mask[..., idx] for geojson, value in rasterio.features.shapes( raster[..., idx], mask=idx_mask, transform=affine_transform, **self.rasterio_params ): geo_list.append(shapely.geometry.shape(geojson)) value_list.append(value) series_dict = {self.values_column: pd.Series(value_list, dtype=self.raster_dtype)} if timestamps is not None: series_dict[TIMESTAMP_COLUMN] = pd.to_datetime([timestamps] * len(geo_list)) vector_data = GeoDataFrame(series_dict, geometry=geo_list, crs=crs.pyproj_crs()) if not vector_data.geometry.is_valid.all(): vector_data.geometry = vector_data.geometry.buffer(0) return vector_data
[docs] def execute(self, eopatch: EOPatch) -> EOPatch: """Execute function which adds new vector layer to the EOPatch :param eopatch: input EOPatch :return: New EOPatch with added vector layer """ if eopatch.bbox is None: raise ValueError("EOPatch has to have a bounding box") crs = eopatch.bbox.crs for raster_type, raster_name, vector_name in self.feature_parser.get_renamed_features(eopatch): vector_type = FeatureType.VECTOR_TIMELESS if raster_type.is_timeless() else FeatureType.VECTOR raster = eopatch[raster_type, raster_name] height, width = raster.shape[:2] if raster_type.is_timeless() else raster.shape[1:3] if self.raster_dtype: raster = raster.astype(self.raster_dtype) affine_transform = rasterio.transform.from_bounds(*eopatch.bbox, width=width, height=height) if raster_type.is_timeless(): eopatch[vector_type, vector_name] = self._vectorize_single_raster(raster, affine_transform, crs) else: timestamps = eopatch.get_timestamps() gpd_list = [ self._vectorize_single_raster(raster[idx, ...], affine_transform, crs, timestamps[idx]) for idx in range(raster.shape[0]) ] eopatch[vector_type, vector_name] = GeoDataFrame( pd.concat(gpd_list, ignore_index=True), crs=gpd_list[0].crs ) return eopatch
def _is_geopandas_object(data: object) -> bool: """A frequently used check if object is geopandas `GeoDataFrame` or `GeoSeries`""" return isinstance(data, (GeoDataFrame, GeoSeries)) def _vector_is_timeless(vector_input: GeoDataFrame | tuple[FeatureType, Any]) -> bool: """Used to check if the vector input (either geopandas object EOPatch Feature) is time independent""" if _is_geopandas_object(vector_input): return TIMESTAMP_COLUMN not in vector_input vector_type, _ = vector_input return vector_type.is_timeless()