[Bug] Use integration schemas for required_field types (#3303)

Commit authored by:
Mika Ayenson
2023-12-11 11:32:38 -06:00
committed by GitHub
parent 6c614eb102
commit face95058f
3 changed files with 104 additions and 62 deletions
+15 -13
View File
@@ -7,7 +7,7 @@
import json
import os
import re
from typing import List, Optional
from typing import List, Optional, Union
import eql
import requests
@@ -246,7 +246,8 @@ def get_schema_from_datasets(beats, modules, datasets, version=None):
return filtered
def get_schema_from_eql(tree: eql.ast.BaseNode, beats: list, version: str = None) -> dict:
def get_datasets_and_modules(tree: Union[eql.ast.BaseNode, kql.ast.BaseNode]) -> tuple:
"""Get datasets and modules from an EQL or KQL AST."""
modules = set()
datasets = set()
@@ -263,22 +264,23 @@ def get_schema_from_eql(tree: eql.ast.BaseNode, beats: list, version: str = None
modules.add(node.get_literals())
elif node.expression == eql.ast.Field("event", ["dataset"]):
datasets.add(node.get_literals())
elif isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.module"):
modules.update(child.value for child in node.value if isinstance(child, kql.ast.String))
elif isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.dataset"):
datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String))
return datasets, modules
def get_schema_from_eql(tree: eql.ast.BaseNode, beats: list, version: Optional[str] = None) -> dict:
    """Get a schema based on datasets and modules in an EQL AST.

    Args:
        tree: parsed EQL AST, scanned for event.module / event.dataset constraints.
        beats: loaded beats schema data passed through to get_schema_from_datasets.
        version: optional beats version; None lets the downstream lookup choose.

    Returns:
        The schema produced by get_schema_from_datasets for the referenced
        datasets and modules.
    """
    datasets, modules = get_datasets_and_modules(tree)
    return get_schema_from_datasets(beats, modules, datasets, version=version)
def get_schema_from_kql(tree: kql.ast.BaseNode, beats: list, version: Optional[str] = None) -> dict:
    """Get a schema based on datasets and modules in a KQL AST.

    Args:
        tree: parsed KQL AST, scanned for event.module / event.dataset constraints.
        beats: loaded beats schema data passed through to get_schema_from_datasets.
        version: optional beats version; None lets the downstream lookup choose.

    Returns:
        The schema produced by get_schema_from_datasets for the referenced
        datasets and modules.
    """
    # The module/dataset extraction is shared with get_schema_from_eql via
    # get_datasets_and_modules; the old inline KQL-only loop was removed as
    # its results were immediately overwritten (dead code).
    datasets, modules = get_datasets_and_modules(tree)
    return get_schema_from_datasets(beats, modules, datasets, version=version)
+66 -29
View File
@@ -10,7 +10,7 @@ import json
import re
from collections import OrderedDict
from pathlib import Path
from typing import Generator, Tuple, Union, Optional
from typing import Generator, List, Tuple, Union, Optional
import requests
from semver import Version
@@ -27,6 +27,7 @@ from .schemas import definitions
MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz'))
_notified_integrations = set()
@cached
@@ -304,9 +305,6 @@ def get_integration_schema_data(data, meta, package_integrations: dict) -> Gener
if (isinstance(data, QueryRuleData) or isinstance(data, ESQLRuleData)) \
and data.language != 'lucene' and meta.maturity == "production":
# flag to only warn once per integration for available upgrades
notify_update_available = True
for stack_version, mapping in meta.get_validation_stack_versions().items():
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']
@@ -321,31 +319,11 @@ def get_integration_schema_data(data, meta, package_integrations: dict) -> Gener
min_stack = meta.min_stack_version or load_current_package_version()
min_stack = Version.parse(min_stack, optional_minor_and_patch=True)
package_version, notice = find_latest_compatible_version(package=package,
integration=integration,
rule_stack_version=min_stack,
packages_manifest=packages_manifest)
if notify_update_available and notice and data.get("notify", False):
# Notify for now, as to not lock rule stacks to integrations
notify_update_available = False
print(f"\n{data.get('name')}")
print(*notice)
schema = {}
if integration is None:
# Use all fields from each dataset
for dataset in integrations_schemas[package][package_version]:
# ignore jobs from machine learning packages
if dataset != "jobs":
schema.update(integrations_schemas[package][package_version][dataset])
else:
if integration not in integrations_schemas[package][package_version]:
raise ValueError(f"Integration {integration} not found in package {package} "
f"version {package_version}")
schema = integrations_schemas[package][package_version][integration]
schema.update(ecs_schema)
integration_schema = {k: kql.parser.elasticsearch_type_family(v) for k, v in schema.items()}
# Extract the integration schema fields
integration_schema, package_version = get_integration_schema_fields(integrations_schemas, package,
integration, min_stack,
packages_manifest, ecs_schema,
data)
data = {"schema": integration_schema, "package": package, "integration": integration,
"stack_version": stack_version, "ecs_version": ecs_version,
@@ -353,6 +331,65 @@ def get_integration_schema_data(data, meta, package_integrations: dict) -> Gener
yield data
def get_integration_schema_fields(integrations_schemas: dict, package: str, integration: str,
                                  min_stack: Version, packages_manifest: dict,
                                  ecs_schema: dict, data: dict) -> Tuple[dict, str]:
    """Extract the integration schema fields merged with the ECS schema.

    Args:
        integrations_schemas: loaded schemas keyed package -> version -> dataset -> field.
        package: package name to look up.
        integration: integration (dataset) name within the package; may be None to
            merge all datasets.
        min_stack: minimum stack version used to resolve a compatible package version.
        packages_manifest: loaded package manifests used for version resolution.
        ecs_schema: ECS field schema merged over the integration fields.
        data: rule data dict; consulted for the optional update-available notification.

    Returns:
        Tuple of (field -> type-family schema dict, resolved package version).
    """
    package_version, notice = find_latest_compatible_version(package, integration, min_stack, packages_manifest)
    notify_user_if_update_available(data, notice, integration)
    # Copy before merging so the ECS update cannot mutate the shared/cached
    # integration schema structures returned by collect_schema_fields.
    schema = dict(collect_schema_fields(integrations_schemas, package, package_version, integration))
    schema.update(ecs_schema)
    # Normalize each elasticsearch field type to its type family for comparison.
    integration_schema = {key: kql.parser.elasticsearch_type_family(value) for key, value in schema.items()}
    return integration_schema, package_version
def notify_user_if_update_available(data: dict, notice: list, integration: str) -> None:
    """Print an available-update notice for an integration, at most once each."""
    global _notified_integrations
    wants_notice = bool(notice) and data.get("notify", False)
    if not wants_notice or integration in _notified_integrations:
        return
    # Record the integration so the upgrade warning is only emitted once.
    _notified_integrations.add(integration)
    print(f"\n{data.get('name')}")
    print('\n'.join(notice))
def collect_schema_fields(integrations_schemas: dict, package: str, package_version: str,
                          integration: Optional[str] = None) -> dict:
    """Collect the schema fields for a package, optionally limited to one integration.

    Args:
        integrations_schemas: loaded schemas keyed package -> version -> dataset -> field.
        package: package name to look up.
        package_version: resolved package version to read from.
        integration: specific integration (dataset) name; when None, fields from
            every dataset except "jobs" (machine learning packages) are merged.

    Returns:
        A new dict of field -> type entries. Always a fresh dict, so callers may
        update the result without mutating the shared schema structures.

    Raises:
        ValueError: if the integration is not present in the package version.
    """
    datasets = integrations_schemas[package][package_version]
    if integration is None:
        # Merge all datasets; ignore "jobs" from machine learning packages.
        return {field: value for dataset, fields in datasets.items() if dataset != "jobs"
                for field, value in fields.items()}
    if integration not in datasets:
        raise ValueError(f"Integration {integration} not found in package {package} version {package_version}")
    # Copy so callers cannot mutate the cached schema entry in place.
    return dict(datasets[integration])
def parse_datasets(datasets: list, package_manifest: dict) -> List[dict]:
    """Parse dataset values from rule data into package/integration references.

    Args:
        datasets: dataset values (e.g. "aws.cloudtrail") pulled from a rule query AST.
        package_manifest: loaded package manifests; only packages present here are kept.

    Returns:
        List of {"package": ..., "integration": ...} dicts ordered by sorted dataset
        value; integration is "Unknown" when a dataset has no '.' separator.
    """
    packaged_integrations = []
    for value in sorted(datasets):
        # cleanup extra quotes pulled from ast field
        value = value.strip('"')
        integration = 'Unknown'
        if '.' in value:
            package, integration = value.split('.', 1)
        else:
            package = value
        # Membership test directly on the dict; materializing a list is wasteful.
        if package in package_manifest:
            packaged_integrations.append({"package": package, "integration": integration})
    return packaged_integrations
class SecurityDetectionEngine:
"""Dedicated to Security Detection Engine integration."""
+23 -20
View File
@@ -22,11 +22,11 @@ from marko.ext.gfm import gfm
from marshmallow import ValidationError, validates_schema
import kql
from kql.ast import FieldComparison
from . import beats, ecs, endgame, utils
from .integrations import (find_least_compatible_version,
load_integrations_manifests)
from .integrations import (find_least_compatible_version, get_integration_schema_fields,
load_integrations_manifests, load_integrations_schemas,
parse_datasets)
from .misc import load_current_package_version
from .mixins import MarshmallowDataclassMixin, StackCompatMixin
from .rule_formatter import nested_normalize, toml_write
@@ -513,6 +513,21 @@ class QueryValidator:
beat_types, beat_schema, schema = self.get_beats_schema(index or [], beats_version, ecs_version)
endgame_schema = self.get_endgame_schema(index or [], endgame_version)
# construct integration schemas
packages_manifest = load_integrations_manifests()
integrations_schemas = load_integrations_schemas()
datasets, _ = beats.get_datasets_and_modules(self.ast)
package_integrations = parse_datasets(datasets, packages_manifest)
int_schema = {}
data = {"notify": False}
for pk_int in package_integrations:
package = pk_int["package"]
integration = pk_int["integration"]
schema, _ = get_integration_schema_fields(integrations_schemas, package, integration,
current_version, packages_manifest, {}, data)
int_schema.update(schema)
required = []
unique_fields = self.unique_fields or []
@@ -521,7 +536,9 @@ class QueryValidator:
is_ecs = field_type is not None
if not is_ecs:
if beat_schema:
if int_schema:
field_type = int_schema.get(fld, None)
elif beat_schema:
field_type = beat_schema.get(fld, {}).get('type')
elif endgame_schema:
field_type = endgame_schema.endgame_schema.get(fld, None)
@@ -1093,13 +1110,7 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin):
def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta,
package_manifest: dict) -> Optional[List[dict]]:
packaged_integrations = []
datasets = set()
for node in data.get('ast') or []:
if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset':
datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal)))
elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset':
datasets.update(set(str(n) for n in node if isinstance(n, kql.ast.Value)))
datasets, _ = beats.get_datasets_and_modules(data.get('ast') or [])
# integration is None to remove duplicate references upstream in Kibana
# chronologically, event.dataset is checked for package:integration, then rule tags
@@ -1114,15 +1125,7 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin):
if integration in ineligible_integrations or isinstance(data, MachineLearningRuleData):
packaged_integrations.append({"package": integration, "integration": None})
for value in sorted(datasets):
integration = 'Unknown'
if '.' in value:
package, integration = value.split('.', 1)
else:
package = value
if package in list(package_manifest):
packaged_integrations.append({"package": package, "integration": integration})
packaged_integrations.extend(parse_datasets(datasets, package_manifest))
return packaged_integrations