#!/usr/bin/env python3 from __future__ import annotations import argparse import logging from logging.handlers import RotatingFileHandler import os from dataclasses import dataclass from datetime import datetime, time from enum import Enum from pathlib import Path import shutil import subprocess import sys import tempfile import time as sleep_time from urllib.parse import quote, urlsplit, urlunsplit from uuid import uuid4 DEFAULT_LABEL = "ferngrowth" DEFAULT_WINDOW_START = "10:00" DEFAULT_WINDOW_END = "16:00" DEFAULT_LOG_FILE = Path("logs/fern_timelapse.log") DEFAULT_CAPTURE_DIR = Path("captures") DEFAULT_VIDEO_FILE = Path("timelapse/ferngrowth_timelapse.mp4") RTSP_ENV_VAR = "FERN_RTSP_URL" CAMERA_HOST_ENV_VAR = "FERN_CAMERA_HOST" CAMERA_USER_ENV_VAR = "FERN_CAMERA_USER" CAMERA_PASSWORD_ENV_VAR = "FERN_CAMERA_PASSWORD" CAMERA_PORT_ENV_VAR = "FERN_CAMERA_PORT" CAMERA_PATH_ENV_VAR = "FERN_CAMERA_PATH" class CaptureOutcome(Enum): CAPTURED = "captured" SKIPPED = "skipped" FAILED = "failed" @dataclass(frozen=True) class TimeWindow: start: time end: time def contains(self, candidate: time) -> bool: if self.start == self.end: return True if self.start < self.end: return self.start <= candidate < self.end return candidate >= self.start or candidate < self.end @dataclass(frozen=True) class RetryPolicy: max_attempts: int initial_backoff_seconds: float backoff_multiplier: float max_backoff_seconds: float @dataclass(frozen=True) class CaptureConfig: camera_url: str output_dir: Path label: str window: TimeWindow ignore_window: bool transport: str read_timeout_seconds: float process_timeout_seconds: float jpeg_quality: int ffmpeg_bin: str ffmpeg_input_args: tuple[str, ...] ffmpeg_output_args: tuple[str, ...] retry_policy: RetryPolicy @dataclass(frozen=True) class CompileConfig: input_dir: Path output_file: Path label: str fps: int crf: int preset: str ffmpeg_bin: str process_timeout_seconds: float def parse_clock_time(raw_value: str) -> time: try: return datetime.strptime(raw_value, "%H:%M").time() except ValueError as exc: raise argparse.ArgumentTypeError( f"Invalid time value '{raw_value}'. Expected HH:MM in 24-hour format." ) from exc def positive_int(raw_value: str) -> int: value = int(raw_value) if value <= 0: raise argparse.ArgumentTypeError("Value must be greater than zero.") return value def positive_float(raw_value: str) -> float: value = float(raw_value) if value <= 0: raise argparse.ArgumentTypeError("Value must be greater than zero.") return value def non_negative_float(raw_value: str) -> float: value = float(raw_value) if value < 0: raise argparse.ArgumentTypeError("Value must be zero or greater.") return value def jpeg_quality_value(raw_value: str) -> int: value = int(raw_value) if not 1 <= value <= 31: raise argparse.ArgumentTypeError("JPEG quality must be between 1 and 31. Lower is better.") return value def create_logger(log_file: Path) -> logging.Logger: log_file.parent.mkdir(parents=True, exist_ok=True) logger = logging.getLogger("fern_timelapse") logger.setLevel(logging.INFO) logger.handlers.clear() logger.propagate = False formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") file_handler = RotatingFileHandler( log_file, maxBytes=5_000_000, backupCount=5, encoding="utf-8", ) file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger def ensure_ffmpeg_available(ffmpeg_bin: str) -> None: if shutil.which(ffmpeg_bin) is None: raise FileNotFoundError(f"ffmpeg binary not found: {ffmpeg_bin}") def sanitize_url(url: str) -> str: parsed = urlsplit(url) if not parsed.netloc: return url username = parsed.username or "" masked_user = username if username else "user" host = parsed.hostname or "" port = f":{parsed.port}" if parsed.port else "" masked_netloc = f"{masked_user}:***@{host}{port}" if parsed.username or parsed.password else parsed.netloc return urlunsplit((parsed.scheme, masked_netloc, parsed.path, parsed.query, parsed.fragment)) def redact_text(text: str, secret: str, replacement: str) -> str: if not text or not secret: return text return text.replace(secret, replacement) def sanitize_label(raw_label: str) -> str: cleaned = "".join(character if character.isalnum() or character in {"-", "_"} else "-" for character in raw_label.strip()) cleaned = cleaned.strip("-_") if not cleaned: raise ValueError("Label must contain at least one letter or digit.") return cleaned def normalize_camera_path(path: str) -> str: normalized = path.strip() if not normalized: raise ValueError("Camera path must not be empty.") if not normalized.startswith("/"): normalized = f"/{normalized}" return normalized def build_rtsp_url( host: str, path: str, port: int, username: str | None = None, password: str | None = None, ) -> str: host = host.strip() if not host: raise ValueError("Camera host must not be empty.") normalized_path = normalize_camera_path(path) auth = "" if username: encoded_user = quote(username, safe="") encoded_password = quote(password or "", safe="") auth = f"{encoded_user}:{encoded_password}@" return f"rtsp://{auth}{host}:{port}{normalized_path}" def build_frame_path(output_dir: Path, label: str, captured_at: datetime) -> Path: date_dir = output_dir / f"{captured_at:%Y}" / f"{captured_at:%m}" / f"{captured_at:%d}" filename = f"{captured_at:%Y-%m-%d_%H-%M-%S}_{label}.jpg" return date_dir / filename def format_window(window: TimeWindow) -> str: return f"{window.start.strftime('%H:%M')} - {window.end.strftime('%H:%M')}" def tail_stderr(stderr_text: str, limit: int = 10) -> str: if not stderr_text: return "" lines = [line.strip() for line in stderr_text.splitlines() if line.strip()] return "\n".join(lines[-limit:]) def build_capture_command(config: CaptureConfig, target_path: Path) -> list[str]: input_timeout_microseconds = str(int(config.read_timeout_seconds * 1_000_000)) return [ config.ffmpeg_bin, "-hide_banner", "-loglevel", "error", "-nostdin", "-rtsp_transport", config.transport, "-timeout", input_timeout_microseconds, *config.ffmpeg_input_args, "-i", config.camera_url, "-map", "0:v:0", "-an", "-sn", "-dn", "-frames:v", "1", "-c:v", "mjpeg", "-q:v", str(config.jpeg_quality), *config.ffmpeg_output_args, "-y", str(target_path), ] def run_subprocess(command: list[str], timeout_seconds: float) -> subprocess.CompletedProcess[str]: return subprocess.run( command, capture_output=True, text=True, check=False, timeout=timeout_seconds, ) def capture_frame(config: CaptureConfig, logger: logging.Logger) -> CaptureOutcome: now = datetime.now().astimezone() if not config.ignore_window and not config.window.contains(now.time()): logger.info( "Skipped capture outside window. now=%s window=%s", now.strftime("%Y-%m-%d %H:%M:%S %Z"), format_window(config.window), ) return CaptureOutcome.SKIPPED target_path = build_frame_path(config.output_dir, config.label, now) target_path.parent.mkdir(parents=True, exist_ok=True) if target_path.exists() and target_path.stat().st_size > 0: logger.warning("Skipped capture because target already exists: %s", target_path) return CaptureOutcome.SKIPPED sanitized_url = sanitize_url(config.camera_url) delay_seconds = config.retry_policy.initial_backoff_seconds for attempt in range(1, config.retry_policy.max_attempts + 1): temp_path = target_path.with_name(f"{target_path.stem}.{uuid4().hex}.tmp.jpg") command = build_capture_command(config, temp_path) try: logger.info( "Starting capture attempt=%s/%s target=%s source=%s transport=%s", attempt, config.retry_policy.max_attempts, target_path, sanitized_url, config.transport, ) completed = run_subprocess(command, config.process_timeout_seconds) if completed.returncode == 0 and temp_path.exists() and temp_path.stat().st_size > 0: temp_path.replace(target_path) logger.info("Capture succeeded target=%s bytes=%s", target_path, target_path.stat().st_size) return CaptureOutcome.CAPTURED stderr_tail = tail_stderr(redact_text(completed.stderr, config.camera_url, sanitized_url)) logger.warning( "Capture attempt failed attempt=%s/%s returncode=%s stderr=%s", attempt, config.retry_policy.max_attempts, completed.returncode, stderr_tail or "", ) except subprocess.TimeoutExpired: logger.warning( "Capture attempt timed out attempt=%s/%s timeout=%.1fs", attempt, config.retry_policy.max_attempts, config.process_timeout_seconds, ) finally: if temp_path.exists(): temp_path.unlink() if attempt < config.retry_policy.max_attempts: logger.info("Retrying capture in %.1f seconds", delay_seconds) sleep_time.sleep(delay_seconds) delay_seconds = min(delay_seconds * config.retry_policy.backoff_multiplier, config.retry_policy.max_backoff_seconds) logger.error("All capture attempts failed target=%s source=%s", target_path, sanitized_url) return CaptureOutcome.FAILED def list_frames(input_dir: Path, label: str) -> list[Path]: if not input_dir.exists(): raise FileNotFoundError(f"Input directory does not exist: {input_dir}") pattern = f"*_{label}.jpg" return sorted( (path for path in input_dir.rglob(pattern) if path.is_file()), key=lambda path: path.relative_to(input_dir).as_posix(), ) def compile_video(config: CompileConfig, logger: logging.Logger) -> Path: frames = list_frames(config.input_dir, config.label) if not frames: raise FileNotFoundError( f"No frames found in {config.input_dir} matching '*_{config.label}.jpg'." ) config.output_file.parent.mkdir(parents=True, exist_ok=True) ensure_ffmpeg_available(config.ffmpeg_bin) logger.info( "Compiling timelapse frames=%s fps=%s crf=%s preset=%s output=%s", len(frames), config.fps, config.crf, config.preset, config.output_file, ) with tempfile.TemporaryDirectory(prefix="fern-seq-", dir=str(config.output_file.parent)) as temp_dir: sequence_dir = Path(temp_dir) for index, frame in enumerate(frames, start=1): sequence_frame = sequence_dir / f"frame_{index:06d}.jpg" try: os.symlink(frame.resolve(), sequence_frame) except OSError: shutil.copy2(frame, sequence_frame) input_pattern = sequence_dir / "frame_%06d.jpg" command = [ config.ffmpeg_bin, "-hide_banner", "-loglevel", "error", "-nostdin", "-y", "-framerate", str(config.fps), "-start_number", "1", "-i", str(input_pattern), "-c:v", "libx264", "-preset", config.preset, "-crf", str(config.crf), "-pix_fmt", "yuv420p", "-movflags", "+faststart", str(config.output_file), ] completed = run_subprocess(command, config.process_timeout_seconds) if completed.returncode != 0: stderr_tail = tail_stderr(completed.stderr) raise RuntimeError(f"ffmpeg compile failed: {stderr_tail or ''}") logger.info("Timelapse compile succeeded output=%s", config.output_file) return config.output_file def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Capture RTSP fern-growth frames and compile them into a timelapse video.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) subparsers = parser.add_subparsers(dest="command", required=True) capture_parser = subparsers.add_parser( "capture", help="Capture one frame if the current time is inside the allowed daily window.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) capture_parser.add_argument("--camera-url", help=f"RTSP URL. Falls back to ${RTSP_ENV_VAR} when omitted.") capture_parser.add_argument("--camera-host", help=f"Camera IP or hostname. Falls back to ${CAMERA_HOST_ENV_VAR} when omitted.") capture_parser.add_argument("--camera-user", help=f"Camera username. Falls back to ${CAMERA_USER_ENV_VAR} when omitted.") capture_parser.add_argument("--camera-password", help=f"Camera password. Falls back to ${CAMERA_PASSWORD_ENV_VAR} when omitted.") capture_parser.add_argument("--camera-port", type=positive_int, default=554, help=f"RTSP port. Falls back to ${CAMERA_PORT_ENV_VAR} when set and --camera-url is omitted.") capture_parser.add_argument("--camera-path", default="/stream1", help=f"RTSP path. Falls back to ${CAMERA_PATH_ENV_VAR} when set and --camera-url is omitted.") capture_parser.add_argument("--output-dir", type=Path, default=DEFAULT_CAPTURE_DIR, help="Base directory for JPEG output.") capture_parser.add_argument("--label", default=DEFAULT_LABEL, help="Filename suffix used in each frame.") capture_parser.add_argument("--window-start", type=parse_clock_time, default=parse_clock_time(DEFAULT_WINDOW_START), help="Daily capture window start.") capture_parser.add_argument("--window-end", type=parse_clock_time, default=parse_clock_time(DEFAULT_WINDOW_END), help="Daily capture window end. End is exclusive.") capture_parser.add_argument("--ignore-window", action="store_true", help="Capture even when the current time is outside the configured window.") capture_parser.add_argument("--transport", choices=("tcp", "udp"), default="tcp", help="RTSP transport mode.") capture_parser.add_argument("--read-timeout", type=positive_float, default=15.0, help="Per-read network timeout passed to ffmpeg, in seconds.") capture_parser.add_argument("--process-timeout", type=positive_float, default=45.0, help="Maximum wall-clock time allowed for one ffmpeg capture attempt.") capture_parser.add_argument("--jpeg-quality", type=jpeg_quality_value, default=2, help="ffmpeg MJPEG quality. Lower values are higher quality.") capture_parser.add_argument("--retries", type=positive_int, default=4, help="How many capture attempts to make before giving up.") capture_parser.add_argument("--initial-backoff", type=non_negative_float, default=2.0, help="Initial retry delay in seconds.") capture_parser.add_argument("--backoff-multiplier", type=positive_float, default=2.0, help="Multiplier used for exponential backoff.") capture_parser.add_argument("--max-backoff", type=positive_float, default=60.0, help="Maximum retry delay in seconds.") capture_parser.add_argument("--ffmpeg-bin", default="ffmpeg", help="Path to ffmpeg binary.") capture_parser.add_argument("--ffmpeg-input-arg", action="append", default=[], help="Extra ffmpeg input argument. Repeat for multiple values.") capture_parser.add_argument("--ffmpeg-output-arg", action="append", default=[], help="Extra ffmpeg output argument. Repeat for multiple values.") capture_parser.add_argument("--log-file", type=Path, default=DEFAULT_LOG_FILE, help="Path to the rotating log file.") compile_parser = subparsers.add_parser( "compile", help="Compile captured JPEGs into an H.264 MP4 timelapse.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) compile_parser.add_argument("--input-dir", type=Path, default=DEFAULT_CAPTURE_DIR, help="Base directory that contains captured JPEGs.") compile_parser.add_argument("--label", default=DEFAULT_LABEL, help="Only include frames with this filename suffix.") compile_parser.add_argument("--output-file", type=Path, default=DEFAULT_VIDEO_FILE, help="Target MP4 file.") compile_parser.add_argument("--fps", type=positive_int, default=30, help="Output frames per second.") compile_parser.add_argument("--crf", type=positive_int, default=17, help="libx264 CRF value. Lower is higher quality.") compile_parser.add_argument("--preset", default="slow", help="libx264 preset.") compile_parser.add_argument("--ffmpeg-bin", default="ffmpeg", help="Path to ffmpeg binary.") compile_parser.add_argument("--process-timeout", type=positive_float, default=3600.0, help="Maximum wall-clock time allowed for ffmpeg video compilation.") compile_parser.add_argument("--log-file", type=Path, default=DEFAULT_LOG_FILE, help="Path to the rotating log file.") return parser def build_capture_config(args: argparse.Namespace) -> CaptureConfig: label = sanitize_label(args.label) camera_url = args.camera_url or os.environ.get(RTSP_ENV_VAR, "") if not camera_url: camera_host = args.camera_host or os.environ.get(CAMERA_HOST_ENV_VAR, "") camera_user = args.camera_user or os.environ.get(CAMERA_USER_ENV_VAR) camera_password = args.camera_password or os.environ.get(CAMERA_PASSWORD_ENV_VAR) camera_port = int(os.environ.get(CAMERA_PORT_ENV_VAR, args.camera_port)) camera_path = os.environ.get(CAMERA_PATH_ENV_VAR, args.camera_path) if not camera_host: raise ValueError( "Missing camera source. Pass --camera-url, set " f"{RTSP_ENV_VAR}, or provide --camera-host/{CAMERA_HOST_ENV_VAR}." ) camera_url = build_rtsp_url( host=camera_host, path=camera_path, port=camera_port, username=camera_user, password=camera_password, ) return CaptureConfig( camera_url=camera_url, output_dir=args.output_dir, label=label, window=TimeWindow(start=args.window_start, end=args.window_end), ignore_window=args.ignore_window, transport=args.transport, read_timeout_seconds=args.read_timeout, process_timeout_seconds=args.process_timeout, jpeg_quality=args.jpeg_quality, ffmpeg_bin=args.ffmpeg_bin, ffmpeg_input_args=tuple(args.ffmpeg_input_arg), ffmpeg_output_args=tuple(args.ffmpeg_output_arg), retry_policy=RetryPolicy( max_attempts=args.retries, initial_backoff_seconds=args.initial_backoff, backoff_multiplier=args.backoff_multiplier, max_backoff_seconds=args.max_backoff, ), ) def build_compile_config(args: argparse.Namespace) -> CompileConfig: return CompileConfig( input_dir=args.input_dir, output_file=args.output_file, label=sanitize_label(args.label), fps=args.fps, crf=args.crf, preset=args.preset, ffmpeg_bin=args.ffmpeg_bin, process_timeout_seconds=args.process_timeout, ) def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) logger: logging.Logger | None = None try: logger = create_logger(args.log_file) if args.command == "capture": config = build_capture_config(args) ensure_ffmpeg_available(config.ffmpeg_bin) outcome = capture_frame(config, logger) return 0 if outcome in {CaptureOutcome.CAPTURED, CaptureOutcome.SKIPPED} else 1 if args.command == "compile": config = build_compile_config(args) output_file = compile_video(config, logger) logger.info("Created timelapse video %s", output_file) return 0 parser.error(f"Unsupported command: {args.command}") except Exception as exc: if logger is not None: logger.exception("Command failed: %s", exc) print(f"Error: {exc}", file=sys.stderr) return 1 if __name__ == "__main__": raise SystemExit(main())