Files
sigma-rules/detection_rules/custom_schemas.py
T
Eric Forte 47d7a3acaa [DaC] Beta Release (#3889)
Co-authored-by: Justin Ibarra <16747370+brokensound77@users.noreply.github.com>
Co-authored-by: brokensound77 <brokensound77@users.noreply.github.com>
Co-authored-by: Mika Ayenson <Mikaayenson@users.noreply.github.com>
Co-authored-by: Mika Ayenson <mika.ayenson@elastic.co>
2024-08-06 18:07:12 -04:00

108 lines
4.3 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Custom Schemas management."""
import uuid
from pathlib import Path
import eql
import eql.types
from eql import load_dump, save_dump
from .config import parse_rules_config
from .utils import cached, clear_caches
RULES_CONFIG = parse_rules_config()
RESERVED_SCHEMA_NAMES = ["beats", "ecs", "endgame"]
@cached
def get_custom_schemas(stack_version: str = None) -> dict:
"""Load custom schemas if present."""
custom_schema_dump = {}
stack_versions = [stack_version] if stack_version else RULES_CONFIG.stack_schema_map.keys()
for version in stack_versions:
stack_schema_map = RULES_CONFIG.stack_schema_map[version]
for schema, value in stack_schema_map.items():
if schema not in RESERVED_SCHEMA_NAMES:
schema_path = Path(value)
if not schema_path.is_absolute():
schema_path = RULES_CONFIG.stack_schema_map_file.parent / value
if schema_path.is_file():
custom_schema_dump.update(eql.utils.load_dump(str(schema_path)))
else:
raise ValueError(f"Custom schema must be a file: {schema_path}")
return custom_schema_dump
def resolve_schema_path(path: str) -> Path:
"""Helper function to resolve the schema path."""
path_obj = Path(path)
return path_obj if path_obj.is_absolute() else RULES_CONFIG.stack_schema_map_file.parent.joinpath(path)
def update_data(index: str, field: str, data: dict) -> dict:
"""Update the schema entry with the appropriate index and field."""
if index not in data:
data[index] = {}
data[index][field] = "keyword"
return data
def update_stack_schema_map(stack_schema_map: dict, auto_gen_schema_file: str) -> dict:
"""Update the stack-schema-map.yaml file with the appropriate auto_gen_schema_file location."""
random_uuid = str(uuid.uuid4())
auto_generated_id = None
for version in stack_schema_map:
key_found = False
for key, value in stack_schema_map[version].items():
value_path = resolve_schema_path(value)
if value_path == Path(auto_gen_schema_file).resolve() and key not in RESERVED_SCHEMA_NAMES:
auto_generated_id = key
key_found = True
break
if key_found is False:
if auto_generated_id is None:
auto_generated_id = random_uuid
stack_schema_map[version][auto_generated_id] = str(auto_gen_schema_file)
return stack_schema_map, auto_generated_id, random_uuid
def clean_stack_schema_map(stack_schema_map: dict, auto_generated_id: str, random_uuid: str) -> dict:
"""Clean up the stack-schema-map.yaml file replacing the random UUID with a known key if possible."""
for version in stack_schema_map:
if random_uuid in stack_schema_map[version]:
stack_schema_map[version][auto_generated_id] = stack_schema_map[version].pop(random_uuid)
return stack_schema_map
def update_auto_generated_schema(index: str, field: str):
"""Load custom schemas if present."""
auto_gen_schema_file = str(RULES_CONFIG.auto_gen_schema_file)
stack_schema_map_file = str(RULES_CONFIG.stack_schema_map_file)
# Update autogen schema file
data = load_dump(auto_gen_schema_file)
data = update_data(index, field, data)
save_dump(data, auto_gen_schema_file)
# Update the stack-schema-map.yaml file with the appropriate auto_gen_schema_file location
stack_schema_map = load_dump(stack_schema_map_file)
stack_schema_map, auto_generated_id, random_uuid = update_stack_schema_map(stack_schema_map, auto_gen_schema_file)
save_dump(stack_schema_map, stack_schema_map_file)
# Clean up the stack-schema-map.yaml file replacing the random UUID with the auto_generated_id
stack_schema_map = load_dump(stack_schema_map_file)
stack_schema_map = clean_stack_schema_map(stack_schema_map, auto_generated_id, random_uuid)
save_dump(stack_schema_map, stack_schema_map_file)
RULES_CONFIG.stack_schema_map = stack_schema_map
# IMPORTANT must clear cache in order to reload schema
clear_caches()