
feat(job): refactor job logging so it isn't a bottleneck #1256

@mihow

Description


Observation

During a moderately-sized async ML job (~742 images), the Celery worker running process_nats_pipeline_result begins producing a recurring set of errors once multiple NATS results are in flight:

ERROR Failed to save logs for job #N: SoftTimeLimitExceeded()
ERROR Failed to save logs for job #N: sending query failed: another command is already in progress
ERROR Error processing pipeline result for job N: sending query failed: another command is already in progress. NATS will re-deliver the task message.

The same 5-minute window also shows Django containers logging terminating connection due to idle-in-transaction timeout. Errors are spread across all ForkPoolWorker-* processes (not clustered on one) and are only ever attached to the single actively-processing job row. A live pg_stat_activity snapshot during the error window showed no lock contention and plenty of connection headroom (~90 of 500) — i.e. the DB is not saturated.

This appears (tentatively) to be a logical collision within a single connection (a second command sent while the first is still in progress), not a capacity problem.

Current implementation

ami/jobs/models.py:335–361, JobLogHandler.emit:

def emit(self, record: logging.LogRecord):
    logger.log(record.levelno, self.format(record))
    try:
        self.job.refresh_from_db(fields=["logs"])          # SELECT
        # ...in-memory insert at position 0, dedupe check, truncate to 1000...
        self.job.save(update_fields=["logs"], update_progress=False)  # UPDATE
    except Exception as e:
        logger.error(f"Failed to save logs for job #{self.job.pk}: {e}")

The refresh_from_db exists for a real reason (see PR #1162): each concurrent worker holds its own stale in-memory JobLogs pydantic object, and without the refresh the last save() wins and earlier entries are silently dropped. So the current design is trading concurrency-safety of log entries for concurrency-risk at the connection level.
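The lost update that the refresh guards against can be reproduced without a database. In this sketch the JSON field is a dict, each worker's in-memory JobLogs object is a deep copy, and prepend_log is a hypothetical stand-in for the insert-at-0, dedupe and truncate logic elided in the excerpt above (the 1000-entry cap comes from this issue).

```python
import copy

MAX_ENTRIES = 1000  # truncation limit described in this issue


def prepend_log(stdout: list[str], msg: str) -> None:
    """Hypothetical stand-in for the in-memory mutation in JobLogHandler.emit:
    insert at position 0, skip duplicates, truncate to MAX_ENTRIES."""
    if msg not in stdout:  # O(n) dedupe scan
        stdout.insert(0, msg)
    del stdout[MAX_ENTRIES:]


def save_without_refresh(db_row: dict, stale_copy: dict, msg: str) -> None:
    # Each worker mutates its own stale copy, then overwrites the whole field.
    prepend_log(stale_copy["stdout"], msg)
    db_row["logs"] = copy.deepcopy(stale_copy)


def save_with_refresh(db_row: dict, msg: str) -> None:
    # Re-read the current value first (the refresh_from_db step), then write.
    fresh = copy.deepcopy(db_row["logs"])
    prepend_log(fresh["stdout"], msg)
    db_row["logs"] = fresh


row = {"logs": {"stdout": []}}
# Two workers load the job before either one has written.
worker_a = copy.deepcopy(row["logs"])
worker_b = copy.deepcopy(row["logs"])
save_without_refresh(row, worker_a, "A: batch 1 done")
save_without_refresh(row, worker_b, "B: batch 2 done")
print(row["logs"]["stdout"])  # ['B: batch 2 done']  (A's entry was lost)

row = {"logs": {"stdout": []}}
save_with_refresh(row, "A: batch 1 done")
save_with_refresh(row, "B: batch 2 done")
print(row["logs"]["stdout"])  # ['B: batch 2 done', 'A: batch 1 done']
```

The refresh only narrows the window: the read and the write are still separate statements, so two workers that refresh at the same moment can still overwrite each other unless something (such as a row lock) serializes them.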

Likely failure path (hypothesis)

_update_job_progress (same file, ~L179–209) opens a transaction.atomic() block, locks the row with Job.objects.select_for_update().get(pk=job_id), and then calls job.logger.info(...) inside that block. JobLogHandler.emit then issues:

  1. SELECT (refresh_from_db) on the same connection that already holds a SELECT … FOR UPDATE cursor on the same row
  2. An UPDATE on the same row for logs only

Under ~180 result tasks per 5 minutes × concurrency 8, several workers end up queued on the FOR UPDATE lock while the worker holding it runs Python code between queries. Either the holder's connection sits idle in transaction past Postgres's idle_in_transaction_session_timeout (Postgres terminates the connection server-side), or a task stalls past Celery's 300s soft_time_limit (the soft timeout interrupts it mid-transaction). The resulting exception is caught by the broad except Exception in emit, which then calls logger.error(...); that call re-enters emit, which issues another query on the same, now-broken connection. That produces the "another command is already in progress" cascade.
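Keeping log calls out of the locked block cuts this chain at its first step. In Django the usual hook is transaction.on_commit(), which runs a callback only after the surrounding atomic block commits (and therefore after the row lock is released). The stdlib-only sketch below substitutes a threading.Lock for the row lock and uses a hypothetical locked_job() context manager that queues messages and emits them once the lock is released; the handler records whether the lock was held at emit time.

```python
import contextlib
import logging
import threading

row_lock = threading.Lock()  # stand-in for SELECT ... FOR UPDATE on the job row


class LockAwareHandler(logging.Handler):
    """Records, for each message, whether the row lock was held when it was emitted."""

    def __init__(self) -> None:
        super().__init__()
        self.seen: list[tuple[str, bool]] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.seen.append((record.getMessage(), row_lock.locked()))


@contextlib.contextmanager
def locked_job():
    """Hypothetical helper: hold the lock, queue log calls, emit them after release."""
    deferred: list[str] = []
    with row_lock:
        yield deferred.append  # callers queue messages instead of logging directly
    # Only reached on success (like on_commit); the lock is already released here.
    for msg in deferred:
        job_logger.info(msg)


job_logger = logging.getLogger("job.demo")
job_logger.setLevel(logging.INFO)
job_logger.propagate = False
handler = LockAwareHandler()
job_logger.addHandler(handler)

with locked_job() as log_later:
    # ... update progress fields on the locked row here ...
    log_later("Processed 10 of 742 images")

print(handler.seen)  # [('Processed 10 of 742 images', False)]
```

If the block raises, the queued messages are dropped along with the "transaction", which mirrors on_commit callbacks not running on rollback.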

What still needs to be verified: whether CONN_MAX_AGE > 0 combined with CONN_HEALTH_CHECKS = False on the celeryworker is causing Django to hand dead, server-killed connections to subsequent tasks; and the exact value of idle_in_transaction_session_timeout on the Postgres instances used for async jobs. Both would affect how often this fires in practice.
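Both Django settings mentioned here are keys on each DATABASES entry (CONN_HEALTH_CHECKS requires Django 4.1 or later). For comparison while verifying, a worker configuration with persistent connections and health checks enabled might look like the fragment below; the database name, host, and CONN_MAX_AGE value are placeholders, not the project's actual settings.

```python
# Hypothetical worker settings fragment; NAME, HOST and the CONN_MAX_AGE value are placeholders.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "ami",
        "HOST": "postgres",
        # Keep connections open for reuse for up to 60 s (0 closes them after each request).
        "CONN_MAX_AGE": 60,
        # Django >= 4.1: check that a reused persistent connection is still usable
        # before it is used again, and reconnect if the server already closed it.
        "CONN_HEALTH_CHECKS": True,
    }
}
```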

Consequence

  • Log entries are dropped during bursts (the inner save is the exception site)
  • Connection churn / soft-timeouts make individual process_nats_pipeline_result tasks much slower than they otherwise would be, which compounds queue backlog
  • The broad-except cascade can convert a single DB blip into dozens of error lines per task

Proposals

Ordered by scope.

A. Separate JobLog table (recommended)

from django.db import models  # Job is assumed to be defined in the same module


class JobLog(models.Model):
    job = models.ForeignKey(Job, on_delete=models.CASCADE, related_name="log_entries")
    level = models.CharField(max_length=10)
    message = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)

    class Meta:
        indexes = [models.Index(fields=["job", "-created_at"])]

emit becomes a single INSERT:

def emit(self, record: logging.LogRecord):
    msg = self.format(record)  # format once, reuse for both sinks
    logger.log(record.levelno, msg)
    try:
        JobLog.objects.create(
            job_id=self.job.pk,
            level=record.levelname,
            message=msg,
        )
    except Exception as e:
        logger.error(f"Failed to save log for job #{self.job.pk}: {e}")

Why this is the right shape:

  • Pure INSERT — no read-before-write, no nested cursor, no row-level FOR UPDATE contention
  • Natural concurrency: multiple workers inserting into the same child table is a solved pattern (already the case for Detection, Classification, etc.)
  • Removes the arbitrary 1000-entry truncation; paginable / time-range queryable
  • Removes the if msg not in stdout dedupe scan (which is O(n) per write, ~n=1000)
  • Log levels become first-class (filter by ERROR for the "stderr" view; filter by INFO+ for the full view)
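The read side of these points can be sketched with the standard library's sqlite3 standing in for Postgres and the Django ORM. The columns mirror the proposed JobLog model, but the job_log table name, the hand-written SQL, and the write_log/read_logs helpers are illustrative assumptions, not generated migration output.

```python
import logging
import sqlite3

# Numeric severities used to turn a minimum level into a set of level names.
LEVELS = {"DEBUG": logging.DEBUG, "INFO": logging.INFO, "WARNING": logging.WARNING,
          "ERROR": logging.ERROR, "CRITICAL": logging.CRITICAL}

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE job_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        job_id INTEGER NOT NULL,
        level TEXT NOT NULL,
        message TEXT NOT NULL,
        created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
    );
    -- Mirrors models.Index(fields=["job", "-created_at"]) in the proposed model.
    CREATE INDEX job_log_job_created ON job_log (job_id, created_at DESC);
    """
)


def write_log(job_id: int, level: str, message: str) -> None:
    # One INSERT per log call: no read-before-write, no lock on the job row.
    with conn:
        conn.execute(
            "INSERT INTO job_log (job_id, level, message) VALUES (?, ?, ?)",
            (job_id, level, message),
        )


def read_logs(job_id: int, min_level: int = logging.INFO,
              limit: int = 50, offset: int = 0) -> list[str]:
    """Newest-first page of messages at or above min_level."""
    names = [name for name, value in LEVELS.items() if value >= min_level]
    placeholders = ", ".join("?" for _ in names)
    rows = conn.execute(
        f"SELECT message FROM job_log WHERE job_id = ? AND level IN ({placeholders}) "
        "ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?",  # id breaks same-second ties
        (job_id, *names, limit, offset),
    ).fetchall()
    return [message for (message,) in rows]


write_log(1, "INFO", "Started job")
write_log(1, "ERROR", "Failed to fetch image 17")
write_log(1, "INFO", "Finished batch 1")
write_log(2, "INFO", "Other job")
print(read_logs(1))  # ['Finished batch 1', 'Failed to fetch image 17', 'Started job']
print(read_logs(1, min_level=logging.ERROR))  # ['Failed to fetch image 17']
```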

Costs:

  • One migration + a compatibility shim
  • Frontend read path changes, if it consumes Job.logs.stdout / .stderr directly

Proposed rollout to avoid a hard cutover:

  1. Add JobLog model + migration
  2. Dual-write: keep the existing JSON path; also JobLog.objects.create(...) in emit
  3. Switch Job.logs.stdout / .stderr to a @property that reads the last N rows from JobLog (keeps existing consumers working with no API contract change)
  4. Verify frontend, then stop writing to the JSON field
  5. Migrate existing data + drop the JSON column in a separate release
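Steps 2 and 3 can be sketched without Django: a handler that writes both the legacy JSON shape and an append-only row, and a read shim that rebuilds .stdout/.stderr from the rows so the two paths can be compared before the JSON writes stop. LogRow, LOG_TABLE, DualWriteHandler and JobLogsView are hypothetical names; the list stands in for the JobLog table, and routing ERROR/CRITICAL into stderr is an assumption about the current JSON layout.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass
from itertools import islice


@dataclass
class LogRow:
    job_id: int
    level: str
    message: str


# Stand-in for the JobLog table; append order plays the role of created_at.
LOG_TABLE: list[LogRow] = []
ERROR_LEVELS = {"ERROR", "CRITICAL"}


class DualWriteHandler(logging.Handler):
    """Step 2 (hypothetical): keep writing the legacy JSON-shaped dict and also insert a row."""

    def __init__(self, job_id: int, legacy_logs: dict[str, list[str]]) -> None:
        super().__init__()
        self.job_id = job_id
        self.legacy_logs = legacy_logs

    def emit(self, record: logging.LogRecord) -> None:
        msg = self.format(record)
        # Legacy path, newest first (assumption: ERROR and above also go to stderr).
        self.legacy_logs.setdefault("stdout", []).insert(0, msg)
        if record.levelname in ERROR_LEVELS:
            self.legacy_logs.setdefault("stderr", []).insert(0, msg)
        # New path: a single append-only row.
        LOG_TABLE.append(LogRow(self.job_id, record.levelname, msg))


@dataclass
class JobLogsView:
    """Step 3 (hypothetical): same .stdout/.stderr shape, read from the rows."""

    job_id: int
    limit: int = 1000  # "last N rows"; 1000 matches the current truncation

    def _messages(self, levels: set[str] | None = None) -> list[str]:
        rows = (r for r in reversed(LOG_TABLE)
                if r.job_id == self.job_id and (levels is None or r.level in levels))
        return [r.message for r in islice(rows, self.limit)]

    @property
    def stdout(self) -> list[str]:
        return self._messages()

    @property
    def stderr(self) -> list[str]:
        return self._messages(ERROR_LEVELS)


legacy: dict[str, list[str]] = {"stdout": [], "stderr": []}
log = logging.getLogger("job.42")
log.propagate = False
log.setLevel(logging.INFO)
log.addHandler(DualWriteHandler(42, legacy))
log.info("Started")
log.error("Image 7 failed")

view = JobLogsView(42)
# During dual-write, both read paths should agree.
print(view.stdout == legacy["stdout"], view.stderr == legacy["stderr"])  # True True
```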

B. Atomic jsonb_insert (no schema change)

Replace the refresh+mutate+save with a single SQL statement:

UPDATE jobs_job
SET logs = jsonb_set(
    COALESCE(logs, '{}'::jsonb),
    '{stdout}',
    jsonb_insert(COALESCE(logs->'stdout', '[]'::jsonb), '{0}', to_jsonb(%s::text), false)
)
WHERE id = %s;

This eliminates the read-before-write and therefore the nested cursor bug. But:

  • Truncation to 1000 entries becomes awkward (subquery on jsonb_array_length, two statements or a CTE)
  • Dedup (if msg not in stdout) disappears
  • Critically, the UPDATE still targets the same jobs_job row that _update_job_progress holds a SELECT FOR UPDATE on — so we trade a connection-state bug for a straight row-lock wait. Doesn't fix the contention, only the specific cursor error.

C. Redis-buffered + periodic flush

emit does LPUSH to a per-job Redis list; a celerybeat task flushes into the DB every N seconds.

  • Fully decouples log writes from the request/task path
  • Batched writes = much higher throughput
  • Adds eventual-consistency (logs lag real-time by up to N seconds)
  • More moving parts; loss-risk if the key is evicted before flush

Could also be layered on top of (A) as an optimisation later.
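A minimal sketch of the buffer-and-flush shape, assuming one list key per job (the joblogs:{job_id} key format is hypothetical). An in-memory class stands in for Redis so the example runs on its own; with redis-py, the push would be lpush and the drain could be a transactional pipeline running lrange(key, 0, -1) followed by delete(key). The bulk_insert callback is where something like JobLog.objects.bulk_create(...) would go.

```python
import threading
from collections import defaultdict
from typing import Callable


class InMemoryListStore:
    """Stand-in for Redis lists (LPUSH, plus an atomic read-and-delete drain)."""

    def __init__(self) -> None:
        self._lists: dict[str, list[str]] = defaultdict(list)
        self._lock = threading.Lock()

    def lpush(self, key: str, value: str) -> None:
        with self._lock:
            self._lists[key].insert(0, value)  # like LPUSH: newest at index 0

    def drain(self, key: str) -> list[str]:
        with self._lock:  # read and delete as one step, like MULTI/LRANGE/DEL/EXEC
            return self._lists.pop(key, [])


def buffer_log(store: InMemoryListStore, job_id: int, message: str) -> None:
    # What emit() would do: one cheap push, no database round-trip.
    store.lpush(f"joblogs:{job_id}", message)


def flush_job_logs(store: InMemoryListStore, job_id: int,
                   bulk_insert: Callable[[int, list[str]], None]) -> int:
    """Body of the periodic task: drain the buffer and write it in one batch."""
    entries = store.drain(f"joblogs:{job_id}")
    if entries:
        # LPUSH stored newest first; reverse so rows are written oldest first.
        bulk_insert(job_id, list(reversed(entries)))
    return len(entries)


store = InMemoryListStore()
written: list[tuple[int, str]] = []


def fake_bulk_insert(job_id: int, messages: list[str]) -> None:
    written.extend((job_id, m) for m in messages)


for i in range(3):
    buffer_log(store, 7, f"batch {i} done")
print(flush_job_logs(store, 7, fake_bulk_insert))  # 3
print(written)  # [(7, 'batch 0 done'), (7, 'batch 1 done'), (7, 'batch 2 done')]
print(flush_job_logs(store, 7, fake_bulk_insert))  # 0
```

Draining before the insert succeeds means a failed flush loses that batch; moving entries to a separate processing list first (a reliable-queue pattern) avoids that at the cost of more bookkeeping.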

Recommendation

Go with A, rolled out dual-write as above. The nested-cursor bug is the visible symptom, but the deeper issue is that log writes share the row that the job-progress path locks. Moving them to their own table removes the coupling entirely and is consistent with how every other high-volume write in the model lives today.

B is tempting if we want something shipped this week, but because the remaining row-lock contention on jobs_job is a real source of stalls, it only partially addresses the symptoms we saw on the ~742-image job.

C is a good follow-up once the write path is a plain insert; at that point it becomes an incremental throughput optimisation rather than a bug fix.

Adjacent quick wins worth considering in the same PR

  • Set CONN_HEALTH_CHECKS = True (Django ≥4.1) on the worker so a server-killed connection is detected before the next query reuses it
  • Move job.logger.* calls in _update_job_progress outside the transaction.atomic() block regardless of which proposal lands — logging shouldn't be inside a held row lock
  • Tighten the broad except Exception in emit to catch a narrower exception class, so a broken connection propagates instead of being swallowed and then re-entering emit on the same broken connection
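One way to get the effect of the last bullet using only the standard library: re-raise connection-level errors so the task fails (and the message is redelivered), report any other write failure through logging.Handler.handleError (which writes to sys.stderr rather than back through a logger), and use a thread-local flag so a log call made while emit is already running is dropped instead of re-entering. ConnectionBroken and write_entry are hypothetical placeholders; which Django/psycopg exceptions should count as connection-level (for example InterfaceError or OperationalError) is an assumption to check against the errors actually seen.

```python
import logging
import threading


class ConnectionBroken(Exception):
    """Hypothetical stand-in for a connection-level database error."""


class GuardedJobLogHandler(logging.Handler):
    def __init__(self, write_entry) -> None:
        super().__init__()
        self.write_entry = write_entry  # hypothetical callable that performs the DB write
        self._local = threading.local()

    def emit(self, record: logging.LogRecord) -> None:
        if getattr(self._local, "active", False):
            return  # already inside emit on this thread: drop rather than re-enter
        self._local.active = True
        try:
            self.write_entry(self.format(record))
        except ConnectionBroken:
            raise  # let the task fail instead of reusing a dead connection
        except Exception:
            self.handleError(record)  # stdlib reporting to stderr, not via this logger
        finally:
            self._local.active = False


writes: list[str] = []
guard_log = logging.getLogger("job.guard-demo")
guard_log.propagate = False


def write_and_log(msg: str) -> None:
    writes.append(msg)
    guard_log.error("nested call while writing")  # would recurse without the guard


guard_log.addHandler(GuardedJobLogHandler(write_and_log))
guard_log.error("first")
print(writes)  # ['first']
```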

Related
