Test_robot_pose() in test_nav2_robot_adapter.py of free_fleet fails (#682)

Posted by @ptientho:

Hi,

I have a question on how to test the function test_robot_pose() in test_nav2_robot_adapter.py of free_fleet adapter.

I follow the link to implement my own fleet adapter with Zenoh, which is similar to the test script.

I’m not sure what causes the function to fail, and the result is below.

______________________ TestFastbotAdapter.test_robot_pose ______________________
test/test_fastbot_adapter.py:63: in test_robot_pose
    assert transform is not None
E   AssertionError: assert None is not None

the method returns None so that transform is None

Here is my code, which takes Nav2 fleet adapter as an example.

class Nav2TfHandler:
    def __init__(self, robot_name, zenoh_session, tf_buffer, node, robot_frame='base_footprint', map_frame='map'):
        self.robot_name = robot_name
        self.zenoh_session = zenoh_session
        self.tf_buffer = tf_buffer
        self.robot_frame = robot_frame
        self.map_frame = map_frame
        self.node = node

        
        def _tf_callback(sample: zenoh.Sample):
            try:
                transform = TFMessage.deserialize(sample.payload.to_bytes())
                
            except Exception as e:
                self.node.get_logger().debug(
                    f'Failed to deserialize TF payload: {type(e)}: {e}'
                )
                return None
            for zt in transform.transforms:
                t = transform_stamped_to_ros2_msg(zt)
                # t.header.frame_id = namespacify(zt.header.frame_id,
                #                                 self.robot_name)
                # t.child_frame_id = namespacify(zt.child_frame_id,
                #                                self.robot_name)
                # t.header.frame_id = namespacify(zt.header.frame_id,
                #                                 '')
                # t.child_frame_id = namespacify(zt.child_frame_id,
                #                                '')
                print(f'Received TF from {t.header.frame_id} to {t.child_frame_id} at time {t.header.stamp}')

                self.tf_buffer.set_transform(
                    t, f'{self.robot_name}_TfListener')
        
        def _duration_callback(sample: zenoh.Sample):
            feedback = NavigateToPose_Feedback.deserialize(sample.payload.to_bytes())
            self.time_remaining = feedback.estimated_time_remaining
            print(
                f'Robot [{self.robot_name}] time remaining: {self.time_remaining}')
    
        self.tf_sub = self.zenoh_session.declare_subscriber(
            namespacify('tf', self.robot_name),
            _tf_callback
        )
        
        self.time_sub = self.zenoh_session.declare_subscriber(
            namespacify('navigate_to_pose/_action/feedback', self.robot_name),
            _duration_callback
        )
        
        
    def get_transform(self) -> TransformStamped | None:
        # This function should return a TransformStamped object or None
        # if the transform is not available.
        try:
            transform  = self.tf_buffer.lookup_transform(
                #namespacify(self.map_frame, self.robot_name),
                self.map_frame,
                #namespacify(self.robot_frame, ''),
                self.robot_frame,
                rclpy.time.Time(),
            )
            return transform
        except Exception as e:
            self.node.get_logger().info(
                f'Unabled to get transform between {self.robot_frame} and {self.map_frame}: {type(e)}: {e}'
            )
        return None

I suspect that the callback functions - _tf_callback() and _duration_callback - are never executed.

Not sure what I did wrong here. If anyone knows, please give me idea.

Thanks

Chosen answer

Answer chosen by @aaronchongth at 2025-05-09T07:11:02Z.
Answered by @ptientho:

I found the cause. It’s that because get_transform() takes sometime to receive the transform message. After I used nav2_get_tf.py script you provided, I tested several times and it takes around 5 seconds to receive the transform.

Posted by @aaronchongth:

Hi @ptientho, thanks for sharing your code changes. At first glance I don’t see anything wrong with it.

But it’d be hard to debug through the test itself. I’d suggest using this script, free_fleet/free_fleet_examples/free_fleet_examples/nav2_get_tf.py at main · open-rmf/free_fleet · GitHub for testing. This way at least you’ll be able to see if the printouts print(f'Received TF from {t.header.frame_id} to {t.child_frame_id} at time {t.header.stamp}') are showing up.

Another case might just be that the map or robot frame names are different from our usual nav2 test cases. You can run a quick ros2 run tf2_tools view_frames for your robot to see if the frames are named accurately (I’ve seen variations to the robot frame for different setups, base_frame or base_footprint, etc). In any case being able to check the logs would be helpful

One last possibility would be that your Zenoh comms has broken down. Even though bridges can work peer-to-peer, the best solution for discovery and connectivity is still to always have a router in the network.

Posted by @ptientho:

I found the cause. It’s that because get_transform() takes sometime to receive the transform message. After I used nav2_get_tf.py script you provided, I tested several times and it takes around 5 seconds to receive the transform.


This is the chosen answer.

Posted by @aaronchongth:

Interesting, are you using a Zenoh router? The last time I’ve experienced this amount of delay, was when I was still trying things out peer-to-peer, without a router.

Posted by @ptientho:

Yes, I used Zenoh router.

Posted by @aaronchongth:

Can you perhaps share your zenoh config file for your zenoh bridge?

for zt in transform.transforms:
                t = transform_stamped_to_ros2_msg(zt)
                # t.header.frame_id = namespacify(zt.header.frame_id,
                #                                 self.robot_name)
                # t.child_frame_id = namespacify(zt.child_frame_id,
                #                                self.robot_name)
                # t.header.frame_id = namespacify(zt.header.frame_id,
                #                                 '')
                # t.child_frame_id = namespacify(zt.child_frame_id,
                #                                '')
                print(f'Received TF from {t.header.frame_id} to {t.child_frame_id} at time {t.header.stamp}')

how often does this printout happen? I’m wondering if it is all due to packet loss, or general networking slowness, or your topic has been downsampled

what happens if you send a navigate_to_pose command? does it also take that long to start the task too? This is to check if the issue is happening in both directions

edit: maybe it’s nothing too, in general it takes my setup 2 to 3 seconds to get the transform when launched too, since it needs to collect TFs from various sources before being able to compose the transform that we are looking for.


Edited by @aaronchongth at 2025-05-09T02:50:04Z

Posted by @ptientho:

Screencast from 05-08-2025 11:26:07 PM.webm

According to the video, this is the speed of receiving the transformations. Pretty normal. Also, sending navigate_to_pose command happens immediately after requesting.

Below is the speed of get_transform() subscribing to the message; I just commented out the print(f'Received TF from {t.header.frame_id} to {t.child_frame_id} at time {t.header.stamp}') to try to see the transformation I actually want. You can see the first few messages that the data is missing before getting the actual data.

Screencast from 05-08-2025 11:33:07 PM.webm

Posted by @aaronchongth:

ok I guess it looks pretty normal then. I’ll mark your own solution as the answer for this discussion then