Stream ROS2 topic to mediasoup

I am currently working on getting some ROS 2 topics onto a streaming platform. I want to use mediasoup for this, as it is open source and self-hosted.

However, I’m having some issues streaming these topics to mediasoup.

I have tried the following:

The mediasoup demo includes a gstreamer.sh script, with which I could stream arbitrary MP4 files to my server, so I know the server itself works.
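
For reference, I invoke it roughly like this (the environment variable names are how my copy of the demo's broadcasters folder expects them, so treat the exact names as an assumption):

SERVER_URL=https://localhost:4443 ROOM_ID=nwxqosdt MEDIA_FILE=./test.mp4 ./gstreamer.sh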

They have also added a package called mediasoupbin, which allows GStreamer to stream directly to mediasoup (the test command is shown further below). So I thought: OK, let's use this.

I have managed to publish to a local RTP stream with:

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
import cv2
import threading
import time

class RosImageServer(Node):
    def __init__(self):
        super().__init__('ros_image_server')

        # Subscribe to the ROS2 image topic for drone_00 camera_0
        self.subscription1 = self.create_subscription(
            Image,
            '/drone_00/camera_0/color/image_raw',
            self.listener_callback1,
            10
        )

        # Subscribe to the ROS2 image topic for drone_11 camera_1
        self.subscription2 = self.create_subscription(
            Image,
            '/drone_11/camera_1/color/image_raw',
            self.listener_callback2,
            10
        )

        # Store the latest images
        self.latest_image1 = None
        self.latest_image2 = None

        # Locks to synchronize access to images
        self.lock1 = threading.Lock()
        self.lock2 = threading.Lock()

        # Initialize streamers as None; they'll be created after first frame
        self.streamer1 = None
        self.streamer2 = None

        # Start streaming threads
        self.streaming_thread1 = threading.Thread(target=self.stream_video1)
        self.streaming_thread2 = threading.Thread(target=self.stream_video2)
        self.streaming_thread1.daemon = True
        self.streaming_thread2.daemon = True
        self.streaming_thread1.start()
        self.streaming_thread2.start()

    def initialize_rtp_stream(self, ip, port, width, height):
        """Initialize the RTP stream using OpenCV and GStreamer with the correct frame size."""
        gst_str = (
            f'appsrc is-live=true block=false format=GST_FORMAT_TIME ! '
            f'video/x-raw,format=BGR,width={width},height={height},framerate=30/1 ! '
            f'videoconvert ! video/x-raw,format=I420 ! '
            f'x264enc tune=zerolatency bitrate=500 speed-preset=superfast key-int-max=30 insert-vui=true ! '
            f'rtph264pay config-interval=1 pt=96 ! '
            f'udpsink host={ip} port={port} sync=false async=false'
        )
        
        print(gst_str)
        self.get_logger().info(f"Initializing RTP stream to {ip}:{port} with size {width}x{height}")
        try:
            # Initialize VideoWriter with the correct GStreamer pipeline
            streamer = cv2.VideoWriter(
                gst_str,
                cv2.CAP_GSTREAMER,
                0,          # FourCC code (can be 0 when using GStreamer)
                30.0,       # Frames per second
                (width, height),
                True        # Is color
            )
            if not streamer.isOpened():
                self.get_logger().error(f"Failed to open RTP stream for {ip}:{port}")
                return None
            return streamer
        except Exception as e:
            self.get_logger().error(f"Failed to initialize RTP stream for {ip}:{port}: {e}")
            return None

    def listener_callback1(self, msg):
        try:
            self.get_logger().info("Received image from drone_00")
            with self.lock1:
                self.latest_image1 = self.process_image(msg)
                if self.latest_image1 is not None:
                    self.get_logger().info("Processed image from drone_00")
                    if self.streamer1 is None:
                        # Initialize the streamer with actual frame size
                        height, width = self.latest_image1.shape[:2]
                        self.streamer1 = self.initialize_rtp_stream('127.0.0.1', 5004, width, height)
                else:
                    self.get_logger().warning("No valid image data from drone_00")
        except Exception as e:
            self.get_logger().error(f"Exception in listener_callback1: {e}")

    def listener_callback2(self, msg):
        try:
            self.get_logger().info("Received image from drone_11")
            with self.lock2:
                self.latest_image2 = self.process_image(msg)
                if self.latest_image2 is not None:
                    self.get_logger().info("Processed image from drone_11")
                    if self.streamer2 is None:
                        # Initialize the streamer with actual frame size
                        height, width = self.latest_image2.shape[:2]
                        self.streamer2 = self.initialize_rtp_stream('127.0.0.1', 5006, width, height)
                else:
                    self.get_logger().warning("No valid image data from drone_11")
        except Exception as e:
            self.get_logger().error(f"Exception in listener_callback2: {e}")

    def process_image(self, msg):
        """Process the incoming ROS image message and convert it to BGR format for RTP streaming."""
        try:
            # Handle image data based on its encoding
            self.get_logger().info(f"Processing image with encoding {msg.encoding}")
            if msg.encoding == "rgb8":
                image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, 3))
                image_array = cv2.cvtColor(image_array, cv2.COLOR_RGB2BGR)
            elif msg.encoding == "bgr8":
                image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, 3))
            elif msg.encoding == "mono8":
                image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width))
                image_array = cv2.cvtColor(image_array, cv2.COLOR_GRAY2BGR)
            else:
                raise ValueError(f"Unsupported encoding: {msg.encoding}")
            self.get_logger().info(f"Processed image shape: {image_array.shape}, dtype: {image_array.dtype}")
            return image_array
        except Exception as e:
            self.get_logger().error(f"Failed to process image: {e}")
            return None

    def stream_video1(self):
        self.get_logger().info("Starting streaming thread for drone_00")
        while rclpy.ok():
            with self.lock1:
                frame = self.latest_image1.copy() if self.latest_image1 is not None else None
            if frame is not None:
                if self.streamer1 is not None and self.streamer1.isOpened():
                    try:
                        self.streamer1.write(frame)
                        self.get_logger().info("Wrote frame to streamer1")
                    except Exception as e:
                        self.get_logger().error(f"Exception in streamer1.write(): {e}")
                        self.streamer1.release()
                        self.streamer1 = None
                elif self.streamer1 is None:
                    self.get_logger().info("Streamer1 is not initialized yet")
                else:
                    self.get_logger().error("Streamer for drone_00 is not open")
            else:
                self.get_logger().warning("No frame to stream for drone_00")
            time.sleep(0.01)

    def stream_video2(self):
        self.get_logger().info("Starting streaming thread for drone_11")
        while rclpy.ok():
            with self.lock2:
                frame = self.latest_image2.copy() if self.latest_image2 is not None else None
            if frame is not None:
                if self.streamer2 is not None and self.streamer2.isOpened():
                    try:
                        self.streamer2.write(frame)
                        self.get_logger().info("Wrote frame to streamer2")
                    except Exception as e:
                        self.get_logger().error(f"Exception in streamer2.write(): {e}")
                        self.streamer2.release()
                        self.streamer2 = None
                elif self.streamer2 is None:
                    self.get_logger().info("Streamer2 is not initialized yet")
                else:
                    self.get_logger().error("Streamer for drone_11 is not open")
            else:
                self.get_logger().warning("No frame to stream for drone_11")
            time.sleep(0.01)

    def cleanup(self):
        """Clean up resources before shutting down."""
        self.get_logger().info("Cleaning up RTP streams and shutting down")
        if self.streamer1 and self.streamer1.isOpened():
            self.streamer1.release()
        if self.streamer2 and self.streamer2.isOpened():
            self.streamer2.release()

def main(args=None):
    rclpy.init(args=args)
    ros_node = RosImageServer()
    try:
        rclpy.spin(ros_node)
    except KeyboardInterrupt:
        pass
    finally:
        ros_node.cleanup()
        ros_node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

# gst-launch-1.0 udpsrc port=5004 caps="application/x-rtp, media=video, encoding-name=H264, payload=96" ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink

# gst-launch-1.0 mediasoupbin_py name=ms server-url="https://localhost:4443/rooms/nwxqosdt" videotestsrc is-live=true ! "video/x-raw,width=1280,height=720,framerate=25/1" ! ms. audiotestsrc is-live=true wave=ticks ! "audio/x-raw" ! ms.

This works. Now I want to stream to mediasoup; a test stream can be created with:

gst-launch-1.0 mediasoupbin_py name=ms server-url="https://localhost:4443/rooms/nwxqosdt" videotestsrc is-live=true ! "video/x-raw,width=1280,height=720,framerate=25/1" ! ms. audiotestsrc is-live=true wave=ticks ! "audio/x-raw" ! ms.

So I thought all I need to do is create a GStreamer source that takes the place of videotestsrc and use the same command. However, I am unable to do so:

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
import cv2
import gi
import threading
import sys
import traceback

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)

class MediasoupStreamer(Node):
    def __init__(self):
        super().__init__('mediasoup_streamer')
        self.get_logger().info("Initializing MediasoupStreamer")

        # Subscribe to the ROS2 image topic
        self.subscription = self.create_subscription(
            Image,
            '/drone_00/camera_0/color/image_raw',
            self.listener_callback,
            10
        )
        self.get_logger().info("Subscribed to ROS2 image topic")

        # Store the latest image
        self.latest_image = None
        self.lock = threading.Lock()

        # Initialize GStreamer pipeline
        self.pipeline = None
        self.appsrc = None

        # Start streaming thread
        self.streaming_thread = threading.Thread(target=self.stream_video)
        self.streaming_thread.daemon = True
        self.streaming_thread.start()
        self.get_logger().info("Streaming thread started")

    def initialize_gstreamer_pipeline(self):
        self.get_logger().info("Initializing GStreamer pipeline")
        pipeline_str = (
            'appsrc name=source is-live=true format=GST_FORMAT_TIME ! '
            'videoconvert ! video/x-raw,format=I420 ! '
            'x264enc tune=zerolatency bitrate=2000 speed-preset=superfast ! '
            'video/x-h264,profile=baseline ! '
            'mediasoupbin_py name=ms server-url="https://localhost:4443/rooms/nwxqosdt" '
            'ms.'
        )

        try:
            self.pipeline = Gst.parse_launch(pipeline_str)
            self.get_logger().info("GStreamer pipeline created")
        except GLib.Error as e:
            self.get_logger().error(f"Error creating pipeline: {e}")
            return False

        self.appsrc = self.pipeline.get_by_name('source')
        if not self.appsrc:
            self.get_logger().error("Failed to get appsrc element")
            return False

        # Set appsrc properties
        self.appsrc.set_property('format', Gst.Format.TIME)
        self.appsrc.set_property('do-timestamp', True)

        # Start playing
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            self.get_logger().error("Failed to set pipeline to PLAYING")
            return False

        self.get_logger().info("GStreamer pipeline initialized and set to PLAYING")
        return True

    def listener_callback(self, msg):
        self.get_logger().debug("Received image message")
        with self.lock:
            self.latest_image = self.process_image(msg)

    def process_image(self, msg):
        try:
            if msg.encoding == "rgb8":
                image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, 3))
                image_array = cv2.cvtColor(image_array, cv2.COLOR_RGB2BGR)
            elif msg.encoding == "bgr8":
                image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, 3))
            elif msg.encoding == "mono8":
                image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width))
                image_array = cv2.cvtColor(image_array, cv2.COLOR_GRAY2BGR)
            else:
                raise ValueError(f"Unsupported encoding: {msg.encoding}")
            self.get_logger().debug(f"Processed image: shape={image_array.shape}, dtype={image_array.dtype}")
            return image_array
        except Exception as e:
            self.get_logger().error(f"Failed to process image: {e}")
            return None

    def stream_video(self):
        self.get_logger().info("Starting streaming thread")
        if not self.initialize_gstreamer_pipeline():
            self.get_logger().error("Failed to initialize GStreamer pipeline")
            return

        while rclpy.ok():
            try:
                with self.lock:
                    frame = self.latest_image.copy() if self.latest_image is not None else None

                if frame is not None:
                    self.push_frame(frame)
                else:
                    self.get_logger().warn("No frame available to push")

                rclpy.spin_once(self, timeout_sec=0.01)
            except Exception as e:
                self.get_logger().error(f"Error in stream_video loop: {e}")
                traceback.print_exc()

    def push_frame(self, frame):
        try:
            data = frame.tobytes()
            buf = Gst.Buffer.new_allocate(None, len(data), None)
            buf.fill(0, data)

            # Set buffer metadata
            frame_duration = Gst.util_uint64_scale_int(1, Gst.SECOND, 30)  # 30 FPS
            timestamp = Gst.util_get_timestamp()
            buf.pts = timestamp
            buf.dts = timestamp
            buf.duration = frame_duration

            # Push buffer to appsrc
            ret = self.appsrc.emit('push-buffer', buf)
            if ret != Gst.FlowReturn.OK:
                self.get_logger().error(f"Error pushing buffer: {ret}")
            else:
                self.get_logger().debug("Pushed frame to appsrc")
        except Exception as e:
            self.get_logger().error(f"Error in push_frame: {e}")
            traceback.print_exc()

    def cleanup(self):
        self.get_logger().info("Cleaning up GStreamer pipeline and shutting down")
        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)

def main(args=None):
    rclpy.init(args=args)
    mediasoup_streamer = MediasoupStreamer()
    try:
        rclpy.spin(mediasoup_streamer)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"Unexpected error: {e}")
        traceback.print_exc()
    finally:
        mediasoup_streamer.cleanup()
        mediasoup_streamer.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

But this gives a Segmentation fault (core dumped) in the super().__init__() call.

Does anyone have a suggestion for how to tackle this?

In summary:

  • I have a working ROS 2 topic and am able to serve it over a local RTP stream
  • mediasoup offers a GStreamer utility (mediasoupbin), but it doesn't seem compatible with ROS 2
  • I have tried modifying my scripts accordingly, but this currently does not work.

Please let me know if you have some suggestions.

OK, I’ve skipped GStreamer and created a stream from the demo instead.
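
For anyone finding this later, here is a minimal sketch of what driving the demo directly can look like: create a broadcaster via the demo's HTTP API, create a plain RTP transport, create a producer, then point the udpsink from the first script at the transport it returns. The endpoint paths, field names, and codec parameters are assumptions based on my reading of the demo's broadcasters/gstreamer.sh, so check them against your demo version; BROADCASTER_ID and the ssrc are arbitrary values I chose.

import requests

# Demo server and room; the demo's self-signed cert is why verify=False below.
SERVER_URL = 'https://localhost:4443'
ROOM_ID = 'nwxqosdt'
BROADCASTER_ID = 'ros2-bridge'   # any unique id (hypothetical name)

base = f'{SERVER_URL}/rooms/{ROOM_ID}/broadcasters'

# 1. Announce a broadcaster in the room.
requests.post(base, json={
    'id': BROADCASTER_ID,
    'displayName': 'ROS 2 bridge',
    'device': {'name': 'GStreamer'},
}, verify=False).raise_for_status()

# 2. Create a plain RTP transport (comedia=True lets mediasoup latch onto
#    whatever source port the udpsink ends up using).
resp = requests.post(f'{base}/{BROADCASTER_ID}/transports', json={
    'type': 'plain',
    'comedia': True,
    'rtcpMux': False,
}, verify=False)
resp.raise_for_status()
transport = resp.json()   # expected to contain 'id', 'ip', 'port' (and 'rtcpPort')

# 3. Create an H264 producer on that transport. Payload type and ssrc must match
#    what the GStreamer pipeline actually sends (pt=96 in the script above), and
#    the codec parameters must match the demo router's H264 entry.
requests.post(f'{base}/{BROADCASTER_ID}/transports/{transport["id"]}/producers', json={
    'kind': 'video',
    'rtpParameters': {
        'codecs': [{
            'mimeType': 'video/H264',
            'payloadType': 96,
            'clockRate': 90000,
            'parameters': {
                'packetization-mode': 1,
                'profile-level-id': '42e01f',
                'level-asymmetry-allowed': 1,
            },
        }],
        'encodings': [{'ssrc': 22222222}],
    },
}, verify=False).raise_for_status()

# 4. Point the RTP pipeline from the first script at the transport instead of
#    127.0.0.1:5004 (udpsink host/port, plus ssrc=22222222 on rtph264pay so the
#    stream matches the producer created above).
print(f"Send RTP to {transport['ip']}:{transport['port']}")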