Python flask.current_app module: debug example source code
The following 40 code examples, extracted from open-source Python projects, illustrate how to use flask.current_app.debug.
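In the snippets below, current_app.debug is simply the active application's debug flag read through Flask's application context. A minimal sketch of the basic pattern (the route and log message are illustrative, not taken from any of the projects below):

from flask import Flask, current_app

app = Flask(__name__)

@app.route("/ping")
def ping():
    # current_app proxies the application handling the active request,
    # so this pattern also works inside blueprints and extensions.
    if current_app.debug:
        current_app.logger.debug("ping received in debug mode")
    return "pong"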
from contextlib import contextmanager
from distutils.util import strtobool
from logging import DEBUG, getLogger

from flask import request

@contextmanager
def logging_levels():
    """
    Context manager to conditionally set logging levels.
    Supports setting per-request debug logging using the `X-Request-Debug` header.
    """
    enabled = strtobool(request.headers.get("x-request-debug", "false"))
    level = None
    try:
        if enabled:
            level = getLogger().getEffectiveLevel()
            getLogger().setLevel(DEBUG)
        yield
    finally:
        if enabled:
            getLogger().setLevel(level)
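Because logging_levels yields once, it is meant to wrap a unit of request handling with a `with` block; a hedged usage sketch (the view and handler names are hypothetical):

@app.route("/orders")
def list_orders():
    # Send "X-Request-Debug: true" to raise the root logger to DEBUG
    # for this request only; the previous level is restored afterwards.
    with logging_levels():
        return handle_orders()  # hypothetical handler body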
def capture_request(self):
    if not current_app.debug:
        # only capture request body on debug
        return
    if not self.options.include_request_body:
        # only capture request body if requested
        return
    if (
        request.content_length and
        self.options.include_request_body is not True and
        request.content_length >= self.options.include_request_body
    ):
        # don't capture request body if it's too large
        return
    if not request.get_json(force=True, silent=True):
        # only capture request body if json
        return
    self.request_body = request.get_json(force=True)
def redirect_to_ssl(self):
    """Redirect incoming requests to HTTPS."""
    # Should we redirect?
    criteria = [
        request.is_secure,
        current_app.debug,
        request.headers.get('X-Forwarded-Proto', 'http') == 'https'
    ]
    if not any(criteria) and not self.skip:
        if request.url.startswith('http://'):
            url = request.url.replace('http://', 'https://', 1)
            code = 302
            if self.permanent:
                code = 301
            r = redirect(url, code=code)
            return r
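A method like this is typically wired into the request lifecycle with before_request; a hedged sketch, assuming an extension class named SSLRedirect that owns redirect_to_ssl above along with the permanent and skip attributes it reads:

class SSLRedirect(object):
    def __init__(self, app=None, permanent=True, skip=False):
        self.permanent = permanent
        self.skip = skip
        if app is not None:
            # Returning a response object from a before_request handler
            # short-circuits the view and sends the redirect instead.
            app.before_request(self.redirect_to_ssl)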
def dbuser_add(arg_userid, arg_password, arg_email):
    global dbconn
    if app.debug:
        app.logger.debug("dbuser_add: arg_userid=%s, arg_email=%s", arg_userid, arg_email)
    try:
        dbcursor = dbconn.cursor()
        stamp = epoch2dbfmt(time.time())
        dbcursor.execute("INSERT INTO {tn} ('{cn1}','{cn2}','{cn3}','{cn4}') VALUES ('{cv1}','{cv2}','{cv3}','{cv4}')"
                         .format(tn=ddl.TBL_USER,
                                 cn1=ddl.FLD_USER_ID, cv1=arg_userid,
                                 cn2=ddl.FLD_USER_PSWD, cv2=arg_password,
                                 cn3=ddl.FLD_USER_EMAIL, cv3=arg_email,
                                 cn4=ddl.FLD_USER_TSTAMP, cv4=stamp))
        dbconn.commit()
    except sqlite3.Error as e:
        app.logger.error("dbuser_add: INSERT {%s,%s} failed, reason: {%s}", arg_userid, arg_email, repr(e))
        return False
    # Success
    return True
#=====================
# Remove a user record
#=====================
def dbuser_remove(arg_userid):
    global dbconn
    if app.debug:
        app.logger.debug("dbuser_remove: arg_userid=%s", arg_userid)
    try:
        dbcursor = dbconn.cursor()
        dbcursor.execute("DELETE FROM {tn} WHERE {cn1}='{cv1}'"
                         .format(tn=ddl.TBL_USER, cn1=ddl.FLD_USER_ID, cv1=arg_userid))
        dbconn.commit()
    except sqlite3.Error as e:
        app.logger.error("dbuser_remove: DELETE {%s} failed, reason: {%s}", arg_userid, repr(e))
        return False
    # Success
    return True
def delete_memoized_verhash(self, f, *args):
    """
    Delete the version hash associated with the function.

    .. warning::

        Performing this operation could leave keys behind that have
        been created with this version hash. It is up to the application
        to make sure that all keys that may have been created with this
        version hash at least have timeouts so they will not sit orphaned
        in the cache backend.
    """
    if not callable(f):
        raise DeprecationWarning("Deleting messages by relative name is no longer"
                                 " reliable, please use a function reference")
    try:
        self._memoize_version(f, delete=True)
    except Exception:
        if current_app.debug:
            raise
        logger.exception("Exception possibly due to cache backend.")
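A hedged usage sketch, assuming a Flask-Caching-style cache object and a memoized function (load_user is a hypothetical data-access helper):

@cache.memoize(timeout=300)
def expensive_lookup(user_id):
    return load_user(user_id)  # hypothetical helper

# Rotating the version hash invalidates every memoized result for
# expensive_lookup at once, instead of deleting keys one by one.
cache.delete_memoized_verhash(expensive_lookup)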
def convert_to_json(data):
    '''
    Encode the data as JSON -- taken from
    flask_restplus.representations.output_json -- updated to clean the
    dictionary of nulls.
    '''
    settings = current_app.config.get('RESTPLUS_JSON', {})
    # If we're in debug mode, and the indent is not set, we set it to a
    # reasonable value here. Note that this won't override any existing value
    # that was set. We also set the "sort_keys" value.
    if current_app.debug:
        settings.setdefault('indent', 4)
        settings.setdefault('sort_keys', True)
    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(cleandict(data), **settings) + "\n"
    return dumped
def redirect_to_ssl(self):
    """
    Redirect incoming requests to HTTPS.
    """
    criteria = [
        request.is_secure,
        current_app.debug,
        current_app.testing,
        request.headers.get('X-Forwarded-Proto', 'http') == 'https'
    ]
    if request.headers.get('User-Agent', '').lower().startswith(self.exclude_user_agents):
        return
    if not any(criteria):
        if request.url.startswith('http://'):
            url = request.url.replace('http://', 'https://', 1)
            r = redirect(url, code=301)
            return r
async def worker(channel, queue, token, repo_ids=None, build_ids=None):
    allowed_repo_ids = frozenset(token['repo_ids'])
    while (await channel.wait_message()):
        msg = await channel.get_json()
        data = msg.get('data')
        if data['repository']['id'] not in allowed_repo_ids:
            continue
        if build_ids and data['id'] not in build_ids:
            continue
        if repo_ids and data['repository']['id'] not in repo_ids:
            continue
        evt = Event(
            msg.get('id'),
            msg.get('event'),
            data,
        )
        await queue.put(evt)
        current_app.logger.debug(
            'pubsub.event.received qsize=%s', queue.qsize())
# @log_errors
def catch_parade_error(func):
    def wrapper(*args, **kw):
        try:
            return func(*args, **kw)
        except ParadeError as e:
            if current_app.debug:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                stack_info = traceback.format_exception(exc_type, exc_value, exc_traceback)
                abort(e.status, code=e.code, message=e.reason, traceback=stack_info)
            else:
                abort(e.status, message=e.reason)
        except Exception as e:
            if current_app.debug:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                stack_info = traceback.format_exception(exc_type, exc_value, exc_traceback)
                abort(500, code=0, message=str(e), traceback=stack_info)
            else:
                abort(500, message=str(e))
    return wrapper
def output_json(data, code, headers=None):
    """Makes a Flask response with a JSON encoded body"""
    settings = current_app.config.get('RESTFUL_JSON', {})
    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(data, **settings) + "\n"
    resp = make_response(dumped, code)
    resp.headers.extend(headers or {})
    return resp
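In Flask-RESTful, a function with this signature is installed as the representation for a media type; a hedged sketch of wiring it up:

from flask_restful import Api

api = Api(app)
# Route all application/json responses through the encoder above so the
# RESTFUL_JSON settings are honored.
api.representations['application/json'] = output_json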
def setup_client(client):
    """ Attach handlers to the clients
    """
    # log.debug('setup_client {}'.format(client.clientId))
    client.register(handlers.connection_handler, 'ManagedAccounts', 'NextValidId')
    client.register(handlers.history_handler, 'HistoricalData')
    client.register(handlers.order_handler, 'OpenOrder', 'OrderStatus', 'OpenOrderEnd')
    client.register(handlers.portfolio_positions_handler, 'Position', 'PositionEnd')
    client.register(handlers.account_summary_handler, 'AccountSummary', 'AccountSummaryEnd')
    client.register(handlers.account_update_handler, 'UpdateAccountTime', 'UpdateAccountValue', 'UpdatePortfolio',
                    'AccountDownloadEnd')
    client.register(handlers.contract_handler, 'ContractDetails')
    client.register(handlers.executions_handler, 'ExecDetails', 'ExecDetailsEnd', 'CommissionsReport')
    client.register(handlers.error_handler, 'Error')
    # Add handlers for Feeds
    client.register(handlers.market_handler, 'TickSize', 'TickPrice')
    # For easier debugging, register all messages with the generic handler
    # client.registerall(handlers.generic_handler)
    # Be sure we're in a disconnected state
    client.disconnect()
def output_json(data, code, headers=None):
    settings = current_app.config.get('RESTFUL_JSON', {})
    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(data, **settings) + "\n"
    resp = make_response(dumped, code)
    # resp.headers.extend(headers or {'Content-Type': 'application/json'})
    # Always return as JSON
    resp.headers['Content-Type'] = 'application/json'
    return resp
def log(self, logger):
    if self.status_code == 500:
        # something actually went wrong; investigate
        dct = self.to_dict()
        if current_app.debug or current_app.testing:
            message = dct.pop("message")
            logger.warning(message, extra=dct, exc_info=True)
        else:
            logger.warning(dct)
    else:
        # usually log at INFO; a raised exception can be an error or expected behavior (e.g. 404)
        logger.info(self.to_dict())
def capture_response(self, response):
    self.success = True
    body, self.status_code, self.response_headers = parse_response(response)
    if not current_app.debug:
        # only capture response body on debug
        return
    if not self.options.include_response_body:
        # only capture response body if requested
        return
    if not body:
        # only capture response body if there is one
        return
    if (
        self.options.include_response_body is not True and
        len(body) >= self.options.include_response_body
    ):
        # don't capture response body if it's too large
        return
    try:
        self.response_body = loads(body)
    except (TypeError, ValueError):
        # not json
        pass
def verify_email_recipient(arg_recipient):
    if app.debug:
        app.logger.debug("verify_email_recipient: email recipient = %s", arg_recipient)
    # Inspect email address
    result = re.match(r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', arg_recipient)
    if result is None:
        if app.debug:
            app.logger.debug("verify_email_recipient: Not an email address: %s", arg_recipient)
        return False
    # Extract domain name from arg_recipient
    pieces = arg_recipient.split("@")
    if len(pieces) != 2:
        if app.debug:
            app.logger.debug("verify_email_recipient: Did not split into 2 pieces: %s", arg_recipient)
        return False
    domain = pieces[1]
    if app.debug:
        app.logger.debug("verify_email_recipient: email domain = %s", domain)
    # Get MX record for target domain
    try:
        records = dns.resolver.query(domain, 'MX')
        mxRecord = str(records[0].exchange)
    except Exception:
        if app.debug:
            app.logger.debug("verify_email_recipient: DNS MX-query exception with %s", domain)
        return False
    if app.debug:
        app.logger.debug("verify_email_recipient: DNS MX record = %s", mxRecord)
    return True
def _get_wrap(self, node, classes='form-group'):
    # add required class, which strictly speaking isn't bootstrap, but
    # a common enough customization
    if node.flags.required:
        classes += ' required'
    div = tags.div(_class=classes)
    if current_app.debug:
        div.add(tags.comment(' Field: {} ({}) '.format(
            node.name, node.__class__.__name__)))
    return div
def logger(node=None):
    '''
    '''
    data = request.get_json()
    log_type = data['log_type']
    log_level = current_app.config['DOORMAN_MINIMUM_OSQUERY_LOG_LEVEL']
    if current_app.debug:
        current_app.logger.debug(json.dumps(data, indent=2))
    if log_type == 'status':
        log_tee.handle_status(data, host_identifier=node.host_identifier)
        status_logs = []
        for item in data.get('data', []):
            if int(item['severity']) < log_level:
                continue
            status_logs.append(StatusLog(node_id=node.id, **item))
        else:
            db.session.add(node)
            db.session.bulk_save_objects(status_logs)
            db.session.commit()
    elif log_type == 'result':
        db.session.add(node)
        db.session.bulk_save_objects(process_result(data, node))
        db.session.commit()
        log_tee.handle_result(data, host_identifier=node.host_identifier)
        analyze_result.delay(data, node.to_dict())
    else:
        current_app.logger.error("%s - Unknown log_type %r",
                                 request.remote_addr, log_type
                                 )
        current_app.logger.info(json.dumps(data))
        # still need to write last_checkin, last_ip
        db.session.add(node)
        db.session.commit()
    return jsonify(node_invalid=False)
def handle_netapp_exception(error):
    '''Return the error message from the filer and a 500 status code'''
    return_message = {'message': error.msg, "errno": error.errno}
    if current_app.debug:
        return_message['failing_query'] = str(error.failing_query)
    return return_message, 500
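A handler like this is normally registered against the exception class it translates; a hedged sketch, where NetAppError stands in for the real exception type raised by the filer client library:

# NetAppError is a placeholder for the client library's exception class.
app.register_error_handler(NetAppError, handle_netapp_exception)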
def dbgdump(obj, default=None, cls=None):
    if current_app.config.get('ASK_PRETTY_DEBUG_LOGS', False):
        indent = 2
    else:
        indent = None
    msg = json.dumps(obj, indent=indent, default=default, cls=cls)
    logger.debug(msg)
def init_app(self, app, path='templates.yaml'):
    """Initializes the Ask app by setting configuration variables, loading templates, and mapping the Ask route to a Flask view.

    The Ask instance is given the following configuration variables by calling on Flask's configuration:

    `ASK_APPLICATION_ID`:
        Turn on application ID verification by setting this variable to an application ID or a
        list of allowed application IDs. By default, application ID verification is disabled and a
        warning is logged. This variable should be set in production to ensure
        requests are being sent by the applications you specify.
        Default: None

    `ASK_VERIFY_REQUESTS`:
        Enables or disables Alexa request verification, which ensures requests sent to your skill
        are from Amazon's Alexa service. This setting should not be disabled in production.
        It is useful for mocking JSON requests in automated tests.
        Default: True

    `ASK_VERIFY_TIMESTAMP_DEBUG`:
        Turn on request timestamp verification while debugging by setting this to True.
        Timestamp verification helps mitigate against replay attacks. It relies on the system clock
        being synchronized with an NTP server. This setting should not be enabled in production.
        Default: False

    `ASK_PRETTY_DEBUG_LOGS`:
        Add tabs and linebreaks to the Alexa request and response printed to the debug log.
        This improves readability when printing to the console, but breaks formatting when logging to CloudWatch.
        Default: False
    """
    if self._route is None:
        raise TypeError("route is a required argument when app is not None")
    app.ask = self
    app.add_url_rule(self._route, view_func=self._flask_view_func, methods=['POST'])
    app.jinja_loader = ChoiceLoader([app.jinja_loader, YamlLoader(app, path)])
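A hedged configuration sketch for the variables documented above, assuming the Ask constructor accepts the route that init_app checks for (the application ID is a placeholder):

app = Flask(__name__)
app.config['ASK_APPLICATION_ID'] = 'amzn1.ask.skill.example'  # placeholder ID
app.config['ASK_VERIFY_REQUESTS'] = True
app.config['ASK_VERIFY_TIMESTAMP_DEBUG'] = False
app.config['ASK_PRETTY_DEBUG_LOGS'] = False

ask = Ask(route='/ask')
ask.init_app(app, path='templates.yaml')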
def _alexa_request(self, verify=True):
    raw_body = flask_request.data
    alexa_request_payload = json.loads(raw_body)
    if verify:
        cert_url = flask_request.headers['Signaturecertchainurl']
        signature = flask_request.headers['Signature']
        # load certificate - this verifies the certificate url and format under the hood
        cert = verifier.load_certificate(cert_url)
        # verify signature
        verifier.verify_signature(cert, signature, raw_body)
        # verify timestamp
        raw_timestamp = alexa_request_payload.get('request', {}).get('timestamp')
        timestamp = self._parse_timestamp(raw_timestamp)
        if not current_app.debug or self.ask_verify_timestamp_debug:
            verifier.verify_timestamp(timestamp)
        # verify application id
        try:
            application_id = alexa_request_payload['session']['application']['applicationId']
        except KeyError:
            application_id = alexa_request_payload['context'][
                'System']['application']['applicationId']
        if self.ask_application_id is not None:
            verifier.verify_application_id(application_id, self.ask_application_id)
    return alexa_request_payload
async def ping(loop, resp, client_guid):
    # periodically send a ping to the browser. Any message that
    # starts with a ":" colon is ignored by the browser and can be used
    # as a ping message.
    while True:
        await asyncio.sleep(15, loop=loop)
        current_app.logger.debug('pubsub.ping guid=%s', client_guid)
        resp.write(b': ping\r\n\r\n')
# @log_errors
async def build_server(loop, host, port):
    app = Application(loop=loop, logger=current_app.logger,
                      debug=current_app.debug)
    app.router.add_route('GET', '/', stream)
    app.router.add_route('GET', '/healthz', health_check)
    return await loop.create_server(app.make_handler(), host, port)
def serve_swaggerui_assets(path):
    """
    Swagger-UI assets serving route.
    """
    if not current_app.debug:
        import warnings
        warnings.warn(
            "/swaggerui/ is recommended to be served by public-facing server (e.g. Nginx)"
        )
    from flask import send_from_directory
    return send_from_directory('../static/', path)
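A hedged sketch of registering this view so the assets resolve under /swaggerui/:

app.add_url_rule(
    '/swaggerui/<path:path>',
    'serve_swaggerui_assets',
    serve_swaggerui_assets,
)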
def get_client():
    """ Creates a client connection to be used with orders
    """
    # Get client ID from our non-order pool list in memory
    timeout = g.timeout
    while g.clientId_in_use:
        log.debug('Waiting for clientId to become available...({})'.format(timeout))
        time.sleep(0.5)
        timeout -= 1
    client = g.client_connection
    # Enable logging if we're in debug mode
    if current_app.debug is True:
        client.enableLogging()
    # Reconnect if needed
    if not client.isConnected():
        log.debug('Client {} not connected. Trying to reconnect...'.format(g.client_id))
        client.disconnect()
        time.sleep(1)
        client.connect()
        # If we failed to reconnect, be sure to put our client ID back in the pool
        if client.isConnected() is False:
            raise Exception('Client cannot connect')
    return client
def handle_all_exceptions(e):
    is_server_error = not isinstance(e, HTTPException)
    ret = {}
    error = {}
    ret['error'] = error
    if is_server_error or e.code >= 500:
        # Use context_id from the client if it's available, or make one if not.
        log_context = request.headers.get("Drift-Log-Context")
        log_context = json.loads(log_context) if log_context else {}
        context_id = log_context.get("request_id", str(uuid.uuid4()).replace("-", ""))
        error['context_id'] = context_id
        title = str(e) + " - [{}]".format(context_id)
        splunk_link = 'http://splunk.devnorth.dg-api.com:8000/en-US/app/search/search'
        splunk_link += '?q=search%20sourcetype%3D%22*%22%20%7C%20search%20{}'.format(context_id)
        error['link'] = splunk_link
    if is_server_error:
        # Do a traceback if caller has dev role, or we are running in debug mode.
        current_user = query_current_user()
        if (current_user and "dev" in current_user['roles']) or current_app.debug:
            sio = cStringIO.StringIO()
            ei = sys.exc_info()
            sio.write("%s: %s\n" % (type(e).__name__, e))
            traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
            error["traceback"] = sio.getvalue()
            sio.close()
            error['description'] = str(e)
        else:
            error['description'] = "Internal Server Error"
        # The exception is logged out and picked up by Splunk or a comparable tool.
        # The 'context_id' in the title enables quick cross-referencing with the
        # response body below.
        log.exception(title)
        ret['status_code'] = 500
        ret['message'] = "Internal Server Error"
        error['code'] = 'internal_server_error'
    else:
        ret['status_code'] = e.code
        ret['message'] = e.name
        error['code'] = 'user_error' if e.code < 500 else 'server_error'
        error['description'] = e.description
        # Support for Flask-RESTful 'data' property in exceptions.
        if hasattr(e, 'data') and e.data:
            error.update(e.data)
            # Legacy field 'message'. If it's in the 'data' payload, rename the field
            # to 'description'.
            if 'message' in e.data:
                error['description'] = error.pop('message')
        if e.code >= 500:
            # It's a "soft" server error. Let's log it out.
            log.warning(title + " " + error['description'])
    return make_response(jsonify(ret), ret['status_code'])
def sign_csr(arg_userid, arg_csr_path, arg_crt_path):
    # csr = User CSR file in internal crypto format
    (result, buffer) = get_file_contents(arg_csr_path)
    if not result:
        app.logger.error("sign_csr: cannot access CSR {%s} for user {%s}, reason: {%s}", arg_csr_path, arg_userid, buffer)
        return False
    try:
        csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, buffer)
    except Exception as e:
        app.logger.error("sign_csr: load CSR {%s} for user {%s} failed, reason: {%s}", arg_csr_path, arg_userid, repr(e))
        return False
    # CAcertificate = CA certificate in internal crypto format
    (result, buffer) = get_file_contents(CA_CRT_FILE)
    if not result:
        app.logger.error("sign_csr: cannot access CA certificate {%s} for user {%s}, reason: {%s}", CA_CRT_FILE, arg_userid, buffer)
        return False
    try:
        CAcertificate = crypto.load_certificate(crypto.FILETYPE_PEM, buffer)
        if app.debug:
            app.logger.debug("sign_csr: CA cert subject = {%s}", CAcertificate.get_subject())
    except Exception as e:
        app.logger.error("sign_csr: load CA certificate {%s} for user {%s} failed, reason: {%s}", CA_CRT_FILE, arg_userid, repr(e))
        return False
    # CAprivatekey = CA private key in internal crypto format
    (result, buffer) = get_file_contents(CA_KEY_FILE)
    if not result:
        app.logger.error("sign_csr: cannot access CA private key {%s} for user {%s}, reason: {%s}", CA_KEY_FILE, arg_userid, buffer)
        return False
    try:
        CAprivatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, buffer)
    except Exception as e:
        app.logger.error("sign_csr: load CA private key {%s} for user {%s} failed, reason: {%s}", CA_KEY_FILE, arg_userid, repr(e))
        return False
    # Sign CSR, giving the CRT
    try:
        cert = crypto.X509()
        cert.set_serial_number(42)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(EXPIRY_PERIOD)
        cert.set_issuer(CAcertificate.get_subject())
        subject = csr.get_subject()  # will log the subject later
        cert.set_subject(subject)
        cert.set_pubkey(csr.get_pubkey())
        cert.sign(CAprivatekey, DIGEST)
    except Exception as e:
        app.logger.error("sign_csr: cannot sign CSR {%s} for user {%s}, reason: {%s}", arg_csr_path, arg_userid, repr(e))
        return False
    # Store signed CRT
    try:
        file = open(arg_crt_path, "w")
        file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8'))
        file.flush()
        os.fsync(file.fileno())
        file.close()
    except Exception as e:
        app.logger.error("sign_csr: cannot store CRT {%s} for user {%s}, reason: {%s}", arg_crt_path, arg_userid, repr(e))
        return False
    # Success
    app.logger.info("sign_csr: success with CRT {%s} for user {%s}, subject = {%s}", arg_crt_path, arg_userid, subject)
    return True
def redis_cached(timeout=None, key_prefix='view/%s', unless=None):
    """
    Cache a view function's result in Redis.
    :param timeout: cache timeout in seconds; defaults to the configured value (3600s)
    :param key_prefix: prefix for the cache key
    :param unless: callable; caching is skipped when it returns True
    :return: the decorated function
    """
    def decorator(f):
        @functools.wraps(f)  # preserve the wrapped function's metadata
        def decorated_function(*args, **kwargs):
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)
            if kwargs.get('nocache'):
                return f(*args, **kwargs)  # bypass the cache when a nocache argument is passed
            try:
                cache_key = decorated_function.make_cache_key(*args, **kwargs)
                cache_key = urllib.quote(cache_key, safe='')
                rv = redis_get(cache_key)
            except Exception:
                if current_app.debug:
                    raise
                return f(*args, **kwargs)
            if rv is None:
                rv = f(*args, **kwargs)
                try:
                    redis_set(cache_key, rv, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
                    return f(*args, **kwargs)
            return rv

        def make_cache_key(*args, **kwargs):
            if callable(key_prefix):
                cache_key = key_prefix()
            elif '%s' in key_prefix:
                cache_key = key_prefix % (request.url + '_uid_' + str(current_user.get_id()))
            else:
                cache_key = key_prefix
            cache_key = hashlib.md5(cache_key.encode('utf-8')).hexdigest()
            cache_key = '_'.join((get_version(level='day'), cache_key))
            return cache_key

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key
        return decorated_function
    return decorator
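A hedged usage sketch of the decorator above on a Flask view (the view body is hypothetical):

@app.route('/articles')
@redis_cached(timeout=600, key_prefix='view/%s')
def articles():
    # Cached per request URL and user id; results expire after 600 seconds.
    return render_articles()  # hypothetical helper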
def redis_memoize(timeout=100, make_name=None, unless=None):
    """
    Memoize a function's return value in Redis.
    :param timeout: cache timeout in seconds; defaults to 100s
    :param make_name: optional callable or string used to derive the cache name for the function
    :param unless: callable; caching is skipped when it returns True
    :return: the decorated function
    """
    def decorator(f):
        @functools.wraps(f)  # preserve the wrapped function's metadata
        def decorated_function(*args, **kwargs):
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)
            try:
                cache_key = decorated_function.make_cache_key(make_name, args, kwargs)
                rv = redis_get(cache_key)
            except Exception:
                if current_app.debug:
                    raise
                return f(*args, **kwargs)
            if rv is None:
                rv = f(*args, **kwargs)
                try:
                    redis_set(cache_key, rv, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
            return rv

        def make_cache_key(make_name, keyargs, keykwargs):
            fname = f.__name__
            if callable(make_name):
                fname = make_name(fname)
            if isinstance(make_name, str):
                fname = make_name
            alt_fname = '.'.join((f.__module__, fname))
            try:
                origin_str = "{0}{1}{2}".format(alt_fname, keyargs, keykwargs)
            except AttributeError:
                origin_str = "%s%s%s" % (alt_fname, keyargs, keykwargs)
            cache_key = hashlib.md5(origin_str.encode('utf-8')).hexdigest()
            cache_key = '_'.join((get_version(level='day'), cache_key))
            return cache_key

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key
        return decorated_function
    return decorator
def distributed_write(node=None):
    '''
    '''
    data = request.get_json()
    if current_app.debug:
        current_app.logger.debug(json.dumps(data, indent=2))
    queries = data.get('queries', {})
    statuses = data.get('statuses', {})
    for guid, results in queries.items():
        task = DistributedQueryTask.query.filter(
            DistributedQueryTask.guid == guid,
            DistributedQueryTask.status == DistributedQueryTask.PENDING,
            DistributedQueryTask.node == node,
        ).first()
        if not task:
            current_app.logger.error(
                "%s - Got result for distributed query not in PENDING "
                "state: %s: %s",
                request.remote_addr, guid, json.dumps(data)
            )
            continue
        # non-zero status indicates sqlite errors
        if not statuses.get(guid, 0):
            status = DistributedQueryTask.COMPLETE
        else:
            current_app.logger.error(
                "%s - Got non-zero status code (%d) on distributed query %s",
                request.remote_addr, statuses.get(guid), guid
            )
            status = DistributedQueryTask.FAILED
        for columns in results:
            result = DistributedQueryResult(
                columns,
                distributed_query=task.distributed_query,
                distributed_query_task=task
            )
            db.session.add(result)
        else:
            task.status = status
            db.session.add(task)
    else:
        # need to write last_checkin, last_ip on node
        db.session.add(node)
        db.session.commit()
    return jsonify(node_invalid=False)