微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python flask.current_app 模块-debug() 实例源码

Python flask.current_app 模块,debug() 实例源码

我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用flask.current_app.debug

项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def logging_levels():
    """
    Generator body of a context manager that conditionally raises log verbosity.

    When the current request carries a truthy `X-Request-Debug` header, the
    logger returned by `getLogger()` is switched to DEBUG before yielding and
    put back to its previous effective level afterwards. Without the header
    the function simply yields and leaves logging untouched.

    """
    debug_requested = strtobool(request.headers.get("x-request-debug", "false"))
    if not debug_requested:
        yield
        return

    target_logger = getLogger()
    previous_level = target_logger.getEffectiveLevel()
    try:
        target_logger.setLevel(DEBUG)
        yield
    finally:
        # Undo the override even when the wrapped code raised.
        target_logger.setLevel(previous_level)
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def capture_request(self):
        """
        Store the JSON request body on `self.request_body` when allowed.

        Nothing is captured unless the app is in debug mode, the
        `include_request_body` option is truthy, the body is below the
        configured size limit (when the option is a number rather than
        `True`), and the body parses to a truthy JSON value.
        """
        if not current_app.debug:
            # request bodies are only captured in debug mode
            return

        body_option = self.options.include_request_body
        if not body_option:
            # capturing was not requested
            return

        declared_size = request.content_length
        if declared_size and body_option is not True and declared_size >= body_option:
            # body is at or above the configured size limit
            return

        if not request.get_json(force=True, silent=True):
            # not JSON (or an empty JSON value)
            return

        self.request_body = request.get_json(force=True)
项目:flasky    作者:RoSEOu    | 项目源码 | 文件源码
def redirect_to_ssl(self):
        """Send plain-HTTP requests to the equivalent HTTPS URL."""
        # Any truthy entry means the request needs no redirect.
        exemptions = [
            request.is_secure,
            current_app.debug,
            request.headers.get('X-Forwarded-Proto', 'http') == 'https'
        ]

        if any(exemptions) or self.skip:
            return None

        if not request.url.startswith('http://'):
            return None

        secure_url = request.url.replace('http://', 'https://', 1)
        # 301 when configured as permanent, otherwise a temporary 302.
        status = 301 if self.permanent else 302
        return redirect(secure_url, code=status)
项目:diyca    作者:texadactyl    | 项目源码 | 文件源码
def dbuser_add(arg_userid, arg_password, arg_email):
    """
    Insert a new row into the user table.

    Args:
        arg_userid: value stored in the user-id column.
        arg_password: value stored in the password column, written as given.
        arg_email: value stored in the email column.

    Returns:
        True when the row was inserted and committed, False when sqlite3
        reported an error (the error is logged).
    """
    if app.debug:
        app.logger.debug("dbuser_add: arg_userid=%s,arg_email=%s", arg_userid, arg_email)
    # Identifiers cannot be bound as parameters, so the table and column names
    # (constants from ddl) are formatted in. Caller-supplied values go through
    # "?" placeholders so quotes inside them cannot alter the statement.
    sql = 'INSERT INTO {tn} ("{cn1}","{cn2}","{cn3}","{cn4}") VALUES (?,?,?,?)' \
        .format(tn=ddl.TBL_USER,
                cn1=ddl.FLD_USER_ID,
                cn2=ddl.FLD_USER_PSWD,
                cn3=ddl.FLD_USER_EMAIL,
                cn4=ddl.FLD_USER_TSTAMP)
    try:
        dbcursor = dbconn.cursor()
        stamp = epoch2dbfmt(time.time())
        dbcursor.execute(sql, (arg_userid, arg_password, arg_email, stamp))
        dbconn.commit()
    except sqlite3.Error as e:
        # Three placeholders need three arguments (the user id was missing).
        app.logger.error("dbuser_add: INSERT {%s,%s} Failed,reason: {%s}", arg_userid, arg_email, repr(e))
        return False
    # Success
    return True

#=====================
# Remove a user record
#=====================
项目:diyca    作者:texadactyl    | 项目源码 | 文件源码
def dbuser_remove(arg_userid):
    """
    Delete the user-table row whose user-id column equals `arg_userid`.

    Args:
        arg_userid: user id of the row to delete.

    Returns:
        True when the DELETE was executed and committed, False when sqlite3
        reported an error (the error is logged).
    """
    if app.debug:
        app.logger.debug("dbuser_remove: arg_userid=%s", arg_userid)
    # The column name is a ddl constant; the user id is bound as a parameter
    # so it cannot change the shape of the statement.
    sql = "DELETE FROM {tn} WHERE {cn1}=?" \
        .format(tn=ddl.TBL_USER, cn1=ddl.FLD_USER_ID)
    try:
        dbcursor = dbconn.cursor()
        dbcursor.execute(sql, (arg_userid,))
        dbconn.commit()
    except sqlite3.Error as e:
        app.logger.error("dbuser_remove: DELETE {%s} Failed, reason: {%s}", arg_userid, repr(e))
        return False
    # Success
    return True

#========================
# Initialize the database
#========================
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def delete_memoized_verhash(self, f, *args):
        """
        Drop the version hash stored for the memoized function `f`.

        ..warning::

            Keys that were created under this version hash may be left
            behind. The application is responsible for giving such keys
            timeouts so they do not sit orphaned in the cache backend.

        Raises DeprecationWarning when `f` is not callable. Errors from the
        backend are re-raised in debug mode and logged otherwise.
        """
        if not callable(f):
            raise DeprecationWarning("Deleting messages by relative name is no longer"
                          " reliable,please use a function reference")

        try:
            self._memoize_version(f, delete=True)
        except Exception:
            if not current_app.debug:
                # Outside debug mode a backend failure is logged, not propagated.
                logger.exception("Exception possibly due to cache backend.")
                return
            raise
项目:FileStoreGAE    作者:liantian-cn    | 项目源码 | 文件源码
def delete_memoized_verhash(self, delete=True)
        except Exception:
            if current_app.debug:
                raise
            logger.exception("Exception possibly due to cache backend.")
项目:mlab-vis-api    作者:m-lab    | 项目源码 | 文件源码
def convert_to_json(data):
    '''
    Encode the data as JSON -- taken from
    flask_restplus.representations.output_json -- updated to clean the
    dictionary of nulls.

    Returns the dumped string, always terminated by a newline.
    '''
    # Work on a copy: setdefault() below must not write the debug-only
    # defaults back into the dict stored in the application config.
    settings = dict(current_app.config.get('RESTPLUS_JSON', {}))

    # If we're in debug mode, and the indent is not set, we set it to a
    # reasonable value here.  Note that this won't override any existing value
    # that was set.  We also set the "sort_keys" value.
    if current_app.debug:
        settings.setdefault('indent', 4)
        settings.setdefault('sort_keys', True)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(cleandict(data), **settings) + "\n"

    return dumped
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
def redirect_to_ssl(self):
        """
        Redirect incoming requests to HTTPS.

        Returns a permanent (301) redirect response for plain-HTTP requests,
        or None when no redirect applies.
        """
        # NOTE(review): this list was damaged in extraction and is rebuilt
        # here. `current_app.debug` and the X-Forwarded-Proto check are taken
        # from the sibling implementation of this method -- confirm against
        # the project source.
        criteria = [
            request.is_secure,
            current_app.debug,
            current_app.testing,
            request.headers.get('X-Forwarded-Proto', 'http') == 'https',
        ]

        # Requests from excluded user agents are never redirected.
        if request.headers.get('User-Agent', '').lower().startswith(self.exclude_user_agents):
            return

        if not any(criteria):
            if request.url.startswith('http://'):
                # Only the scheme prefix is rewritten.
                url = request.url.replace('http://', 'https://', 1)
                r = redirect(url, code=301)
                return r
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
async def worker(channel, queue, token, repo_ids=None, build_ids=None):
    """
    Forward matching pubsub messages from `channel` onto `queue` as Events.

    Must be a coroutine because it awaits the channel and the queue. A
    message is dropped when its repository id is not listed in
    ``token['repo_ids']``, or when `build_ids` / `repo_ids` are given and
    the message does not match them.
    """
    allowed_repo_ids = frozenset(token['repo_ids'])

    while (await channel.wait_message()):
        msg = await channel.get_json()
        data = msg.get('data')
        if data['repository']['id'] not in allowed_repo_ids:
            continue
        if build_ids and data['id'] not in build_ids:
            continue
        if repo_ids and data['repository']['id'] not in repo_ids:
            continue
        evt = Event(
            msg.get('id'),
            msg.get('event'),
            data,
        )
        await queue.put(evt)
        current_app.logger.debug(
            'pubsub.event.received qsize=%s', queue.qsize())


# @log_errors
项目:parade    作者:bailaohe    | 项目源码 | 文件源码
def catch_parade_error(func):
    """
    Decorator that turns exceptions raised by `func` into `abort()` calls.

    A ParadeError is reported with its own status, code and reason; any
    other exception is reported as a 500. In debug mode the formatted
    traceback is passed to `abort()` as well.
    """
    import functools

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kw):
        try:
            return func(*args, **kw)
        except ParadeError as e:
            if current_app.debug:
                # format_exception needs type, value and traceback.
                stack_info = traceback.format_exception(*sys.exc_info())
                abort(e.status, code=e.code, message=e.reason, traceback=stack_info)
            else:
                abort(e.status, message=e.reason)
        except Exception as e:
            if current_app.debug:
                stack_info = traceback.format_exception(*sys.exc_info())
                abort(500, code=0, message=str(e), traceback=stack_info)
            else:
                abort(500, message=str(e))
    return wrapper
项目:locust-demo    作者:bmd    | 项目源码 | 文件源码
def output_json(data, code, headers=None):
    """Makes a Flask response with a JSON encoded body.

    `data` is dumped with the keyword arguments found under the
    ``RESTFUL_JSON`` config key, `code` becomes the response status and
    `headers` (if any) are added to the response.
    """

    # The fallback must be a mapping: it is unpacked with ** below, so the
    # previous default of True raised TypeError whenever the key was absent.
    settings = current_app.config.get('RESTFUL_JSON', {})

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(data, **settings) + "\n"

    resp = make_response(dumped, code)
    resp.headers.extend(headers or {})
    return resp
项目:ibrest    作者:hamx0r    | 项目源码 | 文件源码
def setup_client(client):
    """ Register every message handler on the client, then disconnect it.
    """
    # (handler, message type names) pairs, registered in this order.
    registrations = (
        (handlers.connection_handler, ('ManagedAccounts', 'NextValidId')),
        (handlers.history_handler, ('HistoricalData',)),
        (handlers.order_handler, ('Openorder', 'OrderStatus', 'OpenorderEnd')),
        (handlers.portfolio_positions_handler, ('Position', 'PositionEnd')),
        (handlers.account_summary_handler, ('AccountSummary', 'AccountSummaryEnd')),
        (handlers.account_update_handler, ('UpdateAccountTime', 'UpdateAccountValue', 'UpdatePortfolio',
                                           'AccountDownloadEnd')),
        (handlers.contract_handler, ('ContractDetails',)),
        (handlers.executions_handler, ('ExecDetails', 'ExecDetailsEnd', 'CommissionsReport')),
        (handlers.error_handler, ('Error',)),
        # Feed handlers
        (handlers.market_handler, ('TickSize', 'TickPrice')),
    )
    for handler, message_types in registrations:
        client.register(handler, *message_types)

    # Make sure the client starts out in a disconnected state
    client.disconnect()
项目:flat-api    作者:harryho    | 项目源码 | 文件源码
def output_json(data, False)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(data, code)
    # resp.headers.extend(headers or {'Content-Type':'application/json'})
    # Always return as JSON 
    resp.headers['Content-Type'] = 'application/json'
    return resp
项目:noobotkit    作者:nazroll    | 项目源码 | 文件源码
def output_json(data, code)
    resp.headers.extend(headers or {})
    return resp
项目:Chorus    作者:DonaldBough    | 项目源码 | 文件源码
def output_json(data, code)
    resp.headers.extend(headers or {})
    return resp
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def log(self, logger):
        """
        Write this error to `logger`.

        Anything other than a 500 is logged at INFO, since a raised exception
        can be expected behavior (e.g. a 404). A 500 is logged at WARNING; in
        debug or testing mode the message is split out and exception info is
        attached.
        """
        if self.status_code != 500:
            logger.info(self.to_dict())
            return

        # something actually went wrong; investigate
        details = self.to_dict()
        if not (current_app.debug or current_app.testing):
            logger.warning(details)
            return

        message = details.pop("message")
        logger.warning(message, extra=details, exc_info=True)
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def capture_response(self, response):
        """
        Record the outcome of a successful call and, when allowed, its JSON body.

        `success`, `status_code` and `response_headers` are always set. The
        parsed body is stored on `response_body` only in debug mode, when the
        `include_response_body` option is truthy, the body is non-empty and
        below the configured size limit, and it parses as JSON.
        """
        self.success = True

        body, self.status_code, self.response_headers = parse_response(response)

        if not current_app.debug:
            # response bodies are only captured in debug mode
            return

        body_option = self.options.include_response_body
        if not body_option or not body:
            # capturing was not requested, or there is nothing to capture
            return

        if body_option is not True and len(body) >= body_option:
            # body is at or above the configured size limit
            return

        try:
            self.response_body = loads(body)
        except (TypeError, ValueError):
            # body is not JSON; leave response_body untouched
            pass
项目:diyca    作者:texadactyl    | 项目源码 | 文件源码
def verify_email_recipient(arg_recipient):
    """
    Check that an address looks like an email address and its domain has an MX record.

    Args:
        arg_recipient: address to check. The pattern only accepts lowercase
            letters and a final domain label of 2 to 4 letters.

    Returns:
        True when the address matches the pattern and the MX lookup for its
        domain succeeds, False otherwise.
    """
    if app.debug:
        app.logger.debug("verify_email_recipient: email recipient = %s", arg_recipient)
    # Inspect email address (raw string so "\." is a regex escape, not an
    # invalid Python string escape)
    result = re.match(r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', arg_recipient)
    if result is None:
        if app.debug:
            app.logger.debug("verify_email_recipient: Not an email address: %s", arg_recipient)
        return False
    # Extract domain name from arg_recipient
    pieces = arg_recipient.split("@")
    if len(pieces) != 2:
        if app.debug:
            app.logger.debug("verify_email_recipient: Did not split into 2 pieces: %s", arg_recipient)
        return False
    domain = pieces[1]
    if app.debug:
        app.logger.debug("verify_email_recipient: email domain = %s", domain)
    # Get MX record for target domain
    try:
        records = dns.resolver.query(domain, 'MX')
        mxRecord = str(records[0].exchange)
    except Exception:
        # Any lookup failure means "not deliverable"; KeyboardInterrupt and
        # SystemExit are no longer swallowed as they were by the bare except.
        if app.debug:
            app.logger.debug("verify_email_recipient: DNS MX-query exception with %s", domain)
        return False
    if app.debug:
        app.logger.debug("verify_email_recipient: DNS MX record = %s", mxRecord)
    return True

#======================================
# Convert epoch time to database format
#======================================
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def _get_wrap(self, node, classes='form-group'):
        """
        Build the wrapping ``div`` for a form field.

        Required fields get an extra ``required`` class, which strictly
        speaking isn't bootstrap but is a common enough customization. In
        debug mode a comment naming the field and its class is added.
        """
        css_classes = classes + ' required' if node.flags.required else classes

        wrapper = tags.div(_class=css_classes)
        if current_app.debug:
            label = ' Field: {} ({}) '.format(node.name, node.__class__.__name__)
            wrapper.add(tags.comment(label))

        return wrapper
项目:doorman    作者:mwielgoszewski    | 项目源码 | 文件源码
def logger(node=None):
    '''
    Handle a log submission posted by a node.

    'status' logs at or above the configured minimum severity are saved as
    StatusLog rows, 'result' logs are processed and saved, and any other
    log type is reported as an error. In every case the node is written
    back and the session committed.
    '''
    data = request.get_json()
    log_type = data['log_type']
    log_level = current_app.config['DOORMAN_MINIMUM_OSQUERY_LOG_LEVEL']

    if current_app.debug:
        current_app.logger.debug(json.dumps(data, indent=2))

    if log_type == 'status':
        log_tee.handle_status(data, host_identifier=node.host_identifier)
        # Keep only entries that are not below the minimum severity.
        status_logs = [
            StatusLog(node_id=node.id, **item)
            for item in data.get('data', [])
            if not int(item['severity']) < log_level
        ]
        db.session.add(node)
        db.session.bulk_save_objects(status_logs)
        db.session.commit()

    elif log_type == 'result':
        db.session.add(node)
        db.session.bulk_save_objects(process_result(data, node))
        db.session.commit()
        log_tee.handle_result(data, host_identifier=node.host_identifier)
        analyze_result.delay(data, node.to_dict())

    else:
        current_app.logger.error("%s - UnkNown log_type %r",
            request.remote_addr, log_type
        )
        current_app.logger.info(json.dumps(data))
        # the node still has to be written (last_checkin, last_ip)
        db.session.add(node)
        db.session.commit()

    return jsonify(node_invalid=False)
项目:storage-api    作者:cerndb    | 项目源码 | 文件源码
def handle_netapp_exception(error):
    '''Build the (body, 500) pair for an error reported by the filer.

    The body carries the error's message and errno; in debug mode the
    failing query is included as a string.
    '''
    payload = dict(message=error.msg, errno=error.errno)
    if current_app.debug:
        payload.update(failing_query=str(error.failing_query))

    return payload, 500
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def _get_wrap(self, node.__class__.__name__)))

        return div
项目:flask-ask    作者:johnwheeler    | 项目源码 | 文件源码
def dbgdump(obj, default=None, cls=None):
    """Log `obj` as JSON at debug level.

    The dump is indented by 2 when the ``ASK_PRETTY_DEBUG_LOGS`` config
    value is truthy and compact otherwise. `default` and `cls` are passed
    through to ``json.dumps``.
    """
    pretty = current_app.config.get('ASK_PRETTY_DEBUG_LOGS', False)
    logger.debug(json.dumps(obj, indent=2 if pretty else None, default=default, cls=cls))
项目:flask-ask    作者:johnwheeler    | 项目源码 | 文件源码
def init_app(self, app, path='templates.yaml'):
        """Attach this Ask instance to a Flask app: register the route and add the YAML template loader.

        The Ask instance is given the following configuration variables by calling on Flask's configuration:

        `ASK_APPLICATION_ID`:

            Turn on application ID verification by setting this variable to an application ID or a
            list of allowed application IDs. By default, application ID verification is disabled and a
            warning is logged. This variable should be set in production to ensure
            requests are being sent by the applications you specify.
            Default: None

        `ASK_VERIFY_REQUESTS`:

            Enables or disables Alexa request verification, which ensures requests sent to your skill
            are from Amazon's Alexa service. This setting should not be disabled in production.
            It is useful for mocking JSON requests in automated tests.
            Default: True

        `ASK_VERIFY_TIMESTAMP_DEBUG`:

            Turn on request timestamp verification while debugging by setting this to True.
            Timestamp verification helps mitigate against replay attacks. It relies on the system clock
            being synchronized with an NTP server. This setting should not be enabled in production.
            Default: False

        `ASK_PRETTY_DEBUG_LOGS`:

            Add tabs and linebreaks to the Alexa request and response printed to the debug log.
            This improves readability when printing to the console, but breaks formatting when logging to CloudWatch.
            Default: False

        Raises TypeError when no route has been set on this instance.
        """
        if self._route is None:
            raise TypeError("route is a required argument when app is not None")

        app.ask = self

        app.add_url_rule(self._route, view_func=self._flask_view_func, methods=['POST'])

        # The app's existing loader stays first; the YAML loader is a fallback.
        existing_loader = app.jinja_loader
        yaml_loader = YamlLoader(app, path)
        app.jinja_loader = ChoiceLoader([existing_loader, yaml_loader])
项目:flask-ask    作者:johnwheeler    | 项目源码 | 文件源码
def _alexa_request(self, verify=True):
        """
        Parse the incoming request body as JSON and optionally verify it.

        With `verify` set, the certificate is loaded from the URL in the
        request headers, the signature is checked against the raw body, the
        timestamp is checked (skipped in debug mode unless
        `ask_verify_timestamp_debug` is set) and, when `ask_application_id`
        is configured, the application id is checked.

        Returns the parsed payload.
        """
        raw_body = flask_request.data
        payload = json.loads(raw_body)

        if not verify:
            return payload

        cert_url = flask_request.headers['Signaturecertchainurl']
        signature = flask_request.headers['Signature']

        # loading the certificate also checks its url and format
        cert = verifier.load_certificate(cert_url)
        verifier.verify_signature(cert, signature, raw_body)

        # timestamp
        raw_timestamp = payload.get('request', {}).get('timestamp')
        timestamp = self._parse_timestamp(raw_timestamp)
        if not current_app.debug or self.ask_verify_timestamp_debug:
            verifier.verify_timestamp(timestamp)

        # application id: under 'session' when present, else under 'context'
        try:
            application_id = payload['session']['application']['applicationId']
        except KeyError:
            application_id = payload['context'][
                'System']['application']['applicationId']
        if self.ask_application_id is not None:
            verifier.verify_application_id(application_id, self.ask_application_id)

        return payload
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
async def ping(loop, resp, client_guid):
    """
    Write a ping line to `resp` every 15 seconds, forever.

    Must be a coroutine because it awaits the sleep. `loop` is kept in the
    signature for callers but is no longer passed to ``asyncio.sleep``:
    that keyword was removed in Python 3.10, and the sleep runs on the
    loop that is executing this coroutine.
    """
    # periodically send ping to the browser. Any message that
    # starts with ":" colon ignored by a browser and could be used
    # as ping message.
    while True:
        await asyncio.sleep(15)
        current_app.logger.debug('pubsub.ping guid=%s', client_guid)
        resp.write(b': ping\r\n\r\n')


# @log_errors
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
async def build_server(loop, host, port):
    """
    Create the pubsub web application and start serving it on `host`:`port`.

    Must be a coroutine because it awaits ``loop.create_server``. Returns
    the server object produced by that call.
    """
    app = Application(loop=loop, logger=current_app.logger,
                      debug=current_app.debug)
    app.router.add_route('GET', '/', stream)
    app.router.add_route('GET', '/healthz', health_check)

    # create_server takes (protocol_factory, host, port); host was missing,
    # which put the port number into the host position.
    return await loop.create_server(app.make_handler(), host, port)
项目:ddots-api-server    作者:frol    | 项目源码 | 文件源码
def serve_swaggerui_assets(path):
    """
    Serve the Swagger-UI asset `path` from the ``../static/`` directory.

    Outside debug mode a warning is emitted first, recommending that these
    assets be served by a public-facing server instead.
    """
    from flask import send_from_directory

    if not current_app.debug:
        import warnings
        warnings.warn(
            "/swaggerui/ is recommended to be served by public-facing server (e.g. Nginx)"
        )

    return send_from_directory('../static/', path)
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def _get_wrap(self, node.__class__.__name__)))

        return div
项目:netzob-webapi    作者:netzob    | 项目源码 | 文件源码
def serve_swaggerui_assets(path):
    """
    Serve the Swagger-UI asset `path` from the ``../static/`` directory.

    Outside debug mode a warning is emitted first, recommending that these
    assets be served by a public-facing server instead.
    """
    from flask import send_from_directory

    if not current_app.debug:
        import warnings
        warnings.warn(
            "/swaggerui/ is recommended to be served by public-facing server (e.g. Nginx)"
        )

    return send_from_directory('../static/', path)
项目:ibrest    作者:hamx0r    | 项目源码 | 文件源码
def get_client():
    """ Return the shared client connection, reconnecting it if necessary.

    Waits while the client id is marked as in use, turns on client logging
    in debug mode, and raises Exception when a reconnect attempt fails.
    """
    # NOTE(review): the countdown below is only logged, never compared with
    # zero, so the wait is unbounded -- confirm whether a timeout was intended.
    remaining = g.timeout
    while g.clientId_in_use:
        log.debug('Waiting for clientId to become available...({})'.format(remaining))
        time.sleep(0.5)
        remaining -= 1

    connection = g.client_connection

    # Client logging is switched on only when debug is exactly True
    if current_app.debug is True:
        connection.enableLogging()

    if connection.isConnected():
        return connection

    # Not connected: drop the stale connection and try once more
    log.debug('Client {} not connected.  Trying to reconnect...'.format(g.client_id))
    connection.disconnect()
    time.sleep(1)
    connection.connect()
    if connection.isConnected() is False:
        raise Exception('Client cannot connect')
    return connection
项目:Lixiang_zhaoxin    作者:hejaxian    | 项目源码 | 文件源码
def _get_wrap(self, node.__class__.__name__)))

        return div
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def recording_enabled():
    """Return a truthy value when query recording is on.

    Recording is on in debug mode, or when the ``SQLALCHEMY_RECORD_QUERIES``
    config value is truthy. The key must be upper case: the previous
    spelling ``sqlALCHEMY_RECORD_QUERIES`` could never match it.
    """
    return (current_app.debug
            or current_app.config.get('SQLALCHEMY_RECORD_QUERIES'))
项目:ngx_status    作者:YoYoAdorkable    | 项目源码 | 文件源码
def _get_wrap(self, node.__class__.__name__)))

        return div
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def handle_all_exceptions(e):
    """
    Turn any exception into a JSON error response.

    Exceptions that are not HTTPException instances are reported as 500
    "Internal Server Error" and logged with a traceback. HTTPExceptions
    keep their own code, name and description. For server-side errors a
    context id and a log-search link are added to the error body.

    Returns the response built from the JSON body and its status code.
    """
    is_server_error = not isinstance(e, HTTPException)

    ret = {}
    error = {}
    ret['error'] = error

    if is_server_error or e.code >= 500:
        # Use context_id from the client if it's available, or make one if not.
        # The header is client-controlled: malformed JSON or a non-object
        # value must not make the error handler itself raise.
        raw_context = request.headers.get("Drift-Log-Context")
        try:
            log_context = json.loads(raw_context) if raw_context else {}
        except ValueError:
            log_context = {}
        if not isinstance(log_context, dict):
            log_context = {}
        context_id = log_context.get("request_id", str(uuid.uuid4()).replace("-", ""))
        error['context_id'] = context_id
        title = str(e) + " - [{}]".format(context_id)
        splunk_link = 'http://splunk.devnorth.dg-api.com:8000/en-US/app/search/search'
        splunk_link += '?q=search%20sourcetype%3D%22*%22%20%7C%20search%20{}'.format(context_id)
        error['link'] = splunk_link

    if is_server_error:
        # Do a traceback if caller has dev role, or we are running in debug mode.
        current_user = query_current_user()
        if (current_user and "dev" in current_user['roles']) or current_app.debug:
            ei = sys.exc_info()
            # Same text the string buffer used to collect, without relying
            # on the Python-2-only cStringIO module.
            header = "%s: %s\n" % (type(e).__name__, e)
            error["traceback"] = header + "".join(
                traceback.format_exception(ei[0], ei[1], ei[2]))
            error['description'] = str(e)
        else:
            error['description'] = "Internal Server Error"

        # The exception is logged out and picked up by Splunk or comparable tool.
        # The 'context_id' in the title enables quick cross referencing with the
        # response body below.
        log.exception(title)

        ret['status_code'] = 500
        ret['message'] = "Internal Server Error"
        error['code'] = 'internal_server_error'
    else:
        ret['status_code'] = e.code
        ret['message'] = e.name
        error['code'] = 'user_error' if e.code < 500 else 'server_error'
        error['description'] = e.description

        # Support for Flask Restful 'data' property in exceptions.
        if hasattr(e, 'data') and e.data:
            error.update(e.data)

            # Legacy field 'message'. If it's in the 'data' payload, rename the field
            # to 'description'.
            if 'message' in e.data:
                error['description'] = error.pop('message')

        if e.code >= 500:
            # It's a "soft" server error. Let's log it out.
            log.warning(title + " " + error['description'])

    return make_response(jsonify(ret), ret['status_code'])
项目:diyca    作者:texadactyl    | 项目源码 | 文件源码
def sign_csr(arg_userid, arg_csr_path, arg_crt_path):
    """
    Sign a user's certificate signing request with the CA key and store the result.

    Args:
        arg_userid: user identifier, used only in log messages.
        arg_csr_path: path of the PEM-encoded CSR to sign.
        arg_crt_path: path the signed PEM certificate is written to.

    Returns:
        True on success; False when any step fails (the failure is logged).
    """
    # NOTE(review): several log calls in this function were truncated in
    # extraction; their messages and arguments are reconstructed -- confirm
    # the wording against the project source.

    # csr = User CSR file in internal crypto format
    # On failure the second element is logged as the reason.
    (result, contents) = get_file_contents(arg_csr_path)
    if not result:
        app.logger.error("sign_csr: cannot access CSR {%s} for user {%s}, reason: {%s}",
                         arg_csr_path, arg_userid, contents)
        return False
    try:
        csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, contents)
    except Exception as e:
        app.logger.error("sign_csr: load CSR {%s} for user {%s} Failed, reason: {%s}",
                         arg_csr_path, arg_userid, repr(e))
        return False
    # CAcertificate = CA certificate in internal crypto format
    (result, contents) = get_file_contents(CA_CRT_FILE)
    if not result:
        # No exception is bound here, so the reason comes from contents
        # (as in the other two "cannot access" branches), not repr(e).
        app.logger.error("sign_csr: cannot access CA certificate {%s} for user {%s}, reason: {%s}",
                         CA_CRT_FILE, arg_userid, contents)
        return False
    try:
        CAcertificate = crypto.load_certificate(crypto.FILETYPE_PEM, contents)
        if app.debug:
            app.logger.debug("sign_csr: CA cert subject = {%s}", CAcertificate.get_subject())
    except Exception as e:
        app.logger.error("sign_csr: load CA certificate {%s} for user {%s} Failed, reason: {%s}",
                         CA_CRT_FILE, arg_userid, repr(e))
        return False
    # CAprivatekey = CA private key in internal crypto format
    (result, contents) = get_file_contents(CA_KEY_FILE)
    if not result:
        app.logger.error("sign_csr: cannot access CA private key {%s} for user {%s}, reason: {%s}",
                         CA_KEY_FILE, arg_userid, contents)
        return False
    try:
        CAprivatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, contents)
    except Exception as e:
        app.logger.error("sign_csr: load CA private key {%s} for user {%s} Failed, reason: {%s}",
                         CA_KEY_FILE, arg_userid, repr(e))
        # Without the key, signing below would fail on an undefined name.
        return False
    # Sign CSR, giving the CRT
    try:
        cert = crypto.X509()
        # NOTE(review): every certificate gets the same serial number --
        # confirm whether this is intended.
        cert.set_serial_number(42)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(EXPIRY_PERIOD)
        cert.set_issuer(CAcertificate.get_subject())
        subject = csr.get_subject() # logged on success below
        cert.set_subject(subject)
        cert.set_pubkey(csr.get_pubkey())
        cert.sign(CAprivatekey, DIGEST)
    except Exception as e:
        app.logger.error("sign_csr: Cannot sign CSR {%s} for user {%s}, reason: {%s}",
                         arg_csr_path, arg_userid, repr(e))
        return False
    # Store signed CRT
    try:
        # The with-block closes the file even when a write or fsync fails.
        with open(arg_crt_path, "w") as crt_file:
            crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8'))
            crt_file.flush()
            os.fsync(crt_file.fileno())
    except Exception as e:
        app.logger.error("sign_csr: Cannot store CRT {%s} for user {%s}, reason: {%s}",
                         arg_crt_path, arg_userid, repr(e))
        return False
    # Success
    app.logger.info("sign_csr: Success with CRT {%s} for user {%s},subject={%s}",
                    arg_crt_path, arg_userid, subject)
    return True
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def redis_cached(timeout=None, key_prefix='view/%s', unless=None):
    """
    Decorator factory that caches a function's return value in Redis.

    :param timeout: passed to ``redis_set`` as ``timeout`` and exposed on the
        decorated function as ``cache_timeout``.
    :param key_prefix: a callable returning the key, or a string. A string
        containing ``%s`` is filled with the request URL plus the current
        user's id.
    :param unless: optional callable; when it returns exactly ``True`` the
        cache is bypassed and the function is called directly.
    :return: the decorator.

    Passing a truthy ``nocache`` keyword argument to the decorated function
    also bypasses the cache. Cache failures are re-raised in debug mode and
    otherwise fall back to calling the function.
    """
    def decorator(f):
        @functools.wraps(f)
        def decorated_function(*args, **kwargs):
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)

            if kwargs.get('nocache'):
                return f(*args, **kwargs)  # caller asked to skip the cache

            try:
                cache_key = decorated_function.make_cache_key(*args, **kwargs)
                # NOTE(review): urllib.quote exists on Python 2 only.
                cache_key = urllib.quote(cache_key, safe='')
                rv = redis_get(cache_key)
            except Exception:
                if current_app.debug:
                    raise
                return f(*args, **kwargs)

            if rv is None:
                rv = f(*args, **kwargs)
                try:
                    redis_set(cache_key, rv, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
                    # Storing is best-effort. The value is already computed,
                    # so return it instead of running f a second time.
            return rv

        def make_cache_key(*args, **kwargs):
            if callable(key_prefix):
                cache_key = key_prefix()
            elif '%s' in key_prefix:
                cache_key = key_prefix % (request.url+'_uid_'+str(current_user.get_id()))
            else:
                cache_key = key_prefix
            cache_key = hashlib.md5(cache_key.encode('utf-8')).hexdigest()
            cache_key = '_'.join((get_version(level='day'), cache_key))
            return cache_key

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key

        return decorated_function
    return decorator
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def redis_memoize(timeout=100, make_name=None,??100s
    :param make_name: ????,???????????,?????????,??????????
    :param unless: ??????????
    :return: ???????????
    """
    def decorator(f):
        @functools.wraps(f)  # ??????
        def decorated_function(*args, **kwargs)  # ????????? nocache ???????

            try:
                cache_key = decorated_function.make_cache_key(make_name, args, kwargs)
                rv = redis_get(cache_key)
            except Exception:
                if current_app.debug:
                    raise
                return f(*args, **kwargs)
            return rv

        def make_cache_key(make_name, keyargs, keykwargs):
            fname = f.__name__
            if callable(make_name):
                fname = make_name(fname)
            if isinstance(make_name, str):
                fname = make_name
            alt_fname = '.'.join((f.__module__, fname))
            try:
                origin_str = "{0}{1}{2}".format(alt_fname, keykwargs)
            except AttributeError:
                origin_str = "%s%s%s" % (alt_fname, keykwargs)
            cache_key = hashlib.md5(origin_str.encode('utf-8')).hexdigest()
            cache_key = '_'.join((get_version(level='day'), cache_key))
            return cache_key

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key

        return decorated_function
    return decorator
项目:doorman    作者:mwielgoszewski    | 项目源码 | 文件源码
def distributed_write(node=None):
    '''
    Store the results a node posts for its pending distributed queries.

    For every guid under 'queries' the matching PENDING task of this node
    is looked up; results for unknown tasks are logged and skipped. Each
    result row is added to the session and the task is marked COMPLETE, or
    failed when the node reported a non-zero status for that guid. The
    node is written back and the session committed at the end.
    '''
    # NOTE(review): the capitalisation of `distributedQueryTask`,
    # `distributedQueryResult` and `.Failed` looks altered by extraction;
    # the names are left exactly as found -- confirm against the models.
    data = request.get_json()

    if current_app.debug:
        current_app.logger.debug(json.dumps(data, indent=2))

    queries = data.get('queries', {})
    statuses = data.get('statuses', {})

    for guid, results in queries.items():
        task = distributedQueryTask.query.filter(
            distributedQueryTask.guid == guid,
            distributedQueryTask.status == distributedQueryTask.PENDING,
            distributedQueryTask.node == node,
        ).first()

        if not task:
            current_app.logger.error(
                "%s - Got result for distributed query not in PENDING "
                "state: %s: %s",
                request.remote_addr, guid, json.dumps(data)
            )
            continue

        # non-zero status indicates sqlite errors

        if not statuses.get(guid, 0):
            status = distributedQueryTask.COMPLETE
        else:
            # Three placeholders need three arguments: the remote address
            # was missing, which shifted the others into the wrong slots.
            current_app.logger.error(
                "%s - Got non-zero status code (%d) on distributed query %s",
                request.remote_addr, statuses.get(guid), guid
            )
            status = distributedQueryTask.Failed

        for columns in results:
            result = distributedQueryResult(
                columns,
                distributed_query=task.distributed_query,
                distributed_query_task=task
            )
            db.session.add(result)

        # The loops contain no `break`, so the former for/else bodies always
        # ran; they are written as plain statements here.
        task.status = status
        db.session.add(task)

    # need to write last_checkin, last_ip on node
    db.session.add(node)
    db.session.commit()

    return jsonify(node_invalid=False)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐