Tinker Board Forum

Full Version: Software for Camera Control in Python
You're currently viewing a stripped down version of our content. View the full version with proper formatting.
Hi folks, just wanted to share a library we have developed for controlling the camera (tested with picamera v2).
The library allows to set exposure time, gain and analogic gain.


Code:
'''
Copyright (C) 2019 MVISIA

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Library General Public License for more details.

You should have received a copy of the GNU Library General Public
License along with this library; if not, write to the
Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
Boston, MA  02110-1301, USA.

'''

from gi.repository import Gst, GObject
Gst.init([])
GObject.threads_init()
src = Gst.ElementFactory.make('rkcamsrc') # rkcamsrc is a singleton and needs to be instantiated before importing cv2


import os
import time
import numpy as np
import logging
logging = logging.getLogger('server_log')
import glob
import cv2 as cv
from queue import Queue, Full


class IMX219:
    max_exposure = 4095
    min_exposure = 0
    max_gain = 43663.0
    min_gain = 256.0
    max_again = 2816.0
    min_again = 256.0
    max_height = 1050
    max_width = 1400


def unnorm(val, vmin, vmax):
    return int(round(val*(vmax - vmin)+vmin))


def norm(val, vmin, vmax):
    return (val-vmin)/(vmax-vmin)


def check_limits(val, vmin=None, vmax=None):
    if vmin is not None:
        if val < vmin:
            return True
    if vmax is not None:
        if val > vmax:
            return True
    return False


class MIPICamera():
    def __init__(self, width=1400, height=1050, cropx0=0, cropx1=0, cropy0=0, cropy1=0, buffer_size=2, exposure=None, gain=0.0, again=0.0):
        self.buffer_size = buffer_size
        self.queue = Queue(buffer_size)

        self.src = src
        self.setup_src(cropx0, cropx1, cropy0, cropy1)
        self.sink = Gst.ElementFactory.make('appsink')
        self.init_caps(width, height)
        self.init_callbacks()

        # configure aec
        if exposure is None:
            self.src.props.isp_mode ='2A'
        else:
            self.src.props.isp_mode ='0A'
            gain = unnorm(gain, IMX219.min_gain, IMX219.max_gain)
            again = unnorm(again, IMX219.min_again, IMX219.max_again)
            exposure = np.clip(exposure*10, IMX219.min_exposure, IMX219.max_exposure)

            # always set analog gain before gain
            os.system('v4l2-ctl -d /dev/video0 -c analogue_gain=%d' % again)
            os.system('v4l2-ctl -d /dev/video0 -c gain=%d' % gain)
            os.system('v4l2-ctl -d /dev/video0 -c exposure=%d' % exposure)
            logging.info('V4L2-CTL: Exposure Time: %d, gain: %d, analogue_gain: %d' % (exposure, gain, again))

        self.pipeline.set_state(Gst.State.PLAYING)

    def setup_src(self, cropx0, cropx1, cropy0, cropy1):
            self.src.props.output_crop = '%dx%dx%dx%d' % (cropx0, cropy0, cropx1, cropy1)
            self.src.props.input_crop = '%dx%dx%dx%d' % (cropx0, cropy0, cropx1, cropy1)
            self.src.props.device = '/dev/video0'
            self.src.props.io_mode = 4
            self.src.props.tuning_xml_path = '/etc/cam_iq/IMX219.xml'

    def init_caps(self, width, height):
        self.caps = Gst.ElementFactory.make('capsfilter')
        capsprop = Gst.caps_from_string('video/x-raw,format=NV12,width=%d,height=%d' %
                                        (width, height))
        if check_limits(width, vmax=IMX219.max_width) or check_limits(height, vmax=IMX219.max_height):
            raise PipelineCameraError('Max size is %dx%d' % (IMX219.max_height, IMX219.max_width))
        self.caps.set_property('caps', capsprop)
        self.pipeline = Gst.Pipeline()
        self.pipeline.add(self.src, self.caps, self.sink)
        src.link_filtered(self.sink, capsprop)

    def init_callbacks(self):
        self.sink.set_property('emit-signals', True)
        self.sink.set_property('sync', True)
        self.sink.connect('new-sample', self.on_sample, self.sink)
        self.bus = self.pipeline.get_bus()
        self.bus.enable_sync_message_emission()
        self.bus.add_signal_watch()

    def close(self):
        self.pipeline.set_state(Gst.State.NULL)
        self.pipeline.remove(self.src)
        self.pipeline.unlink(self.sink)
        self.pipeline.unlink(self.caps)

    def parse_sample(self, sample):
        buf = sample.get_buffer()
        caps = sample.get_caps().get_structure(0)
        f, h, w = caps.get_value('format'), caps.get_value('height'), caps.get_value('width')
        if f == 'NV12':
            img = np.ndarray((int(h*1.5), w, 1), buffer=buf.extract_dup(0, buf.get_size()), dtype=np.uint8)
            img = cv.cvtColor(img, cv.COLOR_YUV2BGR_NV12)
            return img
        else:
            raise ValueError('Unknown format')

    def on_sample(self, sink, data):
        sample = sink.emit('pull-sample')
        try:
            self.queue.put(self.parse_sample(sample), block=False) # DONT BLOCK THIS THREAD !!!
        except Full:
            pass
        return Gst.FlowReturn.OK

    def __call__(self):
        try:
            img = self.queue.get(timeout=2.5)
        except Full:
            raise PipelineCameraError('Camera capture failed')
        return img


if __name__ == '__main__':
    cam = MIPICamera()
    while True:
        img = cam()
        cv.imshow('win', img)
        if cv.waitKey(1) & 0xFF == ord('q'):
            break

Best regards,
A great praise to you Smile