前言
过去一周,我司「七月在线」长沙分部的具身团队在机械臂和人形上并行发力
- 关于机械臂
一方面,在IL和VLA的路线下,先后采集了抓杯子、桌面收纳、插入耳机孔的数据,然后云端训-本地5090推理
二方面,在RL的路线下,通过复现UC伯克利的HIL-SERL先后在仿真、真机上抓方块 - 关于人形
一者,对于manipulation,本周暂先实现了通过VR遥操宇树G1采集数据
二者,对于locomotion,我司长沙具身团队的其中一个小组准备复现基于PBHC的KungfuBot
故为了和团队更好的复现之,我在解读完该论文之后,准备把其源码也好好解析下
而根据此文《KungfuBot——基于物理约束和自适应运动追踪的人形全身控制PBHC,用于学习打拳或跳舞(即RL下的动作模仿和运控)》可知,KungfuBot中的RL模块是基于ASAP实现的
故为了更好的复现KungfuBot,本文先来解读下ASAP目前已经对外开源的代码
根据ASAP的GitHub代码仓库可知,ASAP专注于学习敏捷的人形机器人全身技能,构建在HumanoidVerse多模拟器框架之上,其
- 支持多种物理模拟器:IsaacGym、IsaacSim、Genesis
- 实现人形机器人的运动追踪和全身控制
- 提供从仿真到现实的转移能力
- 模块化设计,支持算法、环境、模拟器的分离
┌─────────────────────────────────────────────────┐
│ 配置系统 (config/) │
│ base.yaml → algo/ → env/ → robot/ → rewards/ │
└─────────────┬───────────────────────────────────┘
│ Hydra instantiate()
▼
┌─────────────────────────────────────────────────┐
│ 环境层 (envs/) │
│ BaseTask ← locomotion/motion_tracking │
└─────────────┬───────────────────────────────────┘
│ composition
▼
┌─────────────────────────────────────────────────┐
│ 模拟器层 (simulator/) │
│ BaseSimulator ← IsaacGym/IsaacSim/Genesis │
└─────────────┬───────────────────────────────────┘
│ used by
▼
┌─────────────────────────────────────────────────┐
│ 算法层 (agents/) │
│ BaseAlgo ← PPO/DAGGER/DeltaA/ForceControl │
└─────────────────────────────────────────────────┘
运行时调用顺序如下
1. train_agent.py::main()
2. ├── config加载 (Hydra)
3. ├── 模拟器选择 (IsaacGym/IsaacSim/Genesis)
4. ├── env实例化 (BaseTask子类)
5. │ ├── simulator实例化 (BaseSimulator子类)
6. │ ├── robot配置加载
7. │ └── terrain/obs/rewards设置
8. ├── algo实例化 (BaseAlgo子类)
9. │ ├── actor/critic网络创建
10. │ └── 数据缓冲区初始化
11. └── algo.learn() 训练循环
第一部分 算法模块 agents/
包含
- train_agent.py: 训练智能体的主入口
train_agent.py: ├── hydra → 配置加载 ├── utils.config_utils → 配置预处理 ├── BaseTask → 环境创建 ├── BaseAlgo → 算法创建 └── wandb → 日志记录
- eval_agent.py: 评估智能体的主入口
提供实时键盘控制接口
支持力控制调试功能
可视化评估结果
agents模块的完整目录如下
agents/
├── base_algo/ # 算法基类定义
│ └── base_algo.py # BaseAlgo抽象基类
├── modules/ # 神经网络核心组件
│ ├── ppo_modules.py # PPO Actor/Critic网络
│ ├── modules.py # 通用神经网络模块
│ ├── encoder_modules.py # 编码器模块
│ ├── world_models.py # 世界模型实现
│ └── data_utils.py # 数据存储和GAE计算
├── callbacks/ # 实时分析和可视化系统
│ ├── base_callback.py # 回调基类
│ ├── analysis_plot_*.py # 各类分析绘图回调
│ └── analysis_plot_template.html # 实时可视化模板
├── ppo/ # 标准PPO算法
├── dagger/ # 模仿学习算法
├── decouple/ # 解耦合控制算法
├── delta_a/ # ASAP核心创新:增量学习
├── delta_dynamics/ # Delta动力学模型
├── force_control/ # 力感知控制算法
├── mppi/ # 模型预测路径积分控制
└── ppo_locomanip.py # 运动操作专用PPO
1.1 模块组件agents/modules/
1.1.1 ppo_modules.py: PPO算法的Actor-Critic网络
这段代码实现了PPO(Proximal Policy Optimization)算法中的核心模块,包括策略网络(Actor)、价值网络(Critic)以及一个特殊的Actor变体
- PPOActor(策略网络):根据观测生成动作分布,用于采样动作
主要成员变量:
`self.actor_module`:底层神经网络(由BaseModule构建)
`self.std`:动作噪声的可学习参数(标准差)
`self.distribution`:当前动作的概率分布(Normal分布)
具体实现了以下主要方法:
update_distribution(actor_obs):用当前观测生成动作分布(均值为网络输出,方差为std)
act(actor_obs, **kwargs):采样动作(先更新分布,再采样)# 根据观测更新动作分布 def update_distribution(self, actor_obs): # 通过网络获得动作均值 mean = self.actor(actor_obs) # 构造正态分布 self.distribution = Normal(mean, mean*0. + self.std)
get_actions_log_prob(actions):计算给定动作的对数概率# 根据观测采样动作 def act(self, actor_obs, **kwargs): # 更新分布 self.update_distribution(actor_obs) # 从分布中采样动作 return self.distribution.sample()
act_inference(actor_obs):推理时直接输出均值(确定性动作)action_mean`/`action_std`/`entropy:分别返回分布的均值、标准差、熵# 计算给定动作的对数概率 def get_actions_log_prob(self, actions): return self.distribution.log_prob(actions).sum(dim=-1)
to_cpu():将模型和参数转到CPU# 推理时直接输出动作均值 def act_inference(self, actor_obs): actions_mean = self.actor(actor_obs) return actions_mean
- PPOCritic(价值网络):根据观测输出状态价值V(s)
主要成员变量:`self.critic_module`:底层神经网络(由BaseModule构建)
实现的主要方法有:
`evaluate(critic_obs, **kwargs)`:输入观测,输出状态价值
`critic`属性:返回底层网络# 输入观测,输出状态价值 def evaluate(self, critic_obs, **kwargs): value = self.critic(critic_obs) return value
1.1.2 modules.py:通用神经网络模块
1.1.3 encoder_modules.py:编码器模块
1.1.4 world_models.py:世界模型实现
1.1.5 data_utils.py: 数据处理工具,包含经验回放缓冲区
// 待更
1.2 回调系统 agents/callbacks/
涉及一系列分析和可视化——支持力估计、运动分析、开环跟踪等多种分析
- analysis_plot_.py: 各种分析图表生成
- base_callback.py: 回调基类
1.3 模仿学习:DAgger 算法实现(dagger.py)
本文件实现了 DAgger(Dataset Aggregation)模仿学习算法,是 ASAP 框架中用于“模仿学习/专家演示学习”的核心类。它让智能体在自己的策略下探索环境,并在这些状态下查询专家动作,逐步聚合数据,提升策略泛化能力
整个算法的流程为
- 初始化:加载学生和专家网络,准备数据存储
- 训练循环:
用学生策略探索环境,记录每一步的状态和专家动作
用专家动作作为标签,训练学生策略模仿专家
记录和可视化训练过程 - 评估:支持回调式评估和推理
DAgger(BaseAlgo)继承自 BaseAlgo,拥有统一的 setup、learn、evaluate_policy 等接口,其关键成员变量包括
- self.actor:待训练的策略网络
- self.gt_actor:专家(教师)策略网络
- self.optimizer:优化器
- self.storage:数据存储(经验回放)
- self.writer:Tensorboard 日志
- self.eval_callbacks:评估回调
- self.episode_env_tensors:环境统计
1.3.1 setup:模仿学习中"学生-教师"双网络架构的实现
该方法主要
- 网络初始化:创建学生网络(需要训练的actor)和教师网络(提供专家示范的gt_actor)
- 优化器设置:为学生网络配置Adam优化器
- 存储初始化:设置用于存储轨迹数据的RolloutStorage,注册各种数据类型的存储键
- 统计跟踪:初始化用于跟踪训练过程中各种统计信息的变量和缓冲区
即初始化 RolloutStorage,注册观测、动作、专家动作等数据字段
具体而言
- 方法开始时通过日志记录初始化过程,然后创建两个关键的神经网络组件
第一个是学习者网络 (`self.actor`),它是需要被训练的策略网络,使用当前配置中的网络架构参数进行初始化
第二个是专家网络 (`self.gt_actor`),它代表"ground truth"或教师策略,不仅使用专门的网络配置,还通过 `teacher_actor_network_load_dict` 加载预训练的权重def setup(self): logger.info("Setting up Dagger") # 记录开始设置DAgger算法的日志信息 # 从配置中提取演员网络的配置字典 actor_network_dict = dict(actor=self.config.network_dict['actor'])
最终,分别创建学生actor、教师gt_actor# 从配置中提取教师演员网络的配置字典 teacher_actor_network_dict = dict(actor=self.config.network_dict['teacher_actor']) # 从配置中提取教师演员网络的加载配置字典 teacher_actor_network_load_dict = dict(actor=self.config.network_load_dict['teacher_actor'])
这种设计使得专家网络能够在训练过程中为学习者提供高质量的示范动作# 创建学生演员网络(PPO演员,固定标准差) self.actor = PPOActorFixSigma(self.algo_obs_dim_dict, actor_network_dict, {}, self.num_act) # 创建教师演员网络(ground truth演员,用于提供专家示范) self.gt_actor = PPOActorFixSigma(self.algo_obs_dim_dict, teacher_actor_network_dict, teacher_actor_network_load_dict, self.num_act)
- 网络初始化完成后,代码将两个网络都移动到指定的计算设备上,确保训练过程中的计算效率
# 将学生演员网络移动到指定设备(CPU或GPU) self.actor.to(self.device) # 将教师演员网络移动到指定设备(CPU或GPU) self.gt_actor.to(self.device)
- 接下来,为学习者网络创建 Adam 优化器,这是训练过程中唯一需要更新参数的网络,而专家网络的参数保持固定
# 创建Adam优化器来训练学生演员网络 self.optimizer = optim.Adam(self.actor.parameters(), lr=self.learning_rate) logger.info(f"Setting up Storage") # 记录开始设置存储的日志信息
- 然后,方法初始化了一个专门的存储系统 `RolloutStorage`,用于管理训练数据的收集和批处理
存储系统的设计特别体现了 DAgger 算法的数据需求
首先,它为所有观测变量分配存储空间
然后注册两套完整的动作相关数据:# 创建回合存储对象,用于存储轨迹数据 self.storage = RolloutStorage(self.env.num_envs, self.num_steps_per_env) ## Register obs keys # 注册观测键 # 遍历算法观测维度字典,为每个观测类型注册存储键 for obs_key, obs_dim in self.algo_obs_dim_dict.items(): self.storage.register_key(obs_key, shape=(obs_dim,), dtype=torch.float)
一套用于学习者的动作、动作概率、动作均值和方差
# 注册学生演员的动作存储键 self.storage.register_key('actions', shape=(self.num_act,), dtype=torch.float) # 注册学生演员的动作对数概率存储键 self.storage.register_key('actions_log_prob', shape=(1,), dtype=torch.float) # 注册学生演员的动作均值存储键 self.storage.register_key('action_mean', shape=(self.num_act,), dtype=torch.float) # 注册学生演员的动作标准差存储键 self.storage.register_key('action_sigma', shape=(self.num_act,), dtype=torch.float)
另一套用于专家的对应数据(以 `gt_` 前缀标识)
这种并行存储机制使得算法能够在每个时间步同时记录学习者的行为和专家在相同状态下的示范行为,为后续的监督学习提供配对的训练数据# 注册教师演员的动作存储键(ground truth actions) self.storage.register_key('gt_actions', shape=(self.num_act,), dtype=torch.float) # 注册教师演员的动作对数概率存储键 self.storage.register_key('gt_actions_log_prob', shape=(1,), dtype=torch.float) # 注册教师演员的动作均值存储键 self.storage.register_key('gt_action_mean', shape=(self.num_act,), dtype=torch.float) # 注册教师演员的动作标准差存储键 self.storage.register_key('gt_action_sigma', shape=(self.num_act,), dtype=torch.float)
- 最后,方法初始化了训练过程中的统计和监控组件
这包括用于记录回合信息的列表,以及使用双端队列实现的滑动窗口统计,可以跟踪最近100个回合的奖励和长度
同时,为每个并行环境分配了当前奖励总和和回合长度的张量,支持多环境并行训练的实时统计# 初始化回合信息列表,用于存储每个回合的统计信息 self.ep_infos = [] # 创建奖励缓冲区,最多存储100个回合的奖励(用于计算平均奖励) self.rewbuffer = deque(maxlen=100) # 创建长度缓冲区,最多存储100个回合的长度(用于计算平均长度) self.lenbuffer = deque(maxlen=100)
# 初始化当前奖励累计张量,为每个环境跟踪当前回合的奖励总和 self.cur_reward_sum = torch.zeros(self.env.num_envs, dtype=torch.float, device=self.device) # 初始化当前回合长度张量,为每个环境跟踪当前回合的步数 self.cur_episode_length = torch.zeros(self.env.num_envs, dtype=torch.float, device=self.device)
1.3.2 learn:训练主循环,分为数据采集和训练两个阶段
每次迭代:
- _rollout_step:用当前策略与环境交互,收集状态、动作、专家动作等数据
- _training_step:用行为克隆损失(BC Loss)训练 actor,使其输出尽量接近专家动作
- 日志记录与模型保存
具体而言
- 首先,如果设置了 `init_at_random_ep_len`,会将环境的 episode 长度缓冲区随机初始化,以增加训练多样性
接着,环境被重置,获得初始观测,并将所有观测张量移动到指定设备(如 GPU),确保后续计算高效def learn(self): # 如果需要在随机的episode长度初始化,则对环境的episode长度缓冲区进行随机赋值 if self.init_at_random_ep_len: self.env.episode_length_buf = torch.randint_like(self.env.episode_length_buf, high=int(self.env.max_episode_length))
# 重置所有环境,获取初始观测 obs_dict = self.env.reset_all() # 将所有观测数据转移到指定设备(如GPU或CPU) for obs_key in obs_dict.keys(): obs_dict[obs_key] = obs_dict[obs_key].to(self.device)
- 随后,模型被切换到训练模式
主循环根据 `num_learning_iterations` 控制迭代次数,每次迭代都记录起始时间# 设置模型为训练模式 self._train_mode()
每一轮,首先通过 `_rollout_step` 与环境交互,采集一批新数据,并返回最新的观测# 获取本次学习的迭代次数 num_learning_iterations = self.num_learning_iterations # 计算总的迭代次数 tot_iter = self.current_learning_iteration + num_learning_iterations # 遍历每一次学习迭代 # for it in track(range(self.current_learning_iteration, tot_iter), description="Learning Iterations"): for it in range(self.current_learning_iteration, tot_iter): # 记录本次迭代的起始时间 self.start_time = time.time()
然后调用 `_training_step`,对采集到的数据进行行为克隆(BC)训练,得到平均损失# 进行一次rollout,返回新的观测(必须更新,否则一直用初始观测) obs_dict = self._rollout_step(obs_dict)
完成训练后,记录本轮训练所用时间# 执行一次训练步骤,返回平均BC损失 mean_bc_loss = self._training_step()
# 记录本次迭代的结束时间 self.stop_time = time.time() # 计算本次学习所用时间 self.learn_time = self.stop_time - self.start_time
- 每轮结束后,会整理日志信息(如当前迭代数、采集和训练时间、平均损失、奖励等)
并调用 `_post_epoch_logging` 进行可视化和记录
如果当前迭代数满足保存间隔条件,还会保存模型检查点
每轮结束后,episode 相关信息会被清空,为下次迭代做准备 - 循环结束后,更新当前的学习迭代计数,并再次保存最终模型
整体来看,该方法实现了 DAgger 算法的标准训练流程,包括数据采集、模型训练、日志记录和模型保存等关键步骤
1.3.3 _rollout_step:actor 采样动作与环境交互,gt_actor计算对应的专家动作
- 用 actor 采样动作与环境交互,同时用 gt_actor 计算同一状态下的专家动作
- 将观测、动作、专家动作等信息存入 storage
- 统计奖励、episode 长度等信息
具体而言
- 它首先在 `torch.inference_mode()` 下运行,确保在采样阶段不会计算梯度,提高效率
每次循环代表一个环境步,循环次数由 `self.num_steps_per_env` 决定def _rollout_step(self, obs_dict): # 在推理模式下,不计算梯度,提高效率 with torch.inference_mode():
# 按每个环境步数进行循环 for i in range(self.num_steps_per_env):
- 在每一步中,方法会通过 `self.actor`(当前策略)和 `self.gt_actor`(专家策略或教师策略)分别计算动作及其均值
并断言这些张量的形状符合预期# 用当前actor网络根据观测计算动作,并分离计算图 actions = self.actor.act(obs_dict).detach() # 获取actor输出的动作均值,并分离计算图 action_mean = self.actor.action_mean.detach() # 用教师actor(专家策略)计算动作,并分离计算图 gt_actions = self.gt_actor.act(obs_dict).detach() # 获取教师actor输出的动作均值,并分离计算图 gt_action_mean = self.gt_actor.action_mean.detach()
随后,这些动作及相关统计信息会被打包进 `policy_state_dict`,并与观测数据一起存入 `self.storage`,为后续训练做准备# 将当前观测数据存入存储器 for obs_key in obs_dict.keys(): self.storage.update_key(obs_key, obs_dict[obs_key]) # 将策略相关数据存入存储器 for obs_ in policy_state_dict.keys(): self.storage.update_key(obs_, policy_state_dict[obs_])
- 接下来,方法使用当前策略的动作与环境交互,获得新的观测、奖励、done 标志和额外信息
所有张量会被转移到指定设备(如 GPU),并将日志信息添加到`self.episode_env_tensors`# 从策略字典中取出动作,用于与环境交互 actions = policy_state_dict["actions"] # 用动作与环境交互,获得新的观测、奖励、done标志和信息 obs_dict, rewards, dones, infos = self.env.step(actions)
步数计数器递增,并调用 `_process_env_step` 处理环境步(如重置策略状态) - 如果启用了日志目录,还会进行奖励和回合长度的统计与记录。每当环境回合结束(done),就会将累计奖励和长度存入缓冲区,并重置相应计数
- 最后,方法记录采样时间,为后续训练阶段做准备,并返回最新的观测字典
# 返回最新的观测字典 return obs_dict
1.3.4 _training_step:计算行为克隆损失BC Loss
- 从 storage 生成小批量数据
- 计算行为克隆损失(BC Loss):均方误差,目标是让 actor 输出接近专家动作
- 反向传播并优化 actor 网络
具体而言
- 首先,方法初始化 `mean_bc_loss` 用于累计每个 mini-batch 的 BC 损失
它通过 `self.storage.mini_batch_generator` 生成多个 mini-batch,每个 batch 包含采集到的观测和动作数据,并支持多轮训练(由 `num_learning_epochs` 控制)# 定义训练步骤函数 def _training_step(self): # 初始化平均行为克隆损失为0 mean_bc_loss = 0.0
# 生成小批量数据生成器 generator = self.storage.mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)
- 在每个 mini-batch 上,方法会将所有数据移动到指定设备(如 GPU),确保后续计算高效
然后,从 batch 中取出专家动作 `gt_actions_batch`# 遍历每个小批量数据 for obs_dict in generator: # 遍历观测字典的每个键 for obs_key in obs_dict.keys(): # 将数据移动到指定设备(如GPU) obs_dict[obs_key] = obs_dict[obs_key].to(self.device)
并用当前策略 `self.actor` 对 batch 观测进行前向推理,得到当前策略的动作均值 `mu_batch`# 获取教师动作批次 gt_actions_batch = obs_dict['gt_actions']
# 前向传播,计算当前策略的动作均值 self.actor.act(obs_dict,) # 获取当前策略输出的动作均值 mu_batch = self.actor.action_mean
- BC 损失的计算方式是当前策略动作均值与专家动作之间的均方误差(MSE),即 `torch.square(mu_batch - gt_actions_batch).mean()`
损失乘以配置中的系数后# 计算行为克隆损失(均方误差) bc_loss = torch.square(mu_batch - gt_actions_batch).mean()
进行反向传播和梯度裁剪(防止梯度爆炸),最后更新策略网络参数# 乘以损失系数,得到最终损失 loss = self.config.bc_loss_coef * bc_loss
每个 batch 的 BC 损失会累加到 `mean_bc_loss`# Gradient step # 梯度清零 self.optimizer.zero_grad() # 反向传播计算梯度 loss.backward() # 梯度裁剪,防止梯度爆炸 nn.utils.clip_grad_norm_(self.actor.parameters(), self.max_grad_norm) # 优化器更新参数 self.optimizer.step()
# 累加当前批次的bc损失 mean_bc_loss += bc_loss.item()
- 所有 mini-batch 训练完成后,方法会对累计损失取平均,并清空存储器,为下一个训练周期做准备
# 计算总的更新次数 num_updates = self.num_learning_epochs * self.num_mini_batches # 计算平均bc损失 mean_bc_loss /= num_updates # 清空存储器 self.storage.clear()
- 最终返回平均 BC 损失,便于日志记录和监控训练过程
# 返回平均bc损失 return mean_bc_loss
1.3.5 日志与评估
- _post_epoch_logging:详细记录训练过程中的损失、奖励、性能等信息,支持 Tensorboard 和 Rich 控制台美化
- evaluate_policy:支持评估模式,结合回调机制灵活扩展评估逻辑
1.4 PPO:标准实现与运动操作专用PPO的实现
1.4.1 PPO的标准实现:ppo/ppo.py
这段代码实现了 PPO(Proximal Policy Optimization)强化学习算法的主类。它负责整个算法的生命周期管理,包括初始化、模型和优化器的构建、数据存储、训练循环、模型保存与加载、评估流程等
1.4.1.1 __init__
- 在 `__init__` 构造函数中,PPO 类会初始化环境、配置、日志记录器(用于 TensorBoard)、以及一些用于统计和追踪训练过程的变量。它还会初始化奖励和回合长度的缓冲区,用于后续的统计分析
- `_init_config` 方法会从配置对象中提取所有关键超参数,包括环境数量、观测和动作维度、学习率、损失系数、折扣因子等,确保算法的灵活性和可配置性
1.4.1.2 setup方法:调用 `_setup_models_and_optimizer` 和 `_setup_storage`
`setup` 方法会调用 `_setup_models_and_optimizer` 和 `_setup_storage`,分别初始化 actor/critic 网络及其优化器,以及 rollout 数据存储结构
- 模型采用 PPOActor 和 PPOCritic 两个神经网络,优化器使用 Adam
- 数据存储结构会注册所有需要追踪的张量,包括观测、动作、奖励、优势等
1.4.1.3 训练主循环在 `learn` 方法
简言之,训练主循环在 `learn` 方法中实现。每次迭代会先进行 rollout(与环境交互收集数据),然后进行训练步骤(mini-batch SGD),并记录训练过程中的各种统计信息。训练过程中会定期保存模型和优化器状态,便于断点恢复
具体而言
- 首先,如果配置了 `init_at_random_ep_len`,会将环境的 episode 长度缓冲区随机初始化,这有助于增加训练的多样性
def learn(self): # 如果需要在随机的episode长度初始化,则对环境的episode长度缓冲区进行随机赋值 if self.init_at_random_ep_len: self.env.episode_length_buf = torch.randint_like(self.env.episode_length_buf, high=int(self.env.max_episode_length))
- 接着,环境会被重置,获得初始观测
并将所有观测张量移动到指定的设备(如 GPU 或 CPU),以确保后续计算的高效性# 重置所有环境,获取初始观测 obs_dict = self.env.reset_all()
# 将每个观测都转移到指定的设备上(如GPU或CPU) for obs_key in obs_dict.keys(): obs_dict[obs_key] = obs_dict[obs_key].to(self.device)
- 随后,模型被切换到训练模式(`self._train_mode()`),以启用如 dropout、BN 等训练相关的行为
主循环根据当前迭代次数和总训练迭代数进行# 设置模型为训练模式 self._train_mode()
每次迭代包括以下步骤:# 获取本次学习的迭代次数 num_learning_iterations = self.num_learning_iterations # 计算总的迭代次数(当前迭代数 + 本次要迭代的次数) tot_iter = self.current_learning_iteration + num_learning_iterations
1. 记录当前时间,便于后续统计训练耗时
2. 调用 `_rollout_step` 与环境交互,收集一批数据(观测、动作、奖励等),并返回最新的观测# 不使用track进度条,因为会和motion loading bar冲突 # for it in track(range(self.current_learning_iteration, tot_iter), description="Learning Iterations"): for it in range(self.current_learning_iteration, tot_iter): # 记录本次迭代的起始时间 self.start_time = time.time()
3. 调用 `_training_step`,对采集到的数据进行策略和价值网络的训练# 进行一次rollout,返回新的观测(必须更新,否则一直用初始观测) obs_dict = self._rollout_step(obs_dict)
4. 记录本次迭代的耗时,并将相关统计信息(如损失、采集时间、奖励等)打包到 `log_dict`,用于日志记录和可视化# 进行一次训练步骤,返回损失字典 loss_dict = self._training_step()
5. 每隔 `save_interval` 次迭代保存一次模型和优化器状态,便于断点恢复# 记录本次迭代的结束时间 self.stop_time = time.time() # 计算本次学习所用时间 self.learn_time = self.stop_time - self.start_time # 日志信息字典,包含当前迭代、损失、采集和学习时间、奖励等 log_dict = { 'it': it, 'loss_dict': loss_dict, 'collection_time': self.collection_time, 'learn_time': self.learn_time, 'ep_infos': self.ep_infos, 'rewbuffer': self.rewbuffer, 'lenbuffer': self.lenbuffer, 'num_learning_iterations': num_learning_iterations } # 执行日志记录 self._post_epoch_logging(log_dict)
6. 清空本轮的 episode 信息缓存,为下轮训练做准备# 每隔save_interval步保存一次模型 if it % self.save_interval == 0: self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(it)))
# 清空本轮收集的episode信息 self.ep_infos.clear()
- 循环结束后,更新当前的训练迭代计数,并再次保存一次模型,确保所有训练进度都被持久化
# 累加当前学习迭代次数 self.current_learning_iteration += num_learning_iterations # 保存最终模型 self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(self.current_learning_iteration)))
1.4.1.4 rollout (采样和数据收集)的实现:_rollout_step与_actor_rollout_step
rollout 过程由 `_rollout_step` 实现——_rollout_step又会调用_actor_rollout_step,负责与环境交互、收集数据、处理奖励和终止信息,并将数据写入存储
具体而言,
- 它首先在 `torch.inference_mode()` 上下文中运行,这样可以避免梯度计算,提高推理效率。方法内部通过循环 `self.num_steps_per_env` 次,每次循环代表智能体在环境中的一步
def _rollout_step(self, obs_dict): # 在推理模式下,不计算梯度,提升效率 with torch.inference_mode(): # 遍历每个环境步数 for i in range(self.num_steps_per_env): # 初始化策略状态字典 policy_state_dict = {}
- 在每一步中
首先通过 `_actor_rollout_step` 计算当前观测下的动作及相关策略信息
并通过 `_critic_eval_step` 评估当前状态的价值# 通过actor获取动作及相关策略信息,存入policy_state_dict policy_state_dict = self._actor_rollout_step(obs_dict, policy_state_dict)
所有与观测和策略相关的数据都会被存储到 `self.storage`,以便后续训练使用# 通过critic评估当前状态的价值 values = self._critic_eval_step(obs_dict).detach() # 将价值信息加入策略状态字典 policy_state_dict["values"] = values
随后,智能体根据动作与环境交互,获得新的观测、奖励、终止信号和额外信息# 将观测信息存入rollout存储 for obs_key in obs_dict.keys(): self.storage.update_key(obs_key, obs_dict[obs_key]) # 将策略相关信息存入rollout存储 for obs_ in policy_state_dict.keys(): self.storage.update_key(obs_, policy_state_dict[obs_])
并将这些数据转移到设备上(如 GPU)# 获取当前动作 actions = policy_state_dict["actions"] # 构造actor_state字典 actor_state = {} actor_state["actions"] = actions # 与环境交互,获得新的观测、奖励、done标志和额外信息 obs_dict, rewards, dones, infos = self.env.step(actor_state)
# 将新的观测转移到指定设备 for obs_key in obs_dict.keys(): obs_dict[obs_key] = obs_dict[obs_key].to(self.device) # 奖励和done也转移到设备 rewards, dones = rewards.to(self.device), dones.to(self.device)
- 奖励会根据是否有超时(`time_outs`)进行修正,确保奖励的准确性
每一步的数据(奖励、终止信号等)都会被存储,并通过 `self._process_env_step` 处理智能体和价值网络的重置
若启用了日志记录,还会统计每个 episode 的奖励和长度,并在 episode 结束时重置相关统计量 - 循环结束后,方法会记录采集数据所用的时间,并调用 `_compute_returns` 计算每一步的回报(returns)和优势(advantages)
这些数据随后批量存储到 `self.storage`,为后续的策略优化做准备# 为训练准备数据,计算returns和advantages returns, advantages = self._compute_returns( last_obs_dict=obs_dict, policy_state_dict=dict( values=self.storage.query_key('values'), dones=self.storage.query_key('dones'), rewards=self.storage.query_key('rewards')) )
- 最终,方法返回最新的观测字典
# 返回最新的观测字典 return obs_dict
1.4.1.5 GAE(广义优势估计):在 `_compute_returns` 方法中实现
优势和回报的计算采用 GAE(广义优势估计),在 `_compute_returns` 方法中实现,能够有效降低策略梯度的方差
- 首先,方法会用当前环境的最后一个观测(`last_obs_dict`)通过价值网络(critic)计算最后状态的价值 `last_values`
并将所有相关张量(values、dones、rewards、last_values)移动到指定设备(如 GPU)def _compute_returns(self, last_obs_dict, policy_state_dict): """ 计算每一步的回报(returns)和优势(advantages),用于PPO训练 使用广义优势估计(GAE)来降低策略梯度的方差。 """ # 用critic网络评估最后一个观测的状态价值,并去除梯度 last_values = self.critic.evaluate(last_obs_dict["critic_obs"]).detach() # 初始化优势为0 advantage = 0 # 从策略状态字典中获取values、dones和rewards values = policy_state_dict['values'] dones = policy_state_dict['dones'] rewards = policy_state_dict['rewards']
# 将所有张量转移到指定设备(如GPU) last_values = last_values.to(self.device) values = values.to(self.device) dones = dones.to(self.device) rewards = rewards.to(self.device)
- 随后,初始化一个与 values 形状相同的 returns 张量,用于存储每一步的回报
# 初始化returns张量,形状与values相同 returns = torch.zeros_like(values)
- 为方便大家更好的理解,我直接引用此文《ChatGPT技术原理解析:从RL之PPO算法、RLHF到GPT4、instructGPT》的中的「3.3 针对「对话序列/奖励值序列/values/DSC优势函数/returns」的一个完整示例」内容 给大家解释说明下
首先,GAE的计算公式为且考虑到有
故,实际计算的时候,为减少计算量,可以先计算A_7,再分别计算A_6、A_5、A_4、A_3、A_2、A_1
其次,再回顾一下TD误差的含义:δ_1 = r1 + γV_old(2) - V_old(1)
比如对于上式:实际获得的即时奖励 r1 加上折扣后的未来奖励预测 γV_old(2),再减去我们原先预测的当前时间步的奖励 V_old(1),这就是后者的预测与前者实际经验之间的差距
SO,核心计算在一个反向循环中完成(从最后一步到第一步)。对于每一步,方法会判断下一个状态是否为终止状态(`next_is_not_terminal`),并据此计算 TD 残差(delta)
优势(advantage)通过递归方式累加,结合了当前步的 TD 残差和未来步的优势,体现了 GAE 的思想# 获取步数 num_steps = returns.shape[0] # 反向遍历每一步,计算GAE优势和回报 for step in reversed(range(num_steps)): # 最后一步的next_values用last_values,其余用下一步的values if step == num_steps - 1: next_values = last_values else: next_values = values[step + 1] # 判断下一步是否为终止状态 next_is_not_terminal = 1.0 - dones[step].float() # 计算TD残差 delta = rewards[step] + next_is_not_terminal * self.gamma * next_values - values[step]
每一步的回报则等于当前优势加上当前状态的价值,即# 递归计算GAE优势 advantage = delta + next_is_not_terminal * self.gamma * self.lam * advantage
,故有
# 当前步的回报等于优势加上当前价值 returns[step] = advantage + values[step]
- 最后,方法计算优势(returns - values)
并进行标准化处理(减去均值,除以标准差),以便后续训练时数值更稳定# 计算优势(returns - values),并进行标准化 advantages = returns - values
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8) # 返回回报和标准化后的优势
- 最终返回每一步的回报和标准化后的优势。这个过程确保了 PPO 算法在更新策略时能够利用高质量、低方差的优势估计
return returns, advantages
1.4.1.6 _training_step
训练步骤 `_training_step` 会从存储中生成 mini-batch,依次进行策略和价值网络的更新
1.4.1.7 _update_ppo
PPO 的核心损失计算在 `_update_ppo` 方法中实现,包括代理损失、价值损失和熵损失,并支持自适应 KL 散度调整学习率
1.4.1.8 其他
此外,类还实现了模型的保存与加载、评估流程(包括回调机制)、以及详细的日志记录和可视化
1.4.2 运动操作专用PPO(不训练 仅评估):ppo_locomanip.py
这段代码定义了一个名为 `PPOLocoManip` 的类,是基于PPO(Proximal Policy Optimization)算法的智能体,专门用于评估阶段——即该类不包含训练逻辑,只用于评估,支持多环境并行评估,主要用于“站立”和“行走操控”两种策略的切换与评估
- 类初始化(`__init__`)
参数:
`env`: 环境对象,通常是一个多智能体或多任务环境
`config`: 配置对象,包含网络结构、超参数等
`log_dir`: 日志目录
`device`: 运行设备(如'cpu'或'cuda')
主要成员变量:
`self.actor_obs`: 两种策略的观测键名(站立/行走)
`self.actor`: 存储两个策略网络(站立和行走)
`self.eval_callbacks`: 评估回调列表
`self.episode_env_tensors`: 用于统计评估指标
还有一些用于记录奖励、步数等的缓存 - 配置初始化(`_init_config`)
从环境配置中读取环境数、观测维度、动作维度等 - 网络与优化器初始化(`_setup_models_and_optimizer`)
分别初始化站立和行走的PPO Actor网络,并放到指定设备上
`self.actor` 是一个列表,包含两个策略网络 - 评估模式切换(`_eval_mode`)
将两个策略网络都切换到`eval()`模式,关闭dropout等训练特性 - 加载模型权重(`load`)
分别加载站立和行走策略的模型权重
返回权重文件中的`infos`信息 - 环境步进与策略动作(`env_step`, `_actor_act_step`)
`env_step`: 将下半身动作和参考上半身动作拼接后送入环境,获得新观测、奖励等_actor_act_step`: 根据当前控制模式,选择对应策略网络进行动作推理 - 评估流程相关
evaluate_loco_manip_policy: 评估主循环,反复与环境交互,调用回调,直到外部打断_create_eval_callbacks: 根据配置实例化评估回调
_pre_evaluate_policy/_post_evaluate_policy: 评估前后调用回调
_pre_eval_env_step/_post_eval_env_step: 每步环境交互前后调用回调_get_inference_policy: 获取推理用的策略函数(act_inference) - 其他辅助方法
get_example_obs: 打印并返回一次环境重置后的观测样本
inference_model属性:返回策略网络列表
1.5 ASAP的核心创新模块:增量学习与Delta动力学模型
1.5.1 机器人运控train_delta_a.py:基于PPO的扩展(支持双策略)
本文件定义了一个名为 `PPO DeltaA` 的类,是基于 PPO(Proximal Policy Optimization)算法的扩展,主要用于机器人运动控制任务,支持“双策略”机制:
- 主策略:当前正在训练的 PPO 策略
- 参考策略:从 checkpoint 加载的、冻结参数的预训练策略(可用于对比、辅助、模仿等)
其主要依赖包括
- `torch`、`torch.nn`、`torch.optim`:PyTorch 深度学习框架
- humanoidverse 相关模块:自定义的环境、网络结构、工具等
- `hydra`、`omegaconf`:配置管理
- `loguru`、`rich`:日志和美观的终端输出
- 其他:`os`、`time`、`deque`、`statistics` 等标准库
如此文《ASAP——让宇树G1后仰跳投且跳舞:仿真中重现现实轨迹,然后通过增量动作模型预测仿真与现实的差距,最终缩小差距以对齐》所说
- 工作流程如图2(b)所示
- PPO 用于训练增量动作策略
,学习修正后的
以匹配仿真与真实世界
1.5.1.1 初始化(`__init__`)
- -继承自 PPO 基类,初始化环境、配置、日志目录、设备等
如果配置中指定了 `policy_checkpoint`,则:
自动查找并加载 checkpoint 对应的 config.yaml 配置文件
合并 `eval_overrides` 配置(如果有) - 预处理配置
pre_process_config(policy_config) # 对配置进行预处理
- 使用 hydra 的 `instantiate` 动态实例化参考策略(`self.loaded_policy`),并调用其 `setup()` 方法
加载 checkpoint 权重到参考策略 - 设置参考策略为评估模式(冻结参数,不参与训练)
禁用参考策略所有参数的梯度(`param.requires_grad = False`)
获取参考策略的推理函数 `eval_policy`
1.5.1.2 rollout 步骤(`_rollout_step`):采集一段轨迹(rollout),供后续训练使用
这段 `_rollout_step` 方法负责在强化学习训练中收集一批(batch)环境交互数据。它的主要流程是在不计算梯度的情况下(`torch.inference_mode()`),循环执行 `num_steps_per_env` 次,每次代表环境中的一步
简言之,每步流程:
- 主策略推理,得到动作(actions)和值(values)
- 参考策略推理,得到 `actions_closed_loop`
- 将主策略和参考策略的动作都传入环境 `env.step(actor_state)`
- 收集环境返回的观测、奖励、done、info 等
- 更新存储器(RolloutStorage),包括奖励、done、values 等
- 统计回合奖励、长度等信息
- 采集完一段后,计算 returns 和 advantages,供 PPO 算法训练
具体而言,在每一步中
- 首先,主策略推理,得到动作(actions)和值(values)
通过 `_actor_rollout_step` 计算当前策略的动作,并通过 `_critic_eval_step` 计算当前状态的价值(value),这些信息被存储在 `policy_state_dict` 中def _rollout_step(self, obs_dict): # 采集一段rollout轨迹 with torch.inference_mode(): # 关闭梯度计算,加速推理 for i in range(self.num_steps_per_env): # 遍历每个环境步 # 计算动作和值 # 策略状态字典 policy_state_dict = {} # 主策略推理 policy_state_dict = self._actor_rollout_step(obs_dict, policy_state_dict) # 评估当前状态的价值 values = self._critic_eval_step(obs_dict).detach() # 存储value policy_state_dict["values"] = values
- 随后,所有观测和策略相关的数据都会被存入 `self.storage`,用于后续训练
## 存储观测 for obs_key in obs_dict.keys(): # 存储每个观测 self.storage.update_key(obs_key, obs_dict[obs_key]) for obs_ in policy_state_dict.keys(): # 存储策略相关数据 self.storage.update_key(obs_, policy_state_dict[obs_]) # 获取主策略动作 actions = policy_state_dict["actions"] # 构造actor状态 actor_state = {} # 主策略动作 actor_state["actions"] = actions
- 接下来,参考策略推理,得到 `actions_closed_loop`
方法还会用一个“参考策略”(`self.loaded_policy`,通常是预训练或专家策略)对 `closed_loop_actor_obs` 进行推理,得到 `actions_closed_loop`
并将其与主策略的动作一起组成 `actor_state`,用于环境的下一步模拟# 参考策略推理 policy_output = self.loaded_policy.eval_policy(obs_dict['closed_loop_actor_obs']).detach()
# 存储参考策略动作 actor_state["actions_closed_loop"] = policy_output
- 环境执行一步后,新的观测、奖励、终止标志和信息被返回。所有张量会被转移到正确的设备(如 GPU),奖励和终止信息也会被存储
若出现超时(`time_outs`),奖励会做相应调整# 与环境交互,获得新观测、奖励、done等 obs_dict, rewards, dones, infos = self.env.step(actor_state) for obs_key in obs_dict.keys(): # 移动到指定设备 obs_dict[obs_key] = obs_dict[obs_key].to(self.device) # 奖励和done也移动到设备 rewards, dones = rewards.to(self.device), dones.to(self.device) # 记录环境统计信息 self.episode_env_tensors.add(infos["to_log"]) # 奖励扩展维度 rewards_stored = rewards.clone().unsqueeze(1)
每一步还会调用 `_process_env_step` 进行额外处理# 如果有超时信息 if 'time_outs' in infos: # 修正奖励 rewards_stored += self.gamma * policy_state_dict['values'] * infos['time_outs'].unsqueeze(1).to(self.device) # 检查奖励维度 assert len(rewards_stored.shape) == 2 self.storage.update_key('rewards', rewards_stored) # 存储奖励 self.storage.update_key('dones', dones.unsqueeze(1)) # 存储done self.storage.increment_step() # 存储步数+1
self._process_env_step(rewards, dones, infos) # 处理环境步
- 如果设置了日志目录,还会记录每个 episode 的奖励和长度,便于后续统计和分析
- 循环结束后,方法会统计采集数据所用的时间,并调用 `_compute_returns` 计算每一步的回报(returns)和优势(advantages)
这些数据会批量更新到存储中,为后续的策略优化做准备# 计算回报和优势 returns, advantages = self._compute_returns( last_obs_dict=obs_dict, policy_state_dict=dict(values=self.storage.query_key('values'), dones=self.storage.query_key('dones'), rewards=self.storage.query_key('rewards')) )
self.storage.batch_update_data('returns', returns) # 存储回报 self.storage.batch_update_data('advantages', advantages) # 存储优势
- 最后返回最新的观测字典
return obs_dict # 返回最新观测
整个过程实现了数据采集、存储和预处理的自动化,是 PPO 及其变体算法训练流程的核心部分。
1.5.1.3 评估前处理(`_pre_eval_env_step`)
简言之
- 用于评估阶段,每步都让主策略和参考策略分别推理动作
- 更新 `actor_state`,包含主策略动作和参考策略动作
- 支持回调机制(`eval_callbacks`),可扩展评估逻辑
1.5.2 Delta动力学模型:delta_dynamics
这段代码定义了一个基于PyTorch的“Delta Dynamics”模型,主要用于强化学习环境中动力学建模。代码分为两个主要部分:
1.5.2.1 DeltaDynamics_NN 神经网络
作用:这是一个三层全连接神经网络(MLP),输入维度为`input_dim`,输出为`output_dim`,中间两层隐藏层各256单元,激活函数为ReLU
用途:用于预测动力学的“增量”(delta),即给定输入状态,预测下一步状态的变化量
1.5.2.2 DeltaDynamicsModel 算法主类
主要成员变量和初始化
- env:环境对象,通常是一个仿真环境
- config:配置参数,包含训练、保存、优化器等设置
- log_dir:日志目录
- device:运行设备(CPU/GPU)
- writer:Tensorboard日志记录器
- delta_dynamics_loss:损失函数,使用MSELoss
- delta_dynamics_path:模型保存路径
主要方法包括如下
- `_init_config`
从环境和配置中读取各种参数,如环境数、动作维度、训练步数、学习率等 - setup & _setup_models_and_optimizer
初始化神经网络模型和优化器 - learn
- 训练主循环:
- 每次迭代重置环境,采集数据
- 用神经网络预测delta,解析为各个动力学分量
- 计算预测与目标的MSE损失(包括关节位置、速度、基座位置、速度、角速度、四元数等)
- 反向传播并优化
- 定期保存模型和记录日志
learn 方法实现了 delta dynamics 神经网络的训练主循环
- 首先,方法会将模型切换到训练模式,并重置环境,确保所有观测数据都被移动到指定的设备(如 GPU)
主循环迭代次数由 num_learning_iterations 控制,每次迭代代表一次完整的训练周期def learn(self): # 设置模型为训练模式 self._train_mode() # 重置环境,获取初始观测 obs_dict = self.env.reset_all() for obs_key in obs_dict.keys(): # 将观测数据转移到指定设备(如GPU) obs_dict[obs_key] = obs_dict[obs_key].to(self.device)
for it in range(self.num_learning_iterations): # 迭代训练指定次数 print('Iteration: ', it) # 打印当前迭代次数
- 在每次迭代开始时,如果当前迭代数能被 1000 整除,会调用 resample_motion 方法对环境中的运动数据进行重采样,这有助于增加训练数据的多样性
随后环境再次重置,优化器梯度被清零,为新一轮参数更新做准备if it % 1000 == 0: # 每1000次重新采样动作 self.env.resample_motion()
# 每次迭代重置环境 obs_dict = self.env.reset_all() # 优化器梯度清零 self.delta_dynamics_optimizer.zero_grad()
- 每个迭代周期内部,还会有 num_steps_per_env 次小循环
每一步,模型会用当前观测 obs_dict['delta_dynamics_input_obs'] 预测状态变化 pred_delta,并通过 parse_delta 解析为字典结构# 收集梯度 loss = 0 # 初始化损失 for i in range(self.num_steps_per_env): # 每个环境步数循环
环境随后执行一步动作,返回新的观测和奖励等信息obs = obs_dict['delta_dynamics_input_obs'] # 获取输入观测 pred_delta = self.delta_dynamics(obs) # 预测状态变化量 # 解析张量为字典 # 解析预测的状态变化 delta_state_items = self.env.parse_delta(pred_delta.clone(), 'pred')
接着,利用 update_delta 和 assemble_delta 方法将预测结果还原为完整状态# 与环境交互,获取新观测和奖励等 obs_dict, rew_buf, reset_buf, extras = self.env.step(dict(on_policy=False))
并与目标状态 obs_dict['delta_dynamics_motion_state_obs'] 进行对比# 更新预测状态 pred_state = self.env.update_delta(delta_state_items) # 将字典组装为张量 # 组装预测状态为张量 pred = self.env.assemble_delta(pred_state)
损失函数被细分为多个部分(如关节位置、速度、基座位置等),分别计算均方误差# 获取目标状态 target = obs_dict['delta_dynamics_motion_state_obs'] # 解析目标状态 target_state_items = self.env.parse_delta(target.clone(), 'target')
最后将所有损失加总# 计算不同分量的损失 # 关节位置损失 loss_dof_pos = self.delta_dynamics_loss(pred_state['dof_pos'], target_state_items['motion_dof_pos']) # 关节速度损失 loss_dof_vel = self.delta_dynamics_loss(pred_state['dof_vel'], target_state_items['motion_dof_vel']) # 基座位置损失 loss_base_pos_xyz = self.delta_dynamics_loss(pred_state['base_pos_xyz'], target_state_items['motion_base_pos_xyz']) # 基座线速度损失 loss_base_lin_vel = self.delta_dynamics_loss(pred_state['base_lin_vel'], target_state_items['motion_base_lin_vel']) # 基座角速度损失 loss_base_ang_vel = self.delta_dynamics_loss(pred_state['base_ang_vel'], target_state_items['motion_base_ang_vel']) # 基座四元数损失 loss_base_quat = self.delta_dynamics_loss(pred_state['base_quat'], target_state_items['motion_base_quat'])
# 总损失 loss = loss_dof_pos + loss_dof_vel + loss_base_pos_xyz + loss_base_lin_vel + loss_base_ang_vel + loss_base_quat
- 每次主循环结束后,会将总损失和各个分项损失写入 TensorBoard,方便后续可视化和分析
最后,执行反向传播和优化器步进,更新模型参数如果当前迭代数能被 save_interval 整除,还会保存当前模型和优化器的状态到磁盘loss.backward() # 反向传播计算梯度 self.delta_dynamics_optimizer.step() # 优化器更新参数
# 每隔一定步数保存一次模型 if it % self.save_interval == 0: # 记录日志 logger.info(f"Iteration: {it}, Loss: {loss.item()}") # 保存模型 self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(it)))
整个流程实现了标准的深度学习训练范式,并针对动力学建模任务做了细致的损失分解和日志记录。容易忽略的点是,loss 在每个小循环内被覆盖而不是累加,这意味着只记录了最后一步的损失,若需累计损失可考虑调整
- load & save
加载/保存模型参数和优化器状态 - evaluate_policy
- 评估当前策略,无梯度,循环与环境交互 - 其他辅助方法
`_pre_eval_env_step`、`eval_env_step`、`_post_eval_env_step`:用于评估流程中与环境的交互和状态更新 - `get_example_obs`:打印并返回环境的观测样本
关键点总结
- 该模型用于学习环境状态的“增量动力学”,即预测状态变化量
- 支持与已有策略(policy)联合使用,可加载预训练策略参数
- 训练过程中对各动力学分量分别计算损失,便于细粒度调优
- 支持Tensorboard日志、模型保存与加载、评估等常见功能
1.6 控制算法:解耦合控制、力感知控制、模型预测路径积分控制
// 待更
第二部分 环境模块 envs/
BaseTask (envs/base_task/base_task.py):
├── BaseSimulator → 物理模拟器接口
├── terrain → 地形生成
├── robot配置 → 机器人参数
├── obs配置 → 观察空间定义
└── rewards配置 → 奖励函数设计
具体任务继承关系:
BaseTask
├── LocomotionTask (locomotion/)
├── MotionTrackingTask (motion_tracking/)
└── LeggedBaseTask (legged_base_task/)
基础任务架构:
- BaseTask: 所有RL任务的基类,处理模拟器初始化、环境设置、观察空间定义
- base_task/: 包含基础任务实现和配置
专门任务类型
- locomotion/: 运动控制任务,实现机器人行走、跑步等基础运动
- motion_tracking/: 运动追踪任务,实现对人类动作的模仿和跟踪
- legged_base_task/: 腿式机器人基础任务
- env_utils/: 环境工具函数,包含地形生成、物理参数设置等
2.1 base_task
2.2 legged_base_task
2.3 locomotion:运动控制任务——实现机器人行走、跑步等基础运动
2.4 motion_tracking:运动追踪任务,RL中实现对人类动作的模仿和跟踪
这个 `LeggedRobotMotionTracking` 类是一个复杂的机器人运动跟踪系统,主要用于强化学习环境中的人形机器人运动仿真和跟踪
这个类继承自 `LeggedRobotBase`,专门处理机器人的运动跟踪任务,包括:运动数据管理、VR遥操作支持、奖励计算、状态重置和观察
2.4.1 核心初始化流程:__init__
2.4.2 关键组件解析:运动库初始化 (`_init_motion_lib`)
这个 `_init_motion_lib` 方法负责初始化机器人的运动库系统,这是整个运动跟踪环境的核心组件。该方法的主要职责是设置运动数据管理、加载参考运动序列,并为多个并行环境准备运动时间采样
- 首先,方法设置时间步长配置,将环境的仿真时间步 `self.dt` 赋值给运动配置的 `step_dt` 参数。这确保了运动库与仿真环境使用相同的时间分辨率,保证时间同步的准确性
接着创建 `MotionLibRobot` 实例,这个类继承自 `MotionLibBase`,专门处理人形机器人的运动数据def _init_motion_lib(self): # 设置机器人运动配置的时间步长为环境的时间步长 self.config.robot.motion.step_dt = self.dt
构造函数传入运动配置、环境数量和计算设备,为并行处理多个仿真环境做准备# 创建机器人运动库实例,传入运动配置、环境数量和设备 self._motion_lib = MotionLibRobot(self.config.robot.motion, num_envs=self.num_envs, device=self.device)
- 运动加载策略根据当前模式动态调整
在评估模式下(`self.is_evaluating` 为 True),系统调用 `load_motions(random_sample=False)`,这意味着运动序列将按照确定性顺序加载,确保评估结果的可重现性和一致性
在训练模式下,系统使用 `load_motions(random_sample=True)`,启用随机采样策略来增加训练数据的多样性,防止模型过拟合特定的运动序列# 如果当前处于评估模式 if self.is_evaluating: # 加载运动数据,不进行随机采样(按顺序加载) self._motion_lib.load_motions(random_sample=False)
`load_motions` 方法的内部实现展现了复杂的数据处理流程# 如果当前处于训练模式 else: # 加载运动数据,进行随机采样 self._motion_lib.load_motions(random_sample=True)
该方法根据采样策略选择运动序列:
随机模式下使用多项式采样 `torch.multinomial` 基于预设的采样概率分布选择运动
确定性模式下使用取余操作 `torch.remainder` 循环遍历所有可用运动
加载过程中,系统解析运动文件数据,提取关键信息如帧率、帧数、运动长度等,并将所有运动数据连接成大型张量存储在GPU上,包括全局位置、旋转、速度、角速度等多维运动状态 - 方法的最后部分处理运动时间的初始化
调用 `_resample_motion_times(torch.arange(self.num_envs))` 为所有环境重新采样运动开始时间,这个函数会为每个环境设置运动长度和随机的开始时间点(训练模式下)或零开始时间(评估模式下)# 重新采样所有环境的运动时间 res = self._resample_motion_times(torch.arange(self.num_envs))
- 最后,方法保存重要的运动参数:
`motion_dt` 存储运动的时间间隔
`motion_start_idx` 初始化为0用于跟踪当前运动序列的起始索引
`num_motions` 记录可用的独特运动数量# 设置运动数据的时间步长 self.motion_dt = self._motion_lib._motion_dt # 初始化运动开始索引为0 self.motion_start_idx = 0 # 获取唯一运动的总数量 self.num_motions = self._motion_lib._num_unique_motions
这种设计允许系统高效地管理大量运动数据,支持并行仿真环境,并提供灵活的训练与评估模式切换。通过批量预加载和GPU存储,系统能够快速访问运动状态,满足实时仿真的性能要求。
2.4.3. 扩展刚体系统 (`_init_motion_extend`)
这个 `_init_motion_extend` 方法负责初始化机器人运动追踪系统中的扩展身体部件。该方法主要用于处理需要额外追踪的虚拟关节或标记点,这些点通常附着在机器人的现有身体部件上
- 首先,方法检查配置文件中是否存在 `extend_config` 配置项。如果存在,它会创建三个临时列表来收集扩展配置数据:父身体部件的索引、相对位置和旋转信息
- 在配置解析阶段,代码遍历每个扩展配置项,提取关键信息:
- 通过 `self.simulator._body_list.index()` 查找父身体部件在模拟器中的索引
- 收集扩展点相对于父部件的位置和旋转数据
- 将新的关节名称添加到模拟器的身体列表中 - 接下来是张量创建和数据转换阶段。代码将收集的数据转换为 PyTorch 张量,并进行必要的格式调整:
- `extend_body_parent_ids` 存储父身体索引
- 位置和旋转数据通过 `.repeat(self.num_envs, 1, 1)` 扩展到所有环境
- 特别注意旋转数据的四元数格式转换:从 `wxyz` 格式转换为 `xyzw` 格式 - 最后,方法初始化用于运动追踪的关键张量。这些张量的尺寸都考虑了原始身体数量加上扩展身体数量:
- `marker_coords` 用于存储标记点坐标
- `ref_body_pos_extend` 存储参考身体位置
- `dif_global_body_pos` 用于计算全局位置差异
2.4.4 观察计算_pre_compute_observations_callback:计算参考运动与当前状态的差异
这个 `_pre_compute_observations_callback` 方法是运动追踪系统的核心观测预处理函数,负责计算机器人当前状态与参考运动之间的差异,并准备用于强化学习的观测数据
- 参考运动状态获取
方法首先调用父类的预处理函数,然后计算当前时间步的运动时间戳。通过 `self._motion_lib.get_motion_state()` 从运动库中获取参考运动的状态信息,包括身体位置、速度、旋转、角速度以及关节位置和速度。这些参考数据代表了机器人应该达到的目标状态 - 扩展刚体状态计算
接下来的四个部分处理扩展刚体的物理状态计算,这是该方法的技术核心:
1. 位置计算:使用四元数旋转将扩展身体的相对位置转换到全局坐标系。首先将相对位置在父身体坐标系中旋转,然后应用扩展身体自身的旋转,最后加上父身体的全局位置
2. 旋转计算:通过四元数乘法组合父身体旋转和扩展身体的相对旋转,得到扩展身体的全局旋转
3. 角速度计算:直接继承父身体的角速度,这是一个简化假设
4. 线速度计算:这里使用了刚体运动学的核心公式:`v = v_parent + ω × r`,其中角速度与相对位置的叉积计算出由旋转引起的速度贡献 - 差异计算
方法计算参考状态与当前模拟状态之间的差异,这些差异是强化学习奖励函数的基础:
- 位置差异:直接相减
- 旋转差异:使用四元数乘法和共轭计算旋转误差## diff compute - kinematic position # 差异计算 - 运动学位置 # 计算全局刚体位置差异 self.dif_global_body_pos = ref_body_pos_extend - self._rigid_body_pos_extend
- 速度和角速度差异:直接相减## diff compute - kinematic rotation # 差异计算 - 运动学旋转 # 计算全局刚体旋转差异 self.dif_global_body_rot = quat_mul(ref_body_rot_extend, quat_conjugate(self._rigid_body_rot_extend, w_last=True), w_last=True)
- 关节角度和速度差异:参考值与当前DOF状态的差异## diff compute - kinematic velocity # 差异计算 - 运动学速度 # 计算全局刚体速度差异 self.dif_global_body_vel = ref_body_vel_extend - self._rigid_body_vel_extend # 计算全局刚体角速度差异 self.dif_global_body_ang_vel = ref_body_ang_vel_extend - self._rigid_body_ang_vel_extend
## diff compute - kinematic joint position # 差异计算 - 运动学关节位置 # 计算关节角度差异 self.dif_joint_angles = ref_joint_pos - self.simulator.dof_pos ## diff compute - kinematic joint velocity # 差异计算 - 运动学关节速度 # 计算关节速度差异 self.dif_joint_velocities = ref_joint_vel - self.simulator.dof_vel
- 坐标系转换和观测准备
代码执行关键的坐标系转换,将全局坐标系中的差异转换为机器人本地坐标系。这通过计算机器人朝向的逆四元数并应用旋转实现。本地坐标系的观测数据对强化学习更有意义,因为它们与机器人的朝向无关 - VR三点追踪处理
方法处理VR控制模式下的三点追踪(通常是两只手和头部)。根据是否启用远程操作控制,选择使用参考运动数据或实时VR标记坐标 - 运动相位计算——Deepmimic阶段计算
最后计算运动相位信息,这对于周期性运动(如步行)的追踪很重要。相位通过当前时间与运动总长度的比值计算,并进行边界检查确保数值在有效范围内# 获取参考运动长度 self._ref_motion_length = self._motion_lib.get_motion_length(self.motion_ids) # 计算参考运动阶段(0-1之间的比例) self._ref_motion_phase = motion_times / self._ref_motion_length # 检查运动阶段是否在有效范围内(硬编码1.05,因为+1会超过1) if not (torch.all(self._ref_motion_phase >= 0) and torch.all(self._ref_motion_phase <= 1.05)): # 获取最大阶段值 max_phase = self._ref_motion_phase.max() # 在第二维增加一个维度 self._ref_motion_phase = self._ref_motion_phase.unsqueeze(1) # 记录运动跟踪信息 self._log_motion_tracking_info()
而上面代码中,倒数第一行的_log_motion_tracking_info 用于计算和记录机器人运动追踪过程中的差异指标。该方法属于`LeggedRobotMotionTracking`类,主要目的是监控机器人模拟动作与参考动作之间的偏差程度
- 首先,代码从类的实例变量中提取了四种不同的差异数据:上半身位置差异、下半身位置差异、VR三点位置差异以及关节角度差异
这些差异值存储在张量(tensor)中,格式为多维数组,表示不同环境、不同身体部位和不同坐标轴的差异值def _log_motion_tracking_info(self): # 计算上半身各关节点在全局坐标系下的位置差异 upper_body_diff = self.dif_global_body_pos[:, self.upper_body_id, :] # 计算下半身各关节点在全局坐标系下的位置差异 lower_body_diff = self.dif_global_body_pos[:, self.lower_body_id, :] # 计算VR三点跟踪点(通常是头部和双手)在全局坐标系下的位置差异 vr_3point_diff = self.dif_global_body_pos[:, self.motion_tracking_id, :] # 获取关节角度差异 joint_pos_diff = self.dif_joint_angles
- 接着,代码计算了每种差异的范数(norm)
具体来说,它对每个差异张量的最后一个维度(代表xyz坐标)计算欧几里得范数(L2范数),然后取所有值的平均值。这个操作将多维差异数据压缩成单个标量值,表示整体偏差的大小- 最后,这些计算得到的标量差异值被存储到类的`log_dict`字典中,使用描述性的键名,如"upper_body_diff_norm"(上半身差异范数)等。这样做的目的是为了后续可视化、分析或用于优化算法的反馈
总体而言,此方法是动作追踪系统中的一个重要组成部分,它通过量化模拟动作与目标动作之间的差异,为评估系统性能和调试提供了量化指标。这些指标可能会用于计算奖励函数、确定训练进度或者在可视化界面中展示动作匹配程度
2.4.5. 课程学习支持:支持动态调整终止条件
2.4 6. VR遥操作支持:通过ROS2接收VR设备数据
2.4.7. 运动数据保存_post_physics_step:支持保存训练过程中的运动数据
这个函数用于在物理模拟步骤之后收集和保存机器人的运动数据。它的主要功能是在每次物理模拟后跟踪记录机器人的各种状态,并在收集了足够多的数据后将它们保存到文件中
主要流程如下
- 首先调用父类方法:`super()._post_physics_step()`,继承父类的后处理行为
- 条件检查:只有当`self.save_motion`为真时才进行数据收集
- 时间更新:计算当前的运动时间点,根据已经过的帧数(`episode_length_buf`)、时间步长(`dt`)和起始时间
- 数据保存条件判断:
- 当收集的数据量超过配置的阈值(`self.config.save_total_steps`)时,进行数据处理和保存
- 处理包括将列表数据转换为张量、转置和转换为NumPy数组
- 跳过前3帧数据(`v[3:]`),可能是为了稳定性考虑 - 数据组织
- 为每个环境创建单独的`motion{i}`键
- 将所有收集的数据按环境分组
- 添加帧率(`fps`)信息 - 文件保存
- 使用`joblib.dump`将数据保存到指定路径
- 打印彩色的成功消息
- 保存后立即退出程序(`sys.exit()`) - 数据收集
- 如果未达到保存条件,则继续收集机器人的各种状态数据:
- 根部位置(`root_trans`)和旋转(`root_rot`)
- 根据不同模拟器(isaacgym/isaacsim/genesis)调整旋转四元数格式
- 转换旋转为轴角表示(`root_rot_vec`)
- 关节角度(`dof`)
- 构建姿势轴角表示(`pose_aa`)
- 收集动作、观察、终止状态、速度等信息 - 数据追加:将所有收集的数据添加到相应的保存列表中
- 开始保存标记:设置`self.start_save = True`,表示已开始数据收集过程
这个函数在机器人学习或仿真系统中实现了详细的运动数据收集,支持多种模拟器,并在达到预定数量的步骤后保存并退出,有可能是为了收集训练数据或分析机器人的运动表现
// 待更
2.4.8 多个奖励函数,用于评估机器人跟踪参考运动的表现
- _reward_teleop_body_position_extend(): 身体位置跟踪奖励
- _reward_teleop_vr_3point(): VR三点跟踪奖励
- _reward_teleop_joint_position(): 关节位置跟踪奖励
// 待更
第三部分 模拟器模块 simulator/
支持多种物理引擎:
- IsaacGym: NVIDIA的GPU加速物理模拟器
- IsaacSim: 基于Omniverse的仿真平台
- Genesis: 新兴的高性能物理模拟器
- base_simulator/: 模拟器基类,提供统一接口
第四部分 配置系统 config/
采用Hydra配置管理,结构化组织
- base.yaml/base_eval.yaml: 基础配置
- algo/: 算法特定配置
- env/: 环境配置
- robot/: 机器人配置 (如G1机器人的29自由度配置)
- rewards/: 奖励函数配置
- obs/: 观察空间配置
- terrain/: 地形配置
第五部分 工具模块 utils/
通用工具
- common.py: 通用函数
- math.py: 数学工具函数
- torch_utils.py: PyTorch相关工具
- config_utils.py: 配置处理工具
- motion_lib/: 动作库,存储和处理人类动作数据
第六部分 数据模块data/
- motions/: 存储人类动作数据
- robots/: 机器人模型文件和配置
第七部分 Isaac工具isaac_utils/
专门为Isaac模拟器提供的数学和旋转工具
- maths.py: 数学计算函数
- rotations.py: 旋转变换工具
// 待更