949 lines
32 KiB
Python
949 lines
32 KiB
Python
#!/usr/bin/env python3
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import csv
|
||
import os
|
||
import re
|
||
import unicodedata
|
||
import xml.etree.ElementTree as ET
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
|
||
# Manual verification workflow:
|
||
# - `verification_source` in `data/verified_author_overrides.tsv` should point to
|
||
# the page used to confirm corrected metadata.
|
||
# - `lubimyczytac.pl` is an accepted auxiliary source for author, title, and
|
||
# series/cycle verification when audiobook storefront metadata is missing or
|
||
# ambiguous.
|
||
|
||
AUDIO_EXTS = {
|
||
".aac",
|
||
".flac",
|
||
".m4a",
|
||
".m4b",
|
||
".mp3",
|
||
".ogg",
|
||
".opus",
|
||
".wav",
|
||
}
|
||
|
||
DISC_RE = re.compile(r"^(disc|cd|disk)\s*0*\d+$", re.IGNORECASE)
|
||
TRACK_PREFIX_RE = re.compile(r"^\s*(cd|disc|disk)?\s*\d+\s*([._ -]+)\s*", re.IGNORECASE)
|
||
YEAR_RE = re.compile(r"^\(?((?:19|20)\d{2})\)?\s*-\s*(.+)$")
|
||
LEADING_SEQ_RE = re.compile(r"^\[?(\d+(?:[.,]\d+)?)\]?\s*(?:[.-]\s+|\s+-\s+)(.+)$")
|
||
SEQ_LABEL_RE = re.compile(
|
||
r"(?i)(?:^| - )(?:(?:vol(?:ume)?|book|tom|t|cz(?:eść|esc)?)\.?\s*)(\d+(?:[.,]\d+)?)"
|
||
)
|
||
BRACKET_SEQ_RE = re.compile(r"^\[(\d+(?:[.,]\d+)?)\]\s*(.+)$")
|
||
SERIES_SEQ_PATTERNS = [
|
||
re.compile(r"^(.+?)\s+(?:tom|vol(?:ume)?|book)\.?\s*(\d+(?:[.,]\d+)?)\s*-\s*(.+)$", re.IGNORECASE),
|
||
re.compile(r"^(.+?)\s*-\s*(\d+(?:[.,]\d+)?)\s+(.+)$"),
|
||
]
|
||
WORD_TOKEN_RE = re.compile(r"[A-Za-zÀ-ÖØ-öø-ÿĀ-žĄĆĘŁŃÓŚŹŻąćęłńóśźż0-9&']+")
|
||
NARRATOR_PATTERNS = [
|
||
re.compile(r"(?i)\[(?:czyta|czyt\.?|lektor|read by)\s+([^\]]+)\]"),
|
||
re.compile(r"(?i)\((?:czyta|czyt\.?|lektor|read by)\s+([^)]+)\)"),
|
||
re.compile(r"(?i)(?:^| - |\s)(?:czyta|czyt\.?|lektor|read by)\s+(.+)$"),
|
||
]
|
||
NOISE_PATTERNS = [
|
||
re.compile(r"(?i)\[[^]]*kbps[^]]*\]"),
|
||
re.compile(r"(?i)\[[^]]*czas[^]]*\]"),
|
||
re.compile(r"(?i)\([^)]*(?:trawel|hex-lub|lapiejko|elgrande|kwgr)[^)]*\)"),
|
||
]
|
||
GENERIC_KEYWORDS = {
|
||
"angielski",
|
||
"audiobook",
|
||
"audiobooki",
|
||
"biblioteka",
|
||
"brakuje",
|
||
"brakujace",
|
||
"czyta",
|
||
"czyt",
|
||
"cykl",
|
||
"do przeczytania",
|
||
"eng",
|
||
"kolekcja",
|
||
"lekt",
|
||
"lektor",
|
||
"magazyn",
|
||
"nowe",
|
||
"sluchowisko",
|
||
}
|
||
NAME_PARTICLES = {"al", "bin", "da", "de", "del", "di", "la", "le", "van", "von", "and"}
|
||
TITLE_CONNECTORS = {"a", "and", "bez", "dla", "do", "for", "i", "na", "nad", "o", "of", "po", "pod", "przez", "the", "to", "w", "z", "ze"}
|
||
|
||
|
||
@dataclass
|
||
class BookRoot:
|
||
path: Path
|
||
audio_files: list[Path] = field(default_factory=list)
|
||
|
||
|
||
def normalize_unicode(text: str) -> str:
|
||
return unicodedata.normalize("NFKC", text)
|
||
|
||
|
||
def collapse_spaces(text: str) -> str:
|
||
return re.sub(r"\s+", " ", text).strip()
|
||
|
||
|
||
def clean_component(raw: str) -> str:
|
||
text = normalize_unicode(raw)
|
||
text = text.replace("_", " ")
|
||
text = text.replace(".", " ")
|
||
text = text.replace("–", " - ")
|
||
text = text.replace("—", " - ")
|
||
text = collapse_spaces(text)
|
||
text = re.sub(r"\s*-\s*", " - ", text)
|
||
return collapse_spaces(text)
|
||
|
||
|
||
def ascii_fold(text: str) -> str:
|
||
special_map = str.maketrans({
|
||
"Ł": "L",
|
||
"ł": "l",
|
||
})
|
||
normalized = unicodedata.normalize("NFKD", text.translate(special_map))
|
||
return "".join(ch for ch in normalized if not unicodedata.combining(ch))
|
||
|
||
|
||
def signature(text: str) -> str:
|
||
folded = ascii_fold(clean_component(text)).lower()
|
||
folded = re.sub(r"[^a-z0-9]+", " ", folded)
|
||
return collapse_spaces(folded)
|
||
|
||
|
||
def same_signature_tokens(left: str, right: str) -> bool:
|
||
left_tokens = signature(left).split()
|
||
right_tokens = signature(right).split()
|
||
if not left_tokens or not right_tokens:
|
||
return False
|
||
return sorted(left_tokens) == sorted(right_tokens)
|
||
|
||
|
||
def contains_signature_tokens(container: str, text: str) -> bool:
|
||
container_tokens = set(signature(container).split())
|
||
needle_tokens = set(signature(text).split())
|
||
if not container_tokens or not needle_tokens:
|
||
return False
|
||
return needle_tokens.issubset(container_tokens)
|
||
|
||
|
||
def looks_generic(text: str) -> bool:
|
||
sig = signature(text)
|
||
if not sig:
|
||
return True
|
||
if any(keyword in sig for keyword in GENERIC_KEYWORDS):
|
||
return True
|
||
if re.search(r"\d", sig) and len(sig.split()) <= 4:
|
||
return True
|
||
return False
|
||
|
||
|
||
def author_confidence_for(name: str) -> str:
|
||
sig = signature(name)
|
||
if not sig:
|
||
return "none"
|
||
tokens = sig.split()
|
||
initials = sum(1 for token in tokens if len(token) == 1)
|
||
if 2 <= len(tokens) <= 4:
|
||
return "high"
|
||
if len(tokens) == 1 or initials:
|
||
return "medium"
|
||
return "low"
|
||
|
||
|
||
def is_author_like(text: str) -> bool:
|
||
cleaned = clean_component(text)
|
||
if not cleaned or looks_generic(cleaned):
|
||
return False
|
||
if " - " in cleaned:
|
||
return False
|
||
allowed = re.sub(r"[A-Za-zÀ-ÖØ-öø-ÿĀ-žĄĆĘŁŃÓŚŹŻąćęłńóśźż0-9,&' /-]", "", cleaned)
|
||
if allowed:
|
||
return False
|
||
if re.search(r"\d", cleaned):
|
||
return False
|
||
sig = signature(cleaned)
|
||
tokens = sig.split()
|
||
if not 1 <= len(tokens) <= 5:
|
||
return False
|
||
if all(len(token) == 1 for token in tokens):
|
||
return False
|
||
initials = sum(1 for token in tokens if len(token) == 1)
|
||
if len(tokens) == 5 and initials == 0:
|
||
return False
|
||
raw_tokens = re.findall(r"[A-Za-zÀ-ÖØ-öø-ÿĀ-žĄĆĘŁŃÓŚŹŻąćęłńóśźż]+", cleaned)
|
||
for token in raw_tokens:
|
||
if len(token) == 1:
|
||
continue
|
||
if token[0].islower() and token.lower() not in NAME_PARTICLES:
|
||
return False
|
||
return True
|
||
|
||
|
||
def strip_noise(text: str) -> tuple[str, list[str]]:
|
||
notes: list[str] = []
|
||
result = text
|
||
for pattern in NOISE_PATTERNS:
|
||
if pattern.search(result):
|
||
notes.append("removed technical/uploader marker")
|
||
result = pattern.sub("", result)
|
||
return collapse_spaces(result), notes
|
||
|
||
|
||
def extract_narrator(text: str) -> tuple[str, str]:
|
||
result = text
|
||
narrator = ""
|
||
for pattern in NARRATOR_PATTERNS:
|
||
match = pattern.search(result)
|
||
if not match:
|
||
continue
|
||
narrator = collapse_spaces(match.group(1))
|
||
result = collapse_spaces(pattern.sub("", result))
|
||
break
|
||
return result, narrator
|
||
|
||
|
||
def extract_year(text: str) -> tuple[str, str]:
|
||
match = YEAR_RE.match(text)
|
||
if not match:
|
||
return text, ""
|
||
return collapse_spaces(match.group(2)), match.group(1)
|
||
|
||
|
||
def extract_sequence(text: str) -> tuple[str, str]:
|
||
working = text.lstrip("- ").strip()
|
||
|
||
match = BRACKET_SEQ_RE.match(working)
|
||
if match:
|
||
return collapse_spaces(match.group(2)), match.group(1)
|
||
|
||
match = LEADING_SEQ_RE.match(working)
|
||
if match:
|
||
seq = match.group(1)
|
||
if len(seq) < 4:
|
||
return collapse_spaces(match.group(2)), seq
|
||
|
||
match = SEQ_LABEL_RE.search(working)
|
||
if not match:
|
||
return working, ""
|
||
|
||
seq = match.group(1)
|
||
start, end = match.span()
|
||
stripped = collapse_spaces((working[:start] + working[end:]).strip(" -"))
|
||
return stripped, seq
|
||
|
||
|
||
def strip_author_prefix(text: str, author: str) -> str:
|
||
cleaned_tokens = component_words(text)
|
||
author_words = component_words(author)
|
||
if not cleaned_tokens or not author_words:
|
||
return text
|
||
|
||
if len(cleaned_tokens) <= len(author_words):
|
||
return text
|
||
|
||
prefix = " ".join(cleaned_tokens[: len(author_words)])
|
||
if not same_signature_tokens(prefix, author):
|
||
return text
|
||
|
||
stripped = collapse_spaces(" ".join(cleaned_tokens[len(author_words) :]))
|
||
return stripped.lstrip("- ").strip()
|
||
|
||
|
||
def normalize_author_segments(segments: list[str]) -> str:
|
||
cleaned = [collapse_spaces(segment) for segment in segments if collapse_spaces(segment)]
|
||
if not cleaned:
|
||
return ""
|
||
if len(cleaned) == 1:
|
||
return cleaned[0]
|
||
return "-".join(cleaned)
|
||
|
||
|
||
def is_author_like_flexible(text: str) -> bool:
|
||
if is_author_like(text):
|
||
return True
|
||
if "-" in text:
|
||
return is_author_like(text.replace("-", " "))
|
||
return False
|
||
|
||
|
||
def author_title_candidate_score(author: str, title: str) -> int:
|
||
author_words = len(component_words(author))
|
||
title_tokens = signature(title).split()
|
||
score = author_words * 10 + min(len(title_tokens), 6)
|
||
if author_words == 1:
|
||
score -= 2
|
||
if re.search(r"\d", title):
|
||
score += 2
|
||
if " - " in title:
|
||
score += 1
|
||
return score
|
||
|
||
|
||
def split_author_title(text: str) -> tuple[str, str]:
|
||
cleaned = clean_component(text)
|
||
if " - " in cleaned:
|
||
parts = [collapse_spaces(part) for part in cleaned.split(" - ") if collapse_spaces(part)]
|
||
best_author = ""
|
||
best_title = ""
|
||
best_score = -1
|
||
|
||
for size in range(1, len(parts)):
|
||
left_author = normalize_author_segments(parts[:size])
|
||
right_title = collapse_spaces(" - ".join(parts[size:]))
|
||
if left_author and right_title and is_author_like_flexible(left_author):
|
||
score = author_title_candidate_score(left_author, right_title)
|
||
if score > best_score:
|
||
best_author = left_author
|
||
best_title = right_title
|
||
best_score = score
|
||
|
||
right_author = normalize_author_segments(parts[size:])
|
||
left_title = collapse_spaces(" - ".join(parts[:size]))
|
||
if right_author and left_title and is_author_like_flexible(right_author):
|
||
score = author_title_candidate_score(right_author, left_title) - 1
|
||
if score > best_score:
|
||
best_author = right_author
|
||
best_title = left_title
|
||
best_score = score
|
||
|
||
if best_score >= 0:
|
||
return best_author, best_title
|
||
|
||
tokens = cleaned.split()
|
||
for size in range(2, min(5, len(tokens) - 1) + 1):
|
||
author = " ".join(tokens[:size])
|
||
title = " ".join(tokens[size:])
|
||
if not is_author_like(author):
|
||
continue
|
||
title_tokens = signature(title).split()
|
||
if TITLE_CONNECTORS.intersection(title_tokens):
|
||
return author, title
|
||
return "", ""
|
||
|
||
|
||
def component_words(text: str) -> list[str]:
|
||
return WORD_TOKEN_RE.findall(clean_component(text))
|
||
|
||
|
||
def find_token_sequence(text: str, sequence_tokens: list[str]) -> str:
|
||
if not sequence_tokens:
|
||
return ""
|
||
|
||
words = component_words(text)
|
||
word_tokens = [signature(word) for word in words]
|
||
for start in range(0, len(word_tokens) - len(sequence_tokens) + 1):
|
||
if word_tokens[start : start + len(sequence_tokens)] == sequence_tokens:
|
||
return collapse_spaces(" ".join(words[start : start + len(sequence_tokens)]))
|
||
return ""
|
||
|
||
|
||
def extract_author_from_title_context(text: str, known_title: str) -> str:
|
||
title_tokens = signature(known_title).split()
|
||
if not title_tokens:
|
||
return ""
|
||
|
||
words = component_words(text)
|
||
word_tokens = [signature(word) for word in words]
|
||
if len(word_tokens) <= len(title_tokens):
|
||
return ""
|
||
|
||
if word_tokens[-len(title_tokens) :] == title_tokens:
|
||
candidate = collapse_spaces(" ".join(words[: len(word_tokens) - len(title_tokens)])).strip("- ")
|
||
if is_author_like(candidate):
|
||
return clean_component(candidate)
|
||
|
||
if word_tokens[: len(title_tokens)] == title_tokens:
|
||
candidate = collapse_spaces(" ".join(words[len(title_tokens) :])).strip("- ")
|
||
if is_author_like(candidate):
|
||
return clean_component(candidate)
|
||
|
||
return ""
|
||
|
||
|
||
def normalize_author_order_from_context(author: str, contexts: list[str]) -> str:
|
||
words = component_words(author)
|
||
if len(words) < 2:
|
||
return ""
|
||
|
||
token_orders = []
|
||
reversed_tokens = [signature(word) for word in reversed(words)]
|
||
if reversed_tokens:
|
||
token_orders.append(reversed_tokens)
|
||
rotated_tokens = [signature(word) for word in [words[-1], *words[:-1]]]
|
||
if rotated_tokens and rotated_tokens != reversed_tokens:
|
||
token_orders.append(rotated_tokens)
|
||
|
||
for text in contexts:
|
||
for token_order in token_orders:
|
||
match = find_token_sequence(text, token_order)
|
||
if match:
|
||
return clean_component(match)
|
||
return ""
|
||
|
||
|
||
def strip_title_suffix(text: str, title: str) -> str:
|
||
cleaned = clean_component(text)
|
||
cleaned_tokens = cleaned.split()
|
||
title_words = component_words(title)
|
||
if not cleaned_tokens or not title_words:
|
||
return text
|
||
if len(cleaned_tokens) <= len(title_words):
|
||
return text
|
||
|
||
suffix = " ".join(cleaned_tokens[-len(title_words) :])
|
||
if not same_signature_tokens(suffix, title):
|
||
return text
|
||
|
||
stripped = collapse_spaces(" ".join(cleaned_tokens[: -len(title_words)]))
|
||
return stripped.strip("- ").strip()
|
||
|
||
|
||
TRAILING_SERIES_SEQ_PATTERNS = [
|
||
re.compile(r"^(.+?)\s+(?:tom|vol(?:ume)?|book|t|cz(?:eść|esc)?)\.?\s*(\d+(?:[.,]\d+)?)$", re.IGNORECASE),
|
||
re.compile(r"^(.+?)\s+(\d+(?:[.,]\d+)?)$"),
|
||
]
|
||
INLINE_SERIES_SEQ_TITLE_PATTERNS = [
|
||
re.compile(r"^(.+?)\s+(?:tom|vol(?:ume)?|book|t|cz(?:eść|esc)?)\.?\s*(\d+(?:[.,]\d+)?)\s+(.+)$", re.IGNORECASE),
|
||
re.compile(r"^(.+?)\s+(0\d+(?:[.,]\d+)?)\s+(.+)$"),
|
||
]
|
||
|
||
|
||
def extract_trailing_series_sequence(text: str) -> tuple[str, str]:
|
||
cleaned = clean_component(text).strip("- ")
|
||
if not cleaned:
|
||
return "", ""
|
||
|
||
for pattern in TRAILING_SERIES_SEQ_PATTERNS:
|
||
match = pattern.match(cleaned)
|
||
if not match:
|
||
continue
|
||
series, seq = (collapse_spaces(part) for part in match.groups())
|
||
if len(seq) < 4:
|
||
return series, seq
|
||
return cleaned, ""
|
||
|
||
|
||
def extract_inline_series_sequence_title(text: str) -> tuple[str, str, str]:
|
||
cleaned = clean_component(text).strip("- ")
|
||
if not cleaned:
|
||
return "", "", ""
|
||
|
||
for pattern in INLINE_SERIES_SEQ_TITLE_PATTERNS:
|
||
match = pattern.match(cleaned)
|
||
if not match:
|
||
continue
|
||
series, sequence, title = (collapse_spaces(part) for part in match.groups())
|
||
if len(sequence) < 4:
|
||
return series, sequence, title
|
||
return "", "", ""
|
||
|
||
|
||
def normalize_series_candidate(text: str, author: str, title_hints: list[str]) -> tuple[str, str]:
|
||
working = clean_component(text)
|
||
if not working:
|
||
return "", ""
|
||
|
||
if author:
|
||
stripped = strip_author_prefix(working, author)
|
||
if stripped != working:
|
||
working = stripped
|
||
|
||
working, _ = extract_narrator(working)
|
||
working, _ = strip_noise(working)
|
||
|
||
for title_hint in sorted(
|
||
{clean_component(hint) for hint in title_hints if hint},
|
||
key=lambda hint: len(component_words(hint)),
|
||
):
|
||
stripped = strip_title_suffix(working, title_hint)
|
||
if stripped != working:
|
||
working = stripped
|
||
break
|
||
|
||
working = clean_component(working).strip("- ")
|
||
if not working:
|
||
return "", ""
|
||
|
||
series, sequence, parsed_title = extract_inline_series_sequence_title(working)
|
||
if series:
|
||
if not looks_generic(series) and not (author and contains_signature_tokens(series, author)):
|
||
if not title_hints or any(contains_signature_tokens(title_hint, parsed_title) for title_hint in title_hints if title_hint):
|
||
return series, sequence
|
||
|
||
series, sequence = extract_trailing_series_sequence(working)
|
||
if not series or looks_generic(series):
|
||
return "", ""
|
||
if author and contains_signature_tokens(series, author):
|
||
return "", ""
|
||
for title_hint in title_hints:
|
||
if title_hint and contains_signature_tokens(series, title_hint):
|
||
return "", ""
|
||
return series, sequence
|
||
|
||
|
||
def infer_series_from_context(parts: list[str], author_index: int, author: str, title_hints: list[str]) -> tuple[str, str]:
|
||
start = author_index + 1 if author_index >= 0 else 0
|
||
candidates = parts[start:]
|
||
for part in reversed(candidates):
|
||
series, sequence = normalize_series_candidate(part, author, title_hints)
|
||
if series:
|
||
return series, sequence
|
||
return "", ""
|
||
|
||
|
||
def series_needs_normalization(series: str, author: str, title: str) -> bool:
|
||
if not series:
|
||
return True
|
||
if looks_generic(series):
|
||
return True
|
||
if author and contains_signature_tokens(series, author):
|
||
return True
|
||
if title and contains_signature_tokens(series, title):
|
||
return True
|
||
return False
|
||
|
||
|
||
def strip_series_prefix(text: str, series: str, sequence: str) -> str:
|
||
cleaned_tokens = clean_component(text).split()
|
||
series_tokens = component_words(series)
|
||
if not cleaned_tokens or not series_tokens or len(cleaned_tokens) <= len(series_tokens):
|
||
return text
|
||
|
||
prefix = " ".join(cleaned_tokens[: len(series_tokens)])
|
||
if signature(prefix) != signature(series):
|
||
return text
|
||
|
||
index = len(series_tokens)
|
||
if sequence and index < len(cleaned_tokens):
|
||
next_token = signature(cleaned_tokens[index])
|
||
if next_token == signature(sequence):
|
||
index += 1
|
||
elif index + 1 < len(cleaned_tokens) and signature(cleaned_tokens[index]) in {"tom", "t", "vol", "volume", "book"} and signature(cleaned_tokens[index + 1]) == signature(sequence):
|
||
index += 2
|
||
|
||
stripped = collapse_spaces(" ".join(cleaned_tokens[index:])).strip("- ")
|
||
return stripped or text
|
||
|
||
|
||
def sanitize_component(text: str) -> str:
|
||
value = ascii_fold(collapse_spaces(text.replace("/", "-").replace("\\", "-")))
|
||
value = value.strip(". ")
|
||
return value or "__REVIEW__"
|
||
|
||
|
||
def extract_series_sequence_title(text: str) -> tuple[str, str, str]:
|
||
for pattern in SERIES_SEQ_PATTERNS:
|
||
match = pattern.match(text)
|
||
if not match:
|
||
continue
|
||
series, sequence, title = (collapse_spaces(part) for part in match.groups())
|
||
if len(sequence) >= 4:
|
||
continue
|
||
return series, sequence, title
|
||
return "", "", text
|
||
|
||
|
||
def title_from_filenames(audio_files: list[Path]) -> str:
|
||
stems: list[str] = []
|
||
for path in audio_files[:12]:
|
||
stem = clean_component(path.stem)
|
||
stem = TRACK_PREFIX_RE.sub("", stem)
|
||
stem = collapse_spaces(stem)
|
||
if stem and not stem.isdigit():
|
||
stems.append(stem)
|
||
unique = {signature(stem): stem for stem in stems if stem}
|
||
if len(unique) == 1:
|
||
return next(iter(unique.values()))
|
||
return ""
|
||
|
||
|
||
def parse_opf(path: Path) -> dict[str, str]:
|
||
try:
|
||
root = ET.parse(path).getroot()
|
||
except Exception:
|
||
return {}
|
||
|
||
values: dict[str, str] = {}
|
||
wanted = {
|
||
"title",
|
||
"author",
|
||
"narrator",
|
||
"publishYear",
|
||
"publisher",
|
||
"isbn",
|
||
"description",
|
||
"genres",
|
||
"language",
|
||
"series",
|
||
"volumeNumber",
|
||
}
|
||
for element in root.iter():
|
||
tag = element.tag.rsplit("}", 1)[-1]
|
||
if tag in wanted and element.text and tag not in values:
|
||
values[tag] = collapse_spaces(element.text)
|
||
return values
|
||
|
||
|
||
def collect_book_roots(root: Path) -> list[BookRoot]:
|
||
books: dict[Path, BookRoot] = {}
|
||
for dirpath_str, _, filenames in os.walk(root):
|
||
dirpath = Path(dirpath_str)
|
||
audio_files = [dirpath / name for name in sorted(filenames) if Path(name).suffix.lower() in AUDIO_EXTS]
|
||
if not audio_files:
|
||
continue
|
||
book_root = dirpath.parent if DISC_RE.match(clean_component(dirpath.name)) else dirpath
|
||
book = books.setdefault(book_root, BookRoot(path=book_root))
|
||
book.audio_files.extend(audio_files)
|
||
return sorted(books.values(), key=lambda book: str(book.path))
|
||
|
||
|
||
def choose_author(parts: list[str], filename_title: str = "") -> tuple[str, int, str, str]:
|
||
if filename_title:
|
||
for index in range(len(parts) - 1, -1, -1):
|
||
author = extract_author_from_title_context(parts[index], filename_title)
|
||
if author:
|
||
return author, index, "title-context", author_confidence_for(author)
|
||
|
||
for index, part in enumerate(parts):
|
||
if is_author_like(part):
|
||
return clean_component(part), index, "folder", author_confidence_for(part)
|
||
|
||
best_author = ""
|
||
best_index = -1
|
||
best_score = -1
|
||
for index, part in enumerate(parts):
|
||
author, title = split_author_title(part)
|
||
if author:
|
||
score = author_title_candidate_score(author, title)
|
||
if score > best_score:
|
||
best_author = author
|
||
best_index = index
|
||
best_score = score
|
||
|
||
if best_author:
|
||
return best_author, best_index, "mixed-folder", "medium"
|
||
|
||
return "", -1, "missing", "none"
|
||
|
||
|
||
def choose_series(parts: list[str], author_index: int, leaf: str, author: str) -> tuple[str, list[str]]:
|
||
notes: list[str] = []
|
||
if author_index < 0:
|
||
candidates = parts[:-1]
|
||
else:
|
||
candidates = parts[author_index + 1 : -1]
|
||
|
||
leaf_sig = signature(leaf)
|
||
author_sig = signature(author)
|
||
for part in candidates:
|
||
cleaned = clean_component(part)
|
||
if not cleaned:
|
||
continue
|
||
if looks_generic(cleaned):
|
||
notes.append(f"ignored grouping folder '{cleaned}'")
|
||
continue
|
||
if signature(cleaned) in {leaf_sig, author_sig}:
|
||
continue
|
||
_, possible_title = split_author_title(cleaned)
|
||
if possible_title and signature(possible_title) == leaf_sig:
|
||
continue
|
||
return cleaned, notes
|
||
return "", notes
|
||
|
||
|
||
def infer_book(root: Path, library_root: Path, audio_files: list[Path]) -> dict[str, str]:
|
||
rel_parts = [clean_component(part) for part in root.relative_to(library_root).parts]
|
||
leaf = rel_parts[-1]
|
||
notes: list[str] = []
|
||
filename_title = title_from_filenames(audio_files)
|
||
|
||
opf_path = next((path for path in root.iterdir() if path.suffix.lower() == ".opf"), None)
|
||
opf = parse_opf(opf_path) if opf_path else {}
|
||
if opf_path:
|
||
notes.append(f"loaded sidecar metadata from {opf_path.name}")
|
||
|
||
author, author_index, author_source, author_confidence = choose_author(rel_parts, filename_title)
|
||
if author_source == "missing":
|
||
notes.append("author not confidently identifiable from path")
|
||
elif author_confidence == "medium":
|
||
notes.append("author inferred from a weak path signal")
|
||
|
||
series, series_notes = choose_series(rel_parts, author_index, leaf, author)
|
||
notes.extend(series_notes)
|
||
|
||
narrator = opf.get("narrator", "")
|
||
year = opf.get("publishYear", "")
|
||
sequence = opf.get("volumeNumber", "")
|
||
title = opf.get("title", "")
|
||
|
||
if opf.get("author"):
|
||
author = opf["author"]
|
||
author_source = "opf"
|
||
author_confidence = "high"
|
||
notes.append("author taken from OPF sidecar")
|
||
if opf.get("series"):
|
||
series = opf["series"]
|
||
if title:
|
||
title_source = "opf"
|
||
else:
|
||
title_source = "path"
|
||
title = leaf
|
||
if author:
|
||
stripped = strip_author_prefix(title, author)
|
||
if stripped != title:
|
||
title = stripped
|
||
|
||
leaf_author, leaf_title = split_author_title(leaf)
|
||
if leaf_title and same_signature_tokens(leaf_author, author):
|
||
title = leaf_title
|
||
|
||
title, narrator_from_path = extract_narrator(title)
|
||
if narrator_from_path and not narrator:
|
||
narrator = narrator_from_path
|
||
notes.append("narrator inferred from folder name")
|
||
|
||
title, noise_notes = strip_noise(title)
|
||
notes.extend(noise_notes)
|
||
|
||
title, year_from_path = extract_year(title)
|
||
if year_from_path and not year:
|
||
year = year_from_path
|
||
|
||
title, sequence_from_path = extract_sequence(title)
|
||
if sequence_from_path and not sequence:
|
||
sequence = sequence_from_path
|
||
notes.append("sequence inferred from folder name")
|
||
|
||
if not series:
|
||
series_from_title, seq_from_series_title, stripped_title = extract_series_sequence_title(title)
|
||
if series_from_title:
|
||
series = series_from_title
|
||
title = stripped_title
|
||
if seq_from_series_title and not sequence:
|
||
sequence = seq_from_series_title
|
||
notes.append("series inferred from folder name")
|
||
|
||
if filename_title and author:
|
||
stripped_filename_title = strip_author_prefix(filename_title, author)
|
||
if stripped_filename_title:
|
||
filename_title = stripped_filename_title
|
||
|
||
if not title or signature(title) == signature(author):
|
||
if filename_title:
|
||
title = filename_title
|
||
title_source = "filename"
|
||
notes.append("title inferred from repeated audio filename stem")
|
||
|
||
title_hints = [title]
|
||
if filename_title:
|
||
title_hints.append(filename_title)
|
||
normalized_series, normalized_sequence = infer_series_from_context(rel_parts, author_index, author, title_hints)
|
||
if normalized_series and series_needs_normalization(series, author, title):
|
||
series = normalized_series
|
||
notes.append("series normalized from folder context")
|
||
if normalized_sequence and not sequence:
|
||
sequence = normalized_sequence
|
||
notes.append("sequence inferred from folder context")
|
||
if series and title:
|
||
stripped_title = strip_series_prefix(title, series, sequence)
|
||
if stripped_title != title:
|
||
title = stripped_title
|
||
|
||
if author and not opf.get("author"):
|
||
context_texts = [leaf] if author_index != len(rel_parts) - 1 else []
|
||
context_texts.extend(clean_component(path.stem) for path in audio_files[:12])
|
||
if author_source == "mixed-folder" and ("-" in author or len(component_words(author)) > 2):
|
||
context_texts.extend(part for index, part in enumerate(rel_parts) if index != author_index)
|
||
normalized_author = normalize_author_order_from_context(author, context_texts)
|
||
if normalized_author and signature(normalized_author) != signature(author):
|
||
author = normalized_author
|
||
notes.append("author order normalized from current folder/file name")
|
||
|
||
title = collapse_spaces(title)
|
||
if not title:
|
||
title = sanitize_component(root.name)
|
||
notes.append("title fallback came from raw folder name")
|
||
|
||
if author_index > 0:
|
||
notes.append(f"author came from nested folder '{rel_parts[author_index]}'")
|
||
if author_index < 0 and len(rel_parts) > 1 and looks_generic(rel_parts[0]):
|
||
notes.append(f"top-level folder '{rel_parts[0]}' looks like a generic bucket")
|
||
|
||
status = "ready" if title and author_confidence in {"high", "medium"} else "review"
|
||
author_folder = sanitize_component(author) if author else "__AUTHOR_REVIEW__"
|
||
title_bits = []
|
||
if sequence:
|
||
title_bits.append(f"Vol. {sequence}")
|
||
if year:
|
||
title_bits.append(year)
|
||
title_bits.append(title)
|
||
title_folder = sanitize_component(" - ".join(bit for bit in title_bits if bit))
|
||
if narrator:
|
||
title_folder = f"{title_folder} {{{sanitize_component(narrator)}}}"
|
||
|
||
proposed_parts = [author_folder]
|
||
if series:
|
||
proposed_parts.append(sanitize_component(series))
|
||
proposed_parts.append(title_folder)
|
||
proposed_path = "/".join(proposed_parts)
|
||
|
||
return {
|
||
"verification_status": "unverified",
|
||
"verification_source": "",
|
||
"status": status,
|
||
"current_path": str(root),
|
||
"audio_file_count": str(len(audio_files)),
|
||
"sample_audio_file": audio_files[0].name if audio_files else "",
|
||
"author": author,
|
||
"author_confidence": author_confidence,
|
||
"author_source": author_source,
|
||
"series": series,
|
||
"sequence": sequence,
|
||
"publish_year": year,
|
||
"title": title,
|
||
"title_source": title_source,
|
||
"narrator": narrator,
|
||
"proposed_abs_path": proposed_path,
|
||
"notes": "; ".join(dict.fromkeys(note for note in notes if note)),
|
||
}
|
||
|
||
|
||
def load_overrides(path: Path | None) -> dict[str, dict[str, str]]:
|
||
if not path or not path.exists():
|
||
return {}
|
||
|
||
with path.open(encoding="utf-8", newline="") as handle:
|
||
rows = list(csv.DictReader(handle, delimiter="\t"))
|
||
return {row["current_path"]: row for row in rows if row.get("current_path")}
|
||
|
||
|
||
def apply_override(row: dict[str, str], override: dict[str, str]) -> dict[str, str]:
|
||
updated = dict(row)
|
||
for key, value in override.items():
|
||
if key == "current_path":
|
||
continue
|
||
if key in {"verified_author", "verified_series", "verified_sequence", "verified_publish_year", "verified_title", "verified_narrator"}:
|
||
continue
|
||
if value:
|
||
updated[key] = value
|
||
|
||
mapping = {
|
||
"verified_author": "author",
|
||
"verified_series": "series",
|
||
"verified_sequence": "sequence",
|
||
"verified_publish_year": "publish_year",
|
||
"verified_title": "title",
|
||
"verified_narrator": "narrator",
|
||
}
|
||
for source_key, target_key in mapping.items():
|
||
value = collapse_spaces(override.get(source_key, ""))
|
||
if value:
|
||
updated[target_key] = value
|
||
|
||
updated["author_source"] = "verified-override" if updated.get("author") else updated["author_source"]
|
||
updated["author_confidence"] = "high" if updated.get("author") else updated["author_confidence"]
|
||
updated["verification_status"] = override.get("verification_status", "verified") or "verified"
|
||
updated["verification_source"] = override.get("verification_source", "")
|
||
if override.get("verification_note"):
|
||
notes = [note for note in [updated.get("notes", ""), override["verification_note"]] if note]
|
||
updated["notes"] = "; ".join(dict.fromkeys(notes))
|
||
|
||
author_folder = sanitize_component(updated["author"]) if updated.get("author") else "__AUTHOR_REVIEW__"
|
||
title_bits = []
|
||
if updated.get("sequence"):
|
||
title_bits.append(f"Vol. {updated['sequence']}")
|
||
if updated.get("publish_year"):
|
||
title_bits.append(updated["publish_year"])
|
||
title_bits.append(updated["title"])
|
||
title_folder = sanitize_component(" - ".join(bit for bit in title_bits if bit))
|
||
if updated.get("narrator"):
|
||
title_folder = f"{title_folder} {{{sanitize_component(updated['narrator'])}}}"
|
||
|
||
proposed_parts = [author_folder]
|
||
if updated.get("series"):
|
||
proposed_parts.append(sanitize_component(updated["series"]))
|
||
proposed_parts.append(title_folder)
|
||
updated["proposed_abs_path"] = "/".join(proposed_parts)
|
||
updated["status"] = "ready" if updated.get("author") else "review"
|
||
return updated
|
||
|
||
|
||
def main() -> int:
|
||
parser = argparse.ArgumentParser(
|
||
description="Generate a non-destructive Audiobookshelf path proposal report."
|
||
)
|
||
parser.add_argument(
|
||
"--root",
|
||
default="/mnt/nextcloudExtDS/Ksiazki/Audiobooki",
|
||
help="Path to the current audiobook library",
|
||
)
|
||
parser.add_argument(
|
||
"--output",
|
||
default="reports/audiobookshelf_mock_report.tsv",
|
||
help="TSV output path",
|
||
)
|
||
parser.add_argument(
|
||
"--overrides",
|
||
default="data/verified_author_overrides.tsv",
|
||
help="Optional TSV with verified metadata overrides",
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
library_root = Path(args.root).resolve()
|
||
output_path = Path(args.output).resolve()
|
||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||
overrides = load_overrides(Path(args.overrides).resolve())
|
||
|
||
books = collect_book_roots(library_root)
|
||
rows = []
|
||
for book in books:
|
||
row = infer_book(book.path, library_root, sorted(book.audio_files))
|
||
override = overrides.get(row["current_path"])
|
||
if override:
|
||
row = apply_override(row, override)
|
||
rows.append(row)
|
||
|
||
fieldnames = [
|
||
"verification_status",
|
||
"verification_source",
|
||
"verification_note",
|
||
"status",
|
||
"current_path",
|
||
"audio_file_count",
|
||
"sample_audio_file",
|
||
"author",
|
||
"author_confidence",
|
||
"author_source",
|
||
"series",
|
||
"sequence",
|
||
"publish_year",
|
||
"title",
|
||
"title_source",
|
||
"narrator",
|
||
"proposed_abs_path",
|
||
"notes",
|
||
]
|
||
|
||
with output_path.open("w", encoding="utf-8", newline="") as handle:
|
||
writer = csv.DictWriter(handle, fieldnames=fieldnames, delimiter="\t")
|
||
writer.writeheader()
|
||
writer.writerows(rows)
|
||
|
||
ready = sum(1 for row in rows if row["status"] == "ready")
|
||
review = len(rows) - ready
|
||
print(f"library_root\t{library_root}")
|
||
print(f"report\t{output_path}")
|
||
print(f"books\t{len(rows)}")
|
||
print(f"ready\t{ready}")
|
||
print(f"review\t{review}")
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|