Efficient Camera Stream With Python

Argo Saakyan
Towards AI
Published in
5 min readAug 23, 2023

--

Photo by Rahul Chakraborty on Unsplash

Let’s talk about using webcams with Python. I had a simple task of reading frames from the camera and running a neural net on each frame. With one specific webcam, I was having issues with setting up targeted fps (as I now understand — because the camera could run 30 fps with mjpeg format, but not raw), so I decided to dig into FFmpeg to see if it helps.

I ended up getting both OpenCV and FFmpeg working, but I found out a very interesting thing: FFmpeg performance was superior to OpenCV in my main use case. In fact, with FFmpeg, I had a 15x speedup for reading the frame and a 32% speedup for the whole pipeline. I could not believe the results and rechecked everything several times, but they were consistent.

Note: performance was exactly the same, when I just read frame after frame, but FFmpeg was faster when I ran something after reading the frame (which takes time). I’ll show exactly what I mean below.

Now, let’s take a look at the code. Firstly — class for reading webcam frames with OpenCV:

class VideoStreamCV:
    """Webcam reader built on cv2.VideoCapture, configured for MJPEG.

    The capture is opened in __init__ and warmed up immediately so that
    subsequent read() calls return live frames.
    """

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        """Open the device and request resolution, MJPEG fourcc and fps."""
        capture = cv2.VideoCapture(self.src)
        width, height = self.resolution
        capture.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        # MJPEG lets many webcams reach their advertised fps at high resolution.
        capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        capture.set(cv2.CAP_PROP_FPS, self.fps)
        return capture

    def read(self):
        """Return the next frame as a numpy array, or None when the grab fails."""
        ok, frame = self.cap.read()
        return frame if ok else None

    def release(self):
        """Free the underlying capture device."""
        self.cap.release()

    def wait_for_cam(self):
        """Poll up to 30 frames so the sensor can warm up.

        Returns True as soon as a frame arrives, False if none did.
        """
        return any(self.read() is not None for _ in range(30))

I use the wait_for_cam function, as cameras often need time to 'warm up'. The same warmup is used in the FFmpeg class:

class VideoStreamFFmpeg:
    """Webcam reader that decodes an MJPEG camera stream via an ffmpeg subprocess.

    ffmpeg writes raw bgr24 frames to its stdout; read() pulls exactly one
    frame's worth of bytes from that pipe and reshapes it into a numpy image.
    """

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        # Raw bgr24 frame layout: height x width x 3 bytes.
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        # int(): np.prod returns np.int64; keep the byte count a plain int.
        self.frame_size = int(np.prod(self.frame_shape))
        self.wait_for_cam()

    def _open_ffmpeg(self):
        """Build the platform-specific ffmpeg command and spawn the process."""
        os_name = platform.system()
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            video_device = f"{self.src}"
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"
        else:
            raise ValueError("Unsupported OS")

        command = [
            'ffmpeg',
            '-f', input_format,
            '-r', str(self.fps),
            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',
            '-i', video_device,
            '-vcodec', 'mjpeg',  # Input codec set to mjpeg
            '-an', '-vcodec', 'rawvideo',  # Decode the MJPEG stream to raw video
            '-pix_fmt', 'bgr24',
            '-vsync', '2',
            '-f', 'image2pipe', '-'
        ]

        if os_name == "Linux":
            # v4l2 needs the input format requested before the input device.
            command.insert(2, "-input_format")
            command.insert(3, "mjpeg")

        return subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8
        )

    def read(self):
        """Return the next frame as an HxWx3 uint8 array, or None on EOF/short read."""
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)
        return image

    def release(self):
        """Stop ffmpeg and reap the child so it does not linger as a zombie."""
        self.pipe.terminate()
        try:
            # terminate() alone leaves a defunct process until waited on.
            self.pipe.wait(timeout=5)
        except subprocess.TimeoutExpired:
            self.pipe.kill()
            self.pipe.wait()

    def wait_for_cam(self):
        """Poll up to 30 frames so the camera can warm up.

        Returns True as soon as a frame arrives, False if none did.
        """
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False

For timing run function, I used decorator:

def timeit(func):
    """Decorator that prints the wall-clock duration of each call to *func*.

    The wrapped function's return value is passed through unchanged.
    """

    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper

As a heavy synthetic task, in place of a neural net, I used this simple function (it also could be just time.sleep). This is a very important part, as without any task, reading speeds are the same for both OpenCV and FFmpeg:

def computation_task():
    """Synthetic CPU-bound stand-in for a neural net: pure busy work."""
    for _iteration in range(5000000):
        9999 * 9999

Now the function with a loop where I read the frame, time it, and run computation_task:

@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):
    """Read 100 frames, timing each read; optionally run the synthetic
    task after every frame. Returns the mean per-frame read time (seconds)."""
    read_times = []
    for _ in range(100):
        start = time.perf_counter()
        cam.read()
        read_times.append(time.perf_counter() - start)

        if run_task:
            computation_task()

    cam.release()
    return round(np.mean(read_times), 4)

And finally the main function, where I set up a couple of parameters, initialize two video streams with OpenCV and FFmpeg, and run each of them both without computation_task and with it.

def main():
    """Benchmark both camera backends, with and without the synthetic task."""
    fps = 30
    resolution = (1920, 1080)

    for run_task in (False, True):
        ffmpeg_cam = VideoStreamFFmpeg(src=0, fps=fps, resolution=resolution)
        opencv_cam = VideoStreamCV(src=0, fps=fps, resolution=resolution)

        print(f"FFMPEG, task {run_task}:")
        print(f"Mean frame read time: {run(cam=ffmpeg_cam, run_task=run_task)}s\n")
        print(f"CV2, task {run_task}:")
        print(f"Mean frame read time: {run(cam=opencv_cam, run_task=run_task)}s\n")

And here is what I get:

FFMPEG, task False:
Main function time: 3.2334s
Mean frame read time: 0.0323s

CV2, task False:
Main function time: 3.3934s
Mean frame read time: 0.0332s

FFMPEG, task True:
Main function time: 4.461s
Mean frame read time: 0.0014s

CV2, task True:
Main function time: 6.6833s
Mean frame read time: 0.023s

So, without a synthetic task, I get the same reading time: 0.0323, 0.0332. But with synthetic task: 0.0014 and 0.023, so FFmpeg is significantly faster. The beauty is that I got a real speedup with my neural net application, not only with synthetic tests, so I decided to share the results.

Here is a graph that shows how much time it takes for 1 iteration: read the frame, process it with a yolov8s model (on CPU), and save frames with detected objects:

Here is a full script with synthetic tests:

import functools
import platform
import subprocess
import time
from typing import Tuple

import cv2
import numpy as np


class VideoStreamFFmpeg:
    """Webcam reader that decodes an MJPEG camera stream via an ffmpeg subprocess.

    ffmpeg writes raw bgr24 frames to its stdout; read() pulls exactly one
    frame's worth of bytes from that pipe and reshapes it into a numpy image.
    """

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.pipe = self._open_ffmpeg()
        # Raw bgr24 frame layout: height x width x 3 bytes.
        self.frame_shape = (self.resolution[1], self.resolution[0], 3)
        # int(): np.prod returns np.int64; keep the byte count a plain int.
        self.frame_size = int(np.prod(self.frame_shape))
        self.wait_for_cam()

    def _open_ffmpeg(self):
        """Build the platform-specific ffmpeg command and spawn the process."""
        os_name = platform.system()
        if os_name == "Darwin":  # macOS
            input_format = "avfoundation"
            video_device = f"{self.src}:none"
        elif os_name == "Linux":
            input_format = "v4l2"
            video_device = f"{self.src}"
        elif os_name == "Windows":
            input_format = "dshow"
            video_device = f"video={self.src}"
        else:
            raise ValueError("Unsupported OS")

        command = [
            'ffmpeg',
            '-f', input_format,
            '-r', str(self.fps),
            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',
            '-i', video_device,
            '-vcodec', 'mjpeg',  # Input codec set to mjpeg
            '-an', '-vcodec', 'rawvideo',  # Decode the MJPEG stream to raw video
            '-pix_fmt', 'bgr24',
            '-vsync', '2',
            '-f', 'image2pipe', '-'
        ]

        if os_name == "Linux":
            # v4l2 needs the input format requested before the input device.
            command.insert(2, "-input_format")
            command.insert(3, "mjpeg")

        return subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8
        )

    def read(self):
        """Return the next frame as an HxWx3 uint8 array, or None on EOF/short read."""
        raw_image = self.pipe.stdout.read(self.frame_size)
        if len(raw_image) != self.frame_size:
            return None
        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)
        return image

    def release(self):
        """Stop ffmpeg and reap the child so it does not linger as a zombie."""
        self.pipe.terminate()
        try:
            # terminate() alone leaves a defunct process until waited on.
            self.pipe.wait(timeout=5)
        except subprocess.TimeoutExpired:
            self.pipe.kill()
            self.pipe.wait()

    def wait_for_cam(self):
        """Poll up to 30 frames so the camera can warm up.

        Returns True as soon as a frame arrives, False if none did.
        """
        for _ in range(30):
            frame = self.read()
            if frame is not None:
                return True
        return False


class VideoStreamCV:
    """Webcam reader built on cv2.VideoCapture, configured for MJPEG.

    The capture is opened in __init__ and warmed up immediately so that
    subsequent read() calls return live frames.
    """

    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):
        self.src = src
        self.fps = fps
        self.resolution = resolution
        self.cap = self._open_camera()
        self.wait_for_cam()

    def _open_camera(self):
        """Open the device and request resolution, MJPEG fourcc and fps."""
        capture = cv2.VideoCapture(self.src)
        width, height = self.resolution
        capture.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        # MJPEG lets many webcams reach their advertised fps at high resolution.
        capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
        capture.set(cv2.CAP_PROP_FPS, self.fps)
        return capture

    def read(self):
        """Return the next frame as a numpy array, or None when the grab fails."""
        ok, frame = self.cap.read()
        return frame if ok else None

    def release(self):
        """Free the underlying capture device."""
        self.cap.release()

    def wait_for_cam(self):
        """Poll up to 30 frames so the sensor can warm up.

        Returns True as soon as a frame arrives, False if none did.
        """
        return any(self.read() is not None for _ in range(30))


def timeit(func):
    """Decorator that prints the wall-clock duration of each call to *func*.

    The wrapped function's return value is passed through unchanged.
    """

    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        print(f"Main function time: {round(t1-t0, 4)}s")
        return result

    return wrapper


def computation_task():
    """Synthetic CPU-bound stand-in for a neural net: pure busy work."""
    for _iteration in range(5000000):
        9999 * 9999


@timeit
def run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):
    """Read 100 frames, timing each read; optionally run the synthetic
    task after every frame. Returns the mean per-frame read time (seconds)."""
    read_times = []
    for _ in range(100):
        start = time.perf_counter()
        cam.read()
        read_times.append(time.perf_counter() - start)

        if run_task:
            computation_task()

    cam.release()
    return round(np.mean(read_times), 4)


def main():
    """Benchmark both camera backends, with and without the synthetic task."""
    fps = 30
    resolution = (1920, 1080)

    for run_task in (False, True):
        ffmpeg_cam = VideoStreamFFmpeg(src=0, fps=fps, resolution=resolution)
        opencv_cam = VideoStreamCV(src=0, fps=fps, resolution=resolution)

        print(f"FFMPEG, task {run_task}:")
        print(f"Mean frame read time: {run(cam=ffmpeg_cam, run_task=run_task)}s\n")
        print(f"CV2, task {run_task}:")
        print(f"Mean frame read time: {run(cam=opencv_cam, run_task=run_task)}s\n")


if __name__ == "__main__":
    main()

Note: This script was tested on an M1 Pro chip from Apple. Hope this was helpful!

--

--