Refactor users creating separate classes, service and repository

This commit is contained in:
Alessandro Franchini
2025-08-22 02:56:02 +02:00
parent 58c0916deb
commit 1d423c5ea2
16 changed files with 309 additions and 276 deletions

24
bot.py
View File

@@ -1,27 +1,27 @@
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler
from functools import partial
from config import BotConfig
from models import Users
from services import GatesService, AVConnectService
from repository import GatesRepository
from services import GatesService, AVConnectService, UsersService
from repository import GatesRepository, UsersRepository
from handlers import *
bot_config = BotConfig("lagomareGateKeeperBot")
gates_repository = GatesRepository()
avconnect_service = AVConnectService()
gates_service = GatesService(gates_repository, avconnect_service)
users = Users()
users_repository = UsersRepository()
users_service = UsersService(users_repository)
def main():
app = Application.builder().token(bot_config.token).post_init(partial(post_init, bot_config=bot_config)).build()
app.add_handler(MessageHandler(None, partial(updateuser, users=users)), group=1)
app.add_handler(CommandHandler("start", partial(start, users=users)))
app.add_handler(CommandHandler("setcredentials", partial(setcredentials, users=users)))
app.add_handler(CommandHandler("opengate", partial(opengate, users=users, gates=gates_service)))
app.add_handler(CommandHandler("requestaccess", partial(requestaccess, users=users)))
app.add_handler(CommandHandler("grantaccess", partial(grantaccess, users=users)))
app.add_handler(CallbackQueryHandler(partial(handle_main_menu_callback, users=users, gates=gates_service), pattern="^(open_gate_menu|request_access)$"))
app.add_handler(CallbackQueryHandler(partial(handle_gate_open_callback, users=users, gates=gates_service), pattern="^opengate_"))
# app.add_handler(MessageHandler(None, partial(handle_message, users=users_service)), group=1)
app.add_handler(CommandHandler("start", partial(start, users=users_service)))
app.add_handler(CommandHandler("setcredentials", partial(setcredentials, users=users_service)))
app.add_handler(CommandHandler("opengate", partial(opengate, users=users_service, gates=gates_service)))
app.add_handler(CommandHandler("requestaccess", partial(requestaccess, users=users_service)))
app.add_handler(CommandHandler("grantaccess", partial(grantaccess, users=users_service)))
app.add_handler(CallbackQueryHandler(partial(handle_main_menu_callback, users=users_service, gates=gates_service), pattern="^(open_gate_menu|request_access)$"))
app.add_handler(CallbackQueryHandler(partial(handle_gate_open_callback, users=users_service, gates=gates_service), pattern="^opengate_"))
app.run_polling()
if __name__ == "__main__":

View File

@@ -2,7 +2,7 @@ from .access import requestaccess, grantaccess
from .credentials import setcredentials
from .gates import opengate, open_gate_menu, handle_gate_open_callback
from .main import handle_main_menu_callback, post_init, start
from .users import updateuser
from .users import handle_message
__all__ = [
"grantaccess",
@@ -14,5 +14,5 @@ __all__ = [
"requestaccess",
"setcredentials",
"start",
"updateuser"
"handle_message"
]

View File

@@ -1,9 +1,8 @@
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ContextTypes
from models import Users, Role
from services import GatesService
from services import GatesService, UsersService
async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates_service: GatesService):
async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE, gates_service: GatesService):
assert update.effective_user is not None
assert update.message is not None
@@ -13,37 +12,15 @@ async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Us
return await update.message.reply_text("Usage: `/opengate <gate>`")
gate = args[0]
gate_name = gates_service.get_name(gate)
role = users.get_role(user_id)
if role in (Role.ADMIN, Role.MEMBER):
creds = users.get_credentials(user_id)
if not creds:
return await update.message.reply_text("Please set your credentials with `/setcredentials` first")
elif role == Role.GUEST and users.can_open_gate(user_id, gate):
grantor = users.get_grantor(user_id, gate)
assert grantor is not None
creds = users.get_credentials(grantor)
if not grantor:
return await update.message.reply_text("No valid grantor available.")
if not creds:
return await update.message.reply_text("No valid grantor credentials available.")
users.update_grant_last_used(user_id, gate)
try:
await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}")
except Exception as e:
print(f"Failed to notify {grantor} that guest {user_id} opened {gate_name}: {e}")
else:
return await update.message.reply_text("Access denied.")
if gates_service.open_gate(gate, creds):
if gates_service.open_gate(gate, user_id):
return await update.message.reply_text(f"Gate {gate_name} opened!")
await update.message.reply_text(f"ERROR: Cannot open gate {gate_name}")
async def open_gate_menu(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates: Gates):
async def open_gate_menu(update: Update, context: ContextTypes.DEFAULT_TYPE, users_service: UsersService, gates: Gates):
assert update.effective_user is not None
user_id = str(update.effective_user.id)
granted_gates = users.get_granted_gates(user_id)
granted_gates = users_service.get_granted_gates(user_id)
if not granted_gates:
if update.callback_query:
await update.callback_query.answer("You have no gates to open.")

View File

@@ -1,9 +1,11 @@
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, ContextTypes
from models import Role
from .gates import open_gate_menu
from .access import requestaccess
from models import Gates, Users, Role
from config import BotConfig
from services import UsersService
async def post_init(application: Application, bot_config: BotConfig) -> None:
await application.bot.set_my_name(bot_config.name)
@@ -11,17 +13,18 @@ async def post_init(application: Application, bot_config: BotConfig) -> None:
await application.bot.set_my_short_description(bot_config.short_description)
await application.bot.set_my_commands(bot_config.commands)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users):
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE, users_service: UsersService):
assert update.effective_user is not None
assert update.message is not None
user_id = str(update.effective_user.id)
role = users.get_role(user_id)
user = users_service.add_user(user_id)
role = user.role
keyboard = []
if role == Role.GUEST:
# Guests: can request access, or open a gate if granted
if users.has_grants(user_id):
if users_service.has_grants(user_id):
keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")])
keyboard.append([InlineKeyboardButton("Request Access", callback_data="request_access")])
elif role == Role.MEMBER:

View File

@@ -1,11 +1,9 @@
from telegram import Update
from telegram.ext import ContextTypes
from models import Users
from services import UsersService
async def updateuser(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users):
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE, users_service: UsersService):
assert update.effective_user is not None
user_id = str(update.effective_user.id)
username = update.effective_user.username
fullname = update.effective_user.full_name
users.update_user(user_id, username, fullname)
users_service.add_user(user_id)

View File

@@ -1,12 +1,15 @@
from .credential import Credential
from .gate import Gate
from .status import Status
from .users import Users, Role
from .user import User
from .grant import Grant
from .role import Role
__all__ = [
"Credential",
"Gate",
"Status",
"Users",
"Role"
"User",
"Role",
"Grant"
]

38
src/models/grant.py Normal file
View File

@@ -0,0 +1,38 @@
from datetime import datetime, timezone
from models import Status
class Grant:
def __init__(
self,
grantor: str,
expires_at: datetime,
granted_at: datetime | None = None,
last_used_at: datetime | None = None,
status: Status = Status.ENABLED
):
self.grantor = grantor
self.granted_at = granted_at or datetime.now(timezone.utc)
self.expires_at = expires_at
self.last_used_at = last_used_at
self.status = status
def to_dict(self) -> dict:
return {
"grantor": self.grantor,
"granted_at": self.granted_at.isoformat(),
"expires_at": self.expires_at.isoformat(),
"last_used_at": self.last_used_at.isoformat() if self.last_used_at else None,
"status": self.status.value
}
@classmethod
def from_dict(cls, data: dict):
return cls(
grantor=data.get("grantor", ""),
expires_at=datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")),
granted_at=datetime.fromisoformat(data["granted_at"].replace("Z", "+00:00")) if data.get("granted_at") else None,
last_used_at=datetime.fromisoformat(data["last_used_at"].replace("Z", "+00:00")) if data.get("last_used_at") else None,
status=Status(data.get("status", Status.ENABLED))
)

6
src/models/role.py Normal file
View File

@@ -0,0 +1,6 @@
from enum import Enum
class Role(Enum):
ADMIN = "admin"
MEMBER = "member"
GUEST = "guest"

42
src/models/user.py Normal file
View File

@@ -0,0 +1,42 @@
from enum import Enum
from . import Grant
from .role import Role
from .status import Status
from .credential import Credential
class User:
def __init__(
self,
id: str,
role: Role = Role.GUEST,
credentials: Credential | None = None,
grants: dict[str, Grant] | None = None,
status: Status = Status.ENABLED
):
self.id = id
self.role = role if isinstance(role, Role) else Role(role)
self.credentials = credentials or Credential("", "")
self.grants = grants or {}
self.status = status if isinstance(status, Status) else Status(status)
def to_dict(self) -> dict:
return {
"id": self.id,
"role": self.role.value,
"credentials": self.credentials.to_dict(),
"grants": {gate: grant.to_dict() for gate, grant in self.grants.items()},
"status": self.status.value
}
@classmethod
def from_dict(cls, id: str, data: dict):
credentials = Credential.from_dict(data.get("credentials", {}))
grants = {gate: Grant.from_dict(grant) for gate, grant in data.get("grants", {}).items()}
return cls(
id=id,
role=Role(data.get("role", Role.GUEST)),
credentials=credentials,
grants=grants,
status=Status(data.get("status", Status.ENABLED))
)

View File

@@ -1,208 +0,0 @@
import json
from enum import Enum
from datetime import datetime, timezone
from .status import Status
from .credential import Credential
class Role(Enum):
ADMIN = "admin"
MEMBER = "member"
GUEST = "guest"
class Grant:
def __init__(
self,
grantor: str,
expires_at: datetime,
granted_at: datetime | None = None,
last_used_at: datetime | None = None,
status: Status = Status.ENABLED
):
self.grantor = grantor
self.granted_at = granted_at or datetime.now(timezone.utc)
self.expires_at = expires_at
self.last_used_at = last_used_at
self.status = status
def to_dict(self) -> dict:
return {
"grantor": self.grantor,
"granted_at": self.granted_at.isoformat(),
"expires_at": self.expires_at.isoformat(),
"last_used_at": self.last_used_at.isoformat() if self.last_used_at else None,
"status": self.status.value
}
@classmethod
def from_dict(cls, data: dict):
return cls(
grantor=data.get("grantor", ""),
expires_at=datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")),
granted_at=datetime.fromisoformat(data["granted_at"].replace("Z", "+00:00")) if data.get("granted_at") else None,
last_used_at=datetime.fromisoformat(data["last_used_at"].replace("Z", "+00:00")) if data.get("last_used_at") else None,
status=Status(data.get("status", Status.ENABLED))
)
class User:
def __init__(
self,
id: str,
username: str,
fullname: str,
role: Role = Role.GUEST,
credentials: Credential | None = None,
grants: dict[str, Grant] | None = None,
status: Status = Status.ENABLED
):
self.id = id
self.username = username
self.fullname = fullname
self.role = role if isinstance(role, Role) else Role(role)
self.credentials = credentials or Credential("", "")
self.grants = grants or {}
self.status = status if isinstance(status, Status) else Status(status)
def to_dict(self) -> dict:
return {
"id": self.id,
"username": self.username,
"fullname": self.fullname,
"role": self.role.value,
"credentials": self.credentials.to_dict(),
"grants": {gate: grant.to_dict() for gate, grant in self.grants.items()},
"status": self.status.value
}
@classmethod
def from_dict(cls, id: str, data: dict):
credentials = Credential.from_dict(data.get("credentials", {}))
grants = {gate: Grant.from_dict(grant) for gate, grant in data.get("grants", {}).items()}
return cls(
id=id,
username=data.get("username", ""),
fullname=data.get("fullname", ""),
role=Role(data.get("role", Role.GUEST)),
credentials=credentials,
grants=grants,
status=Status(data.get("status", Status.ENABLED))
)
class Users:
def __init__(self, json_path: str = "./data/users.json"):
self._json_path = json_path
self._users: dict[str, User] = self._load_users()
def _load_users(self) -> dict[str, User]:
try:
with open(self._json_path, "r") as f:
data = json.load(f)
return {uid: User.from_dict(uid, info) for uid, info in data.items()}
except Exception:
return {}
def _save_users(self) -> None:
with open(self._json_path, "w") as f:
json.dump({uid: user.to_dict() for uid, user in self._users.items()}, f, indent=2)
def update_user(self, id: str, username: str | None, fullname: str | None) -> bool:
if not id or not username or not fullname:
return False
if id in self._users:
self._users[id].username = username
self._users[id].fullname = fullname
else:
self._users[id] = User(id, username, fullname)
self._save_users()
return True
def get_username(self, id: str) -> str | None:
return self._users[id].username if id in self._users else None
def get_fullname(self, id: str) -> str | None:
return self._users[id].fullname if id in self._users else None
def get_status(self, id: str) -> Status:
return self._users[id].status if id in self._users else Status.DISABLED
def get_role(self, id: str) -> Role:
return self._users[id].role if id in self._users else Role.GUEST
def get_credentials(self, id: str) -> Credential | None:
return self._users[id].credentials if id in self._users else None
def set_credentials(self, id: str, credentials: Credential) -> bool:
if id in self._users:
self._users[id].credentials = credentials
self._save_users()
return True
return False
def can_open_gate(self, id: str, gate: str) -> bool:
user = self._users.get(id)
if not user or user.status != Status.ENABLED:
return False
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return True
grant = user.grants.get(gate)
if not grant or grant.status != Status.ENABLED:
return False
if grant.expires_at <= datetime.now(timezone.utc):
return False
return True
def has_grants(self, id: str) -> bool:
user = self._users.get(id)
if not user or user.status != Status.ENABLED:
return False
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return True
return any(grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc) for grant in user.grants.values())
def get_grantor(self, id: str, gate: str) -> str | None:
user = self._users.get(id)
if not user:
return None
grant = user.grants.get(gate)
return grant.grantor if grant else None
def get_admins(self) -> list[str]:
return [uid for uid, user in self._users.items() if user.role == Role.ADMIN]
def grant_access(self, id: str, gate: str, expires_at: datetime, grantor_id: str) -> bool:
user = self._users.get(id)
if not user:
return False
user.grants[gate] = Grant(grantor_id, expires_at)
self._save_users()
return True
def revoke_access(self, id: str, gate: str) -> bool:
user = self._users.get(id)
if not user:
return False
if gate in user.grants:
del user.grants[gate]
self._save_users()
return True
return False
def update_grant_last_used(self, user_id: str, gate: str) -> bool:
user = self._users.get(user_id)
if not user:
return False
grant = user.grants.get(gate)
if not grant:
return False
grant.last_used_at = datetime.now(timezone.utc)
self._save_users()
return True
def get_granted_gates(self, user_id: str) -> list[str]:
user = self._users.get(user_id)
if not user:
return []
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return ['all']
if 'all' in user.grants and user.grants['all'].status == Status.ENABLED and user.grants['all'].expires_at > datetime.now(timezone.utc):
return ['all']
return [gate for gate, grant in user.grants.items() if grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc)]

View File

@@ -1,5 +1,7 @@
from .gates_repository import GatesRepository
from .users_repository import UsersRepository
__all__ = [
"GatesRepository"
"GatesRepository",
"UsersRepository",
]

View File

@@ -0,0 +1,33 @@
import json
from models import User
class UsersRepository:
def __init__(self, json_path: str = "./data/users.json"):
self._json_path = json_path
self._users: dict[str, User] = self._load_users()
def _load_users(self) -> dict[str, User]:
try:
with open(self._json_path, "r") as f:
data = json.load(f)
return {uid: User.from_dict(uid, info) for uid, info in data.items()}
except Exception:
return {}
def save(self, uid: str, user: User) -> None:
self._users[uid] = user
self._dump_users()
def _dump_users(self) -> None:
with open(self._json_path, "w") as f:
json.dump({uid: user.to_dict() for uid, user in self._users.items()}, f, indent=2)
def delete(self, user: User) -> None:
del self._users[user.id]
self._dump_users()
def get_by_uid(self, uid: str) -> User | None:
return self._users.get(uid)
def get_all(self) -> list[User]:
return list(self._users.values())

View File

@@ -1,7 +1,9 @@
from .avconnect_service import AVConnectService
from .gates_service import GatesService
from .users_service import UsersService
__all__ = [
"AVConnectService",
"GatesService"
"GatesService",
"UsersService"
]

View File

@@ -1,23 +1,22 @@
from models import Credential, Status
from repository import GatesRepository
from services import AVConnectService
from services import AVConnectService, UsersService
class GatesService:
def __init__(self, gate_repository: GatesRepository, avconnect_service: AVConnectService):
self.gate_repository = gate_repository
self.avconnect_service = avconnect_service
def __init__(self, gate_repository: GatesRepository, avconnect_service: AVConnectService, user_service: UsersService):
self._gate_repository = gate_repository
self._avconnect_service = avconnect_service
self._user_service = user_service
def open_gate(self, gate_key: str, credentials: Credential) -> bool:
gate = self.gate_repository.get_by_key(gate_key)
def open_gate(self, gate_key: str, uid: str) -> bool:
gate = self._gate_repository.get_by_key(gate_key)
if not gate or gate.status == Status.DISABLED:
return False
try:
return self.avconnect_service.open_gate_by_id(gate.id, credentials)
except Exception as e:
print(f"Failed to open gate {gate.name}: {e}")
return False
credentials = self._user_service.get_credentials(uid, gate_key)
return self._avconnect_service.open_gate_by_id(gate.id, credentials)
def get_name(self, gate_key: str) -> str | None:
gate = self.gate_repository.get_by_key(gate_key)
gate = self._gate_repository.get_by_key(gate_key)
return gate.name if gate else None

View File

@@ -0,0 +1,125 @@
from datetime import datetime, timezone
from models import Status, Role, Credential, Grant, User
from repository import UsersRepository
class UsersService:
def __init__(self, users_repository: UsersRepository):
self._users_repository = users_repository
def add_user(self, uid: str) -> User:
user = self._users_repository.get_by_uid(uid)
if not user:
user = User(uid)
self._users_repository.save(uid, user)
return user
def get_user(self, uid: str) -> User:
user = self._users_repository.get_by_uid(uid)
if not user:
raise Exception("User not found")
return user
def get_role(self, uid: str) -> Role:
return self.get_user(uid).role
def get_credentials(self, uid: str, gate_key: str) -> Credential | None:
user = self._users_repository.get_by_uid(uid)
if user.role in (Role.ADMIN, Role.MEMBER):
creds = user.credentials
if not creds:
raise Exception("Please set your credentials with `/setcredentials` first")
elif user.role == Role.GUEST and self.can_open_gate(uid, gate_key):
grantor = self.get_grantor(user, gate_key)
if not grantor:
raise Exception("No valid grantor available.")
creds = grantor.credentials
if not creds:
raise Exception("No valid grantor credentials available.")
self.update_grant_last_used(uid, gate_key)
# try:
# await context.bot.send_message(chat_id=grantor, text=f"Guest {uid} opened {gate_name}")
# except Exception as e:
# print(f"Failed to notify {grantor} that guest {uid} opened {gate_name}: {e}")
else:
raise Exception("Access denied.")
def set_credentials(self, uid: str, credentials: Credential) -> bool:
user = self._users_repository.get_by_uid(uid)
if user:
user.credentials = credentials
self._users_repository.save(uid, user)
return True
return False
def can_open_gate(self, uid: str, gate_key: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user or user.status != Status.ENABLED:
return False
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return True
grant = user.grants.get(gate_key)
if not grant or grant.status != Status.ENABLED:
return False
if grant.expires_at <= datetime.now(timezone.utc):
return False
return True
def has_grants(self, uid: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user or user.status != Status.ENABLED:
return False
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return True
return any(grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc) for grant in
user.grants.values())
def get_grantor(self, user: User, gate: str) -> User | None:
grant = user.grants.get(gate)
return self._users_repository.get_by_uid(grant.grantor) if grant else None
def get_admins(self) -> list[str]:
return [user.id for user in self._users_repository.get_all() if user.role == Role.ADMIN]
def grant_access(self, uid: str, gate: str, expires_at: datetime, grantor_id: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user:
return False
user.grants[gate] = Grant(grantor_id, expires_at)
self._users_repository.save(uid, user)
return True
def revoke_access(self, uid: str, gate: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user:
return False
if gate in user.grants:
del user.grants[gate]
self._users_repository.save(uid, user)
return True
return False
def update_grant_last_used(self, uid: str, gate_key: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user:
return False
grant = user.grants.get(gate_key)
if not grant:
return False
grant.last_used_at = datetime.now(timezone.utc)
self._users_repository.save(uid, user)
return True
def get_granted_gates(self, uid: str) -> list[str]:
user = self._users_repository.get_by_uid(uid)
if not user:
return []
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return ['all']
if 'all' in user.grants and user.grants['all'].status == Status.ENABLED and user.grants[
'all'].expires_at > datetime.now(timezone.utc):
return ['all']
return [gate for gate, grant in user.grants.items() if
grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc)]

View File

@@ -0,0 +1,13 @@
{
"12345": {
"id": "12345",
"username": "Pippo",
"fullname": "Pippo Franco",
"role": "admin",
"credentials": {
"username": "pippo.franco",
"password": "pippo12345"
},
"status": 1
}
}