Fix AVConnect authentication issue

This commit is contained in:
Ettore
2026-05-13 10:17:50 +02:00
parent 876e44272b
commit adcc2b9522
4 changed files with 42 additions and 31 deletions

View File

@@ -8,7 +8,7 @@ from core.auth import encrypt_secret
from core.database import ApiCredential, get_db from core.database import ApiCredential, get_db
from core.dependencies import require_admin from core.dependencies import require_admin
from core.schemas import CredentialRead, CredentialUpsert from core.schemas import CredentialRead, CredentialUpsert
from services.avconnect import validate_credentials from services.avconnect import AVConnectAPI
router = APIRouter(prefix="/api/admin/credentials", tags=["admin-credentials"]) router = APIRouter(prefix="/api/admin/credentials", tags=["admin-credentials"])
@@ -27,7 +27,7 @@ async def upsert_credential(
_: dict = Depends(require_admin), _: dict = Depends(require_admin),
): ):
try: try:
ok, session_id = validate_credentials(req.username, req.password) ok, session_id = AVConnectAPI(req.username, req.password).validate_credentials()
except Exception as exc: except Exception as exc:
raise HTTPException(502, f"Could not reach AVConnect: {exc}") raise HTTPException(502, f"Could not reach AVConnect: {exc}")
if not ok: if not ok:

View File

@@ -201,6 +201,10 @@ async def open_gate(
error=error_msg, error=error_msg,
)) ))
if new_sid and new_sid != cred_db.session_id:
cred_db.session_id = new_sid
db.commit()
if not success: if not success:
logger.error("Gate open failed: gate_id=%d keypass_id=%d error=%r", gate_db.id, _kp.id, error_msg) logger.error("Gate open failed: gate_id=%d keypass_id=%d error=%r", gate_db.id, _kp.id, error_msg)
raise HTTPException(502, error_msg or "Gate operation failed") raise HTTPException(502, error_msg or "Gate operation failed")

View File

@@ -9,20 +9,18 @@ logger = logging.getLogger(__name__)
class AVConnectAPI: class AVConnectAPI:
_BASE_URL = "https://www.avconnect.it" _BASE_URL = "https://www.avconnect.it"
_LOGIN_SUCCESS_PATH = "/entraconf.php"
_LOGIN_DENIED_PATH = "/accessdenied.htm"
_SESSION_EXPIRED_PATH = "/accessespired.htm"
def __init__(self, username: str, password: str, session_id: str | None = None): def __init__(self, username: str, password: str, session_id: str | None = None):
self._ua = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random self._ua = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random
self._username = username self._username = username
self._password = password self._password = password
self._session = requests.Session() self._session = requests.Session()
self._authenticated = False
if session_id: if session_id:
self._session.cookies.set("PHPSESSID", session_id) self._session.cookies.set("PHPSESSID", session_id)
self._authenticated = True
_LOGIN_SUCCESS_PATH = "/entraconf.php"
_LOGIN_DENIED_PATH = "/accessdenied.htm"
def _authenticate(self) -> bool: def _authenticate(self) -> bool:
login_url = f"{self._BASE_URL}/loginone.php" login_url = f"{self._BASE_URL}/loginone.php"
@@ -35,47 +33,57 @@ class AVConnectAPI:
response = self._session.post(login_url, data=payload, headers=headers, allow_redirects=False) response = self._session.post(login_url, data=payload, headers=headers, allow_redirects=False)
location = response.headers.get("Location", "") location = response.headers.get("Location", "")
if response.status_code == 302 and self._LOGIN_SUCCESS_PATH in location: if response.status_code == 302 and self._LOGIN_SUCCESS_PATH in location:
self._authenticated = True # Follow the redirect to complete the login and receive the PHPSESSID cookie.
logger.debug("AVConnect authentication successful") redirect_url = location if location.startswith("http") else f"{self._BASE_URL}{location}"
self._session.get(redirect_url, headers={"User-Agent": self._ua})
logger.debug(f"AVConnect authentication successful: session_id={self._session.cookies.get('PHPSESSID')}")
return True return True
if self._LOGIN_DENIED_PATH in location: if self._LOGIN_DENIED_PATH in location:
logger.warning("AVConnect authentication denied (invalid credentials)") logger.warning("AVConnect authentication denied (invalid credentials)")
else: else:
logger.warning("AVConnect authentication failed: status=%d location=%r", response.status_code, location) logger.warning(f"AVConnect authentication failed: status={response.status_code} location={location}")
return False return False
def _check_sessionid(self) -> bool: def _check_sessionid(self) -> bool:
if not self._authenticated or not self._session.cookies.get("PHPSESSID"): if not self._session.cookies.get("PHPSESSID"):
return False return False
exec_url = f"{self._BASE_URL}/exemacrocom.php" exec_url = f"{self._BASE_URL}/exemacrocom.php"
headers = { headers = {
"User-Agent": self._ua, "User-Agent": self._ua,
} }
response = self._session.get(exec_url, headers=headers) response = self._session.get(exec_url, headers=headers, allow_redirects=False)
logger.debug("AVConnect session check: %s", response.ok) if response.status_code == 302 and self._SESSION_EXPIRED_PATH in response.headers.get("Location", ""):
logger.debug("AVConnect session expired")
return False
logger.debug(f"AVConnect session check: {response.ok}")
return response.ok return response.ok
def exec_gate_macro(self, id_macro) -> bool: def exec_gate_macro(self, id_macro) -> tuple[bool, str | None]:
if (not self._authenticated or not self._check_sessionid()) and not self._authenticate(): if not self._check_sessionid() and not self._authenticate():
raise Exception("Authentication failed.") raise Exception("AVConnect authentication denied (invalid credentials)")
exec_url = f"{self._BASE_URL}/exemacrocom.php" exec_url = f"{self._BASE_URL}/exemacrocom.php"
headers = { headers = {
"User-Agent": self._ua, "User-Agent": self._ua,
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8" "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
} }
payload = urllib.parse.urlencode({"idmacrocom": id_macro, "nome": "16"}) payload = urllib.parse.urlencode({"idmacrocom": id_macro, "nome": "16"})
logger.debug(f"AVConnect executing gate macro: id_macro={id_macro} session_id={self._session.cookies.get('PHPSESSID')}")
response = self._session.post(exec_url, data=payload, headers=headers) response = self._session.post(exec_url, data=payload, headers=headers)
return response.ok if response.ok:
return True, self._session.cookies.get("PHPSESSID")
logger.warning(f"AVConnect gate macro execution failed: id_macro={id_macro} status={response.status_code} response={response.text}")
return False, self._session.cookies.get("PHPSESSID")
def validate_credentials(self) -> tuple[bool, str | None]:
"""Attempt a login and return (ok, session_id_or_None).
Returns (False, None) if the credentials are rejected.
def validate_credentials(username: str, password: str) -> tuple[bool, str | None]: Raises on unexpected network errors.
"""Attempt a login and return (ok, session_id_or_None). """
logger.debug("AVConnect validating credentials by attempting login")
Returns (False, None) if the credentials are rejected. if self._authenticate():
Raises on unexpected network errors. logger.debug(f"AVConnect credentials valid, session_id={self._session.cookies.get('PHPSESSID')}")
""" return True, self._session.cookies.get("PHPSESSID")
api = AVConnectAPI(username, password) logger.debug("AVConnect credentials invalid")
if not api._authenticate():
return False, None return False, None
session_id = api._session.cookies.get("PHPSESSID") or None
return True, session_id

View File

@@ -15,8 +15,7 @@ def call_open_gate(
return True, None, None return True, None, None
try: try:
api = AVConnectAPI(username, password, session_id) api = AVConnectAPI(username, password, session_id)
ok = api.exec_gate_macro(macro_id) ok, new_sid = api.exec_gate_macro(macro_id)
new_sid = api._session.cookies.get("PHPSESSID")
if not ok: if not ok:
return False, "Gate did not confirm open", new_sid return False, "Gate did not confirm open", new_sid
return True, None, new_sid return True, None, new_sid