Python flask.request 模块,url() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.url()。
def bookmarklet_js():
base_url = request.url.replace(
"browser-tools/bookmarklet.js",
"static/browser-tools/"
)
if "localhost:" not in base_url:
# seems like this shouldn't be necessary. but i think
# flask's request.url is coming in with http even when
# we asked for https on the server. weird.
base_url = base_url.replace("http://", "https://")
rendered = render_template(
"browser-tools/bookmarklet.js",
base_url=base_url
)
resp = make_response(rendered, 200)
resp.mimetype = "application/javascript"
return resp
def showTimeMap(urir, format):
urir = getCompleteURI(urir)
s = surt.surt(urir, path_strip_trailing_slash_unless_empty=False)
indexPath = ipwbConfig.getipwBReplayIndexPath()
cdxjlinesWithURIR = getCDXjlinesWithURIR(urir, indexPath)
tmContentType = ''
if format == 'link':
tm = generateLinkTimeMapFromCDXjlines(
cdxjlinesWithURIR, s, request.url)
tmContentType = 'application/link-format'
elif format == 'cdxj':
tm = generateCDXJTimeMapFromCDXjlines(
cdxjlinesWithURIR, request.url)
tmContentType = 'application/cdxj+ors'
resp = Response(tm)
resp.headers['Content-Type'] = tmContentType
return resp
def update_content_in_local_cache(url, content, method='GET'):
"""?? local_cache ??????,??content
?stream?????"""
if local_cache_enable and method == 'GET' and cache.is_cached(url):
info_dict = cache.get_info(url)
resp = cache.get_obj(url)
resp.set_data(content)
# ???????????content?,without_content ????true
# ?????????,???content????,????????
# ?stream???,??????http?,???????,????????????????
# ?????????????????????,???????????????
info_dict['without_content'] = False
if verbose_level >= 4: dbgprint('LocalCache_UpdateCache', url, content[:30], len(content))
cache.put_obj(
url,
resp,
obj_size=len(content),
expires=get_expire_from_mime(parse.mime),
last_modified=info_dict.get('last_modified'),
info_dict=info_dict,
)
def extract_url_path_and_query(full_url=None, no_query=False):
"""
Convert http://foo.bar.com/aaa/p.html?x=y to /aaa/p.html?x=y
:param no_query:
:type full_url: str
:param full_url: full url
:return: str
"""
if full_url is None:
full_url = request.url
split = urlsplit(full_url)
result = split.path or "/"
if not no_query and split.query:
result += '?' + split.query
return result
# ################# End Client Request Handler #################
# ################# Begin Middle Functions #################
def request_remote_site():
"""
???????(high-level),????404/500??? domain_guess ??
"""
# ????????
# ??: ?zmirror?????????,??????????????
parse.remote_response = send_request(
parse.remote_url,
method=request.method,
headers=parse.client_header,
data=parse.request_data_encoded,
)
if parse.remote_response.url != parse.remote_url:
warnprint("requests's remote url", parse.remote_response.url,
'does no equals our rewrited url', parse.remote_url)
if 400 <= parse.remote_response.status_code <= 599:
# ??url????????
dbgprint("Domain guessing for", request.url)
result = guess_correct_domain()
if result is not None:
parse.remote_response = result
def advisory_atom():
last_recent_entries = 15
data = get_advisory_data()['published'][:last_recent_entries]
Feed = AtomFeed('Arch Linux Security - Recent advisories',
Feed_url=request.url, url=request.url_root)
for entry in data:
advisory = entry['advisory']
package = entry['package']
title = '[{}] {}: {}'.format(advisory.id, package.pkgname, advisory.advisory_type)
Feed.add(title=title,
content=render_template('Feed.html', content=advisory.content),
content_type='html',
summary=render_template('Feed.html', content=advisory.impact),
summary_tpe='html',
author='Arch Linux Security Team',
url=TRACKER_ISSUE_URL.format(advisory.id),
published=advisory.created,
updated=advisory.created)
return Feed.get_response()
def predict():
import ipdb; ipdb.set_trace(context=20)
if 'file' not in request.files:
flash('No file part')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('No selected file')
return redirect(request.url)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
try:
pokemon_name = predict_mlp(file).capitalize()
pokemon_desc = pokemon_entries.get(pokemon_name)
msg = ""
except Exception as e:
pokemon_name = None
pokemon_desc = None
msg = str(e)
return jsonify({'name': pokemon_name, 'description': pokemon_desc, "msg": msg})
def RSS_Feed():
Feed = AtomFeed('White House Briefing Room Releases', Feed_url=request.url, url=request.url_root)
documents = WhiteHouse.query.order_by(WhiteHouse.document_date.desc())
for document in documents:
Feed.add(document.title, document.tweet,
content_type='text',
author="@presproject2017",
url=make_external(document.full_url),
updated=document.document_date,
published=document.document_date)
return Feed.get_response()
def retrieveAlertsCountWithType():
""" Retrieve number of alerts in timeframe (GET-Parameter time as decimal or "day") and divide into honypot types"""
# get result from cache
getCacheResult = getCache(request.url, "url")
if getCacheResult is not False:
return jsonify(getCacheResult)
# query ES
else:
# Retrieve Number of Alerts from ElasticSearch and return as xml / json
if not request.args.get('time'):
app.logger.error('No time GET-parameter supplied in retrieveAlertsCountWithType. Must be decimal number (in minutes) or string "day"')
return app.config['DEFAULTRESPONSE']
else:
returnResult = formatAlertsCountWithType(queryAlertsCountWithType(request.args.get('time'), checkCommunityIndex(request)))
setCache(request.url, returnResult, 13, "url")
app.logger.debug('UNCACHED %s' % str(request.url))
return jsonify(returnResult)
def retrieveDatasetAlertsPerMonth():
""" Retrieve the attacks / day in the last x days from elasticsearch
and return as JSON for the last months,defaults to last month,
if no GET parameter days is given
"""
# get result from cache
getCacheResult = getCache(request.url, "url")
if getCacheResult is not False:
return jsonify(getCacheResult)
# query ES
else:
if not request.args.get('days'):
# Using default : within the last month
returnResult = formatDatasetAlertsPerMonth(queryDatasetAlertsPerMonth(None, checkCommunityIndex(request)))
else:
returnResult = formatDatasetAlertsPerMonth(queryDatasetAlertsPerMonth(request.args.get('days'), checkCommunityIndex(request)))
setCache(request.url, 600, "url")
return jsonify(returnResult)
def retrieveDatasetAlertTypesPerMonth():
""" Retrieve the attacks / day in the last x days from elasticsearch,
split by attack group
and return as JSON for the last x months, "url")
if getCacheResult is not False:
return jsonify(getCacheResult)
# query ES
else:
if not request.args.get('days'):
# Using default : within the last month
returnResult = formatDatasetAlertTypesPerMonth(queryDatasetAlertTypesPerMonth(None, checkCommunityIndex(request)))
else:
returnResult = formatDatasetAlertTypesPerMonth(queryDatasetAlertTypesPerMonth(request.args.get('days'), 3600, "url")
return jsonify(returnResult)
def retrieveAlertStats():
""" Retrieve combined statistics
AlertsLastMinute,AlertsLastHour,AlertsLast24Hours
"""
# get result from cache
getCacheResult = getCache(request.url, "url")
if getCacheResult is not False:
return jsonify(getCacheResult)
# query ES
else:
returnResult = formatAlertStats(queryAlertStats(checkCommunityIndex(request)))
setCache(request.url, "url")
app.logger.debug('UNCACHED %s' % str(request.url))
return jsonify(returnResult)
def get_inBox():
pyldnlog.debug("Requested inBox data of {} in {}".format(request.url, request.headers['Accept']))
if not request.headers['Accept'] or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']:
resp = make_response(inBox_graph.serialize(format='application/ld+json'))
resp.headers['Content-Type'] = 'application/ld+json'
elif request.headers['Accept'] in ACCEPTED_TYPES:
resp = make_response(inBox_graph.serialize(format=request.headers['Accept']))
resp.headers['Content-Type'] = request.headers['Accept']
else:
return 'Requested format unavailable', 415
resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
resp.headers['Allow'] = "GET,HEAD,OPTIONS,POST"
resp.headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type",<http://www.w3.org/ns/ldp#RDFSource>; rel="type",<http://www.w3.org/ns/ldp#Container>; rel="type",<http://www.w3.org/ns/ldp#BasicContainer>; rel="type"'
resp.headers['Accept-Post'] = 'application/ld+json,text/turtle'
return resp
def get_notification(id):
pyldnlog.debug("Requested notification data of {}".format(request.url))
pyldnlog.debug("Headers: {}".format(request.headers))
# Check if the named graph exists
pyldnlog.debug("Dict key is {}".format(pyldnconf._inBox_url + id))
if pyldnconf._inBox_url + id not in graphs:
return 'Requested notification does not exist', 404
if 'Accept' not in request.headers or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']:
resp = make_response(graphs[pyldnconf._inBox_url + id].serialize(format='application/ld+json'))
resp.headers['Content-Type'] = 'application/ld+json'
elif request.headers['Accept'] in ACCEPTED_TYPES:
resp = make_response(graphs[pyldnconf._inBox_url + id].serialize(format=request.headers['Accept']))
resp.headers['Content-Type'] = request.headers['Accept']
else:
return 'Requested format unavailable', 415
resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
resp.headers['Allow'] = "GET"
return resp
def register_extension(app):
@app.before_request
def add_correlation_id(*args, **kw):
correlation_id = request.headers.get(CORRELATION_ID)
log.debug("%s %s", request.method, request.url)
if not correlation_id:
correlation_id = str(uuid.uuid4())
if request.method != "GET":
"""
Todo: remove sensitive information such as username/password
"""
log.debug({
"message": "Tracking request",
"correlation_id": correlation_id,
"method": request.method,
"uri": request.url,
"data": request.data,
})
request.correlation_id = correlation_id
@app.after_request
def save_correlation_id(response):
if CORRELATION_ID not in response.headers:
response.headers[CORRELATION_ID] = getattr(request, "correlation_id", None)
return response
def _provide_client_handler(self, section, name, kwargs_updator=None):
def _wrapper(func):
@wraps(func)
def _handler(**kwargs):
client_key = self.auth.authenticate(
request.method,
request.url,
request.headers)
client = self.client_class.load(client_key)
if not client:
abort(401)
g.ac_client = client
kwargs['client'] = client
if kwargs_updator:
kwargs.update(kwargs_updator(**kwargs))
ret = func(**kwargs)
if ret is not None:
return ret
return '', 204
self._add_handler(section, _handler)
return func
return _wrapper
def error_mail(subject, data, r, via_web=True):
body = '''
remote URL: {r.url}
status code: {r.status_code}
request data:
{data}
status code: {r.status_code}
content-type: {r.headers[content-type]}
reply:
{r.text}
'''.format(r=r, data=data)
if not has_request_context():
via_web = False
if via_web:
user = get_username()
body = 'site URL: {}\nuser: {}\n'.format(request.url, user) + body
send_mail(subject, body)
def announce_change(change):
place = change.place
body = '''
user: {change.user.username}
name: {name}
page: {url}
items: {change.update_count}
comment: {change.comment}
https://www.openstreetmap.org/changeset/{change.id}
'''.format(name=place.display_name,
url=place.candidates_url(_external=True),
change=change)
send_mail('tags added: {}'.format(place.name_for_changeset), body)
def open_changeset_error(place, changeset, r):
template = '''
user: {change.user.username}
name: {name}
page: {url}
sent:
{sent}
reply:
{reply}
'''
body = template.format(name=place.display_name,
url=place.candidates_url(_external=True),
sent=changeset,
reply=r.text)
send_mail('error creating changeset:' + place.name, body)
def log_exception(self, exc_info):
self.logger.error("""
Path: %s
HTTP Method: %s
Client IP Address: %s
User Agent: %s
User Platform: %s
User browser: %s
User browser Version: %s
GET args: %s
view args: %s
URL: %s
""" % (
request.path,
request.method,
request.remote_addr,
request.user_agent.string,
request.user_agent.platform,
request.user_agent.browser,
request.user_agent.version,
dict(request.args),
request.view_args,
request.url
), exc_info=exc_info)
def authenticated(fn):
"""Mark a route as requiring authentication."""
@wraps(fn)
def decorated_function(*args, **kwargs):
if not session.get('is_authenticated'):
return redirect(url_for('login', next=request.url))
if request.path == '/logout':
return fn(*args, **kwargs)
if (not session.get('name') or
not session.get('email') or
not session.get('institution')) and request.path != '/profile':
return redirect(url_for('profile', next=request.url))
return fn(*args, **kwargs)
return decorated_function
def login_required(func):
"""
Decorator to require login and save URL for redirecting user after login
"""
@wraps(func)
def decorated_function(*args, **kwargs):
"""decorator args"""
if not is_logged_in():
# Save off the page so we can redirect them to what they were
# trying to view after logging in.
session['prevIoUsly_requested_page'] = request.url
return redirect(url_for('login'))
return func(*args, **kwargs)
return decorated_function
def set_featured_title():
"""Form POST to update featured title"""
title = request.form['title']
stack = request.form['stack']
article = models.search_for_article(title, stacks=[stack], status=PUBLISHED)
if article is None:
flash('Cannot find published guide "%s" stack "%s"' % (title, stack),
category='error')
url = session.pop('prevIoUsly_requested_page', None)
if url is None:
url = url_for('index')
return redirect(url)
models.set_featured_article(article)
flash('Featured guide updated', category='info')
return redirect(url_for('index'))
def get_social_redirect_url(article, share_domain):
"""
Get social redirect url for po.st to enable all counts to follow us
regardless of where we're hosted.
"""
# Strip of trailing / to avoid having two slashes together in resulting url
if share_domain.endswith('/'):
share_domain = share_domain[:-1]
redirect_url = filters.url_for_article(article)
# Use full domain for redirect_url b/c this controls the po.st social
# sharing numbers. We want these numbers to stick with the domain
# we're running on so counts go with us.
url = url_for_domain(redirect_url, domain=share_domain)
return strip_subfolder(url)
def strip_subfolder(url):
"""
Strip off the subfolder if it exists so we always use the exact same
share url for saving counts.
"""
subfolder = app.config.get('SUBFOLDER', None)
if not subfolder:
return url
p = urlparse.urlparse(url)
if not p.path.startswith(subfolder):
return url
new_path = p.path.replace('%s' % (subfolder), '', 1)
new_url = urlparse.ParseResult(p.scheme, p.netloc, new_path, p.params,
p.query, p.fragment)
return new_url.geturl()
def require_login_frontend(only_if=True):
"""
Same logic as the API require_login,but this decorator is intended for use for frontend interfaces.
It returns a redirect to the login page,along with a post-login redirect_url as a GET parameter.
:param only_if: Optionally specify a boolean condition that needs to be true for the frontend login to be required.
This is semantically equivalent to "require login for this view endpoint only if <condition>,
otherwise,no login is required"
"""
def decorator(func):
@wraps(func)
def decorated_view(*args, **kwargs):
if not current_user.is_authenticated and only_if:
return redirect(UserLoginInterfaceURI.uri(redirect_url=quote(request.url, safe='')))
return func(*args, **kwargs)
return decorated_view
return decorator
def curl():
form = MyForm.MyForm_input()
if form.submit.data:
urls = form.text.data.strip().splitlines()
urls = set(urls)
for url in urls:
Purge = purge.Purged()
if not url or url.startswith('#'):
continue
else:
url = url.strip()
if not url.startswith('http'):
flash('url begin with http(s)://')
return render_template('Message_static.html',Main_Infos=g.main_infos)
url_rep=Purge.purge_cdn(url)
flash(url+' purge CDN '+url_rep)
return render_template('Message_static.html',Main_Infos=g.main_infos)
return render_template('cdn.html',form=form,Main_Infos=g.main_infos)
def chart_center_traffic():
try:
Tra_cli_url_minute_datas = collections.OrderedDict()
Tra_ser_url_minute_datas = collections.OrderedDict()
for i in range(1,5):
Tm = datetime.datetime.Now() - datetime.timedelta(minutes=i)
Tm = Tm.strftime('%H:%M')
Tra_cli_url_minute_Key = 'traffic.cli.url_%s' % Tm
Tra_ser_url_minute_Key = 'traffic.ser.url_%s' % Tm
Tra_cli_url_minute_datas[Tm] = [[str(url), int(RC.zscore(Tra_cli_url_minute_Key, url)) * 8 / 1024 / 1024] for url in RC.zrevrange(Tra_cli_url_minute_Key, 0,4)]
Tra_ser_url_minute_datas[Tm] = [[str(url), int(RC.zscore(Tra_ser_url_minute_Key,url)) * 8 / 1024 / 1024] for url in RC.zrevrange(Tra_ser_url_minute_Key,4) ]
return render_template('chart_center_traffic.html',Main_Infos=g.main_infos,Tra_cli_url_minute_datas=Tra_cli_url_minute_datas,Tra_ser_url_minute_datas=Tra_ser_url_minute_datas)
except Exception as e:
logging.error(e)
flash('??????!')
return render_template('Message_static.html', Main_Infos=g.main_infos)
def gateway_domain():
try:
DATA = [eval(v) for v in RC.lrange('top_url_%s'%time.strftime('%Y-%m-%d',time.localtime()), -1)]
TOP_URL_DATA = [{'data': DATA, 'name': 'conn'}]
values = collections.OrderedDict()
for k in range(1,7):
td = datetime.datetime.Now()-datetime.timedelta(minutes=k)
tt = td.strftime("%H:%M")
tm = td.strftime('%Y%m%d%H%M')
tables = ('????','????')
httpry_Key = 'httpry_domain.%s' % tm
values[tt] = [[url,int(RC.zscore(httpry_Key, url))] for url in RC.zrevrange(httpry_Key, 10)]
return render_template('gateway_domain.html',tables = tables,values=values,TOP_URL_DATA=TOP_URL_DATA)
except Exception as e:
logging.error(e)
flash('??????!')
return render_template('Message_static.html', Main_Infos=g.main_infos)
def backup_MysqL_results():
produce.Async_log(g.user, request.url)
try:
if Redis.exists('finish_backup'):
Infos = Redis.lrange('finish_backup',0,-1)
if Infos:
Infos = [eval(info) for info in set(Infos)]
tt = time.strftime('%Y-%m-%d', time.localtime())
tables = ('??','?????','MysqL???',' ??')
return render_template('backup_MysqL_results.html',Infos=Infos,tt=tt,tables=tables)
else:
raise flash('????:?????????!')
else:
raise flash('????:?????????!')
except Exception as e:
if 'old' not in str(e):
flash(str(e))
return render_template('Message_static.html',Main_Infos=g.main_infos)
def get_proxy_request_url(thisAuth, thisContainer=None, thisObject=None):
"""
create the url under which this API proxy will reach its swift back end. basically this is the request url with a different hostname
:param thisAuth:
:param thisContainer:
:param thisObject:
:return:
"""
u = configuration.swift_store_url.format(thisAuth)
if thisContainer:
u += "/" + thisContainer
if thisObject:
u += "/" + thisObject
return u
##############################################################################
# Frontend Pool
##############################################################################
def handle_auth():
"""
Forward the auth request to swift
replace the given storage url with our own:
'X-Storage-Url': 'http://192.168.209.204:8080/v1/AUTH_test'
becomes
'X-Storage-Url': 'http://localhost:4000/v1/AUTH_test'
this is the first request any client makes; we passed on an auth-token from swift
which is used in further requests
:return:
"""
clientHeaders = request.headers
swiftStatus, swiftHeaders, swiftBody = httpBackend.doAuthGetToken(reqHead=clientHeaders, method="GET")
log.debug("swift response: {} {} {}".format(swiftStatus, swiftBody))
if 200 == swiftStatus:
replaceStorageUrl(swiftResponse=swiftHeaders)
log.debug("proxy response: {} {} {}".format(swiftStatus, swiftBody))
return Response(status=swiftStatus, headers=swiftHeaders, response=swiftBody)
def __init__(self, **kwargs):
"""Initialises a new ``Self`` link instance. Accepts the same
Keyword Arguments as :class:`.Link`.
Additional Keyword Args:
external (bool): if true,force link to be fully-qualified URL,defaults to False
See Also:
:class:`.Link`
"""
url = request.url
external = kwargs.get('external', False)
if not external and current_app.config['SERVER_NAME'] is None:
url = request.url.replace(request.host_url, '/')
return super(Self, self).__init__('self', **kwargs)
def Feed():
searches = query(
'''
SELECT * FROM searches
WHERE published IS NOT NULL
ORDER BY id DESC
''', json=True)
site_url = 'http://' + app.config['HOSTNAME']
Feed_url = site_url + '/Feed/'
def add_url(s):
s['url'] = site_url + '/summary/' + s['date_path'] + '/'
return s
searches = map(_date_format, searches)
searches = list(map(add_url, searches))
resp = make_response(
render_template(
'Feed.xml',
updated=searches[0]['created'],
site_url=site_url,
Feed_url=Feed_url,
searches=searches
)
)
resp.headers['Content-Type'] = 'application/atom+xml'
return resp
def parse_post(post, external_links=False, create_html=True):
with open(os.path.join(BLOG_CONTENT_DIR, post)) as handle:
raw = handle.read()
frontmatter, content = REGEX_SPLIT_FRONTMATTER.split(raw, 2)
data = yaml.load(frontmatter)
y, m, d, slug = post[:-3].split('-', maxsplit=3)
if create_html:
data['html'] = markdown.markdown(content, extensions=[
'markdown.extensions.extra',
'markdown.extensions.codehilite',
'markdown.extensions.toc'
])
data['url'] = url_for('blog_post', y=y, m=m, d=d, slug=slug,
_external=external_links)
data['reading_time'] = reading_time(content)
return data
def get_Feed():
from mhn.common.clio import Clio
from mhn.auth import current_user
authFeed = mhn.config['Feed_AUTH_required']
if authFeed and not current_user.is_authenticated():
abort(404)
Feed = AtomFeed('MHN HpFeeds Report',
url=request.url_root)
sessions = Clio().session.get(options={'limit': 1000})
for s in sessions:
Feedtext = u'Sensor "{identifier}" '
Feedtext += '{source_ip}:{source_port} on sensorip:{destination_port}.'
Feedtext = Feedtext.format(**s.to_dict())
Feed.add('Feed', Feedtext, content_type='text',
published=s.timestamp, updated=s.timestamp,
url=makeurl(url_for('api.get_session', session_id=str(s._id))))
return Feed
def after_request_log(response):
name = dns_resolve(request.remote_addr)
current_app.logger.warn(u"""[client {ip} {host}] {http} "{method} {path}" {status}
Request: {method} {path}
Version: {http}
Status: {status}
Url: {url}
IP: {ip}
Hostname: {host}
Agent: {agent_platform} | {agent_browser} | {agent_browser_version}
Raw Agent: {agent}
""".format(method=request.method,
path=request.path,
url=request.url,
ip=request.remote_addr,
host=name if name is not None else '?',
agent_platform=request.user_agent.platform,
agent_browser=request.user_agent.browser,
agent_browser_version=request.user_agent.version,
agent=request.user_agent.string,
http=request.environ.get('SERVER_PROTOCOL'),
status=response.status))
return response
def get(self):
url = request.args.get('url')
tags = request.args.getlist('tag')
filters = [db.Bookmark.user == current_user.id]
if url is not None:
filters.append(db.Bookmark.url == urldefrag(url).url)
filters.append(db.Bookmark.tags.contains(tags))
result = db.Bookmark.query.filter(*filters) \
.order_by(
db.Bookmark.read.desc().nullsfirst(),
db.Bookmark.timestamp.desc()) \
.paginate()
headers = {}
links = []
if result.has_next:
last_url = update_query(request.url, {'page': result.pages})
links.append(lh.Link(last_url, rel='last'))
if links:
headers['Link'] = lh.format_links(links)
return list(map(lambda x: x.to_dict(), result.items)), 200, headers
def add_entry():
"""Add a new entry to the bibliography."""
form = BiblioForm()
if form.validate_on_submit():
bib_entry = BiblioEntry(ID=form.ID.data,
ENTRYTYPE=form.typ.data,
authors=form.author.data,
title=form.title.data,
year=form.year.data,
school="",
publisher="",
keywords=form.keywords.data,
url=form.url.data,
journal=form.journal.data)
db.session.add(bib_entry)
user = current_user.name
event = Event(author=user, article=form.ID.data,
event="ADD", time=time.time())
db.session.add(event)
db.session.commit()
return redirect("/biblio")
return redirect("/biblio")
def apidocs():
url = urlparse(request.url)
if ":" in url.netloc:
host, port = url.netloc.split(":")
else:
host = url.netloc
port = "80"
base_path = url.path.replace('/apidocs','') if url.path != "/apidocs" else "/"
schemes = [url.scheme]
other_scheme = "https" if url.scheme is "http" else "http"
try:
if request.get(other_scheme+"://"+url.netloc+url.path.replace('/apidocs','')+"/scheme").status_code is 200:
schemes += [other_scheme]
except:
pass
r = make_response(swagger.json(schemes, host, port, base_path))
r.mimetype = 'application/json'
return r
# return send_from_directory("www","swagger.json")
def tensorboard(logdir):
port = _tensorboard_dirs.get(logdir)
if not port:
sock = socket.socket(socket.AF_INET)
sock.bind(('', 0))
port = sock.getsockname()[1]
sock.close()
subprocess.Popen([
'tensorboard',
'--logdir=' + logdir,
'--port=' + str(port)])
time.sleep(5) # wait for tensorboard to spin up
_tensorboard_dirs[logdir] = port
redirect_url = 'http://{}:{}'.format(
six.moves.urllib.parse.urlparse(request.url).hostname,
port)
logger.debug('Redirecting to ' + redirect_url)
return redirect(redirect_url)
def make_error_page(app, code, sentry=None, data=None, exception=None):
''' creates the error page dictionary for web errors '''
shortname = name.lower().replace(' ', '_')
error = {}
error['title'] = 'Marvin | {0}'.format(name)
error['page'] = request.url
error['event_id'] = g.get('sentry_event_id', None)
error['data'] = data
error['name'] = name
error['code'] = code
error['message'] = exception.description if exception and hasattr(exception, 'description') else None
if app.config['USE_SENTRY'] and sentry:
error['public_dsn'] = sentry.client.get_public_dsn('https')
app.logger.error('{0} Exception {1}'.format(name, error))
return render_template('errors/{0}.html'.format(shortname), **error), code
# ----------------
# Error Handling
# ----------------
def weibo_nearbytimeline_wrapper():
try:
data = r.get(request.url)
if data is None:
data = weibo_service.get_all_weibo_nearby_async(
request.args["lat"],
request.args["lng"],
int(request.args["starttime"]),
int(request.args["endtime"]),
int(request.args["range"])
)
data = json.dumps(weibo_service.nearby_weibo_statis_wrapper(data))
r.set(request.url, data)
return data
except:
traceback.print_exc()
abort(404)
def get_user_flow_to_html():
try:
# data = None
data = r.get(request.url)
if data is None:
if "ring_str" in request.args:
ring_str = request.args["ring_str"]
else:
ring_str = None
data = hotel_data_service.get_user_flow_to_html(
request.args["hotel_name"],
request.args["baseinfo_id"].encode("utf-8"),
int(request.args["page"]),
ring_str=ring_str
)
data = json.dumps(data)
r.set(request.url, data)
return data
except:
traceback.print_exc()
abort(404)
def query_floorstate():
try:
result = r.get(request.url)
#result = None
if result is None:
result = pms_service.query_floorstate(
request.args['floornum'].encode('utf-8'),
request.args['time'].encode('utf-8')
)
result = json.dumps(result,ensure_ascii=False)
#print result
r.set(request.url,result)
return result
except Exception,e:
print e
traceback.print_exc()
abort(404)
def callback():
""" Step 3: Retrieving an access token.
The user has been redirected back from the provider to your registered
callback URL. With this redirection comes an authorization code included
in the redirect URL. We will use that to obtain an access token.
"""
mautic = OAuth2Session(client_id, redirect_uri=redirect_uri,
state=session['oauth_state'])
token = mautic.fetch_token(token_url, client_secret=client_secret, authorization_response=request.url)
# We use the session as a simple DB for this example.
session['oauth_token'] = token
update_token_tempfile(token) # store token in /tmp/mautic_creds.json
return redirect(url_for('.menu'))
def paths(self):
"""
:class:`dict`: The top level :swagger:`pathsObject`.
"""
paths = {}
for rule in self.app.url_map.iter_rules():
if rule.endpoint == 'static':
continue
log.info('Processing %r', rule)
url, parameters = parse_werkzeug_url(rule.rule)
paths.setdefault(url, {})
if parameters:
paths[url]['parameters'] = parameters
for method in rule.methods:
if method in ('HEAD', 'OPTIONS'):
# XXX Do we want to process these?
continue
paths[url][method.lower()] = self._process_rule(rule)
return paths
def deprecated(self, fn):
"""
Mark an operation as deprecated.
This will be exposed through the OpenAPI operation object.
Additionally a warning will be emitted when the API is used.
This can be configured using the ``OPENAPI_WARN_DEPRECATED``
configuration option. This must be one of ``warn`` or ``log``.
See :swagger:`operationDeprecated`.
"""
fn.deprecated = True
@functools.wraps(fn)
def call_deprecated(*args, **kwargs):
method = self._config('warn_deprecated')
log_args = request.method, request.url
if method == 'warn':
warnings.warn(_DEPRECATION_MESSAGE % log_args,
DeprecationWarning)
else:
log.warning(_DEPRECATION_MESSAGE, *log_args)
return fn(*args, **kwargs)
return call_deprecated
def getCurrentSpotifyPlaylistList():
ret_list = []
access_token = getSpotifyToken()
if access_token:
sp = spy.Spotify(access_token)
results = sp.current_user()
uri = 'spotify:user:12120746446:playlist:6ZK4Tz0ZsZuJBYyDZqlbGt'
username = uri.split(":")[2]
playlist_id = uri.split(":")[4]
results = sp.user_playlist(username, playlist_id)
for i, t in enumerate(results['tracks']['items']):
temp = {}
temp['img_url'] = t['track']['album']['images'][2]['url']
temp['name'] = t['track']['name']
temp['artist'] = t['track']['artists'][0]['name']
temp['album'] = t['track']['album']['name']
duration = int(t['track']['duration_ms']) / (1000 * 60)
temp['duration'] = "{:2.2f}".format(duration)
ret_list.append(temp)
return ret_list
def getCurrentYoutubePlaylistList():
#keep dicts of videos in ret_list
ret_list = []
youtube = initYoutube()
playlistitems_list_request = youtube.playlistItems().list(
playlistId=constants.YOUTUBE_PLAYLIST_ID,
part="snippet,contentDetails",
maxResults=30
)
playlistitems_list_response = playlistitems_list_request.execute()
#print(playlistitems_list_response)
for playlist_item in playlistitems_list_response['items']:
video_dict = {}
#print(playlist_item)
video_dict['name'] = playlist_item['snippet']['title']
video_dict['img_url'] = playlist_item['snippet']['thumbnails']['default']['url']
video_dict['artist'] = "--"
video_dict['album'] = "--"
video_dict['duration'] = "--:--"
ret_list.append(video_dict)
return ret_list
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。