# Copyright 2023-present ifm electronic, gmbh
# SPDX-License-Identifier: Apache-2.0
# This example shows how to unpack data saved
# in the ifm h5 format (for example with the
# ifm Vision Assistant) into .pcd files. One
# file per recorded frame is created.
# Record data with the iVA. If you do not record
# the point cloud along with the rest of the data,
# we will attempt to calculate it in this script.
# If the necessary imported functions are not
# available, an error will be raised.
# %%
from pathlib import Path
from dataclasses import dataclass
from os.path import join as path_join
import argparse
import logging
import numpy as np
import h5py
import open3d
# Optional import of utilities from another example.
# This is used in cases where the point cloud data is
# not available in the HDF5 dataset.
# If the module is not found, the point cloud cannot
# be calculated from the radial distance image.
import sys
import os
sys.path.append(os.path.dirname(os.getcwd()) + "/Registration2d3d")
from transforms import intrinsic_projection
transforms_available = True
except ModuleNotFoundError:
transforms_available = False
status_logger = logging.getLogger(__name__)
# %%
class TOFData:
This data class mirrors how data is formatted
in the ifm HDF5 format as saved in iVA recordings
and provides utilities to convert to pcd format.
dis: np.ndarray
amp: np.ndarray
width: np.uint16
height: np.uint16
frameCounter: np.uint32
distanceResolution: np.float32
amplitudeResolution: np.float32
modelID3D: np.uint32
intrinsics3: np.ndarray
inv_intrinsic3: np.ndarray
extrinsic3D: np.ndarray
cloud: np.ndarray
cloud_data: bool
def pcd_from_numpy_array(self):
"""Format the point cloud to the pcd format.
:return: formatted cloud
array = self.cloud
height = self.dis.shape[0]
width = self.dis.shape[1]
pcd = open3d.geometry.PointCloud()
xyz = np.reshape(array, (3, height * width))
xyz = np.transpose(xyz)
pcd.points = open3d.utility.Vector3dVector(xyz)
return pcd
def calc_pointcloud(self):
"""Calculate the point cloud from the radial
distance image in cases where the point cloud
was not saved in the HDF5 dataset.
:return: point cloud
# Calculate 3D unit vectors corresponding to each pixel
# of depth camera
ux, uy, uz = intrinsic_projection(
self.modelID3D, self.intrinsics3, *self.dis.shape[::-1]
# Multiply unit vectors by depth of corresponding pixel
x = (ux * self.dis).flatten()
y = (uy * self.dis).flatten()
z = (uz * self.dis).flatten()
valid = self.dis.flatten() > 0.05
status_logger.info(f"{round(sum(valid)/x.size*100)}% valid pts")
for i, pt_valid in enumerate(valid):
if not pt_valid:
x[i] = y[i] = z[i] = 0.0
# Restructure point cloud as sequence of points
pcd_o3 = np.stack((x, y, z), axis=0)
return pcd_o3
# %%
def load_o3r_tof_h5(filename: str) -> list:
"""load data: ifm h5 data container - e.g. recording from ifm Vision Assistant
:param filename (str): filename
:raises ValueError: if missing data in file
:return list: list of data classes
hf1 = h5py.File(filename, "r")
status_logger.info(f"data file loaded: {filename}")
# list all available data keys
# hf1["streams"]["o3r_tof_0"][0].dtype
def load_tof_stream_data(tof_stream_name: str, cloud_data=True) -> dict:
"""Get the point cloud data for a specific tof data stream.
:param tof_stream_name: tof stream name in recording.
:param cloud_data: availability of cloud data, defaults to True
:raises Exception: if no point cloud data is available and
it cannot be calculated dut to missing imports.
:return: data including point cloud.
data = []
_ = hf1["streams"][tof_stream_name][0]["cloud"]
cloud_data = True
except ValueError as e:
cloud_data = False
status_logger.error(f"No 3D data available in {tof_stream_name}")
# raise e
for d in hf1["streams"][tof_stream_name]:
extrinsic3D = d["extrinsicOpticToUserTrans"]
extrinsic3D = np.append(extrinsic3D, d["extrinsicOpticToUserRot"])
tof_data = TOFData(
if cloud_data:
tof_data.cloud = d["cloud"]
if transforms_available:
tof_data.cloud = tof_data.calc_pointcloud()
tof_data.cloud = np.zeros((tof_data.width, tof_data.height, 3))
cloud_data = False
raise Exception(
"Cannot calculate the point cloud due to missing transforms package."
return {"tof_stream": tof_stream_name, "data": data}
tof_data = []
tof_stream_names = [
streams for streams in list(hf1["streams"]) if "o3r_tof" in streams
for tof_stream_name in tof_stream_names:
return tof_data
def visualize_pcd(path_to_pcd: str) -> None:
pcd = open3d.io.read_point_cloud(path_to_pcd, format="pcd")
if len(pcd.points) == 0:
raise Exception("No point cloud data, possibly wrong path provided")
def safe_pcd_file(pcd: open3d.cpu.pybind.geometry.PointCloud, filename: str) -> bool:
open3d.io.write_point_cloud(filename, pcd)
return True
except Exception as e:
return False
# %%
def main(filename):
data = load_o3r_tof_h5(filename=filename)
parent_file_name = filename.split(".")[0]
directory = Path(filename).parent.resolve()
status_logger.info(f"parent filename: {parent_file_name}")
status_logger.info(f"saving directory: {directory}")
for tof_stream in data:
tof_stream_data = tof_stream["data"]
tof_stream_name = tof_stream["tof_stream"]
for d in tof_stream_data:
pcd = d.pcd_from_numpy_array()
filename = "".join(
directory_filename = path_join(directory, filename)
f"data converted to PCD: start saving the data to file {filename}"
safe_pcd_file(filename=directory_filename, pcd=pcd)
status_logger.info("finished converting")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="converts ifm HDf5 data files to PCD files",
parser.add_argument("--filename", default="test_rec.h5")
args = parser.parse_args()
# %%