From cfbf71421f22c65b19fa1a49869015f9bde34f18 Mon Sep 17 00:00:00 2001 From: Ettore Dreucci Date: Fri, 9 May 2025 01:11:08 +0200 Subject: [PATCH] First commit. Lot to do... --- .gitignore | 1 + access_control.py | 40 +++++++++++++++++++ avconnect.py | 42 ++++++++++++++++++++ bot.py | 99 +++++++++++++++++++++++++++++++++++++++++++++++ config.py | 1 + gates.py | 27 +++++++++++++ requirements.txt | 1 + users.py | 59 ++++++++++++++++++++++++++++ 8 files changed, 270 insertions(+) create mode 100644 access_control.py create mode 100644 avconnect.py create mode 100644 bot.py create mode 100644 config.py create mode 100644 gates.py create mode 100644 requirements.txt create mode 100644 users.py diff --git a/.gitignore b/.gitignore index 7f7cccc..b1bf1a0 100644 --- a/.gitignore +++ b/.gitignore @@ -58,3 +58,4 @@ docs/_build/ # PyBuilder target/ +data/ diff --git a/access_control.py b/access_control.py new file mode 100644 index 0000000..ad4d95a --- /dev/null +++ b/access_control.py @@ -0,0 +1,40 @@ +import json +from datetime import datetime, timezone + +ACCESS_FILE = "./data/access.json" + +def load_access(): + try: + with open(ACCESS_FILE, "r") as f: + return json.load(f) + except FileNotFoundError: + return {} + +def save_access(data): + with open(ACCESS_FILE, "w") as f: + json.dump(data, f) + +def get_grantor(user_id, gate): + access = load_access().get(str(user_id), {}) + entry = access.get(gate) or access.get("all") + return entry.get("grantor", "") + +def can_open_gate(user_id, gate): + access = load_access().get(str(user_id), {}) + entry = access.get(gate) or access.get("all") + if not entry or entry["type"] != "timed": + return False + if datetime.now(timezone.utc) < datetime.fromisoformat(entry["expires_at"].replace("Z", "+00:00")): + return True + return False + +def grant_access(user_id, gate, expires_at, grantor_id): + access = load_access() + user_access = access.get(str(user_id), {}) + user_access[gate] = { + "type": "timed", + "expires_at": expires_at, + "grantor": grantor_id + } + access[str(user_id)] = user_access + save_access(access) diff --git a/avconnect.py b/avconnect.py new file mode 100644 index 0000000..efa35e8 --- /dev/null +++ b/avconnect.py @@ -0,0 +1,42 @@ +import requests +import pprint + +BASE_URL = "https://www.avconnect.it" + +class AVConnectAPI: + def __init__(self, username: str, password: str): + self.username = username + self.password = password + self.session = requests.Session() + self.authenticated = False + + def authenticate(self) -> bool: + login_url = f"{BASE_URL}/loginone.php" + headers = { + "Content-Type": "application/x-www-form-urlencoded" + } + payload = f"userid={self.username}&password={self.password}&entra=Login" + response = self.session.post(login_url, data=payload, headers=headers) + + if response.ok and "PHPSESSID" in self.session.cookies: + self.authenticated = True + return True + return False + + def exec_gate_macro(self, id_macro) -> requests.Response: + if not self.authenticated and not self.authenticate(): + raise Exception("Authentication failed.") + + exec_url = f"{BASE_URL}/exemacrocom.php" + headers = { + "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8" + } + payload = f"idmacrocom={id_macro}&nome=16" + + response = self.session.prepare_request(requests.Request("POST", exec_url, data=payload, headers=headers)) + pprint.pprint(response.headers) + return True + #response = self.session.post(exec_url, data=payload, headers=headers) + #if response.ok: + # return True + #return False \ No newline at end of file diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..be022e1 --- /dev/null +++ b/bot.py @@ -0,0 +1,99 @@ +from telegram import Update +from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes +from config import BOT_TOKEN +from access_control import can_open_gate, grant_access, get_grantor +from gates import get_name, open_gate +from users import get_role, set_credentials, get_credentials, get_grantor_credentials, get_admins, update_user + +async def updateuser(update: Update, context: ContextTypes.DEFAULT_TYPE): + user_id = str(update.effective_user.id) + username = update.effective_user.username + fullname = update.effective_user.full_name + update_user(user_id, username, fullname) + +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): + await update.message.reply_text("Welcome to GatekeeperBot! Use `/setcredentials` to configure your access.") + +async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE): + user_id = str(update.effective_user.id) + args = context.args + if len(args) != 2: + return await update.message.reply_text("Usage: `/setcredentials `") + role = get_role(user_id) + if role not in ("admin", "member"): + return await update.message.reply_text("Only members or admins can set credentials.") + set_credentials(user_id, args[0], args[1]) + await update.message.reply_text("Credentials saved.") + +async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE): + user_id = str(update.effective_user.id) + args = context.args + if not args: + return await update.message.reply_text("Usage: `/opengate `") + gate = args[0] + role = get_role(user_id) + if role in ("admin", "member"): + creds = get_credentials(user_id) + if not creds: + return await update.message.reply_text("Please set your credentials with `/setcredentials` first.") + elif role == "guest" and can_open_gate(user_id, gate): + creds = get_grantor_credentials(user_id, gate) + if not creds: + return await update.message.reply_text("No valid grantor credentials available.") + grantor = get_grantor(user_id, gate) + if grantor: + try: + await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {get_name(gate)}") + except Exception as e: + print(f"Failed to notify {grantor} that guest {user_id} opened {get_name(gate)}: {e}") + else: + return await update.message.reply_text("Access denied.") + + if open_gate(gate, creds): + return await update.message.reply_text(f"Gate {get_name(gate)} opened!") + await update.message.reply_text(f"ERROR: Cannot open gate {get_name(gate)}") + +async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE): + user_id = str(update.effective_user.id) + role = get_role(user_id) + if role not in ("guest"): + return await update.message.reply_text("Only guests can request access.") + if not context.args: + return await update.message.reply_text("Usage: `/requestaccess `") + gate = context.args[0] + requester = update.effective_user.username or update.effective_user.full_name + text = (f"Access request: @{requester} ({user_id}) requests access to *{gate}*.\nUse `/approve {user_id} {gate} 60` to grant access.") + await update.message.reply_text("Your request has been submitted.") + admins = get_admins() + for admin_id in admins: + try: + await context.bot.send_message(chat_id=admin_id, text=text, parse_mode="Markdown") + except: + pass + +async def approve(update: Update, context: ContextTypes.DEFAULT_TYPE): + approver_id = str(update.effective_user.id) + if get_role(approver_id) != "admin": + return await update.message.reply_text("Only admins can approve access.") + try: + user_id = context.args[0] + gate = context.args[1] + expires_at = context.args[2] + grant_access(user_id, gate, expires_at, grantor_id=approver_id) + await update.message.reply_text(f"Access to {gate} granted for user {user_id}.") + #TODO: notify guest of granted access + except: + await update.message.reply_text("Usage: `/approve `") + +def main(): + app = Application.builder().token(BOT_TOKEN).build() + app.add_handler(MessageHandler(None, updateuser), group=1) + app.add_handler(CommandHandler("start", start)) + app.add_handler(CommandHandler("setcredentials", setcredentials)) + app.add_handler(CommandHandler("opengate", opengate)) + app.add_handler(CommandHandler("requestaccess", requestaccess)) + app.add_handler(CommandHandler("approve", approve)) + app.run_polling() + +if __name__ == "__main__": + main() diff --git a/config.py b/config.py new file mode 100644 index 0000000..9efe429 --- /dev/null +++ b/config.py @@ -0,0 +1 @@ +BOT_TOKEN = "7548174804:AAF3da0PZR47RzOF4llQEJszMD5JjVdS8zI" \ No newline at end of file diff --git a/gates.py b/gates.py new file mode 100644 index 0000000..5b18783 --- /dev/null +++ b/gates.py @@ -0,0 +1,27 @@ +import json +from avconnect import AVConnectAPI + +GATE_FILE = "./data/gates.json" + +def load_gates(): + try: + with open(GATE_FILE, "r") as f: + return json.load(f) + except FileNotFoundError: + return {} + +def get_name(gate): + return load_gates().get(str(gate), {}).get("name", "") + +def open_gate(gate, credentials): + gate_info = load_gates().get(str(gate), {}) + if not gate_info: + return False + gate_id = gate_info["id"] + + try: + api = AVConnectAPI(credentials["username"], credentials["password"]) + return api.exec_gate_macro(gate_id) + except Exception as e: + print(f"Failed to open gate {gate}: {e}") + return False diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5050803 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +python-telegram-bot==20.7 \ No newline at end of file diff --git a/users.py b/users.py new file mode 100644 index 0000000..4e6595a --- /dev/null +++ b/users.py @@ -0,0 +1,59 @@ +import json + +USER_FILE = "./data/users.json" + +def load_users(): + try: + with open(USER_FILE, "r") as f: + return json.load(f) + except FileNotFoundError: + return {} + +def save_users(data): + with open(USER_FILE, "w") as f: + json.dump(data, f) + +def update_user(user_id, username, fullname): + has_changes = False + users = load_users() + user = users.get(str(user_id), {}) + + if user.get("username", "") != username: + has_changes = True + user["username"] = username + if user.get("fullname", "") != fullname: + has_changes = True + user["fullname"] = fullname + if not user.get("role"): + has_changes = True + user["role"] = "guest" + + if has_changes: + users[str(user_id)] = user + save_users(users) + + +def get_role(user_id): + return load_users().get(str(user_id), {}).get("role", "guest") + +def set_credentials(user_id, username, password): + users = load_users() + user = users.get(str(user_id), {}) + user["credentials"] = {"username": username, "password": password} + users[str(user_id)] = user + save_users(users) + +def get_credentials(user_id): + return load_users().get(str(user_id), {}).get("credentials") + +def get_grantor_credentials(user_id, gate): + from access_control import load_access + access = load_access().get(str(user_id), {}) + entry = access.get(gate) or access.get("all") + if not entry: + return None + grantor_id = entry.get("grantor") + return get_credentials(grantor_id) + +def get_admins(): + return [uid for uid, u in load_users().items() if u.get("role") == "admin"]