From 2586becfba202ef084f310dc35cfe9d2843b7b44 Mon Sep 17 00:00:00 2001 From: Ettore Dreucci Date: Sat, 18 Oct 2025 19:19:14 +0200 Subject: [PATCH] Correctly handle RRSIG records, converting numeric type codes into their textual equivalents --- src/DebouncedHandler.py | 9 ++++++-- src/git.py | 5 +++-- src/helpers.py | 37 ++++++++++++++++++++++++++++++--- src/technitium.py | 12 ++--------- src/technitium_zone_exporter.py | 1 + 5 files changed, 47 insertions(+), 17 deletions(-) diff --git a/src/DebouncedHandler.py b/src/DebouncedHandler.py index 2b33069..c69d8f4 100644 --- a/src/DebouncedHandler.py +++ b/src/DebouncedHandler.py @@ -11,6 +11,7 @@ from helpers import export_single_zone debounce_timer = None debounce_lock = threading.Lock() + def run_export(trigger_path): global debounce_timer with debounce_lock: @@ -20,6 +21,7 @@ def run_export(trigger_path): except Exception: logging.exception("Export run failed.") + def schedule_export(trigger_path): global debounce_timer with debounce_lock: @@ -30,6 +32,7 @@ def schedule_export(trigger_path): debounce_timer.start() logging.debug("Debounce timer started/reset (%.1fs)", DEBOUNCE_SECONDS) + def _most_recent_file_under(path: Path, max_depth: int = 2) -> Path | None: if not path.exists(): return None @@ -51,9 +54,11 @@ def _most_recent_file_under(path: Path, max_depth: int = 2) -> Path | None: best = (m, p) return best[1] if best else None + class DebouncedHandler(PatternMatchingEventHandler): def __init__(self, patterns=None, ignore_patterns=None, ignore_directories=False, case_sensitive=True): - super().__init__(patterns=patterns or ["*"], ignore_patterns=ignore_patterns or [], ignore_directories=ignore_directories, case_sensitive=case_sensitive) + super().__init__(patterns=patterns or ["*"], ignore_patterns=ignore_patterns or [], + ignore_directories=ignore_directories, case_sensitive=case_sensitive) def on_any_event(self, event): try: @@ -78,4 +83,4 @@ class DebouncedHandler(PatternMatchingEventHandler): schedule_export(trigger) except Exception as e: logging.exception(f"Error handling filesystem event; scheduling export for watch dir as fallback: {e}") - schedule_export(WATCH_DIR) \ No newline at end of file + schedule_export(WATCH_DIR) diff --git a/src/git.py b/src/git.py index 3935264..a8c4c39 100644 --- a/src/git.py +++ b/src/git.py @@ -6,7 +6,8 @@ from pathlib import Path from config import * -def run_git_cmd(args, check=True, capture_output=False, env = None): + +def run_git_cmd(args, check=True, capture_output=False, env=None): cmd = ["git", "-C", GIT_REPO_DIR] + args logging.debug(f"Running git: {' '.join(cmd)}") return subprocess.run(cmd, check=check, capture_output=capture_output, text=True, env=env) @@ -16,4 +17,4 @@ def ensure_git_repo(): gitdir = Path(GIT_REPO_DIR) / ".git" if not gitdir.exists(): logging.error(f"Git repo not found at {GIT_REPO_DIR} (no .git directory). Initialize or set correct path.") - sys.exit(2) \ No newline at end of file + sys.exit(2) diff --git a/src/helpers.py b/src/helpers.py index 1f3a178..341c70f 100644 --- a/src/helpers.py +++ b/src/helpers.py @@ -1,5 +1,6 @@ import logging import subprocess +import dns.zone from pathlib import Path from datetime import datetime, UTC @@ -8,6 +9,7 @@ from config import * from git import run_git_cmd, ensure_git_repo from technitium import list_zones, export_zone + def write_zone_export(zone_name, content) -> Path: dest_dir = Path(GIT_REPO_DIR) dest_dir.mkdir(parents=True, exist_ok=True) @@ -19,6 +21,7 @@ def write_zone_export(zone_name, content) -> Path: f.write(content) return out_path + def commit_and_push(changed_files, trigger_path): # Pull from remote, rebasing try: @@ -65,7 +68,8 @@ def commit_and_push(changed_files, trigger_path): except subprocess.CalledProcessError as e: logging.exception(f"git push failed: {e}") -def extract_domain_from_path(path: str) -> str|None: + +def extract_domain_from_path(path: str) -> str | None: name = Path(path).name name_no_ext = name.rstrip(".zone") @@ -76,6 +80,7 @@ def extract_domain_from_path(path: str) -> str|None: return None + def export_single_zone(trigger_path: str) -> list[Path]: logging.info(f"Starting export of single zone for trigger path {trigger_path})") ensure_git_repo() @@ -108,7 +113,8 @@ def export_single_zone(trigger_path: str) -> list[Path]: logging.info(f"No domain found for trigger path {trigger_path}; falling back to full export") return export_all_zones(trigger_path) -def export_all_zones(trigger_path: str ="filesystem-change") -> list[Path]: + +def export_all_zones(trigger_path: str = "filesystem-change") -> list[Path]: logging.info(f"Starting export of all zones (trigger={trigger_path})") ensure_git_repo() @@ -134,4 +140,29 @@ def export_all_zones(trigger_path: str ="filesystem-change") -> list[Path]: else: logging.info("No zone files were written; skipping commit.") - return written_files \ No newline at end of file + return written_files + + +def replace_type_codes(content): + pattern = re.compile(r'(RRSIG\s+)(\d+)(\s+)') + + def repl(match): + num = int(match.group(2)) + try: + text_type = dns.rdatatype.to_text(dns.rdatatype.RdataType(num)) + return f"{match.group(1)}{text_type}{match.group(3)}" + except Exception as e: + logging.warning(e) + return match.group(0) + + return pattern.sub(repl, content) + + +def validate_zone(zone_name, content) -> bool: + try: + dns.zone.from_text(replace_type_codes(content), zone_name + '.', relativize=False) + logging.info(f"Zone {zone_name} parsed successfully") + return True + except Exception as e: + logging.error(f"Parse failed for zone {zone_name}: {e}") + return False diff --git a/src/technitium.py b/src/technitium.py index 1f7148e..3724352 100644 --- a/src/technitium.py +++ b/src/technitium.py @@ -1,19 +1,11 @@ import logging import requests -import dns.zone from config import * +from helpers import validate_zone session = requests.Session() -def validate_zone(zone_name, content) -> bool: - try: - dns.zone.from_text(content, zone_name, relativize=False) - logging.info(f"Zone {zone_name} parsed successfully") - return True - except Exception as e: - logging.error(f"Parse failed for zone {zone_name}: {e}") - return False def list_zones() -> list[dict]: url = f"{TECHNITIUM_API_BASE.rstrip('/')}{LIST_ZONES_ENDPOINT}?token={API_TOKEN}" @@ -40,4 +32,4 @@ def export_zone(zone_name) -> str: if validate_zone(zone_name, content): return content else: - raise RuntimeError(f"Could not validate zone {zone_name}: {content}") \ No newline at end of file + raise RuntimeError(f"Could not validate zone {zone_name}: {content}") diff --git a/src/technitium_zone_exporter.py b/src/technitium_zone_exporter.py index df4200e..a7b46c2 100644 --- a/src/technitium_zone_exporter.py +++ b/src/technitium_zone_exporter.py @@ -17,6 +17,7 @@ logging.basicConfig( datefmt="%Y-%m-%d %H:%M:%S", ) + def main(): # sanity checks if not Path(WATCH_DIR).exists():