#!/usr/bin/env python3
"""
update_tdns_records.py

Keep the A / AAAA records of a single host name in sync with the public
IP addresses of the machine running this script.

Supports two update backends:
 - HTTP_API: Technitium DNS HTTP API (token-based)
 - RFC2136: standard DNS dynamic update (optionally TSIG authenticated)

Environment variables (main):
 - BACKEND ("HTTP_API" or "RFC2136") default "HTTP_API"
 - RUN_ONCE "1" to run once and exit, default "0" (run loop)
 - UPDATE_INTERVAL seconds between runs when not RUN_ONCE, default 300
 - ZONE e.g. "pippo.com"
 - RECORD_NAME e.g. "fi-vg3" (the script will append the zone: fi-vg3.pippo.com)
 - TTL integer TTL for records, default 300
 - SKIP_IPV6 "1" to skip AAAA handling
 - QUERY_NAMESERVERS comma-separated IP addresses of the DNS servers asked
   for the current records, default "192.168.178.2"; set it to an empty
   string to use the system resolver configuration

-- HTTP API specific --
 - TDNS_HOST e.g. "http://technitium:5380"
 - TDNS_API_TOKEN your Technitium API token

-- RFC2136 specific --
 - RFC2136_SERVER IP or hostname of authoritative server (or Technitium host)
 - RFC2136_PORT port, default 53
 - RFC2136_TSIG_NAME optional TSIG key name
 - RFC2136_TSIG_KEY optional TSIG key (base64)
 - RFC2136_TSIG_ALGO optional TSIG algorithm, e.g. "hmac-sha256."
   (note trailing dot accepted)
"""

import ipaddress
import os
import socket
import sys
import time
from pprint import pformat
from typing import Optional, Tuple, Dict, Any

import requests
import dns.resolver
import dns.update
import dns.query
import dns.tsigkeyring
import dns.exception

# Configuration loading
BACKEND = os.environ.get("BACKEND", "HTTP_API").upper()
RUN_ONCE = os.environ.get("RUN_ONCE", "0") == "1"
UPDATE_INTERVAL = int(os.environ.get("UPDATE_INTERVAL", "300"))
# Default to "" so an unset variable reaches the friendly validation in
# main() instead of raising AttributeError on None at import time.
ZONE = os.environ.get("ZONE", "").strip().rstrip(".")
RECORD_NAME = os.environ.get("RECORD_NAME", "").strip()
TTL = int(os.environ.get("TTL", "300"))
SKIP_IPV6 = os.environ.get("SKIP_IPV6", "0") == "1"

TDNS_HOST = os.environ.get("TDNS_HOST")
TDNS_API_TOKEN = os.environ.get("TDNS_API_TOKEN")

RFC2136_SERVER = os.environ.get("RFC2136_SERVER")
RFC2136_PORT = int(os.environ.get("RFC2136_PORT", "53"))
RFC2136_TSIG_NAME = os.environ.get("RFC2136_TSIG_NAME")
RFC2136_TSIG_KEY = os.environ.get("RFC2136_TSIG_KEY")
RFC2136_TSIG_ALGO = os.environ.get("RFC2136_TSIG_ALGO", "hmac-sha256.")

# Name servers used to look up the records currently published.
# An empty list means "use the system resolver configuration".
QUERY_NAMESERVERS = [
    ns.strip()
    for ns in os.environ.get("QUERY_NAMESERVERS", "192.168.178.2").split(",")
    if ns.strip()
]

FULL_DOMAIN = f"{RECORD_NAME}.{ZONE}".rstrip(".")

# "What is my IP" services, tried in order until one returns a valid address.
_IPV4_SOURCES = (
    ("https://api.ipify.org", {"format": "text"}),
    ("https://v4.ifconfig.co/ip", None),
    ("https://ifconfig.me/ip", None),
)
_IPV6_SOURCES = (
    ("https://api64.ipify.org", {"format": "text"}),
    ("https://v6.ifconfig.co/ip", None),
)


# helpers to fetch public IPs
def _fetch_public_ip(sources, family: int, timeout) -> Optional[str]:
    """Return the first address of *family* reported by one of *sources*.

    Each source is a ``(url, params)`` pair. A source that is unreachable,
    answers with an HTTP error, or returns text that is not a valid address
    of the requested family is skipped. Returns None when every source fails.
    """
    for url, params in sources:
        try:
            reply = requests.get(url, params=params, timeout=timeout)
            reply.raise_for_status()
            ip = reply.text.strip()
            # basic validation: raises OSError for anything but a literal
            # address of the requested family
            socket.inet_pton(family, ip)
        except (requests.RequestException, OSError, ValueError):
            continue
        else:
            return ip
    return None


def get_public_ipv4(timeout=10) -> Optional[str]:
    """Return this host's public IPv4 address, or None if it cannot be found."""
    return _fetch_public_ip(_IPV4_SOURCES, socket.AF_INET, timeout)


def get_public_ipv6(timeout=10) -> Optional[str]:
    """Return this host's public IPv6 address, or None if it cannot be found."""
    return _fetch_public_ip(_IPV6_SOURCES, socket.AF_INET6, timeout)


# Technitium HTTP API backend
def http_api_update(record_type: str, ip: str) -> Tuple[bool, Any]:
    """
    Set the record through Technitium /api/zones/records/add with overwrite=true.

    Returns ``(ok, info)``. ``info`` is the decoded JSON reply (or the raw
    body when it is not JSON) on success and an error description otherwise.
    Never raises: every failure is reported through the return value.
    """
    if not TDNS_HOST:
        return False, "Missing TDNS_HOST"
    if not TDNS_API_TOKEN:
        return False, "Missing TDNS_API_TOKEN"

    endpoint = TDNS_HOST.rstrip("/") + "/api/zones/records/add"
    params = {
        "token": TDNS_API_TOKEN,
        "domain": FULL_DOMAIN,
        "zone": ZONE,
        "type": record_type,
        "ttl": str(TTL),
        # A/AAAA data goes in "ipAddress"; "text" is the TXT record field.
        "ipAddress": ip,
        "overwrite": "true",
    }
    try:
        r = requests.get(endpoint, params=params, timeout=15)
    except Exception as e:
        # The failing URL (query string included) can appear in the exception
        # text; keep the API token out of whatever gets printed or logged.
        return False, str(e).replace(TDNS_API_TOKEN, "***")

    if r.status_code != 200:
        return False, f"HTTP {r.status_code}: {r.text}"
    try:
        payload = r.json()
    except ValueError:
        return True, r.text
    # Technitium answers HTTP 200 even for failed calls and signals the
    # outcome in the "status" field ("ok", "error", "invalid-token").
    if isinstance(payload, dict) and payload.get("status", "ok") != "ok":
        return False, payload
    return True, payload


# RFC2136 backend
def _resolve_server_address(host: str, port: int) -> str:
    """Return an IP address for *host*; dns.query only accepts literal addresses."""
    candidates = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    return candidates[0][4][0]


def rfc2136_update(record_type: str, ip: str) -> Tuple[bool, Any]:
    """
    Build a DNS update packet for ZONE, replace the RR for FULL_DOMAIN of type record_type.
    Requires RFC2136_SERVER to be set. Optional TSIG key via RFC2136_TSIG_* env vars.

    Returns ``(True, "OK")`` when the server answers with rcode 0, otherwise
    ``(False, description)``. Never raises: every failure is reported
    through the return value.
    """
    if not RFC2136_SERVER:
        return False, "Missing RFC2136_SERVER env"

    try:
        # optionally sign the update with TSIG
        tsig_options = {}
        if RFC2136_TSIG_NAME and RFC2136_TSIG_KEY:
            key_name = RFC2136_TSIG_NAME.rstrip(".")
            tsig_options["keyring"] = dns.tsigkeyring.from_text({key_name: RFC2136_TSIG_KEY})
            tsig_options["keyname"] = key_name
            # Leave the argument out when no algorithm is configured so the
            # library default applies; passing None would break signing.
            if RFC2136_TSIG_ALGO:
                tsig_options["keyalgorithm"] = RFC2136_TSIG_ALGO

        update = dns.update.Update(ZONE, **tsig_options)
        # delete existing records for that name/type and add new
        update.delete(FULL_DOMAIN + ".", record_type)
        update.add(FULL_DOMAIN + ".", TTL, record_type, ip)

        # send to server
        server_ip = _resolve_server_address(RFC2136_SERVER, RFC2136_PORT)
        response = dns.query.tcp(update, server_ip, timeout=10, port=RFC2136_PORT)
        rcode = response.rcode()
        if rcode == 0:
            return True, "OK"
        return False, f"DNS rcode {rcode}"
    except Exception as e:
        # Deliberately broad: the caller runs in an endless loop and only
        # needs to know that this attempt failed, and why.
        return False, str(e)


# function to query current records (to avoid unnecessary updates)
def _build_resolver() -> dns.resolver.Resolver:
    """Return a resolver that queries QUERY_NAMESERVERS (system config if empty)."""
    if not QUERY_NAMESERVERS:
        return dns.resolver.Resolver()
    # configure=False: do not depend on /etc/resolv.conf when the servers
    # to ask are given explicitly.
    resolver = dns.resolver.Resolver(configure=False)
    resolver.nameservers = list(QUERY_NAMESERVERS)
    return resolver


def query_current_records(record_type: str) -> list:
    """Return the text form of the records of *record_type* for FULL_DOMAIN.

    Any DNS failure (NXDOMAIN, no answer, timeout, no usable server, ...)
    yields an empty list, which makes the caller go ahead with the update.
    """
    try:
        answers = _build_resolver().resolve(FULL_DOMAIN, record_type, lifetime=5)
    except dns.exception.DNSException:
        return []
    return [r.to_text() for r in answers]


def _same_address(left: str, right: str) -> bool:
    """Compare two IP addresses by value, so equivalent IPv6 spellings match."""
    try:
        return ipaddress.ip_address(left) == ipaddress.ip_address(right)
    except ValueError:
        return left == right


def _sync_record(record_type: str, ip: Optional[str], missing_reason: str) -> Dict[str, Any]:
    """Bring one record type in line with *ip* and return the result entry."""
    if not ip:
        return {"type": record_type, "ip": None, "status": "skipped", "reason": missing_reason}

    current = query_current_records(record_type)
    if any(_same_address(ip, existing) for existing in current):
        return {
            "type": record_type,
            "ip": ip,
            "status": "skipped",
            "reason": "already_set",
            "current": current,
        }

    backend_update = http_api_update if BACKEND == "HTTP_API" else rfc2136_update
    ok, info = backend_update(record_type, ip)
    return {"type": record_type, "ip": ip, "ok": ok, "info": info}


def update_once() -> Dict[str, Any]:
    """Run one detection/update cycle and return a report dictionary.

    The report has the keys "domain", "backend" and "results" (one entry per
    handled record type). When no public address could be determined at all,
    "results" stays empty and "error" is set to "no_public_ip".
    """
    out = {"domain": FULL_DOMAIN, "backend": BACKEND, "results": []}

    v4 = get_public_ipv4()
    v6 = None if SKIP_IPV6 else get_public_ipv6()

    if not v4 and not v6:
        out["error"] = "no_public_ip"
        return out

    # IPv4
    out["results"].append(_sync_record("A", v4, "no_ipv4"))

    # IPv6
    if not SKIP_IPV6:
        out["results"].append(_sync_record("AAAA", v6, "no_ipv6"))

    return out


def pretty_print(result: Dict[str, Any]):
    """Print *result* in a readable, multi-line form."""
    print(pformat(result))


def main():
    """Validate the configuration, then update once or loop forever.

    Exits with status 2 when the configuration is incomplete or invalid.
    """
    print(f"Starting updater for {FULL_DOMAIN} (zone {ZONE}) backend={BACKEND}")
    if BACKEND not in ("HTTP_API", "RFC2136"):
        print("ERROR: BACKEND must be HTTP_API or RFC2136", file=sys.stderr)
        sys.exit(2)
    if BACKEND == "HTTP_API" and not TDNS_API_TOKEN:
        print("ERROR: TDNS_API_TOKEN required for HTTP_API backend", file=sys.stderr)
        sys.exit(2)
    if BACKEND == "HTTP_API" and not TDNS_HOST:
        print("ERROR: TDNS_HOST required for HTTP_API backend", file=sys.stderr)
        sys.exit(2)
    if BACKEND == "RFC2136" and not RFC2136_SERVER:
        print("ERROR: RFC2136_SERVER required for RFC2136 backend", file=sys.stderr)
        sys.exit(2)
    if not ZONE:
        print("ERROR: ZONE not defined", file=sys.stderr)
        sys.exit(2)
    if not RECORD_NAME:
        print("ERROR: RECORD_NAME not defined", file=sys.stderr)
        sys.exit(2)
    if QUERY_NAMESERVERS:
        # Fail at start-up, not in the middle of the loop, on a bad address.
        try:
            _build_resolver()
        except ValueError as e:
            print(f"ERROR: invalid QUERY_NAMESERVERS: {e}", file=sys.stderr)
            sys.exit(2)

    while True:
        res = update_once()
        pretty_print(res)
        if RUN_ONCE:
            break
        time.sleep(UPDATE_INTERVAL)


if __name__ == "__main__":
    main()