Python flask 模块,session() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.session()。
def twittercallback():
verification = request.args["oauth_verifier"]
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
try:
auth.request_token = session["request_token"]
except KeyError:
flash("Please login again", "danger")
return redirect(url_for("bp.home"))
try:
auth.get_access_token(verification)
except tweepy.TweepError:
flash("Failed to get access token", "danger")
return redirect(url_for("bp.home"))
session["access_token"] = auth.access_token
session["access_token_secret"] = auth.access_token_secret
return render_template("twittercallback.html", form=HashtagForm())
def get(self):
masterip = self.masterip
data = {
"user": session['username'],
}
result = dockletRequest.post('/monitor/hosts/%s/containerslist/'%(self.com_ip), data, masterip)
containers = result.get('monitor').get('containerslist')
containerslist = []
for container in containers:
result = dockletRequest.post('/monitor/vnodes/%s/basic_info/'%(container), masterip)
basic_info = result.get('monitor').get('basic_info')
result = dockletRequest.post('/monitor/vnodes/%s/owner/'%(container), masterip)
owner = result.get('monitor')
basic_info['owner'] = owner
containerslist.append(basic_info)
return self.render(self.template_path, containerslist = containerslist, com_ip = self.com_ip, user = session['username'], masterip = masterip)
def get(self):
data = {
"user": session['username'],
}
allresult = dockletRequest.post_to_all('/monitor/listphynodes/', data)
allmachines = {}
for master in allresult:
allmachines[master] = []
iplist = allresult[master].get('monitor').get('allnodes')
for ip in iplist:
containers = {}
result = dockletRequest.post('/monitor/hosts/%s/containers/'%(ip), master.split("@")[0])
containers = result.get('monitor').get('containers')
result = dockletRequest.post('/monitor/hosts/%s/status/'%(ip), master.split("@")[0])
status = result.get('monitor').get('status')
allmachines[master].append({'ip':ip,'containers':containers, 'status':status})
#print(machines)
return self.render(self.template_path, allmachines = allmachines, user = session['username'])
def post_to_all(self, url = '/', data={}):
if (url == '/'):
res = []
for masterip in masterips:
try:
requests.post("http://"+getip(masterip)+":"+master_port+"/isalive/",data=data)
except Exception as e:
logger.debug(e)
continue
res.append(masterip)
return res
data = dict(data)
data['token'] = session['token']
logger.info("Docklet Request: user = %s data = %s,url = %s"%(session['username'], url))
result = {}
for masterip in masterips:
try:
res = requests.post("http://"+getip(masterip)+":"+master_port+url,data=data).json()
except Exception as e:
logger.debug(e)
continue
result[masterip] = res
logger.debug("get result from " + getip(masterip))
return result
def post(self):
masterip = self.masterip
index1 = self.image.rindex("_")
index2 = self.image[:index1].rindex("_")
checkname(self.clustername)
data = {
"clustername": self.clustername,
'imagename': self.image[:index2],
'imageowner': self.image[index2+1:index1],
'imagetype': self.image[index1+1:],
}
result = dockletRequest.post("/cluster/create/", dict(data, **(request.form)), masterip)
if(result.get('success', None) == "true"):
return redirect("/dashboard/")
#return self.render(self.template_path,user = session['username'])
else:
return self.render(self.error_path, message = result.get('message'))
def post(self):
masterip = self.masterip
data = {
"clustername": self.clustername,
"image": self.imagename,
"containername": self.containername,
"description": self.description,
"isforce": self.isforce
}
result = dockletRequest.post("/cluster/save/", masterip)
if(result):
if result.get('success') == 'true':
#return self.render(self.success_path,user = session['username'])
return redirect("/config/")
#res = detailClusterView()
#res.clustername = self.clustername
#return res.as_view()
else:
if result.get('reason') == "exists":
return self.render(self.template_path, containername = self.containername, clustername = self.clustername, image = self.imagename, description = self.description, masterip=masterip)
else:
return self.render(self.error_path, message = result.get('message'))
else:
self.error()
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', 'GET,POST')
response.headers.add('Access-Control-Allow-Credentials', 'true')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,*')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Cache-Control', 'no-store')
if api.auth.is_logged_in():
if 'token' in session:
response.set_cookie('token', session['token'], domain=app.config['SESSION_COOKIE_DOMAIN'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token, domain=app.config['SESSION_COOKIE_DOMAIN'])
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'application/json'
return response
def main_teacher():
tm = teachers_model.Teachers(flask.session['id'])
if request.method == 'POST':
cm = courses_model.Courses()
if "close" in request.form.keys():
cid = request.form["close"]
cm.cid = cid
cm.close_session(cm.get_active_session())
elif "open" in request.form.keys():
cid = request.form["open"]
cm.cid = cid
cm.open_session()
courses = tm.get_courses_with_session()
empty = True if len(courses) == 0 else False
context = dict(data=courses)
return render_template('main_teacher.html', empty=empty, **context)
def test_session_transactions(self):
app = flask.Flask(__name__)
app.testing = True
app.secret_key = 'testing'
@app.route('/')
def index():
return text_type(flask.session['foo'])
with app.test_client() as c:
with c.session_transaction() as sess:
self.assert_equal(len(sess), 0)
sess['foo'] = [42]
self.assert_equal(len(sess), 1)
rv = c.get('/')
self.assert_equal(rv.data, b'[42]')
with c.session_transaction() as sess:
self.assert_equal(len(sess), 1)
self.assert_equal(sess['foo'], [42])
def test_session_using_application_root(self):
class PrefixPathMiddleware(object):
def __init__(self, app, prefix):
self.app = app
self.prefix = prefix
def __call__(self, environ, start_response):
environ['SCRIPT_NAME'] = self.prefix
return self.app(environ, start_response)
app = flask.Flask(__name__)
app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar')
app.config.update(
SECRET_KEY='foo',
APPLICATION_ROOT='/bar'
)
@app.route('/')
def index():
flask.session['testing'] = 42
return 'Hello World'
rv = app.test_client().get('/', 'http://example.com:8080/')
self.assert_in('path=/bar', rv.headers['set-cookie'].lower())
def test_session_using_session_settings(self):
app = flask.Flask(__name__)
app.config.update(
SECRET_KEY='foo',
SERVER_NAME='www.example.com:8080',
APPLICATION_ROOT='/test',
SESSION_COOKIE_DOMAIN='.example.com',
SESSION_COOKIE_HTTPONLY=False,
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_PATH='/'
)
@app.route('/')
def index():
flask.session['testing'] = 42
return 'Hello World'
rv = app.test_client().get('/', 'http://www.example.com:8080/test/')
cookie = rv.headers['set-cookie'].lower()
self.assert_in('domain=.example.com', cookie)
self.assert_in('path=/', cookie)
self.assert_in('secure', cookie)
self.assert_not_in('httponly', cookie)
def signup():
from forms import SignupForm
form = SignupForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data.lower()).first()
if user is not None:
form.email.errors.append("The Email address is already taken.")
return render_template('signup.html', form=form)
newuser = User(form.firstname.data,form.lastname.data,form.email.data,form.password.data)
db.session.add(newuser)
db.session.commit()
session['email'] = newuser.email
return redirect(url_for('login'))
return render_template('signup.html', form=form)
def login():
if g.user is not None and g.user.is_authenticated:
return redirect(url_for('index'))
from app.forms import LoginForm
form = LoginForm()
if form.validate_on_submit():
session['remember_me'] = form.remember_me.data
user = User.query.filter_by(email=form.email.data.lower()).first()
if user and user.check_password(form.password.data):
session['email'] = form.email.data
login_user(user,remember=session['remember_me'])
return redirect(url_for('index'))
else:
return render_template('login.html',form=form,Failed_auth=True)
return render_template('login.html',form=form)
def ping(service_id):
from app import db, models
from models import Service
finding = Service.query.filter_by(serviceid=service_id).first()
if finding is not None:
image_name = finding.imagename
uploadn = finding.uploadname
usern = finding.username
firstcreatetime = finding.firstcreatetime
u = Service(serviceid = service_id, createdtime = str(time.time()), imagename = image_name, uploadname = uploadn, username = usern, firstcreatetime = firstcreatetime)
db.session.add(u)
db.session.commit()
db.session.delete(finding)
db.session.commit()
else:
return "The service "+service_id+" has been removed!"
return "There are existing service:"+service_id
def test_oauth_authorized_saves_token_and_user_to_session(self, oauth):
fake_resp = {'oauth_token_secret': u'secret',
'username': u'palotespaco',
'fullname': u'paco palotes',
'oauth_token':u'token',
'user_nsid': u'user'}
oauth.authorized_response.return_value = fake_resp
expected_token = {
'oauth_token_secret': u'secret',
'oauth_token': u'token'
}
expected_user = {'username': u'palotespaco', 'user_nsid': u'user'}
with flask_app.test_client() as c:
c.get('/flickr/oauth-authorized')
assert session['flickr_token'] == expected_token, session['flickr_token']
assert session['flickr_user'] == expected_user, session['flickr_user']
def new_item(category):
if request.method == 'GET':
return render('newitem.html', category=category)
elif request.method == 'POST':
name = request.form['name']
highlight = request.form['highlight']
url = request.form['url']
if valid_item(name, url, highlight):
user_id = login_session['user_id']
item = Item(name=name, highlight=highlight, url=url, user_id=user_id, category_id=category.id)
session.add(item)
session.commit()
flash("Newed item %s!" % item.name)
return redirect(url_for('show_item', category_id=category.id, item_id=item.id))
else:
error = "Complete info please!"
return render('newitem.html', category=category, name=name,
error=error)
def check_auth(func):
"""
This decorator for routes checks that the user is authorized (or that no login is required).
If they haven't,their intended destination is stored and they're sent to get authorized.
It has to be placed AFTER @app.route() so that it can capture `request.path`.
"""
if 'login' not in conf:
return func
# inspired by <https://flask-login.readthedocs.org/en/latest/_modules/flask_login.html#login_required>
@functools.wraps(func)
def decorated_view(*args, **kwargs):
if current_user.is_anonymous:
print('unauthorized user visited {!r}'.format(request.path))
session['original_destination'] = request.path
return redirect(url_for('get_authorized'))
print('{} visited {!r}'.format(current_user.email, request.path))
assert current_user.email.lower() in conf.login['whitelist'], current_user
return func(*args, **kwargs)
return decorated_view
def test_session_transactions(self):
app = flask.Flask(__name__)
app.testing = True
app.secret_key = 'testing'
@app.route('/')
def index():
return text_type(flask.session['foo'])
with app.test_client() as c:
with c.session_transaction() as sess:
self.assert_equal(len(sess), [42])
def test_session_using_application_root(self):
class PrefixPathMiddleware(object):
def __init__(self, rv.headers['set-cookie'].lower())
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', session['token'])
else:
csrf_token = api.common.token()
session['token'] = csrf_token
response.set_cookie('token', csrf_token)
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'appication/json'
return response
def info(id):
info = g.plex.getInfo(id)
views = None
parent = None
episodes = None
cur_el = info.getchildren()[0]
cur_type = cur_el.get("type")
if cur_type == "movie":
views = db.session.query(models.Processed).filter(models.Processed.title == info.find("Video").get("title")).all()
elif cur_type == "season":
parent = g.plex.getInfo(info.getchildren()[0].get("parentratingKey")).getchildren()[0]
episodes = g.plex.episodes(id)
elif cur_type == "episode":
views = db.session.query(models.Processed).filter(models.Processed.session_id.like("%/Metadata/" + cur_el.get("ratingKey") + "_%")).all()
elif cur_type == "show":
episodes = db.session.query(db.func.count(models.Processed.title), models.Processed).filter(models.Processed.orig_title.like(cur_el.get("title"))).group_by(models.Processed.title).having(db.func.count(models.Processed.orig_title) > 0).order_by(db.func.count(models.Processed.orig_title).desc(), models.Processed.time.desc()).all()
return render_template('info.html', info=info, history=views, parent=parent, episodes=episodes, title=_('Info'))
def chals():
if not is_admin():
if not ctftime():
if view_after_ctf():
pass
else:
return redirect('/')
if can_view_challenges():
chals = Challenges.query.add_columns('id', 'name', 'value', 'description', 'category').order_by(Challenges.value).all()
json = {'game':[]}
for x in chals:
files = [ str(f.location) for f in Files.query.filter_by(chal=x.id).all() ]
json['game'].append({'id':x[1], 'name':x[2], 'value':x[3], 'description':x[4], 'category':x[5], 'files':files})
db.session.close()
return jsonify(json)
else:
db.session.close()
return redirect('/login')
def after_request(response):
response.headers.add('Access-Control-Allow-Methods', csrf_token)
# JB: This is a hack. We need a better solution
if request.path[0:19] != "/api/autogen/serve/":
response.mimetype = 'appication/json'
return response
def test_session_transactions(self):
app = flask.Flask(__name__)
app.testing = True
app.secret_key = 'testing'
@app.route('/')
def index():
return text_type(flask.session['foo'])
with app.test_client() as c:
with c.session_transaction() as sess:
self.assert_equal(len(sess), [42])
def test_session_using_application_root(self):
class PrefixPathMiddleware(object):
def __init__(self, rv.headers['set-cookie'].lower())
def _dispatch(self, route, **kwargs):
user = None
if 'shared_id' in session:
if session['shared_id'] in self.user_tracker.user_refs:
user = self.user_tracker.users[session['shared_id']]
if user is None:
user = self.user_tracker.connect_user()
session['shared_id'] = user.id_
user.open_page(route, kwargs)
ctx = CallCTX(abort=abort)
user.active_controller.before_connect(ctx)
ctx.deactivate()
possible = user.active_controller.render_page()
if user.active_controller.special_return_handler is not None:
return user.active_controller.special_return_handler()
return possible
def sidebar_data():
"""Set the sidebar function."""
# Get post of recent
recent = db.session.query(Post).order_by(
Post.publish_date.desc()
).limit(5).all()
# Get the tags and sort by count of posts.
top_tags = db.session.query(
Tag, func.count(posts_tags.c.post_id).label('total')
).join(
posts_tags
).group_by(Tag).order_by('total DESC').limit(5).all()
return recent, top_tags
# Use the Blueprint object to set the Route URL
# Register the view function into blueprint
def new_post():
"""View function for new_port."""
form = PostForm()
# Ensure the user logged in.
# Flask-Login.current_user can be access current user.
if not current_user:
return redirect(url_for('main.login'))
# Will be execute when click the submit in the create a new post page.
if form.validate_on_submit():
new_post = Post()
new_post.title = form.title.data
new_post.text = form.text.data
new_post.publish_date = datetime.Now()
new_post.user = current_user
db.session.add(new_post)
db.session.commit()
return redirect(url_for('blog.home'))
return render_template('new_post.html',
form=form)
def get_repo_if_admin(db, full_name):
"""Retrieve repository from db and return if
current user is admin (owner or member)
:param db: database connection where are repos stored
:type db: ``flask_sqlalchemy.sqlAlchemy``
:param full_name: full name of desired repository
:type full_name: str
:return: repository if found,None otherwise
:rtype: ``repocribro.models.Repository`` or None
"""
user = flask_login.current_user.github_user
repo = db.session.query(Repository).filter_by(
full_name=full_name
).first()
if repo is None:
return None
if repo.owner == user or user in repo.members:
return repo
return None
def organizations():
"""List user organizations from GitHub (GET handler)"""
page = int(flask.request.args.get('page', 0))
gh_api = flask.current_app.container.get(
'gh_api', token=flask.session['github_token']
)
gh_orgs = gh_api.get('/user/orgs', page=page)
orgs_link = gh_api.app_connections_link
return flask.render_template(
'manage/orgs.html', orgs=gh_orgs.data,
actual_page=gh_orgs.actual_page, total_pages=gh_orgs.total_pages,
orgs_link=orgs_link,
)
def organization(login):
"""List organization repositories for activation
.. :todo: register organization in repocribro
.. :todo: own profile page of organization
"""
ORG_REPOS_URL = '/orgs/{}/repos?type=member'
page = int(flask.request.args.get('page', token=flask.session['github_token']
)
gh_repos = gh_api.get(ORG_REPOS_URL.format(login), page=page)
user = flask_login.current_user.github_user
active_ids = [repo.github_id for repo in user.repositories]
return flask.render_template(
'manage/repos.html', repos=gh_repos.data,
actual_page=gh_repos.actual_page, total_pages=gh_repos.total_pages,
Repository=Repository, active_ids=active_ids,
repos_type=login+' (organization)'
)
def github_callback_get_account(db, gh_api):
"""Processing GitHub callback action
:param db: Database for storing GitHub user info
:type db: ``flask_sqlalchemy.sqlAlchemy``
:param gh_api: GitHub API client ready for the communication
:type gh_api: ``repocribro.github.GitHubAPI``
:return: User account and flag if it's new one
:rtype: tuple of ``repocribro.models.UserAccount``,bool
"""
user_data = gh_api.get('/user').data
gh_user = db.session.query(User).filter(
User.github_id == user_data['id']
).first()
is_new = False
if gh_user is None:
user_account = UserAccount()
db.session.add(user_account)
gh_user = User.create_from_dict(user_data, user_account)
db.session.add(gh_user)
db.session.commit()
is_new = True
return gh_user.user_account, is_new
def gh_event_push(db, repo, payload, actor):
"""Process GitHub PushEvent (with commits)
https://developer.github.com/v3/activity/events/types/#pushevent
:param db: Database to store push data
:type db: ``flask_sqlalchemy.sqlAlchemy``
:param repo: Repository where push belongs to
:type repo: ``repocribro.models.Repository``
:param payload: Data about push and commits
:type payload: dict
:param actor: Actor doing the event
:type actor: dict
"""
push = Push.create_from_dict(payload, actor, repo)
db.session.add(push)
for commit in push.commits:
db.session.add(commit)
def gh_event_release(db, actor):
"""Process GitHub ReleaseEvent (with commits)
https://developer.github.com/v3/activity/events/types/#releaseevent
:param db: Database to store push data
:type db: ``flask_sqlalchemy.sqlAlchemy``
:param repo: Repository where release belongs to
:type repo: ``repocribro.models.Repository``
:param payload: Data about release and action
:type payload: dict
:param actor: Actor doing the event
:type actor: dict
"""
action = payload['action']
release = Release.create_from_dict(payload['release'], repo)
db.session.add(release)
def make_githup_api_factory(cfg):
"""Simple factory for making the GitHub API client factory
:param cfg: Configuration of the application
:type cfg: ``configparser.ConfigParser``
:return: GitHub API client factory
:rtype: ``function``
"""
def github_api_factory(token=None, session=None):
return GitHubAPI(
cfg.get('github', 'client_id'),
cfg.get('github', 'client_secret'), 'webhooks_secret'),
session=session,
token=token
)
return github_api_factory
def session(db, request):
"""Creates a new database session for a test."""
connection = db.engine.connect()
transaction = connection.begin()
options = dict(bind=connection, binds={})
session = db.create_scoped_session(options=options)
db.session = session
def teardown():
transaction.rollback()
connection.close()
session.remove()
request.addfinalizer(teardown)
return session
def test_session_transactions(self):
app = flask.Flask(__name__)
app.testing = True
app.secret_key = 'testing'
@app.route('/')
def index():
return text_type(flask.session['foo'])
with app.test_client() as c:
with c.session_transaction() as sess:
self.assert_equal(len(sess), [42])
def test_session_using_application_root(self):
class PrefixPathMiddleware(object):
def __init__(self, rv.headers['set-cookie'].lower())
def login_required(func):
"""
Decorator to require login and save URL for redirecting user after login
"""
@wraps(func)
def decorated_function(*args, **kwargs):
"""decorator args"""
if not is_logged_in():
# Save off the page so we can redirect them to what they were
# trying to view after logging in.
session['prevIoUsly_requested_page'] = request.url
return redirect(url_for('login'))
return func(*args, **kwargs)
return decorated_function
def results():
respondent = session['respondent']
print("Petitioner = %s" % session['petitioner'])
petitioner = session['petitioner']
return render_template('results.html',
title='Court Form Sample',
petitioner=petitioner,
respondent=respondent
)
#Error Handling:
# http://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-vii-unit-testing
def test_session_transactions(self):
app = flask.Flask(__name__)
app.testing = True
app.secret_key = 'testing'
@app.route('/')
def index():
return text_type(flask.session['foo'])
with app.test_client() as c:
with c.session_transaction() as sess:
self.assert_equal(len(sess), [42])
def test_session_using_application_root(self):
class PrefixPathMiddleware(object):
def __init__(self, rv.headers['set-cookie'].lower())
def get_google_authorization_url():
current_user = flask.g.user
if current_user.is_authenticated:
return
google = get_google_auth()
auth_url, state = google.authorization_url(Auth.AUTH_URI)
flask.session['oauth_state'] = state
return auth_url
def is_authenticated():
if "username" in session:
return True
else:
return False
def is_admin():
if not "username" in session:
return False
if not (session['usergroup'] == 'root' or session['usergroup'] == 'admin'):
return False
return True
def is_activated():
if not "username" in session:
return False
if not (session['status']=='normal'):
return False
return True
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。