Add GitHub PR rule loader (#670)

* add load_gh_pr_rules function
* add dev package-stats command
* add dev search-rule-prs command, which extends the same functionality in rule-search to rules in PR
This commit is contained in:
Justin Ibarra
2021-02-08 21:35:44 -09:00
committed by GitHub
parent 56dc4745b5
commit b8116a5b77
5 changed files with 163 additions and 12 deletions
+92 -6
View File
@@ -40,14 +40,23 @@ def dev_group():
@click.argument('config-file', type=click.Path(exists=True, dir_okay=False), required=False, default=PACKAGE_FILE)
@click.option('--update-version-lock', '-u', is_flag=True,
help='Save version.lock.json file with updated rule versions in the package')
def build_release(config_file, update_version_lock):
def build_release(config_file, update_version_lock, release=None, verbose=True):
"""Assemble all the rules into Kibana-ready release files."""
config = load_dump(config_file)['package']
click.echo('[+] Building package {}'.format(config.get('name')))
package = Package.from_config(config, update_version_lock=update_version_lock, verbose=True)
package.save()
package.get_package_hash(verbose=True)
click.echo('- {} rules included'.format(len(package.rules)))
if release is not None:
config['release'] = release
if verbose:
click.echo('[+] Building package {}'.format(config.get('name')))
package = Package.from_config(config, update_version_lock=update_version_lock, verbose=verbose)
package.save(verbose=verbose)
if verbose:
package.get_package_hash(verbose=True)
click.echo(f'- {len(package.rules)} rules included')
return package
@dev_group.command('update-lock-versions')
@@ -210,6 +219,83 @@ def license_check(ctx):
ctx.exit(int(failed))
@dev_group.command('package-stats')
@click.option('--token', '-t', help='GitHub token to search API authenticated (may exceed threshold without auth)')
@click.option('--threads', default=50, help='Number of threads to download rules from GitHub')
@click.pass_context
def package_stats(ctx, token, threads):
"""Get statistics for current rule package."""
current_package: Package = ctx.invoke(build_release, verbose=False, release=None)
release = f'v{current_package.name}.0'
new, modified, errors = rule_loader.load_github_pr_rules(labels=[release], token=token, threads=threads)
click.echo(f'Total rules as of {release} package: {len(current_package.rules)}')
click.echo(f'New rules: {len(current_package.new_rules_ids)}')
click.echo(f'Modified rules: {len(current_package.changed_rule_ids)}')
click.echo(f'Deprecated rules: {len(current_package.removed_rule_ids)}')
click.echo('\n-----\n')
click.echo('Rules in active PRs for current package: ')
click.echo(f'New rules: {len(new)}')
click.echo(f'Modified rules: {len(modified)}')
@dev_group.command('search-rule-prs')
@click.argument('query', required=False)
@click.option('--no-loop', '-n', is_flag=True, help='Run once with no loop')
@click.option('--columns', '-c', multiple=True, help='Specify columns to add the table')
@click.option('--language', type=click.Choice(["eql", "kql"]), default="kql")
@click.option('--token', '-t', help='GitHub token to search API authenticated (may exceed threshold without auth)')
@click.option('--threads', default=50, help='Number of threads to download rules from GitHub')
@click.pass_context
def search_rule_prs(ctx, no_loop, query, columns, language, token, threads):
"""Use KQL or EQL to find matching rules from active GitHub PRs."""
from uuid import uuid4
from .main import search_rules
all_rules = {}
new, modified, errors = rule_loader.load_github_pr_rules(token=token, threads=threads)
def add_github_meta(this_rule, status, original_rule_id=None):
pr = this_rule.gh_pr
rule.metadata['status'] = status
rule.metadata['github'] = {
'base': pr.base.label,
'comments': [c.body for c in pr.get_comments()],
'commits': pr.commits,
'created_at': str(pr.created_at),
'head': pr.head.label,
'is_draft': pr.draft,
'labels': [lbl.name for lbl in pr.get_labels()],
'last_modified': str(pr.last_modified),
'title': pr.title,
'url': pr.html_url,
'user': pr.user.login
}
if original_rule_id:
rule.metadata['original_rule_id'] = original_rule_id
rule.contents['rule_id'] = str(uuid4())
rule_path = f'pr-{pr.number}-{rule.path}'
all_rules[rule_path] = rule.rule_format()
for rule_id, rule in new.items():
add_github_meta(rule, 'new')
for rule_id, rules in modified.items():
for rule in rules:
add_github_meta(rule, 'modified', rule_id)
loop = not no_loop
ctx.invoke(search_rules, query=query, columns=columns, language=language, rules=all_rules, pager=loop)
while loop:
query = click.prompt(f'Search loop - enter new {language} query or ctrl-z to exit')
columns = click.prompt('columns', default=','.join(columns)).split(',')
ctx.invoke(search_rules, query=query, columns=columns, language=language, rules=all_rules, pager=True)
@dev_group.group('test')
def test_group():
"""Commands for testing against stack resources."""