Prevent missing regex_patterns (#15)

Johan authored on 2026-01-10 17:51:12 +02:00, committed by GitHub
parent 599fd6209f
commit 783609d2e3
195 changed files with 1617 additions and 562 deletions

View file

@@ -1,18 +1,12 @@
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "markdownify",
# "pyyaml",
# ]
# ///
import os
import sys
import yaml
import yaml
from utils.custom_formats import collect_custom_formats
from utils.regex_patterns import collect_regex_patterns
from utils.profiles import collect_profiles
from utils.media_management import collect_media_management
from utils.profiles import collect_profiles
from utils.regex_patterns import collect_regex_patterns
# Prevent aliases from showing up
yaml.Dumper.ignore_aliases = lambda *args: True
@@ -20,7 +14,7 @@ yaml.Dumper.ignore_aliases = lambda *args: True
def clear_output_dir(output_dir):
if not os.path.exists(output_dir):
print(f"Output directory does not exist, skipping clearing")
print("Output directory does not exist, skipping clearing")
else:
for filename in os.listdir(output_dir):
file_path = os.path.join(output_dir, filename)
@@ -58,7 +52,7 @@ def main():
f"Custom format directory {trash_custom_formats_dir} does not exist, skipping."
)
continue
custom_regex_patterns = collect_regex_patterns(
regex_patterns = collect_regex_patterns(
service,
trash_custom_formats_dir,
regex_patterns_dir,
@@ -79,7 +73,7 @@
)
continue
trash_id_to_scoring_mapping = collect_custom_formats(
service, trash_custom_formats_dir, custom_formats_dir, custom_regex_patterns
service, trash_custom_formats_dir, custom_formats_dir, regex_patterns
)
collect_profiles(
service,
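
For orientation, the change threads the registry returned by collect_regex_patterns straight into collect_custom_formats. A minimal sketch of that wiring, with invented directory paths standing in for the ones the script builds per service:

    from utils.custom_formats import collect_custom_formats
    from utils.regex_patterns import collect_regex_patterns

    for service in ["radarr", "sonarr"]:
        # Hypothetical paths; the real main() derives these per service.
        trash_custom_formats_dir = f"trash-guides/{service}/cf"
        regex_patterns = collect_regex_patterns(
            service, trash_custom_formats_dir, "output/regex_patterns"
        )
        # The full registry (not just a name mapping) is passed through so
        # custom formats can resolve ReleaseTitle/ReleaseGroup values.
        collect_custom_formats(
            service, trash_custom_formats_dir, "output/custom_formats", regex_patterns
        )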

View file

@@ -1,15 +1,16 @@
import os
import json
import yaml
import yaml
from markdownify import markdownify
from utils.mappings.languages import LANGUAGE_MAPPING
from utils.file_utils import iterate_json_files
from utils.mappings.indexer_flags import INDEXER_FLAG_MAPPING
from utils.mappings.release_type import RELEASE_TYPE_MAPPING
from utils.mappings.languages import LANGUAGE_MAPPING
from utils.mappings.quality_modifiers import QUALITY_MODIFIER_MAPPING
from utils.mappings.release_type import RELEASE_TYPE_MAPPING
from utils.mappings.source import SOURCE_MAPPING
from utils.strings import get_name, get_regex_pattern_name
from utils.strings import get_name
IMPLEMENTATION_TO_TAG_MAPPING = {
"ReleaseTitleSpecification": "Release Title",
@@ -39,54 +40,68 @@ SERVICE_TO_TRASH_GUIDES_URL = {
}
def collect_custom_format(
service, file_name, input_json, output_dir, custom_regex_patterns
def _create_condition_base(service, spec):
"""Create base condition structure from specification."""
return {
"name": get_name(service, spec.get("name", "")),
"negate": spec.get("negate", False),
"required": spec.get("required", False),
"type": IMPLEMENTATION_TO_TYPE_MAPPING.get(
spec.get("implementation"), "unknown"
),
}
def _add_condition_value(
condition, implementation, spec, *, service, regex_patterns, file_name
):
"""Add implementation-specific value to condition."""
fields = spec.get("fields", {})
value = fields.get("value")
if implementation in ["ReleaseTitleSpecification", "ReleaseGroupSpecification"]:
pattern_entry = regex_patterns["by_pattern"].get(value)
if not pattern_entry:
raise ValueError(
f"Pattern '{value}' not found in collected regex patterns "
f"for {service} in custom format {file_name}."
)
condition["pattern"] = pattern_entry["name"]
elif implementation == "ResolutionSpecification":
condition["resolution"] = f"{value}p"
elif implementation == "SourceSpecification":
condition["source"] = SOURCE_MAPPING[service][value]
elif implementation == "LanguageSpecification":
condition["language"] = LANGUAGE_MAPPING[service][value]
elif implementation == "IndexerFlagSpecification":
condition["flag"] = INDEXER_FLAG_MAPPING[service][value]
elif implementation == "QualityModifierSpecification":
condition["qualityModifier"] = QUALITY_MODIFIER_MAPPING[service][value]
elif implementation == "ReleaseTypeSpecification":
condition["releaseType"] = RELEASE_TYPE_MAPPING[service][value]
else:
return False
return True
def _collect_custom_format(
service, file_name, input_json, output_dir, regex_patterns
):
conditions = []
implementation_tags = set()
for spec in input_json.get("specifications", []):
condition = {
"name": get_name(service, spec.get("name", "")),
"negate": spec.get("negate", False),
"required": spec.get("required", False),
"type": IMPLEMENTATION_TO_TYPE_MAPPING.get(
spec.get("implementation"), "unknown"
),
}
implementation = spec.get("implementation")
implementation_tags.add(IMPLEMENTATION_TO_TAG_MAPPING[implementation])
if implementation in ["ReleaseTitleSpecification", "ReleaseGroupSpecification"]:
pattern = spec.get("fields", {}).get("value")
condition["pattern"] = custom_regex_patterns.get(
pattern, get_regex_pattern_name(service, spec.get("name", ""))
)
elif implementation in ["ResolutionSpecification"]:
condition["resolution"] = f"{spec.get('fields', {}).get('value')}p"
elif implementation in ["SourceSpecification"]:
condition["source"] = SOURCE_MAPPING[service][
spec.get("fields", {}).get("value")
]
elif implementation in ["LanguageSpecification"]:
# TODO: exceptLanguage
condition["language"] = LANGUAGE_MAPPING[service][
spec.get("fields", {}).get("value")
]
elif implementation in ["IndexerFlagSpecification"]:
condition["flag"] = INDEXER_FLAG_MAPPING[service][
spec.get("fields", {}).get("value")
]
elif implementation in ["QualityModifierSpecification"]:
condition["qualityModifier"] = QUALITY_MODIFIER_MAPPING[service][
spec.get("fields", {}).get("value")
]
elif implementation in ["ReleaseTypeSpecification"]:
condition["releaseType"] = RELEASE_TYPE_MAPPING[service][
spec.get("fields", {}).get("value")
]
else:
condition = _create_condition_base(service, spec)
if not _add_condition_value(
condition,
implementation,
spec,
service=service,
regex_patterns=regex_patterns,
file_name=file_name,
):
print(f"Unrecognised implementation ({implementation}), skipping for now.")
continue
@@ -120,23 +135,14 @@ def collect_custom_format(
def collect_custom_formats(service, input_dir, output_dir, custom_regex_patterns):
trash_id_to_scoring_mapping = {}
for root, _, files in os.walk(input_dir):
for filename in sorted(files):
if not filename.endswith(".json"):
continue
for _, file_stem, data in iterate_json_files(input_dir):
trash_id = data.get("trash_id")
trash_scores = data.get("trash_scores", {})
if trash_id:
trash_id_to_scoring_mapping[trash_id] = trash_scores
file_path = os.path.join(root, filename)
file_stem = os.path.splitext(filename)[0] # Filename without extension
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
trash_id = data.get("trash_id")
trash_scores = data.get("trash_scores", {})
if trash_id:
trash_id_to_scoring_mapping[trash_id] = trash_scores
collect_custom_format(
service, file_stem, data, output_dir, custom_regex_patterns
)
_collect_custom_format(
service, file_stem, data, output_dir, custom_regex_patterns
)
return trash_id_to_scoring_mapping
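
The behavioural change is the lookup itself: a ReleaseTitle/ReleaseGroup value must already be present in the registry's by_pattern index, and a miss now raises instead of silently deriving a name. A self-contained sketch, with an invented spec and a registry entry trimmed to the relevant keys:

    # Invented registry, shaped like the return value of collect_regex_patterns.
    regex_patterns = {
        "by_name": {},
        "by_pattern": {r"\bx265\b": {"name": "x265", "pattern": r"\bx265\b"}},
    }
    spec = {
        "name": "x265",
        "implementation": "ReleaseTitleSpecification",
        "fields": {"value": r"\bx265\b"},
    }

    value = spec["fields"]["value"]
    entry = regex_patterns["by_pattern"].get(value)
    if entry is None:
        # The point of the commit: a missing pattern is now a hard error.
        raise ValueError(f"Pattern '{value}' not found in collected regex patterns")
    print({"pattern": entry["name"]})  # {'pattern': 'x265'}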

View file

@@ -0,0 +1,29 @@
import json
import os
def iterate_json_files(input_dir):
"""
Generator that yields (file_path, file_stem, data) tuples for all JSON files
in the input directory and its subdirectories.
Args:
input_dir: Directory to search for JSON files
Yields:
Tuple of (file_path, file_stem, data) where:
- file_path: Full path to the JSON file
- file_stem: Filename without extension
- data: Parsed JSON data
"""
for root, _, files in os.walk(input_dir):
for filename in sorted(files):
if not filename.endswith(".json"):
continue
file_path = os.path.join(root, filename)
file_stem = os.path.splitext(filename)[0]
with open(file_path, encoding="utf-8") as f:
data = json.load(f)
yield file_path, file_stem, data
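
Call sites then shrink to a single loop; a short usage sketch (the directory name is invented):

    from utils.file_utils import iterate_json_files

    for file_path, file_stem, data in iterate_json_files("trash-guides/radarr/cf"):
        # file_stem is the filename without its .json extension.
        print(file_stem, data.get("trash_id"))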

View file

@@ -1,9 +1,11 @@
import os
import json
import os
import yaml
from utils.mappings.misc_media_management import MISC_MEDIA_MANAGEMENT
BASE_NAMING_CONFIG = {
"radarr": {
"rename": True,
@@ -28,7 +30,7 @@ BASE_NAMING_CONFIG = {
BASE_QUALITY_DEFINITIONS = {"qualityDefinitions": {"radarr": {}, "sonarr": {}}}
def collect_misc_config(output_dir):
def _collect_misc_config(output_dir):
output_file = os.path.join(output_dir, "misc.yml")
with open(output_file, "w", encoding="utf-8") as f:
@@ -37,14 +39,14 @@ def collect_misc_config(output_dir):
print(f"Generated: {output_file}")
def collect_naming_formats(input_dir, output_dir):
output_file = os.path.join(output_dir, f"naming.yml")
def _collect_naming_formats(input_dir, output_dir):
output_file = os.path.join(output_dir, "naming.yml")
new_config = BASE_NAMING_CONFIG.copy()
radarr_input_file_path = os.path.join(
input_dir, "radarr", "naming", "radarr-naming.json"
)
with open(radarr_input_file_path, "r", encoding="utf-8") as f:
with open(radarr_input_file_path, encoding="utf-8") as f:
input_json = json.load(f)
new_config["radarr"]["movieFormat"] = input_json["file"]["standard"]
new_config["radarr"]["movieFolderFormat"] = input_json["folder"]["default"]
@@ -52,7 +54,7 @@ def collect_naming_formats(input_dir, output_dir):
sonarr_input_file_path = os.path.join(
input_dir, "sonarr", "naming", "sonarr-naming.json"
)
with open(sonarr_input_file_path, "r", encoding="utf-8") as f:
with open(sonarr_input_file_path, encoding="utf-8") as f:
input_json = json.load(f)
standard_episode_format = input_json["episodes"]["standard"]["default"]
daily_episode_format = input_json["episodes"]["daily"]["default"]
@@ -71,14 +73,14 @@ def collect_naming_formats(input_dir, output_dir):
print(f"Generated: {output_file}")
def collect_quality_definitions(input_dir, output_dir):
def _collect_quality_definitions(input_dir, output_dir):
output_structure = BASE_QUALITY_DEFINITIONS.copy()
output_file = os.path.join(output_dir, "quality_definitions.yml")
radarr_input_file_path = os.path.join(
input_dir, "radarr", "quality-size", "movie.json"
)
with open(radarr_input_file_path, "r", encoding="utf-8") as f:
with open(radarr_input_file_path, encoding="utf-8") as f:
radarr_data = json.load(f)
for quality in reversed(radarr_data["qualities"]):
profilarr_quality = {
@@ -93,7 +95,7 @@ def collect_quality_definitions(input_dir, output_dir):
sonarr_input_file_path = os.path.join(
input_dir, "sonarr", "quality-size", "series.json"
)
with open(sonarr_input_file_path, "r", encoding="utf-8") as f:
with open(sonarr_input_file_path, encoding="utf-8") as f:
sonarr_data = json.load(f)
for quality in reversed(sonarr_data["qualities"]):
profilarr_quality = {
@@ -112,6 +114,6 @@
def collect_media_management(input_dir, output_dir):
collect_misc_config(output_dir)
collect_naming_formats(input_dir, output_dir)
collect_quality_definitions(input_dir, output_dir)
_collect_misc_config(output_dir)
_collect_naming_formats(input_dir, output_dir)
_collect_quality_definitions(input_dir, output_dir)
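
One caveat worth noting with the .copy() calls above: dict.copy() is shallow, so assignments like new_config["radarr"]["movieFormat"] = ... write into the nested dicts shared with BASE_NAMING_CONFIG. That is harmless while each helper runs once per process; a deep copy would make them re-entrant. A minimal demonstration of the difference:

    import copy

    BASE = {"radarr": {"rename": True}}

    shallow = BASE.copy()
    shallow["radarr"]["rename"] = False
    print(BASE["radarr"]["rename"])  # False -- the nested dict is shared

    deep = copy.deepcopy(BASE)
    deep["radarr"]["rename"] = True
    print(BASE["radarr"]["rename"])  # still False -- deepcopy is independent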

View file

@@ -1,14 +1,14 @@
import os
import json
import yaml
import yaml
from markdownify import markdownify
from utils.file_utils import iterate_json_files
from utils.mappings.qualities import QUALITIES
from utils.strings import get_name
def collect_profile_formats(
def _collect_profile_formats(
service, trash_score_name, format_items, trash_id_to_scoring_mapping
):
profile_formats = []
@@ -29,14 +29,14 @@ def collect_profile_formats(
)
def get_quality_id(quality_name):
def _get_quality_id(quality_name):
return next(
(quality["id"] for quality in QUALITIES if quality["name"] == quality_name),
None,
)
def collect_qualities(items):
def _collect_qualities(items):
qualities = []
quality_collection_id = -1
for item in items:
@@ -44,7 +44,7 @@ def collect_qualities(items):
continue
quality = {
"id": get_quality_id(item.get("name", "")),
"id": _get_quality_id(item.get("name", "")),
"name": item.get("name", ""),
}
if item.get("items") is not None:
@@ -54,14 +54,14 @@ def collect_qualities(items):
quality["qualities"] = []
for sub_item in item["items"]:
quality["qualities"].append(
{"id": get_quality_id(sub_item), "name": sub_item}
{"id": _get_quality_id(sub_item), "name": sub_item}
)
qualities.append(quality)
return list(reversed(qualities))
def get_upgrade_until(quality_name, profile_qualities):
def _get_upgrade_until(quality_name, profile_qualities):
found_quality = next(
quality for quality in profile_qualities if quality["name"] == quality_name
)
@@ -73,10 +73,10 @@ def get_upgrade_until(quality_name, profile_qualities):
return found_quality
def collect_profile(service, input_json, output_dir, trash_id_to_scoring_mapping):
def _collect_profile(service, input_json, output_dir, trash_id_to_scoring_mapping):
# Compose YAML structure
name = input_json.get("name", "")
profile_qualities = collect_qualities(input_json.get("items", []))
profile_qualities = _collect_qualities(input_json.get("items", []))
yml_data = {
"name": get_name(service, name),
"description": f"""[Profile from TRaSH-Guides.](https://trash-guides.info/{service.capitalize()}/{service}-setup-quality-profiles)
@@ -87,14 +87,14 @@ def collect_profile(service, input_json, output_dir, trash_id_to_scoring_mapping
"minCustomFormatScore": input_json.get("minFormatScore", 0),
"upgradeUntilScore": input_json.get("cutoffFormatScore", 0),
"minScoreIncrement": input_json.get("minUpgradeFormatScore", 0),
"custom_formats": collect_profile_formats(
"custom_formats": _collect_profile_formats(
service,
input_json.get("trash_score_set"),
input_json.get("formatItems", {}),
trash_id_to_scoring_mapping,
),
"qualities": profile_qualities,
"upgrade_until": get_upgrade_until(input_json.get("cutoff"), profile_qualities),
"upgrade_until": _get_upgrade_until(input_json.get("cutoff"), profile_qualities),
"language": input_json.get("language", "any").lower(),
}
@@ -111,12 +111,5 @@
output_dir,
trash_id_to_scoring_mapping,
):
for root, _, files in os.walk(input_dir):
for filename in sorted(files):
if not filename.endswith(".json"):
continue
file_path = os.path.join(root, filename)
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
collect_profile(service, data, output_dir, trash_id_to_scoring_mapping)
for _, _, data in iterate_json_files(input_dir):
_collect_profile(service, data, output_dir, trash_id_to_scoring_mapping)
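
For intuition, _get_quality_id is a plain table lookup that falls back to None for unknown names; a self-contained sketch with a stubbed, invented QUALITIES table:

    QUALITIES = [  # stand-in for utils.mappings.qualities.QUALITIES; ids invented
        {"id": 3, "name": "WEBDL-1080p"},
        {"id": 9, "name": "HDTV-1080p"},
    ]

    def _get_quality_id(quality_name):
        # next() with a default avoids StopIteration on unknown names.
        return next(
            (q["id"] for q in QUALITIES if q["name"] == quality_name), None
        )

    print(_get_quality_id("HDTV-1080p"))  # 9
    print(_get_quality_id("Daily"))       # None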

View file

@@ -1,16 +1,142 @@
import os
import json
import yaml
from utils.strings import get_regex_pattern_name, get_safe_name
duplicate_regex_patterns = {}
from utils.file_utils import iterate_json_files
from utils.strings import get_name
def collect_regex_pattern(service, file_name, input_json, output_dir):
# Find the first pattern in specifications
pattern = None
regex_patterns = {
"by_name": {},
"by_pattern": {},
}
def _update_existing_pattern_for_service(existing_data, service, output_dir):
"""Update an existing pattern file to support multiple services."""
existing_path = existing_data["file_path"]
if not os.path.exists(existing_path):
raise FileNotFoundError(f"Expected pattern file not found: {existing_path}")
# Use the existing pattern name to preserve casing
existing_name = existing_data["name"]
# Remove service prefix if present (e.g., "Radarr - " or "Sonarr - ")
# to get the base pattern name with original casing
for svc in ["Radarr", "Sonarr"]:
prefix = f"{svc} - "
if existing_name.startswith(prefix):
existing_name = existing_name[len(prefix):]
break
# Use the existing (original) name with preserved casing
safe_name = existing_name
new_path = os.path.join(output_dir, f"{safe_name}.yml")
# Rename file if needed (from service-specific to generic name)
if existing_path != new_path and not os.path.exists(new_path):
os.rename(existing_path, new_path)
# Update tracking data
old_name = existing_data["name"]
existing_data["file_path"] = new_path
existing_data["name"] = safe_name
# Update by_name dict with normalized keys
old_key = old_name.lower()
if old_key in regex_patterns["by_name"]:
regex_patterns["by_name"].pop(old_key)
regex_patterns["by_name"][safe_name.lower()] = existing_data
# Update the file to add the new service tag
with open(new_path, "r+", encoding="utf-8") as f:
yml_data = yaml.safe_load(f)
if "tags" not in yml_data:
yml_data["tags"] = []
if service.capitalize() not in yml_data["tags"]:
yml_data["tags"].append(service.capitalize())
yml_data["name"] = safe_name
f.seek(0)
yaml.dump(yml_data, f, sort_keys=False, allow_unicode=True)
f.truncate()
# Update services list in tracking
if service.capitalize() not in existing_data["services"]:
existing_data["services"].append(service.capitalize())
# Update name in tracking data
existing_data["name"] = safe_name
print(f"Updated pattern for multiple services: {new_path}")
def _generate_unique_pattern_name(initial_name, pattern, output_dir):
"""Generate a unique pattern name if there are collisions."""
final_name = initial_name
counter = 1
# Use lowercase for case-insensitive comparison
normalized_key = final_name.lower()
while normalized_key in regex_patterns["by_name"]:
existing_pattern_data = regex_patterns["by_name"][normalized_key]
if existing_pattern_data["pattern"] == pattern:
print(f"Pattern with same name and pattern already exists: {final_name}")
return None
final_name = f"{initial_name} ({counter})"
normalized_key = final_name.lower()
counter += 1
# Also check for case-insensitive file system collisions
while _case_insensitive_file_exists(output_dir, f"{final_name}.yml"):
final_name = f"{initial_name} ({counter})"
normalized_key = final_name.lower()
counter += 1
return final_name
def _case_insensitive_file_exists(directory, filename):
"""Check if a file exists with case-insensitive matching."""
if not os.path.exists(directory):
return False
filename_lower = filename.lower()
for existing_file in os.listdir(directory):
if existing_file.lower() == filename_lower:
return True
return False
def _create_new_pattern_file(service, pattern, final_name, output_dir):
"""Create a new pattern file."""
yml_data = {
"name": final_name,
"pattern": pattern,
"description": "",
"tags": [service.capitalize()],
"tests": [],
}
output_path = os.path.join(output_dir, f"{final_name}.yml")
with open(output_path, "w", encoding="utf-8") as f:
yaml.dump(yml_data, f, sort_keys=False, allow_unicode=True)
# Store in dict (twice - by name and by pattern)
# Use lowercase keys for case-insensitive lookups
pattern_data = {
"name": final_name,
"pattern": pattern,
"services": [service.capitalize()],
"file_path": output_path,
}
regex_patterns["by_name"][final_name.lower()] = pattern_data
regex_patterns["by_pattern"][pattern] = pattern_data
print(f"Generated: {output_path}")
return True
def _collect_regex_pattern(service, file_name, input_json, output_dir):
"""Extract and collect regex patterns from specifications."""
for spec in input_json.get("specifications", []):
implementation = spec.get("implementation")
if implementation not in [
@@ -20,78 +146,37 @@ def collect_regex_pattern(service, file_name, input_json, output_dir):
continue
pattern = spec.get("fields", {}).get("value")
if not pattern:
print(f"No pattern found in {file_name} for {implementation}")
continue
# Compose YAML structure
name = spec.get("name", "")
spec_name = spec.get("name", "")
existing_pattern_name = duplicate_regex_patterns.get(pattern)
if existing_pattern_name:
existing_pattern_path = os.path.join(
output_dir,
f"{existing_pattern_name}.yml",
# Check if this exact pattern was already seen
if pattern in regex_patterns["by_pattern"]:
existing_data = regex_patterns["by_pattern"][pattern]
# If previously seen for the same service with the same regex - Skip
if service.capitalize() in existing_data.get("services", []):
print(f"Pattern already exists for {service}: {spec_name}")
continue
# If previously seen for a different service - update to support both
_update_existing_pattern_for_service(
existing_data, service, output_dir
)
if (
os.path.exists(existing_pattern_path)
and service.capitalize() not in existing_pattern_path
):
new_path = os.path.join(
output_dir,
f"{get_safe_name(name)}.yml",
)
os.rename(
existing_pattern_path,
new_path,
)
with open(new_path, "r+", encoding="utf-8") as f:
yml_data = yaml.safe_load(f)
yml_data["name"] = get_safe_name(name)
if service.capitalize() not in yml_data["tags"]:
yml_data["tags"].append(service.capitalize())
f.seek(0)
yaml.dump(yml_data, f, sort_keys=False, allow_unicode=True)
f.truncate()
duplicate_regex_patterns[pattern] = get_safe_name(name)
continue
else:
duplicate_regex_patterns[pattern] = get_regex_pattern_name(service, name)
yml_data = {
"name": get_regex_pattern_name(service, name),
"pattern": pattern,
"description": "",
"tags": [service.capitalize()],
"tests": [],
}
# Output path
output_path = os.path.join(
output_dir,
f"{get_regex_pattern_name(service, name)}.yml",
)
if os.path.exists(output_path):
print(f"exists{output_path}, skipping")
continue
with open(output_path, "w", encoding="utf-8") as f:
yaml.dump(yml_data, f, sort_keys=False, allow_unicode=True)
print(f"Generated: {output_path}")
# Pattern not seen before - check for name collisions
initial_name = get_name(service, spec_name, remove_not=True)
final_name = _generate_unique_pattern_name(initial_name, pattern, output_dir)
if final_name:
_create_new_pattern_file(service, pattern, final_name, output_dir)
def collect_regex_patterns(service, input_dir, output_dir):
for root, _, files in os.walk(input_dir):
for filename in sorted(files):
if not filename.endswith(".json"):
continue
for _, file_stem, data in iterate_json_files(input_dir):
_collect_regex_pattern(service, file_stem, data, output_dir)
file_path = os.path.join(root, filename)
file_stem = os.path.splitext(filename)[0] # Filename without extension
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
collect_regex_pattern(service, file_stem, data, output_dir)
return duplicate_regex_patterns
return regex_patterns
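
The collision rules are easiest to see with concrete inputs. The sketch below mirrors only the in-memory half of _generate_unique_pattern_name (the real helper also probes the output directory case-insensitively); registry contents are invented:

    regex_patterns = {
        "by_name": {"x265": {"name": "x265", "pattern": "old-pattern"}},
        "by_pattern": {},
    }

    def unique_name(initial_name, pattern):
        final_name, counter = initial_name, 1
        while final_name.lower() in regex_patterns["by_name"]:
            existing = regex_patterns["by_name"][final_name.lower()]
            if existing["pattern"] == pattern:
                return None  # same name, same regex: duplicate, skip creation
            final_name = f"{initial_name} ({counter})"
            counter += 1
        return final_name

    print(unique_name("x265", "new-pattern"))  # x265 (1)
    print(unique_name("x265", "old-pattern"))  # None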

View file

@@ -1,17 +1,17 @@
def get_safe_name(name):
def _get_safe_name(name, remove_not=False):
result = name
if remove_not:
result = result.replace("Not ", "")
return (
name.replace("/", "-")
result.replace("/", "-")
.replace("[", "(")
.replace("]", ")")
.replace("HDR10Plus", "HDR10+")
.replace("10 bit", "10bit")
.replace("Atmos", "ATMOS")
.strip()
)
def get_name(service, name):
return f"{service.capitalize()} - {get_safe_name(name)}"
def get_regex_pattern_name(service, regex_pattern_name):
return get_name(service, regex_pattern_name).replace("Not ", "")
def get_name(service, name, remove_not=False):
return f"{service.capitalize()} - {_get_safe_name(name, remove_not)}"