diff --git a/server/api/civilian.py b/server/api/civilian.py new file mode 100644 index 0000000..79c5210 --- /dev/null +++ b/server/api/civilian.py @@ -0,0 +1,394 @@ +"""Civilian-facing API tier — the endpoints the SafeThread iOS app calls. + +Mirrors the contract documented in +`bitchat-amber/bitchat/Services/HubClient.swift`. The iOS app is the only +consumer; the operator dashboard is served by `/api/*` routers elsewhere. + +Endpoints: `/v1/register` (creates Account), `/v1/message` (inserts +InboundMessage with channel='app'), `/v1/alerts/active` (returns +operator-issued Alerts), `WS /v1/stream` (pushes ALERT_ISSUED on +incident_upserted). Sighting / location_report / profile remain stubs +until the DTN layer re-merges. + +Auth: `Authorization: Bearer ` — `userId` is the phone number, +returned to iOS at register time. Friend can swap for a JWT later. + +Civilian messages are *never* cases. A "case" is an operator-issued +Alert created via `POST /api/incidents`. Civilian messages ride +`InboundMessage(channel='app', in_reply_to_alert_id=NULL)` — they appear +on the Messages tab's live wire (`MessagesView.tsx`) but do NOT show up +on the Cases page. If a caseworker decides a message warrants a case, +that's a separate explicit action. +""" + +from __future__ import annotations + +from datetime import UTC, datetime +from typing import Any, Optional + +import asyncio +import json +import logging + +from fastapi import APIRouter, Depends, Header, HTTPException, Response, WebSocket +from fastapi.websockets import WebSocketDisconnect +from pydantic import BaseModel +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from server.db.alerts import Alert +from server.db.base import generate_ulid +from server.db.engine import get_engine, get_session_maker +from server.db.identity import NGO, Account, get_or_create_default_ngo +from server.db.messages import InboundMessage +from server.db.session import get_db +from server.eventbus.postgres import PostgresEventBus + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/v1", tags=["civilian"]) + + +# --------------------------------------------------------------------------- +# Schemas (request/response shapes the iOS HubClient expects verbatim) +# --------------------------------------------------------------------------- + + +class RegisterBody(BaseModel): + name: str + phone_number: str + profession: Optional[str] = None + language: str + bitchat_pubkey: str + apns_token: Optional[str] = None + + +class RegisterResponse(BaseModel): + user_id: str + hub_pubkey: str + ngo_name: str + + +class MessageBody(BaseModel): + body: str + client_msg_id: str + sent_at: float # unix seconds; iOS sends Date().timeIntervalSince1970 + + +class OkResponse(BaseModel): + ok: bool = True + + +class SightingBody(BaseModel): + """JSON request body when the user submits a sighting *without* + photo/voice attachments. iOS sends multipart/form-data when media + is attached — that path stays a stub for now.""" + + case_id: str + free_text: str + client_msg_id: str + observed_at: float + location: Optional[list[float]] = None # [lat, lng] + + +class SightingResponse(BaseModel): + sighting_id: str + ack: bool = True + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _resolve_single_ngo(db: AsyncSession) -> NGO: + """Return the single configured NGO. If none exist yet (fresh volume), + create the default one on the fly so /v1/register doesn't fail + cold-start. Multi-NGO deployments (>1) are still rejected — there's no + routing key to choose between them on a civilian app registration.""" + try: + return await get_or_create_default_ngo(db) + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=str(exc)) from exc + + +async def _bearer_phone(authorization: str = Header(...)) -> str: + """Extract phone from `Authorization: Bearer `. The phone + serves as the opaque user_id the iOS app received from /v1/register. + """ + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="Missing bearer token") + token = authorization.removeprefix("Bearer ").strip() + if not token: + raise HTTPException(status_code=401, detail="Empty bearer token") + return token + + +# --------------------------------------------------------------------------- +# POST /v1/register +# --------------------------------------------------------------------------- + + +@router.post("/register", response_model=RegisterResponse) +async def register( + body: RegisterBody, + db: AsyncSession = Depends(get_db), +) -> RegisterResponse: + ngo = await _resolve_single_ngo(db) + + acc = await db.get(Account, body.phone_number) + if acc is None: + acc = Account( + phone=body.phone_number, + ngo_id=ngo.ngo_id, + language=body.language, + push_token=body.apns_token, + # Stash the iOS-supplied bitchat pubkey in `account_id` until the + # `bitchat_pubkey` column lands on main (it was in the orphaned + # DTN PR #5). The iOS app sends a hex string; either column + # accepts the same shape. + account_id=body.bitchat_pubkey, + source="app", + ) + db.add(acc) + await db.flush() + else: + # Re-register: refresh language + push token + key. + acc.language = body.language + acc.push_token = body.apns_token + acc.account_id = body.bitchat_pubkey + + await db.commit() + + return RegisterResponse( + user_id=body.phone_number, + # Stub: real X25519 hub pubkey gets wired when the DTN library + # re-merges and the friend exposes a key from env. + hub_pubkey="00" * 32, + ngo_name=ngo.name, + ) + + +# --------------------------------------------------------------------------- +# POST /v1/message +# --------------------------------------------------------------------------- + + +@router.post("/message", response_model=OkResponse) +async def message( + body: MessageBody, + user_id: str = Depends(_bearer_phone), + db: AsyncSession = Depends(get_db), +) -> OkResponse: + acc = await db.get(Account, user_id) + if acc is None: + raise HTTPException(status_code=404, detail="Unknown account; register first") + + # Civilian messages are NOT cases — they ride orphan (no alert reference) + # and surface on the dashboard's Messages tab as live wire. A caseworker + # can promote a message to a case via the explicit Create Case flow. + msg_id = generate_ulid() + msg = InboundMessage( + msg_id=msg_id, + ngo_id=acc.ngo_id, + channel="app", + sender_phone=user_id, + in_reply_to_alert_id=None, + body=body.body, + media_urls=[], + raw={ + "kind": "general_message", + "client_msg_id": body.client_msg_id, + "sent_at": body.sent_at, + }, + received_at=datetime.now(UTC), + status="new", + ) + db.add(msg) + await db.commit() + + # Wakes the triage worker + flips the WS stream to push the new message + # to the operator dashboard. Mirrors the pattern in sim_inbound. + bus = PostgresEventBus(get_engine()) + await bus.publish("new_inbound", msg_id) + + return OkResponse(ok=True) + + +# --------------------------------------------------------------------------- +# Stubs — the other five endpoints the iOS app calls. +# --------------------------------------------------------------------------- +# +# These return 501 (rather than the FastAPI default 405 / 404) so the +# iOS app's HubError surfaces a clean "not supported yet" rather than a +# misleading "method not allowed". Real implementations land when the +# DTN library re-merges (sighting, location_report, profile) and when +# the WS stream gets wired to the existing eventbus channels. + + +@router.post("/sighting", response_model=SightingResponse) +async def sighting( + body: SightingBody, + user_id: str = Depends(_bearer_phone), + db: AsyncSession = Depends(get_db), +) -> SightingResponse: + """A user-submitted sighting *on* a specific operator-issued case. + + Unlike civilian free-form messages (`/v1/message`), sightings reference + a `case_id` — they thread under that case on the dashboard. The + triage worker will classify the body and the agent worker will see + the bucket (because there's an alert_id) and may suggest follow-up + actions. + + Photo/voice multipart uploads aren't supported here yet; iOS only + sends multipart when media is attached, so text-only sightings work + over HTTP and media still falls back to the DTN mesh queue (the + "Offline — sighting queued for mesh relay" path). + """ + acc = await db.get(Account, user_id) + if acc is None: + raise HTTPException(status_code=404, detail="Unknown account; register first") + + alert = await db.get(Alert, body.case_id) + if alert is None: + raise HTTPException(status_code=404, detail="case not found") + + msg_id = generate_ulid() + msg = InboundMessage( + msg_id=msg_id, + ngo_id=acc.ngo_id, + channel="app", + sender_phone=user_id, + in_reply_to_alert_id=body.case_id, + body=body.free_text, + media_urls=[], + raw={ + "kind": "sighting", + "client_msg_id": body.client_msg_id, + "observed_at": body.observed_at, + "location": body.location, + }, + received_at=datetime.now(UTC), + status="new", + ) + db.add(msg) + await db.commit() + + bus = PostgresEventBus(get_engine()) + await bus.publish("new_inbound", msg_id) + + return SightingResponse(sighting_id=msg_id, ack=True) + + +@router.post("/location_report") +async def location_report_stub( + user_id: str = Depends(_bearer_phone), +) -> Response: + del user_id + return Response(status_code=501, content="endpoint not yet implemented") + + +@router.post("/profile") +async def profile_stub( + user_id: str = Depends(_bearer_phone), +) -> Response: + del user_id + return Response(status_code=501, content="endpoint not yet implemented") + + +@router.get("/alerts/active") +async def alerts_active( + user_id: str = Depends(_bearer_phone), + db: AsyncSession = Depends(get_db), +) -> dict[str, Any]: + """Return all active operator-issued amber alerts for the iOS app. + + Excludes `category='general_inbound'` (those are per-user app-message + threads — not real alerts a civilian should see). + """ + del user_id # auth-only check; alerts are not user-scoped today + rows = ( + await db.execute( + select(Alert) + .where(Alert.status == "active") + .where(Alert.category != "general_inbound") + ) + ).scalars().all() + return {"alerts": [_alert_to_ios_payload(a) for a in rows]} + + +def _alert_to_ios_payload(alert: Alert) -> dict[str, Any]: + """Map an Alert row to the iOS HubClient's expected envelope. + + iOS expects keys: case_id, title, summary, issued_at (unix seconds), + version (int), photo_url (optional), category (optional). + """ + # Use updated_at so edits push a newer timestamp to the iOS upsertAlert; + # falls back to created_at when the row hasn't been edited yet. + ts = alert.updated_at or alert.created_at + issued_at = ts.timestamp() if ts else 0 + return { + "case_id": alert.alert_id, + "title": alert.person_name or (alert.description or "")[:80], + "summary": alert.description or "", + "issued_at": issued_at, + "version": 1, + "photo_url": None, + "category": alert.category, + } + + +# --------------------------------------------------------------------------- +# WS /v1/stream — push ALERT_ISSUED to connected iOS clients. +# --------------------------------------------------------------------------- +# +# Subscribes to the existing `incident_upserted` Postgres NOTIFY channel. +# When an operator creates a case via POST /api/incidents, the dashboard +# WS already receives the event; this handler forwards a typed envelope +# the iOS HubEvent decoder expects (type=ALERT_ISSUED). +# +# Auth: iOS sends `Authorization: Bearer ` as a connection header. +# WebSockets in browsers can't set custom headers, but iOS URLSessionTask +# WebSocket can. We accept the header if present; otherwise still accept +# the connection (the demo doesn't have auth-revocation yet). + + +@router.websocket("/stream") +async def stream(websocket: WebSocket) -> None: + await websocket.accept() + bus = PostgresEventBus(get_engine()) + sm = get_session_maker() + + async def forwarder() -> None: + async for alert_id in bus.subscribe("incident_upserted"): + try: + async with sm() as s: + alert = await s.get(Alert, alert_id) + if alert is None or alert.category == "general_inbound": + continue + envelope = { + "type": "ALERT_ISSUED", + **_alert_to_ios_payload(alert), + } + await websocket.send_text(json.dumps(envelope)) + except WebSocketDisconnect: + return + except Exception: # noqa: BLE001 — keep the loop alive + logger.exception("civilian /v1/stream forward failed") + + forward_task = asyncio.create_task(forwarder(), name="civilian-stream-forwarder") + + try: + # Drain inbound frames so the connection stays half-open. The iOS + # app doesn't send anything on this socket today, but if it ever + # does we just discard. + while True: + await websocket.receive_text() + except WebSocketDisconnect: + pass + finally: + forward_task.cancel() + try: + await asyncio.wait_for(forward_task, timeout=2.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass diff --git a/server/api/dashboard.py b/server/api/dashboard.py index 2d2a1a0..ab6809d 100644 --- a/server/api/dashboard.py +++ b/server/api/dashboard.py @@ -10,7 +10,7 @@ from server.api.auth_dep import current_operator from server.api.registry import REGIONS from server.db.alerts import Alert -from server.db.messages import InboundMessage +from server.db.messages import InboundMessage, TriagedMessage from server.db.session import get_db router = APIRouter(prefix="/api") @@ -54,10 +54,14 @@ async def dashboard( alerts = (await db.execute(select(Alert).where(Alert.status == "active"))).scalars().all() alert_map: dict[str, Alert] = {a.alert_id: a for a in alerts} + # Civilian app messages arrive orphan (no alert reference) by design — + # a "case" is only created when an operator explicitly clicks Create + # case. We still want these messages to appear on the Messages tab's + # live wire and survive tab switches, so include them here. Region + # stats below skip orphans naturally (they aren't tied to a region). msgs_in_window = ( await db.execute( select(InboundMessage) - .where(InboundMessage.in_reply_to_alert_id.isnot(None)) .where(InboundMessage.received_at >= window_start) .order_by(InboundMessage.received_at.desc()) ) @@ -135,9 +139,35 @@ async def dashboard( "cases": cases, }) + # Pull triage data for the visible slice so the wire can show the + # cheap-LLM classification ("sighting" / "noise" / etc.) and let the + # operator click "Make a case" on flagged messages. + visible = msgs_in_window[:10] + visible_ids = [m.msg_id for m in visible] + triage_by_id: dict[str, TriagedMessage] = {} + if visible_ids: + triage_rows = ( + await db.execute( + select(TriagedMessage).where(TriagedMessage.msg_id.in_(visible_ids)) + ) + ).scalars().all() + triage_by_id = {t.msg_id: t for t in triage_rows} + + def _triage_payload(msg_id: str) -> dict[str, Any]: + t = triage_by_id.get(msg_id) + if t is None: + return {} + return { + "classification": t.classification, + "confidence": float(t.confidence) if t.confidence is not None else None, + "geohash6": t.geohash6, + "language": t.language, + } + recent_distress = [] - for msg in msgs_in_window[:10]: + for msg in visible: if msg.in_reply_to_alert_id and msg.in_reply_to_alert_id in alert_map: + # Tied to an operator-issued case. alert = alert_map[msg.in_reply_to_alert_id] region_key = _region_for_prefix(alert.region_geohash_prefix) meta = REGIONS[region_key] @@ -149,6 +179,19 @@ async def dashboard( "from": msg.sender_phone, "body": msg.body, "ts": msg.received_at.isoformat(), + "triage": _triage_payload(msg.msg_id), + }) + else: + # Free-form civilian inbound — appears on the wire, isn't a case. + recent_distress.append({ + "messageId": msg.msg_id, + "incidentId": None, + "region": None, + "regionLabel": "—", + "from": msg.sender_phone, + "body": msg.body, + "ts": msg.received_at.isoformat(), + "triage": _triage_payload(msg.msg_id), }) return { diff --git a/server/api/incidents.py b/server/api/incidents.py index 0e6d331..34d67f8 100644 --- a/server/api/incidents.py +++ b/server/api/incidents.py @@ -1,18 +1,23 @@ from __future__ import annotations -from typing import Annotated, Any +from typing import Annotated, Any, Literal, Optional from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel, Field from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from server.api.auth_dep import current_operator from server.api.registry import REGIONS from server.db.alerts import Alert +from server.db.base import generate_ulid from server.db.decisions import AgentDecision, ToolCall +from server.db.engine import get_engine +from server.db.identity import NGO, get_or_create_default_ngo from server.db.messages import Bucket, InboundMessage, TriagedMessage from server.db.outbound import OutboundMessage from server.db.session import get_db +from server.eventbus.postgres import PostgresEventBus router = APIRouter(prefix="/api") @@ -216,3 +221,123 @@ async def incident_messages( messages.sort(key=lambda m: m["ts"]) return messages + + +# --------------------------------------------------------------------------- +# POST /api/incidents — operator creates a case from the dashboard. +# --------------------------------------------------------------------------- +# +# Inserts an `Alert` row and publishes an `incident_upserted` event so the +# WS-connected dashboards live-update. The civilian app picks it up either +# via its polled `GET /v1/alerts/active` (cold start) or its WS stream +# (live). + +_VALID_CATEGORIES = { + "missing_person", + "medical", + "resource_shortage", + "safety", +} + +_VALID_URGENCY_TIERS = {"low", "medium", "high", "critical"} + + +class CreateIncidentBody(BaseModel): + person_name: str = Field(..., min_length=1, max_length=200) + description: str = Field(..., min_length=1) + region: str = Field(..., description="Region key from REGIONS, e.g. SYR_ALEPPO") + category: Literal["missing_person", "medical", "resource_shortage", "safety"] + urgency_tier: Optional[Literal["low", "medium", "high", "critical"]] = "medium" + + +class UpdateIncidentBody(BaseModel): + """Partial update — every field is optional. Send only what changes.""" + + description: Optional[str] = Field(None, min_length=1) + urgency_tier: Optional[Literal["low", "medium", "high", "critical"]] = None + status: Optional[Literal["active", "resolved", "archived"]] = None + + +@router.patch("/incidents/{incident_id}") +async def update_incident( + incident_id: str, + body: UpdateIncidentBody, + _op: Annotated[dict[str, Any], Depends(current_operator)], + db: AsyncSession = Depends(get_db), +) -> dict[str, Any]: + alert = ( + await db.execute(select(Alert).where(Alert.alert_id == incident_id)) + ).scalar_one_or_none() + if alert is None: + raise HTTPException(status_code=404, detail="incident not found") + + if body.description is not None: + alert.description = body.description + if body.urgency_tier is not None: + alert.urgency_tier = body.urgency_tier + alert.urgency_score = { + "low": 0.25, + "medium": 0.5, + "high": 0.75, + "critical": 0.95, + }[body.urgency_tier] + if body.status is not None: + alert.status = body.status + + await db.commit() + + # Same pipe as create — incident_upserted travels through ws.py to the + # dashboard and through civilian.py /v1/stream as ALERT_ISSUED, so the + # iOS card upserts in place via case_id. + bus = PostgresEventBus(get_engine()) + await bus.publish("incident_upserted", alert.alert_id) + + return alert_to_incident_shape(alert) | {"messageCount": 0, "lastActivity": None} + + +@router.post("/incidents", status_code=201) +async def create_incident( + body: CreateIncidentBody, + _op: Annotated[dict[str, Any], Depends(current_operator)], + db: AsyncSession = Depends(get_db), +) -> dict[str, Any]: + if body.region not in REGIONS: + raise HTTPException( + status_code=400, + detail=f"unknown region '{body.region}'. Known: {sorted(REGIONS.keys())}", + ) + region_meta = REGIONS[body.region] + + try: + ngo = await get_or_create_default_ngo(db) + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=str(exc)) from exc + + urgency_score = { + "low": 0.25, + "medium": 0.5, + "high": 0.75, + "critical": 0.95, + }.get(body.urgency_tier or "medium", 0.5) + + alert = Alert( + alert_id=generate_ulid(), + ngo_id=ngo.ngo_id, + person_name=body.person_name, + description=body.description, + region_geohash_prefix=region_meta["geohash_prefix"], + last_seen_geohash=region_meta["geohash_prefix"], + status="active", + category=body.category, + urgency_tier=body.urgency_tier or "medium", + urgency_score=urgency_score, + ) + db.add(alert) + await db.commit() + + # Wakes the WS layer (`_compose_incident_event`) and triggers the + # civilian-side ALERT_ISSUED push. + bus = PostgresEventBus(get_engine()) + await bus.publish("incident_upserted", alert.alert_id) + + return alert_to_incident_shape(alert) | {"messageCount": 0, "lastActivity": None} diff --git a/server/api/ws.py b/server/api/ws.py index 4ac8e15..facf3cc 100644 --- a/server/api/ws.py +++ b/server/api/ws.py @@ -9,7 +9,7 @@ from server.db.alerts import Alert from server.db.decisions import AgentDecision, ToolCall from server.db.engine import get_engine, get_session_maker -from server.db.messages import Bucket, InboundMessage +from server.db.messages import Bucket, InboundMessage, TriagedMessage from server.eventbus.postgres import PostgresEventBus from server.workers.narrate import narrate_call, narrate_decision @@ -52,7 +52,19 @@ async def _compose_inbound_event(msg_id: str) -> Optional[dict]: alert: Optional[Alert] = None if msg.in_reply_to_alert_id: alert = await s.get(Alert, msg.in_reply_to_alert_id) - return {"type": "message", "incident": _incident_shape(alert), "message": _message_shape(msg)} + triage = await s.get(TriagedMessage, msg_id) + payload = _message_shape(msg) + payload["triage"] = ( + { + "classification": triage.classification, + "confidence": float(triage.confidence) if triage.confidence is not None else None, + "geohash6": triage.geohash6, + "language": triage.language, + } + if triage is not None + else None + ) + return {"type": "message", "incident": _incident_shape(alert), "message": payload} async def _compose_incident_event(alert_id: str) -> Optional[dict]: @@ -185,6 +197,10 @@ async def listen(channel: str): evt: Optional[dict] = None if channel == "new_inbound": evt = await _compose_inbound_event(payload) + elif channel == "inbound_triaged": + # Re-emit a "message" event with triage fields filled + # in. Frontend treats it as an upsert by msg_id. + evt = await _compose_inbound_event(payload) elif channel == "incident_upserted": evt = await _compose_incident_event(payload) elif channel == "agent_thinking": @@ -216,7 +232,7 @@ async def heartbeat(): tasks = [ asyncio.create_task(listen(c)) for c in ( - "new_inbound", "incident_upserted", + "new_inbound", "inbound_triaged", "incident_upserted", "agent_thinking", "decision_made", "suggestion_pending", "suggestion_resolved", ) diff --git a/server/db/identity.py b/server/db/identity.py index 6918f4f..37a1b3e 100644 --- a/server/db/identity.py +++ b/server/db/identity.py @@ -1,10 +1,11 @@ from datetime import datetime from typing import Optional -from sqlalchemy import Boolean, DateTime, Float, ForeignKey, Integer, String, Text +from sqlalchemy import Boolean, DateTime, Float, ForeignKey, Integer, String, Text, select +from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Mapped, mapped_column, relationship -from server.db.base import Base, CreatedAt, ULIDPK, UpdatedAt +from server.db.base import Base, CreatedAt, ULIDPK, UpdatedAt, generate_ulid class NGO(Base): @@ -37,3 +38,34 @@ class Account(Base): source: Mapped[str] = mapped_column(String(16), default="app", nullable=False) created_at: Mapped[CreatedAt] updated_at: Mapped[UpdatedAt] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +DEFAULT_NGO_NAME = "Local Demo NGO" + + +async def get_or_create_default_ngo(db: AsyncSession) -> "NGO": + """Return the single configured NGO; create a default one if missing. + + Used by both the operator-side `POST /api/incidents` and the + civilian-side `POST /v1/register` so a fresh-volume DB always has + somewhere to attach. Multi-NGO deployments (>1 NGO row) are still + rejected at the call site since we have no routing key. + """ + rows = (await db.execute(select(NGO))).scalars().all() + if len(rows) == 1: + return rows[0] + if len(rows) > 1: + raise RuntimeError(f"Expected at most 1 NGO, found {len(rows)}.") + ngo = NGO( + ngo_id=generate_ulid(), + name=DEFAULT_NGO_NAME, + region_geohash_prefix="sv", + ) + db.add(ngo) + await db.flush() + return ngo diff --git a/server/llm/triage_client.py b/server/llm/triage_client.py index eaccdc4..dc8657d 100644 --- a/server/llm/triage_client.py +++ b/server/llm/triage_client.py @@ -41,16 +41,42 @@ async def classify(body: str, alert_summary: Optional[str]) -> dict: client = anthropic.AsyncAnthropic(api_key=api_key) system = ( - "You are a triage classifier for civilian sighting reports for a missing-person " - "alert system. Classify the message, extract a 6-character geohash if possible, " - "detect language, and produce a stable dedup_hash from the normalized body. " - "Return ONLY the structured tool call." + "You are a triage classifier for civilian messages reaching an NGO operating " + "in a humanitarian crisis. The NGO works four kinds of cases:\n" + " 1. missing_person — people reported missing, last-seen reports, sightings of missing persons\n" + " 2. medical — injuries, illness, medical evacuation, requests for medics or supplies\n" + " 3. resource_shortage — water, food, fuel, shelter, baby formula, blankets, etc.\n" + " 4. safety — fires, unsafe routes, ongoing incidents, security threats, evacuation needs\n\n" + "PRIMARY RULE: if the message could plausibly be promoted into ANY of the four " + "case categories above, classify it as `sighting`. This is by far the most common " + "category. Examples that are ALL `sighting`:\n" + " • 'I NEED A MEDIC RIGHT NOW' (medical)\n" + " • 'we have no water in district 4 since yesterday' (resource_shortage)\n" + " • 'fire on the south road, avoid' (safety)\n" + " • 'my brother is missing, last seen at the bus station' (missing_person)\n" + " • 'I saw Maryam near the central market' (missing_person)\n" + " • 'ALERT, my neighbour is missing' (missing_person — the word ALERT is the writer " + " raising one, NOT acknowledging a prior NGO alert)\n\n" + "Other categories — use sparingly:\n" + " • question — the user is ONLY asking the NGO for information / status and reports nothing new " + " (e.g. 'any updates on Maryam?'). If they're also reporting something, it's a `sighting`.\n" + " • ack — short acknowledgement of an NGO-issued alert the user previously received. " + " Words like 'ALERT' or 'help' written BY the user do NOT make a message an ack — those are " + " sightings. True acks look like: 'received', 'got it, thanks', 'understood', 'on my way'.\n" + " • noise — actually off-topic, automated, gibberish, or unrelated to humanitarian work. " + " A short message that names a person, a place, or a need is NEVER noise.\n" + " • bad_actor — spam, abuse, or appears intentionally false.\n\n" + "When in doubt between `sighting` and any other label, choose `sighting`. The cost of " + "missing a real distress signal far outweighs the cost of an operator glancing at one extra " + "message.\n\n" + "Extract a 6-character geohash if a location is mentionable, detect language, and produce a " + "stable dedup_hash from the normalized body. Return ONLY the structured tool call." ) context = f"\n\nAlert context: {alert_summary}" if alert_summary else "" tool = { "name": "classify", - "description": "Classify an inbound civilian sighting message.", + "description": "Classify an inbound civilian distress / report message across all NGO case categories.", "input_schema": { "type": "object", "properties": { diff --git a/server/main.py b/server/main.py index 99f7e13..0d4df2e 100644 --- a/server/main.py +++ b/server/main.py @@ -12,6 +12,7 @@ from server.api.agent_feed import router as agent_feed_router from server.api.audiences import router as audiences_router +from server.api.civilian import router as civilian_router from server.api.dashboard import router as dashboard_router from server.api.health import router as health_router from server.api.incidents import router as incidents_router @@ -140,6 +141,7 @@ async def lifespan(app: FastAPI): app.include_router(sim_router) app.include_router(webhooks_router) app.include_router(ws_router) +app.include_router(civilian_router) # --------------------------------------------------------------------------- diff --git a/server/workers/triage.py b/server/workers/triage.py index c28021a..a45f7ee 100644 --- a/server/workers/triage.py +++ b/server/workers/triage.py @@ -103,7 +103,12 @@ async def triage_worker_loop( try: await _process_message(msg_id, session_maker) retry_counts.pop(msg_id, None) + # Wakes the agent worker on alert-tied messages. await eventbus.publish("bucket_open", msg_id) + # Wakes the WS layer to re-emit the message event enriched with + # triage data so the dashboard's wire can render the + # classification pill + "Make a case" affordance. + await eventbus.publish("inbound_triaged", msg_id) except asyncio.CancelledError: raise except Exception as exc: diff --git a/web/src/components/CreateCaseModal.tsx b/web/src/components/CreateCaseModal.tsx new file mode 100644 index 0000000..01f7a50 --- /dev/null +++ b/web/src/components/CreateCaseModal.tsx @@ -0,0 +1,227 @@ +import { useEffect, useState } from "react"; +import { createIncident } from "../lib/api"; +import { useStore } from "../lib/store"; +import type { Incident, Region } from "../lib/types"; + +interface Props { + onClose: () => void; + onCreated?: (incident: Incident) => void; + /** Pre-fill values when promoting an inbound message to a case. */ + defaults?: { + description?: string; + region?: Region; + personName?: string; + }; +} + +const REGION_OPTIONS: { value: Region; label: string }[] = [ + { value: "IRQ_BAGHDAD", label: "Baghdad, Iraq" }, + { value: "IRQ_MOSUL", label: "Mosul, Iraq" }, + { value: "SYR_ALEPPO", label: "Aleppo, Syria" }, + { value: "SYR_DAMASCUS", label: "Damascus, Syria" }, + { value: "YEM_SANAA", label: "Sana'a, Yemen" }, + { value: "LBN_BEIRUT", label: "Beirut, Lebanon" }, +]; + +const CATEGORY_OPTIONS = [ + { value: "missing_person", label: "Missing person" }, + { value: "medical", label: "Medical" }, + { value: "resource_shortage", label: "Resource shortage" }, + { value: "safety", label: "Safety" }, +] as const; + +const URGENCY_OPTIONS = [ + { value: "low", label: "Low" }, + { value: "medium", label: "Medium" }, + { value: "high", label: "High" }, + { value: "critical", label: "Critical" }, +] as const; + +type CategoryValue = (typeof CATEGORY_OPTIONS)[number]["value"]; +type UrgencyValue = (typeof URGENCY_OPTIONS)[number]["value"]; + +export function CreateCaseModal({ onClose, onCreated, defaults }: Props) { + const upsertIncident = useStore((s) => s.upsertIncident); + const selectIncident = useStore((s) => s.selectIncident); + const me = useStore((s) => s.me); + + // Default region: explicit override > operator's first allowed region > fallback. + const defaultRegion: Region = + defaults?.region ?? (me?.regions[0] as Region) ?? "SYR_ALEPPO"; + + const [personName, setPersonName] = useState(defaults?.personName ?? ""); + const [description, setDescription] = useState(defaults?.description ?? ""); + const [region, setRegion] = useState(defaultRegion); + const [category, setCategory] = useState("missing_person"); + const [urgency, setUrgency] = useState("medium"); + const [submitting, setSubmitting] = useState(false); + const [error, setError] = useState(null); + + // ESC closes + useEffect(() => { + function onKey(e: KeyboardEvent) { + if (e.key === "Escape" && !submitting) onClose(); + } + document.addEventListener("keydown", onKey); + return () => document.removeEventListener("keydown", onKey); + }, [onClose, submitting]); + + async function handleSubmit(e: React.FormEvent) { + e.preventDefault(); + if (!personName.trim() || !description.trim() || submitting) return; + setError(null); + setSubmitting(true); + try { + const inc = await createIncident({ + person_name: personName.trim(), + description: description.trim(), + region, + category, + urgency_tier: urgency, + }); + // Optimistically upsert so the new card shows up before the WS event lands. + upsertIncident(inc); + selectIncident(inc.id); + onCreated?.(inc); + onClose(); + } catch (err) { + setError(err instanceof Error ? err.message : "create failed"); + } finally { + setSubmitting(false); + } + } + + return ( +
{ + if (e.target === e.currentTarget && !submitting) onClose(); + }} + > +
+
+

+ Create case +

+

+ Operator-issued amber alert +

+
+ +
+ + setPersonName(e.target.value)} + required + maxLength={200} + autoFocus + placeholder="e.g. Maryam, 11" + className="w-full rounded border border-surface-300 px-3 py-2 text-sm focus:outline-none focus:border-accent-500" + /> + + + +