From a52cdff0134d255a1be68bb1de35f7d4de4301af Mon Sep 17 00:00:00 2001 From: xxh Date: Sat, 21 Mar 2026 08:53:31 -0400 Subject: [PATCH] add no gui, no realtime mode, and train bash script --- command.md | 14 +++++ communication/server.py | 30 +++++++-- scripts/commons/Server.py | 30 ++++++--- scripts/gyms/Walk.py | 99 +++++++++++++++++++++++------- train.sh | 126 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 262 insertions(+), 37 deletions(-) create mode 100644 command.md mode change 100644 => 100755 scripts/gyms/Walk.py create mode 100755 train.sh diff --git a/command.md b/command.md new file mode 100644 index 0000000..14ba73a --- /dev/null +++ b/command.md @@ -0,0 +1,14 @@ +训练(默认) +bash train.sh + +测试(实时+显示画面) +GYM_CPU_MODE=test GYM_CPU_TEST_MODEL=scripts/gyms/logs/Walk_R0_005/best_model.zip GYM_CPU_TEST_FOLDER=scripts/gyms/logs/Walk_R0_005/ GYM_CPU_TEST_NO_RENDER=0 GYM_CPU_TEST_NO_REALTIME=0 bash train.sh + +测试(无画面、非实时) +GYM_CPU_MODE=test GYM_CPU_TEST_NO_RENDER=1 GYM_CPU_TEST_NO_REALTIME=1 bash train.sh + +retrain(继续训练) +GYM_CPU_MODE=train GYM_CPU_TRAIN_MODEL=scripts/gyms/logs/Walk_R0_005/best_model.zip bash train.sh + +retrain+改训练超参 +GYM_CPU_MODE=train GYM_CPU_TRAIN_MODEL=scripts/gyms/logs/Walk_R0_004/best_model.zip GYM_CPU_TRAIN_LR=2e-4 GYM_CPU_TRAIN_BATCH_SIZE=256 GYM_CPU_TRAIN_EPOCHS=8 bash train.sh \ No newline at end of file diff --git a/communication/server.py b/communication/server.py index c956d0a..2720187 100644 --- a/communication/server.py +++ b/communication/server.py @@ -1,5 +1,6 @@ import logging import socket +import time from select import select from communication.world_parser import WorldParser @@ -10,15 +11,27 @@ class Server: def __init__(self, host: str, port: int, world_parser: WorldParser): self.world_parser: WorldParser = world_parser self.__host: str = host - self.__port: str = port - self.__socket: socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.__port: int = port + self.__socket: socket.socket = self._create_socket() self.__send_buff = [] self.__rcv_buffer_size = 1024 self.__rcv_buffer = bytearray(self.__rcv_buffer_size) + def _create_socket(self) -> socket.socket: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + return sock + def connect(self) -> None: logger.info("Connecting to server at %s:%d...", self.__host, self.__port) + + # Always reconnect with a fresh socket object. + try: + self.__socket.close() + except OSError: + pass + self.__socket = self._create_socket() + while True: try: self.__socket.connect((self.__host, self.__port)) @@ -27,12 +40,19 @@ class Server: logger.error( "Connection refused. Make sure the server is running and listening on {self.__host}:{self.__port}." ) + time.sleep(0.05) logger.info(f"Server connection established to {self.__host}:{self.__port}.") def shutdown(self) -> None: - self.__socket.close() - self.__socket.shutdown(socket.SHUT_RDWR) + try: + self.__socket.shutdown(socket.SHUT_RDWR) + except OSError: + pass + try: + self.__socket.close() + except OSError: + pass def send_immediate(self, msg: str) -> None: """ diff --git a/scripts/commons/Server.py b/scripts/commons/Server.py index 9e9a09a..b67b957 100644 --- a/scripts/commons/Server.py +++ b/scripts/commons/Server.py @@ -1,9 +1,10 @@ import subprocess import os +import time class Server(): - def __init__(self, first_server_p, first_monitor_p, n_servers) -> None: + def __init__(self, first_server_p, first_monitor_p, n_servers, no_render=True, no_realtime=True) -> None: try: import psutil self.check_running_servers(psutil, first_server_p, first_monitor_p, n_servers) @@ -17,21 +18,32 @@ class Server(): # makes it easier to kill test servers without affecting train servers cmd = "rcssservermj" + render_arg = "--no-render" if no_render else "" + realtime_arg = "--no-realtime" if no_realtime else "" for i in range(n_servers): port = first_server_p + i mport = first_monitor_p + i - server_cmd = f"{cmd} -c {port} -m {mport} " + server_cmd = f"{cmd} -c {port} -m {mport} {render_arg} {realtime_arg}".strip() - self.rcss_processes.append( - subprocess.Popen( - server_cmd.split(), - stdout=subprocess.DEVNULL, - stderr=subprocess.STDOUT, - start_new_session=True - ) + proc = subprocess.Popen( + server_cmd.split(), + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + start_new_session=True ) + # Avoid startup storm when launching many servers at once. + time.sleep(0.03) + + rc = proc.poll() + if rc is not None: + raise RuntimeError( + f"rcssservermj exited early (code={rc}) on server port {port}, monitor port {mport}" + ) + + self.rcss_processes.append(proc) + def check_running_servers(self, psutil, first_server_p, first_monitor_p, n_servers): ''' Check if any server is running on chosen ports ''' found = False diff --git a/scripts/gyms/Walk.py b/scripts/gyms/Walk.py old mode 100644 new mode 100755 index d5fea42..8a2cfeb --- a/scripts/gyms/Walk.py +++ b/scripts/gyms/Walk.py @@ -7,7 +7,8 @@ from random import random from random import uniform from stable_baselines3 import PPO -from stable_baselines3.common.vec_env import SubprocVecEnv +from stable_baselines3.common.monitor import Monitor +from stable_baselines3.common.vec_env import SubprocVecEnv, DummyVecEnv import gymnasium as gym from gymnasium import spaces @@ -164,6 +165,32 @@ class WalkEnv(gym.Env): ) self.start_time = time.time() + def _reconnect_server(self): + try: + self.Player.server.shutdown() + except Exception: + pass + + self.Player.server.connect() + self.Player.server.send_immediate( + f"(init {self.Player.robot.name} {self.Player.world.team_name} {self.Player.world.number})" + ) + + def _safe_receive_world_update(self, retries=1): + last_exc = None + for attempt in range(retries + 1): + try: + self.Player.server.receive() + self.Player.world.update() + return + except (ConnectionResetError, OSError) as exc: + last_exc = exc + if attempt >= retries: + raise + self._reconnect_server() + if last_exc is not None: + raise last_exc + def debug_log(self, message): print(message) try: @@ -231,8 +258,7 @@ class WalkEnv(gym.Env): def sync(self): ''' Run a single simulation step ''' - self.Player.server.receive() - self.Player.world.update() + self._safe_receive_world_update(retries=1) self.Player.robot.commit_motor_targets_pd() self.Player.server.send() if self._target_dt > 0.0: @@ -302,8 +328,7 @@ class WalkEnv(gym.Env): beam_yaw = uniform(-self.reset_beam_yaw_range_deg, self.reset_beam_yaw_range_deg) for _ in range(5): - self.Player.server.receive() - self.Player.world.update() + self._safe_receive_world_update(retries=2) self.Player.robot.commit_motor_targets_pd() self.Player.server.commit_beam(pos2d=(beam_x, beam_y), rotation=beam_yaw) self.Player.server.send() @@ -510,13 +535,14 @@ class Train(Train_Base): def train(self, args): # --------------------------------------- Learning parameters - n_envs = 20 # Reduced from 8 to decrease CPU/network pressure during init + n_envs = int(os.environ.get("GYM_CPU_N_ENVS", "20")) if n_envs < 1: raise ValueError("GYM_CPU_N_ENVS must be >= 1") - n_steps_per_env = 256 # RolloutBuffer is of size (n_steps_per_env * n_envs) - minibatch_size = 512 # should be a factor of (n_steps_per_env * n_envs) + server_warmup_sec = float(os.environ.get("GYM_CPU_SERVER_WARMUP_SEC", "3.0")) + n_steps_per_env = int(os.environ.get("GYM_CPU_TRAIN_STEPS_PER_ENV", "256")) # RolloutBuffer is of size (n_steps_per_env * n_envs) + minibatch_size = int(os.environ.get("GYM_CPU_TRAIN_BATCH_SIZE", "512")) # should be a factor of (n_steps_per_env * n_envs) total_steps = 30000000 - learning_rate = 1e-4 + learning_rate = float(os.environ.get("GYM_CPU_TRAIN_LR", "3e-4")) folder_name = f'Walk_R{self.robot_type}' model_path = f'./scripts/gyms/logs/{folder_name}/' @@ -524,22 +550,29 @@ class Train(Train_Base): print(f"Using {n_envs} parallel environments") # --------------------------------------- Run algorithm - def init_env(i_env): + def init_env(i_env, monitor=False): def thunk(): - return WalkEnv(self.ip, self.server_p + i_env) + env = WalkEnv(self.ip, self.server_p + i_env) + if monitor: + env = Monitor(env) + return env return thunk server_log_dir = os.path.join(model_path, "server_logs") os.makedirs(server_log_dir, exist_ok=True) - servers = Train_Server(self.server_p, self.monitor_p_1000, n_envs + 1) # include 1 extra server for testing + servers = Train_Server(self.server_p, self.monitor_p_1000, n_envs + 1, no_render=True, no_realtime=True) # include 1 extra server for testing # Wait for servers to start print(f"Starting {n_envs + 1} rcssservermj servers...") + if server_warmup_sec > 0: + print(f"Waiting {server_warmup_sec:.1f}s for server warmup...") + sleep(server_warmup_sec) print("Servers started, creating environments...") - env = SubprocVecEnv([init_env(i) for i in range(n_envs)]) - eval_env = SubprocVecEnv([init_env(n_envs)]) + env = SubprocVecEnv([init_env(i, monitor=True) for i in range(n_envs)]) + # Use single-process eval env to avoid extra subprocess fragility during callback evaluation. + eval_env = DummyVecEnv([init_env(n_envs, monitor=True)]) try: # Custom policy network architecture @@ -564,16 +597,17 @@ class Train(Train_Base): learning_rate=learning_rate, device="cpu", policy_kwargs=policy_kwargs, - ent_coef=0.03, # Entropy coefficient for exploration - clip_range=0.13, # PPO clipping parameter + ent_coef=float(os.environ.get("GYM_CPU_TRAIN_ENT_COEF", "0.05")), # Entropy coefficient for exploration + clip_range=float(os.environ.get("GYM_CPU_TRAIN_CLIP_RANGE", "0.2")), # PPO clipping parameter gae_lambda=0.95, # GAE lambda - gamma=0.95 , # Discount factor + gamma=float(os.environ.get("GYM_CPU_TRAIN_GAMMA", "0.95")), # Discount factor target_kl=0.03, - n_epochs=5 + n_epochs=int(os.environ.get("GYM_CPU_TRAIN_EPOCHS", "5")), + # tensorboard_log=f"./scripts/gyms/logs/{folder_name}/tensorboard/" ) model_path = self.learn_model(model, total_steps, model_path, eval_env=eval_env, - eval_freq=n_steps_per_env * 10, save_freq=n_steps_per_env * 10, + eval_freq=n_steps_per_env * 20, save_freq=n_steps_per_env * 20, backup_env_file=__file__) except KeyboardInterrupt: sleep(1) # wait for child processes @@ -590,7 +624,16 @@ class Train(Train_Base): # Uses different server and monitor ports server_log_dir = os.path.join(args["folder_dir"], "server_logs") os.makedirs(server_log_dir, exist_ok=True) - server = Train_Server(self.server_p - 1, self.monitor_p, 1) + test_no_render = os.environ.get("GYM_CPU_TEST_NO_RENDER", "0") == "1" + test_no_realtime = os.environ.get("GYM_CPU_TEST_NO_REALTIME", "0") == "1" + + server = Train_Server( + self.server_p - 1, + self.monitor_p, + 1, + no_render=test_no_render, + no_realtime=test_no_realtime, + ) env = WalkEnv(self.ip, self.server_p - 1) model = PPO.load(args["model_file"], env=env) @@ -621,6 +664,16 @@ if __name__ == "__main__": ) trainer = Train(script_args) - trainer.train({"model_file": "scripts/gyms/logs/Walk_R0_004/best_model.zip"}) - # trainer.test({"model_file": "scripts/gyms/logs/Walk_R0_004/best_model.zip", - # "folder_dir": "scripts/gyms/logs/Walk_R0_004/",}) \ No newline at end of file + + run_mode = os.environ.get("GYM_CPU_MODE", "train").strip().lower() + + if run_mode == "test": + test_model_file = os.environ.get("GYM_CPU_TEST_MODEL", "scripts/gyms/logs/Walk_R0_004/best_model.zip") + test_folder = os.environ.get("GYM_CPU_TEST_FOLDER", "scripts/gyms/logs/Walk_R0_004/") + trainer.test({"model_file": test_model_file, "folder_dir": test_folder}) + else: + retrain_model = os.environ.get("GYM_CPU_TRAIN_MODEL", "").strip() + if retrain_model: + trainer.train({"model_file": retrain_model}) + else: + trainer.train({}) \ No newline at end of file diff --git a/train.sh b/train.sh new file mode 100755 index 0000000..a4b910a --- /dev/null +++ b/train.sh @@ -0,0 +1,126 @@ +#!/usr/bin/env bash +set -euo pipefail + +# ------------------------------ +# 资源限制配置(cgroup v2 + systemd-run) +# ------------------------------ +# 说明: +# 1) 这个脚本会把训练进程放进一个临时的 systemd scope 中,并施加 CPU/内存上限。 +# 2) 仅限制“本次训练进程”,不会永久改系统配置。 +# 3) 下面变量都支持“环境变量覆盖”,即你可以在命令前临时指定。 +# +# CPU 核数基准(默认 20): +# 例如你的机器按 20 核预算来算,可保持默认。 +CORES="${CORES:-20}" +# CPU 占用百分比(默认 95): +# 最终会与 CORES 相乘得到 CPUQuota。 +# 例:CORES=20, UTIL_PERCENT=95 -> CPUQuota=1900%(约 19 核等效) +UTIL_PERCENT="${UTIL_PERCENT:-95}" +CPU_QUOTA="$((CORES * UTIL_PERCENT))%" + +# 内存上限(默认 28G): +# 可改成 16G、24G 等,避免训练把系统内存吃满。 +MEMORY_MAX="${MEMORY_MAX:-28G}" + +# ------------------------------ +# 训练运行参数(由 scripts/gyms/Walk.py 读取) +# ------------------------------ +# 运行模式:train 或 test +GYM_CPU_MODE="${GYM_CPU_MODE:-train}" + +# 并行环境数量:越大通常吞吐越高,但也更容易触发服务器连接不稳定。 +# 建议从 8~12 起步,稳定后再升到 16/20。 +GYM_CPU_N_ENVS="${GYM_CPU_N_ENVS:-20}" +# 服务器预热时间(秒): +# 在批量拉起 rcssserver 后等待一段时间,再创建 SubprocVecEnv, +# 可降低 ConnectionReset/EOFError 概率。 +GYM_CPU_SERVER_WARMUP_SEC="${GYM_CPU_SERVER_WARMUP_SEC:-10}" + +# 训练专用参数 +GYM_CPU_TRAIN_STEPS_PER_ENV="${GYM_CPU_TRAIN_STEPS_PER_ENV:-256}" +GYM_CPU_TRAIN_BATCH_SIZE="${GYM_CPU_TRAIN_BATCH_SIZE:-512}" +GYM_CPU_TRAIN_LR="${GYM_CPU_TRAIN_LR:-1e-4}" +GYM_CPU_TRAIN_ENT_COEF="${GYM_CPU_TRAIN_ENT_COEF:-0.03}" +GYM_CPU_TRAIN_CLIP_RANGE="${GYM_CPU_TRAIN_CLIP_RANGE:-0.13}" +GYM_CPU_TRAIN_GAMMA="${GYM_CPU_TRAIN_GAMMA:-0.95}" +GYM_CPU_TRAIN_EPOCHS="${GYM_CPU_TRAIN_EPOCHS:-5}" +GYM_CPU_TRAIN_MODEL="${GYM_CPU_TRAIN_MODEL:-}" + +# 测试专用参数 +GYM_CPU_TEST_MODEL="${GYM_CPU_TEST_MODEL:-scripts/gyms/logs/Walk_R0_004/best_model.zip}" +GYM_CPU_TEST_FOLDER="${GYM_CPU_TEST_FOLDER:-scripts/gyms/logs/Walk_R0_004/}" +# 测试默认实时且显示画面:默认均为 0 +# 设为 1 表示关闭对应能力 +GYM_CPU_TEST_NO_RENDER="${GYM_CPU_TEST_NO_RENDER:-0}" +GYM_CPU_TEST_NO_REALTIME="${GYM_CPU_TEST_NO_REALTIME:-0}" + +# Python 解释器选择策略: +# 1) 优先使用你手动传入的 PYTHON_BIN +# 2) 其次用当前激活 conda 环境(CONDA_PREFIX/bin/python) +# 3) 再回退到默认 mujoco 环境路径 +# 4) 最后尝试系统 python / python3 +DEFAULT_PYTHON="/home/solren/Downloads/Anaconda/envs/mujoco/bin/python" +CONDA_PYTHON="${CONDA_PREFIX:-}/bin/python" + +# 安全保护:不要用 sudo 运行。 +# 原因:sudo 可能导致 conda 环境与用户会话环境不一致, +# 会引发 python 路径丢失、systemd --user 会话不可见等问题。 +if [[ "${EUID}" -eq 0 ]]; then + echo "Do not run this script with sudo; run as your normal user in conda env 'mujoco'." + exit 1 +fi + +# 解析最终使用的 Python 可执行文件。 +if [[ -n "${PYTHON_BIN:-}" ]]; then + PYTHON_EXEC="${PYTHON_BIN}" +elif [[ -n "${CONDA_PREFIX:-}" && -x "${CONDA_PYTHON}" ]]; then + PYTHON_EXEC="${CONDA_PYTHON}" +elif [[ -x "${DEFAULT_PYTHON}" ]]; then + PYTHON_EXEC="${DEFAULT_PYTHON}" +elif command -v python >/dev/null 2>&1; then + PYTHON_EXEC="$(command -v python)" +elif command -v python3 >/dev/null 2>&1; then + PYTHON_EXEC="$(command -v python3)" +else + echo "No Python executable found. Set PYTHON_BIN=/abs/path/to/python and retry." + exit 1 +fi + +# 脚本所在目录(绝对路径),便于后续定位模块/相对路径。 +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" + +# 打印当前生效配置,方便排障和复现实验。 +echo "Starting training with limits: CPU=${CPU_QUOTA}, Memory=${MEMORY_MAX}" +echo "Mode: ${GYM_CPU_MODE}" +echo "Runtime knobs: GYM_CPU_N_ENVS=${GYM_CPU_N_ENVS}, GYM_CPU_SERVER_WARMUP_SEC=${GYM_CPU_SERVER_WARMUP_SEC}" +echo "Using Python: ${PYTHON_EXEC}" +if [[ -n "${CONDA_DEFAULT_ENV:-}" ]]; then + echo "Detected conda env: ${CONDA_DEFAULT_ENV}" +fi + +# 使用 systemd-run --user --scope 启动“受限资源”的训练进程: +# - CPUQuota: 总 CPU 配额 +# - MemoryMax: 最大内存 +# - env ... : 显式传递训练参数到 Python 进程 +# - python -m scripts.gyms.Walk: 以模块方式启动训练入口 +systemd-run --user --scope \ + -p CPUQuota="${CPU_QUOTA}" \ + -p MemoryMax="${MEMORY_MAX}" \ + env \ + GYM_CPU_MODE="${GYM_CPU_MODE}" \ + GYM_CPU_N_ENVS="${GYM_CPU_N_ENVS}" \ + GYM_CPU_SERVER_WARMUP_SEC="${GYM_CPU_SERVER_WARMUP_SEC}" \ + GYM_CPU_TRAIN_STEPS_PER_ENV="${GYM_CPU_TRAIN_STEPS_PER_ENV}" \ + GYM_CPU_TRAIN_BATCH_SIZE="${GYM_CPU_TRAIN_BATCH_SIZE}" \ + GYM_CPU_TRAIN_LR="${GYM_CPU_TRAIN_LR}" \ + GYM_CPU_TRAIN_ENT_COEF="${GYM_CPU_TRAIN_ENT_COEF}" \ + GYM_CPU_TRAIN_CLIP_RANGE="${GYM_CPU_TRAIN_CLIP_RANGE}" \ + GYM_CPU_TRAIN_GAMMA="${GYM_CPU_TRAIN_GAMMA}" \ + GYM_CPU_TRAIN_EPOCHS="${GYM_CPU_TRAIN_EPOCHS}" \ + GYM_CPU_TRAIN_MODEL="${GYM_CPU_TRAIN_MODEL}" \ + GYM_CPU_TEST_MODEL="${GYM_CPU_TEST_MODEL}" \ + GYM_CPU_TEST_FOLDER="${GYM_CPU_TEST_FOLDER}" \ + GYM_CPU_TEST_NO_RENDER="${GYM_CPU_TEST_NO_RENDER}" \ + GYM_CPU_TEST_NO_REALTIME="${GYM_CPU_TEST_NO_REALTIME}" \ + "${PYTHON_EXEC}" "-m" "scripts.gyms.Walk" +