#!/usr/bin/env python3 from __future__ import annotations import argparse import csv import shutil import sys from dataclasses import dataclass from pathlib import Path, PurePosixPath REQUIRED_COLUMNS = {"current_path", "proposed_abs_path", "status"} class ReportError(ValueError): pass @dataclass(frozen=True) class ReportRow: line_number: int source_path: Path target_rel_path: PurePosixPath status: str @dataclass(frozen=True) class TransferOperation: line_number: int source_path: Path target_rel_path: PurePosixPath target_path: Path def parse_report_row(raw_row: dict[str, str], line_number: int) -> ReportRow: source_raw = (raw_row.get("current_path") or "").strip() target_raw = (raw_row.get("proposed_abs_path") or "").strip() status = (raw_row.get("status") or "").strip() if not source_raw: raise ReportError(f"Line {line_number}: missing current_path") if not target_raw: raise ReportError(f"Line {line_number}: missing proposed_abs_path") target_rel_path = PurePosixPath(target_raw) if target_rel_path.is_absolute(): raise ReportError(f"Line {line_number}: proposed_abs_path must stay relative: {target_raw}") if not target_rel_path.parts: raise ReportError(f"Line {line_number}: proposed_abs_path is empty") if any(part in {"", ".", ".."} for part in target_rel_path.parts): raise ReportError( f"Line {line_number}: proposed_abs_path contains an unsafe path component: {target_raw}" ) return ReportRow( line_number=line_number, source_path=Path(source_raw).expanduser(), target_rel_path=target_rel_path, status=status, ) def load_report(path: Path) -> list[ReportRow]: with path.open(encoding="utf-8", newline="") as handle: reader = csv.DictReader(handle, delimiter="\t") fieldnames = set(reader.fieldnames or []) missing = REQUIRED_COLUMNS - fieldnames if missing: missing_fields = ", ".join(sorted(missing)) raise ReportError(f"Report is missing required columns: {missing_fields}") rows: list[ReportRow] = [] for line_number, raw_row in enumerate(reader, start=2): if not any((value or "").strip() for value in raw_row.values()): continue rows.append(parse_report_row(raw_row, line_number)) return rows def is_same_or_child(path: Path, other: Path) -> bool: try: path.relative_to(other) except ValueError: return False return True def validate_non_overlapping_paths( entries: list[tuple[Path, str]], *, kind: str, ) -> None: for index, (path, description) in enumerate(entries): for other_path, other_description in entries[index + 1 :]: if is_same_or_child(path, other_path) or is_same_or_child(other_path, path): raise ReportError( f"{kind} paths overlap: {description} <-> {other_description}" ) def build_execution_plan( rows: list[ReportRow], destination_root: Path, *, selected_status: str, ) -> tuple[list[TransferOperation], int]: selected_rows = rows if selected_status == "all" else [ row for row in rows if row.status == selected_status ] skipped_rows = len(rows) - len(selected_rows) resolved_destination_root = destination_root.resolve() operations: list[TransferOperation] = [] seen_sources: dict[Path, int] = {} seen_targets: dict[PurePosixPath, int] = {} for row in selected_rows: source_path = row.source_path.resolve() if not source_path.exists(): raise ReportError( f"Line {row.line_number}: source path does not exist: {row.source_path}" ) if not source_path.is_dir(): raise ReportError( f"Line {row.line_number}: source path is not a directory: {row.source_path}" ) target_path = resolved_destination_root.joinpath(*row.target_rel_path.parts) if target_path.exists(): raise ReportError( f"Line {row.line_number}: destination already exists: {target_path}" ) if is_same_or_child(target_path, source_path): raise ReportError( f"Line {row.line_number}: destination cannot point inside the source tree: {target_path}" ) previous_source_line = seen_sources.get(source_path) if previous_source_line is not None: raise ReportError( f"Line {row.line_number}: duplicate source path already used on line {previous_source_line}: {source_path}" ) previous_target_line = seen_targets.get(row.target_rel_path) if previous_target_line is not None: raise ReportError( f"Line {row.line_number}: duplicate proposed_abs_path already used on line {previous_target_line}: " f"{row.target_rel_path.as_posix()}" ) seen_sources[source_path] = row.line_number seen_targets[row.target_rel_path] = row.line_number operations.append( TransferOperation( line_number=row.line_number, source_path=source_path, target_rel_path=row.target_rel_path, target_path=target_path, ) ) validate_non_overlapping_paths( [(operation.source_path, f"line {operation.line_number}: {operation.source_path}") for operation in operations], kind="Source", ) validate_non_overlapping_paths( [(operation.target_path, f"line {operation.line_number}: {operation.target_path}") for operation in operations], kind="Destination", ) return operations, skipped_rows def execute_plan( operations: list[TransferOperation], *, mode: str, dry_run: bool, verbose: bool, ) -> None: for operation in operations: if dry_run or verbose: print( f"{'plan' if dry_run else mode}\t{operation.source_path}\t{operation.target_path}" ) if dry_run: continue operation.target_path.parent.mkdir(parents=True, exist_ok=True) if mode == "copy": shutil.copytree(operation.source_path, operation.target_path) else: shutil.move(str(operation.source_path), str(operation.target_path)) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Copy or move audiobook folders into the directory structure proposed by a mock ABS report." ) parser.add_argument( "--report", default="reports/audiobookshelf_mock_report.tsv", help="TSV report produced by generate_abs_mock_report.py", ) parser.add_argument( "--destination-root", "--dest-root", "--destination", required=True, help="Root directory where the new structure should be created", ) parser.add_argument( "--mode", choices=("copy", "move"), default="copy", help="Whether to copy or move each source directory", ) parser.add_argument( "--status", choices=("ready", "review", "all"), default="ready", help="Which report rows should be applied", ) parser.add_argument( "--dry-run", action="store_true", help="Validate the report and print the planned operations without changing files", ) parser.add_argument( "--verbose", action="store_true", help="Print each completed operation", ) return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) report_path = Path(args.report).expanduser().resolve() destination_root = Path(args.destination_root).expanduser().resolve() try: if not report_path.exists(): raise ReportError(f"Report does not exist: {report_path}") if destination_root.exists() and not destination_root.is_dir(): raise ReportError(f"Destination root is not a directory: {destination_root}") rows = load_report(report_path) operations, skipped_rows = build_execution_plan( rows, destination_root, selected_status=args.status, ) execute_plan( operations, mode=args.mode, dry_run=args.dry_run, verbose=args.verbose, ) except (OSError, ReportError) as error: print(error, file=sys.stderr) return 1 print(f"report\t{report_path}") print(f"destination_root\t{destination_root}") print(f"mode\t{args.mode}") print(f"selected_status\t{args.status}") print(f"rows_total\t{len(rows)}") print(f"rows_selected\t{len(operations)}") print(f"rows_skipped\t{skipped_rows}") print(f"dry_run\t{'yes' if args.dry_run else 'no'}") return 0 if __name__ == "__main__": raise SystemExit(main())