Validate against beats and integrations schemas (#2524)

This commit is contained in:
Mika Ayenson
2023-02-08 12:01:31 -05:00
committed by GitHub
parent 443478c8c0
commit 60115443a4
2 changed files with 51 additions and 46 deletions
+49 -46
View File
@@ -40,31 +40,36 @@ class KQLValidator(QueryValidator):
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
# validate the query against fields within beats
self.validate_stack_combos(data, meta)
if package_integrations:
# validate the query against related integration fields
self.validate_integration(data, meta, package_integrations)
else:
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}'
beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None:
"""Validate the query against ECS and beats schemas across stack combinations."""
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}'
try:
kql.parse(self.query, schema=schema)
except kql.KqlParseError as exc:
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"
beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
print(err_trailer)
raise
try:
kql.parse(self.query, schema=schema)
except kql.KqlParseError as exc:
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
print(err_trailer)
raise
def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None:
"""Validate the query, called from the parent which contains [metadata] information."""
@@ -105,7 +110,8 @@ class KQLValidator(QueryValidator):
f"{stack_version=}, {ecs_version=}"
)
error_fields[field] = {"error": exc, "trailer": trailer}
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
if data.get("notify", False):
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
@@ -154,30 +160,34 @@ class EQLValidator(QueryValidator):
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
# validate the query against fields within beats
self.validate_stack_combos(data, meta)
if package_integrations:
# validate the query against related integration fields
self.validate_integration(data, meta, package_integrations)
else:
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']
err_trailer = f'stack: {stack_version}, beats: {beats_version},' \
f'ecs: {ecs_version}, endgame: {endgame_version}'
def validate_stack_combos(self, data: QueryRuleData, meta: RuleMeta) -> None:
"""Validate the query against ECS and beats schemas across stack combinations."""
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']
err_trailer = f'stack: {stack_version}, beats: {beats_version},' \
f'ecs: {ecs_version}, endgame: {endgame_version}'
beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)
endgame_schema = self.get_endgame_schema(data.index, endgame_version)
eql_schema = ecs.KqlSchema2Eql(schema)
beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)
endgame_schema = self.get_endgame_schema(data.index, endgame_version)
eql_schema = ecs.KqlSchema2Eql(schema)
# validate query against the beats and eql schema
self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer,
beat_types=beat_types)
# validate query against the beats and eql schema
self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer,
beat_types=beat_types)
if endgame_schema:
# validate query against the endgame schema
self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)
if endgame_schema:
# validate query against the endgame schema
self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)
def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None:
"""Validate an EQL query while checking TOMLRule against integration schemas."""
@@ -195,7 +205,6 @@ class EQLValidator(QueryValidator):
package_version = integration_schema_data['package_version']
integration_schema = integration_schema_data['schema']
stack_version = integration_schema_data['stack_version']
endgame_version = integration_schema_data['endgame_version']
if stack_version != current_stack_version:
# reset the combined schema for each stack version
@@ -223,17 +232,11 @@ class EQLValidator(QueryValidator):
f"{stack_version=}, {ecs_version=}"
)
error_fields[field] = {"error": exc, "trailer": trailer}
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
if data.get("notify", False):
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
raise exc
# Still need to check endgame if it's in the index
endgame_schema = self.get_endgame_schema(data.index, endgame_version)
if endgame_schema:
# validate query against the endgame schema
err_trailer = f'stack: {stack_version}, endgame: {endgame_version}'
self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)
# don't error on fields that are in another integration schema
for field in list(error_fields.keys()):
if field in combined_schema:
+2
View File
@@ -5,6 +5,7 @@
"""Shared resources for tests."""
import os
import unittest
from typing import Union
@@ -17,6 +18,7 @@ class BaseRuleTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
os.environ["DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE"] = "1"
rc = RuleCollection.default()
cls.all_rules = rc.rules
cls.rule_lookup = rc.id_map