commit 571b4283c7abc999a83695c239eb3abcb0dd6282 Author: ChenXi Date: Sat Mar 14 21:31:00 2026 -0400 Init diff --git a/agent/__pycache__/agent.cpython-311.pyc b/agent/__pycache__/agent.cpython-311.pyc new file mode 100644 index 0000000..dc0ab33 Binary files /dev/null and b/agent/__pycache__/agent.cpython-311.pyc differ diff --git a/agent/__pycache__/base_agent.cpython-311.pyc b/agent/__pycache__/base_agent.cpython-311.pyc new file mode 100644 index 0000000..194ed9f Binary files /dev/null and b/agent/__pycache__/base_agent.cpython-311.pyc differ diff --git a/agent/agent.py b/agent/agent.py new file mode 100644 index 0000000..7e88752 --- /dev/null +++ b/agent/agent.py @@ -0,0 +1,135 @@ +from dataclasses import Field +import logging +from typing import Mapping + +import numpy as np +from utils.math_ops import MathOps +from world.commons.field import FIFAField, HLAdultField +from world.commons.play_mode import PlayModeEnum, PlayModeGroupEnum + + +logger = logging.getLogger() + + +class Agent: + """ + Responsible for deciding what the agent should do at each moment. + + This class is called every simulation step to update the agent's behavior + based on the current state of the world and game conditions. + """ + + BEAM_POSES: Mapping[type[Field], Mapping[int, tuple[float, float, float]]] ={ + FIFAField: { + 1: (2.1, 0, 0), + 2: (22.0, 12.0, 0), + 3: (22.0, 4.0, 0), + 4: (22.0, -4.0, 0), + 5: (22.0, -12.0, 0), + 6: (15.0, 0.0, 0), + 7: (4.0, 16.0, 0), + 8: (11.0, 6.0, 0), + 9: (11.0, -6.0, 0), + 10: (4.0, -16.0, 0), + 11: (7.0, 0.0, 0), + }, + HLAdultField: { + 1: (7.0, 0.0, 0), + 2: (2.0, -1.5, 0), + 3: (2.0, 1.5, 0), + } + } + + def __init__(self, agent): + """ + Creates a new DecisionMaker linked to the given agent. + + Args: + agent: The main agent that owns this DecisionMaker. + """ + from agent.base_agent import Base_Agent # type hinting + + self.agent: Base_Agent = agent + self.is_getting_up: bool = False + + def update_current_behavior(self) -> None: + """ + Chooses what the agent should do in the current step. + + This function checks the game state and decides which behavior + or skill should be executed next. + """ + + if self.agent.world.playmode is PlayModeEnum.GAME_OVER: + return + + if self.agent.world.playmode_group in ( + PlayModeGroupEnum.ACTIVE_BEAM, + PlayModeGroupEnum.PASSIVE_BEAM, + ): + self.agent.server.commit_beam( + pos2d=self.BEAM_POSES[type(self.agent.world.field)][self.agent.world.number][:2], + rotation=self.BEAM_POSES[type(self.agent.world.field)][self.agent.world.number][2], + ) + + if self.is_getting_up or self.agent.skills_manager.is_ready(skill_name="GetUp"): + self.is_getting_up = not self.agent.skills_manager.execute(skill_name="GetUp") + + elif self.agent.world.playmode is PlayModeEnum.PLAY_ON: + self.carry_ball() + elif self.agent.world.playmode in (PlayModeEnum.BEFORE_KICK_OFF, PlayModeEnum.THEIR_GOAL, PlayModeEnum.OUR_GOAL): + self.agent.skills_manager.execute("Neutral") + else: + self.carry_ball() + + self.agent.robot.commit_motor_targets_pd() + + def carry_ball(self): + """ + Basic example of a behavior: moves the robot toward the goal while handling the ball. + """ + their_goal_pos = self.agent.world.field.get_their_goal_position()[:2] + ball_pos = self.agent.world.ball_pos[:2] + my_pos = self.agent.world.global_position[:2] + + ball_to_goal = their_goal_pos - ball_pos + bg_norm = np.linalg.norm(ball_to_goal) + if bg_norm == 0: + return + ball_to_goal_dir = ball_to_goal / bg_norm + + dist_from_ball_to_start_carrying = 0.30 + carry_ball_pos = ball_pos - ball_to_goal_dir * dist_from_ball_to_start_carrying + + my_to_ball = ball_pos - my_pos + my_to_ball_norm = np.linalg.norm(my_to_ball) + if my_to_ball_norm == 0: + my_to_ball_dir = np.zeros(2) + else: + my_to_ball_dir = my_to_ball / my_to_ball_norm + + cosang = np.dot(my_to_ball_dir, ball_to_goal_dir) + cosang = np.clip(cosang, -1.0, 1.0) + angle_diff = np.arccos(cosang) + + ANGLE_TOL = np.deg2rad(7.5) + aligned = (my_to_ball_norm > 1e-6) and (angle_diff <= ANGLE_TOL) + + behind_ball = np.dot(my_pos - ball_pos, ball_to_goal_dir) < 0 + desired_orientation = MathOps.vector_angle(ball_to_goal) + + if not aligned or not behind_ball: + self.agent.skills_manager.execute( + "Walk", + target_2d=carry_ball_pos, + is_target_absolute=True, + orientation=None if np.linalg.norm(my_pos - carry_ball_pos) > 2 else desired_orientation + ) + else: + self.agent.skills_manager.execute( + "Walk", + target_2d=their_goal_pos, + is_target_absolute=True, + orientation=desired_orientation + ) + diff --git a/agent/base_agent.py b/agent/base_agent.py new file mode 100644 index 0000000..b9ae3c9 --- /dev/null +++ b/agent/base_agent.py @@ -0,0 +1,81 @@ +import logging + +from agent.agent import Agent +from world.robot import T1, Robot +from behaviors.behavior_manager import BehaviorManager +from world.world import World +from communication.server import Server +from communication.world_parser import WorldParser + +logger = logging.getLogger(__file__) + + +class Base_Agent: + def __init__( + self, + team_name: str = "Default", + number: int = 1, + host: str = "localhost", + port: int = 60000, + field: str = 'fifa' + ): + """ + Initializes the agent and all its main components. + + Args: + team_name (str): The name of the team the agent belongs to. + number (int): The player number assigned to this agent. + host (str): The host address of the simulator server. + port (int): The port number of the simulator server. + field (str): The name of the field configuration to use. + """ + + self.world: World = World(agent=self, team_name=team_name, number=number, field_name=field) + self.world_parser: WorldParser = WorldParser(agent=self) + self.server: Server = Server( + host=host, port=port, world_parser=self.world_parser + ) + self.robot: Robot = T1(agent=self) + self.skills_manager: BehaviorManager = BehaviorManager(agent=self) + self.decision_maker: Agent = Agent(agent=self) + + def run(self): + """ + Starts the agent’s main control loop. + + This method: + 1. Connects to the simulator server. + 2. Sends the initial configuration (init message). + 3. Enters the main loop, where it: + - Receives and parses world updates. + - Updates internal world representation. + - Executes the decision-making process. + - Sends the next set of commands to the server. + """ + self.server.connect() + + self.server.send_immediate( + f"(init {self.robot.name} {self.world.team_name} {self.world.number})" + ) + + while True: + try: + self.server.receive() + + self.world.update() + + self.decision_maker.update_current_behavior() + + self.server.send() + except Exception: + self.shutdown() + raise + + def shutdown(self): + """ + Safely shuts down the agent. + + Logs a shutdown message and closes the server connection. + """ + logger.info("Shutting down.") + self.server.shutdown() \ No newline at end of file diff --git a/behaviors/__pycache__/behavior.cpython-311.pyc b/behaviors/__pycache__/behavior.cpython-311.pyc new file mode 100644 index 0000000..3f68cb8 Binary files /dev/null and b/behaviors/__pycache__/behavior.cpython-311.pyc differ diff --git a/behaviors/__pycache__/behavior_manager.cpython-311.pyc b/behaviors/__pycache__/behavior_manager.cpython-311.pyc new file mode 100644 index 0000000..fdb195d Binary files /dev/null and b/behaviors/__pycache__/behavior_manager.cpython-311.pyc differ diff --git a/behaviors/behavior.py b/behaviors/behavior.py new file mode 100644 index 0000000..e7ea2e6 --- /dev/null +++ b/behaviors/behavior.py @@ -0,0 +1,45 @@ +from abc import ABC, abstractmethod + + +class Behavior(ABC): + """ + Base interface for skills. + Each skill must implement: + - execute(reset: bool, *args) -> bool + - is_ready(*args) -> bool + """ + + def __init__(self, agent): + from agent.base_agent import Base_Agent # type hinting + + self.agent: Base_Agent = agent + + @abstractmethod + def execute(self, reset: bool, *args, **kwargs) -> bool: + """ + Executes one step of the skill. + + Parameters + ---------- + reset : bool + Indicates if this is the first invocation of this skill (should reset internal state). + *args + Skill-specific arguments. + + Returns + ------- + finished : bool + True if the skill has finished execution. + """ + raise NotImplementedError("Method 'execute' must be implemented in the Skill.") + + @abstractmethod + def is_ready(self, *args) -> bool: + """ + Checks if the current conditions allow starting the skill. + + Returns + ------- + ready : bool + """ + raise NotImplementedError("Method 'is_ready' must be implemented in the Skill.") diff --git a/behaviors/behavior_manager.py b/behaviors/behavior_manager.py new file mode 100644 index 0000000..c54f17c --- /dev/null +++ b/behaviors/behavior_manager.py @@ -0,0 +1,79 @@ +from behaviors.custom.keyframe.get_up.get_up import GetUp +from behaviors.custom.keyframe.keyframe import KeyframeSkill +from behaviors.custom.keyframe.poses.neutral.neutral import Neutral +from behaviors.behavior import Behavior +from behaviors.custom.reinforcement.walk.walk import Walk + + +class BehaviorManager: + + def __init__(self, agent) -> None: + self.current_skill_name: str | None = None + self.previous_skill_name: str | None = None + self.current_sub_skill_name: str | None = None + self.skills: dict[str, Behavior] = {} + self.agent = agent + + self.create_skills() + + def create_skills(self): + """ + Loads all Skill classes and instantiates them. + Each skill is indexed by its class name. + """ + + classes: list[type[Behavior]] = [Walk, Neutral, GetUp] + + # instantiate each Skill and store in the skills dictionary + self.skills = {cls.__name__: cls(agent=self.agent) for cls in classes} + + def get_skill_object(self, name: str) -> Behavior: + """Returns the Skill instance corresponding to the given name.""" + if name not in self.skills: + raise KeyError(f"No skill found with the name '{name}'") + return self.skills[name] + + def execute(self, skill_name: str, *args, **kwargs) -> bool: + """ + Executes one step of the specified skill. + + - On the first call for a skill, `reset=True` is sent. + - Returns True when the skill has finished execution. + """ + skill = self.get_skill_object(skill_name) + + # detect if the skill has changed to trigger automatic reset + reset = self.current_skill_name != skill_name + if reset: + # previous skill was interrupted + if self.current_skill_name is not None: + self.previous_skill_name = self.current_skill_name + self.current_skill_name = skill_name + + # call the Skill's own execute method + finished = skill.execute(reset, *args, **kwargs) + if not finished: + return False + + # skill finished execution + self.previous_skill_name = self.current_skill_name + self.current_skill_name = None + return True + + def execute_sub_skill(self, skill_name: str, reset: bool, *args, **kwargs) -> bool: + """ + Executes a step of a sub-skill within another skill. + + - Does not change `current_skill_name`. + - `current_sub_skill_name` reflects the sub-skill currently being executed. + """ + skill = self.get_skill_object(skill_name) + self.current_sub_skill_name = skill_name + return skill.execute(reset, *args, **kwargs) + + def is_ready(self, skill_name: str, *args, **kwargs) -> bool: + """ + Checks if the specified skill is ready to start based on current conditions. + """ + skill = self.get_skill_object(skill_name) + return skill.is_ready(*args, **kwargs) diff --git a/behaviors/custom/keyframe/__pycache__/keyframe.cpython-311.pyc b/behaviors/custom/keyframe/__pycache__/keyframe.cpython-311.pyc new file mode 100644 index 0000000..6cd2ee0 Binary files /dev/null and b/behaviors/custom/keyframe/__pycache__/keyframe.cpython-311.pyc differ diff --git a/behaviors/custom/keyframe/get_up/__pycache__/get_up.cpython-311.pyc b/behaviors/custom/keyframe/get_up/__pycache__/get_up.cpython-311.pyc new file mode 100644 index 0000000..297b19c Binary files /dev/null and b/behaviors/custom/keyframe/get_up/__pycache__/get_up.cpython-311.pyc differ diff --git a/behaviors/custom/keyframe/get_up/get_up.py b/behaviors/custom/keyframe/get_up/get_up.py new file mode 100644 index 0000000..c0be1ef --- /dev/null +++ b/behaviors/custom/keyframe/get_up/get_up.py @@ -0,0 +1,66 @@ +from collections import deque +import logging +import os + +from behaviors.behavior import Behavior +from behaviors.custom.keyframe.keyframe import KeyframeSkill + +logger = logging.getLogger() + +class GetUp(Behavior): + STABILITY_THRESHOLD_CYCLES: int = 3 + NEUTRAL_EXECUTION_TIME: float = 1.5 + def __init__(self, agent): + super().__init__(agent) + self.get_up_front = KeyframeSkill( + agent=agent, + file=os.path.join(os.path.dirname(__file__), "get_up_front.yaml"), + ) + self.get_up_back = KeyframeSkill( + agent=agent, + file=os.path.join(os.path.dirname(__file__), "get_up_back.yaml"), + ) + + def execute(self, reset, *args, **kwargs): + + robot = self.agent.robot + + if reset: + self.neutral_start_time = None + self.has_get_up = False + self.gyro_queue = deque(maxlen=self.STABILITY_THRESHOLD_CYCLES) + self.state = 0 + self.chosen_get_up = None + self.should_reset_get_up = True + + if not self.has_get_up: + if not self.chosen_get_up and self.agent.skills_manager.execute_sub_skill("Neutral", True): + self.gyro_queue.append(max(abs(robot.gyroscope))) + if len(self.gyro_queue) == self.STABILITY_THRESHOLD_CYCLES and all(g < 2.5 for g in self.gyro_queue): + if abs(robot.accelerometer[1]) < 2 and abs(robot.accelerometer[2]) < 3: + if robot.accelerometer[0] < -8: + self.chosen_get_up = self.get_up_front + elif robot.accelerometer[0] > 8: + self.chosen_get_up = self.get_up_back + if self.chosen_get_up: + self.has_get_up = self.chosen_get_up.execute(reset=self.should_reset_get_up) + self.should_reset_get_up = False + else: + if not self.neutral_start_time: + self.neutral_start_time = self.agent.world.server_time + + neutral_elapsed_time = ( + self.agent.world.server_time - self.neutral_start_time + ) + + if neutral_elapsed_time < self.NEUTRAL_EXECUTION_TIME: + self.agent.skills_manager.execute_sub_skill( + "Walk", reset=neutral_elapsed_time <= 1e-6, target_2d=(0, 0), is_target_absolute=False + ) + else: + return True + + return False + + def is_ready(self, *args): + return self.agent.world.is_fallen() diff --git a/behaviors/custom/keyframe/get_up/get_up_back.yaml b/behaviors/custom/keyframe/get_up/get_up_back.yaml new file mode 100644 index 0000000..5eaae15 --- /dev/null +++ b/behaviors/custom/keyframe/get_up/get_up_back.yaml @@ -0,0 +1,94 @@ +symmetry: true +kp: 75 +kd: 1 +keyframes: + + - delta: 1 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: -114.592 + Shoulder_Roll: -71.619 + Elbow_Pitch: -85.944 + Elbow_Yaw: -171.887 + Waist: 0.000 + Hip_Pitch: -45.836 + Hip_Roll: 179.908 + Hip_Yaw: 179.908 + Knee_Pitch: -179.908 + Ankle_Pitch: 89.954 + Ankle_Roll: 89.954 + kp: 250 + kd: 1 + p_gains: + Hip_Pitch: 20 + Hip_Roll: 20 + Hip_Yaw: 20 + Knee_Pitch: 20 + d_gains: + Hip_Pitch: 1 + Hip_Roll: 1 + Hip_Yaw: 1 + Knee_Pitch: 1 + + - delta: 0.8125 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: -114.592 + Shoulder_Roll: -89.954 + Elbow_Pitch: -85.944 + Elbow_Yaw: 0.000 + Waist: 0.000 + Hip_Pitch: -45.836 + Hip_Roll: 179.908 + Hip_Yaw: 179.908 + Knee_Pitch: -179.908 + Ankle_Pitch: 63.026 + Ankle_Roll: 45.836 + p_gains: + Hip_Pitch: 25 + Hip_Roll: 25 + Hip_Yaw: 25 + Knee_Pitch: 25 + + + + - delta: 0.6 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: 89.954 + Shoulder_Roll: -89.954 + Elbow_Pitch: 0.000 + Elbow_Yaw: 0.000 + Waist: 0.000 + Hip_Pitch: 20.054 + Hip_Roll: 11.459 + Hip_Yaw: 0.000 + Knee_Pitch: -42.971 + Ankle_Pitch: 57.296 + Ankle_Roll: 0.000 + kp: 40 + kd: 4 + p_gains: + Ankle_Pitch: 100 + Ankle_Roll: 100 + + - delta: 0.25 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: 0.000 + Shoulder_Roll: -77.363 + Elbow_Pitch: -85.944 + Elbow_Yaw: 0.000 + Waist: 0.000 + Hip_Pitch: 20.054 + Hip_Roll: 7.162 + Hip_Yaw: 0.000 + Knee_Pitch: -42.971 + Ankle_Pitch: 0.000 + Ankle_Roll: 0.000 + kp: 200 + kd: 3.5 \ No newline at end of file diff --git a/behaviors/custom/keyframe/get_up/get_up_front.yaml b/behaviors/custom/keyframe/get_up/get_up_front.yaml new file mode 100644 index 0000000..363a28a --- /dev/null +++ b/behaviors/custom/keyframe/get_up/get_up_front.yaml @@ -0,0 +1,91 @@ +symmetry: true +kp: 75 +kd: 1 +keyframes: + - delta: 0.1 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: -89.954 + Shoulder_Roll: -71.619 + Elbow_Pitch: 0.000 + Elbow_Yaw: -114.592 + Waist: 0.000 + Hip_Pitch: 0.000 + Hip_Roll: 14.324 + Hip_Yaw: 0.000 + Knee_Pitch: 0.000 + Ankle_Pitch: 0.000 + Ankle_Roll: 0.000 + + - delta: 0.5 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: 57.296 + Shoulder_Roll: -71.619 + Elbow_Pitch: -85.944 + Elbow_Yaw: -171.887 + Waist: 0.000 + Hip_Pitch: 0.000 + Hip_Roll: 14.324 + Hip_Yaw: 0.000 + Knee_Pitch: -28.648 + Ankle_Pitch: 0.000 + Ankle_Roll: 0.000 + + + - delta: 0.8125 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: 89.954 + Shoulder_Roll: -89.954 + Elbow_Pitch: -85.944 + Elbow_Yaw: 0.000 + Waist: 0.000 + Hip_Pitch: 200.536 + Hip_Roll: 143.239 + Hip_Yaw: 114.592 + Knee_Pitch: -171.887 + Ankle_Pitch: 89.954 + Ankle_Roll: 24.919 + + - delta: 0.45 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: 89.954 + Shoulder_Roll: -89.954 + Elbow_Pitch: -85.944 + Elbow_Yaw: 0.000 + Waist: 0.000 + Hip_Pitch: 20.054 + Hip_Roll: 11.459 + Hip_Yaw: 0.000 + Knee_Pitch: -42.972 + Ankle_Pitch: 0.000 + Ankle_Roll: 0.000 + kp: 50 + kd: 2 + p_gains: + Ankle_Pitch: 50 + Ankle_Roll: 50 + + - delta: 0.13 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: 0.000 + Shoulder_Roll: -77.363 + Elbow_Pitch: -85.944 + Elbow_Yaw: 0.000 + Waist: 0.000 + Hip_Pitch: 20.054 + Hip_Roll: 7.162 + Hip_Yaw: 0.000 + Knee_Pitch: -42.972 + Ankle_Pitch: 0.000 + Ankle_Roll: 0.000 + kp: 200 + kd: 1.95 \ No newline at end of file diff --git a/behaviors/custom/keyframe/keyframe.py b/behaviors/custom/keyframe/keyframe.py new file mode 100644 index 0000000..78dbd10 --- /dev/null +++ b/behaviors/custom/keyframe/keyframe.py @@ -0,0 +1,89 @@ +from behaviors.behavior import Behavior +import yaml + + +class KeyframeSkill(Behavior): + def __init__(self, agent, file: str): + super().__init__(agent) + + self.keyframe_step: int = 0 + self.keyframe_start_time: float = None + + with open(f"{file}", "r") as f: + self.skill_description = yaml.safe_load(f) + + self.has_symmetry: bool = self.skill_description["symmetry"] + self.keyframes: list = self.skill_description["keyframes"] + self.kp: float = self.skill_description["kp"] + self.kd: float = self.skill_description["kd"] + + def execute(self, reset, *args, **kwargs): + if reset: + self.keyframe_step = 0 + self.keyframe_start_time = self.agent.world.server_time + + current_keyframe = self.keyframes[self.keyframe_step] + + raw_motor_positions = current_keyframe["motor_positions"] + + current_keyframe_kp = current_keyframe.get("kp", self.kp) + current_keyframe_kd = current_keyframe.get("kd", self.kd) + + p_gains = current_keyframe.get("p_gains", None) + d_gains = current_keyframe.get("d_gains", None) + + for readable_motor_name, position in raw_motor_positions.items(): + + p_gain = ( + p_gains.get(readable_motor_name, current_keyframe_kp) + if p_gains + else current_keyframe_kp + ) + d_gain = ( + d_gains.get(readable_motor_name, current_keyframe_kd) + if d_gains + else current_keyframe_kd + ) + + if self.has_symmetry: + motor_names, is_inverse_direction = self.agent.robot.MOTOR_SYMMETRY[ + readable_motor_name + ] + for motor_name in motor_names: + server_motor_name = self.agent.robot.MOTOR_FROM_READABLE_TO_SERVER[ + motor_name + ] + + self.agent.robot.set_motor_target_position( + motor_name=server_motor_name, + target_position=position if is_inverse_direction else -position, + kp=p_gain, + kd=d_gain, + ) + if is_inverse_direction: + is_inverse_direction = False # Only inverts one joint + else: + server_motor_name = self.agent.robot.MOTOR_FROM_READABLE_TO_SERVER[ + readable_motor_name + ] + self.agent.robot.set_motor_target_position( + motor_name=server_motor_name, + target_position=position, + kp=self.kp, + kd=self.kd, + ) + + keyframe_time: float = current_keyframe["delta"] + elapsed_keyframe_time = self.agent.world.server_time - self.keyframe_start_time + + if elapsed_keyframe_time >= keyframe_time: + self.keyframe_start_time = self.agent.world.server_time + self.keyframe_step += 1 + + if self.keyframe_step >= len(self.keyframes): + return True + + return False + + def is_ready(self, *args): + return True diff --git a/behaviors/custom/keyframe/poses/neutral/__pycache__/neutral.cpython-311.pyc b/behaviors/custom/keyframe/poses/neutral/__pycache__/neutral.cpython-311.pyc new file mode 100644 index 0000000..55e966d Binary files /dev/null and b/behaviors/custom/keyframe/poses/neutral/__pycache__/neutral.cpython-311.pyc differ diff --git a/behaviors/custom/keyframe/poses/neutral/neutral.py b/behaviors/custom/keyframe/poses/neutral/neutral.py new file mode 100644 index 0000000..c9ff372 --- /dev/null +++ b/behaviors/custom/keyframe/poses/neutral/neutral.py @@ -0,0 +1,7 @@ +import os +from behaviors.custom.keyframe.keyframe import KeyframeSkill + + +class Neutral(KeyframeSkill): + def __init__(self, agent): + super().__init__(agent, os.path.join(os.path.dirname(__file__), "neutral.yaml")) diff --git a/behaviors/custom/keyframe/poses/neutral/neutral.yaml b/behaviors/custom/keyframe/poses/neutral/neutral.yaml new file mode 100644 index 0000000..d365739 --- /dev/null +++ b/behaviors/custom/keyframe/poses/neutral/neutral.yaml @@ -0,0 +1,19 @@ +symmetry: true +kp: 150 +kd: 1 +keyframes: + - delta: 0.0 + motor_positions: + Head_yaw: 0.000 + Head_pitch: 0.000 + Shoulder_Pitch: 0.000 + Shoulder_Roll: -89.954 + Elbow_Pitch: 0.000 + Elbow_Yaw: 0.000 + Waist: 0.000 + Hip_Pitch: 0.000 + Hip_Roll: 0.000 + Hip_Yaw: 0.000 + Knee_Pitch: 0.000 + Ankle_Pitch: 0.000 + Ankle_Roll: 0.000 \ No newline at end of file diff --git a/behaviors/custom/reinforcement/walk/__pycache__/walk.cpython-311.pyc b/behaviors/custom/reinforcement/walk/__pycache__/walk.cpython-311.pyc new file mode 100644 index 0000000..fd85196 Binary files /dev/null and b/behaviors/custom/reinforcement/walk/__pycache__/walk.cpython-311.pyc differ diff --git a/behaviors/custom/reinforcement/walk/walk.onnx b/behaviors/custom/reinforcement/walk/walk.onnx new file mode 100644 index 0000000..183e131 Binary files /dev/null and b/behaviors/custom/reinforcement/walk/walk.onnx differ diff --git a/behaviors/custom/reinforcement/walk/walk.py b/behaviors/custom/reinforcement/walk/walk.py new file mode 100644 index 0000000..72b587b --- /dev/null +++ b/behaviors/custom/reinforcement/walk/walk.py @@ -0,0 +1,147 @@ +import math +import os +import numpy as np +from behaviors.behavior import Behavior +from utils.math_ops import MathOps +from utils.neural_network import run_network, load_network +from scipy.spatial.transform import Rotation as R + +class Walk(Behavior): + def __init__(self, agent): + super().__init__(agent) + self.joint_nominal_position = np.array( + [ + 0.0, + 0.0, + 0.0, + 1.4, + 0.0, + -0.4, + 0.0, + -1.4, + 0.0, + 0.4, + 0.0, + -0.4, + 0.0, + 0.0, + 0.8, + -0.4, + 0.0, + 0.4, + 0.0, + 0.0, + -0.8, + 0.4, + 0.0, + ] + ) + self.train_sim_flip = np.array( + [ + 1.0, + -1.0, + 1.0, + -1.0, + -1.0, + 1.0, + -1.0, + -1.0, + 1.0, + 1.0, + 1.0, + 1.0, + -1.0, + -1.0, + 1.0, + 1.0, + -1.0, + -1.0, + -1.0, + -1.0, + -1.0, + -1.0, + -1.0, + ] + ) + self.scaling_factor = 0.5 + + self.previous_action = np.zeros(len(self.agent.robot.ROBOT_MOTORS)) + + self.model = load_network(model_path=os.path.join(os.path.dirname(__file__), "walk.onnx")) + + + def execute(self, reset: bool, target_2d: list, is_target_absolute: bool, orientation: float=None, is_orientation_absolute: bool=True) -> bool: + + robot = self.agent.robot + world = self.agent.world + + velocity = None + + if is_target_absolute: + raw_target = target_2d - world.global_position[:2] + velocity = MathOps.rotate_2d_vec(raw_target, -robot.global_orientation_euler[2], is_rad=False) + else: + velocity = target_2d + + rel_orientation = None + if orientation is None: + rel_orientation = MathOps.vector_angle(velocity) * 0.3 + elif is_orientation_absolute: + rel_orientation = MathOps.normalize_deg(orientation - robot.global_orientation_euler[2]) + else: + rel_orientation = orientation * 0.3 + + rel_orientation = np.clip(rel_orientation, -0.25, 0.25) + + velocity = np.concatenate([velocity, np.array([rel_orientation])], axis=0) + + velocity[0] = np.clip(velocity[0], -0.5, 0.5) + velocity[1] = np.clip(velocity[1], -0.25, 0.25) + + radian_joint_positions = np.deg2rad(list(robot.motor_positions.values())) + radian_joint_speeds = np.deg2rad(list(robot.motor_speeds.values())) + + qpos_qvel_previous_action = np.vstack( + ( + [ + ( + (radian_joint_positions * self.train_sim_flip) + - self.joint_nominal_position + ) + / 4.6, + radian_joint_speeds / 110.0 * self.train_sim_flip, + self.previous_action / 10.0, + ] + ) + ).T.flatten() + + ang_vel = np.clip(np.deg2rad(robot.gyroscope) / 50.0, -1.0, 1.0) + orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv() + projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0])) + #[0.5,0.25,0.25] + observation = np.concatenate( + [ + qpos_qvel_previous_action, + ang_vel, + velocity, + projected_gravity, + ] + ) + observation = np.clip(observation, -10.0, 10.0) + + nn_action = run_network(obs=observation, model=self.model) + + target_joint_positions = ( + self.joint_nominal_position + self.scaling_factor * nn_action + ) + target_joint_positions *= self.train_sim_flip + + self.previous_action = nn_action + + for idx, target in enumerate(target_joint_positions): + robot.set_motor_target_position( + robot.ROBOT_MOTORS[idx], target*180/math.pi, kp=25, kd=0.6 + ) + + def is_ready(self, *args, **kwargs) -> bool: + return True \ No newline at end of file diff --git a/build_binary.sh b/build_binary.sh new file mode 100755 index 0000000..ec82b3d --- /dev/null +++ b/build_binary.sh @@ -0,0 +1,78 @@ +#!/bin/bash +set -e + +if [ -z "$1" ]; then + echo "Error: Team name not provided." + echo "Usage: $0 " + exit 1 +fi + +TEAM_NAME="$1" + +BUILD_DIR="./build" +DIST_DIR="$BUILD_DIR/dist" +WORK_DIR="$BUILD_DIR/build" +ONEFILE="--onefile" + +rm -rf "$BUILD_DIR" +mkdir -p "$BUILD_DIR" + +echo "Building executable with PyInstaller..." +pyinstaller \ + --add-data './behaviors/custom/keyframe:behaviors/custom/keyframe' \ + --add-data './behaviors/custom/reinforcement/walk/*.onnx:behaviors/custom/reinforcement/walk' \ + ${ONEFILE} \ + --distpath "$DIST_DIR" \ + --workpath "$WORK_DIR" \ + --noconfirm \ + --name "${TEAM_NAME,,}" \ + ./run_player.py + +echo "Creating start.sh..." +cat > "${DIST_DIR}/start.sh" << EOF +#!/bin/bash +export OMP_NUM_THREADS=1 + +host=\${1:-localhost} +port=\${2:-60000} + +for i in {1..11}; do + ./$(echo "${TEAM_NAME,,}") -t ${TEAM_NAME} -n \$i --host \$host --port \$port & +done +EOF + +echo "Creating kill.sh..." +cat > "${DIST_DIR}/kill.sh" << EOF +#!/bin/bash +pkill -9 -e $(echo "${TEAM_NAME,,}") +EOF + +chmod a+x "${DIST_DIR}/start.sh" +chmod a+x "${DIST_DIR}/kill.sh" + +#TEMPORARY! +echo "Creating start3v3.sh..." +cat > "${DIST_DIR}/start3v3.sh" << EOF +#!/bin/bash +export OMP_NUM_THREADS=1 + +host=\${1:-localhost} +port=\${2:-60000} + +for i in {1..3}; do + ./$(echo "${TEAM_NAME,,}") -t ${TEAM_NAME} -f hl_adult -n \$i --host \$host --port \$port & +done +EOF + +chmod a+x "${DIST_DIR}/start3v3.sh" + +echo "Packing into ${BUILD_DIR}/${TEAM_NAME}.tar.gz..." +tar -czf "${BUILD_DIR}/${TEAM_NAME}.tar.gz" -C "$DIST_DIR" . --transform "s|^|${TEAM_NAME}/|" + +echo "Removing build files..." +rm -rf $DIST_DIR +rm -rf $WORK_DIR +echo "Removed build files!" + +echo "Generated file: ${BUILD_DIR}/${TEAM_NAME}.tar.gz" +echo "Build completed successfully!" \ No newline at end of file diff --git a/communication/__pycache__/server.cpython-311.pyc b/communication/__pycache__/server.cpython-311.pyc new file mode 100644 index 0000000..672d3cb Binary files /dev/null and b/communication/__pycache__/server.cpython-311.pyc differ diff --git a/communication/server.py b/communication/server.py new file mode 100644 index 0000000..9d8c0a2 --- /dev/null +++ b/communication/server.py @@ -0,0 +1,106 @@ +import logging +import socket +from select import select +from communication.world_parser import WorldParser + +logger = logging.getLogger(__file__) + + +class Server: + def __init__(self, host: str, port: int, world_parser: WorldParser): + self.world_parser: WorldParser = world_parser + self.__host: str = host + self.__port: str = port + self.__socket: socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.__send_buff = [] + self.__rcv_buffer_size = 1024 + self.__rcv_buffer = bytearray(self.__rcv_buffer_size) + + def connect(self) -> None: + logger.info("Connecting to server at %s:%d...", self.__host, self.__port) + while True: + try: + self.__socket.connect((self.__host, self.__port)) + break + except ConnectionRefusedError: + logger.error( + "Connection refused. Make sure the server is running and listening on {self.__host}:{self.__port}." + ) + + logger.info(f"Server connection established to {self.__host}:{self.__port}.") + + def shutdown(self) -> None: + self.__socket.close() + self.__socket.shutdown(socket.SHUT_RDWR) + + def send_immediate(self, msg: str) -> None: + """ + Sends only the desired message, without the buffer + """ + try: + self.__socket.send( + (len(msg)).to_bytes(4, byteorder="big") + msg.encode() + ) # Add message length in the first 4 bytes + except BrokenPipeError: + print("\nError: socket was closed by rcssserver3d!") + exit() + + def send(self) -> None: + """ + Send all committed messages + """ + if len(select([self.__socket], [], [], 0.0)[0]) == 0: + self.send_immediate(("".join(self.__send_buff))) + else: + logger.info("Server_Comm.py: Received a new packet while thinking!") + self.__send_buff = [] + + def commit(self, msg: str) -> None: + """ + Appends message to buffer + """ + self.__send_buff.append(msg) + + def commit_and_send(self, msg: str = "") -> None: + """ + Appends a message to buffer and then sends the buffer + """ + self.commit(msg) + self.send() + + def receive(self) -> None: + """ + Receive the next message from the TCP/IP socket and updates world + """ + + # Receive message length information + if ( + self.__socket.recv_into( + self.__rcv_buffer, nbytes=4, flags=socket.MSG_WAITALL + ) + != 4 + ): + raise ConnectionResetError + + msg_size = int.from_bytes(self.__rcv_buffer[:4], byteorder="big", signed=False) + + # Ensure receive buffer is large enough to hold the message + if msg_size > self.__rcv_buffer_size: + self.__rcv_buffer_size = msg_size + self.__rcv_buffer = bytearray(self.__rcv_buffer_size) + + # Receive message with the specified length + if ( + self.__socket.recv_into( + self.__rcv_buffer, nbytes=msg_size, flags=socket.MSG_WAITALL + ) + != msg_size + ): + raise ConnectionResetError + + self.world_parser.parse(message=self.__rcv_buffer[:msg_size].decode()) + + def commit_beam(self, pos2d: list, rotation: float) -> None: + assert len(pos2d) == 2 + self.commit(f"(beam {pos2d[0]} {pos2d[1]} {rotation})") diff --git a/communication/world_parser.py b/communication/world_parser.py new file mode 100644 index 0000000..c63ca8b --- /dev/null +++ b/communication/world_parser.py @@ -0,0 +1,255 @@ +import logging +import re +import numpy as np +from scipy.spatial.transform import Rotation as R + +from utils.math_ops import MathOps +from world.commons.play_mode import PlayModeEnum + +logger = logging.getLogger() + + +class WorldParser: + def __init__(self, agent): + from agent.base_agent import Base_Agent # type hinting + + self.agent: Base_Agent = agent + + def parse(self, message: str) -> None: + perception_dict: dict = self.__sexpression_to_dict(message) + + world = self.agent.world + + # Game parse + + if world.is_left_team is None: + world.is_left_team = ( + True + if perception_dict["GS"]["tl"] == world.team_name + else False if perception_dict["GS"]["tr"] == world.team_name else None + ) + + world.playmode = PlayModeEnum.get_playmode_from_string( + playmode=perception_dict["GS"]["pm"], is_left_team=world.is_left_team + ) + + world.game_time = perception_dict["GS"]["t"] + world.score_left = perception_dict["GS"]["sl"] + world.score_right = perception_dict["GS"]["sr"] + + left_team_name: str = perception_dict["GS"].get("tl", None) + right_team_name: str = perception_dict["GS"].get("tr", None) + if left_team_name and right_team_name: + world.their_team_name = ( + right_team_name if world.is_left_team else left_team_name + ) + + world.last_server_time = world.server_time + world.server_time = perception_dict["time"]["now"] + + # Robot parse + + robot = self.agent.robot + + robot.motor_positions = {h["n"]: h["ax"] for h in perception_dict["HJ"]} + + robot.motor_speeds = {h["n"]: h["vx"] for h in perception_dict["HJ"]} + + world._global_cheat_position = np.array(perception_dict["pos"]["p"]) + + # changes quaternion from (w, x, y, z) to (x, y, z, w) + robot._global_cheat_orientation = np.array(perception_dict["quat"]["q"]) + robot._global_cheat_orientation = robot._global_cheat_orientation[[1, 2, 3, 0]] + + # flips 180 deg considering team side + try: + if not world.is_left_team: + world._global_cheat_position[:2] = -world._global_cheat_position[:2] + + global_rotation = R.from_quat(robot.global_orientation_quat) + yaw180 = R.from_euler('z', 180, degrees=True) + fixed_rotation = yaw180 * global_rotation + robot._global_cheat_orientation = fixed_rotation.as_quat() + + # updates global orientation + euler_angles_deg = R.from_quat(robot._global_cheat_orientation).as_euler('xyz', degrees=True) + robot.global_orientation_euler = np.array( + [MathOps.normalize_deg(axis_angle) for axis_angle in euler_angles_deg]) + robot.global_orientation_quat = robot._global_cheat_orientation + world.global_position = world._global_cheat_position + except: + logger.exception(f'Failed to rotate orientation and position considering team side') + + robot.gyroscope = np.array(perception_dict["GYR"]["rt"]) + + robot.accelerometer = np.array(perception_dict["ACC"]["a"]) + + world.is_ball_pos_updated = False + + # Vision parse + if 'See' in perception_dict: + + for seen_object in perception_dict['See']: + obj_type = seen_object['type'] + + if obj_type == 'B': # Ball + + polar_coords = np.array(seen_object['pol']) + local_cartesian_3d = MathOps.deg_sph2cart(polar_coords) + + world.ball_pos = MathOps.rel_to_global_3d( + local_pos_3d=local_cartesian_3d, + global_pos_3d=world.global_position, + global_orientation_quat=robot.global_orientation_quat + ) + world.is_ball_pos_updated = True + + elif obj_type == "P": + + team = seen_object.get('team') + player_id = seen_object.get('id') + + if team and player_id is not None: + if (team == world.team_name): + player = world.our_team_players[player_id - 1] + else: + player = world.their_team_players[player_id - 1] + + objects = [seen_object.get('head'), seen_object.get('l_foot'), seen_object.get('r_foot')] + + seen_objects = [object for object in objects if object] + + if seen_objects: + local_cartesian_seen_objects = [MathOps.deg_sph2cart(object) for object in seen_objects] + + approximated_centroid = np.mean(local_cartesian_seen_objects, axis=0) + + player.position = MathOps.rel_to_global_3d( + local_pos_3d=approximated_centroid, + global_pos_3d=world.global_position, + global_orientation_quat=robot._global_cheat_orientation + ) + player.last_seen_time = world.server_time + + elif obj_type: + + polar_coords = np.array(seen_object['pol']) + world.field.field_landmarks.update_from_perception( + landmark_id=obj_type, + landmark_pos=polar_coords + ) + + def __sexpression_to_dict(self, sexpression: str) -> dict: + """ + Parses a sensor data string of nested parenthesis groups into a structured dictionary. + Repeated top-level tags are aggregated into lists. + """ + + def split_top_level(s: str): + """Return a list of substrings that are top-level parenthesized groups.""" + groups = [] + depth = 0 + start = None + for i, ch in enumerate(s): + if ch == '(': + if depth == 0: + start = i + depth += 1 + elif ch == ')': + depth -= 1 + if depth == 0 and start is not None: + groups.append(s[start:i + 1]) + start = None + return groups + + result = {} + + top_groups = split_top_level(sexpression) + + for grp in top_groups: + m = re.match(r'^\((\w+)\s*(.*)\)$', grp, re.DOTALL) + if not m: + continue + tag = m.group(1) + inner = m.group(2).strip() + + if tag == "See": + see_items = [] + subs = split_top_level(inner) + + for sub in subs: + sm = re.match(r'^\((\w+)\s*(.*)\)$', sub, re.DOTALL) + if not sm: + continue + obj_type = sm.group(1) + inner2 = sm.group(2) + + if obj_type == "P": # Player + player_data = {"type": "P"} + team_m = re.search(r'\(team\s+([^)]+)\)', inner2) + if team_m: + player_data["team"] = team_m.group(1) + id_m = re.search(r'\(id\s+([^)]+)\)', inner2) + if id_m: + try: + player_data["id"] = int(id_m.group(1)) + except ValueError: + player_data["id"] = id_m.group(1) + + parts = re.findall(r'\((\w+)\s*\(pol\s+([-0-9.\s]+)\)\)', inner2) + for part_name, pol_str in parts: + pol_vals = [float(x) for x in pol_str.strip().split()] + player_data[part_name] = pol_vals + + see_items.append(player_data) + continue + + # Generic + pol_m = re.search(r'\(pol\s+([-0-9.\s]+)\)', inner2) + vals = [float(x) for x in pol_m.group(1).strip().split()] if pol_m else [] + see_items.append({"type": obj_type, "pol": vals}) + + result.setdefault("See", []).extend(see_items) + continue + + # Generic parse for other tags (time, GS, quat, pos, HJ, ...) + group = {} + children = split_top_level(inner) + if children: # (key val1 val2) + for child in children: + im = re.match(r'^\(\s*(\w+)\s+([^)]+)\)$', child.strip(), re.DOTALL) + if not im: + continue + key = im.group(1) + vals = im.group(2).strip().split() + parsed = [] + for t in vals: + try: + parsed.append(float(t)) + except ValueError: + parsed.append(t) + group[key] = parsed[0] if len(parsed) == 1 else parsed + else: + # search pairs (key vals...) + items = re.findall(r"\(\s*(\w+)((?:\s+[^()]+)+)\)", inner) + for key, vals in items: + tokens = vals.strip().split() + parsed_vals = [] + for t in tokens: + try: + parsed_vals.append(float(t)) + except ValueError: + parsed_vals.append(t) + # Single value vs. list + group[key] = parsed_vals[0] if len(parsed_vals) == 1 else parsed_vals + + # Merge into result, handling repeated tags as lists + if tag in result: + if isinstance(result[tag], list): + result[tag].append(group) + else: + result[tag] = [result[tag], group] + else: + result[tag] = group + + return result diff --git a/kill.sh b/kill.sh new file mode 100755 index 0000000..364f9f8 --- /dev/null +++ b/kill.sh @@ -0,0 +1 @@ +pkill -9 -e -f "python3 run_player" \ No newline at end of file diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..be33a06 --- /dev/null +++ b/readme.md @@ -0,0 +1,91 @@ +# Apollo Mujoco base code for training on GPU + +This is a Python-based base code developed for the RCSSServerMJ. It was created to simplify the onboarding process for new teams joining the RoboCup 3D Soccer Simulation League using the Mujoco Simulator. + +This code was influenced by the early demonstrations from MagmaOffenburg team of a client for the RCSSServerMJ, and the FCPortugal base code for the SimSpark simulator. + +## Installation + +### Make sure the following are installed on your system: + +- Python = 3.11 + +- Any Python dependency manager can be used, but **either Poetry is recommended**. + +### Install Dependencies +The project dependencies are listed inside pyproject.toml + +- IsaacLab and IsaacSim +```bash +# Create a virtual environment using one of the package managers: + + conda create -n env_isaaclab python=3.11 + conda activate env_isaaclab + +# Ensure the latest pip version is installed. +# To update pip, run the following command from inside the virtual environment: + + pip install --upgrade pip +# Install the Isaac Lab packages along with Isaac Sim: + + pip install isaaclab[isaacsim,all]==2.3.2.post1 --extra-index-url https://pypi.nvidia.com +# Install a CUDA-enabled PyTorch 2.7.0 build for CUDA 12.8 that matches your system architecture: + + pip install -U torch==2.7.0 torchvision==0.22.0 --index-url https://download.pytorch.org/whl/cu128 + +# To use rl_games for training and inferencing, install its Python 3.11 enabled fork: + + pip install # rl_games or rsl_rl or stable-baselines3 based on your model +``` + +## Instructions + +### Run an agent +After installing the dependencies and setting up the environment, you can launch a player instance: + +```bash +python3 run_player.py -n -t +``` + +CLI parameter (a usage help is also available): + +- `--host ` to specify the host IP (default: 'localhost') +- `--port ` to specify the agent port (default: 60000) +- `-n ` Player number (1–11) (default: 1) +- `-t ` Team name (default: 'Default') + + +### Run a team +You can also use a shell script to start the entire team, optionally specifying host and port: + +```bash +./start.sh [host] [port] +``` + + +CLI parameter: + +- `[host]` Server IP address (default: 'localhost') +- `[port]` Server port for agents (default: 60000) + +### Binary building +To compete, a binary is needed. It provides a compact, portable version and protects the source code. To create a binary, just run the script ```build_binary.sh``` + +```bash +./build_binary.sh +``` + +Once binary generation is finished, the result will be inside the build folder, as ```.tar.gz``` + +### Authors and acknowledgment +This project was developed and contributed by: +- **Chenxi Liu** +- **Xuehao Xu** + +### Thanks for +- **Alan Nascimento** +- **Luís Magalhães** +- **Pedro Rabelo** +- **Melissa Damasceno** + +Contributions, bug reports, and feature requests are welcome via pull requests. diff --git a/run_player.py b/run_player.py new file mode 100755 index 0000000..bfea17b --- /dev/null +++ b/run_player.py @@ -0,0 +1,31 @@ +import argparse +import logging +from agent.base_agent import Base_Agent + +ch = logging.StreamHandler() +formatter = logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s (%(filename)s:%(lineno)d) - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +ch.setLevel(logging.INFO) +logging.basicConfig(handlers=[ch], level=logging.DEBUG) + +parser = argparse.ArgumentParser(description="Start a MuJoCo Agent.") + +parser.add_argument("-t", "--team", type=str, default="Default", help="Team name") +parser.add_argument("-n", "--number", type=int, default=1, help="Player number") +parser.add_argument("--host", type=str, default="127.0.0.1", help="Server host") +parser.add_argument("--port", type=int, default=60000, help="Server port") +parser.add_argument("-f", "--field", type=str, default='fifa', help="Field to be played") + +args = parser.parse_args() + +player = Base_Agent( + team_name=args.team, + number=args.number, + host=args.host, + port=args.port, + field=args.field +) + +player.run() diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..81f82a5 --- /dev/null +++ b/start.sh @@ -0,0 +1,9 @@ +#!/bin/bash +export OMP_NUM_THREADS=1 + +host=${1:-localhost} +port=${2:-60000} + +for i in {1..11}; do + python3 run_player.py --host $host --port $port -n $i -t SE & +done diff --git a/start3v3.sh b/start3v3.sh new file mode 100755 index 0000000..1fab52a --- /dev/null +++ b/start3v3.sh @@ -0,0 +1,9 @@ +#!/bin/bash +export OMP_NUM_THREADS=1 + +host=${1:-localhost} +port=${2:-60000} + +for i in {1..3}; do + python3 run_player.py --host $host --port $port -n $i -t SE -f hl_adult & +done diff --git a/utils/__pycache__/math_ops.cpython-311.pyc b/utils/__pycache__/math_ops.cpython-311.pyc new file mode 100644 index 0000000..fda0f58 Binary files /dev/null and b/utils/__pycache__/math_ops.cpython-311.pyc differ diff --git a/utils/__pycache__/neural_network.cpython-311.pyc b/utils/__pycache__/neural_network.cpython-311.pyc new file mode 100644 index 0000000..d9e910f Binary files /dev/null and b/utils/__pycache__/neural_network.cpython-311.pyc differ diff --git a/utils/math_ops.py b/utils/math_ops.py new file mode 100644 index 0000000..f142e34 --- /dev/null +++ b/utils/math_ops.py @@ -0,0 +1,427 @@ +from math import acos, asin, atan2, cos, pi, sin, sqrt, isnan +from scipy.spatial.transform import Rotation +import numpy as np +import sys + +try: + GLOBAL_DIR = sys._MEIPASS # temporary folder with libs & data files +except: + GLOBAL_DIR = "." + + +class MathOps(): + ''' + This class provides general mathematical operations that are not directly available through numpy + ''' + + @staticmethod + def deg_sph2cart(spherical_vec): + ''' Converts spherical coordinates in degrees to cartesian coordinates ''' + r = spherical_vec[0] + h = spherical_vec[1] * pi / 180 + v = spherical_vec[2] * pi / 180 + return np.array([r * cos(v) * cos(h), r * cos(v) * sin(h), r * sin(v)]) + + @staticmethod + def deg_sin(deg_angle): + ''' Returns sin of degrees ''' + return sin(deg_angle * pi / 180) + + @staticmethod + def deg_cos(deg_angle): + ''' Returns cos of degrees ''' + return cos(deg_angle * pi / 180) + + @staticmethod + def to_3d(vec_2d, value=0) -> np.ndarray: + ''' Returns new 3d vector from 2d vector ''' + return np.append(vec_2d, value) + + @staticmethod + def to_2d_as_3d(vec_3d) -> np.ndarray: + ''' Returns new 3d vector where the 3rd dimension is zero ''' + vec_2d_as_3d = np.copy(vec_3d) + vec_2d_as_3d[2] = 0 + return vec_2d_as_3d + + @staticmethod + def normalize_vec(vec) -> np.ndarray: + ''' Divides vector by its length ''' + size = np.linalg.norm(vec) + if size == 0: return vec + return vec / size + + def rel_to_global_3d(local_pos_3d: np.ndarray, global_pos_3d: np.ndarray, + global_orientation_quat: np.ndarray) -> np.ndarray: + ''' Converts a local 3d position to a global 3d position given the global position and orientation (quaternion) ''' + + rotation = Rotation.from_quat(global_orientation_quat) + rotated_vec = rotation.apply(local_pos_3d) + global_pos = global_pos_3d + rotated_vec + + return global_pos + + @staticmethod + def get_active_directory(dir: str) -> str: + global GLOBAL_DIR + return GLOBAL_DIR + dir + + @staticmethod + def acos(val): + ''' arccosine function that limits input ''' + return acos(np.clip(val, -1, 1)) + + @staticmethod + def asin(val): + ''' arcsine function that limits input ''' + return asin(np.clip(val, -1, 1)) + + @staticmethod + def normalize_deg(val): + ''' normalize val in range [-180,180[ ''' + return (val + 180.0) % 360 - 180 + + @staticmethod + def normalize_rad(val): + ''' normalize val in range [-pi,pi[ ''' + return (val + pi) % (2 * pi) - pi + + @staticmethod + def deg_to_rad(val): + ''' convert degrees to radians ''' + return val * 0.01745329251994330 + + @staticmethod + def rad_to_deg(val): + ''' convert radians to degrees ''' + return val * 57.29577951308232 + + @staticmethod + def vector_angle(vector, is_rad=False): + ''' angle (degrees or radians) of 2D vector ''' + if is_rad: + return atan2(vector[1], vector[0]) + else: + return atan2(vector[1], vector[0]) * 180 / pi + + @staticmethod + def vectors_angle(vec1, vec2, is_rad=False): + ''' get angle between vectors (degrees or radians) ''' + ang_rad = acos(np.dot(MathOps.normalize_vec(vec1), MathOps.normalize_vec(vec2))) + return ang_rad if is_rad else ang_rad * 180 / pi + + @staticmethod + def vector_from_angle(angle, is_rad=False): + ''' unit vector with direction given by `angle` ''' + if is_rad: + return np.array([cos(angle), sin(angle)], float) + else: + return np.array([MathOps.deg_cos(angle), MathOps.deg_sin(angle)], float) + + @staticmethod + def target_abs_angle(pos2d, target, is_rad=False): + ''' angle (degrees or radians) of vector (target-pos2d) ''' + if is_rad: + return atan2(target[1] - pos2d[1], target[0] - pos2d[0]) + else: + return atan2(target[1] - pos2d[1], target[0] - pos2d[0]) * 180 / pi + + @staticmethod + def target_rel_angle(pos2d, ori, target, is_rad=False): + ''' relative angle (degrees or radians) of target if we're located at 'pos2d' with orientation 'ori' (degrees or radians) ''' + if is_rad: + return MathOps.normalize_rad(atan2(target[1] - pos2d[1], target[0] - pos2d[0]) - ori) + else: + return MathOps.normalize_deg(atan2(target[1] - pos2d[1], target[0] - pos2d[0]) * 180 / pi - ori) + + @staticmethod + def rotate_2d_vec(vec, angle, is_rad=False): + ''' rotate 2D vector anticlockwise around the origin by `angle` ''' + cos_ang = cos(angle) if is_rad else cos(angle * pi / 180) + sin_ang = sin(angle) if is_rad else sin(angle * pi / 180) + return np.array([cos_ang * vec[0] - sin_ang * vec[1], sin_ang * vec[0] + cos_ang * vec[1]]) + + @staticmethod + def distance_point_to_line(p: np.ndarray, a: np.ndarray, b: np.ndarray): + ''' + Distance between point p and 2d line 'ab' (and side where p is) + + Parameters + ---------- + a : ndarray + 2D point that defines line + b : ndarray + 2D point that defines line + p : ndarray + 2D point + + Returns + ------- + distance : float + distance between line and point + side : str + if we are at a, looking at b, p may be at our "left" or "right" + ''' + line_len = np.linalg.norm(b - a) + + if line_len == 0: # assumes vertical line + dist = sdist = np.linalg.norm(p - a) + else: + sdist = np.cross(b - a, p - a) / line_len + dist = abs(sdist) + + return dist, "left" if sdist > 0 else "right" + + @staticmethod + def distance_point_to_point_2d(p1: np.ndarray, p2: np.ndarray) -> float: + ''' Distance in 2d from point p1 to point p2''' + + return np.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2) + + @staticmethod + def is_point_between(p: np.ndarray, a: np.ndarray, b: np.ndarray) -> bool: + ''' Verify if point 'p' is between 'a' and 'b' ''' + + p_2d = p[:2] + a_2d = a[:2] + b_2d = b[:2] + + return np.all(np.minimum(a_2d, b_2d) <= p_2d) and np.all(p_2d <= np.maximum(a_2d, b_2d)) + + @staticmethod + def is_x_between(x1: np.ndarray, x2: np.ndarray, x3: np.ndarray) -> bool: + ''' Verify if x-axis of point 'x1' is between 'x2' and 'x3' ''' + + return x1 >= min(x2, x3) and x1 <= max(x2, x3) + + def distance_point_to_segment(p: np.ndarray, a: np.ndarray, b: np.ndarray): + ''' Distance from point p to 2d line segment 'ab' ''' + + ap = p - a + ab = b - a + + ad = MathOps.vector_projection(ap, ab) + + # Is d in ab? We can find k in (ad = k * ab) without computing any norm + # we use the largest dimension of ab to avoid division by 0 + k = ad[0] / ab[0] if abs(ab[0]) > abs(ab[1]) else ad[1] / ab[1] + + if k <= 0: + return np.linalg.norm(ap) + elif k >= 1: + return np.linalg.norm(p - b) + else: + return np.linalg.norm(p - (ad + a)) # p-d + + @staticmethod + def distance_point_to_ray(p: np.ndarray, ray_start: np.ndarray, ray_direction: np.ndarray): + ''' Distance from point p to 2d ray ''' + + rp = p - ray_start + rd = MathOps.vector_projection(rp, ray_direction) + + # Is d in ray? We can find k in (rd = k * ray_direction) without computing any norm + # we use the largest dimension of ray_direction to avoid division by 0 + k = rd[0] / ray_direction[0] if abs(ray_direction[0]) > abs(ray_direction[1]) else rd[1] / ray_direction[1] + + if k <= 0: + return np.linalg.norm(rp) + else: + return np.linalg.norm(p - (rd + ray_start)) # p-d + + @staticmethod + def closest_point_on_ray_to_point(p: np.ndarray, ray_start: np.ndarray, ray_direction: np.ndarray): + ''' Point on ray closest to point p ''' + + rp = p - ray_start + rd = MathOps.vector_projection(rp, ray_direction) + + # Is d in ray? We can find k in (rd = k * ray_direction) without computing any norm + # we use the largest dimension of ray_direction to avoid division by 0 + k = rd[0] / ray_direction[0] if abs(ray_direction[0]) > abs(ray_direction[1]) else rd[1] / ray_direction[1] + + if k <= 0: + return ray_start + else: + return rd + ray_start + + @staticmethod + def does_circle_intersect_segment(p: np.ndarray, r, a: np.ndarray, b: np.ndarray): + ''' Returns true if circle (center p, radius r) intersect 2d line segment ''' + + ap = p - a + ab = b - a + + ad = MathOps.vector_projection(ap, ab) + + # Is d in ab? We can find k in (ad = k * ab) without computing any norm + # we use the largest dimension of ab to avoid division by 0 + k = ad[0] / ab[0] if abs(ab[0]) > abs(ab[1]) else ad[1] / ab[1] + + if k <= 0: + return np.dot(ap, ap) <= r * r + elif k >= 1: + return np.dot(p - b, p - b) <= r * r + + dp = p - (ad + a) + return np.dot(dp, dp) <= r * r + + @staticmethod + def vector_projection(a: np.ndarray, b: np.ndarray): + ''' Vector projection of a onto b ''' + b_dot = np.dot(b, b) + return b * np.dot(a, b) / b_dot if b_dot != 0 else b + + @staticmethod + def do_noncollinear_segments_intersect(a, b, c, d): + ''' + Check if 2d line segment 'ab' intersects with noncollinear 2d line segment 'cd' + Explanation: https://www.geeksforgeeks.org/check-if-two-given-line-segments-intersect/ + ''' + + ccw = lambda a, b, c: (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0]) + return ccw(a, c, d) != ccw(b, c, d) and ccw(a, b, c) != ccw(a, b, d) + + @staticmethod + def intersection_segment_opp_goal(a: np.ndarray, b: np.ndarray): + ''' Computes the intersection point of 2d segment 'ab' and the opponents' goal (front line) ''' + vec_x = b[0] - a[0] + + # Collinear intersections are not accepted + if vec_x == 0: return None + + k = (15.01 - a[0]) / vec_x + + # No collision + if k < 0 or k > 1: return None + + intersection_pt = a + (b - a) * k + + if -1.01 <= intersection_pt[1] <= 1.01: + return intersection_pt + else: + return None + + @staticmethod + def intersection_circle_opp_goal(p: np.ndarray, r): + ''' + Computes the intersection segment of circle (center p, radius r) and the opponents' goal (front line) + Only the y coordinates are returned since the x coordinates are always equal to 15 + ''' + + x_dev = abs(15 - p[0]) + + if x_dev > r: + return None # no intersection with x=15 + + y_dev = sqrt(r * r - x_dev * x_dev) + + p1 = max(p[1] - y_dev, -1.01) + p2 = min(p[1] + y_dev, 1.01) + + if p1 == p2: + return p1 # return the y coordinate of a single intersection point + elif p2 < p1: + return None # no intersection + else: + return p1, p2 # return the y coordinates of the intersection segment + + @staticmethod + def distance_point_to_opp_goal(p: np.ndarray): + ''' Distance between point 'p' and the opponents' goal (front line) ''' + + if p[1] < -1.01: + return np.linalg.norm(p - (15, -1.01)) + elif p[1] > 1.01: + return np.linalg.norm(p - (15, 1.01)) + else: + return abs(15 - p[0]) + + @staticmethod + def circle_line_segment_intersection(circle_center, circle_radius, pt1, pt2, full_line=True, tangent_tol=1e-9): + """ Find the points at which a circle intersects a line-segment. This can happen at 0, 1, or 2 points. + + :param circle_center: The (x, y) location of the circle center + :param circle_radius: The radius of the circle + :param pt1: The (x, y) location of the first point of the segment + :param pt2: The (x, y) location of the second point of the segment + :param full_line: True to find intersections along full line - not just in the segment. False will just return intersections within the segment. + :param tangent_tol: Numerical tolerance at which we decide the intersections are close enough to consider it a tangent + :return Sequence[Tuple[float, float]]: A list of length 0, 1, or 2, where each element is a point at which the circle intercepts a line segment. + + Note: We follow: http://mathworld.wolfram.com/Circle-LineIntersection.html + """ + + (p1x, p1y), (p2x, p2y), (cx, cy) = pt1, pt2, circle_center + (x1, y1), (x2, y2) = (p1x - cx, p1y - cy), (p2x - cx, p2y - cy) + dx, dy = (x2 - x1), (y2 - y1) + dr = (dx ** 2 + dy ** 2) ** .5 + big_d = x1 * y2 - x2 * y1 + discriminant = circle_radius ** 2 * dr ** 2 - big_d ** 2 + + if discriminant < 0: # No intersection between circle and line + return [] + else: # There may be 0, 1, or 2 intersections with the segment + intersections = [ + (cx + (big_d * dy + sign * (-1 if dy < 0 else 1) * dx * discriminant ** .5) / dr ** 2, + cy + (-big_d * dx + sign * abs(dy) * discriminant ** .5) / dr ** 2) + for sign in ((1, -1) if dy < 0 else (-1, 1))] # This makes sure the order along the segment is correct + if not full_line: # If only considering the segment, filter out intersections that do not fall within the segment + fraction_along_segment = [ + (xi - p1x) / dx if abs(dx) > abs(dy) else (yi - p1y) / dy for xi, yi in intersections] + intersections = [pt for pt, frac in zip( + intersections, fraction_along_segment) if 0 <= frac <= 1] + # If line is tangent to circle, return just one point (as both intersections have same location) + if len(intersections) == 2 and abs(discriminant) <= tangent_tol: + return [intersections[0]] + else: + return intersections + + # adapted from https://stackoverflow.com/questions/3252194/numpy-and-line-intersections + @staticmethod + def get_line_intersection(a1, a2, b1, b2): + """ + Returns the point of intersection of the lines passing through a2,a1 and b2,b1. + a1: [x, y] a point on the first line + a2: [x, y] another point on the first line + b1: [x, y] a point on the second line + b2: [x, y] another point on the second line + """ + s = np.vstack([a1, a2, b1, b2]) # s for stacked + h = np.hstack((s, np.ones((4, 1)))) # h for homogeneous + l1 = np.cross(h[0], h[1]) # get first line + l2 = np.cross(h[2], h[3]) # get second line + x, y, z = np.cross(l1, l2) # point of intersection + if z == 0: # lines are parallel + return np.array([float('inf'), float('inf')]) + return np.array([x / z, y / z], float) + + @staticmethod + def get_score_based_on_tanh(input, vertical_scaling=0.5, horizontal_scaling=1, vertical_shift=0.5, + horizontal_shift=0) -> float: + ''' + Based on hiperbolic tangent function, it's calculated a score between 0 and 1 (or others, depending on the parameters) + + Function: a * tanh(x * c + d) + b + + :param input: x value of the function + :param vertical_scaling: strech (a > 1) or compress (0 < a < 1) the function + :param horizontal_scaling: strech (c > 1) or compress (0 < c < 1) the function + :param vertical_shift: shift up (b > 0) or down (b < 0) the function + :param horizontal_shift: shift right (d < 0) or left (d > 0) the function + ''' + + return vertical_scaling * np.tanh(input * horizontal_scaling + horizontal_shift) + vertical_shift + + @staticmethod + def get_angle_to_origin_radians(point: np.ndarray): + x, y = point + + if x == 0: + return pi / 2 if y > 0 else -pi / 2 + + result = atan2(y, x) + + result = result if not isnan(result) else pi / 2 + + return MathOps.normalize_rad(result) diff --git a/utils/neural_network.py b/utils/neural_network.py new file mode 100644 index 0000000..e7b4600 --- /dev/null +++ b/utils/neural_network.py @@ -0,0 +1,69 @@ +import numpy as np +import onnxruntime as ort + +def export_model(model_class, weights_path, output_file): + """ + Export a PyTorch model to ONNX format automatically detecting input shape. + """ + import torch # imported only here + + model = model_class() + model.load_state_dict(torch.load(weights_path, map_location="cpu")) + model.eval() + + # Infer input size from first Linear layer + first_linear = next(m for m in model.modules() if isinstance(m, torch.nn.Linear)) + input_size = first_linear.in_features + dummy_input = torch.randn(1, input_size) + + torch.onnx.export( + model, + dummy_input, + output_file, + input_names=["obs"], + output_names=["action"], + dynamic_axes={"obs": {0: "batch"}}, + opset_version=17, + ) + print(f"Model exported to {output_file} (input size: {input_size})") + + +def load_network(model_path): + """ + Load an ONNX model into memory for fast reuse. + """ + session = ort.InferenceSession(model_path) + input_name = session.get_inputs()[0].name + output_name = session.get_outputs()[0].name + return {"session": session, "input_name": input_name, "output_name": output_name} + + +def run_network(obs, model): + """ + Run a preloaded ONNX model and return a flat float array suitable for motor targets. + + Args: + obs (np.ndarray): Input observation array. + model (dict): The loaded model from load_network(). + + Returns: + np.ndarray: 1D array of floats. + """ + if not isinstance(obs, np.ndarray): + obs = np.array(obs, dtype=np.float32) + else: + obs = obs.astype(np.float32) # ensure float32 + + if obs.ndim == 1: + obs = obs[np.newaxis, :] # make batch dimension + + session = model["session"] + input_name = model["input_name"] + output_name = model["output_name"] + + result = session.run([output_name], {input_name: obs})[0] + + # flatten to 1D and convert to float + result = result.flatten().astype(np.float32) + + return result \ No newline at end of file diff --git a/world/__pycache__/robot.cpython-311.pyc b/world/__pycache__/robot.cpython-311.pyc new file mode 100644 index 0000000..a11167d Binary files /dev/null and b/world/__pycache__/robot.cpython-311.pyc differ diff --git a/world/__pycache__/world.cpython-311.pyc b/world/__pycache__/world.cpython-311.pyc new file mode 100644 index 0000000..b244cdb Binary files /dev/null and b/world/__pycache__/world.cpython-311.pyc differ diff --git a/world/commons/__pycache__/field.cpython-311.pyc b/world/commons/__pycache__/field.cpython-311.pyc new file mode 100644 index 0000000..9a1ff2f Binary files /dev/null and b/world/commons/__pycache__/field.cpython-311.pyc differ diff --git a/world/commons/__pycache__/field_landmarks.cpython-311.pyc b/world/commons/__pycache__/field_landmarks.cpython-311.pyc new file mode 100644 index 0000000..e38003c Binary files /dev/null and b/world/commons/__pycache__/field_landmarks.cpython-311.pyc differ diff --git a/world/commons/__pycache__/other_robot.cpython-311.pyc b/world/commons/__pycache__/other_robot.cpython-311.pyc new file mode 100644 index 0000000..dabc23f Binary files /dev/null and b/world/commons/__pycache__/other_robot.cpython-311.pyc differ diff --git a/world/commons/__pycache__/play_mode.cpython-311.pyc b/world/commons/__pycache__/play_mode.cpython-311.pyc new file mode 100644 index 0000000..013bb37 Binary files /dev/null and b/world/commons/__pycache__/play_mode.cpython-311.pyc differ diff --git a/world/commons/field.py b/world/commons/field.py new file mode 100644 index 0000000..e541255 --- /dev/null +++ b/world/commons/field.py @@ -0,0 +1,50 @@ +from abc import ABC, abstractmethod +from typing_extensions import override +from world.commons.field_landmarks import FieldLandmarks + + +class Field(ABC): + def __init__(self, world): + from world.world import World # type hinting + self.world: World = world + self.field_landmarks: FieldLandmarks = FieldLandmarks(world=self.world) + + def get_our_goal_position(self): + return (-self.get_length() / 2, 0) + + def get_their_goal_position(self): + return (self.get_length() / 2, 0) + + @abstractmethod + def get_width(self): + raise NotImplementedError() + + @abstractmethod + def get_length(self): + raise NotImplementedError() + + +class FIFAField(Field): + def __init__(self, world): + super().__init__(world) + + @override + def get_width(self): + return 68 + + @override + def get_length(self): + return 105 + + +class HLAdultField(Field): + def __init__(self, world): + super().__init__(world) + + @override + def get_width(self): + return 9 + + @override + def get_length(self): + return 14 diff --git a/world/commons/field_landmarks.py b/world/commons/field_landmarks.py new file mode 100644 index 0000000..f686ef6 --- /dev/null +++ b/world/commons/field_landmarks.py @@ -0,0 +1,34 @@ +import numpy as np +from utils.math_ops import MathOps + + +class FieldLandmarks: + def __init__(self, world): + from world.world import World # type hinting + + self.world: World = world + + self.landmarks: dict = {} + + def update_from_perception(self, landmark_id: str, landmark_pos: np.ndarray) -> None: + """ + Calculates the global position of all currently visible landmarks. + """ + + world = self.world + local_cart_3d = MathOps.deg_sph2cart(landmark_pos) + + global_pos_3d = MathOps.rel_to_global_3d( + local_pos_3d=local_cart_3d, + global_pos_3d=world.global_position, + global_orientation_quat=world.agent.robot.global_orientation_quat + ) + + self.landmarks[landmark_id] = global_pos_3d + + def get_landmark_position(self, landmark_id: str) -> np.ndarray | None: + """ + Returns the calculated 2d global position for a given landmark ID. + Returns None if the landmark is not currently visible or processed. + """ + return self.global_positions.get(landmark_id) \ No newline at end of file diff --git a/world/commons/other_robot.py b/world/commons/other_robot.py new file mode 100644 index 0000000..2105ca1 --- /dev/null +++ b/world/commons/other_robot.py @@ -0,0 +1,7 @@ +import numpy as np + +class OtherRobot: + def __init__(self, is_teammate: bool=True): + self.is_teammate = is_teammate + self.position = np.zeros(3) + self.last_seen_time = None \ No newline at end of file diff --git a/world/commons/play_mode.py b/world/commons/play_mode.py new file mode 100644 index 0000000..326969b --- /dev/null +++ b/world/commons/play_mode.py @@ -0,0 +1,232 @@ +from enum import Enum, auto + + +class PlayModeEnum(Enum): + NOT_INITIALIZED = auto() + + """Enum specifying possible play modes.""" + + BEFORE_KICK_OFF = auto() + """The game hasn't started yet.""" + + OUR_KICK_OFF = auto() + """Our team has kick off.""" + + THEIR_KICK_OFF = auto() + """Their team has kick off.""" + + PLAY_ON = auto() + """The game is running normal.""" + + OUR_THROW_IN = auto() + """The ball left the field and our team has throw in.""" + + THEIR_THROW_IN = auto() + """The ball left the field and their team has throw in.""" + + OUR_CORNER_KICK = auto() + """Our team has corner kick.""" + + THEIR_CORNER_KICK = auto() + """Their team has corner kick.""" + + OUR_GOAL_KICK = auto() + """Our team has goal kick.""" + + THEIR_GOAL_KICK = auto() + """Their team has goal kick.""" + + OUR_OFFSIDE = auto() + """Their team violated the offside rule.""" + + THEIR_OFFSIDE = auto() + """Our team violated the offside rule.""" + + GAME_OVER = auto() + """The game has ended.""" + + OUR_GOAL = auto() + """Our team scored a goal.""" + + THEIR_GOAL = auto() + """Their team scored a goal.""" + + OUR_FREE_KICK = auto() + """Our team has a free kick.""" + + THEIR_FREE_KICK = auto() + """Their team has a free kick.""" + + OUR_DIRECT_FREE_KICK = auto() + """Our team has a direct free kick.""" + + THEIR_DIRECT_FREE_KICK = auto() + """Their team has a direct free kick.""" + + OUR_PENALTY_KICK = auto() + """Our team has a penalty kick (from the penalty spot).""" + + THEIR_PENALTY_KICK = auto() + """Their team has a penalty kick (from the penalty spot).""" + + OUR_PENALTY_SHOOT = auto() + """Our team has a penalty shoot (starting from somewhere on the field, allowed to touch the ball more than once).""" + + THEIR_PENALTY_SHOOT = auto() + """Their team has a penalty shoot (starting from somewhere on the field, allowed to touch the ball more than once).""" + + @classmethod + def get_playmode_from_string( + cls, playmode: str, is_left_team: bool + ) -> "PlayModeEnum": + assert isinstance(is_left_team, bool) + + playmode_ids = { + "BeforeKickOff": (PlayModeEnum.BEFORE_KICK_OFF,), + "KickOff_Left": ( + PlayModeEnum.OUR_KICK_OFF, + PlayModeEnum.THEIR_KICK_OFF, + ), + "KickOff_Right": ( + PlayModeEnum.THEIR_KICK_OFF, + PlayModeEnum.OUR_KICK_OFF, + ), + "PlayOn": (PlayModeEnum.PLAY_ON,), + "KickIn_Left": ( + PlayModeEnum.OUR_THROW_IN, + PlayModeEnum.THEIR_THROW_IN, + ), + "KickIn_Right": ( + PlayModeEnum.THEIR_THROW_IN, + PlayModeEnum.OUR_THROW_IN, + ), + "corner_kick_left": ( + PlayModeEnum.OUR_CORNER_KICK, + PlayModeEnum.THEIR_CORNER_KICK, + ), + "corner_kick_right": ( + PlayModeEnum.THEIR_CORNER_KICK, + PlayModeEnum.OUR_CORNER_KICK, + ), + "goal_kick_left": ( + PlayModeEnum.OUR_GOAL_KICK, + PlayModeEnum.THEIR_GOAL_KICK, + ), + "goal_kick_right": ( + PlayModeEnum.THEIR_GOAL_KICK, + PlayModeEnum.OUR_GOAL_KICK, + ), + "offside_left": ( + PlayModeEnum.OUR_OFFSIDE, + PlayModeEnum.THEIR_OFFSIDE, + ), + "offside_right": ( + PlayModeEnum.THEIR_OFFSIDE, + PlayModeEnum.OUR_OFFSIDE, + ), + "GameOver": (PlayModeEnum.GAME_OVER,), + "Goal_Left": ( + PlayModeEnum.OUR_GOAL, + PlayModeEnum.THEIR_GOAL, + ), + "Goal_Right": ( + PlayModeEnum.THEIR_GOAL, + PlayModeEnum.OUR_GOAL, + ), + "free_kick_left": ( + PlayModeEnum.OUR_FREE_KICK, + PlayModeEnum.THEIR_FREE_KICK, + ), + "free_kick_right": ( + PlayModeEnum.THEIR_FREE_KICK, + PlayModeEnum.OUR_FREE_KICK, + ), + "direct_free_kick_left": ( + PlayModeEnum.OUR_DIRECT_FREE_KICK, + PlayModeEnum.THEIR_DIRECT_FREE_KICK, + ), + "direct_free_kick_right": ( + PlayModeEnum.THEIR_DIRECT_FREE_KICK, + PlayModeEnum.OUR_DIRECT_FREE_KICK, + ), + "penalty_kick_left": ( + PlayModeEnum.OUR_PENALTY_KICK, + PlayModeEnum.THEIR_PENALTY_KICK, + ), + "penalty_kick_right": ( + PlayModeEnum.THEIR_PENALTY_KICK, + PlayModeEnum.OUR_PENALTY_KICK, + ), + "penalty_shoot_left": ( + PlayModeEnum.OUR_PENALTY_SHOOT, + PlayModeEnum.THEIR_PENALTY_SHOOT, + ), + "penalty_shoot_right": ( + PlayModeEnum.THEIR_PENALTY_SHOOT, + PlayModeEnum.OUR_PENALTY_SHOOT, + ), + }[playmode] + + playmode = None + if len(playmode_ids) > 1: + playmode = playmode_ids[0 if is_left_team else 1] + else: + playmode = playmode_ids[0] + + return playmode + + +class PlayModeGroupEnum(Enum): + NOT_INITIALIZED = auto() + OTHER = auto() + OUR_KICK = auto() + THEIR_KICK = auto() + ACTIVE_BEAM = auto() + PASSIVE_BEAM = auto() + + @classmethod + def get_group_from_playmode( + cls, playmode: PlayModeEnum, is_left_team: bool + ) -> "PlayModeGroupEnum": + playmode_group: PlayModeGroupEnum = None + + if playmode in (PlayModeEnum.PLAY_ON, PlayModeEnum.GAME_OVER): + playmode_group = cls.OTHER + elif playmode in ( + PlayModeEnum.OUR_CORNER_KICK, + PlayModeEnum.OUR_DIRECT_FREE_KICK, + PlayModeEnum.OUR_FREE_KICK, + PlayModeEnum.OUR_GOAL_KICK, + PlayModeEnum.OUR_KICK_OFF, + PlayModeEnum.OUR_OFFSIDE, + PlayModeEnum.OUR_PENALTY_KICK, + PlayModeEnum.OUR_PENALTY_SHOOT, + PlayModeEnum.OUR_THROW_IN, + ): + playmode_group = cls.OUR_KICK + elif playmode in ( + PlayModeEnum.THEIR_CORNER_KICK, + PlayModeEnum.THEIR_DIRECT_FREE_KICK, + PlayModeEnum.THEIR_FREE_KICK, + PlayModeEnum.THEIR_GOAL_KICK, + PlayModeEnum.THEIR_KICK_OFF, + PlayModeEnum.THEIR_OFFSIDE, + PlayModeEnum.THEIR_PENALTY_KICK, + PlayModeEnum.THEIR_PENALTY_SHOOT, + PlayModeEnum.THEIR_THROW_IN, + ): + playmode_group = cls.THEIR_KICK + elif (playmode is PlayModeEnum.THEIR_GOAL) or ( + is_left_team and playmode is PlayModeEnum.BEFORE_KICK_OFF + ): + playmode_group = cls.ACTIVE_BEAM + elif (playmode is PlayModeEnum.OUR_GOAL) or ( + not is_left_team and playmode is PlayModeEnum.BEFORE_KICK_OFF + ): + playmode_group = cls.PASSIVE_BEAM + else: + raise NotImplementedError( + f"Not implemented playmode group for playmode {playmode}" + ) + + return playmode_group diff --git a/world/robot.py b/world/robot.py new file mode 100644 index 0000000..9e8d172 --- /dev/null +++ b/world/robot.py @@ -0,0 +1,273 @@ +from abc import ABC, abstractmethod +from typing_extensions import override +import numpy as np +from communication.server import Server + + +class Robot(ABC): + """ + Base class for all robot models. + + This class defines the main structure and common data used by any robot, + such as motor positions, sensors, and control messages. + """ + def __init__(self, agent): + """ + Creates a new robot linked to the given agent. + + Args: + agent: The main agent that owns this robot. + """ + from agent.base_agent import Base_Agent # type hinting + + self.agent: Base_Agent = agent + self.server: Server = self.agent.server + + self.motor_targets: dict = { + motor: {"target_position": 0.0, "kp": 0.0, "kd": 0.0} + for motor in self.ROBOT_MOTORS + } + + self.motor_positions: dict = {motor: 0.0 for motor in self.ROBOT_MOTORS} # degrees + + self.motor_speeds: dict = {motor: 0.0 for motor in self.ROBOT_MOTORS} # degrees/s + + self._global_cheat_orientation = np.array([0, 0, 0, 1]) # quaternion [x, y, z, w] + + self.global_orientation_quat = np.array([0, 0, 0, 1]) # quaternion [x, y, z, w] + + self.global_orientation_euler = np.zeros(3) # euler [roll, pitch, yaw] + + self.gyroscope = np.zeros(3) # angular velocity [roll, pitch, yaw] (degrees/s) + + self.accelerometer = np.zeros(3) # linear acceleration [x, y, z] (m/s²) + + @property + @abstractmethod + def name(self) -> str: + """ + Returns the robot model name. + """ + raise NotImplementedError() + + @property + @abstractmethod + def ROBOT_MOTORS(self) -> tuple[str, ...]: + """ + Returns the list of motor names used by this robot. + """ + raise NotImplementedError() + + def set_motor_target_position( + self, motor_name: str, target_position: float, kp: float = 10, kd: float = 0.1 + ) -> None: + """ + Sets the desired position and PD gains for a given motor. + + For now, directly sets positions, as the simulator is doing the control + Args: + motor_name: Name of the motor. + target_position: Desired position in radians. + kp: Proportional gain. + kd: Derivative gain. + """ + self.motor_targets[motor_name] = { + "target_position": target_position, + "kp": kp, + "kd": kd, + } + + def commit_motor_targets_pd(self) -> None: + """ + Sends all motor target commands to the simulator. + """ + for motor_name, target_description in self.motor_targets.items(): + motor_msg = f'({motor_name} {target_description["target_position"]:.2f} 0.0 {target_description["kp"]:.2f} {target_description["kd"]:.2f} 0.0)' + self.server.commit(motor_msg) + + +class T1(Robot): + """ + Booster T1 + """ + @override + def __init__(self, agent): + super().__init__(agent) + + self.joint_nominal_position = np.array( + [ + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + ] + ) + + @property + @override + def name(self) -> str: + return "T1" + + @property + @override + def ROBOT_MOTORS(self) -> tuple[str, ...]: + return ( + "he1", + "he2", + "lae1", + "lae2", + "lae3", + "lae4", + "rae1", + "rae2", + "rae3", + "rae4", + "te1", + "lle1", + "lle2", + "lle3", + "lle4", + "lle5", + "lle6", + "rle1", + "rle2", + "rle3", + "rle4", + "rle5", + "rle6", + ) + + @property + def MOTOR_FROM_READABLE_TO_SERVER(self) -> dict: + """ + Maps readable joint names to their simulator motor codes. + """ + return { + "Head_yaw": "he1", + "Head_pitch": "he2", + "Left_Shoulder_Pitch": "lae1", + "Left_Shoulder_Roll": "lae2", + "Left_Elbow_Pitch": "lae3", + "Left_Elbow_Yaw": "lae4", + "Right_Shoulder_Pitch": "rae1", + "Right_Shoulder_Roll": "rae2", + "Right_Elbow_Pitch": "rae3", + "Right_Elbow_Yaw": "rae4", + "Waist": "te1", + "Left_Hip_Pitch": "lle1", + "Left_Hip_Roll": "lle2", + "Left_Hip_Yaw": "lle3", + "Left_Knee_Pitch": "lle4", + "Left_Ankle_Pitch": "lle5", + "Left_Ankle_Roll": "lle6", + "Right_Hip_Pitch": "rle1", + "Right_Hip_Roll": "rle2", + "Right_Hip_Yaw": "rle3", + "Right_Knee_Pitch": "rle4", + "Right_Ankle_Pitch": "rle5", + "Right_Ankle_Roll": "rle6", + } + + @property + def MOTOR_SYMMETRY(self) -> list[str, bool]: + """ + Defines pairs of symmetric motors and whether their direction is inverted. + + Returns: + A dictionary where each key is a logical joint group name, + and the value is a tuple (motor_names, inverted). + """ + return { + "Head_yaw": (("Head_yaw",), False), + "Head_pitch": (("Head_pitch",), False), + "Shoulder_Pitch": ( + ( + "Left_Shoulder_Pitch", + "Right_Shoulder_Pitch", + ), + False, + ), + "Shoulder_Roll": ( + ( + "Left_Shoulder_Roll", + "Right_Shoulder_Roll", + ), + True, + ), + "Elbow_Pitch": ( + ( + "Left_Elbow_Pitch", + "Right_Elbow_Pitch", + ), + False, + ), + "Elbow_Yaw": ( + ( + "Left_Elbow_Yaw", + "Right_Elbow_Yaw", + ), + True, + ), + "Waist": (("Waist",), False), + "Hip_Pitch": ( + ( + "Left_Hip_Pitch", + "Right_Hip_Pitch", + ), + False, + ), + "Hip_Roll": ( + ( + "Left_Hip_Roll", + "Right_Hip_Roll", + ), + True, + ), + "Hip_Yaw": ( + ( + "Left_Hip_Yaw", + "Right_Hip_Yaw", + ), + True, + ), + "Knee_Pitch": ( + ( + "Left_Knee_Pitch", + "Right_Knee_Pitch", + ), + False, + ), + "Ankle_Pitch": ( + ( + "Left_Ankle_Pitch", + "Right_Ankle_Pitch", + ), + False, + ), + "Ankle_Roll": ( + ( + "Left_Ankle_Roll", + "Right_Ankle_Roll", + ), + True, + ), + } diff --git a/world/world.py b/world/world.py new file mode 100644 index 0000000..8968a92 --- /dev/null +++ b/world/world.py @@ -0,0 +1,66 @@ +from dataclasses import Field +import numpy as np +from world.commons.other_robot import OtherRobot +from world.commons.field import FIFAField, HLAdultField +from world.commons.play_mode import PlayModeEnum, PlayModeGroupEnum + + +class World: + """ + Represents the current simulation world, containing all relevant + information about the environment, the ball, and the robots. + """ + + MAX_PLAYERS_PER_TEAM = 11 + + def __init__(self, agent, team_name: str, number: int, field_name: str): + """ + Initializes the world state. + + Args: + agent: Reference to the agent that owns this world. + team_name (str): The name of the agent's team. + number (int): The player's number within the team. + field_name (str): The name of the field to initialize + (e.g., 'fifa' or 'hl_adult'). + """ + + from agent.base_agent import Agent # type hinting + + self.agent: Agent = agent + self.team_name: str = team_name + self.number: int = number + self.playmode: PlayModeEnum = PlayModeEnum.NOT_INITIALIZED + self.playmode_group: PlayModeGroupEnum = PlayModeGroupEnum.NOT_INITIALIZED + self.is_left_team: bool = None + self.game_time: float = None + self.server_time: float = None + self.score_left: int = None + self.score_right: int = None + self.their_team_name: str = None + self.last_server_time: str = None + self._global_cheat_position: np.ndarray = np.zeros(3) + self.global_position: np.ndarray = np.zeros(3) + self.ball_pos: np.ndarray = np.zeros(3) + self.is_ball_pos_updated: bool = False + self.our_team_players: list[OtherRobot] = [OtherRobot() for _ in range(self.MAX_PLAYERS_PER_TEAM)] + self.their_team_players: list[OtherRobot] = [OtherRobot(is_teammate=False) for _ in + range(self.MAX_PLAYERS_PER_TEAM)] + self.field: Field = self.__initialize_field(field_name=field_name) + + def update(self) -> None: + """ + Updates the world state + """ + self.playmode_group = PlayModeGroupEnum.get_group_from_playmode( + playmode=self.playmode, is_left_team=self.is_left_team + ) + + def is_fallen(self) -> bool: + return self.global_position[2] < 0.3 + + def __initialize_field(self, field_name: str) -> Field: + if field_name in ('hl_adult', 'hl_adult_2020', 'hl_adult_2019',): + return HLAdultField(world=self) + else: + return FIFAField(world=self) \ No newline at end of file