Full example

This demo show how to run ODS with multiple camera heads while monitoring the diagnostic. It also uses a minimal viewer to display the occupancy grid.

By default, the following JSON configuration is used:

{
    "ports": {
        "port2": {
            "processing": {
                "extrinsicHeadToUser": {
                    "rotX": -1.57,
                    "rotY": 1.57,
                    "rotZ": 0.0,
                    "transX": 0.0,
                    "transY": 0.0,
                    "transZ": 0.6
                }
            }
        },
        "port3": {
            "processing": {
                "extrinsicHeadToUser": {
                    "rotX": -1.57,
                    "rotY": -1.57,
                    "rotZ": 0.0,
                    "transX": -1,
                    "transY": 0.0,
                    "transZ": 0.6
                }
            }
        }
    }
}
#############################################
# Copyright 2023-present ifm electronic, gmbh
# SPDX-License-Identifier: Apache-2.0
# %%############################################
# Showcases a typical sequence of an ODS application running
# from the initial configuration to the "while true" streaming of data
######################################################################

# Imports
import logging
import numpy as np
from ifm3dpy.device import O3R

from ovp8xxexamples.core.diagnostic import O3RDiagnostic
from ovp8xxexamples.core.bootup_monitor import BootUpMonitor
from ovp8xxexamples.ods.ods_config import validate_json, load_config_from_file
from ovp8xxexamples.ods.ods_stream import ODSStream
from ovp8xxexamples.ods.ods_visualization import OCVWindow, ODSViz


def main(ip, log_to_file, ods_cfg_file, calib_cfg_file):
    # %% Initialization of basic objects
    o3r = O3R(ip)
    logger = logging.getLogger(__name__)
    logging.basicConfig()
    logger.setLevel(logging.INFO)

    # %%######################################
    # Make sure boot up sequence is completed
    #########################################
    bootup_monitor = BootUpMonitor(o3r)
    bootup_monitor.monitor_VPU_bootup()

    # %%######################################
    # Check the diagnostic for any critical errors
    #########################################
    diag = O3RDiagnostic(o3r=o3r, log_to_file=log_to_file)
    for err in diag.get_diagnostic_filtered({"state": "active"})["events"]:
        logger.error(f"Active error: {err['id']}, {err['name']}")
    logger.info("Review any active errors before continuing.")

    # %%######################################
    # Start the async diag
    #########################################
    diag.start_async_diag()

    # %%######################################
    # Configure the ODS application
    #########################################
    o3r.reset("/applications")
    schema = o3r.get_schema()

    # Set the camera extrinsics so that ODS knows where everything is
    config_snippet_extrinsics = load_config_from_file(calib_cfg_file)
    validate_json(schema, config_snippet_extrinsics)
    o3r.set(config_snippet_extrinsics)

    # Initialize the app
    config_snippet_new_ods_app = load_config_from_file(ods_cfg_file)
    validate_json(schema, config_snippet_new_ods_app)
    o3r.set(config_snippet_new_ods_app)

    input("Press enter to run the ODS application\n")
    # Set the app state to RUN
    o3r.set({"applications": {"instances": {"app0": {"state": "RUN"}}}})

    # %%######################################
    # Start streaming data
    #########################################
    ods_stream = ODSStream(o3r=o3r, app_name="app0", buffer_length=1, timeout=200)
    ods_stream.start_ods_stream()

    # %%######################################
    # Display data (no interactivity)
    #########################################

    window = OCVWindow(
        "ODS output - Occupancy grid, zones and diagnostic. Press 'q' to exit."
    )
    window.open()
    visualizer = ODSViz(o3r)
    try:
        while window.window_created:
            # Collect ODS output
            zones = ods_stream.get_zones().zone_occupied
            raw_occupancy_grid = ods_stream.get_occupancy_grid().image

            # Generate a pretty visual
            ods_visualization = visualizer.render_visual(raw_occupancy_grid, zones)
            window.update_image(ods_visualization)

    except Exception as e:
        logger.info("Viewing interrupted, turning off viewer.")
        window.destroy()
        raise e

    # %%######################################
    # Display data and toggle active cameras using number keys
    #########################################
    app0 = o3r.get(["/applications/instances/app0"])["applications"]["instances"][
        "app0"
    ]
    available_3d_port_ns = [
        int(port[-1]) for port in app0["ports"] if int(port[-1]) < 6
    ]
    max_active_cameras = app0["configuration"]["maxNumSimultaneousCameras"]
    active_cameras = [
        port[-1]
        for port in o3r.get(["/applications/instances/app0/configuration/activePorts"])[
            "applications"
        ]["instances"]["app0"]["configuration"]["activePorts"]
    ]

    window = OCVWindow("ODS output - Occupancy grid, zones and diagnostic")
    window.open()
    visualizer = ODSViz(
        o3r,
        "Hints:\nToggle cameras by typing\ntheir corresponding port number.\nPress 'q' to quit.",
    )
    try:
        while window.window_created:
            # Collect ODS output
            if active_cameras:
                zones = ods_stream.get_zones().zone_occupied
                raw_occupancy_grid = ods_stream.get_occupancy_grid().image
            else:
                zones = [1, 1, 1]
                raw_occupancy_grid = np.ones((200, 200), np.uint8) * 51

            # Generate a pretty visual
            ods_visualization = visualizer.render_visual(raw_occupancy_grid, zones)
            window.update_image(ods_visualization)

            # toggle cameras based on keyboard input
            if window.keypress > 0 and chr(window.keypress) in [
                str(port) for port in available_3d_port_ns
            ]:
                active_cameras = [
                    port[-1]
                    for port in o3r.get(
                        ["/applications/instances/app0/configuration/activePorts"]
                    )["applications"]["instances"]["app0"]["configuration"][
                        "activePorts"
                    ]
                ]
                if chr(window.keypress) in active_cameras:
                    active_cameras.remove(chr(window.keypress))
                else:
                    if len(active_cameras) < max_active_cameras:
                        active_cameras.append(chr(window.keypress))
                    else:
                        continue
                logger.info(
                    f"Updating ODS application to change state of port {chr(window.keypress)}"
                )
                o3r.set(
                    {
                        "applications": {
                            "instances": {
                                "app0": {
                                    "configuration": {
                                        "activePorts": [
                                            "port" + port_n for port_n in active_cameras
                                        ]
                                    }
                                }
                            }
                        }
                    }
                )
                visualizer.get_active_ports()

    except Exception as e:
        logger.info("Viewing interrupted, turning off viewer.")
        window.destroy()
        raise e


# %%
if __name__ == "__main__":
    try:
        # If the example python package was build, import the configuration
        from ovp8xxexamples import config

        IP = config.IP
        LOG_TO_FILE = config.LOG_TO_FILE
        CALIB_CFG_FILE = config.CALIB_CFG_FILE
        ODS_CFG_FILE = config.ODS_CFG_FILE
    except ImportError:
        # Otherwise, use default values
        print(
            "Unable to import the configuration.\nPlease run 'pip install -e .' from the python root directory"
        )
        IP = "192.168.0.69"
        LOG_TO_FILE = False
        CALIB_CFG_FILE = "configs/extrinsic_two_heads.json"
        ODS_CFG_FILE = "configs/ods_two_heads_config.json"
        print(f"Choosing alternative parameters: IP = {IP}, LOG_TO_FILE = {LOG_TO_FILE}", f"CALIB_CFG_FILE = {CALIB_CFG_FILE}, ODS_CFG_FILE = {ODS_CFG_FILE}")

    main(
        ip=IP,
        log_to_file=LOG_TO_FILE,
        calib_cfg_file=CALIB_CFG_FILE,
        ods_cfg_file=ODS_CFG_FILE,
    )