diff --git a/rclcpp/test/rclcpp/executors/test_events_executor.cpp b/rclcpp/test/rclcpp/executors/test_events_executor.cpp index 447567047d..d0bbdd039e 100644 --- a/rclcpp/test/rclcpp/executors/test_events_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_events_executor.cpp @@ -615,3 +615,59 @@ TEST_F(TestEventsExecutor, waitable_with_timer) EXPECT_TRUE(waitable->timerTriggeredWaitable()); } + +// Regression test for https://github.com/ros2/rmw_cyclonedds/pull/584 +// EventsExecutor must receive transient-local cached messages +// when the subscriber uses KEEP_ALL history. +// +// Bug: rmw_cyclonedds reports depth=0 for KEEP_ALL, and +// rmw_subscription_set_on_new_message_callback clips unread_count to depth +// via std::min(unread_count, 0) == 0, causing the executor to never take the +// cached message. +// +// This test publishes before the subscriber exists (transient-local caching), +// then subscribes with KEEP_ALL + transient-local via EventsExecutor and +// verifies the cached message is delivered. +TEST_F(TestEventsExecutor, keep_all_transient_local_receives_cached_message) +{ + // Publisher node: transient-local, reliable, depth=1 (standard /tf_static pattern) + auto pub_node = std::make_shared("pub_node"); + rclcpp::QoS pub_qos(1); + pub_qos.transient_local().reliable(); + auto publisher = pub_node->create_publisher("tl_test_topic", pub_qos); + + // Publish BEFORE subscriber exists — DDS caches this for late joiners + publisher->publish(test_msgs::msg::Empty{}); + std::this_thread::sleep_for(500ms); + + // Subscriber node: KEEP_ALL + transient-local (what rosbag2 adapt_request_to_offers produces) + auto sub_node = std::make_shared("sub_node"); + rclcpp::QoS sub_qos(rclcpp::KeepAll{}); + sub_qos.transient_local().reliable(); + + std::atomic received{0}; + auto subscription = sub_node->create_subscription( + "tl_test_topic", sub_qos, + [&received](test_msgs::msg::Empty::ConstSharedPtr) { + received.fetch_add(1); + }); + + EventsExecutor executor; + executor.add_node(sub_node); + + auto start = std::chrono::steady_clock::now(); + while (received.load() == 0 && + (std::chrono::steady_clock::now() - start) < 5s) + { + executor.spin_some(100ms); + } + executor.remove_node(sub_node); + + // With the rmw fix, the cached message must be received. + // Without the fix, received == 0 because the callback fires with 0 events. + EXPECT_GE(received.load(), 1) + << "EventsExecutor did not receive the transient-local cached message " + "with KEEP_ALL QoS. This indicates a regression in " + "rmw_subscription_set_on_new_message_callback where unread_count " + "is incorrectly clipped to depth=0 for KEEP_ALL history."; +}