A LangGraph pipeline that turns a customer CSV into a 30-60-90 retention playbook.
CSV → Survival analysis → RAG-grounded diagnosis → Strategy agents → Simulated playbook — streamed live over SSE.
Overview · Architecture · Graph · RAG · UI Flow · Stack · Setup
Retain AI ingests a customer CSV plus a qualitative questionnaire and produces a ranked, evidence-cited retention playbook. The backend is an 18-node LangGraph that fans out to parallel LLM agents for discovery and strategy, with a Chroma-backed RAG layer grounding root-cause diagnosis in curated retention frameworks. The frontend is a Next.js 16 App Router app that streams each stage's output as it completes via Server-Sent Events.
Three things the graph does that aren't visible in the final playbook:
- Kaplan-Meier survival on the raw tenure + churn columns (via lifelines) — powers the interactive churn probability slider on the results page.
- CoxPH predictive risk model — identifies which currently-active users have the shortest expected remaining lifetime.
- Signal-boosted RAG retrieval — the forensic agent derives signal tags from the stats (e.g. `30_day_cliff`, `low_integration`) and biases retrieval toward framework chunks tagged with those signals.
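The survival math behind the slider is small enough to sketch without lifelines. A plain-Python Kaplan-Meier estimator over (tenure, churned) pairs — illustrative only; the pipeline itself uses lifelines' `KaplanMeierFitter`:

```python
from collections import Counter

def kaplan_meier(durations, events):
    """Return {time: survival_probability} at each observed churn time.

    durations: tenure (e.g. months); events: 1 if the customer churned,
    0 if still active (right-censored).
    """
    deaths = Counter(t for t, e in zip(durations, events) if e)
    at_risk = len(durations)
    surv, curve = 1.0, {}
    for t in sorted(set(durations)):
        d = deaths.get(t, 0)
        if d:                       # multiply in this time-step's survival
            surv *= 1 - d / at_risk
            curve[t] = surv
        at_risk -= sum(1 for u in durations if u == t)  # drop exits at t
    return curve
```

The churn probability the slider shows at month `t` is simply `1 - S(t)` read off this curve.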
Click to expand Architecture Diagram
```mermaid
flowchart TD
%% Native GitHub theme adaptation
classDef default fill:transparent,stroke:#52525B,stroke-width:1px,rx:6px,ry:6px;
classDef ui fill:transparent,stroke:#3b82f6,stroke-width:2px,rx:6px,ry:6px;
classDef ext fill:transparent,stroke:#8b5cf6,stroke-width:2px,stroke-dasharray: 4 4,rx:6px,ry:6px;
classDef agent fill:transparent,stroke:#14b8a6,stroke-width:2px,rx:6px,ry:6px;
classDef util fill:transparent,stroke:#64748b,stroke-width:2px,rx:6px,ry:6px;
classDef gate fill:transparent,stroke:#f59e0b,stroke-width:2px,rx:6px,ry:6px;
UI[Next.js UI<br/>form + results]:::ui -->|POST /analyze| API[FastAPI]:::ext
API -.->|SSE stream live| UI
API -->|Inngest event| JOB[analyze_retention_job]:::ext
JOB --> II
%% LangGraph Pipeline
II[input_ingest]:::util --> DA{data_audit}:::gate
%% Main branch
DA -->|score ≥ 0.5| FE[feature_engineering]:::util
%% Side branch
DA -.->|score < 0.5| RH[retry_handler]:::util
RH -.-> FE
FE --> BM[behavioral_map]:::util
subgraph Discovery [Discovery Pod]
FD[forensic_detective]:::agent
PM[pattern_matcher]:::agent
end
style Discovery fill:none,stroke:#52525B,stroke-width:1px,stroke-dasharray:5 5,color:#A1A1AA
BM --> FD & PM
FD & PM --> DM[diagnosis_merge]:::gate
DM --> HV{hypothesis_validation}:::gate
HV -->|verified| CA[constraint_add]:::util
CA --> HITL[adaptive_hitl]:::agent
subgraph Strategy [Strategy Pod]
UE[unit_economist]:::agent
JTBD[jtbd_specialist]:::agent
GH[growth_hacker]:::agent
end
style Strategy fill:none,stroke:#52525B,stroke-width:1px,stroke-dasharray:5 5,color:#A1A1AA
HITL --> UE & JTBD & GH
UE & JTBD & GH --> SM[strategy_merge]:::gate
SM --> SIM[simulation]:::util
SIM --> SC[strategy_critic]:::agent
SC -->|approved| EA[execution_architect]:::agent
EA --> END([END]):::util
%% Loops (placed at end to keep DAG rank aligned)
RH -.->|loop| II
HV -.->|weak| BM
SC -.->|low_lift/violation| HITL
RAG[(Chroma<br/>vector DB)]:::ext
FD -.->|RAG query| RAG
%% Interactivity
click II "./docs/nodes/input-ingest.md" "Ingest and normalize raw CSV data"
click DA "./docs/nodes/data-audit.md" "Quality score check for nulls and size"
click RH "./docs/nodes/retry-handler.md" "Fallback loops for low-quality data"
click FE "./docs/nodes/feature-engineering.md" "Computes RFM, LTV, CoxPH risk model"
click BM "./docs/nodes/behavioral-map.md" "Fits Kaplan-Meier survival curves & cohorts"
click FD "./docs/nodes/forensic-detective.md" "Investigates root-cause diagnosis using RAG"
click PM "./docs/nodes/pattern-matcher.md" "Discovers hidden behavioral segments & sequences"
click DM "./docs/nodes/diagnosis-merge.md" "Cross-examines and merges causal hypotheses"
click HV "./docs/nodes/hypothesis-validation.md" "Gates weak vs verified hypotheses"
click CA "./docs/nodes/constraint-add.md" "Filters feasibility by budget and legal rules"
click HITL "./docs/nodes/adaptive-hitl.md" "Requests clarifying answers from the human"
click UE "./docs/nodes/unit-economist.md" "Builds ROI and LTV/CAC strategies"
click JTBD "./docs/nodes/jtbd-specialist.md" "Builds Jobs-to-be-Done interventions"
click GH "./docs/nodes/growth-hacker.md" "Builds AARRR tactics and experiments"
click SM "./docs/nodes/strategy-merge.md" "Consolidates and ranks top recommendations"
click SIM "./docs/nodes/simulation.md" "Monte Carlo 10k simulations for lift %"
click SC "./docs/nodes/strategy-critic.md" "Senior-partner style critique and review"
click EA "./docs/nodes/execution-architect.md" "Outputs final 30-60-90 day playbook"
click RAG "./docs/rag.md" "Stores and retrieves framework context"
```
Parallel fan-out is native to LangGraph: behavioral_map emits edges to both discovery nodes, and adaptive_hitl emits edges to all three strategy nodes. Merge nodes collect the outputs.
Entry: input_ingest · Exit: execution_architect → END · Compiled in backend/app/graph/builder.py.
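The fan-out/merge shape is the same as a plain-asyncio gather, shown here for illustration (the agent bodies are stand-ins; the real pod nodes are LLM calls wired in builder.py):

```python
import asyncio

async def forensic_detective(state):
    # stand-in for the RAG-grounded diagnosis agent
    return {"causes": ["onboarding cliff"]}

async def pattern_matcher(state):
    # stand-in for the segment/sequence discovery agent
    return {"segments": ["low_integration"]}

async def discovery_pod(state):
    # behavioral_map fans out to both agents in parallel;
    # diagnosis_merge collects whatever each one returns.
    results = await asyncio.gather(
        forensic_detective(state), pattern_matcher(state)
    )
    merged = {}
    for partial in results:
        merged.update(partial)
    return merged

state = asyncio.run(discovery_pod({"stats": {}}))
```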
| # | Node | Role | Tool / Model |
|---|---|---|---|
| 1 | input_ingest | Load CSV, detect key columns | DuckDB |
| 2 | data_audit | Quality score (nulls, dupes, size) | Pandas |
| — | retry_handler | Loop back if score < 0.5 | — |
| 3 | feature_engineering | RFM, LTV, CoxPH risk model | lifelines CoxPHFitter |
| 4 | behavioral_map | KM survival curve + cohorts | lifelines KaplanMeierFitter |
| 5a | forensic_detective | Root-cause diagnosis (RAG-grounded) | Gemini 3 Flash + Chroma |
| 5b | pattern_matcher | Segment + sequence discovery | Gemini 3 Flash |
| 5c | diagnosis_merge | Run skeptic, merge hypotheses | Gemini 3 Flash |
| 6 | hypothesis_validation | Confidence × robustness gate | pure Python |
| 7 | constraint_add | Budget/legal feasibility filter | pure Python |
| 8 | adaptive_hitl | Generate clarifying questions | Gemini 3 Flash |
| 9a | unit_economist | ROI/LTV-CAC strategies | Groq Llama 3.3 70B |
| 9b | jtbd_specialist | Jobs-to-be-Done strategies | Groq Llama 3.3 70B |
| 9c | growth_hacker | AARRR tactics + experiments | Groq Llama 3.3 70B |
| 9d | strategy_merge | Rank merged recommendations | pure Python |
| 10 | simulation | Monte Carlo lift (10k runs) | NumPy |
| 11 | strategy_critic | Senior-partner review | Groq Llama 3.3 70B |
| 12 | execution_architect | Final 30-60-90 playbook | Groq Llama 3.3 70B |
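The simulation node's Monte Carlo is conceptually simple. A stdlib-only sketch (the real node uses NumPy; `simulate_lift` and its parameters are illustrative names, not the actual API):

```python
import random

def simulate_lift(base_churn, intervention_effect,
                  n_customers=1000, runs=10_000, seed=42):
    """Estimate mean retention lift of an intervention via Monte Carlo.

    Each run draws churn outcomes for a cohort with and without the
    intervention and records the difference in churn rate.
    """
    rng = random.Random(seed)
    lifts = []
    treated_churn = base_churn * (1 - intervention_effect)
    for _ in range(runs):
        baseline = sum(rng.random() < base_churn for _ in range(n_customers))
        treated = sum(rng.random() < treated_churn for _ in range(n_customers))
        lifts.append((baseline - treated) / n_customers)
    return sum(lifts) / runs
```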
Routing thresholds live in backend/app/graph/conditions.py. Retry/iteration loops are currently capped at 0 (one-shot); raise MAX_RETRIES, MAX_DISCOVERY_ATTEMPTS, or MAX_CRITIC_ITERATIONS to enable looping.
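A hedged sketch of what such a conditional edge can look like (function and state-key names are illustrative, not the actual conditions.py):

```python
MAX_RETRIES = 0  # one-shot by default; raise to enable the retry loop

def route_after_audit(state: dict) -> str:
    """Decide the edge out of data_audit based on the quality score."""
    if state["quality_score"] >= 0.5:
        return "feature_engineering"
    if state.get("retry_count", 0) < MAX_RETRIES:
        return "retry_handler"
    # retries exhausted: proceed with the data we have
    return "feature_engineering"
```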
The forensic agent retrieves 5 relevant framework chunks before reasoning about root causes, and must cite the chunk IDs it used in its JSON output. Chunks are ranked by cosine similarity plus a signal-tag boost (e.g. if stats show a 30-day cliff, chunks tagged 30_day_cliff get +0.05 per matching tag). See docs/rag.md for the corpus, ingestion, and retrieval logic.
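The boost itself is a small re-ranking step over Chroma's results. A sketch under assumed data shapes (the chunk tuples and function name are hypothetical; the +0.05-per-tag constant mirrors the description above):

```python
def rerank(chunks, signals, boost=0.05, k=5):
    """Re-rank retrieved chunks by cosine similarity plus a signal boost.

    chunks:  iterable of (chunk_id, cosine_similarity, tags)
    signals: tags derived from the stats, e.g. {"30_day_cliff", "low_integration"}
    """
    signals = set(signals)
    scored = [(cid, sim + boost * len(signals & set(tags)))
              for cid, sim, tags in chunks]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]
```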
```mermaid
flowchart LR
S[stats + behavior_curves] --> D[_derive_signals]
D --> Q[RAG query + signals]
Q --> R[Chroma.query<br/>k=5]
R --> B[signal boost<br/>+0.05 per tag]
B --> E[evidence_block<br/>in prompt]
E --> L[Gemini 3 Flash]
L --> C[causes + citations]
```
```mermaid
sequenceDiagram
participant U as User
participant F as Next.js UI
participant A as FastAPI
participant I as Inngest
participant G as LangGraph
U->>F: Fill 5-phase form + attach CSV
F->>A: POST /upload (FormData)
A-->>F: { file_path }
F->>A: POST /analyze (questionnaire + file_path)
A->>A: create job_id, queue asyncio.Queue
A->>I: send event app/analyze
A-->>F: { job_id }
F->>A: GET /analyze/stream/{job_id} (SSE)
I->>G: invoke analyze_retention_job
G->>G: astream nodes
loop every node
G-->>A: current_node update
A-->>F: SSE event (risk_ready / churn_profile_ready / diagnosis_ready / solution_ready)
F->>U: render card live
end
G-->>A: complete
A-->>F: SSE complete
```
Full detail in docs/ui-flow.md.
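The SSE leg of that sequence is essentially an async generator draining the per-job queue. A minimal sketch (event names match the diagram; the wire format is standard `text/event-stream`; the queue plumbing and `sse_events` name are illustrative):

```python
import asyncio
import json

async def sse_events(queue: asyncio.Queue):
    """Yield SSE-framed strings until the job signals completion."""
    while True:
        event = await queue.get()
        yield f"event: {event['type']}\ndata: {json.dumps(event['data'])}\n\n"
        if event["type"] == "complete":
            break

async def demo():
    q = asyncio.Queue()
    await q.put({"type": "risk_ready", "data": {"high_risk_users": 42}})
    await q.put({"type": "complete", "data": {}})
    return [frame async for frame in sse_events(q)]

frames = asyncio.run(demo())
```

In the real app this generator would back a FastAPI streaming response on `GET /analyze/stream/{job_id}`, with the Inngest job pushing node updates onto the queue.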
| Layer | Stack |
|---|---|
| Frontend | Next.js 16.2.4 (App Router), React 19, Tailwind v4, shadcn/ui, lucide-react |
| Backend | FastAPI, Inngest (background jobs), LangGraph, LangChain |
| LLMs | Google Gemini 3 Flash (discovery), Groq Llama 3.3 70B (strategy & critique) |
| Data | DuckDB (CSV parsing), Pandas, NumPy, lifelines (KM + CoxPH) |
| RAG | ChromaDB (PersistentClient, all-MiniLM-L6-v2, cosine) |
| Transport | Server-Sent Events (SSE) for live results streaming |
Local dev — click to expand
```bash
# Backend
cd backend
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
python -m app.rag.ingest        # one-time: load retention corpus into Chroma
uvicorn app.main:app --reload   # http://localhost:8000

# Frontend
cd frontend
npm install
npm run dev                     # http://localhost:3000
```

Required env (backend/.env):

```
GOOGLE_API_KEY_1=...
GOOGLE_API_KEY_2=...
GROQ_API_KEY_1=...
GROQ_API_KEY_2=...
GROQ_API_KEY_3=...
INNGEST_DEV=1
```
The form uploads CSVs to a backend temp directory; for local testing you can also drop sample CSVs directly into backend/data/.
Further reading: State schema · Nodes · Agents · RAG · UI flow