diff --git a/README.md b/README.md index 4f8e892..bda3b22 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # netflix-asn -A small **Python utility** that fetches IPv4/IPv6 prefixes announced by one or more ASNs (via the [BGPView API](https://bgpview.io/api)) and ensures those prefixes are present in a MikroTik **IP firewall address-list**. +A small **Python utility** that fetches IPv4/IPv6 prefixes announced by one or more ASNs (via the [RIPEstat API](https://stat.ripe.net/)) and ensures those prefixes are present in a MikroTik **IP firewall address-list**. It’s designed to run inside **Docker** — using a `Dockerfile` and `docker-compose.yml`. @@ -68,9 +68,11 @@ ADDRESS_LIST_NAME=Netflix 1. The script loads configuration from environment variables. 2. For each ASN, it queries: + + ```text + https://stat.ripe.net//data/announced-prefixes/data.json?resource=" ``` - https://api.bgpview.io/asn//prefixes - ``` + 3. It collects all IPv4/IPv6 prefixes and removes duplicates. 4. Connects to the MikroTik API using [`librouteros`](https://pypi.org/project/librouteros/). 5. For each prefix: diff --git a/src/netflix-asn.py b/src/netflix-asn.py index 2fe8a81..2ac9a82 100644 --- a/src/netflix-asn.py +++ b/src/netflix-asn.py @@ -1,10 +1,10 @@ -import os -import sys -import time import logging -import requests -from typing import List, Set -from librouteros import connect +from sys import exit +from os import getenv +from typing import Dict, List, Set +from requests import get, RequestException +from ipaddress import ip_network, IPv4Network, IPv6Network +from librouteros import connect, Api from librouteros.query import Key from librouteros.exceptions import TrapError @@ -12,69 +12,87 @@ from librouteros.exceptions import TrapError DEFAULT_ASN = "AS2906" # Netflix ASN TIMEOUT = "24:00:00" -ASN = os.getenv("ASN", DEFAULT_ASN).split(',') -MIKROTIK_HOST = os.getenv("MIKROTIK_HOST") -USERNAME = os.getenv("USERNAME") -PASSWORD = os.getenv("PASSWORD") -ADDRESS_LIST_NAME = os.getenv("ADDRESS_LIST_NAME", "Netflix") +APP_NAME = getenv("APP_NAME", "netflix-asn") +ASN = getenv("ASN", DEFAULT_ASN).split(',') +MIKROTIK_HOST = getenv("MIKROTIK_HOST") +USERNAME = getenv("USERNAME") +PASSWORD = getenv("PASSWORD") +ADDRESS_LIST_NAME = getenv("ADDRESS_LIST_NAME", "Netflix") -def get_subnets_from_asn(asn: str) -> List[str]: +def get_subnets_from_asn(asn: str) -> Dict[str, Set[IPv4Network] | Set[IPv6Network]]: """ - Queries BGPView API to get all IPv4 prefixes announced by an ASN. + Queries RIPEstat API to get all IPv4/IPv6 prefixes announced by an ASN. """ + + prefixes: Dict[str, Set[IPv4Network] | Set[IPv6Network]] = {"ipv4": set(), "ipv6": set()} + logging.info(f"Fetching prefixes for ASN {asn}...") - url = f"https://api.bgpview.io/asn/{asn}/prefixes" + url = f"https://stat.ripe.net//data/announced-prefixes/data.json?resource={asn}&sourceapp={APP_NAME}" try: - response = requests.get(url) + response = get(url) response.raise_for_status() - except requests.RequestException as e: + except RequestException as e: raise RuntimeError(f"Failed to fetch ASN data: {e}") from e data = response.json() - prefixes = [item['prefix'] for item in data['data']['ipv4_prefixes']] - logging.info(f"Found {len(prefixes)} IPv4 prefixes for ASN {asn}.") + for item in data['data']['prefixes']: + prefix = ip_network(item['prefix'], strict=False) + if prefix.version == 4: + prefixes["ipv4"].add(prefix) + elif prefix.version == 6: + prefixes["ipv6"].add(prefix) + + logging.info(f"Found {len(prefixes['ipv4'])} IPv4 prefixes and {len(prefixes['ipv6'])} IPv6 prefixes for ASN {asn}.") return prefixes -def get_subnets_from_asns(asns: List[str]) -> List[str]: +def get_subnets_from_asns(asns: List[str]) -> Dict[str, Set[IPv4Network] | Set[IPv6Network]]: """ Fetch prefixes from multiple ASNs. """ - subnets: Set[str] = set() + subnets: Dict[str, Set[IPv4Network] | Set[IPv6Network]] = {"ipv4": set(), "ipv6": set()} for asn in asns: - subnets.update(get_subnets_from_asn(asn)) - time.sleep(2) # Respect API rate limits - return list(subnets) + asn_subnets = get_subnets_from_asn(asn) + for family, prefixes in asn_subnets.items(): + subnets[family].update(prefixes) + return subnets -def subnet_exists(address_list, subnet: str, list_name: str) -> bool: +def subnet_exists(mikrotik_api: Api, subnet: IPv4Network | IPv6Network, list_name: str) -> bool: """ Check if a subnet already exists in the MikroTik address list. """ - entries = address_list.select().where(Key('address') == subnet, Key('list') == list_name) + if subnet.version == 4: + entries = mikrotik_api.path('ip', 'firewall', 'address-list').select().where(Key('address') == str(subnet), Key('list') == list_name) + else: + entries = mikrotik_api.path('ipv6', 'firewall', 'address-list').select().where(Key('address') == str(subnet), Key('list') == list_name) return any(entries) -def add_subnet_address_list(address_list, subnet: str, list_name: str) -> bool: +def add_subnet_address_list(mikrotik_api: Api, subnet: IPv4Network | IPv6Network, list_name: str) -> bool: """ Add a single subnet to the MikroTik address list. """ try: - address_list.add(address=subnet, list=list_name, timeout=TIMEOUT, comment="Added from ASN") + if subnet.version == 4: + mikrotik_api.path('ip', 'firewall', 'address-list').add(address=str(subnet), list=list_name, timeout=TIMEOUT, comment="Added from ASN") + else: + mikrotik_api.path('ipv6', 'firewall', 'address-list').add(address=str(subnet), list=list_name, timeout=TIMEOUT, comment="Added from ASN") return True except TrapError as err: logging.error(f"Failed to add subnet {subnet}: {err}") return False -def add_subnets_address_list(address_list, subnets: List[str], list_name: str) -> int: +def add_subnets_address_list(mikrotik_api: Api, subnets: Dict[str, Set[IPv4Network] | Set[IPv6Network]], list_name: str) -> int: """ Add multiple subnets to the MikroTik address list. """ added = 0 - for subnet in subnets: - if subnet_exists(address_list, subnet, list_name): - logging.debug(f"[SKIP] {subnet} already exists.") - continue - if add_subnet_address_list(address_list, subnet, list_name): - logging.info(f"[ADD] {subnet}") + for _, prefixes in subnets.items(): + for subnet in prefixes: + if subnet_exists(mikrotik_api, subnet, list_name): + logging.debug(f"[SKIP] {subnet} already exists.") + continue + if add_subnet_address_list(mikrotik_api, subnet, list_name): + logging.info(f"[ADD] {subnet}") added += 1 return added @@ -87,24 +105,22 @@ def main(): if not MIKROTIK_HOST or not USERNAME or not PASSWORD: logging.fatal("Missing MikroTik connection info (check env vars)") - sys.exit(1) + exit(1) logging.info(f"Connecting to MikroTik at {MIKROTIK_HOST}...") try: api = connect(host=MIKROTIK_HOST, username=USERNAME, password=PASSWORD) except Exception as e: logging.fatal(f"Failed to connect to MikroTik: {e}", exc_info=True) - sys.exit(1) - - address_list = api.path('ip', 'firewall', 'address-list') + exit(1) try: subnets = get_subnets_from_asns(ASN) except Exception as e: logging.fatal(f"Failed to get subnets from ASN: {e}", exc_info=True) - sys.exit(1) + exit(1) - added = add_subnets_address_list(address_list, subnets, ADDRESS_LIST_NAME) + added = add_subnets_address_list(api, subnets, ADDRESS_LIST_NAME) logging.info(f"Done. Added {added} new subnets.") if __name__ == "__main__":