# Source: canteen-asset-tracker/server.py (Python, 2590 lines, 91 KiB)
"""
Canteen Asset Geolocation Tool — FastAPI server.
Single-file backend: SQLite storage, asset CRUD, machine_id search,
check-ins with GPS, stats, and CSV export.
v2 schema: full asset management with customers, locations, keys, badges, etc.
"""
import csv
import hashlib
import io
import json as _json
import os
import re
import secrets
import sqlite3
import uuid
from contextlib import asynccontextmanager
from pathlib import Path
import pytesseract
from PIL import Image as PILImage
from fastapi import FastAPI, HTTPException, Query, Request, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Optional, List
# ─── Config ─────────────────────────────────────────────────────────────────
# Category names accepted by _validate_category. The same five names are
# seeded into the `categories` lookup table by _seed_data.
VALID_CATEGORIES = {"Furniture", "Appliances", "Utensils & Serveware", "Equipment", "Other"}
# Asset lifecycle states accepted by _validate_status.
VALID_STATUSES = {"active", "maintenance", "retired"}
# SQLite database file. Override with the CANTEEN_DB_PATH environment
# variable; defaults to assets.db next to this module.
DB_PATH = os.environ.get("CANTEEN_DB_PATH", str(Path(__file__).parent / "assets.db"))
# Upload directory. Override with CANTEEN_UPLOADS_DIR; defaults to uploads/
# next to this module. lifespan() creates it on startup.
UPLOADS_DIR = Path(os.environ.get("CANTEEN_UPLOADS_DIR", str(Path(__file__).parent / "uploads")))
# static/ next to this module — presumably the bundled frontend; its use is
# not visible in this part of the file.
STATIC_DIR = Path(__file__).parent / "static"
# ─── Database ───────────────────────────────────────────────────────────────
def get_db() -> sqlite3.Connection:
    """Open a fresh SQLite connection to DB_PATH.

    WAL journaling and foreign-key enforcement are switched on for the
    connection, and rows are returned as sqlite3.Row so columns can be read
    by name. The caller is responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    connection.row_factory = sqlite3.Row
    return connection
def _create_v2_tables(conn: sqlite3.Connection):
    """Create every v2 table (and the check-in indexes) that is still missing.

    All statements use IF NOT EXISTS, so the call is safe to repeat. The
    script is run with executescript(). Indexes on the assets table are not
    created here.
    """
    schema_sql = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'technician',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS customer_contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id INTEGER NOT NULL REFERENCES customers(id) ON DELETE CASCADE,
name TEXT,
phone TEXT,
email TEXT
);
CREATE TABLE IF NOT EXISTS locations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id INTEGER REFERENCES customers(id),
name TEXT,
address TEXT DEFAULT '',
building_name TEXT DEFAULT '',
building_number TEXT DEFAULT '',
floor TEXT DEFAULT '',
trailer_number TEXT DEFAULT '',
site_hours TEXT DEFAULT '',
access_notes TEXT DEFAULT '',
walking_directions TEXT DEFAULT '',
map_link TEXT DEFAULT '',
latitude REAL DEFAULT NULL,
longitude REAL DEFAULT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS rooms (
id INTEGER PRIMARY KEY AUTOINCREMENT,
location_id INTEGER NOT NULL REFERENCES locations(id) ON DELETE CASCADE,
name TEXT,
floor TEXT DEFAULT '',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
icon TEXT DEFAULT ''
);
CREATE TABLE IF NOT EXISTS key_names (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
);
CREATE TABLE IF NOT EXISTS key_types (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
);
CREATE TABLE IF NOT EXISTS badge_types (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
);
CREATE TABLE IF NOT EXISTS makes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
);
CREATE TABLE IF NOT EXISTS models (
id INTEGER PRIMARY KEY AUTOINCREMENT,
make_id INTEGER NOT NULL REFERENCES makes(id),
name TEXT NOT NULL,
icon_path TEXT
);
CREATE TABLE IF NOT EXISTS assets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
machine_id TEXT NOT NULL UNIQUE,
serial_number TEXT DEFAULT '',
name TEXT NOT NULL,
description TEXT DEFAULT '',
category TEXT NOT NULL DEFAULT 'Other',
status TEXT NOT NULL DEFAULT 'active',
make TEXT DEFAULT '',
model TEXT DEFAULT '',
address TEXT DEFAULT '',
building_name TEXT DEFAULT '',
building_number TEXT DEFAULT '',
floor TEXT DEFAULT '',
room TEXT DEFAULT '',
trailer_number TEXT DEFAULT '',
walking_directions TEXT DEFAULT '',
map_link TEXT DEFAULT '',
parking_location TEXT DEFAULT '',
photo_path TEXT,
customer_id INTEGER REFERENCES customers(id),
location_id INTEGER REFERENCES locations(id),
assigned_to INTEGER REFERENCES users(id),
latitude REAL DEFAULT NULL,
longitude REAL DEFAULT NULL,
geofence_radius_meters INTEGER DEFAULT 50,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS checkins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asset_id INTEGER NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
user_id INTEGER REFERENCES users(id),
latitude REAL,
longitude REAL,
accuracy REAL,
photo_path TEXT,
notes TEXT DEFAULT '',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS asset_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asset_id INTEGER NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
key_name TEXT,
key_type TEXT
);
CREATE TABLE IF NOT EXISTS asset_badges (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asset_id INTEGER NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
badge_name TEXT
);
CREATE TABLE IF NOT EXISTS settings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT UNIQUE NOT NULL,
value TEXT
);
CREATE TABLE IF NOT EXISTS activity_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER REFERENCES users(id),
action TEXT,
entity_type TEXT,
entity_id INTEGER,
details TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS geofences (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
points TEXT,
color TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS geofence_users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
geofence_id INTEGER NOT NULL REFERENCES geofences(id) ON DELETE CASCADE,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
UNIQUE(geofence_id, user_id)
);
CREATE TABLE IF NOT EXISTS visits (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER REFERENCES users(id),
asset_id INTEGER REFERENCES assets(id) ON DELETE CASCADE,
checkin_time TEXT,
checkout_time TEXT,
duration_minutes INTEGER,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
token TEXT UNIQUE NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_checkins_asset_id ON checkins(asset_id);
CREATE INDEX IF NOT EXISTS idx_checkins_created_at ON checkins(created_at);
"""
    conn.executescript(schema_sql)
def _seed_if_empty(conn: sqlite3.Connection, table: str, columns: tuple, rows: list):
    """Populate *table* with *rows*, but only when it currently holds no rows.

    *columns* names the target columns; each entry of *rows* supplies one
    value per column. Table and column names are interpolated into the SQL
    text, so they must come from trusted code, never from request data.
    Does not commit.
    """
    row_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    if row_count != 0:
        return
    column_list = ", ".join(columns)
    markers = ", ".join("?" for _ in columns)
    conn.executemany(
        f"INSERT INTO {table} ({column_list}) VALUES ({markers})",
        rows,
    )
def _seed_data(conn: sqlite3.Connection):
    """Fill the lookup tables and create the default admin account.

    Each lookup table is seeded only while it is empty, and the admin user is
    inserted only when the users table has no rows at all. Does not commit.
    """
    lookup_seeds = (
        ("categories", ("name", "icon"), [
            ("Furniture", "🪑"), ("Appliances", "🔌"),
            ("Utensils & Serveware", "🍽️"), ("Equipment", "⚙️"), ("Other", "📦"),
        ]),
        ("key_names", ("name",), [
            ("MK500",), ("Green Dot",), ("Red Key",), ("Blue Key",),
            ("Master Key",), ("Padlock Key",),
        ]),
        ("key_types", ("name",), [
            ("Round Short",), ("Barrel",), ("Standard",), ("Flat",), ("Tubular",),
        ]),
        ("badge_types", ("name",), [
            ("Disney Contractor Base",), ("Visitor Badge",), ("Employee Badge",),
            ("Contractor Badge",), ("Temporary Pass",),
        ]),
        ("makes", ("name",), [
            ("Canteen",), ("Hobart",), ("Vollrath",), ("Metro",),
            ("Rubbermaid",), ("Cambro",), ("Other",),
        ]),
    )
    for table, columns, rows in lookup_seeds:
        _seed_if_empty(conn, table, columns, rows)

    # Default admin account.
    # NOTE(review): the password hash is a fixed literal — presumably the hash
    # of a well-known default password; confirm it is changed after install.
    user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    if user_count != 0:
        return
    conn.execute(
        "INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
        ("admin", "057ba03d6c44104863dc7361fe4578965d1887360f90a0895882e58a6248fc86", "admin"),
    )
def _ensure_unique_machine_id(conn: sqlite3.Connection):
    """Remove duplicate machine_ids (keep oldest), then create UNIQUE index.

    For every machine_id that occurs more than once, only the row with the
    lowest id survives. Rows whose machine_id is NULL are left untouched.
    The idx_assets_machine_id index is then dropped and rebuilt as UNIQUE.

    The de-duplication is a single set-based DELETE, so it does not depend on
    the connection's row_factory. Does not commit.
    """
    conn.execute(
        "DELETE FROM assets "
        "WHERE machine_id IS NOT NULL AND id NOT IN ("
        "SELECT MIN(id) FROM assets WHERE machine_id IS NOT NULL GROUP BY machine_id"
        ")"
    )
    # Drop first so an index of the same name with a different definition
    # cannot mask the UNIQUE one.
    conn.execute("DROP INDEX IF EXISTS idx_assets_machine_id")
    conn.execute("CREATE UNIQUE INDEX idx_assets_machine_id ON assets(machine_id)")
def _migrate_v1_to_v2(conn: sqlite3.Connection):
    """Migrate existing v1 database (barcode-based) to v2 schema.

    Steps: create the new v2 tables, rebuild `assets` through a staging table
    (`assets_v2`) with `barcode` copied into `machine_id`, recreate the asset
    indexes, add `checkins.user_id` when missing, seed lookup data, commit.

    Foreign-key enforcement is suspended while the assets table is swapped.
    With enforcement on, SQLite's DROP TABLE performs an implicit DELETE that
    fires foreign-key actions, so any table whose reference to assets declares
    ON DELETE CASCADE would lose its rows. The v2 checkins table declares it;
    the v1 schema is not visible here. Enforcement is restored to its previous
    state afterwards, also on failure.
    """
    # Create all new tables first
    _create_v2_tables(conn)

    # PRAGMA foreign_keys is a no-op inside an open transaction, so make sure
    # none is pending before toggling it.
    conn.commit()
    fk_was_on = bool(conn.execute("PRAGMA foreign_keys").fetchone()[0])
    conn.execute("PRAGMA foreign_keys=OFF")
    try:
        # A previous attempt that failed part-way may have left the staging
        # table behind; remove it so the migration can be re-run.
        conn.execute("DROP TABLE IF EXISTS assets_v2")
        # Create assets_v2 with new schema, copying old data
        conn.execute("""
CREATE TABLE assets_v2 (
id INTEGER PRIMARY KEY AUTOINCREMENT,
machine_id TEXT NOT NULL UNIQUE,
serial_number TEXT DEFAULT '',
name TEXT NOT NULL,
description TEXT DEFAULT '',
category TEXT NOT NULL DEFAULT 'Other',
status TEXT NOT NULL DEFAULT 'active',
make TEXT DEFAULT '',
model TEXT DEFAULT '',
address TEXT DEFAULT '',
building_name TEXT DEFAULT '',
building_number TEXT DEFAULT '',
floor TEXT DEFAULT '',
room TEXT DEFAULT '',
trailer_number TEXT DEFAULT '',
walking_directions TEXT DEFAULT '',
map_link TEXT DEFAULT '',
parking_location TEXT DEFAULT '',
photo_path TEXT,
customer_id INTEGER REFERENCES customers(id),
location_id INTEGER REFERENCES locations(id),
assigned_to INTEGER REFERENCES users(id),
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
)
""")
        conn.execute("""
INSERT INTO assets_v2 (id, machine_id, name, description, category, status, photo_path, created_at, updated_at)
SELECT id, barcode, name, description, category, status, photo_path, created_at, updated_at
FROM assets
""")
        conn.execute("DROP TABLE assets")
        conn.execute("ALTER TABLE assets_v2 RENAME TO assets")
        _ensure_unique_machine_id(conn)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_assets_category ON assets(category)")
        # Add user_id to checkins if not present
        checkin_cols = {row[1] for row in conn.execute("PRAGMA table_info(checkins)").fetchall()}
        if "user_id" not in checkin_cols:
            conn.execute("ALTER TABLE checkins ADD COLUMN user_id INTEGER REFERENCES users(id)")
        # Seed lookup data
        _seed_data(conn)
        conn.commit()
    except Exception:
        # Undo the uncommitted part of the swap before propagating.
        conn.rollback()
        raise
    finally:
        # No transaction is open here (committed or rolled back above), so
        # the pragma takes effect.
        if fk_was_on:
            conn.execute("PRAGMA foreign_keys=ON")
def init_db(conn: sqlite3.Connection):
    """Create tables and indexes if they don't exist. Runs v1→v2 migration if needed.

    Order of work:
    1. If an `assets` table with a `barcode` column and no `machine_id`
       column exists, run the v1→v2 migration.
    2. Create any missing v2 tables, seed lookup data and (re)build the asset
       indexes. Every step is safe to repeat.
    3. v3 migration: add latitude/longitude (assets, locations) and
       geofence_radius_meters (assets) where missing.

    Steps 2 and 3 always run, including right after a v1→v2 migration. The
    migrated assets table is built without the v3 columns, so returning early
    would leave it without columns that the asset INSERT relies on.
    """
    assets_exists = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='assets'"
    ).fetchone()
    if assets_exists:
        columns = {row[1] for row in conn.execute("PRAGMA table_info(assets)").fetchall()}
        if "barcode" in columns and "machine_id" not in columns:
            _migrate_v1_to_v2(conn)

    # Fresh install or already migrated — create all tables
    _create_v2_tables(conn)
    _seed_data(conn)
    # Asset indexes — created here (not in _create_v2_tables) to avoid
    # failing during migration when old v1 assets table lacks machine_id.
    _ensure_unique_machine_id(conn)
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_assets_category ON assets(category)"
    )
    conn.commit()

    # Add lat/lng (and geofence radius) columns if not present (v3 migration)
    v3_columns = {
        "assets": {
            "latitude": "REAL DEFAULT NULL",
            "longitude": "REAL DEFAULT NULL",
            "geofence_radius_meters": "INTEGER DEFAULT 50",
        },
        "locations": {
            "latitude": "REAL DEFAULT NULL",
            "longitude": "REAL DEFAULT NULL",
        },
    }
    for table, column_defs in v3_columns.items():
        present = {row[1] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()}
        for column, definition in column_defs.items():
            if column not in present:
                conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
    conn.commit()
# ─── App / Middleware ───────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application startup hook: prepare the uploads directory and the database.

    Runs before the app starts serving. Nothing is done on shutdown. The
    setup connection is closed even when init_db raises.
    """
    UPLOADS_DIR.mkdir(parents=True, exist_ok=True)
    conn = get_db()
    try:
        init_db(conn)
    finally:
        conn.close()
    yield
# Application instance. `lifespan` creates the uploads directory and runs
# init_db before the app starts serving.
app = FastAPI(title="Canteen Asset Tracker", lifespan=lifespan)
# CORS: every origin, method and header is allowed, with credentials.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ─── Auth Middleware ──────────────────────────────────────────────────────────
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """Require valid Bearer token for all /api/* routes except login.

    Requests pass through untouched when:
    - CANTEEN_SKIP_AUTH=1 is set in the environment (test mode),
    - the path is outside /api/ or is /api/auth/login,
    - the method is OPTIONS (CORS preflight).

    Otherwise the token from the Authorization header is looked up in the
    sessions table. On success the user is exposed to handlers as
    request.state.current_user (id, username, role) and request.state.user_id.
    On failure a 401 JSON response is returned.
    """
    path = request.url.path
    # Skip auth enforcement in test mode (set by tests/test_server.py)
    if os.environ.get("CANTEEN_SKIP_AUTH") == "1":
        return await call_next(request)
    # Public paths — no auth required
    if not path.startswith("/api/") or path == "/api/auth/login":
        return await call_next(request)
    # Browsers send CORS preflight requests without an Authorization header.
    # This middleware is registered after CORSMiddleware and therefore wraps
    # it, so a 401 here would reach the browser before CORS can answer.
    if request.method == "OPTIONS":
        return await call_next(request)
    # Extract and validate token
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        return JSONResponse(
            status_code=401,
            content={"detail": "Authentication required"},
        )
    token = auth_header[7:]  # strip the "Bearer " prefix
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT u.id, u.username, u.role FROM users u "
            "JOIN sessions s ON u.id = s.user_id WHERE s.token = ?",
            (token,),
        ).fetchone()
    finally:
        # Release the connection even if the lookup fails.
        conn.close()
    if row is None:
        return JSONResponse(
            status_code=401,
            content={"detail": "Invalid or expired token"},
        )
    request.state.current_user = {
        "id": row["id"],
        "username": row["username"],
        "role": row["role"],
    }
    request.state.user_id = row["id"]
    return await call_next(request)
# ─── Global Error Handling ───────────────────────────────────────────────────
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Return structured JSON with status detail for all HTTP exceptions.

    The body is {"detail": <exc.detail>}. Headers attached to the exception
    (for example WWW-Authenticate) are forwarded to the response.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
        headers=getattr(exc, "headers", None),
    )
@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
    """Catch-all for unhandled exceptions — log and return 500.

    The traceback of *exc* is printed to stderr. The client only receives a
    generic message so internal details are not leaked.
    """
    import traceback
    # Print the exception that was passed in rather than whatever happens to
    # be the "current" exception in this frame.
    traceback.print_exception(type(exc), exc, exc.__traceback__)
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )
# ─── Input sanitization helpers ──────────────────────────────────────────────
def _sanitize_machine_id(machine_id: str) -> str:
    """Return *machine_id* without surrounding whitespace.

    Raises HTTPException(422) when the trimmed value is empty or longer than
    256 characters.
    """
    trimmed = machine_id.strip()
    if trimmed == "":
        raise HTTPException(status_code=422, detail="Machine ID must not be empty")
    if len(trimmed) > 256:
        raise HTTPException(status_code=422, detail="Machine ID too long (max 256 chars)")
    return trimmed
def _sanitize_name(name: str) -> str:
    """Return *name* without surrounding whitespace.

    Raises HTTPException(422) when the trimmed value is empty or longer than
    512 characters.
    """
    trimmed = name.strip()
    if trimmed == "":
        raise HTTPException(status_code=422, detail="Name must not be empty")
    if len(trimmed) > 512:
        raise HTTPException(status_code=422, detail="Name too long (max 512 chars)")
    return trimmed
# ─── Pydantic Models ────────────────────────────────────────────────────────
# One physical key associated with an asset. create_asset stores each entry
# as a row in asset_keys; key_type is optional and saved as "" when omitted.
class AssetKey(BaseModel):
    key_name: str
    key_type: Optional[str] = ""
# A single badge name. Note that AssetCreate and AssetUpdate take badges as
# plain strings (List[str]) rather than as instances of this model.
class AssetBadge(BaseModel):
    badge_name: str
# Request body for POST /api/assets. Only machine_id and name are required;
# the remaining fields default to empty strings / None, category to "Other",
# status to "active" and geofence_radius_meters to 50.
class AssetCreate(BaseModel):
    machine_id: str
    name: str
    serial_number: Optional[str] = ""
    description: Optional[str] = ""
    category: Optional[str] = "Other"
    status: Optional[str] = "active"
    make: Optional[str] = ""
    model: Optional[str] = ""
    address: Optional[str] = ""
    building_name: Optional[str] = ""
    building_number: Optional[str] = ""
    floor: Optional[str] = ""
    room: Optional[str] = ""
    trailer_number: Optional[str] = ""
    walking_directions: Optional[str] = ""
    map_link: Optional[str] = ""
    parking_location: Optional[str] = ""
    photo_path: Optional[str] = None
    # Optional references to customers.id / locations.id / users.id.
    customer_id: Optional[int] = None
    location_id: Optional[int] = None
    assigned_to: Optional[int] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    geofence_radius_meters: Optional[int] = 50
    # Stored by create_asset in asset_keys / asset_badges.
    keys: Optional[List[AssetKey]] = []
    badges: Optional[List[str]] = []
# Request body for PUT /api/assets/{asset_id}. Every field is optional and
# defaults to None; update_asset treats None as "leave unchanged".
class AssetUpdate(BaseModel):
    machine_id: Optional[str] = None
    name: Optional[str] = None
    serial_number: Optional[str] = None
    description: Optional[str] = None
    category: Optional[str] = None
    status: Optional[str] = None
    make: Optional[str] = None
    model: Optional[str] = None
    address: Optional[str] = None
    building_name: Optional[str] = None
    building_number: Optional[str] = None
    floor: Optional[str] = None
    room: Optional[str] = None
    trailer_number: Optional[str] = None
    walking_directions: Optional[str] = None
    map_link: Optional[str] = None
    parking_location: Optional[str] = None
    photo_path: Optional[str] = None
    customer_id: Optional[int] = None
    location_id: Optional[int] = None
    assigned_to: Optional[int] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    geofence_radius_meters: Optional[int] = None
    keys: Optional[List[AssetKey]] = None
    badges: Optional[List[str]] = None
# Request body for POST /api/checkins. asset_id is required; the GPS fix
# (latitude/longitude/accuracy), photo, notes and user_id are optional.
# user_id is taken from the request body as sent by the client.
class CheckinCreate(BaseModel):
    asset_id: int
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    accuracy: Optional[float] = None
    photo_path: Optional[str] = None
    notes: Optional[str] = ""
    user_id: Optional[int] = None
# Partial-update payload for a check-in: all fields optional, default None.
# Presumably consumed by a check-in update endpoint that is outside the part
# of the file visible here — TODO confirm.
class CheckinUpdate(BaseModel):
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    accuracy: Optional[float] = None
    photo_path: Optional[str] = None
    notes: Optional[str] = None
    user_id: Optional[int] = None
# Payload describing a visit to an asset; asset_id is the only required
# field. The consuming endpoint is not visible in this part of the file.
class VisitCreate(BaseModel):
    user_id: Optional[int] = None
    asset_id: int
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    duration_minutes: Optional[int] = None
# ─── Phase B: Customer / Location / Room / Settings Models ──────────────────
# One contact person for a customer; every field is optional and defaults
# to "". The field names match the customer_contacts table columns.
class CustomerContact(BaseModel):
    name: Optional[str] = ""
    phone: Optional[str] = ""
    email: Optional[str] = ""
# Payload for creating a customer: a required name plus an optional list of
# contacts. The consuming endpoint is not visible in this part of the file.
class CustomerCreate(BaseModel):
    name: str
    contacts: Optional[List[CustomerContact]] = []
# Partial-update payload for a customer; both fields default to None.
# Presumably None means "not supplied" — confirm against the handler, which
# is outside the part of the file visible here.
class CustomerUpdate(BaseModel):
    name: Optional[str] = None
    contacts: Optional[List[CustomerContact]] = None
# Payload for creating a location. name is required; customer_id optionally
# links the location to a customer. The field names match the locations
# table columns; text fields default to "" and coordinates to None.
class LocationCreate(BaseModel):
    customer_id: Optional[int] = None
    name: str
    address: Optional[str] = ""
    building_name: Optional[str] = ""
    building_number: Optional[str] = ""
    floor: Optional[str] = ""
    trailer_number: Optional[str] = ""
    site_hours: Optional[str] = ""
    access_notes: Optional[str] = ""
    walking_directions: Optional[str] = ""
    map_link: Optional[str] = ""
    latitude: Optional[float] = None
    longitude: Optional[float] = None
# Partial-update payload for a location: same fields as LocationCreate, all
# optional with a None default. Presumably None means "not supplied" —
# confirm against the handler, which is not visible here.
class LocationUpdate(BaseModel):
    customer_id: Optional[int] = None
    name: Optional[str] = None
    address: Optional[str] = None
    building_name: Optional[str] = None
    building_number: Optional[str] = None
    floor: Optional[str] = None
    trailer_number: Optional[str] = None
    site_hours: Optional[str] = None
    access_notes: Optional[str] = None
    walking_directions: Optional[str] = None
    map_link: Optional[str] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
# Payload for creating a room inside a location; location_id and name are
# required, floor defaults to "".
class RoomCreate(BaseModel):
    location_id: int
    name: str
    floor: Optional[str] = ""
# Partial-update payload for a room; all fields optional, default None.
# The consuming endpoint is not visible in this part of the file.
class RoomUpdate(BaseModel):
    name: Optional[str] = None
    floor: Optional[str] = None
    location_id: Optional[int] = None
# ─── Helpers ────────────────────────────────────────────────────────────────
def row_to_dict(row: sqlite3.Row) -> dict:
    """Convert a sqlite3.Row into a plain dict keyed by column name."""
    return {column: row[column] for column in row.keys()}
def _geofence_row(row: sqlite3.Row, conn: Optional[sqlite3.Connection] = None) -> dict:
    """Turn a geofence row into a response dict.

    - `points` is decoded from JSON when it is a string; if decoding fails
      the original string is kept.
    - `assigned_users` lists the users linked through geofence_users as
      {id, username, role} dicts, ordered by username.

    When *conn* is omitted a temporary connection is opened with get_db()
    and closed again before returning. A connection passed in is left open.
    """
    data = dict(row)
    raw_points = data.get("points")
    if isinstance(raw_points, str):
        try:
            data["points"] = _json.loads(raw_points)
        except (_json.JSONDecodeError, TypeError):
            pass  # keep the undecodable text as-is

    # Only close a connection this function opened itself.
    owns_connection = conn is None
    if owns_connection:
        conn = get_db()
    try:
        members = conn.execute(
            """SELECT u.id, u.username, u.role FROM geofence_users gu
JOIN users u ON u.id = gu.user_id
WHERE gu.geofence_id = ? ORDER BY u.username""",
            (data["id"],),
        ).fetchall()
        data["assigned_users"] = [dict(member) for member in members]
    finally:
        if owns_connection:
            conn.close()
    return data
def _sync_geofence_users(conn: sqlite3.Connection, geofence_id: int, user_ids: list[int]):
    """Replace assigned users for a geofence with the given list of user IDs.

    Validates all user_ids exist before modifying the junction table.
    Repeated IDs are collapsed (first occurrence wins) so the
    UNIQUE(geofence_id, user_id) constraint cannot be tripped by the input.
    Does NOT commit — caller manages transaction boundaries.

    Raises HTTPException(422) naming the first requested user ID that does
    not exist.
    """
    # De-duplicate while preserving the caller's order.
    unique_ids = list(dict.fromkeys(user_ids))

    # Validate all user IDs exist
    if unique_ids:
        placeholders = ", ".join(["?"] * len(unique_ids))
        found = conn.execute(
            f"SELECT id FROM users WHERE id IN ({placeholders})",
            unique_ids,
        ).fetchall()
        existing_ids = {record[0] for record in found}
        for uid in unique_ids:
            if uid not in existing_ids:
                raise HTTPException(
                    status_code=422,
                    detail=f"User with id {uid} not found",
                )

    conn.execute("DELETE FROM geofence_users WHERE geofence_id = ?", (geofence_id,))
    conn.executemany(
        "INSERT INTO geofence_users (geofence_id, user_id) VALUES (?, ?)",
        [(geofence_id, uid) for uid in unique_ids],
    )
def _validate_category(category: str):
    """Raise HTTPException(422) unless *category* is one of VALID_CATEGORIES."""
    if category in VALID_CATEGORIES:
        return
    allowed = ", ".join(sorted(VALID_CATEGORIES))
    raise HTTPException(
        status_code=422,
        detail=f"Invalid category '{category}'. Must be one of: {allowed}",
    )
def _validate_status(status: str):
    """Raise HTTPException(422) unless *status* is one of VALID_STATUSES."""
    if status in VALID_STATUSES:
        return
    allowed = ", ".join(sorted(VALID_STATUSES))
    raise HTTPException(
        status_code=422,
        detail=f"Invalid status '{status}'. Must be one of: {allowed}",
    )
def _validate_ref(conn: sqlite3.Connection, table: str, id_val: int, label: str):
    """Check that a row with primary key *id_val* exists in *table*.

    A None *id_val* is accepted without a lookup. Raises HTTPException(422)
    with *label* in the message when the row is missing. *table* is
    interpolated into the SQL text and must come from trusted code.
    """
    if id_val is None:
        return
    match = conn.execute(f"SELECT id FROM {table} WHERE id = ?", (id_val,)).fetchone()
    if match is not None:
        return
    raise HTTPException(status_code=422, detail=f"{label} with id {id_val} not found")
def _validate_enum_table(conn: sqlite3.Connection, table: str, value: str, label: str):
    """Check that *value* appears in the `name` column of lookup *table*.

    Empty or None values are accepted without a lookup. Raises
    HTTPException(422) mentioning *label* when no row matches. *table* is
    interpolated into the SQL text and must come from trusted code.
    """
    if not value:
        return
    lookup_sql = f"SELECT id FROM {table} WHERE name = ?"
    match = conn.execute(lookup_sql, (value,)).fetchone()
    if match is not None:
        return
    raise HTTPException(
        status_code=422,
        detail=f"Invalid {label} '{value}'. Must exist in {table} table.",
    )
# ─── Task 3: Health ────────────────────────────────────────────────────────
@app.get("/health")
def health():
    # Liveness probe: always answers {"status": "ok"}. The path is outside
    # /api/, so auth_middleware lets it through without a token.
    payload = {"status": "ok"}
    return payload
# ─── Task 4: POST /api/assets ──────────────────────────────────────────────
def _build_asset_insert(body: AssetCreate, machine_id: str, name: str):
    """Pair up the assets columns with the values to insert for a new asset.

    *machine_id* and *name* are the already-sanitized values. Everything else
    is read from *body*: missing text fields become "", category falls back
    to "Other", status to "active", and geofence_radius_meters to 50 only
    when it is None.

    Returns (columns, values): a list of column names and a tuple of values
    in the same order.
    """
    radius = body.geofence_radius_meters
    if radius is None:
        radius = 50

    pairs = [
        ("machine_id", machine_id),
        ("serial_number", body.serial_number or ""),
        ("name", name),
        ("description", body.description or ""),
        ("category", body.category or "Other"),
        ("status", body.status or "active"),
        ("make", body.make or ""),
        ("model", body.model or ""),
        ("address", body.address or ""),
        ("building_name", body.building_name or ""),
        ("building_number", body.building_number or ""),
        ("floor", body.floor or ""),
        ("room", body.room or ""),
        ("trailer_number", body.trailer_number or ""),
        ("walking_directions", body.walking_directions or ""),
        ("map_link", body.map_link or ""),
        ("parking_location", body.parking_location or ""),
        ("photo_path", body.photo_path),
        ("customer_id", body.customer_id),
        ("location_id", body.location_id),
        ("assigned_to", body.assigned_to),
        ("latitude", body.latitude),
        ("longitude", body.longitude),
        ("geofence_radius_meters", radius),
    ]
    columns = [column for column, _ in pairs]
    values = tuple(value for _, value in pairs)
    return columns, values
def _get_asset_keys(conn: sqlite3.Connection, asset_id: int) -> list:
    """Return the asset_keys rows of *asset_id* as a list of dicts."""
    cursor = conn.execute(
        "SELECT id, asset_id, key_name, key_type FROM asset_keys WHERE asset_id = ?",
        (asset_id,),
    )
    return list(map(row_to_dict, cursor.fetchall()))
def _get_asset_badges(conn: sqlite3.Connection, asset_id: int) -> list:
    """Return the asset_badges rows of *asset_id* as a list of dicts."""
    cursor = conn.execute(
        "SELECT id, asset_id, badge_name FROM asset_badges WHERE asset_id = ?",
        (asset_id,),
    )
    return list(map(row_to_dict, cursor.fetchall()))
@app.post("/api/assets", status_code=201)
def create_asset(body: AssetCreate):
    # Create an asset together with its keys and badges.
    #
    # Validation: machine_id and name are sanitized, status is checked against
    # VALID_STATUSES, category and make against their lookup tables, and
    # customer_id / location_id / assigned_to must reference existing rows.
    # Responses: 201 with the stored asset (plus "keys" and "badges"),
    # 409 when the machine_id is already taken, 422 on validation errors.
    #
    # The connection is closed on every exit path, including validation
    # failures. Closing discards any uncommitted work.
    machine_id = _sanitize_machine_id(body.machine_id)
    name = _sanitize_name(body.name)
    _validate_status(body.status or "active")
    conn = get_db()
    try:
        # DB-based validation for reference fields
        _validate_enum_table(conn, "categories", body.category or "Other", "category")
        _validate_enum_table(conn, "makes", body.make or "", "make")
        if body.customer_id is not None:
            _validate_ref(conn, "customers", body.customer_id, "Customer")
        if body.location_id is not None:
            _validate_ref(conn, "locations", body.location_id, "Location")
        if body.assigned_to is not None:
            _validate_ref(conn, "users", body.assigned_to, "User")

        columns, values = _build_asset_insert(body, machine_id, name)
        placeholders = ", ".join(["?"] * len(columns))
        col_names = ", ".join(columns)
        try:
            cursor = conn.execute(
                f"INSERT INTO assets ({col_names}) VALUES ({placeholders})",
                values,
            )
        except sqlite3.IntegrityError:
            # References were validated above, so the remaining integrity
            # failure is the UNIQUE constraint on machine_id.
            raise HTTPException(
                status_code=409,
                detail=f"Asset with machine_id '{machine_id}' already exists",
            )
        asset_id = cursor.lastrowid

        # Insert keys
        for k in body.keys or []:
            conn.execute(
                "INSERT INTO asset_keys (asset_id, key_name, key_type) VALUES (?, ?, ?)",
                (asset_id, k.key_name, k.key_type or ""),
            )
        # Insert badges
        for b_name in body.badges or []:
            conn.execute(
                "INSERT INTO asset_badges (asset_id, badge_name) VALUES (?, ?)",
                (asset_id, b_name),
            )
        conn.commit()

        _log_activity(conn, "created", "asset", asset_id,
                      f"Asset '{name}' (machine_id: {machine_id}) created")
        conn.commit()

        row = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
        result = row_to_dict(row)
        result["keys"] = _get_asset_keys(conn, asset_id)
        result["badges"] = _get_asset_badges(conn, asset_id)
        return result
    finally:
        conn.close()
# ─── Task 5: GET /api/assets ───────────────────────────────────────────────
@app.get("/api/assets")
def list_assets(
    category: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    make: Optional[str] = Query(None),
    model: Optional[str] = Query(None),
    customer_id: Optional[int] = Query(None),
    location_id: Optional[int] = Query(None),
    assigned_to: Optional[int] = Query(None),
    q: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    # List assets, newest first, with optional filters and paging.
    # All supplied filters are AND-ed together. Text filters apply when
    # non-empty, id filters when not None. `q` is a substring match over
    # name, machine_id and description.
    conn = get_db()

    # (column, value, apply?) in the order the conditions are emitted.
    equality_filters = [
        ("category", category, bool(category)),
        ("status", status, bool(status)),
        ("make", make, bool(make)),
        ("model", model, bool(model)),
        ("customer_id", customer_id, customer_id is not None),
        ("location_id", location_id, location_id is not None),
        ("assigned_to", assigned_to, assigned_to is not None),
    ]
    conditions = [f"{column} = ?" for column, _, wanted in equality_filters if wanted]
    params = [value for _, value, wanted in equality_filters if wanted]

    if q:
        conditions.append("(name LIKE ? OR machine_id LIKE ? OR description LIKE ?)")
        pattern = f"%{q}%"
        params += [pattern, pattern, pattern]

    sql = "SELECT * FROM assets"
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    sql += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params += [limit, offset]

    found = conn.execute(sql, params).fetchall()
    conn.close()
    return [row_to_dict(record) for record in found]
@app.get("/api/assets/search")
def search_by_machine_id(machine_id: str = Query(...)):
    # Exact-match lookup of one asset by machine_id; 404 when none matches.
    # The value is compared as received, with no trimming.
    conn = get_db()
    cursor = conn.execute(
        "SELECT * FROM assets WHERE machine_id = ?", (machine_id,)
    )
    match = cursor.fetchone()
    conn.close()
    if match is not None:
        return row_to_dict(match)
    raise HTTPException(status_code=404, detail="Asset not found")
@app.get("/api/assets/{asset_id}")
def get_asset(asset_id: int):
    # Fetch one asset by id, with its keys and badges; 404 when unknown.
    # customer_name / location_name are included only when the asset has the
    # corresponding id set. Each is None if the referenced row is missing.
    conn = get_db()
    row = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
    if row is None:
        conn.close()
        raise HTTPException(status_code=404, detail="Asset not found")

    result = row_to_dict(row)
    result["keys"] = _get_asset_keys(conn, asset_id)
    result["badges"] = _get_asset_badges(conn, asset_id)

    # Add joined names: (id field on the asset, lookup table, output field)
    name_lookups = (
        ("customer_id", "customers", "customer_name"),
        ("location_id", "locations", "location_name"),
    )
    for id_field, table, name_field in name_lookups:
        ref_id = result.get(id_field)
        if not ref_id:
            continue
        named = conn.execute(
            f"SELECT name FROM {table} WHERE id = ?", (ref_id,)
        ).fetchone()
        result[name_field] = named["name"] if named else None

    conn.close()
    return result
# ─── Task 6: PUT / DELETE /api/assets/{id} ─────────────────────────────────
# AssetUpdate text fields that update_asset copies into the UPDATE statement
# when they are supplied (not None). `name` and `machine_id` are additionally
# passed through their sanitizers there.
_TEXT_FIELDS = [
    "name", "machine_id", "serial_number", "description", "make", "model",
    "address", "building_name", "building_number", "floor", "room",
    "trailer_number", "walking_directions", "map_link", "parking_location",
]
@app.put("/api/assets/{asset_id}")
def update_asset(asset_id: int, body: AssetUpdate):
    # Partially update an asset. Fields left as None are not touched.
    #
    # - name / machine_id are sanitized; category is checked against the
    #   categories table and status against VALID_STATUSES.
    # - customer_id / location_id / assigned_to must reference existing rows
    #   (422 otherwise), matching create_asset.
    # - keys / badges, when supplied, REPLACE the asset's current keys /
    #   badges. An empty list clears them.
    #   NOTE(review): previously these two fields were accepted but ignored;
    #   confirm no client sends them without intending a replacement.
    # - Changing machine_id to one that already exists yields 409.
    # NOTE(review): unlike create_asset, `make` is not checked against the
    # makes table here — left as-is to avoid rejecting existing data.
    #
    # Responses: the updated asset row plus "keys" and "badges"; 404 when the
    # asset does not exist. The connection is closed on every exit path.
    conn = get_db()
    try:
        existing = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Asset not found")

        updates = {}
        for field in _TEXT_FIELDS:
            val = getattr(body, field, None)
            if val is None:
                continue
            if field == "name":
                val = _sanitize_name(val)
            elif field == "machine_id":
                val = _sanitize_machine_id(val)
            updates[field] = val

        if body.category is not None:
            _validate_enum_table(conn, "categories", body.category, "category")
            updates["category"] = body.category
        if body.status is not None:
            _validate_status(body.status)
            updates["status"] = body.status
        if body.photo_path is not None:
            updates["photo_path"] = body.photo_path

        # Foreign keys are enforced on this connection, so an unknown id
        # would otherwise surface as an unhandled IntegrityError.
        reference_fields = (
            ("customer_id", "customers", "Customer"),
            ("location_id", "locations", "Location"),
            ("assigned_to", "users", "User"),
        )
        for ref_field, table, label in reference_fields:
            ref_id = getattr(body, ref_field)
            if ref_id is not None:
                _validate_ref(conn, table, ref_id, label)
                updates[ref_field] = ref_id

        for plain_field in ("latitude", "longitude", "geofence_radius_meters"):
            val = getattr(body, plain_field)
            if val is not None:
                updates[plain_field] = val

        replace_keys = body.keys is not None
        replace_badges = body.badges is not None

        if updates or replace_keys or replace_badges:
            # Column names come from the fixed field lists above, never from
            # request data, so building the SET clause is safe.
            assignments = [f"{column} = ?" for column in updates]
            assignments.append("updated_at = datetime('now')")
            set_clause = ", ".join(assignments)
            try:
                conn.execute(
                    f"UPDATE assets SET {set_clause} WHERE id = ?",
                    [*updates.values(), asset_id],
                )
            except sqlite3.IntegrityError:
                if "machine_id" not in updates:
                    raise
                raise HTTPException(
                    status_code=409,
                    detail=f"Asset with machine_id '{updates['machine_id']}' already exists",
                )

            if replace_keys:
                conn.execute("DELETE FROM asset_keys WHERE asset_id = ?", (asset_id,))
                conn.executemany(
                    "INSERT INTO asset_keys (asset_id, key_name, key_type) VALUES (?, ?, ?)",
                    [(asset_id, k.key_name, k.key_type or "") for k in body.keys],
                )
            if replace_badges:
                conn.execute("DELETE FROM asset_badges WHERE asset_id = ?", (asset_id,))
                conn.executemany(
                    "INSERT INTO asset_badges (asset_id, badge_name) VALUES (?, ?)",
                    [(asset_id, b_name) for b_name in body.badges],
                )
            conn.commit()

        row = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
        _log_activity(conn, "updated", "asset", asset_id,
                      f"Asset '{row['name']}' updated")
        conn.commit()

        result = row_to_dict(row)
        result["keys"] = _get_asset_keys(conn, asset_id)
        result["badges"] = _get_asset_badges(conn, asset_id)
        return result
    finally:
        conn.close()
@app.delete("/api/assets/{asset_id}", status_code=204)
def delete_asset(asset_id: int):
    # Delete an asset and its dependent rows; 404 when the asset is unknown.
    # The deletion and its activity-log entry are committed together.
    conn = get_db()
    found = conn.execute("SELECT id FROM assets WHERE id = ?", (asset_id,)).fetchone()
    if found is None:
        conn.close()
        raise HTTPException(status_code=404, detail="Asset not found")

    # Child rows are removed explicitly before the asset itself. The v2
    # schema in this file also declares ON DELETE CASCADE for these tables;
    # the explicit deletes presumably cover databases whose tables were
    # created before that clause existed — TODO confirm.
    for dependent in ("visits", "asset_keys", "asset_badges"):
        conn.execute(f"DELETE FROM {dependent} WHERE asset_id = ?", (asset_id,))
    conn.execute("DELETE FROM assets WHERE id = ?", (asset_id,))

    _log_activity(conn, "deleted", "asset", asset_id,
                  f"Asset {asset_id} deleted")
    conn.commit()
    conn.close()
# ─── Task 8: POST /api/checkins ─────────────────────────────────────────────
@app.post("/api/checkins", status_code=201)
def create_checkin(body: CheckinCreate):
    # Record a check-in for an asset, then auto-log a visit and an activity
    # entry for it.
    #
    # Responses: 201 with the stored check-in row; 404 when the asset does
    # not exist; 422 when user_id is given but references no user. Foreign
    # keys are enforced, so an unknown user would otherwise fail the INSERT
    # with an unhandled IntegrityError.
    # The connection is closed on every exit path.
    conn = get_db()
    try:
        existing = conn.execute("SELECT id FROM assets WHERE id = ?", (body.asset_id,)).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Asset not found")
        _validate_ref(conn, "users", body.user_id, "User")

        cursor = conn.execute(
            """INSERT INTO checkins (asset_id, user_id, latitude, longitude, accuracy, photo_path, notes)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (body.asset_id, body.user_id, body.latitude, body.longitude,
             body.accuracy, body.photo_path, body.notes or ""),
        )
        conn.commit()
        checkin_id = cursor.lastrowid

        # Auto-log visit, using the timestamp the database assigned
        row = conn.execute("SELECT created_at FROM checkins WHERE id = ?", (checkin_id,)).fetchone()
        _auto_log_visit(conn, body.user_id, body.asset_id, row["created_at"])
        # Activity log
        _log_activity(conn, "created", "checkin", checkin_id,
                      f"Check-in for asset {body.asset_id}",
                      user_id=body.user_id)
        conn.commit()

        row = conn.execute("SELECT * FROM checkins WHERE id = ?", (checkin_id,)).fetchone()
        return row_to_dict(row)
    finally:
        conn.close()
# ─── Task 9: GET /api/checkins ──────────────────────────────────────────────
@app.get("/api/checkins")
def list_checkins(
    asset_id: Optional[int] = Query(None),
    user_id: Optional[int] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List check-ins, newest first, with optional asset/user filters and paging."""
    # Keep only the filters the caller actually supplied, in a fixed order.
    active_filters = [
        (clause, value)
        for clause, value in (("asset_id = ?", asset_id), ("user_id = ?", user_id))
        if value is not None
    ]
    sql = "SELECT * FROM checkins"
    if active_filters:
        sql += " WHERE " + " AND ".join(clause for clause, _ in active_filters)
    sql += " ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?"
    params = [value for _, value in active_filters] + [limit, offset]

    conn = get_db()
    found = conn.execute(sql, params).fetchall()
    conn.close()
    return [row_to_dict(record) for record in found]
@app.get("/api/checkins/{checkin_id}")
def get_checkin(checkin_id: int):
    """Return one check-in by id, or 404 when it does not exist."""
    conn = get_db()
    record = conn.execute(
        "SELECT * FROM checkins WHERE id = ?", (checkin_id,)
    ).fetchone()
    # The row is fully fetched, so the connection can be released before branching.
    conn.close()
    if record is None:
        raise HTTPException(status_code=404, detail="Checkin not found")
    return row_to_dict(record)
@app.put("/api/checkins/{checkin_id}")
def update_checkin(checkin_id: int, body: CheckinUpdate):
    """Apply the non-null fields of ``body`` to a check-in and return the row.

    Responds 404 when the check-in does not exist. Fields left as ``None`` are
    not touched; when nothing is supplied the row is returned unchanged.
    """
    conn = get_db()
    found = conn.execute(
        "SELECT id FROM checkins WHERE id = ?", (checkin_id,)
    ).fetchone()
    if found is None:
        conn.close()
        raise HTTPException(status_code=404, detail="Checkin not found")

    editable = ("latitude", "longitude", "accuracy", "photo_path", "notes", "user_id")
    supplied = ((name, getattr(body, name, None)) for name in editable)
    changes = {name: value for name, value in supplied if value is not None}

    if changes:
        # Column names come from the fixed tuple above, never from the request.
        assignments = ", ".join(f"{column} = ?" for column in changes)
        conn.execute(
            f"UPDATE checkins SET {assignments} WHERE id = ?",
            [*changes.values(), checkin_id],
        )
        conn.commit()

    refreshed = conn.execute(
        "SELECT * FROM checkins WHERE id = ?", (checkin_id,)
    ).fetchone()
    conn.close()
    return row_to_dict(refreshed)
@app.delete("/api/checkins/{checkin_id}", status_code=204)
def delete_checkin(checkin_id: int):
    """Delete one check-in; 404 when the id is unknown, otherwise an empty 204."""
    conn = get_db()
    target = conn.execute(
        "SELECT id FROM checkins WHERE id = ?", (checkin_id,)
    ).fetchone()
    if target is not None:
        conn.execute("DELETE FROM checkins WHERE id = ?", (checkin_id,))
        conn.commit()
        conn.close()
        return
    conn.close()
    raise HTTPException(status_code=404, detail="Checkin not found")
# ─── Task 10: GET /api/stats ────────────────────────────────────────────────
@app.get("/api/stats")
def get_stats():
    """Return dashboard statistics.

    The response contains total asset and check-in counts, asset counts grouped
    by category, status and make, the ten most-visited assets, and per-technician
    visit counts with total minutes on site.
    """
    conn = get_db()

    def scalar(sql):
        # Single-value query helper (COUNT(*) style).
        return conn.execute(sql).fetchone()[0]

    def grouped(sql, key):
        # Turn a "<key>, cnt" result set into an ordered {key: count} mapping.
        return {rec[key]: rec["cnt"] for rec in conn.execute(sql).fetchall()}

    asset_total = scalar("SELECT COUNT(*) FROM assets")
    checkin_total = scalar("SELECT COUNT(*) FROM checkins")
    category_counts = grouped(
        "SELECT category, COUNT(*) AS cnt FROM assets GROUP BY category ORDER BY cnt DESC",
        "category",
    )
    status_counts = grouped(
        "SELECT status, COUNT(*) AS cnt FROM assets GROUP BY status ORDER BY cnt DESC",
        "status",
    )

    # Enhanced: top visited assets
    visited_sql = (
        "SELECT a.name, a.machine_id, COUNT(*) AS visit_count,\n"
        "MAX(v.checkin_time) AS last_visit_date\n"
        "FROM visits v JOIN assets a ON v.asset_id = a.id\n"
        "GROUP BY v.asset_id ORDER BY visit_count DESC LIMIT 10"
    )
    visited_columns = ("name", "machine_id", "visit_count", "last_visit_date")
    most_visited = [
        {column: rec[column] for column in visited_columns}
        for rec in conn.execute(visited_sql).fetchall()
    ]

    # Enhanced: time on site per technician. julianday differences are in days,
    # so multiplying by 1440 converts them to minutes; open visits count as 0.
    on_site_sql = (
        "SELECT u.username, COUNT(v.id) AS visit_count,\n"
        "SUM(CASE WHEN v.checkout_time IS NOT NULL AND v.checkin_time IS NOT NULL\n"
        "THEN (julianday(v.checkout_time) - julianday(v.checkin_time)) * 1440\n"
        "ELSE 0 END) AS total_minutes\n"
        "FROM visits v JOIN users u ON v.user_id = u.id\n"
        "WHERE u.role = 'technician'\n"
        "GROUP BY v.user_id ORDER BY total_minutes DESC"
    )
    on_site = []
    for rec in conn.execute(on_site_sql).fetchall():
        on_site.append(
            {
                "username": rec["username"],
                "visit_count": rec["visit_count"],
                "total_minutes": round(rec["total_minutes"] or 0, 1),
            }
        )

    # Enhanced: assets by make/manufacturer
    make_counts = grouped(
        "SELECT make, COUNT(*) AS cnt FROM assets WHERE make != '' GROUP BY make ORDER BY cnt DESC",
        "make",
    )
    conn.close()

    return {
        "total_assets": asset_total,
        "total_checkins": checkin_total,
        "by_category": category_counts,
        "by_status": status_counts,
        "top_visited": most_visited,
        "time_on_site": on_site,
        "by_make": make_counts,
    }
# ─── Task 11: CSV Export ────────────────────────────────────────────────────
def _generate_csv(rows, fieldnames):
    """Yield CSV text chunk by chunk for use with ``StreamingResponse``.

    The first chunk is the header line; each following chunk is one data row.

    Args:
        rows: Iterable of database rows; each is converted with ``row_to_dict``.
        fieldnames: Ordered column names that define the CSV layout.

    Keys present in a row but absent from ``fieldnames`` are skipped rather
    than raising ``ValueError`` part-way through an already-started download
    (the exports select ``*``, so the table may carry extra columns).
    """
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")

    def _drain():
        # Hand back what the writer produced and reset the buffer for reuse.
        chunk = buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)
        return chunk

    writer.writeheader()
    yield _drain()
    for row in rows:
        writer.writerow(row_to_dict(row))
        yield _drain()
# Column names, in output order, for the assets CSV export.
# Passed to _generate_csv() as the DictWriter field list by export_assets_csv().
_ASSET_CSV_FIELDS = [
    "id", "machine_id", "serial_number", "name", "description", "category",
    "status", "make", "model", "address", "building_name", "building_number",
    "floor", "room", "trailer_number", "walking_directions", "map_link",
    "parking_location", "photo_path", "customer_id", "location_id",
    "assigned_to", "latitude", "longitude", "geofence_radius_meters",
    "created_at", "updated_at",
]
# Column names, in output order, for the check-ins CSV export.
# Passed to _generate_csv() as the DictWriter field list by export_checkins_csv().
_CHECKIN_CSV_FIELDS = [
    "id", "asset_id", "user_id", "latitude", "longitude", "accuracy",
    "photo_path", "notes", "created_at",
]
@app.get("/api/export/assets")
def export_assets_csv():
    """Download every asset, newest first, as an ``assets.csv`` attachment."""
    conn = get_db()
    asset_rows = conn.execute("SELECT * FROM assets ORDER BY created_at DESC").fetchall()
    conn.close()
    # Rows are fully fetched above, so streaming does not need the connection.
    download_headers = {"Content-Disposition": "attachment; filename=assets.csv"}
    csv_stream = _generate_csv(asset_rows, _ASSET_CSV_FIELDS)
    return StreamingResponse(csv_stream, media_type="text/csv", headers=download_headers)
@app.get("/api/export/checkins")
def export_checkins_csv(asset_id: Optional[int] = Query(None)):
    """Download check-ins as a ``checkins.csv`` attachment.

    Args:
        asset_id: When given, only check-ins for that asset are exported.

    Rows are ordered newest first with ``id`` as a tie-breaker in both the
    filtered and unfiltered case, matching ``list_checkins``; previously the
    unfiltered export had no tie-breaker, so rows sharing a timestamp could
    come out in arbitrary order.
    """
    sql = "SELECT * FROM checkins"
    params = []
    if asset_id is not None:
        sql += " WHERE asset_id = ?"
        params.append(asset_id)
    sql += " ORDER BY created_at DESC, id DESC"

    conn = get_db()
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        conn.close()
    return StreamingResponse(
        _generate_csv(rows, _CHECKIN_CSV_FIELDS),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=checkins.csv"},
    )
# ─── Phase B: Customers API ──────────────────────────────────────────────────
@app.post("/api/customers", status_code=201)
def create_customer(body: CustomerCreate):
    """Create a customer with optional contacts and return it.

    Responds 422 for a blank name and 409 when the insert violates a database
    integrity constraint. The returned dict carries a ``contacts`` list.
    """
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=422, detail="Customer name must not be empty")

    conn = get_db()
    try:
        cust_id = conn.execute(
            "INSERT INTO customers (name) VALUES (?)", (name,)
        ).lastrowid
        for contact in body.contacts or ():
            conn.execute(
                "INSERT INTO customer_contacts (customer_id, name, phone, email) VALUES (?, ?, ?, ?)",
                (cust_id, contact.name or "", contact.phone or "", contact.email or ""),
            )
        conn.commit()
        _log_activity(conn, "created", "customer", cust_id, f"Customer '{name}' created")
        conn.commit()
    except sqlite3.IntegrityError:
        conn.close()
        raise HTTPException(status_code=409, detail=f"Customer '{name}' already exists")

    created = conn.execute("SELECT * FROM customers WHERE id = ?", (cust_id,)).fetchone()
    payload = row_to_dict(created)
    payload["contacts"] = _get_customer_contacts(conn, cust_id)
    conn.close()
    return payload
def _get_customer_contacts(conn: sqlite3.Connection, cust_id: int) -> list:
    """Return the contact rows of one customer as a list of dicts."""
    query = "SELECT id, customer_id, name, phone, email FROM customer_contacts WHERE customer_id = ?"
    contacts = []
    for contact_row in conn.execute(query, (cust_id,)).fetchall():
        contacts.append(row_to_dict(contact_row))
    return contacts
@app.get("/api/customers")
def list_customers():
    """Return all customers ordered by name, each with its ``contacts`` list."""
    conn = get_db()
    customers = []
    for record in conn.execute("SELECT * FROM customers ORDER BY name").fetchall():
        entry = row_to_dict(record)
        entry["contacts"] = _get_customer_contacts(conn, record["id"])
        customers.append(entry)
    conn.close()
    return customers
@app.get("/api/customers/{cust_id}")
def get_customer(cust_id: int):
    """Return one customer with its contacts, or 404 when the id is unknown."""
    conn = get_db()
    record = conn.execute("SELECT * FROM customers WHERE id = ?", (cust_id,)).fetchone()
    if record is not None:
        payload = row_to_dict(record)
        payload["contacts"] = _get_customer_contacts(conn, cust_id)
        conn.close()
        return payload
    conn.close()
    raise HTTPException(status_code=404, detail="Customer not found")
@app.put("/api/customers/{cust_id}")
def update_customer(cust_id: int, body: CustomerUpdate):
    """Rename a customer and/or replace its contact list.

    Args:
        cust_id: Id of the customer to change.
        body: ``name`` (optional) and ``contacts`` (optional). When ``contacts``
            is supplied it replaces the existing contacts wholesale.

    Returns:
        The customer row as a dict with a ``contacts`` list.

    Raises:
        HTTPException: 404 if the customer does not exist, 422 for a blank
            name, 409 if the rename violates an integrity constraint (the same
            condition ``create_customer`` reports as "already exists").
    """
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM customers WHERE id = ?", (cust_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Customer not found")

        if body.name is not None:
            name = body.name.strip()
            if not name:
                raise HTTPException(status_code=422, detail="Customer name must not be empty")
            try:
                conn.execute(
                    "UPDATE customers SET name = ?, updated_at = datetime('now') WHERE id = ?",
                    (name, cust_id),
                )
            except sqlite3.IntegrityError as exc:
                # Previously this surfaced as an unhandled 500.
                raise HTTPException(
                    status_code=409, detail=f"Customer '{name}' already exists"
                ) from exc

        if body.contacts is not None:
            # Replace all contacts
            conn.execute("DELETE FROM customer_contacts WHERE customer_id = ?", (cust_id,))
            for c in body.contacts:
                conn.execute(
                    "INSERT INTO customer_contacts (customer_id, name, phone, email) VALUES (?, ?, ?, ?)",
                    (cust_id, c.name or "", c.phone or "", c.email or ""),
                )
        conn.commit()

        row = conn.execute("SELECT * FROM customers WHERE id = ?", (cust_id,)).fetchone()
        result = row_to_dict(row)
        result["contacts"] = _get_customer_contacts(conn, cust_id)
        _log_activity(conn, "updated", "customer", cust_id,
                      f"Customer '{result['name']}' updated")
        conn.commit()
        return result
    finally:
        # Closing without a commit discards any half-applied changes.
        conn.close()
@app.delete("/api/customers/{cust_id}", status_code=204)
def delete_customer(cust_id: int):
    """Delete a customer and log the deletion; 404 when the id is unknown."""
    conn = get_db()
    target = conn.execute("SELECT id FROM customers WHERE id = ?", (cust_id,)).fetchone()
    if target is not None:
        conn.execute("DELETE FROM customers WHERE id = ?", (cust_id,))
        _log_activity(conn, "deleted", "customer", cust_id, f"Customer {cust_id} deleted")
        conn.commit()
        conn.close()
        return
    conn.close()
    raise HTTPException(status_code=404, detail="Customer not found")
# ─── Phase B: Locations API ──────────────────────────────────────────────────
def _get_location_rooms(conn: sqlite3.Connection, loc_id: int) -> list:
    """Return the rooms of one location, ordered by name, as a list of dicts."""
    query = "SELECT id, location_id, name, floor, created_at, updated_at FROM rooms WHERE location_id = ? ORDER BY name"
    rooms = []
    for room_row in conn.execute(query, (loc_id,)).fetchall():
        rooms.append(row_to_dict(room_row))
    return rooms
@app.post("/api/locations", status_code=201)
def create_location(body: LocationCreate):
    """Create a location and return it with an empty ``rooms`` list.

    Raises:
        HTTPException: 422 for a blank name; whatever ``_validate_ref`` raises
            when ``customer_id`` is given but does not reference a customer.

    The connection is released in a ``finally`` block so that a rejected
    ``customer_id`` (or any database error) no longer leaves it open.
    """
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=422, detail="Location name must not be empty")

    conn = get_db()
    try:
        if body.customer_id is not None:
            _validate_ref(conn, "customers", body.customer_id, "Customer")
        cursor = conn.execute(
            """INSERT INTO locations (customer_id, name, address, building_name, building_number,
               floor, trailer_number, site_hours, access_notes, walking_directions, map_link,
               latitude, longitude)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (body.customer_id, name, body.address or "", body.building_name or "",
             body.building_number or "", body.floor or "", body.trailer_number or "",
             body.site_hours or "", body.access_notes or "",
             body.walking_directions or "", body.map_link or "",
             body.latitude, body.longitude),
        )
        conn.commit()
        loc_id = cursor.lastrowid
        row = conn.execute("SELECT * FROM locations WHERE id = ?", (loc_id,)).fetchone()
        result = row_to_dict(row)
        # A freshly created location cannot have rooms yet.
        result["rooms"] = []
        return result
    finally:
        conn.close()
@app.get("/api/locations")
def list_locations(customer_id: Optional[int] = Query(None)):
    """List locations by name, optionally for one customer, each with ``rooms``."""
    if customer_id is None:
        sql, params = "SELECT * FROM locations ORDER BY name", ()
    else:
        sql, params = (
            "SELECT * FROM locations WHERE customer_id = ? ORDER BY name",
            (customer_id,),
        )
    conn = get_db()
    locations = []
    for record in conn.execute(sql, params).fetchall():
        entry = row_to_dict(record)
        entry["rooms"] = _get_location_rooms(conn, record["id"])
        locations.append(entry)
    conn.close()
    return locations
@app.get("/api/locations/{loc_id}")
def get_location(loc_id: int):
    """Return one location with its rooms, or 404 when the id is unknown."""
    conn = get_db()
    record = conn.execute("SELECT * FROM locations WHERE id = ?", (loc_id,)).fetchone()
    if record is not None:
        payload = row_to_dict(record)
        payload["rooms"] = _get_location_rooms(conn, loc_id)
        conn.close()
        return payload
    conn.close()
    raise HTTPException(status_code=404, detail="Location not found")
# Location columns that update_location() will read from the request body.
# The names are interpolated into the UPDATE statement, so this list doubles
# as the whitelist of writable columns.
_LOCATION_FIELDS = [
    "customer_id", "name", "address", "building_name", "building_number",
    "floor", "trailer_number", "site_hours", "access_notes",
    "walking_directions", "map_link", "latitude", "longitude",
]
@app.put("/api/locations/{loc_id}")
def update_location(loc_id: int, body: LocationUpdate):
    """Apply the non-null fields of ``body`` to a location and return it.

    Args:
        loc_id: Id of the location to change.
        body: Partial update; only attributes listed in ``_LOCATION_FIELDS``
            that are not ``None`` are written.

    Returns:
        The location row as a dict with its ``rooms`` list.

    Raises:
        HTTPException: 404 if the location does not exist, 422 for a blank
            name; whatever ``_validate_ref`` raises for an unknown customer.

    The connection is closed in ``finally`` so that a rejected ``customer_id``
    no longer leaves it open.
    """
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM locations WHERE id = ?", (loc_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Location not found")

        updates = {}
        for field in _LOCATION_FIELDS:
            val = getattr(body, field, None)
            if val is None:
                continue
            if field == "name":
                val = val.strip()
                if not val:
                    raise HTTPException(status_code=422, detail="Location name must not be empty")
            elif field == "customer_id":
                _validate_ref(conn, "customers", val, "Customer")
            elif field not in ("latitude", "longitude"):
                val = val or ""
            updates[field] = val

        if updates:
            # Column names come from _LOCATION_FIELDS, never from the request.
            assignments = [f"{column} = ?" for column in updates]
            assignments.append("updated_at = datetime('now')")
            conn.execute(
                f"UPDATE locations SET {', '.join(assignments)} WHERE id = ?",
                [*updates.values(), loc_id],
            )
            conn.commit()

        row = conn.execute("SELECT * FROM locations WHERE id = ?", (loc_id,)).fetchone()
        result = row_to_dict(row)
        result["rooms"] = _get_location_rooms(conn, loc_id)
        return result
    finally:
        conn.close()
@app.delete("/api/locations/{loc_id}", status_code=204)
def delete_location(loc_id: int):
    """Delete one location; 404 when the id is unknown, otherwise an empty 204."""
    conn = get_db()
    target = conn.execute("SELECT id FROM locations WHERE id = ?", (loc_id,)).fetchone()
    if target is not None:
        conn.execute("DELETE FROM locations WHERE id = ?", (loc_id,))
        conn.commit()
        conn.close()
        return
    conn.close()
    raise HTTPException(status_code=404, detail="Location not found")
# ─── Phase B: Rooms API ──────────────────────────────────────────────────────
@app.get("/api/rooms")
def list_rooms(location_id: Optional[int] = Query(None)):
    """Return rooms ordered by name; restrict to one location when ``location_id`` is set."""
    if location_id is None:
        sql, params = "SELECT * FROM rooms ORDER BY name", ()
    else:
        sql, params = (
            "SELECT * FROM rooms WHERE location_id = ? ORDER BY name",
            (location_id,),
        )
    conn = get_db()
    found = conn.execute(sql, params).fetchall()
    conn.close()
    return [row_to_dict(record) for record in found]
@app.get("/api/rooms/{room_id}")
def get_room(room_id: int):
    """Return the room with the given id, or 404 when there is none."""
    conn = get_db()
    record = conn.execute("SELECT * FROM rooms WHERE id = ?", (room_id,)).fetchone()
    conn.close()
    if record is not None:
        return row_to_dict(record)
    raise HTTPException(status_code=404, detail="Room not found")
@app.post("/api/rooms", status_code=201)
def create_room(body: RoomCreate):
    """Create a room inside an existing location and return the stored row.

    Raises:
        HTTPException: 422 for a blank name (the same rule locations and
            customers apply); whatever ``_validate_ref`` raises when
            ``location_id`` does not reference a location.

    The connection is closed in ``finally`` so a rejected ``location_id`` no
    longer leaves it open.
    """
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=422, detail="Room name must not be empty")

    conn = get_db()
    try:
        _validate_ref(conn, "locations", body.location_id, "Location")
        cursor = conn.execute(
            "INSERT INTO rooms (location_id, name, floor) VALUES (?, ?, ?)",
            (body.location_id, name, body.floor or ""),
        )
        conn.commit()
        row = conn.execute(
            "SELECT * FROM rooms WHERE id = ?", (cursor.lastrowid,)
        ).fetchone()
        return row_to_dict(row)
    finally:
        conn.close()
@app.put("/api/rooms/{room_id}")
def update_room(room_id: int, body: RoomUpdate):
    """Apply the non-null fields of ``body`` (name, floor, location) to a room.

    Returns:
        The room row as a dict after the update.

    Raises:
        HTTPException: 404 if the room does not exist, 422 for a blank name
            (matching ``update_location``); whatever ``_validate_ref`` raises
            for an unknown ``location_id``.

    The connection is closed in ``finally`` so a rejected ``location_id`` no
    longer leaves it open.
    """
    conn = get_db()
    try:
        existing = conn.execute("SELECT id FROM rooms WHERE id = ?", (room_id,)).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Room not found")

        updates = {}
        if body.name is not None:
            name = body.name.strip()
            if not name:
                raise HTTPException(status_code=422, detail="Room name must not be empty")
            updates["name"] = name
        if body.floor is not None:
            updates["floor"] = body.floor
        if body.location_id is not None:
            _validate_ref(conn, "locations", body.location_id, "Location")
            updates["location_id"] = body.location_id

        if updates:
            # Column names are the fixed keys chosen above, not request data.
            assignments = [f"{column} = ?" for column in updates]
            assignments.append("updated_at = datetime('now')")
            conn.execute(
                f"UPDATE rooms SET {', '.join(assignments)} WHERE id = ?",
                [*updates.values(), room_id],
            )
            conn.commit()

        row = conn.execute("SELECT * FROM rooms WHERE id = ?", (room_id,)).fetchone()
        return row_to_dict(row)
    finally:
        conn.close()
@app.delete("/api/rooms/{room_id}", status_code=204)
def delete_room(room_id: int):
    """Delete one room; 404 when the id is unknown, otherwise an empty 204."""
    conn = get_db()
    target = conn.execute("SELECT id FROM rooms WHERE id = ?", (room_id,)).fetchone()
    if target is not None:
        conn.execute("DELETE FROM rooms WHERE id = ?", (room_id,))
        conn.commit()
        conn.close()
        return
    conn.close()
    raise HTTPException(status_code=404, detail="Room not found")
# ─── Phase B: Settings API ───────────────────────────────────────────────────
# Settings entities served under /api/settings/<entity>.
# For each entity: the backing table and the columns that the generic
# _settings_create()/_settings_update() helpers will read from a request body.
# Table and column names from here are interpolated into SQL, so only fixed,
# trusted identifiers belong in this mapping.
_SETTINGS_SCHEMAS = {
    "categories": {"table": "categories", "columns": ["name", "icon"]},
    "makes": {"table": "makes", "columns": ["name"]},
    "models": {"table": "models", "columns": ["make_id", "name", "icon_path"]},
    "key_names": {"table": "key_names", "columns": ["name"]},
    "key_types": {"table": "key_types", "columns": ["name"]},
    "badge_types": {"table": "badge_types", "columns": ["name"]},
}
def _settings_list(conn: sqlite3.Connection, table: str):
    """Return every row of a settings table, ordered by name, as dicts.

    Args:
        conn: Open database connection.
        table: Name of a table registered in ``_SETTINGS_SCHEMAS``.

    Raises:
        HTTPException: 404 when ``table`` is not one of the registered
            settings tables. The name is interpolated into the SQL text, so it
            is checked against the whitelist before any statement is built.
    """
    known_tables = {schema["table"] for schema in _SETTINGS_SCHEMAS.values()}
    if table not in known_tables:
        raise HTTPException(status_code=404, detail="Unknown settings entity")
    rows = conn.execute(f"SELECT * FROM {table} ORDER BY name").fetchall()
    return [row_to_dict(r) for r in rows]
def _settings_create(conn: sqlite3.Connection, entity: str, body: dict):
    """Insert one settings row for ``entity`` and return it as a dict.

    Args:
        conn: Open database connection.
        entity: Key of ``_SETTINGS_SCHEMAS`` naming the settings collection.
        body: Decoded JSON request body. Columns missing from it (or given as
            null) are stored as empty strings, except ``make_id``.

    Raises:
        HTTPException: 422 when ``body`` is not a JSON object or when
            ``make_id`` is required but absent; 409 when the insert violates
            an integrity constraint.
    """
    # The routes pass the raw decoded JSON, which may be a list, string, etc.
    if not isinstance(body, dict):
        raise HTTPException(status_code=422, detail="Request body must be a JSON object")

    schema = _SETTINGS_SCHEMAS[entity]
    table = schema["table"]
    columns = schema["columns"]

    values = []
    for col in columns:
        val = body.get(col)
        if col == "make_id" and val is None:
            raise HTTPException(status_code=422, detail="make_id is required for models")
        values.append("" if val is None else val)

    placeholders = ", ".join(["?"] * len(columns))
    col_names = ", ".join(columns)
    try:
        cursor = conn.execute(
            f"INSERT INTO {table} ({col_names}) VALUES ({placeholders})", values
        )
        conn.commit()
    except sqlite3.IntegrityError as exc:
        raise HTTPException(
            status_code=409, detail=f"Entry already exists in {entity}"
        ) from exc
    row = conn.execute(
        f"SELECT * FROM {table} WHERE id = ?", (cursor.lastrowid,)
    ).fetchone()
    return row_to_dict(row)
def _settings_update(conn: sqlite3.Connection, entity: str, eid: int, body: dict):
    """Update the schema columns present in ``body`` on one settings row.

    Args:
        conn: Open database connection.
        entity: Key of ``_SETTINGS_SCHEMAS`` naming the settings collection.
        eid: Primary key of the row to change.
        body: Decoded JSON request body; keys outside the entity's column list
            are ignored.

    Returns:
        The row as a dict after the update.

    Raises:
        HTTPException: 422 when ``body`` is not a JSON object, 404 when the row
            does not exist, 409 when the change violates an integrity
            constraint (handled the same way as in ``_settings_create``).
    """
    # The routes pass the raw decoded JSON, which may be a list, string, etc.
    if not isinstance(body, dict):
        raise HTTPException(status_code=422, detail="Request body must be a JSON object")

    schema = _SETTINGS_SCHEMAS[entity]
    table = schema["table"]
    columns = schema["columns"]

    existing = conn.execute(f"SELECT id FROM {table} WHERE id = ?", (eid,)).fetchone()
    if existing is None:
        # NOTE(review): entity[:-1] yields "categorie" for "categories"; the
        # message is kept unchanged here so clients matching on it still work.
        raise HTTPException(status_code=404, detail=f"{entity[:-1]} not found")

    updates = {col: body[col] for col in columns if col in body}
    if updates:
        set_clause = ", ".join(f"{k} = ?" for k in updates)
        try:
            conn.execute(
                f"UPDATE {table} SET {set_clause} WHERE id = ?",
                [*updates.values(), eid],
            )
            conn.commit()
        except sqlite3.IntegrityError as exc:
            conn.rollback()
            raise HTTPException(
                status_code=409, detail=f"Entry already exists in {entity}"
            ) from exc

    row = conn.execute(f"SELECT * FROM {table} WHERE id = ?", (eid,)).fetchone()
    return row_to_dict(row)
def _settings_get(conn: sqlite3.Connection, entity: str, eid: int):
    """Fetch one settings row by primary key; raise a 404 HTTPException if absent."""
    table = _SETTINGS_SCHEMAS[entity]["table"]
    record = conn.execute(f"SELECT * FROM {table} WHERE id = ?", (eid,)).fetchone()
    if record is not None:
        return row_to_dict(record)
    raise HTTPException(status_code=404, detail=f"{entity[:-1]} not found")
def _settings_delete(conn: sqlite3.Connection, entity: str, eid: int):
    """Delete one settings row by primary key and commit; 404 if it does not exist."""
    table = _SETTINGS_SCHEMAS[entity]["table"]
    target = conn.execute(f"SELECT id FROM {table} WHERE id = ?", (eid,)).fetchone()
    if target is None:
        raise HTTPException(status_code=404, detail=f"{entity[:-1]} not found")
    conn.execute(f"DELETE FROM {table} WHERE id = ?", (eid,))
    conn.commit()
# ─── Dynamic settings routes ─────────────────────────────────────────────────
# Register CRUD for each settings entity. Note: these use raw request body
# to avoid Pydantic model explosion — the schemas are validated by
# _SETTINGS_SCHEMAS above.
_SETTINGS_ENTITIES = ["categories", "makes", "models", "key_names", "key_types", "badge_types"]


def _register_settings_routes(entity: str) -> None:
    """Register list/get/create/update/delete routes for one settings entity.

    ``entity`` is captured by closure instead of being passed as a default
    argument on each handler: FastAPI treats any non-path handler parameter as
    a query parameter, so the former ``entity=_ent`` defaults let a client
    override the entity (and thereby the table name used in SQL) via
    ``?entity=...``. Handlers now expose only ``eid`` and the request body.

    Every handler releases its connection in ``finally`` so that it is closed
    on any error, not only on ``HTTPException``.
    """
    table = _SETTINGS_SCHEMAS[entity]["table"]

    async def _read_json(request: Request):
        # Malformed JSON becomes a client error instead of an unhandled 500.
        try:
            return await request.json()
        except ValueError as exc:
            raise HTTPException(
                status_code=422, detail="Request body must be valid JSON"
            ) from exc

    @app.get(f"/api/settings/{entity}", name=f"list_{entity}")
    def _list():
        conn = get_db()
        try:
            return _settings_list(conn, table)
        finally:
            conn.close()

    @app.get(f"/api/settings/{entity}/{{eid}}", name=f"get_{entity}")
    def _get(eid: int):
        conn = get_db()
        try:
            return _settings_get(conn, entity, eid)
        finally:
            conn.close()

    @app.post(f"/api/settings/{entity}", status_code=201, name=f"create_{entity}")
    async def _create(request: Request):
        body = await _read_json(request)
        conn = get_db()
        try:
            return _settings_create(conn, entity, body)
        finally:
            conn.close()

    @app.put(f"/api/settings/{entity}/{{eid}}", name=f"update_{entity}")
    async def _update(eid: int, request: Request):
        body = await _read_json(request)
        conn = get_db()
        try:
            return _settings_update(conn, entity, eid, body)
        finally:
            conn.close()

    @app.delete(f"/api/settings/{entity}/{{eid}}", status_code=204, name=f"delete_{entity}")
    def _delete(eid: int):
        conn = get_db()
        try:
            _settings_delete(conn, entity, eid)
        finally:
            conn.close()


for _ent in _SETTINGS_ENTITIES:
    _register_settings_routes(_ent)
# ─── Phase C: Helpers ────────────────────────────────────────────────────────
# Roles accepted by create_user() and update_user(); any other value is
# rejected with a 422 response.
VALID_ROLES = {"admin", "technician", "readonly"}
def _hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of ``password``.

    NOTE(review): the digest is unsalted and fast to compute, which is weak
    for password storage; changing it would invalidate stored hashes, so it is
    left as is here.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def _generate_token() -> str:
    """Return a fresh session token: 32 random bytes as 64 lowercase hex characters."""
    random_bytes = secrets.token_bytes(32)
    return random_bytes.hex()
def _log_activity(conn: sqlite3.Connection, action: str, entity_type: str,
                  entity_id: int, details: str = "", user_id: int = None):
    """Append one row to ``activity_log``.

    The insert is not committed here; the caller decides when to commit.
    """
    statement = (
        "INSERT INTO activity_log (user_id, action, entity_type, entity_id, details)\n"
        "VALUES (?, ?, ?, ?, ?)"
    )
    entry = (user_id, action, entity_type, entity_id, details)
    conn.execute(statement, entry)
def _user_to_dict(row: sqlite3.Row) -> dict:
    """Turn a user row into a dict that omits ``password_hash``."""
    user = row_to_dict(row)
    if "password_hash" in user:
        del user["password_hash"]
    return user
# ─── Phase C: Pydantic Models ────────────────────────────────────────────────
class UserCreate(BaseModel):
    """Request body for POST /api/users (see create_user)."""

    username: str
    password: str
    # Defaults to "technician"; create_user checks the value against VALID_ROLES.
    role: Optional[str] = "technician"
class UserUpdate(BaseModel):
    """Request body for PUT /api/users/{user_id}; fields left as None are not changed."""

    role: Optional[str] = None
    password: Optional[str] = None
class LoginRequest(BaseModel):
    """Request body for POST /api/auth/login."""

    username: str
    password: str
class GeofenceCreate(BaseModel):
    """Request body for POST /api/geofences (see create_geofence)."""

    name: str
    # Polygon vertices; stored as JSON text. _point_in_polygon reads each vertex
    # either as a {"lat": ..., "lng": ...} dict or as a [lat, lng] sequence.
    points: list
    color: Optional[str] = "#3388ff"
    # Users to assign to the geofence on creation; omitted/empty assigns nobody.
    user_ids: Optional[list[int]] = None
class GeofenceUpdate(BaseModel):
    """Request body for PUT /api/geofences/{geofence_id}; None fields are left unchanged."""

    name: Optional[str] = None
    points: Optional[list] = None
    color: Optional[str] = None
    # When not None, update_geofence passes this list to _sync_geofence_users.
    user_ids: Optional[list[int]] = None
class GeofencePointCheck(BaseModel):
    """Request body for POST /api/geofences/check: the GPS point to test."""

    lat: float
    lng: float
# ─── Phase C: Users API ──────────────────────────────────────────────────────
@app.post("/api/users", status_code=201)
def create_user(body: UserCreate):
    """Create a user account and return it without the password hash.

    Responds 422 for a blank username, an empty password or a role outside
    ``VALID_ROLES``, and 409 when the insert violates an integrity constraint.
    """
    username = body.username.strip()
    if not username:
        raise HTTPException(status_code=422, detail="Username must not be empty")
    if not body.password:
        raise HTTPException(status_code=422, detail="Password must not be empty")
    role = body.role or "technician"
    if role not in VALID_ROLES:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid role '{role}'. Must be one of: {', '.join(sorted(VALID_ROLES))}",
        )

    conn = get_db()
    hashed = _hash_password(body.password)
    try:
        uid = conn.execute(
            "INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
            (username, hashed, role),
        ).lastrowid
        conn.commit()
        _log_activity(conn, "created", "user", uid, f"User '{username}' created")
        conn.commit()
    except sqlite3.IntegrityError:
        conn.close()
        raise HTTPException(status_code=409, detail=f"User '{username}' already exists")

    created = conn.execute("SELECT * FROM users WHERE id = ?", (uid,)).fetchone()
    conn.close()
    return _user_to_dict(created)
@app.get("/api/users")
def list_users():
    """Return all users ordered by username, without password hashes."""
    conn = get_db()
    accounts = conn.execute("SELECT * FROM users ORDER BY username").fetchall()
    conn.close()
    listing = []
    for account in accounts:
        listing.append(_user_to_dict(account))
    return listing
@app.get("/api/users/{user_id}")
def get_user(user_id: int):
    """Return one user without its password hash, or 404 when the id is unknown."""
    conn = get_db()
    account = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    conn.close()
    if account is None:
        raise HTTPException(status_code=404, detail="User not found")
    return _user_to_dict(account)
@app.put("/api/users/{user_id}")
def update_user(user_id: int, body: UserUpdate):
    """Change a user's role and/or password and return the updated user.

    Args:
        user_id: Id of the user to change.
        body: ``role`` and/or ``password``; fields left as ``None`` are kept.

    Returns:
        The user as a dict without the password hash.

    Raises:
        HTTPException: 404 if the user does not exist, 422 for a role outside
            ``VALID_ROLES`` or a password that is empty/whitespace only.

    The password is hashed exactly as supplied. ``create_user`` and ``login``
    both hash the raw value, so hashing a stripped copy here (as before) made
    passwords with leading/trailing whitespace impossible to log in with.
    """
    conn = get_db()
    try:
        existing = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="User not found")

        if body.role is not None:
            if body.role not in VALID_ROLES:
                raise HTTPException(
                    status_code=422,
                    detail=f"Invalid role '{body.role}'. Must be one of: {', '.join(sorted(VALID_ROLES))}",
                )
            conn.execute("UPDATE users SET role = ? WHERE id = ?", (body.role, user_id))

        if body.password is not None:
            # strip() is used only to reject blank input, not to alter the secret.
            if not body.password.strip():
                raise HTTPException(status_code=422, detail="Password must not be empty")
            conn.execute(
                "UPDATE users SET password_hash = ? WHERE id = ?",
                (_hash_password(body.password), user_id),
            )

        conn.commit()
        row = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
        return _user_to_dict(row)
    finally:
        # Closing without a commit discards a role change made before a 422.
        conn.close()
@app.delete("/api/users/{user_id}", status_code=204)
def delete_user(user_id: int):
    """Delete one user; 404 when the id is unknown, otherwise an empty 204."""
    conn = get_db()
    target = conn.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
    if target is not None:
        conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
        conn.commit()
        conn.close()
        return
    conn.close()
    raise HTTPException(status_code=404, detail="User not found")
# ─── User's assigned geofences (service areas) ──────────────────────────
@app.get("/api/users/{user_id}/geofences")
def list_user_geofences(user_id: int):
    """Return the geofences linked to a user via ``geofence_users``, ordered by name.

    Responds 404 when the user does not exist.
    """
    conn = get_db()
    account = conn.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
    if account is None:
        conn.close()
        raise HTTPException(status_code=404, detail="User not found")

    assigned_sql = (
        "SELECT g.* FROM geofences g\n"
        "JOIN geofence_users gu ON gu.geofence_id = g.id\n"
        "WHERE gu.user_id = ? ORDER BY g.name"
    )
    service_areas = []
    for fence in conn.execute(assigned_sql, (user_id,)).fetchall():
        service_areas.append(_geofence_row(fence, conn))
    conn.close()
    return service_areas
# ─── Phase C: Auth API ───────────────────────────────────────────────────────
@app.post("/api/auth/login")
def login(body: LoginRequest):
    """Verify credentials, open a session and return the user plus its token.

    Returns:
        The user as a dict (without the password hash) with an added
        ``token`` key holding the new session token.

    Raises:
        HTTPException: 401 with the same message for an unknown username and
            for a wrong password.

    The supplied password is always hashed, and the hashes are compared with
    ``secrets.compare_digest``, so response timing reveals neither whether the
    username exists nor how much of the hash matched.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT * FROM users WHERE username = ?", (body.username,)
        ).fetchone()
        supplied_hash = _hash_password(body.password)
        stored_hash = row["password_hash"] if row is not None else ""
        hashes_match = secrets.compare_digest(
            supplied_hash.encode("utf-8"), stored_hash.encode("utf-8")
        )
        if row is None or not hashes_match:
            raise HTTPException(status_code=401, detail="Invalid username or password")

        token = _generate_token()
        conn.execute(
            "INSERT INTO sessions (user_id, token) VALUES (?, ?)",
            (row["id"], token),
        )
        conn.commit()
    finally:
        conn.close()

    result = _user_to_dict(row)
    result["token"] = token
    return result
@app.get("/api/auth/me")
def auth_me(request: Request):
    """Resolve the ``Authorization: Bearer <token>`` header to the session's user.

    Responds 401 when the header is missing or not a Bearer header, and when
    no session row matches the token.
    """
    scheme = "Bearer "
    header_value = request.headers.get("Authorization", "")
    if not header_value.startswith(scheme):
        raise HTTPException(status_code=401, detail="Missing or invalid Authorization header")
    token = header_value[len(scheme):]

    conn = get_db()
    account = conn.execute(
        "SELECT u.* FROM users u JOIN sessions s ON u.id = s.user_id WHERE s.token = ?",
        (token,),
    ).fetchone()
    conn.close()
    if account is not None:
        return _user_to_dict(account)
    raise HTTPException(status_code=401, detail="Invalid or expired token")
# ─── Phase C: Geofences API ──────────────────────────────────────────────────
@app.post("/api/geofences", status_code=201)
def create_geofence(body: GeofenceCreate):
    """Create a geofence, optionally assign users, and return it.

    Returns:
        The new geofence as produced by ``_geofence_row``.

    Raises:
        HTTPException: 422 when assigning ``user_ids`` fails; the geofence
            insert is then discarded because nothing has been committed.

    The connection is closed in ``finally`` so it is released on every error
    path, and the original failure is chained onto the 422 for diagnostics.
    """
    conn = get_db()
    try:
        cursor = conn.execute(
            "INSERT INTO geofences (name, points, color) VALUES (?, ?, ?)",
            (body.name, _json.dumps(body.points), body.color or "#3388ff"),
        )
        gid = cursor.lastrowid

        # Assign users if provided
        if body.user_ids:
            try:
                _sync_geofence_users(conn, gid, body.user_ids)
            except Exception as exc:
                # NOTE(review): the helper's failure modes are not visible
                # here, so the original catch-all is kept — confirm and narrow.
                raise HTTPException(
                    status_code=422, detail="One or more user IDs are invalid"
                ) from exc

        _log_activity(conn, "created", "geofence", gid, f"Geofence '{body.name}' created")
        conn.commit()
        row = conn.execute("SELECT * FROM geofences WHERE id = ?", (gid,)).fetchone()
        return _geofence_row(row, conn)
    finally:
        conn.close()
@app.get("/api/geofences")
def list_geofences():
    """Return every geofence ordered by name, each converted by ``_geofence_row``."""
    conn = get_db()
    fences = []
    for fence in conn.execute("SELECT * FROM geofences ORDER BY name").fetchall():
        fences.append(_geofence_row(fence, conn))
    conn.close()
    return fences
@app.put("/api/geofences/{geofence_id}")
def update_geofence(geofence_id: int, body: GeofenceUpdate):
    """Update a geofence's name/points/color and, optionally, its assigned users.

    Args:
        geofence_id: Id of the geofence to change.
        body: Partial update; ``None`` fields are left unchanged. A non-None
            ``user_ids`` is passed to ``_sync_geofence_users``.

    Returns:
        The geofence as produced by ``_geofence_row``.

    Raises:
        HTTPException: 404 if the geofence does not exist, 422 when syncing
            ``user_ids`` violates an integrity constraint (the condition
            ``create_geofence`` reports the same way).

    Field changes and the user sync are committed together, so a rejected
    ``user_ids`` list no longer leaves the field changes half-applied.
    """
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM geofences WHERE id = ?", (geofence_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Geofence not found")

        updates = {}
        if body.name is not None:
            updates["name"] = body.name
        if body.points is not None:
            updates["points"] = _json.dumps(body.points)
        if body.color is not None:
            updates["color"] = body.color

        if updates:
            # Column names are the fixed keys chosen above, not request data.
            assignments = [f"{column} = ?" for column in updates]
            assignments.append("updated_at = datetime('now')")
            conn.execute(
                f"UPDATE geofences SET {', '.join(assignments)} WHERE id = ?",
                [*updates.values(), geofence_id],
            )

        # Sync assigned users if provided
        if body.user_ids is not None:
            try:
                _sync_geofence_users(conn, geofence_id, body.user_ids)
            except sqlite3.IntegrityError as exc:
                conn.rollback()
                raise HTTPException(
                    status_code=422, detail="One or more user IDs are invalid"
                ) from exc

        conn.commit()
        row = conn.execute(
            "SELECT * FROM geofences WHERE id = ?", (geofence_id,)
        ).fetchone()
        return _geofence_row(row, conn)
    finally:
        conn.close()
@app.delete("/api/geofences/{geofence_id}", status_code=204)
def delete_geofence(geofence_id: int):
    """Delete one geofence; 404 when the id is unknown, otherwise an empty 204."""
    conn = get_db()
    target = conn.execute(
        "SELECT id FROM geofences WHERE id = ?", (geofence_id,)
    ).fetchone()
    if target is not None:
        conn.execute("DELETE FROM geofences WHERE id = ?", (geofence_id,))
        conn.commit()
        conn.close()
        return
    conn.close()
    raise HTTPException(status_code=404, detail="Geofence not found")
# ─── Phase 0: Proximity & Geofence Check ─────────────────────────────────────
def _point_in_polygon(lat: float, lng: float, polygon: list) -> bool:
    """Tell whether (lat, lng) lies inside ``polygon`` using ray casting.

    Each vertex may be a ``{"lat": ..., "lng": ...}`` dict or an indexable
    ``[lat, lng]`` pair. Polygons with fewer than three vertices contain
    nothing.
    """
    if len(polygon) < 3:
        return False

    def as_pair(vertex):
        # Normalise both accepted vertex shapes to a (lat, lng) tuple.
        if isinstance(vertex, dict):
            return vertex["lat"], vertex["lng"]
        return vertex[0], vertex[1]

    corners = [as_pair(vertex) for vertex in polygon]
    crossings = 0
    # Pair every vertex with its predecessor; the first pairs with the last.
    for (yi, xi), (yj, xj) in zip(corners, corners[-1:] + corners[:-1]):
        straddles = (yi > lat) != (yj > lat)
        if straddles and (lng < (xj - xi) * (lat - yi) / (yj - yi) + xi):
            crossings += 1
    # An odd number of edge crossings means the point is inside.
    return crossings % 2 == 1
@app.get("/api/proximity")
def proximity_check(
    lat: float = Query(...),
    lng: float = Query(...),
    radius_meters: int = Query(200, ge=1, le=50000),
):
    """Return up to 50 assets within ``radius_meters`` of (lat, lng), nearest first.

    Distance is the great-circle distance from the spherical law of cosines
    on a sphere of radius 6,371,000 m; each result carries it as
    ``distance_meters``. Assets without coordinates are ignored.

    The cosine term is clamped to [-1, 1] before ``acos``: floating-point
    rounding can push it slightly above 1 when the query point coincides with
    an asset, which would otherwise make the distance undefined and drop the
    closest asset from the results.
    """
    conn = get_db()
    try:
        rows = conn.execute("""
            SELECT * FROM (
                SELECT *, (
                    6371000 * acos(
                        min(1.0, max(-1.0,
                            cos(radians(?)) * cos(radians(latitude)) *
                            cos(radians(longitude) - radians(?)) +
                            sin(radians(?)) * sin(radians(latitude))
                        ))
                    )
                ) AS distance_meters
                FROM assets
                WHERE latitude IS NOT NULL AND longitude IS NOT NULL
            )
            WHERE distance_meters <= ?
            ORDER BY distance_meters
            LIMIT 50
        """, (lat, lng, lat, radius_meters)).fetchall()
    finally:
        conn.close()
    return [row_to_dict(r) for r in rows]
@app.post("/api/geofences/check")
def check_geofence_point(body: GeofencePointCheck):
    """
    Report which geofences contain the GPS point in the request body.

    Every geofence is loaded (ordered by name), its JSON-encoded ``points``
    column is decoded, and the polygon is tested against
    (body.lat, body.lng). Returns the matching geofence rows as dicts.
    """
    db = get_db()
    fences = db.execute("SELECT * FROM geofences ORDER BY name").fetchall()
    db.close()

    return [
        row_to_dict(fence)
        for fence in fences
        if _point_in_polygon(body.lat, body.lng, _json.loads(fence["points"]))
    ]
# ─── Phase C: Visits API & Auto-visit Logging ────────────────────────────────
def _auto_log_visit(conn: sqlite3.Connection, user_id: int, asset_id: int,
                    checkin_time: str):
    """Create or extend the visit record for a user's check-ins at an asset.

    Behaviour:
      * ``user_id`` is None: nothing happens.
      * A visit already exists for this user and asset: the most recent one
        (by checkin_time) has its checkout_time set to ``checkin_time``.
      * No visit exists yet: a visit is inserted only once the user has at
        least two check-ins at the asset; it spans the two most recent
        check-ins (older one as checkin_time, newer one as checkout_time).

    No time window is applied when pairing check-ins. The function does not
    commit; that is left to the caller.
    """
    if user_id is None:
        return

    who_where = (user_id, asset_id)
    latest_visit = conn.execute(
        """SELECT id, checkin_time FROM visits
           WHERE user_id = ? AND asset_id = ?
           ORDER BY checkin_time DESC LIMIT 1""",
        who_where,
    ).fetchone()

    # An existing visit is simply stretched to the new check-in time.
    if latest_visit is not None:
        conn.execute(
            "UPDATE visits SET checkout_time = ? WHERE id = ?",
            (checkin_time, latest_visit["id"]),
        )
        return

    # First visit: requires two check-ins to bracket it.
    recent_checkins = conn.execute(
        """SELECT id, created_at FROM checkins
           WHERE user_id = ? AND asset_id = ?
           ORDER BY created_at DESC LIMIT 2""",
        who_where,
    ).fetchall()
    if len(recent_checkins) < 2:
        return

    newest, oldest = recent_checkins[0], recent_checkins[-1]
    conn.execute(
        """INSERT INTO visits (user_id, asset_id, checkin_time, checkout_time)
           VALUES (?, ?, ?, ?)""",
        (user_id, asset_id, oldest["created_at"], newest["created_at"]),
    )
@app.post("/api/visits", status_code=201)
def create_visit(body: VisitCreate):
    """Record a visit for an existing asset and return the stored row.

    Both checkin_time and checkout_time are set to the database's current
    time; duration_minutes comes from the request and falls back to 0. An
    activity-log entry is written for the new visit.

    Raises:
        HTTPException: 404 when ``body.asset_id`` does not match an asset.
    """
    db = get_db()
    asset = db.execute(
        "SELECT id FROM assets WHERE id = ?", (body.asset_id,)
    ).fetchone()
    if asset is None:
        db.close()
        raise HTTPException(status_code=404, detail="Asset not found")

    minutes = body.duration_minutes or 0
    inserted = db.execute(
        """INSERT INTO visits (user_id, asset_id, checkin_time, checkout_time, duration_minutes)
           VALUES (?, ?, datetime('now'), datetime('now'), ?)""",
        (body.user_id, body.asset_id, minutes),
    )
    db.commit()
    new_visit_id = inserted.lastrowid

    # The activity entry is committed separately, after the visit itself.
    _log_activity(
        db,
        "created",
        "visit",
        new_visit_id,
        f"Visit logged for asset {body.asset_id}",
        user_id=body.user_id,
    )
    db.commit()

    stored = db.execute(
        "SELECT * FROM visits WHERE id = ?", (new_visit_id,)
    ).fetchone()
    db.close()
    return row_to_dict(stored)
@app.get("/api/visits")
def list_visits(
    asset_id: Optional[int] = Query(None),
    user_id: Optional[int] = Query(None),
    date_from: Optional[str] = Query(None),
    date_to: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List visits, newest check-in first, with optional filters and paging.

    Filters are combined with AND. ``date_from`` is compared as given;
    ``date_to`` has " 23:59:59" appended so the whole day is included.
    Each result carries the visit columns plus asset_name, machine_id and
    user_name from the joined asset and user rows.
    """
    # Collect (SQL fragment, bound value) pairs for the filters in use.
    filters = []
    if asset_id is not None:
        filters.append(("v.asset_id = ?", asset_id))
    if user_id is not None:
        filters.append(("v.user_id = ?", user_id))
    if date_from:
        filters.append(("v.checkin_time >= ?", date_from))
    if date_to:
        filters.append(("v.checkin_time <= ?", date_to + " 23:59:59"))

    query = """SELECT v.*, a.name AS asset_name, a.machine_id,
                      u.username AS user_name
               FROM visits v
               LEFT JOIN assets a ON v.asset_id = a.id
               LEFT JOIN users u ON v.user_id = u.id"""
    if filters:
        query += " WHERE " + " AND ".join(fragment for fragment, _ in filters)
    query += " ORDER BY v.checkin_time DESC LIMIT ? OFFSET ?"
    bound = [value for _, value in filters] + [limit, offset]

    db = get_db()
    found = db.execute(query, bound).fetchall()
    db.close()
    return [row_to_dict(visit) for visit in found]
@app.get("/api/visits/stats")
def get_visit_stats():
    """Return aggregate visit statistics.

    The response contains:
      * total_visits: number of rows in the visits table.
      * visits_per_asset: visit count per asset, most visited first.
      * time_on_site: per-user visit count and total minutes between
        check-in and check-out (rounded to one decimal), longest first.
        Visits missing either timestamp contribute 0 minutes.
    """
    db = get_db()

    visit_total = db.execute("SELECT COUNT(*) FROM visits").fetchone()[0]

    asset_rows = db.execute(
        """SELECT a.name, a.machine_id, COUNT(*) AS cnt
           FROM visits v JOIN assets a ON v.asset_id = a.id
           GROUP BY v.asset_id ORDER BY cnt DESC"""
    ).fetchall()

    user_rows = db.execute(
        """SELECT u.username, COUNT(*) AS visit_count,
                  SUM(
                      CASE WHEN v.checkout_time IS NOT NULL AND v.checkin_time IS NOT NULL
                      THEN (julianday(v.checkout_time) - julianday(v.checkin_time)) * 1440
                      ELSE 0 END
                  ) AS total_minutes
           FROM visits v JOIN users u ON v.user_id = u.id
           GROUP BY v.user_id ORDER BY total_minutes DESC"""
    ).fetchall()

    by_asset = []
    for entry in asset_rows:
        by_asset.append({
            "name": entry["name"],
            "machine_id": entry["machine_id"],
            "count": entry["cnt"],
        })

    by_user = []
    for entry in user_rows:
        by_user.append({
            "username": entry["username"],
            "visit_count": entry["visit_count"],
            "total_minutes": round(entry["total_minutes"] or 0, 1),
        })

    db.close()
    return {
        "total_visits": visit_total,
        "visits_per_asset": by_asset,
        "time_on_site": by_user,
    }
# ─── Phase C: Activity Feed API ──────────────────────────────────────────────
@app.get("/api/activity")
def list_activity(
    user_id: Optional[int] = Query(None),
    entity_type: Optional[str] = Query(None),
    date_from: Optional[str] = Query(None),
    date_to: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List activity-log entries, newest first, with optional filters.

    Filters are combined with AND. ``date_from`` is compared as given;
    ``date_to`` has " 23:59:59" appended so the whole day is included.
    Entries created at the same time are ordered by descending id. Each
    result also carries user_name from the joined user row.
    """
    # Collect (SQL fragment, bound value) pairs for the filters in use.
    filters = []
    if user_id is not None:
        filters.append(("a.user_id = ?", user_id))
    if entity_type is not None:
        filters.append(("a.entity_type = ?", entity_type))
    if date_from:
        filters.append(("a.created_at >= ?", date_from))
    if date_to:
        filters.append(("a.created_at <= ?", date_to + " 23:59:59"))

    query = """SELECT a.*, u.username AS user_name
               FROM activity_log a
               LEFT JOIN users u ON a.user_id = u.id"""
    if filters:
        query += " WHERE " + " AND ".join(fragment for fragment, _ in filters)
    query += " ORDER BY a.created_at DESC, a.id DESC LIMIT ? OFFSET ?"
    bound = [value for _, value in filters] + [limit, offset]

    db = get_db()
    entries = db.execute(query, bound).fetchall()
    db.close()
    return [row_to_dict(entry) for entry in entries]
# ─── Phase C: Export Endpoints Extension ─────────────────────────────────────
@app.get("/api/export/service-summary")
def export_service_summary_csv():
    """Stream a CSV service summary, one row per customer/location pair.

    Columns: customer_name, location_name, asset_count, last_checkin.
    ``asset_count`` is the number of distinct assets belonging to the
    customer and ``last_checkin`` is the most recent check-in timestamp
    across those assets (empty when there is none).

    Returns:
        StreamingResponse: text/csv attachment named service_summary.csv.
    """
    conn = get_db()
    try:
        # COUNT(DISTINCT a.id): the LEFT JOIN on checkins yields one row per
        # (asset, check-in), so a plain COUNT(a.id) would count an asset once
        # for every check-in it has.
        # NOTE(review): assets are joined on customer only, so every location
        # of a customer reports the customer's full asset count. If assets
        # carry a location reference, the join should probably use it —
        # confirm against the assets schema.
        rows = conn.execute(
            """SELECT c.name AS customer_name, l.name AS location_name,
                      COUNT(DISTINCT a.id) AS asset_count,
                      MAX(ck.created_at) AS last_checkin
               FROM customers c
               LEFT JOIN locations l ON l.customer_id = c.id
               LEFT JOIN assets a ON a.customer_id = c.id
               LEFT JOIN checkins ck ON ck.asset_id = a.id
               GROUP BY c.id, l.id
               ORDER BY c.name, l.name"""
        ).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    fieldnames = ["customer_name", "location_name", "asset_count", "last_checkin"]

    def _gen():
        # One StringIO is reused as a scratch buffer: write a record, emit
        # it, then reset the buffer so each chunk holds exactly one line.
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=fieldnames)

        def _drain():
            chunk = buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)
            return chunk

        writer.writeheader()
        yield _drain()
        for row in rows:
            record = row_to_dict(row)
            record["last_checkin"] = record["last_checkin"] or ""
            writer.writerow(record)
            yield _drain()

    return StreamingResponse(
        _gen(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=service_summary.csv"},
    )
# ─── File Uploads ───────────────────────────────────────────────────────────
# Upload size caps in bytes; passed to _save_upload as its max_size argument.
ICON_MAX_SIZE = 2 * 1024 * 1024 # 2 MB
PHOTO_MAX_SIZE = 10 * 1024 * 1024 # 10 MB
# Accepted file extensions per upload kind (lower-case, including the dot);
# _save_upload lower-cases the uploaded filename's suffix before comparing.
ICON_ALLOWED_EXTS = {".png", ".jpg", ".jpeg", ".svg"}
PHOTO_ALLOWED_EXTS = {".png", ".jpg", ".jpeg"}
def _save_upload(upload: UploadFile, subdir: str, allowed_exts: set, max_size: int) -> str:
    """Save uploaded file to uploads/{subdir}/ with a UUID filename.

    Args:
        upload: The incoming file; only its filename suffix and bytes are used.
        subdir: Directory under UPLOADS_DIR to store the file in (created
            on demand).
        allowed_exts: Accepted lower-case extensions including the dot.
        max_size: Largest accepted file size in bytes.

    Returns:
        The relative URL path, e.g. /uploads/icons/abc123.png.

    Raises:
        HTTPException: 400 for a missing or disallowed extension, 413 when
            the file is larger than ``max_size``.
    """
    ext = Path(upload.filename or "").suffix.lower()
    if not ext or ext not in allowed_exts:
        allowed = ", ".join(sorted(allowed_exts))
        raise HTTPException(400, f"Invalid file type. Allowed: {allowed}")

    # Read one byte past the limit at most: enough to detect an oversized
    # file without pulling an arbitrarily large upload into memory.
    contents = upload.file.read(max_size + 1)
    if len(contents) > max_size:
        mb = max_size // (1024 * 1024)
        raise HTTPException(413, f"File too large. Maximum size: {mb} MB")

    # NOTE(review): only the extension is checked, not the content. When
    # ".svg" is allowed the stored file can carry script and is later served
    # from this app's /uploads mount — confirm that is acceptable.
    dest_dir = UPLOADS_DIR / subdir
    dest_dir.mkdir(parents=True, exist_ok=True)
    # The client-supplied name is discarded; a random name avoids collisions
    # and path tricks.
    fname = f"{uuid.uuid4().hex}{ext}"
    (dest_dir / fname).write_bytes(contents)
    return f"/uploads/{subdir}/{fname}"
@app.post("/api/upload/icon", status_code=201)
async def upload_icon(file: UploadFile = File(...)):
    """Store an icon upload and return its URL path as {"path": ...}.

    Extension and size limits come from ICON_ALLOWED_EXTS and ICON_MAX_SIZE.
    """
    return {"path": _save_upload(file, "icons", ICON_ALLOWED_EXTS, ICON_MAX_SIZE)}
@app.post("/api/upload/photo", status_code=201)
async def upload_photo(file: UploadFile = File(...)):
    """Store a photo upload and return its URL path as {"path": ...}.

    Extension and size limits come from PHOTO_ALLOWED_EXTS and PHOTO_MAX_SIZE.
    """
    return {"path": _save_upload(file, "photos", PHOTO_ALLOWED_EXTS, PHOTO_MAX_SIZE)}
@app.post("/api/ocr", status_code=200)
async def ocr_sticker(file: UploadFile = File(...)):
    """OCR a sticker photo to extract machine_id from XXXXX-XXXXXX format.

    Accepts an image upload (png, jpg, jpeg, webp or bmp, at most 10 MB),
    runs Tesseract OCR on a grayscale copy, and looks for a pattern like
    '12345-678901'.

    Returns:
        A dict with ``machine_id``, ``raw_text`` (first 500 characters of
        the OCR output) and ``confidence``:
          * "high": five digits followed by six or more digits, optionally
            separated by hyphens/whitespace.
          * "low":  no such pair, but some run of five or more digits.
          * "none": nothing usable; ``machine_id`` is None and ``detail``
            explains the failure.

    Raises:
        HTTPException: 400 for an unsupported extension or oversized image,
            500 when the image cannot be decoded or OCR fails.
    """
    # Validate file extension
    filename = file.filename or ""
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in {"png", "jpg", "jpeg", "webp", "bmp"}:
        raise HTTPException(status_code=400, detail=f"Unsupported image format: .{ext}")

    contents = await file.read()
    if len(contents) > 10 * 1024 * 1024:  # 10MB max
        raise HTTPException(status_code=400, detail="Image too large (max 10MB)")

    # Decode straight from memory. This avoids writing a scratch file under
    # the uploads directory (which the app serves at /uploads) and removes
    # the cleanup paths that file needed.
    # NOTE(review): decoding and OCR are blocking calls inside an async
    # handler — consider moving them to a worker thread.
    try:
        # The context manager closes the decoder's handle; convert() returns
        # an independent grayscale copy that stays usable afterwards.
        with PILImage.open(io.BytesIO(contents)) as img:
            img_gray = img.convert("L")
        text = pytesseract.image_to_string(img_gray, config="--psm 6")
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"OCR processing failed: {str(e)}"
        ) from e

    raw_text = text.strip()[:500]

    # Search for XXXXX-XXXXXX pattern (5 digits - 6 digits or more)
    match = re.search(r"(\d{5})[-\s]*(\d{6,})", text)
    if match:
        return {
            "machine_id": f"{match.group(1)}-{match.group(2)}",
            "raw_text": raw_text,
            "confidence": "high",
        }

    # Try looser: any 5+ digit number as machine_id
    loose = re.search(r"(\d{5,})", text)
    if loose:
        return {
            "machine_id": loose.group(1),
            "raw_text": raw_text,
            "confidence": "low",
        }

    return {
        "machine_id": None,
        "raw_text": raw_text,
        "confidence": "none",
        "detail": "No machine ID pattern found in image. Try again with better lighting.",
    }
# ─── Static Files (mounted last to not shadow routes) ──────────────────────
# Uploaded files are always exposed under /uploads.
_uploads_files = StaticFiles(directory=str(UPLOADS_DIR))
app.mount("/uploads", _uploads_files, name="uploads")

# The catch-all "/" mount is registered last, and only when the bundled
# static directory is present.
if STATIC_DIR.exists():
    _frontend_files = StaticFiles(directory=str(STATIC_DIR), html=True)
    app.mount("/", _frontend_files, name="static")