mirror of
https://github.com/Noettore/lagomareGateKeeperBot.git
synced 2025-10-15 03:26:40 +02:00
Complete refactor... Some issue...
This commit is contained in:
256
users.py
256
users.py
@@ -1,65 +1,213 @@
|
||||
import json
|
||||
from enum import Enum
|
||||
from datetime import datetime, timezone
|
||||
from commons import Status, Credential
|
||||
|
||||
USER_FILE = "./data/users.json"
|
||||
class Role(Enum):
|
||||
ADMIN = "admin"
|
||||
MEMBER = "member"
|
||||
GUEST = "guest"
|
||||
|
||||
def load_users():
|
||||
try:
|
||||
with open(USER_FILE, "r") as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
return {}
|
||||
|
||||
def save_users(data):
|
||||
with open(USER_FILE, "w") as f:
|
||||
json.dump(data, f)
|
||||
|
||||
def update_user(user_id, username, fullname):
|
||||
has_changes = False
|
||||
users = load_users()
|
||||
user = users.get(str(user_id), {})
|
||||
class _Grant:
|
||||
def __init__(self, grantor: str, expires_at: datetime, granted_at: datetime = datetime.now(), last_used_at: datetime = None, status: int | Status = Status.ENABLED):
|
||||
self._grantor: str = grantor
|
||||
self._granted_at: datetime = granted_at
|
||||
self._expires_at = expires_at
|
||||
self._last_used_at = last_used_at
|
||||
self._status: Status = status
|
||||
|
||||
if user.get("username", "") != username:
|
||||
has_changes = True
|
||||
user["username"] = username
|
||||
if user.get("fullname", "") != fullname:
|
||||
has_changes = True
|
||||
user["fullname"] = fullname
|
||||
if not user.get("role"):
|
||||
has_changes = True
|
||||
user["role"] = "guest"
|
||||
def __dict__(self):
|
||||
return {
|
||||
"grantor": self._grantor,
|
||||
"granted_at": self._granted_at.isoformat(),
|
||||
"expires_at": self._expires_at.isoformat(),
|
||||
"last_used_at": self._last_used_at.isoformat() if self._last_used_at else None,
|
||||
"status": self._status.value
|
||||
}
|
||||
|
||||
@property
|
||||
def grantor(self) -> str:
|
||||
return self._grantor
|
||||
|
||||
if has_changes:
|
||||
users[str(user_id)] = user
|
||||
save_users(users)
|
||||
@grantor.setter
|
||||
def grantor(self, grantor: str):
|
||||
self._grantor = grantor
|
||||
|
||||
@property
|
||||
def granted_at(self) -> datetime:
|
||||
return self._granted_at
|
||||
|
||||
@property
|
||||
def expires_at(self) -> datetime:
|
||||
return self._expires_at
|
||||
|
||||
@property
|
||||
def last_used_at(self) -> datetime:
|
||||
return self._last_used_at
|
||||
|
||||
@property
|
||||
def status(self) -> Status:
|
||||
return self._status
|
||||
|
||||
class _User:
|
||||
def __init__(self, id: str, username: str, fullname: str, role: str | Role = Role.GUEST, credentials: dict = None, grants: dict = None, status: int | Status = Status.ENABLED):
|
||||
self._id: str = id
|
||||
self._username: str = username
|
||||
self._fullname: str = fullname
|
||||
self._role: Role = role if isinstance(role, Role) else Role(role)
|
||||
self._credentials: Credential = Credential(**credentials)
|
||||
self._grants: dict[str, _Grant] = {gate: _Grant(**grant) for gate, grant in grants.items()} if grants else {}
|
||||
self._status: Status = status if isinstance(status, Status) else Status(status)
|
||||
|
||||
def __dict__(self):
|
||||
return {
|
||||
"id": self._id,
|
||||
"username": self._username,
|
||||
"fullname": self._fullname,
|
||||
"role": self._role.value,
|
||||
"credentials": self._credentials.__dict__(),
|
||||
"grants": {gate: grant.__dict__() for gate, grant in self._grants.items()},
|
||||
"status": self._status.value
|
||||
}
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def username(self) -> str:
|
||||
return self._username
|
||||
|
||||
@username.setter
|
||||
def username(self, username: str):
|
||||
self._username = username
|
||||
|
||||
@property
|
||||
def fullname(self) -> str:
|
||||
return self._fullname
|
||||
|
||||
@fullname.setter
|
||||
def fullname(self, fullname: str):
|
||||
self._fullname = fullname
|
||||
|
||||
@property
|
||||
def role(self) -> Role:
|
||||
return self._role
|
||||
|
||||
@role.setter
|
||||
def role(self, role: str | Role):
|
||||
self._role = role if isinstance(role, Role) else Role(role)
|
||||
|
||||
@property
|
||||
def credentials(self) -> Credential:
|
||||
return self._credentials
|
||||
|
||||
@credentials.setter
|
||||
def credentials(self, credentials: Credential):
|
||||
self._credentials = credentials
|
||||
|
||||
@property
|
||||
def grants(self) -> dict[str, _Grant]:
|
||||
return self._grants
|
||||
|
||||
@property
|
||||
def status(self) -> Status:
|
||||
return self._status
|
||||
|
||||
@status.setter
|
||||
def status(self, status: int | Status):
|
||||
self._status = status if isinstance(status, Status) else Status(status)
|
||||
|
||||
def grant(self, gate: str, grant: _Grant):
|
||||
self._grants[gate] = grant
|
||||
|
||||
def revoke(self, gate: str):
|
||||
self._grants.pop(gate, None)
|
||||
|
||||
|
||||
def get_role(user_id):
|
||||
return load_users().get(str(user_id), {}).get("role", "guest")
|
||||
class Users:
|
||||
def __init__(self, json_path: str = "./data/users.json"):
|
||||
self._json_path: str = json_path
|
||||
self._users = self._load_users()
|
||||
|
||||
def get_username(user_id):
|
||||
return load_users().get(str(user_id), {}).get("username", None)
|
||||
def _load_users(self) -> dict[str, _User]:
|
||||
try:
|
||||
with open(self._json_path, "r") as f:
|
||||
return {uid: _User(**info) for uid, info in json.load(f)}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
def get_fullname(user_id):
|
||||
return load_users().get(str(user_id), {}).get("fullname", None)
|
||||
|
||||
def set_credentials(user_id, username, password):
|
||||
users = load_users()
|
||||
user = users.get(str(user_id), {})
|
||||
user["credentials"] = {"username": username, "password": password}
|
||||
users[str(user_id)] = user
|
||||
save_users(users)
|
||||
|
||||
def get_credentials(user_id):
|
||||
return load_users().get(str(user_id), {}).get("credentials")
|
||||
|
||||
def get_grantor_credentials(user_id, gate):
|
||||
from access_control import load_access
|
||||
access = load_access().get(str(user_id), {})
|
||||
entry = access.get(gate) or access.get("all")
|
||||
if not entry:
|
||||
def _save_users(self) -> None:
|
||||
with open(self._json_path, "w") as f:
|
||||
json.dump({uid: user.__dict__ for uid, user in self._users.items()}, f, default=str)
|
||||
|
||||
def update_user(self, id: str, username: str, fullname: str) -> bool:
|
||||
if id in self._users.keys():
|
||||
self._users[id].username = username
|
||||
self._users[id].fullname = fullname
|
||||
else:
|
||||
self._users[id] = _User(id, username, fullname)
|
||||
self._save_users()
|
||||
return True
|
||||
|
||||
def get_username(self, id: str) -> str:
|
||||
if id in self._users.keys():
|
||||
return self._users[id].username
|
||||
return None
|
||||
grantor_id = entry.get("grantor")
|
||||
return get_credentials(grantor_id)
|
||||
|
||||
def get_fullname(self, id: str) -> str:
|
||||
if id in self._users.keys():
|
||||
return self._users[id].fullname
|
||||
return None
|
||||
|
||||
def get_status(self, id: str) -> Status:
|
||||
if id in self._users.keys():
|
||||
return self._users[id].status
|
||||
return Status.DISABLED
|
||||
|
||||
def get_role(self, id: str) -> Role:
|
||||
if id in self._users.keys():
|
||||
return self._users[id].role
|
||||
return Role.GUEST
|
||||
|
||||
def get_admins():
|
||||
return [uid for uid, u in load_users().items() if u.get("role") == "admin"]
|
||||
def get_credentials(self, id: str) -> Credential:
|
||||
if id in self._users.keys():
|
||||
return self._users[id].credentials
|
||||
return None
|
||||
|
||||
def set_credentials(self, id: str, credentials: Credential) -> bool:
|
||||
if id in self._users.keys():
|
||||
self._users[id].credentials = credentials
|
||||
return True
|
||||
return False
|
||||
|
||||
def can_open_gate(self, id: str, gate: str) -> bool:
|
||||
if id in self._users.keys():
|
||||
if gate in self._users[id].grants.keys():
|
||||
if self._users[id].grants[gate].status == Status.ENABLED:
|
||||
if self._users[id].grants[gate].expires_at > datetime.now(timezone.utc):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_grantor(self, id: str, gate: str) -> str:
|
||||
if id in self._users.keys():
|
||||
if gate in self._users[id].grants.keys():
|
||||
return self._users[id].grants[gate].grantor
|
||||
return None
|
||||
|
||||
def get_admins(self) -> list[str]:
|
||||
return [uid for uid, user in self._users.items() if user.role == Role.ADMIN]
|
||||
|
||||
def grant_access(self, id: str, gate: str, expires_at: datetime, grantor_id: str) -> bool:
|
||||
if id in self._users.keys():
|
||||
self._users[id].grant(gate, _Grant(grantor_id, expires_at))
|
||||
self._save_users()
|
||||
return True
|
||||
return False
|
||||
|
||||
def revoke_access(self, id: str, gate: str) -> bool:
|
||||
if id in self._users.keys():
|
||||
self._users[id].revoke(gate)
|
||||
self._save_users()
|
||||
return True
|
||||
return False
|
||||
|
Reference in New Issue
Block a user