Python click 模块,File() 实例源码
我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用click.File()。
def run(infile, outfile, time_interval, quiet):
    # Replay a JSON sequence into `outfile` one element at a time.
    #
    # The input is loaded completely up front.  Then, for every element, the
    # function sleeps `time_interval` seconds and rewrites `outfile` with the
    # prefix of the data reached so far.  With empty input `outfile` is never
    # created.
    #
    #   infile         path of the JSON file to read (a sliceable sequence)
    #   outfile        path of the JSON file rewritten on every step
    #   time_interval  seconds to sleep before each dump
    #   quiet          when true, log at WARN instead of INFO
    logging.basicConfig(level=logging.WARN if quiet else logging.INFO)
    logger = logging.getLogger(__name__)

    logger.info('loading input file %s ...', infile)
    # Do not use click.File because we want to close the file asap.
    with open(infile) as fin:
        data = json.load(fin)
    n = len(data)
    logger.info('loading input file %s done. %d data found.', infile, n)

    # range() works on Python 2 and 3; the previous xrange() is Python-2-only.
    for count in range(1, n + 1):
        logger.info('Sleeping for %d sec [%d/%d] ...', time_interval, count, n)
        time.sleep(time_interval)
        with open(outfile, 'w') as fout:
            json.dump(data[:count], fout)
        logger.info('Dumped %dth/%d data to %s', count, n, outfile)
def cfdv32mx(config):
    """Format cfdi v3.2 for Mexico.
    \b
    File where the files will be written document.xml.
    cfdicli --in_file /path/to/yout/json/documnt.json cfdv32mx
    \b
    File where the files will be written from document.json.
    cfdicli --out_file ./document.xml cfdv32mx
    """
    # Reads the whole of config.in_file, builds an invoice from it and, when
    # the invoice reports itself valid, writes the rendered document to
    # config.out_file; otherwise the invoice's error message is echoed.
    #
    # SECURITY: eval() runs whatever expression the input file contains.
    # Todo: look for a secure option for eval.
    # Or simply the CLI only should manage json?
    # Todo: Implement json option also.
    source_text = config.in_file.read()
    dict_input = eval(source_text)
    invoice = cfdv32.get_invoice(dict_input)

    if not invoice.valid:
        click.echo(invoice.ups.message)
        return

    config.out_file.write(invoice.document)
    # Flush so the document is on disk before success is reported.
    config.out_file.flush()
    click.echo('Document %s has been created.' % config.out_file.name)
def pull(project, run, kind, entity):
    # Download every file of a run into the current working directory.
    #
    # `run` may carry the project in slug form; api.parse_slug resolves both.
    # Files that api.file_current reports as current are skipped.  `kind` is
    # accepted but not used by this body.
    #
    # Raises ClickException when the run has no downloadable files.
    project, run = api.parse_slug(run, project=project)
    urls = api.download_urls(project, run=run, entity=entity)
    if len(urls) == 0:
        raise ClickException("Run has no files")

    banner = "Downloading: {project}/{run}".format(
        project=click.style(project, bold=True), run=run
    )
    click.echo(banner)

    for name in urls:
        entry = urls[name]
        if api.file_current(name, entry['md5']):
            click.echo("File %s is up to date" % name)
            continue

        length, response = api.download_file(entry['url'])
        fill = click.style('&', fg='green')
        with click.progressbar(length=length, label='File %s' % name,
                               fill_char=fill) as bar:
            with open(name, "wb") as f:
                # Stream in 4 KiB chunks, advancing the bar by bytes written.
                for chunk in response.iter_content(chunk_size=4096):
                    f.write(chunk)
                    bar.update(len(chunk))
def codegen_options(f):
    """Attach the shared code-generation command-line options to ``f``.

    The options are applied one after another in the order listed below,
    each wrapping the result of the previous one, and the fully decorated
    callable is returned.

    :param f: the command callback to decorate.
    :return: ``f`` wrapped with the categories, param_file, plugin, target
        and status_update_interval options.
    """
    f = click.option(
        "-c", "--categories", multiple=True, default=default_categories,
        type=click.Choice(all_categories),
        help="A list of the categories of inputs and outputs that should "
             "be enabled"
    )(f)
    f = click.option(
        "-f", "--param_file",
        type=click.File(),
        # A stray adjacent literal "code" used to be concatenated onto the
        # end of this help text; it has been removed.
        help="YAML or JSON file describing the firmware module "
             "configuration to be flashed.\n"
             "This is the same file that is used for rosparam in the "
             "launch file."
    )(f)
    f = click.option(
        "-p", "--plugin", help="Enable a specific plugin"
    )(f)
    f = click.option(
        "-t", "--target", help="PlatformIO target (e.g. upload)"
    )(f)
    f = click.option(
        "--status_update_interval", default=5,
        help="Minimum interval between driver status updates (in seconds)"
    )(f)
    return f
def bake(input_file, output_file, data):
    """
    This command bakes Open Badges data into a file and saves
    the result to an output file.
    Positional Arguments:
    \b
    Input filename: File must exist.
    \b
    Output filename: If file exists,it will be overwritten.
    """
    # utils.bake does the actual work; its return value is not used here.
    utils.bake(input_file, data, output_file)
    done_message = "{} is done baking. Remember to let it cool".format(
        output_file.name
    )
    click.echo(done_message)
def generate_crowdflower_interface_template(input_csv, output_html):
    """ Generate CrowdFlower interface template based on input data spreadsheet

    Only the header row of ``input_csv`` is inspected.  One question block
    is emitted per ``chunk_NN`` column, and each question block embeds one
    block per ``fe_NN`` column.  A CSV with no header row at all produces
    just the header and footer.

    :param file input_csv: CSV file with the input data
    :param output_html: File in which to write the output
    :type output_html: file
    :return: 0 on success
    """
    # Get the field names of the input data spreadsheet.  DictReader reports
    # None when the file is completely empty, so fall back to "no columns".
    fields = csv.DictReader(input_csv).fieldnames or []

    # Get "fe_[0-9][0-9]" fields
    fe_fields = [f for f in fields if re.match(r'fe_[0-9]{2}$', f)]
    # Get "chunk_[0-9][0-9]" fields
    token_fields = [f for f in fields if re.match(r'chunk_[0-9]{2}$', f)]

    # The fe blocks are identical for every question, so render them once.
    fe_blocks = ''.join(
        FE_TEMPLATE % {'fe_field': fe_field} for fe_field in fe_fields
    )

    # Collect the pieces and join once instead of growing a string with +=.
    pieces = [HEADER]
    for question_num, token_field in enumerate(token_fields, start=1):
        pieces.append(TOKEN_TEMPLATE % {
            'question_num': question_num,
            'token_field': token_field,
            'fe_blocks': fe_blocks,
        })
    pieces.append(FOOTER)

    output_html.write(''.join(pieces))
    return 0
def parse_files(ctx, param, values):
    """Turn ``FIELD=PATH`` strings into ``(field, open_file)`` pairs.

    Has the ``(ctx, param, values)`` shape of a click parameter callback.
    Each item is split on its first ``=``; the right-hand side is opened
    for binary reading through ``click.File('rb')``.

    :param ctx: the click context, forwarded to the file converter.
    :param param: the click parameter, forwarded to the file converter.
    :param values: iterable of ``FIELD=PATH`` strings.
    :return: list of ``(field, file_object)`` tuples in input order.
    :raises click.BadParameter: if an item contains no ``=``.
    """
    converter = click.File('rb')
    pairs = []
    for item in values:
        field, separator, path = item.partition('=')
        if not separator:
            # Interpolate the offending item; the placeholder used to be
            # left unformatted and was shown to the user as a literal "%s".
            raise click.BadParameter(
                'String parameter "%s" should be in form of FIELD=VALUE'
                % item)
        # ParamType.convert takes (value, param, ctx); the previous call
        # passed ctx in the param slot and omitted ctx entirely.
        pairs.append((field, converter.convert(path, param, ctx)))
    return pairs
def tpflist(campaign, channel, sc, wget):
    """Prints the Target Pixel File URLS for a given CAMPAIGN/QUARTER and ccd CHANNEL.
    CAMPAIGN can refer to a K2 Campaign (e.g. 'C4') or a Kepler Quarter (e.g. 'Q4').
    """
    try:
        urls = mast.get_tpf_urls(campaign, channel=channel, short_cadence=sc)
        if not wget:
            print('\n'.join(urls))
        else:
            # Prefix each URL so the output can be executed as wget commands.
            WGET_CMD = 'wget -nH --cut-dirs=6 -c -N '
            commands = [WGET_CMD + url for url in urls]
            print('\n'.join(commands))
    except mast.NoDataFoundException as e:
        # Report "no data" as a plain message instead of a traceback.
        click.echo(e)
def load_hdf5(path):
    '''Read the 'x', 'y' and 'vocab' datasets of an HDF5 file into a Dataset.

    'x' is flattened to two dimensions (first axis kept); its second and
    third axis lengths are passed on as height and width.  'y' is cast to
    int32.
    '''
    with h5py.File(path, 'r') as f:
        # [...] reads each dataset fully while the file is still open.
        x = f['x'][...]
        flat_x = x.reshape(x.shape[0], -1)
        labels = f['y'][...].astype(np.int32)
        vocab = f['vocab'][...]
        height, width = x.shape[1], x.shape[2]
        return Dataset(
            x=flat_x,
            y=labels,
            vocab=vocab,
            height=height,
            width=width,
        )
def cli_render(input, output, size):
    '''Render a JSONlines dataset to numpy arrays,saved in an HDF5 file.
    '''
    # `input` is iterated line by line; each line is a JSON object with a
    # 'target' entry and a 'strokes' list.  The strokes are rendered through
    # render(..., size) and stored as dataset 'x' with shape (N, size, size);
    # the targets are stored as indices ('y') into the sorted set of distinct
    # targets ('vocab').  `output` is opened in append mode.
    chars = []
    images = []
    for line in input:
        datum = json.loads(line)
        chars.append(datum['target'])
        images.append(render(
            [np.array(s) for s in datum['strokes']],
            size))

    vocab = sorted(set(chars))
    char_to_index = {ch: y for y, ch in enumerate(vocab)}

    with h5py.File(output, 'a') as f:
        str_dt = h5py.special_dtype(vlen=str)
        f.require_dataset(
            'vocab', (len(vocab),), dtype=str_dt
        )[...] = vocab
        f.require_dataset(
            'x', shape=(len(images), size, size), dtype=np.float32
        )[...] = np.array(images)
        # The shape tuple was missing its closing parenthesis (syntax
        # error).  np.int was only an alias of the builtin int and has been
        # removed from NumPy, so the builtin is used directly.
        f.require_dataset(
            'y', shape=(len(chars),), dtype=int
        )[...] = np.array([char_to_index[ch] for ch in chars])
def convert(self, param=None, ctx=None, value=None):
    """Return the content named by ``value``.

    * Falsy ``value``: the list of lines read from standard input.
    * Existing local path: the file's content, read with ``self.mode``.
    * Otherwise ``value`` is treated as a URL whose scheme must be listed
      in ``self.SUPPORTED_SCHEMES``.

    A ``.gz`` suffix makes the content be gunzipped; when ``self.mode``
    is not binary the decompressed bytes are passed through
    ``local_decode``.

    :param param: unused.
    :param ctx: unused.
    :param value: local path or URL; falsy means "read stdin".
    :raises click.BadParameter: if ``value`` is neither an existing path
        nor a URL with a supported scheme.
    """
    if not value:
        return sys.stdin.readlines()

    is_compressed = value.endswith('.gz')
    is_binary = self.mode.endswith('b')

    if os.path.exists(value):
        # Compressed files must be opened as bytes whatever the text mode.
        local_mode = 'rb' if is_compressed else self.mode
        file_obj = open(value, local_mode)
    else:
        url = urlparse(value)
        if not url.scheme:
            # Report the path the user typed, not the ParseResult repr.
            raise click.BadParameter(
                'File \'{}\' not found'.format(value))
        if url.scheme not in self.SUPPORTED_SCHEMES:
            raise click.BadParameter(
                'Scheme \'{}\' not supported'.format(url.scheme))
        file_obj = urlopen(value)

    # try/finally so the handle is released even if reading or decoding
    # raises.
    try:
        if is_compressed:
            with gzip.GzipFile(mode=self.mode, fileobj=file_obj) as unzipped:
                content = unzipped.read()
            if not is_binary:
                content = local_decode(content)
        else:
            content = file_obj.read()
    finally:
        file_obj.close()
    return content
def ssh_command(func):
    """Decorate a click command with the common SSH options.

    Adds ``-i/--identity-file`` and ``-b/--batch-size``.  The returned
    wrapper consumes both values, stores ``private_key``, ``batch_size``
    and ``event_loop`` in ``ctx.obj`` and then calls the decorated command
    with the remaining arguments only.
    """
    identity_option = click.option(
        "-i", "--identity-file",
        type=click.File(lazy=False),
        default=str(get_private_key_path()),
        help="Path to the private key file",
        show_default=True
    )
    batch_option = click.option(
        "-b", "--batch-size",
        type=int,
        default=20,
        help="By default,command won't connect to all servers "
             "simultaneously,it is trying to process servers in batches. "
             "Negative number or 0 means connect to all hosts",
        show_default=True,
    )
    func = batch_option(identity_option(func))

    @functools.wraps(func)
    @click.pass_context
    def wrapper(ctx, identity_file, batch_size, *args, **kwargs):
        key_material = identity_file.read()
        private_key = asyncssh.import_private_key(key_material)
        # Non-positive batch sizes are stored as None.
        if not batch_size > 0:
            batch_size = None
        identity_file.close()

        ctx.obj["private_key"] = private_key
        ctx.obj["batch_size"] = batch_size
        ctx.obj["event_loop"] = asyncio.get_event_loop()

        return func(*args, **kwargs)

    return wrapper
def main(
    telegram_token: str,
    ns_login: str,
    ns_password: str,
    log_file: click.File,
    verbose: bool,
):
    # Configure logging, build the Telegram / Ns / Bot objects and run the
    # bot on the asyncio event loop until the loop stops.
    #
    #   telegram_token  passed to Telegram(...)
    #   ns_login        passed to Ns(...) together with ns_password
    #   ns_password     see ns_login
    #   log_file        log destination; stderr is used when it is falsy
    #   verbose         DEBUG level when true, INFO otherwise
    logging.basicConfig(
        # "%S" is the seconds field.  The previous "%s" is not a standard
        # strftime directive, so timestamps never showed plain seconds.
        datefmt="%Y-%m-%d %H:%M:%S",
        format="%(asctime)s [%(levelname).1s] %(message)s",
        level=(logging.INFO if not verbose else logging.DEBUG),
        stream=(log_file or click.get_text_stream("stderr")),
    )

    logging.info("Starting bot…")
    # ExitStack + closing(): each object has .close() called on exit, in
    # reverse order of creation.
    with ExitStack() as exit_stack:
        telegram = exit_stack.enter_context(closing(Telegram(telegram_token)))
        ns = exit_stack.enter_context(closing(Ns(ns_login, ns_password)))
        bot = exit_stack.enter_context(closing(Bot(telegram, ns)))
        try:
            asyncio.ensure_future(bot.run())
            asyncio.get_event_loop().run_forever()
        finally:
            # Stop the bot even when run_forever() is interrupted.
            bot.stop()
# Bot response phrases.
# ----------------------------------------------------------------------------------------------------------------------
def fetch_samples_from_obserations(features, exact, from_,
                                   context):
    """Fetch sample data containing features."""
    # Imports stay inside the function, in their original order.
    from redbiom.util import from_or_nargs
    feature_iter = from_or_nargs(from_, features)

    from redbiom.fetch import data_from_features
    table, ambiguity_map = data_from_features(context, feature_iter, exact)

    # NOTE(review): `output` is not a parameter in the signature visible
    # here; confirm where it is bound before relying on this function.
    import h5py
    with h5py.File(output, 'w') as fp:
        table.to_hdf5(fp, 'redbiom')
    _write_ambig(ambiguity_map, output)
def fetch_samples_from_samples(samples, context):
    """Fetch sample data."""
    # NOTE(review): `from_` and `output` are not parameters in the signature
    # visible here; confirm where they are bound before relying on this
    # function.
    from redbiom.util import from_or_nargs
    sample_iter = from_or_nargs(from_, samples)

    from redbiom.fetch import data_from_samples
    sample_table, ambiguity_map = data_from_samples(context, sample_iter)

    import h5py
    with h5py.File(output, 'w') as fp:
        sample_table.to_hdf5(fp, 'redbiom')
    _write_ambig(ambiguity_map, output)
def test_persistfile_exists(self, filename, add_postfix=True):
    """Raise ``click.UsageError`` if the resolved persist file exists.

    ``filename`` is first mapped through ``self._get_persist_filename``;
    nothing is returned when no regular file is found at that path.
    """
    resolved = self._get_persist_filename(filename, add_postfix=add_postfix)
    if not os.path.isfile(resolved):
        return
    raise click.UsageError("File '%s' already exists" % resolved)
def cli_decrypt(ctx, key_file, passphrase, public_keyfile, verify, keep, progress, leave_progress_bar):
    """Decrypt file(s) with private key."""
    # Default the progress display to "on" only when stdout is a terminal.
    if progress is None:
        progress = sys.stdout.isatty()

    # Without a passphrase we may prompt only when progress is enabled.
    if passphrase is None:
        if not progress:
            raise click.ClickException("No passphrase given.")
        passphrase = click.prompt('Passphrase', hide_input=True)

    ed_prv = load_private_keyfile(key_file, passphrase)
    sender_pubkey = load_public_keyfile(public_keyfile) if public_keyfile else None

    # NOTE(review): `filename` is not a parameter in the signature visible
    # here; confirm where the iterable of open files is bound.
    for fh in filename:
        encrypted_name = fh.name
        if not encrypted_name.endswith(".s11"):
            # Todo: implement output-filename as cli option
            raise click.UsageError("File '%s' does not end with .s11" % encrypted_name)

        # Strip the 4-character ".s11" suffix to get the plaintext name.
        _decrypt(
            f=fh,
            ed_prv=ed_prv,
            sender_pubkey=sender_pubkey,
            verify=verify,
            progress=progress,
            leave_progress_bar=leave_progress_bar,
            output_filename=encrypted_name[:-4],
        )
        fh.close()

        # Remove the encrypted input unless verifying or asked to keep it.
        if verify is False and not keep:
            os.unlink(encrypted_name)
def unbake(input_file, output_file):
    """
    This command extracts Open Badges data from an image and
    prints it to a file or the standard output.
    Positional Arguments:
    \b
    Input filename: File must exist.
    \b
    Output filename: If file exists,it will be overwritten.
    """
    # Echo an empty line before and a newline string after the extracted
    # data is written.
    click.echo('')
    write = output_file.write
    write(utils.unbake(input_file))
    click.echo('\n')
def status(run, settings, project):
    # When `settings` is truthy, print whether an API key is configured,
    # the settings file location and the current settings as sorted,
    # indented JSON.  `run` and `project` are not used by this body.
    if not settings:
        return

    logged_in_line = click.style("Logged in?", bold=True) + " %s" % bool(api.api_key)
    click.echo(logged_in_line)

    settings_line = (click.style("Current Settings", bold=True) +
                     " (%s)" % api.settings_file)
    click.echo(settings_line)

    settings = api.settings()
    rendered = json.dumps(
        settings,
        sort_keys=True,
        indent=2,
        separators=(',', ': ')
    )
    click.echo(rendered)
# project,run = api.parse_slug(run,project=project)
# existing = set() # Todo: populate this set with the current files in the run dir
# remote = api.download_urls(project,run)
# not_synced = set()
# remote_names = set([name for name in remote])
# for file in existing:
# Meta = remote.get(file)
# if Meta and not api.file_current(file,Meta['md5']):
# not_synced.add(file)
# elif not Meta:
# not_synced.add(file)
# Todo: remove items that exists and have the md5
# only_remote = remote_names.difference(existing)
# up_to_date = existing.difference(only_remote).difference(not_synced)
# click.echo('File status for ' + click.style('"%s/%s" ' %
# (project,run),bold=True))
# if len(not_synced) > 0:
# click.echo(click.style('Push needed: ',bold=True) +
# click.style(",".join(not_synced),fg="red"))
# if len(only_remote) > 0:
# click.echo(click.style('Pull needed: ',".join(only_remote),fg="red"))
# if len(up_to_date) > 0:
# click.echo(click.style('Up to date: ',".join(up_to_date),fg="green"))
#@cli.command(context_settings=CONTEXT,help="Store notes for a future training run")
def put(local, remote):
    """Put a file or folder and its contents on the board.
    Put will upload a local file or folder to the board. If the file already
    exists on the board it will be overwritten with no warning! You must pass
    at least one argument which is the path to the local file/folder to
    upload. If the item to upload is a folder then it will be copied to the
    board recursively with its entire child structure. You can pass a second
    optional argument which is the path and name of the file/folder to put to
    on the connected board.
    For example to upload a main.py from the current directory to the board's
    root run:
    ampy --port /board/serial/port put main.py
    Or to upload a board_boot.py from a ./foo subdirectory and save it as boot.py
    in the board's root run:
    ampy --port /board/serial/port put ./foo/board_boot.py boot.py
    To upload a local folder adafruit_library and all of its child files/folders
    as an item under the board's root run:
    ampy --port /board/serial/port put adafruit_library
    Or to put a local folder adafruit_library on the board under the path
    /lib/adafruit_library on the board run:
    ampy --port /board/serial/port put adafruit_library /lib/adafruit_library
    """
    # Use the local filename if no remote filename is provided.
    if remote is None:
        remote = os.path.basename(os.path.abspath(local))

    board_files = files.Files(_board)

    # Plain file: read it fully and copy it to the board.
    if not os.path.isdir(local):
        with open(local, 'rb') as infile:
            board_files.put(remote, infile.read())
        return

    # Directory copy: create each directory, then upload its files.
    for parent, child_dirs, child_files in os.walk(local):
        # Create board filesystem absolute path to parent directory.
        remote_parent = posixpath.normpath(
            posixpath.join(remote, os.path.relpath(parent, local)))
        try:
            # Create remote parent directory.
            board_files.mkdir(remote_parent)
        except files.DirectoryExistsError:
            # Ignore errors for directories that already exist.
            pass
        # The upload loop is deliberately outside the try block.  When it
        # sat inside, an already-existing remote directory raised
        # DirectoryExistsError before any file was sent, so re-uploading
        # into an existing folder silently skipped all of its files.
        for filename in child_files:
            with open(os.path.join(parent, filename), 'rb') as infile:
                remote_filename = posixpath.join(remote_parent, filename)
                board_files.put(remote_filename, infile.read())
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。