Python flask.request 模块,endpoint() 实例源码
我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用flask.request.endpoint()。
def paginate(query, schema):
page = request.args.get('page', DEFAULT_PAGE_NUMBER)
per_page = request.args.get('page_size', DEFAULT_PAGE_SIZE)
page_obj = query.paginate(page=page, per_page=per_page)
next = url_for(
request.endpoint,
page=page_obj.next_num if page_obj.has_next else page_obj.page,
per_page=per_page,
**request.view_args
)
prev = url_for(
request.endpoint,
page=page_obj.prev_num if page_obj.has_prev else page_obj.page,
**request.view_args
)
return {
'total': page_obj.total,
'pages': page_obj.pages,
'next': next,
'prev': prev,
'results': schema.dump(page_obj.items).data
}
def __init__(self, options, func, request_context):
self.options = options
self.operation = request.endpoint
self.func = func.__name__
self.method = request.method
self.args = request.args
self.view_args = request.view_args
self.request_context = request_context
self.timing = dict()
self.error = None
self.stack_trace = None
self.request_body = None
self.response_body = None
self.response_headers = None
self.status_code = None
self.success = None
def api_deprecated(new_endpoint, message='This endpoint is deprecated.'):
"""Decorator that adds a deprecation message for and endpoint.
Decorated function will not be executed.
:param new_endpoint: New endpoint to use
:param message: Warning message
:return:
:rtype: func
"""
def decorator(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
response = jsonify({
'message': message,
'endpoint': url_for(new_endpoint, _external=True)
})
# response = jsonify(rv)
response.status_code = 301
response.headers['DO-New-Endpoint'] = \
url_for(new_endpoint, _external=True)
return response
return wrapped
return decorator
def authenticate():
# logger.debug("endpoint request: %s" % request.endpoint)
if re.search('tenant_provisioned', str(request.endpoint)):
g.user = "phone_home"
logger.info("Authentication bypassed: tenant_provisioned")
return
try:
decoded = jwt.decode(request.headers['X-Auth-Token'], credentials['tenant_secret'], algorithms=['HS256'])
g.user = decoded['user']
except KeyError:
logger.error("Error: key error.")
abort(401)
except jwt.DecodeError:
logger.error("Error: decode error")
abort(401)
def before_request():
if request.endpoint != 'root':
if request.method != 'POST':
response = generate_response(
**{
'return': 'KO',
'data': 'Unsupported HTTP method'
}
)
return response, 400
elif request.form.get('APIKey') != Master_APIKey:
response = generate_response(
**{
'return': 'KO',
'data': 'Bad API key'
}
)
return response, 403
def error():
"""This endpoint is used by httpd,which redirects its errors to it."""
try:
status = int(request.environ['REDIRECT_STATUS'])
except Exception:
# if there's an exception,it means that a client accessed this directly;
# in this case,we want to make it look like the endpoint is not here
return api_404_handler()
msg = 'UnkNown error'
# for Now,we just provide specific error for stuff that already happened;
# before adding more,I'd like to see them actually happening with reproducers
if status == 401:
msg = 'Authentication Failed'
elif status == 405:
msg = 'Method not allowed for this endpoint'
raise HTTPError(status, msg)
def add_resource_no_matter_slashes(resource, route, endpoint=None, defaults=None):
"""Adds a resource for both trailing slash and no trailing slash to prevent redirects.
"""
slashless = route.rstrip('/')
_resource_paths.append(api_v1.url_prefix + slashless)
slashful = route + '/'
endpoint = endpoint or resource.__name__.lower()
defaults = defaults or {}
rest_api_v1.add_resource(resource,
slashless,
endpoint=endpoint + '__slashless',
defaults=defaults)
rest_api_v1.add_resource(resource,
slashful,
endpoint=endpoint + '__slashful',
defaults=defaults)
def stack_analyses_debug(external_request_id):
"""Debug endpoint exposing operational data for particular stack analysis.
This endpoint is not part of the public API.
Note the existence of the data is not guaranteed,
therefore the endpoint can return 404 even for valid request IDs.
"""
results = retrieve_worker_results(rdb, external_request_id)
if not results:
return jsonify(error='No operational data for the request ID'), 404
response = {'tasks': []}
for result in results:
op_data = result.to_dict()
audit = op_data.get('task_result', {}).get('_audit', {})
task_data = {'task_name': op_data.get('worker')}
task_data['started_at'] = audit.get('started_at')
task_data['ended_at'] = audit.get('ended_at')
task_data['error'] = op_data.get('error')
response['tasks'].append(task_data)
return jsonify(response), 200
def templated(template=None):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
template_name = template
if template_name is None:
template_name = request.endpoint \
.replace('.', '/') + '.html'
ctx = f(*args, **kwargs)
if ctx is None:
ctx = {}
elif not isinstance(ctx, dict):
return ctx
return render_template(template_name, **ctx)
return decorated_function
return decorator
def init_values(self):
current_total = self.found if self.search else self.total
pages = divmod(current_total, self.per_page)
self.total_pages = pages[0] + 1 if pages[1] else pages[0]
self.has_prev = self.page > 1
self.has_next = self.page < self.total_pages
args = request.args.copy()
args.update(request.view_args.copy())
self.args = {}
for k, v in args.lists():
if len(v) == 1:
self.args[k] = v[0]
else:
self.args[k] = v
self.endpoint = request.endpoint
def should_update_email():
"""Check whether the current user has not set his or her email address.
Some authentication providers may not give the email addresses.
However,Railgun relies on email addresss to finish login process.
So the users from these providers will be given `fake` email addresses.
:class:`should_update_email` should be called before a view is executed,
check whether the user should be redirected to
:func:`~railgun.website.views.profile_edit` to fill in his or her
email address.
:return: :data:`True` if the user has a fake email address,:data:`False`
otherwise.
.. note::
If the current view is :func:`~railgun.website.views.profile_edit`,
this method will return :data:`False`.
"""
if (current_user.email.endswith(app.config['EXAMPLE_USER_EMAIL_SUFFIX'])
and request.endpoint != 'profile_edit'):
return True
def make_view(title, endpoint, *args, **kwargs):
"""A shortcut to make a :class:`NaviItem` linking to a standard
Flask view. Equal to the following statement:
.. code-block:: python
NaviItem(
title=title,
url=lambda: url_for(endpoint,*args,**kwargs),
identity=endpoint
)
:param title: The title of new :class:`NaviItem`
:param endpoint: The endpoint of target view.
:type endpoint: :class:`str`
:param args: The unnamed arguments to :func:`flask.url_for` when
generating the `url`.
:param kwargs: The named arguments to :func:`flask.url_for` when
generating the `url`.
"""
return NaviItem(title, lambda: url_for(endpoint, **kwargs),
endpoint)
def patch_validate_handler(name, bl=None):
def params_validate_handler():
if not request.endpoint \
or not request.endpoint.startswith('{0}.'.format(name)) \
or request.method == 'HEAD':
return
_, blueprint, endpoint = request.endpoint.split('.')
mapping = get_mapping(blueprint, endpoint)
if mapping:
params = request.get_json()
if validate(params, mapping, format_checker=formatchecker):
raise ArgumentError('Json schema validate Failed')
if bl:
bl.before_app_first_request(params_validate_handler)
else:
sys.modules['flask'].app.before_app_first_request(params_validate_handler)
return params_validate_handler
def ratelimit(limit, per=300, send_x_headers=True,
methods=["POST"],
over_limit=on_over_limit,
scope_func=scope_func,
key_func=lambda: request.endpoint):
def decorator(f):
def rate_limited(*args, **kwargs):
if request.method in methods:
key = 'rate-limit/%s/%s/' % (key_func(), scope_func())
rlimit = RateLimit(key, limit, per, send_x_headers)
g._view_rate_limit = rlimit
if over_limit is not None and rlimit.over_limit:
return over_limit(rlimit)
return f(*args, **kwargs)
return update_wrapper(rate_limited, f)
return decorator
def before_request():
request.start_time = datetime.Now()
# @bp.after_request
# def after_request(resp):
# try:
# if '_' in request.endpoint:
# dbcon = influx_db.connection
# point = [{"measurement": config.APP_NAME,
# "tags": {"method": request.method,"status": resp.status_code,"endpoint":
# request.endpoint},
# "fields": {"base_url": request.base_url,"remote_address": request.remote_addr,
# 'response_time': (datetime.Now() - request.start_time).microseconds}}]
# dbcon.write_points(point)
# except Exception as e:
# pass
# logger.debug('Write api statistics data to influxdb Failed,error?' + e.message)
# return resp
def ratelimit(limit,
scope_func=lambda: request.remote_addr,
key_func=lambda: request.endpoint,
path=lambda: request.path):
"""
Decorator for limiting the access to a route.
Returns the function if within the limit,otherwise TooManyRequests error
"""
def decorator(f):
@wraps(f)
def rate_limited(*args, **kwargs):
try:
key = 'rate-limit/%s/%s/' % (key_func(), send_x_headers)
g._view_rate_limit = rlimit
#if over_limit is not None and rlimit.over_limit:
if rlimit.over_limit:
raise TooManyRequests
return f(*args, **kwargs)
except Exception as e:
return error.format_exception(e, target=path(),
action=f.__name__)
return update_wrapper(rate_limited, f)
return decorator
def log_request(resp):
if request.endpoint != "get_doi_endpoint":
return
logging_start_time = time()
try:
results = json.loads(resp.get_data())["results"][0]
except (ValueError, RuntimeError, KeyError):
# don't bother logging if no results
return
oa_color = results["oa_color"]
if not oa_color:
oa_color = "gray"
body = {
"timestamp": datetime.utcNow().isoformat(),
"elapsed": elapsed(g.request_start_time, 2),
"ip": get_ip(),
"status_code": resp.status_code,
"email": request.args.get("email", None),
"doi": results["doi"],
"year": results.get("year",
"oa_color": oa_color
}
h = {
"content-type": "text/json",
"X-Forwarded-For": get_ip()
}
url = "http://logs-01.loggly.com/inputs/6470410b-1d7f-4cb2-a625-72d8fa867d61/tag/{}/".format(
oa_color)
requests.post(url, headers=h, data=json.dumps(body))
# logger.info(u"log_request took {} seconds".format(elapsed(logging_start_time,2)))
def print_ip():
user_agent = request.headers.get('User-Agent')
logger.info(u"calling from IP {ip}. User-Agent is '{user_agent}'.".format(
ip=get_ip(),
user_agent=user_agent
))
# this is the old way of expressing this endpoint.
# the new way is POST api.oadoi.org/
# you can give it an object that lists DOIs
# you can also give it an object that lists biblios.
# this is undocumented and is just for impactstory use Now.
def get_doi_endpoint(doi):
# the GET api endpoint (returns json data)
my_pub = get_pub_from_doi(doi)
return jsonify({"results": [my_pub.to_dict()]})
def get_doi_endpoint_v2(doi):
# the GET api endpoint (returns json data)
my_pub = get_pub_from_doi(doi)
return jsonify(my_pub.to_dict_v2())
def before_request():
if current_user.is_authenticated:
current_user.ping()
if not current_user.confirmed \
and request.endpoint[:5] != 'auth.' \
and request.endpoint != 'static':
return redirect(url_for('auth.unconfirmed'))
def check_jwt_authorization():
current_identity = getattr(_request_ctx_stack.top,
'current_identity', None)
if current_identity:
return current_identity
skip_check = False
if current_app.config.get("disable_jwt", False):
skip_check = True
if request.endpoint in current_app.view_functions:
fn = current_app.view_functions[request.endpoint]
# Check Flask-RESTful endpoints for openness
if hasattr(fn, "view_class"):
exempt = getattr(fn.view_class, "no_jwt_check", [])
if request.method in exempt:
skip_check = True
elif fn in _open_endpoints:
skip_check = True
# the static folder is open to all without authentication
if request.endpoint == "static" or request.url.endswith("favicon.ico"):
skip_check = True
# In case the endpoint requires no authorization,and the request does not
# carry any authorization info as well,we will not try to verify any JWT's
if skip_check and 'Authorization' not in request.headers:
return
token, auth_type = get_auth_token_and_type()
current_identity = verify_token(token, auth_type)
if auth_type == "JWT":
# Cache this token
cache_token(current_identity)
# Authorization token has Now been converted to a verified payload
_request_ctx_stack.top.current_identity = current_identity
return current_identity
def requires_roles(_roles):
"""
endpoint decorator to lock down an endpoint
on a set of roles (comma delimitered)
"""
def wrapper(fn):
@wraps(fn)
def decorator(*args, **kwargs):
if not _roles:
return fn(*args, **kwargs)
current_user = query_current_user()
if not current_user:
abort_unauthorized("You do not have access to this resource."
" It requires role '%s'" % _roles)
required_roles = set(_roles.split(","))
user_roles = set(current_user.get("roles", []))
if not required_roles.intersection(user_roles):
log.warning("User does not have the needed roles for this "
"call. User roles = '%s',required roles = "
"'%s'. current_user = '%s'",
current_user.get("roles", ""),
_roles, repr(current_user))
abort_unauthorized("You do not have access to this resource. "
"It requires role '%s'" % _roles)
return fn(*args, **kwargs)
return decorator
return wrapper
# List of open endpoints,i.e. not requiring a valid JWT.
def auth_audit_log(response):
"""On deployment remove the ``crossdomain`` decorator"""
try:
jdata = json.loads(request.data.decode())
if 'password' in jdata:
jdata['password'] = '*********'
jdata_str = json.dumps(jdata)
except ValueError:
jdata_str = ''
kwargs = {
'module': auth.name,
'user': current_user.name,
'email': current_user.email,
'action': _HTTP_METHOD_TO_AUDIT_MAP[request.method.lower()],
'data': addslashes(jdata_str),
'url': request.url,
'endpoint': request.endpoint,
'ip': request.remote_addr,
'status': response.status,
'timestamp': datetime.datetime.utcNow().strftime('%Y-%m-%d %H:%M:%s')
}
entry = []
for k, v in kwargs.items():
entry.append('{0!s}="{1!s}"'.format(k, v))
entry = ' '.join(entry)
current_app.audit_log.info('{0!s}'.format(entry))
return response
def api_audit_log(response):
"""Saves information about the request in the ``audit_log``
:param response: Server :class:`~flask.Response`
:return: :class:`~flask.Response`
"""
kwargs = {
'module': api.name,
'data': addslashes(request.data.decode()),
'timestamp': datetime.datetime.utcNow().strftime('%Y-%m-%d %H:%M:%s')
}
if not request.view_args and request.method.lower() == 'put':
kwargs['action'] = _HTTP_METHOD_TO_AUDIT_MAP['post']
entry = []
for k, v))
entry = ' '.join(entry)
current_app.audit_log.info('{0!s}'.format(entry))
return response
def all(self):
try:
return getattr(get_mailman_client(), pluralize(self.endpoint))
except AttributeError:
raise MailmanApiError
except MailmanConnectionError as e:
raise MailmanApiError(e)
def cp_audit_log(response):
"""Saves information about the request in the ``audit_log``
:param response: Server :class:`~flask.Response`
:return: :class:`~flask.Response`
"""
try:
jdata = json.loads(request.data.decode())
if 'password' in jdata:
jdata['password'] = '*********'
jdata_str = json.dumps(jdata)
except ValueError:
jdata_str = ''
kwargs = {
'module': cp.name,
'user': g.user.name,
'email': g.user.email, v))
entry = ' '.join(entry)
current_app.audit_log.info('{0!s}'.format(entry))
return response
def build_menu_item(endpoint, name):
is_active = "active" if request.endpoint == endpoint else ""
return Markup(
f'<li class="nav-item {is_active}">'
f'<a class="nav-link" href="{url_for(endpoint)}">{name}</a>'
f'</li>'
)
def build_menu_item_as_filter(endpoint):
is_active = "active" if request.endpoint == endpoint else ""
return Markup(
f'<li class="nav-item {is_active}">'
f'<a class="nav-link" href="{url_for(endpoint)}">{endpoint.upper()}</a>'
f'</li>'
)
def url_for_other_page(page):
args = request.view_args.copy()
args.update(request.args)
args['page'] = page
return url_for(request.endpoint, **args)
def navbar():
try:
return dict(navbar_pages=navbar_pages,
active=request.endpoint)
except RuntimeError:
return {} # maybe we don't care
def sort_link(order):
args = request.view_args.copy()
args['sort'] = order
return url_for(request.endpoint, **args)
def url_for_other_page(page):
args = request.view_args.copy()
args.update(request.args)
args['page'] = page
return url_for(request.endpoint, **args)
def before_request():
if current_user.is_authenticated:
current_user.ping()
if not current_user.confirmed \
and request.endpoint[:5] != 'auth.' \
and request.endpoint != 'static':
return redirect(url_for('auth.unconfirmed'))
def ask_login():
if not current_user.is_anonymous:
return
if getattr(current_app.view_functions[request.endpoint], 'no_login', False):
return
return redirect(url_for('ares.aresLogin', next=request.endpoint))
def ask_login():
if not current_user.is_anonymous:
return
if getattr(current_app.view_functions[request.endpoint], next=request.endpoint))
def app_after_request(response):
if request.endpoint != 'static':
return response
response.cache_control.max_age = 15552000
return response
# jinja_env
def _finish_span(self, response=None, exception=None):
""" Close and finish the active span if it exists. """
span = getattr(g, 'flask_datadog_span', None)
if span:
if span.sampled:
error = 0
code = response.status_code if response else None
method = request.method if request else None
# if we didn't get a response,but we did get an exception,set
# codes accordingly.
if not response and exception:
code = 500
# The 3 next lines might not be strictly required,since `set_traceback`
# also get the exception from the sys.exc_info (and fill the error Meta).
# Since we aren't sure it always work/for insuring no BC break,keep
# these lines which get overridden anyway.
error = 1
span.set_tag(errors.ERROR_TYPE, type(exception))
span.set_tag(errors.ERROR_MSG, exception)
# The provided `exception` object doesn't have a stack trace attached,
# so attach the stack trace with `set_traceback`.
span.set_traceback()
# the endpoint that matched the request is None if an exception
# happened so we fallback to a common resource
resource = code if not request.endpoint else request.endpoint
span.resource = compat.to_unicode(resource).lower()
span.set_tag(http.URL, compat.to_unicode(request.base_url or ''))
span.set_tag(http.STATUS_CODE, code)
span.set_tag(http.METHOD, method)
span.error = error
span.finish()
# Clear our span just in case.
g.flask_datadog_span = None
# Request hook methods
def before_request():
if current_user.is_authenticated\
and not current_user.confirmed \
and request.endpoint[:5] != 'auth.' \
and request.endpoint != 'static':
return redirect(url_for('auth.unconfirmed'))
def test_index(app, bitmap, client):
with app.test_request_context('/bitmapist/'):
assert request.endpoint == 'bitmapist.index'
def test_cohort(app, client):
with app.test_request_context('/bitmapist/cohort'):
assert request.endpoint == 'bitmapist.cohort'
def allow_public_endpoints_only():
public_endpoints = (HealthCheck.__name__.lower(),)
if g.endpoint not in public_endpoints:
abort(401)
def set_name_for_reseller(reseller_id):
if not reseller_id:
return None
if g.endpoint == ApplicationList.__name__.lower():
return generate_reseller_name()
return get_reseller_name(reseller_id)
def get_reseller_info():
reseller_id = request.headers.get('Aps-Instance-Id')
is_new = g.endpoint == ApplicationList.__name__.lower()
reseller_name = set_name_for_reseller(reseller_id)
oauth = get_oauth()
return ResellerInfo(id=reseller_id, name=reseller_name, is_new=is_new, auth=oauth)
def before_request():
g.log = dict()
g.log['out'] = list()
g.log['request'] = log_request(request)
g.endpoint = request.endpoint
if request.blueprint:
g.endpoint = g.endpoint[len(request.blueprint):].lstrip('.')
reseller_info = get_reseller_info()
g.reseller_name = reseller_info.name
g.company_name = 'N/A'
if not reseller_info.name:
allow_public_endpoints_only()
return
if not check_oauth_signature(request):
abort(401)
g.auth = reseller_info.auth
g.reseller = Reseller(reseller_info.name, reseller_info.id, None)
g.reseller.refresh()
if not g.reseller.token and not reseller_info.is_new:
abort(403)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。