Skip to content

Python 进阶用法

概述

在前一章中,我们介绍了 Python 的基础知识,包括语法、数据结构和函数。现在我们将深入探讨 Python 的高级特性,这些特性对于开发复杂的机器人应用至关重要。本章涵盖面向对象编程、装饰器、生成器、上下文管理器、多线程、多进程等高级概念。

面向对象编程(OOP)进阶

类的设计原则

在 Python 中,一切皆对象。理解类的设计原则对于编写可维护的代码至关重要。

单一职责原则(Single Responsibility Principle)

每个类应该只有一个责任,这样可以使代码更易维护和测试。

python
# 不好的设计:一个类负责多个职责
class RobotController:
    """机器人控制器 - 职责过多"""
    
    def __init__(self):
        self.motors = []
        self.sensors = []
    
    def drive_motor(self, motor_id, speed):
        """驱动电机"""
        pass
    
    def read_sensor(self, sensor_id):
        """读取传感器"""
        pass
    
    def save_log(self, message):
        """保存日志"""
        pass
    
    def send_network_data(self, data):
        """发送网络数据"""
        pass


# 好的设计:分离职责
class Motor:
    """电机类 - 只负责电机控制"""
    
    def __init__(self, motor_id, max_speed=1000):
        self.motor_id = motor_id
        self.max_speed = max_speed
        self.current_speed = 0
    
    def set_speed(self, speed):
        """设置电机速度"""
        self.current_speed = max(-self.max_speed, min(speed, self.max_speed))
    
    def get_current_speed(self):
        """获取当前速度"""
        return self.current_speed


class Sensor:
    """传感器类 - 只负责传感器读取"""
    
    def __init__(self, sensor_id, sensor_type):
        self.sensor_id = sensor_id
        self.sensor_type = sensor_type
        self.data = 0
    
    def read(self):
        """读取传感器数据"""
        return self.data
    
    def update(self, value):
        """更新传感器数据"""
        self.data = value


class Logger:
    """日志类 - 只负责日志记录"""
    
    def __init__(self, filename):
        self.filename = filename
    
    def log(self, level, message):
        """记录日志"""
        with open(self.filename, 'a') as f:
            f.write(f"[{level}] {message}\n")


class RobotController:
    """机器人控制器 - 协调各个组件"""
    
    def __init__(self):
        self.motors = {}
        self.sensors = {}
        self.logger = Logger("robot.log")
    
    def add_motor(self, motor_id, max_speed):
        """添加电机"""
        self.motors[motor_id] = Motor(motor_id, max_speed)
    
    def add_sensor(self, sensor_id, sensor_type):
        """添加传感器"""
        self.sensors[sensor_id] = Sensor(sensor_id, sensor_type)
    
    def drive_motor(self, motor_id, speed):
        """驱动指定电机"""
        if motor_id in self.motors:
            self.motors[motor_id].set_speed(speed)
            self.logger.log("INFO", f"Motor {motor_id} set to {speed}")
    
    def read_sensor(self, sensor_id):
        """读取指定传感器"""
        if sensor_id in self.sensors:
            return self.sensors[sensor_id].read()
        return None

继承与多态

继承允许我们创建类的层级结构,重用代码。多态让我们用统一的接口处理不同的对象。

python
from abc import ABC, abstractmethod
from typing import List

# 定义抽象基类
class Robot(ABC):
    """机器人基类"""
    
    def __init__(self, name):
        self.name = name
        self.is_moving = False
    
    @abstractmethod
    def move(self, direction, speed):
        """移动机器人 - 子类必须实现"""
        pass
    
    @abstractmethod
    def stop(self):
        """停止机器人 - 子类必须实现"""
        pass
    
    def get_status(self):
        """获取状态"""
        return {
            'name': self.name,
            'moving': self.is_moving
        }


class WheelRobot(Robot):
    """轮式机器人"""
    
    def __init__(self, name, num_wheels=4):
        super().__init__(name)
        self.num_wheels = num_wheels
        self.motors = [0] * num_wheels
    
    def move(self, direction, speed):
        """轮式机器人的移动实现"""
        self.is_moving = True
        
        if direction == 'forward':
            self.motors = [speed] * self.num_wheels
        elif direction == 'backward':
            self.motors = [-speed] * self.num_wheels
        elif direction == 'left':
            # 左转:左轮减速,右轮加速
            self.motors[0] = speed * 0.5
            self.motors[1] = speed
            self.motors[2] = speed * 0.5
            self.motors[3] = speed
        elif direction == 'right':
            # 右转:右轮减速,左轮加速
            self.motors[0] = speed
            self.motors[1] = speed * 0.5
            self.motors[2] = speed
            self.motors[3] = speed * 0.5
    
    def stop(self):
        """停止机器人"""
        self.is_moving = False
        self.motors = [0] * self.num_wheels


class LegRobot(Robot):
    """腿式机器人"""
    
    def __init__(self, name, num_legs=4):
        super().__init__(name)
        self.num_legs = num_legs
        self.leg_positions = [(0, 0)] * num_legs
        self.gait_phase = 0
    
    def move(self, direction, speed):
        """腿式机器人的移动实现"""
        self.is_moving = True
        
        # 步态控制(简化示例)
        if direction == 'forward':
            self._update_gait(speed)
        elif direction == 'backward':
            self._update_gait(-speed)
    
    def _update_gait(self, speed):
        """更新步态"""
        self.gait_phase = (self.gait_phase + speed) % 100
    
    def stop(self):
        """停止机器人"""
        self.is_moving = False
        self.gait_phase = 0


class HybridRobot(WheelRobot, LegRobot):
    """轮腿混合机器人(LeBot 的简化模型)"""
    
    def __init__(self, name):
        # 初始化轮式部分
        WheelRobot.__init__(self, name, num_wheels=4)
        # 初始化腿部部分
        self.legs = [0] * 4
        self.current_mode = 'wheel'  # 'wheel' 或 'leg'
    
    def set_mode(self, mode):
        """切换运动模式"""
        if mode in ['wheel', 'leg']:
            self.current_mode = mode
    
    def move(self, direction, speed):
        """根据当前模式移动"""
        if self.current_mode == 'wheel':
            WheelRobot.move(self, direction, speed)
        else:
            LegRobot.move(self, direction, speed)


# 多态示例
def control_robot(robot: Robot, commands: List[tuple]):
    """
    控制任何类型的机器人
    
    参数:
        robot: Robot 的任何子类实例
        commands: 命令列表 [(direction, speed), ...]
    """
    for direction, speed in commands:
        robot.move(direction, speed)
        print(f"{robot.name} moving {direction} at {speed}")
    
    robot.stop()
    print(f"{robot.name} stopped")
    print(f"Status: {robot.get_status()}")


# 使用示例
if __name__ == "__main__":
    # 创建不同类型的机器人
    wheel_robot = WheelRobot("轮式机器人")
    leg_robot = LegRobot("腿式机器人")
    lebot = HybridRobot("LeBot")
    
    # 使用统一的接口控制它们
    robots = [wheel_robot, leg_robot, lebot]
    
    for robot in robots:
        commands = [('forward', 100), ('left', 50), ('forward', 80)]
        control_robot(robot, commands)
        print("-" * 50)

属性和描述符

属性允许我们在访问属性时执行自定义逻辑,而不需要显式调用方法。

python
class Motor:
    """电机类 - 使用属性管理速度"""
    
    def __init__(self, motor_id, max_speed=1000):
        self.motor_id = motor_id
        self.max_speed = max_speed
        self._speed = 0  # 私有属性
        self._access_count = 0
    
    @property
    def speed(self):
        """获取电机速度"""
        self._access_count += 1
        return self._speed
    
    @speed.setter
    def speed(self, value):
        """设置电机速度,自动限制在有效范围内"""
        if not isinstance(value, (int, float)):
            raise TypeError("速度必须是数字")
        
        if value < -self.max_speed or value > self.max_speed:
            print(f"警告:速度超出范围 [{-self.max_speed}, {self.max_speed}],已限制")
            self._speed = max(-self.max_speed, min(value, self.max_speed))
        else:
            self._speed = value
    
    @speed.deleter
    def speed(self):
        """删除速度(重置为0)"""
        print(f"重置电机 {self.motor_id} 的速度")
        self._speed = 0
    
    @property
    def access_count(self):
        """获取访问次数(只读属性)"""
        return self._access_count


# 使用自定义描述符实现更复杂的逻辑
class BoundedValue:
    """有界值描述符"""
    
    def __init__(self, min_val, max_val, initial=0):
        self.min_val = min_val
        self.max_val = max_val
        self.initial = initial
        self.values = {}
    
    def __get__(self, obj, objtype=None):
        """获取值"""
        if obj is None:
            return self
        
        return self.values.get(id(obj), self.initial)
    
    def __set__(self, obj, value):
        """设置值"""
        if not isinstance(value, (int, float)):
            raise TypeError(f"值必须是数字")
        
        bounded = max(self.min_val, min(value, self.max_val))
        if bounded != value:
            print(f"值 {value} 已限制到 [{self.min_val}, {self.max_val}]")
        
        self.values[id(obj)] = bounded
    
    def __delete__(self, obj):
        """删除值"""
        del self.values[id(obj)]


class AdvancedMotor:
    """使用描述符的高级电机类"""
    
    # 使用描述符定义约束属性
    speed = BoundedValue(min_val=-1000, max_val=1000, initial=0)
    temperature = BoundedValue(min_val=0, max_val=100, initial=25)
    
    def __init__(self, motor_id):
        self.motor_id = motor_id
    
    def __repr__(self):
        return f"AdvancedMotor(id={self.motor_id}, speed={self.speed}, temp={self.temperature})"


# 使用示例
if __name__ == "__main__":
    # 基础属性示例
    motor = Motor(1, max_speed=500)
    
    motor.speed = 300
    print(f"电机速度: {motor.speed}")
    
    motor.speed = 1000  # 超出范围,将被限制
    print(f"限制后的速度: {motor.speed}")
    
    print(f"访问次数: {motor.access_count}")
    
    del motor.speed  # 删除速度
    print(f"删除后的速度: {motor.speed}")
    
    print("-" * 50)
    
    # 描述符示例
    motor1 = AdvancedMotor(1)
    motor2 = AdvancedMotor(2)
    
    motor1.speed = 200
    motor1.temperature = 45
    
    motor2.speed = -300
    motor2.temperature = 60
    
    print(motor1)
    print(motor2)

装饰器

装饰器是一种强大的 Python 特性,允许我们在不修改原始函数的情况下修改其行为。

函数装饰器

python
import functools
import time
from typing import Callable, Any

def timing_decorator(func: Callable) -> Callable:
    """
    计时装饰器:测量函数执行时间
    
    参数:
        func: 被装饰的函数
        
    返回:
        包装后的函数
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> Any:
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        
        elapsed = end_time - start_time
        print(f"{func.__name__} 执行时间: {elapsed:.4f} 秒")
        
        return result
    
    return wrapper


def retry_decorator(max_retries: int = 3, delay: float = 1.0):
    """
    重试装饰器:如果函数失败则重试
    
    参数:
        max_retries: 最多重试次数
        delay: 重试之间的延迟(秒)
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt < max_retries - 1:
                        print(f"尝试 {attempt + 1} 失败: {e}{delay} 秒后重试...")
                        time.sleep(delay)
                    else:
                        print(f"所有 {max_retries} 次尝试均失败")
                        raise
        
        return wrapper
    
    return decorator


def log_decorator(func: Callable) -> Callable:
    """
    日志装饰器:记录函数调用
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> Any:
        print(f"调用函数: {func.__name__}")
        print(f"参数: args={args}, kwargs={kwargs}")
        
        result = func(*args, **kwargs)
        
        print(f"返回值: {result}")
        
        return result
    
    return wrapper


def validate_decorator(**validators):
    """
    验证装饰器:验证函数参数
    
    参数:
        validators: 参数名 -> 验证函数的映射
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # 获取函数签名中的参数名
            import inspect
            sig = inspect.signature(func)
            
            # 验证 kwargs 中的参数
            for param_name, validator in validators.items():
                if param_name in kwargs:
                    if not validator(kwargs[param_name]):
                        raise ValueError(f"参数 {param_name} 验证失败: {kwargs[param_name]}")
            
            return func(*args, **kwargs)
        
        return wrapper
    
    return decorator


# 在 LeBot 中应用装饰器的实例

class RobotController:
    """机器人控制器"""
    
    @timing_decorator
    def calculate_trajectory(self, start_pos, end_pos, duration):
        """计算轨迹(模拟耗时操作)"""
        time.sleep(0.5)  # 模拟计算
        return "trajectory_data"
    
    @retry_decorator(max_retries=3, delay=0.5)
    def connect_to_robot(self, ip, port):
        """连接到机器人(可能失败)"""
        # 模拟随机失败
        import random
        if random.random() < 0.7:
            raise ConnectionError(f"无法连接到 {ip}:{port}")
        return "connected"
    
    @log_decorator
    def set_motor_speed(self, motor_id, speed):
        """设置电机速度"""
        return f"Motor {motor_id} set to {speed}"
    
    @validate_decorator(
        motor_id=lambda x: isinstance(x, int) and 0 <= x < 4,
        speed=lambda x: isinstance(x, (int, float)) and -1000 <= x <= 1000
    )
    def set_motor_speed_validated(self, motor_id, speed):
        """设置电机速度(带验证)"""
        return f"Motor {motor_id} safely set to {speed}"


# 使用示例
if __name__ == "__main__":
    controller = RobotController()
    
    # 计时示例
    print("=== 计时示例 ===")
    controller.calculate_trajectory((0, 0), (1, 1), 10)
    
    print("\n=== 重试示例 ===")
    try:
        result = controller.connect_to_robot("192.168.1.100", 5000)
        print(f"结果: {result}")
    except ConnectionError as e:
        print(f"最终失败: {e}")
    
    print("\n=== 日志示例 ===")
    controller.set_motor_speed(0, 100)
    
    print("\n=== 验证示例 ===")
    try:
        controller.set_motor_speed_validated(0, 500)
        controller.set_motor_speed_validated(5, 500)  # 错误的 ID
    except ValueError as e:
        print(f"验证错误: {e}")

类装饰器

python
def singleton_decorator(cls):
    """
    单例装饰器:确保类只有一个实例
    """
    instances = {}
    
    @functools.wraps(cls)
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    
    return get_instance


@singleton_decorator
class ConfigManager:
    """配置管理器 - 单例"""
    
    def __init__(self):
        self.config = {}
    
    def set(self, key, value):
        self.config[key] = value
    
    def get(self, key, default=None):
        return self.config.get(key, default)


def dataclass_decorator(cls):
    """
    简单的数据类装饰器
    """
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)
    
    def __repr__(self):
        attrs = ', '.join(f"{k}={v}" for k, v in self.__dict__.items())
        return f"{cls.__name__}({attrs})"
    
    cls.__init__ = __init__
    cls.__repr__ = __repr__
    
    return cls


@dataclass_decorator
class MotorConfig:
    """电机配置"""
    pass


# 使用示例
if __name__ == "__main__":
    # 单例示例
    config1 = ConfigManager()
    config2 = ConfigManager()
    
    config1.set("robot_name", "LeBot")
    print(f"config1 ID: {id(config1)}")
    print(f"config2 ID: {id(config2)}")
    print(f"是同一实例: {config1 is config2}")
    print(f"config2 中的值: {config2.get('robot_name')}")
    
    print("\n=== 数据类示例 ===")
    motor_cfg = MotorConfig(
        motor_id=1,
        max_speed=1000,
        kp=10,
        ki=0.1,
        kd=1.0
    )
    print(motor_cfg)

生成器与迭代器

生成器是一种特殊的迭代器,允许我们以懒加载的方式生成值序列。

python
from typing import Generator, Iterator

class RobotDataStream:
    """机器人数据流处理"""
    
    def __init__(self, data_file):
        self.data_file = data_file
    
    def read_sensor_data(self) -> Generator[dict, None, None]:
        """
        读取传感器数据的生成器
        
        yield:逐个返回数据项,而不是一次性加载所有数据
        """
        with open(self.data_file, 'r') as f:
            for line in f:
                # 解析每一行数据
                parts = line.strip().split(',')
                if len(parts) >= 4:
                    try:
                        data = {
                            'timestamp': float(parts[0]),
                            'sensor_id': int(parts[1]),
                            'value': float(parts[2]),
                            'unit': parts[3]
                        }
                        yield data
                    except ValueError:
                        continue
    
    def process_real_time(self, duration: int) -> Generator[dict, None, None]:
        """
        实时处理数据的生成器
        """
        import random
        
        for i in range(duration):
            data = {
                'timestamp': time.time(),
                'motor_speeds': [random.uniform(-1000, 1000) for _ in range(4)],
                'imu_data': {
                    'ax': random.uniform(-9.8, 9.8),
                    'ay': random.uniform(-9.8, 9.8),
                    'az': random.uniform(-9.8, 9.8)
                }
            }
            yield data
            time.sleep(0.1)


def fibonacci_generator(n: int) -> Generator[int, None, None]:
    """
    斐波那契生成器
    """
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b


def sensor_data_aggregator(*generators) -> Generator[list, None, None]:
    """
    聚合多个数据源
    """
    import itertools
    
    for items in itertools.zip_longest(*generators, fillvalue=None):
        yield [item for item in items if item is not None]


# 使用示例
if __name__ == "__main__":
    # 斐波那契生成器
    print("=== 斐波那契数列 ===")
    for num in fibonacci_generator(10):
        print(num, end=' ')
    print("\n")
    
    # 内存效率对比
    print("=== 生成器的内存效率 ===")
    
    # 列表方式(一次性加载所有数据)
    def get_large_data_list(n):
        return [i**2 for i in range(n)]
    
    # 生成器方式(按需生成)
    def get_large_data_generator(n):
        for i in range(n):
            yield i**2
    
    import sys
    
    list_data = get_large_data_list(1000000)
    generator_data = get_large_data_generator(1000000)
    
    print(f"列表大小: {sys.getsizeof(list_data) / 1024:.2f} KB")
    print(f"生成器大小: {sys.getsizeof(generator_data) / 1024:.2f} KB")

上下文管理器

上下文管理器用于管理资源的获取和释放,使用 with 语句。

python
from contextlib import contextmanager
from typing import Generator, Any

class RobotConnection:
    """机器人连接上下文管理器"""
    
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.connected = False
    
    def __enter__(self):
        """进入上下文时调用"""
        print(f"正在连接到 {self.host}:{self.port}...")
        # 模拟连接
        time.sleep(0.5)
        self.connected = True
        print("连接成功")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """退出上下文时调用"""
        print("正在关闭连接...")
        self.connected = False
        print("连接已关闭")
        
        # 如果发生异常,exc_type 不为 None
        if exc_type is not None:
            print(f"发生异常: {exc_type.__name__}: {exc_val}")
            # 返回 True 抑制异常,False 让异常继续传播
            return False
        
        return True
    
    def send_command(self, command: str) -> str:
        """发送命令"""
        if not self.connected:
            raise RuntimeError("未连接")
        
        print(f"发送命令: {command}")
        # 模拟接收响应
        return f"Response to {command}"


@contextmanager
def robot_motor_control(motor_id: int) -> Generator[dict, None, None]:
    """
    使用装饰器的电机控制上下文
    """
    motor_state = {'enabled': True, 'speed': 0}
    
    print(f"启用电机 {motor_id}")
    
    try:
        yield motor_state
    finally:
        # 清理:关闭电机
        motor_state['enabled'] = False
        motor_state['speed'] = 0
        print(f"电机 {motor_id} 已禁用")


@contextmanager
def timing_context(name: str) -> Generator[dict, None, None]:
    """
    计时上下文管理器
    """
    start_time = time.time()
    stats = {'start': start_time}
    
    print(f"开始: {name}")
    
    try:
        yield stats
    finally:
        end_time = time.time()
        stats['end'] = end_time
        stats['elapsed'] = end_time - start_time
        print(f"完成: {name} (耗时 {stats['elapsed']:.4f} 秒)")


class ResourcePool:
    """资源池上下文管理器"""
    
    def __init__(self, size: int):
        self.size = size
        self.available = list(range(size))
        self.in_use = []
    
    def __enter__(self):
        """从池中获取资源"""
        if not self.available:
            raise RuntimeError("没有可用资源")
        
        resource = self.available.pop()
        self.in_use.append(resource)
        print(f"获取资源 {resource}")
        return resource
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """归还资源到池"""
        if self.in_use:
            resource = self.in_use.pop()
            self.available.append(resource)
            print(f"归还资源 {resource}")
        
        return False


# 使用示例
if __name__ == "__main__":
    print("=== 基础上下文管理器 ===")
    try:
        with RobotConnection("192.168.1.100", 5000) as robot:
            response = robot.send_command("forward")
            print(f"收到响应: {response}")
    except Exception as e:
        print(f"错误: {e}")
    
    print("\n=== 装饰器上下文管理器 ===")
    with robot_motor_control(1) as motor:
        motor['speed'] = 500
        print(f"电机状态: {motor}")
    
    print("\n=== 计时上下文 ===")
    with timing_context("轨迹计算") as stats:
        time.sleep(1)
        # 进行一些计算
    
    print("\n=== 资源池 ===")
    pool = ResourcePool(3)
    
    with pool as resource1:
        print(f"使用资源 {resource1}")
        with pool as resource2:
            print(f"使用资源 {resource2}")

多线程编程

对于 LeBot 这样的实时系统,多线程允许并发执行多个任务。

python
import threading
from queue import Queue
from typing import Callable, List

class ThreadPool:
    """简单的线程池"""
    
    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        self.task_queue = Queue()
        self.workers = []
        self.running = False
    
    def start(self):
        """启动线程池"""
        self.running = True
        
        for i in range(self.num_workers):
            worker = threading.Thread(
                target=self._worker,
                name=f"Worker-{i}",
                daemon=True
            )
            worker.start()
            self.workers.append(worker)
        
        print(f"线程池已启动,{self.num_workers} 个工作线程")
    
    def _worker(self):
        """工作线程的主循环"""
        while self.running:
            try:
                # 从队列中获取任务
                task, args, kwargs = self.task_queue.get(timeout=1)
                
                try:
                    result = task(*args, **kwargs)
                    print(f"{threading.current_thread().name} 完成: {result}")
                except Exception as e:
                    print(f"{threading.current_thread().name} 错误: {e}")
                
                self.task_queue.task_done()
            
            except:
                continue
    
    def submit_task(self, task: Callable, *args, **kwargs):
        """提交任务到线程池"""
        self.task_queue.put((task, args, kwargs))
    
    def wait_all(self):
        """等待所有任务完成"""
        self.task_queue.join()
    
    def stop(self):
        """停止线程池"""
        self.running = False
        
        for worker in self.workers:
            worker.join(timeout=2)
        
        print("线程池已停止")


class RobotMotorThread:
    """机器人电机线程"""
    
    def __init__(self, motor_id: int, control_freq: float = 100):
        self.motor_id = motor_id
        self.control_freq = control_freq
        self.target_speed = 0
        self.current_speed = 0
        self.running = False
        self.thread = None
        self.lock = threading.RLock()
    
    def set_speed(self, speed: float):
        """设置目标速度(线程安全)"""
        with self.lock:
            self.target_speed = max(-1000, min(speed, 1000))
    
    def get_speed(self) -> float:
        """获取当前速度(线程安全)"""
        with self.lock:
            return self.current_speed
    
    def start(self):
        """启动电机线程"""
        if self.running:
            return
        
        self.running = True
        self.thread = threading.Thread(
            target=self._control_loop,
            name=f"Motor-{self.motor_id}",
            daemon=True
        )
        self.thread.start()
        print(f"电机 {self.motor_id} 线程已启动")
    
    def _control_loop(self):
        """电机控制循环"""
        period = 1.0 / self.control_freq
        
        while self.running:
            with self.lock:
                # 简单的速度平滑
                if self.current_speed < self.target_speed:
                    self.current_speed += 50
                elif self.current_speed > self.target_speed:
                    self.current_speed -= 50
                else:
                    self.current_speed = self.target_speed
                
                # 限制范围
                self.current_speed = max(-1000, min(self.current_speed, 1000))
            
            time.sleep(period)
    
    def stop(self):
        """停止电机线程"""
        self.running = False
        if self.thread:
            self.thread.join(timeout=2)
        print(f"电机 {self.motor_id} 线程已停止")


class RobotController:
    """机器人主控制器"""
    
    def __init__(self, num_motors: int = 4):
        self.num_motors = num_motors
        self.motors = [
            RobotMotorThread(i, control_freq=100)
            for i in range(num_motors)
        ]
        self.running = False
    
    def start(self):
        """启动所有电机"""
        self.running = True
        for motor in self.motors:
            motor.start()
        print("机器人已启动")
    
    def move_forward(self, speed: float):
        """前进"""
        for motor in self.motors:
            motor.set_speed(speed)
    
    def move_backward(self, speed: float):
        """后退"""
        for motor in self.motors:
            motor.set_speed(-speed)
    
    def turn_left(self, speed: float):
        """左转"""
        for i, motor in enumerate(self.motors):
            # 左侧电机减速,右侧电机加速
            if i % 2 == 0:
                motor.set_speed(speed * 0.5)
            else:
                motor.set_speed(speed)
    
    def get_status(self) -> dict:
        """获取机器人状态"""
        return {
            'motor_speeds': [motor.get_speed() for motor in self.motors]
        }
    
    def stop(self):
        """停止机器人"""
        self.running = False
        for motor in self.motors:
            motor.set_speed(0)
            motor.stop()
        print("机器人已停止")


# 使用示例
if __name__ == "__main__":
    print("=== 线程池示例 ===")
    
    def sample_task(x, y):
        time.sleep(0.5)
        return x + y
    
    pool = ThreadPool(4)
    pool.start()
    
    for i in range(10):
        pool.submit_task(sample_task, i, i + 1)
    
    pool.wait_all()
    pool.stop()
    
    print("\n=== 机器人控制示例 ===")
    
    robot = RobotController(num_motors=4)
    robot.start()
    
    # 模拟一些运动
    print("前进...")
    robot.move_forward(500)
    time.sleep(2)
    
    print("左转...")
    robot.turn_left(400)
    time.sleep(2)
    
    print("停止...")
    robot.stop()
    
    print(f"最终状态: {robot.get_status()}")

并发与性能

多进程

对于 CPU 密集型任务,多进程比多线程更有效,因为它绕过了 Python 的全局解释器锁(GIL)。

python
from multiprocessing import Process, Pool, Queue, Value, Array
import multiprocessing

def cpu_intensive_task(n: int) -> int:
    """CPU 密集型任务"""
    result = 0
    for i in range(n):
        result += i ** 2
    return result


def process_sensor_data(sensor_id: int, num_samples: int, output_queue: Queue):
    """处理传感器数据的进程"""
    import random
    
    for i in range(num_samples):
        data = {
            'sensor_id': sensor_id,
            'sample': i,
            'value': random.uniform(0, 100)
        }
        output_queue.put(data)
        time.sleep(0.01)


def multiprocessing_example():
    """多进程示例"""
    
    print("=== 多进程处理 ===")
    
    # 创建进程池
    with Pool(processes=4) as pool:
        # 使用进程池处理多个任务
        results = pool.map(
            cpu_intensive_task,
            [1000000, 2000000, 3000000, 4000000]
        )
        
        print(f"处理结果: {results}")
    
    print("\n=== 进程通信 ===")
    
    output_queue = Queue()
    
    # 创建多个处理传感器数据的进程
    processes = []
    for sensor_id in range(4):
        p = Process(
            target=process_sensor_data,
            args=(sensor_id, 5, output_queue)
        )
        p.start()
        processes.append(p)
    
    # 主进程从队列中读取数据
    for _ in range(4 * 5):
        data = output_queue.get()
        print(f"收到数据: {data}")
    
    # 等待所有进程完成
    for p in processes:
        p.join()
    
    print("所有进程已完成")


# 共享数据的进程
def shared_data_example():
    """使用共享数据的多进程示例"""
    
    def increment_counter(counter, iterations):
        """增加共享计数器"""
        for _ in range(iterations):
            with counter.get_lock():
                counter.value += 1
    
    print("=== 共享数据示例 ===")
    
    # 创建共享计数器
    counter = Value('i', 0)
    
    # 创建多个进程
    processes = []
    for i in range(4):
        p = Process(
            target=increment_counter,
            args=(counter, 1000)
        )
        p.start()
        processes.append(p)
    
    # 等待所有进程完成
    for p in processes:
        p.join()
    
    print(f"最终计数: {counter.value}")

总结

Python 的高级特性提供了强大的工具来构建复杂的应用:

  1. 面向对象设计:通过继承、多态和设计模式构建可维护的代码
  2. 装饰器:以优雅的方式添加功能到现有代码
  3. 生成器:处理大数据集时节省内存
  4. 上下文管理器:确保资源正确释放
  5. 多线程/多进程:实现并发执行,提高程序效率

对于 LeBot 这样的机器人应用,这些技术对于实现实时控制、数据处理和系统管理至关重要。


参考资源

由 LeBot 开发团队编写