# Module-level logger, following the standard per-module logger convention.
logger = logging.getLogger(__name__)

# Fixed capture resolution (width, height) for the RealSense depth stream;
# the color stream resolution comes from the active camera config instead.
RS_DEPTH_CAPTURE_RES = (640, 480)
|
class CameraSource:
    """Unified image-acquisition wrapper over several camera backends.

    Backends are tried in priority order: Intel RealSense (color + optional
    stereo depth), then an OpenCV-accessible device, then a HIK camera.
    Alternatively, frames can be replayed from a previous recording, and the
    captured streams can simultaneously be recorded to disk.
    """

    def __init__(self, default_config: Dict, target_color: str, cv_device_index: int = 0,
                 recording_source: Optional[Path] = None, recording_dest: Optional[Path] = None):
        """
        Initialize the CameraSource object.

        Args:
            default_config (Dict): Default camera configuration, used when no
                device-specific entry exists in ``camera_params``.
            target_color (str): Color of the target to detect; selects the
                exposure value from the active configuration.
            cv_device_index (int): Index of the OpenCV camera device.
            recording_source (Optional[Path]): Path of a recording to replay
                instead of opening a physical camera.
            recording_dest (Optional[Path]): Path stem under which the
                captured streams are recorded.
        """
        # Replaying a recording while recording a new one is unsupported.
        assert recording_source is None or recording_dest is None
        self.recording_source = recording_source
        self._rs_pipeline = None
        self._rs_frame_aligner = None
        self._cv_color_cap = None
        self._cv_depth_cap = None
        self.hik_frame_cap = None
        self.color_frame_writer = None
        self.depth_frame_writer = None
        self.active_cam_config = None
        self.hik_frame_init = None
        if recording_source is None:
            self.active_cam_config = default_config
            try:
                # Try to initialize a RealSense camera first.
                import pyrealsense2 as rs

                # Configure depth and color streams.
                pipeline = rs.pipeline()
                config = rs.config()

                # Resolve the device so a per-device configuration can be chosen.
                pipeline_wrapper = rs.pipeline_wrapper(pipeline)
                pipeline_profile = config.resolve(pipeline_wrapper)
                device = pipeline_profile.get_device()
                device_name = str(device.get_info(rs.camera_info.name))

                # Set the active camera configuration based on the device name.
                if device_name in camera_params:
                    self.active_cam_config = camera_params[device_name]
                else:
                    logger.warning(
                        f'Unknown device name: "{device_name}". Falling back to default configuration.')

                # Enable the color stream at the configured resolution and rate.
                config.enable_stream(rs.stream.color, self.active_cam_config['capture_res'][0],
                                     self.active_cam_config['capture_res'][1], rs.format.bgr8,
                                     self.active_cam_config['frame_rate'])

                # Enable the depth stream only for stereo depth, and align it
                # to the color stream so the two frames match pixel-for-pixel.
                if self.active_cam_config['depth_source'] == DepthSource.STEREO:
                    config.enable_stream(rs.stream.depth, RS_DEPTH_CAPTURE_RES[0], RS_DEPTH_CAPTURE_RES[1],
                                         rs.format.z16,
                                         self.active_cam_config['frame_rate'])
                    frame_aligner = rs.align(rs.stream.color)
                else:
                    frame_aligner = None

                # Start streaming.
                pipeline.start(config)

                # Fix the exposure of the color sensor (sensor index 1) for
                # the given target color.
                sensor = pipeline.get_active_profile().get_device().query_sensors()[1]
                sensor.set_option(
                    rs.option.exposure, self.active_cam_config['exposure'][target_color])
                self._rs_pipeline = pipeline
                self._rs_frame_aligner = frame_aligner
            except ImportError:
                logger.warning(
                    'Intel RealSense backend is not available; pyrealsense2 could not be imported')
            except RuntimeError as ex:
                # Swallow only the "no device" error; re-raise anything else.
                if len(ex.args) >= 1 and 'No device connected' in ex.args[0]:
                    logger.warning('No RealSense device was found')
                else:
                    raise

            # If RealSense is not available, try an OpenCV camera.
            if self._rs_pipeline is None:
                # BUGFIX: open the device before checking isOpened().  The
                # previous code called isOpened() on a capture that had never
                # been opened, so it was always False and the OpenCV branch
                # was unreachable (the HIK camera was always used instead).
                cap = cv2.VideoCapture(cv_device_index)
                if cap.isOpened():
                    cap.set(cv2.CAP_PROP_FOURCC,
                            cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
                    cap.set(cv2.CAP_PROP_EXPOSURE,
                            self.active_cam_config['exposure'][target_color])
                    cap.set(cv2.CAP_PROP_FRAME_WIDTH,
                            self.active_cam_config['capture_res'][0])
                    cap.set(cv2.CAP_PROP_FRAME_HEIGHT,
                            self.active_cam_config['capture_res'][1])
                    cap.set(cv2.CAP_PROP_FPS, self.active_cam_config['frame_rate'])
                    self._cv_color_cap = cap
                else:
                    # Neither RealSense nor a built-in camera: open HIK camera.
                    cap.release()
                    self.hik_frame_init = hik_init()
        else:
            # Replaying a recording: load the camera configuration that was
            # saved next to it.
            cam_config_path = recording_source.with_name(
                recording_source.name + '.config.json')
            with open(cam_config_path, 'r', encoding='utf8') as cam_config_file:
                self.active_cam_config = json.load(cam_config_file)
            self._cv_color_cap = cv2.VideoCapture(str(recording_source))
            # NOTE(review): a depth recording is never reopened here, so
            # replay yields color frames only — confirm this is intended.

        # Initialize video writers if a recording destination was provided.
        self.color_frame_writer = self.depth_frame_writer = None
        if recording_dest is not None:
            # Save the active configuration next to the recording so the
            # recording can be replayed later with the same parameters.
            cam_config_path = recording_dest.with_name(
                recording_dest.name + '.config.json')
            with open(cam_config_path, 'w', encoding='utf8') as cam_config_file:
                json.dump(self.active_cam_config, cam_config_file)

            # Set up video writers for the color and depth streams.
            codec = cv2.VideoWriter.fourcc('m', 'p', '4', 'v')
            color_frame_path = recording_dest.with_name(
                recording_dest.name + '.color.mp4')
            self.color_frame_writer = cv2.VideoWriter(str(color_frame_path), codec,
                                                      self.active_cam_config['frame_rate'],
                                                      self.active_cam_config['capture_res'])
            if self._rs_pipeline is not None:
                depth_frame_path = recording_dest.with_name(
                    recording_dest.name + '.depth.mp4')
                self.depth_frame_writer = cv2.VideoWriter(str(depth_frame_path), codec,
                                                          self.active_cam_config['frame_rate'],
                                                          self.active_cam_config['capture_res'])

    def get_frames(self) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
        """
        Capture and return color and depth frames from the active source.

        Returns:
            Tuple[Optional[np.ndarray], Optional[np.ndarray]]: Color frame
            (BGR) and depth frame (uint16); either may be None when the
            corresponding stream is unavailable or a read fails.

        Raises:
            RuntimeError: If no image source was successfully initialized.
        """
        if self.recording_source is not None:
            # Read the next frame(s) from the recording.
            ret, color_image = self._cv_color_cap.read()
            if not ret:
                color_image = None
            if self._cv_depth_cap is not None:
                ret, depth_image = self._cv_depth_cap.read()
                if not ret:
                    depth_image = None
            else:
                depth_image = None
        elif self._rs_pipeline is not None:
            # Capture frames from the RealSense camera, aligning depth to
            # color when stereo depth is enabled.
            frames = self._rs_pipeline.wait_for_frames()
            if self._rs_frame_aligner is not None:
                frames = self._rs_frame_aligner.process(frames)
            color_frame = frames.get_color_frame()
            if color_frame:
                color_image = np.asanyarray(color_frame.get_data())
            else:
                color_image = None
            depth_frame = frames.get_depth_frame()
            if depth_frame:
                depth_image = np.asanyarray(depth_frame.get_data())
            else:
                depth_image = None
        elif self.hik_frame_init is not None:
            # Capture a color frame from the HIK camera (no depth stream).
            color_image = read_hik_frame(self.hik_frame_init)
            depth_image = None
        elif self._cv_color_cap is not None:
            # Capture a frame from the OpenCV camera.
            ret, color_image = self._cv_color_cap.read()
            if not ret:
                color_image = None
            if self._cv_depth_cap is None:
                depth_image = None
            else:
                ret, depth_image = self._cv_depth_cap.read()
                if ret:
                    # Reassemble a 16-bit depth value from the B/G/R channels.
                    # NOTE(review): this decode ((B << 8) + (G << 12) + R)
                    # does not mirror the encode below ((depth >> 8,
                    # (depth >> 12) % 16, depth % 16)): bits 4-7 are never
                    # stored and bits 12-15 are counted twice — confirm the
                    # intended packing scheme.
                    B, G, R = cv2.split(depth_image)
                    depth_image = (B.astype(np.uint16) << 8) + \
                        (G.astype(np.uint16) << 12) + R.astype(np.uint16)
                else:
                    depth_image = None
        else:
            raise RuntimeError('No image source available')

        # Write the frames to the recording files if writers are initialized.
        if self.color_frame_writer is not None and color_image is not None:
            self.color_frame_writer.write(color_image)
        if self.depth_frame_writer is not None and depth_image is not None:
            # Pack the 16-bit depth image into three 8-bit channels so it can
            # be stored in a standard video container.
            storage_format_image = cv2.merge(
                [(depth_image >> 8).astype(np.uint8), ((depth_image >> 12) % 16).astype(np.uint8),
                 (depth_image % 16).astype(np.uint8)])
            self.depth_frame_writer.write(storage_format_image)
        return color_image, depth_image

    def __del__(self):
        """
        Clean up camera handles and video writers when the object is deleted.
        """
        if self._rs_pipeline is not None:
            self._rs_pipeline.stop()
        # BUGFIX: release the OpenCV capture devices too, not only the
        # writers, so camera handles / recording files are freed promptly.
        if self._cv_color_cap is not None:
            self._cv_color_cap.release()
        if self._cv_depth_cap is not None:
            self._cv_depth_cap.release()
        if self.color_frame_writer is not None:
            self.color_frame_writer.release()
        if self.depth_frame_writer is not None:
            self.depth_frame_writer.release()
        # Uncomment the following lines if HIK camera cleanup is needed:
        # if self.hik_frame_init is not None:
        #     hik_close(self.hik_frame_init)
|