Patch to allow integration validation if ECS/beats fails (#2701)

* Updated for AND logic

* Added case for no package_intregrations

* Fixed linting

* Added unit test for new functionality

* Fixed linting

* Added valid query tests

* Add unit test for event.dataset

* Switched type calls to isinstance calls

* Removed  unused stack validation call

* Added additional error type

* Fixed linting

* Cleaned up error handling

* fixed linting

* Added proper type hints

* Fixed typo in Unions

* Updated unit test with additional test cases

* Updated  test_invalid_queries unit test

* Fixed linting

* Added kql to unit tests

* Updated tests

* Fixed error handling

* Fixed style issues

* updating integration manifests and schemas

---------

Co-authored-by: terrancedejesus <terrance.dejesus@elastic.co>
Co-authored-by: Terrance DeJesus <99630311+terrancedejesus@users.noreply.github.com>
This commit is contained in:
eric-forte-elastic
2023-04-18 19:43:35 +00:00
committed by GitHub
parent fb09208132
commit 8ef2f6557b
4 changed files with 218 additions and 29 deletions
Binary file not shown.
Binary file not shown.
+59 -29
View File
@@ -15,6 +15,15 @@ from . import ecs, endgame
from .integrations import get_integration_schema_data, load_integrations_manifests
from .rule import QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents
EQL_ERROR_TYPES = Union[eql.EqlCompileError,
eql.EqlError,
eql.EqlParseError,
eql.EqlSchemaError,
eql.EqlSemanticError,
eql.EqlSyntaxError,
eql.EqlTypeMismatchError]
KQL_ERROR_TYPES = Union[kql.KqlCompileError, kql.KqlParseError]
class KQLValidator(QueryValidator):
"""Specific fields for KQL query event types."""
@@ -40,14 +49,21 @@ class KQLValidator(QueryValidator):
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
validation_checks = {"stack": None, "integrations": None}
# validate the query against fields within beats
self.validate_stack_combos(data, meta)
validation_checks["stack"] = self.validate_stack_combos(data, meta)
if package_integrations:
# validate the query against related integration fields
self.validate_integration(data, meta, package_integrations)
validation_checks["integrations"] = self.validate_integration(data, meta, package_integrations)
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None:
if (validation_checks["stack"] and not package_integrations):
raise validation_checks["stack"]
if (validation_checks["stack"] and validation_checks["integrations"]):
raise ValueError(f"Error in both stack and integrations checks: {validation_checks}")
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> Union[KQL_ERROR_TYPES, None, TypeError]:
"""Validate the query against ECS and beats schemas across stack combinations."""
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
@@ -65,13 +81,14 @@ class KQLValidator(QueryValidator):
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
return kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer)
except Exception as exc:
print(err_trailer)
raise
return exc
def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None:
def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> Union[
KQL_ERROR_TYPES, None, TypeError]:
"""Validate the query, called from the parent which contains [metadata] information."""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
@@ -113,8 +130,8 @@ class KQLValidator(QueryValidator):
if data.get("notify", False):
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
return kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer)
# don't error on fields that are in another integration schema
for field in list(error_fields.keys()):
@@ -127,8 +144,8 @@ class KQLValidator(QueryValidator):
exc = data["error"]
trailer = data["trailer"]
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
return kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer)
class EQLValidator(QueryValidator):
@@ -160,14 +177,21 @@ class EQLValidator(QueryValidator):
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
validation_checks = {"stack": None, "integrations": None}
# validate the query against fields within beats
self.validate_stack_combos(data, meta)
validation_checks["stack"] = self.validate_stack_combos(data, meta)
if package_integrations:
# validate the query against related integration fields
self.validate_integration(data, meta, package_integrations)
validation_checks["integrations"] = self.validate_integration(data, meta, package_integrations)
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None:
if validation_checks["stack"] and not package_integrations:
raise validation_checks["stack"]
if validation_checks["stack"] and validation_checks["integrations"]:
raise ValueError(f"Error in both stack and integrations checks: {validation_checks}")
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> Union[EQL_ERROR_TYPES, None, ValueError]:
"""Validate the query against ECS and beats schemas across stack combinations."""
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
@@ -182,14 +206,19 @@ class EQLValidator(QueryValidator):
eql_schema = ecs.KqlSchema2Eql(schema)
# validate query against the beats and eql schema
self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer,
beat_types=beat_types)
exc = self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer,
beat_types=beat_types)
if exc:
return exc
if endgame_schema:
# validate query against the endgame schema
self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)
exc = self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)
if exc:
raise exc
def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None:
def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> Union[
EQL_ERROR_TYPES, None, ValueError]:
"""Validate an EQL query while checking TOMLRule against integration schemas."""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
@@ -220,9 +249,9 @@ class EQLValidator(QueryValidator):
err_trailer = f'stack: {stack_version}, integration: {integration},' \
f'ecs: {ecs_version}, package: {package}, package_version: {package_version}'
try:
self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer)
except eql.EqlParseError as exc:
exc = self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer)
if isinstance(exc, eql.EqlParseError):
message = exc.error_msg
if message == "Unknown field" or "Field not recognized" in message:
field = extract_error_field(exc)
@@ -235,7 +264,7 @@ class EQLValidator(QueryValidator):
if data.get("notify", False):
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
raise exc
return exc
# don't error on fields that are in another integration schema
for field in list(error_fields.keys()):
@@ -246,10 +275,11 @@ class EQLValidator(QueryValidator):
if error_fields:
_, data = next(iter(error_fields.items()))
exc = data["error"]
raise exc
return exc
def validate_query_with_schema(self, data: 'QueryRuleData', schema: Union[ecs.KqlSchema2Eql, endgame.EndgameSchema],
err_trailer: str, beat_types: list = None) -> None:
err_trailer: str, beat_types: list = None) -> Union[
EQL_ERROR_TYPES, ValueError, None]:
"""Validate the query against the schema."""
try:
with schema, eql.parser.elasticsearch_syntax, eql.parser.ignore_missing_functions:
@@ -265,12 +295,12 @@ class EQLValidator(QueryValidator):
fields_str = ', '.join(text_fields)
trailer = f"\neql does not support text fields: {fields_str}\n\n{trailer}"
raise exc.__class__(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
return exc.__class__(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer)
except Exception:
except Exception as exc:
print(err_trailer)
raise
return exc
def extract_error_field(exc: Union[eql.EqlParseError, kql.KqlParseError]) -> Optional[str]:
+159
View File
@@ -7,6 +7,7 @@
import os
import re
import unittest
import uuid
import warnings
from collections import defaultdict
from pathlib import Path
@@ -23,6 +24,7 @@ from detection_rules.packaging import current_stack_version
from detection_rules.rule import (QueryRuleData, TOMLRuleContents,
load_integrations_manifests, QueryValidator)
from detection_rules.rule_loader import FILE_PATTERN
from detection_rules.rule_validators import EQLValidator, KQLValidator
from detection_rules.schemas import definitions, get_stack_schemas
from detection_rules.utils import INTEGRATION_RULE_DIR, get_path, load_etc_dump, PatchedTemplate
from detection_rules.version_lock import default_version_lock
@@ -513,6 +515,163 @@ class TestRuleMetadata(BaseRuleTest):
"""
self.fail(err_msg + '\n'.join(failures))
def test_invalid_queries(self):
invalid_queries_eql = [
"""file where file.fake: (
"token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key",
"p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst", "pgp", "gpgz", "pfx", "crt",
"p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer", "spc", "ps2xml")
"""
]
invalid_integration_queries_eql = [
"""file where event.dataset == "google_workspace.drive" and event.action : ("copy", "view", "download") and
google_workspace.drive.fake: "people_with_link" and source.user.email == "" and
file.extension: (
"token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key",
"p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst", "pgp", "gpgz", "pfx",
"crt", "p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer", "spc", "ps2xml")
""",
"""file where event.dataset == "google_workspace.drive" and event.action : ("copy", "view", "download") and
google_workspace.drive.visibility: "people_with_link" and source.user.email == "" and
file.fake: (
"token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key",
"p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst", "pgp", "gpgz",
"pfx", "crt", "p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer", "spc",
"ps2xml")
"""
]
valid_queries_eql = [
"""file where file.extension: (
"token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem",
"p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst",
"p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer")
""",
"""file where event.dataset == "google_workspace.drive" and event.action : ("copy", "view", "download") and
google_workspace.drive.visibility: "people_with_link" and source.user.email == "" and
file.extension: (
"token","assig", "pssc", "keystore", "pub", "pgp.asc", "ps1xml", "pem", "gpg.sig", "der", "key",
"p7r", "p12", "asc", "jks", "p7b", "signature", "gpg", "pgp.sig", "sst", "pgp", "gpgz", "pfx",
"p8", "sig", "pkcs7", "jceks", "pkcs8", "psc1", "p7c", "csr", "cer", "spc", "ps2xml")
"""
]
invalid_queries_kql = [
"""
event.fake:"google_workspace.admin" and event.action:"CREATE_DATA_TRANSFER_REQUEST"
and event.category:"iam" and google_workspace.admin.application.name:Drive*
"""
]
invalid_integration_queries_kql = [
"""
event.dataset:"google_workspace.admin" and event.action:"CREATE_DATA_TRANSFER_REQUEST"
and event.category:"iam" and google_workspace.fake:Drive*
"""
]
valid_queries_kql = [
"""
event.dataset:"google_workspace.admin" and event.action:"CREATE_DATA_TRANSFER_REQUEST"
and event.category:"iam" and google_workspace.admin.application.name:Drive*
""",
"""
event.dataset:"google_workspace.admin" and event.action:"CREATE_DATA_TRANSFER_REQUEST"
"""
]
base_fields_eql = {
"author": ["Elastic"],
"description": "test description",
"index": ["filebeat-*"],
"language": "eql",
"license": "Elastic License v2",
"name": "test rule",
"risk_score": 21,
"rule_id": str(uuid.uuid4()),
"severity": "low",
"type": "eql"
}
base_fields_kql = {
"author": ["Elastic"],
"description": "test description",
"index": ["filebeat-*"],
"language": "kuery",
"license": "Elastic License v2",
"name": "test rule",
"risk_score": 21,
"rule_id": str(uuid.uuid4()),
"severity": "low",
"type": "query"
}
def build_rule(query: str, query_language: str):
metadata = {
"creation_date": "1970/01/01",
"integration": ["google_workspace"],
"updated_date": "1970/01/01",
"query_schema_validation": True,
"maturity": "production",
"min_stack_version": load_current_package_version()
}
if query_language == "eql":
data = base_fields_eql.copy()
elif query_language == "kuery":
data = base_fields_kql.copy()
data["query"] = query
obj = {"metadata": metadata, "rule": data}
return TOMLRuleContents.from_dict(obj)
# eql
for query in valid_queries_eql:
build_rule(query, "eql")
for query in invalid_queries_eql:
with self.assertRaises(eql.EqlSchemaError):
build_rule(query, "eql")
for query in invalid_integration_queries_eql:
with self.assertRaises(ValueError):
build_rule(query, "eql")
# kql
for query in valid_queries_kql:
build_rule(query, "kuery")
for query in invalid_queries_kql:
with self.assertRaises(kql.KqlParseError):
build_rule(query, "kuery")
for query in invalid_integration_queries_kql:
with self.assertRaises(ValueError):
build_rule(query, "kuery")
def test_event_dataset(self):
for rule in self.all_rules:
if(isinstance(rule.contents.data, QueryRuleData)):
# Need to pick validator based on language
if rule.contents.data.language == "kuery":
test_validator = KQLValidator(rule.contents.data.query)
if rule.contents.data.language == "eql":
test_validator = EQLValidator(rule.contents.data.query)
data = rule.contents.data
meta = rule.contents.metadata
if meta.query_schema_validation is not False or meta.maturity != "deprecated":
if isinstance(data, QueryRuleData) and data.language != 'lucene':
packages_manifest = load_integrations_manifests()
pkg_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
validation_integrations_check = None
if pkg_integrations:
# validate the query against related integration fields
validation_integrations_check = test_validator.validate_integration(data,
meta,
pkg_integrations)
if(validation_integrations_check and "event.dataset" in rule.contents.data.query):
raise validation_integrations_check
class TestIntegrationRules(BaseRuleTest):
"""Test integration rules."""