Python pyparsing 模块,Group() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用pyparsing.Group()。
def parser(self):
join_type = (pp.Literal("LEFT") | pp.Literal("RIGHT") | pp.Literal("INNER") | pp.Literal("OUTER"))
node_name = pp.Word(pp.alphas, pp.alphanums + "_$")
col_name = pp.Word(pp.alphas, pp.alphanums + "_$")
col_name_list = pp.Group(pp.delimitedList(col_name, delim=","))
l_brac = pp.Suppress("[")
r_brac = pp.Suppress("]")
single_join = (join_type + pp.Suppress("(") + node_name + l_brac +
col_name_list + r_brac + pp.Suppress("==>") + node_name +
l_brac + col_name_list + r_brac + pp.Suppress(")"))
single_join.addParseAction(lambda x: self._add_join(join_type=x[0],
child_node_name=x[1],
child_cols=x[2],
parent_node_name=x[3],
parent_cols=x[4]))
join_block = pp.OneOrMore(single_join)
return join_block
def __init__(self, identifier_parser=None):
"""
:param IdentifierParser identifier_parser: An identifier parser for checking the 3P and 5P partners
"""
self.identifier_parser = identifier_parser if identifier_parser is not None else IdentifierParser()
identifier = self.identifier_parser.language
reference_seq = oneOf(['r', 'p', 'c'])
coordinate = pyparsing_common.integer | '?'
missing = Keyword('?')
range_coordinate = missing(FUSION_MISSING) | (
reference_seq(FUSION_REFERENCE) + Suppress('.') + coordinate(FUSION_START) + Suppress('_') + coordinate(
FUSION_STOP))
self.language = fusion_tags + nest(Group(identifier)(PARTNER_5P), Group(range_coordinate)(RANGE_5P),
Group(identifier)(PARTNER_3P), Group(range_coordinate)(RANGE_3P))
super(FusionParser, self).__init__(self.language)
def _parse_atat_lattice(lattice_in):
"""Parse an ATAT-style `lat.in` string.
The parsed string will be in three groups: (Coordinate system) (lattice) (atoms)
where the atom group is split up into subgroups,each describing the position and atom name
"""
float_number = Regex(r'[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?').setParseAction(lambda t: [float(t[0])])
vector = Group(float_number + float_number + float_number)
angles = vector
vector_line = vector + Suppress(LineEnd())
coord_sys = Group((vector_line + vector_line + vector_line) | (vector + angles + Suppress(LineEnd())))
lattice = Group(vector + vector + vector)
atom = Group(vector + Group(OneOrMore(Word(alphas + '_'))))
atat_lattice_grammer = coord_sys + lattice + Group(OneOrMore(atom))
# parse the input string and convert it to a POSCAR string
return atat_lattice_grammer.parseString(lattice_in)
def expression(self):
expression = pyparsing.Forward()
# (1 + (2 + 3))
nested_expression = pyparsing.nestedExpr(
"(", ")", expression).setParseAction(self._combine_lists)
# FOO(2,3)
function_call = (
_TOKEN().setResultsName("function")
+ _OPEN_PARENTHESIS()
+ pyparsing.delimitedList(
pyparsing.Combine(expression, adjacent=False, joinString=" "),
delim=",").setResultsName("func_args")
+ _CLOSE_PARENTHESIS()
)
expression << pyparsing.OneOrMore(
function_call.setParseAction(self._is_kNown_function)
| pyparsing.Group(nested_expression)
| _TOKEN()
| _NOT_TOKEN()
)
return pyparsing.Combine(expression, joinString=" ")
def _maybe_attributes(self):
"""Possibly match some attributes.
The Syntax of attributes is described here:
https://gcc.gnu.org/onlinedocs/gcc/Attribute-Syntax.html
"""
return pyparsing.Group(
pyparsing.ZeroOrMore(
_ATTRIBUTE
+ _DOUBLE_OPEN_PARENTHESIS
+ pyparsing.delimitedList(
pyparsing.Group(
self._identifier()("name")
+ pyparsing.Optional(
_OPEN_PARENTHESIS
+ parsers.anything_beetween("()")("args")
+ _CLOSE_PARENTHESIS
)
)
)
+ _DOUBLE_CLOSE_PARENTHESIS
).setParseAction(self._make_attribute)
)("attributes")
def _create_simple_statements():
global binary, ident, rvalue, simple_statement, semi, comp, number, slot_id, callrpc_stmt, generic_statement, streamer_stmt, stream, selector
if simple_statement is not None:
return
Meta_stmt = Group(Literal('Meta').suppress() + ident + Literal('=').suppress() + rvalue + semi).setResultsName('Meta_statement')
require_stmt = Group(Literal('require').suppress() + ident + comp + rvalue + semi).setResultsName('require_statement')
set_stmt = Group(Literal('set').suppress() - (ident | number) - Literal("to").suppress() - (rvalue | binary) - Optional(Literal('as').suppress() + config_type) + semi).setResultsName('set_statement')
callrpc_stmt = Group(Literal("call").suppress() + (ident | number) + Literal("on").suppress() + slot_id + Optional(Literal("=>").suppress() + stream('explicit_stream')) + semi).setResultsName('call_statement')
streamer_stmt = Group(Optional(Literal("manual")('manual')) + Optional(oneOf(u'encrypted signed')('security')) + Optional(Literal(u'realtime')('realtime')) + Literal('streamer').suppress() -
Literal('on').suppress() - selector('selector') - Optional(Literal('to').suppress() - slot_id('explicit_tile')) - Optional(Literal('with').suppress() - Literal('streamer').suppress() - number('with_other')) - semi).setResultsName('streamer_statement')
copy_stmt = Group(Literal("copy").suppress() - Optional(oneOf("all count average")('modifier')) - Optional(stream('explicit_input') | number('constant_input')) - Literal("=>") - stream("output") - semi).setResultsName('copy_statement')
trigger_stmt = Group(Literal("trigger") - Literal("streamer") - number('index') - semi).setResultsName('trigger_statement')
simple_statement = Meta_stmt | require_stmt | set_stmt | callrpc_stmt | streamer_stmt | trigger_stmt | copy_stmt
# In generic statements,keep track of the location where the match started for error handling
locator = Empty().setParseAction(lambda s, l, t: l)('location')
generic_statement = Group(locator + Group(ZeroOrMore(Regex(u"[^{};]+")) + Literal(u';'))('match')).setResultsName('unparsed_statement')
def _create_block_bnf():
global block_bnf, time_interval, statement, block_id, stream
if block_bnf is not None:
return
trigger_clause = Group(stream_trigger | Group(stream).setResultsName('stream_always') | Group(ident).setResultsName('identifier'))
every_block_id = Group(Literal(u'every').suppress() - time_interval).setResultsName('every_block')
when_block_id = Group(Literal(u'when').suppress() + Literal("connected").suppress() - Literal("to").suppress() - slot_id).setResultsName('when_block')
latch_block_id = Group(Literal(u'when').suppress() - stream_trigger).setResultsName('latch_block')
config_block_id = Group(Literal(u'config').suppress() - slot_id).setResultsName('config_block')
on_block_id = Group(Literal(u'on').suppress() - trigger_clause.setResultsName('triggerA') - Optional((Literal("and") | Literal("or")) - trigger_clause.setResultsName('triggerB'))).setResultsName('on_block')
block_id = every_block_id | when_block_id | latch_block_id | config_block_id | on_block_id
block_bnf = Forward()
statement = generic_statement | block_bnf
block_bnf << Group(block_id + Group(Literal(u'{').suppress() + ZeroOrMore(statement) + Literal(u'}').suppress())).setResultsName('block')
def __init__(self, identifier_parser=None):
"""
:param IdentifierParser identifier_parser: An identifier parser for checking the 3P and 5P partners
"""
self.identifier_parser = identifier_parser if identifier_parser is not None else IdentifierParser()
gmod_default_ns = oneOf(list(language.gmod_namespace.keys())).setParseAction(self.handle_gmod_default)
gmod_identifier = Group(self.identifier_parser.identifier_qualified) | Group(gmod_default_ns)
self.language = gmod_tag + nest(gmod_identifier(IDENTIFIER))
super(GmodParser, self).__init__(self.language)
def build_legacy_fusion(identifier, reference):
break_start = (ppc.integer | '?').setParseAction(fusion_break_handler_wrapper(reference, start=True))
break_end = (ppc.integer | '?').setParseAction(fusion_break_handler_wrapper(reference, start=False))
res = identifier(PARTNER_5P) + WCW + fusion_tags + nest(identifier(PARTNER_3P) + Optional(
WCW + Group(break_start)(RANGE_5P) + WCW + Group(break_end)(RANGE_3P)))
res.setParseAction(fusion_legacy_handler)
return res
def __init__(self, identifier_parser=None):
"""
:param IdentifierParser identifier_parser: An identifier parser for checking the 3P and 5P partners
"""
identifier_parser = identifier_parser if identifier_parser is not None else IdentifierParser()
super(LocationParser, self).__init__(Group(location_tag + nest(identifier_parser.language))(LOCATION))
def parse_filter_str(self, filter_str):
"""
method to parse filter string
"""
prop = pp.WordStart(pp.alphas) + pp.Word(pp.alphanums +
"_").setResultsName("prop")
value = (pp.QuotedString("'") | pp.QuotedString('"') | pp.Word(
pp.printables, excludeChars=",")).setResultsName("value")
types_ = pp.oneOf("re eq ne gt ge lt le").setResultsName("types")
flags = pp.oneOf("C I").setResultsName("flags")
comma = pp.Literal(',')
quote = (pp.Literal("'") | pp.Literal('"')).setResultsName("quote")
type_exp = pp.Group(pp.Literal("type") + pp.Literal(
"=") + quote + types_ + quote).setResultsName("type_exp")
flag_exp = pp.Group(pp.Literal("flag") + pp.Literal(
"=") + quote + flags + quote).setResultsName("flag_exp")
semi_expression = pp.Forward()
semi_expression << pp.Group(pp.Literal("(") +
prop + comma + value +
pp.Optional(comma + type_exp) +
pp.Optional(comma + flag_exp) +
pp.Literal(")")
).setParseAction(
self.parse_filter_obj).setResultsName("semi_expression")
expr = pp.Forward()
expr << pp.operatorPrecedence(semi_expression, [
("not", 1, pp.opAssoc.RIGHT, self.not_operator),
("and", 2, pp.opAssoc.LEFT, self.and_operator),
("or", self.or_operator)
])
result = expr.parseString(filter_str)
return result
def __init__(self, ffilter, queue_out):
FuzzQueue.__init__(self, queue_out)
Thread.__init__(self)
self.setName('filter_thread')
self.queue_out = queue_out
if PYPARSING:
element = oneOf("c l w h")
digits = "XB0123456789"
integer = Word( digits )#.setParseAction( self.__convertIntegers )
elementRef = Group(element + oneOf("= != < > >= <=") + integer)
operator = oneOf("and or")
deFinition = elementRef + ZeroOrMore( operator + elementRef)
nestedformula = Group(Suppress(Optional(Literal("("))) + deFinition + Suppress(Optional(Literal(")"))))
self.finalformula = nestedformula + ZeroOrMore( operator + nestedformula)
elementRef.setParseAction(self.__compute_element)
nestedformula.setParseAction(self.__compute_formula)
self.finalformula.setParseAction(self.__myreduce)
self.res = None
self.hideparams = ffilter
if "XXX" in self.hideparams['codes']:
self.hideparams['codes'].append("0")
self.baseline = None
def __init__(self):
if PYPARSING:
category = Word( alphas + "_-*", alphanums + "_-*" )
operator = oneOf("and or,")
neg_operator = "not"
elementRef = category
deFinition = elementRef + ZeroOrMore( operator + elementRef)
nestedformula = Group(Suppress(Optional(Literal("("))) + deFinition + Suppress(Optional(Literal(")"))))
neg_nestedformula = Optional(neg_operator) + nestedformula
self.finalformula = neg_nestedformula + ZeroOrMore( operator + neg_nestedformula)
elementRef.setParseAction(self.__compute_element)
neg_nestedformula.setParseAction(self.__compute_neg_formula)
nestedformula.setParseAction(self.__compute_formula)
self.finalformula.setParseAction(self.__myreduce)
def getchunk():
"""
Using pyparsing,create chunk reader for chunk strings.
"""
slot = pp.Word("".join([pp.alphas, "_"]), "".join([pp.alphanums, "_"]))
special_value = pp.Group(pp.oneOf([ACTRVARIABLE, "".join([ACTRNEG, ACTRVARIABLE]), ACTRNEG, VISIONGREATER, VISIONSMALLER, "".join([VISIONGREATER, "".join([VISIONSMALLER, ACTRVARIABLE])])\
+ pp.Word("".join([pp.alphanums, "_", '"', "'"])))
strvalue = pp.QuotedString('"', unquoteResults=False)
strvalue2 = pp.QuotedString("'", unquoteResults=False)
varvalue = pp.Word("".join([pp.alphanums, "_"]))
value = varvalue | special_value | strvalue | strvalue2
chunk_reader = pp.OneOrMore(pp.Group(slot + value))
return chunk_reader
def getrule():
"""
Using pyparsing,get rule out of a string.
"""
arrow = pp.Literal("==>")
buff = pp.Word(pp.alphas, "_"]))
special_valueLHS = pp.oneOf([x for x in _LHSCONVENTIONS.keys()])
end_buffer = pp.Literal(">")
special_valueRHS = pp.oneOf([x for x in _RHSCONVENTIONS.keys()])
chunk = getchunk()
rule_reader = pp.Group(pp.OneOrMore(pp.Group(special_valueLHS + buff + end_buffer + pp.Group(pp.Optional(chunk))))) + arrow + pp.Group(pp.OneOrMore(pp.Group(special_valueRHS + buff + end_buffer + pp.Group(pp.Optional(chunk)))))
return rule_reader
def group(cls, expr):
def group_action(s, t):
try:
lst = t[0].asList()
except (IndexError, AttributeError):
lst = t
return [cls(lst)]
return Group(expr).setParseAction(group_action)
def __init__(self, queue_out)
Thread.__init__(self)
self.setName('filter_thread')
self.queue_out = queue_out
if PYPARSING:
element = oneOf("c l w h")
digits = "XB0123456789"
integer = Word( digits )#.setParseAction( self.__convertIntegers )
elementRef = Group(element + oneOf("= != < > >= <=") + integer)
operator = oneOf("and or")
deFinition = elementRef + ZeroOrMore( operator + elementRef)
nestedformula = Group(Suppress(Optional(Literal("("))) + deFinition + Suppress(Optional(Literal(")"))))
self.finalformula = nestedformula + ZeroOrMore( operator + nestedformula)
elementRef.setParseAction(self.__compute_element)
nestedformula.setParseAction(self.__compute_formula)
self.finalformula.setParseAction(self.__myreduce)
self.res = None
self.hideparams = ffilter
if "XXX" in self.hideparams['codes']:
self.hideparams['codes'].append("0")
self.baseline = None
def __init__(self):
if PYPARSING:
category = Word( alphas + "_-*",")
neg_operator = "not"
elementRef = category
deFinition = elementRef + ZeroOrMore( operator + elementRef)
nestedformula = Group(Suppress(Optional(Literal("("))) + deFinition + Suppress(Optional(Literal(")"))))
neg_nestedformula = Optional(neg_operator) + nestedformula
self.finalformula = neg_nestedformula + ZeroOrMore( operator + neg_nestedformula)
elementRef.setParseAction(self.__compute_element)
neg_nestedformula.setParseAction(self.__compute_neg_formula)
nestedformula.setParseAction(self.__compute_formula)
self.finalformula.setParseAction(self.__myreduce)
def main():
word = Word(alphanums)
command = Group(OneOrMore(word))
token = Suppress("->")
device = Group(OneOrMore(word))
argument = Group(OneOrMore(word))
event = command + token + device + Optional(token + argument)
gate = Gate()
garage = Garage()
airco = Aircondition()
heating = Heating()
boiler = Boiler()
fridge = Fridge()
tests = ('open -> gate',
'close -> garage',
'turn on -> aircondition',
'turn off -> heating',
'increase -> boiler temperature -> 5 degrees',
'decrease -> fridge temperature -> 2 degrees')
open_actions = {'gate':gate.open, 'garage':garage.open, 'aircondition':airco.turn_on,
'heating':heating.turn_on, 'boiler temperature':boiler.increase_temperature,
'fridge temperature':fridge.increase_temperature}
close_actions = {'gate':gate.close, 'garage':garage.close, 'aircondition':airco.turn_off,
'heating':heating.turn_off, 'boiler temperature':boiler.decrease_temperature,
'fridge temperature':fridge.decrease_temperature}
for t in tests:
if len(event.parseString(t)) == 2: # no argument
cmd, dev = event.parseString(t)
cmd_str, dev_str = ' '.join(cmd), ' '.join(dev)
if 'open' in cmd_str or 'turn on' in cmd_str:
open_actions[dev_str]()
elif 'close' in cmd_str or 'turn off' in cmd_str:
close_actions[dev_str]()
elif len(event.parseString(t)) == 3: # argument
cmd, dev, arg = event.parseString(t)
cmd_str, dev_str, arg_str = ' '.join(cmd), ' '.join(dev), ' '.join(arg)
num_arg = 0
try:
num_arg = int(arg_str.split()[0]) # extract the numeric part
except ValueError as err:
print("expected number but got: '{}'".format(arg_str[0]))
if 'increase' in cmd_str and num_arg > 0:
open_actions[dev_str](num_arg)
elif 'decrease' in cmd_str and num_arg > 0:
close_actions[dev_str](num_arg)
def build_parser(self):
parsed_term = pyparsing.Group(pyparsing.Combine(pyparsing.Word(pyparsing.alphanums) + \
pyparsing.Suppress('*'))).setResultsName('wildcard') | \
pyparsing.Group(pyparsing.Combine(pyparsing.Word(pyparsing.alphanums+"._") + \
pyparsing.Word(':') + pyparsing.Group(pyparsing.Optional("\"") + \
pyparsing.Optional("<") + pyparsing.Optional(">") + pyparsing.Optional("=") + \
pyparsing.Optional("-") + pyparsing.Word(pyparsing.alphanums+"._/") + \
pyparsing.Optional("&") + pyparsing.Optional("<") + pyparsing.Optional(">") + \
pyparsing.Optional("=") + pyparsing.Optional("-") + \
pyparsing.Optional(pyparsing.Word(pyparsing.alphanums+"._/")) + \
pyparsing.Optional("\"")))).setResultsName('fields') | \
pyparsing.Group(pyparsing.Combine(pyparsing.Suppress('-')+ \
pyparsing.Word(pyparsing.alphanums+"."))).setResultsName('not_term') | \
pyparsing.Group(pyparsing.Word(pyparsing.alphanums)).setResultsName('term')
parsed_or = pyparsing.Forward()
parsed_quote_block = pyparsing.Forward()
parsed_quote_block << ((parsed_term + parsed_quote_block) | parsed_term )
parsed_quote = pyparsing.Group(pyparsing.Suppress('"') + parsed_quote_block + \
pyparsing.Suppress('"')).setResultsName("quotes") | parsed_term
parsed_parenthesis = pyparsing.Group((pyparsing.Suppress("(") + parsed_or + \
pyparsing.Suppress(")"))).setResultsName("parenthesis") | parsed_quote
parsed_and = pyparsing.Forward()
parsed_and << (pyparsing.Group(parsed_parenthesis + pyparsing.Suppress(pyparsing.Keyword("and")) + \
parsed_and).setResultsName("and") | \
pyparsing.Group(parsed_parenthesis + pyparsing.OneOrMore(~pyparsing.oneOf("or and") + \
parsed_and)).setResultsName("and") | parsed_parenthesis)
parsed_or << (pyparsing.Group(parsed_and + pyparsing.Suppress(pyparsing.Keyword("or")) + \
parsed_or).setResultsName("or") | parsed_and)
return parsed_or.parseString
def parse_filter_str(self, self.or_operator)
])
result = expr.parseString(filter_str)
return result
def entry():
name = Word(alphas, alphas+'_')
value = Word(nums, nums+".").setResultsName('nominal_value')
uncert = Word(nums).setResultsName('std_dev')
value_uncert = value + Suppress("(") + uncert + Suppress(")")
return Dict( Group(name + Suppress("=") + value_uncert ) )
def parser(cls):
lpar = pp.Suppress("(")
rpar = pp.Suppress(")")
mutate = pp.Suppress('mutate')
col_name = pp.Word(pp.alphas, pp.alphanums + "_$")
expr_evaluator = Evaluator(deferred_eval=True)
col_expr = expr_evaluator.parser()
mutation = col_name + pp.Suppress("=") + col_expr
mutation.setParseAction(lambda x: {'col_name': x[0], 'col_expr': x[1]})
mutations = pp.Group(pp.delimitedList(mutation))
parser = mutate + lpar + mutations + rpar
parser.setParseAction(lambda x: Mutate(mutations=x))
return parser
def parser(cls):
rename = pp.Suppress("rename")
rename_kwarg = common_parsers.column + pp.Suppress("=") + common_parsers.column
rename_kwarg.setParseAction(lambda x: {x[0]: x[1]})
kwargs = pp.Group(pp.delimitedList(rename_kwarg))
kwargs.setParseAction(lambda x: {k: v for d in x for k, v in d.items()})
parser = rename + pp.Suppress("(") + kwargs + pp.Suppress(")")
parser.setParseAction(lambda x: Rename(columns=x[0]))
return parser
def parser(cls):
select = pp.Suppress("select")
column = common_parsers.column
parser = select + pp.Suppress("(") + pp.Group(pp.delimitedList(column)) + pp.Suppress(")")
parser.setParseAction(lambda x: Select(columns=x[0]))
return parser
def parser(cls):
remove = pp.Suppress("remove")
column = common_parsers.column
parser = remove + pp.Suppress("(") + pp.Group(pp.delimitedList(column)) + pp.Suppress(")")
parser.setParseAction(lambda x: Remove(columns=x[0]))
return parser
def parser(cls):
unpack = pp.Suppress("unpack")
packed_col_name = common_parsers.column
dict_key = pp.Suppress("[") + pp.QuotedString(quoteChar="'") + pp.Suppress("]")
dict_key_grp = pp.Group(pp.OneOrMore(dict_key))
new_col_name = common_parsers.column
unpack_arg = new_col_name + pp.Suppress("=") + packed_col_name + dict_key_grp
unpack_arg.setParseAction(lambda x: {'packed_col': x[1], 'key_list': x[2], 'new_col_name': x[0]})
parser = unpack + pp.Suppress("(") + pp.delimitedList(unpack_arg) + pp.Suppress(")")
parser.setParseAction(lambda x: Unpack(unpack_list=x))
return parser
def parser(self):
query_key = pp.Keyword("QUERY")
query_value = pp.Suppress("|") + pp.SkipTo(pp.Suppress(";"), include=True)
fields_key = pp.Keyword("FIELDS")
field_name = common_parsers.column
field_name_list = pp.Group(pp.delimitedList(field_name,")).setParseAction(lambda x: x.asList())
fields_block = (pp.Suppress(fields_key) + field_name_list)
connector_name = pp.Word(pp.alphas, pp.alphanums + "_$")
using_block = pp.Suppress("USING") + connector_name
then_key = pp.Suppress("THEN")
manipulation_set = pp.Suppress("|") + pp.SkipTo(pp.Suppress(";"), include=True)
then_block = then_key + manipulation_set
as_key = pp.Suppress("AS")
node_name = pp.Word(pp.alphas, pp.alphanums + "_$")
as_block = as_key + node_name
query_node_block = (pp.Suppress(query_key) + query_value + pp.Optional(fields_block, default=None) + using_block + pp.Optional(then_block, default=None) + as_block)
query_node_block.setParseAction(lambda x: self._add_query_node(query_value=x[0],
connector_name=x[2],
node_name=x[4],
fields=x[1],
manipulation_set=x[3]))
single_query_node = query_node_block + pp.Optional(pp.Suppress("---"))
retrieve_block = pp.OneOrMore(single_query_node)
return retrieve_block
def parser(self):
# Define punctuation as suppressed literals.
lparen, rparen, lbrack, rbrack, lbrace, rbrace, colon = \
map(pp.Suppress, "()[]{}:")
integer = pp.Combine(pp.Optional(pp.oneOf("+ -")) + pp.Word(pp.nums)) \
.setName("integer") \
.setParseAction(lambda toks: int(toks[0]))
real = pp.Combine(pp.Optional(pp.oneOf("+ -")) + pp.Word(pp.nums) + "." +
pp.Optional(pp.Word(pp.nums)) +
pp.Optional(pp.oneOf("e E") + pp.Optional(pp.oneOf("+ -")) + pp.Word(pp.nums))) \
.setName("real") \
.setParseAction(lambda toks: float(toks[0]))
_datetime_arg = (integer | real)
datetime_args = pp.Group(pp.delimitedList(_datetime_arg))
_datetime = pp.Suppress(pp.Literal('datetime') + pp.Literal("(")) + datetime_args + pp.Suppress(")")
_datetime.setParseAction(lambda x: self._make_datetime(x[0]))
tuple_str = pp.Forward()
list_str = pp.Forward()
dict_str = pp.Forward()
list_item = real | integer | _datetime | pp.quotedString.setParseAction(pp.removeQuotes) | \
pp.Group(list_str) | tuple_str | dict_str
tuple_str << (pp.Suppress("(") + pp.Optional(pp.delimitedList(list_item)) +
pp.Optional(pp.Suppress(",")) + pp.Suppress(")"))
tuple_str.setParseAction(lambda toks : tuple(toks.asList()))
list_str << (lbrack + pp.Optional(pp.delimitedList(list_item) +
pp.Optional(pp.Suppress(","))) + rbrack)
dict_entry = pp.Group(list_item + colon + list_item)
dict_str << (lbrace + pp.Optional(pp.delimitedList(dict_entry) +
pp.Optional(pp.Suppress(","))) + rbrace)
dict_str.setParseAction(lambda toks: dict(toks.asList()))
return list_item
def __setattr__(self, k, v):
if isinstance(v, pp.ParserElement):
Grammar.grm[k] = pp.Group(v).setResultsName(k)
def statement():
return pyparsing.Group(
_IDENTIFIER.setResultsName("lhs") + _EQUALS +
pyparsing.Combine(
(anything_in_curly() |
pyparsing.QuotedString("'", escChar="\\", unquoteResults=False) |
pyparsing.QuotedString("\"", unquoteResults=False) |
_REGEX) +
pyparsing.ZeroOrMore(_KEYWORD),
adjacent=False,
joinString=" ",
).setResultsName("rhs")
)
def _arguments(self):
return pyparsing.Group(
pyparsing.Optional(
pyparsing.delimitedList(self.expression())))
def _arguments(self):
return pyparsing.Group(
pyparsing.Optional(pyparsing.delimitedList(self._argument()))
)
def _multiword_argument(self):
return pyparsing.Group(
self._variable()
+ pyparsing.OneOrMore(self._variable())
).setParseAction(util.action(pre_ast.CompositeBlock))
def _enum_deFinition(self):
"""Detect an enum deFinition.
e.g.
enum foo {
OPTION_1: 1 + 2,
OPTION_2
}
"""
return (
_ENUM
+ pyparsing.Optional(self._identifier())("enum_name")
+ _OPEN_CURLY
+ pyparsing.ZeroOrMore(
pyparsing.Group(
self._identifier()("name")
+ pyparsing.Optional(
_EQUALS
# This allows us to get even invalid expressions.
+ pyparsing.SkipTo(pyparsing.Word(",}"))("expression")
)
+ pyparsing.Optional(_COMMA)
)
)("fields")
+ _CLOSE_CURLY
+ self._maybe_attributes()("attributes")
).setParseAction(self._process_enum_deFinition)
def grammar():
"""Define the query grammar.
Backus-Naur form (BNF) of the grammar::
<grammar> ::= <item> | <item> <boolean> <grammar>
<item> ::= <hosts> | "(" <grammar> ")"
<boolean> ::= "and not" | "and" | "xor" | "or"
Given that the pyparsing library defines the grammar in a BNF-like style,for the details of the tokens not
specified above check directly the source code.
Returns:
pyparsing.ParserElement: the grammar parser.
"""
# Boolean operators
boolean = (pp.CaselessKeyword('and not').leaveWhitespace() | pp.CaselessKeyword('and') |
pp.CaselessKeyword('xor') | pp.CaselessKeyword('or'))('bool')
# Parentheses
lpar = pp.Literal('(')('open_subgroup')
rpar = pp.Literal(')')('close_subgroup')
# Hosts selection: clustershell (,!&^[]) Syntax is allowed: host10[10-42].domain
hosts = (~(boolean) + pp.Word(pp.alphanums + '-_.,!&^[]'))('hosts')
# Final grammar,see the docstring for its BNF based on the tokens defined above
# Groups are used to split the parsed results for an easy access
full_grammar = pp.Forward()
item = hosts | lpar + full_grammar + rpar
full_grammar << pp.Group(item) + pp.ZeroOrMore(pp.Group(boolean + item)) # pylint: disable=expression-not-assigned
return full_grammar
def grammar():
"""Define the query grammar for the external backend used for testing."""
# Hosts selection: clustershell (,!&^[]) Syntax is allowed: host10[10-42].domain
hosts = pp.Word(pp.alphanums + '-_.,!&^[]')('hosts')
# Final grammar,see the docstring for its BNF based on the tokens defined above
# Groups are used to split the parsed results for an easy access
full_grammar = pp.Forward()
full_grammar << pp.Group(hosts) + pp.ZeroOrMore(pp.Group(hosts)) # pylint: disable=expression-not-assigned
return full_grammar
def main():
word = Word(alphanums)
command = Group(OneOrMore(word))
token = Suppress("->")
device = Group(OneOrMore(word))
argument = Group(OneOrMore(word))
event = command + token + device + Optional(token + argument)
gate = Gate()
garage = Garage()
airco = Aircondition()
heating = Heating()
boiler = Boiler()
fridge = Fridge()
tests = ('open -> gate',
'increase -> boiler temperature -> 20 degrees',
'decrease -> fridge temperature -> 6 degree')
open_actions = {'gate':gate.open, ' '.join(arg)
num_arg = 0
try:
num_arg = int(arg_str.split()[0]) # extract the numeric part
except ValueError as err:
print("expected number but got: '{}'".format(arg_str[0]))
if 'increase' in cmd_str and num_arg > 0:
open_actions[dev_str](num_arg)
elif 'decrease' in cmd_str and num_arg > 0:
close_actions[dev_str](num_arg)
def parse(cls, search=False):
"""Parse the main query text. This method will also set the
class attribute `parsed_search` to the parsed query,and it will
return it too.
:param cls: The class object,since it is a static method
:type cls: object
:param search: Search text string if a custom search string is to be
used. False if the `cls.search` class attribute is to be used.
:type search: str
:returns: Parsed query
:rtype: list
>>> print(DocMatcher.parse('hello author = einstein'))
[['hello'],['author','=','einstein']]
>>> print(DocMatcher.parse(''))
[]
>>> print(\
DocMatcher.parse(\
'"hello world whatever =" tags = \\\'hello ====\\\''))
[['hello world whatever ='],['tags','hello ====']]
>>> print(DocMatcher.parse('hello'))
[['hello']]
"""
import pyparsing
cls.logger.debug('Parsing search')
search = search or cls.search
papis_alphas = pyparsing.printables.replace('=', '')
papis_key = pyparsing.Word(pyparsing.alphanums + '-')
papis_value = pyparsing.QuotedString(
quoteChar='"', escChar='\\', escQuote='\\'
) ^ pyparsing.QuotedString(
quoteChar="'", escQuote='\\'
) ^ papis_key
equal = pyparsing.ZeroOrMore(" ") + \
pyparsing.Literal('=') + \
pyparsing.ZeroOrMore(" ")
papis_query = pyparsing.ZeroOrMore(
pyparsing.Group(
pyparsing.ZeroOrMore(
papis_key + equal
) + papis_value
)
)
parsed = papis_query.parseString(search)
cls.logger.debug('Parsed search = %s' % parsed)
cls.parsed_search = parsed
return cls.parsed_search
def main():
word = Word(alphanums)
command = Group(OneOrMore(word))
token = Suppress("->")
device = Group(OneOrMore(word))
argument = Group(OneOrMore(word))
event = command + token + device + Optional(token + argument)
gate = Gate()
garage = Garage()
airco = Aircondition()
heating = Heating()
boiler = Boiler()
fridge = Fridge()
tests = ('open -> gate',
'decrease -> fridge temperature -> 2 degrees')
open_actions = {'gate': gate.open,
'garage': garage.open,
'aircondition': airco.turn_on,
'heating': heating.turn_on,
'boiler temperature': boiler.increase_temperature,
'fridge temperature': fridge.increase_temperature}
close_actions = {'gate': gate.close,
'garage': garage.close,
'aircondition': airco.turn_off,
'heating': heating.turn_off,
'boiler temperature': boiler.decrease_temperature,
'fridge temperature': fridge.decrease_temperature}
for t in tests:
if len(event.parseString(t)) == 2: # ????
cmd, ' '.join(dev)
if 'open' in cmd_str or 'turn on' in cmd_str:
open_actions[dev_str]()
elif 'close' in cmd_str or 'turn off' in cmd_str:
close_actions[dev_str]()
elif len(event.parseString(t)) == 3: # ???
cmd, ' '.join(arg)
num_arg = 0
try:
num_arg = int(arg_str.split()[0]) # ??????
except ValueError as err:
print("expected number but got: '{}'".format(arg_str[0]))
if 'increase' in cmd_str and num_arg > 0:
open_actions[dev_str](num_arg)
elif 'decrease' in cmd_str and num_arg > 0:
close_actions[dev_str](num_arg)
def strings_section():
return pyparsing.Group(
pyparsing.Literal("strings") +
_COLON +
pyparsing.OneOrMore(statement()).setResultsName("statements")
).setResultsName("strings")
def _type_instance(self):
"""A type declaration.
The modifiers of a typedef:
struct s *P[];
^^^^<- The type instance.
"""
type_instance = (
# Function pointer (*f)(int foobar)
pyparsing.ZeroOrMore(_STAR)
+ _OPEN_PARENTHESIS
+ pyparsing.Optional(_STAR("function_pointer"))
+ self._identifier()("type_instance_name")
+ _CLOSE_PARENTHESIS
+ parsers.anything_in_parentheses()("function_args")
) | (
# Function object f(foo bar *)
pyparsing.ZeroOrMore(_STAR)
+ self._identifier()("type_instance_name")
+ parsers.anything_in_parentheses()("function_args")
) | (
# Simple form: *foo[10];
pyparsing.ZeroOrMore(_STAR)("type_pointer")
+ self._identifier()("type_instance_name")
# Possibly array: [],[][]
+ pyparsing.ZeroOrMore(
_OPEN_BRACKET
+ pyparsing.SkipTo(_CLOSE_BRACKET)(
"brackets_with_expression_inside*")
+ _CLOSE_BRACKET)
# Bitfields: int x: 7;
+ pyparsing.Optional(
_COLON
+ pyparsing.SkipTo(
_SEMICOLON | _COMMA)("bitfield")
)
)
return pyparsing.Group(
type_instance
+ self._maybe_attributes()
)
def _create_primitives():
global binary, quoted_string, config_type, comment, stream_trigger, selector
if ident is not None:
return
semi = Literal(u';').suppress()
ident = Word(alphas+u"_", alphas + nums + u"_")
number = Regex(u'((0x[a-fA-F0-9]+)|[+-]?[0-9]+)').setParseAction(lambda s, t: [int(t[0], 0)])
binary = Regex(u'hex:([a-fA-F0-9][a-fA-F0-9])+').setParseAction(lambda s, t: [unhexlify(t[0][4:])])
quoted_string = dblQuotedString
comment = Literal('#') + restOfLine
rvalue = number | quoted_string
# Convert all time intervals into an integer number of seconds
time_unit_multipliers = {
u'second': 1,
u'seconds': 1,
u'minute': 60,
u'minutes': 60,
u'hour': 60*60,
u'hours': 60*60,
u'day': 60*60*24,
u'days': 60*60*24,
u'month': 60*60*24*30,
u'months': 60*60*24*30,
u'year': 60*60*24*365,
u'years': 60*60*24*365,
}
config_type = oneOf('uint8_t uint16_t uint32_t int8_t int16_t int32_t uint8_t[] uint16_t[] uint32_t[] int8_t[] int16_t[] int32_t[] string binary')
comp = oneOf('> < >= <= == ~=')
time_unit = oneOf(u"second seconds minute minutes hour hours day days week weeks month months year years")
time_interval = (number + time_unit).setParseAction(lambda s, t: [t[0]*time_unit_multipliers[t[1]]])
slot_id = Literal(u"controller") | (Literal(u'slot') + number)
slot_id.setParseAction(lambda s,l,t: [SlotIdentifier.FromString(u' '.join([str(x) for x in t]))])
stream_modifier = Literal("system") | Literal("user") | Literal("combined")
stream = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node"))
stream.setParseAction(lambda s,t: [DataStream.FromString(u' '.join([str(x) for x in t]))])
all_selector = Optional(Literal("all")) + Optional(stream_modifier) + oneOf("buffered unbuffered inputs outputs counters constants") + Optional(Literal("nodes"))
all_selector.setParseAction(lambda s,t: [DataStreamSelector.FromString(u' '.join([str(x) for x in t]))])
one_selector = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node"))
one_selector.setParseAction(lambda s,t: [DataStreamSelector.FromString(u' '.join([str(x) for x in t]))])
selector = one_selector | all_selector
trigger_comp = oneOf('> < >= <= ==')
stream_trigger = Group((Literal(u'count') | Literal(u'value')) + Literal(u'(').suppress() - stream - Literal(u')').suppress() - trigger_comp - number).setResultsName('stream_trigger')
def grammar():
"""Define the query grammar.
Backus-Naur form (BNF) of the grammar::
<grammar> ::= <item> | <item> <and_or> <grammar>
<item> ::= [<neg>] <query-token> | [<neg>] "(" <grammar> ")"
<query-token> ::= <token> | <hosts>
<token> ::= <category>:<key> [<operator> <value>]
Given that the pyparsing library defines the grammar in a BNF-like style,for the details of the tokens not
specified above check directly the source code.
Returns:
pyparsing.ParserElement: the grammar parser.
"""
# Boolean operators
and_or = (pp.CaselessKeyword('and') | pp.CaselessKeyword('or'))('bool')
# 'neg' is used as label to allow the use of dot notation,'not' is a reserved word in Python
neg = pp.CaselessKeyword('not')('neg')
operator = pp.oneOf(OPERATORS, caseless=True)('operator') # Comparison operators
quoted_string = pp.quotedString.copy().addParseAction(pp.removeQuotes) # Both single and double quotes are allowed
# Parentheses
lpar = pp.Literal('(')('open_subgroup')
rpar = pp.Literal(')')('close_subgroup')
# Hosts selection: glob (*) and clustershell (,!&^[]) Syntaxes are allowed:
# i.e. host10[10-42].*.domain
hosts = quoted_string | (~(and_or | neg) + pp.Word(pp.alphanums + '-_.*,!&^[]'))
# Key-value token for allowed categories using the available comparison operators
# i.e. F:key = value
category = pp.oneOf(CATEGORIES, caseless=True)('category')
key = pp.Word(pp.alphanums + '-_.%@:')('key')
selector = pp.Combine(category + ':' + key) # i.e. F:key
# All printables characters except the parentheses that are part of this or the global grammar
all_but_par = ''.join([c for c in pp.printables if c not in ('(', ')', '{', '}')])
value = (quoted_string | pp.Word(all_but_par))('value')
token = selector + pp.Optional(operator + value)
# Final grammar,see the docstring for its BNF based on the tokens defined above
# Groups are used to split the parsed results for an easy access
full_grammar = pp.Forward()
item = pp.Group(pp.Optional(neg) + (token | hosts('hosts'))) | pp.Group(
pp.Optional(neg) + lpar + full_grammar + rpar)
full_grammar << item + pp.ZeroOrMore(pp.Group(and_or) + full_grammar) # pylint: disable=expression-not-assigned
return full_grammar
def grammar():
"""Define the query grammar.
Backus-Naur form (BNF) of the grammar::
<grammar> ::= "*" | <items>
<items> ::= <item> | <item> <whitespace> <items>
<item> ::= <key>:<value>
Given that the pyparsing library defines the grammar in a BNF-like style,for the details of the tokens not
specified above check directly the source code.
Returns:
pyparsing.ParserElement: the grammar parser.
"""
quoted_string = pp.quotedString.copy().addParseAction(pp.removeQuotes) # Both single and double quotes are allowed
# Key-value tokens: key:value
# Lowercase key,all printable characters except the parentheses that are part of the global grammar for the value
key = pp.Word(pp.srange('[a-z0-9-_.]"'), min=2)('key')
all_but_par = ''.join([c for c in pp.printables if c not in ('(', '}')])
value = (quoted_string | pp.Word(all_but_par))('value')
item = pp.Combine(key + ':' + value)
# Final grammar,see the docstring for its BNF based on the tokens defined above
# Groups are used to split the parsed results for an easy access
return pp.Group(pp.Literal('*')('all')) | pp.OneOrMore(pp.Group(item))
def grammar(backend_keys):
"""Define the main multi-query grammar.
Cumin provides a user-friendly generic query language that allows to combine the results of subqueries for multiple
backends:
* Each query part can be composed with the others using boolean operators ``and``,``or``,``and not``,``xor``.
* Multiple query parts can be grouped together with parentheses ``(``,``)``.
* Specific backend query ``I{backend-specific query Syntax}``,where ``I`` is an identifier for the specific
backend.
* Alias replacement,according to aliases defined in the configuration file ``A:group1``.
* The identifier ``A`` is reserved for the aliases replacement and cannot be used to identify a backend.
* A complex query example: ``(D{host1 or host2} and (P{R:Class = Role::MyClass} and not A:group1)) or D{host3}``
Backus-Naur form (BNF) of the grammar::
<grammar> ::= <item> | <item> <boolean> <grammar>
<item> ::= <backend_query> | <alias> | "(" <grammar> ")"
<backend_query> ::= <backend> "{" <query> "}"
<alias> ::= A:<alias_name>
<boolean> ::= "and not" | "and" | "xor" | "or"
Given that the pyparsing library defines the grammar in a BNF-like style,for the details of the tokens not
specified above check directly the source code.
Arguments:
backend_keys (list): list of the GRAMMAR_PREFIX for each registered backend.
Returns:
pyparsing.ParserElement: the grammar parser.
"""
# Boolean operators
boolean = (pp.CaselessKeyword('and not').leaveWhitespace() | pp.CaselessKeyword('and') |
pp.CaselessKeyword('xor') | pp.CaselessKeyword('or'))('bool')
# Parentheses
lpar = pp.Literal('(')('open_subgroup')
rpar = pp.Literal(')')('close_subgroup')
# Backend query: P{PuppetDB specific query}
query_start = pp.Combine(pp.oneOf(backend_keys, caseless=True)('backend') + pp.Literal('{'))
query_end = pp.Literal('}')
# Allow the backend specific query to use the end_query token as well,as long as it's in a quoted string
# and fail if there is a query_start token before the first query_end is reached
query = pp.SkipTo(query_end, ignore=pp.quotedString, failOn=query_start)('query')
backend_query = pp.Combine(query_start + query + query_end)
# Alias
alias = pp.Combine(pp.CaselessKeyword('A') + ':' + pp.Word(pp.alphanums + '-_.+')('alias'))
# Final grammar,see the docstring for its BNF based on the tokens defined above
# Group are used to have an easy dictionary access to the parsed results
full_grammar = pp.Forward()
item = backend_query | alias | lpar + full_grammar + rpar
full_grammar << pp.Group(item) + pp.ZeroOrMore(pp.Group(boolean + item)) # pylint: disable=expression-not-assigned
return full_grammar
def parse_table(attribute, string):
Line = OneOrMore(Float)('data') + Literal(';') + Optional(Comments, default='')('name')
Grammar = Suppress(Keyword('mpc.{}'.format(attribute)) + Keyword('=') + Keyword('[') + Optional(Comments)) + OneOrMore(Group(Line)) + Suppress(Keyword(']') + Optional(Comments))
result, i, j = Grammar.scanString(string).next()
_list = list()
for r in result:
_list.append([int_else_float_except_string(s) for s in r['data'].asList()])
return _list
def __init__(self, obj, config=str()):
def process(config):
pathexpr = p.Literal("'").suppress() + \
p.Optional(
p.Combine(
p.OneOrMore(p.Literal("/") + p.Word(p.alphanums)) + p.Literal("/").suppress()
)
).setResultsName('path') + \
p.Combine(
(p.Literal('@').suppress() | p.Literal('!').suppress()) +
p.Word(p.alphanums) +
p.Literal("'").suppress()
).setResultsName('attrib')
expr = p.Group(pathexpr).setResultsName('search')
match = expr.parseString(config)
_ret = []
if 'search' in match:
if 'path' in match['search']:
_ret.append(match['search']['path'])
if 'attrib' in match['search']:
_ret.append(match['search']['attrib'])
return _ret
super(Xattrib, self).__init__(obj, config=config, defer=True)
if self.config is None or len(self.config) < 1 or not isinstance(self.config, str):
raise ValueError('Xattrib plugin function requires a config string')
try:
_result = process("'%s'" % self.config)
if len(_result) == 2:
self.targetobject, self.targetattribute = _result
elif len(_result) == 1:
_config = getattr(obj, _result[0], None)
if _config is None:
raise ValueError('Xattrib plugin received an attribute name that does not exist')
# Todo len check only required for attributes,but not method plugins
if len(_config) > 1:
raise ValueError('Xattrib plugin received a attribute name that contains multiple values')
self.targetobject, self.targetattribute = process("'%s'" % _config[0])
else:
raise Exception()
except:
raise ValueError('An error occured when processing the search string for the Xattrib plugin function')
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。