Source code for hello.video.split

import shutil
import subprocess
import sys
import time
from pathlib import Path

import cv2 as cv

suffix_set = set(".avi,.mp4,.MOV,.mkv".split(","))


[docs] def find_videos(input_dir): video_paths = [] for f in sorted(Path(input_dir).glob("**/*")): if f.suffix in suffix_set: video_paths.append(f.as_posix()) return video_paths
[docs] def split_video(video_path, duration, out_dir): video_copy = str(out_dir / Path(video_path).name) command_line = f"ffmpeg -i {video_path} -c copy {video_copy}" result = subprocess.run(command_line, shell=True, stdout=subprocess.PIPE) if result.returncode != 0: print(f"[ERR]\n {command_line}") return 0 cap = cv.VideoCapture(video_copy) cap_fps = int(cap.get(cv.CAP_PROP_FPS)) frame_count = int(cap.get(cv.CAP_PROP_FRAME_COUNT)) cap.release() duration_s = duration * 60 total_s = frame_count // cap_fps suffix = Path(video_copy).suffix for i, progress_s in enumerate(range(0, total_s, duration_s)): i_s = progress_s i_m, i_s = divmod(i_s, 60) i_h, i_m = divmod(i_m, 60) curr_start = f"{i_h:02d}:{i_m:02d}:{i_s:02d}" curr_input = video_copy remaining_s = total_s - progress_s if remaining_s < 5: continue i_s = min(remaining_s, duration_s) i_m, i_s = divmod(i_s, 60) i_h, i_m = divmod(i_m, 60) curr_duration = f"{i_h:02d}:{i_m:02d}:{i_s:02d}" prefix = time.strftime(r"%Y%m%d_%H%M%S") if i == 0: curr_output = str(out_dir / f"split/{prefix}{suffix}") else: curr_output = str(out_dir / f"split/{prefix}_{i}{suffix}") command_line = f"ffmpeg -ss {curr_start} -i {curr_input} -t {curr_duration} -c copy {curr_output}" result = subprocess.run(command_line, shell=True, stdout=subprocess.PIPE) if result.returncode != 0: print(f"[ERR]\n {command_line}") return 0
[docs] def func(input_dir, output_dir, duration): input_dir = Path(input_dir) output_dir = Path(output_dir) shutil.rmtree(output_dir, ignore_errors=True) (output_dir / "split").mkdir(parents=True, exist_ok=False) for video_path in find_videos(input_dir): split_video(video_path, duration, output_dir) return str(output_dir)
[docs] def parse_args(args=None): from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument("input_dir", type=str, help="input dir") parser.add_argument("output_dir", type=str, help="output dir") parser.add_argument("--duration", type=int, default=5, help="minutes per segment") args = parser.parse_args(args=args) return vars(args)
[docs] def main(args=None): kwargs = parse_args(args) print(f"{__file__}: {kwargs}") print(func(**kwargs)) return 0
if __name__ == "__main__": sys.exit(main())