200 lines
6.2 KiB
Python
200 lines
6.2 KiB
Python
import json
|
||
from datetime import datetime
|
||
from typing import Optional
|
||
|
||
from pydantic import BaseModel, ConfigDict
|
||
|
||
from core.database import Keypass
|
||
|
||
|
||
# ── Auth ──────────────────────────────────────────────────────────────────────
|
||
|
||
class AdminLoginRequest(BaseModel):
|
||
username: str
|
||
password: str
|
||
otp_code: Optional[str] = None # 6-digit TOTP code; required when account has 2FA enabled
|
||
|
||
|
||
class KeypassLoginRequest(BaseModel):
|
||
code: str
|
||
|
||
|
||
class TokenResponse(BaseModel):
|
||
token: str
|
||
token_type: str = "bearer"
|
||
|
||
|
||
class AdminLoginResponse(BaseModel):
|
||
token: Optional[str] = None
|
||
token_type: str = "bearer"
|
||
otp_required: bool = False
|
||
|
||
|
||
# ── Keypasses ─────────────────────────────────────────────────────────────────
|
||
|
||
class ScheduleRule(BaseModel):
|
||
"""Optional time/day-of-week restriction for a keypass."""
|
||
days: Optional[list[int]] = None # 0=Mon..6=Sun; None/absent = any day
|
||
time_start: Optional[str] = None # "HH:MM" 24-hour server local time
|
||
time_end: Optional[str] = None # "HH:MM" 24-hour server local time
|
||
|
||
|
||
class KeypassCreate(BaseModel):
|
||
description: str
|
||
expires_at: Optional[datetime] = None # None = never expires
|
||
gate_ids: list[int] = [] # empty = all gates
|
||
code: Optional[str] = None # None = auto-generate
|
||
# Auto-generation options (ignored when `code` is supplied manually)
|
||
length: int = 12 # 6–32
|
||
charset: str = "alphanumeric" # "alphanumeric" | "alpha" | "numeric" | "passphrase"
|
||
schedule: Optional[ScheduleRule] = None # None = always allowed
|
||
|
||
|
||
class KeypassPatch(BaseModel):
|
||
description: Optional[str] = None
|
||
expires_at: Optional[datetime] = None # None = never expires
|
||
gate_ids: Optional[list[int]] = None # None = keep unchanged; [] = all gates
|
||
schedule: Optional[ScheduleRule] = None # absent = keep unchanged; null = clear
|
||
|
||
|
||
class KeypassResponse(BaseModel):
|
||
model_config = ConfigDict(from_attributes=True)
|
||
id: int
|
||
code: str
|
||
description: str
|
||
created_at: datetime
|
||
expires_at: Optional[datetime]
|
||
revoked: bool
|
||
revoked_at: Optional[datetime] = None
|
||
allowed_gate_ids: list[int] # empty = all gates
|
||
schedule: Optional[ScheduleRule] = None
|
||
|
||
|
||
def keypass_to_response(kp: Keypass) -> KeypassResponse:
|
||
sched: Optional[ScheduleRule] = None
|
||
if kp.schedule:
|
||
try:
|
||
sched = ScheduleRule(**json.loads(kp.schedule))
|
||
except Exception:
|
||
pass
|
||
return KeypassResponse(
|
||
id=kp.id,
|
||
code=kp.code,
|
||
description=kp.description,
|
||
created_at=kp.created_at,
|
||
expires_at=kp.expires_at,
|
||
revoked=kp.revoked,
|
||
revoked_at=kp.revoked_at,
|
||
allowed_gate_ids=json.loads(kp.allowed_gates) if kp.allowed_gates else [],
|
||
schedule=sched,
|
||
)
|
||
|
||
|
||
# ── Gates ─────────────────────────────────────────────────────────────────────
|
||
|
||
class GateResponse(BaseModel):
|
||
"""Full gate response — admin use only."""
|
||
model_config = ConfigDict(from_attributes=True)
|
||
id: int
|
||
name: str
|
||
gate_icon: str
|
||
api_provider: str
|
||
avconnect_macro_id: Optional[str] = None
|
||
shelly_device_id: Optional[str] = None
|
||
status: str
|
||
group_name: Optional[str] = None
|
||
lat: Optional[float] = None
|
||
lon: Optional[float] = None
|
||
|
||
|
||
class GatePublicResponse(BaseModel):
|
||
"""Gate response for keypass users — no internal fields."""
|
||
model_config = ConfigDict(from_attributes=True)
|
||
id: int
|
||
name: str
|
||
gate_icon: str
|
||
group_name: Optional[str] = None
|
||
lat: Optional[float] = None
|
||
lon: Optional[float] = None
|
||
|
||
|
||
class GateCreate(BaseModel):
|
||
name: str
|
||
gate_icon: str = "🚪" # any UTF-8 character/emoji
|
||
api_provider: str = "avconnect" # 'avconnect' | 'shelly'
|
||
avconnect_macro_id: Optional[str] = None
|
||
shelly_device_id: Optional[str] = None
|
||
status: str = "enabled"
|
||
group_name: Optional[str] = None
|
||
lat: Optional[float] = None
|
||
lon: Optional[float] = None
|
||
|
||
|
||
# ── API Credentials ──────────────────────────────────────────────────────────
|
||
|
||
class CredentialRead(BaseModel):
|
||
id: int
|
||
username: str
|
||
|
||
|
||
class CredentialUpsert(BaseModel):
|
||
username: str
|
||
password: str
|
||
|
||
|
||
class ShellyCredentialRead(BaseModel):
|
||
id: int
|
||
server_uri: str
|
||
|
||
|
||
class ShellyCredentialUpsert(BaseModel):
|
||
server_uri: str
|
||
auth_key: str
|
||
|
||
|
||
# ── Admin users ───────────────────────────────────────────────────────────────
|
||
|
||
class AdminUserResponse(BaseModel):
|
||
id: int
|
||
username: str
|
||
role: str # 'admin' | 'manager'
|
||
totp_enabled: bool = False
|
||
|
||
|
||
class TotpSetupResponse(BaseModel):
|
||
provisioning_uri: str # otpauth:// URI for QR scanning
|
||
qr_image_b64: str # base64-encoded PNG of the QR code
|
||
|
||
|
||
class AdminUserCreate(BaseModel):
|
||
username: str
|
||
password: str
|
||
role: str = "admin" # 'admin' | 'manager'
|
||
|
||
|
||
class AdminPasswordChange(BaseModel):
|
||
new_password: str
|
||
|
||
|
||
# ── Statistics ────────────────────────────────────────────────────────────────
|
||
|
||
class AccessLogResponse(BaseModel):
|
||
model_config = ConfigDict(from_attributes=True)
|
||
id: int
|
||
timestamp: datetime
|
||
keypass_id: int
|
||
keypass_code: str
|
||
gate_id: int
|
||
gate_name: str
|
||
ip_address: Optional[str]
|
||
user_agent: Optional[str]
|
||
success: bool
|
||
error: Optional[str]
|
||
|
||
|
||
class StatsPage(BaseModel):
|
||
total: int
|
||
page: int
|
||
page_size: int
|
||
items: list[AccessLogResponse]
|