mirror of
https://github.com/Noettore/lagomareGateKeeperBot.git
synced 2025-10-16 20:16:39 +02:00
Refactor gates creating service, repository and tests
This commit is contained in:
18
src/handlers/__init__.py
Normal file
18
src/handlers/__init__.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from .access import requestaccess, grantaccess
|
||||
from .credentials import setcredentials
|
||||
from .gates import opengate, open_gate_menu, handle_gate_open_callback
|
||||
from .main import handle_main_menu_callback, post_init, start
|
||||
from .users import updateuser
|
||||
|
||||
__all__ = [
|
||||
"grantaccess",
|
||||
"handle_main_menu_callback",
|
||||
"handle_gate_open_callback",
|
||||
"opengate",
|
||||
"open_gate_menu",
|
||||
"post_init",
|
||||
"requestaccess",
|
||||
"setcredentials",
|
||||
"start",
|
||||
"updateuser"
|
||||
]
|
48
src/handlers/access.py
Normal file
48
src/handlers/access.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from telegram import Update
|
||||
from telegram.ext import ContextTypes
|
||||
from datetime import datetime
|
||||
from models import Users, Role
|
||||
|
||||
async def requestaccess(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users):
|
||||
assert update.effective_user is not None
|
||||
|
||||
user_id = str(update.effective_user.id)
|
||||
role = users.get_role(user_id)
|
||||
if role != Role.GUEST:
|
||||
if update.callback_query:
|
||||
await update.callback_query.answer("Only guests can request access.")
|
||||
elif update.message:
|
||||
return await update.message.reply_text("Only guests can request access.")
|
||||
requester = users.get_fullname(user_id) or users.get_username(user_id)
|
||||
text = (f"Access request: {requester} ({user_id}) requests access.\nUse `/grantaccess {user_id} <gate_id|all> YYYY-MM-DDTHH:MM:SSZ` to grant access.")
|
||||
if update.callback_query:
|
||||
await update.callback_query.answer("Your request has been submitted.")
|
||||
elif update.message:
|
||||
return await update.message.reply_text("Your request has been submitted.")
|
||||
admins = users.get_admins()
|
||||
for admin_id in admins:
|
||||
try:
|
||||
await context.bot.send_message(chat_id=admin_id, text=text, parse_mode="Markdown")
|
||||
except Exception as e:
|
||||
print(f"Failed to notify {admin_id} that guest {user_id} requested access: {e}")
|
||||
|
||||
async def grantaccess(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users):
|
||||
assert update.effective_user is not None
|
||||
assert update.message is not None
|
||||
assert context.args is not None
|
||||
|
||||
grantor_id = str(update.effective_user.id)
|
||||
if users.get_role(grantor_id) != Role.ADMIN:
|
||||
return await update.message.reply_text("Only admins can grant access.")
|
||||
try:
|
||||
user_id = context.args[0]
|
||||
gate = context.args[1]
|
||||
expires_at = context.args[2]
|
||||
users.grant_access(user_id, gate, datetime.fromisoformat(expires_at.replace("Z", "+00:00")), grantor_id=grantor_id)
|
||||
await update.message.reply_text(f"Access to {gate} granted to user {user_id} until {expires_at}")
|
||||
try:
|
||||
await context.bot.send_message(chat_id=user_id, text=f"Access granted to gate {gate} up to {expires_at}")
|
||||
except Exception as e:
|
||||
print(f"Failed to notify {user_id} that admin {grantor_id} granted access for {gate} up to {expires_at}: {e}")
|
||||
except Exception:
|
||||
await update.message.reply_text("Usage: `/grantaccess <user_id> <gate_id|all> <expires_at:YYYY-MM-DDTHH:MM:SSZ>`")
|
20
src/handlers/credentials.py
Normal file
20
src/handlers/credentials.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from telegram import Update
|
||||
from telegram.ext import ContextTypes
|
||||
from models import Users, Credential, Role
|
||||
|
||||
async def setcredentials(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users):
|
||||
assert update.effective_user is not None
|
||||
assert update.message is not None
|
||||
assert context.args is not None
|
||||
|
||||
user_id = str(update.effective_user.id)
|
||||
args = context.args
|
||||
role = users.get_role(user_id)
|
||||
if role not in (Role.ADMIN, Role.MEMBER):
|
||||
return await update.message.reply_text("Only members or admins can set credentials")
|
||||
if len(args) != 2:
|
||||
return await update.message.reply_text("Usage: `/setcredentials <username> <password>`")
|
||||
if users.set_credentials(user_id, Credential(args[0], args[1])):
|
||||
await update.message.reply_text("Credentials saved")
|
||||
else:
|
||||
await update.message.reply_text("Error saving credentials")
|
109
src/handlers/gates.py
Normal file
109
src/handlers/gates.py
Normal file
@@ -0,0 +1,109 @@
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from telegram.ext import ContextTypes
|
||||
from models import Users, Role
|
||||
from services import GatesService
|
||||
|
||||
async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates_service: GatesService):
|
||||
assert update.effective_user is not None
|
||||
assert update.message is not None
|
||||
|
||||
user_id = str(update.effective_user.id)
|
||||
args = context.args
|
||||
if not args:
|
||||
return await update.message.reply_text("Usage: `/opengate <gate>`")
|
||||
gate = args[0]
|
||||
gate_name = gates_service.get_name(gate)
|
||||
role = users.get_role(user_id)
|
||||
if role in (Role.ADMIN, Role.MEMBER):
|
||||
creds = users.get_credentials(user_id)
|
||||
if not creds:
|
||||
return await update.message.reply_text("Please set your credentials with `/setcredentials` first")
|
||||
elif role == Role.GUEST and users.can_open_gate(user_id, gate):
|
||||
grantor = users.get_grantor(user_id, gate)
|
||||
assert grantor is not None
|
||||
|
||||
creds = users.get_credentials(grantor)
|
||||
if not grantor:
|
||||
return await update.message.reply_text("No valid grantor available.")
|
||||
if not creds:
|
||||
return await update.message.reply_text("No valid grantor credentials available.")
|
||||
users.update_grant_last_used(user_id, gate)
|
||||
try:
|
||||
await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}")
|
||||
except Exception as e:
|
||||
print(f"Failed to notify {grantor} that guest {user_id} opened {gate_name}: {e}")
|
||||
else:
|
||||
return await update.message.reply_text("Access denied.")
|
||||
|
||||
if gates_service.open_gate(gate, creds):
|
||||
return await update.message.reply_text(f"Gate {gate_name} opened!")
|
||||
await update.message.reply_text(f"ERROR: Cannot open gate {gate_name}")
|
||||
|
||||
async def open_gate_menu(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates: Gates):
|
||||
assert update.effective_user is not None
|
||||
|
||||
user_id = str(update.effective_user.id)
|
||||
granted_gates = users.get_granted_gates(user_id)
|
||||
if not granted_gates:
|
||||
if update.callback_query:
|
||||
await update.callback_query.answer("You have no gates to open.")
|
||||
elif update.message:
|
||||
return await update.message.reply_text("You have no gates to open.")
|
||||
|
||||
if 'all' in granted_gates:
|
||||
granted_gates = gates.get_all_enabled()
|
||||
# Show a list of available gates as buttons
|
||||
keyboard = []
|
||||
for gate_id in granted_gates:
|
||||
gate_name = gates.get_name(gate_id)
|
||||
assert gate_name is not None
|
||||
keyboard.append([InlineKeyboardButton(gate_name, callback_data=f"opengate_{gate_id}")])
|
||||
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
if update.callback_query:
|
||||
await update.callback_query.answer()
|
||||
await update.callback_query.edit_message_text(
|
||||
"Select a gate to open:", reply_markup=reply_markup
|
||||
)
|
||||
elif update.message:
|
||||
await update.message.reply_text("Select a gate to open:", reply_markup=reply_markup)
|
||||
|
||||
async def handle_gate_open_callback(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates: Gates):
|
||||
assert update.callback_query is not None
|
||||
|
||||
query = update.callback_query
|
||||
user_id = str(query.from_user.id)
|
||||
data = query.data
|
||||
assert data is not None
|
||||
if data.startswith("opengate_"):
|
||||
gate_id = data[len("opengate_") :]
|
||||
gate_name = gates.get_name(gate_id)
|
||||
role = users.get_role(user_id)
|
||||
if role in (Role.ADMIN, Role.MEMBER):
|
||||
creds = users.get_credentials(user_id)
|
||||
if not creds:
|
||||
await query.answer("Please set your credentials with /setcredentials first", show_alert=True)
|
||||
return
|
||||
elif role == Role.GUEST and users.can_open_gate(user_id, gate_id):
|
||||
grantor = users.get_grantor(user_id, gate_id)
|
||||
assert grantor is not None
|
||||
creds = users.get_credentials(grantor)
|
||||
if not grantor or not creds:
|
||||
await query.answer("No valid grantor credentials available.", show_alert=True)
|
||||
return
|
||||
users.update_grant_last_used(user_id, gate_id)
|
||||
try:
|
||||
await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}")
|
||||
except Exception as e:
|
||||
print(f"Failed to notify {grantor} that guest {user_id} opened {gate_name}: {e}")
|
||||
else:
|
||||
# TODO: guest not working
|
||||
await query.answer("Access denied.", show_alert=True)
|
||||
return
|
||||
|
||||
if gates.open_gate(gate_id, creds):
|
||||
await query.answer(f"Gate {gate_name} opened!", show_alert=True)
|
||||
await query.edit_message_text(f"Gate {gate_name} opened!")
|
||||
else:
|
||||
await query.answer(f"ERROR: Cannot open gate {gate_name}", show_alert=True)
|
||||
await query.edit_message_text(f"ERROR: Cannot open gate {gate_name}")
|
50
src/handlers/main.py
Normal file
50
src/handlers/main.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from telegram.ext import Application, ContextTypes
|
||||
from .gates import open_gate_menu
|
||||
from .access import requestaccess
|
||||
from models import Gates, Users, Role
|
||||
from config import BotConfig
|
||||
|
||||
async def post_init(application: Application, bot_config: BotConfig) -> None:
|
||||
await application.bot.set_my_name(bot_config.name)
|
||||
await application.bot.set_my_description(bot_config.description)
|
||||
await application.bot.set_my_short_description(bot_config.short_description)
|
||||
await application.bot.set_my_commands(bot_config.commands)
|
||||
|
||||
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users):
|
||||
assert update.effective_user is not None
|
||||
assert update.message is not None
|
||||
|
||||
user_id = str(update.effective_user.id)
|
||||
role = users.get_role(user_id)
|
||||
keyboard = []
|
||||
|
||||
if role == Role.GUEST:
|
||||
# Guests: can request access, or open a gate if granted
|
||||
if users.has_grants(user_id):
|
||||
keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")])
|
||||
keyboard.append([InlineKeyboardButton("Request Access", callback_data="request_access")])
|
||||
elif role == Role.MEMBER:
|
||||
# Members: can open gates and set credentials
|
||||
keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")])
|
||||
keyboard.append([InlineKeyboardButton("Set Credentials", callback_data="set_credentials")])
|
||||
elif role == Role.ADMIN:
|
||||
# Admins: can open gates, grant access, and set credentials
|
||||
keyboard.append([InlineKeyboardButton("Open Gate", callback_data="open_gate_menu")])
|
||||
keyboard.append([InlineKeyboardButton("Grant Access", callback_data="grant_access")])
|
||||
keyboard.append([InlineKeyboardButton("Set Credentials", callback_data="set_credentials")])
|
||||
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
await update.message.reply_text(
|
||||
"Welcome to GatekeeperBot! Use the buttons below or commands.",
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
|
||||
async def handle_main_menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates: Gates):
|
||||
query = update.callback_query
|
||||
assert query is not None
|
||||
data = query.data
|
||||
if data == "open_gate_menu":
|
||||
await open_gate_menu(update, context, users=users, gates=gates)
|
||||
elif data == "request_access":
|
||||
await requestaccess(update, context, users=users)
|
11
src/handlers/users.py
Normal file
11
src/handlers/users.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from telegram import Update
|
||||
from telegram.ext import ContextTypes
|
||||
from models import Users
|
||||
|
||||
async def updateuser(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users):
|
||||
assert update.effective_user is not None
|
||||
|
||||
user_id = str(update.effective_user.id)
|
||||
username = update.effective_user.username
|
||||
fullname = update.effective_user.full_name
|
||||
users.update_user(user_id, username, fullname)
|
12
src/models/__init__.py
Normal file
12
src/models/__init__.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from .credential import Credential
|
||||
from .gate import Gate
|
||||
from .status import Status
|
||||
from .users import Users, Role
|
||||
|
||||
__all__ = [
|
||||
"Credential",
|
||||
"Gate",
|
||||
"Status",
|
||||
"Users",
|
||||
"Role"
|
||||
]
|
12
src/models/credential.py
Normal file
12
src/models/credential.py
Normal file
@@ -0,0 +1,12 @@
|
||||
class Credential:
|
||||
def __init__(self, username: str, password: str):
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.sessionid = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {"username": self.username, "password": self.password}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict):
|
||||
return cls(data.get("username", ""), data.get("password", ""))
|
15
src/models/gate.py
Normal file
15
src/models/gate.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from .status import Status
|
||||
|
||||
class Gate:
|
||||
def __init__(self, id: str, name: str, status: Status = Status.ENABLED):
|
||||
self.id = id
|
||||
self.name = name
|
||||
self.status = status if isinstance(status, Status) else Status(status)
|
||||
|
||||
def to_dict(self):
|
||||
return {"id": self.id, "name": self.name, "status": self.status.value}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict):
|
||||
return cls(data["id"], data["name"], Status(data.get("status", Status.ENABLED)))
|
||||
|
5
src/models/status.py
Normal file
5
src/models/status.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from enum import Enum
|
||||
|
||||
class Status(Enum):
|
||||
ENABLED = 1
|
||||
DISABLED = 0
|
208
src/models/users.py
Normal file
208
src/models/users.py
Normal file
@@ -0,0 +1,208 @@
|
||||
import json
|
||||
from enum import Enum
|
||||
from datetime import datetime, timezone
|
||||
from .status import Status
|
||||
from .credential import Credential
|
||||
|
||||
class Role(Enum):
|
||||
ADMIN = "admin"
|
||||
MEMBER = "member"
|
||||
GUEST = "guest"
|
||||
|
||||
class Grant:
|
||||
def __init__(
|
||||
self,
|
||||
grantor: str,
|
||||
expires_at: datetime,
|
||||
granted_at: datetime | None = None,
|
||||
last_used_at: datetime | None = None,
|
||||
status: Status = Status.ENABLED
|
||||
):
|
||||
self.grantor = grantor
|
||||
self.granted_at = granted_at or datetime.now(timezone.utc)
|
||||
self.expires_at = expires_at
|
||||
self.last_used_at = last_used_at
|
||||
self.status = status
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"grantor": self.grantor,
|
||||
"granted_at": self.granted_at.isoformat(),
|
||||
"expires_at": self.expires_at.isoformat(),
|
||||
"last_used_at": self.last_used_at.isoformat() if self.last_used_at else None,
|
||||
"status": self.status.value
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict):
|
||||
return cls(
|
||||
grantor=data.get("grantor", ""),
|
||||
expires_at=datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")),
|
||||
granted_at=datetime.fromisoformat(data["granted_at"].replace("Z", "+00:00")) if data.get("granted_at") else None,
|
||||
last_used_at=datetime.fromisoformat(data["last_used_at"].replace("Z", "+00:00")) if data.get("last_used_at") else None,
|
||||
status=Status(data.get("status", Status.ENABLED))
|
||||
)
|
||||
|
||||
class User:
|
||||
def __init__(
|
||||
self,
|
||||
id: str,
|
||||
username: str,
|
||||
fullname: str,
|
||||
role: Role = Role.GUEST,
|
||||
credentials: Credential | None = None,
|
||||
grants: dict[str, Grant] | None = None,
|
||||
status: Status = Status.ENABLED
|
||||
):
|
||||
self.id = id
|
||||
self.username = username
|
||||
self.fullname = fullname
|
||||
self.role = role if isinstance(role, Role) else Role(role)
|
||||
self.credentials = credentials or Credential("", "")
|
||||
self.grants = grants or {}
|
||||
self.status = status if isinstance(status, Status) else Status(status)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"id": self.id,
|
||||
"username": self.username,
|
||||
"fullname": self.fullname,
|
||||
"role": self.role.value,
|
||||
"credentials": self.credentials.to_dict(),
|
||||
"grants": {gate: grant.to_dict() for gate, grant in self.grants.items()},
|
||||
"status": self.status.value
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, id: str, data: dict):
|
||||
credentials = Credential.from_dict(data.get("credentials", {}))
|
||||
grants = {gate: Grant.from_dict(grant) for gate, grant in data.get("grants", {}).items()}
|
||||
return cls(
|
||||
id=id,
|
||||
username=data.get("username", ""),
|
||||
fullname=data.get("fullname", ""),
|
||||
role=Role(data.get("role", Role.GUEST)),
|
||||
credentials=credentials,
|
||||
grants=grants,
|
||||
status=Status(data.get("status", Status.ENABLED))
|
||||
)
|
||||
|
||||
class Users:
|
||||
def __init__(self, json_path: str = "./data/users.json"):
|
||||
self._json_path = json_path
|
||||
self._users: dict[str, User] = self._load_users()
|
||||
|
||||
def _load_users(self) -> dict[str, User]:
|
||||
try:
|
||||
with open(self._json_path, "r") as f:
|
||||
data = json.load(f)
|
||||
return {uid: User.from_dict(uid, info) for uid, info in data.items()}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
def _save_users(self) -> None:
|
||||
with open(self._json_path, "w") as f:
|
||||
json.dump({uid: user.to_dict() for uid, user in self._users.items()}, f, indent=2)
|
||||
|
||||
def update_user(self, id: str, username: str | None, fullname: str | None) -> bool:
|
||||
if not id or not username or not fullname:
|
||||
return False
|
||||
if id in self._users:
|
||||
self._users[id].username = username
|
||||
self._users[id].fullname = fullname
|
||||
else:
|
||||
self._users[id] = User(id, username, fullname)
|
||||
self._save_users()
|
||||
return True
|
||||
|
||||
def get_username(self, id: str) -> str | None:
|
||||
return self._users[id].username if id in self._users else None
|
||||
|
||||
def get_fullname(self, id: str) -> str | None:
|
||||
return self._users[id].fullname if id in self._users else None
|
||||
|
||||
def get_status(self, id: str) -> Status:
|
||||
return self._users[id].status if id in self._users else Status.DISABLED
|
||||
|
||||
def get_role(self, id: str) -> Role:
|
||||
return self._users[id].role if id in self._users else Role.GUEST
|
||||
|
||||
def get_credentials(self, id: str) -> Credential | None:
|
||||
return self._users[id].credentials if id in self._users else None
|
||||
|
||||
def set_credentials(self, id: str, credentials: Credential) -> bool:
|
||||
if id in self._users:
|
||||
self._users[id].credentials = credentials
|
||||
self._save_users()
|
||||
return True
|
||||
return False
|
||||
|
||||
def can_open_gate(self, id: str, gate: str) -> bool:
|
||||
user = self._users.get(id)
|
||||
if not user or user.status != Status.ENABLED:
|
||||
return False
|
||||
if user.role == Role.ADMIN or user.role == Role.MEMBER:
|
||||
return True
|
||||
grant = user.grants.get(gate)
|
||||
if not grant or grant.status != Status.ENABLED:
|
||||
return False
|
||||
if grant.expires_at <= datetime.now(timezone.utc):
|
||||
return False
|
||||
return True
|
||||
|
||||
def has_grants(self, id: str) -> bool:
|
||||
user = self._users.get(id)
|
||||
if not user or user.status != Status.ENABLED:
|
||||
return False
|
||||
if user.role == Role.ADMIN or user.role == Role.MEMBER:
|
||||
return True
|
||||
return any(grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc) for grant in user.grants.values())
|
||||
|
||||
def get_grantor(self, id: str, gate: str) -> str | None:
|
||||
user = self._users.get(id)
|
||||
if not user:
|
||||
return None
|
||||
grant = user.grants.get(gate)
|
||||
return grant.grantor if grant else None
|
||||
|
||||
def get_admins(self) -> list[str]:
|
||||
return [uid for uid, user in self._users.items() if user.role == Role.ADMIN]
|
||||
|
||||
def grant_access(self, id: str, gate: str, expires_at: datetime, grantor_id: str) -> bool:
|
||||
user = self._users.get(id)
|
||||
if not user:
|
||||
return False
|
||||
user.grants[gate] = Grant(grantor_id, expires_at)
|
||||
self._save_users()
|
||||
return True
|
||||
|
||||
def revoke_access(self, id: str, gate: str) -> bool:
|
||||
user = self._users.get(id)
|
||||
if not user:
|
||||
return False
|
||||
if gate in user.grants:
|
||||
del user.grants[gate]
|
||||
self._save_users()
|
||||
return True
|
||||
return False
|
||||
|
||||
def update_grant_last_used(self, user_id: str, gate: str) -> bool:
|
||||
user = self._users.get(user_id)
|
||||
if not user:
|
||||
return False
|
||||
grant = user.grants.get(gate)
|
||||
if not grant:
|
||||
return False
|
||||
grant.last_used_at = datetime.now(timezone.utc)
|
||||
self._save_users()
|
||||
return True
|
||||
|
||||
def get_granted_gates(self, user_id: str) -> list[str]:
|
||||
user = self._users.get(user_id)
|
||||
if not user:
|
||||
return []
|
||||
if user.role == Role.ADMIN or user.role == Role.MEMBER:
|
||||
return ['all']
|
||||
if 'all' in user.grants and user.grants['all'].status == Status.ENABLED and user.grants['all'].expires_at > datetime.now(timezone.utc):
|
||||
return ['all']
|
||||
return [gate for gate, grant in user.grants.items() if grant.status == Status.ENABLED and grant.expires_at > datetime.now(timezone.utc)]
|
5
src/repository/__init__.py
Normal file
5
src/repository/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .gates_repository import GatesRepository
|
||||
|
||||
__all__ = [
|
||||
"GatesRepository"
|
||||
]
|
21
src/repository/gates_repository.py
Normal file
21
src/repository/gates_repository.py
Normal file
@@ -0,0 +1,21 @@
|
||||
import json
|
||||
from models import Status, Gate
|
||||
|
||||
class GatesRepository:
|
||||
def __init__(self, json_path: str = "./data/gates.json"):
|
||||
self._json_path: str = json_path
|
||||
self._gates: dict[str, Gate] = self._load_gates()
|
||||
|
||||
def _load_gates(self) -> dict[str, Gate]:
|
||||
try:
|
||||
with open(self._json_path, "r") as file:
|
||||
gates_data = json.load(file)
|
||||
return {gate: Gate.from_dict(data) for gate, data in gates_data.items()}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
def get_by_key(self, key: str) -> Gate | None:
|
||||
return self._gates[key] if key in self._gates else None
|
||||
|
||||
def get_all_enabled(self) -> list[Gate]:
|
||||
return [value for key, value in self._gates.items() if self._gates[key].status == Status.ENABLED]
|
7
src/services/__init__.py
Normal file
7
src/services/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from .avconnect_service import AVConnectService
|
||||
from .gates_service import GatesService
|
||||
|
||||
__all__ = [
|
||||
"AVConnectService",
|
||||
"GatesService"
|
||||
]
|
54
src/services/avconnect_service.py
Normal file
54
src/services/avconnect_service.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from fake_useragent import UserAgent
|
||||
from requests import Session
|
||||
|
||||
from models import Credential
|
||||
|
||||
|
||||
class AVConnectService:
|
||||
_BASE_URL = "https://www.avconnect.it"
|
||||
_USER_AGENT = UserAgent(browsers=["Chrome Mobile"], os=["Android"], platforms=["mobile"]).random
|
||||
_CONTENT_TYPE = "application/x-www-form-urlencoded; charset=UTF-8"
|
||||
|
||||
def open_gate_by_id(self, gate_id: str, credentials: Credential) -> bool:
|
||||
session = self._get_session(credentials)
|
||||
if not self._is_sessionid_valid(session) and not self._authenticate(session, credentials):
|
||||
raise Exception("Authentication failed.")
|
||||
exec_url = f"{self._BASE_URL}/exemacrocom.php"
|
||||
headers = {
|
||||
"User-Agent": self._USER_AGENT,
|
||||
"Content-Type": self._CONTENT_TYPE
|
||||
}
|
||||
payload = f"idmacrocom={gate_id}&nome=16"
|
||||
# Uncomment for real request:
|
||||
# response = self._session.post(exec_url, data=payload, headers=headers)
|
||||
# return response.ok
|
||||
return True # For testing
|
||||
|
||||
def _authenticate(self, session: Session, credentials: Credential) -> bool:
|
||||
login_url = f"{self._BASE_URL}/loginone.php"
|
||||
headers = {
|
||||
"User-Agent": self._USER_AGENT,
|
||||
"Content-Type": self._CONTENT_TYPE
|
||||
}
|
||||
payload = f"userid={credentials.username}&password={credentials.password}&entra=Login"
|
||||
response = session.post(login_url, data=payload, headers=headers)
|
||||
if response.ok and "PHPSESSID" in session.cookies:
|
||||
print("Authenticated")
|
||||
return True
|
||||
return False
|
||||
|
||||
def _is_sessionid_valid(self, session: Session) -> bool:
|
||||
exec_url = f"{self._BASE_URL}/exemacrocom.php"
|
||||
headers = {
|
||||
"User-Agent": self._USER_AGENT,
|
||||
}
|
||||
response = session.get(exec_url, headers=headers)
|
||||
print(response.ok)
|
||||
return response.ok
|
||||
|
||||
@staticmethod
|
||||
def _get_session(credentials: Credential) -> Session:
|
||||
session = Session()
|
||||
if credentials.sessionid:
|
||||
session.cookies.set("PHPSESSID", credentials.sessionid)
|
||||
return session
|
23
src/services/gates_service.py
Normal file
23
src/services/gates_service.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from models import Credential, Status
|
||||
from repository import GatesRepository
|
||||
from services import AVConnectService
|
||||
|
||||
class GatesService:
|
||||
|
||||
def __init__(self, gate_repository: GatesRepository, avconnect_service: AVConnectService):
|
||||
self.gate_repository = gate_repository
|
||||
self.avconnect_service = avconnect_service
|
||||
|
||||
def open_gate(self, gate_key: str, credentials: Credential) -> bool:
|
||||
gate = self.gate_repository.get_by_key(gate_key)
|
||||
if not gate or gate.status == Status.DISABLED:
|
||||
return False
|
||||
try:
|
||||
return self.avconnect_service.open_gate_by_id(gate.id, credentials)
|
||||
except Exception as e:
|
||||
print(f"Failed to open gate {gate.name}: {e}")
|
||||
return False
|
||||
|
||||
def get_name(self, gate_key: str) -> str | None:
|
||||
gate = self.gate_repository.get_by_key(gate_key)
|
||||
return gate.name if gate else None
|
Reference in New Issue
Block a user