Python settings 模块,DEBUG 实例源码
我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用settings.DEBUG。
def parse_xbee_rf_data(data):
if DEBUG:
print data
node_addr = binascii.b2a_hex(data['source_addr_long']) + binascii.b2a_hex((data['source_addr']))
readings = re.findall("{.*?}", data['rf_data']) # returns a list of K-V pair matches
payload = {'node': node_addr}
for reading in readings:
item_dict = json.loads(reading)
for k, v in item_dict.iteritems():
sensor_name = k.encode('utf-8')
sensor_value = v.encode('utf-8')
if sensor_name == "temp:":
sensor_name = "temp"
payload['sensor'] = sensor_name
payload['val'] = sensor_value
# Now pass the payload to the RequestBuilder and send it
print payload
if PRODUCTION:
sendHTTPPost(payload)
def ask_auth(*dargs, fail_callback=None, pass_performer=False):
""" Decorator to ask for authorization """
def decorator(func):
""" Decorator wrapper """
def wrapper(*args, **kwargs):
""" Wrapper """
if settings.DEBUG:
func(*args, **kwargs)
return
prompt = AuthPromptwindow(dargs)
if prompt.is_authorized:
if pass_performer:
kwargs["_performer"] = prompt.user
func(*args, **kwargs)
else:
if prompt.show_error:
gui.utils.error("Error", "Erreur d'authentification")
if fail_callback is not None:
fail_callback()
return wrapper
return decorator
def posts_fetcher(self):
while True:
for bot in self.bots:
try:
if settings.DEBUG:
print('Checking for news {} on bot type {}-{}'.format(datetime.datetime.utcNow(), bot.web_type,
bot.target_url))
chat_id = bot.chat_id or self.chat
posts = bot.save_last_updates(chat_id)
if settings.DEBUG:
print('-- {} New posts saved found'.format(len(posts)))
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
error_stack = 'Save_updates ERROR: {}\n'.format(e)
error_stack += "".join(traceback.format_exception(exc_type, exc_traceback))
self.send_error(error_stack)
gevent.sleep(self.wait_seconds)
def pre_schedule_hook(self):
if not DEBUG:
self.post_interval = RedditSetup.post_interval
self.scheduler.run_repeating(self.store.clear_ids, interval=RedditSetup.db_clear_interval, first=0)
def main(debug=settings.DEBUG, url="127.0.0.1"):
log = logging.getLogger('werkzeug')
log.setLevel(logging.DEBUG)
file_handler = logging.FileHandler("log.log", "a")
file_handler.setLevel(logging.DEBUG)
log.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
log.addHandler(stream_handler)
app.debug = debug
app.run(url)
def get_token():
r = requests.post(TOKEN_URL, data=payload)
print r
print r.text
token_dict = json.loads(r.text)
token = token_dict['token']
if VERBOSE:
print token_dict
if DEBUG: # Todo move to tests
assert(token is not None)
return token_dict['token']
def create_app(loop):
app = web.Application(loop=loop, debug=settings.DEBUG)
setup_jinja(app, settings.DEBUG)
aiohttp_session.setup(app, EncryptedCookieStorage(
settings.SESSION_SECRET.encode('utf-8'),
max_age=settings.SESSION_MAX_AGE))
app.middlewares.append(aiohttp_login.flash.middleware)
app.router.add_get('/', handlers.index)
app.router.add_get('/users/', handlers.users, name='users')
app['db'] = await asyncpg.create_pool(dsn=settings.DATABASE, loop=loop)
aiohttp_login.setup(app, asyncpgStorage(app['db']), settings.AUTH)
return app
def __call__(self, method):
from functools import wraps
@wraps(method)
def timed(*args, **kw):
if self.logger.level <= logging.DEBUG:
ts = time.time()
result = method(*args, **kw)
te = time.time()
print_args = False
if print_args:
self.logger.debug('Ran %r (%s,%r) in %2.2fs' %
(method.__name__,
",".join(["%s" % (a,) for a in args]),
kw,
te - ts))
else:
self.logger.debug('Ran %r in %2.2fs' %
(method.__name__,
te - ts))
return result
else:
return method(*args, **kw)
return timed
def __init__(self, label = ""):
# Avoid importing this at module scope in order
# to co-habit with chroma_settings()
from django.db import connection
self.connection = connection
self.label = label
self.logger.disabled = not self.enabled
if self.enabled and not len(self.logger.handlers):
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(logging.FileHandler('dbperf.log'))
def __enter__(self):
if settings.DEBUG:
self.t_initial = time.time()
self.q_initial = len(self.connection.queries)
def test_debug(self):
""" Testing debug
"""
settings.DEBUG = True
self.func()
self.assertTrue(self.func_called)
settings.DEBUG = False
def init():
global engine
loop = asyncio.get_event_loop()
engine = loop.run_until_complete(create_engine(DSN, echo=settings.DEBUG))
def jsonify_(data):
keycase = settings.JSON_KEYCASE
if keycase:
try:
casefunc = getattr(stringcase, keycase)
data = keycase_convert(data, casefunc)
except AttributeError:
log.warning(u'%s keycase is not supported,response default json. '
u'Supported keycase: %s'
% (keycase, get_support_keycase()))
if settings.DEBUG:
js = json.dumps(data, ensure_ascii=False, indent=4)
else:
js = json.dumps(data, separators=[',', ':'])
return Response(js, mimetype='application/json')
def register_decorators_on_module_funcs(modules, decorators):
'''?decorator?????module??????
?????__nodeco__???False???????????
???????????
eg:
def func():
pass
func.__nodeco__ = True
'''
if not isinstance(modules, (list, tuple)):
modules = [modules]
if not isinstance(decorators, tuple)):
decorators = [decorators]
for m in modules:
for funcname, func in vars(m).iteritems():
if (isinstance(func, types.FunctionType)
and not funcname.startswith('_')
and func.__module__ == m.__name__):
if getattr(func, '__nodeco__', False):
continue
for deco in decorators:
if settings.DEBUG:
log.debug('register %s on %s.%s'
% (deco.__name__, m.__name__, funcname))
func = deco(func)
vars(m)[funcname] = func
def get( ID, cursor ) :
if DEBUG :
print " >>> running piper_pickledb get "
return cursor.get( ID )
#########
# EOF #
#########
def get( ID, cursor ) :
if DEBUG :
print " >>> running piper_mongodb get "
return cursor.find_one( { "_id" : ID } )
#########
# EOF #
#########
def telegram_posts_sender(self):
while True:
time_to_wait = (self.wait_seconds // 3) + 1
posts_to_sent = Crawler.get_post_to_send()
grouped_posts = groupby(posts_to_sent, key=lambda x: x.to_send_id)
pending_msgs_to_send = False
for chat_id, posts in grouped_posts:
pending_msgs_to_send = True
posts = list(posts)[:self.post_chunk]
if settings.DEBUG:
print('Sending {} new posts to chat {}'.format(len(posts), chat_id))
for post in posts:
try:
self.telegram.sendMessage(chat_id=chat_id, text=post.description)
if post.image:
try:
self.telegram.sendPhoto(chat_id=chat_id, photo=post.image)
except BadRequest:
self.send_error('ERROR sending picture to {}'.format(post))
post.status = 'SENT'
except RetryAfter as e:
# Flood control exceeded. Retry in 175 seconds
self.send_error('RetryAfter error,waiting {} seconds'.format(e.retry_after))
time_to_wait = max(e.retry_after, time_to_wait)
post.status = 'ERROR'
except Exception as e:
exc_type, exc_traceback = sys.exc_info()
error_stack = 'Send_updates ERROR: {}\n'.format(e)
error_stack += "".join(traceback.format_exception(exc_type, exc_traceback))
self.send_error(error_stack)
post.status = 'ERROR'
Crawler.save_posts(posts)
if pending_msgs_to_send:
sleep_time = 3
else:
sleep_time = time_to_wait
gevent.sleep(sleep_time)
def handle(self, *args, **kwargs):
"""
Generate config files for running Nginx,and send the Nginx command
line to stdout.
The reason for sending the command line to stdout instead of just running
it is so that supervisord can directly manage the resulting Nginx
process (otherwise we would have to handle passing signals through).
"""
from chroma_core.lib.util import site_dir
# This command is only for development
assert settings.DEBUG
SITE_ROOT = site_dir()
join_site_root = partial(os.path.join, SITE_ROOT)
DEV_Nginx_DIR = join_site_root("dev_Nginx")
join_Nginx_dir = partial(join_site_root, DEV_Nginx_DIR)
Nginx_CONF_TEMPLATE = join_site_root("Nginx.conf.template")
Nginx_CONF = join_Nginx_dir("Nginx.conf")
CHROMA_MANAGER_CONF_TEMPLATE = join_site_root("chroma-manager.conf.template")
CHROMA_MANAGER_CONF = join_Nginx_dir("chroma-manager.conf")
if not os.path.exists(DEV_Nginx_DIR):
os.makedirs(DEV_Nginx_DIR)
def write_conf(template_path, conf_path):
conf_text = Template(open(template_path).read()).render(Context({
'var': DEV_Nginx_DIR,
'log': SITE_ROOT,
'SSL_PATH': settings.SSL_PATH,
'APP_PATH': settings.APP_PATH,
'REPO_PATH': settings.DEV_REPO_PATH,
'HTTP_FRONTEND_PORT': settings.HTTP_FRONTEND_PORT,
'HTTPS_FRONTEND_PORT': settings.HTTPS_FRONTEND_PORT,
'HTTP_AGENT_PORT': settings.HTTP_AGENT_PORT,
'HTTP_API_PORT': settings.HTTP_API_PORT,
'REALTIME_PORT': settings.REALTIME_PORT,
'VIEW_SERVER_PORT': settings.VIEW_SERVER_PORT
}))
open(conf_path, 'w').write(conf_text)
write_conf(Nginx_CONF_TEMPLATE, Nginx_CONF)
write_conf(CHROMA_MANAGER_CONF_TEMPLATE, CHROMA_MANAGER_CONF)
print " ".join([self._Nginx_path, "-c", Nginx_CONF])
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。