Files
audiobookshelf/generate_abs_mock_report.py
2026-04-26 00:46:48 +02:00

949 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import os
import re
import unicodedata
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from pathlib import Path
# Manual verification workflow:
# - `verification_source` in `data/verified_author_overrides.tsv` should point to
# the page used to confirm corrected metadata.
# - `lubimyczytac.pl` is an accepted auxiliary source for author, title, and
# series/cycle verification when audiobook storefront metadata is missing or
# ambiguous.
# File suffixes (lower-case, including the dot) that count as audio files
# when the library tree is scanned.
AUDIO_EXTS = {
    ".aac",
    ".flac",
    ".m4a",
    ".m4b",
    ".mp3",
    ".ogg",
    ".opus",
    ".wav",
}
# Whole folder name of the form "disc 1" / "CD02" / "disk3"; such folders are
# merged into their parent book by collect_book_roots.
DISC_RE = re.compile(r"^(disc|cd|disk)\s*0*\d+$", re.IGNORECASE)
# Leading track marker on a file stem: optional cd/disc/disk word, digits and
# at least one separator character.
TRACK_PREFIX_RE = re.compile(r"^\s*(cd|disc|disk)?\s*\d+\s*([._ -]+)\s*", re.IGNORECASE)
# Leading year 19xx/20xx (optionally parenthesised), a hyphen, then the rest.
YEAR_RE = re.compile(r"^\(?((?:19|20)\d{2})\)?\s*-\s*(.+)$")
# Leading (optionally bracketed) number followed by ". ", "- " or " - ".
LEADING_SEQ_RE = re.compile(r"^\[?(\d+(?:[.,]\d+)?)\]?\s*(?:[.-]\s+|\s+-\s+)(.+)$")
# Labelled volume number ("vol 2", "tom 3", "cz. 1", ...) at the start of the
# text or right after a " - " separator.
SEQ_LABEL_RE = re.compile(
    r"(?i)(?:^| - )(?:(?:vol(?:ume)?|book|tom|t|cz(?:eść|esc)?)\.?\s*)(\d+(?:[.,]\d+)?)"
)
# "[3] Title" style prefix.
BRACKET_SEQ_RE = re.compile(r"^\[(\d+(?:[.,]\d+)?)\]\s*(.+)$")
# Patterns yielding (series, sequence, title) from a single string:
# "<series> tom 2 - <title>" and "<series> - 2 <title>".
SERIES_SEQ_PATTERNS = [
    re.compile(r"^(.+?)\s+(?:tom|vol(?:ume)?|book)\.?\s*(\d+(?:[.,]\d+)?)\s*-\s*(.+)$", re.IGNORECASE),
    re.compile(r"^(.+?)\s*-\s*(\d+(?:[.,]\d+)?)\s+(.+)$"),
]
# One "word": a run of Latin letters (including accented/Polish ones), digits,
# ampersands or apostrophes. Used by component_words.
WORD_TOKEN_RE = re.compile(r"[A-Za-zÀ-ÖØ-öø-ÿĀ-žĄĆĘŁŃÓŚŹŻąćęłńóśźż0-9&']+")
# Narrator markers ("czyta", "czyt.", "lektor", "read by") in brackets, in
# parentheses, or trailing at the end of the text; group 1 is the narrator.
NARRATOR_PATTERNS = [
    re.compile(r"(?i)\[(?:czyta|czyt\.?|lektor|read by)\s+([^\]]+)\]"),
    re.compile(r"(?i)\((?:czyta|czyt\.?|lektor|read by)\s+([^)]+)\)"),
    re.compile(r"(?i)(?:^| - |\s)(?:czyta|czyt\.?|lektor|read by)\s+(.+)$"),
]
# Bracketed/parenthesised fragments that strip_noise removes: bitrate tags,
# "czas" (duration) tags and a fixed list of marker words.
NOISE_PATTERNS = [
    re.compile(r"(?i)\[[^]]*kbps[^]]*\]"),
    re.compile(r"(?i)\[[^]]*czas[^]]*\]"),
    re.compile(r"(?i)\([^)]*(?:trawel|hex-lub|lapiejko|elgrande|kwgr)[^)]*\)"),
]
# ASCII-folded, lower-case keywords; looks_generic treats any text whose
# signature contains one of them (as a substring) as a generic folder name.
GENERIC_KEYWORDS = {
    "angielski",
    "audiobook",
    "audiobooki",
    "biblioteka",
    "brakuje",
    "brakujace",
    "czyta",
    "czyt",
    "cykl",
    "do przeczytania",
    "eng",
    "kolekcja",
    "lekt",
    "lektor",
    "magazyn",
    "nowe",
    "sluchowisko",
}
# Lower-case words allowed inside an author name by is_author_like.
NAME_PARTICLES = {"al", "bin", "da", "de", "del", "di", "la", "le", "van", "von", "and"}
# Words whose presence in the remainder lets split_author_title accept a
# separator-less "Author Title" split.
TITLE_CONNECTORS = {"a", "and", "bez", "dla", "do", "for", "i", "na", "nad", "o", "of", "po", "pod", "przez", "the", "to", "w", "z", "ze"}
@dataclass
class BookRoot:
    """A directory treated as a single book, plus the audio files found for it."""

    # Directory that represents the book.
    path: Path
    # Audio files collected for the book; collect_book_roots appends the files
    # of disc sub-folders to the parent book's list.
    audio_files: list[Path] = field(default_factory=list)
def normalize_unicode(text: str) -> str:
    """Return *text* converted to Unicode normal form NFKC."""
    nfkc_text = unicodedata.normalize("NFKC", text)
    return nfkc_text
def collapse_spaces(text: str) -> str:
    """Squeeze every whitespace run to one space and trim both ends."""
    squeezed = re.sub(r"\s+", " ", text)
    return squeezed.strip()
def clean_component(raw: str) -> str:
    """Normalise one path component or file stem before parsing.

    The text is NFKC-normalised, underscores and dots become spaces, every
    hyphen or Unicode dash is rewritten as a spaced `` - `` separator, and
    whitespace is collapsed.

    Args:
        raw: Folder name, file stem or other free-form text.

    Returns:
        The cleaned text with single spaces and `` - `` separators.
    """
    text = normalize_unicode(raw)
    text = text.replace("_", " ")
    text = text.replace(".", " ")
    # Unicode dash look-alikes (U+2010..U+2015 and the minus sign U+2212) are
    # written as escapes so they cannot be lost or mistaken for "-".  The
    # search string must never be empty: str.replace("", " - ") would insert
    # the separator between every character of the text.
    text = re.sub(r"[\u2010-\u2015\u2212]", " - ", text)
    text = collapse_spaces(text)
    # Give every hyphen exactly one space on each side.
    text = re.sub(r"\s*-\s*", " - ", text)
    return collapse_spaces(text)
def ascii_fold(text: str) -> str:
    """Strip diacritics from *text*.

    The Polish stroke letters are mapped explicitly first; the rest is
    decomposed with NFKD and every combining mark is dropped.
    """
    stroke_table = str.maketrans({"Ł": "L", "ł": "l"})
    decomposed = unicodedata.normalize("NFKD", text.translate(stroke_table))
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    return "".join(base_chars)
def signature(text: str) -> str:
    """Return a comparison key: cleaned, ASCII-folded, lower-case alphanumerics.

    Every run of characters outside ``a-z0-9`` becomes a single space.
    """
    lowered = ascii_fold(clean_component(text)).lower()
    alnum_only = re.sub(r"[^a-z0-9]+", " ", lowered)
    return collapse_spaces(alnum_only)
def same_signature_tokens(left: str, right: str) -> bool:
    """Tell whether both texts have the same signature tokens, in any order.

    Returns False when either side has no tokens at all.
    """
    lhs = sorted(signature(left).split())
    rhs = sorted(signature(right).split())
    if lhs and rhs:
        return lhs == rhs
    return False
def contains_signature_tokens(container: str, text: str) -> bool:
    """Tell whether every signature token of *text* also occurs in *container*.

    Returns False when either side has no tokens.
    """
    haystack = set(signature(container).split())
    wanted = set(signature(text).split())
    if haystack and wanted:
        return wanted <= haystack
    return False
def looks_generic(text: str) -> bool:
    """Tell whether *text* looks like a generic bucket rather than a real name.

    Generic means: an empty signature, a signature containing any entry of
    GENERIC_KEYWORDS as a substring, or a signature of at most four tokens
    that contains a digit.
    """
    sig = signature(text)
    if not sig:
        return True
    for keyword in GENERIC_KEYWORDS:
        # Substring test on the whole signature, not a per-token comparison.
        if keyword in sig:
            return True
    has_digit = re.search(r"\d", sig) is not None
    return has_digit and len(sig.split()) <= 4
def author_confidence_for(name: str) -> str:
    """Grade an author name as "none", "high", "medium" or "low".

    Two to four signature tokens rate "high"; a single token, or any name
    containing a one-letter token, rates "medium"; an empty signature rates
    "none"; everything else "low".
    """
    tokens = signature(name).split()
    if not tokens:
        return "none"
    token_count = len(tokens)
    if 2 <= token_count <= 4:
        return "high"
    has_initial = any(len(token) == 1 for token in tokens)
    if token_count == 1 or has_initial:
        return "medium"
    return "low"
def is_author_like(text: str) -> bool:
    """Heuristically decide whether *text* could be a bare author name.

    Rejected: empty or generic text, anything containing a `` - ``
    separator, characters outside the allowed name alphabet, digits, more
    than five tokens, names made only of initials, five-token names without
    an initial, and multi-letter words starting in lower case unless they
    are listed in NAME_PARTICLES.
    """
    cleaned = clean_component(text)
    if not cleaned or looks_generic(cleaned):
        return False
    if " - " in cleaned:
        return False
    # Whatever survives this substitution is a disallowed character.
    leftover = re.sub(r"[A-Za-zÀ-ÖØ-öø-ÿĀ-žĄĆĘŁŃÓŚŹŻąćęłńóśźż0-9,&' /-]", "", cleaned)
    if leftover or re.search(r"\d", cleaned):
        return False
    tokens = signature(cleaned).split()
    token_count = len(tokens)
    if token_count < 1 or token_count > 5:
        return False
    initial_count = sum(1 for token in tokens if len(token) == 1)
    if initial_count == token_count:
        return False
    if token_count == 5 and initial_count == 0:
        return False
    words = re.findall(r"[A-Za-zÀ-ÖØ-öø-ÿĀ-žĄĆĘŁŃÓŚŹŻąćęłńóśźż]+", cleaned)
    return not any(
        len(word) > 1 and word[0].islower() and word.lower() not in NAME_PARTICLES
        for word in words
    )
def strip_noise(text: str) -> tuple[str, list[str]]:
    """Remove every NOISE_PATTERNS match from *text*.

    Returns:
        The cleaned text and a list holding one note per pattern that matched.
    """
    notes: list[str] = []
    remaining = text
    for pattern in NOISE_PATTERNS:
        remaining, removed = pattern.subn("", remaining)
        if removed:
            notes.append("removed technical/uploader marker")
    return collapse_spaces(remaining), notes
def extract_narrator(text: str) -> tuple[str, str]:
    """Pull a narrator annotation out of *text*.

    Only the first pattern in NARRATOR_PATTERNS that matches is used.

    Returns:
        ``(text without the annotation, narrator)``, or ``(text, "")``
        unchanged when no pattern matches.
    """
    for pattern in NARRATOR_PATTERNS:
        found = pattern.search(text)
        if found is None:
            continue
        narrator = collapse_spaces(found.group(1))
        return collapse_spaces(pattern.sub("", text)), narrator
    return text, ""
def extract_year(text: str) -> tuple[str, str]:
    """Split a leading "YYYY - " prefix off *text*.

    Returns:
        ``(remaining text, year)``, or ``(text, "")`` when YEAR_RE does not
        match.
    """
    found = YEAR_RE.match(text)
    if found is None:
        return text, ""
    year, remainder = found.group(1), found.group(2)
    return collapse_spaces(remainder), year
def extract_sequence(text: str) -> tuple[str, str]:
    """Extract a volume/sequence number from *text*.

    Tried in order: a bracketed prefix, a plain leading number shorter than
    four characters, then a labelled number anywhere in the text.

    Returns:
        ``(text without the sequence marker, sequence)``; the sequence is
        ``""`` when nothing was recognised.
    """
    working = text.lstrip("- ").strip()

    bracketed = BRACKET_SEQ_RE.match(working)
    if bracketed is not None:
        return collapse_spaces(bracketed.group(2)), bracketed.group(1)

    leading = LEADING_SEQ_RE.match(working)
    # Four or more characters are not accepted as a leading sequence number.
    if leading is not None and len(leading.group(1)) < 4:
        return collapse_spaces(leading.group(2)), leading.group(1)

    labelled = SEQ_LABEL_RE.search(working)
    if labelled is None:
        return working, ""
    cut_from, cut_to = labelled.span()
    remainder = (working[:cut_from] + working[cut_to:]).strip(" -")
    return collapse_spaces(remainder), labelled.group(1)
def strip_author_prefix(text: str, author: str) -> str:
    """Drop the author's words from the front of *text*.

    The leading words only have to match the author as a set of signature
    tokens, so their order may differ.  *text* is returned untouched when it
    has no more words than the author or the prefix does not match.
    """
    words = component_words(text)
    author_len = len(component_words(author))
    if not words or not author_len or len(words) <= author_len:
        return text
    head, tail = words[:author_len], words[author_len:]
    if not same_signature_tokens(" ".join(head), author):
        return text
    remainder = collapse_spaces(" ".join(tail))
    return remainder.lstrip("- ").strip()
def normalize_author_segments(segments: list[str]) -> str:
    """Join the non-empty, whitespace-collapsed segments with a hyphen."""
    kept = [squeezed for squeezed in map(collapse_spaces, segments) if squeezed]
    # Joining zero items yields "" and a single item is returned as-is.
    return "-".join(kept)
def is_author_like_flexible(text: str) -> bool:
    """Like is_author_like, with a retry that treats hyphens as spaces."""
    if is_author_like(text):
        return True
    return "-" in text and is_author_like(text.replace("-", " "))
def author_title_candidate_score(author: str, title: str) -> int:
    """Score an (author, title) split; a higher score is a better candidate.

    Ten points per author word plus one per title token (capped at six),
    minus two for a single-word author, plus two when the title contains a
    digit and plus one when it contains a `` - `` separator.
    """
    author_word_count = len(component_words(author))
    title_token_count = len(signature(title).split())
    score = 10 * author_word_count + min(title_token_count, 6)
    if author_word_count == 1:
        score -= 2
    if re.search(r"\d", title):
        score += 2
    if " - " in title:
        score += 1
    return score
def split_author_title(text: str) -> tuple[str, str]:
    """Split a mixed "author + title" string into ``(author, title)``.

    With `` - `` separators every split point is tried in both directions
    (author first, or author last with a one-point penalty) and the best
    scoring candidate wins; on ties the earliest candidate is kept.  If that
    yields nothing, the first two to five words are tried as the author and
    accepted when the remainder contains a TITLE_CONNECTORS word.

    Returns:
        ``("", "")`` when no plausible split exists.
    """
    cleaned = clean_component(text)

    if " - " in cleaned:
        parts = [piece for piece in map(collapse_spaces, cleaned.split(" - ")) if piece]
        candidates: list[tuple[int, str, str]] = []
        for size in range(1, len(parts)):
            head, tail = parts[:size], parts[size:]

            left_author = normalize_author_segments(head)
            right_title = collapse_spaces(" - ".join(tail))
            if left_author and right_title and is_author_like_flexible(left_author):
                score = author_title_candidate_score(left_author, right_title)
                candidates.append((score, left_author, right_title))

            right_author = normalize_author_segments(tail)
            left_title = collapse_spaces(" - ".join(head))
            if right_author and left_title and is_author_like_flexible(right_author):
                score = author_title_candidate_score(right_author, left_title) - 1
                candidates.append((score, right_author, left_title))

        viable = [entry for entry in candidates if entry[0] >= 0]
        if viable:
            # max() keeps the first of equally scored entries.
            _, best_author, best_title = max(viable, key=lambda entry: entry[0])
            return best_author, best_title

    tokens = cleaned.split()
    longest_author = min(5, len(tokens) - 1)
    for size in range(2, longest_author + 1):
        author = " ".join(tokens[:size])
        title = " ".join(tokens[size:])
        if is_author_like(author) and TITLE_CONNECTORS.intersection(signature(title).split()):
            return author, title
    return "", ""
def component_words(text: str) -> list[str]:
    """Return the WORD_TOKEN_RE matches found in the cleaned *text*."""
    cleaned = clean_component(text)
    return WORD_TOKEN_RE.findall(cleaned)
def find_token_sequence(text: str, sequence_tokens: list[str]) -> str:
    """Find consecutive words in *text* whose signatures equal *sequence_tokens*.

    Returns:
        The matching words as they appear in *text*, joined by spaces, or
        ``""`` when there is no match or *sequence_tokens* is empty.
    """
    if not sequence_tokens:
        return ""
    words = component_words(text)
    folded = [signature(word) for word in words]
    width = len(sequence_tokens)
    for start in range(len(folded) - width + 1):
        stop = start + width
        if folded[start:stop] == sequence_tokens:
            return collapse_spaces(" ".join(words[start:stop]))
    return ""
def extract_author_from_title_context(text: str, known_title: str) -> str:
    """Recover an author from *text* given a title that is already known.

    If *text* ends with the title, the words before it are the candidate;
    otherwise, if it starts with the title, the words after it are.  A
    candidate is returned only when is_author_like accepts it.

    Returns:
        The cleaned author, or ``""``.
    """
    title_tokens = signature(known_title).split()
    if not title_tokens:
        return ""
    words = component_words(text)
    folded = [signature(word) for word in words]
    title_len = len(title_tokens)
    if len(folded) <= title_len:
        return ""

    # (window that must equal the title, words left over for the author)
    layouts = (
        (folded[-title_len:], words[:-title_len]),
        (folded[:title_len], words[title_len:]),
    )
    for window, leftover in layouts:
        if window != title_tokens:
            continue
        candidate = collapse_spaces(" ".join(leftover)).strip("- ")
        if is_author_like(candidate):
            return clean_component(candidate)
    return ""
def normalize_author_order_from_context(author: str, contexts: list[str]) -> str:
    """Look for the author's words in a different order inside *contexts*.

    Two reorderings are searched for: fully reversed, and last word moved to
    the front.  Contexts are scanned in order and the first hit wins.

    Returns:
        The cleaned match as written in the context, or ``""`` when the
        author has fewer than two words or nothing matches.
    """
    words = component_words(author)
    if len(words) < 2:
        return ""
    folded = [signature(word) for word in words]
    reversed_order = folded[::-1]
    rotated_order = [folded[-1], *folded[:-1]]
    token_orders = [reversed_order]
    # For two-word names both reorderings coincide; search only once.
    if rotated_order != reversed_order:
        token_orders.append(rotated_order)

    for text in contexts:
        for order in token_orders:
            found = find_token_sequence(text, order)
            if found:
                return clean_component(found)
    return ""
def strip_title_suffix(text: str, title: str) -> str:
    """Drop the title's words from the end of *text*.

    The trailing tokens only have to match the title as a set of signature
    tokens.  *text* is returned untouched when it has no more tokens than
    the title or the suffix does not match.
    """
    tokens = clean_component(text).split()
    title_len = len(component_words(title))
    if not tokens or not title_len or len(tokens) <= title_len:
        return text
    if not same_signature_tokens(" ".join(tokens[-title_len:]), title):
        return text
    remainder = collapse_spaces(" ".join(tokens[:-title_len]))
    return remainder.strip("- ").strip()
# "<series> tom 3" / "<series> 3": a series name followed by a trailing
# (optionally labelled) number.  Groups: (series, sequence).
TRAILING_SERIES_SEQ_PATTERNS = [
    re.compile(r"^(.+?)\s+(?:tom|vol(?:ume)?|book|t|cz(?:eść|esc)?)\.?\s*(\d+(?:[.,]\d+)?)$", re.IGNORECASE),
    re.compile(r"^(.+?)\s+(\d+(?:[.,]\d+)?)$"),
]
# "<series> tom 3 <title>" / "<series> 03 <title>": the unlabelled form only
# accepts numbers starting with "0".  Groups: (series, sequence, title).
INLINE_SERIES_SEQ_TITLE_PATTERNS = [
    re.compile(r"^(.+?)\s+(?:tom|vol(?:ume)?|book|t|cz(?:eść|esc)?)\.?\s*(\d+(?:[.,]\d+)?)\s+(.+)$", re.IGNORECASE),
    re.compile(r"^(.+?)\s+(0\d+(?:[.,]\d+)?)\s+(.+)$"),
]
def extract_trailing_series_sequence(text: str) -> tuple[str, str]:
    """Split "<series> <number>" into ``(series, sequence)``.

    Numbers of four or more characters are not accepted as a sequence.

    Returns:
        ``("", "")`` for empty input and ``(cleaned text, "")`` when no
        pattern yields an acceptable sequence.
    """
    cleaned = clean_component(text).strip("- ")
    if not cleaned:
        return "", ""
    for pattern in TRAILING_SERIES_SEQ_PATTERNS:
        found = pattern.match(cleaned)
        if found is None:
            continue
        series = collapse_spaces(found.group(1))
        seq = collapse_spaces(found.group(2))
        if len(seq) < 4:
            return series, seq
    return cleaned, ""
def extract_inline_series_sequence_title(text: str) -> tuple[str, str, str]:
    """Split "<series> <number> <title>" into ``(series, sequence, title)``.

    Numbers of four or more characters are not accepted as a sequence.

    Returns:
        Three empty strings when nothing acceptable matches.
    """
    cleaned = clean_component(text).strip("- ")
    if not cleaned:
        return "", "", ""
    for pattern in INLINE_SERIES_SEQ_TITLE_PATTERNS:
        found = pattern.match(cleaned)
        if found is None:
            continue
        series, sequence, title = map(collapse_spaces, found.groups())
        if len(sequence) < 4:
            return series, sequence, title
    return "", "", ""
def normalize_series_candidate(text: str, author: str, title_hints: list[str]) -> tuple[str, str]:
    """Try to read ``(series, sequence)`` out of a single path component.

    The author prefix, narrator annotation, noise markers and a trailing
    title (the first hint that strips something, fewest words first) are
    removed before the series patterns are applied.  Candidates that look
    generic, contain the author's tokens, or contain a title hint's tokens
    are rejected.

    Returns:
        ``("", "")`` when no acceptable series is found.
    """
    # NOTE(review): block nesting was reconstructed from a listing without
    # indentation; narrator/noise stripping is assumed to run regardless of
    # whether an author is known - confirm against the canonical file.
    working = clean_component(text)
    if not working:
        return "", ""
    if author:
        working = strip_author_prefix(working, author)
    working = extract_narrator(working)[0]
    working = strip_noise(working)[0]

    unique_hints = {clean_component(hint) for hint in title_hints if hint}
    for title_hint in sorted(unique_hints, key=lambda hint: len(component_words(hint))):
        shortened = strip_title_suffix(working, title_hint)
        if shortened != working:
            working = shortened
            break

    working = clean_component(working).strip("- ")
    if not working:
        return "", ""

    inline_series, inline_sequence, parsed_title = extract_inline_series_sequence_title(working)
    if (
        inline_series
        and not looks_generic(inline_series)
        and not (author and contains_signature_tokens(inline_series, author))
        and (
            not title_hints
            or any(contains_signature_tokens(hint, parsed_title) for hint in title_hints if hint)
        )
    ):
        return inline_series, inline_sequence

    series, sequence = extract_trailing_series_sequence(working)
    if not series or looks_generic(series):
        return "", ""
    if author and contains_signature_tokens(series, author):
        return "", ""
    if any(hint and contains_signature_tokens(series, hint) for hint in title_hints):
        return "", ""
    return series, sequence
def infer_series_from_context(parts: list[str], author_index: int, author: str, title_hints: list[str]) -> tuple[str, str]:
    """Scan path parts after the author, deepest first, for a series.

    All parts are scanned when *author_index* is negative.

    Returns:
        The first ``(series, sequence)`` with a non-empty series, else
        ``("", "")``.
    """
    first = 0 if author_index < 0 else author_index + 1
    for part in parts[first:][::-1]:
        found = normalize_series_candidate(part, author, title_hints)
        if found[0]:
            return found
    return "", ""
def series_needs_normalization(series: str, author: str, title: str) -> bool:
    """Tell whether the current series value should be replaced.

    True when the series is empty, looks generic, or contains all signature
    tokens of the author or of the title.
    """
    if not series or looks_generic(series):
        return True
    if author and contains_signature_tokens(series, author):
        return True
    return bool(title and contains_signature_tokens(series, title))
def strip_series_prefix(text: str, series: str, sequence: str) -> str:
    """Remove a leading series name (and its sequence number) from *text*.

    After the series words, either the bare sequence or a volume label
    followed by the sequence is skipped too.  *text* is returned unchanged
    when it does not start with the series or nothing would remain.
    """
    tokens = clean_component(text).split()
    series_len = len(component_words(series))
    if not tokens or not series_len or len(tokens) <= series_len:
        return text
    if signature(" ".join(tokens[:series_len])) != signature(series):
        return text

    cut = series_len
    if sequence and cut < len(tokens):
        if signature(tokens[cut]) == signature(sequence):
            cut += 1
        elif (
            cut + 1 < len(tokens)
            and signature(tokens[cut]) in {"tom", "t", "vol", "volume", "book"}
            and signature(tokens[cut + 1]) == signature(sequence)
        ):
            cut += 2

    remainder = collapse_spaces(" ".join(tokens[cut:])).strip("- ")
    return remainder if remainder else text
def sanitize_component(text: str) -> str:
    """Turn *text* into an ASCII path component.

    Slashes and backslashes become hyphens, whitespace is collapsed,
    diacritics are folded and surrounding dots/spaces are trimmed.  An empty
    result is replaced by the placeholder ``__REVIEW__``.
    """
    without_separators = text.replace("/", "-").replace("\\", "-")
    folded = ascii_fold(collapse_spaces(without_separators)).strip(". ")
    return folded if folded else "__REVIEW__"
def extract_series_sequence_title(text: str) -> tuple[str, str, str]:
    """Split *text* into ``(series, sequence, title)`` via SERIES_SEQ_PATTERNS.

    Matches whose sequence has four or more characters are skipped.

    Returns:
        ``("", "", text)`` when no pattern gives an acceptable split.
    """
    for pattern in SERIES_SEQ_PATTERNS:
        found = pattern.match(text)
        if found is None:
            continue
        series, sequence, title = map(collapse_spaces, found.groups())
        if len(sequence) < 4:
            return series, sequence, title
    return "", "", text
def title_from_filenames(audio_files: list[Path]) -> str:
    """Derive a title from a stem shared by the first twelve audio files.

    Track prefixes are stripped from each stem; purely numeric or empty
    stems are ignored.  A title is returned only when all remaining stems
    share one signature.
    """
    by_signature: dict[str, str] = {}
    for path in audio_files[:12]:
        stem = TRACK_PREFIX_RE.sub("", clean_component(path.stem))
        stem = collapse_spaces(stem)
        if stem and not stem.isdigit():
            # Later files overwrite earlier ones with the same signature.
            by_signature[signature(stem)] = stem
    if len(by_signature) != 1:
        return ""
    (only_stem,) = by_signature.values()
    return only_stem
def parse_opf(path: Path) -> dict[str, str]:
    """Read selected metadata fields from an OPF/XML sidecar file.

    Namespaces are ignored; the first element with non-empty text wins for
    each wanted tag.

    Returns:
        Mapping of tag name to whitespace-collapsed text; empty when the
        file cannot be parsed.
    """
    wanted = {
        "title",
        "author",
        "narrator",
        "publishYear",
        "publisher",
        "isbn",
        "description",
        "genres",
        "language",
        "series",
        "volumeNumber",
    }
    try:
        document_root = ET.parse(path).getroot()
    except Exception:
        # Best effort: any failure to read the sidecar means "no metadata".
        return {}

    values: dict[str, str] = {}
    for element in document_root.iter():
        # "{namespace}tag" -> "tag"
        tag = element.tag.rsplit("}", 1)[-1]
        if tag not in wanted or not element.text or tag in values:
            continue
        values[tag] = collapse_spaces(element.text)
    return values
def collect_book_roots(root: Path) -> list[BookRoot]:
    """Walk *root* and group audio files into book folders.

    Every directory that directly contains audio files is a book, except
    disc folders ("CD 1", "disc02", ...), whose files are added to the
    parent directory's book.

    Args:
        root: Library directory to scan.

    Returns:
        The books found, sorted by path.
    """
    scan_root = Path(root)
    books: dict[Path, BookRoot] = {}
    for dirpath_str, _, filenames in os.walk(root):
        dirpath = Path(dirpath_str)
        audio_files = [dirpath / name for name in sorted(filenames) if Path(name).suffix.lower() in AUDIO_EXTS]
        if not audio_files:
            continue
        # The scan root is never folded into its parent, even if it is named
        # like a disc folder: the parent lies outside the library, and
        # infer_book's root.relative_to(library_root) would raise ValueError.
        is_disc_folder = dirpath != scan_root and bool(DISC_RE.match(clean_component(dirpath.name)))
        book_root = dirpath.parent if is_disc_folder else dirpath
        book = books.setdefault(book_root, BookRoot(path=book_root))
        book.audio_files.extend(audio_files)
    return sorted(books.values(), key=lambda book: str(book.path))
def choose_author(parts: list[str], filename_title: str = "") -> tuple[str, int, str, str]:
    """Pick the most plausible author from the path parts.

    Strategies, in order: a part that pairs an author with the known
    filename title (deepest part first); the first part that is a bare
    author name; the best scoring "author + title" split of any part.

    Returns:
        ``(author, index of the part, source, confidence)``; when nothing is
        found: ``("", -1, "missing", "none")``.
    """
    if filename_title:
        for index in reversed(range(len(parts))):
            candidate = extract_author_from_title_context(parts[index], filename_title)
            if candidate:
                return candidate, index, "title-context", author_confidence_for(candidate)

    for index, part in enumerate(parts):
        if is_author_like(part):
            return clean_component(part), index, "folder", author_confidence_for(part)

    scored: list[tuple[int, str, int]] = []
    for index, part in enumerate(parts):
        author, title = split_author_title(part)
        if author:
            scored.append((author_title_candidate_score(author, title), author, index))
    viable = [entry for entry in scored if entry[0] >= 0]
    if viable:
        # max() keeps the first of equally scored entries.
        _, best_author, best_index = max(viable, key=lambda entry: entry[0])
        return best_author, best_index, "mixed-folder", "medium"
    return "", -1, "missing", "none"
def choose_series(parts: list[str], author_index: int, leaf: str, author: str) -> tuple[str, list[str]]:
    """Pick a series folder from the parts between the author and the leaf.

    Generic grouping folders are skipped (with a note), as are folders that
    merely repeat the leaf or the author, and "author - title" folders whose
    title equals the leaf.

    Returns:
        ``(series, notes)``; the series is ``""`` when no folder qualifies.
    """
    notes: list[str] = []
    first = 0 if author_index < 0 else author_index + 1
    leaf_sig = signature(leaf)
    repeated = {leaf_sig, signature(author)}

    for part in parts[first:-1]:
        cleaned = clean_component(part)
        if not cleaned:
            continue
        if looks_generic(cleaned):
            notes.append(f"ignored grouping folder '{cleaned}'")
            continue
        if signature(cleaned) in repeated:
            continue
        possible_title = split_author_title(cleaned)[1]
        if possible_title and signature(possible_title) == leaf_sig:
            continue
        return cleaned, notes
    return "", notes
def infer_book(root: Path, library_root: Path, audio_files: list[Path]) -> dict[str, str]:
    """Infer metadata and a proposed Audiobookshelf path for one book folder.

    Metadata comes from an ``.opf`` sidecar in *root* when present and is
    otherwise derived from the folder names between *library_root* and
    *root* and from the audio file names.

    Args:
        root: Directory of the book; must be *library_root* or below it.
        library_root: Root of the scanned library.
        audio_files: Audio files that belong to the book.

    Returns:
        One report row (all values are strings) with the inferred fields,
        the proposed path, a ``status`` of "ready" or "review" and the notes
        collected along the way.

    Raises:
        ValueError: If *root* is not located under *library_root*.
    """
    # NOTE(review): block nesting was reconstructed from a listing without
    # indentation.  The folder-name parsing (narrator/noise/year/sequence/
    # series) is assumed to run only when the OPF supplied no title, and the
    # context sequence is assumed to be adopted only together with the
    # normalized series - confirm against the canonical file.
    rel_parts = [clean_component(part) for part in root.relative_to(library_root).parts]
    # A book located directly in the library root has no relative parts.
    # Use an empty leaf instead of failing with IndexError; the filename and
    # raw-folder-name fallbacks below then provide the title.
    leaf = rel_parts[-1] if rel_parts else ""
    notes: list[str] = []
    filename_title = title_from_filenames(audio_files)

    opf_path = next((path for path in root.iterdir() if path.suffix.lower() == ".opf"), None)
    opf = parse_opf(opf_path) if opf_path else {}
    if opf_path:
        notes.append(f"loaded sidecar metadata from {opf_path.name}")

    author, author_index, author_source, author_confidence = choose_author(rel_parts, filename_title)
    if author_source == "missing":
        notes.append("author not confidently identifiable from path")
    elif author_confidence == "medium":
        notes.append("author inferred from a weak path signal")

    series, series_notes = choose_series(rel_parts, author_index, leaf, author)
    notes.extend(series_notes)

    # Sidecar values take precedence over anything inferred from the path.
    narrator = opf.get("narrator", "")
    year = opf.get("publishYear", "")
    sequence = opf.get("volumeNumber", "")
    title = opf.get("title", "")
    if opf.get("author"):
        author = opf["author"]
        author_source = "opf"
        author_confidence = "high"
        notes.append("author taken from OPF sidecar")
    if opf.get("series"):
        series = opf["series"]

    if title:
        title_source = "opf"
    else:
        # No sidecar title: peel author, narrator, noise, year, sequence and
        # series off the leaf folder name.
        title_source = "path"
        title = leaf
        if author:
            stripped = strip_author_prefix(title, author)
            if stripped != title:
                title = stripped
        leaf_author, leaf_title = split_author_title(leaf)
        if leaf_title and same_signature_tokens(leaf_author, author):
            title = leaf_title
        title, narrator_from_path = extract_narrator(title)
        if narrator_from_path and not narrator:
            narrator = narrator_from_path
            notes.append("narrator inferred from folder name")
        title, noise_notes = strip_noise(title)
        notes.extend(noise_notes)
        title, year_from_path = extract_year(title)
        if year_from_path and not year:
            year = year_from_path
        title, sequence_from_path = extract_sequence(title)
        if sequence_from_path and not sequence:
            sequence = sequence_from_path
            notes.append("sequence inferred from folder name")
        if not series:
            series_from_title, seq_from_series_title, stripped_title = extract_series_sequence_title(title)
            if series_from_title:
                series = series_from_title
                title = stripped_title
                if seq_from_series_title and not sequence:
                    sequence = seq_from_series_title
                notes.append("series inferred from folder name")

    if filename_title and author:
        stripped_filename_title = strip_author_prefix(filename_title, author)
        if stripped_filename_title:
            filename_title = stripped_filename_title
    # A title that is empty or merely repeats the author is useless; prefer
    # the stem shared by the audio files.
    if not title or signature(title) == signature(author):
        if filename_title:
            title = filename_title
            title_source = "filename"
            notes.append("title inferred from repeated audio filename stem")

    title_hints = [title]
    if filename_title:
        title_hints.append(filename_title)
    normalized_series, normalized_sequence = infer_series_from_context(rel_parts, author_index, author, title_hints)
    if normalized_series and series_needs_normalization(series, author, title):
        series = normalized_series
        notes.append("series normalized from folder context")
        if normalized_sequence and not sequence:
            sequence = normalized_sequence
            notes.append("sequence inferred from folder context")
    if series and title:
        stripped_title = strip_series_prefix(title, series, sequence)
        if stripped_title != title:
            title = stripped_title

    if author and not opf.get("author"):
        # Look for the same name written in another word order ("Surname
        # Firstname") in the leaf, the audio file names and, for weak
        # mixed-folder matches, the other path parts.
        context_texts = [leaf] if author_index != len(rel_parts) - 1 else []
        context_texts.extend(clean_component(path.stem) for path in audio_files[:12])
        if author_source == "mixed-folder" and ("-" in author or len(component_words(author)) > 2):
            context_texts.extend(part for index, part in enumerate(rel_parts) if index != author_index)
        normalized_author = normalize_author_order_from_context(author, context_texts)
        if normalized_author and signature(normalized_author) != signature(author):
            author = normalized_author
            notes.append("author order normalized from current folder/file name")

    title = collapse_spaces(title)
    if not title:
        title = sanitize_component(root.name)
        notes.append("title fallback came from raw folder name")
    if author_index > 0:
        notes.append(f"author came from nested folder '{rel_parts[author_index]}'")
    if author_index < 0 and len(rel_parts) > 1 and looks_generic(rel_parts[0]):
        notes.append(f"top-level folder '{rel_parts[0]}' looks like a generic bucket")

    status = "ready" if title and author_confidence in {"high", "medium"} else "review"

    # Proposed layout: <author>/[<series>/][Vol. N - ][YYYY - ]<title>[ {narrator}]
    author_folder = sanitize_component(author) if author else "__AUTHOR_REVIEW__"
    title_bits = []
    if sequence:
        title_bits.append(f"Vol. {sequence}")
    if year:
        title_bits.append(year)
    title_bits.append(title)
    title_folder = sanitize_component(" - ".join(bit for bit in title_bits if bit))
    if narrator:
        title_folder = f"{title_folder} {{{sanitize_component(narrator)}}}"
    proposed_parts = [author_folder]
    if series:
        proposed_parts.append(sanitize_component(series))
    proposed_parts.append(title_folder)
    proposed_path = "/".join(proposed_parts)

    return {
        "verification_status": "unverified",
        "verification_source": "",
        "status": status,
        "current_path": str(root),
        "audio_file_count": str(len(audio_files)),
        "sample_audio_file": audio_files[0].name if audio_files else "",
        "author": author,
        "author_confidence": author_confidence,
        "author_source": author_source,
        "series": series,
        "sequence": sequence,
        "publish_year": year,
        "title": title,
        "title_source": title_source,
        "narrator": narrator,
        "proposed_abs_path": proposed_path,
        # dict.fromkeys removes duplicate notes while keeping their order.
        "notes": "; ".join(dict.fromkeys(note for note in notes if note)),
    }
def load_overrides(path: Path | None) -> dict[str, dict[str, str]]:
    """Load the tab-separated overrides file, keyed by ``current_path``.

    Rows without a ``current_path`` value are skipped; a later row replaces
    an earlier one with the same key.

    Returns:
        An empty dict when *path* is missing or does not exist.
    """
    if not path or not path.exists():
        return {}
    overrides: dict[str, dict[str, str]] = {}
    with path.open(encoding="utf-8", newline="") as handle:
        for row in csv.DictReader(handle, delimiter="\t"):
            key = row.get("current_path")
            if key:
                overrides[key] = row
    return overrides
def apply_override(row: dict[str, str], override: dict[str, str]) -> dict[str, str]:
    """Return a copy of *row* with verified override values applied.

    Non-empty plain columns of the override are copied as-is, ``verified_*``
    columns are mapped onto their report fields, the verification columns
    are filled in, and the proposed path and status are rebuilt from the
    updated values.  *row* itself is not modified.
    """
    verified_fields = {
        "verified_author": "author",
        "verified_series": "series",
        "verified_sequence": "sequence",
        "verified_publish_year": "publish_year",
        "verified_title": "title",
        "verified_narrator": "narrator",
    }
    updated = dict(row)

    # Plain columns first; the key column and verified_* columns are handled
    # separately.
    for key, value in override.items():
        if key == "current_path" or key in verified_fields:
            continue
        if value:
            updated[key] = value

    for source_key, target_key in verified_fields.items():
        verified_value = collapse_spaces(override.get(source_key, ""))
        if verified_value:
            updated[target_key] = verified_value

    updated["author_source"] = "verified-override" if updated.get("author") else updated["author_source"]
    updated["author_confidence"] = "high" if updated.get("author") else updated["author_confidence"]
    updated["verification_status"] = override.get("verification_status", "verified") or "verified"
    updated["verification_source"] = override.get("verification_source", "")
    if override.get("verification_note"):
        candidate_notes = [updated.get("notes", ""), override["verification_note"]]
        updated["notes"] = "; ".join(dict.fromkeys(note for note in candidate_notes if note))

    # Rebuild: <author>/[<series>/][Vol. N - ][YYYY - ]<title>[ {narrator}]
    if updated.get("author"):
        author_folder = sanitize_component(updated["author"])
    else:
        author_folder = "__AUTHOR_REVIEW__"
    title_bits = []
    if updated.get("sequence"):
        title_bits.append(f"Vol. {updated['sequence']}")
    if updated.get("publish_year"):
        title_bits.append(updated["publish_year"])
    title_bits.append(updated["title"])
    title_folder = sanitize_component(" - ".join(bit for bit in title_bits if bit))
    if updated.get("narrator"):
        title_folder = f"{title_folder} {{{sanitize_component(updated['narrator'])}}}"

    proposed_parts = [author_folder]
    if updated.get("series"):
        proposed_parts.append(sanitize_component(updated["series"]))
    proposed_parts.append(title_folder)
    updated["proposed_abs_path"] = "/".join(proposed_parts)
    updated["status"] = "ready" if updated.get("author") else "review"
    return updated
def main() -> int:
    """Scan the library, write the TSV report and print a short summary.

    Nothing in the library is moved or renamed; only the report file (and
    its parent directory) is created.

    Returns:
        Always 0.
    """
    parser = argparse.ArgumentParser(
        description="Generate a non-destructive Audiobookshelf path proposal report."
    )
    parser.add_argument(
        "--root",
        default="/mnt/nextcloudExtDS/Ksiazki/Audiobooki",
        help="Path to the current audiobook library",
    )
    parser.add_argument(
        "--output",
        default="reports/audiobookshelf_mock_report.tsv",
        help="TSV output path",
    )
    parser.add_argument(
        "--overrides",
        default="data/verified_author_overrides.tsv",
        help="Optional TSV with verified metadata overrides",
    )
    args = parser.parse_args()

    library_root = Path(args.root).resolve()
    output_path = Path(args.output).resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    overrides = load_overrides(Path(args.overrides).resolve())

    rows = []
    for book in collect_book_roots(library_root):
        inferred = infer_book(book.path, library_root, sorted(book.audio_files))
        override = overrides.get(inferred["current_path"])
        rows.append(apply_override(inferred, override) if override else inferred)

    # Column order of the report.
    fieldnames = [
        "verification_status",
        "verification_source",
        "verification_note",
        "status",
        "current_path",
        "audio_file_count",
        "sample_audio_file",
        "author",
        "author_confidence",
        "author_source",
        "series",
        "sequence",
        "publish_year",
        "title",
        "title_source",
        "narrator",
        "proposed_abs_path",
        "notes",
    ]
    with output_path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames, delimiter="\t")
        writer.writeheader()
        writer.writerows(rows)

    ready = len([row for row in rows if row["status"] == "ready"])
    review = len(rows) - ready
    print(f"library_root\t{library_root}")
    print(f"report\t{output_path}")
    print(f"books\t{len(rows)}")
    print(f"ready\t{ready}")
    print(f"review\t{review}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())