Skip to content

raster

This module contains functions that process or create raster/image data.

reproject_raster #

reproject_raster(
    dataset_path: str | Path,
    target_crs: str | int,
    target_path: str | Path,
    logger: Logger = LOGGER,
) -> Path | None

Parameters:

Name Type Description Default
dataset_path str | Path

Path to the dataset to be reprojected.

required
target_crs str | int

EPSG code in string or int format. Can be given in the following ways: 5070 | "5070" | "EPSG:5070"

required
target_path str | Path

Path and filename for reprojected dataset.

required
logger Logger

Logger instance

LOGGER

Returns:

Source code in geospatial_tools/raster.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def reproject_raster(
    dataset_path: str | pathlib.Path,
    target_crs: str | int,
    target_path: str | pathlib.Path,
    logger: logging.Logger = LOGGER,
) -> pathlib.Path | None:
    """

    Args:
      dataset_path: Path to the dataset to be reprojected.
      target_crs: EPSG code in string or int format. Can be given in the following ways: 5070 | "5070" | "EPSG:5070"
      target_path: Path and filename for reprojected dataset.
      logger: Logger instance

    Returns:


    """
    if isinstance(dataset_path, str):
        dataset_path = pathlib.Path(dataset_path)

    if isinstance(target_path, str):
        target_path = pathlib.Path(target_path)

    target_crs = create_crs(target_crs, logger=logger)

    with rasterio.open(dataset_path) as source_dataset:
        transform, width, height = calculate_default_transform(
            source_dataset.crs, target_crs, source_dataset.width, source_dataset.height, *source_dataset.bounds
        )
        kwargs = source_dataset.meta.copy()
        kwargs.update({"crs": target_crs, "transform": transform, "width": width, "height": height})

        with rasterio.open(target_path, "w", **kwargs) as reprojected_dataset:
            for i in range(1, source_dataset.count + 1):
                reproject(
                    source=rasterio.band(source_dataset, i),
                    destination=rasterio.band(reprojected_dataset, i),
                    src_transform=source_dataset.transform,
                    src_crs=source_dataset.crs,
                    dst_transform=transform,
                    dst_crs=target_crs,
                    resampling=Resampling.nearest,
                )
    if target_path.exists():
        logger.info(f"Reprojected file created at {target_path}")
        return target_path
    logger.error(f"Failed to reproject file {dataset_path}")
    return None

clip_raster_with_polygon #

clip_raster_with_polygon(
    raster_image: Path | str,
    polygon_layer: Path | str | GeoDataFrame,
    base_output_filename: str | None = None,
    output_dir: str | Path = DATA_DIR,
    num_of_workers: int | None = None,
    logger: Logger = LOGGER,
) -> list[Path]

Parameters:

Name Type Description Default
raster_image Path | str

Path to raster image to be clipped.

required
polygon_layer Path | str | GeoDataFrame

Polygon layer which polygons will be used to clip the raster image.

required
base_output_filename str | None

Base filename for outputs. If None, will be taken from input polygon layer.

None
output_dir str | Path

Directory path where output will be written.

DATA_DIR
num_of_workers int | None

The number of processes to use for parallel execution. Defaults to cpu_count().

None
logger Logger

Logger instance

LOGGER

Returns:

Source code in geospatial_tools/raster.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def clip_raster_with_polygon(
    raster_image: pathlib.Path | str,
    polygon_layer: pathlib.Path | str | GeoDataFrame,
    base_output_filename: str | None = None,
    output_dir: str | pathlib.Path = DATA_DIR,
    num_of_workers: int | None = None,
    logger: logging.Logger = LOGGER,
) -> list[pathlib.Path]:
    """

    Args:
      raster_image: Path to raster image to be clipped.
      polygon_layer: Polygon layer which polygons will be used to clip the raster image.
      base_output_filename: Base filename for outputs. If `None`, will be taken from input polygon layer.
      output_dir: Directory path where output will be written.
      num_of_workers: The number of processes to use for parallel execution. Defaults to `cpu_count()`.
      logger: Logger instance

    Returns:


    """
    workers = cpu_count()
    if num_of_workers:
        workers = num_of_workers

    logger.info(f"Number of workers used: {workers}")
    logger.info(f"Output path : [{output_dir}]")

    if isinstance(raster_image, str):
        raster_image = pathlib.Path(raster_image)
    if not raster_image.exists():
        logger.error("Raster image does not exist")
        return []

    if not base_output_filename:
        base_output_filename = raster_image.stem

    if isinstance(output_dir, str):
        output_dir = pathlib.Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    gdf = polygon_layer
    if not isinstance(polygon_layer, GeoDataFrame):
        gdf = gpd.read_file(polygon_layer)

    polygons = gdf["geometry"]
    ids = gdf.index

    id_polygon_list = zip(ids, polygons, strict=False)
    logger.info(f"Clipping raster image with {len(polygons)} polygons")
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(
                _clip_process,
                raster_image=raster_image,
                id_polygon=polygon_tuple,
                base_output_filename=base_output_filename,
                output_dir=output_dir,
                logger=logger,
            )
            for polygon_tuple in id_polygon_list
        ]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
    path_list = []
    for result in results:
        if isinstance(result, tuple):
            logger.debug(f"Writing file successful : [{result}]")
            path_list.append(result[2])
    logger.info("Clipping process finished")
    return path_list

get_total_band_count #

get_total_band_count(
    raster_file_list: list[Path | str], logger: Logger = LOGGER
) -> int

Parameters:

Name Type Description Default
raster_file_list list[Path | str]

List of raster files to be processed.

required
logger Logger

Logger instance

LOGGER

Returns:

Source code in geospatial_tools/raster.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def get_total_band_count(raster_file_list: list[pathlib.Path | str], logger: logging.Logger = LOGGER) -> int:
    """

    Args:
      raster_file_list: List of raster files to be processed.
      logger: Logger instance

    Returns:


    """
    total_band_count = 0
    for raster in raster_file_list:
        with rasterio.open(raster, "r") as raster_image:
            total_band_count += raster_image.count
    logger.info(f"Calculated a total of [{total_band_count}] bands")
    return total_band_count

create_merged_raster_bands_metadata #

create_merged_raster_bands_metadata(
    raster_file_list: list[Path | str], logger: Logger = LOGGER
) -> dict

Parameters:

Name Type Description Default
raster_file_list list[Path | str]
required
logger Logger
LOGGER

Returns:

Source code in geospatial_tools/raster.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def create_merged_raster_bands_metadata(
    raster_file_list: list[pathlib.Path | str], logger: logging.Logger = LOGGER
) -> dict:
    """

    Args:
      raster_file_list:
      logger:

    Returns:


    """
    logger.info("Creating merged asset metadata")
    total_band_count = get_total_band_count(raster_file_list)
    with rasterio.open(raster_file_list[0]) as meta_source:
        meta = meta_source.meta
        meta.update(count=total_band_count)
    return meta

merge_raster_bands #

merge_raster_bands(
    raster_file_list: list[Path | str],
    merged_filename: Path | str,
    merged_band_names: list[str] = None,
    merged_metadata: dict = None,
    logger: Logger = LOGGER,
) -> Path | None

This function aims to combine multiple overlapping raster bands into a single raster image.

Example use case: I have 3 bands, B0, B1 and B2, each as an independent raster file (like is the case with downloaded STAC data.

While it can probably be used to create spatial time series, and not just combine bands from a single image product, it has not yet been tested for that specific purpose.

Parameters:

Name Type Description Default
raster_file_list list[Path | str]

List of raster files to be processed.

required
merged_filename Path | str

Name of output raster file.

required
merged_metadata dict

Dictionary of metadata to use if you prefer to great it independently.

None
merged_band_names list[str]

Names of final output raster bands. For example : I have 3 images representing each

None

a single band; raster_file_list = ["image01_B0.tif", "image01_B1.tif", "image01_B2.tif"]. With, merged_band_names, individual band id can be assigned for the final output raster; ["B0", "B1", "B2"]. logger: Logger instance

Returns:

Source code in geospatial_tools/raster.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def merge_raster_bands(
    raster_file_list: list[pathlib.Path | str],
    merged_filename: pathlib.Path | str,
    merged_band_names: list[str] = None,
    merged_metadata: dict = None,
    logger: logging.Logger = LOGGER,
) -> pathlib.Path | None:
    """
    This function aims to combine multiple overlapping raster bands into a single raster image.

    Example use case: I have 3 bands, B0, B1 and B2, each as an independent raster file (like is the case with
    downloaded STAC data.

    While it can probably be used to create spatial time series, and not just combine bands
    from a single image product, it has not yet been tested for that specific purpose.

    Args:
      raster_file_list: List of raster files to be processed.
      merged_filename: Name of output raster file.
      merged_metadata: Dictionary of metadata to use if you prefer to great it independently.
      merged_band_names: Names of final output raster bands. For example : I have 3 images representing each
    a single band; raster_file_list =  ["image01_B0.tif", "image01_B1.tif", "image01_B2.tif"].
    With, merged_band_names, individual band id can be assigned for the final output raster;
    ["B0", "B1", "B2"].
      logger: Logger instance

    Returns:
    """
    if not merged_metadata:
        merged_metadata = create_merged_raster_bands_metadata(raster_file_list)

    merged_image_index = 1
    band_names_index = 0

    logger.info(f"Merging asset [{merged_filename}] ...")
    # Create the final raster image in which all bands will be written to
    with rasterio.open(merged_filename, "w", **merged_metadata) as merged_asset_image:
        # Iterate through the raster file list to be merged
        for raster_file in raster_file_list:
            asset_name = pathlib.Path(raster_file).name
            logger.info(f"Writing band image: {asset_name}")
            with rasterio.open(raster_file) as source_image:
                num_of_bands = source_image.count

                # Iterate through each band of the raster file
                for source_image_band_index in range(1, num_of_bands + 1):
                    logger.info(
                        f"Writing asset sub item band {source_image_band_index} "
                        f"to merged index band {merged_image_index}"
                    )
                    # Write band to output merged_asset_image
                    merged_asset_image.write_band(merged_image_index, source_image.read(source_image_band_index))
                    _handle_band_metadata(
                        source_image=source_image,
                        source_image_band_index=source_image_band_index,
                        band_names_index=band_names_index,
                        merged_asset_image=merged_asset_image,
                        merged_band_names=merged_band_names,
                        merged_image_index=merged_image_index,
                    )
                    merged_image_index += 1
                band_names_index += 1

    if not merged_filename.exists():
        return None

    return merged_filename