Python 进阶用法
概述
在前一章中,我们介绍了 Python 的基础知识,包括语法、数据结构和函数。现在我们将深入探讨 Python 的高级特性,这些特性对于开发复杂的机器人应用至关重要。本章涵盖面向对象编程、装饰器、生成器、上下文管理器、多线程、多进程等高级概念。
面向对象编程(OOP)进阶
类的设计原则
在 Python 中,一切皆对象。理解类的设计原则对于编写可维护的代码至关重要。
单一职责原则(Single Responsibility Principle)
每个类应该只有一个责任,这样可以使代码更易维护和测试。
python
# 不好的设计:一个类负责多个职责
class RobotController:
"""机器人控制器 - 职责过多"""
def __init__(self):
self.motors = []
self.sensors = []
def drive_motor(self, motor_id, speed):
"""驱动电机"""
pass
def read_sensor(self, sensor_id):
"""读取传感器"""
pass
def save_log(self, message):
"""保存日志"""
pass
def send_network_data(self, data):
"""发送网络数据"""
pass
# 好的设计:分离职责
class Motor:
"""电机类 - 只负责电机控制"""
def __init__(self, motor_id, max_speed=1000):
self.motor_id = motor_id
self.max_speed = max_speed
self.current_speed = 0
def set_speed(self, speed):
"""设置电机速度"""
self.current_speed = max(-self.max_speed, min(speed, self.max_speed))
def get_current_speed(self):
"""获取当前速度"""
return self.current_speed
class Sensor:
"""传感器类 - 只负责传感器读取"""
def __init__(self, sensor_id, sensor_type):
self.sensor_id = sensor_id
self.sensor_type = sensor_type
self.data = 0
def read(self):
"""读取传感器数据"""
return self.data
def update(self, value):
"""更新传感器数据"""
self.data = value
class Logger:
"""日志类 - 只负责日志记录"""
def __init__(self, filename):
self.filename = filename
def log(self, level, message):
"""记录日志"""
with open(self.filename, 'a') as f:
f.write(f"[{level}] {message}\n")
class RobotController:
"""机器人控制器 - 协调各个组件"""
def __init__(self):
self.motors = {}
self.sensors = {}
self.logger = Logger("robot.log")
def add_motor(self, motor_id, max_speed):
"""添加电机"""
self.motors[motor_id] = Motor(motor_id, max_speed)
def add_sensor(self, sensor_id, sensor_type):
"""添加传感器"""
self.sensors[sensor_id] = Sensor(sensor_id, sensor_type)
def drive_motor(self, motor_id, speed):
"""驱动指定电机"""
if motor_id in self.motors:
self.motors[motor_id].set_speed(speed)
self.logger.log("INFO", f"Motor {motor_id} set to {speed}")
def read_sensor(self, sensor_id):
"""读取指定传感器"""
if sensor_id in self.sensors:
return self.sensors[sensor_id].read()
return None继承与多态
继承允许我们创建类的层级结构,重用代码。多态让我们用统一的接口处理不同的对象。
python
from abc import ABC, abstractmethod
from typing import List
# 定义抽象基类
class Robot(ABC):
"""机器人基类"""
def __init__(self, name):
self.name = name
self.is_moving = False
@abstractmethod
def move(self, direction, speed):
"""移动机器人 - 子类必须实现"""
pass
@abstractmethod
def stop(self):
"""停止机器人 - 子类必须实现"""
pass
def get_status(self):
"""获取状态"""
return {
'name': self.name,
'moving': self.is_moving
}
class WheelRobot(Robot):
"""轮式机器人"""
def __init__(self, name, num_wheels=4):
super().__init__(name)
self.num_wheels = num_wheels
self.motors = [0] * num_wheels
def move(self, direction, speed):
"""轮式机器人的移动实现"""
self.is_moving = True
if direction == 'forward':
self.motors = [speed] * self.num_wheels
elif direction == 'backward':
self.motors = [-speed] * self.num_wheels
elif direction == 'left':
# 左转:左轮减速,右轮加速
self.motors[0] = speed * 0.5
self.motors[1] = speed
self.motors[2] = speed * 0.5
self.motors[3] = speed
elif direction == 'right':
# 右转:右轮减速,左轮加速
self.motors[0] = speed
self.motors[1] = speed * 0.5
self.motors[2] = speed
self.motors[3] = speed * 0.5
def stop(self):
"""停止机器人"""
self.is_moving = False
self.motors = [0] * self.num_wheels
class LegRobot(Robot):
"""腿式机器人"""
def __init__(self, name, num_legs=4):
super().__init__(name)
self.num_legs = num_legs
self.leg_positions = [(0, 0)] * num_legs
self.gait_phase = 0
def move(self, direction, speed):
"""腿式机器人的移动实现"""
self.is_moving = True
# 步态控制(简化示例)
if direction == 'forward':
self._update_gait(speed)
elif direction == 'backward':
self._update_gait(-speed)
def _update_gait(self, speed):
"""更新步态"""
self.gait_phase = (self.gait_phase + speed) % 100
def stop(self):
"""停止机器人"""
self.is_moving = False
self.gait_phase = 0
class HybridRobot(WheelRobot, LegRobot):
"""轮腿混合机器人(LeBot 的简化模型)"""
def __init__(self, name):
# 初始化轮式部分
WheelRobot.__init__(self, name, num_wheels=4)
# 初始化腿部部分
self.legs = [0] * 4
self.current_mode = 'wheel' # 'wheel' 或 'leg'
def set_mode(self, mode):
"""切换运动模式"""
if mode in ['wheel', 'leg']:
self.current_mode = mode
def move(self, direction, speed):
"""根据当前模式移动"""
if self.current_mode == 'wheel':
WheelRobot.move(self, direction, speed)
else:
LegRobot.move(self, direction, speed)
# 多态示例
def control_robot(robot: Robot, commands: List[tuple]):
"""
控制任何类型的机器人
参数:
robot: Robot 的任何子类实例
commands: 命令列表 [(direction, speed), ...]
"""
for direction, speed in commands:
robot.move(direction, speed)
print(f"{robot.name} moving {direction} at {speed}")
robot.stop()
print(f"{robot.name} stopped")
print(f"Status: {robot.get_status()}")
# 使用示例
if __name__ == "__main__":
# 创建不同类型的机器人
wheel_robot = WheelRobot("轮式机器人")
leg_robot = LegRobot("腿式机器人")
lebot = HybridRobot("LeBot")
# 使用统一的接口控制它们
robots = [wheel_robot, leg_robot, lebot]
for robot in robots:
commands = [('forward', 100), ('left', 50), ('forward', 80)]
control_robot(robot, commands)
print("-" * 50)属性和描述符
属性允许我们在访问属性时执行自定义逻辑,而不需要显式调用方法。
python
class Motor:
"""电机类 - 使用属性管理速度"""
def __init__(self, motor_id, max_speed=1000):
self.motor_id = motor_id
self.max_speed = max_speed
self._speed = 0 # 私有属性
self._access_count = 0
@property
def speed(self):
"""获取电机速度"""
self._access_count += 1
return self._speed
@speed.setter
def speed(self, value):
"""设置电机速度,自动限制在有效范围内"""
if not isinstance(value, (int, float)):
raise TypeError("速度必须是数字")
if value < -self.max_speed or value > self.max_speed:
print(f"警告:速度超出范围 [{-self.max_speed}, {self.max_speed}],已限制")
self._speed = max(-self.max_speed, min(value, self.max_speed))
else:
self._speed = value
@speed.deleter
def speed(self):
"""删除速度(重置为0)"""
print(f"重置电机 {self.motor_id} 的速度")
self._speed = 0
@property
def access_count(self):
"""获取访问次数(只读属性)"""
return self._access_count
# 使用自定义描述符实现更复杂的逻辑
class BoundedValue:
"""有界值描述符"""
def __init__(self, min_val, max_val, initial=0):
self.min_val = min_val
self.max_val = max_val
self.initial = initial
self.values = {}
def __get__(self, obj, objtype=None):
"""获取值"""
if obj is None:
return self
return self.values.get(id(obj), self.initial)
def __set__(self, obj, value):
"""设置值"""
if not isinstance(value, (int, float)):
raise TypeError(f"值必须是数字")
bounded = max(self.min_val, min(value, self.max_val))
if bounded != value:
print(f"值 {value} 已限制到 [{self.min_val}, {self.max_val}]")
self.values[id(obj)] = bounded
def __delete__(self, obj):
"""删除值"""
del self.values[id(obj)]
class AdvancedMotor:
"""使用描述符的高级电机类"""
# 使用描述符定义约束属性
speed = BoundedValue(min_val=-1000, max_val=1000, initial=0)
temperature = BoundedValue(min_val=0, max_val=100, initial=25)
def __init__(self, motor_id):
self.motor_id = motor_id
def __repr__(self):
return f"AdvancedMotor(id={self.motor_id}, speed={self.speed}, temp={self.temperature})"
# 使用示例
if __name__ == "__main__":
# 基础属性示例
motor = Motor(1, max_speed=500)
motor.speed = 300
print(f"电机速度: {motor.speed}")
motor.speed = 1000 # 超出范围,将被限制
print(f"限制后的速度: {motor.speed}")
print(f"访问次数: {motor.access_count}")
del motor.speed # 删除速度
print(f"删除后的速度: {motor.speed}")
print("-" * 50)
# 描述符示例
motor1 = AdvancedMotor(1)
motor2 = AdvancedMotor(2)
motor1.speed = 200
motor1.temperature = 45
motor2.speed = -300
motor2.temperature = 60
print(motor1)
print(motor2)装饰器
装饰器是一种强大的 Python 特性,允许我们在不修改原始函数的情况下修改其行为。
函数装饰器
python
import functools
import time
from typing import Callable, Any
def timing_decorator(func: Callable) -> Callable:
"""
计时装饰器:测量函数执行时间
参数:
func: 被装饰的函数
返回:
包装后的函数
"""
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
elapsed = end_time - start_time
print(f"{func.__name__} 执行时间: {elapsed:.4f} 秒")
return result
return wrapper
def retry_decorator(max_retries: int = 3, delay: float = 1.0):
"""
重试装饰器:如果函数失败则重试
参数:
max_retries: 最多重试次数
delay: 重试之间的延迟(秒)
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt < max_retries - 1:
print(f"尝试 {attempt + 1} 失败: {e},{delay} 秒后重试...")
time.sleep(delay)
else:
print(f"所有 {max_retries} 次尝试均失败")
raise
return wrapper
return decorator
def log_decorator(func: Callable) -> Callable:
"""
日志装饰器:记录函数调用
"""
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
print(f"调用函数: {func.__name__}")
print(f"参数: args={args}, kwargs={kwargs}")
result = func(*args, **kwargs)
print(f"返回值: {result}")
return result
return wrapper
def validate_decorator(**validators):
"""
验证装饰器:验证函数参数
参数:
validators: 参数名 -> 验证函数的映射
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
# 获取函数签名中的参数名
import inspect
sig = inspect.signature(func)
# 验证 kwargs 中的参数
for param_name, validator in validators.items():
if param_name in kwargs:
if not validator(kwargs[param_name]):
raise ValueError(f"参数 {param_name} 验证失败: {kwargs[param_name]}")
return func(*args, **kwargs)
return wrapper
return decorator
# 在 LeBot 中应用装饰器的实例
class RobotController:
"""机器人控制器"""
@timing_decorator
def calculate_trajectory(self, start_pos, end_pos, duration):
"""计算轨迹(模拟耗时操作)"""
time.sleep(0.5) # 模拟计算
return "trajectory_data"
@retry_decorator(max_retries=3, delay=0.5)
def connect_to_robot(self, ip, port):
"""连接到机器人(可能失败)"""
# 模拟随机失败
import random
if random.random() < 0.7:
raise ConnectionError(f"无法连接到 {ip}:{port}")
return "connected"
@log_decorator
def set_motor_speed(self, motor_id, speed):
"""设置电机速度"""
return f"Motor {motor_id} set to {speed}"
@validate_decorator(
motor_id=lambda x: isinstance(x, int) and 0 <= x < 4,
speed=lambda x: isinstance(x, (int, float)) and -1000 <= x <= 1000
)
def set_motor_speed_validated(self, motor_id, speed):
"""设置电机速度(带验证)"""
return f"Motor {motor_id} safely set to {speed}"
# 使用示例
if __name__ == "__main__":
controller = RobotController()
# 计时示例
print("=== 计时示例 ===")
controller.calculate_trajectory((0, 0), (1, 1), 10)
print("\n=== 重试示例 ===")
try:
result = controller.connect_to_robot("192.168.1.100", 5000)
print(f"结果: {result}")
except ConnectionError as e:
print(f"最终失败: {e}")
print("\n=== 日志示例 ===")
controller.set_motor_speed(0, 100)
print("\n=== 验证示例 ===")
try:
controller.set_motor_speed_validated(0, 500)
controller.set_motor_speed_validated(5, 500) # 错误的 ID
except ValueError as e:
print(f"验证错误: {e}")类装饰器
python
def singleton_decorator(cls):
"""
单例装饰器:确保类只有一个实例
"""
instances = {}
@functools.wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
@singleton_decorator
class ConfigManager:
"""配置管理器 - 单例"""
def __init__(self):
self.config = {}
def set(self, key, value):
self.config[key] = value
def get(self, key, default=None):
return self.config.get(key, default)
def dataclass_decorator(cls):
"""
简单的数据类装饰器
"""
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def __repr__(self):
attrs = ', '.join(f"{k}={v}" for k, v in self.__dict__.items())
return f"{cls.__name__}({attrs})"
cls.__init__ = __init__
cls.__repr__ = __repr__
return cls
@dataclass_decorator
class MotorConfig:
"""电机配置"""
pass
# 使用示例
if __name__ == "__main__":
# 单例示例
config1 = ConfigManager()
config2 = ConfigManager()
config1.set("robot_name", "LeBot")
print(f"config1 ID: {id(config1)}")
print(f"config2 ID: {id(config2)}")
print(f"是同一实例: {config1 is config2}")
print(f"config2 中的值: {config2.get('robot_name')}")
print("\n=== 数据类示例 ===")
motor_cfg = MotorConfig(
motor_id=1,
max_speed=1000,
kp=10,
ki=0.1,
kd=1.0
)
print(motor_cfg)生成器与迭代器
生成器是一种特殊的迭代器,允许我们以懒加载的方式生成值序列。
python
from typing import Generator, Iterator
class RobotDataStream:
"""机器人数据流处理"""
def __init__(self, data_file):
self.data_file = data_file
def read_sensor_data(self) -> Generator[dict, None, None]:
"""
读取传感器数据的生成器
yield:逐个返回数据项,而不是一次性加载所有数据
"""
with open(self.data_file, 'r') as f:
for line in f:
# 解析每一行数据
parts = line.strip().split(',')
if len(parts) >= 4:
try:
data = {
'timestamp': float(parts[0]),
'sensor_id': int(parts[1]),
'value': float(parts[2]),
'unit': parts[3]
}
yield data
except ValueError:
continue
def process_real_time(self, duration: int) -> Generator[dict, None, None]:
"""
实时处理数据的生成器
"""
import random
for i in range(duration):
data = {
'timestamp': time.time(),
'motor_speeds': [random.uniform(-1000, 1000) for _ in range(4)],
'imu_data': {
'ax': random.uniform(-9.8, 9.8),
'ay': random.uniform(-9.8, 9.8),
'az': random.uniform(-9.8, 9.8)
}
}
yield data
time.sleep(0.1)
def fibonacci_generator(n: int) -> Generator[int, None, None]:
"""
斐波那契生成器
"""
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
def sensor_data_aggregator(*generators) -> Generator[list, None, None]:
"""
聚合多个数据源
"""
import itertools
for items in itertools.zip_longest(*generators, fillvalue=None):
yield [item for item in items if item is not None]
# 使用示例
if __name__ == "__main__":
# 斐波那契生成器
print("=== 斐波那契数列 ===")
for num in fibonacci_generator(10):
print(num, end=' ')
print("\n")
# 内存效率对比
print("=== 生成器的内存效率 ===")
# 列表方式(一次性加载所有数据)
def get_large_data_list(n):
return [i**2 for i in range(n)]
# 生成器方式(按需生成)
def get_large_data_generator(n):
for i in range(n):
yield i**2
import sys
list_data = get_large_data_list(1000000)
generator_data = get_large_data_generator(1000000)
print(f"列表大小: {sys.getsizeof(list_data) / 1024:.2f} KB")
print(f"生成器大小: {sys.getsizeof(generator_data) / 1024:.2f} KB")上下文管理器
上下文管理器用于管理资源的获取和释放,使用 with 语句。
python
from contextlib import contextmanager
from typing import Generator, Any
class RobotConnection:
"""机器人连接上下文管理器"""
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.connected = False
def __enter__(self):
"""进入上下文时调用"""
print(f"正在连接到 {self.host}:{self.port}...")
# 模拟连接
time.sleep(0.5)
self.connected = True
print("连接成功")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文时调用"""
print("正在关闭连接...")
self.connected = False
print("连接已关闭")
# 如果发生异常,exc_type 不为 None
if exc_type is not None:
print(f"发生异常: {exc_type.__name__}: {exc_val}")
# 返回 True 抑制异常,False 让异常继续传播
return False
return True
def send_command(self, command: str) -> str:
"""发送命令"""
if not self.connected:
raise RuntimeError("未连接")
print(f"发送命令: {command}")
# 模拟接收响应
return f"Response to {command}"
@contextmanager
def robot_motor_control(motor_id: int) -> Generator[dict, None, None]:
"""
使用装饰器的电机控制上下文
"""
motor_state = {'enabled': True, 'speed': 0}
print(f"启用电机 {motor_id}")
try:
yield motor_state
finally:
# 清理:关闭电机
motor_state['enabled'] = False
motor_state['speed'] = 0
print(f"电机 {motor_id} 已禁用")
@contextmanager
def timing_context(name: str) -> Generator[dict, None, None]:
"""
计时上下文管理器
"""
start_time = time.time()
stats = {'start': start_time}
print(f"开始: {name}")
try:
yield stats
finally:
end_time = time.time()
stats['end'] = end_time
stats['elapsed'] = end_time - start_time
print(f"完成: {name} (耗时 {stats['elapsed']:.4f} 秒)")
class ResourcePool:
"""资源池上下文管理器"""
def __init__(self, size: int):
self.size = size
self.available = list(range(size))
self.in_use = []
def __enter__(self):
"""从池中获取资源"""
if not self.available:
raise RuntimeError("没有可用资源")
resource = self.available.pop()
self.in_use.append(resource)
print(f"获取资源 {resource}")
return resource
def __exit__(self, exc_type, exc_val, exc_tb):
"""归还资源到池"""
if self.in_use:
resource = self.in_use.pop()
self.available.append(resource)
print(f"归还资源 {resource}")
return False
# 使用示例
if __name__ == "__main__":
print("=== 基础上下文管理器 ===")
try:
with RobotConnection("192.168.1.100", 5000) as robot:
response = robot.send_command("forward")
print(f"收到响应: {response}")
except Exception as e:
print(f"错误: {e}")
print("\n=== 装饰器上下文管理器 ===")
with robot_motor_control(1) as motor:
motor['speed'] = 500
print(f"电机状态: {motor}")
print("\n=== 计时上下文 ===")
with timing_context("轨迹计算") as stats:
time.sleep(1)
# 进行一些计算
print("\n=== 资源池 ===")
pool = ResourcePool(3)
with pool as resource1:
print(f"使用资源 {resource1}")
with pool as resource2:
print(f"使用资源 {resource2}")多线程编程
对于 LeBot 这样的实时系统,多线程允许并发执行多个任务。
python
import threading
from queue import Queue
from typing import Callable, List
class ThreadPool:
"""简单的线程池"""
def __init__(self, num_workers: int):
self.num_workers = num_workers
self.task_queue = Queue()
self.workers = []
self.running = False
def start(self):
"""启动线程池"""
self.running = True
for i in range(self.num_workers):
worker = threading.Thread(
target=self._worker,
name=f"Worker-{i}",
daemon=True
)
worker.start()
self.workers.append(worker)
print(f"线程池已启动,{self.num_workers} 个工作线程")
def _worker(self):
"""工作线程的主循环"""
while self.running:
try:
# 从队列中获取任务
task, args, kwargs = self.task_queue.get(timeout=1)
try:
result = task(*args, **kwargs)
print(f"{threading.current_thread().name} 完成: {result}")
except Exception as e:
print(f"{threading.current_thread().name} 错误: {e}")
self.task_queue.task_done()
except:
continue
def submit_task(self, task: Callable, *args, **kwargs):
"""提交任务到线程池"""
self.task_queue.put((task, args, kwargs))
def wait_all(self):
"""等待所有任务完成"""
self.task_queue.join()
def stop(self):
"""停止线程池"""
self.running = False
for worker in self.workers:
worker.join(timeout=2)
print("线程池已停止")
class RobotMotorThread:
"""机器人电机线程"""
def __init__(self, motor_id: int, control_freq: float = 100):
self.motor_id = motor_id
self.control_freq = control_freq
self.target_speed = 0
self.current_speed = 0
self.running = False
self.thread = None
self.lock = threading.RLock()
def set_speed(self, speed: float):
"""设置目标速度(线程安全)"""
with self.lock:
self.target_speed = max(-1000, min(speed, 1000))
def get_speed(self) -> float:
"""获取当前速度(线程安全)"""
with self.lock:
return self.current_speed
def start(self):
"""启动电机线程"""
if self.running:
return
self.running = True
self.thread = threading.Thread(
target=self._control_loop,
name=f"Motor-{self.motor_id}",
daemon=True
)
self.thread.start()
print(f"电机 {self.motor_id} 线程已启动")
def _control_loop(self):
"""电机控制循环"""
period = 1.0 / self.control_freq
while self.running:
with self.lock:
# 简单的速度平滑
if self.current_speed < self.target_speed:
self.current_speed += 50
elif self.current_speed > self.target_speed:
self.current_speed -= 50
else:
self.current_speed = self.target_speed
# 限制范围
self.current_speed = max(-1000, min(self.current_speed, 1000))
time.sleep(period)
def stop(self):
"""停止电机线程"""
self.running = False
if self.thread:
self.thread.join(timeout=2)
print(f"电机 {self.motor_id} 线程已停止")
class RobotController:
"""机器人主控制器"""
def __init__(self, num_motors: int = 4):
self.num_motors = num_motors
self.motors = [
RobotMotorThread(i, control_freq=100)
for i in range(num_motors)
]
self.running = False
def start(self):
"""启动所有电机"""
self.running = True
for motor in self.motors:
motor.start()
print("机器人已启动")
def move_forward(self, speed: float):
"""前进"""
for motor in self.motors:
motor.set_speed(speed)
def move_backward(self, speed: float):
"""后退"""
for motor in self.motors:
motor.set_speed(-speed)
def turn_left(self, speed: float):
"""左转"""
for i, motor in enumerate(self.motors):
# 左侧电机减速,右侧电机加速
if i % 2 == 0:
motor.set_speed(speed * 0.5)
else:
motor.set_speed(speed)
def get_status(self) -> dict:
"""获取机器人状态"""
return {
'motor_speeds': [motor.get_speed() for motor in self.motors]
}
def stop(self):
"""停止机器人"""
self.running = False
for motor in self.motors:
motor.set_speed(0)
motor.stop()
print("机器人已停止")
# 使用示例
if __name__ == "__main__":
print("=== 线程池示例 ===")
def sample_task(x, y):
time.sleep(0.5)
return x + y
pool = ThreadPool(4)
pool.start()
for i in range(10):
pool.submit_task(sample_task, i, i + 1)
pool.wait_all()
pool.stop()
print("\n=== 机器人控制示例 ===")
robot = RobotController(num_motors=4)
robot.start()
# 模拟一些运动
print("前进...")
robot.move_forward(500)
time.sleep(2)
print("左转...")
robot.turn_left(400)
time.sleep(2)
print("停止...")
robot.stop()
print(f"最终状态: {robot.get_status()}")并发与性能
多进程
对于 CPU 密集型任务,多进程比多线程更有效,因为它绕过了 Python 的全局解释器锁(GIL)。
python
from multiprocessing import Process, Pool, Queue, Value, Array
import multiprocessing
def cpu_intensive_task(n: int) -> int:
"""CPU 密集型任务"""
result = 0
for i in range(n):
result += i ** 2
return result
def process_sensor_data(sensor_id: int, num_samples: int, output_queue: Queue):
"""处理传感器数据的进程"""
import random
for i in range(num_samples):
data = {
'sensor_id': sensor_id,
'sample': i,
'value': random.uniform(0, 100)
}
output_queue.put(data)
time.sleep(0.01)
def multiprocessing_example():
"""多进程示例"""
print("=== 多进程处理 ===")
# 创建进程池
with Pool(processes=4) as pool:
# 使用进程池处理多个任务
results = pool.map(
cpu_intensive_task,
[1000000, 2000000, 3000000, 4000000]
)
print(f"处理结果: {results}")
print("\n=== 进程通信 ===")
output_queue = Queue()
# 创建多个处理传感器数据的进程
processes = []
for sensor_id in range(4):
p = Process(
target=process_sensor_data,
args=(sensor_id, 5, output_queue)
)
p.start()
processes.append(p)
# 主进程从队列中读取数据
for _ in range(4 * 5):
data = output_queue.get()
print(f"收到数据: {data}")
# 等待所有进程完成
for p in processes:
p.join()
print("所有进程已完成")
# 共享数据的进程
def shared_data_example():
"""使用共享数据的多进程示例"""
def increment_counter(counter, iterations):
"""增加共享计数器"""
for _ in range(iterations):
with counter.get_lock():
counter.value += 1
print("=== 共享数据示例 ===")
# 创建共享计数器
counter = Value('i', 0)
# 创建多个进程
processes = []
for i in range(4):
p = Process(
target=increment_counter,
args=(counter, 1000)
)
p.start()
processes.append(p)
# 等待所有进程完成
for p in processes:
p.join()
print(f"最终计数: {counter.value}")总结
Python 的高级特性提供了强大的工具来构建复杂的应用:
- 面向对象设计:通过继承、多态和设计模式构建可维护的代码
- 装饰器:以优雅的方式添加功能到现有代码
- 生成器:处理大数据集时节省内存
- 上下文管理器:确保资源正确释放
- 多线程/多进程:实现并发执行,提高程序效率
对于 LeBot 这样的机器人应用,这些技术对于实现实时控制、数据处理和系统管理至关重要。
参考资源
- Python 官方文档:https://docs.python.org/3/
- Fluent Python:https://www.oreilly.com/library/view/fluent-python/9781491946237/
- Real Python 教程:https://realpython.com/