Fix security vulnerabilities. Add logging

This commit is contained in:
Ettore
2026-05-09 17:52:59 +02:00
parent d803e2d7f6
commit 69e4f594de
14 changed files with 226 additions and 72 deletions

View File

@@ -1,19 +1,16 @@
import base64
import hashlib
import os
import secrets
from typing import Optional
import bcrypt
from cryptography.fernet import Fernet
from jose import JWTError, jwt
from core.config import SECRET_KEY
# ── Password hashing ──────────────────────────────────────────────────────────
ALGORITHM = "HS256"
# Loaded once at import time; changing SECRET_KEY invalidates all existing tokens.
SECRET_KEY: str = os.environ.get("SECRET_KEY") or secrets.token_hex(32)
def hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()
@@ -40,7 +37,7 @@ def decode_token(token: str) -> Optional[dict]:
# ── Symmetric encryption for AVConnect passwords ─────────────────────────────
# Derive a stable 32-byte Fernet key from SECRET_KEY so only one env var is needed.
_raw = os.environ.get("SECRET_KEY") or SECRET_KEY
_raw = SECRET_KEY
_fernet_key = base64.urlsafe_b64encode(hashlib.sha256(_raw.encode()).digest())
_fernet = Fernet(_fernet_key)

View File

@@ -1 +1,45 @@
import logging
import os
from typing import Optional
# ── Paths ─────────────────────────────────────────────────────────────────────
_HERE = os.path.dirname(os.path.abspath(__file__)) # src/core/
_SRC_DIR = os.path.dirname(_HERE) # src/
_PROJECT_ROOT = os.path.dirname(_SRC_DIR) # project root
DATA_DIR: str = os.path.join(_PROJECT_ROOT, "data")
# ── Logging ───────────────────────────────────────────────────────────────────
# LOG_LEVEL: one of DEBUG, INFO, WARNING, ERROR, CRITICAL (default: INFO)
LOG_LEVEL: int = getattr(logging, os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO)
# LOG_FILE: set to empty string to disable file logging
LOG_FILE: str = os.environ.get("LOG_FILE", "")
# ── Security ──────────────────────────────────────────────────────────────────
SECRET_KEY: str = os.environ.get("SECRET_KEY") or ""
if not SECRET_KEY:
raise RuntimeError("SECRET_KEY environment variable must be set")
# ── Database ──────────────────────────────────────────────────────────────────
DATABASE_URL: str = os.environ.get(
"DATABASE_URL",
f"sqlite:///{os.path.join(DATA_DIR, 'gates.db')}",
)
# ── CORS ──────────────────────────────────────────────────────────────────────
# Comma-separated list of allowed origins, e.g. "https://example.com,https://app.example.com"
# Default to empty list (no cross-origin requests allowed) when not set.
_cors_env = os.environ.get("CORS_ORIGINS", "")
CORS_ORIGINS: list[str] = [o.strip() for o in _cors_env.split(",") if o.strip()]
# ── Proxy ─────────────────────────────────────────────────────────────────────
# Comma-separated list of trusted reverse-proxy IPs for X-Forwarded-For propagation.
# e.g. "127.0.0.1,10.0.0.1"
_proxy_env = os.environ.get("TRUSTED_PROXY_IPS", "127.0.0.1")
TRUSTED_PROXY_IPS: list[str] = [ip.strip() for ip in _proxy_env.split(",") if ip.strip()]
# ── Admin seed ────────────────────────────────────────────────────────────────
ADMIN_USERNAME: str = os.environ.get("ADMIN_USERNAME", "admin")
ADMIN_PASSWORD: Optional[str] = os.environ.get("ADMIN_PASSWORD") or None
# ── Server ────────────────────────────────────────────────────────────────────
APP_PORT: int = int(os.environ.get("APP_PORT", 8000))

View File

@@ -5,15 +5,7 @@ from typing import Optional
from sqlalchemy import Boolean, String, Text, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker
_HERE = os.path.dirname(os.path.abspath(__file__))
_SRC_DIR = os.path.dirname(_HERE)
_PROJECT_ROOT = os.path.dirname(_SRC_DIR)
_DATA_DIR = os.path.join(_PROJECT_ROOT, "data")
DATABASE_URL = os.environ.get(
"DATABASE_URL",
f"sqlite:///{os.path.join(_DATA_DIR, 'gates.db')}",
)
from core.config import DATA_DIR, DATABASE_URL
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@@ -90,5 +82,5 @@ def get_db():
def init_db() -> None:
os.makedirs(_DATA_DIR, exist_ok=True)
os.makedirs(DATA_DIR, exist_ok=True)
Base.metadata.create_all(bind=engine)

View File

@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timezone
from typing import Optional
from fastapi import Depends, HTTPException, status
@@ -45,6 +45,6 @@ def require_keypass(
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass not found")
if kp.revoked:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has been revoked")
if kp.expires_at is not None and kp.expires_at < datetime.utcnow():
if kp.expires_at is not None and kp.expires_at < datetime.now(timezone.utc):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has expired")
return kp

View File

@@ -1,15 +1,51 @@
import logging
import logging.handlers
import os
import sys
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
# Ensure src/ root is importable for models/services/routers
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.config import ADMIN_PASSWORD, ADMIN_USERNAME, APP_PORT, CORS_ORIGINS, LOG_FILE, LOG_LEVEL, TRUSTED_PROXY_IPS
# ── Logging ───────────────────────────────────────────────────────────────────
_log_fmt = logging.Formatter(
fmt="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
logging.basicConfig(
level=LOG_LEVEL,
format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
if LOG_FILE:
try:
_file_handler = logging.handlers.RotatingFileHandler(
LOG_FILE,
maxBytes=10 * 1024 * 1024, # 10 MB
backupCount=5,
encoding="utf-8",
)
_file_handler.setFormatter(_log_fmt)
logging.getLogger().addHandler(_file_handler)
except OSError as _e:
logging.getLogger(__name__).warning(
"Cannot open log file %r — file logging disabled: %s", LOG_FILE, _e
)
# Quieten noisy third-party loggers
logging.getLogger("uvicorn.access").setLevel(max(LOG_LEVEL, logging.INFO))
logging.getLogger("sqlalchemy.engine").setLevel(max(LOG_LEVEL, logging.WARNING))
logger = logging.getLogger(__name__)
from core.auth import hash_password
from core.database import AdminUser, SessionLocal, init_db
from routers.auth import router as auth_router
@@ -20,18 +56,41 @@ from routers.admins import router as admins_router
from routers.stats import router as stats_router
# ── App ───────────────────────────────────────────────────────────────────────
app = FastAPI(title="Lagomare Gates", docs_url=None, redoc_url=None)
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db()
_seed_admin()
yield
app = FastAPI(title="Lagomare Gates", docs_url=None, redoc_url=None, lifespan=lifespan)
_STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
# Trust X-Forwarded-For only from configured proxy IPs so request.client.host
# is already the real client address everywhere else in the code.
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=TRUSTED_PROXY_IPS)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
allow_origins=CORS_ORIGINS,
allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
# Inject security headers on every response
@app.middleware("http")
async def _security_headers(request: Request, call_next) -> Response:
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
response.headers["Content-Security-Policy"] = (
"default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data:"
)
return response
# ── Routers (Controllers) ─────────────────────────────────────────────────────
app.include_router(auth_router)
app.include_router(keypasses_router)
@@ -72,28 +131,23 @@ async def _serve_manifest() -> FileResponse:
)
# ── Startup ───────────────────────────────────────────────────────────────────
@app.on_event("startup")
async def _startup() -> None:
init_db()
_seed_admin()
def _seed_admin() -> None:
"""Create the initial admin user from env vars if it doesn't exist yet."""
username = os.environ.get("ADMIN_USERNAME", "admin")
password = os.environ.get("ADMIN_PASSWORD")
if not password:
if not ADMIN_PASSWORD:
return
db = SessionLocal()
try:
if not db.query(AdminUser).filter_by(username=username).first():
db.add(AdminUser(username=username, password_hash=hash_password(password)))
if not db.query(AdminUser).filter_by(username=ADMIN_USERNAME).first():
db.add(AdminUser(username=ADMIN_USERNAME, password_hash=hash_password(ADMIN_PASSWORD)))
db.commit()
finally:
db.close()
if __name__ == "__main__":
port = int(os.environ.get("APP_PORT", 8000))
uvicorn.run("main:app", host="0.0.0.0", port=port, reload=True)
uvicorn.run(
"main:app",
host="0.0.0.0",
port=APP_PORT,
reload=False,
log_level=logging.getLevelName(LOG_LEVEL).lower(),
)

View File

@@ -29,6 +29,8 @@ async def create_admin(
raise HTTPException(422, "Username cannot be empty")
if req.role not in ("admin", "manager"):
raise HTTPException(422, "role must be 'admin' or 'manager'")
if len(req.password) < 12:
raise HTTPException(422, "Password must be at least 12 characters")
if db.query(AdminUser).filter_by(username=username).first():
raise HTTPException(409, "Username already exists")
user = AdminUser(username=username, password_hash=hash_password(req.password), role=req.role)
@@ -64,6 +66,8 @@ async def change_password(
):
if not req.new_password:
raise HTTPException(422, "Password cannot be empty")
if len(req.new_password) < 12:
raise HTTPException(422, "Password must be at least 12 characters")
user: Optional[AdminUser] = db.query(AdminUser).filter_by(username=username).first()
if not user:
raise HTTPException(404, "Admin not found")

View File

@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
@@ -9,18 +10,21 @@ from core.database import AdminUser, Keypass, get_db
from core.schemas import AdminLoginRequest, KeypassLoginRequest, TokenResponse
router = APIRouter(prefix="/api/auth", tags=["auth"])
logger = logging.getLogger(__name__)
@router.post("/admin", response_model=TokenResponse)
async def admin_login(req: AdminLoginRequest, db: Session = Depends(get_db)):
user: Optional[AdminUser] = db.query(AdminUser).filter_by(username=req.username).first()
if not user or not verify_password(req.password, user.password_hash):
logger.warning("Failed admin login attempt for username=%r", req.username)
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid credentials")
logger.info("Admin login: username=%r role=%r", user.username, user.role)
token = create_token({
"sub": user.username,
"role": "admin",
"scope": user.role, # 'admin' | 'manager'
"exp": datetime.utcnow() + timedelta(hours=24),
"exp": datetime.now(timezone.utc) + timedelta(hours=24),
})
return TokenResponse(token=token)
@@ -34,9 +38,9 @@ async def keypass_login(req: KeypassLoginRequest, db: Session = Depends(get_db))
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid keypass")
if kp.revoked:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has been revoked")
if kp.expires_at is not None and kp.expires_at < datetime.utcnow():
if kp.expires_at is not None and kp.expires_at < datetime.now(timezone.utc):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has expired")
exp = kp.expires_at if kp.expires_at else datetime(2099, 12, 31, 23, 59, 59)
exp = kp.expires_at if kp.expires_at else datetime(2099, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
token = create_token({
"sub": str(kp.id),
"role": "keypass",

View File

@@ -1,5 +1,6 @@
import json
from datetime import datetime
import logging
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request
@@ -12,6 +13,7 @@ from core.schemas import GateCreate, GatePublicResponse, GateResponse
from services.gates import call_open_gate
router = APIRouter(tags=["gates"])
logger = logging.getLogger(__name__)
# ── Admin: gate CRUD ──────────────────────────────────────────────────────────
@@ -85,7 +87,7 @@ async def admin_open_gate(
if not cred_db:
raise HTTPException(503, "AVConnect credentials not configured")
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else None)
ip = request.client.host if request.client else None
ua = request.headers.get("User-Agent")
mock = bool(cred_db.mock_avconnect)
@@ -98,7 +100,7 @@ async def admin_open_gate(
)
db.add(GateAccessLog(
timestamp=datetime.utcnow(),
timestamp=datetime.now(timezone.utc),
keypass_id=0,
keypass_code=f"[{caller['sub']}]",
gate_id=gate_db.id,
@@ -114,8 +116,10 @@ async def admin_open_gate(
db.commit()
if not success:
logger.error("Gate open failed: gate_id=%d caller=%r error=%r", gate_db.id, caller["sub"], error_msg)
raise HTTPException(502, error_msg or "Gate operation failed")
logger.info("Gate opened by admin: gate_id=%d gate=%r caller=%r ip=%r", gate_db.id, gate_db.name, caller["sub"], ip)
return {"success": True, "gate": gate_db.name}
@@ -149,7 +153,7 @@ async def open_gate(
if not cred_db:
raise HTTPException(503, "AVConnect credentials not configured")
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else None)
ip = request.client.host if request.client else None
ua = request.headers.get("User-Agent")
allowed = json.loads(_kp.allowed_gates) if _kp.allowed_gates else None
@@ -166,7 +170,7 @@ async def open_gate(
)
db.add(GateAccessLog(
timestamp=datetime.utcnow(),
timestamp=datetime.now(timezone.utc),
keypass_id=_kp.id,
keypass_code=_kp.code,
gate_id=gate_db.id,
@@ -177,11 +181,9 @@ async def open_gate(
error=error_msg,
))
if new_sid and new_sid != cred_db.session_id:
cred_db.session_id = new_sid
db.commit()
if not success:
logger.error("Gate open failed: gate_id=%d keypass_id=%d error=%r", gate_db.id, _kp.id, error_msg)
raise HTTPException(502, error_msg or "Gate operation failed")
logger.info("Gate opened by keypass: gate_id=%d gate=%r keypass_id=%d ip=%r", gate_db.id, gate_db.name, _kp.id, ip)
return {"success": True, "gate": gate_db.name}

View File

@@ -1,7 +1,7 @@
import json
import secrets
import string
from datetime import datetime
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
@@ -38,7 +38,7 @@ async def create_keypass(
kp = Keypass(
code=code,
description=req.description,
created_at=datetime.utcnow(),
created_at=datetime.now(timezone.utc),
expires_at=req.expires_at,
revoked=False,
allowed_gates=json.dumps(req.gate_ids) if req.gate_ids else None,
@@ -80,10 +80,10 @@ async def revoke_keypass(
kp: Optional[Keypass] = db.query(Keypass).filter(Keypass.id == kp_id).first()
if not kp:
raise HTTPException(404, "Keypass not found")
if kp.expires_at is not None and kp.expires_at < datetime.utcnow():
if kp.expires_at is not None and kp.expires_at < datetime.now(timezone.utc):
raise HTTPException(409, "Expired keypasses cannot be revoked")
if kp.revoked:
raise HTTPException(409, "Keypass is already revoked")
kp.revoked = True
kp.revoked_at = datetime.utcnow()
kp.revoked_at = datetime.now(timezone.utc)
db.commit()

View File

@@ -1,6 +1,12 @@
import logging
import urllib.parse
import requests
from fake_useragent import UserAgent
logger = logging.getLogger(__name__)
class AVConnectAPI:
_BASE_URL = "https://www.avconnect.it"
@@ -21,14 +27,14 @@ class AVConnectAPI:
"User-Agent": self._ua,
"Content-Type": "application/x-www-form-urlencoded"
}
payload = f"userid={self._username}&password={self._password}&entra=Login"
payload = urllib.parse.urlencode({"userid": self._username, "password": self._password, "entra": "Login"})
response = self._session.post(login_url, data=payload, headers=headers)
if response.ok and "PHPSESSID" in self._session.cookies:
self._authenticated = True
print("Authenticated")
logger.debug("AVConnect authentication successful")
return True
return False
def _check_sessionid(self) -> bool:
if not self._authenticated or not self._session.cookies.get("PHPSESSID"):
return False
@@ -37,7 +43,7 @@ class AVConnectAPI:
"User-Agent": self._ua,
}
response = self._session.get(exec_url, headers=headers)
print(response.ok)
logger.debug("AVConnect session check: %s", response.ok)
return response.ok
def exec_gate_macro(self, id_macro) -> bool:
@@ -48,6 +54,6 @@ class AVConnectAPI:
"User-Agent": self._ua,
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
}
payload = f"idmacrocom={id_macro}&nome=16"
payload = urllib.parse.urlencode({"idmacrocom": id_macro, "nome": "16"})
response = self._session.post(exec_url, data=payload, headers=headers)
return response.ok