Skip to content

radar

nimrod #

extract_nimrod_from_archive #

extract_nimrod_from_archive(
    archive_file_path: str | Path, output_directory: str | Path = None
) -> Path

Extract nimrod data from an archive file. If no output directory is provided, the extracted data will be saved to the archive file's directory.

Parameters:

Name Type Description Default
archive_file_path str | Path

Path to the archive file

required
output_directory str | Path

Optional output directory.

None

Returns:

Type Description
Path

Path to the extracted nimrod data file

Source code in geospatial_tools/radar/nimrod.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def extract_nimrod_from_archive(archive_file_path: str | Path, output_directory: str | Path | None = None) -> Path:
    """
    Extract nimrod data from a gzip archive file.

    If no output directory is provided, the extracted data is saved to a
    sub-directory of the archive file's directory, named after the archive
    file's stem.

    Args:
        archive_file_path: Path to the gzip archive file
        output_directory: Optional output directory.

    Returns:
        Path to the extracted nimrod data file
    """
    archive_file_path = Path(archive_file_path)
    full_path = archive_file_path.resolve()
    filename = archive_file_path.stem

    if output_directory:
        target_folder = Path(output_directory)
    else:
        # Default: extract next to the archive, in a folder named after it.
        target_folder = archive_file_path.parent / filename

    target_folder.mkdir(parents=True, exist_ok=True)

    # The gzip header may record the original (uncompressed) filename;
    # fall back to the archive's stem when it does not.
    gzip_file_headers = parse_gzip_header(archive_file_path)
    contained_filename = gzip_file_headers["original_name"] or filename

    out_path = target_folder / contained_filename
    with gzip.open(full_path, "rb") as f_in, open(out_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)

    return out_path

load_nimrod_cubes #

load_nimrod_cubes(filenames: list[str | Path]) -> Generator[Cube | Any, Any, None]

Parameters:

Name Type Description Default
filenames list[str | Path]

List of nimrod files

required

Returns:

Type Description
None

Generator of cubes

Source code in geospatial_tools/radar/nimrod.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def load_nimrod_cubes(filenames: list[str | Path]) -> Generator[Cube | Any, Any, None]:
    """
    Lazily load iris cubes from a list of nimrod files.

    Args:
        filenames: List of nimrod files

    Returns:
        Generator of cubes
    """
    # iris' load_cubes may expect plain string paths, so coerce first.
    string_paths = [str(name) for name in filenames]
    return load_cubes(string_paths)

load_nimrod_from_archive #

load_nimrod_from_archive(filename: str | Path) -> Generator[Cube | Any, Any, None]

Parameters:

Name Type Description Default
filename str | Path

Path to the archive file

required

Returns:

Type Description
None

Generator of cubes

Source code in geospatial_tools/radar/nimrod.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def load_nimrod_from_archive(filename: str | Path) -> Generator[Cube | Any, Any, None]:
    """
    Extract a nimrod archive and load its contents as iris cubes.

    Args:
        filename: Path to the archive file

    Returns:
        Generator of cubes
    """
    # Extraction yields a single data file; load that one file.
    extracted_path = extract_nimrod_from_archive(filename)
    return load_nimrod_cubes([extracted_path])

merge_nimrod_cubes #

merge_nimrod_cubes(cubes: list[Cube]) -> Cube

Parameters:

Name Type Description Default
cubes list[Cube]

List of cubes to merge

required

Returns:

Type Description
Cube

Merged cube

Source code in geospatial_tools/radar/nimrod.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def merge_nimrod_cubes(cubes: list[Cube]) -> Cube:
    """
    Merge a list of nimrod cubes into a single cube.

    Args:
        cubes: List of cubes to merge

    Returns:
        Merged cube
    """
    return CubeList(cubes).merge_cube()

mean_nimrod_cubes #

mean_nimrod_cubes(merged_cubes: Cube) -> Cube

Parameters:

Name Type Description Default
merged_cubes Cube

Merged cube

required

Returns:

Type Description
Cube

Mean cube

Source code in geospatial_tools/radar/nimrod.py
109
110
111
112
113
114
115
116
117
118
119
def mean_nimrod_cubes(merged_cubes: Cube) -> Cube:
    """
    Collapse a merged nimrod cube along its time coordinate using a mean.

    Args:
        merged_cubes: Merged cube

    Returns:
        Mean cube
    """
    time_mean = merged_cubes.collapsed("time", MEAN)
    return time_mean

write_cube_to_file #

write_cube_to_file(cube: Cube, output_name: str | Path) -> None

Save a nimrod cube to a Netcdf file.

Parameters:

Name Type Description Default
cube Cube

Cube to save

required
output_name str | Path

Output filename

required
Source code in geospatial_tools/radar/nimrod.py
122
123
124
125
126
127
128
129
130
def write_cube_to_file(cube: Cube, output_name: str | Path) -> None:
    """
    Save a nimrod cube to a NetCDF file.

    Thin wrapper around iris' ``netcdf.save``; the file is written to
    ``output_name``, overwriting any existing file at that path.

    Args:
        cube: Cube to save
        output_name: Output filename
    """
    netcdf.save(cube, output_name)

assert_dataset_time_dim_is_valid #

assert_dataset_time_dim_is_valid(
    dataset: Dataset, time_dimension_name: str = "time"
) -> None
This function checks that the time dimension of a given dataset:
  • Is composed of 5-minute time bins - which is the native Nimrod format
  • Contains a continuous time series, without any holes - which would lead to false statistics when resampling

Parameters:

Name Type Description Default
dataset Dataset

Merged nimrod cube

required
time_dimension_name str

Name of the time dimension

'time'

Returns:

Type Description
None

None. Raises an AssertionError if the time bins are not 5 minutes long
or if there are gaps in the time series.

Source code in geospatial_tools/radar/nimrod.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def assert_dataset_time_dim_is_valid(dataset: xr.Dataset, time_dimension_name: str = "time") -> None:
    """
    Validate the time dimension of a dataset.

    This function checks that the time dimension of a given dataset:
        - Is sorted ascending and contains no duplicate timestamps
        - Is composed of 5-minute time bins - which is the native Nimrod format
        - Contains a continuous time series, without any holes - which would
          lead to false statistics when resampling

    Args:
        dataset: Merged nimrod dataset
        time_dimension_name: Name of the time dimension

    Raises:
        AssertionError: If the time dimension is not sorted ascending, contains
            duplicate timestamps, has steps different from 5 minutes, or is
            missing timestamps from the expected continuous range.
    """
    dataset_time_dimension = dataset[time_dimension_name]
    # Compute the pandas index once instead of re-deriving it for each check.
    time_index = dataset_time_dimension.to_index()

    if not time_index.is_monotonic_increasing:
        raise AssertionError("Time is not sorted ascending")
    if not time_index.is_unique:
        duplicates = time_index[time_index.duplicated(keep=False)]
        raise AssertionError(f"Duplicate timestamps present: {duplicates[:10]} ...")

    # Every consecutive step must be exactly 5 minutes (native Nimrod cadence).
    difference_between_timesteps = dataset_time_dimension.diff(time_dimension_name)
    if (difference_between_timesteps != FIVE_MIN).any():
        larger_time_gaps = np.nonzero((difference_between_timesteps != FIVE_MIN).compute().to_numpy())[0][:5]
        raise AssertionError(
            f"Non-5min gaps at positions {larger_time_gaps} "
            f"(examples: {difference_between_timesteps.isel({time_dimension_name: larger_time_gaps}).to_numpy()})"
        )

    # Cross-check against a fully continuous 5-minute range over the same span.
    start = pd.Timestamp(dataset_time_dimension.to_numpy()[0])
    end = pd.Timestamp(dataset_time_dimension.to_numpy()[-1])
    expected_index = pd.date_range(start=start, end=end, freq="5min", inclusive="both")
    missing_indexes = expected_index.difference(time_index)
    if len(missing_indexes) > 0:
        raise AssertionError(f"missing {len(missing_indexes)} stamps; first few: {missing_indexes[:10]}")

resample_nimrod_timebox_30min_bins #

resample_nimrod_timebox_30min_bins(
    filenames: list[str | Path], output_name: str | Path
) -> str | Path

This will resample nimrod data's bins to 30-minute intervals instead of their normal 5-minute interval. It uses a mean resampling, and creates time bins as follows:

ex. [[09h00, < 9h05], [09h05, < 9h10], ... ] -> [[09h00, < 9h30], [09h30, < 10h], ... ]

Parameters:

Name Type Description Default
filenames list[str | Path]

List of netcdf nimrod files

required
output_name str | Path

Output filename

required

Returns:

Type Description
str | Path

Path to the output file

Source code in geospatial_tools/radar/nimrod.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def resample_nimrod_timebox_30min_bins(filenames: list[str | Path], output_name: str | Path) -> str | Path:
    """
    This will resample nimrod data's bins to 30-minute interval instead of their
    normal 5-minute interval. It uses a mean resampling, and creates time bins
    as follows:

    ex. [[09h00, < 9h05], [09h05, < 9h10], ... ] -> [[09h00, < 9h30], [09h30, < 10h], ... ]

    Args:
        filenames: List of netcdf nimrod files
        output_name: Output filename

    Returns:
        Path to the output file
    """
    # Context manager ensures the file handles opened by open_mfdataset are
    # released even if resampling or writing fails (original leaked them).
    with xr.open_mfdataset(filenames, combine="nested", concat_dim="time") as ds:
        ds_30min = ds.resample(time="30min").mean()
        ds_30min.to_netcdf(output_name)
    return output_name