Python click 模块：confirm() 实例源码
我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 click.confirm()。
def delete(experiment, project):
    """Delete experiment.

    Asks for confirmation first. Exits with status 1 when the user declines
    or when the API call raises; prints a success message when the server
    answers with HTTP 204.
    """
    user, project_name = get_project_or_local(project)
    question = "Are you sure you want to delete experiment `{}`".format(experiment)
    if not click.confirm(question):
        click.echo('Exiting without deleting experiment.')
        sys.exit(1)

    try:
        response = polyaxonClients().experiment.delete_experiment(
            user, project_name, experiment)
    except (polyaxonHTTPError, polyaxonShouldExitError) as e:
        Printer.print_error('Could not delete experiment `{}`.'.format(experiment))
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    # Only a 204 response is reported; other status codes stay silent.
    if response.status_code == 204:
        Printer.print_success(
            "Experiment `{}` was deleted successfully".format(experiment))
def stop(experiment, project):
    """Stop experiment.

    Examples:

    ```
    polyaxon experiment stop 2
    ```
    """
    user, project_name = get_project_or_local(project)
    question = "Are you sure you want to stop experiment `{}`".format(experiment)
    if not click.confirm(question):
        click.echo('Exiting without stopping experiment.')
        # Declining to stop is reported as a normal (status 0) exit.
        sys.exit(0)

    try:
        polyaxonClients().experiment.stop(user, project_name, experiment)
    except (polyaxonHTTPError, polyaxonShouldExitError) as e:
        Printer.print_error('Could not stop experiment `{}`.'.format(experiment))
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    Printer.print_success("Experiment is being stopped.")
def delete(project):
    """Delete project.

    Asks for confirmation first. Exits with status 1 when the user declines
    or when the API call raises; prints a success message when the server
    answers with HTTP 204.
    """
    user, project_name = get_project_or_local(project)
    question = "Are you sure you want to delete project `{}/{}`".format(
        user, project_name)
    if not click.confirm(question):
        click.echo('Exiting without deleting project.')
        sys.exit(1)

    try:
        response = polyaxonClients().project.delete_project(user, project_name)
    except (polyaxonHTTPError, polyaxonShouldExitError) as e:
        Printer.print_error('Could not delete project `{}`.'.format(project))
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    # Only a 204 response is reported; other status codes stay silent.
    if response.status_code == 204:
        Printer.print_success(
            "Project `{}` was deleted successfully".format(project))
def build_end_callback(ctx, sub_command_returns, keep, report):
    """Close the generated script, run Vivado HLS and handle its exit status.

    ``keep`` is accepted but not used inside this function.

    Raises:
        click.Abort: if the user declines the default build, or if the
            ``vivado_hls`` process exits with a non-zero status.
    """
    # No subcommands were issued: offer the default build. abort=True makes
    # click raise Abort when the user answers "no".
    nothing_requested = not sub_command_returns
    if nothing_requested and click.confirm("No build stages specified,would you like to run a default sequence using all the build stages?", abort=True):
        do_default_build(ctx)

    script = ctx.obj.file
    script.write("exit" + "\n")
    script.close()

    # Call the Vivado HLS process and inspect its return status.
    status = subprocess.run(["vivado_hls", "-f", "run_hls.tcl"]).returncode
    if status == 0:
        do_end_build_stuff(ctx, report)
        return
    if status > 0:
        click.echo("Warning: HLS Process returned an error,skipping report opening!")
    raise click.Abort()
# csim subcommand
def init(banner, hidden, backup):
    """Initialize a manage shell in current directory
    $ manage init --banner="My awesome app shell"
    initializing manage...
    creating manage.yml
    """
    import copy

    manage_file = HIDDEN_MANAGE_FILE if hidden else MANAGE_FILE
    if os.path.exists(manage_file):
        if not click.confirm('Rewrite {0}?'.format(manage_file)):
            return
        if backup:
            bck = '.bck_{0}'.format(manage_file)
            with open(manage_file, 'r') as source, open(bck, 'w') as bck_file:
                bck_file.write(source.read())

    # Work on a private copy so that setting the banner does not leak into
    # the shared module-level default_manage_dict.
    data = copy.deepcopy(default_manage_dict)
    if banner:
        data['shell']['banner']['message'] = banner
    with open(manage_file, 'w') as output:
        output.write(yaml.dump(data, default_flow_style=False))
def run_add_system(name, token, org, system, prompt):
    """
    Add a new system label to the repo and optionally re-run the update.

    A GithubException with status 422 is reported as "already exists" and
    swallowed; any other GithubException is re-raised.
    """
    repo = get_repo(token=token, org=org, name=name)
    try:
        repo.create_label(name=system.strip(), color=SYstem_LABEL_COLOR)
        click.secho("Successfully added new system {}".format(system), fg="green")
        wants_update = prompt and click.confirm("Run update to re-generate the page?")
        if wants_update:
            run_update(name=name, token=token, org=org)
    except GithubException as e:
        if e.status != 422:
            raise
        click.secho(
            "Unable to add new system {},it already exists.".format(system), fg="yellow")
        return
def call(self, args, devnull=False):
    """Call other processes.

    args - list of command args
    devnull - whether to pipe stdout to /dev/null (or equivalent)

    Returns the exit status of the process. In debug mode the command line
    is echoed and the user must confirm before it runs (declining aborts).
    """
    if self.debug:
        click.echo(subprocess.list2cmdline(args))
        click.confirm('Continue?', default=True, abort=True)

    kwargs = {}
    if devnull:
        # Pipe to /dev/null (or equivalent); stderr follows stdout.
        kwargs['stderr'] = subprocess.STDOUT
        kwargs['stdout'] = self.FNULL
    # subprocess.call() reports failure through its return code and never
    # raises CalledProcessError, so the former except branch was unreachable.
    return subprocess.call(args, **kwargs)
def undeploy(ctx):
    """Removes OpenShift from the cluster."""
    print("Preparing to remove OpenShift...")
    checks_passed = ctx.init_with_checks()
    if not checks_passed:
        print("Failed cursory checks,exiting.")
        exit(1)

    if not ctx.auto_confirm:
        print("Really consider the decision you're about to make.")
        confirmed = click.confirm("Do you want to clear the existing installation?")
        if not confirmed:
            print("Okay,cancelling.")
            exit(1)
    else:
        print("Auto confirm (-y) option set,clearing existing installation.")

    ctx.delete_namespace_byname("openshift-origin")
    for line in (
        "Note: namespaces with openshift finalizer will need to be manually deleted if desired.",
        "See: https://github.com/paralin/openshift-under-kubernetes/blob/master/REMOVING_OPENSHIFT.md",
    ):
        print(line)
def do_tag(config, force, src, requirement, yes):
    """Tag the project with ``config.version`` and push the tag to origin.

    Refuses to tag (ClickException) while there are unstaged changes, staged
    changes or untracked files. ``src`` and ``requirement`` are accepted but
    not used here.

    Args:
        config: object exposing the ``version`` used as tag name.
        force: when true, pass ``-f`` to ``git tag`` and ``git push``.
        yes: skip the confirmation prompt.
    """
    tag = config.version
    if not yes:
        click.confirm('Tag project with {}?'.format(tag), abort=True)
    force_cmd = ['-f'] if force else []

    # Unstaged changes in the working tree.
    if call(['git', 'diff', '--exit-code']) != 0:
        raise click.ClickException("Please commit first.")
    # Staged but uncommitted changes. The 'diff' subcommand was missing,
    # which made git reject the command line and this check always fail.
    if call(['git', 'diff', '--exit-code', '--cached']) != 0:
        raise click.ClickException("Please commit first.")
    # Untracked files.
    out = check_output(['git', 'ls-files', '--other', '--exclude-standard',
                        '--directory'])
    if out:
        click.echo(out)
        raise click.ClickException("Please commit first.")

    do_tag_requirements(config, yes=True)
    check_call(['git', 'tag'] + force_cmd + [tag])
    check_call(['git', 'push', '-q'] + force_cmd + ['origin', 'tag', tag])
def handle_unexpected_errors(f):
    """Decorator that catches unexpected errors.

    The wrapped call's traceback is echoed and the user is offered to open
    a pre-filled issue in the browser. With abort=True a "no" answer raises
    click.Abort; otherwise the wrapper returns None after handling.
    """
    @wraps(f)
    def call_f(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception:
            buf = StringIO()
            print_exc(file=buf)
            traceback_text = buf.getvalue()
            click.echo(
                "Looks like there's a bug in our code. Sorry about that!"
                " Here's the traceback:\n" + traceback_text)
            wants_issue = click.confirm(
                "Would you like to file an issue in our issue tracker?",
                default=True, abort=True)
            if wants_issue:
                report = BUG_REPORT_TEMPLATE.format(
                    os.getcwd(), __version__, python_version,
                    run_result(["uname", "-a"]), traceback_text)
                webbrowser.open_new(
                    "https://github.com/datawire/pib/issues/new?body="
                    + quote_plus(report))
    return call_f
def send(ctx, address, amount, denomination, use_unconfirmed, verbose):
    """Send a specified address some satoshis.
\b
Usage
-----
Send 5000 satoshi from your on-chain balance to the Apache Foundation.
$ 21 send 1BtjAzWGLyAavUkbw3QsyzzNDKdtPXk95D 5000 satoshis
You can use the following denominations: satoshis,bitcoins,and USD.
By default,this command uses only confirmed transactions and
UTXOs to send coins. To use unconfirmed transactions,use the
--use-unconfirmed flag.
"""
    # No denomination given: ask whether satoshis may be assumed.
    if denomination == '':
        if not click.confirm(uxstring.UxString.default_price_denomination, default=True):
            raise exceptions.Two1Error(uxstring.UxString.cancel_command)
        denomination = currency.Price.SAT

    satoshis = currency.Price(amount, denomination).satoshis
    wallet = ctx.obj['wallet']
    return _send(wallet, satoshis, verbose, use_unconfirmed)
def convert_amount_to_satoshis_with_prompt(amount, denomination):
    """Convert an amount with a denomination to satoshis.

    Prompts the user when no denomination is specified.

    Args:
        amount (float): the amount to convert; an amount equal to 0.0
            yields None without prompting.
        denomination (str): denomination of ``amount``; the empty string
            triggers the confirmation prompt and falls back to satoshis.

    Returns (int): the amount converted to satoshis, or None.

    Raises:
        exceptions.Two1Error: if the user rejects the default denomination.
    """
    if amount == 0.0:
        return None

    if denomination == '':
        accepted = click.confirm(uxstring.UxString.default_price_denomination, default=True)
        if not accepted:
            raise exceptions.Two1Error(uxstring.UxString.cancel_command)
        denomination = Price.SAT
    return Price(amount, denomination).satoshis
def build_id2word(self, fname=None, save_to=None):
    """Build (or reuse) the id2word dictionary and load it.

    The words file comes from ``fname``, ``self.words_fname`` or a prompt,
    in that order. The dictionary file is ``save_to`` when given, otherwise
    the words file with its extension changed to 'id2word'. An existing
    dictionary file is only rebuilt if the user confirms.

    Returns the loaded dictionary, also stored on ``self.id2word``.
    """
    # read words.csv file
    if not fname:
        fname = self.words_fname or click.prompt('words file')
    fname = self.__dest(fname)
    assert os.path.isfile(fname), 'No such file: %s' % fname

    self.id2word_fname = (self.__dest(save_to) if save_to
                          else LdaUtils.change_ext(fname, 'id2word'))

    # if there is no id2word file or the user wants to rebuild, build .id2word
    if not os.path.isfile(self.id2word_fname) or click.confirm('There alread is id2word. Do you want to rebuild?'):
        print('start building id2word')
        started_at = time()
        words = LdaUtils.filter_words(LdaUtils.iter_csv(fname, -1).split())
        corpora.Dictionary(words).save(self.id2word_fname)  # save
        print('building id2word takes: %s' % LdaUtils.human_readable_time(time() - started_at))

    self.id2word = corpora.Dictionary.load(self.id2word_fname)
    return self.id2word
def build_corpus(self, save_to=None, fname=None):
    """Build (or reuse) the serialized corpus and load it.

    Args:
        save_to: destination of the corpus file; defaults to the sentences
            file with its extension changed to 'corpus'.
        fname: sentences file; prompted for when omitted. It used to be
            read without ever being defined (UnboundLocalError), so it is
            now an optional trailing parameter to stay compatible with
            existing ``build_corpus(save_to)`` calls.

    Returns the loaded corpus, also stored on ``self.corpus``.
    """
    # read sentences file
    if not fname:
        fname = click.prompt('sentences file')
    fname = self.__dest(fname)
    assert os.path.isfile(fname), 'No such file: %s' % fname

    if save_to:
        self.corpus_fname = self.__dest(save_to)
    else:
        self.corpus_fname = LdaUtils.change_ext(fname, 'corpus')

    # if there is no corpus file or the user wants to rebuild, build .corpus
    if not os.path.isfile(self.corpus_fname) or click.confirm('There already is corpus. Do you want to rebuild?'):
        print('start building corpus')
        start = time()
        corpora.MmCorpus.serialize(
            self.corpus_fname,
            self.__iter_doc2bow(LdaUtils.iter_csv(fname, -1).split()))  # save
        print('building corpus takes: %s' % LdaUtils.human_readable_time(time() - start))

    self.corpus = corpora.MmCorpus(self.corpus_fname)
    return self.corpus
def build_model(self, save_to=None, fname=None):
    """Train (or reuse) the LDA model and load it.

    Args:
        save_to: accepted for interface compatibility; not used here.
        fname: model file name; prompted for (default 'model.lda') when
            omitted. It used to be read without ever being defined
            (UnboundLocalError), so it is now an optional trailing
            parameter.

    Returns the loaded model, also stored on ``self.model``.
    """
    id2word = self.id2word or self.build_id2word()
    corpus = self.corpus or self.build_corpus()

    # read model.lda file
    if not fname:
        fname = click.prompt('model file name', type=str, default='model.lda')
    fname = self.__dest(fname)

    # if there is no model file or the user wants to rebuild, build .model
    if not os.path.isfile(fname) or click.confirm('There already is %s. Do you want to re run lda?' % fname):
        num_procs = click.prompt('Number of processes to launch',
                                 type=int,
                                 default=multiprocessing.cpu_count())
        num_epochs = click.prompt('Number of epochs to run', type=int, default=20)
        num_topics = click.prompt('Number of topics', default=100)
        print('start building model')
        start = time()
        model = Ldamulticore(corpus, id2word=id2word, num_topics=num_topics,
                             workers=num_procs, passes=num_epochs)
        model.save(fname)  # save
        print('building model takes: %s' % LdaUtils.human_readable_time(time() - start))

    self.model = Ldamulticore.load(fname)
    return self.model
def generate_resources(resources, path):
    """Copy bundled resource files to ``path``.

    With several resources ``path`` must be (or becomes) a directory; the
    process exits if it exists as something else. Existing target files are
    only overwritten after confirmation.
    """
    several = len(resources) > 1
    if several and not os.path.isdir(path):
        if os.path.exists(path):
            sys.exit('%s needs to be a directory to create multiple '
                     'resource files' % click.format_filename(path))
        os.mkdir(path)

    for resource in resources:
        into_directory = several or os.path.isdir(path)
        resource_path = '%s/%s' % (path, resource) if into_directory else path
        if os.path.exists(resource_path):
            overwrite = click.confirm(
                '%s already exists. Do you want to overwrite it?' %
                click.format_filename(resource_path))
            if not overwrite:
                continue
        source_file = '%s/resources/%s' % (paths.DATA, resource)
        shutil.copy(source_file, path)
        click.echo('Created %s at path %s' % (resource,
                                              click.format_filename(path)))
def config_examples(dest, user_dir):
    """ copy the example workflows to a directory.
\b
DEST: Path to which the examples should be copied.
"""
    examples_path = Path(lightflow.__file__).parents[1] / 'examples'
    if not examples_path.exists():
        click.echo('The examples source path does not exist')
        return

    dest_path = Path(dest).resolve()
    if not user_dir:
        dest_path = dest_path / 'examples'

    if not dest_path.exists():
        dest_path.mkdir()
    elif not click.confirm('Directory already exists. Overwrite existing files?',
                           default=True, abort=True):
        # abort=True already raises on "no"; kept as an explicit guard.
        return

    for example_file in examples_path.glob('*.py'):
        target = dest_path / example_file.name
        shutil.copy(str(example_file), str(target))
    click.echo('copied examples to {}'.format(str(dest_path)))
def follow(ctx, nick, url, force):
    """Add a new source to your followings."""
    source = Source(nick, url)
    sources = ctx.obj['conf'].following

    # Unless forced, confirm before overwriting a known nick and before
    # following a feed that cannot be fetched. abort=True aborts on "no".
    if not force:
        if source.nick in (known.nick for known in sources):
            click.confirm("? You’re already following {0}. Overwrite?".format(
                click.style(source.nick, bold=True)), default=False, abort=True)

        _, status = get_remote_status([source])[0]
        if not status or status.status_code != 200:
            # NOTE(review): this call was truncated in the source; the
            # arguments mirror the confirmation above — confirm upstream.
            click.confirm("? The Feed of {0} at {1} is not available. Follow anyway?".format(
                click.style(source.nick, bold=True),
                click.style(source.url, bold=True)), default=False, abort=True)

    ctx.obj['conf'].add_source(source)
    click.echo("? You’re Now following {0}.".format(
        click.style(source.nick, bold=True)))
def validate_text(ctx, param, value):
    """Normalize and validate the text argument.

    A tuple value is joined with spaces; when no text is given and stdin is
    not a TTY, the text is read from stdin. The text is stripped before the
    emptiness check, so whitespace-only input is rejected too.

    Returns the stripped text.

    Raises:
        click.BadArgumentUsage: if the text is empty after stripping.
        click.Abort: if the user rejects the length warning.
    """
    conf = click.get_current_context().obj["conf"]
    if isinstance(value, tuple):
        value = " ".join(value)
    if not value and not sys.stdin.isatty():
        value = click.get_text_stream("stdin").read()

    if value:
        value = value.strip()
    if not value:
        raise click.BadArgumentUsage("Text can’t be empty.")

    if conf.character_warning and len(value) > conf.character_warning:
        click.confirm("? Warning: Tweet is longer than {0} characters. Are you sure?".format(
            conf.character_warning), abort=True)
    return value
def remove(inventory_name):
    """
    Remove inventories.
    """
    name_ok = inventory_name.isalnum()
    if not name_ok:
        fatal("Your inventory name should only contains alphanumeric "
              "characters.")

    dest_path = os.path.join(inventory_path, inventory_name)
    if not os.path.exists(dest_path):
        fatal("The %s inventory doesn't exist." % inventory_name)

    confirmed = click.confirm(
        "Do you want to delete the %s inventory?" % inventory_name,
        default=False)
    if not confirmed:
        info("Aborted. nothing has been done.")
        return

    # A real directory is removed recursively; files and symlinks are unlinked.
    is_real_dir = os.path.isdir(dest_path) and not os.path.islink(dest_path)
    if is_real_dir:
        shutil.rmtree(dest_path)
    else:
        os.remove(dest_path)
    success("The %s inventory has been removed." % inventory_name)
def remove(organization_name):
    """
    Remove organizations.
    """
    name_ok = organization_name.isalnum()
    if not name_ok:
        fatal("Your organization name should only contains alphanumeric "
              "characters.")

    dest_path = get_env_path(organization_name)
    if not os.path.exists(dest_path):
        fatal("The %s organization doesn't exist." % organization_name)

    confirmed = click.confirm(
        "Do you want to delete the %s organization?" % organization_name,
        default=False)
    if not confirmed:
        info("Aborted. nothing has been done.")
        return

    # A real directory is removed recursively; files and symlinks are unlinked.
    is_real_dir = os.path.isdir(dest_path) and not os.path.islink(dest_path)
    if is_real_dir:
        shutil.rmtree(dest_path)
    else:
        os.remove(dest_path)
    success("The %s organization has been removed." % organization_name)
def _setup_github():
    """Interactively enable and configure the Github integration.

    Does nothing when the integration is explicitly disabled or when both
    a token and the organizations are already configured. Otherwise the
    user is asked whether to enable it, and the setup is attempted up to
    three times; the process exits with status 1 if every attempt raises
    GitHubError.
    """
    if (config.has('github', 'enabled') and
            config.get('github', 'enabled') == 'false'):
        return
    if (config.has('github', 'token') and
            config.has('github', 'organizations')):
        return

    if not click.confirm('Do you wish to enable Github integration?',
                         default=True):
        config.set('github', 'enabled', 'false')
        return

    # The option name was missing here; it mirrors the 'false' case above.
    config.set('github', 'enabled', 'true')

    for _ in range(3):
        try:
            _try_setup_github()
            break
        except GitHubError:
            pass
    else:
        # No attempt succeeded.
        sys.exit(1)
def _ask_project_details(git_root, project):
    """Collect the project settings from the user and store them on ``project``.

    A project that is not initialized yet gets name, organization, id and
    base box set; an initialized one is only re-asked for its services
    after the user confirms. The services are stored in both cases.
    """
    if project.initialized():
        wants_change = click.confirm('There is already a project configured in this '
                                     'folder,do you want to modify it\'s config?')
        if not wants_change:
            fatal('aborting')
        services = _ask_services(project.organization(), project.services())
    else:
        app_name, organization, app_id = _ask_general_info(git_root)
        services = _ask_services(organization)
        box_name, box_url = _ask_baseBox()
        project.set_name(app_name)
        project.set_organization(organization)
        project.set_id(app_id)
        project.add_Box({'baseBox': box_name}, box_url)

    project.set_services(services)
def upload(ctx, yes, local_path, remote_path):
    """Upload files to remote host.
    When uploading a single file or directory,the remote path can be
    either the full path to upload data into or the path to an existing
    directory where the data should be placed. In the latter case,the
    base file name from the local path will be used as the remote name.
    When uploading multiple files,the remote path must refer to an
    existing directory.
    Local path Could be glob.
    """
    ask_confirm, to_upload = get_files_to_upload(ctx, local_path)

    # Only prompt when confirmation is needed and --yes was not given.
    needs_prompt = ask_confirm and not yes
    if needs_prompt and not click.confirm("Some files cannot be uploaded. Proceed?"):
        ctx.exit(0)

    transfer = upload_files(ctx, remote_path, sorted(to_upload))
    ctx.obj["event_loop"].run_until_complete(transfer)
def init(path, github_client_id, github_client_secret):
    """Write a fresh ``zeus.config.py`` into ``path``.

    The directory is created when missing. An existing configuration file
    is only overwritten after confirmation (declining aborts).
    """
    path = os.path.expanduser(path)
    if not os.path.exists(path):
        os.makedirs(path)

    config_path = os.path.join(path, 'zeus.config.py')
    if os.path.exists(config_path):
        click.confirm(
            'Configuration file already present at [{}]. Overwrite it?'.format(
                config_path),
            abort=True
        )

    rendered = CONfig.format(
        secret_key=repr(binascii.hexlify(os.urandom(24))),
        github_client_id=repr(github_client_id),
        github_client_secret=repr(github_client_secret),
        workspace_root=repr(path),
    )
    with open(config_path, 'wb') as fp:
        fp.write(rendered.encode('utf-8'))
    click.echo('Configuration written at {}'.format(config_path))
def create_database():
    """Delete then create all the database tables."""
    confirmed = click.confirm('Are you sure?')
    if not confirmed:
        click.secho('Aborted', fg='red')
        return

    # Drop first, then recreate.
    click.echo('Dropping everything')
    db.drop_all()
    click.echo('Creating tables')
    db.create_all()
    click.secho('Done', fg='green')
def clone(force):
    """Clone a copy of the TigerHost project to
    a private location and set the project path
    to the cloned location.
    """
    path = default_project_path()
    if os.path.exists(path):
        # Removing an existing path needs confirmation unless forced.
        if not force:
            click.confirm('Path {} already exists. Continuing will remove this path.'.format(
                path), abort=True)
        remover = shutil.rmtree if os.path.isdir(path) else os.remove
        remover(path)

    click.echo('cloning to {}...'.format(path), nl=False)
    clone_project()
    click.secho('Done', fg='black', bg='green')
    save_project_path(path)
def copy_config(source_path):
    """Copy a config file into the configured location.

    A relative ``source_path`` is resolved against the current working
    directory. Nothing happens when source and destination are the same
    path; an existing destination is only replaced after confirmation.
    """
    if not os.path.isabs(source_path):
        source_path = os.path.join(os.getcwd(), source_path)
    destination_path = os.path.join(config_path, config_filename)

    if not (source_path and source_path != destination_path):
        return
    if not os.path.exists(source_path):
        output.abort("Input file %s does not exist." % source_path)
        return

    if not os.path.exists(destination_path):
        common.mkdir(config_path)
        copyfile(source_path, destination_path)
        return

    output.warning("Destination file %s already exists." % destination_path)
    if click.confirm('Do you want to overwrite it?'):
        os.remove(destination_path)
        copyfile(source_path, destination_path)
    else:
        output.abort("To run osxstrap without copying config,use the osxstrap command.")
def sample_context(ctx, app, sample_context, config):
    """Store a JSON sample context for an application.

    The file named by ``sample_context`` must contain valid JSON (otherwise
    ``ctx.fail`` is called). After confirmation (declining aborts) the raw
    text is written to the application's row using the database described
    by the YAML file ``config``.
    """
    with open(sample_context) as fd:
        sample_ctx = fd.read()
        # Validation only; the parsed result is discarded.
        try:
            ujson.loads(sample_ctx)
        except ValueError as e:
            ctx.fail('Invalid JSON,%s: %s' % (str(e), sample_ctx))

    with open(config, 'r') as config_file:
        config = yaml.safe_load(config_file)

    click.echo('Setting sample contenxt for %s to:\n%s' % (app, sample_ctx))
    click.confirm('Do you want to continue?', abort=True)

    statement = 'UPDATE `application` SET `sample_context`=%s WHERE `name`=%s;'
    with db_from_config(config) as (conn, cursor):
        cursor.execute(statement, (sample_ctx, app))
        conn.commit()
    click.echo(click.style('All done!', fg='green'))
def delete(ctx, name):
    """Delete a cluster after confirmation.

    Deletes every service in the cluster's kubectl context, removes the
    forwarding rules whose IP address matches the jupyter or scheduler
    service, and finally deletes the container cluster itself.
    """
    if not click.confirm('Delete cluster {}?'.format(name)):
        return

    conf = load_config(name)
    configured_zone = conf['cluster']['zone']
    # Keep only the first two dash-separated parts of the configured zone.
    zone = '-'.join(configured_zone.split('-')[:2])
    context = get_context_from_settings(name)
    jupyter, _jport, _jlport, scheduler, _sport, _bport = services_in_context(context)

    cmd = "kubectl delete services --all --context {0}".format(context)
    logger.info(cmd)
    call(cmd)

    cmd = 'gcloud compute forwarding-rules list --format json'
    logger.info(cmd)
    rules = json.loads(check_output(cmd))
    for rule in rules:
        if rule["IPAddress"] not in [jupyter, scheduler]:
            continue
        assert ('jupyter-notebook' in rule['description'] or
                'dask-scheduler' in rule['description'])
        cmd = ('gcloud compute forwarding-rules delete ' +
               rule['name'] + ' --region ' + zone)
        logger.info(cmd)
        call(cmd)

    call("gcloud container clusters delete {0}".format(name))
def stop(id):
    """
    Stop a run before it can finish.
    """
    # Try the normalized job name first; fall back to the raw id.
    try:
        experiment = ExperimentClient().get(normalize_job_name(id))
    except FloydException:
        experiment = ExperimentClient().get(id)

    stoppable = experiment.state in ["queued", "running"]
    if not stoppable:
        floyd_logger.info("Job in {} state cannot be stopped".format(experiment.state))
        return

    submitted = ExperimentClient().stop(experiment.id)
    if not submitted:
        floyd_logger.error("Failed to stop job")
    else:
        floyd_logger.info("Experiment shutdown request submitted. Check status to confirm shutdown")
def input_createntials(config):
    """
    Read user input and check if credentials are valid.

    Loops until a Jira client can be created from the entered credentials;
    after a failure the user is asked whether to retry (declining aborts).
    Returns the config holding the accepted credentials.
    """
    while True:
        try:
            config = IO.input_jira_credentials(config.url, config.username, config.password)
            BaseFactory.create_jira(config)
        except Exception:
            IO.error('Credentials not valid')
            retry = click.confirm('Try again?', abort=True)
            if not retry:
                return config
        else:
            return config
def create_pub_issue(self, sk_issue):
    """
    Migrate SK issue to PUB Jira.

    The converted fields are printed and must be confirmed by the user.

    :param sk_issue: issue to migrate
    :return: the result of ``create_issue``, or None when the user declines
    """
    fields = self.convert_fields(sk_issue)
    click.echo("\nPlease confirm migration:")
    io.print_dict(fields, indent=1)

    approved = click.confirm('\nMigrate?', default=False)
    if approved:
        return self._pub_jira.create_issue(fields=fields)
    return None
def list_todo(name):
    """Print the items of the category called ``name`` and offer to sync.

    Clears the terminal, prints the title, lists the items of every
    category row whose second column equals ``name``, then asks whether to
    upload to Firebase.
    """
    os.system('clear')
    todo_title()

    categories = select([Category]).execute()
    for row in categories:
        if row[1] != name:
            continue
        items = session.query(Items).filter(Items.category_id==row[0]).all()
        click.secho(name, fg='cyan', bold=True)
        for entry in items:
            click.secho('>>>' + entry.items, fg='white', bold=True)

    if click.confirm('Do you want to sync?'):
        Firebase().upload_firebase()
def __init__(self, load=True):
    """Parse settings and, when ``load`` is true, drive the whole workflow.

    The inventory file is (re)created when it is missing or requested, the
    OCP variables are updated when requested or when the load balancer
    config contains "load_balancer_hostname:"; each step asks for
    confirmation unless ``self.no_confirm`` is set. The environment is
    launched if the inventory file exists afterwards.
    """
    if not load:
        return

    self._parse_cli_args()
    self._read_ini_settings()
    self._check_ocp_vars()

    inventory_needed = (not os.path.exists(self.inventory_file)
                        or self.args.create_inventory)
    if inventory_needed:
        # Short-circuit: the prompt is only shown without no_confirm.
        if self.no_confirm or click.confirm('Overwrite the existing inventory file?'):
            self._create_inventory_file()

    vars_needed = (self.args.create_ocp_vars
                   or "load_balancer_hostname:" in self.lb_config)
    if vars_needed:
        if self.no_confirm or click.confirm('Update the OCP install variables?'):
            self._create_ocp_vars()

    if os.path.exists(self.inventory_file):
        self._launch_refarch_env()
def __init__(self, load=True):
    """Parse settings and, when ``load`` is true, drive the whole workflow.

    The inventory file is (re)created when it is missing or requested, the
    OCP variables are updated when requested or when the load balancer
    config contains "load_balancer_hostname:"; each step asks for
    confirmation unless ``self.no_confirm`` is set. The environment is
    launched if the inventory file exists afterwards.
    """
    if not load:
        return

    self._parse_cli_args()
    self._read_ini_settings()
    self._check_ocp_vars()

    inventory_needed = (not os.path.exists(self.inventory_file)
                        or self.args.create_inventory)
    if inventory_needed:
        # Short-circuit: the prompt is only shown without no_confirm.
        if self.no_confirm or click.confirm('Overwrite the existing inventory file?'):
            self._create_inventory_file()

    vars_needed = (self.args.create_ocp_vars
                   or "load_balancer_hostname:" in self.lb_config)
    if vars_needed:
        if self.no_confirm or click.confirm('Update the OCP install variables?'):
            self._create_ocp_vars()

    if os.path.exists(self.inventory_file):
        self._launch_refarch_env()
def _confirm_and_delete(base_dir, unkNown_jobs, confirm, jenkins_url):
    """List the server-only jobs and delete them, optionally after a prompt.

    When ``confirm`` is false the jobs are deleted without asking. The
    Jenkins URL returned by each authenticated call is reused for the next.
    """
    utils.sechowrap('The following jobs are present on the server but not in '
                    'the local repository:', fg='yellow')
    click.secho(' ' + '\n '.join(sorted(unkNown_jobs)), fg='yellow')

    proceed = True
    if confirm:
        utils.sechowrap('')
        warning = click.wrap_text(
            'Do you really want to delete these jobs? THIS CANNOT BE '
            'RECOVERED!'
        )
        proceed = click.confirm(click.style(warning, fg='red', bold=True))
    if not proceed:
        return

    for job in unkNown_jobs:
        _, jenkins_url = jenkins_api.handle_auth(base_dir,
                                                 jenkins_api.delete_job,
                                                 jenkins_url,
                                                 job)
def create_first_user(app, created_models, verbosity, db, **kwargs):
    """Offer to create the first user right after the User model is created.

    Returns early when User was not just created, when the router forbids
    syncing User on ``db``, or when running non-interactively.
    """
    if User not in created_models:
        return
    if not router.allow_syncdb(db, User):
        return
    interactive = kwargs.get('interactive', True)
    if not interactive:
        return

    import click
    wants_user = click.confirm('\nWould you like to create a user account Now?', default=True)
    if not wants_user:
        # Not using `abort=1` because we don't want to exit out from further execution
        click.echo('\nRun `sentry createuser` to do this later.\n')
        return

    from sentry.runner import call_command
    call_command('sentry.runner.commands.createuser.createuser')
def deploy_app(ctx, tag, yes, force=False):
    """ trigger a deployment """
    # `force` was used below without being defined anywhere in this
    # function; it is now an optional parameter defaulting to False.
    ctx.set_git_hash(git_hash=tag)
    if not yes:
        status.echo(ctx)
        click.confirm("Continue?", abort=True, default=True)

    app = App(ctx.name, ctx.docker, ctx.tag)
    if not ecr.image_exists(app.image):
        click.echo("Can't find a docker for {}\naborting..".format(app.image))
        return

    click.echo("deploying {}".format(ctx.tag))
    deployments = deploy.execute(app, force)
    deploy.wait_for(deployments)
def config_envdir(ctx, target_dir):
    """Write the environment's secret variables into ``target_dir``.

    Exits with status 1 when the user declines and with status 2 when the
    target exists but is not a directory; a missing directory is created.
    """
    cfg = Environment(ctx.name).secret
    target_dir = os.path.abspath(target_dir)

    go_ahead = click.confirm("found {} vars,will write to `{}/*`".format(len(cfg), target_dir))
    if not go_ahead:
        click.echo("Exiting..")
        sys.exit(1)

    if not os.path.exists(target_dir):
        click.echo("{} did not exists,creating".format(target_dir))
        os.mkdir(target_dir)
    elif os.path.exists(target_dir) and not os.path.isdir(target_dir):
        click.echo("{} exists but isn't a directory,unable to continue".format(target_dir))
        sys.exit(2)

    config.write_envdir(cfg, target_dir)
def create_config():
    '''Create the config file with default close statuses
    and an empty github auth_token'''
    CONfig['close_status'] = {'Invalid': ['invalid', 'wontfix', 'worksforme'],
                              'Insufficient data': ['insufficient_info'],
                              'Duplicate': ['duplicate']}
    CONfig['github'] = {'auth_token': ''}

    # An existing file is only replaced after confirmation; otherwise exit 1.
    if os.path.exists(CFG_PATH):
        overwrite = click.confirm('You already have a config file,if you continue '
                                  'your custom settings will be lost')
        if not overwrite:
            sys.exit(1)

    with click.open_file(CFG_PATH, 'w+') as config_file:
        CONfig.write(config_file)
def yaml_wizard(directory):
    """Interactively build ``valohai.yaml`` inside ``directory``.

    Command and image are chosen, a preview is printed, and the cycle
    repeats until the user agrees to write the file.
    """
    while True:
        command = choose_command(directory)
        image = choose_image()
        yaml = YAML_SKELLINGTON.format(
            image=image,
            command=command,
        )
        click.secho('Here\'s a preview of the Valohai.yaml file I\'m going to create.', fg='cyan')
        print(yaml)
        yaml_path = os.path.join(directory, 'valohai.yaml')
        accepted = click.confirm('Write this to {path}?'.format(path=click.style(yaml_path, bold=True)))
        if accepted:
            with codecs.open(yaml_path, 'w', 'UTF-8') as out_fp:
                out_fp.write(yaml)
            success('All done! Wrote {path}.'.format(path=yaml_path))
            return
        click.echo('Okay,let\'s try again...')  # pragma: no cover
def choose_image():
    """Ask for a Docker image until the user confirms the choice.

    A suggestion picked from the list arrives as a dict and is reduced to
    its 'name'. Returns the confirmed image name.
    """
    click.echo(
        'Now let\'s pick a Docker image to use with your code.\n'
        'Here are some recommended choices,but feel free to type in one of your own.'
    )
    confirmed = False
    while not confirmed:
        image = prompt_from_list(
            IMAGE_SUGGESTIONS,
            'Choose a number or enter a Docker image name.',
            nonlist_validator=lambda s: s.strip()
        )
        if isinstance(image, dict):
            image = image['name']
        confirmed = click.confirm('Is {image} correct?'.format(image=click.style(image, bold=True)))
    success('Great! Using {image}.'.format(image=image))
    return image
def unlink(yes):
    """
    Unlink a linked Valohai project.
    """
    dir = get_project_directory()
    project = get_project()
    if not project:
        click.echo('{dir} or its parents do not seem linked to a project.'.format(dir=dir))
        return 1

    if not yes:
        # NOTE(review): the click.style() calls were truncated in the
        # source; bold=True is restored from the intact call further down.
        click.confirm(
            'Unlink {dir} from {name}?'.format(
                dir=click.style(project.directory, bold=True),
                name=click.style(project.name, bold=True),
            ),
            abort=True,
        )

    links = settings.get('links', {})
    links.pop(dir)
    settings['links'] = links
    settings.save()
    success('Unlinked {dir} from {name}.'.format(
        dir=click.style(dir, bold=True),
        name=click.style(project.name, bold=True)
    ))
def check_cli_version():
    """Check if the current cli version satisfies the server requirements"""
    try:
        server_version = polyaxonClients().version.get_cli_version()
    except (polyaxonHTTPError, polyaxonShouldExitError) as e:
        Printer.print_error('Could not get cli version.')
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    current_version = pkg_resources.get_distribution(PROJECT_CLI_NAME).version
    installed = LooseVersion(current_version)

    if installed < LooseVersion(server_version.min_version):
        # Too old for the server: upgrade now or print instructions, then exit.
        click.echo("""Your version of CLI ({}) is no longer compatible with server.""".format(
            current_version))
        if click.confirm("Do you want to upgrade to "
                         "version {} Now?".format(server_version.latest_version)):
            pip_upgrade()
        else:
            clint.textui.puts("Your can manually run:")
            with clint.textui.indent(4):
                clint.textui.puts("pip install -U polyaxon-cli")
            clint.textui.puts(
                "to upgrade to the latest version `{}`".format(server_version.latest_version))
        sys.exit(0)

    if installed < LooseVersion(server_version.latest_version):
        # Still compatible, but a newer release exists.
        clint.textui.puts("New version of CLI ({}) is Now available. To upgrade run:".format(
            server_version.latest_version
        ))
        with clint.textui.indent(4):
            clint.textui.puts("pip install -U polyaxon-cli")
def get_key(env):
    """Return the bucket key for ``env``, creating bucket and key on demand.

    A missing bucket is created only after confirmation; declining returns
    None. A missing key is initialised with the string "{}".

    Raises:
        click.ClickException: if bucket creation fails with
            "BucketAlreadyExists".
        botocore.exceptions.ClientError: for any other client error.
    """
    bucket_name, key_path = get_bucket_name_and_key_path(env)
    try:
        bucket = bucketstore.get(bucket_name)
    except ValueError:
        create_it = click.confirm("The bucket {} does not exist,would you like to create it?".format(bucket_name))
        if not create_it:
            return
        try:
            bucket = bucketstore.get(bucket_name, create=True)
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] != "BucketAlreadyExists":
                raise e
            raise click.ClickException("The bucket {} already exists.\nIf you created this bucket,you currently credentials do not have access,else try renaming to a different bucket".format(bucket_name))

    try:
        key = bucket.key(key_path)
        # Call the Meta, will raise the error if does not exist
        key.Meta
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] != "NoSuchKey":
            raise e
        bucket.set(key_path, "{}")
        key = bucket.key(key_path)
    return key
def Confirm(self, text: str, abort=True, default=False):
    """Ask for a confirmation, either yes or no to a question.

    Args:
        text (str): prompts the user for a confirmation, with the given text
            as the question
        abort (bool): if the program should abort if the user answer to the
            confirm prompt is no. The default is an abort.
        default (bool): the default for the confirmation answer. If True the
            default is Y(es), if False the default is N(o). It was documented
            and used but missing from the signature (NameError); it is
            appended last so existing positional calls keep their meaning.

    Returns:
        bool: False if the user entered no, True if the user entered yes
    """
    return click.confirm(text, abort=abort, default=default)
def cli(homedir):
    """pakalolo key management tool"""
    chain_updated = False
    storedir, gpg = open_keying(homedir)
    chain = load_chain(storedir)

    if chain is None:
        if click.confirm("No claimchain was found in directory " + storedir +
                         ",would you like to create one?", abort=True):
            chain, chain_updated = first_run(gpg)

    if len(chain.store) == 0:
        # NOTE(review): this statement was truncated in the source (the
        # prompt string was unterminated and ran into the assignment). The
        # end of the question and abort=True are reconstructed from the
        # prompt above — confirm against upstream.
        if click.confirm("Your claimchain contains no keys, would you like to create one?",
                         abort=True):
            chain, chain_updated = first_run(gpg)

    if chain_updated and click.confirm('Do you want to save your changes?', abort=True):
        store_chain(chain, storedir)
def create_index(index, mappings_and_settings):
    """Create an index given mappings and settings as a JSON"""
    # The JSON is parsed before the prompt, so bad input fails early.
    index_body = simplejson.load(mappings_and_settings)
    question = 'Create index "%s"?' % index
    click.confirm(question, abort=True)
    es.client.indices.create(index=index, body=index_body)
    log.info('Index created')
版权声明：本文内容由互联网用户自发贡献，该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务，不拥有所有权，不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容，请发送邮件至 dio@foxmail.com 举报，一经查实，本站将立刻删除。