Python flask.request 模块,environ() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.environ()。
def request_validate(view):
@wraps(view)
def wrapper(*args, **kwargs):
endpoint = request.endpoint.partition('.')[-1]
# data
method = request.method
if method == 'HEAD':
method = 'GET'
locations = validators.get((endpoint, method), {})
data_type = {"json": "request_data", "args": "request_args"}
for location, schema in locations.items():
value = getattr(request, location, MultiDict())
validator = FlaskValidatorAdaptor(schema)
result = validator.validate(value)
LOG.info("Validated request %s: %s" % (location, result))
if schema.get("maxProperties") == 0:
continue
else:
kwargs[data_type[location]] = result
context = request.environ['context']
return view(*args, context=context, **kwargs)
return wrapper
def auth():
"""
Check user/password and returns token if valid
"""
if settings.API.get('forwarded_host'):
try:
if not request.environ['HTTP_X_FORWARDED_HOST'] == settings.API['forwarded_host']:
raise BadRequest('Invalid HTTP_X_FORWARDED_HOST')
except KeyError:
raise BadRequest('Missing HTTP_X_FORWARDED_HOST')
body = request.get_json()
authenticated, ret = GeneralController.auth(body)
if authenticated:
return ret
else:
raise Unauthorized(ret)
def createTeam():
""" """
if request.environ.get('HTTP_ORIGIN') == request.host_url[:-1]:
newTeam = Team.query.filter_by(team_name=request.args['team']).first()
dbEnv = Aressql.sqliteDB(request.args['report_name'])
if not newTeam:
team = Team(request.args['team'], request.args['team_email'])
db.session.add(team)
db.session.commit()
newTeam = Team.query.filter_by(team_name=request.args['team']).first()
team_id = newTeam.team_id
role = request.args.get('role', 'user')
dbEnv.modify("""INSERT INTO team_def (team_id,team_name,role) VALUES (%s,'%s','%s');
INSERT INTO env_auth (env_id,team_id)
SELECT env_def.env_id,%s
FROM env_def
WHERE env_def.env_name = '%s' ;""" % (team_id, request.args['team'], role, team_id, request.args['report_name']))
return json.dumps('Success'), 200
return json.dumps('Forbidden', 403)
def page_not_found(error):
logdate = datetime.strftime(date.today(), '%Y-%m-%d')
logfn = './logs/activity-'+logdate
user = 'Anonymous' # Username is Anonymous by default
if 'token' in session:
token = session['token']
tokenfilename = 'registered/'+token
with open(tokenfilename, 'r') as f: # for getting username associated with the set token
user = f.readline()[:-1]
with open(logfn, 'a') as f: # logging username,IP addr,error code
log = user+' '+request.environ['REMOTE_ADDR']+' 500\n'
f.write(log)
return render_template('500.html'), 500
# No cache
def post_comment():
"""Add post to article."""
form = PostForm()
article = request.environ["HTTP_REFERER"].split("=")[-1]
tim = time.time()
user = current_user.name
post = Post(author=user, article=article,
message=form.message.data, time=tim)
db.session.add(post)
user = current_user.name
event = Event(author=user,
event="COMMENT", time=time.time())
db.session.add(event)
db.session.commit()
return redirect("/biblio/article=" + article)
def send_error_mail(exception):
"""Sends an error mail to the admin containing the traceback and configuration.
After that,a custom HTTP 500 page is shown.
:param exception: the exception raised
:type exception: ``Exception``
:return:
the HTML to send back in the response,and the HTTP code 500.
:rtype: str,int
"""
# Inspired from <https://github.com/jasonwyatt/Flask-ErrorMail>.
message = Message("Join2 ORCID exception: %s" % exception, sender=CFG_SITE_ADMIN_EMAIL,
recipients=[CFG_SITE_ADMIN_EMAIL])
message_contents = ["Traceback:", "=" * 80, traceback.format_exc(), "\n", "Request information:", "=" * 80]
environ = request.environ
for key in sorted(environ.keys()):
message_contents.append("%s: %s" % (key, environ.get(key)))
message.body = "\n".join(message_contents) + "\n"
mailer.send(message)
return render_template("500.html"), 500
def run_flask_request(environ):
from .wsgi_aux import app
if '_wsgi.input' in environ:
environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])
# Create a request context similar to that of the original request
# so that the task can have access to flask.g,flask.request,etc.
with app.request_context(environ):
# Record the fact that we are running in the Celery worker Now
g.in_celery = True
# Run the route function and record the response
try:
rv = app.full_dispatch_request()
except:
# If we are in debug mode we want to see the exception
# Else,return a 500 error
if app.debug:
raise
rv = app.make_response(InternalServerError())
return (rv.get_data(), rv.status_code, rv.headers)
def wrapped(*args, **kwargs):
# If we are already running the request on the celery side,then we
# just call the wrapped function to allow the request to execute.
if getattr(g, 'in_celery', False):
return f(*args, **kwargs)
# If we are on the Flask side,we need to launch the Celery task,
# passing the request environment,which will be used to reconstruct
# the request object. The request body has to be handled as a special
# case,since Wsgi requires it to be provided as a file-like object.
environ = {k: v for k, v in request.environ.items()
if isinstance(v, text_types)}
if 'wsgi.input' in request.environ:
environ['_wsgi.input'] = request.get_data()
t = run_flask_request.apply_async(args=(environ,))
# Return a 202 response,with a link that the client can use to
# obtain task status that is based on the Celery task id.
if t.state == states.PENDING or t.state == states.RECEIVED or \
t.state == states.STARTED:
return '', 202, {'Location': url_for('tasks.get_status', id=t.id)}
# If the task already finished,return its return value as response.
# This would be the case when CELERY_ALWAYS_EAGER is set to True.
return t.info
def error():
"""This endpoint is used by httpd,which redirects its errors to it."""
try:
status = int(request.environ['REDIRECT_STATUS'])
except Exception:
# if there's an exception,it means that a client accessed this directly;
# in this case,we want to make it look like the endpoint is not here
return api_404_handler()
msg = 'UnkNown error'
# for Now,we just provide specific error for stuff that already happened;
# before adding more,I'd like to see them actually happening with reproducers
if status == 401:
msg = 'Authentication Failed'
elif status == 405:
msg = 'Method not allowed for this endpoint'
raise HTTPError(status, msg)
def flask_cache_key(*args, **kwargs):
"""Create a key for use by Flask-Caching.
Args:
None
Returns:
result: Key to be used
"""
# Use the request URI as part of the key
path = request.path
# This helps to differentiate between varIoUs instances of infoset
# each running on different ports
server_port = request.environ['SERVER_PORT']
# Use a hash of the request arguments as part of the key
args = str(hash(frozenset(request.args.items())))
# Return
result = (
'infoset_flask_{}_{}_{}'.format(
path, server_port, args)[:255].encode('utf-8'))
return result
def run_ctx_request(environ):
"""
run flask request context in celery worker
"""
from blueprints import app # wsgi.app
if '_wsgi.input' in environ:
# an input stream (file-like object) from which the HTTP request body can be read.
# detail: https://www.python.org/dev/peps/pep-0333/#environ-variables
environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])
with app.request_context():
g.in_celery = True
try:
rv = app.full_dispatch_request()
except InternalServerError:
if app.debug:
raise
return app.make_response(InternalServerError())
return (rv.get_data(), rv.headers)
def decorator(*args, **kwargs):
if getattr(g, **kwargs)
environ = {k: v for k, text_types)}
if 'wsgi.input' in request.environ:
environ['_wsgi.input'] = request.get_data() # request.body
task = run_ctx_request.apply_async(args=(environ,))
if task.state == states.PENDING or task.state == states.RECEIVED or \
task.state == states.STARTED:
return '', {'Location': url_for('api.get_status', id=task.id)}
return task.info
def __init__(self, rate_limiter):
super(LogApiCallCount, self).__init__()
if self.stats:
tags = ['version:'+str(Config.API_VERSION_MInor),
'application:rest-api',
'api-key:'+rate_limiter._auth_key.id,
'method:'+request.environ['REQUEST_METHOD'],
'endpoint:'+request.url_rule.rule.split('<')[0].split(request.blueprint)[1]]
ip = get_remote_addr()
self.stats.track(get_remote_addr(),
'api_call_count',
properties=dict(tags=tags,
ip=ip,
ignore_time = True,
))
# self.stats.increment('api.call.count',
# tags=tags,)
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'false', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def twitter():
auth = tweepy.OAuthHandler("T4NRPcEtUrCEU58FesRmRtkdW", "zmpbytgPpSbro6RZcXsKgYQoz24zLH3vYZHOHAAs5j33P4eoRg", "http://"+ request.environ["HTTP_HOST"] + "/auth/twitter")
auth.set_access_token(config.TWITTER_ACCESS_TOKEN, config.TWITTER_ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
try:
if api.me().name:
return redirect(url_for('index'))
except tweepy.TweepError:
pass
redirect_url = auth.get_authorization_url()
session["request_token"] = auth.request_token
return redirect(redirect_url)
def make_response(self, rv):
status_or_headers = headers = None
if isinstance(rv, tuple):
rv, status_or_headers, headers = rv + (None,) * (3 - len(rv))
if rv is None:
raise ValueError('View function did not return a response')
if isinstance(status_or_headers, (dict, list)):
headers, status_or_headers = status_or_headers, None
if not isinstance(rv, self.response_class):
# When we create a response object directly,we let the constructor
# set the headers and status. We do this because there can be
# some extra logic involved when creating these objects with
# specific values (like default content type selection).
if isinstance(rv, (JSONRender, text_type, bytes, bytearray, list, dict)):
rv = self.response_class(rv, headers=headers, status=status_or_headers)
headers = status_or_headers = None
else:
rv = self.response_class.force_type(rv, request.environ)
if status_or_headers is not None:
if isinstance(status_or_headers, string_types):
rv.status = status_or_headers
else:
rv.status_code = status_or_headers
if headers:
rv.headers.extend(headers)
return rv
def force_type(cls, rv, environ=None):
if isinstance(rv, dict) or isinstance(rv, list):
rv = Response(
json.dumps(
rv,
cls=TimestampJSONEncoder,
),
content_type='application/json'
)
return super(CerberusResponse, cls).force_type(rv, environ)
def get_http_info_with_retriever(self, retriever=None):
"""
Exact method for getting http_info but with form data work around.
"""
if retriever is None:
retriever = self.get_form_data
url_parts = urlparse.urlsplit(request.url)
try:
data = retriever()
except Clientdisconnected:
data = {}
headers = dict(get_headers(request.environ))
if self.data_blacklist:
data = apply_blacklist(data, self.data_blacklist)
if self.headers_blacklist:
headers = apply_blacklist(headers, self.headers_blacklist)
return {
'url': '%s://%s%s' % (url_parts.scheme, url_parts.netloc, url_parts.path),
'query_string': url_parts.query,
'method': request.method,
'data': data,
'headers': headers,
'env': dict(get_environ(request.environ)),
}
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def get_script_url():
# Dynamically determine the script's url
# For production use,this is not a great idea. Instead,set it
# explicitly. Remember that for production,webhook urls must start with
# https!
my_url = rm_queryparameters(full_url(request.environ))
# See http://flask.pocoo.org/docs/0.10/api/#flask.request
return my_url
def url_origin(s, use_forwarded_host = False):
# testing if Heroku includes forwarding host
use_forwarded_host = True
include_protocol = True
ssl = (('HTTPS' in s) and s['HTTPS'] == 'on')
sp = s['SERVER_PROTOCOL'].lower()
protocol = sp[:sp.find('/')] + ('s' if ssl else '' )
port = s['SERVER_PORT']
port = '' if ((not ssl and port=='80') or (ssl and port=='443')) else (':' + port)
host = s['HTTP_X_FORWARDED_HOST'] if (use_forwarded_host and ('HTTP_X_FORWARDED_HOST' in s)) \
else (s['HTTP_HOST'] if ('HTTP_HOST' in s) else None)
host = host if (host != None) else (s['SERVER_NAME'] + port)
# The protocol can easily be wrong if we're frontended by a HTTPS proxy
# (Like the standard Heroku setup!)
on_heroku = heroku_env in os.environ
upgrade_insecure_request = request.headers.get('Upgrade-Insecure-Requests')
upgrade_insecure_request = upgrade_insecure_request and upgrade_insecure_request == 1
https_proto = request.headers.get('X-Forwarded-Proto')
https_proto = https_proto and https_proto == 'https'
use_https = on_heroku or upgrade_insecure_request or https_proto
if use_https: # Special handling
protocol = "https"
return protocol + '://' + host
def editScript():
""" """
scriptDtls = request.environ['HTTP_REFERER'].split('?')[0].split('/')
if scriptDtls[-2] == 'run':
script = open(os.path.join(config.ARES_USERS_LOCATION, scriptDtls[-1], "%s.py" % scriptDtls[-1]), 'w')
else:
script = open(os.path.join(config.ARES_USERS_LOCATION, scriptDtls[-2], 'w')
try:
for line in request.data.decode('utf-8').split('\n'):
script.write('%s\n' % line)
except Exception as e:
logging.debug(e)
finally:
script.close()
return 'OK'
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def on_connect():
addr = request.environ['REMOTE_ADDR']
if addr in blacklist:
print('{} was found in the blacklist and was rejected'.format(addr))
shard.update_connection(request)
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def after_request(resp):
"""
Adds HTTP Headers to the response
Arguments
----------
resp : flask.Response object
the Flask response object
"""
resp.headers['Cache-Control'] = 'no-cache'
if app.config['DEV']:
resp.headers['Access-Control-Allow-Origin'] = '*'
resp.headers['Access-Control-Allow-Methods'] = 'GET,POST'
resp.headers['Access-Control-Allow-Headers'] = 'Origin,X-Requested-With,Content-Type,Accept'
source_ip = get_real_source_ip()
structured_log(
level='info',
msg="HTTP Request",
method=request.method,
uri=request.path,
status=resp.status_code,
src_ip=source_ip,
protocol=request.environ['SERVER_PROTOCOL'],
user_agent=request.environ.get('HTTP_USER_AGENT', '-')
)
return resp
def get_real_source_ip():
"""
Returns the real source IP address of the HTTP request.
"""
if 'X-Forwarded-For' in request.headers:
return request.headers.getlist("X-Forwarded-For")[0].rpartition(' ')[-1]
else:
return request.environ['REMOTE_ADDR']
def register():
with open('users.pkl', 'r') as f:
userdict = load(f)
if request.method == 'GET':
logdate = datetime.strftime(date.today(), '%Y-%m-%d')
logfn = './logs/activity-'+logdate
user = 'Anonymous' # Username is Anonymous by default
if 'token' in session:
token = session['token']
tokenfilename = 'registered/'+token
with open(tokenfilename, 'r') as f: # for getting username associated with the set token
user = f.readline()[:-1]
with open(logfn,end-point,request type
log = user+' '+request.environ['REMOTE_ADDR']+' register'+' GET\n'
f.write(log)
return render_template('register.html', userdict=userdict) # inputs for name,email,timezone,phone,aboutme
elif request.method == 'POST':
uname = str(request.form['uname']) # mandatory
email = str(request.form['email']) # mandatory
timezone = request.form['timezone'] # optional
phone = request.form['phone'] # optional
aboutme = request.form['aboutme'] # optional
token = pbkdf2_sha512.encrypt(email) # setting the token as a salted and hashed email
token = token.replace('/', '+') # filenames can't contain '/'
session['token'] = token # set token in session key on successful registration
fn = 'registered' + path.sep + token
with open(fn, 'w') as f: # write reviewer info into a file named by the token
f.write(uname+'\n'+email+'\n'+timezone+'\n'+phone+'\n'+aboutme+'\n')
f.write('--files--\n')
userdict[uname] = email
with open('users.pkl', 'w') as f: # dictionary with keys as usernames and values as emails
dump(userdict, f)
send_email(email, 'Welcome to AROWF!', 'registration_mail', name=uname, token=token) # send welcome email with token
logdate = datetime.strftime(date.today(), '%Y-%m-%d')
logfn = './logs/activity-'+logdate
with open(logfn,request type
log = uname+' '+request.environ['REMOTE_ADDR']+' register'+' POST\n'
f.write(log)
return redirect(url_for('index'))
def token():
if request.method == 'GET':
logdate = datetime.strftime(date.today(),request type
log = user+' '+request.environ['REMOTE_ADDR']+' token'+' GET\n'
f.write(log)
tokenNames = listdir('registered/') # get list of all tokens
return render_template('token.html', user=user, tokenNames=tokenNames) # displays links to help docs for each end-point
elif request.method == 'POST': # if token not set in session key
if request.form['tokeninput'] != 'null':
session['token'] = request.form['tokeninput'] # obtain from form and set it
else:
session.pop('token', None)
logdate = datetime.strftime(date.today(),request type
log = user+' '+request.environ['REMOTE_ADDR']+' token'+' POST\n'
f.write(log)
return redirect(url_for('index'))
def page_not_found(error):
logdate = datetime.strftime(date.today(),error code
log = user+' '+request.environ['REMOTE_ADDR']+' 404\n'
f.write(log)
return render_template('404.html'), 404
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def update_entry():
"""Add a new entry to the bibliography."""
form = BiblioForm()
article_name = request.environ["HTTP_REFERER"].split("=")[-1]
if form.validate_on_submit():
article = BiblioEntry.query.filter_by(ID=form.ID.data).first()
article.ID = form.ID.data
article.ENTRYTYPE = form.typ.data
article.authors = form.author.data
article.title = form.title.data
article.year = form.year.data
article.journal = form.journal.data
article.school = form.school.data
article.url = form.url.data
article.keywords = form.keywords.data
article.tag = form.tag.data
db.session.add(article)
user = current_user.name
event = Event(author=user, article=form.ID.data,
event="UPDATE", time=time.time())
db.session.add(event)
db.session.commit()
return redirect("/biblio/article=" + article_name)
return redirect("/biblio")
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def ensure_secure_request():
is_request_secure = request.environ['wsgi.url_scheme'] == 'https'
if not is_request_secure and not config.allow_insecure_transport:
if request.method in ('POST', 'PUT', 'PATCH'):
# request body already sent in insecure manner
# return error in this case to notify cluster admin
return abort(400)
else:
return redirect(request.url.replace('http://', 'https://', 1))
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def get_debug_flag(default=None):
val = os.environ.get('FLASK_DEBUG')
if not val:
return default
return val not in ('0', 'no')
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), 'r') as swagger_file:
swagger_spec = swagger_file.read()
swagger_spec = swagger_spec.replace('localhost:8088', 'ari:{port}'.format(port=request.environ['SERVER_PORT']))
return make_response(swagger_spec, 200, {'Content-Type': 'application/json'})
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), {'Content-Type': 'application/json'})
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), {'Content-Type': 'application/json'})
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), {'Content-Type': 'application/json'})
def swagger(file_name):
with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), {'Content-Type': 'application/json'})
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。