diff --git a/.github/workflows/sigma-test.yml b/.github/workflows/sigma-test.yml
index b6e10159e..ee0c317a5 100644
--- a/.github/workflows/sigma-test.yml
+++ b/.github/workflows/sigma-test.yml
@@ -35,3 +35,6 @@ jobs:
- name: Test Generated Elasticsearch Query Strings
run: |
make test-backend-es-qs
+ - name: Test SQL(ite) Backend
+ run: |
+ make test-backend-sql
diff --git a/Makefile b/Makefile
index 1d36cd903..e4968975a 100644
--- a/Makefile
+++ b/Makefile
@@ -58,6 +58,7 @@ test-sigmac:
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t humio -O rulecomment -c tools/config/humio.yml rules/ > /dev/null
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t crowdstrike -O rulecomment -c tools/config/crowdstrike.yml rules/ > /dev/null
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t sql -c sysmon rules/ > /dev/null
+ $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t sqlite -c sysmon rules/ > /dev/null
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t logiq -c sysmon rules/ > /dev/null
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t splunk -c tools/config/splunk-windows-index.yml -f 'level>=high,level<=critical,status=stable,logsource=windows,tag=attack.execution' rules/ > /dev/null
! $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t splunk -c tools/config/splunk-windows-index.yml -f 'level>=high,level<=critical,status=xstable,logsource=windows' rules/ > /dev/null
@@ -107,6 +108,10 @@ test-merge:
test-backend-es-qs:
tests/test-backend-es-qs.py
+test-backend-sql:
+ cd tools && python3 setup.py install
+ cd tools && $(COVERAGE) run -m pytest tests/test_backend_sql.py tests/test_backend_sqlite.py
+
test-sigma2attack:
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigma2attack
diff --git a/tools/sigma/backends/sql.py b/tools/sigma/backends/sql.py
index b3149c01d..5b446a6f3 100644
--- a/tools/sigma/backends/sql.py
+++ b/tools/sigma/backends/sql.py
@@ -1,5 +1,6 @@
# Output backends for sigmac
# Copyright 2019 Jayden Zheng
+# Copyright 2020 Jonas Hagg
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
@@ -16,7 +17,9 @@
import re
import sigma
-from .base import SingleTextQueryBackend
+from sigma.backends.base import SingleTextQueryBackend
+from sigma.parser.condition import SigmaAggregationParser, NodeSubexpression, ConditionAND, ConditionOR, ConditionNOT
+from sigma.parser.exceptions import SigmaParseError
class SQLBackend(SingleTextQueryBackend):
"""Converts Sigma rule into SQL query"""
@@ -34,12 +37,16 @@ class SQLBackend(SingleTextQueryBackend):
notNullExpression = "%s=*" # Expression of queries for not null values. %s is field name
mapExpression = "%s = %s" # Syntax for field/value conditions. First %s is fieldname, second is value
mapMulti = "%s IN %s" # Syntax for field/value conditions. First %s is fieldname, second is value
- mapWildcard = "%s LIKE %s" # Syntax for swapping wildcard conditions.
+ mapWildcard = "%s LIKE %s ESCAPE \'\\\'"# Syntax for swapping wildcard conditions: Adding \ as escape character
mapSource = "%s=%s" # Syntax for sourcetype
mapListsSpecialHandling = False # Same handling for map items with list values as for normal values (strings, integers) if True, generateMapItemListNode method is called with node
mapListValueExpression = "%s OR %s" # Syntax for field/value condititons where map value is a list
mapLength = "(%s %s)"
+ def __init__(self, sigmaconfig, table):
+ super().__init__(sigmaconfig)
+ self.table = table
+
def generateANDNode(self, node):
generated = [ self.generateNode(val) for val in node ]
filtered = [ g for g in generated if g is not None ]
@@ -78,29 +85,32 @@ class SQLBackend(SingleTextQueryBackend):
def generateMapItemNode(self, node):
fieldname, value = node
transformed_fieldname = self.fieldNameMapping(fieldname, value)
- if "," in self.generateNode(value) and "%" not in self.generateNode(value):
+
+ has_wildcard = re.search(r"((\\(\*|\?|\\))|\*|\?|_|%)", self.generateNode(value))
+
+ if "," in self.generateNode(value) and not has_wildcard:
return self.mapMulti % (transformed_fieldname, self.generateNode(value))
elif "LENGTH" in transformed_fieldname:
return self.mapLength % (transformed_fieldname, value)
elif type(value) == list:
return self.generateMapItemListNode(transformed_fieldname, value)
elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int):
- if "%" in self.generateNode(value):
+ if has_wildcard:
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
else:
return self.mapExpression % (transformed_fieldname, self.generateNode(value))
elif "sourcetype" in transformed_fieldname:
return self.mapSource % (transformed_fieldname, self.generateNode(value))
- elif "*" in str(value):
+ elif has_wildcard:
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
else:
raise TypeError("Backend does not support map values of type " + str(type(value)))
def generateMapItemListNode(self, key, value):
- return "(" + (" OR ".join(['%s LIKE %s' % (key, self.generateValueNode(item)) for item in value])) + ")"
-
+ return "(" + (" OR ".join([self.mapWildcard % (key, self.generateValueNode(item)) for item in value])) + ")"
+
def generateValueNode(self, node):
- return self.valueExpression % (self.cleanValue(str(node)))
+ return self.valueExpression % (self.cleanValue(str(node)))
def generateNULLValueNode(self, node):
return self.nullExpression % (node.item)
@@ -117,10 +127,97 @@ class SQLBackend(SingleTextQueryBackend):
return fieldname
def cleanValue(self, val):
- if "*" == val:
- pass
- elif "*.*.*" in val:
- val = val.replace("*.*.*", "%")
- elif re.search(r'\*', val):
- val = re.sub(r'\*', '%', val)
+ if not isinstance(val, str):
+ return str(val)
+
+ #Single backlashes which are not in front of * or ? are doulbed
+ val = re.sub(r"(? full text search
+ #False: no subexpression found, where a full text search is needed
+
+ def _evaluateCondition(condition):
+ #Helper function to evaulate condtions
+ if type(condition) not in [ConditionAND, ConditionOR, ConditionNOT]:
+ raise NotImplementedError("Error in recursive Search logic")
+
+ results = []
+ for elem in condition.items:
+ if isinstance(elem, NodeSubexpression):
+ results.append(self._recursiveFtsSearch(elem))
+ if isinstance(elem, ConditionNOT):
+ results.append(_evaluateCondition(elem))
+ if isinstance(elem, tuple):
+ results.append(False)
+ if type(elem) in (str, int, list):
+ return True
+ return any(results)
+
+ if type(subexpression) in [str, int, list]:
+ return True
+ elif type(subexpression) in [tuple]:
+ return False
+
+ if not isinstance(subexpression, NodeSubexpression):
+ raise NotImplementedError("Error in recursive Search logic")
+
+ if isinstance(subexpression.items, NodeSubexpression):
+ return self._recursiveFtsSearch(subexpression.items)
+ elif type(subexpression.items) in [ConditionAND, ConditionOR, ConditionNOT]:
+ return _evaluateCondition(subexpression.items)
\ No newline at end of file
diff --git a/tools/sigma/backends/sqlite.py b/tools/sigma/backends/sqlite.py
new file mode 100644
index 000000000..8eec13ea7
--- /dev/null
+++ b/tools/sigma/backends/sqlite.py
@@ -0,0 +1,123 @@
+# Output backends for sigmac
+# Copyright 2020 Jonas Hagg
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see .
+
+from sigma.backends.sql import SQLBackend
+from sigma.parser.condition import NodeSubexpression, ConditionAND, ConditionOR, ConditionNOT
+import re
+
+
+class SQLiteBackend(SQLBackend):
+ """Converts Sigma rule into SQL query for SQLite"""
+ identifier = "sqlite"
+ active = True
+
+ mapFullTextSearch = "%s MATCH ('\"%s\"')"
+
+ def __init__(self, sigmaconfig, table):
+ super().__init__(sigmaconfig, table)
+ self.mappingItem = False
+
+ def requireFTS(self, node):
+ return (not self.mappingItem and
+ (type(node) in (int, str) or all(isinstance(val, str) for val in node) or all(isinstance(val, int) for val in node)))
+
+ def generateFTS(self, value):
+ if re.search(r"((\\(\*|\?|\\))|\*|\?|_|%)", value):
+ raise NotImplementedError(
+ "Wildcards in SQlite Full Text Search not implemented")
+ self.countFTS += 1
+ return self.mapFullTextSearch % (self.table, value)
+
+ def generateANDNode(self, node):
+
+ if self.requireFTS(node):
+ fts = str('"' + self.andToken + '"').join(self.cleanValue(val)
+ for val in node)
+ return self.generateFTS(fts)
+
+ generated = [self.generateNode(val) for val in node]
+ filtered = [g for g in generated if g is not None]
+ if filtered:
+ return self.andToken.join(filtered)
+ else:
+ return None
+
+ def generateORNode(self, node):
+
+ if self.requireFTS(node):
+ fts = str('"' + self.orToken + '"').join(self.cleanValue(val)
+ for val in node)
+ return self.generateFTS(fts)
+
+ generated = [self.generateNode(val) for val in node]
+ filtered = [g for g in generated if g is not None]
+ if filtered:
+ return self.orToken.join(filtered)
+ else:
+ return None
+
+ def generateMapItemNode(self, node):
+ try:
+ self.mappingItem = True
+ fieldname, value = node
+ transformed_fieldname = self.fieldNameMapping(fieldname, value)
+
+ has_wildcard = re.search(
+ r"((\\(\*|\?|\\))|\*|\?|_|%)", self.generateNode(value))
+
+ if "," in self.generateNode(value) and not has_wildcard:
+ return self.mapMulti % (transformed_fieldname, self.generateNode(value))
+ elif "LENGTH" in transformed_fieldname:
+ return self.mapLength % (transformed_fieldname, value)
+ elif type(value) == list:
+ return self.generateMapItemListNode(transformed_fieldname, value)
+ elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int):
+
+ if has_wildcard:
+ return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
+ else:
+ return self.mapExpression % (transformed_fieldname, self.generateNode(value))
+
+ elif "sourcetype" in transformed_fieldname:
+ return self.mapSource % (transformed_fieldname, self.generateNode(value))
+ elif has_wildcard:
+ return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
+ else:
+ raise TypeError(
+ "Backend does not support map values of type " + str(type(value)))
+ finally:
+ self.mappingItem = False
+
+ def generateValueNode(self, node):
+ if self.mappingItem:
+ return self.valueExpression % (self.cleanValue(str(node)))
+ else:
+ return self.generateFTS(self.cleanValue(str(node)))
+
+ def generateQuery(self, parsed):
+ self.countFTS = 0
+ result = self.generateNode(parsed.parsedSearch)
+ if self.countFTS > 1:
+ raise NotImplementedError(
+ "Match operator ({}) is allowed only once in SQLite, parse rule in a different way:\n{}".format(self.countFTS, result))
+ self.countFTS = 0
+
+ if parsed.parsedAgg:
+ # Handle aggregation
+ fro, whe = self.generateAggregation(parsed.parsedAgg, result)
+ return "SELECT * FROM {} WHERE {}".format(fro, whe)
+
+ return "SELECT * FROM {} WHERE {}".format(self.table, result)
diff --git a/tools/tests/test_backend_sql.py b/tools/tests/test_backend_sql.py
new file mode 100644
index 000000000..b4bd82026
--- /dev/null
+++ b/tools/tests/test_backend_sql.py
@@ -0,0 +1,334 @@
+# Test output backends for sigmac
+# Copyright 2020 Jonas Hagg
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see .
+
+import unittest
+from unittest.mock import patch
+
+from sigma.backends.sql import SQLBackend
+
+from sigma.parser.collection import SigmaCollectionParser
+from sigma.config.mapping import FieldMapping
+from sigma.configuration import SigmaConfiguration
+
+class TestGenerateQuery(unittest.TestCase):
+
+ def setUp(self):
+ self.basic_rule = {"title": "Test", "level": "testing"}
+ self.table = "eventlog"
+
+ def test_regular_queries(self):
+ # Test regular queries
+ detection = {"selection": {"fieldname": "test1"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname = "test1"'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": 4}, "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname = "4"'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": [
+ "test1", "test2"]}, "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname IN ("test1", "test2")'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {
+ "fieldname": [3, 4]}, "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname IN ("3", "4")'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname1": "test1", "fieldname2": [
+ "test2", "test3"]}, "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" AND fieldname2 IN ("test2", "test3"))'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": "test1"}, "filter": {
+ "fieldname2": "whatever"}, "condition": "selection and filter"}
+ expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND fieldname2 = "whatever")'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": "test1"}, "filter": {
+ "fieldname2": "whatever"}, "condition": "selection or filter"}
+ expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" OR fieldname2 = "whatever")'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": "test1"}, "filter": {
+ "fieldname2": "whatever"}, "condition": "selection and not filter"}
+ expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND NOT (fieldname2 = "whatever"))'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname1": "test1"}, "filter": {
+ "fieldname2": "test2"}, "condition": "1 of them"}
+ expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" OR fieldname2 = "test2")'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname1": "test1"}, "filter": {
+ "fieldname2": "test2"}, "condition": "all of them"}
+ expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" AND fieldname2 = "test2")'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ def test_modifiers(self):
+
+ # contains
+ detection = {"selection": {"fieldname|contains": "test"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "%test%" ESCAPE \'\\\''.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ # all
+ detection = {"selection": {"fieldname|all": [
+ "test1", "test2"]}, "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND fieldname = "test2")'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ # endswith
+ detection = {"selection": {"fieldname|endswith": "test"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "%test" ESCAPE \'\\\''.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ # startswith
+ detection = {"selection": {"fieldname|startswith": "test"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "test%" ESCAPE \'\\\''.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ def test_aggregations(self):
+
+ # count
+ detection = {"selection": {"fieldname": "test"},
+ "condition": "selection | count() > 5"}
+ inner_query = 'SELECT count(*) AS agg FROM {} WHERE fieldname = "test"'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ # min
+ detection = {"selection": {"fieldname1": "test"},
+ "condition": "selection | min(fieldname2) > 5"}
+ inner_query = 'SELECT min(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ # max
+ detection = {"selection": {"fieldname1": "test"},
+ "condition": "selection | max(fieldname2) > 5"}
+ inner_query = 'SELECT max(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ # avg
+ detection = {"selection": {"fieldname1": "test"},
+ "condition": "selection | avg(fieldname2) > 5"}
+ inner_query = 'SELECT avg(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ # sum
+ detection = {"selection": {"fieldname1": "test"},
+ "condition": "selection | sum(fieldname2) > 5"}
+ inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ # <
+ detection = {"selection": {"fieldname1": "test"},
+ "condition": "selection | sum(fieldname2) < 5"}
+ inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg < 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ # ==
+ detection = {"selection": {"fieldname1": "test"},
+ "condition": "selection | sum(fieldname2) == 5"}
+ inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ # group by
+ detection = {"selection": {"fieldname1": "test"},
+ "condition": "selection | sum(fieldname2) by fieldname3 == 5"}
+ inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test" GROUP BY fieldname3'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ # multiple conditions
+ detection = {"selection": {"fieldname1": "test"}, "filter": {
+ "fieldname2": "tessst"}, "condition": "selection OR filter | sum(fieldname2) == 5"}
+ inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE (fieldname1 = "test" OR fieldname2 = "tessst")'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ def test_wildcards(self):
+
+ # wildcard: *
+ detection = {"selection": {"fieldname": "test*"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
+ self.table) + r'"test%"' + r" ESCAPE '\'"
+ self.validate(detection, expected_result)
+
+ # wildcard: ?
+ detection = {"selection": {"fieldname": "test?"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
+ self.table) + r'"test_"' + r" ESCAPE '\'"
+ self.validate(detection, expected_result)
+
+ # escaping:
+ detection = {"selection": {"fieldname": r"test\?"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
+ self.table) + r'"test\?"' + r" ESCAPE '\'"
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": r"test\\*"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
+ self.table) + r'"test\\%"' + r" ESCAPE '\'"
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": r"test\*"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
+ self.table) + r'"test\*"' + r" ESCAPE '\'"
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": r"test\\"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
+ self.table) + r'"test\\"' + r" ESCAPE '\'"
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": r"test\abc"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
+ self.table) + r'"test\\abc"' + r" ESCAPE '\'"
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": r"test%"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
+ self.table) + r'"test\%"' + r" ESCAPE '\'"
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname": r"test_"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
+ self.table) + r'"test\_"' + r" ESCAPE '\'"
+ self.validate(detection, expected_result)
+
+ # multiple options
+ detection = {"selection": {"fieldname": [
+ "test*", "*test"]}, "condition": "selection"}
+ opt1 = 'fieldname LIKE ' + r'"test%"' + r" ESCAPE '\'"
+ opt2 = 'fieldname LIKE ' + r'"%test"' + r" ESCAPE '\'"
+ expected_result = 'SELECT * FROM {} WHERE ({} OR {})'.format(
+ self.table, opt1, opt2)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": {"fieldname|all": [
+ "test*", "*test"]}, "condition": "selection"}
+ opt1 = 'fieldname LIKE ' + r'"test%"' + r" ESCAPE '\'"
+ opt2 = 'fieldname LIKE ' + r'"%test"' + r" ESCAPE '\'"
+ expected_result = 'SELECT * FROM {} WHERE ({} AND {})'.format(
+ self.table, opt1, opt2)
+ self.validate(detection, expected_result)
+
+ def test_fieldname_mapping(self):
+ detection = {"selection": {"fieldname": "test1"},
+ "condition": "selection"}
+ expected_result = 'SELECT * FROM {} WHERE mapped_fieldname = "test1"'.format(
+ self.table)
+
+ # configure mapping
+ config = SigmaConfiguration()
+ config.fieldmappings["fieldname"] = FieldMapping(
+ "fieldname", "mapped_fieldname")
+
+ self.basic_rule["detection"] = detection
+
+ with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
+ parser = SigmaCollectionParser("any sigma io", config, None)
+ backend = SQLBackend(config, self.table)
+
+ assert len(parser.parsers) == 1
+
+ for p in parser.parsers:
+ self.assertEqual(expected_result, backend.generate(p))
+
+ def test_not_implemented(self):
+ # near aggregation not implemented
+ detection = {"selection": {"fieldname": "test"}, "filter": {
+ "fieldname": "test2"}, "condition": "selection | near selection and filter"}
+ expected_result = NotImplementedError()
+ self.validate(detection, expected_result)
+
+ # re modifier is not implemented
+ detection = {"selection": {"fieldname|re": "test"},
+ "condition": "selection"}
+ expected_result = NotImplementedError()
+ self.validate(detection, expected_result)
+
+ #Full Text Search is not implemented
+ detection = {"selection": ["test1"], "condition": "selection"}
+ expected_result = NotImplementedError()
+ self.validate(detection, expected_result)
+
+
+ def validate(self, detection, expectation):
+
+ config = SigmaConfiguration()
+
+ self.basic_rule["detection"] = detection
+
+ with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
+ parser = SigmaCollectionParser("any sigma io", config, None)
+ backend = SQLBackend(config, self.table)
+
+ assert len(parser.parsers) == 1
+
+ for p in parser.parsers:
+ if isinstance(expectation, str):
+ self.assertEqual(expectation, backend.generate(p))
+ elif isinstance(expectation, Exception):
+ self.assertRaises(type(expectation), backend.generate, p)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tools/tests/test_backend_sqlite.py b/tools/tests/test_backend_sqlite.py
new file mode 100644
index 000000000..66fc68123
--- /dev/null
+++ b/tools/tests/test_backend_sqlite.py
@@ -0,0 +1,148 @@
+# Test output backends for sigmac
+# Copyright 2020 Jonas Hagg
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see .
+
+import unittest
+from unittest.mock import patch
+
+from sigma.backends.sqlite import SQLiteBackend
+
+from sigma.parser.collection import SigmaCollectionParser
+from sigma.config.mapping import FieldMapping
+from sigma.configuration import SigmaConfiguration
+
+class TestFullTextSearch(unittest.TestCase):
+
+ def setUp(self):
+ self.basic_rule = {"title": "Test", "level": "testing"}
+ self.table = "eventlog"
+
+ def test_full_text_search(self):
+ detection = {"selection": ["test1"], "condition": "selection"}
+ expected_result = 'SELECT * FROM {0} WHERE {0} MATCH (\'"test1"\')'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": [5], "condition": "selection"}
+ expected_result = 'SELECT * FROM {0} WHERE {0} MATCH (\'"5"\')'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": ["test1", "test2"], "condition": "selection"}
+ expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": ["test1"], "filter":["test2"], "condition": "selection and filter"}
+ expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" AND "test2"\'))'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": [5, 6], "condition": "selection"}
+ expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"5" OR "6"\'))'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": ["test1"], "filter": [
+ "test2"], "condition": "selection or filter"}
+ expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": ["test1"], "filter": [
+ "test2"], "condition": "selection and filter"}
+ expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" AND "test2"\'))'.format(
+ self.table)
+ self.validate(detection, expected_result)
+
+ def test_full_text_search_aggregation(self):
+ # aggregation with fts
+ detection = {"selection": ["test"],
+ "condition": "selection | count() > 5"}
+ inner_query = 'SELECT count(*) AS agg FROM {0} WHERE {0} MATCH (\'"test"\')'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ detection = {"selection": ["test1", "test2"],
+ "condition": "selection | count() > 5"}
+ inner_query = 'SELECT count(*) AS agg FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ # aggregation + group by + fts
+ detection = {"selection": ["test1", "test2"],
+ "condition": "selection | count() by fieldname > 5"}
+ inner_query = 'SELECT count(*) AS agg FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\')) GROUP BY fieldname'.format(
+ self.table)
+ expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
+ self.validate(detection, expected_result)
+
+ def test_not_implemented(self):
+ # fts not implemented with wildcards
+ detection = {"selection": ["test*"], "condition": "selection"}
+ expected_result = NotImplementedError()
+ self.validate(detection, expected_result)
+
+ detection = {"selection": ["test?"], "condition": "selection"}
+ expected_result = NotImplementedError()
+ self.validate(detection, expected_result)
+
+ detection = {"selection": ["test\\"], "condition": "selection"}
+ expected_result = NotImplementedError()
+ self.validate(detection, expected_result)
+
+
+ # fts is not implemented for nested condtions
+ detection = {"selection": ["test"], "filter": [
+ "test2"], "condition": "selection and filter"} # this is ok
+ detection = {"selection": ["test"], "filter": [
+ "test2"], "condition": "selection or filter"} # this is ok
+ detection = {"selection": ["test"], "filter": [
+ "test2"], "condition": "selection and not filter"} # this is already nested
+ expected_result = NotImplementedError()
+ self.validate(detection, expected_result)
+
+ detection = {"selection": ["test"], "filter": [
+ "test2"], "condition": "selection and filter and filter"} # this is nested
+ expected_result = NotImplementedError()
+ self.validate(detection, expected_result)
+
+ detection = {"selection": ["test"], "filter": [
+ "test2"], "condition": "selection and filter or filter"} # this is nested
+ expected_result = NotImplementedError()
+ self.validate(detection, expected_result)
+
+ def validate(self, detection, expectation):
+
+ config = SigmaConfiguration()
+
+ self.basic_rule["detection"] = detection
+
+ with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
+ parser = SigmaCollectionParser("any sigma io", config, None)
+ backend = SQLiteBackend(config, self.table)
+
+ assert len(parser.parsers) == 1
+
+ for p in parser.parsers:
+ if isinstance(expectation, str):
+ self.assertEqual(expectation, backend.generate(p))
+ elif isinstance(expectation, Exception):
+ self.assertRaises(type(expectation), backend.generate, p)
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file