[FR] Add Support for Local Dates Flag (#4582)
* Add support for local dates flag * Use two variables * Add support for import-rules-to-repo * Revert arg formatting * Update comment * Pass Rule Path as Path Object * Update to rule loader function * Streamline metadata function * Also support dictionaries * Bump patch version * Reduce complexity * Add if path exists check * Fix version bump
This commit is contained in:
@@ -210,18 +210,11 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
|
||||
# DEFAULT_PREBUILT_RULES_DIRS[0] is a required directory just as a suggestion
|
||||
suggested_path = Path(DEFAULT_PREBUILT_RULES_DIRS[0]) / contents['name']
|
||||
path = Path(path or input(f'File path for rule [{suggested_path}]: ') or suggested_path).resolve()
|
||||
# Inherit maturity from the rule already exists
|
||||
maturity = "development"
|
||||
if path.exists():
|
||||
rules = RuleCollection()
|
||||
rules.load_file(path)
|
||||
if rules:
|
||||
maturity = rules.rules[0].contents.metadata.maturity
|
||||
|
||||
# Inherit maturity and optionally local dates from the rule if it already exists
|
||||
meta = {
|
||||
"creation_date": creation_date,
|
||||
"updated_date": creation_date,
|
||||
"maturity": maturity,
|
||||
"creation_date": kwargs.get("creation_date") or creation_date,
|
||||
"updated_date": kwargs.get("updated_date") or creation_date,
|
||||
"maturity": "development" or kwargs.get("maturity"),
|
||||
}
|
||||
|
||||
try:
|
||||
|
||||
+19
-14
@@ -24,7 +24,7 @@ from .generic_loader import GenericCollection
|
||||
from .main import root
|
||||
from .misc import add_params, client_error, kibana_options, get_kibana_client, nested_set
|
||||
from .rule import downgrade_contents_from_rule, TOMLRuleContents, TOMLRule
|
||||
from .rule_loader import RuleCollection
|
||||
from .rule_loader import RuleCollection, update_metadata_from_file
|
||||
from .utils import format_command_options, rulename_to_filename
|
||||
|
||||
RULES_CONFIG = parse_rules_config()
|
||||
@@ -199,12 +199,14 @@ def kibana_import_rules(ctx: click.Context, rules: RuleCollection, overwrite: Op
|
||||
@click.option("--export-exceptions", "-e", is_flag=True, help="Include exceptions in export")
|
||||
@click.option("--skip-errors", "-s", is_flag=True, help="Skip errors when exporting rules")
|
||||
@click.option("--strip-version", "-sv", is_flag=True, help="Strip the version fields from all rules")
|
||||
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
|
||||
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
|
||||
@click.pass_context
|
||||
def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_directory: Optional[Path],
|
||||
exceptions_directory: Optional[Path], default_author: str,
|
||||
rule_id: Optional[Iterable[str]] = None, export_action_connectors: bool = False,
|
||||
export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False
|
||||
) -> List[TOMLRule]:
|
||||
export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False,
|
||||
local_creation_date: bool = False, local_updated_date: bool = False) -> List[TOMLRule]:
|
||||
"""Export custom rules from Kibana."""
|
||||
kibana = ctx.obj["kibana"]
|
||||
kibana_include_details = export_exceptions or export_action_connectors
|
||||
@@ -232,6 +234,8 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d
|
||||
return []
|
||||
|
||||
rules_results = results
|
||||
action_connector_results = []
|
||||
exception_results = []
|
||||
if kibana_include_details:
|
||||
# Assign counts to variables
|
||||
rules_count = results[-1]["exported_rules_count"]
|
||||
@@ -259,22 +263,23 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d
|
||||
rule_resource["author"] = rule_resource.get("author") or default_author or [rule_resource.get("created_by")]
|
||||
if isinstance(rule_resource["author"], str):
|
||||
rule_resource["author"] = [rule_resource["author"]]
|
||||
# Inherit maturity from the rule already exists
|
||||
maturity = "development"
|
||||
# Inherit maturity and optionally local dates from the rule if it already exists
|
||||
params = {
|
||||
"rule": rule_resource,
|
||||
"maturity": "development",
|
||||
}
|
||||
threat = rule_resource.get("threat")
|
||||
first_tactic = threat[0].get("tactic").get("name") if threat else ""
|
||||
rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=first_tactic)
|
||||
# check if directory / f"{rule_name}" exists
|
||||
if (directory / f"{rule_name}").exists():
|
||||
rules = RuleCollection()
|
||||
rules.load_file(directory / f"{rule_name}")
|
||||
if rules:
|
||||
maturity = rules.rules[0].contents.metadata.maturity
|
||||
|
||||
contents = TOMLRuleContents.from_rule_resource(
|
||||
rule_resource, maturity=maturity
|
||||
save_path = directory / f"{rule_name}"
|
||||
params.update(
|
||||
update_metadata_from_file(
|
||||
save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date}
|
||||
)
|
||||
)
|
||||
rule = TOMLRule(contents=contents, path=directory / f"{rule_name}")
|
||||
contents = TOMLRuleContents.from_rule_resource(**params)
|
||||
rule = TOMLRule(contents=contents, path=save_path)
|
||||
except Exception as e:
|
||||
if skip_errors:
|
||||
print(f'- skipping {rule_resource.get("name")} - {type(e).__name__}')
|
||||
|
||||
+11
-2
@@ -32,7 +32,7 @@ from .misc import (
|
||||
)
|
||||
from .rule import TOMLRule, TOMLRuleContents, QueryRuleData
|
||||
from .rule_formatter import toml_write
|
||||
from .rule_loader import RuleCollection
|
||||
from .rule_loader import RuleCollection, update_metadata_from_file
|
||||
from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file
|
||||
from .utils import Ndjson, get_path, get_etc_path, clear_caches, load_dump, load_rule_contents, rulename_to_filename
|
||||
|
||||
@@ -128,10 +128,13 @@ def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True):
|
||||
@click.option("--skip-errors", "-ske", is_flag=True, help="Skip rule import errors")
|
||||
@click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one")
|
||||
@click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule")
|
||||
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
|
||||
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
|
||||
def import_rules_into_repo(input_file: click.Path, required_only: bool, action_connector_import: bool,
|
||||
exceptions_import: bool, directory: click.Path, save_directory: click.Path,
|
||||
action_connectors_directory: click.Path, exceptions_directory: click.Path,
|
||||
skip_errors: bool, default_author: str, strip_none_values: bool):
|
||||
skip_errors: bool, default_author: str, strip_none_values: bool, local_creation_date: bool,
|
||||
local_updated_date: bool):
|
||||
"""Import rules from json, toml, or yaml files containing Kibana exported rule(s)."""
|
||||
errors = []
|
||||
rule_files = glob.glob(os.path.join(directory, "**", "*.*"), recursive=True) if directory else []
|
||||
@@ -179,6 +182,12 @@ def import_rules_into_repo(input_file: click.Path, required_only: bool, action_c
|
||||
if isinstance(contents["author"], str):
|
||||
contents["author"] = [contents["author"]]
|
||||
|
||||
contents.update(
|
||||
update_metadata_from_file(
|
||||
Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date}
|
||||
)
|
||||
)
|
||||
|
||||
output = rule_prompt(
|
||||
rule_path,
|
||||
required_only=required_only,
|
||||
|
||||
@@ -18,7 +18,8 @@ from marshmallow.exceptions import ValidationError
|
||||
from . import utils
|
||||
from .config import parse_rules_config
|
||||
from .rule import (
|
||||
DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule, TOMLRuleContents
|
||||
DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule,
|
||||
TOMLRuleContents
|
||||
)
|
||||
from .schemas import definitions
|
||||
from .utils import cached, get_path
|
||||
@@ -116,6 +117,20 @@ def load_locks_from_tag(remote: str, tag: str, version_lock: str = 'detection_ru
|
||||
return commit_hash, version, deprecated
|
||||
|
||||
|
||||
def update_metadata_from_file(rule_path: Path, fields_to_update: dict) -> dict:
|
||||
"""Update metadata fields for a rule with local contents."""
|
||||
contents = {}
|
||||
if not rule_path.exists():
|
||||
return contents
|
||||
local_metadata = RuleCollection().load_file(rule_path).contents.metadata.to_dict()
|
||||
if local_metadata:
|
||||
contents["maturity"] = local_metadata.get("maturity", "development")
|
||||
for field_name, should_update in fields_to_update.items():
|
||||
if should_update and field_name in local_metadata:
|
||||
contents[field_name] = local_metadata[field_name]
|
||||
return contents
|
||||
|
||||
|
||||
@dataclass
|
||||
class BaseCollection:
|
||||
"""Base class for collections."""
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "detection_rules"
|
||||
version = "1.0.9"
|
||||
version = "1.0.10"
|
||||
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
|
||||
Reference in New Issue
Block a user