Python json 模块,loads() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用json.loads()。
def restartJobs(args, config): # Todo: reimplement
pipelinedbutils = Pipelinedbutils(config)
pipelineQueueUtils = PipelineQueueUtils('WAIT_Q')
if args.jobId:
request = json.loads(pipelinedbutils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)
msg = {
"job_id": args.jobId,
"request": request
}
pipelineQueueUtils.publish(json.dumps(msg))
if args.preempted:
preempted = pipelinedbutils.getJobInfo(select=["job_id", "request"], where={"current_status": "PREEMPTED"})
for p in preempted:
msg = {
"job_id": p.job_id,
"request": json.loads(p.request)
}
pipelineQueueUtils.publish(json.dumps(msg))
def fetch_data():
try:
r = requests.get(MTG_JSON_URL)
except requests.ConnectionError:
r = requests.get(FALLBACK_MTG_JSON_URL)
with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
unzipped_files = archive.infolist()
if len(unzipped_files) != 1:
raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
data = archive.read(archive.infolist()[0])
decoded_data = data.decode('utf-8')
sets_data = json.loads(decoded_data)
return sets_data
def main():
for url in url_list :
try:
r = requests.get(url)
except : continue
tree = html.fromstring(r.text)
script = tree.xpath('//script[@language="javascript"]/text()')[0]
json_string = regex.findall(script)[0]
json_data = json.loads(json_string)
next_page_url = tree.xpath('//footer/a/@href')
links = [domain + x['nodeRef'] for x in json_data]
for link in links:
extract(link)
def get_transform_specs_json_by_project(self):
"""get transform_specs driver table info."""
transform_specs_json = """
{"aggregation_params_map":{
"aggregation_pipeline":{"source":"streaming",
"usage":"fetch_quantity",
"setters":["rollup_quantity",
"set_aggregated_metric_name",
"set_aggregated_period"],
"insert":["prepare_data",
"insert_data"]},
"aggregated_metric_name": "vcpus_agg",
"aggregation_period": "hourly",
"aggregation_group_by_list": ["host","metric_id","tenant_id"],
"usage_fetch_operation": "latest",
"setter_rollup_group_by_list": ["tenant_id"],
"setter_rollup_operation": "sum",
"dimension_list":["aggregation_period",
"host",
"project_id"]
},
"metric_group":"vcpus_project",
"metric_id":"vcpus_project"}"""
return [json.loads(transform_specs_json)]
def parse_answer(self, reponse):
# ??question?answer
ans_json = json.loads(reponse.text)
is_end = ans_json["paging"]["is_end"]
next_url = ans_json["paging"]["next"]
# ??answer?????
for answer in ans_json["data"]:
answer_item = ZhihuAnswerItem()
answer_item["zhihu_id"] = answer["id"]
answer_item["url"] = answer["url"]
answer_item["question_id"] = answer["question"]["id"]
answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
answer_item["content"] = answer["content"] if "content" in answer else None
answer_item["parise_num"] = answer["Voteup_count"]
answer_item["comments_num"] = answer["comment_count"]
answer_item["create_time"] = answer["created_time"]
answer_item["update_time"] = answer["updated_time"]
answer_item["crawl_time"] = datetime.datetime.Now()
yield answer_item
if not is_end:
yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
def watchJob(jobId, exchangeName):
queue = PipelineQueue('PIPELINE_JOB_{j}'.format(j=jobId))
queue.bindToExchange(exchangeName, jobId)
while True:
body, method = queue.get()
if method:
body = json.loads(body)
if body["current_status"] == "SUCCEEDED":
return jobId
else:
raise PipelineserviceError("Job {j} has current status {s}!".format(j=jobId, s=body["current_status"]))
else:
pass
def tba_get(self, path):
"""Base method for querying the TBA API. Returns the response JSON as a python dict.
:param path: (str) Request path,without the API address prefix (https://www.thebluealliance.com/api/v2/)
:return: A dict parsed from the response from the API.
"""
if self.app_id['X-TBA-app-id'] == "":
raise Exception('An API key is required for TBA. Please use set_api_key() to set one.')
url_str = 'https://www.thebluealliance.com/api/v2/' + path
r = self.session.get(url_str, headers=self.app_id)
# print(r.url)
tba_txt = r.text
try:
return json.loads(tba_txt)
except json.JSONDecodeError:
print(url_str)
print(tba_txt)
def get_gw_interfaces():
'''
Gateway node can have multiple interfaces. This function parses json
provided in config to get all gateway interfaces for this node.
'''
node_interfaces = []
try:
all_interfaces = json.loads(config('external-interfaces'))
except ValueError:
raise ValueError("Invalid json provided for gateway interfaces")
hostname = get_unit_hostname()
if hostname in all_interfaces:
node_interfaces = all_interfaces[hostname].split(',')
elif 'DEFAULT' in all_interfaces:
node_interfaces = all_interfaces['DEFAULT'].split(',')
for interface in node_interfaces:
if not interface_exists(interface):
log('Provided gateway interface %s does not exist'
% interface)
raise ValueError('Provided gateway interface does not exist')
return node_interfaces
def getrange(self, key_prefix, strip=False):
"""
Get a range of keys starting with a common prefix as a mapping of
keys to values.
:param str key_prefix: Common prefix among all keys
:param bool strip: Optionally strip the common prefix from the key
names in the returned dict
:return dict: A (possibly empty) dict of key-value mappings
"""
self.cursor.execute("select key,data from kv where key like ?",
['%s%%' % key_prefix])
result = self.cursor.fetchall()
if not result:
return {}
if not strip:
key_prefix = ''
return dict([
(k[len(key_prefix):], json.loads(v)) for k, v in result])
def relation_get(attribute=None, unit=None, rid=None):
"""Get relation information"""
_args = ['relation-get', '--format=json']
if rid:
_args.append('-r')
_args.append(rid)
_args.append(attribute or '-')
if unit:
_args.append(unit)
try:
return json.loads(subprocess.check_output(_args).decode('UTF-8'))
except ValueError:
return None
except CalledProcessError as e:
if e.returncode == 2:
return None
raise
def status_get():
"""Retrieve the prevIoUsly set juju workload state and message
If the status-get command is not found then assume this is juju < 1.23 and
return 'unkNown',""
"""
cmd = ['status-get', "--format=json", "--include-data"]
try:
raw_status = subprocess.check_output(cmd)
except OSError as e:
if e.errno == errno.ENOENT:
return ('unkNown', "")
else:
raise
else:
status = json.loads(raw_status.decode("UTF-8"))
return (status["status"], status["message"])
def get_cache_mode(service, pool_name):
"""
Find the current caching mode of the pool_name given.
:param service: six.string_types. The Ceph user name to run the command under
:param pool_name: six.string_types
:return: int or None
"""
validator(value=service, valid_type=six.string_types)
validator(value=pool_name, valid_type=six.string_types)
out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
try:
osd_json = json.loads(out)
for pool in osd_json['pools']:
if pool['pool_name'] == pool_name:
return pool['cache_mode']
return None
except ValueError:
raise
def get_rmq_cluster_running_nodes(self, sentry_unit):
"""Parse rabbitmqctl cluster_status output string,return list of
running rabbitmq cluster nodes.
:param unit: sentry unit
:returns: List containing node names of running nodes
"""
# NOTE(beisner): rabbitmqctl cluster_status output is not
# json-parsable,do string chop foo,then json.loads that.
str_stat = self.get_rmq_cluster_status(sentry_unit)
if 'running_nodes' in str_stat:
pos_start = str_stat.find("{running_nodes,") + 15
pos_end = str_stat.find("]},", pos_start) + 1
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
run_nodes = json.loads(str_run_nodes)
return run_nodes
else:
return []
def run_action(self, unit_sentry, action,
_check_output=subprocess.check_output,
params=None):
"""Run the named action on a given unit sentry.
params a dict of parameters to use
_check_output parameter is used for dependency injection.
@return action_id.
"""
unit_id = unit_sentry.info["unit_name"]
command = ["juju", "action", "do", unit_id, action]
if params is not None:
for key, value in params.iteritems():
command.append("{}={}".format(key, value))
self.log.info("Running command: %s\n" % " ".join(command))
output = _check_output(command, universal_newlines=True)
data = json.loads(output)
action_id = data[u'Action queued with id']
return action_id
def test_view_responds_stats_on(self):
self.get(NODEINFO_DOCUMENT_PATH)
self.response_200()
self.assertEqual(
json.loads(decode_if_bytes(self.last_response.content))["usage"],
{
"users": {
"total": User.objects.count(),
"activeHalfyear": User.objects.filter(last_login__gte=Now() - datetime.timedelta(days=180)).count(),
"activeMonth": User.objects.filter(last_login__gte=Now() - datetime.timedelta(days=30)).count(),
},
"localPosts": Content.objects.filter(
author__user__isnull=False, content_type=ContentType.CONTENT).count(),
"localComments": Content.objects.filter(
author__user__isnull=False, content_type=ContentType.REPLY).count(),
}
)
def pull_user_data(session):
print 'pulling users'
user_data = requests.get(u"{}{}".format(config.prod_url, 'export/users'))
loaded_data = json.loads(user_data.text)
for user_dict in loaded_data:
user = User(
id=user_dict['id'],
name=user_dict['name'],
email=user_dict['email'],
admin=user_dict['admin'],
avatar=user_dict['avatar'],
active=user_dict['active'],
created_at=user_dict['created_at'],
elo=user_dict['elo'],
wins=user_dict['wins'],
losses=user_dict['losses']
)
session.add(user)
session.commit()
print 'done pulling users'
def pull_game_data(session):
print 'pulling games'
game_data = requests.get(u"{}{}".format(config.prod_url, 'export/games'))
loaded_data = json.loads(game_data.text)
for game_dict in loaded_data:
game = Game(
id=game_dict['id'],
created_at=game_dict['created_at'],
deleted_at=game_dict['deleted_at'],
winner_id=game_dict['winner_id'],
winner_elo_score=game_dict['winner_elo_score'],
loser_id=game_dict['loser_id'],
loser_elo_score=game_dict['loser_elo_score'],
submitted_by_id=game_dict['submitted_by_id']
)
session.add(game)
session.commit()
print 'done pulling games'
def generate(self, template_path, source_json_path, output_path):
print("Generating content at %s with template at %s,using key %s" % (
output_path, self.key_name))
data = []
with open(source_json_path) as f:
for line in f:
json_line = json.loads(line)
data_line = '(\'%s\',\n\'%s\')' % (
json_line[self.key_name], json.dumps(json_line))
data.append(str(data_line))
print(data)
with open(template_path) as f:
template = f.read()
with open(output_path, 'w') as write_file:
write_file.write(template)
for record in data:
write_file.write(record)
write_file.write(',\n')
write_file.seek(-2, 1)
write_file.truncate()
write_file.write(';')
def get_transform_specs_json_by_all(self):
"""get transform_specs driver table info."""
transform_specs_json = """
{"aggregation_params_map":{
"aggregation_pipeline":{"source":"streaming","metric_id"],
"setter_rollup_group_by_list": [],
"metric_group":"vcpus_all",
"metric_id":"vcpus_all"}"""
return [json.loads(transform_specs_json)]
def get_pre_transform_specs_json(self):
"""get pre_transform_specs driver table info."""
pre_transform_specs = ["""
{"event_processing_params":{"set_default_zone_to":"1",
"set_default_geolocation_to":"1",
"set_default_region_to":"W"},
"event_type":"cpu.total_logical_cores",
"metric_id_list":["cpu_util_all"],
"required_raw_fields_list":["creation_time"],
"service_id":"host_metrics"}""", """
{"event_processing_params":{"set_default_zone_to":"1",
"event_type":"cpu.idle_perc",
"service_id":"host_metrics"}"""]
pre_transform_specs_json_list = \
[json.loads(pre_transform_spec)
for pre_transform_spec in pre_transform_specs]
return pre_transform_specs_json_list
def get_transform_specs_json_invalid_name(self):
"""get transform_specs driver table info."""
transform_specs_json = """
{"aggregation_params_map":{
"aggregation_pipeline":{"source":"streaming",
"aggregated_metric_name": "&invalidmetricname",
"usage_fetch_operation": "sum",
"setter_rollup_group_by_list": ["host"],
"dimension_list":["aggregation_period",
"metric_group":"mem_total_all",
"metric_id":"mem_total_all"}"""
return [json.loads(transform_specs_json)]
def get_pre_transform_specs_json(self):
"""get pre_transform_specs driver table info."""
pre_transform_specs = ["""
{"event_processing_params":{"set_default_zone_to":"1",
"service_id":"host_metrics"}"""]
pre_transform_specs_json_list = \
[json.loads(pre_transform_spec)
for pre_transform_spec in pre_transform_specs]
return pre_transform_specs_json_list
def test_error_response(self):
self.assertEqual(self.analyzer.get_param('config.password'), "secret")
self.assertEqual(self.analyzer.get_param('config.key'), "secret")
self.assertEqual(self.analyzer.get_param('config.apikey'), "secret")
self.assertEqual(self.analyzer.get_param('config.api_key'), "secret")
# Run the error method
with self.assertRaises(SystemExit):
self.analyzer.error('Error', True)
# Get the output
output = self.analyzer.fpoutput.getvalue().strip()
json_output = json.loads(output)
self.assertEqual(json_output['success'], False)
self.assertEqual(json_output['errorMessage'], 'Error')
self.assertEqual(json_output['input']['dataType'], 'ip')
self.assertEqual(json_output['input']['data'], '1.1.1.1')
self.assertEqual(json_output['input']['config']['password'], 'REMOVED')
self.assertEqual(json_output['input']['config']['key'], 'REMOVED')
self.assertEqual(json_output['input']['config']['apikey'], 'REMOVED')
self.assertEqual(json_output['input']['config']['api_key'], 'REMOVED')
def get_sample(self, samplehash):
"""
Downloads information about a sample using a given hash.
:param samplehash: hash to search for. Has to be either md5,sha1 or sha256
:type samplehash: str
:returns: Dictionary of results
:rtype: dict
"""
apiurl = '/rest/sample/'
if len(samplehash) == 32: # MD5
apiurl += 'md5/'
elif len(samplehash) == 40: # SHA1
apiurl += 'sha1/'
elif len(samplehash) == 64: # SHA256
apiurl += 'sha256/'
else:
raise UnkNownHashTypeError('Sample hash has an unkNown length.')
res = self.session.get(self.url + apiurl + samplehash)
if res.status_code == 200:
return json.loads(res.text)
else:
raise BadResponseError('Response from VMRay was not HTTP 200.'
' Responsecode: {}; Text: {}'.format(res.status_code, res.text))
def query_job_status(self, submissionid):
"""
Queries vmray to check id a job was
:param submissionid: ID of the job/submission
:type submissionid: int
:returns: True if job finished,false if not
:rtype: bool
"""
apiurl = '/rest/submission/'
result = self.session.get('{}{}{}'.format(self.url, apiurl, submissionid))
if result.status_code == 200:
submission_info = json.loads(result.text)
if submission_info.get('data', {}).get('submission_finished', False): # Or something like that
return True
else:
raise UnkNownSubmissionIdError('Submission id seems invalid,response was not HTTP 200.')
return False
def __query_safebrowsing(self, search_value, search_type):
"""
The actual query to safebrowsing api
:param search_value: value to search for
:type search_value: str
:param search_type: 'url' or 'ip'
:type search_type: str
:return: Results
:rtype: str
"""
return json.loads(
self.session.post(
self.url,
json=self.__prepare_body(
search_value=search_value,
search_type=search_type
)
).text
)
def run(self):
data = self.getData()
value = {
data: {
"type": self.data_type
}
}
json_data = json.dumps(value)
post_data = json_data.encode('utf-8')
headers = {'Content-Type': 'application/json'}
try:
request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service), post_data, headers)
response = urllib2.urlopen(request)
report = json.loads(response.read())
self.report(report)
except urllib2.HTTPError:
self.error("Hippocampe: " + str(sys.exc_info()[1]))
except urllib2.URLError:
self.error("Hippocampe: service is not available")
except Exception as e:
self.unexpectedError(e)
def logic(data):
# ????????
#print data
f1 = open('/tmp/test.txt','a')
f1.write(data)
f1.write('\n')
f1.close()
data = json.loads(data)
print type(data)
mounts = data["MOUNT"]
netifaces = data["NET"]
for netif in netifaces:
if netif["status"] == "up":
ip = netif["ip"]
#print netifaces['ip']
for mount in mounts:
Mount_point= mount['path']
use_rate=mount['used_rate']
if use_rate > '70':
send_mail(recvmail_conf['addr'],"mount point"+Mount_point ,used_rate)
return("OK")
def _reply(self, json_reply):
"""
Handle a reply that came in over the transport provided
"""
if not json_reply.startswith('{'):
self.sdata.log('Received non-JSON data: "{}"'.format(json_reply))
return
reply = json.loads(json_reply, object_pairs_hook=OrderedDict)
if reply['jsonrpc'] != '2.0' or 'id' not in reply or reply['id'] is None:
self.sdata.log('Received bad JSON-RPC reply: {}'.format(json_reply))
if len(self.pending_reply_map) == 1:
# lucky! can guess a pending reply to kill
this_id = self.pending_reply_map.keys()[0]
d = self.pending_reply_map[this_id]
del self.pending_reply_map[this_id]
e = JsonRpcException('Bad reply: {}'.format(json_reply))
d.errback(e)
return
this_id = int(reply['id'])
if 'method' in reply and this_id in self.pending_reply_map:
self.sdata.log('Got echo of request for {},ignoring'.format(this_id))
else:
d = self.pending_reply_map[this_id]
del self.pending_reply_map[this_id]
d.callback(reply)
def create_cd(col_name, type, display_name):
url = 'https://api.kentik.com/api/v5/customdimension'
json_template = '''
{
"name": "{{ column }}",
"type": "{{ data_type }}",
"display_name": "{{ pretty_name }}"
}
'''
t = Template(json_template)
data = json.loads(t.render(column = col_name, data_type = type, pretty_name = display_name))
response = requests.post(url, headers=headers, data=data)
if response.status_code != 201:
print("Unable to create custom dimension column. Exiting.")
print("Status code: {}").format(response.status_code)
print("Error message: {}").format(response.json()['error'])
exit()
else:
print("Custom dimension \"{}\" created as id: {}").format(display_name, \
response.json()['customDimension']['id'])
return(response.json()['customDimension']['id'])
def push(self, environ):
ct = environ.get('CONTENT_TYPE')
stream = environ['wsgi.input']
content = stream.read(int(environ['CONTENT_LENGTH']))
if ct == 'application/json':
try:
task = json.loads(content if PY2 else content.decode('utf-8'))
except:
return Error('400 BAD REQUEST', 'invalid-encoding', 'Can\'t decode body')
elif ct == 'application/x-msgpack':
try:
task = msgpack.loads(content, encoding='utf-8')
except:
return Error('400 BAD REQUEST', 'Can\'t decode body')
else:
return Error('400 BAD REQUEST', 'invalid-content-type',
'Content must be json or msgpack')
if not task.get('queue'):
return Error('400 BAD REQUEST', 'bad-params', 'queue required')
if not task.get('name'):
return Error('400 BAD REQUEST', 'name required')
return {'id': self.manager.push(**task).id}
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
ct = headers.get('Content-Type')
if not ct.startswith('application/json'):
logger.debug('Unexpected response for JSON request: %s', ct)
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
def test_pydist():
"""Make sure pydist.json exists and validates against our schema."""
# XXX this test may need manual cleanup of older wheels
import jsonschema
def open_json(filename):
return json.loads(open(filename, 'rb').read().decode('utf-8'))
pyMeta_schema = open_json(resource_filename('wheel.test',
'pydist-schema.json'))
valid = 0
for dist in ("simple.dist", "complex-dist"):
basedir = pkg_resources.resource_filename('wheel.test', dist)
for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
if entry.filename.endswith('/Metadata.json'):
pyMeta = json.loads(whl.read(entry).decode('utf-8'))
jsonschema.validate(pyMeta, pyMeta_schema)
valid += 1
assert valid > 0, "No Metadata.json found"
def test_load_save(self):
self.wk.data = json.loads(wheel_json)
self.wk.add_signer('+', '67890')
self.wk.add_signer('scope', 'abcdefg')
self.wk.trust('epocs', 'gfedcba')
self.wk.trust('+', '12345')
self.wk.save()
del self.wk.data
self.wk.load()
signers = self.wk.signers('scope')
self.assertTrue(signers[0] == ('scope', 'abcdefg'), self.wk.data['signers'])
self.assertTrue(signers[1][0] == '+', self.wk.data['signers'])
trusted = self.wk.trusted('epocs')
self.assertTrue(trusted[0] == ('epocs', 'gfedcba'))
self.assertTrue(trusted[1][0] == '+')
self.wk.untrust('epocs', 'gfedcba')
trusted = self.wk.trusted('epocs')
self.assertTrue(('epocs', 'gfedcba') not in trusted)
def read_json_file(file_path):
'''
Args:
1. file_path: File path for a json file.
File should be similar to the format -
https://gist.github.com/pandeydivesh15/2012ab10562cc85e796e1f57554aca33
Returns:
data: A list of dicts. Each dict contains timing info for a spoken word(or punctuation).
'''
with open(file_path, 'r') as f:
data = json.loads(f.read())['words']
# for line in f:
# temp = json.loads(line)
# temp['start'] = None if temp['start'] == 'NA' else float(temp['start'])
# temp['end'] = None if temp['end'] == 'NA' else float(temp['end'])
# try:
# temp['word'] = temp['word'].encode('ascii')
# except KeyError:
# temp['punctuation'] = temp['punctuation'].encode('ascii')
# data.append(temp)
return data
def package(data, volatile=False):
"""Package data for training / evaluation."""
data = map(lambda x: json.loads(x), data)
dat = map(lambda x: map(lambda y: dictionary.word2idx[y], x['text']), data)
maxlen = 0
for item in dat:
maxlen = max(maxlen, len(item))
targets = map(lambda x: x['label'], data)
maxlen = min(maxlen, 500)
for i in range(len(data)):
if maxlen < len(dat[i]):
dat[i] = dat[i][:maxlen]
else:
for j in range(maxlen - len(dat[i])):
dat[i].append(dictionary.word2idx['<pad>'])
dat = Variable(torch.LongTensor(dat), volatile=volatile)
targets = Variable(torch.LongTensor(targets), volatile=volatile)
return dat.t(), targets
def get_watchlist_id_by_name(watchlistsdict):
"""
For each watchlist name specified in the config file,find the
associated watchlist ID.
NOTE: We trigger on watchlist IDs,and not on watchlist names
"""
global cbtoken
global cbserver
headers = {'X-AUTH-TOKEN': cbtoken}
r = requests.get("https://%s/api/v1/watchlist" % (cbserver),
headers=headers,
verify=False)
parsed_json = json.loads(r.text)
for watchlist in parsed_json:
for key, value in watchlistsdict.iteritems():
if watchlist['name'].lower() == key.lower():
watchlistsdict[key] = watchlist['id']
def get_watchlist_id_by_name(watchlistsdict):
"""
For each watchlist name specified in the config file,and not on watchlist names
"""
headers = {'X-AUTH-TOKEN': cbtoken}
r = requests.get("https://%s/api/v1/watchlist" % (cbserver),
headers=headers,
verify=False)
parsed_json = json.loads(r.text)
for watchlist in parsed_json:
for key, value in watchlistsdict.iteritems():
if watchlist['name'].lower() == key.lower():
watchlistsdict[key] = int(watchlist['id'])
def Parse(text, message, ignore_unkNown_fields=False):
"""Parses a JSON representation of a protocol message into a message.
Args:
text: Message JSON representation.
message: A protocol buffer message to merge into.
ignore_unkNown_fields: If True,do not raise errors for unkNown fields.
Returns:
The same message passed as argument.
Raises::
ParseError: On JSON parsing problems.
"""
if not isinstance(text, six.text_type): text = text.decode('utf-8')
try:
if sys.version_info < (2, 7):
# object_pair_hook is not supported before python2.7
js = json.loads(text)
else:
js = json.loads(text, object_pairs_hook=_DuplicateChecker)
except ValueError as e:
raise ParseError('Failed to load JSON: {0}.'.format(str(e)))
return ParseDict(js, ignore_unkNown_fields)
def multiple_policies(self, policies, neighbor):
"""Creates a new policy that applies list of policies to it.
:param policies: list of policies that you want applied to a single policy
:param neighbor: the neighbor you are going to apply these policies (used for naming)
:type policies: list
:type neighbor: str
:return: Name of the policy that is created
:rtype: str
"""
policy_name = neighbor.replace('.', '_')
policy_name = 'multi_policy_' + policy_name
shell = '{"openconfig-routing-policy:routing-policy": {"policy-deFinitions": {"policy-deFinition": [{"name": "%s","statements": {"statement": []}}]}}}' % policy_name
shell = json.loads(shell, object_pairs_hook=OrderedDict)
conditions = shell['openconfig-routing-policy:routing-policy']['policy-deFinitions']['policy-deFinition'][0]['statements']['statement']
for policy in policies:
policy_nm = 'Policy_' + policy
json_policy = '{"name": "%s","conditions": {"call-policy": "%s"}}' % (policy_nm, policy)
json_policy = json.loads(json_policy, object_pairs_hook=OrderedDict)
conditions.append(json_policy)
multi_policy = json.dumps(shell)
print(self.merge_config(multi_policy))
return policy_name
def parse_cookie(cookie, securekey):
logger.info (">> parse cookie : %s" % cookie)
parts = cookie.split('.')
part1 = parts[0]
part2 = '' if len(parts) < 2 else parts[1]
try:
text = str(base64.b64decode(part1.encode('ascii')), encoding='utf-8')
except:
logger.info ("decode cookie Failed")
return None
logger.info ("cookie content : %s" % text)
thatpart2 = hashlib.md5((text+securekey).encode('ascii')).hexdigest()
logger.info ("hash from part1 : %s" % thatpart2)
logger.info ("hash from part2 : %s" % part2)
if part2 == thatpart2:
result = json.loads(text)['name']
else:
result = None
logger.info ("parse from cookie : %s" % result)
return result
def name_error():
quotafile = open(fspath+"/global/sys/quotainfo", 'r')
quotas = json.loads(quotafile.read())
quotafile.close()
if quotas['default'] == 'fundation':
quotas['default'] = 'foundation'
quotafile = open(fspath+"/global/sys/quotainfo",'w')
quotafile.write(json.dumps(quotas))
quotafile.close()
groupfile = open(fspath+"/global/sys/quota", 'r')
groups = json.loads(groupfile.read())
groupfile.close()
for group in groups:
if group['name'] == 'fundation':
group['name'] = 'foundation'
groupfile = open(fspath+"/global/sys/quota",'w')
groupfile.write(json.dumps(groups))
groupfile.close()
users = User.query.filter_by(user_group = 'fundation').all()
for user in users:
user.user_group = 'foundation'
db.session.commit()
def recover_allclusters(self):
logger.info("recovering all vclusters for all users...")
usersdir = self.fspath+"/global/users/"
auth_key = env.getenv('AUTH_KEY')
res = post_to_user("/master/user/groupinfo/", {'auth_key':auth_key})
#logger.info(res)
groups = json.loads(res['groups'])
quotas = {}
for group in groups:
#logger.info(group)
quotas[group['name']] = group['quotas']
for user in os.listdir(usersdir):
for cluster in self.list_clusters(user)[1]:
logger.info ("recovering cluster:%s for user:%s ..." % (cluster, user))
#res = post_to_user('/user/uid/',{'username':user,'auth_key':auth_key})
recover_info = post_to_user("/master/user/recoverinfo/", {'username':user,'auth_key':auth_key})
uid = recover_info['uid']
groupname = recover_info['groupname']
input_rate_limit = quotas[groupname]['input_rate_limit']
output_rate_limit = quotas[groupname]['output_rate_limit']
self.recover_cluster(cluster, user, uid, input_rate_limit, output_rate_limit)
logger.info("recovered all vclusters for all users")
def get_clustersetting(self, clustername, username, containername, allcontainer):
clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
if not os.path.isfile(clusterpath):
logger.error("cluster file: %s not found" % clustername)
return [False, "cluster file not found"]
infofile = open(clusterpath, 'r')
info = json.loads(infofile.read())
infofile.close()
cpu = 0
memory = 0
disk = 0
if allcontainer:
for container in info['containers']:
if 'setting' in container:
cpu += int(container['setting']['cpu'])
memory += int(container['setting']['memory'])
disk += int(container['setting']['disk'])
else:
for container in info['containers']:
if container['containername'] == containername:
if 'setting' in container:
cpu += int(container['setting']['cpu'])
memory += int(container['setting']['memory'])
disk += int(container['setting']['disk'])
return [True, {'cpu':cpu, 'memory':memory, 'disk':disk}]
def diff_containers(self):
[status, localcontainers] = self.list_containers()
globalpath = self.fspath+"/global/users/"
users = os.listdir(globalpath)
globalcontainers = []
for user in users:
clusters = os.listdir(globalpath+user+"/clusters")
for cluster in clusters:
clusterfile = open(globalpath+user+"/clusters/"+cluster, 'r')
clusterinfo = json.loads(clusterfile.read())
for container in clusterinfo['containers']:
if container['host'] == self.addr:
globalcontainers.append(container['containername'])
both = []
onlylocal = []
onlyglobal = []
for container in localcontainers:
if container in globalcontainers:
both.append(container)
else:
onlylocal.append(container)
for container in globalcontainers:
if container not in localcontainers:
onlyglobal.append(container)
return [both, onlylocal, onlyglobal]
def groupList(*args, **kwargs):
'''
Usage: list(cur_user = token_from_auth)
List all groups for an administrator
'''
groupfile = open(fspath+"/global/sys/quota",'r')
groups = json.loads(groupfile.read())
groupfile.close()
quotafile = open(fspath+"/global/sys/quotainfo",'r')
quotas = json.loads(quotafile.read())
quotafile.close()
result = {
"success": 'true',
"groups": groups,
"quotas": quotas['quotainfo'],
"default": quotas['default'],
}
return result
def groupQuery(self, *args, **kwargs):
'''
Usage: groupQuery(name = XXX,cur_user = token_from_auth)
List a group for an administrator
'''
groupfile = open(fspath+"/global/sys/quota",'r')
groups = json.loads(groupfile.read())
groupfile.close()
for group in groups:
if group['name'] == kwargs['name']:
result = {
"success":'true',
"data": group['quotas'],
}
return result
else:
return {"success":False, "reason":"Group does not exist"}
def quotaadd(*args, **kwargs):
form = kwargs.get('form')
quotaname = form.get("quotaname")
default_value = form.get("default_value")
hint = form.get("hint")
if (quotaname == None):
return { "success":'false', "reason": "Empty quota name"}
if (default_value == None):
default_value = "--"
groupfile = open(fspath+"/global/sys/quota",'r')
groups = json.loads(groupfile.read())
groupfile.close()
for group in groups:
group['quotas'][quotaname] = default_value
groupfile = open(fspath+"/global/sys/quota",'w')
groupfile.write(json.dumps(groups))
groupfile.close()
quotafile = open(fspath+"/global/sys/quotainfo",'r')
quotas = json.loads(quotafile.read())
quotafile.close()
quotas['quotainfo'].append({'name':quotaname, 'hint':hint})
quotafile = open(fspath+"/global/sys/quotainfo",'w')
quotafile.write(json.dumps(quotas))
quotafile.close()
return {"success":'true'}
def get_billing_history(vnode_name):
clusters_dir = env.getenv("FS_PREFIX")+"/global/users/"+get_owner(vnode_name)+"/clusters/"
if os.path.exists(clusters_dir):
clusters = os.listdir(clusters_dir)
for cluster in clusters:
clusterpath = clusters_dir + cluster
if not os.path.isfile(clusterpath):
continue
infofile = open(clusterpath, 'r')
info = json.loads(infofile.read())
infofile.close()
if 'billing_history' not in info or vnode_name not in info['billing_history']:
continue
return info['billing_history'][vnode_name]
default = {}
default['cpu'] = 0
default['mem'] = 0
default['disk'] = 0
default['port'] = 0
return default
# the thread to collect data from each worker and store them in monitor_hosts and monitor_vnodes
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。