diff --git a/access_control.py b/access_control.py index 50ba8b1..ec2b89f 100644 --- a/access_control.py +++ b/access_control.py @@ -13,7 +13,7 @@ class AccessControl: return json.load(f) except FileNotFoundError: return {} - + def _save_access(self): with open(self._ACCESS_FILE, "w") as f: json.dump(self._access, f) @@ -21,12 +21,12 @@ class AccessControl: def get_grantor(self, user_id: str, gate: str) -> str: access = self._access.get(user_id, {}) entry = access.get(gate) or access.get("all") - return entry.get("grantor", "") + return entry.get("grantor", "") if entry else "" def can_open_gate(self, user_id: str, gate: str) -> bool: access = self._access.get(user_id, {}) entry = access.get(gate) or access.get("all") - if not entry or entry["type"] != "timed": + if not entry or entry.get("type") != "timed": return False if datetime.now(timezone.utc) < datetime.fromisoformat(entry["expires_at"].replace("Z", "+00:00")): return True diff --git a/avconnect.py b/avconnect.py index d2e7963..f5d4dbd 100644 --- a/avconnect.py +++ b/avconnect.py @@ -1,5 +1,4 @@ import requests -import pprint from fake_useragent import UserAgent from commons import Credential @@ -15,12 +14,9 @@ class AVConnectAPI: def _authenticate(self) -> bool: login_url = f"{self._BASE_URL}/loginone.php" - headers = { - "Content-Type": "application/x-www-form-urlencoded" - } + headers = {"Content-Type": "application/x-www-form-urlencoded"} payload = f"userid={self._username}&password={self._password}&entra=Login" response = self._session.post(login_url, data=payload, headers=headers) - if response.ok and "PHPSESSID" in self._session.cookies: self._authenticated = True return True @@ -29,18 +25,13 @@ class AVConnectAPI: def exec_gate_macro(self, id_macro) -> bool: if not self._authenticated and not self._authenticate(): raise Exception("Authentication failed.") - exec_url = f"{self._BASE_URL}/exemacrocom.php" headers = { "User-Agent": self._ua, "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8" } payload = f"idmacrocom={id_macro}&nome=16" - - response = self._session.prepare_request(requests.Request("POST", exec_url, data=payload, headers=headers)) - pprint.pprint(response.headers) - return True - #response = self._session.post(exec_url, data=payload, headers=headers) - #if response.ok: - # return True - #return False \ No newline at end of file + # Uncomment for real request: + # response = self._session.post(exec_url, data=payload, headers=headers) + # return response.ok + return True # For testing \ No newline at end of file diff --git a/bot.py b/bot.py index 31ebfb5..2420a12 100644 --- a/bot.py +++ b/bot.py @@ -1,5 +1,5 @@ -from telegram import Update -from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes +from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup +from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, CallbackQueryHandler from datetime import datetime from config import BotConfig from gates import Gates @@ -23,7 +23,30 @@ async def updateuser(update: Update, context: ContextTypes.DEFAULT_TYPE): users.update_user(user_id, username, fullname) async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): - await update.message.reply_text("Welcome to GatekeeperBot! Use `/setcredentials` to configure your access") + user_id = str(update.effective_user.id) + role = users.get_role(user_id) + keyboard = [] + + if role == Role.GUEST: + # Guests: can request access, or open a gate if granted + if users.has_grants(user_id): + keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")]) + keyboard.append([InlineKeyboardButton("Request Access", callback_data="request_access")]) + elif role == Role.MEMBER: + # Members: can open gates and set credentials + keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")]) + keyboard.append([InlineKeyboardButton("Set Credentials", callback_data="set_credentials")]) + elif role == Role.ADMIN: + # Admins: can open gates, grant access, and set credentials + keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")]) + keyboard.append([InlineKeyboardButton("Grant Access", callback_data="grant_access")]) + keyboard.append([InlineKeyboardButton("Set Credentials", callback_data="set_credentials")]) + + reply_markup = InlineKeyboardMarkup(keyboard) + await update.message.reply_text( + "Welcome to GatekeeperBot! Use the buttons below or commands.", + reply_markup=reply_markup + ) async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = str(update.effective_user.id) @@ -31,7 +54,7 @@ async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE): if len(args) != 2: return await update.message.reply_text("Usage: `/setcredentials `") role = users.get_role(user_id) - if role not in ("admin", "member"): + if role not in (Role.ADMIN, Role.MEMBER): return await update.message.reply_text("Only members or admins can set credentials") if users.set_credentials(user_id, Credential(args[0], args[1])): await update.message.reply_text("Credentials saved") @@ -57,7 +80,7 @@ async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE): return await update.message.reply_text("No valid grantor available.") if not creds: return await update.message.reply_text("No valid grantor credentials available.") - #TODO: update guest last_used_at + users.update_grant_last_used(user_id, gate) try: await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}") except Exception as e: @@ -69,10 +92,61 @@ async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE): return await update.message.reply_text(f"Gate {gate_name} opened!") await update.message.reply_text(f"ERROR: Cannot open gate {gate_name}") +async def open_gate_menu(update: Update, context: ContextTypes.DEFAULT_TYPE): + # Show a list of available gates as buttons + keyboard = [ + [InlineKeyboardButton(gate.name, callback_data=f"opengate_{gate_id}")] + for gate_id, gate in gates._gates.items() + ] + reply_markup = InlineKeyboardMarkup(keyboard) + if update.callback_query: + await update.callback_query.answer() + await update.callback_query.edit_message_text( + "Select a gate to open:", reply_markup=reply_markup + ) + else: + await update.message.reply_text("Select a gate to open:", reply_markup=reply_markup) + +async def handle_gate_open_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): + query = update.callback_query + user_id = str(query.from_user.id) + data = query.data + if data.startswith("opengate_"): + gate_id = data[len("opengate_") :] + gate_name = gates.get_name(gate_id) + role = users.get_role(user_id) + if role in (Role.ADMIN, Role.MEMBER): + creds = users.get_credentials(user_id) + if not creds: + await query.answer("Please set your credentials with /setcredentials first", show_alert=True) + return + elif role == Role.GUEST and users.can_open_gate(user_id, gate_id): + grantor = users.get_grantor(user_id, gate_id) + creds = users.get_credentials(grantor) + if not grantor or not creds: + await query.answer("No valid grantor credentials available.", show_alert=True) + return + users.update_grant_last_used(user_id, gate_id) + try: + await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}") + except Exception as e: + print(f"Failed to notify {grantor} that guest {user_id} opened {gate_name}: {e}") + else: + # TODO: guest not working + await query.answer("Access denied.", show_alert=True) + return + + if gates.open_gate(gate_id, creds): + await query.answer(f"Gate {gate_name} opened!", show_alert=True) + await query.edit_message_text(f"Gate {gate_name} opened!") + else: + await query.answer(f"ERROR: Cannot open gate {gate_name}", show_alert=True) + await query.edit_message_text(f"ERROR: Cannot open gate {gate_name}") + async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = str(update.effective_user.id) role = users.get_role(user_id) - if role not in ("guest"): + if role != Role.GUEST: return await update.message.reply_text("Only guests can request access.") if not context.args: return await update.message.reply_text("Usage: `/requestaccess`", parse_mode="Markdown") @@ -86,21 +160,29 @@ async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE): except Exception as e: print(f"Failed to notify {admin_id} that guest {user_id} requested access: {e}") -async def approve(update: Update, context: ContextTypes.DEFAULT_TYPE): - approver_id = str(update.effective_user.id) - if users.get_role(approver_id) != "admin": - return await update.message.reply_text("Only admins can approve access.") +async def handle_main_menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): + query = update.callback_query + data = query.data + if data == "open_gate_menu": + await open_gate_menu(update, context) + elif data == "request_access": + await requestaccess(update, context) + +async def grantaccess(update: Update, context: ContextTypes.DEFAULT_TYPE): + grantor_id = str(update.effective_user.id) + if users.get_role(grantor_id) != Role.ADMIN: + return await update.message.reply_text("Only admins can grant access.") try: user_id = context.args[0] gate = context.args[1] expires_at = context.args[2] - users.grant_access(user_id, gate, datetime.fromisoformat(expires_at.replace("Z", "+00:00")), grantor_id=approver_id) - await update.message.reply_text(f"Access to {gate} granted to user {user_id}.") + users.grant_access(user_id, gate, datetime.fromisoformat(expires_at.replace("Z", "+00:00")), grantor_id=grantor_id) + await update.message.reply_text(f"Access to {gate} granted to user {user_id} until {expires_at}") try: await context.bot.send_message(chat_id=user_id, text=f"Access granted to gate {gate} up to {expires_at}") except Exception as e: - print(f"Failed to notify {user_id} that admin {approver_id} approved access for {gate} up to {expires_at}: {e}") - except: + print(f"Failed to notify {user_id} that admin {grantor_id} granted access for {gate} up to {expires_at}: {e}") + except Exception: await update.message.reply_text("Usage: `/approve `") def main(): @@ -110,7 +192,9 @@ def main(): app.add_handler(CommandHandler("setcredentials", setcredentials)) app.add_handler(CommandHandler("opengate", opengate)) app.add_handler(CommandHandler("requestaccess", requestaccess)) - app.add_handler(CommandHandler("approve", approve)) + app.add_handler(CommandHandler("grantaccess", grantaccess)) + app.add_handler(CallbackQueryHandler(handle_main_menu_callback, pattern="^(open_gate_menu|request_access)$")) + app.add_handler(CallbackQueryHandler(handle_gate_open_callback, pattern="^opengate_")) app.run_polling() if __name__ == "__main__": diff --git a/commons.py b/commons.py index 094dbc2..0488556 100644 --- a/commons.py +++ b/commons.py @@ -6,27 +6,12 @@ class Status(Enum): class Credential: def __init__(self, username: str, password: str): - self._username: str = username - self._password: str = password - - def __dict__(self): - return { - "username": self._username, - "password": self._password - } - - @property - def username(self) -> str: - return self._username - - @username.setter - def username(self, username: str): - self._username = username - - @property - def password(self) -> str: - return self._password - - @password.setter - def password(self, password: str): - self._password = password \ No newline at end of file + self.username = username + self.password = password + + def to_dict(self) -> dict: + return {"username": self.username, "password": self.password} + + @classmethod + def from_dict(cls, data: dict): + return cls(data.get("username", ""), data.get("password", "")) \ No newline at end of file diff --git a/config.py b/config.py index a5dac82..645759d 100644 --- a/config.py +++ b/config.py @@ -2,18 +2,16 @@ import json from telegram import BotCommand class BotConfig: - def __init__(self, bot_username: str, json_path: str = "./data/config.json"): - self._json_path: str = json_path - self._bot_username: str = bot_username + self._json_path = json_path + self._bot_username = bot_username self._token: str = "" self._name: str = "" self._description: str = "" self._short_description: str = "" self._commands: list[BotCommand] = [] - self._load_config() - + def _load_config(self): try: with open(self._json_path, "r") as f: @@ -27,24 +25,24 @@ class BotConfig: for command, description in config.get("commands", {}).items() ] except Exception: - return {} + pass @property def token(self) -> str: return self._token - + @property def name(self) -> str: return self._name - + @property def description(self) -> str: return self._description - + @property def short_description(self) -> str: return self._short_description - + @property def commands(self) -> list[BotCommand]: return self._commands \ No newline at end of file diff --git a/gates.py b/gates.py index c1fe752..813d1e4 100644 --- a/gates.py +++ b/gates.py @@ -2,47 +2,37 @@ import json from avconnect import AVConnectAPI from commons import Status, Credential -class _Gate: - def __init__(self, id: str, name: str, status: int | Status = Status.ENABLED): - self._id: str = id - self._name: str = name - self._status: Status = status if isinstance(status, Status) else Status(status) - - @property - def id(self) -> str: - return self._id - - @property - def name(self) -> str: - return self._name - - @property - def status(self) -> Status: - return self._status - - @status.setter - def status(self, status: int | Status): - self._status = status if isinstance(status, Status) else Status(status) +class Gate: + def __init__(self, id: str, name: str, status: Status = Status.ENABLED): + self.id = id + self.name = name + self.status = status if isinstance(status, Status) else Status(status) + + def to_dict(self): + return {"id": self.id, "name": self.name, "status": self.status.value} + + @classmethod + def from_dict(cls, data: dict): + return cls(data["id"], data["name"], Status(data.get("status", Status.ENABLED))) class Gates: - def __init__(self, json_path: str = "./data/gates.json"): self._json_path: str = json_path - self._gates: dict[str, _Gate] = self._load_gates() + self._gates: dict[str, Gate] = self._load_gates() - def _load_gates(self) -> dict[str, _Gate]: + def _load_gates(self) -> dict[str, Gate]: try: with open(self._json_path, "r") as file: gates_data = json.load(file) - return {gate: _Gate(data["id"], data["name"]) for gate, data in gates_data.items()} + return {gate: Gate.from_dict(data) for gate, data in gates_data.items()} except Exception: return {} - def get_name(self, gate: str) -> str: - return self._gates.get(gate, {}).name + def get_name(self, gate: str) -> str | None: + return self._gates[gate].name if gate in self._gates else None def open_gate(self, gate: str, credentials: Credential) -> bool: - if gate not in self._gates.keys(): + if gate not in self._gates: return False if self._gates[gate].status == Status.DISABLED: return False diff --git a/users.py b/users.py index 57f43ee..5799623 100644 --- a/users.py +++ b/users.py @@ -8,221 +8,190 @@ class Role(Enum): MEMBER = "member" GUEST = "guest" -class _Grant: - def __init__(self, grantor: str, expires_at: datetime, granted_at: datetime = datetime.now(), last_used_at: datetime = None, status: Status = Status.ENABLED): - self._grantor: str = grantor - self._granted_at: datetime = granted_at - self._expires_at = expires_at - self._last_used_at = last_used_at - self._status: Status = status - - def __dict__(self): +class Grant: + def __init__( + self, + grantor: str, + expires_at: datetime, + granted_at: datetime = None, + last_used_at: datetime = None, + status: Status = Status.ENABLED + ): + self.grantor = grantor + self.granted_at = granted_at or datetime.now(timezone.utc) + self.expires_at = expires_at + self.last_used_at = last_used_at + self.status = status + + def to_dict(self) -> dict: return { - "grantor": self._grantor, - "granted_at": self._granted_at.isoformat(), - "expires_at": self._expires_at.isoformat(), - "last_used_at": self._last_used_at.isoformat() if self._last_used_at else None, - "status": self._status.value + "grantor": self.grantor, + "granted_at": self.granted_at.isoformat(), + "expires_at": self.expires_at.isoformat(), + "last_used_at": self.last_used_at.isoformat() if self.last_used_at else None, + "status": self.status.value } - @property - def grantor(self) -> str: - return self._grantor - - @grantor.setter - def grantor(self, grantor: str): - self._grantor = grantor - - @property - def granted_at(self) -> datetime: - return self._granted_at - - @property - def expires_at(self) -> datetime: - return self._expires_at - - @property - def last_used_at(self) -> datetime: - return self._last_used_at - - @property - def status(self) -> Status: - return self._status + @classmethod + def from_dict(cls, data: dict): + return cls( + grantor=data.get("grantor", ""), + expires_at=datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")), + granted_at=datetime.fromisoformat(data["granted_at"].replace("Z", "+00:00")) if data.get("granted_at") else None, + last_used_at=datetime.fromisoformat(data["last_used_at"].replace("Z", "+00:00")) if data.get("last_used_at") else None, + status=Status(data.get("status", Status.ENABLED)) + ) -class _User: - def __init__(self, id: str, username: str, fullname: str, role: Role = Role.GUEST, credentials: Credential = Credential("", ""), grants: dict = None, status: Status = Status.ENABLED): - self._id: str = id - self._username: str = username - self._fullname: str = fullname - self._role: Role = role if isinstance(role, Role) else Role(role) - self._credentials: Credential = credentials - self._grants: dict[str, _Grant] = {gate:_Grant(grant.get("grantor", ""), datetime.fromisoformat(grant.get("expires_at").replace("Z", "+00:00"))) for gate, grant in grants.items()} if grants else {} - self._status: Status = status - - def __dict__(self): +class User: + def __init__( + self, + id: str, + username: str, + fullname: str, + role: Role = Role.GUEST, + credentials: Credential = None, + grants: dict[str, Grant] = None, + status: Status = Status.ENABLED + ): + self.id = id + self.username = username + self.fullname = fullname + self.role = role if isinstance(role, Role) else Role(role) + self.credentials = credentials or Credential("", "") + self.grants = grants or {} + self.status = status if isinstance(status, Status) else Status(status) + + def to_dict(self) -> dict: return { - "id": self._id, - "username": self._username, - "fullname": self._fullname, - "role": self._role.value, - "credentials": self._credentials.__dict__(), - "grants": {gate: grant.__dict__() for gate, grant in self._grants.items()}, - "status": self._status.value + "id": self.id, + "username": self.username, + "fullname": self.fullname, + "role": self.role.value, + "credentials": self.credentials.to_dict(), + "grants": {gate: grant.to_dict() for gate, grant in self.grants.items()}, + "status": self.status.value } - - @property - def id(self) -> str: - return self._id - - @property - def username(self) -> str: - return self._username - - @username.setter - def username(self, username: str): - self._username = username - - @property - def fullname(self) -> str: - return self._fullname - - @fullname.setter - def fullname(self, fullname: str): - self._fullname = fullname - - @property - def role(self) -> Role: - return self._role - - @role.setter - def role(self, role: str | Role): - self._role = role if isinstance(role, Role) else Role(role) - - @property - def credentials(self) -> Credential: - return self._credentials - - @credentials.setter - def credentials(self, credentials: Credential): - self._credentials = credentials - - @property - def grants(self) -> dict[str, _Grant]: - return self._grants - - @property - def status(self) -> Status: - return self._status - - @status.setter - def status(self, status: int | Status): - self._status = status if isinstance(status, Status) else Status(status) - - def grant(self, gate: str, grant: _Grant): - self._grants[gate] = grant - - def revoke(self, gate: str): - self._grants.pop(gate, None) + @classmethod + def from_dict(cls, id: str, data: dict): + credentials = Credential.from_dict(data.get("credentials", {})) + grants = {gate: Grant.from_dict(grant) for gate, grant in data.get("grants", {}).items()} + return cls( + id=id, + username=data.get("username", ""), + fullname=data.get("fullname", ""), + role=Role(data.get("role", Role.GUEST)), + credentials=credentials, + grants=grants, + status=Status(data.get("status", Status.ENABLED)) + ) class Users: def __init__(self, json_path: str = "./data/users.json"): - self._json_path: str = json_path - self._users = self._load_users() + self._json_path = json_path + self._users: dict[str, User] = self._load_users() - def _load_users(self) -> dict[str, _User]: + def _load_users(self) -> dict[str, User]: try: with open(self._json_path, "r") as f: - users = {} - for uid, info in json.load(f).items(): - try: - user = _User( - id=uid, - username=info.get("username", ""), - fullname=info.get("fullname", ""), - role=info.get("role", Role.GUEST), - credentials=Credential(info.get("credentials", {}).get("username", ""), info.get("credentials", {}).get("password", "")), - grants={gate: info for gate, info in info.get("grants", {}).items()}, - status=info.get("status", Status.ENABLED) - ) - users[uid] = user - except: - continue - return users + data = json.load(f) + return {uid: User.from_dict(uid, info) for uid, info in data.items()} except Exception: return {} def _save_users(self) -> None: with open(self._json_path, "w") as f: - json.dump({uid: user.__dict__ for uid, user in self._users.items()}, f, default=str) - + json.dump({uid: user.to_dict() for uid, user in self._users.items()}, f, indent=2) + def update_user(self, id: str, username: str, fullname: str) -> bool: - if id in self._users.keys(): + if not id or not username or not fullname: + return False + if id in self._users: self._users[id].username = username self._users[id].fullname = fullname else: - self._users[id] = _User(id, username, fullname) + self._users[id] = User(id, username, fullname) self._save_users() return True - - def get_username(self, id: str) -> str: - if id in self._users.keys(): - return self._users[id].username - return None - - def get_fullname(self, id: str) -> str: - if id in self._users.keys(): - return self._users[id].fullname - return None - - def get_status(self, id: str) -> Status: - if id in self._users.keys(): - return self._users[id].status - return Status.DISABLED - - def get_role(self, id: str) -> Role: - if id in self._users.keys(): - return self._users[id].role - return Role.GUEST - def get_credentials(self, id: str) -> Credential: - if id in self._users.keys(): - return self._users[id].credentials - return None - + def get_username(self, id: str) -> str | None: + return self._users[id].username if id in self._users else None + + def get_fullname(self, id: str) -> str | None: + return self._users[id].fullname if id in self._users else None + + def get_status(self, id: str) -> Status: + return self._users[id].status if id in self._users else Status.DISABLED + + def get_role(self, id: str) -> Role: + return self._users[id].role if id in self._users else Role.GUEST + + def get_credentials(self, id: str) -> Credential | None: + return self._users[id].credentials if id in self._users else None + def set_credentials(self, id: str, credentials: Credential) -> bool: - if id in self._users.keys(): + if id in self._users: self._users[id].credentials = credentials + self._save_users() return True return False - + def can_open_gate(self, id: str, gate: str) -> bool: - if id in self._users.keys(): - if gate in self._users[id].grants.keys(): - if self._users[id].grants[gate].status == Status.ENABLED: - if self._users[id].grants[gate].expires_at > datetime.now(timezone.utc): - return True - return False - - def get_grantor(self, id: str, gate: str) -> str: - if id in self._users.keys(): - if gate in self._users[id].grants.keys(): - return self._users[id].grants[gate].grantor - return None + user = self._users.get(id) + if not user or user.status != Status.ENABLED: + return False + if user.role == Role.ADMIN or user.role == Role.MEMBER: + return True + grant = user.grants.get(gate) + if not grant or grant.status != Status.ENABLED: + return False + if grant.expires_at <= datetime.now(timezone.utc): + return False + return True + + def has_grants(self, id: str) -> bool: + user = self._users.get(id) + if not user or user.status != Status.ENABLED: + return False + if user.role == Role.ADMIN or user.role == Role.MEMBER: + return True + return any(grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc) for grant in user.grants.values()) + + def get_grantor(self, id: str, gate: str) -> str | None: + user = self._users.get(id) + if not user: + return None + grant = user.grants.get(gate) + return grant.grantor if grant else None def get_admins(self) -> list[str]: return [uid for uid, user in self._users.items() if user.role == Role.ADMIN] - + def grant_access(self, id: str, gate: str, expires_at: datetime, grantor_id: str) -> bool: - if id in self._users.keys(): - self._users[id].grant(gate, _Grant(grantor_id, expires_at)) - self._save_users() - return True - return False - + user = self._users.get(id) + if not user: + return False + user.grants[gate] = Grant(grantor_id, expires_at) + self._save_users() + return True + def revoke_access(self, id: str, gate: str) -> bool: - if id in self._users.keys(): - self._users[id].revoke(gate) + user = self._users.get(id) + if not user: + return False + if gate in user.grants: + del user.grants[gate] self._save_users() return True return False + + def update_grant_last_used(self, user_id: str, gate: str) -> bool: + user = self._users.get(user_id) + if not user: + return False + grant = user.grants.get(gate) + if not grant: + return False + grant.last_used_at = datetime.now(timezone.utc) + self._save_users() + return True