Files
sigma-rules/hunting/run.py
T
Terrance DeJesus 50e23ba242 [Hunting] Re-factor Hunting Library Code (#4085)
* updating python code for hunting library

* fixed okta queries; added MITRE search capability

* fixed hunting unit test imports

* fixed duplicate UUID; fixed duplicate index entry bug

* fixed technique finding sub-technique in search

* added more unit tests

* linted

* flake errors addressed; fixed unit test import; fixed markdown generate bug

* added description for generate-markdown command

* updated README

* adjusted YAML index, adjusted code for index changes

* adjusted relative imports; updated CODEOWNERS

* adding updates; moving to different branch for main dependencies

* finished run-query command; made some code adjustments

* removed some comments

* revised makefile; fixed unit tests; adjusted detection rules pyproject

* updated README

* updated README

* adjusted unit tests; adjusted hunt guidelines; updated makefile; adjusted several commands

* adjusted package to be more object-oriented

* removed unused variable

* Add simple breakdown stats

* addressed feedback; added keyword option for search

* Update hunting/README.md

Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>

* Update detection_rules/etc/test_hunting_cli.bash

Co-authored-by: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com>

* addressing feedback

* addressed feedback

* added message for unknown index; fixed function call

* fixed search command

* fixed flake error

---------

Co-authored-by: Mika Ayenson <Mika.ayenson@elastic.co>
Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>
Co-authored-by: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com>
2024-10-03 12:47:40 -04:00

77 lines
3.2 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import re
import textwrap
from pathlib import Path
import click
from detection_rules.misc import get_elasticsearch_client
from .utils import load_toml
class QueryRunner:
def __init__(self, es_config: dict):
"""Initialize the QueryRunner with Elasticsearch config."""
self.es_config = es_config
def load_hunting_file(self, file_path: Path):
"""Load the hunting file and return the data."""
return load_toml(file_path)
def preprocess_query(self, query: str) -> str:
"""Pre-process the query by removing comments and adding a LIMIT."""
query = re.sub(r'//.*', '', query)
if not re.search(r'LIMIT', query, re.IGNORECASE):
query += " | LIMIT 10"
click.echo("No LIMIT detected in query. Added LIMIT 10 to truncate output.")
return query
def run_individual_query(self, query: str, wait_timeout: int):
"""Run a single query with the Elasticsearch config."""
es = get_elasticsearch_client(**self.es_config)
query = self.preprocess_query(query)
try:
click.secho("Running query. Press Ctrl+C to cancel.", fg="blue")
query = query.replace("\n", " ")
# Start the query synchronously
response = es.esql.query(query=query)
self.process_results(response)
except Exception as e:
# handle missing index error
if "Unknown index" in str(e):
click.secho("This query references indexes that do not exist in the target stack.", fg="red")
click.secho("Check if index exists (via integration installation) and contains data.", fg="red")
click.secho("Alternatively, update the query to reference an existing index.", fg="red")
else:
click.secho(f"Error running query: {str(e)}", fg="red")
def run_all_queries(self, queries: dict, wait_timeout: int):
"""Run all eligible queries in the hunting file."""
click.secho("Running all eligible queries...", fg="green", bold=True)
for i, query in queries.items():
click.secho(f"\nRunning Query {i + 1}:", fg="green", bold=True)
click.echo(self._format_query(query))
self.run_individual_query(query, wait_timeout)
click.secho("\n" + "-" * 120, fg="yellow")
def process_results(self, response):
"""Process the Elasticsearch query results and display the outcome."""
response_data = response.body
if response_data.get('values'):
click.secho("Query matches found!", fg="red", bold=True)
else:
click.secho("No matches found!", fg="green", bold=True)
def _format_query(self, query: str) -> str:
"""Format the query with word wrapping for better readability."""
lines = query.split('\n')
wrapped_lines = [textwrap.fill(line, width=120, subsequent_indent=' ') for line in lines]
return '\n'.join(wrapped_lines)