-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwrapper.py
98 lines (76 loc) · 3.08 KB
/
wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import numpy as np
from gymnasium import ObservationWrapper
import gymnasium as gym
from gymnasium.spaces import Box, Dict
from typing import Optional, Union, List
class RelativeRedundant(ObservationWrapper):
    """Flatten a dict observation into relative features plus the raw ones.

    The wrapped environment must emit a mapping with the keys
    ``agent_location``, ``agent_velocity``, ``target_location`` and
    ``target_velocity``. The resulting vector is laid out as::

        [agent_location - target_location,
         agent_velocity - target_velocity,
         agent_location,
         agent_velocity,
         target_location,
         target_velocity]

    The declared space has 12 entries, so each of the four inputs is
    presumably a 2-vector -- TODO confirm against the base environment.
    """

    def __init__(self, env: gym.Env):
        """Wrap *env* and declare a 12-dimensional float32 ``Box``.

        Args:
            env: Environment whose unwrapped instance exposes a ``size``
                attribute; it is used as the symmetric bound of the space.
        """
        super().__init__(env)
        # Read ``size`` from the base environment explicitly: implicit
        # attribute forwarding through wrappers (``self.size``) was deprecated
        # in Gymnasium 0.29 and removed in 1.0.
        size = env.unwrapped.size
        # NOTE(review): the same +/-size bound is applied to the velocity
        # entries as well; confirm that this is the intended range.
        self.observation_space = Box(
            low=-size, high=size, shape=(12,), dtype=np.float32
        )

    def observation(self, observation) -> np.ndarray:
        """Return the concatenated relative and absolute features.

        Args:
            observation: Mapping holding the four location/velocity arrays.

        Returns:
            A 1-D float32 array in the layout described on the class.
        """
        agent_loc = observation["agent_location"]
        agent_vel = observation["agent_velocity"]
        target_loc = observation["target_location"]
        target_vel = observation["target_velocity"]
        features = np.concatenate(
            [
                agent_loc - target_loc,
                agent_vel - target_vel,
                agent_loc,
                agent_vel,
                target_loc,
                target_vel,
            ]
        )
        # Match the dtype promised by ``observation_space`` so the result is
        # accepted by ``observation_space.contains``.
        return features.astype(np.float32, copy=False)
class RelativePosition(ObservationWrapper):
    """Reduce a dict observation to agent-minus-target location and velocity.

    The wrapped environment must emit a mapping with the keys
    ``agent_location``, ``agent_velocity``, ``target_location`` and
    ``target_velocity``. The declared space has 4 entries, so each input is
    presumably a 2-vector -- TODO confirm against the base environment.
    """

    def __init__(self, env: gym.Env):
        """Wrap *env* and declare a 4-dimensional float32 ``Box``.

        Args:
            env: Environment whose unwrapped instance exposes a ``size``
                attribute; it is used as the symmetric bound of the space.
        """
        super().__init__(env)
        # Read ``size`` from the base environment explicitly: implicit
        # attribute forwarding through wrappers (``self.size``) was deprecated
        # in Gymnasium 0.29 and removed in 1.0.
        size = env.unwrapped.size
        self.observation_space = Box(
            low=-size, high=size, shape=(4,), dtype=np.float32
        )

    def observation(self, observation) -> np.ndarray:
        """Return ``[location difference, velocity difference]`` as float32.

        Args:
            observation: Mapping holding the four location/velocity arrays.

        Returns:
            A 1-D float32 array: relative location followed by relative
            velocity.
        """
        relative_location = (
            observation["agent_location"] - observation["target_location"]
        )
        relative_velocity = (
            observation["agent_velocity"] - observation["target_velocity"]
        )
        features = np.concatenate([relative_location, relative_velocity])
        # Match the dtype promised by ``observation_space`` so the result is
        # accepted by ``observation_space.contains``.
        return features.astype(np.float32, copy=False)
class ExtractRelative(ObservationWrapper):
    """Keep only the first four entries of an already-flat observation.

    Presumably meant to sit on top of a wrapper whose leading four entries are
    the relative location/velocity features -- verify against the caller.
    """

    def __init__(self, env: gym.Env):
        """Wrap *env* and declare a 4-dimensional float32 ``Box``.

        Args:
            env: Environment whose unwrapped instance exposes a ``size``
                attribute; it is used as the symmetric bound of the space.
        """
        super().__init__(env)
        # Read ``size`` from the base environment explicitly: implicit
        # attribute forwarding through wrappers (``self.size``) was deprecated
        # in Gymnasium 0.29 and removed in 1.0, and this wrapper is the one
        # most likely to be stacked on top of other wrappers.
        size = env.unwrapped.size
        self.observation_space = Box(
            low=-size, high=size, shape=(4,), dtype=np.float32
        )

    def observation(self, observation) -> np.ndarray:
        """Return the first four entries of *observation*.

        Args:
            observation: Sliceable observation produced by the wrapped env.

        Returns:
            ``observation[:4]``, unchanged in dtype.
        """
        return observation[:4]
class GaussianNoiseWrapper(ObservationWrapper):
    """Add zero-mean Gaussian noise to observations, keeping the wrapped space.

    The observation space of the wrapped environment is inherited unchanged.
    Noise is drawn from NumPy's global random state (``np.random``), so it is
    controlled by ``np.random.seed`` rather than by the environment's seed.
    """

    def __init__(
        self,
        env: gym.Env,
        std_diff_ratio: float = 0.1,
        std_value: Optional[Union[float, List[float]]] = None,
    ):
        """Configure the noise scale.

        Args:
            env: Environment to wrap. Its observation space must expose
                ``low`` and ``high`` when ``std_value`` is not given.
            std_diff_ratio: Fraction of the per-dimension range
                (``high - low``) used as the standard deviation. Ignored when
                ``std_value`` is provided.
            std_value: Explicit standard deviation, either a scalar or one
                value per observation entry. Takes precedence over
                ``std_diff_ratio``.
        """
        super().__init__(env)
        if std_value is not None:
            self.std = std_value
        else:
            # Derive the std from the width of the observation space.
            # If a bound is infinite the derived std is infinite as well.
            space = self.observation_space
            self.std = (space.high - space.low) * std_diff_ratio

    def observation(self, observation) -> np.ndarray:
        """Return *observation* perturbed by Gaussian noise.

        Args:
            observation: Array-like observation exposing ``shape``.

        Returns:
            The noisy observation. When the observation space has a floating
            dtype the result is cast to it; otherwise the result keeps the
            dtype produced by the addition.
        """
        noise = np.random.normal(0.0, self.std, size=observation.shape)
        noisy = observation + noise
        # The sampled noise is float64, which silently upcasts float32
        # observations and makes them fail ``observation_space.contains``.
        # Cast back only for floating spaces so integer spaces are not
        # truncated.
        space_dtype = self.observation_space.dtype
        if space_dtype is not None and np.issubdtype(space_dtype, np.floating):
            noisy = noisy.astype(space_dtype, copy=False)
        return noisy
class GaussianNoiseWrapperRelativeRedundant(ObservationWrapper):
    """Apply ``RelativeRedundant`` to the environment, then add Gaussian noise.

    The observation space is the one declared by ``RelativeRedundant``. Noise
    is drawn from NumPy's global random state (``np.random``).
    """

    def __init__(self, env: gym.Env, std_diff_ratio: float = 0.1):
        """Wrap *env* in ``RelativeRedundant`` and configure the noise scale.

        Args:
            env: Base environment; it is wrapped in ``RelativeRedundant``
                before this wrapper is attached.
            std_diff_ratio: Fraction of the per-dimension range
                (``high - low``) of the observation space used as the
                standard deviation of the noise.
        """
        env = RelativeRedundant(env)
        super().__init__(env)
        # Derive the std from the width of the observation space.
        space = self.observation_space
        self.std = (space.high - space.low) * std_diff_ratio

    def observation(self, observation) -> np.ndarray:
        """Return *observation* perturbed by Gaussian noise.

        Args:
            observation: Flat array produced by ``RelativeRedundant``.

        Returns:
            The noisy observation, cast to the dtype of the observation space.
        """
        noise = np.random.normal(0.0, self.std, size=observation.shape)
        noisy = observation + noise
        # The sampled noise is float64; cast back so the result matches the
        # dtype of the inherited space and passes ``observation_space.contains``.
        return noisy.astype(self.observation_space.dtype, copy=False)