Compare commits

...

2 Commits

16 changed files with 276 additions and 78 deletions

View File

@@ -8,5 +8,15 @@ SECRET_KEY=replace-with-a-random-64-char-hex-string
# Port configuration
APP_PORT=8000
# CORS allowed origins (comma-separated)
CORS_ORIGINS=http://localhost:8000
# Trusted proxy IPs for correct client IP extraction (comma-separated)
TRUSTED_PROXY_IPS=127.0.0.1
# Database path (default: ./data/gates.db relative to project root)
# DATABASE_URL=sqlite:///./data/gates.db
# Logging configuration
LOG_LEVEL=INFO
#LOG_FILE=logs/app.log

View File

@@ -1,6 +1,11 @@
# Use Python 3.11 slim image
FROM python:3.11-slim
# Ensure Python output is sent straight to stdout/stderr (no buffering),
# which is required for logs to appear in `docker logs`.
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1
# Set working directory
WORKDIR /app
@@ -17,8 +22,8 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
COPY data/ ./data/
# Create data directory if it doesn't exist
RUN mkdir -p data
# Create data and log directories if they don't exist
RUN mkdir -p data logs
# Expose port
EXPOSE $APP_PORT

View File

@@ -109,15 +109,46 @@ data/
## Configuration
All settings are read from environment variables.
All settings are read from environment variables (centralised in `src/core/config.py`).
### Security
| Variable | Default | Description |
|---|---|---|
| `SECRET_KEY` | Random 32 bytes | JWT signing key and Fernet encryption key. **Set this explicitly in production.** |
| `ADMIN_USERNAME` | `admin` | Username for the initial admin account |
| `ADMIN_PASSWORD` | *(none)* | Password for the initial admin account. If unset, no seed account is created. |
| `APP_PORT` | `8000` | HTTP port the server listens on |
| `DATABASE_URL` | `sqlite:///data/gates.db` | SQLAlchemy database URL |
| `SECRET_KEY` | *(required)* | JWT signing key and Fernet encryption key. The application will refuse to start if this is not set. Use a long random string (`openssl rand -hex 32`). |
### Admin seed account
| Variable | Default | Description |
|---|---|---|
| `ADMIN_USERNAME` | `admin` | Username for the initial admin account created on first run. |
| `ADMIN_PASSWORD` | *(none)* | Password for the initial admin account. If unset, no seed account is created. Minimum 12 characters. |
### Server
| Variable | Default | Description |
|---|---|---|
| `APP_PORT` | `8000` | HTTP port the server listens on. |
### Database
| Variable | Default | Description |
|---|---|---|
| `DATABASE_URL` | `sqlite:///data/gates.db` | SQLAlchemy database URL. |
### Network / reverse proxy
| Variable | Default | Description |
|---|---|---|
| `CORS_ORIGINS` | *(empty — no cross-origin requests)* | Comma-separated list of allowed CORS origins, e.g. `https://gates.example.com`. |
| `TRUSTED_PROXY_IPS` | `127.0.0.1` | Comma-separated list of reverse-proxy IPs whose `X-Forwarded-For` header is trusted for client IP resolution. |
### Logging
| Variable | Default | Description |
|---|---|---|
| `LOG_LEVEL` | `INFO` | Logging verbosity. One of `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`. |
| `LOG_FILE` | `/var/log/lagomaregates.log` | Path to the rotating log file (10 MB, 5 backups). Set to an empty string to disable file logging. |
## Running with Docker Compose
@@ -126,7 +157,7 @@ All settings are read from environment variables.
docker compose up -d
```
The default `docker-compose.yml` starts the service on port `8000` with the initial admin credentials `admin` / `changeme`. Change `ADMIN_PASSWORD` and set a strong `SECRET_KEY` before deploying.
The default `docker-compose.yml` starts the service on port `8000`. Set a strong `SECRET_KEY` and, optionally, `ADMIN_USERNAME` / `ADMIN_PASSWORD` before deploying.
The `./data` directory is mounted into the container so the SQLite database persists across restarts.
@@ -137,11 +168,11 @@ python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
export SECRET_KEY="change-me"
export SECRET_KEY="$(openssl rand -hex 32)"
export ADMIN_USERNAME="admin"
export ADMIN_PASSWORD="changeme"
export ADMIN_PASSWORD="changeme-at-least-12"
uvicorn src.main:app --reload --port 8000
uvicorn src.main:app --port 8000
```
The application is then available at:

View File

@@ -7,9 +7,14 @@ services:
- "8000:8000"
volumes:
- ./data:/app/data
- ./logs:/app/logs
environment:
- ADMIN_USERNAME=admin
- ADMIN_PASSWORD=changeme
- SECRET_KEY=supersecretkey
- APP_PORT=8000
- CORS_ORIGINS=http://localhost:8000
- TRUSTED_PROXY_IPS=127.0.0.1
- LOG_LEVEL=INFO
- LOG_FILE=/app/logs/app.log
restart: unless-stopped

View File

@@ -1,19 +1,16 @@
import base64
import hashlib
import os
import secrets
from typing import Optional
import bcrypt
from cryptography.fernet import Fernet
from jose import JWTError, jwt
from core.config import SECRET_KEY
# ── Password hashing ──────────────────────────────────────────────────────────
ALGORITHM = "HS256"
# Loaded once at import time; changing SECRET_KEY invalidates all existing tokens.
SECRET_KEY: str = os.environ.get("SECRET_KEY") or secrets.token_hex(32)
def hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()
@@ -40,7 +37,7 @@ def decode_token(token: str) -> Optional[dict]:
# ── Symmetric encryption for AVConnect passwords ─────────────────────────────
# Derive a stable 32-byte Fernet key from SECRET_KEY so only one env var is needed.
_raw = os.environ.get("SECRET_KEY") or SECRET_KEY
_raw = SECRET_KEY
_fernet_key = base64.urlsafe_b64encode(hashlib.sha256(_raw.encode()).digest())
_fernet = Fernet(_fernet_key)

View File

@@ -1 +1,57 @@
import logging
import os
from datetime import datetime, timezone
from typing import Optional
def utcnow() -> datetime:
"""Return the current UTC time as a timezone-naive datetime.
SQLite (and SQLAlchemy's default column handling) stores datetimes without
timezone info. Using this helper keeps all DB timestamps and comparisons
consistent and avoids TypeError on offset-naive vs offset-aware comparisons.
"""
return datetime.now(timezone.utc).replace(tzinfo=None)
# ── Paths ─────────────────────────────────────────────────────────────────────
_HERE = os.path.dirname(os.path.abspath(__file__)) # src/core/
_SRC_DIR = os.path.dirname(_HERE) # src/
_PROJECT_ROOT = os.path.dirname(_SRC_DIR) # project root
DATA_DIR: str = os.path.join(_PROJECT_ROOT, "data")
# ── Logging ───────────────────────────────────────────────────────────────────
# LOG_LEVEL: one of DEBUG, INFO, WARNING, ERROR, CRITICAL (default: INFO)
LOG_LEVEL: int = getattr(logging, os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO)
# LOG_FILE: set to empty string to disable file logging
LOG_FILE: str = os.environ.get("LOG_FILE", "")
# ── Security ──────────────────────────────────────────────────────────────────
SECRET_KEY: str = os.environ.get("SECRET_KEY") or ""
if not SECRET_KEY:
raise RuntimeError("SECRET_KEY environment variable must be set")
# ── Database ──────────────────────────────────────────────────────────────────
DATABASE_URL: str = os.environ.get(
"DATABASE_URL",
f"sqlite:///{os.path.join(DATA_DIR, 'gates.db')}",
)
# ── CORS ──────────────────────────────────────────────────────────────────────
# Comma-separated list of allowed origins, e.g. "https://example.com,https://app.example.com"
# Default to empty list (no cross-origin requests allowed) when not set.
_cors_env = os.environ.get("CORS_ORIGINS", "")
CORS_ORIGINS: list[str] = [o.strip() for o in _cors_env.split(",") if o.strip()]
# ── Proxy ─────────────────────────────────────────────────────────────────────
# Comma-separated list of trusted reverse-proxy IPs for X-Forwarded-For propagation.
# e.g. "127.0.0.1,10.0.0.1"
_proxy_env = os.environ.get("TRUSTED_PROXY_IPS", "127.0.0.1")
TRUSTED_PROXY_IPS: list[str] = [ip.strip() for ip in _proxy_env.split(",") if ip.strip()]
# ── Admin seed ────────────────────────────────────────────────────────────────
ADMIN_USERNAME: str = os.environ.get("ADMIN_USERNAME", "admin")
ADMIN_PASSWORD: Optional[str] = os.environ.get("ADMIN_PASSWORD") or None
# ── Server ────────────────────────────────────────────────────────────────────
APP_PORT: int = int(os.environ.get("APP_PORT", 8000))

View File

@@ -5,15 +5,7 @@ from typing import Optional
from sqlalchemy import Boolean, String, Text, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker
_HERE = os.path.dirname(os.path.abspath(__file__))
_SRC_DIR = os.path.dirname(_HERE)
_PROJECT_ROOT = os.path.dirname(_SRC_DIR)
_DATA_DIR = os.path.join(_PROJECT_ROOT, "data")
DATABASE_URL = os.environ.get(
"DATABASE_URL",
f"sqlite:///{os.path.join(_DATA_DIR, 'gates.db')}",
)
from core.config import DATA_DIR, DATABASE_URL
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@@ -90,5 +82,5 @@ def get_db():
def init_db() -> None:
os.makedirs(_DATA_DIR, exist_ok=True)
os.makedirs(DATA_DIR, exist_ok=True)
Base.metadata.create_all(bind=engine)

View File

@@ -6,6 +6,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from core.auth import decode_token
from core.config import utcnow
from core.database import Keypass, get_db
_security = HTTPBearer()
@@ -45,6 +46,6 @@ def require_keypass(
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass not found")
if kp.revoked:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has been revoked")
if kp.expires_at is not None and kp.expires_at < datetime.utcnow():
if kp.expires_at is not None and kp.expires_at < utcnow():
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has expired")
return kp

View File

@@ -1,15 +1,51 @@
import logging
import logging.handlers
import os
import sys
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
# Ensure src/ root is importable for models/services/routers
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.config import ADMIN_PASSWORD, ADMIN_USERNAME, APP_PORT, CORS_ORIGINS, LOG_FILE, LOG_LEVEL, TRUSTED_PROXY_IPS
# ── Logging ───────────────────────────────────────────────────────────────────
_log_fmt = logging.Formatter(
fmt="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
logging.basicConfig(
level=LOG_LEVEL,
format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
if LOG_FILE:
try:
_file_handler = logging.handlers.RotatingFileHandler(
LOG_FILE,
maxBytes=10 * 1024 * 1024, # 10 MB
backupCount=5,
encoding="utf-8",
)
_file_handler.setFormatter(_log_fmt)
logging.getLogger().addHandler(_file_handler)
except OSError as _e:
logging.getLogger(__name__).warning(
"Cannot open log file %r — file logging disabled: %s", LOG_FILE, _e
)
# Quieten noisy third-party loggers
logging.getLogger("uvicorn.access").setLevel(max(LOG_LEVEL, logging.INFO))
logging.getLogger("sqlalchemy.engine").setLevel(max(LOG_LEVEL, logging.WARNING))
logger = logging.getLogger(__name__)
from core.auth import hash_password
from core.database import AdminUser, SessionLocal, init_db
from routers.auth import router as auth_router
@@ -20,18 +56,41 @@ from routers.admins import router as admins_router
from routers.stats import router as stats_router
# ── App ───────────────────────────────────────────────────────────────────────
app = FastAPI(title="Lagomare Gates", docs_url=None, redoc_url=None)
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db()
_seed_admin()
yield
app = FastAPI(title="Lagomare Gates", docs_url=None, redoc_url=None, lifespan=lifespan)
_STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
# Trust X-Forwarded-For only from configured proxy IPs so request.client.host
# is already the real client address everywhere else in the code.
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=TRUSTED_PROXY_IPS)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
allow_origins=CORS_ORIGINS,
allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
# Inject security headers on every response
@app.middleware("http")
async def _security_headers(request: Request, call_next) -> Response:
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
response.headers["Content-Security-Policy"] = (
"default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data:"
)
return response
# ── Routers (Controllers) ─────────────────────────────────────────────────────
app.include_router(auth_router)
app.include_router(keypasses_router)
@@ -72,28 +131,23 @@ async def _serve_manifest() -> FileResponse:
)
# ── Startup ───────────────────────────────────────────────────────────────────
@app.on_event("startup")
async def _startup() -> None:
init_db()
_seed_admin()
def _seed_admin() -> None:
"""Create the initial admin user from env vars if it doesn't exist yet."""
username = os.environ.get("ADMIN_USERNAME", "admin")
password = os.environ.get("ADMIN_PASSWORD")
if not password:
if not ADMIN_PASSWORD:
return
db = SessionLocal()
try:
if not db.query(AdminUser).filter_by(username=username).first():
db.add(AdminUser(username=username, password_hash=hash_password(password)))
if not db.query(AdminUser).filter_by(username=ADMIN_USERNAME).first():
db.add(AdminUser(username=ADMIN_USERNAME, password_hash=hash_password(ADMIN_PASSWORD)))
db.commit()
finally:
db.close()
if __name__ == "__main__":
port = int(os.environ.get("APP_PORT", 8000))
uvicorn.run("main:app", host="0.0.0.0", port=port, reload=True)
uvicorn.run(
"main:app",
host="0.0.0.0",
port=APP_PORT,
reload=False,
log_level=logging.getLevelName(LOG_LEVEL).lower(),
)

View File

@@ -29,6 +29,8 @@ async def create_admin(
raise HTTPException(422, "Username cannot be empty")
if req.role not in ("admin", "manager"):
raise HTTPException(422, "role must be 'admin' or 'manager'")
if len(req.password) < 12:
raise HTTPException(422, "Password must be at least 12 characters")
if db.query(AdminUser).filter_by(username=username).first():
raise HTTPException(409, "Username already exists")
user = AdminUser(username=username, password_hash=hash_password(req.password), role=req.role)
@@ -64,6 +66,8 @@ async def change_password(
):
if not req.new_password:
raise HTTPException(422, "Password cannot be empty")
if len(req.new_password) < 12:
raise HTTPException(422, "Password must be at least 12 characters")
user: Optional[AdminUser] = db.query(AdminUser).filter_by(username=username).first()
if not user:
raise HTTPException(404, "Admin not found")

View File

@@ -1,26 +1,30 @@
from datetime import datetime, timedelta
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from core.auth import create_token, verify_password
from core.config import utcnow
from core.database import AdminUser, Keypass, get_db
from core.schemas import AdminLoginRequest, KeypassLoginRequest, TokenResponse
router = APIRouter(prefix="/api/auth", tags=["auth"])
logger = logging.getLogger(__name__)
@router.post("/admin", response_model=TokenResponse)
async def admin_login(req: AdminLoginRequest, db: Session = Depends(get_db)):
user: Optional[AdminUser] = db.query(AdminUser).filter_by(username=req.username).first()
if not user or not verify_password(req.password, user.password_hash):
logger.warning("Failed admin login attempt for username=%r", req.username)
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid credentials")
logger.info("Admin login: username=%r role=%r", user.username, user.role)
token = create_token({
"sub": user.username,
"role": "admin",
"scope": user.role, # 'admin' | 'manager'
"exp": datetime.utcnow() + timedelta(hours=24),
"exp": datetime.now(timezone.utc) + timedelta(hours=24),
})
return TokenResponse(token=token)
@@ -34,9 +38,9 @@ async def keypass_login(req: KeypassLoginRequest, db: Session = Depends(get_db))
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid keypass")
if kp.revoked:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has been revoked")
if kp.expires_at is not None and kp.expires_at < datetime.utcnow():
if kp.expires_at is not None and kp.expires_at < utcnow():
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has expired")
exp = kp.expires_at if kp.expires_at else datetime(2099, 12, 31, 23, 59, 59)
exp = kp.expires_at if kp.expires_at else datetime(2099, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
token = create_token({
"sub": str(kp.id),
"role": "keypass",

View File

@@ -1,6 +1,6 @@
from typing import Optional
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
@@ -8,6 +8,7 @@ from core.auth import encrypt_secret
from core.database import ApiCredential, get_db
from core.dependencies import require_admin
from core.schemas import CredentialRead, CredentialUpsert
from services.avconnect import validate_credentials
router = APIRouter(prefix="/api/admin/credentials", tags=["admin-credentials"])
@@ -25,15 +26,23 @@ async def upsert_credential(
db: Session = Depends(get_db),
_: dict = Depends(require_admin),
):
try:
ok, session_id = validate_credentials(req.username, req.password)
except Exception as exc:
raise HTTPException(502, f"Could not reach AVConnect: {exc}")
if not ok:
raise HTTPException(422, "AVConnect rejected these credentials")
cred: Optional[ApiCredential] = db.query(ApiCredential).first()
if cred:
cred.username = req.username
cred.password_enc = encrypt_secret(req.password)
cred.session_id = None # invalidate any cached session
cred.session_id = session_id # reuse the session obtained during validation
else:
cred = ApiCredential(
username=req.username,
password_enc=encrypt_secret(req.password),
session_id=session_id,
)
db.add(cred)
db.commit()

View File

@@ -1,17 +1,19 @@
import json
from datetime import datetime
import logging
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.orm import Session
from core.auth import decrypt_secret
from core.config import utcnow
from core.database import ApiCredential, GateAccessLog, GateDB, Keypass, get_db
from core.dependencies import require_admin, require_manager, require_keypass
from core.schemas import GateCreate, GatePublicResponse, GateResponse
from services.gates import call_open_gate
router = APIRouter(tags=["gates"])
logger = logging.getLogger(__name__)
# ── Admin: gate CRUD ──────────────────────────────────────────────────────────
@@ -85,7 +87,7 @@ async def admin_open_gate(
if not cred_db:
raise HTTPException(503, "AVConnect credentials not configured")
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else None)
ip = request.client.host if request.client else None
ua = request.headers.get("User-Agent")
mock = bool(cred_db.mock_avconnect)
@@ -98,7 +100,7 @@ async def admin_open_gate(
)
db.add(GateAccessLog(
timestamp=datetime.utcnow(),
timestamp=utcnow(),
keypass_id=0,
keypass_code=f"[{caller['sub']}]",
gate_id=gate_db.id,
@@ -114,8 +116,10 @@ async def admin_open_gate(
db.commit()
if not success:
logger.error("Gate open failed: gate_id=%d caller=%r error=%r", gate_db.id, caller["sub"], error_msg)
raise HTTPException(502, error_msg or "Gate operation failed")
logger.info("Gate opened by admin: gate_id=%d gate=%r caller=%r ip=%r", gate_db.id, gate_db.name, caller["sub"], ip)
return {"success": True, "gate": gate_db.name}
@@ -149,7 +153,7 @@ async def open_gate(
if not cred_db:
raise HTTPException(503, "AVConnect credentials not configured")
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else None)
ip = request.client.host if request.client else None
ua = request.headers.get("User-Agent")
allowed = json.loads(_kp.allowed_gates) if _kp.allowed_gates else None
@@ -166,7 +170,7 @@ async def open_gate(
)
db.add(GateAccessLog(
timestamp=datetime.utcnow(),
timestamp=utcnow(),
keypass_id=_kp.id,
keypass_code=_kp.code,
gate_id=gate_db.id,
@@ -177,11 +181,9 @@ async def open_gate(
error=error_msg,
))
if new_sid and new_sid != cred_db.session_id:
cred_db.session_id = new_sid
db.commit()
if not success:
logger.error("Gate open failed: gate_id=%d keypass_id=%d error=%r", gate_db.id, _kp.id, error_msg)
raise HTTPException(502, error_msg or "Gate operation failed")
logger.info("Gate opened by keypass: gate_id=%d gate=%r keypass_id=%d ip=%r", gate_db.id, gate_db.name, _kp.id, ip)
return {"success": True, "gate": gate_db.name}

View File

@@ -1,12 +1,13 @@
import json
import secrets
import string
from datetime import datetime
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from core.config import utcnow
from core.database import Keypass, get_db
from core.dependencies import require_manager
from core.schemas import KeypassCreate, KeypassPatch, KeypassResponse, keypass_to_response
@@ -38,7 +39,7 @@ async def create_keypass(
kp = Keypass(
code=code,
description=req.description,
created_at=datetime.utcnow(),
created_at=utcnow(),
expires_at=req.expires_at,
revoked=False,
allowed_gates=json.dumps(req.gate_ids) if req.gate_ids else None,
@@ -80,10 +81,10 @@ async def revoke_keypass(
kp: Optional[Keypass] = db.query(Keypass).filter(Keypass.id == kp_id).first()
if not kp:
raise HTTPException(404, "Keypass not found")
if kp.expires_at is not None and kp.expires_at < datetime.utcnow():
if kp.expires_at is not None and kp.expires_at < utcnow():
raise HTTPException(409, "Expired keypasses cannot be revoked")
if kp.revoked:
raise HTTPException(409, "Keypass is already revoked")
kp.revoked = True
kp.revoked_at = datetime.utcnow()
kp.revoked_at = utcnow()
db.commit()

View File

@@ -1,6 +1,12 @@
import logging
import urllib.parse
import requests
from fake_useragent import UserAgent
logger = logging.getLogger(__name__)
class AVConnectAPI:
_BASE_URL = "https://www.avconnect.it"
@@ -15,20 +21,29 @@ class AVConnectAPI:
self._session.cookies.set("PHPSESSID", session_id)
self._authenticated = True
_LOGIN_SUCCESS_PATH = "/entraconf.php"
_LOGIN_DENIED_PATH = "/accessdenied.htm"
def _authenticate(self) -> bool:
login_url = f"{self._BASE_URL}/loginone.php"
headers = {
"User-Agent": self._ua,
"Content-Type": "application/x-www-form-urlencoded"
}
payload = f"userid={self._username}&password={self._password}&entra=Login"
response = self._session.post(login_url, data=payload, headers=headers)
if response.ok and "PHPSESSID" in self._session.cookies:
payload = urllib.parse.urlencode({"userid": self._username, "password": self._password, "entra": "Login"})
# allow_redirects=False so we can inspect the Location header directly.
response = self._session.post(login_url, data=payload, headers=headers, allow_redirects=False)
location = response.headers.get("Location", "")
if response.status_code == 302 and self._LOGIN_SUCCESS_PATH in location:
self._authenticated = True
print("Authenticated")
logger.debug("AVConnect authentication successful")
return True
if self._LOGIN_DENIED_PATH in location:
logger.warning("AVConnect authentication denied (invalid credentials)")
else:
logger.warning("AVConnect authentication failed: status=%d location=%r", response.status_code, location)
return False
def _check_sessionid(self) -> bool:
if not self._authenticated or not self._session.cookies.get("PHPSESSID"):
return False
@@ -37,7 +52,7 @@ class AVConnectAPI:
"User-Agent": self._ua,
}
response = self._session.get(exec_url, headers=headers)
print(response.ok)
logger.debug("AVConnect session check: %s", response.ok)
return response.ok
def exec_gate_macro(self, id_macro) -> bool:
@@ -48,6 +63,19 @@ class AVConnectAPI:
"User-Agent": self._ua,
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
}
payload = f"idmacrocom={id_macro}&nome=16"
payload = urllib.parse.urlencode({"idmacrocom": id_macro, "nome": "16"})
response = self._session.post(exec_url, data=payload, headers=headers)
return response.ok
return response.ok
def validate_credentials(username: str, password: str) -> tuple[bool, str | None]:
"""Attempt a login and return (ok, session_id_or_None).
Returns (False, None) if the credentials are rejected.
Raises on unexpected network errors.
"""
api = AVConnectAPI(username, password)
if not api._authenticate():
return False, None
session_id = api._session.cookies.get("PHPSESSID") or None
return True, session_id

View File

@@ -13,7 +13,6 @@ def call_open_gate(
"""Attempt to open a gate. Returns (success, error_msg, new_session_id)."""
if mock:
return True, None, None
return True, None, None
try:
api = AVConnectAPI(username, password, session_id)
ok = api.exec_gate_macro(macro_id)