Python gym 模块，utils() 实例源码
我们从 Python 开源项目中提取了以下 15 个代码示例，用于说明如何使用 gym.utils()。
def __init__(self, **kwargs):
    """Initialise the in-memory state of the emulator-backed environment.

    Nothing is launched here: the pipe, thread and path attributes are set
    to ``None`` and ``emulator_started`` is ``False``.

    Args:
        **kwargs: Accepted for call compatibility; not used by this
            constructor.
    """
    utils.EzPickle.__init__(self)
    self.curr_seed = 0
    self.screen = np.zeros((SCREEN_HEIGHT, SCREEN_WIDTH, 3), dtype=np.uint8)
    self.closed = False
    self.can_send_command = True
    self.command_cond = Condition()
    self.viewer = None
    self.reward = 0

    # 7 s * 60 fps / 5 frames per step = 84 (a float under Python 3's
    # true division).
    episode_time_length_secs = 7
    frame_skip = 5
    fps = 60
    self.episode_length = episode_time_length_secs * fps / frame_skip

    self.actions = [
        'U', 'D', 'L', 'R',
        'UR', 'DR', 'URA', 'DRB',
        'A', 'B', 'RB', 'RA']
    # ``Discrete`` is the space class; lowercase ``spaces.discrete`` is the
    # submodule and is not callable.
    self.action_space = spaces.Discrete(len(self.actions))
    self.frame = 0

    # for communication with emulator
    self.pipe_in = None
    self.pipe_out = None
    self.thread_incoming = None
    self.rom_file_path = None
    self.lua_interface_path = None
    self.emulator_started = False
## ---------- gym.Env methods -------------
def _seed(self, seed=None):
    """Create and store the random number generator for this environment.

    Args:
        seed: Value forwarded to ``gym.utils.seeding.np_random``; defaults
            to ``None``.

    Returns:
        A one-element list holding the seed reported back by
        ``gym.utils.seeding.np_random``.
    """
    generator, used_seed = gym.utils.seeding.np_random(seed)
    self.np_random = generator
    return [used_seed]
def _seed(self, seed=None):
    """Seed the environment's random number generator.

    Args:
        seed: Value forwarded to ``gym.utils.seeding.np_random``; defaults
            to ``None``.

    Returns:
        A one-element list holding the seed reported back by
        ``gym.utils.seeding.np_random``.
    """
    # The generator is stored on the instance as ``np_random``.
    self.np_random, seed = gym.utils.seeding.np_random(seed)
    return [seed]
def _seed(self, seed=None):
    """Seed the environment's random number generator.

    Args:
        seed: Value forwarded to ``gym.utils.seeding.np_random``; defaults
            to ``None``.

    Returns:
        A one-element list holding the seed reported back by
        ``gym.utils.seeding.np_random``.
    """
    # The generator is stored on the instance as ``np_random``.
    self.np_random, seed = gym.utils.seeding.np_random(seed)
    return [seed]
def _seed(self, seed=None):
    """Seed the environment's random number generator.

    Args:
        seed: Value forwarded to ``gym.utils.seeding.np_random``; defaults
            to ``None``.

    Returns:
        A one-element list holding the seed reported back by
        ``gym.utils.seeding.np_random``.
    """
    # The generator is stored on the instance as ``np_random``.
    self.np_random, seed = gym.utils.seeding.np_random(seed)
    return [seed]
def _seed(self, seed=None):
    """Seed the environment's random number generator.

    Args:
        seed: Value forwarded to ``gym.utils.seeding.np_random``; defaults
            to ``None``.

    Returns:
        A one-element list holding the seed reported back by
        ``gym.utils.seeding.np_random``.
    """
    # The generator is stored on the instance as ``np_random``.
    self.np_random, seed = gym.utils.seeding.np_random(seed)
    return [seed]
def _seed(self, seed=None):
    """Seed the environment's random number generator.

    Args:
        seed: Value forwarded to ``gym.utils.seeding.np_random``; defaults
            to ``None``.

    Returns:
        A one-element list holding the seed reported back by
        ``gym.utils.seeding.np_random``.
    """
    # The generator is stored on the instance as ``np_random``.
    self.np_random, seed = gym.utils.seeding.np_random(seed)
    return [seed]
def __init__(self, game='pong', obs_type='ram', frameskip=(2, 5), repeat_action_probability=0.):
    """Frameskip should be either a tuple (indicating a random range to
    choose from, with the top value excluded), or an int.

    Args:
        game: Name passed to ``atari_py.get_game_path`` to locate the ROM.
        obs_type: ``'ram'`` or ``'image'``; selects the observation space.
        frameskip: Stored as ``self.frameskip`` (see summary above).
        repeat_action_probability: Float or int forwarded to the ALE
            ``repeat_action_probability`` setting.

    Raises:
        AssertionError: If ``obs_type`` is not ``'ram'``/``'image'`` or
            ``repeat_action_probability`` is not a float or int.
        IOError: If the resolved game path does not exist.
        error.Error: If the observation type is unrecognised (only
            reachable when assertions are disabled).
    """
    utils.EzPickle.__init__(self, game, obs_type)
    assert obs_type in ('ram', 'image')

    self.game_path = atari_py.get_game_path(game)
    if not os.path.exists(self.game_path):
        raise IOError('You asked for game %s but path %s does not exist'%(game, self.game_path))
    self._obs_type = obs_type
    self.frameskip = frameskip
    self.ale = atari_py.ALEInterface()
    self.viewer = None

    # Tune (or disable) ALE's action repeat:
    # https://github.com/openai/gym/issues/349
    assert isinstance(repeat_action_probability, (float, int)), "Invalid repeat_action_probability: {!r}".format(repeat_action_probability)
    self.ale.setFloat('repeat_action_probability'.encode('utf-8'), repeat_action_probability)

    self._seed()

    # Queried once, after seeding; the original repeated this call a second
    # time with no intervening state change.
    (screen_width, screen_height) = self.ale.getScreenDims()

    self._action_set = self.ale.getMinimalActionSet()
    # ``Discrete`` is the space class; lowercase ``spaces.discrete`` is the
    # submodule and is not callable.
    self.action_space = spaces.Discrete(len(self._action_set))

    if self._obs_type == 'ram':
        self.observation_space = spaces.Box(low=np.zeros(128), high=np.zeros(128)+255)
    elif self._obs_type == 'image':
        self.observation_space = spaces.Box(low=0, high=255, shape=(screen_height, screen_width, 3))
    else:
        raise error.Error('Unrecognized observation type: {}'.format(self._obs_type))
def _render(self, mode='human', close=False):
if mode == 'rgb_array':
return self.get_bitmap()
# utils functions
# NOTE(review): this snippet appears to have been truncated by extraction --
# the signature is cut mid-parameter-list and the ``else`` has no matching
# ``if``. It is not valid Python as shown; restore it from the original
# project before use.
def __init__(self, 3))
else:
raise error.Error('Unrecognized observation type: {}'.format(self._obs_type))
# NOTE(review): this snippet appears to have been truncated by extraction in
# several places (the signature, the ``isinstance`` assertion, the
# ``setFloat`` / ``getScreenDims`` lines, the ``np.empty`` shape and the
# image ``spaces.Box`` call are all cut mid-expression). It is not valid
# Python as shown; restore it from the original project before use.
# NOTE(review): ``spaces.discrete`` below is presumably meant to be the
# ``spaces.Discrete`` class -- confirm against the original.
def __init__(self, game='classic_kong', 'image')
self.game_path = self.get_rom_path(game)
self._obs_type = obs_type
self.frameskip = frameskip
self.rle = rle_python_interface.RLEInterface()
self.viewer = None
# Tune (or disable) RLE's action repeat:
# https://github.com/openai/gym/issues/349
assert isinstance(repeat_action_probability, "Invalid repeat_action_probability: {!r}".format(repeat_action_probability)
self.rle.setFloat('repeat_action_probability'.encode('utf-8'), screen_height) = self.rle.getScreenDims()
self._buffer = np.empty((screen_height, 4), dtype=np.uint8)
self._action_set = self.rle.getMinimalActionSet()
self.action_space = spaces.discrete(len(self._action_set))
(screen_width,screen_height) = self.rle.getScreenDims()
ram_size = self.rle.getRAMSize()
if self._obs_type == 'ram':
self.observation_space = spaces.Box(low=np.zeros(ram_size), high=np.zeros(ram_size)+255)
elif self._obs_type == 'image':
self.observation_space = spaces.Box(low=0, ))
else:
raise error.Error('Unrecognized observation type: {}'.format(self._obs_type))
# NOTE(review): this snippet appears to have been truncated by extraction --
# the signature runs straight into a ``getScreenDims`` assignment, the
# ``np.empty`` shape is unbalanced, and the ``else`` has no matching ``if``.
# It is not valid Python as shown; restore it from the original project
# before use.
# NOTE(review): ``spaces.discrete`` below is presumably meant to be the
# ``spaces.Discrete`` class -- confirm against the original.
def __init__(self, screen_height) = self.ale.getScreenDims()
self._buffer = np.empty((screen_height, dtype=np.uint8)
self._action_set = self.ale.getMinimalActionSet()
self.action_space = spaces.discrete(len(self._action_set))
(screen_width, 3))
else:
raise error.Error('Unrecognized observation type: {}'.format(self._obs_type))
def __init__(self, client_id, base_url=allocator_base,
             address_type=None, start_timeout=None, api_key=None,
             runtime_id=None, params=None, placement=None,
             use_recorder_ports=False,
             ):
    """Configure the allocator manager and create its request client.

    Args:
        client_id: Stored as ``self.client_id``.
        base_url: Passed to ``AllocatorClient`` and stored as
            ``self.base_url``.
        address_type: One of ``'public'``, ``'pod'`` or ``'private'``;
            ``None`` is treated as ``'public'``.
        start_timeout: Stored as ``self.start_timeout``; ``None`` becomes
            ``20 * 60``.
        api_key: Key handed to ``AllocatorClient``; ``None`` falls back to
            the module-level ``_api_key``.
        runtime_id: Stored as ``self.runtime_id``.
        params: Stored as ``self.params``.
        placement: Stored as ``self.placement``.
        use_recorder_ports: Stored as ``self.use_recorder_ports``.

    Raises:
        error.Error: If ``address_type`` is not one of the accepted values.
    """
    super(AllocatorManager, self).__init__()

    self.label = 'AllocatorManager'
    self.supports_reconnect = True
    self.connect_vnc = True
    self.connect_rewarder = True

    resolved_address_type = 'public' if address_type is None else address_type
    if resolved_address_type not in ('public', 'pod', 'private'):
        raise error.Error('Bad address type specified: {}. Must be public,pod,or private.'.format(resolved_address_type))

    self.client_id = client_id
    self.address_type = resolved_address_type
    self.start_timeout = 20 * 60 if start_timeout is None else start_timeout
    self.params = params
    self.placement = placement
    self.use_recorder_ports = use_recorder_ports

    # NOTE: an earlier revision (previously kept here as commented-out code)
    # fell back to scoreboard.api_base / gym_base_url for the base URL and
    # to scoreboard.api_key for the key, raising an authentication error
    # when no key was available. Only the ``_api_key`` fallback is active.
    resolved_api_key = _api_key if api_key is None else api_key
    self._requestor = AllocatorClient(self.label, resolved_api_key, base_url=base_url)
    self.base_url = base_url

    # These could be overridden on a per-allocation basis, if you
    # want heterogeneous envs. We don't support those currently
    # in the higher layers, but this layer could support it
    # easily.
    self.runtime_id = runtime_id

    self.pending = {}
    self.error_buffer = utils.ErrorBuffer()
    self.requests = queue.Queue()
    self.ready = queue.Queue()
    self._reconnect_history = {}
    self._sleep = 1
def __init__(self):
    """Initialise default state for the emulator-backed environment.

    Sets the screen geometry, action and observation spaces, launch
    arguments, pipe paths and bookkeeping counters, then seeds the
    environment via ``self._seed()``. No subprocess is started here
    (``self.subprocess`` is left as ``None``).
    """
    utils.EzPickle.__init__(self)
    self.rom_path = ''
    self.screen_height = 224
    self.screen_width = 256
    # ``MultiDiscrete`` is the class name exported by ``gym.spaces``.
    self.action_space = spaces.MultiDiscrete([[0, 1]] * NUM_ACTIONS)
    # Upper bound restored: 255 is the maximum of the uint8 screen buffer
    # allocated below.
    self.observation_space = spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 3))
    self.launch_vars = {}
    self.cmd_args = ['--xscale 2', '--yscale 2', '-f 0']
    self.lua_path = []
    self.subprocess = None
    self.no_render = True
    self.viewer = None

    # Pipes
    self.pipe_name = ''
    self.path_pipe_prefix = os.path.join(tempfile.gettempdir(), 'smb-fifo')
    self.path_pipe_in = ''      # Input pipe (maps to fceux out-pipe and to 'in' file)
    self.path_pipe_out = ''     # Output pipe (maps to fceux in-pipe and to 'out' file)
    self.pipe_out = None
    self.lock_out = Lock()
    self.disable_in_pipe = False
    self.disable_out_pipe = False
    self.launch_vars['pipe_name'] = ''
    self.launch_vars['pipe_prefix'] = self.path_pipe_prefix

    # Other vars
    self.is_initialized = 0     # Used to indicate fceux has been launched and is running
    self.is_exiting = 0         # Used to stop the listening thread
    self.last_frame = 0         # Last processed frame
    self.reward = 0             # Reward for last action
    self.episode_reward = 0     # Total rewards for episode
    self.is_finished = False
    # Screen buffer uses the same (height, width, 3) shape as the
    # observation space.
    self.screen = np.zeros(shape=(self.screen_height, self.screen_width, 3), dtype=np.uint8)
    self.info = {}
    self.level = 0
    self._reset_info_vars()
    self.first_step = False
    self.lock = (nesLock()).get_lock()

    # seeding
    self.curr_seed = 0
    self._seed()
版权声明：本文内容由互联网用户自发贡献，该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务，不拥有所有权，不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容，请发送邮件至 dio@foxmail.com 举报，一经查实，本站将立刻删除。