I am reading a webcam on OSX, which works fine with this simple script:

import cv2
camera = cv2.VideoCapture(0)

while True:
    try:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        cv2.imshow("Frame", frame)  # show the frame to our screen
        cv2.waitKey(1)  # Display it at least one ms before going to the next frame
    except KeyboardInterrupt:
        # cleanup the camera and close any open windows
        camera.release()
        cv2.destroyAllWindows()
        print("\n\nBye bye\n")
        break

Now I want to read the video in a separate process, for which I have this longer script that correctly reads the video in a separate process on Linux:
import numpy as np
import time
import ctypes
import argparse

from multiprocessing import Array, Value, Process
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(floating point is allowed)
        """
        self._cap = cv2.VideoCapture(device)
        self._delay = delay

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int(see multiprocessing.Value)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def get_stream_function(self):
        """
        Returns stream_function object function
        """

        def stream_function(image, finished):
            """
            Function keeps capturing frames until finished = 1
            :param image: shared numpy array for multiprocessing(see multiprocessing.Array)
            :param finished: synchronized wrapper for int(see multiprocessing.Value)
            :return: nothing
            """
            # Incorrect input array
            if image.shape != self.get_size():
                raise Exception("Capture: improper size of the input image")
            print("Capture: start streaming")
            # Capture frame until we get finished flag set to True
            while not finished.value:
                image[:, :, :] = self._proper_frame(self._delay)
            # Release the device
            self.release()

        return stream_function

    def release(self):
        self._cap.release()


def main():
    # Add program arguments
    parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-output', dest="output",  default="output.avi", help='name of the output video file')
    parser.add_argument('-log', dest="log",  default="frames.log", help='name of the log file')
    parser.add_argument('-fps', dest="fps",  default=25., help='frames per second value')

    # Read the arguments if any
    result = parser.parse_args()
    fps = float(result.fps)
    output = result.output
    log = result.log

    # Initialize VideoCapture object and auxiliary objects
    cap = VideoCapture()
    shape = cap.get_size()
    stream = cap.get_stream_function()

    # Define shared variables (synchronized, so race conditions are excluded)
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream, args=(frame, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.terminate()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            cv2.waitKey(1)  # Display it at least one ms before going to the next frame
            time.sleep(0.1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break

if __name__ == '__main__':
    main()

This works fine on Linux, but on OSX I'm having trouble because it can't seem to do the .read() on the created cv2.VideoCapture(device) object (stored in the var self._cap).

After some searching I found this SO answer, which suggested using Billiard, a replacement for python's multiprocessing that supposedly has some very useful improvements. So at the top of the file I simply added the import after the previous multiprocessing imports (effectively overriding multiprocessing.Process):
from billiard import Process, forking_enable

video_process变量实例化之前,我使用forking_enable,如下所示:
forking_enable(0)  # Supposedly this is all I need for billiard to do its magic
video_process = Process(target=stream, args=(frame, finished))

So in this version (here on pastebin) I ran the file again, which gave me this error:



Searching for that error led me to an SO question with a long list of answers, one of which suggested using the dill serialization lib to solve it. That lib, however, should be used with the Pathos multiprocessing fork. So I simply tried changing my multiprocessing import line from
from multiprocessing import Array, Value, Process

to

from pathos.multiprocessing import Array, Value, Process

But Array, Value and Process don't seem to exist in the pathos.multiprocessing package.

From this point I'm completely lost. I'm searching for things I barely understand, and don't even know in which direction to search or debug anymore.

So can anyone smarter than me help me capture video in a separate process? All tips are welcome!

Best Answer

Your first problem is that you cannot access the webcam from a forked process. Several issues arise when external libraries are used with fork, because the fork operation does not clean up all the file descriptors opened by the parent process, leading to strange behavior. Libraries are often more robust to this kind of issue on Linux, but sharing an IO object such as cv2.VideoCapture between two processes is not a good idea.

When you use billiard.forking_enabled and set it to False, you ask the library not to use fork to spawn the new processes but rather the spawn or forkserver methods, which are cleaner as they close all the file descriptors, though they are also slower to start. This should not be an issue in your case. If you are using python3.4+, you can do this with multiprocessing.set_start_method('forkserver').
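As a minimal sketch of that suggestion (stdlib only; `pick_clean_start_method` is a hypothetical helper, not part of any library), the start method can be selected once at program startup:

```python
import multiprocessing as mp


def pick_clean_start_method():
    """Prefer 'forkserver', fall back to 'spawn'; both launch children
    without inheriting the parent's open file descriptors (such as a
    camera handle), unlike 'fork'."""
    available = mp.get_all_start_methods()
    return "forkserver" if "forkserver" in available else "spawn"


if __name__ == "__main__":
    # Must be called at most once, before any Process is started.
    mp.set_start_method(pick_clean_start_method())
    print(mp.get_start_method())  # e.g. 'forkserver' on Linux
```

With these start methods the child is a fresh interpreter, which is why everything passed to it must be picklable, as discussed next.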

When you use one of these methods, the target function and its arguments need to be serialized to be passed to the child process. Serialization is done with pickle by default, which, as you mentioned, has several flaws: it is unable to serialize locally defined objects, and also cv2.VideoCapture. But you can simplify your program to make all the arguments of Process picklable. Here is an attempt at solving your problem:

import numpy as np
import time
import ctypes

from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(float allowed)
        """
        self._delay = delay
        self._device = device
        self._cap = cv2.VideoCapture(device)
        assert self._cap.isOpened()

    def __getstate__(self):
        self._cap.release()
        return (self._delay, self._device)

    def __setstate__(self, state):
        self._delay, self._device = state
        self._cap = cv2.VideoCapture(self._device)
        assert self._cap.grab(), "The child could not grab the video capture"

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture "
                                "the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def release(self):
        self._cap.release()


def stream(capturer, image, finished):
    """
    Function keeps capturing frames until finished = 1
    :param image: shared numpy array for multiprocessing
    :param finished: synchronized wrapper for int
    :return: nothing
    """
    shape = capturer.get_size()

    # Define shared variables
    frame = np.ctypeslib.as_array(image.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])

    # Incorrect input array
    if frame.shape != capturer.get_size():
        raise Exception("Capture: improper size of the input image")
    print("Capture: start streaming")
    # Capture frame until we get finished flag set to True
    while not finished.value:
        frame[:, :, :] = capturer._proper_frame(capturer._delay)

    # Release the device
    capturer.release()


def main():

    # Initialize VideoCapture object and auxiliary objects
    cap = VideoCapture()
    shape = cap.get_size()

    # Define shared variables
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream,
                            args=(cap, shared_array_base, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.join()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            # Display it at least one ms before going to the next frame
            time.sleep(0.1)
            cv2.waitKey(1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break


if __name__ == '__main__':
    set_start_method("spawn")
    main()

I currently cannot test this on a Mac, so it might not work out of the box, but there shouldn't be any multiprocessing-related errors. Some notes:
  • I instantiate the cv2.VideoCapture object in the new child and grab the camera there, as only one process should read from the camera.
  • Maybe the issue in your first program with fork was only due to the shared cv2.VideoCapture, and recreating it in the stream function would solve your problem.
  • You cannot pass the numpy wrapper to the child, as it will not share the mp.Array buffer (this is really weird and took me a while to figure out). You need to pass the Array explicitly and recreate the wrapper.
  • I assumed you were running the code with python3.4+, so I did not use billiard, but using forking_enabled(False) instead of set_start_method should be fairly similar.
  • Let me know if this works!
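The note about the numpy wrapper can be illustrated with a toy example (a hypothetical 4x4x3 "frame" filled with 255 instead of a camera read): pass the raw mp.Array to the child and rebuild the numpy view on each side.

```python
import ctypes

import numpy as np
from multiprocessing import Array, Process


def writer(shared, shape):
    # Rebuild the numpy view inside the child: a ready-made numpy
    # wrapper would not share the mp.Array buffer if passed along.
    frame = np.ctypeslib.as_array(shared.get_obj()).reshape(shape)
    frame[:, :, :] = 255  # stand-in for a captured camera frame


def run_demo(shape=(4, 4, 3)):
    h, w, c = shape
    shared = Array(ctypes.c_uint8, h * w * c)  # raw shared buffer
    p = Process(target=writer, args=(shared, shape))
    p.start()
    p.join()
    # The parent rebuilds its own view over the same buffer.
    view = np.ctypeslib.as_array(shared.get_obj()).reshape(shape)
    return int(view.max())


if __name__ == "__main__":
    print(run_demo())  # 255 when the child's writes are visible
```

This is the same pattern as passing shared_array_base to stream above and calling np.ctypeslib.as_array inside it.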

A similar question on Stack Overflow: python - How to read a webcam in a separate process on OSX? https://stackoverflow.com/questions/43572744/
