fix: make RabbitMQ exchanges durable for message durability #17
Open
thebookofiz wants to merge 17 commits into 0xIntuition:main
Conversation
Replace Redis with RabbitMQ for message queuing:
- Add RabbitMQ service (ports 18101 AMQP, 18102 Management UI)
- Add RabbitMQ Prometheus exporter (port 18401)
- Remove Redis, Redis Commander, and Redis exporter services
- Update SurrealDB port from 18102 to 18103
- Update PostgreSQL exporter port from 18103 to 18104
- Replace REDIS_URL with RABBITMQ_URL in all service configs
- Remove REDIS_STREAMS environment variable (to be handled by queues)
- Update all service dependencies from redis to rabbitmq

This is Step 1 of the Redis → RabbitMQ migration plan.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Replace Redis streams with RabbitMQ direct exchanges for blockchain event distribution. Each event type now publishes to its own exchange with a matching routing key (atom_created, triple_created, deposited, redeemed, share_price_changed).

Migration Step 2 complete: rindexer successfully publishing to RabbitMQ.
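The per-event-type routing described above can be sketched as a pure mapping from event type to exchange name. The enum and method names here are hypothetical stand-ins, not the actual rindexer publisher code; only the five exchange names come from this PR.

```rust
// Hypothetical sketch: each blockchain event type maps to its own
// direct exchange, and the routing key matches the exchange name.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BlockchainEvent {
    AtomCreated,
    TripleCreated,
    Deposited,
    Redeemed,
    SharePriceChanged,
}

impl BlockchainEvent {
    fn exchange(&self) -> &'static str {
        match self {
            BlockchainEvent::AtomCreated => "atom_created",
            BlockchainEvent::TripleCreated => "triple_created",
            BlockchainEvent::Deposited => "deposited",
            BlockchainEvent::Redeemed => "redeemed",
            BlockchainEvent::SharePriceChanged => "share_price_changed",
        }
    }
}

fn main() {
    // Routing key equals the exchange name for a direct exchange setup.
    assert_eq!(BlockchainEvent::AtomCreated.exchange(), "atom_created");
    assert_eq!(BlockchainEvent::SharePriceChanged.exchange(), "share_price_changed");
}
```

One exchange per event type lets each consumer bind only the queues it cares about, instead of filtering a shared stream.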
Replace the Redis Streams consumer implementation with a RabbitMQ consumer using the lapin client. This completes the migration of the surreal-writer service to consume blockchain events from RabbitMQ exchanges instead of Redis streams.

Key changes:
- Replace the redis dependency with lapin for RabbitMQ connectivity
- Implement RabbitMQConsumer with direct exchange bindings
- Update configuration for RabbitMQ URL, exchanges, and queue settings
- Refactor the pipeline to use RabbitMQ message acknowledgment
- Update the Docker Compose configuration with RabbitMQ environment variables
- Fix clippy warnings for inline format args
Complete migration of the postgres-writer service from Redis Streams to RabbitMQ messaging.

**Core Changes:**
- Replaced the Redis Stream consumer with a RabbitMQ consumer using the lapin client
- Implemented multi-queue consumption with per-exchange routing
- Added a RabbitMQ publisher for analytics term updates
- Updated configuration to use RabbitMQ URLs, exchanges, and queue prefixes

**Consumer Implementation:**
- Created RabbitMQConsumer with support for multiple exchanges
- Implemented proper queue declaration, binding, and consumption
- Added ACK/NACK message handling with requeue logic
- Integrated prefetch limits and graceful shutdown

**Publisher Implementation:**
- Created RabbitMQPublisher for term update notifications
- Added batch publishing support for efficiency
- Integrated metrics tracking for publish operations

**Testing:**
- Migrated the test harness from Redis to RabbitMQ testcontainers
- Implemented smart event routing in the publish_events() helper
- Updated all integration tests to work with RabbitMQ
- All 14 integration tests passing

**Configuration Updates:**
- Removed Redis-specific environment variables
- Added RabbitMQ connection settings
- Updated exchanges, queue_prefix, and routing key configuration
- Maintained backward-compatible defaults

**Code Quality:**
- Fixed clippy warnings (dead_code for the connection field)
- Applied cargo fmt formatting throughout
- Maintained proper error handling and retry semantics
Completes Step 4 of the RabbitMQ migration plan.
Major changes:
- Replaced Redis stream consumer with RabbitMQ consumer (lapin)
- Migrated from Redis Streams to RabbitMQ exchanges for event consumption
- Switched from RabbitMQ to in-process mpsc channels for term updates (analytics worker)
- Updated configuration to use RABBITMQ_URL and EXCHANGES
- Removed Redis dependencies from Cargo.toml
Key architectural decisions:
- External events (blockchain) → RabbitMQ (persistence, reliability, decoupling)
- Internal events (term updates) → mpsc (performance, simplicity, type-safety)
Bug fixes:
- Fixed RabbitMQ channel errors in get_queue_depth() by returning full queue names ("postgres.atom_created") instead of just exchange names ("atom_created")
- This prevented "NOT_FOUND" errors that were closing channels and causing "invalid channel state: Closed" warnings
The postgres-writer now successfully processes blockchain events from RabbitMQ with proper queue depth monitoring and no channel state errors.
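The queue-name fix above reduces to building the broker-visible name from the service prefix and the exchange name. The helper name and the "." separator are assumptions inferred from the names quoted in this PR ("postgres.atom_created"), not the service's actual code.

```rust
/// Builds the broker-visible queue name from a service queue prefix and
/// an exchange name, e.g. "postgres" + "atom_created" → "postgres.atom_created".
/// (Hypothetical helper; separator inferred from the names in this PR.)
fn full_queue_name(queue_prefix: &str, exchange: &str) -> String {
    format!("{queue_prefix}.{exchange}")
}

fn main() {
    // Querying depth with this full name (rather than the bare exchange
    // name "atom_created") is what avoids the NOT_FOUND error that was
    // closing channels.
    assert_eq!(full_queue_name("postgres", "atom_created"), "postgres.atom_created");
    assert_eq!(full_queue_name("postgres", "deposited"), "postgres.deposited");
}
```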
- Removed obsolete analytics config fields from the test harness:
  - analytics_exchange
  - analytics_routing_key
  - max_messages_per_second
  - min_batch_interval_ms
- Updated all EventProcessingPipeline::new() calls to pass None for term_update_tx
- Tests don't need the analytics worker mpsc channel

All 19 integration tests now passing:
- 5 basic tests
- 14 ignored integration tests
Update the Prometheus configuration and Grafana dashboards to monitor RabbitMQ instead of Redis:
- Replace the Redis exporter with the RabbitMQ exporter in the Prometheus config
- Update postgres-writer metrics test helpers to use RabbitMQ metrics
- Replace the Redis Streams dashboard with a comprehensive RabbitMQ dashboard
- Update the postgres-writer dashboard health metric (redis → rabbitmq)

The new RabbitMQ dashboard includes:
- Queue depth and message consumption rates per queue
- Ack/nack rates with color coding
- Server-side queue metrics (ready, unacknowledged)
- Consumer counts and connection/channel metrics
- Health status gauges for the RabbitMQ server and writers

This completes Step 6 of the RabbitMQ migration plan.
Address PR review feedback:
- Attach the acker to all array events to prevent data loss on intermediate failures
- Fix the queue depth query to use the correct durable flag matching the queue configuration

These changes ensure proper message redelivery on any failure and accurate queue monitoring.
- Set exchanges to durable: true to match the queue configuration and prevent zombie queues after RabbitMQ restarts
- Enhanced reconnect logic to re-declare exchanges and queues and re-bind queue-to-exchange relationships
- Updated Grafana dashboard labels from Redis to RabbitMQ for accurate monitoring
- Added monitoring methods (get_queue_names, get_queue_depth) to surreal-writer for observability parity

These changes address critical issues identified in PR review that could lead to data loss or service disruption during RabbitMQ downtime.
This commit addresses the PR feedback comments:

1. Fix get_queue_depth() inconsistency:
   - Updated surreal-writer to return Result<u32> instead of Result<()>
   - Now returns the actual message count instead of discarding it
   - Matches the postgres-writer implementation

2. Fix clippy warnings in the test harness:
   - Remove the unnecessary .default() call on the RabbitMq unit struct
   - Use inline format args for routing_key variables

These changes ensure:
- Consistent queue depth reporting across both writers
- Cleaner code that passes all clippy lints
- GitHub workflow tests now pass
…sing

When RabbitMQ delivers an array of events, only attach the Acker to the last StreamMessage to prevent "already used Acker" errors. This ensures one ACK/NACK per delivery while maintaining individual event processing semantics.
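The "acker on the last event only" rule can be sketched as a pure transformation. `StreamMessage` here is a stand-in struct and the acker is modeled as a `String`; in the real service it would be lapin's `Acker`, which must be used at most once per delivery.

```rust
// Stand-in for the service's real message type; the acker field models
// lapin's Acker, which may only ack/nack a delivery once.
#[derive(Debug)]
struct StreamMessage {
    payload: String,
    acker: Option<String>,
}

/// Splits one delivery's decoded events into messages, attaching the
/// acker only to the last one so the delivery is acked exactly once.
fn attach_acker(payloads: Vec<String>, acker: String) -> Vec<StreamMessage> {
    let last = payloads.len().saturating_sub(1);
    payloads
        .into_iter()
        .enumerate()
        .map(|(i, payload)| StreamMessage {
            payload,
            acker: (i == last).then(|| acker.clone()),
        })
        .collect()
}

fn main() {
    let msgs = attach_acker(vec!["a".into(), "b".into(), "c".into()], "acker-1".into());
    // Intermediate events carry no acker...
    assert!(msgs[0].acker.is_none());
    assert!(msgs[1].acker.is_none());
    // ...only the final event acks (or nacks) the whole delivery.
    assert!(msgs[2].acker.is_some());
}
```

Acking after the last event means a failure partway through leaves the delivery unacked, so the broker redelivers the whole array — matching the redelivery semantics described above.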
…ge distribution

Adds a weighted round-robin algorithm to the RabbitMQ consumer to prevent queue starvation. Previously, the consumer would exhaust the first queue before moving to the next, causing high-volume queues to starve low-volume ones.

Changes:
- Weighted round-robin with 100-message chunks per queue per cycle
- Rotating start index across consume_batch calls for fairness
- Comprehensive integration tests verifying starvation prevention and rotation
- Applied to both postgres-writer and surreal-writer services
- Updated the Docker Compose project name to intuition-rindexer
- Added RABBITMQ_URL to .env for consistency
- Fixed test harness exchange durability to match production (durable: false)

Test coverage:
- test_round_robin_prevents_queue_starvation: verifies multiple queues are serviced fairly
- test_round_robin_rotation: verifies the start index rotates across batches
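The chunked round-robin above can be sketched as a pure planning function: take up to 100 messages per queue in one cycle, starting from a rotating index. The function and parameter names are illustrative, not the services' actual API.

```rust
/// Plans one consume cycle: visits queues starting at `start`, taking at
/// most `chunk` messages per queue and at most `batch` messages total.
/// Returns (queue_index, messages_to_take) pairs.
/// (Illustrative sketch; not the services' actual consume_batch code.)
fn round_robin_plan(
    queue_depths: &[usize],
    start: usize,
    chunk: usize,
    batch: usize,
) -> Vec<(usize, usize)> {
    let n = queue_depths.len();
    let mut remaining = batch;
    let mut plan = Vec::new();
    for offset in 0..n {
        if remaining == 0 {
            break;
        }
        let q = (start + offset) % n; // rotating start index for fairness
        let take = queue_depths[q].min(chunk).min(remaining);
        if take > 0 {
            plan.push((q, take));
            remaining -= take;
        }
    }
    plan
}

fn main() {
    // A deep queue 0 is capped at one chunk, so queues 1 and 2 still run.
    let plan = round_robin_plan(&[10_000, 50, 50], 0, 100, 200);
    assert_eq!(plan, vec![(0, 100), (1, 50), (2, 50)]);

    // The next batch starts at queue 1, rotating which queue goes first.
    let plan = round_robin_plan(&[10_000, 50, 50], 1, 100, 200);
    assert_eq!(plan, vec![(1, 50), (2, 50), (0, 100)]);
}
```

Without the chunk cap, the first call would take all 200 messages from queue 0 and the low-volume queues would starve, which is exactly the failure mode the commit describes.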
- Changed the exchange declaration from durable: false to durable: true
- This ensures exchanges survive RabbitMQ broker restarts
- Added a documentation note about the rindexer delivery_mode limitation

Note: for full message persistence during broker restarts, rindexer must also publish messages with delivery_mode=2 (persistent). See issue 0xIntuition#15 for details on the rindexer configuration limitation.
- Upgrade next from 15.1.3 to 15.5.12 (critical security fixes)
- Fix the glob ReDoS vulnerability (GHSA-5j98-mcp5-4vw2)
- Fix minimatch ReDoS vulnerabilities (multiple GHSA advisories)

Resolves:
- GHSA-3h52-269p-cp9r (minimatch ReDoS)
- GHSA-7r86-cg39-jmmj (minimatch ReDoS)
- GHSA-23c5-xmqv-rm74 (minimatch ReDoS)
- GHSA-5j98-mcp5-4vw2 (glob CLI command injection)
- Multiple critical Next.js vulnerabilities (GHSA-3ppc, GHSA-67rr, etc.)
See internal PR #1