# blue-team-tools/tools/sigma/parser/rule.py
# Sigma parser
# Copyright 2016-2019 Thomas Patzke, Florian Roth

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re

from .exceptions import SigmaParseError
from .condition import SigmaConditionTokenizer, SigmaConditionParser, ConditionAND, ConditionOR, ConditionNULLValue
from .modifiers import apply_modifiers

class SigmaParser:
    """Parse a Sigma rule (definitions, conditions and aggregations)."""

    def __init__(self, sigma, config):
        """Parse the given rule immediately on construction.

        :param sigma: parsed YAML document (dict) of one Sigma rule
        :param config: backend configuration object providing field mappings
            and log source resolution
        :raises SigmaParseError: if the rule has no detection or no condition
        """
        self.definitions = dict()   # detection definitions by name
        self.values = dict()        # field name -> set of all values seen for it
        self.config = config
        self.parsedyaml = sigma
        self.parse_sigma()

    def parse_sigma(self):
        """Collect detection definitions and tokenize/parse all conditions.

        :raises SigmaParseError: if the rule lacks a detection section or
            the detection section has no condition
        """
        try:    # definition uniqueness check
            for definitionName, definition in self.parsedyaml["detection"].items():
                if definitionName != "condition":
                    self.definitions[definitionName] = definition
                    self.extract_values(definition)     # builds key-values-table in self.values
        except KeyError:
            raise SigmaParseError("No detection definitions found")

        try:    # tokenization
            conditions = self.parsedyaml["detection"]["condition"]
            self.condtoken = list()     # list of tokenized conditions
            # A condition may be a single string or a list of strings; each
            # string is tokenized separately.  Exact-type checks are kept
            # deliberately (YAML may yield other scalar types here, which are
            # silently skipped, matching the original behavior).
            if type(conditions) == str:
                self.condtoken.append(SigmaConditionTokenizer(conditions))
            elif type(conditions) == list:
                for condition in conditions:
                    self.condtoken.append(SigmaConditionTokenizer(condition))
        except KeyError:
            raise SigmaParseError("No condition found")

        self.condparsed = list()    # list of parsed conditions
        for tokens in self.condtoken:
            self.condparsed.append(SigmaConditionParser(self, tokens))

    def parse_definition_byname(self, definitionName, condOverride=None):
        """Parse the detection definition with the given name.

        :raises SigmaParseError: (chained from KeyError) for unknown names
        """
        try:
            definition = self.definitions[definitionName]
        except KeyError as e:
            raise SigmaParseError("Unknown definition '%s'" % definitionName) from e
        return self.parse_definition(definition, condOverride)

    def parse_definition(self, definition, condOverride=None):
        """Convert a detection definition (map or list) into a condition tree.

        :param definition: a list of values/maps (OR-linked by default) or a
            map of field: value pairs (AND-linked)
        :param condOverride: condition node class to instantiate instead of
            the default OR for lists (e.g. set by "1 of x"-style conditions)
        :return: ConditionAND/ConditionOR tree of (field-mapped) items
        :raises SigmaParseError: on unsupported definition/list item types
        """
        if type(definition) not in (dict, list):
            raise SigmaParseError("Expected map or list, got type %s: '%s'" % (type(definition), str(definition)))

        if type(definition) == list:    # list of values or maps
            if condOverride:    # condition given through rule detection condition, e.g. 1 of x
                cond = condOverride()
            else:               # no condition given, use default from spec
                cond = ConditionOR()
            for value in definition:
                # NOTE(review): exact-type checks (not isinstance) kept from
                # the original — e.g. bool values are rejected, not treated
                # as int.  Presumably intentional; confirm before changing.
                if type(value) in (str, int):
                    cond.add(value)
                elif type(value) in (dict, list):
                    cond.add(self.parse_definition(value))
                else:
                    raise SigmaParseError("Definition list may only contain plain values or maps")
        elif type(definition) == dict:  # map
            cond = ConditionAND()
            for key, value in definition.items():
                if "|" in key:  # field name contains value modifier(s), e.g. field|contains
                    fieldname, *modifiers = key.split("|")
                    value = apply_modifiers(value, modifiers)
                else:
                    fieldname = key
                mapping = self.config.get_fieldmapping(fieldname)
                if isinstance(value, (ConditionAND, ConditionOR)):  # value is condition node (by transformation modifier)
                    value.items = [ mapping.resolve(key, item, self) for item in value.items ]
                    cond.add(value)
                else:   # plain value or something unexpected (caught by backends)
                    cond.add(mapping.resolve(key, value, self))
        return cond

    def extract_values(self, definition):
        """Extract all values from map key:value pairs into self.values."""
        if type(definition) == list:        # iterate through items of list
            for item in definition:
                self.extract_values(item)
        elif type(definition) == dict:      # add dict items to map
            for key, value in definition.items():
                self.add_value(key, value)

    def add_value(self, key, value):
        """Add value to values table, create key if it doesn't exist."""
        # Values are normalized to strings; setdefault creates the set on
        # first use of a key.
        self.values.setdefault(key, set()).add(str(value))

    def get_logsource(self):
        """Return the log source configuration object for the current rule.

        :return: result of config.get_logsource(), or None if the rule has
            no logsource section
        """
        try:
            ls_rule = self.parsedyaml['logsource']
        except KeyError:
            return None
        # Missing attributes default to None (treated as a wildcard by the
        # configuration lookup).
        category = ls_rule.get('category')
        product = ls_rule.get('product')
        service = ls_rule.get('service')
        return self.config.get_logsource(category, product, service)

    def get_logsource_condition(self):
        """Build an AND-linked condition tree from the rule's log source.

        :return: ConditionAND of field-mapped logsource condition items
            (plus an index condition if the backend supports indices), or
            None if the rule has no logsource section
        """
        logsource = self.get_logsource()
        if logsource is None:
            return None

        if logsource.merged:    # Merged log source, flatten nested list of condition items
            kvconds = [ item for sublscond in logsource.conditions for item in sublscond ]
        else:                   # Simple log sources already contain flat list of conditions items
            kvconds = logsource.conditions

        # Apply field mappings and AND-link all condition items.
        cond = ConditionAND()
        for field, value in kvconds:
            mapping = self.config.get_fieldmapping(field)
            cond.add(mapping.resolve(field, value, self))

        # Add index condition if supported by backend and defined in log source.
        index_field = self.config.get_indexfield()
        indices = logsource.index
        if len(indices) > 0 and index_field is not None:    # at least one index given and backend knows about indices in conditions
            if len(indices) > 1:    # More than one index, search in all by ORing them together
                index_cond = ConditionOR()
                for index in indices:
                    index_cond.add((index_field, index))
                cond.add(index_cond)
            else:   # only one index, add directly to AND from above
                cond.add((index_field, indices[0]))
        return cond