Source code for arguslib.radar.radar

from arguslib.instruments.instruments import Instrument, Position
from arguslib.misc.plotting import TimestampedFigure, plot_beam
from arguslib.misc.interpolation import interpolate_to_intersection

import datetime
import numpy as np
import pyart
import yaml


from pathlib import Path
from typing_extensions import override


[docs] class Radar(Instrument): def __init__(self, beamwidth, *args, **kwargs): self.beamwidth = beamwidth super().__init__(*args, **kwargs)
[docs] def beam(self, radar_elevation, radar_azimuth, radar_distances): if self.beamwidth: return np.array( [ [ self.position.ead_to_lla( radar_elevation, radar_azimuth, radar_dist ), self.position.ead_to_lla( radar_elevation + self.beamwidth / 2, radar_azimuth, radar_dist, ), self.position.ead_to_lla( radar_elevation, radar_azimuth + self.beamwidth / 2, radar_dist, ), self.position.ead_to_lla( radar_elevation - self.beamwidth / 2, radar_azimuth, radar_dist, ), self.position.ead_to_lla( radar_elevation, radar_azimuth - self.beamwidth / 2, radar_dist, ), ] for radar_dist in radar_distances ] ) else: return np.array( [ [self.ead_to_lla(radar_elevation, radar_azimuth, radar_dist)] for radar_dist in radar_distances ] )
[docs] @classmethod def from_config( cls, campaign, **kwargs ): # TODO: make from_config a method of Instrument...? from arguslib.config import load_config radars = load_config("radars.yml") radar_config = radars[campaign] kwargs = { "beamwidth": radar_config["beamwidth"], "position": Position(*radar_config["position"]), "rotation": radar_config["rotation"], } | kwargs # will ignore config if kwargs contains any of the keys in camera_config kwargs |= {"campaign": campaign} return cls(**kwargs)
[docs] @override def initialise_data_loader(self): from . import RadarData self.data_loader = RadarData(self.attrs["campaign"], "rhi")
@override def _show(self, dt, var, ax=None, kwargs_beam={}, **kwargs): import matplotlib.pyplot as plt if ax is None: _, ax = plt.subplots(FigureClass=TimestampedFigure, timestamp=dt) radar = self.data_loader.get_pyart_radar(dt) display = pyart.graph.RadarDisplay(radar) kwargs = { "vmin": -60, "vmax": 40, "reverse_xaxis": False, # explicitly False to avoid flipping when all distances are negative. } | kwargs display.plot(var, ax=ax, **kwargs) ax.set_aspect("equal") # elevs = radar.elevation["data"] elev_azi_start = radar.elevation["data"][0], radar.azimuth["data"][0] elev_azi_end = radar.elevation["data"][-1], radar.azimuth["data"][-1] plot_beam( self, self, elev_azi_start, dt=dt, ax=ax, color="darkgreen", **kwargs_beam ) plot_beam( self, self, elev_azi_end, dt=dt, ax=ax, color="limegreen", **kwargs_beam ) return ax
[docs] def annotate_positions( self, positions, dt, ax, *args, plotting_method=None, label=None, **kwargs ): # **THE FIX**: Check for list, tuple, or numpy array, and check for length. if not isinstance(positions, (list, tuple, np.ndarray)) or len(positions) == 0: return interpolation_data = kwargs.pop("interpolation_data", None) # project the positions to the xy plane... xlims = ax.get_xlim() if dt is None: raise ValueError( "dt must be provided for radar positions to get the azimuth" ) # Vectorized call to get elevation, azimuth, and distance for all points eads = self.position.target_ead(positions) if eads.ndim == 1: eads = eads.reshape(1, -1) elevs, azimuths, dists = eads[:, 0], eads[:, 1], eads[:, 2] azimuth = self.data_loader.get_pyart_radar(dt).azimuth["data"][0] # Calculate angular separation and perpendicular offset from the scan plane theta_seps = azimuths % 360 - azimuth % 360 offsets = dists * np.sin(np.deg2rad(theta_seps)) # Calculate height (y) and horizontal range (x) for the RHI plot ys = dists * np.sin(np.deg2rad(elevs)) xs = dists * np.cos(np.deg2rad(elevs)) * np.cos(np.deg2rad(theta_seps)) # xaxis runs west to east, so if azimuth % 360 > 180, then the x axis is flipped if azimuth > 90 and azimuth < 270: xs = -xs plot_filter = (np.abs(offsets) < 2.5) & (xs > xlims[0]) & (xs < xlims[1]) if not plot_filter.any(): return filtered_xs = xs[plot_filter] filtered_ys = ys[plot_filter] if plotting_method is None: ax.plot(filtered_xs, filtered_ys, *args, label=label, **kwargs) elif plotting_method == "sized_plot": # vary the size based on the offset filtered_offsets = offsets[plot_filter] sizes = 1 / np.clip(np.abs(filtered_offsets), 0.1, 5) * 10 ax.plot( filtered_xs, filtered_ys, *args, label=label, **kwargs, ) ax.scatter( filtered_xs, filtered_ys, s=sizes, *args, **kwargs, ) else: getattr(ax, plotting_method)(filtered_xs, filtered_ys, *args, **kwargs) ax.set_xlim(xlims)
[docs] def annotate_intersections(self, positions, ages, dt, ax, **kwargs): """Calculates and annotates where a given path intersects the radar scan.""" (dt_start, dt_end) = kwargs.pop("time_bounds", (None, None)) if dt is None: raise ValueError("dt must be provided for radar positions") # A single vectorized call now handles all positions efficiently. eads = self.position.target_ead(positions) # Use direct numpy slicing for clarity and performance. elevs = eads[:, 0] azimuths = eads[:, 1] dists = eads[:, 2] pyart_radar = self.data_loader.get_pyart_radar(dt) azimuth = pyart_radar.azimuth["data"][0] theta_seps = azimuths % 360 - azimuth % 360 offsets = dists * np.sin(np.deg2rad(theta_seps)) ys = dists * np.sin(np.deg2rad(elevs)) xs = dists * np.cos(np.deg2rad(elevs)) * np.cos(np.deg2rad(theta_seps)) if azimuth > 90 and azimuth < 270: xs = -xs if dt_start is not None: # get the elev at the two dtimes radar_elevs = pyart_radar.elevation["data"] radar_times = pyart_radar.time["data"] try: start_elev = np.interp( dt_start.hour * 60 * 60 + dt_start.minute * 60 + dt_start.second + dt_start.microsecond * 1e-6, radar_times, radar_elevs, ) except: start_elev = radar_elevs[np.argmin(radar_times)] try: end_elev = np.interp( dt_end.hour * 60 * 60 + dt_end.minute * 60 + dt_end.second + dt_end.microsecond * 1e-6, radar_times, radar_elevs, ) except: end_elev = radar_elevs[np.argmax(radar_times)] min_elev = min(start_elev, end_elev) max_elev = max(start_elev, end_elev) else: min_elev, max_elev = None, None # Use the interpolation helper to find the intersection points intersections = interpolate_to_intersection( offsets=offsets, coords_to_interpolate={"x": xs, "y": ys}, data_to_interpolate={"age": ages}, ) # Get the current x-axis limits from the plot before looping xlims = ax.get_xlim() reverse_elevation = azimuth > 90 and azimuth < 270 elevs = np.array( [np.rad2deg(np.arctan2(point["y"], point["x"])) for point in intersections] ) elevs = 180 - elevs if reverse_elevation else elevs if min_elev is not None: intersections_in_elev_range = [ point for point, elev in zip(intersections, elevs) if min_elev <= elev <= max_elev ] else: intersections_in_elev_range = intersections valid_intersections = [ point for point in intersections_in_elev_range if xlims[0] <= point["x"] <= xlims[1] ] intersect_positions = [] if valid_intersections: # Extract data for plotting (vectorized) plot_xs = np.array([point["x"] for point in valid_intersections]) plot_ys = np.array([point["y"] for point in valid_intersections]) ages_at_intersect = np.array( [point["age"] for point in valid_intersections] ) # Calculate labels labels = [f"{age/60:.0f}min" for age in ages_at_intersect] # Plot intersections (single scatter call) ax.scatter(plot_xs, plot_ys, **kwargs) # Annotate intersections for i in range(len(plot_xs)): ax.text( plot_xs[i], plot_ys[i], labels[i], fontsize=8, color="white", bbox=dict( facecolor=kwargs.get("color", "magenta"), alpha=0.6, pad=1 ), ha="left", va="bottom", ) elevs = np.rad2deg(np.arctan2(plot_ys, plot_xs)) elevs = 180 - elevs if reverse_elevation else elevs intersect_positions.append( self.position.ead_to_lla( elevs, np.ones_like(elevs) * azimuth, np.sqrt(plot_xs**2 + plot_ys**2), ) ) ax.set_xlim(xlims) return intersect_positions