diff --git a/src/helpers.py b/src/helpers.py index f398fe2..26a1114 100644 --- a/src/helpers.py +++ b/src/helpers.py @@ -1,5 +1,6 @@ import logging import subprocess +import dns.zone from pathlib import Path from datetime import datetime, UTC @@ -62,8 +63,6 @@ def extract_domain_from_path(path: str) -> str|None: name = Path(path).name name_no_ext = name.rstrip(".zone") - candidates = set() - if DOMAIN_FRAGMENT_RE.search(name_no_ext): found = DOMAIN_FRAGMENT_RE.findall(name_no_ext) for f in found: @@ -71,6 +70,15 @@ def extract_domain_from_path(path: str) -> str|None: return None +def validate_zone(zone_name, content) -> bool: + try: + dns.zone.from_text(content, zone_name, relativize=False) + logging.info(f"Zone {zone_name} parsed successfully") + return True + except Exception as e: + logging.error(f"Parse failed for zone {zone_name}") + return False + def export_single_zone(trigger_path: str) -> list[Path]: logging.info(f"Starting export of single zone for trigger path {trigger_path})") ensure_git_repo() @@ -88,7 +96,7 @@ def export_single_zone(trigger_path: str) -> list[Path]: if zone_name == domain: logging.info(f"Single matching zone found: {zone_name}") try: - content = export_zone(zone) + content = export_zone(zone_name) out = write_zone_export(zone_name, content) commit_and_push([out], trigger_path) return [out] @@ -118,7 +126,7 @@ def export_all_zones(trigger_path: str ="filesystem-change") -> list[Path]: # zone may be a dict with keys like 'id' and 'domain' — adapt to your API result shape zone_name = z.get("name") try: - content = export_zone(z) + content = export_zone(zone_name) out = write_zone_export(zone_name, content) written_files.append(out) except Exception as e: diff --git a/src/technitium.py b/src/technitium.py index b5994f4..70a7c0e 100644 --- a/src/technitium.py +++ b/src/technitium.py @@ -2,6 +2,7 @@ import logging import requests from config import * +from helpers import validate_zone session = requests.Session() @@ -26,4 +27,8 @@ def export_zone(zone_name) -> str: url = f"{TECHNITIUM_API_BASE.rstrip('/')}{EXPORT_ZONE_ENDPOINT}?token={API_TOKEN}&zone={zone_name}" r = session.get(url, timeout=30) r.raise_for_status() - return r.text \ No newline at end of file + content = r.text + if validate_zone(zone_name, content): + return content + else: + raise RuntimeError(f"Could not validate zone {zone_name}: {content}") \ No newline at end of file