Files
blue-team-tools/tests/test_logsource.py
T
2023-05-08 11:04:16 +02:00

243 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""
Checks for logsource or fieldname errors on all rules
Run using the command
# python test_logsource.py
"""
import os
import unittest
import yaml
from colorama import init
from colorama import Fore
import json
class TestRules(unittest.TestCase):
path_to_rules_ = ["rules", "rules-emerging-threats", "rules-placeholder", "rules-threat-hunting", "rules-compliance"]
path_to_rules = []
for path_ in path_to_rules_:
path_to_rules.append(os.path.join(os.path.dirname(os.path.realpath(__name__)), path_))
# Helper functions
def yield_next_rule_file_path(self, path_to_rules: list) -> str:
for path_ in path_to_rules:
for root, _, files in os.walk(path_):
for file in files:
if file.endswith('.yml'):
yield os.path.join(root, file)
def get_rule_yaml(self, file_path: str) -> dict:
data = []
with open(file_path, encoding='utf-8') as f:
yaml_parts = yaml.safe_load_all(f)
for part in yaml_parts:
data.append(part)
return data
def get_rule_part(self, file_path: str, part_name: str):
yaml_dicts = self.get_rule_yaml(file_path)
for yaml_part in yaml_dicts:
if part_name in yaml_part.keys():
return yaml_part[part_name]
return None
def get_detection_field(self,detection: dict):
data = []
def get_field_name(selection: dict):
name = []
for field in selection:
if field == '|all':
continue
elif "|" in field:
name.append(field.split('|')[0])
else:
name.append(field)
return name
for search_identifier in detection:
if isinstance(detection[search_identifier], dict):
data += get_field_name(detection[search_identifier])
if isinstance(detection[search_identifier], list):
for list_value in detection[search_identifier]:
if isinstance(list_value, dict):
data += get_field_name(list_value)
return data
def full_logsource(self,logsource: dict) -> dict:
data = {}
data["product"] = logsource["product"] if "product" in logsource.keys() else None
data["category"] = logsource["category"] if "category" in logsource.keys() else None
data["service"] = logsource["service"] if "service" in logsource.keys() else None
return data
def exist_logsource(self,logsource:dict) -> bool:
#Check New product
if logsource["product"]:
if logsource["product"] in fieldname_dict.keys():
product = logsource["product"]
else:
return False
else:
product="empty"
if logsource["category"] and logsource["category"] in fieldname_dict[product]['category'].keys():
return True
elif logsource["service"] and logsource["service"] in fieldname_dict[product]['service'].keys():
return True
elif logsource["category"] == None and logsource["service"] == None:
return True # We known the product but there are no category or service
return False
def get_logsource(self,logsource:dict) -> list:
data = None
product = logsource["product"] if logsource["product"] in fieldname_dict.keys() else "empty"
if logsource["category"] and logsource["category"] in fieldname_dict[product]['category'].keys():
data= fieldname_dict[product]["category"][logsource["category"]]
elif logsource["service"] and logsource["service"] in fieldname_dict[product]['service'].keys():
data= fieldname_dict[product]["service"][logsource["service"]]
elif logsource["category"] == None and logsource["service"] == None:
data = fieldname_dict[product]["empty"]
return data
def not_commun(self,logsource:dict,data:list) -> bool:
product = logsource["product"] if logsource["product"] in fieldname_dict.keys() else "empty"
if fieldname_dict[product]["commun"] == data:
return False
else:
return True
#
# test functions
#
def test_invalid_logsource_attributes(self):
faulty_rules = []
valid_logsource = [
'category',
'product',
'service',
'definition',
]
for file in self.yield_next_rule_file_path(self.path_to_rules):
logsource = self.get_rule_part(
file_path=file, part_name="logsource")
if not logsource:
print(Fore.RED + "Rule {} has no 'logsource'.".format(file))
faulty_rules.append(file)
continue
valid = True
for key in logsource:
if key not in valid_logsource:
print(
Fore.RED + "Rule {} has a logsource with an invalid field ({})".format(file, key))
valid = False
elif not isinstance(logsource[key], str):
print(
Fore.RED + "Rule {} has a logsource with an invalid field type ({})".format(file, key))
valid = False
if not valid:
faulty_rules.append(file)
self.assertEqual(faulty_rules, [], Fore.RED +
"There are rules with non-conform 'logsource' fields. Please check: https://github.com/SigmaHQ/sigma/wiki/Rule-Creation-Guide#log-source")
def test_logsource_value(self):
faulty_rules = []
for file in self.yield_next_rule_file_path(self.path_to_rules):
logsource = self.get_rule_part(
file_path=file, part_name="logsource")
if logsource:
full_logsource = self.full_logsource(logsource)
if not self.exist_logsource(full_logsource):
faulty_rules.append(file)
print(
Fore.RED + "Rule {} has the unknown logsource product/category/service ({}/{}/{})".format(file,
full_logsource["product"],
full_logsource["category"],
full_logsource["service"]
))
self.assertEqual(faulty_rules, [], Fore.RED +
"There are rules with non-conform 'logsource' values.")
def test_fieldname_case(self):
files_with_fieldname_issues = []
for file in self.yield_next_rule_file_path(self.path_to_rules):
logsource = self.get_rule_part(file_path=file, part_name="logsource")
detection = self.get_rule_part(file_path=file, part_name="detection")
if logsource and detection :
full_logsource = self.full_logsource(logsource)
list_valid = self.get_logsource(full_logsource)
fisrt_time = True
if list_valid != [] and self.not_commun(full_logsource,list_valid):
for field in self.get_detection_field(detection):
if not field in list_valid:
print(
Fore.RED + "Rule {} has the invalid field <{}>".format(file, field))
if fisrt_time:
files_with_fieldname_issues.append(file)
fisrt_time = False # can be many error in the same rule
self.assertEqual(files_with_fieldname_issues, [], Fore.RED +
"There are rule files which contains unknown field or with cast error")
def load_fields_json(name: str):
    """Load the field-name reference file and return it as a nested dict.

    ``name`` is resolved relative to the directory of this script (an absolute
    path is used as is). The JSON document must have a ``legit`` object keyed
    by product; an optional ``addon`` object adds extra field names to the
    ``category``/``service`` lists of products already present in ``legit``.

    After merging, every category list containing ``Hashes`` or ``Hash`` gets
    the individual hash field names, and every category and service list gets
    the product's ``commun`` fields appended.

    Raises ``KeyError`` when ``legit`` is missing or when ``addon`` names a
    product that ``legit`` does not define.
    """
    file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), name)
    with open(file_path, 'r', encoding='utf-8') as file:
        json_dict = json.load(file)

    data = dict(json_dict["legit"])

    for product, additions in json_dict.get("addon", {}).items():
        for section in ("category", "service"):
            known = data[product].setdefault(section, {})
            # An addon may omit a section or introduce a logsource name that
            # the legit part does not list yet.
            for logsource_name, extra_fields in additions.get(section, {}).items():
                known.setdefault(logsource_name, []).extend(extra_fields)

    # We use some extracted hash
    hash_fields = ["md5", "sha1", "sha256", "Imphash"]
    for sections in data.values():
        # Add commun field. The lists are extended in place and in this order:
        # callers compare them against the "commun" list by equality.
        commun = sections.get("commun", [])
        for fields in sections.get("category", {}).values():
            # "Hash" is used by Sysmon 15 create_stream_hash
            if "Hashes" in fields or "Hash" in fields:
                fields.extend(hash_fields)
            fields.extend(commun)
        for fields in sections.get("service", {}).values():
            fields.extend(commun)
    return data
if __name__ == "__main__":
    # Reset the terminal colour after every print so that the red error
    # messages do not bleed into the following output.
    init(autoreset=True)
    # load field name information
    # NOTE: fieldname_dict is a module-level global read by the TestRules
    # helpers; it only exists when the file is run as a script.
    fieldname_dict = load_fields_json('logsource.json')
    # Run the tests
    unittest.main()