mirror of
https://github.com/Noettore/lagomareGateKeeperBot.git
synced 2025-10-14 19:16:40 +02:00
108 lines
4.8 KiB
Python
108 lines
4.8 KiB
Python
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
|
from telegram.ext import ContextTypes
|
|
from models import Gates, Users, Role
|
|
|
|
async def opengate(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates: Gates):
|
|
assert update.effective_user is not None
|
|
assert update.message is not None
|
|
|
|
user_id = str(update.effective_user.id)
|
|
args = context.args
|
|
if not args:
|
|
return await update.message.reply_text("Usage: `/opengate <gate>`")
|
|
gate = args[0]
|
|
gate_name = gates.get_name(gate)
|
|
role = users.get_role(user_id)
|
|
if role in (Role.ADMIN, Role.MEMBER):
|
|
creds = users.get_credentials(user_id)
|
|
if not creds:
|
|
return await update.message.reply_text("Please set your credentials with `/setcredentials` first")
|
|
elif role == Role.GUEST and users.can_open_gate(user_id, gate):
|
|
grantor = users.get_grantor(user_id, gate)
|
|
assert grantor is not None
|
|
|
|
creds = users.get_credentials(grantor)
|
|
if not grantor:
|
|
return await update.message.reply_text("No valid grantor available.")
|
|
if not creds:
|
|
return await update.message.reply_text("No valid grantor credentials available.")
|
|
users.update_grant_last_used(user_id, gate)
|
|
try:
|
|
await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}")
|
|
except Exception as e:
|
|
print(f"Failed to notify {grantor} that guest {user_id} opened {gate_name}: {e}")
|
|
else:
|
|
return await update.message.reply_text("Access denied.")
|
|
|
|
if gates.open_gate(gate, creds):
|
|
return await update.message.reply_text(f"Gate {gate_name} opened!")
|
|
await update.message.reply_text(f"ERROR: Cannot open gate {gate_name}")
|
|
|
|
async def open_gate_menu(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates: Gates):
|
|
assert update.effective_user is not None
|
|
|
|
user_id = str(update.effective_user.id)
|
|
granted_gates = users.get_granted_gates(user_id)
|
|
if not granted_gates:
|
|
if update.callback_query:
|
|
await update.callback_query.answer("You have no gates to open.")
|
|
elif update.message:
|
|
return await update.message.reply_text("You have no gates to open.")
|
|
|
|
if 'all' in granted_gates:
|
|
granted_gates = gates.get_all_gates_id()
|
|
# Show a list of available gates as buttons
|
|
keyboard = []
|
|
for gate_id in granted_gates:
|
|
gate_name = gates.get_name(gate_id)
|
|
assert gate_name is not None
|
|
keyboard.append([InlineKeyboardButton(gate_name, callback_data=f"opengate_{gate_id}")])
|
|
|
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
|
if update.callback_query:
|
|
await update.callback_query.answer()
|
|
await update.callback_query.edit_message_text(
|
|
"Select a gate to open:", reply_markup=reply_markup
|
|
)
|
|
elif update.message:
|
|
await update.message.reply_text("Select a gate to open:", reply_markup=reply_markup)
|
|
|
|
async def handle_gate_open_callback(update: Update, context: ContextTypes.DEFAULT_TYPE, users: Users, gates: Gates):
|
|
assert update.callback_query is not None
|
|
|
|
query = update.callback_query
|
|
user_id = str(query.from_user.id)
|
|
data = query.data
|
|
assert data is not None
|
|
if data.startswith("opengate_"):
|
|
gate_id = data[len("opengate_") :]
|
|
gate_name = gates.get_name(gate_id)
|
|
role = users.get_role(user_id)
|
|
if role in (Role.ADMIN, Role.MEMBER):
|
|
creds = users.get_credentials(user_id)
|
|
if not creds:
|
|
await query.answer("Please set your credentials with /setcredentials first", show_alert=True)
|
|
return
|
|
elif role == Role.GUEST and users.can_open_gate(user_id, gate_id):
|
|
grantor = users.get_grantor(user_id, gate_id)
|
|
assert grantor is not None
|
|
creds = users.get_credentials(grantor)
|
|
if not grantor or not creds:
|
|
await query.answer("No valid grantor credentials available.", show_alert=True)
|
|
return
|
|
users.update_grant_last_used(user_id, gate_id)
|
|
try:
|
|
await context.bot.send_message(chat_id=grantor, text=f"Guest {user_id} opened {gate_name}")
|
|
except Exception as e:
|
|
print(f"Failed to notify {grantor} that guest {user_id} opened {gate_name}: {e}")
|
|
else:
|
|
# TODO: guest not working
|
|
await query.answer("Access denied.", show_alert=True)
|
|
return
|
|
|
|
if gates.open_gate(gate_id, creds):
|
|
await query.answer(f"Gate {gate_name} opened!", show_alert=True)
|
|
await query.edit_message_text(f"Gate {gate_name} opened!")
|
|
else:
|
|
await query.answer(f"ERROR: Cannot open gate {gate_name}", show_alert=True)
|
|
await query.edit_message_text(f"ERROR: Cannot open gate {gate_name}") |