Files
sigma-rules/hunting/utils.py
T
Terrance DeJesus 50e23ba242 [Hunting] Re-factor Hunting Library Code (#4085)
* updating python code for hunting library

* fixed okta queries; added MITRE search capability

* fixed hunting unit test imports

* fixed duplicate UUID; fixed duplicate index entry bug

* fixed technique finding sub-technique in search

* added more unit tests

* linted

* flake errors addressed; fixed unit test import; fixed markdown generate bug

* added description for generate-markdown command

* updated README

* adjusted YAML index, adjusted code for index changes

* adjusted relative imports; updated CODEOWNERS

* adding updates; moving to different branch for main dependencies

* finished run-query command; made some code adjustments

* removed some comments

* revised makefile; fixed unit tests; adjusted detection rules pyproject

* updated README

* updated README

* adjusted unit tests; adjusted hunt guidelines; updated makefile; adjusted several commands

* adjusted package to be more object-oriented

* removed unused variable

* Add simple breakdown stats

* addressed feedback; added keyword option for search

* Update hunting/README.md

Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>

* Update detection_rules/etc/test_hunting_cli.bash

Co-authored-by: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com>

* addressing feedback

* addressed feedback

* added message for unknown index; fixed function call

* fixed search command

* fixed flake error

---------

Co-authored-by: Mika Ayenson <Mika.ayenson@elastic.co>
Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>
Co-authored-by: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com>
2024-10-03 12:47:40 -04:00

135 lines
4.5 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import inspect
import tomllib
from pathlib import Path
from typing import Union
import click
import urllib3
import yaml
from detection_rules.misc import get_elasticsearch_client
from .definitions import HUNTING_DIR, Hunt
def get_hunt_path(uuid: str, file_path: str) -> (Path, str):
"""Resolve the path of the hunting query using either a UUID or file path."""
if uuid:
# Load the index and find the hunt by UUID
index_data = load_index_file()
for data_source, hunts in index_data.items():
if uuid in hunts:
hunt_data = hunts[uuid]
# Combine the relative path from the index with the HUNTING_DIR
hunt_path = HUNTING_DIR / hunt_data['path']
return hunt_path.resolve(), None
return None, f"No hunt found for UUID: {uuid}"
elif file_path:
# Use the provided file path
hunt_path = Path(file_path)
if not hunt_path.is_file():
return None, f"No file found at path: {file_path}"
return hunt_path.resolve(), None
return None, "Either UUID or file path must be provided."
def load_index_file() -> dict:
"""Load the hunting index.yml file."""
index_file = HUNTING_DIR / "index.yml"
if not index_file.exists():
click.echo(f"No index.yml found at {index_file}.")
return {}
with open(index_file, 'r') as f:
hunting_index = yaml.safe_load(f)
return hunting_index
def load_toml(source: Union[Path, str]) -> Hunt:
"""Load and validate TOML content as Hunt dataclass."""
if isinstance(source, Path):
if not source.is_file():
raise FileNotFoundError(f"TOML file not found: {source}")
contents = source.read_text(encoding="utf-8")
else:
contents = source
toml_dict = tomllib.loads(contents)
# Validate and load the content into the Hunt dataclass
return Hunt(**toml_dict["hunt"])
def load_all_toml(base_path: Path):
"""Load all TOML files from the directory and return a list of Hunt configurations and their paths."""
hunts = []
for toml_file in base_path.rglob("*.toml"):
hunt_config = load_toml(toml_file)
hunts.append((hunt_config, toml_file))
return hunts
def save_index_file(base_path: Path, directories: dict) -> None:
"""Save the updated index.yml file."""
index_file = base_path / "index.yml"
with open(index_file, 'w') as f:
yaml.safe_dump(directories, f, default_flow_style=False, sort_keys=False)
print(f"Index YAML updated at: {index_file}")
def validate_link(link: str):
"""Validate and return the link."""
http = urllib3.PoolManager()
response = http.request('GET', link)
if response.status != 200:
raise ValueError(f"Invalid link: {link}")
def update_index_yml(base_path: Path) -> None:
"""Update index.yml based on the current TOML files."""
directories = load_index_file()
# Load all TOML files recursively
toml_files = base_path.rglob("queries/*.toml")
for toml_file in toml_files:
# Load TOML and extract hunt configuration
hunt_config = load_toml(toml_file)
folder_name = toml_file.parent.parent.name
uuid = hunt_config.uuid
entry = {
'name': hunt_config.name,
'path': f"./{toml_file.relative_to(base_path).as_posix()}",
'mitre': hunt_config.mitre
}
# Check if the folder_name exists and if it's a list, convert it to a dictionary
if folder_name not in directories:
directories[folder_name] = {uuid: entry}
else:
if isinstance(directories[folder_name], list):
# Convert the list to a dictionary, using UUIDs as keys
directories[folder_name] = {item['uuid']: item for item in directories[folder_name]}
directories[folder_name][uuid] = entry
# Save the updated index.yml
save_index_file(base_path, directories)
def filter_elasticsearch_params(config: dict) -> dict:
"""Filter out unwanted keys from the config by inspecting the Elasticsearch client constructor."""
# Get the parameter names from the Elasticsearch class constructor
es_params = inspect.signature(get_elasticsearch_client).parameters
return {k: v for k, v in config.items() if k in es_params}