Hands-On Visual Recognition
Overview of Visual Recognition
Visual recognition is a core application of computer vision: automatically identifying and classifying objects, people, and scenes in images or video. In the LeBot wheel-legged robot project, visual recognition lets the robot understand its surroundings and make intelligent decisions.
Main Visual Recognition Tasks
- Object Detection - locate objects in an image and put bounding boxes around them
- Image Classification - decide which category an image belongs to
- Semantic Segmentation - assign a class label to every pixel
- Instance Segmentation - detect and segment each individual object instance
- Face Recognition - identify faces in an image
- Gesture Recognition - recognize human hand gestures
Hands-On 1: Object Detection and Tracking
Real-Time Object Detection with YOLOv8
python
import cv2
import torch
import numpy as np
import time

class YOLOv8ObjectDetector:
    """Object detector based on YOLOv8."""

    def __init__(self, model_name='yolov8n.pt', conf_threshold=0.5):
        """
        Initialize the detector.
        Args:
            model_name: model weights (nano/small/medium/large/xlarge variants)
            conf_threshold: confidence threshold
        """
        # Try to use the ultralytics YOLO implementation
        try:
            from ultralytics import YOLO
            self.model = YOLO(model_name)
            self.use_ultralytics = True
        except ImportError:
            print("Please install ultralytics: pip install ultralytics")
            self.use_ultralytics = False
        self.conf_threshold = conf_threshold
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

    def detect(self, frame):
        """
        Detect objects in an image.
        Args:
            frame: input image (BGR format)
        Returns:
            detections: list of detection results
            frame: image with detection boxes drawn on it
        """
        if not self.use_ultralytics:
            return [], frame
        # Run detection
        results = self.model(frame, conf=self.conf_threshold, device=self.device)
        detections = []
        # Parse the results
        for result in results:
            boxes = result.boxes
            for box in boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = box.conf[0].item()
                cls_id = int(box.cls[0].item())
                cls_name = self.model.names[cls_id]
                detection = {
                    'bbox': (x1, y1, x2, y2),
                    'confidence': conf,
                    'class_id': cls_id,
                    'class_name': cls_name,
                    'center': ((x1 + x2) // 2, (y1 + y2) // 2),
                    'area': (x2 - x1) * (y2 - y1)
                }
                detections.append(detection)
                # Draw the detection box
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"{cls_name}: {conf:.2f}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
        return detections, frame

    def filter_detections(self, detections, target_classes=None, min_conf=0.5):
        """
        Filter detection results.
        Args:
            detections: list of detection results
            target_classes: list of target classes (None means all classes)
            min_conf: minimum confidence
        Returns:
            filtered_detections: filtered detection results
        """
        filtered = []
        for det in detections:
            if det['confidence'] < min_conf:
                continue
            if target_classes and det['class_name'] not in target_classes:
                continue
            filtered.append(det)
        return filtered
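# --- Quick usage sketch (hedged): assumes the yolov8n.pt weights download on
# --- first use and that a local image file named 'test.jpg' exists.
# detector = YOLOv8ObjectDetector('yolov8n.pt', conf_threshold=0.4)
# image = cv2.imread('test.jpg')
# detections, annotated = detector.detect(image)
# people = detector.filter_detections(detections, target_classes=['person'])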
class ObjectTracker:
    """Object tracker."""

    def __init__(self, max_history=30, distance_threshold=50):
        """
        Initialize the tracker.
        Args:
            max_history: maximum length of the track history
            distance_threshold: distance threshold for matching
        """
        self.tracked_objects = {}
        self.next_id = 0
        self.max_history = max_history
        self.distance_threshold = distance_threshold

    def update(self, detections):
        """
        Update the tracks.
        Args:
            detections: detection results for the current frame
        Returns:
            tracked_objects: dictionary of tracked objects
        """
        if not detections:
            # Nothing detected; update the tracking state
            for obj_id in list(self.tracked_objects.keys()):
                self.tracked_objects[obj_id]['missing_frames'] += 1
                # Drop tracks that have been missing too long
                if self.tracked_objects[obj_id]['missing_frames'] > 30:
                    del self.tracked_objects[obj_id]
            return self.tracked_objects
        # Match current detections to the tracks from the previous frame
        current_centers = [d['center'] for d in detections]
        tracked_centers = [obj['center'] for obj in self.tracked_objects.values()]
        # Compute the distance matrix
        if tracked_centers:
            distances = np.zeros((len(detections), len(tracked_centers)))
            for i, curr_center in enumerate(current_centers):
                for j, tracked_center in enumerate(tracked_centers):
                    dist = np.sqrt((curr_center[0] - tracked_center[0])**2 +
                                   (curr_center[1] - tracked_center[1])**2)
                    distances[i, j] = dist
            # Simple greedy matching
            matched_pairs = self._match_detections(distances)
            # Update the matched objects
            matched_det_indices = set()
            matched_obj_ids = set()
            for det_idx, obj_id in matched_pairs:
                matched_det_indices.add(det_idx)
                matched_obj_ids.add(obj_id)
                detection = detections[det_idx]
                self.tracked_objects[obj_id]['center'] = detection['center']
                self.tracked_objects[obj_id]['bbox'] = detection['bbox']
                self.tracked_objects[obj_id]['class_name'] = detection['class_name']
                self.tracked_objects[obj_id]['confidence'] = detection['confidence']
                self.tracked_objects[obj_id]['missing_frames'] = 0
                self.tracked_objects[obj_id]['history'].append(detection['center'])
                if len(self.tracked_objects[obj_id]['history']) > self.max_history:
                    self.tracked_objects[obj_id]['history'].pop(0)
            # Age out tracks that found no match this frame (the original code
            # only aged tracks when nothing at all was detected)
            for obj_id in list(self.tracked_objects.keys()):
                if obj_id not in matched_obj_ids:
                    self.tracked_objects[obj_id]['missing_frames'] += 1
                    if self.tracked_objects[obj_id]['missing_frames'] > 30:
                        del self.tracked_objects[obj_id]
            # Create new tracks for unmatched detections
            for i, detection in enumerate(detections):
                if i not in matched_det_indices:
                    self.tracked_objects[self.next_id] = {
                        'id': self.next_id,
                        'center': detection['center'],
                        'bbox': detection['bbox'],
                        'class_name': detection['class_name'],
                        'confidence': detection['confidence'],
                        'missing_frames': 0,
                        'history': [detection['center']],
                        'first_detected_frame': 0
                    }
                    self.next_id += 1
        else:
            # No existing tracks, so every detection starts a new one
            for i, detection in enumerate(detections):
                self.tracked_objects[self.next_id] = {
                    'id': self.next_id,
                    'center': detection['center'],
                    'bbox': detection['bbox'],
                    'class_name': detection['class_name'],
                    'confidence': detection['confidence'],
                    'missing_frames': 0,
                    'history': [detection['center']],
                    'first_detected_frame': 0
                }
                self.next_id += 1
        return self.tracked_objects

    def _match_detections(self, distances):
        """Match detections to tracks with a greedy strategy."""
        matched_pairs = []
        matched_det_indices = set()
        matched_obj_indices = set()
        tracked_obj_ids = list(self.tracked_objects.keys())
        # Greedy matching: repeatedly take the closest unmatched pair
        while True:
            min_dist = float('inf')
            best_pair = None
            for i in range(distances.shape[0]):
                if i in matched_det_indices:
                    continue
                for j in range(distances.shape[1]):
                    if j in matched_obj_indices:
                        continue
                    if distances[i, j] < min_dist:
                        min_dist = distances[i, j]
                        best_pair = (i, j)
            if best_pair is None or min_dist > self.distance_threshold:
                break
            det_idx, obj_idx = best_pair
            matched_det_indices.add(det_idx)
            matched_obj_indices.add(obj_idx)
            obj_id = tracked_obj_ids[obj_idx]
            matched_pairs.append((det_idx, obj_id))
        return matched_pairs

    def draw_tracks(self, frame):
        """Draw the tracking results."""
        colors = [(0, 255, 0), (255, 0, 0), (0, 0, 255), (255, 255, 0),
                  (255, 0, 255), (0, 255, 255)]
        for obj in self.tracked_objects.values():
            if obj['missing_frames'] > 5:
                continue
            color = colors[obj['id'] % len(colors)]
            # Draw the bounding box
            x1, y1, x2, y2 = obj['bbox']
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            # Draw the ID and class
            label = f"ID: {obj['id']} {obj['class_name']}"
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            # Draw the trajectory
            history = obj['history']
            for i in range(1, len(history)):
                cv2.line(frame, history[i-1], history[i], color, 2)
        return frame
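# Greedy matching is easy to follow but not globally optimal: an early match
# can steal a track from a closer detection. A sketch of optimal assignment
# with the Hungarian algorithm (assumes SciPy is installed; uses the same
# distance matrix and threshold as the tracker above):
from scipy.optimize import linear_sum_assignment

def match_detections_hungarian(distances, tracked_obj_ids, distance_threshold):
    """Globally optimal detection-to-track matching; drop over-threshold pairs."""
    det_indices, obj_indices = linear_sum_assignment(distances)
    pairs = []
    for i, j in zip(det_indices, obj_indices):
        if distances[i, j] <= distance_threshold:
            pairs.append((i, tracked_obj_ids[j]))
    return pairs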
# Usage example
def detect_and_track_objects():
    detector = YOLOv8ObjectDetector('yolov8n.pt', conf_threshold=0.5)
    tracker = ObjectTracker()
    cap = cv2.VideoCapture(0)
    prev_time = time.time()
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Detect objects
        detections, frame = detector.detect(frame)
        # Filter the detections
        detections = detector.filter_detections(detections, min_conf=0.5)
        # Update the tracker
        tracked_objects = tracker.update(detections)
        # Draw the tracking results
        frame = tracker.draw_tracks(frame)
        # Show the measured frame rate (cap.get(cv2.CAP_PROP_FPS) only reports
        # the camera's nominal FPS and ignores processing time)
        now = time.time()
        fps = 1.0 / max(now - prev_time, 1e-6)
        prev_time = now
        cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow('Object Detection and Tracking', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
Hands-On 2: Semantic Segmentation and Environment Understanding
Semantic Segmentation with FCN
python
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt

class SemanticSegmenter:
    """Semantic segmenter."""

    def __init__(self, model_name='fcn_resnet50', num_classes=21):
        """
        Initialize the segmenter.
        Args:
            model_name: model name
            num_classes: number of classes (21 matches the PASCAL VOC weights)
        """
        from torchvision import models
        if model_name == 'fcn_resnet50':
            self.model = models.segmentation.fcn_resnet50(
                pretrained=True,
                num_classes=num_classes
            )
        elif model_name == 'deeplabv3':
            self.model = models.segmentation.deeplabv3_resnet50(
                pretrained=True,
                num_classes=num_classes
            )
        else:
            raise ValueError(f"Unsupported model: {model_name}")
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = self.model.to(self.device)
        self.model.eval()
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        # PASCAL VOC classes
        self.classes = [
            'background', 'aeroplane', 'bicycle', 'bird', 'boat',
            'bottle', 'bus', 'car', 'cat', 'chair', 'cow', 'diningtable',
            'dog', 'horse', 'motorbike', 'person', 'pottedplant',
            'sheep', 'sofa', 'train', 'tvmonitor'
        ]
    def segment(self, image):
        """
        Segment an image.
        Args:
            image: input image (numpy array or PIL Image)
        Returns:
            segmentation_map: per-pixel class indices
        """
        if isinstance(image, np.ndarray):
            image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        # Preprocess
        input_tensor = self.transform(image).unsqueeze(0).to(self.device)
        # Inference
        with torch.no_grad():
            output = self.model(input_tensor)
        # Take the per-pixel argmax to get the segmentation map
        segmentation_map = output['out'].argmax(1)[0].cpu().numpy()
        return segmentation_map

    def visualize_segmentation(self, image, segmentation_map):
        """
        Visualize the segmentation result.
        Args:
            image: original image
            segmentation_map: segmentation result
        Returns:
            vis_image: visualization image
        """
        # Build a color-coded segmentation image
        segmentation_color = np.zeros((*segmentation_map.shape, 3), dtype=np.uint8)
        colors = plt.cm.get_cmap('tab20')
        for i in range(len(self.classes)):
            mask = segmentation_map == i
            color = (np.array(colors(i)[:3]) * 255).astype(np.uint8)
            segmentation_color[mask] = color
        # Resize the original image to the segmentation map's size
        if isinstance(image, np.ndarray):
            image_resized = cv2.resize(image,
                (segmentation_map.shape[1], segmentation_map.shape[0]))
        else:
            image_resized = np.array(image.resize(
                (segmentation_map.shape[1], segmentation_map.shape[0])
            ))
        # Blend the two images
        vis_image = cv2.addWeighted(
            cv2.cvtColor(image_resized, cv2.COLOR_RGB2BGR) if isinstance(image, Image.Image) else image_resized,
            0.6,
            segmentation_color,
            0.4,
            0
        )
        return vis_image
    def extract_roi_by_class(self, image, class_name):
        """
        Extract an ROI by class.
        Args:
            image: input image
            class_name: class name
        Returns:
            roi: extracted region
            mask: corresponding mask
        """
        segmentation_map = self.segment(image)
        if class_name not in self.classes:
            print(f"Unknown class: {class_name}")
            return None, None
        class_id = self.classes.index(class_name)
        mask = (segmentation_map == class_id).astype(np.uint8) * 255
        # Apply the mask
        if isinstance(image, np.ndarray):
            roi = cv2.bitwise_and(image, image, mask=mask)
        else:
            image_array = np.array(image)
            roi = cv2.bitwise_and(image_array, image_array, mask=mask)
        return roi, mask
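# For coarse environment understanding, per-class pixel fractions are often
# enough (e.g. "how much of the view is 'person'?"). A minimal sketch built on
# the segmentation map returned by segment() above:
def class_fractions(segmentation_map, classes):
    """Return {class_name: pixel fraction} for the classes present in the map."""
    total = segmentation_map.size
    ids, counts = np.unique(segmentation_map, return_counts=True)
    return {classes[int(i)]: count / total for i, count in zip(ids, counts)}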
# Usage example
def segment_environment():
    segmenter = SemanticSegmenter('fcn_resnet50')
    cap = cv2.VideoCapture(0)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Downscale to speed up processing
        frame_resized = cv2.resize(frame, (640, 480))
        # Segment
        segmentation_map = segmenter.segment(frame_resized)
        # Visualize
        vis_image = segmenter.visualize_segmentation(frame_resized, segmentation_map)
        cv2.imshow('Semantic Segmentation', vis_image)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
Hands-On 3: Face Detection and Recognition
Face Recognition with FaceNet
python
import cv2
import torch
import numpy as np
from scipy.spatial.distance import euclidean

class FaceRecognizer:
    """Face recognizer."""

    def __init__(self, model_path=None):
        """
        Initialize the face recognizer.
        Args:
            model_path: path to pretrained weights (unused when loading vggface2)
        """
        try:
            from facenet_pytorch import InceptionResnetV1
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            self.model = InceptionResnetV1(pretrained='vggface2').eval().to(self.device)
        except ImportError:
            print("Please install facenet-pytorch: pip install facenet-pytorch")
            self.model = None
        self.face_embeddings = {}  # Embeddings of known faces
        self.person_names = {}     # Maps person ID to name
        self.next_person_id = 0

    def extract_face_embedding(self, face_image):
        """
        Extract a face embedding.
        Args:
            face_image: face image (numpy array)
        Returns:
            embedding: face embedding vector
        """
        if self.model is None:
            return None
        # FaceNet expects fixed-size crops; 160x160 is the canonical input
        face_image = cv2.resize(face_image, (160, 160))
        # Convert to a tensor
        if len(face_image.shape) == 3:
            face_tensor = torch.from_numpy(face_image).permute(2, 0, 1).unsqueeze(0)
        else:
            face_tensor = torch.from_numpy(face_image).unsqueeze(0).unsqueeze(0)
        face_tensor = face_tensor.float().to(self.device)
        # Normalize to roughly [-1, 1]
        face_tensor = (face_tensor - 127.5) / 128
        # Extract the embedding
        with torch.no_grad():
            embedding = self.model(face_tensor)
        return embedding.cpu().numpy()

    def register_face(self, face_image, person_name):
        """
        Register a new face.
        Args:
            face_image: face image
            person_name: the person's name
        """
        embedding = self.extract_face_embedding(face_image)
        if embedding is not None:
            person_id = self.next_person_id
            self.face_embeddings[person_id] = embedding
            self.person_names[person_id] = person_name
            self.next_person_id += 1
            return person_id
        return None
    def recognize_face(self, face_image, threshold=0.6):
        """
        Recognize a face.
        Args:
            face_image: face image
            threshold: similarity threshold
        Returns:
            person_name: the recognized name (if any)
            confidence: similarity score
        """
        if not self.face_embeddings:
            return "Unknown", 0.0
        embedding = self.extract_face_embedding(face_image)
        if embedding is None:
            return "Unknown", 0.0
        min_distance = float('inf')
        best_person_id = None
        # Compute the distance to every known face
        for person_id, stored_embedding in self.face_embeddings.items():
            distance = euclidean(embedding.flatten(), stored_embedding.flatten())
            if distance < min_distance:
                min_distance = distance
                best_person_id = person_id
        # Convert distance to a similarity score
        similarity = 1 / (1 + min_distance)
        if similarity > threshold and best_person_id is not None:
            person_name = self.person_names[best_person_id]
            return person_name, similarity
        else:
            return "Unknown", similarity
class FaceDetector:
    """Face detector based on MediaPipe."""

    def __init__(self):
        import mediapipe as mp
        self.mp_face_detection = mp.solutions.face_detection
        self.face_detection = self.mp_face_detection.FaceDetection(
            model_selection=0,
            min_detection_confidence=0.5
        )

    def detect_faces(self, frame):
        """
        Detect faces in an image.
        Args:
            frame: input image
        Returns:
            faces: list of faces, each with a bbox, a confidence, and the crop
        """
        h, w, c = frame.shape
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_detection.process(frame_rgb)
        faces = []
        if results.detections:
            for detection in results.detections:
                bbox = detection.location_data.relative_bounding_box
                x1 = int(bbox.xmin * w)
                y1 = int(bbox.ymin * h)
                x2 = int((bbox.xmin + bbox.width) * w)
                y2 = int((bbox.ymin + bbox.height) * h)
                confidence = detection.score[0]
                faces.append({
                    'bbox': (x1, y1, x2, y2),
                    'confidence': confidence,
                    'face_image': frame[max(0, y1):min(h, y2), max(0, x1):min(w, x2)]
                })
        return faces
    def draw_faces(self, frame, faces, names_and_confidence=None):
        """
        Draw face detection results on an image.
        Args:
            frame: input image
            faces: list of faces
            names_and_confidence: list of (name, confidence) pairs
        Returns:
            frame: annotated image
        """
        for i, face in enumerate(faces):
            x1, y1, x2, y2 = face['bbox']
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            if names_and_confidence:
                name, conf = names_and_confidence[i]
                label = f"{name}: {conf:.2f}"
                cv2.putText(frame, label, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        return frame
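# Note: facenet-pytorch also ships an MTCNN detector that returns crops already
# aligned and sized for InceptionResnetV1, which usually improves recognition
# accuracy over raw bounding-box crops. A hedged sketch:
# from facenet_pytorch import MTCNN
# from PIL import Image
# mtcnn = MTCNN(image_size=160)
# face_tensor = mtcnn(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
# # face_tensor is None when no face is found, else a (3, 160, 160) tensor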
# Usage example
def face_recognition_demo():
    detector = FaceDetector()
    recognizer = FaceRecognizer()
    cap = cv2.VideoCapture(0)
    # Register a face first
    registered = False
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Detect faces
        faces = detector.detect_faces(frame)
        # If no face has been registered yet, register the first non-empty crop
        if not registered and len(faces) > 0 and faces[0]['face_image'].size > 0:
            recognizer.register_face(faces[0]['face_image'], 'Person_1')
            registered = True
        # Recognize faces
        names_and_confidence = []
        for face in faces:
            name, conf = recognizer.recognize_face(face['face_image'])
            names_and_confidence.append((name, conf))
        # Draw the results
        frame = detector.draw_faces(frame, faces, names_and_confidence)
        cv2.imshow('Face Recognition', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
Hands-On 4: Gesture Recognition and Interaction
Gesture Recognition with MediaPipe Hands
python
import mediapipe as mp
import cv2
import numpy as np
from collections import deque

class GestureRecognizer:
    """Gesture recognizer."""

    def __init__(self, history_length=30):
        self.mp_hands = mp.solutions.hands
        self.hands = self.mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=2,
            min_detection_confidence=0.7,
            min_tracking_confidence=0.5
        )
        self.mp_drawing = mp.solutions.drawing_utils
        self.history_length = history_length
        self.hand_history = deque(maxlen=history_length)

    def detect_hands(self, frame):
        """
        Detect hands in an image.
        Args:
            frame: input image
        Returns:
            hands: hand detection results
        """
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.hands.process(frame_rgb)
        hands = []
        if results.multi_hand_landmarks:
            for i, hand_landmarks in enumerate(results.multi_hand_landmarks):
                handedness = results.multi_handedness[i].classification[0].label
                hand_info = {
                    'landmarks': hand_landmarks.landmark,
                    # Keep the original proto too; draw_landmarks() needs it
                    'landmark_list': hand_landmarks,
                    'handedness': handedness,
                    'hand_id': i
                }
                hands.append(hand_info)
        return hands
    def recognize_gesture(self, hand):
        """
        Recognize a gesture.
        Args:
            hand: hand info
        Returns:
            gesture: the recognized gesture
        """
        landmarks = hand['landmarks']
        # Work out which fingers are extended
        finger_states = self._get_finger_states(landmarks)
        # Map the finger states to a gesture
        if all(finger_states):  # All fingers extended
            return 'Open_Hand'
        elif not any(finger_states):  # All fingers curled
            return 'Fist'
        elif finger_states[0] and not any(finger_states[1:]):  # Only the thumb extended
            return 'Thumbs_Up'
        elif finger_states[1] and not any([finger_states[i] for i in [0, 2, 3, 4]]):  # Only the index finger extended
            return 'Pointing'
        elif finger_states[0] and finger_states[1] and not any([finger_states[i] for i in [2, 3, 4]]):
            return 'Victory'
        else:
            return 'Unknown'

    def _get_finger_states(self, landmarks):
        """
        Get each finger's state (extended/curled).
        Args:
            landmarks: hand landmarks
        Returns:
            finger_states: list of states for the 5 fingers
        """
        # Hand landmark indices:
        # 0: wrist, 1-4: thumb, 5-8: index, 9-12: middle, 13-16: ring, 17-20: pinky
        finger_tips = [4, 8, 12, 16, 20]
        finger_pips = [3, 7, 11, 15, 19]
        finger_states = []
        for tip_idx, pip_idx in zip(finger_tips, finger_pips):
            # A finger counts as extended if its tip is above the joint below it
            # (smaller y). This heuristic assumes an upright hand; the thumb in
            # particular extends sideways and is better judged on the x-axis.
            if landmarks[tip_idx].y < landmarks[pip_idx].y:
                finger_states.append(True)   # Extended
            else:
                finger_states.append(False)  # Curled
        return finger_states
    def draw_hands(self, frame, hands, gestures=None):
        """
        Draw hand detection results on an image.
        Args:
            frame: input image
            hands: hand detection results
            gestures: recognized gestures
        Returns:
            frame: annotated image
        """
        for i, hand in enumerate(hands):
            # Draw the landmarks and connections (draw_landmarks expects the
            # landmark-list proto, not the bare landmark sequence)
            self.mp_drawing.draw_landmarks(
                frame,
                hand['landmark_list'],
                self.mp_hands.HAND_CONNECTIONS,
                self.mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2),
                self.mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2)
            )
            # Draw the gesture label
            if gestures:
                gesture = gestures[i]
                cv2.putText(frame, f"{hand['handedness']}: {gesture}", (10, 30 + i * 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        return frame
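# Per-frame gesture labels tend to flicker; a simple fix is a majority vote
# over a short window (the class already keeps a so-far-unused hand_history
# deque suited to this kind of smoothing). A minimal sketch for one hand:
from collections import Counter

def smooth_gesture(history, new_gesture):
    """Append the latest label and return the majority vote over the window."""
    history.append(new_gesture)
    return Counter(history).most_common(1)[0][0]

# Usage: stable = smooth_gesture(recognizer.hand_history, gesture)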
# Usage example
def gesture_recognition_demo():
    recognizer = GestureRecognizer()
    cap = cv2.VideoCapture(0)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Detect hands
        hands = recognizer.detect_hands(frame)
        # Recognize gestures
        gestures = []
        for hand in hands:
            gesture = recognizer.recognize_gesture(hand)
            gestures.append(gesture)
        # Draw the results
        frame = recognizer.draw_hands(frame, hands, gestures)
        cv2.imshow('Gesture Recognition', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
Visual Recognition Applications in LeBot
An Integrated Visual Recognition System
python
import time

class LeBotVisionSystem:
    """LeBot's integrated vision system (assumes the classes above are in scope)."""

    def __init__(self):
        self.object_detector = YOLOv8ObjectDetector()
        self.object_tracker = ObjectTracker()
        self.segmenter = SemanticSegmenter()
        self.gesture_recognizer = GestureRecognizer()
        self.face_recognizer = FaceRecognizer()
        self.face_detector = FaceDetector()
        self.robot_controller = None  # Robot control interface

    def process_frame(self, frame):
        """
        Process a single frame.
        Args:
            frame: input frame
        Returns:
            results: dictionary with all recognition results
        """
        results = {
            'objects': [],
            'gestures': [],
            'faces': [],
            'segmentation': None,
            'timestamp': time.time()
        }
        # Object detection and tracking
        detections, _ = self.object_detector.detect(frame)
        tracked_objects = self.object_tracker.update(detections)
        results['objects'] = list(tracked_objects.values())
        # Gesture recognition
        hands = self.gesture_recognizer.detect_hands(frame)
        for hand in hands:
            gesture = self.gesture_recognizer.recognize_gesture(hand)
            results['gestures'].append({
                'hand': hand['handedness'],
                'gesture': gesture
            })
        # Face recognition
        face_detections = self.face_detector.detect_faces(frame)
        for face in face_detections:
            name, confidence = self.face_recognizer.recognize_face(face['face_image'])
            results['faces'].append({
                'name': name,
                'confidence': confidence,
                'bbox': face['bbox']
            })
        # Semantic segmentation (disabled by default; it is expensive per frame)
        # results['segmentation'] = self.segmenter.segment(frame)
        return results
    def react_to_gestures(self, gestures):
        """
        React to recognized gestures.
        Args:
            gestures: the recognized gestures
        """
        for gesture_info in gestures:
            gesture = gesture_info['gesture']
            if gesture == 'Pointing':
                print("User is pointing in a direction")
                # Move the robot in the indicated direction
            elif gesture == 'Open_Hand':
                print("User shows an open palm: stop the robot")
                if self.robot_controller:
                    self.robot_controller.stop()
            elif gesture == 'Thumbs_Up':
                print("User approves: execute the current task")
                if self.robot_controller:
                    self.robot_controller.execute_task()
            elif gesture == 'Fist':
                print("User makes a fist: put the robot on standby")
                if self.robot_controller:
                    self.robot_controller.standby()

    def follow_person(self, person_name):
        """
        Make the robot follow a specific person.
        Args:
            person_name: the person's name
        """
        # Find the person in the recognition results
        # Compute the person's position relative to the robot
        # Command the robot to follow them
        pass

    def avoid_obstacles(self, objects):
        """
        Make the robot avoid obstacles.
        Args:
            objects: detected objects
        """
        # Check whether there are obstacles
        # If so, plan an avoidance path
        # Send the command to the robot
        pass
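# A sketch of the logic follow_person could use: steer toward the target face's
# horizontal offset from the image center. robot_controller and its turn()/
# forward() methods are hypothetical placeholders here, not a real LeBot API.
def follow_person_sketch(vision_system, person_name, faces, frame_width):
    for face in faces:
        if face['name'] == person_name:
            x1, _, x2, _ = face['bbox']
            # Normalized offset in [-1, 1]; negative means target is left of center
            offset = ((x1 + x2) / 2 - frame_width / 2) / (frame_width / 2)
            if vision_system.robot_controller:
                vision_system.robot_controller.turn(offset)   # hypothetical API
                vision_system.robot_controller.forward(0.2)   # hypothetical API
            return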
# Usage example
def run_vision_system():
    vision_system = LeBotVisionSystem()
    cap = cv2.VideoCapture(0)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Process the frame
        results = vision_system.process_frame(frame)
        # React to the recognition results
        vision_system.react_to_gestures(results['gestures'])
        vision_system.avoid_obstacles(results['objects'])
        # Show the result
        cv2.imshow('LeBot Vision System', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
Performance Optimization Tips
1. Real-Time Processing Optimization
python
class OptimizedVisionPipeline:
    """Optimized vision processing pipeline."""

    def __init__(self, skip_frames=2):
        self.detector = YOLOv8ObjectDetector()
        self.skip_frames = skip_frames
        self.frame_count = 0

    def process_optimized(self, frame):
        """
        Optimized processing flow.
        Args:
            frame: input frame
        Returns:
            results: processing results
        """
        self.frame_count += 1
        results = {}
        # Run the heavy processing only every skip_frames frames
        if self.frame_count % self.skip_frames == 0:
            # Detection
            detections, _ = self.detector.detect(frame)
            results['detections'] = detections
        # Lightweight processing (e.g. tracking) can run on every frame
        return results
2. Multithreaded Processing
python
import threading
from queue import Queue, Empty, Full

class ThreadedVisionSystem:
    """Multithreaded vision system."""

    def __init__(self):
        self.frame_queue = Queue(maxsize=2)
        self.result_queue = Queue()
        self.processing_thread = threading.Thread(target=self._process_worker, daemon=True)
        self.processing_thread.start()

    def _process_worker(self):
        """Processing thread."""
        detector = YOLOv8ObjectDetector()
        while True:
            frame = self.frame_queue.get()
            if frame is None:
                break
            detections, _ = detector.detect(frame)
            self.result_queue.put(detections)

    def add_frame(self, frame):
        """Add a frame to the processing queue (dropped when the queue is full)."""
        try:
            self.frame_queue.put_nowait(frame)
        except Full:
            pass

    def get_results(self):
        """Fetch processing results, if any are ready."""
        try:
            return self.result_queue.get_nowait()
        except Empty:
            return None
Summary
Visual recognition is a key capability of the LeBot project. By integrating several recognition algorithms, LeBot can:
- Understand the environment - interpret its surroundings through object detection and segmentation
- Track objects - continuously follow targets of interest
- Recognize people - identify and follow specific people
- Understand gestures - support human-robot interaction through hand gestures
- Make decisions - act intelligently based on the recognition results
Recommended Resources
- YOLOv8 documentation: https://docs.ultralytics.com/
- MediaPipe documentation: https://mediapipe.dev/
- OpenCV tutorials: https://docs.opencv.org/
- PyTorch vision models: https://pytorch.org/vision/