Python gym 模块,make() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用gym.make()。
def main():
    """Train a dueling DQN with prioritized replay on Atari Pong and save it."""
    env = gym.make("PongNoFrameskip-v4")
    env = ScaledFloatFrame(wrap_dqn(env))
    model = deepq.models.cnn_to_mlp(
        # (num_filters, kernel_size, stride) per conv layer.
        # fixed: the third tuple was garbled to ", 3, 1)" — the standard
        # DQN architecture uses (64, 3, 1) here.
        convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)],
        hiddens=[256],
        dueling=True
    )
    act = deepq.learn(
        env,
        q_func=model,
        lr=1e-4,
        max_timesteps=2000000,
        buffer_size=10000,
        exploration_fraction=0.1,
        exploration_final_eps=0.01,
        train_freq=4,
        learning_starts=10000,
        target_network_update_freq=1000,
        gamma=0.99,
        prioritized_replay=True
    )
    act.save("pong_model.pkl")
    env.close()
def main(game_count=1):
    """Run `game_count` monitored Pong episodes driven by a Hanamichi agent."""
    record = os.path.join(os.path.dirname(__file__), "funfun")
    env = gym.make("Pong-v0")
    hanamichi = Hanamichi()
    env.monitor.start(record)
    for _ in range(game_count):
        observation = env.reset()
        reward, action = -1, -1
        playing = True
        while playing:
            env.render()
            # A negative action marks the first step of an episode:
            # the agent is started rather than asked to act on a reward.
            if action < 0:
                action = hanamichi.start(observation)
            else:
                action = hanamichi.act(observation, reward)
            observation, reward, done, info = env.step(action)
            playing = not done
            if done:
                hanamichi.end(reward)
    env.monitor.close()
def __init__(self, env_name, num_episodes, alpha, gamma, policy, report_freq=100, **kwargs):
    """
    base class for RL using lookup table
    :param env_name: see https://github.com/openai/gym/wiki/Table-of-environments
    :param num_episodes: int, number of episodes for training
    :param alpha: float, learning rate
    :param gamma: float, discount rate
    :param policy: str
    :param report_freq: int, by default 100
    :param kwargs: other arguments, attached to the instance as attributes
    """
    self.env = gym.make(env_name)
    self.num_episodes = num_episodes
    self.alpha = alpha
    self.gamma = gamma
    self.state = None
    self._rewards = None
    self._policy = policy
    self.report_freq = report_freq
    # Any extra configuration becomes an instance attribute.
    for key, value in kwargs.items():
        setattr(self, str(key), value)
def make_atari(env_id, noop=True, max_and_skip=True, episode_life=True, clip_rewards=True, frame_stack=True,
               scale=True):
    """Configure environment for DeepMind-style Atari.

    Each flag toggles one standard preprocessing wrapper; wrappers are
    applied in the conventional DeepMind order.
    """
    env = gym.make(env_id)
    # The raw (frameskip-free) ROM is required; skipping is done by a wrapper.
    assert 'NoFrameskip' in env.spec.id
    if noop:
        env = NoopResetEnv(env, noop_max=30)
    if max_and_skip:
        env = MaxAndSkipEnv(env, skip=4)
    if episode_life:
        env = EpisodicLifeEnv(env)
    # Some games need FIRE pressed after reset before play begins.
    if 'FIRE' in env.unwrapped.get_action_meanings():
        env = FireResetEnv(env)
    env = WarpFrame(env)
    if scale:
        env = ScaledFloatFrame(env)
    if clip_rewards:
        env = ClipRewardEnv(env)
    if frame_stack:
        env = FrameStack(env, 4)
    return env
def main():
    """Train a DQN on Atari Pong and save the policy.

    NOTE(review): the original snippet was truncated mid-expression
    (`convs=[(32,` jumped straight to `prioritized_replay=True`); the body
    is reconstructed from the identical Pong example earlier in this file.
    """
    env = gym.make("PongNoFrameskip-v4")
    env = ScaledFloatFrame(wrap_dqn(env))
    model = deepq.models.cnn_to_mlp(
        convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)],
        hiddens=[256],
        dueling=True
    )
    act = deepq.learn(
        env,
        q_func=model,
        lr=1e-4,
        max_timesteps=2000000,
        buffer_size=10000,
        exploration_fraction=0.1,
        exploration_final_eps=0.01,
        train_freq=4,
        learning_starts=10000,
        target_network_update_freq=1000,
        gamma=0.99,
        prioritized_replay=True
    )
    act.save("pong_model.pkl")
    env.close()
def main():
    """Train DQN on CartPole and save the resulting policy."""
    env = gym.make("CartPole-v0")
    model = deepq.models.mlp([64])
    act = deepq.learn(
        env,
        q_func=model,  # fixed: the MLP was built but never passed to learn()
        lr=1e-3,
        max_timesteps=100000,
        buffer_size=50000,
        exploration_final_eps=0.02,
        print_freq=10,
        callback=callback
    )
    print("Saving model to cartpole_model.pkl")
    act.save("cartpole_model.pkl")
def main():
    """Train DQN on CartPole with default hyperparameters and save it."""
    env = gym.make("CartPole-v0")
    model = deepq.models.mlp([64])
    act = deepq.learn(
        env,
        q_func=model,  # fixed: the MLP was built but never passed to learn()
        callback=callback
    )
    print("Saving model to cartpole_model.pkl")
    act.save("cartpole_model.pkl")
def test_cartpole_contextual():
    """Smoke-test the contextual CartPole env's context API."""
    env = gym.make('CartPoleContextual-v0')
    if not isinstance(env.unwrapped, CartPoleEnv):
        raise NotImplementedError
    env.reset()
    expected_keys = 10
    actual_keys = len(list(env.unwrapped.context_space_info().keys()))
    if actual_keys != expected_keys:
        print('context_space_info() function needs to be implemented!')
        raise NotImplementedError
    context_vect = [0.01, 0.01, 0.01]
    # The context must differ before change_context and match afterwards.
    if context_vect == env.unwrapped.context:
        raise AttributeError
    env.unwrapped.change_context(context_vect)
    if context_vect != env.unwrapped.context:
        raise AttributeError
def test_pendulum_contextual():
    """Smoke-test the contextual Pendulum env's context API."""
    env = gym.make('PendulumContextual-v0')
    if not isinstance(env.unwrapped, PendulumEnv):
        raise NotImplementedError
    env.reset()
    expected_keys = 10
    actual_keys = len(list(env.unwrapped.context_space_info().keys()))
    if actual_keys != expected_keys:
        print('context_space_info() function needs to be implemented!')
        raise NotImplementedError
    context_vect = [0.01, 0.01]
    # The context must differ before change_context and match afterwards.
    if context_vect == env.unwrapped.context:
        raise AttributeError
    env.unwrapped.change_context(context_vect)
    if context_vect != env.unwrapped.context:
        raise AttributeError
def main():
    """Train DQN on CartPole with default hyperparameters and save it."""
    env = gym.make("CartPole-v0")
    model = deepq.models.mlp([64])
    act = deepq.learn(
        env,
        q_func=model,  # fixed: the MLP was built but never passed to learn()
        callback=callback
    )
    print("Saving model to cartpole_model.pkl")
    act.save("cartpole_model.pkl")
def main():
    """Train DQN with parameter-space noise on MountainCar and save it."""
    env = gym.make("MountainCar-v0")
    # Enabling layer_norm here is important for parameter space noise!
    model = deepq.models.mlp([64], layer_norm=True)
    act = deepq.learn(
        env,
        q_func=model,  # fixed: the MLP was built but never passed to learn()
        exploration_final_eps=0.1,
        param_noise=True
    )
    print("Saving model to mountaincar_model.pkl")
    act.save("mountaincar_model.pkl")
def main():
    """Train DQN on CartPole with default hyperparameters and save it."""
    env = gym.make("CartPole-v0")
    model = deepq.models.mlp([64])
    act = deepq.learn(
        env,
        q_func=model,  # fixed: the MLP was built but never passed to learn()
        callback=callback
    )
    print("Saving model to cartpole_model.pkl")
    act.save("cartpole_model.pkl")
def train(env_id, num_timesteps, seed):
    """Train TRPO (MPI) on `env_id` for `num_timesteps` steps.

    Each MPI worker gets a distinct seed; only rank 0 keeps logging enabled.
    """
    import baselines.common.tf_util as U
    sess = U.single_threaded_session()
    sess.__enter__()
    rank = MPI.COMM_WORLD.Get_rank()
    if rank != 0:
        # fixed: attribute was case-garbled as `logger.disABLED`
        logger.set_level(logger.DISABLED)
    workerseed = seed + 10000 * rank
    set_global_seeds(workerseed)
    env = gym.make(env_id)

    def policy_fn(name, ob_space, ac_space):
        # fixed: the factory ignored its ob_space/ac_space parameters and
        # closed over the env directly, which breaks callers that pass
        # different spaces.
        return MlpPolicy(name=name, ob_space=ob_space, ac_space=ac_space,
                         hid_size=32, num_hid_layers=2)

    env = bench.Monitor(env, logger.get_dir() and
                        osp.join(logger.get_dir(), str(rank)))
    env.seed(workerseed)
    gym.logger.setLevel(logging.WARN)
    trpo_mpi.learn(env, policy_fn, timesteps_per_batch=1024, max_kl=0.01, cg_iters=10, cg_damping=0.1,
                   max_timesteps=num_timesteps, gamma=0.99, lam=0.98, vf_iters=5, vf_stepsize=1e-3)
    env.close()
def train(env_id, seed, num_timesteps=int(1e6)):
    """Train ACKTR (continuous control) on `env_id`.

    fixed: `num_timesteps` was referenced but never defined — it is now a
    backward-compatible keyword parameter. `rank` was also undefined; this
    single-process variant writes one monitor file instead.
    """
    env = gym.make(env_id)
    env = bench.Monitor(env, logger.get_dir() and
                        os.path.join(logger.get_dir(), "monitor.json"))
    set_global_seeds(seed)
    env.seed(seed)
    gym.logger.setLevel(logging.WARN)
    with tf.Session(config=tf.ConfigProto()):
        ob_dim = env.observation_space.shape[0]
        ac_dim = env.action_space.shape[0]
        with tf.variable_scope("vf"):
            vf = NeuralNetValueFunction(ob_dim, ac_dim)
        with tf.variable_scope("pi"):
            policy = GaussianMlpPolicy(ob_dim, ac_dim)
        learn(env, policy=policy, vf=vf,
              gamma=0.99, lam=0.97, timesteps_per_batch=2500,
              desired_kl=0.002,
              num_timesteps=num_timesteps, animate=False)
    env.close()
def main():
    """Train a DQN on Space Invaders and save the policy.

    NOTE(review): the original snippet was truncated mid-expression
    (`convs=[(32,` jumped straight to `prioritized_replay=True`); the body
    is reconstructed from the parallel Pong example in this file.
    """
    env = gym.make("SpaceInvadersNoFrameskip-v4")
    env = ScaledFloatFrame(wrap_dqn(env))
    model = deepq.models.cnn_to_mlp(
        convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)],
        hiddens=[256],
        dueling=True
    )
    act = deepq.learn(
        env,
        q_func=model,
        lr=1e-4,
        max_timesteps=2000000,
        buffer_size=10000,
        exploration_fraction=0.1,
        exploration_final_eps=0.01,
        train_freq=4,
        learning_starts=10000,
        target_network_update_freq=1000,
        gamma=0.99,
        prioritized_replay=True
    )
    act.save("space_invaders_model.pkl")
    env.close()
def train(env_id, seed, num_timesteps=int(1e6)):
    """Train PPO (pposgd_simple) on `env_id`.

    fixed: `num_timesteps` was referenced but never defined — it is now a
    backward-compatible keyword parameter. The original also had a
    scrape-garbled line that fused the Monitor wrapping into the MlpPolicy
    call and a `policy_fn` missing its `ob_space` parameter.
    """
    from baselines.pposgd import mlp_policy, pposgd_simple
    U.make_session(num_cpu=1).__enter__()
    logger.session().__enter__()
    set_global_seeds(seed)
    env = gym.make(env_id)

    def policy_fn(name, ob_space, ac_space):
        return mlp_policy.MlpPolicy(name=name, ob_space=ob_space, ac_space=ac_space,
                                    hid_size=64, num_hid_layers=2)

    env = bench.Monitor(env, osp.join(logger.get_dir(), "monitor.json"))
    env.seed(seed)
    gym.logger.setLevel(logging.WARN)
    pposgd_simple.learn(env, policy_fn,
                        max_timesteps=num_timesteps,
                        timesteps_per_batch=2048,
                        clip_param=0.2, entcoeff=0.0,
                        optim_epochs=10, optim_stepsize=3e-4, optim_batchsize=64, lam=0.95,
                        )
    env.close()
def make_env(env_id, seed, rank, log_dir):
    """Return a thunk that builds env `rank`, optionally monitored/wrapped."""
    def _thunk():
        env = gym.make(env_id)
        is_atari = hasattr(gym.envs, 'atari') and isinstance(
            env.unwrapped, gym.envs.atari.atari_env.AtariEnv)
        if is_atari:
            env = make_atari(env_id)
        env.seed(seed + rank)
        if log_dir is not None:
            env = bench.Monitor(env, os.path.join(log_dir, str(rank)))
        if is_atari:
            env = wrap_deepmind(env)
        # If the input has shape (W,H,3), wrap for PyTorch convolutions
        shape = env.observation_space.shape
        if len(shape) == 3 and shape[2] in (1, 3):
            env = WrapPyTorch(env)
        return env
    return _thunk
def main():
    """Train a DQN agent on CartPole via ngraph axes, then evaluate once."""
    # initialize gym environment
    environment = gym.make('CartPole-v0')
    state_axes = ng.make_axes([
        ng.make_axis(environment.observation_space.shape[0], name='width'),
    ])
    agent = dqn.Agent(
        state_axes,
        environment.action_space,
        model=baselines_model,
        epsilon=dqn.linear_generator(start=1.0, end=0.02, steps=10000),
        learning_rate=1e-3,
        gamma=1.0,
        memory=dqn.Memory(maxlen=50000),
        learning_starts=1000,
    )
    rl_loop.rl_loop_train(environment, agent, episodes=1000)
    total_reward = rl_loop.evaluate_single_episode(environment, agent)
    print(total_reward)
def test_dependent_environment():
    """Train 10 fresh agents; at least half should reach the perfect score."""
    environment = gym.make('DependentEnv-v0')
    total_rewards = []
    for i in range(10):
        agent = dqn.Agent(
            dqn.space_shape(environment.observation_space),
            environment.action_space,
            model=model,
            epsilon=dqn.decay_generator(start=1.0, decay=0.995, minimum=0.1),
            learning_rate=0.1,
        )
        # fixed: the agent was created but never passed to rl_loop_train
        # (cf. the identical call pattern elsewhere in this file)
        rl_loop.rl_loop_train(environment, agent, episodes=10)
        total_rewards.append(
            rl_loop.evaluate_single_episode(environment, agent)
        )
    # most of these 10 agents will be able to converge to the perfect policy
    assert np.mean(np.array(total_rewards) == 100) >= 0.5
def create_flash_env(env_id, client_id, remotes, **_):
    """Build a universe Flash-game env: vision, cropping, rescale, fixed keys."""
    env = gym.make(env_id)
    env = Vision(env)
    env = Logger(env)
    env = BlockingReset(env)
    # Look up the game's native screen size from the flashgames registry.
    reg = universe.runtime_spec('flashgames').server_registry
    height = reg[env_id]["height"]
    width = reg[env_id]["width"]
    env = CropScreen(env, height, width, 84, 18)
    env = FlashRescale(env)
    keys = ['left', 'right', 'up', 'down', 'x']
    # fixed: class name was case-garbled as `discretetoFixedKeysVNCActions`
    env = DiscreteToFixedKeysVNCActions(env, keys)
    env = EpisodeID(env)
    env = DiagnosticsInfo(env)
    env = Unvectorize(env)
    env.configure(fps=5.0, remotes=remotes, start_timeout=15 * 60, client_id=client_id,
                  vnc_driver='go', vnc_kwargs={
                      'encoding': 'tight', 'compress_level': 0,
                      'fine_quality_level': 50, 'subsample_level': 3})
    return env
def make(env_id, hack=None):
    """gym.make plus an optional hack that makes an Atari env emit grayscale."""
    if 'Deterministic-v4' not in env_id:
        print('[Warning] Use Deterministic-v4 version '
              'to reproduce the results of paper.')
    _env = env = gym.make(env_id)
    if hack:
        # Hack gym env to output grayscale image. Unwrap TimeLimit (if any)
        # so the patch lands on the raw Atari env.
        if env.spec.timestep_limit is not None:
            from gym.wrappers.time_limit import TimeLimit
            if isinstance(env, TimeLimit):
                _env = env.env
        if hack == 'train':
            _env._get_image = _env.ale.getScreenGrayscale
            _env._get_obs = _env.ale.getScreenGrayscale
        elif hack == 'eval':
            _env._get_obs = _env.ale.getScreenGrayscale
    return env
def dqn_test(env='OneRoundDeterministicReward-v0'):
    """Train a tiny DQN on a one-round env, printing a running mean return."""
    env = gym.make(env)
    env = ObservationShapeWrapper(env)

    @tt.model(tracker=tf.train.ExponentialMovingAverage(1 - .01),
              optimizer=tf.train.AdamOptimizer(.01))
    def q_network(x):
        # One hidden layer; tiny-variance init keeps initial Q-values near 0.
        x = layers.fully_connected(x, 32)
        x = layers.fully_connected(x, env.action_space.n, activation_fn=None,
                                   weights_initializer=tf.random_normal_initializer(0, 1e-4))
        return x

    agent = DqnAgent(env, q_network, double_dqn=False, replay_start=100, annealing_time=100)
    rs = []
    for ep in range(10000):
        r, _ = agent.play_episode()
        rs.append(r)
        if ep % 100 == 0:
            print(f'Return after episode {ep} is {sum(rs)/len(rs)}')
            rs = []
def test_steps_limit_restart():
    """TimeLimit must signal done and reset after the step limit is hit."""
    env = gym.make('test.StepsLimitDummyVNCEnv-v0')
    env.configure(_n=1)
    env = wrappers.TimeLimit(env)
    env.reset()
    assert env._max_episode_seconds is None
    assert env._max_episode_steps == 2
    # Episode has started
    # fixed: `done` was asserted below but never unpacked from step()
    _, _, done, info = env.step([[]])
    assert done == [False]
    # Limit reached; now we get a done signal and the env resets itself
    _, _, done, info = env.step([[]])
    assert done == [True]
    assert env._elapsed_steps == 0
def test_seconds_limit_restart():
    """TimeLimit must signal done once the wall-clock limit elapses."""
    env = gym.make('test.SecondsLimitDummyVNCEnv-v0')
    env.configure(_n=1)
    env = wrappers.TimeLimit(env)
    env.reset()
    assert env._max_episode_seconds == 0.1
    assert env._max_episode_steps is None
    # Episode has started
    # fixed: `done` was asserted below but never unpacked from step()
    _, _, done, info = env.step([[]])
    assert done == [False]
    # Not enough time has passed
    _, _, done, info = env.step([[]])
    assert done == [False]
    time.sleep(0.2)
    # Limit reached
    _, _, done, info = env.step([[]])
    assert done == [True]
def test_default_time_limit():
    """An env registered without limits must fall back to the default seconds cap."""
    # We need an env without a default limit
    register(
        id='test.NoLimitDummyVNCEnv-v0',
        entry_point='universe.envs:DummyVNCEnv',
        tags={'vnc': True},
    )
    env = gym.make('test.NoLimitDummyVNCEnv-v0')
    env.configure(_n=1)
    env = wrappers.TimeLimit(env)
    env.reset()
    assert env._max_episode_seconds == wrappers.time_limit.DEFAULT_MAX_EPISODE_SECONDS
    assert env._max_episode_steps == None
def test_joint():
    """Joint-wrap two 3-env vectorized envs and check rewards fan out."""
    env1 = gym.make('test.DummyVNCEnv-v0')
    env2 = gym.make('test.DummyVNCEnv-v0')
    env1.configure(_n=3)
    env2.configure(_n=3)
    # Push a reward only into the first sub-env of each vectorized env.
    for reward_buffer in [env1._reward_buffers[0], env2._reward_buffers[0]]:
        reward_buffer.set_env_info('running', 'test.DummyVNCEnv-v0', '1', 60)
        reward_buffer.reset('1')
        reward_buffer.push('1', 10, False, {})
    env = wrappers.Joint([env1, env2])
    assert env.n == 6
    observation_n = env.reset()
    assert observation_n == [None] * 6
    observation_n, reward_n, done_n, info = env.step([[] for _ in range(env.n)])
    # fixed: the expected list had only 4 entries for 6 joined envs; only
    # index 0 of each 3-env group received the pushed reward of 10.
    assert reward_n == [10.0, 0.0, 0.0, 10.0, 0.0, 0.0]
    assert done_n == [False] * 6
def __init__(self, env, gym_core_id=None):
    """Map a gym-core env's discrete actions onto VNC key events."""
    super(GymCoreAction, self).__init__(env)
    if gym_core_id is None:
        # self.spec is None while inside of the make, so we need
        # to pass gym_core_id in explicitly there. This case will
        # be hit when instantiating by hand.
        gym_core_id = self.spec._kwargs['gym_core_id']
    spec = gym.spec(gym_core_id)
    raw_action_space = gym_core_action_space(gym_core_id)
    self._actions = raw_action_space.actions
    # fixed: the space class is `Discrete`; `discrete` was case-garbled
    self.action_space = gym_spaces.Discrete(len(self._actions))
    # Atari envs need key-state translation; others act on raw key events.
    if spec._entry_point.startswith('gym.envs.atari:'):
        self.key_state = translator.AtariKeyState(gym.make(gym_core_id))
    else:
        self.key_state = None
def test_describe_handling():
    """A remote describe must update buffer state; reset binds the episode id."""
    env = gym.make('flashgames.DuskDrive-v0')
    env.configure(vnc_driver=FakeVNCSession, rewarder_driver=FakeRewarder, remotes='vnc://example.com:5900+15900')
    env.reset()
    reward_buffer = get_reward_buffer(env)
    rewarder_client = get_rewarder_client(env)

    # Describe arrives: remote state is known, but no local episode yet.
    rewarder_client._manual_recv('v0.env.describe', {'env_id': 'flashgames.DuskDrive-v0', 'env_state': 'resetting', 'fps': 60}, {'episode_id': '1'})
    assert reward_buffer._remote_episode_id == '1'
    assert reward_buffer._remote_env_state == 'resetting'
    assert reward_buffer._current_episode_id is None
    assert reward_buffer.reward_state(reward_buffer._current_episode_id)._env_state is None

    # Reset reply arrives: the local episode binds to the remote one.
    rewarder_client._manual_recv('v0.reply.env.reset', {}, {'episode_id': '1'})
    assert reward_buffer._remote_episode_id == '1'
    assert reward_buffer._remote_env_state == 'resetting'
    assert reward_buffer._current_episode_id == '1'
    assert reward_buffer.reward_state(reward_buffer._current_episode_id)._env_state == 'resetting'
def test_smoke(env_id):
    """Check that environments start up without errors and that we can extract rewards and observations"""
    gym.undo_logger_setup()
    logging.getLogger().setLevel(logging.INFO)
    env = gym.make(env_id)
    # fixed: gym envs expose `metadata` (lowercase); `Metadata` raises AttributeError
    if env.metadata.get('configure.required', False):
        if os.environ.get('FORCE_LATEST_UNIVERSE_DOCKER_RUNTIMES'):  # Used to test universe-envs in CI
            configure_with_latest_docker_runtime_tag(env)
        else:
            env.configure(remotes=1)
    env = wrappers.Unvectorize(env)
    env.reset()
    _rollout(env, timestep_limit=60*30)  # Check a rollout
def train_agent(rounds=10000, use_score=False, name='result_dir', create_agent=create_ddqn_agent):
    """Train a chainerrl agent on the malware env and return the trained agent."""
    ENV_NAME = 'malware-score-v0' if use_score else 'malware-v0'
    env = gym.make(ENV_NAME)
    np.random.seed(123)
    env.seed(123)
    agent = create_agent(env)
    chainerrl.experiments.train_agent_with_evaluation(
        agent,
        env,  # fixed: the training env was never passed to chainerrl
        steps=rounds,  # Train the agent for this many rounds steps
        max_episode_len=env.maxturns,  # Maximum length of each episodes
        eval_interval=1000,  # Evaluate the agent after every 1000 steps
        eval_n_runs=100,  # 100 episodes are sampled for each evaluation
        outdir=name)  # Save everything to 'result' directory
    return agent
def main():
    """Learn a policy on stochastic FrozenLake from an MDP model and render it."""
    env = gym.make('stochastic-4x4-FrozenLake-v0')
    policy = learn_with_mdp_model(env)
    render_single(env, policy)
    # Handy exploration snippets (kept from the original, disabled):
    # for i in range(10):
    #     print('\n%d' % i)
    #     env.render()
    #     print(env.step(env.action_space.sample()))
    #     env.render()
    # for init_state in env.P.keys():
    #     for action in env.P[init_state]:
    #         print("\nState: %d,action: %d" % (init_state, action))
    #         for next_state in env.P[init_state][action]:
    #             print(next_state)
    # for _ in range(10):
    #     env.render()
    #     env.step(env.action_space.sample())
def init(self):
    """Create the gym env, size the I/O buffers, seed, and set up logging."""
    gym.configuration.undo_logger_setup()
    self.env = gym.make(self.env_name)
    # NOTE(review): "inputs" are sized from the action space and "outputs"
    # from the observation space — this mirrors the original (the behavior
    # is from the environment's point of view); confirm against callers.
    self.n_inputs, self.input_handler = self._init_space(
        self.env.action_space)
    self.inputs = np.empty(self.n_inputs)
    self.n_outputs, _ = self._init_space(self.env.observation_space)
    self.outputs = np.empty(self.n_outputs)
    if self.seed is not None:
        self.env.seed(self.seed)
    self.logger = get_logger(self, self.log_to_file, self.log_to_stdout)
    if self.log_to_stdout or self.log_to_file:
        self.logger.info("Number of inputs: %d" % self.n_inputs)
        self.logger.info("Number of outputs: %d" % self.n_outputs)
def __init__(self, env_name, state_builder=ALEStateBuilder(), repeat_action=4, no_op=30, monitoring_path=None):
    """Wrap a gym Atari env with frameskip control and optional monitoring.

    fixed: `env_name` was referenced in the body but missing from the
    signature; it is now the first parameter.
    NOTE(review): `state_builder=ALEStateBuilder()` is a mutable default
    shared across instances — kept for interface compatibility; confirm
    ALEStateBuilder is stateless.
    """
    assert isinstance(state_builder, StateBuilder), 'state_builder should inherit from StateBuilder'
    assert isinstance(repeat_action, (int, tuple)), 'repeat_action should be int or tuple'
    if isinstance(repeat_action, int):
        assert repeat_action >= 1, "repeat_action should be >= 1"
    elif isinstance(repeat_action, tuple):
        assert len(repeat_action) == 2, 'repeat_action should be a length-2 tuple: (min frameskip,max frameskip)'
        assert repeat_action[0] < repeat_action[1], 'repeat_action[0] should be < repeat_action[1]'
    super(GymEnvironment, self).__init__()
    self._state_builder = state_builder
    self._env = gym.make(env_name)
    self._env.env.frameskip = repeat_action
    self._no_op = max(0, no_op)
    self._done = True
    if monitoring_path is not None:
        self._env = Monitor(self._env, monitoring_path, video_callable=need_record)
def deterministic_grid_test():
    """No-ops must not move the agent; walls must clamp movement."""
    env = gym.make("deterministic-grid-world-v0")
    prev_state = env.state
    # fixed: `xrange` is Python 2 only; this file already uses Python 3
    # features (f-strings), and `range` is behaviorally identical here.
    for _ in range(100):
        env.step(0)  # noop
    assert env.state == prev_state
    while env.state[0] > 0:
        env.step(1)
    assert env.state[0] == 0
    # Stepping into the wall must not move the agent.
    env.step(1)
    assert env.state[0] == 0
    while env.state[1] < env.board_size[1] - 1:
        env.step(3)
    assert env.state[1] == env.board_size[1] - 1
    env.step(3)
    assert env.state[1] == env.board_size[1] - 1
def __init__(self, n_options=10, logger=None, plotting=False,
             log_tf_graph=False):
    """Set up the grid-world env, counts, optional plot robots, and TF graph."""
    if logger is None:
        logger = logging.getLogger("logger")
        logger.setLevel(logging.INFO)
    self.logger = logger
    self.n_options = n_options
    self.env = gym.make("deterministic-grid-world-v0")
    self.n_actions = self.env.action_space.n
    # Product of the sizes of the observation factors, plus one extra state
    # (presumably a terminal/absorbing state — confirm against usage).
    self.n_states = 1 + reduce(lambda x, y: x * y,
                               map(lambda x: x.n, self.env.observation_space.spaces))
    if plotting:
        self.plot_robots = [PlotRobot('dqn loss', 0, log_scale=True),
                            PlotRobot('q loss', 1), PlotRobot('rewards', 2)]
    else:
        self.plot_robots = [None] * 3
    self.plotting = self.plot_robots[2]
    self.colors = list('bgrcmyk') + ['magenta', 'lime', 'gray']
    self.build_graph(log_tf_graph)
def execute(symbol, begin, end, days, plot, model_path, random):
    """Replay one episode of the Trading env using a trained Keras model."""
    # fixed: Python 2 print statement -> function call
    print(model_path)
    model = load_model(model_path)
    env = gym.make('Trading-v0').env
    env.initialise(symbol=symbol, start=begin, end=end, days=days, random=random)
    state_size = env.observation_space.shape[0]
    state = env.reset()
    done = False
    while not done:
        state = state.reshape(1, state_size)
        qval = model.predict(state, batch_size=1)
        action = np.argmax(qval)
        # fixed: step() returns (obs, reward, done, info); `done` drives the
        # loop but was never assigned in the original 2-name unpack.
        state, _, done, info = env.step(action)
        if plot:
            env.render()
def __init__(self, game="MsPacman-v0"):
    """Create the game env plus screen-buffer and episode bookkeeping."""
    # Screen geometry comes from the shared Config.
    self.screen_h = Config.SCREEN_H
    self.screen_w = Config.SCREEN_W
    self.screen_shape = Config.SCREEN_SHAPE
    self.frame_per_row = Config.FRAME_PER_ROW
    self.frame_buffer = None
    self.action_space = 9
    # Meta counters
    self.total_episode_run = 0
    self.steps_in_episode = 0
    self.max_steps_in_episode = 0
    self.env = gym.make(game)
    self.reset()
def demo_run():
    """Run the pretrained InvertedPendulum policy forever, printing scores."""
    env = gym.make("RoboschoolInvertedPendulum-v1")
    pi = SmallReactivePolicy(env.observation_space, env.action_space)
    while 1:
        frame = 0
        score = 0
        restart_delay = 0
        obs = env.reset()
        while 1:
            a = pi.act(obs)
            # fixed: `done` was tested below but never unpacked from step()
            obs, r, done, _ = env.step(a)
            score += r
            frame += 1
            still_open = env.render("human")
            if still_open == False:
                return
            if not done: continue
            if restart_delay == 0:
                print("score=%0.2f in %i frames" % (score, frame))
                restart_delay = 60 * 2  # 2 sec at 60 fps
            else:
                restart_delay -= 1
                if restart_delay == 0: break
def demo_run():
    """Run the pretrained HumanoidFlagrun policy forever, printing scores.

    NOTE(review): the original snippet was truncated mid-expression; the body
    is reconstructed from the intact Roboschool demo pattern in this file.
    """
    env = gym.make("RoboschoolHumanoidFlagrun-v1")
    pi = SmallReactivePolicy(env.observation_space, env.action_space)
    while 1:
        frame = 0
        score = 0
        restart_delay = 0
        obs = env.reset()
        while 1:
            a = pi.act(obs)
            obs, r, done, _ = env.step(a)
            score += r
            frame += 1
            still_open = env.render("human")
            if still_open == False:
                return
            if not done: continue
            if restart_delay == 0:
                print("score=%0.2f in %i frames" % (score, frame))
                restart_delay = 60 * 2  # 2 sec at 60 fps
            else:
                restart_delay -= 1
                if restart_delay == 0: break
def demo_run():
    """Run the pretrained Ant policy forever, printing scores.

    NOTE(review): the original snippet was truncated mid-expression; the body
    is reconstructed from the intact Roboschool demo pattern in this file.
    """
    env = gym.make("RoboschoolAnt-v1")
    pi = SmallReactivePolicy(env.observation_space, env.action_space)
    while 1:
        frame = 0
        score = 0
        restart_delay = 0
        obs = env.reset()
        while 1:
            a = pi.act(obs)
            obs, r, done, _ = env.step(a)
            score += r
            frame += 1
            still_open = env.render("human")
            if still_open == False:
                return
            if not done: continue
            if restart_delay == 0:
                print("score=%0.2f in %i frames" % (score, frame))
                restart_delay = 60 * 2  # 2 sec at 60 fps
            else:
                restart_delay -= 1
                if restart_delay == 0: break
def demo_run():
    """Run the pretrained Reacher policy forever, printing one score per episode."""
    env = gym.make("RoboschoolReacher-v1")
    pi = SmallReactivePolicy(env.observation_space, env.action_space)
    while 1:
        frame = 0
        score = 0
        obs = env.reset()
        while 1:
            a = pi.act(obs)
            # fixed: `r` and `done` were used below but never unpacked;
            # step() returns (obs, reward, done, info)
            obs, r, done, _ = env.step(a)
            score += r
            frame += 1
            still_open = env.render("human")
            if still_open == False:
                return
            if not done: continue
            print("score=%0.2f in %i frames" % (score, frame))
            break
def demo_run():
    """Run the pretrained Hopper policy forever, printing scores.

    NOTE(review): the original snippet was truncated mid-expression; the body
    is reconstructed from the intact Roboschool demo pattern in this file.
    """
    env = gym.make("RoboschoolHopper-v1")
    pi = SmallReactivePolicy(env.observation_space, env.action_space)
    while 1:
        frame = 0
        score = 0
        restart_delay = 0
        obs = env.reset()
        while 1:
            a = pi.act(obs)
            obs, r, done, _ = env.step(a)
            score += r
            frame += 1
            still_open = env.render("human")
            if still_open == False:
                return
            if not done: continue
            if restart_delay == 0:
                print("score=%0.2f in %i frames" % (score, frame))
                restart_delay = 60 * 2  # 2 sec at 60 fps
            else:
                restart_delay -= 1
                if restart_delay == 0: break
def demo_run():
    """Run the pretrained Walker2d policy forever, printing scores.

    NOTE(review): the original snippet was truncated mid-expression; the body
    is reconstructed from the intact Roboschool demo pattern in this file.
    """
    env = gym.make("RoboschoolWalker2d-v1")
    pi = SmallReactivePolicy(env.observation_space, env.action_space)
    while 1:
        frame = 0
        score = 0
        restart_delay = 0
        obs = env.reset()
        while 1:
            a = pi.act(obs)
            obs, r, done, _ = env.step(a)
            score += r
            frame += 1
            still_open = env.render("human")
            if still_open == False:
                return
            if not done: continue
            if restart_delay == 0:
                print("score=%0.2f in %i frames" % (score, frame))
                restart_delay = 60 * 2  # 2 sec at 60 fps
            else:
                restart_delay -= 1
                if restart_delay == 0: break
def multiplayer(self, game_server_guid, player_n):
    """
    That's the function you call between gym.make() and first env.reset(), to connect to multiplayer server.
    game_server_guid -- is an id that server and client use to identify themselves to belong to the same session.
    player_n -- integer, up to scene.players_count.
    You see here env._reset() gets overwritten, that means if you call env.reset(), it will not create
    single player scene on your side (as it usually do), but rather it will communicate to server, reset environment
    there. Same with step() and render().
    """
    self.shmem_client_init(game_server_guid, player_n)
    # fixed: `env` was undefined in this method — this mixin patches its own
    # methods (self IS the env); confirm against the server-side counterpart.
    self._step = self.shmem_client_step  # replace real function with fake, that communicates with environment on server
    self._reset = self.shmem_client_reset
    self._render = self.shmem_client_rgb_array
    self.shmem_client_send_env_id()
def read_env_id_and_create_env(self):
    """Read the env id from the action pipe, build the env, map shared buffers."""
    self.sh_pipe_actready = open(self.sh_pipe_actready_filename, "rt")
    self.sh_pipe_obsready = os.open(self.sh_pipe_obsready_filename, os.O_WRONLY)
    env_id = self.sh_pipe_actready.readline()[:-1]
    if env_id.find("-v") == -1:
        raise ValueError("multiplayer client %s sent here invalid environment id '%s'" % (self.prefix, env_id))
    #
    # And at this point we know env_id.
    #
    print("Player %i connected,wants to operate %s in this scene" % (self.player_n, env_id))
    self.env = gym.make(env_id)  # gym.make() creates at least timeout wrapper, we need it.
    self.env.unwrapped.scene = self.scene
    self.env.unwrapped.player_n = self.player_n
    assert isinstance(self.env.observation_space, gym.spaces.Box)
    assert isinstance(self.env.action_space, gym.spaces.Box)
    # fixed: np.memmap defaults to mode "r+" (requires an existing file);
    # only the obs map had mode="w+" — all four buffers must be created.
    self.sh_obs = np.memmap(self.prefix + "_obs", mode="w+", shape=self.env.observation_space.shape, dtype=np.float32)
    self.sh_act = np.memmap(self.prefix + "_act", mode="w+", shape=self.env.action_space.shape, dtype=np.float32)
    self.sh_rew = np.memmap(self.prefix + "_rew", mode="w+", shape=(1,), dtype=np.float32)
    self.sh_rgb = np.memmap(self.prefix + "_rgb", mode="w+", shape=(self.env.unwrapped.VIDEO_H, self.env.unwrapped.VIDEO_W, 3), dtype=np.uint8)
    os.write(self.sh_pipe_obsready, b'accepted\n')
def main():
    """Roll out a RandomAgent twice on a chosen env and save the data to .npz."""
    parser = argparse.ArgumentParser()
    parser.add_argument("envid")
    parser.add_argument("outfile")
    parser.add_argument("--gymdir")
    args = parser.parse_args()
    if args.gymdir:
        sys.path.insert(0, args.gymdir)
    import gym
    from gym import utils
    # fixed: Python 2 print statement -> function call
    print(utils.colorize("gym directory: %s" % path.dirname(gym.__file__), "yellow"))
    env = gym.make(args.envid)
    agent = RandomAgent(env.action_space)
    alldata = {}
    # fixed: `xrange` is Python 2 only; `range` is identical here
    for i in range(2):
        np.random.seed(i)
        data = rollout(env, env.spec.max_episode_steps)
        for (k, v) in data.items():
            alldata["%i-%s" % (i, k)] = v
    np.savez(args.outfile, **alldata)
def __init__(self, death_penalty=True, deterministic=True, v=3, **kwargs):
    """Wrap a MsPacman env (Deterministic variant by default, version `v`)."""
    env_id = "MsPacman"
    if deterministic:
        env_id += "Deterministic"
    env_id += "-v%d" % v
    super(Pacman, self).__init__(gym.make(env_id))
    # Observations are exposed as 42x42 single-channel floats in [0, 1].
    self.observation_space = gym.spaces.Box(0.0, 1.0, [42, 42, 1])
    self.death_penalty = death_penalty
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。