Refactor and start implementing inline keyboards

This commit is contained in:
2025-05-19 23:58:17 +02:00
parent 4a3d2746fb
commit 020b5e0193
7 changed files with 291 additions and 274 deletions

View File

@@ -13,7 +13,7 @@ class AccessControl:
return json.load(f) return json.load(f)
except FileNotFoundError: except FileNotFoundError:
return {} return {}
def _save_access(self): def _save_access(self):
with open(self._ACCESS_FILE, "w") as f: with open(self._ACCESS_FILE, "w") as f:
json.dump(self._access, f) json.dump(self._access, f)
@@ -21,12 +21,12 @@ class AccessControl:
def get_grantor(self, user_id: str, gate: str) -> str: def get_grantor(self, user_id: str, gate: str) -> str:
access = self._access.get(user_id, {}) access = self._access.get(user_id, {})
entry = access.get(gate) or access.get("all") entry = access.get(gate) or access.get("all")
return entry.get("grantor", "") return entry.get("grantor", "") if entry else ""
def can_open_gate(self, user_id: str, gate: str) -> bool: def can_open_gate(self, user_id: str, gate: str) -> bool:
access = self._access.get(user_id, {}) access = self._access.get(user_id, {})
entry = access.get(gate) or access.get("all") entry = access.get(gate) or access.get("all")
if not entry or entry["type"] != "timed": if not entry or entry.get("type") != "timed":
return False return False
if datetime.now(timezone.utc) < datetime.fromisoformat(entry["expires_at"].replace("Z", "+00:00")): if datetime.now(timezone.utc) < datetime.fromisoformat(entry["expires_at"].replace("Z", "+00:00")):
return True return True

View File

@@ -1,5 +1,4 @@
import requests import requests
import pprint
from fake_useragent import UserAgent from fake_useragent import UserAgent
from commons import Credential from commons import Credential
@@ -15,12 +14,9 @@ class AVConnectAPI:
def _authenticate(self) -> bool: def _authenticate(self) -> bool:
login_url = f"{self._BASE_URL}/loginone.php" login_url = f"{self._BASE_URL}/loginone.php"
headers = { headers = {"Content-Type": "application/x-www-form-urlencoded"}
"Content-Type": "application/x-www-form-urlencoded"
}
payload = f"userid={self._username}&password={self._password}&entra=Login" payload = f"userid={self._username}&password={self._password}&entra=Login"
response = self._session.post(login_url, data=payload, headers=headers) response = self._session.post(login_url, data=payload, headers=headers)
if response.ok and "PHPSESSID" in self._session.cookies: if response.ok and "PHPSESSID" in self._session.cookies:
self._authenticated = True self._authenticated = True
return True return True
@@ -29,18 +25,13 @@ class AVConnectAPI:
def exec_gate_macro(self, id_macro) -> bool: def exec_gate_macro(self, id_macro) -> bool:
if not self._authenticated and not self._authenticate(): if not self._authenticated and not self._authenticate():
raise Exception("Authentication failed.") raise Exception("Authentication failed.")
exec_url = f"{self._BASE_URL}/exemacrocom.php" exec_url = f"{self._BASE_URL}/exemacrocom.php"
headers = { headers = {
"User-Agent": self._ua, "User-Agent": self._ua,
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8" "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
} }
payload = f"idmacrocom={id_macro}&nome=16" payload = f"idmacrocom={id_macro}&nome=16"
# Uncomment for real request:
response = self._session.prepare_request(requests.Request("POST", exec_url, data=payload, headers=headers)) # response = self._session.post(exec_url, data=payload, headers=headers)
pprint.pprint(response.headers) # return response.ok
return True return True # For testing
#response = self._session.post(exec_url, data=payload, headers=headers)
#if response.ok:
# return True
#return False

114
bot.py
View File

@@ -1,5 +1,5 @@
from telegram import Update from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, CallbackQueryHandler
from datetime import datetime from datetime import datetime
from config import BotConfig from config import BotConfig
from gates import Gates from gates import Gates
@@ -23,7 +23,30 @@ async def updateuser(update: Update, context: ContextTypes.DEFAULT_TYPE):
users.update_user(user_id, username, fullname) users.update_user(user_id, username, fullname)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("Welcome to GatekeeperBot! Use `/setcredentials` to configure your access") user_id = str(update.effective_user.id)
role = users.get_role(user_id)
keyboard = []
if role == Role.GUEST:
# Guests: can request access, or open a gate if granted
if users.has_grants(user_id):
keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")])
keyboard.append([InlineKeyboardButton("Request Access", callback_data="request_access")])
elif role == Role.MEMBER:
# Members: can open gates and set credentials
keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")])
keyboard.append([InlineKeyboardButton("Set Credentials", callback_data="set_credentials")])
elif role == Role.ADMIN:
# Admins: can open gates, grant access, and set credentials
keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")])
keyboard.append([InlineKeyboardButton("Grant Access", callback_data="grant_access")])
keyboard.append([InlineKeyboardButton("Set Credentials", callback_data="set_credentials")])
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"Welcome to GatekeeperBot! Use the buttons below or commands.",
reply_markup=reply_markup
)
async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE): async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = str(update.effective_user.id) user_id = str(update.effective_user.id)
@@ -31,7 +54,7 @@ async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE):
if len(args) != 2: if len(args) != 2:
return await update.message.reply_text("Usage: `/setcredentials <username> <password>`") return await update.message.reply_text("Usage: `/setcredentials <username> <password>`")
role = users.get_role(user_id) role = users.get_role(user_id)
if role not in ("admin", "member"): if role not in (Role.ADMIN, Role.MEMBER):
return await update.message.reply_text("Only members or admins can set credentials") return await update.message.reply_text("Only members or admins can set credentials")
if users.set_credentials(user_id, Credential(args[0], args[1])): if users.set_credentials(user_id, Credential(args[0], args[1])):
await update.message.reply_text("Credentials saved") await update.message.reply_text("Credentials saved")
@@ -57,7 +80,7 @@ async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE):
return await update.message.reply_text("No valid grantor available.") return await update.message.reply_text("No valid grantor available.")
if not creds: if not creds:
return await update.message.reply_text("No valid grantor credentials available.") return await update.message.reply_text("No valid grantor credentials available.")
#TODO: update guest last_used_at users.update_grant_last_used(user_id, gate)
try: try:
await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}") await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}")
except Exception as e: except Exception as e:
@@ -69,10 +92,61 @@ async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE):
return await update.message.reply_text(f"Gate {gate_name} opened!") return await update.message.reply_text(f"Gate {gate_name} opened!")
await update.message.reply_text(f"ERROR: Cannot open gate {gate_name}") await update.message.reply_text(f"ERROR: Cannot open gate {gate_name}")
async def open_gate_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Show a list of available gates as buttons
keyboard = [
[InlineKeyboardButton(gate.name, callback_data=f"opengate_{gate_id}")]
for gate_id, gate in gates._gates.items()
]
reply_markup = InlineKeyboardMarkup(keyboard)
if update.callback_query:
await update.callback_query.answer()
await update.callback_query.edit_message_text(
"Select a gate to open:", reply_markup=reply_markup
)
else:
await update.message.reply_text("Select a gate to open:", reply_markup=reply_markup)
async def handle_gate_open_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = str(query.from_user.id)
data = query.data
if data.startswith("opengate_"):
gate_id = data[len("opengate_") :]
gate_name = gates.get_name(gate_id)
role = users.get_role(user_id)
if role in (Role.ADMIN, Role.MEMBER):
creds = users.get_credentials(user_id)
if not creds:
await query.answer("Please set your credentials with /setcredentials first", show_alert=True)
return
elif role == Role.GUEST and users.can_open_gate(user_id, gate_id):
grantor = users.get_grantor(user_id, gate_id)
creds = users.get_credentials(grantor)
if not grantor or not creds:
await query.answer("No valid grantor credentials available.", show_alert=True)
return
users.update_grant_last_used(user_id, gate_id)
try:
await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}")
except Exception as e:
print(f"Failed to notify {grantor} that guest {user_id} opened {gate_name}: {e}")
else:
# TODO: guest not working
await query.answer("Access denied.", show_alert=True)
return
if gates.open_gate(gate_id, creds):
await query.answer(f"Gate {gate_name} opened!", show_alert=True)
await query.edit_message_text(f"Gate {gate_name} opened!")
else:
await query.answer(f"ERROR: Cannot open gate {gate_name}", show_alert=True)
await query.edit_message_text(f"ERROR: Cannot open gate {gate_name}")
async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE): async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = str(update.effective_user.id) user_id = str(update.effective_user.id)
role = users.get_role(user_id) role = users.get_role(user_id)
if role not in ("guest"): if role != Role.GUEST:
return await update.message.reply_text("Only guests can request access.") return await update.message.reply_text("Only guests can request access.")
if not context.args: if not context.args:
return await update.message.reply_text("Usage: `/requestaccess`", parse_mode="Markdown") return await update.message.reply_text("Usage: `/requestaccess`", parse_mode="Markdown")
@@ -86,21 +160,29 @@ async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE):
except Exception as e: except Exception as e:
print(f"Failed to notify {admin_id} that guest {user_id} requested access: {e}") print(f"Failed to notify {admin_id} that guest {user_id} requested access: {e}")
async def approve(update: Update, context: ContextTypes.DEFAULT_TYPE): async def handle_main_menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
approver_id = str(update.effective_user.id) query = update.callback_query
if users.get_role(approver_id) != "admin": data = query.data
return await update.message.reply_text("Only admins can approve access.") if data == "open_gate_menu":
await open_gate_menu(update, context)
elif data == "request_access":
await requestaccess(update, context)
async def grantaccess(update: Update, context: ContextTypes.DEFAULT_TYPE):
grantor_id = str(update.effective_user.id)
if users.get_role(grantor_id) != Role.ADMIN:
return await update.message.reply_text("Only admins can grant access.")
try: try:
user_id = context.args[0] user_id = context.args[0]
gate = context.args[1] gate = context.args[1]
expires_at = context.args[2] expires_at = context.args[2]
users.grant_access(user_id, gate, datetime.fromisoformat(expires_at.replace("Z", "+00:00")), grantor_id=approver_id) users.grant_access(user_id, gate, datetime.fromisoformat(expires_at.replace("Z", "+00:00")), grantor_id=grantor_id)
await update.message.reply_text(f"Access to {gate} granted to user {user_id}.") await update.message.reply_text(f"Access to {gate} granted to user {user_id} until {expires_at}")
try: try:
await context.bot.send_message(chat_id=user_id, text=f"Access granted to gate {gate} up to {expires_at}") await context.bot.send_message(chat_id=user_id, text=f"Access granted to gate {gate} up to {expires_at}")
except Exception as e: except Exception as e:
print(f"Failed to notify {user_id} that admin {approver_id} approved access for {gate} up to {expires_at}: {e}") print(f"Failed to notify {user_id} that admin {grantor_id} granted access for {gate} up to {expires_at}: {e}")
except: except Exception:
await update.message.reply_text("Usage: `/approve <user_id> <gate|all> <expires_at:YYYY-MM-DDTHH:MM:SSZ>`") await update.message.reply_text("Usage: `/approve <user_id> <gate|all> <expires_at:YYYY-MM-DDTHH:MM:SSZ>`")
def main(): def main():
@@ -110,7 +192,9 @@ def main():
app.add_handler(CommandHandler("setcredentials", setcredentials)) app.add_handler(CommandHandler("setcredentials", setcredentials))
app.add_handler(CommandHandler("opengate", opengate)) app.add_handler(CommandHandler("opengate", opengate))
app.add_handler(CommandHandler("requestaccess", requestaccess)) app.add_handler(CommandHandler("requestaccess", requestaccess))
app.add_handler(CommandHandler("approve", approve)) app.add_handler(CommandHandler("grantaccess", grantaccess))
app.add_handler(CallbackQueryHandler(handle_main_menu_callback, pattern="^(open_gate_menu|request_access)$"))
app.add_handler(CallbackQueryHandler(handle_gate_open_callback, pattern="^opengate_"))
app.run_polling() app.run_polling()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -6,27 +6,12 @@ class Status(Enum):
class Credential: class Credential:
def __init__(self, username: str, password: str): def __init__(self, username: str, password: str):
self._username: str = username self.username = username
self._password: str = password self.password = password
def __dict__(self): def to_dict(self) -> dict:
return { return {"username": self.username, "password": self.password}
"username": self._username,
"password": self._password @classmethod
} def from_dict(cls, data: dict):
return cls(data.get("username", ""), data.get("password", ""))
@property
def username(self) -> str:
return self._username
@username.setter
def username(self, username: str):
self._username = username
@property
def password(self) -> str:
return self._password
@password.setter
def password(self, password: str):
self._password = password

View File

@@ -2,18 +2,16 @@ import json
from telegram import BotCommand from telegram import BotCommand
class BotConfig: class BotConfig:
def __init__(self, bot_username: str, json_path: str = "./data/config.json"): def __init__(self, bot_username: str, json_path: str = "./data/config.json"):
self._json_path: str = json_path self._json_path = json_path
self._bot_username: str = bot_username self._bot_username = bot_username
self._token: str = "" self._token: str = ""
self._name: str = "" self._name: str = ""
self._description: str = "" self._description: str = ""
self._short_description: str = "" self._short_description: str = ""
self._commands: list[BotCommand] = [] self._commands: list[BotCommand] = []
self._load_config() self._load_config()
def _load_config(self): def _load_config(self):
try: try:
with open(self._json_path, "r") as f: with open(self._json_path, "r") as f:
@@ -27,24 +25,24 @@ class BotConfig:
for command, description in config.get("commands", {}).items() for command, description in config.get("commands", {}).items()
] ]
except Exception: except Exception:
return {} pass
@property @property
def token(self) -> str: def token(self) -> str:
return self._token return self._token
@property @property
def name(self) -> str: def name(self) -> str:
return self._name return self._name
@property @property
def description(self) -> str: def description(self) -> str:
return self._description return self._description
@property @property
def short_description(self) -> str: def short_description(self) -> str:
return self._short_description return self._short_description
@property @property
def commands(self) -> list[BotCommand]: def commands(self) -> list[BotCommand]:
return self._commands return self._commands

View File

@@ -2,47 +2,37 @@ import json
from avconnect import AVConnectAPI from avconnect import AVConnectAPI
from commons import Status, Credential from commons import Status, Credential
class _Gate: class Gate:
def __init__(self, id: str, name: str, status: int | Status = Status.ENABLED): def __init__(self, id: str, name: str, status: Status = Status.ENABLED):
self._id: str = id self.id = id
self._name: str = name self.name = name
self._status: Status = status if isinstance(status, Status) else Status(status) self.status = status if isinstance(status, Status) else Status(status)
@property def to_dict(self):
def id(self) -> str: return {"id": self.id, "name": self.name, "status": self.status.value}
return self._id
@classmethod
@property def from_dict(cls, data: dict):
def name(self) -> str: return cls(data["id"], data["name"], Status(data.get("status", Status.ENABLED)))
return self._name
@property
def status(self) -> Status:
return self._status
@status.setter
def status(self, status: int | Status):
self._status = status if isinstance(status, Status) else Status(status)
class Gates: class Gates:
def __init__(self, json_path: str = "./data/gates.json"): def __init__(self, json_path: str = "./data/gates.json"):
self._json_path: str = json_path self._json_path: str = json_path
self._gates: dict[str, _Gate] = self._load_gates() self._gates: dict[str, Gate] = self._load_gates()
def _load_gates(self) -> dict[str, _Gate]: def _load_gates(self) -> dict[str, Gate]:
try: try:
with open(self._json_path, "r") as file: with open(self._json_path, "r") as file:
gates_data = json.load(file) gates_data = json.load(file)
return {gate: _Gate(data["id"], data["name"]) for gate, data in gates_data.items()} return {gate: Gate.from_dict(data) for gate, data in gates_data.items()}
except Exception: except Exception:
return {} return {}
def get_name(self, gate: str) -> str: def get_name(self, gate: str) -> str | None:
return self._gates.get(gate, {}).name return self._gates[gate].name if gate in self._gates else None
def open_gate(self, gate: str, credentials: Credential) -> bool: def open_gate(self, gate: str, credentials: Credential) -> bool:
if gate not in self._gates.keys(): if gate not in self._gates:
return False return False
if self._gates[gate].status == Status.DISABLED: if self._gates[gate].status == Status.DISABLED:
return False return False

329
users.py
View File

@@ -8,221 +8,190 @@ class Role(Enum):
MEMBER = "member" MEMBER = "member"
GUEST = "guest" GUEST = "guest"
class _Grant: class Grant:
def __init__(self, grantor: str, expires_at: datetime, granted_at: datetime = datetime.now(), last_used_at: datetime = None, status: Status = Status.ENABLED): def __init__(
self._grantor: str = grantor self,
self._granted_at: datetime = granted_at grantor: str,
self._expires_at = expires_at expires_at: datetime,
self._last_used_at = last_used_at granted_at: datetime = None,
self._status: Status = status last_used_at: datetime = None,
status: Status = Status.ENABLED
def __dict__(self): ):
self.grantor = grantor
self.granted_at = granted_at or datetime.now(timezone.utc)
self.expires_at = expires_at
self.last_used_at = last_used_at
self.status = status
def to_dict(self) -> dict:
return { return {
"grantor": self._grantor, "grantor": self.grantor,
"granted_at": self._granted_at.isoformat(), "granted_at": self.granted_at.isoformat(),
"expires_at": self._expires_at.isoformat(), "expires_at": self.expires_at.isoformat(),
"last_used_at": self._last_used_at.isoformat() if self._last_used_at else None, "last_used_at": self.last_used_at.isoformat() if self.last_used_at else None,
"status": self._status.value "status": self.status.value
} }
@property @classmethod
def grantor(self) -> str: def from_dict(cls, data: dict):
return self._grantor return cls(
grantor=data.get("grantor", ""),
@grantor.setter expires_at=datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")),
def grantor(self, grantor: str): granted_at=datetime.fromisoformat(data["granted_at"].replace("Z", "+00:00")) if data.get("granted_at") else None,
self._grantor = grantor last_used_at=datetime.fromisoformat(data["last_used_at"].replace("Z", "+00:00")) if data.get("last_used_at") else None,
status=Status(data.get("status", Status.ENABLED))
@property )
def granted_at(self) -> datetime:
return self._granted_at
@property
def expires_at(self) -> datetime:
return self._expires_at
@property
def last_used_at(self) -> datetime:
return self._last_used_at
@property
def status(self) -> Status:
return self._status
class _User: class User:
def __init__(self, id: str, username: str, fullname: str, role: Role = Role.GUEST, credentials: Credential = Credential("", ""), grants: dict = None, status: Status = Status.ENABLED): def __init__(
self._id: str = id self,
self._username: str = username id: str,
self._fullname: str = fullname username: str,
self._role: Role = role if isinstance(role, Role) else Role(role) fullname: str,
self._credentials: Credential = credentials role: Role = Role.GUEST,
self._grants: dict[str, _Grant] = {gate:_Grant(grant.get("grantor", ""), datetime.fromisoformat(grant.get("expires_at").replace("Z", "+00:00"))) for gate, grant in grants.items()} if grants else {} credentials: Credential = None,
self._status: Status = status grants: dict[str, Grant] = None,
status: Status = Status.ENABLED
def __dict__(self): ):
self.id = id
self.username = username
self.fullname = fullname
self.role = role if isinstance(role, Role) else Role(role)
self.credentials = credentials or Credential("", "")
self.grants = grants or {}
self.status = status if isinstance(status, Status) else Status(status)
def to_dict(self) -> dict:
return { return {
"id": self._id, "id": self.id,
"username": self._username, "username": self.username,
"fullname": self._fullname, "fullname": self.fullname,
"role": self._role.value, "role": self.role.value,
"credentials": self._credentials.__dict__(), "credentials": self.credentials.to_dict(),
"grants": {gate: grant.__dict__() for gate, grant in self._grants.items()}, "grants": {gate: grant.to_dict() for gate, grant in self.grants.items()},
"status": self._status.value "status": self.status.value
} }
@property
def id(self) -> str:
return self._id
@property
def username(self) -> str:
return self._username
@username.setter
def username(self, username: str):
self._username = username
@property
def fullname(self) -> str:
return self._fullname
@fullname.setter
def fullname(self, fullname: str):
self._fullname = fullname
@property
def role(self) -> Role:
return self._role
@role.setter
def role(self, role: str | Role):
self._role = role if isinstance(role, Role) else Role(role)
@property
def credentials(self) -> Credential:
return self._credentials
@credentials.setter
def credentials(self, credentials: Credential):
self._credentials = credentials
@property
def grants(self) -> dict[str, _Grant]:
return self._grants
@property
def status(self) -> Status:
return self._status
@status.setter
def status(self, status: int | Status):
self._status = status if isinstance(status, Status) else Status(status)
def grant(self, gate: str, grant: _Grant):
self._grants[gate] = grant
def revoke(self, gate: str):
self._grants.pop(gate, None)
@classmethod
def from_dict(cls, id: str, data: dict):
credentials = Credential.from_dict(data.get("credentials", {}))
grants = {gate: Grant.from_dict(grant) for gate, grant in data.get("grants", {}).items()}
return cls(
id=id,
username=data.get("username", ""),
fullname=data.get("fullname", ""),
role=Role(data.get("role", Role.GUEST)),
credentials=credentials,
grants=grants,
status=Status(data.get("status", Status.ENABLED))
)
class Users: class Users:
def __init__(self, json_path: str = "./data/users.json"): def __init__(self, json_path: str = "./data/users.json"):
self._json_path: str = json_path self._json_path = json_path
self._users = self._load_users() self._users: dict[str, User] = self._load_users()
def _load_users(self) -> dict[str, _User]: def _load_users(self) -> dict[str, User]:
try: try:
with open(self._json_path, "r") as f: with open(self._json_path, "r") as f:
users = {} data = json.load(f)
for uid, info in json.load(f).items(): return {uid: User.from_dict(uid, info) for uid, info in data.items()}
try:
user = _User(
id=uid,
username=info.get("username", ""),
fullname=info.get("fullname", ""),
role=info.get("role", Role.GUEST),
credentials=Credential(info.get("credentials", {}).get("username", ""), info.get("credentials", {}).get("password", "")),
grants={gate: info for gate, info in info.get("grants", {}).items()},
status=info.get("status", Status.ENABLED)
)
users[uid] = user
except:
continue
return users
except Exception: except Exception:
return {} return {}
def _save_users(self) -> None: def _save_users(self) -> None:
with open(self._json_path, "w") as f: with open(self._json_path, "w") as f:
json.dump({uid: user.__dict__ for uid, user in self._users.items()}, f, default=str) json.dump({uid: user.to_dict() for uid, user in self._users.items()}, f, indent=2)
def update_user(self, id: str, username: str, fullname: str) -> bool: def update_user(self, id: str, username: str, fullname: str) -> bool:
if id in self._users.keys(): if not id or not username or not fullname:
return False
if id in self._users:
self._users[id].username = username self._users[id].username = username
self._users[id].fullname = fullname self._users[id].fullname = fullname
else: else:
self._users[id] = _User(id, username, fullname) self._users[id] = User(id, username, fullname)
self._save_users() self._save_users()
return True return True
def get_username(self, id: str) -> str:
if id in self._users.keys():
return self._users[id].username
return None
def get_fullname(self, id: str) -> str:
if id in self._users.keys():
return self._users[id].fullname
return None
def get_status(self, id: str) -> Status:
if id in self._users.keys():
return self._users[id].status
return Status.DISABLED
def get_role(self, id: str) -> Role:
if id in self._users.keys():
return self._users[id].role
return Role.GUEST
def get_credentials(self, id: str) -> Credential: def get_username(self, id: str) -> str | None:
if id in self._users.keys(): return self._users[id].username if id in self._users else None
return self._users[id].credentials
return None def get_fullname(self, id: str) -> str | None:
return self._users[id].fullname if id in self._users else None
def get_status(self, id: str) -> Status:
return self._users[id].status if id in self._users else Status.DISABLED
def get_role(self, id: str) -> Role:
return self._users[id].role if id in self._users else Role.GUEST
def get_credentials(self, id: str) -> Credential | None:
return self._users[id].credentials if id in self._users else None
def set_credentials(self, id: str, credentials: Credential) -> bool: def set_credentials(self, id: str, credentials: Credential) -> bool:
if id in self._users.keys(): if id in self._users:
self._users[id].credentials = credentials self._users[id].credentials = credentials
self._save_users()
return True return True
return False return False
def can_open_gate(self, id: str, gate: str) -> bool: def can_open_gate(self, id: str, gate: str) -> bool:
if id in self._users.keys(): user = self._users.get(id)
if gate in self._users[id].grants.keys(): if not user or user.status != Status.ENABLED:
if self._users[id].grants[gate].status == Status.ENABLED: return False
if self._users[id].grants[gate].expires_at > datetime.now(timezone.utc): if user.role == Role.ADMIN or user.role == Role.MEMBER:
return True return True
return False grant = user.grants.get(gate)
if not grant or grant.status != Status.ENABLED:
def get_grantor(self, id: str, gate: str) -> str: return False
if id in self._users.keys(): if grant.expires_at <= datetime.now(timezone.utc):
if gate in self._users[id].grants.keys(): return False
return self._users[id].grants[gate].grantor return True
return None
def has_grants(self, id: str) -> bool:
user = self._users.get(id)
if not user or user.status != Status.ENABLED:
return False
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return True
return any(grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc) for grant in user.grants.values())
def get_grantor(self, id: str, gate: str) -> str | None:
user = self._users.get(id)
if not user:
return None
grant = user.grants.get(gate)
return grant.grantor if grant else None
def get_admins(self) -> list[str]: def get_admins(self) -> list[str]:
return [uid for uid, user in self._users.items() if user.role == Role.ADMIN] return [uid for uid, user in self._users.items() if user.role == Role.ADMIN]
def grant_access(self, id: str, gate: str, expires_at: datetime, grantor_id: str) -> bool: def grant_access(self, id: str, gate: str, expires_at: datetime, grantor_id: str) -> bool:
if id in self._users.keys(): user = self._users.get(id)
self._users[id].grant(gate, _Grant(grantor_id, expires_at)) if not user:
self._save_users() return False
return True user.grants[gate] = Grant(grantor_id, expires_at)
return False self._save_users()
return True
def revoke_access(self, id: str, gate: str) -> bool: def revoke_access(self, id: str, gate: str) -> bool:
if id in self._users.keys(): user = self._users.get(id)
self._users[id].revoke(gate) if not user:
return False
if gate in user.grants:
del user.grants[gate]
self._save_users() self._save_users()
return True return True
return False return False
def update_grant_last_used(self, user_id: str, gate: str) -> bool:
user = self._users.get(user_id)
if not user:
return False
grant = user.grants.get(gate)
if not grant:
return False
grant.last_used_at = datetime.now(timezone.utc)
self._save_users()
return True