Python flask.request 模块,headers() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.headers()。
def auth_jwt_project(short_name):
    """Issue a JWT for the project identified by ``short_name``.

    The caller authenticates by sending the project's secret key in the
    ``Authorization`` header.  Aborts with 403 when the header is absent or
    empty, and with 404 when no project matches or the key differs.
    """
    supplied_key = request.headers.get('Authorization')
    if not supplied_key:
        return abort(403)

    project = project_repo.get_by_shortname(short_name)
    if not (project and project.secret_key == supplied_key):
        return abort(404)

    # The token is signed with the project's own secret key.
    claims = {'short_name': short_name, 'project_id': project.id}
    return jwt.encode(claims, project.secret_key, algorithm='HS256')
def zmirror_enter(input_path='/'):
    """Entry point that delegates to ``main_function()``.

    Runs ``main_function`` for ``input_path`` and then copies the extra
    response headers and cookies collected on ``parse`` onto the response.
    Any exception raised along the way yields a traceback error page.
    """
    # NOTE(review): the original docstring and comments were destroyed by an
    # encoding error; the descriptions here are derived from the code only.
    try:
        response = main_function(input_path=input_path)

        # Apply the additional response headers.
        for header_name, header_value in parse.extra_resp_headers.items():
            response.headers.set(header_name, header_value)

        # Append the additional cookies, one Set-Cookie header each.
        for _cookie_name, cookie_string in parse.extra_cookies.items():
            response.headers.add("Set-Cookie", cookie_string)

        return response
    except:  # coverage: exclude
        return generate_error_page(is_traceback=True)
# noinspection PyUnusedLocal
def after_request_stuff(resp):
    """Finalize a response: add CORS headers, clean up, and log the request.

    Returns the same ``resp`` object it was given.
    """
    # support CORS
    cors_headers = (
        ('Access-Control-Allow-Origin', "*"),
        ('Access-Control-Allow-Methods', "POST,GET,OPTIONS,PUT,DELETE,PATCH"),
        ('Access-Control-Allow-Headers', "origin,content-type,accept,x-requested-with"),
    )
    for header_name, header_value in cors_headers:
        resp.headers[header_name] = header_value

    # remove session
    db.session.remove()

    # without this jason's heroku local buffers forever
    sys.stdout.flush()

    # log request for analytics
    log_request(resp)
    return resp
def make_request(url, json_data=None, method="post", headers=None):
    """Call ``url`` through ``requests`` and return the decoded JSON body.

    ``method`` names the ``requests`` function to use (e.g. ``"post"``).
    When ``headers`` is omitted, JSON content-type/accept headers are sent.
    Returns ``None`` when the response body is empty or when any exception
    occurs (the exception is logged, not raised).
    """
    if headers is None:
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    call_kwargs = {"headers": headers}
    if json_data:
        # Only attach a JSON body when there is something to send.
        call_kwargs["json"] = json_data

    try:
        send = getattr(requests, method)
        response = send(url, **call_kwargs)
        if response.content:
            return json.loads(response.content)
    except Exception as e:
        app.ext_logger.exception(e)
    return None
def request_remote_site():
    """Request the remote resource and store the response on ``parse``.

    When the remote answers with a 4xx/5xx status, ``guess_correct_domain``
    is consulted and, if it yields a result, that result replaces the stored
    response.
    """
    # NOTE(review): the original comments were lost to an encoding error;
    # these descriptions are derived from the code only.
    parse.remote_response = send_request(
        parse.remote_url,
        method=request.method,
        headers=parse.client_header,
        data=parse.request_data_encoded,
    )

    # Warn when the URL reported on the response differs from the one asked for.
    final_url = parse.remote_response.url
    if final_url != parse.remote_url:
        warnprint("requests's remote url", final_url,
                  'does no equals our rewrited url', parse.remote_url)

    if not 400 <= parse.remote_response.status_code <= 599:
        return

    # Error status: try guessing the domain.
    dbgprint("Domain guessing for", request.url)
    guessed = guess_correct_domain()
    if guessed is not None:
        parse.remote_response = guessed
def put(self):
    """Change the password of the current user.

    Expects ``old_password`` and ``password`` in the JSON body.  Responds
    with 404 when no enabled user matches ``g.current_user`` and with 403
    when ``old_password`` does not verify.  On success the request's
    ``Authorization`` header value is appended to the token blacklist.
    """
    user = (
        User.query
        .filter(User.disabled == 0)
        .filter(User.id_user == g.current_user)
        .first()
    )
    abort_if_none(user, 404, 'User not found')

    old_password_ok = check_password_hash(user.password, request.json['old_password'])
    if not old_password_ok:
        return msg('Old password incorrect'), 403

    # NOTE(review): the new password is assigned as received while the check
    # above compares against a hash -- confirm the model hashes on assignment.
    user.password = request.json['password']
    db.session.commit()

    cache.blacklisted_tokens.append(request.headers['Authorization'])
    return msg('success!')
def _process_request(self, request):
    """Validate an incoming JSON request and dispatch it as a trigger.

    Args:
        request: The incoming HTTP request; its ``Content-Type`` header and
            parsed ``json`` body are inspected.

    Returns:
        tuple: ``('ok', 200)`` when the payload was dispatched, otherwise
        ``('fail', 415)``.
    """
    self._logger.info('[Ghost2logger]: Received Request')
    if request.headers.get('Content-Type') != 'application/json':
        return ('fail', 415)

    payload = request.json
    required_keys = ('pattern', 'host', 'hostpattern', 'message')
    # The former ``'pattern' and 'host' and ... in payload`` expression only
    # tested the last key; every required key has to be present.
    if not isinstance(payload, dict) or not all(key in payload for key in required_keys):
        return ('fail', 415)

    self._logger.info(payload)
    self._logger.debug('[ghost2logger_sensor]: processing request'
                       ' {}'.format(payload))
    self._sensor_service.dispatch(trigger=self._pmatch,
                                  payload=payload)
    return ('ok', 200)
def get_inBox():
    """Serve the inbox graph in the representation requested via ``Accept``.

    JSON-LD is served when the ``Accept`` header is missing, empty, ``*/*``
    or contains ``text/html``.  A value listed in ``ACCEPTED_TYPES`` is used
    as the serialization format directly; anything else yields a 415.

    Returns:
        The response object, or a ``(message, 415)`` tuple.
    """
    # ``.get`` instead of indexing: a request without an Accept header must
    # fall back to JSON-LD rather than fail on the missing key.
    accept = request.headers.get('Accept')
    pyldnlog.debug("Requested inBox data of {} in {}".format(request.url, accept))

    if not accept or accept == '*/*' or 'text/html' in accept:
        content_type = 'application/ld+json'
    elif accept in ACCEPTED_TYPES:
        content_type = accept
    else:
        return 'Requested format unavailable', 415

    resp = make_response(inBox_graph.serialize(format=content_type))
    resp.headers['Content-Type'] = content_type
    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET,HEAD,POST"
    resp.headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type",<http://www.w3.org/ns/ldp#RDFSource>; rel="type",<http://www.w3.org/ns/ldp#Container>; rel="type",<http://www.w3.org/ns/ldp#BasicContainer>; rel="type"'
    resp.headers['Accept-Post'] = 'application/ld+json,text/turtle'
    return resp
def get_notification(id):
    """Serve a single notification graph in the requested representation.

    The graph is looked up under ``pyldnconf._inBox_url + id``; a missing
    graph yields 404.  JSON-LD is served when ``Accept`` is absent, ``*/*``
    or contains ``text/html``; a value in ``ACCEPTED_TYPES`` is used as the
    format directly; anything else yields 415.
    """
    pyldnlog.debug("Requested notification data of {}".format(request.url))
    pyldnlog.debug("Headers: {}".format(request.headers))

    # Check if the named graph exists
    graph_key = pyldnconf._inBox_url + id
    pyldnlog.debug("Dict key is {}".format(graph_key))
    if graph_key not in graphs:
        return 'Requested notification does not exist', 404

    accept = request.headers.get('Accept')
    if accept is None or accept == '*/*' or 'text/html' in accept:
        chosen_format = 'application/ld+json'
    elif accept in ACCEPTED_TYPES:
        chosen_format = accept
    else:
        return 'Requested format unavailable', 415

    resp = make_response(graphs[graph_key].serialize(format=chosen_format))
    resp.headers['Content-Type'] = chosen_format
    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET"
    return resp
def pay_user(to_user_name, to_address, amount):
    """
    Uses a BitTransferRequests to do an off chain payment.

    Builds a mock 402 response carrying the price, address and username
    headers, asks ``requests.make_402_payment`` for the payment, and redeems
    it through a ``BitTransfer`` created for ``to_user_name``.
    """
    payment_headers = {
        BitTransferRequests.HTTP_BITCOIN_PRICE: amount,
        BitTransferRequests.HTTP_BITCOIN_ADDRESS: to_address,
        BitTransferRequests.HTTP_BITCOIN_USERNAME: to_user_name,
    }

    mock_response = MockRequest()
    mock_response.headers = payment_headers
    mock_response.url = 'http://10.244.119.122:11116'
    logger.debug("Making 402 payment request with headers: {}".format(mock_response))

    payment = requests.make_402_payment(mock_response, amount)
    logger.debug("Have the payment: {}".format(payment))

    transfer = BitTransfer(wallet, username=to_user_name)
    logger.debug("Have the transfer: {}".format(transfer))
    return transfer.redeem_payment(amount, payment)
def to_paginated_list(self, result, _ns, _operation, **kwargs):
    """
    Convert a controller result to a paginated list.

    The result format is assumed to meet the contract of this page class's `parse_result` function.

    Returns a ``(paginated_list, headers)`` pair; ``headers`` is an empty dict.
    """
    items, context = self.parse_result(result)
    page_of_items = PaginatedList(
        items=items,
        _page=self,
        _ns=_ns,
        _operation=_operation,
        _context=context,
    )
    return page_of_items, {}
def create_restmin_app(app_name, config_path, base_url, request_processor):
    """Build a Flask app exposing a generic resource endpoint.

    Args:
        app_name: Name passed to the ``Flask`` constructor.
        config_path: Object/import path given to ``app.config.from_object``.
        base_url: URL prefix placed in front of the ``<resource>`` segment.
        request_processor: Forwarded to ``api_generic`` for every request.

    Returns:
        The configured Flask application.
    """
    from flask import Flask

    app = Flask(app_name)
    app.config.from_object(config_path)

    @app.route(base_url + '<string:resource>', methods=['GET', 'POST', 'PUT'])
    @crossdomain(origin='*')
    def _api_generic(resource):
        return api_generic(request, request_processor, current_app, resource)

    @app.route(base_url + '<string:resource>', methods=['OPTIONS'])
    @crossdomain(origin='*', headers='Content-Type,X-User,X-Token')
    def _options(resource):
        # The parameter must carry the name of the URL variable: Flask passes
        # it as a keyword argument, so any other name raises TypeError.
        return jsonify({'Allow': 'GET,POST,PUT'}), 200

    return app
def _provide_client_handler(self, section, name, kwargs_updator=None):
    """Return a decorator that registers an authenticated handler.

    The registered handler authenticates the current request, loads the
    client (aborting with 401 when none is found), exposes it as
    ``g.ac_client`` and as the ``client`` keyword argument, and then calls
    the decorated function.  A ``None`` result becomes an empty 204.
    The decorated function itself is returned unchanged.
    """
    # NOTE(review): ``name`` is not used anywhere in this block -- confirm
    # whether ``_add_handler`` was meant to receive it.
    def decorator(func):
        @wraps(func)
        def authenticated_handler(**kwargs):
            client_key = self.auth.authenticate(
                request.method,
                request.url,
                request.headers)
            client = self.client_class.load(client_key)
            if not client:
                abort(401)

            g.ac_client = client
            kwargs['client'] = client
            if kwargs_updator:
                kwargs.update(kwargs_updator(**kwargs))

            outcome = func(**kwargs)
            return outcome if outcome is not None else ('', 204)

        self._add_handler(section, authenticated_handler)
        return func

    return decorator
def auth_required(f):
    """Decorator that requires a valid ``X-AuthToken`` header.

    The wrapped view receives the resolved token as the ``token`` keyword
    argument.  Responds with 403 when the header is missing, the token is
    not found, or the token has expired (expired tokens are deleted).  Any
    exception rolls back the session and produces a 500 response.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        try:
            # ``.get`` so that a missing header reaches the 403 branch
            # instead of raising and being reported as a 500.
            token_id = request.headers.get('X-AuthToken')
            if token_id is None:
                return {'state': 'fail', 'message': 'Authorization required'}, 403

            token = Token.query.get(token_id)
            if token is None:
                return {'state': 'fail', 'message': 'Invalid token'}, 403

            # ``datetime.datetime.now`` -- the capitalised ``Now`` does not exist.
            if token.expires_at is not None and token.expires_at < datetime.datetime.now():
                token.delete()
                db.session.flush()
                db.session.commit()
                return {'state': 'fail', 'message': 'Token was expired'}, 403

            return f(token=token, *args, **kwargs)
        except Exception as e:
            db.session.rollback()
            log.exception(e)
            return {'state': 'fail', 'message': str(e)}, 500
    return decorated
def catalog():
    """Return the catalog of services handled by this broker.

    GET /v2/catalog

    Header:
        X-broker-Api-Version: <version>

    Returns:
        JSON document with details about the services offered through this
        broker.  Aborts with 412 when the version header is missing, is not
        a number, or is lower than ``X_broKER_API_VERSION``.
    """
    api_version = request.headers.get('X-broker-Api-Version')

    # Check broker API version.  A non-numeric header counts as incompatible
    # rather than surfacing as an unhandled ValueError.
    try:
        compatible = bool(api_version) and float(api_version) >= X_broKER_API_VERSION
    except ValueError:
        compatible = False
    if not compatible:
        abort(412, "Precondition Failed. Missing or incompatible %s. Expecting version %0.1f or later" % (X_broKER_API_VERSION_NAME, X_broKER_API_VERSION))

    services = {"services": [pseudo_service]}
    return jsonify(services)
#
# Provision
#
def catch_all(path):
    """Answer any path with a status and headers chosen from the path.

    ``/ambassador/...`` and paths ending in ``/nohdr/`` get a bare 200;
    paths ending in ``/good/`` or ``/demo/`` get 200 plus ``X-Hurkle``;
    everything else gets 403 plus ``X-Test``.
    """
    if not path.startswith('/'):
        path = '/' + path

    resp = Response('You want path: %s' % path)

    if path.startswith('/ambassador/'):
        resp.status_code = 200
    elif path.endswith(("/good/", "/demo/")):
        resp.status_code = 200
        resp.headers['X-Hurkle'] = 'Oh baby,oh baby.'
    elif path.endswith("/nohdr/"):
        # Don't add the header.
        resp.status_code = 200
    else:
        resp.status_code = 403
        resp.headers['X-Test'] = 'Should not be seen.'

    return resp
def calendar_push():
    """Handle a push notification for a watched calendar.

    Looks the calendar up by the ``X-Goog-Channel-ID`` header.  An active
    calendar is synced (and the user too, when the sync reports updates);
    sync errors are printed, not raised.  An unknown channel is reported
    with its ``X-Goog-Resource-ID``.  Always returns an empty response.
    """
    watch_id = request.headers['X-Goog-Channel-ID']
    db_calendar = Calendar.get_by_watch_id(watch_id)
    if db_calendar:
        if db_calendar.active:
            user = db_calendar.key.parent().get()
            client = make_client(user)
            # ``utcnow`` -- the capitalised ``utcNow`` does not exist.
            sync_time = datetime.utcnow()
            try:
                updates = sync_calendar(sync_time, user, client, db_calendar)
                if updates:
                    sync_user(sync_time, client)
            except Exception as e:
                # Best effort: a failed sync must not fail the webhook.
                print(e)
    else:
        resource_id = request.headers['X-Goog-Resource-ID']
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('UnkNown push notification for resource id %s' % resource_id)
    return make_response()
def authenticate():
    """Authenticate the request from its ``X-Auth-Token`` JWT.

    Endpoints whose name contains ``tenant_provisioned`` bypass the check
    and run as the ``phone_home`` user.  Otherwise the token is decoded with
    the tenant secret and ``g.user`` is set from its ``user`` claim.  A
    missing header or claim, or any invalid token, aborts with 401.
    """
    # logger.debug("endpoint request: %s" % request.endpoint)
    if re.search('tenant_provisioned', str(request.endpoint)):
        g.user = "phone_home"
        logger.info("Authentication bypassed: tenant_provisioned")
        return

    try:
        decoded = jwt.decode(request.headers['X-Auth-Token'], credentials['tenant_secret'], algorithms=['HS256'])
        g.user = decoded['user']
    except KeyError:
        logger.error("Error: key error.")
        abort(401)
    except jwt.DecodeError:
        logger.error("Error: decode error")
        abort(401)
    except jwt.InvalidTokenError:
        # Base class of PyJWT's validation errors (e.g. an expired
        # signature); previously these escaped as unhandled exceptions.
        logger.error("Error: invalid token")
        abort(401)
def auth_redirect():
    """Finish the authentication round-trip and send the user onward.

    An error message from ``ds_authentication.auth_redirect()`` is flashed.
    If the session holds a truthy ``auth_redirect`` target it is cleared and
    the user is redirected there; otherwise to the base URL.
    """
    # The result is False or an error message.  The Flash technique shows
    # the message on the home page; a simpler alternative would be an
    # intermediate page with a "Continue" link to the home page.
    error_message = ds_authentication.auth_redirect()
    if error_message:
        flash(error_message)
    # flash("Debug info: " + str(request.headers))

    # Authentication / re-authentication was successful;
    # figure out what to do next.
    target = session.get("auth_redirect")
    if target:
        session["auth_redirect"] = False
        return redirect(target)
    return redirect(ds_recipe_lib.get_base_url(1))
def handle_auth():
    """
    Forward the auth request to swift
    replace the given storage url with our own:
    'X-Storage-Url': 'http://192.168.209.204:8080/v1/AUTH_test'
    becomes
    'X-Storage-Url': 'http://localhost:4000/v1/AUTH_test'
    this is the first request any client makes; we passed on an auth-token from swift
    which is used in further requests
    :return: a Response carrying swift's status, headers and body
    """
    clientHeaders = request.headers
    swiftStatus, swiftHeaders, swiftBody = httpBackend.doAuthGetToken(reqHead=clientHeaders, method="GET")
    # Three placeholders need three arguments; with only two, ``format``
    # raised IndexError before any response could be returned.
    log.debug("swift response: {} {} {}".format(swiftStatus, swiftHeaders, swiftBody))
    if 200 == swiftStatus:
        replaceStorageUrl(swiftResponse=swiftHeaders)
        log.debug("proxy response: {} {} {}".format(swiftStatus, swiftHeaders, swiftBody))
    return Response(status=swiftStatus, headers=swiftHeaders, response=swiftBody)
def authenticate(self, f):
    """Decorator protecting a route with an ``Access-Token`` header.

    The token is looked up in ``self.redis``; the value found is stored as
    ``g.user_id``.  A missing header or an unknown token aborts with 401.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if 'Access-Token' not in request.headers:
            return abort(401, "Set token to access protected routes")

        user_id = self.redis.get(request.headers['Access-Token'])
        if not user_id:
            return abort(401, "Token is invalid or has expired")

        g.user_id = user_id
        return f(*args, **kwargs)

    return decorated
def test(ALERTID=None):
    """Log the auth header, the request body and the parsed ``moreinfo`` field.

    Responds with success; the payload is not sent anywhere.  A missing
    header or ``moreinfo`` key is ignored.
    """
    try:
        auth_header = request.headers['Authorization']
    except KeyError:
        pass
    else:
        logging.info(auth_header)

    raw_body = request.get_data()
    if raw_body:
        logging.info(raw_body)

    parsed = parse(request)
    try:
        more_info = parsed['moreinfo']
    except KeyError:
        pass
    else:
        logging.info(more_info)

    return "OK"
# Import individual shims
def key_create(adapter_id):
    """Creates a key using a certain adapter.

    Fails with 501 for an unknown adapter and 401 when the adapter rejects
    the request headers; otherwise relays the adapter's error or data
    together with the status it reports.
    """
    adapter = get_adapter(adapter_id)
    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)

    credentials_ok = adapter.do_verify(request.headers)
    if not credentials_ok:
        return output.failure("Credential verification Failed. Please check your credentials and try again.", 401)

    outcome = adapter.do_key_create(request.headers, request.json)
    if 'error' in outcome:
        return output.failure(outcome['error'], outcome['status'])
    return output.success(outcome['data'], outcome['status'])
def key_query(adapter_id, key_id):
    """Queries data about a key using a certain adapter.

    Fails with 401 for an unknown adapter; otherwise relays the adapter's
    error or data together with the status it reports.
    """
    adapter = get_adapter(adapter_id)
    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 401)

    result = adapter.do_key_query(request.headers, key_id)
    if 'error' in result:
        return output.failure(result['error'], result['status'])
    # The success path previously fell off the end and returned None;
    # report the data the same way ``key_create`` does.
    return output.success(result['data'], result['status'])
def key_delete(adapter_id, key_id):
    """Deletes a key using a certain adapter.

    Fails with 401 for an unknown adapter.  When the adapter returns a dict
    containing ``error`` it is relayed as a failure; otherwise an empty
    body is returned.
    """
    adapter = get_adapter(adapter_id)
    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 401)

    outcome = adapter.do_key_delete(request.headers, key_id)
    failed = isinstance(outcome, dict) and 'error' in outcome
    if failed:
        return output.failure(outcome['error'], outcome['status'])
    return ""
def server_create(adapter_id):
    """Creates a server using a certain adapter.

    Fails with 401 for an unknown adapter; otherwise relays the adapter's
    error or data together with the status it reports.
    """
    adapter = get_adapter(adapter_id)
    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 401)

    # The call previously read ``result['status']`` before ``result`` existed
    # and never returned.  NOTE(review): argument list and result handling
    # mirror ``key_create`` -- confirm against the adapter interface.
    result = adapter.do_server_create(request.headers, request.json)
    if 'error' in result:
        return output.failure(result['error'], result['status'])
    return output.success(result['data'], result['status'])
def server_cancel(adapter_id, server_id):
    """Cancels a server using a certain adapter.

    Fails with 401 for an unknown adapter.  When the adapter returns a dict
    containing ``error`` it is relayed as a failure; otherwise an empty
    body is returned.
    """
    adapter = get_adapter(adapter_id)
    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 401)

    result = adapter.do_server_cancel(request.headers, server_id)
    # The error check was a syntactically broken ``isinstance`` call; it now
    # matches the check used by ``key_delete``.
    if isinstance(result, dict) and 'error' in result:
        return output.failure(result['error'], result['status'])
    return ""
def server_install_key(adapter_id, server_id):
    """Installs an SSH key on a server using a certain adapter, if that adapter supports key installation.

    Fails with 501 for an unknown adapter and 401 when the adapter cannot
    install keys.  When the adapter returns a dict containing ``error`` it
    is relayed as a failure; otherwise an empty body is returned.
    """
    adapter = get_adapter(adapter_id)
    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)
    if not adapter.can_install_key():
        return output.failure("This adapter doesn't support installing keys on servers.", 401)

    result = adapter.do_install_key(request.headers, server_id, request.json)
    # The error check was a syntactically broken ``isinstance`` call; it now
    # matches the check used by ``key_delete``.
    if isinstance(result, dict) and 'error' in result:
        return output.failure(result['error'], result['status'])
    return ""
def server_reboot(adapter_id, server_id):
    """Reboots a server using a certain adapter, if that adapter supports rebooting.

    Fails with 501 for an unknown adapter and 401 when the adapter cannot
    reboot.  When the adapter returns a dict containing ``error`` it is
    relayed as a failure; otherwise an empty body is returned.
    """
    adapter = get_adapter(adapter_id)
    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)
    if not adapter.can_reboot():
        return output.failure("This adapter doesn't support rebooting servers.", 401)

    # The call previously read ``result['status']`` before ``result`` existed
    # and never passed the server id; it now follows ``server_cancel``.
    result = adapter.do_server_reboot(request.headers, server_id)
    if isinstance(result, dict) and 'error' in result:
        return output.failure(result['error'], result['status'])
    return ""
def server_rename(adapter_id, server_id):
    """Renames a server using a certain adapter, if that adapter supports renaming.

    Fails with 501 for an unknown adapter and 401 when the adapter cannot
    rename.  When the adapter returns a dict containing ``error`` it is
    relayed as a failure; otherwise an empty body is returned.
    """
    adapter = get_adapter(adapter_id)
    if not adapter:
        return output.failure("That adapter doesn't (yet) exist. Please check the adapter name and try again.", 501)
    if not adapter.can_rename():
        return output.failure("This adapter doesn't support renaming servers.", 401)

    # The call previously read ``result['status']`` before ``result`` existed.
    # NOTE(review): the argument list mirrors ``do_install_key`` (headers,
    # server id, JSON body) -- confirm against the adapter interface.
    result = adapter.do_server_rename(request.headers, server_id, request.json)
    if isinstance(result, dict) and 'error' in result:
        return output.failure(result['error'], result['status'])
    return ""
def score():
    """Score ``text`` against ``entities`` taken from the JSON body.

    Returns the scorer result as JSON with status 200.  A wrong content
    type, a missing ``text``/``entities`` field, or a scorer exception all
    produce a plain-text response with status 500.
    """
    if request.headers['Content-Type'] != 'application/json':
        return Response('Unssuported content type,expected application/json', status=500)

    # ``in`` replaces ``dict.has_key``, which no longer exists in Python 3.
    if 'text' not in request.json:
        return Response('Bad request: missing "text" field in JSON body', status=500)
    if 'entities' not in request.json:
        return Response('Bad request: missing "entities" field in JSON body', status=500)

    text = request.json['text']
    entities = request.json['entities']
    try:
        scorerResult = scorer.evaluate_score(text, entities)
        resp = jsonify(scorer_result_to_response_format(scorerResult))
        resp.status_code = 200
        return resp
    except Exception as e:
        return Response("Internal Server Error: %s" % e, status=500)
def update_model():
    """Load a new scorer model from the ``path`` given in the JSON body.

    Returns an empty 200 response on success.  A wrong content type, a
    missing ``path`` field, or a loader exception all produce a plain-text
    response with status 500.
    """
    if request.headers['Content-Type'] != 'application/json':
        # The message literal was unterminated (a syntax error); it now
        # carries the same text as the identical check in ``score``.
        return Response('Unssuported content type,expected application/json', status=500)

    # ``in`` replaces ``dict.has_key``, which no longer exists in Python 3.
    if 'path' not in request.json:
        return Response('Bad request: missing "path" field in JSON body', status=500)

    path = request.json['path']
    try:
        scorer.load_model_from_url(path)
        return Response("", status=200)
    except Exception as e:
        return Response("Internal Server Error: %s" % e, status=500)
def contains_payment(self, price, request_headers, **kwargs):
    """Validate the payment information received in the request headers.

    Args:
        price (int): The price the user must pay for the resource.
        request_headers (dict): Headers sent by client with their request.
        keyword args: Any other headers needed to verify payment.

    Returns:
        (bool): True if payment is valid,
            False if no payment attached (402 initiation).

    Raises:
        BadRequest: If request is malformed.
    """
    for method in self.allowed_methods:
        if method.should_redeem(request_headers):
            try:
                # The headers carry the payment itself, so the method needs
                # them to redeem it; they were previously not passed on.
                return method.redeem_payment(price, request_headers, **kwargs)
            except PaymentError as e:
                raise BadRequest(str(e))
            except Exception as e:
                raise BadRequest(repr(e))
    return False
def view_sql(backend):
    """View data from database.

    By default, this route runs with autocommit off.

    Args:
        sql: Valid sql for your backend
        sql_params: Optional jinja2 params

    Returns:
        json
    """
    api_key = request.headers['x-api-key']
    query_args = request.args
    sql_text = query_args['sql']
    template_params = query_args.get('sql_params', '{}')

    # Connect first, then render and run the statement.
    connection = _connect(backend, api_key, False)
    rendered_sql = _format_sql(sql_text, template_params)
    rows = _sql_cmd(connection, rendered_sql)

    return jsonify([dict(row.items()) for row in rows])
def execute_sql(backend):
    """Modify data from the database.

    Args:
        sql: valid sql for your backend
        sql_params: Optional jinja2 params
        autocommit: Optional flag from the JSON body (default False)

    Returns:
        json
    """
    key = request.headers['x-api-key']
    args = request.json
    sql = args['sql']
    sql_params = args.get('sql_params', '{}')
    autocommit = args.get('autocommit', False)

    # The connect/render/run sequence was collapsed into one broken call that
    # used undefined names; it is restored to match ``view_sql``.
    client = _connect(backend, key, autocommit)
    rendered_sql = _format_sql(sql, sql_params)
    results = _sql_cmd(client, rendered_sql)

    return jsonify(str("%s row(s) affected." % results.rowcount))
def request_start():
    """Log the path, arguments, client IP and ``Accept`` value of a request."""
    accept_value = request.headers.get('Accept') or ''
    client_ip = request.headers.get('X-Real-Ip') or ''

    log_parts = [request.path, format_args(request.args), client_ip, accept_value]
    Log.info(' '.join(log_parts))

    # Disabled content-type check, kept for reference:
    # if content_type and content_type not in AVAILABLE_CONTENT_TYPES:
    #     results = {'message' : 'Content-Type not supported',
    #                'message_code' : 8
    #                }
    #     return {'error' : 'content-type'}
    #     return self.render(results,status_code = 405)
def _default_auth_request_handler():
    """Authenticate a JSON credentials request and return the auth response.

    Requires a ``Content-Type`` of exactly ``application/json``.  Username,
    password and optional expiry delta are read from the JSON body using
    the key names configured on the app.  Raises ``JWTError`` for a bad
    content type, incomplete credentials, or a failed authentication.
    """
    content_type = request.headers['Content-Type']
    if not content_type:
        raise JWTError("Bad Request", "Missing Content-Type", 400)
    if content_type != "application/json":
        raise JWTError("Bad Request", "Invalid Content-Type", 400)

    data = request.get_json()
    config = current_app.config
    username = data.get(config.get('JWT_AUTH_USERNAME_KEY'), None)
    password = data.get(config.get('JWT_AUTH_PASSWORD_KEY'), None)
    exp_delta = data.get(config.get('JWT_AUTH_EXPDELTA_KEY'), None)

    # Both credentials must be truthy and the body must hold >= 2 entries.
    if not (username and password and len(data) >= 2):
        raise JWTError('Bad Request', 'Invalid credentials')

    identity = _jwt.authentication_callback(username, password)
    if not identity:
        raise JWTError('Bad Request', 'Invalid credentials')

    access_token = _jwt.jwt_encode_callback(identity, exp_delta)
    return _jwt.auth_response_callback(access_token, identity)
def content():
    """List content (GET) or create a content item from JSON-LD (POST).

    POST requires ``name``, ``genre``, ``headline`` and ``@type`` in the
    body, otherwise it aborts with 400.  The response carries the status
    from ``model.createContent`` and, for 201, ``Location`` and
    ``Date-Modified`` headers.
    """
    if request.method == 'GET':
        return jsonld_response(json.dumps(model.getContentList()))

    if request.method == 'POST':
        # Parse the incoming JSON-LD data
        is_jsonld = request.headers['Content-Type'].startswith('application/ld+json')
        data = request.get_json(force=is_jsonld)
        if data is None:
            abort(400)

        required_fields = ('name', 'genre', 'headline', '@type')
        if any(field not in data for field in required_fields):
            abort(400)

        status, url, modified = model.createContent(
            data['@type'], data['genre'], data['name'], data['headline'])
        if status == 201:
            response_headers = {'Location': url, 'Date-Modified': modified}
        else:
            response_headers = {}
        return Response(status=status, headers=response_headers)
def content_item(id):
    """Read (GET), update (PUT) or delete (DELETE) one content item.

    POST aborts with 400.  PUT parses the JSON body, aborts with 400 when
    there is none, and streams the updated data on status 200 or aborts
    with the status the model reported.
    """
    method = request.method

    if method == 'GET':
        return jsonld_response(json.dumps(model.getContent(id)))

    if method == 'POST':
        abort(400)

    if method == 'PUT':
        is_jsonld = request.headers['Content-Type'].startswith('application/ld+json')
        data = request.get_json(force=is_jsonld)
        if data is None:
            abort(400)
        status_code, data, contentType = model.updateContent(id, data)
        if status_code != 200:
            abort(status_code)
        return Response(stream_with_context(data), content_type=contentType)

    if method == 'DELETE':
        return Response(status=model.deleteContent(id))
def content_item_resource_upload(id, property):
    """Stage an uploaded file on disk and hand it to the model.

    The ``file`` form part is saved under the ``UPLOAD_STAGING`` directory
    (default ``tmp``), passed to ``model.uploadContentResource`` as an open
    binary stream, and removed again afterwards.

    Returns:
        A streamed response for status 200/201, otherwise an empty response
        with the status the model reported.
    """
    upload = request.files['file']

    # Text uploads without an explicit charset are declared as UTF-8.
    uploadContentType = upload.content_type
    if upload.content_type.startswith("text/") and upload.content_type.find("charset=") < 0:
        uploadContentType = upload.content_type + "; charset=UTF-8"

    uploadDir = app.config.get('UPLOAD_STAGING', 'tmp')
    os.makedirs(uploadDir, exist_ok=True)
    # The filename comes from the client: keep only its final component so
    # it cannot point outside the staging directory.
    staged = os.path.join(uploadDir, os.path.basename(upload.filename))
    upload.save(staged)

    try:
        with open(staged, "rb") as data:
            status, responseJSON, contentType = model.uploadContentResource(
                id, property, upload.filename, uploadContentType,
                os.path.getsize(staged), data)
    finally:
        # Remove the staged copy even when the model call raises.
        os.unlink(staged)

    if status == 200 or status == 201:
        return Response(stream_with_context(responseJSON), status=status, content_type=contentType)
    return Response(status=status)
def query(self, q, limit=None, graph=None):
    """Run query ``q`` against the query endpoint and return decoded JSON.

    Args:
        q: The query; sent as ``str(q)``.
        limit: Optional ``limit`` request parameter.
        graph: Optional graph URI, sent as ``context`` wrapped in ``<>``.

    Raises:
        IOError: When the response status is outside 200-299.
    """
    params = {'query': str(q)}
    if limit is not None:
        params['limit'] = limit
    if graph is not None:
        params['context'] = '<' + graph + '>'

    req = requests.get(self.endpoints['query'], params=params, headers={'accept': 'application/json'}, auth=self.auth)
    # ``and``, not ``or``: the old test was true for every status code, so
    # the error branch below could never run.
    if 200 <= req.status_code < 300:
        return json.loads(req.text)
    raise IOError('Cannot query to uri <{}>,status={}'.format(self.service, req.status_code))
def update(self, graph=None, q=None, limit=None):
    """POST to the query endpoint and return the decoded JSON response.

    Args:
        graph: Optional graph URI, sent as ``context`` wrapped in ``<>``.
        q: Optional query; sent as ``str(q)`` when given.
        limit: Optional ``limit`` request parameter.

    Raises:
        IOError: When the response status is outside 200-299.
    """
    # NOTE(review): the body referenced ``q`` and ``limit`` although the
    # signature did not define them; they are added as trailing optional
    # parameters so existing ``update(graph=...)`` calls keep working.
    # Confirm the intended signature and endpoint against the callers.
    params = {}
    if q is not None:
        params['query'] = str(q)
    if limit is not None:
        params['limit'] = limit
    if graph is not None:
        params['context'] = '<' + graph + '>'

    # ``params`` was built but never sent; pass it the way ``query`` does.
    req = requests.post(self.endpoints['query'], params=params, headers={'accept': 'application/json'}, auth=self.auth)
    # ``and``, not ``or``: the old test was true for every status code.
    if 200 <= req.status_code < 300:
        return json.loads(req.text)
    # The message literal was unterminated; it now matches ``query``.
    raise IOError('Cannot query to uri <{}>,status={}'.format(self.service, req.status_code))
def callback():
    """Webhook endpoint: log the event and pass the body to the handler.

    Aborts with 400 when the handler rejects the ``X-Line-Signature``.
    """
    # Signature header and raw text body, as needed by the handler.
    line_signature = request.headers['X-Line-Signature']
    raw_body = request.get_data(as_text=True)
    app.logger.info("Request body: " + raw_body)

    botimize.log_incoming(request.get_json())

    # handle webhook body
    try:
        handler.handle(raw_body, line_signature)
    except InvalidSignatureError:
        abort(400)
    return 'OK'
def get_and_verify_user(request):
    """Return the user id for the request's token, or ``None``.

    The token is the last space-separated part of the ``Authorization``
    header and is verified with ``verify_firebase_token``.  On success the
    user is registered; when ``_save_auth_cookie`` is set and the JSON body
    has a ``refreshToken``, that token is stored as well.
    """
    headers = request.headers
    if not headers or 'Authorization' not in headers.keys():
        return None

    auth_token = headers['Authorization'].split(' ')[-1]
    if not auth_token or auth_token == 'null':
        return None

    claims = google.oauth2.id_token.verify_firebase_token(
        auth_token, _grequest)
    if not claims:
        return None

    wants_refresh = (
        _save_auth_cookie
        and request.json
        and 'refreshToken' in request.json.keys()
    )
    if wants_refresh:
        get_db().refresh_auth_token(
            claims['email'],
            request.json['refreshToken']
        )

    get_db().register_user(claims['user_id'], claims['email'])
    return claims['user_id']
def socketio_domains_viewer(data):
    """Emit viewer connection data for a domain to the current user's room.

    ``data['kind']`` selects the viewer type (``xpi`` or ``html5``) and
    ``data['pk']`` identifies the domain.
    """
    # NOTE(review): the nesting here is reconstructed from a listing that
    # lost its indentation; ``viewer`` stays unassigned for any other kind.
    # A 'file' kind (virt-viewer attachment) existed only as disabled code.
    kind = data['kind']

    if kind == 'xpi':
        viewer = app.isardapi.get_spice_xpi(data['pk'])

    if kind == 'html5':
        print('HTML5')
        viewer = app.isardapi.get_domain_spice(data['pk'])
        ##### Change this when engine opens ports accordingly (without tls)
        if viewer['port']:
            # The port is known to be truthy here, so it is simply prefixed.
            viewer['port'] = "5" + viewer['port']

    message = json.dumps({'kind': kind, 'viewer': viewer})
    socketio.emit('domain_viewer',
                  message,
                  namespace='/sio_users',
                  room='user_' + current_user.username)
def login():
    """Show the login page and process submitted credentials.

    On POST, empty fields or failed authentication flash a message.  A
    successful login redirects admins to ``admin`` and everyone else to
    ``desktops``.  Otherwise the login page is rendered together with the
    disposables available for the client address.
    """
    if request.method == 'POST':
        # ``==`` instead of ``is``: identity comparison against a string
        # literal is unreliable and is flagged by newer interpreters.
        if request.form['user'] == '' or request.form['password'] == '':
            flash("Can't leave it blank", 'danger')
        else:
            au = auth()
            user = au.check(request.form['user'], request.form['password'])
            if user:
                login_user(user)
                flash('Logged in successfully.', 'success')
                if user.is_admin:
                    return redirect(url_for('admin'))
                return redirect(url_for('desktops'))
            else:
                flash('Username not found or incorrect password.', 'warning')

    # NOTE(review): X-Forwarded-For is client-supplied unless a trusted
    # proxy sets it -- confirm the deployment before relying on it.
    remote_addr = request.headers['X-Forwarded-For'] if 'X-Forwarded-For' in request.headers else request.remote_addr
    disposables = app.isardapi.show_disposable(remote_addr)
    log.info(disposables)
    log.info(remote_addr)
    return render_template('login_disposables.html', disposables=disposables if disposables else '')
def voucher_login():
    """Register (or reset) a user from a voucher code and email address.

    On POST a valid voucher registers the user and flashes where the email
    was sent; an invalid voucher flashes an error.  The login page is
    always rendered with an empty ``disposables`` value.
    """
    if request.method == 'POST':
        remote_addr = request.headers['X-Forwarded-For'] if 'X-Forwarded-For' in request.headers else request.remote_addr
        au = auth_voucher()
        voucher = request.form['voucher']
        email = request.form['email']
        if au.check_voucher(voucher):
            if au.check_user_exists(email):
                au.register_user(voucher, email, remote_addr)
                flash('Resetting account. Email with new isard user sent to ' + email + '. Please check your email', 'warning')
            else:
                # This call omitted the email argument that the branch above
                # passes, leaving the address in the wrong position.
                au.register_user(voucher, email, remote_addr)
                flash('Email with isard user sent to ' + email + '. Please check your email', 'success')
        else:
            flash('Invalid registration voucher code', 'danger')

    # ``disposables`` was always False here, so the template received ''.
    return render_template('login.html', disposables='')
def allAccounts():
    """Return every account; restricted to administrators.

    Responds with 401 when the ``Authorization`` header is missing or empty
    or when the token does not belong to an admin, with 200 and the
    serialized users on success, and with 500 on any exception (the error
    text is included only when the app runs in DEBUG mode).
    """
    try:
        # ``.get`` so that a missing header reaches the 401 branch instead
        # of raising and being reported as a 500.
        token = request.headers.get("Authorization") or None
        if not token:
            return custResponse(401, "Unauthorized. Sign in required.")

        accId = accFunctions.getIdFromToken(token)
        if not accId or not accFunctions.isAdmin(accId):
            return custResponse(401, "Unauthorized. Invalid token.")

        # A distinct loop name keeps the ``user`` object being queried
        # from being shadowed.
        users = [account.serialize() for account in user.query.all()]
        return custResponse(200, "Accounts successfully retrieved", {"Users": users})
    except Exception as e:
        if app.config["DEBUG"] == True:
            print("*-*-*-*")
            print(e)
            return custResponse(500, {"Err": str(e)})
        else:
            return custResponse(500, "An unkNown error occured.")
# add same as above for put del on /accounts/id
def configure_linebot_app(app):
    """Register the database-commit hook and the LINE webhook on ``app``."""

    @app.after_request
    def commit_database(response):
        # Commit after every request and pass the response through.
        db.commit()
        return response

    @app.route("/api/line_webhook", methods=["POST"])
    def line_webhook():
        """Pass the webhook body to the handler; map failures to 400/500."""
        line_signature = request.headers['X-Line-Signature']
        raw_body = request.get_data(as_text=True)
        logger.debug(f'Incoming message:\n{pformat(raw_body)}')

        try:
            line_webhook_handler.handle(raw_body, line_signature)
        except InvalidSignatureError:
            logger.warning('Message with an invalid signature received')
            abort(400)
        except LineBotApiError as e:
            logger.error(f'{e}\nDetails:\n{pformat(e.error.details)}')
            abort(500)
        except Exception as e:
            logger.error(f'Uncaught error: {e}')
            abort(500)

        return "OK"
def headers():
    """Echo the request's method, headers and remote address as JSON."""
    echoed = {
        "method": request.method,
        "headers": dict(request.headers),
        "origin": request.remote_addr,
    }
    return jsonify(echoed)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。