Posted by @ptientho:
Hi,
I have a question on how to test the function test_robot_pose()
in test_nav2_robot_adapter.py of free_fleet adapter.
I follow the link to implement my own fleet adapter with Zenoh, which is similar to the test script.
I’m not sure what causes the function to fail, and the result is below.
______________________ TestFastbotAdapter.test_robot_pose ______________________
test/test_fastbot_adapter.py:63: in test_robot_pose
assert transform is not None
E AssertionError: assert None is not None
the method returns None
so that transform is None
Here is my code, which takes Nav2 fleet adapter as an example.
class Nav2TfHandler:
def __init__(self, robot_name, zenoh_session, tf_buffer, node, robot_frame='base_footprint', map_frame='map'):
self.robot_name = robot_name
self.zenoh_session = zenoh_session
self.tf_buffer = tf_buffer
self.robot_frame = robot_frame
self.map_frame = map_frame
self.node = node
def _tf_callback(sample: zenoh.Sample):
try:
transform = TFMessage.deserialize(sample.payload.to_bytes())
except Exception as e:
self.node.get_logger().debug(
f'Failed to deserialize TF payload: {type(e)}: {e}'
)
return None
for zt in transform.transforms:
t = transform_stamped_to_ros2_msg(zt)
# t.header.frame_id = namespacify(zt.header.frame_id,
# self.robot_name)
# t.child_frame_id = namespacify(zt.child_frame_id,
# self.robot_name)
# t.header.frame_id = namespacify(zt.header.frame_id,
# '')
# t.child_frame_id = namespacify(zt.child_frame_id,
# '')
print(f'Received TF from {t.header.frame_id} to {t.child_frame_id} at time {t.header.stamp}')
self.tf_buffer.set_transform(
t, f'{self.robot_name}_TfListener')
def _duration_callback(sample: zenoh.Sample):
feedback = NavigateToPose_Feedback.deserialize(sample.payload.to_bytes())
self.time_remaining = feedback.estimated_time_remaining
print(
f'Robot [{self.robot_name}] time remaining: {self.time_remaining}')
self.tf_sub = self.zenoh_session.declare_subscriber(
namespacify('tf', self.robot_name),
_tf_callback
)
self.time_sub = self.zenoh_session.declare_subscriber(
namespacify('navigate_to_pose/_action/feedback', self.robot_name),
_duration_callback
)
def get_transform(self) -> TransformStamped | None:
# This function should return a TransformStamped object or None
# if the transform is not available.
try:
transform = self.tf_buffer.lookup_transform(
#namespacify(self.map_frame, self.robot_name),
self.map_frame,
#namespacify(self.robot_frame, ''),
self.robot_frame,
rclpy.time.Time(),
)
return transform
except Exception as e:
self.node.get_logger().info(
f'Unabled to get transform between {self.robot_frame} and {self.map_frame}: {type(e)}: {e}'
)
return None
I suspect that the callback functions - _tf_callback() and _duration_callback - are never executed.
Not sure what I did wrong here. If anyone knows, please give me idea.
Thanks
Chosen answer
Answer chosen by @aaronchongth at 2025-05-09T07:11:02Z.
Answered by @ptientho:
I found the cause. It’s that because get_transform()
takes sometime to receive the transform message. After I used nav2_get_tf.py script you provided, I tested several times and it takes around 5 seconds to receive the transform.