Add token validation

This commit is contained in:
2025-08-21 00:49:18 +02:00
parent 626e6b6aee
commit c855fda3dc
4 changed files with 34 additions and 12 deletions

View File

@@ -5,17 +5,20 @@ from models import Users, Role
async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users): async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users):
assert update.effective_user is not None assert update.effective_user is not None
assert update.message is not None
user_id = str(update.effective_user.id) user_id = str(update.effective_user.id)
role = users.get_role(user_id) role = users.get_role(user_id)
if role != Role.GUEST: if role != Role.GUEST:
return await update.message.reply_text("Only guests can request access.") if update.callback_query:
if not context.args: await update.callback_query.answer("Only guests can request access.")
return await update.message.reply_text("Usage: `/requestaccess`", parse_mode="Markdown") elif update.message:
return await update.message.reply_text("Only guests can request access.")
requester = users.get_fullname(user_id) or users.get_username(user_id) requester = users.get_fullname(user_id) or users.get_username(user_id)
text = (f"Access request: {requester} ({user_id}) requests access.\nUse `/grantaccess {user_id} <gate_id|all> YYYY-MM-DDTHH:MM:SSZ` to grant access.") text = (f"Access request: {requester} ({user_id}) requests access.\nUse `/grantaccess {user_id} <gate_id|all> YYYY-MM-DDTHH:MM:SSZ` to grant access.")
await update.message.reply_text("Your request has been submitted.") if update.callback_query:
await update.callback_query.answer("Your request has been submitted.")
elif update.message:
return await update.message.reply_text("Your request has been submitted.")
admins = users.get_admins() admins = users.get_admins()
for admin_id in admins: for admin_id in admins:
try: try:

View File

@@ -9,11 +9,11 @@ async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE, use
user_id = str(update.effective_user.id) user_id = str(update.effective_user.id)
args = context.args args = context.args
if len(args) != 2:
return await update.message.reply_text("Usage: `/setcredentials <username> <password>`")
role = users.get_role(user_id) role = users.get_role(user_id)
if role not in (Role.ADMIN, Role.MEMBER): if role not in (Role.ADMIN, Role.MEMBER):
return await update.message.reply_text("Only members or admins can set credentials") return await update.message.reply_text("Only members or admins can set credentials")
if len(args) != 2:
return await update.message.reply_text("Usage: `/setcredentials <username> <password>`")
if users.set_credentials(user_id, Credential(args[0], args[1])): if users.set_credentials(user_id, Credential(args[0], args[1])):
await update.message.reply_text("Credentials saved") await update.message.reply_text("Credentials saved")
else: else:

View File

@@ -2,6 +2,7 @@ class Credential:
def __init__(self, username: str, password: str): def __init__(self, username: str, password: str):
self.username = username self.username = username
self.password = password self.password = password
self.sessionid = None
def to_dict(self) -> dict: def to_dict(self) -> dict:
return {"username": self.username, "password": self.password} return {"username": self.username, "password": self.password}

View File

@@ -7,23 +7,41 @@ class AVConnectAPI:
def __init__(self, credentials: Credential): def __init__(self, credentials: Credential):
self._ua = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random self._ua = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random
self._username = credentials.username self._credentials = credentials
self._password = credentials.password
self._session = requests.Session() self._session = requests.Session()
self._authenticated = False self._authenticated = False
if credentials.sessionid:
self._session.cookies.set("PHPSESSID", credentials.sessionid)
self._authenticated = True
def _authenticate(self) -> bool: def _authenticate(self) -> bool:
login_url = f"{self._BASE_URL}/loginone.php" login_url = f"{self._BASE_URL}/loginone.php"
headers = {"Content-Type": "application/x-www-form-urlencoded"} headers = {
payload = f"userid={self._username}&password={self._password}&entra=Login" "User-Agent": self._ua,
"Content-Type": "application/x-www-form-urlencoded"
}
payload = f"userid={self._credentials.username}&password={self._credentials.password}&entra=Login"
response = self._session.post(login_url, data=payload, headers=headers) response = self._session.post(login_url, data=payload, headers=headers)
if response.ok and "PHPSESSID" in self._session.cookies: if response.ok and "PHPSESSID" in self._session.cookies:
self._authenticated = True self._authenticated = True
print("Authenticated")
return True return True
return False return False
def _check_sessionid(self) -> bool:
if not self._authenticated or not self._credentials.sessionid:
return False
exec_url = f"{self._BASE_URL}/exemacrocom.php"
headers = {
"User-Agent": self._ua,
}
response = self._session.get(exec_url, headers=headers)
print(response.ok)
return response.ok
def exec_gate_macro(self, id_macro) -> bool: def exec_gate_macro(self, id_macro) -> bool:
if not self._authenticated and not self._authenticate(): if (not self._authenticated or not self._check_sessionid()) and not self._authenticate():
raise Exception("Authentication failed.") raise Exception("Authentication failed.")
exec_url = f"{self._BASE_URL}/exemacrocom.php" exec_url = f"{self._BASE_URL}/exemacrocom.php"
headers = { headers = {