Working with large ROS bag files on Hadoop and Spark

Large amount of sensor and robotic data is produced by the industry at an ever increasing peace. Be it from areas like mobility, perception, smart factory or from development tools through planing, modelling or simulation.

New effervescent robotic topics of research like self driving cars put pressure to develop new tools and techniques to deal with larger and more complex data sets. Some projects and industry players publicly announced the adoption of ROS as part of their process.

On the other hand, Hadoop and Spark Ecosystems are seeing a tremendous adoption for processing and analysing large data in parallel. (The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets)

Why process large ROS bag files in parallel?

ROS command rosbag record subscribes to topics and writes a bag file with the contents of all messages published on those topics. For performance reasons the messages are written interlaced as they come over the wires, with different frequencies.

Associative operations can be applied in parallel. Or more precisely the parallelism requires associativity. (Although concurrency technically is not parallelism it also requires associativity.) Spark provides an unified functional API for processing locally, concurrently or on multiple machines.

Now you do not need to convert ROS bag files to work with them in Spark

The assumption was that the ROS bag files have to be converted into a more suitable format before they can be processed in parallel with tools like Hadoop or Spark. It turns out that the format is good enough for processing with a distributed file system like HDFS but it happened that nobody has written an Hadoop InputFormat for it.

So we did it. We took the time and wrote a Hadoop RosbagInputFormat :grinning: published under Apache 2.0 License.

http://github.com/valtech/ros_hadoop

RosbagInputFormat is an open source splittable Hadoop InputFormat for the rosbag file format.

16

We also prepared a Dockerfile and step-by-step tutorial that you could use to try the concepts presented here:

http://github.com/valtech/ros_hadoop

We hope that the RosbagInputFormat would be useful for you. It would be great if you give us some feedback.

Thanks!
Adrian, Jan

4 Likes

Jan, this seems great. Is it meant to run in the data center or as an on-premise solution? In other words are there any special requirements on the computer HW?

Did you do any profiling on how much faster can you process data using this spliter + spark vs if you’d just do it sequentially by playing a bag from an ext4 formatted disk?

D.

Both are possible, data center and on-premise. There are no hardware requirements but we recommend a 3 nodes setup to see the benefits of parallelism. Start with 64-128 GB memory, 4 disks and quad-core per node.

Spark performance really scale out with multiple machines. If there are 3 splits is 3 times faster with 3 workers. etc.

HI JAN
can you open rosbaginputformat_2.11-0.9.3.jar source on github ros_hadoop project ? i don’t know how it works. 3Q

The scala source code is in the https://github.com/valtech/ros_hadoop/tree/master/src/main/scala

The jar is packaged by the build script https://github.com/valtech/ros_hadoop/blob/master/build.sbt

It works similar to many other Hadoop InputFormats.

Hi Jan,

Will there be any plans to write up a tutorial for the scala api? I am able to pull the topic names but it seems a little cumbersome compaired to the python api.

Thanks

Hi Jan,

I was trying to implement “flux-project” from github “https://github.com/flux-project/flux”. But I am facing a error as “ImagePullBackOff”.

I wanted to know where the image “pod/flux-ros-hadoop-5f8cd4bc67-h5thl” is located.

Hi Joe,

Basically an Scala solution without ROS means effort of reimplementing parts of ROS in Scala. Spark might be Scala/Python but ROS is C++/Python so the glue is Python not Scala.

image

i have run the programme on spark-shell ,but i got a exception .
scala> fin = sc.newAPIHadoopFile(
| path = “hdfs://127.0.0.1:9000/user/root/HMB_4.bag”,
| inputFormatClass = “de.valtech.foss.RosbagMapInputFormat”,
| keyClass = “org.apache.hadoop.io.LongWritable”,
| valueClass = “org.apache.hadoop.io.MapWritable”,
| conf = {“RosbagInputFormat.chunkIdx”:"/srv/data/HMB_4.bag.idx.bin"})
:6: error: identifier expected but string literal found.
conf = {“RosbagInputFormat.chunkIdx”:"/srv/data/HMB_4.bag.idx.bin"})

do you know why ? Eagerly awaiting your reply…

Hi Jie,

In our tutorial we used Python code not Scala. Please use PySpark and Python.

thank you very much . i have a other problem,the site : http://flux-project.org/ does not work. :sweat_smile:

Hi Jan ,
did you know how to extract the pcd file from rosbag with hadoop&saprk ? I don’t see this example in the project.

Hi Jan,

I am using Azure HDInsight Spark cluster to extract data from rosbag files using RosbagInputFormat. I have followed the readme file. While running the code in pyspark I am getting the following error,

It is not able to read the idx file from local system.

Using Python version 2.7.12 (default, Jul  2 2016 17:42:40)
SparkSession available as 'spark'.
>>> sc.newAPIHadoopFile(
...     path =             "/user/spark/HMB_4.bag",
...     inputFormatClass = "de.valtech.foss.RosbagMapInputFormat",
...     keyClass =         "org.apache.hadoop.io.LongWritable",
...     valueClass =       "org.apache.hadoop.io.MapWritable",
...     conf = {"RosbagInputFormat.chunkIdx":"/opt/ros_hadoop/master/dist/HMB_4.bag.idx.bin"})
[Stage 0:>                                                          (0 + 1) / 1]19/04/10 14:16:35 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, wn1-avdp-h.cfzrwlyaxyyuvies4sglc0tsud.cx.internal.cloudapp.net, executor 5): java.io.FileNotFoundException: /opt/ros_hadoop/master/dist/HMB_3.bag.idx.bin (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)

Could you please help me with that?

Thanks,
Sayandeep

@raysayandeep Thanks for your question. However we ask that you please ask questions on http://answers.ros.org following our support guidelines: http://wiki.ros.org/Support

ROS Discourse is for news and general interest discussions. ROS Answers provides a forum which can be filtered by tags to make sure the relevant people can find and/or answer the question, and not overload everyone with hundreds of posts.

1 Like

import org.apache.hadoop.io.{LongWritable, MapWritable}

val path = “hdfs://hadoop-master:9000/user/root/fichier_ros.bag”
val inputFormatClass = classOf[io.autovia.foss.RosbagMapInputFormat]
val keyClass = classOf[LongWritable]
val valueClass = classOf[MapWritable]
val conf = new org.apache.hadoop.conf.Configuration()
conf.set(“RosbagInputFormat.chunkIdx”, “hdfs://hadoop-master:9000/user/root/fichier_ros.bag.idx.bin”)
val rdd = sc.newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, conf)

I have same errors in this code
java.io.FileNotFoundException: hdfs:/hadoop-master:9000/fichier_ros.bag.idx.bin (No such file or directory)

plz i need to creat RDD in spark with fille rosbag if you have same ideas

I would recommend using either answers.ros.org, contacting the original author via DM, or filing an issue on the source repository. This ROS Discourse thread has been inactive for four years.

1 Like

Hey Mohamed! We (SensorSurf) built an open-source package to help you load all your MCAP or ROS Bag data into a Timescale database so you can run SQL queries. This might help solve some of your pains. Check it out here: GitHub - SensorSurf/mcap-etl: Transform mcap (or rosbag) into databases and other file formats