Skip to content

数据结构与算法

本文档为 LeBot 青少年入门指南系列,介绍数据结构和算法的基础概念。掌握这些知识对于编写高效的机器人控制程序至关重要。

1. 为什么学习数据结构和算法

1.1 重要性

数据结构和算法是计算机科学的核心基础:

  • 效率:好的算法能大幅提升程序性能
  • 可维护性:清晰的数据结构让代码更容易理解和维护
  • 可扩展性:合适的算法能处理更大规模的数据
  • LeBot 应用:机器人运动规划、路径搜索、传感器数据处理都需要高效算法

1.2 算法复杂度

我们用时间复杂度和空间复杂度来评估算法的效率。

时间复杂度记号(Big O):

O(1)      - 常数时间(最优)
O(log n)  - 对数时间
O(n)      - 线性时间
O(n log n)- 线性对数时间
O(n²)     - 平方时间
O(n³)     - 立方时间
O(2ⁿ)     - 指数时间(很差)
O(n!)     - 阶乘时间(极差)

复杂度比较:

数据规模 n | O(1) | O(log n) | O(n) | O(n log n) | O(n²) | O(2ⁿ)
-----------|------|----------|------|-----------|-------|-------
10        | 1ns  | 3ns      | 10ns | 30ns      | 100ns | 1μs
100       | 1ns  | 7ns      | 100ns| 700ns     | 10μs  | 超长
1000      | 1ns  | 10ns     | 1μs  | 10μs      | 1ms   | 超长
10000     | 1ns  | 13ns     | 10μs | 130μs     | 100ms | 超长

Python 示例:计算复杂度

python
import time

def measure_time(func, n):
    """测量函数执行时间"""
    start = time.time()
    result = func(n)
    end = time.time()
    return end - start, result

# O(1) - 常数时间
def constant_time(n):
    return 42

# O(log n) - 对数时间
def logarithmic_time(n):
    count = 0
    while n > 1:
        n //= 2
        count += 1
    return count

# O(n) - 线性时间
def linear_time(n):
    total = 0
    for i in range(n):
        total += i
    return total

# O(n²) - 平方时间
def quadratic_time(n):
    total = 0
    for i in range(n):
        for j in range(n):
            total += 1
    return total

# 测试
for n in [100, 1000, 10000]:
    print(f"\nn = {n}:")
    print(f"O(1):  {measure_time(constant_time, n)[0]:.6f}s")
    print(f"O(log n): {measure_time(logarithmic_time, n)[0]:.6f}s")
    print(f"O(n):  {measure_time(linear_time, n)[0]:.6f}s")
    if n <= 1000:
        print(f"O(n²): {measure_time(quadratic_time, n)[0]:.6f}s")

2. 基本数据结构

2.1 数组(Array)

数组是最基本的数据结构,存储相同类型的元素。

特点:

  • 连续的内存地址
  • 随机访问:O(1)
  • 插入/删除:O(n)

Python 中的数组操作:

python
import array
import numpy as np

# Python 列表(动态数组)
sensors = [10.5, 20.3, 15.2, 18.9]
print(f"第一个传感器值: {sensors[0]}")
print(f"数组长度: {len(sensors)}")

# 添加元素 O(1) 摊销
sensors.append(22.1)

# 查找元素 O(n)
if 20.3 in sensors:
    print("找到传感器值 20.3")

# 数组切片
subset = sensors[1:3]
print(f"子数组: {subset}")

# NumPy 数组(用于科学计算)
positions = np.array([[1, 2, 3],
                      [4, 5, 6],
                      [7, 8, 9]])
print(f"位置数据形状: {positions.shape}")
print(f"第一行: {positions[0]}")

# 向量化操作(快速)
scaled = positions * 2
print(f"缩放后: {scaled}")

Rust 中的数组:

rust
fn main() {
    // 固定大小数组
    let sensor_values: [f32; 4] = [10.5, 20.3, 15.2, 18.9];
    println!("第一个传感器值: {}", sensor_values[0]);
    println!("数组长度: {}", sensor_values.len());

    // 动态数组 Vec
    let mut positions = vec![1.0, 2.0, 3.0];
    positions.push(4.0);  // 添加元素
    println!("位置数据: {:?}", positions);

    // 遍历
    for (index, &value) in positions.iter().enumerate() {
        println!("位置[{}]: {}", index, value);
    }

    // 迭代器链(函数式编程)
    let scaled: Vec<f32> = positions
        .iter()
        .map(|&x| x * 2.0)
        .filter(|&x| x > 3.0)
        .collect();
    println!("缩放后的位置: {:?}", scaled);
}

2.2 链表(Linked List)

链表是由节点组成的序列,每个节点包含数据和指向下一个节点的指针。

特点:

  • 非连续内存
  • 随机访问:O(n)
  • 插入/删除:O(1)(如果已知位置)

Python 链表实现:

python
class ListNode:
    """链表节点"""
    def __init__(self, value):
        self.value = value
        self.next = None

class LinkedList:
    """链表"""
    def __init__(self):
        self.head = None

    def append(self, value):
        """在末尾添加节点 O(n)"""
        new_node = ListNode(value)
        if not self.head:
            self.head = new_node
            return

        current = self.head
        while current.next:
            current = current.next
        current.next = new_node

    def insert(self, index, value):
        """在指定位置插入 O(n)"""
        if index == 0:
            new_node = ListNode(value)
            new_node.next = self.head
            self.head = new_node
            return

        current = self.head
        for _ in range(index - 1):
            if not current:
                return
            current = current.next

        new_node = ListNode(value)
        new_node.next = current.next
        current.next = new_node

    def delete(self, value):
        """删除指定值的节点 O(n)"""
        if not self.head:
            return

        if self.head.value == value:
            self.head = self.head.next
            return

        current = self.head
        while current.next:
            if current.next.value == value:
                current.next = current.next.next
                return
            current = current.next

    def display(self):
        """显示链表内容"""
        elements = []
        current = self.head
        while current:
            elements.append(str(current.value))
            current = current.next
        print(" -> ".join(elements) + " -> None")

    def reverse(self):
        """反转链表 O(n)"""
        prev = None
        current = self.head

        while current:
            next_temp = current.next
            current.next = prev
            prev = current
            current = next_temp

        self.head = prev

# 使用示例
linked_list = LinkedList()
for value in [10, 20, 30, 40, 50]:
    linked_list.append(value)

print("原始链表:")
linked_list.display()

linked_list.insert(2, 25)
print("插入 25 后:")
linked_list.display()

linked_list.delete(30)
print("删除 30 后:")
linked_list.display()

linked_list.reverse()
print("反转后:")
linked_list.display()

在 LeBot 中的应用:

python
# 使用链表实现运动轨迹序列
class TrajectoryNode:
    def __init__(self, position, time):
        self.position = position
        self.time = time
        self.next = None

class TrajectoryPath:
    def __init__(self):
        self.head = None
        self.current = None

    def add_waypoint(self, position, time):
        """添加路径点"""
        new_node = TrajectoryNode(position, time)
        if not self.head:
            self.head = new_node
        else:
            current = self.head
            while current.next:
                current = current.next
            current.next = new_node

    def reset(self):
        """重置到起点"""
        self.current = self.head

    def next_waypoint(self):
        """获取下一个路径点"""
        if self.current:
            waypoint = self.current
            self.current = self.current.next
            return waypoint
        return None

# 使用示例
path = TrajectoryPath()
path.add_waypoint([0, 0], 0)
path.add_waypoint([1, 0], 1)
path.add_waypoint([1, 1], 2)
path.add_waypoint([0, 1], 3)

2.3 栈(Stack)

栈遵循后进先出(LIFO)原则。

特点:

  • 只能在栈顶操作
  • Push(入栈):O(1)
  • Pop(出栈):O(1)

应用:

  • 函数调用堆栈
  • 撤销/重做功能
  • 表达式求值

Python 栈实现:

python
class Stack:
    """栈"""
    def __init__(self):
        self.items = []

    def push(self, item):
        """入栈"""
        self.items.append(item)

    def pop(self):
        """出栈"""
        if not self.is_empty():
            return self.items.pop()
        return None

    def peek(self):
        """查看栈顶元素"""
        if not self.is_empty():
            return self.items[-1]
        return None

    def is_empty(self):
        """检查栈是否为空"""
        return len(self.items) == 0

    def size(self):
        """获取栈大小"""
        return len(self.items)

# 应用:括号匹配检查
def is_balanced(expression):
    """检查括号是否匹配"""
    stack = Stack()
    matching = {'(': ')', '[': ']', '{': '}'}

    for char in expression:
        if char in matching:
            stack.push(char)
        elif char in matching.values():
            if stack.is_empty() or matching[stack.pop()] != char:
                return False

    return stack.is_empty()

# 测试
expressions = [
    "({[]})",      # True
    "({[}])",      # False
    "((()))",      # True
    "([)]",        # False
]

for expr in expressions:
    print(f"{expr}: {is_balanced(expr)}")

# 应用:十进制转二进制
def decimal_to_binary(n):
    """使用栈将十进制转为二进制"""
    stack = Stack()

    while n > 0:
        stack.push(n % 2)
        n //= 2

    binary = ""
    while not stack.is_empty():
        binary += str(stack.pop())

    return binary

print(f"10 的二进制: {decimal_to_binary(10)}")
print(f"255 的二进制: {decimal_to_binary(255)}")

2.4 队列(Queue)

队列遵循先进先出(FIFO)原则。

特点:

  • 在队尾入队,队头出队
  • Enqueue(入队):O(1)
  • Dequeue(出队):O(1)

应用:

  • 任务调度
  • 广度优先搜索(BFS)
  • 消息队列

Python 队列实现:

python
class Queue:
    """队列"""
    def __init__(self):
        self.items = []

    def enqueue(self, item):
        """入队"""
        self.items.append(item)

    def dequeue(self):
        """出队"""
        if not self.is_empty():
            return self.items.pop(0)  # 注意:这是 O(n)
        return None

    def is_empty(self):
        """检查队列是否为空"""
        return len(self.items) == 0

    def size(self):
        """获取队列大小"""
        return len(self.items)

# 更高效的队列实现(使用 collections.deque)
from collections import deque

class EfficientQueue:
    """高效队列"""
    def __init__(self):
        self.items = deque()

    def enqueue(self, item):
        """入队 O(1)"""
        self.items.append(item)

    def dequeue(self):
        """出队 O(1)"""
        if not self.is_empty():
            return self.items.popleft()
        return None

    def is_empty(self):
        """检查队列是否为空"""
        return len(self.items) == 0

    def size(self):
        """获取队列大小"""
        return len(self.items)

# 应用:任务调度
class Task:
    def __init__(self, task_id, priority, duration):
        self.task_id = task_id
        self.priority = priority
        self.duration = duration

    def __repr__(self):
        return f"Task({self.task_id}, P{self.priority}, {self.duration}ms)"

# 优先级队列(使用 heapq)
import heapq

class PriorityQueue:
    """优先级队列"""
    def __init__(self):
        self.items = []

    def enqueue(self, item, priority):
        """入队,优先级越小越优先"""
        heapq.heappush(self.items, (priority, item))

    def dequeue(self):
        """出队"""
        if not self.is_empty():
            return heapq.heappop(self.items)[1]
        return None

    def is_empty(self):
        return len(self.items) == 0

# 机器人任务调度示例
task_queue = PriorityQueue()
task_queue.enqueue(Task(1, 2, 100), 2)  # 低优先级
task_queue.enqueue(Task(2, 1, 50), 1)   # 高优先级
task_queue.enqueue(Task(3, 2, 75), 2)   # 低优先级

print("任务执行顺序:")
while not task_queue.is_empty():
    task = task_queue.dequeue()
    print(f"执行: {task}")

2.5 树(Tree)

树是分层的数据结构,由节点和边组成。

基本概念:

        A (根)
       / \
      B   C
     / \
    D   E

- 根:A
- 内部节点:A, B, C
- 叶子节点:D, E
- 高度:3
- 深度:节点到根的距离

二叉树:

python
class TreeNode:
    """二叉树节点"""
    def __init__(self, value):
        self.value = value
        self.left = None
        self.right = None

class BinaryTree:
    """二叉树"""
    def __init__(self):
        self.root = None

    def insert(self, value):
        """插入值(二叉搜索树方式)"""
        if self.root is None:
            self.root = TreeNode(value)
        else:
            self._insert_recursive(self.root, value)

    def _insert_recursive(self, node, value):
        """递归插入"""
        if value < node.value:
            if node.left is None:
                node.left = TreeNode(value)
            else:
                self._insert_recursive(node.left, value)
        else:
            if node.right is None:
                node.right = TreeNode(value)
            else:
                self._insert_recursive(node.right, value)

    def search(self, value):
        """搜索值"""
        return self._search_recursive(self.root, value)

    def _search_recursive(self, node, value):
        """递归搜索"""
        if node is None:
            return False
        if node.value == value:
            return True
        elif value < node.value:
            return self._search_recursive(node.left, value)
        else:
            return self._search_recursive(node.right, value)

    def inorder_traversal(self):
        """中序遍历(左-根-右)O(n)"""
        result = []
        self._inorder_recursive(self.root, result)
        return result

    def _inorder_recursive(self, node, result):
        """递归中序遍历"""
        if node:
            self._inorder_recursive(node.left, result)
            result.append(node.value)
            self._inorder_recursive(node.right, result)

    def preorder_traversal(self):
        """前序遍历(根-左-右)O(n)"""
        result = []
        self._preorder_recursive(self.root, result)
        return result

    def _preorder_recursive(self, node, result):
        """递归前序遍历"""
        if node:
            result.append(node.value)
            self._preorder_recursive(node.left, result)
            self._preorder_recursive(node.right, result)

    def postorder_traversal(self):
        """后序遍历(左-右-根)O(n)"""
        result = []
        self._postorder_recursive(self.root, result)
        return result

    def _postorder_recursive(self, node, result):
        """递归后序遍历"""
        if node:
            self._postorder_recursive(node.left, result)
            self._postorder_recursive(node.right, result)
            result.append(node.value)

    def levelorder_traversal(self):
        """层序遍历(BFS)O(n)"""
        if not self.root:
            return []

        result = []
        queue = deque([self.root])

        while queue:
            node = queue.popleft()
            result.append(node.value)

            if node.left:
                queue.append(node.left)
            if node.right:
                queue.append(node.right)

        return result

# 使用示例
bst = BinaryTree()
values = [50, 30, 70, 20, 40, 60, 80]
for val in values:
    bst.insert(val)

print("中序遍历(排序):", bst.inorder_traversal())
print("前序遍历:", bst.preorder_traversal())
print("后序遍历:", bst.postorder_traversal())
print("层序遍历:", bst.levelorder_traversal())
print("搜索 40:", bst.search(40))
print("搜索 45:", bst.search(45))

在 LeBot 中的应用 - 机器人决策树:

python
class DecisionNode:
    """决策树节点"""
    def __init__(self, condition=None, action=None):
        self.condition = condition    # 条件函数
        self.action = action          # 执行动作
        self.yes_node = None          # 满足条件
        self.no_node = None           # 不满足条件

    def execute(self, context):
        """执行决策"""
        if self.condition is None:
            # 叶子节点,执行动作
            return self.action(context)

        if self.condition(context):
            if self.yes_node:
                return self.yes_node.execute(context)
        else:
            if self.no_node:
                return self.no_node.execute(context)

        return None

# 机器人行为决策树
def build_robot_decision_tree():
    """构建机器人行为决策树"""

    # 叶子节点(动作)
    move_forward = DecisionNode(
        action=lambda ctx: print("前进") or "moving_forward"
    )
    turn_left = DecisionNode(
        action=lambda ctx: print("左转") or "turning_left"
    )
    turn_right = DecisionNode(
        action=lambda ctx: print("右转") or "turning_right"
    )
    stop = DecisionNode(
        action=lambda ctx: print("停止") or "stopped"
    )

    # 内部节点(决策)
    is_obstacle = DecisionNode(
        condition=lambda ctx: ctx.get("obstacle_detected", False),
        yes_node=turn_right,
        no_node=move_forward
    )

    is_target = DecisionNode(
        condition=lambda ctx: ctx.get("target_reached", False),
        yes_node=stop,
        no_node=is_obstacle
    )

    return is_target

# 使用示例
root = build_robot_decision_tree()

print("场景 1: 前方无障碍,目标未到达")
root.execute({"obstacle_detected": False, "target_reached": False})

print("\n场景 2: 前方有障碍,目标未到达")
root.execute({"obstacle_detected": True, "target_reached": False})

print("\n场景 3: 目标已到达")
root.execute({"target_reached": True})

2.6 图(Graph)

图是由顶点(节点)和边组成的数据结构。

表示方法:

  • 邻接矩阵:O(1) 查询,O(V²) 空间
  • 邻接表:O(V+E) 空间

图的应用:

  • 路径规划
  • 社交网络
  • 导航系统
python
from collections import defaultdict, deque

class Graph:
    """图"""
    def __init__(self, directed=False):
        self.graph = defaultdict(list)
        self.directed = directed

    def add_edge(self, u, v, weight=1):
        """添加边"""
        self.graph[u].append((v, weight))
        if not self.directed:
            self.graph[v].append((u, weight))

    def bfs(self, start):
        """广度优先搜索 O(V+E)"""
        visited = set()
        queue = deque([start])
        visited.add(start)
        result = []

        while queue:
            node = queue.popleft()
            result.append(node)

            for neighbor, _ in self.graph[node]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)

        return result

    def dfs(self, start):
        """深度优先搜索 O(V+E)"""
        visited = set()
        result = []

        def dfs_recursive(node):
            visited.add(node)
            result.append(node)

            for neighbor, _ in self.graph[node]:
                if neighbor not in visited:
                    dfs_recursive(neighbor)

        dfs_recursive(start)
        return result

    def dijkstra(self, start):
        """Dijkstra 最短路径算法 O((V+E)log V)"""
        import heapq

        distances = {node: float('inf') for node in self.graph}
        distances[start] = 0
        pq = [(0, start)]

        while pq:
            curr_dist, node = heapq.heappop(pq)

            if curr_dist > distances[node]:
                continue

            for neighbor, weight in self.graph[node]:
                new_dist = curr_dist + weight

                if new_dist < distances[neighbor]:
                    distances[neighbor] = new_dist
                    heapq.heappush(pq, (new_dist, neighbor))

        return distances

# 机器人路径规划示例
print("=== 机器人路径规划 ===")
robot_map = Graph()

# 添加位置和连接
locations = [
    ("A", "B", 1),
    ("A", "C", 4),
    ("B", "C", 2),
    ("B", "D", 5),
    ("C", "D", 1),
    ("D", "E", 3),
]

for u, v, w in locations:
    robot_map.add_edge(u, v, w)

print("BFS 探索 (从 A 开始):", robot_map.bfs("A"))
print("DFS 探索 (从 A 开始):", robot_map.dfs("A"))

# 最短路径
distances = robot_map.dijkstra("A")
print("从 A 到各地点的最短距离:")
for loc, dist in sorted(distances.items()):
    print(f"  A -> {loc}: {dist}")

3. 常见算法

3.1 排序算法

比较排序:

python
def bubble_sort(arr):
    """冒泡排序 O(n²)"""
    n = len(arr)
    for i in range(n):
        for j in range(0, n - i - 1):
            if arr[j] > arr[j + 1]:
                arr[j], arr[j + 1] = arr[j + 1], arr[j]
    return arr

def quick_sort(arr):
    """快速排序 O(n log n) 平均"""
    if len(arr) <= 1:
        return arr

    pivot = arr[0]
    less = [x for x in arr[1:] if x <= pivot]
    greater = [x for x in arr[1:] if x > pivot]

    return quick_sort(less) + [pivot] + quick_sort(greater)

def merge_sort(arr):
    """归并排序 O(n log n)"""
    if len(arr) <= 1:
        return arr

    mid = len(arr) // 2
    left = merge_sort(arr[:mid])
    right = merge_sort(arr[mid:])

    return merge(left, right)

def merge(left, right):
    """合并两个排序数组"""
    result = []
    i = j = 0

    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1

    result.extend(left[i:])
    result.extend(right[j:])
    return result

# 比较排序算法
import time
import random

data = [random.randint(1, 1000) for _ in range(100)]

algorithms = [
    ("快速排序", quick_sort),
    ("归并排序", merge_sort),
]

for name, func in algorithms:
    test_data = data.copy()
    start = time.time()
    result = func(test_data)
    elapsed = time.time() - start
    print(f"{name}: {elapsed:.6f}s")

计数排序(非比较排序):

python
def counting_sort(arr, max_val):
    """计数排序 O(n+k),k 是最大值"""
    count = [0] * (max_val + 1)

    # 计数
    for num in arr:
        count[num] += 1

    # 重建数组
    result = []
    for i in range(len(count)):
        result.extend([i] * count[i])

    return result

# 用于排序传感器值范围
sensor_readings = [100, 50, 75, 100, 25, 50, 75]
sorted_readings = counting_sort(sensor_readings, 100)
print(f"传感器排序结果: {sorted_readings}")

3.2 搜索算法

线性搜索:

python
def linear_search(arr, target):
    """线性搜索 O(n)"""
    for i in range(len(arr)):
        if arr[i] == target:
            return i
    return -1

# 二分搜索(需要排序的数组)
def binary_search(arr, target):
    """二分搜索 O(log n)"""
    left, right = 0, len(arr) - 1

    while left <= right:
        mid = (left + right) // 2

        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1
        else:
            right = mid - 1

    return -1

# 比较搜索算法
sorted_data = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]

print("线性搜索 50:", linear_search(sorted_data, 50))
print("二分搜索 50:", binary_search(sorted_data, 50))
print("二分搜索 75:", binary_search(sorted_data, 75))

3.3 动态规划

动态规划通过保存中间结果来避免重复计算。

斐波那契数列:

python
# 方法 1:递归(低效)O(2ⁿ)
def fib_recursive(n):
    if n <= 1:
        return n
    return fib_recursive(n - 1) + fib_recursive(n - 2)

# 方法 2:动态规划(备忘录)O(n)
def fib_memo(n, memo=None):
    if memo is None:
        memo = {}

    if n in memo:
        return memo[n]

    if n <= 1:
        return n

    memo[n] = fib_memo(n - 1, memo) + fib_memo(n - 2, memo)
    return memo[n]

# 方法 3:动态规划(表格)O(n)
def fib_dp(n):
    if n <= 1:
        return n

    dp = [0] * (n + 1)
    dp[1] = 1

    for i in range(2, n + 1):
        dp[i] = dp[i - 1] + dp[i - 2]

    return dp[n]

# 比较性能
import time

n = 35
methods = [
    ("递归", fib_recursive),
    ("备忘录", fib_memo),
    ("表格", fib_dp),
]

for name, func in methods:
    start = time.time()
    result = func(n)
    elapsed = time.time() - start
    print(f"{name}: {result} (耗时 {elapsed:.6f}s)")

0/1 背包问题(LeBot 资源分配):

python
def knapsack_dp(weights, values, capacity):
    """0/1 背包问题 O(n*W)"""
    n = len(weights)

    # dp[i][w] = 使用前 i 个物品,容量为 w 时的最大价值
    dp = [[0] * (capacity + 1) for _ in range(n + 1)]

    for i in range(1, n + 1):
        for w in range(capacity + 1):
            # 不取第 i 个物品
            dp[i][w] = dp[i - 1][w]

            # 取第 i 个物品(如果容量足够)
            if weights[i - 1] <= w:
                dp[i][w] = max(
                    dp[i][w],
                    dp[i - 1][w - weights[i - 1]] + values[i - 1]
                )

    # 回溯找出选择的物品
    selected = []
    w = capacity
    for i in range(n, 0, -1):
        if dp[i][w] != dp[i - 1][w]:
            selected.append(i - 1)
            w -= weights[i - 1]

    return dp[n][capacity], selected

# LeBot 功能模块选择(容量限制)
modules = {
    "传感器处理": {"weight": 2, "value": 10},
    "路径规划": {"weight": 3, "value": 15},
    "视觉识别": {"weight": 5, "value": 20},
    "电机控制": {"weight": 2, "value": 12},
    "通信模块": {"weight": 1, "value": 5},
}

weights = [m["weight"] for m in modules.values()]
values = [m["value"] for m in modules.values()]
capacity = 8  # 内存限制

max_value, selected_indices = knapsack_dp(weights, values, capacity)
print(f"最大价值: {max_value}")
print("选择的模块:")
for i in selected_indices:
    module_name = list(modules.keys())[i]
    print(f"  - {module_name}")

3.4 图算法

A 搜索(路径规划):*

python
import heapq
import math

class AStar:
    """A* 路径规划算法"""
    def __init__(self, grid):
        self.grid = grid
        self.rows = len(grid)
        self.cols = len(grid[0])

    def heuristic(self, pos, goal):
        """曼哈顿距离"""
        return abs(pos[0] - goal[0]) + abs(pos[1] - goal[1])

    def get_neighbors(self, pos):
        """获取相邻节点"""
        neighbors = []
        for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
            new_x, new_y = pos[0] + dx, pos[1] + dy
            if 0 <= new_x < self.rows and 0 <= new_y < self.cols:
                if self.grid[new_x][new_y] != 1:  # 1 表示障碍
                    neighbors.append((new_x, new_y))
        return neighbors

    def search(self, start, goal):
        """A* 搜索"""
        open_set = [(0, start)]
        came_from = {}
        g_score = {start: 0}
        f_score = {start: self.heuristic(start, goal)}

        while open_set:
            _, current = heapq.heappop(open_set)

            if current == goal:
                # 回溯路径
                path = []
                while current in came_from:
                    path.append(current)
                    current = came_from[current]
                path.append(start)
                return list(reversed(path))

            for neighbor in self.get_neighbors(current):
                tentative_g = g_score[current] + 1

                if neighbor not in g_score or tentative_g < g_score[neighbor]:
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g
                    f_score[neighbor] = (
                        g_score[neighbor] +
                        self.heuristic(neighbor, goal)
                    )
                    heapq.heappush(open_set, (f_score[neighbor], neighbor))

        return []  # 路径不存在

# 机器人环境地图
# 0: 空地,1: 障碍
environment = [
    [0, 0, 0, 0, 0],
    [0, 1, 1, 1, 0],
    [0, 0, 0, 0, 0],
    [0, 1, 0, 1, 0],
    [0, 0, 0, 0, 0],
]

start = (0, 0)
goal = (4, 4)

astar = AStar(environment)
path = astar.search(start, goal)

print("机器人路径:")
for pos in path:
    print(f"  {pos}")

# 可视化路径
map_copy = [row[:] for row in environment]
for pos in path:
    map_copy[pos[0]][pos[1]] = '*'
map_copy[start[0]][start[1]] = 'S'
map_copy[goal[0]][goal[1]] = 'G'

print("\n地图可视化:")
for row in map_copy:
    print(' '.join(str(cell) for cell in row))

4. 实践项目:机器人路径规划系统

整合数据结构和算法的完整项目:

python
class RobotPathPlanner:
    """机器人路径规划系统"""
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.obstacles = set()
        self.path = []

    def add_obstacle(self, x, y):
        """添加障碍"""
        if 0 <= x < self.grid_size and 0 <= y < self.grid_size:
            self.obstacles.add((x, y))

    def remove_obstacle(self, x, y):
        """移除障碍"""
        self.obstacles.discard((x, y))

    def is_valid(self, x, y):
        """检查位置有效性"""
        return (0 <= x < self.grid_size and
                0 <= y < self.grid_size and
                (x, y) not in self.obstacles)

    def plan_path(self, start, goal):
        """使用 BFS 规划路径"""
        from collections import deque

        queue = deque([(start, [start])])
        visited = {start}

        while queue:
            (x, y), path = queue.popleft()

            if (x, y) == goal:
                self.path = path
                return path

            for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
                nx, ny = x + dx, y + dy

                if self.is_valid(nx, ny) and (nx, ny) not in visited:
                    visited.add((nx, ny))
                    queue.append(((nx, ny), path + [(nx, ny)]))

        return None

    def visualize(self, start, goal):
        """可视化地图和路径"""
        grid = [['.' for _ in range(self.grid_size)]
                for _ in range(self.grid_size)]

        # 添加障碍
        for x, y in self.obstacles:
            grid[x][y] = '#'

        # 添加路径
        for x, y in self.path:
            if (x, y) != start and (x, y) != goal:
                grid[x][y] = '*'

        # 添加起点和终点
        sx, sy = start
        gx, gy = goal
        grid[sx][sy] = 'S'
        grid[gx][gy] = 'G'

        # 打印
        for row in grid:
            print(' '.join(row))

# 使用示例
planner = RobotPathPlanner(10)

# 添加障碍
for i in range(3, 7):
    planner.add_obstacle(i, 3)
planner.add_obstacle(6, 4)
planner.add_obstacle(6, 5)

# 规划路径
start = (0, 0)
goal = (9, 9)
path = planner.plan_path(start, goal)

if path:
    print(f"找到路径,长度: {len(path)}")
    print("地图和路径:")
    planner.visualize(start, goal)
else:
    print("无法找到路径")

5. 复杂度分析总结

数据结构插入删除查找空间
数组O(n)O(n)O(1)O(n)
链表O(1)*O(1)*O(n)O(n)
O(1)O(1)O(n)O(n)
队列O(1)O(1)O(n)O(n)
二叉搜索树O(log n)**O(log n)**O(log n)**O(n)
哈希表O(1)†O(1)†O(1)†O(n)

*已知位置 **平均情况 †平均情况,最坏 O(n)

总结

本章学习了:

  1. 算法复杂度分析
  2. 基本数据结构(数组、链表、栈、队列、树、图)
  3. 常见算法(排序、搜索、动态规划、图算法)
  4. 在 LeBot 中的应用

下一步:结合学到的数据结构和算法知识,深入学习机器人的运动规划算法。

由 LeBot 开发团队编写