camera_source.py

Last modified: 2023-07-15

View on GitHub

This file defines the camera source class used in the RoboX system.

ExplanationCode

Set up logging

logger = logging.getLogger(__name__)

Define the resolution for RealSense depth capture

RS_DEPTH_CAPTURE_RES = (640, 480)

Unified image acquisition class for different types of cameras

class CameraSource:
    def __init__(self, default_config: Dict, target_color: str, cv_device_index: int = 0,
                 recording_source: Optional[Path] = None, recording_dest: Optional[Path] = None):
        """
        Initialize the CameraSource object.

        Args:
            default_config (Dict): Default camera configuration.
            target_color (str): Color of the target to detect.
            cv_device_index (int): Index of the OpenCV camera device.
            recording_source (Optional[Path]): Path to the recording source.
            recording_dest (Optional[Path]): Path to save the recording.
        """
        assert recording_source is None or recording_dest is None
        self.recording_source = recording_source
        self._rs_pipeline = None
        self._rs_frame_aligner = None
        self._cv_color_cap = None
        self._cv_depth_cap = None
        self.hik_frame_cap = None
        self.color_frame_writer = None
        self.depth_frame_writer = None
        self.active_cam_config = None
        self.hik_frame_init = None

        if recording_source is None:
            self.active_cam_config = default_config

            try:

Try to initialize RealSense camera

                import pyrealsense2 as rs

Configure depth and color streams

                pipeline = rs.pipeline()
                config = rs.config()

Get device product line for setting a supporting resolution

                pipeline_wrapper = rs.pipeline_wrapper(pipeline)
                pipeline_profile = config.resolve(pipeline_wrapper)
                device = pipeline_profile.get_device()
                device_name = str(device.get_info(rs.camera_info.name))

Set active camera configuration based on device name

                if device_name in camera_params:
                    self.active_cam_config = camera_params[device_name]
                else:
                    logger.warning(
                        f'Unknown device name: "{device_name}". Falling back to default configuration.')

Enable color stream

                config.enable_stream(rs.stream.color, self.active_cam_config['capture_res'][0],
                                     self.active_cam_config['capture_res'][1], rs.format.bgr8,
                                     self.active_cam_config['frame_rate'])

Enable depth stream if using stereo depth

                if self.active_cam_config['depth_source'] == DepthSource.STEREO:
                    config.enable_stream(rs.stream.depth, RS_DEPTH_CAPTURE_RES[0], RS_DEPTH_CAPTURE_RES[1],
                                         rs.format.z16,
                                         self.active_cam_config['frame_rate'])
                    frame_aligner = rs.align(rs.stream.color)
                else:
                    frame_aligner = None

Start streaming

                pipeline.start(config)

Get the sensor and set exposure

                sensor = pipeline.get_active_profile().get_device().query_sensors()[1]
                sensor.set_option(
                    rs.option.exposure, self.active_cam_config['exposure'][target_color])

                self._rs_pipeline = pipeline
                self._rs_frame_aligner = frame_aligner
            except ImportError:
                logger.warning(
                    'Intel RealSense backend is not available; pyrealsense2 could not be imported')
            except RuntimeError as ex:
                if len(ex.args) >= 1 and 'No device connected' in ex.args[0]:
                    logger.warning('No RealSense device was found')
                else:
                    raise

If RealSense is not available, try OpenCV camera

            if self._rs_pipeline is None:
                cap = cv2.VideoCapture()

                if cap.isOpened():
                    cap.open(cv_device_index)
                    cap.set(cv2.CAP_PROP_FOURCC,
                            cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
                    cap.set(cv2.CAP_PROP_EXPOSURE,
                            self.active_cam_config['exposure'][target_color])
                    cap.set(cv2.CAP_PROP_FRAME_WIDTH,
                            self.active_cam_config['capture_res'][0])
                    cap.set(cv2.CAP_PROP_FRAME_HEIGHT,
                            self.active_cam_config['capture_res'][1])
                    cap.set(cv2.CAP_PROP_FPS, self.active_cam_config['frame_rate'])
                    self._cv_color_cap = cap

If built-in camera not available, open HIK camera

                else:
                    self.hik_frame_init = hik_init()

        else:

Load camera configuration from file if using recording source

            cam_config_path = recording_source.with_name(
                recording_source.name + '.config.json')
            with open(cam_config_path, 'r', encoding='utf8') as cam_config_file:
                self.active_cam_config = json.load(cam_config_file)

            self._cv_color_cap = cv2.VideoCapture(str(recording_source))

Initialize video writers if recording destination is provided

        self.color_frame_writer = self.depth_frame_writer = None
        if recording_dest is not None:
            cam_config_path = recording_dest.with_name(
                recording_dest.name + '.config.json')
            with open(cam_config_path, 'w', encoding='utf8') as cam_config_file:
                json.dump(self.active_cam_config, cam_config_file)

Set up video writers for color and depth frames

            codec = cv2.VideoWriter.fourcc('m', 'p', '4', 'v')
            color_frame_path = recording_dest.with_name(
                recording_dest.name + '.color.mp4')
            self.color_frame_writer = cv2.VideoWriter(str(color_frame_path), codec,
                                                      self.active_cam_config['frame_rate'],
                                                      self.active_cam_config['capture_res'])
            if self._rs_pipeline is not None:
                depth_frame_path = recording_dest.with_name(
                    recording_dest.name + '.depth.mp4')
                self.depth_frame_writer = cv2.VideoWriter(str(depth_frame_path), codec,
                                                          self.active_cam_config['frame_rate'],
                                                          self.active_cam_config['capture_res'])

    def get_frames(self) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """
        Capture and return color and depth frames from the active camera source.

        Returns:
            Tuple[Optional[np.ndarray], Optional[np.ndarray]]: Color and depth frames.
        """
        if self.recording_source is not None:

Read from recording source

            ret, color_image = self._cv_color_cap.read()
            if not ret:
                color_image = None

            if self._cv_depth_cap is not None:
                ret, depth_image = self._cv_depth_cap.read()
                if not ret:
                    depth_image = None
            else:
                depth_image = None

        elif self._rs_pipeline is not None:

Capture frames from RealSense camera

            frames = self._rs_pipeline.wait_for_frames()

            if self._rs_frame_aligner is not None:
                frames = self._rs_frame_aligner.process(frames)

            color_frame = frames.get_color_frame()
            if color_frame:
                color_image = np.asanyarray(color_frame.get_data())
            else:
                color_image = None

            depth_frame = frames.get_depth_frame()
            if depth_frame:
                depth_image = np.asanyarray(depth_frame.get_data())
            else:
                depth_image = None

        elif self.hik_frame_init is not None:

Capture frame from HIK camera

            color_image = read_hik_frame(self.hik_frame_init)
            depth_image = None

        elif self._cv_color_cap is not None:

Capture frame from OpenCV camera

            ret, color_image = self._cv_color_cap.read()
            if not ret:
                color_image = None

            if self._cv_depth_cap is None:
                depth_image = None
            else:
                ret, depth_image = self._cv_depth_cap.read()
                if ret:
                    B, G, R = cv2.split(depth_image)
                    depth_image = (B.astype(np.uint16) << 8) + \
                                  (G.astype(np.uint16) << 12) + R.astype(np.uint16)
                else:
                    depth_image = None

        else:
            raise RuntimeError('No image source available')

Write frames to video files if writers are initialized

        if self.color_frame_writer is not None and color_image is not None:
            self.color_frame_writer.write(color_image)

        if self.depth_frame_writer is not None and depth_image is not None:
            storage_format_image = cv2.merge(
                [(depth_image >> 8).astype(np.uint8), ((depth_image >> 12) % 16).astype(np.uint8),
                 (depth_image % 16).astype(np.uint8)])
            self.depth_frame_writer.write(storage_format_image)

        return color_image, depth_image

    def __del__(self):
        """
        Clean up resources when the object is deleted.
        """
        if self._rs_pipeline is not None:
            self._rs_pipeline.stop()

        if self.color_frame_writer is not None:
            self.color_frame_writer.release()

        if self.depth_frame_writer is not None:
            self.depth_frame_writer.release()

Uncomment the following lines if HIK camera cleanup is needed

if self.hik_frame_init is not None:

hik_close(self.hik_frame_init)