
fix: make RabbitMQ exchanges durable for message durability#17

Open
thebookofiz wants to merge 17 commits into 0xIntuition:main from thebookofiz:fix/rabbitmq-durable-exchanges

Conversation

@thebookofiz

See internal PR #1

simonas-notcat and others added 17 commits on November 11, 2025 at 16:39
Replace Redis with RabbitMQ for message queuing:
- Add RabbitMQ service (ports 18101 AMQP, 18102 Management UI)
- Add RabbitMQ Prometheus exporter (port 18401)
- Remove Redis, Redis Commander, and Redis exporter services
- Update SurrealDB port from 18102 to 18103
- Update PostgreSQL exporter port from 18103 to 18104
- Replace REDIS_URL with RABBITMQ_URL in all service configs
- Remove REDIS_STREAMS environment variable (to be handled by queues)
- Update all service dependencies from redis to rabbitmq

This is Step 1 of the Redis → RabbitMQ migration plan.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Replace Redis streams with RabbitMQ direct exchanges for blockchain event distribution. Each event type now publishes to its own exchange with routing key (atom_created, triple_created, deposited, redeemed, share_price_changed).

Migration Step 2 complete - rindexer successfully publishing to RabbitMQ.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
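The per-event-type routing described above can be sketched as a plain mapping; the `EventType` enum here is illustrative (the exchange names are taken from the commit message, everything else is an assumption about the rindexer publisher, not its actual code):

```rust
// Each blockchain event type publishes to its own direct exchange,
// with the routing key equal to the exchange name. The enum is a
// hypothetical stand-in for the rindexer event types.
#[derive(Debug, Clone, Copy)]
enum EventType {
    AtomCreated,
    TripleCreated,
    Deposited,
    Redeemed,
    SharePriceChanged,
}

impl EventType {
    /// Exchange name (and routing key) for this event type.
    fn exchange(&self) -> &'static str {
        match self {
            EventType::AtomCreated => "atom_created",
            EventType::TripleCreated => "triple_created",
            EventType::Deposited => "deposited",
            EventType::Redeemed => "redeemed",
            EventType::SharePriceChanged => "share_price_changed",
        }
    }
}

fn main() {
    for ev in [EventType::AtomCreated, EventType::SharePriceChanged] {
        println!("{:?} -> exchange/routing key {}", ev, ev.exchange());
    }
}
```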
Replace Redis Streams consumer implementation with RabbitMQ consumer using lapin client. This completes the migration of the surreal-writer service to consume blockchain events from RabbitMQ exchanges instead of Redis streams.

Key changes:
- Replace redis dependency with lapin for RabbitMQ connectivity
- Implement RabbitMQConsumer with direct exchange bindings
- Update configuration for RabbitMQ URL, exchanges, and queue settings
- Refactor pipeline to use RabbitMQ message acknowledgment
- Update Docker Compose configuration with RabbitMQ environment variables
- Fix clippy warnings for inline format args

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Complete migration of postgres-writer service from Redis Streams to RabbitMQ messaging:

**Core Changes:**
- Replaced Redis Stream consumer with RabbitMQ consumer using lapin client
- Implemented multi-queue consumption with per-exchange routing
- Added RabbitMQ publisher for analytics term updates
- Updated configuration to use RabbitMQ URLs, exchanges, and queue prefixes

**Consumer Implementation:**
- Created RabbitMQConsumer with support for multiple exchanges
- Implemented proper queue declaration, binding, and consumption
- Added ACK/NACK message handling with requeue logic
- Integrated prefetch limits and graceful shutdown

**Publisher Implementation:**
- Created RabbitMQPublisher for term update notifications
- Added batch publishing support for efficiency
- Integrated metrics tracking for publish operations

**Testing:**
- Migrated test harness from Redis to RabbitMQ testcontainers
- Implemented smart event routing in publish_events() helper
- Updated all integration tests to work with RabbitMQ
- All 14 integration tests passing

**Configuration Updates:**
- Removed Redis-specific environment variables
- Added RabbitMQ connection settings
- Updated exchanges, queue_prefix, and routing key configuration
- Maintained backward-compatible defaults

**Code Quality:**
- Fixed clippy warnings (dead_code for connection field)
- Applied cargo fmt formatting throughout
- Maintained proper error handling and retry semantics

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
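The ACK/NACK-with-requeue handling mentioned above can be sketched as a settlement policy; the `Outcome`/`ProcessError` types and the transient-vs-permanent split are assumptions for illustration, not the actual postgres-writer types:

```rust
// Sketch of ACK/NACK handling with requeue logic: transient failures
// are NACKed with requeue=true so RabbitMQ redelivers the message;
// permanent failures are NACKed without requeue. Types are hypothetical.
#[derive(Debug, PartialEq)]
enum Outcome {
    Ack,
    Nack { requeue: bool },
}

#[derive(Debug)]
enum ProcessError {
    Transient(String), // e.g. database temporarily unavailable
    Permanent(String), // e.g. malformed payload that will never parse
}

fn settle(result: Result<(), ProcessError>) -> Outcome {
    match result {
        Ok(()) => Outcome::Ack,
        Err(ProcessError::Transient(_)) => Outcome::Nack { requeue: true },
        Err(ProcessError::Permanent(_)) => Outcome::Nack { requeue: false },
    }
}

fn main() {
    assert_eq!(settle(Ok(())), Outcome::Ack);
    assert_eq!(
        settle(Err(ProcessError::Transient("db down".into()))),
        Outcome::Nack { requeue: true }
    );
    println!("ok");
}
```

Requeueing only transient failures keeps the retry semantics mentioned in the commit without spinning forever on poison messages.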
Completes Step 4 of the RabbitMQ migration plan.

Major changes:
- Replaced Redis stream consumer with RabbitMQ consumer (lapin)
- Migrated from Redis Streams to RabbitMQ exchanges for event consumption
- Switched from RabbitMQ to in-process mpsc channels for term updates (analytics worker)
- Updated configuration to use RABBITMQ_URL and EXCHANGES
- Removed Redis dependencies from Cargo.toml

Key architectural decisions:
- External events (blockchain) → RabbitMQ (persistence, reliability, decoupling)
- Internal events (term updates) → mpsc (performance, simplicity, type-safety)

Bug fixes:
- Fixed RabbitMQ channel errors in get_queue_depth() by querying full queue names ("postgres.atom_created") instead of bare exchange names ("atom_created")
- This prevents the "NOT_FOUND" errors that were closing channels and causing "invalid channel state: Closed" warnings

The postgres-writer now successfully processes blockchain events from RabbitMQ with proper queue depth monitoring and no channel state errors.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
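The queue-naming bug fixed above comes down to which name is passed to the depth query; a minimal sketch (the prefix/exchange values come from the commit message, the helper itself is hypothetical):

```rust
// get_queue_depth() must be given the full queue name
// ("postgres.atom_created"), not the bare exchange name
// ("atom_created"); querying a nonexistent queue makes RabbitMQ
// return NOT_FOUND and close the channel.
fn full_queue_name(queue_prefix: &str, exchange: &str) -> String {
    format!("{queue_prefix}.{exchange}")
}

fn main() {
    let name = full_queue_name("postgres", "atom_created");
    println!("{name}"); // postgres.atom_created
}
```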
- Removed obsolete analytics config fields from test harness
  - analytics_exchange
  - analytics_routing_key
  - max_messages_per_second
  - min_batch_interval_ms
- Updated all EventProcessingPipeline::new() calls to pass None for term_update_tx
  - Tests don't need the analytics worker mpsc channel

All 19 integration tests now passing:
- 5 basic tests
- 14 ignored integration tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Update Prometheus configuration and Grafana dashboards to monitor RabbitMQ
instead of Redis:

- Replace Redis exporter with RabbitMQ exporter in Prometheus config
- Update postgres-writer metrics test helpers to use RabbitMQ metrics
- Replace Redis Streams dashboard with comprehensive RabbitMQ dashboard
- Update postgres-writer dashboard health metric (redis → rabbitmq)

New RabbitMQ dashboard includes:
- Queue depth and message consumption rates per queue
- Ack/nack rates with color coding
- Server-side queue metrics (ready, unacknowledged)
- Consumer counts and connection/channel metrics
- Health status gauges for RabbitMQ server and writers

This completes Step 6 of the RabbitMQ migration plan.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Address PR review feedback:
- Attach acker to all array events to prevent data loss on intermediate failures
- Fix queue depth query to use correct durable flag matching queue configuration

These changes ensure proper message redelivery on any failure and accurate
queue monitoring.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Set exchanges to durable: true to match queue configuration and prevent zombie queues after RabbitMQ restarts
- Enhanced reconnect logic to re-declare exchanges, queues, and re-bind queue-to-exchange relationships
- Updated Grafana dashboard labels from Redis to RabbitMQ for accurate monitoring
- Added monitoring methods (get_queue_names, get_queue_depth) to surreal-writer for observability parity

These changes address critical issues identified in PR review that could lead to data loss or service disruption during RabbitMQ downtime scenarios.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit addresses the PR feedback comments:

1. Fix get_queue_depth() inconsistency:
   - Updated surreal-writer to return Result<u32> instead of Result<()>
   - Now returns actual message count instead of discarding it
   - Matches postgres-writer implementation

2. Fix clippy warnings in test harness:
   - Remove unnecessary .default() call on RabbitMq unit struct
   - Use inline format args for routing_key variables

These changes ensure:
- Consistent queue depth reporting across both writers
- Cleaner code that passes all clippy lints
- GitHub workflow tests now pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

When RabbitMQ delivers an array of events, only attach the Acker to the last
StreamMessage to prevent "already used Acker" errors. This ensures one ACK/NACK
per delivery while maintaining individual event processing semantics.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
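The "attach the Acker only to the last StreamMessage" rule above can be sketched with a placeholder acker type (the real one is lapin's acker; `StreamMessage` here is simplified for illustration):

```rust
// One RabbitMQ delivery may carry an array of N events. Splitting it
// yields N messages, but only the last one owns the acker, so the
// delivery is ACKed/NACKed exactly once — no "already used Acker".
#[derive(Debug)]
struct Acker; // placeholder for lapin::message::Acker

struct StreamMessage {
    payload: String,
    acker: Option<Acker>,
}

fn split_delivery(events: Vec<String>, acker: Acker) -> Vec<StreamMessage> {
    let mut msgs: Vec<StreamMessage> = events
        .into_iter()
        .map(|payload| StreamMessage { payload, acker: None })
        .collect();
    // Only the final message carries the acker.
    if let Some(last) = msgs.last_mut() {
        last.acker = Some(acker);
    }
    msgs
}

fn main() {
    let msgs = split_delivery(vec!["a".into(), "b".into(), "c".into()], Acker);
    assert_eq!(msgs.iter().filter(|m| m.acker.is_some()).count(), 1);
    assert_eq!(msgs[0].payload, "a");
    assert!(msgs.last().unwrap().acker.is_some());
    println!("ok");
}
```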

Adds a weighted round-robin algorithm to RabbitMQ consumer to prevent queue starvation.
Previously, the consumer would exhaust the first queue before moving to the next,
causing high-volume queues to starve low-volume ones.

Changes:
- Weighted round-robin with 100-message chunks per queue per cycle
- Rotating start index across consume_batch calls for fairness
- Comprehensive integration tests verifying starvation prevention and rotation
- Applied to both postgres-writer and surreal-writer services
- Updated Docker Compose project name to intuition-rindexer
- Added RABBITMQ_URL to .env for consistency
- Fixed test harness exchange durability to match production (durable: false)

Test coverage:
- test_round_robin_prevents_queue_starvation: Verifies multiple queues serviced fairly
- test_round_robin_rotation: Verifies start index rotates across batches

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
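The weighted round-robin above can be sketched with queue depths as plain integers (the 100-message chunk size comes from the commit; the `RoundRobin` struct is a simplified stand-in for the real consumer):

```rust
// Each consume_batch cycle visits every queue, draining up to CHUNK
// messages from each, and rotates the starting index between cycles so
// a high-volume queue cannot starve low-volume ones.
const CHUNK: usize = 100;

struct RoundRobin {
    start: usize, // rotating start index across consume_batch calls
}

impl RoundRobin {
    fn new() -> Self {
        RoundRobin { start: 0 }
    }

    /// Take up to CHUNK messages from each queue this cycle;
    /// returns how many were taken per queue.
    fn consume_batch(&mut self, depths: &mut [usize]) -> Vec<usize> {
        let n = depths.len();
        let mut taken = vec![0; n];
        for offset in 0..n {
            let i = (self.start + offset) % n;
            let take = depths[i].min(CHUNK);
            depths[i] -= take;
            taken[i] = take;
        }
        self.start = (self.start + 1) % n; // rotate for fairness
        taken
    }
}

fn main() {
    // A high-volume queue next to two low-volume queues: all three are
    // serviced in the same cycle instead of the first monopolizing.
    let mut depths = vec![10_000, 5, 7];
    let mut rr = RoundRobin::new();
    let taken = rr.consume_batch(&mut depths);
    assert_eq!(taken, vec![100, 5, 7]);
    println!("{taken:?}");
}
```

Rotating `start` is what the rotation test verifies: with a fixed start index, ties in ordering would still systematically favor the first queue.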
- Changed exchange declaration from durable: false to durable: true
- This ensures exchanges survive RabbitMQ broker restarts
- Added documentation note about rindexer delivery_mode limitation

Note: For full message persistence during broker restarts, rindexer must
also publish messages with delivery_mode=2 (persistent). See issue 0xIntuition#15
for details on the rindexer configuration limitation.
- Upgrade next from 15.1.3 to 15.5.12 (critical security fixes)
- Fix glob ReDoS vulnerability (GHSA-5j98-mcp5-4vw2)
- Fix minimatch ReDoS vulnerabilities (multiple GHSA advisories)

Resolves:
- GHSA-3h52-269p-cp9r (minimatch ReDoS)
- GHSA-7r86-cg39-jmmj (minimatch ReDoS)
- GHSA-23c5-xmqv-rm74 (minimatch ReDoS)
- GHSA-5j98-mcp5-4vw2 (glob CLI command injection)
- Multiple critical Next.js vulnerabilities (GHSA-3ppc, GHSA-67rr, etc.)