Python sys 模块:exit() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 sys.exit()。
def error(self, message, ensure_ascii=False):
    """Stop the analyzer with an error message.

    Dumps a JSON report with the keys ``success``, ``input`` and
    ``errorMessage`` to ``self.fpoutput`` and then exits with status 1.
    The analyzer input is echoed back to help debugging; credential-like
    entries of its ``config`` section are replaced by ``'REMOVED'``.
    Changing ``ensure_ascii`` can help with ascii <-> utf-8 issues.

    :param message: Error message
    :param ensure_ascii: Force ascii output. Default: False
    """
    analyzer_input = self.__input
    config = analyzer_input.get('config')
    if isinstance(config, dict):
        # Redact on a shallow copy so the analyzer's own input keeps its
        # real credentials if the SystemExit is intercepted by a caller.
        redacted = dict(config)
        for secret in ('password', 'key', 'apikey', 'api_key'):
            if secret in redacted:
                redacted[secret] = 'REMOVED'
        analyzer_input = dict(analyzer_input, config=redacted)

    json.dump({'success': False,
               'input': analyzer_input,
               'errorMessage': message},
              self.fpoutput,
              ensure_ascii=ensure_ascii)

    # Force exit after error
    sys.exit(1)
def get_named_set(lang_codes, feature_set):
    """Look up the feature names and values of a named feature set.

    :param lang_codes: language codes to select rows for
    :param feature_set: ``'id'`` (delegated to ``get_id_set``) or a key of
        ``FEATURE_SETS``
    :return: ``(feature_names, feature_values)`` for a ``FEATURE_SETS``
        entry, or whatever ``get_id_set`` returns for ``'id'``

    Prints an error to stderr and exits with status 1 when ``feature_set``
    is not a known set.
    """
    if feature_set == 'id':
        return get_id_set(lang_codes)

    if feature_set not in FEATURE_SETS:
        print("ERROR: Invalid feature set " + feature_set, file=sys.stderr)
        # A bare sys.exit() would report success (status 0) for an error.
        sys.exit(1)

    filename, source, prefix = FEATURE_SETS[feature_set]
    feature_database = np.load(filename)

    # Normalise the requested codes, then translate codes, feature names and
    # the source into indices of the database's "data" array.
    lang_codes = [get_language_code(code, feature_database) for code in lang_codes]
    lang_indices = [get_language_index(code, feature_database) for code in lang_codes]
    feature_names = get_feature_names(prefix, feature_database)
    feature_indices = [get_feature_index(name, feature_database) for name in feature_names]
    source_index = get_source_index(source, feature_database)

    # presumably "data" is indexed (language, feature, source) - TODO confirm
    feature_values = feature_database["data"][lang_indices, :, :][:, feature_indices, source_index]
    feature_values = feature_values.squeeze(axis=2)
    return feature_names, feature_values
def __get_api_conf(self, sfile, conf_name):
    """Load ``url``, ``access_token`` and ``api_token`` from a config file.

    ``sfile`` is resolved through ``Fun.get_file_in_directory_full_path``;
    the three options are read from section ``conf_name`` and stored on
    ``self``. The process exits with status -1 when the file is missing or
    any option cannot be read.
    """
    config_path = Fun.get_file_in_directory_full_path(sfile)
    print(config_path)
    if not os.path.exists(config_path):
        print("Error: Cannot get config file")
        sys.exit(-1)

    parser = ConfigParser.ConfigParser()
    parser.read(config_path)
    print(parser.sections())

    try:
        # Same read order as before: url, access_token, api_token.
        for option in ("url", "access_token", "api_token"):
            setattr(self, option, parser.get(conf_name, option))
    except Exception as e:
        print("Error: " + str(e))
        sys.exit(-1)
# ????
def show_help():
    """Clear the screen, print the coloured help text and wait for the user.

    Colours come from the ``title_text_colour``, ``title_back_colour`` and
    ``option_menu_colour`` config entries; every visible string goes
    through ``tr``.
    """
    clear()
    title_text_colour = db.get_config("title_text_colour")
    title_back_colour = db.get_config("title_back_colour")
    option_menu_colour = db.get_config("option_menu_colour")

    print(colorize.aplicar(1, title_text_colour, title_back_colour)
          + tr("Help") + colorize.reset())

    body = colorize.aplicar(1, option_menu_colour)
    body += "\n" + tr("You can select an option with "
                      "the given number or write 4 shortcuts:")
    # Fixed the garbled word "prevIoUs" in the user-facing text.
    # NOTE(review): confirm the translation catalogue key uses "previous".
    body += "\n" + tr("back or b -> Return to the previous option.")
    body += "\n" + tr("help or h -> Show help.")
    body += "\n" + tr("exit or e or Ctrl+C -> Finish execution script.")
    body += "\n" + tr("Tasks or t -> Execute the tasks added to the list.")
    print(body + colorize.reset())

    pause("\n")
def __call__(self, *args, **kwargs):
    """Trap ``SIGTERM`` and call wrapped function.

    ``self.signal_handler`` is installed for ``SIGTERM`` while
    ``self.func`` runs. The previous handler is restored afterwards, even
    if ``self.func`` raises. If ``self._caught_signal`` was set while the
    function ran, the signal is then passed to the previous handler when it
    is callable, or the process exits with status 0 when the previous
    handler was ``signal.SIG_DFL``.
    """
    self._caught_signal = None
    # Register handler for SIGTERM, then call `self.func`
    self.old_signal_handler = signal.getsignal(signal.SIGTERM)
    signal.signal(signal.SIGTERM, self.signal_handler)

    try:
        self.func(*args, **kwargs)
    finally:
        # Restore old signal handler; doing it in `finally` keeps the trap
        # from staying installed when the wrapped function raises.
        signal.signal(signal.SIGTERM, self.old_signal_handler)

    # Handle any signal caught during execution
    if self._caught_signal is not None:
        signum, frame = self._caught_signal
        if callable(self.old_signal_handler):
            self.old_signal_handler(signum, frame)
        elif self.old_signal_handler == signal.SIG_DFL:
            sys.exit(0)
def __get_param(self, source, name, default=None, message=None):
    """Extract a specific parameter from given source.

    The ``source`` parameter is restored in the signature: the body and the
    original recursive call both used it, but it was never declared.

    :param source: Python dict to search through
    :param name: Name of the parameter to get. JSON-like syntax, e.g.
        `config.username`, or an already split list of keys
    :param default: Default value, if not found. Default: None
    :param message: Error message. If given and name not found, call
        ``self.error(message)``. Default: None
    :return: the value found, ``source`` itself for an empty name, or
        ``default`` when a key is missing or maps to ``None``
    """
    keys = name.split('.') if isinstance(name, str) else name

    current = source
    for key in keys:
        current = current.get(key)
        if current is None:
            # Missing key (or explicit None) anywhere along the path.
            if message is not None:
                self.error(message)
            return default
    return current
def __init__(self, file):
    """Open ``file`` for reading and store the handle in ``self.infile``.

    * ``''`` selects ``sys.stdin``;
    * an ``http://`` / ``https://`` URL is opened with ``urllib23.urlopen``
      (with a 5 second timeout when the interpreter supports it);
    * a ``.zip`` path opens the first archive member using the password
      ``infected``;
    * anything else is opened as a binary file.

    On failure the error is printed and the process exits with status 1
    (previously a bare ``sys.exit()``, i.e. status 0).
    """
    self.file = file
    lowered = file.lower()
    if file == '':
        self.infile = sys.stdin
    elif lowered.startswith(('http://', 'https://')):
        try:
            if sys.hexversion >= 0x020601F0:
                self.infile = urllib23.urlopen(file, timeout=5)
            else:
                self.infile = urllib23.urlopen(file)
        except urllib23.HTTPError:
            print('Error accessing URL %s' % file)
            print(sys.exc_info()[1])
            sys.exit(1)
    elif lowered.endswith('.zip'):
        try:
            self.zipfile = zipfile.ZipFile(file, 'r')
            self.infile = self.zipfile.open(self.zipfile.infolist()[0], 'r', C2BIP3('infected'))
        except Exception:
            # `except Exception` instead of a bare except, so Ctrl+C and
            # SystemExit are not reported as a file error.
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit(1)
    else:
        try:
            self.infile = open(file, 'rb')
        except Exception:
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit(1)
    # Buffer of pushed-back items; starts empty.
    self.ungetted = []
def GetUserCredentials():
    """Prompts the user for a username and password.

    :return: ``(login, password)`` as entered by the user

    ``KeyboardInterrupt`` / ``SystemExit`` raised while prompting are
    handed to ``tools.exit``, with the exception's message when it has one.
    """
    login = None
    password = None
    try:
        login = rawInput("Login: ")
        # presumably the second argument hides the typed text - TODO confirm
        password = rawInput("Password: ", True)
    except (KeyboardInterrupt, SystemExit) as e:
        message = getattr(e, 'message', None)
        if message:
            tools.exit(message)
        else:
            # The original only referenced `tools.exit` without calling it,
            # so an interrupt without a message was silently ignored.
            tools.exit()

    return (login, password)
def confirm(message):
    """Ask a yes/no question until the user gives a recognised answer.

    :param message: question printed before the prompt
    :return: ``True`` for "yes"/"ye"/"y", ``False`` for "no"/"n"
        (case-insensitive)

    ``KeyboardInterrupt`` / ``SystemExit`` raised while prompting are
    handed to ``tools.exit``, with the exception's message when it has one.
    """
    printLine(message)
    try:
        while True:
            answer = rawInput("Yes/No: ")
            if answer.lower() in ["yes", "ye", "y"]:
                return True
            if answer.lower() in ["no", "n"]:
                return False
            failureMessage('Incorrect answer "%s",'
                           'please try again:\n' % answer)
    # NOTE(review): the except clause was garbled ("except (KeyboardInterrupt, e:");
    # rebuilt as KeyboardInterrupt + SystemExit - confirm against the project.
    except (KeyboardInterrupt, SystemExit) as e:
        message = getattr(e, 'message', None)
        if message:
            tools.exit(message)
        else:
            # The original only referenced `tools.exit` without calling it.
            tools.exit()
def login():
    """Create the ``praw.Reddit`` client, retrying until it succeeds.

    A wrong username/password is logged and ends the process with
    ``exit(1)``; any other exception is logged and retried after a
    5 second pause.

    :return: the logged-in ``praw.Reddit`` instance
    """
    while True:
        try:
            reddit = praw.Reddit(
                user_agent=credsUserAgent,
                client_id=credsClientID,
                client_secret=credsClientSecret,
                username=credsUserName,
                password=credsPassword)
            print_and_log("Logged in")
            return reddit
        except praw.errors.InvalidUserPass:
            print_and_log("Wrong username or password", error=True)
            exit(1)
        except Exception as err:
            # Anything else is treated as transient: log, wait, try again.
            print_and_log(str(err), error=True)
            time.sleep(5)
def main():
    """Configure the exporter and run it once or in a loop.

    Ctrl+C is reported with a short message and exit status 0.
    """
    try:
        logger = configure_logger()
        cli_options = parse_command_line_arguments(logger)
        path_to_config_file, export_formats, export_profiles_to_list, loop_enabled = cli_options
        sc_client, settings = configure(logger, path_to_config_file, export_formats)

        if export_profiles_to_list is not None:
            show_export_profiles_and_exit(export_profiles_to_list, sc_client)

        if not loop_enabled:
            # One-shot mode: sync once and report completion.
            sync_exports(logger, settings, sc_client)
            logger.info('Completed sync process,exiting')
        else:
            loop(logger, sc_client, settings)
    except KeyboardInterrupt:
        print("Interrupted by user,exiting.")
        sys.exit(0)
def parseFile(self, inputfile):
    """Read a list file with one whitespace-free entry per line.

    Lines starting with ``#`` are skipped. Any other line must contain
    exactly one token; its stripped text is collected.

    :param inputfile: path of the file to read
    :return: list of entries in file order

    Prints "Invalid file formatting!" and exits with status 1 when the
    file cannot be read or a line is malformed.
    """
    try:
        entries = []
        with open(inputfile) as handle:
            for line in handle:
                if line[0] == '#':
                    continue
                if len(line.split()) != 1:
                    # The original used a bare `raise` with no active
                    # exception; raise an explicit, catchable error.
                    raise ValueError("expected exactly one entry per line")
                entries.append(line.strip())
        return entries
    except (IOError, OSError, ValueError):
        print("Invalid file formatting!")
        # Non-zero status: a bare sys.exit() would signal success.
        sys.exit(1)
def analyzeHostfile(self, hostfile):
    """Read a host file with one host per line.

    Lines starting with ``#`` are skipped. Any other line must contain
    exactly one token; its stripped text is collected.

    :param hostfile: path of the file to read
    :return: list of hosts in file order

    Prints "Invalid host file formatting!" and exits with status 1 when
    the file cannot be read or a line is malformed.
    """
    try:
        hosts = []
        with open(hostfile) as handle:
            for line in handle:
                if line[0] == '#':
                    continue
                if len(line.split()) != 1:
                    # The original used a bare `raise` with no active
                    # exception; raise an explicit, catchable error.
                    raise ValueError("expected exactly one host per line")
                hosts.append(line.strip())
        return hosts
    except (IOError, OSError, ValueError):
        print("Invalid host file formatting!")
        # Non-zero status: a bare sys.exit() would signal success.
        sys.exit(1)
def analyzeHostfile(self, hostfile):
    """Read a host file with three whitespace-separated fields per line.

    Lines starting with ``#`` are skipped. Any other line must split into
    exactly three tokens (host, port, protocol), which are collected as a
    list.

    :param hostfile: path of the file to read
    :return: list of ``[host, port, protocol]`` string lists in file order

    Prints "Invalid host file formatting!" and exits with status 1 when
    the file cannot be read or a line is malformed.
    """
    try:
        hosts = []
        with open(hostfile) as handle:
            for line in handle:
                if line[0] == '#':
                    continue
                fields = line.split()
                if len(fields) != 3:
                    # The original used a bare `raise` with no active
                    # exception; raise an explicit, catchable error.
                    raise ValueError("expected: host port protocol")
                # Host Port Protocol
                hosts.append(fields)
        return hosts
    except (IOError, OSError, ValueError):
        print("Invalid host file formatting!")
        # Non-zero status: a bare sys.exit() would signal success.
        sys.exit(1)
def start(self):
    """
    Start the daemon

    Refuses to start (message on stderr, exit status 1) when
    ``self.pidfile`` already holds a non-zero pid; otherwise calls
    ``self.daemonize()`` and then ``self.run()``.
    """
    # Check for a pidfile to see if the daemon already runs
    try:
        # open() instead of the Python 2-only file(); the context manager
        # also closes the handle if the content is not a number.
        with open(self.pidfile, 'r') as pf:
            pid = int(pf.read().strip())
    except IOError:
        pid = None

    if pid:
        message = "pidfile %s already exist. Daemon already running?\n"
        sys.stderr.write(message % self.pidfile)
        sys.exit(1)

    # Start the daemon
    self.daemonize()
    self.run()
def start(self):
    """
    Start the daemon

    Exits with status 1 (after a message on stderr) when ``self.pidfile``
    already contains a non-zero pid; otherwise daemonizes and runs.
    """
    # Check for a pidfile to see if the daemon already runs
    pid = None
    try:
        # `file()` only exists on Python 2; `open()` works everywhere and
        # the `with` block closes the handle even when int() fails.
        with open(self.pidfile, 'r') as pidfile_handle:
            pid = int(pidfile_handle.read().strip())
    except IOError:
        pid = None

    if pid:
        message = "pidfile %s already exist. Daemon already running?\n"
        sys.stderr.write(message % self.pidfile)
        sys.exit(1)

    # Start the daemon
    self.daemonize()
    self.run()
def handle(self, **kwargs):
    """Import US state shapes from the newest TIGER directory under ``path``.

    Looks below ``kwargs['path']`` for ``tl_<year>_us_state`` directories,
    newest year first (2016 down to 2011), and passes the matching
    shapefile path and year to ``state_import``. Prints a message and
    calls ``exit()`` when no directory is found.
    """
    path = kwargs['path']

    # With DEBUG on this will DIE.
    settings.DEBUG = False

    # figure out which path we want to use.
    years = ["2016", "2015", "2014", "2013", "2012", "2011"]
    tiger_file = ""
    for year in years:
        directory = 'tl_%s_us_state' % year
        if os.path.exists(os.path.join(path, directory)):
            print('Found %s files.' % year)
            tiger_file = os.path.join(path, directory + "/" + directory + ".shp")
            break

    if not tiger_file:
        print('Could not find files.')
        exit()

    # datetime.datetime has no `Now` attribute; the method is `now()`.
    print("Start States: %s" % datetime.datetime.now())
    # `year` is still the year whose directory matched (loop left via break).
    state_import(tiger_file, year)
    print("End States: %s" % datetime.datetime.now())
# NOTE(review): this snippet is truncated/garbled by extraction - the
# signature and the `directories` expression are incomplete and do not parse.
# What remains prints "Start Counties"/"End Counties" around a call to
# county_import(tiger_file, year). Restore from the original project before use.
def handle(self, "2011"]
directories = [('tl_%s_us_county' % year, directory + "/" + directory + ".shp")
break
if not tiger_file:
print('Could not find files.')
exit()
# NOTE(review): `datetime.datetime.Now` - the standard library method is
# lowercase `now()`; confirm and fix when restoring this block.
print("Start Counties: %s" % datetime.datetime.Now())
county_import(tiger_file, year)
print("End Counties: %s" % datetime.datetime.Now())
def forwarder(tasks, interval, batch_size, dest, source=None):
    '''Forward items from one storage to another.

    :param tasks: passed to ``load_manager`` when no ``source`` is given;
        that manager's queue is then the source store
    :param interval: seconds to sleep when the source returned nothing
    :param batch_size: maximum number of items taken per iteration
    :param dest: passed to ``redis_client`` to build the destination store
    :param source: optional; passed to ``redis_client`` to build the source
        store. The body always read ``source`` but the signature did not
        declare it (NameError); added with a backward-compatible default.

    A batch that cannot be written to the destination is put back into the
    source before the exception is re-raised.
    '''
    from .utils import RunFlag, load_manager, redis_client
    from .store import QueueStore
    log = logging.getLogger('dsq.forwarder')

    if not tasks and not source:
        print('--tasks or --source must be provided')
        sys.exit(1)

    s = QueueStore(redis_client(source)) if source else load_manager(tasks).queue
    d = QueueStore(redis_client(dest))
    run = RunFlag()
    while run:
        batch = s.take_many(batch_size)
        if batch['schedule'] or batch['queues']:
            try:
                d.put_many(batch)
            except Exception:
                # Give the items back to the source so they are not lost.
                s.put_many(batch)
                log.exception('Forward error')
                raise
        else:
            # Nothing to forward right now.
            time.sleep(interval)
def main():
    """Small main program

    Parses ``-d``/``-e``/``-u``/``-t`` from ``sys.argv`` and runs the chosen
    function on the named file, or on stdin when no file or ``-`` is given.
    """
    import sys
    import getopt

    usage = """usage: %s [-d|-e|-u|-t] [file|-]
-d,-u: decode
-e: encode (default)
-t: encode and decode string 'Aladdin:open sesame'"""

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'deut')
    except getopt.error as msg:
        # Route the diagnostics to stderr before printing them.
        sys.stdout = sys.stderr
        print(msg)
        print(usage % sys.argv[0])
        sys.exit(2)

    action = encode
    for flag, _value in opts:
        if flag == '-t':
            test()
            return
        if flag == '-e':
            action = encode
        elif flag in ('-d', '-u'):
            action = decode

    if args and args[0] != '-':
        with open(args[0], 'rb') as f:
            action(f, sys.stdout.buffer)
    else:
        action(sys.stdin.buffer, sys.stdout.buffer)
def test():
    """Run an ``AsyncFactory`` on ``inet:127.0.0.1:5000`` until Ctrl+C.

    SIGINT closes the factory and exits with status 0.
    """
    import signal

    factory = AsyncFactory('inet:127.0.0.1:5000', MilterProtocol)

    def _shutdown(signum, frame):
        factory.close()
        sys.exit(0)

    signal.signal(signal.SIGINT, _shutdown)
    factory.run()
# }}}
def do_full_login(account):
    """Log an account in and store its session data in the accounts table.

    Holds ``lock_network`` for ``locktime`` seconds, logs in with the
    routine matching ``account['type']`` (``'ptc'`` or ``'google'``), then
    writes the account row, retrying on sqlite operational/interface
    errors until it succeeds.
    """
    lock_network.acquire()
    try:
        time.sleep(locktime)
    finally:
        # Release even if the sleep is interrupted, so other callers are
        # not blocked forever.
        lock_network.release()

    if account['type'] == 'ptc':
        login_ptc(account)
    elif account['type'] == 'google':
        login_google(account)
        new_session(account)
    else:
        lprint('[{}] Error: Login type should be either ptc or google.'.format(account['num']))
        sys.exit()

    cursor_accs = db_accs.cursor()
    row = [account['user'], account['access_token'], account['access_expire_timestamp'],
           account['api_url'], 0, '0', '0']
    while True:
        try:
            # Seven values need seven placeholders; the snippet showed only
            # three, which sqlite3 rejects.
            # NOTE(review): confirm the accounts table has seven columns.
            cursor_accs.execute("INSERT OR REPLACE INTO accounts VALUES(?,?,?,?,?,?,?)", row)
            db_accs.commit()
            return
        except sqlite3.OperationalError as e:
            lprint('[-] sqlite operational error: {},account: {} retrying...'.format(e, account['user']))
        except sqlite3.InterfaceError as e:
            # NOTE(review): this message was garbled in the snippet; rebuilt
            # to mirror the operational-error message - confirm the wording.
            lprint('[-] sqlite interface error: {},account: {} retrying...'.format(e, account['user']))
def __init__(self, root=None, transform=None, target_transform=None):
    """Open a read-only lmdb environment and read its sample count.

    :param root: path handed to ``lmdb.open``
    :param transform: stored on ``self.transform``
    :param target_transform: stored on ``self.target_transform``

    Prints a message and exits with status 1 when the environment object
    is falsy. ``self.nSamples`` is the integer stored under the
    ``num-samples`` key.
    """
    self.env = lmdb.open(
        root,
        max_readers=1,
        readonly=True,
        lock=False,
        readahead=False,
        meminit=False)

    if not self.env:
        print('cannot create lmdb from %s' % (root))
        # Failure must not report success (the original exited with 0).
        sys.exit(1)

    with self.env.begin(write=False) as txn:
        # Keys are byte strings; encode so the lookup also works on
        # Python 3, where a str key is rejected.
        nSamples = int(txn.get('num-samples'.encode()))
        self.nSamples = nSamples

    self.transform = transform
    self.target_transform = target_transform
def main(argv):
    """Delete all datasharing settings of one sensor group and print the result.

    Exits with -1 when a required option is missing or when ``group_id``
    matches none of the groups returned by ``cb.group_enum()``.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.server_url and opts.token and opts.group_id):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)

    # check if the given group_id truly corresponds to one of the existing
    # sensor groups (every group is inspected, as before)
    matching = [group for group in cb.group_enum()
                if int(opts.group_id) == int(group['id'])]
    if not matching:
        sys.exit(-1)

    config = cb.group_datasharing_del_all(opts.group_id)
    for key in config.keys():
        value = config[key]
        print("%-20s : %s" % (key, value))
def main(argv):
    """Print every key/value pair of each build returned by ``cb.get_builds()``.

    Exits with -1 when the server URL or token option is missing.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.server_url and opts.token):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, ssl_verify=opts.ssl_verify)

    for build in cb.get_builds():
        # blank line between builds
        print("")
        for key in build.keys():
            value = build[key]
            print("%-20s : %s" % (key, value))
# NOTE(review): truncated snippet - everything between the `opts,` unpacking
# and the final `build[key])` was lost in extraction, so this does not parse.
# Restore the body from the original project before use.
def main(argv):
parser = build_cli_parser()
opts, build[key])
def main(argv):
    """Check a file of indicators (one per line) against the server.

    Requires the url, token, fname and type options; type must be one of
    ``md5``, ``domain`` or ``ipaddr``. Exits with -1 otherwise.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.url or not opts.token or not opts.fname or not opts.type:
        print("Missing required param.")
        sys.exit(-1)

    if opts.type not in ["md5", "domain", "ipaddr"]:
        # Fixed the garbled "UnkNown" in the message.
        print("Unknown type: %s" % opts.type)
        sys.exit(-1)

    # setup the CbApi object
    cb = CBQuery(opts.url, opts.token, ssl_verify=opts.ssl_verify)

    # get the IOCs to check; this is a list of strings, one indicator
    # per line. strip off the newlines as they come in.
    # The file is now closed deterministically instead of being leaked.
    with open(opts.fname, "r") as indicator_file:
        vals = [val.strip() for val in indicator_file]

    # check each!
    cb.check(vals, opts.type, opts.detail)
def main(argv):
    """Run one threat-report search and print a summary followed by each hit.

    Exits with -1 when the url or token option is missing or no query was
    given.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.url and opts.token and opts.query is not None):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.url, ssl_verify=opts.ssl_verify)

    # perform a single threat report search
    reports = cb.threat_report_search(opts.query)
    hits = reports['results']

    print("%-20s : %s" % ('displayed Results', len(hits)))
    print("%-20s : %s" % ('Total Results', reports['total_results']))
    # 'elapsed' is scaled by 1000 and labelled "ms" in the output
    print("%-20s : %sms" % ('QTime', int(1000*reports['elapsed'])))
    print('\n')

    # for each result
    for report in reports['results']:
        pprint.pprint(report)
        print('\n')
# NOTE(review): truncated snippet - the option parsing, validation and the
# start of the client construction were lost in extraction (the third line
# does not parse). The intact remainder prints a table of the items returned
# by cb.Feed_enum(). Restore from the original project before use.
def main(argv):
parser = build_cli_parser()
opts, ssl_verify=opts.ssl_verify)
# enumerate configured Feeds
#
Feeds = cb.Feed_enum()
# output a banner
#
print "%-3s %-25s %-8s %s" % ("Id", "Name", "Enabled", "Url")
print "%s+%s+%s+%s" % ("-"*3, "-"*27, "-"*10, "-"*31)
# output a row about each Feed
#
for Feed in Feeds:
print "%-3s| %-25s | %-8s | %s" % (Feed['id'], Feed['name'], Feed['enabled'], Feed['Feed_url'])
def main(argv):
    """Replace a watchlist's search query, then fetch and print the watchlist.

    Exits with -1 when the url, token, id or query option is missing.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.url and opts.token and opts.id and opts.query):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.url, ssl_verify=opts.ssl_verify)

    # edit the search query of the watchlist
    changes = {'search_query': opts.query}
    print("-> Modifying the watchlist query...")
    cb.watchlist_modify(opts.id, changes)
    print("-> Watchlist modified")

    # get record describing this watchlist
    print("-> Querying for watchlist information...")
    watchlist = cb.watchlist(opts.id)
    print("-> Watchlist queried; details:")
    watchlist_output(watchlist)
def main(argv):
    """Print every field of the sensor selected by the sensorid option.

    Exits with -1 when the url, token or sensorid option is missing.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.url and opts.token and opts.sensorid):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.url, ssl_verify=opts.ssl_verify)

    # fetch the requested sensor
    sensor = cb.sensor(opts.sensorid)

    # output
    for key in sensor.keys():
        value = sensor[key]
        print("%-35s : %s" % (key, value))
def main(argv):
    """Print the sensor backlog statistics once, or hand over to ``query_forever``.

    Exits with -1 when the url or token option is missing. When the
    interval option is non-zero, returns the result of
    ``query_forever(cb, opts.interval, opts.udp)`` instead of printing.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.url and opts.token):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.url, ssl_verify=opts.ssl_verify)

    # if a period is specified, handle that specially
    if opts.interval != 0:
        return query_forever(cb, opts.interval, opts.udp)

    # grab the global statistics
    backlog = cb.sensor_backlog()

    # output
    for key in backlog.keys():
        value = backlog[key]
        print("%-35s : %s" % (key, value))
def main(argv):
    """Delete one datasharing configuration of a sensor group and print the result.

    Exits with -1 when a required option is missing or when ``group_id``
    matches none of the groups returned by ``cb.group_enum()``.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.server_url and opts.token and opts.group_id and opts.config_id):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, ssl_verify=opts.ssl_verify)

    # check if the given group_id truly corresponds to one of the existing
    # sensor groups (every group is inspected, as before)
    matching = [group for group in cb.group_enum()
                if int(opts.group_id) == int(group['id'])]
    if not matching:
        sys.exit(-1)

    datasharing_config = cb.group_datasharing_del(opts.group_id, opts.config_id)
    for key in datasharing_config.keys():
        value = datasharing_config[key]
        print("%-20s : %s" % (key, value))
def main(argv):
    """Print a numbered listing of the actions returned for one feed id.

    Exits with -1 when the server URL, token or id option is missing.
    NOTE(review): the capitalised ``Feed_action_enum`` looks like an
    extraction artefact - confirm the method name on the client.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.server_url and opts.token and opts.id):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, ssl_verify=opts.ssl_verify)

    actions = cb.Feed_action_enum(opts.id)
    for number, action in enumerate(actions, 1):
        print("")
        print("Action number: %s" % number)
        print("-"*50)
        for key in action.keys():
            value = action[key]
            print("%-20s : %s" % (key, value))
def main(argv):
    """Synchronize the feed named by the Feedname option and print the result.

    Exits with -1 when a required option is missing or no feed with that
    name is configured.
    NOTE(review): the capitalised ``Feed_*`` / ``Feedname`` names look like
    extraction artefacts - confirm them against the client and the parser.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.server_url and opts.token and opts.Feedname):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, ssl_verify=opts.ssl_verify)

    feed_id = cb.Feed_get_id_by_name(opts.Feedname)
    if feed_id is None:
        print("-> No configured Feed with name '%s' found!" % (opts.Feedname))
        sys.exit(-1)

    sync_result = cb.Feed_synchronize(opts.Feedname, True)
    print(sync_result)
def main(argv):
    """Delete the watchlist selected by the id option.

    Exits with -1 when the url, token or id option is missing.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.url and opts.token and opts.id):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.url, ssl_verify=opts.ssl_verify)

    # delete the watchlist; the value returned by the client is not used
    print("-> Deleting watchlist [id=%s]..." % (opts.id,))
    cb.watchlist_del(opts.id)
    print("-> Watchlist deleted")
def main(argv):
    """Delete a feed selected by id, or by name when no id is given.

    Exits with -1 when the server URL or token is missing, or when neither
    a feed name nor a feed id was supplied. Returns without deleting when
    the name lookup finds no feed.
    NOTE(review): the capitalised ``Feed_*`` / ``Feedname`` / ``Feedid``
    names look like extraction artefacts - confirm them.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.server_url or not opts.token or not (opts.Feedname or opts.Feedid):
        print("Missing required param; run with --help for usage")
        print("One of -f or -i must be specified")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, ssl_verify=opts.ssl_verify)

    feed_id = opts.Feedid
    if not feed_id:
        # No id given: resolve it from the feed name.
        feed_id = cb.Feed_get_id_by_name(opts.Feedname)
        if feed_id is None:
            print("-> No configured Feed with name '%s' found!" % (opts.Feedname))
            return

    # delete the Feed
    cb.Feed_del(feed_id)
    print("-> Feed deleted [id=%s]" % (feed_id,))
def main(argv):
    """Resolve a feed by id or name and fetch its current definition.

    Exits with -1 when the server URL or token is missing, or when neither
    a feed name nor a feed id was supplied. Returns early when the name
    lookup finds no feed.
    NOTE(review): the snippet ends right after the feed is fetched; the
    update step announced by the trailing comments is not part of it.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.server_url or not opts.token or not (opts.Feedname or opts.Feedid):
        print("Missing required param; run with --help for usage")
        print("One of -f or -i must be specified")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, ssl_verify=opts.ssl_verify)

    feed_id = opts.Feedid
    if not feed_id:
        # No id given: resolve it from the feed name.
        feed_id = cb.Feed_get_id_by_name(opts.Feedname)
        if feed_id is None:
            print("-> No configured Feed with name '%s' found!" % (opts.Feedname))
            return

    old_Feed = cb.Feed_info(feed_id)
    #create a new updated Feed based on user input
    # create
# NOTE(review): truncated snippet - everything between the `opts,` unpacking
# and the final `build[key])` was lost in extraction, so this does not parse.
# Restore the body from the original project before use.
def main(argv):
parser = build_cli_parser()
opts, build[key])
# NOTE(review): truncated snippet - the option validation, the start of the
# client construction and part of the group_datasharing_info(...) handling
# were lost in extraction, so this does not parse as written.
# Restore from the original project before use.
def main(argv):
parser = build_cli_parser()
opts, ssl_verify=opts.ssl_verify)
#check if the given group_id truly corresponds to one of the existing sensor groups
does_exist = False
# every group is compared by integer id; the loop does not stop at the first match
for group in cb.group_enum():
if int(opts.group_id) == int(group['id']):
does_exist = True
if does_exist:
datasharing_config = cb.group_datasharing_info(opts.group_id, datasharing_config[key])
else:
# unknown group id: exit with -1 without printing anything
sys.exit(-1)
def main(argv):
    """Update an event's description and print the returned event fields.

    Exits with -1 when the server URL, token, id or description option is
    missing.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.server_url and opts.token and opts.id and opts.description):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, ssl_verify=opts.ssl_verify)

    event = cb.event_update(opts.id, opts.description)
    print("")
    for key in event.keys():
        value = event[key]
        print("%-20s : %s" % (key, value))
def main(argv):
    """Page through an alert search, 100 rows at a time, printing every result.

    Exits with -1 when the server URL or token option is missing. Paging
    stops at the first page that comes back with no results.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not (opts.server_url and opts.token):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, ssl_verify=opts.ssl_verify)

    pagesize = 100
    start = 0
    while True:
        results = cb.alert_search(opts.query, rows=int(pagesize), start=start)
        if len(results['results']) == 0:
            break
        for result in results['results']:
            pprint.pprint(result)
        # advance to the next page
        start += int(pagesize)
def main(argv):
    """Add an event to an investigation and print the returned event fields.

    Exits with -1 when the server URL, token, investigation id, description
    or start date option is missing.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    required = (opts.server_url and opts.token and opts.investigation_id
                and opts.description and opts.start_date)
    if not required:
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.server_url, ssl_verify=opts.ssl_verify)

    event = cb.event_add(opts.investigation_id, opts.description, opts.start_date)
    print("")
    print("-->Event Added:")
    for key in event.keys():
        value = event[key]
        print("%-20s : %s" % (key, value))
# NOTE(review): truncated snippet - the option parsing, validation and the
# start of the client construction were lost in extraction (the third line
# does not parse). The intact remainder calls cb.event_del(opts.id) and
# prints the returned fields. Restore from the original project before use.
def main(argv):
parser = build_cli_parser()
opts, ssl_verify=opts.ssl_verify)
event = cb.event_del(opts.id)
print ""
# one "key : value" line per field of the returned mapping
for key in event.keys():
print "%-20s : %s" % (key, event[key])
def main(argv):
    """Download a sensor installer package and save it to disk.

    Requires the url, token, type, group and filename options; exits with
    -1 when one is missing. The package returned by
    ``cb.sensor_installer(opts.type, opts.group)`` is written to
    ``opts.filename`` in binary mode.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.url or not opts.token or not opts.type or not opts.group or not opts.filename:
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    cb = cbapi.CbApi(opts.url, ssl_verify=opts.ssl_verify)

    # download the installer package
    print("-> Downloading...")
    # renamed from `bytes`, which shadowed the builtin type
    installer = cb.sensor_installer(opts.type, opts.group)
    print("-> Sensor Installer Package is %s bytes" % (len(installer)))
    print("-> Download complete")

    # save the installer package to disk; the context manager flushes and
    # closes the file before "Complete" is reported
    print("-> Saving to %s..." % (opts.filename))
    with open(opts.filename, 'wb') as out:
        out.write(installer)
    print("-> Complete")
def main():
    """Print the time column of every service file of a measurement.

    Expects the measurement directory as the first command-line argument
    (usage message on stderr and exit status 1 otherwise). Reads
    ``Metadata.json`` from that directory, then opens each listed service
    file as tab-separated CSV and prints the ``time`` field of each row.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s measurement\n" % sys.argv[0])
        sys.exit(1)

    path = sys.argv[1]
    with open(os.path.join(path, "Metadata.json")) as f:
        Metadata = json.load(f)

    # The last character of the timestamp is dropped before parsing.
    start = date(Metadata["start"][:-1])
    # NOTE(review): `end` is also read from Metadata["start"]; this looks
    # like a copy-paste slip for an "end" key - confirm against the schema.
    end = date(Metadata["start"][:-1])

    # The values were passed logging-style (comma separated), so the raw
    # format string was printed; apply the format explicitly.
    print('open measurement "%s" from "%s" to "%s"' % (Metadata["name"], start, end))

    for service in Metadata["services"]:
        print('open service "%s"' % service["name"])
        with open(os.path.join(path, service["filename"])) as csvfile:
            r = csv.DictReader(csvfile, dialect=csv.excel_tab)
            for row in r:
                print(row["time"])
def __init__(self):
    """Create the window at screen offset +100+100, non-modal and non-blocking."""
    Toplevel.__init__(self)

    # Hooks filled in later: the binding id used to leave fullscreen and
    # the callback used to stop.
    self.escapeBindId = None
    self.stopFunction = None

    x_offset, y_offset = 100, 100
    self.geometry("+%d+%d" % (x_offset, y_offset))

    self.modal = False
    self.blocking = False
# removed for python2.7
# def __getattr__(self,name):
# def handlerFunction(*args,**kwargs):
# print("UnkNown function:",args,kwargs)
# return handlerFunction
#####################################
# SimpleGrid Stuff
#####################################
# first row is used as a header
def start(self):
    """Start the daemon unless a pidfile says it is already running.

    Exits with status 1 (after a message on stderr) when ``self.pidfile``
    contains a non-zero pid; otherwise prints "Daemonizing", then calls
    ``self.daemonize()`` and ``self.run()``.
    """
    try:
        # open() replaces the Python 2-only file() builtin, and the
        # context manager closes the handle even when int() fails.
        with open(self.pidfile, 'r') as pf:
            pid = int(pf.read().strip())
    except IOError:
        pid = None

    if pid:
        message = "pidfile %s already exist. Daemon already running?\n"
        sys.stderr.write(message % self.pidfile)
        sys.exit(1)

    print("Daemonizing")
    self.daemonize()
    self.run()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。