Skip to content

网络协议进阶

引言

在上一章中,我们学习了 Socket 通信和蓝牙通信的基础知识。现在我们要深入探讨网络协议的进阶概念,包括 TCP/IP 协议栈、UDP 的高级用法、协议设计、网络安全等内容。这些知识对于开发 LeBot 这样的实时机器人系统至关重要,因为机器人需要与控制端进行稳定、高效、安全的通信。

TCP/IP 协议栈详解

协议栈概述

TCP/IP 协议栈是现代网络通信的基础,它由多层协议组成,从下往上分别是:

  1. 物理层:定义如何在物理介质上传输数据(网线、光纤等)
  2. 数据链路层:处理帧的传输、MAC 地址等(以太网协议)
  3. 网络层:负责路由和 IP 寻址(IP 协议)
  4. 传输层:提供端到端的通信服务(TCP、UDP 协议)
  5. 应用层:提供各种应用服务(HTTP、FTP、DNS 等)
应用层   │ HTTP, FTP, DNS, Telnet, SSH, 自定义协议 (如 LeBot 控制协议)

传输层   │ TCP, UDP

网络层   │ IP (IPv4, IPv6), ICMP

数据链路层│ 以太网, Wi-Fi, PPP

物理层   │ 光纤, 铜线, 无线电波

IP 协议(Internet Protocol)

IP 协议是网络层的核心协议,负责将数据包从源主机传送到目标主机。

IPv4 地址空间

IPv4 地址由 32 位二进制数组成,通常用四个十进制数表示(点分十进制):

192.168.1.100

转换为二进制:
11000000.10101000.00000001.01100100

IPv4 地址分为五类:

  • A 类(1.0.0.0 ~ 126.0.0.0):用于大型网络
  • B 类(128.0.0.0 ~ 191.255.0.0):用于中等网络
  • C 类(192.0.0.0 ~ 223.255.255.0):用于小型网络
  • D 类(224.0.0.0 ~ 239.255.255.255):用于组播
  • E 类(240.0.0.0 ~ 255.255.255.255):保留

子网掩码和 CIDR 表示法

子网掩码用于确定 IP 地址的网络部分和主机部分:

IP 地址:        192.168.1.100
子网掩码:      255.255.255.0

网络地址:      192.168.1.0
广播地址:      192.168.1.255
可用主机范围:  192.168.1.1 ~ 192.168.1.254

CIDR 表示法(无类域间路由)更加简洁:

192.168.1.0/24

其中 /24 表示前 24 位是网络部分,剩余 8 位是主机部分

IPv6 协议

IPv6 使用 128 位地址,提供了更大的地址空间。IPv6 地址用冒号分隔的十六进制表示:

2001:0db8:0000:0000:0000:ff00:0042:8329

简化形式:
2001:db8::ff00:42:8329

IPv6 的优势:

  • 地址空间巨大(2^128 个地址)
  • 内置安全性(IPsec)
  • 更好的多播和任播支持
  • 简化了报头格式

对于 LeBot,如果需要在局域网内通信,IPv4 通常足够。但如果考虑互联网连接或未来扩展,IPv6 是一个很好的选择。

TCP 协议深入

TCP(传输控制协议)提供可靠的、面向连接的通信服务。

TCP 连接的三次握手

建立 TCP 连接需要三次握手:

客户端                              服务器

         SYN (seq=x)
    ─────────────────────>
              [LISTEN]

     [SYN_RECEIVED]
         SYN-ACK (seq=y, ack=x+1)
    <─────────────────────
         
     [ESTABLISHED]
         ACK (seq=x+1, ack=y+1)
    ─────────────────────>
              [ESTABLISHED]

这个过程确保了双方都准备好发送和接收数据。

TCP 连接的四次挥手

关闭 TCP 连接需要四次挥手:

客户端                              服务器

     [ESTABLISHED]
         FIN (seq=x)
    ─────────────────────>
              [CLOSE_WAIT]
     [FIN_WAIT_1]
         ACK (ack=x+1)
    <─────────────────────
     [FIN_WAIT_2]

              [LAST_ACK]
         FIN (seq=y)
    <─────────────────────
         
         ACK (ack=y+1)
    ─────────────────────>
     [TIME_WAIT]          [CLOSED]

TCP 流量控制

TCP 使用滑动窗口机制进行流量控制,确保发送方不会淹没接收方:

python
class TCPFlowControl:
    """TCP 流量控制演示"""
    
    def __init__(self, window_size=65535):
        """
        初始化流量控制
        
        参数:
            window_size: 初始窗口大小(字节)
        """
        self.window_size = window_size
        self.ack_num = 0
        self.send_buffer = bytearray()
        
    def can_send(self, data_size):
        """
        检查是否可以发送数据
        
        参数:
            data_size: 要发送的数据大小
            
        返回:
            是否可以发送
        """
        return len(self.send_buffer) + data_size <= self.window_size
    
    def add_to_send_buffer(self, data):
        """添加数据到发送缓冲区"""
        if self.can_send(len(data)):
            self.send_buffer.extend(data)
            return True
        return False
    
    def receive_ack(self, ack_num, new_window_size):
        """
        接收 ACK,更新窗口
        
        参数:
            ack_num: 确认号
            new_window_size: 新的窗口大小
        """
        # 移除已确认的数据
        if ack_num > self.ack_num:
            removed_size = ack_num - self.ack_num
            del self.send_buffer[:removed_size]
            self.ack_num = ack_num
        
        # 更新窗口大小
        self.window_size = new_window_size
    
    def get_window_available(self):
        """获取窗口中可用空间"""
        return max(0, self.window_size - len(self.send_buffer))

TCP 拥塞控制

TCP 使用多种拥塞控制算法来适应网络状况:

  1. 慢启动(Slow Start):初始拥塞窗口很小,以指数方式增长
  2. 拥塞避免(Congestion Avoidance):拥塞窗口以线性方式增长
  3. 快速重传(Fast Retransmit):检测到丢包立即重传
  4. 快速恢复(Fast Recovery):恢复后进入拥塞避免阶段

UDP 协议深入

UDP(用户数据报协议)提供无连接、不可靠的通信服务,但开销小、延迟低。

UDP 的优势和劣势

优势:

  • 低延迟:无需建立连接
  • 低开销:报头只有 8 字节
  • 适合实时应用:如视频流、游戏、机器人控制

劣势:

  • 不可靠:可能丢包、乱序、重复
  • 无流量控制:可能导致接收方过载
  • 无拥塞控制:可能导致网络拥塞

为 LeBot 设计 UDP 应用层协议

对于实时的机器人控制,UDP 是一个很好的选择。但需要在应用层实现一些可靠性机制。

python
import socket
import struct
import time
from enum import Enum

class MessageType(Enum):
    """消息类型"""
    HEARTBEAT = 1
    MOTOR_COMMAND = 2
    SENSOR_DATA = 3
    IMU_DATA = 4
    STATUS_REQUEST = 5
    STATUS_RESPONSE = 6

class LeBotUDPProtocol:
    """LeBot 的自定义 UDP 协议"""
    
    # 协议常数
    PROTOCOL_VERSION = 1
    HEADER_SIZE = 12  # 版本(1) + 消息类型(1) + 序列号(2) + 时间戳(4) + 数据长度(2) + 校验和(2)
    MAX_PAYLOAD_SIZE = 1024
    
    def __init__(self):
        self.sequence = 0
    
    def create_header(self, msg_type, payload):
        """
        创建消息头
        
        参数:
            msg_type: 消息类型
            payload: 消息负载
            
        返回:
            消息头的字节数据
        """
        self.sequence = (self.sequence + 1) % 65536
        timestamp = int(time.time() * 1000) & 0xFFFFFFFF
        payload_len = len(payload)
        
        # 打包头部(不包括校验和)
        header = struct.pack(
            '>BBHIHxx',  # 大端序:版本、类型、序列号、时间戳、长度、2字节填充
            self.PROTOCOL_VERSION,
            msg_type.value,
            self.sequence,
            timestamp,
            payload_len
        )
        
        return header, self.sequence
    
    def calculate_checksum(self, data):
        """
        计算简单校验和
        
        参数:
            data: 要校验的数据
            
        返回:
            校验和
        """
        checksum = 0
        for byte in data:
            checksum = (checksum + byte) & 0xFFFF
        return (~checksum) & 0xFFFF
    
    def create_motor_command(self, motor_id, target_angle, velocity, torque_limit):
        """
        创建电机命令消息
        
        参数:
            motor_id: 电机 ID
            target_angle: 目标角度(弧度)
            velocity: 目标速度(弧度/秒)
            torque_limit: 力矩限制(牛米)
            
        返回:
            完整的消息字节数据和序列号
        """
        # 创建负载
        payload = struct.pack(
            '>BfffH',  # 大端序:ID、角度、速度、力矩限制、预留
            motor_id,
            target_angle,
            velocity,
            torque_limit,
            0
        )
        
        # 创建头部
        header, seq = self.create_header(MessageType.MOTOR_COMMAND, payload)
        
        # 计算校验和
        checksum = self.calculate_checksum(header + payload)
        
        # 添加校验和到头部最后 2 字节
        message = header[:-2] + struct.pack('>H', checksum) + payload
        
        return message, seq
    
    def create_sensor_data(self, motor_id, current_angle, current_velocity, current_torque, temperature):
        """
        创建传感器数据消息
        
        参数:
            motor_id: 电机 ID
            current_angle: 当前角度
            current_velocity: 当前速度
            current_torque: 当前力矩
            temperature: 温度(摄氏度)
            
        返回:
            完整的消息字节数据和序列号
        """
        payload = struct.pack(
            '>BfffB',  # 大端序:ID、角度、速度、力矩、温度
            motor_id,
            current_angle,
            current_velocity,
            current_torque,
            int(temperature)
        )
        
        header, seq = self.create_header(MessageType.SENSOR_DATA, payload)
        checksum = self.calculate_checksum(header + payload)
        message = header[:-2] + struct.pack('>H', checksum) + payload
        
        return message, seq
    
    def parse_message(self, data):
        """
        解析接收到的消息
        
        参数:
            data: 消息字节数据
            
        返回:
            (消息类型, 序列号, 时间戳, 负载数据) 或 None(如果校验失败)
        """
        if len(data) < self.HEADER_SIZE:
            return None
        
        # 解析头部
        header = data[:self.HEADER_SIZE]
        version, msg_type, seq, timestamp, payload_len, checksum = struct.unpack(
            '>BBHIHxH',
            header
        )
        
        # 验证版本
        if version != self.PROTOCOL_VERSION:
            return None
        
        # 验证长度
        if len(data) < self.HEADER_SIZE + payload_len:
            return None
        
        # 验证校验和
        expected_checksum = self.calculate_checksum(data[:10] + data[12:])
        if checksum != expected_checksum:
            return None
        
        # 提取负载
        payload = data[self.HEADER_SIZE:self.HEADER_SIZE + payload_len]
        
        return (MessageType(msg_type), seq, timestamp, payload)


class UDPRobotController:
    """基于 UDP 的机器人控制客户端"""
    
    def __init__(self, robot_ip, robot_port, timeout=2.0):
        """
        初始化控制器
        
        参数:
            robot_ip: 机器人 IP 地址
            robot_port: 机器人 UDP 端口
            timeout: 超时时间(秒)
        """
        self.robot_ip = robot_ip
        self.robot_port = robot_port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.settimeout(timeout)
        self.protocol = LeBotUDPProtocol()
        self.last_ack = {}  # 记录最后收到的 ACK 序列号
        
    def send_motor_command(self, motor_id, target_angle, velocity, torque_limit):
        """
        发送电机命令
        
        参数:
            motor_id: 电机 ID
            target_angle: 目标角度
            velocity: 目标速度
            torque_limit: 力矩限制
            
        返回:
            是否发送成功
        """
        try:
            message, seq = self.protocol.create_motor_command(
                motor_id, target_angle, velocity, torque_limit
            )
            self.socket.sendto(message, (self.robot_ip, self.robot_port))
            return True
        except Exception as e:
            print(f"发送命令失败: {e}")
            return False
    
    def receive_sensor_data(self):
        """
        接收传感器数据
        
        返回:
            解析后的传感器数据字典,或 None
        """
        try:
            data, addr = self.socket.recvfrom(1024)
            parsed = self.protocol.parse_message(data)
            
            if parsed is None:
                return None
            
            msg_type, seq, timestamp, payload = parsed
            
            if msg_type == MessageType.SENSOR_DATA:
                motor_id, angle, velocity, torque, temp = struct.unpack(
                    '>BfffB', payload
                )
                return {
                    'motor_id': motor_id,
                    'angle': angle,
                    'velocity': velocity,
                    'torque': torque,
                    'temperature': temp,
                    'timestamp': timestamp,
                    'sequence': seq
                }
            
            return None
        except socket.timeout:
            return None
        except Exception as e:
            print(f"接收数据失败: {e}")
            return None
    
    def close(self):
        """关闭连接"""
        self.socket.close()


# 使用示例
if __name__ == "__main__":
    # 创建控制器
    controller = UDPRobotController("192.168.1.100", 5000)
    
    try:
        # 发送电机命令
        controller.send_motor_command(
            motor_id=0,
            target_angle=0.5,
            velocity=1.0,
            torque_limit=5.0
        )
        
        # 接收传感器数据
        data = controller.receive_sensor_data()
        if data:
            print(f"电机 {data['motor_id']} 当前角度: {data['angle']:.3f} rad")
            print(f"当前速度: {data['velocity']:.3f} rad/s")
            print(f"当前力矩: {data['torque']:.3f} Nm")
            print(f"温度: {data['temperature']}°C")
    
    finally:
        controller.close()

网络编程实践

异步网络编程

在高实时性的应用中,同步 I/O 可能不够高效。异步 I/O 允许程序同时处理多个连接。

使用 asyncio 进行异步 UDP 通信

python
import asyncio
import struct
from typing import Optional, Dict, List

class AsyncUDPRobotServer:
    """异步 UDP 服务器,用于接收机器人数据"""
    
    def __init__(self, host='0.0.0.0', port=5000):
        self.host = host
        self.port = port
        self.protocol = LeBotUDPProtocol()
        self.transport = None
        self.protocol_obj = None
        self.callbacks = {}  # 消息类型 -> 回调函数列表
        
    def register_callback(self, msg_type, callback):
        """
        注册消息回调
        
        参数:
            msg_type: 消息类型
            callback: 异步回调函数
        """
        if msg_type not in self.callbacks:
            self.callbacks[msg_type] = []
        self.callbacks[msg_type].append(callback)
    
    async def start(self):
        """启动异步服务器"""
        loop = asyncio.get_event_loop()
        
        # 创建 UDP 服务器
        transport, protocol = await loop.create_datagram_endpoint(
            lambda: ServerProtocol(self),
            local_addr=(self.host, self.port)
        )
        
        self.transport = transport
        self.protocol_obj = protocol
        
        print(f"UDP 服务器已启动,监听 {self.host}:{self.port}")
    
    async def handle_message(self, data, addr):
        """
        处理接收到的消息
        
        参数:
            data: 消息数据
            addr: 发送方地址
        """
        parsed = self.protocol.parse_message(data)
        if parsed is None:
            return
        
        msg_type, seq, timestamp, payload = parsed
        
        # 调用所有注册的回调
        if msg_type in self.callbacks:
            for callback in self.callbacks[msg_type]:
                try:
                    await callback(msg_type, seq, timestamp, payload, addr)
                except Exception as e:
                    print(f"回调执行失败: {e}")
    
    async def stop(self):
        """停止服务器"""
        if self.transport:
            self.transport.close()


class ServerProtocol(asyncio.DatagramProtocol):
    """异步 UDP 协议处理类"""
    
    def __init__(self, server):
        self.server = server
        self.transport = None
    
    def connection_made(self, transport):
        self.transport = transport
    
    def datagram_received(self, data, addr):
        # 将接收到的数据放入事件循环处理
        asyncio.ensure_future(self.server.handle_message(data, addr))
    
    def error_received(self, exc):
        print(f"错误: {exc}")


# 异步客户端示例
class AsyncUDPRobotClient:
    """异步 UDP 机器人客户端"""
    
    def __init__(self, robot_ip, robot_port):
        self.robot_ip = robot_ip
        self.robot_port = robot_port
        self.protocol = LeBotUDPProtocol()
        self.transport = None
        self.protocol_obj = None
        self.response_queue = asyncio.Queue()
    
    async def connect(self):
        """连接到机器人"""
        loop = asyncio.get_event_loop()
        transport, protocol = await loop.create_datagram_endpoint(
            lambda: ClientProtocol(self),
            remote_addr=(self.robot_ip, self.robot_port)
        )
        self.transport = transport
        self.protocol_obj = protocol
    
    async def send_command(self, motor_id, angle, velocity, torque_limit):
        """发送电机命令"""
        message, seq = self.protocol.create_motor_command(
            motor_id, angle, velocity, torque_limit
        )
        self.transport.sendto(message)
        return seq
    
    async def receive_response(self, timeout=1.0):
        """接收响应"""
        try:
            return await asyncio.wait_for(self.response_queue.get(), timeout)
        except asyncio.TimeoutError:
            return None
    
    async def close(self):
        """关闭连接"""
        if self.transport:
            self.transport.close()


class ClientProtocol(asyncio.DatagramProtocol):
    """异步 UDP 客户端协议"""
    
    def __init__(self, client):
        self.client = client
        self.transport = None
    
    def connection_made(self, transport):
        self.transport = transport
    
    def datagram_received(self, data, addr):
        parsed = self.client.protocol.parse_message(data)
        if parsed:
            asyncio.ensure_future(self.client.response_queue.put(parsed))


# 异步应用示例
async def robot_control_demo():
    """机器人控制演示"""
    
    # 创建服务器
    server = AsyncUDPRobotServer('0.0.0.0', 5000)
    await server.start()
    
    # 注册回调
    async def on_sensor_data(msg_type, seq, timestamp, payload, addr):
        motor_id, angle, velocity, torque, temp = struct.unpack(
            '>BfffB', payload
        )
        print(f"[{addr}] 电机 {motor_id}: 角度={angle:.3f}, 速度={velocity:.3f}, "
              f"力矩={torque:.3f}, 温度={temp}°C")
    
    server.register_callback(MessageType.SENSOR_DATA, on_sensor_data)
    
    # 创建客户端
    client = AsyncUDPRobotClient('127.0.0.1', 5000)
    await client.connect()
    
    # 发送几个命令
    for i in range(5):
        await client.send_command(
            motor_id=0,
            angle=0.5 + i * 0.1,
            velocity=1.0,
            torque_limit=5.0
        )
        await asyncio.sleep(0.5)
    
    # 保持运行 5 秒
    await asyncio.sleep(5)
    
    # 清理
    await client.close()
    await server.stop()


# 运行异步演示
if __name__ == "__main__":
    asyncio.run(robot_control_demo())

网络安全基础

加密通信

对于连接到互联网的机器人,加密通信是必须的。

使用 TLS/SSL

python
import ssl
import socket

class SecureUDPConnection:
    """安全 UDP 连接(使用 DTLS)"""
    
    # 注意:标准 UDP 不支持 TLS。对于安全 UDP,应使用 DTLS(Datagram TLS)
    # 这里展示如何使用 TCP + TLS 作为替代方案
    
    def __init__(self, host, port, certfile, keyfile):
        self.host = host
        self.port = port
        self.certfile = certfile
        self.keyfile = keyfile
        self.socket = None
        self.context = None
    
    def setup_ssl_context(self, is_server=True):
        """设置 SSL 上下文"""
        self.context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER if is_server else ssl.PROTOCOL_TLS_CLIENT)
        
        if is_server:
            self.context.load_cert_chain(self.certfile, self.keyfile)
            # 要求客户端证书
            self.context.verify_mode = ssl.CERT_REQUIRED
        else:
            self.context.load_verify_locations(self.certfile)
            self.context.verify_mode = ssl.CERT_REQUIRED
        
        # 仅允许 TLS 1.2 及以上
        self.context.minimum_version = ssl.TLSVersion.TLSv1_2
        self.context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')
    
    def connect_secure(self):
        """建立安全连接"""
        self.setup_ssl_context(is_server=False)
        
        # 创建 TCP 套接字
        tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        # 包装为 SSL 套接字
        self.socket = self.context.wrap_socket(
            tcp_socket,
            server_hostname=self.host
        )
        
        self.socket.connect((self.host, self.port))
        print(f"已建立安全连接到 {self.host}:{self.port}")
        print(f"TLS 版本: {self.socket.version()}")
        print(f"加密套件: {self.socket.cipher()}")
    
    def send_secure(self, data):
        """发送加密数据"""
        if self.socket:
            self.socket.sendall(data)
    
    def recv_secure(self, buffer_size=1024):
        """接收加密数据"""
        if self.socket:
            return self.socket.recv(buffer_size)
        return None
    
    def close_secure(self):
        """关闭安全连接"""
        if self.socket:
            self.socket.close()

消息认证

确保消息不被篡改:

python
import hmac
import hashlib

class MessageAuthenticator:
    """消息认证和完整性检查"""
    
    def __init__(self, secret_key):
        """
        初始化认证器
        
        参数:
            secret_key: 共享密钥(字节)
        """
        self.secret_key = secret_key
    
    def generate_signature(self, message):
        """
        生成消息签名
        
        参数:
            message: 消息数据
            
        返回:
            签名
        """
        signature = hmac.new(
            self.secret_key,
            message,
            hashlib.sha256
        ).digest()
        
        return signature
    
    def verify_signature(self, message, signature):
        """
        验证消息签名
        
        参数:
            message: 消息数据
            signature: 签名
            
        返回:
            是否有效
        """
        expected_signature = self.generate_signature(message)
        # 使用常时间比较防止时序攻击
        return hmac.compare_digest(signature, expected_signature)
    
    def create_authenticated_message(self, message):
        """
        创建带有认证的消息
        
        参数:
            message: 消息数据
            
        返回:
            消息 + 签名
        """
        signature = self.generate_signature(message)
        return message + signature
    
    def extract_and_verify(self, authenticated_message):
        """
        提取消息并验证
        
        参数:
            authenticated_message: 认证过的消息
            
        返回:
            (消息, 是否有效) 元组
        """
        # SHA256 签名长度为 32 字节
        signature_len = 32
        
        if len(authenticated_message) < signature_len:
            return None, False
        
        message = authenticated_message[:-signature_len]
        signature = authenticated_message[-signature_len:]
        
        if self.verify_signature(message, signature):
            return message, True
        
        return None, False

网络调试和监控

使用 Wireshark 分析网络流量

虽然我们不能直接在代码中使用 Wireshark,但了解如何使用它进行调试是重要的:

bash
# 启动 Wireshark 并抓取 eth0 接口上的所有 UDP 流量
wireshark -i eth0 -f "udp"

# 保存抓包文件供后续分析
tshark -i eth0 -f "udp port 5000" -w lebot_traffic.pcap

# 读取和分析抓包文件
tshark -r lebot_traffic.pcap -T fields -e frame.time -e ip.src -e ip.dst -e udp.srcport -e udp.dstport

网络性能监控

python
import time
from collections import deque
from datetime import datetime

class NetworkPerformanceMonitor:
    """网络性能监控"""
    
    def __init__(self, window_size=100):
        """
        初始化监控器
        
        参数:
            window_size: 统计窗口大小(消息数)
        """
        self.window_size = window_size
        self.latencies = deque(maxlen=window_size)
        self.packet_sizes = deque(maxlen=window_size)
        self.timestamps = deque(maxlen=window_size)
        self.dropped_packets = 0
        self.last_seq = -1
    
    def record_latency(self, latency_ms):
        """记录延迟"""
        self.latencies.append(latency_ms)
    
    def record_packet(self, size, sequence_num):
        """
        记录收到的数据包
        
        参数:
            size: 数据包大小
            sequence_num: 序列号(用于检测丢包)
        """
        self.packet_sizes.append(size)
        self.timestamps.append(datetime.now())
        
        # 检测丢包
        if self.last_seq != -1:
            expected_seq = (self.last_seq + 1) % 65536
            if sequence_num != expected_seq:
                # 计算丢失的数据包数
                lost = (sequence_num - expected_seq) % 65536
                self.dropped_packets += lost
        
        self.last_seq = sequence_num
    
    def get_statistics(self):
        """获取统计信息"""
        if not self.latencies:
            return None
        
        latencies = list(self.latencies)
        packet_sizes = list(self.packet_sizes)
        
        return {
            'avg_latency_ms': sum(latencies) / len(latencies),
            'min_latency_ms': min(latencies),
            'max_latency_ms': max(latencies),
            'avg_packet_size': sum(packet_sizes) / len(packet_sizes),
            'throughput_bps': self._calculate_throughput(),
            'dropped_packets': self.dropped_packets,
            'packet_loss_rate': self._calculate_packet_loss()
        }
    
    def _calculate_throughput(self):
        """计算吞吐量(比特/秒)"""
        if len(self.timestamps) < 2:
            return 0
        
        first_time = self.timestamps[0]
        last_time = self.timestamps[-1]
        
        time_diff = (last_time - first_time).total_seconds()
        if time_diff == 0:
            return 0
        
        total_bytes = sum(self.packet_sizes)
        return (total_bytes * 8) / time_diff
    
    def _calculate_packet_loss(self):
        """计算丢包率"""
        total_packets = len(self.packet_sizes) + self.dropped_packets
        if total_packets == 0:
            return 0
        
        return (self.dropped_packets / total_packets) * 100
    
    def print_report(self):
        """打印性能报告"""
        stats = self.get_statistics()
        
        if stats is None:
            print("没有数据可显示")
            return
        
        print("=" * 50)
        print("网络性能报告")
        print("=" * 50)
        print(f"平均延迟: {stats['avg_latency_ms']:.2f} ms")
        print(f"最小延迟: {stats['min_latency_ms']:.2f} ms")
        print(f"最大延迟: {stats['max_latency_ms']:.2f} ms")
        print(f"平均数据包大小: {stats['avg_packet_size']:.0f} 字节")
        print(f"吞吐量: {stats['throughput_bps'] / 1000:.2f} kbps")
        print(f"丢弃数据包: {stats['dropped_packets']} 个")
        print(f"丢包率: {stats['packet_loss_rate']:.2f}%")
        print("=" * 50)


# 使用示例
if __name__ == "__main__":
    monitor = NetworkPerformanceMonitor()
    
    # 模拟接收数据包
    import random
    
    for i in range(100):
        # 模拟延迟(5-50 ms)
        latency = random.uniform(5, 50)
        monitor.record_latency(latency)
        
        # 模拟数据包大小(100-500 字节)
        size = random.randint(100, 500)
        monitor.record_packet(size, i)
        
        time.sleep(0.01)
    
    # 打印报告
    monitor.print_report()

网络协议设计最佳实践

1. 清晰的报文格式

设计自定义协议时,应该遵循清晰的结构:

┌─────────────────────────────────────────┐
│  帧头 (4 字节)                          │
├─────────────────────────────────────────┤
│  版本 (1 字节) | 类型 (1 字节)          │
├─────────────────────────────────────────┤
│  序列号 (2 字节)                        │
├─────────────────────────────────────────┤
│  时间戳 (4 字节)                        │
├─────────────────────────────────────────┤
│  数据长度 (2 字节)                      │
├─────────────────────────────────────────┤
│  校验和 (2 字节)                        │
├─────────────────────────────────────────┤
│  负载数据 (可变长度)                    │
├─────────────────────────────────────────┤
│  帧尾 (1 字节)                          │
└─────────────────────────────────────────┘

2. 向后兼容性

协议版本号允许在不破坏现有设备的情况下进行升级。

3. 错误处理

  • 实现重传机制
  • 检测和处理超时
  • 记录所有错误

4. 性能考虑

  • 最小化数据包大小
  • 选择合适的传输层协议(TCP 还是 UDP)
  • 考虑网络延迟对控制的影响

总结

网络协议进阶涉及多个层面:

  • 深入理解 TCP/IP 栈 是设计可靠网络应用的基础
  • UDP 适合实时应用,但需要应用层支持
  • 异步编程 可以提高多连接处理效率
  • 安全性 对于连网机器人至关重要
  • 监控和调试 工具帮助优化网络性能

对于 LeBot 这样的机器人系统,选择合适的协议和实现方式直接影响控制效果和系统稳定性。


参考资源

  • RFC 793: Transmission Control Protocol (TCP)
  • RFC 768: User Datagram Protocol (UDP)
  • RFC 5246: The Transport Layer Security (TLS) Protocol Version 1.2
  • "Computer Networking: A Top-Down Approach" by Kurose & Ross

由 LeBot 开发团队编写