Python click 模块,secho() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用click.secho()。
def write(self, file_path: Path, template: str, context: dict = None, preserve: bool = False):
    """Render ``template`` into ``file_path``, printing template errors in red.

    Args:
        file_path: Destination forwarded to ``self._write``.
        template: Name of the template to render.
        context: Values for the template. When omitted or empty,
            ``self.context`` is used instead.
        preserve: Forwarded unchanged to ``self._write``.

    Template errors are reported with ``click.secho`` instead of being
    raised. If one occurred and ``Generator.strict`` is truthy, the
    process exits with status -1.
    """
    # ``None`` replaces the former ``{}`` default: a dict literal in the
    # signature is shared by every call and could leak state between them.
    if not context:
        context = self.context
    message = None
    try:
        self._write(file_path, template, context, preserve)
    except TemplateSyntaxError as exc:
        message = '{0}:{1} error: {2}'.format(exc.filename, exc.lineno, exc.message)
    except TemplateNotFound as exc:
        message = '{0} error: Template not found'.format(exc.name)
    except TemplateError as exc:
        message = 'error: {0}'.format(exc.message)
    if message is not None:
        click.secho(message, fg='red')
        if Generator.strict:
            sys.exit(-1)
def print_task_deFinition(task_deFinition, indent=" "):
    """Print a cyan summary of a task definition and its containers.

    Every line is prefixed with ``indent``. The ARN and task role ARN lines
    are only printed when those attributes are truthy; port mappings and
    extra hosts are listed per container when present.
    """
    def emit(template, *values):
        # All output of this function shares the indent prefix and colour.
        click.secho(template.format(indent, *values), fg="cyan")

    if task_deFinition.arn:
        emit('{} arn : {}', task_deFinition.arn)
    emit('{} family : {}', task_deFinition.family)
    emit('{} network_mode : {}', task_deFinition.networkMode)
    if task_deFinition.taskRoleArn:
        emit('{} task_role_arn : {}', task_deFinition.taskRoleArn)
    emit('{} containers:')
    for container in task_deFinition.containers:
        emit('{} {}:', container.name)
        emit('{} image : {}', container.image)
        emit('{} cpu : {}', container.cpu)
        emit('{} memory : {}', container.memory)
        if container.portMappings:
            for mapping in container.portMappings:
                emit('{} port : {}', mapping)
        if container.extraHosts:
            for host in container.extraHosts:
                emit('{} extra_host : {}', host)
def scale(ctx, service_name, count, dry_run, wait, asg, force_asg):
    """
    Set the desired count for service SERVICE_NAME to COUNT.
    """
    # ``asg`` is accepted but not read anywhere in this function.
    config = Config(filename=ctx.obj['CONfig_FILE'], env_file=ctx.obj['ENV_FILE'])
    service = Service(yml=config.get_service(service_name))
    # A bare ``print`` is only a name lookup on Python 3 and prints nothing;
    # call it so the blank separator line is actually emitted.
    print()
    manage_asg_count(service, force_asg)
    click.secho('Updating desiredCount on "{}" service in cluster "{}" to {}.'.format(
        service.serviceName,
        service.clusterName,
        count
    ), fg="white")
    if not dry_run:
        service.scale(count)
        if wait:
            click.secho(" Waiting until the service is stable with our new count ...", fg='cyan')
            if service.wait_until_stable():
                click.secho(" Done.", fg='white')
            else:
                click.secho(" FAILURE: the service Failed to start.", fg='red')
                # Non-zero status so callers can detect the failed rollout.
                sys.exit(1)
def delete(ctx, dry_run, service_name=None):
    """
    Delete the service SERVICE_NAME from AWS.
    """
    # ``service_name`` was read below without ever being bound, which raised
    # NameError. It is now accepted as a keyword parameter (click passes
    # command parameters by keyword), appended with a default so existing
    # positional callers keep working.
    if service_name is None:
        raise click.UsageError('SERVICE_NAME is required.')
    config = Config(filename=ctx.obj['CONfig_FILE'], env_file=ctx.obj['ENV_FILE'])
    service = Service(yml=config.get_service(service_name))
    print()
    click.secho('Deleting service "{}":'.format(service.serviceName), fg="white")
    click.secho(' Service info:', fg="green")
    print_service_info(service)
    click.secho(' Task DeFinition info:', fg="green")
    print_task_deFinition(service.active_task_deFinition)
    print()
    if not dry_run:
        # Destructive action: the user has to type the service name back.
        click.echo("If you really want to do this,answer \"{}\" to the question below.\n".format(service.serviceName))
        value = click.prompt("What service do you want to delete? ")
        if value == service.serviceName:
            # Scale to zero and wait before deleting the service itself.
            service.scale(0)
            print(" Waiting for our existing containers to die ...")
            service.wait_until_stable()
            print(" All containers dead.")
            service.delete()
            print(" Deleted service {} from cluster {}.".format(service.serviceName, service.clusterName))
        else:
            click.echo("\nNot deleting service \"{}\"".format(service.serviceName))
def print_project_status(ctx):
    """Print project details and the status of each build step.

    Reads ``config`` and ``solution_num`` from ``ctx.obj`` and the set of
    status flags returned by ``gather_project_status(ctx)``.
    """
    config = ctx.obj.config
    solution_num = ctx.obj.solution_num
    project_status = gather_project_status(ctx)

    def pass_fail(pass_flag, fail_flag, done_flag):
        # Pass wins over fail, fail over "ran but unknown", else "Not Run".
        if pass_flag in project_status:
            return click.style("Pass", fg='green')
        if fail_flag in project_status:
            return click.style("Fail", fg='red')
        if done_flag in project_status:
            return click.style("Run (Can't get status)", fg='yellow')
        return click.style("Not Run", fg='yellow')

    def ran(done_flag):
        # Steps that only report whether they have been executed.
        if done_flag in project_status:
            return click.style("Run", fg='green')
        return click.style("Not Run", fg='yellow')

    # Project details first.
    click.secho("Project Details", bold=True)
    click.echo(" Project Name: " + config["project_name"])
    click.echo(" Number of solutions generated: " + str(solution_num))
    click.echo(" Latest Solution Folder: '" + config["project_name"] + "/solution" + str(solution_num) + "'")
    click.echo(" Language Choice: " + config["language"])

    # Then which builds have been run / are passing.
    click.secho("Build Status", bold=True)
    click.echo(" C Simulation: " + pass_fail("csim_pass", "csim_fail", "csim_done"))
    click.echo(" C Synthesis: " + ran("syn_done"))
    click.echo(" Cosimulation: " + pass_fail("cosim_pass", "cosim_fail", "cosim_done"))
    click.echo(" Export:")
    click.echo(" IP Catalog: " + ran("export_ip_done"))
    click.echo(" System Generator: " + ran("export_sysgen_done"))
    click.echo(" Export Evaluation: " + ran("evaluate_done"))
### Click Command Definitions ###
# Report Command
def lumogon(file):
    """
    Lumogon scans the output from the lumogon container
    inspection tool
    """
    # For each container in the report: announce it, derive the Debian
    # codename from the host platform version, then look up and print the
    # vulnerabilities of every dpkg package in sorted order.
    cve_data = load_vulnerability_database()
    containers = load_data(file)["containers"]
    for key in containers:
        report = containers[key]
        click.secho("==> Scanning %s" % report["container_name"], fg="blue")
        capabilities = report["capabilities"]
        packages = capabilities["dpkg"]["payload"]
        host = capabilities["host"]["payload"]
        # Only the integer part of the platform version selects the codename.
        release = DEBIAN_CODENAMES[math.floor(float(host["platformversion"]))]
        for package in sorted(packages):
            vulns = determine_cves(package, packages[package], release, cve_data)
            print_vulns(package, vulns)
def run_add_system(name, token, org, system, prompt):
    """
    Adds a new system to the repo.
    """
    # The system is stored as a label on the repository. When ``prompt`` is
    # truthy the user is offered an immediate ``run_update``.
    repo = get_repo(token=token, org=org, name=name)
    try:
        repo.create_label(name=system.strip(), color=SYstem_LABEL_COLOR)
        click.secho("Successfully added new system {}".format(system), fg="green")
        wants_update = prompt and click.confirm("Run update to re-generate the page?")
        if wants_update:
            run_update(name=name, token=token, org=org)
    except GithubException as e:
        # Only status 422 is reported as "already exists"; anything else
        # propagates to the caller.
        if e.status != 422:
            raise
        click.secho(
            "Unable to add new system {},it already exists.".format(system), fg="yellow")
        return
def get_config(repo):
    """
    Get the config for the repo,merged with the default config. Returns the default config if
    no config file is found.

    The returned mapping is a copy: values from the repository's
    ``config.json`` (read from the ``gh-pages`` ref) are merged into it
    without touching ``DEFAULT_CONfig`` itself.
    """
    files = get_files(repo)
    # Copy first. Updating the module-level default in place would leak one
    # repository's settings into every later call.
    config = DEFAULT_CONfig.copy()
    if "config.json" in files:
        # get the config file, parse JSON and merge it with the default config
        config_file = repo.get_file_contents('/config.json', ref="gh-pages")
        try:
            repo_config = json.loads(config_file.decoded_content.decode("utf-8"))
            config.update(repo_config)
        except ValueError:
            # Unparseable JSON is not fatal; the defaults are used.
            click.secho("WARNING: Unable to parse config file. Using defaults.", fg="yellow")
    return config
def export(mark, file, fenv):
    """ Export U-Boot environment variables """
    # Opens the image ``file`` (located via ``mark``) and writes the stored
    # environment to ``fenv``. Any failure is printed and ends the process
    # with ERROR_CODE.
    try:
        image = uboot.EnvImgOld(start_string=mark)
        image.open_img(file)
        with open(fenv, 'w') as out:
            out.write(image.store())
    except Exception as exc:
        reason = str(exc)
        # Exceptions with an empty message still get a visible line.
        click.echo(reason or "UnkNown Error !")
        sys.exit(ERROR_CODE)
    click.secho("Environment variables saved into: %s" % fenv)
# U-Boot envimg: Update U-Boot environment variables
def create(size, redundant, bigendian, infile, outfile):
    """ Create new image from attached file """
    # Loads the text in ``infile`` into an environment blob and writes the
    # exported bytes to ``outfile``. Any failure is printed and ends the
    # process with ERROR_CODE.
    try:
        blob = uboot.EnvBlob(size=size, redundant=redundant, bigendian=bigendian)
        with open(infile, 'r') as src:
            blob.load(src.read())
        with open(outfile, 'wb') as dst:
            dst.write(blob.export())
    except Exception as exc:
        reason = str(exc)
        click.echo(reason or "UnkNown Error !")
        sys.exit(ERROR_CODE)
    click.secho(" Successfully created: %s" % outfile)
# U-Boot mkenv: Extract image content
def extract(offset, size, file):
    """ Extract image content """
    # Parses the bytes of ``file`` starting at ``offset`` and writes the
    # stored environment next to it as ``<file without extension>.txt``.
    # Any failure is printed and ends the process with ERROR_CODE.
    try:
        fileName = os.path.splitext(file)[0]
        blob = uboot.EnvBlob(size=size)
        with open(file, "rb") as src:
            src.seek(offset)
            blob.parse(src.read())
        with open(fileName + '.txt', 'w') as dst:
            dst.write(blob.store())
    except Exception as exc:
        reason = str(exc)
        click.echo(reason or "UnkNown Error !")
        sys.exit(ERROR_CODE)
    click.secho(" Successfully extracted: %s.txt" % fileName)
def _report_test(test, passed, slack):
    """Log, print and post to Slack the outcome of a single test.

    The Slack call arguments are the global ``_config`` overlaid with the
    test's own ``slack`` settings. Any ``text`` entry is dropped because the
    message text is supplied here, and ``as_user`` is forced to ``'false'``.
    """
    # overwrite globals with test specifics
    merged = _config.copy()
    merged.update(test['slack'])
    merged.pop('text', None)
    merged['as_user'] = 'false'

    if passed:
        log_suffix, label, colour, slack_suffix = ": passed", "passed", 'green', ": passed. "
    else:
        log_suffix, label, colour, slack_suffix = ": Failed", "Failed", 'red', ": Failed. "

    logger.info(test['test_name'] + log_suffix)
    click.secho(label, fg=colour)
    slack.api_call("chat.postMessage",
                   text=test['test_name'] + slack_suffix,
                   **merged)
def run(self):
    """Print the startup banner, configure the bot and run its tasks.

    When ``self._server`` is set, an HTTP server is started on
    ``0.0.0.0:7000`` first. If no tasks are registered, the bot is stopped
    and the process exits with status 1. Otherwise every task is scheduled
    on ``self.loop`` and the loop runs forever.
    """
    click.echo('{Now}\n{bottery} version {version}'.format(
        # ``datetime.Now`` does not exist (AttributeError); the constructor
        # is ``now``. The format previously used ``%m`` (month number) in
        # the day position and ``%s``, which is not a portable directive;
        # ``%d`` and ``%S`` give day of month and seconds.
        Now=datetime.now().strftime('%B %d,%Y - %H:%M:%S'),
        bottery=click.style('Bottery', fg='green'),
        version=bottery.__version__
    ))
    self.loop.run_until_complete(self.configure())
    if self._server is not None:
        handler = self.server.make_handler()
        setup_server = self.loop.create_server(handler, '0.0.0.0', 7000)
        self.loop.run_until_complete(setup_server)
        click.echo('Server running at http://localhost:7000')
    if not self.tasks:
        click.secho('No tasks found.', fg='red')
        self.stop()
        sys.exit(1)
    for task in self.tasks:
        self.loop.create_task(task())
    click.echo('Quit the bot with CONTROL-C')
    self.loop.run_forever()
def standings(self, league_table, league):
    """Print the league standings as a table, one section per group.

    ``league`` is accepted but not used here. Non-negative goal differences
    are rewritten in place as strings with a leading space.
    """
    rule = "-"*100
    click.secho(rule, fg="green", bold=True)
    click.secho("%-2s %-5s %-25s %-10s %-10s %-11s %-11s %-10s" %
                ("GP", "POS", "CLUB", "PLAYED", "GOALS", "GOAL LOSS", "GOAL DIFF","POINTS"))
    click.secho(rule, bold=True)
    row_format = (u"{group:<3} {rank:<5} {team:<25} "
                  u"{playedGames:<10} {goals:<10} {goalsAgainst:<10}"
                  u" {goalDifference:<12} {points}")
    groups = league_table["standings"]
    for group_key in sorted(groups.keys()):
        for team in groups[group_key]:
            if team["goalDifference"] >= 0:
                team["goalDifference"] = ' ' + str(team["goalDifference"])
            team_str = row_format.format(**team)
            click.secho(team_str, bold=True, fg=self.colors[team['group']])
        # NOTE(review): assumes the closing rule is printed after each group;
        # the original nesting could not be recovered - confirm.
        click.secho(rule, bold=True)
def fixtures(self, league_table):
    """Print every fixture in ``league_table['fixtures']`` as a table row.

    Each fixture dict is modified in place: ``league`` is added from
    ``LEAGUE_NAMES`` and ``result`` is replaced by a display string
    (``'None'`` when the away goals are ``None``, otherwise ``home:away``).
    """
    rule = "-"*120
    click.secho(rule, bold=True)
    click.secho("%-27s %-27s %-6s %-6s %-22s %-10s %-8s %-6s" %
                ("HOME TEAM", "AWAY TEAM", "RES","MD",
                 "DATE", "STATUS", "ID", "LEAGUE"))
    click.secho(rule, bold=True)
    row_format = (u"{homeTeamName:<27} {awayTeamName:<27} "
                  u"{result:<6} {matchday:<6} {date:<22} "
                  u"{status:<10} {id:<8} {league}")
    for match in league_table['fixtures']:
        match['league'] = LEAGUE_NAMES[match['competitionId']]
        score = match['result']
        if score['goalsAwayTeam'] is None:
            match['result'] = 'None'
        else:
            match['result'] = '{0}:{1}'.format(score['goalsHomeTeam'], score['goalsAwayTeam'])
        fixture_str = row_format.format(**match)
        click.secho(fixture_str, fg=self.colors.TEXT)
    click.secho(rule, bold=True)
def ensure_module(module_name: str):
    """
    Check that ``module_name`` can be imported.

    If the import raises ``ModuleNotFoundError``, an installation hint is
    written to stderr in red and the process exits with status -1.

    Args:
        module_name: name of the module to look for
    """
    try:
        importlib.import_module(module_name)
    except ModuleNotFoundError:
        hint = (
            f'Module not found: {module_name}\n'
            f'Install it manually with: "pip install {module_name}"\n'
            f'Or install all dependencies with: "pip install -r requirements-dev.txt"'
        )
        click.secho(hint, fg='red', err=True)
        exit(-1)
def _get_version(ctx: click.Context):
    # Populates ctx.obj['semver'] and ctx.obj['pep440'] and prints both.
    # The versions come from the frozen version module when it is
    # importable; otherwise the ``pin_version`` command is invoked.
    # Runs at most once per process, guarded by a flag stored on the
    # function object itself.
    if _get_version.leave_me_alone_already:
        return
    if getattr(ctx, 'obj', None) is None:
        ctx.obj = {}
    try:
        from emft.__version_frozen__ import __version__, __pep440__
    except ModuleNotFoundError:
        ctx.invoke(pin_version)
    else:
        ctx.obj['semver'] = __version__
        ctx.obj['pep440'] = __pep440__
    click.secho(f"Semver: {ctx.obj['semver']}", fg='green')
    click.secho(f"PEP440: {ctx.obj['pep440']}", fg='green')
    _get_version.leave_me_alone_already = True
def handle_result(f):
    """Decorate ``f`` so that its failures are reported instead of raised.

    The wrapper returns whatever ``f`` returns. On failure it prints a
    message and returns ``None``:

    * ``UnauthorizedError``: asks the user to log in and runs
      ``scm login <url>`` with the ``url`` keyword argument of the call.
    * ``requests.ConnectionError``: reports that the content manager is
      unreachable.
    * any other ``Exception``: prints its message in red.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        # The original ``while True`` loop left through ``break`` for the
        # first two handlers but not for the generic one, so any other
        # failing call was retried forever in a tight loop. Every handler
        # now ends the call, which makes the loop unnecessary.
        try:
            return f(*args, **kwargs)
        except UnauthorizedError:
            url = kwargs.get('url')
            click.echo('Please login')
            subprocess.call(['scm', 'login', url])
        except requests.ConnectionError:
            click.secho('Can not connect to content manager!', fg='red')
        except Exception as e:
            click.secho(str(e), fg='red')
        return None
    return wrapper
def cli(action, config_file, target, dev_addr, verbose):
    """
    harrier - Jinja2 & sass/scss aware site builder
    """
    # ``dev_addr`` is accepted but not read in this function.
    serving = action == 'serve'  # Todo add watch
    setup_logging(verbose, times=serving)
    try:
        config = Config(config_file)
        # Without an explicit target, the action name doubles as the target.
        config.setup(target or action, served_direct=serving)
        if serving:
            watch(config)
        else:
            assert action == 'build'
            build(config)
    except HarrierProblem as e:
        hint = '' if verbose else ',use "--verbose" for more details'
        click.secho(('Error: {}' + hint).format(e), fg='red', err=True)
def _check_sem_version(self, spec):
    """Return ``version`` when it is a valid semantic version inside ``spec``.

    Otherwise (not in ``spec``, or not parseable, i.e. ``ValueError``) an
    error naming ``self.specversion`` is printed and the process exits with
    status 1.
    """
    # NOTE(review): ``version`` is neither a parameter nor assigned in this
    # method - presumably a module-level name or a parameter that was lost;
    # confirm against the callers before relying on this.
    try:
        accepted = semantic_version.Version(version) in spec
    except ValueError:
        accepted = False
    if accepted:
        return version
    click.secho(
        'Error: Invalid semantic version ({0})'.format(
            self.specversion),
        fg='red')
    exit(1)
def _download(self, url):
    """Download ``url`` into ``self.packages_dir`` and return the file path.

    Returns ``None`` without downloading when the profile's version check
    for the package fails and ``self.force_install`` is not set. A
    ``KeyboardInterrupt`` during the download removes the partial file and
    exits with status 1.
    """
    # Note: only the version of locally installed packages is checked here,
    # which is why the installation path is not mentioned.
    wanted = (self.profile.check_package_version(self.package, self.version)
              or self.force_install)
    if not wanted:
        click.secho('Already installed. Version {0}'.format(
            self.profile.get_package_version(self.package)), fg='yellow')
        return None

    downloader = FileDownloader(url, self.packages_dir)
    filepath = downloader.get_filepath()
    click.secho('Download ' + basename(filepath))
    try:
        downloader.start()
    except KeyboardInterrupt:
        if isfile(filepath):
            remove(filepath)
        click.secho('Abort download!', fg='red')
        exit(1)
    return filepath
def copy_example_files(self, example, project_dir, sayno):
    """Copy the files of ``example`` into ``project_dir``.

    Returns 1 when ``self.examples_dir`` is not a directory (examples not
    installed) and 0 otherwise, including when the requested example does
    not exist, in which case only a warning is printed.
    """
    if not isdir(self.examples_dir):
        click.secho('Error: examples are not installed', fg='red')
        click.secho('Please run:\n'
                    ' apio install examples', fg='yellow')
        return 1

    destination = util.check_dir(project_dir)
    source = util.safe_join(self.examples_dir, example)
    if isdir(source):
        self._copy_files(example, source, destination, sayno)
    else:
        click.secho(EXAMPLE_NOT_FOUND_MSG, fg='yellow')
    return 0
def _enable_darwin(self):
    """Enable the FTDI drivers on macOS through Homebrew.

    Prints an error and does nothing else when ``brew`` is not found.
    Otherwise runs the brew commands below, records ``macos_drivers`` in
    the profile and saves it.
    """
    # Check homebrew
    brew_missing = subprocess.call('which brew > /dev/null', shell=True) != 0
    if brew_missing:
        click.secho('Error: homebrew is required', fg='red')
        return

    click.secho('Enable FTDI drivers for FPGA')
    # NOTE(review): the last two argument lists do not look like valid brew
    # invocations - presumably mangled; verify against upstream.
    for command in (['brew', 'update'],
                    ['brew', 'install', '--force', 'libftdi'],
                    ['brew', 'unlink', 'link', 'libffi'],
                    ['brew', 'libffi']):
        subprocess.call(command)
    self.profile.add_setting('macos_drivers', True)
    self.profile.save()
    click.secho('FPGA drivers enabled', fg='green')
def api_request(command, organization='FPGAwars'):
    """GET ``repos/<organization>/<command>`` from the GitHub API.

    Returns the decoded JSON body. Any failure (connection error, HTTP
    error status, undecodable or empty body) prints a red error message and
    exits the process with status 1.
    """
    result = None
    r = None
    try:
        r = requests.get(
            'https://api.github.com/repos/{0}/{1}'.format(
                organization, command),
            headers=_get_headers())
        # Check the status before decoding so an HTTP error is reported as
        # such rather than as a JSON parsing problem.
        r.raise_for_status()
        result = r.json()
    except requests.exceptions.ConnectionError:
        click.secho(ERROR_MESSAGE, fg='red')
        exit(1)
    except Exception as e:
        click.secho('Error: ' + str(e), fg='red')
        exit(1)
    finally:
        # A Response is falsy for 4xx/5xx statuses, so ``if r:`` skipped
        # closing exactly the failed responses; compare with None instead.
        if r is not None:
            r.close()
    if result is None:
        click.secho('Error: wrong data from GitHub API', fg='red')
        exit(1)
    return result
def cli(ctx, lsftdi, lsusb, info):
    """System tools.\n
    Install with `apio install system`"""
    # The first truthy flag wins. ``ctx.exit`` raises, so each branch ends
    # the command with the status returned by the selected tool.
    if lsftdi:
        ctx.exit(System().lsftdi())
    if lsusb:
        ctx.exit(System().lsusb())
    if info:
        click.secho('Platform: ', nl=False)
        click.secho(get_systype(), fg='yellow')
    else:
        click.secho(ctx.get_help())
    ctx.exit(0)
def cli(ctx, list, dir, files, sayno, project_dir=None):
    """Manage verilog examples.\n
    Install with `apio install examples`"""
    # ``copy_example_files`` as defined in this file takes
    # (example, project_dir, sayno); the call used to omit ``project_dir``
    # and raised TypeError. It is now accepted as an optional keyword and
    # forwarded; ``None`` is passed through unchanged.
    # NOTE(review): ``copy_example_dir`` is not visible here - presumably it
    # needs the same extra argument; confirm its signature.
    exit_code = 0
    if list:
        exit_code = Examples().list_examples()
    elif dir:
        exit_code = Examples().copy_example_dir(dir, sayno)
    elif files:
        exit_code = Examples().copy_example_files(files, project_dir, sayno)
    else:
        click.secho(ctx.get_help())
        click.secho(Examples().examples_of_use_cad())
    ctx.exit(exit_code)
def cli(ctx, packages, all, force, platform, list=False):
    """Install packages."""
    # ``list`` used to resolve to the builtin type, which is always truthy,
    # so the help branch could never run. It is now a real (optional)
    # parameter.
    if packages:
        for package in packages:
            Installer(package, platform, force).install()
    elif all:  # pragma: no cover
        for package in Resources(platform).packages:
            if package == 'pio-fpga':  # skip pio-fpga
                continue
            # Same argument order as the explicit-package branch; ``force``
            # was previously passed in the ``platform`` position.
            Installer(package, platform, force).install()
    elif list:
        Resources(platform).list_packages(installed=True, notinstalled=True)
    else:
        click.secho(ctx.get_help())
def cli(ctx):
    """Check the latest Apio version."""
    # Exits with status 1 when the latest version could not be determined.
    installed = get_distribution('apio').version
    latest = get_pypi_latest_version()
    if latest is None:
        ctx.exit(1)
    if latest != installed:
        click.secho('You\'re not updated\nPlease execute '
                    '`pip install -U apio` to upgrade.',
                    fg="yellow")
        return
    click.secho('You\'re up-to-date!\nApio {} is currently the '
                'newest version available.'.format(latest),
                fg='green')
def _check_package(name, path=''):
is_dir = isdir(path)
if not is_dir:
click.secho(
'Error: {} toolchain is not installed'.format(name), fg='red')
if config_data: # /etc/apio.json file exists
if _check_apt_get():
click.secho('Please run:\n'
' apt-get install apio-{}'.format(name),
fg='yellow')
else:
click.secho('Please run:\n'
' apio install {}'.format(name), fg='yellow')
else:
click.secho('Please run:\n'
' apio install {}'.format(name), fg='yellow')
return is_dir
def get_pypi_latest_version():
    """Return the latest apio version published on PyPI, or ``None``.

    Failures (connection error, HTTP error status, unexpected payload) are
    printed in red and result in ``None``; nothing is raised.
    """
    r = None
    version = None
    try:
        r = requests.get('https://pypi.python.org/pypi/apio/json')
        # Check the status first so an error response never yields a
        # version taken from its body.
        r.raise_for_status()
        version = r.json()['info']['version']
    except requests.exceptions.ConnectionError:
        click.secho('Error: Could not connect to Pypi.\n'
                    'Check your internet connection and try again', fg='red')
    except Exception as e:
        click.secho('Error: ' + str(e), fg='red')
    finally:
        # A Response is falsy for 4xx/5xx statuses, so ``if r:`` left failed
        # responses unclosed; compare with None instead.
        if r is not None:
            r.close()
    return version
def check_dir(_dir):
    """Return a usable project directory, creating it when missing.

    ``None`` means the current working directory. If the path is an
    existing file, an error is printed and the process exits with status 1.
    Failure to create the directory is ignored (best effort).
    """
    target = os.getcwd() if _dir is None else _dir
    if isfile(target):
        click.secho(
            'Error: project directory is already a file: {0}'.format(target),
            fg='red')
        exit(1)
    if exists(target):
        return target
    try:
        os.makedirs(target)
    except OSError:
        pass
    return target
def synthesize(access_key, secret_key, output_file, voice_name, voice_language,
               codec, text):
    """Synthesize passed text and save it as an audio file"""
    try:
        # ``secret_key`` was accepted but never handed to the API client;
        # the voice-listing command in this file passes it as the second
        # positional argument, so do the same here.
        ivona_api = IvonaAPI(
            access_key, secret_key,
            voice_name=voice_name, language=voice_language, codec=codec,
        )
    except (ValueError, IvonaAPIException) as e:
        raise click.ClickException("Something went wrong: {}".format(repr(e)))
    with click.open_file(output_file, 'wb') as file:
        ivona_api.text_to_speech(text, file)
    click.secho(
        "File successfully saved as '{}'".format(output_file),
        fg='green',
    )
def list_voices(access_key, voice_gender, secret_key=None, voice_language=None):
    """List available Ivona voices"""
    # ``secret_key`` and ``voice_language`` were used below without being
    # bound (NameError). They are now optional keyword parameters, appended
    # so the existing positional order is unchanged.
    try:
        ivona_api = IvonaAPI(access_key, secret_key)
    except (ValueError, IvonaAPIException) as e:
        raise click.ClickException("Something went wrong: {}".format(repr(e)))
    click.echo("Listing available voices...")
    voices_list = ivona_api.get_available_voices(
        language=voice_language,
        gender=voice_gender,
    )
    # Group voices by language; groupby needs its input sorted by the same key.
    by_language = sorted(voices_list, key=lambda x: x['Language'])
    voices_dict = {
        language: list(group)
        for language, group in groupby(by_language, key=lambda x: x['Language'])
    }
    for ln, voices in voices_dict.items():
        click.echo("{}: {}".format(ln, ','.join(v['Name'] for v in voices)))
    click.secho("All done", fg='green')
def info(ctx, spec_file):
    """
    Describes the specification provided.
    """
    if ctx.obj.debug:
        click.secho(
            "Specification: " + click.format_filename(spec_file),
            fg="yellow"
        )
    spec = Specification(spec_file)
    if 'info' not in spec:
        click.secho(f"No info was found in {spec}.", fg="red")
        return
    details = spec['info']
    version = details['version']
    title = details['title']
    # A falsy license name is shown as the fallback text.
    spec_license = details['license']['name'] or 'UnkNown'
    banner = f"{title} - {version}. {spec_license} licensed"
    click.secho(banner, fg='green')
def combine_command(ctx,
fg="yellow"
)
spec = Specification(spec_file)
combined_spec_schema = spec.combine().schema()
# Todo: If -y,format as yaml
print(yaml.dump(combined_spec_schema))
def spec_command(ctx,
fg="yellow"
)
spec = Specification(spec_file)
# This is brittle as it assumes info fields are defined in the spec.
if 'info' in spec:
version = spec['info']['version']
title = spec['info']['title']
spec_license = spec['info']['license']['name'] or 'UnkNown'
banner = f"{title} - v{version}. {spec_license} licensed"
click.secho(banner, fg="red")
# Todo: Implement linting of a spec.
# Todo: implement validation of a spec
def update(taxids, conn, force_download, silent):
    """Update local UniProt database"""
    if not silent:
        click.secho("WARNING: Update is very time consuming and can take several "
                    "hours depending which organisms you are importing!", fg="yellow")
        if not taxids:
            click.echo("Please note that you can restrict import to organisms by "
                       "NCBI taxonomy IDs")
            click.echo("Example (human,mouse,rat):\n")
            click.secho("\tpyuniprot update --taxids 9606,10090,10116\n\n", fg="green")
    if taxids:
        # Keep only comma-separated entries that are plain integers
        # (optionally padded with spaces). The pattern is a raw string:
        # ``\d`` in a normal literal is an invalid escape sequence.
        taxids = [int(taxid.strip()) for taxid in taxids.strip().split(',') if re.search(r'^ *\d+ *$', taxid)]
    database.update(taxids=taxids, connection=conn, force_download=force_download, silent=silent)
def _read_conf(conf, format):
    """Parse the YAML configuration and fill in defaults.

    Args:
        conf: Configuration source handed to ``parse_yaml``.
        format: Stored in the result under ``'format'``.

    Returns:
        The parsed mapping with defaults applied, ``postbuild`` aliased to
        ``scripts``, ``manifest`` coerced to a set and the module-level
        ``version`` recorded. If parsing fails, an error is printed and the
        process exits with status 1.
    """
    try:
        conf = parse_yaml(conf)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so Ctrl-C and
        # SystemExit are not reported as a missing configuration.
        click.secho(
            'Missing configuration. Did you put a `beeper.yml` file?',
            blink=True,
            fg='red'
        )
        sys.exit(1)
    conf.setdefault('language', 'python')
    conf.setdefault('python', 'python')
    conf.setdefault('postinstall', [])
    conf.setdefault('postinstall_commands', '\n'.join(conf.get('postinstall')))
    conf.setdefault('manifest', set())
    # WORK_DIR, when set and non-empty, wins over the current directory.
    conf.setdefault('current_dir', os.environ.get('WORK_DIR') or os.getcwd())
    conf.setdefault('scripts', [])
    conf['postbuild'] = conf['scripts']
    conf['version'] = version
    conf['manifest'] = set(conf['manifest'])
    conf['format'] = format
    return conf
def prompt_overwrite(file_name):
    """Return True when ``file_name`` may be written.

    A path that does not exist is always allowed. Otherwise the user is
    asked for a single keypress and only ``y`` or ``Y`` grants permission.
    """
    if os.path.exists(file_name):
        fmt = "File ({}) already exists. Do you want to overwrite? (y/n): "
        click.secho(fmt.format(file_name), nl=False, fg="red")
        answer = click.getchar()
        # Finish the prompt line, which was printed without a newline.
        click.echo()
        return answer in ("y", "Y")
    return True
def cli(*args, **kwargs):
    """
    CSVtoTable commandline utility.
    """
    # Convert the CSV file.
    content = convert.convert(kwargs["input_file"], **kwargs)

    # Serving in the browser takes precedence over writing a file.
    if kwargs["serve"]:
        convert.serve(content)
        return

    target = kwargs["output_file"]
    if not target:
        # Neither serving nor an output file was requested.
        raise click.BadOptionUsage("Missing argument \"output_file\".")

    # Ask before replacing an existing file unless overwrite was requested.
    if not kwargs["overwrite"] and not prompt_overwrite(target):
        raise click.Abort()
    convert.save(target, content)
    click.secho("File converted successfully: {}".format(
        target), fg="green")
def history(config, num_pages, start, output, arena):
    """Get your game history"""
    # Fetches pages beginning at ``start`` until ``num_pages`` pages have
    # been read or the reported total page count is exceeded, then dumps
    # all collected games to ``output`` as JSON.
    _check_creds(config)
    fetch_page = config.trackobot.arena_history if arena else config.trackobot.history
    games = []
    fetched = 0
    while fetched != num_pages:
        config.logger.debug('Getting page %d of history', start)
        page_data = fetch_page(page=start)
        config.logger.debug('Extending games list')
        games.extend(page_data['history'])
        fetched += 1
        start += 1
        if start > page_data['Meta']['total_pages']:
            config.logger.info('Hit max pages on account')
            break
    config.logger.debug('Dumping game history to %s', output)
    with open(output, 'w') as f:
        json.dump(games, f)
    click.secho('Wrote {} games to {}'.format(len(games), output), fg='green')
def __init__(self, *args, **kwargs):
    """Reject command lines containing typographic quotes or dashes.

    If any ``sys.argv`` entry contains a curly quote or an em dash, an
    error listing those entries is written to stderr and the process exits
    with status -1. Otherwise initialisation is delegated to the parent
    class.
    """
    import sys

    unicodes = ["\u2018", "\u2019", "\u201C", "\u201D", "\u2014"]
    invalid = [command for command in sys.argv
               if any(x in command for x in unicodes)]
    if invalid:
        click.secho("Error: Detected invalid character in: %s\n"
                    "Verify the correct quotes or dashes (ASCII) are "
                    "being used." %
                    ','.join(invalid), err=True, bold=True)
        sys.exit(-1)

    super().__init__(*args, **kwargs)

    # Plugin state for current deployment that will be loaded from cache.
    # Used to construct the dynamic CLI.
    self._plugins = None
def validate(path, level):
    # Loads ``path`` as a QIIME 2 artifact and validates it at ``level``.
    # Load failures and unexpected validation failures are reported through
    # q2cli.util.exit_with_error. A ValidationError is reported the same
    # way, but with the debug output sent to os.devnull and no footer.
    import qiime2.sdk

    try:
        artifact = qiime2.sdk.Artifact.load(path)
    except Exception as load_error:
        header = 'There was a problem loading %s as a QIIME 2 Artifact:' % path
        q2cli.util.exit_with_error(load_error, header=header)

    try:
        artifact.validate(level)
    except qiime2.plugin.ValidationError as invalid:
        header = 'Artifact %s does not appear to be valid at level=%s:' % (
            path, level)
        with open(os.devnull, 'w') as sink:
            q2cli.util.exit_with_error(invalid, header=header, file=sink,
                                       suppress_footer=True)
    except Exception as unexpected:
        header = ('An unexpected error has occurred while attempting to '
                  'validate artifact %s:' % path)
        q2cli.util.exit_with_error(unexpected, header=header)
    else:
        click.secho('Artifact %s appears to be valid at level=%s.'
                    % (path, level), fg="green")
def _echo_citations():
    """Print citation information for the framework and each cached plugin.

    Plugins are read from ``q2cli.cache.CACHE.plugins`` and listed in
    sorted name order. When there are none, a pointer to the plugin
    directory is printed instead.
    """
    import q2cli.cache

    click.secho('\nCitations', fg='green')
    click.secho('QIIME 2 framework and command line interface', fg='cyan')
    click.secho('Pending a QIIME 2 publication,please cite QIIME using the '
                'original publication: '
                'http://www.ncbi.nlm.nih.gov/pubmed/20383131')

    plugins = q2cli.cache.CACHE.plugins
    if not plugins:
        click.secho('\nNo plugins are currently installed.\nYou can browse '
                    'the official QIIME 2 plugins at https://qiime2.org')
        return
    for name, plugin in sorted(plugins.items()):
        click.secho('\n%s %s' % (name, plugin['version']), fg='cyan')
        click.secho(plugin['citation_text'])
def exit_with_error(e, header='An error has been encountered:', file=None,
                    suppress_footer=False):
    """Report exception ``e`` and exit the current click context with status 1.

    The traceback is written to ``file`` (``sys.stderr`` when omitted). A
    summary made of ``header``, the indented exception text and, unless
    ``suppress_footer`` is set, a footer saying where the debug info went,
    is printed to stderr.
    """
    import sys
    import traceback
    import textwrap
    import click

    if file is None:
        file = sys.stderr
        footer = 'See above for debug info.'
    else:
        footer = 'Debug info has been saved to %s' % file.name

    segments = [header, textwrap.indent(str(e), ' ')]
    if not suppress_footer:
        segments.append(footer)

    # Full traceback first, then the human-readable summary.
    traceback.print_exception(type(e), e, e.__traceback__, file=file)
    file.write('\n')

    click.secho('\n\n'.join(segments), err=True)
    click.get_current_context().exit(1)
def init(directory: str, dsn: str):
    """
    Initializes a migrations directory.
    """
    # An already existing directory is reported but does not stop the setup.
    try:
        os.makedirs(directory)
    except FileExistsError:
        click.secho("Unable to make directory (it exists)!", fg='red')

    # The DSN is substituted into env.py as a quoted literal; None stays None.
    quoted_dsn = None if dsn is None else '"{}"'.format(dsn)
    root = Path(directory)

    click.secho("Writing env.py...", fg='cyan')
    (root / "env.py").write_text(env_file.format(dsn=quoted_dsn))

    click.secho("Making versions directory...", fg='cyan')
    (root / "versions").mkdir(mode=0o755)
    (root / "README").write_text("Basic asql-migrate setup.")

    click.secho("Done!", fg='green')
def new(message: str):
    """
    Creates a new migration file.
    """
    # ``message`` is joined with spaces, so it is treated as a sequence of
    # words here. The revision number is one more than the number of
    # existing migration files.
    files = _get_files()
    next_num = len(files) + 1

    # Build the filename part: first 32 characters, lower-cased, spaces
    # turned into underscores, everything but a-z and "_" dropped.
    full_message = ' '.join(message)
    slug = full_message[:32].lower().replace(" ", "_")
    allowed = string.ascii_lowercase + "_"
    filename_message = ''.join(ch for ch in slug if ch in allowed)
    f_name = "{:03d}_{}.py".format(next_num, filename_message)

    # format the template
    formatted_file = migration_template.format(revision=next_num, message=full_message)
    target = migrations_dir / "versions" / f_name
    target.write_text(formatted_file)
    click.secho("Created new migration file {}.".format(f_name))
def validate(text, file):
    """Validate JSON input using dependencies-schema"""
    # ``file`` wins over ``text`` when both are given. With neither, an
    # error is printed and the process exits with status 1. A validation
    # failure is announced and then re-raised.
    content = None
    if text:
        print('Validating text input...')
        content = text
    if file:
        print('Validating file input...')
        content = file.read()

    if content is None:
        click.secho('Please give either text input or a file path. See help for more details.', fg='red')
        exit(1)

    try:
        validate_json(content)
        click.secho('Valid JSON schema!', fg='green')
    except Exception as e:
        click.secho('Invalid JSON schema!', fg='red')
        raise e
def connect(config):
    """Connect to Cassandra and store the session in the module global.

    Args:
        config: Mapping read with ``.get`` for ``seeds``, ``port``
            (defaults to 9042), ``user`` and ``password``.

    Returns:
        The connected session, also assigned to the global ``session``.
        If connecting fails, an error is printed and the process exits
        with status 1.
    """
    global session
    # Credentials are only sent when a user is configured.
    auth_provider = None
    if config.get('user'):
        auth_provider = PlainTextAuthProvider(username=config.get('user'), password=config.get('password'))
    cluster = Cluster(
        contact_points=config.get('seeds'),
        port=(int(config.get('port')) if config.get('port') else 9042),
        auth_provider=auth_provider
    )
    try:
        session = cluster.connect()
    except Exception:
        # ``Exception`` rather than a bare ``except`` so Ctrl-C is not
        # reported as a connection problem. A non-zero status replaces the
        # former ``sys.exit()``, which signalled success to the shell.
        click.secho("Unable to connect to Cassandra", fg='red')
        sys.exit(1)
    return session
def create_migration_table(keyspace):
    """Create the ``shift_migrations`` table in ``keyspace`` if missing.

    Uses the module-level ``session``. Returns ``(True, None)`` on success
    and ``(False, exception)`` on failure; nothing is raised for a failed
    statement.
    """
    session.set_keyspace(keyspace)
    click.echo("Creating shift_migrations table... ", nl=False)
    statement = """
CREATE TABLE IF NOT EXISTS shift_migrations(
type text,
time timeuuid,
migration text,
hash text,
PRIMARY KEY (type,time)
)
WITH CLUSTERING ORDER BY(time DESC)
"""
    try:
        session.execute(statement)
        click.secho('OK', fg='green', bold=True)
        outcome = (True, None)
    except Exception as e:
        click.secho('ERROR', bold=True)
        outcome = (False, e)
    return outcome
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。