
perf(jobs): defer aggregate count refresh in process_nats_pipeline_result (5-10x speedup expected) #1288

@mihow


Summary

process_nats_pipeline_result averages 20-33s per task in production (p99 78s, with occasional 300s SoftTimeLimit hits). About 98% of that time is spent in the DB, and Sentry trace data shows that ~83% of all task wall time goes to three DISTINCT COUNT queries that refresh denormalized counts. They are triggered by Event.update_calculated_fields() and Deployment.update_calculated_fields(), which run on every result message.

Each result message → save Detection / Classification / Occurrence rows → cascading saves trigger update_calculated_fields_for_events → that runs aggregate counts over the entire deployment / event. With thousands of result messages per job, the same expensive aggregate is recomputed thousands of times.
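A rough cost model shows why this scales badly. The sketch below assumes, purely for illustration, that one refresh costs time proportional to the number of rows currently in the deployment, and that each result message adds a fixed number of rows. `rows_scanned` and its parameters are hypothetical and not part of the codebase. Under those assumptions, refreshing after every message grows quadratically with job size, while a single refresh at job completion grows linearly.

```python
from typing import Optional


def rows_scanned(n_messages: int, existing_rows: int, rows_per_message: int,
                 refresh_every: Optional[int] = 1) -> int:
    """Total rows scanned by count refreshes over one job (toy model only).

    refresh_every=1    -> refresh after every result message (current behavior)
    refresh_every=N    -> refresh after every N messages, plus a final one at
                          job end if messages are left over
    refresh_every=None -> refresh once, at job completion
    """
    total = 0
    rows = existing_rows
    for i in range(1, n_messages + 1):
        rows += rows_per_message  # this message's rows land in the deployment
        if refresh_every and i % refresh_every == 0:
            total += rows  # a refresh re-scans everything accumulated so far
    if not refresh_every or n_messages % refresh_every != 0:
        total += rows  # final refresh at job completion
    return total


for mode in (1, 100, None):
    print(mode, rows_scanned(1_000, 0, 10, mode))
```

With 1,000 messages of 10 rows each on an empty deployment, this model gives 5,005,000 rows scanned per-message, 55,000 when refreshing every 100 messages, and 10,000 when refreshing once at the end.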

The actual writes (UPDATE main_sourceimage ~11ms, INSERT jobs_joblog ~7ms) are cheap. The latency is entirely the count refresh cascade.

Observed (last 1h prod sample, n=11,662 task transactions)

| Query shape | avg | p95 | sum (% of all task time) |
|---|---|---|---|
| SELECT COUNT(*) FROM (SELECT DISTINCT main_detection ... INNER JOIN sourceimage/occurrence/taxon WHERE deployment_id = ... + project filters) | 13.7s | 31s | ~50% |
| Same shape, WHERE event_id = ... | 7.0s | | ~26% |
| SELECT COUNT(*) FROM (SELECT DISTINCT main_sourceimage ... WHERE event_id = ...) | 1.9s | | ~7% |

p50 task latency is 12.6s, p95 50.9s, p99 80.7s. The 300s tail is SoftTimeLimitExceeded raised while these queries run → NATS redeliveries → cascade.
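Taking the sum column at face value, the three queries account for 50% + 26% + 7% = 83% of task time, which matches the Sentry figure above. The arithmetic below (Amdahl's law) estimates what removing them entirely would buy. It assumes the 83% share applies uniformly at every percentile. That is a simplification, since the share may differ between the median and the tail.

```python
# Share of task time spent in the three aggregate queries (sum column above).
AGGREGATE_SHARE = 0.50 + 0.26 + 0.07


def max_speedup(share_removed: float) -> float:
    """Amdahl's law: speedup ceiling if `share_removed` of the time drops to zero."""
    return 1.0 / (1.0 - share_removed)


def latency_after_removal(latency_s: float, share_removed: float) -> float:
    """Latency left over if the removed share costs nothing (uniform-share assumption)."""
    return latency_s * (1.0 - share_removed)


for label, observed in (("p50", 12.6), ("p95", 50.9), ("p99", 80.7)):
    remaining = latency_after_removal(observed, AGGREGATE_SHARE)
    print(f"{label}: {observed:.1f}s -> ~{remaining:.1f}s")
print(f"speedup ceiling: ~{max_speedup(AGGREGATE_SHARE):.1f}x")
```

Under the uniform-share assumption, the ceiling is about 5.9x. p50 would land near 2.1s and p99 near 13.7s. If that holds, the p99 < 10s criterion below needs either a tail that is more aggregate-dominated than the average or further work beyond removing these queries.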

Root cause (verified locally)

  • ami/main/models.py:1217 Event.get_detections_count and :1233 Event.get_occurrences_count — both apply build_occurrence_default_filters_q and run .distinct().count() over Detection joined to SourceImage / Occurrence / Taxon with the project's include/exclude-taxa M2Ms
  • ami/main/models.py:3080-3081 — Deployment equivalent
  • Both fire from Event.update_calculated_fields() (models.py:1307-1309) and the matching Deployment.update_calculated_fields(). These are called from update_calculated_fields_for_events, which is itself invoked for every result message inside ami/ml/models/pipeline.py:save_results

Proposed fixes (ordered by leverage)

1. Defer the aggregate refresh

Stop running Event.update_calculated_fields + Deployment.update_calculated_fields per result message. Options:

  • (a) Defer to job completion: run once at the end of the job (or once per N result messages) instead of on every single one. Simplest, biggest win.
  • (b) Batched periodic task: a celerybeat job that refreshes calculated fields for events touched in the last N minutes. Decouples write path from aggregate path entirely.
  • (c) Incremental update on save: compute counts incrementally via Django signals on Detection / Occurrence save, rather than recomputing from scratch.

(a) is the smallest change and would eliminate the bottleneck for active jobs. (b) generalizes to other write-heavy paths.
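Option (a), including its "once per N result messages" variant, amounts to recording which events and deployments each message touched and then refreshing each of them once per batch instead of once per message. The following is a minimal sketch. It assumes hypothetical callables `refresh_events(ids)` and `refresh_deployments(ids)` that wrap Event.update_calculated_fields() and Deployment.update_calculated_fields(). `DeferredCountRefresher` is not an existing class.

```python
from typing import Callable, Iterable, List, Optional, Set

# Hypothetical signature: receives a sorted list of IDs to refresh.
RefreshFn = Callable[[List[int]], None]


class DeferredCountRefresher:
    """Collect touched event/deployment IDs and refresh their counts in bulk."""

    def __init__(self, refresh_events: RefreshFn, refresh_deployments: RefreshFn,
                 flush_every: Optional[int] = None) -> None:
        self._refresh_events = refresh_events
        self._refresh_deployments = refresh_deployments
        self._flush_every = flush_every  # None: only flush when told to (job end)
        self._event_ids: Set[int] = set()
        self._deployment_ids: Set[int] = set()
        self._pending_messages = 0

    def record(self, event_ids: Iterable[int], deployment_ids: Iterable[int]) -> None:
        """Call once per result message instead of refreshing counts inline."""
        self._event_ids.update(event_ids)  # duplicates collapse in the set
        self._deployment_ids.update(deployment_ids)
        self._pending_messages += 1
        if self._flush_every and self._pending_messages >= self._flush_every:
            self.flush()

    def flush(self) -> None:
        """Refresh each touched ID once. IDs are cleared only after both
        refreshes succeed, so a failed flush can be retried."""
        if self._event_ids:
            self._refresh_events(sorted(self._event_ids))
        if self._deployment_ids:
            self._refresh_deployments(sorted(self._deployment_ids))
        self._event_ids.clear()
        self._deployment_ids.clear()
        self._pending_messages = 0
```

In the real worker, the touched-ID set cannot simply live in process memory. Each result message is handled by a separate Celery task that may run on a different worker, so the set would need to be persisted somewhere shared (for example, alongside the job) and flushed by whatever runs at job completion. The sketch only shows the deduplicate-then-refresh shape.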

2. Rewrite the DISTINCT COUNT queries

Independent of (1), the queries themselves are doing more than necessary:

```sql
-- current
SELECT COUNT(*) FROM (
  SELECT DISTINCT main_detection.* FROM main_detection
  INNER JOIN main_sourceimage ... -- + filters
) subquery
```

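DISTINCT over all Detection columns is wasteful when only IDs matter. The DISTINCT only collapses rows duplicated by the joins, so deduplicating on the primary key gives the same count. Equivalent: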

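```python
from django.db.models import Count

# Django ORM: deduplicate on the primary key only
Detection.objects.filter(...).values("pk").distinct().count()
# Or: a single COUNT(DISTINCT id), no subquery
Detection.objects.filter(...).aggregate(c=Count("pk", distinct=True))["c"]
# Or, for very large sets, an EXISTS-based reformulation
```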

Likely 5-10x speedup on the queries themselves, even before deferring.
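The rewrite only helps if it returns the same number. The sketch below checks this on a toy SQLite schema. The tables and columns are hypothetical and are not the real main_* schema. A one-to-many join duplicates detection rows. The current DISTINCT-subquery shape, COUNT(DISTINCT id), and an EXISTS filter all collapse those duplicates, while a plain COUNT(*) over the join does not. The sketch says nothing about Postgres performance, which still needs EXPLAIN ANALYZE on production data.

```python
import sqlite3

SCHEMA = """
CREATE TABLE sourceimage (id INTEGER PRIMARY KEY, deployment_id INTEGER);
CREATE TABLE detection (id INTEGER PRIMARY KEY, source_image_id INTEGER, bbox TEXT);
-- Hypothetical one-to-many filter table standing in for the taxa filters:
-- one detection can match several rows, so the join repeats it.
CREATE TABLE detection_match (detection_id INTEGER, taxon TEXT);
"""

JOIN = """
FROM detection d
JOIN sourceimage s ON s.id = d.source_image_id
JOIN detection_match m ON m.detection_id = d.id
WHERE s.deployment_id = :dep AND m.taxon IN ('a', 'b')
"""

QUERIES = {
    # Shape of the current query: DISTINCT over every detection column.
    "distinct_rows": f"SELECT COUNT(*) FROM (SELECT DISTINCT d.* {JOIN}) AS subquery",
    # Deduplicate on the primary key only.
    "count_distinct_pk": f"SELECT COUNT(DISTINCT d.id) {JOIN}",
    # No duplicating join at all: filter with EXISTS instead.
    "exists": """
        SELECT COUNT(*) FROM detection d
        JOIN sourceimage s ON s.id = d.source_image_id
        WHERE s.deployment_id = :dep AND EXISTS (
            SELECT 1 FROM detection_match m
            WHERE m.detection_id = d.id AND m.taxon IN ('a', 'b'))
    """,
    # Wrong on purpose: counts join rows, not detections.
    "naive": f"SELECT COUNT(*) {JOIN}",
}


def build_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO sourceimage VALUES (?, ?)", [(1, 1), (2, 1), (3, 2)])
    conn.executemany("INSERT INTO detection VALUES (?, ?, ?)",
                     [(1, 1, "x"), (2, 1, "y"), (3, 2, "z"), (4, 3, "w")])
    # Detection 1 matches twice, so the join yields it twice.
    conn.executemany("INSERT INTO detection_match VALUES (?, ?)",
                     [(1, "a"), (1, "b"), (2, "a"), (3, "c"), (4, "a")])
    return conn


def counts(conn: sqlite3.Connection, deployment_id: int) -> dict:
    return {name: conn.execute(sql, {"dep": deployment_id}).fetchone()[0]
            for name, sql in QUERIES.items()}
```

For example, `counts(build_db(), 1)` gives 2 for the three deduplicating shapes and 3 for the naive join count.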

3. Verify pg indexes

Check that the planner picks the right indexes on:

  • main_sourceimage(deployment_id), (event_id)
  • main_detection(source_image_id)
  • main_occurrence(deployment_id), (event_id)

main_taxon.parents_json @> %s (jsonb-contains) needs a GIN index to be cheap — verify.
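One way to check the planner is to run each query through EXPLAIN (FORMAT JSON) and scan the plan tree for sequential scans on the tables listed above. For example, you could use the string returned by Django's QuerySet.explain(format="json") on the queryset before .count(). The following is a minimal sketch. `seq_scanned_tables` is a hypothetical helper. A Seq Scan is not automatically wrong, because the planner may legitimately prefer one on small tables or with low-selectivity filters.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List, Union

WATCHED_TABLES = {"main_sourceimage", "main_detection", "main_occurrence", "main_taxon"}


def iter_plan_nodes(node: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Depth-first walk over a Postgres JSON plan node and its children ("Plans")."""
    yield node
    for child in node.get("Plans", []):
        yield from iter_plan_nodes(child)


def seq_scanned_tables(explain_output: Union[str, list],
                       watched: Iterable[str] = WATCHED_TABLES) -> List[str]:
    """Return the watched tables that appear under a "Seq Scan" node."""
    # Accept raw JSON text or an already-decoded list.
    doc = json.loads(explain_output) if isinstance(explain_output, str) else explain_output
    watched = set(watched)
    hits = set()
    for entry in doc:  # FORMAT JSON output is a list; each entry holds a "Plan"
        for node in iter_plan_nodes(entry["Plan"]):
            if node.get("Node Type") == "Seq Scan" and node.get("Relation Name") in watched:
                hits.add(node["Relation Name"])
    return sorted(hits)
```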

Acceptance criteria

  • process_nats_pipeline_result p50 < 2s, p99 < 10s under load (currently p50 12.6s, p99 80.7s)
  • No SoftTimeLimitExceeded exceptions for this task during normal job execution
  • No NATS redeliveries triggered by result-handler timeouts during single-job execution
  • Two concurrent jobs (e.g. on similar-size deployments) can run without one starving the other on the ml_results queue
  • Event.detections_count / occurrences_count and Deployment equivalents stay accurate (verify via comparison test before/after)
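The latency criteria can be checked mechanically against a batch of per-task durations, for example ones exported from Sentry. This sketch uses the nearest-rank percentile definition. Sentry's own percentile computation may differ slightly, and `meets_latency_targets` is a hypothetical helper.

```python
import math
from typing import Sequence


def percentile(values: Sequence[float], p: float) -> float:
    """Nearest-rank percentile, p in (0, 100]."""
    if not values:
        raise ValueError("no samples")
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


def meets_latency_targets(durations_s: Sequence[float],
                          p50_max: float = 2.0, p99_max: float = 10.0) -> bool:
    """Acceptance check: p50 < 2s and p99 < 10s (defaults from the criteria above)."""
    return percentile(durations_s, 50) < p50_max and percentile(durations_s, 99) < p99_max
```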

What we still need to verify

  • Whether (a) deferring fully eliminates the per-message aggregate cost, or whether something else inside save_results (e.g. the per-image source_image.save() loop at pipeline.py:987-988) is also material. NR has no Postgres span instrumentation today (see #separate-ticket on fixing that), so the relative weight of writes vs. aggregates can only be confirmed via Sentry, which has confirmed it. Other costs inside save_results may still surface once the aggregate cost is removed.
  • The actual planner output for the three slow queries on production data: run EXPLAIN on them to confirm index usage.
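Until NR has Postgres spans, one low-tech way to get the writes-vs-aggregates split from inside save_results is to time the phases directly. The following is a minimal sketch. `PhaseTimer` is hypothetical. The phases in the usage comment (the per-image source_image.save() loop and the update_calculated_fields_for_events call) are the ones named above, wrapped by hand.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Dict, Iterator


class PhaseTimer:
    """Accumulate wall time and call counts per named phase."""

    def __init__(self) -> None:
        self.totals: Dict[str, float] = defaultdict(float)
        self.counts: Dict[str, int] = defaultdict(int)

    @contextmanager
    def phase(self, name: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:  # record the time even if the wrapped code raises
            self.totals[name] += time.perf_counter() - start
            self.counts[name] += 1

    def shares(self) -> Dict[str, float]:
        """Fraction of total measured time spent in each phase."""
        grand = sum(self.totals.values())
        return {name: t / grand for name, t in self.totals.items()} if grand else {}


# Hypothetical usage inside save_results:
#   timer = PhaseTimer()
#   with timer.phase("source_image_saves"):
#       ...  # per-image source_image.save() loop
#   with timer.phase("calculated_fields"):
#       ...  # update_calculated_fields_for_events(...)
#   then log timer.shares() alongside the task's Sentry transaction
```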

Related

  • Sentry issue AMI-PLATFORM-API-V46 (26k events, 9d) — the dominant slow query
  • Sentry issue AMI-PLATFORM-API-V4A — the by-event variant
  • Sentry issue AMI-PLATFORM-API-TYH (124 events) — SoftTimeLimitExceeded (the 300s tail)
  • Internal investigation notes (private): the 30-minute drilldown that produced these numbers — Sentry trace IDs cd503d3d7ec74e9fbcbf8730b9670564 (300s outlier), cd58dfe64bd44fd2afc4fa27ebc81625, c068b56703cc46dcadec08b0ebbfb55d

Sample trace IDs (Sentry)

  • cd503d3d7ec74e9fbcbf8730b9670564 — 300s SoftTimeLimit outlier
  • cd58dfe64bd44fd2afc4fa27ebc81625 — heavy mass example
  • c068b56703cc46dcadec08b0ebbfb55d — 54s with the N+1 placeholder lookup

Effort + risk
