diff --git a/detection_rules/etc/integration-manifests.json.gz b/detection_rules/etc/integration-manifests.json.gz index 5db6d11af..50761ff2f 100644 Binary files a/detection_rules/etc/integration-manifests.json.gz and b/detection_rules/etc/integration-manifests.json.gz differ diff --git a/detection_rules/etc/integration-schemas.json.gz b/detection_rules/etc/integration-schemas.json.gz index 3d632e0f2..9f4b139dc 100644 Binary files a/detection_rules/etc/integration-schemas.json.gz and b/detection_rules/etc/integration-schemas.json.gz differ diff --git a/detection_rules/rule_validators.py b/detection_rules/rule_validators.py index 94601e988..e6818fe13 100644 --- a/detection_rules/rule_validators.py +++ b/detection_rules/rule_validators.py @@ -15,6 +15,15 @@ from . import ecs, endgame from .integrations import get_integration_schema_data, load_integrations_manifests from .rule import QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents +EQL_ERROR_TYPES = Union[eql.EqlCompileError, + eql.EqlError, + eql.EqlParseError, + eql.EqlSchemaError, + eql.EqlSemanticError, + eql.EqlSyntaxError, + eql.EqlTypeMismatchError] +KQL_ERROR_TYPES = Union[kql.KqlCompileError, kql.KqlParseError] + class KQLValidator(QueryValidator): """Specific fields for KQL query event types.""" @@ -40,14 +49,21 @@ class KQLValidator(QueryValidator): packages_manifest = load_integrations_manifests() package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) + validation_checks = {"stack": None, "integrations": None} # validate the query against fields within beats - self.validate_stack_combos(data, meta) + validation_checks["stack"] = self.validate_stack_combos(data, meta) if package_integrations: # validate the query against related integration fields - self.validate_integration(data, meta, package_integrations) + validation_checks["integrations"] = self.validate_integration(data, meta, package_integrations) - def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None: + if (validation_checks["stack"] and not package_integrations): + raise validation_checks["stack"] + + if (validation_checks["stack"] and validation_checks["integrations"]): + raise ValueError(f"Error in both stack and integrations checks: {validation_checks}") + + def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> Union[KQL_ERROR_TYPES, None, TypeError]: """Validate the query against ECS and beats schemas across stack combinations.""" for stack_version, mapping in meta.get_validation_stack_versions().items(): beats_version = mapping['beats'] @@ -65,13 +81,14 @@ class KQLValidator(QueryValidator): if "Unknown field" in message and beat_types: trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}" - raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, - len(exc.caret.lstrip()), trailer=trailer) from None - except Exception: + return kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, + len(exc.caret.lstrip()), trailer=trailer) + except Exception as exc: print(err_trailer) - raise + return exc - def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None: + def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> Union[ + KQL_ERROR_TYPES, None, TypeError]: """Validate the query, called from the parent which contains [metadata] information.""" if meta.query_schema_validation is False or meta.maturity == "deprecated": # syntax only, which is done via self.ast @@ -113,8 +130,8 @@ class KQLValidator(QueryValidator): if data.get("notify", False): print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}") else: - raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, - len(exc.caret.lstrip()), trailer=trailer) from None + return kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, + len(exc.caret.lstrip()), trailer=trailer) # don't error on fields that are in another integration schema for field in list(error_fields.keys()): @@ -127,8 +144,8 @@ class KQLValidator(QueryValidator): exc = data["error"] trailer = data["trailer"] - raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, - len(exc.caret.lstrip()), trailer=trailer) from None + return kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source, + len(exc.caret.lstrip()), trailer=trailer) class EQLValidator(QueryValidator): @@ -160,14 +177,21 @@ class EQLValidator(QueryValidator): packages_manifest = load_integrations_manifests() package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) + validation_checks = {"stack": None, "integrations": None} # validate the query against fields within beats - self.validate_stack_combos(data, meta) + validation_checks["stack"] = self.validate_stack_combos(data, meta) if package_integrations: # validate the query against related integration fields - self.validate_integration(data, meta, package_integrations) + validation_checks["integrations"] = self.validate_integration(data, meta, package_integrations) - def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None: + if validation_checks["stack"] and not package_integrations: + raise validation_checks["stack"] + + if validation_checks["stack"] and validation_checks["integrations"]: + raise ValueError(f"Error in both stack and integrations checks: {validation_checks}") + + def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> Union[EQL_ERROR_TYPES, None, ValueError]: """Validate the query against ECS and beats schemas across stack combinations.""" for stack_version, mapping in meta.get_validation_stack_versions().items(): beats_version = mapping['beats'] @@ -182,14 +206,19 @@ class EQLValidator(QueryValidator): eql_schema = ecs.KqlSchema2Eql(schema) # validate query against the beats and eql schema - self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer, - beat_types=beat_types) + exc = self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer, + beat_types=beat_types) + if exc: + return exc if endgame_schema: # validate query against the endgame schema - self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer) + exc = self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer) + if exc: + raise exc - def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None: + def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> Union[ + EQL_ERROR_TYPES, None, ValueError]: """Validate an EQL query while checking TOMLRule against integration schemas.""" if meta.query_schema_validation is False or meta.maturity == "deprecated": # syntax only, which is done via self.ast @@ -220,9 +249,9 @@ class EQLValidator(QueryValidator): err_trailer = f'stack: {stack_version}, integration: {integration},' \ f'ecs: {ecs_version}, package: {package}, package_version: {package_version}' - try: - self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer) - except eql.EqlParseError as exc: + exc = self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer) + + if isinstance(exc, eql.EqlParseError): message = exc.error_msg if message == "Unknown field" or "Field not recognized" in message: field = extract_error_field(exc) @@ -235,7 +264,7 @@ class EQLValidator(QueryValidator): if data.get("notify", False): print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}") else: - raise exc + return exc # don't error on fields that are in another integration schema for field in list(error_fields.keys()): @@ -246,10 +275,11 @@ class EQLValidator(QueryValidator): if error_fields: _, data = next(iter(error_fields.items())) exc = data["error"] - raise exc + return exc def validate_query_with_schema(self, data: 'QueryRuleData', schema: Union[ecs.KqlSchema2Eql, endgame.EndgameSchema], - err_trailer: str, beat_types: list = None) -> None: + err_trailer: str, beat_types: list = None) -> Union[ + EQL_ERROR_TYPES, ValueError, None]: """Validate the query against the schema.""" try: with schema, eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions: @@ -265,12 +295,12 @@ class EQLValidator(QueryValidator): fields_str = ', '.join(text_fields) trailer = f"\neql does not support text fields: {fields_str}\n\n{trailer}" - raise exc.__class__(exc.error_msg, exc.line, exc.column, exc.source, - len(exc.caret.lstrip()), trailer=trailer) from None + return exc.__class__(exc.error_msg, exc.line, exc.column, exc.source, + len(exc.caret.lstrip()), trailer=trailer) - except Exception: + except Exception as exc: print(err_trailer) - raise + return exc def extract_error_field(exc: Union[eql.EqlParseError, kql.KqlParseError]) -> Optional[str]: diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index 31fc03fe2..37883483f 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -7,6 +7,7 @@ import os import re import unittest +import uuid import warnings from collections import defaultdict from pathlib import Path @@ -23,6 +24,7 @@ from detection_rules.packaging import current_stack_version from detection_rules.rule import (QueryRuleData, TOMLRuleContents, load_integrations_manifests, QueryValidator) from detection_rules.rule_loader import FILE_PATTERN +from detection_rules.rule_validators import EQLValidator, KQLValidator from detection_rules.schemas import definitions, get_stack_schemas from detection_rules.utils import INTEGRATION_RULE_DIR, get_path, load_etc_dump, PatchedTemplate from detection_rules.version_lock import default_version_lock @@ -513,6 +515,163 @@ class TestRuleMetadata(BaseRuleTest): """ self.fail(err_msg + '\n'.join(failures)) + def test_invalid_queries(self): + invalid_queries_eql = [ + """file where file.fake: ( + "token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key", + "p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst", "pgp", "gpgz", "pfx", "crt", + "p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer", "spc", "ps2xml") + """ + ] + invalid_integration_queries_eql = [ + """file where event.dataset == "google_workspace.drive" and event.action : ("copy", "view", "download") and + google_workspace.drive.fake: "people_with_link" and source.user.email == "" and + file.extension: ( + "token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key", + "p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst", "pgp", "gpgz", "pfx", + "crt", "p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer", "spc", "ps2xml") + """, + """file where event.dataset == "google_workspace.drive" and event.action : ("copy", "view", "download") and + google_workspace.drive.visibility: "people_with_link" and source.user.email == "" and + file.fake: ( + "token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key", + "p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst", "pgp", "gpgz", + "pfx", "crt", "p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer", "spc", + "ps2xml") + """ + ] + + valid_queries_eql = [ + """file where file.extension: ( + "token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", + "p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst", + "p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer") + """, + """file where event.dataset == "google_workspace.drive" and event.action : ("copy", "view", "download") and + google_workspace.drive.visibility: "people_with_link" and source.user.email == "" and + file.extension: ( + "token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key", + "p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst", "pgp", "gpgz", "pfx", + "p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer", "spc", "ps2xml") + """ + + ] + + invalid_queries_kql = [ + """ + event.fake:"google_workspace.admin" and event.action:"CREATE_DATA_TRANSFER_REQUEST" + and event.category:"iam" and google_workspace.admin.application.name:Drive* + """ + ] + invalid_integration_queries_kql = [ + """ + event.dataset:"google_workspace.admin" and event.action:"CREATE_DATA_TRANSFER_REQUEST" + and event.category:"iam" and google_workspace.fake:Drive* + """ + ] + + valid_queries_kql = [ + """ + event.dataset:"google_workspace.admin" and event.action:"CREATE_DATA_TRANSFER_REQUEST" + and event.category:"iam" and google_workspace.admin.application.name:Drive* + """, + """ + event.dataset:"google_workspace.admin" and event.action:"CREATE_DATA_TRANSFER_REQUEST" + """ + + ] + + base_fields_eql = { + "author": ["Elastic"], + "description": "test description", + "index": ["filebeat-*"], + "language": "eql", + "license": "Elastic License v2", + "name": "test rule", + "risk_score": 21, + "rule_id": str(uuid.uuid4()), + "severity": "low", + "type": "eql" + } + + base_fields_kql = { + "author": ["Elastic"], + "description": "test description", + "index": ["filebeat-*"], + "language": "kuery", + "license": "Elastic License v2", + "name": "test rule", + "risk_score": 21, + "rule_id": str(uuid.uuid4()), + "severity": "low", + "type": "query" + } + + def build_rule(query: str, query_language: str): + metadata = { + "creation_date": "1970/01/01", + "integration": ["google_workspace"], + "updated_date": "1970/01/01", + "query_schema_validation": True, + "maturity": "production", + "min_stack_version": load_current_package_version() + } + if query_language == "eql": + data = base_fields_eql.copy() + elif query_language == "kuery": + data = base_fields_kql.copy() + data["query"] = query + obj = {"metadata": metadata, "rule": data} + return TOMLRuleContents.from_dict(obj) + # eql + for query in valid_queries_eql: + build_rule(query, "eql") + + for query in invalid_queries_eql: + with self.assertRaises(eql.EqlSchemaError): + build_rule(query, "eql") + + for query in invalid_integration_queries_eql: + with self.assertRaises(ValueError): + build_rule(query, "eql") + # kql + for query in valid_queries_kql: + build_rule(query, "kuery") + + for query in invalid_queries_kql: + with self.assertRaises(kql.KqlParseError): + build_rule(query, "kuery") + + for query in invalid_integration_queries_kql: + with self.assertRaises(ValueError): + build_rule(query, "kuery") + + def test_event_dataset(self): + for rule in self.all_rules: + if(isinstance(rule.contents.data, QueryRuleData)): + # Need to pick validator based on language + if rule.contents.data.language == "kuery": + test_validator = KQLValidator(rule.contents.data.query) + if rule.contents.data.language == "eql": + test_validator = EQLValidator(rule.contents.data.query) + data = rule.contents.data + meta = rule.contents.metadata + if meta.query_schema_validation is not False or meta.maturity != "deprecated": + if isinstance(data, QueryRuleData) and data.language != 'lucene': + packages_manifest = load_integrations_manifests() + pkg_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest) + + validation_integrations_check = None + + if pkg_integrations: + # validate the query against related integration fields + validation_integrations_check = test_validator.validate_integration(data, + meta, + pkg_integrations) + + if(validation_integrations_check and "event.dataset" in rule.contents.data.query): + raise validation_integrations_check + class TestIntegrationRules(BaseRuleTest): """Test integration rules."""