Python pyparsing 模块,Literal() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pyparsing.Literal()。
def parser(self):
    """Build the pyparsing grammar for one or more JOIN clauses.

    Each clause looks like::

        LEFT (child[col, ...] ==> parent[col, ...])

    A successful match calls ``self._add_join`` as a side effect, so
    parsing a block registers every join with the owner object.
    """
    # Only these four join keywords are accepted.
    join_type = (pp.Literal("LEFT") | pp.Literal("RIGHT") | pp.Literal("INNER") | pp.Literal("OUTER"))
    # Identifiers: leading letter, then letters/digits/underscore/dollar.
    node_name = pp.Word(pp.alphas, pp.alphanums + "_$")
    col_name = pp.Word(pp.alphas, pp.alphanums + "_$")
    # Comma-separated column names, grouped into one sub-list of tokens.
    col_name_list = pp.Group(pp.delimitedList(col_name, delim=","))
    l_brac = pp.Suppress("[")
    r_brac = pp.Suppress("]")
    # JOINTYPE ( child [cols] ==> parent [cols] )
    single_join = (join_type + pp.Suppress("(") + node_name + l_brac +
                   col_name_list + r_brac + pp.Suppress("==>") + node_name +
                   l_brac + col_name_list + r_brac + pp.Suppress(")"))
    # Register each parsed join with the owner as parsing proceeds.
    single_join.addParseAction(lambda x: self._add_join(join_type=x[0],
                                                       child_node_name=x[1],
                                                       child_cols=x[2],
                                                       parent_node_name=x[3],
                                                       parent_cols=x[4]))
    join_block = pp.OneOrMore(single_join)
    return join_block
def _build_parser():
    """Build the grammar for a comma-separated list of match/assign rules.

    A rule part is either ``ident <match_op> literal`` or
    ``ident = literal``, where literals may be dates (YYYY/MM/DD),
    dollar amounts ($n.nn) or quoted strings; each literal is converted
    to the corresponding ``schema`` value by a parse action.

    NOTE(review): pyparsing trims parse-action arguments from the left,
    so the two-argument lambdas below actually receive (loc, toks) --
    the first parameter is the match location, not the source string.
    """
    date_literal = pp.Regex(r'(?P<year>\d{4})/(?P<month>\d{2})/(?P<day>\d{2})') \
        .setParseAction(lambda s,l,t: schema.Date(t.year, t.month, t.day))
    # Cents are required by this regex: "$5" will not match, "$5.00" will.
    dollars_literal = pp.Regex(r'\$\d+(\.\d{2})') \
        .setParseAction(lambda s,t: schema.Dollars(t[0]))
    string_literal = (pp.QuotedString('"', escChar='\\') | pp.QuotedString("'", escChar='\\')) \
        .setParseAction(lambda s,t: schema.String(t[0]))
    literal = date_literal | dollars_literal | string_literal
    ident = pp.Word(pp.alphas)
    # operator_map is a module-level mapping of comparison operator strings.
    match_op = pp.oneOf(operator_map.keys())
    match = ident + match_op + literal
    assign_op = pp.Literal('=')
    assign = ident + assign_op + literal
    # Wrap each part's tokens in a nested list so parts stay separate.
    part = (match | assign).setParseAction(lambda s,t: [t])
    rule = pp.delimitedList(part) + pp.LineEnd()
    return rule
def __init__(self, calc = SimpleCalculator()):
    """Build an infix-arithmetic parser that evaluates through *calc*.

    Grammar: ``expr := term (('+'|'-') term)*``,
    ``term := atom (('*'|'/') atom)*``.  Parse actions push operands and
    operators onto ``self.exprStack`` in postfix order for later
    evaluation via ``self.opfun``.

    NOTE(review): the default ``SimpleCalculator()`` is created once at
    definition time and shared by every instance that relies on the
    default -- acceptable only if the calculator is stateless.
    """
    self.exprStack = []

    def pushStack(s, l, t):
        # Append the first matched token (number or operator) to the stack.
        self.exprStack.append(t[0])

    integer = Word(nums).addParseAction(pushStack)
    addop = Literal('+') | Literal('-')
    mulop = Literal('*') | Literal('/')
    lpar = Literal('(')
    rpar = Literal(')')
    expr = Forward()
    atom = integer | lpar + expr + rpar
    term = atom + ZeroOrMore((mulop + atom).addParseAction(pushStack))
    expr << term + ZeroOrMore((addop + term).addParseAction(pushStack))
    self.expr = expr + StringEnd()
    # Map operator symbols to the calculator's binary operations.
    # Fixed: the '-' and '*' entries were truncated in the original
    # source (missing second argument and closing parens); restored to
    # mirror the '+' and '/' entries.
    self.opfun = {
        '+': (lambda a, b: calc.add(a, b)),
        '-': (lambda a, b: calc.sub(a, b)),
        '*': (lambda a, b: calc.mul(a, b)),
        '/': (lambda a, b: calc.div(a, b))}
def anything_beetween(opener_and_closer):
    """Builds a (pyparsing) parser for the content inside delimiters.

    Args:
      opener_and_closer: a string containing two elements: opener and closer

    Returns:
      A (pyparsing) parser for the content inside delimiters.
    """
    opener = pyparsing.Literal(opener_and_closer[0])
    closer = pyparsing.Literal(opener_and_closer[1])
    # Map each delimiter's code point to None so translate() removes it.
    char_removal_mapping = dict.fromkeys(map(ord, opener_and_closer))
    # Fixed for Python 3 compatibility: `unicode` only exists on Python 2.
    # A dict-based translate() needs a text (not bytes) receiver, which
    # both `unicode` (py2) and `str` (py3) provide.
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str  # Python 3
    other_chars = text_type(string.printable).translate(char_removal_mapping)
    word_without_delimiters = pyparsing.Word(other_chars).setName(
        "other_chars")
    anything = pyparsing.Forward()
    delimited_block = opener + anything + closer
    # pylint: disable=expression-not-assigned
    anything << pyparsing.ZeroOrMore(
        word_without_delimiters.setName("word_without_delimiters")
        | delimited_block.setName("delimited_block")
    )
    # Combine all the parts into a single string.
    return pyparsing.Combine(anything)
def anything_beetween(opener_and_closer):
    """Builds a (pyparsing) parser for the content inside delimiters.

    Args:
      opener_and_closer: a string containing two elements: opener and closer

    Returns:
      A (pyparsing) parser for the content inside delimiters.
    """
    opener = pyparsing.Literal(opener_and_closer[0])
    closer = pyparsing.Literal(opener_and_closer[1])
    # Map each delimiter's code point to None so translate() removes it.
    char_removal_mapping = dict.fromkeys(map(ord, opener_and_closer))
    # Fixed for Python 3 compatibility: `unicode` only exists on Python 2.
    # A dict-based translate() needs a text (not bytes) receiver, which
    # both `unicode` (py2) and `str` (py3) provide.
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str  # Python 3
    other_chars = text_type(string.printable).translate(char_removal_mapping)
    word_without_delimiters = pyparsing.Word(other_chars).setName(
        "other_chars")
    anything = pyparsing.Forward()
    delimited_block = opener + anything + closer
    # pylint: disable=expression-not-assigned
    anything << pyparsing.ZeroOrMore(
        word_without_delimiters.setName("word_without_delimiters")
        | delimited_block.setName("delimited_block")
    )
    # Combine all the parts into a single string.
    return pyparsing.Combine(anything)
def parse_format(format):
    """Parse a ``{var}``-style format string into a list of element objects.

    Recognizes variables (``{name}``), escaped braces (``{{`` / ``}}``)
    and plain literal text.  An unrecognized element kind raises
    FormatStringError.
    """
    definition = []
    # define pattern grammar
    variable_ptn = pp.QuotedString("{", endQuoteChar="}")("variable")
    escape_open_ptn = pp.Literal("{{")("escape_open")
    escape_close_ptn = pp.Literal("}}")("escape_close")
    escape_ptn = escape_open_ptn | escape_close_ptn
    # Fixed: the pyparsing class is CharsNotIn; the original source had
    # the mis-capitalized (nonexistent) pp.Charsnotin.
    literal_ptn = pp.CharsNotIn("{}")("literal")
    # Escapes are tried before variables so "{{" is not read as a variable.
    element_ptn = escape_ptn | variable_ptn | literal_ptn
    for toks, start, end in element_ptn.leaveWhitespace().scanString(format):
        # toks.items()[0][0] is the result name of the matched alternative.
        try:
            definition.append({
                "literal": lambda: Literal(toks[0]),
                "variable": lambda: Variable.create(toks[0]),
                "escape_open": lambda: OpenBrace(),
                "escape_close": lambda: CloseBrace(),
            }[toks.items()[0][0]]())
        except KeyError:
            raise FormatStringError
    return definition
def FromString(cls, desc):
    """Parse this stop condition from a string representation.

    The string needs to match:
        run_time number [seconds|minutes|hours|days|months|years]

    Args:
        desc (str): The description

    Returns:
        TimeBasedStopCondition

    Raises:
        ArgumentError: if *desc* does not match the grammar above.
    """
    # `time_interval` is a module-level grammar; its parse action stores
    # the converted value under the 'interval' result name.
    parse_exp = Literal(u'run_time').suppress() + time_interval(u'interval')
    try:
        data = parse_exp.parseString(desc)
        return TimeBasedStopCondition(data[u'interval'][0])
    except ParseException:
        raise ArgumentError(u"Could not parse time based stop condition")
def _create_simple_statements():
    """Populate the module-level simple-statement grammar (idempotent).

    Defines meta/require/set/call/streamer/copy/trigger statement rules
    plus a catch-all generic statement used for error reporting.  Returns
    immediately if the grammar was already built.
    """
    global binary, ident, rvalue, simple_statement, semi, comp, number, slot_id, callrpc_stmt, generic_statement, streamer_stmt, stream, selector
    if simple_statement is not None:
        return
    Meta_stmt = Group(Literal('Meta').suppress() + ident + Literal('=').suppress() + rvalue + semi).setResultsName('Meta_statement')
    require_stmt = Group(Literal('require').suppress() + ident + comp + rvalue + semi).setResultsName('require_statement')
    set_stmt = Group(Literal('set').suppress() - (ident | number) - Literal("to").suppress() - (rvalue | binary) - Optional(Literal('as').suppress() + config_type) + semi).setResultsName('set_statement')
    callrpc_stmt = Group(Literal("call").suppress() + (ident | number) + Literal("on").suppress() + slot_id + Optional(Literal("=>").suppress() + stream('explicit_stream')) + semi).setResultsName('call_statement')
    streamer_stmt = Group(Optional(Literal("manual")('manual')) + Optional(oneOf(u'encrypted signed')('security')) + Optional(Literal(u'realtime')('realtime')) + Literal('streamer').suppress() -
                          Literal('on').suppress() - selector('selector') - Optional(Literal('to').suppress() - slot_id('explicit_tile')) - Optional(Literal('with').suppress() - Literal('streamer').suppress() - number('with_other')) - semi).setResultsName('streamer_statement')
    copy_stmt = Group(Literal("copy").suppress() - Optional(oneOf("all count average")('modifier')) - Optional(stream('explicit_input') | number('constant_input')) - Literal("=>") - stream("output") - semi).setResultsName('copy_statement')
    trigger_stmt = Group(Literal("trigger") - Literal("streamer") - number('index') - semi).setResultsName('trigger_statement')
    simple_statement = Meta_stmt | require_stmt | set_stmt | callrpc_stmt | streamer_stmt | trigger_stmt | copy_stmt
    # In generic statements, keep track of the location where the match
    # started for error handling.
    # Fixed: the parse action must accept (s, loc, toks) and return the
    # location; the original two-argument lambda returned an undefined
    # name ``l`` (NameError at parse time).
    locator = Empty().setParseAction(lambda s, l, t: l)('location')
    generic_statement = Group(locator + Group(ZeroOrMore(Regex(u"[^{};]+")) + Literal(u';'))('match')).setResultsName('unparsed_statement')
def _create_block_bnf():
    """Populate the module-level block grammar (every/when/config/on blocks).

    Idempotent: returns immediately once ``block_bnf`` has been built.
    Relies on primitives (stream, ident, slot_id, time_interval, ...)
    created elsewhere in this module.
    """
    global block_bnf, time_interval, statement, block_id, stream
    if block_bnf is not None:
        return
    # A trigger is a stream trigger, a bare stream, or a named identifier.
    trigger_clause = Group(stream_trigger | Group(stream).setResultsName('stream_always') | Group(ident).setResultsName('identifier'))
    every_block_id = Group(Literal(u'every').suppress() - time_interval).setResultsName('every_block')
    when_block_id = Group(Literal(u'when').suppress() + Literal("connected").suppress() - Literal("to").suppress() - slot_id).setResultsName('when_block')
    latch_block_id = Group(Literal(u'when').suppress() - stream_trigger).setResultsName('latch_block')
    config_block_id = Group(Literal(u'config').suppress() - slot_id).setResultsName('config_block')
    on_block_id = Group(Literal(u'on').suppress() - trigger_clause.setResultsName('triggerA') - Optional((Literal("and") | Literal("or")) - trigger_clause.setResultsName('triggerB'))).setResultsName('on_block')
    block_id = every_block_id | when_block_id | latch_block_id | config_block_id | on_block_id
    # Blocks nest: a block body may contain statements or further blocks,
    # hence the Forward declaration.
    block_bnf = Forward()
    statement = generic_statement | block_bnf
    block_bnf << Group(block_id + Group(Literal(u'{').suppress() + ZeroOrMore(statement) + Literal(u'}').suppress())).setResultsName('block')
def make_grammar():
    """Creates the grammar to be used by a spec matcher.

    Returns a fresh parser each call: pyparsing parsers are not
    thread-safe, so each thread should build its own.
    """
    # This is apparently how pyparsing recommends to be used,
    # as http://pyparsing.wikispaces.com/share/view/644825 states that
    # it is not thread-safe to use a parser across threads.
    unary_ops = (
        # Order matters here (so that '=' doesn't match before '==')
        Literal("==") | Literal("=") |
        Literal("!=") | Literal("<in>") |
        Literal(">=") | Literal("<=") |
        Literal(">") | Literal("<") |
        Literal("s==") | Literal("s!=") |
        # Order matters here (so that 's<' doesn't match before 's<=')
        Literal("s<=") | Literal("s<") |
        # Order matters here (so that 's>' doesn't match before 's>=')
        Literal("s>=") | Literal("s>"))
    or_ = Literal("<or>")
    # An atom is anything that is not a keyword, followed by anything
    # but whitespace.
    atom = ~(unary_ops | or_) + Regex(r"\S+")
    unary = unary_ops + atom
    disjunction = OneOrMore(or_ + atom)
    # Even-numbered tokens will be '<or>', so we drop them
    disjunction.setParseAction(lambda _s, _l, t: ["<or>"] + t[1::2])
    expr = disjunction | unary | atom
    return expr
def parse_filter_str(self, filter_str):
    """Parse *filter_str* into a boolean filter expression.

    A term looks like ``(prop, value[, type='re|eq|...'][, flag='C|I'])``
    and terms combine with not/and/or.  Each parenthesized term is
    handed to ``self.parse_filter_obj`` via a parse action.
    """
    prop = pp.WordStart(pp.alphas) + pp.Word(pp.alphanums +
                                             "_").setResultsName("prop")
    value = (pp.QuotedString("'") | pp.QuotedString('"') | pp.Word(
        pp.printables, excludeChars=",")).setResultsName("value")
    types_ = pp.oneOf("re eq ne gt ge lt le").setResultsName("types")
    flags = pp.oneOf("C I").setResultsName("flags")
    comma = pp.Literal(',')
    quote = (pp.Literal("'") | pp.Literal('"')).setResultsName("quote")
    type_exp = pp.Group(pp.Literal("type") + pp.Literal(
        "=") + quote + types_ + quote).setResultsName("type_exp")
    flag_exp = pp.Group(pp.Literal("flag") + pp.Literal(
        "=") + quote + flags + quote).setResultsName("flag_exp")
    semi_expression = pp.Forward()
    semi_expression << pp.Group(pp.Literal("(") +
                                prop + comma + value +
                                pp.Optional(comma + type_exp) +
                                pp.Optional(comma + flag_exp) +
                                pp.Literal(")")
                                ).setParseAction(
        self.parse_filter_obj).setResultsName("semi_expression")
    expr = pp.Forward()
    # Fixed: operatorPrecedence operator specs are 4-tuples
    # (op, arity, assoc, action); the 'or' entry was missing its arity
    # and associativity, which raises at grammar-construction time.
    expr << pp.operatorPrecedence(semi_expression, [
        ("not", 1, pp.opAssoc.RIGHT, self.not_operator),
        ("and", 2, pp.opAssoc.LEFT, self.and_operator),
        ("or", 2, pp.opAssoc.LEFT, self.or_operator)
    ])
    result = expr.parseString(filter_str)
    return result
def parseformat(classname=None, formatstring=None):
    """Split *formatstring* on '||' into cells and compile each cell into
    a list of om.Filler / om.AttributeMatch objects.

    Attribute references look like '@name' or '!name', optionally
    preceded by '+' (concatenation marker).  Text between references
    becomes Filler objects.
    """
    attribmarker = (p.Literal('@')|p.Literal('!')).suppress()
    cellseparator = '||'
    concatmarker = p.Optional(p.Literal('+'))
    attribgroup = attribmarker + concatmarker + p.Word(p.alphanums)
    cells = []
    _splitstring = [cell.strip() for cell in formatstring.split(cellseparator)]
    for cell in _splitstring:
        _scan = attribgroup.scanString(cell)
        _templist = []
        prestart = 0
        end = 0
        for match in _scan:
            # scanString yields (tokens, start, end); slice out the
            # literal text that precedes this attribute reference.
            start = match[1]
            end = match[2]
            _start = cell[prestart:start]
            if len(_start) > 0:
                # conditional logic avoids empty leading output cells
                _templist.append(om.Filler(_start))
            # start + 1 skips the leading '@'/'!' marker character.
            _templist.append(om.AttributeMatch(cell[start + 1:end])) #,classname=classname))
            prestart = end
        # print('templist:',_templist)
        # Trailing literal text after the last match (whole cell if no match).
        _end = cell[end:]
        if len(_end) > 0:
            # conditional logic avoids empty trailing output cells
            _templist.append(om.Filler(cell[end:]))
        cells.append(_templist)
    return cells
# --- static ---
def _build_input_source_parser(legalChars, commentInProgress):
    """Builds a PyParsing parser for alternate user input sources (from file, pipe, etc.)"""
    # '<' marks input redirection; its parse action drops the marker.
    input_mark = pyparsing.Literal('<')
    input_mark.setParseAction(lambda x: '')
    # File names may include forward or back slashes (either platform).
    file_name = pyparsing.Word(legalChars + '/\\')
    input_from = file_name('inputFrom')
    # Replace the matched file name with that file's contents.
    input_from.setParseAction(replace_with_file_contents)
    # a not-entirely-satisfactory way of distinguishing < as in "import from" from <
    # as in "lesser than"
    inputParser = input_mark + pyparsing.Optional(input_from) + pyparsing.Optional('>') + \
        pyparsing.Optional(file_name) + (pyparsing.stringEnd | '|')
    inputParser.ignore(commentInProgress)
    return inputParser
def __init__(self, ffilter, queue_out):
    """Filter thread: build a pyparsing grammar over c/l/w/h result fields.

    Args:
        ffilter: dict of filter parameters; must contain a 'codes' list.
        queue_out: downstream queue for results that pass the filter.
    """
    FuzzQueue.__init__(self, queue_out)
    Thread.__init__(self)
    self.setName('filter_thread')
    self.queue_out = queue_out
    if PYPARSING:
        element = oneOf("c l w h")
        # 'X' and 'B' are accepted alongside digits -- presumably
        # wildcard/baseline markers; confirm against the comparison code.
        digits = "XB0123456789"
        integer = Word( digits )#.setParseAction( self.__convertIntegers )
        elementRef = Group(element + oneOf("= != < > >= <=") + integer)
        operator = oneOf("and or")
        deFinition = elementRef + ZeroOrMore( operator + elementRef)
        # Parentheses are optional and simply suppressed when present.
        nestedformula = Group(Suppress(Optional(Literal("("))) + deFinition + Suppress(Optional(Literal(")"))))
        self.finalformula = nestedformula + ZeroOrMore( operator + nestedformula)
        elementRef.setParseAction(self.__compute_element)
        nestedformula.setParseAction(self.__compute_formula)
        self.finalformula.setParseAction(self.__myreduce)
    self.res = None
    self.hideparams = ffilter
    # 'XXX' in the codes list also matches responses reported as code 0.
    if "XXX" in self.hideparams['codes']:
        self.hideparams['codes'].append("0")
    self.baseline = None
def __init__(self):
    """Build a category-filter grammar: category names combined with
    and/or/',' plus an optional leading 'not', with optional parentheses.
    """
    if PYPARSING:
        # Category names may contain '-', '_' and '*' characters.
        category = Word( alphas + "_-*", alphanums + "_-*" )
        operator = oneOf("and or,")
        neg_operator = "not"
        elementRef = category
        deFinition = elementRef + ZeroOrMore( operator + elementRef)
        # Parentheses are optional and suppressed when present.
        nestedformula = Group(Suppress(Optional(Literal("("))) + deFinition + Suppress(Optional(Literal(")"))))
        neg_nestedformula = Optional(neg_operator) + nestedformula
        self.finalformula = neg_nestedformula + ZeroOrMore( operator + neg_nestedformula)
        elementRef.setParseAction(self.__compute_element)
        neg_nestedformula.setParseAction(self.__compute_neg_formula)
        nestedformula.setParseAction(self.__compute_formula)
        self.finalformula.setParseAction(self.__myreduce)
def getrule():
    """
    Using pyparsing, get rule out of a string.

    A rule is one or more LHS terms, then '==>', then one or more RHS
    terms.  Each term is a special-value prefix (from _LHSCONVENTIONS /
    _RHSCONVENTIONS), a buffer name, '>' and an optional chunk.
    """
    arrow = pp.Literal("==>")
    # Buffer name: letter first, then letters/digits/underscore.
    buff = pp.Word(pp.alphas, "".join([pp.alphanums, "_"]))
    special_valueLHS = pp.oneOf([x for x in _LHSCONVENTIONS.keys()])
    end_buffer = pp.Literal(">")
    special_valueRHS = pp.oneOf([x for x in _RHSCONVENTIONS.keys()])
    chunk = getchunk()
    rule_reader = pp.Group(pp.OneOrMore(pp.Group(special_valueLHS + buff + end_buffer + pp.Group(pp.Optional(chunk))))) + arrow + pp.Group(pp.OneOrMore(pp.Group(special_valueRHS + buff + end_buffer + pp.Group(pp.Optional(chunk)))))
    return rule_reader
def __update_grammar(self, grammar):
    """Point the grammar's register-name rule at this register set.

    Mutates *grammar* in place (via the Forward '<<' operator) and
    returns it for convenience.
    """
    # Snapshot the register names once and reuse the list -- the
    # original built the same list twice and left the first unused.
    registers = list(self.registers)
    grammar.kRegisterNames << pp.Or(map(pp.Literal, registers))
    return grammar
def parse_search_query(query):
    """Parse *query* into Text/Exact/Comparison search nodes.

    NOTE(review): uses ``unichr``/``xrange``, so this function is
    Python 2 only as written.
    """
    # Every non-whitespace character in the BMP counts as a word character.
    unicode_printables = u''.join(unichr(c) for c in xrange(65536) if not unichr(c).isspace())
    word = TextNode.group(Word(unicode_printables))
    # Double-quoted phrases become exact-match nodes.
    exact = ExactNode.group(QuotedString('"', unquoteResults=True, escChar='\\'))
    term = exact | word
    # 'name:term' comparisons; ':' is excluded from the name itself.
    comparison_name = Word(unicode_printables, excludeChars=':')
    comparison = ComparisonNode.group(comparison_name + Literal(':') + term)
    content = OneOrMore(comparison | term)
    return content.parseString(query)
def __init__(self, ffilter, queue_out):
    """Filter thread: build a pyparsing grammar over c/l/w/h result fields.

    (Restored the corrupted ``def`` line: the original was missing the
    trailing colon and the ``ffilter`` parameter, which the body reads
    at ``self.hideparams = ffilter``.  Signature mirrors the identical
    constructor earlier in this file.)
    """
    Thread.__init__(self)
    self.setName('filter_thread')
    self.queue_out = queue_out
    if PYPARSING:
        element = oneOf("c l w h")
        # 'X' and 'B' accepted alongside digits -- presumably
        # wildcard/baseline markers; confirm against the comparison code.
        digits = "XB0123456789"
        integer = Word( digits )#.setParseAction( self.__convertIntegers )
        elementRef = Group(element + oneOf("= != < > >= <=") + integer)
        operator = oneOf("and or")
        deFinition = elementRef + ZeroOrMore( operator + elementRef)
        nestedformula = Group(Suppress(Optional(Literal("("))) + deFinition + Suppress(Optional(Literal(")"))))
        self.finalformula = nestedformula + ZeroOrMore( operator + nestedformula)
        elementRef.setParseAction(self.__compute_element)
        nestedformula.setParseAction(self.__compute_formula)
        self.finalformula.setParseAction(self.__myreduce)
    self.res = None
    self.hideparams = ffilter
    # 'XXX' in the codes list also matches responses reported as code 0.
    if "XXX" in self.hideparams['codes']:
        self.hideparams['codes'].append("0")
    self.baseline = None
def __init__(self):
    """Build a category-filter grammar (corrupted lines restored).

    The truncated ``category`` Word and the missing ``operator``
    definition were reconstructed from the identical constructor
    earlier in this file.
    """
    if PYPARSING:
        # Restored: second Word argument and the operator definition.
        category = Word( alphas + "_-*", alphanums + "_-*" )
        operator = oneOf("and or,")
        neg_operator = "not"
        elementRef = category
        definition = elementRef + ZeroOrMore( operator + elementRef)
        nestedformula = Group(Suppress(Optional(Literal("("))) + definition + Suppress(Optional(Literal(")"))))
        neg_nestedformula = Optional(neg_operator) + nestedformula
        self.finalformula = neg_nestedformula + ZeroOrMore( operator + neg_nestedformula)
        elementRef.setParseAction(self.__compute_element)
        neg_nestedformula.setParseAction(self.__compute_neg_formula)
        nestedformula.setParseAction(self.__compute_formula)
        self.finalformula.setParseAction(self.__myreduce)
def parse_filter_str(self, filter_str):
    """Parse *filter_str* into a boolean filter expression.

    (The original definition was truncated to its last four lines; the
    body was reconstructed from the identical method earlier in this
    file, including the repaired 'or' operator specification.)
    """
    prop = pp.WordStart(pp.alphas) + pp.Word(pp.alphanums +
                                             "_").setResultsName("prop")
    value = (pp.QuotedString("'") | pp.QuotedString('"') | pp.Word(
        pp.printables, excludeChars=",")).setResultsName("value")
    types_ = pp.oneOf("re eq ne gt ge lt le").setResultsName("types")
    flags = pp.oneOf("C I").setResultsName("flags")
    comma = pp.Literal(',')
    quote = (pp.Literal("'") | pp.Literal('"')).setResultsName("quote")
    type_exp = pp.Group(pp.Literal("type") + pp.Literal(
        "=") + quote + types_ + quote).setResultsName("type_exp")
    flag_exp = pp.Group(pp.Literal("flag") + pp.Literal(
        "=") + quote + flags + quote).setResultsName("flag_exp")
    semi_expression = pp.Forward()
    semi_expression << pp.Group(pp.Literal("(") +
                                prop + comma + value +
                                pp.Optional(comma + type_exp) +
                                pp.Optional(comma + flag_exp) +
                                pp.Literal(")")
                                ).setParseAction(
        self.parse_filter_obj).setResultsName("semi_expression")
    expr = pp.Forward()
    expr << pp.operatorPrecedence(semi_expression, [
        ("not", 1, pp.opAssoc.RIGHT, self.not_operator),
        ("and", 2, pp.opAssoc.LEFT, self.and_operator),
        ("or", 2, pp.opAssoc.LEFT, self.or_operator)
    ])
    result = expr.parseString(filter_str)
    return result
def __update_grammar(self, grammar):
    """Point the grammar's register-name rule at this register set.

    (The original ``def`` line was truncated; signature and body were
    restored from the identical method earlier in this file.)
    """
    grammar.kRegisterNames << pp.Or(map(pp.Literal, list(self.registers)))
    return grammar
def parse(self, ping_message):
    """Parse Linux ``ping`` statistics output.

    Fills in transmitted/received packet counts, the duplicate count,
    and (when present) the rtt min/avg/max/mdev values.
    """
    headline, packet_info_line, body_line_list = self._preprocess_parse(
        line_list=ping_message)
    # e.g. "4 packets transmitted, 4 received, ..."
    packet_pattern = (
        pp.Word(pp.nums) +
        pp.Literal("packets transmitted,") +
        pp.Word(pp.nums) +
        pp.Literal("received,")
    )
    self._destination = self._parse_destination(headline)
    parse_list = packet_pattern.parseString(_to_unicode(packet_info_line))
    self._packet_transmit = int(parse_list[0])
    self._packet_receive = int(parse_list[2])
    self._duplicates = self.__parse_duplicate(packet_info_line)
    try:
        rtt_line = body_line_list[1]
    except IndexError:
        # No rtt summary line (e.g. all packets lost); leave rtt unset.
        return
    if typepy.is_null_string(rtt_line):
        return
    # e.g. "rtt min/avg/max/mdev = 0.1/0.2/0.3/0.05 ms"
    rtt_pattern = (
        pp.Literal("rtt min/avg/max/mdev =") +
        pp.Word(pp.nums + ".") + "/" +
        pp.Word(pp.nums + ".") + "/" +
        pp.Word(pp.nums + ".") + "/" +
        pp.Word(pp.nums + ".") +
        pp.Word(pp.nums + "ms")
    )
    parse_list = rtt_pattern.parseString(_to_unicode(rtt_line))
    # Tokens: [label, min, '/', avg, '/', max, '/', mdev, unit].
    self._rtt_min = float(parse_list[1])
    self._rtt_avg = float(parse_list[3])
    self._rtt_max = float(parse_list[5])
    self._rtt_mdev = float(parse_list[7])
def __parse_duplicate(line):
    """Return the duplicate-packet count parsed from *line*, or 0.

    Looks for e.g. "+2 duplicates," in the packet summary line.
    """
    packet_pattern = (
        pp.SkipTo(pp.Word("+" + pp.nums) + pp.Literal("duplicates,")) +
        pp.Word("+" + pp.nums) +
        pp.Literal("duplicates,")
    )
    try:
        duplicate_parse_list = packet_pattern.parseString(
            _to_unicode(line))
    except pp.ParseException:
        # No duplicates reported on this line.
        return 0
    # Second-to-last token is the "+N" count; strip the sign prefix.
    return int(duplicate_parse_list[-2].strip("+"))
def parse(self, ping_message):
    """Parse Windows ``ping`` statistics output.

    (The original ``def`` line was corrupted -- it had fused with the
    first body statement.  The signature and the unpacking of
    ``_preprocess_parse`` were restored to match the sibling parsers
    in this file; the remaining body is unchanged.)
    """
    headline, packet_info_line, body_line_list = self._preprocess_parse(
        line_list=ping_message)
    # e.g. "Packets: Sent = 4,Received = 4, ..."
    packet_pattern = (
        pp.Literal("Packets: Sent = ") +
        pp.Word(pp.nums) +
        pp.Literal(",Received = ") +
        pp.Word(pp.nums)
    )
    self._destination = self._parse_destination(headline)
    parse_list = packet_pattern.parseString(_to_unicode(packet_info_line))
    self._packet_transmit = int(parse_list[1])
    self._packet_receive = int(parse_list[3])
    try:
        rtt_line = body_line_list[2].strip()
    except IndexError:
        return
    if typepy.is_null_string(rtt_line):
        return
    rtt_pattern = (
        pp.Literal("Minimum = ") +
        pp.Word(pp.nums) +
        pp.Literal("ms,Maximum = ") +
        pp.Word(pp.nums) +
        pp.Literal("ms,Average = ") +
        pp.Word(pp.nums)
    )
    parse_list = rtt_pattern.parseString(_to_unicode(rtt_line))
    # Token order follows the pattern: Minimum(1), Maximum(3), Average(5),
    # so the assignments below are intentionally "out of order".
    self._rtt_min = float(parse_list[1])
    self._rtt_avg = float(parse_list[5])
    self._rtt_max = float(parse_list[3])
def parse(self, ping_message):
    """Parse BSD/macOS style ``ping`` statistics output.

    (The original header lines were corrupted; the signature, the
    preprocessing call and the leading part of the packet pattern were
    reconstructed from the sibling parsers in this file.  The visible
    tail -- "packets received," and the round-trip min/avg/max/stddev
    pattern -- is unchanged.)
    """
    headline, packet_info_line, body_line_list = self._preprocess_parse(
        line_list=ping_message)
    packet_pattern = (
        pp.Word(pp.nums) +
        pp.Literal("packets transmitted,") +
        pp.Word(pp.nums) +
        pp.Literal("packets received,")
    )
    self._destination = self._parse_destination(headline)
    parse_list = packet_pattern.parseString(_to_unicode(packet_info_line))
    self._packet_transmit = int(parse_list[0])
    self._packet_receive = int(parse_list[2])
    try:
        rtt_line = body_line_list[1]
    except IndexError:
        return
    if typepy.is_null_string(rtt_line):
        return
    rtt_pattern = (
        pp.Literal("round-trip min/avg/max/stddev =") +
        pp.Word(pp.nums + ".") + "/" +
        pp.Word(pp.nums + ".") + "/" +
        pp.Word(pp.nums + ".") + "/" +
        pp.Word(pp.nums + ".") +
        pp.Word(pp.nums + "ms")
    )
    parse_list = rtt_pattern.parseString(_to_unicode(rtt_line))
    # Tokens: [label, min, '/', avg, '/', max, '/', stddev, unit].
    self._rtt_min = float(parse_list[1])
    self._rtt_avg = float(parse_list[3])
    self._rtt_max = float(parse_list[5])
    self._rtt_mdev = float(parse_list[7])
def parse(self, ping_message):
    """Parse ping output with a 3-value "round-trip min/avg/max" summary.

    (The original header lines were corrupted; the signature, the
    preprocessing call and the packet pattern were reconstructed from
    the sibling parsers in this file, keeping the visible body intact.)
    """
    headline, packet_info_line, body_line_list = self._preprocess_parse(
        line_list=ping_message)
    packet_pattern = (
        pp.Word(pp.nums) +
        pp.Literal("packets transmitted,") +
        pp.Word(pp.nums) +
        pp.Literal("packets received,")
    )
    self._destination = self._parse_destination(headline)
    parse_list = packet_pattern.parseString(_to_unicode(packet_info_line))
    self._packet_transmit = int(parse_list[0])
    self._packet_receive = int(parse_list[2])
    self._duplicates = self.__parse_duplicate(packet_info_line)
    try:
        rtt_line = body_line_list[1]
    except IndexError:
        return
    if typepy.is_null_string(rtt_line):
        return
    rtt_pattern = (
        pp.Literal("round-trip min/avg/max =") +
        pp.Word(pp.nums + ".") + "/" +
        pp.Word(pp.nums + ".") + "/" +
        pp.Word(pp.nums + ".") +
        pp.Word(pp.nums + "ms")
    )
    parse_list = rtt_pattern.parseString(_to_unicode(rtt_line))
    # Tokens: [label, min, '/', avg, '/', max, unit]; no mdev field here.
    self._rtt_min = float(parse_list[1])
    self._rtt_avg = float(parse_list[3])
    self._rtt_max = float(parse_list[5])
def wrapped_elem(wrapper, elem):
    """Return *elem* surrounded on both sides by the suppressed
    *wrapper* literal (the delimiters do not appear in the results)."""
    delimiter = pp.Literal(wrapper).suppress()
    return delimiter + elem + delimiter
def parser(self):
    """Build a parser for Python-literal-like values.

    Accepts ints, reals, ``datetime(...)`` calls, quoted strings,
    tuples, lists and dicts (trailing commas allowed), converting each
    to a native Python object via parse actions.  Returns the
    ``list_item`` rule, which matches any one of these values.
    """
    # Define punctuation as suppressed literals.
    lparen, rparen, lbrack, rbrack, lbrace, rbrace, colon = \
        map(pp.Suppress, "()[]{}:")
    integer = pp.Combine(pp.Optional(pp.oneOf("+ -")) + pp.Word(pp.nums)) \
        .setName("integer") \
        .setParseAction(lambda toks: int(toks[0]))
    real = pp.Combine(pp.Optional(pp.oneOf("+ -")) + pp.Word(pp.nums) + "." +
                      pp.Optional(pp.Word(pp.nums)) +
                      pp.Optional(pp.oneOf("e E") + pp.Optional(pp.oneOf("+ -")) + pp.Word(pp.nums))) \
        .setName("real") \
        .setParseAction(lambda toks: float(toks[0]))
    # datetime(a, b, ...) -> self._make_datetime([a, b, ...])
    _datetime_arg = (integer | real)
    datetime_args = pp.Group(pp.delimitedList(_datetime_arg))
    _datetime = pp.Suppress(pp.Literal('datetime') + pp.Literal("(")) + datetime_args + pp.Suppress(")")
    _datetime.setParseAction(lambda x: self._make_datetime(x[0]))
    tuple_str = pp.Forward()
    list_str = pp.Forward()
    dict_str = pp.Forward()
    # `real` is tried before `integer` so "1.5" is not read as just 1.
    list_item = real | integer | _datetime | pp.quotedString.setParseAction(pp.removeQuotes) | \
        pp.Group(list_str) | tuple_str | dict_str
    tuple_str << (pp.Suppress("(") + pp.Optional(pp.delimitedList(list_item)) +
                  pp.Optional(pp.Suppress(",")) + pp.Suppress(")"))
    tuple_str.setParseAction(lambda toks : tuple(toks.asList()))
    list_str << (lbrack + pp.Optional(pp.delimitedList(list_item) +
                 pp.Optional(pp.Suppress(","))) + rbrack)
    dict_entry = pp.Group(list_item + colon + list_item)
    dict_str << (lbrace + pp.Optional(pp.delimitedList(dict_entry) +
                 pp.Optional(pp.Suppress(","))) + rbrace)
    dict_str.setParseAction(lambda toks: dict(toks.asList()))
    return list_item
def number_parser():
    """Grammar matching a numeric literal: decimal/float (with optional
    exponent), 0b binary, 0x hex, or 0[o] octal.

    Uses pyparsing's '^' (Or, longest-match) so e.g. "0x1A" prefers the
    hex form over a bare leading-"0" octal match.
    """
    point = pp.Literal(".")
    e = pp.CaselessLiteral("e")
    plusorminus = pp.Literal("+") ^ pp.Literal("-")
    num = pp.Word(pp.nums)
    dec = pp.Combine(num + pp.Optional(point + pp.Optional(num)) + pp.Optional(e + pp.Optional(plusorminus) + num)) ^\
        pp.Combine(point + pp.Optional(num) + pp.Optional(e + pp.Optional(plusorminus) + num))
    # Renamed from bin/hex/oct so the builtins are not shadowed.
    bin_num = pp.Combine(pp.Literal("0") + pp.CaselessLiteral("b") + pp.Word("01"))
    hex_num = pp.Combine(pp.Literal("0") + pp.CaselessLiteral("x") + pp.Word(pp.hexnums))
    oct_num = pp.Combine(pp.Literal("0") + pp.Optional(pp.CaselessLiteral("o")) + pp.Word("01234567"))
    return dec ^ bin_num ^ hex_num ^ oct_num
def XXXX_cast_expression(self):
    """A function returning a parser for parsing cast expressions.

    Returns:
      A (pyparsing) parser for parsing cast expressions.

    NOTE(review): despite the original docstring's Args section, this
    method takes no ``expression`` argument -- it reads
    ``self.expression`` instead.
    """
    word = pyparsing.Word(pyparsing.alphanums + '_*[]')
    # A parenthesized, possibly nested, type name such as
    # "(unsigned int *)"; the parentheses themselves are suppressed.
    nested = pyparsing.Forward().setName("nested")
    nested << pyparsing.Combine(
        pyparsing.Literal('(').suppress()
        + pyparsing.Combine(
            pyparsing.ZeroOrMore(self._integer() | word | nested))
        + pyparsing.Literal(')').suppress()
    )
    typeof_expression = (
        _OPEN_PARENTHESIS
        + pyparsing.Keyword('typeof')
        + nested("typeof_arg")
        + _CLOSE_PARENTHESIS
    )
    type_expression = (
        typeof_expression
        | nested("simple_type")
    )
    # The negative lookahead rejects "(x) + y" style arithmetic so only
    # genuine casts are matched.
    return (
        type_expression
        + ~(_PLUS | _MINUS)
        + self.expression("expression")
    ).setParseAction(self._create_cast_expression)
def _integer(self):
    """Parser for integer literals; yields a Python int (suffixes dropped)."""
    integer = self._hexadecimal_as_string() | self._decimal_as_string()
    # Python does not care about suffixes so we just drop them.
    possible_suffix = pyparsing.Literal('u') | 'U' | 'll' | 'LL' | 'l' | 'L'
    maybe_suffix = (
        pyparsing.ZeroOrMore(possible_suffix)
    ).suppress()
    # base=0 lets int() auto-detect hex/octal/decimal prefixes.
    return (
        integer
        + maybe_suffix
    ).setParseAction(util.action(lambda x: int(x, base=0)))
def _define_function_like(self):
    """Parser for function-like macro definitions: NAME(args...) replacement.

    ``leaveWhitespace()`` requires '(' to directly follow the name,
    distinguishing function-like from object-like macros.
    """
    return (
        (_IDENTIFIER.setResultsName("name")
         + _OPEN_PARENTHESES).leaveWhitespace()
        + pyparsing.Optional(
            pyparsing.delimitedList(
                _IDENTIFIER
                | pyparsing.Literal("...")  # vararg macro.
            )).setResultsName("arguments")
        + _CLOSE_PARENTHESES
        + pyparsing.restOfLine.setResultsName("replacement")
    ).setParseAction(self._add_function_like)
def grammar():
    """Define the query grammar.

    Backus-Naur form (BNF) of the grammar::

        <grammar> ::= <item> | <item> <boolean> <grammar>
        <item>    ::= <hosts> | "(" <grammar> ")"
        <boolean> ::= "and not" | "and" | "xor" | "or"

    Given that the pyparsing library defines the grammar in a BNF-like
    style, for the details of the tokens not specified above check
    directly the source code.

    Returns:
        pyparsing.ParserElement: the grammar parser.
    """
    # Boolean operators; 'and not' is listed first so it wins over 'and'.
    boolean = (pp.CaselessKeyword('and not').leaveWhitespace() | pp.CaselessKeyword('and') |
               pp.CaselessKeyword('xor') | pp.CaselessKeyword('or'))('bool')
    # Parentheses
    lpar = pp.Literal('(')('open_subgroup')
    rpar = pp.Literal(')')('close_subgroup')
    # Hosts selection: clustershell (,!&^[]) syntax is allowed: host10[10-42].domain
    hosts = (~(boolean) + pp.Word(pp.alphanums + '-_.,!&^[]'))('hosts')
    # Final grammar, see the docstring for its BNF based on the tokens defined above
    # Groups are used to split the parsed results for an easy access
    full_grammar = pp.Forward()
    item = hosts | lpar + full_grammar + rpar
    full_grammar << pp.Group(item) + pp.ZeroOrMore(pp.Group(boolean + item))  # pylint: disable=expression-not-assigned
    return full_grammar
def parse_line(attribute, string):
    """Extract ``mpc.<attribute> = <data>;`` from *string* and return the
    data items converted via ``int_else_float_except_string``.

    Raises StopIteration if no match is found.

    Fixed: generator ``.next()`` is Python 2 only; the ``next()``
    builtin works on both Python 2.6+ and Python 3.
    """
    grammar = Suppress(Keyword('mpc.{}'.format(attribute)) + Keyword('=')) + String('data') + Suppress(Literal(';') + Optional(Comments))
    result, i, j = next(grammar.scanString(string))
    return [int_else_float_except_string(s) for s in result['data'].asList()]
def compute(self):
    """Evaluate the math expression in ``self.config``.

    '@name' / '!name' references are resolved against
    ``self.targetobject`` into numeric values, then the markers are
    stripped and the expression is evaluated with simpleeval using
    those values as names.  The result lands in ``self.__result__``.
    """
    def getname(obj, name):
        # Resolve *name* on *obj* to a single numeric value.
        _val = None
        #Todo move this method's error checking into base class (add a more for attribute only vs method and attribute)
        #Todo need more context to say which method has an invalid input config
        try:
            _val = getattr(obj, name)
        except AttributeError as a_err:
            raise AttributeError(a_err)
        except SyntaxError as s_err:
            print("Error in Math Plugin config:", SyntaxError(s_err))
            sys.exit(1)
        if isinstance(_val, int) or isinstance(_val, float): #if we get a numeric value the attrib is actually a method plugin output
            #print('got a number')
            return _val
        try:
            if not _val.issingleton():
                raise ValueError('Math plugin cannot process multi value attributes in %s' % name)
        except AttributeError:
            raise TypeError('Expected an attribute but got a %s' % type(_val))
        num = _val[0].raw()
        return num

    # Scan the config for '@name' / '!name' references; resolve each.
    attrmarker = (p.Literal('@') | p.Literal('!'))
    attrmatch = attrmarker.suppress() + p.Word(p.alphanums)
    for i in attrmatch.scanString(self.config):
        x = i[0][0]
        self.names[x] = getname(self.targetobject, x)
        if m.isnan(self.names[x]):
            raise TypeError('Math plugin can only perform path on numeric '
                            'values but got a %s with a value of %s in %s'
                            % (type(self.names[x]), self.names[x], x))
    if all(v is not None for v in self.names.values()):
        self.computable = True
    if self.computable:
        _expr = self.config
        # Strip markers: names remain as bare identifiers for the evaluator.
        if '@' in _expr:
            _expr = _expr.replace('@', '')
        if '!' in _expr:
            _expr = _expr.replace('!', '')
        self.__result__ = s.simple_eval(_expr, names=self.names)
def compute(self):
    """Combine attribute values referenced in ``self.config`` into a string.

    '@name' / '!name' references are resolved via ``getname``; when all
    resolve, the '@name' tokens in the config template are substituted
    (via ``self.substitute``) and the result stored in
    ``self.__result__``.
    """
    def getname(obj, name):
        # Resolve an attribute to its raw string form (or comma-joined list).
        _val = None
        if hasattr(obj, name):
            _val = getattr(obj, name, None)
        if _val is None:
            return _val
        try:
            if _val.isdynamic: #Todo make this work for non-attributes, non-dynamics (use .issingleton? - what about a concat mode?)
                raise ValueError('Combine plugin cannot process %s because it contains a dynamic class' % name)
        except AttributeError:
            raise TypeError('Expected an attribute but got a %s' % type(_val))
        if _val.issingleton():
            _ret = '%s' % _val[0].raw()
        else:
            _ret = ','.join(['%s' % v.raw() for v in _val])
        return _ret

    attrmarker = (p.Literal('@') | p.Literal('!'))
    attrmatch = attrmarker.suppress() + p.Word(p.alphanums)
    for i in attrmatch.scanString(self.config):
        x = i[0][0]
        self.__attribs__[x] = getname(self.targetobject, x)
    if all(v is not None for v in self.__attribs__.values()):
        self.computable = True
    if self.computable:
        # Rebuild the matcher with self.substitute as parse action so
        # transformString rewrites each '@name' in place.
        attrmatch = p.Literal('@').suppress() + p.Word(p.alphanums)
        attrmatch.setParseAction(self.substitute)
        attrlist = p.ZeroOrMore(p.Optional(p.White()) + attrmatch + p.Optional(p.White()))
        self.__result__ = attrlist.transformString(self.config)
def parseTerms():
    """
    expop   :: '^'
    multop  :: '*' | '/'
    addop   :: '+' | '-'
    integer :: ['+' | '-'] '0'..'9'+
    atom    :: PI | E | real | fn '(' expr ')' | '(' expr ')'
    factor  :: atom [ expop factor ]*
    term    :: factor [ multop factor ]*
    expr    :: term [ addop term ]*
    """
    global terms
    # Lazily build the shared grammar once; parse actions record tokens
    # via the module-level pushFirst / pushUMinus callbacks.
    if not terms:
        point = Literal( "." )
        e = CaselessLiteral( "E" )
        fnumber = Combine( Word( "+-"+nums, nums ) +
                           Optional( point + Optional( Word( nums ) ) ) +
                           Optional( e + Word( "+-"+nums, nums ) ) )
        ident = Word(alphas, alphas+nums+"_$")
        plus = Literal( "+" )
        minus = Literal( "-" )
        mult = Literal( "*" )
        div = Literal( "/" )
        lpar = Literal( "(" ).suppress()
        rpar = Literal( ")" ).suppress()
        addop = plus | minus
        multop = mult | div
        expop = Literal( "^" )
        pi = CaselessLiteral( "PI" )
        expr = Forward()
        # NOTE(review): `expr.suppress()` in the parenthesized branch
        # discards the inner tokens from the results; evaluation would
        # rely on pushFirst's side effects instead -- confirm intended.
        atom = (Optional("-") + ( pi | e | fnumber | ident + lpar + expr + rpar ).setParseAction( pushFirst ) | ( lpar + expr.suppress() + rpar )).setParseAction(pushUMinus)
        # by defining exponentiation as "atom [ ^ factor ]..." instead of "atom [ ^ atom ]...", we get right-to-left exponents, instead of left-to-right
        # that is, 2^3^2 = 2^(3^2), not (2^3)^2.
        factor = Forward()
        factor << atom + ZeroOrMore( ( expop + factor ).setParseAction( pushFirst ) )
        term = factor + ZeroOrMore( ( multop + factor ).setParseAction( pushFirst ) )
        expr << term + ZeroOrMore( ( addop + term ).setParseAction( pushFirst ) )
        terms = expr
    return terms
def parse(cls, search=False):
    """Parse the main query text. This method will also set the
    class attribute `parsed_search` to the parsed query, and it will
    return it too.

    :param cls: The class object, since it is a static method
    :type cls: object
    :param search: Search text string if a custom search string is to be
        used. False if the `cls.search` class attribute is to be used.
    :type search: str
    :returns: Parsed query
    :rtype: list

    >>> print(DocMatcher.parse('hello author = einstein'))
    [['hello'],['author','=','einstein']]
    >>> print(DocMatcher.parse(''))
    []
    >>> print(\
    DocMatcher.parse(\
    '"hello world whatever =" tags = \\\'hello ====\\\''))
    [['hello world whatever ='],['tags','hello ====']]
    >>> print(DocMatcher.parse('hello'))
    [['hello']]
    """
    import pyparsing
    cls.logger.debug('Parsing search')
    search = search or cls.search
    # NOTE(review): papis_alphas is computed but never used below.
    papis_alphas = pyparsing.printables.replace('=', '')
    papis_key = pyparsing.Word(pyparsing.alphanums + '-')
    # A value is a double-quoted string (with escapes), a single-quoted
    # string, or a bare key-like word.
    papis_value = pyparsing.QuotedString(
        quoteChar='"', escChar='\\', escQuote='\\'
    ) ^ pyparsing.QuotedString(
        quoteChar="'", escQuote='\\'
    ) ^ papis_key
    equal = pyparsing.ZeroOrMore(" ") + \
        pyparsing.Literal('=') + \
        pyparsing.ZeroOrMore(" ")
    # Zero or more groups, each "key = ... value" or a bare value.
    papis_query = pyparsing.ZeroOrMore(
        pyparsing.Group(
            pyparsing.ZeroOrMore(
                papis_key + equal
            ) + papis_value
        )
    )
    parsed = papis_query.parseString(search)
    cls.logger.debug('Parsed search = %s' % parsed)
    cls.parsed_search = parsed
    return cls.parsed_search
def strings_section():
    """Parser for a ``strings`` section: the literal "strings", a colon,
    then one or more statements, grouped under the "strings" result name."""
    return pyparsing.Group(
        pyparsing.Literal("strings") +
        _COLON +
        pyparsing.OneOrMore(statement()).setResultsName("statements")
    ).setResultsName("strings")
def _create_primitives():
    """Build the shared pyparsing primitives used by the sensor-graph grammar.

    The parser elements are stored in module-level globals and this function
    is idempotent: after the first successful call, subsequent calls return
    immediately.
    """
    # BUG FIX: `ident` must be declared global here.  It is assigned below,
    # which previously made it function-local, so the cache guard
    # `if ident is not None` raised UnboundLocalError on every call.
    global ident, binary, quoted_string, config_type, comment, stream_trigger, selector

    if ident is not None:
        return

    semi = Literal(u';').suppress()
    ident = Word(alphas+u"_", alphas + nums + u"_")
    # Numbers may be hex (0x...) or signed decimal; parsed to int via base-0.
    number = Regex(u'((0x[a-fA-F0-9]+)|[+-]?[0-9]+)').setParseAction(lambda s, t: [int(t[0], 0)])
    # Binary blobs are written as "hex:<pairs of hex digits>" and decoded to bytes.
    binary = Regex(u'hex:([a-fA-F0-9][a-fA-F0-9])+').setParseAction(lambda s, t: [unhexlify(t[0][4:])])
    quoted_string = dblQuotedString

    comment = Literal('#') + restOfLine
    rvalue = number | quoted_string

    # Convert all time intervals into an integer number of seconds
    time_unit_multipliers = {
        u'second': 1,
        u'seconds': 1,
        u'minute': 60,
        u'minutes': 60,
        u'hour': 60*60,
        u'hours': 60*60,
        u'day': 60*60*24,
        u'days': 60*60*24,
        u'month': 60*60*24*30,
        u'months': 60*60*24*30,
        u'year': 60*60*24*365,
        u'years': 60*60*24*365,
    }

    config_type = oneOf('uint8_t uint16_t uint32_t int8_t int16_t int32_t uint8_t[] uint16_t[] uint32_t[] int8_t[] int16_t[] int32_t[] string binary')
    comp = oneOf('> < >= <= == ~=')

    time_unit = oneOf(u"second seconds minute minutes hour hours day days week weeks month months year years")
    time_interval = (number + time_unit).setParseAction(lambda s, t: [t[0]*time_unit_multipliers[t[1]]])

    # Slots are either the controller itself or "slot <n>".
    slot_id = Literal(u"controller") | (Literal(u'slot') + number)
    slot_id.setParseAction(lambda s, t: [SlotIdentifier.FromString(u' '.join([str(x) for x in t]))])

    stream_modifier = Literal("system") | Literal("user") | Literal("combined")

    # A concrete data stream, e.g. "system buffered 10 node".
    stream = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node"))
    stream.setParseAction(lambda s, t: [DataStream.FromString(u' '.join([str(x) for x in t]))])

    # Selectors match either every stream of a kind ("all ... outputs") or one stream.
    all_selector = Optional(Literal("all")) + Optional(stream_modifier) + oneOf("buffered unbuffered inputs outputs counters constants") + Optional(Literal("nodes"))
    all_selector.setParseAction(lambda s, t: [DataStreamSelector.FromString(u' '.join([str(x) for x in t]))])
    one_selector = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node"))
    one_selector.setParseAction(lambda s, t: [DataStreamSelector.FromString(u' '.join([str(x) for x in t]))])
    selector = one_selector | all_selector

    trigger_comp = oneOf('> < >= <= ==')
    # "count(<stream>) <op> <n>" or "value(<stream>) <op> <n>".
    stream_trigger = Group((Literal(u'count') | Literal(u'value')) + Literal(u'(').suppress() - stream - Literal(u')').suppress() - trigger_comp - number).setResultsName('stream_trigger')
def grammar():
    """Define the query grammar.

    Backus-Naur form (BNF) of the grammar::

        <grammar> ::= <item> | <item> <and_or> <grammar>
        <item> ::= [<neg>] <query-token> | [<neg>] "(" <grammar> ")"
        <query-token> ::= <token> | <hosts>
        <token> ::= <category>:<key> [<operator> <value>]

    Given that the pyparsing library defines the grammar in a BNF-like
    style, for the details of the tokens not specified above check
    directly the source code.

    Returns:
        pyparsing.ParserElement: the grammar parser.
    """
    # Boolean connectors between items.
    bool_connector = (pp.CaselessKeyword('and') | pp.CaselessKeyword('or'))('bool')
    # Labelled 'neg' to allow dot-notation access; 'not' is a Python reserved word.
    negation = pp.CaselessKeyword('not')('neg')

    comparison = pp.oneOf(OPERATORS, caseless=True)('operator')
    # Either single- or double-quoted; quotes are stripped on parse.
    quoted = pp.quotedString.copy().addParseAction(pp.removeQuotes)

    # Parentheses delimiting a subgroup of the grammar.
    open_par = pp.Literal('(')('open_subgroup')
    close_par = pp.Literal(')')('close_subgroup')

    # Hosts selection: glob (*) and clustershell (,!&^[]) syntaxes are allowed,
    # i.e. host10[10-42].*.domain
    hosts = quoted | (~(bool_connector | negation) + pp.Word(pp.alphanums + '-_.*,!&^[]'))

    # Key-value token for allowed categories using the available comparison
    # operators, i.e. F:key = value
    category = pp.oneOf(CATEGORIES, caseless=True)('category')
    key = pp.Word(pp.alphanums + '-_.%@:')('key')
    selector = pp.Combine(category + ':' + key)  # i.e. F:key

    # Every printable except the parentheses/braces used by this or the
    # global grammar.
    value_chars = ''.join(c for c in pp.printables if c not in ('(', ')', '{', '}'))
    value = (quoted | pp.Word(value_chars))('value')
    token = selector + pp.Optional(comparison + value)

    # Final grammar, see the docstring for its BNF based on the tokens
    # defined above; Groups split the parsed results for easy access.
    full_grammar = pp.Forward()
    item = pp.Group(pp.Optional(negation) + (token | hosts('hosts'))) | pp.Group(
        pp.Optional(negation) + open_par + full_grammar + close_par)
    full_grammar << item + pp.ZeroOrMore(pp.Group(bool_connector) + full_grammar)  # pylint: disable=expression-not-assigned
    return full_grammar
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。