Source code for arguslib.radar.locator

from typing import Union
import xarray as xr
import datetime
import pyart
from csat2.locator import FileLocator
import os
import re  # Used to parse scan timestamps from filenames

# TODO: could update these to discriminate the specific radar... Probably a job for when we put the search paths in the config.
vpt_filename_format = "/disk1/Data/{campaign}/radar/L1/{year}{mon:0>2}{day:0>2}/ncas-mobile-ka-band-radar-1_cao_{year}{mon:0>2}{day:0>2}-{hour:0>2}**_vpt_l1_v1.0.0.nc"
rhi_filename_format = "/disk1/Data/{campaign}/radar/L1/{year}{mon:0>2}{day:0>2}/ncas-mobile-ka-band-radar-1_cao_{year}{mon:0>2}{day:0>2}-{hour:0>2}**_rhi_l1_v1.0.0.nc"
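
# For reference, the patterns above expand via str.format, with the "**" kept as
# a wildcard for the locator/glob to resolve. A minimal sketch (the campaign name
# and date below are hypothetical, not part of this module):
#
#     rhi_filename_format.format(campaign="example", year=2022, mon=7, day=15, hour=12)
#     # -> "/disk1/Data/example/radar/L1/20220715/"
#     #    "ncas-mobile-ka-band-radar-1_cao_20220715-12**_rhi_l1_v1.0.0.nc"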


def initialise_locator():
    locator = FileLocator()
    locator.search_paths["ARGUS"] = {}
    locator.search_paths["ARGUS"]["vpt"] = [vpt_filename_format]
    locator.search_paths["ARGUS"]["rhi"] = [rhi_filename_format]
    return locator
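

# Illustrative sketch only (not part of the original module): the locator built by
# initialise_locator is queried with the same positional/keyword arguments that
# RadarData.get_filepath uses below. The campaign name and date are hypothetical.
def _example_locator_search():
    locator = initialise_locator()
    return locator.search(
        "ARGUS",
        "rhi",
        campaign="example_campaign",
        year=2022,
        mon=7,
        day=15,
        hour="**",  # wildcards: match any hour, minute and second of the day
        min="**",
        second="**",
    )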


class RadarData:
    def __init__(self, campaign, scan_type="rhi"):
        self.campaign = campaign
        self.scan_type = scan_type
        self.locator = initialise_locator()
        self.radar_data: Union[pyart.core.Radar, None] = None  # Type hint for clarity
        self.current_file_path: Union[str, None] = None
    def get_gridded_data_time(self, dt, var) -> xr.DataArray:
        if self.scan_type == "vpt":
            return self._get_gridded_vpt_data(dt, var)
        elif self.scan_type == "rhi":
            return self._get_gridded_rhi_data(dt, var)
        else:
            raise ValueError(f"Unknown scan type {self.scan_type}")
    def _get_gridded_rhi_data(self, dt, var) -> xr.DataArray:
        radar = self.get_pyart_radar(dt)  # Uses the caching loader in get_pyart_radar below
        if var not in radar.fields:
            raise ValueError(
                f"Variable {var} not found in radar file: {self.current_file_path}"
            )
        grid = pyart.map.grid_rhi_sweeps(
            radar, roi_func="dist_beam", nb=0.6, bsp=0.3, min_radius=30
        )
        # Assumptions made about beamwidth (nb), beam spacing (bsp) and min_radius:
        # nb and bsp are informed by the radar parameters; min_radius was guessed
        # to look OK.
        return grid[var]

    def _get_gridded_vpt_data(self, dt, var) -> xr.DataArray:
        raise NotImplementedError("VPT data gridding not yet implemented")
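
    # Note on gridding (an assumption, not part of the original code): grid_rhi_sweeps
    # in _get_gridded_rhi_data above is taken to return an object indexable by field
    # name. With stock Py-ART a comparable grid is usually built with
    # pyart.map.grid_from_radars(...) followed by Grid.to_xarray(); similarly named
    # roi_func / nb / bsp / min_radius keywords exist for that route as well.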
    def get_pyart_radar(self, dt) -> pyart.core.Radar:
        # Check the cache first: reuse the currently loaded radar data if it covers dt
        if self.radar_data and self.current_file_path:
            try:
                if (
                    hasattr(self.radar_data, "time")
                    and "data" in self.radar_data.time
                    and self.radar_data.time["data"].size > 0
                ):
                    scan_start_time = pyart.util.datetime_from_radar(self.radar_data)
                    # Duration from the start of the volume to the last ray recorded
                    scan_duration_seconds = (
                        self.radar_data.time["data"][-1]
                        - self.radar_data.time["data"][0]
                    )
                    scan_end_time = scan_start_time + datetime.timedelta(
                        seconds=scan_duration_seconds
                    )
                    if scan_start_time <= dt <= scan_end_time:
                        return self.radar_data
                else:
                    # Cached data is invalid or incomplete; invalidate it
                    self.radar_data = None
                    self.current_file_path = None
            except Exception:
                # Any problem with the cached data invalidates the cache
                self.radar_data = None
                self.current_file_path = None

        # Cache miss or invalid cache: find and load the appropriate file
        filepath = self.get_filepath(dt)
        if filepath is None:
            raise FileNotFoundError(
                f"No radar file found containing the time {dt} for campaign "
                f"{self.campaign}, scan_type {self.scan_type}"
            )

        if filepath != self.current_file_path or self.radar_data is None:
            self.radar_data = pyart.io.read(filepath)
            self.current_file_path = filepath

        # Final safeguard: the loaded file must contain valid time data.
        # (A strict start <= dt <= end containment check was removed here because
        # get_filepath now matches dt exactly, ignoring microseconds, against the
        # start time encoded in the filename.)
        if self.radar_data:
            if not (
                hasattr(self.radar_data, "time")
                and "data" in self.radar_data.time
                and self.radar_data.time["data"].size > 0
            ):
                # Should not happen if pyart.io.read succeeded and the file is valid
                raise ValueError(
                    f"Loaded radar file {filepath} has no valid time data."
                )
        else:
            # Should not happen if a filepath was found
            raise ValueError("Radar data is None after attempting to load file.")

        return self.radar_data
    def get_filepath(self, dt: datetime.datetime) -> Union[str, None]:
        """
        Find the filepath of the radar scan whose start time matches the given
        datetime 'dt' (microseconds are ignored).

        All files for the day of 'dt' are listed, sorted by filename (which
        encodes the scan start time), and the first file whose filename
        timestamp equals 'dt' is returned. Returns None if no file matches.
        """
        all_day_files = self.locator.search(
            "ARGUS",
            self.scan_type,
            campaign=self.campaign,
            year=dt.year,
            mon=dt.month,
            day=dt.day,
            hour="**",  # Search all hours of the day
            min="**",
            second="**",
        )
        if not all_day_files:
            return None

        # Sort by filename so the match is deterministic if several files share
        # the same timestamp.
        for f_path in sorted(all_day_files):
            try:
                # Parse the scan start time (YYYYMMDD-HHMMSS) from the filename
                # and compare it with dt, ignoring microseconds.
                filename_base = os.path.basename(f_path)
                match = re.search(r"(\d{4}\d{2}\d{2}-\d{2}\d{2}\d{2})", filename_base)
                if match:
                    file_start_dt_from_name = datetime.datetime.strptime(
                        match.group(1), "%Y%m%d-%H%M%S"
                    )
                    if file_start_dt_from_name.replace(microsecond=0) == dt.replace(
                        microsecond=0
                    ):
                        return f_path
                # A slower fallback, which opened each candidate file and checked
                # whether its actual scan start/end times contained dt, has been
                # disabled in favour of the exact filename match above.
            except FileNotFoundError:
                # A file listed by the locator could not be found; skip it
                continue
            except Exception:
                # Errors while parsing/processing a file (e.g. corrupted); skip it
                continue
        return None  # No suitable file found
    def get_next_time(self, dt, max_gap_hrs=48):
        """
        Get the start time of the next available radar file after 'dt', or None
        if the gap to the next file exceeds 'max_gap_hrs'.

        This method may need review if "next time" should account for scans
        crossing hour boundaries more explicitly; for now it returns the next
        *file start time* parsed from the filenames.
        """
        # List all files for the day of dt
        files = self.locator.search(
            "ARGUS",
            self.scan_type,
            campaign=self.campaign,
            year=dt.year,
            mon=dt.month,
            day=dt.day,
            hour="**",  # Search all hours
            min="**",
            second="**",
        )
        files = sorted(files)

        dates = []
        for f_path in files:
            try:
                # Parse the start time (YYYYMMDD-HHMMSS) from the filename
                filename_base = os.path.basename(f_path)
                match = re.search(r"(\d{4}\d{2}\d{2}-\d{2}\d{2}\d{2})", filename_base)
                if match:
                    file_start_dt = datetime.datetime.strptime(
                        match.group(1), "%Y%m%d-%H%M%S"
                    )
                    dates.append(file_start_dt)
            except ValueError:
                # Skip filenames whose timestamp cannot be parsed
                pass

        for date_obj in dates:
            if date_obj > dt:
                # Found the next file; check that the gap is within max_gap_hrs
                if (date_obj - dt).total_seconds() / 3600 > max_gap_hrs:
                    return None  # Gap is too large
                return date_obj  # Start time of the next file

        # No file today starts after dt: try the next day recursively, reducing
        # the allowed gap by the time skipped to reach midnight.
        start_of_next_day = (dt + datetime.timedelta(days=1)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        time_to_next_day_hrs = (start_of_next_day - dt).total_seconds() / 3600
        remaining_max_gap_hrs = max_gap_hrs - time_to_next_day_hrs
        if remaining_max_gap_hrs < 0:
            return None
        return self.get_next_time(start_of_next_day, remaining_max_gap_hrs)
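

# Illustrative usage sketch only (not part of the original module). The campaign
# name, timestamp and field name below are hypothetical; note that get_filepath
# matches dt exactly (ignoring microseconds) against the start time encoded in
# the filename.
if __name__ == "__main__":
    radar_data = RadarData(campaign="example_campaign", scan_type="rhi")
    example_dt = datetime.datetime(2022, 7, 15, 12, 0, 0)

    scan_path = radar_data.get_filepath(example_dt)
    if scan_path is not None:
        # Gridded RHI field for the scan starting at example_dt, as an xr.DataArray
        gridded_field = radar_data.get_gridded_data_time(example_dt, "DBZ")
        print(scan_path, gridded_field.shape)

    # Start time of the next file after example_dt (None if the gap exceeds 48 h)
    print(radar_data.get_next_time(example_dt))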