import random

import numpy as np
import torch

from isaaclab.envs import ManagerBasedRLEnv, ManagerBasedRLEnvCfg
from isaaclab.envs.mdp import JointPositionActionCfg
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import RewardTermCfg as RewTerm
from isaaclab.managers import TerminationTermCfg as DoneTerm
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.utils import configclass

import isaaclab.envs.mdp as mdp
from rl_game.get_up.env.t1_env import T1SceneCfg


# --- 1. Custom MDP logic functions ---

def standing_with_feet_reward(
    env: ManagerBasedRLEnv,
    min_head_height: float,
    min_pelvis_height: float,
    sensor_cfg: SceneEntityCfg,
    force_threshold: float = 20.0,
    max_v_z: float = 0.5,
) -> torch.Tensor:
    """Final height objective: head up, pelvis up, feet loaded and steady."""
    head_idx, _ = env.scene["robot"].find_bodies("H2")
    pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
    curr_head_h = env.scene["robot"].data.body_state_w[:, head_idx[0], 2]
    curr_pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2]

    # Normalized height scores, capped slightly above target to avoid over-extension
    head_score = torch.clamp(curr_head_h / min_head_height, 0.0, 1.2)
    pelvis_score = torch.clamp(curr_pelvis_h / min_pelvis_height, 0.0, 1.2)
    height_reward = (head_score + pelvis_score) / 2.0

    # Foot contact-force gate: only the bodies selected by `sensor_cfg` (the feet)
    # should count, not every body tracked by the sensor
    contact_sensor = env.scene.sensors.get(sensor_cfg.name)
    if contact_sensor is None:
        return torch.zeros(env.num_envs, device=env.device)
    foot_forces_z = torch.sum(contact_sensor.data.net_forces_w[:, sensor_cfg.body_ids, 2], dim=-1)
    force_weight = torch.sigmoid((foot_forces_z - force_threshold) / 5.0)

    # Vertical-velocity attenuation (discourages unstable hopping)
    root_vel_z = env.scene["robot"].data.root_lin_vel_w[:, 2]
    vel_penalty = torch.exp(-torch.abs(root_vel_z) / max_v_z)

    return height_reward * (0.5 + 0.5 * force_weight * vel_penalty)
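
# Illustrative only (not part of the original reward logic): a quick look at the
# two gating terms above, using the function's default parameters. The sigmoid
# turns foot force into a soft switch around `force_threshold`, and the exp term
# scales the reward down as vertical speed grows.
def _demo_force_gate(force_threshold: float = 20.0, max_v_z: float = 0.5) -> None:
    forces = torch.tensor([0.0, 10.0, 20.0, 40.0, 80.0])
    gate = torch.sigmoid((forces - force_threshold) / 5.0)
    # gate ~ [0.02, 0.12, 0.50, 0.98, 1.00]: near zero in the air, ~1 once planted
    vel = torch.tensor([0.0, 0.25, 0.5, 1.0])
    damp = torch.exp(-torch.abs(vel) / max_v_z)
    # damp ~ [1.00, 0.61, 0.37, 0.14]: full credit only when v_z is small
    print(gate, damp)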
def arm_tuck_incremental_reward(
    env: ManagerBasedRLEnv,
    pitch_threshold: float = 1.4,
    shaping_weight: float = 0.2,
) -> torch.Tensor:
    """Incremental arm-tuck reward: shape motion toward flexion, with a large bonus past the threshold."""
    joint_names = ["Left_Elbow_Pitch", "Right_Elbow_Pitch"]
    joint_ids, _ = env.scene["robot"].find_joints(joint_names)
    elbow_pos = env.scene["robot"].data.joint_pos[:, joint_ids]
    elbow_vel = env.scene["robot"].data.joint_vel[:, joint_ids]

    # 1. Velocity shaping: small reward while flexing (positive velocity), penalty while extending
    avg_vel = torch.mean(elbow_vel, dim=-1)
    shaping_reward = torch.tanh(avg_vel) * shaping_weight

    # 2. Threshold trigger: once both elbows are tucked, grant a steady static bonus
    is_tucked = torch.all(elbow_pos > pitch_threshold, dim=-1).float()
    goal_bonus = is_tucked * 1.5

    return shaping_reward + goal_bonus


def dynamic_getup_strategy_reward(env: ManagerBasedRLEnv) -> torch.Tensor:
    """State-machine reward switching:

    - Supine: focus on rolling over + tucking the arms.
    - Prone: focus on pushing off the ground to stand up.
    """
    # Projected gravity: a positive z component means the robot is on its back
    gravity_z = env.scene["robot"].data.projected_gravity_b[:, 2]

    # State masks
    is_on_back = (gravity_z > 0.2).float()
    is_on_belly = (gravity_z < -0.2).float()
    is_transition = 1.0 - is_on_back - is_on_belly

    # 1. Roll-over shaping: drives gravity_z toward -1.0
    flip_shaping = torch.clamp(-gravity_z, min=-1.0, max=1.0)

    # 2. Arm tuck
    tuck_rew = arm_tuck_incremental_reward(env)

    # 3. Push-off (reuses the original logic, minus its internal height decay;
    #    the state machine handles the weighting instead)
    contact_sensor = env.scene.sensors.get("contact_sensor")
    max_arm_force = torch.zeros(env.num_envs, device=env.device)
    if contact_sensor is not None:
        # Assumes the sensor tracks the arm/hand links' contact forces
        arm_forces_z = contact_sensor.data.net_forces_w[:, :, 2]
        max_arm_force = torch.max(arm_forces_z, dim=-1)[0]
    push_rew = torch.tanh(torch.clamp(max_arm_force - 15.0, min=0.0) / 40.0)

    # --- Dynamic weight blending ---
    # Supine zone: roll over (8.0) + tuck (4.0)
    back_strategy = is_on_back * (8.0 * flip_shaping + 4.0 * tuck_rew)
    # Prone zone: push off (25.0) + keep arms tucked (1.0).
    # Push-off far outweighs rolling here, so the robot prefers to stay prone
    # and attempt to stand rather than roll back over.
    belly_strategy = is_on_belly * (25.0 * push_rew + 1.0 * tuck_rew)
    # Transition zone
    trans_strategy = is_transition * (4.0 * flip_shaping + 10.0 * push_rew + 2.0 * tuck_rew)

    return back_strategy + belly_strategy + trans_strategy


def is_standing_still(
    env: ManagerBasedRLEnv,
    min_head_height: float,
    min_pelvis_height: float,
    max_angle_error: float,
    standing_time: float,
    velocity_threshold: float = 0.15,
) -> torch.Tensor:
    """Success test: both heights reached + trunk vertical + whole body at rest."""
    head_idx, _ = env.scene["robot"].find_bodies("H2")
    pelvis_idx, _ = env.scene["robot"].find_bodies("Trunk")
    current_head_h = env.scene["robot"].data.body_state_w[:, head_idx[0], 2]
    current_pelvis_h = env.scene["robot"].data.body_state_w[:, pelvis_idx[0], 2]
    gravity_error = torch.norm(env.scene["robot"].data.projected_gravity_b[:, :2], dim=-1)
    root_vel_norm = torch.norm(env.scene["robot"].data.root_lin_vel_w, dim=-1)

    is_stable_now = (
        (current_head_h > min_head_height)
        & (current_pelvis_h > min_pelvis_height)
        & (gravity_error < max_angle_error)
        & (root_vel_norm < velocity_threshold)
    )

    if "stable_timer" not in env.extras:
        env.extras["stable_timer"] = torch.zeros(env.num_envs, device=env.device)
    # NOTE: this function is registered as both a reward and a termination term,
    # so it runs twice per step and on stable steps the timer advances by 2*dt;
    # the effective hold requirement is therefore standing_time / 2.
    dt = env.physics_dt * env.cfg.decimation  # control-step duration (== env.step_dt)
    env.extras["stable_timer"] = torch.where(
        is_stable_now,
        env.extras["stable_timer"] + dt,
        torch.zeros_like(env.extras["stable_timer"]),
    )
    return env.extras["stable_timer"] > standing_time
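
# Illustrative only (an addition, not part of the original logic): a tiny check
# of the sign convention used by dynamic_getup_strategy_reward above. Projected
# gravity in the body frame has z > 0 when the robot lies on its back, z < 0
# when prone, and the three masks partition every environment exactly once.
def _demo_state_masks() -> None:
    g_z = torch.tensor([0.9, -0.9, 0.0])  # supine, prone, on its side
    is_on_back = (g_z > 0.2).float()
    is_on_belly = (g_z < -0.2).float()
    is_transition = 1.0 - is_on_back - is_on_belly
    assert torch.equal(is_on_back, torch.tensor([1.0, 0.0, 0.0]))
    assert torch.equal(is_on_belly, torch.tensor([0.0, 1.0, 0.0]))
    assert torch.equal(is_transition, torch.tensor([0.0, 0.0, 1.0]))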
# --- 2. Configuration classes ---

T1_JOINT_NAMES = [
    'AAHead_yaw', 'Head_pitch',
    'Left_Shoulder_Pitch', 'Left_Shoulder_Roll', 'Left_Elbow_Pitch', 'Left_Elbow_Yaw',
    'Right_Shoulder_Pitch', 'Right_Shoulder_Roll', 'Right_Elbow_Pitch', 'Right_Elbow_Yaw',
    'Waist',
    'Left_Hip_Pitch', 'Right_Hip_Pitch', 'Left_Hip_Roll', 'Right_Hip_Roll',
    'Left_Hip_Yaw', 'Right_Hip_Yaw', 'Left_Knee_Pitch', 'Right_Knee_Pitch',
    'Left_Ankle_Pitch', 'Right_Ankle_Pitch', 'Left_Ankle_Roll', 'Right_Ankle_Roll',
]


@configclass
class T1ObservationCfg:
    @configclass
    class PolicyCfg(ObsGroup):
        concatenate_terms = True
        base_lin_vel = ObsTerm(func=mdp.base_lin_vel)
        base_ang_vel = ObsTerm(func=mdp.base_ang_vel)
        projected_gravity = ObsTerm(func=mdp.projected_gravity)
        root_pos = ObsTerm(func=mdp.root_pos_w)
        joint_pos = ObsTerm(
            func=mdp.joint_pos_rel,
            params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)},
        )
        joint_vel = ObsTerm(
            func=mdp.joint_vel_rel,
            params={"asset_cfg": SceneEntityCfg("robot", joint_names=T1_JOINT_NAMES)},
        )
        actions = ObsTerm(func=mdp.last_action)

    policy = PolicyCfg()


@configclass
class T1EventCfg:
    reset_robot_rotation = EventTerm(
        func=mdp.reset_root_state_uniform,
        params={
            "asset_cfg": SceneEntityCfg("robot"),
            "pose_range": {
                "roll": (-1.57, 1.57),
                # Supine or prone start. NOTE: random.choice is evaluated once at
                # import time, so a single sign applies to the entire run; sorted()
                # keeps the range ordered as (lower, upper) when the sign is negative.
                "pitch": tuple(sorted(np.array([1.4, 1.6], dtype=np.float32) * random.choice([-1.0, 1.0]))),
                "yaw": (-3.14, 3.14),
                "x": (0.0, 0.0),
                "y": (0.0, 0.0),
                "z": (0.3, 0.4),
            },
            "velocity_range": {},
        },
        mode="reset",
    )


@configclass
class T1ActionCfg:
    joint_pos = JointPositionActionCfg(
        asset_name="robot",
        joint_names=T1_JOINT_NAMES,
        scale=0.5,
        use_default_offset=True,
    )


@configclass
class T1GetUpRewardCfg:
    # --- 1. Dynamic strategy reward (switches between roll-over, arm-tuck, and push-off) ---
    adaptive_strategy = RewTerm(
        func=dynamic_getup_strategy_reward,
        weight=1.0,  # the fine-grained weights live inside the function
    )

    # --- 2. Core height objective (highest priority) ---
    height_with_feet = RewTerm(
        func=standing_with_feet_reward,
        weight=15.0,
        params={
            "min_head_height": 1.1,
            "min_pelvis_height": 0.7,
            "sensor_cfg": SceneEntityCfg("contact_sensor", body_names=[".*_foot_link"]),
            "force_threshold": 30.0,
            "max_v_z": 0.3,
        },
    )

    # --- 3. Auxiliary constraints and penalties ---
    # flat_orientation_l2 penalizes tilt, so its weight must be negative
    upright = RewTerm(func=mdp.flat_orientation_l2, weight=-1.0)
    joint_limits = RewTerm(func=mdp.joint_pos_limits, weight=-20.0, params={"asset_cfg": SceneEntityCfg("robot")})
    action_rate = RewTerm(func=mdp.action_rate_l2, weight=-0.01)

    # --- 4. Success bonus ---
    is_success_bonus = RewTerm(
        func=is_standing_still,
        weight=1000.0,
        params={
            "min_head_height": 1.05,
            "min_pelvis_height": 0.75,
            "max_angle_error": 0.3,
            "standing_time": 0.2,
            "velocity_threshold": 0.5,
        },
    )


@configclass
class T1GetUpTerminationsCfg:
    time_out = DoneTerm(func=mdp.time_out)
    standing_success = DoneTerm(
        func=is_standing_still,
        params={
            "min_head_height": 1.05,
            "min_pelvis_height": 0.75,
            "max_angle_error": 0.3,
            "standing_time": 0.2,
            "velocity_threshold": 0.5,
        },
    )


@configclass
class T1EnvCfg(ManagerBasedRLEnvCfg):
    scene = T1SceneCfg(num_envs=8192, env_spacing=2.5)
    observations = T1ObservationCfg()
    actions = T1ActionCfg()
    events = T1EventCfg()
    rewards = T1GetUpRewardCfg()
    terminations = T1GetUpTerminationsCfg()
    episode_length_s = 6.0
    decimation = 4

    def __post_init__(self):
        super().__post_init__()
        self.scene.robot.init_state.pos = (0.0, 0.0, 0.2)
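
# --- 3. Usage sketch ---
# A minimal smoke test, assuming the standard Isaac Lab AppLauncher workflow.
# In a standalone script the simulator must be launched before the isaaclab
# imports at the top of this module resolve, so this belongs in its own file;
# the import path for T1EnvCfg below is hypothetical.
#
#   from isaaclab.app import AppLauncher
#   simulation_app = AppLauncher(headless=True).app
#
#   import torch
#   from isaaclab.envs import ManagerBasedRLEnv
#   from rl_game.get_up.env.t1_getup_env_cfg import T1EnvCfg  # adjust to this module's path
#
#   env = ManagerBasedRLEnv(cfg=T1EnvCfg())
#   obs, _ = env.reset()
#   for _ in range(200):
#       # random actions as a sanity check; training would use an RL policy
#       actions = torch.randn(env.num_envs, env.action_manager.total_action_dim, device=env.device)
#       obs, rew, terminated, truncated, extras = env.step(actions)
#   env.close()
#   simulation_app.close()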