Use RIPEstat for ASN announced prefixes

This commit is contained in:
Ettore
2026-04-08 22:46:03 +02:00
parent 52e6b4a9a0
commit fa1e465d4f
2 changed files with 61 additions and 43 deletions

View File

@@ -1,6 +1,6 @@
# netflix-asn # netflix-asn
A small **Python utility** that fetches IPv4/IPv6 prefixes announced by one or more ASNs (via the [BGPView API](https://bgpview.io/api)) and ensures those prefixes are present in a MikroTik **IP firewall address-list**. A small **Python utility** that fetches IPv4/IPv6 prefixes announced by one or more ASNs (via the [RIPEstat API](https://stat.ripe.net/)) and ensures those prefixes are present in a MikroTik **IP firewall address-list**.
Its designed to run inside **Docker** — using a `Dockerfile` and `docker-compose.yml`. Its designed to run inside **Docker** — using a `Dockerfile` and `docker-compose.yml`.
@@ -68,9 +68,11 @@ ADDRESS_LIST_NAME=Netflix
1. The script loads configuration from environment variables. 1. The script loads configuration from environment variables.
2. For each ASN, it queries: 2. For each ASN, it queries:
```text
https://stat.ripe.net//data/announced-prefixes/data.json?resource=<ASN>"
``` ```
https://api.bgpview.io/asn/<ASN>/prefixes
```
3. It collects all IPv4/IPv6 prefixes and removes duplicates. 3. It collects all IPv4/IPv6 prefixes and removes duplicates.
4. Connects to the MikroTik API using [`librouteros`](https://pypi.org/project/librouteros/). 4. Connects to the MikroTik API using [`librouteros`](https://pypi.org/project/librouteros/).
5. For each prefix: 5. For each prefix:

View File

@@ -1,10 +1,10 @@
import os
import sys
import time
import logging import logging
import requests from sys import exit
from typing import List, Set from os import getenv
from librouteros import connect from typing import Dict, List, Set
from requests import get, RequestException
from ipaddress import ip_network, IPv4Network, IPv6Network
from librouteros import connect, Api
from librouteros.query import Key from librouteros.query import Key
from librouteros.exceptions import TrapError from librouteros.exceptions import TrapError
@@ -12,68 +12,86 @@ from librouteros.exceptions import TrapError
DEFAULT_ASN = "AS2906" # Netflix ASN DEFAULT_ASN = "AS2906" # Netflix ASN
TIMEOUT = "24:00:00" TIMEOUT = "24:00:00"
ASN = os.getenv("ASN", DEFAULT_ASN).split(',') APP_NAME = getenv("APP_NAME", "netflix-asn")
MIKROTIK_HOST = os.getenv("MIKROTIK_HOST") ASN = getenv("ASN", DEFAULT_ASN).split(',')
USERNAME = os.getenv("USERNAME") MIKROTIK_HOST = getenv("MIKROTIK_HOST")
PASSWORD = os.getenv("PASSWORD") USERNAME = getenv("USERNAME")
ADDRESS_LIST_NAME = os.getenv("ADDRESS_LIST_NAME", "Netflix") PASSWORD = getenv("PASSWORD")
ADDRESS_LIST_NAME = getenv("ADDRESS_LIST_NAME", "Netflix")
def get_subnets_from_asn(asn: str) -> List[str]: def get_subnets_from_asn(asn: str) -> Dict[str, Set[IPv4Network] | Set[IPv6Network]]:
""" """
Queries BGPView API to get all IPv4 prefixes announced by an ASN. Queries RIPEstat API to get all IPv4/IPv6 prefixes announced by an ASN.
""" """
prefixes: Dict[str, Set[IPv4Network] | Set[IPv6Network]] = {"ipv4": set(), "ipv6": set()}
logging.info(f"Fetching prefixes for ASN {asn}...") logging.info(f"Fetching prefixes for ASN {asn}...")
url = f"https://api.bgpview.io/asn/{asn}/prefixes" url = f"https://stat.ripe.net//data/announced-prefixes/data.json?resource={asn}&sourceapp={APP_NAME}"
try: try:
response = requests.get(url) response = get(url)
response.raise_for_status() response.raise_for_status()
except requests.RequestException as e: except RequestException as e:
raise RuntimeError(f"Failed to fetch ASN data: {e}") from e raise RuntimeError(f"Failed to fetch ASN data: {e}") from e
data = response.json() data = response.json()
prefixes = [item['prefix'] for item in data['data']['ipv4_prefixes']] for item in data['data']['prefixes']:
logging.info(f"Found {len(prefixes)} IPv4 prefixes for ASN {asn}.") prefix = ip_network(item['prefix'], strict=False)
if prefix.version == 4:
prefixes["ipv4"].add(prefix)
elif prefix.version == 6:
prefixes["ipv6"].add(prefix)
logging.info(f"Found {len(prefixes['ipv4'])} IPv4 prefixes and {len(prefixes['ipv6'])} IPv6 prefixes for ASN {asn}.")
return prefixes return prefixes
def get_subnets_from_asns(asns: List[str]) -> List[str]: def get_subnets_from_asns(asns: List[str]) -> Dict[str, Set[IPv4Network] | Set[IPv6Network]]:
""" """
Fetch prefixes from multiple ASNs. Fetch prefixes from multiple ASNs.
""" """
subnets: Set[str] = set() subnets: Dict[str, Set[IPv4Network] | Set[IPv6Network]] = {"ipv4": set(), "ipv6": set()}
for asn in asns: for asn in asns:
subnets.update(get_subnets_from_asn(asn)) asn_subnets = get_subnets_from_asn(asn)
time.sleep(2) # Respect API rate limits for family, prefixes in asn_subnets.items():
return list(subnets) subnets[family].update(prefixes)
return subnets
def subnet_exists(address_list, subnet: str, list_name: str) -> bool: def subnet_exists(mikrotik_api: Api, subnet: IPv4Network | IPv6Network, list_name: str) -> bool:
""" """
Check if a subnet already exists in the MikroTik address list. Check if a subnet already exists in the MikroTik address list.
""" """
entries = address_list.select().where(Key('address') == subnet, Key('list') == list_name) if subnet.version == 4:
entries = mikrotik_api.path('ip', 'firewall', 'address-list').select().where(Key('address') == str(subnet), Key('list') == list_name)
else:
entries = mikrotik_api.path('ipv6', 'firewall', 'address-list').select().where(Key('address') == str(subnet), Key('list') == list_name)
return any(entries) return any(entries)
def add_subnet_address_list(address_list, subnet: str, list_name: str) -> bool: def add_subnet_address_list(mikrotik_api: Api, subnet: IPv4Network | IPv6Network, list_name: str) -> bool:
""" """
Add a single subnet to the MikroTik address list. Add a single subnet to the MikroTik address list.
""" """
try: try:
address_list.add(address=subnet, list=list_name, timeout=TIMEOUT, comment="Added from ASN") if subnet.version == 4:
mikrotik_api.path('ip', 'firewall', 'address-list').add(address=str(subnet), list=list_name, timeout=TIMEOUT, comment="Added from ASN")
else:
mikrotik_api.path('ipv6', 'firewall', 'address-list').add(address=str(subnet), list=list_name, timeout=TIMEOUT, comment="Added from ASN")
return True return True
except TrapError as err: except TrapError as err:
logging.error(f"Failed to add subnet {subnet}: {err}") logging.error(f"Failed to add subnet {subnet}: {err}")
return False return False
def add_subnets_address_list(address_list, subnets: List[str], list_name: str) -> int: def add_subnets_address_list(mikrotik_api: Api, subnets: Dict[str, Set[IPv4Network] | Set[IPv6Network]], list_name: str) -> int:
""" """
Add multiple subnets to the MikroTik address list. Add multiple subnets to the MikroTik address list.
""" """
added = 0 added = 0
for subnet in subnets: for _, prefixes in subnets.items():
if subnet_exists(address_list, subnet, list_name): for subnet in prefixes:
if subnet_exists(mikrotik_api, subnet, list_name):
logging.debug(f"[SKIP] {subnet} already exists.") logging.debug(f"[SKIP] {subnet} already exists.")
continue continue
if add_subnet_address_list(address_list, subnet, list_name): if add_subnet_address_list(mikrotik_api, subnet, list_name):
logging.info(f"[ADD] {subnet}") logging.info(f"[ADD] {subnet}")
added += 1 added += 1
return added return added
@@ -87,24 +105,22 @@ def main():
if not MIKROTIK_HOST or not USERNAME or not PASSWORD: if not MIKROTIK_HOST or not USERNAME or not PASSWORD:
logging.fatal("Missing MikroTik connection info (check env vars)") logging.fatal("Missing MikroTik connection info (check env vars)")
sys.exit(1) exit(1)
logging.info(f"Connecting to MikroTik at {MIKROTIK_HOST}...") logging.info(f"Connecting to MikroTik at {MIKROTIK_HOST}...")
try: try:
api = connect(host=MIKROTIK_HOST, username=USERNAME, password=PASSWORD) api = connect(host=MIKROTIK_HOST, username=USERNAME, password=PASSWORD)
except Exception as e: except Exception as e:
logging.fatal(f"Failed to connect to MikroTik: {e}", exc_info=True) logging.fatal(f"Failed to connect to MikroTik: {e}", exc_info=True)
sys.exit(1) exit(1)
address_list = api.path('ip', 'firewall', 'address-list')
try: try:
subnets = get_subnets_from_asns(ASN) subnets = get_subnets_from_asns(ASN)
except Exception as e: except Exception as e:
logging.fatal(f"Failed to get subnets from ASN: {e}", exc_info=True) logging.fatal(f"Failed to get subnets from ASN: {e}", exc_info=True)
sys.exit(1) exit(1)
added = add_subnets_address_list(address_list, subnets, ADDRESS_LIST_NAME) added = add_subnets_address_list(api, subnets, ADDRESS_LIST_NAME)
logging.info(f"Done. Added {added} new subnets.") logging.info(f"Done. Added {added} new subnets.")
if __name__ == "__main__": if __name__ == "__main__":