Files
lagomareGates/src/routers/keypasses.py
2026-05-06 01:51:22 +02:00

90 lines
2.9 KiB
Python

import json
import secrets
import string
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from core.database import Keypass, get_db
from core.dependencies import require_manager
from core.schemas import KeypassCreate, KeypassPatch, KeypassResponse, keypass_to_response
# Router collecting the keypass endpoints defined in this module; every route
# path is relative to the "/api/admin/keypasses" prefix.
router = APIRouter(prefix="/api/admin/keypasses", tags=["admin-keypasses"])
def _generate_code(length: int = 12) -> str:
    """Return a random code made of ``length`` characters.

    Each character is drawn independently with ``secrets.choice`` from the
    uppercase ASCII letters plus the decimal digits.
    """
    pool = string.ascii_uppercase + string.digits
    # One copy of the pool per output character; map() then draws one
    # character from each copy.
    draws = map(secrets.choice, [pool] * length)
    return "".join(draws)
@router.get("", response_model=list[KeypassResponse])
async def list_keypasses(
    db: Session = Depends(get_db), _: dict = Depends(require_manager)
):
    """Return every stored keypass, ordered by ``created_at`` descending.

    Each row is converted with ``keypass_to_response``. The value produced by
    the ``require_manager`` dependency is not used inside the handler
    (presumably it acts as an access gate; confirm in ``core.dependencies``).
    """
    newest_first = db.query(Keypass).order_by(Keypass.created_at.desc())
    rows = newest_first.all()
    return list(map(keypass_to_response, rows))
@router.post("", response_model=KeypassResponse, status_code=201)
async def create_keypass(
    req: KeypassCreate,
    db: Session = Depends(get_db),
    _: dict = Depends(require_manager),
):
    """Create a keypass and return it converted by ``keypass_to_response``.

    A non-blank ``req.code`` is stripped and upper-cased; when it is missing
    or blank a random code is generated instead. ``req.gate_ids`` is stored
    as a JSON string, or as ``None`` when it is empty or missing.

    Raises:
        HTTPException: 409 when a keypass with the resulting code exists.
    """
    supplied = req.code.strip() if req.code else ""
    if supplied:
        code = supplied.upper()
    else:
        code = _generate_code()

    duplicate = db.query(Keypass).filter(Keypass.code == code).first()
    if duplicate:
        raise HTTPException(409, "A keypass with this code already exists")

    gates_json = None
    if req.gate_ids:
        gates_json = json.dumps(req.gate_ids)

    record = Keypass(
        code=code,
        description=req.description,
        created_at=datetime.utcnow(),
        expires_at=req.expires_at,
        revoked=False,
        allowed_gates=gates_json,
    )
    db.add(record)
    db.commit()
    # Reload the row so values assigned by the database are present.
    db.refresh(record)
    return keypass_to_response(record)
@router.patch("/{kp_id}", response_model=KeypassResponse)
async def update_keypass(
    kp_id: int,
    req: KeypassPatch,
    db: Session = Depends(get_db),
    _: dict = Depends(require_manager),
):
    """Partially update a keypass and return its refreshed representation.

    Only fields carried by the request are applied:

    * ``description`` is stripped and stored when it is not ``None``.
    * ``expires_at`` is stored when the client sent it; an explicit ``null``
      clears the expiry, while omitting the field leaves it untouched.
    * ``gate_ids`` is stored as JSON when it is not ``None``; an empty list
      stores ``None``.

    Raises:
        HTTPException: 404 when no keypass has id ``kp_id``; 409 when the
            keypass is already revoked.
    """
    kp: Optional[Keypass] = db.query(Keypass).filter(Keypass.id == kp_id).first()
    if not kp:
        raise HTTPException(404, "Keypass not found")
    if kp.revoked:
        raise HTTPException(409, "Revoked keypasses cannot be edited")

    if req.description is not None:
        kp.description = req.description.strip()

    # PATCH semantics: a request that omits ``expires_at`` must not wipe the
    # stored expiry. ``None`` is a meaningful value here ("no expiry"), so
    # presence is detected via the model's set-fields tracking
    # (``model_fields_set`` in Pydantic v2, ``__fields_set__`` in v1).
    # NOTE(review): assumes KeypassPatch is a Pydantic model - confirm in
    # core.schemas. If neither attribute exists, the value is always applied.
    sent_fields = getattr(req, "model_fields_set", None)
    if sent_fields is None:
        sent_fields = getattr(req, "__fields_set__", None)
    if sent_fields is None or "expires_at" in sent_fields:
        kp.expires_at = req.expires_at

    if req.gate_ids is not None:
        kp.allowed_gates = json.dumps(req.gate_ids) if req.gate_ids else None

    db.commit()
    db.refresh(kp)
    return keypass_to_response(kp)
@router.delete("/{kp_id}", status_code=204)
async def revoke_keypass(
    kp_id: int,
    db: Session = Depends(get_db),
    _: dict = Depends(require_manager),
):
    """Flag a keypass as revoked and record when that happened.

    The row is kept: ``revoked`` is set to ``True`` and ``revoked_at`` to the
    current naive UTC time, then the change is committed.

    Raises:
        HTTPException: 404 when no keypass has id ``kp_id``; 409 when the
            keypass has already expired or is already revoked. The expiry
            check runs first.
    """
    target: Optional[Keypass] = (
        db.query(Keypass).filter(Keypass.id == kp_id).first()
    )
    if not target:
        raise HTTPException(404, "Keypass not found")

    expiry = target.expires_at
    already_expired = expiry is not None and expiry < datetime.utcnow()
    if already_expired:
        raise HTTPException(409, "Expired keypasses cannot be revoked")
    if target.revoked:
        raise HTTPException(409, "Keypass is already revoked")

    target.revoked = True
    target.revoked_at = datetime.utcnow()
    db.commit()