"""
Canteen Asset Geolocation Tool — FastAPI server.

Single-file backend: SQLite storage, asset CRUD, machine_id search,
check-ins with GPS, stats, and CSV export.

v2 schema: full asset management with customers, locations, keys, badges, etc.
"""

import csv
import hashlib
import io
import json as _json
import os
import re
import secrets
import sqlite3
import traceback
import uuid
from contextlib import asynccontextmanager
from pathlib import Path

import pytesseract
from PIL import Image as PILImage
from fastapi import FastAPI, HTTPException, Query, Request, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Optional, List

# ─── Config ─────────────────────────────────────────────────────────────────

VALID_CATEGORIES = {"Furniture", "Appliances", "Utensils & Serveware", "Equipment", "Other"}
VALID_STATUSES = {"active", "maintenance", "retired"}

# Both storage paths can be overridden through the environment.
DB_PATH = os.environ.get("CANTEEN_DB_PATH", str(Path(__file__).parent / "assets.db"))
UPLOADS_DIR = Path(os.environ.get("CANTEEN_UPLOADS_DIR", str(Path(__file__).parent / "uploads")))
STATIC_DIR = Path(__file__).parent / "static"


# ─── Database ───────────────────────────────────────────────────────────────

def get_db() -> sqlite3.Connection:
    """Return a new DB connection with WAL + foreign keys enabled.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    The caller owns the connection and must close it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    conn.row_factory = sqlite3.Row
    return conn


def _create_v2_tables(conn: sqlite3.Connection):
    """Create all v2 tables (and the checkins indexes) if they don't exist."""
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            role TEXT NOT NULL DEFAULT 'technician',
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS customers (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            updated_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS customer_contacts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_id INTEGER NOT NULL REFERENCES customers(id) ON DELETE CASCADE,
            name TEXT,
            phone TEXT,
            email TEXT
        );

        CREATE TABLE IF NOT EXISTS locations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_id INTEGER REFERENCES customers(id),
            name TEXT,
            address TEXT DEFAULT '',
            building_name TEXT DEFAULT '',
            building_number TEXT DEFAULT '',
            floor TEXT DEFAULT '',
            trailer_number TEXT DEFAULT '',
            site_hours TEXT DEFAULT '',
            access_notes TEXT DEFAULT '',
            walking_directions TEXT DEFAULT '',
            map_link TEXT DEFAULT '',
            latitude REAL DEFAULT NULL,
            longitude REAL DEFAULT NULL,
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            updated_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS rooms (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            location_id INTEGER NOT NULL REFERENCES locations(id) ON DELETE CASCADE,
            name TEXT,
            floor TEXT DEFAULT '',
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            updated_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS categories (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            icon TEXT DEFAULT ''
        );

        CREATE TABLE IF NOT EXISTS key_names (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL
        );

        CREATE TABLE IF NOT EXISTS key_types (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL
        );

        CREATE TABLE IF NOT EXISTS badge_types (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL
        );

        CREATE TABLE IF NOT EXISTS makes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL
        );

        CREATE TABLE IF NOT EXISTS models (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            make_id INTEGER NOT NULL REFERENCES makes(id),
            name TEXT NOT NULL,
            icon_path TEXT
        );

        CREATE TABLE IF NOT EXISTS assets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            machine_id TEXT NOT NULL UNIQUE,
            serial_number TEXT DEFAULT '',
            name TEXT NOT NULL,
            description TEXT DEFAULT '',
            category TEXT NOT NULL DEFAULT 'Other',
            status TEXT NOT NULL DEFAULT 'active',
            make TEXT DEFAULT '',
            model TEXT DEFAULT '',
            address TEXT DEFAULT '',
            building_name TEXT DEFAULT '',
            building_number TEXT DEFAULT '',
            floor TEXT DEFAULT '',
            room TEXT DEFAULT '',
            trailer_number TEXT DEFAULT '',
            walking_directions TEXT DEFAULT '',
            map_link TEXT DEFAULT '',
            parking_location TEXT DEFAULT '',
            photo_path TEXT,
            customer_id INTEGER REFERENCES customers(id),
            location_id INTEGER REFERENCES locations(id),
            assigned_to INTEGER REFERENCES users(id),
            latitude REAL DEFAULT NULL,
            longitude REAL DEFAULT NULL,
            geofence_radius_meters INTEGER DEFAULT 50,
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            updated_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS checkins (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asset_id INTEGER NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
            user_id INTEGER REFERENCES users(id),
            latitude REAL,
            longitude REAL,
            accuracy REAL,
            photo_path TEXT,
            notes TEXT DEFAULT '',
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS asset_keys (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asset_id INTEGER NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
            key_name TEXT,
            key_type TEXT
        );

        CREATE TABLE IF NOT EXISTS asset_badges (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            asset_id INTEGER NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
            badge_name TEXT
        );

        CREATE TABLE IF NOT EXISTS settings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            key TEXT UNIQUE NOT NULL,
            value TEXT
        );

        CREATE TABLE IF NOT EXISTS activity_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER REFERENCES users(id),
            action TEXT,
            entity_type TEXT,
            entity_id INTEGER,
            details TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS geofences (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            points TEXT,
            color TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            updated_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS geofence_users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            geofence_id INTEGER NOT NULL REFERENCES geofences(id) ON DELETE CASCADE,
            user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            UNIQUE(geofence_id, user_id)
        );

        CREATE TABLE IF NOT EXISTS visits (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER REFERENCES users(id),
            asset_id INTEGER REFERENCES assets(id) ON DELETE CASCADE,
            checkin_time TEXT,
            checkout_time TEXT,
            duration_minutes INTEGER,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE TABLE IF NOT EXISTS sessions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
            token TEXT UNIQUE NOT NULL,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        CREATE INDEX IF NOT EXISTS idx_checkins_asset_id ON checkins(asset_id);
        CREATE INDEX IF NOT EXISTS idx_checkins_created_at ON checkins(created_at);
    """)


def _seed_if_empty(conn: sqlite3.Connection, table: str, columns: tuple, rows: list):
    """Insert seed rows if table is empty.

    ``table`` and ``columns`` are interpolated into the SQL text, so they must
    be trusted constants (never request data). Does not commit.
    """
    existing = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    if existing == 0:
        placeholders = ", ".join(["?"] * len(columns))
        col_names = ", ".join(columns)
        conn.executemany(
            f"INSERT INTO {table} ({col_names}) VALUES ({placeholders})",
            rows,
        )


def _seed_data(conn: sqlite3.Connection):
    """Insert default seed data for lookup tables and the default admin user.

    Each table is only seeded when it is completely empty. Does not commit.
    """
    _seed_if_empty(conn, "categories", ("name", "icon"), [
        ("Furniture", "🪑"),
        ("Appliances", "🔌"),
        ("Utensils & Serveware", "🍽️"),
        ("Equipment", "⚙️"),
        ("Other", "📦"),
    ])
    _seed_if_empty(conn, "key_names", ("name",), [
        ("MK500",),
        ("Green Dot",),
        ("Red Key",),
        ("Blue Key",),
        ("Master Key",),
        ("Padlock Key",),
    ])
    _seed_if_empty(conn, "key_types", ("name",), [
        ("Round Short",),
        ("Barrel",),
        ("Standard",),
        ("Flat",),
        ("Tubular",),
    ])
    _seed_if_empty(conn, "badge_types", ("name",), [
        ("Disney Contractor Base",),
        ("Visitor Badge",),
        ("Employee Badge",),
        ("Contractor Badge",),
        ("Temporary Pass",),
    ])
    _seed_if_empty(conn, "makes", ("name",), [
        ("Canteen",),
        ("Hobart",),
        ("Vollrath",),
        ("Metro",),
        ("Rubbermaid",),
        ("Cambro",),
        ("Other",),
    ])

    # Seed default admin user
    # NOTE(review): the password hash is a hard-coded literal, so every fresh
    # install starts with the same admin credential — confirm that the
    # password is rotated on first login / deployment.
    existing = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    if existing == 0:
        conn.execute(
            "INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
            ("admin", "057ba03d6c44104863dc7361fe4578965d1887360f90a0895882e58a6248fc86", "admin"),
        )


def _ensure_unique_machine_id(conn: sqlite3.Connection):
    """Remove duplicate machine_ids (keep oldest), then create UNIQUE index.

    "Oldest" means the row with the lowest ``id``. Requires the connection to
    use ``sqlite3.Row`` (rows are read by column name). Does not commit.
    """
    dupes = conn.execute("""
        SELECT machine_id FROM assets
        GROUP BY machine_id HAVING COUNT(*) > 1
    """).fetchall()
    for dupe in dupes:
        ids = conn.execute(
            "SELECT id FROM assets WHERE machine_id = ? ORDER BY id",
            (dupe["machine_id"],),
        ).fetchall()
        # ids[0] is the row we keep; everything after it is a duplicate.
        conn.executemany(
            "DELETE FROM assets WHERE id = ?",
            [(row["id"],) for row in ids[1:]],
        )
    conn.execute("DROP INDEX IF EXISTS idx_assets_machine_id")
    conn.execute("CREATE UNIQUE INDEX idx_assets_machine_id ON assets(machine_id)")


def _migrate_v1_to_v2(conn: sqlite3.Connection):
    """Migrate existing v1 database (barcode-based) to v2 schema.

    Rebuilds ``assets`` (``barcode`` becomes ``machine_id``), adds
    ``checkins.user_id`` when missing, seeds lookup data, and commits.

    Foreign-key enforcement is switched off for the duration of the rebuild:
    with enforcement on, ``DROP TABLE assets`` performs an implicit DELETE
    that triggers foreign-key actions/violations on child rows (e.g. check-ins
    referencing the dropped assets). Enforcement is restored afterwards. On
    failure the open transaction is rolled back and the error re-raised.
    """
    # PRAGMA foreign_keys is a no-op inside a transaction, so it must be
    # issued before the first INSERT below opens one.
    conn.execute("PRAGMA foreign_keys=OFF")
    try:
        # Create all new tables first
        _create_v2_tables(conn)

        # A previous failed attempt may have left the scratch table behind.
        conn.execute("DROP TABLE IF EXISTS assets_v2")

        # Create assets_v2 with new schema, copying old data
        conn.execute("""
            CREATE TABLE assets_v2 (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                machine_id TEXT NOT NULL UNIQUE,
                serial_number TEXT DEFAULT '',
                name TEXT NOT NULL,
                description TEXT DEFAULT '',
                category TEXT NOT NULL DEFAULT 'Other',
                status TEXT NOT NULL DEFAULT 'active',
                make TEXT DEFAULT '',
                model TEXT DEFAULT '',
                address TEXT DEFAULT '',
                building_name TEXT DEFAULT '',
                building_number TEXT DEFAULT '',
                floor TEXT DEFAULT '',
                room TEXT DEFAULT '',
                trailer_number TEXT DEFAULT '',
                walking_directions TEXT DEFAULT '',
                map_link TEXT DEFAULT '',
                parking_location TEXT DEFAULT '',
                photo_path TEXT,
                customer_id INTEGER REFERENCES customers(id),
                location_id INTEGER REFERENCES locations(id),
                assigned_to INTEGER REFERENCES users(id),
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            )
        """)
        conn.execute("""
            INSERT INTO assets_v2 (id, machine_id, name, description, category, status,
                                   photo_path, created_at, updated_at)
            SELECT id, barcode, name, description, category, status,
                   photo_path, created_at, updated_at
            FROM assets
        """)
        conn.execute("DROP TABLE assets")
        conn.execute("ALTER TABLE assets_v2 RENAME TO assets")
        _ensure_unique_machine_id(conn)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_assets_category ON assets(category)")

        # Add user_id to checkins if not present
        cursor = conn.execute("PRAGMA table_info(checkins)")
        checkin_cols = {row[1] for row in cursor.fetchall()}
        if "user_id" not in checkin_cols:
            conn.execute("ALTER TABLE checkins ADD COLUMN user_id INTEGER REFERENCES users(id)")

        # Seed lookup data
        _seed_data(conn)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        # The transaction is closed (commit or rollback) by now, so this
        # takes effect and matches what get_db() promises its callers.
        conn.execute("PRAGMA foreign_keys=ON")


def _add_missing_columns(conn: sqlite3.Connection, table: str, column_defs: dict):
    """Add each column in ``column_defs`` (name -> SQL type/default) that ``table`` lacks.

    ``table`` and the definitions are interpolated into SQL and must be
    trusted constants. Does not commit.
    """
    present = {row[1] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()}
    for column, ddl in column_defs.items():
        if column not in present:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}")


def _ensure_v3_columns(conn: sqlite3.Connection):
    """Add the v3 geolocation columns to assets/locations when absent, then commit."""
    _add_missing_columns(conn, "assets", {
        "latitude": "REAL DEFAULT NULL",
        "longitude": "REAL DEFAULT NULL",
        "geofence_radius_meters": "INTEGER DEFAULT 50",
    })
    _add_missing_columns(conn, "locations", {
        "latitude": "REAL DEFAULT NULL",
        "longitude": "REAL DEFAULT NULL",
    })
    conn.commit()


def init_db(conn: sqlite3.Connection):
    """Create tables and indexes if they don't exist. Runs v1→v2 migration if needed.

    The v3 column additions run on every path, including directly after a
    v1→v2 migration: the rebuilt ``assets`` table has no latitude / longitude /
    geofence_radius_meters columns, and asset INSERTs reference them.
    """
    migrated = False

    # Check if assets table exists and has old schema
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='assets'"
    )
    if cursor.fetchone():
        col_cursor = conn.execute("PRAGMA table_info(assets)")
        columns = {row[1] for row in col_cursor.fetchall()}
        if "barcode" in columns and "machine_id" not in columns:
            _migrate_v1_to_v2(conn)
            migrated = True

    if not migrated:
        # Fresh install or already migrated — create all tables
        _create_v2_tables(conn)
        _seed_data(conn)

        # Asset indexes — created here (not in _create_v2_tables) to avoid
        # failing during migration when old v1 assets table lacks machine_id.
        _ensure_unique_machine_id(conn)
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_assets_category ON assets(category)"
        )
        conn.commit()

    # Add lat/lng to assets/locations if not present (v3 migration)
    _ensure_v3_columns(conn)


# ─── App / Middleware ───────────────────────────────────────────────────────

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup hook: ensure the uploads directory exists and the schema is current."""
    UPLOADS_DIR.mkdir(parents=True, exist_ok=True)
    conn = get_db()
    try:
        init_db(conn)
    finally:
        conn.close()
    yield


app = FastAPI(title="Canteen Asset Tracker", lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ─── Auth Middleware ──────────────────────────────────────────────────────────

@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """Require valid Bearer token for all /api/* routes except login.

    On success the authenticated user is exposed to handlers through
    ``request.state.current_user`` (id / username / role) and
    ``request.state.user_id``. Failures return a 401 JSON response.
    """
    path = request.url.path

    # Skip auth enforcement in test mode (set by tests/test_server.py)
    if os.environ.get("CANTEEN_SKIP_AUTH") == "1":
        return await call_next(request)

    # Public paths — no auth required
    if not path.startswith("/api/") or path == "/api/auth/login":
        return await call_next(request)

    # This middleware is registered after CORSMiddleware and therefore wraps
    # it. Browsers never attach Authorization to a CORS preflight, so OPTIONS
    # must reach the CORS layer instead of being answered with a 401 that
    # carries no CORS headers.
    if request.method == "OPTIONS":
        return await call_next(request)

    # Extract and validate token
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        return JSONResponse(
            status_code=401,
            content={"detail": "Authentication required"},
        )

    token = auth_header[7:]  # strip the "Bearer " prefix
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT u.id, u.username, u.role FROM users u "
            "JOIN sessions s ON u.id = s.user_id WHERE s.token = ?",
            (token,),
        ).fetchone()
    finally:
        # Close even when the lookup raises so the connection is not leaked.
        conn.close()

    if row is None:
        return JSONResponse(
            status_code=401,
            content={"detail": "Invalid or expired token"},
        )

    request.state.current_user = {
        "id": row["id"],
        "username": row["username"],
        "role": row["role"],
    }
    request.state.user_id = row["id"]
    return await call_next(request)


# ─── Global Error Handling ───────────────────────────────────────────────────

@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Return structured JSON with status detail for all HTTP exceptions."""
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
    )


@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
    """Catch-all for unhandled exceptions — log and return 500."""
    traceback.print_exc()
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )


# ─── Input sanitization helpers ──────────────────────────────────────────────

def _sanitize_machine_id(machine_id: str) -> str:
    """Strip whitespace and reject empty or over-long (> 256 chars) machine IDs.

    Raises:
        HTTPException: 422 when the trimmed value is empty or too long.
    """
    clean = machine_id.strip()
    if not clean:
        raise HTTPException(status_code=422, detail="Machine ID must not be empty")
    if len(clean) > 256:
        raise HTTPException(status_code=422, detail="Machine ID too long (max 256 chars)")
    return clean


def _sanitize_name(name: str) -> str:
    """Trim name and reject empty or over-long (> 512 chars) names.

    Raises:
        HTTPException: 422 when the trimmed value is empty or too long.
    """
    clean = name.strip()
    if not clean:
        raise HTTPException(status_code=422, detail="Name must not be empty")
    if len(clean) > 512:
        raise HTTPException(status_code=422, detail="Name too long (max 512 chars)")
    return clean


# ─── Pydantic Models ────────────────────────────────────────────────────────
# Request-body schemas. For the *Create models the defaults mirror the column
# defaults; for the *Update models ``None`` means "leave this field unchanged".

class AssetKey(BaseModel):
    key_name: str
    key_type: Optional[str] = ""


class AssetBadge(BaseModel):
    badge_name: str


class AssetCreate(BaseModel):
    machine_id: str
    name: str
    serial_number: Optional[str] = ""
    description: Optional[str] = ""
    category: Optional[str] = "Other"
    status: Optional[str] = "active"
    make: Optional[str] = ""
    model: Optional[str] = ""
    address: Optional[str] = ""
    building_name: Optional[str] = ""
    building_number: Optional[str] = ""
    floor: Optional[str] = ""
    room: Optional[str] = ""
    trailer_number: Optional[str] = ""
    walking_directions: Optional[str] = ""
    map_link: Optional[str] = ""
    parking_location: Optional[str] = ""
    photo_path: Optional[str] = None
    customer_id: Optional[int] = None
    location_id: Optional[int] = None
    assigned_to: Optional[int] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    geofence_radius_meters: Optional[int] = 50
    keys: Optional[List[AssetKey]] = []
    badges: Optional[List[str]] = []


class AssetUpdate(BaseModel):
    machine_id: Optional[str] = None
    name: Optional[str] = None
    serial_number: Optional[str] = None
    description: Optional[str] = None
    category: Optional[str] = None
    status: Optional[str] = None
    make: Optional[str] = None
    model: Optional[str] = None
    address: Optional[str] = None
    building_name: Optional[str] = None
    building_number: Optional[str] = None
    floor: Optional[str] = None
    room: Optional[str] = None
    trailer_number: Optional[str] = None
    walking_directions: Optional[str] = None
    map_link: Optional[str] = None
    parking_location: Optional[str] = None
    photo_path: Optional[str] = None
    customer_id: Optional[int] = None
    location_id: Optional[int] = None
    assigned_to: Optional[int] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    geofence_radius_meters: Optional[int] = None
    keys: Optional[List[AssetKey]] = None
    badges: Optional[List[str]] = None


class CheckinCreate(BaseModel):
    asset_id: int
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    accuracy: Optional[float] = None
    photo_path: Optional[str] = None
    notes: Optional[str] = ""
    user_id: Optional[int] = None


class CheckinUpdate(BaseModel):
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    accuracy: Optional[float] = None
    photo_path: Optional[str] = None
    notes: Optional[str] = None
    user_id: Optional[int] = None


class VisitCreate(BaseModel):
    user_id: Optional[int] = None
    asset_id: int
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    duration_minutes: Optional[int] = None


# ─── Phase B: Customer / Location / Room / Settings Models ──────────────────

class CustomerContact(BaseModel):
    name: Optional[str] = ""
    phone: Optional[str] = ""
    email: Optional[str] = ""


class CustomerCreate(BaseModel):
    name: str
    contacts: Optional[List[CustomerContact]] = []


class CustomerUpdate(BaseModel):
    name: Optional[str] = None
    contacts: Optional[List[CustomerContact]] = None


class LocationCreate(BaseModel):
    customer_id: Optional[int] = None
    name: str
    address: Optional[str] = ""
    building_name: Optional[str] = ""
    building_number: Optional[str] = ""
    floor: Optional[str] = ""
    trailer_number: Optional[str] = ""
    site_hours: Optional[str] = ""
    access_notes: Optional[str] = ""
    walking_directions: Optional[str] = ""
    map_link: Optional[str] = ""
    latitude: Optional[float] = None
    longitude: Optional[float] = None


class LocationUpdate(BaseModel):
    customer_id: Optional[int] = None
    name: Optional[str] = None
    address: Optional[str] = None
    building_name: Optional[str] = None
    building_number: Optional[str] = None
    floor: Optional[str] = None
    trailer_number: Optional[str] = None
    site_hours: Optional[str] = None
    access_notes: Optional[str] = None
    walking_directions: Optional[str] = None
    map_link: Optional[str] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None


class RoomCreate(BaseModel):
    location_id: int
    name: str
    floor: Optional[str] = ""


class RoomUpdate(BaseModel):
    name: Optional[str] = None
    floor: Optional[str] = None
    location_id: Optional[int] = None


# ─── Helpers ────────────────────────────────────────────────────────────────

def row_to_dict(row: sqlite3.Row) -> dict:
    """Convert a ``sqlite3.Row`` into a plain dict keyed by column name."""
    return dict(row)


def _geofence_row(row: sqlite3.Row, conn: Optional[sqlite3.Connection] = None) -> dict:
    """Serialize a geofence row, parsing points JSON and including assigned users.

    ``points`` is left as the raw string when it is not valid JSON. When no
    connection is supplied a temporary one is opened and closed here.
    """
    d = dict(row)
    if isinstance(d.get("points"), str):
        try:
            d["points"] = _json.loads(d["points"])
        except (_json.JSONDecodeError, TypeError):
            pass  # deliberate best-effort: keep the unparsed string

    # Include assigned users
    close_conn = False
    if conn is None:
        conn = get_db()
        close_conn = True
    try:
        user_rows = conn.execute(
            """SELECT u.id, u.username, u.role
               FROM geofence_users gu
               JOIN users u ON u.id = gu.user_id
               WHERE gu.geofence_id = ?
               ORDER BY u.username""",
            (d["id"],),
        ).fetchall()
        d["assigned_users"] = [dict(r) for r in user_rows]
    finally:
        if close_conn:
            conn.close()
    return d


def _sync_geofence_users(conn: sqlite3.Connection, geofence_id: int, user_ids: list[int]):
    """Replace assigned users for a geofence with the given list of user IDs.

    Validates all user_ids exist before modifying the junction table.
    Repeated IDs are collapsed (first occurrence wins) so they cannot trip the
    UNIQUE(geofence_id, user_id) constraint; ``None`` is treated as "no users".
    Does NOT commit — caller manages transaction boundaries.

    Raises:
        HTTPException: 422 naming the first user id that does not exist.
    """
    unique_ids = list(dict.fromkeys(user_ids or []))

    # Validate all user IDs exist
    if unique_ids:
        placeholders = ", ".join(["?"] * len(unique_ids))
        existing = conn.execute(
            f"SELECT id FROM users WHERE id IN ({placeholders})",
            unique_ids,
        ).fetchall()
        existing_ids = {r["id"] for r in existing}
        for uid in unique_ids:
            if uid not in existing_ids:
                raise HTTPException(
                    status_code=422,
                    detail=f"User with id {uid} not found",
                )

    conn.execute("DELETE FROM geofence_users WHERE geofence_id = ?", (geofence_id,))
    conn.executemany(
        "INSERT INTO geofence_users (geofence_id, user_id) VALUES (?, ?)",
        [(geofence_id, uid) for uid in unique_ids],
    )


def _validate_category(category: str):
    """Raise a 422 HTTPException unless ``category`` is in VALID_CATEGORIES."""
    if category not in VALID_CATEGORIES:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid category '{category}'. Must be one of: {', '.join(sorted(VALID_CATEGORIES))}",
        )


def _validate_status(status: str):
    """Raise a 422 HTTPException unless ``status`` is in VALID_STATUSES."""
    if status not in VALID_STATUSES:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid status '{status}'. Must be one of: {', '.join(sorted(VALID_STATUSES))}",
        )


def _validate_ref(conn: sqlite3.Connection, table: str, id_val: int, label: str):
    """Validate a foreign key reference exists.

    ``None`` is accepted (nothing to check). ``table`` is interpolated into
    the SQL and must be a trusted constant.

    Raises:
        HTTPException: 422 when no row of ``table`` has that id.
    """
    if id_val is not None:
        row = conn.execute(f"SELECT id FROM {table} WHERE id = ?", (id_val,)).fetchone()
        if row is None:
            raise HTTPException(status_code=422, detail=f"{label} with id {id_val} not found")


def _validate_enum_table(conn: sqlite3.Connection, table: str, value: str, label: str):
    """Validate that a value exists in a lookup table (name column).

    Empty / falsy values are accepted without a lookup. ``table`` is
    interpolated into the SQL and must be a trusted constant.

    Raises:
        HTTPException: 422 when ``value`` is not a name in ``table``.
    """
    if value:
        row = conn.execute(
            f"SELECT id FROM {table} WHERE name = ?", (value,)
        ).fetchone()
        if row is None:
            raise HTTPException(
                status_code=422,
                detail=f"Invalid {label} '{value}'. Must exist in {table} table.",
            )


# ─── Task 3: Health ────────────────────────────────────────────────────────

@app.get("/health")
def health():
    """Liveness probe."""
    return {"status": "ok"}


# ─── Task 4: POST /api/assets ──────────────────────────────────────────────

def _build_asset_insert(body: AssetCreate, machine_id: str, name: str):
    """Build column names and values tuple for asset INSERT.

    ``machine_id`` and ``name`` are the already-sanitized values; every other
    column comes from ``body`` with empty/None text normalised to the column
    default. Returns ``(columns, values)`` in matching order.
    """
    columns = [
        "machine_id", "serial_number", "name", "description", "category", "status",
        "make", "model", "address", "building_name", "building_number",
        "floor", "room", "trailer_number", "walking_directions", "map_link",
        "parking_location", "photo_path", "customer_id", "location_id", "assigned_to",
        "latitude", "longitude", "geofence_radius_meters",
    ]
    values = (
        machine_id,
        body.serial_number or "",
        name,
        body.description or "",
        body.category or "Other",
        body.status or "active",
        body.make or "",
        body.model or "",
        body.address or "",
        body.building_name or "",
        body.building_number or "",
        body.floor or "",
        body.room or "",
        body.trailer_number or "",
        body.walking_directions or "",
        body.map_link or "",
        body.parking_location or "",
        body.photo_path,
        body.customer_id,
        body.location_id,
        body.assigned_to,
        body.latitude,
        body.longitude,
        # An explicit 0 is preserved; only a missing radius falls back to 50.
        body.geofence_radius_meters if body.geofence_radius_meters is not None else 50,
    )
    return columns, values


def _get_asset_keys(conn: sqlite3.Connection, asset_id: int) -> list:
    """Load all keys for an asset."""
    rows = conn.execute(
        "SELECT id, asset_id, key_name, key_type FROM asset_keys WHERE asset_id = ?",
        (asset_id,),
    ).fetchall()
    return [row_to_dict(r) for r in rows]


def _get_asset_badges(conn: sqlite3.Connection, asset_id: int) -> list:
    """Load all badges for an asset."""
    rows = conn.execute(
        "SELECT id, asset_id, badge_name FROM asset_badges WHERE asset_id = ?",
        (asset_id,),
    ).fetchall()
    return [row_to_dict(r) for r in rows]


@app.post("/api/assets", status_code=201)
def create_asset(body: AssetCreate):
    """Create an asset with its keys and badges; returns the stored record.

    Responds 422 for invalid fields or unknown references and 409 when the
    machine_id is already taken.
    """
    machine_id = _sanitize_machine_id(body.machine_id)
    name = _sanitize_name(body.name)
    _validate_status(body.status or "active")

    conn = get_db()
    try:
        # DB-based validation for reference fields
        _validate_enum_table(conn, "categories", body.category or "Other", "category")
        _validate_enum_table(conn, "makes", body.make or "", "make")
        if body.customer_id is not None:
            _validate_ref(conn, "customers", body.customer_id, "Customer")
        if body.location_id is not None:
            _validate_ref(conn, "locations", body.location_id, "Location")
        if body.assigned_to is not None:
            _validate_ref(conn, "users", body.assigned_to, "User")

        columns, values = _build_asset_insert(body, machine_id, name)
        placeholders = ", ".join(["?"] * len(columns))
        col_names = ", ".join(columns)

        try:
            cursor = conn.execute(
                f"INSERT INTO assets ({col_names}) VALUES ({placeholders})",
                values,
            )
        except sqlite3.IntegrityError:
            raise HTTPException(
                status_code=409,
                detail=f"Asset with machine_id '{machine_id}' already exists",
            )

        asset_id = cursor.lastrowid

        # Insert keys
        for k in body.keys or []:
            conn.execute(
                "INSERT INTO asset_keys (asset_id, key_name, key_type) VALUES (?, ?, ?)",
                (asset_id, k.key_name, k.key_type or ""),
            )

        # Insert badges
        for b_name in body.badges or []:
            conn.execute(
                "INSERT INTO asset_badges (asset_id, badge_name) VALUES (?, ?)",
                (asset_id, b_name),
            )

        conn.commit()
        _log_activity(conn, "created", "asset", asset_id,
                      f"Asset '{name}' (machine_id: {machine_id}) created")
        conn.commit()

        row = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
        result = row_to_dict(row)
        result["keys"] = _get_asset_keys(conn, asset_id)
        result["badges"] = _get_asset_badges(conn, asset_id)
        return result
    finally:
        # Validation errors above used to leave the connection open.
        conn.close()


# ─── Task 5: GET /api/assets ───────────────────────────────────────────────

@app.get("/api/assets")
def list_assets(
    category: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    make: Optional[str] = Query(None),
    model: Optional[str] = Query(None),
    customer_id: Optional[int] = Query(None),
    location_id: Optional[int] = Query(None),
    assigned_to: Optional[int] = Query(None),
    q: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List assets, newest first, with optional exact-match filters.

    ``q`` is a substring match over name, machine_id and description.
    """
    conditions = []
    params = []

    if category:
        conditions.append("category = ?")
        params.append(category)
    if status:
        conditions.append("status = ?")
        params.append(status)
    if make:
        conditions.append("make = ?")
        params.append(make)
    if model:
        conditions.append("model = ?")
        params.append(model)
    if customer_id is not None:
        conditions.append("customer_id = ?")
        params.append(customer_id)
    if location_id is not None:
        conditions.append("location_id = ?")
        params.append(location_id)
    if assigned_to is not None:
        conditions.append("assigned_to = ?")
        params.append(assigned_to)
    if q:
        conditions.append("(name LIKE ? OR machine_id LIKE ? OR description LIKE ?)")
        like = f"%{q}%"
        params.extend([like, like, like])

    where = " AND ".join(conditions)
    sql = "SELECT * FROM assets"
    if where:
        sql += f" WHERE {where}"
    sql += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([limit, offset])

    conn = get_db()
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        conn.close()
    return [row_to_dict(r) for r in rows]


@app.get("/api/assets/search")
def search_by_machine_id(machine_id: str = Query(...)):
    """Return the asset whose machine_id matches exactly, or 404."""
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT * FROM assets WHERE machine_id = ?", (machine_id,)
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        raise HTTPException(status_code=404, detail="Asset not found")
    return row_to_dict(row)


@app.get("/api/assets/{asset_id}")
def get_asset(asset_id: int):
    """Return one asset with its keys, badges and (when linked) customer/location names."""
    conn = get_db()
    try:
        row = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
        if row is None:
            raise HTTPException(status_code=404, detail="Asset not found")
        result = row_to_dict(row)
        result["keys"] = _get_asset_keys(conn, asset_id)
        result["badges"] = _get_asset_badges(conn, asset_id)

        # Add joined names
        if result.get("customer_id"):
            cust = conn.execute(
                "SELECT name FROM customers WHERE id = ?", (result["customer_id"],)
            ).fetchone()
            result["customer_name"] = cust["name"] if cust else None
        if result.get("location_id"):
            loc = conn.execute(
                "SELECT name FROM locations WHERE id = ?", (result["location_id"],)
            ).fetchone()
            result["location_name"] = loc["name"] if loc else None
        return result
    finally:
        conn.close()


# ─── Task 6: PUT / DELETE /api/assets/{id} ─────────────────────────────────

_TEXT_FIELDS = [
    "name", "machine_id", "serial_number", "description", "make", "model",
    "address", "building_name", "building_number", "floor", "room",
    "trailer_number", "walking_directions", "map_link", "parking_location",
]


@app.put("/api/assets/{asset_id}")
def update_asset(asset_id: int, body: AssetUpdate):
    """Partially update an asset; fields left as ``None`` are not touched.

    When ``keys`` / ``badges`` are supplied (including an empty list) they
    replace the asset's current keys / badges. The response is the stored
    record plus its ``keys`` and ``badges``.

    Responds 404 for an unknown asset, 422 for invalid fields or unknown
    customer/location/user references, and 409 when the new machine_id is
    already used by another asset.
    """
    conn = get_db()
    try:
        existing = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Asset not found")

        updates = {}
        for field in _TEXT_FIELDS:
            val = getattr(body, field, None)
            if val is None:
                continue
            if field == "name":
                updates[field] = _sanitize_name(val)
            elif field == "machine_id":
                updates[field] = _sanitize_machine_id(val)
            else:
                updates[field] = val

        if body.category is not None:
            _validate_enum_table(conn, "categories", body.category, "category")
            updates["category"] = body.category
        if body.status is not None:
            _validate_status(body.status)
            updates["status"] = body.status
        if body.photo_path is not None:
            updates["photo_path"] = body.photo_path

        # Same reference checks as create_asset: without them a bad id only
        # surfaces as a foreign-key IntegrityError (HTTP 500).
        for field, table, label in (
            ("customer_id", "customers", "Customer"),
            ("location_id", "locations", "Location"),
            ("assigned_to", "users", "User"),
        ):
            val = getattr(body, field)
            if val is not None:
                _validate_ref(conn, table, val, label)
                updates[field] = val

        for field in ("latitude", "longitude", "geofence_radius_meters"):
            val = getattr(body, field)
            if val is not None:
                updates[field] = val

        replace_keys = body.keys is not None
        replace_badges = body.badges is not None

        if updates or replace_keys or replace_badges:
            # updated_at is a SQL expression, so it is appended as literal SQL
            # rather than bound as a parameter.
            assignments = [f"{col} = ?" for col in updates]
            assignments.append("updated_at = datetime('now')")
            try:
                conn.execute(
                    f"UPDATE assets SET {', '.join(assignments)} WHERE id = ?",
                    [*updates.values(), asset_id],
                )
            except sqlite3.IntegrityError:
                if "machine_id" not in updates:
                    raise
                raise HTTPException(
                    status_code=409,
                    detail=f"Asset with machine_id '{updates['machine_id']}' already exists",
                )

            if replace_keys:
                conn.execute("DELETE FROM asset_keys WHERE asset_id = ?", (asset_id,))
                conn.executemany(
                    "INSERT INTO asset_keys (asset_id, key_name, key_type) VALUES (?, ?, ?)",
                    [(asset_id, k.key_name, k.key_type or "") for k in body.keys],
                )
            if replace_badges:
                conn.execute("DELETE FROM asset_badges WHERE asset_id = ?", (asset_id,))
                conn.executemany(
                    "INSERT INTO asset_badges (asset_id, badge_name) VALUES (?, ?)",
                    [(asset_id, b_name) for b_name in body.badges],
                )
            conn.commit()

        row = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
        _log_activity(conn, "updated", "asset", asset_id,
                      f"Asset '{row['name']}' updated")
        conn.commit()

        result = row_to_dict(row)
        result["keys"] = _get_asset_keys(conn, asset_id)
        result["badges"] = _get_asset_badges(conn, asset_id)
        return result
    finally:
        conn.close()


@app.delete("/api/assets/{asset_id}", status_code=204)
def delete_asset(asset_id: int):
    """Delete an asset and its dependent rows; 404 when it does not exist."""
    conn = get_db()
    try:
        existing = conn.execute("SELECT id FROM assets WHERE id = ?", (asset_id,)).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Asset not found")
        # Delete dependent rows explicitly first, so removal does not depend
        # on these tables having been created with ON DELETE CASCADE.
        conn.execute("DELETE FROM visits WHERE asset_id = ?", (asset_id,))
        conn.execute("DELETE FROM asset_keys WHERE asset_id = ?", (asset_id,))
        conn.execute("DELETE FROM asset_badges WHERE asset_id = ?", (asset_id,))
        conn.execute("DELETE FROM assets WHERE id = ?", (asset_id,))
        _log_activity(conn, "deleted", "asset", asset_id,
                      f"Asset {asset_id} deleted")
        conn.commit()
    finally:
        conn.close()


# ─── Task 8: POST /api/checkins ─────────────────────────────────────────────

@app.post("/api/checkins", status_code=201)
def create_checkin(body: CheckinCreate):
    """Record a check-in for an asset, then log a visit and an activity entry."""
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM assets WHERE id = ?", (body.asset_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Asset not found")

        cursor = conn.execute(
            """INSERT INTO checkins (asset_id, user_id, latitude, longitude, accuracy, photo_path, notes)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (body.asset_id, body.user_id, body.latitude, body.longitude,
             body.accuracy, body.photo_path, body.notes or ""),
        )
        conn.commit()
        checkin_id = cursor.lastrowid

        # Auto-log visit, stamped with the check-in's DB-generated timestamp
        row = conn.execute(
            "SELECT created_at FROM checkins WHERE id = ?", (checkin_id,)
        ).fetchone()
        _auto_log_visit(conn, body.user_id, body.asset_id, row["created_at"])

        # Activity log
        _log_activity(conn, "created", "checkin", checkin_id,
                      f"Check-in for asset {body.asset_id}",
                      user_id=body.user_id)
        conn.commit()

        row = conn.execute("SELECT * FROM checkins WHERE id = ?", (checkin_id,)).fetchone()
        return row_to_dict(row)
    finally:
        conn.close()


# ─── Task 9: GET /api/checkins ──────────────────────────────────────────────

@app.get("/api/checkins")
def list_checkins(
    asset_id: Optional[int] = Query(None),
    user_id: Optional[int] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List check-ins, newest first, optionally filtered by asset and/or user."""
    conditions = []
    params = []

    if asset_id is not None:
        conditions.append("asset_id = ?")
        params.append(asset_id)
    if user_id is not None:
        conditions.append("user_id = ?")
        params.append(user_id)

    where = " AND ".join(conditions)
    sql = "SELECT * FROM checkins"
    if where:
        sql += f" WHERE {where}"
    # id breaks ties between rows created within the same second.
    sql += " ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?"
    params.extend([limit, offset])

    conn = get_db()
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        conn.close()
    return [row_to_dict(r) for r in rows]


@app.get("/api/checkins/{checkin_id}")
def get_checkin(checkin_id: int):
    """Return one check-in, or 404."""
    conn = get_db()
    try:
        row = conn.execute("SELECT * FROM checkins WHERE id = ?", (checkin_id,)).fetchone()
    finally:
        conn.close()
    if row is None:
        raise HTTPException(status_code=404, detail="Checkin not found")
    return row_to_dict(row)


@app.put("/api/checkins/{checkin_id}")
def update_checkin(checkin_id: int, body: CheckinUpdate):
    """Partially update a check-in; fields left as ``None`` are not touched."""
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM checkins WHERE id = ?", (checkin_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Checkin not found")

        updates = {}
        for field in ("latitude", "longitude", "accuracy", "photo_path", "notes", "user_id"):
            val = getattr(body, field, None)
            if val is not None:
                updates[field] = val

        if updates:
            set_clause = ", ".join(f"{k} = ?" for k in updates)
            conn.execute(
                f"UPDATE checkins SET {set_clause} WHERE id = ?",
                [*updates.values(), checkin_id],
            )
            conn.commit()

        row = conn.execute("SELECT * FROM checkins WHERE id = ?", (checkin_id,)).fetchone()
        return row_to_dict(row)
    finally:
        conn.close()


@app.delete("/api/checkins/{checkin_id}", status_code=204)
def delete_checkin(checkin_id: int):
    """Delete a check-in; 404 when it does not exist."""
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM checkins WHERE id = ?", (checkin_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Checkin not found")
        conn.execute("DELETE FROM checkins WHERE id = ?", (checkin_id,))
        conn.commit()
    finally:
        conn.close()


# ─── Task 10: GET /api/stats ────────────────────────────────────────────────

@app.get("/api/stats")
def get_stats():
    """Return dashboard aggregates: totals, per-category/status/make counts,
    the ten most visited assets and time on site per technician."""
    conn = get_db()
    try:
        total_assets = conn.execute("SELECT COUNT(*) FROM assets").fetchone()[0]
        total_checkins = conn.execute("SELECT COUNT(*) FROM checkins").fetchone()[0]

        cats = conn.execute(
            "SELECT category, COUNT(*) AS cnt FROM assets GROUP BY category ORDER BY cnt DESC"
        ).fetchall()
        by_category = {r["category"]: r["cnt"] for r in cats}

        statuses = conn.execute(
            "SELECT status, COUNT(*) AS cnt FROM assets GROUP BY status ORDER BY cnt DESC"
        ).fetchall()
        by_status = {r["status"]: r["cnt"] for r in statuses}

        # Enhanced: top visited assets
        top_visited = conn.execute(
            """SELECT a.name, a.machine_id, COUNT(*) AS visit_count,
                      MAX(v.checkin_time) AS last_visit_date
               FROM visits v
               JOIN assets a ON v.asset_id = a.id
               GROUP BY v.asset_id
               ORDER BY visit_count DESC
               LIMIT 10"""
        ).fetchall()
        top_visited_list = [
            {"name": r["name"], "machine_id": r["machine_id"],
             "visit_count": r["visit_count"], "last_visit_date": r["last_visit_date"]}
            for r in top_visited
        ]

        # Enhanced: time on site per technician
        # julianday() differences are in days; * 1440 converts to minutes.
        time_on_site_rows = conn.execute(
            """SELECT u.username, COUNT(v.id) AS visit_count,
                      SUM(CASE WHEN v.checkout_time IS NOT NULL AND v.checkin_time IS NOT NULL
                          THEN (julianday(v.checkout_time) - julianday(v.checkin_time)) * 1440
                          ELSE 0 END) AS total_minutes
               FROM visits v
               JOIN users u ON v.user_id = u.id
               WHERE u.role = 'technician'
               GROUP BY v.user_id
               ORDER BY total_minutes DESC"""
        ).fetchall()
        time_on_site = [
            {"username": r["username"], "visit_count": r["visit_count"],
             "total_minutes": round(r["total_minutes"] or 0, 1)}
            for r in time_on_site_rows
        ]

        # Enhanced: assets by make/manufacturer
        makes = conn.execute(
            "SELECT make, COUNT(*) AS cnt FROM assets WHERE make != '' GROUP BY make ORDER BY cnt DESC"
        ).fetchall()
        by_make = {r["make"]: r["cnt"] for r in makes}
    finally:
        conn.close()

    return {
        "total_assets": total_assets,
        "total_checkins": total_checkins,
        "by_category": by_category,
        "by_status": by_status,
        "top_visited": top_visited_list,
        "time_on_site": time_on_site,
        "by_make": by_make,
    }


# ─── Task 11: CSV Export ────────────────────────────────────────────────────

def _generate_csv(rows, fieldnames):
    """Yield CSV rows as a string generator for StreamingResponse."""
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader() yield output.getvalue() output.seek(0) output.truncate(0) for row in rows: writer.writerow(row_to_dict(row)) yield output.getvalue() output.seek(0) output.truncate(0) _ASSET_CSV_FIELDS = [ "id", "machine_id", "serial_number", "name", "description", "category", "status", "make", "model", "address", "building_name", "building_number", "floor", "room", "trailer_number", "walking_directions", "map_link", "parking_location", "photo_path", "customer_id", "location_id", "assigned_to", "latitude", "longitude", "geofence_radius_meters", "created_at", "updated_at", ] _CHECKIN_CSV_FIELDS = [ "id", "asset_id", "user_id", "latitude", "longitude", "accuracy", "photo_path", "notes", "created_at", ] @app.get("/api/export/assets") def export_assets_csv(): conn = get_db() rows = conn.execute("SELECT * FROM assets ORDER BY created_at DESC").fetchall() conn.close() return StreamingResponse( _generate_csv(rows, _ASSET_CSV_FIELDS), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=assets.csv"}, ) @app.get("/api/export/checkins") def export_checkins_csv(asset_id: Optional[int] = Query(None)): conn = get_db() if asset_id is not None: rows = conn.execute( "SELECT * FROM checkins WHERE asset_id = ? 
ORDER BY created_at DESC, id DESC", (asset_id,), ).fetchall() else: rows = conn.execute("SELECT * FROM checkins ORDER BY created_at DESC").fetchall() conn.close() return StreamingResponse( _generate_csv(rows, _CHECKIN_CSV_FIELDS), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=checkins.csv"}, ) # ─── Phase B: Customers API ────────────────────────────────────────────────── @app.post("/api/customers", status_code=201) def create_customer(body: CustomerCreate): name = body.name.strip() if not name: raise HTTPException(status_code=422, detail="Customer name must not be empty") conn = get_db() try: cursor = conn.execute("INSERT INTO customers (name) VALUES (?)", (name,)) cust_id = cursor.lastrowid if body.contacts: for c in body.contacts: conn.execute( "INSERT INTO customer_contacts (customer_id, name, phone, email) VALUES (?, ?, ?, ?)", (cust_id, c.name or "", c.phone or "", c.email or ""), ) conn.commit() _log_activity(conn, "created", "customer", cust_id, f"Customer '{name}' created") conn.commit() except sqlite3.IntegrityError: conn.close() raise HTTPException(status_code=409, detail=f"Customer '{name}' already exists") row = conn.execute("SELECT * FROM customers WHERE id = ?", (cust_id,)).fetchone() result = row_to_dict(row) result["contacts"] = _get_customer_contacts(conn, cust_id) conn.close() return result def _get_customer_contacts(conn: sqlite3.Connection, cust_id: int) -> list: rows = conn.execute( "SELECT id, customer_id, name, phone, email FROM customer_contacts WHERE customer_id = ?", (cust_id,), ).fetchall() return [row_to_dict(r) for r in rows] @app.get("/api/customers") def list_customers(): conn = get_db() rows = conn.execute("SELECT * FROM customers ORDER BY name").fetchall() result = [] for r in rows: d = row_to_dict(r) d["contacts"] = _get_customer_contacts(conn, r["id"]) result.append(d) conn.close() return result @app.get("/api/customers/{cust_id}") def get_customer(cust_id: int): conn = get_db() row = 
conn.execute("SELECT * FROM customers WHERE id = ?", (cust_id,)).fetchone() if row is None: conn.close() raise HTTPException(status_code=404, detail="Customer not found") result = row_to_dict(row) result["contacts"] = _get_customer_contacts(conn, cust_id) conn.close() return result @app.put("/api/customers/{cust_id}") def update_customer(cust_id: int, body: CustomerUpdate): conn = get_db() existing = conn.execute("SELECT id FROM customers WHERE id = ?", (cust_id,)).fetchone() if existing is None: conn.close() raise HTTPException(status_code=404, detail="Customer not found") if body.name is not None: name = body.name.strip() if not name: conn.close() raise HTTPException(status_code=422, detail="Customer name must not be empty") conn.execute("UPDATE customers SET name = ?, updated_at = datetime('now') WHERE id = ?", (name, cust_id)) if body.contacts is not None: # Replace all contacts conn.execute("DELETE FROM customer_contacts WHERE customer_id = ?", (cust_id,)) for c in body.contacts: conn.execute( "INSERT INTO customer_contacts (customer_id, name, phone, email) VALUES (?, ?, ?, ?)", (cust_id, c.name or "", c.phone or "", c.email or ""), ) conn.commit() row = conn.execute("SELECT * FROM customers WHERE id = ?", (cust_id,)).fetchone() result = row_to_dict(row) result["contacts"] = _get_customer_contacts(conn, cust_id) _log_activity(conn, "updated", "customer", cust_id, f"Customer '{result['name']}' updated") conn.commit() conn.close() return result @app.delete("/api/customers/{cust_id}", status_code=204) def delete_customer(cust_id: int): conn = get_db() existing = conn.execute("SELECT id FROM customers WHERE id = ?", (cust_id,)).fetchone() if existing is None: conn.close() raise HTTPException(status_code=404, detail="Customer not found") conn.execute("DELETE FROM customers WHERE id = ?", (cust_id,)) _log_activity(conn, "deleted", "customer", cust_id, f"Customer {cust_id} deleted") conn.commit() conn.close() # ─── Phase B: Locations API 
# ──────────────────────────────────────────────────

def _get_location_rooms(conn: sqlite3.Connection, loc_id: int) -> list:
    """Return the rooms belonging to location *loc_id*, ordered by name."""
    rows = conn.execute(
        "SELECT id, location_id, name, floor, created_at, updated_at "
        "FROM rooms WHERE location_id = ? ORDER BY name",
        (loc_id,),
    ).fetchall()
    return [row_to_dict(r) for r in rows]


@app.post("/api/locations", status_code=201)
def create_location(body: LocationCreate):
    """Create a location and return it with an empty ``rooms`` list.

    Raises:
        HTTPException: 422 when the name is blank after stripping. The
            customer reference is checked via ``_validate_ref`` (defined
            elsewhere; presumably raises when the customer is missing).
    """
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=422, detail="Location name must not be empty")
    conn = get_db()
    try:
        if body.customer_id is not None:
            _validate_ref(conn, "customers", body.customer_id, "Customer")
        cursor = conn.execute(
            """INSERT INTO locations
               (customer_id, name, address, building_name, building_number, floor,
                trailer_number, site_hours, access_notes, walking_directions, map_link,
                latitude, longitude)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                body.customer_id,
                name,
                body.address or "",
                body.building_name or "",
                body.building_number or "",
                body.floor or "",
                body.trailer_number or "",
                body.site_hours or "",
                body.access_notes or "",
                body.walking_directions or "",
                body.map_link or "",
                body.latitude,
                body.longitude,
            ),
        )
        conn.commit()
        row = conn.execute(
            "SELECT * FROM locations WHERE id = ?", (cursor.lastrowid,)
        ).fetchone()
    finally:
        # Close on every path, including when validation or the INSERT raises.
        conn.close()
    result = row_to_dict(row)
    result["rooms"] = []
    return result


@app.get("/api/locations")
def list_locations(customer_id: Optional[int] = Query(None)):
    """List locations (optionally for one customer), each with its rooms."""
    conn = get_db()
    try:
        if customer_id is not None:
            rows = conn.execute(
                "SELECT * FROM locations WHERE customer_id = ? ORDER BY name",
                (customer_id,),
            ).fetchall()
        else:
            rows = conn.execute("SELECT * FROM locations ORDER BY name").fetchall()
        result = []
        for r in rows:
            d = row_to_dict(r)
            d["rooms"] = _get_location_rooms(conn, r["id"])
            result.append(d)
        return result
    finally:
        conn.close()


@app.get("/api/locations/{loc_id}")
def get_location(loc_id: int):
    """Return one location with its rooms, or 404 when it does not exist."""
    conn = get_db()
    try:
        row = conn.execute("SELECT * FROM locations WHERE id = ?", (loc_id,)).fetchone()
        if row is None:
            raise HTTPException(status_code=404, detail="Location not found")
        result = row_to_dict(row)
        result["rooms"] = _get_location_rooms(conn, loc_id)
        return result
    finally:
        conn.close()


# Columns of ``locations`` that update_location may write. These names are
# interpolated into SQL, so the list must only ever contain trusted constants.
_LOCATION_FIELDS = [
    "customer_id", "name", "address", "building_name", "building_number",
    "floor", "trailer_number", "site_hours", "access_notes",
    "walking_directions", "map_link", "latitude", "longitude",
]


@app.put("/api/locations/{loc_id}")
def update_location(loc_id: int, body: LocationUpdate):
    """Partially update a location and return it with its rooms.

    Only fields whose value on *body* is not ``None`` are written, so a field
    cannot be reset to NULL through this endpoint.

    Raises:
        HTTPException: 404 when the location does not exist, 422 when a
            supplied name is blank after stripping.
    """
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM locations WHERE id = ?", (loc_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Location not found")

        updates = {}
        for field in _LOCATION_FIELDS:
            val = getattr(body, field, None)
            if val is None:
                continue
            if field == "name":
                val = val.strip()
                if not val:
                    raise HTTPException(
                        status_code=422, detail="Location name must not be empty"
                    )
            elif field == "customer_id":
                _validate_ref(conn, "customers", val, "Customer")
            elif field not in ("latitude", "longitude"):
                val = val or ""
            updates[field] = val

        if updates:
            assignments = [f"{col} = ?" for col in updates]
            # The timestamp is a SQL expression, not a bound parameter.
            assignments.append("updated_at = datetime('now')")
            set_clause = ", ".join(assignments)
            conn.execute(
                f"UPDATE locations SET {set_clause} WHERE id = ?",
                [*updates.values(), loc_id],
            )
            conn.commit()

        row = conn.execute("SELECT * FROM locations WHERE id = ?", (loc_id,)).fetchone()
        result = row_to_dict(row)
        result["rooms"] = _get_location_rooms(conn, loc_id)
        return result
    finally:
        conn.close()


@app.delete("/api/locations/{loc_id}", status_code=204)
def delete_location(loc_id: int):
    """Delete a location; responds 404 when it does not exist."""
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM locations WHERE id = ?", (loc_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Location not found")
        conn.execute("DELETE FROM locations WHERE id = ?", (loc_id,))
        conn.commit()
    finally:
        conn.close()


# ─── Phase B: Rooms API ──────────────────────────────────────────────────────

@app.get("/api/rooms")
def list_rooms(location_id: Optional[int] = Query(None)):
    """List rooms, optionally filtered by location_id."""
    conn = get_db()
    try:
        if location_id is not None:
            rows = conn.execute(
                "SELECT * FROM rooms WHERE location_id = ? ORDER BY name",
                (location_id,),
            ).fetchall()
        else:
            rows = conn.execute("SELECT * FROM rooms ORDER BY name").fetchall()
    finally:
        conn.close()
    return [row_to_dict(r) for r in rows]


@app.get("/api/rooms/{room_id}")
def get_room(room_id: int):
    """Get a single room by id; responds 404 when it does not exist."""
    conn = get_db()
    try:
        row = conn.execute("SELECT * FROM rooms WHERE id = ?", (room_id,)).fetchone()
    finally:
        conn.close()
    if row is None:
        raise HTTPException(status_code=404, detail="Room not found")
    return row_to_dict(row)


@app.post("/api/rooms", status_code=201)
def create_room(body: RoomCreate):
    """Create a room under an existing location and return the stored row."""
    conn = get_db()
    try:
        _validate_ref(conn, "locations", body.location_id, "Location")
        cursor = conn.execute(
            "INSERT INTO rooms (location_id, name, floor) VALUES (?, ?, ?)",
            (body.location_id, body.name.strip(), body.floor or ""),
        )
        conn.commit()
        row = conn.execute(
            "SELECT * FROM rooms WHERE id = ?", (cursor.lastrowid,)
        ).fetchone()
    finally:
        conn.close()
    return row_to_dict(row)


@app.put("/api/rooms/{room_id}")
def update_room(room_id: int, body: RoomUpdate):
    """Partially update a room (name, floor, location_id) and return it.

    Raises:
        HTTPException: 404 when the room does not exist.
    """
    conn = get_db()
    try:
        existing = conn.execute("SELECT id FROM rooms WHERE id = ?", (room_id,)).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Room not found")

        updates = {}
        if body.name is not None:
            updates["name"] = body.name.strip()
        if body.floor is not None:
            updates["floor"] = body.floor
        if body.location_id is not None:
            _validate_ref(conn, "locations", body.location_id, "Location")
            updates["location_id"] = body.location_id

        if updates:
            assignments = [f"{col} = ?" for col in updates]
            # The timestamp is a SQL expression, not a bound parameter.
            assignments.append("updated_at = datetime('now')")
            set_clause = ", ".join(assignments)
            conn.execute(
                f"UPDATE rooms SET {set_clause} WHERE id = ?",
                [*updates.values(), room_id],
            )
            conn.commit()

        row = conn.execute("SELECT * FROM rooms WHERE id = ?", (room_id,)).fetchone()
    finally:
        conn.close()
    return row_to_dict(row)


@app.delete("/api/rooms/{room_id}", status_code=204)
def delete_room(room_id: int):
    """Delete a room; responds 404 when it does not exist."""
    conn = get_db()
    try:
        existing = conn.execute("SELECT id FROM rooms WHERE id = ?", (room_id,)).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Room not found")
        conn.execute("DELETE FROM rooms WHERE id = ?", (room_id,))
        conn.commit()
    finally:
        conn.close()


# ─── Phase B: Settings API ───────────────────────────────────────────────────

# Maps each settings entity to its table and the columns clients may write.
# Table and column names from here are interpolated into SQL, so this mapping
# must only ever contain trusted constants.
_SETTINGS_SCHEMAS = {
    "categories": {"table": "categories", "columns": ["name", "icon"]},
    "makes": {"table": "makes", "columns": ["name"]},
    "models": {"table": "models", "columns": ["make_id", "name", "icon_path"]},
    "key_names": {"table": "key_names", "columns": ["name"]},
    "key_types": {"table": "key_types", "columns": ["name"]},
    "badge_types": {"table": "badge_types", "columns": ["name"]},
}


def _settings_singular(entity: str) -> str:
    """Return the singular label for a plural entity key ("categories" -> "category")."""
    if entity.endswith("ies"):
        return entity[:-3] + "y"
    return entity[:-1]


def _settings_require_object(body) -> None:
    """Raise 422 unless the decoded JSON request body is an object (dict)."""
    if not isinstance(body, dict):
        raise HTTPException(status_code=422, detail="Request body must be a JSON object")


def _settings_integrity_error(entity: str, exc: sqlite3.IntegrityError) -> HTTPException:
    """Translate a constraint failure on a settings write into an HTTP error.

    SQLite names the violated constraint class in the error message, which is
    the only way to tell a duplicate apart from a bad reference here.
    """
    message = str(exc)
    if "FOREIGN KEY" in message:
        return HTTPException(
            status_code=422, detail=f"Referenced record does not exist for {entity}"
        )
    if "NOT NULL" in message:
        return HTTPException(
            status_code=422, detail=f"Missing required value for {entity}"
        )
    return HTTPException(status_code=409, detail=f"Entry already exists in {entity}")


def _settings_list(conn: sqlite3.Connection, table: str):
    """Return every row of *table*, ordered by name.

    *table* is interpolated into the SQL text and must be a trusted identifier.
    """
    rows = conn.execute(f"SELECT * FROM {table} ORDER BY name").fetchall()
    return [row_to_dict(r) for r in rows]


def _settings_create(conn: sqlite3.Connection, entity: str, body: dict):
    """Insert a settings row from *body* and return the stored row.

    Missing or null columns are stored as an empty string, except ``make_id``,
    which is mandatory.

    Raises:
        HTTPException: 422 for a non-object body, a missing ``make_id`` or a
            dangling reference; 409 when the entry already exists.
    """
    schema = _SETTINGS_SCHEMAS[entity]
    table = schema["table"]
    columns = schema["columns"]
    _settings_require_object(body)

    values = []
    for col in columns:
        val = body.get(col)
        if col == "make_id" and val is None:
            raise HTTPException(status_code=422, detail="make_id is required for models")
        values.append("" if val is None else val)

    placeholders = ", ".join("?" for _ in columns)
    col_names = ", ".join(columns)
    try:
        cursor = conn.execute(
            f"INSERT INTO {table} ({col_names}) VALUES ({placeholders})", values
        )
        conn.commit()
    except sqlite3.IntegrityError as exc:
        conn.rollback()
        raise _settings_integrity_error(entity, exc) from exc
    row = conn.execute(
        f"SELECT * FROM {table} WHERE id = ?", (cursor.lastrowid,)
    ).fetchone()
    return row_to_dict(row)


def _settings_update(conn: sqlite3.Connection, entity: str, eid: int, body: dict):
    """Update the schema columns present in *body* and return the stored row.

    Raises:
        HTTPException: 404 when the row does not exist; 422 for a non-object
            body, a null required value or a dangling reference; 409 when the
            change collides with an existing entry.
    """
    schema = _SETTINGS_SCHEMAS[entity]
    table = schema["table"]
    columns = schema["columns"]
    _settings_require_object(body)

    existing = conn.execute(f"SELECT id FROM {table} WHERE id = ?", (eid,)).fetchone()
    if existing is None:
        raise HTTPException(
            status_code=404, detail=f"{_settings_singular(entity)} not found"
        )

    updates = {col: body[col] for col in columns if col in body}
    if updates:
        set_clause = ", ".join(f"{col} = ?" for col in updates)
        try:
            conn.execute(
                f"UPDATE {table} SET {set_clause} WHERE id = ?",
                [*updates.values(), eid],
            )
            conn.commit()
        except sqlite3.IntegrityError as exc:
            # Without this, a duplicate name escaped as an unhandled 500.
            conn.rollback()
            raise _settings_integrity_error(entity, exc) from exc
    row = conn.execute(f"SELECT * FROM {table} WHERE id = ?", (eid,)).fetchone()
    return row_to_dict(row)


def _settings_get(conn: sqlite3.Connection, entity: str, eid: int):
    """Get a single settings item by id. Returns dict or raises 404."""
    table = _SETTINGS_SCHEMAS[entity]["table"]
    row = conn.execute(f"SELECT * FROM {table} WHERE id = ?", (eid,)).fetchone()
    if row is None:
        raise HTTPException(
            status_code=404, detail=f"{_settings_singular(entity)} not found"
        )
    return row_to_dict(row)


def _settings_delete(conn: sqlite3.Connection, entity: str, eid: int):
    """Delete a settings row by id.

    Raises:
        HTTPException: 404 when the row does not exist; 409 when other rows
            still reference it (e.g. a make that still has models, since
            foreign keys are enforced and that reference has no cascade).
    """
    table = _SETTINGS_SCHEMAS[entity]["table"]
    label = _settings_singular(entity)
    existing = conn.execute(f"SELECT id FROM {table} WHERE id = ?", (eid,)).fetchone()
    if existing is None:
        raise HTTPException(status_code=404, detail=f"{label} not found")
    try:
        conn.execute(f"DELETE FROM {table} WHERE id = ?", (eid,))
        conn.commit()
    except sqlite3.IntegrityError as exc:
        conn.rollback()
        raise HTTPException(
            status_code=409, detail=f"{label} is still referenced by other records"
        ) from exc


# ─── Dynamic settings routes ─────────────────────────────────────────────────
# Register CRUD for each settings entity. Note: these use raw request body
# to avoid Pydantic model explosion — the schemas are validated by
# _SETTINGS_SCHEMAS above.
_SETTINGS_ENTITIES = ["categories", "makes", "models", "key_names", "key_types", "badge_types"] for _ent in _SETTINGS_ENTITIES: @app.get(f"/api/settings/{_ent}", name=f"list_{_ent}") def _list(entity=_ent): conn = get_db() result = _settings_list(conn, entity) conn.close() return result @app.get(f"/api/settings/{_ent}/{{eid}}", name=f"get_{_ent}") def _get(eid: int, entity=_ent): conn = get_db() try: result = _settings_get(conn, entity, eid) conn.close() return result except HTTPException: conn.close() raise @app.post(f"/api/settings/{_ent}", status_code=201, name=f"create_{_ent}") async def _create(request: Request, entity=_ent): body = await request.json() conn = get_db() try: result = _settings_create(conn, entity, body) conn.close() return result except HTTPException: conn.close() raise @app.put(f"/api/settings/{_ent}/{{eid}}", name=f"update_{_ent}") async def _update(eid: int, request: Request, entity=_ent): body = await request.json() conn = get_db() try: result = _settings_update(conn, entity, eid, body) conn.close() return result except HTTPException: conn.close() raise @app.delete(f"/api/settings/{_ent}/{{eid}}", status_code=204, name=f"delete_{_ent}") def _delete(eid: int, entity=_ent): conn = get_db() try: _settings_delete(conn, entity, eid) conn.close() except HTTPException: conn.close() raise # ─── Phase C: Helpers ──────────────────────────────────────────────────────── VALID_ROLES = {"admin", "technician", "readonly"} def _hash_password(password: str) -> str: """Simple SHA-256 password hashing.""" return hashlib.sha256(password.encode()).hexdigest() def _generate_token() -> str: """Generate a random session token.""" return secrets.token_hex(32) def _log_activity(conn: sqlite3.Connection, action: str, entity_type: str, entity_id: int, details: str = "", user_id: int = None): """Insert an activity log entry.""" conn.execute( """INSERT INTO activity_log (user_id, action, entity_type, entity_id, details) VALUES (?, ?, ?, ?, ?)""", (user_id, action, 
entity_type, entity_id, details), ) def _user_to_dict(row: sqlite3.Row) -> dict: """Convert user row to dict, excluding password_hash.""" d = row_to_dict(row) d.pop("password_hash", None) return d # ─── Phase C: Pydantic Models ──────────────────────────────────────────────── class UserCreate(BaseModel): username: str password: str role: Optional[str] = "technician" class UserUpdate(BaseModel): role: Optional[str] = None password: Optional[str] = None class LoginRequest(BaseModel): username: str password: str class GeofenceCreate(BaseModel): name: str points: list color: Optional[str] = "#3388ff" user_ids: Optional[list[int]] = None class GeofenceUpdate(BaseModel): name: Optional[str] = None points: Optional[list] = None color: Optional[str] = None user_ids: Optional[list[int]] = None class GeofencePointCheck(BaseModel): lat: float lng: float # ─── Phase C: Users API ────────────────────────────────────────────────────── @app.post("/api/users", status_code=201) def create_user(body: UserCreate): username = body.username.strip() if not username: raise HTTPException(status_code=422, detail="Username must not be empty") password = body.password if not password: raise HTTPException(status_code=422, detail="Password must not be empty") role = body.role or "technician" if role not in VALID_ROLES: raise HTTPException( status_code=422, detail=f"Invalid role '{role}'. 
Must be one of: {', '.join(sorted(VALID_ROLES))}", ) conn = get_db() password_hash = _hash_password(password) try: cursor = conn.execute( "INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)", (username, password_hash, role), ) conn.commit() uid = cursor.lastrowid _log_activity(conn, "created", "user", uid, f"User '{username}' created") conn.commit() except sqlite3.IntegrityError: conn.close() raise HTTPException(status_code=409, detail=f"User '{username}' already exists") row = conn.execute("SELECT * FROM users WHERE id = ?", (uid,)).fetchone() conn.close() return _user_to_dict(row) @app.get("/api/users") def list_users(): conn = get_db() rows = conn.execute("SELECT * FROM users ORDER BY username").fetchall() conn.close() return [_user_to_dict(r) for r in rows] @app.get("/api/users/{user_id}") def get_user(user_id: int): conn = get_db() row = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() if row is None: conn.close() raise HTTPException(status_code=404, detail="User not found") conn.close() return _user_to_dict(row) @app.put("/api/users/{user_id}") def update_user(user_id: int, body: UserUpdate): conn = get_db() existing = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() if existing is None: conn.close() raise HTTPException(status_code=404, detail="User not found") if body.role is not None: if body.role not in VALID_ROLES: conn.close() raise HTTPException( status_code=422, detail=f"Invalid role '{body.role}'. Must be one of: {', '.join(sorted(VALID_ROLES))}", ) conn.execute("UPDATE users SET role = ? WHERE id = ?", (body.role, user_id)) if body.password is not None: pw = body.password.strip() if not pw: conn.close() raise HTTPException(status_code=422, detail="Password must not be empty") password_hash = _hash_password(pw) conn.execute("UPDATE users SET password_hash = ? 
WHERE id = ?", (password_hash, user_id)) conn.commit() row = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() conn.close() return _user_to_dict(row) @app.delete("/api/users/{user_id}", status_code=204) def delete_user(user_id: int): conn = get_db() existing = conn.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone() if existing is None: conn.close() raise HTTPException(status_code=404, detail="User not found") conn.execute("DELETE FROM users WHERE id = ?", (user_id,)) conn.commit() conn.close() # ─── User's assigned geofences (service areas) ────────────────────────── @app.get("/api/users/{user_id}/geofences") def list_user_geofences(user_id: int): """List geofences assigned to a user (their service areas).""" conn = get_db() existing = conn.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone() if existing is None: conn.close() raise HTTPException(status_code=404, detail="User not found") rows = conn.execute( """SELECT g.* FROM geofences g JOIN geofence_users gu ON gu.geofence_id = g.id WHERE gu.user_id = ? 
ORDER BY g.name""", (user_id,), ).fetchall() result = [_geofence_row(r, conn) for r in rows] conn.close() return result # ─── Phase C: Auth API ─────────────────────────────────────────────────────── @app.post("/api/auth/login") def login(body: LoginRequest): conn = get_db() row = conn.execute( "SELECT * FROM users WHERE username = ?", (body.username,) ).fetchone() if row is None: conn.close() raise HTTPException(status_code=401, detail="Invalid username or password") password_hash = _hash_password(body.password) if password_hash != row["password_hash"]: conn.close() raise HTTPException(status_code=401, detail="Invalid username or password") token = _generate_token() conn.execute( "INSERT INTO sessions (user_id, token) VALUES (?, ?)", (row["id"], token), ) conn.commit() conn.close() result = _user_to_dict(row) result["token"] = token return result @app.get("/api/auth/me") def auth_me(request: Request): """Return the current authenticated user from the Authorization header.""" auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): raise HTTPException(status_code=401, detail="Missing or invalid Authorization header") token = auth_header[7:] conn = get_db() row = conn.execute( "SELECT u.* FROM users u JOIN sessions s ON u.id = s.user_id WHERE s.token = ?", (token,), ).fetchone() conn.close() if row is None: raise HTTPException(status_code=401, detail="Invalid or expired token") return _user_to_dict(row) # ─── Phase C: Geofences API ────────────────────────────────────────────────── @app.post("/api/geofences", status_code=201) def create_geofence(body: GeofenceCreate): conn = get_db() points_json = _json.dumps(body.points) cursor = conn.execute( "INSERT INTO geofences (name, points, color) VALUES (?, ?, ?)", (body.name, points_json, body.color or "#3388ff"), ) gid = cursor.lastrowid # Assign users if provided if body.user_ids: try: _sync_geofence_users(conn, gid, body.user_ids) except Exception: conn.close() raise 
HTTPException(status_code=422, detail="One or more user IDs are invalid") _log_activity(conn, "created", "geofence", gid, f"Geofence '{body.name}' created") conn.commit() row = conn.execute("SELECT * FROM geofences WHERE id = ?", (gid,)).fetchone() result = _geofence_row(row, conn) conn.close() return result @app.get("/api/geofences") def list_geofences(): conn = get_db() rows = conn.execute("SELECT * FROM geofences ORDER BY name").fetchall() result = [_geofence_row(r, conn) for r in rows] conn.close() return result @app.put("/api/geofences/{geofence_id}") def update_geofence(geofence_id: int, body: GeofenceUpdate): conn = get_db() existing = conn.execute( "SELECT id FROM geofences WHERE id = ?", (geofence_id,) ).fetchone() if existing is None: conn.close() raise HTTPException(status_code=404, detail="Geofence not found") updates = {} if body.name is not None: updates["name"] = body.name if body.points is not None: updates["points"] = _json.dumps(body.points) if body.color is not None: updates["color"] = body.color if updates: updates["updated_at"] = "datetime('now')" set_clause = ", ".join( f"{k} = {v}" if k == "updated_at" else f"{k} = ?" 
for k, v in updates.items() ) values = [v for k, v in updates.items() if k != "updated_at"] conn.execute( f"UPDATE geofences SET {set_clause} WHERE id = ?", values + [geofence_id], ) conn.commit() # Sync assigned users if provided if body.user_ids is not None: _sync_geofence_users(conn, geofence_id, body.user_ids) conn.commit() row = conn.execute( "SELECT * FROM geofences WHERE id = ?", (geofence_id,) ).fetchone() result = _geofence_row(row, conn) conn.close() return result @app.delete("/api/geofences/{geofence_id}", status_code=204) def delete_geofence(geofence_id: int): conn = get_db() existing = conn.execute( "SELECT id FROM geofences WHERE id = ?", (geofence_id,) ).fetchone() if existing is None: conn.close() raise HTTPException(status_code=404, detail="Geofence not found") conn.execute("DELETE FROM geofences WHERE id = ?", (geofence_id,)) conn.commit() conn.close() # ─── Phase 0: Proximity & Geofence Check ───────────────────────────────────── def _point_in_polygon(lat: float, lng: float, polygon: list) -> bool: """Ray-casting algorithm for point-in-polygon test.""" inside = False n = len(polygon) if n < 3: return False j = n - 1 for i in range(n): yi = polygon[i]["lat"] if isinstance(polygon[i], dict) else polygon[i][0] xi = polygon[i]["lng"] if isinstance(polygon[i], dict) else polygon[i][1] yj = polygon[j]["lat"] if isinstance(polygon[j], dict) else polygon[j][0] xj = polygon[j]["lng"] if isinstance(polygon[j], dict) else polygon[j][1] if ((yi > lat) != (yj > lat)) and (lng < (xj - xi) * (lat - yi) / (yj - yi) + xi): inside = not inside j = i return inside @app.get("/api/proximity") def proximity_check( lat: float = Query(...), lng: float = Query(...), radius_meters: int = Query(200, ge=1, le=50000), ): """ Return assets within radius_meters of (lat, lng), sorted by distance. Uses Haversine formula for accurate spherical distance. 
""" conn = get_db() rows = conn.execute(""" SELECT * FROM ( SELECT *, ( 6371000 * acos( cos(radians(?)) * cos(radians(latitude)) * cos(radians(longitude) - radians(?)) + sin(radians(?)) * sin(radians(latitude)) ) ) AS distance_meters FROM assets WHERE latitude IS NOT NULL AND longitude IS NOT NULL ) WHERE distance_meters <= ? ORDER BY distance_meters LIMIT 50 """, (lat, lng, lat, radius_meters)).fetchall() conn.close() results = [row_to_dict(r) for r in rows] return results @app.post("/api/geofences/check") def check_geofence_point(body: GeofencePointCheck): """ Check if a GPS point falls inside any geofence polygon. Returns list of matching geofences. """ conn = get_db() rows = conn.execute("SELECT * FROM geofences ORDER BY name").fetchall() conn.close() matches = [] for row in rows: points = _json.loads(row["points"]) if _point_in_polygon(body.lat, body.lng, points): matches.append(row_to_dict(row)) return matches # ─── Phase C: Visits API & Auto-visit Logging ──────────────────────────────── def _auto_log_visit(conn: sqlite3.Connection, user_id: int, asset_id: int, checkin_time: str): """Check if a visit should be logged for recent check-ins at same asset by same user.""" if user_id is None: return # Find the most recent visit for this user+asset prev = conn.execute( """SELECT id, checkin_time FROM visits WHERE user_id = ? AND asset_id = ? ORDER BY checkin_time DESC LIMIT 1""", (user_id, asset_id), ).fetchone() if prev is None: # No prior visit — check if there are at least 2 check-ins in the window rows = conn.execute( """SELECT id, created_at FROM checkins WHERE user_id = ? AND asset_id = ? ORDER BY created_at DESC LIMIT 2""", (user_id, asset_id), ).fetchall() if len(rows) >= 2: t1 = rows[-1]["created_at"] t2 = rows[0]["created_at"] conn.execute( """INSERT INTO visits (user_id, asset_id, checkin_time, checkout_time) VALUES (?, ?, ?, ?)""", (user_id, asset_id, t1, t2), ) else: conn.execute( "UPDATE visits SET checkout_time = ? 
WHERE id = ?", (checkin_time, prev["id"]), ) @app.post("/api/visits", status_code=201) def create_visit(body: VisitCreate): conn = get_db() existing = conn.execute("SELECT id FROM assets WHERE id = ?", (body.asset_id,)).fetchone() if existing is None: conn.close() raise HTTPException(status_code=404, detail="Asset not found") cursor = conn.execute( """INSERT INTO visits (user_id, asset_id, checkin_time, checkout_time, duration_minutes) VALUES (?, ?, datetime('now'), datetime('now'), ?)""", (body.user_id, body.asset_id, body.duration_minutes or 0), ) conn.commit() visit_id = cursor.lastrowid # Activity log _log_activity(conn, "created", "visit", visit_id, f"Visit logged for asset {body.asset_id}", user_id=body.user_id) conn.commit() row = conn.execute("SELECT * FROM visits WHERE id = ?", (visit_id,)).fetchone() conn.close() return row_to_dict(row) @app.get("/api/visits") def list_visits( asset_id: Optional[int] = Query(None), user_id: Optional[int] = Query(None), date_from: Optional[str] = Query(None), date_to: Optional[str] = Query(None), limit: int = Query(100, ge=1, le=1000), offset: int = Query(0, ge=0), ): conn = get_db() conditions = [] params = [] if asset_id is not None: conditions.append("v.asset_id = ?") params.append(asset_id) if user_id is not None: conditions.append("v.user_id = ?") params.append(user_id) if date_from: conditions.append("v.checkin_time >= ?") params.append(date_from) if date_to: conditions.append("v.checkin_time <= ?") params.append(date_to + " 23:59:59") where = " AND ".join(conditions) sql = """SELECT v.*, a.name AS asset_name, a.machine_id, u.username AS user_name FROM visits v LEFT JOIN assets a ON v.asset_id = a.id LEFT JOIN users u ON v.user_id = u.id""" if where: sql += f" WHERE {where}" sql += " ORDER BY v.checkin_time DESC LIMIT ? OFFSET ?" 
params.extend([limit, offset]) rows = conn.execute(sql, params).fetchall() conn.close() return [row_to_dict(r) for r in rows] @app.get("/api/visits/stats") def get_visit_stats(): conn = get_db() total_visits = conn.execute("SELECT COUNT(*) FROM visits").fetchone()[0] per_asset = conn.execute( """SELECT a.name, a.machine_id, COUNT(*) AS cnt FROM visits v JOIN assets a ON v.asset_id = a.id GROUP BY v.asset_id ORDER BY cnt DESC""" ).fetchall() visits_per_asset = [ {"name": r["name"], "machine_id": r["machine_id"], "count": r["cnt"]} for r in per_asset ] per_user = conn.execute( """SELECT u.username, COUNT(*) AS visit_count, SUM( CASE WHEN v.checkout_time IS NOT NULL AND v.checkin_time IS NOT NULL THEN (julianday(v.checkout_time) - julianday(v.checkin_time)) * 1440 ELSE 0 END ) AS total_minutes FROM visits v JOIN users u ON v.user_id = u.id GROUP BY v.user_id ORDER BY total_minutes DESC""" ).fetchall() time_on_site = [ {"username": r["username"], "visit_count": r["visit_count"], "total_minutes": round(r["total_minutes"] or 0, 1)} for r in per_user ] conn.close() return { "total_visits": total_visits, "visits_per_asset": visits_per_asset, "time_on_site": time_on_site, } # ─── Phase C: Activity Feed API ────────────────────────────────────────────── @app.get("/api/activity") def list_activity( user_id: Optional[int] = Query(None), entity_type: Optional[str] = Query(None), date_from: Optional[str] = Query(None), date_to: Optional[str] = Query(None), limit: int = Query(100, ge=1, le=1000), offset: int = Query(0, ge=0), ): conn = get_db() conditions = [] params = [] if user_id is not None: conditions.append("a.user_id = ?") params.append(user_id) if entity_type is not None: conditions.append("a.entity_type = ?") params.append(entity_type) if date_from: conditions.append("a.created_at >= ?") params.append(date_from) if date_to: conditions.append("a.created_at <= ?") params.append(date_to + " 23:59:59") where = " AND ".join(conditions) sql = """SELECT a.*, u.username AS 
user_name FROM activity_log a LEFT JOIN users u ON a.user_id = u.id""" if where: sql += f" WHERE {where}" sql += " ORDER BY a.created_at DESC, a.id DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) rows = conn.execute(sql, params).fetchall() conn.close() return [row_to_dict(r) for r in rows] # ─── Phase C: Export Endpoints Extension ───────────────────────────────────── @app.get("/api/export/service-summary") def export_service_summary_csv(): conn = get_db() rows = conn.execute( """SELECT c.name AS customer_name, l.name AS location_name, COUNT(a.id) AS asset_count, MAX(ck.created_at) AS last_checkin FROM customers c LEFT JOIN locations l ON l.customer_id = c.id LEFT JOIN assets a ON a.customer_id = c.id LEFT JOIN checkins ck ON ck.asset_id = a.id GROUP BY c.id, l.id ORDER BY c.name, l.name""" ).fetchall() conn.close() def _gen(): output = io.StringIO() writer = csv.DictWriter(output, fieldnames=["customer_name", "location_name", "asset_count", "last_checkin"]) writer.writeheader() yield output.getvalue() output.seek(0) output.truncate(0) for row in rows: d = row_to_dict(row) d["last_checkin"] = d["last_checkin"] or "" writer.writerow(d) yield output.getvalue() output.seek(0) output.truncate(0) return StreamingResponse( _gen(), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=service_summary.csv"}, ) # ─── File Uploads ─────────────────────────────────────────────────────────── ICON_MAX_SIZE = 2 * 1024 * 1024 # 2 MB PHOTO_MAX_SIZE = 10 * 1024 * 1024 # 10 MB ICON_ALLOWED_EXTS = {".png", ".jpg", ".jpeg", ".svg"} PHOTO_ALLOWED_EXTS = {".png", ".jpg", ".jpeg"} def _save_upload(upload: UploadFile, subdir: str, allowed_exts: set, max_size: int) -> str: """Save uploaded file to uploads/{subdir}/ with a UUID filename. Returns the relative URL path, e.g. /uploads/icons/abc123.png. 
""" ext = Path(upload.filename or "").suffix.lower() if not ext or ext not in allowed_exts: allowed = ", ".join(sorted(allowed_exts)) raise HTTPException(400, f"Invalid file type. Allowed: {allowed}") contents = upload.file.read() if len(contents) > max_size: mb = max_size // (1024 * 1024) raise HTTPException(413, f"File too large. Maximum size: {mb} MB") dest_dir = UPLOADS_DIR / subdir dest_dir.mkdir(parents=True, exist_ok=True) fname = f"{uuid.uuid4().hex}{ext}" (dest_dir / fname).write_bytes(contents) return f"/uploads/{subdir}/{fname}" @app.post("/api/upload/icon", status_code=201) async def upload_icon(file: UploadFile = File(...)): path = _save_upload(file, "icons", ICON_ALLOWED_EXTS, ICON_MAX_SIZE) return {"path": path} @app.post("/api/upload/photo", status_code=201) async def upload_photo(file: UploadFile = File(...)): path = _save_upload(file, "photos", PHOTO_ALLOWED_EXTS, PHOTO_MAX_SIZE) return {"path": path} @app.post("/api/ocr", status_code=200) async def ocr_sticker(file: UploadFile = File(...)): """OCR a sticker photo to extract machine_id from XXXXX-XXXXXX format. Accepts an image upload, runs Tesseract OCR, and looks for a pattern like '12345-678901'. Returns the extracted machine_id or an error. """ # Validate file extension ext = file.filename.rsplit(".", 1)[-1].lower() if "." 
in (file.filename or "") else "" if ext not in {"png", "jpg", "jpeg", "webp", "bmp"}: raise HTTPException(status_code=400, detail=f"Unsupported image format: .{ext}") contents = await file.read() if len(contents) > 10 * 1024 * 1024: # 10MB max raise HTTPException(status_code=400, detail="Image too large (max 10MB)") # Save temp file for OCR temp_dir = Path(UPLOADS_DIR / "ocr") temp_dir.mkdir(parents=True, exist_ok=True) temp_path = temp_dir / f"{uuid.uuid4().hex}.{ext or 'jpg'}" temp_path.write_bytes(contents) try: img = PILImage.open(temp_path) # Preprocess: convert to grayscale and increase contrast for better OCR img_gray = img.convert("L") text = pytesseract.image_to_string(img_gray, config="--psm 6") except Exception as e: temp_path.unlink(missing_ok=True) raise HTTPException(status_code=500, detail=f"OCR processing failed: {str(e)}") # Don't keep temp file after processing temp_path.unlink(missing_ok=True) # Search for XXXXX-XXXXXX pattern (5 digits - 6 digits or more) match = re.search(r"(\d{5})[-\s]*(\d{6,})", text) if match: machine_id = f"{match.group(1)}-{match.group(2)}" return { "machine_id": machine_id, "raw_text": text.strip()[:500], "confidence": "high", } # Try looser: any 5+ digit number as machine_id loose = re.search(r"(\d{5,})", text) if loose: return { "machine_id": loose.group(1), "raw_text": text.strip()[:500], "confidence": "low", } return { "machine_id": None, "raw_text": text.strip()[:500], "confidence": "none", "detail": "No machine ID pattern found in image. Try again with better lighting.", } # ─── Static Files (mounted last to not shadow routes) ────────────────────── app.mount("/uploads", StaticFiles(directory=str(UPLOADS_DIR)), name="uploads") if STATIC_DIR.exists(): app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="static")