From c855fda3dcfe7d80546230a812ebefe54bd0a0ed Mon Sep 17 00:00:00 2001 From: Ettore Dreucci Date: Thu, 21 Aug 2025 00:49:18 +0200 Subject: [PATCH] Add token validation --- handlers/access.py | 13 ++++++++----- handlers/credentials.py | 4 ++-- models/credential.py | 1 + services/avconnect.py | 28 +++++++++++++++++++++++----- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/handlers/access.py b/handlers/access.py index 2079d65..64a8e85 100644 --- a/handlers/access.py +++ b/handlers/access.py @@ -5,17 +5,20 @@ from models import Users, Role async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users): assert update.effective_user is not None - assert update.message is not None user_id = str(update.effective_user.id) role = users.get_role(user_id) if role != Role.GUEST: - return await update.message.reply_text("Only guests can request access.") - if not context.args: - return await update.message.reply_text("Usage: `/requestaccess`", parse_mode="Markdown") + if update.callback_query: + await update.callback_query.answer("Only guests can request access.") + elif update.message: + return await update.message.reply_text("Only guests can request access.") requester = users.get_fullname(user_id) or users.get_username(user_id) text = (f"Access request: {requester} ({user_id}) requests access.\nUse `/grantaccess {user_id} YYYY-MM-DDTHH:MM:SSZ` to grant access.") - await update.message.reply_text("Your request has been submitted.") + if update.callback_query: + await update.callback_query.answer("Your request has been submitted.") + elif update.message: + return await update.message.reply_text("Your request has been submitted.") admins = users.get_admins() for admin_id in admins: try: diff --git a/handlers/credentials.py b/handlers/credentials.py index 776f36a..c74aa9e 100644 --- a/handlers/credentials.py +++ b/handlers/credentials.py @@ -9,11 +9,11 @@ async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE, use user_id = str(update.effective_user.id) args = context.args - if len(args) != 2: - return await update.message.reply_text("Usage: `/setcredentials `") role = users.get_role(user_id) if role not in (Role.ADMIN, Role.MEMBER): return await update.message.reply_text("Only members or admins can set credentials") + if len(args) != 2: + return await update.message.reply_text("Usage: `/setcredentials `") if users.set_credentials(user_id, Credential(args[0], args[1])): await update.message.reply_text("Credentials saved") else: diff --git a/models/credential.py b/models/credential.py index 5ff9e18..7af456f 100644 --- a/models/credential.py +++ b/models/credential.py @@ -2,6 +2,7 @@ class Credential: def __init__(self, username: str, password: str): self.username = username self.password = password + self.sessionid = None def to_dict(self) -> dict: return {"username": self.username, "password": self.password} diff --git a/services/avconnect.py b/services/avconnect.py index 9f06182..cae25f7 100644 --- a/services/avconnect.py +++ b/services/avconnect.py @@ -7,23 +7,41 @@ class AVConnectAPI: def __init__(self, credentials: Credential): self._ua = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random - self._username = credentials.username - self._password = credentials.password + self._credentials = credentials self._session = requests.Session() self._authenticated = False + + if credentials.sessionid: + self._session.cookies.set("PHPSESSID", credentials.sessionid) + self._authenticated = True def _authenticate(self) -> bool: login_url = f"{self._BASE_URL}/loginone.php" - headers = {"Content-Type": "application/x-www-form-urlencoded"} - payload = f"userid={self._username}&password={self._password}&entra=Login" + headers = { + "User-Agent": self._ua, + "Content-Type": "application/x-www-form-urlencoded" + } + payload = f"userid={self._credentials.username}&password={self._credentials.password}&entra=Login" response = self._session.post(login_url, data=payload, headers=headers) if response.ok and "PHPSESSID" in self._session.cookies: self._authenticated = True + print("Authenticated") return True return False + + def _check_sessionid(self) -> bool: + if not self._authenticated or not self._credentials.sessionid: + return False + exec_url = f"{self._BASE_URL}/exemacrocom.php" + headers = { + "User-Agent": self._ua, + } + response = self._session.get(exec_url, headers=headers) + print(response.ok) + return response.ok def exec_gate_macro(self, id_macro) -> bool: - if not self._authenticated and not self._authenticate(): + if (not self._authenticated or not self._check_sessionid()) and not self._authenticate(): raise Exception("Authentication failed.") exec_url = f"{self._BASE_URL}/exemacrocom.php" headers = {