Skip to content

vector

This module contains functions that process or create vector data.

create_grid_coordinates #

create_grid_coordinates(
    bounding_box: list | tuple | ndarray, grid_size: float, logger: Logger = LOGGER
) -> tuple[ndarray, ndarray]

Create grid coordinates based on input bounding box and grid size.

Parameters:

Name Type Description Default
bounding_box list | tuple | ndarray

The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat). Unit needs to be based on projection used (meters, degrees, etc.).

required
grid_size float

Cell size for grid. Unit needs to be based on projection used (meters, degrees, etc.).

required
logger Logger

Logger instance.

LOGGER

Returns:

Type Description
tuple[ndarray, ndarray]

A tuple containing two numpy arrays for longitude and latitude coordinates.

Source code in src/geospatial_tools/vector.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def create_grid_coordinates(
    bounding_box: list | tuple | ndarray, grid_size: float, logger: logging.Logger = LOGGER
) -> tuple[ndarray, ndarray]:
    """
    Create grid coordinates based on input bounding box and grid size.

    Args:
      bounding_box: The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).
        Unit needs to be based on projection used (meters, degrees, etc.).
      grid_size: Cell size for grid. Unit needs to be based on projection used (meters, degrees, etc.).
      logger: Logger instance.

    Returns:
      A tuple containing two numpy arrays for longitude and latitude coordinates.
    """
    logger.info(f"Creating grid coordinates for bounding box [{bounding_box}]")
    min_lon, min_lat, max_lon, max_lat = bounding_box
    lon_coords = np.arange(min_lon, stop=max_lon, step=grid_size)
    lat_coords = np.arange(min_lat, stop=max_lat, step=grid_size)
    return lon_coords, lat_coords

generate_flattened_grid_coords #

generate_flattened_grid_coords(
    lon_coords: ndarray, lat_coords: ndarray, logger: Logger = LOGGER
) -> tuple[ndarray, ndarray]

Takes in previously created grid coordinates and flattens them.

Parameters:

Name Type Description Default
lon_coords ndarray

Longitude grid coordinates

required
lat_coords ndarray

Latitude grid coordinates

required
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def generate_flattened_grid_coords(
    lon_coords: ndarray, lat_coords: ndarray, logger: logging.Logger = LOGGER
) -> tuple[ndarray, ndarray]:
    """
    Takes in previously created grid coordinates and flattens them.

    Args:
      lon_coords: Longitude grid coordinates
      lat_coords: Latitude grid coordinates
      logger: Logger instance.

    Returns:
    """

    logger.info("Creating flattened grid coordinates")
    lon_grid, lat_grid = np.meshgrid(lon_coords, lat_coords)
    lon_grid = lon_grid.flatten()
    lat_grid = lat_grid.flatten()
    return lon_grid, lat_grid

create_vector_grid #

create_vector_grid(
    bounding_box: list | tuple,
    grid_size: float,
    crs: str = "4326",
    logger: Logger = LOGGER,
) -> GeoDataFrame

Create a grid of polygons within the specified bounds and cell size. This function uses NumPy vectorized arrays for optimized performance.

Parameters:

Name Type Description Default
bounding_box list | tuple

The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).

required
grid_size float

The size of each grid cell in degrees.

required
crs str

CRS code for projection. ex. 'EPSG:4326'

'4326'
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def create_vector_grid(
    bounding_box: list | tuple, grid_size: float, crs: str = "4326", logger: logging.Logger = LOGGER
) -> GeoDataFrame:
    """
    Create a grid of polygons within the specified bounds and cell size. This function uses NumPy vectorized arrays for
    optimized performance.

    Args:
      bounding_box: The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).
      grid_size: The size of each grid cell in degrees.
      crs: CRS code for projection. ex. 'EPSG:4326'
      logger: Logger instance.

    Returns:
    """
    lon_coords, lat_coords = create_grid_coordinates(bounding_box=bounding_box, grid_size=grid_size, logger=logger)
    lon_flat_grid, lat_flat_grid = generate_flattened_grid_coords(
        lat_coords=lat_coords, lon_coords=lon_coords, logger=logger
    )

    num_cells = len(lon_flat_grid)
    logger.info(f"Allocating polygon array for [{num_cells}] polygons")
    polygons = np.empty(num_cells, dtype=object)

    for i in range(num_cells):
        x, y = lon_flat_grid[i], lat_flat_grid[i]
        polygons[i] = Polygon([(x, y), (x + grid_size, y), (x + grid_size, y + grid_size), (x, y + grid_size)])

    properties: dict[str, Any] = {"data": {"geometry": polygons}}
    if crs:
        properties["crs"] = crs
    grid = GeoDataFrame(**properties)
    _ = grid.sindex
    _generate_uuid_column(grid)
    return grid

create_vector_grid_parallel #

create_vector_grid_parallel(
    bounding_box: list | tuple | ndarray,
    grid_size: float,
    crs: str | int | None = None,
    num_of_workers: int | None = None,
    logger: Logger = LOGGER,
) -> GeoDataFrame

Create a grid of polygons within the specified bounds and cell size. This function uses NumPy for optimized performance and ProcessPoolExecutor for parallel execution.

Parameters:

Name Type Description Default
bounding_box list | tuple | ndarray

The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).

required
grid_size float

The size of each grid cell in degrees.

required
crs str | int | None

Coordinate reference system for the resulting GeoDataFrame.

None
num_of_workers int | None

The number of processes to use for parallel execution. Defaults to the min of number of CPU cores or number of cells in the grid

None
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def create_vector_grid_parallel(
    bounding_box: list | tuple | ndarray,
    grid_size: float,
    crs: str | int | None = None,
    num_of_workers: int | None = None,
    logger: logging.Logger = LOGGER,
) -> GeoDataFrame:
    """
    Create a grid of polygons within the specified bounds and cell size. This function uses NumPy for optimized
    performance and ProcessPoolExecutor for parallel execution.

    Args:
      bounding_box: The bounding box of the grid as (min_lon, min_lat, max_lon, max_lat).
      grid_size: The size of each grid cell in degrees.
      crs: Coordinate reference system for the resulting GeoDataFrame.
      num_of_workers: The number of processes to use for parallel execution. Defaults to the min of number of CPU cores
        or number of cells in the grid
      logger: Logger instance.

    Returns:
    """
    lon_coords, lat_coords = create_grid_coordinates(bounding_box=bounding_box, grid_size=grid_size, logger=logger)
    lon_flat_grid, lat_flat_grid = generate_flattened_grid_coords(
        lat_coords=lat_coords, lon_coords=lon_coords, logger=logger
    )

    num_cells = len(lon_flat_grid)
    workers = min(cpu_count(), num_cells)
    if num_of_workers:
        workers = num_of_workers

    logger.info(f"Number of workers used: {workers}")
    logger.info(f"Allocating polygon array for [{num_cells}] polygons")

    chunk_size = (num_cells + workers - 1) // workers
    chunks = [
        (lon_flat_grid[i : i + chunk_size], lat_flat_grid[i : i + chunk_size], grid_size)
        for i in range(0, num_cells, chunk_size)
    ]

    polygons = []
    logger.info("Creating polygons from chunks")
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = executor.map(_create_polygons_from_coords_chunk, chunks)
        for result in results:
            polygons.extend(result)

    logger.info("Managing properties")
    properties: dict[str, Any] = {"data": {"geometry": polygons}}
    if crs:
        projection = create_crs(crs)
        properties["crs"] = projection
    grid: GeoDataFrame = GeoDataFrame(**properties)
    logger.info("Creating spatial index")
    _ = grid.sindex
    logger.info("Generating polygon UUIDs")
    _generate_uuid_column(grid)
    return grid

dask_spatial_join #

dask_spatial_join(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    join_type: str = "inner",
    predicate: str = "intersects",
    num_of_workers=4,
    logger: Logger = LOGGER,
) -> GeoDataFrame

Parameters:

Name Type Description Default
select_features_from GeoDataFrame
required
intersected_with GeoDataFrame
required
join_type str
'inner'
predicate str
'intersects'
num_of_workers
4
logger Logger
LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def dask_spatial_join(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    join_type: str = "inner",
    predicate: str = "intersects",
    num_of_workers=4,
    logger: logging.Logger = LOGGER,
) -> GeoDataFrame:
    """

    Args:
      select_features_from:
      intersected_with:
      join_type:
      predicate:
      num_of_workers:
      logger:

    Returns:


    """
    dask_select_gdf = dgpd.from_geopandas(select_features_from, npartitions=num_of_workers)
    dask_intersected_gdf = dgpd.from_geopandas(intersected_with, npartitions=1)
    logger.info("Concatenating results")
    result = dgpd.sjoin(dask_select_gdf, dask_intersected_gdf, how=join_type, predicate=predicate).compute()
    result = GeoDataFrame(result)
    logger.info("Creating spatial index")
    _ = result.sindex

    return result

select_polygons_by_location #

select_polygons_by_location(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    num_of_workers: int | None = None,
    join_type: str = "inner",
    predicate="intersects",
    join_function=dask_spatial_join,
    logger: Logger = LOGGER,
) -> GeoDataFrame

This function executes a select by location operation on a GeoDataFrame. It is essentially a wrapper around gpd.sjoin to allow parallel execution. While it does use sjoin, only the columns from select_features_from are kept.

Parameters:

Name Type Description Default
select_features_from GeoDataFrame

GeoDataFrame containing the polygons from which to select features from.

required
intersected_with GeoDataFrame

Geodataframe containing the polygons that will be used to select features with via an intersect operation.

required
num_of_workers int | None

Number of parallel processes to use for execution. If using on a compute cluster, please set a specific amount (ex. 1 per CPU core requested). Defaults to the min of number of CPU cores or number (cpu_count())

None
join_type str
'inner'
predicate

The predicate to use for selecting features from. Available predicates are: ['intersects', 'contains', 'within', 'touches', 'crosses', 'overlaps']. Defaults to 'intersects'

'intersects'
join_function

Function that will execute the join operation. Available functions are: 'multiprocessor_spatial_join'; 'dask_spatial_join'; or custom functions. (Default value = multiprocessor_spatial_join)

dask_spatial_join
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def select_polygons_by_location(
    select_features_from: GeoDataFrame,
    intersected_with: GeoDataFrame,
    num_of_workers: int | None = None,
    join_type: str = "inner",
    predicate="intersects",
    join_function=dask_spatial_join,
    logger: logging.Logger = LOGGER,
) -> GeoDataFrame:
    """
    This function executes a `select by location` operation on a GeoDataFrame. It is essentially a wrapper around
    `gpd.sjoin` to allow parallel execution. While it does use `sjoin`, only the columns from `select_features_from` are
    kept.

    Args:
      select_features_from: GeoDataFrame containing the polygons from which to select features from.
      intersected_with: Geodataframe containing the polygons that will be used to select features with via an intersect
        operation.
      num_of_workers: Number of parallel processes to use for execution. If using
        on a compute cluster, please set a specific amount (ex. 1 per CPU core requested).
        Defaults to the min of number of CPU cores
        or number (cpu_count())
      join_type:
      predicate: The predicate to use for selecting features from. Available predicates are:
        ['intersects', 'contains', 'within', 'touches', 'crosses', 'overlaps']. Defaults to 'intersects'
      join_function: Function that will execute the join operation. Available functions are:
        'multiprocessor_spatial_join'; 'dask_spatial_join'; or custom functions.
        (Default value = multiprocessor_spatial_join)
      logger: Logger instance.

    Returns:
    """
    workers = min(cpu_count(), 4)
    if num_of_workers:
        workers = num_of_workers
    logger.info(f"Number of workers used: {workers}")

    intersecting_polygons = join_function(
        select_features_from=select_features_from,
        intersected_with=intersected_with,
        join_type=join_type,
        predicate=predicate,
        num_of_workers=num_of_workers,
    )
    logger.info("Filtering columns of the results")
    filtered_result_gdf = intersecting_polygons.drop(columns=intersecting_polygons.filter(like="_right").columns)
    column_list_to_filter = [item for item in intersected_with.columns if item not in select_features_from.columns]
    conserved_columns = [col for col in filtered_result_gdf.columns if col not in column_list_to_filter]
    filtered_result_gdf = filtered_result_gdf[conserved_columns]  # pylint: disable=E1136

    return filtered_result_gdf

to_geopackage #

to_geopackage(gdf: GeoDataFrame, filename: str | Path, logger=LOGGER) -> str | Path

Save GeoDataFrame to a Geopackage file.

Parameters:

Name Type Description Default
gdf GeoDataFrame

The GeoDataFrame to save.

required
filename str | Path

The filename to save to.

required
logger

Logger instance (Default value = LOGGER)

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def to_geopackage(gdf: GeoDataFrame, filename: str | Path, logger=LOGGER) -> str | Path:
    """
    Save GeoDataFrame to a Geopackage file.

    Args:
      gdf: The GeoDataFrame to save.
      filename: The filename to save to.
      logger: Logger instance (Default value = LOGGER)

    Returns:
    """
    start = time.time()
    logger.info("Starting writing process")
    if isinstance(gdf, pd.DataFrame):
        gdf = GeoDataFrame(gdf)
    gdf.to_file(filename, driver=GEOPACKAGE_DRIVER, mode="w")
    stop = time.time()
    logger.info(f"File [{filename}] took {stop - start} seconds to write.")

    return filename

to_geopackage_chunked #

to_geopackage_chunked(
    gdf: GeoDataFrame, filename: str, chunk_size: int = 1000000, logger: Logger = LOGGER
) -> str

Save GeoDataFrame to a Geopackage file using chunks to help with potential memory consumption. This function can potentially be slower than to_geopackage, especially if chunk_size is not adequately defined. Therefore, this function should only be required if to_geopackage fails because of memory issues.

Parameters:

Name Type Description Default
gdf GeoDataFrame

The GeoDataFrame to save.

required
filename str

The filename to save to.

required
chunk_size int

The number of rows per chunk.

1000000
logger Logger

Logger instance.

LOGGER

Returns:

Source code in src/geospatial_tools/vector.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
def to_geopackage_chunked(
    gdf: GeoDataFrame, filename: str, chunk_size: int = 1000000, logger: logging.Logger = LOGGER
) -> str:
    """
    Save GeoDataFrame to a Geopackage file using chunks to help with potential memory consumption. This function can
    potentially be slower than `to_geopackage`, especially if `chunk_size` is not adequately defined. Therefore, this
    function should only be required if `to_geopackage` fails because of memory issues.

    Args:
      gdf: The GeoDataFrame to save.
      filename: The filename to save to.
      chunk_size: The number of rows per chunk.
      logger: Logger instance.

    Returns:
    """
    filename_path = Path(filename)
    if filename_path.exists():
        filename_path.unlink()

    start = time.time()
    logger.info("Starting writing process")
    logger.info(f"Chunk size used : [{chunk_size}]")
    chunk = gdf.iloc[0:chunk_size]
    chunk.to_file(filename, driver=GEOPACKAGE_DRIVER, mode="w")

    for i in range(chunk_size, len(gdf), chunk_size):
        chunk = gdf.iloc[i : i + chunk_size]
        chunk.to_file(filename, driver=GEOPACKAGE_DRIVER, mode="a")

    stop = time.time()
    logger.info(f"File [{filename}] took {stop - start} seconds to write.")

    return filename

spatial_join_within #

spatial_join_within(
    polygon_features: GeoDataFrame,
    polygon_column: str,
    vector_features: GeoDataFrame,
    vector_column_name: str,
    join_type: str = "left",
    predicate: str = "within",
    logger=LOGGER,
) -> GeoDataFrame

This function does a spatial join based on a within operation between features to associate which vector_features are within which polygon_features, groups the results by vector feature.

Parameters:

Name Type Description Default
polygon_features GeoDataFrame

Dataframes containing polygons. Will be used to find which features of vector_features are contained within which polygon

required
polygon_column str

The name of the column in polygon_features that contains the name/id of each polygon.

required
vector_features GeoDataFrame

The dataframe containing the features that will be grouped by polygon.

required
vector_column_name str

The name of the column in vector_features that will contain the name/id of each polygon.

required
join_type str

The type of join to perform. Defaults to 'left'.

'left'
predicate str

The predicate to use for the spatial join operation. Defaults to within.

'within'
logger

Logger instance

LOGGER

Returns:

Type Description
GeoDataFrame

A new GeoDataFrame with the joined features.

Source code in src/geospatial_tools/vector.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def spatial_join_within(
    polygon_features: gpd.GeoDataFrame,
    polygon_column: str,
    vector_features: gpd.GeoDataFrame,
    vector_column_name: str,
    join_type: str = "left",
    predicate: str = "within",
    logger=LOGGER,
) -> gpd.GeoDataFrame:
    """
    This function does a spatial join based on a within operation between features to associate which `vector_features`
    are within which `polygon_features`, groups the results by vector feature.

    Args:
      polygon_features: Dataframes containing polygons. Will be used to find which features of `vector_features`
        are contained within which polygon
      polygon_column: The name of the column in `polygon_features` that contains the name/id
        of each polygon.
      vector_features: The dataframe containing the features that will be grouped by polygon.
      vector_column_name: The name of the column in `vector_features` that will contain the name/id of each polygon.
      join_type: The type of join to perform. Defaults to 'left'.
      predicate: The predicate to use for the spatial join operation. Defaults to `within`.
      logger: Logger instance

    Returns:
      A new GeoDataFrame with the joined features.
    """
    temp_feature_id = "feature_id"
    uuid_suffix = str(uuid.uuid4())
    if temp_feature_id in vector_features.columns:
        logger.info("Creating temporary UUID field for join operations")
        temp_feature_id = f"{temp_feature_id}_{uuid_suffix}"
    _generate_uuid_column(df=vector_features, column_name=temp_feature_id)
    logger.info("Starting process to find and identify contained features using spatial 'within' join operation")
    joined_gdf = gpd.sjoin(
        vector_features, polygon_features[[polygon_column, "geometry"]], how=join_type, predicate=predicate
    )
    logger.info("Grouping results")
    grouped_gdf = joined_gdf.groupby(temp_feature_id)[polygon_column].agg(list).reset_index()
    logger.info("Cleaning and merging results")
    features = vector_features.merge(grouped_gdf, on=temp_feature_id, how="left")
    features = features.rename(columns={polygon_column: vector_column_name})
    features = features.drop(columns=[temp_feature_id])
    features[vector_column_name] = features[vector_column_name].apply(sorted)
    logger.info("Spatial join operation is completed")
    return gpd.GeoDataFrame(features)