Python gym module: Env() example source code
We extracted the following 26 code examples from open-source Python projects to illustrate how to use gym.Env().
def get_wrapper_by_name(env, classname):
    """Given a gym environment possibly wrapped multiple times, returns the wrapper
    of class named classname, or raises ValueError if no such wrapper was applied
    Parameters
    ----------
    env: gym.Env or gym.Wrapper
        gym environment
    classname: str
        name of the wrapper
    Returns
    -------
    wrapper: gym.Wrapper
        wrapper named classname
    """
    currentenv = env
    while True:
        if classname == currentenv.class_name():
            return currentenv
        elif isinstance(currentenv, gym.Wrapper):
            currentenv = currentenv.env
        else:
            raise ValueError("Couldn't find wrapper named %s" % classname)
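A minimal usage sketch, assuming classic gym where gym.Wrapper.class_name() returns the class name; MyWrapper is a hypothetical wrapper defined only for illustration:

import gym

class MyWrapper(gym.Wrapper):
    pass  # hypothetical no-op wrapper, purely for illustration

env = MyWrapper(gym.make('CartPole-v0'))
# the search walks the wrapper chain from the outside in
assert get_wrapper_by_name(env, 'MyWrapper') is env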
def __init__(self, env: gym.Env, gamma, epsilon, final_epsilon, final_exp_step):
    """
    :param env: environment
    :param gamma: discount rate
    :param epsilon: initial exploration rate
    :param final_epsilon: final exploration rate
    :param final_exp_step: the step at which exploration annealing ends
    """
    self.env = env
    self.action_size = self.env.action_space.n
    self.net = DQN(self.action_size)
    self.target_net = DQN(self.action_size)
    self._gamma = gamma
    self._initial_epsilon = epsilon
    self.epsilon = epsilon
    self._final_epsilon = final_epsilon
    self._final_exp_step = final_exp_step
    if cuda_available:
        self.net.cuda()
        self.target_net.cuda()
    self.update_target_net()
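These parameters suggest a linear exploration schedule. A sketch of what the annealing might look like (epsilon_at is a hypothetical helper, not part of the original project):

def epsilon_at(self, step):
    # linearly anneal from the initial to the final exploration rate,
    # then hold the final rate after final_exp_step
    fraction = min(step / self._final_exp_step, 1.0)
    return self._initial_epsilon + fraction * (self._final_epsilon - self._initial_epsilon)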
def print_env(env: Env):
    spec = getattr(env, 'spec', False)
    if spec:
        print(f'Env spec: {vars(spec)}')
    acsp = env.action_space
    obsp = env.observation_space
    print(f'Observation space {obsp}')
    if isinstance(obsp, Box) and len(obsp.high) < 20:
        print(f'low = {obsp.low}\nhigh = {obsp.high}')
    print(f'Action space {acsp}')
    if isinstance(acsp, Box) and len(acsp.high) < 20:
        print(f'low = {acsp.low}\nhigh = {acsp.high}')
    print("")
def run_episode(self, env: gym.Env):
    meta_wrapper = get_wrapper(env, chi.rl.wrappers.Wrapper)
    done = False
    ob = env.reset()
    a, meta = self.act(ob)
    rs = []
    while not done:
        if meta_wrapper:
            meta_wrapper.set_meta(meta)  # send meta information to the wrappers
        ob, r, done, info = env.step(a)
        a, meta = self.act(ob, info)
        rs.append(r)
    return sum(rs)
def bot_play(mainDQN: DeepQNetwork, env: gym.Env) -> None:
    """Runs a test episode with rendering and logs the total score via logger.info
    Args:
        mainDQN (DeepQNetwork): DQN agent to run a test
        env (gym.Env): Gym Environment
    """
    state = env.reset()
    reward_sum = 0
    while True:
        env.render()
        action = np.argmax(mainDQN.predict(state))
        state, reward, done, _ = env.step(action)
        reward_sum += reward
        if done:
            logger.info("Total score: {}".format(reward_sum))
            break
def __init__(self, agent: Agent, val_env: gym.Env, lr, memory_size, target_update_freq, gradient_update_freq,
             batch_size, replay_start, val_freq, log_freq_by_step, log_freq_by_ep, val_epsilon,
             log_dir, weight_dir):
    """
    :param agent: agent object
    :param val_env: environment for validation
    :param lr: learning rate of the optimizer
    :param memory_size: size of the replay memory
    :param target_update_freq: frequency of target-network updates, in steps
    :param gradient_update_freq: frequency of Q-network updates, in steps
    :param batch_size: batch size for the Q-network
    :param replay_start: number of random-exploration steps before training starts
    :param val_freq: frequency of validation, in steps
    :param log_freq_by_step: frequency of logging, in steps
    :param log_freq_by_ep: frequency of logging, in episodes
    :param val_epsilon: exploration rate during validation
    :param log_dir: directory for saving TensorBoard logs
    :param weight_dir: directory for saving weights at validation time
    """
    self.agent = agent
    self.env = self.agent.env
    self.val_env = val_env
    self.optimizer = optim.RMSprop(params=self.agent.net.parameters(), lr=lr)
    self.memory = Memory(memory_size)
    self.target_update_freq = target_update_freq
    self.batch_size = batch_size
    self.replay_start = replay_start
    self.gradient_update_freq = gradient_update_freq
    self._step = 0
    self._episode = 0
    self._warmed = False
    self._val_freq = val_freq
    self.log_freq_by_step = log_freq_by_step
    self.log_freq_by_ep = log_freq_by_ep
    self._val_epsilon = val_epsilon
    self._writer = SummaryWriter(os.path.join(log_dir, datetime.now().strftime('%b%d_%H-%M-%S')))
    if weight_dir is not None and not os.path.exists(weight_dir):
        os.makedirs(weight_dir)
    self.weight_dir = weight_dir
def __init__(self, **kwargs):
    utils.EzPickle.__init__(self)
    self.curr_seed = 0
    self.screen = np.zeros((SCREEN_HEIGHT, SCREEN_WIDTH, 3), dtype=np.uint8)
    self.closed = False
    self.can_send_command = True
    self.command_cond = Condition()
    self.viewer = None
    self.reward = 0
    episode_time_length_secs = 7
    frame_skip = 5
    fps = 60
    self.episode_length = episode_time_length_secs * fps / frame_skip
    self.actions = [
        'U', 'D', 'L', 'R',
        'UR', 'DR', 'URA', 'DRB',
        'A', 'B', 'RB', 'RA']
    self.action_space = spaces.Discrete(len(self.actions))
    self.frame = 0
    # for communication with the emulator
    self.pipe_in = None
    self.pipe_out = None
    self.thread_incoming = None
    self.rom_file_path = None
    self.lua_interface_path = None
    self.emulator_started = False

## ---------- gym.Env methods -------------
def _close(self):
    self.closed = True
## ------------- end gym.Env --------------
## ------------- emulator related ------------
def list_wrappers(env: Union[Env, gym.Wrapper]):
    while isinstance(env, gym.Wrapper):
        yield env
        env = env.env
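A quick inspection sketch using this generator; wrappers are yielded from the outermost layer inward:

# assuming `env` is any (possibly wrapped) gym environment
for w in list_wrappers(env):
    print(type(w).__name__)  # outermost wrapper first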
def _step(self, action):
    s, r, t, i = super()._step(action)
    assert isinstance(self.env, Env)
    assert isinstance(self.env.action_space, Box)
    l = self.env.action_space.low
    h = self.env.action_space.high
    m = h - l
    dif = action - np.clip(action, l - self.slack * m, h + self.slack * m)
    i.setdefault('unwrapped_reward', r)
    r -= self.alpha * np.mean(np.square(dif / m))
    return s, r, t, i
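This wrapper penalizes actions that stray beyond the slack-extended action bounds. A standalone numeric sketch of the penalty term (the slack and alpha values here are hypothetical):

import numpy as np

l, h = np.array([-1.0]), np.array([1.0])
slack, alpha = 0.1, 1.0                      # hypothetical wrapper attributes
m = h - l                                    # action range = 2.0
action = np.array([1.5])                     # exceeds the slack-extended bound of 1.2
dif = action - np.clip(action, l - slack * m, h + slack * m)  # -> 0.3
penalty = alpha * np.mean(np.square(dif / m))                 # (0.3 / 2.0)**2 = 0.0225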
def test_double_close():
    class TestEnv(gym.Env):
        def __init__(self):
            self.close_count = 0

        def _close(self):
            self.close_count += 1

    env = TestEnv()
    assert env.close_count == 0
    env.close()
    assert env.close_count == 1
    env.close()
    assert env.close_count == 1
def test_no_monitor_reset_unless_done():
    def assert_reset_raises(env):
        errored = False
        try:
            env.reset()
        except error.Error:
            errored = True
        assert errored, "Env allowed a reset when it shouldn't have"

    with helpers.tempdir() as temp:
        # Make sure we can reset as we please without a monitor
        env = gym.make('CartPole-v0')
        env.reset()
        env.step(env.action_space.sample())
        env.step(env.action_space.sample())
        env.reset()

        # can reset once as soon as we start
        env = Monitor(env, temp, video_callable=False)
        env.reset()

        # can reset multiple times in a row
        env.reset()
        env.reset()

        env.step(env.action_space.sample())
        env.step(env.action_space.sample())
        assert_reset_raises(env)

        # should allow resets after the episode is done
        d = False
        while not d:
            _, _, d, _ = env.step(env.action_space.sample())
        env.reset()
        env.reset()

        env.step(env.action_space.sample())
        assert_reset_raises(env)
        env.close()
def get_preprocessor_as_wrapper(cls, env, options=dict()):
    """Returns a preprocessor as a gym observation wrapper.
    Args:
        env (gym.Env): The gym environment to wrap.
        options (dict): Options to pass to the preprocessor.
    Returns:
        wrapper (gym.ObservationWrapper): Preprocessor in wrapper form.
    """
    preprocessor = cls.get_preprocessor(env, options)
    return _RLlibPreprocessorWrapper(env, preprocessor)
def _get_raw_array(self):
    raise NotImplementedError

# gym.Env functions
def add_observation_summary(obs, env):
    """Adds an observation summary.
    Supports observation tensors with 1, 2 and 3 dimensions only.
    1-D tensors are logged as a histogram summary;
    2-D and 3-D tensors are logged as an image summary.
    Args:
        obs (Tensor): Observation.
        env (gym.Env): Environment instance.
    """
    from reinforceflow.envs.gym_wrapper import ObservationStackWrap, ImageWrap

    # Collect all wrappers applied to the env
    all_wrappers = {}
    env_wrapper = env
    while True:
        if isinstance(env_wrapper, gym.Wrapper):
            all_wrappers[env_wrapper.__class__] = env_wrapper
            env_wrapper = env_wrapper.env
        else:
            break

    # Check for grayscale
    gray = False
    if ImageWrap in all_wrappers:
        gray = all_wrappers[ImageWrap].grayscale

    # If observations are stacked, summarize each stacked frame as an image
    if ObservationStackWrap in all_wrappers:
        channels = 1 if gray else 3
        for obs_id in range(all_wrappers[ObservationStackWrap].obs_stack):
            o = obs[:, :, obs_id*channels:(obs_id+1)*channels]
            tf.summary.image('observation%d' % obs_id, o, max_outputs=1)
        return

    # Otherwise, summarize the raw observation according to its rank
    if len(env.observation_space.shape) == 1:
        tf.summary.histogram('observation', obs)
    elif len(env.observation_space.shape) == 2:
        tf.summary.image('observation', obs)
    elif len(env.observation_space.shape) == 3 and env.observation_space.shape[2] in (1, 3):
        tf.summary.image('observation', obs)
    else:
        logger.warn('Cannot create summary for observation with shape %s',
                    env.observation_space.shape)
def get_preprocessor(cls, env, options=dict()):
    """Returns a suitable preprocessor for the given environment.
    Args:
        env (gym.Env): The gym environment to preprocess.
        options (dict): Options to pass to the preprocessor.
    Returns:
        preprocessor (Preprocessor): Preprocessor for the env observations.
    """
    # For older gym versions that don't set shape for Discrete spaces
    if not hasattr(env.observation_space, "shape") and \
            isinstance(env.observation_space, gym.spaces.Discrete):
        env.observation_space.shape = ()

    env_name = env.spec.id
    obs_shape = env.observation_space.shape
    for k in options.keys():
        if k not in MODEL_CONFIGS:
            raise Exception(
                "Unknown config key `{}`, all keys: {}".format(
                    k, MODEL_CONFIGS))
    print("Observation shape is {}".format(obs_shape))

    if env_name in cls._registered_preprocessor:
        return cls._registered_preprocessor[env_name](
            env.observation_space, options)

    if obs_shape == ():
        print("Using one-hot preprocessor for discrete envs.")
        preprocessor = OneHotPreprocessor
    elif obs_shape == cls.ATARI_OBS_SHAPE:
        print("Assuming Atari pixel env, using AtariPixelPreprocessor.")
        preprocessor = AtariPixelPreprocessor
    elif obs_shape == cls.ATARI_RAM_OBS_SHAPE:
        print("Assuming Atari RAM env, using AtariRamPreprocessor.")
        preprocessor = AtariRamPreprocessor
    else:
        print("Non-Atari env, not using any observation preprocessor.")
        preprocessor = NoPreprocessor
    return preprocessor(env.observation_space, options)