[Hunt Tuning] Enforce STATS or KEEP functions in ES|QL hunting queries (#4157)

* enforcing aggregate or keep in ES|QL queries

* Update hunting/definitions.py

* Update hunting/definitions.py

* Update hunting/definitions.py

* updated capitalization of linting

* updated raise value error

* Update hunting/definitions.py

* added note about stats in best practices
This commit is contained in:
Terrance DeJesus
2024-10-16 09:16:28 -04:00
committed by GitHub
parent c1ce0d43d1
commit 4b4b2cc9c8
8 changed files with 44 additions and 7 deletions
+1
View File
@@ -49,6 +49,7 @@ Otherwise, the names do not require the integration, since it is already annotat
* Use the `LIMIT` command to cap the number of results, depending on the expected result volume
* Filter as much as possible in the `WHERE` command to reduce the number of events that need to be processed
* In the `FROM` command, make index patterns as specific as possible to avoid matching irrelevant events
* Use `STATS` to aggregate results into a tabular format, which reduces the amount of data returned
### Field Usage
Use standardized fields where possible to ensure that queries are compatible across different data environments and sources.
@@ -27,5 +27,6 @@ from logs-aws.cloudtrail-*
and aws.cloudtrail.request_parameters RLIKE ".*arn:aws:iam.*"
| dissect aws.cloudtrail.request_parameters "%{}AWS\": \"arn:aws:iam::%{target_account_id}:"
| where cloud.account.id != target_account_id
| keep @timestamp, event.provider, event.action, aws.cloudtrail.request_parameters, target_account_id, cloud.account.id
'''
]
@@ -25,5 +25,6 @@ from logs-aws.cloudtrail-*
| dissect aws.cloudtrail.request_parameters "{%{?principal_key}=%{principal_id}, %{?function_name_key}=%{function_name}, %{?statement_key}=%{statement_value}, %{?action_key}=lambda:%{action_value}}"
| eval write_action = (starts_with(action_value, "Invoke") or starts_with("Update", action_value) or starts_with("Put", action_value))
| where write_action == true
| keep @timestamp, principal_id, event.provider, event.action, aws.cloudtrail.request_parameters, principal_id, function_name, action_value, statement_value, write_action
'''
]
@@ -23,4 +23,5 @@ from logs-aws.cloudtrail-*
and aws.cloudtrail.user_identity.type == "FederatedUser"
| dissect aws.cloudtrail.additional_eventdata "{%{?mobile_version_key}=%{mobile_version}, %{?mfa_used_key}=%{mfa_used}}"
| where mfa_used == "No"
| keep @timestamp, event.provider, event.action, aws.cloudtrail.event_type, aws.cloudtrail.user_identity.type, aws.cloudtrail.additional_eventdata, mobile_version, mfa_used
''']
@@ -22,5 +22,6 @@ from logs-aws.cloudtrail-*
and aws.cloudtrail.user_identity.type == "AssumedRole"
and event.action == "SendCommand"
and user.id like "*:i-*"
| keep @timestamp, event.provider, event.action, aws.cloudtrail.user_identity.type, user.id, aws.cloudtrail.request_parameters
'''
]
@@ -27,4 +27,5 @@ from logs-aws.cloudtrail-*
| dissect aws.cloudtrail.request_parameters "{%{}policyArns=[%{policies_applied}]"
| eval duration_minutes = to_integer(duration_requested) / 60
| where (duration_minutes > 1440) or (policies_applied RLIKE ".*AdministratorAccess.*")
| keep @timestamp, event.dataset, event.provider, event.action, aws.cloudtrail.request_parameters, user_name, duration_requested, duration_minutes, policies_applied
''']
+36 -7
View File
@@ -3,9 +3,10 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
from typing import Optional, List
# Define the hunting directory path
HUNTING_DIR = Path(__file__).parent
@@ -25,12 +26,40 @@ class Hunt:
"""Dataclass to represent a hunt."""
author: str
description: str
integration: list[str]
integration: List[str]
uuid: str
name: str
language: list[str]
language: List[str]
license: str
query: list[str]
notes: Optional[list[str]] = field(default_factory=list)
mitre: list[str] = field(default_factory=list)
references: Optional[list[str]] = field(default_factory=list)
query: List[str]
notes: Optional[List[str]] = field(default_factory=list)
mitre: List[str] = field(default_factory=list)
references: Optional[List[str]] = field(default_factory=list)
def __post_init__(self):
    """Post-initialization check of the hunt's queries.

    Requires at least one query, then hands every query that looks like
    ES|QL to ``validate_esql_query``.

    Raises:
        ValueError: If ``query`` is empty, or if ``validate_esql_query``
            rejects one of the ES|QL queries.
    """
    if not self.query:
        raise ValueError(f"Hunt: {self.name} - Query field must be provided.")

    for q in self.query:
        # A query is treated as ES|QL when its text, ignoring surrounding
        # whitespace and letter case, begins with "from"; any other query is
        # left unvalidated here.
        if q.strip().lower().startswith("from"):
            self.validate_esql_query(q)
def validate_esql_query(self, query: str) -> None:
    """Require Elastic-authored ES|QL queries to use ``stats ... by`` or ``keep``.

    The query text is lowercased before matching, so the check is
    case-insensitive. Hunts whose ``author`` is anything other than exactly
    ``"Elastic"`` are not checked.

    Args:
        query: Raw ES|QL query text.

    Raises:
        ValueError: If the hunt is authored by Elastic and the query contains
            neither a ``stats ... by`` aggregation nor a piped ``keep``.
    """
    query = query.lower()
    if self.author != "Elastic":
        return

    # DOTALL lets "stats" and "by" sit on different lines of a multi-line query.
    stats_by_pattern = re.compile(r'\bstats\b.*?\bby\b', re.DOTALL)
    # Accept any amount of whitespace (none, several spaces, a newline) between
    # the pipe and "keep"; the trailing \b stops longer identifiers that merely
    # begin with "keep" from satisfying the check.
    keep_pattern = re.compile(r'\|\s*keep\b')

    if not stats_by_pattern.search(query) and not keep_pattern.search(query):
        raise ValueError(
            f"Hunt: {self.name} contains an ES|QL query that must contain either 'stats by' or 'keep' functions."
        )
@@ -34,4 +34,6 @@ from logs-okta.system*
// filter for scopes that are not implicitly granted
and okta.outcome.reason == "no_matching_scope"
| keep @timestamp, event.action, okta.actor.type, okta.outcome.result, okta.outcome.reason, okta.actor.display_name
''']