Add .gitignore and amend communication

This commit is contained in:
徐学颢
2026-03-12 20:12:00 +08:00
parent 1f4ab43c3c
commit 02afa3c1fc
3 changed files with 82 additions and 6 deletions

12
.gitignore vendored Normal file
View File

@@ -0,0 +1,12 @@
.venv/
.vscode/
**/__pycache__/
poetry.lock
poetry.toml
**/*.log
**/*.txt
**/build/
**/install/
**/log/
*.spec
dist/

View File

@@ -50,10 +50,13 @@ class Server:
"""
Send all committed messages
"""
if len(select([self.__socket], [], [], 0.0)[0]) == 0:
if not self.__send_buff:
return
if len(select([self.__socket], [], [], 0.0)[0]) != 0:
logger.debug("Socket is readable while sending; keeping full-duplex command send.")
self.send_immediate(("".join(self.__send_buff)))
else:
logger.info("Server_Comm.py: Received a new packet while thinking!")
self.__send_buff = []
def commit(self, msg: str) -> None:

View File

@@ -1,4 +1,5 @@
import logging
import os
import re
import numpy as np
from scipy.spatial.transform import Rotation as R
@@ -7,6 +8,16 @@ from utils.math_ops import MathOps
from world.commons.play_mode import PlayModeEnum
logger = logging.getLogger()
DEBUG_LOG_FILE = os.path.join(os.path.dirname(os.path.dirname(__file__)), "comm_debug.log")
def _debug_log(message: str) -> None:
print(message)
try:
with open(DEBUG_LOG_FILE, "a", encoding="utf-8") as f:
f.write(message + "\n")
except OSError:
pass
class WorldParser:
@@ -14,6 +25,36 @@ class WorldParser:
from agent.base_agent import Base_Agent # type hinting
self.agent: Base_Agent = agent
self._hj_debug_prints = 0
def _normalize_motor_name(self, motor_name: str) -> str:
alias_map = {
"q_hj1": "he1",
"q_hj2": "he2",
"q_laj1": "lae1",
"q_laj2": "lae2",
"q_laj3": "lae3",
"q_laj4": "lae4",
"q_raj1": "rae1",
"q_raj2": "rae2",
"q_raj3": "rae3",
"q_raj4": "rae4",
"q_wj1": "te1",
"q_tj1": "te1",
"q_llj1": "lle1",
"q_llj2": "lle2",
"q_llj3": "lle3",
"q_llj4": "lle4",
"q_llj5": "lle5",
"q_llj6": "lle6",
"q_rlj1": "rle1",
"q_rlj2": "rle2",
"q_rlj3": "rle3",
"q_rlj4": "rle4",
"q_rlj5": "rle5",
"q_rlj6": "rle6",
}
return alias_map.get(motor_name, motor_name)
def parse(self, message: str) -> None:
perception_dict: dict = self.__sexpression_to_dict(message)
@@ -51,9 +92,29 @@ class WorldParser:
robot = self.agent.robot
robot.motor_positions = {h["n"]: h["ax"] for h in perception_dict["HJ"]}
hj_states = perception_dict["HJ"] if isinstance(perception_dict["HJ"], list) else [perception_dict["HJ"]]
robot.motor_speeds = {h["n"]: h["vx"] for h in perception_dict["HJ"]}
if self._hj_debug_prints < 5:
names = [joint_state.get("n", "<missing>") for joint_state in hj_states]
normalized_names = [self._normalize_motor_name(name) for name in names]
matched_names = [name for name in names if name in robot.motor_positions]
matched_normalized_names = [name for name in normalized_names if name in robot.motor_positions]
# _debug_log(
# "[ParserDebug] "
# f"hj_count={len(hj_states)} "
# f"sample_names={names[:8]} "
# f"normalized_sample={normalized_names[:8]} "
# f"matched={len(matched_names)}/{len(names)} "
# f"matched_normalized={len(matched_normalized_names)}/{len(normalized_names)}"
# )
self._hj_debug_prints += 1
for joint_state in hj_states:
motor_name = self._normalize_motor_name(joint_state["n"])
if motor_name in robot.motor_positions:
robot.motor_positions[motor_name] = joint_state["ax"]
if motor_name in robot.motor_speeds:
robot.motor_speeds[motor_name] = joint_state["vx"]
world._global_cheat_position = np.array(perception_dict["pos"]["p"])