Intermediate backup state: Parsing of most conditions
* Conditions with parentheses cause exceptions
This commit is contained in:
+120
-11
@@ -12,17 +12,19 @@ class SigmaParser:
|
||||
def __init__(self, sigma):
    """Set up an empty definition table and load the rule from *sigma*.

    sigma is handed unchanged to yaml.safe_load; the result is kept in
    self.parsedyaml.
    """
    self.definitions = {}
    self.parsedyaml = yaml.safe_load(sigma)
|
||||
|
||||
def parse_sigma(self):
|
||||
try: # definition uniqueness check
|
||||
definitionNames = set()
|
||||
for definitionName in self.parsedyaml["detection"]:
|
||||
if definitionName in definitionNames:
|
||||
for definitionName, definition in self.parsedyaml["detection"].items():
|
||||
if definitionName in self.definitions:
|
||||
raise SigmaParseError("Definition '%s' was already defined" % (definitionName))
|
||||
self.definitions[definitionName] = definition
|
||||
except KeyError:
|
||||
raise SigmaParseError("No detection definitions found")
|
||||
|
||||
try: # tokenization
|
||||
conditions = self.parsedyaml["detection"]["condition"]
|
||||
self.condtoken = list()
|
||||
self.condtoken = list() # list of tokenized conditions
|
||||
if type(conditions) == str:
|
||||
self.condtoken.append(SigmaConditionTokenizer(conditions))
|
||||
elif type(conditions) == list:
|
||||
@@ -31,7 +33,15 @@ class SigmaParser:
|
||||
except KeyError:
|
||||
raise SigmaParseError("No condition found")
|
||||
|
||||
def parse_definition(self, definition, condOverride=None):
|
||||
self.condparsed = list() # list of parsed conditions
|
||||
for tokens in self.condtoken:
|
||||
self.condparsed.append(SigmaConditionParser(self, tokens))
|
||||
|
||||
def parse_definition(self, definitionName, condOverride=None):
|
||||
try:
|
||||
definition = self.definitions[definitionName]
|
||||
except KeyError as e:
|
||||
raise SigmaParseError("Unknown definition '%s'" % (definitionName)) from e
|
||||
if type(definition) not in (dict, list):
|
||||
raise SigmaParseError("Expected map or list, got type %s: '%s'" % (type(definition), str(definition)))
|
||||
|
||||
@@ -92,9 +102,10 @@ class SigmaConditionToken:
|
||||
"BY",
|
||||
]
|
||||
|
||||
def __init__(self, tokendef, match):
|
||||
def __init__(self, tokendef, match, pos):
    """Build a token from a token definition and its match.

    tokendef -- token definition; its first element becomes the token type
    match    -- match object; the complete matched text is stored
    pos      -- position value, stored on the token as given
    """
    self.type = tokendef[0]
    self.matched = match.group(0)   # group 0 is the whole match
    self.pos = pos
|
||||
|
||||
def __eq__(self, other):
|
||||
if type(other) == int: # match against type
|
||||
@@ -136,7 +147,7 @@ class SigmaConditionTokenizer:
|
||||
match = tokendef[1].match(condition)
|
||||
if match:
|
||||
if tokendef[0] != None:
|
||||
self.tokens.append(SigmaConditionToken(tokendef, match))
|
||||
self.tokens.append(SigmaConditionToken(tokendef, match, pos + match.start()))
|
||||
pos += match.end() # increase position and cut matched prefix from condition
|
||||
condition = condition[match.end():]
|
||||
break
|
||||
@@ -146,6 +157,15 @@ class SigmaConditionTokenizer:
|
||||
def __str__(self):
    """Return the string forms of all tokens joined by single spaces."""
    return " ".join(map(str, self.tokens))
|
||||
|
||||
def __iter__(self):
    """Iterate over the collected tokens."""
    token_iterator = iter(self.tokens)
    return token_iterator
|
||||
|
||||
def __getitem__(self, i):
    """Return the element(s) of the token list selected by *i*."""
    selected = self.tokens[i]
    return selected
|
||||
|
||||
def index(self, item):
    """Return the position of the first token equal to *item*.

    Delegates to list.index of the token list, so ValueError is raised
    when nothing matches.
    """
    position = self.tokens.index(item)
    return position
|
||||
|
||||
class SigmaParseError(Exception):
    """Error raised when a Sigma rule or its condition cannot be parsed."""
|
||||
|
||||
@@ -165,8 +185,11 @@ class ConditionAND(ConditionBase):
|
||||
"""AND Condition"""
|
||||
op = COND_AND
|
||||
|
||||
def __init__(self):
|
||||
self.items = list()
|
||||
def __init__(self, sigma=None, op=None, val1=None, val2=None):
    """Create an empty condition or one holding two operands.

    Called without arguments, the condition starts with an empty item list.
    As soon as any argument is given (the condition parser passes all four),
    the items are initialised to [val1, val2]; sigma and op are accepted
    for a uniform constructor signature and are not used here.
    """
    # Identity checks instead of "== None": operands may implement their own
    # __eq__, which must not decide whether an argument was supplied.
    if sigma is None and op is None and val1 is None and val2 is None:
        self.items = []                 # no parameters given - initialize empty
    else:
        self.items = [val1, val2]       # called by parser, use given values
|
||||
|
||||
class ConditionOR(ConditionAND):
|
||||
"""OR Condition"""
|
||||
@@ -176,11 +199,97 @@ class ConditionNOT(ConditionBase):
|
||||
"""NOT Condition"""
|
||||
op = COND_NOT
|
||||
|
||||
def __init__(self):
|
||||
self.items = None
|
||||
def __init__(self, sigma=None, op=None, val=None):
    """Create an empty NOT condition or one holding the given operand.

    Called without arguments, items is None (no operand yet). As soon as
    any argument is given (the condition parser passes all three), items
    is set to val; sigma and op are accepted for a uniform constructor
    signature and are not used here.
    """
    # Identity checks instead of "== None": operands may implement their own
    # __eq__, which must not decide whether an argument was supplied.
    if sigma is None and op is None and val is None:
        self.items = None               # no parameters given - initialize empty
    else:
        self.items = val                # called by parser, use given values
|
||||
|
||||
def add(self, item):
    """Add the operand of the NOT condition via the base class.

    Raises ValueError when an operand is already present, because a NOT
    condition takes exactly one element.
    """
    if self.items is None:
        # super must be called to obtain the proxy object; "super.add" looked
        # up an attribute on the super type itself and raised AttributeError.
        super().add(item)
    else:
        raise ValueError("Only one element allowed in NOT condition")
|
||||
|
||||
class NodeSubexpression:
    """Parse tree node for a subexpression that was enclosed in parentheses."""

    def __init__(self, subexpr):
        # keep the already parsed content of the parentheses as given
        self.subexpr = subexpr
|
||||
|
||||
# Parse tree converters: convert something into one of the parse tree node classes defined above
|
||||
def convertAllFrom(sigma, op, val):
    """Convert 'all from x' into ConditionAND.

    sigma -- parser object providing parse_definition()
    op    -- the operator token (not used)
    val   -- operand naming the definition: either a token, whose matched
             text is used (as convertId does), or the definition name itself
    """
    # parse_definition looks the definition up by name, so hand over the
    # token text rather than the token object; plain names pass through.
    definitionName = getattr(val, "matched", val)
    return sigma.parse_definition(definitionName, ConditionAND)
|
||||
|
||||
def convertOneFrom(sigma, op, val):
    """Convert '1 from x' into ConditionOR.

    sigma -- parser object providing parse_definition()
    op    -- the operator token (not used)
    val   -- operand naming the definition: either a token, whose matched
             text is used (as convertId does), or the definition name itself
    """
    # parse_definition looks the definition up by name, so hand over the
    # token text rather than the token object; plain names pass through.
    definitionName = getattr(val, "matched", val)
    # "1 from" means any single item may match, hence OR (AND was passed before)
    return sigma.parse_definition(definitionName, ConditionOR)
|
||||
|
||||
def convertId(sigma, op):
    """Resolve a search identifier token into its parsed definition.

    The matched text of *op* is used as the definition name and passed to
    sigma.parse_definition without a condition override.
    """
    identifier = op.matched
    return sigma.parse_definition(identifier)
|
||||
|
||||
# Condition parser class
|
||||
class SigmaConditionParser:
    """Parser for Sigma condition expression"""
    searchOperators = [     # description of operators: (token id, number of operands, parse tree node class) - order == precedence
        (SigmaConditionToken.TOKEN_ALL, 1, convertAllFrom),
        (SigmaConditionToken.TOKEN_ONE, 1, convertOneFrom),
        (SigmaConditionToken.TOKEN_ID,  0, convertId),
        (SigmaConditionToken.TOKEN_NOT, 1, ConditionNOT),
        (SigmaConditionToken.TOKEN_AND, 2, ConditionAND),
        (SigmaConditionToken.TOKEN_OR,  2, ConditionOR),
    ]

    def __init__(self, sigmaParser, tokens):
        """Parse one tokenized condition.

        sigmaParser -- parser object handed to the converters/node classes
        tokens      -- iterable of condition tokens

        The result of parseSearch() is kept in self.parsedSearch.
        Raises NotImplementedError if the condition contains a pipe token.
        """
        if SigmaConditionToken.TOKEN_PIPE in tokens:    # aggregations are not yet supported
            raise NotImplementedError("Aggregation expressions are not yet supported")

        self.sigmaParser = sigmaParser
        self.parsedSearch = self.parseSearch(tokens)    # keep the result instead of discarding it

    def parseSearch(self, tokens):
        """
        Iterative parsing of search expression.

        Parenthesised subexpressions are parsed recursively and replaced by
        NodeSubexpression nodes; afterwards the operators listed in
        searchOperators are reduced in table order. Returns the reduced
        list. Raises SigmaParseError on unbalanced or empty parentheses and
        on operators that lack an operand.
        """
        tokens = list(tokens)   # work on a plain list so slicing and concatenation behave uniformly
        lpar = SigmaConditionToken.TOKEN_LPAR
        rpar = SigmaConditionToken.TOKEN_RPAR

        # 1. Identify subexpressions with parentheses around them and parse them like a separate search expression
        while lpar in tokens or rpar in tokens:
            try:
                rPos = tokens.index(rpar)
            except ValueError as e:
                raise SigmaParseError("Missing matching closing parentheses") from e
            rTok = tokens[rPos]

            # The first closing parenthesis belongs to the nearest opening one
            # before it; this resolves nested groups from the inside out.
            lPos = next((i for i in range(rPos - 1, -1, -1) if tokens[i] == lpar), None)
            if lPos is None:
                raise SigmaParseError("Closing parentheses at position " + str(rTok.pos) + " has no preceding opening parentheses")
            lTok = tokens[lPos]
            if lPos + 1 == rPos:
                raise SigmaParseError("Empty subexpression at " + str(lTok.pos))

            # the slice end is exclusive, so rPos itself is the correct upper bound
            subparsed = self.parseSearch(tokens[lPos + 1:rPos])
            tokens = tokens[:lPos] + [ NodeSubexpression(subparsed) ] + tokens[rPos + 1:]   # replace parentheses + expression with group node that contains parsed subexpression

        # 2. Iterate over all known operators in given precedence
        for tokenType, operandCount, converter in self.searchOperators:
            # 3. reduce all occurrences into corresponding parse tree nodes
            while tokenType in tokens:
                pos_op = tokens.index(tokenType)
                tok_op = tokens[pos_op]
                if operandCount == 0:       # operator
                    treenode = converter(self.sigmaParser, tok_op)
                    tokens = tokens[:pos_op] + [ treenode ] + tokens[pos_op + 1:]
                elif operandCount == 1:     # operator value
                    pos_val = pos_op + 1
                    if pos_val >= len(tokens):
                        raise SigmaParseError("Missing operand after operator at position " + str(tok_op.pos))
                    tok_val = tokens[pos_val]
                    treenode = converter(self.sigmaParser, tok_op, tok_val)
                    tokens = tokens[:pos_op] + [ treenode ] + tokens[pos_val + 1:]
                elif operandCount == 2:     # value1 operator value2
                    pos_val1 = pos_op - 1
                    pos_val2 = pos_op + 1
                    # an index of -1 would silently wrap around to the last element
                    if pos_val1 < 0 or pos_val2 >= len(tokens):
                        raise SigmaParseError("Missing operand for operator at position " + str(tok_op.pos))
                    tok_val1 = tokens[pos_val1]
                    tok_val2 = tokens[pos_val2]
                    treenode = converter(self.sigmaParser, tok_op, tok_val1, tok_val2)
                    tokens = tokens[:pos_val1] + [ treenode ] + tokens[pos_val2 + 1:]
        return tokens
|
||||
|
||||
+9
-2
@@ -5,7 +5,7 @@ import sys
|
||||
import argparse
|
||||
import yaml
|
||||
import json
|
||||
from sigma import SigmaParser
|
||||
from sigma import SigmaParser, SigmaParseError
|
||||
import backends
|
||||
|
||||
def print_verbose(*args, **kwargs):
|
||||
@@ -33,16 +33,23 @@ if cmdargs.target_list:
|
||||
sys.exit(0)
|
||||
|
||||
# Process every Sigma input file: load it, parse it and dump the tokenized
# conditions as debug output. Errors are reported per file and processing
# continues with the next input.
for sigmafile in cmdargs.inputs:
    print_verbose("* Processing Sigma input %s" % (sigmafile))
    try:
        # "with" closes the file on every path. The former "finally: f.close()"
        # also ran when open() itself had failed, where f was unbound (or
        # still referred to the previous input file).
        with open(sigmafile) as f:
            parser = SigmaParser(f)
            print_debug(json.dumps(parser.parsedyaml, indent=2))
            parser.parse_sigma()
            for condtoken in parser.condtoken:
                print_debug(condtoken)
    except OSError as e:
        print("Failed to open Sigma file %s: %s" % (sigmafile, str(e)))
    except yaml.parser.ParserError as e:
        print("Sigma file %s is no valid YAML: %s" % (sigmafile, str(e)))
    except SigmaParseError as e:
        print("Sigma parse error in %s: %s" % (sigmafile, str(e)))
    except NotImplementedError as e:
        print("This tool currently doesn't support the provided input: " + str(e))
        print("Feel free to contribute for fun and fame, this is open source :) -> https://github.com/Neo23x0/sigma")
    print_debug()
|
||||
|
||||
Reference in New Issue
Block a user