# Mirror of https://github.com/Noettore/lagomareGateKeeperBot.git
# Synced 2025-10-14 19:16:40 +02:00
# 208 lines, 7.5 KiB, Python
import json
import logging
import os
from datetime import datetime, timezone
from enum import Enum

from .credential import Credential
from .status import Status
|
|
|
|
class Role(Enum):
    """Role assigned to a user; each member's value is its stored string form."""

    ADMIN = "admin"
    MEMBER = "member"
    GUEST = "guest"
|
|
|
|
class Grant:
    """Time-limited access grant given to a user by a grantor.

    Every timestamp is kept as a timezone-aware ``datetime``. A naive value
    (passed to the constructor or parsed from storage without an offset) is
    interpreted as UTC, so comparing a grant's timestamps with an aware
    "now" can never raise ``TypeError``.
    """

    def __init__(
        self,
        grantor: str,
        expires_at: datetime,
        granted_at: datetime | None = None,
        last_used_at: datetime | None = None,
        status: Status = Status.ENABLED
    ):
        """Create a grant.

        Args:
            grantor: Identifier of whoever issued the grant.
            expires_at: Moment after which the grant is no longer valid.
            granted_at: Issue time; defaults to the current UTC time.
            last_used_at: Last time the grant was used, if ever.
            status: ``Status`` member or its raw value.

        Raises:
            ValueError: If ``status`` is not a valid ``Status`` value.
        """
        self.grantor = grantor
        self.granted_at = self._as_utc(granted_at) or datetime.now(timezone.utc)
        self.expires_at = self._as_utc(expires_at)
        self.last_used_at = self._as_utc(last_used_at)
        # Accept raw values too, mirroring how User coerces its own status.
        self.status = status if isinstance(status, Status) else Status(status)

    @staticmethod
    def _as_utc(value: datetime | None) -> datetime | None:
        """Return ``value`` as an aware datetime, treating naive input as UTC."""
        if value is not None and value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value

    @staticmethod
    def _parse_timestamp(text: str) -> datetime:
        """Parse an ISO-8601 string, accepting a trailing ``Z`` as UTC."""
        # Older ``fromisoformat`` versions reject the "Z" suffix, so rewrite it.
        return datetime.fromisoformat(text.replace("Z", "+00:00"))

    def to_dict(self) -> dict:
        """Serialize the grant to a JSON-compatible dict (ISO-8601 timestamps)."""
        return {
            "grantor": self.grantor,
            "granted_at": self.granted_at.isoformat(),
            "expires_at": self.expires_at.isoformat(),
            "last_used_at": self.last_used_at.isoformat() if self.last_used_at else None,
            "status": self.status.value
        }

    @classmethod
    def from_dict(cls, data: dict):
        """Rebuild a grant from the dict produced by ``to_dict``.

        ``expires_at`` is mandatory; the other keys fall back to defaults.

        Raises:
            KeyError: If ``expires_at`` is missing.
            ValueError: If a timestamp or the status cannot be parsed.
        """
        granted_raw = data.get("granted_at")
        last_used_raw = data.get("last_used_at")
        return cls(
            grantor=data.get("grantor", ""),
            expires_at=cls._parse_timestamp(data["expires_at"]),
            granted_at=cls._parse_timestamp(granted_raw) if granted_raw else None,
            last_used_at=cls._parse_timestamp(last_used_raw) if last_used_raw else None,
            status=Status(data.get("status", Status.ENABLED))
        )
|
|
|
|
class User:
    """A known user: identity, role, credentials, per-gate grants and status."""

    def __init__(
        self,
        id: str,
        username: str,
        fullname: str,
        role: Role = Role.GUEST,
        credentials: Credential | None = None,
        grants: dict[str, Grant] | None = None,
        status: Status = Status.ENABLED
    ):
        """Store the given fields, coercing raw role/status values to enums.

        A falsy ``credentials`` is replaced by ``Credential("", "")`` and a
        falsy ``grants`` by a fresh empty dict.
        """
        self.id = id
        self.username = username
        self.fullname = fullname

        if not isinstance(role, Role):
            role = Role(role)
        self.role = role

        self.credentials = credentials if credentials else Credential("", "")
        self.grants = grants if grants else {}

        if not isinstance(status, Status):
            status = Status(status)
        self.status = status

    def to_dict(self) -> dict:
        """Serialize the user to a JSON-compatible dict, grants keyed by gate."""
        payload = {
            "id": self.id,
            "username": self.username,
            "fullname": self.fullname,
            "role": self.role.value,
        }
        payload["credentials"] = self.credentials.to_dict()
        payload["grants"] = {
            gate_name: gate_grant.to_dict()
            for gate_name, gate_grant in self.grants.items()
        }
        payload["status"] = self.status.value
        return payload

    @classmethod
    def from_dict(cls, id: str, data: dict):
        """Build a user from a stored dict; ``id`` is supplied separately.

        Missing keys fall back to empty strings, ``Role.GUEST``,
        ``Status.ENABLED``, empty credentials and no grants.
        """
        parsed_credentials = Credential.from_dict(data.get("credentials", {}))

        parsed_grants = {}
        for gate_name, raw_grant in data.get("grants", {}).items():
            parsed_grants[gate_name] = Grant.from_dict(raw_grant)

        username = data.get("username", "")
        fullname = data.get("fullname", "")
        role = Role(data.get("role", Role.GUEST))
        status = Status(data.get("status", Status.ENABLED))

        return cls(
            id=id,
            username=username,
            fullname=fullname,
            role=role,
            credentials=parsed_credentials,
            grants=parsed_grants,
            status=status,
        )
|
|
|
|
class Users:
    """JSON-file-backed registry of users and their gate grants.

    The whole registry is loaded once at construction and rewritten to disk
    after every mutating call.
    """

    def __init__(self, json_path: str = "./data/users.json"):
        """Load the registry from ``json_path`` (empty if the file is absent)."""
        self._json_path = json_path
        self._users: dict[str, User] = self._load_users()

    def _load_users(self) -> dict[str, User]:
        """Read and parse the registry file.

        Returns an empty registry when the file does not exist. Any other
        failure (unreadable file, invalid JSON, malformed entries) is logged
        and also yields an empty registry, so start-up stays best-effort but
        the problem is no longer hidden.
        """
        try:
            with open(self._json_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            return {uid: User.from_dict(uid, info) for uid, info in data.items()}
        except FileNotFoundError:
            # Nothing has been saved yet.
            return {}
        except Exception:
            logging.getLogger(__name__).exception(
                "Could not load users from %s; starting with an empty registry",
                self._json_path,
            )
            return {}

    def _save_users(self) -> None:
        """Write the registry to disk without ever truncating the live file.

        The data is dumped to a sibling ``.tmp`` file and then swapped in with
        ``os.replace``; if serialization or writing fails, the previous file is
        left untouched and the exception propagates.
        """
        payload = {uid: user.to_dict() for uid, user in self._users.items()}
        tmp_path = f"{self._json_path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, indent=2)
            os.replace(tmp_path, self._json_path)
        finally:
            # Only present here if something failed before the swap.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    @staticmethod
    def _is_privileged(user) -> bool:
        """Return True for roles that bypass per-gate grants (admin, member)."""
        return user.role in (Role.ADMIN, Role.MEMBER)

    @staticmethod
    def _is_active(grant, now: datetime) -> bool:
        """Return True if ``grant`` is enabled and expires strictly after ``now``."""
        return grant.status == Status.ENABLED and grant.expires_at > now

    def update_user(self, id: str, username: str | None, fullname: str | None) -> bool:
        """Create the user or refresh their username/fullname, then save.

        Returns False (and changes nothing) if any argument is empty.
        """
        if not id or not username or not fullname:
            return False
        user = self._users.get(id)
        if user is None:
            self._users[id] = User(id, username, fullname)
        else:
            user.username = username
            user.fullname = fullname
        self._save_users()
        return True

    def get_username(self, id: str) -> str | None:
        """Return the user's username, or None for an unknown id."""
        user = self._users.get(id)
        return user.username if user is not None else None

    def get_fullname(self, id: str) -> str | None:
        """Return the user's full name, or None for an unknown id."""
        user = self._users.get(id)
        return user.fullname if user is not None else None

    def get_status(self, id: str) -> Status:
        """Return the user's status; unknown ids count as ``Status.DISABLED``."""
        user = self._users.get(id)
        return user.status if user is not None else Status.DISABLED

    def get_role(self, id: str) -> Role:
        """Return the user's role; unknown ids count as ``Role.GUEST``."""
        user = self._users.get(id)
        return user.role if user is not None else Role.GUEST

    def get_credentials(self, id: str) -> Credential | None:
        """Return the user's credentials, or None for an unknown id."""
        user = self._users.get(id)
        return user.credentials if user is not None else None

    def set_credentials(self, id: str, credentials: Credential) -> bool:
        """Replace the user's credentials and save; False for an unknown id."""
        user = self._users.get(id)
        if user is None:
            return False
        user.credentials = credentials
        self._save_users()
        return True

    def can_open_gate(self, id: str, gate: str) -> bool:
        """Tell whether the user may open ``gate`` right now.

        Enabled admins and members always may. Anyone else needs an enabled,
        unexpired grant stored under exactly ``gate``.
        """
        user = self._users.get(id)
        if user is None or user.status != Status.ENABLED:
            return False
        if self._is_privileged(user):
            return True
        # NOTE(review): get_granted_gates treats a grant stored under the key
        # 'all' as covering every gate, but this lookup only matches the exact
        # gate name. Confirm callers pass 'all' here where that is intended.
        grant = user.grants.get(gate)
        if grant is None:
            return False
        return self._is_active(grant, datetime.now(timezone.utc))

    def has_grants(self, id: str) -> bool:
        """Tell whether the user can currently open at least one gate.

        True for enabled admins/members, or for an enabled user holding any
        enabled, unexpired grant.
        """
        user = self._users.get(id)
        if user is None or user.status != Status.ENABLED:
            return False
        if self._is_privileged(user):
            return True
        now = datetime.now(timezone.utc)
        return any(self._is_active(grant, now) for grant in user.grants.values())

    def get_grantor(self, id: str, gate: str) -> str | None:
        """Return who issued the user's grant for ``gate``, or None if absent."""
        user = self._users.get(id)
        if user is None:
            return None
        grant = user.grants.get(gate)
        return grant.grantor if grant else None

    def get_admins(self) -> list[str]:
        """Return the ids of all users whose role is ``Role.ADMIN``."""
        return [uid for uid, user in self._users.items() if user.role == Role.ADMIN]

    def grant_access(self, id: str, gate: str, expires_at: datetime, grantor_id: str) -> bool:
        """Give the user a new grant for ``gate`` (replacing any existing one).

        Returns False for an unknown user; otherwise saves and returns True.
        """
        user = self._users.get(id)
        if user is None:
            return False
        user.grants[gate] = Grant(grantor_id, expires_at)
        self._save_users()
        return True

    def revoke_access(self, id: str, gate: str) -> bool:
        """Delete the user's grant for ``gate`` and save.

        Returns False if the user or the grant does not exist.
        """
        user = self._users.get(id)
        if user is None or gate not in user.grants:
            return False
        del user.grants[gate]
        self._save_users()
        return True

    def update_grant_last_used(self, user_id: str, gate: str) -> bool:
        """Stamp the user's grant for ``gate`` with the current UTC time and save.

        Returns False if the user or the grant does not exist.
        """
        user = self._users.get(user_id)
        if user is None:
            return False
        grant = user.grants.get(gate)
        if not grant:
            return False
        grant.last_used_at = datetime.now(timezone.utc)
        self._save_users()
        return True

    def get_granted_gates(self, user_id: str) -> list[str]:
        """List the gates the user currently holds access to.

        Admins, members, and holders of an active grant named ``'all'`` get
        ``['all']``; otherwise the names of every enabled, unexpired grant are
        returned. Unknown users get an empty list. The user's own status is
        not consulted here.
        """
        user = self._users.get(user_id)
        if user is None:
            return []
        if self._is_privileged(user):
            return ['all']
        now = datetime.now(timezone.utc)
        wildcard = user.grants.get('all')
        if wildcard is not None and self._is_active(wildcard, now):
            return ['all']
        return [gate for gate, grant in user.grants.items() if self._is_active(grant, now)]