First version

2025-09-28 15:04:01 +02:00
commit b8d58bd20f
8 changed files with 426 additions and 0 deletions

2
.gitignore vendored Normal file

@@ -0,0 +1,2 @@
.idea
.venv

122
README.md Normal file

@@ -0,0 +1,122 @@
# Technitium Zone Exporter
This tool watches a directory for changes in [Technitium DNS Server](https://technitium.com/dns/) zone files.

When a change is detected, it:

1. Exports zones via the Technitium DNS API.
2. Writes the exported zones into a Git repository.
3. Commits (and optionally pushes) the changes.

Useful for keeping DNS zones under version control automatically.

---
## Features
- Watches any directory (using `watchdog`) and debounces rapid changes.
- Can export **all zones** or just the zone corresponding to the changed file.
- Commits with timestamp and changed file info.
- Reads config from environment variables.
- Runs continuously as a systemd service.
---
## Requirements
- Python 3.11+ (the code uses `datetime.UTC` and modern type hints)
- [watchdog](https://pypi.org/project/watchdog/)
- [requests](https://pypi.org/project/requests/)
- Git installed and a repository initialized at the target directory (see the setup sketch below)

Install dependencies:

```bash
pip install watchdog requests
```
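
The exporter also expects `GIT_REPO_DIR` to point at an existing Git repository (it exits with an error if no `.git` directory is found). A minimal setup sketch, assuming a hypothetical target path `/opt/zones-repo` and a hypothetical remote URL — adjust both to your environment:

```bash
# Hypothetical path and remote URL — replace with your own
sudo mkdir -p /opt/zones-repo
sudo git -C /opt/zones-repo init
# Only needed if you plan to set GIT_PUSH=True:
sudo git -C /opt/zones-repo remote add origin git@example.com:dns/zones.git
```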
---
## Environment Variables
| Variable                | Default  | Description                                                    |
|-------------------------|----------|----------------------------------------------------------------|
| `TECHNITIUM_ZONE_DIR`   | *(none)* | Directory where Technitium stores the zone files.              |
| `TECHNITIUM_API_BASE`   | *(none)* | Base URL of the Technitium API, including protocol and port.   |
| `TECHNITIUM_API_TOKEN`  | *(none)* | API token for Technitium DNS.                                  |
| `GIT_REPO_DIR`          | *(none)* | Path to the Git repository where exported zones are written.   |
| `GIT_AUTHOR_NAME`       | *(none)* | Author name for the Git commits.                               |
| `GIT_AUTHOR_EMAIL`      | *(none)* | Author email address for the Git commits.                      |
| `GIT_PUSH`              | `False`  | Set to `True` to push each commit to the remote.               |
| `LOG_LEVEL`             | `INFO`   | Logging verbosity (`DEBUG`, `INFO`, `WARNING`, `ERROR`).       |

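A quick way to sanity-check the base URL and token before deploying — this is the same `zones/list` call the exporter makes, assuming Technitium's web console listens on `http://localhost:5380`:

```bash
# Replace the host/port and token with your own values
curl "http://localhost:5380/api/zones/list?token=yourtoken"
```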
---
## Running Manually
Export all zones immediately (the script does an initial full export, then keeps watching the zone directory):

```bash
TECHNITIUM_ZONE_DIR=/path/to/technitium/zones \
TECHNITIUM_API_BASE=http://localhost:5380 \
TECHNITIUM_API_TOKEN="yourtoken" \
GIT_REPO_DIR=/path/to/zones-repo \
LOG_LEVEL=DEBUG \
python3 technitium_zone_exporter.py
```
---
## Running as a Systemd Service
### 1. Service file
Create `/etc/systemd/system/technitium-zone-exporter.service`:
```ini
[Unit]
Description=Technitium DNS zone auto-exporter
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 /opt/technitium_zone_exporter/src/technitium_zone_exporter.py
WorkingDirectory=/opt/technitium_zone_exporter
EnvironmentFile=/etc/technitium-zone-exporter.env
Restart=always
RestartSec=5s
User=root
[Install]
WantedBy=multi-user.target
```
### 2. Environment file
Create `/etc/technitium-zone-exporter.env`:
```bash
TECHNITIUM_ZONE_DIR=technitium_zone_dir
TECHNITIUM_API_BASE=technitium_url
TECHNITIUM_API_TOKEN=technitium_token
GIT_REPO_DIR=git_repo_dir
GIT_AUTHOR_NAME=technitium_git_user
GIT_AUTHOR_EMAIL=technitium_git_user_mail
GIT_PUSH=True
LOG_LEVEL=INFO
```
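
The environment file contains the API token, so it is worth restricting its permissions (a suggestion, not something the service itself requires):

```bash
# Limit the env file to root, since it holds the Technitium API token
sudo chown root:root /etc/technitium-zone-exporter.env
sudo chmod 600 /etc/technitium-zone-exporter.env
```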
### 3. Enable & start
```bash
sudo systemctl daemon-reload
sudo systemctl enable technitium-zone-exporter
sudo systemctl start technitium-zone-exporter
sudo systemctl status technitium-zone-exporter
```
Logs:
```bash
journalctl -u technitium-zone-exporter -f
```
---
## Git Workflow
- The script automatically runs:

  ```bash
  git add -A
  git commit -m "Technitium zone export: <timestamp>"
  git push
  ```

- Make sure the service user has push access to the remote repo (see the sketch below).
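
Since the service above runs as `root`, pushes use root's Git and SSH configuration. A minimal sketch, assuming an SSH deploy key and the hypothetical repository path and remote from earlier — names and paths are illustrative only:

```bash
# Generate a dedicated key for the exporter (hypothetical key path)
sudo ssh-keygen -t ed25519 -f /root/.ssh/technitium_zone_exporter -N ""
# Add the public key as a deploy key with write access on your Git host, then point the repo at it
sudo git -C /opt/zones-repo remote set-url origin git@example.com:dns/zones.git
sudo tee -a /root/.ssh/config <<'EOF'
Host example.com
    IdentityFile /root/.ssh/technitium_zone_exporter
EOF
```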

39
src/DebouncedHandler.py Normal file

@@ -0,0 +1,39 @@
import threading
import logging

from watchdog.events import PatternMatchingEventHandler

from config import *
from helpers import export_single_zone

# Internal state for debounce
debounce_timer = None
debounce_lock = threading.Lock()


def run_export(trigger_path):
    global debounce_timer
    with debounce_lock:
        debounce_timer = None
    try:
        export_single_zone(trigger_path)
    except Exception:
        logging.exception("Export run failed.")


def schedule_export(trigger_path):
    global debounce_timer
    with debounce_lock:
        if debounce_timer is not None:
            debounce_timer.cancel()
        debounce_timer = threading.Timer(DEBOUNCE_SECONDS, run_export, args=(trigger_path,))
        debounce_timer.daemon = True
        debounce_timer.start()
        logging.debug("Debounce timer started/reset (%.1fs)", DEBOUNCE_SECONDS)


class DebouncedHandler(PatternMatchingEventHandler):
    def __init__(self, patterns=None, ignore_patterns=None, ignore_directories=False, case_sensitive=True):
        super().__init__(patterns=patterns or ["*"], ignore_patterns=ignore_patterns or [],
                         ignore_directories=ignore_directories, case_sensitive=case_sensitive)

    def on_any_event(self, event):
        # When any matching event occurs, start/reset the debounce timer
        logging.debug(f"Filesystem event: {event.event_type} on {event.src_path}")
        schedule_export(event.src_path)

30
src/config.py Normal file

@@ -0,0 +1,30 @@
import os
import re

# Directory to watch for changes (e.g., the Technitium config folder where zone files are modified)
WATCH_DIR = os.environ.get("TECHNITIUM_ZONE_DIR")

# Git repo directory where exports will be stored (must already be a git repo)
GIT_REPO_DIR = os.environ.get("GIT_REPO_DIR")

# Technitium API settings
TECHNITIUM_API_BASE = os.environ.get("TECHNITIUM_API_BASE")
API_TOKEN = os.environ.get("TECHNITIUM_API_TOKEN")

# API endpoints
LIST_ZONES_ENDPOINT = "/api/zones/list"
EXPORT_ZONE_ENDPOINT = "/api/zones/export"

# Git options
GIT_AUTHOR_NAME = os.environ.get("GIT_AUTHOR_NAME")
GIT_AUTHOR_EMAIL = os.environ.get("GIT_AUTHOR_EMAIL")
# Parse GIT_PUSH as a boolean; a raw env string such as "False" would otherwise always be truthy
GIT_PUSH = os.environ.get("GIT_PUSH", "False").strip().lower() in ("true", "1", "yes")

# Debounce (seconds) to coalesce many quick FS events into a single export
DEBOUNCE_SECONDS = 2.0

# Domain regex used to extract a zone name from a changed file's name
DOMAIN_FRAGMENT_RE = re.compile(r"([a-z0-9][a-z0-9\-]*(?:\.[a-z0-9][a-z0-9\-]*)+)", re.IGNORECASE)

# Logging
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()

19
src/git.py Normal file

@@ -0,0 +1,19 @@
import logging
import subprocess
import sys
from pathlib import Path

from config import *


def run_git_cmd(args, check=True, capture_output=False):
    cmd = ["git", "-C", GIT_REPO_DIR] + args
    logging.debug(f"Running git: {' '.join(cmd)}")
    return subprocess.run(cmd, check=check, capture_output=capture_output, text=True)


def ensure_git_repo():
    gitdir = Path(GIT_REPO_DIR) / ".git"
    if not gitdir.exists():
        logging.error(f"Git repo not found at {GIT_REPO_DIR} (no .git directory). Initialize it or set the correct path.")
        sys.exit(2)

132
src/helpers.py Normal file

@@ -0,0 +1,132 @@
import logging
import subprocess
from pathlib import Path
from datetime import datetime, UTC

from config import *
from git import run_git_cmd, ensure_git_repo
from technitium import list_zones, export_zone


def write_zone_export(zone_name, content) -> Path:
    dest_dir = Path(GIT_REPO_DIR)
    dest_dir.mkdir(parents=True, exist_ok=True)
    safe_name = zone_name.replace("/", "_")
    out_path = dest_dir / f"db.{safe_name}"
    logging.info(f"Writing export for zone {zone_name} -> {out_path}")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(content)
    return out_path


def commit_and_push(changed_files, trigger_path):
    # Stage everything in the repo
    try:
        run_git_cmd(["add", "-A"])
    except subprocess.CalledProcessError as e:
        logging.exception(f"git add failed: {e}")
        return

    # Check if there is anything to commit
    try:
        # git diff --cached --quiet exits 0 if nothing is staged
        subprocess.run(["git", "-C", GIT_REPO_DIR, "diff", "--cached", "--quiet"], check=True)
        logging.info("No changes to commit (nothing staged).")
        return
    except subprocess.CalledProcessError:
        # Non-zero exit means there are staged changes.
        pass

    changed_list_text = "\n".join(str(p) for p in changed_files)
    ts = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
    commit_msg = f"Technitium zone export: {ts}\n\nTrigger: {trigger_path}\n\nChanged files:\n{changed_list_text}\n"

    # Commit with the configured author identity (falls back to the repo/global git config if unset)
    author_args = []
    if GIT_AUTHOR_NAME and GIT_AUTHOR_EMAIL:
        author_args = ["-c", f"user.name={GIT_AUTHOR_NAME}", "-c", f"user.email={GIT_AUTHOR_EMAIL}"]
    try:
        run_git_cmd(author_args + ["commit", "-m", commit_msg], check=True)
        logging.info("Committed changes to git.")
    except subprocess.CalledProcessError as e:
        logging.exception(f"git commit failed: {e}")
        return

    if GIT_PUSH:
        try:
            run_git_cmd(["push"], check=True)
            logging.info("Pushed commit to remote.")
        except subprocess.CalledProcessError as e:
            logging.exception(f"git push failed: {e}")


def extract_domain_from_path(path: str) -> str | None:
    name = Path(path).name
    name_no_ext = name.removesuffix(".zone")
    match = DOMAIN_FRAGMENT_RE.search(name_no_ext)
    if match:
        return match.group(1)
    return None


def export_single_zone(trigger_path: str) -> list[Path]:
    logging.info(f"Starting export of single zone for trigger path {trigger_path}")
    ensure_git_repo()
    domain = extract_domain_from_path(trigger_path)
    try:
        zones = list_zones()
    except Exception as e:
        logging.exception(f"Failed to list zones from API; falling back to full export: {e}")
        return export_all_zones(trigger_path)
    if domain is not None:
        for zone in zones:
            zone_name = zone.get("name")
            if zone_name == domain:
                logging.info(f"Single matching zone found: {zone_name}")
                try:
                    content = export_zone(zone_name)
                    out = write_zone_export(zone_name, content)
                    commit_and_push([out], trigger_path)
                    return [out]
                except Exception as e:
                    logging.exception(f"Failed to export zone {zone_name}; falling back to full export: {e}")
                    return export_all_zones(trigger_path)
        logging.info(f"No matching zone found for {domain}; falling back to full export")
        return export_all_zones(trigger_path)
    else:
        logging.info(f"No domain found for trigger path {trigger_path}; falling back to full export")
        return export_all_zones(trigger_path)


def export_all_zones(trigger_path: str = "filesystem-change") -> list[Path]:
    logging.info(f"Starting export of all zones (trigger={trigger_path})")
    ensure_git_repo()
    try:
        zones = list_zones()
    except Exception as e:
        logging.exception(f"Failed to list zones from API: {e}")
        return []
    written_files = []
    for z in zones:
        # Each zone returned by the API is a dict; the zone name is under "name"
        zone_name = z.get("name")
        try:
            content = export_zone(zone_name)
            out = write_zone_export(zone_name, content)
            written_files.append(out)
        except Exception as e:
            logging.exception(f"Failed to export zone {zone_name}: {e}")
    if written_files:
        commit_and_push(written_files, trigger_path)
    else:
        logging.info("No zone files were written; skipping commit.")
    return written_files

29
src/technitium.py Normal file

@@ -0,0 +1,29 @@
import logging
import requests

from config import *

session = requests.Session()


def list_zones() -> list[dict]:
    url = f"{TECHNITIUM_API_BASE.rstrip('/')}{LIST_ZONES_ENDPOINT}?token={API_TOKEN}"
    logging.debug(f"Listing zones from {url}")
    r = session.get(url, timeout=15)
    r.raise_for_status()
    try:
        response = r.json()
    except ValueError:
        logging.error(f"List zones endpoint did not return JSON; got: {r.text}")
        raise
    try:
        return response['response']['zones']
    except KeyError:
        logging.error(f"Response did not include zones; got {response}")
        raise


def export_zone(zone_name) -> str:
    url = f"{TECHNITIUM_API_BASE.rstrip('/')}{EXPORT_ZONE_ENDPOINT}?token={API_TOKEN}&zone={zone_name}"
    r = session.get(url, timeout=30)
    r.raise_for_status()
    return r.text

53
src/technitium_zone_exporter.py Normal file

@@ -0,0 +1,53 @@
#!/usr/bin/env python3
import time
import logging
import sys
from watchdog.observers import Observer
from pathlib import Path

from config import *
from helpers import export_all_zones
from DebouncedHandler import DebouncedHandler

logging.basicConfig(
    level=getattr(logging, LOG_LEVEL, logging.INFO),
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)


def main():
    # Sanity checks
    if not Path(WATCH_DIR).exists():
        logging.error(f"Watch directory does not exist: {WATCH_DIR}")
        sys.exit(1)
    if not Path(GIT_REPO_DIR).exists():
        logging.error(f"Git repo directory does not exist: {GIT_REPO_DIR}")
        sys.exit(1)

    logging.info(f"Watching {WATCH_DIR} for changes; exports will be written to {GIT_REPO_DIR}")
    event_handler = DebouncedHandler(ignore_directories=False)
    observer = Observer()
    observer.schedule(event_handler, WATCH_DIR, recursive=True)
    observer.start()

    # Initial export on startup
    try:
        export_all_zones(trigger_path="startup")
    except Exception as e:
        logging.exception(f"Initial export failed: {e}")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("Stopping watcher...")
    finally:
        observer.stop()
        observer.join()


if __name__ == "__main__":
    main()