Files
lagomareGateKeeperBot/src/services/users_service.py

126 lines
4.9 KiB
Python

from datetime import datetime, timezone
from src.models import Status, Role, Credential, Grant, User
from src.repository import UsersRepository
class UsersService:
def __init__(self, users_repository: UsersRepository):
self._users_repository = users_repository
def add_user(self, uid: str) -> User:
user = self._users_repository.get_by_uid(uid)
if not user:
user = User(uid)
self._users_repository.save(uid, user)
return user
def get_user(self, uid: str) -> User:
user = self._users_repository.get_by_uid(uid)
if not user:
raise Exception("User not found")
return user
def get_role(self, uid: str) -> Role:
return self.get_user(uid).role
def get_credentials(self, uid: str, gate_key: str) -> Credential | None:
user = self._users_repository.get_by_uid(uid)
if user.role in (Role.ADMIN, Role.MEMBER):
creds = user.credentials
if not creds:
raise Exception("Please set your credentials with `/setcredentials` first")
elif user.role == Role.GUEST and self.can_open_gate(uid, gate_key):
grantor = self.get_grantor(user, gate_key)
if not grantor:
raise Exception("No valid grantor available.")
creds = grantor.credentials
if not creds:
raise Exception("No valid grantor credentials available.")
self.update_grant_last_used(uid, gate_key)
# try:
# await context.bot.send_message(chat_id=grantor, text=f"Guest {uid} opened {gate_name}")
# except Exception as e:
# print(f"Failed to notify {grantor} that guest {uid} opened {gate_name}: {e}")
else:
raise Exception("Access denied.")
def set_credentials(self, uid: str, credentials: Credential) -> bool:
user = self._users_repository.get_by_uid(uid)
if user:
user.credentials = credentials
self._users_repository.save(uid, user)
return True
return False
def can_open_gate(self, uid: str, gate_key: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user or user.status != Status.ENABLED:
return False
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return True
grant = user.grants.get(gate_key)
if not grant or grant.status != Status.ENABLED:
return False
if grant.expires_at <= datetime.now(timezone.utc):
return False
return True
def has_grants(self, uid: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user or user.status != Status.ENABLED:
return False
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return True
return any(grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc) for grant in
user.grants.values())
def get_grantor(self, user: User, gate: str) -> User | None:
grant = user.grants.get(gate)
return self._users_repository.get_by_uid(grant.grantor) if grant else None
def get_admins(self) -> list[str]:
return [user.uid for user in self._users_repository.get_all() if user.role == Role.ADMIN]
def grant_access(self, uid: str, gate: str, expires_at: datetime, grantor_id: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user:
return False
user.grants[gate] = Grant(grantor_id, expires_at)
self._users_repository.save(uid, user)
return True
def revoke_access(self, uid: str, gate: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user:
return False
if gate in user.grants:
del user.grants[gate]
self._users_repository.save(uid, user)
return True
return False
def update_grant_last_used(self, uid: str, gate_key: str) -> bool:
user = self._users_repository.get_by_uid(uid)
if not user:
return False
grant = user.grants.get(gate_key)
if not grant:
return False
grant.last_used_at = datetime.now(timezone.utc)
self._users_repository.save(uid, user)
return True
def get_granted_gates(self, uid: str) -> list[str]:
user = self._users_repository.get_by_uid(uid)
if not user:
return []
if user.role == Role.ADMIN or user.role == Role.MEMBER:
return ['all']
if 'all' in user.grants and user.grants['all'].status == Status.ENABLED and user.grants[
'all'].expires_at > datetime.now(timezone.utc):
return ['all']
return [gate for gate, grant in user.grants.items() if
grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc)]