diff --git a/src/core/config.py b/src/core/config.py index 01abc4c..749854f 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -1,7 +1,19 @@ import logging import os +from datetime import datetime, timezone from typing import Optional + +def utcnow() -> datetime: + """Return the current UTC time as a timezone-naive datetime. + + SQLite (and SQLAlchemy's default column handling) stores datetimes without + timezone info. Using this helper keeps all DB timestamps and comparisons + consistent and avoids TypeError on offset-naive vs offset-aware comparisons. + """ + return datetime.now(timezone.utc).replace(tzinfo=None) + + # ── Paths ───────────────────────────────────────────────────────────────────── _HERE = os.path.dirname(os.path.abspath(__file__)) # src/core/ _SRC_DIR = os.path.dirname(_HERE) # src/ diff --git a/src/core/dependencies.py b/src/core/dependencies.py index 15c7c5d..4839c83 100644 --- a/src/core/dependencies.py +++ b/src/core/dependencies.py @@ -1,4 +1,4 @@ -from datetime import datetime, timezone +from datetime import datetime from typing import Optional from fastapi import Depends, HTTPException, status @@ -6,6 +6,7 @@ from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy.orm import Session from core.auth import decode_token +from core.config import utcnow from core.database import Keypass, get_db _security = HTTPBearer() @@ -45,6 +46,6 @@ def require_keypass( raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass not found") if kp.revoked: raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has been revoked") - if kp.expires_at is not None and kp.expires_at < datetime.now(timezone.utc): + if kp.expires_at is not None and kp.expires_at < utcnow(): raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has expired") return kp diff --git a/src/routers/auth.py b/src/routers/auth.py index e4c3c4e..f5624b3 100644 --- a/src/routers/auth.py +++ b/src/routers/auth.py @@ -6,9 +6,9 @@ from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from core.auth import create_token, verify_password +from core.config import utcnow from core.database import AdminUser, Keypass, get_db from core.schemas import AdminLoginRequest, KeypassLoginRequest, TokenResponse - router = APIRouter(prefix="/api/auth", tags=["auth"]) logger = logging.getLogger(__name__) @@ -38,7 +38,7 @@ async def keypass_login(req: KeypassLoginRequest, db: Session = Depends(get_db)) raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid keypass") if kp.revoked: raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has been revoked") - if kp.expires_at is not None and kp.expires_at < datetime.now(timezone.utc): + if kp.expires_at is not None and kp.expires_at < utcnow(): raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has expired") exp = kp.expires_at if kp.expires_at else datetime(2099, 12, 31, 23, 59, 59, tzinfo=timezone.utc) token = create_token({ diff --git a/src/routers/credentials.py b/src/routers/credentials.py index ad5aa3f..961f09e 100644 --- a/src/routers/credentials.py +++ b/src/routers/credentials.py @@ -1,6 +1,6 @@ from typing import Optional -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.orm import Session @@ -8,6 +8,7 @@ from core.auth import encrypt_secret from core.database import ApiCredential, get_db from core.dependencies import require_admin from core.schemas import CredentialRead, CredentialUpsert +from services.avconnect import validate_credentials router = APIRouter(prefix="/api/admin/credentials", tags=["admin-credentials"]) @@ -25,15 +26,23 @@ async def upsert_credential( db: Session = Depends(get_db), _: dict = Depends(require_admin), ): + try: + ok, session_id = validate_credentials(req.username, req.password) + except Exception as exc: + raise HTTPException(502, f"Could not reach AVConnect: {exc}") + if not ok: + raise HTTPException(422, "AVConnect rejected these credentials") + cred: Optional[ApiCredential] = db.query(ApiCredential).first() if cred: cred.username = req.username cred.password_enc = encrypt_secret(req.password) - cred.session_id = None # invalidate any cached session + cred.session_id = session_id # reuse the session obtained during validation else: cred = ApiCredential( username=req.username, password_enc=encrypt_secret(req.password), + session_id=session_id, ) db.add(cred) db.commit() diff --git a/src/routers/gates.py b/src/routers/gates.py index eafef1a..b3e7c09 100644 --- a/src/routers/gates.py +++ b/src/routers/gates.py @@ -1,12 +1,12 @@ import json import logging -from datetime import datetime, timezone from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.orm import Session from core.auth import decrypt_secret +from core.config import utcnow from core.database import ApiCredential, GateAccessLog, GateDB, Keypass, get_db from core.dependencies import require_admin, require_manager, require_keypass from core.schemas import GateCreate, GatePublicResponse, GateResponse @@ -100,7 +100,7 @@ async def admin_open_gate( ) db.add(GateAccessLog( - timestamp=datetime.now(timezone.utc), + timestamp=utcnow(), keypass_id=0, keypass_code=f"[{caller['sub']}]", gate_id=gate_db.id, @@ -170,7 +170,7 @@ async def open_gate( ) db.add(GateAccessLog( - timestamp=datetime.now(timezone.utc), + timestamp=utcnow(), keypass_id=_kp.id, keypass_code=_kp.code, gate_id=gate_db.id, diff --git a/src/routers/keypasses.py b/src/routers/keypasses.py index 0b59250..c349fb6 100644 --- a/src/routers/keypasses.py +++ b/src/routers/keypasses.py @@ -7,6 +7,7 @@ from typing import Optional from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session +from core.config import utcnow from core.database import Keypass, get_db from core.dependencies import require_manager from core.schemas import KeypassCreate, KeypassPatch, KeypassResponse, keypass_to_response @@ -38,7 +39,7 @@ async def create_keypass( kp = Keypass( code=code, description=req.description, - created_at=datetime.now(timezone.utc), + created_at=utcnow(), expires_at=req.expires_at, revoked=False, allowed_gates=json.dumps(req.gate_ids) if req.gate_ids else None, @@ -80,10 +81,10 @@ async def revoke_keypass( kp: Optional[Keypass] = db.query(Keypass).filter(Keypass.id == kp_id).first() if not kp: raise HTTPException(404, "Keypass not found") - if kp.expires_at is not None and kp.expires_at < datetime.now(timezone.utc): + if kp.expires_at is not None and kp.expires_at < utcnow(): raise HTTPException(409, "Expired keypasses cannot be revoked") if kp.revoked: raise HTTPException(409, "Keypass is already revoked") kp.revoked = True - kp.revoked_at = datetime.now(timezone.utc) + kp.revoked_at = utcnow() db.commit() diff --git a/src/services/avconnect.py b/src/services/avconnect.py index 2415c04..a559d5e 100644 --- a/src/services/avconnect.py +++ b/src/services/avconnect.py @@ -21,6 +21,9 @@ class AVConnectAPI: self._session.cookies.set("PHPSESSID", session_id) self._authenticated = True + _LOGIN_SUCCESS_PATH = "/entraconf.php" + _LOGIN_DENIED_PATH = "/accessdenied.htm" + def _authenticate(self) -> bool: login_url = f"{self._BASE_URL}/loginone.php" headers = { @@ -28,11 +31,17 @@ class AVConnectAPI: "Content-Type": "application/x-www-form-urlencoded" } payload = urllib.parse.urlencode({"userid": self._username, "password": self._password, "entra": "Login"}) - response = self._session.post(login_url, data=payload, headers=headers) - if response.ok and "PHPSESSID" in self._session.cookies: + # allow_redirects=False so we can inspect the Location header directly. + response = self._session.post(login_url, data=payload, headers=headers, allow_redirects=False) + location = response.headers.get("Location", "") + if response.status_code == 302 and self._LOGIN_SUCCESS_PATH in location: self._authenticated = True logger.debug("AVConnect authentication successful") return True + if self._LOGIN_DENIED_PATH in location: + logger.warning("AVConnect authentication denied (invalid credentials)") + else: + logger.warning("AVConnect authentication failed: status=%d location=%r", response.status_code, location) return False def _check_sessionid(self) -> bool: @@ -56,4 +65,17 @@ class AVConnectAPI: } payload = urllib.parse.urlencode({"idmacrocom": id_macro, "nome": "16"}) response = self._session.post(exec_url, data=payload, headers=headers) - return response.ok \ No newline at end of file + return response.ok + + +def validate_credentials(username: str, password: str) -> tuple[bool, str | None]: + """Attempt a login and return (ok, session_id_or_None). + + Returns (False, None) if the credentials are rejected. + Raises on unexpected network errors. + """ + api = AVConnectAPI(username, password) + if not api._authenticate(): + return False, None + session_id = api._session.cookies.get("PHPSESSID") or None + return True, session_id \ No newline at end of file diff --git a/src/services/gates.py b/src/services/gates.py index 43a7544..e1ac2ce 100644 --- a/src/services/gates.py +++ b/src/services/gates.py @@ -13,7 +13,6 @@ def call_open_gate( """Attempt to open a gate. Returns (success, error_msg, new_session_id).""" if mock: return True, None, None - return True, None, None try: api = AVConnectAPI(username, password, session_id) ok = api.exec_gate_macro(macro_id)