From adcc2b952258219987b8b99a27c7e7e7872256e0 Mon Sep 17 00:00:00 2001 From: Ettore <=> Date: Wed, 13 May 2026 10:17:50 +0200 Subject: [PATCH] Fix AVConnect authentication issue --- src/routers/credentials.py | 4 +-- src/routers/gates.py | 4 +++ src/services/avconnect.py | 62 +++++++++++++++++++++----------------- src/services/gates.py | 3 +- 4 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/routers/credentials.py b/src/routers/credentials.py index 961f09e..ebf866a 100644 --- a/src/routers/credentials.py +++ b/src/routers/credentials.py @@ -8,7 +8,7 @@ from core.auth import encrypt_secret from core.database import ApiCredential, get_db from core.dependencies import require_admin from core.schemas import CredentialRead, CredentialUpsert -from services.avconnect import validate_credentials +from services.avconnect import AVConnectAPI router = APIRouter(prefix="/api/admin/credentials", tags=["admin-credentials"]) @@ -27,7 +27,7 @@ async def upsert_credential( _: dict = Depends(require_admin), ): try: - ok, session_id = validate_credentials(req.username, req.password) + ok, session_id = AVConnectAPI(req.username, req.password).validate_credentials() except Exception as exc: raise HTTPException(502, f"Could not reach AVConnect: {exc}") if not ok: diff --git a/src/routers/gates.py b/src/routers/gates.py index 2604e35..a799990 100644 --- a/src/routers/gates.py +++ b/src/routers/gates.py @@ -201,6 +201,10 @@ async def open_gate( error=error_msg, )) + if new_sid and new_sid != cred_db.session_id: + cred_db.session_id = new_sid + db.commit() + if not success: logger.error("Gate open failed: gate_id=%d keypass_id=%d error=%r", gate_db.id, _kp.id, error_msg) raise HTTPException(502, error_msg or "Gate operation failed") diff --git a/src/services/avconnect.py b/src/services/avconnect.py index a559d5e..d9a348a 100644 --- a/src/services/avconnect.py +++ b/src/services/avconnect.py @@ -9,20 +9,18 @@ logger = logging.getLogger(__name__) class AVConnectAPI: _BASE_URL = "https://www.avconnect.it" + _LOGIN_SUCCESS_PATH = "/entraconf.php" + _LOGIN_DENIED_PATH = "/accessdenied.htm" + _SESSION_EXPIRED_PATH = "/accessespired.htm" def __init__(self, username: str, password: str, session_id: str | None = None): self._ua = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random self._username = username self._password = password self._session = requests.Session() - self._authenticated = False - + if session_id: self._session.cookies.set("PHPSESSID", session_id) - self._authenticated = True - - _LOGIN_SUCCESS_PATH = "/entraconf.php" - _LOGIN_DENIED_PATH = "/accessdenied.htm" def _authenticate(self) -> bool: login_url = f"{self._BASE_URL}/loginone.php" @@ -35,47 +33,57 @@ class AVConnectAPI: response = self._session.post(login_url, data=payload, headers=headers, allow_redirects=False) location = response.headers.get("Location", "") if response.status_code == 302 and self._LOGIN_SUCCESS_PATH in location: - self._authenticated = True - logger.debug("AVConnect authentication successful") + # Follow the redirect to complete the login and receive the PHPSESSID cookie. + redirect_url = location if location.startswith("http") else f"{self._BASE_URL}{location}" + self._session.get(redirect_url, headers={"User-Agent": self._ua}) + logger.debug(f"AVConnect authentication successful: session_id={self._session.cookies.get('PHPSESSID')}") return True if self._LOGIN_DENIED_PATH in location: logger.warning("AVConnect authentication denied (invalid credentials)") else: - logger.warning("AVConnect authentication failed: status=%d location=%r", response.status_code, location) + logger.warning(f"AVConnect authentication failed: status={response.status_code} location={location}") return False def _check_sessionid(self) -> bool: - if not self._authenticated or not self._session.cookies.get("PHPSESSID"): + if not self._session.cookies.get("PHPSESSID"): return False + exec_url = f"{self._BASE_URL}/exemacrocom.php" headers = { "User-Agent": self._ua, } - response = self._session.get(exec_url, headers=headers) - logger.debug("AVConnect session check: %s", response.ok) + response = self._session.get(exec_url, headers=headers, allow_redirects=False) + if response.status_code == 302 and self._SESSION_EXPIRED_PATH in response.headers.get("Location", ""): + logger.debug("AVConnect session expired") + return False + logger.debug(f"AVConnect session check: {response.ok}") return response.ok - def exec_gate_macro(self, id_macro) -> bool: - if (not self._authenticated or not self._check_sessionid()) and not self._authenticate(): - raise Exception("Authentication failed.") + def exec_gate_macro(self, id_macro) -> tuple[bool, str | None]: + if not self._check_sessionid() and not self._authenticate(): + raise Exception("AVConnect authentication denied (invalid credentials)") exec_url = f"{self._BASE_URL}/exemacrocom.php" headers = { "User-Agent": self._ua, "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8" } payload = urllib.parse.urlencode({"idmacrocom": id_macro, "nome": "16"}) + logger.debug(f"AVConnect executing gate macro: id_macro={id_macro} session_id={self._session.cookies.get('PHPSESSID')}") response = self._session.post(exec_url, data=payload, headers=headers) - return response.ok + if response.ok: + return True, self._session.cookies.get("PHPSESSID") + logger.warning(f"AVConnect gate macro execution failed: id_macro={id_macro} status={response.status_code} response={response.text}") + return False, self._session.cookies.get("PHPSESSID") + + def validate_credentials(self) -> tuple[bool, str | None]: + """Attempt a login and return (ok, session_id_or_None). - -def validate_credentials(username: str, password: str) -> tuple[bool, str | None]: - """Attempt a login and return (ok, session_id_or_None). - - Returns (False, None) if the credentials are rejected. - Raises on unexpected network errors. - """ - api = AVConnectAPI(username, password) - if not api._authenticate(): + Returns (False, None) if the credentials are rejected. + Raises on unexpected network errors. + """ + logger.debug("AVConnect validating credentials by attempting login") + if self._authenticate(): + logger.debug(f"AVConnect credentials valid, session_id={self._session.cookies.get('PHPSESSID')}") + return True, self._session.cookies.get("PHPSESSID") + logger.debug("AVConnect credentials invalid") return False, None - session_id = api._session.cookies.get("PHPSESSID") or None - return True, session_id \ No newline at end of file diff --git a/src/services/gates.py b/src/services/gates.py index e1ac2ce..23cb049 100644 --- a/src/services/gates.py +++ b/src/services/gates.py @@ -15,8 +15,7 @@ def call_open_gate( return True, None, None try: api = AVConnectAPI(username, password, session_id) - ok = api.exec_gate_macro(macro_id) - new_sid = api._session.cookies.get("PHPSESSID") + ok, new_sid = api.exec_gate_macro(macro_id) if not ok: return False, "Gate did not confirm open", new_sid return True, None, new_sid