diff --git a/access_control.py b/access_control.py index ad4d95a..50ba8b1 100644 --- a/access_control.py +++ b/access_control.py @@ -1,40 +1,45 @@ import json from datetime import datetime, timezone -ACCESS_FILE = "./data/access.json" +class AccessControl: + _ACCESS_FILE = "./data/access.json" -def load_access(): - try: - with open(ACCESS_FILE, "r") as f: - return json.load(f) - except FileNotFoundError: - return {} + def __init__(self): + self._access = self._load_access() -def save_access(data): - with open(ACCESS_FILE, "w") as f: - json.dump(data, f) + def _load_access(self) -> dict: + try: + with open(self._ACCESS_FILE, "r") as f: + return json.load(f) + except FileNotFoundError: + return {} + + def _save_access(self): + with open(self._ACCESS_FILE, "w") as f: + json.dump(self._access, f) -def get_grantor(user_id, gate): - access = load_access().get(str(user_id), {}) - entry = access.get(gate) or access.get("all") - return entry.get("grantor", "") + def get_grantor(self, user_id: str, gate: str) -> str: + access = self._access.get(user_id, {}) + entry = access.get(gate) or access.get("all") + return entry.get("grantor", "") -def can_open_gate(user_id, gate): - access = load_access().get(str(user_id), {}) - entry = access.get(gate) or access.get("all") - if not entry or entry["type"] != "timed": + def can_open_gate(self, user_id: str, gate: str) -> bool: + access = self._access.get(user_id, {}) + entry = access.get(gate) or access.get("all") + if not entry or entry["type"] != "timed": + return False + if datetime.now(timezone.utc) < datetime.fromisoformat(entry["expires_at"].replace("Z", "+00:00")): + return True return False - if datetime.now(timezone.utc) < datetime.fromisoformat(entry["expires_at"].replace("Z", "+00:00")): - return True - return False -def grant_access(user_id, gate, expires_at, grantor_id): - access = load_access() - user_access = access.get(str(user_id), {}) - user_access[gate] = { - "type": "timed", - "expires_at": expires_at, - "grantor": grantor_id - } - access[str(user_id)] = user_access - save_access(access) + def grant_access(self, user_id: str, gate: str, expires_at: str, grantor_id: str): + user_access = self._access.get(user_id, {}) + user_access[gate] = { + "type": "timed", + "granted_at": datetime.now(timezone.utc).isoformat(), + "expires_at": expires_at, + "grantor": grantor_id, + "last_used_at": None + } + self._access[user_id] = user_access + self._save_access() diff --git a/avconnect.py b/avconnect.py index efa35e8..d2e7963 100644 --- a/avconnect.py +++ b/avconnect.py @@ -1,42 +1,46 @@ import requests import pprint - -BASE_URL = "https://www.avconnect.it" +from fake_useragent import UserAgent +from commons import Credential class AVConnectAPI: - def __init__(self, username: str, password: str): - self.username = username - self.password = password - self.session = requests.Session() - self.authenticated = False + _BASE_URL = "https://www.avconnect.it" - def authenticate(self) -> bool: - login_url = f"{BASE_URL}/loginone.php" + def __init__(self, credentials: Credential): + self._ua = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random + self._username = credentials.username + self._password = credentials.password + self._session = requests.Session() + self._authenticated = False + + def _authenticate(self) -> bool: + login_url = f"{self._BASE_URL}/loginone.php" headers = { "Content-Type": "application/x-www-form-urlencoded" } - payload = f"userid={self.username}&password={self.password}&entra=Login" - response = self.session.post(login_url, data=payload, headers=headers) + payload = f"userid={self._username}&password={self._password}&entra=Login" + response = self._session.post(login_url, data=payload, headers=headers) - if response.ok and "PHPSESSID" in self.session.cookies: - self.authenticated = True + if response.ok and "PHPSESSID" in self._session.cookies: + self._authenticated = True return True return False - def exec_gate_macro(self, id_macro) -> requests.Response: - if not self.authenticated and not self.authenticate(): + def exec_gate_macro(self, id_macro) -> bool: + if not self._authenticated and not self._authenticate(): raise Exception("Authentication failed.") - exec_url = f"{BASE_URL}/exemacrocom.php" + exec_url = f"{self._BASE_URL}/exemacrocom.php" headers = { + "User-Agent": self._ua, "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8" } payload = f"idmacrocom={id_macro}&nome=16" - response = self.session.prepare_request(requests.Request("POST", exec_url, data=payload, headers=headers)) + response = self._session.prepare_request(requests.Request("POST", exec_url, data=payload, headers=headers)) pprint.pprint(response.headers) return True - #response = self.session.post(exec_url, data=payload, headers=headers) + #response = self._session.post(exec_url, data=payload, headers=headers) #if response.ok: # return True #return False \ No newline at end of file diff --git a/bot.py b/bot.py index 91254c2..b363762 100644 --- a/bot.py +++ b/bot.py @@ -1,38 +1,42 @@ from telegram import Update from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes -from config import get_bot_token, get_commands -from access_control import can_open_gate, grant_access, get_grantor -from gates import get_name, open_gate -from users import get_role, set_credentials, get_credentials, get_grantor_credentials, get_admins, update_user, get_fullname, get_username +from config import BotConfig +from gates import Gates +from users import Users, Credential, Role -BOT_NAME = "lagomareGateKeeperBot" +BOT_USERNAME = "lagomareGateKeeperBot" +bot_config = BotConfig(BOT_USERNAME) +gates = Gates() +users = Users() async def post_init(application: Application) -> None: - bot_commands = get_commands(BOT_NAME) + bot_commands = bot_config.commands await application.bot.set_my_commands(bot_commands) - await application.bot.set_my_name("Lagomare GateKeeper Bot") - await application.bot.set_my_description("This bot is used to open Lagomare gates by members. You can also request timed access if you are a guest.") - await application.bot.set_my_short_description("Open Lagomare gates or request guest access") + await application.bot.set_my_name(bot_config.name) + await application.bot.set_my_description(bot_config.description) + await application.bot.set_my_short_description(bot_config.short_description) async def updateuser(update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = str(update.effective_user.id) username = update.effective_user.username fullname = update.effective_user.full_name - update_user(user_id, username, fullname) + users.update_user(user_id, username, fullname) async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): - await update.message.reply_text("Welcome to GatekeeperBot! Use `/setcredentials` to configure your access.") + await update.message.reply_text("Welcome to GatekeeperBot! Use `/setcredentials` to configure your access") async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = str(update.effective_user.id) args = context.args if len(args) != 2: return await update.message.reply_text("Usage: `/setcredentials `") - role = get_role(user_id) + role = users.get_role(user_id) if role not in ("admin", "member"): - return await update.message.reply_text("Only members or admins can set credentials.") - set_credentials(user_id, args[0], args[1]) - await update.message.reply_text("Credentials saved.") + return await update.message.reply_text("Only members or admins can set credentials") + if users.set_credentials(user_id, Credential(args[0], args[1])): + await update.message.reply_text("Credentials saved") + else: + await update.message.reply_text("Error saving credentials") async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = str(update.effective_user.id) @@ -40,55 +44,57 @@ async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE): if not args: return await update.message.reply_text("Usage: `/opengate `") gate = args[0] - role = get_role(user_id) - if role in ("admin", "member"): - creds = get_credentials(user_id) + gate_name = gates.get_name(gate) + role = users.get_role(user_id) + if role in (Role.ADMIN, Role.MEMBER): + creds = users.get_credentials(user_id) if not creds: - return await update.message.reply_text("Please set your credentials with `/setcredentials` first.") - elif role == "guest" and can_open_gate(user_id, gate): - creds = get_grantor_credentials(user_id, gate) + return await update.message.reply_text("Please set your credentials with `/setcredentials` first") + elif role == Role.GUEST and users.can_open_gate(user_id, gate): + grantor = users.get_grantor(user_id, gate) + creds = users.get_credentials(grantor) + if not grantor: + return await update.message.reply_text("No valid grantor available.") if not creds: return await update.message.reply_text("No valid grantor credentials available.") - grantor = get_grantor(user_id, gate) - if grantor: - try: - await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {get_name(gate)}") - except Exception as e: - print(f"Failed to notify {grantor} that guest {user_id} opened {get_name(gate)}: {e}") + #TODO: update guest last_used_at + try: + await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}") + except Exception as e: + print(f"Failed to notify {grantor} that guest {user_id} opened {gate_name}: {e}") else: return await update.message.reply_text("Access denied.") - if open_gate(gate, creds): - return await update.message.reply_text(f"Gate {get_name(gate)} opened!") - await update.message.reply_text(f"ERROR: Cannot open gate {get_name(gate)}") + if gates.open_gate(gate, creds): + return await update.message.reply_text(f"Gate {gate_name} opened!") + await update.message.reply_text(f"ERROR: Cannot open gate {gate_name}") async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = str(update.effective_user.id) - role = get_role(user_id) + role = users.get_role(user_id) if role not in ("guest"): return await update.message.reply_text("Only guests can request access.") if not context.args: - return await update.message.reply_text("Usage: `/requestaccess `") - gate = context.args[0] - requester = get_fullname(user_id) or get_username(user_id) - text = (f"Access request: {requester} ({user_id}) requests access to *{gate}*.\nUse `/approve {user_id} {gate} 60` to grant access.") + return await update.message.reply_text("Usage: `/requestaccess`", parse_mode="Markdown") + requester = users.get_fullname(user_id) or users.get_username(user_id) + text = (f"Access request: {requester} ({user_id}) requests access.\nUse `/approve {user_id} YYYY-MM-DDTHH:MM:SSZ` to grant access.") await update.message.reply_text("Your request has been submitted.") - admins = get_admins() + admins = users.get_admins() for admin_id in admins: try: await context.bot.send_message(chat_id=admin_id, text=text, parse_mode="Markdown") except Exception as e: - print(f"Failed to notify {admin_id} that guest {user_id} requested access for {gate}: {e}") + print(f"Failed to notify {admin_id} that guest {user_id} requested access: {e}") async def approve(update: Update, context: ContextTypes.DEFAULT_TYPE): approver_id = str(update.effective_user.id) - if get_role(approver_id) != "admin": + if users.get_role(approver_id) != "admin": return await update.message.reply_text("Only admins can approve access.") try: user_id = context.args[0] gate = context.args[1] expires_at = context.args[2] - grant_access(user_id, gate, expires_at, grantor_id=approver_id) + users.grant_access(user_id, gate, expires_at, grantor_id=approver_id) await update.message.reply_text(f"Access to {gate} granted to user {user_id}.") try: await context.bot.send_message(chat_id=user_id, text=f"Access granted to gate {gate} up to {expires_at}") @@ -98,7 +104,7 @@ async def approve(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("Usage: `/approve `") def main(): - app = Application.builder().token(get_bot_token(BOT_NAME)).post_init(post_init).build() + app = Application.builder().token(bot_config.token).post_init(post_init).build() app.add_handler(MessageHandler(None, updateuser), group=1) app.add_handler(CommandHandler("start", start)) app.add_handler(CommandHandler("setcredentials", setcredentials)) diff --git a/commons.py b/commons.py new file mode 100644 index 0000000..094dbc2 --- /dev/null +++ b/commons.py @@ -0,0 +1,32 @@ +from enum import Enum + +class Status(Enum): + ENABLED = 1 + DISABLED = 0 + +class Credential: + def __init__(self, username: str, password: str): + self._username: str = username + self._password: str = password + + def __dict__(self): + return { + "username": self._username, + "password": self._password + } + + @property + def username(self) -> str: + return self._username + + @username.setter + def username(self, username: str): + self._username = username + + @property + def password(self) -> str: + return self._password + + @password.setter + def password(self, password: str): + self._password = password \ No newline at end of file diff --git a/config.py b/config.py index 425d111..a5dac82 100644 --- a/config.py +++ b/config.py @@ -1,21 +1,50 @@ import json from telegram import BotCommand -ACCESS_FILE = "./data/config.json" +class BotConfig: -def load_config(): - try: - with open(ACCESS_FILE, "r") as f: - return json.load(f) - except FileNotFoundError: - return {} + def __init__(self, bot_username: str, json_path: str = "./data/config.json"): + self._json_path: str = json_path + self._bot_username: str = bot_username + self._token: str = "" + self._name: str = "" + self._description: str = "" + self._short_description: str = "" + self._commands: list[BotCommand] = [] -def get_bot_token(bot_name): - config = load_config().get(str(bot_name), {}) - return config.get("token", "") + self._load_config() + + def _load_config(self): + try: + with open(self._json_path, "r") as f: + config = json.load(f).get(self._bot_username, {}) + self._token = config.get("token", "") + self._name = config.get("name", "") + self._description = config.get("description", "") + self._short_description = config.get("short_description", "") + self._commands = [ + BotCommand(command, description) + for command, description in config.get("commands", {}).items() + ] + except Exception: + return {} -def get_commands(bot_name): - config = load_config().get(str(bot_name), {}) - commands = config.get("commands", {}) - bot_commands = [] - return [BotCommand(command, description) for command, description in commands.items()] \ No newline at end of file + @property + def token(self) -> str: + return self._token + + @property + def name(self) -> str: + return self._name + + @property + def description(self) -> str: + return self._description + + @property + def short_description(self) -> str: + return self._short_description + + @property + def commands(self) -> list[BotCommand]: + return self._commands \ No newline at end of file diff --git a/gates.py b/gates.py index 5b18783..9cd1592 100644 --- a/gates.py +++ b/gates.py @@ -1,27 +1,54 @@ import json from avconnect import AVConnectAPI +from commons import Status, Credential -GATE_FILE = "./data/gates.json" +class _Gate: + def __init__(self, id: str, name: str, status: int | Status = Status.ENABLED): + self._id: str = id + self._name: str = name + self._status: Status = status if isinstance(status, Status) else Status(status) + + @property + def id(self) -> str: + return self._id + + @property + def name(self) -> str: + return self._name + + @property + def status(self) -> Status: + return self._status + + @status.setter + def status(self, status: int | Status): + self._status = status if isinstance(status, Status) else Status(status) -def load_gates(): - try: - with open(GATE_FILE, "r") as f: - return json.load(f) - except FileNotFoundError: - return {} +class Gates: -def get_name(gate): - return load_gates().get(str(gate), {}).get("name", "") + def __init__(self, json_path: str = "./data/gates.json"): + self._json_path: str = json_path + self._gates: dict[str, _Gate] = self._load_gates() -def open_gate(gate, credentials): - gate_info = load_gates().get(str(gate), {}) - if not gate_info: - return False - gate_id = gate_info["id"] + def _load_gates(self): + try: + with open(self._json_path, "r") as file: + gates_data = json.load(file) + return {gate: _Gate(data["id"], data["name"]) for gate, data in gates_data.items()} + except Exception: + return {} - try: - api = AVConnectAPI(credentials["username"], credentials["password"]) - return api.exec_gate_macro(gate_id) - except Exception as e: - print(f"Failed to open gate {gate}: {e}") - return False + def get_name(self, gate: str) -> str: + return self._gates.get(gate, {}).name + + def open_gate(self, gate: str, credentials: Credential) -> bool: + if gate not in self._gates.keys(): + return False + if self._gates[gate].status == Status.DISABLED: + return False + try: + api = AVConnectAPI(credentials) + return api.exec_gate_macro(self._gates[gate].id) + except Exception as e: + print(f"Failed to open gate {gate}: {e}") + return False diff --git a/requirements.txt b/requirements.txt index 5050803..6379964 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -python-telegram-bot==20.7 \ No newline at end of file +python-telegram-bot==20.7 +fake-useragent==2.2.0 \ No newline at end of file diff --git a/users.py b/users.py index dddcf5d..7e7e187 100644 --- a/users.py +++ b/users.py @@ -1,65 +1,213 @@ import json +from enum import Enum +from datetime import datetime, timezone +from commons import Status, Credential -USER_FILE = "./data/users.json" +class Role(Enum): + ADMIN = "admin" + MEMBER = "member" + GUEST = "guest" -def load_users(): - try: - with open(USER_FILE, "r") as f: - return json.load(f) - except FileNotFoundError: - return {} - -def save_users(data): - with open(USER_FILE, "w") as f: - json.dump(data, f) - -def update_user(user_id, username, fullname): - has_changes = False - users = load_users() - user = users.get(str(user_id), {}) +class _Grant: + def __init__(self, grantor: str, expires_at: datetime, granted_at: datetime = datetime.now(), last_used_at: datetime = None, status: int | Status = Status.ENABLED): + self._grantor: str = grantor + self._granted_at: datetime = granted_at + self._expires_at = expires_at + self._last_used_at = last_used_at + self._status: Status = status - if user.get("username", "") != username: - has_changes = True - user["username"] = username - if user.get("fullname", "") != fullname: - has_changes = True - user["fullname"] = fullname - if not user.get("role"): - has_changes = True - user["role"] = "guest" + def __dict__(self): + return { + "grantor": self._grantor, + "granted_at": self._granted_at.isoformat(), + "expires_at": self._expires_at.isoformat(), + "last_used_at": self._last_used_at.isoformat() if self._last_used_at else None, + "status": self._status.value + } + + @property + def grantor(self) -> str: + return self._grantor - if has_changes: - users[str(user_id)] = user - save_users(users) + @grantor.setter + def grantor(self, grantor: str): + self._grantor = grantor + + @property + def granted_at(self) -> datetime: + return self._granted_at + + @property + def expires_at(self) -> datetime: + return self._expires_at + + @property + def last_used_at(self) -> datetime: + return self._last_used_at + + @property + def status(self) -> Status: + return self._status + +class _User: + def __init__(self, id: str, username: str, fullname: str, role: str | Role = Role.GUEST, credentials: dict = None, grants: dict = None, status: int | Status = Status.ENABLED): + self._id: str = id + self._username: str = username + self._fullname: str = fullname + self._role: Role = role if isinstance(role, Role) else Role(role) + self._credentials: Credential = Credential(**credentials) + self._grants: dict[str, _Grant] = {gate: _Grant(**grant) for gate, grant in grants.items()} if grants else {} + self._status: Status = status if isinstance(status, Status) else Status(status) + + def __dict__(self): + return { + "id": self._id, + "username": self._username, + "fullname": self._fullname, + "role": self._role.value, + "credentials": self._credentials.__dict__(), + "grants": {gate: grant.__dict__() for gate, grant in self._grants.items()}, + "status": self._status.value + } + + @property + def id(self) -> str: + return self._id + + @property + def username(self) -> str: + return self._username + + @username.setter + def username(self, username: str): + self._username = username + + @property + def fullname(self) -> str: + return self._fullname + + @fullname.setter + def fullname(self, fullname: str): + self._fullname = fullname + + @property + def role(self) -> Role: + return self._role + + @role.setter + def role(self, role: str | Role): + self._role = role if isinstance(role, Role) else Role(role) + + @property + def credentials(self) -> Credential: + return self._credentials + + @credentials.setter + def credentials(self, credentials: Credential): + self._credentials = credentials + + @property + def grants(self) -> dict[str, _Grant]: + return self._grants + + @property + def status(self) -> Status: + return self._status + + @status.setter + def status(self, status: int | Status): + self._status = status if isinstance(status, Status) else Status(status) + + def grant(self, gate: str, grant: _Grant): + self._grants[gate] = grant + + def revoke(self, gate: str): + self._grants.pop(gate, None) -def get_role(user_id): - return load_users().get(str(user_id), {}).get("role", "guest") +class Users: + def __init__(self, json_path: str = "./data/users.json"): + self._json_path: str = json_path + self._users = self._load_users() -def get_username(user_id): - return load_users().get(str(user_id), {}).get("username", None) + def _load_users(self) -> dict[str, _User]: + try: + with open(self._json_path, "r") as f: + return {uid: _User(**info) for uid, info in json.load(f)} + except Exception: + return {} -def get_fullname(user_id): - return load_users().get(str(user_id), {}).get("fullname", None) - -def set_credentials(user_id, username, password): - users = load_users() - user = users.get(str(user_id), {}) - user["credentials"] = {"username": username, "password": password} - users[str(user_id)] = user - save_users(users) - -def get_credentials(user_id): - return load_users().get(str(user_id), {}).get("credentials") - -def get_grantor_credentials(user_id, gate): - from access_control import load_access - access = load_access().get(str(user_id), {}) - entry = access.get(gate) or access.get("all") - if not entry: + def _save_users(self) -> None: + with open(self._json_path, "w") as f: + json.dump({uid: user.__dict__ for uid, user in self._users.items()}, f, default=str) + + def update_user(self, id: str, username: str, fullname: str) -> bool: + if id in self._users.keys(): + self._users[id].username = username + self._users[id].fullname = fullname + else: + self._users[id] = _User(id, username, fullname) + self._save_users() + return True + + def get_username(self, id: str) -> str: + if id in self._users.keys(): + return self._users[id].username return None - grantor_id = entry.get("grantor") - return get_credentials(grantor_id) + + def get_fullname(self, id: str) -> str: + if id in self._users.keys(): + return self._users[id].fullname + return None + + def get_status(self, id: str) -> Status: + if id in self._users.keys(): + return self._users[id].status + return Status.DISABLED + + def get_role(self, id: str) -> Role: + if id in self._users.keys(): + return self._users[id].role + return Role.GUEST -def get_admins(): - return [uid for uid, u in load_users().items() if u.get("role") == "admin"] + def get_credentials(self, id: str) -> Credential: + if id in self._users.keys(): + return self._users[id].credentials + return None + + def set_credentials(self, id: str, credentials: Credential) -> bool: + if id in self._users.keys(): + self._users[id].credentials = credentials + return True + return False + + def can_open_gate(self, id: str, gate: str) -> bool: + if id in self._users.keys(): + if gate in self._users[id].grants.keys(): + if self._users[id].grants[gate].status == Status.ENABLED: + if self._users[id].grants[gate].expires_at > datetime.now(timezone.utc): + return True + return False + + def get_grantor(self, id: str, gate: str) -> str: + if id in self._users.keys(): + if gate in self._users[id].grants.keys(): + return self._users[id].grants[gate].grantor + return None + + def get_admins(self) -> list[str]: + return [uid for uid, user in self._users.items() if user.role == Role.ADMIN] + + def grant_access(self, id: str, gate: str, expires_at: datetime, grantor_id: str) -> bool: + if id in self._users.keys(): + self._users[id].grant(gate, _Grant(grantor_id, expires_at)) + self._save_users() + return True + return False + + def revoke_access(self, id: str, gate: str) -> bool: + if id in self._users.keys(): + self._users[id].revoke(gate) + self._save_users() + return True + return False