Python aiohttp.web 模块,run_app() 实例源码
我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用aiohttp.web.run_app()。
def run_webhook(self, webhook_url, **options):
"""
Convenience method for running bots in webhook mode
:Example:
>>> if __name__ == '__main__':
>>> bot.run_webhook(webhook_url="https://yourserver.com/webhooktoken")
Additional documentation on https://core.telegram.org/bots/api#setwebhook
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(self.set_webhook(webhook_url, **options))
if webhook_url:
url = urlparse(webhook_url)
app = self.create_webhook_app(url.path, loop)
host = os.environ.get('HOST', '0.0.0.0')
port = int(os.environ.get('PORT', 0)) or url.port
web.run_app(app, host=host, port=port)
def run(self, arguments, settings, app):
if arguments.reload:
if not HAS_AUTORELOAD:
sys.stderr.write(
'You must install aiohttp_autoreload for the --reload option to work.\n'
'Use `pip install aiohttp_autoreload` to install aiohttp_autoreload.\n'
)
return 1
aiohttp_autoreload.start()
port = arguments.port or settings.get('address', settings.get('port'))
host = arguments.host or settings.get('host', '0.0.0.0')
try:
web.run_app(app, port=port, loop=self.get_loop(),
access_log_format=settings.get('access_log_format'))
except asyncio.CancelledError:
# server shut down,we're good here.
pass
def __main():
global __wxsql
if not myid3.init('config.xml'):
print('?????????')
return
loop = asyncio.get_event_loop()
# __initwxmenu(loop)
__wxsql = WXsql()
app = web.Application(loop=loop)
app.router.add_route('GET', '/', getIndex)
app.router.add_route('GET', '/WeiXin', getWX)
app.router.add_route('POST', postWX)
web.run_app(app, port=6670) # hu ??6670??
loop.close()
def run(cmdargs=None, logprefix=''):
if cmdargs:
config.args = config.parser.parse_args(args=cmdargs)
else:
config.args = config.parser.parse_args()
logging.basicConfig(
format=logprefix + '{asctime} {levelname} {name}: {message}',
style='{',
level=logging.DEBUG if config.args.verbose else logging.WARNING
)
if not os.path.exists(config.args.queue):
os.makedirs(config.args.queue)
replication.dellog = DeletionLog(os.path.join(config.args.queue, 'deletion.log'))
logger.info('Starting up...')
loop = asyncio.get_event_loop()
start_replication_workers(loop)
app = create_app(loop)
app.on_response_prepare.append(on_response_prepare)
web.run_app(app, port=config.args.port, host=config.args.host, print=logger.info)
logger.info('Starting to tear down workers...')
stop_replication_workers(loop)
logger.info('Goodbye.')
def main():
# enable_request_logging()
parser = argparse.ArgumentParser()
parser.add_argument('module', help='Module used as source for programs')
parser.add_argument('--host')
parser.add_argument('--port', type=int)
args = parser.parse_args()
app_kw = {}
if args.host is not None:
app_kw['host'] = args.host
if args.port is not None:
app_kw['port'] = args.port
app = CCApplication()
app.initialize(args.module)
web.run_app(app, **app_kw)
def main():
loop = asyncio.get_event_loop()
root = logging.getLogger()
if root.handlers:
for handler in root.handlers:
root.removeHandler(handler)
logging.basicConfig(
level=logging.DEBUG,
format='%(levelname)-8s [%(asctime)s.%(msecs)03d] '
'(%(name)s): %(message)s',
datefmt='%Y-%m-%d %H:%M:%s'
)
logging.Formatter.converter = time.gmtime
app = loop.run_until_complete(init())
# More useful log format than default
log_format = '%a (%{X-Real-IP}i) %t "%r" %s %b %Tf ' \
'"%{Referrer}i" "%{User-Agent}i"'
web.run_app(app, access_log_format=log_format)
def run_server_idsfree(config: IdsFreeRunServeRemoveModel):
"""
This functions does:
- Check that remote host has required software and versions
- Build the environment and launch the attacks
- Load raw results,transform it and return in selected format as string
It returns the name of cyphered network created.
"""
assert isinstance(config, IdsFreeRunServeRemoveModel)
loop = asyncio.get_event_loop()
# Check remote Docker version
if not config.skip_check_requisites:
loop.run_until_complete(check_remote_requisites(config))
app['IDSFREE_CONfig'] = config
app['GLOBAL_CONfig'] = config_to_dict(config)
# Launch attacks
web.run_app(app,
host=config.listen_addr,
port=int(config.listen_port))
def main(filename = 'postgresql2websocket.conf'):
config = configparser.ConfigParser()
if not config.read(filename):
print("Unable to read %s" % filename)
exit(1)
loop = asyncio.get_event_loop()
app = loop.run_until_complete(init_app(config))
app['websockets'] = []
app.on_shutdown.append(on_shutdown)
try:
web.run_app(app,
host = config.get('web', 'host'),
port = config.getint('web', 'port'),
)
except KeyboardInterrupt:
pass
finally:
loop.close()
def run(self, host, port):
from .view import AbstractsqlView
self.route.bind()
for _, cls in self.route.views:
if issubclass(cls, AbstractsqlView):
self.tables[cls.table_name] = cls
self.permissions[cls.table_name] = cls.permission
cls.permission.app = self
# Configure default CORS settings.
cors = aiohttp_cors.setup(self._raw_app, defaults={
"*": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_headers="*",
)
})
# Configure CORS on all routes.
for r in list(self._raw_app.router.routes()):
if type(r.resource) != StaticResource:
cors.add(r)
web.run_app(host=host, app=self._raw_app)
def run(self):
try:
subcommand = self.argv[1]
except IndexError:
subcommand = 'help'
if subcommand == 'runserver':
try:
host, port = self.argv[2].split(':')
port = int(port)
if not port:
port = 8080
except (IndexError, ValueError):
print('WARNING! Incorrect host:port - using default settings.')
host = '0.0.0.0'
port = 8080
web.run_app(self.app, loop=self.app.loop)
elif subcommand == 'routes':
from djaio.ext.routes import print_routes
print_routes(self.app)
elif subcommand == 'help':
print('=' * 60)
print('Usage: {} <command> <options>'.format(self.argv[0].rsplit('/', 1)[-1]))
print('Available commands:')
print(' * help - shows this message')
print(' * runserver host:port - runs web server')
for key, comm_obj in self.app.commands.items():
print(' * {} <options> - {}'.format(key, comm_obj.get('description')))
print('=' * 60)
elif subcommand == 'shell':
import IPython
IPython.start_ipython(argv=[])
elif subcommand in self.app.commands:
_args = self.argv[2:]
_coro = self.app.commands[subcommand].get('func')
self.app.loop.run_until_complete(_coro(self.app, *_args))
def run(self) -> None:
web.run_app(self.app, host=self.host, port=self.port, loop=self.loop)
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--occlude-root', action='store_true',
help='Add an explicit handler function for /.')
args = parser.parse_args()
app = web.Application(middlewares=[IndexMiddleware()])
app.router.add_route('GET', '/test/', make_handler('test'))
if args.occlude_root:
app.router.add_route('GET', make_handler('I occluded /index.html.'))
app.router.add_static('/', APP_PATH / 'static')
web.run_app(app)
def main():
application = Application(
[HelloWorldService],
tns='aiohttp_spyne.examples.hello',
in_protocol=Soap11(validator='lxml'),
out_protocol=Soap11())
spyne_app = AioApplication(application)
spyne_app['test_text'] = "Hello,%s"
app = web.Application()
app.add_subapp('/say_hello/', spyne_app)
web.run_app(app, port=8080)
def spyne_app_process():
class TestService(ServiceBase):
@rpc(Unicode, _returns=Unicode)
def ping(self, data):
return data
application = Application(
[TestService],
tns='aiohttp_spyne.tests.test',
out_protocol=Soap11())
spyne_app = AioApplication(application, client_max_size=1024 ** 2 * 2)
web.run_app(spyne_app, host="0.0.0.0", port=PORT)
def serve(serve_root, subdirectory, port, asset_file=None):
app = create_app(serve_root, subdirectory=subdirectory, asset_file=asset_file)
# Todo in theory file watching Could be replaced by accessing tool_chain.source_map
observer = Observer()
event_handler = DevServerEventEventHandler(app, serve_root)
observer.schedule(event_handler, str(serve_root), recursive=True)
observer.start()
logger.info('Started dev server at http://localhost:%s,use Ctrl+C to quit', port)
try:
web.run_app(app, print=lambda msg: None)
except KeyboardInterrupt:
pass
finally:
observer.stop()
observer.join()
def run(self, app):
web.run_app(app, port=settings['address'])
def main():
if not options.syslog:
coloredlogs.install(level=logging.DEBUG if options.debug else logging.INFO,
fmt='[%(levelname).1s %(asctime)s %(module)s:%(lineno)d] %(message)s',
datefmt='%y%m%d %H:%M:%s')
else:
syslog.enable_system_logging(level=logging.DEBUG if options.debug else logging.INFO,
fmt='vj4[%(process)d] %(programname)s %(levelname).1s %(message)s')
logging.getLogger('sockjs').setLevel(logging.WARNING)
url = urllib.parse.urlparse(options.listen)
if url.scheme == 'http':
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
host, port_str = url.netloc.rsplit(':', 1)
sock.bind((host, int(port_str)))
elif url.scheme == 'unix':
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
os.remove(url.path)
except FileNotFoundError:
pass
sock.bind(url.path)
else:
_logger.error('Invalid listening scheme %s', url.scheme)
return 1
for i in range(1, options.prefork):
pid = os.fork()
if not pid:
break
else:
atexit.register(lambda: os.kill(pid, signal.SIGTERM))
web.run_app(app.Application(), sock=sock, access_log=None, shutdown_timeout=0)
def _core(socket, backend_host, backend_watchdog,
debug, identity, identity_key, i_am_john, cache_size):
app = unix_socket_app.UnixSocketApplication()
components = core_components_factory(app, cache_size)
dispatcher = components.get_dispatcher()
register_core_api(app, dispatcher)
# Todo: remove me once RSA key loading and backend handling are easier
if i_am_john:
async def load_identity(app):
from parsec.core.identity import EIdentityLoad
eff = Effect(EIdentityLoad(JOHN_DOE_IDENTITY, JOHN_DOE_PRIVATE_KEY))
await asyncio_perform(dispatcher, eff)
print('Welcome back M. Doe')
app.on_startup.append(load_identity)
elif identity:
async def load_identity(app):
from parsec.core.identity import EIdentityLoad, EIdentityLogin
if identity_key:
print("Reading %s's key from `%s`" % (identity, identity_key))
password = getpass()
eff = Effect(EIdentityLoad(identity, identity_key.read(), password))
await asyncio_perform(dispatcher, eff)
else:
print("Fetching %s's key from backend privkey store." % (identity))
password = getpass()
eff = Effect(EIdentityLogin(identity, eff)
print('Connected as %s' % identity)
app.on_startup.append(load_identity)
if debug:
loop = asyncio.get_event_loop()
loop.set_debug(True)
else:
logger_stream.level = WARNING
print('Starting parsec core on %s (connecting to backend %s)' % (socket, backend_host))
unix_socket_app.run_app(app, path=socket)
print('Bye ;-)')
def _backend(host, pubkeys, no_client_auth, store, block_store, debug):
host = host or environ.get('HOST', 'localhost')
port = port or int(environ.get('PORT', 6777))
app = web.Application()
if not block_store:
block_store = '/blockstore'
register_in_memory_block_store_api(app, prefix=block_store)
if store:
if store.startswith('postgres://'):
store_type = 'Postgresql'
backend_components = postgresql_components_factory(app, block_store)
else:
raise SystemExit('UnkNown store `%s` (should be a postgresql db url).' % store)
else:
store_type = 'mocked in memory'
backend_components = mocked_components_factory(block_store)
dispatcher = backend_components.get_dispatcher()
register_backend_api(app, dispatcher)
register_start_api(app, dispatcher)
if debug:
loop = asyncio.get_event_loop()
loop.set_debug(True)
else:
logger_stream.level = WARNING
# Todo: remove me once RSA key loading and backend handling are easier
async def insert_john(app):
from parsec.backend.pubkey import EPubKeyGet, EPubKeyAdd
dispatcher = backend_components.get_dispatcher()
try:
await asyncio_perform(dispatcher, Effect(EPubKeyGet(JOHN_DOE_IDENTITY)))
except PubKeyNotFound:
await asyncio_perform(
dispatcher, Effect(EPubKeyAdd(JOHN_DOE_IDENTITY, JOHN_DOE_PUBLIC_KEY)))
app.on_startup.append(insert_john)
print('Starting parsec backend on %s:%s with store %s' % (host, store_type))
web.run_app(app, port=port)
print('Bye ;-)')
def handle(self, *args, **kwargs):
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.DEBUG)
loop = asyncio.get_event_loop()
app = web.Application(loop=loop)
app.router.add_get('/file/{id}/', download)
app.router.add_put('/file/{id}/', upload)
web.run_app(app, host='localhost', port=8080)
def run_webserver(app, port=PORT):
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemloader(os.curdir))
app.router.add_route('GET', handler)
app.router.add_route('GET', '/{max_pkgs}', handler)
app.router.add_static('/static/', path='./static')
web.run_app(app, port=PORT)
def main():
# init logging
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
app, port = loop.run_until_complete(init(loop))
web.run_app(app, port=port)
def serve(opts):
if opts.mock:
bot = MockEiBotBoard()
else:
bot = EiBotBoard.find()
try:
app = make_app(bot)
web.run_app(app, port=opts.port)
finally:
bot.close()
def main():
# init logging
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
app, port=port)
def main():
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
app, port=port)
def main():
# init logging
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
app, port=port)
def main():
"""
Main function to get the entire ball rolling.
"""
logging.basicConfig(level=logging.WARNING)
db.initialize(DATABASE_URL)
loop = asyncio.get_event_loop()
asyncio.ensure_future(TG_BOT.loop())
app = web.Application(loop=loop)
app.router.add_route('GET', '/rooms/{room_alias}', matrix_room)
app.router.add_route('PUT', '/transactions/{transaction}',
matrix_transaction)
web.run_app(app, port=AS_PORT)
def run_server():
app = web.Application()
app.router.add_get('/', home)
app.router.add_route('*', '/request-info', request_info)
app.router.add_route('*', '/502', view502)
sleep_time_resource = app.router.add_resource('/sleep/{sleep:\d+}')
sleep_time_resource.add_route('*', sleep_time)
web.run_app(app, port=21571)
def main():
app_config = TriggearConfig()
gh_client = github.Github(login_or_token=app_config.github_token)
mongo_client = motor.motor_asyncio.AsyncIOMotorClient() if not os.environ.get('COMPOSE') == 'true' \
else motor.motor_asyncio.AsyncIOMotorClient('mongodb://mongodb:27017')
jenkins_client = jenkins.Jenkins(url=app_config.jenkins_url,
username=app_config.jenkins_user_id,
password=app_config.jenkins_api_token)
github_controller = GithubController(github_client=gh_client,
mongo_client=mongo_client,
jenkins_client=jenkins_client,
config=app_config)
pipeline_controller = PipelineController(github_client=gh_client,
mongo_client=mongo_client,
api_token=app_config.triggear_token)
health_controller = HealthController(api_token=app_config.triggear_token)
app = web.Application()
app.router.add_post('/github', github_controller.handle_hook)
app.router.add_post('/register', pipeline_controller.handle_register)
app.router.add_post('/status', pipeline_controller.handle_status)
app.router.add_post('/comment', pipeline_controller.handle_comment)
app.router.add_get('/health', health_controller.handle_health_check)
app.router.add_get('/missing/{eventType}', pipeline_controller.handle_missing)
app.router.add_post('/deregister', pipeline_controller.handle_deregister)
app.router.add_post('/clear', pipeline_controller.handle_clear)
web.run_app(app)
def run_server(backend, loop=None):
if loop:
asyncio.set_event_loop(loop)
app = web.Application()
app['cache'] = CacheManager(backend)
app.router.add_route('GET', handler_get)
web.run_app(app)
def main(conf):
init_debug(conf)
app = init_aiohttp(conf)
setup_db(app)
setup_routes(app, conf)
register_timeout_handler(app)
web.run_app(app, host="localhost", port=8080)
def main():
app = web.Application()
app.router.add_route('GET', kitten)
web.run_app(app, port=6000)
def main():
app = web.Application()
app.router.add_route('GET', port=6000)
def start(self):
# SSH server
self.ssh_server = self._start_sshd()
self.loop.run_until_complete(self.ssh_server)
# http server
app = web.Application()
app.router.add_get('/', self._handle_root_get)
app.router.add_post('/post', self._handle_webhook_post)
self.web_app = app
# this calls run_until_complete somewhere
web.run_app(app)
def run_server(loop=None):
"""
:param asyncio.AbstractEventLoop loop:
"""
loop = loop or asyncio.get_event_loop()
app = web.Application(loop=loop)
routes.setup_routes(app)
return web.run_app(app, host='0.0.0.0', port=80)
def run():
app = web.Application(middlewares=[
submissions_middleware,
response_middleware,
error_middleware
])
app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)
scanner = venusian.Scanner(router=app.router)
scanner.scan(playlog.web)
if config.DEBUG:
import aioreloader
aioreloader.start()
web.run_app(app, host=config.SERVER_HOST, port=config.SERVER_PORT)
def main():
init_logger()
init_uvloop()
from bernard.conf import settings
from os import getenv
if settings.CODE_LIVE_RELOAD and getenv('_IN_CHILD') != 'yes':
from ._live_reload import start_parent
return start_parent()
# noinspection PybroadException
try:
from aiohttp import web
from bernard.server import app
from bernard.utils import run
from bernard.platforms import start_all
run(start_all())
if settings.CODE_LIVE_RELOAD:
init_live_reload(False)
except Exception:
logger.exception('Something bad happened while bootstraping')
if settings.CODE_LIVE_RELOAD:
init_live_reload(True)
else:
# noinspection PyArgumentList
web.run_app(app, **settings.SERVER_BIND)
def main(matrix_server, server_domain,
access_token, cache_path=None,
debug=False):
if debug:
logger.setLevel(logging.DEBUG)
if not cache_path:
cache_path = NamedTemporaryFile(delete=True).name
else:
cache_path = os.path.abspath(cache_path)
loop = asyncio.get_event_loop()
apps = AppService(matrix_server=matrix_server,
server_domain=server_domain,
access_token=access_token,
cache_path=cache_path,
loop=loop)
try:
# Do some setup
web.run_app(apps.app, host='127.0.0.1', port=5000)
apps.client_session.close()
finally:
apps.save_cache()
def start_rest_api(host, connection, timeout, registry):
"""Builds the web app,adds route handlers,and finally starts the app.
"""
loop = asyncio.get_event_loop()
connection.open()
app = web.Application(loop=loop)
app.on_cleanup.append(lambda app: connection.close())
# Add routes to the web app
LOGGER.info('Creating handlers for validator at %s', connection.url)
handler = RouteHandler(loop, registry)
app.router.add_post('/batches', handler.submit_batches)
app.router.add_get('/batch_statuses', handler.list_statuses)
app.router.add_post('/batch_statuses', handler.list_statuses)
app.router.add_get('/state', handler.list_state)
app.router.add_get('/state/{address}', handler.fetch_state)
app.router.add_get('/blocks', handler.list_blocks)
app.router.add_get('/blocks/{block_id}', handler.fetch_block)
app.router.add_get('/batches', handler.list_batches)
app.router.add_get('/batches/{batch_id}', handler.fetch_batch)
app.router.add_get('/transactions', handler.list_transactions)
app.router.add_get(
'/transactions/{transaction_id}',
handler.fetch_transaction)
app.router.add_get('/receipts', handler.list_receipts)
app.router.add_post('/receipts', handler.list_receipts)
app.router.add_get('/peers', handler.fetch_peers)
subscriber_handler = StateDeltaSubscriberHandler(connection)
app.router.add_get('/subscriptions', subscriber_handler.subscriptions)
app.on_shutdown.append(lambda app: subscriber_handler.on_shutdown())
# Start app
LOGGER.info('Starting REST API on %s:%s', port)
web.run_app(
app,
host=host,
port=port,
access_log=LOGGER,
access_log_format='%r: %s status,%b size,in %Tf s')
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。