Python aiohttp.web 模块,Response() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用aiohttp.web.Response()。
def create_handler(transmute_func, context):
@wraps(transmute_func.raw_func)
async def handler(request):
exc, result = None, None
try:
args, kwargs = await extract_params(request, context,
transmute_func)
result = await transmute_func.raw_func(*args, **kwargs)
except HTTPException as hpe:
code = hpe.status_code or 400
exc = APIException(code=code, message=str(hpe))
except Exception as e:
exc = e
response = transmute_func.process_result(
context, result, exc, request.content_type
)
return web.Response(
body=response["body"], status=response["code"],
content_type=response["content-type"],
headers=response["headers"]
)
handler.transmute_func = transmute_func
return handler
def add_swagger_api_route(app, target_route, swagger_json_route):
"""
mount a swagger statics page.
app: the aiohttp app object
target_route: the path to mount the statics page.
swagger_json_route: the path where the swagger json deFinitions is
expected to be.
"""
static_root = get_swagger_static_root()
swagger_body = generate_swagger_html(
STATIC_ROOT, swagger_json_route
).encode("utf-8")
async def swagger_ui(request):
return web.Response(body=swagger_body, content_type="text/html")
app.router.add_route("GET", swagger_ui)
app.router.add_static(STATIC_ROOT, static_root)
def create_swagger_json_handler(app, **kwargs):
"""
Create a handler that returns the swagger deFinition
for an application.
This method assumes the application is using the
TransmuteUrldispatcher as the router.
"""
spec = get_swagger_spec(app).swagger_deFinition(**kwargs)
encoded_spec = json.dumps(spec).encode("UTF-8")
async def swagger(request):
return web.Response(
# we allow CORS,so this can be requested at swagger.io
headers={
"Access-Control-Allow-Origin": "*"
},
body=encoded_spec,
content_type="application/json",
)
return swagger
def _http_add_org_webhook(self, request):
await request.post()
data = {
"name": "web",
"active": True,
"events": ["*"],
"config": {
"url": self.root.config.core["url"] + self.url + "webhook",
"content_type": "json"
},
}
client = self._get_client(request)
resp = await client.post("/orgs/:org/hooks", request.POST["org"], **data)
org_data = await client.get("/orgs/:org", request.POST["org"])
self.orgs[str(org_data["id"]).encode("ascii")] = client.token
return web.Response(text=str(resp))
def http_handler(self, request):
if request.path.endswith("api.sock"):
return await self.ws_handler(request)
if request.path.endswith("/monitor/"):
data = pkgutil.get_data("rci.services.monitor",
"monitor.html").decode("utf8")
return web.Response(text=data, content_type="text/html")
if request.path.endswith("/login/github"):
if request.method == "POST":
url = self.oauth.generate_request_url(("read:org", ))
return web.HTTPFound(url)
if request.path.endswith("/oauth2/github"):
return (await self._oauth2_handler(request))
if request.path.endswith("logout"):
if request.method == "POST":
sid = request.cookies.get(self.config["cookie_name"])
del(self.sessions[sid])
return web.HTTPFound("/monitor/")
return web.HTTPNotFound()
def get(self) -> web.Response:
dbcon = self.request.app['dbcon']
if 'id' in self.request.rel_url.query:
contact_id = require_int(get_request_param(self.request, 'id'))
c = await contact.get_contact(dbcon, contact_id)
contact_list = [] # type: Iterable[object_models.Contact]
if c:
contact_list = [c]
Metadata_list = await Metadata.get_Metadata_for_object(dbcon, 'contact', contact_id)
elif 'Meta_key' in self.request.rel_url.query:
Meta_key = require_str(get_request_param(self.request, 'Meta_key'))
Meta_value = require_str(get_request_param(self.request, 'Meta_value'))
contact_list = await contact.get_contacts_for_Metadata(dbcon, Meta_key, Meta_value)
Metadata_list = await Metadata.get_Metadata_for_object_Metadata(
dbcon, Meta_value, 'contacts')
else:
contact_list = await contact.get_all_contacts(dbcon)
Metadata_list = await Metadata.get_Metadata_for_object_type(dbcon, 'contact')
return web.json_response(apply_Metadata_to_model_list(contact_list, Metadata_list))
def get(self) -> web.Response:
dbcon = self.request.app['dbcon']
if 'id' in self.request.rel_url.query:
contact_group_id = require_int(get_request_param(self.request, 'id'))
contact_group_item = await contact.get_contact_group(dbcon, contact_group_id)
contact_group_list = [] # type: Iterable[object_models.ContactGroup]
if contact_group_item:
contact_group_list = [contact_group_item]
Metadata_list = await Metadata.get_Metadata_for_object(dbcon, 'contact_group', contact_group_id)
elif 'Meta_key' in self.request.rel_url.query:
Meta_key = require_str(get_request_param(self.request, 'Meta_value'))
contact_group_list = await contact.get_contact_groups_for_Metadata(dbcon, 'contact_groups')
else:
contact_group_list = await contact.get_all_contact_groups(dbcon)
Metadata_list = await Metadata.get_Metadata_for_object_type(dbcon, 'monitor_group')
return web.json_response(apply_Metadata_to_model_list(contact_group_list, Metadata_list))
def get(self) -> web.Response:
dbcon = self.request.app['dbcon']
if 'id' in self.request.rel_url.query:
monitor_group_id = require_int(get_request_param(self.request, 'id'))
monitor_group_item = await monitor_group.get_monitor_group(dbcon, monitor_group_id)
monitor_group_list = [] # type: Iterable[object_models.MonitorGroup]
if monitor_group_item:
monitor_group_list = [monitor_group_item]
Metadata_list = await Metadata.get_Metadata_for_object(dbcon, 'monitor_group', monitor_group_id)
elif 'Meta_key' in self.request.rel_url.query:
Meta_key = require_str(get_request_param(self.request, 'Meta_value'))
monitor_group_list = await monitor_group.get_monitor_groups_for_Metadata(dbcon, 'monitor_groups')
else:
monitor_group_list = await monitor_group.get_all_monitor_groups(dbcon)
Metadata_list = await Metadata.get_Metadata_for_object_type(dbcon, 'monitor_group')
return web.json_response(apply_Metadata_to_model_list(monitor_group_list, Metadata_list))
def authenticate(*,email,passwd):
if not email:
raise APIValueError('email','Invalid email')
if not passwd:
raise APIValueError('passwd','Invalid passwd')
users = await User.findAll('email=?',[email])
if len(users) == 0:
raise APIValueError('email','Email not exist.')
user = users[0]
# check passwd
sha1 = hashlib.sha1()
sha1.update(user.id.encode('utf-8'))
sha1.update(b':')
sha1.update(passwd.encode('utf-8'))
if user.passwd != sha1.hexdigest():
raise APIValueError('passwd','passwd error')
# authenticate ok set cookie
r = web.Response()
r.set_cookie(COOKIE_NAME,user2cookie(user,86400),max_age=86400,httponly=True)
user.passwd = '******'
r.content_type = 'application/json'
r.body = json.dumps(user,ensure_ascii=False).encode('utf-8')
return r
# ??
def api_register_user(*,name,passwd):
if not name or not name.strip():
raise APIValueError('name')
if not email or not _RE_EMAIL.match(email):
raise APIValueError('email')
if not passwd or not _RE_SHA1.match(passwd):
raise APIValueError('passwd')
users = await User.findAll('email=?',[email])
if len(users) > 0:
raise APIError('register:Failed','email','Email is already in use')
uid = next_id()
sha1_passwd = '%s:%s' % (uid,passwd)
encrypt_passwd = hashlib.sha1(sha1_passwd.encode('utf-8')).hexdigest()
image = 'https://1.gravatar.com/avatar/%s?s=200&r=pg&d=mm'
user = User(id=uid,name=name.strip(),email=email,passwd=encrypt_passwd,image=image % hashlib.md5(email.encode('utf-8')).hexdigest())
await user.save()
# make session cookie
r = web.Response()
r.set_cookie(COOKIE_NAME,ensure_ascii=False).encode('utf-8')
return r
def handle(self, request):
filename = request.match_info['filename']
try:
filepath = self._directory.joinpath(filename).resolve()
except (ValueError, FileNotFoundError, OSError):
pass
else:
if filepath.is_dir():
request.match_info['filename'] = str(filepath.joinpath('index.html').relative_to(self._directory))
status, length = 'unkNown', ''
try:
response = await super().handle(request)
except HTTPNotModified:
status, length = 304, 0
raise
except HTTPNotFound:
_404_msg = '404: Not Found\n\n' + _get_asset_content(self._asset_path)
response = web.Response(body=_404_msg.encode('utf8'), status=404)
status, length = response.status, response.content_length
else:
status, response.content_length
finally:
l = logger.info if status in {200, 304} else logger.warning
l(' > %s %s %s %s', request.method, request.path, status, _fmt_size(length))
return response
def async_handle_subscribe(request, subscribed_clients, state_variables):
callback_url = request.headers.get('CALLBACK')[1:-1]
sid = 'uuid:' + str(uuid.uuid4())
subscribed_clients[sid] = callback_url
headers = {
'SID': sid
}
@asyncio.coroutine
def async_push_later(state_variable):
yield from asyncio.sleep(0.5)
yield from state_variable.async_notify_listeners()
for state_variable in state_variables.values():
LOGGER.debug('Pushing state_variable on SUBSCRIBE: %s', state_variable.name)
asyncio.get_event_loop().create_task(async_push_later(state_variable))
return web.Response(status=200, headers=headers)
#endregion
#region Unsubscribe
def api_register_user(*, email, name, password):
if not name or not name.strip():
raise APIValueError('name')
if not email or not _RE_EMAIL.match(email):
raise APIValueError('email')
if not password or not _RE_SHA1.match(password):
raise APIValueError('password')
users = yield from User.find_all('email=?', [email])
if len(users) > 0:
raise APIError('Register Failed', 'email', 'Email is already in use.')
uid = next_id()
sha1_password = '{}:{}'.format(uid, password)
logging.info('register password:{},sha1_password:{}'.format(password, sha1_password))
user = User(id=uid, name= name.strip(), email= email, password = hashlib.sha1(sha1_password.encode('utf-8')).hexdigest(), image='http://www.gravatar.com/avatar/{}?d=mm&s=120'.format(hashlib.md5(email.encode('utf-8')).hexdigest()))
yield from user.save()
r = web.Response()
r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
user.password = '*' * 8
r.content_type = 'application/json'
r.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
return r
def authenticate(*, password):
if not email:
raise APIValueError('email', 'Invalid Email')
if not password:
raise APIValueError('password', 'Invalid Password')
users = yield from User.find_all('email=?', [email])
if len(users) == 0:
raise APIValueError('email', 'Email not exist')
user = users[0]
#check password
sha1_password = '{}:{}'.format(user.id, password)
logging.info('login password:{}, sha1_password))
if user.password != hashlib.sha1(sha1_password.encode('utf-8')).hexdigest():
raise APIValueError('password', 'Invalid Password.')
# authenticate ok,set cookie
r = web.Response()
r.set_cookie(COOKIE_NAME, ensure_ascii=False).encode('utf-8')
return r
def datetime_filter(t):
date_time = datetime.fromtimestamp(t)
str_date = date_time.strftime("%Y-%m-%d %X")
delta = int(time.time() - t)
if delta < 60:
return u'<span title="{}">1???</span>'.format(str_date)
if delta < 3600:
return u'<span title="{}">{}???</span>'.format(str_date, delta // 60)
if delta < 86400:
return u'<span title="{}">{}???</span>'.format(str_date, delta // 3600)
if delta < 604800:
return u'<span title="{}">{}??</span>'.format(str_date, delta // 86400)
#dt = datetime.fromtimestamp(t)
return u'<span title="{}">{}</span>'.format(str_date, date_time.strftime("%Y?%m?%d?"))
#def index(request):
#return web.Response(body=b'<h1>Awesome python3 Web</h1>',content_type='text/html')
def aiohttp_server(loop, addr):
async def handle(request):
payload_size = int(request.match_info.get('size', 1024))
resp = _RESP_CACHE.get(payload_size)
if resp is None:
resp = b'X' * payload_size
_RESP_CACHE[payload_size] = resp
return web.Response(body=resp)
app = web.Application(loop=loop)
app.router.add_route('GET', '/{size}', handle)
app.router.add_route('GET', '/', handle)
handler = app.make_handler()
server = loop.create_server(handler, *addr)
return server
def __call__(self, value):
# Safe html transformation
if _is_pserver_response(value):
body = value.response
if not isinstance(body, bytes):
if not isinstance(body, str):
body = json.dumps(value.response)
body = body.encode('utf8')
value = aioResponse(
body=body, status=value.status,
headers=value.headers)
if 'content-type' not in value.headers:
value.headers.update({
'content-type': 'text/html'
})
return value
def guess_response(self, value):
resp = value.response
if isinstance(resp, dict):
resp = aioResponse(body=bytes(json.dumps(resp, cls=PServerjsonEncoder), 'utf-8'))
resp.headers['Content-Type'] = 'application/json'
elif isinstance(resp, list):
resp = aioResponse(body=bytes(json.dumps(resp, str):
resp = aioResponse(body=bytes(resp, 'utf-8'))
resp.headers['Content-Type'] = 'text/html'
elif resp is None:
# missing result...
resp = aioResponse(body=b'{}')
resp.headers['Content-Type'] = 'application/json'
resp.headers.update(value.headers)
if not resp.prepared:
resp.set_status(value.status)
return resp
def home(request): # <1>
query = request.GET.get('query', '').strip() # <2>
print('Query: {!r}'.format(query)) # <3>
if query: # <4>
descriptions = list(index.find_descriptions(query))
res = '\n'.join(ROW_TPL.format(**vars(descr))
for descr in descriptions)
msg = index.status(query, len(descriptions))
else:
descriptions = []
res = ''
msg = 'Enter words describing characters.'
html = template.format(query=query, result=res, # <5>
message=msg)
print('Sending {} results'.format(len(descriptions))) # <6>
return web.Response(content_type=CONTENT_TYPE, text=html) # <7>
# END HTTP_CHARFINDER_HOME
# BEGIN HTTP_CHARFINDER_SETUP
def get_chars(request):
peername = request.transport.get_extra_info('peername')
print('Request from: {},GET data: {!r}'.format(peername, dict(request.GET)))
query = request.GET.get('query', '')
if query:
try:
start = int(request.GET.get('start', 0))
stop = int(request.GET.get('stop', sys.maxsize))
except ValueError:
raise web.HTTPBadRequest()
stop = min(stop, start+RESULTS_PER_REQUEST)
num_results, chars = index.find_chars(query, start, stop)
else:
raise web.HTTPBadRequest()
text = ''.join(char if n % 64 else char+'\n'
for n, char in enumerate(chars, 1))
response_data = {'total': num_results, 'start': start, 'stop': stop}
print('Response to query: {query!r},start: {start},stop: {stop}'.format(
query=query, **response_data))
response_data['chars'] = text
json_obj = json.dumps(response_data)
print('Sending {} characters'.format(len(text)))
headers = {'Access-Control-Allow-Origin': '*'}
return web.Response(content_type=TEXT_TYPE, headers=headers, text=json_obj)
def handle(request):
query = request.GET.get('query', '')
print('Query: {!r}'.format(query))
if query:
descriptions = list(index.find_descriptions(query))
res = '\n'.join(ROW_TPL.format(**vars(descr))
for descr in descriptions)
msg = index.status(query, len(descriptions))
else:
descriptions = []
res = ''
msg = 'Type words describing characters.'
text = PAGE_TPL.format(query=query,
message=msg, links=LINKS_HTML)
print('Sending {} results'.format(len(descriptions)))
return web.Response(content_type=CONTENT_TYPE, text=text)
def home(request): # <1>
query = request.GET.get('query', text=html) # <7>
# END HTTP_CHARFINDER_HOME
# BEGIN HTTP_CHARFINDER_SETUP
def homepage(request):
conn = request.app['conn']
t = HTML_HDR
t += "<h2>%s</h2>" % conn.server_info
# NOTE: only a demo program would do all these Remote Server
# queries just to display the hompage...
donate = await conn.RPC('server.donation_address')
motd = await conn.RPC('server.banner')
t += '<pre style="font-size: 50%%">\n%s\n</pre><hr/>' % motd
t += '<p>Donations: %s</p>' % linkage(donate)
t += '</p><p>Top block: %s</p>' % linkage(top_blk)
t += '''
<form method=POST action="/">
<input name='q' style="width: 50%" placeholder="Txn hash / address / etc"></input>
'''
return Response(content_type='text/html', text=t)
def search(request):
query = (await request.post())['q'].strip()
if not (1 <= len(query) <= 200):
raise HTTPFound('/')
if len(query) <= 7:
raise HTTPFound('/blk/'+query.lower())
elif len(query) == 64:
# assume it's a hash of block or txn
raise HTTPFound('/txn/'+query.lower())
elif query[0] in '13mn':
# assume it'a payment address
raise HTTPFound('/addr/'+query)
else:
return Response(text="Can't search for that")
def address_page(request):
# address summary by bitcoin payment addr
addr = request.match_info['addr']
conn = request.app['conn']
t = HTML_HDR
t += '<h1><code>%s</code></h1>' % addr
for method in ['blockchain.address.get_balance',
'blockchain.address.get_status',
'blockchain.address.get_mempool',
'blockchain.address.get_proof',
'blockchain.address.listunspent']:
# get a balance,etc.
t += await call_and_format(conn, method, addr)
return Response(content_type='text/html', text=t)
def register_in_memory_block_store_api(app, prefix='/blockstore'):
# Really,really simple stuff ;-)
blocks = {}
async def api_block_get(request):
id = request.match_info['id']
try:
return web.Response(body=blocks[id], content_type='application/octet-stream')
except KeyError:
raise web.HTTPNotFound()
async def api_block_post(request):
id = request.match_info['id']
if id in blocks:
raise web.HTTPConflict()
blocks[id] = await request.read()
return web.Response()
app.router.add_get(prefix + '/{id}', api_block_get)
app.router.add_post(prefix + '/{id}', api_block_post)
def async_good(self, request):
"""TBD."""
data = {
"title": "Top of the best stikers for Telegram",
"active_good": "class=\"active\"",
"active_bad": "",
"top": {}
}
await self.session_handler(request=request)
data["top"] = await self.db.get_top(9, 'IteraTOR_LE')
return web.Response(
text=self.jinja.get_template('good.html').render(
title=data["title"],
active_good=data["active_good"],
active_bad=data["active_bad"],
top=data["top"]
),
content_type='text/html')
# @aiohttp_jinja2.template('bad.html')
def action_bad(self, request):
"""TBD."""
data = {
"title": "Top of bad stikers for Telegram",
"active_good": "",
"active_bad": "class=\"active\"",
"top": {}
}
await self.session_handler(request=request)
data['top'] = await self.db.get_top(9, 'IteraTOR_GE')
return web.Response(
text=self.jinja.get_template('good.html').render(
title=data["title"],
top=data["top"]
),
content_type='text/html')
def __call__(self, value):
if _is_guillotina_response(value):
body = value.response
if not isinstance(body, str):
body = ujson.dumps(value.response)
body = body.encode('utf8')
value = aioResponse(
body=body,
headers=value.headers)
if 'content-type' not in value.headers:
value.headers.update({
'content-type': self.content_type
})
return value
def guess_response(self, value):
resp = value.response
if type(resp) in (dict, list, int, float, bool):
resp = aioResponse(body=bytes(json.dumps(resp, cls=GuillotinaJSONEncoder), str):
original_resp = resp
resp = aioResponse(body=bytes(resp, 'utf-8'))
if '<html' in original_resp:
resp.headers['Content-Type'] = 'text/html'
else:
resp.headers['Content-Type'] = 'text/plain'
elif resp is None:
# missing result...
resp = aioResponse(body=b'{}')
resp.headers['Content-Type'] = 'application/json'
resp.headers.update(value.headers)
if not resp.prepared:
resp.set_status(value.status)
return resp
def adapter_do_OPTIONS(self, request):
origin = request.headers["Origin"]
allowed_origins = self._bot.config.get_option("api_origins")
if allowed_origins is None:
raise web.HTTPForbidden()
if "*" == allowed_origins or "*" in allowed_origins:
return web.Response(headers={
"Access-Control-Allow-Origin": origin,
"Access-Control-Allow-Headers": "content-type",
})
if not origin in allowed_origins:
raise web.HTTPForbidden()
return web.Response(headers={
"Access-Control-Allow-Origin": origin,
"Access-Control-Allow-Headers": "content-type",
"vary": "Origin",
})
def adapter_do_GET(self, request):
payload = { "sendto": request.match_info["id"],
"key": request.match_info["api_key"],
"content": unquote(request.match_info["message"]) }
results = await self.process_request('', # IGnorED
'', # IGnorED
payload)
if results:
content_type="text/html"
results = results.encode("ascii", "xmlcharrefreplace")
else:
content_type="text/plain"
results = "OK".encode('utf-8')
return web.Response(body=results, content_type=content_type)
def test_middleware_doesnt_reissue_on_bad_response(self):
secret = b'01234567890abcdef'
auth_ = auth.CookieTktAuthentication(secret, 15, 0, cookie_name='auth')
middlewares = [
auth_middleware(auth_)]
valid_until = time.time() + 15
session_data = TicketFactory(secret).new('some_user',
valid_until=valid_until)
request = await make_request('GET', middlewares, \
[(auth_.cookie_name, session_data)])
user_id = await auth.get_auth(request)
self.assertEqual(user_id, 'some_user')
response = await make_response(request, web.Response(status=400))
self.assertFalse(auth_.cookie_name in response.cookies)
def get(self):
result = web.Response(
body=str.encode('You shall not pass!'),
content_type='text/plain'
)
return result
def response_ok():
return web.Response(**{'status': 200})
def response_not_found():
return web.Response(**{'status': 404})
def data_factory(app, handler):
async def parse_data(request):
logging.info('data_factory...')
if request.method in ('POST', 'PUT'):
if not request.content_type:
return web.HTTPBadRequest(text='Missing Content-Type.')
content_type = request.content_type.lower()
if content_type.startswith('application/json'):
request.__data__ = await request.json()
if not isinstance(request.__data__, dict):
return web.HTTPBadRequest(text='JSON body must be object.')
logging.info('request json: %s' % request.__data__)
elif content_type.startswith(('application/x-www-form-urlencoded', 'multipart/form-data')):
params = await request.post()
request.__data__ = dict(**params)
logging.info('request form: %s' % request.__data__)
else:
return web.HTTPBadRequest(text='Unsupported Content-Type: %s' % content_type)
elif request.method == 'GET':
qs = request.query_string
request.__data__ = {k: v[0] for k, v in parse.parse_qs(qs, True).items()}
logging.info('request query: %s' % request.__data__)
else:
request.__data__ = dict()
return await handler(request)
return parse_data
# ??????????????????Response??
def response_factory(app, handler):
async def response(request):
logging.info('Response handler...')
r = await handler(request)
if isinstance(r, web.StreamResponse):
return r
if isinstance(r, bytes):
resp = web.Response(body=r)
resp.content_type = 'application/octet-stream'
return resp
if isinstance(r, str):
if r.startswith('redirect:'):
return web.HTTPFound(r[9:])
resp = web.Response(body=r.encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, dict):
template = r.get('__template__')
if template is None:
resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
resp.content_type = 'application/json;charset=utf-8'
return resp
else:
# ???jinja2????????????
r['__user__'] = request.__user__
resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))
resp.content_type = 'text/html;charset=utf-8'
return resp
if isinstance(r, int) and 100 <= r < 600:
return web.Response(status=r)
if isinstance(r, tuple) and len(r) == 2:
status, message = r
if isinstance(status, int) and 100 <= status < 600:
return web.Response(status=status, text=str(message))
# default
resp = web.Response(body=str(r).encode('utf-8'))
resp.content_type = 'text/plain;charset=utf-8'
return resp
return response
def make_webserver(self):
async def page(request):
body = self.settings['content']
return web.Response(text=body, content_type='text/html')
self.app.router.add_get('/', page)
self.handler = self.app.make_handler()
port = self.settings['server_port']
self.server = await self.bot.loop.create_server(self.handler, '0.0.0.0', port)
print('Serving webserver on {}:{}'.format(self.ip, port))
def webhook_handle(self, request):
"""
aiohttp.web handle for processing web hooks
:Example:
>>> from aiohttp import web
>>> app = web.Application()
>>> app.router.add_route('/webhook')
"""
update = await request.json()
self._process_update(update)
return web.Response()
def sparql_endpoint(request):
result = {
"post": dict((await request.post()).items()),
"path": request.path,
}
if "failure" in result['post'].get('query', ""):
raise web.HTTPBadRequest()
if "failure" in result['post'].get('update', ""):
raise web.HTTPBadRequest()
return web.Response(text=json.dumps(result),
content_type="application/json")
def crud_endpoint(request):
request.app['last_request'] = request
if request.method == "PATCH":
return web.Response(text="{}", content_type="application/json")
else:
raise web.HTTPNoContent()
def handle(request):
text = "Hello,can you hear me?"
return web.Response(body=text.encode('utf-8'))
def header_response(request):
return Response("foo", headers={
"location": "boo"
})
def registry_dump_handle(request):
'''
only read
:param request:
:return:
'''
registry = registry_dump_handle.registry
response_dict = {}
repo = registry._repository
response_dict['registered_services'] = repo._registered_services
response_dict['uptimes'] = repo._uptimes
response_dict['service_dependencies'] = repo._service_dependencies
return web.Response(status=400, content_type='application/json', body=json.dumps(response_dict).encode())
def _enable_http_middleware(func): # pre and post http,processing
@wraps(func)
async def f(self, *args, **kwargs):
if hasattr(self, 'middlewares'):
for i in self.middlewares:
if hasattr(i, 'pre_request'):
pre_request = getattr(i, 'pre_request')
if callable(pre_request):
try:
res = await pre_request(self, **kwargs) # passing service as first argument
if res:
return res
except Exception as e:
return Response(status=400,
body=json.dumps(
{'error': str(e), 'sector': getattr(i, 'middleware_info')}).encode())
_func = coroutine(func) # func is a generator object
result = await _func(self, **kwargs)
if hasattr(self, 'post_request'):
post_request = getattr(i, 'post_request')
if callable(post_request):
try:
res = await post_request(self, **kwargs)
if res:
return res
except Exception as e:
return Response(status=400, 'middleware_info')}).encode())
return result
return f
def stats(self, _):
res_d = Aggregator.dump_stats()
return Response(status=200, body=json.dumps(res_d).encode())
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。