Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions rclcpp/test/rclcpp/executors/test_events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,59 @@ TEST_F(TestEventsExecutor, waitable_with_timer)

EXPECT_TRUE(waitable->timerTriggeredWaitable());
}

// Regression test for https://github.com/ros2/rmw_cyclonedds/pull/584
// EventsExecutor must receive transient-local cached messages
// when the subscriber uses KEEP_ALL history.
//
// Bug: rmw_cyclonedds reports depth=0 for KEEP_ALL, and
// rmw_subscription_set_on_new_message_callback clips unread_count to depth
// via std::min(unread_count, 0) == 0, causing the executor to never take the
// cached message.
//
// This test publishes before the subscriber exists (transient-local caching),
// then subscribes with KEEP_ALL + transient-local via EventsExecutor and
// verifies the cached message is delivered.
TEST_F(TestEventsExecutor, keep_all_transient_local_receives_cached_message)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(i think) although this test only runs on the default rnw implementation (or rmw implementation in user's configuration), nightly CI matrix (ci.ros2.org) does run per-RMW jobs to catch this failure.

{
// Publisher node: transient-local, reliable, depth=1 (standard /tf_static pattern)
auto pub_node = std::make_shared<rclcpp::Node>("pub_node");
rclcpp::QoS pub_qos(1);
pub_qos.transient_local().reliable();
auto publisher = pub_node->create_publisher<test_msgs::msg::Empty>("tl_test_topic", pub_qos);

// Publish BEFORE subscriber exists — DDS caches this for late joiners
publisher->publish(test_msgs::msg::Empty{});
std::this_thread::sleep_for(500ms);

// Subscriber node: KEEP_ALL + transient-local (what rosbag2 adapt_request_to_offers produces)
auto sub_node = std::make_shared<rclcpp::Node>("sub_node");
rclcpp::QoS sub_qos(rclcpp::KeepAll{});
sub_qos.transient_local().reliable();

std::atomic<int> received{0};
auto subscription = sub_node->create_subscription<test_msgs::msg::Empty>(
"tl_test_topic", sub_qos,
[&received](test_msgs::msg::Empty::ConstSharedPtr) {
received.fetch_add(1);
});

EventsExecutor executor;
executor.add_node(sub_node);

auto start = std::chrono::steady_clock::now();
while (received.load() == 0 &&
(std::chrono::steady_clock::now() - start) < 5s)
{
executor.spin_some(100ms);
}
executor.remove_node(sub_node);

// With the rmw fix, the cached message must be received.
// Without the fix, received == 0 because the callback fires with 0 events.
EXPECT_GE(received.load(), 1)
<< "EventsExecutor did not receive the transient-local cached message "
"with KEEP_ALL QoS. This indicates a regression in "
"rmw_subscription_set_on_new_message_callback where unread_count "
"is incorrectly clipped to depth=0 for KEEP_ALL history.";
}