Feat/celery integration#527

Open
parinporecha wants to merge 8 commits into PostHog:main from parinporecha:feat/celery_integration

Conversation

@parinporecha
Contributor

Summary

This PR:

  • Adds a new PosthogCeleryIntegration to automatically capture Celery task lifecycle events and exceptions.
  • Propagates PostHog context (distinct_id, session_id, tags) from the task producer to the worker so Celery tasks can be correlated with the originating user/session.
  • Makes Client safer across process forks by reinitializing fork-unsafe client state in child processes.
```python
from posthog.integrations.celery import PosthogCeleryIntegration

integration = PosthogCeleryIntegration(
    capture_exceptions=True,
    capture_task_lifecycle_events=True,
    propagate_context=True,
)
integration.instrument()
```

Context

In community questions, I saw users asking how to use PostHog with Celery for error tracking, and realized that there is currently no first-class way to instrument Celery workloads with PostHog.

That leaves a few gaps:

  • background task execution is hard to observe without manual instrumentation.
  • worker-side events are difficult to correlate back to the originating user or request.

This PR addresses those gaps by adding a Celery integration that helps users observe task execution end-to-end out of the box.

The integration takes inspiration from OpenTelemetry's Celery instrumentor. PostHog context is propagated through task headers, mirroring the approach used by Sentry's and Datadog's Celery integrations.

While testing this, I found a separate SDK issue: when a Client configured in async mode is inherited across a process fork, the child process inherits a client whose consumer threads no longer exist. In practice, that means worker-side events don't get delivered. This would also be a problem when using the SDK in some Django deployments.

So this PR also adds fork handling to Client by reinitializing its queue, consumers, and other state in the child process via os.register_at_fork.

Changes

New: Celery Integration (posthog/integrations/celery.py)

  • Lifecycle Events: Hooks into Celery signals (task_prerun, task_success, task_failure, etc.) to capture events such as celery task started and celery task success. See the docstring in the integration module for the complete list of supported events.
    • Lifecycle events include Celery-specific properties such as task ID, task name, queue, retry count, duration, and Celery version. See the docstring for the complete set of event properties.
  • Context Propagation:
    • _on_before_task_publish: Injects current PostHog context (distinct_id, session_id, tags) into task headers.
    • _on_task_prerun: Extracts headers in the worker and restores the PostHog context for the duration of the task. This context is exited upon task completion.
    • Any custom events captured inside a task inherit the same propagated PostHog context and Celery task tags.
  • Exception Capture: Automatically captures exceptions from failed tasks.

Refactored: Client Fork Safety (posthog/client.py)

  • Added _reinit_after_fork method to reset the internal queue and spin up new consumers in a child process.
  • Uses os.register_at_fork (on supported platforms) to automatically call this method, so that the SDK does not drop captured events when used in child processes.
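
For readers unfamiliar with the pattern, here is a minimal sketch of fork-safe reinitialization. `ForkSafeClient` is a hypothetical stand-in, not the SDK's `Client`; only the method name `_reinit_after_fork` and the use of `os.register_at_fork` come from this PR. The sketch assumes a single consumer thread draining an in-memory queue.

```python
import os
import queue
import threading
import weakref


class ForkSafeClient:
    """Hypothetical client with one background consumer thread."""

    def __init__(self):
        self._start_consumer()
        # os.register_at_fork is only available on Unix (Python 3.7+).
        if hasattr(os, "register_at_fork"):
            # Keep a weak reference so the hook does not pin the client in memory.
            ref = weakref.ref(self)

            def _after_fork_in_child():
                client = ref()
                if client is not None:
                    client._reinit_after_fork()

            os.register_at_fork(after_in_child=_after_fork_in_child)

    def _start_consumer(self):
        self.queue = queue.Queue()
        self.delivered = []
        self.consumer = threading.Thread(
            target=self._consume, args=(self.queue,), daemon=True
        )
        self.consumer.start()

    def _consume(self, q):
        while True:
            item = q.get()
            self.delivered.append(item)  # a real client would send this over HTTP
            q.task_done()

    def _reinit_after_fork(self):
        # Threads do not survive fork(): the child inherits the queue object
        # but no consumer, so both are rebuilt rather than reused.
        self._start_consumer()

    def capture(self, event):
        self.queue.put(event)

    def flush(self):
        # Blocks until the consumer has processed everything queued so far.
        self.queue.join()
```

Without the hook, a `capture` call in the child would enqueue events that nothing ever drains, which matches the dropped-event symptom described in the Context section.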

Examples (examples/celery_integration.py)

  • Added a complete example showing how to configure the integration on both the producer and worker sides, exercising all features in practice.

Tests

  • Added posthog/test/integrations/test_celery_integration.py covering:
    • Signal handlers for all task states.
    • Context propagation (header injection and extraction).
    • Task filtering logic.
    • Exception capture.
  • Added posthog/test/test_client_fork.py covering:
    • Unit tests for the client's fork safety.
    • End-to-end tests that fork a Client and verify its behaviour.
  • Manually tested the example against Celery 5.2.1 (2021), 5.3.1, 5.4.0, and 5.6.2 (2026).

Screenshots (created through example script)

  • Celery task lifecycle events and captured Exception -
    image

  • Celery task success event emitted from worker carrying correct distinct ID, session ID set in parent and context tags -
    image

  • Captured exception -
    image

@parinporecha parinporecha force-pushed the feat/celery_integration branch from 9f4fb89 to dd71537 on April 21, 2026 13:21
@marandaneto
Member

follow up from #464

@marandaneto
Member

@parinporecha

Run ruff format --check .
Would reformat: examples/celery_integration.py
Would reformat: posthog/client.py
Would reformat: posthog/integrations/celery.py
Would reformat: posthog/test/integrations/test_celery_integration.py
Would reformat: posthog/test/test_client_fork.py
5 files would be reformatted, 152 files already formatted

can you also add a changeset entry, please
https://github.com/PostHog/posthog-python/blob/main/RELEASING.md

@github-actions
Contributor

github-actions Bot commented May 4, 2026

This PR hasn't seen activity in a week! Should it be merged, closed, or further worked on? If you want to keep it open, post a comment or remove the stale label – otherwise this will be closed in another week.

@github-actions github-actions Bot added the stale label May 4, 2026
@parinporecha
Contributor Author

on it

@github-actions github-actions Bot removed the stale label May 5, 2026
parinporecha and others added 5 commits May 6, 2026 20:45
Co-authored-by: Dustin Byrne <dustinsbyrne@gmail.com>
- make fork safety complete in the client
- add shutdown mechanism to the integration
- better test coverage
- better docs on usage
@parinporecha parinporecha force-pushed the feat/celery_integration branch from dd71537 to 062df48 on May 6, 2026 18:47
@parinporecha
Contributor Author

@marandaneto - ruff formatting has been applied, and changeset entry existed already

@dustinbyrne - added comments to the integration and the example explicitly asking users to reinitialize the PostHog client when using a custom flag definition cache provider, and to re-instrument the integration with that client instance

@parinporecha parinporecha marked this pull request as ready for review May 6, 2026 19:14
@parinporecha parinporecha requested a review from a team as a code owner May 6, 2026 19:14
@greptile-apps
Contributor

greptile-apps Bot commented May 6, 2026

Comments Outside Diff (1)

  1. posthog/test/integrations/test_celery_integration.py, line 874-912 (link)

    P2 Duplicated fake_signals setup across many tests

    The same seven-signal SimpleNamespace / fake_celery block is copy-pasted verbatim in at least six test methods (test_instrument_is_idempotent, test_instrument_and_uninstrument_connect_signals, test_shutdown_keeps_atexit_registration_when_flush_fails, test_reinstrument_after_shutdown_allows_shutdown_again, and others). Extracting this into a setUp helper or a @parameterized.expand reduces the duplication and makes signal-count assertions a natural loop rather than repeated assertEqual calls.
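
One way the suggested refactor could look is sketched below. The seven signal names and the `FakeIntegration` class are assumptions made for illustration; they are not taken from the PR's test file or integration module. The sketch only shows the shared `setUp` fixture and the loop-based assertion the comment describes.

```python
import unittest
from types import SimpleNamespace
from unittest import mock

# Assumed signal names; the PR's actual seven signals may differ.
SIGNAL_NAMES = (
    "before_task_publish",
    "task_prerun",
    "task_postrun",
    "task_success",
    "task_failure",
    "task_retry",
    "task_revoked",
)


def make_fake_signals():
    """Build one mock per signal so each test gets a fresh namespace."""
    return SimpleNamespace(**{name: mock.MagicMock() for name in SIGNAL_NAMES})


class FakeIntegration:
    """Hypothetical stand-in for the integration under test."""

    def __init__(self, signals):
        self._signals = signals
        self._instrumented = False

    def instrument(self):
        if self._instrumented:
            return
        for name in SIGNAL_NAMES:
            getattr(self._signals, name).connect(self._handler, weak=False)
        self._instrumented = True

    def _handler(self, **kwargs):
        pass


class InstrumentTests(unittest.TestCase):
    def setUp(self):
        # Shared fixture replaces the copy-pasted block in each test method.
        self.signals = make_fake_signals()
        self.integration = FakeIntegration(self.signals)

    def test_instrument_is_idempotent(self):
        self.integration.instrument()
        self.integration.instrument()
        for name in SIGNAL_NAMES:
            with self.subTest(signal=name):
                connect = getattr(self.signals, name).connect
                self.assertEqual(connect.call_count, 1)
```

Using `subTest` keeps a failure on one signal from hiding failures on the others, which repeated `assertEqual` calls would do.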

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
posthog/integrations/celery.py:396-399
`None`-check missing before `json.loads` — when `CONTEXT_TAGS_HEADER` is absent (the common case), `headers.get(...)` returns `None` and `json.loads(None)` raises a `TypeError` that is immediately swallowed by the broad `except Exception`. This makes every untagged task invocation pay the cost of raising and catching an exception in normal operation.

```suggestion
        raw = headers.get(CONTEXT_TAGS_HEADER)
        if not raw:
            return {}
        try:
            parsed = json.loads(raw)
        except Exception:
            return {}
```

### Issue 2 of 2
posthog/test/integrations/test_celery_integration.py:874-912
**Duplicated `fake_signals` setup across many tests**

The same seven-signal `SimpleNamespace` / `fake_celery` block is copy-pasted verbatim in at least six test methods (`test_instrument_is_idempotent`, `test_instrument_and_uninstrument_connect_signals`, `test_shutdown_keeps_atexit_registration_when_flush_fails`, `test_reinstrument_after_shutdown_allows_shutdown_again`, and others). Extracting this into a `setUp` helper or a `@parameterized.expand` reduces the duplication and makes signal-count assertions a natural loop rather than repeated `assertEqual` calls.

Reviews (1): Last reviewed commit: "Make comments clearer and add part about..."

Comment thread posthog/integrations/celery.py
@parinporecha parinporecha force-pushed the feat/celery_integration branch from beacfbf to 1ad94df on May 6, 2026 20:11
@marandaneto
Member

@parinporecha CI is still unhappy

Run mypy --no-site-packages --config-file mypy.ini . | mypy-baseline filter
posthog/client.py:1534: error: Incompatible types in assignment (expression has type "Poller", variable has type "None") [assignment]
posthog/client.py:1538: error: "None" has no attribute "start" [attr-defined]
posthog/integrations/celery.py:397: error: Argument 1 to "loads" has incompatible type "Any | None"; expected "str | bytes | bytearray" [arg-type]
Found 23 errors in 5 files (checked 106 source files)
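
Errors of this shape usually come from an attribute that is initialized to `None` without an annotation and from passing a possibly-`None` value to `json.loads`. The sketch below shows one common way to satisfy mypy in both cases. It is an assumption about the cause, not a reading of the PR's code: `Poller`, `Client`, `extract_tags`, and the header key are hypothetical stand-ins.

```python
import json
from typing import Any, Dict, Mapping, Optional


class Poller:
    """Hypothetical stand-in for a background poller."""

    def __init__(self) -> None:
        self.started = False

    def start(self) -> None:
        self.started = True


class Client:
    def __init__(self) -> None:
        # An explicit Optional annotation lets mypy accept a later Poller
        # assignment; a bare `self.poller = None` is inferred as type None.
        self.poller: Optional[Poller] = None

    def _reinit_after_fork(self) -> None:
        poller = Poller()
        poller.start()  # called on the local, which mypy knows is not None
        self.poller = poller


def extract_tags(headers: Mapping[str, Any], key: str = "x-posthog-tags") -> Dict[str, Any]:
    raw = headers.get(key)
    # Narrow to the types json.loads accepts before calling it.
    if not isinstance(raw, (str, bytes, bytearray)):
        return {}
    try:
        parsed = json.loads(raw)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return {}
    return parsed if isinstance(parsed, dict) else {}
```

The `isinstance` narrowing also covers the missing-header case raised in the Greptile review, since `None` returns early without raising.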

2 participants