Validate AVConnect credentials on saving. Improved AVConnect login method. Fixed issue with UTC datetimes

This commit is contained in:
Ettore
2026-05-09 18:18:10 +02:00
parent 69e4f594de
commit 0cb35a30cb
8 changed files with 60 additions and 16 deletions

View File

@@ -1,7 +1,19 @@
import logging
import os
from datetime import datetime, timezone
from typing import Optional
def utcnow() -> datetime:
"""Return the current UTC time as a timezone-naive datetime.
SQLite (and SQLAlchemy's default column handling) stores datetimes without
timezone info. Using this helper keeps all DB timestamps and comparisons
consistent and avoids TypeError on offset-naive vs offset-aware comparisons.
"""
return datetime.now(timezone.utc).replace(tzinfo=None)
# ── Paths ─────────────────────────────────────────────────────────────────────
_HERE = os.path.dirname(os.path.abspath(__file__)) # src/core/
_SRC_DIR = os.path.dirname(_HERE) # src/

View File

@@ -1,4 +1,4 @@
from datetime import datetime, timezone
from datetime import datetime
from typing import Optional
from fastapi import Depends, HTTPException, status
@@ -6,6 +6,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session
from core.auth import decode_token
from core.config import utcnow
from core.database import Keypass, get_db
_security = HTTPBearer()
@@ -45,6 +46,6 @@ def require_keypass(
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass not found")
if kp.revoked:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has been revoked")
if kp.expires_at is not None and kp.expires_at < datetime.now(timezone.utc):
if kp.expires_at is not None and kp.expires_at < utcnow():
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has expired")
return kp

View File

@@ -6,9 +6,9 @@ from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from core.auth import create_token, verify_password
from core.config import utcnow
from core.database import AdminUser, Keypass, get_db
from core.schemas import AdminLoginRequest, KeypassLoginRequest, TokenResponse
router = APIRouter(prefix="/api/auth", tags=["auth"])
logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ async def keypass_login(req: KeypassLoginRequest, db: Session = Depends(get_db))
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid keypass")
if kp.revoked:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has been revoked")
if kp.expires_at is not None and kp.expires_at < datetime.now(timezone.utc):
if kp.expires_at is not None and kp.expires_at < utcnow():
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has expired")
exp = kp.expires_at if kp.expires_at else datetime(2099, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
token = create_token({

View File

@@ -1,6 +1,6 @@
from typing import Optional
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
@@ -8,6 +8,7 @@ from core.auth import encrypt_secret
from core.database import ApiCredential, get_db
from core.dependencies import require_admin
from core.schemas import CredentialRead, CredentialUpsert
from services.avconnect import validate_credentials
router = APIRouter(prefix="/api/admin/credentials", tags=["admin-credentials"])
@@ -25,15 +26,23 @@ async def upsert_credential(
db: Session = Depends(get_db),
_: dict = Depends(require_admin),
):
try:
ok, session_id = validate_credentials(req.username, req.password)
except Exception as exc:
raise HTTPException(502, f"Could not reach AVConnect: {exc}")
if not ok:
raise HTTPException(422, "AVConnect rejected these credentials")
cred: Optional[ApiCredential] = db.query(ApiCredential).first()
if cred:
cred.username = req.username
cred.password_enc = encrypt_secret(req.password)
cred.session_id = None # invalidate any cached session
cred.session_id = session_id # reuse the session obtained during validation
else:
cred = ApiCredential(
username=req.username,
password_enc=encrypt_secret(req.password),
session_id=session_id,
)
db.add(cred)
db.commit()

View File

@@ -1,12 +1,12 @@
import json
import logging
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.orm import Session
from core.auth import decrypt_secret
from core.config import utcnow
from core.database import ApiCredential, GateAccessLog, GateDB, Keypass, get_db
from core.dependencies import require_admin, require_manager, require_keypass
from core.schemas import GateCreate, GatePublicResponse, GateResponse
@@ -100,7 +100,7 @@ async def admin_open_gate(
)
db.add(GateAccessLog(
timestamp=datetime.now(timezone.utc),
timestamp=utcnow(),
keypass_id=0,
keypass_code=f"[{caller['sub']}]",
gate_id=gate_db.id,
@@ -170,7 +170,7 @@ async def open_gate(
)
db.add(GateAccessLog(
timestamp=datetime.now(timezone.utc),
timestamp=utcnow(),
keypass_id=_kp.id,
keypass_code=_kp.code,
gate_id=gate_db.id,

View File

@@ -7,6 +7,7 @@ from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from core.config import utcnow
from core.database import Keypass, get_db
from core.dependencies import require_manager
from core.schemas import KeypassCreate, KeypassPatch, KeypassResponse, keypass_to_response
@@ -38,7 +39,7 @@ async def create_keypass(
kp = Keypass(
code=code,
description=req.description,
created_at=datetime.now(timezone.utc),
created_at=utcnow(),
expires_at=req.expires_at,
revoked=False,
allowed_gates=json.dumps(req.gate_ids) if req.gate_ids else None,
@@ -80,10 +81,10 @@ async def revoke_keypass(
kp: Optional[Keypass] = db.query(Keypass).filter(Keypass.id == kp_id).first()
if not kp:
raise HTTPException(404, "Keypass not found")
if kp.expires_at is not None and kp.expires_at < datetime.now(timezone.utc):
if kp.expires_at is not None and kp.expires_at < utcnow():
raise HTTPException(409, "Expired keypasses cannot be revoked")
if kp.revoked:
raise HTTPException(409, "Keypass is already revoked")
kp.revoked = True
kp.revoked_at = datetime.now(timezone.utc)
kp.revoked_at = utcnow()
db.commit()

View File

@@ -21,6 +21,9 @@ class AVConnectAPI:
self._session.cookies.set("PHPSESSID", session_id)
self._authenticated = True
_LOGIN_SUCCESS_PATH = "/entraconf.php"
_LOGIN_DENIED_PATH = "/accessdenied.htm"
def _authenticate(self) -> bool:
login_url = f"{self._BASE_URL}/loginone.php"
headers = {
@@ -28,11 +31,17 @@ class AVConnectAPI:
"Content-Type": "application/x-www-form-urlencoded"
}
payload = urllib.parse.urlencode({"userid": self._username, "password": self._password, "entra": "Login"})
response = self._session.post(login_url, data=payload, headers=headers)
if response.ok and "PHPSESSID" in self._session.cookies:
# allow_redirects=False so we can inspect the Location header directly.
response = self._session.post(login_url, data=payload, headers=headers, allow_redirects=False)
location = response.headers.get("Location", "")
if response.status_code == 302 and self._LOGIN_SUCCESS_PATH in location:
self._authenticated = True
logger.debug("AVConnect authentication successful")
return True
if self._LOGIN_DENIED_PATH in location:
logger.warning("AVConnect authentication denied (invalid credentials)")
else:
logger.warning("AVConnect authentication failed: status=%d location=%r", response.status_code, location)
return False
def _check_sessionid(self) -> bool:
@@ -57,3 +66,16 @@ class AVConnectAPI:
payload = urllib.parse.urlencode({"idmacrocom": id_macro, "nome": "16"})
response = self._session.post(exec_url, data=payload, headers=headers)
return response.ok
def validate_credentials(username: str, password: str) -> tuple[bool, str | None]:
"""Attempt a login and return (ok, session_id_or_None).
Returns (False, None) if the credentials are rejected.
Raises on unexpected network errors.
"""
api = AVConnectAPI(username, password)
if not api._authenticate():
return False, None
session_id = api._session.cookies.get("PHPSESSID") or None
return True, session_id

View File

@@ -13,7 +13,6 @@ def call_open_gate(
"""Attempt to open a gate. Returns (success, error_msg, new_session_id)."""
if mock:
return True, None, None
return True, None, None
try:
api = AVConnectAPI(username, password, session_id)
ok = api.exec_gate_macro(macro_id)