Source code for arguslib.camera.video_interface

import datetime
from typing import Tuple, Optional, Any

import cv2
import numpy as np

from ..instruments.instruments import PlottableInstrument, Position
from ..radar.radar_overlay_interface import (
    RadarOverlayInterface,
)  # Allow RadarOverlayInterface
from ..protocols import DirectRenderable


[docs] class VideoInterface(PlottableInstrument): """ A PlottableInstrument that uses a DirectCamera to produce video frames and provides utilities for writing these frames to a video file. """ def __init__(self, instrument: PlottableInstrument): """ Args: instrument: An instrument that conforms to the `DirectRenderable` protocol. This includes `DirectCamera` and wrappers like `AircraftInterface` or `RadarOverlayInterface` when they contain a `DirectCamera`. """ from arguslib.aircraft.aircraft_interface import AircraftInterface if not isinstance(instrument, DirectRenderable): raise TypeError( f"Instrument of type {type(instrument).__name__} does not conform to the " "DirectRenderable protocol. It must have show(), annotate_positions(), " "and to_image_array() methods suitable for direct image rendering." ) self.direct_camera_delegate = instrument # Try to find the original base instrument to get the core attributes attrs_source = instrument if isinstance(instrument, AircraftInterface): attrs_source = instrument.camera if isinstance(attrs_source, RadarOverlayInterface): attrs_source = attrs_source.target_instrument super().__init__(**getattr(attrs_source, "attrs", {})) self.video_writer: Optional[cv2.VideoWriter] = None self.output_path: Optional[str] = None self.fps: Optional[int] = None self.resolution: Optional[Tuple[int, int]] = None # (width, height)
[docs] def show(self, dt: datetime.datetime, ax: Any = None, **kwargs: Any) -> None: """ Prepares the direct camera's internal image for the given datetime. Consistent with DirectCamera, this method updates the internal state and returns None as no Matplotlib axes are involved. Args: dt: The datetime for which to prepare the frame. ax: Should be None. Included for PlottableInstrument compatibility. **kwargs: Additional arguments passed to the direct_camera.show() method (e.g., brightness_adjust). """ if ax is not None: raise ValueError( "VideoInterface using DirectCamera should not have Matplotlib axes." ) # This call prepares/updates self.direct_camera.data_loader.image self.direct_camera_delegate.show(dt, ax=None, **kwargs) return None
[docs] def annotate_positions( self, positions: list[Position], dt: datetime.datetime, ax: Any = None, *args: Any, **kwargs: Any, ) -> None: """ Delegates annotation to the underlying direct camera. 'ax' is expected to be None for DirectCamera. """ if ax is not None: # This check is also in DirectCamera, but good practice at interface level. raise ValueError( "VideoInterface using DirectCamera should not have Matplotlib axes." ) # DirectCamera.annotate_positions will draw on its internal image and return None. return self.direct_camera_delegate.annotate_positions( positions, dt, ax, *args, **kwargs )
[docs] def start_video_output( self, output_path: str, fps: int, resolution: Tuple[int, int] ) -> None: """ Initializes video writing to a file. Args: output_path: Path to the output video file. fps: Frames per second for the video. resolution: Tuple (width, height) for the video frames. """ if self.video_writer is not None: self.finish_video_output() # Close existing video if any self.output_path = output_path self.fps = fps self.resolution = resolution # (width, height) fourcc = cv2.VideoWriter_fourcc(*"avc1") # Or 'avc1', 'XVID', etc. self.video_writer = cv2.VideoWriter( self.output_path, fourcc, self.fps, self.resolution ) if not self.video_writer.isOpened(): raise IOError(f"Could not open video writer for path: {self.output_path}")
[docs] def add_frame_to_video( self, dt: datetime.datetime, show_kwargs: Optional[dict] = None, time_overlay: bool = True, image_array: Optional[np.ndarray] = None, ) -> None: """ Adds a frame for the given datetime 'dt' to the video. If image_array is provided (in BGR format), it's used directly. Otherwise, a frame is generated using the direct_camera. Args: dt: Datetime for the frame. show_kwargs: Arguments to pass to self.direct_camera.show(). time_overlay: If True, adds a timestamp overlay to the frame (if generated). image_array: Optional pre-rendered BGR numpy array. """ if self.video_writer is None or self.resolution is None: raise RuntimeError( "Video output not started. Call start_video_output() first." ) if image_array is None: self.show( dt, **(show_kwargs or {}) ) # Updates direct_camera's internal image # direct_camera.to_image_array returns RGB frame_rgb = self.direct_camera_delegate.to_image_array(time=time_overlay) frame_bgr = frame_rgb[:, :, ::-1] # Convert RGB to BGR for OpenCV else: frame_bgr = image_array # Assume provided image_array is already BGR # Resize if necessary (e.g. if image_array has different dimensions) if ( frame_bgr.shape[1] != self.resolution[0] or frame_bgr.shape[0] != self.resolution[1] ): frame_bgr = cv2.resize(frame_bgr, self.resolution) self.video_writer.write(frame_bgr)
[docs] def finish_video_output(self) -> None: """Releases the video writer and resets video properties.""" if self.video_writer is not None: self.video_writer.release() print(f"Video saved to {self.output_path}") self.video_writer = None self.output_path = None self.fps = None self.resolution = None
[docs] def generate_video( self, output_path: str, start_dt: datetime.datetime, end_dt: datetime.datetime, step_timedelta: datetime.timedelta, fps: int, show_kwargs: Optional[dict] = None, time_overlay: bool = True, ) -> None: """Generates a complete video file by iterating through time.""" _show_kwargs = show_kwargs or {} self.show( start_dt, **_show_kwargs ) # Prepare first frame by calling delegate's show first_frame_rgb = self.direct_camera_delegate.to_image_array( time=time_overlay ) # Get image from delegate height, width, _ = first_frame_rgb.shape self.start_video_output(output_path, fps, (width, height)) current_dt = start_dt while current_dt <= end_dt: self.add_frame_to_video( current_dt, show_kwargs=_show_kwargs, time_overlay=time_overlay ) print( f"Processed frame for {current_dt.isoformat(timespec='seconds')}", end="\r", ) current_dt += step_timedelta print(f"\nFinished processing frames for {output_path}") self.finish_video_output()