Python flask_script 模块,prompt() 实例源码
我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用flask_script.prompt()。
def database():
    """Interactively choose whether a database backend is enabled.

    Keeps asking until the answer contains ``Y`` or ``N``, stores the
    choice with ``NodeDefender.config.database.set_cfg`` and, when the
    database is enabled, continues with ``config_database_engine()``.

    Returns:
        True, always.
    """
    print_topic('Database')
    print_info("Database is used to store presistant data.")
    # Adjacent literals instead of a backslash continuation inside the
    # string: the continuation glued "the" and "session" together.
    print_info("By having it disabled the data will be store in run-time "
               "RAM for the session")
    enabled = None
    while enabled is None:
        answer = prompt("Enable Database(Y/N)").upper()
        # 'Y' is tested first, so an answer containing both counts as yes.
        if 'Y' in answer:
            enabled = True
        elif 'N' in answer:
            enabled = False
    NodeDefender.config.database.set_cfg(enabled=enabled)
    if enabled:
        config_database_engine()
    return True
def config_database_engine():
    """Ask which database engine to use and run the engine-specific steps.

    The answer is lower-cased before validation, so the supported names
    must be lower-case too; the previous mixed-case ``'MysqL'`` entry could
    never match and made the MySQL branch unreachable.

    Returns:
        True, always.
    """
    supported_databases = ('mysql', 'sqlite')
    engine = None
    while engine is None:
        candidate = prompt("Enter DB Engine(sqlITE,MysqL)").lower()
        if candidate in supported_databases:
            engine = candidate
    NodeDefender.config.database.set_cfg(engine=engine)
    if engine == 'mysql':
        # Server-based engine: needs host and credentials.
        config_database_host()
        config_database_user()
    elif engine == 'sqlite':
        # File-based engine: only needs a path.
        config_database_file()
    return True
def config_database_user():
    """Prompt for database username, password and database name.

    Each question is repeated until a non-empty answer is given; the three
    values are then stored in one ``set_cfg`` call.

    Returns:
        True, always.
    """
    def ask(question):
        # Re-ask while the reply is falsy (None or empty).
        reply = None
        while not reply:
            reply = prompt(question)
        return reply

    user_name = ask('Enter Username')
    secret = ask('Enter Password')
    db_name = ask("Enter DB Name/Number")
    NodeDefender.config.database.set_cfg(
        username=user_name,
        password=secret,
        db=db_name,
    )
    return True
def info(name):
    """Print a group's id and name followed by its users and nodes.

    Args:
        name: Group name; prompted for when None.
    """
    group_name = prompt('Group Name') if name is None else name
    found = NodeDefender.db.group.get_sql(group_name)
    if found is None:
        print("Cant find group ", group_name)
        return

    print("ID: {},Name: {}".format(found.id, found.name))

    print("User Members:")
    for member in found.users:
        print("ID: {},Mail: {}".format(member.id, member.email))

    print("Nodes")
    for member_node in found.nodes:
        print("ID: {},Name: {}".format(member_node.id, member_node.name))
def create(email, password):
    """Create a user account, set its password and enable it.

    Args:
        email: Account email; prompted for when None.
        password: Account password; prompted for (hidden input) when None.
    """
    address = prompt('Email') if email is None else email
    secret = prompt_pass('Password') if password is None else password
    users = NodeDefender.db.user
    try:
        users.create(address)
        users.set_password(address, secret)
        users.enable(address)
    except ValueError:
        # Any ValueError from the three calls is reported as a duplicate.
        print("User already present")
    else:
        print("User {} Successfully added!".format(address))
def run(self):
    """Interactively create a superuser account.

    Prompts for a username that is not yet taken, a nickname and a
    password that must be entered twice identically, then adds and commits
    a ``models.User`` with ``is_superuser=True``.

    NOTE(review): the prompt/message texts consist of '?' characters in
    this copy of the file, presumably lost to a bad encoding conversion;
    they are kept unchanged.
    """
    username = prompt("??????")
    with app.app_context():
        g_db = db.session
        # Keep asking while a user with that name already exists.
        while models.User.query.filter_by(username=username).first():
            print("?????")
            username = prompt("????????")
        nickname = prompt("?????")
        while True:
            password = prompt_pass("?????")
            confirm_password = prompt_pass("??????")
            if password == confirm_password:
                break
            print("????,???????")
        super_user = models.User(username, nickname, password, "", is_superuser=True)
        g_db.add(super_user)
        g_db.commit()
        # "%S" is seconds; the former "%s" is not a standard strftime
        # directive and is platform dependent.
        print(super_user.create_date.strftime("%Y%m%d%H%M%S"))
        # Single-argument print() behaves the same on Python 2 and 3.
        print("???????%s??" % username)
def config_database_file():
    """Ask for the sqlite database file path and store it.

    A path starting with ``/`` is stored as given; any other path is
    prefixed with ``NodeDefender.config.basepath``.

    Returns:
        True, always.
    """
    filepath = None
    while not filepath:
        # Adjacent literals instead of a backslash continuation inside the
        # string, which glued "for" and "absolute-" together.
        print("FilePath for sqlite Database,Enter leading slash(/) for "
              "absolute- path. Otherwise relative to your current folder.")
        filepath = prompt("Enter File Path")
    if not filepath.startswith('/'):
        filepath = NodeDefender.config.basepath + '/' + filepath
    NodeDefender.config.database.set_cfg(filepath=filepath)
    return True
def config_logging_type():
    """Ask for the logging type and run the matching follow-up steps.

    The answer is lower-cased and must be a member of
    ``supported_loggtypes``. 'local' continues with the file path step,
    'syslog' with the host step; the level step runs afterwards.

    Returns:
        True, always.
    """
    while True:
        chosen = prompt("Enter Logging Type(Syslog/Local)").lower()
        if chosen in supported_loggtypes:
            break
    NodeDefender.config.logging.set_cfg(TYPE=chosen)
    if chosen == 'local':
        config_logging_filepath()
    elif chosen == 'syslog':
        config_logging_host()
    config_logging_level()
    return True
def config_logging_filepath():
    """Ask for the log file path and store it in the logging config.

    A path starting with ``/`` is stored as given; any other path is
    prefixed with ``NodeDefender.config.basepath``.
    """
    filepath = None
    while not filepath:
        # Adjacent literals instead of a backslash continuation inside the
        # string, which glued "absolute-" and "path" together.
        print_info("Enter filepath for loggingfile. Leading slah(/) for "
                   "absolute- path. Otherwise relative to current directory")
        filepath = prompt("Please Filename")
    if not filepath.startswith('/'):
        filepath = NodeDefender.config.basepath + '/' + filepath
    NodeDefender.config.logging.set_cfg(filepath=filepath)
def config_mail_user():
    """Prompt for mail TLS/SSL flags and credentials, then store them.

    The two yes/no questions are decided by the first character of the
    answer (case-insensitive) and repeated until it is 'Y' or 'N'.

    Returns:
        True, always.
    """
    def ask_flag(question):
        # Only the first character of the reply is inspected.
        while True:
            initial = prompt(question)[0].upper()
            if initial == 'Y':
                return True
            if initial == 'N':
                return False

    def ask_text(question):
        reply = None
        while reply is None:
            reply = prompt(question)
        return reply

    use_tls = ask_flag("TLS Enabled(Y/N)?")
    use_ssl = ask_flag("SSL Enabled(Y/N)?")
    login = ask_text('Username')
    secret = ask_text('Password')
    NodeDefender.config.mail.set_cfg(
        tls=use_tls,
        ssl=use_ssl,
        username=login,
        password=secret,
    )
    return True
def redis():
    """Interactively configure Redis.

    Asks whether Redis is enabled; when it is, also asks for host, port
    and database. Each value is stored with its own ``set_cfg`` call.

    Returns:
        True, always.
    """
    print_topic("Redis")
    # Adjacent literals instead of backslash continuations inside the
    # string, which glued the words at each line break together.
    print_info("Redis is used to store temporary data(Current heat of sensor "
               "etc). With redis enabled it will store the data in Redis. "
               "disabled will store in as a local class- object")
    enabled = None
    while enabled is None:
        answer = prompt("Enable Redis(Y/N)").upper()
        # 'Y' is tested first, so an answer containing both counts as yes.
        if 'Y' in answer:
            enabled = True
        elif 'N' in answer:
            enabled = False
    NodeDefender.config.redis.set_cfg(enabled=enabled)
    if not enabled:
        return True

    host = None
    while host is None:
        host = prompt("Enter Host Address")
    NodeDefender.config.redis.set_cfg(host=host)

    port = None
    while port is None:
        port = prompt("Enter Server Port")
    NodeDefender.config.redis.set_cfg(port=port)

    database = ''
    while not database:
        database = prompt("Enter Database")
    NodeDefender.config.redis.set_cfg(database=database)
    return True
def create(name):
    """Create a group.

    Args:
        name: Group name; prompted for when None.
    """
    if name is None:
        name = prompt('Group Name')
    try:
        NodeDefender.db.group.create(name)
    except ValueError as e:
        print("Error: ", e)
        # Stop here: the success message used to be printed after a failure.
        return
    print("Group {} successfully added".format(name))
def create(host, port, username, password):
    """Create an MQTT entry for the given host and port.

    Args:
        host: Host address; prompted for when None.
        port: Port number; prompted for when None.
        username: Accepted but not used by this function.
        password: Accepted but not used by this function.
    """
    address = prompt('Host Address') if host is None else host
    port_number = prompt('Port Number') if port is None else port
    try:
        NodeDefender.db.mqtt.create(address, port_number)
    except ValueError as e:
        print("Error: ", e)
    else:
        print("MQTT {}:{} Successfully created".format(address, port_number))
def delete(host):
    """Delete the MQTT entry for a host.

    Args:
        host: Host address; prompted for when None.
    """
    address = prompt('Host Address') if host is None else host
    try:
        NodeDefender.db.mqtt.delete(address)
    except LookupError as e:
        print("Error: ", e)
    else:
        print("MQTT {} Successfully deleted".format(address))
def create(name, group, street, city):
    """Create a node at the location resolved from street and city.

    Args:
        name: Node name; prompted for when None.
        group: Group name; prompted for when None.
        street: Street; prompted for when None.
        city: City; prompted for when None.

    NOTE(review): ``group`` is collected but never passed on to any call in
    this function — confirm whether the node should be assigned to it.
    """
    node_name = prompt('Node Name') if name is None else name
    if group is None:
        group = prompt('Group Name')
    street_name = prompt('Street') if street is None else street
    city_name = prompt('City') if city is None else city

    try:
        place = NodeDefender.db.node.location(street_name, city_name)
    except LookupError as e:
        print("Error: ", e)
        return

    try:
        NodeDefender.db.node.create(node_name, place)
    except (LookupError, ValueError) as e:
        print("Error: ", e)
    else:
        print("Node {} Successfully created".format(node_name))
def delete(name):
    """Delete a node by name.

    Args:
        name: Node name; prompted for when None.
    """
    node_name = prompt('Node Name') if name is None else name
    try:
        node.Delete(node_name)
    except LookupError as e:
        print("Error: ", e)
    else:
        print("Node {} Successfully deleted".format(node_name))
def delete(mac, index):
    """Delete a sensor identified by mac and index.

    Args:
        mac: Mac address; prompted for when None.
        index: Sensor index; prompted for when None.

    NOTE(review): the success message prints the ``sensor`` object the
    ``Delete`` call is made on, not the mac/index — confirm this is intended.
    """
    mac_address = prompt('Mac') if mac is None else mac
    sensor_index = prompt('Index') if index is None else index
    try:
        # Argument order is (index, mac), as in the original call.
        sensor.Delete(sensor_index, mac_address)
    except LookupError as e:
        print("Error: ", str(e))
    else:
        print("Successfully Deleted: ", sensor)
def info(mac, index):
    """Print details about one sensor and the iCPE it belongs to.

    Args:
        mac: Mac address; prompted for when None.
        index: Sensor index; prompted for when None.
    """
    if mac is None:
        mac = prompt('mac')
    if index is None:
        index = prompt('Index')
    s = sensor.Get(mac, index)
    # The lookup result is ``s``; the old check tested an undefined ``icpe``
    # name and then fell through to dereference the missing sensor.
    if s is None:
        print("Unable to find Sensor {} on iCPE {}".format(index, mac))
        return
    print('ID: {},Name: {}'.format(s.id, s.name))
    print('iCPE: {},Mac: {}'.format(s.icpe.name, s.icpe.mac_address))
def technician(email):
    """Give a user the 'technician' role.

    Args:
        email: User email; prompted for when None.
    """
    address = prompt('Email of User') if email is None else email
    try:
        NodeDefender.db.user.set_role(address, 'technician')
    except AttributeError:
        print("User {} not found".format(address))
    else:
        print("User {} Added as Technician".format(address))
def admin(email):
    """Give a user the 'administrator' role.

    Args:
        email: User email; prompted for when None.
    """
    address = prompt('Email of User') if email is None else email
    try:
        NodeDefender.db.user.set_role(address, 'administrator')
    except AttributeError:
        print("User {} not found".format(address))
    else:
        print("User {} Added as Administrator".format(address))
def superuser(email):
    """Give a user the 'superuser' role.

    Args:
        email: User email; prompted for when None.
    """
    address = prompt('Email of User') if email is None else email
    try:
        NodeDefender.db.user.set_role(address, 'superuser')
    except AttributeError:
        print("User {} not found".format(address))
    else:
        print("User {} Added as superuser".format(address))
def delete(email):
    """Delete a user.

    Args:
        email: User email; prompted for when None.
    """
    if email is None:
        email = prompt('Email')
    try:
        u = NodeDefender.db.user.delete(email)
    except LookupError as e:
        print("Error: ", e)
        # ``u`` is unbound here; without this return the success message
        # below raised a NameError right after the error was reported.
        return
    print("User {} Successfully Deleted!".format(u.email))
def enable(email):
    """Enable a user.

    Args:
        email: User email; prompted for when None.
    """
    if email is None:
        email = prompt('Email')
    try:
        user = NodeDefender.db.user.enable(email)
    except LookupError as e:
        print("Error: ", e)
        # ``user`` is unbound here; without this return the success message
        # below raised a NameError right after the error was reported.
        return
    print("User {} Successfully Enabled!".format(user.email))
def groups(email):
    """List the groups a user belongs to.

    Args:
        email: User email; prompted for when None.
    """
    if email is None:
        email = prompt('Email')
    for group in NodeDefender.db.user.groups(email):
        # The original line had an unterminated string literal (a syntax
        # error); rebuilt to match the "ID/Name" format used by the other
        # listing commands in this file.
        print("ID: {},Name: {}".format(group.id, group.name))
def leave(email, group):
    """Remove a user from a group.

    Args:
        email: User email; prompted for when None.
        group: Group name; prompted for when None.
    """
    address = prompt('Email') if email is None else email
    group_name = prompt('Group') if group is None else group
    try:
        NodeDefender.db.group.remove_user(group_name, address)
    except LookupError as e:
        print("Error: ", e)
    else:
        print("User {},Successfully removed from {}".format(address, group_name))
def delete(mac):
    """Delete an iCPE by mac address.

    Args:
        mac: Mac address; prompted for when None.
    """
    mac_address = prompt('Mac') if mac is None else mac
    try:
        NodeDefender.db.icpe.delete(mac_address)
    except LookupError as e:
        print("Error: ", str(e))
    else:
        print("Successfully Deleted: ", mac_address)
def info(mac):
    """Print details about one iCPE and list its sensors.

    Args:
        mac: Mac address; prompted for when None.
    """
    if mac is None:
        mac = prompt('Mac')
    icpe = NodeDefender.db.icpe.get_sql(mac)
    if icpe is None:
        print("Unable to find iCPE {}".format(mac))
        # Nothing to show; continuing would dereference None below.
        return
    print('ID: {},MAC: {}'.format(icpe.id, icpe.mac_address))
    print('Alias {},Node: {}'.format(icpe.alias, icpe.node.name))
    print('ZWave Sensors: ')
    for sensor in icpe.sensors:
        print('Alias: {},Type: {}'.format(sensor.alias, sensor.type))
def run(self):
    """Prompt for email and password and create an activated user.

    The password is asked for twice; on mismatch an error is written to
    ``self.stderr`` and nothing is created.
    """
    email = prompt('Email Address')
    password = prompt_pass('Password')
    confirmation = prompt_pass('Confirm Password')
    if password != confirmation:
        self.stderr.write('Passwords did not match!')
        return

    datastore = app.user_datastore
    user = datastore.create_user(email=email, password=encrypt_password(password))
    datastore.activate_user(user)
    db.session.commit()
    message = 'New User Created,<{id} : {email}>'.format(id=user.id, email=user.email)
    self.stdout.write(message)
def run(self):
    """Prompt for a role name and description, save the role, report it."""
    role_name = prompt('Role Name')
    role_desc = prompt('Role Description')
    role = Role(name=role_name, description=role_desc).save()
    summary = 'New Role Created,<{id} : {name} : {desc}>'.format(
        id=role.id,
        name=role.name,
        desc=role.description,
    )
    self.stdout.write(summary)
def createapp():
    """Scaffold a new blueprint package and register it in routes.py.

    Prompts for the blueprint name, rejects purely numeric names and
    folders that already exist, creates the package folder with its
    starter modules and a template directory, then appends a ``Blueprint``
    registration to the second-to-last line of ``routes.py`` in the
    configured ``APP_FOLDER``.
    """
    path = prompt(Fore.BLUE + "Write the name of the blueprint")
    try:
        int(path)
    except ValueError:
        pass
    else:
        # The name parsed as an integer, which is not accepted.
        print(Fore.RED + "Name no valid")
        return

    app_folder = current_app.config['APP_FOLDER']
    folder = app_folder + path
    if os.path.exists(folder):
        print(Fore.RED + "This path exist")
        return

    register_blueprint_str = "Blueprint('{0}','app.{0}',template_folder='templates')".format(path.lower())

    # Scaffold new blueprint. Module names carry no extension here; the
    # old list held "__init__.py" and produced a file "__init__.py.py".
    os.makedirs(folder)
    starter_code = {
        "forms": "from wtforms import *\n",
        "routes": "from flask_via.routers.default import Pluggable\nfrom views import *\n",
        "utils": "",
        "views": ("from flask import jsonify,request,"
                  "url_for,redirect,current_app,render_template,flash,make_response\n"
                  "from flask.views import MethodView"),
        "__init__": "",
    }
    # Content is looked up by name; the old ``file is "routes"`` checks
    # compared object identity, which is not reliable for strings.
    for module_name in ("forms", "routes", "utils", "views", "__init__"):
        with open(os.path.join(folder, module_name + ".py"), 'w') as module_file:
            module_file.write(starter_code[module_name])
    # NOTE(review): the directory is "template" while the Blueprint is
    # registered with template_folder='templates' — confirm which is meant.
    os.makedirs(folder + "/template/" + path)

    # Register blueprint in app/route.py
    route_path = os.path.join(app_folder, "routes.py")
    with open(route_path, "r") as old_routes:
        data = old_routes.readlines()
    data[-2] = data[-2] + " " + register_blueprint_str + ',\n'
    # Opening with 'w' truncates the file, so no prior os.remove is needed.
    with open(route_path, 'w') as new_routes:
        new_routes.writelines(data)
def createadmin():
    """Interactively create a user and give it the 'admin' role.

    Prompts for username and email, validates the email format, and stops
    if either is already in use. The password must be entered twice
    identically before the user is created.

    NOTE(review): the password is read with ``prompt`` rather than a
    hidden-input prompt, so it is presumably echoed on screen — confirm.
    """
    username = prompt(Fore.BLUE + "Username")
    query_username = db.session.query(FinalUser).filter_by(username=username).first()
    email = prompt(Fore.BLUE + "Write Email")
    # Raw string so the regex escapes (\.) are not treated as string escapes.
    email_pattern = r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$'
    if re.match(email_pattern, email) is None:
        print(Fore.RED + "Invalid email format")
        return
    query_email = db.session.query(FinalUser).filter_by(email=email).first()
    if query_username is not None or query_email is not None:
        print(Fore.RED + "The username or email are in use")
        return

    password = prompt(Fore.BLUE + "Write password")
    repeat_password = prompt(Fore.BLUE + "Repeat password")
    if password != repeat_password:
        print(Fore.RED + "The password does not match")
        return

    encrypted_password = utils.encrypt_password(password)
    user_datastore.create_user(username=username,
                               password=encrypted_password,
                               email=email)
    # Commit the user before the role is attached, as the original did.
    db.session.commit()
    user_datastore.add_role_to_user(email, 'admin')
    db.session.commit()
    print(Fore.GREEN + "Admin created")
def general():
    """Interactively collect the general server settings.

    Asks for server name, port, secret key, salt and whether
    self-registration is enabled. Each value is stored with its own
    ``NodeDefender.config.general.set_cfg`` call.

    Returns:
        True, always.
    """
    print_topic("General configuration")
    # Adjacent literals instead of backslash continuations inside the
    # string, which glued the words at each line break together.
    print_info("Server Name. If you are using a local running server please enter "
               "as format NAME:PORT. Otherwise it will be "
               "generating non- accessable URLs")
    print("/ Example: 127.0.0.1:5000")
    servername = None
    while servername is None:
        servername = prompt("Enter Server Name")
    NodeDefender.config.general.set_cfg(servername=servername)

    port = None
    while port is None:
        port = prompt("Which port should the server be running on")
    NodeDefender.config.general.set_cfg(port=port)

    print_info("Security Key is used to Encrypt Password etc.")
    key = None
    while key is None:
        key = prompt("Enter Secret Key")
    NodeDefender.config.general.set_cfg(secret_key=key)

    print_info("Salt is used to genereate URLS and more.")
    salt = None
    while salt is None:
        salt = prompt("Please enter Salt")
    NodeDefender.config.general.set_cfg(salt=salt)

    print_info("You can either have users register by themselfs on the "
               "authentication- page or via invite mail. Invite mail requires "
               "that you also enable mail- support so that NodeDefender can send "
               "invitation- mail and such. Superuser can still administrate "
               "users in the same way.")
    self_registration = None
    while self_registration is None:
        answer = prompt("Enable self-registration(Y/N)").upper()
        # 'Y' is tested first, so an answer containing both counts as yes.
        if 'Y' in answer:
            self_registration = True
        elif 'N' in answer:
            self_registration = False
    NodeDefender.config.general.set_cfg(self_registration=self_registration)
    return True
def celery():
    """Interactively configure Celery.

    Asks whether Celery is enabled; when it is, also asks for the broker
    type (validated against ``supported_brokers``), server address, port
    and database. Each value is stored with its own ``set_cfg`` call.

    Returns:
        True, always.
    """
    print_topic("Celery")
    # Adjacent literals instead of backslash continuations inside the
    # string, which glued the words at each line break together.
    print_info("Celery is used for concurrent operation. It will spawn multiple "
               "workes on multiple cpu cores and possibly even on multiple "
               "hosts,running as a cluster. disabling Celery will make "
               "NodeDefender as a single process. Celery requires AMQP or Redis "
               "to communicate between workes")
    enabled = None
    while enabled is None:
        answer = prompt("Enable Celery(Y/N)").upper()
        # 'Y' is tested first, so an answer containing both counts as yes.
        if 'Y' in answer:
            enabled = True
        elif 'N' in answer:
            enabled = False
    NodeDefender.config.celery.set_cfg(enabled=enabled)
    if not enabled:
        return True

    broker = None
    while broker is None:
        candidate = prompt("Enter broker type(AMQP or Redis)").lower()
        if candidate in supported_brokers:
            broker = candidate
    NodeDefender.config.celery.set_cfg(broker=broker)

    server = None
    while server is None:
        server = prompt("Enter Server Address")
    NodeDefender.config.celery.set_cfg(server=server)

    port = None
    while port is None:
        port = prompt("Enter Server Port")
    NodeDefender.config.celery.set_cfg(port=port)

    database = ''
    while not database:
        database = prompt("Enter Database")
    NodeDefender.config.celery.set_cfg(database=database)
    return True
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。