
Real-Time Video Processing

Video Processing Fundamentals

What Is Video?

A video is a sequence of consecutive image frames played back at a specific frame rate (FPS, frames per second); for example, a 10-second clip at 30 FPS contains 300 frames. Video processing means analyzing and transforming those frames, either in real time or offline.

Video File Formats

Video file structure:
┌──────────────┐
│ Header       │  - resolution, frame rate, codec, ...
├──────────────┤
│ Streams      │  - video, audio, subtitle tracks, ...
├──────────────┤
│ Encoded data │  - H.264, H.265, VP9, ...
└──────────────┘

Common formats (a sketch for probing a file's actual codec follows this list):
- MP4: typically H.264 + AAC; the most broadly compatible choice
- AVI: legacy Microsoft container, commonly MPEG-4 Part 2 + MP3
- MOV: Apple's QuickTime container
- MKV: Matroska container; supports many codecs and multiple tracks
- WebM: web-oriented Matroska subset (VP8/VP9/AV1)
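
If you need to know what a file actually contains, the capture backend can report the codec fourcc. A minimal sketch (probe_codec is an illustrative helper; what CAP_PROP_FOURCC returns depends on your OpenCV build and backend):

python
import cv2

def probe_codec(filename):
    """Return the four-character codec code the backend reports, or None."""
    cap = cv2.VideoCapture(filename)
    if not cap.isOpened():
        return None
    # CAP_PROP_FOURCC is the fourcc packed into a single number
    fourcc_int = int(cap.get(cv2.CAP_PROP_FOURCC))
    cap.release()
    # Unpack the four characters, least significant byte first
    return "".join(chr((fourcc_int >> (8 * i)) & 0xFF) for i in range(4))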

Main Tasks in Video Processing

  1. Video input - read a stream from a file or a camera
  2. Frame processing - apply image processing to each frame
  3. Real-time analysis - motion detection, object tracking, and similar tasks
  4. Video output - save the processed video

Video Processing in OpenCV

Capturing Video from a Camera

python
import cv2
import numpy as np
import time

def capture_from_webcam(display=True):
    """Capture video from a webcam."""
    # Open the camera (0 selects the default device)
    cap = cv2.VideoCapture(0)
    
    # Make sure the camera opened successfully
    if not cap.isOpened():
        print("Error: could not open camera")
        return
    
    # Query camera properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    print("Camera parameters:")
    print(f"  Resolution: {width}x{height}")
    print(f"  Frame rate: {fps} FPS")
    
    frame_count = 0
    
    while True:
        ret, frame = cap.read()
        
        if not ret:
            print("Error: could not read frame")
            break
        
        frame_count += 1
        
        # Process the frame
        processed_frame = process_frame(frame)
        
        if display:
            # Show the frame
            cv2.imshow('Video Capture', processed_frame)
        
        # Press 'q' to quit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    print(f"Total frames captured: {frame_count}")
    
    # Release resources
    cap.release()
    cv2.destroyAllWindows()

def process_frame(frame):
    """Process a single frame."""
    # Overlay a wall-clock timestamp
    timestamp = time.strftime("%H:%M:%S")
    cv2.putText(frame, timestamp, (10, 30), 
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    
    return frame
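
You can also request capture settings up front with cap.set. A hedged sketch (drivers may ignore or round requests, so read the properties back to see what was actually applied):

python
def configure_camera(cap, width=1280, height=720, fps=30):
    """Request capture settings and return what the driver actually applied."""
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    cap.set(cv2.CAP_PROP_FPS, fps)
    # Read back the effective values; they may differ from the request
    return (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
            cap.get(cv2.CAP_PROP_FPS))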

Reading from a Video File

python
def process_video_file(filename, output_filename=None):
    """Process a video file."""
    cap = cv2.VideoCapture(filename)
    
    if not cap.isOpened():
        print(f"Error: could not open video file {filename}")
        return
    
    # Query video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    print("Video info:")
    print(f"  Resolution: {width}x{height}")
    print(f"  Frame rate: {fps} FPS")
    print(f"  Total frames: {total_frames}")
    print(f"  Duration: {total_frames/fps:.2f} s")
    
    # Configure the codec and output writer
    if output_filename:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # MPEG-4 ('avc1' for H.264 where available)
        out = cv2.VideoWriter(output_filename, fourcc, fps, (width, height))
    else:
        out = None
    
    frame_count = 0
    
    while True:
        ret, frame = cap.read()
        
        if not ret:
            break
        
        frame_count += 1
        
        # Process the frame
        processed = process_frame(frame)
        
        # Write the processed frame
        if out:
            out.write(processed)
        
        # Report progress every 30 frames
        if frame_count % 30 == 0:
            progress = frame_count / total_frames * 100
            print(f"Progress: {progress:.1f}%")
    
    cap.release()
    if out:
        out.release()
    
    print(f"Done. Total frames: {frame_count}")
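
VideoCapture also supports random access within a file, which is handy for sampling long recordings. A minimal sketch (grab_frame_at is an illustrative helper; seek accuracy varies by container and backend):

python
def grab_frame_at(filename, frame_index):
    """Seek to a frame index and return that frame, or None on failure."""
    cap = cv2.VideoCapture(filename)
    # CAP_PROP_POS_FRAMES seeks by index; CAP_PROP_POS_MSEC seeks by time
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
    ret, frame = cap.read()
    cap.release()
    return frame if ret else None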

Real-Time Video Analysis

Frame Rate Control and Performance Monitoring

python
import time

class PerformanceMonitor:
    """Performance monitor with a sliding window of recent timings."""
    
    def __init__(self, window_size=30):
        self.window_size = window_size
        self.frame_times = []
        self.processing_times = []
    
    def record_frame_time(self, elapsed_time):
        """Record the total time spent on one frame."""
        self.frame_times.append(elapsed_time)
        if len(self.frame_times) > self.window_size:
            self.frame_times.pop(0)
    
    def record_processing_time(self, elapsed_time):
        """Record the processing time for one frame."""
        self.processing_times.append(elapsed_time)
        if len(self.processing_times) > self.window_size:
            self.processing_times.pop(0)
    
    def get_fps(self):
        """Return the average FPS over the window."""
        if not self.frame_times:
            return 0
        avg_time = sum(self.frame_times) / len(self.frame_times)
        return 1.0 / avg_time if avg_time > 0 else 0
    
    def get_processing_time(self):
        """Return the average processing time in milliseconds."""
        if not self.processing_times:
            return 0
        return sum(self.processing_times) / len(self.processing_times) * 1000
    
    def print_stats(self):
        """Print summary statistics."""
        print(f"FPS: {self.get_fps():.1f}")
        print(f"Processing time: {self.get_processing_time():.2f} ms")
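
One design note: list.pop(0) shifts the whole list and is O(n). For larger windows, collections.deque does the same job in O(1) (the FrameBuffer class further below already uses it):

python
from collections import deque

# deque(maxlen=n) evicts the oldest entry automatically on append
frame_times = deque(maxlen=30)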


def real_time_video_processing(target_fps=30):
    """Real-time video processing with a target frame rate."""
    cap = cv2.VideoCapture(0)
    monitor = PerformanceMonitor()
    
    frame_interval = 1.0 / target_fps
    
    while True:
        frame_start = time.time()
        
        ret, frame = cap.read()
        if not ret:
            break
        
        # Process the frame
        processing_start = time.time()
        processed = process_frame(frame)
        processing_time = time.time() - processing_start
        
        # Overlay performance info
        fps = monitor.get_fps()
        cv2.putText(processed, f"FPS: {fps:.1f}", (10, 60),
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        cv2.imshow('Real-time Processing', processed)
        
        # Record timings for the sliding window
        frame_time = time.time() - frame_start
        monitor.record_frame_time(frame_time)
        monitor.record_processing_time(processing_time)
        
        # Sleep so the loop does not exceed the target frame rate
        if frame_time < frame_interval:
            time.sleep(frame_interval - frame_time)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    monitor.print_stats()
    cap.release()
    cv2.destroyAllWindows()

Multi-Frame Fusion

python
from collections import deque

class FrameBuffer:
    """Fixed-size buffer of recent video frames."""
    
    def __init__(self, buffer_size=3):
        self.buffer = deque(maxlen=buffer_size)
    
    def add_frame(self, frame):
        """Append a frame to the buffer."""
        self.buffer.append(frame)
    
    def get_averaged_frame(self):
        """Return the per-pixel mean of the buffered frames (noise reduction)."""
        if not self.buffer:
            return None
        
        frames = np.array(list(self.buffer))
        return np.uint8(np.mean(frames, axis=0))
    
    def get_max_frame(self):
        """Return the per-pixel maximum across the buffered frames."""
        if not self.buffer:
            return None
        
        frames = np.array(list(self.buffer))
        return np.uint8(np.max(frames, axis=0))
    
    def get_min_frame(self):
        """Return the per-pixel minimum across the buffered frames."""
        if not self.buffer:
            return None
        
        frames = np.array(list(self.buffer))
        return np.uint8(np.min(frames, axis=0))


def temporal_filtering():
    """Temporal filtering demo."""
    cap = cv2.VideoCapture(0)
    buffer = FrameBuffer(buffer_size=5)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        buffer.add_frame(frame)
        
        # Average the buffered frames (noise reduction)
        averaged = buffer.get_averaged_frame()
        
        if averaged is not None:
            cv2.imshow('Temporal Averaging', averaged)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()
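
Frame averaging assumes a mostly static camera; anything that moves leaves ghost trails. A lighter-weight alternative is an exponential running average via cv2.accumulateWeighted (the alpha value here is an illustrative starting point):

python
def exponential_denoise(alpha=0.2):
    """Denoise with a running exponential average instead of a frame buffer."""
    cap = cv2.VideoCapture(0)
    acc = None
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        if acc is None:
            acc = frame.astype(np.float32)
        # acc = (1 - alpha) * acc + alpha * frame, accumulated in float32
        cv2.accumulateWeighted(frame, acc, alpha)
        denoised = cv2.convertScaleAbs(acc)
        
        cv2.imshow('Exponential Average', denoised)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()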

Object Tracking in Video

Optical Flow

python
def optical_flow_lucas_kanade():
    """Lucas-Kanade sparse optical flow."""
    cap = cv2.VideoCapture(0)
    
    # Lucas-Kanade parameters
    lk_params = dict(
        winSize=(15, 15),
        maxLevel=2,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
    )
    
    ret, prev_frame = cap.read()
    if not ret:
        cap.release()
        return
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    
    # Shi-Tomasi corner detection parameters
    feature_params = dict(
        maxCorners=100,
        qualityLevel=0.3,
        minDistance=7,
        blockSize=7
    )
    
    prev_points = cv2.goodFeaturesToTrack(prev_gray, **feature_params)
    mask = np.zeros_like(prev_frame)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        good_new = None
        if prev_points is not None and len(prev_points) > 0:
            # Compute sparse optical flow for the tracked points
            next_points, status, error = cv2.calcOpticalFlowPyrLK(
                prev_gray, frame_gray, prev_points, None, **lk_params
            )
            
            # Keep only successfully tracked points
            if next_points is not None:
                good_new = next_points[status == 1]
                good_old = prev_points[status == 1]
                
                # Draw the motion tracks
                for new, old in zip(good_new, good_old):
                    x, y = new.ravel()
                    a, b = old.ravel()
                    
                    mask = cv2.line(mask, (int(a), int(b)), (int(x), int(y)), (0, 255, 0), 2)
                    frame = cv2.circle(frame, (int(x), int(y)), 5, (0, 0, 255), -1)
        
        output = cv2.add(frame, mask)
        cv2.imshow('Optical Flow', output)
        
        prev_gray = frame_gray.copy()
        if good_new is not None and len(good_new) > 0:
            prev_points = good_new.reshape(-1, 1, 2)
        else:
            # All tracks lost: detect a fresh set of features
            prev_points = cv2.goodFeaturesToTrack(prev_gray, **feature_params)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()


def optical_flow_dense():
    """Dense optical flow (Farneback)."""
    cap = cv2.VideoCapture(0)
    
    ret, prev_frame = cap.read()
    if not ret:
        cap.release()
        return
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    hsv = np.zeros_like(prev_frame)
    hsv[..., 1] = 255  # full saturation; hue and value encode the flow
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        # Compute dense optical flow for every pixel
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, frame_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
        )
        
        # Map flow direction to hue and magnitude to brightness
        mag, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        hsv[..., 0] = angle * 180 / np.pi / 2
        hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
        
        flow_bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
        
        cv2.imshow('Dense Optical Flow', flow_bgr)
        
        prev_gray = frame_gray.copy()
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()
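
The flow magnitude also makes a crude global motion detector: average it over the frame and compare against a threshold. A sketch (motion_score is an illustrative helper; the threshold needs tuning per camera and scene):

python
def motion_score(prev_gray, frame_gray, threshold=2.0):
    """Return (score, moving) from the mean dense-flow magnitude."""
    flow = cv2.calcOpticalFlowFarneback(
        prev_gray, frame_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
    )
    mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    score = float(np.mean(mag))
    # 'moving' is True when average per-pixel motion exceeds the threshold
    return score, score > threshold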

Background Subtraction

python
def background_subtraction():
    """Background subtraction."""
    cap = cv2.VideoCapture(0)
    
    # Create background subtractors (MOG2 and KNN, for comparison)
    mog2 = cv2.createBackgroundSubtractorMOG2(detectShadows=True)
    knn = cv2.createBackgroundSubtractorKNN(detectShadows=True)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Apply background subtraction
        fgmask_mog2 = mog2.apply(frame)
        fgmask_knn = knn.apply(frame)
        
        # Morphological post-processing to close holes in the mask
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        fgmask_mog2 = cv2.morphologyEx(fgmask_mog2, cv2.MORPH_CLOSE, kernel)
        
        # Find contours of foreground regions
        contours, _ = cv2.findContours(fgmask_mog2, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        output = frame.copy()
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > 500:  # filter out small contours
                x, y, w, h = cv2.boundingRect(contour)
                cv2.rectangle(output, (x, y), (x+w, y+h), (0, 255, 0), 2)
        
        cv2.imshow('Original', frame)
        cv2.imshow('MOG2 Mask', fgmask_mog2)
        cv2.imshow('KNN Mask', fgmask_knn)
        cv2.imshow('Detection', output)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()
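
Note that with detectShadows=True both subtractors mark shadow pixels as gray (value 127) rather than white (255), so shadows can inflate the contours above. A small sketch that keeps only confident foreground before finding contours:

python
def foreground_only(fgmask):
    """Drop shadow pixels (127) and keep only definite foreground (255)."""
    # Background (0) and shadows (127) fall below the cutoff and become 0
    _, binary = cv2.threshold(fgmask, 200, 255, cv2.THRESH_BINARY)
    return binary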

Multi-Object Tracking

python
class ObjectTracker:
    """Simple centroid-based multi-object tracker."""
    
    def __init__(self, distance_threshold=50):
        self.trackers = {}
        self.next_id = 0
        self.distance_threshold = distance_threshold
    
    def update(self, detections):
        """Update tracks with the current frame's detections."""
        # Match detections to existing tracks
        matched_pairs = self._match_detections(detections)
        
        # Refresh matched tracks
        matched_ids = set()
        for det_idx, tracker_id in matched_pairs:
            self.trackers[tracker_id]['center'] = detections[det_idx]['center']
            self.trackers[tracker_id]['frames_since_update'] = 0
            matched_ids.add(tracker_id)
        
        # Start new tracks for unmatched detections
        for i, det in enumerate(detections):
            if not any(det_idx == i for det_idx, _ in matched_pairs):
                self.trackers[self.next_id] = {
                    'center': det['center'],
                    'frames_since_update': 0
                }
                matched_ids.add(self.next_id)
                self.next_id += 1
        
        # Age only the unmatched tracks and drop stale ones
        for tracker_id in list(self.trackers.keys()):
            if tracker_id in matched_ids:
                continue
            self.trackers[tracker_id]['frames_since_update'] += 1
            if self.trackers[tracker_id]['frames_since_update'] > 30:
                del self.trackers[tracker_id]
        
        return self.trackers
    
    def _match_detections(self, detections):
        """Greedily match detections to tracks by nearest center."""
        matched_pairs = []
        
        if not detections or not self.trackers:
            return matched_pairs
        
        # Pairwise distance matrix between detections and tracks
        distances = np.zeros((len(detections), len(self.trackers)))
        
        for i, det in enumerate(detections):
            for j, (tracker_id, tracker) in enumerate(self.trackers.items()):
                dx = det['center'][0] - tracker['center'][0]
                dy = det['center'][1] - tracker['center'][1]
                distances[i, j] = np.sqrt(dx**2 + dy**2)
        
        # Greedy matching: each detection claims its nearest still-free track
        used_trackers = set()
        for i in range(len(detections)):
            best_j = np.argmin(distances[i])
            if distances[i, best_j] < self.distance_threshold:
                tracker_id = list(self.trackers.keys())[best_j]
                if tracker_id not in used_trackers:
                    matched_pairs.append((i, tracker_id))
                    used_trackers.add(tracker_id)
        
        return matched_pairs
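
Greedy matching can mis-assign when two detections share the same nearest track. If SciPy is available (an extra dependency, not part of OpenCV), globally optimal assignment over the same distance matrix is a drop-in upgrade:

python
from scipy.optimize import linear_sum_assignment

def hungarian_match(distances, threshold=50):
    """Return (det_idx, track_idx) pairs minimizing total center distance."""
    det_idx, trk_idx = linear_sum_assignment(distances)
    # Keep only assignments that are actually close enough to be the same object
    return [(i, j) for i, j in zip(det_idx, trk_idx) if distances[i, j] < threshold]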

Video Encoding and Output

Saving Processed Video

python
def save_processed_video(input_filename, output_filename, processor):
    """Save a processed copy of a video."""
    cap = cv2.VideoCapture(input_filename)
    
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Choose the codec; 'mp4v' is MPEG-4 ('avc1' gives H.264 where the build supports it)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_filename, fourcc, fps, (width, height))
    
    if not out.isOpened():
        print("Error: could not open output video")
        return
    
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_count = 0
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Process the frame
        processed = processor(frame)
        
        # Write to the output
        out.write(processed)
        
        frame_count += 1
        if frame_count % 30 == 0:
            progress = frame_count / total_frames * 100
            print(f"Progress: {progress:.1f}%")
    
    cap.release()
    out.release()
    print(f"Video saved to: {output_filename}")


def create_video_from_images(image_folder, output_filename, fps=30):
    """Build a video from a folder of images."""
    import os
    
    images = sorted([img for img in os.listdir(image_folder) if img.endswith(".jpg")])
    
    if not images:
        print("Error: no image files found")
        return
    
    # Read the first image to get the frame size
    first_image = cv2.imread(os.path.join(image_folder, images[0]))
    height, width = first_image.shape[:2]
    
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_filename, fourcc, fps, (width, height))
    
    for image_name in images:
        image_path = os.path.join(image_folder, image_name)
        frame = cv2.imread(image_path)
        if frame is None:
            continue  # skip unreadable files
        
        # Resize any image that does not match the output frame size
        if frame.shape[:2] != (height, width):
            frame = cv2.resize(frame, (width, height))
        
        out.write(frame)
    
    out.release()
    print(f"Video created: {output_filename}")
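
One caveat: sorted() compares names lexicographically, so frame_10.jpg sorts before frame_2.jpg. Zero-padded names (frame_0002.jpg) avoid this; otherwise a numeric sort key fixes the order (a sketch assuming filenames contain their frame numbers):

python
import re

def natural_key(name):
    """Sort key that orders embedded digit groups numerically."""
    return [int(tok) if tok.isdigit() else tok for tok in re.split(r'(\d+)', name)]

# images = sorted(images, key=natural_key)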

Codec Options

python
def compare_codecs():
    """List common codecs and their fourcc codes."""
    
    codec_codes = {
        'H.264': 'avc1',   # needs an OpenCV build with H.264 support
        'H.265': 'hvc1',
        'VP9': 'VP90',
        'MJPEG': 'MJPG',
        'MPEG-4': 'mp4v',  # 'DIVX' is another MPEG-4 Part 2 fourcc
        'FFV1': 'FFV1'     # lossless
    }
    
    for codec_name, codec_code in codec_codes.items():
        fourcc = cv2.VideoWriter_fourcc(*codec_code)
        # fourcc is the four characters packed into a single integer
        print(f"{codec_name}: {codec_code} -> {fourcc}")
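
Which of these codecs actually work depends on how OpenCV was built (FFmpeg, GStreamer, OS codecs). A quick probe is to try opening a writer and check isOpened() (codec_available is an illustrative helper; the temporary filename is arbitrary):

python
import os

def codec_available(code, ext='.avi'):
    """Try opening a VideoWriter with the given fourcc and report success."""
    path = f"__codec_probe{ext}"
    fourcc = cv2.VideoWriter_fourcc(*code)
    out = cv2.VideoWriter(path, fourcc, 30, (640, 480))
    ok = out.isOpened()
    out.release()
    if os.path.exists(path):
        os.remove(path)
    return ok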

Case Study: LeBot Video Analysis Pipeline

python
class LeBotVideoAnalysisPipeline:
    """LeBot video analysis pipeline."""
    
    def __init__(self, input_source=0):
        self.cap = cv2.VideoCapture(input_source)
        self.performance_monitor = PerformanceMonitor()
        self.frame_buffer = FrameBuffer(buffer_size=3)
        self.object_tracker = ObjectTracker()
    
    def run(self):
        """Run the analysis pipeline."""
        while True:
            start_time = time.time()
            
            ret, frame = self.cap.read()
            if not ret:
                break
            
            # 1. Denoising
            self.frame_buffer.add_frame(frame)
            denoised = self.frame_buffer.get_averaged_frame()
            
            if denoised is None:
                denoised = frame
            
            # 2. Object detection
            detections = self._detect_objects(denoised)
            
            # 3. Object tracking
            tracked = self.object_tracker.update(detections)
            
            # 4. Draw the results
            output = self._draw_results(denoised, tracked)
            
            # 5. Display
            cv2.imshow('LeBot Analysis', output)
            
            # Performance monitoring
            elapsed = time.time() - start_time
            self.performance_monitor.record_frame_time(elapsed)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    
    def _detect_objects(self, frame):
        """Detect objects (simple edge/contour placeholder)."""
        # A real system would plug in YOLO, SSD, or another detector here
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 100, 200)
        
        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        detections = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > 100:
                x, y, w, h = cv2.boundingRect(contour)
                detections.append({
                    'bbox': (x, y, x+w, y+h),
                    'center': (x + w // 2, y + h // 2),
                    'area': area
                })
        
        return detections
    
    def _draw_results(self, frame, tracked):
        """Draw tracking results."""
        output = frame.copy()
        
        for tracker_id, data in tracked.items():
            x, y = data['center']
            cv2.circle(output, (int(x), int(y)), 5, (0, 255, 0), -1)
            cv2.putText(output, f"ID: {tracker_id}", (int(x), int(y) - 10),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        
        # Overlay the FPS
        fps = self.performance_monitor.get_fps()
        cv2.putText(output, f"FPS: {fps:.1f}", (10, 30),
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        return output
    
    def close(self):
        """Release resources."""
        self.cap.release()
        cv2.destroyAllWindows()
        self.performance_monitor.print_stats()
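
A minimal usage sketch; wrapping run() in try/finally ensures the camera is released even if the loop raises:

python
pipeline = LeBotVideoAnalysisPipeline(input_source=0)
try:
    pipeline.run()
finally:
    pipeline.close()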

Summary

Video processing is at the core of LeBot's vision system. This chapter covered:

  1. Video input - cameras and files
  2. Real-time processing - frame rate control and performance monitoring
  3. Temporal analysis - multi-frame fusion and optical flow
  4. Object tracking - background subtraction and multi-object tracking
  5. Video output - codec options and encoding

In the LeBot project, real-time video processing lets the robot understand its surroundings and react as events unfold.

Recommended Resources

Written by the LeBot development team