Reinforcement Learning and LoRA Fine-Tuning
Reinforcement Learning Basics
What Is Reinforcement Learning
Reinforcement learning (RL) is a major branch of machine learning. Its core idea is to learn a policy that maximizes cumulative reward through interaction with an environment. Unlike supervised learning, which requires labeled data, reinforcement learning learns by trial and error.
Basic Concepts of Reinforcement Learning
1. Agent
The agent is the learner and decision maker; it can be a robot, a game AI, and so on. In a reinforcement learning setup, the LeBot robot is the agent.
2. Environment
The environment is the external system the agent interacts with. It changes in response to the agent's actions and provides state and reward feedback.
Agent ←→ Environment
3. State
The state describes the current configuration of the environment. For the LeBot robot, the state may include:
- Joint angles
- Sensor readings (IMU, distance sensors, etc.)
- Camera images
- Target position
4. Action
Actions are the operations the agent can perform. For the LeBot robot, they include:
- Changing the target angles of the joints
- Changing motor speeds
- Shifting the body's center of mass
5. Reward
The reward is the feedback signal the environment returns for the agent's action. Designing a good reward function is critical.
r(t) = task completed? + energy cost + balance + ...
6. Policy
The policy is the rule by which the agent selects an action given the current state.
The Reinforcement Learning Loop
Initialize state S₀
↓
The agent selects action A₀ according to its policy
↓
The environment executes the action and returns (R₀, S₁)
↓
The agent updates its policy based on the reward
↓
Repeat the process above until convergence
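To make this loop concrete, here is a minimal sketch of the agent-environment interaction using the classic Gym-style API (reset returning the state, step returning a 4-tuple), which matches the LeBotWalkingEnv defined later in this chapter. The CartPole environment and the random policy are placeholders for illustration, not part of LeBot.
python
import gym

# Minimal sketch of the loop above; "CartPole-v1" and the random policy
# are stand-ins for illustration only.
env = gym.make("CartPole-v1")

state = env.reset()                     # initialize state S0
done = False
total_reward = 0.0

while not done:
    action = env.action_space.sample()  # placeholder policy: act at random
    next_state, reward, done, info = env.step(action)  # environment returns (R, S')
    total_reward += reward
    # A learning agent would update its policy here from (state, action, reward, next_state).
    state = next_state

print(f"Episode finished, total reward: {total_reward:.1f}")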
Common Reinforcement Learning Algorithms
1. Q-Learning
Q-Learning is a model-free reinforcement learning algorithm that learns the action-value function Q(s, a).
python
import numpy as np

class QLearning:
    def __init__(self, n_states, n_actions, learning_rate=0.1, discount_factor=0.99):
        self.n_states = n_states
        self.n_actions = n_actions
        self.alpha = learning_rate
        self.gamma = discount_factor
        # Initialize the Q table
        self.Q = np.zeros((n_states, n_actions))

    def select_action(self, state, epsilon=0.1):
        """Select an action with an ε-greedy policy."""
        if np.random.random() < epsilon:
            # Explore: pick a random action
            return np.random.randint(self.n_actions)
        else:
            # Exploit: pick the best-known action
            return np.argmax(self.Q[state, :])

    def update(self, state, action, reward, next_state, done):
        """Update the Q value."""
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.Q[next_state, :])
        self.Q[state, action] += self.alpha * (target - self.Q[state, action])
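As a usage sketch (not from the LeBot codebase), the loop below trains the QLearning agent on a small discrete Gym environment; the environment name and episode count are illustrative assumptions.
python
# Hypothetical usage of the QLearning class on a discrete toy task.
import gym

env = gym.make("FrozenLake-v1")   # illustrative environment with discrete states and actions
agent = QLearning(n_states=env.observation_space.n, n_actions=env.action_space.n)

for episode in range(2000):
    state = env.reset()
    done = False
    while not done:
        action = agent.select_action(state, epsilon=0.1)
        next_state, reward, done, _ = env.step(action)
        agent.update(state, action, reward, next_state, done)
        state = next_state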
2. Policy Gradient
Policy gradient methods directly optimize the parameters of a policy network.
python
import torch
import torch.nn as nn
import torch.optim as optim

class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        action_probs = torch.softmax(self.fc3(x), dim=-1)
        return action_probs

class PolicyGradient:
    def __init__(self, state_dim, action_dim, learning_rate=0.01):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=learning_rate)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.policy = self.policy.to(self.device)

    def select_action(self, state):
        """Select an action by sampling from the policy."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            action_probs = self.policy(state_tensor)
        action = torch.multinomial(action_probs, 1).item()
        return action

    def update(self, states, actions, rewards):
        """Update the policy from one trajectory."""
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        # Compute discounted returns
        discounted_rewards = []
        cumulative_reward = 0
        for r in reversed(rewards):
            cumulative_reward = r + 0.99 * cumulative_reward
            discounted_rewards.insert(0, cumulative_reward)
        discounted_rewards = torch.FloatTensor(discounted_rewards).to(self.device)
        # Normalize returns to stabilize training
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-8)
        # Compute the policy-gradient loss
        action_probs = self.policy(states)
        log_probs = torch.log(action_probs.gather(1, actions.unsqueeze(1)))
        loss = -(log_probs * discounted_rewards.unsqueeze(1)).mean()
        # Backpropagate
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()
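REINFORCE-style policy gradients update from whole trajectories, so a typical loop first plays out an episode and then calls update once. The sketch below assumes a discrete-action Gym environment and is for illustration only.
python
# Hypothetical training loop for the PolicyGradient class above.
import gym

env = gym.make("CartPole-v1")     # illustrative discrete-action environment
agent = PolicyGradient(state_dim=env.observation_space.shape[0],
                       action_dim=env.action_space.n)

for episode in range(500):
    states, actions, rewards = [], [], []
    state = env.reset()
    done = False
    while not done:
        action = agent.select_action(state)
        next_state, reward, done, _ = env.step(action)
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        state = next_state
    # One policy update per collected episode
    loss = agent.update(states, actions, rewards)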
3. Actor-Critic Methods
Actor-Critic combines policy gradients with a learned value function, which generally improves learning efficiency.
python
class ActorCriticNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        # Actor network
        self.actor_fc1 = nn.Linear(state_dim, hidden_dim)
        self.actor_fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.actor_fc3 = nn.Linear(hidden_dim, action_dim)
        # Critic network
        self.critic_fc1 = nn.Linear(state_dim, hidden_dim)
        self.critic_fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.critic_fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        # Actor output
        actor_x = torch.relu(self.actor_fc1(state))
        actor_x = torch.relu(self.actor_fc2(actor_x))
        action_probs = torch.softmax(self.actor_fc3(actor_x), dim=-1)
        # Critic output
        critic_x = torch.relu(self.critic_fc1(state))
        critic_x = torch.relu(self.critic_fc2(critic_x))
        state_value = self.critic_fc3(critic_x)
        return action_probs, state_value

class ActorCritic:
    def __init__(self, state_dim, action_dim, learning_rate=0.01):
        self.model = ActorCriticNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = self.model.to(self.device)
        self.gamma = 0.99

    def select_action(self, state):
        """Select an action and return its log-probability."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        # Keep gradients here: the returned log_prob is used for the actor update.
        action_probs, _ = self.model(state_tensor)
        action = torch.multinomial(action_probs, 1).item()
        log_prob = torch.log(action_probs[0, action])
        return action, log_prob

    def update(self, state, action, reward, next_state, done, log_prob):
        """Update the actor and the critic from a single transition."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0).to(self.device)
        _, state_value = self.model(state_tensor)
        with torch.no_grad():
            _, next_state_value = self.model(next_state_tensor)
        # Temporal-difference (TD) target and advantage
        if done:
            target = reward
        else:
            target = reward + self.gamma * next_state_value.item()
        advantage = target - state_value.item()
        # Actor loss
        actor_loss = -log_prob * advantage
        # Critic loss
        critic_loss = 0.5 * (target - state_value) ** 2
        # Total loss
        total_loss = actor_loss + critic_loss
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
        return total_loss.item()
Reinforcement Learning Applications in LeBot
1. Learning a Walking Policy
The LeBot robot can learn a walking policy autonomously through reinforcement learning.
python
import gym
import numpy as np
from gym import spaces

class LeBotWalkingEnv(gym.Env):
    """LeBot walking environment."""

    def __init__(self):
        super().__init__()
        # State space: joint angles, joint velocities, IMU data, etc.
        self.state_dim = 20   # assume a 20-dimensional state
        self.action_dim = 8   # assume 8 controllable motors
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32
        )
        self.action_space = spaces.Box(
            low=-1, high=1, shape=(self.action_dim,), dtype=np.float32
        )
        self.robot = RobotController()  # robot controller, provided by the LeBot control stack
        self.step_count = 0
        self.max_steps = 1000

    def reset(self):
        """Reset the environment."""
        self.robot.reset_to_default_pose()
        self.step_count = 0
        state = self._get_state()
        return state

    def step(self, action):
        """Execute one step."""
        # Map the action to motor commands
        joint_angles = self._action_to_joint_angles(action)
        # Execute the action
        self.robot.set_joint_angles(joint_angles, duration=0.1)
        # Get the new state
        new_state = self._get_state()
        # Compute the reward
        reward = self._compute_reward(new_state, action)
        # Check whether the episode is over
        done = self._is_done()
        self.step_count += 1
        return new_state, reward, done, {}

    def _get_state(self):
        """Get the current state."""
        joint_angles = self.robot.get_joint_angles()
        joint_velocities = self.robot.get_joint_velocities()
        imu_data = self.robot.get_imu_data()
        state = np.concatenate([joint_angles, joint_velocities, imu_data])
        return state.astype(np.float32)

    def _action_to_joint_angles(self, action):
        """Convert an action to joint angles."""
        # Actions are values between -1 and 1 and must be
        # mapped to each joint's actual angle range.
        joint_angles = []
        for i, a in enumerate(action):
            # Get the range of joint i
            joint_range = self.robot.get_joint_range(i)
            mid_point = (joint_range[0] + joint_range[1]) / 2
            scale = (joint_range[1] - joint_range[0]) / 2
            angle = mid_point + a * scale
            joint_angles.append(angle)
        return joint_angles

    def _compute_reward(self, state, action):
        """Compute the reward."""
        # Get the robot's current base position and velocity
        position = self.robot.get_base_position()
        velocity = self.robot.get_base_velocity()
        # The reward function has several terms:
        # 1. Forward-velocity reward
        forward_velocity = velocity[0]
        forward_reward = forward_velocity
        # 2. Energy-consumption penalty
        action_cost = -0.1 * np.sum(action ** 2)
        # 3. Balance reward (keep the body upright)
        orientation = self.robot.get_orientation()
        roll, pitch = orientation[0], orientation[1]
        balance_reward = -0.1 * (abs(roll) + abs(pitch))
        # 4. Smoothness reward (penalize jerky commands)
        smoothness_reward = -0.01 * np.sum(np.diff(action) ** 2)
        total_reward = forward_reward + action_cost + balance_reward + smoothness_reward
        return total_reward

    def _is_done(self):
        """Check whether the episode is over."""
        # The robot has fallen
        if self.robot.is_fallen():
            return True
        # The maximum number of steps has been reached
        if self.step_count >= self.max_steps:
            return True
        return False

    def render(self, mode='human'):
        """Visualization."""
        # Optional: display the robot state
        pass
# Train the walking policy
def train_walking_policy():
    env = LeBotWalkingEnv()
    # Note: the ActorCritic agent above outputs a discrete action index; for this
    # continuous 8-dimensional action space it needs a continuous (e.g. Gaussian) policy head.
    agent = ActorCritic(env.observation_space.shape[0], env.action_space.shape[0])
    num_episodes = 1000
    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        for step in range(1000):
            action, log_prob = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.update(state, action, reward, next_state, done, log_prob)
            episode_reward += reward
            state = next_state
            if done:
                break
        if (episode + 1) % 10 == 0:
            print(f"Episode {episode + 1}, Reward: {episode_reward:.2f}")
    return agent
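For comparison: locomotion tasks like this are more commonly trained with a more robust algorithm such as PPO. Below is a minimal sketch using the third-party stable-baselines3 library (not part of the LeBot codebase), assuming a library version compatible with the classic Gym API that LeBotWalkingEnv implements; PPO also handles the continuous action space directly. The timestep budget and file name are illustrative choices.
python
# Minimal sketch: training the walking policy with PPO from stable-baselines3.
from stable_baselines3 import PPO

env = LeBotWalkingEnv()
model = PPO("MlpPolicy", env, verbose=1)   # MLP policy over the 20-dim state, 8-dim action
model.learn(total_timesteps=200_000)
model.save("lebot_walking_ppo")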
LoRA Fine-Tuning
What Is LoRA
LoRA (Low-Rank Adaptation) is a parameter-efficient fine-tuning method. Unlike traditional full-parameter fine-tuning, LoRA adds only a small number of trainable parameters to the model, greatly reducing compute cost and memory requirements.
The Basic Idea of LoRA
For a pretrained weight matrix W₀ ∈ ℝ^(d×k), standard fine-tuning updates all of its parameters. LoRA instead approximates the weight update with two low-rank matrices B ∈ ℝ^(d×r) and A ∈ ℝ^(r×k), where r ≪ min(d, k):
W = W₀ + ΔW = W₀ + BA
Here W₀ stays frozen; only A and B are trained.
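To see where the savings come from, consider a single weight matrix; the dimensions below are illustrative, not taken from a specific LeBot model.
python
# Illustrative parameter count for one weight matrix with d = k = 4096 and rank r = 8.
d, k, r = 4096, 4096, 8

full_finetune_params = d * k       # updating the whole matrix: 16,777,216 parameters
lora_params = d * r + r * k        # updating B (d×r) and A (r×k): 65,536 parameters

print(f"LoRA trains {100 * lora_params / full_finetune_params:.2f}% of this matrix")  # ≈ 0.39%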
Advantages of LoRA
- Parameter efficiency: only about 0.1–1% of the original model's parameters are trained
- Fast training: far less computation is needed, and small adaptations can even run on a CPU
- Low memory footprint: full-size gradients and optimizer state do not need to be kept for the frozen weights
- Simple model switching: task-specific adapters can be swapped quickly (see the sketch below)
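The last point follows from the structure of the update: because ΔW is just a low-rank product, it can be merged into the frozen base weight for inference and subtracted back out when switching tasks. A minimal sketch, using the same tensor shapes as the LoRALinear layer defined in the next section; these helpers are illustrative, not library functions.
python
import torch

# Merging / unmerging a LoRA update, with weight (out, in), lora_A (in, r), lora_B (r, out),
# matching the LoRALinear layer below.
def merge_lora(weight, lora_A, lora_B, alpha, rank):
    # The effective update in (out, in) layout is the transposed low-rank product, scaled by alpha/rank.
    return weight + (alpha / rank) * (lora_A @ lora_B).T

def unmerge_lora(weight, lora_A, lora_B, alpha, rank):
    # Subtracting the same term restores the base weight, so another task adapter can be merged.
    return weight - (alpha / rank) * (lora_A @ lora_B).T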
Applying LoRA to Deep Learning Models
python
import math

import torch
import torch.nn as nn
from torch.nn.parameter import Parameter

class LoRALinear(nn.Module):
    """A linear layer with a LoRA update."""

    def __init__(self, in_features, out_features, rank=8, alpha=1.0):
        super().__init__()
        # Original weight (frozen; in practice it is overwritten with the pretrained weight)
        self.weight = Parameter(torch.empty(out_features, in_features))
        nn.init.kaiming_uniform_(self.weight, a=math.sqrt(5))
        # LoRA parameters
        self.lora_A = Parameter(torch.empty(in_features, rank))
        self.lora_B = Parameter(torch.empty(rank, out_features))
        self.lora_alpha = alpha
        self.lora_rank = rank
        # Initialize the LoRA parameters: A randomly, B to zero,
        # so the LoRA branch starts as a no-op
        nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B)
        # Freeze the original weight
        self.weight.requires_grad = False
        self.bias = Parameter(torch.empty(out_features))
        nn.init.zeros_(self.bias)

    def forward(self, x):
        """Forward pass."""
        # Standard linear transform with the frozen weight
        out = torch.nn.functional.linear(x, self.weight, self.bias)
        # LoRA adjustment
        lora_out = x @ self.lora_A @ self.lora_B
        # Scale and add
        out = out + (self.lora_alpha / self.lora_rank) * lora_out
        return out
class LoRAViT(nn.Module):
    """A Vision Transformer with LoRA applied to selected layers."""

    def __init__(self, base_model, rank=8, alpha=1.0, lora_layers=None):
        super().__init__()
        self.base_model = base_model
        self.rank = rank
        self.alpha = alpha
        # Decide which transformer layers get LoRA
        # (assumes the base model exposes base_model.transformer.layers)
        if lora_layers is None:
            self.lora_layers = list(range(len(base_model.transformer.layers)))
        else:
            self.lora_layers = lora_layers
        # Apply LoRA to the selected layers
        self._apply_lora()

    def _apply_lora(self):
        """Replace selected sub-layers with LoRA layers."""
        for i in self.lora_layers:
            layer = self.base_model.transformer.layers[i]
            # Dimensions of the attention output projection and the first MLP layer
            in_features_attn = layer.self_attn.embed_dim
            out_features_attn = layer.self_attn.embed_dim
            in_features_ff = layer.mlp.fc1.in_features
            out_features_ff = layer.mlp.fc1.out_features
            # Save the original pretrained weights
            original_attn_weight = layer.self_attn.out_proj.weight.data.clone()
            original_ff_weight = layer.mlp.fc1.weight.data.clone()
            # Replace the layers with LoRA layers
            layer.self_attn.out_proj = LoRALinear(
                in_features_attn, out_features_attn,
                rank=self.rank, alpha=self.alpha
            )
            layer.self_attn.out_proj.weight.data = original_attn_weight
            layer.mlp.fc1 = LoRALinear(
                in_features_ff, out_features_ff,
                rank=self.rank, alpha=self.alpha
            )
            layer.mlp.fc1.weight.data = original_ff_weight

    def forward(self, x):
        """Forward pass through the (LoRA-augmented) base model."""
        return self.base_model(x)

    def get_trainable_parameters(self):
        """Collect trainable parameters and print a summary."""
        trainable_params = []
        total_params = 0
        for name, param in self.named_parameters():
            total_params += param.numel()
            if param.requires_grad:
                trainable_params.append(param)
        trainable_count = sum(p.numel() for p in trainable_params)
        print(f"Trainable parameters: {trainable_count}")
        print(f"Total parameters: {total_params}")
        print(f"Ratio: {100 * trainable_count / total_params:.2f}%")
        return trainable_params
# Example LoRA fine-tuning loop
def finetune_with_lora():
    # Load the pretrained model (load_pretrained_model, train_loader and criterion
    # are placeholders for the project's own loading code, data loader and loss)
    base_model = load_pretrained_model()
    # Apply LoRA
    lora_model = LoRAViT(base_model, rank=8, alpha=16)
    # Freeze all non-LoRA parameters of the base model first
    for name, param in lora_model.base_model.named_parameters():
        if 'lora' not in name:
            param.requires_grad = False
    # Then collect the trainable (LoRA) parameters
    trainable_params = lora_model.get_trainable_parameters()
    # Set up the optimizer on the trainable parameters only
    optimizer = torch.optim.AdamW(trainable_params, lr=5e-4)
    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        for batch in train_loader:
            inputs, labels = batch
            outputs = lora_model(inputs)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
    return lora_model
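The hand-rolled LoRAViT above shows the mechanics; in practice the Hugging Face peft library (linked in the resources at the end of this chapter) offers the same functionality with much less code. A minimal sketch, where the checkpoint name and target_modules are assumptions for a Hugging Face ViT and may differ for other models:
python
# Minimal sketch of LoRA fine-tuning with Hugging Face peft.
from transformers import ViTForImageClassification
from peft import LoraConfig, get_peft_model

base_model = ViTForImageClassification.from_pretrained("google/vit-base-patch16-224")

config = LoraConfig(
    r=8,                                 # LoRA rank
    lora_alpha=16,                       # scaling factor
    target_modules=["query", "value"],   # attention projections in the HF ViT implementation
    lora_dropout=0.05,
)
model = get_peft_model(base_model, config)
model.print_trainable_parameters()       # prints the trainable-parameter ratio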
LoRA Applications in LeBot
1. Vision Model Fine-Tuning
Use LoRA to quickly adapt LeBot's vision model to a specific environment or task.
python
class LeBotVisionAdapter:
    """LoRA-based adapter for LeBot's vision model."""

    def __init__(self, base_vision_model, num_classes, rank=8):
        self.model = LoRAViT(base_vision_model, rank=rank)
        # Replace the classification head
        in_features = base_vision_model.head.in_features
        self.model.base_model.head = nn.Linear(in_features, num_classes)
        # Freeze all non-LoRA parameters of the base model (the new head stays trainable)
        for name, param in self.model.base_model.named_parameters():
            if 'lora' not in name and 'head' not in name:
                param.requires_grad = False
        self.optimizer = torch.optim.AdamW(
            self.model.get_trainable_parameters(),
            lr=5e-4
        )

    def adapt_to_environment(self, adaptation_data, num_epochs=10):
        """
        Adapt the model to a new environment.
        Args:
            adaptation_data: images and labels from the new environment
            num_epochs: number of training epochs
        """
        for epoch in range(num_epochs):
            for images, labels in adaptation_data:
                outputs = self.model(images)
                loss = nn.functional.cross_entropy(outputs, labels)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
            print(f"Adaptation Epoch {epoch}, Loss: {loss.item():.4f}")

    def save_lora_weights(self, path):
        """Save only the LoRA weights."""
        lora_weights = {}
        for name, param in self.model.named_parameters():
            if 'lora' in name:
                lora_weights[name] = param.data.cpu()
        torch.save(lora_weights, path)

    def load_lora_weights(self, path):
        """Load LoRA weights."""
        lora_weights = torch.load(path)
        for name, weights in lora_weights.items():
            self.model.state_dict()[name].copy_(weights)
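A possible usage pattern (the data loaders, class count, and file names below are hypothetical): keeping one small LoRA weight file per environment lets LeBot switch vision adapters without reloading or retraining the base model.
python
# Hypothetical usage: adapt to two environments, keep one LoRA weight file per environment.
adapter = LeBotVisionAdapter(base_vision_model, num_classes=10, rank=8)

adapter.adapt_to_environment(kitchen_loader, num_epochs=5)   # kitchen_loader: assumed DataLoader
adapter.save_lora_weights("lora_kitchen.pt")

adapter.adapt_to_environment(outdoor_loader, num_epochs=5)   # outdoor_loader: assumed DataLoader
adapter.save_lora_weights("lora_outdoor.pt")

# Later: switch back to the kitchen adapter by loading only its LoRA weights.
adapter.load_lora_weights("lora_kitchen.pt")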
2. Action Policy Fine-Tuning
Use LoRA to quickly adapt a reinforcement learning policy to a new action task.
python
class LeBotPolicyAdapter:
    """LoRA-based adapter for a LeBot policy network."""

    def __init__(self, base_policy, new_task_dim, rank=4):
        self.base_policy = base_policy
        self.rank = rank
        # Add LoRA layers for the new task
        self._add_lora_to_policy()

    def _add_lora_to_policy(self):
        """Replace the policy's linear layers with LoRA layers."""
        # Collect the linear layers first, then replace them
        # (replacing while iterating named_modules() is unsafe).
        linear_layers = [(name, module) for name, module in self.base_policy.named_modules()
                         if isinstance(module, nn.Linear)]
        for name, module in linear_layers:
            in_feat = module.in_features
            out_feat = module.out_features
            # Create the LoRA layer
            lora_layer = LoRALinear(in_feat, out_feat, rank=self.rank)
            # Copy the original weights
            lora_layer.weight.data = module.weight.data.clone()
            if module.bias is not None:
                lora_layer.bias.data = module.bias.data.clone()
            # Replace the layer on its parent module (handles nested names like "fc.0")
            parent_name, _, attr_name = name.rpartition('.')
            parent = self.base_policy.get_submodule(parent_name) if parent_name else self.base_policy
            setattr(parent, attr_name, lora_layer)

    def _compute_policy_loss(self, action_probs, actions, rewards):
        """Policy-gradient loss, as in the PolicyGradient class above."""
        log_probs = torch.log(action_probs.gather(1, actions.unsqueeze(1)))
        return -(log_probs * rewards.unsqueeze(1)).mean()

    def finetune_for_new_task(self, task_data, num_epochs=5):
        """Fine-tune the policy for a new task."""
        optimizer = torch.optim.Adam(
            [p for p in self.base_policy.parameters() if p.requires_grad],
            lr=5e-5
        )
        for epoch in range(num_epochs):
            for states, actions, rewards in task_data:
                action_probs = self.base_policy(states)
                loss = self._compute_policy_loss(action_probs, actions, rewards)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            print(f"Policy Adaptation Epoch {epoch}, Loss: {loss.item():.4f}")
Summary
By combining reinforcement learning with LoRA fine-tuning, LeBot can:
- Learn autonomously: acquire basic skills such as walking and turning through interaction with the environment
- Adapt quickly: use LoRA to adjust to new environments or tasks without retraining the entire model
- Stay parameter-efficient: drastically reduce the number of trainable parameters and speed up training
- Switch models flexibly: move easily between different tasks and policies
This makes LeBot a genuinely adaptive robot system.
Recommended Resources
- The LoRA paper: https://arxiv.org/abs/2106.09685
- Hugging Face LoRA implementation (peft): https://github.com/huggingface/peft
- OpenAI Spinning Up reinforcement learning tutorial: https://spinningup.openai.com/
- PyTorch RL library (TorchRL): https://pytorch.org/rl/