Conditional field mappings
This commit is contained in:
+70
-2
@@ -85,9 +85,9 @@ class SigmaParser:
|
||||
def add_value(self, key, value):
|
||||
"""Add value to values table, create key if it doesn't exist"""
|
||||
if key in self.values:
|
||||
self.values[key].append(value)
|
||||
self.values[key].add(str(value))
|
||||
else:
|
||||
self.values[key] = [ value ]
|
||||
self.values[key] = { str(value) }
|
||||
|
||||
def get_logsource(self):
|
||||
"""Returns logsource configuration object for current rule"""
|
||||
@@ -423,6 +423,8 @@ def FieldMapping(source, target=None):
|
||||
return SimpleFieldMapping(source, target)
|
||||
elif type(target) == list:
|
||||
return MultiFieldMapping(source, target)
|
||||
elif type(target) == dict:
|
||||
return ConditionalFieldMapping(source, target)
|
||||
|
||||
class SimpleFieldMapping:
|
||||
"""1:1 field mapping"""
|
||||
@@ -450,6 +452,72 @@ class MultiFieldMapping(SimpleFieldMapping):
|
||||
cond.add((fieldname, value))
|
||||
return cond
|
||||
|
||||
class ConditionalFieldMapping(SimpleFieldMapping):
|
||||
"""
|
||||
Conditional field mapping:
|
||||
* key contains field=value condition, value target mapping
|
||||
* key "default" maps when no condition matches
|
||||
* if no condition matches and there is no default, don't perform mapping
|
||||
"""
|
||||
target_type = dict
|
||||
|
||||
def __init__(self, source, target):
|
||||
"""Init table between condition field names and values"""
|
||||
super().__init__(source, target)
|
||||
self.conditions = dict() # condition field -> condition value -> target fields
|
||||
self.default = None
|
||||
for condition, target in self.target.items():
|
||||
try: # key contains condition (field=value)
|
||||
field, value = condition.split("=")
|
||||
self.add_condition(field, value, target)
|
||||
except ValueError as e: # no, condition - "default" expected
|
||||
if condition == "default":
|
||||
if self.default == None:
|
||||
if type(target) == str:
|
||||
self.default = [ target ]
|
||||
elif type(target) == list:
|
||||
self.default = target
|
||||
else:
|
||||
raise SigmaConfigParseError("Default mapping must be single value or list")
|
||||
else:
|
||||
raise SigmaConfigParseError("Conditional field mapping can have only one default value, use list for multiple target mappings")
|
||||
else:
|
||||
raise SigmaConfigParseError("Expected condition or default") from e
|
||||
|
||||
def add_condition(self, field, value, target):
|
||||
if field not in self.conditions:
|
||||
self.conditions[field] = dict()
|
||||
if value not in self.conditions[field]:
|
||||
self.conditions[field][value] = list()
|
||||
if type(target) == str:
|
||||
self.conditions[field][value].append(target)
|
||||
elif type(target) == list:
|
||||
self.conditions[field][value].extend(target)
|
||||
|
||||
def resolve(self, key, value, sigmaparser):
|
||||
# build list of matching target mappings
|
||||
targets = set()
|
||||
for condfield in self.conditions:
|
||||
if condfield in sigmaparser.values:
|
||||
rulefieldvalues = sigmaparser.values[condfield]
|
||||
for condvalue in self.conditions[condfield]:
|
||||
if condvalue in rulefieldvalues:
|
||||
print("found!")
|
||||
targets.update(self.conditions[condfield][condvalue])
|
||||
if len(targets) == 0: # no matching condition, try with default mapping
|
||||
if self.default != None:
|
||||
targets = self.default
|
||||
|
||||
if len(targets) == 1: # result set contains only one target, return mapped item (like SimpleFieldMapping)
|
||||
return (targets.pop(), value)
|
||||
elif len(targets) > 1: # result set contains multiple targets, return all linked as OR condition (like MultiFieldMapping)
|
||||
cond = ConditionOR()
|
||||
for target in targets:
|
||||
cond.add((target, value))
|
||||
return cond
|
||||
else: # no mapping found
|
||||
return (key, value)
|
||||
|
||||
# Configuration
|
||||
class SigmaConfiguration:
|
||||
"""Sigma converter configuration. Contains field mappings and logsource descriptions"""
|
||||
|
||||
Reference in New Issue
Block a user