Files
blue-team-tools/tools/sigma/filter.py
T
2021-10-28 20:56:19 +02:00

250 lines
9.9 KiB
Python

# Sigma parser
# Copyright 2016-2018 Thomas Patzke, Florian Roth
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Rule Filtering
import datetime
class SigmaRuleFilter:
"""Filter for Sigma rules with conditions"""
LEVELS = {
"low" : 0,
"medium" : 1,
"high" : 2,
"critical" : 3
}
STATES = [
"unsupported",
"deprecated",
"experimental",
"test",
"stable"]
def __init__(self, expr):
self.minlevel = None
self.maxlevel = None
self.status = None
self.notstatus = None
self.tlp = None
self.target = None
self.logsources = list()
self.notlogsources = list()
self.tags = list()
self.nottags = list()
self.inlastday = None
self.condition = list()
self.notcondition = list()
for cond in [c.replace(" ", "") for c in expr.split(",")]:
if cond.startswith("level<="):
try:
level = cond[cond.index("=") + 1:]
self.maxlevel = self.LEVELS[level]
except KeyError as e:
raise SigmaRuleFilterParseException("Unknown level '%s' in condition '%s'" % (level, cond)) from e
elif cond.startswith("level>="):
try:
level = cond[cond.index("=") + 1:]
self.minlevel = self.LEVELS[level]
except KeyError as e:
raise SigmaRuleFilterParseException("Unknown level '%s' in condition '%s'" % (level, cond)) from e
elif cond.startswith("level="):
try:
level = cond[cond.index("=") + 1:]
self.minlevel = self.LEVELS[level]
self.maxlevel = self.minlevel
except KeyError as e:
raise SigmaRuleFilterParseException("Unknown level '%s' in condition '%s'" % (level, cond)) from e
elif cond.startswith("status="):
self.status = cond[cond.index("=") + 1:]
if self.status not in self.STATES:
raise SigmaRuleFilterParseException("Unknown status '%s' in condition '%s'" % (self.status, cond))
elif cond.startswith("status!="):
self.notstatus = cond[cond.index("=") + 1:]
if self.notstatus not in self.STATES:
raise SigmaRuleFilterParseException("Unknown status '%s' in condition '%s'" % (self.notstatus, cond))
elif cond.startswith("tlp="):
self.tlp = cond[cond.index("=") + 1:].upper() #tlp is always uppercase
elif cond.startswith("target="):
self.target = cond[cond.index("=") + 1:].lower() # lower to make caseinsensitive
elif cond.startswith("logsource="):
self.logsources.append(cond[cond.index("=") + 1:])
elif cond.startswith("logsource!="):
self.notlogsources.append(cond[cond.index("=") + 1:])
elif cond.startswith("tag="):
self.tags.append(cond[cond.index("=") + 1:].lower())
elif cond.startswith("tag!="):
self.nottags.append(cond[cond.index("=") + 1:].lower())
elif cond.startswith("condition="):
self.condition.append(cond[cond.index("=") + 1:].lower())
elif cond.startswith("condition!="):
self.notcondition.append(cond[cond.index("=") + 1:].lower())
elif cond.startswith("inlastday="):
nbday = cond[cond.index("=") + 1:]
try:
self.inlastday = int(nbday)
except ValueError as e:
raise SigmaRuleFilterParseException("Unknown number '%s' in condition '%s'" % (nbday, cond)) from e
else:
raise SigmaRuleFilterParseException("Unknown condition '%s'" % cond)
def match(self, yamldoc):
"""Match filter conditions against rule"""
# Levels
if self.minlevel is not None or self.maxlevel is not None:
try:
level = self.LEVELS[yamldoc['level']]
except KeyError: # missing or invalid level
return False # User wants level restriction, but it's not possible here
# Minimum level
if self.minlevel is not None:
if level < self.minlevel:
return False
# Maximum level
if self.maxlevel is not None:
if level > self.maxlevel:
return False
# Status
if self.status is not None:
try:
status = yamldoc['status']
except KeyError: # missing status
return False # User wants status restriction, but it's not possible here
if status != self.status:
return False
if self.notstatus is not None:
try:
status = yamldoc['status']
except KeyError: # missing status
return False # User wants status restriction, but it's not possible here
if status == self.notstatus:
return False
# Tlp
if self.tlp is not None:
try:
tlp = yamldoc['tlp']
except KeyError: # missing tlp
tlp = "WHITE" # tlp is WHITE by default
if tlp != self.tlp:
return False
#Target
if self.target:
try:
targets = [ target.lower() for target in yamldoc['target']]
except (KeyError, AttributeError): # no target set
return False
if self.target not in targets:
return False
# Log Sources
if self.logsources:
try:
logsources = { value for key, value in yamldoc['logsource'].items() }
except (KeyError, AttributeError): # no log source set
return False # User wants status restriction, but it's not possible here
for logsrc in self.logsources:
if logsrc not in logsources:
return False
# NOT Log Sources
if self.notlogsources:
try:
notlogsources = { value for key, value in yamldoc['logsource'].items() }
except (KeyError, AttributeError): # no log source set
return False # User wants status restriction, but it's not possible here
for logsrc in self.notlogsources:
if logsrc in notlogsources:
return False
# Tags
if self.tags:
try:
tags = [ tag.lower() for tag in yamldoc['tags']]
except (KeyError, AttributeError): # no tags set
return False
for tag in self.tags:
if tag not in tags:
return False
# NOT Tags
if self.nottags:
try:
nottags = [ tag.lower() for tag in yamldoc['tags']]
except (KeyError, AttributeError): # no tags set
return False
for tag in self.nottags:
if tag in nottags:
return False
# date in the last N days
if self.inlastday:
try:
date_str = yamldoc['date']
except KeyError: # missing date
return False # User wants date time restriction, but it's not possible here
try:
modified_str = yamldoc['modified']
except KeyError: # no update
modified_str = None
if modified_str:
date_str = modified_str
date_object = datetime.datetime.strptime(date_str, '%Y/%m/%d')
today_objet = datetime.datetime.now()
delta = today_objet - date_object
if delta.days > self.inlastday:
return False
if self.condition:
try:
conditions = yamldoc['detection']['condition']
if isinstance(conditions,list): # sone time conditions are list even with only 1 line
s_condition = ' '.join(conditions)
else:
s_condition = conditions
except KeyError: # missing condition
return False # User wants condition restriction, but it's not possible here
for val in self.condition:
if not val in s_condition:
return False
if self.notcondition:
try:
conditions = yamldoc['detection']['condition']
if isinstance(conditions,list): # sone time conditions are list even with only 1 line
s_condition = ' '.join(conditions)
else:
s_condition = conditions
except KeyError: # missing condition
return False # User wants condition restriction, but it's not possible here
for val in self.notcondition:
if val in s_condition:
return False
# all tests passed
return True
class SigmaRuleFilterParseException(Exception):
pass