Python ntpath 模块,join() 实例源码
我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用ntpath.join()。
def _file_path_fixup(self, path):
    """
    Resolve a user-supplied path against the pseudo working directory.

    Commands are evaluated relative to ``self.cwd`` rather than a real
    process cwd, so the argument is classified first:

    * an absolute path is returned untouched;
    * a drive-relative path gets the first two characters of ``self.cwd``
      prepended;
    * anything else is joined onto ``self.cwd``.
    """
    if self._is_path_absolute(path):
        return path
    if self._is_path_drive_relative(path):
        drive_prefix = self.cwd[:2]
        return drive_prefix + path
    base_dir = self.cwd + '\\'
    return ntpath.join(base_dir, path)
def do_connect(self, line):
    """
    Command: connect
    Description:
    Connect to a sensor given the sensor ID or the sensor hostname.
    Args:
    connect SENSOR_ID | SENSOR_HOSTNAME
    """
    # Guard clause: the command is meaningless without a target.
    if not line:
        raise CliArgsException("Need argument: sensor ID or hostname")

    sensor = self.connect_callback(self.cb, line)
    session = sensor.lr_session()
    self.lr_session = session

    print("Session: {0}".format(session.session_id))
    drives = ' '.join(session.session_data.get('drives', []))
    print(" Available Drives: %s" % drives)
    # look up supported commands
    commands = ','.join(session.session_data.get('supported_commands', []))
    print(" Supported Commands: %s" % commands)
    print(" Working Directory: %s" % self.cwd)
    log.info("Attached to sensor {0}".format(sensor._model_unique_id))
def getInterface(self, interface, resp):
    # Parse the answer chunks in `resp` and build an interface instance
    # from the object reference they describe.
    flags = OBJREF(''.join(resp))['flags']

    # Pick the concrete OBJREF parser that matches the flags field.
    if flags == FLAGS_OBJREF_CUSTOM:
        parser = OBJREF_CUSTOM
    elif flags == FLAGS_OBJREF_HANDLER:
        parser = OBJREF_HANDLER
    elif flags == FLAGS_OBJREF_STANDARD:
        parser = OBJREF_STANDARD
    elif flags == FLAGS_OBJREF_EXTENDED:
        parser = OBJREF_EXTENDED
    else:
        parser = None

    if parser is None:
        # Only logged: objRef stays None, so the subscript below raises.
        logging.error("UnkNown OBJREF Type! 0x%x" % flags)
        objRef = None
    else:
        objRef = parser(''.join(resp))

    cinstance = interface.get_cinstance()
    rem_unknown_ipid = interface.get_ipidRemUnkNown()
    std = objRef['std']
    # NOTE(review): `oid` is filled from the 'oxid' field, exactly as the
    # original did -- confirm this is intended.
    wrapped = INTERFACE(cinstance, None, rem_unknown_ipid, std['ipid'],
                        oxid=std['oxid'], oid=std['oxid'],
                        target=interface.get_target())
    return IRemUnkNown2(wrapped)
def do_put(self, s):
    # Upload a local file through the transfer client.
    #
    # `s` is "<local source> [<remote destination directory>]", split on a
    # single space. The remote path is resolved against self.__pwd; the drive
    # letter of the result selects the share ("C:" -> "C$").
    #
    # Any failure is logged at CRITICAL level and swallowed so the
    # interactive shell keeps running.
    try:
        params = s.split(' ')
        src_path = params[0]
        dst_path = params[1] if len(params) > 1 else ''

        src_file = os.path.basename(src_path)
        # The context manager closes the local file even when putFile raises.
        with open(src_path, 'rb') as fh:
            dst_path = dst_path.replace('/', '\\')
            import ntpath
            pathname = ntpath.join(ntpath.join(self.__pwd, dst_path), src_file)
            drive, tail = ntpath.splitdrive(pathname)
            logging.info("Uploading %s to %s" % (src_file, pathname))
            self.__transferClient.putFile(drive[:-1] + '$', tail, fh.read)
    except Exception as e:
        logging.critical(str(e))
def hexdump(data):
    # Print `data` (converted with str()) as rows of 16 characters: a
    # four-digit hex offset, the two-digit hex code of each character, and
    # the row rendered through pretty_print().
    #
    # Uses Python 2 print statements; the trailing commas keep each row on
    # one output line, so the statement order below defines the spacing.
    x=str(data)
    strLen = len(x)
    i = 0
    while i < strLen:
        print "%04x " % i,
        for j in range(16):
            if i+j < strLen:
                print "%02X" % ord(x[i+j]),
            else:
                # Past the end of the data: emit a filler instead of a code.
                print " ",
            if j%16 == 7:
                # Extra separator after the eighth column.
                print "",
        print " ",
        # The generator reuses the name `x` for each character of the row.
        print ''.join(pretty_print(x) for x in x[i:i+16] )
        i += 16
# Reserved/fixed MFTs
def do_cat(self, line, command = sys.stdout.write):
    # Stream the contents of a file to `command` (by default stdout).
    #
    # `line` is a path relative to self.pwd; forward slashes are accepted.
    # Directories and compressed/encrypted/sparse files are refused with an
    # error log entry. Data is delivered in chunks of 40960 bytes.
    pathName = line.replace('/', '\\')
    pathName = ntpath.normpath(ntpath.join(self.pwd, pathName))
    res = self.findpathName(pathName)
    if res is None:
        logging.error("Not found!")
        return
    if res.isDirectory() > 0:
        logging.error("It's a directory!")
        return
    if res.isCompressed() or res.isEncrypted() or res.issparse():
        logging.error('Cannot handle compressed/encrypted/sparse files! :(')
        return

    stream = res.getStream(None)
    chunk_size = 4096 * 10
    total = stream.getDataSize()
    written = 0
    # Floor division keeps the chunk count an integer on Python 2 and 3.
    for i in range(total // chunk_size):
        buf = stream.read(i * chunk_size, chunk_size)
        written += len(buf)
        command(buf)
    # Trailing partial chunk, if the size is not a multiple of chunk_size.
    remainder = total % chunk_size
    if remainder:
        buf = stream.read(written, remainder)
        command(buf)
    logging.info("%d bytes read" % total)
def __executeRemote(self, data):
    # Run `data` remotely by creating a service whose binary path is a
    # command line that writes a batch file, runs it and deletes it; the
    # service is then started, deleted and its handle closed.
    random_letters = [random.choice(string.letters) for _ in range(8)]
    self.__tmpServiceName = ''.join(random_letters).encode('utf-16le')

    write_batch = self.__shell + 'echo ' + data + ' ^> ' + self.__output + ' > ' + self.__batchFile
    run_batch = self.__shell + self.__batchFile
    command = write_batch + ' & ' + run_batch
    command += ' & ' + 'del ' + self.__batchFile

    self.__serviceDeleted = False
    resp = scmr.hRCreateServiceW(self.__scmr, self.__scManagerHandle, self.__tmpServiceName,
                                 lpBinaryPathName=command)
    service = resp['lpServiceHandle']
    try:
        scmr.hRStartServiceW(self.__scmr, service)
    except:
        # Any error from the start call is ignored on purpose.
        # NOTE(review): presumably the start reports failure because the
        # command is not a real service binary -- confirm.
        pass
    scmr.hRDeleteService(self.__scmr, service)
    self.__serviceDeleted = True
    scmr.hRCloseServiceHandle(self.__scmr, service)
def transformKey(self, InputKey):
    # Section 2.2.11.1.2 Encrypting a 64-Bit Block with a 7-Byte Key
    #
    # Spreads the 56 bits of the 7-character key over 8 characters: each
    # output character carries 7 key bits in its upper bits, bit 0 is zero.
    k = [ord(ch) for ch in InputKey[:7]]

    septets = [k[0] >> 0x01]
    for idx in range(1, 7):
        # Low `idx` bits of the previous byte, followed by the high
        # (7 - idx) bits of the current byte.
        carry_mask = (1 << idx) - 1
        septets.append(((k[idx - 1] & carry_mask) << (7 - idx)) | (k[idx] >> (idx + 1)))
    septets.append(k[6] & 0x7F)

    return "".join(chr((value << 1) & 0xfe) for value in septets)
def upload_file(self, host, src, dst):
    """Upload the local file `src` over the SMB connection for `host`.

    `dst` is "<share>\\<path on share>"; forward slashes are accepted and
    converted to backslashes. The first path component is the share name
    and the remainder is the destination path, which must include the
    target file name.

    Exits the process via sys.exit() when `src` does not exist. Upload
    errors are reported through color() and do not propagate.
    """
    # The original call passed only two arguments to string.replace(),
    # which raises TypeError; normalise '/' to '\\' like the sibling
    # commands in this listing do.
    dst = dst.replace('/', '\\')
    dst = ntpath.normpath(dst)
    share, _, remote_path = dst.partition('\\')

    if not os.path.exists(src):
        color('[!] Invalid source. File does not exist')
        sys.exit()

    color('[+] Starting upload: %s (%s bytes)' % (src, os.path.getsize(src)))
    # The context manager guarantees the local file is closed.
    with open(src, 'rb') as upFile:
        try:
            self.smbconn[host].putFile(share, remote_path, upFile.read)
            color('[+] Upload complete' )
        except Exception as e:
            color('[!]', e)
            color('[!] Error uploading file,you need to include destination file name in the path')
def _CreateMSVSUserFile(proj_path, version, spec):
    """Create the writer for the per-user .user file of a project.

    Arguments:
      proj_path: Path of the project file being created; the .user file
          name is this path plus ".<domain>.<username>.user".
      version: The VisualStudioVersion object (not used here).
      spec: The target dictionary; its 'target_name' is passed to the writer.
    Returns:
      The MSVSUserFile.Writer object created.
    """
    domain, username = _GetDomainAndUserName()
    name_parts = (proj_path, domain, username, 'user')
    return MSVSUserFile.Writer('.'.join(name_parts), spec['target_name'])
def _GetDefines(config):
    """Return the preprocessor definitions for a configuration.

    Arguments:
      config: The configuration dictionary; its optional 'defines' entry is
          a sequence whose items are either plain values or lists.
    Returns:
      A list of strings. A list item is rendered by joining its parts with
      '='; any other item is rendered with str().
    """
    return [
        '='.join([str(part) for part in entry]) if type(entry) == list else str(entry)
        for entry in config.get('defines', [])
    ]
def _GetMSVSAttributes(spec, config, config_type):
    # Build the attribute dictionary for one configuration: start from the
    # 'msvs_configuration_attributes' entry, then add props files, the
    # configuration type and normalised output/intermediate directories.
    prepared_attrs = {}
    prepared_attrs.update(config.get('msvs_configuration_attributes', {}))

    # Add props files.
    vsprops_dirs = _FixPaths(config.get('msvs_props', []))
    if vsprops_dirs:
        prepared_attrs['Inheritedpropertysheets'] = ';'.join(vsprops_dirs)

    # Set configuration type.
    prepared_attrs['ConfigurationType'] = config_type

    output_dir = prepared_attrs.get('OutputDirectory',
                                    '$(SolutionDir)$(ConfigurationName)')
    prepared_attrs['OutputDirectory'] = _FixPath(output_dir) + '\\'

    if 'IntermediateDirectory' in prepared_attrs:
        # A caller-supplied directory additionally goes through
        # FixVCMacroSlashes.
        intermediate = _FixPath(prepared_attrs['IntermediateDirectory']) + '\\'
        intermediate = MSVSSettings.FixVCMacroSlashes(intermediate)
        prepared_attrs['IntermediateDirectory'] = intermediate
    else:
        intermediate = '$(ConfigurationName)\\obj\\$(ProjectName)'
        prepared_attrs['IntermediateDirectory'] = _FixPath(intermediate) + '\\'
    return prepared_attrs
def _Getcopies(spec):
    """Translate the 'copies' section of a target into copy actions.

    Arguments:
      spec: The target dictionary; each item of its optional 'copies' list
          has a 'destination' and an optional 'files' list.
    Returns:
      A list of (inputs, outputs, command, description) tuples, one per
      file. A source ending in '/' is copied as a directory tree with
      xcopy; any other source is copied with copy after creating the
      destination directory.
    """
    copies = []
    # Add copies.
    for cpy in spec.get('copies', []):
        for src in cpy.get('files', []):
            dst = os.path.join(cpy['destination'], os.path.basename(src))
            description = 'copying %s to %s' % (src, dst)
            # _AddCustomBuildToolForMSVS() will call _FixPath() on the inputs and
            # outputs, so do the same for our generated command line.
            if src.endswith('/'):
                base_dir, outer_dir = posixpath.split(src[:-1])
                cmd = 'cd "%s" && xcopy /e /f /y "%s" "%s\\%s\\"' % (
                    _FixPath(base_dir), outer_dir, _FixPath(dst), outer_dir)
                copies.append(([src], ['dummy_copies', dst], cmd, description))
            else:
                cmd = 'mkdir "%s" 2>nul & set ERRORLEVEL=0 & copy /Y "%s" "%s"' % (
                    _FixPath(cpy['destination']), _FixPath(src), _FixPath(dst))
                # The command was computed but left out of the tuple here,
                # giving file copies a different shape than directory copies.
                copies.append(([src], [dst], cmd, description))
    return copies
def _DictsToFolders(base_path, bucket, flat):
    """Recursively convert a nested dictionary of sources into folders.

    Arguments:
      base_path: Path prefix of the folder represented by `bucket`.
      bucket: Dictionary mapping a name either to a nested dictionary
          (a sub-folder) or to a leaf entry.
      flat: When true, sub-folder contents are spliced directly into the
          result instead of being wrapped in MSVSNew.MSVSFolder objects.
    Returns:
      A list of leaf entries and, unless `flat`, MSVSFolder objects.
    """
    children = []
    # items() iterates the same pairs as iteritems() and exists on both
    # Python 2 and Python 3.
    for folder, contents in bucket.items():
        if type(contents) == dict:
            folder_children = _DictsToFolders(os.path.join(base_path, folder),
                                              contents, flat)
            if flat:
                children += folder_children
            else:
                # The folder path argument had lost its `folder)` tail,
                # which left the call with an unbalanced parenthesis.
                folder_children = MSVSNew.MSVSFolder(
                    os.path.join(base_path, folder),
                    name='(' + folder + ')',
                    entries=folder_children)
                children.append(folder_children)
        else:
            children.append(contents)
    return children
def _GetPathOfProject(qualified_target, spec, options, msvs_version):
    # Work out where the project file for a target lives.
    # Returns (proj_path, fix_prefix); fix_prefix is None unless
    # options.generator_output is set.
    proj_filename = _GetDefaultConfiguration(spec).get('msvs_existing_vcproj')
    if not proj_filename:
        proj_filename = (spec['target_name'] + options.suffix +
                         msvs_version.ProjectExtension())

    build_dir = os.path.dirname(gyp.common.BuildFile(qualified_target))
    proj_path = os.path.join(build_dir, proj_filename)
    if not options.generator_output:
        return proj_path, None

    # Remember the original project directory before relocating the project
    # under generator_output, then express it relative to the new location.
    project_dir_path = os.path.dirname(os.path.abspath(proj_path))
    proj_path = os.path.join(options.generator_output, proj_path)
    fix_prefix = gyp.common.RelativePath(project_dir_path,
                                         os.path.dirname(proj_path))
    return proj_path, fix_prefix
def _VerifySourcesExist(sources, root_dir):
    """Collect the source files that are missing on disk.

    Names containing '$' are skipped; MSVSProject.Filter items are searched
    recursively through their `contents`.

    Arguments:
      sources: A recursive list of Filter/file names.
      root_dir: The root directory the file names are joined onto.
    Returns:
      A list of full paths that do not exist on disk.
    """
    missing = []
    for entry in sources:
        if isinstance(entry, MSVSProject.Filter):
            missing += _VerifySourcesExist(entry.contents, root_dir)
            continue
        if '$' in entry:
            continue
        candidate = os.path.join(root_dir, entry)
        if not os.path.exists(candidate):
            missing.append(candidate)
    return missing
def _AddMSBuildAction(spec, primary_input, inputs, outputs, description,
                      sources_handled_by_action, actions_spec):
    # Append a 'CustomBuild' action spec for `primary_input` to
    # `actions_spec` and record the input in `sources_handled_by_action`.
    #
    # NOTE(review): `cmd` is read on the next line but is not a parameter or
    # a local of this function as shown -- it looks like a `cmd` parameter
    # was lost from the signature; confirm against the callers.
    command = MSVSSettings.ConvertVCMacrosToMSBuild(cmd)
    primary_input = _FixPath(primary_input)
    inputs_array = _FixPaths(inputs)
    outputs_array = _FixPaths(outputs)
    # Every fixed input except the primary one, ';'-separated.
    additional_inputs = ';'.join([i for i in inputs_array
                                  if i != primary_input])
    outputs = ';'.join(outputs_array)
    sources_handled_by_action.add(primary_input)
    action_spec = ['CustomBuild', {'Include': primary_input}]
    action_spec.extend(
        # Todo(jeanluc) 'Document' for all or just if as_sources?
        [['FileType', 'Document'],
         ['Command', command],
         ['Message', description],
         ['Outputs', outputs]
        ])
    # 'AdditionalInputs' is only emitted when there is something to list.
    if additional_inputs:
        action_spec.append(['AdditionalInputs', additional_inputs])
    actions_spec.append(action_spec)
def transformKey(self, InputKey):
    # Section 2.2.11.1.2 Encrypting a 64-Bit Block with a 7-Byte Key
    #
    # Packs the seven key characters into one 56-bit integer and slices it
    # into eight 7-bit groups; each group becomes the upper seven bits of an
    # output character (bit 0 is always zero).
    packed = 0
    for position in range(7):
        packed = (packed << 8) | ord(InputKey[position])

    pieces = []
    for group in range(8):
        septet = (packed >> (49 - 7 * group)) & 0x7F
        pieces.append(chr((septet << 1) & 0xfe))
    return "".join(pieces)
# NOTE(review): truncated fragment -- the signature stops after the first
# parameter and most of the body is missing, so this is not valid Python as
# it stands. An intact _CreateMSVSUserFile appears earlier in this listing.
def _CreateMSVSUserFile(proj_path,
spec['target_name'])
return user_file
def _GetDefines(config):
    """Return the list of preprocessor definitions for this configuration.

    Arguments:
      config: The dictionary that defines the special processing to be done
          for this configuration; only its 'defines' entry is read.
    Returns:
      The list of preprocessor definitions as strings. List entries are
      rendered as their parts joined with '='.
    """
    def render(entry):
        # Lists become "A=B=..." strings, everything else is stringified.
        if type(entry) == list:
            return '='.join([str(piece) for piece in entry])
        return str(entry)

    rendered = []
    for entry in config.get('defines', []):
        rendered.append(render(entry))
    return rendered
# NOTE(review): truncated fragment -- the signature and the first part of the
# body are missing, so this is not valid Python as it stands. An intact
# _GetMSVSAttributes appears earlier in this listing.
def _GetMSVSAttributes(spec,
'$(SolutionDir)$(ConfigurationName)')
prepared_attrs['OutputDirectory'] = _FixPath(output_dir) + '\\'
if 'IntermediateDirectory' not in prepared_attrs:
intermediate = '$(ConfigurationName)\\obj\\$(ProjectName)'
prepared_attrs['IntermediateDirectory'] = _FixPath(intermediate) + '\\'
else:
intermediate = _FixPath(prepared_attrs['IntermediateDirectory']) + '\\'
intermediate = MSVSSettings.FixVCMacroSlashes(intermediate)
prepared_attrs['IntermediateDirectory'] = intermediate
return prepared_attrs
# NOTE(review): truncated fragment -- only the first parameter and the tail of
# the body survive, so this is not valid Python as it stands. A fuller
# _DictsToFolders appears earlier in this listing.
def _DictsToFolders(base_path,
entries=folder_children)
children.append(folder_children)
else:
children.append(contents)
return children
# NOTE(review): truncated fragment -- the signature is garbled and most of the
# body is missing, so this is not valid Python as it stands. An intact
# _VerifySourcesExist appears earlier in this listing.
def _VerifySourcesExist(sources, source)
if not os.path.exists(full_path):
missing_sources.append(full_path)
return missing_sources
# NOTE(review): truncated fragment -- the signature is fused with the end of
# the body, so this is not valid Python as it stands. A fuller
# _AddMSBuildAction appears earlier in this listing.
def _AddMSBuildAction(spec, additional_inputs])
actions_spec.append(action_spec)
def argv_to_cmdline(argv):
    """
    Build a single command line string from a list of arguments.

    Empty arguments become C{""}; double quotes are backslash-escaped; an
    argument containing a space, tab, newline or carriage return is wrapped
    in double quotes.

    @type argv: list( str )
    @param argv: List of argument strings.
    The first element is the program to execute.
    @rtype: str
    @return: Command line string.
    """
    quoted = []
    for arg in argv:
        if not arg:
            quoted.append('""')
            continue
        arg = arg.replace('"', '\\"')
        if any(blank in arg for blank in (' ', '\t', '\n', '\r')):
            arg = '"%s"' % arg
        quoted.append(arg)
    return ' '.join(quoted)
def get_explorer_pid(self):
    """
    Look up the process ID of "explorer.exe".

    The Windows folder is taken from SHGetFolderPath, falling back to the
    C{SystemRoot} environment variable when that call fails or returns
    nothing.

    @rtype: int or None
    @return: Returns the process ID, or C{None} on error.
    """
    try:
        windows_dir = win32.SHGetFolderPath(win32.CSIDL_WINDOWS)
    except Exception:
        windows_dir = None
    windows_dir = windows_dir or getenv('SystemRoot')
    if not windows_dir:
        return None

    explorer_path = ntpath.join(windows_dir, 'explorer.exe')
    matches = self.find_processes_by_filename(explorer_path)
    if matches:
        # First match, first element of the match tuple is the process.
        return matches[0][0].get_pid()
    return None
#------------------------------------------------------------------------------
# XXX this method mustn't end up calling __initialize_snapshot by accident!
# NOTE(review): truncated fragment -- the signature stops after the first
# parameter and most of the body is missing, so this is not valid Python as
# it stands. An intact _CreateMSVSUserFile appears earlier in this listing.
def _CreateMSVSUserFile(proj_path,
spec['target_name'])
return user_file
def _GetDefines(config):
    """Collect the preprocessor definitions of a configuration as strings.

    Arguments:
      config: The configuration dictionary. A missing 'defines' entry is
          treated as an empty list.
    Returns:
      One string per entry: "NAME=VALUE..." for list entries (parts joined
      with '='), str(entry) for everything else.
    """
    defines = []
    for entry in config.get('defines', []):
        is_list = type(entry) == list
        text = '='.join(map(str, entry)) if is_list else str(entry)
        defines.append(text)
    return defines
# NOTE(review): truncated fragment -- the signature and the first part of the
# body are missing, so this is not valid Python as it stands. An intact
# _GetMSVSAttributes appears earlier in this listing.
def _GetMSVSAttributes(spec,
'$(SolutionDir)$(ConfigurationName)')
prepared_attrs['OutputDirectory'] = _FixPath(output_dir) + '\\'
if 'IntermediateDirectory' not in prepared_attrs:
intermediate = '$(ConfigurationName)\\obj\\$(ProjectName)'
prepared_attrs['IntermediateDirectory'] = _FixPath(intermediate) + '\\'
else:
intermediate = _FixPath(prepared_attrs['IntermediateDirectory']) + '\\'
intermediate = MSVSSettings.FixVCMacroSlashes(intermediate)
prepared_attrs['IntermediateDirectory'] = intermediate
return prepared_attrs
# NOTE(review): truncated fragment -- only the first parameter and the tail of
# the body survive, so this is not valid Python as it stands. A fuller
# _DictsToFolders appears earlier in this listing.
def _DictsToFolders(base_path,
entries=folder_children)
children.append(folder_children)
else:
children.append(contents)
return children
def _execvpe(file, args, env=None):
    # Execute `file` with `args`, searching the executable path when `file`
    # has no directory component. With `env` the exec call receives it;
    # without, the current `environ` drives the path lookup.
    if env is None:
        exec_func = execv
        argrest = (args,)
        env = environ
    else:
        exec_func = execve
        argrest = (args, env)

    head = path.split(file)[0]
    if head:
        # An explicit directory means no path search.
        exec_func(file, *argrest)
        return

    last_exc = saved_exc = None
    saved_tb = None
    path_list = get_exec_path(env)
    if name != 'nt':
        file = fsencode(file)
        path_list = map(fsencode, path_list)

    for directory in path_list:
        fullname = path.join(directory, file)
        try:
            exec_func(fullname, *argrest)
        except OSError as err:
            last_exc = err
            tb = sys.exc_info()[2]
            not_found = err.errno in (errno.ENOENT, errno.ENOTDIR)
            # Keep the first error that is something other than
            # "no such file/directory"; it takes priority when re-raising.
            if not not_found and saved_exc is None:
                saved_exc = err
                saved_tb = tb
    if saved_exc:
        raise saved_exc.with_traceback(saved_tb)
    raise last_exc.with_traceback(tb)
def __repr__(self):
    # Render as environ({...}) using the decoded form of every key/value
    # pair held in self._data.
    pairs = []
    for key, value in self._data.items():
        shown_key = self.decodekey(key)
        shown_value = self.decodevalue(value)
        pairs.append('{!r}: {!r}'.format(shown_key, shown_value))
    return 'environ({{{}}})'.format(','.join(pairs))
# NOTE(review): garbled Python 2 variant of _execvpe -- the body reads `args`
# although the signature has no such parameter, and the `argrest = (args, ...`
# line is fused with the following `path.split(file)` statement, so this is
# not valid Python as it stands. It tries `file` in each directory of PATH
# and re-raises the saved (or last) error when none can be executed.
def _execvpe(file, env=None):
if env is not None:
func = execve
argrest = (args, env)
else:
func = execv
argrest = (args, tail = path.split(file)
if head:
func(file, *argrest)
return
if 'PATH' in env:
envpath = env['PATH']
else:
envpath = defpath
PATH = envpath.split(pathsep)
saved_exc = None
saved_tb = None
for dir in PATH:
fullname = path.join(dir, file)
try:
func(fullname, *argrest)
except error, e:
tb = sys.exc_info()[2]
if (e.errno != errno.ENOENT and e.errno != errno.ENOTDIR
and saved_exc is None):
saved_exc = e
saved_tb = tb
if saved_exc:
raise error, saved_exc, saved_tb
raise error, e, tb
# Change environ to automatically call putenv() if it exists
def split_cli(line):
    '''
    Split a command line on single spaces, keeping double-quoted runs
    together.

    shlex.split() does not work well for Windows-style input, so the line
    is split on ' ' and the pieces between a leading and a trailing double
    quote are re-joined with single spaces (the quotes are dropped).

    A quoted single word ("foo") yields foo, and an unterminated quote
    keeps every remaining word instead of raising or dropping the last one.

    Returns the list of tokens; an empty line yields [''].
    '''
    words = iter(line.split(' '))
    final = []
    for tok in words:
        if tok[:1] == '"':
            tok = tok[1:]
            if tok[-1:] == '"':
                # Opening and closing quote on the same word.
                tok = tok[:-1]
            else:
                # Consume words from the shared iterator until the closing
                # quote (or the end of the line) is reached.
                for piece in words:
                    if piece[-1:] == '"':
                        tok += ' ' + piece[:-1]
                        break
                    tok += ' ' + piece
        final.append(tok)
    return final
# NOTE(review): four identical truncated remnants of _execvpe follow -- each
# keeps only the start of the signature fused with the last token of the
# body, so none of them is valid Python as it stands.
def _execvpe(file, tb
# Change environ to automatically call putenv() if it exists
def _execvpe(file, tb
# Change environ to automatically call putenv() if it exists
def _execvpe(file, tb
# Change environ to automatically call putenv() if it exists
def _execvpe(file, tb
# Change environ to automatically call putenv() if it exists
def run(self, args):
    # Prepare data/downloads/<client short name>/creds as the output folder
    # (stored in self.rep), then dispatch to the platform-specific routine.
    client_name = self.client.short_name()
    self.rep = os.path.join("data", "downloads", client_name, "creds")
    try:
        os.makedirs(self.rep)
    except Exception:
        # Best effort: any failure to create the folder is ignored.
        # NOTE(review): presumably meant for "already exists" -- confirm.
        pass

    handler = self.windows if self.client.is_windows() else self.linux
    handler()
def dump(self, length=8):
    # Build a hex dump string: one line per `length` characters with a
    # four-digit hex offset, the hex codes and the filtered text.
    #
    # NOTE(review): `src` is read by the loop below before anything in this
    # function assigns it, and it is not a parameter -- it looks like a `src`
    # parameter was lost from the signature; confirm against the callers.
    #
    # Translation table: a character maps to itself when its repr() is
    # three characters long, otherwise to '.'.
    FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
    N=0; result=''
    while src:
        # Peel the next `length` characters off the front of src.
        s,src = src[:length],src[length:]
        hexa = ' '.join(["%02X"%ord(x) for x in s])
        s = s.translate(FILTER)
        # The hex column is left-justified to a width of length*3.
        result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
        N+=length
    return result
def do_get(self, src_path):
    # Download a remote file into the current local directory.
    #
    # `src_path` is resolved against self.__pwd; the drive letter of the
    # result selects the share ("C:" -> "C$") and the local file takes the
    # remote file's base name.
    #
    # Failures are logged and swallowed so the interactive shell keeps
    # running; a partially written local file is removed.
    filename = None
    try:
        import ntpath
        newPath = ntpath.normpath(ntpath.join(self.__pwd, src_path))
        drive, tail = ntpath.splitdrive(newPath)
        filename = ntpath.basename(tail)
        # The context manager closes the local file even when getFile raises.
        with open(filename, 'wb') as fh:
            logging.info("Downloading %s\\%s" % (drive, tail))
            # The remote path `tail` was missing from this call, leaving
            # the write callback in the path position.
            self.__transferClient.getFile(drive[:-1] + '$', tail, fh.write)
    except Exception as e:
        logging.error(str(e))
        # Only clean up when a local file was actually created; otherwise
        # the cleanup itself would raise and mask the logged error.
        if filename is not None and os.path.exists(filename):
            os.remove(filename)
def do_cd(self, s):
    # Change the remote working directory. Any output from the remote `cd`
    # is treated as an error message and printed; otherwise the remote
    # shell is asked for its directory and the prompt is updated.
    self.execute_remote('cd ' + s)
    if self.__outputBuffer.strip('\r\n'):
        print(self.__outputBuffer)
        self.__outputBuffer = ''
        return

    # Provisional value, in place while the bare `cd` query runs.
    self.__pwd = ntpath.normpath(ntpath.join(self.__pwd, s))
    self.execute_remote('cd ')
    self.__pwd = self.__outputBuffer.strip('\r\n')
    self.prompt = self.__pwd + '>'
    self.__outputBuffer = ''
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。