Files
Gym_CPU/scripts/gyms/Walk.py
2026-03-11 09:54:29 +08:00

473 lines
16 KiB
Python

import os
import numpy as np
import math
from time import sleep
from random import random
from random import uniform
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv
import gymnasium as gym
from gymnasium import spaces
from scripts.commons.Train_Base import Train_Base
from scripts.commons.Server import Server as Train_Server
from agent import Agent
from utils.math_ops import MathOps
from scipy.spatial.transform import Rotation as R
'''
Objective:
Learn how to run forward using step primitive
----------
- class Basic_Run: implements an OpenAI custom gym
- class Train: implements algorithms to train a new model or test an existing model
'''
class WalkEnv(gym.Env):
def __init__(self, ip, server_p) -> None:
# Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw
self.Player = player = Agent(
team_name="Gym",
number=1,
host=ip,
port=server_p
)
self.robot_type = self.Player.robot
self.step_counter = 0 # to limit episode size
self.force_play_on = True
self.target_position = np.array([0.0, 0.0]) # target position in the x-y plane
self.initial_position = np.array([0.0, 0.0]) # initial position in the x-y plane
self.target_direction = 0.0 # target direction in the x-y plane (relative to the robot's orientation)
self.isfallen = False
self.waypoint_index = 0
# State space
obs_size = 78
self.obs = np.zeros(obs_size, np.float32)
self.observation_space = spaces.Box(
low=-10.0,
high=10.0,
shape=(obs_size,),
dtype=np.float32
)
action_dim = len(self.Player.robot.ROBOT_MOTORS)
self.no_of_actions = action_dim
self.action_space = spaces.Box(
low=-1.0,
high=1.0,
shape=(action_dim,),
dtype=np.float32
)
self.joint_nominal_position = np.array(
[
0.0,
0.0,
0.0,
1.4,
0.0,
-0.4,
0.0,
-1.4,
0.0,
0.4,
0.0,
-0.4,
0.0,
0.0,
0.8,
-0.4,
0.0,
0.4,
0.0,
0.0,
-0.8,
0.4,
0.0,
]
)
self.train_sim_flip = np.array(
[
1.0,
-1.0,
1.0,
-1.0,
-1.0,
1.0,
-1.0,
-1.0,
1.0,
1.0,
1.0,
1.0,
-1.0,
-1.0,
1.0,
1.0,
-1.0,
-1.0,
-1.0,
-1.0,
-1.0,
-1.0,
-1.0,
]
)
self.scaling_factor = 0.5
self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.previous_pos = np.array([0.0, 0.0]) # Track previous position
self.Player.server.connect()
sleep(2.0) # Longer wait for connection to establish completely
self.Player.server.send_immediate(
f"(init {self.Player.robot.name} {self.Player.world.team_name} {self.Player.world.number})"
)
def observe(self, init=False):
"""获取当前观测值"""
robot = self.Player.robot
world = self.Player.world
# Safety check: ensure data is available
if not robot.motor_positions or not robot.motor_speeds:
return np.zeros(78, dtype=np.float32)
# 计算目标速度
raw_target = self.target_position - world.global_position[:2]
velocity = MathOps.rotate_2d_vec(
raw_target,
-robot.global_orientation_euler[2],
is_rad=False
)
# 计算相对方向
rel_orientation = MathOps.vector_angle(velocity) * 0.3
rel_orientation = np.clip(rel_orientation, -0.25, 0.25)
velocity = np.concatenate([velocity, np.array([rel_orientation])])
velocity[0] = np.clip(velocity[0], -0.5, 0.5)
velocity[1] = np.clip(velocity[1], -0.25, 0.25)
# 关节状态
radian_joint_positions = np.deg2rad(list(robot.motor_positions.values()))
radian_joint_speeds = np.deg2rad(list(robot.motor_speeds.values()))
qpos_qvel_previous_action = np.concatenate([
(radian_joint_positions * self.train_sim_flip - self.joint_nominal_position) / 4.6,
radian_joint_speeds / 110.0 * self.train_sim_flip,
self.previous_action / 10.0,
])
# 角速度
ang_vel = np.clip(np.deg2rad(robot.gyroscope) / 50.0, -1.0, 1.0)
# 投影的重力方向
orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv()
projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0]))
# 组合观测
observation = np.concatenate([
qpos_qvel_previous_action,
ang_vel,
velocity,
projected_gravity,
])
observation = np.clip(observation, -10.0, 10.0)
return observation.astype(np.float32)
def sync(self):
''' Run a single simulation step '''
self.Player.server.receive()
self.Player.world.update()
self.Player.robot.commit_motor_targets_pd()
self.Player.server.send()
def reset(self, seed=None, options=None):
'''
Reset and stabilize the robot
Note: for some behaviors it would be better to reduce stabilization or add noise
'''
super().reset(seed=seed)
if seed is not None:
np.random.seed(seed)
length1 = np.random.uniform(10, 20) # randomize target distance
length2 = np.random.uniform(10, 20) # randomize target distance
length3 = np.random.uniform(10, 20) # randomize target distance
angle1 = np.random.uniform(-30, 30) # randomize target direction
angle2 = np.random.uniform(-30, 30) # randomize initial orientation
angle3 = np.random.uniform(-30, 30) # randomize target direction
self.step_counter = 0
self.waypoint_index = 0
self.previous_action = np.zeros(len(self.Player.robot.ROBOT_MOTORS))
self.previous_pos = np.array([0.0, 0.0]) # Initialize for first step
# beam player to ground
self.Player.server.commit_beam(
pos2d=((random()-1) * 5, (random()-1) * 5), # randomize initial position
rotation=0,
)
# Wait until first valid world timestamp is available
for _ in range(7):
self.sync()
if self.Player.world.server_time is not None:
break
# Execute Neutral skill until it finishes
for _ in range(7):
finished = self.Player.skills_manager.execute("Neutral")
self.sync()
if finished:
break
# memory variables
self.initial_position = np.array(self.Player.world.global_position[:2])
self.previous_pos = self.initial_position.copy() # Critical: set to actual position
self.act = np.zeros(self.no_of_actions ,np.float32)
point1 = self.initial_position + MathOps.rotate_2d_vec(np.array([length1, 0]), angle1, is_rad=False)
point2 = point1 + MathOps.rotate_2d_vec(np.array([length2, 0]), angle2, is_rad=False)
point3 = point2 + MathOps.rotate_2d_vec(np.array([length3, 0]), angle3, is_rad=False)
self.point_list = [point1, point2, point3]
self.target_position = self.point_list[self.waypoint_index]
return self.observe(True), {}
def render(self, mode='human', close=False):
return
def close(self):
self.Player.shutdown() # close server connection and cleanup
def compute_reward(self, previous_pos, current_pos):
"""
Reward function focused on forward progress and stability
"""
# 1. Progress reward: must move toward target
distance_before = np.linalg.norm(self.target_position - previous_pos)
distance_after = np.linalg.norm(self.target_position - current_pos)
progress = distance_before - distance_after
# Heavily reward forward progress, punish backward movement
if progress > 0:
progress_reward = progress * 20.0 # Strong reward for closing distance
else:
progress_reward = progress * 30.0 # Even stronger penalty for going backward
# 2. Absolute speed reward: reward any movement toward goal
movement_magnitude = np.linalg.norm(current_pos - previous_pos)
direction_to_target = self.target_position - current_pos
if np.linalg.norm(direction_to_target) > 0.01:
direction_to_target = direction_to_target / np.linalg.norm(direction_to_target)
movement_vector = current_pos - previous_pos
# Dot product: reward movement in target direction
directional_alignment = np.dot(movement_vector, direction_to_target)
speed_reward = max(0, directional_alignment) * 10.0
else:
speed_reward = 0.0
# 3. Height maintenance: encourage upright posture
height = self.Player.world.global_position[2]
if height > 0.40:
height_reward = 0.5
elif height > 0.30:
height_reward = 0.0
else:
height_reward = -0.5
# 4. Waypoint bonuses
waypoint_bonus = 0.0
if distance_after < 0.8:
waypoint_bonus = 20.0
if self.waypoint_index < len(self.point_list) - 1:
self.waypoint_index += 1
self.target_position = self.point_list[self.waypoint_index]
else:
waypoint_bonus = 50.0 # Final waypoint
return progress_reward + speed_reward + height_reward + waypoint_bonus
def step(self, action):
r = self.Player.robot
target_joint_positions = (
self.joint_nominal_position + self.scaling_factor * action
)
target_joint_positions *= self.train_sim_flip
self.previous_action = action
for idx, target in enumerate(target_joint_positions):
r.set_motor_target_position(
r.ROBOT_MOTORS[idx], target*180/math.pi, kp=25, kd=0.6
)
self.sync() # run simulation step
self.step_counter += 1
current_pos = np.array(self.Player.world.global_position[:2], dtype=np.float32)
# Compute reward based on movement from previous step
reward = self.compute_reward(self.previous_pos, current_pos)
# Penalty for standing still or minimal movement
movement = np.linalg.norm(current_pos - self.previous_pos)
if movement < 0.005: # Less than 5mm = basically standing
reward -= 2.0
# Small action penalty to encourage efficiency
action_magnitude = np.linalg.norm(action)
reward -= action_magnitude * 0.01
# Update previous position
self.previous_pos = current_pos.copy()
# Fall detection and penalty
is_fallen = self.Player.world.global_position[2] < 0.25
if is_fallen:
reward -= 15.0
# terminal state: the robot is falling or timeout
terminated = is_fallen or self.step_counter > 500 or self.waypoint_index >= len(self.point_list)
truncated = False
return self.observe(), reward, terminated, truncated, {}
class Train(Train_Base):
def __init__(self, script) -> None:
super().__init__(script)
def train(self, args):
#--------------------------------------- Learning parameters
n_envs = 4 # Reduced from 8 to decrease CPU/network pressure during init
n_steps_per_env = 512 # RolloutBuffer is of size (n_steps_per_env * n_envs)
minibatch_size = 64 # should be a factor of (n_steps_per_env * n_envs)
total_steps = 30000000
learning_rate = 3e-4
folder_name = f'Walk_R{self.robot_type}'
model_path = f'./mujococodebase/scripts/gyms/logs/{folder_name}/'
print(f"Model path: {model_path}")
print(f"Using {n_envs} parallel environments")
#--------------------------------------- Run algorithm
def init_env(i_env):
def thunk():
return WalkEnv( self.ip , self.server_p + i_env)
return thunk
servers = Train_Server( self.server_p, self.monitor_p_1000, n_envs+1 ) #include 1 extra server for testing
# Wait for servers to start
print(f"Starting {n_envs+1} rcssservermj servers...")
print("Servers started, creating environments...")
env = SubprocVecEnv( [init_env(i) for i in range(n_envs)] )
eval_env = SubprocVecEnv( [init_env(n_envs)] )
try:
# Custom policy network architecture
policy_kwargs = dict(
net_arch=dict(
pi=[256, 256, 128], # Policy network: 3 layers
vf=[256, 256, 128] # Value network: 3 layers
),
activation_fn=__import__('torch.nn', fromlist=['ReLU']).ReLU,
)
if "model_file" in args: # retrain
model = PPO.load( args["model_file"], env=env, device="cpu", n_envs=n_envs, n_steps=n_steps_per_env, batch_size=minibatch_size, learning_rate=learning_rate )
else: # train new model
model = PPO(
"MlpPolicy",
env=env,
verbose=1,
n_steps=n_steps_per_env,
batch_size=minibatch_size,
learning_rate=learning_rate,
device="cpu",
policy_kwargs=policy_kwargs,
ent_coef=0.01, # Entropy coefficient for exploration
clip_range=0.2, # PPO clipping parameter
gae_lambda=0.95, # GAE lambda
gamma=0.99 # Discount factor
)
model_path = self.learn_model( model, total_steps, model_path, eval_env=eval_env, eval_freq=n_steps_per_env*20, save_freq=n_steps_per_env*20, backup_env_file=__file__ )
except KeyboardInterrupt:
sleep(1) # wait for child processes
print("\nctrl+c pressed, aborting...\n")
servers.kill()
return
env.close()
eval_env.close()
servers.kill()
def test(self, args):
# Uses different server and monitor ports
server = Train_Server( self.server_p-1, self.monitor_p, 1 )
env = WalkEnv( self.ip, self.server_p-1, self.monitor_p, self.robot_type, True )
model = PPO.load( args["model_file"], env=env )
try:
self.export_model( args["model_file"], args["model_file"]+".pkl", False ) # Export to pkl to create custom behavior
self.test_model( model, env, log_path=args["folder_dir"], model_path=args["folder_dir"] )
except KeyboardInterrupt:
print()
env.close()
server.kill()
if __name__ == "__main__":
from types import SimpleNamespace
# 创建默认参数
script_args = SimpleNamespace(
args=SimpleNamespace(
i='127.0.0.1', # Server IP
p=3100, # Server port
m=3200, # Monitor port
r=0, # Robot type
t='Gym', # Team name
u=1 # Uniform number
)
)
trainer = Train(script_args)
trainer.train({})