
Reinforcement Learning and LoRA Fine-Tuning

Reinforcement Learning Fundamentals

What Is Reinforcement Learning?

Reinforcement learning (RL) is a major branch of machine learning. Its core idea is to learn, through interaction with an environment, a policy that maximizes cumulative reward. Unlike supervised learning, which requires labeled data, reinforcement learning learns through trial and error.

Basic Concepts of Reinforcement Learning

1. Agent

The agent is the learning entity, such as a robot or a game AI. In the reinforcement learning framework, the LeBot robot is an agent.

2. Environment

The environment is the external system the agent interacts with. It changes in response to the agent's actions and provides state and reward feedback.

Agent ←→ Environment

3. State

The state describes the current configuration of the environment. For the LeBot robot, the state may include:

  • Joint angles
  • Sensor readings (IMU, distance sensors, etc.)
  • Camera images
  • Target position

4. Action

An action is an operation the agent can perform. For the LeBot robot, actions include:

  • Changing the target angle of a joint
  • Changing motor speed
  • Shifting the body's center of mass

5. Reward

The reward is the feedback signal the environment returns for the agent's action. A well-designed reward function is critical.

r(t) = task completion + energy cost + balance + ...
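
A minimal sketch of a reward with this shape (the weights here are illustrative assumptions; the actual walking reward used by LeBot is defined later in LeBotWalkingEnv._compute_reward):

python
def reward(task_done: bool, energy: float, tilt: float) -> float:
    # Reward task completion, penalize energy use and loss of balance
    return 1.0 * float(task_done) - 0.1 * energy - 0.1 * abs(tilt)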

6. Policy

A policy is the rule by which the agent selects an action given the current state.

The Reinforcement Learning Loop

  1. Initialize the state S₀
  2. The agent selects an action A₀ according to its policy
  3. The environment executes the action and returns (R₀, S₁)
  4. The agent updates its policy based on the reward
  5. Repeat until convergence
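
A minimal sketch of this loop in code, using the classic gym-style reset/step interface assumed throughout this document (env and agent are placeholders here):

python
state = env.reset()                      # initialize the state S0
done = False
while not done:
    action = agent.select_action(state)             # the agent picks an action from its policy
    next_state, reward, done, _ = env.step(action)  # the environment returns (R, S')
    agent.update(state, action, reward, next_state, done)  # the agent improves its policy
    state = next_state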

Common Reinforcement Learning Algorithms

1. Q-Learning

Q-Learning is a model-free reinforcement learning algorithm that learns the action-value function Q(s, a).

python
import numpy as np

class QLearning:
    def __init__(self, n_states, n_actions, learning_rate=0.1, discount_factor=0.99):
        self.n_states = n_states
        self.n_actions = n_actions
        self.alpha = learning_rate
        self.gamma = discount_factor
        
        # Initialize the Q-table
        self.Q = np.zeros((n_states, n_actions))
    
    def select_action(self, state, epsilon=0.1):
        """ε-贪心策略选择动作"""
        if np.random.random() < epsilon:
            # Explore: pick a random action
            return np.random.randint(self.n_actions)
        else:
            # Exploit: pick the current best action
            return np.argmax(self.Q[state, :])
    
    def update(self, state, action, reward, next_state, done):
        """更新 Q 值"""
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.Q[next_state, :])
        
        self.Q[state, action] += self.alpha * (target - self.Q[state, action])

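A usage sketch of the QLearning class on gym's FrozenLake environment, which has 16 discrete states and 4 actions (classic gym API assumed, as elsewhere in this document; older gym releases name the environment FrozenLake-v0):

python
import gym

env = gym.make('FrozenLake-v1')
agent = QLearning(n_states=env.observation_space.n, n_actions=env.action_space.n)

for episode in range(5000):
    state = env.reset()
    done = False
    while not done:
        action = agent.select_action(state, epsilon=0.1)
        next_state, reward, done, _ = env.step(action)
        agent.update(state, action, reward, next_state, done)
        state = next_state
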
2. Policy Gradient

Policy gradient methods directly optimize the parameters of the policy network.

python
import torch
import torch.nn as nn
import torch.optim as optim

class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)
    
    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        action_probs = torch.softmax(self.fc3(x), dim=-1)
        return action_probs

class PolicyGradient:
    def __init__(self, state_dim, action_dim, learning_rate=0.01):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=learning_rate)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.policy = self.policy.to(self.device)
    
    def select_action(self, state):
        """选择动作"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            action_probs = self.policy(state_tensor)
        
        action = torch.multinomial(action_probs, 1).item()
        return action
    
    def update(self, states, actions, rewards):
        """使用轨迹数据更新策略"""
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        
        # Compute discounted returns (gamma = 0.99)
        discounted_rewards = []
        cumulative_reward = 0
        for r in reversed(rewards):
            cumulative_reward = r + 0.99 * cumulative_reward
            discounted_rewards.insert(0, cumulative_reward)
        
        discounted_rewards = torch.FloatTensor(discounted_rewards).to(self.device)
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-8)
        
        # REINFORCE loss
        action_probs = self.policy(states)
        log_probs = torch.log(action_probs.gather(1, actions.unsqueeze(1)))
        loss = -(log_probs * discounted_rewards.unsqueeze(1)).mean()
        
        # Backpropagate and update
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
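
A usage sketch of the PolicyGradient class on gym's CartPole-v1 (4-dimensional state, 2 discrete actions; classic gym API assumed):

python
import gym

env = gym.make('CartPole-v1')
agent = PolicyGradient(state_dim=4, action_dim=2)

for episode in range(500):
    states, actions, rewards = [], [], []
    state = env.reset()
    done = False
    while not done:
        action = agent.select_action(state)
        next_state, reward, done, _ = env.step(action)
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        state = next_state
    loss = agent.update(states, actions, rewards)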

3. Actor-Critic Methods

Actor-Critic methods combine policy gradients with a learned value function, which typically improves learning efficiency.

python
class ActorCriticNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        # Actor network
        self.actor_fc1 = nn.Linear(state_dim, hidden_dim)
        self.actor_fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.actor_fc3 = nn.Linear(hidden_dim, action_dim)
        
        # Critic network
        self.critic_fc1 = nn.Linear(state_dim, hidden_dim)
        self.critic_fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.critic_fc3 = nn.Linear(hidden_dim, 1)
    
    def forward(self, state):
        # Actor head: action probabilities
        actor_x = torch.relu(self.actor_fc1(state))
        actor_x = torch.relu(self.actor_fc2(actor_x))
        action_probs = torch.softmax(self.actor_fc3(actor_x), dim=-1)
        
        # Critic head: state value
        critic_x = torch.relu(self.critic_fc1(state))
        critic_x = torch.relu(self.critic_fc2(critic_x))
        state_value = self.critic_fc3(critic_x)
        
        return action_probs, state_value

class ActorCritic:
    def __init__(self, state_dim, action_dim, learning_rate=0.01):
        self.model = ActorCriticNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = self.model.to(self.device)
        self.gamma = 0.99
    
    def select_action(self, state):
        """选择动作"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            action_probs, _ = self.model(state_tensor)
        
        action = torch.multinomial(action_probs, 1).item()
        # This log_prob is detached (computed under no_grad); update() recomputes
        # it with gradients enabled before building the actor loss
        log_prob = torch.log(action_probs[0, action])
        return action, log_prob
    
    def update(self, state, action, reward, next_state, done, log_prob=None):
        """Update the actor and the critic"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0).to(self.device)
        
        # Recompute the action probability with gradients enabled; the log_prob
        # returned by select_action was computed under no_grad and would not
        # propagate gradients to the actor
        action_probs, state_value = self.model(state_tensor)
        log_prob = torch.log(action_probs[0, action])
        with torch.no_grad():
            _, next_state_value = self.model(next_state_tensor)
        
        # Compute the temporal-difference (TD) target and advantage
        if done:
            target = reward
        else:
            target = reward + self.gamma * next_state_value.item()
        
        advantage = target - state_value.item()
        
        # Actor loss (policy gradient weighted by the advantage)
        actor_loss = -log_prob * advantage
        
        # Critic loss (squared TD error)
        critic_loss = 0.5 * (target - state_value) ** 2
        
        # Total loss
        total_loss = actor_loss + critic_loss
        
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
        
        return total_loss.item()

Reinforcement Learning Applications in LeBot

1. Learning a Walking Policy

The LeBot robot can learn a walking policy autonomously through reinforcement learning.

python
import gym
import numpy as np
from gym import spaces

class LeBotWalkingEnv(gym.Env):
    """LeBot 步行环境"""
    
    def __init__(self):
        super().__init__()
        
        # State space: joint angles, angular velocities, IMU data, etc.
        self.state_dim = 20  # assume a 20-dimensional state
        self.action_dim = 8  # assume 8 controllable motors
        
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32
        )
        self.action_space = spaces.Box(
            low=-1, high=1, shape=(self.action_dim,), dtype=np.float32
        )
        
        self.robot = RobotController()  # robot controller
        self.step_count = 0
        self.max_steps = 1000
    
    def reset(self):
        """重置环境"""
        self.robot.reset_to_default_pose()
        self.step_count = 0
        state = self._get_state()
        return state
    
    def step(self, action):
        """执行一步"""
        # 将动作映射到电机命令
        joint_angles = self._action_to_joint_angles(action)
        
        # Execute the action
        self.robot.set_joint_angles(joint_angles, duration=0.1)
        
        # Get the new state
        new_state = self._get_state()
        
        # Compute the reward
        reward = self._compute_reward(new_state, action)
        
        # Check whether the episode is done
        done = self._is_done()
        
        self.step_count += 1
        
        return new_state, reward, done, {}
    
    def _get_state(self):
        """获取当前状态"""
        joint_angles = self.robot.get_joint_angles()
        joint_velocities = self.robot.get_joint_velocities()
        imu_data = self.robot.get_imu_data()
        
        state = np.concatenate([joint_angles, joint_velocities, imu_data])
        return state.astype(np.float32)
    
    def _action_to_joint_angles(self, action):
        """将动作转换为关节角度"""
        # 动作是 -1 到 1 之间的值
        # 需要映射到实际的关节角度范围
        joint_angles = []
        for i, a in enumerate(action):
            # Get the range of joint i
            joint_range = self.robot.get_joint_range(i)
            mid_point = (joint_range[0] + joint_range[1]) / 2
            scale = (joint_range[1] - joint_range[0]) / 2
            
            angle = mid_point + a * scale
            joint_angles.append(angle)
        
        return joint_angles
    
    def _compute_reward(self, state, action):
        """计算奖励"""
        # 获取机器人的当前位置和速度
        position = self.robot.get_base_position()
        velocity = self.robot.get_base_velocity()
        
        # The reward has several terms:
        # 1. Forward-velocity reward
        forward_velocity = velocity[0]
        forward_reward = forward_velocity
        
        # 2. Energy (action magnitude) penalty
        action_cost = -0.1 * np.sum(action ** 2)
        
        # 3. Balance reward (keep the body upright)
        orientation = self.robot.get_orientation()
        roll, pitch = orientation[0], orientation[1]
        balance_reward = -0.1 * (abs(roll) + abs(pitch))
        
        # 4. Smoothness penalty (reduce jitter across adjacent joints)
        smoothness_reward = -0.01 * np.sum(np.diff(action) ** 2)
        
        total_reward = forward_reward + action_cost + balance_reward + smoothness_reward
        
        return total_reward
    
    def _is_done(self):
        """检查是否完成"""
        # 如果机器人摔倒
        if self.robot.is_fallen():
            return True
        
        # The maximum number of steps has been reached
        if self.step_count >= self.max_steps:
            return True
        
        return False
    
    def render(self, mode='human'):
        """可视化"""
        # 可选:显示机器人状态
        pass


# Train the walking policy
# NOTE: LeBotWalkingEnv has a continuous action space, while the ActorCritic
# class above outputs a categorical distribution over discrete actions. A real
# training run would use a continuous (e.g. Gaussian) policy head; the loop
# below illustrates the overall training structure.
def train_walking_policy():
    env = LeBotWalkingEnv()
    agent = ActorCritic(env.observation_space.shape[0], env.action_space.shape[0])
    
    num_episodes = 1000
    
    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        
        for step in range(1000):
            action, log_prob = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            
            agent.update(state, action, reward, next_state, done, log_prob)
            
            episode_reward += reward
            state = next_state
            
            if done:
                break
        
        if (episode + 1) % 10 == 0:
            print(f"Episode {episode + 1}, Reward: {episode_reward:.2f}")
    
    return agent

LoRA Fine-Tuning

What Is LoRA?

LoRA (Low-Rank Adaptation) is a parameter-efficient fine-tuning method. Unlike conventional full-parameter fine-tuning, LoRA adds only a small number of trainable parameters to the model, which greatly reduces compute and memory requirements.

How LoRA Works

For a pretrained weight matrix W₀ ∈ ℝ^(d×k), standard fine-tuning updates every parameter. LoRA instead approximates the weight update with two low-rank matrices B ∈ ℝ^(d×r) and A ∈ ℝ^(r×k) (where r ≪ min(d, k)):

W = W₀ + ΔW = W₀ + B·A

Here W₀ stays frozen; only A and B are trained.
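
A quick parameter-count sketch makes the savings concrete (the 4096×4096 layer size and rank r = 8 are illustrative assumptions, not LeBot model dimensions):

python
# One 4096x4096 linear layer: full fine-tuning vs. LoRA with rank r = 8
d, k, r = 4096, 4096, 8
full_params = d * k            # 16,777,216 trainable parameters
lora_params = r * (d + k)      # 65,536 trainable parameters, about 0.39% of full
print(full_params, lora_params, lora_params / full_params)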

Advantages of LoRA

  1. Parameter efficiency: only about 0.1-1% of the original model's parameters need to be trained
  2. Fast training: the amount of computation drops sharply, and fine-tuning can even be run on a CPU
  3. Low memory footprint: no full-size gradients need to be stored for the frozen weights
  4. Easy model switching: you can switch quickly between tasks by swapping LoRA weights

Applying LoRA in Deep Learning Models

python
import math

import torch
import torch.nn as nn
from torch.nn.parameter import Parameter

class LoRALinear(nn.Module):
    """LoRA 线性层"""
    
    def __init__(self, in_features, out_features, rank=8, alpha=1.0):
        super().__init__()
        
        # Original weight (frozen below; expected to be overwritten with the
        # pretrained layer's weight)
        self.weight = Parameter(torch.empty(out_features, in_features))
        
        # LoRA parameters
        self.lora_A = Parameter(torch.empty(in_features, rank))
        self.lora_B = Parameter(torch.empty(rank, out_features))
        self.lora_alpha = alpha
        self.lora_rank = rank
        
        # Initialize LoRA parameters: A gets a Kaiming init, B starts at zero
        # so the LoRA branch is a no-op before training
        nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B)
        
        # Freeze the original weight
        self.weight.requires_grad = False
        
        self.bias = Parameter(torch.empty(out_features))
        nn.init.zeros_(self.bias)
    
    def forward(self, x):
        """前向传播"""
        # 标准的线性变换
        out = torch.nn.functional.linear(x, self.weight, self.bias)
        
        # 添加 LoRA 调整
        lora_out = x @ self.lora_A @ self.lora_B
        
        # 缩放并累加
        out = out + (self.lora_alpha / self.lora_rank) * lora_out
        
        return out
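
For deployment, the LoRA branch can be folded back into the frozen weight so the layer runs as fast as a plain nn.Linear. A minimal sketch, assuming the LoRALinear class defined above (merge_lora_weights is a helper introduced here, not part of an existing API):

python
def merge_lora_weights(layer: LoRALinear) -> None:
    """Fold the LoRA update into the frozen weight in place (inference only)."""
    with torch.no_grad():
        # lora_A @ lora_B has shape (in_features, out_features); weight is
        # stored as (out_features, in_features), so transpose before adding
        delta = (layer.lora_alpha / layer.lora_rank) * (layer.lora_A @ layer.lora_B)
        layer.weight += delta.t()
        # Zero the LoRA branch so forward() does not apply the update twice
        nn.init.zeros_(layer.lora_A)
        nn.init.zeros_(layer.lora_B)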


class LoRAViT(nn.Module):
    """应用 LoRA 的 Vision Transformer"""
    
    def __init__(self, base_model, rank=8, alpha=1.0, lora_layers=None):
        super().__init__()
        self.base_model = base_model
        self.rank = rank
        self.alpha = alpha
        
        # Determine which layers get LoRA
        if lora_layers is None:
            self.lora_layers = list(range(len(base_model.transformer.layers)))
        else:
            self.lora_layers = lora_layers
        
        # Apply LoRA to the selected layers
        self._apply_lora()
    
    def _apply_lora(self):
        """在模型中应用 LoRA"""
        for i in self.lora_layers:
            layer = self.base_model.transformer.layers[i]
            
            # Dimensions of the original attention and feed-forward layers
            in_features_attn = layer.self_attn.embed_dim
            out_features_attn = layer.self_attn.embed_dim
            
            in_features_ff = layer.mlp.fc1.in_features
            out_features_ff = layer.mlp.fc1.out_features
            
            # Save the original weights and biases
            original_attn_weight = layer.self_attn.out_proj.weight.data.clone()
            original_attn_bias = layer.self_attn.out_proj.bias.data.clone()
            original_ff_weight = layer.mlp.fc1.weight.data.clone()
            original_ff_bias = layer.mlp.fc1.bias.data.clone()
            
            # Replace with LoRA layers and restore the pretrained weights
            layer.self_attn.out_proj = LoRALinear(
                in_features_attn, out_features_attn, 
                rank=self.rank, alpha=self.alpha
            )
            layer.self_attn.out_proj.weight.data = original_attn_weight
            layer.self_attn.out_proj.bias.data = original_attn_bias
            
            layer.mlp.fc1 = LoRALinear(
                in_features_ff, out_features_ff,
                rank=self.rank, alpha=self.alpha
            )
            layer.mlp.fc1.weight.data = original_ff_weight
            layer.mlp.fc1.bias.data = original_ff_bias
    
    def forward(self, x):
        """前向传播"""
        return self.base_model(x)
    
    def get_trainable_parameters(self):
        """获取可训练参数"""
        trainable_params = []
        total_params = 0
        
        for name, param in self.named_parameters():
            total_params += param.numel()
            if param.requires_grad:
                trainable_params.append(param)
        
        trainable_count = sum(p.numel() for p in trainable_params)
        print(f"Trainable parameters: {trainable_count}")
        print(f"Total parameters: {total_params}")
        print(f"Trainable ratio: {100 * trainable_count / total_params:.2f}%")
        
        return trainable_params


# LoRA fine-tuning example (load_pretrained_model, train_loader and criterion
# are assumed to be defined elsewhere)
def finetune_with_lora():
    # Load the pretrained model
    base_model = load_pretrained_model()
    
    # Apply LoRA
    lora_model = LoRAViT(base_model, rank=8, alpha=16)
    
    # Freeze everything except the LoRA parameters *before* collecting the
    # trainable parameters, so the optimizer only sees the LoRA weights
    for name, param in lora_model.base_model.named_parameters():
        if 'lora' not in name:
            param.requires_grad = False
    
    # Collect the trainable (LoRA) parameters and set up the optimizer
    trainable_params = lora_model.get_trainable_parameters()
    optimizer = torch.optim.AdamW(trainable_params, lr=5e-4)
    
    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        for batch in train_loader:
            inputs, labels = batch
            
            outputs = lora_model(inputs)
            loss = criterion(outputs, labels)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
    
    return lora_model

LoRA Applications in LeBot

1. Vision Model Fine-Tuning

LoRA can be used to quickly adapt LeBot's vision model to a specific environment or task.

python
class LeBotVisionAdapter:
    """LeBot 视觉模型适配器,使用 LoRA"""
    
    def __init__(self, base_vision_model, num_classes, rank=8):
        self.model = LoRAViT(base_vision_model, rank=rank)
        
        # Replace the classification head
        in_features = base_vision_model.head.in_features
        self.model.base_model.head = nn.Linear(in_features, num_classes)
        
        self.optimizer = torch.optim.AdamW(
            self.model.get_trainable_parameters(), 
            lr=5e-4
        )
    
    def adapt_to_environment(self, adaptation_data, num_epochs=10):
        """
        Adapt the model to a new environment
        
        Args:
            adaptation_data: iterable of (images, labels) from the new environment
            num_epochs: number of training epochs
        """
        for epoch in range(num_epochs):
            for images, labels in adaptation_data:
                outputs = self.model(images)
                loss = nn.functional.cross_entropy(outputs, labels)
                
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
            
            print(f"Adaptation Epoch {epoch}, Loss: {loss.item():.4f}")
    
    def save_lora_weights(self, path):
        """保存 LoRA 权重"""
        lora_weights = {}
        for name, param in self.model.named_parameters():
            if 'lora' in name:
                lora_weights[name] = param.data.cpu()
        
        torch.save(lora_weights, path)
    
    def load_lora_weights(self, path):
        """加载 LoRA 权重"""
        lora_weights = torch.load(path)
        for name, weights in lora_weights.items():
            self.model.state_dict()[name].copy_(weights)

2. Action Policy Fine-Tuning

LoRA can be used to quickly adapt a reinforcement learning policy to a new motion task.

python
class LeBotPolicyAdapter:
    """LeBot 策略适配器,使用 LoRA"""
    
    def __init__(self, base_policy, new_task_dim, rank=4):
        self.base_policy = base_policy
        self.rank = rank
        
        # Add LoRA layers for the new task
        self._add_lora_to_policy()
    
    def _add_lora_to_policy(self):
        """Add LoRA layers to the policy network"""
        # Collect the target layers first so the module tree is not mutated
        # while it is being iterated
        targets = [(name, module) for name, module in self.base_policy.named_modules()
                   if isinstance(module, nn.Linear)]
        
        for name, module in targets:
            # Create a LoRA layer with the same shape
            lora_layer = LoRALinear(module.in_features, module.out_features, rank=self.rank)
            
            # Copy the original (frozen) weights
            lora_layer.weight.data = module.weight.data.clone()
            if module.bias is not None:
                lora_layer.bias.data = module.bias.data.clone()
            
            # Replace the layer; names from named_modules() may be nested
            # (e.g. "fc1" or "net.0"), so walk down to the parent module first
            parent = self.base_policy
            *path, attr = name.split('.')
            for part in path:
                parent = getattr(parent, part)
            setattr(parent, attr, lora_layer)
    
    def finetune_for_new_task(self, task_data, num_epochs=5):
        """为新任务微调策略"""
        optimizer = torch.optim.Adam(
            [p for p in self.base_policy.parameters() if p.requires_grad],
            lr=5e-5
        )
        
        for epoch in range(num_epochs):
            for states, actions, rewards in task_data:
                action_probs = self.base_policy(states)
                loss = self._compute_policy_loss(action_probs, actions, rewards)
                
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            
            print(f"Policy Adaptation Epoch {epoch}, Loss: {loss.item():.4f}")

Summary

By combining reinforcement learning with LoRA fine-tuning, LeBot can:

  1. Learn autonomously: acquire basic behaviors such as walking and turning through interaction with the environment
  2. Adapt quickly: use LoRA to adapt to new environments or tasks without retraining the entire model
  3. Stay parameter-efficient: drastically reduce the number of trainable parameters and speed up training
  4. Switch models flexibly: move easily between different tasks and policies

This makes LeBot a truly adaptive robotic system.

Recommended Resources

Written by the LeBot development team