操作系统基础
简介
操作系统(Operating System, OS)是计算机硬件和用户应用程序之间的桥梁。它管理计算机的所有资源,包括处理器、内存、磁盘和外围设备。对于 LeBot 机器人开发者来说,理解操作系统的基本原理对于编写高效、稳定的控制程序至关重要。
本指南将帮助青少年开发者理解操作系统的核心概念,包括进程与线程、内存管理、文件系统和系统调用等内容。
1. 操作系统的基本角色
1.1 资源管理
操作系统的主要职责是管理计算机的所有资源:
- 处理器(CPU): 决定哪个程序在什么时候运行
- 内存(RAM): 分配物理内存给不同的程序
- 存储设备: 管理硬盘、SSD 等持久化存储
- 输入/输出设备: 管理键盘、鼠标、屏幕、传感器等设备
1.2 抽象层提供
操作系统向应用程序提供抽象接口,隐藏硬件细节:
应用程序层
↓
系统调用接口
↓
操作系统内核
↓
硬件层例如,当你的 Python 程序写入文件时,你不需要知道硬盘的具体工作原理,操作系统会为你处理所有细节。
1.3 并发与保护
操作系统允许多个程序同时运行,并确保它们不会互相干扰:
# 多个程序可以同时执行
# 程序 A: 计算机器人运动轨迹
# 程序 B: 读取传感器数据
# 程序 C: 显示实时视频流
# 操作系统负责在它们之间分配 CPU 时间2. 进程与线程
2.1 进程(Process)
进程是操作系统中最重要的概念之一。它是一个正在执行的程序实例。
进程的特点:
- 独立的地址空间: 每个进程都有自己的内存空间,互相隔离
- 独立的资源: 文件描述符、环境变量等
- 独立的执行上下文: 独立的 CPU 寄存器和栈空间
import os
import subprocess
import time
# 创建新进程
process = subprocess.Popen(['python', 'robot_controller.py'])
# 获取进程ID
pid = process.pid
print(f"进程ID: {pid}")
# 等待进程完成
process.wait()
# 获取进程状态
print(f"进程返回码: {process.returncode}")在 Linux 上查看进程:
# 列出所有进程
ps aux
# 显示进程树
pstree
# 实时监视进程
top
# 查看 LeBot 控制程序的进程
ps aux | grep lebot2.2 线程(Thread)
线程是进程内的执行单元。同一个进程中的多个线程共享内存空间。
进程 vs 线程:
| 特性 | 进程 | 线程 |
|---|---|---|
| 内存空间 | 独立 | 共享 |
| 创建开销 | 大 | 小 |
| 切换开销 | 大 | 小 |
| 通信方式 | IPC(复杂) | 共享内存(简单) |
| 安全性 | 高(隔离) | 低(需要同步) |
Python 中的多线程示例:
import threading
import time
def read_sensor_data():
"""读取传感器数据的线程函数"""
for i in range(5):
print(f"读取传感器数据: 第 {i+1} 次")
time.sleep(1)
def control_motor():
"""控制电机的线程函数"""
for i in range(5):
print(f"控制电机: 第 {i+1} 次")
time.sleep(1.5)
# 创建线程
thread1 = threading.Thread(target=read_sensor_data, name="SensorThread")
thread2 = threading.Thread(target=control_motor, name="MotorThread")
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成
thread1.join()
thread2.join()
print("所有线程完成")Rust 中的多线程示例:
use std::thread;
use std::time::Duration;
fn main() {
// 创建第一个线程:读取传感器数据
let sensor_thread = thread::spawn(|| {
for i in 1..=5 {
println!("读取传感器数据: 第 {} 次", i);
thread::sleep(Duration::from_secs(1));
}
});
// 创建第二个线程:控制电机
let motor_thread = thread::spawn(|| {
for i in 1..=5 {
println!("控制电机: 第 {} 次", i);
thread::sleep(Duration::from_millis(1500));
}
});
// 等待线程完成
sensor_thread.join().unwrap();
motor_thread.join().unwrap();
println!("所有线程完成");
}2.3 线程同步
当多个线程访问共享数据时,需要进行同步以避免竞态条件。
互斥锁(Mutex)示例:
import threading
import time
# 共享资源
shared_data = {"motor_position": 0}
lock = threading.Lock()
def update_position(amount, thread_id):
"""更新电机位置"""
global shared_data
for _ in range(3):
# 获取锁
with lock:
current = shared_data["motor_position"]
print(f"线程 {thread_id}: 当前位置 = {current}")
# 模拟处理时间
time.sleep(0.1)
# 更新位置
shared_data["motor_position"] = current + amount
print(f"线程 {thread_id}: 更新位置为 {shared_data['motor_position']}")
# 创建多个线程
threads = []
for i in range(3):
t = threading.Thread(target=update_position, args=(10, i))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print(f"最终位置: {shared_data['motor_position']}")Rust 中使用 Mutex:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// 创建受保护的共享数据
let motor_position = Arc::new(Mutex::new(0));
let mut handles = vec![];
for thread_id in 0..3 {
let position = Arc::clone(&motor_position);
let handle = thread::spawn(move || {
for step in 1..=3 {
// 锁定互斥锁
let mut pos = position.lock().unwrap();
println!("线程 {}: 当前位置 = {}", thread_id, *pos);
// 更新位置
*pos += 10;
println!("线程 {}: 更新位置为 {}", thread_id, *pos);
}
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
let final_pos = motor_position.lock().unwrap();
println!("最终位置: {}", *final_pos);
}3. 内存管理
3.1 内存地址空间
现代操作系统使用虚拟内存来为每个进程提供独立的地址空间。
进程地址空间(虚拟地址)
┌─────────────────┐
│ 栈(Stack) │ ← 局部变量、函数参数、返回地址
├─────────────────┤
│ 堆(Heap) │ ← 动态分配的内存
├─────────────────┤
│ 未初始化数据 │
│ (BSS段) │
├─────────────────┤
│ 初始化数据 │ ← 全局变量
├─────────────────┤
│ 代码段 │ ← 程序指令
└─────────────────┘3.2 栈内存
栈用于存储局部变量和函数调用信息。栈是自动管理的:
def calculate_trajectory():
"""计算机器人轨迹"""
# 这些变量分配在栈上
x = 10.5 # 8 字节浮点数
y = 20.3 # 8 字节浮点数
velocity = 1.5 # 8 字节浮点数
print(f"位置: ({x}, {y}), 速度: {velocity}")
# 函数返回时,栈被自动释放
calculate_trajectory()
# 在 Rust 中展示栈的自动释放
# let position = 100; // 分配到栈
# let velocity = 5; // 分配到栈
# } // 离开作用域,栈自动释放栈溢出示例(要避免):
# 递归深度过大导致栈溢出
def bad_recursion(n):
if n == 0:
return
bad_recursion(n - 1) # 无限递归
# 这会导致栈溢出错误
# bad_recursion(100000)3.3 堆内存
堆用于动态分配内存,需要手动或自动管理。
Python 中的堆分配:
# Python 自动管理堆内存
import numpy as np
# 创建大型数组在堆上
sensor_data = np.zeros((1000, 3)) # 3000 个浮点数
# 填充数据
for i in range(1000):
sensor_data[i] = [i * 0.1, i * 0.2, i * 0.3]
print(f"数组大小: {sensor_data.nbytes} 字节")
# Python 的垃圾回收会自动释放内存
# 当 sensor_data 不再被引用时
del sensor_dataRust 中的堆管理:
fn main() {
// 在堆上分配 Vec
let mut positions = Vec::new();
for i in 0..1000 {
positions.push((i as f32 * 0.1, i as f32 * 0.2));
}
println!("位置数据: {} 个点", positions.len());
// 离开作用域时,Rust 自动释放堆内存
} // 内存自动被释放
// Rust 的所有权系统确保内存安全
fn process_data(data: Vec<f32>) {
// 函数获得 data 的所有权
println!("处理 {} 个数据点", data.len());
// 函数返回时,data 被释放
}3.4 内存分页
虚拟内存系统使用分页机制:
虚拟地址空间 物理内存
┌─────────┐ ┌─────────┐
│ Page 0 │ ──┐ │ Frame 0 │
├─────────┤ │ ├─────────┤
│ Page 1 │ ├─ TLB ─→ Frame 1 │
├─────────┤ │ ├─────────┤
│ Page 2 │ │ │ Frame 2 │
├─────────┤ │ ├─────────┤
│ Page 3 │ ──┘ │ Frame 3 │
└─────────┘ └─────────┘
(磁盘) (RAM)
TLB = Translation Lookaside Buffer(地址转换缓冲)页面故障处理:
# 当访问不在物理内存中的页面时,触发页面故障
# 操作系统会:
# 1. 暂停当前进程
# 2. 从磁盘加载所需页面
# 3. 恢复进程执行
# 对开发者来说是透明的,但会影响性能
import time
# 模拟大数据访问
large_array = [0] * (100_000_000) # 400MB
start = time.time()
total = sum(large_array) # 顺序访问(局部性好)
end = time.time()
print(f"顺序访问耗时: {end - start:.4f} 秒")4. 文件系统
4.1 文件系统概念
文件系统是操作系统用来组织和存储数据的方式。
常见的文件系统:
- ext4: Linux 标准文件系统
- NTFS: Windows 文件系统
- FAT32: U盘、SD卡常用
- Btrfs: 新一代 Linux 文件系统
4.2 文件和目录
根目录 /
├── bin/ # 可执行程序
├── home/ # 用户主目录
│ └── robot/
│ ├── lebot_controller.py
│ └── sensor_data/
├── var/ # 可变数据
├── tmp/ # 临时文件
└── etc/ # 配置文件
└── ros/Python 文件操作:
import os
import shutil
from pathlib import Path
# 使用 pathlib(推荐的现代方式)
home_dir = Path.home()
lebot_dir = home_dir / "lebot_project"
# 创建目录
lebot_dir.mkdir(parents=True, exist_ok=True)
# 创建文件
config_file = lebot_dir / "config.txt"
config_file.write_text("""
ROBOT_NAME=LeBot
VELOCITY=1.5
MAX_DISTANCE=100
""")
# 读取文件
config_content = config_file.read_text()
print("配置内容:")
print(config_content)
# 列出目录
for item in lebot_dir.iterdir():
if item.is_file():
print(f"文件: {item.name}, 大小: {item.stat().st_size} 字节")
else:
print(f"目录: {item.name}")
# 删除文件
# config_file.unlink()
# 删除目录及其内容
# shutil.rmtree(lebot_dir)Rust 文件操作:
use std::fs;
use std::path::Path;
use std::io::Write;
fn main() -> std::io::Result<()> {
let project_dir = "lebot_project";
// 创建目录
fs::create_dir_all(project_dir)?;
// 创建并写入文件
let config_path = format!("{}/config.txt", project_dir);
let mut file = fs::File::create(&config_path)?;
file.write_all(b"ROBOT_NAME=LeBot\n")?;
file.write_all(b"VELOCITY=1.5\n")?;
file.write_all(b"MAX_DISTANCE=100\n")?;
// 读取文件
let content = fs::read_to_string(&config_path)?;
println!("配置内容:");
println!("{}", content);
// 列出目录
for entry in fs::read_dir(project_dir)? {
let entry = entry?;
let path = entry.path();
let file_name = entry.file_name();
println!("文件: {}", file_name.display());
}
// 删除文件
// fs::remove_file(&config_path)?;
// 删除目录
// fs::remove_dir_all(project_dir)?;
Ok(())
}4.3 inode 和文件元数据
Linux 文件系统中的每个文件都有一个 inode(索引节点)存储元数据:
# 查看文件的 inode 信息
ls -i robot_controller.py
# 详细信息
stat robot_controller.py
# 输出示例:
# File: robot_controller.py
# Size: 4096 Blocks: 8 IO Block: 4096 regular file
# Device: 803h/2051d Inode: 123456 Links: 1
# Access: (0644/-rw-r--r--) Uid: ( 1000/ user) Gid: ( 1000/ user)
# Modify: 2024-01-15 10:30:45.123456789 +0800inode 包含的信息:
- 文件类型(普通文件、目录、符号链接等)
- 文件大小
- 权限(rwx)
- 所有者(UID、GID)
- 时间戳(创建、修改、访问)
- 指向数据块的指针
import os
import stat
from pathlib import Path
file_path = Path("robot_controller.py")
# 获取文件元数据
stat_info = file_path.stat()
print(f"文件大小: {stat_info.st_size} 字节")
print(f"文件权限: {oct(stat_info.st_mode)[-3:]}")
print(f"所有者 UID: {stat_info.st_uid}")
print(f"最后修改时间: {stat_info.st_mtime}")
# 检查权限
is_readable = bool(stat_info.st_mode & stat.S_IRUSR)
is_writable = bool(stat_info.st_mode & stat.S_IWUSR)
is_executable = bool(stat_info.st_mode & stat.S_IXUSR)
print(f"可读: {is_readable}, 可写: {is_writable}, 可执行: {is_executable}")
# 改变权限
# file_path.chmod(0o755) # rwxr-xr-x5. 系统调用
5.1 什么是系统调用
系统调用是应用程序与操作系统内核交互的唯一方式。它们提供了访问硬件资源和系统功能的接口。
应用程序
↓
系统调用库(libc)
↓
系统调用接口
↓
操作系统内核
↓
硬件5.2 常见的系统调用
进程相关:
import os
import sys
# fork() - 创建新进程(UNIX/Linux)
# pid = os.fork()
# if pid == 0:
# print("子进程")
# else:
# print(f"父进程,子进程ID: {pid}")
# exec() - 执行新程序
# os.execvp("ls", ["ls", "-la"])
# 获取进程ID
print(f"当前进程ID: {os.getpid()}")
print(f"父进程ID: {os.getppid()}")
# wait() - 等待子进程
import subprocess
process = subprocess.Popen(['sleep', '2'])
process.wait()文件相关:
import os
# open() - 打开文件
fd = os.open("test.txt", os.O_CREAT | os.O_WRONLY)
# write() - 写入数据
os.write(fd, b"Hello, LeBot!\n")
# read() - 读取数据
# data = os.read(fd, 100)
# close() - 关闭文件
os.close(fd)
# unlink() - 删除文件
os.unlink("test.txt")内存相关:
import ctypes
# brk() - 改变堆大小(通常由内存分配器使用)
# mmap() - 内存映射文件
# 在 Python 中可以使用内存映射文件:
import mmap
with open("large_file.bin", "r+b") as f:
with mmap.mmap(f.fileno(), 0) as mmapped:
# 直接访问文件内容
print(mmapped[:100])5.3 系统调用开销
系统调用涉及到上下文切换,开销较大。频繁的系统调用会影响性能:
import time
import os
# 比较:直接内存操作 vs 系统调用
# 方法1:直接内存操作(快)
start = time.time()
data = []
for i in range(1_000_000):
data.append(i)
direct_time = time.time() - start
print(f"直接内存操作: {direct_time:.4f} 秒")
# 方法2:系统调用(慢)
start = time.time()
for i in range(1_000_000):
os.write(1, b"x") # 写到标准输出(系统调用)
syscall_time = time.time() - start
print(f"系统调用: {syscall_time:.4f} 秒")
print(f"系统调用慢 {syscall_time / direct_time:.1f} 倍")
# 优化建议:批量操作,减少系统调用次数
start = time.time()
output = "x" * 1_000_000
os.write(1, output.encode()) # 一次系统调用
optimized_time = time.time() - start
print(f"批量系统调用: {optimized_time:.4f} 秒")6. 进程调度
6.1 调度基础
操作系统的调度器决定了哪个进程/线程在什么时候运行。
调度的目标:
- 公平性: 每个进程都有机会运行
- 吞吐量: 单位时间内完成的工作最多
- 响应时间: 从请求到响应的时间最短
- 周转时间: 从提交到完成的时间最短
6.2 调度算法
时间片轮转(Round Robin):
进程队列: P1 -> P2 -> P3 -> P4
时间片: 10ms
时间轴:
0-10ms: 运行 P1
10-20ms: 运行 P2
20-30ms: 运行 P3
30-40ms: 运行 P4
40-50ms: 运行 P1 (继续)
...优先级调度:
# 模拟优先级调度
from queue import PriorityQueue
import heapq
class Task:
def __init__(self, name, priority, duration):
self.name = name
self.priority = priority
self.duration = duration
def __lt__(self, other):
return self.priority < other.priority
# 任务队列(优先级越小越高)
tasks = [
Task("传感器数据读取", 1, 5), # 高优先级
Task("电机控制", 1, 8),
Task("日志记录", 3, 2), # 低优先级
Task("视频处理", 2, 10),
]
queue = PriorityQueue()
for task in tasks:
queue.put((task.priority, task.name, task.duration))
# 执行任务
print("任务执行顺序:")
while not queue.empty():
priority, name, duration = queue.get()
print(f"执行: {name} (优先级: {priority}, 耗时: {duration}ms)")6.3 上下文切换
import os
import resource
# 获取进程的上下文切换信息
usage = resource.getrusage(resource.RUSAGE_SELF)
print(f"用户 CPU 时间: {usage.ru_utime:.4f} 秒")
print(f"系统 CPU 时间: {usage.ru_stime:.4f} 秒")
print(f"自愿上下文切换: {usage.ru_nvcsw}")
print(f"非自愿上下文切换: {usage.ru_nivcsw}")
# 自愿上下文切换: 进程等待 I/O、睡眠等
# 非自愿上下文切换: 时间片用尽,被强制切换7. 同步与通信
7.1 信号量(Semaphore)
信号量是一种用于同步的机制,包含一个计数器和两个原子操作。
二元信号量示例:
import threading
import time
class BinarySemaphore:
"""简单的二元信号量实现"""
def __init__(self):
self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)
self.value = 1
def wait(self):
"""P 操作"""
with self.condition:
while self.value == 0:
self.condition.wait()
self.value -= 1
def signal(self):
"""V 操作"""
with self.condition:
self.value += 1
self.condition.notify_all()
# 使用信号量保护共享资源
semaphore = BinarySemaphore()
shared_resource = {"motor_position": 0}
def thread_function(thread_id):
for i in range(3):
semaphore.wait()
try:
# 临界区
pos = shared_resource["motor_position"]
print(f"线程 {thread_id}: 读取位置 = {pos}")
time.sleep(0.1)
shared_resource["motor_position"] = pos + 1
finally:
semaphore.signal()
threads = [threading.Thread(target=thread_function, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终位置: {shared_resource['motor_position']}")计数信号量示例:
import threading
import time
class CountingSemaphore:
"""计数信号量"""
def __init__(self, initial=1):
self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)
self.value = initial
def wait(self):
with self.condition:
while self.value <= 0:
self.condition.wait()
self.value -= 1
def signal(self):
with self.condition:
self.value += 1
self.condition.notify()
# 模拟资源池(例如,最多3个线程可以同时访问资源)
semaphore = CountingSemaphore(3)
def access_resource(thread_id):
print(f"线程 {thread_id}: 等待资源")
semaphore.wait()
print(f"线程 {thread_id}: 获得资源")
time.sleep(2)
print(f"线程 {thread_id}: 释放资源")
semaphore.signal()
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(6)]
for t in threads:
t.start()
for t in threads:
t.join()7.2 消息队列
import queue
import threading
import time
# 创建消息队列
msg_queue = queue.Queue(maxsize=10)
def producer(name):
"""生产者线程"""
for i in range(5):
message = f"{name}-消息-{i}"
msg_queue.put(message)
print(f"{name} 发送: {message}")
time.sleep(0.5)
def consumer(name):
"""消费者线程"""
while True:
try:
message = msg_queue.get(timeout=1)
print(f"{name} 接收: {message}")
msg_queue.task_done()
except queue.Empty:
break
# 启动生产者和消费者
p_thread = threading.Thread(target=producer, args=("生产者1",))
c_thread1 = threading.Thread(target=consumer, args=("消费者1",))
c_thread2 = threading.Thread(target=consumer, args=("消费者2",))
p_thread.start()
c_thread1.start()
c_thread2.start()
p_thread.join()
c_thread1.join()
c_thread2.join()
print("生产消费完成")8. 实时操作系统(RTOS)基础
8.1 实时系统的特点
LeBot 机器人需要实时性,因为控制算法需要在确定的时间内执行。
实时系统的需求:
- 确定性: 任务必须在最坏情况下完成
- 低延迟: 响应时间短且可预测
- 高精度: 计时精度高
8.2 硬实时 vs 软实时
| 特性 | 硬实时 | 软实时 |
|---|---|---|
| 截止期限 | 必须满足 | 可以偶尔错过 |
| 后果 | 严重 | 性能下降 |
| 例子 | 飞行控制 | 多媒体播放 |
8.3 优先级反演问题
# 优先级反演示例
import threading
import time
high_priority_task_done = False
mutex = threading.Lock()
def low_priority_task():
"""低优先级任务持有锁"""
with mutex:
print("低优先级任务: 获得锁")
time.sleep(2) # 长时间持有
print("低优先级任务: 释放锁")
def high_priority_task():
"""高优先级任务等待锁"""
global high_priority_task_done
time.sleep(0.5) # 稍后执行
print("高优先级任务: 等待锁")
with mutex:
print("高优先级任务: 获得锁")
high_priority_task_done = True
# 低优先级任务先运行,导致高优先级任务等待
low_thread = threading.Thread(target=low_priority_task)
high_thread = threading.Thread(target=high_priority_task)
low_thread.start()
high_thread.start()
low_thread.join()
high_thread.join()
# 解决方案:使用优先级继承或优先级天花板9. 实际应用:LeBot 控制示例
9.1 多线程机器人控制程序
import threading
import time
import queue
from dataclasses import dataclass
from typing import List
@dataclass
class MotorCommand:
motor_id: int
velocity: float
duration: float
class LeBotController:
def __init__(self):
self.sensor_data = {"imu": [0, 0, 0], "distance": 0}
self.motor_speeds = [0, 0, 0, 0] # 4个轮腿电机
self.command_queue = queue.Queue()
self.running = True
# 互斥锁保护共享数据
self.sensor_lock = threading.Lock()
self.motor_lock = threading.Lock()
# 启动各个线程
self.threads = []
self._start_threads()
def _start_threads(self):
"""启动各个工作线程"""
threads = [
threading.Thread(target=self._sensor_reader, name="SensorReader"),
threading.Thread(target=self._motor_controller, name="MotorController"),
threading.Thread(target=self._command_processor, name="CommandProcessor"),
]
for t in threads:
t.daemon = True
t.start()
self.threads.append(t)
def _sensor_reader(self):
"""传感器读取线程(50Hz)"""
while self.running:
# 模拟读取传感器数据
import random
imu_data = [random.uniform(-1, 1) for _ in range(3)]
distance = random.uniform(10, 100)
with self.sensor_lock:
self.sensor_data["imu"] = imu_data
self.sensor_data["distance"] = distance
print(f"[传感器] IMU: {imu_data}, 距离: {distance:.1f}cm")
time.sleep(0.02) # 50Hz
def _motor_controller(self):
"""电机控制线程(100Hz)"""
while self.running:
with self.motor_lock:
print(f"[电机] 速度: {self.motor_speeds}")
time.sleep(0.01) # 100Hz
def _command_processor(self):
"""命令处理线程"""
while self.running:
try:
cmd = self.command_queue.get(timeout=1)
with self.motor_lock:
self.motor_speeds[cmd.motor_id] = cmd.velocity
print(f"[命令] 电机{cmd.motor_id}: {cmd.velocity} m/s")
except queue.Empty:
pass
def send_command(self, cmd: MotorCommand):
"""发送电机命令"""
self.command_queue.put(cmd)
def get_sensor_data(self):
"""获取传感器数据"""
with self.sensor_lock:
return dict(self.sensor_data)
def stop(self):
"""停止控制器"""
self.running = False
for t in self.threads:
t.join()
# 使用示例
if __name__ == "__main__":
controller = LeBotController()
# 发送一些命令
for i in range(4):
controller.send_command(MotorCommand(0, 1.5 + i * 0.1, 1.0))
time.sleep(0.5)
# 运行一段时间
time.sleep(3)
# 停止控制器
controller.stop()
print("控制器已停止")10. 总结与最佳实践
10.1 关键要点
进程与线程
- 进程隔离性好但开销大,适合独立任务
- 线程共享内存但需要同步,适合协作任务
- LeBot 可能同时需要多个进程(各个功能模块)和多个线程(同一模块的并发操作)
内存管理
- 理解栈和堆的区别
- 避免内存泄漏(特别是 C/C++)
- 使用 Python 和 Rust 的自动内存管理
文件系统
- 正确处理文件权限
- 理解虚拟文件系统的结构
- 使用路径操作库(pathlib)
并发同步
- 使用互斥锁保护共享数据
- 避免死锁
- 优先使用高级并发原语(Queue、Lock)
性能优化
- 减少系统调用次数
- 批量处理数据
- 使用适当的数据结构
10.2 LeBot 开发建议
系统设计:
LeBot 控制系统
├── 主进程
│ ├── 传感器线程(高优先级)
│ ├── 控制算法线程(高优先级)
│ ├── 电机驱动线程(高优先级)
│ └── UI/通信线程(低优先级)
├── 独立进程:视觉处理(高 CPU 占用)
└── 独立进程:日志记录同步策略:
- 使用消息队列进行线程间通信
- 为关键路径使用实时线程
- 定期监控系统负载和线程状态
10.3 调试技巧
# 监视进程和线程
top -H -p <pid>
# 查看进程的线程
ps -eLf | grep lebot
# 使用 strace 跟踪系统调用
strace -e trace=open,read,write python lebot_controller.py
# 使用 gdb 调试多线程程序
gdb python
(gdb) info threads
(gdb) thread <id>
(gdb) break function_name
(gdb) continue参考资源
- Linux Kernel Documentation: https://www.kernel.org/doc/
- The Linux Programming Interface (书籍)
- Understanding the Linux Kernel (书籍)
- ROS 官方文档: https://docs.ros.org/
练习题
- 编写一个多线程程序,模拟 LeBot 的运动规划(一个线程生成轨迹,一个线程执行轨迹)
- 使用互斥锁实现一个线程安全的传感器数据缓冲区
- 比较创建 100 个进程和创建 100 个线程的性能差异
- 使用消息队列实现一个生产者-消费者模型(传感器数据采集和处理)
- 调查 LeBot 使用的实时操作系统特性(如果有)
下一步建议: 学完本章后,可以继续学习《数据结构与算法》,为机器人的路径规划和控制算法打下基础。