Python os 模块,access() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.access()。
def _clean_check(cmd, target):
    """Run ``cmd`` and remove ``target`` if the command fails.

    ``cmd`` is passed to ``subprocess.check_call``. When it exits with a
    non-zero status, ``target`` is unlinked if it exists and the
    ``CalledProcessError`` is re-raised to the caller.
    """
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        # Do not leave a partial artefact behind before propagating.
        leftover_exists = os.access(target, os.F_OK)
        if leftover_exists:
            os.unlink(target)
        raise
def copy_from_host(module):
    """Read the file named by the ``src`` parameter and report it.

    ``module`` must expose ``params`` (a mapping with ``src`` and
    ``compress``) plus ``fail_json`` and ``exit_json`` callables.
    ``fail_json`` is called when the file is missing or unreadable.
    ``exit_json`` receives the base64-encoded content (zlib-compressed
    first when ``compress`` is truthy), the SHA-1 of the raw bytes, the
    permission bits rendered with ``oct`` and the source path.
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    permission_bits = os.stat(src).st_mode & 0o777
    with open(src, 'rb') as handle:
        raw_data = handle.read()

    # The digest always covers the uncompressed bytes.
    digest = hashlib.sha1(raw_data).hexdigest()
    payload = raw_data
    if compress:
        payload = zlib.compress(raw_data)

    module.exit_json(content=base64.b64encode(payload), sha1=digest,
                     mode=oct(permission_bits), source=src)
def which(program):
    """Locate an executable and return its path.

    If ``program`` contains a directory component it is checked as given.
    Otherwise every entry of the ``PATH`` environment variable is searched
    in order (surrounding double quotes are stripped from each entry).

    :param program: program name or path.
    :return: the path of the first match that is a regular file with the
        execute permission.
    :raises RuntimeError: when no executable is found. The original code
        ended in a bare ``raise`` with no active exception, which Python 3
        reports as an unrelated ``RuntimeError``; the error is now explicit
        and names the program.
    :raises KeyError: when ``PATH`` is not set and a search is needed.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, _ = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path.strip('"'), program)
            if is_exe(exe_file):
                return exe_file
    raise RuntimeError("executable not found: %s" % program)
def populate_memory(self, areas):
    """Map each requested memory area into ``self.vm`` and log it.

    :param areas: iterable of ``(name, address, size, permission,
        input_file)`` tuples. ``input_file`` may be ``None``; when it names
        a readable file, at most ``size`` bytes of it are written at
        ``address``.
    :return: ``True``.
    :raises KeyError: if no area named ``".text"`` was supplied, because
        its address becomes ``self.start_addr``.
    """
    for name, address, size, permission, input_file in areas:
        perm = self.unicorn_permissions(permission)
        # The size argument had been dropped, so the permission value was
        # being passed where the region size belongs.
        self.vm.mem_map(address, size, perm)
        self.areas[name] = [address,]
        # Four placeholders need four values; the original passed two and
        # raised TypeError on the first area.
        msg = "Map %s @%x (size=%d,perm=%s)" % (name, address, size, permission)
        if input_file is not None and os.access(input_file, os.R_OK):
            # Close the handle deterministically instead of leaking it.
            with open(input_file, 'rb') as source:
                code = source.read()
            self.vm.mem_write(address, bytes(code[:size]))
            msg += " and content from '%s'" % input_file
        self.log(msg, "Setup")

    self.start_addr = self.areas[".text"][0]
    self.end_addr = -1
    return True
def check_path_owner(path):
    """Report whether the current user may use ``path``.

    The check is made on ``path`` itself or, when it does not exist, on the
    closest existing ancestor directory.

    :param path: filesystem path to examine.
    :return: ``True`` when ``os.geteuid`` is unavailable; for effective
        uid 0, whether the examined path is owned by uid 0 (``False`` if
        the owner cannot be read); otherwise whether the path is writable.
        ``False`` when no ancestor exists at all.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # Nothing along the chain exists: answer explicitly rather than falling
    # off the end and returning None.
    return False
def __addTooltip(self, item, text, hideWarn=False):
    """Attach a tooltip showing ``text`` to ``item``.

    Returns the ``StringVar`` that backs the tooltip text, or ``None`` when
    ``ToolTip`` is unavailable. In that case a warning is issued unless
    ``hideWarn`` is set. When ``hideWarn`` is set, warnings are paused
    while the tooltip is created and resumed afterwards.
    """
    self.__loadTooltip()

    if not ToolTip:
        if not hideWarn:
            self.warn("ToolTips unavailable - check tooltip.py is in the lib folder")
        return None

    # turn off warnings about tooltips
    if hideWarn:
        myWarn = self.__pauseWarn()

    var = StringVar(self.topLevel)
    var.set(text)
    item.tooltip = ToolTip(item, delay=500, follow_mouse=1, textvariable=var)

    if hideWarn:
        self.__resumeWarn(myWarn)
    return var
#####################################
# FUNCTIONS to show pop-up dialogs
#####################################
# function to access the last made pop_up
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def get_config_file_path(cls):
    """Return the path of the configuration file for ``cls``.

    When ``cls.IS_GLOBAL`` is false the file lives in the current
    directory. Otherwise it lives in a ``.polyaxon`` directory under the
    user's home (or under ``/tmp`` when the home is not writable). That
    directory is created if missing, and a creation failure is only logged.

    NOTE(review): the nesting was reconstructed from flattened source and
    assumes the ``.polyaxon`` handling belongs to the global branch only;
    confirm against the upstream file.
    """
    if cls.IS_GLOBAL:
        home = os.path.expanduser('~')
        writable_root = home if os.access(home, os.W_OK) else '/tmp'
        base_path = os.path.join(writable_root, '.polyaxon')
        if not os.path.exists(base_path):
            try:
                os.makedirs(base_path)
            except OSError:
                # Except permission denied and potential race conditions
                # in multi-threaded environments.
                logger.error('Could not create config directory `{}`'.format(base_path))
    else:
        # local to this directory
        base_path = os.path.join('.')

    return os.path.join(base_path, cls.CONfig_FILE_NAME)
def _test_NoAccessDir(self, nodeName):
    """Check that loading from a directory with mode 0 raises ``LoadFail``.

    Launches the device manager for ``nodeName``, strips every permission
    from ``<sdr>/dom/noaccess`` (creating it if needed), asserts that the
    current user cannot read or traverse it, then expects ``device.load``
    to raise ``CF.LoadableDevice.LoadFail``. The directory is always
    removed afterwards.
    """
    dcd_path = "/nodes/%s/DeviceManager.dcd.xml" % nodeName
    devBooter, devMgr = self.launchdeviceManager(dcd_path)
    device = devMgr._get_registeredDevices()[0]
    fileMgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    if os.path.exists(testdir):
        os.chmod(testdir, 0o000)
    else:
        os.mkdir(testdir, 0o000)

    try:
        can_access = os.access(testdir, os.R_OK | os.X_OK)
        self.assertFalse(can_access, 'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load,
                          fileMgr, dirname, CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(testdir)
def test_ExistsException(self):
    """Check that ``fileMgr.exists`` raises ``CF.InvalidFileName`` for a
    file inside a directory the current user cannot access.

    NOTE(review): the directory set-up lines were truncated in the source
    and have been rebuilt from the parallel ``_test_NoAccessDir`` test
    (same ``dom/noaccess`` location, same access assertion); confirm
    against the upstream file.
    """
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()

    # Makes sure that FileSystem::exists() throws correct exception and
    # doesn't kill domain for files in directories it cannot access
    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # 0o644 has no execute bit, so the directory cannot be traversed.
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0o644)
    else:
        os.chmod(testdir, 0o644)

    try:
        self.assertFalse(os.access(testdir, os.R_OK | os.X_OK), 'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
def check_fastq(fastq):
    """Validate that a FASTQ file is readable and consistently named.

    Calls ``martian.exit`` with an explanatory message when the file is not
    readable, when it is gzip-compressed but lacks
    ``cr_constants.GZIP_SUFFIX``, or when it carries that suffix without
    being gzip-compressed.

    :param fastq: path to the FASTQ file.
    """
    # Check if fastq is readable
    if not os.access(fastq, os.R_OK):
        martian.exit("Do not have file read permission for FASTQ file: %s" % fastq)

    # Check if fastq is gzipped: reading one byte fails on non-gzip data.
    is_gzip_fastq = True
    try:
        with gzip.open(fastq) as f:
            f.read(1)
    except Exception:
        # A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit.
        is_gzip_fastq = False

    has_gzip_suffix = fastq.endswith(cr_constants.GZIP_SUFFIX)
    # The templates name the suffix first and the file second; the
    # arguments were previously passed in the opposite order.
    if is_gzip_fastq and not has_gzip_suffix:
        martian.exit("Input FASTQ file is gzipped but filename does not have %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
    if not is_gzip_fastq and has_gzip_suffix:
        martian.exit("Input FASTQ file is not gzipped but filename has %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
def safeInstall():
    """Prepare the Factorio directory, shell completion and run an update.

    Creates the directory returned by ``getFactorioPath()`` (through
    ``sudo`` when its parent is not writable) with ``saves`` and ``config``
    sub-directories, appends the completion line to ``~/.bashrc`` if it is
    missing, then calls ``updateFactorio()``. An ``IOError`` prints a
    message and exits with status 1.

    NOTE(review): the nesting was reconstructed from flattened source;
    confirm which steps belong to the "directory missing" branch.
    """
    FACTORIOPATH = getFactorioPath()
    completion_line = "eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n"
    try:
        if not os.path.isdir("%s" % FACTORIOPATH):
            parent_writable = os.access("%s/.." % FACTORIOPATH, os.W_OK)
            if parent_writable:
                os.mkdir(FACTORIOPATH, 0o777)
            else:
                subprocess.call(['sudo', 'mkdir', '-p', FACTORIOPATH])
                subprocess.call(['sudo', 'chown', getpass.getuser(), FACTORIOPATH])
            for subdir in ("saves", "config"):
                os.mkdir(os.path.join(FACTORIOPATH, subdir))

        bashrc_path = "%s/.bashrc" % os.path.expanduser("~")
        with open(bashrc_path, "r+") as bashrc:
            lines = bashrc.read()
            if completion_line not in lines:
                bashrc.write(completion_line)
                print("You'll want to restart your shell for command autocompletion. Tab is your friend.")
        updateFactorio()
    except IOError as e:
        print("Cannot make %s. Please check permissions. Error %s" % (FACTORIOPATH, e))
        sys.exit(1)
def __init__(self, dir, file_template=_default_file_template,
             truncate_slug_length=40,
             version_locations=None,
             sourceless=False, output_encoding="utf-8"):
    """Store the script-directory settings and verify that ``dir`` exists.

    A falsy ``truncate_slug_length`` falls back to 40. The revision map is
    built around ``self._load_revisions`` before the existence check.

    :raises util.CommandError: when ``dir`` does not exist.
    """
    self.dir = dir
    self.file_template = file_template
    self.version_locations = version_locations
    self.truncate_slug_length = truncate_slug_length or 40
    self.sourceless = sourceless
    self.output_encoding = output_encoding
    self.revision_map = revision.RevisionMap(self._load_revisions)

    if os.access(dir, os.F_OK):
        return
    message = ("Path doesn't exist: %r. Please use "
               "the 'init' command to create a new "
               "scripts folder.")
    raise util.CommandError(message % dir)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def __init__(self, certstore):
    """Remember where the CA certificates are stored.

    certstore - path to the file used to store CA certificates,
    e.g. /etc/apache/ssl.crt/ca-bundle.crt

    Nothing is opened here; the S/MIME handle starts out unset.

    >>> v = Verifier('/etc/dummy.crt')
    >>> v.verify('pippo')
    Traceback (most recent call last):
    File "/usr/lib/python2.3/doctest.py",line 442,in _run_examples_inner
    compileflags,1) in globs
    File "<string>",line 1,in ?
    File "verifier.py",line 46,in verify
    self._setup()
    File "verifier.py",line 36,in _setup
    raise VerifierError,"cannot access %s" % self._certstore
    VerifierError: cannot access /etc/dummy.crt
    >>>
    """
    self._certstore, self._smime = certstore, None
def main(argv):
    """Print a coloured listing of the directory named by ``argv[1]``.

    Each entry is printed through ``senf.print_`` with its size and
    modification time. Directories use the bright-blue escape, executable
    entries bright-green, and everything else no colour.
    """
    dir_ = argv[1]
    reset = '\033[0m'
    for entry in sorted(os.listdir(dir_)):
        path = os.path.join(dir_, entry)
        size = os.path.getsize(path)
        stamp = time.strftime("%b %d %H:%M",
                              time.localtime(os.path.getmtime(path)))
        color = ''
        if os.path.isdir(path):
            color = '\033[1;94m'
        elif os.access(path, os.X_OK):
            color = '\033[1;92m'
        line = "%6d %13s %s%s%s" % (size, stamp, color, entry, reset)
        senf.print_(line)
def check_exe(name, env=None):
    """
    Ensure that a program exists

    :type name: string
    :param name: name or path to program
    :param env: mapping providing ``PATH``; defaults to ``os.environ``
    :return: path of the program or None
    :raises ValueError: if ``name`` is empty
    """
    if not name:
        raise ValueError('Cannot execute an empty string!')

    def is_exe(fpath):
        # Restored: the source had lost ``os.X_OK)`` and the following
        # ``os.path.split`` line, leaving an unbalanced call.
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(name)
    if fpath and is_exe(name):
        return os.path.abspath(name)
    else:
        env = env or os.environ
        for path in env["PATH"].split(os.pathsep):
            path = path.strip('"')
            exe_file = os.path.join(path, name)
            if is_exe(exe_file):
                return os.path.abspath(exe_file)
    return None
def check_exe(name, name)
if is_exe(exe_file):
return os.path.abspath(exe_file)
return None
def check_exe(name, env=None):
    """
    Ensure that a program exists

    :type name: string
    :param name: name or path to program
    :param env: mapping providing ``PATH``; defaults to ``os.environ``
    :return: path of the program or None
    :raises ValueError: if ``name`` is empty

    NOTE(review): the middle of this body was missing from the source
    (``exe_file`` was used without being defined); it has been rebuilt as
    the usual explicit-path check followed by a ``PATH`` search.
    """
    if not name:
        raise ValueError('Cannot execute an empty string!')

    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(name)
    if fpath and is_exe(name):
        return os.path.abspath(name)
    else:
        env = env or os.environ
        for path in env["PATH"].split(os.pathsep):
            path = path.strip('"')
            exe_file = os.path.join(path, name)
            if is_exe(exe_file):
                return os.path.abspath(exe_file)
    return None
def _test_resource_paths(my):
    """Check the resource paths reported by ``my.server``.

    Asserts that the admin resource path contains ``etc/admin.tacticrc``,
    that the per-user rc file for the current system login is among the
    created resource paths, and that the server login is ``admin``.
    """
    path = my.server.get_resource_path('admin')
    # not a very accurate test
    my.assertEquals(True, 'etc/admin.tacticrc' in path)

    paths = my.server.create_resource_paths()
    sys_login = getpass.getuser()

    home_dir = my.server.get_home_dir()
    is_dir_writeable = os.access(home_dir, os.W_OK) and os.path.isdir(home_dir)
    if home_dir and is_dir_writeable:
        etc_dir = "%s/.tactic/etc" % home_dir
    elif os.name == 'nt':
        etc_dir = 'C:/sthpw/etc'
    else:
        etc_dir = '/tmp/sthpw/etc'

    expected = '%s/%s.tacticrc' % (etc_dir, sys_login)
    my.assertEquals(True, expected in paths)

    # since we use admin to get resource path, my.login should also be admin
    my.assertEquals('admin', my.server.get_login())
def find_boot_files(name, shortname, basedir):
    """Locate a boot file (vmlinuz or initrd) and return its path.

    With ``name`` given, an absolute name is used as is and a relative one
    is looked up under ``<basedir>/boot/``. Without it, the first readable
    ``<basedir>/<shortname>*`` match is resolved with ``realpath``;
    failing that, the last sorted ``<basedir>/boot/<shortname>*`` match is
    used, with a warning when several exist. The process exits when nothing
    is found or the chosen file is unreadable.
    """
    if name:
        if name[0] == '/':
            fullpath = name
        else:
            fullpath = basedir + '/boot/' + name
    else:
        # try the (only) symlink at the root directory
        try1 = basedir + '/' + shortname + '*'
        root_matches = sorted(glob.glob(try1))
        if root_matches and os.access(root_matches[0], os.R_OK):
            fullpath = os.path.realpath(root_matches[0])
        else:
            # try the highest numbered version at /boot
            try2 = basedir + '/boot/' + shortname + '*'
            boot_matches = sorted(glob.glob(try2))
            if not boot_matches:
                sys.exit('cannot read ' + try1 + ' and cannot find ' + try2)
            fullpath = boot_matches[-1]
            if len(boot_matches) > 1:
                warnings.warn('found more than one ' + try2 + ',using ' + fullpath)

    if not os.access(fullpath, os.R_OK):
        sys.exit('Failed to read ' + fullpath)
    return fullpath
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def which(program):
    """Return the path of the executable ``program``, or ``None``.

    A ``program`` with a directory component is checked as given; a bare
    name is searched for in each entry of the ``PATH`` environment
    variable.

    NOTE(review): the source for this copy was truncated
    (``os.access(fpath, program)`` and an undefined ``exe_file``); the
    body has been rebuilt as the usual explicit-path check followed by a
    ``PATH`` search.
    """
    import os

    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path.strip('"'), program)
            if is_exe(exe_file):
                return exe_file
    return None
#source: https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_job_path(jobs_dir, job):
    """Validate the ``path`` entry of ``job`` and default its profiles.

    :param jobs_dir: directory that job paths are resolved against.
    :param job: mapping describing the job; ``job['profiles']`` is set to
        ``None`` when absent.
    :raises Exception: if ``path`` is missing, not a string, absolute,
        resolves outside ``jobs_dir``, is not a regular file, or is not
        executable.
    """
    try:
        string_types = basestring  # Python 2
    except NameError:
        string_types = str  # Python 3 has no ``basestring``

    if 'path' not in job:
        raise Exception('job does not have the "path" attribute: %s' % job)
    if not isinstance(job['path'], string_types):
        raise Exception('"path" attribute is not a string: %s' % job)

    # check that 'path' is rooted in the same directory as the .vcsjob file
    if job['path'].startswith('/'):
        raise Exception('job path is not relative: %s' % job['path'])
    path = os.path.normpath(os.path.join(jobs_dir, job['path']))
    # A plain string-prefix test accepts siblings such as "<jobs_dir>2/x"
    # reached through "..", so compare path components instead.
    relative = os.path.relpath(path, jobs_dir)
    if relative == os.pardir or relative.startswith(os.pardir + os.sep):
        raise Exception('job path is not inside the job tree: %s' % path)

    # also check that the job exists and is runnable
    if not os.path.isfile(path):
        raise Exception('no such file: %s' % path)
    if not os.access(path, os.X_OK):
        raise Exception('file is not executable: %s' % path)

    if 'profiles' not in job:
        job['profiles'] = None
def check_job_path(jobs_dir, os.X_OK):
raise Exception('file is not executable: %s' % path)
if 'profiles' not in job:
job['profiles'] = None
def libvlc_vlm_show_media(p_instance, psz_name):
    '''Return information about the named media as a JSON
    string representation.
    This function is mainly intended for debugging use,
    if you want programmatic access to the state of
    a vlm_media_instance_t, please use the corresponding
    libvlc_vlm_get_media_instance_xxx -functions.
    Currently there are no such functions available for
    vlm_media_t though.
    @param p_instance: the instance.
    @param psz_name: the name of the media, if the name is an empty string, all media is described.
    @return: string with information about named media, or None on error.
    '''
    # The binding is looked up in the cache first and created on a miss.
    # NOTE(review): the ``_Cfunction`` argument list was truncated in the
    # source (unbalanced parentheses); it is restored here as one input
    # flag tuple per argument, then the result handler, return type and
    # argument types. Confirm against the generated bindings.
    f = _Cfunctions.get('libvlc_vlm_show_media', None) or \
        _Cfunction('libvlc_vlm_show_media', ((1,), (1,),), string_result,
                   ctypes.c_void_p, Instance, ctypes.c_char_p)
    return f(p_instance, psz_name)
def getTasksProcForce(self):
    """Get tasks list by bruteforcing /proc.

    Probes ``/proc/<n>/status`` for every n in 1..65534. Each non-empty
    result of ``self.openProcTaskStatus`` is recorded in
    ``self.tasks_procforce``; every entry of ``/proc/<n>/task`` other than
    the task's own first field is then recorded the same way.
    """
    for pid in range(1, 65535):
        proc_dir = "/proc/" + str(pid)
        status_path = proc_dir + "/status"
        if not os.access(status_path, os.F_OK):
            continue

        l = self.openProcTaskStatus(status_path)
        if l == []:
            continue
        self.tasks_procforce.map_tasks[int(l[0])] = [int(l[1]), int(l[2]), l[3], 0, 0]
        self.tasks_procforce.list_tasks.append(int(l[0]))

        task_dir = proc_dir + "/task"
        if not os.access(task_dir, os.F_OK):
            continue
        for srep in os.listdir(task_dir):
            if srep == l[0]:
                continue
            ll = self.openProcTaskStatus(task_dir + "/" + srep + "/status")
            self.tasks_procforce.map_tasks[int(ll[0])] = [int(ll[1]), int(ll[2]), ll[3], 1]
            self.tasks_procforce.list_tasks.append(int(ll[0]))
def find_ancestor_cmd_path(self, cmd, cwd):
    """Check for command binary in ancestors' node_modules/.bin directories.

    Starting at ``cwd`` and moving one directory up at a time, return the
    first executable ``node_modules/.bin/<cmd>`` (with ``.cmd`` appended on
    Windows when missing). Return ``None`` once the parent is ``/`` or no
    longer changes.
    """
    on_windows = sublime.platform() == 'windows'
    current = cwd
    while True:
        bin_dir = path.normpath(path.join(current, 'node_modules/.bin/'))
        binary = path.join(bin_dir, cmd)
        if on_windows and path.splitext(binary)[1] != '.cmd':
            binary += '.cmd'
        if binary and access(binary, X_OK):
            return binary

        parent = path.normpath(path.join(current, '../'))
        if parent == '/' or parent == current:
            return None
        current = parent
def __addTooltip(self, textvariable=var)
item.tooltip = tip
if hideWarn:
self.__resumeWarn(myWarn)
return var
#####################################
# FUNCTIONS to show pop-up dialogs
#####################################
# function to access the last made pop_up
def _clone_test_db(self, number, verbosity, keepdb=False):
    """Copy the source test database file to the clone numbered ``number``.

    Nothing is done for in-memory databases. An existing clone is kept when
    ``keepdb`` is true; otherwise it is removed (announced when
    ``verbosity >= 1``) and replaced by a fresh copy. A failure to remove
    or copy is written to stderr and exits with status 2.
    """
    source_database_name = self.connection.settings_dict['NAME']
    target_database_name = self.get_test_db_clone_settings(number)['NAME']

    # Forking automatically makes a copy of an in-memory database.
    if self.connection.is_in_memory_db(source_database_name):
        return

    # Erase the old test database
    if os.access(target_database_name, os.F_OK):
        if keepdb:
            return
        if verbosity >= 1:
            display = self._get_database_display_str(verbosity, target_database_name)
            print("Destroying old test database for alias %s..." % (display,))
        try:
            os.remove(target_database_name)
        except Exception as e:
            sys.stderr.write("Got an error deleting the old test database: %s\n" % e)
            sys.exit(2)

    try:
        shutil.copy(source_database_name, target_database_name)
    except Exception as e:
        sys.stderr.write("Got an error cloning the test database: %s\n" % e)
        sys.exit(2)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def which1(program, pathstr=None):
    """Return the path of the executable ``program``, or ``None``.

    :param program: program name or path. A value with a directory
        component is checked as given; a bare name is searched for.
    :param pathstr: ``os.pathsep``-separated search path; defaults to the
        ``PATH`` environment variable.
    :return: the first match that is a regular file with the execute
        permission, or ``None``.
    """
    if pathstr is None:
        pathstr = os.environ["PATH"]

    def is_exe(fpath):
        # Restored: the source had lost ``os.X_OK)`` and the following
        # ``os.path.split`` line, leaving an unbalanced call.
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in pathstr.split(os.pathsep):
            path = path.strip('"')
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
    return None
def execd_submodule_paths(command, execd_dir=None):
    """Yield the full path of ``command`` inside each exec.d module.

    Only paths that are executable regular files are produced; module
    directories come from ``execd_module_paths(execd_dir)``.
    """
    for module_path in execd_module_paths(execd_dir):
        candidate = os.path.join(module_path, command)
        if not os.access(candidate, os.X_OK):
            continue
        if os.path.isfile(candidate):
            yield candidate
def getMappingsFromTable(self):
    """Rebuild ``self._maps`` from the rows of ``self.memory_mapping``.

    Rows without a name cell are skipped. Each kept row is stored as
    ``[name, address, size, permission, read_from_file]``, the five-field
    layout that ``populate_memory`` unpacks. Address and size are parsed
    as base-16 when ``ishex`` accepts the text and as base-10 otherwise.
    The file cell is replaced by ``None`` when its path is not readable.

    NOTE(review): a readable file entry is stored as the table cell, not
    as its text — confirm what consumers expect.
    """
    self._maps = []
    table = self.memory_mapping
    for i in range(table.rowCount()):
        name = table.item(i, 0)
        if not name:
            continue
        name = name.text()

        address = table.item(i, 1)
        if address:
            text = address.text()
            address = int(text, 0x10) if ishex(text) else int(text)

        size = table.item(i, 2)
        if size:
            text = size.text()
            size = int(text, 0x10) if ishex(text) else int(text)

        permission = table.item(i, 3)
        if permission:
            permission = permission.text()

        read_from_file = table.item(i, 4)
        if read_from_file and not os.access(read_from_file.text(), os.R_OK):
            read_from_file = None

        # Address, size and permission were computed but never stored.
        self._maps.append([name, address, size, permission, read_from_file])
    return
def loadCode(self, title, filter, run_disassembler):
    """Ask the user for a file and load it into the editor.

    :param title: caption of the file dialog.
    :param filter: file-type filter string for the dialog.
    :param run_disassembler: when true, or when the chosen file ends in
        ``.raw``, the file is disassembled for ``self.arch`` first and the
        result is loaded as its content.

    Does nothing when the selection is not readable.
    """
    # The Qt method is ``getOpenFileName(parent, caption, directory,
    # filter)``; the call had a mis-cased name and had lost ``title``,
    # shifting every later argument by one.
    qFile, qFilter = QFileDialog.getOpenFileName(self, title, EXAMPLES_PATH, filter)
    if not os.access(qFile, os.R_OK):
        return

    if run_disassembler or qFile.endswith(".raw"):
        body = disassemble_file(qFile, self.arch)
        self.loadFile(qFile, data=body)
    else:
        self.loadFile(qFile)
    return
def can_exe(self, fpath):
    """
    Tell whether ``fpath`` is a string naming an existing executable file.

    Non-string values yield ``False``.
    """
    if not isinstance(fpath, basestring):
        return False
    return isfile(fpath) and access(fpath, X_OK)
def export(self):
    ''' Export visible layers and layer groups to CoaSprite.

    Aborts with an error message when the destination is an existing file
    or the parent path is not writable. An existing destination folder
    only produces a message. Work is done on a duplicate of the original
    image, which is deleted at the end.
    '''
    destination = os.path.join(self.path, self.name)
    if os.path.isfile(destination):
        show_error_msg('ABORTING!\nDestination is not a folder.\n {path}/{name}'.format(path=self.path, name=self.name))
        return
    if not os.access(self.path, os.W_OK):
        show_error_msg('ABORTING!\nDestination is not a writable.\n {path}'.format(path=self.path))
        return
    if os.path.isdir(os.path.join(self.path, self.name)):
        show_error_msg('Destination exists,I may have overwritten something in {path}/{name}'.format(path=self.path, name=self.name))
    self.mkdir()

    # Loop through visible layers
    self.img = self.original_img.duplicate()
    self.img.undo_group_start()
    for layer in self.img.layers:
        if not layer.visible:
            continue
        name = '{name}.png'.format(name=layer.name)
        pdb.gimp_image_set_active_layer(self.img, layer)
        # Crop the layer and derive z from its position in the stack
        pdb.plug_in_autocrop_layer(self.img, layer)
        z = 0 - pdb.gimp_image_get_item_position(self.img, layer)
        if not isinstance(layer, gimp.GroupLayer):
            self.sprites.append(self.export_sprite(layer, z))
        elif len(layer.children) > 0:
            # Empty groups are skipped.
            self.sprites.append(self.export_sprite_sheet(layer, name, layer.offsets, z))

    self.write_json()
    self.img.undo_group_end()
    pdb.gimp_image_delete(self.img)
def is_writable(self, path):
    """Tell whether ``path`` or its nearest existing ancestor is writable.

    Walks up the directory chain until something exists and returns its
    write-access status. Returns ``False`` if the top is reached without
    finding anything.
    """
    current = path
    while True:
        if os.path.exists(current):
            return os.access(current, os.W_OK)
        parent = os.path.dirname(current)
        if parent == current:
            return False
        current = parent
def get_cache_base(suffix=None):
    """
    Return the default base location for distlib caches.

    The parent directory is ``$localappdata`` on Windows when
    ``LOCALAPPDATA`` is set in the environment, and the user's home
    directory (``os.path.expanduser('~')``) otherwise. A missing parent is
    created. If it exists but is not writable, or cannot be created, a
    fresh temporary directory is used instead and a warning is logged.

    The result is the parent joined with ``suffix``, which defaults to
    '.distlib'.
    """
    leaf = '.distlib' if suffix is None else suffix

    use_local_appdata = os.name == 'nt' and 'LOCALAPPDATA' in os.environ
    if use_local_appdata:
        parent = os.path.expandvars('$localappdata')
    else:
        # Assume posix, or old Windows
        parent = os.path.expanduser('~')

    # we use 'isdir' instead of 'exists', because we want to
    # fail if there's a file with that name
    if not os.path.isdir(parent):
        try:
            os.makedirs(parent)
        except OSError:
            logger.warning('Unable to create %s', parent, exc_info=True)
            usable = False
        else:
            usable = True
    else:
        usable = os.access(parent, os.W_OK)
        if not usable:
            logger.warning('Directory exists but is not writable: %s', parent)

    if not usable:
        parent = tempfile.mkdtemp()
        logger.warning('Default location unusable,using %s', parent)
    return os.path.join(parent, leaf)
def extraction_error(self):
    """Raise an ``ExtractionError`` describing a failed extraction.

    The message embeds the exception currently being handled and the cache
    directory in use (``self.extraction_path`` or the default cache). The
    raised error carries ``manager``, ``cache_path`` and
    ``original_error`` attributes.
    """
    old_exc = sys.exc_info()[1]
    cache_path = self.extraction_path or get_default_cache()

    tmpl = textwrap.dedent("""
        Can't extract file(s) to egg cache
        The following error occurred while trying to extract file(s) to the Python egg
        cache:
        {old_exc}
        The Python egg cache directory is currently set to:
        {cache_path}
        Perhaps your account does not have write access to this directory? You can
        change the cache directory by setting the PYTHON_EGG_CACHE environment
        variable to point to an accessible directory.
        """).lstrip()
    message = tmpl.format(old_exc=old_exc, cache_path=cache_path)

    err = ExtractionError(message)
    err.manager = self
    err.cache_path = cache_path
    err.original_error = old_exc
    raise err
def _mkstemp_inner(dir, pre, suf, flags, output_type):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to ``TMP_MAX`` candidate names and opens ``dir/pre<name>suf``
    with ``flags`` and mode 0o600.

    :return: ``(fd, absolute_path)`` of the file that was opened.
    :raises FileExistsError: when every candidate name is already taken.
    """
    names = _get_candidate_names()
    if output_type is bytes:
        names = map(_os.fsencode, names)

    for seq in range(TMP_MAX):
        name = next(names)
        file = _os.path.join(dir, pre + name + suf)
        try:
            # ``flags`` had been dropped from this call, so the permission
            # mode was being passed in the flags position.
            fd = _os.open(file, flags, 0o600)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            if (_os.name == 'nt' and _os.path.isdir(dir) and
                    _os.access(dir, _os.W_OK)):
                continue
            else:
                raise
        return (fd, _os.path.abspath(file))

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary file name found")
# User visible interfaces.
def mkdtemp(suffix=None, prefix=None, dir=None):
    """User-callable function to create and return a unique temporary
    directory. The return value is the pathname of the directory.

    Arguments are as for mkstemp, except that the 'text' argument is
    not accepted.

    The directory is created with mode 0o700, i.e. readable, writable,
    and searchable only by the creating user.

    Caller is responsible for deleting the directory when done with it.

    Raises FileExistsError when no unused name is found in TMP_MAX tries.
    """
    # NOTE(review): the parameter normalisation and the retry loop header
    # were truncated in the source; they are restored to match the
    # four-value ``_sanitize_params`` helper and the loop shape of
    # ``_mkstemp_inner``.
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    names = _get_candidate_names()
    if output_type is bytes:
        names = map(_os.fsencode, names)

    for seq in range(TMP_MAX):
        name = next(names)
        file = _os.path.join(dir, prefix + name + suffix)
        try:
            _os.mkdir(file, 0o700)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            if (_os.name == 'nt' and _os.path.isdir(dir) and
                    _os.access(dir, _os.W_OK)):
                continue
            else:
                raise
        return file

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary directory name found")
def _clean_check(cmd, os.F_OK):
os.unlink(target)
raise
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。