diff --git a/tools/sigma.py b/tools/sigma.py
index 367f0fcb8..2ee92130a 100644
--- a/tools/sigma.py
+++ b/tools/sigma.py
@@ -12,17 +12,19 @@ class SigmaParser:
     def __init__(self, sigma):
         self.definitions = dict()
         self.parsedyaml = yaml.safe_load(sigma)
+
+    def parse_sigma(self):
         try:    # definition uniqueness check
-            definitionNames = set()
-            for definitionName in self.parsedyaml["detection"]:
-                if definitionName in definitionNames:
+            for definitionName, definition in self.parsedyaml["detection"].items():
+                if definitionName in self.definitions:
                     raise SigmaParseError("Definition '%s' was already defined" % (definitionName))
+                self.definitions[definitionName] = definition
         except KeyError:
             raise SigmaParseError("No detection definitions found")
 
         try:    # tokenization
             conditions = self.parsedyaml["detection"]["condition"]
-            self.condtoken = list()
+            self.condtoken = list()     # list of tokenized conditions
             if type(conditions) == str:
                 self.condtoken.append(SigmaConditionTokenizer(conditions))
             elif type(conditions) == list:
@@ -31,7 +33,15 @@ class SigmaParser:
         except KeyError:
             raise SigmaParseError("No condition found")
 
-    def parse_definition(self, definition, condOverride=None):
+        self.condparsed = list()        # list of parsed conditions
+        for tokens in self.condtoken:
+            self.condparsed.append(SigmaConditionParser(self, tokens))
+
+    def parse_definition(self, definitionName, condOverride=None):
+        try:
+            definition = self.definitions[definitionName]
+        except KeyError as e:
+            raise SigmaParseError("Unknown definition '%s'" % (definitionName)) from e
         if type(definition) not in (dict, list):
             raise SigmaParseError("Expected map or list, got type %s: '%s'" % (type(definition), str(definition)))
 
@@ -92,9 +102,10 @@ class SigmaConditionToken:
             "BY",
             ]
 
-    def __init__(self, tokendef, match):
+    def __init__(self, tokendef, match, pos):
         self.type = tokendef[0]
         self.matched = match.group()
+        self.pos = pos
 
     def __eq__(self, other):
         if type(other) == int:      # match against type
@@ -136,7 +147,7 @@ class SigmaConditionTokenizer:
                 match = tokendef[1].match(condition)
                 if match:
                     if tokendef[0] != None:
-                        self.tokens.append(SigmaConditionToken(tokendef, match))
+                        self.tokens.append(SigmaConditionToken(tokendef, match, pos + match.start()))
                     pos += match.end()      # increase position and cut matched prefix from condition
                     condition = condition[match.end():]
                     break
@@ -146,6 +157,15 @@ class SigmaConditionTokenizer:
     def __str__(self):
         return " ".join([str(token) for token in self.tokens])
 
+    def __iter__(self):
+        return iter(self.tokens)
+
+    def __getitem__(self, i):
+        return self.tokens[i]
+
+    def index(self, item):
+        return self.tokens.index(item)
+
 class SigmaParseError(Exception):
     pass
 
@@ -165,8 +185,11 @@ class ConditionAND(ConditionBase):
     """AND Condition"""
     op = COND_AND
 
-    def __init__(self):
-        self.items = list()
+    def __init__(self, sigma=None, op=None, val1=None, val2=None):
+        if sigma is None and op is None and val1 is None and val2 is None:    # no parameters given - initialize empty
+            self.items = list()
+        else:       # called by parser, use given values
+            self.items = [ val1, val2 ]
 
 class ConditionOR(ConditionAND):
     """OR Condition"""
@@ -176,11 +199,113 @@ class ConditionNOT(ConditionBase):
     """NOT Condition"""
     op = COND_NOT
 
-    def __init__(self):
-        self.items = None
+    def __init__(self, sigma=None, op=None, val=None):
+        if sigma is None and op is None and val is None:    # no parameters given - initialize empty
+            self.items = None
+        else:       # called by parser, use given values
+            self.items = val
 
     def add(self, item):
         if self.items == None:
             super.add(item)
         else:
             raise ValueError("Only one element allowed in NOT condition")
+
+class NodeSubexpression:
+    """Subexpression in parentheses"""
+    def __init__(self, subexpr):
+        self.subexpr = subexpr
+
+# Parse tree converters: convert something into one of the parse tree node classes defined above
+def convertAllFrom(sigma, op, val):
+    """Convert 'all from x' into ConditionAND"""
+    return sigma.parse_definition(val.matched, ConditionAND)    # definitions are looked up by name, not by token
+
+def convertOneFrom(sigma, op, val):
+    """Convert '1 from x' into ConditionOR"""
+    return sigma.parse_definition(val.matched, ConditionOR)     # definitions are looked up by name, not by token
+
+def convertId(sigma, op):
+    """Convert search identifiers (lists or maps) into condition nodes according to spec defaults"""
+    return sigma.parse_definition(op.matched)
+
+# Condition parser class
+class SigmaConditionParser:
+    """Parser for Sigma condition expression"""
+    searchOperators = [     # description of operators: (token id, number of operands, parse tree node class) - order == precedence
+            (SigmaConditionToken.TOKEN_ALL, 1, convertAllFrom),
+            (SigmaConditionToken.TOKEN_ONE, 1, convertOneFrom),
+            (SigmaConditionToken.TOKEN_ID,  0, convertId),
+            (SigmaConditionToken.TOKEN_NOT, 1, ConditionNOT),
+            (SigmaConditionToken.TOKEN_AND, 2, ConditionAND),
+            (SigmaConditionToken.TOKEN_OR,  2, ConditionOR),
+            ]
+
+    def __init__(self, sigmaParser, tokens):
+        if SigmaConditionToken.TOKEN_PIPE in tokens:   # aggregations are not yet supported
+            raise NotImplementedError("Aggregation expressions are not yet supported")
+
+        self.sigmaParser = sigmaParser
+        self.parsedSearch = self.parseSearch(tokens)    # keep the parse result available to callers
+
+    def parseSearch(self, tokens):
+        """
+        Iterative parsing of search expression.
+
+        Parenthesized groups are replaced by NodeSubexpression nodes first, then the operators listed in
+        searchOperators are reduced in order of precedence. Returns the reduced token sequence. Raises
+        SigmaParseError on unbalanced or empty parentheses and on operators with missing operands.
+        """
+        # 1. Identify subexpressions with parentheses around them and parse them like a separate search expression
+        while SigmaConditionToken.TOKEN_LPAR in tokens:
+            try:
+                rPos = tokens.index(SigmaConditionToken.TOKEN_RPAR)
+            except ValueError as e:
+                raise SigmaParseError("Missing matching closing parentheses") from e
+            rTok = tokens[rPos]
+            # pair the first closing parenthesis with the nearest opening one before it, so nested groups are resolved innermost first
+            lPos = None
+            for pos in range(rPos - 1, -1, -1):
+                if tokens[pos] == SigmaConditionToken.TOKEN_LPAR:
+                    lPos = pos
+                    break
+            if lPos is None:    # every opening parenthesis is located behind the first closing one
+                lTok = tokens[tokens.index(SigmaConditionToken.TOKEN_LPAR)]
+                raise SigmaParseError("Closing parentheses at position " + str(rTok.pos) + " precedes opening at position " + str(lTok.pos))
+            lTok = tokens[lPos]
+            if lPos + 1 == rPos:
+                raise SigmaParseError("Empty subexpression at " + str(lTok.pos))
+
+            subparsed = self.parseSearch(tokens[lPos + 1:rPos])     # slice end is exclusive: rPos itself is the closing parenthesis
+            tokens = tokens[:lPos] + [ NodeSubexpression(subparsed) ] + tokens[rPos + 1:]   # replace parentheses + expression with group node that contains parsed subexpression
+
+        # 2. Iterate over all known operators in given precedence
+        for operator in self.searchOperators:
+            # 3. reduce all occurrences into corresponding parse tree nodes
+            while operator[0] in tokens:
+                pos_op = tokens.index(operator[0])
+                tok_op = tokens[pos_op]
+                if operator[1] == 0:    # operator
+                    treenode = operator[2](self.sigmaParser, tok_op)
+                    tokens = tokens[:pos_op] + [ treenode ] + tokens[pos_op + 1:]
+                elif operator[1] == 1:  # operator value
+                    pos_val = pos_op + 1
+                    try:
+                        tok_val = tokens[pos_val]
+                    except IndexError as e:
+                        raise SigmaParseError("Missing operand after '%s' at position %s" % (tok_op.matched, tok_op.pos)) from e
+                    treenode = operator[2](self.sigmaParser, tok_op, tok_val)
+                    tokens = tokens[:pos_op] + [ treenode ] + tokens[pos_val + 1:]
+                elif operator[1] == 2:  # value1 operator value2
+                    pos_val1 = pos_op - 1
+                    pos_val2 = pos_op + 1
+                    if pos_val1 < 0:    # index -1 would silently wrap around to the last token
+                        raise SigmaParseError("Missing left operand of '%s' at position %s" % (tok_op.matched, tok_op.pos))
+                    tok_val1 = tokens[pos_val1]
+                    try:
+                        tok_val2 = tokens[pos_val2]
+                    except IndexError as e:
+                        raise SigmaParseError("Missing right operand of '%s' at position %s" % (tok_op.matched, tok_op.pos)) from e
+                    treenode = operator[2](self.sigmaParser, tok_op, tok_val1, tok_val2)
+                    tokens = tokens[:pos_val1] + [ treenode ] + tokens[pos_val2 + 1:]
+        return tokens
diff --git a/tools/sigmac.py b/tools/sigmac.py
index 4e7a57c08..47bdfdf84 100755
--- a/tools/sigmac.py
+++ b/tools/sigmac.py
@@ -5,7 +5,7 @@ import sys
 import argparse
 import yaml
 import json
-from sigma import SigmaParser
+from sigma import SigmaParser, SigmaParseError
 import backends
 
 def print_verbose(*args, **kwargs):
@@ -33,16 +33,23 @@ if cmdargs.target_list:
     sys.exit(0)
 
 for sigmafile in cmdargs.inputs:
-    print_verbose("Processing Sigma input %s" % (sigmafile))
+    print_verbose("* Processing Sigma input %s" % (sigmafile))
     try:
         f = open(sigmafile)
         parser = SigmaParser(f)
         print_debug(json.dumps(parser.parsedyaml, indent=2))
+        parser.parse_sigma()
         for condtoken in parser.condtoken:
             print_debug(condtoken)
     except OSError as e:
         print("Failed to open Sigma file %s: %s" % (sigmafile, str(e)))
     except yaml.parser.ParserError as e:
         print("Sigma file %s is no valid YAML: %s" % (sigmafile, str(e)))
+    except SigmaParseError as e:
+        print("Sigma parse error in %s: %s" % (sigmafile, str(e)))
+    except NotImplementedError as e:
+        print("This tool currently doesn't support the provided input: " + str(e))
+        print("Feel free to contribute for fun and fame, this is open source :) -> https://github.com/Neo23x0/sigma")
     finally:
         f.close()
+        print_debug()