Correctly handle RRSIG records, converting numeric type codes into their textual equivalents

This commit is contained in:
2025-10-18 19:19:14 +02:00
parent e155fc4611
commit 2586becfba
5 changed files with 47 additions and 17 deletions

View File

@@ -11,6 +11,7 @@ from helpers import export_single_zone
debounce_timer = None debounce_timer = None
debounce_lock = threading.Lock() debounce_lock = threading.Lock()
def run_export(trigger_path): def run_export(trigger_path):
global debounce_timer global debounce_timer
with debounce_lock: with debounce_lock:
@@ -20,6 +21,7 @@ def run_export(trigger_path):
except Exception: except Exception:
logging.exception("Export run failed.") logging.exception("Export run failed.")
def schedule_export(trigger_path): def schedule_export(trigger_path):
global debounce_timer global debounce_timer
with debounce_lock: with debounce_lock:
@@ -30,6 +32,7 @@ def schedule_export(trigger_path):
debounce_timer.start() debounce_timer.start()
logging.debug("Debounce timer started/reset (%.1fs)", DEBOUNCE_SECONDS) logging.debug("Debounce timer started/reset (%.1fs)", DEBOUNCE_SECONDS)
def _most_recent_file_under(path: Path, max_depth: int = 2) -> Path | None: def _most_recent_file_under(path: Path, max_depth: int = 2) -> Path | None:
if not path.exists(): if not path.exists():
return None return None
@@ -51,9 +54,11 @@ def _most_recent_file_under(path: Path, max_depth: int = 2) -> Path | None:
best = (m, p) best = (m, p)
return best[1] if best else None return best[1] if best else None
class DebouncedHandler(PatternMatchingEventHandler): class DebouncedHandler(PatternMatchingEventHandler):
def __init__(self, patterns=None, ignore_patterns=None, ignore_directories=False, case_sensitive=True): def __init__(self, patterns=None, ignore_patterns=None, ignore_directories=False, case_sensitive=True):
super().__init__(patterns=patterns or ["*"], ignore_patterns=ignore_patterns or [], ignore_directories=ignore_directories, case_sensitive=case_sensitive) super().__init__(patterns=patterns or ["*"], ignore_patterns=ignore_patterns or [],
ignore_directories=ignore_directories, case_sensitive=case_sensitive)
def on_any_event(self, event): def on_any_event(self, event):
try: try:
@@ -78,4 +83,4 @@ class DebouncedHandler(PatternMatchingEventHandler):
schedule_export(trigger) schedule_export(trigger)
except Exception as e: except Exception as e:
logging.exception(f"Error handling filesystem event; scheduling export for watch dir as fallback: {e}") logging.exception(f"Error handling filesystem event; scheduling export for watch dir as fallback: {e}")
schedule_export(WATCH_DIR) schedule_export(WATCH_DIR)

View File

@@ -6,7 +6,8 @@ from pathlib import Path
from config import * from config import *
def run_git_cmd(args, check=True, capture_output=False, env = None):
def run_git_cmd(args, check=True, capture_output=False, env=None):
cmd = ["git", "-C", GIT_REPO_DIR] + args cmd = ["git", "-C", GIT_REPO_DIR] + args
logging.debug(f"Running git: {' '.join(cmd)}") logging.debug(f"Running git: {' '.join(cmd)}")
return subprocess.run(cmd, check=check, capture_output=capture_output, text=True, env=env) return subprocess.run(cmd, check=check, capture_output=capture_output, text=True, env=env)
@@ -16,4 +17,4 @@ def ensure_git_repo():
gitdir = Path(GIT_REPO_DIR) / ".git" gitdir = Path(GIT_REPO_DIR) / ".git"
if not gitdir.exists(): if not gitdir.exists():
logging.error(f"Git repo not found at {GIT_REPO_DIR} (no .git directory). Initialize or set correct path.") logging.error(f"Git repo not found at {GIT_REPO_DIR} (no .git directory). Initialize or set correct path.")
sys.exit(2) sys.exit(2)

View File

@@ -1,5 +1,6 @@
import logging import logging
import subprocess import subprocess
import dns.zone
from pathlib import Path from pathlib import Path
from datetime import datetime, UTC from datetime import datetime, UTC
@@ -8,6 +9,7 @@ from config import *
from git import run_git_cmd, ensure_git_repo from git import run_git_cmd, ensure_git_repo
from technitium import list_zones, export_zone from technitium import list_zones, export_zone
def write_zone_export(zone_name, content) -> Path: def write_zone_export(zone_name, content) -> Path:
dest_dir = Path(GIT_REPO_DIR) dest_dir = Path(GIT_REPO_DIR)
dest_dir.mkdir(parents=True, exist_ok=True) dest_dir.mkdir(parents=True, exist_ok=True)
@@ -19,6 +21,7 @@ def write_zone_export(zone_name, content) -> Path:
f.write(content) f.write(content)
return out_path return out_path
def commit_and_push(changed_files, trigger_path): def commit_and_push(changed_files, trigger_path):
# Pull from remote, rebasing # Pull from remote, rebasing
try: try:
@@ -65,7 +68,8 @@ def commit_and_push(changed_files, trigger_path):
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
logging.exception(f"git push failed: {e}") logging.exception(f"git push failed: {e}")
def extract_domain_from_path(path: str) -> str|None:
def extract_domain_from_path(path: str) -> str | None:
name = Path(path).name name = Path(path).name
name_no_ext = name.rstrip(".zone") name_no_ext = name.rstrip(".zone")
@@ -76,6 +80,7 @@ def extract_domain_from_path(path: str) -> str|None:
return None return None
def export_single_zone(trigger_path: str) -> list[Path]: def export_single_zone(trigger_path: str) -> list[Path]:
logging.info(f"Starting export of single zone for trigger path {trigger_path})") logging.info(f"Starting export of single zone for trigger path {trigger_path})")
ensure_git_repo() ensure_git_repo()
@@ -108,7 +113,8 @@ def export_single_zone(trigger_path: str) -> list[Path]:
logging.info(f"No domain found for trigger path {trigger_path}; falling back to full export") logging.info(f"No domain found for trigger path {trigger_path}; falling back to full export")
return export_all_zones(trigger_path) return export_all_zones(trigger_path)
def export_all_zones(trigger_path: str ="filesystem-change") -> list[Path]:
def export_all_zones(trigger_path: str = "filesystem-change") -> list[Path]:
logging.info(f"Starting export of all zones (trigger={trigger_path})") logging.info(f"Starting export of all zones (trigger={trigger_path})")
ensure_git_repo() ensure_git_repo()
@@ -134,4 +140,29 @@ def export_all_zones(trigger_path: str ="filesystem-change") -> list[Path]:
else: else:
logging.info("No zone files were written; skipping commit.") logging.info("No zone files were written; skipping commit.")
return written_files return written_files
def replace_type_codes(content):
pattern = re.compile(r'(RRSIG\s+)(\d+)(\s+)')
def repl(match):
num = int(match.group(2))
try:
text_type = dns.rdatatype.to_text(dns.rdatatype.RdataType(num))
return f"{match.group(1)}{text_type}{match.group(3)}"
except Exception as e:
logging.warning(e)
return match.group(0)
return pattern.sub(repl, content)
def validate_zone(zone_name, content) -> bool:
try:
dns.zone.from_text(replace_type_codes(content), zone_name + '.', relativize=False)
logging.info(f"Zone {zone_name} parsed successfully")
return True
except Exception as e:
logging.error(f"Parse failed for zone {zone_name}: {e}")
return False

View File

@@ -1,19 +1,11 @@
import logging import logging
import requests import requests
import dns.zone
from config import * from config import *
from helpers import validate_zone
session = requests.Session() session = requests.Session()
def validate_zone(zone_name, content) -> bool:
try:
dns.zone.from_text(content, zone_name, relativize=False)
logging.info(f"Zone {zone_name} parsed successfully")
return True
except Exception as e:
logging.error(f"Parse failed for zone {zone_name}: {e}")
return False
def list_zones() -> list[dict]: def list_zones() -> list[dict]:
url = f"{TECHNITIUM_API_BASE.rstrip('/')}{LIST_ZONES_ENDPOINT}?token={API_TOKEN}" url = f"{TECHNITIUM_API_BASE.rstrip('/')}{LIST_ZONES_ENDPOINT}?token={API_TOKEN}"
@@ -40,4 +32,4 @@ def export_zone(zone_name) -> str:
if validate_zone(zone_name, content): if validate_zone(zone_name, content):
return content return content
else: else:
raise RuntimeError(f"Could not validate zone {zone_name}: {content}") raise RuntimeError(f"Could not validate zone {zone_name}: {content}")

View File

@@ -17,6 +17,7 @@ logging.basicConfig(
datefmt="%Y-%m-%d %H:%M:%S", datefmt="%Y-%m-%d %H:%M:%S",
) )
def main(): def main():
# sanity checks # sanity checks
if not Path(WATCH_DIR).exists(): if not Path(WATCH_DIR).exists():