[FR] Add Integration Schema Query Validation (#2470)

This commit is contained in:
Mika Ayenson
2023-02-02 16:22:44 -05:00
committed by GitHub
parent cd2307ba7d
commit 1784429aa7
54 changed files with 559 additions and 166 deletions
+12
View File
@@ -117,6 +117,14 @@ def _flatten_schema(schema: list, prefix="") -> list:
# it's probably not perfect, but we can fix other bugs as we run into them later
if len(schema) == 1 and nested_prefix.startswith(prefix + prefix):
nested_prefix = s["name"] + "."
if "field" in s:
# integrations sometimes have a group with a single field
flattened.extend(_flatten_schema(s["field"], prefix=nested_prefix))
continue
elif "fields" not in s:
# integrations sometimes have a group with no fields
continue
flattened.extend(_flatten_schema(s["fields"], prefix=nested_prefix))
elif "fields" in s:
flattened.extend(_flatten_schema(s["fields"], prefix=prefix))
@@ -131,6 +139,10 @@ def _flatten_schema(schema: list, prefix="") -> list:
return flattened
def flatten_ecs_schema(schema: dict) -> list:
    """Flatten a nested ECS-style field schema into a flat list of field entries.

    Thin public wrapper around the recursive ``_flatten_schema`` helper.
    Note: the return annotation was corrected from ``dict`` to ``list`` to
    match ``_flatten_schema``'s declared return type.
    """
    return _flatten_schema(schema)
def get_field_schema(base_directory, prefix="", include_common=False):
base_directory = base_directory.get("folders", {}).get("_meta", {}).get("files", {})
flattened = []
+43 -4
View File
@@ -33,7 +33,8 @@ from .docs import IntegrationSecurityDocs
from .endgame import EndgameSchemaManager
from .eswrap import CollectEvents, add_range_to_dsl
from .ghwrap import GithubClient, update_gist
from .integrations import build_integrations_manifest
from .integrations import (build_integrations_manifest, build_integrations_schemas, find_latest_compatible_version,
load_integrations_manifests)
from .main import root
from .misc import PYTHON_LICENSE, add_client, client_error
from .packaging import (CURRENT_RELEASE_PATH, PACKAGE_FILE, RELEASE_DIR,
@@ -1174,10 +1175,48 @@ def integrations_group():
def build_integration_manifests(overwrite: bool):
    """Builds consolidated integrations manifests file.

    Collects every integration tag referenced by the default rule collection
    (flattening nested tag lists and de-duplicating) and passes them to
    ``build_integrations_manifest``. The pre-refactor duplicate collection
    lines (set comprehension + second echo/build call) were removed; with
    them present the manifest build would have run twice.
    """
    click.echo("loading rules to determine all integration tags")

    def flatten(tag_list: List[str]) -> List[str]:
        # recursively flatten arbitrarily nested tag lists and de-duplicate
        return list(set([tag for tags in tag_list for tag in (flatten(tags) if isinstance(tags, list) else [tags])]))

    rules = RuleCollection.default()
    integration_tags = [r.contents.metadata.integration for r in rules if r.contents.metadata.integration]
    unique_integration_tags = flatten(integration_tags)
    click.echo(f"integration tags identified: {unique_integration_tags}")
    build_integrations_manifest(overwrite, unique_integration_tags)
@integrations_group.command('build-schemas')
@click.option('--overwrite', '-o', is_flag=True, help="Overwrite the entire integrations-schema.json.gz file")
def build_integration_schemas(overwrite: bool):
    """Builds consolidated integrations schemas file."""
    click.echo("Building integration schemas...")
    # time the build so long EPR downloads are visible to the operator
    started = time.perf_counter()
    build_integrations_schemas(overwrite)
    elapsed = time.perf_counter() - started
    click.echo(f"Time taken to generate schemas: {elapsed/60:.2f} minutes")
@integrations_group.command('show-latest-compatible')
@click.option('--package', '-p', help='Name of package')
@click.option('--stack_version', '-s', required=True, help='Rule stack version')
def show_latest_compatible_version(package: str, stack_version: str) -> None:
    """Prints the latest integration compatible version for specified package based on stack version supplied."""
    # load the consolidated manifests; report and bail on any failure
    try:
        packages_manifest = load_integrations_manifests()
    except Exception as err:
        click.echo(f"Error loading integrations manifests: {str(err)}")
        return

    # resolve the newest compatible version and print it
    try:
        version = find_latest_compatible_version(package, "", stack_version, packages_manifest)
        click.echo(f"Compatible integration {version=}")
    except Exception as err:
        click.echo(f"Error finding compatible version: {str(err)}")
        return
@dev_group.group('schemas')
Binary file not shown.
Binary file not shown.
+178 -5
View File
@@ -4,20 +4,27 @@
# 2.0.
"""Functions to support and interact with Kibana integrations."""
import glob
import gzip
import json
import os
import re
from collections import OrderedDict
from pathlib import Path
from typing import Generator, Tuple, Union
import requests
import yaml
from marshmallow import EXCLUDE, Schema, fields, post_load
import kql
from . import ecs
from .beats import flatten_ecs_schema
from .semver import Version
from .utils import cached, get_etc_path, read_gzip
from .utils import cached, get_etc_path, read_gzip, unzip
MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz'))
@cached
@@ -26,11 +33,18 @@ def load_integrations_manifests() -> dict:
return json.loads(read_gzip(get_etc_path('integration-manifests.json.gz')))
@cached
def load_integrations_schemas() -> dict:
    """Load the consolidated integrations schemas."""
    # cached: the gzipped JSON is only read from disk once per process
    schema_path = get_etc_path('integration-schemas.json.gz')
    return json.loads(read_gzip(schema_path))
class IntegrationManifestSchema(Schema):
name = fields.Str(required=True)
version = fields.Str(required=True)
release = fields.Str(required=True)
description = fields.Str(required=True)
download = fields.Str(required=True)
conditions = fields.Dict(required=True)
policy_templates = fields.List(fields.Dict, required=True)
owner = fields.Dict(required=False)
@@ -44,8 +58,8 @@ class IntegrationManifestSchema(Schema):
def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
"""Builds a new local copy of manifest.yaml from integrations Github."""
if overwrite:
if os.path.exists(MANIFEST_FILE_PATH):
os.remove(MANIFEST_FILE_PATH)
if MANIFEST_FILE_PATH.exists():
MANIFEST_FILE_PATH.unlink()
final_integration_manifests = {integration: {} for integration in rule_integrations}
@@ -62,6 +76,63 @@ def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> Non
print(f"final integrations manifests dumped: {MANIFEST_FILE_PATH}")
def build_integrations_schemas(overwrite: bool) -> None:
    """Builds a new local copy of integration-schemas.json.gz from EPR integrations.

    For every package/version in the integrations manifest, downloads the
    package archive from EPR, extracts each ``*/fields/*.yml`` definition,
    flattens it to a ``{field_name: field_type}`` mapping, and writes the
    combined result to ``SCHEMA_FILE_PATH`` as gzipped JSON. When not
    overwriting, previously saved versions are reused instead of re-downloaded.
    """
    # local import: avoids relying on the non-public `glob.fnmatch` attribute,
    # which only works because glob happens to import fnmatch internally
    import fnmatch

    final_integration_schemas = {}
    saved_integration_schemas = {}

    # Check if the file already exists and handle accordingly
    if overwrite and SCHEMA_FILE_PATH.exists():
        SCHEMA_FILE_PATH.unlink()
    elif SCHEMA_FILE_PATH.exists():
        saved_integration_schemas = load_integrations_schemas()

    # seed the output with previously saved schemas; without this, versions
    # skipped below (already saved) would be silently dropped from the new file
    for saved_package, saved_versions in saved_integration_schemas.items():
        final_integration_schemas.setdefault(saved_package, {}).update(saved_versions)

    # Load the integration manifests
    integration_manifests = load_integrations_manifests()

    # Loop through the packages and versions
    for package, versions in integration_manifests.items():
        print(f"processing {package}")
        final_integration_schemas.setdefault(package, {})
        for version, manifest in versions.items():
            if package in saved_integration_schemas and version in saved_integration_schemas[package]:
                # already built on a previous run; skip the download
                continue

            # Download the zip file
            download_url = f"https://epr.elastic.co{manifest['download']}"
            response = requests.get(download_url)
            response.raise_for_status()

            # Update the final integration schemas
            final_integration_schemas[package].update({version: {}})

            # Open the zip file
            with unzip(response.content) as zip_ref:
                for file in zip_ref.namelist():
                    # Check if the file is a field-definition YAML
                    if fnmatch.fnmatch(file, '*/fields/*.yml'):
                        # the grandparent directory names the integration (dataset)
                        integration_name = Path(file).parent.parent.name
                        final_integration_schemas[package][version].setdefault(integration_name, {})
                        file_data = zip_ref.read(file)
                        schema_fields = yaml.safe_load(file_data)

                        # Parse the schema and add to the integration schemas
                        data = flatten_ecs_schema(schema_fields)
                        flat_data = {field['name']: field['type'] for field in data}

                        final_integration_schemas[package][version][integration_name].update(flat_data)

    # Write the final integration schemas to disk
    with gzip.open(SCHEMA_FILE_PATH, "w") as schema_file:
        schema_file_bytes = json.dumps(final_integration_schemas).encode("utf-8")
        schema_file.write(schema_file_bytes)

    # fixed message: this dumps schemas, not manifests (copy/paste leftover)
    print(f"final integrations schemas dumped: {SCHEMA_FILE_PATH}")
def find_least_compatible_version(package: str, integration: str,
current_stack_version: str, packages_manifest: dict) -> str:
"""Finds least compatible version for specified integration based on stack version supplied."""
@@ -89,12 +160,54 @@ def find_least_compatible_version(package: str, integration: str,
raise ValueError(f"no compatible version for integration {package}:{integration}")
def find_latest_compatible_version(package: str, integration: str,
                                   rule_stack_version: str, packages_manifest: dict) -> Union[None, Tuple[str, str]]:
    """Finds the latest compatible version for specified integration based on stack version supplied.

    Walks the package's versions newest-first and returns ``(version, notice)``
    for the first one whose highest supported Kibana version shares a major
    version with ``rule_stack_version``. ``notice`` is a tuple of message lines
    when a newer integration version targets a later stack, else "".

    Raises:
        ValueError: package missing/empty, a manifest entry lacks Kibana
            version conditions, or no compatible version exists.
    """
    if not package:
        raise ValueError("Package must be specified")

    package_manifest = packages_manifest.get(package)
    if package_manifest is None:
        raise ValueError(f"Package {package} not found in manifest.")

    # Converts the dict keys (version numbers) to Version objects for proper sorting (descending)
    integration_manifests = sorted(package_manifest.items(), key=lambda x: Version(str(x[0])), reverse=True)
    notice = ""

    for version, manifest in integration_manifests:
        kibana_conditions = manifest.get("conditions", {}).get("kibana", {})
        version_requirement = kibana_conditions.get("version")
        if not version_requirement:
            raise ValueError(f"Manifest for {package}:{integration} version {version} is missing conditions.")

        # strip constraint operators (e.g. "^8.3.0 || ^7.17.0") down to bare versions
        compatible_versions = re.sub(r"\>|\<|\=|\^", "", version_requirement).split(" || ")
        if not compatible_versions:
            raise ValueError(f"Manifest for {package}:{integration} version {version} is missing compatible versions")

        highest_compatible_version = max(compatible_versions, key=lambda x: Version(x))

        if Version(highest_compatible_version) > Version(rule_stack_version):
            # generate notice message that a later integration version is available;
            # use a local display string instead of mutating `integration`, which
            # previously corrupted later iterations and the final error message
            integration_display = f" {integration.strip()}" if integration else ""
            notice = (f"There is a new integration {package}{integration_display} version {version} available!",
                      f"Update the rule min_stack version from {rule_stack_version} to "
                      f"{highest_compatible_version} if using new features in this latest version.")
        # compare MAJOR versions numerically; indexing the raw string ([0])
        # broke for double-digit majors such as "10.0.0"
        elif int(highest_compatible_version.split(".")[0]) == int(rule_stack_version.split(".")[0]):
            return version, notice

    raise ValueError(f"no compatible version for integration {package}:{integration}")
def get_integration_manifests(integration: str) -> list:
"""Iterates over specified integrations from package-storage and combines manifests per version."""
epr_search_url = "https://epr.elastic.co/search"
# link for search parameters - https://github.com/elastic/package-registry
epr_search_parameters = {"package": f"{integration}", "prerelease": "true",
epr_search_parameters = {"package": f"{integration}", "prerelease": "false",
"all": "true", "include_policy_templates": "true"}
epr_search_response = requests.get(epr_search_url, params=epr_search_parameters)
epr_search_response.raise_for_status()
@@ -106,3 +219,63 @@ def get_integration_manifests(integration: str) -> list:
print(f"loaded {integration} manifests from the following package versions: "
f"{[manifest['version'] for manifest in manifests]}")
return manifests
def get_integration_schema_data(data, meta, package_integrations: dict) -> Generator[dict, None, None]:
"""Iterates over specified integrations from package-storage and combines schemas per version.

Yields one dict per (stack version, package integration) pairing, containing the
integration schema merged with the flattened ECS schema plus the version
metadata used to build it. Only production, non-lucene query rules yield anything.
"""
# lazy import to avoid circular import
from .rule import ( # pylint: disable=import-outside-toplevel
QueryRuleData, RuleMeta
)
# re-annotate the untyped parameters for readers and type checkers
data: QueryRuleData = data
meta: RuleMeta = meta
packages_manifest = load_integrations_manifests()
integrations_schemas = load_integrations_schemas()
# validate the query against related integration fields
if isinstance(data, QueryRuleData) and data.language != 'lucene' and meta.maturity == "production":
# flag to only warn once per integration for available upgrades
notify_update_available = True
for stack_version, mapping in meta.get_validation_stack_versions().items():
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']
ecs_schema = ecs.flatten_multi_fields(ecs.get_schema(ecs_version, name='ecs_flat'))
for pk_int in package_integrations:
package = pk_int["package"]
integration = pk_int["integration"]
# newest package version whose Kibana constraint matches the rule's min stack
package_version, notice = find_latest_compatible_version(package=package,
integration=integration,
rule_stack_version=meta.min_stack_version,
packages_manifest=packages_manifest)
# opt-in upgrade notice (gated by the rule's `notify` flag), printed at most once
if notify_update_available and notice and data.get("notify", False):
# Notify for now, as to not lock rule stacks to integrations
notify_update_available = False
print(f"\n{data.get('name')}")
print(*notice)
schema = {}
if integration is None:
# Use all fields from each dataset
for dataset in integrations_schemas[package][package_version]:
schema.update(integrations_schemas[package][package_version][dataset])
else:
if integration not in integrations_schemas[package][package_version]:
raise ValueError(f"Integration {integration} not found in package {package} "
f"version {package_version}")
schema = integrations_schemas[package][package_version][integration]
# layer ECS fields on top, then map ES types to their KQL type families
schema.update(ecs_schema)
integration_schema = {k: kql.parser.elasticsearch_type_family(v) for k, v in schema.items()}
# NOTE(review): rebinding `data` here shadows the QueryRuleData parameter, so
# later iterations' `data.get(...)` calls hit this dict instead — confirm intended
data = {"schema": integration_schema, "package": package, "integration": integration,
"stack_version": stack_version, "ecs_version": ecs_version,
"package_version": package_version, "endgame_version": endgame_version}
yield data
+11 -5
View File
@@ -240,6 +240,10 @@ class BaseRuleData(MarshmallowDataclassMixin, StackCompatMixin):
def data_validator(self) -> Optional['DataValidator']:
return DataValidator(is_elastic_rule=self.is_elastic_rule, **self.to_dict())
@cached_property
def notify(self) -> bool:
    """Whether integration-update notices are enabled for this run.

    Opt-in via the DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE environment
    variable; any value (including empty) counts as set.
    """
    return 'DR_NOTIFY_INTEGRATION_UPDATE_AVAILABLE' in os.environ
@cached_property
def parsed_note(self) -> Optional[MarkoDocument]:
dv = self.data_validator
@@ -847,7 +851,7 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin):
if self.check_restricted_field_version(field_name):
if isinstance(self.data, QueryRuleData) and self.data.language != 'lucene':
package_integrations = self._get_packaged_integrations(packages_manifest)
package_integrations = self.get_packaged_integrations(self.data, self.metadata, packages_manifest)
if not package_integrations:
return
@@ -947,11 +951,13 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin):
max_stack = max_stack or current_version
return Version(min_stack) <= current_version >= Version(max_stack)
def _get_packaged_integrations(self, package_manifest: dict) -> Optional[List[dict]]:
@classmethod
def get_packaged_integrations(cls, data: QueryRuleData, meta: RuleMeta,
package_manifest: dict) -> Optional[List[dict]]:
packaged_integrations = []
datasets = set()
for node in self.data.get('ast', []):
for node in data.get('ast', []):
if isinstance(node, eql.ast.Comparison) and str(node.left) == 'event.dataset':
datasets.update(set(n.value for n in node if isinstance(n, eql.ast.Literal)))
elif isinstance(node, FieldComparison) and str(node.field) == 'event.dataset':
@@ -960,10 +966,10 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin):
if not datasets:
# windows and endpoint integration do not have event.dataset fields in queries
# integration is None to remove duplicate references upstream in Kibana
rule_integrations = self.metadata.get("integration", [])
rule_integrations = meta.get("integration", [])
if rule_integrations:
for integration in rule_integrations:
if integration in ["windows", "endpoint", "apm"]:
if integration in definitions.NON_DATASET_PACKAGES:
packaged_integrations.append({"package": integration, "integration": None})
for value in sorted(datasets):
+194 -43
View File
@@ -8,14 +8,16 @@ from functools import cached_property
from typing import List, Optional, Union
import eql
import kql
from . import ecs, endgame
from .rule import QueryRuleData, QueryValidator, RuleMeta
from .integrations import get_integration_schema_data, load_integrations_manifests
from .rule import QueryRuleData, QueryValidator, RuleMeta, TOMLRuleContents
class KQLValidator(QueryValidator):
"""Specific fields for query event types."""
"""Specific fields for KQL query event types."""
@cached_property
def ast(self) -> kql.ast.Expression:
@@ -34,29 +36,97 @@ class KQLValidator(QueryValidator):
# syntax only, which is done via self.ast
return
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}'
if isinstance(data, QueryRuleData) and data.language != 'lucene':
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
beat_types, beat_schema, schema = self.get_beats_schema(data.index or [], beats_version, ecs_version)
if package_integrations:
# validate the query against related integration fields
self.validate_integration(data, meta, package_integrations)
else:
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
err_trailer = f'stack: {stack_version}, beats: {beats_version}, ecs: {ecs_version}'
beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)
try:
kql.parse(self.query, schema=schema)
except kql.KqlParseError as exc:
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
print(err_trailer)
raise
def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None:
"""Validate the query, called from the parent which contains [metadata] information."""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
return
error_fields = {}
current_stack_version = ""
combined_schema = {}
for integration_schema_data in get_integration_schema_data(data, meta, package_integrations):
ecs_version = integration_schema_data['ecs_version']
integration = integration_schema_data['integration']
package = integration_schema_data['package']
package_version = integration_schema_data['package_version']
integration_schema = integration_schema_data['schema']
stack_version = integration_schema_data['stack_version']
if stack_version != current_stack_version:
# reset the combined schema for each stack version
current_stack_version = stack_version
combined_schema = {}
# add non-ecs-schema fields for edge cases not added to the integration
for index_name in data.index:
integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))
combined_schema.update(**integration_schema)
try:
kql.parse(self.query, schema=schema)
# validate the query against the integration fields with the package version
kql.parse(self.query, schema=integration_schema)
except kql.KqlParseError as exc:
message = exc.error_msg
trailer = err_trailer
if "Unknown field" in message and beat_types:
trailer = f"\nTry adding event.module or event.dataset to specify beats module\n\n{trailer}"
if exc.error_msg == "Unknown field":
field = extract_error_field(exc)
trailer = (f"\n\tTry adding event.module or event.dataset to specify integration module\n\t"
f"Will check against integrations {meta.integration} combined.\n\t"
f"{package=}, {integration=}, {package_version=}, "
f"{stack_version=}, {ecs_version=}"
)
error_fields[field] = {"error": exc, "trailer": trailer}
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
print(err_trailer)
raise
# don't error on fields that are in another integration schema
for field in list(error_fields.keys()):
if field in combined_schema:
del error_fields[field]
# raise the first error
if error_fields:
_, data = next(iter(error_fields.items()))
exc = data["error"]
trailer = data["trailer"]
raise kql.KqlParseError(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
class EQLValidator(QueryValidator):
"""Specific fields for EQL query event types."""
@cached_property
def ast(self) -> eql.ast.Expression:
@@ -74,7 +144,108 @@ class EQLValidator(QueryValidator):
def unique_fields(self) -> List[str]:
return list(set(str(f) for f in self.ast if isinstance(f, eql.ast.Field)))
def validate_query_with_schema(self, schema: Union[ecs.KqlSchema2Eql, endgame.EndgameSchema],
def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None:
"""Validate an EQL query while checking TOMLRule.

Routes to integration-schema validation when the rule maps to packaged
integrations; otherwise validates against beats/ECS (and endgame) schemas
for every stack version in the validation matrix.
"""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
return
if isinstance(data, QueryRuleData) and data.language != 'lucene':
packages_manifest = load_integrations_manifests()
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
if package_integrations:
# validate the query against related integration fields
self.validate_integration(data, meta, package_integrations)
else:
# fall back to per-stack-version beats/ECS validation
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']
err_trailer = f'stack: {stack_version}, beats: {beats_version},' \
f'ecs: {ecs_version}, endgame: {endgame_version}'
beat_types, beat_schema, schema = self.get_beats_schema(data.index or [],
beats_version, ecs_version)
endgame_schema = self.get_endgame_schema(data.index, endgame_version)
eql_schema = ecs.KqlSchema2Eql(schema)
# validate query against the beats and eql schema
self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer,
beat_types=beat_types)
if endgame_schema:
# validate query against the endgame schema
self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)
def validate_integration(self, data: QueryRuleData, meta: RuleMeta, package_integrations: List[dict]) -> None:
"""Validate an EQL query while checking TOMLRule against integration schemas.

Unknown-field errors are collected (warned) per schema rather than raised
immediately; a field only errors if no integration schema for the same
stack version defines it. The first surviving error is re-raised.
"""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
return
error_fields = {}
current_stack_version = ""
combined_schema = {}
for integration_schema_data in get_integration_schema_data(data, meta, package_integrations):
ecs_version = integration_schema_data['ecs_version']
integration = integration_schema_data['integration']
package = integration_schema_data['package']
package_version = integration_schema_data['package_version']
integration_schema = integration_schema_data['schema']
stack_version = integration_schema_data['stack_version']
endgame_version = integration_schema_data['endgame_version']
if stack_version != current_stack_version:
# reset the combined schema for each stack version
current_stack_version = stack_version
combined_schema = {}
# add non-ecs-schema fields for edge cases not added to the integration
for index_name in data.index:
integration_schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))
combined_schema.update(**integration_schema)
eql_schema = ecs.KqlSchema2Eql(integration_schema)
err_trailer = f'stack: {stack_version}, integration: {integration},' \
f'ecs: {ecs_version}, package: {package}, package_version: {package_version}'
try:
# validate the query against the current integration's schema
self.validate_query_with_schema(data=data, schema=eql_schema, err_trailer=err_trailer)
except eql.EqlParseError as exc:
message = exc.error_msg
if message == "Unknown field" or "Field not recognized" in message:
# defer unknown-field errors; another integration may define the field
field = extract_error_field(exc)
trailer = (f"\n\tTry adding event.module or event.dataset to specify integration module\n\t"
f"Will check against integrations {meta.integration} combined.\n\t"
f"{package=}, {integration=}, {package_version=}, "
f"{stack_version=}, {ecs_version=}"
)
error_fields[field] = {"error": exc, "trailer": trailer}
print(f"\nWarning: `{field}` in `{data.name}` not found in schema. {trailer}")
else:
raise exc
# Still need to check endgame if it's in the index
endgame_schema = self.get_endgame_schema(data.index, endgame_version)
if endgame_schema:
# validate query against the endgame schema
err_trailer = f'stack: {stack_version}, endgame: {endgame_version}'
self.validate_query_with_schema(data=data, schema=endgame_schema, err_trailer=err_trailer)
# don't error on fields that are in another integration schema
for field in list(error_fields.keys()):
if field in combined_schema:
del error_fields[field]
# raise the first error
# NOTE(review): rebinding `data` below shadows the QueryRuleData parameter — confirm intended
if error_fields:
_, data = next(iter(error_fields.items()))
exc = data["error"]
raise exc
def validate_query_with_schema(self, data: 'QueryRuleData', schema: Union[ecs.KqlSchema2Eql, endgame.EndgameSchema],
err_trailer: str, beat_types: list = None) -> None:
"""Validate the query against the schema."""
try:
@@ -93,37 +264,17 @@ class EQLValidator(QueryValidator):
raise exc.__class__(exc.error_msg, exc.line, exc.column, exc.source,
len(exc.caret.lstrip()), trailer=trailer) from None
except Exception:
print(err_trailer)
raise
def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None:
"""Validate an EQL query while checking TOMLRule."""
if meta.query_schema_validation is False or meta.maturity == "deprecated":
# syntax only, which is done via self.ast
return
for stack_version, mapping in meta.get_validation_stack_versions().items():
beats_version = mapping['beats']
ecs_version = mapping['ecs']
endgame_version = mapping['endgame']
err_trailer = f'stack: {stack_version}, beats: {beats_version},' \
f'ecs: {ecs_version}, endgame: {endgame_version}'
beat_types, beat_schema, schema = self.get_beats_schema(data.index or [], beats_version, ecs_version)
endgame_schema = self.get_endgame_schema(data.index, endgame_version)
eql_schema = ecs.KqlSchema2Eql(schema)
# validate query against the beats and eql schema
self.validate_query_with_schema(schema=eql_schema, err_trailer=err_trailer, beat_types=beat_types)
if endgame_schema:
# validate query against the endgame schema
self.validate_query_with_schema(schema=endgame_schema, err_trailer=err_trailer)
def extract_error_field(exc: "Union[eql.EqlParseError, kql.KqlParseError]") -> Optional[str]:
    """Extract the field name from an EQL or KQL parse error.

    Uses the error's line/column and caret width to slice the offending token
    out of the source. When the reported line equals the number of lines
    (a past-the-end index produced for errors on the final line), the last
    line is used; the stale pre-fix line that indexed ``exc.line`` directly
    (IndexError in that case) has been removed.
    """
    lines = exc.source.splitlines()
    # clamp a past-the-end line index to the last source line
    mod = -1 if exc.line == len(lines) else 0
    line = lines[exc.line + mod]
    start = exc.column
    stop = start + len(exc.caret.strip())
    return line[start:stop]
+1
View File
@@ -27,6 +27,7 @@ VERSION_PATTERN = f'^{_version}$'
MINOR_SEMVER = r'^\d+\.\d+$'
BRANCH_PATTERN = f'{VERSION_PATTERN}|^master$'
NON_DATASET_PACKAGES = ['apm', 'endpoint', 'system', 'windows']
INTERVAL_PATTERN = r'^\d+[mshd]$'
TACTIC_URL = r'^https://attack.mitre.org/tactics/TA[0-9]+/$'
TECHNIQUE_URL = r'^https://attack.mitre.org/techniques/T[0-9]+/$'
+1 -1
View File
@@ -113,7 +113,7 @@ def load_etc_dump(*path):
def save_etc_dump(contents, *path, **kwargs):
"""Load a json/yml/toml file from the detection_rules/etc/ folder."""
"""Save a json/yml/toml file from the detection_rules/etc/ folder."""
path = get_etc_path(*path)
_, ext = os.path.splitext(path)
sort_keys = kwargs.pop('sort_keys', True)
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2020/09/01"
integration = ["azure"]
integration = ["azure", "o365"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/14"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -14,7 +14,7 @@ permissions to an application. An adversary may create an Azure-registered appli
as contact information, email, or documents.
"""
from = "now-25m"
index = ["filebeat-*", "logs-azure*"]
index = ["filebeat-*", "logs-azure*", "logs-o365*"]
language = "kuery"
license = "Elastic License v2"
name = "Possible Consent Grant Attack via Azure-Registered Application"
@@ -79,6 +79,7 @@ tags = [
"Cloud",
"Azure",
"Continuous Monitoring",
"Microsoft 365",
"SecOps",
"Identity and Access",
"Investigation Guide",
@@ -1,9 +1,10 @@
[metadata]
creation_date = "2022/09/14"
integration = ["system"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/11/28"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,9 +1,10 @@
[metadata]
creation_date = "2022/09/14"
integration = ["system"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/11/28"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,9 +1,10 @@
[metadata]
creation_date = "2022/09/14"
integration = ["system"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2023/01/27"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2023/01/11"
integration = ["windows"]
integration = ["endpoint"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.4.0"
updated_date = "2023/01/11"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -28,8 +28,8 @@ timestamp_override = "event.ingested"
type = "eql"
query = '''
process where event.action == "start" and process.name : "OUTLOOK.EXE" and
process.Ext.effective_parent.name != null and
process where event.action == "start" and process.name : "OUTLOOK.EXE" and
process.Ext.effective_parent.name != null and
not process.Ext.effective_parent.executable : ("?:\\Program Files\\*", "?:\\Program Files (x86)\\*")
'''
@@ -56,14 +56,14 @@ reference = "https://attack.mitre.org/tactics/TA0009/"
[[rule.threat]]
framework = "MITRE ATT&CK"
[[rule.threat.technique]]
id = "T1559"
name = "Inter-Process Communication"
[[rule.threat.technique]]
id = "T1559"
name = "Inter-Process Communication"
reference = "https://attack.mitre.org/techniques/T1559/"
[[rule.threat.technique.subtechnique]]
[[rule.threat.technique.subtechnique]]
id = "T1559.001"
name = "Component Object Model"
name = "Component Object Model"
reference = "https://attack.mitre.org/techniques/T1559/001/"
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2020/08/29"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2020/08/29"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2020/08/29"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/02/08"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2023/01/27"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/01/24"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/11/09"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,8 +1,8 @@
[metadata]
creation_date = "2022/02/16"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
updated_date = "2023/01/03"
updated_date = "2023/02/01"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/03/01"
integration = ["endpoint", "windows"]
integration = ["endpoint", "system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2023/01/31"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/08/30"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/01/27"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/01/26"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2023/01/27"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/02/22"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/02/16"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2020/11/12"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic", "Anabella Cristaldi"]
@@ -1,8 +1,8 @@
[metadata]
creation_date = "2020/10/15"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
updated_date = "2023/01/18"
updated_date = "2023/02/01"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2020/02/18"
integration = ["endpoint", "windows"]
integration = ["endpoint", "system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/14"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/08/30"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/08/29"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2020/02/18"
integration = ["endpoint", "windows"]
integration = ["endpoint", "system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/01/31"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/02/22"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/01/27"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2021/10/18"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/08/29"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/08/29"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/02/24"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,6 +1,6 @@
[metadata]
creation_date = "2022/08/30"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/08/29"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2021/01/09"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic", "Skoetting"]
@@ -1,8 +1,8 @@
[metadata]
creation_date = "2021/01/04"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "development"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Skoetting"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/08/30"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/11/09"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2021/11/08"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -18,7 +18,7 @@ note = """## Triage and analysis
### Investigating Scheduled Task Execution at Scale via GPO
Group Policy Objects (GPOs) can be used by attackers to instruct arbitrarily large groups of clients to execute specified commands at startup, logon, shutdown, and logoff. This is done by creating or modifying the `scripts.ini` or `psscripts.ini` files. The scripts are stored in the following paths:
Group Policy Objects (GPOs) can be used by attackers to instruct arbitrarily large groups of clients to execute specified commands at startup, logon, shutdown, and logoff. This is done by creating or modifying the `scripts.ini` or `psscripts.ini` files. The scripts are stored in the following paths:
- `<GPOPath>\\Machine\\Scripts\\`
- `<GPOPath>\\User\\Scripts\\`
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2021/11/08"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2021/11/08"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/04/27"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2021/12/12"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/05/11"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
@@ -1,10 +1,10 @@
[metadata]
creation_date = "2022/02/07"
integration = ["windows"]
integration = ["system", "windows"]
maturity = "production"
min_stack_comments = "New fields added: required_fields, related_integrations, setup"
min_stack_version = "8.3.0"
updated_date = "2022/12/21"
updated_date = "2023/02/01"
[rule]
author = ["Elastic"]
+22 -15
View File
@@ -443,36 +443,43 @@ class TestRuleMetadata(BaseRuleTest):
def test_integration_tag(self):
"""Test integration rules defined by metadata tag."""
failures = []
non_dataset_packages = ["apm", "endpoint", "windows", "winlog"]
non_dataset_packages = definitions.NON_DATASET_PACKAGES + ["winlog"]
packages_manifest = load_integrations_manifests()
valid_integration_folders = [p.name for p in list(Path(INTEGRATION_RULE_DIR).glob("*")) if p.name != 'endpoint']
for rule in self.production_rules:
rule_integrations = rule.contents.metadata.get('integration')
if rule_integrations:
if isinstance(rule.contents.data, QueryRuleData) and rule.contents.data.language != 'lucene':
rule_integrations = rule.contents.metadata.get('integration') or []
rule_integrations = [rule_integrations] if isinstance(rule_integrations, str) else rule_integrations
data = rule.contents.data
meta = rule.contents.metadata
package_integrations = TOMLRuleContents.get_packaged_integrations(data, meta, packages_manifest)
package_integrations_list = list(set([integration["package"] for integration in package_integrations]))
indices = data.get('index')
for rule_integration in rule_integrations:
# checks if metadata tag matches from a list of integrations in EPR
if rule_integration not in packages_manifest.keys():
err_msg = f"{self.rule_str(rule)} integration '{rule_integration}' unknown"
failures.append(err_msg)
# checks if the rule path matches the intended integration
if rule_integration in valid_integration_folders:
if rule_integration != rule.path.parent.name:
if rule.path.parent.name not in rule_integrations:
err_msg = f'{self.rule_str(rule)} {rule_integration} tag, path is {rule.path.parent.name}'
failures.append(err_msg)
else:
# checks if event.dataset exists in query object and a tag exists in metadata
if isinstance(rule.contents.data, QueryRuleData) and rule.contents.data.language != 'lucene':
trc = TOMLRuleContents(rule.contents.metadata, rule.contents.data)
package_integrations = trc._get_packaged_integrations(packages_manifest)
if package_integrations:
err_msg = f'{self.rule_str(rule)} integration tag should exist: '
# checks if an index pattern exists if the package integration tag exists
integration_string = "|".join(indices)
if not re.search(rule_integration, integration_string):
if rule_integration == "windows" and re.search("winlog", integration_string):
continue
err_msg = f'{self.rule_str(rule)} {rule_integration} tag, index pattern missing.'
failures.append(err_msg)
# checks if event.dataset exists in query object and a tag exists in metadata
# checks if metadata tag matches from a list of integrations in EPR
if package_integrations and sorted(rule_integrations) != sorted(package_integrations_list):
err_msg = f'{self.rule_str(rule)} integration tags: {rule_integrations} != ' \
f'package integrations: {package_integrations_list}'
failures.append(err_msg)
else:
# checks if rule has index pattern integration and the integration tag exists
# ignore the External Alerts rule, Threat Indicator Matching Rules, Guided onboarding
ignore_ids = [