Sequentially starting launch files using ROS 2 Python when a specific log message is detected

Hi all,

I’m trying to create a ROS 2 Python launch file that starts multiple launch files in a sequence. I want to start the next launch file only after a specific log message is printed by the currently running launch file. (It’s necessary for starting Nav2 with one command)

Currently, i start the launch files with ExecuteProcess, so i can use the ProcessIO event to catch the output message that indicates readiness. My main issue with this is that ROS 2 doesn’t seem to correctly terminate all the process when the launch file was started with ExecuteProcess.

Is there a cleaner way to get launch files waiting for each? Any help or suggestions are greatly appreciated!

This is how my current solution looks like:

import launch
from launch_ros.actions import Node
from launch.actions import (
    ExecuteProcess,
    DeclareLaunchArgument,
    LogInfo,
    RegisterEventHandler,
    TimerAction,
)
from launch.substitutions import (
    LaunchConfiguration,
    PathJoinSubstitution,
    PythonExpression,
)
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch_ros.substitutions import FindPackageShare
from launch.events.process import ProcessIO
from launch.event_handlers import OnProcessIO

# Create event handler that waits for an output message and then returns actions
def on_matching_output(matcher: str, result: launch.SomeActionsType):
    def on_output(event: ProcessIO):
        for line in event.text.decode().splitlines():
            if matcher in line:
                return result

    return on_output


# Lanch the robot and the navigation stack


def generate_launch_description():
    # Messages are from: https://navigation.ros.org/setup_guides/sensors/setup_sensors.html#launching-nav2
    diff_drive_loaded_message = (
        "Sucessfully loaded controller diff_drive_base_controller into state active"
    )
    toolbox_ready_message = "Registering sensor"
    navigation_ready_message = "Creating bond timer"

    use_rviz = LaunchConfiguration("use_rviz")

    # Including launchfiles with execute process because i didn't find another way to wait for a certain messages befor starting the next launchfile
    bringup = ExecuteProcess(
        name="launch_bringup",
        cmd=[
            "ros2",
            "launch",
            PathJoinSubstitution(
                [
                    FindPackageShare("sam_bot_nav2_gz"),
                    "launch",
                    "display.launch.py",
                ]
            ),
            "use_rviz:=false",
            "use_localization:=false",
            # Thy to replace with LaunchConfigurationNotEquals
            PythonExpression(
                [
                    "'' if '",
                    LaunchConfiguration("gz_args"),
                    "' == ''",
                    " else 'gz_args:=",
                    LaunchConfiguration("gz_args"),
                    "'",
                ]
            ),
        ],
        output="screen",
    )
    toolbox = ExecuteProcess(
        name="launch_slam_toolbox",
        cmd=[
            "ros2",
            "launch",
            PathJoinSubstitution(
                [
                    FindPackageShare("slam_toolbox"),
                    "launch",
                    "online_async_launch.py",
                ]
            ),
        ],
        output="screen",
    )
    waiting_toolbox = RegisterEventHandler(
        OnProcessIO(
            target_action=bringup,
            on_stdout=on_matching_output(
                diff_drive_loaded_message,
                [
                    LogInfo(
                        msg="Diff drive controller loaded. Starting SLAM Toolbox..."
                    ),
                    toolbox,
                ],
            ),
        )
    )

    navigation = ExecuteProcess(
        name="launch_navigation",
        cmd=[
            "ros2",
            "launch",
            PathJoinSubstitution(
                [
                    FindPackageShare("nav2_bringup"),
                    "launch",
                    "navigation_launch.py",
                ]
            ),
            "use_sim_time:=True",
            ["params_file:=", LaunchConfiguration('params_file')]
        ],
        output="screen",
    )
    rviz_node = Node(
        condition=launch.conditions.IfCondition(use_rviz),
        package="rviz2",
        executable="rviz2",
        name="rviz2",
        output="screen",
        arguments=["-d", LaunchConfiguration("rvizconfig")],
    )
    waiting_navigation = RegisterEventHandler(
        OnProcessIO(
            target_action=toolbox,
            on_stdout=on_matching_output(
                # diff_drive_loaded_message,
                toolbox_ready_message,
                [
                    LogInfo(msg="SLAM Toolbox loaded. Starting navigation..."),
                    # TODO Debug: Navigation fails to start if it's launched right after the slam_toolbox
                    TimerAction(
                        period=20.0,
                        actions=[navigation],
                    ),
                    rviz_node,
                ],
            ),
        )
    )

    waiting_success = RegisterEventHandler(
        OnProcessIO(
            target_action=navigation,
            on_stdout=on_matching_output(
                navigation_ready_message,
                [
                    LogInfo(msg="Ready for navigation!"),
                ],
            ),
        )
    )

    return launch.LaunchDescription(
        [
            DeclareLaunchArgument(
                "params_file",
                default_value=[FindPackageShare("sam_bot_nav2_gz"), "/config/nav2_params.yaml"],
                description="Full path to the ROS2 parameters file to use for all launched nodes",
            ),
            DeclareLaunchArgument(
                name="rvizconfig",
                default_value=[
                    FindPackageShare("sam_bot_nav2_gz"),
                    "/rviz/navigation_config.rviz",
                ],
                description="Absolute path to rviz config file",
            ),
            DeclareLaunchArgument(
                "gz_args",
                default_value=" ",
                description="Extra args for Gazebo (ie. '-s' for running headless)",
            ),
            DeclareLaunchArgument(
                name="use_rviz",
                default_value="True",
                description="Absolute path to rviz config file",
            ),
            bringup,
            waiting_toolbox,
            waiting_navigation,
            waiting_success,
        ]
    )

I suggest you add up a flag each time the ExecuteProcess is called. If there is no print in the cmd cli the python code would allow to know when the launch file has terminated.
It is good to add : #echo “file launched” in ExecuteProcess for ROS2.

In the past, I’ve had success making a simple python node that exits when some event happens (a lifecycle change, a topic is published, a parameter change, looking for some text in the roslogs as they’re getting written, whatever). Then you can use the OnProcessExit event handler to trigger your subsequent stages. This way you don’t have to use ExecuteProcess - you can have Node or IncludeLaunchDescription actions directly.

If you do use ExecuteProcess, try setting shell=False - things seem to hang around in the background when you use shell=True

1 Like

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.