From cf80becd1798cb9116e952c733feae54bc7bfbbc Mon Sep 17 00:00:00 2001 From: xxh Date: Fri, 13 Mar 2026 21:38:44 -0400 Subject: [PATCH] restore --- scripts/gyms/Walk.py | 606 ------------------------------------------- 1 file changed, 606 deletions(-) delete mode 100644 scripts/gyms/Walk.py diff --git a/scripts/gyms/Walk.py b/scripts/gyms/Walk.py deleted file mode 100644 index dd2f42a..0000000 --- a/scripts/gyms/Walk.py +++ /dev/null @@ -1,606 +0,0 @@ -import os -import numpy as np -import math -import time -from time import sleep -from random import random -from random import uniform - -from stable_baselines3 import PPO -from stable_baselines3.common.vec_env import SubprocVecEnv - -import gymnasium as gym -from gymnasium import spaces - -from scripts.commons.Train_Base import Train_Base -from scripts.commons.Server import Server as Train_Server - -from agent.base_agent import Base_Agent -from utils.math_ops import MathOps - -from scipy.spatial.transform import Rotation as R - -''' -Objective: -Learn how to run forward using step primitive ----------- -- class Basic_Run: implements an OpenAI custom gym -- class Train: implements algorithms to train a new model or test an existing model -''' - - -class WalkEnv(gym.Env): - def __init__(self, ip, server_p) -> None: - - # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw - self.Player = player = Base_Agent( - team_name="Gym", - number=1, - host=ip, - port=server_p - ) - self.robot_type = self.Player.robot - self.step_counter = 0 # to limit episode size - self.force_play_on = True - - self.target_position = np.array([0.0, 0.0]) # target position in the x-y plane - self.initial_position = np.array([0.0, 0.0]) # initial position in the x-y plane - self.target_direction = 0.0 # target direction in the x-y plane (relative to the robot's orientation) - self.isfallen = False - self.waypoint_index = 0 - self.route_completed = False - self.debug_every_n_steps = 5 - self.calibrate_nominal_from_neutral = True - self.auto_calibrate_train_sim_flip = True - self.nominal_calibrated_once = False - self.flip_calibrated_once = False - self._target_hz = 0.0 - self._target_dt = 0.0 - self._last_sync_time = None - target_hz_env = 24 - if target_hz_env: - try: - self._target_hz = float(target_hz_env) - except ValueError: - self._target_hz = 0.0 - if self._target_hz > 0.0: - self._target_dt = 1.0 / self._target_hz - - # State space - # 原始观测大小: 78 - obs_size = 78 - self.obs = np.zeros(obs_size, np.float32) - self.observation_space = spaces.Box( - low=-10.0, - high=10.0, - shape=(obs_size,), - dtype=np.float32 - ) - - action_dim = len(self.Player.robot.ROBOT_MOTORS) - self.no_of_actions = action_dim - self.action_space = spaces.Box( - low=-1.0, - high=1.0, - shape=(action_dim,), - dtype=np.float32 - ) - - # 中立姿态 - self.joint_nominal_position = np.array( - [ - 0.0, - 0.0, - 0.0, - 1.4, - 0.0, - -0.4, - 0.0, - -1.4, - 0.0, - 0.4, - 0.0, - -0.4, - 0.0, - 0.0, - 0.8, - -0.4, - 0.0, - 0.4, - 0.0, - 0.0, - -0.8, - 0.4, - 0.0, - ] - ) - self.reference_joint_nominal_position = self.joint_nominal_position.copy() - - self.train_sim_flip = np.array( - [ - 1.0, # 0: Head_yaw (he1) - -1.0, # 1: Head_pitch (he2) - 1.0, # 2: Left_Shoulder_Pitch (lae1) - -1.0, # 3: Left_Shoulder_Roll (lae2) - 1.0, # 4: Left_Elbow_Pitch (lae3) - 1.0, # 5: Left_Elbow_Yaw (lae4) - -1.0, # 6: Right_Shoulder_Pitch (rae1) - 1.0, # 7: Right_Shoulder_Roll (rae2) - 1.0, # 8: Right_Elbow_Pitch (rae3) - 1.0, # 9: Right_Elbow_Yaw (rae4) - 1.0, # 10: Waist (te1) - 1.0, # 11: Left_Hip_Pitch (lle1) - -1.0, # 12: Left_Hip_Roll (lle2) - -1.0, # 13: Left_Hip_Yaw (lle3) - 1.0, # 14: Left_Knee_Pitch (lle4) - 1.0, # 15: Left_Ankle_Pitch (lle5) - -1.0, # 16: Left_Ankle_Roll (lle6) - -1.0, # 17: Right_Hip_Pitch (rle1) - -1.0, # 18: Right_Hip_Roll (rle2) - -1.0, # 19: Right_Hip_Yaw (rle3) - -1.0, # 20: Right_Knee_Pitch (rle4) - -1.0, # 21: Right_Ankle_Pitch (rle5) - -1.0, # 22: Right_Ankle_Roll (rle6) - ] - ) - - self.scaling_factor = 0.5 - - self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS)) - self.previous_pos = np.array([0.0, 0.0]) # Track previous position - self.Player.server.connect() - # sleep(2.0) # Longer wait for connection to establish completely - self.Player.server.send_immediate( - f"(init {self.Player.robot.name} {self.Player.world.team_name} {self.Player.world.number})" - ) - - def debug_log(self, message): - print(message) - try: - log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "comm_debug.log") - with open(log_path, "a", encoding="utf-8") as f: - f.write(message + "\n") - except OSError: - pass - - def calibrate_train_sim_flip_from_neutral(self, neutral_joint_positions): - updated_flip = self.train_sim_flip.copy() - changed = [] - - for idx, (reference_value, observed_value) in enumerate( - zip(self.reference_joint_nominal_position, neutral_joint_positions) - ): - if idx >= 10: - continue - if abs(reference_value) < 0.15 or abs(observed_value) < 0.15: - continue - - inferred_flip = 1.0 if np.sign(reference_value) == np.sign(observed_value) else -1.0 - if updated_flip[idx] != inferred_flip: - changed.append((idx, updated_flip[idx], inferred_flip)) - updated_flip[idx] = inferred_flip - - self.train_sim_flip = updated_flip - - if changed: - self.debug_log( - "[FlipDebug] " - f"changes={[(idx, old, new) for idx, old, new in changed]}" - ) - - def is_reliable_neutral_pose(self, neutral_joint_positions): - leg_positions = neutral_joint_positions[11:] - leg_norm = float(np.linalg.norm(leg_positions)) - leg_max = float(np.max(np.abs(leg_positions))) - height = float(self.Player.world.global_position[2]) - - reliable = ( - leg_norm > 0.8 - and leg_max > 0.35 - and 0.12 < height < 0.8 - ) - - return reliable, leg_norm, leg_max, height - - def observe(self, init=False): - - """获取当前观测值""" - robot = self.Player.robot - world = self.Player.world - - # Safety check: ensure data is available - - # 计算目标速度 - raw_target = self.target_position - world.global_position[:2] - velocity = MathOps.rotate_2d_vec( - raw_target, - -robot.global_orientation_euler[2], - is_rad=False - ) - - # 计算相对方向 - rel_orientation = MathOps.vector_angle(velocity) * 0.3 - rel_orientation = np.clip(rel_orientation, -0.25, 0.25) - - velocity = np.concatenate([velocity, np.array([rel_orientation])]) - velocity[0] = np.clip(velocity[0], -0.5, 0.5) - velocity[1] = np.clip(velocity[1], -0.25, 0.25) - - # 关节状态 - radian_joint_positions = np.deg2rad( - [robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS] - ) - radian_joint_speeds = np.deg2rad( - [robot.motor_speeds[motor] for motor in robot.ROBOT_MOTORS] - ) - - qpos_qvel_previous_action = np.concatenate([ - (radian_joint_positions * self.train_sim_flip - self.joint_nominal_position) / 4.6, - radian_joint_speeds / 110.0 * self.train_sim_flip, - self.previous_action / 10.0, - ]) - - # 角速度 - ang_vel = np.clip(np.deg2rad(robot.gyroscope) / 50.0, -1.0, 1.0) - - # 投影的重力方向 - orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv() - projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0])) - - # 组合观测 - observation = np.concatenate([ - qpos_qvel_previous_action, - ang_vel, - velocity, - projected_gravity, - ]) - - observation = np.clip(observation, -10.0, 10.0) - return observation.astype(np.float32) - - def sync(self): - ''' Run a single simulation step ''' - self.Player.server.receive() - self.Player.world.update() - self.Player.robot.commit_motor_targets_pd() - self.Player.server.send() - if self._target_dt > 0.0: - now = time.time() - if self._last_sync_time is None: - self._last_sync_time = now - return - elapsed = now - self._last_sync_time - remaining = self._target_dt - elapsed - if remaining > 0.0: - time.sleep(remaining) - now = time.time() - self._last_sync_time = now - - def debug_joint_status(self): - robot = self.Player.robot - actual_joint_positions = np.deg2rad( - [robot.motor_positions[motor] for motor in robot.ROBOT_MOTORS] - ) - target_joint_positions = getattr( - self, - 'target_joint_positions', - np.zeros(len(robot.ROBOT_MOTORS), dtype=np.float32) - ) - joint_error = actual_joint_positions - target_joint_positions - leg_slice = slice(11, None) - - self.debug_log( - "[WalkDebug] " - f"step={self.step_counter} " - f"pos={np.round(self.Player.world.global_position, 3).tolist()} " - f"target_xy={np.round(self.target_position, 3).tolist()} " - f"target_leg={np.round(target_joint_positions[leg_slice], 3).tolist()} " - f"actual_leg={np.round(actual_joint_positions[leg_slice], 3).tolist()} " - f"err_norm={float(np.linalg.norm(joint_error)):.4f} " - f"fallen={self.Player.world.global_position[2] < 0.3}" - ) - - def reset(self, seed=None, options=None): - ''' - Reset and stabilize the robot - Note: for some behaviors it would be better to reduce stabilization or add noise - ''' - r = self.Player.robot - super().reset(seed=seed) - if seed is not None: - np.random.seed(seed) - - length1 = np.random.uniform(10, 20) # randomize target distance - length2 = np.random.uniform(10, 20) # randomize target distance - length3 = np.random.uniform(10, 20) # randomize target distance - angle2 = np.random.uniform(-30, 30) # randomize initial orientation - angle3 = np.random.uniform(-30, 30) # randomize target direction - - self.step_counter = 0 - self.waypoint_index = 0 - self.route_completed = False - self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS)) - self.previous_pos = np.array([0.0, 0.0]) # Initialize for first step - self.walk_cycle_step = 0 - - # 随机 beam 目标位置和朝向,增加训练多样性 - beam_x = (random() - 0.5) * 10 - beam_y = (random() - 0.5) * 10 - - for _ in range(5): - self.Player.server.receive() - self.Player.world.update() - self.Player.robot.commit_motor_targets_pd() - self.Player.server.commit_beam(pos2d=(beam_x, beam_y), rotation=0) - self.Player.server.send() - - # 执行 Neutral 技能直到完成,给机器人足够时间在 beam 位置稳定站立 - finished_count = 0 - for _ in range(10): - finished = self.Player.skills_manager.execute("Neutral") - self.sync() - if finished: - finished_count += 1 - if finished_count >= 3: # 假设需要连续3次完成才算成功 - break - - # neutral_joint_positions = np.deg2rad( - # [self.Player.robot.motor_positions[motor] for motor in self.Player.robot.ROBOT_MOTORS] - # ) - # reliable_neutral, neutral_leg_norm, neutral_leg_max, neutral_height = self.is_reliable_neutral_pose(neutral_joint_positions) - - # if self.auto_calibrate_train_sim_flip and reliable_neutral and not self.flip_calibrated_once: - # self.calibrate_train_sim_flip_from_neutral(neutral_joint_positions) - # self.flip_calibrated_once = True - # if self.calibrate_nominal_from_neutral and reliable_neutral and not self.nominal_calibrated_once: - # self.joint_nominal_position = neutral_joint_positions * self.train_sim_flip - # self.nominal_calibrated_once = True - # self.debug_log( - # "[ResetDebug] " - # f"neutral_pos={np.round(self.Player.world.global_position, 3).tolist()} " - # f"shoulders={np.round(neutral_joint_positions[2:10], 3).tolist()} " - # f"legs={np.round(neutral_joint_positions[11:], 3).tolist()} " - # f"flip={self.train_sim_flip.tolist()} " - # f"nominal_legs={np.round(self.joint_nominal_position[11:], 3).tolist()} " - # f"calibrated_once={(self.flip_calibrated_once, self.nominal_calibrated_once)} " - # f"reliable_neutral={reliable_neutral} " - # f"leg_norm={neutral_leg_norm:.3f} leg_max={neutral_leg_max:.3f} height={neutral_height:.3f}" - # ) - - # reset_action_noise = np.random.uniform(-0.015, 0.015, size=(len(self.Player.robot.ROBOT_MOTORS),)) - # self.target_joint_positions = (self.joint_nominal_position + reset_action_noise) * self.train_sim_flip - - # for idx, target in enumerate(self.target_joint_positions): - # r.set_motor_target_position( - # r.ROBOT_MOTORS[idx], target*180/math.pi, kp=25, kd=0.6 - # ) - - # memory variables - self.initial_position = np.array(self.Player.world.global_position[:2]) - self.previous_pos = self.initial_position.copy() # Critical: set to actual position - self.act = np.zeros(self.no_of_actions, np.float32) - point1 = self.initial_position + np.array([length1, 0]) - point2 = point1 + MathOps.rotate_2d_vec(np.array([length2, 0]), angle2, is_rad=False) - point3 = point2 + MathOps.rotate_2d_vec(np.array([length3, 0]), angle3, is_rad=False) - self.point_list = [point1, point2, point3] - self.target_position = self.point_list[self.waypoint_index] - - return self.observe(True), {} - - def render(self, mode='human', close=False): - return - - def compute_reward(self, previous_pos, current_pos, action): - velocity = current_pos - previous_pos - velocity_magnitude = np.linalg.norm(velocity) - direction_to_target = self.target_position - current_pos - prev_direction_to_target = self.target_position - previous_pos - distance_to_target = np.linalg.norm(direction_to_target) - prev_distance_to_target = np.linalg.norm(prev_direction_to_target) - - progress_reward = np.clip((prev_distance_to_target - distance_to_target) * 30.0, -2.0, 4.0) - - velocity_in_m_per_sec = velocity_magnitude / 0.05 - speed_reward = np.clip(velocity_in_m_per_sec * 1.5, 0.0, 1.5) - - if velocity_magnitude > 1e-4 and distance_to_target > 1e-4: - directional_alignment = np.dot(velocity, direction_to_target) / (velocity_magnitude * distance_to_target) - directional_alignment = np.clip(directional_alignment, -1.0, 1.0) - direction_reward = max(0.0, directional_alignment) - else: - direction_reward = 0.0 - - alive_bonus = 0.05 - - height = self.Player.world.global_position[2] - if 0.45 <= height <= 1.2: - height_reward = 1.5 - else: - height_reward = -6.0 - - motionless_penalty = -1.5 if velocity_magnitude < 0.003 else 0.0 - - waypoint_bonus = 0.0 - if distance_to_target < 0.5: - waypoint_bonus = 25.0 - if self.waypoint_index < len(self.point_list) - 1: - self.waypoint_index += 1 - self.target_position = self.point_list[self.waypoint_index] - else: - waypoint_bonus = 100.0 - self.route_completed = True - - action_magnitude = np.linalg.norm(action[11:]) - action_penalty = -0.08 * action_magnitude - tilt_penalty = -0.2 * np.linalg.norm(self.Player.robot.gyroscope[:2]) / 100.0 - - return ( - progress_reward - + speed_reward - + direction_reward - + alive_bonus - + height_reward - + motionless_penalty - + waypoint_bonus - + action_penalty - + tilt_penalty - ) - - def step(self, action): - - r = self.Player.robot - - self.previous_action = action - - self.target_joint_positions = ( - self.joint_nominal_position - + self.scaling_factor * action - ) - self.target_joint_positions *= self.train_sim_flip - - for idx, target in enumerate(self.target_joint_positions): - r.set_motor_target_position( - r.ROBOT_MOTORS[idx], target * 180 / math.pi, kp=25, kd=0.6 - ) - - self.sync() # run simulation step - self.step_counter += 1 - - # if self.step_counter % self.debug_every_n_steps == 0: - # self.debug_joint_status() - - current_pos = np.array(self.Player.world.global_position[:2], dtype=np.float32) - - # Compute reward based on movement from previous step - reward = self.compute_reward(self.previous_pos, current_pos, action) - - # Update previous position - self.previous_pos = current_pos.copy() - - # Fall detection and penalty - is_fallen = self.Player.world.global_position[2] < 0.3 - - # terminal state: the robot is falling or timeout - terminated = is_fallen or self.step_counter > 800 or self.route_completed - truncated = False - - return self.observe(), reward, terminated, truncated, {} - - -class Train(Train_Base): - def __init__(self, script) -> None: - super().__init__(script) - - def train(self, args): - - # --------------------------------------- Learning parameters - n_envs = 8 # Reduced from 8 to decrease CPU/network pressure during init - if n_envs < 1: - raise ValueError("GYM_CPU_N_ENVS must be >= 1") - n_steps_per_env = 512 # RolloutBuffer is of size (n_steps_per_env * n_envs) - minibatch_size = 128 # should be a factor of (n_steps_per_env * n_envs) - total_steps = 30000000 - learning_rate = 3e-4 - folder_name = f'Walk_R{self.robot_type}' - model_path = f'./scripts/gyms/logs/{folder_name}/' - - print(f"Model path: {model_path}") - print(f"Using {n_envs} parallel environments") - - # --------------------------------------- Run algorithm - def init_env(i_env): - def thunk(): - return WalkEnv(self.ip, self.server_p + i_env) - - return thunk - - server_log_dir = os.path.join(model_path, "server_logs") - os.makedirs(server_log_dir, exist_ok=True) - servers = Train_Server(self.server_p, self.monitor_p_1000, n_envs + 1) # include 1 extra server for testing - - # Wait for servers to start - print(f"Starting {n_envs + 1} rcssservermj servers...") - print("Servers started, creating environments...") - - env = SubprocVecEnv([init_env(i) for i in range(n_envs)]) - eval_env = SubprocVecEnv([init_env(n_envs)]) - - try: - # Custom policy network architecture - policy_kwargs = dict( - net_arch=dict( - pi=[256, 256, 128], # Policy network: 3 layers - vf=[256, 256, 128] # Value network: 3 layers - ), - activation_fn=__import__('torch.nn', fromlist=['ReLU']).ReLU, - ) - - if "model_file" in args: # retrain - model = PPO.load(args["model_file"], env=env, device="cpu", n_envs=n_envs, n_steps=n_steps_per_env, - batch_size=minibatch_size, learning_rate=learning_rate) - else: # train new model - model = PPO( - "MlpPolicy", - env=env, - verbose=1, - n_steps=n_steps_per_env, - batch_size=minibatch_size, - learning_rate=learning_rate, - device="cpu", - policy_kwargs=policy_kwargs, - ent_coef=0.01, # Entropy coefficient for exploration - clip_range=0.2, # PPO clipping parameter - gae_lambda=0.95, # GAE lambda - gamma=0.99 # Discount factor - ) - - model_path = self.learn_model(model, total_steps, model_path, eval_env=eval_env, - eval_freq=n_steps_per_env * 10, save_freq=n_steps_per_env * 10, - backup_env_file=__file__) - except KeyboardInterrupt: - sleep(1) # wait for child processes - print("\nctrl+c pressed, aborting...\n") - servers.kill() - return - - env.close() - eval_env.close() - servers.kill() - - def test(self, args): - - # Uses different server and monitor ports - server_log_dir = os.path.join(args["folder_dir"], "server_logs") - os.makedirs(server_log_dir, exist_ok=True) - server = Train_Server(self.server_p - 1, self.monitor_p, 1, log_dir=server_log_dir) - env = WalkEnv(self.ip, self.server_p - 1) - model = PPO.load(args["model_file"], env=env) - - try: - self.export_model(args["model_file"], args["model_file"] + ".pkl", - False) # Export to pkl to create custom behavior - self.test_model(model, env, log_path=args["folder_dir"], model_path=args["folder_dir"]) - except KeyboardInterrupt: - print() - - env.close() - server.kill() - - -if __name__ == "__main__": - from types import SimpleNamespace - - # 创建默认参数 - script_args = SimpleNamespace( - args=SimpleNamespace( - i='127.0.0.1', # Server IP - p=3100, # Server port - m=3200, # Monitor port - r=0, # Robot type - t='Gym', # Team name - u=1 # Uniform number - ) - ) - - trainer = Train(script_args) - trainer.train({"model_file": "scripts/gyms/logs/Walk_R0_000/model_245760_steps.zip"})