Python flask 模块,Response() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.Response()。
def get(self, oid):
    """Fetch stored object(s) and return them as a JSON response.

    Per the original description, a single item is returned for a given
    ``oid`` and all items when ``oid`` is None.

    :arg self: The class of the object to be retrieved
    :arg integer oid: the ID of the object in the DB
    :returns: a ``Response`` with mimetype ``application/json`` on success;
        on any exception, the result of ``error.format_exception``
    """
    try:
        # Authorization is checked against the class, not a single instance.
        ensure_authorized_to('read', self.__class__)
        payload = self._create_json_response(self._db_query(oid), oid)
        return Response(payload, mimetype='application/json')
    except Exception as e:
        target_name = self.__class__.__name__.lower()
        return error.format_exception(e, target=target_name, action='GET')
def export_to_csv(self, ids):
    """Stream the documents with the given ids as a CSV attachment.

    The header is the ordered union of all keys seen in the records, and each
    row is aligned to that header (missing keys become empty cells). Values
    are quoted by the ``csv`` module, so commas, quotes and newlines inside a
    value no longer corrupt the output. An empty result yields an empty body
    instead of raising while streaming.

    :arg ids: ids passed to ``self.model.objects(id__in=ids)``
    :returns: a streaming ``Response`` with mimetype ``text/csv`` and a
        ``Content-disposition`` header named after the lower-cased model name
    """
    import csv
    import io

    records = json.loads(self.model.objects(id__in=ids).to_json())

    def generate():
        if not records:
            return
        # dict.fromkeys keeps first-seen order while removing duplicates.
        columns = list(dict.fromkeys(key for rec in records for key in rec))
        buffer = io.StringIO()
        out = csv.writer(buffer, lineterminator='\n')
        rows = [columns]
        rows.extend(
            [str(rec[col]) if col in rec else '' for col in columns]
            for rec in records
        )
        for row in rows:
            out.writerow(row)
            yield buffer.getvalue()
            # Reuse the buffer so only one line is held in memory at a time.
            buffer.seek(0)
            buffer.truncate(0)

    return Response(
        generate(),
        mimetype="text/csv",
        headers={
            "Content-disposition":
                "attachment;filename=%s.csv" % self.model.__name__.lower()
        }
    )
def enqueue_mwoffliner():
    """Validate and enqueue a batch of ``mwoffliner`` tasks.

    The request body must be a JSON list of task configs. Every config is
    validated before any task is sent, so one invalid entry rejects the
    whole batch without enqueuing anything.

    :returns: an empty ``Response`` with status 202
    :raises exception.NotEnoughPrivilege: the caller's token is not admin
    :raises exception.InvalidRequest: the body is not a list, or a config is
        not a dict or lacks ``mwUrl`` / ``adminemail``
    """
    token = UserJWT.from_request_header(request)

    # only admins can enqueue task
    if not token.is_admin:
        raise exception.NotEnoughPrivilege()

    def check_task(config: dict):
        # check config is a dict
        if not isinstance(config, dict):
            raise exception.InvalidRequest()

        # check config has mandatory data
        if config.get('mwUrl') is None or config.get('adminemail') is None:
            raise exception.InvalidRequest()

    def enqueue_task(config: dict):
        task_name = 'mwoffliner'
        celery_task = celery.send_task(task_name, kwargs={
            'token': TaskJWT.new(task_name),
            'config': config
        })

        # Record the task under the id Celery assigned to it.
        TasksCollection().insert_one({
            '_id': celery_task.id,
            'celery_task_name': task_name,
            'status': 'PENDING',
            # Was ``datetime.utcNow()``, which does not exist.
            'time_stamp': {'created': datetime.utcnow(), 'started': None, 'ended': None},
            'options': config,
            'steps': []
        })

    task_configs = request.get_json()
    if not isinstance(task_configs, list):
        raise exception.InvalidRequest()

    # Two passes on purpose: validate everything first, then enqueue.
    for task_config in task_configs:
        check_task(task_config)
    for task_config in task_configs:
        enqueue_task(task_config)

    return Response(status=202)
def retrieveAlertsCyber():
    """ Retrieve Alerts from ElasticSearch and return formatted
        XML with limited alert content

        A cached result for the request URL is returned when present;
        otherwise the alerts are queried, formatted, cached and returned.
        Both paths answer with mimetype ``text/xml``.
    """
    # get result from cache
    cached = getCache(request.url, "url")
    if cached is not False:
        app.logger.debug('Returning /retrieveAlertsCyber from Cache for %s' % str(request.remote_addr))
        # The cached value is the XML stored below, so label it the same way
        # as the uncached path.
        return Response(cached, mimetype='text/xml')

    # query ES
    returnResult = formatAlertsXml(queryAlerts(app.config['MAXALERTS'], checkCommunityIndex(request)))
    setCache(request.url, returnResult, 1, "url")
    app.logger.debug('Returning /retrieveAlertsCyber from ES for %s' % str(request.remote_addr))
    return Response(returnResult, mimetype='text/xml')
def querySingleIP():
    """ Retrieve Attack data from index about a single IP

        A cached result for the request URL is returned when present;
        otherwise the index is queried for the ``ip`` GET parameter and the
        formatted result is cached and returned as ``text/xml``.
    """
    # get result from cache
    cached = getCache(request.url, "url")
    if cached is not False:
        app.logger.debug('Returning /querySingleIP from Cache for %s' % str(request.remote_addr))
        return Response(cached, mimetype='text/xml')

    # query ES
    # NOTE(review): the original line was truncated (unbalanced parentheses,
    # with the tail of a setCache call fused in). The third query argument is
    # reconstructed by analogy with retrieveAlertsCyber -- confirm against
    # queryForSingleIP's signature.
    returnResult = formatSingleIP(queryForSingleIP(
        app.config['MAXALERTS'], request.args.get('ip'), checkCommunityIndex(request)))
    setCache(request.url, returnResult, 60, "url")
    app.logger.debug('Returning /querySingleIP from ES for %s' % str(request.remote_addr))
    return Response(returnResult, mimetype='text/xml')
# Routes with both XML and JSON output
def retrieveAlertsCount():
    """ Retrieve number of alerts in timeframe (GET-Parameter time as decimal or "day")

        Output is JSON when the ``out`` GET parameter equals ``json`` and XML
        otherwise. Results are looked up in, and stored to, the URL cache.
    """
    # Retrieve Number of Alerts from ElasticSearch and return as xml / json
    timeframe = request.args.get('time')
    if not timeframe:
        app.logger.error('No time GET-parameter supplied in retrieveAlertsCount. Must be decimal number (in minutes) or string "day"')
        return app.config['DEFAULTRESPONSE']

    as_json = request.args.get('out') == 'json'

    # get result from cache
    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached) if as_json else Response(cached, mimetype='text/xml')

    # NOTE(review): the original setCache calls were truncated to two
    # arguments and the XML query line had unbalanced parentheses. The value
    # argument is restored; the TTL of 60 mirrors the fragment left in
    # querySingleIP -- confirm the intended lifetime.
    output_format = 'json' if as_json else 'xml'
    returnResult = formatAlertsCount(
        queryAlertsCount(timeframe, checkCommunityIndex(request)), output_format)
    setCache(request.url, returnResult, 60, "url")
    if as_json:
        return jsonify(returnResult)
    return Response(returnResult, mimetype='text/xml')
def retrieveIPs():
    """ Retrieve IPs from ElasticSearch and return formatted XML or JSON with IPs

        Output is JSON when the ``out`` GET parameter equals ``json`` and XML
        otherwise. Results are looked up in, and stored to, the URL cache.
    """
    as_json = request.args.get('out') == 'json'

    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached) if as_json else Response(cached, mimetype='text/xml')

    # NOTE(review): the original query lines had unbalanced parentheses and
    # setCache was truncated to two arguments. The community-index argument
    # and the TTL of 60 are reconstructed by analogy with the sibling routes
    # -- confirm against queryBadIPs / setCache.
    output_format = 'json' if as_json else 'xml'
    returnResult = formatBadIP(
        queryBadIPs(app.config['BADIPTIMESPAN'], checkCommunityIndex(request)),
        output_format)
    setCache(request.url, returnResult, 60, "url")
    if as_json:
        return jsonify(returnResult)
    return Response(returnResult, mimetype='text/xml')
# Routes with JSON output
def getSimpleMessage():
    """Return a fixed HTML notice that POST is required, with status 500."""
    notice = "POST is required for this action."
    return Response(notice, mimetype='text/html', status=500)
def authed(func: Callable[[], str]) -> Callable[[], Union[Response, str]]:
    """ Given a function returns one that requires basic auth

    The wrapped view runs only when the request carries authorization data
    accepted by ``validAuth``; otherwise ``authFailure()`` is returned.
    """
    @wraps(func)
    def decorator():
        credentials = request.authorization
        if not credentials:
            return authFailure()
        if not validAuth(credentials.username, credentials.password):
            return authFailure()
        return func()
    return decorator
def cache(self, seconds=60):
    """Build a decorator that marks a view's response as publicly cacheable.

    The decorated view's return value is converted to a ``flask.Response``
    if needed, then given ``Cache-Control``, ``Expires`` and an ETag.

    :arg seconds: cache lifetime used for ``max-age`` and ``Expires``
    :returns: a decorator for view functions
    """
    def inner_decorator(f):
        @wraps(f)
        def wrapper(*args, **kwds):
            resp = f(*args, **kwds)
            if not isinstance(resp, flask.Response):
                resp = flask.make_response(resp)
            resp.headers['Cache-Control'] = 'public,max-age={}'.format(seconds)
            # HTTP-date needs "%S" (seconds) and a space after the weekday
            # comma; the original used "%s", which is not a portable
            # strftime directive.
            expires_at = time.gmtime(time.time() + seconds)
            resp.headers["Expires"] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", expires_at)
            resp.add_etag()
            return resp
        return wrapper
    return inner_decorator
def require_api_token(self, f):
    """Decorate a view so it requires the configured API token.

    The token is read from ``X-Status-API-Token``, falling back to
    ``X-Live-Status-API-Token``. A mismatch with ``self.settings['api_token']``
    yields an empty JSON object with status 403. Either way the response is
    marked as non-cacheable.

    :arg f: the view function to protect
    :returns: the wrapped view
    """
    @wraps(f)
    def wrapper(*args, **kwds):
        headers = flask.request.headers
        token = headers.get('X-Status-API-Token', headers.get('X-Live-Status-API-Token'))
        if token != self.settings['api_token']:
            resp = flask.make_response(flask.jsonify({}), 403)
        else:
            # NOTE(review): the original line here was garbled
            # ("f(*args, flask.Response):"); restored to the call plus
            # Response coercion used by the sibling ``cache`` decorator.
            resp = f(*args, **kwds)
            if not isinstance(resp, flask.Response):
                resp = flask.make_response(resp)
        resp.headers['Cache-Control'] = 'no-cache,no-store,must-revalidate'
        resp.headers['Pragma'] = 'no-cache'
        resp.headers['Expires'] = '0'
        return resp
    return wrapper
def _chat():
    """Answer one question taken from the query string.

    Reads ``question``, ``session``, ``lang`` (default ``en``), ``query``
    (default ``false``) and ``marker`` (default ``default``) from the request
    arguments and the ``X-Request-Id`` header, passes them to ``ask`` and
    returns ``{'ret', 'response'}`` as JSON.
    """
    params = request.args
    question = params.get('question')
    session = params.get('session')
    lang = params.get('lang', 'en')
    # Only the literal string "true" (any case) enables query mode.
    query = params.get('query', 'false').lower() == 'true'
    marker = params.get('marker', 'default')
    request_id = request.headers.get('X-Request-Id')

    response, ret = ask(
        question, lang, session, query, request_id=request_id, marker=marker)
    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
def _batch_chat():
    """Answer a batch of questions posted as form data.

    Requires a valid ``Auth`` form field; otherwise ``authenticate()`` is
    returned. ``questions`` is a JSON list of ``(idx, question)`` pairs.

    :returns: JSON ``{'ret': 0, 'response': [(idx, response, ret), ...]}``
    """
    auth = request.form.get('Auth')
    if not auth or not check_auth(auth):
        return authenticate()

    questions = json.loads(request.form.get('questions'))
    session = request.form.get('session')
    lang = request.form.get('lang', 'en')

    responses = []
    for idx, question in questions:
        # ``lang`` was read but never passed, which put ``session`` in the
        # language position of ``ask`` (compare the call in ``_chat``).
        response, ret = ask(str(question), lang, session)
        responses.append((idx, response, ret))
    return Response(json_encode({'ret': 0, 'response': responses}),
                    mimetype="application/json")
def _said():
    """Pass the ``session`` and ``message`` query arguments to ``said``.

    :returns: JSON ``{'ret', 'response'}`` built from the result of ``said``
    """
    data = request.args
    session = data.get('session')
    message = data.get('message')
    ret, response = said(session, message)
    # The original json_encode call was truncated after 'ret'; the
    # 'response' key is restored to match the other endpoints.
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
def _rate():
    """Rate an answer using the ``session``, ``index`` and ``rate`` arguments.

    :returns: JSON ``{'ret', 'response'}``; on failure ``ret`` is False and
        ``response`` carries the exception text
    """
    data = request.args
    response = ''
    try:
        ret = rate_answer(data.get('session'), int(
            data.get('index')), data.get('rate'))
    except Exception as ex:
        # ``ret`` was left unbound on this path, and ``ex.message`` does not
        # exist on Python 3 exceptions.
        ret = False
        response = str(ex)
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
def _start_session():
    """Start a session and return its id as JSON.

    Reads ``botname``, ``user``, ``test`` and ``refresh`` from the query
    string, starts a session through ``session_manager`` and stores the bot
    name and user on the session data.
    """
    args = request.args
    botname = args.get('botname')
    user = args.get('user')
    # Flags are true only for the literal string "true" (any case).
    test = args.get('test', 'false').lower() == 'true'
    refresh = args.get('refresh', 'false').lower() == 'true'

    sid = session_manager.start_session(
        user=user, key=botname, test=test, refresh=refresh)
    sdata = session_manager.get_session(sid).sdata
    sdata.botname = botname
    sdata.user = user

    body = json_encode({'ret': 0, 'sid': str(sid)})
    return Response(body, mimetype="application/json")
def _sessions():
    """Return the sessions listed by ``session_manager`` as JSON."""
    body = json_encode({'ret': 0, 'response': session_manager.list_sessions()})
    return Response(body, mimetype="application/json")
def _set_weights():
    """Apply the ``param`` weights to the session given by ``session``.

    :returns: JSON ``{'ret', 'response'}`` built from ``set_weights``
    """
    data = request.args
    param = data.get('param')
    sid = data.get('session')
    ret, response = set_weights(param, sid)
    if ret:
        sess = session_manager.get_session(sid)
        if sess and hasattr(sess.sdata, 'weights'):
            logger.info("Set weights {} successfully".format(sess.sdata.weights))
    else:
        logger.info("Set weights Failed.")
    # The original json_encode call was truncated after 'ret'; the
    # 'response' key is restored to match the other endpoints.
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
def _set_context():
    """Parse ``context`` (``k=v,k=v``) and apply it to the given session.

    :returns: JSON ``{'ret', 'response'}`` built from ``set_context``
    """
    data = request.args
    context_str = data.get('context')
    context = {}
    for tok in context_str.split(','):
        k, v = tok.split('=')
        context[k.strip()] = v.strip()
    sid = data.get('session')
    ret, response = set_context(context, sid)
    # The original json_encode call was truncated after 'ret'; the
    # 'response' key is restored to match the other endpoints.
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
def _remove_context():
    """Remove the comma-separated ``keys`` from the given session's context.

    :returns: JSON ``{'ret', 'response'}`` built from ``remove_context``
    """
    data = request.args
    keys = data.get('keys').split(',')
    sid = data.get('session')
    # NOTE(review): the original call and return were fused and truncated
    # ("remove_context(keys, mimetype=...)"). Restored by analogy with
    # _set_context -- confirm remove_context's signature.
    ret, response = remove_context(keys, sid)
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
def _update_config():
    """Coerce the query-string values and pass them to ``update_config``.

    ``true``/``false`` (any case) become booleans, all-digit strings become
    ``int``, ``digits.digits`` becomes ``float``, anything else stays a string.

    :returns: JSON ``{'ret', 'response'}`` built from ``update_config``
    """
    data = request.args.to_dict()
    # Iterate over a snapshot; ``items()`` also works where ``iteritems``
    # does not exist.
    for k, v in list(data.items()):
        lowered = v.lower()
        if lowered == 'true':
            data[k] = True
        elif lowered == 'false':
            data[k] = False
        # Patterns are anchored at the end: the unanchored integer pattern
        # also matched "1.5" and "12abc", making int() raise and the float
        # branch unreachable.
        elif re.match(r'[0-9]+\Z', v):
            data[k] = int(v)
        elif re.match(r'[0-9]+\.[0-9]+\Z', v):
            data[k] = float(v)
        else:
            data[k] = str(v)
    ret, response = update_config(**data)
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
def _log():
    """Stream the log file line by line as ``text/plain``."""
    # NOTE(review): the constant name's mixed case (LOG_CONfig_FILE) looks
    # like an extraction artefact -- confirm it matches the module constant.
    def generate():
        with open(LOG_CONfig_FILE) as log_file:
            for line in log_file:
                yield line

    stream = generate()
    return Response(stream, mimetype='text/plain')
def _reset_session():
    """Reset the session named by the ``session`` query argument.

    :returns: JSON ``{'ret', 'response'}``; ``ret`` is False with
        ``"No such session"`` when the session manager does not know the id
    """
    sid = request.args.get('session')
    if not session_manager.has_session(sid):
        ret, response = False, "No such session"
    else:
        session_manager.reset_session(sid)
        ret, response = True, "Session reset"
    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
def _dump_history():
    """Call ``dump_history`` and report whether it raised.

    :returns: JSON ``{'ret', 'response'}`` with ``True``/``"Success"`` or
        ``False``/``"Failure"``
    """
    try:
        dump_history()
        # The original lines read ``ret, "Success"`` -- an expression that
        # assigns nothing; the tuple assignment is restored.
        ret, response = True, "Success"
    except Exception:
        ret, response = False, "Failure"
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
def _stats():
    """Dump the history and return statistics for the last ``lookback`` days.

    ``lookback`` defaults to 7.

    :returns: JSON ``{'ret', 'response'}``; on failure ``ret`` is False and
        ``response`` is ``{'err_msg': <exception text>}``
    """
    try:
        data = request.args
        days = int(data.get('lookback', 7))
        dump_history()
        response = history_stats(HISTORY_DIR, days)
        ret = True
    except Exception as ex:
        # The original line read ``ret, {...}`` and assigned nothing, which
        # left both names unbound for the return below.
        ret, response = False, {'err_msg': str(ex)}
        logger.error(ex)
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
def authenticate():
    """Return the JSON body that reports a failed access check (``ret`` 401)."""
    failure = {'ret': 401, 'response': {'text': 'Could not verify your access'}}
    return Response(json_encode(failure), mimetype="application/json")
def add_virtual_tn():
    """
    The VirtualTN resource endpoint for adding VirtualTN's from the pool.

    Expects a JSON body with a ``value`` of at most 18 characters.

    :returns: a JSON ``Response`` confirming the added value
    :raises InvalidAPIUsage: ``value`` is missing, too long, the body is not
        a JSON object, or the virtual TN already exists
    """
    body = request.json
    try:
        value = str(body['value'])
    except (KeyError, TypeError):
        # TypeError covers a missing or non-object JSON body.
        value = None
    # Explicit check instead of ``assert``, which is stripped under -O.
    if value is None or len(value) > 18:
        raise InvalidAPIUsage(
            "required argument: 'value' (str,length <= 18)",
            payload={'reason':
                     'invalidAPIUsage'})

    virtual_tn = VirtualTN(value)
    try:
        db_session.add(virtual_tn)
        db_session.commit()
    except IntegrityError:
        db_session.rollback()
        msg = ("Did not add virtual TN {} to the pool "
               "-- already exists").format(value)
        log.info({"message": msg})
        raise InvalidAPIUsage(
            "Virtual TN already exists",
            payload={'reason':
                     'duplicate virtual TN'})
    return Response(
        json.dumps(
            {"message": "Successfully added TN to pool",
             "value": value}),
        content_type="application/json")
def list_virtual_tns():
    """
    The VirtualTN resource endpoint for listing VirtualTN's from the pool.

    Returns every virtual TN with its session id, plus the pool size and how
    many entries have no session id (available) versus the rest (in use).
    """
    pool = VirtualTN.query.all()
    entries = []
    free_count = 0
    for tn in pool:
        entries.append({'value': tn.value, 'session_id': tn.session_id})
        if tn.session_id is None:
            free_count += 1

    total = len(entries)
    summary = {"virtual_tns": entries,
               "pool_size": total,
               "available": free_count,
               "in_use": total - free_count}
    return Response(json.dumps(summary), content_type="application/json")
def list_proxy_sessions():
    """
    The ProxySession resource endpoint for listing ProxySessions
    from the pool.

    Dates are rendered as ``YYYY-MM-DD HH:MM:SS``; ``expiry_date`` is None
    when the session has none.
    """
    # "%S" is seconds; the original "%s" is not a portable strftime directive.
    stamp_format = '%Y-%m-%d %H:%M:%S'
    sessions = ProxySession.query.all()
    res = [{
        'id': s.id,
        'date_created': s.date_created.strftime(stamp_format),
        'virtual_tn': s.virtual_TN,
        'participant_a': s.participant_a,
        'participant_b': s.participant_b,
        'expiry_date': s.expiry_date.strftime(stamp_format)
        if s.expiry_date else None}
        for s in sessions]
    return Response(
        json.dumps({"total_sessions": len(res), "sessions": res}),
        content_type="application/json")
def delete_session():
    """
    The ProxySession resource endpoint for removing a ProxySession
    to the pool.

    Expects a JSON body with ``session_id``. The session is terminated, both
    participants are sent ``SESSION_END_MSG`` and the outcome is logged.

    :returns: a JSON ``Response`` confirming the session ended
    :raises InvalidAPIUsage: ``session_id`` is missing (or the body is not a
        JSON object), or no session with that id exists (status 404)
    """
    # The original handler named ``noresultFound``, which is not a real
    # exception; SQLAlchemy's ``Query.one()`` raises NoResultFound.
    from sqlalchemy.orm.exc import NoResultFound

    body = request.json
    try:
        session_id = str(body['session_id'])
    except (KeyError, TypeError):
        # TypeError covers a missing or non-object JSON body.
        raise InvalidAPIUsage(
            "required argument: 'session_id' (str)",
            payload={'reason':
                     'invalidAPIUsage'})
    try:
        # Existence check only; the row itself is not used below.
        ProxySession.query.filter_by(id=session_id).one()
    except NoResultFound:
        msg = ("ProxySession {} Could not be deleted because"
               " it does not exist".format(session_id))
        log.info({"message": msg,
                  "status": "Failed"})
        raise InvalidAPIUsage(
            msg, status_code=404,
            payload={'reason':
                     'ProxySession not found'})
    participant_a, participant_b, virtual_tn = ProxySession.terminate(
        session_id)
    recipients = [participant_a, participant_b]
    send_message(
        recipients,
        virtual_tn.value,
        SESSION_END_MSG,
        session_id,
        is_system_msg=True)
    msg = "Ended session {} and released {} back to pool".format(
        session_id, virtual_tn.value)
    log.info({"message": msg, "status": "succeeded"})
    return Response(
        json.dumps({"message": "Successfully ended the session.",
                    "status": "succeeded",
                    "session_id": session_id}),
        content_type="application/json")
def getSwaggerjson():
    """Return the contents of ``./messages.swagger.json`` as a JSON response.

    :returns: a ``Response`` with status 200 and mimetype ``application/json``
    """
    spec_path = os.path.join("./", "messages.swagger.json")
    # ``with`` closes the file even if reading fails; the local is no longer
    # called ``json``, which shadowed the module name.
    with open(spec_path, "r") as spec_file:
        spec_text = spec_file.read()
    return Response(response=spec_text,
                    status=200,
                    mimetype="application/json")
def corpora_list():
    """Return sentences as JSON, filtered by the optional query arguments.

    ``page`` defaults to 1; ``query``, ``category`` and ``doc`` default to
    None. ``doc`` is URL-unquoted before use.
    """
    args = request.args
    page = args.get('page', 1)
    query = args.get('query')
    category = args.get('category')
    doc = unquote(args['doc']) if 'doc' in args else None

    sentences = get_sentences(page=page, query=query, category=category, document=doc)
    return Response(json.dumps(sentences), mimetype='application/json')
def corpora_docs_list():
    """Return the result of ``get_doc_list()`` as a JSON response."""
    body = json.dumps(get_doc_list())
    return Response(body, mimetype='application/json')
def docs_list():
    """Return the queue list as JSON, filtered by the query arguments.

    ``page`` defaults to 1; ``doc_type`` and ``state`` default to None. A
    pending state also selects processing entries, and a processed state
    also selects errored ones.
    """
    args = request.args
    page = args.get('page', 1)
    doc_type = args.get('doc_type')
    state = args.get('state')

    # The requested state is always included, even when it is None.
    states = [state]
    if state == ST_PENDING:
        states.append(ST_PROCESSING)
    if state == ST_PROCESSED:
        states.append(ST_ERROR)

    queue = get_queue_list(page=page, doc_type=doc_type, states=states)
    return Response(json.dumps(queue, default=datetime_handler),
                    mimetype='application/json')
def geocoding_list():
    """Return the geocoding list as JSON.

    ``activity_id`` and ``queue_id`` are optional query arguments and
    default to None.
    """
    args = request.args
    activity_id = args.get('activity_id')
    queue_id = args.get('queue_id')

    geocodings = get_geocoding_list(activity_id=activity_id, queue_id=queue_id)
    body = json.dumps(geocodings, default=datetime_handler)
    return Response(body, mimetype='application/json')
def activity_list():
    """Return the activity list as JSON.

    ``document_id`` is an optional query argument and defaults to None.
    """
    document_id = request.args.get('document_id')
    # NOTE(review): the original return line had unbalanced parentheses.
    # ``default=datetime_handler`` is restored to match the sibling list
    # endpoints -- confirm it was part of the original call.
    return Response(json.dumps(get_activity_list(document_id=document_id),
                               default=datetime_handler),
                    mimetype='application/json')
def process_demoroom_members():
    """Handle requests for the demo room membership endpoint.

    A POST with an ``email`` form field sends the welcome message and answers
    201 with its reply; a POST without it answers 400 with a JSON error. Any
    other method answers ``"OK"`` with status 200.
    """
    # NOTE: the request authorization check (valid_request_check) is
    # currently disabled for this endpoint.
    if request.method != "POST":
        return Response("OK", status=200)

    form = request.form
    try:
        email = form["email"]
        sys.stderr.write("Adding %s to demo room.\n" % (email))
        reply = send_welcome_message(email)
        return Response(reply, content_type='application/json', status=201)
    except KeyError:
        problem = {"Error":"API Expects dictionary object with single element and key of 'email'"}
        return Response(json.dumps(problem), status=400)
def remove_authorised_client(ip_addr_str=None):
    """Forgets that a client has been seen recently to allow running tests

    The client is identified by the ``X-Forwarded-For`` request header;
    ``ip_addr_str`` is not used. Always answers with status 204.
    """
    forwarded_for = request.headers["X-Forwarded-For"]
    # pop() with a default removes the entry only if it is present.
    _client_map.pop(forwarded_for, None)
    return Response(status=204)
def custom_css():
    """Serve the ``css`` configuration value with mimetype ``text/css``."""
    stylesheet = get_config("css")
    return Response(stylesheet, mimetype='text/css')
# Static HTML files
def post_json(self, endpoint: str, data: JsonDict) -> Response:
    """POST ``data`` to ``endpoint`` as a JSON body via ``self.client``."""
    encoded = json.dumps(data)
    return self.client.post(endpoint, content_type="application/json", data=encoded)
def test_permalinks_work(self):
    """A prediction's slug can be exchanged for its stored request/response.

    Posts a prediction, checks that a slug comes back, that an unknown slug
    is rejected with 400, and that the real slug returns the model name plus
    the original request and response data.
    """
    app = make_app(build_dir=self.TEST_DIR, demo_db=InMemoryDemoDatabase())
    app.predictors = {"counting": CountingPredictor()}
    app.testing = True
    client = app.test_client()

    def post(endpoint: str, data: JsonDict) -> Response:
        return client.post(endpoint, content_type="application/json", data=json.dumps(data))

    request_data = {"some": "input"}
    prediction = post("/predict/counting", data=request_data)
    assert prediction.status_code == 200
    result = json.loads(prediction.get_data())
    slug = result.get("slug")
    assert slug is not None

    # An unknown slug is a client error.
    rejected = post("/permadata", data={"slug": "not the right slug"})
    assert rejected.status_code == 400

    accepted = post("/permadata", data={"slug": slug})
    assert accepted.status_code == 200
    stored = json.loads(accepted.get_data())
    assert set(stored.keys()) == {"modelName", "requestData", "responseData"}
    assert stored["modelName"] == "counting"
    assert stored["requestData"] == request_data
    assert stored["responseData"] == result
def show_page(page):
    """Render ``<page>.html`` when ``page`` is a valid page name.

    A name is valid when it has at least ``MIN_PAGE_NAME_LENGTH`` characters
    and consists only of ASCII letters. Any failure (invalid name or an
    error while rendering) is reported to rollbar and answered with a
    404 response.

    :arg page: requested page name
    :returns: the rendered template, or a ``"404 Not Found"`` response with
        status 404
    """
    try:
        valid_length = len(page) >= MIN_PAGE_NAME_LENGTH
        valid_name = re.match('^[a-z]+$', page.lower()) is not None
        if valid_length and valid_name:
            return render_template("{}.html".format(page))
        else:
            msg = "Sorry,Couldn't find page with name {}".format(page)
            raise NotFound(msg)
    except Exception:
        # ``except Exception`` rather than a bare ``except`` so that
        # SystemExit / KeyboardInterrupt are not swallowed.
        rollbar.report_exc_info()
        # The body said "404" but was sent with the default status 200.
        return Response("404 Not Found", status=404)
def make_response(self, response_data, headers):
    """Build a CSV download response for ``response_data``.

    Sets ``Content-disposition`` and ``Content-Type`` on the given
    ``headers`` mapping (mutating it) and returns ``(response, headers)``.
    """
    # Todo: pass in optional filename
    download_name = "response.csv"
    headers["Content-disposition"] = "attachment; filename=\"{}\"".format(download_name)
    headers["Content-Type"] = "{}; charset=utf-8".format(CSV_CONTENT_TYPE)
    csv_body = self.csvify(response_data)
    return Response(csv_body, mimetype=CSV_CONTENT_TYPE), headers
def csvify(self, response_data):
    """
    Yield the CSV text for a JSON-like object (Python `dict` or list of `dicts`).

    If ``response_data`` has an ``"items"`` key its value is the list of
    rows; otherwise ``response_data`` itself is the single row. Columns
    follow ``self.response_schema.csv_column_order`` when present, followed
    by any remaining fields of the first row; without it, the first row's
    field order is used. The whole document is yielded as one string.
    Nothing but an empty string is produced when there are no rows and no
    configured column order.
    """
    if "items" in response_data:
        list_response_data = response_data["items"]
    else:
        list_response_data = [response_data]

    # An empty "items" list has no first row to take field names from.
    response_fields = list(list_response_data[0].keys()) if list_response_data else []

    column_order = getattr(self.response_schema, "csv_column_order", None)
    if column_order is None:
        # We should still be able to return a CSV even if no column order has been specified
        column_names = response_fields
    else:
        # Copy: extending the schema's own list would grow the shared
        # attribute on every call.
        column_names = list(column_order)
        # The column order may be only partially specified
        column_names.extend(
            field_name for field_name in response_fields if field_name not in column_names
        )

    if not column_names and not list_response_data:
        yield ""
        return

    output = StringIO()
    csv_writer = writer(output, quoting=QUOTE_MINIMAL)
    csv_writer.writerow(column_names)
    csv_writer.writerows(
        [item[column] for column in column_names] for item in list_response_data
    )

    # Ideally we'd want to `yield` each line to stream the content
    # But something downstream seems to break streaming
    yield output.getvalue()
def setup(self):
    """Register the ``/anomaly`` and ``/monitor`` POST routes on ``self.app``.

    Both routes accept only requests whose ``Content-Type`` header is exactly
    ``application/json``. The JSON body is handed to the matching method of
    ``self.app.monitor`` and the outcome is rendered by
    ``self.handle_response``. Any other content type is answered with 415.
    """
    def unsupported_media_type():
        # Echo the raw body as text: the parsed JSON may be None or a dict,
        # neither of which can be concatenated to a string.
        return Response("Unsupported media type\n" + request.get_data(as_text=True), status=415)

    @self.app.route('/anomaly', methods = ['POST'])
    def api_anomaly():
        if request.headers.get('Content-Type') != 'application/json':
            return unsupported_media_type()
        data = request.json
        success = self.app.monitor.process_anomaly_data(data)
        return self.handle_response(success, data)

    @self.app.route('/monitor', methods = ['POST'])
    def api_monitor():
        # NOTE(review): the original tail of this route was truncated
        # ("handle_response(success, status=415)"); restored to mirror
        # api_anomaly.
        if request.headers.get('Content-Type') != 'application/json':
            return unsupported_media_type()
        data = request.json
        success = self.app.monitor.process_monitor_flows(data)
        return self.handle_response(success, data)
def handle_response(self, success, data):
    """Echo ``data`` as JSON behind an OK (200) or BAD REQUEST (400) line.

    :arg success: truthy selects the 200 response, falsy the 400 one
    :arg data: JSON-serialisable payload appended to the status line
    """
    json_data = json.dumps(data)
    prefix, code = ("OK\n", 200) if success else ("BAD REQUEST\n", 400)
    return Response(prefix + json_data, status=code)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。