diff --git a/src/core/database.py b/src/core/database.py index 0f9a0f5..df733db 100644 --- a/src/core/database.py +++ b/src/core/database.py @@ -59,6 +59,7 @@ class Keypass(Base): revoked: Mapped[bool] = mapped_column(Boolean, default=False) revoked_at: Mapped[Optional[datetime]] = mapped_column(nullable=True) allowed_gates: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # JSON list of gate IDs; NULL = all gates + schedule: Mapped[Optional[str]] = mapped_column(Text, nullable=True) # JSON schedule rule; NULL = always allowed class GateAccessLog(Base): @@ -107,3 +108,9 @@ def get_db(): def init_db() -> None: os.makedirs(DATA_DIR, exist_ok=True) Base.metadata.create_all(bind=engine) + # Lightweight migrations: add columns that may not exist in older databases + with engine.connect() as conn: + existing = {row[1] for row in conn.execute(text("PRAGMA table_info(keypasses)"))} + if "schedule" not in existing: + conn.execute(text("ALTER TABLE keypasses ADD COLUMN schedule TEXT")) + conn.commit() diff --git a/src/core/dependencies.py b/src/core/dependencies.py index 4839c83..b5ac6db 100644 --- a/src/core/dependencies.py +++ b/src/core/dependencies.py @@ -1,3 +1,4 @@ +import json from datetime import datetime from typing import Optional @@ -48,4 +49,19 @@ def require_keypass( raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has been revoked") if kp.expires_at is not None and kp.expires_at < utcnow(): raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Keypass has expired") + if kp.schedule: + try: + sched = json.loads(kp.schedule) + except (ValueError, TypeError): + sched = {} + now_local = datetime.now() + allowed_days = sched.get("days") + if allowed_days is not None and now_local.weekday() not in allowed_days: + raise HTTPException(status.HTTP_403_FORBIDDEN, "Keypass not currently allowed") + time_start = sched.get("time_start") + time_end = sched.get("time_end") + if time_start and time_end: + now_hhmm = now_local.strftime("%H:%M") + if now_hhmm < time_start or now_hhmm > time_end: + raise HTTPException(status.HTTP_403_FORBIDDEN, "Keypass not currently allowed") return kp diff --git a/src/core/schemas.py b/src/core/schemas.py index 8f885a3..c27a0e4 100644 --- a/src/core/schemas.py +++ b/src/core/schemas.py @@ -32,6 +32,13 @@ class AdminLoginResponse(BaseModel): # ── Keypasses ───────────────────────────────────────────────────────────────── +class ScheduleRule(BaseModel): + """Optional time/day-of-week restriction for a keypass.""" + days: Optional[list[int]] = None # 0=Mon..6=Sun; None/absent = any day + time_start: Optional[str] = None # "HH:MM" 24-hour server local time + time_end: Optional[str] = None # "HH:MM" 24-hour server local time + + class KeypassCreate(BaseModel): description: str expires_at: Optional[datetime] = None # None = never expires @@ -40,12 +47,14 @@ class KeypassCreate(BaseModel): # Auto-generation options (ignored when `code` is supplied manually) length: int = 12 # 6–32 charset: str = "alphanumeric" # "alphanumeric" | "alpha" | "numeric" | "passphrase" + schedule: Optional[ScheduleRule] = None # None = always allowed class KeypassPatch(BaseModel): description: Optional[str] = None expires_at: Optional[datetime] = None # None = never expires gate_ids: Optional[list[int]] = None # None = keep unchanged; [] = all gates + schedule: Optional[ScheduleRule] = None # absent = keep unchanged; null = clear class KeypassResponse(BaseModel): @@ -58,9 +67,16 @@ class KeypassResponse(BaseModel): revoked: bool revoked_at: Optional[datetime] = None allowed_gate_ids: list[int] # empty = all gates + schedule: Optional[ScheduleRule] = None def keypass_to_response(kp: Keypass) -> KeypassResponse: + sched: Optional[ScheduleRule] = None + if kp.schedule: + try: + sched = ScheduleRule(**json.loads(kp.schedule)) + except Exception: + pass return KeypassResponse( id=kp.id, code=kp.code, @@ -70,6 +86,7 @@ def keypass_to_response(kp: Keypass) -> KeypassResponse: revoked=kp.revoked, revoked_at=kp.revoked_at, allowed_gate_ids=json.loads(kp.allowed_gates) if kp.allowed_gates else [], + schedule=sched, ) diff --git a/src/routers/keypasses.py b/src/routers/keypasses.py index 9d2b41c..0d15abd 100644 --- a/src/routers/keypasses.py +++ b/src/routers/keypasses.py @@ -16,6 +16,16 @@ from core.schemas import KeypassCreate, KeypassPatch, KeypassResponse, keypass_t router = APIRouter(prefix="/api/admin/keypasses", tags=["admin-keypasses"]) + +def _serialize_schedule(s) -> Optional[str]: + """Serialize a ScheduleRule to a JSON string, or None if effectively empty.""" + if s is None: + return None + d = s.model_dump(exclude_none=True) + if "days" in d and not d["days"]: + del d["days"] + return json.dumps(d) if d else None + # ── Word list for passphrase mode ───────────────────────────────────────────── _WORDS = [ "apple", "beach", "brick", "brush", "cabin", "calm", "cedar", "chain", @@ -102,6 +112,7 @@ async def create_keypass( expires_at=req.expires_at, revoked=False, allowed_gates=json.dumps(req.gate_ids) if req.gate_ids else None, + schedule=_serialize_schedule(req.schedule), ) db.add(kp) db.commit() @@ -126,6 +137,8 @@ async def update_keypass( kp.expires_at = req.expires_at if req.gate_ids is not None: kp.allowed_gates = json.dumps(req.gate_ids) if req.gate_ids else None + if "schedule" in req.model_fields_set: + kp.schedule = _serialize_schedule(req.schedule) db.commit() db.refresh(kp) return keypass_to_response(kp) diff --git a/src/static/admin.html b/src/static/admin.html index 0766445..6d300e1 100644 --- a/src/static/admin.html +++ b/src/static/admin.html @@ -159,6 +159,7 @@ Description Gates Expires + Schedule Status @@ -468,6 +469,38 @@
+
+ + + +
+
+ + + +