import torch
import random
import numpy as np

import isaaclab.envs.mdp as mdp
from isaaclab.assets import ArticulationCfg
from isaaclab.envs import ManagerBasedRLEnvCfg, ManagerBasedRLEnv
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import RewardTermCfg as RewTerm
from isaaclab.managers import TerminationTermCfg as DoneTerm
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.envs.mdp import JointPositionActionCfg
from isaaclab.managers import SceneEntityCfg
from isaaclab.utils import configclass

from rl_game.get_up.env.t1_env import T1SceneCfg


# --- 1. Custom logic: stage-gated get-up reward ---
def _cached_indices(env: ManagerBasedRLEnv, key: str, finder, names):
    """Resolve articulation joint/body indices once and memoize them on the env.

    ``find_joints``/``find_bodies`` do regex matching; running them every step
    for every reward/termination call is wasted work, so the result is cached
    in a private dict attached to the env instance.

    Args:
        env: The running environment (cache is stored on it).
        key: Cache key naming this index set.
        finder: The lookup callable, e.g. ``robot.find_joints``.
        names: Name (or list of names) passed through to ``finder``.

    Returns:
        The index list returned by ``finder`` on the first call.
    """
    cache = getattr(env, "_getup_index_cache", None)
    if cache is None:
        cache = {}
        env._getup_index_cache = cache
    if key not in cache:
        cache[key], _ = finder(names)
    return cache[key]


def sequenced_getup_reward(
    env: ManagerBasedRLEnv,
    crouch_threshold: float = 0.7,  # crouch quality needed to unlock the standing stage
    target_knee: float = 1.5,
    target_hip: float = 1.2,
) -> torch.Tensor:
    """Staged reward: height reward only pays out after the robot has crouched.

    1. Score the current crouch quality (0..1) from knee/hip tracking error.
    2. Latch, per episode, whether the crouch target was ever reached.
    3. Return base crouch reward + (unlock flag * standing-height reward).

    Args:
        env: The running environment.
        crouch_threshold: Crouch score that unlocks the height reward.
        target_knee: Target knee pitch angle (rad) for the crouch.
        target_hip: Target hip pitch angle (rad) for the crouch.

    Returns:
        Per-env reward tensor of shape (num_envs,).
    """
    # --- 1. Initialize / reset the per-env unlock flag ---
    if "has_crouched" not in env.extras:
        env.extras["has_crouched"] = torch.zeros(env.num_envs, device=env.device, dtype=torch.bool)
    # Clear the flag for environments that were just reset.
    # NOTE(review): assumes env.reset_buf is a per-env boolean reset mask that
    # is set for one step after a reset — confirm for this IsaacLab version.
    env.extras["has_crouched"] &= ~env.reset_buf

    robot = env.scene["robot"]

    # --- 2. Measure current crouch quality ---
    knee_indices = _cached_indices(env, "knees", robot.find_joints, ['Left_Knee_Pitch', 'Right_Knee_Pitch'])
    hip_indices = _cached_indices(env, "hips", robot.find_joints, ['Left_Hip_Pitch', 'Right_Hip_Pitch'])
    joint_pos = robot.data.joint_pos
    knee_error = torch.mean(torch.abs(joint_pos[:, knee_indices] - target_knee), dim=-1)
    hip_error = torch.mean(torch.abs(joint_pos[:, hip_indices] - target_hip), dim=-1)
    # Crouch score in (0, 1]; 1.0 means both joint groups are exactly on target.
    crouch_score = torch.exp(-(knee_error + hip_error) / 0.6)

    # --- 3. Latch the unlock flag ---
    # Once the score crosses the threshold at any step of the episode, the
    # standing reward stays unlocked until the next reset.
    env.extras["has_crouched"] |= crouch_score > crouch_threshold

    # --- 4. Height (standing) reward, gated by the unlock flag ---
    pelvis_idx = _cached_indices(env, "trunk", robot.find_bodies, "Trunk")
    curr_pelvis_h = robot.data.body_state_w[:, pelvis_idx[0], 2]
    standing_reward = torch.clamp(curr_pelvis_h - 0.3, min=0.0) * 20.0
    gated_standing_reward = env.extras["has_crouched"].float() * standing_reward

    # Total = continuous crouch shaping + standing reward available only after unlock.
    return 5.0 * crouch_score + gated_standing_reward


def is_standing_still(
    env: ManagerBasedRLEnv,
    min_head_height: float,
    min_pelvis_height: float,
    max_angle_error: float,
    standing_time: float,
    velocity_threshold: float = 0.15,
) -> torch.Tensor:
    """True for envs that have been upright, tall, and still for `standing_time` s.

    Used both as a sparse success reward and as a success termination (with
    different thresholds), so the stability timer must be kept per parameter
    set — see BUGFIX note below.

    Args:
        env: The running environment.
        min_head_height: Minimum head ("H2" body) height in metres.
        min_pelvis_height: Minimum trunk height in metres.
        max_angle_error: Max norm of the xy projected gravity (0 == upright).
        standing_time: Seconds the pose must be held continuously.
        velocity_threshold: Max root linear speed (m/s) to count as "still".

    Returns:
        Per-env boolean tensor of shape (num_envs,).
    """
    robot = env.scene["robot"]
    head_idx = _cached_indices(env, "head", robot.find_bodies, "H2")
    pelvis_idx = _cached_indices(env, "trunk", robot.find_bodies, "Trunk")
    current_head_h = robot.data.body_state_w[:, head_idx[0], 2]
    current_pelvis_h = robot.data.body_state_w[:, pelvis_idx[0], 2]
    # Tilt: xy-norm of gravity in the base frame; 0 means perfectly upright.
    gravity_error = torch.norm(robot.data.projected_gravity_b[:, :2], dim=-1)
    root_vel_norm = torch.norm(robot.data.root_lin_vel_w, dim=-1)

    is_stable_now = (
        (current_head_h > min_head_height)
        & (current_pelvis_h > min_pelvis_height)
        & (gravity_error < max_angle_error)
        & (root_vel_norm < velocity_threshold)
    )

    # BUGFIX: this function is registered both as a reward term and as a
    # termination term with different thresholds. A single shared
    # env.extras["stable_timer"] was updated twice per step, so whichever
    # caller ran last clobbered the other's timer. Key the timer by the
    # parameter set so each registration tracks its own hold time.
    timer_key = (
        f"stable_timer/{min_head_height}/{min_pelvis_height}"
        f"/{max_angle_error}/{standing_time}/{velocity_threshold}"
    )
    if timer_key not in env.extras:
        env.extras[timer_key] = torch.zeros(env.num_envs, device=env.device)

    # Accumulate held time while stable; reset to zero the moment any
    # stability condition breaks.
    dt = env.physics_dt * env.cfg.decimation
    env.extras[timer_key] = torch.where(
        is_stable_now,
        env.extras[timer_key] + dt,
        torch.zeros_like(env.extras[timer_key]),
    )
    return env.extras[timer_key] > standing_time
# --- 2. Configuration classes ---
# Full ordered joint list of the T1 robot; used to fix the observation layout.
T1_JOINT_NAMES = [
    'AAHead_yaw', 'Head_pitch',
    'Left_Shoulder_Pitch', 'Left_Shoulder_Roll', 'Left_Elbow_Pitch', 'Left_Elbow_Yaw',
    'Right_Shoulder_Pitch', 'Right_Shoulder_Roll', 'Right_Elbow_Pitch', 'Right_Elbow_Yaw',
    'Waist',
    'Left_Hip_Pitch', 'Right_Hip_Pitch', 'Left_Hip_Roll', 'Right_Hip_Roll',
    'Left_Hip_Yaw', 'Right_Hip_Yaw',
    'Left_Knee_Pitch', 'Right_Knee_Pitch',
    'Left_Ankle_Pitch', 'Right_Ankle_Pitch', 'Left_Ankle_Roll', 'Right_Ankle_Roll',
]


@configclass
class T1ObservationCfg:
    """Observations: a single concatenated 'policy' group."""

    @configclass
    class PolicyCfg(ObsGroup):
        concatenate_terms = True
        base_lin_vel = ObsTerm(func=mdp.base_lin_vel)
        base_ang_vel = ObsTerm(func=mdp.base_ang_vel)
        projected_gravity = ObsTerm(func=mdp.projected_gravity)
        root_pos = ObsTerm(func=mdp.root_pos_w)
        joint_pos = ObsTerm(
            func=mdp.joint_pos_rel,
            params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)},
        )
        joint_vel = ObsTerm(
            func=mdp.joint_vel_rel,
            params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)},
        )
        actions = ObsTerm(func=mdp.last_action)

    policy = PolicyCfg()


# Sign of the initial pitch: decides face-up vs face-down spawning.
# NOTE(review): random.choice runs ONCE at import time, so the sign is fixed
# for the whole training run — every reset uses the same side. If per-reset
# randomization over BOTH sides is intended, a custom reset event is needed
# (a single uniform pose_range cannot express the disjoint ±[1.4, 1.6] set).
_PITCH_SIGN = random.choice([-1, 1])
# BUGFIX: the original `tuple(np.array([1.4, 1.6]) * sign)` produced
# (-1.4, -1.6) for sign == -1, i.e. an inverted (low > high) sampling range;
# sort so the tuple is always a valid (low, high) pair.
_PITCH_RANGE = tuple(sorted((1.4 * _PITCH_SIGN, 1.6 * _PITCH_SIGN)))


@configclass
class T1EventCfg:
    """Reset events: spawn the robot low and strongly pitched so it starts lying down."""

    reset_robot_rotation = EventTerm(
        func=mdp.reset_root_state_uniform,
        params={
            "asset_cfg": SceneEntityCfg("robot"),
            "pose_range": {
                "roll": (-1.57, 1.57),
                "pitch": _PITCH_RANGE,
                "yaw": (-3.14, 3.14),
                "x": (0.0, 0.0),
                "y": (0.0, 0.0),
                "z": (0.35, 0.45),
            },
            "velocity_range": {},
        },
        mode="reset",
    )


@configclass
class T1ActionCfg:
    """Actions split by body part to curb jitter.

    Since no motion is prescribed, each part gets a reasonably balanced
    exploration range (scale).
    """

    # Arms get a relatively generous range to explore pushing/bracing motions.
    arm_action = JointPositionActionCfg(
        asset_name="robot",
        joint_names=[
            'Left_Shoulder_Pitch', 'Left_Shoulder_Roll', 'Left_Elbow_Pitch', 'Left_Elbow_Yaw',
            'Right_Shoulder_Pitch', 'Right_Shoulder_Roll', 'Right_Elbow_Pitch', 'Right_Elbow_Yaw',
        ],
        scale=1.0,
        use_default_offset=True,
    )
    torso_action = JointPositionActionCfg(
        asset_name="robot",
        joint_names=['Waist', 'AAHead_yaw', 'Head_pitch'],
        scale=0.7,
        use_default_offset=True,
    )
    leg_action = JointPositionActionCfg(
        asset_name="robot",
        joint_names=[
            'Left_Hip_Pitch', 'Right_Hip_Pitch', 'Left_Hip_Roll', 'Right_Hip_Roll',
            'Left_Hip_Yaw', 'Right_Hip_Yaw',
            'Left_Knee_Pitch', 'Right_Knee_Pitch',
            'Left_Ankle_Pitch', 'Right_Ankle_Pitch', 'Left_Ankle_Roll', 'Right_Ankle_Roll',
        ],
        scale=0.5,
        use_default_offset=True,
    )


@configclass
class T1GetUpRewardCfg:
    """Reward terms for the get-up task."""

    # Core: sequenced stage reward — 75% of the crouch must be completed
    # before the height reward unlocks.
    sequenced_task = RewTerm(
        func=sequenced_getup_reward,
        weight=10.0,
        params={"crouch_threshold": 0.75},
    )
    # Posture penalty: even after the height reward unlocks, tilting costs.
    orientation = RewTerm(
        func=mdp.flat_orientation_l2,
        weight=-2.5,
    )
    # Suppress action jitter.
    action_rate = RewTerm(func=mdp.action_rate_l2, weight=-0.08)
    # Final bonus for holding a stable standing pose.
    is_success_maintain = RewTerm(
        func=is_standing_still,
        weight=100.0,
        params={
            "min_head_height": 1.08,
            "min_pelvis_height": 0.72,
            "max_angle_error": 0.25,
            "standing_time": 0.4,
            "velocity_threshold": 0.2,
        },
    )


@configclass
class T1GetUpTerminationsCfg:
    """Termination terms: timeout, or success once standing is held."""

    time_out = DoneTerm(func=mdp.time_out)
    # Slightly looser thresholds than the reward term, so the episode ends
    # shortly after the success bonus starts paying out.
    standing_success = DoneTerm(
        func=is_standing_still,
        params={
            "min_head_height": 1.08,
            "min_pelvis_height": 0.72,
            "max_angle_error": 0.3,
            "standing_time": 0.3,
            "velocity_threshold": 0.4,
        },
    )


@configclass
class T1EnvCfg(ManagerBasedRLEnvCfg):
    """Top-level env config for the T1 get-up task."""

    scene = T1SceneCfg(num_envs=8192, env_spacing=2.5)
    observations = T1ObservationCfg()
    rewards = T1GetUpRewardCfg()
    terminations = T1GetUpTerminationsCfg()
    events = T1EventCfg()
    actions = T1ActionCfg()
    episode_length_s = 10.0
    decimation = 4