This forum uses cookies
This forum makes use of cookies to store your login information if you are registered, and your last visit if you are not. Cookies are small text documents stored on your computer; the cookies set by this forum can only be used on this website and pose no security risk. Cookies on this forum also track the specific topics you have read and when you last read them. Please confirm whether you accept or reject these cookies being set.

A cookie will be stored in your browser regardless of choice to prevent you being asked this question again. You will be able to change your cookie settings at any time using the link in the footer.

Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Software for Camera Control in Python
#1
Hi folks, just wanted to share a library we have developed for controlling the camera (tested with picamera v2).
The library allows to set exposure time, gain and analogic gain.


Code:
'''
Copyright (C) 2019 MVISIA

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Library General Public License for more details.

You should have received a copy of the GNU Library General Public
License along with this library; if not, write to the
Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
Boston, MA  02110-1301, USA.

'''

from gi.repository import Gst, GObject
Gst.init([])
GObject.threads_init()
src = Gst.ElementFactory.make('rkcamsrc') # rkcamsrc is a singleton and needs to be instantiated before importing cv2


import os
import time
import numpy as np
import logging
logging = logging.getLogger('server_log')
import glob
import cv2 as cv
from queue import Queue, Full


class IMX219:
    max_exposure = 4095
    min_exposure = 0
    max_gain = 43663.0
    min_gain = 256.0
    max_again = 2816.0
    min_again = 256.0
    max_height = 1050
    max_width = 1400


def unnorm(val, vmin, vmax):
    return int(round(val*(vmax - vmin)+vmin))


def norm(val, vmin, vmax):
    return (val-vmin)/(vmax-vmin)


def check_limits(val, vmin=None, vmax=None):
    if vmin is not None:
        if val < vmin:
            return True
    if vmax is not None:
        if val > vmax:
            return True
    return False


class MIPICamera():
    def __init__(self, width=1400, height=1050, cropx0=0, cropx1=0, cropy0=0, cropy1=0, buffer_size=2, exposure=None, gain=0.0, again=0.0):
        self.buffer_size = buffer_size
        self.queue = Queue(buffer_size)

        self.src = src
        self.setup_src(cropx0, cropx1, cropy0, cropy1)
        self.sink = Gst.ElementFactory.make('appsink')
        self.init_caps(width, height)
        self.init_callbacks()

        # configure aec
        if exposure is None:
            self.src.props.isp_mode ='2A'
        else:
            self.src.props.isp_mode ='0A'
            gain = unnorm(gain, IMX219.min_gain, IMX219.max_gain)
            again = unnorm(again, IMX219.min_again, IMX219.max_again)
            exposure = np.clip(exposure*10, IMX219.min_exposure, IMX219.max_exposure)

            # always set analog gain before gain
            os.system('v4l2-ctl -d /dev/video0 -c analogue_gain=%d' % again)
            os.system('v4l2-ctl -d /dev/video0 -c gain=%d' % gain)
            os.system('v4l2-ctl -d /dev/video0 -c exposure=%d' % exposure)
            logging.info('V4L2-CTL: Exposure Time: %d, gain: %d, analogue_gain: %d' % (exposure, gain, again))

        self.pipeline.set_state(Gst.State.PLAYING)

    def setup_src(self, cropx0, cropx1, cropy0, cropy1):
            self.src.props.output_crop = '%dx%dx%dx%d' % (cropx0, cropy0, cropx1, cropy1)
            self.src.props.input_crop = '%dx%dx%dx%d' % (cropx0, cropy0, cropx1, cropy1)
            self.src.props.device = '/dev/video0'
            self.src.props.io_mode = 4
            self.src.props.tuning_xml_path = '/etc/cam_iq/IMX219.xml'

    def init_caps(self, width, height):
        self.caps = Gst.ElementFactory.make('capsfilter')
        capsprop = Gst.caps_from_string('video/x-raw,format=NV12,width=%d,height=%d' %
                                        (width, height))
        if check_limits(width, vmax=IMX219.max_width) or check_limits(height, vmax=IMX219.max_height):
            raise PipelineCameraError('Max size is %dx%d' % (IMX219.max_height, IMX219.max_width))
        self.caps.set_property('caps', capsprop)
        self.pipeline = Gst.Pipeline()
        self.pipeline.add(self.src, self.caps, self.sink)
        src.link_filtered(self.sink, capsprop)

    def init_callbacks(self):
        self.sink.set_property('emit-signals', True)
        self.sink.set_property('sync', True)
        self.sink.connect('new-sample', self.on_sample, self.sink)
        self.bus = self.pipeline.get_bus()
        self.bus.enable_sync_message_emission()
        self.bus.add_signal_watch()

    def close(self):
        self.pipeline.set_state(Gst.State.NULL)
        self.pipeline.remove(self.src)
        self.pipeline.unlink(self.sink)
        self.pipeline.unlink(self.caps)

    def parse_sample(self, sample):
        buf = sample.get_buffer()
        caps = sample.get_caps().get_structure(0)
        f, h, w = caps.get_value('format'), caps.get_value('height'), caps.get_value('width')
        if f == 'NV12':
            img = np.ndarray((int(h*1.5), w, 1), buffer=buf.extract_dup(0, buf.get_size()), dtype=np.uint8)
            img = cv.cvtColor(img, cv.COLOR_YUV2BGR_NV12)
            return img
        else:
            raise ValueError('Unknown format')

    def on_sample(self, sink, data):
        sample = sink.emit('pull-sample')
        try:
            self.queue.put(self.parse_sample(sample), block=False) # DONT BLOCK THIS THREAD !!!
        except Full:
            pass
        return Gst.FlowReturn.OK

    def __call__(self):
        try:
            img = self.queue.get(timeout=2.5)
        except Full:
            raise PipelineCameraError('Camera capture failed')
        return img


if __name__ == '__main__':
    cam = MIPICamera()
    while True:
        img = cam()
        cv.imshow('win', img)
        if cv.waitKey(1) & 0xFF == ord('q'):
            break

Best regards,
Reply
#2
A great praise to you Smile
Light blue words might be a link. Have you try to click on them? Big Grin
Reply


Forum Jump:


Users browsing this thread: 1 Guest(s)