Split config - code removal from configuration
This commit is contained in:
@@ -19,121 +19,6 @@ import re
|
||||
import logging
|
||||
from sigma.parser.condition import ConditionAND, ConditionOR
|
||||
|
||||
# Field Mapping Definitions
|
||||
def FieldMapping(source, target=None):
|
||||
"""Determines target type and instantiate appropriate mapping type"""
|
||||
if target == None:
|
||||
return SimpleFieldMapping(source, source)
|
||||
elif type(target) == str:
|
||||
return SimpleFieldMapping(source, target)
|
||||
elif type(target) == list:
|
||||
return MultiFieldMapping(source, target)
|
||||
elif type(target) == dict:
|
||||
return ConditionalFieldMapping(source, target)
|
||||
|
||||
class SimpleFieldMapping:
|
||||
"""1:1 field mapping"""
|
||||
target_type = str
|
||||
|
||||
def __init__(self, source, target):
|
||||
"""Initialization with generic target type check"""
|
||||
if type(target) != self.target_type:
|
||||
raise TypeError("Target type mismatch: wrong mapping type for this target")
|
||||
self.source = source
|
||||
self.target = target
|
||||
|
||||
def resolve(self, key, value, sigmaparser):
|
||||
"""Return mapped field name"""
|
||||
return (self.target, value)
|
||||
|
||||
def resolve_fieldname(self, fieldname):
|
||||
return self.target
|
||||
|
||||
class MultiFieldMapping(SimpleFieldMapping):
|
||||
"""1:n field mapping that expands target field names into OR conditions"""
|
||||
target_type = list
|
||||
|
||||
def resolve(self, key, value, sigmaparser):
|
||||
"""Returns multiple target field names as OR condition"""
|
||||
cond = ConditionOR()
|
||||
for fieldname in self.target:
|
||||
cond.add((fieldname, value))
|
||||
return cond
|
||||
|
||||
def resolve_fieldname(self, fieldname):
|
||||
return self.target
|
||||
|
||||
class ConditionalFieldMapping(SimpleFieldMapping):
|
||||
"""
|
||||
Conditional field mapping:
|
||||
* key contains field=value condition, value target mapping
|
||||
* key "default" maps when no condition matches
|
||||
* if no condition matches and there is no default, don't perform mapping
|
||||
"""
|
||||
target_type = dict
|
||||
|
||||
def __init__(self, source, target):
|
||||
"""Init table between condition field names and values"""
|
||||
super().__init__(source, target)
|
||||
self.conditions = dict() # condition field -> condition value -> target fields
|
||||
self.default = None
|
||||
for condition, target in self.target.items():
|
||||
try: # key contains condition (field=value)
|
||||
field, value = condition.split("=")
|
||||
self.add_condition(field, value, target)
|
||||
except ValueError as e: # no, condition - "default" expected
|
||||
if condition == "default":
|
||||
if self.default == None:
|
||||
if type(target) == str:
|
||||
self.default = [ target ]
|
||||
elif type(target) == list:
|
||||
self.default = target
|
||||
else:
|
||||
raise SigmaConfigParseError("Default mapping must be single value or list")
|
||||
else:
|
||||
raise SigmaConfigParseError("Conditional field mapping can have only one default value, use list for multiple target mappings")
|
||||
else:
|
||||
raise SigmaConfigParseError("Expected condition or default") from e
|
||||
|
||||
def add_condition(self, field, value, target):
|
||||
if field not in self.conditions:
|
||||
self.conditions[field] = dict()
|
||||
if value not in self.conditions[field]:
|
||||
self.conditions[field][value] = list()
|
||||
if type(target) == str:
|
||||
self.conditions[field][value].append(target)
|
||||
elif type(target) == list:
|
||||
self.conditions[field][value].extend(target)
|
||||
|
||||
def resolve(self, key, value, sigmaparser):
|
||||
# build list of matching target mappings
|
||||
targets = set()
|
||||
for condfield in self.conditions:
|
||||
if condfield in sigmaparser.values:
|
||||
rulefieldvalues = sigmaparser.values[condfield]
|
||||
for condvalue in self.conditions[condfield]:
|
||||
if condvalue in rulefieldvalues:
|
||||
targets.update(self.conditions[condfield][condvalue])
|
||||
if len(targets) == 0: # no matching condition, try with default mapping
|
||||
if self.default != None:
|
||||
targets = self.default
|
||||
|
||||
if len(targets) == 1: # result set contains only one target, return mapped item (like SimpleFieldMapping)
|
||||
return (targets.pop(), value)
|
||||
elif len(targets) > 1: # result set contains multiple targets, return all linked as OR condition (like MultiFieldMapping)
|
||||
cond = ConditionOR()
|
||||
for target in targets:
|
||||
cond.add((target, value))
|
||||
return cond
|
||||
else: # no mapping found
|
||||
return (key, value)
|
||||
|
||||
def resolve_fieldname(self, fieldname):
|
||||
if self.default != None:
|
||||
return self.default
|
||||
else:
|
||||
return fieldname
|
||||
|
||||
# Configuration
|
||||
class SigmaConfiguration:
|
||||
"""Sigma converter configuration. Contains field mappings and logsource descriptions"""
|
||||
@@ -338,96 +223,3 @@ class SigmaLogsourceConfiguration:
|
||||
|
||||
def __str__(self):
|
||||
return "[ LogSourceConfiguration: %s %s %s indices: %s ]" % (self.category, self.product, self.service, str(self.index))
|
||||
|
||||
class SigmaConfigParseError(Exception):
|
||||
pass
|
||||
|
||||
# Rule Filtering
|
||||
class SigmaRuleFilter:
|
||||
"""Filter for Sigma rules with conditions"""
|
||||
LEVELS = {
|
||||
"low" : 0,
|
||||
"medium" : 1,
|
||||
"high" : 2,
|
||||
"critical" : 3
|
||||
}
|
||||
STATES = ["experimental", "testing", "stable"]
|
||||
|
||||
def __init__(self, expr):
|
||||
self.minlevel = None
|
||||
self.maxlevel = None
|
||||
self.status = None
|
||||
self.logsources = list()
|
||||
|
||||
for cond in [c.replace(" ", "") for c in expr.split(",")]:
|
||||
if cond.startswith("level<="):
|
||||
try:
|
||||
level = cond[cond.index("=") + 1:]
|
||||
self.maxlevel = self.LEVELS[level]
|
||||
except KeyError as e:
|
||||
raise SigmaRuleFilterParseException("Unknown level '%s' in condition '%s'" % (level, cond)) from e
|
||||
elif cond.startswith("level>="):
|
||||
try:
|
||||
level = cond[cond.index("=") + 1:]
|
||||
self.minlevel = self.LEVELS[level]
|
||||
except KeyError as e:
|
||||
raise SigmaRuleFilterParseException("Unknown level '%s' in condition '%s'" % (level, cond)) from e
|
||||
elif cond.startswith("level="):
|
||||
try:
|
||||
level = cond[cond.index("=") + 1:]
|
||||
self.minlevel = self.LEVELS[level]
|
||||
self.maxlevel = self.minlevel
|
||||
except KeyError as e:
|
||||
raise SigmaRuleFilterParseException("Unknown level '%s' in condition '%s'" % (level, cond)) from e
|
||||
elif cond.startswith("status="):
|
||||
self.status = cond[cond.index("=") + 1:]
|
||||
if self.status not in self.STATES:
|
||||
raise SigmaRuleFilterParseException("Unknown status '%s' in condition '%s'" % (self.status, cond))
|
||||
elif cond.startswith("logsource="):
|
||||
self.logsources.append(cond[cond.index("=") + 1:])
|
||||
else:
|
||||
raise SigmaRuleFilterParseException("Unknown condition '%s'" % cond)
|
||||
|
||||
def match(self, yamldoc):
|
||||
"""Match filter conditions against rule"""
|
||||
# Levels
|
||||
if self.minlevel is not None or self.maxlevel is not None:
|
||||
try:
|
||||
level = self.LEVELS[yamldoc['level']]
|
||||
except KeyError: # missing or invalid level
|
||||
return False # User wants level restriction, but it's not possible here
|
||||
|
||||
# Minimum level
|
||||
if self.minlevel is not None:
|
||||
if level < self.minlevel:
|
||||
return False
|
||||
# Maximum level
|
||||
if self.maxlevel is not None:
|
||||
if level > self.maxlevel:
|
||||
return False
|
||||
|
||||
# Status
|
||||
if self.status is not None:
|
||||
try:
|
||||
status = yamldoc['status']
|
||||
except KeyError: # missing status
|
||||
return False # User wants status restriction, but it's not possible here
|
||||
if status != self.status:
|
||||
return False
|
||||
|
||||
# Log Sources
|
||||
if len(self.logsources) > 0:
|
||||
try:
|
||||
logsources = { value for key, value in yamldoc['logsource'].items() }
|
||||
except (KeyError, AttributeError): # no log source set
|
||||
return False # User wants status restriction, but it's not possible here
|
||||
|
||||
for logsrc in self.logsources:
|
||||
if logsrc not in logsources:
|
||||
return False
|
||||
|
||||
# all tests passed
|
||||
return True
|
||||
|
||||
class SigmaRuleFilterParseException(Exception):
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user