ROS’s pub/sub is inherently asynchronous, and one of the drawbacks of that is there is no flow control implicit in the communication. For example, if you wanted a publisher to only send data when a subscriber has space available for it, then I would call that synchronous pub/sub. The advantage of asynchronous is that you can easily support one to many publishers, which in turn makes it easier to support features like playback of many recorded data streams from a single entity (essentially rosbag). The advantage of synchronous is that you can control the flow of the data for intermediate topics, as you’ve described above.
The problem you’ve described is sort of the problem that ecto
was designed to address:
It’s been around for a while, and I think people are still using it in perception like pipelines (sort of what you’ve described above) to much success. I’m not sure how active development is on it these days, but I think it integrates quite well into ROS.
Ecto creates a separate, self-contained synchronous graph which exists in a larger ROS system as a single node, and so its internals are not introspectable with the normal ROS tools.
It’s possible, in theory, to enforce a synchronous flow control over a set of asynchronous ROS nodes using extra topics and services to control the flow of the internals of each node, but I’m not aware of anyone who has done this in a generic way. Maybe someone else can speak up if they know of one.
For ROS 2, we’ve been discussion how we might do this in order to determine if there are any features missing in ROS 1’s communication system that would prevent us from doing so, but we’ve stopped short of writing this down in a white paper like document or prototyping it. The basic idea is to make it possible to control behavior of each node in the synchronous graph through a polling mechanism, and then use that polling (or pumping) mechanism to implement a supervisor who “fires” each node in sequence. Then the fact that the asynchronous comms is being used between the nodes is unimportant.
Some workarounds you can consider, is to only “step” rosbag manually, assuming that the beginning of your pipeline only needs one message to start the chain, e.g. would not work for a stereo vision pipeline where you’re playing back the images from both cameras. You can also write a script to do this based on some condition with the rosbag api. You could also implement your own flow control using services and other mechanisms, however this is usually a lot of work and leads to less reusable code.
I believe that OpenRTM can do something like I’ve described, but I don’t have any pointers off-hand. I just remember reading about it in the past with respect to their “executors” which control the execution of individual “nodes” (might be a different term in their nomenclature), even across processes.
Also, I believe that LCM allows you to playback data in a way that it will not overflow the queue while a downstream subscriber does not have queue space available. I don’t know if this applies to intermediate topics between two entities, or just for playback of a single topic. Again, no pointers on that off-hand, but I remember reading about it.
Also, related to the ROS 2 work, I think we could use some of the DDS QoS settings to implement a publisher which could block is any downstream subscriber is full, but there are reasons to not do that as well. If you imagine that someone adds an introspection tool to a topic as a subscriber which slows down the publisher unexpectedly and would adversely affect the behavior of the subscribers in the downstream nodes.