实时视频处理
视频处理基础
什么是视频
视频是由一系列连续的图像帧按照特定的帧率(FPS,Frames Per Second)组成的。视频处理涉及对这些帧进行实时或离线分析和处理。
视频文件格式
视频文件结构:
┌─────────────┐
│ 文件头信息 │ - 分辨率、帧率、编码格式等
├─────────────┤
│ 数据流 │ - 视频流、音频流、字幕流等
├─────────────┤
│ 编码数据 │ - H.264、H.265、VP9 等
└─────────────┘
常见格式:
- MP4:H.264 + AAC,兼容性好
- AVI:MPEG-4 + MP3,质量高
- MOV:Apple 格式
- MKV:Matroska 容器,支持多编码
- WebM:网络视频格式

视频处理的主要任务
- 视频读取 - 从文件或摄像头读取视频流
- 帧处理 - 对每一帧进行图像处理
- 实时分析 - 动作检测、物体追踪等
- 视频输出 - 保存处理后的视频
OpenCV 中的视频处理
从摄像头捕获视频
python
import cv2
import numpy as np
def capture_from_webcam(display=True):
    """Capture video from the default webcam and process every frame.

    Args:
        display: When True, show each processed frame in a window and
            allow quitting with the 'q' key.
    """
    cap = cv2.VideoCapture(0)  # index 0 = default camera
    if not cap.isOpened():
        print("错误:无法打开摄像头")
        return

    # Report the camera's native capture parameters.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"摄像头参数:")
    print(f" 分辨率: {width}x{height}")
    print(f" 帧率: {fps} FPS")

    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            print("错误:无法读取帧")
            break
        frame_count += 1

        annotated = process_frame(frame)

        if display:
            cv2.imshow('Video Capture', annotated)
            # NOTE(review): with display=False there is no key-based exit;
            # the loop then only ends when the camera stops delivering frames.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    print(f"捕获总帧数: {frame_count}")
    # Always free the camera and any windows on the way out.
    cap.release()
    cv2.destroyAllWindows()
def process_frame(frame):
    """Overlay a tick-count marker on the frame and return it (modified in place).

    NOTE: cv2.getTickCount() is a CPU tick counter, not wall-clock time, so
    the overlay is a monotonically increasing counter rather than a
    human-readable timestamp.
    """
    label = f"{cv2.getTickCount()}"
    cv2.putText(frame, label, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return frame

# 从视频文件读取
python
def process_video_file(filename, output_filename=None):
    """Process a video file frame by frame, optionally writing the result.

    Args:
        filename: Path of the input video.
        output_filename: If given, processed frames are written to this path.
    """
    cap = cv2.VideoCapture(filename)
    if not cap.isOpened():
        # BUG FIX: the message previously printed a literal placeholder
        # instead of the offending file name.
        print(f"错误:无法打开视频文件 {filename}")
        return

    # Probe the stream's properties before processing.
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"视频信息:")
    print(f" 分辨率: {width}x{height}")
    print(f" 帧率: {fps} FPS")
    print(f" 总帧数: {total_frames}")
    if fps > 0:  # some containers report 0 FPS; avoid ZeroDivisionError
        print(f" 持续时间: {total_frames/fps:.2f} 秒")

    # Prepare the writer only when an output path was requested.
    out = None
    if output_filename:
        # 'mp4v' is MPEG-4 Part 2, not H.264 ('avc1' would be H.264).
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_filename, fourcc, fps, (width, height))

    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1

        processed = process_frame(frame)
        if out:
            out.write(processed)

        # Periodic progress report; guard against an unknown frame count.
        if frame_count % 30 == 0 and total_frames > 0:
            progress = frame_count / total_frames * 100
            print(f"进度: {progress:.1f}%")

        # Allow aborting with 'q'.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    if out:
        out.release()
    print(f"处理完成!总帧数: {frame_count}")

# 实时视频分析
帧率控制和性能监测
python
import time
class PerformanceMonitor:
    """Sliding-window monitor for frame rate and per-frame processing time."""

    def __init__(self, window_size=30):
        # Local import keeps the class self-contained regardless of where
        # the module-level deque import appears in the file.
        from collections import deque
        self.window_size = window_size
        # IMPROVEMENT: deque(maxlen=...) evicts the oldest sample in O(1),
        # replacing the original list + pop(0) which is O(n) per record.
        self.frame_times = deque(maxlen=window_size)
        self.processing_times = deque(maxlen=window_size)

    def record_frame_time(self, elapsed_time):
        """Record one full frame-loop duration (seconds)."""
        self.frame_times.append(elapsed_time)

    def record_processing_time(self, elapsed_time):
        """Record one processing-step duration (seconds)."""
        self.processing_times.append(elapsed_time)

    def get_fps(self):
        """Return the average FPS over the window (0 if no samples yet)."""
        if not self.frame_times:
            return 0
        avg_time = sum(self.frame_times) / len(self.frame_times)
        return 1.0 / avg_time if avg_time > 0 else 0

    def get_processing_time(self):
        """Return the average processing time in milliseconds (0 if empty)."""
        if not self.processing_times:
            return 0
        return sum(self.processing_times) / len(self.processing_times) * 1000

    def print_stats(self):
        """Print the current FPS and average processing time."""
        print(f"FPS: {self.get_fps():.1f}")
        print(f"处理时间: {self.get_processing_time():.2f} ms")
def real_time_video_processing(target_fps=30):
    """Capture from the webcam, process each frame, and cap the loop rate.

    Args:
        target_fps: Upper bound on the processing loop's frame rate.
    """
    cap = cv2.VideoCapture(0)
    monitor = PerformanceMonitor()
    min_frame_period = 1.0 / target_fps  # seconds each frame may take

    while True:
        loop_start = time.time()
        ret, frame = cap.read()
        if not ret:
            break

        # Time just the per-frame processing step.
        t0 = time.time()
        annotated = process_frame(frame)
        proc_elapsed = time.time() - t0

        # Overlay the rolling FPS estimate on the frame.
        cv2.putText(annotated, f"FPS: {monitor.get_fps():.1f}", (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow('Real-time Processing', annotated)

        loop_elapsed = time.time() - loop_start
        monitor.record_frame_time(loop_elapsed)
        monitor.record_processing_time(proc_elapsed)

        # Sleep off any time left in this frame's budget so the loop
        # does not exceed the target frame rate.
        if loop_elapsed < min_frame_period:
            time.sleep(min_frame_period - loop_elapsed)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    monitor.print_stats()
    cap.release()
    cv2.destroyAllWindows()

# 多帧融合处理
python
from collections import deque
class FrameBuffer:
    """Fixed-size rolling buffer of video frames for temporal fusion."""

    def __init__(self, buffer_size=3):
        # deque with maxlen evicts the oldest frame automatically.
        self.buffer = deque(maxlen=buffer_size)

    def add_frame(self, frame):
        """Append a frame, dropping the oldest once the buffer is full."""
        self.buffer.append(frame)

    def _reduce(self, reducer):
        # Stack buffered frames and collapse them along the time axis;
        # returns None while the buffer is still empty.
        if not self.buffer:
            return None
        stacked = np.array(list(self.buffer))
        return np.uint8(reducer(stacked, axis=0))

    def get_averaged_frame(self):
        """Per-pixel mean over buffered frames (temporal denoising)."""
        return self._reduce(np.mean)

    def get_max_frame(self):
        """Per-pixel maximum over buffered frames."""
        return self._reduce(np.max)

    def get_min_frame(self):
        """Per-pixel minimum over buffered frames."""
        return self._reduce(np.min)
def temporal_filtering():
    """Denoise a live webcam stream by averaging a rolling frame buffer."""
    cap = cv2.VideoCapture(0)
    frames = FrameBuffer(buffer_size=5)

    while True:
        ok, frame = cap.read()
        if not ok:
            break
        frames.add_frame(frame)

        # None only before the first frame has been buffered.
        smoothed = frames.get_averaged_frame()
        if smoothed is not None:
            cv2.imshow('Temporal Averaging', smoothed)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

# 视频物体追踪
光流法
python
def optical_flow_lucas_kanade():
    """Track sparse corner features across frames with Lucas-Kanade optical flow."""
    cap = cv2.VideoCapture(0)

    # Lucas-Kanade pyramid parameters.
    lk_params = dict(
        winSize=(15, 15),
        maxLevel=2,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
    )
    # Shi-Tomasi corner-detection parameters.
    feature_params = dict(
        maxCorners=100,
        qualityLevel=0.3,
        minDistance=7,
        blockSize=7
    )

    ret, prev_frame = cap.read()
    if not ret:  # BUG FIX: first read was previously unchecked
        cap.release()
        return
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    prev_points = cv2.goodFeaturesToTrack(prev_gray, **feature_params)
    mask = np.zeros_like(prev_frame)  # accumulates the motion trails

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # BUG FIX: re-detect features when all tracks are lost. Previously
        # calcOpticalFlowPyrLK was called with None/empty points (crash) and
        # `good_new` could be referenced before assignment.
        if prev_points is None or len(prev_points) == 0:
            prev_points = cv2.goodFeaturesToTrack(frame_gray, **feature_params)
            prev_gray = frame_gray.copy()
            cv2.imshow('Optical Flow', cv2.add(frame, mask))
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            continue

        next_points, status, error = cv2.calcOpticalFlowPyrLK(
            prev_gray, frame_gray, prev_points, None, **lk_params
        )

        good_new = None
        if next_points is not None:
            # Keep only the successfully tracked points.
            good_new = next_points[status == 1]
            good_old = prev_points[status == 1]
            # Draw the motion trails and the current point positions.
            for new, old in zip(good_new, good_old):
                x, y = new.ravel()
                a, b = old.ravel()
                mask = cv2.line(mask, (int(a), int(b)), (int(x), int(y)), (0, 255, 0), 2)
                frame = cv2.circle(frame, (int(x), int(y)), 5, (0, 0, 255), -1)

        output = cv2.add(frame, mask)
        cv2.imshow('Optical Flow', output)

        prev_gray = frame_gray.copy()
        prev_points = good_new.reshape(-1, 1, 2) if good_new is not None else None
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()
def optical_flow_dense():
    """Visualize Farneback dense optical flow as an HSV color map."""
    cap = cv2.VideoCapture(0)
    ret, prev_frame = cap.read()
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

    # Hue encodes flow direction, value encodes magnitude; saturation
    # stays pinned at maximum.
    hsv = np.zeros_like(prev_frame)
    hsv[..., 1] = 255

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
        )

        # Convert the (dx, dy) field to polar form for coloring.
        mag, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        hsv[..., 0] = angle * 180 / np.pi / 2  # radians -> OpenCV hue range [0, 180)
        hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
        cv2.imshow('Dense Optical Flow', cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR))

        prev_gray = gray.copy()
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

# 背景减除
python
def background_subtraction():
    """Detect moving objects with MOG2 background subtraction."""
    cap = cv2.VideoCapture(0)
    mog2 = cv2.createBackgroundSubtractorMOG2(detectShadows=True)
    # NOTE(review): the KNN subtractor is updated every frame but its mask
    # is never displayed or used downstream — confirm whether it is needed.
    knn = cv2.createBackgroundSubtractorKNN(detectShadows=True)

    while True:
        ok, frame = cap.read()
        if not ok:
            break

        fgmask_mog2 = mog2.apply(frame)
        fgmask_knn = knn.apply(frame)

        # Close small holes in the foreground mask before contour search.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        fgmask_mog2 = cv2.morphologyEx(fgmask_mog2, cv2.MORPH_CLOSE, kernel)

        contours, _ = cv2.findContours(fgmask_mog2, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        output = frame.copy()
        for contour in contours:
            # Ignore tiny blobs (sensor noise, shadow fragments).
            if cv2.contourArea(contour) > 500:
                x, y, w, h = cv2.boundingRect(contour)
                cv2.rectangle(output, (x, y), (x + w, y + h), (0, 255, 0), 2)

        cv2.imshow('Original', frame)
        cv2.imshow('MOG2 Mask', fgmask_mog2)
        cv2.imshow('Detection', output)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

# 多物体追踪
python
class ObjectTracker:
    """Greedy nearest-neighbor multi-object tracker keyed by centroid."""

    def __init__(self, distance_threshold=50):
        self.trackers = {}  # tracker_id -> {'center', 'frames_since_update'}
        self.next_id = 0    # id assigned to the next newly created track
        self.distance_threshold = distance_threshold

    def update(self, detections):
        """Associate detections with existing tracks and age out stale ones.

        Args:
            detections: list of dicts, each with a 'center' (x, y) entry.

        Returns:
            The live tracker dict (id -> state).
        """
        pairs = self._match_detections(detections)

        # Refresh matched tracks with their new detection centers.
        matched_det_indices = set()
        for det_idx, tid in pairs:
            track = self.trackers[tid]
            track['center'] = detections[det_idx]['center']
            track['frames_since_update'] = 0
            matched_det_indices.add(det_idx)

        # Spawn a new track for every unmatched detection.
        for idx, det in enumerate(detections):
            if idx not in matched_det_indices:
                self.trackers[self.next_id] = {
                    'center': det['center'],
                    'frames_since_update': 0,
                }
                self.next_id += 1

        # Age every track; drop those unseen for more than 30 frames.
        for tid in list(self.trackers):
            self.trackers[tid]['frames_since_update'] += 1
            if self.trackers[tid]['frames_since_update'] > 30:
                del self.trackers[tid]

        return self.trackers

    def _match_detections(self, detections):
        """Greedily pair each detection with its nearest existing tracker."""
        if not detections or not self.trackers:
            return []

        tracker_ids = list(self.trackers.keys())

        # Pairwise Euclidean distances: rows = detections, cols = trackers.
        det_xy = np.array([d['center'] for d in detections], dtype=float)
        trk_xy = np.array([self.trackers[tid]['center'] for tid in tracker_ids],
                          dtype=float)
        diffs = det_xy[:, None, :] - trk_xy[None, :, :]
        distances = np.sqrt((diffs ** 2).sum(axis=2))

        pairs = []
        claimed = set()
        for i in range(len(detections)):
            j = int(np.argmin(distances[i]))
            # Match only within the threshold, one detection per tracker.
            if distances[i, j] < self.distance_threshold:
                tid = tracker_ids[j]
                if tid not in claimed:
                    pairs.append((i, tid))
                    claimed.add(tid)
        return pairs

# 视频编码和输出
保存处理后的视频
python
def save_processed_video(input_filename, output_filename, processor):
    """Run `processor` over every frame of a video and save the result.

    Args:
        input_filename: Path of the source video.
        output_filename: Path for the processed output video.
        processor: Callable taking a BGR frame and returning a same-size frame.
    """
    cap = cv2.VideoCapture(input_filename)
    if not cap.isOpened():  # BUG FIX: input failure was previously unchecked
        print(f"错误:无法打开视频文件 {input_filename}")
        return

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # 'mp4v' is MPEG-4 Part 2, not H.264 ('avc1' would be H.264).
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_filename, fourcc, fps, (width, height))
    if not out.isOpened():
        print("错误:无法打开输出视频")
        cap.release()  # BUG FIX: the capture leaked on this error path
        return

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        out.write(processor(frame))
        frame_count += 1
        # Periodic progress report; guard against an unknown frame count.
        if frame_count % 30 == 0 and total_frames > 0:
            progress = frame_count / total_frames * 100
            print(f"进度: {progress:.1f}%")

    cap.release()
    out.release()
    print(f"视频已保存到: {output_filename}")
def create_video_from_images(image_folder, output_filename, fps=30, extensions=(".jpg",)):
    """Assemble a video from an alphabetically sorted image sequence.

    Args:
        image_folder: Directory containing the frame images.
        output_filename: Path of the video to create.
        fps: Frame rate of the output video.
        extensions: Accepted file extensions (matched case-insensitively);
            defaults to JPEG only, matching the original behavior.
    """
    import os
    # GENERALIZATION: extension set is now a parameter and matching is
    # case-insensitive (previously only lowercase ".jpg" was accepted).
    suffixes = tuple(ext.lower() for ext in extensions)
    images = sorted(
        img for img in os.listdir(image_folder)
        if img.lower().endswith(suffixes)
    )
    if not images:
        print("错误:未找到图像文件")
        return

    # Read the first image to establish the output dimensions.
    first_image = cv2.imread(os.path.join(image_folder, images[0]))
    if first_image is None:  # BUG FIX: unreadable first image crashed on .shape
        print(f"错误:无法读取图像 {images[0]}")
        return
    height, width = first_image.shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_filename, fourcc, fps, (width, height))
    for image_name in images:
        frame = cv2.imread(os.path.join(image_folder, image_name))
        if frame is None:  # skip unreadable files instead of crashing
            continue
        if frame.shape[:2] != (height, width):
            # Resize so every frame matches the writer's dimensions.
            frame = cv2.resize(frame, (width, height))
        out.write(frame)
    out.release()
    print(f"视频已创建: {output_filename}")

# 不同的编码格式
python
def compare_codecs():
    """Print the FourCC integer code for several common video codecs.

    BUG FIX: H.264 was previously mapped to 'mp4v', which is actually the
    MPEG-4 Part 2 FourCC; 'avc1' is the H.264/AVC tag.
    """
    codec_codes = {
        'H.264': 'avc1',
        'H.265': 'hev1',
        'VP9': 'vp90',
        'MJPEG': 'MJPG',
        'MPEG-4': 'DIVX',
        'FFV1': 'FFV1'
    }
    for codec_name, codec_code in codec_codes.items():
        fourcc = cv2.VideoWriter_fourcc(*codec_code)
        print(f"{codec_name}: {fourcc}")

# 实战案例:LeBot 视频分析管道
python
class LeBotVideoAnalysisPipeline:
    """End-to-end LeBot pipeline: denoise -> detect -> track -> render."""

    def __init__(self, input_source=0):
        # input_source: camera index or video file path.
        self.cap = cv2.VideoCapture(input_source)
        self.performance_monitor = PerformanceMonitor()
        self.frame_buffer = FrameBuffer(buffer_size=3)
        self.object_tracker = ObjectTracker()

    def run(self):
        """Main processing loop; press 'q' to stop, then call close()."""
        while True:
            tick = time.time()
            ok, frame = self.cap.read()
            if not ok:
                break

            # 1. Temporal denoising via the rolling frame buffer.
            self.frame_buffer.add_frame(frame)
            denoised = self.frame_buffer.get_averaged_frame()
            if denoised is None:
                denoised = frame

            # 2-3. Detect candidate objects, then update the tracker.
            tracked = self.object_tracker.update(self._detect_objects(denoised))

            # 4-5. Render the annotated frame and display it.
            cv2.imshow('LeBot Analysis', self._draw_results(denoised, tracked))

            # Record the full per-frame latency for the FPS readout.
            self.performance_monitor.record_frame_time(time.time() - tick)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    def _detect_objects(self, frame):
        """Crude contour-based detector (stand-in for YOLO/SSD etc.)."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        contours, _ = cv2.findContours(
            cv2.Canny(gray, 100, 200), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        detections = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area <= 100:  # drop tiny contours (noise)
                continue
            x, y, w, h = cv2.boundingRect(contour)
            detections.append({
                'bbox': (x, y, x + w, y + h),
                'center': ((x + x + w) // 2, (y + y + h) // 2),
                'area': area,
            })
        return detections

    def _draw_results(self, frame, tracked):
        """Draw tracked centers, track IDs, and the FPS readout."""
        output = frame.copy()
        for tracker_id, data in tracked.items():
            cx, cy = data['center']
            cv2.circle(output, (int(cx), int(cy)), 5, (0, 255, 0), -1)
            cv2.putText(output, f"ID: {tracker_id}", (int(cx), int(cy) - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        cv2.putText(output, f"FPS: {self.performance_monitor.get_fps():.1f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        return output

    def close(self):
        """Release the capture, close windows, and print final stats."""
        self.cap.release()
        cv2.destroyAllWindows()
        self.performance_monitor.print_stats()

# 总结
视频处理是 LeBot 机器人视觉系统的核心。本章涵盖了:
- 视频输入 - 摄像头和文件
- 实时处理 - 帧率控制和性能监测
- 时间分析 - 多帧融合和光流
- 物体追踪 - 背景减除和多物体追踪
- 视频输出 - 不同编码格式
在 LeBot 项目中,实时视频处理能力使机器人能够实时理解环境并做出相应的反应。