import numpy as np
import datetime
import cv2
from zoneinfo import ZoneInfo
from pytz import utc
# Size of the timestamp label
ts_factor = 4
[docs]
class Video:
def __init__(self, filepath):
self.filepath = filepath
self.cap = cv2.VideoCapture(filepath)
self.n_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.time_bounds = self.get_timestamps([0, self.n_frames - 1])
[docs]
def get_data_time(self, dt, return_timestamp=False):
"""Gets image data at the nearest possible timestamp. Images have 5 second time resolution."""
n = self.estimate_frame_number(dt)
n = np.round(n).astype(int)
frame = self.get_frame(n)
if return_timestamp:
return frame, extract_timestamp(frame)
return frame
[docs]
def get_frame(self, n):
if n < 0:
n = self.n_frames + n
self.cap.set(cv2.CAP_PROP_POS_FRAMES, n)
ret, frame = self.cap.read()
if not ret:
raise ValueError(f"Could not read frame {n} from {self.filepath}")
return frame
[docs]
def get_timestamps(self, ns=None):
timestamps = []
if ns is None:
ns = range(self.n_frames)
for i in ns:
frame = self.get_frame(i)
timestamps.append(extract_timestamp(frame))
return tuple(timestamps)
[docs]
def estimate_frame_number(self, dt):
if dt < self.time_bounds[0] or dt > self.time_bounds[1]:
raise ValueError(
f"Timestamp {dt} is not in the video time bounds for the video {self.filepath}"
)
return np.interp(
dt.timestamp(),
[self.time_bounds[0].timestamp(), self.time_bounds[1].timestamp()],
[0, self.n_frames - 1],
)
def __len__(self):
return self.n_frames
def __repr__(self):
return f"Video({self.filepath}, {self.n_frames} frames)"
def timestamp_image(tstamp, data, ts_factor=ts_factor, exposure=None):
stamp = int(tstamp)
ts_array = np.array(list(np.binary_repr(stamp, 31))).astype("int")
ts_array = ts_array[None, :].repeat(ts_factor, axis=1).repeat(ts_factor, axis=0)
data[:ts_factor, : (31 * ts_factor), :] = 255 * ts_array[:, :, None]
if exposure is not None:
stamp = int(exposure)
ts_array = np.array(list(np.binary_repr(stamp, 31))).astype("int")
ts_array = ts_array[None, :].repeat(ts_factor, axis=1).repeat(ts_factor, axis=0)
data[ts_factor : (2 * ts_factor), : (31 * ts_factor), :] = (
255 * ts_array[:, :, None]
)
return data
def create_timestamp():
pass
def extract_timestamp(image, ts_factor=ts_factor):
"""Return a datetime object with the image timestamp to the nearest second."""
image_ts_array = (image[0, 0 : (31 * ts_factor) : ts_factor, 0] > 128).astype("int")
int_timestamp = int("".join(str(a) for a in image_ts_array), 2)
# return datetime.datetime.fromtimestamp(
# int_timestamp
# )
local_time = datetime.datetime.fromtimestamp(
int_timestamp, tz=ZoneInfo("Europe/London")
)
utc_time = local_time.astimezone(utc)
return utc_time.replace(tzinfo=None) # Make "timezone naive" to be compatible.
def extract_exposure(image, ts_factor=ts_factor):
image_exp_array = (
image[ts_factor, 0 : (31 * ts_factor) : ts_factor, 0] > 128
).astype("int")
return int("".join(str(a) for a in image_exp_array), 2)
def round_seconds(dt):
if dt.microsecond >= 500000:
dt += datetime.timedelta(seconds=1)
return dt.replace(microsecond=0)
def round_up_interval(dt, interval):
dt = dt.replace(microsecond=0) + datetime.timedelta(seconds=1)
seconds_since_hour = (
dt - dt.replace(minute=0, second=0, microsecond=0)
).total_seconds()
extra_seconds = interval - seconds_since_hour % interval
return dt + datetime.timedelta(seconds=extra_seconds)