Skip to content

geospatial_tools

auth #

This module contains authentication-related functions.

get_copernicus_credentials #

get_copernicus_credentials(logger: Logger = LOGGER) -> tuple[str, str] | None

Retrieves Copernicus credentials from environment variables or prompts the user.

This function first checks for COPERNICUS_USERNAME and COPERNICUS_PASSWORD environment variables. If they are not set, it interactively prompts the user for their username and password.

Using environment variables is recommended for security and to comply with the 12-factor app methodology, which separates configuration from code. This prevents hardcoding sensitive information and makes the application more portable across different environments (development, testing, production).

Parameters:

Name Type Description Default
logger Logger

Logger instance.

LOGGER

Returns:

Type Description
tuple[str, str] | None

A tuple containing the username and password, or None if they could not be

tuple[str, str] | None

obtained.

Source code in src/geospatial_tools/auth.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def get_copernicus_credentials(logger: logging.Logger = LOGGER) -> tuple[str, str] | None:
    """
    Retrieves Copernicus credentials from environment variables or prompts the user.

    This function first checks for `COPERNICUS_USERNAME` and `COPERNICUS_PASSWORD`
    environment variables. If they are not set, it interactively prompts the user
    for their username and password.

    Using environment variables is recommended for security and to comply with the
    12-factor app methodology, which separates configuration from code. This prevents
    hardcoding sensitive information and makes the application more portable across
    different environments (development, testing, production).

    Args:
        logger: Logger instance.

    Returns:
        A tuple containing the username and password, or None if they could not be
        obtained.
    """
    logger.info("Retrieving Copernicus credentials...")
    username = os.environ.get("COPERNICUS_USERNAME")
    password = os.environ.get("COPERNICUS_PASSWORD")

    if not username:
        logger.warning("COPERNICUS_USERNAME environment variable not set.")
        try:
            username = input("Enter your Copernicus username: ")
        except EOFError:
            logger.error("Could not read username from prompt.")
            return None

    if not password:
        logger.warning("COPERNICUS_PASSWORD environment variable not set.")
        try:
            password = getpass.getpass("Enter your Copernicus password: ")
        except EOFError:
            logger.error("Could not read password from prompt.")
            return None

    if not username or not password:
        logger.error("Username or password could not be obtained. Cannot proceed with authentication.")
        return None

    logger.info("Successfully retrieved Copernicus credentials.")
    return username, password

get_copernicus_token #

get_copernicus_token(logger: Logger = LOGGER) -> str | None

Retrieves an access token from the Copernicus Data Space Ecosystem.

This function uses the credentials obtained from get_copernicus_credentials to request an access token from the authentication endpoint.

Parameters:

Name Type Description Default
logger Logger

Logger instance.

LOGGER

Returns:

Type Description
str | None

The access token as a string, or None if authentication fails.

Source code in src/geospatial_tools/auth.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def get_copernicus_token(logger: logging.Logger = LOGGER) -> str | None:
    """
    Retrieves an access token from the Copernicus Data Space Ecosystem.

    This function uses the credentials obtained from `get_copernicus_credentials`
    to request an access token from the authentication endpoint.

    Args:
        logger: Logger instance.

    Returns:
        The access token as a string, or None if authentication fails.
    """
    credentials = get_copernicus_credentials(logger)
    if not credentials:
        return None

    username, password = credentials
    data = {
        "client_id": "cdse-public",
        "username": username,
        "password": password,
        "grant_type": "password",
    }

    try:
        response = requests.post(COPERNICUS_AUTH_URL, data=data, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        access_token = token_data.get("access_token")
        if access_token:
            logger.info("Successfully obtained Copernicus access token.")
            return access_token
        logger.error("Access token not found in response.")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to obtain access token: {e}")
        return None

copernicus #

Copernicus Data Space Ecosystem (CDSE) tools and constants.

CopernicusS2Band #

Bases: str, Enum

Copernicus Sentinel-2 Bands for Level-2A.

The value of each member corresponds to the asset key for the band. Base band names (e.g., 'B02') default to their native resolution. Explicit resolution members (e.g., 'B02_20m') are also provided.

base_name property #

base_name: str

Returns the base name of the band (e.g., 'B02').

native_res property #

native_res: int

Returns the native resolution of the band in meters.

Defaults to 10m if band base name is not recognized.

at_res #

at_res(resolution: int | CopernicusS2Resolution) -> str

Returns the asset key for this band at the specified resolution.

Parameters:

Name Type Description Default
resolution int | CopernicusS2Resolution

The resolution to get the key for (e.g., 20 or CopernicusS2Resolution.R20M).

required

Returns:

Type Description
str

The asset key string (e.g., 'B02_20m').

Source code in src/geospatial_tools/copernicus/sentinel_2.py
174
175
176
177
178
179
180
181
182
183
184
185
def at_res(self, resolution: int | CopernicusS2Resolution) -> str:
    """
    Returns the asset key for this band at the specified resolution.

    Args:
        resolution: The resolution to get the key for (e.g., 20 or CopernicusS2Resolution.R20M).

    Returns:
        The asset key string (e.g., 'B02_20m').
    """
    res_val = resolution.value if isinstance(resolution, CopernicusS2Resolution) else resolution
    return f"{self.base_name}_{res_val}m"

__str__ #

__str__() -> str

Returns the band name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
187
188
189
def __str__(self) -> str:
    """Returns the band name as a string."""
    return f"{self.value}"

__repr__ #

__repr__() -> str

Returns the band name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
191
192
193
def __repr__(self) -> str:
    """Returns the band name as a string."""
    return f"{self.value}"

CopernicusS2Collection #

Bases: str, Enum

Copernicus Sentinel-2 Collections.

__str__ #

__str__() -> str

Returns the collection name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
39
40
41
def __str__(self) -> str:
    """Returns the collection name as a string."""
    return f"{self.value}"

__repr__ #

__repr__() -> str

Returns the band name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
43
44
45
def __repr__(self) -> str:
    """Returns the band name as a string."""
    return f"{self.value}"

CopernicusS2Resolution #

Bases: int, Enum

Copernicus Sentinel-2 Resolutions in meters.

__str__ #

__str__() -> str

Returns the resolution as a string with 'm' suffix.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
55
56
57
def __str__(self) -> str:
    """Returns the resolution as a string with 'm' suffix."""
    return f"{self.value}m"

__repr__ #

__repr__() -> str

Returns the band name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
59
60
61
def __repr__(self) -> str:
    """Returns the band name as a string."""
    return f"{self.value}"

sentinel_2 #

This module contains Enums for Sentinel-2 on Copernicus Data Space Ecosystem (CDSE).

CopernicusS2Collection #

Bases: str, Enum

Copernicus Sentinel-2 Collections.

__str__ #
__str__() -> str

Returns the collection name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
39
40
41
def __str__(self) -> str:
    """Returns the collection name as a string."""
    return f"{self.value}"
__repr__ #
__repr__() -> str

Returns the band name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
43
44
45
def __repr__(self) -> str:
    """Returns the band name as a string."""
    return f"{self.value}"

CopernicusS2Resolution #

Bases: int, Enum

Copernicus Sentinel-2 Resolutions in meters.

__str__ #
__str__() -> str

Returns the resolution as a string with 'm' suffix.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
55
56
57
def __str__(self) -> str:
    """Returns the resolution as a string with 'm' suffix."""
    return f"{self.value}m"
__repr__ #
__repr__() -> str

Returns the band name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
59
60
61
def __repr__(self) -> str:
    """Returns the band name as a string."""
    return f"{self.value}"

CopernicusS2Band #

Bases: str, Enum

Copernicus Sentinel-2 Bands for Level-2A.

The value of each member corresponds to the asset key for the band. Base band names (e.g., 'B02') default to their native resolution. Explicit resolution members (e.g., 'B02_20m') are also provided.

base_name property #
base_name: str

Returns the base name of the band (e.g., 'B02').

native_res property #
native_res: int

Returns the native resolution of the band in meters.

Defaults to 10m if band base name is not recognized.

at_res #
at_res(resolution: int | CopernicusS2Resolution) -> str

Returns the asset key for this band at the specified resolution.

Parameters:

Name Type Description Default
resolution int | CopernicusS2Resolution

The resolution to get the key for (e.g., 20 or CopernicusS2Resolution.R20M).

required

Returns:

Type Description
str

The asset key string (e.g., 'B02_20m').

Source code in src/geospatial_tools/copernicus/sentinel_2.py
174
175
176
177
178
179
180
181
182
183
184
185
def at_res(self, resolution: int | CopernicusS2Resolution) -> str:
    """
    Returns the asset key for this band at the specified resolution.

    Args:
        resolution: The resolution to get the key for (e.g., 20 or CopernicusS2Resolution.R20M).

    Returns:
        The asset key string (e.g., 'B02_20m').
    """
    res_val = resolution.value if isinstance(resolution, CopernicusS2Resolution) else resolution
    return f"{self.base_name}_{res_val}m"
__str__ #
__str__() -> str

Returns the band name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
187
188
189
def __str__(self) -> str:
    """Returns the band name as a string."""
    return f"{self.value}"
__repr__ #
__repr__() -> str

Returns the band name as a string.

Source code in src/geospatial_tools/copernicus/sentinel_2.py
191
192
193
def __repr__(self) -> str:
    """Returns the band name as a string."""
    return f"{self.value}"

download #

download_usa_polygon #

download_usa_polygon(
    output_name: str = USA_POLYGON, output_directory: str | Path = DATA_DIR
) -> list[str | Path]

Download USA polygon file.

Parameters:

Name Type Description Default
output_name str

What name to give to downloaded file

USA_POLYGON
output_directory str | Path

Where to save the downloaded file

DATA_DIR

Returns:

Source code in src/geospatial_tools/download.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def download_usa_polygon(output_name: str = USA_POLYGON, output_directory: str | Path = DATA_DIR) -> list[str | Path]:
    """
    Download USA polygon file.

    Args:
      output_name: What name to give to downloaded file
      output_directory: Where to save the downloaded file

    Returns:
    """
    file_list = _download_from_link(
        target_download=USA_POLYGON, output_name=output_name, output_directory=output_directory
    )
    return file_list

download_s2_tiling_grid #

download_s2_tiling_grid(
    output_name: str = SENTINEL_2_TILLING_GRID, output_directory: str | Path = DATA_DIR
) -> list[str | Path]

" Download Sentinel 2 tiling grid file.

Parameters:

Name Type Description Default
output_name str

What name to give to downloaded file

SENTINEL_2_TILLING_GRID
output_directory str | Path

Where to save the downloaded file

DATA_DIR

Returns:

Source code in src/geospatial_tools/download.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def download_s2_tiling_grid(
    output_name: str = SENTINEL_2_TILLING_GRID, output_directory: str | Path = DATA_DIR
) -> list[str | Path]:
    """
    " Download Sentinel 2 tiling grid file.

    Args:
      output_name: What name to give to downloaded file
      output_directory: Where to save the downloaded file

    Returns:
    """
    file_list = _download_from_link(
        target_download=SENTINEL_2_TILLING_GRID, output_name=output_name, output_directory=output_directory
    )
    return file_list

geotools_types #

This module contains constants and functions pertaining to data types.

BBoxLike module-attribute #

BBoxLike = tuple[float, float, float, float]

BBox like tuple structure used for type checking.

IntersectsLike module-attribute #

IntersectsLike = (
    Point
    | Polygon
    | LineString
    | MultiPolygon
    | MultiPoint
    | MultiLineString
    | GeometryCollection
)

Intersect-like union of types used for type checking.

DateLike module-attribute #

DateLike = (
    datetime
    | str
    | None
    | tuple[datetime | str | None, datetime | str | None]
    | list[datetime | str | None]
    | Iterator[datetime | str | None]
)

Date-like union of types used for type checking.

planetary_computer #

sentinel_2 #

BestProductsForFeatures #

BestProductsForFeatures(
    sentinel2_tiling_grid: GeoDataFrame,
    sentinel2_tiling_grid_column: str,
    vector_features: GeoDataFrame,
    vector_features_column: str,
    date_ranges: list[str] | None = None,
    max_cloud_cover: int = 5,
    max_no_data_value: int = 5,
    logger: Logger = LOGGER,
)

Class made to facilitate and automate searching for Sentinel 2 products using the Sentinel 2 tiling grid as a reference.

Current limitation is that vector features used must fit, or be completely contained inside a single Sentinel 2 tiling grid.

For larger features, a mosaic of products will be necessary.

This class was conceived first and foremost to be used for numerous smaller vector features, like polygon grids created from geospatial_tools.vector.create_vector_grid

Parameters:

Name Type Description Default
sentinel2_tiling_grid GeoDataFrame

GeoDataFrame containing Sentinel 2 tiling grid

required
sentinel2_tiling_grid_column str

Name of the column in sentinel2_tiling_grid that contains the tile names (ex tile name: 10SDJ)

required
vector_features GeoDataFrame

GeoDataFrame containing the vector features for which the best Sentinel 2 products will be chosen for.

required
vector_features_column str

Name of the column in vector_features where the best Sentinel 2 products will be written to

required
date_ranges list[str] | None

Date range used to search for Sentinel 2 products. should be created using geospatial_tools.utils.create_date_range_for_specific_period separately, or BestProductsForFeatures.create_date_range after initialization.

None
max_cloud_cover int

Maximum cloud cover used to search for Sentinel 2 products.

5
logger Logger

Logger instance

LOGGER
Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    sentinel2_tiling_grid: GeoDataFrame,
    sentinel2_tiling_grid_column: str,
    vector_features: GeoDataFrame,
    vector_features_column: str,
    date_ranges: list[str] | None = None,
    max_cloud_cover: int = 5,
    max_no_data_value: int = 5,
    logger: logging.Logger = LOGGER,
) -> None:
    """

    Args:
        sentinel2_tiling_grid: GeoDataFrame containing Sentinel 2 tiling grid
        sentinel2_tiling_grid_column: Name of the column in `sentinel2_tiling_grid` that contains the tile names
            (ex tile name: 10SDJ)
        vector_features: GeoDataFrame containing the vector features for which the best Sentinel 2
            products will be chosen for.
        vector_features_column: Name of the column in `vector_features` where the best Sentinel 2 products
            will be written to
        date_ranges: Date range used to search for Sentinel 2 products. should be created using
            `geospatial_tools.utils.create_date_range_for_specific_period` separately,
            or `BestProductsForFeatures.create_date_range` after initialization.
        max_cloud_cover: Maximum cloud cover used to search for Sentinel 2 products.
        logger: Logger instance
    """
    self.logger = logger
    self.sentinel2_tiling_grid = sentinel2_tiling_grid
    self.sentinel2_tiling_grid_column = sentinel2_tiling_grid_column
    self.sentinel2_tile_list = sentinel2_tiling_grid["name"].to_list()
    self.vector_features = vector_features
    self.vector_features_column = vector_features_column
    self.vector_features_best_product_column = "best_s2_product_id"
    self.vector_features_with_products = None
    self._date_ranges = date_ranges
    self._max_cloud_cover = max_cloud_cover
    self.max_no_data_value = max_no_data_value
    self.successful_results: dict[Any, Any] = {}
    self.incomplete_results: list[Any] = []
    self.error_results: list[Any] = []
max_cloud_cover property writable #
max_cloud_cover

Max % of cloud cover used for Sentinel 2 product search.

date_ranges property writable #
date_ranges

Date range used to search for Sentinel 2 products.

create_date_ranges #
create_date_ranges(
    start_year: int, end_year: int, start_month: int, end_month: int
) -> list[str]

This function create a list of date ranges.

For example, I want to create date ranges for 2020 and 2021, but only for the months from March to May. I therefore expect to have 2 ranges: [2020-03-01 to 2020-05-30, 2021-03-01 to 2021-05-30].

Handles the automatic definition of the last day for the end month, as well as periods that cross over years

For example, I want to create date ranges for 2020 and 2022, but only for the months from November to January. I therefore expect to have 2 ranges: [2020-11-01 to 2021-01-31, 2021-11-01 to 2022-01-31].

Parameters:

Name Type Description Default
start_year int

Start year for ranges

required
end_year int

End year for ranges

required
start_month int

Starting month for each period

required
end_month int

End month for each period (inclusively)

required

Returns:

Type Description
list[str]

List of date ranges

Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def create_date_ranges(self, start_year: int, end_year: int, start_month: int, end_month: int) -> list[str]:
    """
    This function create a list of date ranges.

    For example, I want to create date ranges for 2020 and 2021, but only for the months from March to May.
    I therefore expect to have 2 ranges: [2020-03-01 to 2020-05-30, 2021-03-01 to 2021-05-30].

    Handles the automatic definition of the last day for the end month, as well as periods that cross over years

    For example, I want to create date ranges for 2020 and 2022, but only for the months from November to January.
    I therefore expect to have 2 ranges: [2020-11-01 to 2021-01-31, 2021-11-01 to 2022-01-31].

    Args:
      start_year: Start year for ranges
      end_year: End year for ranges
      start_month: Starting month for each period
      end_month: End month for each period (inclusively)

    Returns:
        List of date ranges
    """
    self.date_ranges = create_date_range_for_specific_period(
        start_year=start_year, end_year=end_year, start_month_range=start_month, end_month_range=end_month
    )
    return self.date_ranges
find_best_complete_products #
find_best_complete_products(
    max_cloud_cover: int | None = None, max_no_data_value: int = 5
) -> dict

Finds the best complete products for each Sentinel 2 tiles. This function will filter out all products that have more than 5% of nodata values.

Filtered out tiles will be stored in self.incomplete and tiles for which the search has found no results will be stored in self.error_list

Parameters:

Name Type Description Default
max_cloud_cover int | None

Max percentage of cloud cover allowed used for the search (Default value = None)

None
max_no_data_value int

Max percentage of no-data coverage by individual Sentinel 2 product (Default value = 5)

5

Returns:

Type Description
dict

Dictionary of product IDs and their corresponding Sentinel 2 tile names.

Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def find_best_complete_products(self, max_cloud_cover: int | None = None, max_no_data_value: int = 5) -> dict:
    """
    Finds the best complete products for each Sentinel 2 tiles. This function will filter out all products that have
    more than 5% of nodata values.

    Filtered out tiles will be stored in `self.incomplete` and tiles for which
    the search has found no results will be stored in `self.error_list`

    Args:
      max_cloud_cover: Max percentage of cloud cover allowed used for the search  (Default value = None)
      max_no_data_value: Max percentage of no-data coverage by individual Sentinel 2 product  (Default value = 5)

    Returns:
        Dictionary of product IDs and their corresponding Sentinel 2 tile names.
    """
    cloud_cover = self.max_cloud_cover
    if max_cloud_cover:
        cloud_cover = max_cloud_cover
    no_data_value = self.max_no_data_value
    if max_no_data_value:
        no_data_value = max_no_data_value

    tile_dict, incomplete_list, error_list = find_best_product_per_s2_tile(
        date_ranges=self.date_ranges,
        max_cloud_cover=cloud_cover,
        s2_tile_grid_list=self.sentinel2_tile_list,
        num_of_workers=4,
        max_no_data_value=no_data_value,
    )
    self.successful_results = tile_dict
    self.incomplete_results = incomplete_list
    if incomplete_list:
        self.logger.warning(
            "Warning, some of the input Sentinel 2 tiles do not have products covering the entire tile. "
            "These tiles will need to be handled differently (ex. creating a mosaic with multiple products"
        )
        self.logger.warning(f"Incomplete list: {incomplete_list}")
    self.error_results = error_list
    if error_list:
        self.logger.warning(
            "Warning, products for some Sentinel 2 tiles could not be found. "
            "Consider either extending date range input or max cloud cover"
        )
        self.logger.warning(f"Error list: {error_list}")
    return self.successful_results
select_best_products_per_feature #
select_best_products_per_feature() -> GeoDataFrame

Return a GeoDataFrame containing the best products for each Sentinel 2 tile.

Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def select_best_products_per_feature(self) -> GeoDataFrame:
    """Return a GeoDataFrame containing the best products for each Sentinel 2 tile."""
    spatial_join_results = spatial_join_within(
        polygon_features=self.sentinel2_tiling_grid,
        polygon_column=self.sentinel2_tiling_grid_column,
        vector_features=self.vector_features,
        vector_column_name=self.vector_features_column,
    )
    write_best_product_ids_to_dataframe(
        spatial_join_results=spatial_join_results,
        tile_dictionary=self.successful_results,
        best_product_column=self.vector_features_best_product_column,
        s2_tiles_column=self.vector_features_column,
    )
    self.vector_features_with_products = spatial_join_results
    return self.vector_features_with_products
to_file #
to_file(output_dir: str | Path) -> None

Parameters:

Name Type Description Default
output_dir str | Path

Output directory used to write to file

required
Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
199
200
201
202
203
204
205
206
207
208
209
210
211
def to_file(self, output_dir: str | pathlib.Path) -> None:
    """

    Args:
      output_dir: Output directory used to write to file
    """
    write_results_to_file(
        cloud_cover=self.max_cloud_cover,
        successful_results=self.successful_results,
        incomplete_results=self.incomplete_results,
        error_results=self.error_results,
        output_dir=output_dir,
    )
sentinel_2_complete_tile_search(
    tile_id: int,
    date_ranges: list[str],
    max_cloud_cover: int,
    max_no_data_value: int = 5,
) -> tuple[int, str, float | None, float | None] | None

Parameters:

Name Type Description Default
tile_id int
required
date_ranges list[str]
required
max_cloud_cover int
required
max_no_data_value int

(Default value = 5)

5

Returns:

Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def sentinel_2_complete_tile_search(
    tile_id: int,
    date_ranges: list[str],
    max_cloud_cover: int,
    max_no_data_value: int = 5,
) -> tuple[int, str, float | None, float | None] | None:
    """

    Args:
      tile_id:
      date_ranges:
      max_cloud_cover:
      max_no_data_value: (Default value = 5)

    Returns:


    """
    client = StacSearch(PLANETARY_COMPUTER)
    collection = "sentinel-2-l2a"
    tile_ids = [tile_id]
    query = {"eo:cloud_cover": {"lt": max_cloud_cover}, "s2:mgrs_tile": {"in": tile_ids}}
    sortby = [{"field": "properties.eo:cloud_cover", "direction": "asc"}]

    client.search_for_date_ranges(
        date_ranges=date_ranges, collections=collection, query=query, sortby=sortby, limit=100
    )
    try:
        sorted_items = client.sort_results_by_cloud_coverage()
        if not sorted_items:
            return tile_id, "error: No results found", None, None
        filtered_items = client.filter_no_data(
            property_name="s2:nodata_pixel_percentage", max_no_data_value=max_no_data_value
        )
        if not filtered_items:
            return tile_id, "incomplete: No results found that cover the entire tile", None, None
        optimal_result = filtered_items[0]
        if optimal_result:
            return (
                tile_id,
                optimal_result.id,
                optimal_result.properties["eo:cloud_cover"],
                optimal_result.properties["s2:nodata_pixel_percentage"],
            )

    except (IndexError, TypeError) as error:
        print(error)
        return tile_id, f"error: {error}", None, None
    return None

find_best_product_per_s2_tile #

find_best_product_per_s2_tile(
    date_ranges: list[str],
    max_cloud_cover: int,
    s2_tile_grid_list: list,
    max_no_data_value: int = 5,
    num_of_workers: int = 4,
)

Parameters:

Name Type Description Default
date_ranges list[str]
required
max_cloud_cover int
required
s2_tile_grid_list list
required
max_no_data_value int

(Default value = 5)

5
num_of_workers int

(Default value = 4)

4

Returns:

Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def find_best_product_per_s2_tile(
    date_ranges: list[str],
    max_cloud_cover: int,
    s2_tile_grid_list: list,
    max_no_data_value: int = 5,
    num_of_workers: int = 4,
):
    """

    Args:
      date_ranges:
      max_cloud_cover:
      s2_tile_grid_list:
      max_no_data_value:  (Default value = 5)
      num_of_workers: (Default value = 4)

    Returns:


    """
    successful_results: dict[Any, Any] = {}
    for tile in s2_tile_grid_list:
        successful_results[tile] = ""
    incomplete_results = []
    error_results = []
    with ThreadPoolExecutor(max_workers=num_of_workers) as executor:
        future_to_tile = {
            executor.submit(
                sentinel_2_complete_tile_search,
                tile_id=tile,
                date_ranges=date_ranges,
                max_cloud_cover=max_cloud_cover,
                max_no_data_value=max_no_data_value,
            ): tile
            for tile in s2_tile_grid_list
        }

        for future in as_completed(future_to_tile):
            result = future.result()
            if result is None:
                continue
            tile_id, optimal_result_id, result_cloud_cover, no_data = result
            if optimal_result_id.startswith("error:"):
                error_results.append(tile_id)
                continue
            if optimal_result_id.startswith("incomplete:"):
                incomplete_results.append(tile_id)
                continue
            successful_results[tile_id] = {
                "id": optimal_result_id,
                "cloud_cover": result_cloud_cover,
                "no_data": no_data,
            }
        cleaned_successful_results = {k: v for k, v in successful_results.items() if v != ""}
    return cleaned_successful_results, incomplete_results, error_results

write_best_product_ids_to_dataframe #

write_best_product_ids_to_dataframe(
    spatial_join_results: GeoDataFrame,
    tile_dictionary: dict,
    best_product_column: str = "best_s2_product_id",
    s2_tiles_column: str = "s2_tiles",
    logger: Logger = LOGGER,
) -> None

Parameters:

Name Type Description Default
spatial_join_results GeoDataFrame
required
tile_dictionary dict
required
best_product_column str
'best_s2_product_id'
s2_tiles_column str
's2_tiles'
logger Logger
LOGGER

Returns:

Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def write_best_product_ids_to_dataframe(
    spatial_join_results: GeoDataFrame,
    tile_dictionary: dict,
    best_product_column: str = "best_s2_product_id",
    s2_tiles_column: str = "s2_tiles",
    logger: logging.Logger = LOGGER,
) -> None:
    """

    Args:
      spatial_join_results:
      tile_dictionary:
      best_product_column:
      s2_tiles_column:
      logger:

    Returns:


    """
    logger.info("Writing best product IDs to dataframe")
    spatial_join_results[best_product_column] = spatial_join_results[s2_tiles_column].apply(
        lambda x: _get_best_product_id_for_each_grid_tile(s2_tile_search_results=tile_dictionary, feature_s2_tiles=x)
    )

write_results_to_file #

write_results_to_file(
    cloud_cover: int,
    successful_results: dict,
    incomplete_results: list | None = None,
    error_results: list | None = None,
    output_dir: str | Path = DATA_DIR,
    logger: Logger = LOGGER,
) -> dict

Parameters:

Name Type Description Default
cloud_cover int
required
successful_results dict
required
incomplete_results list | None
None
error_results list | None
None
output_dir str | Path
DATA_DIR
logger Logger
LOGGER

Returns:

Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def write_results_to_file(
    cloud_cover: int,
    successful_results: dict,
    incomplete_results: list | None = None,
    error_results: list | None = None,
    output_dir: str | pathlib.Path = DATA_DIR,
    logger: logging.Logger = LOGGER,
) -> dict:
    """

    Args:
      cloud_cover:
      successful_results:
      incomplete_results:
      error_results:
      output_dir:
      logger:

    Returns:


    """
    output_dir = pathlib.Path(output_dir)
    tile_filename = output_dir / f"data_lt{cloud_cover}cc.json"
    with open(tile_filename, "w", encoding="utf-8") as json_file:
        json.dump(successful_results, json_file, indent=4)
    logger.info(f"Results have been written to {tile_filename}")

    incomplete_filename: pathlib.Path | None = None
    if incomplete_results:
        incomplete_dict = {"incomplete": incomplete_results}
        incomplete_filename = output_dir / f"incomplete_lt{cloud_cover}cc.json"
        with open(incomplete_filename, "w", encoding="utf-8") as json_file:
            json.dump(incomplete_dict, json_file, indent=4)
        logger.info(f"Incomplete results have been written to {incomplete_filename}")

    error_filename: pathlib.Path | None = None
    if error_results:
        error_dict = {"errors": error_results}
        error_filename = output_dir / f"errors_lt{cloud_cover}cc.json"
        with open(error_filename, "w", encoding="utf-8") as json_file:
            json.dump(error_dict, json_file, indent=4)
        logger.info(f"Errors results have been written to {error_filename}")

    return {
        "tile_filename": tile_filename,
        "incomplete_filename": incomplete_filename,
        "errors_filename": error_filename,
    }

download_and_process_sentinel2_asset #

download_and_process_sentinel2_asset(
    product_id: str,
    product_bands: list[str],
    collections: str = "sentinel-2-l2a",
    target_projection: int | str | None = None,
    base_directory: str | Path = DATA_DIR,
    delete_intermediate_files: bool = False,
    logger: Logger = LOGGER,
) -> Asset

This function downloads a Sentinel 2 product based on the product ID provided.

It will download the individual asset bands provided in the bands argument, merge then all in a single tif and then reproject them to the input CRS.

Parameters:

Name Type Description Default
product_id str

ID of the Sentinel 2 product to be downloaded

required
product_bands list[str]

List of the product bands to be downloaded

required
collections str

Collections to be downloaded from. Defaults to sentinel-2-l2a

'sentinel-2-l2a'
target_projection int | str | None

The CRS project for the end product. If None, the reprojection step will be skipped

None
stac_client

StacSearch client to used. A new one will be created if not provided

required
base_directory str | Path

The base directory path where the downloaded files will be stored

DATA_DIR
delete_intermediate_files bool

Flag to determine if intermediate files should be deleted. Defaults to False

False
logger Logger

Logger instance

LOGGER

Returns:

Type Description
Asset

Asset instance

Source code in src/geospatial_tools/planetary_computer/sentinel_2.py
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
def download_and_process_sentinel2_asset(
    product_id: str,
    product_bands: list[str],
    collections: str = "sentinel-2-l2a",
    target_projection: int | str | None = None,
    base_directory: str | pathlib.Path = DATA_DIR,
    delete_intermediate_files: bool = False,
    logger: logging.Logger = LOGGER,
) -> Asset:
    """
    This function downloads a Sentinel 2 product based on the product ID provided.

    It will download the individual asset bands provided in the `bands` argument,
    merge then all in a single tif and then reproject them to the input CRS.

    Args:
      product_id: ID of the Sentinel 2 product to be downloaded
      product_bands: List of the product bands to be downloaded
      collections: Collections to be downloaded from. Defaults to `sentinel-2-l2a`
      target_projection: The CRS project for the end product. If `None`, the reprojection step will be
        skipped
      stac_client: StacSearch client to used. A new one will be created if not provided
      base_directory: The base directory path where the downloaded files will be stored
      delete_intermediate_files: Flag to determine if intermediate files should be deleted. Defaults to False
      logger: Logger instance

    Returns:
        Asset instance
    """
    base_file_name = f"{base_directory}/{product_id}"
    merged_file = f"{base_file_name}_merged.tif"
    reprojected_file = f"{base_file_name}_reprojected.tif"

    merged_file_exists = pathlib.Path(merged_file).exists()
    reprojected_file_exists = pathlib.Path(reprojected_file).exists()

    if reprojected_file_exists:
        logger.info(f"Reprojected file [{reprojected_file}] already exists")
        asset = Asset(asset_id=product_id, bands=product_bands, reprojected_asset=reprojected_file)
        return asset

    if merged_file_exists:
        logger.info(f"Merged file [{merged_file}] already exists")
        asset = Asset(asset_id=product_id, bands=product_bands, merged_asset_path=merged_file)
        if target_projection:
            logger.info(f"Reprojecting merged file [{merged_file}]")
            asset.reproject_merged_asset(
                base_directory=base_directory,
                target_projection=target_projection,
                delete_merged_asset=delete_intermediate_files,
            )
        return asset

    stac_client = StacSearch(catalog_name=PLANETARY_COMPUTER)
    items = stac_client.search(collections=collections, ids=[product_id])
    logger.info(items)
    asset_list = stac_client.download_search_results(bands=product_bands, base_directory=base_directory)
    logger.info(asset_list)
    asset = asset_list[0]
    asset.merge_asset(base_directory=base_directory, delete_sub_items=delete_intermediate_files)
    if not target_projection:
        logger.info("Skipping reprojection")
        return asset
    if target_projection:
        asset.reproject_merged_asset(
            target_projection=target_projection,
            base_directory=base_directory,
            delete_merged_asset=delete_intermediate_files,
        )
    return asset

radar #

nimrod #

extract_nimrod_from_archive #

extract_nimrod_from_archive(
    archive_file_path: str | Path, output_directory: str | Path | None = None
) -> Path

Extract nimrod data from an archive file. If no output directory is provided, the extracted data will be saved to the archive file's directory.

Parameters:

Name Type Description Default
archive_file_path str | Path

Path to the archive file

required
output_directory str | Path | None

Optional output directory.

None

Returns:

Type Description
Path

Path to the extracted nimrod data file

Source code in src/geospatial_tools/radar/nimrod.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def extract_nimrod_from_archive(archive_file_path: str | Path, output_directory: str | Path | None = None) -> Path:
    """
    Extract nimrod data from an archive file. If no output directory is provided, the extracted data will be saved to
    the archive file's directory.

    Args:
        archive_file_path: Path to the archive file
        output_directory: Optional output directory.

    Returns:
            Path to the extracted nimrod data file
    """
    if isinstance(archive_file_path, str):
        archive_file_path = Path(archive_file_path)
    full_path = archive_file_path.resolve()
    parent_folder = archive_file_path.parent
    filename = archive_file_path.stem

    target_folder = None
    if output_directory:
        if isinstance(output_directory, str):
            output_directory = Path(output_directory)
        target_folder = output_directory

    if not target_folder:
        target_folder = parent_folder / filename

    target_folder.mkdir(parents=True, exist_ok=True)
    gzip_file_headers = parse_gzip_header(archive_file_path)
    contained_filename = gzip_file_headers["original_name"]

    with gzip.open(full_path, "rb") as f_in:
        if not contained_filename:
            contained_filename = filename
        print(f"Filename {contained_filename}")
        out_path = target_folder / contained_filename

        with open(out_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)

    return out_path

load_nimrod_cubes #

load_nimrod_cubes(filenames: list[str | Path]) -> Generator[Cube | Any, Any]

Parameters:

Name Type Description Default
filenames list[str | Path]

List of nimrod files

required

Returns:

Type Description
Generator[Cube | Any, Any]

Generator of cubes

Source code in src/geospatial_tools/radar/nimrod.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def load_nimrod_cubes(filenames: list[str | Path]) -> Generator[Cube | Any, Any]:
    """

    Args:
        filenames: List of nimrod files

    Returns:
        Generator of cubes

    """
    # Ensure filenames are strings, as iris load_cubes might expect strings
    filenames = [str(f) for f in filenames]
    cubes = load_cubes(filenames)
    return cubes

load_nimrod_from_archive #

load_nimrod_from_archive(filename: str | Path) -> Generator[Cube | Any, Any]

Parameters:

Name Type Description Default
filename str | Path

Path to the archive file

required

Returns:

Type Description
Generator[Cube | Any, Any]

Generator of cubes

Source code in src/geospatial_tools/radar/nimrod.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def load_nimrod_from_archive(filename: str | Path) -> Generator[Cube | Any, Any]:
    """

    Args:
        filename: Path to the archive file

    Returns:
        Generator of cubes

    """
    nimrod_extracted_file = extract_nimrod_from_archive(filename)
    # The extraction returns a single file path. We load that file.
    cubes = load_nimrod_cubes([nimrod_extracted_file])
    return cubes

merge_nimrod_cubes #

merge_nimrod_cubes(cubes: list[Cube]) -> Cube

Parameters:

Name Type Description Default
cubes list[Cube]

List of cubes to merge

required

Returns:

Type Description
Cube

Merged cube

Source code in src/geospatial_tools/radar/nimrod.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def merge_nimrod_cubes(cubes: list[Cube]) -> Cube:
    """

    Args:
        cubes: List of cubes to merge

    Returns:
        Merged cube
    """
    cubes = CubeList(cubes)
    merged_cubes = cubes.merge_cube()
    return merged_cubes

mean_nimrod_cubes #

mean_nimrod_cubes(merged_cubes: Cube) -> Cube

Parameters:

Name Type Description Default
merged_cubes Cube

Merged cube

required

Returns:

Type Description
Cube

Mean cube

Source code in src/geospatial_tools/radar/nimrod.py
109
110
111
112
113
114
115
116
117
118
119
def mean_nimrod_cubes(merged_cubes: Cube) -> Cube:
    """

    Args:
        merged_cubes: Merged cube

    Returns:
        Mean cube
    """
    mean_cube = merged_cubes.collapsed("time", MEAN)
    return mean_cube

write_cube_to_file #

write_cube_to_file(cube: Cube, output_name: str | Path) -> None

Save a nimrod cube to a Netcdf file.

Parameters:

Name Type Description Default
cube Cube

Cube to save

required
output_name str | Path

Output filename

required
Source code in src/geospatial_tools/radar/nimrod.py
122
123
124
125
126
127
128
129
130
def write_cube_to_file(cube: Cube, output_name: str | Path) -> None:
    """
    Save a nimrod cube to a Netcdf file.

    Args:
        cube: Cube to save
        output_name: Output filename
    """
    netcdf.save(cube, output_name)

assert_dataset_time_dim_is_valid #

assert_dataset_time_dim_is_valid(
    dataset: Dataset, time_dimension_name: str = "time"
) -> None
Ths function checks that the time dimension of a given dataset
  • Is composed of 5-minute time bins - Which is the native Nimrod format
  • Contains a continuous time series, without any holes - which would lead to false statistics when resampling

Parameters:

Name Type Description Default
dataset Dataset

Merged nimrod cube

required
time_dimension_name str

Name of the time dimension

'time'

Returns:

Type Description
None

Bool value indicating if the time bins are 5 minutes long and if there are no

None

gaps in the time series

Source code in src/geospatial_tools/radar/nimrod.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def assert_dataset_time_dim_is_valid(dataset: xr.Dataset, time_dimension_name: str = "time") -> None:
    """
    Ths function checks that the time dimension of a given dataset :
        - Is composed of 5-minute time bins - Which is the native Nimrod format
        - Contains a continuous time series, without any holes - which would lead to false statistics when resampling

    Args:
        dataset: Merged nimrod cube
        time_dimension_name: Name of the time dimension

    Returns:
        Bool value indicating if the time bins are 5 minutes long and if there are no
        gaps in the time series
    """
    dataset_time_dimension = dataset[time_dimension_name]
    if not dataset_time_dimension.to_index().is_monotonic_increasing:
        raise AssertionError("Time is not sorted ascending")
    if not dataset_time_dimension.to_index().is_unique:
        duplicates = dataset_time_dimension.to_index()[dataset_time_dimension.to_index().duplicated(keep=False)]
        raise AssertionError(f"Duplicate timestamps present: {duplicates[:10]} ...")

    difference_between_timesteps = dataset_time_dimension.diff(time_dimension_name)
    if (difference_between_timesteps != FIVE_MIN).any():
        larger_time_gaps = np.nonzero((difference_between_timesteps != FIVE_MIN).compute().to_numpy())[0][:5]
        raise AssertionError(
            f"Non-5min gaps at positions {larger_time_gaps} "
            f"(examples: {difference_between_timesteps.isel({time_dimension_name: larger_time_gaps}).to_numpy()})"
        )

    start = pd.Timestamp(dataset_time_dimension.to_numpy()[0])
    end = pd.Timestamp(dataset_time_dimension.to_numpy()[-1])
    expected_index = pd.date_range(start=start, end=end, freq="5min", inclusive="both")
    dataset_index = dataset_time_dimension.to_index()
    missing_indexes = expected_index.difference(dataset_index)
    if len(missing_indexes) > 0:
        raise AssertionError(f"missing {len(missing_indexes)} stamps; first few: {missing_indexes[:10]}")

resample_nimrod_timebox_30min_bins #

resample_nimrod_timebox_30min_bins(
    filenames: list[str | Path], output_name: str | Path
) -> str | Path

This will resample nimrod data's bins to 30-minute interval instead of their normal 5-minute interval. It uses a mean resampling, and creates time bins like follows :

ex. [[09h00, < 9h05], [09h05, < 9h10], ... ] -> [[09h00, < 9h30], [09h30, < 10h], ... ]

Parameters:

Name Type Description Default
filenames list[str | Path]

List of netcdf nimrod files

required
output_name str | Path

Output filename

required

Returns:

Type Description
str | Path

Path to the output file

Source code in src/geospatial_tools/radar/nimrod.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def resample_nimrod_timebox_30min_bins(filenames: list[str | Path], output_name: str | Path) -> str | Path:
    """
    This will resample nimrod data's bins to 30-minute interval instead of their
    normal 5-minute interval. It uses a mean resampling, and creates time bins like
    follows :

    ex. [[09h00, < 9h05], [09h05, < 9h10], ... ] -> [[09h00, < 9h30], [09h30, < 10h], ... ]

    Args:
        filenames: List of netcdf nimrod files
        output_name: Output filename

    Returns:
        Path to the output file
    """
    ds = xr.open_mfdataset(filenames, combine="nested", concat_dim="time")
    ds_30min = ds.resample(time="30min").mean()
    ds_30min.to_netcdf(output_name)
    return output_name

raster #

This module contains functions that process or create raster/image data.

reproject_raster #

reproject_raster(
    dataset_path: str | Path,
    target_crs: str | int,
    target_path: str | Path,
    logger: Logger = LOGGER,
) -> Path | None

Parameters:

Name Type Description Default
dataset_path str | Path

Path to the dataset to be reprojected.

required
target_crs str | int

EPSG code in string or int format. Can be given in the following ways: 5070 | "5070" | "EPSG:5070"

required
target_path str | Path

Path and filename for reprojected dataset.

required
logger Logger

Logger instance

LOGGER

Returns:

Source code in src/geospatial_tools/raster.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def reproject_raster(
    dataset_path: str | pathlib.Path,
    target_crs: str | int,
    target_path: str | pathlib.Path,
    logger: logging.Logger = LOGGER,
) -> pathlib.Path | None:
    """

    Args:
      dataset_path: Path to the dataset to be reprojected.
      target_crs: EPSG code in string or int format. Can be given in the following ways: 5070 | "5070" | "EPSG:5070"
      target_path: Path and filename for reprojected dataset.
      logger: Logger instance

    Returns:


    """
    if isinstance(dataset_path, str):
        dataset_path = pathlib.Path(dataset_path)

    if isinstance(target_path, str):
        target_path = pathlib.Path(target_path)

    target_crs = create_crs(target_crs, logger=logger)

    with rasterio.open(dataset_path) as source_dataset:
        transform, width, height = calculate_default_transform(
            source_dataset.crs, target_crs, source_dataset.width, source_dataset.height, *source_dataset.bounds
        )
        kwargs = source_dataset.meta.copy()
        kwargs.update({"crs": target_crs, "transform": transform, "width": width, "height": height})

        with rasterio.open(target_path, "w", **kwargs) as reprojected_dataset:
            for i in range(1, source_dataset.count + 1):
                reproject(
                    source=rasterio.band(source_dataset, i),
                    destination=rasterio.band(reprojected_dataset, i),
                    src_transform=source_dataset.transform,
                    src_crs=source_dataset.crs,
                    dst_transform=transform,
                    dst_crs=target_crs,
                    resampling=Resampling.nearest,
                )
    if target_path.exists():
        logger.info(f"Reprojected file created at {target_path}")
        return target_path
    logger.error(f"Failed to reproject file {dataset_path}")
    return None

clip_raster_with_polygon #

clip_raster_with_polygon(
    raster_image: Path | str,
    polygon_layer: Path | str | GeoDataFrame,
    base_output_filename: str | None = None,
    output_dir: str | Path = DATA_DIR,
    num_of_workers: int | None = None,
    logger: Logger = LOGGER,
) -> list[Path]

Parameters:

Name Type Description Default
raster_image Path | str

Path to raster image to be clipped.

required
polygon_layer Path | str | GeoDataFrame

Polygon layer which polygons will be used to clip the raster image.

required
base_output_filename str | None

Base filename for outputs. If None, will be taken from input polygon layer.

None
output_dir str | Path

Directory path where output will be written.

DATA_DIR
num_of_workers int | None

The number of processes to use for parallel execution. If using on a compute cluster, please set a specific amount (ex. 1 per CPU core requested). Defaults to cpu_count().

None
logger Logger

Logger instance

LOGGER

Returns:

Source code in src/geospatial_tools/raster.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def clip_raster_with_polygon(
    raster_image: pathlib.Path | str,
    polygon_layer: pathlib.Path | str | GeoDataFrame,
    base_output_filename: str | None = None,
    output_dir: str | pathlib.Path = DATA_DIR,
    num_of_workers: int | None = None,
    logger: logging.Logger = LOGGER,
) -> list[pathlib.Path]:
    """

    Args:
      raster_image: Path to raster image to be clipped.
      polygon_layer: Polygon layer which polygons will be used to clip the raster image.
      base_output_filename: Base filename for outputs. If `None`, will be taken from
        input polygon layer.
      output_dir: Directory path where output will be written.
      num_of_workers: The number of processes to use for parallel execution. If using
        on a compute cluster, please set a specific amount (ex. 1 per CPU core requested).
        Defaults to `cpu_count()`.
      logger: Logger instance

    Returns:


    """
    workers = cpu_count()
    if num_of_workers:
        workers = num_of_workers

    logger.info(f"Number of workers used: {workers}")
    logger.info(f"Output path : [{output_dir}]")

    if isinstance(raster_image, str):
        raster_image = pathlib.Path(raster_image)
    if not raster_image.exists():
        logger.error("Raster image does not exist")
        return []

    if not base_output_filename:
        base_output_filename = raster_image.stem

    if isinstance(output_dir, str):
        output_dir = pathlib.Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    gdf = polygon_layer
    if not isinstance(polygon_layer, GeoDataFrame):
        gdf = gpd.read_file(polygon_layer)

    assert isinstance(gdf, GeoDataFrame)
    polygons = gdf["geometry"]
    _ = gdf.index

    id_polygon_list = zip(gdf.index.tolist(), polygons, strict=False)
    logger.info(f"Clipping raster image with {len(polygons)} polygons")
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(
                _clip_process,
                raster_image=raster_image,
                id_polygon=polygon_tuple,
                base_output_filename=base_output_filename,
                output_dir=output_dir,
                logger=logger,
            )
            for polygon_tuple in id_polygon_list
        ]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
    path_list = []
    for result in results:
        if isinstance(result, tuple):
            logger.debug(f"Writing file successful : [{result}]")
            path_list.append(result[2])
    logger.info("Clipping process finished")
    return path_list

get_total_band_count #

get_total_band_count(
    raster_file_list: Sequence[Path | str], logger: Logger = LOGGER
) -> int

Parameters:

Name Type Description Default
raster_file_list Sequence[Path | str]

List of raster files to be processed.

required
logger Logger

Logger instance

LOGGER

Returns:

Source code in src/geospatial_tools/raster.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def get_total_band_count(raster_file_list: Sequence[pathlib.Path | str], logger: logging.Logger = LOGGER) -> int:
    """

    Args:
      raster_file_list: List of raster files to be processed.
      logger: Logger instance

    Returns:


    """
    total_band_count = 0
    for raster in raster_file_list:
        with rasterio.open(raster, "r") as raster_image:
            total_band_count += raster_image.count
    logger.info(f"Calculated a total of [{total_band_count}] bands")
    return total_band_count

create_merged_raster_bands_metadata #

create_merged_raster_bands_metadata(
    raster_file_list: Sequence[Path | str], logger: Logger = LOGGER
) -> dict

Parameters:

Name Type Description Default
raster_file_list Sequence[Path | str]
required
logger Logger
LOGGER

Returns:

Source code in src/geospatial_tools/raster.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def create_merged_raster_bands_metadata(
    raster_file_list: Sequence[pathlib.Path | str], logger: logging.Logger = LOGGER
) -> dict:
    """

    Args:
      raster_file_list:
      logger:

    Returns:


    """
    logger.info("Creating merged asset metadata")
    total_band_count = get_total_band_count(raster_file_list)
    with rasterio.open(raster_file_list[0]) as meta_source:
        meta = meta_source.meta
        meta.update(count=total_band_count)
    return meta

merge_raster_bands #

merge_raster_bands(
    raster_file_list: Sequence[Path | str],
    merged_filename: Path | str,
    merged_band_names: list[str] | None = None,
    merged_metadata: dict | None = None,
    logger: Logger = LOGGER,
) -> Path | None

This function aims to combine multiple overlapping raster bands into a single raster image.

Example use case: I have 3 bands, B0, B1 and B2, each as an independent raster file (like is the case with downloaded STAC data.

While it can probably be used to create spatial time series, and not just combine bands from a single image product, it has not yet been tested for that specific purpose.

Parameters:

Name Type Description Default
raster_file_list Sequence[Path | str]

List of raster files to be processed.

required
merged_filename Path | str

Name of output raster file.

required
merged_metadata dict | None

Dictionary of metadata to use if you prefer to great it independently.

None
merged_band_names list[str] | None

Names of final output raster bands. For example : I have 3 images representing each a single band; raster_file_list = ["image01_B0.tif", "image01_B1.tif", "image01_B2.tif"]. With, merged_band_names, individual band id can be assigned for the final output raster; ["B0", "B1", "B2"].

None
logger Logger

Logger instance

LOGGER

Returns:

Type Description
Path | None

Path to merged raster

Source code in src/geospatial_tools/raster.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def merge_raster_bands(
    raster_file_list: Sequence[pathlib.Path | str],
    merged_filename: pathlib.Path | str,
    merged_band_names: list[str] | None = None,
    merged_metadata: dict | None = None,
    logger: logging.Logger = LOGGER,
) -> pathlib.Path | None:
    """
    This function aims to combine multiple overlapping raster bands into a single raster image.

    Example use case: I have 3 bands, B0, B1 and B2, each as an independent raster file (like is the case with
    downloaded STAC data.

    While it can probably be used to create spatial time series, and not just combine bands
    from a single image product, it has not yet been tested for that specific purpose.

    Args:
      raster_file_list: List of raster files to be processed.
      merged_filename: Name of output raster file.
      merged_metadata: Dictionary of metadata to use if you prefer to great it independently.
      merged_band_names: Names of final output raster bands. For example : I have 3 images representing each
        a single band; raster_file_list =  ["image01_B0.tif", "image01_B1.tif", "image01_B2.tif"].
        With, merged_band_names, individual band id can be assigned for the final output raster;
        ["B0", "B1", "B2"].
      logger: Logger instance

    Returns:
        Path to merged raster
    """
    if not merged_metadata:
        merged_metadata = create_merged_raster_bands_metadata(raster_file_list)

    merged_image_index = 1
    band_names_index = 0

    logger.info(f"Merging asset [{merged_filename}] ...")
    # Create the final raster image in which all bands will be written to
    with rasterio.open(merged_filename, "w", **merged_metadata) as merged_asset_image:
        # Iterate through the raster file list to be merged
        for raster_file in raster_file_list:
            asset_name = pathlib.Path(raster_file).name
            logger.info(f"Writing band image: {asset_name}")
            with rasterio.open(raster_file) as source_image:
                num_of_bands = source_image.count

                # Iterate through each band of the raster file
                for source_image_band_index in range(1, num_of_bands + 1):
                    logger.info(
                        f"Writing asset sub item band {source_image_band_index} "
                        f"to merged index band {merged_image_index}"
                    )
                    # Write band to output merged_asset_image
                    merged_asset_image.write_band(merged_image_index, source_image.read(source_image_band_index))
                    _handle_band_metadata(
                        source_image=source_image,
                        source_image_band_index=source_image_band_index,
                        band_names_index=band_names_index,
                        merged_asset_image=merged_asset_image,
                        merged_band_names=merged_band_names,
                        merged_image_index=merged_image_index,
                    )
                    merged_image_index += 1
                band_names_index += 1

    if not merged_filename.exists():
        return None

    if isinstance(merged_filename, str):
        return pathlib.Path(merged_filename)
    return merged_filename

s3_utils #

Utility module for S3 operations related to Copernicus Data Space Ecosystem.

get_s3_client #

get_s3_client(endpoint_url: str | None = None) -> client

Creates and returns a boto3 S3 client.

Parameters:

Name Type Description Default
endpoint_url str | None

The S3 endpoint URL. If None, it attempts to use the COPERNICUS_S3_ENDPOINT environment variable.

None

Returns:

Type Description
client

A boto3 S3 client.

Source code in src/geospatial_tools/s3_utils.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def get_s3_client(endpoint_url: str | None = None) -> boto3.client:
    """
    Creates and returns a boto3 S3 client.

    Args:
        endpoint_url: The S3 endpoint URL. If None, it attempts to use
                      the COPERNICUS_S3_ENDPOINT environment variable.

    Returns:
        A boto3 S3 client.
    """
    if not endpoint_url:
        endpoint_url = os.environ.get("COPERNICUS_S3_ENDPOINT", "https://eodata.dataspace.copernicus.eu")

    access_key = os.environ.get("AWS_ACCESS_KEY_ID") or None
    secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY") or None

    if not access_key or not secret_key:
        LOGGER.warning("AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY not found in environment.")

    LOGGER.info(f"Creating S3 client with endpoint: [{endpoint_url}]")

    # Note: boto3 automatically picks up AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
    # from the environment, but we can also pass them explicitly if needed.
    return boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

parse_s3_url #

parse_s3_url(url: str) -> tuple[str, str]

Parses an S3 URL or a CDSE STAC href to extract the bucket and key.

Expected formats: - s3://bucket/key - https://eodata.dataspace.copernicus.eu/bucket/key - https://zipper.dataspace.copernicus.eu/download/uuid (this might not be a direct S3 key)

Parameters:

Name Type Description Default
url str

The URL to parse.

required

Returns:

Type Description
tuple[str, str]

A tuple of (bucket, key).

Raises:

Type Description
ValueError

If the URL cannot be parsed into a bucket and key.

Source code in src/geospatial_tools/s3_utils.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def parse_s3_url(url: str) -> tuple[str, str]:
    """
    Parses an S3 URL or a CDSE STAC href to extract the bucket and key.

    Expected formats:
    - s3://bucket/key
    - https://eodata.dataspace.copernicus.eu/bucket/key
    - https://zipper.dataspace.copernicus.eu/download/uuid (this might not be a direct S3 key)

    Args:
        url: The URL to parse.

    Returns:
        A tuple of (bucket, key).

    Raises:
        ValueError: If the URL cannot be parsed into a bucket and key.
    """
    parsed = urlparse(url)

    if parsed.scheme == "s3":
        bucket = parsed.netloc
        key = parsed.path.lstrip("/")
        return bucket, key

    if parsed.scheme in ["http", "https"]:
        # For CDSE eodata endpoint, the path starts with the bucket
        # e.g., /Sentinel-2/MSI/L2A/2023/01/01/...
        path_parts = parsed.path.lstrip("/").split("/")
        if len(path_parts) < 2:
            raise ValueError(f"URL path does not contain enough parts to determine bucket and key: {url}")

        bucket = path_parts[0]
        key = "/".join(path_parts[1:])
        return bucket, key

    raise ValueError(f"Unsupported URL scheme: {parsed.scheme} in URL: {url}")

stac #

This module contains functions that are related to STAC API.

AssetSubItem #

AssetSubItem(asset: Item, item_id: str, band: str, filename: str | Path)

Class that represent a STAC asset sub item.

Generally represents a single satellite image band.

Initializes an AssetSubItem.

Parameters:

Name Type Description Default
asset Item

The pystac Item this asset belongs to.

required
item_id str

The ID of the item.

required
band str

The band name of this sub-item.

required
filename str | Path

The local filename of the downloaded asset.

required
Source code in src/geospatial_tools/stac.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def __init__(self, asset: pystac.Item, item_id: str, band: str, filename: str | Path) -> None:
    """
    Initializes an AssetSubItem.

    Args:
        asset: The pystac Item this asset belongs to.
        item_id: The ID of the item.
        band: The band name of this sub-item.
        filename: The local filename of the downloaded asset.
    """
    if isinstance(filename, str):
        filename = Path(filename)
    self.asset = asset
    self.item_id: str = item_id
    self.band: str = band
    self.filename: Path = filename

Asset #

Asset(
    asset_id: str,
    bands: list[str] | None = None,
    asset_item_list: list[AssetSubItem] | None = None,
    merged_asset_path: str | Path | None = None,
    reprojected_asset: str | Path | None = None,
    logger: Logger = LOGGER,
)

Represents a STAC asset, potentially composed of multiple bands/sub-items.

Initializes an Asset object.

Parameters:

Name Type Description Default
asset_id str

Unique ID for the asset (usually the item ID).

required
bands list[str] | None

List of bands this asset contains.

None
asset_item_list list[AssetSubItem] | None

List of AssetSubItem objects belonging to this asset.

None
merged_asset_path str | Path | None

Path to the merged multi-band raster file.

None
reprojected_asset str | Path | None

Path to the reprojected raster file.

None
logger Logger

Logger instance.

LOGGER
Source code in src/geospatial_tools/stac.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def __init__(
    self,
    asset_id: str,
    bands: list[str] | None = None,
    asset_item_list: list[AssetSubItem] | None = None,
    merged_asset_path: str | Path | None = None,
    reprojected_asset: str | Path | None = None,
    logger: logging.Logger = LOGGER,
) -> None:
    """
    Initializes an Asset object.

    Args:
        asset_id: Unique ID for the asset (usually the item ID).
        bands: List of bands this asset contains.
        asset_item_list: List of AssetSubItem objects belonging to this asset.
        merged_asset_path: Path to the merged multi-band raster file.
        reprojected_asset: Path to the reprojected raster file.
        logger: Logger instance.
    """
    self.asset_id = asset_id
    self.bands = bands
    self.merged_asset_path = Path(merged_asset_path) if isinstance(merged_asset_path, str) else merged_asset_path
    self.reprojected_asset_path = (
        Path(reprojected_asset) if isinstance(reprojected_asset, str) else reprojected_asset
    )
    self.logger = logger

    self._sub_items: list[AssetSubItem] = asset_item_list or []

__iter__ #

__iter__() -> Iterator[AssetSubItem]

Allows direct iteration: for item in asset:

Source code in src/geospatial_tools/stac.py
193
194
195
def __iter__(self) -> Iterator[AssetSubItem]:
    """Allows direct iteration: `for item in asset:`"""
    return iter(self._sub_items)

__len__ #

__len__() -> int

Allows checking size: len(asset)

Source code in src/geospatial_tools/stac.py
197
198
199
def __len__(self) -> int:
    """Allows checking size: `len(asset)`"""
    return len(self._sub_items)

__contains__ #

__contains__(band_name: str) -> bool

Allows checking for band existence: "B04" in asset

Source code in src/geospatial_tools/stac.py
201
202
203
def __contains__(self, band_name: str) -> bool:
    """Allows checking for band existence: `"B04" in asset`"""
    return any(item.band == band_name for item in self._sub_items)

__getitem__ #

__getitem__(index: int) -> AssetSubItem
__getitem__(band_name: str) -> AssetSubItem
__getitem__(key: int | str) -> AssetSubItem

Allows indexing by position or band name: asset[0] or asset["B04"]

Source code in src/geospatial_tools/stac.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def __getitem__(self, key: int | str) -> AssetSubItem:
    """
    Allows indexing by position or band name:
    `asset[0]` or `asset["B04"]`
    """
    if isinstance(key, int):
        return self._sub_items[key]

    if isinstance(key, str):
        for item in self._sub_items:
            if item.band == key:
                return item
        raise KeyError(f"Band '{key}' not found in asset '{self.asset_id}'.")

    raise TypeError(f"Invalid argument type: {type(key)}. Expected int or str.")

add_asset_item #

add_asset_item(asset: AssetSubItem) -> None

Adds an AssetSubItem to the asset.

Parameters:

Name Type Description Default
asset AssetSubItem

The AssetSubItem to add.

required
Source code in src/geospatial_tools/stac.py
227
228
229
230
231
232
233
234
235
236
def add_asset_item(self, asset: AssetSubItem) -> None:
    """
    Adds an AssetSubItem to the asset.

    Args:
      asset: The AssetSubItem to add.
    """
    self._sub_items.append(asset)
    if self.bands is not None and asset.band not in self.bands:
        self.bands.append(asset.band)

show_asset_items #

show_asset_items() -> None

Show items that belong to this asset.

Source code in src/geospatial_tools/stac.py
238
239
240
241
242
243
def show_asset_items(self) -> None:
    """Show items that belong to this asset."""
    asset_list = [
        f"ID: [{item.item_id}], Band: [{item.band}], filename: [{item.filename}]" for item in self._sub_items
    ]
    self.logger.info(f"Asset list for asset [{self.asset_id}] :\n\t{asset_list}")

merge_asset #

merge_asset(
    base_directory: str | Path | None = None, delete_sub_items: bool = False
) -> Path | None

Merges individual band rasters into a single multi-band raster file.

Parameters:

Name Type Description Default
base_directory str | Path | None

Directory where the merged file will be saved.

None
delete_sub_items bool

If True, delete individual band files after merging.

False

Returns:

Type Description
Path | None

The Path to the merged file if successful, else None.

Source code in src/geospatial_tools/stac.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def merge_asset(self, base_directory: str | Path | None = None, delete_sub_items: bool = False) -> Path | None:
    """
    Merges individual band rasters into a single multi-band raster file.

    Args:
      base_directory: Directory where the merged file will be saved.
      delete_sub_items: If True, delete individual band files after merging.

    Returns:
        The Path to the merged file if successful, else None.
    """
    if not base_directory:
        base_directory = Path()
    if isinstance(base_directory, str):
        base_directory = Path(base_directory)

    merged_filename = base_directory / f"{self.asset_id}_merged.tif"

    if not self._sub_items:
        self.logger.error(f"No asset items to merge for asset [{self.asset_id}]")
        return None

    asset_filename_list = [asset.filename for asset in self._sub_items]

    meta = self._create_merged_asset_metadata()

    merge_raster_bands(
        merged_filename=merged_filename,
        raster_file_list=asset_filename_list,
        merged_metadata=meta,
        merged_band_names=self.bands,
    )

    if merged_filename.exists():
        self.logger.info(f"Asset [{self.asset_id}] merged successfully")
        self.logger.info(f"Asset location : [{merged_filename}]")
        self.merged_asset_path = merged_filename
        if delete_sub_items:
            self.delete_asset_sub_items()
        return merged_filename
    self.logger.error(f"There was a problem merging asset [{self.asset_id}]")
    return None

reproject_merged_asset #

reproject_merged_asset(
    target_projection: str | int,
    base_directory: str | Path | None = None,
    delete_merged_asset: bool = False,
) -> Path | None

Reprojects the merged multi-band raster to a target projection.

Parameters:

Name Type Description Default
target_projection str | int

The target CRS (EPSG code or string).

required
base_directory str | Path | None

Directory where the reprojected file will be saved.

None
delete_merged_asset bool

If True, delete the merged file after reprojection.

False

Returns:

Type Description
Path | None

The Path to the reprojected file if successful, else None.

Source code in src/geospatial_tools/stac.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def reproject_merged_asset(
    self,
    target_projection: str | int,
    base_directory: str | Path | None = None,
    delete_merged_asset: bool = False,
) -> Path | None:
    """
    Reprojects the merged multi-band raster to a target projection.

    Args:
      target_projection: The target CRS (EPSG code or string).
      base_directory: Directory where the reprojected file will be saved.
      delete_merged_asset: If True, delete the merged file after reprojection.

    Returns:
        The Path to the reprojected file if successful, else None.
    """
    if not base_directory:
        base_directory = Path()
    if isinstance(base_directory, str):
        base_directory = Path(base_directory)
    target_path = base_directory / f"{self.asset_id}_reprojected.tif"
    self.logger.info(f"Reprojecting asset [{self.asset_id}] ...")

    if not self.merged_asset_path:
        self.logger.error(f"Merged asset path is missing for asset [{self.asset_id}]")
        return None

    reprojected_filename = reproject_raster(
        dataset_path=self.merged_asset_path,
        target_path=target_path,
        target_crs=target_projection,
        logger=self.logger,
    )
    if reprojected_filename and reprojected_filename.exists():
        self.logger.info(f"Asset location : [{reprojected_filename}]")
        self.reprojected_asset_path = reprojected_filename
        if delete_merged_asset:
            self.delete_merged_asset()
        return reprojected_filename
    self.logger.error(f"There was a problem reprojecting asset [{self.asset_id}]")
    return None

delete_asset_sub_items #

delete_asset_sub_items() -> None

Delete all asset sub items that belong to this asset.

Source code in src/geospatial_tools/stac.py
331
332
333
334
335
336
def delete_asset_sub_items(self) -> None:
    """Delete all asset sub items that belong to this asset."""
    self.logger.info(f"Deleting asset sub items from asset [{self.asset_id}]")
    for item in self._sub_items:
        self.logger.info(f"Deleting [{item.filename}] ...")
        item.filename.unlink(missing_ok=True)

delete_merged_asset #

delete_merged_asset() -> None

Delete merged asset.

Source code in src/geospatial_tools/stac.py
338
339
340
341
342
def delete_merged_asset(self) -> None:
    """Delete merged asset."""
    if self.merged_asset_path:
        self.logger.info(f"Deleting merged asset file for [{self.merged_asset_path}]")
        self.merged_asset_path.unlink(missing_ok=True)

delete_reprojected_asset #

delete_reprojected_asset() -> None

Delete reprojected asset.

Source code in src/geospatial_tools/stac.py
344
345
346
347
348
def delete_reprojected_asset(self) -> None:
    """Delete reprojected asset."""
    if self.reprojected_asset_path:
        self.logger.info(f"Deleting reprojected asset file for [{self.reprojected_asset_path}]")
        self.reprojected_asset_path.unlink(missing_ok=True)

StacSearch #

StacSearch(catalog_name: str, logger: Logger = LOGGER)

Utility class to help facilitate and automate STAC API searches through the use of pystac_client.Client.

Initializes a StacSearch instance.

Parameters:

Name Type Description Default
catalog_name str

Name of the STAC catalog (e.g., 'planetary_computer', 'copernicus').

required
logger Logger

Logger instance.

LOGGER
Source code in src/geospatial_tools/stac.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
def __init__(self, catalog_name: str, logger: logging.Logger = LOGGER) -> None:
    """
    Initializes a StacSearch instance.

    Args:
        catalog_name: Name of the STAC catalog (e.g., 'planetary_computer', 'copernicus').
        logger: Logger instance.
    """
    self.catalog_name = catalog_name
    self.catalog: pystac_client.Client | None = catalog_generator(catalog_name=catalog_name)
    self.search_results: list[pystac.Item] | None = None
    self.cloud_cover_sorted_results: list[pystac.Item] | None = None
    self.filtered_results: list[pystac.Item] | None = None
    self.downloaded_search_assets: list[Asset] | None = None
    self.downloaded_cloud_cover_sorted_assets: list[Asset] | None = None
    self.downloaded_best_sorted_asset: Asset | None = None
    self.logger = logger
    self.s3_client: Any | None = None
    if catalog_name == COPERNICUS:
        self.s3_client = s3_utils.get_s3_client()

search #

search(
    date_range: DateLike = None,
    max_items: int | None = None,
    limit: int | None = None,
    ids: list[str] | None = None,
    collections: str | list[str] | None = None,
    bbox: BBoxLike | None = None,
    intersects: IntersectsLike | None = None,
    query: dict[str, Any] | None = None,
    sortby: list[dict[str, str]] | str | list[str] | None = None,
    max_retries: int = 3,
    delay: int = 5,
) -> list[Item]

STAC API search that will use search query and parameters. Essentially a wrapper on pystac_client.Client.

Parameter descriptions taken from pystac docs.

Parameters:

Name Type Description Default
date_range DateLike

Either a single datetime or datetime range used to filter results. You may express a single datetime using a :class:datetime.datetime instance, a RFC 3339-compliant <https://tools.ietf.org/html/rfc3339>__ timestamp, or a simple date string (see below). Instances of :class:datetime.datetime may be either timezone aware or unaware. Timezone aware instances will be converted to a UTC timestamp before being passed to the endpoint. Timezone unaware instances are assumed to represent UTC timestamps. You may represent a datetime range using a "/" separated string as described in the spec, or a list, tuple, or iterator of 2 timestamps or datetime instances. For open-ended ranges, use either ".." ('2020-01-01:00:00:00Z/..', ['2020-01-01:00:00:00Z', '..']) or a value of None (['2020-01-01:00:00:00Z', None]). If using a simple date string, the datetime can be specified in YYYY-mm-dd format, optionally truncating to YYYY-mm or just YYYY. Simple date strings will be expanded to include the entire time period, for example: 2017 expands to 2017-01-01T00:00:00Z/2017-12-31T23:59:59Z and 2017-06 expands to 2017-06-01T00:00:00Z/2017-06-30T23:59:59Z If used in a range, the end of the range expands to the end of that day/month/year, for example: 2017-06-10/2017-06-11 expands to 2017-06-10T00:00:00Z/2017-06-11T23:59:59Z (Default value = None)

None
max_items int | None

The maximum number of items to return from the search, even if there are more matching results.

None
limit int | None

A recommendation to the service as to the number of items to return per page of results.

None
ids list[str] | None

List of one or more Item ids to filter on.

None
collections str | list[str] | None

List of one or more Collection IDs or pystac. Collection instances. Only Items in one of the provided Collections will be searched

None
bbox BBoxLike | None

A list, tuple, or iterator representing a bounding box of 2D or 3D coordinates. Results will be filtered to only those intersecting the bounding box.

None
intersects IntersectsLike | None

A string or dictionary representing a GeoJSON geometry, or an object that implements a geo_interface property, as supported by several libraries including Shapely, ArcPy, PySAL, and geojson. Results filtered to only those intersecting the geometry.

None
query dict[str, Any] | None

List or JSON of query parameters as per the STAC API query extension.

None
sortby list[dict[str, str]] | str | list[str] | None

A single field or list of fields to sort the response by

None
max_retries int
3
delay int
5

Returns:

Type Description
list[Item]

A list of pystac.Item objects matching the search criteria.

Source code in src/geospatial_tools/stac.py
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
def search(
    self,
    date_range: DateLike = None,
    max_items: int | None = None,
    limit: int | None = None,
    ids: list[str] | None = None,
    collections: str | list[str] | None = None,
    bbox: geotools_types.BBoxLike | None = None,
    intersects: geotools_types.IntersectsLike | None = None,
    query: dict[str, Any] | None = None,
    sortby: list[dict[str, str]] | str | list[str] | None = None,
    max_retries: int = 3,
    delay: int = 5,
) -> list[pystac.Item]:
    """
    STAC API search that will use search query and parameters. Essentially a wrapper on `pystac_client.Client`.

    Parameter descriptions taken from pystac docs.

    Args:
      date_range: Either a single datetime or datetime range used to filter results.
            You may express a single datetime using a :class:`datetime.datetime`
            instance, a `RFC 3339-compliant <https://tools.ietf.org/html/rfc3339>`__
            timestamp, or a simple date string (see below). Instances of
            :class:`datetime.datetime` may be either
            timezone aware or unaware. Timezone aware instances will be converted to
            a UTC timestamp before being passed
            to the endpoint. Timezone unaware instances are assumed to represent UTC
            timestamps. You may represent a
            datetime range using a ``"/"`` separated string as described in the
            spec, or a list, tuple, or iterator
            of 2 timestamps or datetime instances. For open-ended ranges, use either
            ``".."`` (``'2020-01-01:00:00:00Z/..'``,
            ``['2020-01-01:00:00:00Z', '..']``) or a value of ``None``
            (``['2020-01-01:00:00:00Z', None]``).
            If using a simple date string, the datetime can be specified in
            ``YYYY-mm-dd`` format, optionally truncating
            to ``YYYY-mm`` or just ``YYYY``. Simple date strings will be expanded to
            include the entire time period, for example: ``2017`` expands to
            ``2017-01-01T00:00:00Z/2017-12-31T23:59:59Z`` and ``2017-06`` expands
            to ``2017-06-01T00:00:00Z/2017-06-30T23:59:59Z``
            If used in a range, the end of the range expands to the end of that
            day/month/year, for example: ``2017-06-10/2017-06-11`` expands to
              ``2017-06-10T00:00:00Z/2017-06-11T23:59:59Z`` (Default value = None)
      max_items: The maximum number of items to return from the search, even if there are
        more matching results.
      limit: A recommendation to the service as to the number of items to return per
        page of results.
      ids: List of one or more Item ids to filter on.
      collections: List of one or more Collection IDs or pystac. Collection instances. Only Items in one of the
        provided Collections will be searched
      bbox: A list, tuple, or iterator representing a bounding box of 2D or 3D coordinates. Results will be filtered
        to only those intersecting the bounding box.
      intersects: A string or dictionary representing a GeoJSON geometry, or an object that implements a
        __geo_interface__ property, as supported by several libraries including Shapely, ArcPy, PySAL, and geojson.
        Results filtered to only those intersecting the geometry.
      query: List or JSON of query parameters as per the STAC API query extension.
      sortby: A single field or list of fields to sort the response by
      max_retries:
      delay:

    Returns:
        A list of pystac.Item objects matching the search criteria.
    """
    if isinstance(collections, str):
        collections = [collections]
    if isinstance(sortby, dict):
        sortby = [sortby]

    if not self.catalog:
        self.logger.error("STAC client is not initialized.")
        return []

    intro_log = "Initiating STAC API search"
    if query:
        intro_log = f"{intro_log} \n\tQuery : [{query}]"
    self.logger.info(intro_log)
    items: list[pystac.Item] = []
    for attempt in range(1, max_retries + 1):
        try:
            items = self._base_catalog_search(
                date_range=date_range,
                max_items=max_items,
                limit=limit,
                ids=ids,
                collections=collections,
                bbox=bbox,
                intersects=intersects,
                query=query,
                sortby=sortby,
            )
            break
        except APIError as e:  # pylint: disable=W0718
            self.logger.error(f"Attempt {attempt} failed: {e}")
            if attempt < max_retries:
                time.sleep(delay)
            else:
                raise e

    self.search_results = items
    return items

search_for_date_ranges #

search_for_date_ranges(
    date_ranges: Sequence[DateLike],
    max_items: int | None = None,
    limit: int | None = None,
    collections: str | list[str] | None = None,
    bbox: BBoxLike | None = None,
    intersects: IntersectsLike | None = None,
    query: dict[str, Any] | None = None,
    sortby: list[dict[str, str]] | str | list[str] | None = None,
    max_retries: int = 3,
    delay: int = 5,
) -> list[Item]

STAC API search that will use search query and parameters for each date range in given list of date_ranges.

Date ranges can be generated with the help of the geospatial_tools.utils.create_date_range_for_specific_period function for more complex ranges.

Parameters:

Name Type Description Default
date_ranges Sequence[DateLike]

List containing datetime date ranges

required
max_items int | None

The maximum number of items to return from the search, even if there are more matching results

None
limit int | None

A recommendation to the service as to the number of items to return per page of results.

None
collections str | list[str] | None

List of one or more Collection IDs or pystac. Collection instances. Only Items in one of the provided Collections will be searched

None
bbox BBoxLike | None

A list, tuple, or iterator representing a bounding box of 2D or 3D coordinates. Results will be filtered to only those intersecting the bounding box.

None
intersects IntersectsLike | None

A string or dictionary representing a GeoJSON geometry, or an object that implements a geo_interface property, as supported by several libraries including Shapely, ArcPy, PySAL, and geojson. Results filtered to only those intersecting the geometry.

None
query dict[str, Any] | None

List or JSON of query parameters as per the STAC API query extension.

None
sortby list[dict[str, str]] | str | list[str] | None

A single field or list of fields to sort the response by

None
max_retries int
3
delay int
5

Returns:

Type Description
list[Item]

A list of pystac.Item objects.

Source code in src/geospatial_tools/stac.py
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
def search_for_date_ranges(
    self,
    date_ranges: Sequence[DateLike],
    max_items: int | None = None,
    limit: int | None = None,
    collections: str | list[str] | None = None,
    bbox: geotools_types.BBoxLike | None = None,
    intersects: geotools_types.IntersectsLike | None = None,
    query: dict[str, Any] | None = None,
    sortby: list[dict[str, str]] | str | list[str] | None = None,
    max_retries: int = 3,
    delay: int = 5,
) -> list[pystac.Item]:
    """
    STAC API search that will use search query and parameters for each date range in given list of `date_ranges`.

    Date ranges can be generated with the help of the `geospatial_tools.utils.create_date_range_for_specific_period`
    function for more complex ranges.

    Args:
      date_ranges: List containing datetime date ranges
      max_items: The maximum number of items to return from the search, even if there are more matching results
      limit: A recommendation to the service as to the number of items to return per page of results.
      collections: List of one or more Collection IDs or pystac. Collection instances. Only Items in one of the
        provided Collections will be searched
      bbox: A list, tuple, or iterator representing a bounding box of 2D or 3D coordinates. Results will be
        filtered to only those intersecting the bounding box.
      intersects: A string or dictionary representing a GeoJSON geometry, or an object that implements
        a __geo_interface__ property, as supported by several libraries including Shapely, ArcPy, PySAL, and
        geojson. Results filtered to only those intersecting the geometry.
      query: List or JSON of query parameters as per the STAC API query extension.
      sortby: A single field or list of fields to sort the response by
      max_retries:
      delay:

    Returns:
        A list of pystac.Item objects.
    """
    results: list[pystac.Item] = []
    if isinstance(collections, str):
        collections = [collections]
    if isinstance(sortby, dict):
        sortby = [sortby]

    if not self.catalog:
        self.logger.error("STAC client is not initialized.")
        return []

    intro_log = f"Running STAC API search for the following parameters: \n\tDate ranges : {date_ranges}"
    if query:
        intro_log = f"{intro_log} \n\tQuery : {query}"
    self.logger.info(intro_log)

    for attempt in range(1, max_retries + 1):
        try:
            for date_range in date_ranges:
                items = self._base_catalog_search(
                    date_range=date_range,
                    max_items=max_items,
                    limit=limit,
                    collections=collections,
                    bbox=bbox,
                    intersects=intersects,
                    query=query,
                    sortby=sortby,
                )
                results.extend(items)
            break
        except APIError as e:  # pylint: disable=W0718
            self.logger.error(f"Attempt {attempt} failed: {e}")
            if attempt < max_retries:
                time.sleep(delay)
            else:
                raise e

    if not results:
        self.logger.warning(f"Search for date ranges [{date_ranges}] found no results!")
        self.search_results = None

    self.search_results = results
    return results

sort_results_by_cloud_coverage #

sort_results_by_cloud_coverage() -> list[Item] | None

Sorts the search results by cloud coverage (ascending).

Returns:

Type Description
list[Item] | None

A list of sorted pystac.Item objects, or None if no results exist.

Source code in src/geospatial_tools/stac.py
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
def sort_results_by_cloud_coverage(self) -> list[pystac.Item] | None:
    """
    Sorts the search results by cloud coverage (ascending).

    Returns:
        A list of sorted pystac.Item objects, or None if no results exist.
    """
    if self.search_results:
        self.logger.debug("Sorting results by cloud cover (from least to most)")
        self.cloud_cover_sorted_results = sorted(
            self.search_results, key=lambda item: item.properties.get("eo:cloud_cover", float("inf"))
        )
        return self.cloud_cover_sorted_results
    self.logger.warning("No results found: please run a search before trying to sort results")
    return None

filter_no_data #

filter_no_data(property_name: str, max_no_data_value: int = 5) -> list[Item] | None

Filter results that are above a nodata value threshold.

Parameters:

Name Type Description Default
property_name str

Name of the property containing nodata percentage.

required
max_no_data_value int

Max allowed percentage of nodata. (Default value = 5)

5

Returns:

Type Description
list[Item] | None

Filtered list of pystac.Item objects.

Source code in src/geospatial_tools/stac.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
def filter_no_data(self, property_name: str, max_no_data_value: int = 5) -> list[pystac.Item] | None:
    """
    Filter results that are above a nodata value threshold.

    Args:
      property_name: Name of the property containing nodata percentage.
      max_no_data_value: Max allowed percentage of nodata. (Default value = 5)

    Returns:
        Filtered list of pystac.Item objects.
    """
    sorted_results = self.cloud_cover_sorted_results
    if not sorted_results:
        sorted_results = self.sort_results_by_cloud_coverage()
    if not sorted_results:
        return None

    filtered_results = []
    for item in sorted_results:
        if item.properties.get(property_name, 0) < max_no_data_value:
            filtered_results.append(item)
    self.filtered_results = filtered_results

    return filtered_results

download_search_results #

download_search_results(bands: list[str], base_directory: str | Path) -> list[Asset]

Downloads assets for all search results.

Parameters:

Name Type Description Default
bands list[str]

List of bands to download.

required
base_directory str | Path

The base directory for downloads.

required

Returns:

Type Description
list[Asset]

A list of Asset objects for the downloaded search results.

Source code in src/geospatial_tools/stac.py
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
def download_search_results(self, bands: list[str], base_directory: str | Path) -> list[Asset]:
    """
    Downloads assets for all search results.

    Args:
        bands: List of bands to download.
        base_directory: The base directory for downloads.

    Returns:
        A list of Asset objects for the downloaded search results.
    """
    downloaded_search_results = self._download_results(
        results=self.search_results, bands=bands, base_directory=base_directory
    )
    self.downloaded_search_assets = downloaded_search_results
    return downloaded_search_results

download_sorted_by_cloud_cover_search_results #

download_sorted_by_cloud_cover_search_results(
    bands: list[str],
    base_directory: str | Path,
    first_x_num_of_items: int | None = None,
) -> list[Asset]

Downloads sorted results.

Parameters:

Name Type Description Default
bands list[str]

List of bands to download.

required
base_directory str | Path

The base directory for downloads.

required
first_x_num_of_items int | None

Optional number of top items to download.

None

Returns:

Type Description
list[Asset]

A list of Asset objects for the downloaded items.

Source code in src/geospatial_tools/stac.py
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
def download_sorted_by_cloud_cover_search_results(
    self, bands: list[str], base_directory: str | Path, first_x_num_of_items: int | None = None
) -> list[Asset]:
    """
    Downloads sorted results.

    Args:
        bands: List of bands to download.
        base_directory: The base directory for downloads.
        first_x_num_of_items: Optional number of top items to download.

    Returns:
        A list of Asset objects for the downloaded items.
    """
    results = self._generate_best_results()
    if not results:
        return []
    if first_x_num_of_items:
        results = results[:first_x_num_of_items]
    downloaded_search_results = self._download_results(results=results, bands=bands, base_directory=base_directory)
    self.downloaded_cloud_cover_sorted_assets = downloaded_search_results
    return downloaded_search_results

download_best_cloud_cover_result #

download_best_cloud_cover_result(
    bands: list[str], base_directory: str | Path
) -> Asset | None

Downloads the single best result based on cloud cover.

Parameters:

Name Type Description Default
bands list[str]

List of bands to download.

required
base_directory str | Path

The base directory for downloads.

required

Returns:

Type Description
Asset | None

The Asset object for the best result, or None if no results available.

Source code in src/geospatial_tools/stac.py
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
def download_best_cloud_cover_result(self, bands: list[str], base_directory: str | Path) -> Asset | None:
    """
    Downloads the single best result based on cloud cover.

    Args:
        bands: List of bands to download.
        base_directory: The base directory for downloads.

    Returns:
        The Asset object for the best result, or None if no results available.
    """
    results = self._generate_best_results()
    if not results:
        return None
    best_result = [results[0]]

    if self.downloaded_cloud_cover_sorted_assets:
        self.logger.info(f"Asset [{best_result[0].id}] is already downloaded")
        self.downloaded_best_sorted_asset = self.downloaded_cloud_cover_sorted_assets[0]
        return self.downloaded_cloud_cover_sorted_assets[0]

    downloaded_search_results = self._download_results(
        results=best_result, bands=bands, base_directory=base_directory
    )
    if downloaded_search_results:
        self.downloaded_best_sorted_asset = downloaded_search_results[0]
        return downloaded_search_results[0]
    return None

create_planetary_computer_catalog #

create_planetary_computer_catalog(
    max_retries: int = 3, delay: int = 5, logger: Logger = LOGGER
) -> Client | None

Creates a Planetary Computer Catalog Client.

Parameters:

Name Type Description Default
max_retries int

The maximum number of retries for the API connection. (Default value = 3)

3
delay int

The delay between retry attempts in seconds. (Default value = 5)

5
logger Logger

The logger instance to use. (Default value = LOGGER)

LOGGER

Returns:

Type Description
Client | None

A pystac_client.Client instance if successful, else None.

Source code in src/geospatial_tools/stac.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def create_planetary_computer_catalog(
    max_retries: int = 3, delay: int = 5, logger: logging.Logger = LOGGER
) -> pystac_client.Client | None:
    """
    Creates a Planetary Computer Catalog Client.

    Args:
      max_retries: The maximum number of retries for the API connection. (Default value = 3)
      delay: The delay between retry attempts in seconds. (Default value = 5)
      logger: The logger instance to use. (Default value = LOGGER)

    Returns:
        A pystac_client.Client instance if successful, else None.
    """
    for attempt in range(1, max_retries + 1):
        try:
            client = pystac_client.Client.open(PLANETARY_COMPUTER_API, modifier=sign_inplace)
            logger.debug("Successfully connected to the API.")
            return client
        except Exception as e:  # pylint: disable=W0718
            logger.error(f"Attempt {attempt} failed: {e}")
            if attempt < max_retries:
                time.sleep(delay)
            else:
                logger.error(e)
                raise e
    return None

create_copernicus_catalog #

create_copernicus_catalog(
    max_retries: int = 3, delay: int = 5, logger: Logger = LOGGER
) -> Client | None

Creates a Copernicus Data Space Ecosystem Catalog Client.

Parameters:

Name Type Description Default
max_retries int

The maximum number of retries for the API connection. (Default value = 3)

3
delay int

The delay between retry attempts in seconds. (Default value = 5)

5
logger Logger

The logger instance to use. (Default value = LOGGER)

LOGGER

Returns:

Type Description
Client | None

A pystac_client.Client instance if successful, else None.

Source code in src/geospatial_tools/stac.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def create_copernicus_catalog(
    max_retries: int = 3, delay: int = 5, logger: logging.Logger = LOGGER
) -> pystac_client.Client | None:
    """
    Creates a Copernicus Data Space Ecosystem Catalog Client.

    Args:
      max_retries: The maximum number of retries for the API connection. (Default value = 3)
      delay: The delay between retry attempts in seconds. (Default value = 5)
      logger: The logger instance to use. (Default value = LOGGER)

    Returns:
        A pystac_client.Client instance if successful, else None.
    """
    for attempt in range(1, max_retries + 1):
        try:
            client = pystac_client.Client.open(COPERNICUS_API)
            logger.debug("Successfully connected to the API.")
            return client
        except Exception as e:  # pylint: disable=W0718
            logger.error(f"Attempt {attempt} failed: {e}")
            if attempt < max_retries:
                time.sleep(delay)
            else:
                logger.error(e)
                raise e
    return None

catalog_generator #

catalog_generator(catalog_name: str, logger: Logger = LOGGER) -> Client | None

Generates a STAC Client for the specified catalog.

Parameters:

Name Type Description Default
catalog_name str

The name of the catalog (e.g., 'planetary_computer', 'copernicus').

required
logger Logger

The logger instance to use.

LOGGER

Returns:

Type Description
Client | None

A pystac_client.Client instance for the requested catalog if supported, else None.

Source code in src/geospatial_tools/stac.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def catalog_generator(catalog_name: str, logger: logging.Logger = LOGGER) -> pystac_client.Client | None:
    """
    Generates a STAC Client for the specified catalog.

    Args:
      catalog_name: The name of the catalog (e.g., 'planetary_computer', 'copernicus').
      logger: The logger instance to use.

    Returns:
        A pystac_client.Client instance for the requested catalog if supported, else None.
    """
    catalog_dict = {
        PLANETARY_COMPUTER: create_planetary_computer_catalog,
        COPERNICUS: create_copernicus_catalog,
    }
    if catalog_name not in catalog_dict:
        logger.error(f"Unsupported catalog name: {catalog_name}")
        return None

    catalog = catalog_dict[catalog_name]()

    return catalog

list_available_catalogs #

list_available_catalogs(logger: Logger = LOGGER) -> frozenset[str]

Lists all available STAC catalogs.

Parameters:

Name Type Description Default
logger Logger

The logger instance to use.

LOGGER

Returns:

Type Description
frozenset[str]

A frozenset of available catalog names.

Source code in src/geospatial_tools/stac.py
121
122
123
124
125
126
127
128
129
130
131
132
def list_available_catalogs(logger: logging.Logger = LOGGER) -> frozenset[str]:
    """
    Lists all available STAC catalogs.

    Args:
      logger: The logger instance to use.

    Returns:
        A frozenset of available catalog names.
    """
    logger.info("Available catalogs")
    return CATALOG_NAME_LIST

download_stac_asset #

download_stac_asset(
    asset_url: str,
    destination: Path,
    method: str = "http",
    headers: dict[str, str] | None = None,
    s3_client: Any | None = None,
    logger: Logger = LOGGER,
) -> Path | None

Generic dispatcher for downloading STAC assets via HTTP or S3.

Parameters:

Name Type Description Default
asset_url str

URL/HREF of the asset to download.

required
destination Path

Path where the file will be saved.

required
method str

Download method ('http' or 's3').

'http'
headers dict[str, str] | None

Headers for HTTP request.

None
s3_client Any | None

Boto3 S3 client (required for 's3' method).

None
logger Logger

Logger instance.

LOGGER

Returns:

Type Description
Path | None

The Path to the downloaded file if successful, else None.

Source code in src/geospatial_tools/stac.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
def download_stac_asset(
    asset_url: str,
    destination: Path,
    method: str = "http",
    headers: dict[str, str] | None = None,
    s3_client: Any | None = None,
    logger: logging.Logger = LOGGER,
) -> Path | None:
    """
    Generic dispatcher for downloading STAC assets via HTTP or S3.

    Args:
        asset_url: URL/HREF of the asset to download.
        destination: Path where the file will be saved.
        method: Download method ('http' or 's3').
        headers: Headers for HTTP request.
        s3_client: Boto3 S3 client (required for 's3' method).
        logger: Logger instance.

    Returns:
        The Path to the downloaded file if successful, else None.
    """
    if method == "s3":
        file_path = download_url_s3(asset_url=asset_url, destination=destination, s3_client=s3_client, logger=logger)
        return file_path
    # Default to HTTP
    file_path = download_url(url=asset_url, filename=destination, headers=headers, logger=logger)
    return file_path

utils #

This module contains general utility functions.

create_logger #

create_logger(logger_name: str) -> Logger

Creates a logger object using input name parameter that outputs to stdout.

Parameters:

Name Type Description Default
logger_name str

Name of logger

required

Returns:

Source code in src/geospatial_tools/utils.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def create_logger(logger_name: str) -> logging.Logger:
    """
    Creates a logger object using input name parameter that outputs to stdout.

    Args:
      logger_name: Name of logger

    Returns:
    """
    logging_level = logging.INFO
    app_config_path = CONFIGS / "geospatial_tools_ini.yaml"
    if app_config_path.exists():
        with app_config_path.open("r", encoding="UTF-8") as config_file:
            application_params = yaml.safe_load(config_file)
            logger_params = application_params["logging"]
            logging_level = logger_params["logging_level"].upper()
    if os.getenv("GEO_LOG_LEVEL"):
        logging_level = os.getenv("GEO_LOG_LEVEL").upper()  # type: ignore

    logger = logging.getLogger(logger_name)
    logger.setLevel(logging_level)
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(
        fmt="[%(asctime)s] %(levelname)-10.10s [%(threadName)s][%(name)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

get_yaml_config #

get_yaml_config(yaml_config_file: str, logger: Logger = LOGGER) -> dict

This function takes in the path, or name of the file if it can be found in the config/ folder, with of without the extension, and returns the values of the file in a dictionary format.

Ex. For a file named app_config.yml (or app_config.yaml), directly in the config/ folder, the function could be called like so : params = get_yaml_config('app_config')

Parameters:

Name Type Description Default
yaml_config_file str

Path to yaml config file. If config file is in the config folder, you can use the file's name without the extension.

required
logger Logger

Logger to handle messaging, by default LOGGER

LOGGER

Returns:

Source code in src/geospatial_tools/utils.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def get_yaml_config(yaml_config_file: str, logger: logging.Logger = LOGGER) -> dict:
    """
    This function takes in the path, or name of the file if it can be found in the config/ folder, with of without the
    extension, and returns the values of the file in a dictionary format.

    Ex. For a file named app_config.yml (or app_config.yaml), directly in the config/ folder,
        the function could be called like so : `params = get_yaml_config('app_config')`

    Args:
      yaml_config_file: Path to yaml config file. If config file is in the config folder,
        you can use the file's name without the extension.
      logger: Logger to handle messaging, by default LOGGER

    Returns:
    """

    potential_paths = [
        Path(yaml_config_file),
        CONFIGS / yaml_config_file,
        CONFIGS / f"{yaml_config_file}.yaml",
        CONFIGS / f"{yaml_config_file}.yml",
    ]

    config_filepath = None
    for path in potential_paths:
        if path.exists():
            config_filepath = path
            logger.info(f"Yaml config file [{path!s}] found.")
            break

    params: dict[str, Any] = {}
    if not config_filepath:
        logger.error(f"Yaml config file [{yaml_config_file}] was not found.")
        return params

    try:
        with config_filepath.open("r", encoding="UTF-8") as file:
            logger.info(f"Loading YAML config file [{config_filepath}].")
            return yaml.safe_load(file)
    except yaml.YAMLError as e:
        logger.warning(f"Error loading YAML file [{config_filepath}]: {e}")
        return {}

get_json_config #

get_json_config(json_config_file: str, logger=LOGGER) -> dict

This function takes in the path, or name of the file if it can be found in the config/ folder, with of without the extension, and returns the values of the file in a dictionary format.

Ex. For a file named app_config.json, directly in the config/ folder, the function could be called like so : params = get_json_config('app_config')

Parameters:

Name Type Description Default
json_config_file str

Path to JSON config file. If config file is in the config folder,

required
logger

Logger to handle messaging

LOGGER

Returns:

Source code in src/geospatial_tools/utils.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def get_json_config(json_config_file: str, logger=LOGGER) -> dict:
    """
    This function takes in the path, or name of the file if it can be found in the config/ folder, with of without the
    extension, and returns the values of the file in a dictionary format.

    Ex. For a file named app_config.json, directly in the config/ folder,
        the function could be called like so : `params = get_json_config('app_config')`

    Args:
      json_config_file: Path to JSON config file. If config file is in the config folder,
      logger: Logger to handle messaging

    Returns:
    """

    potential_paths = [
        Path(json_config_file),
        CONFIGS / json_config_file,
        CONFIGS / f"{json_config_file}.json",
    ]

    config_filepath = None
    for path in potential_paths:
        if path.exists():
            config_filepath = path
            logger.info(f"JSON config file [{path!s}] found.")
            break

    if not config_filepath:
        logger.error(f"JSON config file [{json_config_file}] not found.")
        return {}

    try:
        with config_filepath.open("r", encoding="UTF-8") as file:
            logger.info(f"Loading JSON config file [{config_filepath}].")
            return json.load(file)
    except json.JSONDecodeError as e:
        logger.warning(f"Error loading JSON file [{config_filepath}]: {e}")
        return {}

create_crs #

create_crs(dataset_crs: str | int, logger=LOGGER)

Parameters:

Name Type Description Default
dataset_crs str | int

EPSG code in string or int format. Can be given in the following ways: 5070 | "5070" | "EPSG:5070"

required
logger

Logger instance (Default value = LOGGER)

LOGGER

Returns:

Source code in src/geospatial_tools/utils.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def create_crs(dataset_crs: str | int, logger=LOGGER):
    """

    Args:
      dataset_crs: EPSG code in string or int format. Can be given in the following ways: 5070 | "5070" | "EPSG:5070"
      logger: Logger instance (Default value = LOGGER)

    Returns:


    """
    logger.info(f"Creating EPSG code from following input : [{dataset_crs}]")
    is_int = isinstance(dataset_crs, int) or (isinstance(dataset_crs, str) and dataset_crs.isnumeric())
    contains_epsg = isinstance(dataset_crs, str) and "EPSG:" in dataset_crs
    if is_int:
        return CRS.from_epsg(dataset_crs)
    if contains_epsg:
        return CRS.from_string(dataset_crs.upper())
    if isinstance(dataset_crs, str) and ":" in dataset_crs:
        logger.warning("Input is not conform to standards. Attempting to extract code from the provided input.")
        recovered_code = dataset_crs.split(":")[-1]
        if recovered_code.isnumeric():
            return CRS.from_epsg(recovered_code)

    logger.error(f"Encountered problem while trying to format EPSG code from input : [{dataset_crs}]")
    return None

download_url #

download_url(
    url: str,
    filename: str | Path,
    overwrite: bool = False,
    headers: dict | None = None,
    logger=LOGGER,
) -> Path | None

This function downloads a file from a given URL.

Parameters:

Name Type Description Default
url str

Url to download

required
filename str | Path

Filename (or full path) to save the downloaded file

required
overwrite bool

If True, overwrite existing file

False
headers dict | None

Optional headers to include in the request (e.g., Authorization)

None
logger

Logger instance

LOGGER

Returns:

Type Description
Path | None

Path to downloaded file

Source code in src/geospatial_tools/utils.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def download_url(
    url: str, filename: str | Path, overwrite: bool = False, headers: dict | None = None, logger=LOGGER
) -> Path | None:
    """
    This function downloads a file from a given URL.

    Args:
      url: Url to download
      filename: Filename (or full path) to save the downloaded file
      overwrite: If True, overwrite existing file
      headers: Optional headers to include in the request (e.g., Authorization)
      logger: Logger instance

    Returns:
        Path to downloaded file
    """
    if isinstance(filename, str):
        filename = Path(filename)

    if filename.exists() and not overwrite:
        logger.info(f"File [{filename}] already exists. Skipping download.")
        return filename

    response = requests.get(url, headers=headers, timeout=None)
    if response.status_code == 200:
        with open(filename, "wb") as f:
            f.write(response.content)
        logger.info(f"Downloaded {filename} successfully.")
        return filename

    logger.error(f"Failed to download the asset. Status code: {response.status_code}")
    return None

unzip_file #

unzip_file(
    zip_path: str | Path, extract_to: str | Path, logger: Logger = LOGGER
) -> list[str | Path]

This function unzips an archive to a specific directory.

Parameters:

Name Type Description Default
zip_path str | Path

Path to zip file

required
extract_to str | Path

Path of directory to extract the zip file

required
logger Logger

Logger instance

LOGGER

Returns:

Source code in src/geospatial_tools/utils.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def unzip_file(zip_path: str | Path, extract_to: str | Path, logger: logging.Logger = LOGGER) -> list[str | Path]:
    """
    This function unzips an archive to a specific directory.

    Args:
      zip_path: Path to zip file
      extract_to: Path of directory to extract the zip file
      logger: Logger instance

    Returns:
    """
    if isinstance(zip_path, str):
        zip_path = Path(zip_path)
    if isinstance(extract_to, str):
        extract_to = Path(extract_to)
    extract_to.mkdir(parents=True, exist_ok=True)
    extracted_files: list[str | Path] = []
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        for member in zip_ref.infolist():
            zip_ref.extract(member, extract_to)
            logger.info(f"Extracted: [{member.filename}]")
            extracted_files.append(f"{extract_to}/{member.filename}")
    return extracted_files

create_date_range_for_specific_period #

create_date_range_for_specific_period(
    start_year: int, end_year: int, start_month_range: int, end_month_range: int
) -> list[str]

This function create a list of date ranges.

For example, I want to create date ranges for 2020 and 2021, but only for the months from March to May. I therefore expect to have 2 ranges: [2020-03-01 to 2020-05-30, 2021-03-01 to 2021-05-30].

Handles the automatic definition of the last day for the end month, as well as periods that cross over years

For example, I want to create date ranges for 2020 and 2022, but only for the months from November to January. I therefore expect to have 2 ranges: [2020-11-01 to 2021-01-31, 2021-11-01 to 2022-01-31].

Parameters:

Name Type Description Default
start_year int

Start year for ranges

required
end_year int

End year for ranges

required
start_month_range int

Starting month for each period

required
end_month_range int

End month for each period (inclusively)

required

Returns:

Source code in src/geospatial_tools/utils.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def create_date_range_for_specific_period(
    start_year: int, end_year: int, start_month_range: int, end_month_range: int
) -> list[str]:
    """
    This function create a list of date ranges.

    For example, I want to create date ranges for 2020 and 2021, but only for the months from March to May.
    I therefore expect to have 2 ranges: [2020-03-01 to 2020-05-30, 2021-03-01 to 2021-05-30].

    Handles the automatic definition of the last day for the end month, as well as periods that cross over years

    For example, I want to create date ranges for 2020 and 2022, but only for the months from November to January.
    I therefore expect to have 2 ranges: [2020-11-01 to 2021-01-31, 2021-11-01 to 2022-01-31].

    Args:
      start_year: Start year for ranges
      end_year: End year for ranges
      start_month_range: Starting month for each period
      end_month_range: End month for each period (inclusively)

    Returns:
    """
    date_ranges = []
    year_bump = 0
    if start_month_range > end_month_range:
        year_bump = 1
    range_end_year = end_year + 1 - year_bump
    for year in range(start_year, range_end_year):
        start_date = datetime.datetime(year, start_month_range, 1)
        last_day = calendar.monthrange(year + year_bump, end_month_range)[1]
        end_date = datetime.datetime(year + year_bump, end_month_range, last_day, 23, 59, 59)
        date_ranges.append(f"{start_date.isoformat()}Z/{end_date.isoformat()}Z")
    return date_ranges

parse_gzip_header #

parse_gzip_header(path: str | Path) -> dict[str, Any]

Parse the gzip header at the beginning of path (first member only).

Raises ValueError if file doesn't look like gzip.

Parameters:

Name Type Description Default
path str | Path

Path to gzip file

required

Returns:

Name Type Description
dict dict[str, Any]

Returns a dict with keys - compression_method (int) - flags (int) - mtime (int, Unix epoch or 0) - xflags (int) - os (int) - original_name (Optional[str]) # FNAME - comment (Optional[str]) # FCOMMENT - header_end_offset (int) # file offset where compressed data starts

Source code in src/geospatial_tools/utils.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def parse_gzip_header(path: str | Path) -> dict[str, Any]:
    """
    Parse the gzip header at the beginning of `path` (first member only).

    Raises ValueError if file doesn't look like gzip.

    Args:
        path: Path to gzip file

    Returns:
        dict: Returns a dict with keys
                  - compression_method (int)
                  - flags (int)
                  - mtime (int, Unix epoch or 0)
                  - xflags (int)
                  - os (int)
                  - original_name (Optional[str])   # FNAME
                  - comment      (Optional[str])    # FCOMMENT
                  - header_end_offset (int)         # file offset where compressed data starts
    """
    p = Path(path)
    with p.open("rb") as f:
        # Magic
        if f.read(2) != b"\x1f\x8b":
            raise ValueError(f"{p} is not a gzip file (bad magic)")

        method_b = f.read(1)
        flags_b = f.read(1)
        if not method_b or not flags_b:
            raise ValueError("Truncated header")

        compression_method = method_b[0]
        flags = flags_b[0]

        # MTIME(4), XFL(1), OS(1)
        mtime_bytes = f.read(4)
        if len(mtime_bytes) != 4:
            raise ValueError("Truncated header (mtime)")
        mtime = struct.unpack("<I", mtime_bytes)[0]
        xflags = f.read(1)[0]
        os_code = f.read(1)[0]

        # Optional fields in order
        if flags & FEXTRA:
            xlen_bytes = f.read(2)
            if len(xlen_bytes) != 2:
                raise ValueError("Truncated FEXTRA length")
            xlen = struct.unpack("<H", xlen_bytes)[0]
            _ = f.read(xlen)  # skip payload

        original_name: str | None = None
        if flags & FNAME:
            # Historically ISO-8859-1; utf-8 with replace is pragmatic
            original_name = _read_cstring(f).decode("utf-8", errors="replace")

        comment: str | None = None
        if flags & FCOMMENT:
            comment = _read_cstring(f).decode("utf-8", errors="replace")

        if flags & FHCRC:
            _ = f.read(2)  # skip header CRC16

        return {
            "compression_method": compression_method,
            "flags": flags,
            "mtime": mtime,
            "xflags": xflags,
            "os": os_code,
            "original_name": original_name,
            "comment": comment,
            "header_end_offset": f.tell(),
        }

vector #

This module contains functions that process or create vector data.

create_grid_coordinates #

create_grid_coordinates(
    bounding_box: list | tuple | ndarray, grid_size: float, logger: Logger = LOGGER
) -> tuple[ndarray, ndarray]

Create grid coordinates based on input bounding box and grid size.

Parameters:

Name Type Description Default
bounding_box list | tuple | ndarray

The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat). Unit needs to be based on projection used (meters, degrees, etc.).

required
grid_size float

Cell size for grid. Unit needs to be based on projection used (meters, degrees, etc.).

required
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def create_grid_coordinates(
    bounding_box: list | tuple | ndarray, grid_size: float, logger: logging.Logger = LOGGER
) -> tuple[ndarray, ndarray]:
    """
    Create grid coordinates based on input bounding box and grid size.

    Args:
      bounding_box: The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).
        Unit needs to be based on projection used (meters, degrees, etc.).
      grid_size: Cell size for grid. Unit needs to be based on projection used (meters, degrees, etc.).
      logger: Logger instance.

    Returns:
    """
    logger.info(f"Creating grid coordinates for bounding box [{bounding_box}]")
    min_lon, min_lat, max_lon, max_lat = bounding_box
    lon_coords = np.arange(min_lon, stop=max_lon, step=grid_size)
    lat_coords = np.arange(min_lat, stop=max_lat, step=grid_size)
    return lon_coords, lat_coords

generate_flattened_grid_coords #

generate_flattened_grid_coords(
    lon_coords: ndarray, lat_coords: ndarray, logger: Logger = LOGGER
) -> tuple[ndarray, ndarray]

Takes in previously created grid coordinates and flattens them.

Parameters:

Name Type Description Default
lon_coords ndarray

Longitude grid coordinates

required
lat_coords ndarray

Latitude grid coordinates

required
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def generate_flattened_grid_coords(
    lon_coords: ndarray, lat_coords: ndarray, logger: logging.Logger = LOGGER
) -> tuple[ndarray, ndarray]:
    """
    Takes in previously created grid coordinates and flattens them.

    Args:
      lon_coords: Longitude grid coordinates
      lat_coords: Latitude grid coordinates
      logger: Logger instance.

    Returns:
    """

    logger.info("Creating flattened grid coordinates")
    lon_grid, lat_grid = np.meshgrid(lon_coords, lat_coords)
    lon_grid = lon_grid.flatten()
    lat_grid = lat_grid.flatten()
    return lon_grid, lat_grid

create_vector_grid #

create_vector_grid(
    bounding_box: list | tuple | ndarray,
    grid_size: float,
    crs: str | int | None = None,
    logger: Logger = LOGGER,
) -> GeoDataFrame

Create a grid of polygons within the specified bounds and cell size. This function uses NumPy vectorized arrays for optimized performance.

Parameters:

Name Type Description Default
bounding_box list | tuple | ndarray

The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).

required
grid_size float

The size of each grid cell in degrees.

required
crs str | int | None

CRS code for projection. ex. 'EPSG:4326'

None
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def create_vector_grid(
    bounding_box: list | tuple | ndarray,
    grid_size: float,
    crs: str | int | None = None,
    logger: logging.Logger = LOGGER,
) -> GeoDataFrame:
    """
    Create a grid of polygons within the specified bounds and cell size. This function uses NumPy vectorized arrays for
    optimized performance.

    Args:
      bounding_box: The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).
      grid_size: The size of each grid cell in degrees.
      crs: CRS code for projection. ex. 'EPSG:4326'
      logger: Logger instance.

    Returns:
    """
    lon_coords, lat_coords = create_grid_coordinates(bounding_box=bounding_box, grid_size=grid_size, logger=logger)
    lon_flat_grid, lat_flat_grid = generate_flattened_grid_coords(
        lat_coords=lat_coords, lon_coords=lon_coords, logger=logger
    )

    num_cells = len(lon_flat_grid)
    logger.info(f"Allocating polygon array for [{num_cells}] polygons")
    polygons = np.empty(num_cells, dtype=object)

    for i in range(num_cells):
        x, y = lon_flat_grid[i], lat_flat_grid[i]
        polygons[i] = Polygon([(x, y), (x + grid_size, y), (x + grid_size, y + grid_size), (x, y + grid_size)])

    properties: dict[str, Any] = {"data": {"geometry": polygons}}
    if crs:
        properties["crs"] = crs
    grid = GeoDataFrame(**properties)
    grid.sindex  # pylint: disable=W0104
    _generate_uuid_column(grid)
    return grid

create_vector_grid_parallel #

create_vector_grid_parallel(
    bounding_box: list | tuple | ndarray,
    grid_size: float,
    crs: str | int | None = None,
    num_of_workers: int | None = None,
    logger: Logger = LOGGER,
) -> GeoDataFrame

Create a grid of polygons within the specified bounds and cell size. This function uses NumPy for optimized performance and ProcessPoolExecutor for parallel execution.

Parameters:

Name Type Description Default
bounding_box list | tuple | ndarray

The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).

required
grid_size float

The size of each grid cell in degrees.

required
crs str | int | None

Coordinate reference system for the resulting GeoDataFrame.

None
num_of_workers int | None

The number of processes to use for parallel execution. Defaults to the min of number of CPU cores or number of cells in the grid

None
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def create_vector_grid_parallel(
    bounding_box: list | tuple | ndarray,
    grid_size: float,
    crs: str | int | None = None,
    num_of_workers: int | None = None,
    logger: logging.Logger = LOGGER,
) -> GeoDataFrame:
    """
    Create a grid of polygons within the specified bounds and cell size. This function uses NumPy for optimized
    performance and ProcessPoolExecutor for parallel execution.

    Args:
      bounding_box: The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).
      grid_size: The size of each grid cell in degrees.
      crs: Coordinate reference system for the resulting GeoDataFrame.
      num_of_workers: The number of processes to use for parallel execution. Defaults to the min of number of CPU cores
        or number of cells in the grid
      logger: Logger instance.

    Returns:
    """
    lon_coords, lat_coords = create_grid_coordinates(bounding_box=bounding_box, grid_size=grid_size, logger=logger)
    lon_flat_grid, lat_flat_grid = generate_flattened_grid_coords(
        lat_coords=lat_coords, lon_coords=lon_coords, logger=logger
    )

    num_cells = len(lon_flat_grid)
    workers = min(cpu_count(), num_cells)
    if num_of_workers:
        workers = num_of_workers

    logger.info(f"Number of workers used: {workers}")
    logger.info(f"Allocating polygon array for [{num_cells}] polygons")

    chunk_size = (num_cells + workers - 1) // workers
    chunks = [
        (lon_flat_grid[i : i + chunk_size], lat_flat_grid[i : i + chunk_size], grid_size)
        for i in range(0, num_cells, chunk_size)
    ]

    polygons = []
    logger.info("Creating polygons from chunks")
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = executor.map(_create_polygons_from_coords_chunk, chunks)
        for result in results:
            polygons.extend(result)

    logger.info("Managing properties")
    properties: dict[str, Any] = {"data": {"geometry": polygons}}
    if crs:
        projection = create_crs(crs)
        properties["crs"] = projection
    grid: GeoDataFrame = GeoDataFrame(**properties)
    logger.info("Creating spatial index")
    grid.sindex  # pylint: disable=W0104
    logger.info("Generating polygon UUIDs")
    _generate_uuid_column(grid)
    return grid

dask_spatial_join #

dask_spatial_join(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    join_type: str = "inner",
    predicate: str = "intersects",
    num_of_workers: int = 4,
) -> GeoDataFrame

Parameters:

Name Type Description Default
select_features_from GeoDataFrame
required
intersected_with GeoDataFrame
required
join_type str

str:

'inner'
predicate str

str:

'intersects'
num_of_workers int
4

Returns:

Source code in src/geospatial_tools/vector.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def dask_spatial_join(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    join_type: str = "inner",
    predicate: str = "intersects",
    num_of_workers: int = 4,
) -> GeoDataFrame:
    """

    Args:
      select_features_from:
      intersected_with:
      join_type: str:
      predicate: str:
      num_of_workers:

    Returns:


    """
    dask_select_gdf = dgpd.from_geopandas(select_features_from, npartitions=num_of_workers)
    dask_intersected_gdf = dgpd.from_geopandas(intersected_with, npartitions=1)
    result = dgpd.sjoin(dask_select_gdf, dask_intersected_gdf, how=join_type, predicate=predicate).compute()
    result = GeoDataFrame(result)
    result.sindex  # pylint: disable=W0104

    return result

multiprocessor_spatial_join #

multiprocessor_spatial_join(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    join_type: str = "inner",
    predicate: str = "intersects",
    num_of_workers: int = 4,
    logger: Logger = LOGGER,
) -> GeoDataFrame

Parameters:

Name Type Description Default
select_features_from GeoDataFrame

Numpy array containing the polygons from which to select features from.

required
intersected_with GeoDataFrame

Geodataframe containing the polygons that will be used to select features with via an intersect operation.

required
join_type str

How the join will be executed. Available join_types are: ['left', 'right', 'inner']. Defaults to 'inner'

'inner'
predicate str

The predicate to use for selecting features from. Available predicates are: ['intersects', 'contains', 'within', 'touches', 'crosses', 'overlaps']. Defaults to 'intersects'

'intersects'
num_of_workers int

The number of processes to use for parallel execution. Defaults to 4.

4
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def multiprocessor_spatial_join(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    join_type: str = "inner",
    predicate: str = "intersects",
    num_of_workers: int = 4,
    logger: logging.Logger = LOGGER,
) -> GeoDataFrame:
    """

    Args:
      select_features_from: Numpy array containing the polygons from which to select features from.
      intersected_with: Geodataframe containing the polygons that will be used to select features with via an
        intersect operation.
      join_type: How the join will be executed. Available join_types are:
        ['left', 'right', 'inner']. Defaults to 'inner'
      predicate: The predicate to use for selecting features from. Available predicates are:
        ['intersects', 'contains', 'within', 'touches', 'crosses', 'overlaps']. Defaults to 'intersects'
      num_of_workers: The number of processes to use for parallel execution. Defaults to 4.
      logger: Logger instance.

    Returns:


    """
    select_features_from_chunks = np.array_split(select_features_from, num_of_workers)
    with ProcessPoolExecutor(max_workers=num_of_workers) as executor:
        futures = [
            executor.submit(gpd.sjoin, chunk, intersected_with, how=join_type, predicate=predicate)
            for chunk in select_features_from_chunks
        ]
        intersecting_polygons_list = [future.result() for future in futures]
    logger.info("Concatenating results")
    intersecting_polygons = gpd.GeoDataFrame(pd.concat(intersecting_polygons_list, ignore_index=True))
    logger.info("Creating spatial index")
    intersecting_polygons.sindex  # pylint: disable=W0104
    if len(intersected_with) > 1:
        # This last step is necessary when doing a spatial join where `intersected_with` contains multiple features
        logger.info("Dropping duplicates")
        intersecting_polygons = intersecting_polygons.drop_duplicates(subset="geometry")
    return intersecting_polygons

select_polygons_by_location #

select_polygons_by_location(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    num_of_workers: int | None = None,
    join_type: str = "inner",
    predicate: str = "intersects",
    join_function=multiprocessor_spatial_join,
    logger: Logger = LOGGER,
) -> GeoDataFrame

This function executes a select by location operation on a GeoDataFrame. It is essentially a wrapper around gpd.sjoin to allow parallel execution. While it does use sjoin, only the columns from select_features_from are kept.

Parameters:

Name Type Description Default
select_features_from GeoDataFrame

GeoDataFrame containing the polygons from which to select features from.

required
intersected_with GeoDataFrame

Geodataframe containing the polygons that will be used to select features with via an intersect operation.

required
num_of_workers int | None

Number of parallel processes to use for execution. If using on a compute cluster, please set a specific amount (ex. 1 per CPU core requested). Defaults to the min of number of CPU cores or number (cpu_count())

None
join_type str
'inner'
predicate str

The predicate to use for selecting features from. Available predicates are: ['intersects', 'contains', 'within', 'touches', 'crosses', 'overlaps']. Defaults to 'intersects'

'intersects'
join_function

Function that will execute the join operation. Available functions are: 'multiprocessor_spatial_join'; 'dask_spatial_join'; or custom functions. (Default value = multiprocessor_spatial_join)

multiprocessor_spatial_join
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def select_polygons_by_location(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    num_of_workers: int | None = None,
    join_type: str = "inner",
    predicate: str = "intersects",
    join_function=multiprocessor_spatial_join,
    logger: logging.Logger = LOGGER,
) -> GeoDataFrame:
    """
    This function executes a `select by location` operation on a GeoDataFrame. It is essentially a wrapper around
    `gpd.sjoin` to allow parallel execution. While it does use `sjoin`, only the columns from `select_features_from` are
    kept.

    Args:
      select_features_from: GeoDataFrame containing the polygons from which to select features from.
      intersected_with: Geodataframe containing the polygons that will be used to select features with via an intersect
        operation.
      num_of_workers: Number of parallel processes to use for execution. If using
        on a compute cluster, please set a specific amount (ex. 1 per CPU core requested).
        Defaults to the min of number of CPU cores
        or number (cpu_count())
      join_type:
      predicate: The predicate to use for selecting features from. Available predicates are:
        ['intersects', 'contains', 'within', 'touches', 'crosses', 'overlaps']. Defaults to 'intersects'
      join_function: Function that will execute the join operation. Available functions are:
        'multiprocessor_spatial_join'; 'dask_spatial_join'; or custom functions.
        (Default value = multiprocessor_spatial_join)
      logger: Logger instance.

    Returns:
    """
    workers = cpu_count()
    if num_of_workers:
        workers = num_of_workers
    logger.info(f"Number of workers used: {workers}")

    intersecting_polygons = join_function(
        select_features_from=select_features_from,
        intersected_with=intersected_with,
        join_type=join_type,
        predicate=predicate,
        num_of_workers=num_of_workers,
    )
    logger.info("Filtering columns of the results")
    filtered_result_gdf = intersecting_polygons.drop(columns=intersecting_polygons.filter(like="_right").columns)
    column_list_to_filter = [item for item in intersected_with.columns if item not in select_features_from.columns]
    conserved_columns = [col for col in filtered_result_gdf.columns if col not in column_list_to_filter]
    filtered_result_gdf = filtered_result_gdf[conserved_columns]  # pylint: disable=E1136

    return filtered_result_gdf

to_geopackage #

to_geopackage(gdf: GeoDataFrame, filename: str | Path, logger=LOGGER) -> str | Path

Save GeoDataFrame to a Geopackage file.

Parameters:

Name Type Description Default
gdf GeoDataFrame

The GeoDataFrame to save.

required
filename str | Path

The filename to save to.

required
logger

Logger instance (Default value = LOGGER)

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
def to_geopackage(gdf: GeoDataFrame, filename: str | Path, logger=LOGGER) -> str | Path:
    """
    Save GeoDataFrame to a Geopackage file.

    Args:
      gdf: The GeoDataFrame to save.
      filename: The filename to save to.
      logger: Logger instance (Default value = LOGGER)

    Returns:
    """
    start = time.time()
    logger.info("Starting writing process")
    if isinstance(gdf, pd.DataFrame):
        gdf = GeoDataFrame(gdf)
    gdf.to_file(filename, driver=GEOPACKAGE_DRIVER, mode="w")
    stop = time.time()
    logger.info(f"File [{filename}] took {stop - start} seconds to write.")

    return filename

to_geopackage_chunked #

to_geopackage_chunked(
    gdf: GeoDataFrame, filename: str, chunk_size: int = 1000000, logger: Logger = LOGGER
) -> str

Save GeoDataFrame to a Geopackage file using chunks to help with potential memory consumption. This function can potentially be slower than to_geopackage, especially if chunk_size is not adequately defined. Therefore, this function should only be required if to_geopackage fails because of memory issues.

Parameters:

Name Type Description Default
gdf GeoDataFrame

The GeoDataFrame to save.

required
filename str

The filename to save to.

required
chunk_size int

The number of rows per chunk.

1000000
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def to_geopackage_chunked(
    gdf: GeoDataFrame, filename: str, chunk_size: int = 1000000, logger: logging.Logger = LOGGER
) -> str:
    """
    Save GeoDataFrame to a Geopackage file using chunks to help with potential memory consumption. This function can
    potentially be slower than `to_geopackage`, especially if `chunk_size` is not adequately defined. Therefore, this
    function should only be required if `to_geopackage` fails because of memory issues.

    Args:
      gdf: The GeoDataFrame to save.
      filename: The filename to save to.
      chunk_size: The number of rows per chunk.
      logger: Logger instance.

    Returns:
    """
    filename_path = Path(filename)
    if filename_path.exists():
        filename_path.unlink()

    start = time.time()
    logger.info("Starting writing process")
    logger.info(f"Chunk size used : [{chunk_size}]")
    chunk = gdf.iloc[0:chunk_size]
    chunk.to_file(filename, driver=GEOPACKAGE_DRIVER, mode="w")

    for i in range(chunk_size, len(gdf), chunk_size):
        chunk = gdf.iloc[i : i + chunk_size]
        chunk.to_file(filename, driver=GEOPACKAGE_DRIVER, mode="a")

    stop = time.time()
    logger.info(f"File [{filename}] took {stop - start} seconds to write.")

    return filename

select_all_within_feature #

select_all_within_feature(
    polygon_feature: GeoSeries, vector_features: GeoDataFrame
) -> GeoSeries

This function is quite small and simple, but exists mostly as a.

Parameters:

Name Type Description Default
polygon_feature GeoSeries

Polygon feature that will be used to find which features of vector_features are contained within it. In this function, it is expected to be a GeoSeries, so a single row from a GeoDataFrame.

required
vector_features GeoDataFrame

The dataframe containing the features that will be grouped by polygon_feature.

required

Returns:

Source code in src/geospatial_tools/vector.py
381
382
383
384
385
386
387
388
389
390
391
392
393
def select_all_within_feature(polygon_feature: gpd.GeoSeries, vector_features: gpd.GeoDataFrame) -> gpd.GeoSeries:
    """
    This function is quite small and simple, but exists mostly as a.

    Args:
      polygon_feature: Polygon feature that will be used to find which features of `vector_features` are contained
        within it. In this function, it is expected to be a GeoSeries, so a single row from a GeoDataFrame.
      vector_features: The dataframe containing the features that will be grouped by polygon_feature.

    Returns:
    """
    contained_features = vector_features[vector_features.within(polygon_feature.geometry)]
    return contained_features

add_and_fill_contained_column #

add_and_fill_contained_column(
    polygon_feature,
    polygon_column_name: str,
    vector_features,
    vector_column_name: str,
    logger=LOGGER,
) -> None

This function make in place changes to vector_geodataframe.

The purpose of this function is to first do a spatial search operation on which vector_features are within polygon_feature, and then write the contents found in the polygon_column_name to the selected vector_features

Parameters:

Name Type Description Default
polygon_feature

Polygon feature that will be used to find which features of vector_features are contained within it.

required
polygon_column_name str

The name of the column in polygon_feature that contains the name/id of each polygon to be written to vector_features.

required
vector_features

The dataframe containing the features that will be grouped by polygon_feature.

required
vector_column_name str

The name of the column in vector_features that will the name/id of each polygon.

required
logger

Logger instance

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def add_and_fill_contained_column(
    polygon_feature, polygon_column_name: str, vector_features, vector_column_name: str, logger=LOGGER
) -> None:
    """
    This function make in place changes to `vector_geodataframe`.

    The purpose of this function is to first do a spatial search operation on which `vector_features` are within
    `polygon_feature`, and then write the contents found in the `polygon_column_name` to the selected `vector_features`

    Args:
      polygon_feature: Polygon feature that will be used to find which features of `vector_features` are contained
        within it.
      polygon_column_name: The name of the column in `polygon_feature` that contains the name/id of each polygon to
        be written to `vector_features`.
      vector_features: The dataframe containing the features that will be grouped by polygon_feature.
      vector_column_name: The name of the column in `vector_features` that will the name/id of each polygon.
      logger: Logger instance

    Returns:
    """
    feature_name = polygon_feature[polygon_column_name]
    logger.info(f"Selecting all vector features that are within {feature_name}")
    selected_features = select_all_within_feature(polygon_feature=polygon_feature, vector_features=vector_features)
    logger.info(f"Writing [{feature_name}] to selected vector features")

    vector_features.loc[selected_features.index, vector_column_name] = vector_features.loc[
        selected_features.index, vector_column_name
    ].apply(lambda s: s | {feature_name})

find_and_write_all_contained_features #

find_and_write_all_contained_features(
    polygon_features: GeoDataFrame,
    polygon_column: str,
    vector_features: GeoDataFrame,
    vector_column_name: str,
    logger=LOGGER,
) -> None

This function make in place changes to vector_geodataframe.

It iterates on all features of a dataframe containing polygons and executes a spatial search with each polygon to find all vector features from vector_features that are contained by it.

The name/id of each polygon is added to a set in a new column in vector_features to identify which features are within which polygon.

To make things simple, this is basically a "group by" operation based on the "within" spatial operator. Each feature in vector_features will have a list of all the polygons that contain it (contain as being completely within the polygon).

Parameters:

Name Type Description Default
polygon_features GeoDataFrame

Dataframes containing polygons. Will be used to find which features of vector_features are contained within which polygon

required
polygon_column str

The name of the column in polygon_features that contains the name/id of each polygon.

required
vector_features GeoDataFrame

The dataframe containing the features that will be grouped by polygon.

required
vector_column_name str

The name of the column in vector_features that will the name/id of each polygon.

required
logger

(Default value = LOGGER)

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
def find_and_write_all_contained_features(
    polygon_features: gpd.GeoDataFrame,
    polygon_column: str,
    vector_features: gpd.GeoDataFrame,
    vector_column_name: str,
    logger=LOGGER,
) -> None:
    """
    This function make in place changes to `vector_geodataframe`.

    It iterates on all features of a dataframe containing polygons and executes a spatial search with each
    polygon to find all vector features from `vector_features` that are contained by it.

    The name/id of each polygon is added to a set in a new column in
    `vector_features` to identify which features are within which polygon.

    To make things simple, this is basically a "group by" operation based on the
    "within" spatial operator. Each feature in `vector_features` will have a list of
    all the polygons that contain it (contain as being completely within the polygon).

    Args:
      polygon_features: Dataframes containing polygons. Will be used to find which features of `vector_features`
        are contained within which polygon
      polygon_column: The name of the column in `polygon_features` that contains the name/id
        of each polygon.
      vector_features: The dataframe containing the features that will be grouped by polygon.
      vector_column_name: The name of the column in `vector_features` that will the name/id of each polygon.
      logger:  (Default value = LOGGER)

    Returns:
    """
    if vector_column_name not in vector_features.columns:
        vector_features[vector_column_name] = [set() for _ in range(len(vector_features))]

    logger.info("Starting process to find and identify contained features")
    polygon_features.apply(
        lambda row: add_and_fill_contained_column(
            polygon_feature=row,
            polygon_column_name=polygon_column,
            vector_features=vector_features,
            vector_column_name=vector_column_name,
        ),
        axis=1,
    )
    vector_features[vector_column_name] = vector_features[vector_column_name].apply(sorted)
    logger.info("Process to find and identify contained features is completed")

spatial_join_within #

spatial_join_within(
    polygon_features: GeoDataFrame,
    polygon_column: str,
    vector_features: GeoDataFrame,
    vector_column_name: str,
    join_type: str = "left",
    predicate: str = "within",
    logger=LOGGER,
) -> GeoDataFrame

This function does approximately the same thing as find_and_write_all_contained_features, but does not make in place changes to vector_features and instead returns a new dataframe.

This function is more efficient than find_and_write_all_contained_features but offers less flexibility.

It does a spatial join based on a within operation between features to associate which vector_features are within which polygon_features, groups the results by vector feature

Parameters:

Name Type Description Default
polygon_features GeoDataFrame

Dataframes containing polygons. Will be used to find which features of vector_features are contained within which polygon

required
polygon_column str

The name of the column in polygon_features that contains the name/id of each polygon.

required
vector_features GeoDataFrame

The dataframe containing the features that will be grouped by polygon.

required
vector_column_name str

The name of the column in vector_features that will contain the name/id of each polygon.

required
join_type str
'left'
predicate str

The predicate to use for the spatial join operation. Defaults to within.

'within'
logger

Logger instance

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
def spatial_join_within(
    polygon_features: gpd.GeoDataFrame,
    polygon_column: str,
    vector_features: gpd.GeoDataFrame,
    vector_column_name: str,
    join_type: str = "left",
    predicate: str = "within",
    logger=LOGGER,
) -> gpd.GeoDataFrame:
    """
    This function does approximately the same thing as `find_and_write_all_contained_features`, but does not make in
    place changes to `vector_features` and instead returns a new dataframe.

    This function is more efficient than `find_and_write_all_contained_features` but offers less flexibility.

    It does a spatial join based on a within operation between features to associate which `vector_features`
    are within which `polygon_features`, groups the results by vector feature

    Args:
      polygon_features: Dataframes containing polygons. Will be used to find which features of `vector_features`
        are contained within which polygon
      polygon_column: The name of the column in `polygon_features` that contains the name/id
        of each polygon.
      vector_features: The dataframe containing the features that will be grouped by polygon.
      vector_column_name: The name of the column in `vector_features` that will contain the name/id of each polygon.
      join_type:
      predicate: The predicate to use for the spatial join operation. Defaults to `within`.
      logger: Logger instance

    Returns:
    """
    temp_feature_id = "feature_id"
    uuid_suffix = str(uuid.uuid4())
    if temp_feature_id in vector_features.columns:
        logger.info("Creating temporary UUID field for join operations")
        temp_feature_id = f"{temp_feature_id}_{uuid_suffix}"
    _generate_uuid_column(df=vector_features, column_name=temp_feature_id)
    logger.info("Starting process to find and identify contained features using spatial 'within' join operation")
    joined_gdf = gpd.sjoin(
        vector_features, polygon_features[[polygon_column, "geometry"]], how=join_type, predicate=predicate
    )
    logger.info("Grouping results")
    grouped_gdf = joined_gdf.groupby(temp_feature_id)[polygon_column].agg(list).reset_index()
    logger.info("Cleaning and merging results")
    features = vector_features.merge(grouped_gdf, on=temp_feature_id, how="left")
    features = features.rename(columns={polygon_column: vector_column_name})
    features.drop(columns=[temp_feature_id], inplace=True)
    features[vector_column_name] = features[vector_column_name].apply(sorted)
    logger.info("Spatial join operation is completed")
    return gpd.GeoDataFrame(features)