mirror of
https://github.com/Noettore/lagomareGateKeeperBot.git
synced 2025-10-14 19:16:40 +02:00
First commit. Lot to do...
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -58,3 +58,4 @@ docs/_build/
|
|||||||
# PyBuilder
|
# PyBuilder
|
||||||
target/
|
target/
|
||||||
|
|
||||||
|
data/
|
||||||
|
40
access_control.py
Normal file
40
access_control.py
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
import json
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
ACCESS_FILE = "./data/access.json"
|
||||||
|
|
||||||
|
def load_access():
|
||||||
|
try:
|
||||||
|
with open(ACCESS_FILE, "r") as f:
|
||||||
|
return json.load(f)
|
||||||
|
except FileNotFoundError:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def save_access(data):
|
||||||
|
with open(ACCESS_FILE, "w") as f:
|
||||||
|
json.dump(data, f)
|
||||||
|
|
||||||
|
def get_grantor(user_id, gate):
|
||||||
|
access = load_access().get(str(user_id), {})
|
||||||
|
entry = access.get(gate) or access.get("all")
|
||||||
|
return entry.get("grantor", "")
|
||||||
|
|
||||||
|
def can_open_gate(user_id, gate):
|
||||||
|
access = load_access().get(str(user_id), {})
|
||||||
|
entry = access.get(gate) or access.get("all")
|
||||||
|
if not entry or entry["type"] != "timed":
|
||||||
|
return False
|
||||||
|
if datetime.now(timezone.utc) < datetime.fromisoformat(entry["expires_at"].replace("Z", "+00:00")):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def grant_access(user_id, gate, expires_at, grantor_id):
|
||||||
|
access = load_access()
|
||||||
|
user_access = access.get(str(user_id), {})
|
||||||
|
user_access[gate] = {
|
||||||
|
"type": "timed",
|
||||||
|
"expires_at": expires_at,
|
||||||
|
"grantor": grantor_id
|
||||||
|
}
|
||||||
|
access[str(user_id)] = user_access
|
||||||
|
save_access(access)
|
42
avconnect.py
Normal file
42
avconnect.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import requests
|
||||||
|
import pprint
|
||||||
|
|
||||||
|
BASE_URL = "https://www.avconnect.it"
|
||||||
|
|
||||||
|
class AVConnectAPI:
|
||||||
|
def __init__(self, username: str, password: str):
|
||||||
|
self.username = username
|
||||||
|
self.password = password
|
||||||
|
self.session = requests.Session()
|
||||||
|
self.authenticated = False
|
||||||
|
|
||||||
|
def authenticate(self) -> bool:
|
||||||
|
login_url = f"{BASE_URL}/loginone.php"
|
||||||
|
headers = {
|
||||||
|
"Content-Type": "application/x-www-form-urlencoded"
|
||||||
|
}
|
||||||
|
payload = f"userid={self.username}&password={self.password}&entra=Login"
|
||||||
|
response = self.session.post(login_url, data=payload, headers=headers)
|
||||||
|
|
||||||
|
if response.ok and "PHPSESSID" in self.session.cookies:
|
||||||
|
self.authenticated = True
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def exec_gate_macro(self, id_macro) -> requests.Response:
|
||||||
|
if not self.authenticated and not self.authenticate():
|
||||||
|
raise Exception("Authentication failed.")
|
||||||
|
|
||||||
|
exec_url = f"{BASE_URL}/exemacrocom.php"
|
||||||
|
headers = {
|
||||||
|
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
|
||||||
|
}
|
||||||
|
payload = f"idmacrocom={id_macro}&nome=16"
|
||||||
|
|
||||||
|
response = self.session.prepare_request(requests.Request("POST", exec_url, data=payload, headers=headers))
|
||||||
|
pprint.pprint(response.headers)
|
||||||
|
return True
|
||||||
|
#response = self.session.post(exec_url, data=payload, headers=headers)
|
||||||
|
#if response.ok:
|
||||||
|
# return True
|
||||||
|
#return False
|
99
bot.py
Normal file
99
bot.py
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
from telegram import Update
|
||||||
|
from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes
|
||||||
|
from config import BOT_TOKEN
|
||||||
|
from access_control import can_open_gate, grant_access, get_grantor
|
||||||
|
from gates import get_name, open_gate
|
||||||
|
from users import get_role, set_credentials, get_credentials, get_grantor_credentials, get_admins, update_user
|
||||||
|
|
||||||
|
async def updateuser(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
|
user_id = str(update.effective_user.id)
|
||||||
|
username = update.effective_user.username
|
||||||
|
fullname = update.effective_user.full_name
|
||||||
|
update_user(user_id, username, fullname)
|
||||||
|
|
||||||
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
|
await update.message.reply_text("Welcome to GatekeeperBot! Use `/setcredentials` to configure your access.")
|
||||||
|
|
||||||
|
async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
|
user_id = str(update.effective_user.id)
|
||||||
|
args = context.args
|
||||||
|
if len(args) != 2:
|
||||||
|
return await update.message.reply_text("Usage: `/setcredentials <username> <password>`")
|
||||||
|
role = get_role(user_id)
|
||||||
|
if role not in ("admin", "member"):
|
||||||
|
return await update.message.reply_text("Only members or admins can set credentials.")
|
||||||
|
set_credentials(user_id, args[0], args[1])
|
||||||
|
await update.message.reply_text("Credentials saved.")
|
||||||
|
|
||||||
|
async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
|
user_id = str(update.effective_user.id)
|
||||||
|
args = context.args
|
||||||
|
if not args:
|
||||||
|
return await update.message.reply_text("Usage: `/opengate <gate>`")
|
||||||
|
gate = args[0]
|
||||||
|
role = get_role(user_id)
|
||||||
|
if role in ("admin", "member"):
|
||||||
|
creds = get_credentials(user_id)
|
||||||
|
if not creds:
|
||||||
|
return await update.message.reply_text("Please set your credentials with `/setcredentials` first.")
|
||||||
|
elif role == "guest" and can_open_gate(user_id, gate):
|
||||||
|
creds = get_grantor_credentials(user_id, gate)
|
||||||
|
if not creds:
|
||||||
|
return await update.message.reply_text("No valid grantor credentials available.")
|
||||||
|
grantor = get_grantor(user_id, gate)
|
||||||
|
if grantor:
|
||||||
|
try:
|
||||||
|
await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {get_name(gate)}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to notify {grantor} that guest {user_id} opened {get_name(gate)}: {e}")
|
||||||
|
else:
|
||||||
|
return await update.message.reply_text("Access denied.")
|
||||||
|
|
||||||
|
if open_gate(gate, creds):
|
||||||
|
return await update.message.reply_text(f"Gate {get_name(gate)} opened!")
|
||||||
|
await update.message.reply_text(f"ERROR: Cannot open gate {get_name(gate)}")
|
||||||
|
|
||||||
|
async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
|
user_id = str(update.effective_user.id)
|
||||||
|
role = get_role(user_id)
|
||||||
|
if role not in ("guest"):
|
||||||
|
return await update.message.reply_text("Only guests can request access.")
|
||||||
|
if not context.args:
|
||||||
|
return await update.message.reply_text("Usage: `/requestaccess <gate>`")
|
||||||
|
gate = context.args[0]
|
||||||
|
requester = update.effective_user.username or update.effective_user.full_name
|
||||||
|
text = (f"Access request: @{requester} ({user_id}) requests access to *{gate}*.\nUse `/approve {user_id} {gate} 60` to grant access.")
|
||||||
|
await update.message.reply_text("Your request has been submitted.")
|
||||||
|
admins = get_admins()
|
||||||
|
for admin_id in admins:
|
||||||
|
try:
|
||||||
|
await context.bot.send_message(chat_id=admin_id, text=text, parse_mode="Markdown")
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def approve(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
|
approver_id = str(update.effective_user.id)
|
||||||
|
if get_role(approver_id) != "admin":
|
||||||
|
return await update.message.reply_text("Only admins can approve access.")
|
||||||
|
try:
|
||||||
|
user_id = context.args[0]
|
||||||
|
gate = context.args[1]
|
||||||
|
expires_at = context.args[2]
|
||||||
|
grant_access(user_id, gate, expires_at, grantor_id=approver_id)
|
||||||
|
await update.message.reply_text(f"Access to {gate} granted for user {user_id}.")
|
||||||
|
#TODO: notify guest of granted access
|
||||||
|
except:
|
||||||
|
await update.message.reply_text("Usage: `/approve <user_id> <gate|all> <expires_at:YYYY-MM-DDTHH:MM:SSZ>`")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
app = Application.builder().token(BOT_TOKEN).build()
|
||||||
|
app.add_handler(MessageHandler(None, updateuser), group=1)
|
||||||
|
app.add_handler(CommandHandler("start", start))
|
||||||
|
app.add_handler(CommandHandler("setcredentials", setcredentials))
|
||||||
|
app.add_handler(CommandHandler("opengate", opengate))
|
||||||
|
app.add_handler(CommandHandler("requestaccess", requestaccess))
|
||||||
|
app.add_handler(CommandHandler("approve", approve))
|
||||||
|
app.run_polling()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
1
config.py
Normal file
1
config.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
BOT_TOKEN = "7548174804:AAF3da0PZR47RzOF4llQEJszMD5JjVdS8zI"
|
27
gates.py
Normal file
27
gates.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
import json
|
||||||
|
from avconnect import AVConnectAPI
|
||||||
|
|
||||||
|
GATE_FILE = "./data/gates.json"
|
||||||
|
|
||||||
|
def load_gates():
|
||||||
|
try:
|
||||||
|
with open(GATE_FILE, "r") as f:
|
||||||
|
return json.load(f)
|
||||||
|
except FileNotFoundError:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def get_name(gate):
|
||||||
|
return load_gates().get(str(gate), {}).get("name", "")
|
||||||
|
|
||||||
|
def open_gate(gate, credentials):
|
||||||
|
gate_info = load_gates().get(str(gate), {})
|
||||||
|
if not gate_info:
|
||||||
|
return False
|
||||||
|
gate_id = gate_info["id"]
|
||||||
|
|
||||||
|
try:
|
||||||
|
api = AVConnectAPI(credentials["username"], credentials["password"])
|
||||||
|
return api.exec_gate_macro(gate_id)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to open gate {gate}: {e}")
|
||||||
|
return False
|
1
requirements.txt
Normal file
1
requirements.txt
Normal file
@@ -0,0 +1 @@
|
|||||||
|
python-telegram-bot==20.7
|
59
users.py
Normal file
59
users.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
USER_FILE = "./data/users.json"
|
||||||
|
|
||||||
|
def load_users():
|
||||||
|
try:
|
||||||
|
with open(USER_FILE, "r") as f:
|
||||||
|
return json.load(f)
|
||||||
|
except FileNotFoundError:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def save_users(data):
|
||||||
|
with open(USER_FILE, "w") as f:
|
||||||
|
json.dump(data, f)
|
||||||
|
|
||||||
|
def update_user(user_id, username, fullname):
|
||||||
|
has_changes = False
|
||||||
|
users = load_users()
|
||||||
|
user = users.get(str(user_id), {})
|
||||||
|
|
||||||
|
if user.get("username", "") != username:
|
||||||
|
has_changes = True
|
||||||
|
user["username"] = username
|
||||||
|
if user.get("fullname", "") != fullname:
|
||||||
|
has_changes = True
|
||||||
|
user["fullname"] = fullname
|
||||||
|
if not user.get("role"):
|
||||||
|
has_changes = True
|
||||||
|
user["role"] = "guest"
|
||||||
|
|
||||||
|
if has_changes:
|
||||||
|
users[str(user_id)] = user
|
||||||
|
save_users(users)
|
||||||
|
|
||||||
|
|
||||||
|
def get_role(user_id):
|
||||||
|
return load_users().get(str(user_id), {}).get("role", "guest")
|
||||||
|
|
||||||
|
def set_credentials(user_id, username, password):
|
||||||
|
users = load_users()
|
||||||
|
user = users.get(str(user_id), {})
|
||||||
|
user["credentials"] = {"username": username, "password": password}
|
||||||
|
users[str(user_id)] = user
|
||||||
|
save_users(users)
|
||||||
|
|
||||||
|
def get_credentials(user_id):
|
||||||
|
return load_users().get(str(user_id), {}).get("credentials")
|
||||||
|
|
||||||
|
def get_grantor_credentials(user_id, gate):
|
||||||
|
from access_control import load_access
|
||||||
|
access = load_access().get(str(user_id), {})
|
||||||
|
entry = access.get(gate) or access.get("all")
|
||||||
|
if not entry:
|
||||||
|
return None
|
||||||
|
grantor_id = entry.get("grantor")
|
||||||
|
return get_credentials(grantor_id)
|
||||||
|
|
||||||
|
def get_admins():
|
||||||
|
return [uid for uid, u in load_users().items() if u.get("role") == "admin"]
|
Reference in New Issue
Block a user