import os import sys import time import logging import requests from typing import List, Set from librouteros import connect from librouteros.query import Key from librouteros.exceptions import TrapError # Constants and config DEFAULT_ASN = "AS2906" # Netflix ASN TIMEOUT = "24:00:00" ASN = os.getenv("ASN", DEFAULT_ASN).split(',') MIKROTIK_HOST = os.getenv("MIKROTIK_HOST") USERNAME = os.getenv("USERNAME") PASSWORD = os.getenv("PASSWORD") ADDRESS_LIST_NAME = os.getenv("ADDRESS_LIST_NAME", "Netflix") def get_subnets_from_asn(asn: str) -> List[str]: """ Queries BGPView API to get all IPv4 prefixes announced by an ASN. """ logging.info(f"Fetching prefixes for ASN {asn}...") url = f"https://api.bgpview.io/asn/{asn}/prefixes" try: response = requests.get(url) response.raise_for_status() except requests.RequestException as e: raise RuntimeError(f"Failed to fetch ASN data: {e}") from e data = response.json() prefixes = [item['prefix'] for item in data['data']['ipv4_prefixes']] logging.info(f"Found {len(prefixes)} IPv4 prefixes for ASN {asn}.") return prefixes def get_subnets_from_asns(asns: List[str]) -> List[str]: """ Fetch prefixes from multiple ASNs. """ subnets: Set[str] = set() for asn in asns: subnets.update(get_subnets_from_asn(asn)) time.sleep(2) # Respect API rate limits return list(subnets) def subnet_exists(address_list, subnet: str, list_name: str) -> bool: """ Check if a subnet already exists in the MikroTik address list. """ entries = address_list.select().where(Key('address') == subnet, Key('list') == list_name) return any(entries) def add_subnet_address_list(address_list, subnet: str, list_name: str) -> bool: """ Add a single subnet to the MikroTik address list. """ try: address_list.add(address=subnet, list=list_name, timeout=TIMEOUT, comment="Added from ASN") return True except TrapError as err: logging.error(f"Failed to add subnet {subnet}: {err}") return False def add_subnets_address_list(address_list, subnets: List[str], list_name: str) -> int: """ Add multiple subnets to the MikroTik address list. """ added = 0 for subnet in subnets: if subnet_exists(address_list, subnet, list_name): logging.debug(f"[SKIP] {subnet} already exists.") continue if add_subnet_address_list(address_list, subnet, list_name): logging.info(f"[ADD] {subnet}") added += 1 return added def main(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M" ) if not MIKROTIK_HOST or not USERNAME or not PASSWORD: logging.fatal("Missing MikroTik connection info (check env vars)") sys.exit(1) logging.info(f"Connecting to MikroTik at {MIKROTIK_HOST}...") try: api = connect(host=MIKROTIK_HOST, username=USERNAME, password=PASSWORD) except Exception as e: logging.fatal(f"Failed to connect to MikroTik: {e}", exc_info=True) sys.exit(1) address_list = api.path('ip', 'firewall', 'address-list') try: subnets = get_subnets_from_asns(ASN) except Exception as e: logging.fatal(f"Failed to get subnets from ASN: {e}", exc_info=True) sys.exit(1) added = add_subnets_address_list(address_list, subnets, ADDRESS_LIST_NAME) logging.info(f"Done. Added {added} new subnets.") if __name__ == "__main__": main()