Python colorama.Fore 模块,RED 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用colorama.Fore.RED。
def print_validate_fail_detail(compare_object, key=''):
    """
    Print the reason a validation result is invalid.

    Walks ``compare_object`` when it is a dict. For every nested dict whose
    recursive scan yields a reason, an ``INVALID!`` line is printed in red.

    @params:
        compare_object - required : validation result object (result of compliance_report())
        key            - Optional : dict key of compliance_result

    @returns:
        ``(key, value)`` for the first list element or qualifying integer
        found at this level, otherwise ``(None, None)``.
    """
    if isinstance(compare_object, dict):
        # The loop variable rebinds the ``key`` argument on every iteration.
        for key, dst in compare_object.items():
            if isinstance(dst, dict):
                # recursive
                reason, result = print_validate_fail_detail(dst, key)
                if reason is not None:
                    print(' ' * 9, end='')
                    print(Fore.RED + 'INVALID! [type:{0}] {1} : {2}'.format(key, reason, result))
            elif isinstance(dst, list):
                # Only the first element is reported; an empty list falls through.
                for d in dst:
                    return key, d
            elif isinstance(dst, int):
                # bool is a subclass of int: booleans only count under 'actual_value'.
                if not isinstance(dst, bool) or key == 'actual_value':
                    return key, dst
    return None, None
def rollback_operation(device, config):
    """
    Discard pending changes, load ``config`` as a replacement and commit it.

    The combined result is reported through ``print_bool_result``. Whatever
    happens, the device is closed and ``sys.exit()`` is called afterwards.
    """
    try:
        device.discard_config()
        replaced = device.replace(config)
        committed = device.commit()
        print_bool_result(replaced & committed, 'Fore')
        print('Rollbacked config!')
    except Exception as err:
        # Report the failure on a red background: headline first, then detail.
        print(Back.RED + 'Rollback Error!!')
        detail = str(err)
        print(Back.RED + detail)
    finally:
        device.close()
        sys.exit()
def read_chat_history():
    """
    Print the chat history of the friend chosen via ``select_a_friend()``.

    Each line shows the time in red, then the friend's name in blue when the
    chat was sent by the user or green otherwise, then the message text.
    Prints "Wrong choice" when no friend was selected.
    """
    choice = select_a_friend()
    if choice is None:
        print("Wrong choice")
        return
    print(Fore.BLUE + "Messages sent are shown in blue color \n" + Fore.GREEN + " Received Messages and Read Messages are shown in green color:"+Fore.RESET)
    for chat in friends[choice].chats:
        name_colour = Fore.BLUE if chat.sent_by_me else Fore.GREEN
        print(Fore.RED + str(chat['time']) + " " + name_colour + friends[choice]['name'] + " " + Fore.RESET + chat['message'])
def ex_print (type, msg, ret):
    """
    Print ``msg`` prefixed with the colour/style registered for ``type``.

    Known types are "error", "action", "positive", "info" and "reset"; any
    other value falls back to ``Style.RESET_ALL``. When ``ret`` equals 0 the
    trailing newline is suppressed.
    """
    colorama.init()
    bright = Style.BRIGHT
    reset_all = Style.RESET_ALL
    # Map of message type -> escape prefix.
    prefixes = {
        "error": Fore.RED + bright,
        "action": Fore.YELLOW,
        "positive": Fore.GREEN + bright,
        "info": Fore.WHITE + bright,
        "reset": reset_all,
    }
    prefix = prefixes.get(type, reset_all)
    line_end = "" if ret == 0 else "\n"
    print(prefix + msg, end=line_end)
def display_warning_to_user(self):
    """ display warning to user

    Lists ``self.commands`` and the sorted keys of ``self.devices``, then
    asks for confirmation. Returns True only when the answer is 'y'
    (case-insensitive), False otherwise.
    """
    print(Fore.CYAN + "\nYou are about to run the following commands:")
    for command in self.commands:
        print(command)
    print(Fore.CYAN + "\nOn the following devices:")
    for device in sorted(self.devices.keys()):
        print(device)
    prompt = Fore.RED + "\nAre you sure you wish to proceed? (y/n) " + Fore.WHITE
    answer = input(prompt)
    return answer.lower() == 'y'
def tenant_report(tenant_config):
    """
    Print one tenant's configuration, its connection string and the result
    of ``db_check`` for it.
    """
    from drift.tenant import get_connection_string
    conn_string = get_connection_string(tenant_config)
    # Every print below has exactly one argument, so the parenthesised form
    # produces the same output under Python 2 and Python 3.
    print("Tenant configuration for '{}' on tier '{}':".format(
        tenant_config["name"], get_tier_name()))
    for k in sorted(tenant_config.keys()):
        print(" {} = {}".format(k, tenant_config[k]))
    print("Connection string:\n {}".format(conn_string))
    print("Database check... ")
    db_error = db_check(tenant_config)
    if not db_error:
        print(Fore.GREEN + " OK! Database is online and reachable")
    elif "does not exist" in db_error:
        print(Fore.RED + " FAIL! DB does not exist")
        print(" You can create this database by running this "
              "command again with the action 'create'")
    else:
        print(Fore.RED + " {}".format(db_error))
def tenants_report():
    """
    Print every tenant registered in the loaded config together with the
    result of ``db_check`` for it.

    The tenant named "*" is skipped.
    """
    print("The following tenants are registered in config on tier '{}':".format(get_tier_name()))
    config = load_config()
    for tenant_config in config.get("tenants", []):
        name = tenant_config["name"]
        # Todo: Get rid of this
        if name == "*":
            continue
        # No newline here: the check result is appended to the same line.
        sys.stdout.write(" {}... ".format(name))
        db_error = db_check(tenant_config)
        if not db_error:
            print(Fore.GREEN + "OK")
        elif "does not exist" in db_error:
            print(Fore.RED + "FAIL! DB does not exist")
        else:
            print(Fore.RED + "Error: %s" % db_error)
    # The word "information" had been corrupted to "@R_346_4045@ion".
    print("To view more information about each tenant run this command again with the tenant name")
def process_search(options):
    """
    Build a query from the hex, ASCII and wide-string options, run it through
    ``BINOBJ.search`` and display the outcome.

    Prints the error message and stops when the response contains 'error';
    shows statistics when 'stats' is present; shows the results unless the
    result list is empty.
    """
    hex_terms = [hex_pattern(val.replace(' ', '')) for val in options.hex]
    ascii_terms = [ascii_pattern(val) for lst in options.a for val in lst]
    wide_terms = [wide_pattern(val) for lst in options.w for val in lst]
    search_query = hex_terms + ascii_terms + wide_terms

    result = BINOBJ.search(
        search_query,
        limit=options.limit,
        exact=options.exact,
        test=options.test,
    )
    if 'error' in result:
        print(Style.BRIGHT + Fore.RED + result['error']['message'])
        return
    if 'stats' in result:
        show_stats_new(result['stats'], options.limit)

    matches = result['results']
    if len(matches) == 0:
        return
    # (A "Showing top N results:" / "Results:" banner was previously
    # commented out at this point.)
    show_results(matches, pretty_print=options.pretty_print)
def main(options):
    """
    Dispatch ``options.commands`` to the matching ``process_*`` handler.

    Returns whatever the handler returns. Returns None after printing an
    error when pretty printing is requested without tabulate, or when the
    command is not one of the known names.
    """
    if options.pretty_print and not HAS_TABULATE:
        print(Style.BRIGHT + Fore.RED + "Pretty printing requires tabulate python module. (pip install tabulate)")
        return
    init_api(options)
    cmd = options.commands
    switcher = {
        'search': process_search,
        'hunt': process_hunt,
        'sign': process_sign,
        'classify': process_classify,
        'Metadata': process_Metadata,
        'demo': process_demo
    }
    # Get the function from switcher dictionary
    process_fn = switcher.get(cmd)
    if process_fn is None:
        # Previously an unknown command crashed with
        # "'NoneType' object is not callable".
        print(Style.BRIGHT + Fore.RED + "Unknown command: {0}".format(cmd))
        return
    # Execute the function
    return process_fn(options)
def instant_giveaway(prize_name):
    """
    Click the instant-giveaway element and report the outcome for
    ``prize_name``.

    Increments the module-level ``lost_giveaways`` or ``won_giveaways``
    counter according to the text of the element with id 'title'; prints an
    "unrecognized response" line when neither phrase is found.
    """
    global won_giveaways, lost_giveaways
    giveaway_Box = chromedriver.find_element_by_id(instant_Box)
    giveaway_Box.click()
    time.sleep(10)
    get_result = chromedriver.find_element_by_id('title')
    time.sleep(5)
    if "you didn't win" in get_result.text:
        lost_giveaways += 1
        print(Fore.YELLOW + Style.BRIGHT + '\n **** You did not win: %s' % prize_name)
        time.sleep(5)
    elif "you're a winner!" in get_result.text:
        won_giveaways += 1
        print(Fore.GREEN + Style.BRIGHT + '\n **** Winner Winner! Chicken Dinner!: %s' % prize_name)
        time.sleep(1)
        # Raw string: same path value as before, without the invalid '\s'
        # escape sequence that newer Python versions warn about.
        playsound(r'.\sounds\btyswt.mp3')
        time.sleep(5)
    else:
        print(Fore.RED + Style.BRIGHT + '\n ---- UNRECOGNIZED RESPONSE FOR: %s' % prize_name)
# function to process the 'None' requirement giveaways.
def load_url(url, timeout):
    """
    Send one HTTP request to ``url`` and report the outcome via ``print_out``.

    The request path embeds the module-level ``targetEmail`` and a random
    entry of ``useragents`` is used as the User-Agent header.

    NOTE(review): ``timeout`` is accepted but never used in this body.
    NOTE(review): ``e.reason`` may be an exception object rather than a
    string, in which case the concatenation in the error branch would raise
    TypeError - confirm.
    """
    # Build URL query to email signup page
    urlquery = "http://" + url + "/m-users-a-email_list-job-add-email-" + targetEmail + "-source-2.htm"
    print_out(Style.BRIGHT + Fore.WHITE + "Sending request to: " + url)
    # Build the request
    req = urllib.request.Request(
        urlquery,
        data=None,
        headers={
            'User-Agent': random.choice(useragents),
            'Host': url
        }
    )
    # Send
    try:
        f = urllib.request.urlopen(req)
        print_out(Style.BRIGHT + Fore.GREEN + "Successfully sent!")
        f.close()
    except urllib.error.URLError as e:
        print_out(Style.BRIGHT + Fore.RED + e.reason)
def _update_ui():
    """
    Redraw the one-line status display on stdout (Python 2 print syntax).

    Reads the module-level ``result_codes``, ``timeout_count`` and
    ``connection_error_count``. Each entry of ``result_codes`` is printed as
    "key: value", coloured light green for the key '200 OK' and red for any
    other key, followed by the timeout and connection-error counters when
    they are greater than zero.
    """
    global timeout_count, result_codes, connection_error_count
    # Carriage return moves to the start of the line; the trailing commas
    # throughout suppress the newline so everything stays on one line.
    print '\r',
    for k, v in result_codes.iteritems():
        print "%s:" % k,
        if k == '200 OK':
            print(Fore.LIGHTGREEN_EX),
        else:
            print(Fore.RED),
        print "%s " % v,
        print(Style.RESET_ALL),
    if timeout_count > 0:
        # Under the Python 2 print statement the whole expression
        # ``(...) + ' '`` is what gets printed.
        print('Timeouts: '+Fore.YELLOW + str(timeout_count) + Style.RESET_ALL) + ' ',
    if connection_error_count >0:
        print('Connection Errors: '+Fore.RED + str(connection_error_count) + Style.RESET_ALL),
    sys.stdout.flush()
def __init__(self, ethers_path, static_ethers, wipe):
    """
    :param ethers_path: path to the ethers file
    :param static_ethers: path to static ethers file (only when wiping)
    :param wipe: wipe the ethers
    """
    self.ethers_path = ethers_path
    self.wipe = wipe
    self.ethers = {}
    EthersManager.assert_writable(self.ethers_path)

    if not wipe:
        # Normal start-up: read the existing ethers file.
        self.load_ethers(self.ethers_path)
        return

    print(Fore.RED + Style.BRIGHT + "The ethers file will be wiped!")
    if static_ethers:
        print(Fore.BLUE + "The static ethers will be loaded")
        self.load_ethers(static_ethers)
    else:
        print(Fore.BLUE + "The ethers file will be created from scratch")
def add_contestant(self, mac, row, col, response):
    """
    Register a contestant PC and report the assigned IP through ``response``.

    ``row`` and ``col`` must each convert to an integer in 1..255; otherwise
    ``response.status_code`` is set to 400 with an explanatory
    ``response.data``. On success ``response.data`` is the IP built from
    ``self.contestant_ip_format``; if ``add_ether`` returns a truthy value it
    is reported as an error with status 400.
    """
    # Explicit validation replaces the former bare ``raise`` / bare ``except``.
    try:
        valid = 1 <= int(row) <= 255 and 1 <= int(col) <= 255
    except (TypeError, ValueError):
        valid = False
    if not valid:
        response.status_code = 400
        response.data = "Invalid row/col: row=%s col=%s" % (row, col)
        return
    ip = self.contestant_ip_format.replace('R', row).replace('C', col)
    print(Fore.CYAN + "Contestant PC connected: MAC=%s IP=%s" % (mac, ip))
    result = self.ethers_manager.add_ether(mac, ip)
    if result:
        print(Fore.RED + result)
        response.data = result
        response.status_code = 400
    else:
        response.data = ip
def add_worker(self, num, response):
    """
    Register a worker PC and report the assigned IP through ``response``.

    ``num`` must convert to an integer in 1..255; otherwise
    ``response.status_code`` is set to 400 with an explanatory
    ``response.data``.
    """
    # Explicit validation replaces the former bare ``raise`` / bare ``except``.
    try:
        valid = 1 <= int(num) <= 255
    except (TypeError, ValueError):
        valid = False
    if not valid:
        response.status_code = 400
        response.data = "Invalid num: num=%s" % (num)
        return
    ip = self.worker_ip_format.replace('N', num)
    # NOTE(review): ``mac`` and ``result`` are not defined anywhere in this
    # method as it appears here. A ``mac`` parameter and a
    # ``result = self.ethers_manager.add_ether(mac, ip)`` line were presumably
    # lost - confirm against the real source before relying on this path.
    # The closing parenthesis missing from the print call has been restored.
    print(Fore.BLUE + "Worker PC connected: MAC=%s IP=%s" % (mac, ip))
    if result:
        print(Fore.RED + result)
        response.data = result
        response.status_code = 400
    else:
        response.data = ip
def tick(game, template, is_json, no_color):
    """
    Download the match history for ``game`` and print it.

    With ``is_json`` the matches are echoed as indented JSON. Otherwise each
    match is rendered through ``print_match`` using ``template`` (or
    ``DEFAULT_TEMPLATE_RECAP`` when empty). Unless colour is disabled, the
    team with the strictly higher score is wrapped in green and the other in
    red before printing.

    Raises click.BadParameter when ``game`` is falsy.
    """
    if not game:
        raise click.BadParameter('Missing required parameter "game"')
    matches = download_history(game)
    if is_json:
        click.echo(json.dumps(list(matches), indent=2, sort_keys=True))
        return

    renderer = Template(template if template else DEFAULT_TEMPLATE_RECAP)
    for m in matches:
        use_colour = not no_color and COLOR_ENABLED
        if use_colour:
            # On equal scores t2 is treated as the winner, as before.
            if m['t1_score'] > m['t2_score']:
                winner, loser = 't1', 't2'
            else:
                winner, loser = 't2', 't1'
            m[winner] = Fore.GREEN + m[winner] + Fore.RESET
            m[loser] = Fore.RED + m[loser] + Fore.RESET
        print_match(m, renderer)
def createEntry(name, pdf_path, comment, bibtex):
    """
    Build an ``Entry`` for ``name`` from its BibTeX lines and comment.

    The first parsed BibTeX entry is used; when there is none, or parsing
    raises, the entry is created without citation data and a diagnostic is
    printed.
    """
    bib = ''.join(bibtex)
    parsedBib = None
    # Todo Catch exceptions
    try:
        db = bibtexparser.loads(bib)
        if db.entries:
            parsedBib = db.entries[0]
        else:
            print(_F.RED, 'No citation for ', name, _F.RESET, sep='')
    except Exception as e:
        print('EX :', e.__class__.__name__)
        print(e)
        parsedBib = None
    # The second element returned by parseComment is not used here.
    comment, _tags, priority = parseComment(comment)
    return Entry(name, parsedBib, priority, None, url=pdf_path, comment=comment)
def authorize_concept(self, concept):
    """
    Look up ``concept`` in the vocabulary named by its subfield '2' and
    store the returned id in subfield '0'.

    When subfield '0' already holds a value other than ``ANY_VALUE`` that
    differs from the returned id, ``pick_one`` decides which to keep. The
    outcome is logged either way.

    Raises ValueError when subfield '2' is missing.
    """
    if '2' not in concept.sf:
        raise ValueError('No vocabulary code (2) given!')

    vocab_code = concept.sf['2']
    if vocab_code not in self.vocabularies:
        log.info(Fore.RED + '?' + Style.RESET_ALL + ' Could not authorize: %s', concept)
        return
    vocab = self.vocabularies[vocab_code]

    response = vocab.authorize_term(concept.term, concept.tag)
    identifier = response.get('id')
    if identifier is None:
        log.info(Fore.RED + '?' + Style.RESET_ALL + ' Could not authorize: %s', concept)
        return

    current = concept.sf.get('0')
    # ANY_VALUE is ignored; only a concrete, conflicting id needs a decision.
    if current and not (current == ANY_VALUE) and identifier != concept.sf['0']:
        identifier = pick_one('The $$0 value does not match the authority record id. ' +
                              'Please select which to use',
                              [concept.sf['0'], identifier])
    concept.sf['0'] = identifier
    log.info(Fore.GREEN + '?' + Style.RESET_ALL + ' Authorized: %s', concept)
def facebook_login(driver, username, password):
    """
    Fill in and submit the login form through ``driver``.

    Returns ``driver.get_cookies()`` on success. When the page source
    contains one of the known failure phrases, the driver is closed and the
    process exits.
    """
    # Trailing comma kept exactly as in the original statement.
    print("\n\n\nLogin to Facebook...."),
    sys.stdout.flush()
    driver.get("http://www.facebook.com")

    email_field = driver.find_element_by_id("email")
    email_field.send_keys(username)
    password_field = driver.find_element_by_id("pass")
    password_field.send_keys(password)
    password_field.send_keys(Keys.RETURN)
    time.sleep(1)

    html_source = driver.page_source
    failure_markers = ("Please re-enter your password", "Incorrect Email")
    if any(marker in html_source for marker in failure_markers):
        print(Fore.RED + "Incorrect Username or Password")
        driver.close()
        exit()
    else:
        print(Fore.GREEN + "Success\n")
        return driver.get_cookies()
def defrag(self):
    """
    Return a coloured one-character-per-piece map of the download state.

    For each piece: yellow "v" when it is in the download queue, blue ">"
    when ``self._served_blocks`` marks it, otherwise "#" in green (truthy
    piece) or red (falsy piece). Each marker is followed by the piece
    priority. The result always ends with a newline and carries a heading
    when there is at least one piece.
    """
    # A set gives O(1) membership tests inside the loop.
    downloading = {piece['piece_index'] for piece in self.handle.get_download_queue()}
    cells = []
    for i, piece in enumerate(self.status.pieces):
        if i in downloading:
            numeral = Fore.YELLOW + "v"
        elif self._served_blocks is not None and self._served_blocks[i]:
            numeral = Fore.BLUE + ">"
        else:
            numeral = (Fore.GREEN if piece else Fore.RED) + "#"
        cells.append(numeral + str(self.handle.piece_priority(i)))
    # Join once instead of growing a string piece by piece.
    numerales = "".join(cells)
    if numerales != "":
        numerales = term.magenta("\nPieces download state:\n" + numerales)
    return "%s\n" % numerales
def login(username, password):
    """
    Fetch the login page, post the credentials with the validation token and
    key extracted from it, and report the outcome via ``echo``.

    Returns True when the response contains '"loggedIn":true'. On failure
    the response body is passed to ``log`` and the function returns None.
    """
    echo('Downloading login page')
    login_html = dA.get(url['login']).text
    params = {
        'validate_token': get_validate_token(login_html),
        'validate_key': get_validate_key(login_html),
        'username': username,
        'password': password,
        'ref': url['login_ref'],
    }
    echo('Logging in as %s' % username)
    post = dA.post(url['login'], data=params)
    post_html = post.text
    if '"loggedIn":true' not in post_html:
        log('login_error.htm', post_html)
        echo(Back.RED + post.url)
        return
    echo('Logged in as ' + username)
    return True
def insert_data(self, table, my_dict):
    """
    Insert one row into ``table`` using the keys of ``my_dict`` as column
    names and its values as column values.

    Returns 1 when ``execute`` reports a truthy result, 0 when it does not,
    and None when a database error was caught (the transaction is rolled
    back and the error is printed in red).
    """
    try:
        cols = ','.join(my_dict.keys())
        # Values are bound as parameters instead of being spliced into the
        # statement text, so quotes inside a value can no longer break or
        # alter the SQL. Table and column names cannot be bound and must
        # still come from trusted code.
        placeholders = ','.join(['%s'] * len(my_dict))
        try:
            sql = "insert into %s (%s) values(%s)" % (table, cols, placeholders)
            result = self.cur.execute(sql, list(my_dict.values()))
            self.db.commit()
            return 1 if result else 0
        except MysqLdb.Error as e:
            # NOTE(review): the usual module name is ``MySQLdb``; confirm the
            # spelling used by this file's import.
            self.db.rollback()
            if "key 'PRIMARY'" in e.args[1]:
                print(Fore.RED + self.get_current_time() + " " + "???????????")
            else:
                print(Fore.RED + self.get_current_time() + " " + "????????? %d: %s" % (e.args[0], e.args[1]))
    except MysqLdb.Error as e:
        print(Fore.RED + self.get_current_time() + " " + "????????%d: %s" % (e.args[0], e.args[1]))
def TOR_PROC_CHECK():
    """
    Look for a running process named "tor".

    Returns a ``psutil.Process`` for the first match after printing a green
    status line; prints a red status line and exits when none is found.
    """
    tor_pid = None
    for proc in psutil.process_iter():
        try:
            pinfo = proc.as_dict(attrs=['pid', 'name'])
        except psutil.NoSuchProcess:
            # The process vanished while being inspected.
            continue
        if pinfo['name'] == "tor":
            tor_pid = pinfo['pid']
            break

    if tor_pid is not None:
        print ("[" + Fore.GREEN + Style.BRIGHT + "+" + Style.RESET_ALL + "]" + Fore.GREEN + Style.BRIGHT + " Tor is running." + Style.RESET_ALL)
        return psutil.Process(int(tor_pid))
    print ("[" + Fore.RED + Style.BRIGHT + "-" + Style.RESET_ALL + "]" + Fore.RED + Style.BRIGHT + " Tor is not running." + Style.RESET_ALL)
    exit()
def trains(self):
    """
    Yield one table row per available train.

    A train is included when ``self.options`` is empty or contains the
    lower-cased first character of the train code. Departure values are
    shown in green above arrival values in red.
    """
    def stacked(upper, lower):
        # Two-line cell: green value on top, red value below.
        return '\n'.join([Fore.GREEN + upper + Fore.RESET,
                          Fore.RED + lower + Fore.RESET])

    for entry in self.available_trains:
        info = entry['queryLeftNewDTO']
        train_no = info['station_train_code']
        initial = train_no[0].lower()
        if self.options and initial not in self.options:
            continue
        yield [
            train_no,
            stacked(info['from_station_name'], info['to_station_name']),
            stacked(info['start_time'], info['arrive_time']),
            self._get_duration(info),
            info['zy_num'],
            info['ze_num'],
            info['rw_num'],
            info['yw_num'],
            info['yz_num'],
            info['wz_num'],
        ]
def __data_parser__(self, data):
    """
    Convert the auctions found at data['mods']['itemlist']['data'] into a
    list of result dicts.

    Returns an empty list, after reporting through ``error``, when the
    auction list is empty or a KeyError is raised while reading the data.
    """
    def as_row(result):
        # The fee is highlighted only when it is greater than zero.
        fee = result["view_fee"]
        delivery = colorful_text(fee, Fore.RED) if float(fee) > 0 else fee
        is_tmall = result.get('shopcard', {}).get('isTmall', False)
        return {
            'intro': result["raw_title"],
            'price': float(result["view_price"]),
            'delivery': delivery,
            'sales': int(result["view_sales"].split('?')[0]),
            'belong': colorful_text("??", Fore.CYAN) if is_tmall else "??",
            'url': result["detail_url"],
        }

    try:
        search_results = data['mods']['itemlist']['data']['auctions']
        if not search_results:
            error('Ops,get no goods..')
            return []
        return [as_row(result) for result in search_results]
    except KeyError:
        error('Ops,some key error happened..')
        return []
def Brute_Thread(ip,username,passwd):
    """
    Attempt one SSH login against ``ip`` and print the outcome
    (Python 2 print syntax).

    Increments the module-level attempt counter ``n`` on every call and sets
    the module-level ``flag`` and ``flag1`` to 1 when the login succeeds.

    NOTE(review): the success branch returns before ``ssh.close()`` is
    reached, so the client is only closed after a failed authentication.
    NOTE(review): only paramiko.AuthenticationException is handled; any other
    exception from ``connect`` propagates to the caller.
    """
    ssh=paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    global n,flag,flag1
    n=n+1
    try:
        ssh.connect(ip,username=username,password=passwd)
    except paramiko.AuthenticationException:
        print Fore.RED+"[-]Username: %s\tPassword: %s Failed."%(username,passwd) + Fore.RESET
    else:
        print Fore.GREEN+"\n********************************************************"
        print "[#]Username: %s\tPassword: %s Found........!!!"%(username,passwd)
        print "********************************************************"+Fore.RESET
        flag=1
        flag1=1
        print Fore.RED+"\nFound correct password after %s attempts..." %n +Fore.RESET
        return
    ssh.close()
    return
def ready_Dict(ip,filename):
    """
    Read ``filename`` and process each newline-separated entry while the
    module-level ``flag`` is 0 (Python 2 print syntax).

    Resets the module-level counter ``n`` to 0 before returning.

    NOTE(review): ``Brute_Thread(ip,i)`` is called immediately with two
    arguments (its definition takes three) and its return value - not the
    function itself - is handed to ``threading.Thread``; confirm the intended
    call.
    NOTE(review): the loop resets ``flag`` to 0 before breaking, so the
    ``flag==1`` check after the loop looks unreachable from that path; the
    original nesting of that check was lost, so confirm where it belongs.
    NOTE(review): the file is closed only on the normal path, not when an
    exception is raised.
    """
    global flag,n
    f=open(filename,"r")
    st=f.read()
    wordlist=st.split('\n')
    for i in wordlist:
        if flag==0:
            t=threading.Thread(Brute_Thread(ip,i))
            t.start()
        elif flag==1:
            flag=0
            break
    if flag==1:
        print Fore.RED+"\nFinished wordlist...%s words checked...password not found!!!" % n+Fore.RESET
    n=0
    f.close()
def getConfig():
    """
    Load the configuration from ``<path>/visionarypm.conf``.

    Returns ``(config, 1)`` when the config was read from, or saved to, the
    file, and ``(config, 0)`` when defaults are used without being saved.
    Exits with an explanatory message when the stored config is malformed or
    its values are out of range.
    """
    conf_file = '%s/visionarypm.conf' % path
    invalid_msg = ('Invalid config! Please delete the configuration file (%s) '
                   'and a new one will be generated on the next run.' % conf_file)
    try:
        with open(conf_file) as f:
            config = json.loads(f.read().strip())
        in_range = (16 <= config['oLen'] <= 64
                    and 10 <= config['cost'] <= 20
                    and 4 <= config['nwords'] <= 16)
        if not in_range:
            exit(invalid_msg)
        return config, 1
    except IOError:
        # No readable config file: fall back to defaults and offer to save.
        config = get_defaults()
        autosave = safe_input('Do you want to save this config? (Y/n) ').lower()
        if autosave in ('yes', 'y', ''):
            print('\nAutosaving configuration...')
            try:
                with open(conf_file, 'a') as f:
                    f.write(json.dumps(config))
                return config, 1
            except (IOError, OSError):
                # Only file-system failures are treated as "could not save";
                # the former bare ``except`` also swallowed KeyboardInterrupt.
                print(color('Autosaving Failed! [Permission denied]\n', Fore.RED))
        # Reached when the user declined or the save failed.
        # NOTE(review): the original nesting of these lines was lost; this
        # placement keeps every path returning a config - confirm.
        print('In order to save these settings,place %s' % color(json.dumps(config), Fore.YELLOW))
        print('in %s' % (color(conf_file, Fore.YELLOW)))
        return config, 0
    except (KeyError, json.decoder.JSONDecodeError):
        exit(invalid_msg)
def app_header(self):
    """
    Print the application banner followed by version, release date, author
    and project links.

    The banner and the "Connection Manager" title are printed in dim red, the
    information block in dim cyan.
    """
    # Banner art, stored line by line.
    header = '\n'
    header += ' ??????????? ???? ?????? ???? ??? ?????? ??????? ??????????????? \n'
    header += ' ????????????? ?????????????????? ??????????????????? ????????????????\n'
    header += ' ??? ????????????????????????? ?????????????? ?????????? ????????\n'
    header += ' ??? ???????????????????????????????????????? ????????? ????????\n'
    header += ' ??????????? ??? ?????? ?????? ????????? ??????????????????????? ???\n'
    header += ' ?????????? ?????? ?????? ???????? ??? ??????? ??????????? ???\n'
    header2 = ' Connection Manager\n'
    # Information block framed by two 70-character rules.
    header3 = ' {}\n'.format('-' * 70)
    header3 += ' Version : {}\n'.format(__version__)
    header3 += ' Release date : {}\n'.format(__release_date__)
    header3 += ' Github : https://github.com/fachrioktavian/CManager\n'
    header3 += ' Dev by : {0} - ({1})\n'.format(__author__, __author_email__)
    header3 += ' Official : https://dracos-linux.org\n'
    header3 += ' {}\n'.format('-' * 70)
    print (Fore.RED + Style.DIM + header + header2)
    print (Fore.CYAN + Style.DIM + header3)
def __init__(self, settings, load = True):
    """ Prepare the Manifest Builder. Optionally load information from the given file. """
    self.version = 2.0
    # Default to no version, which will be converted.
    self.data = {'@Meta': {'version': 0}}
    self.file = stringutil.normalize_file(settings.save_base()+'/Manifest.json.gz')

    should_load = load and os.path.isfile(self.file)
    if should_load:
        try:
            with gzip.GzipFile(self.file, 'rb') as data_file:
                raw = data_file.read()
                self.data = json.loads(raw.decode('utf8'))
        except:
            stringutil.print_color(Fore.RED, 'Failed to load Manifest at [%s]. Probably corrupt. Try removing the file.' % self.file)
            raise

    # Keep adapting until adapt() reports no further change.
    # NOTE(review): the original nesting was lost; this assumes adaptation
    # runs whether or not a file was loaded - confirm.
    while True:
        change, self.data = self.adapt(self.data)
        if not change:
            break
def get_wopi_test_endpoint(wopi_discovery_service_url):
    """
    Fetch the WOPI discovery XML and return the test application's URL
    without its query string.

    Exits with status 1, after printing and logging the failure, when the
    HTTP request reports an error status or the XML cannot be parsed.
    """
    logging.info("WOPI discovery Service Url: " + wopi_discovery_service_url)
    discovery_service_response = requests.get(wopi_discovery_service_url)
    try:
        discovery_service_response.raise_for_status()
    except requests.exceptions.HTTPError:
        print(Fore.RED + "Failed to retrieve WOPI discovery Service XML: Check Logs for more information")
        # The message now has a %s placeholder for its argument, and the
        # status is read from the response object.
        logging.critical("Failed to retrieve WOPI discovery Service XML - HTTP ErrorCode: %s",
                         discovery_service_response.status_code)
        sys.exit(1)
    try:
        discovery_xml = ElementTree.fromstring(discovery_service_response.content)
        wopi_test_endpoint_url = discovery_xml.find(WOPITESTAPPLICATION_NODE_PATH).attrib[
            WOPITESTAPPLICATION_URLSRC_ATTRIBUTE]
    except Exception as exception:
        print(Fore.RED + "Failed to parse WOPI discovery Service XML: Check Logs for more information")
        logging.critical("Failed to parse WOPI discovery Service XML - Exception Details: %s", exception)
        sys.exit(1)
    # split() leaves the URL intact when there is no '?'; slicing with
    # find() == -1 used to drop the last character.
    return wopi_test_endpoint_url.split('?', 1)[0]
def log(self, prefix, text, line=False):
    """
    Print ``text`` with a timestamp and a coloured ``prefix`` marker.

    '?' is cyan, '+' green, '-' red and '!' yellow; any other prefix is
    printed bright without a colour. With ``line`` true the whole line is
    coloured, otherwise only the prefix.
    """
    # datetime.now(): the attribute ``Now`` does not exist.
    now = datetime.now()
    colours = {
        '?': Fore.CYAN,
        '+': Fore.GREEN,
        '-': Fore.RED,
        '!': Fore.YELLOW,
    }
    # Unknown prefixes used to leave ``c`` unbound and crash.
    c = Style.BRIGHT + colours.get(prefix, '')
    e = Style.RESET_ALL + Fore.RESET
    # %S is seconds; the previous %s is a non-portable epoch-seconds code.
    stamp = "[" + now.strftime("%Y-%m-%d %H:%M:%S") + "]"
    if line:
        print(c + stamp + "[" + prefix + "] " + text + e)
    else:
        print(stamp + "[" + c + prefix + e + "] " + text)
def log(prefix, line=False):
    """
    Print a timestamped message with a coloured ``prefix`` marker.

    '?' is cyan, '+' green, '-' red and '!' yellow; any other prefix is
    printed bright without a colour. With ``line`` true the whole line is
    coloured, otherwise only the prefix.

    NOTE(review): ``text`` is not a parameter of this function and is not
    assigned in it. It must be a module-level name, or a ``text`` parameter
    was lost from the signature - confirm.
    """
    # datetime.now(): the attribute ``Now`` does not exist.
    now = datetime.now()
    colours = {
        '?': Fore.CYAN,
        '+': Fore.GREEN,
        '-': Fore.RED,
        '!': Fore.YELLOW,
    }
    # Unknown prefixes used to leave ``c`` unbound and crash.
    c = Style.BRIGHT + colours.get(prefix, '')
    e = Style.RESET_ALL + Fore.RESET
    stamp = "[" + now.strftime("%Y-%m-%d %H:%M") + "]"
    if line:
        print(c + stamp + "[" + prefix + "] " + text + e)
    else:
        print(stamp + "[" + c + prefix + e + "] " + text)
def colorprint(text_color, bail_result=False):
    '''
    Build a reusable colour printer. Call it as a curried function.

    greenPrinter = colorprint('GREEN')
    greenPrinter('this will be green')
    OR:
    colorprint('GREEN')('this will be green')

    'GREEN' selects green; any other value selects red. With
    ``bail_result`` true the coloured string is returned instead of printed.
    '''
    def printer(text):
        ''' Does the actual colouring and, unless bailing, the printing. '''
        if text_color == 'GREEN':
            color = Fore.GREEN
        else:
            color = Fore.RED
        painted = color + text
        if bail_result:
            return painted
        print(painted)
        print(Style.RESET_ALL)
    return printer
def status_loop(self):
    """
    Write a three-column overview of ``self.targets`` to the file "status"
    and schedule ``self.status_work`` after ``self.detect_interval``.

    Each target is rendered as "<n>. <team> (<ip>)", green when it is marked
    compromised and red otherwise. The first two columns are padded to 45
    characters, the third to 20.
    """
    targets = self.targets
    widths = (45, 45, 20)
    rows = []
    for start in range(0, len(targets), 3):
        cells = []
        for offset, width in enumerate(widths):
            idx = start + offset
            if idx < len(targets):
                target = targets[idx]
                label = "%d. %s (%s)" % (idx + 1, target['team'], target['ip'])
                colour = Fore.GREEN if target['compromised'] else Fore.RED
            else:
                # Missing cell in the last row: blank, no colour. The colour
                # variable used to be unbound or left over from the previous
                # row in this case.
                label, colour = "", ""
            cells.append(colour + label.ljust(width, " ") + Style.RESET_ALL)
        rows.append("%s%s%s\n" % tuple(cells))
    # The context manager guarantees the file is flushed and closed.
    with open("status", 'w') as status_file:
        status_file.write("".join(rows))
    self.loop.call_later(self.detect_interval, self.status_work)
def filter(self, record):
    """
    Annotate ``record`` with ``spent`` (whole seconds derived from
    ``relativeCreated``) and ``fg`` (a foreground colour chosen by level
    name; empty when ``iswindows`` is true). Always returns True.
    """
    record.spent = record.relativeCreated // 1000
    if iswindows:
        record.fg = ''
    else:
        palette = {
            'DEBG': Fore.LIGHTBLACK_EX,
            'INFO': Fore.LIGHTWHITE_EX,
            'WARN': Fore.LIGHTYELLOW_EX,
            'ERR ': Fore.LIGHTRED_EX,
            'CRIT': Fore.RED,
        }
        # Unknown level names fall back to light white.
        record.fg = palette.get(record.levelname, Fore.LIGHTWHITE_EX)
    return True
def trains(self):
    """
    Yield a one-element row holding the train code for every available
    train that passes the option filter.

    A train passes when ``self.options`` is empty or contains the
    lower-cased first character of the train code.
    """
    for entry in self.available_trains:
        info = entry['queryLeftNewDTO']
        train_no = info['station_train_code']
        initial = train_no[0].lower()
        if self.options and initial not in self.options:
            continue
        yield [train_no]
def GetHTML(url):
    """
    Download ``url`` with a browser-like User-Agent and return the page
    parsed by BeautifulSoup with the 'lxml' parser.

    Returns None, after printing a red message, when the request fails with
    HTTPError/URLError or times out.
    """
    init(autoreset=True)
    print('\n' + '--------------------???????……')
    # Header tuple presented to the server.
    headers = ('User-Agent','Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/54.0.2840.99 Safari/537.36')
    # Opener that sends the header above with every request.
    opener = urllib.request.build_opener()
    opener.addheaders = [headers]
    try:
        html = opener.open(url)
        try:
            Project_bs = BeautifulSoup(html.read(), 'lxml')
        finally:
            # Close the response even when reading or parsing raises.
            html.close()
    except (HTTPError,URLError) as e:
        print(Fore.RED + '--------------------??????????????????????')
    except socket.timeout as e:
        print(Fore.RED + '--------------------????????????')
    else:
        return Project_bs
def GetHTML(url):
    """
    Download ``url`` with a browser-like User-Agent and return the page
    parsed by BeautifulSoup with the 'lxml' parser.

    Returns None, after printing a red message, when the request fails with
    HTTPError/URLError or times out.

    NOTE(review): the middle of this function (the rest of the header tuple,
    the opener set-up and the ``try`` block) was missing from the source,
    which left it unparsable. It has been restored from the otherwise
    identical ``GetHTML`` defined earlier in this file - confirm against the
    original project.
    """
    print('\n' * 2 + '--------------------???????……')
    # Header tuple presented to the server.
    headers = ('User-Agent','Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML,like Gecko) Chrome/54.0.2840.99 Safari/537.36')
    # Opener that sends the header above with every request.
    opener = urllib.request.build_opener()
    opener.addheaders = [headers]
    try:
        html = opener.open(url)
        try:
            Project_bs = BeautifulSoup(html.read(), 'lxml')
        finally:
            # Close the response even when reading or parsing raises.
            html.close()
    except (HTTPError,URLError) as e:
        print(Fore.RED + '--------------------??????????????????????')
    except socket.timeout as e:
        print(Fore.RED + '--------------------????????????')
    else:
        return Project_bs
# ????????????'Geturls'
def compare_minion(self, test_case, minion):
    """
    Compare the expected result of ``test_case`` with the actual result
    reported for ``minion``.

    Returns ``EvaluationResult(minion, result, compare)``. ``result`` is
    None and ``compare`` False when the minion has no actual result;
    ``compare`` is also False on a type mismatch or an unsupported operator,
    both of which are logged as warnings.
    """
    actual_result = test_case.extract_actual_result()
    if minion not in actual_result:
        return EvaluationResult(minion, None, False)

    result = actual_result[minion]
    expected = test_case.expected_result
    compare = False
    if self._compare_type(expected, result):
        operator = test_case.operator
        if operator == '=':
            compare = self.comp(expected, result)
        elif operator == '<':
            compare = expected < result
        elif operator == '>':
            compare = expected > result
        elif operator == 'not':
            compare = expected != result
        else:
            # ``compare`` used to be left unbound here, which crashed at the
            # return statement.
            self.test_report_logger.warning('%s%s: Unsupported operator %r ---------%s', Fore.RED,
                                            test_case.name, operator, Fore.RESET)
    else:
        self.test_report_logger.warning('%s%s: Result type mismatch ---------%s', Fore.RED, test_case.name,
                                        Fore.RESET)
    return EvaluationResult(minion, result, compare)
def print_red(string):
    """
    Print ``string`` in bright red and reset the style afterwards.

    When ``windows_client()`` is true, ``reinit()`` is called before printing
    and ``deinit()`` after it.
    """
    if windows_client():
        reinit()
    message = Fore.RED + Style.BRIGHT + string + Style.RESET_ALL
    print (message)
    if windows_client():
        deinit()
def print_error(lines):
    """Print error messages inside a red frame, one '| '-prefixed row per line."""
    # Each print takes one argument, so the parenthesised form gives the same
    # output under Python 2 and Python 3.
    top_rule = "---------------------------- Error -----------------------------\r"
    bottom_rule = "------------------------------------------------------------------\r"
    print(Fore.RED + top_rule)
    for line in lines:
        print(Fore.RED + "| " + line + '\r')
    # Switch the foreground back to white after the frame.
    print(Fore.RED + bottom_rule + Fore.WHITE)
def print_bool_result(binary_result, color_spec):
    """
    Print a boolean result as a coloured tag without a trailing newline.

    @params:
        binary_result - required : True or False boolean
        color_spec    - required : name of the colorama palette to use,
                                   either 'Back' or 'Fore'

    @raises:
        ValueError when ``color_spec`` is not 'Fore' or 'Back'.
    """
    # The palette name used to be passed to eval(), which would execute any
    # expression supplied by the caller. It is now a plain lookup of a
    # whitelisted module-level name.
    if color_spec not in ('Fore', 'Back'):
        raise ValueError("color_spec must be 'Fore' or 'Back', got %r" % (color_spec,))
    palette = globals()[color_spec]
    if binary_result:
        print(palette.GREEN + '[OK]', end=' ')
    else:
        print(palette.RED + '[NG]', end=' ')
def add_status(current_status_message):
    """
    Let the user set a status, either by typing a new one or by choosing one
    of ``STATUS_MESSAGES`` (Python 2: uses ``raw_input``).

    Returns the module-level ``updated_status_message``, which is only
    changed when a valid new status or selection was provided.
    """
    global updated_status_message
    if current_status_message is not None:
        print("Your current status is " + current_status_message + "\n")
    else:
        print(" provide some status")
    # ask user for choosing from older messages.
    default = raw_input("Do you want to select from the older status (y/n?)")
    answer = default.upper()
    if answer == "N":
        new_status_message = raw_input("What status message do you want to set?")
        # validating user's input
        if len(new_status_message) > 0:
            # adding new status to the list
            STATUS_MESSAGES.append(new_status_message)
            updated_status_message = new_status_message
            print("Your updated status message is %s" % updated_status_message)
        else:
            print(Fore.RED + "You did not provide any status message")
    elif answer == "Y":
        # printing all older messages with a 1-based serial number
        for item_position, message in enumerate(STATUS_MESSAGES, 1):
            print(str(item_position) + " . " + message)
        # asking user's choice; non-numeric input counts as an invalid choice
        # instead of crashing with ValueError.
        try:
            message_selection = int(raw_input("\nChoose from the above messages "))
        except ValueError:
            message_selection = 0
        # validating user's input: zero and negative numbers used to index
        # from the end of the list.
        if 1 <= message_selection <= len(STATUS_MESSAGES):
            updated_status_message = STATUS_MESSAGES[message_selection - 1]
            print("Your updated message is: " + updated_status_message)
        else:
            print(Fore.RED + "Invalid choice. Try again. ")
    else:
        print(Fore.RED + "The option you chose is Invalid. ")
    return updated_status_message
def add_friend():
    """
    Interactively collect a friend's name, salutation, age and rating, and
    append the friend to ``friends`` (Python 2: uses ``raw_input``).

    Returns the new length of ``friends`` on success. Returns None after
    printing a red message when any input fails validation.
    """
    friend_obj = Spy_Info("Mr.", "Guest", 24, 3.6, True)
    friend_obj.name = raw_input("Please add your friend's name: ")
    if len(friend_obj.name) == 0:
        print(Fore.RED + "Enter valid name to continue")
        return
    # regular expression which matches only alphabets (raw strings avoid
    # invalid escape sequences; the patterns themselves are unchanged)
    if re.match(r'^[a-zA-Z\s]+$', friend_obj.name) is None:
        print(Fore.RED + "Input friend name's format is invalid")
        return

    friend_obj.salutation = raw_input("Are they Mr or Ms ? ")
    if re.match(r'^[Mm][r s]$', friend_obj.salutation) is None:
        print(Fore.RED + "Input friend salutation's format is invalid. ")
        return
    print("CHECKING ..")
    friend_obj.name = friend_obj.salutation + " " + friend_obj.name

    friend_obj.age = raw_input("Age? ")
    if re.match(r'^[0-9]+$', friend_obj.age) is None:
        print(Fore.RED + "Input friend's age format is invalid")
        return
    print("CHECKING...")

    friend_obj.rating = raw_input("Spy rating")
    if re.match(r'^[0-9]+\.[0-9]+$', friend_obj.rating) is None:
        print(Fore.RED + " Input friend rating format is invalid")
        return
    print("CHECKING..")

    # Both inputs have been validated as numeric text; convert them so the
    # age check compares numbers. The former ``age > 12 < 50`` compared a
    # string with 12 and never tested the upper bound.
    friend_obj.age = int(friend_obj.age)
    friend_obj.rating = float(friend_obj.rating)
    if len(friend_obj.name) > 0 and 12 < friend_obj.age < 50:
        # add friend
        friends.append(friend_obj)
        return len(friends)
    print(Fore.RED + " Sorry! Invalid entry. We cant add spy with the details you provided")
def error(message):
    """Print ``message`` as a red "Error: ..." line, call ``reset()`` and exit."""
    text = Fore.RED + "Error: " + message
    print(text)
    reset()
    exit()
def compile_file (filename, language = None, args = ''):
    """
    Compile ``filename`` through ``compile.compile`` and report the result.

    Prints the arguments first. On a compilation error the captured stderr is
    printed and the process exits with status 1.
    """
    print (args)
    ret = compile.compile(filename, language = language, args = args)
    failed = ret['status'] == compile.COMPILATION_CODE.ERROR
    if not failed:
        print ('Compile: ' + Fore.GREEN + 'OK\n')
        return
    print ('Compile: ' + Fore.RED + 'ERROR\n')
    print (ret['stderr'])
    exit(1)
def print_output(obj):
    """
    Print the outcome of one test case described by ``obj``.

    A non-zero status prints an ERROR line plus stderr (or a SIGSEGV hint).
    Otherwise the elapsed time is printed, in red when its leading number is
    at least 5 and is followed by 's', 'm' or 'h', and then the output in
    green when it matches the expected output, or in red together with the
    expected text when it does not.
    """
    def headline(tail):
        # Common "~ Test #<n> - " prefix followed by the given tail.
        return '~ Test #{} - '.format(obj['testcase']) + tail

    # to-do:
    # Error while executing code
    if obj['status'] != 0:
        print (headline(Fore.RED + 'ERROR'))
        # In some cases the process spawn by cftool returns SIGSEGV (-11)
        # and process.stderr is empty
        if obj['stderr'] == '' and obj['status'] == -11:
            print ('Process exited with SIGSEGV,probably because of a segmentation fault')
        else:
            print (obj['stderr'])
        return

    # split time between numbers and letters
    m = re.split(r'(\d+)', obj['time'])
    too_slow = int(m[1]) >= 5 and m[2] in ['s', 'm', 'h']
    if too_slow:
        print (headline(Fore.RED + '{}'.format(obj['time'])))
    else:
        print (headline('{}'.format(obj['time'])))

    stdout = obj['stdout']
    expected = obj['expected']
    matched = compare_outputs(stdout, expected)
    print ((Fore.GREEN if matched else Fore.RED) + stdout)
    print ('')
    if not matched:
        print (Fore.LIGHTBLACK_EX + 'Expected:')
        print (Fore.LIGHTBLACK_EX + expected)
        print ('')
def printIt( p, sha, type, repo, branch, login, message, filler):
    """
    Format one event as a fixed-width line and write it to ``p.stdin``.

    The sha is shown in red and the login in blue. The type, repo and branch
    columns are padded with ``filler``; the other columns with spaces. Exits
    with status 1 when the write fails.
    """
    #clean up comments that have \n in them
    message = message.replace("\n"," ")
    columns = [
        Fore.RED+sha[:SHACOLW].ljust(SHACOLW," ")+Fore.RESET,
        type[:EVENTCOLW].ljust(EVENTCOLW,filler),
        repo[:DFLTCOLW].ljust(DFLTCOLW,filler),
        branch[:BRANCHCOLW].ljust(BRANCHCOLW,filler),
        Fore.BLUE+login.ljust(DFLTCOLW," ")+Fore.RESET,
        message.ljust(MSGCOLW," "),
    ]
    line = " ".join(columns) + "\n"
    try:
        p.stdin.write(line)
    except:
        # Stop loop on "Invalid pipe" or "Invalid argument".
        # No sense in continuing with broken pipe.
        exit(1)
    return
#----------------------------------------------------------------------------------------------------------------------
#process arguments
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。