Files
lagomareGates/src/routers/admins.py
2026-05-10 16:38:12 +02:00

148 lines
5.6 KiB
Python

import base64
import io
from typing import Optional
import pyotp
import qrcode
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from core.auth import decrypt_secret, encrypt_secret, hash_password
from core.database import AdminUser, get_db
from core.dependencies import require_admin
from core.schemas import AdminPasswordChange, AdminUserCreate, AdminUserResponse, TotpSetupResponse
# Every route registered on this router shares the "/api/admin/admins" URL
# prefix and the "admin-admins" tag.
router = APIRouter(prefix="/api/admin/admins", tags=["admin-admins"])
@router.get("", response_model=list[AdminUserResponse])
async def list_admins(
    db: Session = Depends(get_db), _: dict = Depends(require_admin)
):
    """Return all admin accounts, ordered by id, as response models.

    The ``require_admin`` dependency must resolve for the request to reach
    this handler; its result is not used here.
    """
    accounts = db.query(AdminUser).order_by(AdminUser.id).all()

    listing = []
    for account in accounts:
        listing.append(
            AdminUserResponse(
                id=account.id,
                username=account.username,
                role=account.role,
                totp_enabled=account.totp_enabled,
            )
        )
    return listing
@router.post("", response_model=AdminUserResponse, status_code=201)
async def create_admin(
    req: AdminUserCreate,
    db: Session = Depends(get_db),
    _: dict = Depends(require_admin),
):
    """Create a new admin account and return it.

    The username is stripped of surrounding whitespace before validation
    and storage. The password is stored only as the output of
    ``hash_password``.

    Raises:
        HTTPException: 422 if the stripped username is empty, the role is
            not ``"admin"`` or ``"manager"``, or the password is shorter
            than 12 characters; 409 if the username is already taken.
    """
    username = req.username.strip()
    if not username:
        raise HTTPException(422, "Username cannot be empty")
    if req.role not in ("admin", "manager"):
        raise HTTPException(422, "role must be 'admin' or 'manager'")
    if len(req.password) < 12:
        raise HTTPException(422, "Password must be at least 12 characters")
    if db.query(AdminUser).filter_by(username=username).first():
        raise HTTPException(409, "Username already exists")

    user = AdminUser(
        username=username,
        password_hash=hash_password(req.password),
        role=req.role,
    )
    db.add(user)
    db.commit()
    # Reload the row so database-generated values (e.g. the id) are populated.
    db.refresh(user)

    return AdminUserResponse(
        id=user.id,
        username=user.username,
        role=user.role,
        # Pass the 2FA flag explicitly, as list_admins does for the same
        # response model, instead of relying on a schema default. bool()
        # guards against the attribute being None on a fresh row.
        totp_enabled=bool(user.totp_enabled),
    )
@router.delete("/{username}", status_code=204)
async def delete_admin(
    username: str,
    db: Session = Depends(get_db),
    caller: dict = Depends(require_admin),
):
    """Delete the admin account identified by ``username``.

    Raises:
        HTTPException: 409 if the caller targets their own account
            (``caller["sub"]``) or if at most one account row exists;
            404 if no account has that username.
    """
    if caller["sub"] == username:
        raise HTTPException(409, "Cannot delete your own account")

    target: Optional[AdminUser] = (
        db.query(AdminUser).filter_by(username=username).first()
    )
    if not target:
        raise HTTPException(404, "Admin not found")

    # Refuse to empty the table: at least one account must remain.
    total_accounts = db.query(AdminUser).count()
    if total_accounts <= 1:
        raise HTTPException(409, "Cannot delete the last admin account")

    db.delete(target)
    db.commit()
@router.patch("/{username}/password", status_code=204)
async def change_password(
    username: str,
    req: AdminPasswordChange,
    db: Session = Depends(get_db),
    _: dict = Depends(require_admin),
):
    """Replace the stored password hash of the account ``username``.

    Raises:
        HTTPException: 422 if the new password is empty or shorter than
            12 characters; 404 if no account has that username.
    """
    candidate = req.new_password
    # An empty password gets its own message before the length rule applies.
    if not candidate:
        raise HTTPException(422, "Password cannot be empty")
    if len(candidate) < 12:
        raise HTTPException(422, "Password must be at least 12 characters")

    account: Optional[AdminUser] = (
        db.query(AdminUser).filter_by(username=username).first()
    )
    if not account:
        raise HTTPException(404, "Admin not found")

    account.password_hash = hash_password(candidate)
    db.commit()
# ── TOTP (2FA) ────────────────────────────────────────────────────────────────
@router.post("/{username}/totp/setup", response_model=TotpSetupResponse)
async def totp_setup(
    username: str,
    db: Session = Depends(get_db),
    caller: dict = Depends(require_admin),
):
    """Create a fresh TOTP secret and return its provisioning URI and QR image.

    The secret is stored encrypted and 2FA is left disabled; it only becomes
    active once /enable is called with a valid code.

    Raises:
        HTTPException: 403 if the caller is not the target account;
            404 if no account has that username.
    """
    if username != caller["sub"]:
        raise HTTPException(403, "You can only configure 2FA for your own account")

    account: Optional[AdminUser] = (
        db.query(AdminUser).filter_by(username=username).first()
    )
    if not account:
        raise HTTPException(404, "Admin not found")

    new_secret = pyotp.random_base32()
    account.totp_secret_enc = encrypt_secret(new_secret)
    account.totp_enabled = False  # stays off until the code is confirmed
    db.commit()

    totp = pyotp.TOTP(new_secret)
    provisioning_uri = totp.provisioning_uri(
        name=username, issuer_name="Lagomare Gates"
    )

    # Render the URI as a QR image in memory and base64-encode its bytes.
    image_buffer = io.BytesIO()
    qr_image = qrcode.make(provisioning_uri)
    qr_image.save(image_buffer)
    encoded_image = base64.b64encode(image_buffer.getvalue()).decode()

    return TotpSetupResponse(
        provisioning_uri=provisioning_uri, qr_image_b64=encoded_image
    )
class TotpConfirm(BaseModel):
    """Request body carrying the one-time code used to confirm TOTP setup."""

    # Code as typed by the user; totp_enable strips whitespace before verifying.
    otp_code: str
@router.post("/{username}/totp/enable", status_code=204)
async def totp_enable(
    username: str,
    req: TotpConfirm,
    db: Session = Depends(get_db),
    caller: dict = Depends(require_admin),
):
    """Verify the OTP code and activate 2FA for the account.

    Raises:
        HTTPException: 403 if the caller is not the target account;
            404 if no account has that username; 422 if no secret has been
            generated yet or the submitted code does not verify.
    """
    if caller["sub"] != username:
        raise HTTPException(403, "You can only configure 2FA for your own account")

    user: Optional[AdminUser] = db.query(AdminUser).filter_by(username=username).first()
    # A missing account gets the same 404 as the other endpoints in this
    # router, rather than being folded into the "run /setup" hint.
    if not user:
        raise HTTPException(404, "Admin not found")
    if not user.totp_secret_enc:
        raise HTTPException(422, "Run /setup first to generate a secret")

    secret = decrypt_secret(user.totp_secret_enc)
    # valid_window=1 also accepts the codes of the adjacent time steps.
    if not pyotp.TOTP(secret).verify(req.otp_code.strip(), valid_window=1):
        raise HTTPException(422, "Invalid OTP code — make sure your authenticator is in sync")

    user.totp_enabled = True
    db.commit()
@router.delete("/{username}/totp", status_code=204)
async def totp_disable(
    username: str,
    db: Session = Depends(get_db),
    caller: dict = Depends(require_admin),
):
    """Turn 2FA off for the account and clear its stored secret.

    Raises:
        HTTPException: 403 if the caller is not the target account;
            404 if no account has that username.
    """
    if username != caller["sub"]:
        raise HTTPException(403, "You can only configure 2FA for your own account")

    account: Optional[AdminUser] = (
        db.query(AdminUser).filter_by(username=username).first()
    )
    if not account:
        raise HTTPException(404, "Admin not found")

    # Drop the secret together with the flag so a later /enable needs a new /setup.
    account.totp_secret_enc = None
    account.totp_enabled = False
    db.commit()