Centraliza datos operativos de CartonCloud y Urbantz en un único Postgres consultable desde BI y SQL ad-hoc.
- Almacén: Supabase Postgres (proyecto
LogisticaPatagonia, regiónsa-east-1). - Ingesta: workflows de n8n. CartonCloud por pull diario; Urbantz por webhook push en tiempo real.
- Latencia: ~24h para CartonCloud, segundos para Urbantz.
- Consumo: Metabase / Looker Studio / Power BI / queries SQL.
CartonCloud API ─► n8n daily pull ─┐
├──► Supabase Postgres ──► Metabase / Looker / SQL
Urbantz Webhooks ► n8n webhook push ─┘ │
├── raw.* (JSONB crudo + claves)
├── staging.* (vistas tipadas)
├── marts.* (modelo de negocio)
└── _meta.* (sync_state, sync_run)
| Schema | Para qué | Acceso |
|---|---|---|
raw |
Payloads crudos por endpoint, JSONB + columnas extraídas |
sólo procesos de ingesta |
staging |
Vistas tipadas y limpias sobre raw |
bi_reader (read-only) |
marts |
Modelo de negocio para BI (fact_orders, fact_shipments, fact_purchase_orders, dim_order_status_history, dim_purchase_order_status_history) |
bi_reader (read-only) |
_meta |
sync_state (high-watermarks), sync_run (historial) |
sólo procesos de ingesta |
- Cada tabla
raw.*tiene como PK elidnatural de la API →INSERT ... ON CONFLICT (id) DO UPDATE. _meta.sync_state.last_high_watermarkalmacenaMAX(api_updated_at)del último run exitoso.- Cada ejecución abre un
sync_run, haceupdatedSince = last_high_watermark, ingesta, y avanza el watermark al cerrar. - Re-ejecutar un run NO duplica filas: actualiza las que ya estaban.
raw.cartoncloud_orders siempre muestra el estado actual de cada orden
(la PK es el id de CartonCloud y el upsert sobreescribe el payload con
cada cambio: DRAFT → PACKING_IN_PROGRESS → PACKED → DISPATCHED → ...).
Para no perder la historia, un trigger en Postgres detecta cuando cambia
status y appendea un evento a raw.cartoncloud_order_events. Los workflows
de n8n no necesitan hacer nada — el trigger es transparente al upsert.
Lo mismo aplica a consignments (raw.cartoncloud_shipment_events).
Para análisis, hay dos vistas en marts:
marts.dim_order_status_history— una fila por transición de estado conentered_at,exited_at,hours_in_status,is_current.marts.dim_shipment_status_history— idem para consignments.
-- ¿Cuánto tiempo en promedio queda una orden en cada estado?
SELECT current_status, AVG(hours_in_status) AS avg_hours
FROM marts.dim_order_status_history
WHERE NOT is_current
GROUP BY current_status
ORDER BY avg_hours DESC;
-- ¿En qué estado estaba ORD-X el 5 de mayo?
SELECT current_status
FROM marts.dim_order_status_history
WHERE order_id = 'ORD-X' AND entered_at <= '2026-05-05'
ORDER BY entered_at DESC LIMIT 1;supabase/migrations/ SQL ordenado por timestamp. Aplicar con `supabase db push`
o vía dashboard / MCP.
n8n/ Código fuente (SDK n8n) de los workflows.
Los workflows ya están creados en n8n; estos archivos
son la fuente de verdad versionada.
Ya aplicados al proyecto LogisticaPatagonia (qblcptyhyzbpbqcwiwtp):
20260511000001_schemas_and_sync_state.sql20260511000002_raw_cartoncloud.sql20260511000003_staging_cartoncloud.sql20260511000004_marts_and_bi_reader.sql20260511000005_harden_internal_schemas.sql
Hay que crear dos credenciales con estos nombres exactos (lo que esperan los workflows):
| Nombre | Tipo | Campos |
|---|---|---|
Supabase DWH |
Postgres | host=db.qblcptyhyzbpbqcwiwtp.supabase.co, port=5432, db=postgres, ssl on, user con permisos sobre raw, _meta (usar service role o un usuario dedicado) |
CartonCloud OAuth2 |
OAuth2 API | grant=Client Credentials, token URL = del docs de CartonCloud, client id/secret de un API Client en CartonCloud |
Crear el API Client en CartonCloud: Settings → API Clients → New Client, asignarle el rol WMS Create Job (u otro con permisos de lectura sobre orders y consignments).
Los dos workflows usan el endpoint Search de CartonCloud (POST con árbol
de condiciones), no GET. Cada uno tiene un node Config al inicio donde se
configura el tenant_id en un único lugar (propaga al URL y a la columna
tenant_id en raw.*).
-
CartonCloud Orders → Supabase (Daily) — ya apunta a
POST /tenants/{tenantId}/outbound-orders/search(confirmado contra api-docs.cartoncloud.com). Sólo hay que seteartenant_iden el nodeConfig. -
CartonCloud Consignments → Supabase (Daily) — apunta a
POST /tenants/{tenantId}/consignments/search. Endpoint inferido, verificar contra los docs.
El cuerpo del POST filtra por /timestamps/modified/time con
AndCondition (>= window_from, < window_to). El docs sólo muestra
TextComparisonCondition con métodos STARTS_WITH y EQUAL_TO, así que
estos valores son inferencias razonables pero pueden no ser exactos:
"type": "TimestampComparisonCondition"— podría llamarseDateComparisonConditiono similar."method": "GREATER_THAN_OR_EQUAL_TO"/"LESS_THAN"— podrían ser>=/<,AFTER/BEFORE, etc.- Paginación via
?page=N&limit=200— el docs menciona "see Pagination" pero no se vio esa sección; verificar nombres de query params reales.
Si algún valor está mal, el endpoint devuelve 400 con un mensaje claro. Ajustar
el jsonBody en Search CartonCloud Orders / Search CartonCloud
Consignments y el bloque de paginación.
Para no esperar a que el watermark consuma todo de a poco:
Opción A — la API soporta rangos amplios:
Ejecutar manualmente el workflow con _meta.sync_state.last_high_watermark = '1970-01-01' (default). Va a paginar todo de una.
Opción B — usar reportes CSV existentes de CartonCloud:
\copy raw.cartoncloud_orders (id, tenant_id, order_type, status, customer_ref,
api_created_at, api_updated_at, payload)
FROM 'orders_backfill.csv' WITH CSV HEADER;Después de cargar, mover el watermark al máximo:
INSERT INTO _meta.sync_state (source_system, entity, last_high_watermark, last_status)
SELECT 'cartoncloud', 'orders', MAX(api_updated_at), 'success'
FROM raw.cartoncloud_orders
ON CONFLICT (source_system, entity) DO UPDATE
SET last_high_watermark = EXCLUDED.last_high_watermark;host: db.qblcptyhyzbpbqcwiwtp.supabase.co
port: 5432
database: postgres
user: (rol con LOGIN que herede de bi_reader)
schemas: staging, marts
ssl: required
Para crear el usuario BI:
CREATE ROLE bi_metabase LOGIN PASSWORD '<strong-password>' IN ROLE bi_reader;bi_reader ya tiene SELECT sobre staging.* y marts.*. No tiene acceso a
raw ni _meta (a propósito).
-- Últimos 20 syncs
SELECT source_system, entity, started_at, finished_at, status, rows_ingested, error_message
FROM _meta.sync_run
ORDER BY started_at DESC
LIMIT 20;
-- Estado actual por entidad
SELECT * FROM _meta.sync_state;UPDATE _meta.sync_state
SET last_high_watermark = '2026-04-01'::timestamptz
WHERE source_system = 'cartoncloud' AND entity = 'orders';Y disparar el workflow manualmente en n8n.
Editar el node Daily 02:00 en n8n → cambiar field a hours o minutes,
o pasarse a una expresión cron.
El schema en Supabase ya está creado (migración 20260513000001_cartoncloud_purchase_orders.sql).
El source del workflow para n8n está en n8n/cartoncloud_purchase_orders_sync.workflow.ts.
Para evitar tipear de nuevo cada node, lo más rápido es duplicar el workflow de Orders en la UI de n8n y modificar 4 cosas:
- En n8n, en la lista de workflows, ... al lado de "CartonCloud Orders → Supabase (Daily)" → Duplicate.
- Renombrar a
CartonCloud Purchase Orders → Supabase (Daily). Cambiar Schedule a 03:00 (para no chocar con el de Orders). - Node "Open Sync Run" → reemplazar la query:
WITH state AS ( SELECT COALESCE(last_high_watermark, '2024-01-01'::timestamptz) AS hwm FROM _meta.sync_state WHERE source_system = 'cartoncloud' AND entity = 'purchase_orders' ), inserted AS ( INSERT INTO _meta.sync_run (source_system, entity, status, window_from, window_to) SELECT 'cartoncloud', 'purchase_orders', 'running', COALESCE((SELECT hwm FROM state), '2024-01-01'::timestamptz), NOW() RETURNING run_id, window_from, window_to ) SELECT run_id::text AS run_id, to_char(window_from AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS window_from, to_char(window_to AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS window_to FROM inserted;
- Node "Search CartonCloud Orders" (renombrarlo a "Search CartonCloud Purchase Orders"):
- URL: cambiar
outbound-orders/searchporinbound-orders/search. - El JSON Body se mantiene igual (mismo formato de condición).
- URL: cambiar
- Node "Normalize Orders" (renombrar a "Normalize Purchase Orders") → reemplazar el JS:
const tenantId = $('Config').first().json.tenant_id; const runId = $('Open Sync Run').first().json.run_id; const out = []; for (const item of $input.all()) { const payload = item.json; const rows = Array.isArray(payload) ? payload : (payload.data ?? payload.results ?? [payload]); for (const po of rows) { if (!po || typeof po !== 'object') continue; const id = String(po.id ?? ''); if (!id) continue; out.push({ json: { id, tenant_id: tenantId, status: po.status ?? null, supplier_ref: po.references?.supplier ?? po.references?.purchaseOrder ?? null, api_created_at: po.timestamps?.created?.time ?? null, api_updated_at: po.timestamps?.modified?.time ?? null, payload: JSON.stringify(po), source_run_id: runId, }}); } } return out;
- Node "Upsert raw.cartoncloud_orders" → cambiar el table a
cartoncloud_purchase_ordersy dejar sólo las columnas:id,tenant_id,status,supplier_ref,api_created_at,api_updated_at,payload,source_run_id(sinorder_typenicustomer_ref). - Node "Close Sync Run" → en la query reemplazar
'orders'por'purchase_orders'ycartoncloud_ordersporcartoncloud_purchase_orders.
Guardar. Sembrar el watermark si querés backfill:
INSERT INTO _meta.sync_state (source_system, entity, last_high_watermark, last_status)
VALUES ('cartoncloud', 'purchase_orders', '2024-01-01T00:00:00+00:00'::timestamptz, 'success')
ON CONFLICT (source_system, entity) DO UPDATE
SET last_high_watermark = EXCLUDED.last_high_watermark;Después Execute workflow manualmente las veces que haga falta (~5-15 corridas si traen volumen similar a las sale orders).
- Validar contra API real — corregir URL/parámetros, mapear paths JSONB en
staging. - Alertas — agregar nodo Slack/Email al final del workflow para notificar errores (
status = 'error'). - Materializar marts si las queries de BI se vuelven lentas:
CREATE MATERIALIZED VIEW+REFRESHdiario.
A diferencia de CartonCloud (pull incremental), Urbantz nos dispara un webhook por cada cambio de estado de una task. n8n recibe, normaliza y persiste en dos tablas:
raw.urbantz_task_events— log inmutable, una fila por hit del webhook por task. PKwebhook_idUNIQUE → idempotente ante retries de Urbantz.raw.urbantz_tasks— estado actual, upsert portask_idprotegido contra eventos out-of-order (sólo pisa cuandoEXCLUDED.updated_at_source >= existing).
Vistas analíticas iniciales (en marts.*, ya visibles para bi_reader):
marts.v_otd_30d— OTD% por cliente, últimos 30 días.marts.v_eta_quality_30d— desvío de arrival real vs ETA window, por hub y cliente.
-
Migración (DDL de las dos tablas + índices):
supabase db push # ya aplicada vía MCP en LogisticaPatagoniaVer
supabase/migrations/20260528120000_urbantz_events.sql. -
Vistas analíticas (idempotentes,
CREATE OR REPLACE VIEW):psql "$DATABASE_URL" -f sql/views.sql -
Workflow n8n: importar
n8n/workflow.json. Credencial Postgres apuntando al pooler de Supabase (puerto 6543,aws-0-sa-east-1.pooler.supabase.com, userpostgres.qblcptyhyzbpbqcwiwtp). Detalle enn8n/README.md. -
Smoke test del normalize contra el sample real:
node tests/normalize.test.js
-
Test end-to-end contra el webhook:
jq '.[0].body' samples/webhook-delivered.json | \ curl -X POST -H "Content-Type: application/json" \ -H "x-webhook-id: $(uuidgen)" \ -H "x-event-type: taskStatusChanged,taskChanged" \ --data-binary @- \ https://n8n.logisticapatagonia.com/webhook/84492e6a-180f-4d38-b392-bcf9b9acb653
samples/webhook-delivered.json # payload real del webhook (ejecución n8n 12367)
supabase/migrations/20260528120000_urbantz_events.sql
sql/views.sql # v_otd_30d, v_eta_quality_30d
n8n/normalize.js # función normalizadora (fuente canónica)
n8n/workflow.json # workflow importable
n8n/README.md # import + credencial pooler
tests/normalize.test.js # smoke test del normalize
