2017-02-13 23:14:40 +01:00
#!/usr/bin/env python3
# A Sigma to SIEM converter
2017-12-07 21:55:43 +01:00
# Copyright 2016-2017 Thomas Patzke, Florian Roth
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2017-02-13 23:14:40 +01:00
import sys
import argparse
import yaml
import json
2017-03-06 09:36:10 +01:00
import pathlib
import itertools
2017-10-31 22:13:20 +01:00
import logging
2018-07-27 00:02:07 +02:00
from sigma . parser . collection import SigmaCollectionParser
from sigma . parser . exceptions import SigmaCollectionParseError , SigmaParseError
2018-07-27 23:54:18 +02:00
from sigma . configuration import SigmaConfiguration
from sigma . config . exceptions import SigmaConfigParseError , SigmaRuleFilterParseException
from sigma . filter import SigmaRuleFilter
2018-07-20 23:30:32 +02:00
import sigma . backends . discovery as backends
from sigma . backends . base import BackendOptions
from sigma . backends . exceptions import BackendError , NotSupportedError , PartialMatchError , FullMatchError
2017-12-13 00:12:56 +01:00
import codecs
# Replace sys.stdout with a codecs stream writer layered on the underlying
# binary stream (obtained via detach()), so that everything printed to stdout
# is encoded by that writer instead of the default text wrapper.
sys . stdout = codecs . getwriter ( ' utf-8 ' ) ( sys . stdout . detach ( ) )
2017-02-13 23:14:40 +01:00
2017-10-31 22:13:20 +01:00
# Module-level logger; its level is switched to DEBUG after argument parsing
# when the debug flag is set.
logger = logging . getLogger ( __name__ )
2017-02-13 23:14:40 +01:00
def print_verbose(*args, **kwargs):
    """Forward all arguments to print() when verbose or debug output is on.

    Consults the module-level ``cmdargs`` namespace and stays silent unless
    ``cmdargs.verbose`` or ``cmdargs.debug`` is truthy.
    """
    if not (cmdargs.verbose or cmdargs.debug):
        return
    print(*args, **kwargs)
def print_debug(*args, **kwargs):
    """Forward all arguments to print() only when debug output is on.

    Consults the module-level ``cmdargs`` namespace and stays silent unless
    ``cmdargs.debug`` is truthy.
    """
    if not cmdargs.debug:
        return
    print(*args, **kwargs)
2017-03-06 09:36:10 +01:00
def alliter(path):
    """Yield every non-directory entry below *path*, depth first.

    Directories are descended into recursively instead of being yielded.
    Entries whose name starts with the skip prefix are ignored, and skipped
    directories are not entered at all.
    """
    # Lazily drop the entries that carry the skip prefix.
    candidates = (
        entry for entry in path.iterdir()
        if not entry.name.startswith(" . ")
    )
    for entry in candidates:
        if entry.is_dir():
            yield from alliter(entry)
            continue
        yield entry
def get_inputs(paths, recursive):
    """Turn the command-line input arguments into a list of input objects.

    paths: list of path strings from the command line.
    recursive: when true, directories are expanded to the files below them
        (via alliter()).

    Returns ``[sys.stdin]`` when *paths* consists solely of the stdin marker,
    otherwise a list of ``pathlib.Path`` objects in command-line order.
    """
    if paths == [' - ']:
        return [sys.stdin]

    if not recursive:
        return [pathlib.Path(p) for p in paths]

    inputs = []
    for p in paths:
        candidate = pathlib.Path(p)
        if candidate.is_dir():
            inputs.extend(alliter(candidate))
        else:
            # Regular files and missing paths are passed through unchanged
            # instead of being handed to iterdir(), which would raise here,
            # outside of the caller's per-file error handling. A missing path
            # is then reported when the caller tries to open it.
            inputs.append(candidate)
    return inputs
2017-03-06 09:36:10 +01:00
2018-03-21 00:53:44 +01:00
class SigmacArgumentParser(argparse.ArgumentParser):
    """Argument parser whose help output also lists backend options."""

    def format_help(self):
        """Return the standard help text followed by a backend option listing.

        Every backend returned by ``backends.getBackendList()`` that declares
        at least one option contributes its identifier and one line per
        option (name, help text and default value).
        """
        pieces = [super().format_help() + " \n Backend options: \n "]
        for backend in backends.getBackendList():
            if len(backend.options) == 0:
                continue
            pieces.append(" " + backend.identifier + " \n ")
            # Each option is a 4-tuple; the last element is not displayed.
            for name, default, description, _ in backend.options:
                line = " {:10} : {} (default: {} ) ".format(name, description, default)
                pieces.append(line + " \n ")
        return "".join(pieces)
# Command-line interface definition. The parser is a SigmacArgumentParser, so
# its help output additionally lists the options of every backend. The parsed
# result is stored in the module-level name cmdargs.
argparser = SigmacArgumentParser ( description = " Convert Sigma rules into SIEM signatures. " )
2018-06-08 11:53:03 +02:00
# NOTE(review): this help text states that recursing into subdirectories is
# not implemented, but alliter() does descend into subdirectories - confirm
# and update the wording.
argparser . add_argument ( " --recurse " , " -r " , action = " store_true " , help = " Use directory as input (recurse into subdirectories is not implemented yet) " )
2017-11-02 00:02:15 +01:00
# Rule filter expression; it is handed to SigmaRuleFilter after parsing.
argparser . add_argument ( " --filter " , " -f " , help = """
Define comma-separated filters that must match (AND-linked) to rule to be processed.
2018-09-06 00:57:54 +02:00
Valid filters: level<=x, level>=x, level=x, status=y, logsource=z, tag=t.
2017-11-02 00:02:15 +01:00
x is one of: low, medium, high, critical.
y is one of: experimental, testing, stable.
z is a word appearing in an arbitrary log source attribute.
2018-09-06 00:57:54 +02:00
t is a tag that must appear in the rules tag list, case-insensitive matching.
2017-11-02 00:02:15 +01:00
Multiple log source specifications are AND linked.
""" )
2017-03-01 09:40:51 +01:00
# The permitted targets are the keys of the backend dictionary reported by
# the backend discovery module.
argparser . add_argument ( " --target " , " -t " , default = " es-qs " , choices = backends . getBackendDict ( ) . keys ( ) , help = " Output target format " )
2017-02-13 23:14:40 +01:00
argparser . add_argument ( " --target-list " , " -l " , action = " store_true " , help = " List available output target formats " )
2018-08-04 23:31:29 +02:00
argparser . add_argument ( " --config " , " -c " , help = " Configuration with field name and index mapping for target environment " )
argparser . add_argument ( " --output " , " -o " , default = None , help = " Output file or filename prefix if multiple files are generated " )
2017-09-22 00:28:35 +02:00
# The collected backend option values are passed to BackendOptions later on.
argparser . add_argument ( " --backend-option " , " -O " , action = " append " , help = " Options and switches that are passed to the backend " )
2017-08-02 00:56:22 +02:00
# The next two switches control how the conversion loop reacts to errors.
argparser . add_argument ( " --defer-abort " , " -d " , action = " store_true " , help = " Don ' t abort on parse or conversion errors, proceed with next rule. The exit code from the last error is returned " )
2018-06-07 23:32:52 +02:00
argparser . add_argument ( " --ignore-backend-errors " , " -I " , action = " store_true " , help = " Only return error codes for parse errors and ignore errors for rules that cause backend errors. Useful, when you want to get as much queries as possible. " )
2017-02-13 23:14:40 +01:00
# Output verbosity switches, read by print_verbose() and print_debug().
argparser . add_argument ( " --verbose " , " -v " , action = " store_true " , help = " Be verbose " )
2017-08-02 00:56:22 +02:00
argparser . add_argument ( " --debug " , " -D " , action = " store_true " , help = " Debugging output " )
2018-10-07 10:11:47 -05:00
# Positional inputs; an empty list is handled right after parsing.
argparser . add_argument ( " inputs " , nargs = " * " , help = " Sigma input files ( ' - ' for stdin) " )
2017-02-13 23:14:40 +01:00
# Parse the process arguments once; all later code reads options from cmdargs.
cmdargs = argparser . parse_args ( )
2017-10-31 23:06:18 +01:00
# Switch this module's logger to DEBUG when debugging output was requested.
if cmdargs.debug:
    logger.setLevel(logging.DEBUG)

# Target listing mode: print identifier and docstring of every known backend,
# then terminate successfully without converting anything.
if cmdargs.target_list:
    for available in backends.getBackendList():
        print(" %10s : %s " % (available.identifier, available.__doc__))
    sys.exit(0)

# Only reached when no target listing was requested: without any input there
# is nothing to convert, so show the usage line and terminate successfully.
if not cmdargs.inputs:
    print(" Nothing to do! ")
    argparser.print_usage()
    sys.exit(0)
2017-02-13 23:14:40 +01:00
2017-11-02 00:02:15 +01:00
# Build the optional rule filter; rulefilter stays None when no filter
# expression was given. An unparsable expression ends the program with
# exit code 9.
try:
    rulefilter = SigmaRuleFilter(cmdargs.filter) if cmdargs.filter else None
except SigmaRuleFilterParseException as parse_error:
    message = " Parse error in Sigma rule filter expression: %s " % str(parse_error)
    print(message, file=sys.stderr)
    sys.exit(9)
2017-03-06 22:07:04 +01:00
# Start with an empty configuration; a configuration file given on the
# command line replaces it. Each failure class ends the program with its own
# exit code: 5 = file cannot be opened, 6 = invalid YAML, 7 = invalid
# Sigma configuration.
sigmaconfig = SigmaConfiguration()
if cmdargs.config:
    conffile = cmdargs.config
    try:
        # The context manager releases the file handle as soon as the
        # configuration object has been built.
        # NOTE(review): assumes SigmaConfiguration consumes the stream inside
        # its constructor and does not keep reading from it later - confirm.
        with open(conffile) as f:
            sigmaconfig = SigmaConfiguration(f)
    except OSError as e:
        print(" Failed to open Sigma configuration file %s : %s " % (conffile, str(e)), file=sys.stderr)
        # sys.exit() rather than the interactive exit() helper, which is
        # injected by the site module and not guaranteed to exist.
        sys.exit(5)
    except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
        print(" Sigma configuration file %s is no valid YAML: %s " % (conffile, str(e)), file=sys.stderr)
        sys.exit(6)
    except SigmaConfigParseError as e:
        print(" Sigma configuration parse error in %s : %s " % (conffile, str(e)), file=sys.stderr)
        sys.exit(7)
2017-03-04 23:36:46 +01:00
2018-07-20 23:30:32 +02:00
# Instantiate the selected backend with the loaded configuration and the
# backend options collected from the command line.
backend_options = BackendOptions(cmdargs.backend_option)
backend = backends.getBackend(cmdargs.target)(sigmaconfig, backend_options)

# Conversion results go to the requested output file, or to standard output
# when no output file was given.
filename = cmdargs.output
if filename:
    try:
        out = open(filename, " w ", encoding=' utf-8 ')
    except OSError as e:
        # IOError is an alias of OSError in Python 3, so one class suffices.
        print(" Failed to open output file ' %s ' : %s " % (filename, str(e)), file=sys.stderr)
        # sys.exit() rather than the interactive exit() helper, which is
        # injected by the site module and not guaranteed to exist.
        sys.exit(1)
else:
    out = sys.stdout
2017-02-22 22:47:12 +01:00
2017-08-02 00:56:22 +02:00
def _register_failure(code, previous, ignorable=False):
    """Apply the abort policy after a failed input and return the exit code to keep.

    code: exit code associated with the failure that just happened.
    previous: exit code remembered from earlier inputs.
    ignorable: true for backend-side failures, which are dropped entirely
        when cmdargs.ignore_backend_errors is set.

    Returns *previous* for an ignored failure, otherwise *code*. Unless
    cmdargs.defer_abort is set, a failure that is not ignored terminates the
    program immediately with *code*.
    """
    if ignorable and cmdargs.ignore_backend_errors:
        return previous
    if not cmdargs.defer_abort:
        sys.exit(code)
    return code


# Exit code of the whole run; overwritten by the most recent recorded failure.
error = 0
for sigmafile in get_inputs(cmdargs.inputs, cmdargs.recurse):
    print_verbose(" * Processing Sigma input %s " % (sigmafile))
    # Reset per input so that the cleanup below never touches a handle left
    # over from an earlier iteration or an undefined name.
    f = None
    try:
        if cmdargs.inputs == [' - ']:
            # In stdin mode get_inputs() already returned the stream itself.
            f = sigmafile
        else:
            f = sigmafile.open(encoding=' utf-8 ')
        parser = SigmaCollectionParser(f, sigmaconfig, rulefilter)
        results = parser.generate(backend)
        for result in results:
            print(result, file=out)
    # NOTE(review): handlers are tried in order; if NotSupportedError,
    # PartialMatchError or FullMatchError derive from an exception class
    # handled earlier in this chain, their own handler is never reached -
    # confirm against the exception hierarchy.
    except OSError as e:
        # An unreadable input is recorded but never aborts the run.
        print(" Failed to open Sigma file %s : %s " % (sigmafile, str(e)), file=sys.stderr)
        error = 5
    except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
        print(" Sigma file %s is no valid YAML: %s " % (sigmafile, str(e)), file=sys.stderr)
        error = _register_failure(3, error)
    except (SigmaParseError, SigmaCollectionParseError) as e:
        print(" Sigma parse error in %s : %s " % (sigmafile, str(e)), file=sys.stderr)
        error = _register_failure(4, error)
    except NotSupportedError as e:
        print(" The Sigma rule requires a feature that is not supported by the target system: " + str(e), file=sys.stderr)
        error = _register_failure(9, error, ignorable=True)
    except BackendError as e:
        print(" Backend error in %s : %s " % (sigmafile, str(e)), file=sys.stderr)
        error = _register_failure(8, error, ignorable=True)
    except NotImplementedError as e:
        print(" An unsupported feature is required for this Sigma rule ( %s ): " % (sigmafile) + str(e), file=sys.stderr)
        print(" Feel free to contribute for fun and fame, this is open source :) -> https://github.com/Neo23x0/sigma ", file=sys.stderr)
        error = _register_failure(42, error, ignorable=True)
    except PartialMatchError as e:
        print(" Partial field match error: %s " % str(e), file=sys.stderr)
        error = _register_failure(80, error, ignorable=True)
    except FullMatchError:
        print(" Full field match error ", file=sys.stderr)
        error = _register_failure(90, error, ignorable=True)
    finally:
        # Runs on normal completion, on handled errors and while sys.exit()
        # is unwinding. Closing stays best effort: a failing close() on an
        # input that was only read is not actionable.
        if f is not None:
            try:
                f.close()
            except OSError:
                pass

# Give the backend the chance to emit output that spans all processed rules.
result = backend.finalize()
if result:
    print(result, file=out)
out.close()

sys.exit(error)