From 1d423c5ea2e8dcccd56562643686117f8573583e Mon Sep 17 00:00:00 2001 From: Alessandro Franchini Date: Fri, 22 Aug 2025 02:56:02 +0200 Subject: [PATCH] Refactor users creating separate classes, service and repository --- bot.py | 24 ++-- src/handlers/__init__.py | 4 +- src/handlers/gates.py | 33 +---- src/handlers/main.py | 11 +- src/handlers/users.py | 8 +- src/models/__init__.py | 9 +- src/models/grant.py | 38 ++++++ src/models/role.py | 6 + src/models/user.py | 42 ++++++ src/models/users.py | 208 ----------------------------- src/repository/__init__.py | 4 +- src/repository/users_repository.py | 33 +++++ src/services/__init__.py | 4 +- src/services/gates_service.py | 23 ++-- src/services/users_service.py | 125 +++++++++++++++++ tests/resources/mock_users.json | 13 ++ 16 files changed, 309 insertions(+), 276 deletions(-) create mode 100644 src/models/grant.py create mode 100644 src/models/role.py create mode 100644 src/models/user.py delete mode 100644 src/models/users.py create mode 100644 src/repository/users_repository.py create mode 100644 src/services/users_service.py create mode 100644 tests/resources/mock_users.json diff --git a/bot.py b/bot.py index 2314569..a21cff8 100644 --- a/bot.py +++ b/bot.py @@ -1,27 +1,27 @@ from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler from functools import partial from config import BotConfig -from models import Users -from services import GatesService, AVConnectService -from repository import GatesRepository +from services import GatesService, AVConnectService, UsersService +from repository import GatesRepository, UsersRepository from handlers import * bot_config = BotConfig("lagomareGateKeeperBot") gates_repository = GatesRepository() avconnect_service = AVConnectService() gates_service = GatesService(gates_repository, avconnect_service) -users = Users() +users_repository = UsersRepository() +users_service = UsersService(users_repository) def main(): app = Application.builder().token(bot_config.token).post_init(partial(post_init, bot_config=bot_config)).build() - app.add_handler(MessageHandler(None, partial(updateuser, users=users)), group=1) - app.add_handler(CommandHandler("start", partial(start, users=users))) - app.add_handler(CommandHandler("setcredentials", partial(setcredentials, users=users))) - app.add_handler(CommandHandler("opengate", partial(opengate, users=users, gates=gates_service))) - app.add_handler(CommandHandler("requestaccess", partial(requestaccess, users=users))) - app.add_handler(CommandHandler("grantaccess", partial(grantaccess, users=users))) - app.add_handler(CallbackQueryHandler(partial(handle_main_menu_callback, users=users, gates=gates_service), pattern="^(open_gate_menu|request_access)$")) - app.add_handler(CallbackQueryHandler(partial(handle_gate_open_callback, users=users, gates=gates_service), pattern="^opengate_")) + # app.add_handler(MessageHandler(None, partial(handle_message, users=users_service)), group=1) + app.add_handler(CommandHandler("start", partial(start, users=users_service))) + app.add_handler(CommandHandler("setcredentials", partial(setcredentials, users=users_service))) + app.add_handler(CommandHandler("opengate", partial(opengate, users=users_service, gates=gates_service))) + app.add_handler(CommandHandler("requestaccess", partial(requestaccess, users=users_service))) + app.add_handler(CommandHandler("grantaccess", partial(grantaccess, users=users_service))) + app.add_handler(CallbackQueryHandler(partial(handle_main_menu_callback, users=users_service, gates=gates_service), pattern="^(open_gate_menu|request_access)$")) + app.add_handler(CallbackQueryHandler(partial(handle_gate_open_callback, users=users_service, gates=gates_service), pattern="^opengate_")) app.run_polling() if __name__ == "__main__": diff --git a/src/handlers/__init__.py b/src/handlers/__init__.py index 25fa59e..cce664e 100644 --- a/src/handlers/__init__.py +++ b/src/handlers/__init__.py @@ -2,7 +2,7 @@ from .access import requestaccess, grantaccess from .credentials import setcredentials from .gates import opengate, open_gate_menu, handle_gate_open_callback from .main import handle_main_menu_callback, post_init, start -from .users import updateuser +from .users import handle_message __all__ = [ "grantaccess", @@ -14,5 +14,5 @@ __all__ = [ "requestaccess", "setcredentials", "start", - "updateuser" + "handle_message" ] \ No newline at end of file diff --git a/src/handlers/gates.py b/src/handlers/gates.py index 743a3be..48fc69f 100644 --- a/src/handlers/gates.py +++ b/src/handlers/gates.py @@ -1,9 +1,8 @@ from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import ContextTypes -from models import Users, Role -from services import GatesService +from services import GatesService, UsersService -async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates_service: GatesService): +async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE, gates_service: GatesService): assert update.effective_user is not None assert update.message is not None @@ -13,37 +12,15 @@ async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Us return await update.message.reply_text("Usage: `/opengate `") gate = args[0] gate_name = gates_service.get_name(gate) - role = users.get_role(user_id) - if role in (Role.ADMIN, Role.MEMBER): - creds = users.get_credentials(user_id) - if not creds: - return await update.message.reply_text("Please set your credentials with `/setcredentials` first") - elif role == Role.GUEST and users.can_open_gate(user_id, gate): - grantor = users.get_grantor(user_id, gate) - assert grantor is not None - - creds = users.get_credentials(grantor) - if not grantor: - return await update.message.reply_text("No valid grantor available.") - if not creds: - return await update.message.reply_text("No valid grantor credentials available.") - users.update_grant_last_used(user_id, gate) - try: - await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}") - except Exception as e: - print(f"Failed to notify {grantor} that guest {user_id} opened {gate_name}: {e}") - else: - return await update.message.reply_text("Access denied.") - - if gates_service.open_gate(gate, creds): + if gates_service.open_gate(gate, user_id): return await update.message.reply_text(f"Gate {gate_name} opened!") await update.message.reply_text(f"ERROR: Cannot open gate {gate_name}") -async def open_gate_menu(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates: Gates): +async def open_gate_menu(update: Update, context: ContextTypes.DEFAULT_TYPE, users_service: UsersService, gates: Gates): assert update.effective_user is not None user_id = str(update.effective_user.id) - granted_gates = users.get_granted_gates(user_id) + granted_gates = users_service.get_granted_gates(user_id) if not granted_gates: if update.callback_query: await update.callback_query.answer("You have no gates to open.") diff --git a/src/handlers/main.py b/src/handlers/main.py index 8df9401..c6fe08b 100644 --- a/src/handlers/main.py +++ b/src/handlers/main.py @@ -1,9 +1,11 @@ from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, ContextTypes + +from models import Role from .gates import open_gate_menu from .access import requestaccess -from models import Gates, Users, Role from config import BotConfig +from services import UsersService async def post_init(application: Application, bot_config: BotConfig) -> None: await application.bot.set_my_name(bot_config.name) @@ -11,17 +13,18 @@ async def post_init(application: Application, bot_config: BotConfig) -> None: await application.bot.set_my_short_description(bot_config.short_description) await application.bot.set_my_commands(bot_config.commands) -async def start(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users): +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE, users_service: UsersService): assert update.effective_user is not None assert update.message is not None user_id = str(update.effective_user.id) - role = users.get_role(user_id) + user = users_service.add_user(user_id) + role = user.role keyboard = [] if role == Role.GUEST: # Guests: can request access, or open a gate if granted - if users.has_grants(user_id): + if users_service.has_grants(user_id): keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")]) keyboard.append([InlineKeyboardButton("Request Access", callback_data="request_access")]) elif role == Role.MEMBER: diff --git a/src/handlers/users.py b/src/handlers/users.py index 20eaba9..bb936c3 100644 --- a/src/handlers/users.py +++ b/src/handlers/users.py @@ -1,11 +1,9 @@ from telegram import Update from telegram.ext import ContextTypes -from models import Users +from services import UsersService -async def updateuser(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users): +async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE, users_service: UsersService): assert update.effective_user is not None user_id = str(update.effective_user.id) - username = update.effective_user.username - fullname = update.effective_user.full_name - users.update_user(user_id, username, fullname) \ No newline at end of file + users_service.add_user(user_id) \ No newline at end of file diff --git a/src/models/__init__.py b/src/models/__init__.py index bf315d6..c567b54 100644 --- a/src/models/__init__.py +++ b/src/models/__init__.py @@ -1,12 +1,15 @@ from .credential import Credential from .gate import Gate from .status import Status -from .users import Users, Role +from .user import User +from .grant import Grant +from .role import Role __all__ = [ "Credential", "Gate", "Status", - "Users", - "Role" + "User", + "Role", + "Grant" ] \ No newline at end of file diff --git a/src/models/grant.py b/src/models/grant.py new file mode 100644 index 0000000..05fc7f8 --- /dev/null +++ b/src/models/grant.py @@ -0,0 +1,38 @@ +from datetime import datetime, timezone + +from models import Status + + +class Grant: + def __init__( + self, + grantor: str, + expires_at: datetime, + granted_at: datetime | None = None, + last_used_at: datetime | None = None, + status: Status = Status.ENABLED + ): + self.grantor = grantor + self.granted_at = granted_at or datetime.now(timezone.utc) + self.expires_at = expires_at + self.last_used_at = last_used_at + self.status = status + + def to_dict(self) -> dict: + return { + "grantor": self.grantor, + "granted_at": self.granted_at.isoformat(), + "expires_at": self.expires_at.isoformat(), + "last_used_at": self.last_used_at.isoformat() if self.last_used_at else None, + "status": self.status.value + } + + @classmethod + def from_dict(cls, data: dict): + return cls( + grantor=data.get("grantor", ""), + expires_at=datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")), + granted_at=datetime.fromisoformat(data["granted_at"].replace("Z", "+00:00")) if data.get("granted_at") else None, + last_used_at=datetime.fromisoformat(data["last_used_at"].replace("Z", "+00:00")) if data.get("last_used_at") else None, + status=Status(data.get("status", Status.ENABLED)) + ) diff --git a/src/models/role.py b/src/models/role.py new file mode 100644 index 0000000..99b2459 --- /dev/null +++ b/src/models/role.py @@ -0,0 +1,6 @@ +from enum import Enum + +class Role(Enum): + ADMIN = "admin" + MEMBER = "member" + GUEST = "guest" \ No newline at end of file diff --git a/src/models/user.py b/src/models/user.py new file mode 100644 index 0000000..d4ee3ba --- /dev/null +++ b/src/models/user.py @@ -0,0 +1,42 @@ +from enum import Enum + +from . import Grant +from .role import Role +from .status import Status +from .credential import Credential + +class User: + def __init__( + self, + id: str, + role: Role = Role.GUEST, + credentials: Credential | None = None, + grants: dict[str, Grant] | None = None, + status: Status = Status.ENABLED + ): + self.id = id + self.role = role if isinstance(role, Role) else Role(role) + self.credentials = credentials or Credential("", "") + self.grants = grants or {} + self.status = status if isinstance(status, Status) else Status(status) + + def to_dict(self) -> dict: + return { + "id": self.id, + "role": self.role.value, + "credentials": self.credentials.to_dict(), + "grants": {gate: grant.to_dict() for gate, grant in self.grants.items()}, + "status": self.status.value + } + + @classmethod + def from_dict(cls, id: str, data: dict): + credentials = Credential.from_dict(data.get("credentials", {})) + grants = {gate: Grant.from_dict(grant) for gate, grant in data.get("grants", {}).items()} + return cls( + id=id, + role=Role(data.get("role", Role.GUEST)), + credentials=credentials, + grants=grants, + status=Status(data.get("status", Status.ENABLED)) + ) \ No newline at end of file diff --git a/src/models/users.py b/src/models/users.py deleted file mode 100644 index 02dbfd2..0000000 --- a/src/models/users.py +++ /dev/null @@ -1,208 +0,0 @@ -import json -from enum import Enum -from datetime import datetime, timezone -from .status import Status -from .credential import Credential - -class Role(Enum): - ADMIN = "admin" - MEMBER = "member" - GUEST = "guest" - -class Grant: - def __init__( - self, - grantor: str, - expires_at: datetime, - granted_at: datetime | None = None, - last_used_at: datetime | None = None, - status: Status = Status.ENABLED - ): - self.grantor = grantor - self.granted_at = granted_at or datetime.now(timezone.utc) - self.expires_at = expires_at - self.last_used_at = last_used_at - self.status = status - - def to_dict(self) -> dict: - return { - "grantor": self.grantor, - "granted_at": self.granted_at.isoformat(), - "expires_at": self.expires_at.isoformat(), - "last_used_at": self.last_used_at.isoformat() if self.last_used_at else None, - "status": self.status.value - } - - @classmethod - def from_dict(cls, data: dict): - return cls( - grantor=data.get("grantor", ""), - expires_at=datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")), - granted_at=datetime.fromisoformat(data["granted_at"].replace("Z", "+00:00")) if data.get("granted_at") else None, - last_used_at=datetime.fromisoformat(data["last_used_at"].replace("Z", "+00:00")) if data.get("last_used_at") else None, - status=Status(data.get("status", Status.ENABLED)) - ) - -class User: - def __init__( - self, - id: str, - username: str, - fullname: str, - role: Role = Role.GUEST, - credentials: Credential | None = None, - grants: dict[str, Grant] | None = None, - status: Status = Status.ENABLED - ): - self.id = id - self.username = username - self.fullname = fullname - self.role = role if isinstance(role, Role) else Role(role) - self.credentials = credentials or Credential("", "") - self.grants = grants or {} - self.status = status if isinstance(status, Status) else Status(status) - - def to_dict(self) -> dict: - return { - "id": self.id, - "username": self.username, - "fullname": self.fullname, - "role": self.role.value, - "credentials": self.credentials.to_dict(), - "grants": {gate: grant.to_dict() for gate, grant in self.grants.items()}, - "status": self.status.value - } - - @classmethod - def from_dict(cls, id: str, data: dict): - credentials = Credential.from_dict(data.get("credentials", {})) - grants = {gate: Grant.from_dict(grant) for gate, grant in data.get("grants", {}).items()} - return cls( - id=id, - username=data.get("username", ""), - fullname=data.get("fullname", ""), - role=Role(data.get("role", Role.GUEST)), - credentials=credentials, - grants=grants, - status=Status(data.get("status", Status.ENABLED)) - ) - -class Users: - def __init__(self, json_path: str = "./data/users.json"): - self._json_path = json_path - self._users: dict[str, User] = self._load_users() - - def _load_users(self) -> dict[str, User]: - try: - with open(self._json_path, "r") as f: - data = json.load(f) - return {uid: User.from_dict(uid, info) for uid, info in data.items()} - except Exception: - return {} - - def _save_users(self) -> None: - with open(self._json_path, "w") as f: - json.dump({uid: user.to_dict() for uid, user in self._users.items()}, f, indent=2) - - def update_user(self, id: str, username: str | None, fullname: str | None) -> bool: - if not id or not username or not fullname: - return False - if id in self._users: - self._users[id].username = username - self._users[id].fullname = fullname - else: - self._users[id] = User(id, username, fullname) - self._save_users() - return True - - def get_username(self, id: str) -> str | None: - return self._users[id].username if id in self._users else None - - def get_fullname(self, id: str) -> str | None: - return self._users[id].fullname if id in self._users else None - - def get_status(self, id: str) -> Status: - return self._users[id].status if id in self._users else Status.DISABLED - - def get_role(self, id: str) -> Role: - return self._users[id].role if id in self._users else Role.GUEST - - def get_credentials(self, id: str) -> Credential | None: - return self._users[id].credentials if id in self._users else None - - def set_credentials(self, id: str, credentials: Credential) -> bool: - if id in self._users: - self._users[id].credentials = credentials - self._save_users() - return True - return False - - def can_open_gate(self, id: str, gate: str) -> bool: - user = self._users.get(id) - if not user or user.status != Status.ENABLED: - return False - if user.role == Role.ADMIN or user.role == Role.MEMBER: - return True - grant = user.grants.get(gate) - if not grant or grant.status != Status.ENABLED: - return False - if grant.expires_at <= datetime.now(timezone.utc): - return False - return True - - def has_grants(self, id: str) -> bool: - user = self._users.get(id) - if not user or user.status != Status.ENABLED: - return False - if user.role == Role.ADMIN or user.role == Role.MEMBER: - return True - return any(grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc) for grant in user.grants.values()) - - def get_grantor(self, id: str, gate: str) -> str | None: - user = self._users.get(id) - if not user: - return None - grant = user.grants.get(gate) - return grant.grantor if grant else None - - def get_admins(self) -> list[str]: - return [uid for uid, user in self._users.items() if user.role == Role.ADMIN] - - def grant_access(self, id: str, gate: str, expires_at: datetime, grantor_id: str) -> bool: - user = self._users.get(id) - if not user: - return False - user.grants[gate] = Grant(grantor_id, expires_at) - self._save_users() - return True - - def revoke_access(self, id: str, gate: str) -> bool: - user = self._users.get(id) - if not user: - return False - if gate in user.grants: - del user.grants[gate] - self._save_users() - return True - return False - - def update_grant_last_used(self, user_id: str, gate: str) -> bool: - user = self._users.get(user_id) - if not user: - return False - grant = user.grants.get(gate) - if not grant: - return False - grant.last_used_at = datetime.now(timezone.utc) - self._save_users() - return True - - def get_granted_gates(self, user_id: str) -> list[str]: - user = self._users.get(user_id) - if not user: - return [] - if user.role == Role.ADMIN or user.role == Role.MEMBER: - return ['all'] - if 'all' in user.grants and user.grants['all'].status == Status.ENABLED and user.grants['all'].expires_at > datetime.now(timezone.utc): - return ['all'] - return [gate for gate, grant in user.grants.items() if grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc)] \ No newline at end of file diff --git a/src/repository/__init__.py b/src/repository/__init__.py index 200d2f9..7ff35b8 100644 --- a/src/repository/__init__.py +++ b/src/repository/__init__.py @@ -1,5 +1,7 @@ from .gates_repository import GatesRepository +from .users_repository import UsersRepository __all__ = [ - "GatesRepository" + "GatesRepository", + "UsersRepository", ] \ No newline at end of file diff --git a/src/repository/users_repository.py b/src/repository/users_repository.py new file mode 100644 index 0000000..7b4578f --- /dev/null +++ b/src/repository/users_repository.py @@ -0,0 +1,33 @@ +import json +from models import User + +class UsersRepository: + def __init__(self, json_path: str = "./data/users.json"): + self._json_path = json_path + self._users: dict[str, User] = self._load_users() + + def _load_users(self) -> dict[str, User]: + try: + with open(self._json_path, "r") as f: + data = json.load(f) + return {uid: User.from_dict(uid, info) for uid, info in data.items()} + except Exception: + return {} + + def save(self, uid: str, user: User) -> None: + self._users[uid] = user + self._dump_users() + + def _dump_users(self) -> None: + with open(self._json_path, "w") as f: + json.dump({uid: user.to_dict() for uid, user in self._users.items()}, f, indent=2) + + def delete(self, user: User) -> None: + del self._users[user.id] + self._dump_users() + + def get_by_uid(self, uid: str) -> User | None: + return self._users.get(uid) + + def get_all(self) -> list[User]: + return list(self._users.values()) \ No newline at end of file diff --git a/src/services/__init__.py b/src/services/__init__.py index b47e531..55b0d70 100644 --- a/src/services/__init__.py +++ b/src/services/__init__.py @@ -1,7 +1,9 @@ from .avconnect_service import AVConnectService from .gates_service import GatesService +from .users_service import UsersService __all__ = [ "AVConnectService", - "GatesService" + "GatesService", + "UsersService" ] \ No newline at end of file diff --git a/src/services/gates_service.py b/src/services/gates_service.py index a93965f..a817ebd 100644 --- a/src/services/gates_service.py +++ b/src/services/gates_service.py @@ -1,23 +1,22 @@ from models import Credential, Status from repository import GatesRepository -from services import AVConnectService +from services import AVConnectService, UsersService + class GatesService: - def __init__(self, gate_repository: GatesRepository, avconnect_service: AVConnectService): - self.gate_repository = gate_repository - self.avconnect_service = avconnect_service + def __init__(self, gate_repository: GatesRepository, avconnect_service: AVConnectService, user_service: UsersService): + self._gate_repository = gate_repository + self._avconnect_service = avconnect_service + self._user_service = user_service - def open_gate(self, gate_key: str, credentials: Credential) -> bool: - gate = self.gate_repository.get_by_key(gate_key) + def open_gate(self, gate_key: str, uid: str) -> bool: + gate = self._gate_repository.get_by_key(gate_key) if not gate or gate.status == Status.DISABLED: return False - try: - return self.avconnect_service.open_gate_by_id(gate.id, credentials) - except Exception as e: - print(f"Failed to open gate {gate.name}: {e}") - return False + credentials = self._user_service.get_credentials(uid, gate_key) + return self._avconnect_service.open_gate_by_id(gate.id, credentials) def get_name(self, gate_key: str) -> str | None: - gate = self.gate_repository.get_by_key(gate_key) + gate = self._gate_repository.get_by_key(gate_key) return gate.name if gate else None \ No newline at end of file diff --git a/src/services/users_service.py b/src/services/users_service.py new file mode 100644 index 0000000..9df2d00 --- /dev/null +++ b/src/services/users_service.py @@ -0,0 +1,125 @@ +from datetime import datetime, timezone + +from models import Status, Role, Credential, Grant, User +from repository import UsersRepository + + +class UsersService: + + def __init__(self, users_repository: UsersRepository): + self._users_repository = users_repository + + def add_user(self, uid: str) -> User: + user = self._users_repository.get_by_uid(uid) + if not user: + user = User(uid) + self._users_repository.save(uid, user) + return user + + def get_user(self, uid: str) -> User: + user = self._users_repository.get_by_uid(uid) + if not user: + raise Exception("User not found") + return user + + def get_role(self, uid: str) -> Role: + return self.get_user(uid).role + + def get_credentials(self, uid: str, gate_key: str) -> Credential | None: + user = self._users_repository.get_by_uid(uid) + if user.role in (Role.ADMIN, Role.MEMBER): + creds = user.credentials + if not creds: + raise Exception("Please set your credentials with `/setcredentials` first") + elif user.role == Role.GUEST and self.can_open_gate(uid, gate_key): + grantor = self.get_grantor(user, gate_key) + if not grantor: + raise Exception("No valid grantor available.") + creds = grantor.credentials + if not creds: + raise Exception("No valid grantor credentials available.") + self.update_grant_last_used(uid, gate_key) + # try: + # await context.bot.send_message(chat_id=grantor, text=f"Guest {uid} opened {gate_name}") + # except Exception as e: + # print(f"Failed to notify {grantor} that guest {uid} opened {gate_name}: {e}") + else: + raise Exception("Access denied.") + + def set_credentials(self, uid: str, credentials: Credential) -> bool: + user = self._users_repository.get_by_uid(uid) + if user: + user.credentials = credentials + self._users_repository.save(uid, user) + return True + return False + + def can_open_gate(self, uid: str, gate_key: str) -> bool: + user = self._users_repository.get_by_uid(uid) + if not user or user.status != Status.ENABLED: + return False + if user.role == Role.ADMIN or user.role == Role.MEMBER: + return True + grant = user.grants.get(gate_key) + if not grant or grant.status != Status.ENABLED: + return False + if grant.expires_at <= datetime.now(timezone.utc): + return False + return True + + def has_grants(self, uid: str) -> bool: + user = self._users_repository.get_by_uid(uid) + if not user or user.status != Status.ENABLED: + return False + if user.role == Role.ADMIN or user.role == Role.MEMBER: + return True + return any(grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc) for grant in + user.grants.values()) + + def get_grantor(self, user: User, gate: str) -> User | None: + grant = user.grants.get(gate) + return self._users_repository.get_by_uid(grant.grantor) if grant else None + + def get_admins(self) -> list[str]: + return [user.id for user in self._users_repository.get_all() if user.role == Role.ADMIN] + + def grant_access(self, uid: str, gate: str, expires_at: datetime, grantor_id: str) -> bool: + user = self._users_repository.get_by_uid(uid) + if not user: + return False + user.grants[gate] = Grant(grantor_id, expires_at) + self._users_repository.save(uid, user) + return True + + def revoke_access(self, uid: str, gate: str) -> bool: + user = self._users_repository.get_by_uid(uid) + if not user: + return False + if gate in user.grants: + del user.grants[gate] + self._users_repository.save(uid, user) + return True + return False + + def update_grant_last_used(self, uid: str, gate_key: str) -> bool: + user = self._users_repository.get_by_uid(uid) + if not user: + return False + grant = user.grants.get(gate_key) + if not grant: + return False + grant.last_used_at = datetime.now(timezone.utc) + self._users_repository.save(uid, user) + return True + + def get_granted_gates(self, uid: str) -> list[str]: + user = self._users_repository.get_by_uid(uid) + if not user: + return [] + if user.role == Role.ADMIN or user.role == Role.MEMBER: + return ['all'] + if 'all' in user.grants and user.grants['all'].status == Status.ENABLED and user.grants[ + 'all'].expires_at > datetime.now(timezone.utc): + return ['all'] + return [gate for gate, grant in user.grants.items() if + grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc)] diff --git a/tests/resources/mock_users.json b/tests/resources/mock_users.json new file mode 100644 index 0000000..b350a61 --- /dev/null +++ b/tests/resources/mock_users.json @@ -0,0 +1,13 @@ +{ + "12345": { + "id": "12345", + "username": "Pippo", + "fullname": "Pippo Franco", + "role": "admin", + "credentials": { + "username": "pippo.franco", + "password": "pippo12345" + }, + "status": 1 + } +} \ No newline at end of file