Source code for hello.video.rtsp_pull

import sys
import time
import traceback
from pathlib import Path

import cv2 as cv
import numpy as np


[docs] class Queue: def __init__(self, size) -> None: self.size = size self.data = []
[docs] def append(self, value): self.data.append(value) self.data = self.data[-self.size:]
[docs] def info(self): data = self.data n = len(data) if n > 1: v = (n - 1) / (data[-1] - data[0]) return f"{v:.1f}" return "none"
[docs] class Video: def __init__(self, rtsp_url) -> None: self.time_window = 30 # 30ms self.rtsp_url = rtsp_url self.cap = None self.out = None
[docs] def get_cap(self): if self.cap is None: self.cap = cv.VideoCapture(self.rtsp_url) if not self.cap.isOpened(): self.cap.open(self.rtsp_url) return self.cap
[docs] def get_info(self): cap = self.get_cap() fps, width, height = 0, 0, 0 if cap is not None: fps = int(cap.get(cv.CAP_PROP_FPS)) width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT)) return fps, width, height
[docs] def read(self): cap = self.get_cap() frame = None errstr = None if not cap.isOpened(): errstr = f"Failed to open: {self.rtsp_url}" return frame, errstr try: ret, frame = cap.read() if not ret: errstr = "Can't receive frame" frame = None except: print(traceback.format_exc()) errstr = "read exception" frame = None return frame, errstr
[docs] def get_out(self): if self.out is None: fps, width, height = self.get_info() out = None if min(fps, width, height) > 0: fourcc = cv.VideoWriter_fourcc(*"XVID") out_file = time.strftime(r"%Y%m%d_%H%M%S") Path("data").mkdir(parents=True, exist_ok=True) out = cv.VideoWriter(f"data/{out_file}.avi", fourcc, fps, (width, height)) self.time_window = 1000 // fps self.out = out return self.out
[docs] def write(self, frame): if frame is None: return False out = self.get_out() if out is None: return False out.write(frame) return True
[docs] def save(self): if self.out is not None: try: self.out.release() except: print("`out.release()` exception") self.out = None
[docs] def release(self): if self.cap is not None: try: self.cap.release() except: print("`cap.release()` exception") if self.out is not None: try: self.out.release() except: print("`out.release()` exception") self.cap = None self.out = None
[docs] def func(rtsp_url, view="front"): _queue = Queue(10) _video = Video(rtsp_url) flag = "watch" # watch, recording time_loc = time.time() while True: frame, errstr = _video.read() if flag == "recording": if _video.write(frame): n_frame += 1 msgstr = f"{flag} frame={n_frame} vfps={_queue.info()}" else: msgstr = f"{flag} vfps={_queue.info()}" if frame is None: frame = np.zeros((1080, 1920, 3), dtype="uint8") if view == "front": frame = frame[:, frame.shape[1]//2:] frame = cv.flip(cv.transpose(frame), 0) if errstr is None: errstr = "hotkey: n (recording), s (save), q (quit)" cv.putText(frame, msgstr, (15, 25), cv.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2) cv.putText(frame, errstr, (15, 55), cv.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2) cv.imshow("RTSP STREAM", frame) latency = int((time.time() - time_loc) * 1000) sleep = max(1, _video.time_window - latency) key = cv.waitKey(sleep) if key == ord("q"): cv.destroyAllWindows() _video.release() break elif key == ord("n"): if flag != "recording": n_frame = 0 flag = "recording" elif key == ord("s"): _video.save() flag = "watch" time_loc = time.time() _queue.append(time_loc)
# python rtsp_pull.py rtsp://localhost:8554/mystream # python -m hello.video.rtsp_pull rtsp://localhost:8554/mystream front if __name__ == "__main__": args = sys.argv[1:] assert len(args) > 0 # base_url = "rtsp://192.168.0.145/<filename>" base_url = args[0] view = "full" if len(args) > 1: view = args[1] rtsp_url = base_url.replace("<filename>", "lfi_stream.live") func(rtsp_url, view=view)