import json
import shutil
import sys
from pathlib import Path
import cv2 as cv
import numpy as np
help_doc_str = """\
- press `esc` to exit
- press `w/s` up/down step size
- press `a/d` backward/forward pos
- press `f` keep clip and to the next
- press `space` drop clip and to the next
- press `enter` to accept all remaining frames
"""
suffix_set = set(".avi,.mp4".split(","))
[docs]
def find_videos(input_dir):
video_paths = []
for f in sorted(Path(input_dir).glob("**/*")):
if f.suffix in suffix_set:
video_paths.append(f.as_posix())
return video_paths
[docs]
def tag_video(video_path, factor):
cap = cv.VideoCapture(video_path)
cap_fps = int(cap.get(cv.CAP_PROP_FPS))
frame_count = int(cap.get(cv.CAP_PROP_FRAME_COUNT))
frame_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
tag_frames = np.full((30, frame_count, 3), (255, 0, 0), dtype="uint8")
curr_pos, step_size = 0, cap_fps
while curr_pos < frame_count:
this_pos = int(cap.get(cv.CAP_PROP_POS_FRAMES))
if this_pos != curr_pos:
cap.set(cv.CAP_PROP_POS_FRAMES, curr_pos)
this_pos = curr_pos
ret, frame = cap.read()
if not ret:
print("Can't receive frame (stream end?). Exiting ...")
break
txt = f"{curr_pos=}/{frame_count}:{cap_fps} ({step_size=})"
banner = np.full((30, frame_width, 3), (0, 0, 255), dtype="uint8")
cv.putText(banner, txt, (15, 25), cv.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
tag_bar = cv.resize(tag_frames, (frame_width, 30), interpolation=cv.INTER_NEAREST)
center = (int(curr_pos / frame_count * frame_width), 15)
cv.circle(tag_bar, center, 5, (255, 255, 255), -1)
image = np.concatenate((banner, frame, tag_bar))
if factor is not None:
image = cv.resize(image, None, fx=factor, fy=factor, interpolation=cv.INTER_NEAREST)
cv.imshow(video_path, image)
key = cv.waitKey(0)
if key == 27: # esc
break
elif key == ord("w"):
step_size = step_size * 2
elif key == ord("s"):
step_size = step_size // 2
step_size = max(1, step_size)
elif key == ord("a"):
curr_pos = this_pos - step_size
curr_pos = max(0, curr_pos)
elif key == ord("d"):
curr_pos = this_pos + step_size
elif key == ord("f"):
curr_pos = this_pos + step_size
tag_frames[:, this_pos:curr_pos] = (0, 255, 0)
elif key == 32: # space
curr_pos = this_pos + step_size
tag_frames[:, this_pos:curr_pos] = (0, 0, 255)
elif key == 13: # enter
tag_frames[:, this_pos:] = (0, 255, 0)
break
cv.destroyAllWindows()
cap.release()
return tag_frames[0, :, 1]
[docs]
def clip_video(video_path, tag_frames, output_dir, interval, fisheye):
if fisheye is not None:
with open(fisheye, "r") as f:
params = json.load(f)
fisheye_K = np.array(params["fisheye_camera_K"]).reshape(3, 3)
fisheye_D = np.array(params["fisheye_dist"])
img_shape = params["fisheye_image_size"]
map1, map2 = cv.fisheye.initUndistortRectifyMap(
fisheye_K,
fisheye_D,
np.eye(3),
fisheye_K,
img_shape,
cv.CV_32FC1
)
index = 0
keep_frames = 0
limit = tag_frames.size
prefix = Path(video_path).stem
cap = cv.VideoCapture(video_path)
saved_images = 0
while index < limit:
ret, frame = cap.read()
if not ret:
print("Can't receive frame (stream end?). Exiting ...")
break
if tag_frames[index] > 0:
if keep_frames % interval == 0:
if fisheye is not None:
frame = cv.remap(frame, map1, map2, interpolation=cv.INTER_LINEAR)
filename = f"data/{prefix}_i{index:06d}.jpg"
cv.imwrite(str(output_dir / filename), frame)
saved_images += 1
keep_frames += 1
index += 1
cap.release()
return saved_images
[docs]
def func(input_dir, output_dir, factor, interval, fisheye):
input_dir = Path(input_dir)
if input_dir.is_file():
video_paths = [input_dir.as_posix()]
input_dir = input_dir.parent
else:
video_paths = find_videos(input_dir)
if output_dir is not None:
output_dir = Path(output_dir)
else:
new_name = f"{input_dir.name}_clip"
output_dir = input_dir.with_name(new_name)
if fisheye is not None:
if not Path(fisheye).is_file():
fisheye = str(input_dir / fisheye)
if not Path(fisheye).is_file():
fisheye = None
shutil.rmtree(output_dir, ignore_errors=True)
(output_dir / "data").mkdir(parents=True, exist_ok=False)
saved_images = 0
for video_path in video_paths:
tag_frames = tag_video(video_path, factor)
if tag_frames.max() > 0:
n = clip_video(video_path, tag_frames, output_dir, interval, fisheye)
saved_images += n
print(f"[INFO] saved images: {saved_images}")
with open(output_dir / "README.md", "w") as f:
f.write("# README\n\n## Data Processing\n\n")
return f"\n[OUTDIR]\n{output_dir}"
[docs]
def parse_args(args=None):
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("input_dir", type=str,
help="videos dir or file path")
parser.add_argument("-o", "--output_dir", type=str, default=None,
help="output dir")
parser.add_argument("-f", "--factor", type=float, default=None,
help="resize factor")
parser.add_argument("-i", "--interval", type=int, default=5,
help="sample the frames")
parser.add_argument("-e", "--fisheye", type=str, default=None,
help="fisheye parameter file path")
args = parser.parse_args(args=args)
return vars(args)
[docs]
def main(args=None):
print(help_doc_str)
kwargs = parse_args(args)
print(f"{__file__}: {kwargs}")
print(func(**kwargs))
return 0
if __name__ == "__main__":
sys.exit(main())