
import cv2
camera = cv2.VideoCapture(0)

while True:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        cv2.imshow("Frame", frame)  # show the frame to our screen
        cv2.waitKey(1)  # Display it at least one ms before going to the next frame
    except KeyboardInterrupt:
        # cleanup the camera and close any open windows
        print "\n\nBye bye\n"

import numpy as np
import time
import ctypes
import argparse

from multiprocessing import Array, Value, Process
import cv2

class VideoCapture:
    Class that handles video capture from device or video file
    def __init__(self, device=0, delay=0.):
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(floating point is allowed)
        self._cap = cv2.VideoCapture(device)
        self._delay = delay

    def _proper_frame(self, delay=None):
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int(see multiprocessing.Value)
        :return: frame
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture the frame.")
            # Delay before we get a new frame
        return snapshot

    def get_size(self):
        :return: size of the captured image
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def get_stream_function(self):
        Returns stream_function object function

        def stream_function(image, finished):
            Function keeps capturing frames until finished = 1
            :param image: shared numpy array for multiprocessing(see multiprocessing.Array)
            :param finished: synchronized wrapper for int(see multiprocessing.Value)
            :return: nothing
            # Incorrect input array
            if image.shape != self.get_size():
                raise Exception("Capture: improper size of the input image")
            print("Capture: start streaming")
            # Capture frame until we get finished flag set to True
            while not finished.value:
                image[:, :, :] = self._proper_frame(self._delay)
            # Release the device

        return stream_function

    def release(self):

def main():
    # Add program arguments
    parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-output', dest="output",  default="output.avi", help='name of the output video file')
    parser.add_argument('-log', dest="log",  default="frames.log", help='name of the log file')
    parser.add_argument('-fps', dest="fps",  default=25., help='frames per second value')

    # Read the arguments if any
    result = parser.parse_args()
    fps = float(result.fps)
    output = result.output
    log = result.log

    # Initialize VideoCapture object and auxilary objects
    cap = VideoCapture()
    shape = cap.get_size()
    stream = cap.get_stream_function()

    # Define shared variables(which are synchronised so race condition is excluded)
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream, args=(frame, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        # Terminate working processes

    # The capturing works until keyboard interrupt is pressed.
    while True:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            cv2.waitKey(1)  # Display it at least one ms before going to the next frame

        except KeyboardInterrupt:

if __name__ == '__main__':

这在Linux上可以正常工作,但是在OSX上却遇到了麻烦,因为它似乎无法在创建的.read()对象(存储在var cv2.VideoCapture(device)中)上执行self._cap

经过一番搜索,我发现this SO answer,建议使用Billiard,它是python多处理的替代品,据说它有一些非常有用的改进。因此,在文件的顶部,我只是在先前的多处理导入之后添加了导入(有效地覆盖了multiprocessing.Process):
from billiard import Process, forking_enable

forking_enable(0)  # Supposedly this is all I need for billiard to do it's magic
video_process = Process(target=stream, args=(frame, finished))

因此,在这个版本(here on pastebin)中,我再次运行该文件,这给了我这个错误:

搜索该错误导致我找到an SO question with a long list of answers,其中一个建议是使用dill serialization lib解决此问题。但是,该lib应该与Pathos multiprocessing fork一起使用。所以我只是尝试从更改我的多处理导入行
from multiprocessing import Array, Value, Process

from pathos.multiprocessing import Array, Value, Process








import numpy as np
import time
import ctypes

from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2

class VideoCapture:
    Class that handles video capture from device or video file
    def __init__(self, device=0, delay=0.):
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(float allowed)
        self._delay = delay
        self._device = device
        self._cap = cv2.VideoCapture(device)
        assert self._cap.isOpened()

    def __getstate__(self):
        return (self._delay, self._device)

    def __setstate__(self, state):
        self._delay, self._device = state
        self._cap = cv2.VideoCapture(self._device)
        assert self._cap.grab(), "The child could not grab the video capture"

    def _proper_frame(self, delay=None):
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int
        :return: frame
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture "
                                "the frame.")
            # Delay before we get a new frame
        return snapshot

    def get_size(self):
        :return: size of the captured image
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def release(self):

def stream(capturer, image, finished):
    Function keeps capturing frames until finished = 1
    :param image: shared numpy array for multiprocessing
    :param finished: synchronized wrapper for int
    :return: nothing
    shape = capturer.get_size()

    # Define shared variables
    frame = np.ctypeslib.as_array(image.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])

    # Incorrect input array
    if frame.shape != capturer.get_size():
        raise Exception("Capture: improper size of the input image")
    print("Capture: start streaming")
    # Capture frame until we get finished flag set to True
    while not finished.value:
        frame[:, :, :] = capturer._proper_frame(capturer._delay)

    # Release the device

def main():

    # Initialize VideoCapture object and auxilary objects
    cap = VideoCapture()
    shape = cap.get_size()

    # Define shared variables
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream,
                            args=(cap, shared_array_base, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        # Terminate working processes

    # The capturing works until keyboard interrupt is pressed.
    while True:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            # Display it at least one ms before going to the next frame

        except KeyboardInterrupt:

if __name__ == '__main__':

  • 我在新子级中实例化cv2.VideoCapture对象,并抓取相机,因为仅应从相机读取一个进程。
  • 也许第一个使用fork的程序中的问题仅是由于共享的cv2.VideoCapture所致,在stream函数中重新创建它可以解决您的问题。
  • 您不能将numpy包装器传递给 child ,因为它不会共享mp.Array缓冲区(这确实很奇怪,花了我一段时间才弄清楚)。您需要显式传递Array并重新创建包装器。
  • 也许第一个使用fork的程序中的问题仅是由于共享的cv2.VideoCapture所致,在stream函数中重新创建它可以解决您的问题。
  • 我以为您在python3.4+中运行代码,所以我没有使用billard,但使用forking_enabled(False)而不是set_start_method应该有点类似。

  • 让我知道这项工作!

