Visual Recognition in Practice

Overview of Visual Recognition

Visual recognition is a core application area of computer vision, covering the automatic identification and classification of objects, people, scenes, and more from images or video. In the LeBot wheel-legged robot project, visual recognition lets the robot understand its surroundings and make intelligent decisions.

Main Visual Recognition Tasks

  1. Object detection - find objects in an image and localize them
  2. Image classification - decide which category an image belongs to
  3. Semantic segmentation - assign a class label to every pixel
  4. Instance segmentation - detect and segment each individual object instance
  5. Face recognition - identify faces in an image
  6. Gesture recognition - recognize human hand gestures

Project 1: Object Detection and Tracking

Real-Time Object Detection with YOLO

python
import cv2
import torch
import numpy as np
from collections import deque
import time

class YOLOv8ObjectDetector:
    """Object detector based on YOLOv8"""
    
    def __init__(self, model_name='yolov8n.pt', conf_threshold=0.5):
        """
        Initialize the detector
        
        Args:
            model_name: model weights file (n/s/m/l/x size variants)
            conf_threshold: confidence threshold
        """
        # Try to use the ultralytics YOLO implementation
        try:
            from ultralytics import YOLO
            self.model = YOLO(model_name)
            self.use_ultralytics = True
        except ImportError:
            print("Please install ultralytics: pip install ultralytics")
            self.use_ultralytics = False
        
        self.conf_threshold = conf_threshold
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    def detect(self, frame):
        """
        Detect objects in an image
        
        Args:
            frame: input image (BGR format)
            
        Returns:
            detections: list of detection results
            frame: the image with detection boxes drawn on it
        """
        if not self.use_ultralytics:
            return [], frame
        
        # Run detection
        results = self.model(frame, conf=self.conf_threshold, device=self.device)
        
        detections = []
        
        # Parse the results
        for result in results:
            boxes = result.boxes
            
            for box in boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = box.conf[0].item()
                cls_id = int(box.cls[0].item())
                cls_name = self.model.names[cls_id]
                
                detection = {
                    'bbox': (x1, y1, x2, y2),
                    'confidence': conf,
                    'class_id': cls_id,
                    'class_name': cls_name,
                    'center': ((x1 + x2) // 2, (y1 + y2) // 2),
                    'area': (x2 - x1) * (y2 - y1)
                }
                detections.append(detection)
                
                # Draw the detection box
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"{cls_name}: {conf:.2f}", (x1, y1 - 10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
        
        return detections, frame
    
    def filter_detections(self, detections, target_classes=None, min_conf=0.5):
        """
        Filter detection results
        
        Args:
            detections: list of detection results
            target_classes: list of target classes (None means all classes)
            min_conf: minimum confidence
            
        Returns:
            filtered_detections: the filtered detections
        """
        filtered = []
        for det in detections:
            if det['confidence'] < min_conf:
                continue
            
            if target_classes and det['class_name'] not in target_classes:
                continue
            
            filtered.append(det)
        
        return filtered


class ObjectTracker:
    """Object tracker"""
    
    def __init__(self, max_history=30, distance_threshold=50):
        """
        Initialize the tracker
        
        Args:
            max_history: maximum length of the position history
            distance_threshold: maximum pixel distance for a match
        """
        self.tracked_objects = {}
        self.next_id = 0
        self.max_history = max_history
        self.distance_threshold = distance_threshold
    
    def update(self, detections):
        """
        Update the tracker
        
        Args:
            detections: detections for the current frame
            
        Returns:
            tracked_objects: dictionary of tracked objects
        """
        if not detections:
            # Nothing detected: age every existing track
            for obj_id in list(self.tracked_objects.keys()):
                self.tracked_objects[obj_id]['missing_frames'] += 1
                
                # Drop tracks that have been missing for too long
                if self.tracked_objects[obj_id]['missing_frames'] > 30:
                    del self.tracked_objects[obj_id]
            
            return self.tracked_objects
        
        # Match current detections to tracks from the previous frame
        current_centers = [d['center'] for d in detections]
        tracked_centers = [obj['center'] for obj in self.tracked_objects.values()]
        
        # Build the distance matrix
        if tracked_centers:
            distances = np.zeros((len(detections), len(tracked_centers)))
            for i, curr_center in enumerate(current_centers):
                for j, tracked_center in enumerate(tracked_centers):
                    dist = np.sqrt((curr_center[0] - tracked_center[0])**2 + 
                                  (curr_center[1] - tracked_center[1])**2)
                    distances[i, j] = dist
            
            # Simple greedy matching
            matched_pairs = self._match_detections(distances)
            
            # Update the matched tracks
            matched_det_indices = set()
            matched_obj_ids = set()
            for det_idx, obj_id in matched_pairs:
                matched_det_indices.add(det_idx)
                matched_obj_ids.add(obj_id)
                
                detection = detections[det_idx]
                self.tracked_objects[obj_id]['center'] = detection['center']
                self.tracked_objects[obj_id]['bbox'] = detection['bbox']
                self.tracked_objects[obj_id]['class_name'] = detection['class_name']
                self.tracked_objects[obj_id]['confidence'] = detection['confidence']
                self.tracked_objects[obj_id]['missing_frames'] = 0
                self.tracked_objects[obj_id]['history'].append(detection['center'])
                
                if len(self.tracked_objects[obj_id]['history']) > self.max_history:
                    self.tracked_objects[obj_id]['history'].pop(0)
            
            # Age (and eventually drop) tracks that found no match this frame
            for obj_id in list(self.tracked_objects.keys()):
                if obj_id not in matched_obj_ids:
                    self.tracked_objects[obj_id]['missing_frames'] += 1
                    if self.tracked_objects[obj_id]['missing_frames'] > 30:
                        del self.tracked_objects[obj_id]
            
            # Start new tracks for unmatched detections
            for i, detection in enumerate(detections):
                if i not in matched_det_indices:
                    self.tracked_objects[self.next_id] = {
                        'id': self.next_id,
                        'center': detection['center'],
                        'bbox': detection['bbox'],
                        'class_name': detection['class_name'],
                        'confidence': detection['confidence'],
                        'missing_frames': 0,
                        'history': [detection['center']],
                        'first_detected_frame': 0
                    }
                    self.next_id += 1
        else:
            # No existing tracks: every detection starts a new one
            for detection in detections:
                self.tracked_objects[self.next_id] = {
                    'id': self.next_id,
                    'center': detection['center'],
                    'bbox': detection['bbox'],
                    'class_name': detection['class_name'],
                    'confidence': detection['confidence'],
                    'missing_frames': 0,
                    'history': [detection['center']],
                    'first_detected_frame': 0
                }
                self.next_id += 1
        
        return self.tracked_objects
    
    def _match_detections(self, distances):
        """Match detections to tracks with a greedy strategy"""
        matched_pairs = []
        matched_det_indices = set()
        matched_obj_indices = set()
        
        tracked_obj_ids = list(self.tracked_objects.keys())
        
        # Greedy matching: repeatedly take the closest remaining pair
        while True:
            min_dist = float('inf')
            best_pair = None
            
            for i in range(distances.shape[0]):
                if i in matched_det_indices:
                    continue
                
                for j in range(distances.shape[1]):
                    if j in matched_obj_indices:
                        continue
                    
                    if distances[i, j] < min_dist:
                        min_dist = distances[i, j]
                        best_pair = (i, j)
            
            if best_pair is None or min_dist > self.distance_threshold:
                break
            
            det_idx, obj_idx = best_pair
            matched_det_indices.add(det_idx)
            matched_obj_indices.add(obj_idx)
            
            obj_id = tracked_obj_ids[obj_idx]
            matched_pairs.append((det_idx, obj_id))
        
        return matched_pairs
    
    def draw_tracks(self, frame):
        """Draw the tracking results"""
        colors = [(0, 255, 0), (255, 0, 0), (0, 0, 255), (255, 255, 0), 
                 (255, 0, 255), (0, 255, 255)]
        
        for obj in self.tracked_objects.values():
            if obj['missing_frames'] > 5:
                continue
            
            color = colors[obj['id'] % len(colors)]
            
            # Draw the bounding box
            x1, y1, x2, y2 = obj['bbox']
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            
            # Draw the ID and class
            label = f"ID: {obj['id']} {obj['class_name']}"
            cv2.putText(frame, label, (x1, y1 - 10),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            # Draw the trajectory
            history = obj['history']
            for i in range(1, len(history)):
                cv2.line(frame, history[i-1], history[i], color, 2)
        
        return frame


# Usage example
def detect_and_track_objects():
    detector = YOLOv8ObjectDetector('yolov8n.pt', conf_threshold=0.5)
    tracker = ObjectTracker()
    
    cap = cv2.VideoCapture(0)
    prev_time = time.time()
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Detect objects
        detections, frame = detector.detect(frame)
        
        # Filter the detections
        detections = detector.filter_detections(detections, min_conf=0.5)
        
        # Update the tracker
        tracked_objects = tracker.update(detections)
        
        # Draw the tracks
        frame = tracker.draw_tracks(frame)
        
        # Measure and display the actual processing frame rate
        # (cap.get(cv2.CAP_PROP_FPS) only reports the camera's nominal rate)
        now = time.time()
        fps = 1.0 / max(now - prev_time, 1e-6)
        prev_time = now
        cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        cv2.imshow('Object Detection and Tracking', frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

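The greedy matcher above can pair detections suboptimally when several objects are close together, and its repeated full scans are cubic in the number of boxes. If SciPy is available, a drop-in alternative for ObjectTracker._match_detections is globally optimal assignment via the Hungarian algorithm; a minimal sketch, assuming scipy is installed:

python
from scipy.optimize import linear_sum_assignment

def _match_detections_hungarian(self, distances):
    """Globally optimal detection-to-track matching (sketch).
    
    Same interface as ObjectTracker._match_detections: takes the
    detections x tracks distance matrix and returns (det_idx, obj_id) pairs.
    """
    tracked_obj_ids = list(self.tracked_objects.keys())
    
    # linear_sum_assignment minimizes the total matched distance
    det_indices, obj_indices = linear_sum_assignment(distances)
    
    matched_pairs = []
    for det_idx, obj_idx in zip(det_indices, obj_indices):
        # Still reject pairs too far apart to be the same object
        if distances[det_idx, obj_idx] <= self.distance_threshold:
            matched_pairs.append((det_idx, tracked_obj_ids[obj_idx]))
    
    return matched_pairs

Binding it with ObjectTracker._match_detections = _match_detections_hungarian leaves update() unchanged.
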
Project 2: Semantic Segmentation and Environment Understanding

Semantic Segmentation with FCN

python
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt

class SemanticSegmenter:
    """Semantic segmenter"""
    
    def __init__(self, model_name='fcn_resnet50', num_classes=21):
        """
        Initialize the segmenter
        
        Args:
            model_name: model name
            num_classes: number of classes
        """
        from torchvision import models
        
        # Note: newer torchvision versions prefer the weights= argument
        # over pretrained=True
        if model_name == 'fcn_resnet50':
            self.model = models.segmentation.fcn_resnet50(
                pretrained=True, 
                num_classes=num_classes
            )
        elif model_name == 'deeplabv3':
            self.model = models.segmentation.deeplabv3_resnet50(
                pretrained=True,
                num_classes=num_classes
            )
        else:
            raise ValueError(f"Unknown model: {model_name}")
        
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = self.model.to(self.device)
        self.model.eval()
        
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                               std=[0.229, 0.224, 0.225])
        ])
        
        # PASCAL VOC classes
        self.classes = [
            'background', 'aeroplane', 'bicycle', 'bird', 'boat',
            'bottle', 'bus', 'car', 'cat', 'chair', 'cow', 'diningtable',
            'dog', 'horse', 'motorbike', 'person', 'pottedplant',
            'sheep', 'sofa', 'train', 'tvmonitor'
        ]
    
    def segment(self, image):
        """
        Segment an image
        
        Args:
            image: input image (numpy array or PIL Image)
            
        Returns:
            segmentation_map: the segmentation result (one class id per pixel)
        """
        if isinstance(image, np.ndarray):
            image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        
        # Preprocess
        input_tensor = self.transform(image).unsqueeze(0).to(self.device)
        
        # Inference
        with torch.no_grad():
            output = self.model(input_tensor)
        
        # Take the per-pixel argmax as the segmentation map
        segmentation_map = output['out'].argmax(1)[0].cpu().numpy()
        
        return segmentation_map
    
    def visualize_segmentation(self, image, segmentation_map):
        """
        Visualize the segmentation result
        
        Args:
            image: original image
            segmentation_map: segmentation result
            
        Returns:
            vis_image: the visualized image
        """
        # Build a color image from the segmentation map
        segmentation_color = np.zeros((*segmentation_map.shape, 3), dtype=np.uint8)
        
        colors = plt.get_cmap('tab20')
        for i in range(len(self.classes)):
            mask = segmentation_map == i
            # tab20 has only 20 entries, so wrap the index for the 21st class
            color = (np.array(colors(i % 20)[:3]) * 255).astype(np.uint8)
            segmentation_color[mask] = color
        
        # Resize the original image to the segmentation map's size
        if isinstance(image, np.ndarray):
            image_resized = cv2.resize(image, 
                                       (segmentation_map.shape[1], segmentation_map.shape[0]))
        else:
            image_resized = np.array(image.resize(
                (segmentation_map.shape[1], segmentation_map.shape[0])
            ))
        
        # Blend the two
        vis_image = cv2.addWeighted(
            cv2.cvtColor(image_resized, cv2.COLOR_RGB2BGR) if isinstance(image, Image.Image) else image_resized,
            0.6,
            segmentation_color,
            0.4,
            0
        )
        
        return vis_image
    
    def extract_roi_by_class(self, image, class_name):
        """
        Extract an ROI by class
        
        Args:
            image: input image
            class_name: class name
            
        Returns:
            roi: the extracted region
            mask: the corresponding mask
        """
        segmentation_map = self.segment(image)
        
        if class_name not in self.classes:
            print(f"Unknown class: {class_name}")
            return None, None
        
        class_id = self.classes.index(class_name)
        mask = (segmentation_map == class_id).astype(np.uint8) * 255
        
        # Apply the mask
        if isinstance(image, np.ndarray):
            roi = cv2.bitwise_and(image, image, mask=mask)
        else:
            image_array = np.array(image)
            roi = cv2.bitwise_and(image_array, image_array, mask=mask)
        
        return roi, mask


# Usage example
def segment_environment():
    segmenter = SemanticSegmenter('fcn_resnet50')
    cap = cv2.VideoCapture(0)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Downscale to speed up processing
        frame_resized = cv2.resize(frame, (640, 480))
        
        # Segment
        segmentation_map = segmenter.segment(frame_resized)
        
        # Visualize
        vis_image = segmenter.visualize_segmentation(frame_resized, segmentation_map)
        
        cv2.imshow('Semantic Segmentation', vis_image)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

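For environment understanding it is often enough to reduce the segmentation map to per-class statistics. The sketch below is one illustrative way LeBot might consume the map (the helper and its use are assumptions, not part of the original pipeline): it computes the fraction of pixels each class covers with np.bincount, which a planner could use to estimate, say, how much of the view is occupied by people or furniture.

python
def class_coverage(segmentation_map, class_names):
    """Fraction of the image covered by each class (sketch).
    
    Args:
        segmentation_map: HxW array of class ids, as returned by segment()
        class_names: list mapping class id -> name (e.g. segmenter.classes)
    
    Returns:
        dict of class_name -> coverage fraction, omitting absent classes
    """
    counts = np.bincount(segmentation_map.ravel(), minlength=len(class_names))
    total = segmentation_map.size
    return {name: counts[i] / total
            for i, name in enumerate(class_names) if counts[i] > 0}

# Example: report the three dominant classes in the robot's view
# coverage = class_coverage(segmentation_map, segmenter.classes)
# print(sorted(coverage.items(), key=lambda kv: -kv[1])[:3])
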
Project 3: Face Detection and Recognition

Face Recognition with FaceNet

python
import cv2
import torch
import numpy as np
from scipy.spatial.distance import euclidean

class FaceRecognizer:
    """Face recognizer"""
    
    def __init__(self, model_path=None):
        """
        Initialize the face recognizer
        
        Args:
            model_path: path to a pretrained model
        """
        try:
            from facenet_pytorch import InceptionResnetV1
            
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            self.model = InceptionResnetV1(pretrained='vggface2').eval().to(self.device)
            
        except ImportError:
            print("Please install facenet-pytorch: pip install facenet-pytorch")
            self.model = None
        
        self.face_embeddings = {}  # embeddings of known faces
        self.person_names = {}     # maps person ID to name
        self.next_person_id = 0
    
    def extract_face_embedding(self, face_image):
        """
        Extract a face embedding
        
        Args:
            face_image: face image (numpy array, BGR as cropped from an OpenCV frame)
            
        Returns:
            embedding: the face embedding vector
        """
        if self.model is None:
            return None
        
        # InceptionResnetV1 expects 160x160 RGB input
        face_image = cv2.resize(face_image, (160, 160))
        
        # Convert to a tensor
        if len(face_image.shape) == 3:
            face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
            face_tensor = torch.from_numpy(face_image).permute(2, 0, 1).unsqueeze(0)
        else:
            face_tensor = torch.from_numpy(face_image).unsqueeze(0).unsqueeze(0)
        
        face_tensor = face_tensor.float().to(self.device)
        
        # Fixed standardization, as used in FaceNet preprocessing
        face_tensor = (face_tensor - 127.5) / 128
        
        # Extract the embedding
        with torch.no_grad():
            embedding = self.model(face_tensor)
        
        return embedding.cpu().numpy()
    
    def register_face(self, face_image, person_name):
        """
        Register a new face
        
        Args:
            face_image: face image
            person_name: the person's name
        """
        embedding = self.extract_face_embedding(face_image)
        
        if embedding is not None:
            person_id = self.next_person_id
            self.face_embeddings[person_id] = embedding
            self.person_names[person_id] = person_name
            self.next_person_id += 1
            
            return person_id
        
        return None
    
    def recognize_face(self, face_image, threshold=0.6):
        """
        Recognize a face
        
        Args:
            face_image: face image
            threshold: similarity threshold
            
        Returns:
            person_name: the recognized name (if any)
            confidence: the similarity score
        """
        if not self.face_embeddings:
            return "Unknown", 0.0
        
        embedding = self.extract_face_embedding(face_image)
        
        if embedding is None:
            return "Unknown", 0.0
        
        min_distance = float('inf')
        best_person_id = None
        
        # Compute the distance to every known face
        for person_id, stored_embedding in self.face_embeddings.items():
            distance = euclidean(embedding.flatten(), stored_embedding.flatten())
            
            if distance < min_distance:
                min_distance = distance
                best_person_id = person_id
        
        # Convert distance to a similarity score
        similarity = 1 / (1 + min_distance)
        
        if similarity > threshold and best_person_id is not None:
            person_name = self.person_names[best_person_id]
            return person_name, similarity
        else:
            return "Unknown", similarity


class FaceDetector:
    """Face detector using MediaPipe"""
    
    def __init__(self):
        import mediapipe as mp
        
        self.mp_face_detection = mp.solutions.face_detection
        self.face_detection = self.mp_face_detection.FaceDetection(
            model_selection=0,
            min_detection_confidence=0.5
        )
    
    def detect_faces(self, frame):
        """
        Detect faces in an image
        
        Args:
            frame: input image
            
        Returns:
            faces: list of faces, each with a bbox and a confidence score
        """
        h, w, c = frame.shape
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        results = self.face_detection.process(frame_rgb)
        
        faces = []
        if results.detections:
            for detection in results.detections:
                bbox = detection.location_data.relative_bounding_box
                
                x1 = int(bbox.xmin * w)
                y1 = int(bbox.ymin * h)
                x2 = int((bbox.xmin + bbox.width) * w)
                y2 = int((bbox.ymin + bbox.height) * h)
                
                confidence = detection.score[0]
                
                faces.append({
                    'bbox': (x1, y1, x2, y2),
                    'confidence': confidence,
                    'face_image': frame[max(0, y1):min(h, y2), max(0, x1):min(w, x2)]
                })
        
        return faces
    
    def draw_faces(self, frame, faces, names_and_confidence=None):
        """
        Draw face detection results on an image
        
        Args:
            frame: input image
            faces: list of faces
            names_and_confidence: list of (name, confidence) pairs
            
        Returns:
            frame: the annotated image
        """
        for i, face in enumerate(faces):
            x1, y1, x2, y2 = face['bbox']
            
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            
            if names_and_confidence:
                name, conf = names_and_confidence[i]
                label = f"{name}: {conf:.2f}"
                cv2.putText(frame, label, (x1, y1 - 10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        
        return frame


# Usage example
def face_recognition_demo():
    detector = FaceDetector()
    recognizer = FaceRecognizer()
    
    cap = cv2.VideoCapture(0)
    
    # Register a face once at the start
    registered = False
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Detect faces
        faces = detector.detect_faces(frame)
        
        # If no face has been registered yet, register the first one
        if not registered and len(faces) > 0:
            recognizer.register_face(faces[0]['face_image'], 'Person_1')
            registered = True
        
        # Recognize faces
        names_and_confidence = []
        for face in faces:
            name, conf = recognizer.recognize_face(face['face_image'])
            names_and_confidence.append((name, conf))
        
        # Draw the results
        frame = detector.draw_faces(frame, faces, names_and_confidence)
        
        cv2.imshow('Face Recognition', frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

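Embeddings registered this way live only in memory, so every restart loses them. A minimal persistence sketch, assuming plain unencrypted storage in an .npz file is acceptable (the file name and helper functions are illustrative, not part of the original code):

python
def save_known_faces(recognizer, path='known_faces.npz'):
    """Persist registered embeddings and names to disk (sketch)."""
    ids = list(recognizer.face_embeddings.keys())
    np.savez(path,
             ids=np.array(ids),
             embeddings=np.array([recognizer.face_embeddings[i].flatten() for i in ids]),
             names=np.array([recognizer.person_names[i] for i in ids]))


def load_known_faces(recognizer, path='known_faces.npz'):
    """Restore embeddings saved by save_known_faces."""
    data = np.load(path)
    for pid, emb, name in zip(data['ids'], data['embeddings'], data['names']):
        recognizer.face_embeddings[int(pid)] = emb
        recognizer.person_names[int(pid)] = str(name)
    if len(data['ids']):
        recognizer.next_person_id = int(data['ids'].max()) + 1
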
Project 4: Gesture Recognition and Interaction

Gesture Recognition with MediaPipe Hands

python
import mediapipe as mp
import cv2
import numpy as np
from collections import deque

class GestureRecognizer:
    """Gesture recognizer"""
    
    def __init__(self, history_length=30):
        self.mp_hands = mp.solutions.hands
        self.hands = self.mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=2,
            min_detection_confidence=0.7,
            min_tracking_confidence=0.5
        )
        
        self.mp_drawing = mp.solutions.drawing_utils
        self.history_length = history_length
        self.hand_history = deque(maxlen=history_length)
    
    def detect_hands(self, frame):
        """
        Detect hands in an image
        
        Args:
            frame: input image
            
        Returns:
            hands: hand detection results
        """
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.hands.process(frame_rgb)
        
        hands = []
        if results.multi_hand_landmarks:
            for i, hand_landmarks in enumerate(results.multi_hand_landmarks):
                handedness = results.multi_handedness[i].classification[0].label
                
                hand_info = {
                    'landmarks': hand_landmarks.landmark,
                    'landmarks_proto': hand_landmarks,  # kept for draw_landmarks()
                    'handedness': handedness,
                    'hand_id': i
                }
                hands.append(hand_info)
        
        return hands
    
    def recognize_gesture(self, hand):
        """
        Recognize a gesture
        
        Args:
            hand: hand information
            
        Returns:
            gesture: the recognized gesture
        """
        landmarks = hand['landmarks']
        
        # Determine the state of each finger
        finger_states = self._get_finger_states(landmarks)
        
        # Map finger states to a gesture
        if all(finger_states):  # all five fingers extended
            return 'Open_Hand'
        elif not any(finger_states):  # all fingers curled
            return 'Fist'
        elif finger_states[0] and not any(finger_states[1:]):  # only the thumb extended
            return 'Thumbs_Up'
        elif finger_states[1] and not any([finger_states[i] for i in [0, 2, 3, 4]]):  # only the index finger extended
            return 'Pointing'
        elif finger_states[1] and finger_states[2] and not any([finger_states[i] for i in [0, 3, 4]]):  # index and middle fingers extended
            return 'Victory'
        else:
            return 'Unknown'
    
    def _get_finger_states(self, landmarks):
        """
        Determine whether each finger is extended or curled
        
        Args:
            landmarks: hand landmarks
            
        Returns:
            finger_states: list of states for the 5 fingers (True = extended)
        """
        # Hand landmark indices:
        # 0: wrist, 1-4: thumb, 5-8: index, 9-12: middle, 13-16: ring, 17-20: pinky
        
        def dist(a, b):
            return ((a.x - b.x) ** 2 + (a.y - b.y) ** 2) ** 0.5
        
        finger_states = []
        
        # Thumb: a tip-above-joint test fails because the thumb extends
        # sideways, so treat it as extended when its tip (4) is farther
        # from the index MCP (5) than its IP joint (3) is
        finger_states.append(dist(landmarks[4], landmarks[5]) > dist(landmarks[3], landmarks[5]))
        
        # Other fingers: extended if the tip is above the PIP joint
        # (smaller y); this assumes the hand is roughly upright in the frame
        finger_tips = [8, 12, 16, 20]
        finger_pips = [6, 10, 14, 18]
        
        for tip_idx, pip_idx in zip(finger_tips, finger_pips):
            finger_states.append(landmarks[tip_idx].y < landmarks[pip_idx].y)
        
        return finger_states
    
    def draw_hands(self, frame, hands, gestures=None):
        """
        Draw hand detection results on an image
        
        Args:
            frame: input image
            hands: hand detection results
            gestures: recognized gestures
            
        Returns:
            frame: the annotated image
        """
        for i, hand in enumerate(hands):
            # Draw the landmarks and connections
            # (draw_landmarks needs the landmark list proto, not the raw list)
            self.mp_drawing.draw_landmarks(
                frame,
                hand['landmarks_proto'],
                self.mp_hands.HAND_CONNECTIONS,
                self.mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2),
                self.mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2)
            )
            
            # Draw the gesture label
            if gestures:
                gesture = gestures[i]
                cv2.putText(frame, f"{hand['handedness']}: {gesture}", (10, 30 + i * 30),
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        return frame


# Usage example
def gesture_recognition_demo():
    recognizer = GestureRecognizer()
    cap = cv2.VideoCapture(0)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Detect hands
        hands = recognizer.detect_hands(frame)
        
        # Recognize gestures
        gestures = []
        for hand in hands:
            gesture = recognizer.recognize_gesture(hand)
            gestures.append(gesture)
        
        # Draw the results
        frame = recognizer.draw_hands(frame, hands, gestures)
        
        cv2.imshow('Gesture Recognition', frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

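Per-frame gesture labels flicker while a hand is mid-pose. A small temporal filter fixes most of this; the sketch below is illustrative (it reuses the same deque idea as the so-far-unused hand_history buffer in GestureRecognizer) and only reports a gesture once it wins a majority vote over recent frames.

python
from collections import Counter, deque

class GestureSmoother:
    """Debounce per-frame gesture labels with a majority vote (sketch)."""
    
    def __init__(self, window=10, min_votes=6):
        self.window = deque(maxlen=window)
        self.min_votes = min_votes
    
    def update(self, gesture):
        """Feed one per-frame label; return the stable gesture or 'Unknown'."""
        self.window.append(gesture)
        label, votes = Counter(self.window).most_common(1)[0]
        return label if votes >= self.min_votes else 'Unknown'

# Usage inside the demo loop (one smoother per hand index):
#   stable = smoother.update(recognizer.recognize_gesture(hand))
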
Visual Recognition Applications in LeBot

An Integrated Vision System

python
import time

import cv2

class LeBotVisionSystem:
    """LeBot integrated vision system"""
    
    def __init__(self):
        self.object_detector = YOLOv8ObjectDetector()
        self.object_tracker = ObjectTracker()
        self.segmenter = SemanticSegmenter()
        self.gesture_recognizer = GestureRecognizer()
        self.face_recognizer = FaceRecognizer()
        self.face_detector = FaceDetector()
        
        self.robot_controller = None  # robot control interface
    
    def process_frame(self, frame):
        """
        Process a single frame
        
        Args:
            frame: input frame
            
        Returns:
            results: dictionary with all recognition results
        """
        results = {
            'objects': [],
            'gestures': [],
            'faces': [],
            'segmentation': None,
            'timestamp': time.time()
        }
        
        # Object detection and tracking
        detections, _ = self.object_detector.detect(frame)
        tracked_objects = self.object_tracker.update(detections)
        results['objects'] = list(tracked_objects.values())
        
        # Gesture recognition
        hands = self.gesture_recognizer.detect_hands(frame)
        for hand in hands:
            gesture = self.gesture_recognizer.recognize_gesture(hand)
            results['gestures'].append({
                'hand': hand['handedness'],
                'gesture': gesture
            })
        
        # Face recognition
        face_detections = self.face_detector.detect_faces(frame)
        for face in face_detections:
            name, confidence = self.face_recognizer.recognize_face(face['face_image'])
            results['faces'].append({
                'name': name,
                'confidence': confidence,
                'bbox': face['bbox']
            })
        
        # Semantic segmentation (expensive; enable when needed)
        # results['segmentation'] = self.segmenter.segment(frame)
        
        return results
    
    def react_to_gestures(self, gestures):
        """
        React to recognized gestures
        
        Args:
            gestures: the recognized gestures
        """
        for gesture_info in gestures:
            gesture = gesture_info['gesture']
            
            if gesture == 'Pointing':
                print("User is pointing in a direction")
                # Move the robot in the indicated direction
            
            elif gesture == 'Open_Hand':
                print("User shows an open palm: stop the robot")
                if self.robot_controller:
                    self.robot_controller.stop()
            
            elif gesture == 'Thumbs_Up':
                print("User approves: execute the current task")
                if self.robot_controller:
                    self.robot_controller.execute_task()
            
            elif gesture == 'Fist':
                print("User makes a fist: put the robot on standby")
                if self.robot_controller:
                    self.robot_controller.standby()
    
    def follow_person(self, person_name):
        """
        Make the robot follow a specific person
        
        Args:
            person_name: the person's name
        """
        # Look up the person in the recognition results
        # Compute their position relative to the robot
        # Command the robot to follow them
        # (a steering sketch appears after the usage example below)
        pass
    
    def avoid_obstacles(self, objects):
        """
        Make the robot avoid obstacles
        
        Args:
            objects: the detected objects
        """
        # Decide whether any object is an obstacle
        # If so, compute an avoidance path
        # Send the command to the robot
        pass


# Usage example
def run_vision_system():
    vision_system = LeBotVisionSystem()
    cap = cv2.VideoCapture(0)
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Process the frame
        results = vision_system.process_frame(frame)
        
        # React to the recognition results
        vision_system.react_to_gestures(results['gestures'])
        vision_system.avoid_obstacles(results['objects'])
        
        # Display the result
        cv2.imshow('LeBot Vision System', frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

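follow_person is left as a stub because it depends on the robot control interface. As one illustration of how recognition results could drive motion (the turn/forward command semantics below are assumptions, since the source does not define the robot_controller API), this sketch steers toward a recognized person by centering their bounding box and keeping it at a target apparent size:

python
def follow_person_sketch(results, frame_width, target_name='Person_1'):
    """Turn a recognized face into (turn, forward) commands (sketch).
    
    Returns values in [-1, 1]; positive turn means turn right, positive
    forward means approach. These semantics are illustrative only.
    """
    for face in results['faces']:
        if face['name'] != target_name:
            continue
        x1, y1, x2, y2 = face['bbox']
        
        # Horizontal offset of the face center from the image center
        center_x = (x1 + x2) / 2
        turn = (center_x - frame_width / 2) / (frame_width / 2)
        
        # Apparent face width as a rough distance proxy:
        # smaller than the target width means the person is far away
        face_width = x2 - x1
        target_width = frame_width * 0.15  # tune for the desired following distance
        forward = max(-1.0, min(1.0, (target_width - face_width) / target_width))
        
        return turn, forward
    
    return 0.0, 0.0  # target not visible: stay put
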
Performance Optimization Tips

1. Real-Time Processing Optimization

python
class OptimizedVisionPipeline:
    """Optimized vision processing pipeline"""
    
    def __init__(self, skip_frames=2):
        self.detector = YOLOv8ObjectDetector()
        self.skip_frames = skip_frames
        self.frame_count = 0
        self.last_detections = []  # reused on skipped frames
    
    def process_optimized(self, frame):
        """
        Optimized processing flow
        
        Args:
            frame: input frame
            
        Returns:
            results: processing results
        """
        self.frame_count += 1
        
        results = {}
        
        # Run the heavy detector only every skip_frames frames
        if self.frame_count % self.skip_frames == 0:
            detections, _ = self.detector.detect(frame)
            self.last_detections = detections
        
        # Lightweight work (e.g. tracking) can run every frame,
        # reusing the most recent detections in between
        results['detections'] = self.last_detections
        
        return results

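Another cheap win is lowering the detector's input resolution. With ultralytics you can pass a smaller imgsz to the model call; the generic sketch below instead downscales the frame before detection and rescales the boxes afterward, which works with any detector exposing the detect() interface above (the helper is illustrative, not part of the original code).

python
def detect_downscaled(detector, frame, scale=0.5):
    """Run detection on a downscaled frame and map boxes back (sketch)."""
    small = cv2.resize(frame, None, fx=scale, fy=scale)
    detections, _ = detector.detect(small)
    
    # Map boxes and centers back to full-resolution coordinates
    for det in detections:
        x1, y1, x2, y2 = det['bbox']
        det['bbox'] = (int(x1 / scale), int(y1 / scale),
                       int(x2 / scale), int(y2 / scale))
        cx, cy = det['center']
        det['center'] = (int(cx / scale), int(cy / scale))
    
    return detections
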
2. Multithreaded Processing

python
import threading
from queue import Queue, Full, Empty

class ThreadedVisionSystem:
    """Multithreaded vision system"""
    
    def __init__(self):
        self.frame_queue = Queue(maxsize=2)
        self.result_queue = Queue()
        
        self.processing_thread = threading.Thread(target=self._process_worker, daemon=True)
        self.processing_thread.start()
    
    def _process_worker(self):
        """Worker thread"""
        detector = YOLOv8ObjectDetector()
        
        while True:
            frame = self.frame_queue.get()
            if frame is None:  # sentinel value: shut the worker down
                break
            
            detections, _ = detector.detect(frame)
            self.result_queue.put(detections)
    
    def add_frame(self, frame):
        """Add a frame to the processing queue"""
        try:
            self.frame_queue.put_nowait(frame)
        except Full:
            pass  # drop the frame rather than stall the capture loop
    
    def get_results(self):
        """Fetch processing results, if any are ready"""
        try:
            return self.result_queue.get_nowait()
        except Empty:
            return None

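A capture loop built on this class stays responsive because grabbing frames never blocks on inference; the most recent detections simply lag the display by a frame or two. A usage sketch:

python
def threaded_demo():
    system = ThreadedVisionSystem()
    cap = cv2.VideoCapture(0)
    last_detections = []
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Hand the frame to the worker; the loop continues immediately
        system.add_frame(frame)
        
        # Draw the freshest detections available
        detections = system.get_results()
        if detections is not None:
            last_detections = detections
        for det in last_detections:
            x1, y1, x2, y2 = det['bbox']
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        
        cv2.imshow('Threaded Vision', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    system.frame_queue.put(None)  # stop the worker
    cap.release()
    cv2.destroyAllWindows()
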
Summary

Visual recognition is a key capability in the LeBot project. By integrating multiple recognition algorithms, LeBot can:

  1. Understand the environment - interpret its surroundings through object detection and segmentation
  2. Track objects - continuously follow targets of interest
  3. Recognize people - identify and follow specific people
  4. Understand gestures - interact with humans through hand gestures
  5. Make decisions - act intelligently on recognition results

Written by the LeBot development team