2017-02-13 23:29:56 +01:00
# Sigma parser
2019-05-26 23:58:56 +02:00
# Copyright 2016-2019 Thomas Patzke, Florian Roth
2017-12-07 21:55:43 +01:00
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2017-02-13 23:29:56 +01:00
import re
2018-07-27 00:02:07 +02:00
from . exceptions import SigmaParseError
2021-06-12 23:09:22 +03:00
from . condition import SigmaConditionTokenizer , SigmaConditionParser , ConditionAND , ConditionOR , ConditionNULLValue , SigmaSearchValueAsIs
2019-05-26 23:58:56 +02:00
from . modifiers import apply_modifiers
2017-11-01 00:17:55 +01:00
2017-02-13 23:29:56 +01:00
class SigmaParser:
    """Parse a Sigma rule (definitions, conditions and aggregations).

    Constructing an instance parses the rule immediately and populates:

    * ``definitions`` -- entries of the rule's ``detection`` map, keyed by
      name, excluding the ``condition`` entry.
    * ``values`` -- for every key found in a definition map, the set of its
      values converted with ``str()``.
    * ``condtoken`` -- one ``SigmaConditionTokenizer`` per condition.
    * ``condparsed`` -- one ``SigmaConditionParser`` per tokenized condition.
    """

    def __init__(self, sigma, config):
        """Store the rule and the configuration, then parse the rule.

        :param sigma: the rule, indexed by key like a mapping (presumably the
            loaded YAML document -- confirm against callers).
        :param config: configuration object. This class calls its
            ``get_fieldmapping``, ``get_logsource``, ``get_logsourcemerging``
            and ``get_indexfield`` methods.
        :raises SigmaParseError: if the rule lacks a detection section or a
            condition.
        """
        self.definitions = dict()
        self.values = dict()
        self.config = config
        self.parsedyaml = sigma
        self.parse_sigma()

    def parse_sigma(self):
        """Collect the definitions, then tokenize and parse the conditions.

        :raises SigmaParseError: if the rule has no ``detection`` key, or the
            detection section has no ``condition`` key.
        """
        # Only the lookup itself can signal a missing detection section, so
        # it is the only statement guarded here.
        try:
            detection = self.parsedyaml["detection"]
        except KeyError:
            raise SigmaParseError("No detection definitions found")

        # Every entry except the condition is a named definition.
        for definitionName, definition in detection.items():
            if definitionName != "condition":
                self.definitions[definitionName] = definition
                self.extract_values(definition)     # builds key-values-table in self.values

        # Tokenization. A condition may be one string or a list of conditions;
        # any other type yields no tokenizers.
        # NOTE(review): tokenizer construction stays inside the try, so a
        # KeyError raised there is also reported as a missing condition.
        try:
            conditions = detection["condition"]
            self.condtoken = list()         # list of tokenized conditions
            if isinstance(conditions, str):
                self.condtoken.append(SigmaConditionTokenizer(conditions))
            elif isinstance(conditions, list):
                for condition in conditions:
                    self.condtoken.append(SigmaConditionTokenizer(condition))
        except KeyError:
            raise SigmaParseError("No condition found")

        # Filled incrementally: the condition parser receives this object and
        # may look at what has been parsed so far.
        self.condparsed = list()            # list of parsed conditions
        for tokens in self.condtoken:
            condparsed = SigmaConditionParser(self, tokens)
            self.condparsed.append(condparsed)

    def parse_definition_byname(self, definitionName, condOverride=None):
        """Parse the definition stored under ``definitionName``.

        :param definitionName: key into ``self.definitions``.
        :param condOverride: forwarded unchanged to ``parse_definition``.
        :returns: the condition node built by ``parse_definition``.
        :raises SigmaParseError: if no definition with that name exists.
        """
        try:
            definition = self.definitions[definitionName]
        except KeyError as e:
            raise SigmaParseError("Unknown definition '%s'" % definitionName) from e
        return self.parse_definition(definition, condOverride)

    def parse_definition(self, definition, condOverride=None):
        """Convert a definition (map or list) into a condition node.

        A list becomes an OR node (or ``condOverride()`` when given) holding
        its plain values and its recursively parsed nested maps/lists. A map
        becomes an AND node holding one resolved field mapping per key.

        :param definition: dict or list taken from the detection section.
        :param condOverride: optional callable producing the node used for a
            list definition; it is not used for map definitions.
        :returns: the condition node that was built.
        :raises SigmaParseError: if ``definition`` is neither map nor list, a
            list contains an unsupported element, or a key uses the ``cidr``
            modifier.
        """
        if not isinstance(definition, (dict, list)):
            raise SigmaParseError("Expected map or list, got type %s: '%s'" % (type(definition), str(definition)))

        if isinstance(definition, list):    # list of values or maps
            if condOverride:    # condition given through rule detection condition, e.g. 1 of x
                cond = condOverride()
            else:               # no condition given, use default from spec
                cond = ConditionOR()
            for value in definition:
                # bool is an int subclass; it keeps being rejected as a plain value.
                if isinstance(value, (str, int)) and not isinstance(value, bool):
                    cond.add(value)
                elif isinstance(value, (dict, list)):
                    cond.add(self.parse_definition(value))
                else:
                    raise SigmaParseError("Definition list may only contain plain values or maps")
            return cond

        # map
        cond = ConditionAND()
        for key, value in definition.items():
            if "|" in key:      # field name contains value modifier
                fieldname, *modifiers = key.split("|")
                if "cidr" in modifiers:     # Add other unsupported modifiers here
                    raise SigmaParseError("Cannot convert the rule. Unsupported new cidr modifier by SIGMAC. Please use the new PySigma/SigmaCLI to be able to convert the rule")
                value = apply_modifiers(value, modifiers)
            else:
                fieldname = key

            # The mapping is looked up by the bare field name, but resolve()
            # is handed the full key including any modifiers.
            mapping = self.config.get_fieldmapping(fieldname)
            if isinstance(value, (ConditionAND, ConditionOR)):  # value is condition node (by transformation modifier)
                value.items = [mapping.resolve(key, item, self) for item in value.items]
                cond.add(value)
            else:               # plain value or something unexpected (caught by backends)
                mapped = mapping.resolve(key, value, self)
                cond.add(mapped)
        return cond

    def extract_values(self, definition):
        """Extract all values from map key:value pairs into self.values.

        Lists are walked recursively; anything that is neither a list nor a
        map is ignored.
        """
        if isinstance(definition, list):        # iterate through items of list
            for item in definition:
                self.extract_values(item)
        elif isinstance(definition, dict):      # add dict items to map
            for key, value in definition.items():
                self.add_value(key, value)

    def add_value(self, key, value):
        """Add ``str(value)`` to the value set of ``key``, creating the set if needed."""
        self.values.setdefault(key, set()).add(str(value))

    def get_logsource(self):
        """Return the logsource configuration object for the current rule.

        :returns: ``None`` when the rule has no ``logsource`` key; otherwise
            the result of ``config.get_logsource(category, product, service)``
            where each missing selector is passed as ``None``.
        """
        try:
            ls_rule = self.parsedyaml['logsource']
        except KeyError:
            return None

        selectors = []
        for field in ('category', 'product', 'service'):
            try:
                selectors.append(ls_rule[field])
            except KeyError:
                selectors.append(None)
        return self.config.get_logsource(*selectors)

    def build_conditions(self, condition_func, items):
        """Build a condition node from logsource condition items.

        :param condition_func: callable returning the node to fill.
        :param items: iterable whose elements are either lists (built
            recursively and combined with ``ConditionAND``) or indexable
            pairs of field name (``item[0]``) and value (``item[1]``).
        :returns: the filled node.
        """
        cond = condition_func()
        for item in items:
            if isinstance(item, list):
                cond.add(self.build_conditions(ConditionAND, item))
            else:
                mapping = self.config.get_fieldmapping(item[0])
                cond.add(mapping.resolve(item[0], item[1], self))
        return cond

    def get_logsource_condition(self):
        """Return the condition derived from the rule's log source.

        :returns: ``None`` when no logsource object is available; otherwise
            an AND node holding the merged logsource conditions, the index
            restriction (if any) and the free-text search items (if any).
        """
        logsource = self.get_logsource()
        if logsource is None:
            return None

        cond = ConditionAND()
        # The configuration decides whether the conditions are ORed or ANDed.
        if self.config.get_logsourcemerging() == 'or':
            cond.add(self.build_conditions(ConditionOR, logsource.conditions))
        else:
            cond.add(self.build_conditions(ConditionAND, logsource.conditions))

        # Add index condition if supported by backend and defined in log source
        index_field = self.config.get_indexfield()
        indices = logsource.index
        if len(indices) > 0 and index_field is not None:    # at least one index given and backend knows about indices in conditions
            if len(indices) > 1:    # More than one index, search in all by ORing them together
                index_cond = ConditionOR()
                for index in indices:
                    index_cond.add((index_field, index))
                cond.add(index_cond)
            else:                   # only one index, add directly to AND from above
                cond.add((index_field, indices[0]))

        # Add free-text search condition, expressed in the configuration as 'search' field.
        if len(logsource.search) > 0:
            for item in logsource.search:
                cond.add(SigmaSearchValueAsIs(item))
        return cond