I am currently working on getting some ROS2 topics onto a streaming platform. I want to use mediasoup for this, as it is open source and self-hosted.
However, I'm having some issues streaming these topics to mediasoup.
I have tried the following:
The mediasoup demo includes a gstreamer.sh script, with which I was able to stream arbitrary mp4 files to my server, so I know the server itself works.
They also provide a package called mediasoupbin, which allows GStreamer to stream directly to mediasoup. So I thought, ok, let's use this.
I have managed to publish to a local RTP stream with:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
import cv2
import threading
import time
class RosImageServer(Node):
def __init__(self):
super().__init__('ros_image_server')
# Subscribe to the ROS2 image topic for drone_00 camera_0
self.subscription1 = self.create_subscription(
Image,
'/drone_00/camera_0/color/image_raw',
self.listener_callback1,
10
)
# Subscribe to the ROS2 image topic for drone_11 camera_1
self.subscription2 = self.create_subscription(
Image,
'/drone_11/camera_1/color/image_raw',
self.listener_callback2,
10
)
# Store the latest images
self.latest_image1 = None
self.latest_image2 = None
# Locks to synchronize access to images
self.lock1 = threading.Lock()
self.lock2 = threading.Lock()
# Initialize streamers as None; they'll be created after first frame
self.streamer1 = None
self.streamer2 = None
# Start streaming threads
self.streaming_thread1 = threading.Thread(target=self.stream_video1)
self.streaming_thread2 = threading.Thread(target=self.stream_video2)
self.streaming_thread1.daemon = True
self.streaming_thread2.daemon = True
self.streaming_thread1.start()
self.streaming_thread2.start()
def initialize_rtp_stream(self, ip, port, width, height):
"""Initialize the RTP stream using OpenCV and GStreamer with the correct frame size."""
gst_str = (
f'appsrc is-live=true block=false format=GST_FORMAT_TIME ! '
f'video/x-raw,format=BGR,width={width},height={height},framerate=30/1 ! '
f'videoconvert ! video/x-raw,format=I420 ! '
f'x264enc tune=zerolatency bitrate=500 speed-preset=superfast key-int-max=30 insert-vui=true ! '
f'rtph264pay config-interval=1 pt=96 ! '
f'udpsink host={ip} port={port} sync=false async=false'
)
print(gst_str)
self.get_logger().info(f"Initializing RTP stream to {ip}:{port} with size {width}x{height}")
try:
# Initialize VideoWriter with the correct GStreamer pipeline
streamer = cv2.VideoWriter(
gst_str,
cv2.CAP_GSTREAMER,
0, # FourCC code (can be 0 when using GStreamer)
30.0, # Frames per second
(width, height),
True # Is color
)
if not streamer.isOpened():
self.get_logger().error(f"Failed to open RTP stream for {ip}:{port}")
return None
return streamer
except Exception as e:
self.get_logger().error(f"Failed to initialize RTP stream for {ip}:{port}: {e}")
return None
def listener_callback1(self, msg):
try:
self.get_logger().info("Received image from drone_00")
with self.lock1:
self.latest_image1 = self.process_image(msg)
if self.latest_image1 is not None:
self.get_logger().info("Processed image from drone_00")
if self.streamer1 is None:
# Initialize the streamer with actual frame size
height, width = self.latest_image1.shape[:2]
self.streamer1 = self.initialize_rtp_stream('127.0.0.1', 5004, width, height)
else:
self.get_logger().warning("No valid image data from drone_00")
except Exception as e:
self.get_logger().error(f"Exception in listener_callback1: {e}")
def listener_callback2(self, msg):
try:
self.get_logger().info("Received image from drone_11")
with self.lock2:
self.latest_image2 = self.process_image(msg)
if self.latest_image2 is not None:
self.get_logger().info("Processed image from drone_11")
if self.streamer2 is None:
# Initialize the streamer with actual frame size
height, width = self.latest_image2.shape[:2]
self.streamer2 = self.initialize_rtp_stream('127.0.0.1', 5006, width, height)
else:
self.get_logger().warning("No valid image data from drone_11")
except Exception as e:
self.get_logger().error(f"Exception in listener_callback2: {e}")
def process_image(self, msg):
"""Process the incoming ROS image message and convert it to BGR format for RTP streaming."""
try:
# Handle image data based on its encoding
self.get_logger().info(f"Processing image with encoding {msg.encoding}")
if msg.encoding == "rgb8":
image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, 3))
image_array = cv2.cvtColor(image_array, cv2.COLOR_RGB2BGR)
elif msg.encoding == "bgr8":
image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, 3))
elif msg.encoding == "mono8":
image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width))
image_array = cv2.cvtColor(image_array, cv2.COLOR_GRAY2BGR)
else:
raise ValueError(f"Unsupported encoding: {msg.encoding}")
self.get_logger().info(f"Processed image shape: {image_array.shape}, dtype: {image_array.dtype}")
return image_array
except Exception as e:
self.get_logger().error(f"Failed to process image: {e}")
return None
def stream_video1(self):
self.get_logger().info("Starting streaming thread for drone_00")
while rclpy.ok():
with self.lock1:
frame = self.latest_image1.copy() if self.latest_image1 is not None else None
if frame is not None:
if self.streamer1 is not None and self.streamer1.isOpened():
try:
self.streamer1.write(frame)
self.get_logger().info("Wrote frame to streamer1")
except Exception as e:
self.get_logger().error(f"Exception in streamer1.write(): {e}")
self.streamer1.release()
self.streamer1 = None
elif self.streamer1 is None:
self.get_logger().info("Streamer1 is not initialized yet")
else:
self.get_logger().error("Streamer for drone_00 is not open")
else:
self.get_logger().warning("No frame to stream for drone_00")
time.sleep(0.01)
def stream_video2(self):
self.get_logger().info("Starting streaming thread for drone_11")
while rclpy.ok():
with self.lock2:
frame = self.latest_image2.copy() if self.latest_image2 is not None else None
if frame is not None:
if self.streamer2 is not None and self.streamer2.isOpened():
try:
self.streamer2.write(frame)
self.get_logger().info("Wrote frame to streamer2")
except Exception as e:
self.get_logger().error(f"Exception in streamer2.write(): {e}")
self.streamer2.release()
self.streamer2 = None
elif self.streamer2 is None:
self.get_logger().info("Streamer2 is not initialized yet")
else:
self.get_logger().error("Streamer for drone_11 is not open")
else:
self.get_logger().warning("No frame to stream for drone_11")
time.sleep(0.01)
def cleanup(self):
"""Clean up resources before shutting down."""
self.get_logger().info("Cleaning up RTP streams and shutting down")
if self.streamer1 and self.streamer1.isOpened():
self.streamer1.release()
if self.streamer2 and self.streamer2.isOpened():
self.streamer2.release()
def main(args=None):
rclpy.init(args=args)
ros_node = RosImageServer()
try:
rclpy.spin(ros_node)
except KeyboardInterrupt:
pass
finally:
ros_node.cleanup()
ros_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
# gst-launch-1.0 udpsrc port=5004 caps="application/x-rtp, media=video, encoding-name=H264, payload=96" ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink
This works. Now I want to stream to mediasoup; a test stream can be created with:
gst-launch-1.0 mediasoupbin_py name=ms server-url="https://localhost:4443/rooms/nwxqosdt" videotestsrc is-live=true ! "video/x-raw,width=1280,height=720,framerate=25/1" ! ms. audiotestsrc is-live=true wave=ticks ! "audio/x-raw" ! ms.
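One workaround I'm considering (untested, so I'm not sure whether mediasoupbin_py accepts decoded video without the explicit caps used in the test command above) is to keep the RTP publisher from my first script and feed it into mediasoupbin_py from a separate gst-launch process, decoding the RTP back to raw video first:
gst-launch-1.0 mediasoupbin_py name=ms server-url="https://localhost:4443/rooms/nwxqosdt" udpsrc port=5004 caps="application/x-rtp, media=video, encoding-name=H264, payload=96" ! rtph264depay ! avdec_h264 ! videoconvert ! ms.
That would keep rclpy and the mediasoup plugin in separate processes, but it decodes and re-encodes the video, so I would rather push frames into the pipeline directly.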
So I thought all I needed to do was create a GStreamer element like videotestsrc and use the same command. However, I have not been able to get this to work:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
import cv2
import gi
import threading
import sys
import traceback
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
Gst.init(None)
class MediasoupStreamer(Node):
def __init__(self):
super().__init__('mediasoup_streamer')
self.get_logger().info("Initializing MediasoupStreamer")
# Subscribe to the ROS2 image topic
self.subscription = self.create_subscription(
Image,
'/drone_00/camera_0/color/image_raw',
self.listener_callback,
10
)
self.get_logger().info("Subscribed to ROS2 image topic")
# Store the latest image
self.latest_image = None
self.lock = threading.Lock()
# Initialize GStreamer pipeline
self.pipeline = None
self.appsrc = None
# Start streaming thread
self.streaming_thread = threading.Thread(target=self.stream_video)
self.streaming_thread.daemon = True
self.streaming_thread.start()
self.get_logger().info("Streaming thread started")
def initialize_gstreamer_pipeline(self):
self.get_logger().info("Initializing GStreamer pipeline")
pipeline_str = (
'appsrc name=source is-live=true format=GST_FORMAT_TIME ! '
'videoconvert ! video/x-raw,format=I420 ! '
'x264enc tune=zerolatency bitrate=2000 speed-preset=superfast ! '
'video/x-h264,profile=baseline ! '
'mediasoupbin_py name=ms server-url="https://localhost:4443/rooms/nwxqosdt" '
'ms.'
)
try:
self.pipeline = Gst.parse_launch(pipeline_str)
self.get_logger().info("GStreamer pipeline created")
except GLib.Error as e:
self.get_logger().error(f"Error creating pipeline: {e}")
return False
self.appsrc = self.pipeline.get_by_name('source')
if not self.appsrc:
self.get_logger().error("Failed to get appsrc element")
return False
# Set appsrc properties
self.appsrc.set_property('format', Gst.Format.TIME)
self.appsrc.set_property('do-timestamp', True)
# Start playing
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
self.get_logger().error("Failed to set pipeline to PLAYING")
return False
self.get_logger().info("GStreamer pipeline initialized and set to PLAYING")
return True
def listener_callback(self, msg):
self.get_logger().debug("Received image message")
with self.lock:
self.latest_image = self.process_image(msg)
def process_image(self, msg):
try:
if msg.encoding == "rgb8":
image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, 3))
image_array = cv2.cvtColor(image_array, cv2.COLOR_RGB2BGR)
elif msg.encoding == "bgr8":
image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, 3))
elif msg.encoding == "mono8":
image_array = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width))
image_array = cv2.cvtColor(image_array, cv2.COLOR_GRAY2BGR)
else:
raise ValueError(f"Unsupported encoding: {msg.encoding}")
self.get_logger().debug(f"Processed image: shape={image_array.shape}, dtype={image_array.dtype}")
return image_array
except Exception as e:
self.get_logger().error(f"Failed to process image: {e}")
return None
def stream_video(self):
self.get_logger().info("Starting streaming thread")
if not self.initialize_gstreamer_pipeline():
self.get_logger().error("Failed to initialize GStreamer pipeline")
return
while rclpy.ok():
try:
with self.lock:
frame = self.latest_image.copy() if self.latest_image is not None else None
if frame is not None:
self.push_frame(frame)
else:
self.get_logger().warn("No frame available to push")
rclpy.spin_once(self, timeout_sec=0.01)
except Exception as e:
self.get_logger().error(f"Error in stream_video loop: {e}")
traceback.print_exc()
def push_frame(self, frame):
try:
data = frame.tobytes()
buf = Gst.Buffer.new_allocate(None, len(data), None)
buf.fill(0, data)
# Set buffer metadata
frame_duration = Gst.util_uint64_scale_int(1, Gst.SECOND, 30) # 30 FPS
timestamp = Gst.util_get_timestamp()
buf.pts = timestamp
buf.dts = timestamp
buf.duration = frame_duration
# Push buffer to appsrc
ret = self.appsrc.emit('push-buffer', buf)
if ret != Gst.FlowReturn.OK:
self.get_logger().error(f"Error pushing buffer: {ret}")
else:
self.get_logger().debug("Pushed frame to appsrc")
except Exception as e:
self.get_logger().error(f"Error in push_frame: {e}")
traceback.print_exc()
def cleanup(self):
self.get_logger().info("Cleaning up GStreamer pipeline and shutting down")
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
def main(args=None):
rclpy.init(args=args)
mediasoup_streamer = MediasoupStreamer()
try:
rclpy.spin(mediasoup_streamer)
except KeyboardInterrupt:
pass
except Exception as e:
print(f"Unexpected error: {e}")
traceback.print_exc()
finally:
mediasoup_streamer.cleanup()
mediasoup_streamer.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
But this gives a Segmentation fault (core dumped) in super().__init__().
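To narrow this down, my next step is a minimal script along these lines (just Gst.init() plus an rclpy node, no mediasoupbin_py), to check whether the crash comes from mixing GStreamer and ROS2 at all or only from the plugin. This is a sketch; I haven't confirmed it reproduces the issue:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

import rclpy
from rclpy.node import Node

# Same module-level GStreamer init as in my streamer script
Gst.init(None)


class MinimalNode(Node):
    def __init__(self):
        # In my streamer, this super().__init__() call is where the segfault happens
        super().__init__('minimal_gst_ros_node')
        self.get_logger().info('Node created after Gst.init()')


def main():
    rclpy.init()
    node = MinimalNode()
    # Run a trivial pipeline to check that GStreamer itself still works in this process
    pipeline = Gst.parse_launch('videotestsrc num-buffers=30 ! fakesink')
    pipeline.set_state(Gst.State.PLAYING)
    bus = pipeline.get_bus()
    bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE,
                           Gst.MessageType.EOS | Gst.MessageType.ERROR)
    pipeline.set_state(Gst.State.NULL)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
If that runs cleanly, I assume the problem is in the mediasoupbin_py element itself or in how I load it.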
Does anyone have a suggestion for how to tackle this?
In summary:
- I have a working ROS2 topic and am able to serve it over a local RTP stream
- mediasoup offers a GStreamer utility (mediasoupbin), but it doesn't seem compatible with ROS2
- I have tried modifying my scripts accordingly, but it currently does not work (segmentation fault).
Please let me know if you have any suggestions.