Using USB API (MTP) with libghoto2 and Python bindings on MacOS, Raspberry Pi, Linux, ROS

Here is some code using the gphoto2 python module…I have a UI that uses this to shoot HDRI exposures…an example in _unittest(). Not totally polished but maybe useful to others. Works with Z1:

"""
USB api for added performance over http

Theta api reference:
https://developers.theta360.com/en/docs/v2/usb_reference/

Unable to get mtp or ptp to connect to the camera; After some pain was able to get gphoto2 working
"""

import os
import time

import gphoto2 as gp

# Properties
SHUTTER_SPEED = 'd00f'
EXPOSURE_INDEX = '500f'
F_NUMBER = '5007'
AUDIO_VOLUME = '502c'
COLOR_TEMPERATURE = 'd813'
EXPOSURE_PROGRAM_MODE = '500e'

# milliseconds
TIMEOUT = 10
TIMEOUT_CAPTURE_DNG = 10000


def wait_for_event(camera, timeout=TIMEOUT, event_type=gp.GP_EVENT_TIMEOUT):
    """
    Wait for event_type to to be triggered.
    :param camera:
    :param timeout:
    :param event_type:
    :return: event_data
    """
    while True:
        _event_type, event_data = camera.wait_for_event(timeout)
        if _event_type == gp.GP_EVENT_TIMEOUT:
            return
        if _event_type == event_type:
            return event_data


def set_config_by_index(config, index):
    """Set config using choice index"""
    value = config.get_choice(index)
    config.set_value(value)

    return config


# def list_files(camera, path='/'):
#     result = []
#     # get files
#     for name, value in camera.folder_list_files(path):
#         result.append(os.path.join(path, name))
#     # read folders
#     folders = []
#     for name, value in camera.folder_list_folders(path):
#         folders.append(name)
#     # recurse over subfolders
#     for name in folders:
#         result.extend(list_files(camera, os.path.join(path, name)))
#     return result
#
#
# def get_file_info(camera, path):
#     folder, name = os.path.split(path)
#     return camera.file_get_info(folder, name)


class CameraUsb(object):
    """
    Define API for multiple exposure
    """
    def __init__(self, verbose=False):
        self.verbose = verbose

        self.camera = gp.Camera()

        self.camera_config = None
        self.status_config = None
        self.other_config = None
        self.shutter_speed_config = None
        self.shutter_speed_options = []

    def init(self):
        """
        Set manual exposure and other defaults
        :return: config
        """
        try:
            self.camera_config = self.camera.get_config()
        except gp.GPhoto2Error:
            raise RuntimeError("Unable to connect to Camera")

        self.other_config = self.camera_config.get_child_by_name('other')

        # Manual/f-stop/iso
        exposure_program_mode = self.other_config.get_child_by_name(EXPOSURE_PROGRAM_MODE)
        if not exposure_program_mode.get_value() == '1':
            print('Setting camera to Manual exposure program')
            exposure_program_mode.set_value('1')
            self.camera.set_config(self.camera_config)
            wait_for_event(self.camera)

            # When switching exposure program, we need to refresh the configs
            self.camera_config = self.camera.get_config()
            self.other_config = self.camera_config.get_child_by_name('other')

        self.status_config = self.camera_config.get_child_by_name('status')

        self.shutter_speed_config = self.other_config.get_child_by_name(SHUTTER_SPEED)
        self.shutter_speed_options = [str(x) for x in self.shutter_speed_config.get_choices()]
        if len(self.shutter_speed_options) != 61:
            raise RuntimeError('Unble to determine shutter speed options; restart app')

        fstop = self.other_config.get_child_by_name(F_NUMBER)
        fstop.set_value('560')

        iso = self.other_config.get_child_by_name(EXPOSURE_INDEX)
        iso.set_value('80')

        self.camera.set_config(self.camera_config)
        wait_for_event(self.camera)

    def get_info(self):
        """
        :return: Dict containing serialnumber, batterylevel, remainingpictures, etc
        """
        if not self.camera_config:
            self.init()

        battery_level = self.status_config.get_child_by_name('batterylevel').get_value()
        # Convert '67%' to int
        battery_level = int(''.join([x for x in battery_level if x.isdigit()]))

        info = {'serialnumber': self.status_config.get_child_by_name('serialnumber').get_value(),
                'cameramodel': self.status_config.get_child_by_name('cameramodel').get_value(),
                'deviceversion': self.status_config.get_child_by_name('deviceversion').get_value(),
                'batterylevel': battery_level,
                'remainingpictures': int(self.camera.get_storageinfo()[0].freeimages)}
        return info

    def take_picture(self, shutter_speed_index=None, color_temperature=None, volume=None):
        """
        Set camera options and take picture
        Blocking
        :param shutter_speed_index: int in range 0-60 (0 fastest shutter)
        :param color_temperature: in in range 2500-10000 by 100 increment
        :param volume: int in range 0-100
        :return: (jpg_path, dng_path)
        """
        t1 = time.time()
        if not self.camera_config:
            self.init()

        if shutter_speed_index is not None:
            self.shutter_speed_config.set_value(self.shutter_speed_options[shutter_speed_index])

        if color_temperature is not None:
            self.other_config.get_child_by_name(COLOR_TEMPERATURE).set_value(color_temperature)

        if volume is not None:
            self.other_config.get_child_by_name(AUDIO_VOLUME).set_value(str(volume))

        self.camera.set_config(self.camera_config)
        # We need this even though no event is triggered
        wait_for_event(self.camera)

        gp_jpg_path = self.camera.capture(gp.GP_CAPTURE_IMAGE)

        gp_dng_path = wait_for_event(self.camera, timeout=TIMEOUT_CAPTURE_DNG, event_type=gp.GP_EVENT_FILE_ADDED)
        if not gp_dng_path:
            raise RuntimeError('Unable to copy DNG')

        jpg_path = os.path.join(gp_jpg_path.folder, gp_jpg_path.name)
        dng_path = os.path.join(gp_dng_path.folder, gp_dng_path.name)

        print('Capture took %0.03f sec' % (time.time() - t1, ))
        return jpg_path, dng_path

    def download_file(self, src_path, dst_path, delete=True):
        """Copy the file from the camera src_path to local dst_path"""
        t1 = time.time()

        src_folder, src_name = os.path.split(src_path)
        src_file = self.camera.file_get(src_folder, src_name, gp.GP_FILE_TYPE_NORMAL)
        print('Download %s ->\n\t%s' % (src_path, dst_path))
        src_file.save(dst_path)
        wait_for_event(self.camera)
        print('Download took %0.03f sec' % (time.time() - t1, ))

        if delete:
            t1 = time.time()
            print('Delete %s' % src_path)
            self.camera.file_delete(src_folder, src_name)
            wait_for_event(self.camera)
            print('Delete took %0.03f sec' % (time.time() - t1, ))


def _unittest():
    """test a short exposure sequence"""
    # temporary directory
    dst_template = '/tmp/theta/capture.%04d.%s'

    t1 = time.time()
    camera = CameraUsb()

    camera.init()

    print(camera.get_info())

    frame = 1
    jpg_path, dng_path = camera.take_picture(0)
    print(jpg_path, dng_path)
    camera.download_file(dng_path, dst_template % (frame, 'dng'))
    frame += 1

    jpg_path, dng_path = camera.take_picture(24)
    print(jpg_path, dng_path)
    camera.download_file(dng_path, dst_template % (frame, 'dng'))
    frame += 1

    jpg_path, dng_path = camera.take_picture(42)
    print(jpg_path, dng_path)
    camera.download_file(dng_path, dst_template % (frame, 'dng'))
    frame += 1
    print('Done in %0.03f sec' % (time.time() - t1, ))


if __name__ == "__main__":

    _unittest()

2 Likes