From eb440b33575c7b69b79bd702dbeeb34c7f424a41 Mon Sep 17 00:00:00 2001 From: Thomas Patzke Date: Fri, 27 Jul 2018 23:02:35 +0200 Subject: [PATCH] Split config - code removal from configuration --- tools/sigma/configuration.py | 208 ----------------------------------- 1 file changed, 208 deletions(-) diff --git a/tools/sigma/configuration.py b/tools/sigma/configuration.py index 1a522832d..4e93ef623 100644 --- a/tools/sigma/configuration.py +++ b/tools/sigma/configuration.py @@ -19,121 +19,6 @@ import re import logging from sigma.parser.condition import ConditionAND, ConditionOR -# Field Mapping Definitions -def FieldMapping(source, target=None): - """Determines target type and instantiate appropriate mapping type""" - if target == None: - return SimpleFieldMapping(source, source) - elif type(target) == str: - return SimpleFieldMapping(source, target) - elif type(target) == list: - return MultiFieldMapping(source, target) - elif type(target) == dict: - return ConditionalFieldMapping(source, target) - -class SimpleFieldMapping: - """1:1 field mapping""" - target_type = str - - def __init__(self, source, target): - """Initialization with generic target type check""" - if type(target) != self.target_type: - raise TypeError("Target type mismatch: wrong mapping type for this target") - self.source = source - self.target = target - - def resolve(self, key, value, sigmaparser): - """Return mapped field name""" - return (self.target, value) - - def resolve_fieldname(self, fieldname): - return self.target - -class MultiFieldMapping(SimpleFieldMapping): - """1:n field mapping that expands target field names into OR conditions""" - target_type = list - - def resolve(self, key, value, sigmaparser): - """Returns multiple target field names as OR condition""" - cond = ConditionOR() - for fieldname in self.target: - cond.add((fieldname, value)) - return cond - - def resolve_fieldname(self, fieldname): - return self.target - -class ConditionalFieldMapping(SimpleFieldMapping): - """ - Conditional field mapping: - * key contains field=value condition, value target mapping - * key "default" maps when no condition matches - * if no condition matches and there is no default, don't perform mapping - """ - target_type = dict - - def __init__(self, source, target): - """Init table between condition field names and values""" - super().__init__(source, target) - self.conditions = dict() # condition field -> condition value -> target fields - self.default = None - for condition, target in self.target.items(): - try: # key contains condition (field=value) - field, value = condition.split("=") - self.add_condition(field, value, target) - except ValueError as e: # no, condition - "default" expected - if condition == "default": - if self.default == None: - if type(target) == str: - self.default = [ target ] - elif type(target) == list: - self.default = target - else: - raise SigmaConfigParseError("Default mapping must be single value or list") - else: - raise SigmaConfigParseError("Conditional field mapping can have only one default value, use list for multiple target mappings") - else: - raise SigmaConfigParseError("Expected condition or default") from e - - def add_condition(self, field, value, target): - if field not in self.conditions: - self.conditions[field] = dict() - if value not in self.conditions[field]: - self.conditions[field][value] = list() - if type(target) == str: - self.conditions[field][value].append(target) - elif type(target) == list: - self.conditions[field][value].extend(target) - - def resolve(self, key, value, sigmaparser): - # build list of matching target mappings - targets = set() - for condfield in self.conditions: - if condfield in sigmaparser.values: - rulefieldvalues = sigmaparser.values[condfield] - for condvalue in self.conditions[condfield]: - if condvalue in rulefieldvalues: - targets.update(self.conditions[condfield][condvalue]) - if len(targets) == 0: # no matching condition, try with default mapping - if self.default != None: - targets = self.default - - if len(targets) == 1: # result set contains only one target, return mapped item (like SimpleFieldMapping) - return (targets.pop(), value) - elif len(targets) > 1: # result set contains multiple targets, return all linked as OR condition (like MultiFieldMapping) - cond = ConditionOR() - for target in targets: - cond.add((target, value)) - return cond - else: # no mapping found - return (key, value) - - def resolve_fieldname(self, fieldname): - if self.default != None: - return self.default - else: - return fieldname - # Configuration class SigmaConfiguration: """Sigma converter configuration. Contains field mappings and logsource descriptions""" @@ -338,96 +223,3 @@ class SigmaLogsourceConfiguration: def __str__(self): return "[ LogSourceConfiguration: %s %s %s indices: %s ]" % (self.category, self.product, self.service, str(self.index)) - -class SigmaConfigParseError(Exception): - pass - -# Rule Filtering -class SigmaRuleFilter: - """Filter for Sigma rules with conditions""" - LEVELS = { - "low" : 0, - "medium" : 1, - "high" : 2, - "critical" : 3 - } - STATES = ["experimental", "testing", "stable"] - - def __init__(self, expr): - self.minlevel = None - self.maxlevel = None - self.status = None - self.logsources = list() - - for cond in [c.replace(" ", "") for c in expr.split(",")]: - if cond.startswith("level<="): - try: - level = cond[cond.index("=") + 1:] - self.maxlevel = self.LEVELS[level] - except KeyError as e: - raise SigmaRuleFilterParseException("Unknown level '%s' in condition '%s'" % (level, cond)) from e - elif cond.startswith("level>="): - try: - level = cond[cond.index("=") + 1:] - self.minlevel = self.LEVELS[level] - except KeyError as e: - raise SigmaRuleFilterParseException("Unknown level '%s' in condition '%s'" % (level, cond)) from e - elif cond.startswith("level="): - try: - level = cond[cond.index("=") + 1:] - self.minlevel = self.LEVELS[level] - self.maxlevel = self.minlevel - except KeyError as e: - raise SigmaRuleFilterParseException("Unknown level '%s' in condition '%s'" % (level, cond)) from e - elif cond.startswith("status="): - self.status = cond[cond.index("=") + 1:] - if self.status not in self.STATES: - raise SigmaRuleFilterParseException("Unknown status '%s' in condition '%s'" % (self.status, cond)) - elif cond.startswith("logsource="): - self.logsources.append(cond[cond.index("=") + 1:]) - else: - raise SigmaRuleFilterParseException("Unknown condition '%s'" % cond) - - def match(self, yamldoc): - """Match filter conditions against rule""" - # Levels - if self.minlevel is not None or self.maxlevel is not None: - try: - level = self.LEVELS[yamldoc['level']] - except KeyError: # missing or invalid level - return False # User wants level restriction, but it's not possible here - - # Minimum level - if self.minlevel is not None: - if level < self.minlevel: - return False - # Maximum level - if self.maxlevel is not None: - if level > self.maxlevel: - return False - - # Status - if self.status is not None: - try: - status = yamldoc['status'] - except KeyError: # missing status - return False # User wants status restriction, but it's not possible here - if status != self.status: - return False - - # Log Sources - if len(self.logsources) > 0: - try: - logsources = { value for key, value in yamldoc['logsource'].items() } - except (KeyError, AttributeError): # no log source set - return False # User wants status restriction, but it's not possible here - - for logsrc in self.logsources: - if logsrc not in logsources: - return False - - # all tests passed - return True - -class SigmaRuleFilterParseException(Exception): - pass