Add export-rules command (#639)

* Add export-rule command to CLI
* add `export` method to packaging class
This commit is contained in:
Justin Ibarra
2021-02-08 20:43:16 -09:00
committed by GitHub
parent e507898dbd
commit 56dc4745b5
8 changed files with 158 additions and 18 deletions
+2
View File
@@ -108,3 +108,5 @@ ENV/
# Siem rules
releases/
collections/
exports/
surveys/
+23
View File
@@ -164,9 +164,32 @@ Usage: detection_rules kibana upload-rule [OPTIONS] TOML_FILES...
Upload a list of rule .toml files to Kibana.
Options:
-r, --replace-id Replace rule IDs with new IDs before export
-h, --help Show this message and exit.
```
Alternatively, rules can be exported into a consolidated ndjson file which can be imported in the Kibana security app
directly.
```console
Usage: detection_rules export-rules [OPTIONS] [RULE_ID]...
Export rule(s) into an importable ndjson file.
Options:
-f, --rule-file FILE Export specified rule files
-d, --directory DIRECTORY Recursively export rules from a directory
-o, --outfile FILE Name of file for exported rules
-r, --replace-id Replace rule IDs with new IDs before export
--stack-version [7.8|7.9|7.10|7.11]
Downgrade a rule version to be compatible
with older instances of Kibana
-s, --skip-unsupported If `--stack-version` is passed, skip
rule types which are unsupported (an error
will be raised otherwise)
-h, --help Show this message and exit.
```
_*To load a custom rule, the proper index must be set up first. The simplest way to do this is to click
the `Load prebuilt detection rules and timeline templates` button on the `detections` page in the Kibana security app._
+7 -3
View File
@@ -49,8 +49,9 @@ def kibana_group(ctx: click.Context, **kibana_kwargs):
@kibana_group.command("upload-rule")
@click.argument("toml-files", nargs=-1, required=True)
@click.option('--replace-id', '-r', is_flag=True, help='Replace rule IDs with new IDs before export')
@click.pass_context
def upload_rule(ctx, toml_files):
def upload_rule(ctx, toml_files, replace_id):
"""Upload a list of rule .toml files to Kibana."""
from .packaging import manage_versions
@@ -66,8 +67,11 @@ def upload_rule(ctx, toml_files):
api_payloads = []
for rule in rules:
payload = rule.get_payload(include_version=True, replace_id=True, embed_metadata=True,
target_version=kibana.version)
try:
payload = rule.get_payload(include_version=True, replace_id=replace_id, embed_metadata=True,
target_version=kibana.version)
except ValueError as e:
client_error(f'{e} in version:{kibana.version}, for rule: {rule.name}', e, ctx=ctx)
rule = RuleResource(payload)
api_payloads.append(rule)
+67 -4
View File
@@ -7,6 +7,8 @@ import glob
import json
import os
import re
import time
from pathlib import Path
import click
import jsonschema
@@ -16,7 +18,7 @@ from . import rule_loader
from .misc import client_error, nested_set, parse_config
from .rule import Rule
from .rule_formatter import toml_write
from .schemas import CurrentSchema
from .schemas import CurrentSchema, available_versions
from .utils import get_path, clear_caches, load_rule_contents
@@ -130,7 +132,7 @@ def mass_update(ctx, query, metadata, language, field):
@click.option('--rule-file', '-f', type=click.Path(dir_okay=False), help='Optionally view a rule from a specified file')
@click.option('--api-format/--rule-format', default=True, help='Print the rule in final api or rule format')
@click.pass_context
def view_rule(ctx, rule_id, rule_file, api_format):
def view_rule(ctx, rule_id, rule_file, api_format, verbose=True):
"""View an internal rule or specified rule file."""
rule = None
@@ -149,12 +151,73 @@ def view_rule(ctx, rule_id, rule_file, api_format):
if not rule:
client_error('Unknown format!')
click.echo(toml_write(rule.rule_format()) if not api_format else
json.dumps(rule.get_payload(), indent=2, sort_keys=True))
if verbose:
click.echo(toml_write(rule.rule_format()) if not api_format else
json.dumps(rule.get_payload(), indent=2, sort_keys=True))
return rule
@root.command('export-rules')
@click.argument('rule-id', nargs=-1, required=False)
@click.option('--rule-file', '-f', multiple=True, type=click.Path(dir_okay=False), help='Export specified rule files')
@click.option('--directory', '-d', multiple=True, type=click.Path(file_okay=False),
              help='Recursively export rules from a directory')
# callable default: the timestamp is computed when the command runs, not once at
# module import time (an eager default would freeze the name for the process lifetime)
@click.option('--outfile', '-o', default=lambda: get_path('exports', f'{time.strftime("%Y%m%dT%H%M%SL")}.ndjson'),
              type=click.Path(dir_okay=False), help='Name of file for exported rules')
@click.option('--replace-id', '-r', is_flag=True, help='Replace rule IDs with new IDs before export')
@click.option('--stack-version', type=click.Choice(available_versions),
              help='Downgrade a rule version to be compatible with older instances of Kibana')
@click.option('--skip-unsupported', '-s', is_flag=True,
              help='If `--stack-version` is passed, skip rule types which are unsupported '
                   '(an error will be raised otherwise)')
def export_rules(rule_id, rule_file, directory, outfile, replace_id, stack_version, skip_unsupported):
    """Export rule(s) into an importable ndjson file."""
    from .packaging import Package

    if not (rule_id or rule_file or directory):
        client_error('Required: at least one of --rule-id, --rule-file, or --directory')

    if rule_id:
        # load all rules once, then validate every requested ID up front so the
        # user gets a single error listing all unknown IDs
        all_rules = {r.id: r for r in rule_loader.load_rules(verbose=False).values()}
        missing = [rid for rid in rule_id if rid not in all_rules]

        if missing:
            client_error(f'Unknown rules for rule IDs: {", ".join(missing)}')

        rules = [r for r in all_rules.values() if r.id in rule_id]
        rule_ids = [r.id for r in rules]
    else:
        rules = []
        rule_ids = []

    rule_files = list(rule_file)

    for dirpath in directory:
        rule_files.extend(list(Path(dirpath).rglob('*.toml')))

    file_lookup = rule_loader.load_rule_files(verbose=False, paths=rule_files)
    rules_from_files = rule_loader.load_rules(file_lookup=file_lookup).values() if file_lookup else []

    # rule_loader.load_rules handles checks for duplicate rule IDs - this means rules loaded by ID are de-duped and
    # rules loaded from files and directories are de-duped from each other, so this check is to ensure that there is
    # no overlap between the two sets of rules
    duplicates = [r.id for r in rules_from_files if r.id in rule_ids]

    if duplicates:
        client_error(f'Duplicate rules for rule IDs: {", ".join(duplicates)}')

    rules.extend(rules_from_files)

    if replace_id:
        from uuid import uuid4

        for rule in rules:
            rule.contents['rule_id'] = str(uuid4())

    # parents=True: create the full exports/ tree if missing instead of raising
    # FileNotFoundError when an intermediate directory does not exist
    Path(outfile).parent.mkdir(parents=True, exist_ok=True)

    package = Package(rules, '_', verbose=False)
    package.export(outfile, downgrade_version=stack_version, skip_unsupported=skip_unsupported)
    return package.rules
@root.command('validate-rule')
@click.argument('rule-id', required=False)
@click.option('--rule-name', '-n')
+3 -2
View File
@@ -17,7 +17,7 @@ from dataclasses import dataclass, field
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Dict, Tuple
from typing import Dict, NoReturn, Tuple
from zipfile import ZipFile
import click
@@ -359,7 +359,8 @@ class ClientError(click.ClickException):
click.echo(msg, err=err, file=file)
def client_error(message, exc: Exception = None, debug=None, ctx: click.Context = None, file=None, err=None):
def client_error(message, exc: Exception = None, debug=None, ctx: click.Context = None, file=None,
err=None) -> NoReturn:
config_debug = True if ctx and ctx.ensure_object(dict) and ctx.obj.get('debug') is True else False
debug = debug if debug is not None else config_debug
+42 -7
View File
@@ -10,12 +10,14 @@ import json
import os
import shutil
from collections import defaultdict, OrderedDict
from pathlib import Path
from typing import List
import click
from . import rule_loader
from .misc import JS_LICENSE, cached
from .rule import Rule # noqa: F401
from .rule import Rule, downgrade_contents_from_rule # noqa: F401
from .utils import get_path, get_etc_path, load_etc_dump, save_etc_dump
RELEASE_DIR = get_path("releases")
@@ -142,24 +144,25 @@ class Package(object):
"""Packaging object for siem rules and releases."""
def __init__(self, rules, name, deprecated_rules=None, release=False, current_versions=None, min_version=None,
max_version=None, update_version_lock=False):
max_version=None, update_version_lock=False, verbose=True):
"""Initialize a package."""
self.rules = [r.copy() for r in rules] # type: list[Rule]
self.rules: List[Rule] = [r.copy() for r in rules]
self.name = name
self.deprecated_rules = [r.copy() for r in deprecated_rules or []] # type: list[Rule]
self.deprecated_rules: List[Rule] = [r.copy() for r in deprecated_rules or []]
self.release = release
self.changed_rule_ids, self.new_rules_ids, self.removed_rule_ids = self._add_versions(current_versions,
update_version_lock)
update_version_lock,
verbose=verbose)
if min_version or max_version:
self.rules = [r for r in self.rules
if (min_version or 0) <= r.contents['version'] <= (max_version or r.contents['version'])]
def _add_versions(self, current_versions, update_versions_lock=False):
def _add_versions(self, current_versions, update_versions_lock=False, verbose=True):
"""Add versions to rules at load time."""
return manage_versions(self.rules, deprecated_rules=self.deprecated_rules, current_versions=current_versions,
save_changes=update_versions_lock)
save_changes=update_versions_lock, verbose=verbose)
@staticmethod
def _package_notice_file(save_dir):
@@ -250,6 +253,38 @@ class Package(object):
if verbose:
click.echo('Package saved to: {}'.format(save_dir))
def export(self, outfile, downgrade_version=None, verbose=True, skip_unsupported=False):
    """Export rules into a consolidated ndjson file."""
    # always write with an .ndjson extension, whatever the caller passed
    target = Path(outfile).with_suffix('.ndjson')
    unsupported = []
    serialized = []

    if not downgrade_version:
        # no downgrade requested: dump current contents as-is
        serialized = [json.dumps(rule.contents, sort_keys=True) for rule in self.rules]
    elif not skip_unsupported:
        # downgrade every rule; an unsupported rule type raises ValueError to the caller
        serialized = [json.dumps(downgrade_contents_from_rule(rule, downgrade_version), sort_keys=True)
                      for rule in self.rules]
    else:
        # downgrade best-effort, collecting unsupported rules for the summary below
        for rule in self.rules:
            try:
                contents = downgrade_contents_from_rule(rule, downgrade_version)
            except ValueError as e:
                unsupported.append(f'{e}: {rule.id} - {rule.name}')
            else:
                serialized.append(json.dumps(contents, sort_keys=True))

    target.write_text('\n'.join(serialized) + '\n')

    if verbose:
        click.echo(f'Exported {len(self.rules) - len(unsupported)} rules into {target}')

        if skip_unsupported and unsupported:
            unsupported_str = '\n- '.join(unsupported)
            click.echo(f'Skipped {len(unsupported)} unsupported rules: \n- {unsupported_str}')
def get_package_hash(self, as_api=True, verbose=True):
"""Get hash of package contents."""
contents = base64.b64encode(self.get_consolidated(as_api=as_api).encode('utf-8'))
+12 -1
View File
@@ -7,6 +7,7 @@ import copy
import hashlib
import json
import os
from uuid import uuid4
import click
import kql
@@ -15,7 +16,7 @@ import eql
from . import ecs, beats
from .attack import tactics, build_threat_map_entry, matrix
from .rule_formatter import nested_normalize, toml_write
from .schemas import CurrentSchema, TomlMetadata # RULE_TYPES, metadata_schema, schema_validate, get_schema
from .schemas import CurrentSchema, TomlMetadata, downgrade
from .utils import get_path, clear_caches, cached
@@ -502,3 +503,13 @@ class Rule(object):
click.echo(' - to have a rule validate against a specific beats schema, add it to metadata->beats_version')
return rule
def downgrade_contents_from_rule(rule: Rule, target_version: str) -> dict:
    """Generate the downgraded contents from a rule.

    Returns a new payload dict with the original rule's id/metadata embedded
    under meta.original, a freshly generated rule_id, and the contents
    downgraded to the schema of ``target_version`` (may raise ValueError for
    rule types unsupported by the target schema).
    """
    # deep copy: a shallow .copy() shares nested dicts, so setdefault("meta")
    # would hand back the rule's own meta dict and the mutation below would
    # leak back into the source rule's contents
    payload = copy.deepcopy(rule.contents)
    meta = payload.setdefault("meta", {})
    meta["original"] = dict(id=rule.id, **rule.metadata)
    payload["rule_id"] = str(uuid4())
    payload = downgrade(payload, target_version)
    return payload
+2 -1
View File
@@ -14,6 +14,7 @@ from .v7_11 import ApiSchema711
__all__ = (
"all_schemas",
"available_versions",
"downgrade",
"CurrentSchema",
"validate_rta_mapping",
@@ -26,8 +27,8 @@ all_schemas = [
ApiSchema710,
ApiSchema711,
]
CurrentSchema = all_schemas[-1]
available_versions = [cls.STACK_VERSION for cls in all_schemas]
def downgrade(api_contents: dict, target_version: str):