This commit is contained in:
2026-03-10 09:31:39 -04:00
commit 1f4ab43c3c
52 changed files with 2660 additions and 0 deletions

10
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,10 @@
# Default ignored files
/shelf/
/workspace.xml
# Ignored default folder with query files
/queries/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

8
.idea/SE.iml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="$MODULE_DIR$/../../Anaconda/envs/Isaac" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="false" level="WARNING" enabled_by_default="false" />
</profile>
</component>

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="$PROJECT_DIR$/../../Anaconda/envs/Isaac" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="$PROJECT_DIR$/../../Anaconda/envs/Isaac" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/SE.iml" filepath="$PROJECT_DIR$/.idea/SE.iml" />
</modules>
</component>
</project>

Binary file not shown.

Binary file not shown.

135
agent/agent.py Normal file
View File

@@ -0,0 +1,135 @@
from dataclasses import Field
import logging
from typing import Mapping
import numpy as np
from utils.math_ops import MathOps
from world.commons.field import FIFAField, HLAdultField
from world.commons.play_mode import PlayModeEnum, PlayModeGroupEnum
logger = logging.getLogger()
class Agent:
"""
Responsible for deciding what the agent should do at each moment.
This class is called every simulation step to update the agent's behavior
based on the current state of the world and game conditions.
"""
BEAM_POSES: Mapping[type[Field], Mapping[int, tuple[float, float, float]]] ={
FIFAField: {
1: (2.1, 0, 0),
2: (22.0, 12.0, 0),
3: (22.0, 4.0, 0),
4: (22.0, -4.0, 0),
5: (22.0, -12.0, 0),
6: (15.0, 0.0, 0),
7: (4.0, 16.0, 0),
8: (11.0, 6.0, 0),
9: (11.0, -6.0, 0),
10: (4.0, -16.0, 0),
11: (7.0, 0.0, 0),
},
HLAdultField: {
1: (7.0, 0.0, 0),
2: (2.0, -1.5, 0),
3: (2.0, 1.5, 0),
}
}
def __init__(self, agent):
"""
Creates a new DecisionMaker linked to the given agent.
Args:
agent: The main agent that owns this DecisionMaker.
"""
from agent.base_agent import Base_Agent # type hinting
self.agent: Base_Agent = agent
self.is_getting_up: bool = False
def update_current_behavior(self) -> None:
"""
Chooses what the agent should do in the current step.
This function checks the game state and decides which behavior
or skill should be executed next.
"""
if self.agent.world.playmode is PlayModeEnum.GAME_OVER:
return
if self.agent.world.playmode_group in (
PlayModeGroupEnum.ACTIVE_BEAM,
PlayModeGroupEnum.PASSIVE_BEAM,
):
self.agent.server.commit_beam(
pos2d=self.BEAM_POSES[type(self.agent.world.field)][self.agent.world.number][:2],
rotation=self.BEAM_POSES[type(self.agent.world.field)][self.agent.world.number][2],
)
if self.is_getting_up or self.agent.skills_manager.is_ready(skill_name="GetUp"):
self.is_getting_up = not self.agent.skills_manager.execute(skill_name="GetUp")
elif self.agent.world.playmode is PlayModeEnum.PLAY_ON:
self.carry_ball()
elif self.agent.world.playmode in (PlayModeEnum.BEFORE_KICK_OFF, PlayModeEnum.THEIR_GOAL, PlayModeEnum.OUR_GOAL):
self.agent.skills_manager.execute("Neutral")
else:
self.carry_ball()
self.agent.robot.commit_motor_targets_pd()
def carry_ball(self):
"""
Basic example of a behavior: moves the robot toward the goal while handling the ball.
"""
their_goal_pos = self.agent.world.field.get_their_goal_position()[:2]
ball_pos = self.agent.world.ball_pos[:2]
my_pos = self.agent.world.global_position[:2]
ball_to_goal = their_goal_pos - ball_pos
bg_norm = np.linalg.norm(ball_to_goal)
if bg_norm == 0:
return
ball_to_goal_dir = ball_to_goal / bg_norm
dist_from_ball_to_start_carrying = 0.30
carry_ball_pos = ball_pos - ball_to_goal_dir * dist_from_ball_to_start_carrying
my_to_ball = ball_pos - my_pos
my_to_ball_norm = np.linalg.norm(my_to_ball)
if my_to_ball_norm == 0:
my_to_ball_dir = np.zeros(2)
else:
my_to_ball_dir = my_to_ball / my_to_ball_norm
cosang = np.dot(my_to_ball_dir, ball_to_goal_dir)
cosang = np.clip(cosang, -1.0, 1.0)
angle_diff = np.arccos(cosang)
ANGLE_TOL = np.deg2rad(7.5)
aligned = (my_to_ball_norm > 1e-6) and (angle_diff <= ANGLE_TOL)
behind_ball = np.dot(my_pos - ball_pos, ball_to_goal_dir) < 0
desired_orientation = MathOps.vector_angle(ball_to_goal)
if not aligned or not behind_ball:
self.agent.skills_manager.execute(
"Walk",
target_2d=carry_ball_pos,
is_target_absolute=True,
orientation=None if np.linalg.norm(my_pos - carry_ball_pos) > 2 else desired_orientation
)
else:
self.agent.skills_manager.execute(
"Walk",
target_2d=their_goal_pos,
is_target_absolute=True,
orientation=desired_orientation
)

81
agent/base_agent.py Normal file
View File

@@ -0,0 +1,81 @@
import logging
from agent.agent import Agent
from world.robot import T1, Robot
from behaviors.behavior_manager import BehaviorManager
from world.world import World
from communication.server import Server
from communication.world_parser import WorldParser
logger = logging.getLogger(__file__)
class Base_Agent:
def __init__(
self,
team_name: str = "Default",
number: int = 1,
host: str = "localhost",
port: int = 60000,
field: str = 'fifa'
):
"""
Initializes the agent and all its main components.
Args:
team_name (str): The name of the team the agent belongs to.
number (int): The player number assigned to this agent.
host (str): The host address of the simulator server.
port (int): The port number of the simulator server.
field (str): The name of the field configuration to use.
"""
self.world: World = World(agent=self, team_name=team_name, number=number, field_name=field)
self.world_parser: WorldParser = WorldParser(agent=self)
self.server: Server = Server(
host=host, port=port, world_parser=self.world_parser
)
self.robot: Robot = T1(agent=self)
self.skills_manager: BehaviorManager = BehaviorManager(agent=self)
self.decision_maker: Agent = Agent(agent=self)
def run(self):
"""
Starts the agents main control loop.
This method:
1. Connects to the simulator server.
2. Sends the initial configuration (init message).
3. Enters the main loop, where it:
- Receives and parses world updates.
- Updates internal world representation.
- Executes the decision-making process.
- Sends the next set of commands to the server.
"""
self.server.connect()
self.server.send_immediate(
f"(init {self.robot.name} {self.world.team_name} {self.world.number})"
)
while True:
try:
self.server.receive()
self.world.update()
self.decision_maker.update_current_behavior()
self.server.send()
except Exception:
self.shutdown()
raise
def shutdown(self):
"""
Safely shuts down the agent.
Logs a shutdown message and closes the server connection.
"""
logger.info("Shutting down.")
self.server.shutdown()

Binary file not shown.

Binary file not shown.

45
behaviors/behavior.py Normal file
View File

@@ -0,0 +1,45 @@
from abc import ABC, abstractmethod
class Behavior(ABC):
"""
Base interface for skills.
Each skill must implement:
- execute(reset: bool, *args) -> bool
- is_ready(*args) -> bool
"""
def __init__(self, agent):
from agent.base_agent import Base_Agent # type hinting
self.agent: Base_Agent = agent
@abstractmethod
def execute(self, reset: bool, *args, **kwargs) -> bool:
"""
Executes one step of the skill.
Parameters
----------
reset : bool
Indicates if this is the first invocation of this skill (should reset internal state).
*args
Skill-specific arguments.
Returns
-------
finished : bool
True if the skill has finished execution.
"""
raise NotImplementedError("Method 'execute' must be implemented in the Skill.")
@abstractmethod
def is_ready(self, *args) -> bool:
"""
Checks if the current conditions allow starting the skill.
Returns
-------
ready : bool
"""
raise NotImplementedError("Method 'is_ready' must be implemented in the Skill.")

View File

@@ -0,0 +1,79 @@
from behaviors.custom.keyframe.get_up.get_up import GetUp
from behaviors.custom.keyframe.keyframe import KeyframeSkill
from behaviors.custom.keyframe.poses.neutral.neutral import Neutral
from behaviors.behavior import Behavior
from behaviors.custom.reinforcement.walk.walk import Walk
class BehaviorManager:
def __init__(self, agent) -> None:
self.current_skill_name: str | None = None
self.previous_skill_name: str | None = None
self.current_sub_skill_name: str | None = None
self.skills: dict[str, Behavior] = {}
self.agent = agent
self.create_skills()
def create_skills(self):
"""
Loads all Skill classes and instantiates them.
Each skill is indexed by its class name.
"""
classes: list[type[Behavior]] = [Walk, Neutral, GetUp]
# instantiate each Skill and store in the skills dictionary
self.skills = {cls.__name__: cls(agent=self.agent) for cls in classes}
def get_skill_object(self, name: str) -> Behavior:
"""Returns the Skill instance corresponding to the given name."""
if name not in self.skills:
raise KeyError(f"No skill found with the name '{name}'")
return self.skills[name]
def execute(self, skill_name: str, *args, **kwargs) -> bool:
"""
Executes one step of the specified skill.
- On the first call for a skill, `reset=True` is sent.
- Returns True when the skill has finished execution.
"""
skill = self.get_skill_object(skill_name)
# detect if the skill has changed to trigger automatic reset
reset = self.current_skill_name != skill_name
if reset:
# previous skill was interrupted
if self.current_skill_name is not None:
self.previous_skill_name = self.current_skill_name
self.current_skill_name = skill_name
# call the Skill's own execute method
finished = skill.execute(reset, *args, **kwargs)
if not finished:
return False
# skill finished execution
self.previous_skill_name = self.current_skill_name
self.current_skill_name = None
return True
def execute_sub_skill(self, skill_name: str, reset: bool, *args, **kwargs) -> bool:
"""
Executes a step of a sub-skill within another skill.
- Does not change `current_skill_name`.
- `current_sub_skill_name` reflects the sub-skill currently being executed.
"""
skill = self.get_skill_object(skill_name)
self.current_sub_skill_name = skill_name
return skill.execute(reset, *args, **kwargs)
def is_ready(self, skill_name: str, *args, **kwargs) -> bool:
"""
Checks if the specified skill is ready to start based on current conditions.
"""
skill = self.get_skill_object(skill_name)
return skill.is_ready(*args, **kwargs)

View File

@@ -0,0 +1,66 @@
from collections import deque
import logging
import os
from behaviors.behavior import Behavior
from behaviors.custom.keyframe.keyframe import KeyframeSkill
logger = logging.getLogger()
class GetUp(Behavior):
STABILITY_THRESHOLD_CYCLES: int = 3
NEUTRAL_EXECUTION_TIME: float = 1.5
def __init__(self, agent):
super().__init__(agent)
self.get_up_front = KeyframeSkill(
agent=agent,
file=os.path.join(os.path.dirname(__file__), "get_up_front.yaml"),
)
self.get_up_back = KeyframeSkill(
agent=agent,
file=os.path.join(os.path.dirname(__file__), "get_up_back.yaml"),
)
def execute(self, reset, *args, **kwargs):
robot = self.agent.robot
if reset:
self.neutral_start_time = None
self.has_get_up = False
self.gyro_queue = deque(maxlen=self.STABILITY_THRESHOLD_CYCLES)
self.state = 0
self.chosen_get_up = None
self.should_reset_get_up = True
if not self.has_get_up:
if not self.chosen_get_up and self.agent.skills_manager.execute_sub_skill("Neutral", True):
self.gyro_queue.append(max(abs(robot.gyroscope)))
if len(self.gyro_queue) == self.STABILITY_THRESHOLD_CYCLES and all(g < 2.5 for g in self.gyro_queue):
if abs(robot.accelerometer[1]) < 2 and abs(robot.accelerometer[2]) < 3:
if robot.accelerometer[0] < -8:
self.chosen_get_up = self.get_up_front
elif robot.accelerometer[0] > 8:
self.chosen_get_up = self.get_up_back
if self.chosen_get_up:
self.has_get_up = self.chosen_get_up.execute(reset=self.should_reset_get_up)
self.should_reset_get_up = False
else:
if not self.neutral_start_time:
self.neutral_start_time = self.agent.world.server_time
neutral_elapsed_time = (
self.agent.world.server_time - self.neutral_start_time
)
if neutral_elapsed_time < self.NEUTRAL_EXECUTION_TIME:
self.agent.skills_manager.execute_sub_skill(
"Walk", reset=neutral_elapsed_time <= 1e-6, target_2d=(0, 0), is_target_absolute=False
)
else:
return True
return False
def is_ready(self, *args):
return self.agent.world.is_fallen()

View File

@@ -0,0 +1,94 @@
symmetry: true
kp: 75
kd: 1
keyframes:
- delta: 1
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: -114.592
Shoulder_Roll: -71.619
Elbow_Pitch: -85.944
Elbow_Yaw: -171.887
Waist: 0.000
Hip_Pitch: -45.836
Hip_Roll: 179.908
Hip_Yaw: 179.908
Knee_Pitch: -179.908
Ankle_Pitch: 89.954
Ankle_Roll: 89.954
kp: 250
kd: 1
p_gains:
Hip_Pitch: 20
Hip_Roll: 20
Hip_Yaw: 20
Knee_Pitch: 20
d_gains:
Hip_Pitch: 1
Hip_Roll: 1
Hip_Yaw: 1
Knee_Pitch: 1
- delta: 0.8125
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: -114.592
Shoulder_Roll: -89.954
Elbow_Pitch: -85.944
Elbow_Yaw: 0.000
Waist: 0.000
Hip_Pitch: -45.836
Hip_Roll: 179.908
Hip_Yaw: 179.908
Knee_Pitch: -179.908
Ankle_Pitch: 63.026
Ankle_Roll: 45.836
p_gains:
Hip_Pitch: 25
Hip_Roll: 25
Hip_Yaw: 25
Knee_Pitch: 25
- delta: 0.6
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: 89.954
Shoulder_Roll: -89.954
Elbow_Pitch: 0.000
Elbow_Yaw: 0.000
Waist: 0.000
Hip_Pitch: 20.054
Hip_Roll: 11.459
Hip_Yaw: 0.000
Knee_Pitch: -42.971
Ankle_Pitch: 57.296
Ankle_Roll: 0.000
kp: 40
kd: 4
p_gains:
Ankle_Pitch: 100
Ankle_Roll: 100
- delta: 0.25
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: 0.000
Shoulder_Roll: -77.363
Elbow_Pitch: -85.944
Elbow_Yaw: 0.000
Waist: 0.000
Hip_Pitch: 20.054
Hip_Roll: 7.162
Hip_Yaw: 0.000
Knee_Pitch: -42.971
Ankle_Pitch: 0.000
Ankle_Roll: 0.000
kp: 200
kd: 3.5

View File

@@ -0,0 +1,91 @@
symmetry: true
kp: 75
kd: 1
keyframes:
- delta: 0.1
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: -89.954
Shoulder_Roll: -71.619
Elbow_Pitch: 0.000
Elbow_Yaw: -114.592
Waist: 0.000
Hip_Pitch: 0.000
Hip_Roll: 14.324
Hip_Yaw: 0.000
Knee_Pitch: 0.000
Ankle_Pitch: 0.000
Ankle_Roll: 0.000
- delta: 0.5
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: 57.296
Shoulder_Roll: -71.619
Elbow_Pitch: -85.944
Elbow_Yaw: -171.887
Waist: 0.000
Hip_Pitch: 0.000
Hip_Roll: 14.324
Hip_Yaw: 0.000
Knee_Pitch: -28.648
Ankle_Pitch: 0.000
Ankle_Roll: 0.000
- delta: 0.8125
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: 89.954
Shoulder_Roll: -89.954
Elbow_Pitch: -85.944
Elbow_Yaw: 0.000
Waist: 0.000
Hip_Pitch: 200.536
Hip_Roll: 143.239
Hip_Yaw: 114.592
Knee_Pitch: -171.887
Ankle_Pitch: 89.954
Ankle_Roll: 24.919
- delta: 0.45
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: 89.954
Shoulder_Roll: -89.954
Elbow_Pitch: -85.944
Elbow_Yaw: 0.000
Waist: 0.000
Hip_Pitch: 20.054
Hip_Roll: 11.459
Hip_Yaw: 0.000
Knee_Pitch: -42.972
Ankle_Pitch: 0.000
Ankle_Roll: 0.000
kp: 50
kd: 2
p_gains:
Ankle_Pitch: 50
Ankle_Roll: 50
- delta: 0.13
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: 0.000
Shoulder_Roll: -77.363
Elbow_Pitch: -85.944
Elbow_Yaw: 0.000
Waist: 0.000
Hip_Pitch: 20.054
Hip_Roll: 7.162
Hip_Yaw: 0.000
Knee_Pitch: -42.972
Ankle_Pitch: 0.000
Ankle_Roll: 0.000
kp: 200
kd: 1.95

View File

@@ -0,0 +1,89 @@
from behaviors.behavior import Behavior
import yaml
class KeyframeSkill(Behavior):
def __init__(self, agent, file: str):
super().__init__(agent)
self.keyframe_step: int = 0
self.keyframe_start_time: float = None
with open(f"{file}", "r") as f:
self.skill_description = yaml.safe_load(f)
self.has_symmetry: bool = self.skill_description["symmetry"]
self.keyframes: list = self.skill_description["keyframes"]
self.kp: float = self.skill_description["kp"]
self.kd: float = self.skill_description["kd"]
def execute(self, reset, *args, **kwargs):
if reset:
self.keyframe_step = 0
self.keyframe_start_time = self.agent.world.server_time
current_keyframe = self.keyframes[self.keyframe_step]
raw_motor_positions = current_keyframe["motor_positions"]
current_keyframe_kp = current_keyframe.get("kp", self.kp)
current_keyframe_kd = current_keyframe.get("kd", self.kd)
p_gains = current_keyframe.get("p_gains", None)
d_gains = current_keyframe.get("d_gains", None)
for readable_motor_name, position in raw_motor_positions.items():
p_gain = (
p_gains.get(readable_motor_name, current_keyframe_kp)
if p_gains
else current_keyframe_kp
)
d_gain = (
d_gains.get(readable_motor_name, current_keyframe_kd)
if d_gains
else current_keyframe_kd
)
if self.has_symmetry:
motor_names, is_inverse_direction = self.agent.robot.MOTOR_SYMMETRY[
readable_motor_name
]
for motor_name in motor_names:
server_motor_name = self.agent.robot.MOTOR_FROM_READABLE_TO_SERVER[
motor_name
]
self.agent.robot.set_motor_target_position(
motor_name=server_motor_name,
target_position=position if is_inverse_direction else -position,
kp=p_gain,
kd=d_gain,
)
if is_inverse_direction:
is_inverse_direction = False # Only inverts one joint
else:
server_motor_name = self.agent.robot.MOTOR_FROM_READABLE_TO_SERVER[
readable_motor_name
]
self.agent.robot.set_motor_target_position(
motor_name=server_motor_name,
target_position=position,
kp=self.kp,
kd=self.kd,
)
keyframe_time: float = current_keyframe["delta"]
elapsed_keyframe_time = self.agent.world.server_time - self.keyframe_start_time
if elapsed_keyframe_time >= keyframe_time:
self.keyframe_start_time = self.agent.world.server_time
self.keyframe_step += 1
if self.keyframe_step >= len(self.keyframes):
return True
return False
def is_ready(self, *args):
return True

View File

@@ -0,0 +1,7 @@
import os
from behaviors.custom.keyframe.keyframe import KeyframeSkill
class Neutral(KeyframeSkill):
def __init__(self, agent):
super().__init__(agent, os.path.join(os.path.dirname(__file__), "neutral.yaml"))

View File

@@ -0,0 +1,19 @@
symmetry: true
kp: 150
kd: 1
keyframes:
- delta: 0.0
motor_positions:
Head_yaw: 0.000
Head_pitch: 0.000
Shoulder_Pitch: 0.000
Shoulder_Roll: -89.954
Elbow_Pitch: 0.000
Elbow_Yaw: 0.000
Waist: 0.000
Hip_Pitch: 0.000
Hip_Roll: 0.000
Hip_Yaw: 0.000
Knee_Pitch: 0.000
Ankle_Pitch: 0.000
Ankle_Roll: 0.000

Binary file not shown.

View File

@@ -0,0 +1,147 @@
import math
import os
import numpy as np
from behaviors.behavior import Behavior
from utils.math_ops import MathOps
from utils.neural_network import run_network, load_network
from scipy.spatial.transform import Rotation as R
class Walk(Behavior):
def __init__(self, agent):
super().__init__(agent)
self.joint_nominal_position = np.array(
[
0.0,
0.0,
0.0,
1.4,
0.0,
-0.4,
0.0,
-1.4,
0.0,
0.4,
0.0,
-0.4,
0.0,
0.0,
0.8,
-0.4,
0.0,
0.4,
0.0,
0.0,
-0.8,
0.4,
0.0,
]
)
self.train_sim_flip = np.array(
[
1.0,
-1.0,
1.0,
-1.0,
-1.0,
1.0,
-1.0,
-1.0,
1.0,
1.0,
1.0,
1.0,
-1.0,
-1.0,
1.0,
1.0,
-1.0,
-1.0,
-1.0,
-1.0,
-1.0,
-1.0,
-1.0,
]
)
self.scaling_factor = 0.5
self.previous_action = np.zeros(len(self.agent.robot.ROBOT_MOTORS))
self.model = load_network(model_path=os.path.join(os.path.dirname(__file__), "walk.onnx"))
def execute(self, reset: bool, target_2d: list, is_target_absolute: bool, orientation: float=None, is_orientation_absolute: bool=True) -> bool:
robot = self.agent.robot
world = self.agent.world
velocity = None
if is_target_absolute:
raw_target = target_2d - world.global_position[:2]
velocity = MathOps.rotate_2d_vec(raw_target, -robot.global_orientation_euler[2], is_rad=False)
else:
velocity = target_2d
rel_orientation = None
if orientation is None:
rel_orientation = MathOps.vector_angle(velocity) * 0.3
elif is_orientation_absolute:
rel_orientation = MathOps.normalize_deg(orientation - robot.global_orientation_euler[2])
else:
rel_orientation = orientation * 0.3
rel_orientation = np.clip(rel_orientation, -0.25, 0.25)
velocity = np.concatenate([velocity, np.array([rel_orientation])], axis=0)
velocity[0] = np.clip(velocity[0], -0.5, 0.5)
velocity[1] = np.clip(velocity[1], -0.25, 0.25)
radian_joint_positions = np.deg2rad(list(robot.motor_positions.values()))
radian_joint_speeds = np.deg2rad(list(robot.motor_speeds.values()))
qpos_qvel_previous_action = np.vstack(
(
[
(
(radian_joint_positions * self.train_sim_flip)
- self.joint_nominal_position
)
/ 4.6,
radian_joint_speeds / 110.0 * self.train_sim_flip,
self.previous_action / 10.0,
]
)
).T.flatten()
ang_vel = np.clip(np.deg2rad(robot.gyroscope) / 50.0, -1.0, 1.0)
orientation_quat_inv = R.from_quat(robot._global_cheat_orientation).inv()
projected_gravity = orientation_quat_inv.apply(np.array([0.0, 0.0, -1.0]))
#[0.5,0.25,0.25]
observation = np.concatenate(
[
qpos_qvel_previous_action,
ang_vel,
velocity,
projected_gravity,
]
)
observation = np.clip(observation, -10.0, 10.0)
nn_action = run_network(obs=observation, model=self.model)
target_joint_positions = (
self.joint_nominal_position + self.scaling_factor * nn_action
)
target_joint_positions *= self.train_sim_flip
self.previous_action = nn_action
for idx, target in enumerate(target_joint_positions):
robot.set_motor_target_position(
robot.ROBOT_MOTORS[idx], target*180/math.pi, kp=25, kd=0.6
)
def is_ready(self, *args, **kwargs) -> bool:
return True

78
build_binary.sh Executable file
View File

@@ -0,0 +1,78 @@
#!/bin/bash
set -e
if [ -z "$1" ]; then
echo "Error: Team name not provided."
echo "Usage: $0 <TEAM_NAME>"
exit 1
fi
TEAM_NAME="$1"
BUILD_DIR="./build"
DIST_DIR="$BUILD_DIR/dist"
WORK_DIR="$BUILD_DIR/build"
ONEFILE="--onefile"
rm -rf "$BUILD_DIR"
mkdir -p "$BUILD_DIR"
echo "Building executable with PyInstaller..."
pyinstaller \
--add-data './behaviors/custom/keyframe:behaviors/custom/keyframe' \
--add-data './behaviors/custom/reinforcement/walk/*.onnx:behaviors/custom/reinforcement/walk' \
${ONEFILE} \
--distpath "$DIST_DIR" \
--workpath "$WORK_DIR" \
--noconfirm \
--name "${TEAM_NAME,,}" \
./run_player.py
echo "Creating start.sh..."
cat > "${DIST_DIR}/start.sh" << EOF
#!/bin/bash
export OMP_NUM_THREADS=1
host=\${1:-localhost}
port=\${2:-60000}
for i in {1..11}; do
./$(echo "${TEAM_NAME,,}") -t ${TEAM_NAME} -n \$i --host \$host --port \$port &
done
EOF
echo "Creating kill.sh..."
cat > "${DIST_DIR}/kill.sh" << EOF
#!/bin/bash
pkill -9 -e $(echo "${TEAM_NAME,,}")
EOF
chmod a+x "${DIST_DIR}/start.sh"
chmod a+x "${DIST_DIR}/kill.sh"
#TEMPORARY!
echo "Creating start3v3.sh..."
cat > "${DIST_DIR}/start3v3.sh" << EOF
#!/bin/bash
export OMP_NUM_THREADS=1
host=\${1:-localhost}
port=\${2:-60000}
for i in {1..3}; do
./$(echo "${TEAM_NAME,,}") -t ${TEAM_NAME} -f hl_adult -n \$i --host \$host --port \$port &
done
EOF
chmod a+x "${DIST_DIR}/start3v3.sh"
echo "Packing into ${BUILD_DIR}/${TEAM_NAME}.tar.gz..."
tar -czf "${BUILD_DIR}/${TEAM_NAME}.tar.gz" -C "$DIST_DIR" . --transform "s|^|${TEAM_NAME}/|"
echo "Removing build files..."
rm -rf $DIST_DIR
rm -rf $WORK_DIR
echo "Removed build files!"
echo "Generated file: ${BUILD_DIR}/${TEAM_NAME}.tar.gz"
echo "Build completed successfully!"

Binary file not shown.

106
communication/server.py Normal file
View File

@@ -0,0 +1,106 @@
import logging
import socket
from select import select
from communication.world_parser import WorldParser
logger = logging.getLogger(__file__)
class Server:
def __init__(self, host: str, port: int, world_parser: WorldParser):
self.world_parser: WorldParser = world_parser
self.__host: str = host
self.__port: str = port
self.__socket: socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.__send_buff = []
self.__rcv_buffer_size = 1024
self.__rcv_buffer = bytearray(self.__rcv_buffer_size)
def connect(self) -> None:
logger.info("Connecting to server at %s:%d...", self.__host, self.__port)
while True:
try:
self.__socket.connect((self.__host, self.__port))
break
except ConnectionRefusedError:
logger.error(
"Connection refused. Make sure the server is running and listening on {self.__host}:{self.__port}."
)
logger.info(f"Server connection established to {self.__host}:{self.__port}.")
def shutdown(self) -> None:
self.__socket.close()
self.__socket.shutdown(socket.SHUT_RDWR)
def send_immediate(self, msg: str) -> None:
"""
Sends only the desired message, without the buffer
"""
try:
self.__socket.send(
(len(msg)).to_bytes(4, byteorder="big") + msg.encode()
) # Add message length in the first 4 bytes
except BrokenPipeError:
print("\nError: socket was closed by rcssserver3d!")
exit()
def send(self) -> None:
"""
Send all committed messages
"""
if len(select([self.__socket], [], [], 0.0)[0]) == 0:
self.send_immediate(("".join(self.__send_buff)))
else:
logger.info("Server_Comm.py: Received a new packet while thinking!")
self.__send_buff = []
def commit(self, msg: str) -> None:
"""
Appends message to buffer
"""
self.__send_buff.append(msg)
def commit_and_send(self, msg: str = "") -> None:
"""
Appends a message to buffer and then sends the buffer
"""
self.commit(msg)
self.send()
def receive(self) -> None:
"""
Receive the next message from the TCP/IP socket and updates world
"""
# Receive message length information
if (
self.__socket.recv_into(
self.__rcv_buffer, nbytes=4, flags=socket.MSG_WAITALL
)
!= 4
):
raise ConnectionResetError
msg_size = int.from_bytes(self.__rcv_buffer[:4], byteorder="big", signed=False)
# Ensure receive buffer is large enough to hold the message
if msg_size > self.__rcv_buffer_size:
self.__rcv_buffer_size = msg_size
self.__rcv_buffer = bytearray(self.__rcv_buffer_size)
# Receive message with the specified length
if (
self.__socket.recv_into(
self.__rcv_buffer, nbytes=msg_size, flags=socket.MSG_WAITALL
)
!= msg_size
):
raise ConnectionResetError
self.world_parser.parse(message=self.__rcv_buffer[:msg_size].decode())
def commit_beam(self, pos2d: list, rotation: float) -> None:
assert len(pos2d) == 2
self.commit(f"(beam {pos2d[0]} {pos2d[1]} {rotation})")

View File

@@ -0,0 +1,255 @@
import logging
import re
import numpy as np
from scipy.spatial.transform import Rotation as R
from utils.math_ops import MathOps
from world.commons.play_mode import PlayModeEnum
logger = logging.getLogger()
class WorldParser:
def __init__(self, agent):
from agent.base_agent import Base_Agent # type hinting
self.agent: Base_Agent = agent
def parse(self, message: str) -> None:
perception_dict: dict = self.__sexpression_to_dict(message)
world = self.agent.world
# Game parse
if world.is_left_team is None:
world.is_left_team = (
True
if perception_dict["GS"]["tl"] == world.team_name
else False if perception_dict["GS"]["tr"] == world.team_name else None
)
world.playmode = PlayModeEnum.get_playmode_from_string(
playmode=perception_dict["GS"]["pm"], is_left_team=world.is_left_team
)
world.game_time = perception_dict["GS"]["t"]
world.score_left = perception_dict["GS"]["sl"]
world.score_right = perception_dict["GS"]["sr"]
left_team_name: str = perception_dict["GS"].get("tl", None)
right_team_name: str = perception_dict["GS"].get("tr", None)
if left_team_name and right_team_name:
world.their_team_name = (
right_team_name if world.is_left_team else left_team_name
)
world.last_server_time = world.server_time
world.server_time = perception_dict["time"]["now"]
# Robot parse
robot = self.agent.robot
robot.motor_positions = {h["n"]: h["ax"] for h in perception_dict["HJ"]}
robot.motor_speeds = {h["n"]: h["vx"] for h in perception_dict["HJ"]}
world._global_cheat_position = np.array(perception_dict["pos"]["p"])
# changes quaternion from (w, x, y, z) to (x, y, z, w)
robot._global_cheat_orientation = np.array(perception_dict["quat"]["q"])
robot._global_cheat_orientation = robot._global_cheat_orientation[[1, 2, 3, 0]]
# flips 180 deg considering team side
try:
if not world.is_left_team:
world._global_cheat_position[:2] = -world._global_cheat_position[:2]
global_rotation = R.from_quat(robot.global_orientation_quat)
yaw180 = R.from_euler('z', 180, degrees=True)
fixed_rotation = yaw180 * global_rotation
robot._global_cheat_orientation = fixed_rotation.as_quat()
# updates global orientation
euler_angles_deg = R.from_quat(robot._global_cheat_orientation).as_euler('xyz', degrees=True)
robot.global_orientation_euler = np.array(
[MathOps.normalize_deg(axis_angle) for axis_angle in euler_angles_deg])
robot.global_orientation_quat = robot._global_cheat_orientation
world.global_position = world._global_cheat_position
except:
logger.exception(f'Failed to rotate orientation and position considering team side')
robot.gyroscope = np.array(perception_dict["GYR"]["rt"])
robot.accelerometer = np.array(perception_dict["ACC"]["a"])
world.is_ball_pos_updated = False
# Vision parse
if 'See' in perception_dict:
for seen_object in perception_dict['See']:
obj_type = seen_object['type']
if obj_type == 'B': # Ball
polar_coords = np.array(seen_object['pol'])
local_cartesian_3d = MathOps.deg_sph2cart(polar_coords)
world.ball_pos = MathOps.rel_to_global_3d(
local_pos_3d=local_cartesian_3d,
global_pos_3d=world.global_position,
global_orientation_quat=robot.global_orientation_quat
)
world.is_ball_pos_updated = True
elif obj_type == "P":
team = seen_object.get('team')
player_id = seen_object.get('id')
if team and player_id is not None:
if (team == world.team_name):
player = world.our_team_players[player_id - 1]
else:
player = world.their_team_players[player_id - 1]
objects = [seen_object.get('head'), seen_object.get('l_foot'), seen_object.get('r_foot')]
seen_objects = [object for object in objects if object]
if seen_objects:
local_cartesian_seen_objects = [MathOps.deg_sph2cart(object) for object in seen_objects]
approximated_centroid = np.mean(local_cartesian_seen_objects, axis=0)
player.position = MathOps.rel_to_global_3d(
local_pos_3d=approximated_centroid,
global_pos_3d=world.global_position,
global_orientation_quat=robot._global_cheat_orientation
)
player.last_seen_time = world.server_time
elif obj_type:
polar_coords = np.array(seen_object['pol'])
world.field.field_landmarks.update_from_perception(
landmark_id=obj_type,
landmark_pos=polar_coords
)
def __sexpression_to_dict(self, sexpression: str) -> dict:
"""
Parses a sensor data string of nested parenthesis groups into a structured dictionary.
Repeated top-level tags are aggregated into lists.
"""
def split_top_level(s: str):
"""Return a list of substrings that are top-level parenthesized groups."""
groups = []
depth = 0
start = None
for i, ch in enumerate(s):
if ch == '(':
if depth == 0:
start = i
depth += 1
elif ch == ')':
depth -= 1
if depth == 0 and start is not None:
groups.append(s[start:i + 1])
start = None
return groups
result = {}
top_groups = split_top_level(sexpression)
for grp in top_groups:
m = re.match(r'^\((\w+)\s*(.*)\)$', grp, re.DOTALL)
if not m:
continue
tag = m.group(1)
inner = m.group(2).strip()
if tag == "See":
see_items = []
subs = split_top_level(inner)
for sub in subs:
sm = re.match(r'^\((\w+)\s*(.*)\)$', sub, re.DOTALL)
if not sm:
continue
obj_type = sm.group(1)
inner2 = sm.group(2)
if obj_type == "P": # Player
player_data = {"type": "P"}
team_m = re.search(r'\(team\s+([^)]+)\)', inner2)
if team_m:
player_data["team"] = team_m.group(1)
id_m = re.search(r'\(id\s+([^)]+)\)', inner2)
if id_m:
try:
player_data["id"] = int(id_m.group(1))
except ValueError:
player_data["id"] = id_m.group(1)
parts = re.findall(r'\((\w+)\s*\(pol\s+([-0-9.\s]+)\)\)', inner2)
for part_name, pol_str in parts:
pol_vals = [float(x) for x in pol_str.strip().split()]
player_data[part_name] = pol_vals
see_items.append(player_data)
continue
# Generic
pol_m = re.search(r'\(pol\s+([-0-9.\s]+)\)', inner2)
vals = [float(x) for x in pol_m.group(1).strip().split()] if pol_m else []
see_items.append({"type": obj_type, "pol": vals})
result.setdefault("See", []).extend(see_items)
continue
# Generic parse for other tags (time, GS, quat, pos, HJ, ...)
group = {}
children = split_top_level(inner)
if children: # (key val1 val2)
for child in children:
im = re.match(r'^\(\s*(\w+)\s+([^)]+)\)$', child.strip(), re.DOTALL)
if not im:
continue
key = im.group(1)
vals = im.group(2).strip().split()
parsed = []
for t in vals:
try:
parsed.append(float(t))
except ValueError:
parsed.append(t)
group[key] = parsed[0] if len(parsed) == 1 else parsed
else:
# search pairs (key vals...)
items = re.findall(r"\(\s*(\w+)((?:\s+[^()]+)+)\)", inner)
for key, vals in items:
tokens = vals.strip().split()
parsed_vals = []
for t in tokens:
try:
parsed_vals.append(float(t))
except ValueError:
parsed_vals.append(t)
# Single value vs. list
group[key] = parsed_vals[0] if len(parsed_vals) == 1 else parsed_vals
# Merge into result, handling repeated tags as lists
if tag in result:
if isinstance(result[tag], list):
result[tag].append(group)
else:
result[tag] = [result[tag], group]
else:
result[tag] = group
return result

1
kill.sh Executable file
View File

@@ -0,0 +1 @@
pkill -9 -e -f "python3 run_player"

27
pyproject.toml Executable file
View File

@@ -0,0 +1,27 @@
[project]
name = "mujococodebase"
version = "0.1.0"
description = ""
authors = [
{name = "Alan Nascimento",email = "alanvan3791@gmail.com"}
]
readme = "README.md"
requires-python = ">=3.13,<3.14"
dependencies = [
"black (>=25.1.0,<26.0.0)",
"flake8 (>=7.3.0,<8.0.0)",
"numpy (>=2.3.3,<3.0.0)",
"logging (>=0.4.9.6,<0.5.0.0)",
"scipy (>=1.16.1,<2.0.0)",
"pylint (>=3.3.8,<4.0.0)",
"pyyaml (>=6.0.2,<7.0.0)",
"pyinstaller (>=6.16.0,<7.0.0)",
"onnxruntime (>=1.23.0,<2.0.0)",
"onnx (>=1.19.0,<2.0.0)",
"onnxruntime-gpu (>=1.23.0,<2.0.0)",
]
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

88
readme.md Normal file
View File

@@ -0,0 +1,88 @@
# Apollo Mujoco base code for training on CPU
This is a Python-based base code developed for the RCSSServerMJ. It was created to simplify the onboarding process for new teams joining the RoboCup 3D Soccer Simulation League using the Mujoco Simulator.
This code was influenced by the early demonstrations from MagmaOffenburg team of a client for the RCSSServerMJ, and the FCPortugal base code for the SimSpark simulator.
## Installation
### Make sure the following are installed on your system:
- Python ≥ 3.13
- Any Python dependency manager can be used, but **either Poetry is recommended**.
- **Poetry ≥ 2.0.0** ([Installation Guide](https://python-poetry.org/docs/#installing-with-pipx))
### Install Dependencies
The project dependencies are listed inside pyproject.toml
Using **Poetry**:
```bash
poetry install
```
## Instructions
### Run an agent
After installing the dependencies and setting up the environment, you can launch a player instance:
```bash
python3 run_player.py -n <player-number> -t <team-name>
```
Using **Poetry**:
```bash
poetry run python run_player.py -n <player-number> -t <team-name>
```
CLI parameter (a usage help is also available):
- `--host <ip>` to specify the host IP (default: 'localhost')
- `--port <port>` to specify the agent port (default: 60000)
- `-n <number>` Player number (111) (default: 1)
- `-t <team_name>` Team name (default: 'Default')
### Run a team
You can also use a shell script to start the entire team, optionally specifying host and port:
```bash
./start.sh [host] [port]
```
Using **Poetry**:
```bash
poetry run ./start.sh [host] [port]
```
CLI parameter:
- `[host]` Server IP address (default: 'localhost')
- `[port]` Server port for agents (default: 60000)
### Binary building
To compete, a binary is needed. It provides a compact, portable version and protects the source code. To create a binary, just run the script ```build_binary.sh```
```bash
./build_binary.sh <team-name>
```
Using **Poetry**:
```bash
poetry run ./build_binary.sh <team-name>
```
Once binary generation is finished, the result will be inside the build folder, as ```<team-name>.tar.gz```
### Authors and acknowledgment
This project was developed and contributed by:
- **Chenxi Liu**
- **Xuehao Xu**
### Thanks for
- **Alan Nascimento**
- **Luís Magalhães**
- **Pedro Rabelo**
- **Melissa Damasceno**
Contributions, bug reports, and feature requests are welcome via pull requests.

31
run_player.py Executable file
View File

@@ -0,0 +1,31 @@
import argparse
import logging
from agent.base_agent import Base_Agent
ch = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s (%(filename)s:%(lineno)d) - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
ch.setLevel(logging.INFO)
logging.basicConfig(handlers=[ch], level=logging.DEBUG)
parser = argparse.ArgumentParser(description="Start a MuJoCo Agent.")
parser.add_argument("-t", "--team", type=str, default="Default", help="Team name")
parser.add_argument("-n", "--number", type=int, default=1, help="Player number")
parser.add_argument("--host", type=str, default="127.0.0.1", help="Server host")
parser.add_argument("--port", type=int, default=60000, help="Server port")
parser.add_argument("-f", "--field", type=str, default='fifa', help="Field to be played")
args = parser.parse_args()
player = Base_Agent(
team_name=args.team,
number=args.number,
host=args.host,
port=args.port,
field=args.field
)
player.run()

9
start.sh Executable file
View File

@@ -0,0 +1,9 @@
#!/bin/bash
export OMP_NUM_THREADS=1
host=${1:-localhost}
port=${2:-60000}
for i in {1..11}; do
python3 run_player.py --host $host --port $port -n $i -t SE &
done

9
start3v3.sh Executable file
View File

@@ -0,0 +1,9 @@
#!/bin/bash
export OMP_NUM_THREADS=1
host=${1:-localhost}
port=${2:-60000}
for i in {1..3}; do
python3 run_player.py --host $host --port $port -n $i -t SE -f hl_adult &
done

Binary file not shown.

Binary file not shown.

427
utils/math_ops.py Normal file
View File

@@ -0,0 +1,427 @@
from math import acos, asin, atan2, cos, pi, sin, sqrt, isnan
from scipy.spatial.transform import Rotation
import numpy as np
import sys
try:
GLOBAL_DIR = sys._MEIPASS # temporary folder with libs & data files
except:
GLOBAL_DIR = "."
class MathOps():
'''
This class provides general mathematical operations that are not directly available through numpy
'''
@staticmethod
def deg_sph2cart(spherical_vec):
''' Converts spherical coordinates in degrees to cartesian coordinates '''
r = spherical_vec[0]
h = spherical_vec[1] * pi / 180
v = spherical_vec[2] * pi / 180
return np.array([r * cos(v) * cos(h), r * cos(v) * sin(h), r * sin(v)])
@staticmethod
def deg_sin(deg_angle):
''' Returns sin of degrees '''
return sin(deg_angle * pi / 180)
@staticmethod
def deg_cos(deg_angle):
''' Returns cos of degrees '''
return cos(deg_angle * pi / 180)
@staticmethod
def to_3d(vec_2d, value=0) -> np.ndarray:
''' Returns new 3d vector from 2d vector '''
return np.append(vec_2d, value)
@staticmethod
def to_2d_as_3d(vec_3d) -> np.ndarray:
''' Returns new 3d vector where the 3rd dimension is zero '''
vec_2d_as_3d = np.copy(vec_3d)
vec_2d_as_3d[2] = 0
return vec_2d_as_3d
@staticmethod
def normalize_vec(vec) -> np.ndarray:
''' Divides vector by its length '''
size = np.linalg.norm(vec)
if size == 0: return vec
return vec / size
def rel_to_global_3d(local_pos_3d: np.ndarray, global_pos_3d: np.ndarray,
global_orientation_quat: np.ndarray) -> np.ndarray:
''' Converts a local 3d position to a global 3d position given the global position and orientation (quaternion) '''
rotation = Rotation.from_quat(global_orientation_quat)
rotated_vec = rotation.apply(local_pos_3d)
global_pos = global_pos_3d + rotated_vec
return global_pos
@staticmethod
def get_active_directory(dir: str) -> str:
global GLOBAL_DIR
return GLOBAL_DIR + dir
@staticmethod
def acos(val):
''' arccosine function that limits input '''
return acos(np.clip(val, -1, 1))
@staticmethod
def asin(val):
''' arcsine function that limits input '''
return asin(np.clip(val, -1, 1))
@staticmethod
def normalize_deg(val):
''' normalize val in range [-180,180[ '''
return (val + 180.0) % 360 - 180
@staticmethod
def normalize_rad(val):
''' normalize val in range [-pi,pi[ '''
return (val + pi) % (2 * pi) - pi
@staticmethod
def deg_to_rad(val):
''' convert degrees to radians '''
return val * 0.01745329251994330
@staticmethod
def rad_to_deg(val):
''' convert radians to degrees '''
return val * 57.29577951308232
@staticmethod
def vector_angle(vector, is_rad=False):
''' angle (degrees or radians) of 2D vector '''
if is_rad:
return atan2(vector[1], vector[0])
else:
return atan2(vector[1], vector[0]) * 180 / pi
@staticmethod
def vectors_angle(vec1, vec2, is_rad=False):
''' get angle between vectors (degrees or radians) '''
ang_rad = acos(np.dot(MathOps.normalize_vec(vec1), MathOps.normalize_vec(vec2)))
return ang_rad if is_rad else ang_rad * 180 / pi
@staticmethod
def vector_from_angle(angle, is_rad=False):
''' unit vector with direction given by `angle` '''
if is_rad:
return np.array([cos(angle), sin(angle)], float)
else:
return np.array([MathOps.deg_cos(angle), MathOps.deg_sin(angle)], float)
@staticmethod
def target_abs_angle(pos2d, target, is_rad=False):
''' angle (degrees or radians) of vector (target-pos2d) '''
if is_rad:
return atan2(target[1] - pos2d[1], target[0] - pos2d[0])
else:
return atan2(target[1] - pos2d[1], target[0] - pos2d[0]) * 180 / pi
@staticmethod
def target_rel_angle(pos2d, ori, target, is_rad=False):
''' relative angle (degrees or radians) of target if we're located at 'pos2d' with orientation 'ori' (degrees or radians) '''
if is_rad:
return MathOps.normalize_rad(atan2(target[1] - pos2d[1], target[0] - pos2d[0]) - ori)
else:
return MathOps.normalize_deg(atan2(target[1] - pos2d[1], target[0] - pos2d[0]) * 180 / pi - ori)
@staticmethod
def rotate_2d_vec(vec, angle, is_rad=False):
''' rotate 2D vector anticlockwise around the origin by `angle` '''
cos_ang = cos(angle) if is_rad else cos(angle * pi / 180)
sin_ang = sin(angle) if is_rad else sin(angle * pi / 180)
return np.array([cos_ang * vec[0] - sin_ang * vec[1], sin_ang * vec[0] + cos_ang * vec[1]])
@staticmethod
def distance_point_to_line(p: np.ndarray, a: np.ndarray, b: np.ndarray):
'''
Distance between point p and 2d line 'ab' (and side where p is)
Parameters
----------
a : ndarray
2D point that defines line
b : ndarray
2D point that defines line
p : ndarray
2D point
Returns
-------
distance : float
distance between line and point
side : str
if we are at a, looking at b, p may be at our "left" or "right"
'''
line_len = np.linalg.norm(b - a)
if line_len == 0: # assumes vertical line
dist = sdist = np.linalg.norm(p - a)
else:
sdist = np.cross(b - a, p - a) / line_len
dist = abs(sdist)
return dist, "left" if sdist > 0 else "right"
@staticmethod
def distance_point_to_point_2d(p1: np.ndarray, p2: np.ndarray) -> float:
''' Distance in 2d from point p1 to point p2'''
return np.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)
@staticmethod
def is_point_between(p: np.ndarray, a: np.ndarray, b: np.ndarray) -> bool:
''' Verify if point 'p' is between 'a' and 'b' '''
p_2d = p[:2]
a_2d = a[:2]
b_2d = b[:2]
return np.all(np.minimum(a_2d, b_2d) <= p_2d) and np.all(p_2d <= np.maximum(a_2d, b_2d))
@staticmethod
def is_x_between(x1: np.ndarray, x2: np.ndarray, x3: np.ndarray) -> bool:
''' Verify if x-axis of point 'x1' is between 'x2' and 'x3' '''
return x1 >= min(x2, x3) and x1 <= max(x2, x3)
def distance_point_to_segment(p: np.ndarray, a: np.ndarray, b: np.ndarray):
''' Distance from point p to 2d line segment 'ab' '''
ap = p - a
ab = b - a
ad = MathOps.vector_projection(ap, ab)
# Is d in ab? We can find k in (ad = k * ab) without computing any norm
# we use the largest dimension of ab to avoid division by 0
k = ad[0] / ab[0] if abs(ab[0]) > abs(ab[1]) else ad[1] / ab[1]
if k <= 0:
return np.linalg.norm(ap)
elif k >= 1:
return np.linalg.norm(p - b)
else:
return np.linalg.norm(p - (ad + a)) # p-d
@staticmethod
def distance_point_to_ray(p: np.ndarray, ray_start: np.ndarray, ray_direction: np.ndarray):
''' Distance from point p to 2d ray '''
rp = p - ray_start
rd = MathOps.vector_projection(rp, ray_direction)
# Is d in ray? We can find k in (rd = k * ray_direction) without computing any norm
# we use the largest dimension of ray_direction to avoid division by 0
k = rd[0] / ray_direction[0] if abs(ray_direction[0]) > abs(ray_direction[1]) else rd[1] / ray_direction[1]
if k <= 0:
return np.linalg.norm(rp)
else:
return np.linalg.norm(p - (rd + ray_start)) # p-d
@staticmethod
def closest_point_on_ray_to_point(p: np.ndarray, ray_start: np.ndarray, ray_direction: np.ndarray):
''' Point on ray closest to point p '''
rp = p - ray_start
rd = MathOps.vector_projection(rp, ray_direction)
# Is d in ray? We can find k in (rd = k * ray_direction) without computing any norm
# we use the largest dimension of ray_direction to avoid division by 0
k = rd[0] / ray_direction[0] if abs(ray_direction[0]) > abs(ray_direction[1]) else rd[1] / ray_direction[1]
if k <= 0:
return ray_start
else:
return rd + ray_start
@staticmethod
def does_circle_intersect_segment(p: np.ndarray, r, a: np.ndarray, b: np.ndarray):
''' Returns true if circle (center p, radius r) intersect 2d line segment '''
ap = p - a
ab = b - a
ad = MathOps.vector_projection(ap, ab)
# Is d in ab? We can find k in (ad = k * ab) without computing any norm
# we use the largest dimension of ab to avoid division by 0
k = ad[0] / ab[0] if abs(ab[0]) > abs(ab[1]) else ad[1] / ab[1]
if k <= 0:
return np.dot(ap, ap) <= r * r
elif k >= 1:
return np.dot(p - b, p - b) <= r * r
dp = p - (ad + a)
return np.dot(dp, dp) <= r * r
@staticmethod
def vector_projection(a: np.ndarray, b: np.ndarray):
''' Vector projection of a onto b '''
b_dot = np.dot(b, b)
return b * np.dot(a, b) / b_dot if b_dot != 0 else b
@staticmethod
def do_noncollinear_segments_intersect(a, b, c, d):
'''
Check if 2d line segment 'ab' intersects with noncollinear 2d line segment 'cd'
Explanation: https://www.geeksforgeeks.org/check-if-two-given-line-segments-intersect/
'''
ccw = lambda a, b, c: (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0])
return ccw(a, c, d) != ccw(b, c, d) and ccw(a, b, c) != ccw(a, b, d)
@staticmethod
def intersection_segment_opp_goal(a: np.ndarray, b: np.ndarray):
''' Computes the intersection point of 2d segment 'ab' and the opponents' goal (front line) '''
vec_x = b[0] - a[0]
# Collinear intersections are not accepted
if vec_x == 0: return None
k = (15.01 - a[0]) / vec_x
# No collision
if k < 0 or k > 1: return None
intersection_pt = a + (b - a) * k
if -1.01 <= intersection_pt[1] <= 1.01:
return intersection_pt
else:
return None
@staticmethod
def intersection_circle_opp_goal(p: np.ndarray, r):
'''
Computes the intersection segment of circle (center p, radius r) and the opponents' goal (front line)
Only the y coordinates are returned since the x coordinates are always equal to 15
'''
x_dev = abs(15 - p[0])
if x_dev > r:
return None # no intersection with x=15
y_dev = sqrt(r * r - x_dev * x_dev)
p1 = max(p[1] - y_dev, -1.01)
p2 = min(p[1] + y_dev, 1.01)
if p1 == p2:
return p1 # return the y coordinate of a single intersection point
elif p2 < p1:
return None # no intersection
else:
return p1, p2 # return the y coordinates of the intersection segment
@staticmethod
def distance_point_to_opp_goal(p: np.ndarray):
''' Distance between point 'p' and the opponents' goal (front line) '''
if p[1] < -1.01:
return np.linalg.norm(p - (15, -1.01))
elif p[1] > 1.01:
return np.linalg.norm(p - (15, 1.01))
else:
return abs(15 - p[0])
@staticmethod
def circle_line_segment_intersection(circle_center, circle_radius, pt1, pt2, full_line=True, tangent_tol=1e-9):
""" Find the points at which a circle intersects a line-segment. This can happen at 0, 1, or 2 points.
:param circle_center: The (x, y) location of the circle center
:param circle_radius: The radius of the circle
:param pt1: The (x, y) location of the first point of the segment
:param pt2: The (x, y) location of the second point of the segment
:param full_line: True to find intersections along full line - not just in the segment. False will just return intersections within the segment.
:param tangent_tol: Numerical tolerance at which we decide the intersections are close enough to consider it a tangent
:return Sequence[Tuple[float, float]]: A list of length 0, 1, or 2, where each element is a point at which the circle intercepts a line segment.
Note: We follow: http://mathworld.wolfram.com/Circle-LineIntersection.html
"""
(p1x, p1y), (p2x, p2y), (cx, cy) = pt1, pt2, circle_center
(x1, y1), (x2, y2) = (p1x - cx, p1y - cy), (p2x - cx, p2y - cy)
dx, dy = (x2 - x1), (y2 - y1)
dr = (dx ** 2 + dy ** 2) ** .5
big_d = x1 * y2 - x2 * y1
discriminant = circle_radius ** 2 * dr ** 2 - big_d ** 2
if discriminant < 0: # No intersection between circle and line
return []
else: # There may be 0, 1, or 2 intersections with the segment
intersections = [
(cx + (big_d * dy + sign * (-1 if dy < 0 else 1) * dx * discriminant ** .5) / dr ** 2,
cy + (-big_d * dx + sign * abs(dy) * discriminant ** .5) / dr ** 2)
for sign in ((1, -1) if dy < 0 else (-1, 1))] # This makes sure the order along the segment is correct
if not full_line: # If only considering the segment, filter out intersections that do not fall within the segment
fraction_along_segment = [
(xi - p1x) / dx if abs(dx) > abs(dy) else (yi - p1y) / dy for xi, yi in intersections]
intersections = [pt for pt, frac in zip(
intersections, fraction_along_segment) if 0 <= frac <= 1]
# If line is tangent to circle, return just one point (as both intersections have same location)
if len(intersections) == 2 and abs(discriminant) <= tangent_tol:
return [intersections[0]]
else:
return intersections
# adapted from https://stackoverflow.com/questions/3252194/numpy-and-line-intersections
@staticmethod
def get_line_intersection(a1, a2, b1, b2):
"""
Returns the point of intersection of the lines passing through a2,a1 and b2,b1.
a1: [x, y] a point on the first line
a2: [x, y] another point on the first line
b1: [x, y] a point on the second line
b2: [x, y] another point on the second line
"""
s = np.vstack([a1, a2, b1, b2]) # s for stacked
h = np.hstack((s, np.ones((4, 1)))) # h for homogeneous
l1 = np.cross(h[0], h[1]) # get first line
l2 = np.cross(h[2], h[3]) # get second line
x, y, z = np.cross(l1, l2) # point of intersection
if z == 0: # lines are parallel
return np.array([float('inf'), float('inf')])
return np.array([x / z, y / z], float)
@staticmethod
def get_score_based_on_tanh(input, vertical_scaling=0.5, horizontal_scaling=1, vertical_shift=0.5,
horizontal_shift=0) -> float:
'''
Based on hiperbolic tangent function, it's calculated a score between 0 and 1 (or others, depending on the parameters)
Function: a * tanh(x * c + d) + b
:param input: x value of the function
:param vertical_scaling: strech (a > 1) or compress (0 < a < 1) the function
:param horizontal_scaling: strech (c > 1) or compress (0 < c < 1) the function
:param vertical_shift: shift up (b > 0) or down (b < 0) the function
:param horizontal_shift: shift right (d < 0) or left (d > 0) the function
'''
return vertical_scaling * np.tanh(input * horizontal_scaling + horizontal_shift) + vertical_shift
@staticmethod
def get_angle_to_origin_radians(point: np.ndarray):
x, y = point
if x == 0:
return pi / 2 if y > 0 else -pi / 2
result = atan2(y, x)
result = result if not isnan(result) else pi / 2
return MathOps.normalize_rad(result)

69
utils/neural_network.py Normal file
View File

@@ -0,0 +1,69 @@
import numpy as np
import onnxruntime as ort
def export_model(model_class, weights_path, output_file):
"""
Export a PyTorch model to ONNX format automatically detecting input shape.
"""
import torch # imported only here
model = model_class()
model.load_state_dict(torch.load(weights_path, map_location="cpu"))
model.eval()
# Infer input size from first Linear layer
first_linear = next(m for m in model.modules() if isinstance(m, torch.nn.Linear))
input_size = first_linear.in_features
dummy_input = torch.randn(1, input_size)
torch.onnx.export(
model,
dummy_input,
output_file,
input_names=["obs"],
output_names=["action"],
dynamic_axes={"obs": {0: "batch"}},
opset_version=17,
)
print(f"Model exported to {output_file} (input size: {input_size})")
def load_network(model_path):
"""
Load an ONNX model into memory for fast reuse.
"""
session = ort.InferenceSession(model_path)
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
return {"session": session, "input_name": input_name, "output_name": output_name}
def run_network(obs, model):
"""
Run a preloaded ONNX model and return a flat float array suitable for motor targets.
Args:
obs (np.ndarray): Input observation array.
model (dict): The loaded model from load_network().
Returns:
np.ndarray: 1D array of floats.
"""
if not isinstance(obs, np.ndarray):
obs = np.array(obs, dtype=np.float32)
else:
obs = obs.astype(np.float32) # ensure float32
if obs.ndim == 1:
obs = obs[np.newaxis, :] # make batch dimension
session = model["session"]
input_name = model["input_name"]
output_name = model["output_name"]
result = session.run([output_name], {input_name: obs})[0]
# flatten to 1D and convert to float
result = result.flatten().astype(np.float32)
return result

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

50
world/commons/field.py Normal file
View File

@@ -0,0 +1,50 @@
from abc import ABC, abstractmethod
from typing_extensions import override
from world.commons.field_landmarks import FieldLandmarks
class Field(ABC):
def __init__(self, world):
from world.world import World # type hinting
self.world: World = world
self.field_landmarks: FieldLandmarks = FieldLandmarks(world=self.world)
def get_our_goal_position(self):
return (-self.get_length() / 2, 0)
def get_their_goal_position(self):
return (self.get_length() / 2, 0)
@abstractmethod
def get_width(self):
raise NotImplementedError()
@abstractmethod
def get_length(self):
raise NotImplementedError()
class FIFAField(Field):
def __init__(self, world):
super().__init__(world)
@override
def get_width(self):
return 68
@override
def get_length(self):
return 105
class HLAdultField(Field):
def __init__(self, world):
super().__init__(world)
@override
def get_width(self):
return 9
@override
def get_length(self):
return 14

View File

@@ -0,0 +1,34 @@
import numpy as np
from utils.math_ops import MathOps
class FieldLandmarks:
def __init__(self, world):
from world.world import World # type hinting
self.world: World = world
self.landmarks: dict = {}
def update_from_perception(self, landmark_id: str, landmark_pos: np.ndarray) -> None:
"""
Calculates the global position of all currently visible landmarks.
"""
world = self.world
local_cart_3d = MathOps.deg_sph2cart(landmark_pos)
global_pos_3d = MathOps.rel_to_global_3d(
local_pos_3d=local_cart_3d,
global_pos_3d=world.global_position,
global_orientation_quat=world.agent.robot.global_orientation_quat
)
self.landmarks[landmark_id] = global_pos_3d
def get_landmark_position(self, landmark_id: str) -> np.ndarray | None:
"""
Returns the calculated 2d global position for a given landmark ID.
Returns None if the landmark is not currently visible or processed.
"""
return self.global_positions.get(landmark_id)

View File

@@ -0,0 +1,7 @@
import numpy as np
class OtherRobot:
def __init__(self, is_teammate: bool=True):
self.is_teammate = is_teammate
self.position = np.zeros(3)
self.last_seen_time = None

232
world/commons/play_mode.py Normal file
View File

@@ -0,0 +1,232 @@
from enum import Enum, auto
class PlayModeEnum(Enum):
NOT_INITIALIZED = auto()
"""Enum specifying possible play modes."""
BEFORE_KICK_OFF = auto()
"""The game hasn't started yet."""
OUR_KICK_OFF = auto()
"""Our team has kick off."""
THEIR_KICK_OFF = auto()
"""Their team has kick off."""
PLAY_ON = auto()
"""The game is running normal."""
OUR_THROW_IN = auto()
"""The ball left the field and our team has throw in."""
THEIR_THROW_IN = auto()
"""The ball left the field and their team has throw in."""
OUR_CORNER_KICK = auto()
"""Our team has corner kick."""
THEIR_CORNER_KICK = auto()
"""Their team has corner kick."""
OUR_GOAL_KICK = auto()
"""Our team has goal kick."""
THEIR_GOAL_KICK = auto()
"""Their team has goal kick."""
OUR_OFFSIDE = auto()
"""Their team violated the offside rule."""
THEIR_OFFSIDE = auto()
"""Our team violated the offside rule."""
GAME_OVER = auto()
"""The game has ended."""
OUR_GOAL = auto()
"""Our team scored a goal."""
THEIR_GOAL = auto()
"""Their team scored a goal."""
OUR_FREE_KICK = auto()
"""Our team has a free kick."""
THEIR_FREE_KICK = auto()
"""Their team has a free kick."""
OUR_DIRECT_FREE_KICK = auto()
"""Our team has a direct free kick."""
THEIR_DIRECT_FREE_KICK = auto()
"""Their team has a direct free kick."""
OUR_PENALTY_KICK = auto()
"""Our team has a penalty kick (from the penalty spot)."""
THEIR_PENALTY_KICK = auto()
"""Their team has a penalty kick (from the penalty spot)."""
OUR_PENALTY_SHOOT = auto()
"""Our team has a penalty shoot (starting from somewhere on the field, allowed to touch the ball more than once)."""
THEIR_PENALTY_SHOOT = auto()
"""Their team has a penalty shoot (starting from somewhere on the field, allowed to touch the ball more than once)."""
@classmethod
def get_playmode_from_string(
cls, playmode: str, is_left_team: bool
) -> "PlayModeEnum":
assert isinstance(is_left_team, bool)
playmode_ids = {
"BeforeKickOff": (PlayModeEnum.BEFORE_KICK_OFF,),
"KickOff_Left": (
PlayModeEnum.OUR_KICK_OFF,
PlayModeEnum.THEIR_KICK_OFF,
),
"KickOff_Right": (
PlayModeEnum.THEIR_KICK_OFF,
PlayModeEnum.OUR_KICK_OFF,
),
"PlayOn": (PlayModeEnum.PLAY_ON,),
"KickIn_Left": (
PlayModeEnum.OUR_THROW_IN,
PlayModeEnum.THEIR_THROW_IN,
),
"KickIn_Right": (
PlayModeEnum.THEIR_THROW_IN,
PlayModeEnum.OUR_THROW_IN,
),
"corner_kick_left": (
PlayModeEnum.OUR_CORNER_KICK,
PlayModeEnum.THEIR_CORNER_KICK,
),
"corner_kick_right": (
PlayModeEnum.THEIR_CORNER_KICK,
PlayModeEnum.OUR_CORNER_KICK,
),
"goal_kick_left": (
PlayModeEnum.OUR_GOAL_KICK,
PlayModeEnum.THEIR_GOAL_KICK,
),
"goal_kick_right": (
PlayModeEnum.THEIR_GOAL_KICK,
PlayModeEnum.OUR_GOAL_KICK,
),
"offside_left": (
PlayModeEnum.OUR_OFFSIDE,
PlayModeEnum.THEIR_OFFSIDE,
),
"offside_right": (
PlayModeEnum.THEIR_OFFSIDE,
PlayModeEnum.OUR_OFFSIDE,
),
"GameOver": (PlayModeEnum.GAME_OVER,),
"Goal_Left": (
PlayModeEnum.OUR_GOAL,
PlayModeEnum.THEIR_GOAL,
),
"Goal_Right": (
PlayModeEnum.THEIR_GOAL,
PlayModeEnum.OUR_GOAL,
),
"free_kick_left": (
PlayModeEnum.OUR_FREE_KICK,
PlayModeEnum.THEIR_FREE_KICK,
),
"free_kick_right": (
PlayModeEnum.THEIR_FREE_KICK,
PlayModeEnum.OUR_FREE_KICK,
),
"direct_free_kick_left": (
PlayModeEnum.OUR_DIRECT_FREE_KICK,
PlayModeEnum.THEIR_DIRECT_FREE_KICK,
),
"direct_free_kick_right": (
PlayModeEnum.THEIR_DIRECT_FREE_KICK,
PlayModeEnum.OUR_DIRECT_FREE_KICK,
),
"penalty_kick_left": (
PlayModeEnum.OUR_PENALTY_KICK,
PlayModeEnum.THEIR_PENALTY_KICK,
),
"penalty_kick_right": (
PlayModeEnum.THEIR_PENALTY_KICK,
PlayModeEnum.OUR_PENALTY_KICK,
),
"penalty_shoot_left": (
PlayModeEnum.OUR_PENALTY_SHOOT,
PlayModeEnum.THEIR_PENALTY_SHOOT,
),
"penalty_shoot_right": (
PlayModeEnum.THEIR_PENALTY_SHOOT,
PlayModeEnum.OUR_PENALTY_SHOOT,
),
}[playmode]
playmode = None
if len(playmode_ids) > 1:
playmode = playmode_ids[0 if is_left_team else 1]
else:
playmode = playmode_ids[0]
return playmode
class PlayModeGroupEnum(Enum):
NOT_INITIALIZED = auto()
OTHER = auto()
OUR_KICK = auto()
THEIR_KICK = auto()
ACTIVE_BEAM = auto()
PASSIVE_BEAM = auto()
@classmethod
def get_group_from_playmode(
cls, playmode: PlayModeEnum, is_left_team: bool
) -> "PlayModeGroupEnum":
playmode_group: PlayModeGroupEnum = None
if playmode in (PlayModeEnum.PLAY_ON, PlayModeEnum.GAME_OVER):
playmode_group = cls.OTHER
elif playmode in (
PlayModeEnum.OUR_CORNER_KICK,
PlayModeEnum.OUR_DIRECT_FREE_KICK,
PlayModeEnum.OUR_FREE_KICK,
PlayModeEnum.OUR_GOAL_KICK,
PlayModeEnum.OUR_KICK_OFF,
PlayModeEnum.OUR_OFFSIDE,
PlayModeEnum.OUR_PENALTY_KICK,
PlayModeEnum.OUR_PENALTY_SHOOT,
PlayModeEnum.OUR_THROW_IN,
):
playmode_group = cls.OUR_KICK
elif playmode in (
PlayModeEnum.THEIR_CORNER_KICK,
PlayModeEnum.THEIR_DIRECT_FREE_KICK,
PlayModeEnum.THEIR_FREE_KICK,
PlayModeEnum.THEIR_GOAL_KICK,
PlayModeEnum.THEIR_KICK_OFF,
PlayModeEnum.THEIR_OFFSIDE,
PlayModeEnum.THEIR_PENALTY_KICK,
PlayModeEnum.THEIR_PENALTY_SHOOT,
PlayModeEnum.THEIR_THROW_IN,
):
playmode_group = cls.THEIR_KICK
elif (playmode is PlayModeEnum.THEIR_GOAL) or (
is_left_team and playmode is PlayModeEnum.BEFORE_KICK_OFF
):
playmode_group = cls.ACTIVE_BEAM
elif (playmode is PlayModeEnum.OUR_GOAL) or (
not is_left_team and playmode is PlayModeEnum.BEFORE_KICK_OFF
):
playmode_group = cls.PASSIVE_BEAM
else:
raise NotImplementedError(
f"Not implemented playmode group for playmode {playmode}"
)
return playmode_group

273
world/robot.py Normal file
View File

@@ -0,0 +1,273 @@
from abc import ABC, abstractmethod
from typing_extensions import override
import numpy as np
from communication.server import Server
class Robot(ABC):
"""
Base class for all robot models.
This class defines the main structure and common data used by any robot,
such as motor positions, sensors, and control messages.
"""
def __init__(self, agent):
"""
Creates a new robot linked to the given agent.
Args:
agent: The main agent that owns this robot.
"""
from agent.base_agent import Base_Agent # type hinting
self.agent: Base_Agent = agent
self.server: Server = self.agent.server
self.motor_targets: dict = {
motor: {"target_position": 0.0, "kp": 0.0, "kd": 0.0}
for motor in self.ROBOT_MOTORS
}
self.motor_positions: dict = {motor: 0.0 for motor in self.ROBOT_MOTORS} # degrees
self.motor_speeds: dict = {motor: 0.0 for motor in self.ROBOT_MOTORS} # degrees/s
self._global_cheat_orientation = np.array([0, 0, 0, 1]) # quaternion [x, y, z, w]
self.global_orientation_quat = np.array([0, 0, 0, 1]) # quaternion [x, y, z, w]
self.global_orientation_euler = np.zeros(3) # euler [roll, pitch, yaw]
self.gyroscope = np.zeros(3) # angular velocity [roll, pitch, yaw] (degrees/s)
self.accelerometer = np.zeros(3) # linear acceleration [x, y, z] (m/s²)
@property
@abstractmethod
def name(self) -> str:
"""
Returns the robot model name.
"""
raise NotImplementedError()
@property
@abstractmethod
def ROBOT_MOTORS(self) -> tuple[str, ...]:
"""
Returns the list of motor names used by this robot.
"""
raise NotImplementedError()
def set_motor_target_position(
self, motor_name: str, target_position: float, kp: float = 10, kd: float = 0.1
) -> None:
"""
Sets the desired position and PD gains for a given motor.
For now, directly sets positions, as the simulator is doing the control
Args:
motor_name: Name of the motor.
target_position: Desired position in radians.
kp: Proportional gain.
kd: Derivative gain.
"""
self.motor_targets[motor_name] = {
"target_position": target_position,
"kp": kp,
"kd": kd,
}
def commit_motor_targets_pd(self) -> None:
"""
Sends all motor target commands to the simulator.
"""
for motor_name, target_description in self.motor_targets.items():
motor_msg = f'({motor_name} {target_description["target_position"]:.2f} 0.0 {target_description["kp"]:.2f} {target_description["kd"]:.2f} 0.0)'
self.server.commit(motor_msg)
class T1(Robot):
"""
Booster T1
"""
@override
def __init__(self, agent):
super().__init__(agent)
self.joint_nominal_position = np.array(
[
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
]
)
@property
@override
def name(self) -> str:
return "T1"
@property
@override
def ROBOT_MOTORS(self) -> tuple[str, ...]:
return (
"he1",
"he2",
"lae1",
"lae2",
"lae3",
"lae4",
"rae1",
"rae2",
"rae3",
"rae4",
"te1",
"lle1",
"lle2",
"lle3",
"lle4",
"lle5",
"lle6",
"rle1",
"rle2",
"rle3",
"rle4",
"rle5",
"rle6",
)
@property
def MOTOR_FROM_READABLE_TO_SERVER(self) -> dict:
"""
Maps readable joint names to their simulator motor codes.
"""
return {
"Head_yaw": "he1",
"Head_pitch": "he2",
"Left_Shoulder_Pitch": "lae1",
"Left_Shoulder_Roll": "lae2",
"Left_Elbow_Pitch": "lae3",
"Left_Elbow_Yaw": "lae4",
"Right_Shoulder_Pitch": "rae1",
"Right_Shoulder_Roll": "rae2",
"Right_Elbow_Pitch": "rae3",
"Right_Elbow_Yaw": "rae4",
"Waist": "te1",
"Left_Hip_Pitch": "lle1",
"Left_Hip_Roll": "lle2",
"Left_Hip_Yaw": "lle3",
"Left_Knee_Pitch": "lle4",
"Left_Ankle_Pitch": "lle5",
"Left_Ankle_Roll": "lle6",
"Right_Hip_Pitch": "rle1",
"Right_Hip_Roll": "rle2",
"Right_Hip_Yaw": "rle3",
"Right_Knee_Pitch": "rle4",
"Right_Ankle_Pitch": "rle5",
"Right_Ankle_Roll": "rle6",
}
@property
def MOTOR_SYMMETRY(self) -> list[str, bool]:
"""
Defines pairs of symmetric motors and whether their direction is inverted.
Returns:
A dictionary where each key is a logical joint group name,
and the value is a tuple (motor_names, inverted).
"""
return {
"Head_yaw": (("Head_yaw",), False),
"Head_pitch": (("Head_pitch",), False),
"Shoulder_Pitch": (
(
"Left_Shoulder_Pitch",
"Right_Shoulder_Pitch",
),
False,
),
"Shoulder_Roll": (
(
"Left_Shoulder_Roll",
"Right_Shoulder_Roll",
),
True,
),
"Elbow_Pitch": (
(
"Left_Elbow_Pitch",
"Right_Elbow_Pitch",
),
False,
),
"Elbow_Yaw": (
(
"Left_Elbow_Yaw",
"Right_Elbow_Yaw",
),
True,
),
"Waist": (("Waist",), False),
"Hip_Pitch": (
(
"Left_Hip_Pitch",
"Right_Hip_Pitch",
),
False,
),
"Hip_Roll": (
(
"Left_Hip_Roll",
"Right_Hip_Roll",
),
True,
),
"Hip_Yaw": (
(
"Left_Hip_Yaw",
"Right_Hip_Yaw",
),
True,
),
"Knee_Pitch": (
(
"Left_Knee_Pitch",
"Right_Knee_Pitch",
),
False,
),
"Ankle_Pitch": (
(
"Left_Ankle_Pitch",
"Right_Ankle_Pitch",
),
False,
),
"Ankle_Roll": (
(
"Left_Ankle_Roll",
"Right_Ankle_Roll",
),
True,
),
}

66
world/world.py Normal file
View File

@@ -0,0 +1,66 @@
from dataclasses import Field
import numpy as np
from world.commons.other_robot import OtherRobot
from world.commons.field import FIFAField, HLAdultField
from world.commons.play_mode import PlayModeEnum, PlayModeGroupEnum
class World:
"""
Represents the current simulation world, containing all relevant
information about the environment, the ball, and the robots.
"""
MAX_PLAYERS_PER_TEAM = 11
def __init__(self, agent, team_name: str, number: int, field_name: str):
"""
Initializes the world state.
Args:
agent: Reference to the agent that owns this world.
team_name (str): The name of the agent's team.
number (int): The player's number within the team.
field_name (str): The name of the field to initialize
(e.g., 'fifa' or 'hl_adult').
"""
from agent.base_agent import Agent # type hinting
self.agent: Agent = agent
self.team_name: str = team_name
self.number: int = number
self.playmode: PlayModeEnum = PlayModeEnum.NOT_INITIALIZED
self.playmode_group: PlayModeGroupEnum = PlayModeGroupEnum.NOT_INITIALIZED
self.is_left_team: bool = None
self.game_time: float = None
self.server_time: float = None
self.score_left: int = None
self.score_right: int = None
self.their_team_name: str = None
self.last_server_time: str = None
self._global_cheat_position: np.ndarray = np.zeros(3)
self.global_position: np.ndarray = np.zeros(3)
self.ball_pos: np.ndarray = np.zeros(3)
self.is_ball_pos_updated: bool = False
self.our_team_players: list[OtherRobot] = [OtherRobot() for _ in range(self.MAX_PLAYERS_PER_TEAM)]
self.their_team_players: list[OtherRobot] = [OtherRobot(is_teammate=False) for _ in
range(self.MAX_PLAYERS_PER_TEAM)]
self.field: Field = self.__initialize_field(field_name=field_name)
def update(self) -> None:
"""
Updates the world state
"""
self.playmode_group = PlayModeGroupEnum.get_group_from_playmode(
playmode=self.playmode, is_left_team=self.is_left_team
)
def is_fallen(self) -> bool:
return self.global_position[2] < 0.3
def __initialize_field(self, field_name: str) -> Field:
if field_name in ('hl_adult', 'hl_adult_2020', 'hl_adult_2019',):
return HLAdultField(world=self)
else:
return FIFAField(world=self)