Python json 模块,dumps() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用json.dumps()。
def restartJobs(args, config): # Todo: reimplement
pipelinedbutils = Pipelinedbutils(config)
pipelineQueueUtils = PipelineQueueUtils('WAIT_Q')
if args.jobId:
request = json.loads(pipelinedbutils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)
msg = {
"job_id": args.jobId,
"request": request
}
pipelineQueueUtils.publish(json.dumps(msg))
if args.preempted:
preempted = pipelinedbutils.getJobInfo(select=["job_id", "request"], where={"current_status": "PREEMPTED"})
for p in preempted:
msg = {
"job_id": p.job_id,
"request": json.loads(p.request)
}
pipelineQueueUtils.publish(json.dumps(msg))
def add_user_devices(self, serial):
# (url,access_token,api_token) = self.get_api_conf()
api_url = self.url + "/api/v1/user/devices"
token = self.access_token + " " + self.api_token
data = {'serial': serial}
request = urllib2.Request(api_url, json.dumps(data))
request.add_header('Authorization', token)
request.add_header('Content-Type', 'application/json')
try:
urllib2.urlopen(request)
except Exception, e:
print e.code
print e.read()
# ?????????
def notify_listeners(content):
"""Send out to listening consumers."""
data = json.dumps({"event": "new", "id": content.id})
if content.content_type == ContentType.REPLY:
# Content reply
StreamConsumer.group_send("streams_content__%s" % content.parent.channel_group_name, data)
elif content.content_type == ContentType.SHARE:
# Share
# Todo do we need to do much?
pass
else:
# Public stream
if content.visibility == Visibility.PUBLIC:
StreamConsumer.group_send("streams_public", data)
# Tag streams
for tag in content.tags.all():
StreamConsumer.group_send("streams_tag__%s" % tag.channel_group_name, data)
# Profile streams
StreamConsumer.group_send("streams_profile__%s" % content.author.id, data)
StreamConsumer.group_send("streams_profile_all__%s" % content.author.id, data)
# Followed stream
followed_qs = Profile.objects.followers(content.author).filter(user__isnull=False)
for username in followed_qs.values_list("user__username", flat=True):
StreamConsumer.group_send("streams_followed__%s" % username, data)
def write_response(self, d):
# send header
self.send_response(200)
self.send_header('Content-type', 'text/json; charset=utf-8')
self.end_headers()
log('I', 'conn', 'Header sent.')
# send data
log('I', 'Sending data...')
json_str = json.dumps(d)
# print(json_str.encode('utf-8').decode('unicode-escape').replace('\n','\\n'))
self.wfile.write(json_str.replace('"', '\\"')
.decode('unicode-escape')
.encode('utf-8')
.replace('\n', '\\n'))
log('C', 'Data sent.')
# clean up
self.wfile.close()
log('C', 'Connection closed.')
def getESCXBalance(address):
try:
payload = {
"method": "get_balances",
"params": {
"filters":[{"field": "address", "op": "==", "value": address},
{"field": "asset", "value": "ESCX"}],
"filterop": "and"
},
"jsonrpc":"2.0",
"id":0
}
response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
json_data = response.json()
#quantity = json_data.quantity
return (json_data['result'].pop()['quantity']) / 100000000
except:
return 0;
def getFilesize(fileUuid, tokenFile=None, projectId=None):
if tokenFile:
filters = {
"op": "=",
"content": {
"field": "file_id",
"value": [fileUuid]
}
}
params = {
"filters": json.dumps(filters)
}
fileInfo = GDCDataUtils.query(tokenFile, "files", params=params)
return int(fileInfo.json()["data"]["hits"][0]["file_size"])
else:
bq = GoogleApiService.create('bq', 'v2')
body = {
"query": "SELECT SUM(a_file_size) FROM GDC_Metadata.GCS_join1 WHERE file_id = {fileUuid}".format(fileUuid=fileUuid)
}
results = bq.jobs().query(projectId=projectId, body=body).execute()
return results["rows"][results["rows"].keys()[0]][0]
def constructGCSFilePath(fileUuid, tokenFile):
filters = {
"op": "=",
"content": {
"field": "file_id",
"value": [fileUuid]
}
}
params = {
"filters": json.dumps(filters)
}
query = "?expand=cases.project"
fileInfo = GDCDataUtils.query(tokenFile, query=query, params=params).json()
pprint.pprint(fileInfo)
return "{project}/{strategy}/{platform}/{uuid}/{filename}".format(
project=fileInfo["data"]["hits"][0]["cases"][0]["project"]["project_id"],
strategy=str(fileInfo["data"]["hits"][0]["experimental_strategy"]),
platform=str(fileInfo["data"]["hits"][0]["platform"]),
uuid=str(fileUuid),
filename=str(fileInfo["data"]["hits"][0]["file_name"])
)
def unsetrange(self, keys=None, prefix=""):
"""
Remove a range of keys starting with a common prefix,from the database
entirely.
:param list keys: List of keys to remove.
:param str prefix: Optional prefix to apply to all keys in ``keys``
before removing.
"""
if keys is not None:
keys = ['%s%s' % (prefix, key) for key in keys]
self.cursor.execute('delete from kv where key in (%s)' % ','.join(['?'] * len(keys)), keys)
if self.revision and self.cursor.rowcount:
self.cursor.execute(
'insert into kv_revisions values %s' % ','.join(['(?,?,?)'] * len(keys)),
list(itertools.chain.from_iterable((key, self.revision, json.dumps('DELETED')) for key in keys)))
else:
self.cursor.execute('delete from kv where key like ?',
['%s%%' % prefix])
if self.revision and self.cursor.rowcount:
self.cursor.execute(
'insert into kv_revisions values (?,?)',
['%s%%' % prefix, json.dumps('DELETED')])
def json_context(context, raise_error=False):
"""Generate a JS <script> tag from context
Serializes ``context["json_context"]`` into JSON and generates a JS
script to attach it to ``window.context``
If ``raise_error`` is False,values in context that are not
JSON serialisable will be converted to string. Otherwise,it
raises TypeError
:param context: Current view context
:param raise_error: Control whether to raise an error on non-JSON serialisable values
:return: ``<script>window.context = {<context["json_context"]>}</script>``
"""
if not context.get("json_context"):
return ""
json_default = None if raise_error else lambda obj: str(obj)
json_dump = json.dumps(context["json_context"], default=json_default)
return mark_safe("<script>window.context = %s;</script>" % json_dump)
def write_response(self, '\\"').decode('unicode-escape').encode('utf-8').replace('\n', 'Connection closed.')
def connect(self, gateway):
self.gateway = gateway
if self.config['id'] is not None:
self.topic = gateway.topic + "/" + self.config['id']
else:
self.topic = gateway.topic + "/" + self.__class__.__name__
if self.config['gpio'] is not None:
self.topic = self.topic + "/" + str(self.config["gpio"])
payload = { 'connected': self.topic }
self.gateway.publish(gateway.topic, json.dumps(payload))
self.gateway.subscribe(self.topic+"/configure")
self.gateway.subscribe(self.topic+"/get")
self.gateway.subscribe(self.topic+"/set")
def generate(self, template_path, source_json_path, output_path):
print("Generating content at %s with template at %s,using key %s" % (
output_path, self.key_name))
data = []
with open(source_json_path) as f:
for line in f:
json_line = json.loads(line)
data_line = '(\'%s\',\n\'%s\')' % (
json_line[self.key_name], json.dumps(json_line))
data.append(str(data_line))
print(data)
with open(template_path) as f:
template = f.read()
with open(output_path, 'w') as write_file:
write_file.write(template)
for record in data:
write_file.write(record)
write_file.write(',\n')
write_file.seek(-2, 1)
write_file.truncate()
write_file.write(';')
def run(self):
data = self.getData()
value = {
data: {
"type": self.data_type
}
}
json_data = json.dumps(value)
post_data = json_data.encode('utf-8')
headers = {'Content-Type': 'application/json'}
try:
request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service), post_data, headers)
response = urllib2.urlopen(request)
report = json.loads(response.read())
self.report(report)
except urllib2.HTTPError:
self.error("Hippocampe: " + str(sys.exc_info()[1]))
except urllib2.URLError:
self.error("Hippocampe: service is not available")
except Exception as e:
self.unexpectedError(e)
def setUp(self):
tempFile = tempfile.NamedTemporaryFile()
self.fileServerDir = tempFile.name
tempFile.close()
os.mkdir(self.fileServerDir)
os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
encoding='base64')
signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
encoding='base64').decode()
VERSIONS['signature'] = signature
keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
with gzip.open(keysFilePath, 'wb') as keysFile:
keysFile.write(json.dumps(KEYS, sort_keys=True))
versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
with gzip.open(versionsFilePath, 'wb') as versionsFile:
versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
os.environ['WXUPDATEDEMO_TESTING'] = 'True'
from wxupdatedemo.config import CLIENT_CONfig
self.clientConfig = CLIENT_CONfig
self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
def setUp(self):
tempFile = tempfile.NamedTemporaryFile()
self.fileServerDir = tempFile.name
tempFile.close()
os.mkdir(self.fileServerDir)
os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'), sort_keys=True))
os.environ['WXUPDATEDEMO_TESTING'] = 'True'
from wxupdatedemo.config import CLIENT_CONfig
self.clientConfig = CLIENT_CONfig
self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
self.clientConfig.APP_NAME = APP_NAME
def get_audit_actions(self, date_modified, offset=0, page_length=100):
"""
Get all actions created after a specified date. If the number of actions found is more than 100,this function will
page until it has collected all actions
:param date_modified: ISO formatted date/time string. Only actions created after this date are are returned.
:param offset: The index to start retrieving actions from
:param page_length: How many actions to fetch for each page of action results
:return: Array of action objects
"""
logger = logging.getLogger('sp_logger')
actions_url = self.api_url + 'actions/search'
response = self.authenticated_request_post(
actions_url,
data=json.dumps({
"modified_at": {"from": str(date_modified)},
"offset": offset,
"status": [0, 10, 50, 60]
})
)
result = self.parse_json(response.content) if response.status_code == requests.codes.ok else None
self.log_http_status(response.status_code, 'GET actions')
if result is None or None in [result.get('count'), result.get('offset'), result.get('total'), result.get('actions')]:
return None
return self.get_page_of_actions(logger, result, offset, page_length)
def tokenize(self, language_tag, query):
id = self._next_id
self._next_id += 1
req = dict(req=id, utterance=query, languageTag=language_tag)
outer = Future()
self._requests[id] = outer
def then(future):
if future.exception():
outer.set_exception(future.exception())
del self._requests[id]
future = self._socket.write(json.dumps(req).encode())
future.add_done_callback(then)
return outer
def process_reply(reply, nested=False):
"""
Process a reply so it looks nice:
- if it's from the prototype yang integration,ditch the 'data' root
- convert from list to nested format if requested
- convert quotes to avoid escaping
"""
try:
# @@@ strip 'data' from yang output
reply['result'] = reply['result'][0]['data']
except Exception:
pass
# If required,and query successful,convert the reply['result'] format.
try:
if nested:
reply['result'] = reformat(reply['result'])
except KeyError:
# Fails silently if there is no 'reply['result']' in the reply['result'],this
# means an error occurred.
pass
# @@@ cheesily try to avoid \" everywhere,at cost of valid json
return re.sub(r'\\"', "'", json.dumps(reply))
def render_POST(self, request):
paths = request.args['paths[]']
def got_data(responses):
reply = {}
for path, data in zip(paths, responses):
try:
reply[path] = data['result']
except KeyError:
reply[path] = data['error']
request.sdata.add_to_push_queue('explorer', text=dumps(reply))
request.sdata.log('got reply {}'.format(paths))
reqs = map(request.sdata.api.get_schema, paths)
d = defer.gatherResults(reqs)
d.addCallback(got_data)
request.setHeader('Content-Type', 'application/json')
return '{}'
def render_POST(self, request):
request.setHeader('Content-Type', 'application/json')
pq = request.sdata.drain_push_queue()
if len(pq) > 0:
return json.dumps(pq)
else:
def finish_later(pq):
try:
request.write(json.dumps(pq))
request.finish()
except exceptions.RuntimeError as e:
print("### can't send push queue: ", e)
request.sdata.restore_push_queue(pq)
request.sdata.add_pending_push_queue_dispatch(finish_later)
request.notifyFinish().addErrback(lambda _: request.sdata.remove_pending_push_queue_dispatch())
return server.NOT_DONE_YET
def update_json(self, request):
interface = request.args['interface'][0]
base_path = "RootCfg.InterfaceConfiguration(" \
"['act','{}'])".format(interface)
cfg = OrderedDict()
cfg[base_path + '.Description'] = request.args['description'][0]
cfg[base_path + '.IPV4Network.Addresses.Primary'] = \
OrderedDict((('Address', request.args['ipv4_addr'][0]),
('Netmask', request.args['ipv4_mask'][0])))
#[request.args['ipv4_addr'][0],
# request.args['ipv4_mask'][0]]
extra_cli = ["interface {} ".format(interface) + x
for x in request.args['extra_cli'][0].split('\n') if len(x) > 0]
cfg_json = OrderedDict((('sets', cfg), ('cli_sets', extra_cli)))
request.sdata.set_text('#manage_intf_json', json.dumps(cfg_json, indent=4))
request.sdata.highlight('#manage_intf_json')
return base_path, cfg, extra_cli
def render_POST(self, request):
file_path = request.args['file_path'][0]
data = request.args['file_contents'][0]
def got_reply(reply):
request.sdata.add_to_push_queue('write_file',
text=dumps(reply),
filename=file_path)
request.sdata.log('got reply id {}'.format(reply['id']))
def got_error(error):
error_code = error.getErrorMessage()
traceback = error.getTraceback()
request.sdata.add_to_push_queue('error',
error=error_code,
traceback=traceback,
tab='write_file')
d = request.sdata.api.write_file(file_path, data)
d.addCallback(got_reply)
d.addErrback(got_error)
request.setHeader('Content-Type', 'application/json')
return '{}'
def getESCXBalance(address):
try:
payload = {
"method": "get_balances", auth=auth)
json_data = response.json()
#quantity = json_data.quantity
return (json_data['result'].pop()['quantity']) / 100000000
except:
return 0;
def dump_schedule(tasks):
"""Dump schedule content"""
from .utils import load_manager
manager = load_manager(tasks)
count = 5000
offset = 0
while True:
items = manager.queue.get_schedule(offset, count)
if not items:
break
for ts, queue, item in items:
print(datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%s'),
queue,
json.dumps(item, ensure_ascii=False, sort_keys=True),
sep='\t')
offset += count
def main():
import argparse
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
parser = argparse.ArgumentParser(description="Linux distro info tool")
parser.add_argument(
'--json',
'-j',
help="Output in machine readable format",
action="store_true")
args = parser.parse_args()
if args.json:
logger.info(json.dumps(info(), indent=4, sort_keys=True))
else:
logger.info('Name: %s', name(pretty=True))
distribution_version = version(pretty=True)
if distribution_version:
logger.info('Version: %s', distribution_version)
distribution_codename = codename()
if distribution_codename:
logger.info('Codename: %s', distribution_codename)
def sign(payload, keypair):
"""Return a JWS-JS format signature given a JSON-serializable payload and
an Ed25519 keypair."""
get_ed25519ll()
#
header = {
"alg": ALG,
"jwk": {
"kty": ALG, # alg -> kty in jwk-08.
"vk": native(urlsafe_b64encode(keypair.vk))
}
}
encoded_header = urlsafe_b64encode(binary(json.dumps(header, sort_keys=True)))
encoded_payload = urlsafe_b64encode(binary(json.dumps(payload, sort_keys=True)))
secured_input = b".".join((encoded_header, encoded_payload))
sig_msg = ed25519ll.crypto_sign(secured_input, keypair.sk)
signature = sig_msg[:ed25519ll.SIGNATUREBYTES]
encoded_signature = urlsafe_b64encode(signature)
return {"recipients":
[{"header":native(encoded_header),
"signature":native(encoded_signature)}],
"payload": native(encoded_payload)}
def query_forever(cb, interval, udp):
while True:
try:
sensors = cb.sensors()
for sensor in sensors:
summary = {}
summary['computer_name'] = sensor['computer_name'].strip()
summary['id'] = sensor['id']
summary['computer_sid'] = sensor['computer_sid'].strip()
summary['num_storefiles_bytes'] = sensor['num_storefiles_bytes']
summary['num_eventlog_bytes'] = sensor['num_eventlog_bytes']
output(json.dumps(summary), udp)
except Exception, e:
print e
pass
time.sleep(float(interval))
return
def main():
'''
To not use tls we need to do 2 things.
1. Comment the variables creds and options out
2. Remove creds and options CiscoGRPcclient
ex: client = CiscoGRPcclient('11.1.1.10',57777,10,'vagrant','vagrant')
'''
creds = open('ems.pem').read()
options = 'ems.cisco.com'
client = CiscoGRPcclient('127.0.0.1', 57777, 'vagrant', creds, options)
#Test 1: Test Get config json requests
path = '{"Cisco-IOS-XR-ip-static-cfg:router-static": [null]}'
try:
err, result = client.getconfig(path)
if err:
print(err)
print(json.dumps(json.loads(result)))
except AbortionError:
print(
'Unable to connect to local Box,check your gRPC destination.'
)
def on_success(self, data):
''' Called when we detect an event through the streaming API.
The base class version looks for quoted tweets and for each one it
finds,we write out a text file that contains the ID of the tweet
that mentions us.
The other (cron-job) version of your bot will look for any files with the
correct extension (identified by `kStreamFileExtension`) in its
HandleQuotes() method and favorite^H^H^H^H like those tweets.
See https://dev.twitter.com/streaming/userstreams
'''
# for Now,all we're interested in handling are events.
if 'event' in data:
# Dump the data into a JSON file for the other cron-process to
# handle the next time it wakes up.
fileName = os.path.join(self.path, "{0}{1}".format(
uuid4().hex, kStreamFileExtension))
with open(fileName, "wt") as f:
f.write(json.dumps(data).encode("utf-8"))
def add_port_mapping(self,username,clustername,node_name,node_ip,port,quota):
port_mapping_count = self.count_port_mapping(username)
if port_mapping_count >= int(quota['portmapping']):
return [False, 'Port mapping quota exceed.']
[status, clusterinfo] = self.get_clusterinfo(clustername, username)
host_port = 0
if self.distributedgw == 'True':
worker = self.nodemgr.ip_to_rpc(clusterinfo['proxy_server_ip'])
[success, host_port] = worker.acquire_port_mapping(node_name, node_ip, port)
else:
[success, host_port] = portcontrol.acquire_port_mapping(node_name, port)
if not success:
return [False, host_port]
if 'port_mapping' not in clusterinfo.keys():
clusterinfo['port_mapping'] = []
clusterinfo['port_mapping'].append({'node_name':node_name, 'node_ip':node_ip, 'node_port':port, 'host_port':host_port})
clusterfile = open(self.fspath + "/global/users/" + username + "/clusters/" + clustername, 'w')
clusterfile.write(json.dumps(clusterinfo))
clusterfile.close()
return [True, clusterinfo]
def create_image(self,containername,imagename,description,imagenum=10):
[status, info] = self.get_clusterinfo(clustername,username)
if not status:
return [False, "cluster not found"]
containers = info['containers']
for container in containers:
if container['containername'] == containername:
logger.info("container: %s found" % containername)
worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
if worker is None:
return [False, "The worker can't be found or has been stopped."]
res = worker.create_image(username,imagenum)
container['lastsave'] = datetime.datetime.Now().strftime("%Y-%m-%d %H:%M:%s")
container['image'] = imagename
break
else:
res = [False, "container not found"]
logger.error("container: %s not found" % containername)
clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
infofile = open(clusterpath, 'w')
infofile.write(json.dumps(info))
infofile.close()
return res
def stop_cluster(self, clustername, username):
[status, username)
if not status:
return [False, "cluster not found"]
if info['status'] == 'stopped':
return [False, 'cluster is already stopped']
if self.distributedgw == 'True':
worker = self.nodemgr.ip_to_rpc(info['proxy_server_ip'])
worker.delete_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername)
else:
proxytool.delete_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername)
for container in info['containers']:
self.delete_all_port_mapping(username,container['containername'])
worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
if worker is None:
return [False, "The worker can't be found or has been stopped."]
worker.stop_container(container['containername'])
[status, username)
info['status']='stopped'
info['start_time']="------"
infofile = open(self.fspath+"/global/users/"+username+"/clusters/"+clustername, 'w')
infofile.write(json.dumps(info))
infofile.close()
return [True, "stop cluster"]
def quotaadd(*args, **kwargs):
form = kwargs.get('form')
quotaname = form.get("quotaname")
default_value = form.get("default_value")
hint = form.get("hint")
if (quotaname == None):
return { "success":'false', "reason": "Empty quota name"}
if (default_value == None):
default_value = "--"
groupfile = open(fspath+"/global/sys/quota",'r')
groups = json.loads(groupfile.read())
groupfile.close()
for group in groups:
group['quotas'][quotaname] = default_value
groupfile = open(fspath+"/global/sys/quota",'w')
groupfile.write(json.dumps(groups))
groupfile.close()
quotafile = open(fspath+"/global/sys/quotainfo",'r')
quotas = json.loads(quotafile.read())
quotafile.close()
quotas['quotainfo'].append({'name':quotaname, 'hint':hint})
quotafile = open(fspath+"/global/sys/quotainfo",'w')
quotafile.write(json.dumps(quotas))
quotafile.close()
return {"success":'true'}
def save_billing_history(vnode_name, billing_history):
clusters_dir = env.getenv("FS_PREFIX")+"/global/users/"+get_owner(vnode_name)+"/clusters/"
if not os.path.exists(clusters_dir):
return
clusters = os.listdir(clusters_dir)
vnode_cluster_id = get_cluster(vnode_name)
for cluster in clusters:
clusterpath = clusters_dir + cluster
if not os.path.isfile(clusterpath):
continue
infofile = open(clusterpath, 'r')
info = json.loads(infofile.read())
infofile.close()
if vnode_cluster_id != str(info['clusterid']):
continue
if 'billing_history' not in info:
info['billing_history'] = {}
info['billing_history'][vnode_name] = billing_history
infofile = open(clusterpath, 'w')
infofile.write(json.dumps(info))
infofile.close()
break
return
def login_required(func):
@wraps(func)
def wrapper(*args, **kwargs):
logger.info ("get request,path: %s" % request.path)
token = request.form.get("token", None)
if (token == None):
logger.info ("get request without token,path: %s" % request.path)
return json.dumps({'success':'false', 'message':'user or key is null'})
result = post_to_user("/authtoken/", {'token':token})
if result.get('success') == 'true':
username = result.get('username')
beans = result.get('beans')
else:
return result
#if (cur_user == None):
# return json.dumps({'success':'false','message':'token Failed or expired','Unauthorized': 'True'})
return func(username, beans, request.form, *args, **kwargs)
return wrapper
def save_cluster(user, form):
global G_vclustermgr
clustername = form.get('clustername', None)
if (clustername == None):
return json.dumps({'success':'false', 'message':'clustername is null'})
imagename = form.get("image", None)
description = form.get("description", None)
containername = form.get("containername", None)
isforce = form.get("isforce", None)
if not isforce == "true":
[status,message] = G_vclustermgr.image_check(user,imagename)
if not status:
return json.dumps({'success':'false','reason':'exists', 'message':message})
user_info = post_to_user("/user/selfQuery/", {'token':form.get("token")})
[status,message] = G_vclustermgr.create_image(user,user_info["data"]["groupinfo"]["image"])
if status:
logger.info("image has been saved")
return json.dumps({'success':'true', 'action':'save'})
else:
logger.debug(message)
return json.dumps({'success':'false', 'reason':'exceed', 'message':message})
def _submitSchema(self):
jobIdMap = {}
for p in self._schema["pipelines"]: # Add all jobs to the jobs table
jobIdMap[p["name"]] = self._pipelinedbutils.insertJob(None, None, p["name"], p["tag"], 0,
p["request"]["pipelineArgs"]["logging"]["gcsPath"],
None,
json.dumps(p["request"]))
for p in self._schema["pipelines"]: # Add dependency info to the job dependency table
if "children" in p.keys() and len(p["children"]) > 0:
for c in p["children"]:
parentId = jobIdMap[p["name"]]
childId = jobIdMap[c]
self._pipelinedbutils.insertJobDependency(parentId, childId)
for p in self._schema["pipelines"]: # schedule pipelines
parents = self._pipelinedbutils.getParentJobs(jobIdMap[p["name"]])
self._pipelinedbutils.updateJob(jobIdMap[p["name"]], setValues={"current_status": "WAITING"},
keyName="job_id")
# if the job is a root job,send the job request to the queue
msg = {
"job_id": jobIdMap[p["name"]],
"request": p["request"]
}
#pprint.pprint(msg)
if len(parents) == 0:
self._pipelineQueueUtils.publish(json.dumps(msg))
def stopPipeline(args, config):
pipelineQueueUtils = PipelineQueueUtils('CANCEL_Q')
pipelinedbutils = Pipelinedbutils(config)
if args.jobId:
jobInfo = pipelinedbutils.getJobInfo(select=["current_status", "operation_id", "job_id"],
where={"job_id": args.jobId})
elif args.pipeline:
jobInfo = pipelinedbutils.getJobInfo(select=["current_status",
where={"pipeline_name": args.pipeline})
elif args.tag:
jobInfo = pipelinedbutils.getJobInfo(select=["current_status",
where={"tag": args.tag})
for j in jobInfo:
if j.current_status == "RUNNING":
msg = {
"job_id": j.job_id,
"operation_id": j.operation_id
}
pipelineQueueUtils.publish(json.dumps(msg))
def editPipeline(args, config):
pipelinedbutils = Pipelinedbutils(config)
request = json.loads(pipelinedbutils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)
_, tmp = mkstemp()
with open(tmp, 'w') as f:
f.write("{data}".format(data=json.dumps(request, indent=4)))
if "EDITOR" in os.environ.keys():
editor = os.environ["EDITOR"]
else:
editor = "/usr/bin/nano"
if subprocess.call([editor, tmp]) == 0:
with open(tmp, 'r') as f:
request = json.load(f)
pipelinedbutils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
else:
print "ERROR: there was a problem editing the request"
exit(-1)
def unset(self, key):
"""
Remove a key from the database entirely.
"""
self.cursor.execute('delete from kv where key=?', [key])
if self.revision and self.cursor.rowcount:
self.cursor.execute(
'insert into kv_revisions values (?,
[key, json.dumps('DELETED')])
def json(self):
"""Serialize the object to json"""
return json.dumps(self.data)
def request(self):
return json.dumps({'api-version': self.api_version, 'ops': self.ops,
'request-id': self.request_id})
def send_payload(self, payload):
self.send(json.dumps(payload))
def pretty_print(response):
try:
print(json.dumps(response.json(), sort_keys=True))
except TypeError:
print("Error: " + response.content.decode('UTF-8'))
def get_devices_list(self):
# (url,api_token) = self.get_api_conf("conf/stf.conf","renguoliang")
api_url = self.url + "/api/v1/devices"
token = self.access_token + " " + self.api_token
# ??STF?API???????????json??
try:
headers = {"Authorization": token}
req = requests.get(api_url, headers=headers)
# print req.text.encode('utf-8')
req_dict = json.loads(json.dumps(req.json(), encoding='utf-8'))
except Exception, e:
print("Error: " + str(e))
sys.exit(-1)
device_list = req_dict["devices"]
total_devices_num = len(device_list)
device_status_list = []
# ????????????stf_status.mmap????STF??
for device in device_list:
if device['present']:
if device['status'] == 3:
if device['ready']:
device_status_list.append(
{'serial': device['serial'].encode('utf-8'),
# ws://10.60.114.29:7548
'display_url': device['display']['url'].encode('utf-8'),
'manufacturer': device['manufacturer'].encode('utf-8'),
'using': device['using'],
'owner': device['owner'],
'model': device['model'].encode('utf-8'),
'version': device['version'].encode('utf-8'),
'apilevel': device['sdk'].encode('utf-8')})
return device_status_list
def publish(self, payload):
if payload is None:
return
if self.config["publish_changes_only"] is False or payload != self.last_payload:
try:
self.gateway.publish(self.topic, json.dumps(payload))
except Exception as e:
import sys
sys.print_exception(e)
self.last_payload = payload
def do_message_callback(self, b_topic, payload):
topic = b_topic.decode() #convert to string
Util.log(self,"received: topic '{}' payload: '{}'".format(topic,payload))
if topic == self.exit_topic:
raise ExitGatewayException()
for device in self.devices:
if device.do_message(topic, payload):
# Util.log(self,"consumed: topic '{}' payload: '{}' by device {}".format(topic,payload,json.dumps(device.config)))
break
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。