Python json 模块,JSONDecodeError() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 json.JSONDecodeError。
def tba_get(self, path):
    """Query the TBA API and return the decoded JSON response.

    :param path: (str) Request path, without the API address prefix
        (https://www.thebluealliance.com/api/v2/)
    :return: The object parsed from the response body. If the body is not
        valid JSON, the URL and the raw body are printed and None is returned.
    :raises Exception: if the stored 'X-TBA-app-id' header value is empty.
    """
    if self.app_id['X-TBA-app-id'] == "":
        raise Exception('An API key is required for TBA. Please use set_api_key() to set one.')

    endpoint = 'https://www.thebluealliance.com/api/v2/' + path
    body = self.session.get(endpoint, headers=self.app_id).text
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        # Dump what was requested and what came back to aid debugging.
        print(endpoint)
        print(body)
        return None
    return parsed
def donations_helper():
    """Collect independent expenditures of 2016-cycle committees.

    Builds a "First Last" -> legislator index from the house and senate member
    lists, then walks every committee whose cycles include 2016 and keeps each
    expenditure whose candidate name is found in that index. Every kept
    expenditure is annotated with 'committee_id' and
    'propublica_candidate_id'.

    :return: list of annotated expenditure dicts.
    """
    print("donations_helper")
    FecApiObj = FECAPI(FEC_APIKEY)
    committees = FecApiObj.get_committees()
    PPCampFinObj = CampaignFinanceAPI(ProPublica_APIKEY)
    PPCongressApi = CongressAPI(ProPublica_APIKEY)

    # One index for both chambers; senate entries overwrite house entries on
    # a name clash, exactly as two sequential loops would.
    legislator_index = {}
    for chamber in ('house', 'senate'):
        members = PPCongressApi.list_members(chamber)["results"][0]["members"]
        for legislator in members:
            name = str(legislator['first_name']) + " " + str(legislator['last_name'])
            legislator_index[name] = legislator

    print("starting to iterate through superpacs")
    donations = []
    for committee in committees:
        if 2016 not in committee['cycles']:
            continue
        committee_id = str(committee['committee_id'])
        try:
            indepExpend = PPCampFinObj.get_indep_expends(committee_id)
            for expend in indepExpend["results"]:
                try:
                    expend['committee_id'] = committee_id
                    expend['propublica_candidate_id'] = str(legislator_index[expend['candidate_name']]['id'])
                    donations.append(expend)
                except KeyError:
                    # Candidate name not in the index (or a key is missing):
                    # skip this expenditure.
                    pass
        except JSONDecodeError:
            # Best effort: a committee whose response cannot be decoded is skipped.
            pass
    return donations
def load(self):
    """Load every JSON settings file and store its keys upper-cased.

    Calls the parent ``load()`` first, then merges the JSON content of each
    name in ``self.files`` (resolved against ``self.directory``) in order, so
    later files override earlier ones.

    :raises ImproperlyConfigured: if any file contains invalid JSON.
    """
    super().load()

    merged = {}
    try:
        for name in self.files:
            with open(os.path.join(self.directory, name), 'r') as fh:
                merged.update(json.load(fh))
    except json.JSONDecodeError as e:
        raise ImproperlyConfigured(
            'Your settings file(s) contain invalid JSON Syntax! Please fix and restart!,{}'.format(str(e))
        )

    # Settings are exposed under upper-case keys.
    for key in merged:
        self.settings[key.upper()] = merged[key]
def from_json(cls, data):
    """Build a ``cls`` instance from a JSON string or a mapping.

    :param data: an existing ``cls`` instance (returned unchanged), a JSON
        string, a mapping, or a falsy value (treated as an empty mapping).
    :return: a ``cls`` instance constructed from the (renamed) keys.
    :raises json.JSONDecodeError: if ``data`` is a string that is not valid JSON.
    :raises TypeError: if ``cls`` rejects the resulting keyword arguments.
    """
    if isinstance(data, cls):
        return data
    if isinstance(data, str):
        data = json.loads(data)
    # Translate JSON key names to constructor argument names where cls._toPy
    # defines a mapping; unknown keys pass through unchanged.
    kwargs = {cls._toPy.get(k, k): v for k, v in (data or {}).items()}
    return cls(**kwargs)
def execute(self):
    """Run ``self.system`` once and, if an interval is set, schedule the next run.

    ReadTimeout, ConnectionError and JSONDecodeError are ignored.
    TradingSystemException and any other Exception are printed with a
    timestamp (the latter with its traceback). When ``self.interval`` is
    truthy, a ``threading.Timer`` re-invokes this method after that delay.
    """
    try:
        self.system.run()
    except (ReadTimeout, ConnectionError, JSONDecodeError):
        pass
    except exceptions.TradingSystemException as e:
        curr = datetime.now()
        # %S is seconds; lowercase %s is a non-portable epoch-seconds extension.
        print('{time} - {text}'.format(time=curr.strftime('%Y-%m-%d %H:%M:%S'), text=str(e)))
    except Exception as e:
        curr = datetime.now()
        print('{time} - {text} - {args}'.format(time=curr.strftime('%Y-%m-%d %H:%M:%S'), text=str(e), args=e.args))
        traceback.print_exc()
    if self.interval:
        threading.Timer(self.interval, self.execute).start()
def assertJSON(self, response):
    """Assert that a non-empty response body is canonically formatted JSON.

    An empty body is accepted without any check. Otherwise the body must
    parse as JSON and must equal its own re-serialization with 2-space
    indent, sorted keys and a trailing newline.
    """
    if not response.text:
        return
    try:
        data = json.loads(response.text)
    except json.JSONDecodeError:
        self.fail("Response data is not JSON.")
    else:
        pretty = json.dumps(data, indent=2, sort_keys=True, separators=(',', ': '))
        reference = "{formatted_data}\n".format(formatted_data=pretty)
        self.assertEqual(reference, response.text)
def get_phenolist():
    """Load the phenotype list and URL-quote every phenocode.

    :return: the list read from the 'phenolist' file path, with each entry's
        'phenocode' passed through ``urllib.parse.quote_plus``.
    :raises PheWebError: if the file is missing or unreadable.
    :raises json.JSONDecodeError: if the file is not valid JSON (a hint is
        printed before the error is re-raised).
    """
    # Todo: should this be memoized?
    from .file_utils import common_filepaths
    filepath = common_filepaths['phenolist']
    try:
        with open(os.path.join(filepath)) as f:
            phenolist = json.load(f)
    except (FileNotFoundError, PermissionError):
        missing_msg = (
            "You need a file to define your phenotypes at '{}'.\n".format(filepath) +
            "For more information on how to make one,see <https://github.com/statgen/pheweb#3-make-a-list-of-your-phenotypes>"
        )
        raise PheWebError(missing_msg)
    except json.JSONDecodeError:
        invalid_msg = ("Your file at '{}' contains invalid json.\n".format(filepath) +
                       "The error it produced was:")
        print(invalid_msg)
        raise
    for pheno in phenolist:
        quoted = urllib.parse.quote_plus(pheno['phenocode'])
        pheno['phenocode'] = quoted
    return phenolist
def with_json(self, json_dict: dict) -> "Statistic":
    """Check that the current request body is JSON equal to ``json_dict``.

    Both sides are compared as ``json.dumps(..., sort_keys=True)`` strings.
    A mismatch, or a body that is not valid JSON, appends a message to
    ``self._error_messages``; nothing is raised.

    :param json_dict: the expected JSON payload.
    :return: ``self``, so that checks can be chained.
    """
    body = json.dumps(json_dict, sort_keys=True)
    # "ignore" is the registered handler that drops undecodable bytes;
    # "skip" is not a codec error handler and raises LookupError on bad input.
    actual_body = self.get_current_request().body.decode("utf-8", errors="ignore")
    requested_time = self._current_request_index + 1
    try:
        actual_json_dict = json.loads(actual_body)
    except json.JSONDecodeError:
        self._error_messages.append(f"\nFor the {requested_time} time: with json {body}.\n"
                                    f"But for the {requested_time} time: json was corrupted "
                                    f"{actual_body!r}.")
        return self
    actual_body = json.dumps(actual_json_dict, sort_keys=True)
    if body != actual_body:
        self._error_messages.append(f"\nFor the {requested_time} time: with json {body}.\n"
                                    f"But for the {requested_time} time: json was {actual_body}.")
    return self
def _load_stored_result_from_file(self, default=None):
    """Return the parsed results file, or ``default`` if it cannot be used.

    An unreadable file yields ``default``. A file that is readable but not
    valid JSON is renamed to ``corrupted.<8 hex chars>.<results file name>``
    inside the results directory, an error is logged, and ``default`` is
    returned.
    """
    try:
        with open(self._results_file_path) as fp:
            raw = fp.read()
    except IOError:
        return default

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        # Keep the broken file around under a unique name for inspection.
        backup_file_name = "corrupted.{}.{}".format(str(uuid.uuid4())[:8], self._results_file_name)
        message = "Unable to parse file {}: {},renaming to {} and creating new file".format(
            self._results_file_path, e, backup_file_name,
        )
        logger.error(message)
        os.rename(self._results_file_path, "{}/{}".format(self._results_directory, backup_file_name))
        return default
    return parsed
def setup_logging(config_file_path, log_level=logging.INFO):
    """
    Logging configuration helper.

    Loads a JSON dictConfig from ``config_file_path``. If the file cannot be
    read, parsed or applied (ValueError / IOError / OSError), falls back to
    ``logging.basicConfig`` at ``log_level`` and logs the failure on the root
    logger.

    :param config_file_path: file path to logging configuration file.
        https://docs.python.org/3/library/logging.config.html#object-connections
    :param log_level: defaults to logging.INFO
    :return: None - access the logger by name as described in the config--or the "root" logger as a backup.
    """
    try:
        with open(config_file_path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    except (ValueError, IOError, OSError):
        # json.JSONDecodeError is throwable in python3.5+ -- subclass of ValueError
        # basicConfig's keyword is `level`; `log_level` is rejected with
        # "Unrecognised argument(s)" and would mask the original failure.
        logging.basicConfig(level=log_level)
        logging.root.exception(
            "Could not load specified logging configuration '{}'. Verify the filepath exists and is compliant with: "
            "[https://docs.python.org/3/library/logging.config.html#object-connections]".format(config_file_path))
def __init__(self, callResult):
    """Summarize an HTTP call result as ``success`` / ``text`` / ``response``.

    :param callResult: response object exposing ``json()``, ``status_code``
        and ``reason``.

    ``self.success`` is True only when the decoded JSON carries a truthy
    'status' whose 'code' equals 200. ``self.text`` is "<code>: <info>" when a
    status is present, otherwise "Error <http status>: <reason>".
    ``self.response`` is set only when a status is present.
    """
    self.success = False
    try:
        # decode from json if possible
        result = callResult.json()
        # A payload without a 'status' key (or a non-dict payload) is handled
        # like a missing status instead of raising KeyError out of __init__.
        status = result.get('status') if isinstance(result, dict) else None
        if status:
            # the response came from the API server, so keep a copy of the json
            self.response = result
            code = status['code']
            # assemble a description of the result
            self.text = str(code) + ': ' + status['info']
            # a 200 code means the json object holds the answer data
            if code == 200:
                self.success = True
        else:
            # otherwise, assemble a description from the HTTP results
            self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason
    except JSONDecodeError:
        self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason
# chat API call
def __init__(self,assemble a description from the HTTP results
self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason
except JSONDecodeError:
self.text = 'Error ' + str(callResult.status_code) + ': ' + callResult.reason
# chat API call
def get_manifest(self, archive):
    """Extract, parse and validate ``manifest.json`` from an uploaded zip.

    :param archive: upload object exposing ``temporary_file_path()``.
    :return: the parsed manifest after ``validate_manifest`` accepted it.
    :raises ValidationError: for a bad zip, a missing temporary file, a
        missing or malformed manifest, or a manifest failing validation.
    """
    try:
        with ZipFile(archive.temporary_file_path()) as plugin:
            print(plugin.namelist())
            prefix = self.get_prefix(plugin)
            # The manifest sits under the prefix directory when there is one.
            if len(prefix):
                prefix = prefix + '/'
            else:
                prefix = ''
            manifest_name = '{}manifest.json'.format(prefix)
            with plugin.open(manifest_name) as myfile:
                manifest = json.loads(myfile.read())
            validate_manifest(manifest)
            return manifest
    except BadZipFile:
        raise ValidationError('Bad .zip format')
    except FileNotFoundError:
        raise ValidationError('Error with upload,please try again')
    except KeyError:
        raise ValidationError('No manifest.json found in archive')
    except json.JSONDecodeError:
        raise ValidationError('Error with manifest.json,bad Json Format')
    except avasdk.exceptions.ValidationError as e:
        raise ValidationError('Error in manifest.json ({})'.format(e))
def req_json(self, req):
    """Return the decoded JSON body of ``req``, or None when there is no body.

    :raises errors.InvalidFormat: if the content type is not
        'application/json' (case-insensitive) or the body is not valid JSON.
    """
    if req.content_length is None or req.content_length == 0:
        return None

    is_json = req.content_type is not None and req.content_type.lower() == 'application/json'
    if not is_json:
        raise errors.InvalidFormat("Requires application/json payload")

    raw_body = req.stream.read(req.content_length or 0)
    if raw_body is None:
        return None
    try:
        return json.loads(raw_body.decode('utf-8'))
    except json.JSONDecodeError as jex:
        # Report the offending payload both on stdout and via self.error.
        print(
            "Invalid JSON in request: \n%s" % raw_body.decode('utf-8'))
        self.error(
            req.context,
            "Invalid JSON in request: \n%s" % raw_body.decode('utf-8'))
        raise errors.InvalidFormat("%s: Invalid JSON in body: %s" %
                                   (req.path, jex))
def _parse_and_update_body(self, handler_def):
    """Replace a non-empty request body with its parsed JSON form.

    When ``handler_def.consumes`` is set, the parsed JSON is additionally
    converted through ``handler_def.consumes.from_json``. An empty body is
    left untouched.

    :raises BadRequestError: on malformed JSON or a failed conversion.
    """
    if not self.request.body:
        return

    try:
        json_body = json.loads(self.request.body.decode('utf-8'))
    except json.JSONDecodeError:
        raise BadRequestError(
            "Malformed request body. JSON is expected."
        )

    consumes = handler_def.consumes
    if consumes:
        try:
            new_body = consumes.from_json(json_body)
        except ValidationError:
            # Todo: log warning or error
            raise BadRequestError("Bad data structure.")
    else:
        new_body = json_body
    self.request.body = new_body
def parseMessage(self, msg):
    """Decode a JSON message and route it to the handler for its 'msgtype'.

    The raw message is printed first. Messages that are not valid JSON, and
    messages with an unrecognized 'msgtype', are ignored.
    """
    print(msg)
    try:
        decoded = json.loads(msg)
    except json.JSONDecodeError:
        return

    msgtype = decoded["msgtype"]
    # (msgtype value, handler method name) pairs, checked in order.
    routes = (
        ("control", "handleControl"),
        ("sheetdelta", "passSheetDelta"),
        ("request", "handleRequest"),
        ("nodedata", "passNodedata"),
    )
    for expected, handler_name in routes:
        if msgtype == expected:
            getattr(self, handler_name)(decoded)
            break
def discoverWorkers(self):
    """Discover new workers via UDP broadcasts.

    Polls the discovery socket without blocking. A received datagram is
    decoded as JSON; when it carries both "ip" and "port" and its name
    ("host" if given, else "ip:port") is not yet known, a tree item and a
    ``Worker`` are created for it and the worker is ticked and synchronized.
    """
    # select() takes (rlist, wlist, xlist, timeout); a timeout of 0 makes
    # this a non-blocking poll.
    rlist, wlist, elist = select([self.discoverysocket], [], [], 0)
    if not rlist:
        return

    received = self.discoverysocket.recvfrom(4096)[0]
    discoverydata = {}
    try:
        discoverydata = json.loads(received.decode())
    except json.JSONDecodeError:
        # Not JSON: fall through with the empty dict so nothing is registered.
        pass

    if "ip" in discoverydata and "port" in discoverydata:
        if "host" in discoverydata:
            name = discoverydata["host"]
        else:
            name = discoverydata["ip"] + ":" + str(discoverydata["port"])
        if name not in self.workers:
            # NOTE(review): the original comment said "Type 1000 for Worker
            # Item" while the code passes 1001 - confirm which is intended.
            treeItem = QTreeWidgetItem(1001)
            treeItem.setText(0, name)
            self.treeWidget.addTopLevelItem(treeItem)
            self.grabPeriodicInfos()  # Grab monitor data
            self.workers[name] = Worker(discoverydata, treeItem, nodeDataJar=self.nodeDataJar)
            self.workers[name].tick(self.sheetDeltaMemory)
            self.workers[name].synchronize()
async def _ws_recv_handler(self):
    """Receive websocket messages forever and dispatch each one.

    Command responses and notifications arrive on the same websocket, so
    they are separated here: a message containing 'status' is a command
    response and goes to ``self._resp_queue``; anything else is an event and
    is sent on the signal named by its 'event' field. Invalid messages are
    logged and skipped.
    """
    # The body awaits the websocket, so this must be a coroutine function.
    while True:
        raw = await self._websocket.recv()
        try:
            if isinstance(raw, bytes):
                raw = raw.decode()
            recv = ejson_loads(raw)
            if 'status' in recv:
                # Message response
                self._resp_queue.put_nowait(recv)
            else:
                # Event
                self._signal_ns.signal(recv['event']).send(recv['sender'])
        except (KeyError, TypeError, json.JSONDecodeError):
            logger.warning('Backend server sent invalid message: %s' % raw)
def handshake(self):
    """Run the challenge/response handshake as a generator of effects.

    Sends a freshly generated challenge, receives the answer, loads it
    through ``HandshakeAnswerSchema`` and verifies the answer with the public
    key of the claimed identity. On success ``self.id`` is set to that
    identity.

    :raises HandshakeError: if the handshake was already done, the answer is
        not decodable, or the signature/identity check fails (in the last two
        cases the error is also sent to the peer first).
    """
    if self.id:
        raise HandshakeError('Handshake already done.')

    challenge = _generate_challenge()
    challenge_msg = ejson_dumps({'handshake': 'challenge', 'challenge': challenge})
    yield Effect(EHandshakeSend(challenge_msg))
    raw_resp = yield Effect(EHandshakeRecv())

    try:
        resp = ejson_loads(raw_resp)
    except (TypeError, json.JSONDecodeError):
        error = HandshakeError('Invalid challenge response format')
        yield Effect(EHandshakeSend(error.to_raw()))
        raise error

    resp = HandshakeAnswerSchema().load(resp)
    claimed_identity = resp['identity']
    try:
        pubkey = yield Effect(EPubKeyGet(claimed_identity))
        pubkey.verify(resp['answer'], challenge.encode())
        yield Effect(EHandshakeSend('{"status": "ok","handshake": "done"}'))
        self.id = claimed_identity
    except (TypeError, PubKeyNotFound, InvalidSignature):
        error = HandshakeError('Invalid signature,challenge or identity')
        yield Effect(EHandshakeSend(error.to_raw()))
        raise error
def load_string(json_string):
    """Deserialize `json_string`, falling back to the input itself.

    If `json_string` is not a valid JSON document (or not something
    ``json.loads`` accepts at all), it is returned unchanged. It is used in
    the context of activity results: floto handles str and JSON serialized
    results.

    Parameters
    ----------
    json_string : any

    Returns
    -------
    obj
    """
    try:
        return json.loads(json_string)
    except (TypeError, json.JSONDecodeError):
        return json_string
def _json_get(inp):
    """Return ``(data, dataformat)`` for a Python object, JSON string or path.

    * dict / list input: returned as-is with dataformat "native".
    * valid JSON string: parsed, with dataformat "str".
    * anything else that fails JSON decoding: treated as a UTF-8 file path,
      the file is parsed and the path itself is returned as dataformat.
    """
    if isinstance(inp, (dict, list)):
        return inp, "native"
    try:
        return json.loads(inp), "str"
    except json.JSONDecodeError:
        # Not a JSON document, so assume it names a JSON file. The path
        # doubles as the format marker to save a separate variable.
        with open(inp, encoding="utf-8") as f:
            return json.load(f), inp
def post(self):
    """Forward the request JSON as a query and return the result as a dict.

    The returned dict carries 'response_code' and, when the query succeeded,
    'response' (parsed JSON if possible, otherwise the raw value). On a gRPC
    deadline timeout only 'response_code' is set, to the timeout code.
    """
    payload = request.get_json()
    request_body = json.dumps(payload)
    channel = get_channel_name_from_json(request.get_json())
    query_data = json.loads('{}')

    try:
        # TODO: consider making this call asynchronous.
        response = ServerComponents().query(request_body, channel)
        logging.debug(f"query result : {response}")
        query_data['response_code'] = str(response.response_code)
        try:
            query_data['response'] = json.loads(response.response)
        except json.JSONDecodeError as e:
            logging.warning("your response is not json,your response(" + str(response.response) + ")")
            query_data['response'] = response.response
    except _Rendezvous as e:
        logging.error(f'Execute Query Error : {e}')
        timed_out = e.code() == grpc.StatusCode.DEADLINE_EXCEEDED
        if timed_out:
            # TODO: extract the REST timeout handling into a shared helper.
            logging.debug("gRPC timeout !!!")
            query_data['response_code'] = str(message_code.Response.timeout_exceed)
    return query_data
def get(self):
    """Look up a transaction by hash and return it as a JSON-ready dict.

    The result holds 'response_code', 'data' (parsed JSON when possible, raw
    value otherwise, "" when empty), 'Meta' (parsed JSON, "" when empty),
    'more_info', and base64-encoded 'signature' and 'public_key'.
    """
    args = ServerComponents().parser.parse_args()
    response = ServerComponents().get_transaction(args['hash'], get_channel_name_from_args(args))
    tx_data = json.loads('{}')
    tx_data['response_code'] = str(response.response_code)
    tx_data['data'] = ""
    # Compare by value: `is not 0` tests object identity and only works by
    # accident of small-int caching (SyntaxWarning on Python 3.8+).
    if len(response.data) != 0:
        try:
            tx_data['data'] = json.loads(response.data)
        except json.JSONDecodeError as e:
            logging.warning("your data is not json,your data(" + str(response.data) + ")")
            tx_data['data'] = response.data
    tx_data['Meta'] = ""
    # NOTE(review): the `Meta` attribute/key spelling is kept as found -
    # confirm it matches the response type.
    if len(response.Meta) != 0:
        tx_data['Meta'] = json.loads(response.Meta)
    tx_data['more_info'] = response.more_info
    b64_sign = base64.b64encode(response.signature)
    tx_data['signature'] = b64_sign.decode()
    b64_public_key = base64.b64encode(response.public_key)
    tx_data['public_key'] = b64_public_key.decode()
    return tx_data
def get(self):
    """Look up the invoke result of a transaction hash.

    Returns a dict with 'response_code' and, when the stored result is
    non-empty valid JSON, 'response' (the parsed result with 'jsonrpc' set to
    '2.0'). An empty or undecodable result sets the fail response code.
    """
    logging.debug('transaction result')
    args = ServerComponents().parser.parse_args()
    logging.debug('tx_hash : ' + args['hash'])
    channel_name = get_channel_name_from_args(args)
    response = ServerComponents().get_invoke_result(args['hash'], channel_name)
    verify_result = dict()
    verify_result['response_code'] = str(response.response_code)
    # Compare by value: `is not 0` tests object identity, not equality.
    if len(response.result) != 0:
        try:
            result = json.loads(response.result)
            result['jsonrpc'] = '2.0'
            verify_result['response'] = result
        except json.JSONDecodeError as e:
            # NOTE(review): this logs response.data although response.result
            # was parsed, and the code below is not wrapped in str() unlike
            # the other branches - confirm both are intended.
            logging.warning("your data is not json,your data(" + str(response.data) + ")")
            verify_result['response_code'] = message_code.Response.fail
    else:
        verify_result['response_code'] = str(message_code.Response.fail)
    return verify_result
def jwks_to_keyjar(jwks, iss=''):
    """
    Convert a JWKS to a KeyJar instance.

    :param jwks: String representation of a JWKS, or an already parsed dict
    :param iss: issuer the keys are imported under
    :return: A :py:class:`oic.utils.keyio.KeyJar` instance
    :raises ValueError: if a non-dict ``jwks`` is not valid JSON
    """
    parsed = jwks
    if not isinstance(parsed, dict):
        try:
            parsed = json.loads(parsed)
        except json.JSONDecodeError:
            raise ValueError('No proper JSON')

    keyjar = KeyJar()
    keyjar.import_jwks(parsed, issuer=iss)
    return keyjar
def get_key_from_headers(self, request, key_names, key_in_body=False):
    """Return the first value found for any of ``key_names``, else None.

    With ``key_in_body`` the names are looked up in the JSON-decoded request
    body (None if the body is not valid JSON). Otherwise each name is
    upper-cased, dashes become underscores, 'HTTP_' is prepended, and the
    result is looked up in ``request.Meta``.
    """
    if key_in_body:
        try:
            body = json.loads(request.body.decode('utf-8'))
        except json.JSONDecodeError:
            return None
        return next((body[k] for k in key_names if k in body), None)

    # NOTE(review): `request.Meta` is kept as found; a Django request would
    # expose this mapping as `META` - confirm against the framework in use.
    for n in key_names:
        key_name = 'HTTP_{0}'.format(n.upper().replace('-', '_'))
        if key_name in request.Meta:
            return request.Meta[key_name]
    return None
def check_files():
    """Create the audio settings file, or back-fill keys missing from it.

    A missing file is created from the defaults. An existing file that fails
    to decode is overwritten with the defaults and reloaded. Any default key
    absent from the loaded settings is added and the file is saved again.
    """
    default = {"VOLUME": 50, "MAX_LENGTH": 3700, "Vote_ENABLED": True,
               "MAX_CACHE": 0, "SOUNDCLOUD_CLIENT_ID": None,
               "TITLE_STATUS": True, "AVCONV": False, "Vote_THRESHOLD": 50,
               "SERVERS": {}}
    settings_path = "data/audio/settings.json"

    if not os.path.isfile(settings_path):
        print("Creating default audio settings.json...")
        dataIO.save_json(settings_path, default)
        return

    # consistency check
    try:
        current = dataIO.load_json(settings_path)
    except JSONDecodeError:
        # settings.json keeps getting corrupted for unknown reasons. Let's
        # try to keep it from making the cog load fail.
        dataIO.save_json(settings_path, default)
        current = dataIO.load_json(settings_path)

    if current.keys() == default.keys():
        return
    for key in default.keys():
        if key in current.keys():
            continue
        current[key] = default[key]
        print(
            "Adding " + str(key) + " field to audio settings.json")
    dataIO.save_json(settings_path, current)
def listen(self, ws, environ):
    """Serve one websocket until it closes, errors, or sends invalid JSON.

    Each decoded message is answered in its own greenlet via
    ``self.respond``. The socket is registered on entry and removed on exit.
    """
    self._add(ws)
    while not ws.closed:
        try:
            message = ws.receive()
        except WebSocketError:
            break
        if message is None:
            continue
        try:
            message = json.loads(message)
        except json.JSONDecodeError:
            break
        # odoo heavily relies on httprequests, for each message
        # a new httprequest will be created. This request will be
        # based on the original environ from the socket initialization
        # request.
        httprequest = werkzeug.wrappers.Request(environ.copy())
        root = odoo.http.root
        root.setup_session(httprequest)
        root.setup_db(httprequest)
        root.setup_lang(httprequest)
        gevent.spawn(self.respond, httprequest, message)
    self._remove(ws)
def __get_conf_from_file(self, file_name):
    """Populate the configuration attributes from a JSON settings file.

    Every setting falls back to a built-in default when its key is absent.
    Any failure (missing file, invalid JSON, anything else) prints a short
    message and terminates the process with exit status 1.

    :param file_name: path of the JSON configuration file.
    """
    try:
        # The context manager closes the handle; it was previously leaked.
        with open(file_name, 'r') as json_file:
            setting_dict = json.load(json_file)
        self.TRUST_P = setting_dict.get('trust_p', 0.45)
        self.COUNTRY = setting_dict.get('country', 'JP')
        self.COVER_MIN_SIZE = setting_dict.get('cover_min_size', 1000)
        self.SAVE_COVER_TYPE = setting_dict.get('save_cover_type', 1)
        self.DOWNLOAD_DIR = setting_dict.get('download_dir', './cover')
        self.SEARCH_POSTFIX_NAME = setting_dict.get('search_postfix_name', 'cue,m4a,flac')
        self.FILE_NAME_FORMAT = setting_dict.get('file_name_format', '')
        self.REPLACE_COVER = setting_dict.get('replace_cover', 1)
        self.SEARCH_THREAD_NUM = setting_dict.get('search_thread_num', 6)
        self.DOWNLOAD_THREAD_NUM = setting_dict.get('download_thread_num', 5)
    except FileNotFoundError:
        print("Conf file 'setting.json' not found")
        exit(1)
    except json.JSONDecodeError:
        print("Conf file err")
        exit(1)
    except Exception:
        print("Some err occur")
        exit(1)
def load_from_persistent_storage(self):
    """Decode every project file that exists and parses, keyed by category.

    Returns an empty dict when there is no project directory or it does not
    exist. Files with invalid JSON are logged and skipped; missing files are
    skipped silently.
    """
    # precedence: files > (modules, scripts)
    # Note that `files` will only be changed manually
    # It is not updated by changing `modules` or `scripts`.
    # However, `files` will affect `modules` and `scripts`.
    decoded = {}
    if not (self.project_directory and self.project_directory.exists()):
        return decoded

    for key, project_file in self.project_files.items():
        try:
            decoded[key] = serialize.decode(category_type[key], json.loads(project_file.read_text()))
        except json.JSONDecodeError as err:
            logger.error(err)
        except FileNotFoundError as err:
            # A project file that was never written is not an error.
            pass
    return decoded
def fetch_valid_json(cmd):
    """Run ``cmd`` and return its stdout parsed as JSON.

    Asserts that the command exits with 0 and writes nothing to stderr.

    :param cmd: program and arguments
    :type cmd: [str]
    :returns: parsed JSON AST
    :raises Exception: if stdout is not valid JSON
    """
    returncode, stdout, stderr = exec_command(cmd)
    assert returncode == 0
    assert stderr == b''

    text = stdout.decode('utf-8')
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        command_line = ' '.join(cmd)
        raise Exception('Command "{}" returned invalid JSON'.format(command_line))
    return parsed
def do_request(ns):
    """Send the autocert API request described by ``ns`` and print the reply.

    :param ns: parsed arguments providing ``command``, ``api_url`` and
        optionally ``destinations``.
    :return: the HTTP status code, or -1 when the reply body is not JSON
        (status, error and raw text are printed in that case).
    """
    method = METHODS[ns.command]
    if hasattr(ns, 'destinations'):
        destinations = dictify(ns.destinations)
    else:
        destinations = None
    # `payload` instead of `json` so the module name is not shadowed.
    payload = jsonify(ns, destinations=destinations if destinations else None)
    validate(ns.api_url, throw=True)

    response = requests.request(method, ns.api_url / 'autocert', json=payload)
    status = response.status_code
    try:
        reply = response.json()
        output(reply)
    except JSONDecodeError as jde:
        print('status =', status)
        print('JSONDecodeError =', jde)
        print('text =', response.text)
        return -1
    return status
def read(filepath):
    """Load the JSON config from ``filepath`` or from ``./.keystorerc``.

    ``filepath`` (with ``~`` expanded) is used when it names a file;
    otherwise ``./.keystorerc`` is tried.

    :return: the parsed config, or None when no config file could be opened
        or its content is not valid JSON (the reason is printed to stderr).
    """
    try:
        expanded = os.path.expanduser(filepath)
        if os.path.isfile(expanded):
            path = expanded
        elif os.path.isfile('./.keystorerc'):
            path = './.keystorerc'
        else:
            raise OSError('''The config file .keystorerc is not found in home or local working directory.
Please refer to https://pypi.python.org/pypi/keystore for setting up a .keystorerc file.''')

        with open(path) as f:
            return json.loads(f.read())
    except OSError as err:
        print('Unable to open config: {}'.format(err), file=sys.stderr)
    except json.JSONDecodeError as err:
        print('Unable to parse config: {}'.format(err), file=sys.stderr)
    return None
def show_response(self, response):
    """Render an HTTP response as highlighted text.

    A body declared as 'application/json' is pretty-printed with a 2-space
    indent when it decodes; otherwise the raw text is used.
    """
    body = response.text
    if response.headers.get('Content-Type', None) == 'application/json':
        try:
            body = json.dumps(response.json(), indent=2)
        except json.JSONDecodeError:
            body = response.text

    # response.raw.version is e.g. 11 for HTTP/1.1.
    version = str(float(response.raw.version) / 10)
    http_txt = self.HTTP_TPL.substitute(
        http_version=version,
        status_code=response.status_code,
        reason=response.reason,
        headers=self.key_value_pairs(response.headers),
        body=body,
    )
    return highlight(http_txt, self.http_lexer, self.formatter)
def populate_user_blacklist(self):
    """Resolve each blacklisted user name to its user id.

    For every name in ``self.user_blacklist`` the user detail URL is fetched.
    A reply that is not JSON is logged as a deleted/invalid account;
    otherwise the id replaces the name's value in the blacklist, the result
    is logged, and a random pause of up to 5 seconds follows.
    """
    # prevent error if 'Account of user was deleted or link is invalid
    from json import JSONDecodeError

    for user in self.user_blacklist:
        info = self.s.get(self.url_user_detail % (user))
        try:
            all_data = json.loads(info.text)
        except JSONDecodeError:
            self.write_log('Account of user %s was deleted or link is '
                           'invalid' % (user))
            continue

        # prevent exception if user have no media
        id_user = all_data['user']['id']
        # Update the user_name with the user_id (existing key, so the dict
        # size does not change while iterating).
        self.user_blacklist[user] = id_user
        self.write_log("Blacklisted user %s added with ID: %s" % (user,
                                                                  id_user))
        time.sleep(5 * random.random())
def load_json(ctx, filename):
    """Load JSON from ``filename``, or from stdin when no name is given.

    With no filename and an interactive stdin, the usage text is shown and
    the process exits with 0. I/O and JSON errors are echoed to stderr and
    the process exits with 1.
    """
    if filename is None:
        if sys.stdin.isatty():
            click.echo(ctx.get_usage())
            click.echo("Try `jsoncut --help' for more information.")
            sys.exit(0)
        # '-' makes click read from stdin.
        filename = '-'

    try:
        with click.open_file(filename) as file_:
            return json.load(file_)
    except EnvironmentError as e:
        # Drain piped input before reporting the error.
        if not sys.stdin.isatty():
            sys.stdin.read()
        click.echo(exc.default_error_mesg_fmt(e), err=True)
        sys.exit(1)
    except json.JSONDecodeError as e:
        click.echo(exc.default_error_mesg_fmt(e), err=True)
        sys.exit(1)
def get_latest_log_entry_for(name, **kwargs):
    """Return the most recent log entry for ``name``.

    :param name: value matched against the ``name`` column of ``log``.
    :param kwargs: ``successfully_uploaded=True`` restricts the search to
        rows with ``uploaded = 1``.
    :return: ``(result, uploadable, uploaded)`` where ``result`` is the
        decoded ``info_json``; ``({}, None, None)`` when there is no row or
        its JSON cannot be decoded.
    """
    db = connect_db()
    c = db.cursor()
    refine_search = ''
    # Read the flag from kwargs; comparing the literal string
    # 'successfully_uploaded' to True was always False, so the filter was
    # never applied.
    if kwargs.get('successfully_uploaded') == True:
        refine_search = 'AND uploaded = 1 '
    c.execute('SELECT info_json,uploadable,uploaded FROM log WHERE name=? '+refine_search+'ORDER BY time DESC LIMIT 1',(name,))
    try:
        entry = c.fetchone()
        # entry is None when no row matched -> TypeError on indexing.
        result = json.loads(entry[0])
        uploadable = entry[1]
        uploaded = entry[2]
    except (json.JSONDecodeError, TypeError):
        result = {}
        uploadable = None
        uploaded = None
    return result, uploadable, uploaded
def get_candles_df(self, currency_pair, epoch_start, epoch_end, period=False):
    """
    Returns the latest candlestick row as a pandas dataframe.

    Only the last row of the fetched candles is kept; 'close' and 'volume'
    are cast to float and a 'pair' column is added. On PoloniexError or
    JSONDecodeError the error is printed and an empty dataframe is returned.
    ``epoch_start`` and ``epoch_end`` are accepted but not used here.
    """
    try:
        candles = self.get_candles(currency_pair, period)
        df = pd.DataFrame(candles).tail(1)
        for column in ('close', 'volume'):
            df[column] = df[column].astype(float)
        df['pair'] = currency_pair
        return df
    except (PoloniexError, JSONDecodeError) as e:
        print()
        print(colored('!!! Got exception while retrieving polo data:' + str(e) + ',pair: ' + currency_pair, 'red'))
    return pd.DataFrame()
def recv(self):
    """Read one payload from the socket and dispatch it as events.

    The socket timeout is set to 0.1s for the read and reset to None
    afterwards. Returns the number of ``_handle_event`` calls made (0 when
    the read timed out).
    """
    n = 0
    self._soc.settimeout(0.1)
    try:
        got = self._soc.recv()
        while 1:
            try:
                val = json.loads(got)
                self._handle_event(val['method'], val['params'])
                n += 1
                break
            except json.JSONDecodeError as e:
                # The payload did not parse as a single document: hand the
                # part before the error position on and retry with the rest.
                # NOTE(review): this call passes one argument while the call
                # above passes two - confirm _handle_event accepts both.
                # NOTE(review): if e.pos is 0 the remainder never shrinks and
                # this loop would not terminate - confirm that cannot happen.
                self._handle_event(got[:e.pos])
                n += 1
                got = got[e.pos:]
    except websocket.WebSocketTimeoutException:
        pass
    # NOTE(review): not in a `finally`, so other exceptions leave the 0.1s
    # timeout in place.
    self._soc.settimeout(None)
    return n
def read_json(conn, length):
    """
    Reads json request of the given length from the client socket.
    Specified length must be available in the socket; otherwise function
    will raise BlockingIOError.

    If the peer closes the connection before ``length`` bytes arrive, reading
    stops and whatever was received is parsed.

    :param conn: active client socket to read data from
    :param length: length of the json content
    :return: dictionary corresponding to received json, or None when no
        content was received
    :rtype: dict
    :raise json.JSONDecodeError:
    :raise BlockingIOError:
    """
    buffer = io.BytesIO()
    while length > 0:
        chunk = conn.recv(min(2048, length))
        if not chunk:
            # recv() returning b'' means the peer closed the connection;
            # without this check the loop would spin forever.
            break
        length -= buffer.write(chunk)
    content = buffer.getvalue().decode('utf-8')
    if not content:
        return None
    return json.loads(content)
def parse_sofa_message(message):
    """Parse a SOFA message string into its implemented message object.

    :raises SyntaxError: if the message does not match ``SOFA_REGEX``, its
        body is not valid JSON, its type is unknown, or the body carries
        fields the message class does not accept.
    :raises NotImplementedError: if the type is valid but not implemented.
    """
    match = SOFA_REGEX.match(message)
    if not match:
        raise SyntaxError("Invalid SOFA message")

    try:
        body = json.loads(match.group('json'))
    except json.JSONDecodeError:
        raise SyntaxError("Invalid SOFA message: body is not valid json")

    # Local name avoids shadowing the builtin `type`.
    sofa_type = match.group('type').lower()
    if sofa_type not in VALID_SOFA_TYPES:
        raise SyntaxError("Invalid SOFA type")
    if sofa_type not in IMPLEMENTED_SOFA_TYPES:
        raise NotImplementedError("SOFA type '{}' has not been implemented yet".format(match.group('type')))

    factory = IMPLEMENTED_SOFA_TYPES[sofa_type]
    try:
        return factory(**body)
    except TypeError:
        raise SyntaxError("Invalid SOFA message: body contains unexpected fields")
async def __call__(self, request):
    """Handle a single or batch request and return the response(s).

    :param request: a bytes/str payload (decoded with ``json_decode``) or an
        already decoded request object / list of request objects.
    :return: the parse-error response for an undecodable payload; for a
        batch, the list of non-empty results or None when every entry was a
        notification; otherwise the single request's result.
    """
    # The body awaits the per-request handler, so this must be a coroutine
    # function.
    if isinstance(request, (bytes, str)):
        try:
            request = json_decode(request)
        except json.JSONDecodeError:
            return _parse_error(request)

    # check batch request
    if isinstance(request, list):
        resp = []
        for r in request:
            result = await self._handle_single_request(r)
            if result:
                resp.append(result)
        # if all were notifications
        if not resp:
            return None
        return resp

    # standard single request
    return await self._handle_single_request(request)
def __init__(self, *args, **kwargs):
    """Initialise the model from an optional dict / JSON string plus kwargs.

    The first positional argument, when given, is the source object; a
    string is parsed as JSON first. Values from ``kwargs`` are applied after
    it.

    :raises TypeError: if a string argument is not valid JSON.
    """
    super_self = super()
    super_self.__init__()
    # Written through the parent's __setattr__ rather than this class's own.
    super_self.__setattr__('__values__', {})

    json_obj = args[0] if len(args) > 0 else {}
    if isinstance(json_obj, str):
        try:
            json_obj = json.loads(json_obj)
        except json.JSONDecodeError:
            raise TypeError('json object is not valid (dict-like or json string) for conversion to model: {!r}'
                            .format(json_obj))

    for source in (json_obj, kwargs):
        self.from_dict(source)
def json(self, branch='master', filename=''):
    """Retrieve _filename_ from GitLab and decode it as JSON.

    Args:
        branch (str): Git Branch to find file.
        filename (str): Name of file to retrieve.

    Returns:
        dict: Decoded JSON.

    Raises:
        SystemExit: Invalid JSON provided.
    """
    file_contents = self.get(branch=branch, filename=filename)
    try:
        json_dict = json.loads(file_contents)
    # Todo: Use json.JSONDecodeError when Python 3.4 has been deprecated
    except ValueError as error:
        template = ('"{filename}" appears to be invalid json. '
                    'Please validate it with http://jsonlint.com. '
                    'JSON decoder error:\n'
                    '{error}')
        raise SystemExit(template.format(filename=filename, error=error))

    LOG.debug('JSON object:\n%s', json_dict)
    return json_dict
def process_log(file_path):
    """
    Parse a log file and its header line.

    Expects header as first line in log file. Header begins with comment
    character '#'. The line is a json string dump of a dictionary that
    contains the following keys:
        name: str, Task name
        maintainer: str, Name of person
        tags: list of tags associated with the task. can be empty
        properties: list of properties associated with the task. can be empty
        run_id: str, a run ID for the task run
        timestamp: str, timestamp for the task run

    :param file_path: a path string, or an object with ``readline()``
    :return: ``(df, metadata)``; metadata falls back to empty name, timestamp
        and run_id when the header is not valid JSON
    :raises ValueError: if the first line does not start with '#'
    """
    # read header
    if isinstance(file_path, str):
        with open(file_path) as f:
            header = f.readline()
    else:
        header = file_path.readline()

    if not header.startswith("#"):
        raise ValueError("Expecting header in log file")

    try:
        metadata = json.loads(header[1:])
        if 'timestamp' in metadata:
            metadata['timestamp'] = dateutil_parse(metadata['timestamp'])
    except JSONDecodeError as e:
        metadata = {"name": "", "timestamp": "", "run_id": ""}

    return parse_log(file_path), metadata
def validate_json(value):
    """Validator: raise ``ValidationError`` unless ``value`` is valid JSON.

    Returns None when ``value`` decodes successfully.
    """
    try:
        json.loads(value)
    except json.JSONDecodeError:
        message = "Not valid json"
        raise ValidationError(message)
def submit(request):
    """Endpoint for submitting new crawls.

    Expects POST['data'] to be populated with data for a single document
    group. Responds 400 when the field is missing or not valid JSON, 500 when
    processing fails, otherwise a JSON body with 'status' and 'new'.
    """
    # Todo authentication? Secret keys?
    # Todo stop processing the documents when submitted; use processing queues
    input_raw = request.POST.get('data')
    if not input_raw:
        return HttpResponseBadRequest('POST["data"] is not set!')

    try:
        payload = json.loads(input_raw)
    except json.JSONDecodeError:
        return HttpResponseBadRequest('POST["data"] is not valid json')

    submitted, created = models.SubmittedData.objects.update_or_create(
        sha1=hash.dict_sha1(payload),
        data=payload
    )

    try:
        doc, new = process(submitted)
        index_data(doc)
    except ProcessingInputError as e:
        print(e)
        return HttpResponseServerError('error: ' + str(e))

    return JsonResponse({
        'status': 'ok',
        'new': new,
    })
async def method(self, key, **data):
    """ Return a result of executing vk's method `key`

    Function for special cases only!
    This method processes neither errors nor captcha.

    :param key: name of the vk API method to call.
    :param data: request parameters. The optional ``_replace_nl`` flag
        (default True) controls whether newlines in string values are
        replaced with "<br>"; the flag itself is never sent.
    :return: the 'response' value of the first parsed item that has one,
        otherwise False (also False when the reply cannot be decoded).
    """
    url = f"https://api.vk.com/method/{key}?access_token={self.token}&v={VERSION}"

    # Take the control flag out before touching the values so it is neither
    # treated as a string nor posted to the API.
    replace_nl = data.pop("_replace_nl", True)
    if replace_nl:
        # Only strings have newlines to replace; other values pass through.
        data = {
            k: v.replace("\n", "<br>") if isinstance(v, str) else v
            for k, v in data.items()
        }

    async with self.session.post(url, data=data, **self.req_kwargs) as resp:
        try:
            results = json_iter_parse(await resp.text())
            for item in results:
                if 'response' in item:
                    return item['response']
        except json.JSONDecodeError:
            self.logger.error("Error while executing vk method: vk's response is wrong!")
            return False
    return False
def _get_page(self, page_number):
    """Fetch one page of the fans list and accumulate the fan ids.

    Fan ids found on the page are prepended, comma-terminated, to
    ``self._fans_ids``.

    :param page_number: page to request.
    :return: ``(pages, fansnumber)``, which are only non-zero for page 1, or
        None when the HTTP status is not 200.
    """
    # NOTE(review): nesting below was reconstructed from an extract without
    # indentation - confirm the `for` / `elif` placement against upstream.
    data = {
        'mid': str(self._mid),
        'page': str(page_number),
        '_': '1496132105785'
    }
    pages = 0
    fansnumber = 0
    fans_ids = ""
    try:
        url = "http://space.bilibili.com/ajax/friend/GetFansList?" + urlencode(data)
        response = requests.get(url)
        if response.status_code != 200:
            return None
        html_cont = response.text
        try:
            # `data` is reused: from here on it is the decoded reply.
            data = json.loads(html_cont)
            if data and (data.get('status') is True):
                if data and 'data' in data.keys():
                    if(page_number == 1):
                        pages = data.get('data').get('pages')
                        fansnumber = data.get('data').get('results')
                    for fans in data.get('data').get('list'):
                        fans_ids = str(fans.get('fid')) + ',' + fans_ids
            elif (data.get('data') == "????????"):
                pages = 0
                fansnumber = 0
        except JSONDecodeError:
            pass
        self._fans_ids = fans_ids + self._fans_ids
        return pages, fansnumber
    except RequestException:
        # NOTE(review): retries by recursion with no limit - a persistent
        # network failure ends in RecursionError.
        return self._get_page(page_number)
def _getinfo(self, cont):
    """Extract the video ids from a JSON reply as a comma-terminated string.

    :param cont: JSON text whose 'data' -> 'vlist' entries carry an 'aid'.
    :return: the ids in reverse list order, each followed by ',' (e.g.
        "2,1,"); "" when there is no 'data' key; None when ``cont`` is not
        valid JSON.
    """
    # NOTE(review): the position of the return was reconstructed from an
    # extract without indentation - confirm against upstream.
    aids = ""
    try:
        data = json.loads(cont)
        if data and 'data' in data:
            for video in data.get('data').get('vlist'):
                # Prepend, so the newest entry in the list ends up first.
                aids = str(video.get('aid')) + ',' + aids
        return aids
    except JSONDecodeError:
        pass
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。