import random import numpy import numpy as np import torch from isaaclab.assets import ArticulationCfg from isaaclab.envs import ManagerBasedRLEnvCfg, ManagerBasedRLEnv from isaaclab.managers import ObservationGroupCfg as ObsGroup from isaaclab.managers import ObservationTermCfg as ObsTerm from isaaclab.managers import RewardTermCfg as RewTerm from isaaclab.managers import TerminationTermCfg as DoneTerm from isaaclab.managers import EventTermCfg as EventTerm from isaaclab.envs.mdp import JointPositionActionCfg from isaaclab.managers import SceneEntityCfg from isaaclab.utils import configclass from rl_game.get_up.env.t1_env import T1SceneCfg import isaaclab.envs.mdp as mdp # --- 1. 自定义 MDP 逻辑函数 --- def standing_with_feet_reward( env: ManagerBasedRLEnv, min_head_height: float, min_pelvis_height: float, sensor_cfg: SceneEntityCfg, force_threshold: float = 20.0, max_v_z: float = 0.5 ) -> torch.Tensor: head_idx, _ = env.scene["robot"].find_bodies("H2") pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") curr_head_h = torch.clamp(env.scene["robot"].data.body_state_w[:, head_idx[0], 2], 0.0, 2.0) curr_pelvis_h = torch.clamp(env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2], 0.0, 2.0) head_score = torch.tanh(curr_head_h / (min_head_height + 1e-6) * 2.0) pelvis_score = torch.tanh(curr_pelvis_h / (min_pelvis_height + 1e-6) * 2.0) height_reward = (head_score + pelvis_score) / 2.0 contact_sensor = env.scene.sensors.get(sensor_cfg.name) foot_forces_z = torch.sum(contact_sensor.data.net_forces_w[:, :, 2], dim=-1) force_weight = torch.sigmoid((foot_forces_z - force_threshold) / 5.0) root_vel_z = env.scene["robot"].data.root_lin_vel_w[:, 2] vel_penalty = torch.exp(-2.0 * torch.clamp(torch.abs(root_vel_z) - max_v_z, min=0.0)) influence_weight = torch.clamp((curr_pelvis_h - 0.2) / 0.4, min=0.0, max=1.0) combined_reward = height_reward * ((1.0 - influence_weight) + influence_weight * force_weight * vel_penalty) return combined_reward def arm_push_up_reward( env: ManagerBasedRLEnv, sensor_cfg: SceneEntityCfg, height_threshold: float = 0.65, min_force: float = 3.0 ) -> torch.Tensor: contact_sensor = env.scene.sensors.get(sensor_cfg.name) if contact_sensor is None: return torch.zeros(env.num_envs, device=env.device) # 1. 获取受力数据 arm_forces_z = contact_sensor.data.net_forces_w[:, :, 2] avg_arm_force = torch.mean(arm_forces_z, dim=-1) # 2. 几何限制:手臂必须在躯干下方 (修复了之前的 AttributeError) arm_body_indices, _ = env.scene["robot"].find_bodies(sensor_cfg.body_names) pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") pelvis_pos_z = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2] arm_pos_z = env.scene["robot"].data.body_state_w[:, arm_body_indices, 2] # 手臂是否全部低于盆骨 is_below_pelvis = torch.all(arm_pos_z < pelvis_pos_z.unsqueeze(1), dim=-1).float() # 3. 计算奖励 force_reward = torch.clamp((avg_arm_force - min_force) / 45.0, min=0.0, max=1.0) root_vel_z = env.scene["robot"].data.root_lin_vel_w[:, 2] velocity_factor = torch.clamp(root_vel_z * 3.0, min=0.0, max=1.5) total_reward = force_reward * is_below_pelvis * (1.0 + velocity_factor) # 高度越高,手臂奖励越低 (强迫切换到腿) height_fade = torch.clamp((height_threshold - pelvis_pos_z) / 0.1, min=0.0, max=1.0) return total_reward * height_fade def is_standing_still( env: ManagerBasedRLEnv, min_head_height: float, min_pelvis_height: float, max_angle_error: float, standing_time: float, velocity_threshold: float = 0.15 ) -> torch.Tensor: """判定逻辑:双高度达标 + 躯干垂直 + 全身静止""" head_idx, _ = env.scene["robot"].find_bodies("H2") pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk") current_head_h = env.scene["robot"].data.body_state_w[:, head_idx[0], 2] current_pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2] gravity_error = torch.norm(env.scene["robot"].data.projected_gravity_b[:, :2], dim=-1) root_vel_norm = torch.norm(env.scene["robot"].data.root_lin_vel_w, dim=-1) # 判定条件:头够高 且 盆骨够高 且 垂直误差小 且 速度低 is_stable_now = ( (current_head_h > min_head_height) & (current_pelvis_h > min_pelvis_height) & (gravity_error < max_angle_error) & (root_vel_norm < velocity_threshold) ) if "stable_timer" not in env.extras: env.extras["stable_timer"] = torch.zeros(env.num_envs, device=env.device) dt = env.physics_dt * env.cfg.decimation env.extras["stable_timer"] = torch.where(is_stable_now, env.extras["stable_timer"] + dt, torch.zeros_like(env.extras["stable_timer"])) return env.extras["stable_timer"] > standing_time def joint_deviation_l2(env: ManagerBasedRLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor: """计算关节相对于默认姿态(default_joint_pos)的偏差平方和""" # 获取当前关节位置相对于默认位置的差值 # mdp.joint_pos_rel 返回的是 (current_pos - default_pos) diff = mdp.joint_pos_rel(env, asset_cfg) return torch.sum(torch.square(diff), dim=-1) # --- 2. 配置类 --- T1_JOINT_NAMES = [ 'Left_Hip_Pitch', 'Right_Hip_Pitch', 'Left_Hip_Roll', 'Right_Hip_Roll', 'Left_Hip_Yaw', 'Right_Hip_Yaw', 'Left_Knee_Pitch', 'Right_Knee_Pitch', 'Left_Ankle_Pitch', 'Right_Ankle_Pitch', 'Left_Ankle_Roll', 'Right_Ankle_Roll', 'AL1', 'AL2', 'AL3', 'AR1', 'AR2', 'AR3' ] @configclass class T1ObservationCfg: @configclass class PolicyCfg(ObsGroup): concatenate_terms = True base_lin_vel = ObsTerm(func=mdp.base_lin_vel) base_ang_vel = ObsTerm(func=mdp.base_ang_vel) projected_gravity = ObsTerm(func=mdp.projected_gravity) root_pos = ObsTerm(func=mdp.root_pos_w) joint_pos = ObsTerm(func=mdp.joint_pos_rel, params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)}) joint_vel = ObsTerm(func=mdp.joint_vel_rel, params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)}) actions = ObsTerm(func=mdp.last_action) policy = PolicyCfg() @configclass class T1EventCfg: reset_robot_rotation = EventTerm( func=mdp.reset_root_state_uniform, params={ "asset_cfg": SceneEntityCfg("robot"), "pose_range": { "roll": (-1.57, 1.57), # 左右侧卧 "pitch": tuple(numpy.array([1.4, 1.6], dtype=np.float32) * random.choice([-1 , 1])), # 仰卧/俯卧 "yaw": (-3.14, 3.14), # 全向旋转 "x": (0.0, 0.0), "y": (0.0, 0.0), "z": (0.1, 0.2), }, "velocity_range": {}, }, mode="reset", ) @configclass class T1ActionCfg: """关键修改:降低 scale 让动作变丝滑,增大阻尼效果""" joint_pos = JointPositionActionCfg( asset_name="robot", joint_names=T1_JOINT_NAMES, scale=0.5, use_default_offset=True ) @configclass class T1GetUpRewardCfg: # 1. 姿态基础奖 (引导身体变正) upright = RewTerm(func=mdp.flat_orientation_l2, weight=2.0) # 2. 【条件高度奖】:双高度判定(头+盆骨),且必须脚踩地 height_with_feet = RewTerm( func=standing_with_feet_reward, weight=20.0, # 作为核心引导,增加权重 params={ "min_head_height": 1.10, "min_pelvis_height": 0.7, "sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]), "force_threshold": 20.0, "max_v_z": 0.3 } ) # 3. 手臂撑地奖:辅助脱离地面阶段 arm_push_support = RewTerm( func=arm_push_up_reward, weight=15.0, # 显著增加权重(从 3.0 提到 15.0),让它成为起步的关键 params={ "sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_hand_link", "AL3", "AR3"]), "height_threshold": 0.65, # 躯干升到 0.6m 前都鼓励手臂用力 "min_force": 3.0 # 只要有 15N 的力就触发 } ) # 4. 关节限位惩罚 (新增:防止关节撞死导致数值问题) joint_limits = RewTerm( func=mdp.joint_pos_limits, weight=-1.0, params={"asset_cfg": SceneEntityCfg("robot")} ) # 5. 时间惩罚 (强制效率) time_penalty = RewTerm( func=mdp.is_alive, weight=-1.2 ) # 6. 成功终极大奖 is_success = RewTerm( func=lambda env, keys: env.termination_manager.get_term(keys).float(), weight=500.0, params={"keys": "standing_success"} ) # 7. 手臂关节活跃度奖 (诱导摆动) arm_movement_exploration = RewTerm( func=mdp.joint_vel_l2, weight=2, # 权重不要太高,防止变成“风扇” params={"asset_cfg": SceneEntityCfg("robot", joint_names=["AL3", "AR3"])} ) # 8. 手臂位置多样性奖 (离开默认折叠姿态) arm_deviation_bonus = RewTerm( func=joint_deviation_l2, weight=1, params={ "asset_cfg": SceneEntityCfg("robot", joint_names=["AL3", "AR3"]) } ) @configclass class T1GetUpTerminationsCfg: time_out = DoneTerm(func=mdp.time_out) # 失败判定:躯干倾斜超过 45 度重置 #base_crash = DoneTerm(func=mdp.bad_orientation, params={"limit_angle": 0.785}) # 成功判定:双高度 + 稳定 standing_success = DoneTerm( func=is_standing_still, params={ "min_head_height": 1.05, "min_pelvis_height": 0.75, "max_angle_error": 0.3, "standing_time": 0.2, "velocity_threshold": 0.5 } ) @configclass class T1EnvCfg(ManagerBasedRLEnvCfg): scene = T1SceneCfg(num_envs=16384, env_spacing=2.5) # 5090 性能全开 def __post_init__(self): super().__post_init__() self.scene.robot.init_state.pos = (0.0, 0.0, 0.2) observations = T1ObservationCfg() rewards = T1GetUpRewardCfg() terminations = T1GetUpTerminationsCfg() events = T1EventCfg() actions = T1ActionCfg() episode_length_s = 6.0 decimation = 4