Skip to content
View szubillaga's full-sized avatar

Block or report szubillaga

Block user

Prevent this user from interacting with your repositories and sending you notifications. Learn more about blocking users.

You must be logged in to block users.

Maximum 250 characters. Please don’t include any personal information such as legal names or email addresses. Markdown is supported. This note will only be visible to you.
Report abuse

Contact GitHub support about this user’s behavior. Learn more about reporting abuse.

Report abuse
szubillaga/README.md

Logística Patagonia — Data Warehouse

Centraliza datos operativos de CartonCloud y Urbantz en un único Postgres consultable desde BI y SQL ad-hoc.

  • Almacén: Supabase Postgres (proyecto LogisticaPatagonia, región sa-east-1).
  • Ingesta: workflows de n8n. CartonCloud por pull diario; Urbantz por webhook push en tiempo real.
  • Latencia: ~24h para CartonCloud, segundos para Urbantz.
  • Consumo: Metabase / Looker Studio / Power BI / queries SQL.

Arquitectura

CartonCloud API ─► n8n daily pull   ─┐
                                     ├──► Supabase Postgres ──► Metabase / Looker / SQL
Urbantz Webhooks ► n8n webhook push ─┘         │
                                               ├── raw.*       (JSONB crudo + claves)
                                               ├── staging.*   (vistas tipadas)
                                               ├── marts.*     (modelo de negocio)
                                               └── _meta.*     (sync_state, sync_run)

Capas del DWH

Schema Para qué Acceso
raw Payloads crudos por endpoint, JSONB + columnas extraídas sólo procesos de ingesta
staging Vistas tipadas y limpias sobre raw bi_reader (read-only)
marts Modelo de negocio para BI (fact_orders, fact_shipments, fact_purchase_orders, dim_order_status_history, dim_purchase_order_status_history) bi_reader (read-only)
_meta sync_state (high-watermarks), sync_run (historial) sólo procesos de ingesta

Idempotencia e incrementalidad

  • Cada tabla raw.* tiene como PK el id natural de la API → INSERT ... ON CONFLICT (id) DO UPDATE.
  • _meta.sync_state.last_high_watermark almacena MAX(api_updated_at) del último run exitoso.
  • Cada ejecución abre un sync_run, hace updatedSince = last_high_watermark, ingesta, y avanza el watermark al cerrar.
  • Re-ejecutar un run NO duplica filas: actualiza las que ya estaban.

Historia de estados

raw.cartoncloud_orders siempre muestra el estado actual de cada orden (la PK es el id de CartonCloud y el upsert sobreescribe el payload con cada cambio: DRAFT → PACKING_IN_PROGRESS → PACKED → DISPATCHED → ...).

Para no perder la historia, un trigger en Postgres detecta cuando cambia status y appendea un evento a raw.cartoncloud_order_events. Los workflows de n8n no necesitan hacer nada — el trigger es transparente al upsert.

Lo mismo aplica a consignments (raw.cartoncloud_shipment_events).

Para análisis, hay dos vistas en marts:

  • marts.dim_order_status_history — una fila por transición de estado con entered_at, exited_at, hours_in_status, is_current.
  • marts.dim_shipment_status_history — idem para consignments.
-- ¿Cuánto tiempo en promedio queda una orden en cada estado?
SELECT current_status, AVG(hours_in_status) AS avg_hours
FROM marts.dim_order_status_history
WHERE NOT is_current
GROUP BY current_status
ORDER BY avg_hours DESC;

-- ¿En qué estado estaba ORD-X el 5 de mayo?
SELECT current_status
FROM marts.dim_order_status_history
WHERE order_id = 'ORD-X' AND entered_at <= '2026-05-05'
ORDER BY entered_at DESC LIMIT 1;

Layout del repo

supabase/migrations/   SQL ordenado por timestamp. Aplicar con `supabase db push`
                       o vía dashboard / MCP.
n8n/                   Código fuente (SDK n8n) de los workflows.
                       Los workflows ya están creados en n8n; estos archivos
                       son la fuente de verdad versionada.

Setup

1. Schemas en Supabase

Ya aplicados al proyecto LogisticaPatagonia (qblcptyhyzbpbqcwiwtp):

  • 20260511000001_schemas_and_sync_state.sql
  • 20260511000002_raw_cartoncloud.sql
  • 20260511000003_staging_cartoncloud.sql
  • 20260511000004_marts_and_bi_reader.sql
  • 20260511000005_harden_internal_schemas.sql

2. Credenciales en n8n

Hay que crear dos credenciales con estos nombres exactos (lo que esperan los workflows):

Nombre Tipo Campos
Supabase DWH Postgres host=db.qblcptyhyzbpbqcwiwtp.supabase.co, port=5432, db=postgres, ssl on, user con permisos sobre raw, _meta (usar service role o un usuario dedicado)
CartonCloud OAuth2 OAuth2 API grant=Client Credentials, token URL = del docs de CartonCloud, client id/secret de un API Client en CartonCloud

Crear el API Client en CartonCloud: Settings → API Clients → New Client, asignarle el rol WMS Create Job (u otro con permisos de lectura sobre orders y consignments).

3. Ajustes en los workflows

Los dos workflows usan el endpoint Search de CartonCloud (POST con árbol de condiciones), no GET. Cada uno tiene un node Config al inicio donde se configura el tenant_id en un único lugar (propaga al URL y a la columna tenant_id en raw.*).

  • CartonCloud Orders → Supabase (Daily) — ya apunta a POST /tenants/{tenantId}/outbound-orders/search (confirmado contra api-docs.cartoncloud.com). Sólo hay que setear tenant_id en el node Config.

  • CartonCloud Consignments → Supabase (Daily) — apunta a POST /tenants/{tenantId}/consignments/search. Endpoint inferido, verificar contra los docs.

Cosas a verificar contra los docs antes de activar

El cuerpo del POST filtra por /timestamps/modified/time con AndCondition (>= window_from, < window_to). El docs sólo muestra TextComparisonCondition con métodos STARTS_WITH y EQUAL_TO, así que estos valores son inferencias razonables pero pueden no ser exactos:

  • "type": "TimestampComparisonCondition" — podría llamarse DateComparisonCondition o similar.
  • "method": "GREATER_THAN_OR_EQUAL_TO" / "LESS_THAN" — podrían ser >= / <, AFTER / BEFORE, etc.
  • Paginación via ?page=N&limit=200 — el docs menciona "see Pagination" pero no se vio esa sección; verificar nombres de query params reales.

Si algún valor está mal, el endpoint devuelve 400 con un mensaje claro. Ajustar el jsonBody en Search CartonCloud Orders / Search CartonCloud Consignments y el bloque de paginación.

4. Backfill histórico

Para no esperar a que el watermark consuma todo de a poco:

Opción A — la API soporta rangos amplios: Ejecutar manualmente el workflow con _meta.sync_state.last_high_watermark = '1970-01-01' (default). Va a paginar todo de una.

Opción B — usar reportes CSV existentes de CartonCloud:

\copy raw.cartoncloud_orders (id, tenant_id, order_type, status, customer_ref,
                              api_created_at, api_updated_at, payload)
FROM 'orders_backfill.csv' WITH CSV HEADER;

Después de cargar, mover el watermark al máximo:

INSERT INTO _meta.sync_state (source_system, entity, last_high_watermark, last_status)
SELECT 'cartoncloud', 'orders', MAX(api_updated_at), 'success'
FROM raw.cartoncloud_orders
ON CONFLICT (source_system, entity) DO UPDATE
  SET last_high_watermark = EXCLUDED.last_high_watermark;

5. Conectar BI

host:     db.qblcptyhyzbpbqcwiwtp.supabase.co
port:     5432
database: postgres
user:     (rol con LOGIN que herede de bi_reader)
schemas:  staging, marts
ssl:      required

Para crear el usuario BI:

CREATE ROLE bi_metabase LOGIN PASSWORD '<strong-password>' IN ROLE bi_reader;

bi_reader ya tiene SELECT sobre staging.* y marts.*. No tiene acceso a raw ni _meta (a propósito).


Operación

Observar runs

-- Últimos 20 syncs
SELECT source_system, entity, started_at, finished_at, status, rows_ingested, error_message
FROM _meta.sync_run
ORDER BY started_at DESC
LIMIT 20;

-- Estado actual por entidad
SELECT * FROM _meta.sync_state;

Re-ingerir desde una fecha

UPDATE _meta.sync_state
SET last_high_watermark = '2026-04-01'::timestamptz
WHERE source_system = 'cartoncloud' AND entity = 'orders';

Y disparar el workflow manualmente en n8n.

Cambiar la cadencia

Editar el node Daily 02:00 en n8n → cambiar field a hours o minutes, o pasarse a una expresión cron.


Setup del workflow de Purchase Orders (Inbound Orders)

El schema en Supabase ya está creado (migración 20260513000001_cartoncloud_purchase_orders.sql). El source del workflow para n8n está en n8n/cartoncloud_purchase_orders_sync.workflow.ts.

Para evitar tipear de nuevo cada node, lo más rápido es duplicar el workflow de Orders en la UI de n8n y modificar 4 cosas:

  1. En n8n, en la lista de workflows, ... al lado de "CartonCloud Orders → Supabase (Daily)" → Duplicate.
  2. Renombrar a CartonCloud Purchase Orders → Supabase (Daily). Cambiar Schedule a 03:00 (para no chocar con el de Orders).
  3. Node "Open Sync Run" → reemplazar la query:
    WITH state AS (
      SELECT COALESCE(last_high_watermark, '2024-01-01'::timestamptz) AS hwm
      FROM _meta.sync_state
      WHERE source_system = 'cartoncloud' AND entity = 'purchase_orders'
    ), inserted AS (
      INSERT INTO _meta.sync_run (source_system, entity, status, window_from, window_to)
      SELECT 'cartoncloud', 'purchase_orders', 'running',
             COALESCE((SELECT hwm FROM state), '2024-01-01'::timestamptz),
             NOW()
      RETURNING run_id, window_from, window_to
    )
    SELECT run_id::text AS run_id,
           to_char(window_from AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS window_from,
           to_char(window_to   AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS window_to
    FROM inserted;
  4. Node "Search CartonCloud Orders" (renombrarlo a "Search CartonCloud Purchase Orders"):
    • URL: cambiar outbound-orders/search por inbound-orders/search.
    • El JSON Body se mantiene igual (mismo formato de condición).
  5. Node "Normalize Orders" (renombrar a "Normalize Purchase Orders") → reemplazar el JS:
    const tenantId = $('Config').first().json.tenant_id;
    const runId = $('Open Sync Run').first().json.run_id;
    const out = [];
    for (const item of $input.all()) {
      const payload = item.json;
      const rows = Array.isArray(payload) ? payload : (payload.data ?? payload.results ?? [payload]);
      for (const po of rows) {
        if (!po || typeof po !== 'object') continue;
        const id = String(po.id ?? '');
        if (!id) continue;
        out.push({ json: {
          id,
          tenant_id: tenantId,
          status: po.status ?? null,
          supplier_ref: po.references?.supplier ?? po.references?.purchaseOrder ?? null,
          api_created_at: po.timestamps?.created?.time ?? null,
          api_updated_at: po.timestamps?.modified?.time ?? null,
          payload: JSON.stringify(po),
          source_run_id: runId,
        }});
      }
    }
    return out;
  6. Node "Upsert raw.cartoncloud_orders" → cambiar el table a cartoncloud_purchase_orders y dejar sólo las columnas: id, tenant_id, status, supplier_ref, api_created_at, api_updated_at, payload, source_run_id (sin order_type ni customer_ref).
  7. Node "Close Sync Run" → en la query reemplazar 'orders' por 'purchase_orders' y cartoncloud_orders por cartoncloud_purchase_orders.

Guardar. Sembrar el watermark si querés backfill:

INSERT INTO _meta.sync_state (source_system, entity, last_high_watermark, last_status)
VALUES ('cartoncloud', 'purchase_orders', '2024-01-01T00:00:00+00:00'::timestamptz, 'success')
ON CONFLICT (source_system, entity) DO UPDATE
SET last_high_watermark = EXCLUDED.last_high_watermark;

Después Execute workflow manualmente las veces que haga falta (~5-15 corridas si traen volumen similar a las sale orders).


Próximos pasos

  1. Validar contra API real — corregir URL/parámetros, mapear paths JSONB en staging.
  2. Alertas — agregar nodo Slack/Email al final del workflow para notificar errores (status = 'error').
  3. Materializar marts si las queries de BI se vuelven lentas: CREATE MATERIALIZED VIEW + REFRESH diario.

Urbantz — webhooks en tiempo real

A diferencia de CartonCloud (pull incremental), Urbantz nos dispara un webhook por cada cambio de estado de una task. n8n recibe, normaliza y persiste en dos tablas:

  • raw.urbantz_task_events — log inmutable, una fila por hit del webhook por task. PK webhook_id UNIQUE → idempotente ante retries de Urbantz.
  • raw.urbantz_tasks — estado actual, upsert por task_id protegido contra eventos out-of-order (sólo pisa cuando EXCLUDED.updated_at_source >= existing).

Vistas analíticas iniciales (en marts.*, ya visibles para bi_reader):

  • marts.v_otd_30d — OTD% por cliente, últimos 30 días.
  • marts.v_eta_quality_30d — desvío de arrival real vs ETA window, por hub y cliente.

Cómo correr

  1. Migración (DDL de las dos tablas + índices):

    supabase db push   # ya aplicada vía MCP en LogisticaPatagonia

    Ver supabase/migrations/20260528120000_urbantz_events.sql.

  2. Vistas analíticas (idempotentes, CREATE OR REPLACE VIEW):

    psql "$DATABASE_URL" -f sql/views.sql
  3. Workflow n8n: importar n8n/workflow.json. Credencial Postgres apuntando al pooler de Supabase (puerto 6543, aws-0-sa-east-1.pooler.supabase.com, user postgres.qblcptyhyzbpbqcwiwtp). Detalle en n8n/README.md.

  4. Smoke test del normalize contra el sample real:

    node tests/normalize.test.js
  5. Test end-to-end contra el webhook:

    jq '.[0].body' samples/webhook-delivered.json | \
      curl -X POST -H "Content-Type: application/json" \
        -H "x-webhook-id: $(uuidgen)" \
        -H "x-event-type: taskStatusChanged,taskChanged" \
        --data-binary @- \
        https://n8n.logisticapatagonia.com/webhook/84492e6a-180f-4d38-b392-bcf9b9acb653

Layout

samples/webhook-delivered.json                  # payload real del webhook (ejecución n8n 12367)
supabase/migrations/20260528120000_urbantz_events.sql
sql/views.sql                                    # v_otd_30d, v_eta_quality_30d
n8n/normalize.js                                 # función normalizadora (fuente canónica)
n8n/workflow.json                                # workflow importable
n8n/README.md                                    # import + credencial pooler
tests/normalize.test.js                          # smoke test del normalize

Popular repositories Loading

  1. szubillaga szubillaga Public

    Config files for my GitHub profile.

    TypeScript