Correctly handle RRSIG records, converting numeric type codes into their textual equivalents

This commit is contained in:
2025-10-18 19:21:52 +02:00
parent 090d719f0d
commit 805ec6c7d2
2 changed files with 27 additions and 26 deletions

View File

@@ -1,6 +1,5 @@
import logging import logging
import subprocess import subprocess
import dns.zone
from pathlib import Path from pathlib import Path
from datetime import datetime, UTC from datetime import datetime, UTC
@@ -141,27 +140,3 @@ def export_all_zones(trigger_path: str = "filesystem-change") -> list[Path]:
logging.info("No zone files were written; skipping commit.") logging.info("No zone files were written; skipping commit.")
return written_files return written_files
def validate_zone(zone_name, content) -> bool:
def replace_type_codes(content):
pattern = re.compile(r'(RRSIG\s+)(\d+)(\s+)')
def repl(match):
num = int(match.group(2))
try:
text_type = dns.rdatatype.to_text(dns.rdatatype.RdataType(num))
return f"{match.group(1)}{text_type}{match.group(3)}"
except Exception as e:
logging.warning(e)
return match.group(0)
return pattern.sub(repl, content)
try:
dns.zone.from_text(replace_type_codes(content), zone_name + '.', relativize=False)
logging.info(f"Zone {zone_name} parsed successfully")
return True
except Exception as e:
logging.error(f"Parse failed for zone {zone_name}: {e}")
return False

View File

@@ -1,12 +1,38 @@
import logging import logging
import requests import requests
import dns.zone
import dns.rdatatype
from config import * from config import *
from helpers import validate_zone
session = requests.Session() session = requests.Session()
def validate_zone(zone_name, content) -> bool:
def replace_type_codes(content):
pattern = re.compile(r'(RRSIG\s+)(\d+)(\s+)')
def repl(match):
num = int(match.group(2))
try:
text_type = dns.rdatatype.to_text(dns.rdatatype.RdataType(num))
return f"{match.group(1)}{text_type}{match.group(3)}"
except Exception as e:
logging.warning(e)
return match.group(0)
return pattern.sub(repl, content)
try:
dns.zone.from_text(replace_type_codes(content), zone_name + '.', relativize=False)
logging.info(f"Zone {zone_name} parsed successfully")
return True
except Exception as e:
logging.error(f"Parse failed for zone {zone_name}: {e}")
return False
def list_zones() -> list[dict]: def list_zones() -> list[dict]:
url = f"{TECHNITIUM_API_BASE.rstrip('/')}{LIST_ZONES_ENDPOINT}?token={API_TOKEN}" url = f"{TECHNITIUM_API_BASE.rstrip('/')}{LIST_ZONES_ENDPOINT}?token={API_TOKEN}"
logging.debug(f"Listing zones from {url}") logging.debug(f"Listing zones from {url}")