diff --git a/detection_rules/beats.py b/detection_rules/beats.py
index 578a7d154..4af93dfdb 100644
--- a/detection_rules/beats.py
+++ b/detection_rules/beats.py
@@ -7,7 +7,7 @@
 import json
 import os
 import re
-from typing import List, Optional
+from typing import List, Optional, Union
 
 import eql
 import requests
@@ -246,7 +246,8 @@ def get_schema_from_datasets(beats, modules, datasets, version=None):
     return filtered
 
 
-def get_schema_from_eql(tree: eql.ast.BaseNode, beats: list, version: str = None) -> dict:
+def get_datasets_and_modules(tree: Union[eql.ast.BaseNode, kql.ast.BaseNode]) -> tuple:
+    """Get datasets and modules from an EQL or KQL AST."""
     modules = set()
     datasets = set()
 
@@ -263,22 +264,23 @@ def get_schema_from_eql(tree: eql.ast.BaseNode, beats: list, version: str = None
             modules.add(node.get_literals())
         elif node.expression == eql.ast.Field("event", ["dataset"]):
             datasets.add(node.get_literals())
+        elif isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.module"):
+            modules.update(child.value for child in node.value if isinstance(child, kql.ast.String))
+        elif isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.dataset"):
+            datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String))
 
+    return datasets, modules
+
+
+def get_schema_from_eql(tree: eql.ast.BaseNode, beats: list, version: str = None) -> dict:
+    """Get a schema based on datasets and modules in an EQL AST."""
+    datasets, modules = get_datasets_and_modules(tree)
     return get_schema_from_datasets(beats, modules, datasets, version=version)
 
 
 def get_schema_from_kql(tree: kql.ast.BaseNode, beats: list, version: str = None) -> dict:
-    modules = set()
-    datasets = set()
-
-    # extract out event.module and event.dataset from the query's AST
-    for node in tree:
-        if isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.module"):
-            modules.update(child.value for child in node.value if isinstance(child, kql.ast.String))
-
-        if isinstance(node, kql.ast.FieldComparison) and node.field == kql.ast.Field("event.dataset"):
-            datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String))
-
+    """Get a schema based on datasets and modules in a KQL AST."""
+    datasets, modules = get_datasets_and_modules(tree)
     return get_schema_from_datasets(beats, modules, datasets, version=version)
 
 
diff --git a/detection_rules/integrations.py b/detection_rules/integrations.py
index 467b2a0ad..e1b38bb0f 100644
--- a/detection_rules/integrations.py
+++ b/detection_rules/integrations.py
@@ -10,7 +10,7 @@ import json
 import re
 from collections import OrderedDict
 from pathlib import Path
-from typing import Generator, Tuple, Union, Optional
+from typing import Generator, List, Tuple, Union, Optional
 
 import requests
 from semver import Version
@@ -27,6 +27,7 @@ from .schemas import definitions
 
 MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
 SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz'))
+_notified_integrations = set()
 
 
 @cached
@@ -304,9 +305,6 @@ def get_integration_schema_data(data, meta, package_integrations: dict) -> Gener
     if (isinstance(data, QueryRuleData) or isinstance(data, ESQLRuleData)) \
             and data.language != 'lucene' and meta.maturity == "production":
 
-        # flag to only warn once per integration for available upgrades
-        notify_update_available = True
-
         for stack_version, mapping in meta.get_validation_stack_versions().items():
             ecs_version = mapping['ecs']
             endgame_version = mapping['endgame']
@@ -321,31 +319,11 @@ def get_integration_schema_data(data, meta, package_integrations: dict) -> Gener
                 min_stack = meta.min_stack_version or load_current_package_version()
                 min_stack = Version.parse(min_stack, optional_minor_and_patch=True)
 
-                package_version, notice = find_latest_compatible_version(package=package,
-                                                                         integration=integration,
-                                                                         rule_stack_version=min_stack,
-                                                                         packages_manifest=packages_manifest)
-
-                if notify_update_available and notice and data.get("notify", False):
-                    # Notify for now, as to not lock rule stacks to integrations
-                    notify_update_available = False
-                    print(f"\n{data.get('name')}")
-                    print(*notice)
-
-                schema = {}
-                if integration is None:
-                    # Use all fields from each dataset
-                    for dataset in integrations_schemas[package][package_version]:
-                        # ignore jobs from machine learning packages
-                        if dataset != "jobs":
-                            schema.update(integrations_schemas[package][package_version][dataset])
-                else:
-                    if integration not in integrations_schemas[package][package_version]:
-                        raise ValueError(f"Integration {integration} not found in package {package} "
-                                         f"version {package_version}")
-                    schema = integrations_schemas[package][package_version][integration]
-                schema.update(ecs_schema)
-                integration_schema = {k: kql.parser.elasticsearch_type_family(v) for k, v in schema.items()}
+                # Extract the integration schema fields
+                integration_schema, package_version = get_integration_schema_fields(integrations_schemas, package,
+                                                                                    integration, min_stack,
+                                                                                    packages_manifest, ecs_schema,
+                                                                                    data)
 
                 data = {"schema": integration_schema, "package": package, "integration": integration,
                         "stack_version": stack_version, "ecs_version": ecs_version,
@@ -353,6 +331,65 @@
                 yield data
 
 
+def get_integration_schema_fields(integrations_schemas: dict, package: str, integration: str,
+                                  min_stack: Version, packages_manifest: dict,
+                                  ecs_schema: dict, data: dict) -> tuple:
+    """Extracts the integration fields to schema based on package integrations."""
+    package_version, notice = find_latest_compatible_version(package, integration, min_stack, packages_manifest)
+    notify_user_if_update_available(data, notice, integration)
+
+    schema = collect_schema_fields(integrations_schemas, package, package_version, integration)
+    schema.update(ecs_schema)
+
+    integration_schema = {key: kql.parser.elasticsearch_type_family(value) for key, value in schema.items()}
+    return integration_schema, package_version
+
+
+def notify_user_if_update_available(data: dict, notice: list, integration: str) -> None:
+    """Notifies the user if an update is available, only once per integration."""
+
+    global _notified_integrations
+    if notice and data.get("notify", False) and integration not in _notified_integrations:
+
+        # flag to only warn once per integration for available upgrades
+        _notified_integrations.add(integration)
+
+        print(f"\n{data.get('name')}")
+        print('\n'.join(notice))
+
+
+def collect_schema_fields(integrations_schemas: dict, package: str, package_version: str,
+                          integration: Optional[str] = None) -> dict:
+    """Collects the schema fields for a given integration."""
+    if integration is None:
+        return {field: value for dataset in integrations_schemas[package][package_version] if dataset != "jobs"
+                for field, value in integrations_schemas[package][package_version][dataset].items()}
+
+    if integration not in integrations_schemas[package][package_version]:
+        raise ValueError(f"Integration {integration} not found in package {package} version {package_version}")
+
+    return integrations_schemas[package][package_version][integration]
+
+
+def parse_datasets(datasets: list, package_manifest: dict) -> List[Optional[dict]]:
+    """Parses datasets into packaged integrations from rule data."""
+    packaged_integrations = []
+    for value in sorted(datasets):
+
+        # cleanup extra quotes pulled from ast field
+        value = value.strip('"')
+
+        integration = 'Unknown'
+        if '.' in value:
+            package, integration = value.split('.', 1)
+        else:
+            package = value
+
+        if package in list(package_manifest):
+            packaged_integrations.append({"package": package, "integration": integration})
+    return packaged_integrations
+
+
 class SecurityDetectionEngine:
     """Dedicated to Security Detection Engine integration."""
 
diff --git a/detection_rules/rule.py b/detection_rules/rule.py
index b001c0bab..31f216015 100644
--- a/detection_rules/rule.py
+++ b/detection_rules/rule.py
@@ -22,11 +22,11 @@ from marko.ext.gfm import gfm
 from marshmallow import ValidationError, validates_schema
 
 import kql
-from kql.ast import FieldComparison
 
 from . import beats, ecs, endgame, utils
-from .integrations import (find_least_compatible_version,
-                           load_integrations_manifests)
+from .integrations import (find_least_compatible_version, get_integration_schema_fields,
+                           load_integrations_manifests, load_integrations_schemas,
+                           parse_datasets)
 from .misc import load_current_package_version
 from .mixins import MarshmallowDataclassMixin, StackCompatMixin
 from .rule_formatter import nested_normalize, toml_write
@@ -513,6 +513,21 @@ class QueryValidator:
         beat_types, beat_schema, schema = self.get_beats_schema(index or [], beats_version, ecs_version)
         endgame_schema = self.get_endgame_schema(index or [], endgame_version)
 
+        # construct integration schemas
+        packages_manifest = load_integrations_manifests()
+        integrations_schemas = load_integrations_schemas()
+        datasets, _ = beats.get_datasets_and_modules(self.ast)
+        package_integrations = parse_datasets(datasets, packages_manifest)
+        int_schema = {}
+        data = {"notify": False}
+
+        for pk_int in package_integrations:
+            package = pk_int["package"]
+            integration = pk_int["integration"]
+            schema, _ = get_integration_schema_fields(integrations_schemas, package, integration,
+                                                      current_version, packages_manifest, {}, data)
+            int_schema.update(schema)
+
         required = []
         unique_fields = self.unique_fields or []
 
@@ -521,7 +536,9 @@
             is_ecs = field_type is not None
 
             if not is_ecs:
-                if beat_schema:
+                if int_schema:
+                    field_type = int_schema.get(fld, None)
+                elif beat_schema:
                     field_type = beat_schema.get(fld, {}).get('type')
                 elif endgame_schema:
                     field_type = endgame_schema.endgame_schema.get(fld, None)
@@ -1093,13 +1110,7 @@
     def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta,
                                   package_manifest: dict) -> Optional[List[dict]]:
         packaged_integrations = []
-        datasets = set()
-
-        for node in data.get('ast') or []:
-            if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset':
-                datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal)))
-            elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset':
-                datasets.update(set(str(n) for n in node if isinstance(n, kql.ast.Value)))
+        datasets, _ = beats.get_datasets_and_modules(data.get('ast') or [])
 
         # integration is None to remove duplicate references upstream in Kibana
         # chronologically, event.dataset is checked for package:integration, then rule tags
@@ -1114,15 +1125,7 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin):
                 if integration in ineligible_integrations or isinstance(data, MachineLearningRuleData):
                     packaged_integrations.append({"package": integration, "integration": None})
 
-        for value in sorted(datasets):
-            integration = 'Unknown'
-            if '.' in value:
-                package, integration = value.split('.', 1)
-            else:
-                package = value
-
-            if package in list(package_manifest):
-                packaged_integrations.append({"package": package, "integration": integration})
+        packaged_integrations.extend(parse_datasets(datasets, package_manifest))
 
         return packaged_integrations
 