Python flask 模块,abort() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 flask.abort()。
def auth_jwt_project(short_name):
    """Issue a JWT for the project identified by ``short_name``.

    The caller authenticates by sending the project's secret key in the
    ``Authorization`` request header. Aborts with 403 when no key is
    supplied, and with 404 when the project is unknown or the key does not
    match the project's secret key.
    """
    # A missing header and an empty header are both treated as "no key".
    supplied_key = request.headers.get('Authorization')
    if not supplied_key:
        return abort(403)

    project = project_repo.get_by_shortname(short_name)
    if not project or project.secret_key != supplied_key:
        return abort(404)

    # The token is signed with the project's own secret key.
    payload = {'short_name': short_name, 'project_id': project.id}
    return jwt.encode(payload, project.secret_key, algorithm='HS256')
def init_login_manager(db):
    """Init security extensions (login manager and principal).

    :param db: Database which stores user accounts and roles
    :type db: ``flask_sqlalchemy.SQLAlchemy``
    :return: Login manager and principal extensions
    :rtype: (``flask_login.LoginManager``, ``flask_principal.Principal``)
    """
    login_manager = flask_login.LoginManager()
    principals = flask_principal.Principal()
    login_manager.anonymous_user = Anonymous

    @login_manager.unauthorized_handler
    def unauthorized():
        # Requests that require a login are answered with 403.
        flask.abort(403)

    @login_manager.user_loader
    def load_user(user_id):
        # A user loader is expected to return None for an invalid ID rather
        # than raise, so a malformed session value must not surface as an
        # error from int().
        try:
            account_id = int(user_id)
        except (TypeError, ValueError):
            return None
        return db.session.query(UserAccount).get(account_id)

    return login_manager, principals
def lookup():
    """Run a lookup for an airport on a given date and return it as JSON text.

    Reads ``airport`` and ``date`` from the submitted form; an empty ``date``
    means today. Returns a ``(message, 400)`` pair when the date cannot be
    understood or when the lookup itself raises.
    """
    icao_identifier = request.form['airport']
    raw_date = request.form['date']
    if raw_date:
        # NOTE(review): depending on which parser ``dateparser`` refers to, an
        # unparseable date is signalled either by ValueError or by a None
        # result - both are reported as a 400 here.
        try:
            parsed = dateparser.parse(raw_date)
        except ValueError:
            parsed = None
        if parsed is None:
            return "Unable to understand date %s" % raw_date, 400
        date = parsed.date()
    else:
        date = datetime.date.today()
    # The former trailing bare ``except:`` could only ever catch
    # SystemExit/KeyboardInterrupt-style exceptions (everything else is an
    # ``Exception``), so it was dropped to let those propagate.
    try:
        result = do_lookup(icao_identifier, date)
    except Exception as e:
        return str(e), 400
    return json.dumps(result)
def star():
    """Starring/Highlighting handler.

    Attempts to toggle a star/highlight on a particular show. The show ID must
    be passed in the ``id`` query string. If the user is unauthenticated, the
    function is aborted with a ``404`` message to hide the page.

    Returns:
        JSON formatted output describing success and the ID of the show starred.
    """
    log.debug("Entering star,trying to toggle star.")
    # A session without a 'logged_in' entry is an unauthenticated user; read
    # it with .get() so that case reaches the 404 below instead of a KeyError.
    login_id = session.get('logged_in')
    if login_id is not None and fe.check_login_id(escape(login_id)):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.star_show(show_id)
        log.debug("Returning to user.")
        return jsonify({"star": "success", "id": show_id})
    log.debug("User cannot be authenticated,send 404 to hide page.")
    abort(404)
def drop_show():
    """Show removal handler.

    Attempts to remove a show from the backend system. The show ID must
    be passed in the ``id`` query string. If the user is unauthenticated, the
    function is aborted with a ``404`` message to hide the page.

    Returns:
        An HTTP redirect to the home page, to refresh.
    """
    log.debug("Entering drop_show,trying to remove show from list.")
    # A session without a 'logged_in' entry is an unauthenticated user; read
    # it with .get() so that case reaches the 404 below instead of a KeyError.
    login_id = session.get('logged_in')
    if login_id is not None and fe.check_login_id(escape(login_id)):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.remove_show(show_id)
        log.debug("Refreshing user's page.")
        return redirect('/')
    log.debug("User cannot be authenticated,send 404 to hide page.")
    abort(404)
def scan_scrapers():
    """On demand scraper scanning handler.

    For some reason the scheduler doesn't always work, this endpoint allows for
    instant scanning, assuming it's not already occurring. The function is
    aborted with a ``404`` message to hide the page if the user is not
    authenticated.

    Scanning can take a long time - 20 to 30 minutes - so it's recommended this
    endpoint be called asynchronously.

    Returns:
        JSON formatted output to identify that scanning has completed or is
        already ongoing.
    """
    log.debug("Entering scan_scrapers.")
    # A session without a 'logged_in' entry is an unauthenticated user; read
    # it with .get() so that case reaches the 404 below instead of a KeyError.
    login_id = session.get('logged_in')
    if login_id is not None and fe.check_login_id(escape(login_id)):
        log.debug("User is logged in,attempting to begin scan.")
        if not fe.scrape_shows():
            log.debug("scrape_shows returned false,either the lockfile exists incorrectly or scraping is ongoing.")
            return jsonify({"scan": "failure", "reason": "A scan is ongoing"})
        log.debug("scrape_shows just returned. Returning success.")
        return jsonify({"scan": "success"})
    log.debug("User cannot be authenticated,send 404 to hide page.")
    abort(404)
def test_error_handling(self):
    """Handlers registered via a module's ``app_errorhandler`` answer 404 and 500."""
    application = flask.Flask(__name__)
    admin = flask.Module(__name__, 'admin')

    @admin.app_errorhandler(404)
    def not_found(e):
        return 'not found', 404

    @admin.app_errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @admin.route('/')
    def index():
        flask.abort(404)

    @admin.route('/error')
    def error():
        # Deliberate ZeroDivisionError so the 500 handler is exercised.
        1 // 0

    application.register_module(admin)
    client = application.test_client()

    response = client.get('/')
    self.assert_equal(response.status_code, 404)
    self.assert_equal(response.data, b'not found')

    response = client.get('/error')
    self.assert_equal(response.status_code, 500)
    self.assert_equal(b'internal server error', response.data)
def test_aborting(self):
    """Aborting with a redirect response and raising a handled custom exception."""
    class Foo(Exception):
        whatever = 42

    application = flask.Flask(__name__)
    application.testing = True

    @application.errorhandler(Foo)
    def handle_foo(e):
        return str(e.whatever)

    @application.route('/')
    def index():
        raise flask.abort(flask.redirect(flask.url_for('test')))

    @application.route('/test')
    def test():
        raise Foo()

    with application.test_client() as client:
        # '/' aborts with a redirect response that points at the 'test' endpoint.
        redirect_response = client.get('/')
        self.assertEqual(redirect_response.headers['Location'],
                         'http://localhost/test')
        # '/test' raises Foo, which handle_foo renders as its ``whatever`` value.
        handled_response = client.get('/test')
        self.assertEqual(handled_response.data, b'42')
def index(page=1):
    """Index page for all PYBOSSA registered users.

    Aborts with 404 when a page other than the first one has no accounts.
    """
    update_Feed = get_update_Feed()
    per_page = 24
    count = cached_users.get_total_users()
    accounts = cached_users.get_users_page(page, per_page)
    if not accounts and page != 1:
        abort(404)
    pagination = Pagination(page, per_page, count)
    user_id = current_user.id if current_user.is_authenticated() else None
    # Flask's configuration loaders only pick up upper-case keys, so the
    # mixed-case 'leaderBOARD' key could never be populated from a config
    # object or file and the lookup raised KeyError.
    top_users = cached_users.get_leaderboard(current_app.config['LEADERBOARD'],
                                             user_id)
    tmp = dict(template='account/index.html', accounts=accounts,
               total=count,
               top_users=top_users,
               title="Community", pagination=pagination,
               update_Feed=update_Feed)
    return handle_content_type(tmp)
def confirm_email():
    """Send email to confirm user email.

    Aborts with 404 when account confirmation is disabled in the
    configuration. Otherwise, for a user whose e-mail is not validated yet,
    queues the validation message, flashes a notice and records that the
    confirmation e-mail was sent. Always ends by redirecting to the profile.
    """
    # Flask's configuration loaders only pick up upper-case keys, so the
    # mixed-case 'ACCOUNT_CONFIRMATION_disABLED' key was never populated and
    # the feature switch could not be turned on.
    acc_conf_dis = current_app.config.get('ACCOUNT_CONFIRMATION_DISABLED')
    if acc_conf_dis:
        return abort(404)
    if current_user.valid_email is False:
        user = user_repo.get(current_user.id)
        account = dict(fullname=current_user.fullname, name=current_user.name,
                       email_addr=current_user.email_addr)
        confirm_url = get_email_confirmation_url(account)
        subject = ('Verify your email in %s' % current_app.config.get('BRAND'))
        # Plain-text body first, HTML alternative added afterwards.
        msg = dict(subject=subject,
                   recipients=[current_user.email_addr],
                   body=render_template('/account/email/validate_email.md',
                                        user=account, confirm_url=confirm_url))
        msg['html'] = render_template('/account/email/validate_email.html',
                                      user=account, confirm_url=confirm_url)
        mail_queue.enqueue(send_mail, msg)
        # The continuation line is kept flush-left so the message text stays
        # exactly as it was.
        msg = gettext("An e-mail has been sent to \
validate your e-mail address.")
        flash(msg, 'info')
        user.confirmation_email_sent = True
        user_repo.update(user)
    return redirect_content_type(url_for('.profile', name=current_user.name))
def projects(name):
    """Show the published and draft projects of the user called ``name``.

    Aborts with 404 when no such user exists and with 403 when the current
    user is not that user.
    """
    if not user_repo.get_by_name(name):
        return abort(404)
    if current_user.name != name:
        return abort(403)

    # Reload the account through the repository using the current user's id.
    owner = user_repo.get(current_user.id)
    published, drafts = _get_user_projects(owner.id)
    response = {
        'template': 'account/projects.html',
        'title': gettext("Projects"),
        'projects_published': published,
        'projects_draft': drafts,
    }
    return handle_content_type(response)
def reset_api_key(name):
    """Generate a new API key for the user called ``name``.

    Any method other than POST only returns a JSON document carrying a CSRF
    token. A POST aborts with 404 for an unknown user; otherwise the key is
    replaced and the caller is redirected to the profile page.
    """
    if request.method != 'POST':
        return jsonify({'form': {'csrf': generate_csrf()}})

    user = user_repo.get_by_name(name)
    if not user:
        return abort(404)
    ensure_authorized_to('update', user)
    user.api_key = model.make_uuid()
    user_repo.update(user)
    # Drop the cached summary after the key change.
    cached_users.delete_user_summary(user.name)
    flash(gettext('New API-KEY generated'), 'success')
    return redirect_content_type(url_for('account.profile', name=name))
def project_by_shortname(short_name):
    """Return the project ``short_name`` together with its owner and statistics.

    The result is the 7-tuple ``(project, owner, n_tasks, n_task_runs,
    overall_progress, last_activity, n_results)``. When the project is not
    found, its cache entry is deleted and the request is aborted with 404.
    """
    project = cached_projects.get_project(short_name)
    if not project:
        cached_projects.delete_project(short_name)
        return abort(404)

    owner = user_repo.get(project.owner_id)
    project_id = project.id
    # The statistics come from the project cache, fetched in tuple order.
    return (
        project,
        owner,
        cached_projects.n_tasks(project_id),
        cached_projects.n_task_runs(project_id),
        cached_projects.overall_progress(project_id),
        cached_projects.last_activity(project_id),
        cached_projects.n_results(project_id),
    )
def delete_autoimporter(short_name):
    """Remove the autoimporter of the project ``short_name``, if it has one.

    Aborts with 403 when the ``autoimporter_enabled`` pro feature is off.
    Redirects to the project's tasks page afterwards.
    """
    if not pro_features()['autoimporter_enabled']:
        # abort() raises on its own; no explicit ``raise`` is needed.
        abort(403)

    project = project_by_shortname(short_name)[0]
    for action in ('read', 'update'):
        ensure_authorized_to(action, project)

    if project.has_autoimporter():
        # Keep the old settings so they can be written to the audit log.
        removed = project.get_autoimporter()
        project.delete_autoimporter()
        project_repo.save(project)
        auditlogger.log_event(project, current_user, 'delete', 'autoimporter',
                              json.dumps(removed), 'nothing')
    return redirect(url_for('.tasks', short_name=project.short_name))
def auditlog(short_name):
    """Render the audit log page of the project ``short_name``.

    Aborts with 403 when the ``auditlog_enabled`` pro feature is off.
    """
    pro = pro_features()
    if not pro['auditlog_enabled']:
        # abort() raises on its own; no explicit ``raise`` is needed.
        abort(403)

    # The last two values of the tuple are not shown on this page.
    (project, owner, n_tasks, n_task_runs,
     overall_progress, _last_activity,
     _n_results) = project_by_shortname(short_name)
    ensure_authorized_to('read', Auditlog, project_id=project.id)
    logs = auditlogger.get_project_logs(project.id)

    # From here on ``project`` is whatever add_custom_contrib_button_to
    # returns, and its id is read with .get().
    project = add_custom_contrib_button_to(project, get_user_id_or_ip())
    context = {
        'project': project,
        'owner': owner,
        'logs': logs,
        'overall_progress': overall_progress,
        'n_tasks': n_tasks,
        'n_task_runs': n_task_runs,
        'n_completed_tasks': cached_projects.n_completed_tasks(project.get('id')),
        'n_volunteers': cached_projects.n_volunteers(project.get('id')),
        'pro_features': pro,
    }
    return render_template('projects/auditlog.html', **context)
def add_admin(user_id=None):
    """Add admin flag for user_id.

    Redirects to the users page on success. Returns a formatted 404 error
    when the user does not exist and a formatted 415 error when no
    ``user_id`` is given. Any exception is logged and turned into a 500.
    """
    try:
        if user_id:
            user = user_repo.get(user_id)
            if user:
                ensure_authorized_to('update', user)
                user.admin = True
                user_repo.update(user)
                return redirect_content_type(url_for(".users"))
            else:
                msg = "User not found"
                return format_error(msg, 404)
        else:  # pragma: no cover
            # Without this branch the view returned None for a missing id,
            # which is not a valid response; mirror what del_admin does.
            msg = "User.id is missing for method add_admin"
            return format_error(msg, 415)
    except Exception as e:  # pragma: no cover
        current_app.logger.error(e)
        return abort(500)
def del_admin(user_id=None):
    """Remove the admin flag from the user with id ``user_id``.

    Redirects to the users page on success. Returns a formatted 404 error
    when the user does not exist and a formatted 415 error when no
    ``user_id`` is given. Any exception is logged and turned into a 500.
    """
    try:
        if not user_id:  # pragma: no cover
            return format_error("User.id is missing for method del_admin", 415)

        user = user_repo.get(user_id)
        if not user:
            return format_error("User.id not found", 404)

        ensure_authorized_to('update', user)
        user.admin = False
        user_repo.update(user)
        return redirect_content_type(url_for('.users'))
    except Exception as e:  # pragma: no cover
        current_app.logger.error(e)
        return abort(500)
def run_cmd_host(host_id):
    """Execute host specific command.

    Selects ``host_id``, runs the plugin named by the ``command`` query
    argument with the ``args`` query arguments, and returns the plugin result
    as JSON. Aborts with 404 when the host cannot be selected and with 400
    when the command is missing or excluded.
    """
    # Select host on the server.
    res = RSPET_API.select([host_id])
    # Check if host selected correctly (if not probably host_id is invalid).
    if res["code"] != rspet_server.ReturnCodes.OK:
        abort(404)
    try:
        # Read 'command' argument from query string.
        comm = request.args.get('command')
        if not comm or comm in EXCLUDED_FUNCTIONS:
            abort(400)
        # getlist() returns an empty list when 'args' is absent, so no
        # KeyError handling is needed. Cast arguments to string.
        args = [str(val) for val in request.args.getlist('args')]
        # Execute command.
        res = RSPET_API.call_plugin(comm, args)
    finally:
        # Unselect host. Maintain statelessness of RESTful - also when the
        # request is aborted or the plugin raises after the selection.
        RSPET_API.select()
    return jsonify(res)
def run_cmd():
    """Execute general (non-host specific) command.

    Runs the plugin named by the ``command`` query argument with the ``args``
    query arguments. Aborts with 400 when the command is missing or excluded.
    The JSON response carries status 200 when the plugin reports OK and 404
    otherwise.
    """
    # Read 'command' argument from query string.
    comm = request.args.get('command')
    if not comm or comm in EXCLUDED_FUNCTIONS:
        abort(400)
    # getlist() returns an empty list when 'args' is absent, so no KeyError
    # handling is needed. Cast arguments to string.
    args = [str(val) for val in request.args.getlist('args')]
    # Execute command.
    ret = RSPET_API.call_plugin(comm, args)
    http_code = 200 if ret['code'] == rspet_server.ReturnCodes.OK else 404
    return make_response(jsonify(ret), http_code)
def post(self):
    """Copy a temporary user into the user collection and attach a school.

    Expects ``schoolID`` (required) and ``userID`` in the JSON body. Aborts
    with 400 when either the temporary user or the school is not found.
    Stores the user's id in the session and answers with the user document
    and status 201.
    """
    parser = reqparse.RequestParser()
    parser.add_argument("schoolID", type=str, required=True, location="json")
    parser.add_argument("userID", location="json")
    args = parser.parse_args()

    user = db.tempuser.find_one({"_id": args["userID"]})
    if user is None:
        abort(400)
    if db.school.find_one({"_id": args["schoolID"]}) is None:
        abort(400)

    user["schoolID"] = args["schoolID"]
    db.user.insert_one(user)
    session['userID'] = user["_id"]

    # jsonify() has no ``status`` parameter: mixing positional and keyword
    # arguments is rejected (or, on old versions, merged into the JSON body),
    # so the HTTP status has to be set on the response object itself.
    response = jsonify(user)
    response.status_code = 201
    return response
def test_error_handling(self):
    """Application error handlers answer 404 and 500 responses."""
    app = flask.Flask(__name__)

    @app.errorhandler(404)
    def not_found(e):
        return 'not found', 404

    @app.errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @app.route('/')
    def index():
        flask.abort(404)

    @app.route('/error')
    def error():
        # Deliberate ZeroDivisionError so the 500 handler is exercised.
        1 // 0

    c = app.test_client()
    # The original single assertion compared the status code with the
    # response body, which can never hold; check each value against what the
    # handlers above return instead.
    rv = c.get('/')
    self.assert_equal(rv.status_code, 404)
    self.assert_equal(rv.data, b'not found')
    rv = c.get('/error')
    self.assert_equal(rv.status_code, 500)
    self.assert_equal(b'internal server error', rv.data)
def test_aborting(self):
    """Aborting with a redirect response and raising a handled custom exception."""
    class Foo(Exception):
        whatever = 42

    app = flask.Flask(__name__)
    app.testing = True

    @app.errorhandler(Foo)
    def handle_foo(e):
        return str(e.whatever)

    @app.route('/')
    def index():
        raise flask.abort(flask.redirect(flask.url_for('test')))

    @app.route('/test')
    def test():
        raise Foo()

    with app.test_client() as c:
        # The original single assertion compared the Location header with
        # b'42', mixing up two separate checks; they are split here.
        # '/' aborts with a redirect that points at the 'test' endpoint.
        rv = c.get('/')
        self.assertEqual(rv.headers['Location'], 'http://localhost/test')
        # '/test' raises Foo, which handle_foo renders as its ``whatever`` value.
        rv = c.get('/test')
        self.assertEqual(rv.data, b'42')
def post_recipe():
    """Create a failure recipe from the posted JSON document.

    The body must be a JSON object with ``topology`` and ``scenarios``;
    ``headers`` is optional but must be a dictionary when present. Aborts with
    400 on any of these violations. On success the failures are set up and a
    201 response with the recipe id and a ``location`` header is returned.
    """
    payload = request.get_json()
    # get_json() can yield None or a non-object JSON value; calling .get()
    # on those would crash instead of reporting a client error.
    if not isinstance(payload, dict):
        abort(400, "JSON object required")

    topology = payload.get("topology")
    scenarios = payload.get("scenarios")
    headers = payload.get("headers")

    if not topology:
        abort(400, "Topology required")
    if not scenarios:
        abort(400, "Failure scenarios required")
    if headers and not isinstance(headers, dict):
        abort(400, "Headers must be a dictionary")

    appgraph = ApplicationGraph(topology)
    fg = A8FailureGenerator(
        appgraph,
        a8_controller_url='{0}/v1/rules'.format(a8_controller_url),
        a8_controller_token=a8_controller_token,
        headers=headers,
        debug=debug,
    )
    fg.setup_failures(scenarios)
    recipe_id = fg.get_id()
    return make_response(
        jsonify(recipe_id=recipe_id),
        201,
        {'location': url_for('get_recipe_results', recipe_id=recipe_id)},
    )
def delete_recipe(recipe_id):
    """Delete the fault injection rules tagged with ``recipe_id``.

    Sends a DELETE to the controller's rules endpoint. Aborts with 500 when
    the request itself fails, and with the controller's status code when it
    answers with anything other than 200 or 204. Returns an empty body on
    success.
    """
    # clear fault injection rules
    url = '{0}/v1/rules?tag={1}'.format(a8_controller_url, recipe_id)
    headers = {}
    if a8_controller_token != "":
        # The name ``token`` used here before is not defined in this function;
        # the value just tested is the controller token.
        headers['Authorization'] = "Bearer " + a8_controller_token
    try:
        r = requests.delete(url, headers=headers)
    except Exception as e:
        sys.stderr.write("Could not DELETE {0}".format(url))
        sys.stderr.write("\n")
        sys.stderr.write(str(e))
        sys.stderr.write("\n")
        abort(500, "Could not DELETE {0}".format(url))
    if r.status_code not in (200, 204):
        abort(r.status_code)
    return ""
def serve_static_files(p, index_on_error=True):
    """Securely serve static files for the given path using send_file.

    ``p`` is resolved relative to ``app.static_folder``. When the resolved
    path lies outside that folder or is not a regular file, the file named by
    the ``STATIC_FILE_ON_404`` config entry is served instead; without that
    entry the request is aborted with 404. ``index_on_error`` is accepted but
    not consulted by this function.
    """
    # Resolve both sides so symlinks cannot make the comparison meaningless.
    static_root = os.path.realpath(app.static_folder)
    full_path = os.path.realpath(os.path.join(static_root, p))

    # os.path.commonprefix compares character by character, so a sibling such
    # as "<static>_private/x" would pass. Require the root followed by a path
    # separator instead (join with '' appends the separator exactly once).
    root_prefix = os.path.join(static_root, '')
    inside_root = full_path.startswith(root_prefix)

    # We have a problem if either:
    #  - the path is not a sub-path of app.static_folder; or
    #  - the path does not refer to a real file.
    if not inside_root or not os.path.isfile(full_path):
        file_to_return = app.config.get('STATIC_FILE_ON_404', None)
        if file_to_return is None:
            return abort(404)
        full_path = os.path.realpath(os.path.join(static_root, file_to_return))
    return send_file(full_path)
def post(self):
    """Set a configuration value from the submitted ``key``/``value`` form fields.

    Redirects to the ``authorized`` endpoint when nobody is logged in and
    aborts with 403 for non-faculty users. ``value`` is decoded as JSON before
    being stored. A ``ValueError`` while decoding or storing aborts with 404,
    any other failure with 400. Returns ``{'status': 'OK'}`` on success.
    """
    user = current_user()
    if user is None:
        # The redirect was built but never returned, so execution fell
        # through and failed on ``user.faculty``.
        return redirect(url_for('authorized'))
    if not user.faculty:
        return abort(403)

    key = request.form.get('key', None)
    value = request.form.get('value', None)
    # A single try with sibling handlers: in the nested form the outer bare
    # ``except`` caught the 404 raised by the inner handler and replaced it
    # with a 400.
    try:
        config.set(key, config.from_frontend_value(key, json.loads(value)))
    except ValueError:
        return abort(404)
    except Exception:
        return abort(400)
    return jsonify({'status': 'OK'})
def configure_views(app):
    """Register the demo routes ``/``, ``/mistake`` and ``/secret`` on ``app``."""

    @app.route('/')
    def home():
        return render_template('ok.html')

    @app.route('/mistake')
    def mistake():
        # Always answers with Internal Server Error; abort() raises by itself.
        abort(500)

    @app.route('/secret')
    def secret():
        # Always answers with Forbidden (user not authorized).
        abort(403)
def test_error_handling(self):
    """Application error handlers answer 404 and 500 responses."""
    # NOTE(review): this snippet was cut off inside the first handler (an
    # unbalanced ``rv.data)`` remained). The remainder is rebuilt from the
    # usual shape of this test - confirm against the upstream source.
    app = flask.Flask(__name__)

    @app.errorhandler(404)
    def not_found(e):
        return 'not found', 404

    @app.errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @app.route('/')
    def index():
        flask.abort(404)

    @app.route('/error')
    def error():
        # Deliberate ZeroDivisionError so the 500 handler is exercised.
        1 // 0

    c = app.test_client()
    rv = c.get('/')
    self.assert_equal(rv.status_code, 404)
    self.assert_equal(rv.data, b'not found')
    rv = c.get('/error')
    self.assert_equal(rv.status_code, 500)
    self.assert_equal(b'internal server error', rv.data)
def test_aborting(self):
    """Aborting with a redirect response and raising a handled custom exception."""
    class Foo(Exception):
        whatever = 42

    app = flask.Flask(__name__)
    app.testing = True

    @app.errorhandler(Foo)
    def handle_foo(e):
        return str(e.whatever)

    @app.route('/')
    def index():
        raise flask.abort(flask.redirect(flask.url_for('test')))

    @app.route('/test')
    def test():
        raise Foo()

    with app.test_client() as c:
        # The original single assertion compared the Location header with
        # b'42', mixing up two separate checks; they are split here.
        # '/' aborts with a redirect that points at the 'test' endpoint.
        rv = c.get('/')
        self.assertEqual(rv.headers['Location'], 'http://localhost/test')
        # '/test' raises Foo, which handle_foo renders as its ``whatever`` value.
        rv = c.get('/test')
        self.assertEqual(rv.data, b'42')
def validate_room_jwt(function=None, requires_jamf_configured=True):
    """Decorator that authenticates a request by the JWT in its Authorization header.

    Usable both bare (``@validate_room_jwt``) and with arguments
    (``@validate_room_jwt(requires_jamf_configured=False)``). The wrapped view
    receives the object returned by ``verify_jwt`` as its first argument.

    Aborts with 401 when the header is missing or has no second
    whitespace-separated part. When ``requires_jamf_configured`` is true and
    the room is not configured, a notification is sent to the room and the
    request is aborted with 400.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            try:
                token = flask.request.headers.get('Authorization').split()[1]
            except (AttributeError, IndexError):
                # AttributeError: header absent (``get`` returned None);
                # IndexError: header present but without a token part.
                flask.abort(401)

            hipchat_room = verify_jwt(token)

            if requires_jamf_configured and not hipchat_room.jamf_configured:
                message = "You must first configure a Jamf Pro service account to use this feature."
                send_notification(hipchat_room.hipchat_token, hipchat_room.hipchat_room_id, message, color='yellow')
                flask.abort(400)

            return f(hipchat_room, *args, **kwargs)

        return wrapper

    # Called bare: ``function`` is the view itself; otherwise return the decorator.
    return decorator(function) if function else decorator
def _dispatch(self, route, **kwargs):
    """Resolve the session's user, open ``route`` for them and render the page.

    A user already known to the tracker under the session's ``shared_id`` is
    reused; otherwise a new user is connected and its id stored in the
    session. Returns the result of the controller's special return handler
    when one is set, else the rendered page.
    """
    tracker = self.user_tracker
    user = None
    if 'shared_id' in session and session['shared_id'] in tracker.user_refs:
        user = tracker.users[session['shared_id']]
    if user is None:
        user = tracker.connect_user()
        session['shared_id'] = user.id_

    user.open_page(route, kwargs)

    # The call context hands ``abort`` to the controller and is deactivated
    # once before_connect has run.
    ctx = CallCTX(abort=abort)
    user.active_controller.before_connect(ctx)
    ctx.deactivate()

    # The page is rendered before the special handler is consulted.
    rendered = user.active_controller.render_page()
    if user.active_controller.special_return_handler is None:
        return rendered
    return user.active_controller.special_return_handler()
def role_assignment_add(name):
    """Assign the role ``name`` to the user given by the ``login`` form field (POST handler).

    Aborts with 404 when the user or the role does not exist. Flashes an
    error when the user already has the role, otherwise stores the assignment.
    Always redirects to the role detail page.
    """
    db = flask.current_app.container.get('db')
    login = flask.request.form.get('login', '')
    user = db.session.query(User).filter_by(login=login).first()
    role = db.session.query(Role).filter_by(name=name).first()
    if user is None or role is None:
        flask.abort(404)

    account = user.user_account
    if account not in role.user_accounts:
        role.user_accounts.append(account)
        db.session.commit()
        message = 'Role {} assigned to user {}'.format(name, login)
        flask.flash(message, 'success')
    else:
        message = 'User {} already has role {}'.format(login, name)
        flask.flash(message, 'error')
    return flask.redirect(flask.url_for('admin.role_detail', name=name))
def role_assignment_remove(name):
    """Remove assignment of role to user (POST handler).

    Aborts with 404 when the user or the role does not exist. Flashes an
    error when the user does not have the role, otherwise removes the
    assignment. Redirects to the role detail page.
    """
    # NOTE(review): both flash() calls were truncated in the source (the
    # format arguments, the 'success' category and the final redirect were
    # lost). They are restored to mirror the sibling role_assignment_add
    # handler - confirm against the upstream source.
    db = flask.current_app.container.get('db')
    login = flask.request.form.get('login', '')
    user = db.session.query(User).filter_by(login=login).first()
    role = db.session.query(Role).filter_by(name=name).first()
    if user is None or role is None:
        flask.abort(404)
    account = user.user_account
    if account not in role.user_accounts:
        flask.flash('User {} doesn\'t have role {}'.format(login, name),
                    'error')
    else:
        role.user_accounts.remove(account)
        db.session.commit()
        flask.flash('Role {} removed from user {}'.format(name, login),
                    'success')
    return flask.redirect(flask.url_for('admin.role_detail', name=name))
def org_detail(login):
    """Organization detail (GET handler)

    When ``login`` belongs to a user rather than an organization, a notice is
    flashed and the request is redirected to the user detail page. Aborts
    with 404 when ``login`` matches neither.

    .. todo:: implement 410 (org deleted/archived/renamed)
    """
    db = flask.current_app.container.get('db')
    ext_master = flask.current_app.container.get('ext_master')

    org = db.session.query(Organization).filter_by(login=login).first()
    if org is None:
        user = db.session.query(User).filter_by(login=login).first()
        if user is None:
            flask.abort(404)
        # The message had lost its spaces ("it's auser.We redirected").
        flask.flash('Oy! You wanted to access organization, but it\'s a user. '
                    'We redirected you but be careful next time!', 'notice')
        return flask.redirect(flask.url_for('core.user_detail', login=login))

    # Extensions fill ``tabs`` through the hook call.
    tabs = {}
    ext_master.call('view_core_org_detail_tabs',
                    org=org, tabs_dict=tabs)
    tabs = sorted(tabs.values())
    # With no tabs registered, tabs[0] would raise IndexError; fall back to
    # no default tab in that case.
    default_tab = tabs[0].id if tabs else None
    active_tab = flask.request.args.get('tab', default_tab)
    return flask.render_template(
        'core/org.html', org=org, tabs=tabs, active_tab=active_tab
    )
def test_error_handling(self):
    """Application error handlers answer 404 and 500 responses."""
    # NOTE(review): this snippet was cut off inside the first handler (an
    # unbalanced ``rv.data)`` remained). The remainder is rebuilt from the
    # usual shape of this test - confirm against the upstream source.
    app = flask.Flask(__name__)

    @app.errorhandler(404)
    def not_found(e):
        return 'not found', 404

    @app.errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @app.route('/')
    def index():
        flask.abort(404)

    @app.route('/error')
    def error():
        # Deliberate ZeroDivisionError so the 500 handler is exercised.
        1 // 0

    c = app.test_client()
    rv = c.get('/')
    self.assert_equal(rv.status_code, 404)
    self.assert_equal(rv.data, b'not found')
    rv = c.get('/error')
    self.assert_equal(rv.status_code, 500)
    self.assert_equal(b'internal server error', rv.data)
def test_aborting(self):
    """Aborting with a redirect response and raising a handled custom exception."""
    class Foo(Exception):
        whatever = 42

    app = flask.Flask(__name__)
    app.testing = True

    @app.errorhandler(Foo)
    def handle_foo(e):
        return str(e.whatever)

    @app.route('/')
    def index():
        raise flask.abort(flask.redirect(flask.url_for('test')))

    @app.route('/test')
    def test():
        raise Foo()

    with app.test_client() as c:
        # The original single assertion compared the Location header with
        # b'42', mixing up two separate checks; they are split here.
        # '/' aborts with a redirect that points at the 'test' endpoint.
        rv = c.get('/')
        self.assertEqual(rv.headers['Location'], 'http://localhost/test')
        # '/test' raises Foo, which handle_foo renders as its ``whatever`` value.
        rv = c.get('/test')
        self.assertEqual(rv.data, b'42')
def test_error_handling(self):
    """Application error handlers answer 404 and 500 responses."""
    # NOTE(review): this snippet was cut off inside the first handler (an
    # unbalanced ``rv.data)`` remained). The remainder is rebuilt from the
    # usual shape of this test - confirm against the upstream source.
    app = flask.Flask(__name__)

    @app.errorhandler(404)
    def not_found(e):
        return 'not found', 404

    @app.errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @app.route('/')
    def index():
        flask.abort(404)

    @app.route('/error')
    def error():
        # Deliberate ZeroDivisionError so the 500 handler is exercised.
        1 // 0

    c = app.test_client()
    rv = c.get('/')
    self.assert_equal(rv.status_code, 404)
    self.assert_equal(rv.data, b'not found')
    rv = c.get('/error')
    self.assert_equal(rv.status_code, 500)
    self.assert_equal(b'internal server error', rv.data)
def test_aborting(self):
    """Aborting with a redirect response and raising a handled custom exception."""
    class Foo(Exception):
        whatever = 42

    app = flask.Flask(__name__)
    app.testing = True

    @app.errorhandler(Foo)
    def handle_foo(e):
        return str(e.whatever)

    @app.route('/')
    def index():
        raise flask.abort(flask.redirect(flask.url_for('test')))

    @app.route('/test')
    def test():
        raise Foo()

    with app.test_client() as c:
        # The original single assertion compared the Location header with
        # b'42', mixing up two separate checks; they are split here.
        # '/' aborts with a redirect that points at the 'test' endpoint.
        rv = c.get('/')
        self.assertEqual(rv.headers['Location'], 'http://localhost/test')
        # '/test' raises Foo, which handle_foo renders as its ``whatever`` value.
        rv = c.get('/test')
        self.assertEqual(rv.data, b'42')
def post(self):
    """Sign a user in with a username or e-mail address and a password.

    Aborts with 400 when the identifier or the password is missing and with
    401 when no matching user with that password hash is found. On success
    the user is signed in and returned.
    """
    args = parser.parse({
        'username': wf.Str(missing=None),
        'email': wf.Str(missing=None),
        'password': wf.Str(missing=None),
    })
    password = args['password']
    username = args['username'] or args['email']
    if not (username and password):
        return flask.abort(400)

    # An '@' anywhere but the first position means the identifier is looked
    # up as an e-mail address.
    lookup_field = 'email' if username.find('@') > 0 else 'username'
    user_db = model.User.get_by(lookup_field, username.lower())

    if user_db and user_db.password_hash == util.password_hash(user_db, password):
        auth.signin_user_db(user_db)
        return helpers.make_response(user_db, model.User.FIELDS)
    return flask.abort(401)
def project_view(project_id):
    """Render the page of one project together with a page of its crashes.

    Aborts with 404 when the project does not exist or its ``user_key``
    differs from the current user's key.
    """
    project_db = model.Project.get_by_id(project_id)
    if not project_db or project_db.user_key != auth.current_user_key():
        flask.abort(404)

    # Ordering comes from the 'order' parameter, defaulting to '-created'.
    crash_order = util.param('order') or '-created'
    crash_dbs, crash_cursor = project_db.get_crash_dbs(order=crash_order)

    return flask.render_template(
        'project/project_view.html',
        html_class='project-view',
        title=project_db.name,
        project_db=project_db,
        crash_dbs=crash_dbs,
        next_url=util.generate_next_url(crash_cursor),
        api_url=flask.url_for(
            'api.project',
            project_key=project_db.key.urlsafe() if project_db.key else '',
        ),
    )
###############################################################################
# Admin List
###############################################################################
def admin_project_update(project_id=0):
    """Create or update a project through the admin form.

    With a ``project_id`` the existing project is loaded (404 when it does not
    exist); without one a new project owned by the current user is prepared.
    A valid form submission saves the project and redirects to the admin
    project list; otherwise the form page is rendered.
    """
    if project_id:
        project_db = model.Project.get_by_id(project_id)
    else:
        project_db = model.Project(user_key=auth.current_user_key())
    if not project_db:
        flask.abort(404)

    form = ProjectUpdateAdminForm(obj=project_db)
    if form.validate_on_submit():
        form.populate_obj(project_db)
        project_db.put()
        return flask.redirect(flask.url_for('admin_project_list', order='-modified'))

    return flask.render_template(
        'project/admin_project_update.html',
        html_class='admin-project-update',
        form=form,
        back_url_for='admin_project_list',
        # NOTE(review): the url_for() call was cut off in the source, leaving
        # an unbalanced parenthesis. The ``project_key`` argument follows the
        # pattern used by project_view - confirm against the upstream source,
        # which may also have passed further template arguments.
        api_url=flask.url_for(
            'api.admin.project',
            project_key=project_db.key.urlsafe() if project_db.key else '',
        ),
    )
def admin_test(test=None):
    """Render the test overview page, or the page of a single named test.

    Aborts with 404 when ``test`` is given but is not one of ``TESTS``.
    """
    if test and test not in TESTS:
        flask.abort(404)

    form = TestForm()
    # The validation result is not acted upon; the call itself is kept.
    form.validate_on_submit()

    if test:
        template = 'test/test_one.html'
        title = 'Test: %s' % test.title()
        back_url_for = 'admin_test'
    else:
        template = 'test/test.html'
        title = 'Test'
        back_url_for = None

    return flask.render_template(
        template,
        title=title,
        html_class='test',
        test=test,
        tests=TESTS,
        back_url_for=back_url_for,
    )
def test_error_handling(self):
    """Application error handlers answer 404 and 500 responses."""
    # NOTE(review): this snippet was cut off inside the first handler (an
    # unbalanced ``rv.data)`` remained). The remainder is rebuilt from the
    # usual shape of this test - confirm against the upstream source.
    app = flask.Flask(__name__)

    @app.errorhandler(404)
    def not_found(e):
        return 'not found', 404

    @app.errorhandler(500)
    def internal_server_error(e):
        return 'internal server error', 500

    @app.route('/')
    def index():
        flask.abort(404)

    @app.route('/error')
    def error():
        # Deliberate ZeroDivisionError so the 500 handler is exercised.
        1 // 0

    c = app.test_client()
    rv = c.get('/')
    self.assert_equal(rv.status_code, 404)
    self.assert_equal(rv.data, b'not found')
    rv = c.get('/error')
    self.assert_equal(rv.status_code, 500)
    self.assert_equal(b'internal server error', rv.data)
def backup(username: str, timeframe: str, backup_date: str):
    """
    Route: /backup/username/timeframe/backup_date

    This route returns the requested backup.

    :param username the server username of the user needing their backup.
    :param timeframe the timeframe of the requested backup. Can be either
        "weekly", or "monthly".
    :param backup_date the backup-date of the requested backup. Must be in the
        form YYYY-MM-DD.

    Aborts with 400 for a non-GET request or when any argument is invalid.
    """
    if flask.request.method != "GET":
        return flask.abort(400)

    # make sure the arguments are valid
    # fullmatch() requires the whole value to fit the pattern: the date
    # pattern previously had no end anchor (so "2020-01-01<anything>" passed),
    # and a "$" anchor would still tolerate a trailing newline.
    arguments_valid = (
        re.fullmatch(r"[a-z]+", username) is not None
        and re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", backup_date) is not None
        and timeframe in ("weekly", "monthly")
    )
    if not arguments_valid:
        app.logger.debug("backups(%s,%s,%s): invalid arguments"%(
            username, timeframe, backup_date))
        return flask.abort(400)

    backups_base_dir = os.path.join(b.BACKUPS_DIR, username, timeframe)
    return flask.send_from_directory(backups_base_dir, backup_date+".tgz")
def tutorials():
    """
    Route: /tutorials

    Render the tutorials page. The tutorial list lives in the module-level
    ``TUTORIALS``; when it is empty an error message is shown instead. In
    ``DEBUG`` mode a non-empty list is cleared and repopulated on every
    request. Aborts with 400 for a non-GET request.
    """
    global TUTORIALS

    if flask.request.method != "GET":
        return flask.abort(400)

    if not TUTORIALS:
        return flask.render_template(
            "tutorials.html",
            show_logout_button=l.is_logged_in(),
            error="No tutorials to show",
        )

    if DEBUG:
        # Rebuild the list from scratch before rendering.
        TUTORIALS = []
        populate_tutorials()

    return flask.render_template(
        "tutorials.html",
        show_logout_button=l.is_logged_in(),
        tutorials=TUTORIALS,
    )
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。