"""
Canteen Asset Geolocation Tool — FastAPI server.

Single-file backend: SQLite storage, asset CRUD, machine_id search,
check-ins with GPS, stats, and CSV export.
v2 schema: full asset management with customers, locations, keys, badges, etc.
"""
|
|
import csv
|
|
import hashlib
|
|
import io
|
|
import json as _json
|
|
import os
|
|
import re
|
|
import secrets
|
|
import sqlite3
|
|
import uuid
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
|
|
import pytesseract
|
|
from PIL import Image as PILImage
|
|
|
|
from fastapi import FastAPI, HTTPException, Query, Request, UploadFile, File
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
from typing import Optional, List
|
|
|
|
# ─── Config ─────────────────────────────────────────────────────────────────
|
|
|
|
# Closed vocabularies checked by the in-memory category/status validators.
VALID_CATEGORIES = {
    "Furniture",
    "Appliances",
    "Utensils & Serveware",
    "Equipment",
    "Other",
}
VALID_STATUSES = {"active", "maintenance", "retired"}

# Filesystem layout: everything defaults to living next to this module; the
# database file and the uploads directory can be relocated via environment
# variables.
_MODULE_DIR = Path(__file__).parent
DB_PATH = os.environ.get("CANTEEN_DB_PATH", str(_MODULE_DIR / "assets.db"))
UPLOADS_DIR = Path(os.environ.get("CANTEEN_UPLOADS_DIR", str(_MODULE_DIR / "uploads")))
STATIC_DIR = _MODULE_DIR / "static"
|
|
|
|
|
|
# ─── Database ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
def get_db() -> sqlite3.Connection:
    """Open a fresh connection to the SQLite database at ``DB_PATH``.

    The connection is configured with WAL journaling, foreign-key
    enforcement, and ``sqlite3.Row`` as the row factory so callers can read
    columns by name. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
# Full v2 schema. Every statement is guarded with IF NOT EXISTS, so running the
# script against an already-initialised database leaves existing tables alone.
_V2_SCHEMA_SQL = """
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        password_hash TEXT NOT NULL,
        role TEXT NOT NULL DEFAULT 'technician',
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS customers (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS customer_contacts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        customer_id INTEGER NOT NULL REFERENCES customers(id) ON DELETE CASCADE,
        name TEXT,
        phone TEXT,
        email TEXT
    );

    CREATE TABLE IF NOT EXISTS locations (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        customer_id INTEGER REFERENCES customers(id),
        name TEXT,
        address TEXT DEFAULT '',
        building_name TEXT DEFAULT '',
        building_number TEXT DEFAULT '',
        floor TEXT DEFAULT '',
        trailer_number TEXT DEFAULT '',
        site_hours TEXT DEFAULT '',
        access_notes TEXT DEFAULT '',
        walking_directions TEXT DEFAULT '',
        map_link TEXT DEFAULT '',
        latitude REAL DEFAULT NULL,
        longitude REAL DEFAULT NULL,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS rooms (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        location_id INTEGER NOT NULL REFERENCES locations(id) ON DELETE CASCADE,
        name TEXT,
        floor TEXT DEFAULT '',
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS categories (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL,
        icon TEXT DEFAULT ''
    );

    CREATE TABLE IF NOT EXISTS key_names (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL
    );

    CREATE TABLE IF NOT EXISTS key_types (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL
    );

    CREATE TABLE IF NOT EXISTS badge_types (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL
    );

    CREATE TABLE IF NOT EXISTS makes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL
    );

    CREATE TABLE IF NOT EXISTS models (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        make_id INTEGER NOT NULL REFERENCES makes(id),
        name TEXT NOT NULL,
        icon_path TEXT
    );

    CREATE TABLE IF NOT EXISTS assets (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        machine_id TEXT NOT NULL UNIQUE,
        serial_number TEXT DEFAULT '',
        name TEXT NOT NULL,
        description TEXT DEFAULT '',
        category TEXT NOT NULL DEFAULT 'Other',
        status TEXT NOT NULL DEFAULT 'active',
        make TEXT DEFAULT '',
        model TEXT DEFAULT '',
        address TEXT DEFAULT '',
        building_name TEXT DEFAULT '',
        building_number TEXT DEFAULT '',
        floor TEXT DEFAULT '',
        room TEXT DEFAULT '',
        trailer_number TEXT DEFAULT '',
        walking_directions TEXT DEFAULT '',
        map_link TEXT DEFAULT '',
        parking_location TEXT DEFAULT '',
        photo_path TEXT,
        customer_id INTEGER REFERENCES customers(id),
        location_id INTEGER REFERENCES locations(id),
        assigned_to INTEGER REFERENCES users(id),
        latitude REAL DEFAULT NULL,
        longitude REAL DEFAULT NULL,
        geofence_radius_meters INTEGER DEFAULT 50,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS checkins (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        asset_id INTEGER NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
        user_id INTEGER REFERENCES users(id),
        latitude REAL,
        longitude REAL,
        accuracy REAL,
        photo_path TEXT,
        notes TEXT DEFAULT '',
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS asset_keys (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        asset_id INTEGER NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
        key_name TEXT,
        key_type TEXT
    );

    CREATE TABLE IF NOT EXISTS asset_badges (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        asset_id INTEGER NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
        badge_name TEXT
    );

    CREATE TABLE IF NOT EXISTS settings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        key TEXT UNIQUE NOT NULL,
        value TEXT
    );

    CREATE TABLE IF NOT EXISTS activity_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER REFERENCES users(id),
        action TEXT,
        entity_type TEXT,
        entity_id INTEGER,
        details TEXT,
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS geofences (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        points TEXT,
        color TEXT,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS geofence_users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        geofence_id INTEGER NOT NULL REFERENCES geofences(id) ON DELETE CASCADE,
        user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
        UNIQUE(geofence_id, user_id)
    );

    CREATE TABLE IF NOT EXISTS visits (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER REFERENCES users(id),
        asset_id INTEGER REFERENCES assets(id) ON DELETE CASCADE,
        checkin_time TEXT,
        checkout_time TEXT,
        duration_minutes INTEGER,
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE TABLE IF NOT EXISTS sessions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
        token TEXT UNIQUE NOT NULL,
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
    );

    CREATE INDEX IF NOT EXISTS idx_checkins_asset_id ON checkins(asset_id);
    CREATE INDEX IF NOT EXISTS idx_checkins_created_at ON checkins(created_at);
"""


def _create_v2_tables(conn: sqlite3.Connection):
    """Create every v2 table and the check-in indexes that do not exist yet.

    Asset indexes are deliberately not part of this script; ``init_db``
    creates them separately.
    """
    conn.executescript(_V2_SCHEMA_SQL)
|
|
|
|
|
|
def _seed_if_empty(conn: sqlite3.Connection, table: str, columns: tuple, rows: list):
    """Bulk-insert ``rows`` into ``table`` only when the table has no rows.

    ``table`` and ``columns`` are interpolated into the SQL text, so they
    must be trusted identifiers; the row values themselves are bound as
    parameters. Nothing is committed here.
    """
    (row_count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    if row_count:
        return

    column_list = ", ".join(columns)
    markers = ", ".join("?" for _ in columns)
    conn.executemany(
        f"INSERT INTO {table} ({column_list}) VALUES ({markers})",
        rows,
    )
|
|
|
|
|
|
def _seed_data(conn: sqlite3.Connection):
    """Populate the lookup tables and the default admin account on first run.

    Each lookup table is seeded through ``_seed_if_empty``, so a table that
    already has rows is left untouched. Nothing is committed here.
    """
    lookup_seeds = (
        ("categories", ("name", "icon"), [
            ("Furniture", "🪑"), ("Appliances", "🔌"),
            ("Utensils & Serveware", "🍽️"), ("Equipment", "⚙️"), ("Other", "📦"),
        ]),
        ("key_names", ("name",), [
            ("MK500",), ("Green Dot",), ("Red Key",), ("Blue Key",),
            ("Master Key",), ("Padlock Key",),
        ]),
        ("key_types", ("name",), [
            ("Round Short",), ("Barrel",), ("Standard",), ("Flat",), ("Tubular",),
        ]),
        ("badge_types", ("name",), [
            ("Disney Contractor Base",), ("Visitor Badge",), ("Employee Badge",),
            ("Contractor Badge",), ("Temporary Pass",),
        ]),
        ("makes", ("name",), [
            ("Canteen",), ("Hobart",), ("Vollrath",), ("Metro",),
            ("Rubbermaid",), ("Cambro",), ("Other",),
        ]),
    )
    for table, columns, rows in lookup_seeds:
        _seed_if_empty(conn, table, columns, rows)

    # Default admin account, created only when there are no users at all.
    # NOTE(review): this ships a fixed, well-known password hash — confirm the
    # password is rotated on deployment.
    user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    if user_count == 0:
        admin_row = (
            "admin",
            "057ba03d6c44104863dc7361fe4578965d1887360f90a0895882e58a6248fc86",
            "admin",
        )
        conn.execute(
            "INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
            admin_row,
        )
|
|
|
|
|
|
def _ensure_unique_machine_id(conn: sqlite3.Connection):
    """Remove duplicate machine_ids (keep oldest), then create UNIQUE index.

    For every ``machine_id`` that occurs more than once, the row with the
    lowest ``id`` is kept and the others are deleted. The de-duplication is a
    single set-based DELETE, so it does not depend on the connection's row
    factory and issues no per-row queries. Rows with a NULL ``machine_id``
    are never deleted (a UNIQUE index permits multiple NULLs).

    The index is dropped and recreated so the function is safe to call on
    every startup. Nothing is committed here.
    """
    conn.execute(
        """
        DELETE FROM assets
        WHERE machine_id IS NOT NULL
          AND id NOT IN (SELECT MIN(id) FROM assets GROUP BY machine_id)
        """
    )
    conn.execute("DROP INDEX IF EXISTS idx_assets_machine_id")
    conn.execute("CREATE UNIQUE INDEX idx_assets_machine_id ON assets(machine_id)")
|
|
|
|
|
|
def _migrate_v1_to_v2(conn: sqlite3.Connection):
    """Migrate existing v1 database (barcode-based) to v2 schema.

    Steps: create the new v2 tables, rebuild ``assets`` with the v2 column
    set (``barcode`` becomes ``machine_id``), add ``checkins.user_id`` when
    missing, seed the lookup tables, and commit.

    The rebuilt table carries the same columns as the ``assets`` table in
    ``_create_v2_tables`` — including ``latitude``, ``longitude`` and
    ``geofence_radius_meters`` — so that inserts work immediately after the
    migration.

    On failure the open transaction is rolled back and the exception is
    re-raised.
    """
    # Create all new tables first. The old v1 ``assets`` table already
    # exists, so its CREATE TABLE IF NOT EXISTS is skipped.
    _create_v2_tables(conn)

    # With foreign keys enabled, DROP TABLE performs an implicit DELETE that
    # can fire ON DELETE CASCADE actions on child tables. Enforcement is
    # therefore switched off for the rebuild, as SQLite recommends for this
    # create/copy/drop/rename procedure.
    # NOTE(review): the v1 ``checkins`` schema is not visible here; if it
    # cascades from ``assets``, dropping the table with enforcement on would
    # erase the check-in history. The pragma only takes effect outside a
    # transaction — presumably none is open at this point; verify.
    fk_was_enabled = conn.execute("PRAGMA foreign_keys").fetchone()[0]
    conn.execute("PRAGMA foreign_keys=OFF")
    try:
        # A previous failed attempt may have left the scratch table behind.
        conn.execute("DROP TABLE IF EXISTS assets_v2")
        conn.execute(
            """
            CREATE TABLE assets_v2 (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                machine_id TEXT NOT NULL UNIQUE,
                serial_number TEXT DEFAULT '',
                name TEXT NOT NULL,
                description TEXT DEFAULT '',
                category TEXT NOT NULL DEFAULT 'Other',
                status TEXT NOT NULL DEFAULT 'active',
                make TEXT DEFAULT '',
                model TEXT DEFAULT '',
                address TEXT DEFAULT '',
                building_name TEXT DEFAULT '',
                building_number TEXT DEFAULT '',
                floor TEXT DEFAULT '',
                room TEXT DEFAULT '',
                trailer_number TEXT DEFAULT '',
                walking_directions TEXT DEFAULT '',
                map_link TEXT DEFAULT '',
                parking_location TEXT DEFAULT '',
                photo_path TEXT,
                customer_id INTEGER REFERENCES customers(id),
                location_id INTEGER REFERENCES locations(id),
                assigned_to INTEGER REFERENCES users(id),
                latitude REAL DEFAULT NULL,
                longitude REAL DEFAULT NULL,
                geofence_radius_meters INTEGER DEFAULT 50,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            )
            """
        )

        # Copy the v1 rows, preserving ids so existing check-ins still match.
        conn.execute(
            """
            INSERT INTO assets_v2 (id, machine_id, name, description, category, status, photo_path, created_at, updated_at)
            SELECT id, barcode, name, description, category, status, photo_path, created_at, updated_at
            FROM assets
            """
        )

        conn.execute("DROP TABLE assets")
        conn.execute("ALTER TABLE assets_v2 RENAME TO assets")
        _ensure_unique_machine_id(conn)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_assets_category ON assets(category)")

        # Add user_id to checkins if not present
        checkin_cols = {row[1] for row in conn.execute("PRAGMA table_info(checkins)")}
        if "user_id" not in checkin_cols:
            conn.execute("ALTER TABLE checkins ADD COLUMN user_id INTEGER REFERENCES users(id)")

        # Seed lookup data
        _seed_data(conn)
        conn.commit()
    except Exception:
        # Close the transaction so the pragma below can take effect.
        conn.rollback()
        raise
    finally:
        if fk_was_enabled:
            conn.execute("PRAGMA foreign_keys=ON")
|
|
|
|
|
|
def init_db(conn: sqlite3.Connection):
    """Create tables and indexes if they don't exist. Runs v1→v2 migration if needed.

    A database whose ``assets`` table still has a ``barcode`` column and no
    ``machine_id`` column is treated as v1 and migrated. Otherwise (fresh
    install or already migrated) the v2 tables, seed data and asset indexes
    are created.

    In both cases the v3 column additions (coordinates and geofence radius)
    run afterwards, so a freshly migrated v1 database has the full column
    set within the same startup rather than only after the next restart.
    """
    has_assets_table = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='assets'"
    ).fetchone() is not None

    is_legacy_v1 = False
    if has_assets_table:
        columns = {row[1] for row in conn.execute("PRAGMA table_info(assets)")}
        is_legacy_v1 = "barcode" in columns and "machine_id" not in columns

    if is_legacy_v1:
        _migrate_v1_to_v2(conn)
    else:
        # Fresh install or already migrated — create all tables
        _create_v2_tables(conn)
        _seed_data(conn)
        # Asset indexes — created here (not in _create_v2_tables) to avoid
        # failing during migration when old v1 assets table lacks machine_id.
        _ensure_unique_machine_id(conn)
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_assets_category ON assets(category)"
        )
        conn.commit()

    # v3 migration: add coordinates / geofence radius where they are missing.
    _add_missing_columns(conn, "assets", {
        "latitude": "REAL DEFAULT NULL",
        "longitude": "REAL DEFAULT NULL",
        "geofence_radius_meters": "INTEGER DEFAULT 50",
    })
    _add_missing_columns(conn, "locations", {
        "latitude": "REAL DEFAULT NULL",
        "longitude": "REAL DEFAULT NULL",
    })
    conn.commit()


def _add_missing_columns(conn: sqlite3.Connection, table: str, column_defs: dict):
    """Add each column in ``column_defs`` (name -> SQL type/default) that ``table`` lacks.

    ``table`` and the definitions are interpolated into the SQL text, so
    they must be trusted constants. Nothing is committed here.
    """
    present = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    for column, definition in column_defs.items():
        if column not in present:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
|
|
|
|
|
|
# ─── App / Middleware ───────────────────────────────────────────────────────
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup hook: ensure the uploads directory exists and the schema is current.

    The bootstrap connection is closed even when ``init_db`` raises, so a
    failed startup does not leave the database file open. No shutdown work
    is performed after the ``yield``.
    """
    UPLOADS_DIR.mkdir(parents=True, exist_ok=True)
    conn = get_db()
    try:
        init_db(conn)
    finally:
        conn.close()
    yield
|
|
|
|
|
|
# Application instance; `lifespan` is registered as its lifespan handler.
app = FastAPI(title="Canteen Asset Tracker", lifespan=lifespan)

# CORS is fully open: every origin, method and header is allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
# ─── Auth Middleware ──────────────────────────────────────────────────────────
|
|
|
|
|
|
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """Require valid Bearer token for all /api/* routes except login.

    On success the resolved user is attached to ``request.state`` as
    ``current_user`` (id, username, role) and ``user_id``. A missing or
    unknown token yields a 401 JSON response without calling the route.
    """
    path = request.url.path

    # Skip auth enforcement in test mode (set by tests/test_server.py).
    # In this mode nothing is attached to request.state.
    if os.environ.get("CANTEEN_SKIP_AUTH") == "1":
        return await call_next(request)

    # Public paths — no auth required
    if not path.startswith("/api/") or path == "/api/auth/login":
        return await call_next(request)

    # Extract and validate token
    scheme = "Bearer "
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith(scheme):
        return JSONResponse(
            status_code=401,
            content={"detail": "Authentication required"},
        )

    token = auth_header[len(scheme):]
    conn = get_db()
    try:
        # The connection is released even if the lookup itself fails.
        row = conn.execute(
            "SELECT u.id, u.username, u.role FROM users u "
            "JOIN sessions s ON u.id = s.user_id WHERE s.token = ?",
            (token,),
        ).fetchone()
    finally:
        conn.close()

    if row is None:
        return JSONResponse(
            status_code=401,
            content={"detail": "Invalid or expired token"},
        )

    request.state.current_user = {
        "id": row["id"],
        "username": row["username"],
        "role": row["role"],
    }
    request.state.user_id = row["id"]

    return await call_next(request)
|
|
|
|
|
|
# ─── Global Error Handling ───────────────────────────────────────────────────
|
|
|
|
|
|
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Return structured JSON with status detail for all HTTP exceptions.

    Any headers carried by the exception (for example ``WWW-Authenticate``)
    are forwarded on the response instead of being dropped.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
        headers=getattr(exc, "headers", None),
    )
|
|
|
|
|
|
@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
    """Catch-all for unhandled exceptions — log and return 500.

    The traceback of the exception passed to the handler is printed to
    stderr; the client only ever sees a generic message.
    """
    import traceback

    # Print the exception we were handed rather than relying on whatever
    # exception happens to be "current" when the handler runs.
    traceback.print_exception(type(exc), exc, exc.__traceback__)
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )
|
|
|
|
|
|
# ─── Input sanitization helpers ──────────────────────────────────────────────
|
|
|
|
|
|
def _sanitize_machine_id(machine_id: str) -> str:
    """Return ``machine_id`` with surrounding whitespace removed.

    Raises:
        HTTPException: 422 when the trimmed value is empty or longer than
            256 characters.
    """
    trimmed = machine_id.strip()
    if trimmed and len(trimmed) <= 256:
        return trimmed

    if trimmed:
        problem = "Machine ID too long (max 256 chars)"
    else:
        problem = "Machine ID must not be empty"
    raise HTTPException(status_code=422, detail=problem)
|
|
|
|
|
|
def _sanitize_name(name: str) -> str:
    """Return ``name`` with surrounding whitespace removed.

    Raises:
        HTTPException: 422 when the trimmed value is empty or longer than
            512 characters.
    """
    trimmed = name.strip()
    if trimmed and len(trimmed) <= 512:
        return trimmed

    if trimmed:
        problem = "Name too long (max 512 chars)"
    else:
        problem = "Name must not be empty"
    raise HTTPException(status_code=422, detail=problem)
|
|
|
|
|
|
# ─── Pydantic Models ────────────────────────────────────────────────────────
|
|
|
|
|
|
# One physical key attached to an asset; stored as a row in asset_keys.
class AssetKey(BaseModel):
    key_name: str
    # Optional; the create endpoint stores a missing/None type as "".
    key_type: Optional[str] = ""
|
|
|
|
|
|
# A named access badge.
# NOTE(review): AssetCreate/AssetUpdate take badges as plain strings, not as
# this model — confirm where AssetBadge is actually used.
class AssetBadge(BaseModel):
    badge_name: str
|
|
|
|
|
|
# Request body for POST /api/assets. Only machine_id and name are required;
# the remaining fields default to the same values as the assets table columns.
class AssetCreate(BaseModel):
    # Identity (both are trimmed and length-checked by the create endpoint).
    machine_id: str
    name: str
    serial_number: Optional[str] = ""
    description: Optional[str] = ""
    # Classification; category and make are checked against their lookup tables.
    category: Optional[str] = "Other"
    status: Optional[str] = "active"
    make: Optional[str] = ""
    model: Optional[str] = ""
    # Free-text placement details stored directly on the asset row.
    address: Optional[str] = ""
    building_name: Optional[str] = ""
    building_number: Optional[str] = ""
    floor: Optional[str] = ""
    room: Optional[str] = ""
    trailer_number: Optional[str] = ""
    walking_directions: Optional[str] = ""
    map_link: Optional[str] = ""
    parking_location: Optional[str] = ""
    photo_path: Optional[str] = None
    # Optional references to customers / locations / users rows.
    customer_id: Optional[int] = None
    location_id: Optional[int] = None
    assigned_to: Optional[int] = None
    # Geolocation; the radius falls back to 50 when omitted or None.
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    geofence_radius_meters: Optional[int] = 50
    # Child rows inserted into asset_keys / asset_badges; badges are plain names.
    keys: Optional[List[AssetKey]] = []
    badges: Optional[List[str]] = []
|
|
|
|
|
|
# Request body for PUT /api/assets/{asset_id}. Every field is optional; for the
# text fields, category and status the update endpoint treats None as
# "leave unchanged".
# NOTE(review): handling of the remaining fields is outside the visible part
# of the endpoint — presumably the same convention; verify.
class AssetUpdate(BaseModel):
    machine_id: Optional[str] = None
    name: Optional[str] = None
    serial_number: Optional[str] = None
    description: Optional[str] = None
    category: Optional[str] = None
    status: Optional[str] = None
    make: Optional[str] = None
    model: Optional[str] = None
    address: Optional[str] = None
    building_name: Optional[str] = None
    building_number: Optional[str] = None
    floor: Optional[str] = None
    room: Optional[str] = None
    trailer_number: Optional[str] = None
    walking_directions: Optional[str] = None
    map_link: Optional[str] = None
    parking_location: Optional[str] = None
    photo_path: Optional[str] = None
    customer_id: Optional[int] = None
    location_id: Optional[int] = None
    assigned_to: Optional[int] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    geofence_radius_meters: Optional[int] = None
    keys: Optional[List[AssetKey]] = None
    badges: Optional[List[str]] = None
|
|
|
|
|
|
# Payload for recording a check-in against an asset. Only asset_id is
# required; the position fields mirror the nullable checkins columns.
class CheckinCreate(BaseModel):
    asset_id: int
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    # NOTE(review): presumably the GPS accuracy reported by the client; units
    # are not established in this file section.
    accuracy: Optional[float] = None
    photo_path: Optional[str] = None
    notes: Optional[str] = ""
    user_id: Optional[int] = None
|
|
|
|
|
|
# Payload for editing an existing check-in; every field is optional.
# NOTE(review): presumably None means "leave unchanged" — the consuming
# endpoint is not visible here.
class CheckinUpdate(BaseModel):
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    accuracy: Optional[float] = None
    photo_path: Optional[str] = None
    notes: Optional[str] = None
    user_id: Optional[int] = None
|
|
|
|
|
|
# Payload for recording a visit to an asset. Only asset_id is required.
# NOTE(review): the visits table has no latitude/longitude columns — confirm
# how the consuming endpoint uses the coordinates.
class VisitCreate(BaseModel):
    user_id: Optional[int] = None
    asset_id: int
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    duration_minutes: Optional[int] = None
|
|
|
|
|
|
# ─── Phase B: Customer / Location / Room / Settings Models ──────────────────
|
|
|
|
|
|
# One contact person for a customer; fields match the customer_contacts
# columns and all default to an empty string.
class CustomerContact(BaseModel):
    name: Optional[str] = ""
    phone: Optional[str] = ""
    email: Optional[str] = ""
|
|
|
|
|
|
# Payload for creating a customer together with an optional contact list.
class CustomerCreate(BaseModel):
    # customers.name is UNIQUE in the schema.
    name: str
    contacts: Optional[List[CustomerContact]] = []
|
|
|
|
|
|
# Payload for editing a customer; both fields are optional.
# NOTE(review): presumably contacts=None leaves the contact list untouched
# while a list replaces it — verify against the endpoint.
class CustomerUpdate(BaseModel):
    name: Optional[str] = None
    contacts: Optional[List[CustomerContact]] = None
|
|
|
|
|
|
# Payload for creating a location. Only name is required; the text fields
# default to "" like the corresponding locations columns.
class LocationCreate(BaseModel):
    # Optional owning customer (locations.customer_id is nullable).
    customer_id: Optional[int] = None
    name: str
    address: Optional[str] = ""
    building_name: Optional[str] = ""
    building_number: Optional[str] = ""
    floor: Optional[str] = ""
    trailer_number: Optional[str] = ""
    site_hours: Optional[str] = ""
    access_notes: Optional[str] = ""
    walking_directions: Optional[str] = ""
    map_link: Optional[str] = ""
    latitude: Optional[float] = None
    longitude: Optional[float] = None
|
|
|
|
|
|
# Payload for editing a location; every field is optional.
# NOTE(review): presumably None means "leave unchanged" — the consuming
# endpoint is not visible here.
class LocationUpdate(BaseModel):
    customer_id: Optional[int] = None
    name: Optional[str] = None
    address: Optional[str] = None
    building_name: Optional[str] = None
    building_number: Optional[str] = None
    floor: Optional[str] = None
    trailer_number: Optional[str] = None
    site_hours: Optional[str] = None
    access_notes: Optional[str] = None
    walking_directions: Optional[str] = None
    map_link: Optional[str] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
|
|
|
|
|
|
# Payload for creating a room inside a location.
class RoomCreate(BaseModel):
    # Required: rooms.location_id is NOT NULL in the schema.
    location_id: int
    name: str
    floor: Optional[str] = ""
|
|
|
|
|
|
# Payload for editing a room; every field is optional.
# NOTE(review): presumably None means "leave unchanged" — the consuming
# endpoint is not visible here.
class RoomUpdate(BaseModel):
    name: Optional[str] = None
    floor: Optional[str] = None
    location_id: Optional[int] = None
|
|
|
|
|
|
# ─── Helpers ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def row_to_dict(row: sqlite3.Row) -> dict:
    """Convert a ``sqlite3.Row`` into a plain ``{column: value}`` dict."""
    return {column: row[column] for column in row.keys()}
|
|
|
|
|
|
def _geofence_row(row: sqlite3.Row, conn: Optional[sqlite3.Connection] = None) -> dict:
    """Turn a geofence row into a response dict.

    ``points`` is decoded from JSON when it is a string that parses; an
    unparseable string is passed through unchanged. The users linked via
    ``geofence_users`` are added under ``assigned_users``, ordered by
    username.

    When ``conn`` is omitted a temporary connection is opened through
    ``get_db`` and closed again; a connection supplied by the caller is
    left open.
    """
    payload = dict(row)

    raw_points = payload.get("points")
    if isinstance(raw_points, str):
        try:
            payload["points"] = _json.loads(raw_points)
        except (_json.JSONDecodeError, TypeError):
            # Deliberately best-effort: keep the raw string.
            pass

    owns_connection = conn is None
    db = get_db() if owns_connection else conn
    try:
        members = db.execute(
            """SELECT u.id, u.username, u.role FROM geofence_users gu
               JOIN users u ON u.id = gu.user_id
               WHERE gu.geofence_id = ? ORDER BY u.username""",
            (payload["id"],),
        ).fetchall()
        payload["assigned_users"] = [dict(member) for member in members]
    finally:
        if owns_connection:
            db.close()
    return payload
|
|
|
|
|
|
def _sync_geofence_users(conn: sqlite3.Connection, geofence_id: int, user_ids: list[int]):
    """Replace assigned users for a geofence with the given list of user IDs.

    Validates all user_ids exist before modifying the junction table.
    Repeated ids in ``user_ids`` are collapsed (first occurrence wins), so a
    duplicated id no longer trips the UNIQUE(geofence_id, user_id)
    constraint. An empty list simply clears the assignment.
    Does NOT commit — caller manages transaction boundaries.

    Raises:
        HTTPException: 422 naming the first id that does not exist in
            ``users``; the junction table is untouched in that case.
    """
    # De-duplicate while preserving the caller's order.
    unique_ids = list(dict.fromkeys(user_ids))

    # Validate all user IDs exist
    if unique_ids:
        placeholders = ", ".join("?" * len(unique_ids))
        known_ids = {
            row[0]
            for row in conn.execute(
                f"SELECT id FROM users WHERE id IN ({placeholders})",
                unique_ids,
            )
        }
        for uid in unique_ids:
            if uid not in known_ids:
                raise HTTPException(
                    status_code=422,
                    detail=f"User with id {uid} not found",
                )

    conn.execute("DELETE FROM geofence_users WHERE geofence_id = ?", (geofence_id,))
    conn.executemany(
        "INSERT INTO geofence_users (geofence_id, user_id) VALUES (?, ?)",
        [(geofence_id, uid) for uid in unique_ids],
    )
|
|
|
|
|
|
def _validate_category(category: str):
    """Raise a 422 ``HTTPException`` unless ``category`` is in ``VALID_CATEGORIES``."""
    if category in VALID_CATEGORIES:
        return

    allowed = ", ".join(sorted(VALID_CATEGORIES))
    raise HTTPException(
        status_code=422,
        detail=f"Invalid category '{category}'. Must be one of: {allowed}",
    )
|
|
|
|
|
|
def _validate_status(status: str):
    """Raise a 422 ``HTTPException`` unless ``status`` is in ``VALID_STATUSES``."""
    if status in VALID_STATUSES:
        return

    allowed = ", ".join(sorted(VALID_STATUSES))
    raise HTTPException(
        status_code=422,
        detail=f"Invalid status '{status}'. Must be one of: {allowed}",
    )
|
|
|
|
|
|
def _validate_ref(conn: sqlite3.Connection, table: str, id_val: int, label: str):
    """Check that ``table`` contains a row whose ``id`` equals ``id_val``.

    A ``None`` id is accepted without touching the database. ``table`` is
    interpolated into the SQL text and must be a trusted identifier.

    Raises:
        HTTPException: 422 ("<label> with id <id_val> not found") when no
            such row exists.
    """
    if id_val is None:
        return

    match = conn.execute(f"SELECT id FROM {table} WHERE id = ?", (id_val,)).fetchone()
    if match is not None:
        return
    raise HTTPException(status_code=422, detail=f"{label} with id {id_val} not found")
|
|
|
|
|
|
def _validate_enum_table(conn: sqlite3.Connection, table: str, value: str, label: str):
    """Check that ``value`` appears in the ``name`` column of a lookup table.

    Falsy values (``""`` or ``None``) are accepted without a query.
    ``table`` is interpolated into the SQL text and must be a trusted
    identifier.

    Raises:
        HTTPException: 422 when ``value`` is not present in ``table``.
    """
    if not value:
        return

    match = conn.execute(f"SELECT id FROM {table} WHERE name = ?", (value,)).fetchone()
    if match is not None:
        return
    raise HTTPException(
        status_code=422,
        detail=f"Invalid {label} '{value}'. Must exist in {table} table.",
    )
|
|
|
|
|
|
# ─── Task 3: Health ────────────────────────────────────────────────────────
|
|
|
|
|
|
# Liveness probe. The path is outside /api/, so the auth middleware lets it
# through without a token.
@app.get("/health")
def health():
    payload = {"status": "ok"}
    return payload
|
|
|
|
|
|
# ─── Task 4: POST /api/assets ──────────────────────────────────────────────
|
|
|
|
|
|
def _build_asset_insert(body: AssetCreate, machine_id: str, name: str):
    """Pair every insertable ``assets`` column with its value from ``body``.

    ``machine_id`` and ``name`` are passed separately because the caller has
    already sanitized them. Falsy text fields are stored as ``""``; a falsy
    category or status falls back to ``"Other"`` / ``"active"``; a missing
    geofence radius becomes 50.

    Returns:
        ``(columns, values)`` — a list of column names and a tuple of values
        in the same order.
    """
    radius = body.geofence_radius_meters
    pairs = [
        ("machine_id", machine_id),
        ("serial_number", body.serial_number or ""),
        ("name", name),
        ("description", body.description or ""),
        ("category", body.category or "Other"),
        ("status", body.status or "active"),
        ("make", body.make or ""),
        ("model", body.model or ""),
        ("address", body.address or ""),
        ("building_name", body.building_name or ""),
        ("building_number", body.building_number or ""),
        ("floor", body.floor or ""),
        ("room", body.room or ""),
        ("trailer_number", body.trailer_number or ""),
        ("walking_directions", body.walking_directions or ""),
        ("map_link", body.map_link or ""),
        ("parking_location", body.parking_location or ""),
        ("photo_path", body.photo_path),
        ("customer_id", body.customer_id),
        ("location_id", body.location_id),
        ("assigned_to", body.assigned_to),
        ("latitude", body.latitude),
        ("longitude", body.longitude),
        ("geofence_radius_meters", 50 if radius is None else radius),
    ]
    columns = [column for column, _ in pairs]
    values = tuple(value for _, value in pairs)
    return columns, values
|
|
|
|
|
|
def _get_asset_keys(conn: sqlite3.Connection, asset_id: int) -> list:
    """Return the ``asset_keys`` rows of one asset as a list of dicts.

    Each dict has the keys ``id``, ``asset_id``, ``key_name`` and
    ``key_type``. The connection must use ``sqlite3.Row`` as row factory.
    """
    query = "SELECT id, asset_id, key_name, key_type FROM asset_keys WHERE asset_id = ?"
    cursor = conn.execute(query, (asset_id,))
    return [dict(record) for record in cursor.fetchall()]
|
|
|
|
|
|
def _get_asset_badges(conn: sqlite3.Connection, asset_id: int) -> list:
    """Return the ``asset_badges`` rows of one asset as a list of dicts.

    Each dict has the keys ``id``, ``asset_id`` and ``badge_name``. The
    connection must use ``sqlite3.Row`` as row factory.
    """
    query = "SELECT id, asset_id, badge_name FROM asset_badges WHERE asset_id = ?"
    cursor = conn.execute(query, (asset_id,))
    return [dict(record) for record in cursor.fetchall()]
|
|
|
|
|
|
# Create an asset together with its keys and badges.
#
# Responses: 201 with the stored asset row plus "keys" and "badges" lists;
# 422 when a field fails validation or a referenced row does not exist;
# 409 when the INSERT violates a constraint (reported as a duplicate
# machine_id).
#
# The connection is closed on every exit path. Closing without a commit
# discards the pending transaction, so a failure part-way through does not
# leave a half-written asset.
@app.post("/api/assets", status_code=201)
def create_asset(body: AssetCreate):
    machine_id = _sanitize_machine_id(body.machine_id)
    name = _sanitize_name(body.name)
    _validate_status(body.status or "active")

    conn = get_db()
    try:
        # DB-based validation for reference fields
        _validate_enum_table(conn, "categories", body.category or "Other", "category")
        _validate_enum_table(conn, "makes", body.make or "", "make")
        if body.customer_id is not None:
            _validate_ref(conn, "customers", body.customer_id, "Customer")
        if body.location_id is not None:
            _validate_ref(conn, "locations", body.location_id, "Location")
        if body.assigned_to is not None:
            _validate_ref(conn, "users", body.assigned_to, "User")

        columns, values = _build_asset_insert(body, machine_id, name)
        placeholders = ", ".join(["?"] * len(columns))
        col_names = ", ".join(columns)

        try:
            cursor = conn.execute(
                f"INSERT INTO assets ({col_names}) VALUES ({placeholders})",
                values,
            )
        except sqlite3.IntegrityError:
            raise HTTPException(
                status_code=409,
                detail=f"Asset with machine_id '{machine_id}' already exists",
            )
        asset_id = cursor.lastrowid

        # Insert keys
        if body.keys:
            conn.executemany(
                "INSERT INTO asset_keys (asset_id, key_name, key_type) VALUES (?, ?, ?)",
                [(asset_id, k.key_name, k.key_type or "") for k in body.keys],
            )

        # Insert badges
        if body.badges:
            conn.executemany(
                "INSERT INTO asset_badges (asset_id, badge_name) VALUES (?, ?)",
                [(asset_id, b_name) for b_name in body.badges],
            )

        conn.commit()

        # The asset is committed before the activity entry is written, so a
        # logging failure cannot undo the creation.
        # NOTE(review): _log_activity is defined outside this section.
        _log_activity(conn, "created", "asset", asset_id,
                      f"Asset '{name}' (machine_id: {machine_id}) created")
        conn.commit()

        row = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
        result = row_to_dict(row)
        result["keys"] = _get_asset_keys(conn, asset_id)
        result["badges"] = _get_asset_badges(conn, asset_id)
        return result
    finally:
        conn.close()
|
|
|
|
|
|
# ─── Task 5: GET /api/assets ───────────────────────────────────────────────
|
|
|
|
|
|
# List assets, newest first, with optional filters and paging.
#
# Text filters (category, status, make, model) apply when non-empty; id
# filters (customer_id, location_id, assigned_to) apply when not None; `q`
# is a substring match on name, machine_id or description. All values are
# bound as parameters. The connection is opened only once the query is
# built and is closed even if the query fails.
@app.get("/api/assets")
def list_assets(
    category: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    make: Optional[str] = Query(None),
    model: Optional[str] = Query(None),
    customer_id: Optional[int] = Query(None),
    location_id: Optional[int] = Query(None),
    assigned_to: Optional[int] = Query(None),
    q: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    text_filters = (
        ("category", category),
        ("status", status),
        ("make", make),
        ("model", model),
    )
    id_filters = (
        ("customer_id", customer_id),
        ("location_id", location_id),
        ("assigned_to", assigned_to),
    )

    conditions = []
    params = []
    for column, value in text_filters:
        if value:
            conditions.append(f"{column} = ?")
            params.append(value)
    for column, value in id_filters:
        if value is not None:
            conditions.append(f"{column} = ?")
            params.append(value)
    if q:
        conditions.append("(name LIKE ? OR machine_id LIKE ? OR description LIKE ?)")
        like = f"%{q}%"
        params.extend([like, like, like])

    sql = "SELECT * FROM assets"
    if conditions:
        sql += f" WHERE {' AND '.join(conditions)}"
    sql += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([limit, offset])

    conn = get_db()
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        conn.close()
    return [row_to_dict(r) for r in rows]
|
|
|
|
|
|
# Look up a single asset by exact machine_id; 404 when there is no match.
# The connection is closed even if the query raises.
@app.get("/api/assets/search")
def search_by_machine_id(machine_id: str = Query(...)):
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT * FROM assets WHERE machine_id = ?", (machine_id,)
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        raise HTTPException(status_code=404, detail="Asset not found")
    return row_to_dict(row)
|
|
|
|
|
|
# Fetch one asset by id with its keys and badges; 404 when it does not exist.
# "customer_name" / "location_name" are added only when the asset references
# a customer / location. The connection is closed on every exit path.
@app.get("/api/assets/{asset_id}")
def get_asset(asset_id: int):
    conn = get_db()
    try:
        row = conn.execute("SELECT * FROM assets WHERE id = ?", (asset_id,)).fetchone()
        if row is None:
            raise HTTPException(status_code=404, detail="Asset not found")
        result = row_to_dict(row)
        result["keys"] = _get_asset_keys(conn, asset_id)
        result["badges"] = _get_asset_badges(conn, asset_id)

        # Add joined names
        if result.get("customer_id"):
            cust = conn.execute(
                "SELECT name FROM customers WHERE id = ?", (result["customer_id"],)
            ).fetchone()
            result["customer_name"] = cust["name"] if cust else None
        if result.get("location_id"):
            loc = conn.execute(
                "SELECT name FROM locations WHERE id = ?", (result["location_id"],)
            ).fetchone()
            result["location_name"] = loc["name"] if loc else None

        return result
    finally:
        conn.close()
|
|
|
|
|
|
# ─── Task 6: PUT / DELETE /api/assets/{id} ─────────────────────────────────
|
|
|
|
|
|
_TEXT_FIELDS = [
|
|
"name", "machine_id", "serial_number", "description", "make", "model",
|
|
"address", "building_name", "building_number", "floor", "room",
|
|
"trailer_number", "walking_directions", "map_link", "parking_location",
|
|
]
|
|
|
|
|
|
@app.put("/api/assets/{asset_id}")
def update_asset(asset_id: int, body: AssetUpdate):
    """Apply a partial update to an asset and return the stored row.

    Only fields that are not ``None`` in ``body`` are written. ``name`` and
    ``machine_id`` go through their sanitizers; ``category`` and ``status``
    are validated before being stored. An "updated" activity-log entry is
    written in the same transaction as the change.

    Raises:
        HTTPException 404: no asset has ``asset_id``.
        HTTPException 409: the write violates a database constraint (for
            example a uniqueness or foreign-key constraint).
        HTTPException: whatever the sanitizers/validators raise.
    """
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM assets WHERE id = ?", (asset_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Asset not found")

        updates = {}
        for field in _TEXT_FIELDS:
            val = getattr(body, field, None)
            if val is None:
                continue
            if field == "name":
                updates[field] = _sanitize_name(val)
            elif field == "machine_id":
                updates[field] = _sanitize_machine_id(val)
            else:
                updates[field] = val

        if body.category is not None:
            _validate_enum_table(conn, "categories", body.category, "category")
            updates["category"] = body.category
        if body.status is not None:
            _validate_status(body.status)
            updates["status"] = body.status

        # Remaining columns are copied verbatim when provided.
        for field in (
            "photo_path", "customer_id", "location_id", "assigned_to",
            "latitude", "longitude", "geofence_radius_meters",
        ):
            val = getattr(body, field)
            if val is not None:
                updates[field] = val

        try:
            if updates:
                # Column names come from the fixed lists above, never from
                # the client, so interpolating them is safe.
                assignments = [f"{col} = ?" for col in updates]
                assignments.append("updated_at = datetime('now')")
                conn.execute(
                    f"UPDATE assets SET {', '.join(assignments)} WHERE id = ?",
                    [*updates.values(), asset_id],
                )
            row = conn.execute(
                "SELECT * FROM assets WHERE id = ?", (asset_id,)
            ).fetchone()
            _log_activity(conn, "updated", "asset", asset_id,
                          f"Asset '{row['name']}' updated")
            # Single commit: the change and its audit entry land together.
            conn.commit()
        except sqlite3.IntegrityError as exc:
            conn.rollback()
            raise HTTPException(
                status_code=409,
                detail="Asset update conflicts with existing data or references a missing record",
            ) from exc
        return row_to_dict(row)
    finally:
        # Always release the connection, including when validation raises.
        conn.close()
|
|
|
|
|
|
@app.delete("/api/assets/{asset_id}", status_code=204)
def delete_asset(asset_id: int):
    """Delete an asset and its dependent rows, then log the deletion.

    Raises a 404 HTTPException when no asset has the given id.
    """
    db = get_db()
    found = db.execute("SELECT id FROM assets WHERE id = ?", (asset_id,)).fetchone()
    if found is None:
        db.close()
        raise HTTPException(status_code=404, detail="Asset not found")

    key = (asset_id,)
    # Children first (visits, asset_keys, asset_badges lack ON DELETE CASCADE),
    # then the asset itself.
    for statement in (
        "DELETE FROM visits WHERE asset_id = ?",
        "DELETE FROM asset_keys WHERE asset_id = ?",
        "DELETE FROM asset_badges WHERE asset_id = ?",
        "DELETE FROM assets WHERE id = ?",
    ):
        db.execute(statement, key)

    _log_activity(db, "deleted", "asset", asset_id, f"Asset {asset_id} deleted")
    db.commit()
    db.close()
|
|
|
|
|
|
# ─── Task 8: POST /api/checkins ─────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/checkins", status_code=201)
def create_checkin(body: CheckinCreate):
    """Record a check-in for an asset and return the stored row.

    After the check-in is committed, a visit is auto-logged and an activity
    entry is written in a second commit.

    Raises a 404 HTTPException when the referenced asset does not exist.
    """
    db = get_db()
    asset_row = db.execute(
        "SELECT id FROM assets WHERE id = ?", (body.asset_id,)
    ).fetchone()
    if asset_row is None:
        db.close()
        raise HTTPException(status_code=404, detail="Asset not found")

    insert_values = (
        body.asset_id,
        body.user_id,
        body.latitude,
        body.longitude,
        body.accuracy,
        body.photo_path,
        body.notes or "",
    )
    inserted = db.execute(
        """INSERT INTO checkins (asset_id, user_id, latitude, longitude, accuracy, photo_path, notes)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        insert_values,
    )
    db.commit()
    checkin_id = inserted.lastrowid

    # Auto-log visit using the timestamp the database assigned.
    stamp = db.execute(
        "SELECT created_at FROM checkins WHERE id = ?", (checkin_id,)
    ).fetchone()
    _auto_log_visit(db, body.user_id, body.asset_id, stamp["created_at"])

    # Activity log
    _log_activity(
        db, "created", "checkin", checkin_id,
        f"Check-in for asset {body.asset_id}",
        user_id=body.user_id,
    )
    db.commit()

    saved = db.execute("SELECT * FROM checkins WHERE id = ?", (checkin_id,)).fetchone()
    db.close()
    return row_to_dict(saved)
|
|
|
|
|
|
# ─── Task 9: GET /api/checkins ──────────────────────────────────────────────
|
|
|
|
|
|
@app.get("/api/checkins")
def list_checkins(
    asset_id: Optional[int] = Query(None),
    user_id: Optional[int] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List check-ins, newest first, optionally filtered by asset and/or user."""
    db = get_db()

    # Keep only the filters that were actually supplied.
    active = [
        (column, value)
        for column, value in (("asset_id", asset_id), ("user_id", user_id))
        if value is not None
    ]
    clauses = [f"{column} = ?" for column, _ in active]
    args = [value for _, value in active]

    query = "SELECT * FROM checkins"
    if clauses:
        query += f" WHERE {' AND '.join(clauses)}"
    query += " ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?"
    args += [limit, offset]

    found = db.execute(query, args).fetchall()
    db.close()
    return [row_to_dict(item) for item in found]
|
|
|
|
|
|
@app.get("/api/checkins/{checkin_id}")
def get_checkin(checkin_id: int):
    """Return a single check-in by id, or raise a 404 HTTPException."""
    db = get_db()
    record = db.execute("SELECT * FROM checkins WHERE id = ?", (checkin_id,)).fetchone()
    # Both outcomes close the connection, so do it once up front.
    db.close()
    if record is None:
        raise HTTPException(status_code=404, detail="Checkin not found")
    return row_to_dict(record)
|
|
|
|
|
|
@app.put("/api/checkins/{checkin_id}")
def update_checkin(checkin_id: int, body: CheckinUpdate):
    """Apply a partial update to a check-in and return the stored row.

    Only fields that are not ``None`` in ``body`` are written.
    Raises a 404 HTTPException when the check-in does not exist.
    """
    db = get_db()
    found = db.execute("SELECT id FROM checkins WHERE id = ?", (checkin_id,)).fetchone()
    if found is None:
        db.close()
        raise HTTPException(status_code=404, detail="Checkin not found")

    editable = ("latitude", "longitude", "accuracy", "photo_path", "notes", "user_id")
    supplied = {name: getattr(body, name, None) for name in editable}
    changes = {name: value for name, value in supplied.items() if value is not None}

    if changes:
        assignments = ", ".join(f"{column} = ?" for column in changes)
        db.execute(
            f"UPDATE checkins SET {assignments} WHERE id = ?",
            [*changes.values(), checkin_id],
        )
        db.commit()

    saved = db.execute("SELECT * FROM checkins WHERE id = ?", (checkin_id,)).fetchone()
    db.close()
    return row_to_dict(saved)
|
|
|
|
|
|
@app.delete("/api/checkins/{checkin_id}", status_code=204)
def delete_checkin(checkin_id: int):
    """Delete a check-in by id; raise a 404 HTTPException if it is missing."""
    db = get_db()
    key = (checkin_id,)
    if db.execute("SELECT id FROM checkins WHERE id = ?", key).fetchone() is None:
        db.close()
        raise HTTPException(status_code=404, detail="Checkin not found")
    db.execute("DELETE FROM checkins WHERE id = ?", key)
    db.commit()
    db.close()
|
|
|
|
|
|
# ─── Task 10: GET /api/stats ────────────────────────────────────────────────
|
|
|
|
|
|
@app.get("/api/stats")
def get_stats():
    """Return dashboard statistics.

    The payload holds total asset and check-in counts, asset counts grouped
    by category, status and make, the ten most-visited assets, and visit
    count / total minutes on site per technician.
    """
    db = get_db()

    def count_rows(sql):
        # First column of the single result row.
        return db.execute(sql).fetchone()[0]

    def grouped_counts(sql, label):
        # Map each group label to its "cnt" column.
        return {rec[label]: rec["cnt"] for rec in db.execute(sql).fetchall()}

    total_assets = count_rows("SELECT COUNT(*) FROM assets")
    total_checkins = count_rows("SELECT COUNT(*) FROM checkins")

    by_category = grouped_counts(
        "SELECT category, COUNT(*) AS cnt FROM assets GROUP BY category ORDER BY cnt DESC",
        "category",
    )
    by_status = grouped_counts(
        "SELECT status, COUNT(*) AS cnt FROM assets GROUP BY status ORDER BY cnt DESC",
        "status",
    )

    # Enhanced: top visited assets
    visited_rows = db.execute(
        """SELECT a.name, a.machine_id, COUNT(*) AS visit_count,
                  MAX(v.checkin_time) AS last_visit_date
           FROM visits v JOIN assets a ON v.asset_id = a.id
           GROUP BY v.asset_id ORDER BY visit_count DESC LIMIT 10"""
    ).fetchall()
    visited_fields = ("name", "machine_id", "visit_count", "last_visit_date")
    top_visited_list = [
        {field: rec[field] for field in visited_fields} for rec in visited_rows
    ]

    # Enhanced: time on site per technician
    technician_rows = db.execute(
        """SELECT u.username, COUNT(v.id) AS visit_count,
                  SUM(CASE WHEN v.checkout_time IS NOT NULL AND v.checkin_time IS NOT NULL
                      THEN (julianday(v.checkout_time) - julianday(v.checkin_time)) * 1440
                      ELSE 0 END) AS total_minutes
           FROM visits v JOIN users u ON v.user_id = u.id
           WHERE u.role = 'technician'
           GROUP BY v.user_id ORDER BY total_minutes DESC"""
    ).fetchall()
    time_on_site = []
    for rec in technician_rows:
        time_on_site.append({
            "username": rec["username"],
            "visit_count": rec["visit_count"],
            "total_minutes": round(rec["total_minutes"] or 0, 1),
        })

    # Enhanced: assets by make/manufacturer
    by_make = grouped_counts(
        "SELECT make, COUNT(*) AS cnt FROM assets WHERE make != '' GROUP BY make ORDER BY cnt DESC",
        "make",
    )

    db.close()
    return {
        "total_assets": total_assets,
        "total_checkins": total_checkins,
        "by_category": by_category,
        "by_status": by_status,
        "top_visited": top_visited_list,
        "time_on_site": time_on_site,
        "by_make": by_make,
    }
|
|
|
|
|
|
# ─── Task 11: CSV Export ────────────────────────────────────────────────────
|
|
|
|
|
|
def _generate_csv(rows, fieldnames):
    """Yield CSV text chunk by chunk for a StreamingResponse.

    The first chunk is the header line; each following chunk is one data row.

    Args:
        rows: iterable of database rows; each is passed through row_to_dict.
        fieldnames: column names, in output order.
    """
    buffer = io.StringIO()
    # Rows come from ``SELECT *``. With the default extrasaction="raise", a
    # table column that is not listed in ``fieldnames`` would raise ValueError
    # halfway through the stream and truncate the download. Ignore extras so
    # the export keeps working when the schema grows.
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")

    def _drain():
        # Hand back what has been written so far and reset the buffer.
        chunk = buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)
        return chunk

    writer.writeheader()
    yield _drain()
    for row in rows:
        writer.writerow(row_to_dict(row))
        yield _drain()
|
|
|
|
|
|
_ASSET_CSV_FIELDS = [
|
|
"id", "machine_id", "serial_number", "name", "description", "category",
|
|
"status", "make", "model", "address", "building_name", "building_number",
|
|
"floor", "room", "trailer_number", "walking_directions", "map_link",
|
|
"parking_location", "photo_path", "customer_id", "location_id",
|
|
"assigned_to", "latitude", "longitude", "geofence_radius_meters",
|
|
"created_at", "updated_at",
|
|
]
|
|
|
|
_CHECKIN_CSV_FIELDS = [
|
|
"id", "asset_id", "user_id", "latitude", "longitude", "accuracy",
|
|
"photo_path", "notes", "created_at",
|
|
]
|
|
|
|
|
|
@app.get("/api/export/assets")
def export_assets_csv():
    """Stream every asset, newest first, as a CSV attachment named assets.csv."""
    db = get_db()
    all_assets = db.execute("SELECT * FROM assets ORDER BY created_at DESC").fetchall()
    db.close()

    download_headers = {"Content-Disposition": "attachment; filename=assets.csv"}
    csv_chunks = _generate_csv(all_assets, _ASSET_CSV_FIELDS)
    return StreamingResponse(csv_chunks, media_type="text/csv", headers=download_headers)
|
|
|
|
|
|
@app.get("/api/export/checkins")
def export_checkins_csv(asset_id: Optional[int] = Query(None)):
    """Stream check-ins as a CSV attachment named checkins.csv.

    Args:
        asset_id: when given, export only that asset's check-ins.

    Rows are ordered newest first; ``id`` breaks ties so that the filtered
    and unfiltered exports order same-timestamp rows identically.
    """
    sql = "SELECT * FROM checkins"
    params = ()
    if asset_id is not None:
        sql += " WHERE asset_id = ?"
        params = (asset_id,)
    # Previously only the filtered query had the id tie-breaker.
    sql += " ORDER BY created_at DESC, id DESC"

    conn = get_db()
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        # Close even if the query raises so the connection is not leaked.
        conn.close()
    return StreamingResponse(
        _generate_csv(rows, _CHECKIN_CSV_FIELDS),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=checkins.csv"},
    )
|
|
|
|
|
|
# ─── Phase B: Customers API ──────────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/customers", status_code=201)
def create_customer(body: CustomerCreate):
    """Create a customer with optional contacts and return it.

    Raises:
        HTTPException 422: the name is blank after stripping.
        HTTPException 409: the insert hits an integrity constraint.
    """
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=422, detail="Customer name must not be empty")

    db = get_db()
    try:
        cust_id = db.execute("INSERT INTO customers (name) VALUES (?)", (name,)).lastrowid
        if body.contacts:
            contact_rows = [
                (cust_id, person.name or "", person.phone or "", person.email or "")
                for person in body.contacts
            ]
            db.executemany(
                "INSERT INTO customer_contacts (customer_id, name, phone, email) VALUES (?, ?, ?, ?)",
                contact_rows,
            )
        db.commit()
        _log_activity(db, "created", "customer", cust_id, f"Customer '{name}' created")
        db.commit()
    except sqlite3.IntegrityError:
        db.close()
        raise HTTPException(status_code=409, detail=f"Customer '{name}' already exists")

    stored = db.execute("SELECT * FROM customers WHERE id = ?", (cust_id,)).fetchone()
    customer = row_to_dict(stored)
    customer["contacts"] = _get_customer_contacts(db, cust_id)
    db.close()
    return customer
|
|
|
|
|
|
def _get_customer_contacts(conn: sqlite3.Connection, cust_id: int) -> list:
    """Return the contact rows of one customer as a list of dicts."""
    query = "SELECT id, customer_id, name, phone, email FROM customer_contacts WHERE customer_id = ?"
    contacts = []
    for contact_row in conn.execute(query, (cust_id,)).fetchall():
        contacts.append(row_to_dict(contact_row))
    return contacts
|
|
|
|
|
|
@app.get("/api/customers")
def list_customers():
    """Return all customers ordered by name, each with a ``contacts`` list."""
    db = get_db()
    customers = []
    for customer_row in db.execute("SELECT * FROM customers ORDER BY name").fetchall():
        entry = row_to_dict(customer_row)
        entry["contacts"] = _get_customer_contacts(db, customer_row["id"])
        customers.append(entry)
    db.close()
    return customers
|
|
|
|
|
|
@app.get("/api/customers/{cust_id}")
def get_customer(cust_id: int):
    """Return one customer with its contacts, or raise a 404 HTTPException."""
    db = get_db()
    stored = db.execute("SELECT * FROM customers WHERE id = ?", (cust_id,)).fetchone()
    if stored is None:
        db.close()
        raise HTTPException(status_code=404, detail="Customer not found")

    customer = row_to_dict(stored)
    customer["contacts"] = _get_customer_contacts(db, cust_id)
    db.close()
    return customer
|
|
|
|
|
|
@app.put("/api/customers/{cust_id}")
def update_customer(cust_id: int, body: CustomerUpdate):
    """Rename a customer and/or replace its contacts; return the stored row.

    ``body.name`` (when not ``None``) replaces the name. ``body.contacts``
    (when not ``None``) replaces the whole contact list, so an empty list
    removes every contact.

    Raises:
        HTTPException 404: the customer does not exist.
        HTTPException 422: the new name is blank after stripping.
        HTTPException 409: the rename hits an integrity constraint, matching
            what create_customer reports for the same condition.
    """
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM customers WHERE id = ?", (cust_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Customer not found")

        if body.name is not None:
            name = body.name.strip()
            if not name:
                raise HTTPException(status_code=422, detail="Customer name must not be empty")
            try:
                conn.execute(
                    "UPDATE customers SET name = ?, updated_at = datetime('now') WHERE id = ?",
                    (name, cust_id),
                )
            except sqlite3.IntegrityError as exc:
                # Previously this escaped as an unhandled error.
                raise HTTPException(
                    status_code=409, detail=f"Customer '{name}' already exists"
                ) from exc

        if body.contacts is not None:
            # Replace all contacts
            conn.execute("DELETE FROM customer_contacts WHERE customer_id = ?", (cust_id,))
            for c in body.contacts:
                conn.execute(
                    "INSERT INTO customer_contacts (customer_id, name, phone, email) VALUES (?, ?, ?, ?)",
                    (cust_id, c.name or "", c.phone or "", c.email or ""),
                )

        conn.commit()
        row = conn.execute("SELECT * FROM customers WHERE id = ?", (cust_id,)).fetchone()
        result = row_to_dict(row)
        result["contacts"] = _get_customer_contacts(conn, cust_id)
        _log_activity(conn, "updated", "customer", cust_id,
                      f"Customer '{result['name']}' updated")
        conn.commit()
        return result
    finally:
        # Closing discards anything uncommitted and releases the connection
        # on every path, including the error ones.
        conn.close()
|
|
|
|
|
|
@app.delete("/api/customers/{cust_id}", status_code=204)
def delete_customer(cust_id: int):
    """Delete a customer and log it; raise a 404 HTTPException if missing."""
    db = get_db()
    key = (cust_id,)
    if db.execute("SELECT id FROM customers WHERE id = ?", key).fetchone() is None:
        db.close()
        raise HTTPException(status_code=404, detail="Customer not found")

    db.execute("DELETE FROM customers WHERE id = ?", key)
    _log_activity(db, "deleted", "customer", cust_id, f"Customer {cust_id} deleted")
    db.commit()
    db.close()
|
|
|
|
|
|
# ─── Phase B: Locations API ──────────────────────────────────────────────────
|
|
|
|
|
|
def _get_location_rooms(conn: sqlite3.Connection, loc_id: int) -> list:
    """Return the rooms of one location, ordered by name, as a list of dicts."""
    query = "SELECT id, location_id, name, floor, created_at, updated_at FROM rooms WHERE location_id = ? ORDER BY name"
    rooms = []
    for room_row in conn.execute(query, (loc_id,)).fetchall():
        rooms.append(row_to_dict(room_row))
    return rooms
|
|
|
|
|
|
@app.post("/api/locations", status_code=201)
def create_location(body: LocationCreate):
    """Create a location and return it with an empty ``rooms`` list.

    Optional text fields default to the empty string; latitude and longitude
    are stored as given.

    Raises:
        HTTPException 422: the name is blank after stripping.
        HTTPException: whatever _validate_ref raises for an unknown customer.
    """
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=422, detail="Location name must not be empty")

    conn = get_db()
    try:
        if body.customer_id is not None:
            _validate_ref(conn, "customers", body.customer_id, "Customer")

        cursor = conn.execute(
            """INSERT INTO locations (customer_id, name, address, building_name, building_number,
                   floor, trailer_number, site_hours, access_notes, walking_directions, map_link,
                   latitude, longitude)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (body.customer_id, name, body.address or "", body.building_name or "",
             body.building_number or "", body.floor or "", body.trailer_number or "",
             body.site_hours or "", body.access_notes or "",
             body.walking_directions or "", body.map_link or "",
             body.latitude, body.longitude),
        )
        conn.commit()
        loc_id = cursor.lastrowid
        row = conn.execute("SELECT * FROM locations WHERE id = ?", (loc_id,)).fetchone()
    finally:
        # Release the connection on every path; the original left it open
        # when validation or the insert raised.
        conn.close()

    result = row_to_dict(row)
    result["rooms"] = []  # a new location has no rooms yet
    return result
|
|
|
|
|
|
@app.get("/api/locations")
def list_locations(customer_id: Optional[int] = Query(None)):
    """Return locations ordered by name, each with its ``rooms`` list.

    When ``customer_id`` is given, only that customer's locations are returned.
    """
    db = get_db()
    if customer_id is None:
        location_rows = db.execute("SELECT * FROM locations ORDER BY name").fetchall()
    else:
        location_rows = db.execute(
            "SELECT * FROM locations WHERE customer_id = ? ORDER BY name",
            (customer_id,),
        ).fetchall()

    locations = []
    for location_row in location_rows:
        entry = row_to_dict(location_row)
        entry["rooms"] = _get_location_rooms(db, location_row["id"])
        locations.append(entry)
    db.close()
    return locations
|
|
|
|
|
|
@app.get("/api/locations/{loc_id}")
def get_location(loc_id: int):
    """Return one location with its rooms, or raise a 404 HTTPException."""
    db = get_db()
    stored = db.execute("SELECT * FROM locations WHERE id = ?", (loc_id,)).fetchone()
    if stored is None:
        db.close()
        raise HTTPException(status_code=404, detail="Location not found")

    location = row_to_dict(stored)
    location["rooms"] = _get_location_rooms(db, loc_id)
    db.close()
    return location
|
|
|
|
|
|
_LOCATION_FIELDS = [
|
|
"customer_id", "name", "address", "building_name", "building_number",
|
|
"floor", "trailer_number", "site_hours", "access_notes",
|
|
"walking_directions", "map_link", "latitude", "longitude",
|
|
]
|
|
|
|
|
|
@app.put("/api/locations/{loc_id}")
def update_location(loc_id: int, body: LocationUpdate):
    """Apply a partial update to a location and return it with its rooms.

    Only fields that are not ``None`` in ``body`` are written. ``name`` is
    stripped and must stay non-empty; ``customer_id`` is checked with
    _validate_ref; latitude/longitude are stored as given.

    Raises:
        HTTPException 404: the location does not exist.
        HTTPException 422: the new name is blank after stripping.
        HTTPException: whatever _validate_ref raises for an unknown customer.
    """
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM locations WHERE id = ?", (loc_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Location not found")

        updates = {}
        for field in _LOCATION_FIELDS:
            val = getattr(body, field, None)
            if val is None:
                continue
            if field == "name":
                val = val.strip()
                if not val:
                    raise HTTPException(status_code=422, detail="Location name must not be empty")
            elif field == "customer_id":
                _validate_ref(conn, "customers", val, "Customer")
            elif field not in ("latitude", "longitude"):
                # Text columns: normalise any falsy value to "".
                val = val or ""
            updates[field] = val

        if updates:
            # Column names come from _LOCATION_FIELDS, never from the client.
            assignments = ", ".join(f"{col} = ?" for col in updates)
            conn.execute(
                f"UPDATE locations SET {assignments}, updated_at = datetime('now') WHERE id = ?",
                [*updates.values(), loc_id],
            )
            conn.commit()

        row = conn.execute("SELECT * FROM locations WHERE id = ?", (loc_id,)).fetchone()
        result = row_to_dict(row)
        result["rooms"] = _get_location_rooms(conn, loc_id)
        return result
    finally:
        # Release the connection on every path; the original left it open
        # when _validate_ref raised.
        conn.close()
|
|
|
|
|
|
@app.delete("/api/locations/{loc_id}", status_code=204)
def delete_location(loc_id: int):
    """Delete a location by id; raise a 404 HTTPException if it is missing."""
    db = get_db()
    key = (loc_id,)
    if db.execute("SELECT id FROM locations WHERE id = ?", key).fetchone() is None:
        db.close()
        raise HTTPException(status_code=404, detail="Location not found")
    db.execute("DELETE FROM locations WHERE id = ?", key)
    db.commit()
    db.close()
|
|
|
|
|
|
# ─── Phase B: Rooms API ──────────────────────────────────────────────────────
|
|
|
|
|
|
@app.get("/api/rooms")
def list_rooms(location_id: Optional[int] = Query(None)):
    """Return rooms ordered by name, optionally restricted to one location."""
    db = get_db()
    if location_id is None:
        found = db.execute("SELECT * FROM rooms ORDER BY name").fetchall()
    else:
        found = db.execute(
            "SELECT * FROM rooms WHERE location_id = ? ORDER BY name",
            (location_id,),
        ).fetchall()
    db.close()
    return [row_to_dict(room_row) for room_row in found]
|
|
|
|
|
|
@app.get("/api/rooms/{room_id}")
def get_room(room_id: int):
    """Return one room by id, or raise a 404 HTTPException."""
    db = get_db()
    lookup = db.execute("SELECT * FROM rooms WHERE id = ?", (room_id,))
    room_row = lookup.fetchone()
    db.close()
    if room_row is not None:
        return row_to_dict(room_row)
    raise HTTPException(status_code=404, detail="Room not found")
|
|
|
|
|
|
@app.post("/api/rooms", status_code=201)
def create_room(body: RoomCreate):
    """Create a room inside an existing location and return the stored row.

    Raises:
        HTTPException 422: the name is blank after stripping (same rule as
            create_location applies to location names).
        HTTPException: whatever _validate_ref raises for an unknown location.
    """
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=422, detail="Room name must not be empty")

    conn = get_db()
    try:
        _validate_ref(conn, "locations", body.location_id, "Location")
        cursor = conn.execute(
            "INSERT INTO rooms (location_id, name, floor) VALUES (?, ?, ?)",
            (body.location_id, name, body.floor or ""),
        )
        conn.commit()
        row = conn.execute(
            "SELECT * FROM rooms WHERE id = ?", (cursor.lastrowid,)
        ).fetchone()
    finally:
        # Release the connection on every path; the original left it open
        # when _validate_ref raised.
        conn.close()
    return row_to_dict(row)
|
|
|
|
|
|
@app.put("/api/rooms/{room_id}")
def update_room(room_id: int, body: RoomUpdate):
    """Apply a partial update to a room and return the stored row.

    Only fields that are not ``None`` in ``body`` are written.

    Raises:
        HTTPException 404: the room does not exist.
        HTTPException 422: the new name is blank after stripping.
        HTTPException: whatever _validate_ref raises for an unknown location.
    """
    conn = get_db()
    try:
        existing = conn.execute("SELECT id FROM rooms WHERE id = ?", (room_id,)).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Room not found")

        updates = {}
        if body.name is not None:
            name = body.name.strip()
            if not name:
                # Previously a blank name was silently stored.
                raise HTTPException(status_code=422, detail="Room name must not be empty")
            updates["name"] = name
        if body.floor is not None:
            updates["floor"] = body.floor
        if body.location_id is not None:
            _validate_ref(conn, "locations", body.location_id, "Location")
            updates["location_id"] = body.location_id

        if updates:
            # Column names are the fixed keys assigned above.
            assignments = ", ".join(f"{col} = ?" for col in updates)
            conn.execute(
                f"UPDATE rooms SET {assignments}, updated_at = datetime('now') WHERE id = ?",
                [*updates.values(), room_id],
            )
            conn.commit()

        row = conn.execute("SELECT * FROM rooms WHERE id = ?", (room_id,)).fetchone()
        return row_to_dict(row)
    finally:
        # Release the connection on every path, including validation errors.
        conn.close()
|
|
|
|
|
|
@app.delete("/api/rooms/{room_id}", status_code=204)
def delete_room(room_id: int):
    """Delete a room by id; raise a 404 HTTPException if it is missing."""
    db = get_db()
    key = (room_id,)
    if db.execute("SELECT id FROM rooms WHERE id = ?", key).fetchone() is None:
        db.close()
        raise HTTPException(status_code=404, detail="Room not found")
    db.execute("DELETE FROM rooms WHERE id = ?", key)
    db.commit()
    db.close()
|
|
|
|
|
|
# ─── Phase B: Settings API ───────────────────────────────────────────────────
|
|
|
|
|
|
_SETTINGS_SCHEMAS = {
|
|
"categories": {"table": "categories", "columns": ["name", "icon"]},
|
|
"makes": {"table": "makes", "columns": ["name"]},
|
|
"models": {"table": "models", "columns": ["make_id", "name", "icon_path"]},
|
|
"key_names": {"table": "key_names", "columns": ["name"]},
|
|
"key_types": {"table": "key_types", "columns": ["name"]},
|
|
"badge_types": {"table": "badge_types", "columns": ["name"]},
|
|
}
|
|
|
|
|
|
def _settings_list(conn: sqlite3.Connection, table: str):
    """Return every row of a settings table, ordered by name.

    Args:
        conn: open database connection.
        table: name of a settings table; must be one of the tables declared
            in _SETTINGS_SCHEMAS.

    Raises:
        HTTPException 404: ``table`` is not a known settings table.
    """
    # ``table`` is interpolated into the SQL text and can originate from a
    # request parameter, so accept only the known table names.
    known_tables = {schema["table"] for schema in _SETTINGS_SCHEMAS.values()}
    if table not in known_tables:
        raise HTTPException(status_code=404, detail=f"Unknown settings entity '{table}'")
    rows = conn.execute(f"SELECT * FROM {table} ORDER BY name").fetchall()
    return [row_to_dict(r) for r in rows]
|
|
|
|
|
|
def _settings_create(conn: sqlite3.Connection, entity: str, body: dict):
    """Insert one settings row for ``entity`` and return it as a dict.

    Column values are read from ``body``; missing or null columns are stored
    as the empty string, except ``make_id`` which is required.

    Raises:
        HTTPException 422: ``body`` is not a JSON object, ``make_id`` is
            missing, or the row references a record that does not exist.
        HTTPException 409: the row violates another integrity constraint
            (reported as "already exists").
    """
    if not isinstance(body, dict):
        # The routes pass raw parsed JSON, which may be a list or a scalar.
        raise HTTPException(status_code=422, detail="Request body must be a JSON object")

    schema = _SETTINGS_SCHEMAS[entity]
    table = schema["table"]
    columns = schema["columns"]

    values = []
    for col in columns:
        val = body.get(col)
        if col == "make_id" and val is None:
            raise HTTPException(status_code=422, detail="make_id is required for models")
        values.append(val if val is not None else "")

    placeholders = ", ".join(["?"] * len(columns))
    col_names = ", ".join(columns)
    try:
        cursor = conn.execute(
            f"INSERT INTO {table} ({col_names}) VALUES ({placeholders})", values
        )
        conn.commit()
    except sqlite3.IntegrityError as exc:
        # SQLite reports a broken reference as "FOREIGN KEY constraint
        # failed"; that is not a duplicate, so do not call it one.
        if "FOREIGN KEY" in str(exc):
            raise HTTPException(
                status_code=422, detail=f"Referenced record does not exist for {entity}"
            ) from exc
        raise HTTPException(
            status_code=409, detail=f"Entry already exists in {entity}"
        ) from exc

    row = conn.execute(
        f"SELECT * FROM {table} WHERE id = ?", (cursor.lastrowid,)
    ).fetchone()
    return row_to_dict(row)
|
|
|
|
|
|
def _settings_update(conn: sqlite3.Connection, entity: str, eid: int, body: dict):
    """Update the schema columns present in ``body`` and return the row.

    Raises:
        HTTPException 404: no row of ``entity`` has id ``eid``.
        HTTPException 422: ``body`` is not a JSON object.
        HTTPException 409: the change violates an integrity constraint.
    """
    if not isinstance(body, dict):
        raise HTTPException(status_code=422, detail="Request body must be a JSON object")

    schema = _SETTINGS_SCHEMAS[entity]
    table = schema["table"]
    columns = schema["columns"]

    existing = conn.execute(f"SELECT id FROM {table} WHERE id = ?", (eid,)).fetchone()
    if existing is None:
        # Plain entity[:-1] turned "categories" into "categorie".
        singular = entity[:-3] + "y" if entity.endswith("ies") else entity[:-1]
        raise HTTPException(status_code=404, detail=f"{singular} not found")

    # Only whitelisted schema columns are ever written.
    updates = {col: body[col] for col in columns if col in body}

    if updates:
        set_clause = ", ".join(f"{k} = ?" for k in updates)
        try:
            conn.execute(
                f"UPDATE {table} SET {set_clause} WHERE id = ?",
                [*updates.values(), eid],
            )
            conn.commit()
        except sqlite3.IntegrityError as exc:
            # Mirror _settings_create instead of surfacing a server error.
            raise HTTPException(
                status_code=409, detail=f"Entry already exists in {entity}"
            ) from exc

    row = conn.execute(f"SELECT * FROM {table} WHERE id = ?", (eid,)).fetchone()
    return row_to_dict(row)
|
|
|
|
|
|
def _settings_get(conn: sqlite3.Connection, entity: str, eid: int):
    """Get a single settings item by id.

    Returns the row as a dict.

    Raises:
        HTTPException 404: no row of ``entity`` has id ``eid``.
    """
    table = _SETTINGS_SCHEMAS[entity]["table"]
    row = conn.execute(f"SELECT * FROM {table} WHERE id = ?", (eid,)).fetchone()
    if row is None:
        # Plain entity[:-1] turned "categories" into "categorie".
        singular = entity[:-3] + "y" if entity.endswith("ies") else entity[:-1]
        raise HTTPException(status_code=404, detail=f"{singular} not found")
    return row_to_dict(row)
|
|
|
|
|
|
def _settings_delete(conn: sqlite3.Connection, entity: str, eid: int):
    """Delete a single settings item by id and commit.

    Raises:
        HTTPException 404: no row of ``entity`` has id ``eid``.
    """
    table = _SETTINGS_SCHEMAS[entity]["table"]
    existing = conn.execute(f"SELECT id FROM {table} WHERE id = ?", (eid,)).fetchone()
    if existing is None:
        # Plain entity[:-1] turned "categories" into "categorie".
        singular = entity[:-3] + "y" if entity.endswith("ies") else entity[:-1]
        raise HTTPException(status_code=404, detail=f"{singular} not found")
    conn.execute(f"DELETE FROM {table} WHERE id = ?", (eid,))
    conn.commit()
|
|
|
|
|
|
# ─── Dynamic settings routes ─────────────────────────────────────────────────
|
|
# Register CRUD for each settings entity. Note: these use raw request body
|
|
# to avoid Pydantic model explosion — the schemas are validated by
|
|
# _SETTINGS_SCHEMAS above.
|
|
|
|
_SETTINGS_ENTITIES = ["categories", "makes", "models", "key_names", "key_types", "badge_types"]


def _register_settings_routes(entity: str) -> None:
    """Register list/get/create/update/delete routes for one settings entity.

    ``entity`` is captured by closure rather than passed as a handler default
    argument. A default argument on a FastAPI handler becomes an optional
    query parameter, which let clients override the entity (and, through
    _settings_list, the table name) with ``?entity=...``.
    """

    @app.get(f"/api/settings/{entity}", name=f"list_{entity}")
    def _list():
        conn = get_db()
        try:
            return _settings_list(conn, entity)
        finally:
            conn.close()

    @app.get(f"/api/settings/{entity}/{{eid}}", name=f"get_{entity}")
    def _get(eid: int):
        conn = get_db()
        try:
            return _settings_get(conn, entity, eid)
        finally:
            conn.close()

    @app.post(f"/api/settings/{entity}", status_code=201, name=f"create_{entity}")
    async def _create(request: Request):
        try:
            body = await request.json()
        except ValueError as exc:
            # Malformed JSON is a client error, not a server error.
            raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
        conn = get_db()
        try:
            return _settings_create(conn, entity, body)
        finally:
            conn.close()

    @app.put(f"/api/settings/{entity}/{{eid}}", name=f"update_{entity}")
    async def _update(eid: int, request: Request):
        try:
            body = await request.json()
        except ValueError as exc:
            raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
        conn = get_db()
        try:
            return _settings_update(conn, entity, eid, body)
        finally:
            conn.close()

    @app.delete(f"/api/settings/{entity}/{{eid}}", status_code=204, name=f"delete_{entity}")
    def _delete(eid: int):
        conn = get_db()
        try:
            _settings_delete(conn, entity, eid)
        finally:
            # try/finally closes on every exception type, not only HTTPException.
            conn.close()


for _ent in _SETTINGS_ENTITIES:
    _register_settings_routes(_ent)
|
|
|
|
|
|
# ─── Phase C: Helpers ────────────────────────────────────────────────────────
|
|
|
|
# Roles a user account may hold; create_user and update_user reject others.
VALID_ROLES = {
    "admin",
    "technician",
    "readonly",
}
|
|
|
|
|
|
def _hash_password(password: str) -> str:
|
|
"""Simple SHA-256 password hashing."""
|
|
return hashlib.sha256(password.encode()).hexdigest()
|
|
|
|
|
|
def _generate_token() -> str:
|
|
"""Generate a random session token."""
|
|
return secrets.token_hex(32)
|
|
|
|
|
|
def _log_activity(conn: sqlite3.Connection, action: str, entity_type: str,
|
|
entity_id: int, details: str = "", user_id: int = None):
|
|
"""Insert an activity log entry."""
|
|
conn.execute(
|
|
"""INSERT INTO activity_log (user_id, action, entity_type, entity_id, details)
|
|
VALUES (?, ?, ?, ?, ?)""",
|
|
(user_id, action, entity_type, entity_id, details),
|
|
)
|
|
|
|
|
|
def _user_to_dict(row: sqlite3.Row) -> dict:
    """Turn a user row into a dict that never carries ``password_hash``."""
    user = row_to_dict(row)
    if "password_hash" in user:
        del user["password_hash"]
    return user
|
|
|
|
|
|
# ─── Phase C: Pydantic Models ────────────────────────────────────────────────
|
|
|
|
|
|
class UserCreate(BaseModel):
    """Request body accepted by create_user (POST /api/users)."""

    # create_user strips this and rejects it when empty.
    username: str
    # create_user rejects an empty value and stores only its hash.
    password: str
    # create_user falls back to "technician" and checks it against VALID_ROLES.
    role: Optional[str] = "technician"
|
|
|
|
|
|
class UserUpdate(BaseModel):
    """Request body accepted by update_user (PUT /api/users/{user_id}).

    A field left as ``None`` is not changed by update_user.
    """

    # New role; update_user checks it against VALID_ROLES.
    role: Optional[str] = None
    # New password; update_user stores only its hash.
    password: Optional[str] = None
|
|
|
|
|
|
class LoginRequest(BaseModel):
    """Username/password pair.

    NOTE(review): presumably the body of a login endpoint; the handler that
    consumes it is not visible in this section — confirm.
    """

    username: str
    password: str
|
|
|
|
|
|
class GeofenceCreate(BaseModel):
    """Fields for creating a geofence.

    NOTE(review): the handler that consumes this model is not visible in this
    section, so the comments below restate only what the declarations show.
    """

    name: str
    # Untyped list; the element format is not declared here — TODO confirm
    # against the geofence handler.
    points: list
    # Defaults to "#3388ff" when omitted.
    color: Optional[str] = "#3388ff"
    # Optional list of integer user ids; None when omitted.
    user_ids: Optional[list[int]] = None
|
|
|
|
|
|
class GeofenceUpdate(BaseModel):
    """Fields for updating a geofence; every field is optional.

    NOTE(review): presumably ``None`` means "leave unchanged", as in the other
    update models in this file — confirm against the geofence handler, which
    is not visible in this section.
    """

    name: Optional[str] = None
    # Untyped list; the element format is not declared here — TODO confirm.
    points: Optional[list] = None
    color: Optional[str] = None
    user_ids: Optional[list[int]] = None
|
|
|
|
|
|
class GeofencePointCheck(BaseModel):
    """A single coordinate pair.

    NOTE(review): presumably the point to test against geofences; the handler
    that consumes it is not visible in this section — confirm.
    """

    lat: float
    lng: float
|
|
|
|
|
|
# ─── Phase C: Users API ──────────────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/users", status_code=201)
def create_user(body: UserCreate):
    """Create a user account and return it without the password hash.

    Raises:
        HTTPException 422: blank username, empty password, or unknown role.
        HTTPException 409: the insert hits an integrity constraint.
    """
    username = body.username.strip()
    if not username:
        raise HTTPException(status_code=422, detail="Username must not be empty")

    password = body.password
    if not password:
        raise HTTPException(status_code=422, detail="Password must not be empty")

    role = body.role or "technician"
    if role not in VALID_ROLES:
        allowed = ', '.join(sorted(VALID_ROLES))
        raise HTTPException(
            status_code=422,
            detail=f"Invalid role '{role}'. Must be one of: {allowed}",
        )

    db = get_db()
    new_user = (username, _hash_password(password), role)
    try:
        inserted = db.execute(
            "INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
            new_user,
        )
        db.commit()
        uid = inserted.lastrowid
        _log_activity(db, "created", "user", uid, f"User '{username}' created")
        db.commit()
    except sqlite3.IntegrityError:
        db.close()
        raise HTTPException(status_code=409, detail=f"User '{username}' already exists")

    stored = db.execute("SELECT * FROM users WHERE id = ?", (uid,)).fetchone()
    db.close()
    return _user_to_dict(stored)
|
|
|
|
|
|
@app.get("/api/users")
def list_users():
    """Return all users, ordered by username, as serialised dicts."""
    db = get_db()
    users = db.execute("SELECT * FROM users ORDER BY username").fetchall()
    db.close()
    return list(map(_user_to_dict, users))
|
|
|
|
|
|
@app.get("/api/users/{user_id}")
def get_user(user_id: int):
    """Return one user by id, or respond 404 when no such row exists."""
    db = get_db()
    user = db.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    # The row is already fetched, so the connection can be released up front.
    db.close()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return _user_to_dict(user)
|
|
|
|
|
|
@app.put("/api/users/{user_id}")
def update_user(user_id: int, body: UserUpdate):
    """Update a user's role and/or password and return the refreshed row.

    Fields left as None are not touched.

    Raises:
        HTTPException 404: no user with this id.
        HTTPException 422: role outside VALID_ROLES, or a blank password.
    """
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT * FROM users WHERE id = ?", (user_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="User not found")

        # Validate everything before writing anything.
        if body.role is not None and body.role not in VALID_ROLES:
            raise HTTPException(
                status_code=422,
                detail=f"Invalid role '{body.role}'. Must be one of: {', '.join(sorted(VALID_ROLES))}",
            )
        if body.password is not None and not body.password.strip():
            raise HTTPException(status_code=422, detail="Password must not be empty")

        if body.role is not None:
            conn.execute("UPDATE users SET role = ? WHERE id = ?", (body.role, user_id))

        if body.password is not None:
            # Hash the password exactly as submitted. create_user and login
            # both hash the raw value, so stripping here would store a hash
            # the user could never reproduce at login.
            password_hash = _hash_password(body.password)
            conn.execute(
                "UPDATE users SET password_hash = ? WHERE id = ?",
                (password_hash, user_id),
            )

        conn.commit()
        row = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
        return _user_to_dict(row)
    finally:
        conn.close()
|
|
|
|
|
|
@app.delete("/api/users/{user_id}", status_code=204)
def delete_user(user_id: int):
    """Delete a user by id; respond 404 when the id is unknown."""
    db = get_db()
    found = db.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
    if found is not None:
        db.execute("DELETE FROM users WHERE id = ?", (user_id,))
        db.commit()
        db.close()
        return
    db.close()
    raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
|
|
# ─── User's assigned geofences (service areas) ──────────────────────────
|
|
|
|
|
|
@app.get("/api/users/{user_id}/geofences")
def list_user_geofences(user_id: int):
    """Return the geofences (service areas) assigned to a user, by name.

    Responds 404 when the user does not exist.
    """
    db = get_db()
    user = db.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
    if user is None:
        db.close()
        raise HTTPException(status_code=404, detail="User not found")

    query = """SELECT g.* FROM geofences g
               JOIN geofence_users gu ON gu.geofence_id = g.id
               WHERE gu.user_id = ? ORDER BY g.name"""
    fences = db.execute(query, (user_id,)).fetchall()
    # _geofence_row receives the open connection, so serialise before closing.
    payload = [_geofence_row(fence, db) for fence in fences]
    db.close()
    return payload
|
|
|
|
|
|
# ─── Phase C: Auth API ───────────────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/auth/login")
def login(body: LoginRequest):
    """Authenticate a user and open a session.

    Returns the serialised user with an added "token" key. The token is
    also stored in the sessions table.

    Raises:
        HTTPException 401: unknown username or wrong password (same message
            for both, so the response does not reveal which one failed).
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT * FROM users WHERE username = ?", (body.username,)
        ).fetchone()

        # Hash the candidate even for unknown users so both failure paths do
        # comparable work.
        candidate = str(_hash_password(body.password)).encode("utf-8")
        stored = str(row["password_hash"]).encode("utf-8") if row is not None else b""

        # Constant-time comparison avoids leaking how much of the hash matched.
        if row is None or not secrets.compare_digest(candidate, stored):
            raise HTTPException(status_code=401, detail="Invalid username or password")

        token = _generate_token()
        conn.execute(
            "INSERT INTO sessions (user_id, token) VALUES (?, ?)",
            (row["id"], token),
        )
        conn.commit()
    finally:
        conn.close()

    result = _user_to_dict(row)
    result["token"] = token
    return result
|
|
|
|
|
|
@app.get("/api/auth/me")
def auth_me(request: Request):
    """Resolve the bearer token in the Authorization header to its user.

    Responds 401 when the header is missing or malformed, or when the token
    matches no session row.
    """
    scheme = "Bearer "
    header_value = request.headers.get("Authorization", "")
    if not header_value.startswith(scheme):
        raise HTTPException(status_code=401, detail="Missing or invalid Authorization header")

    token = header_value[len(scheme):]
    db = get_db()
    user = db.execute(
        "SELECT u.* FROM users u JOIN sessions s ON u.id = s.user_id WHERE s.token = ?",
        (token,),
    ).fetchone()
    db.close()

    if user is None:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return _user_to_dict(user)
|
|
|
|
|
|
# ─── Phase C: Geofences API ──────────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/geofences", status_code=201)
def create_geofence(body: GeofenceCreate):
    """Insert a geofence, optionally assign users, and return the new row.

    Responds 422 when assigning the supplied user ids fails.
    """
    db = get_db()
    serialized_points = _json.dumps(body.points)
    inserted = db.execute(
        "INSERT INTO geofences (name, points, color) VALUES (?, ?, ?)",
        (body.name, serialized_points, body.color or "#3388ff"),
    )
    gid = inserted.lastrowid

    # Only touch the assignment table when at least one user id was given.
    if body.user_ids:
        try:
            _sync_geofence_users(db, gid, body.user_ids)
        except Exception:
            # Closing without commit discards the pending geofence insert.
            db.close()
            raise HTTPException(status_code=422, detail="One or more user IDs are invalid")

    _log_activity(db, "created", "geofence", gid, f"Geofence '{body.name}' created")
    db.commit()

    created = db.execute("SELECT * FROM geofences WHERE id = ?", (gid,)).fetchone()
    payload = _geofence_row(created, db)
    db.close()
    return payload
|
|
|
|
|
|
@app.get("/api/geofences")
def list_geofences():
    """Return every geofence, ordered by name."""
    db = get_db()
    payload = []
    for fence in db.execute("SELECT * FROM geofences ORDER BY name").fetchall():
        # _geofence_row needs the live connection, so build rows before close.
        payload.append(_geofence_row(fence, db))
    db.close()
    return payload
|
|
|
|
|
|
@app.put("/api/geofences/{geofence_id}")
def update_geofence(geofence_id: int, body: GeofenceUpdate):
    """Apply a partial update to a geofence and return the refreshed row.

    Column changes and the user re-assignment are committed together, so a
    failed user sync leaves the geofence unchanged.

    Raises:
        HTTPException 404: no geofence with this id.
        HTTPException 422: assigning the supplied user ids failed (the same
            response create_geofence gives).
    """
    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM geofences WHERE id = ?", (geofence_id,)
        ).fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Geofence not found")

        # Column names are fixed literals; only values come from the request.
        columns = {}
        if body.name is not None:
            columns["name"] = body.name
        if body.points is not None:
            columns["points"] = _json.dumps(body.points)
        if body.color is not None:
            columns["color"] = body.color

        if columns:
            assignments = ", ".join(f"{col} = ?" for col in columns)
            conn.execute(
                f"UPDATE geofences SET {assignments}, updated_at = datetime('now') WHERE id = ?",
                [*columns.values(), geofence_id],
            )

        # An empty list is meaningful here: it is still passed to the sync.
        if body.user_ids is not None:
            try:
                _sync_geofence_users(conn, geofence_id, body.user_ids)
            except Exception as exc:
                raise HTTPException(
                    status_code=422, detail="One or more user IDs are invalid"
                ) from exc

        conn.commit()

        row = conn.execute(
            "SELECT * FROM geofences WHERE id = ?", (geofence_id,)
        ).fetchone()
        return _geofence_row(row, conn)
    finally:
        # Closing with uncommitted work discards it, which gives the rollback
        # on the error paths above.
        conn.close()
|
|
|
|
|
|
@app.delete("/api/geofences/{geofence_id}", status_code=204)
def delete_geofence(geofence_id: int):
    """Delete a geofence by id; respond 404 when the id is unknown."""
    db = get_db()
    found = db.execute(
        "SELECT id FROM geofences WHERE id = ?", (geofence_id,)
    ).fetchone()
    if found is not None:
        db.execute("DELETE FROM geofences WHERE id = ?", (geofence_id,))
        db.commit()
        db.close()
        return
    db.close()
    raise HTTPException(status_code=404, detail="Geofence not found")
|
|
|
|
|
|
# ─── Phase 0: Proximity & Geofence Check ─────────────────────────────────────
|
|
|
|
|
|
def _point_in_polygon(lat: float, lng: float, polygon: list) -> bool:
    """Return True when (lat, lng) lies inside ``polygon`` (ray casting).

    Each vertex may be a dict with "lat"/"lng" keys or an indexable
    (lat, lng) pair. Polygons with fewer than three vertices never contain
    a point.
    """
    vertex_count = len(polygon)
    if vertex_count < 3:
        return False

    def _coords(vertex):
        # Normalise either vertex shape to a (lat, lng) tuple.
        if isinstance(vertex, dict):
            return vertex["lat"], vertex["lng"]
        return vertex[0], vertex[1]

    crossings = 0
    # Start with the closing edge: last vertex -> first vertex.
    prev_lat, prev_lng = _coords(polygon[vertex_count - 1])
    for idx in range(vertex_count):
        cur_lat, cur_lng = _coords(polygon[idx])
        # The edge matters only if it straddles the point's latitude, which
        # also guarantees prev_lat != cur_lat in the division below.
        straddles = (cur_lat > lat) != (prev_lat > lat)
        if straddles and lng < (prev_lng - cur_lng) * (lat - cur_lat) / (prev_lat - cur_lat) + cur_lng:
            crossings += 1
        prev_lat, prev_lng = cur_lat, cur_lng

    # An odd number of crossings means the point is inside.
    return crossings % 2 == 1
|
|
|
|
|
|
@app.get("/api/proximity")
def proximity_check(
    lat: float = Query(...),
    lng: float = Query(...),
    radius_meters: int = Query(200, ge=1, le=50000),
):
    """Return up to 50 assets within ``radius_meters`` of (lat, lng), nearest first.

    Distance is the great-circle distance on a sphere of radius 6,371,000 m
    (spherical law of cosines), computed in SQL. Assets without coordinates
    are ignored. Each result carries a ``distance_meters`` field.
    """
    conn = get_db()
    try:
        rows = conn.execute(
            """
            SELECT * FROM (
                SELECT *, (
                    6371000 * acos(
                        -- Rounding can push the cosine just outside [-1, 1]
                        -- for (nearly) coincident points; acos would then
                        -- fail and the closest asset would be dropped.
                        min(1.0, max(-1.0,
                            cos(radians(?)) * cos(radians(latitude)) *
                            cos(radians(longitude) - radians(?)) +
                            sin(radians(?)) * sin(radians(latitude))
                        ))
                    )
                ) AS distance_meters
                FROM assets
                WHERE latitude IS NOT NULL AND longitude IS NOT NULL
            )
            WHERE distance_meters <= ?
            ORDER BY distance_meters
            LIMIT 50
            """,
            (lat, lng, lat, radius_meters),
        ).fetchall()
    finally:
        conn.close()

    return [row_to_dict(r) for r in rows]
|
|
|
|
|
|
@app.post("/api/geofences/check")
def check_geofence_point(body: GeofencePointCheck):
    """Return the geofences (ordered by name) whose polygon contains the point."""
    db = get_db()
    fences = db.execute("SELECT * FROM geofences ORDER BY name").fetchall()
    db.close()

    # points is stored as a JSON string; decode it per row before testing.
    return [
        row_to_dict(fence)
        for fence in fences
        if _point_in_polygon(body.lat, body.lng, _json.loads(fence["points"]))
    ]
|
|
|
|
|
|
# ─── Phase C: Visits API & Auto-visit Logging ────────────────────────────────
|
|
|
|
|
|
def _auto_log_visit(conn: sqlite3.Connection, user_id: int, asset_id: int,
                    checkin_time: str):
    """Create or extend the visit record for a user's check-ins at an asset.

    * No user id: nothing happens.
    * A visit already exists for (user, asset): the most recent one gets its
      checkout_time set to ``checkin_time``.
    * Otherwise, once the user has at least two check-ins at the asset, a
      visit is inserted spanning the two most recent ones.

    Nothing is committed here; the caller owns the transaction.
    NOTE(review): no time window is applied when pairing check-ins.
    """
    if user_id is None:
        return

    latest_visit = conn.execute(
        """SELECT id, checkin_time FROM visits
           WHERE user_id = ? AND asset_id = ?
           ORDER BY checkin_time DESC LIMIT 1""",
        (user_id, asset_id),
    ).fetchone()

    if latest_visit is not None:
        # Extend the existing visit up to this check-in.
        conn.execute(
            "UPDATE visits SET checkout_time = ? WHERE id = ?",
            (checkin_time, latest_visit["id"]),
        )
        return

    recent = conn.execute(
        """SELECT id, created_at FROM checkins
           WHERE user_id = ? AND asset_id = ?
           ORDER BY created_at DESC LIMIT 2""",
        (user_id, asset_id),
    ).fetchall()
    if len(recent) < 2:
        return

    # Rows are newest-first: the older one opens the visit, the newer closes it.
    newest, oldest = recent[0], recent[-1]
    conn.execute(
        """INSERT INTO visits (user_id, asset_id, checkin_time, checkout_time)
           VALUES (?, ?, ?, ?)""",
        (user_id, asset_id, oldest["created_at"], newest["created_at"]),
    )
|
|
|
|
|
|
@app.post("/api/visits", status_code=201)
def create_visit(body: VisitCreate):
    """Record a visit for an asset and return the stored row.

    Check-in and check-out are both stamped with the database's current
    time; a missing duration is stored as 0. Responds 404 when the asset
    does not exist.
    """
    db = get_db()
    asset = db.execute("SELECT id FROM assets WHERE id = ?", (body.asset_id,)).fetchone()
    if asset is None:
        db.close()
        raise HTTPException(status_code=404, detail="Asset not found")

    minutes = body.duration_minutes or 0
    inserted = db.execute(
        """INSERT INTO visits (user_id, asset_id, checkin_time, checkout_time, duration_minutes)
           VALUES (?, ?, datetime('now'), datetime('now'), ?)""",
        (body.user_id, body.asset_id, minutes),
    )
    db.commit()
    visit_id = inserted.lastrowid

    # The activity entry is committed separately, after the visit itself.
    _log_activity(
        db,
        "created",
        "visit",
        visit_id,
        f"Visit logged for asset {body.asset_id}",
        user_id=body.user_id,
    )
    db.commit()

    stored = db.execute("SELECT * FROM visits WHERE id = ?", (visit_id,)).fetchone()
    db.close()
    return row_to_dict(stored)
|
|
|
|
|
|
@app.get("/api/visits")
def list_visits(
    asset_id: Optional[int] = Query(None),
    user_id: Optional[int] = Query(None),
    date_from: Optional[str] = Query(None),
    date_to: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List visits, newest check-in first, with asset and user names joined in.

    All filters are optional and ANDed together. ``date_to`` is made
    inclusive by appending " 23:59:59" before comparing.
    """
    clauses = []
    params = []

    def _filter(clause, value):
        # Keep each SQL fragment paired with its bound value.
        clauses.append(clause)
        params.append(value)

    if asset_id is not None:
        _filter("v.asset_id = ?", asset_id)
    if user_id is not None:
        _filter("v.user_id = ?", user_id)
    if date_from:
        _filter("v.checkin_time >= ?", date_from)
    if date_to:
        _filter("v.checkin_time <= ?", date_to + " 23:59:59")

    sql = """SELECT v.*, a.name AS asset_name, a.machine_id,
                    u.username AS user_name
             FROM visits v
             LEFT JOIN assets a ON v.asset_id = a.id
             LEFT JOIN users u ON v.user_id = u.id"""
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY v.checkin_time DESC LIMIT ? OFFSET ?"
    params += [limit, offset]

    db = get_db()
    visits = db.execute(sql, params).fetchall()
    db.close()
    return list(map(row_to_dict, visits))
|
|
|
|
|
|
@app.get("/api/visits/stats")
def get_visit_stats():
    """Return aggregate visit statistics.

    The response holds the total visit count, visit counts per asset (most
    visited first), and per-user visit counts with total minutes on site
    (largest total first). Minutes come from checkout minus checkin;
    visits missing either timestamp contribute 0.
    """
    db = get_db()
    total_visits = db.execute("SELECT COUNT(*) FROM visits").fetchone()[0]

    asset_rows = db.execute(
        """SELECT a.name, a.machine_id, COUNT(*) AS cnt
           FROM visits v JOIN assets a ON v.asset_id = a.id
           GROUP BY v.asset_id ORDER BY cnt DESC"""
    ).fetchall()

    user_rows = db.execute(
        """SELECT u.username, COUNT(*) AS visit_count,
                  SUM(
                      CASE WHEN v.checkout_time IS NOT NULL AND v.checkin_time IS NOT NULL
                           THEN (julianday(v.checkout_time) - julianday(v.checkin_time)) * 1440
                           ELSE 0 END
                  ) AS total_minutes
           FROM visits v JOIN users u ON v.user_id = u.id
           GROUP BY v.user_id ORDER BY total_minutes DESC"""
    ).fetchall()
    db.close()

    visits_per_asset = []
    for entry in asset_rows:
        visits_per_asset.append(
            {"name": entry["name"], "machine_id": entry["machine_id"], "count": entry["cnt"]}
        )

    time_on_site = []
    for entry in user_rows:
        time_on_site.append(
            {
                "username": entry["username"],
                "visit_count": entry["visit_count"],
                # julianday differences are in days; * 1440 gives minutes.
                "total_minutes": round(entry["total_minutes"] or 0, 1),
            }
        )

    return {
        "total_visits": total_visits,
        "visits_per_asset": visits_per_asset,
        "time_on_site": time_on_site,
    }
|
|
|
|
|
|
# ─── Phase C: Activity Feed API ──────────────────────────────────────────────
|
|
|
|
|
|
@app.get("/api/activity")
def list_activity(
    user_id: Optional[int] = Query(None),
    entity_type: Optional[str] = Query(None),
    date_from: Optional[str] = Query(None),
    date_to: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List activity-log entries, newest first, with the acting username joined in.

    All filters are optional and ANDed together. ``date_to`` is made
    inclusive by appending " 23:59:59" before comparing.
    """
    clauses = []
    params = []

    def _filter(clause, value):
        # Keep each SQL fragment paired with its bound value.
        clauses.append(clause)
        params.append(value)

    if user_id is not None:
        _filter("a.user_id = ?", user_id)
    if entity_type is not None:
        _filter("a.entity_type = ?", entity_type)
    if date_from:
        _filter("a.created_at >= ?", date_from)
    if date_to:
        _filter("a.created_at <= ?", date_to + " 23:59:59")

    sql = """SELECT a.*, u.username AS user_name
             FROM activity_log a
             LEFT JOIN users u ON a.user_id = u.id"""
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    # id breaks ties between entries created within the same second.
    sql += " ORDER BY a.created_at DESC, a.id DESC LIMIT ? OFFSET ?"
    params += [limit, offset]

    db = get_db()
    entries = db.execute(sql, params).fetchall()
    db.close()
    return list(map(row_to_dict, entries))
|
|
|
|
|
|
# ─── Phase C: Export Endpoints Extension ─────────────────────────────────────
|
|
|
|
|
|
@app.get("/api/export/service-summary")
def export_service_summary_csv():
    """Stream a CSV with one row per customer/location pair.

    Columns: customer_name, location_name, asset_count, last_checkin
    (empty when the customer's assets have no check-ins).
    """
    conn = get_db()
    try:
        rows = conn.execute(
            # DISTINCT is required: joining checkins repeats each asset once
            # per check-in, which would otherwise inflate asset_count.
            # NOTE(review): assets are joined by customer only, so every
            # location of a customer reports that customer's totals — confirm
            # whether assets should also be matched to the location.
            """SELECT c.name AS customer_name, l.name AS location_name,
                      COUNT(DISTINCT a.id) AS asset_count,
                      MAX(ck.created_at) AS last_checkin
               FROM customers c
               LEFT JOIN locations l ON l.customer_id = c.id
               LEFT JOIN assets a ON a.customer_id = c.id
               LEFT JOIN checkins ck ON ck.asset_id = a.id
               GROUP BY c.id, l.id
               ORDER BY c.name, l.name"""
        ).fetchall()
    finally:
        conn.close()

    fieldnames = ["customer_name", "location_name", "asset_count", "last_checkin"]

    def _gen():
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=fieldnames)

        def _drain():
            # Hand back what has been written so far and reset the buffer.
            chunk = buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)
            return chunk

        writer.writeheader()
        yield _drain()
        for row in rows:
            record = row_to_dict(row)
            record["last_checkin"] = record["last_checkin"] or ""
            writer.writerow(record)
            yield _drain()

    return StreamingResponse(
        _gen(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=service_summary.csv"},
    )
|
|
|
|
|
|
|
|
|
|
# ─── File Uploads ───────────────────────────────────────────────────────────
|
|
|
|
# Upload limits, in bytes.
ICON_MAX_SIZE = 2 * 1024 ** 2  # 2 MB
PHOTO_MAX_SIZE = 10 * 1024 ** 2  # 10 MB

# Accepted (lower-cased) file extensions, including the leading dot.
PHOTO_ALLOWED_EXTS = {".png", ".jpg", ".jpeg"}
# Icons accept everything photos do, plus SVG.
ICON_ALLOWED_EXTS = PHOTO_ALLOWED_EXTS | {".svg"}
|
|
|
|
|
|
def _save_upload(upload: UploadFile, subdir: str, allowed_exts: set, max_size: int) -> str:
    """Save an uploaded file to uploads/{subdir}/ under a random UUID name.

    Args:
        upload: The incoming file.
        subdir: Sub-directory of UPLOADS_DIR to store into (created on demand).
        allowed_exts: Accepted lower-case extensions, including the dot.
        max_size: Maximum accepted size in bytes.

    Returns:
        The relative URL path, e.g. /uploads/icons/abc123.png.

    Raises:
        HTTPException 400: missing or disallowed file extension.
        HTTPException 413: the file exceeds ``max_size``.
    """
    ext = Path(upload.filename or "").suffix.lower()
    if not ext or ext not in allowed_exts:
        allowed = ", ".join(sorted(allowed_exts))
        raise HTTPException(400, f"Invalid file type. Allowed: {allowed}")

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without loading an arbitrarily large body into memory.
    contents = upload.file.read(max_size + 1)
    if len(contents) > max_size:
        mb = max_size // (1024 * 1024)
        raise HTTPException(413, f"File too large. Maximum size: {mb} MB")

    dest_dir = UPLOADS_DIR / subdir
    dest_dir.mkdir(parents=True, exist_ok=True)
    # The client-supplied name is never used on disk; only its extension is kept.
    fname = f"{uuid.uuid4().hex}{ext}"
    (dest_dir / fname).write_bytes(contents)

    return f"/uploads/{subdir}/{fname}"
|
|
|
|
|
|
@app.post("/api/upload/icon", status_code=201)
async def upload_icon(file: UploadFile = File(...)):
    """Store an icon under uploads/icons and return its URL path."""
    return {"path": _save_upload(file, "icons", ICON_ALLOWED_EXTS, ICON_MAX_SIZE)}
|
|
|
|
|
|
@app.post("/api/upload/photo", status_code=201)
async def upload_photo(file: UploadFile = File(...)):
    """Store a photo under uploads/photos and return its URL path."""
    return {"path": _save_upload(file, "photos", PHOTO_ALLOWED_EXTS, PHOTO_MAX_SIZE)}
|
|
|
|
|
|
@app.post("/api/ocr", status_code=200)
async def ocr_sticker(file: UploadFile = File(...)):
    """OCR a sticker photo to extract a machine_id in XXXXX-XXXXXX format.

    The image is decoded from memory, converted to grayscale and passed to
    Tesseract. The text is searched first for five digits followed by six
    or more digits (optionally separated by dashes/whitespace), then for any
    run of five or more digits.

    Returns:
        A dict with "machine_id" (or None), "raw_text" (first 500 characters
        of the stripped OCR output) and "confidence" ("high", "low" or
        "none"). A "detail" hint is added when nothing was found.

    Raises:
        HTTPException 400: unsupported extension or image larger than 10 MB.
        HTTPException 500: the image could not be opened or OCR failed.
    """
    # Validate file extension
    filename = file.filename or ""
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in {"png", "jpg", "jpeg", "webp", "bmp"}:
        raise HTTPException(status_code=400, detail=f"Unsupported image format: .{ext}")

    contents = await file.read()
    if len(contents) > 10 * 1024 * 1024:  # 10MB max
        raise HTTPException(status_code=400, detail="Image too large (max 10MB)")

    # Decode straight from memory: no temp file is written under the
    # statically served uploads directory, and nothing needs cleaning up.
    try:
        with PILImage.open(io.BytesIO(contents)) as img:
            # Grayscale input tends to OCR more reliably than colour.
            text = pytesseract.image_to_string(img.convert("L"), config="--psm 6")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"OCR processing failed: {str(e)}") from e

    raw_text = text.strip()[:500]

    # Search for XXXXX-XXXXXX pattern (5 digits - 6 digits or more)
    match = re.search(r"(\d{5})[-\s]*(\d{6,})", text)
    if match:
        return {
            "machine_id": f"{match.group(1)}-{match.group(2)}",
            "raw_text": raw_text,
            "confidence": "high",
        }

    # Try looser: any 5+ digit number as machine_id
    loose = re.search(r"(\d{5,})", text)
    if loose:
        return {
            "machine_id": loose.group(1),
            "raw_text": raw_text,
            "confidence": "low",
        }

    return {
        "machine_id": None,
        "raw_text": raw_text,
        "confidence": "none",
        "detail": "No machine ID pattern found in image. Try again with better lighting.",
    }
|
|
|
|
|
|
# ─── Static Files (mounted last to not shadow routes) ──────────────────────
|
|
|
|
# StaticFiles verifies at construction time that its directory exists, so
# make sure the uploads directory is there before mounting (no-op when it
# already exists).
UPLOADS_DIR.mkdir(parents=True, exist_ok=True)
app.mount("/uploads", StaticFiles(directory=str(UPLOADS_DIR)), name="uploads")

# The frontend bundle is optional; it is mounted at "/" last so it does not
# shadow the API routes registered above.
if STATIC_DIR.exists():
    app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="static")
|