codex author changes

This commit is contained in:
MirSob
2026-04-26 00:46:48 +02:00
parent b5c2929062
commit d07c6b033f
9 changed files with 577 additions and 6 deletions

278
apply_abs_mock_report.py Normal file
View File

@@ -0,0 +1,278 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import shutil
import sys
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
REQUIRED_COLUMNS = {"current_path", "proposed_abs_path", "status"}
class ReportError(ValueError):
pass
@dataclass(frozen=True)
class ReportRow:
line_number: int
source_path: Path
target_rel_path: PurePosixPath
status: str
@dataclass(frozen=True)
class TransferOperation:
line_number: int
source_path: Path
target_rel_path: PurePosixPath
target_path: Path
def parse_report_row(raw_row: dict[str, str], line_number: int) -> ReportRow:
source_raw = (raw_row.get("current_path") or "").strip()
target_raw = (raw_row.get("proposed_abs_path") or "").strip()
status = (raw_row.get("status") or "").strip()
if not source_raw:
raise ReportError(f"Line {line_number}: missing current_path")
if not target_raw:
raise ReportError(f"Line {line_number}: missing proposed_abs_path")
target_rel_path = PurePosixPath(target_raw)
if target_rel_path.is_absolute():
raise ReportError(f"Line {line_number}: proposed_abs_path must stay relative: {target_raw}")
if not target_rel_path.parts:
raise ReportError(f"Line {line_number}: proposed_abs_path is empty")
if any(part in {"", ".", ".."} for part in target_rel_path.parts):
raise ReportError(
f"Line {line_number}: proposed_abs_path contains an unsafe path component: {target_raw}"
)
return ReportRow(
line_number=line_number,
source_path=Path(source_raw).expanduser(),
target_rel_path=target_rel_path,
status=status,
)
def load_report(path: Path) -> list[ReportRow]:
with path.open(encoding="utf-8", newline="") as handle:
reader = csv.DictReader(handle, delimiter="\t")
fieldnames = set(reader.fieldnames or [])
missing = REQUIRED_COLUMNS - fieldnames
if missing:
missing_fields = ", ".join(sorted(missing))
raise ReportError(f"Report is missing required columns: {missing_fields}")
rows: list[ReportRow] = []
for line_number, raw_row in enumerate(reader, start=2):
if not any((value or "").strip() for value in raw_row.values()):
continue
rows.append(parse_report_row(raw_row, line_number))
return rows
def is_same_or_child(path: Path, other: Path) -> bool:
try:
path.relative_to(other)
except ValueError:
return False
return True
def validate_non_overlapping_paths(
entries: list[tuple[Path, str]],
*,
kind: str,
) -> None:
for index, (path, description) in enumerate(entries):
for other_path, other_description in entries[index + 1 :]:
if is_same_or_child(path, other_path) or is_same_or_child(other_path, path):
raise ReportError(
f"{kind} paths overlap: {description} <-> {other_description}"
)
def build_execution_plan(
rows: list[ReportRow],
destination_root: Path,
*,
selected_status: str,
) -> tuple[list[TransferOperation], int]:
selected_rows = rows if selected_status == "all" else [
row for row in rows if row.status == selected_status
]
skipped_rows = len(rows) - len(selected_rows)
resolved_destination_root = destination_root.resolve()
operations: list[TransferOperation] = []
seen_sources: dict[Path, int] = {}
seen_targets: dict[PurePosixPath, int] = {}
for row in selected_rows:
source_path = row.source_path.resolve()
if not source_path.exists():
raise ReportError(
f"Line {row.line_number}: source path does not exist: {row.source_path}"
)
if not source_path.is_dir():
raise ReportError(
f"Line {row.line_number}: source path is not a directory: {row.source_path}"
)
target_path = resolved_destination_root.joinpath(*row.target_rel_path.parts)
if target_path.exists():
raise ReportError(
f"Line {row.line_number}: destination already exists: {target_path}"
)
if is_same_or_child(target_path, source_path):
raise ReportError(
f"Line {row.line_number}: destination cannot point inside the source tree: {target_path}"
)
previous_source_line = seen_sources.get(source_path)
if previous_source_line is not None:
raise ReportError(
f"Line {row.line_number}: duplicate source path already used on line {previous_source_line}: {source_path}"
)
previous_target_line = seen_targets.get(row.target_rel_path)
if previous_target_line is not None:
raise ReportError(
f"Line {row.line_number}: duplicate proposed_abs_path already used on line {previous_target_line}: "
f"{row.target_rel_path.as_posix()}"
)
seen_sources[source_path] = row.line_number
seen_targets[row.target_rel_path] = row.line_number
operations.append(
TransferOperation(
line_number=row.line_number,
source_path=source_path,
target_rel_path=row.target_rel_path,
target_path=target_path,
)
)
validate_non_overlapping_paths(
[(operation.source_path, f"line {operation.line_number}: {operation.source_path}") for operation in operations],
kind="Source",
)
validate_non_overlapping_paths(
[(operation.target_path, f"line {operation.line_number}: {operation.target_path}") for operation in operations],
kind="Destination",
)
return operations, skipped_rows
def execute_plan(
operations: list[TransferOperation],
*,
mode: str,
dry_run: bool,
verbose: bool,
) -> None:
for operation in operations:
if dry_run or verbose:
print(
f"{'plan' if dry_run else mode}\t{operation.source_path}\t{operation.target_path}"
)
if dry_run:
continue
operation.target_path.parent.mkdir(parents=True, exist_ok=True)
if mode == "copy":
shutil.copytree(operation.source_path, operation.target_path)
else:
shutil.move(str(operation.source_path), str(operation.target_path))
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Copy or move audiobook folders into the directory structure proposed by a mock ABS report."
)
parser.add_argument(
"--report",
default="reports/audiobookshelf_mock_report.tsv",
help="TSV report produced by generate_abs_mock_report.py",
)
parser.add_argument(
"--destination-root",
"--dest-root",
"--destination",
required=True,
help="Root directory where the new structure should be created",
)
parser.add_argument(
"--mode",
choices=("copy", "move"),
default="copy",
help="Whether to copy or move each source directory",
)
parser.add_argument(
"--status",
choices=("ready", "review", "all"),
default="ready",
help="Which report rows should be applied",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Validate the report and print the planned operations without changing files",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Print each completed operation",
)
return parser
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
report_path = Path(args.report).expanduser().resolve()
destination_root = Path(args.destination_root).expanduser().resolve()
try:
if not report_path.exists():
raise ReportError(f"Report does not exist: {report_path}")
if destination_root.exists() and not destination_root.is_dir():
raise ReportError(f"Destination root is not a directory: {destination_root}")
rows = load_report(report_path)
operations, skipped_rows = build_execution_plan(
rows,
destination_root,
selected_status=args.status,
)
execute_plan(
operations,
mode=args.mode,
dry_run=args.dry_run,
verbose=args.verbose,
)
except (OSError, ReportError) as error:
print(error, file=sys.stderr)
return 1
print(f"report\t{report_path}")
print(f"destination_root\t{destination_root}")
print(f"mode\t{args.mode}")
print(f"selected_status\t{args.status}")
print(f"rows_total\t{len(rows)}")
print(f"rows_selected\t{len(operations)}")
print(f"rows_skipped\t{skipped_rows}")
print(f"dry_run\t{'yes' if args.dry_run else 'no'}")
return 0
if __name__ == "__main__":
raise SystemExit(main())