Refactoring to eliminate models. Moved to SQLAlchemy Mapped annotations

This commit is contained in:
Ettore
2026-05-06 23:51:29 +02:00
parent e153d54917
commit d30b320595
9 changed files with 70 additions and 117 deletions

View File

@@ -1,8 +1,9 @@
import os import os
from datetime import datetime from datetime import datetime
from typing import Optional
from sqlalchemy import Boolean, Column, DateTime, Integer, String, Text, create_engine from sqlalchemy import Boolean, String, Text, create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker
_HERE = os.path.dirname(os.path.abspath(__file__)) _HERE = os.path.dirname(os.path.abspath(__file__))
_SRC_DIR = os.path.dirname(_HERE) _SRC_DIR = os.path.dirname(_HERE)
@@ -25,57 +26,57 @@ class Base(DeclarativeBase):
class GateDB(Base): class GateDB(Base):
__tablename__ = "gates" __tablename__ = "gates"
id = Column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name = Column(String, nullable=False) name: Mapped[str] = mapped_column(String, nullable=False)
gate_type = Column(String, nullable=False) # 'car' | 'pedestrian' gate_type: Mapped[str] = mapped_column(String, nullable=False) # 'car' | 'pedestrian'
avconnect_macro_id = Column(String, nullable=False) # AVConnect macro ID avconnect_macro_id: Mapped[str] = mapped_column(String, nullable=False) # AVConnect macro ID
status = Column(String, default="enabled") # 'enabled' | 'disabled' status: Mapped[str] = mapped_column(String, default="enabled") # 'enabled' | 'disabled'
class ApiCredential(Base): class ApiCredential(Base):
__tablename__ = "api_credentials" __tablename__ = "api_credentials"
id = Column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
username = Column(String, nullable=False) username: Mapped[str] = mapped_column(String, nullable=False)
password_enc = Column(String, nullable=False) # Fernet-encrypted password_enc: Mapped[str] = mapped_column(String, nullable=False) # Fernet-encrypted
session_id = Column(String, nullable=True) session_id: Mapped[Optional[str]] = mapped_column(String, nullable=True)
class Keypass(Base): class Keypass(Base):
__tablename__ = "keypasses" __tablename__ = "keypasses"
id = Column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
code = Column(String, unique=True, nullable=False) code: Mapped[str] = mapped_column(String, unique=True, nullable=False)
description = Column(Text, nullable=False) description: Mapped[str] = mapped_column(Text, nullable=False)
created_at = Column(DateTime, nullable=False) created_at: Mapped[datetime] = mapped_column(nullable=False)
expires_at = Column(DateTime, nullable=True) # NULL = never expires expires_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) # NULL = never expires
revoked = Column(Boolean, default=False) revoked: Mapped[bool] = mapped_column(Boolean, default=False)
revoked_at = Column(DateTime, nullable=True) revoked_at: Mapped[Optional[datetime]] = mapped_column(nullable=True)
allowed_gates = Column(Text, nullable=True) # JSON list of gate IDs; NULL = all gates allowed_gates: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # JSON list of gate IDs; NULL = all gates
class GateAccessLog(Base): class GateAccessLog(Base):
__tablename__ = "gate_access_logs" __tablename__ = "gate_access_logs"
id = Column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
timestamp = Column(DateTime, nullable=False) timestamp: Mapped[datetime] = mapped_column(nullable=False)
keypass_id = Column(Integer, nullable=False) keypass_id: Mapped[int] = mapped_column(nullable=False)
keypass_code = Column(String, nullable=False) keypass_code: Mapped[str] = mapped_column(String, nullable=False)
gate_id = Column(Integer, nullable=False) gate_id: Mapped[int] = mapped_column(nullable=False)
gate_name = Column(String, nullable=False) gate_name: Mapped[str] = mapped_column(String, nullable=False)
ip_address = Column(String, nullable=True) ip_address: Mapped[Optional[str]] = mapped_column(String, nullable=True)
user_agent = Column(Text, nullable=True) user_agent: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
success = Column(Boolean, nullable=False) success: Mapped[bool] = mapped_column(Boolean, nullable=False)
error = Column(Text, nullable=True) error: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
class AdminUser(Base): class AdminUser(Base):
__tablename__ = "admin_users" __tablename__ = "admin_users"
id = Column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
username = Column(String, unique=True, nullable=False) username: Mapped[str] = mapped_column(String, unique=True, nullable=False)
password_hash = Column(String, nullable=False) password_hash: Mapped[str] = mapped_column(String, nullable=False)
role = Column(String, nullable=False, default="admin") # 'admin' | 'manager' role: Mapped[str] = mapped_column(String, nullable=False, default="admin") # 'admin' | 'manager'
def get_db(): def get_db():

View File

@@ -1,9 +0,0 @@
from .credential import Credential
from .gate import Gate
from .status import Status
__all__ = [
"Credential",
"Gate",
"Status"
]

View File

@@ -1,5 +0,0 @@
class Credential:
def __init__(self, username: str, password: str):
self.username = username
self.password = password
self.sessionid = None

View File

@@ -1,9 +0,0 @@
from .status import Status
from .credential import Credential
class Gate:
def __init__(self, id: str, name: str, status: Status = Status.ENABLED):
self.id = id
self.name = name
self.status = status if isinstance(status, Status) else Status(status)

View File

@@ -1,5 +0,0 @@
from enum import Enum
class Status(Enum):
ENABLED = 1
DISABLED = 0

View File

@@ -8,7 +8,6 @@ from sqlalchemy.orm import Session
from core.auth import decrypt_secret from core.auth import decrypt_secret
from core.database import ApiCredential, GateAccessLog, GateDB, Keypass, get_db from core.database import ApiCredential, GateAccessLog, GateDB, Keypass, get_db
from core.dependencies import require_admin, require_manager, require_keypass from core.dependencies import require_admin, require_manager, require_keypass
from models import Credential, Gate as GateModel, Status
from core.schemas import GateCreate, GateResponse from core.schemas import GateCreate, GateResponse
from services.gates import call_open_gate from services.gates import call_open_gate
@@ -86,17 +85,15 @@ async def admin_open_gate(
if not cred_db: if not cred_db:
raise HTTPException(503, "AVConnect credentials not configured") raise HTTPException(503, "AVConnect credentials not configured")
credential = Credential(
username=cred_db.username,
password=decrypt_secret(cred_db.password_enc),
)
credential.sessionid = cred_db.session_id
gate = GateModel(id=gate_db.avconnect_macro_id, name=gate_db.name, status=Status.ENABLED)
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else None) ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else None)
ua = request.headers.get("User-Agent") ua = request.headers.get("User-Agent")
success, error_msg, new_sid = call_open_gate(gate, credential) success, error_msg, new_sid = call_open_gate(
gate_db.avconnect_macro_id,
cred_db.username,
decrypt_secret(cred_db.password_enc),
cred_db.session_id,
)
db.add(GateAccessLog( db.add(GateAccessLog(
timestamp=datetime.utcnow(), timestamp=datetime.utcnow(),
@@ -150,14 +147,6 @@ async def open_gate(
if not cred_db: if not cred_db:
raise HTTPException(503, "AVConnect credentials not configured") raise HTTPException(503, "AVConnect credentials not configured")
credential = Credential(
username=cred_db.username,
password=decrypt_secret(cred_db.password_enc),
)
credential.sessionid = cred_db.session_id
gate_status = Status.ENABLED if gate_db.status == "enabled" else Status.DISABLED
gate = GateModel(id=gate_db.avconnect_macro_id, name=gate_db.name, status=gate_status)
ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else None) ip = request.headers.get("X-Forwarded-For", request.client.host if request.client else None)
ua = request.headers.get("User-Agent") ua = request.headers.get("User-Agent")
@@ -165,7 +154,12 @@ async def open_gate(
if allowed is not None and gate_id not in allowed: if allowed is not None and gate_id not in allowed:
raise HTTPException(403, "This keypass does not have access to this gate") raise HTTPException(403, "This keypass does not have access to this gate")
success, error_msg, new_sid = call_open_gate(gate, credential) success, error_msg, new_sid = call_open_gate(
gate_db.avconnect_macro_id,
cred_db.username,
decrypt_secret(cred_db.password_enc),
cred_db.session_id,
)
db.add(GateAccessLog( db.add(GateAccessLog(
timestamp=datetime.utcnow(), timestamp=datetime.utcnow(),

View File

@@ -1,9 +1,7 @@
from .avconnect import AVConnectAPI from .avconnect import AVConnectAPI
from .gates import GatesService, OpenResult, call_open_gate from .gates import call_open_gate
__all__ = [ __all__ = [
"AVConnectAPI", "AVConnectAPI",
"GatesService",
"OpenResult",
"call_open_gate", "call_open_gate",
] ]

View File

@@ -1,18 +1,18 @@
import requests import requests
from fake_useragent import UserAgent from fake_useragent import UserAgent
from models import Credential
class AVConnectAPI: class AVConnectAPI:
_BASE_URL = "https://www.avconnect.it" _BASE_URL = "https://www.avconnect.it"
def __init__(self, credentials: Credential): def __init__(self, username: str, password: str, session_id: str | None = None):
self._ua = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random self._ua = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random
self._credentials = credentials self._username = username
self._password = password
self._session = requests.Session() self._session = requests.Session()
self._authenticated = False self._authenticated = False
if credentials.sessionid: if session_id:
self._session.cookies.set("PHPSESSID", credentials.sessionid) self._session.cookies.set("PHPSESSID", session_id)
self._authenticated = True self._authenticated = True
def _authenticate(self) -> bool: def _authenticate(self) -> bool:
@@ -21,7 +21,7 @@ class AVConnectAPI:
"User-Agent": self._ua, "User-Agent": self._ua,
"Content-Type": "application/x-www-form-urlencoded" "Content-Type": "application/x-www-form-urlencoded"
} }
payload = f"userid={self._credentials.username}&password={self._credentials.password}&entra=Login" payload = f"userid={self._username}&password={self._password}&entra=Login"
response = self._session.post(login_url, data=payload, headers=headers) response = self._session.post(login_url, data=payload, headers=headers)
if response.ok and "PHPSESSID" in self._session.cookies: if response.ok and "PHPSESSID" in self._session.cookies:
self._authenticated = True self._authenticated = True
@@ -30,7 +30,7 @@ class AVConnectAPI:
return False return False
def _check_sessionid(self) -> bool: def _check_sessionid(self) -> bool:
if not self._authenticated or not self._credentials.sessionid: if not self._authenticated or not self._session.cookies.get("PHPSESSID"):
return False return False
exec_url = f"{self._BASE_URL}/exemacrocom.php" exec_url = f"{self._BASE_URL}/exemacrocom.php"
headers = { headers = {

View File

@@ -1,38 +1,26 @@
from dataclasses import dataclass
from typing import Optional from typing import Optional
from models import Credential, Status, Gate
from .avconnect import AVConnectAPI from .avconnect import AVConnectAPI
@dataclass def call_open_gate(
class OpenResult: macro_id: str,
success: bool username: str,
error: Optional[str] = None password: str,
new_session_id: Optional[str] = None session_id: Optional[str] = None,
) -> tuple[bool, Optional[str], Optional[str]]:
class GatesService:
def open_gate(self, gate: Gate, credentials: Credential) -> OpenResult:
if gate.status == Status.DISABLED:
return OpenResult(success=False, error="Gate is disabled")
try:
api = AVConnectAPI(credentials)
ok = api.exec_gate_macro(gate.id)
new_sid = api._session.cookies.get("PHPSESSID")
if not ok:
return OpenResult(success=False, error="Gate did not confirm open", new_session_id=new_sid)
return OpenResult(success=True, new_session_id=new_sid)
except Exception as e:
return OpenResult(success=False, error=str(e))
def call_open_gate(gate: Gate, credentials: Credential) -> tuple[bool, Optional[str], Optional[str]]:
"""Attempt to open a gate. Returns (success, error_msg, new_session_id). """Attempt to open a gate. Returns (success, error_msg, new_session_id).
Respects the MOCK_AVCONNECT environment variable. Respects the MOCK_AVCONNECT environment variable.
""" """
from core.config import MOCK_AVCONNECT from core.config import MOCK_AVCONNECT
if MOCK_AVCONNECT: if MOCK_AVCONNECT:
return True, None, None return True, None, None
result = GatesService().open_gate(gate, credentials) try:
return result.success, result.error, result.new_session_id api = AVConnectAPI(username, password, session_id)
ok = api.exec_gate_macro(macro_id)
new_sid = api._session.cookies.get("PHPSESSID")
if not ok:
return False, "Gate did not confirm open", new_sid
return True, None, new_sid
except Exception as e:
return False, str(e), None