Update package summary extras produced during package generation (#341)

* update summary.txt
* add summary.xlsx
* add changelog entry autogeneration
This commit is contained in:
Justin Ibarra
2020-09-30 17:43:45 -05:00
committed by GitHub
parent 83fb9bdf93
commit 7c1e9c1ed5
11 changed files with 425 additions and 60 deletions
+7
View File
@@ -36,6 +36,13 @@ jobs:
run: |
python -m detection_rules build-release
- name: Archive production artifacts
uses: actions/upload-artifact@v2
with:
name: release-files
path: |
releases
- name: Unit tests
run: |
python -m detection_rules test
+2
View File
@@ -3,6 +3,7 @@
# you may not use this file except in compliance with the Elastic License.
"""Detection rules."""
from . import docs
from . import eswrap
from . import main
from . import mappings
@@ -13,6 +14,7 @@ from . import schemas
from . import utils
__all__ = (
'docs',
'eswrap',
'mappings',
"main",
+205
View File
@@ -0,0 +1,205 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
"""Create summary documents for a rule package."""
from collections import defaultdict
from pathlib import Path
import xlsxwriter
from .attack import technique_lookup, matrix, attack_tm, tactics
from .packaging import Package
class PackageDocument(xlsxwriter.Workbook):
"""Excel document for summarizing a rules package."""
def __init__(self, path, package):
"""Create an excel workbook for the package."""
self._default_format = {'font_name': 'Helvetica', 'font_size': 12}
super(PackageDocument, self).__init__(path)
self.package: Package = package
self.deprecated_rules = package.deprecated_rules
self.production_rules = package.rules
self.percent = self.add_format({'num_format': '0%'})
self.bold = self.add_format({'bold': True})
self.default_header_format = self.add_format({'bold': True, 'bg_color': '#FFBE33'})
self.center = self.add_format({'align': 'center', 'valign': 'center'})
self.bold_center = self.add_format({'bold': True, 'align': 'center', 'valign': 'center'})
self.right_align = self.add_format({'align': 'right'})
self._coverage = self._get_attack_coverage()
def add_format(self, properties=None):
"""Add a format to the doc."""
properties = properties or {}
for key in self._default_format:
if key not in properties:
properties[key] = self._default_format[key]
return super(PackageDocument, self).add_format(properties)
def _get_attack_coverage(self):
coverage = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
for rule in self.package.rules:
threat = rule.contents.get('threat')
sub_dir = Path(rule.path).parent.name
if threat:
for entry in threat:
tactic = entry['tactic']
techniques = entry['technique']
for technique in techniques:
if technique['id'] in matrix[tactic['name']]:
coverage[tactic['name']][technique['id']][sub_dir] += 1
return coverage
def populate(self):
"""Populate the different pages."""
self.add_summary()
self.add_rule_details()
self.add_attack_matrix()
self.add_rta_mapping()
self.add_rule_details(self.deprecated_rules, 'Deprecated Rules')
def add_summary(self):
"""Add the summary worksheet."""
worksheet = self.add_worksheet('Summary')
worksheet.freeze_panes(1, 0)
worksheet.set_column(0, 0, 25)
worksheet.set_column(1, 1, 10)
row = 0
worksheet.merge_range(row, 0, row, 1, "SUMMARY", self.bold_center)
row += 1
worksheet.write(row, 0, "Package Name")
worksheet.write(row, 1, self.package.name, self.right_align)
row += 1
tactic_counts = defaultdict(int)
for rule in self.package.rules:
threat = rule.contents.get('threat')
if threat:
for entry in threat:
tactic_counts[entry['tactic']['name']] += 1
worksheet.write(row, 0, "Total Production Rules")
worksheet.write(row, 1, len(self.production_rules))
row += 2
worksheet.write(row, 0, "Total Deprecated Rules")
worksheet.write(row, 1, len(self.deprecated_rules))
row += 1
worksheet.write(row, 0, "Total Rules")
worksheet.write(row, 1, len(self.package.rules))
row += 2
worksheet.merge_range(row, 0, row, 3, f"MITRE {attack_tm} TACTICS", self.bold_center)
row += 1
for tactic in tactics:
worksheet.write(row, 0, tactic)
worksheet.write(row, 1, tactic_counts[tactic])
num_techniques = len(self._coverage[tactic])
total_techniques = len(matrix[tactic])
percent = float(num_techniques) / float(total_techniques)
worksheet.write(row, 2, percent, self.percent)
worksheet.write(row, 3, f'{num_techniques}/{total_techniques}', self.right_align)
row += 1
def add_rule_details(self, rules=None, name='Rule Details'):
"""Add a worksheet for detailed metadata of rules."""
if rules is None:
rules = self.production_rules
worksheet = self.add_worksheet(name)
worksheet.freeze_panes(1, 1)
headers = ('Name', 'ID', 'Version', 'Type', 'Language', 'Index', 'Tags',
f'{attack_tm} Tactics', f'{attack_tm} Techniques', 'Description')
for column, header in enumerate(headers):
worksheet.write(0, column, header, self.default_header_format)
column_max_widths = [0 for i in range(len(headers))]
metadata_fields = (
'name', 'rule_id', 'version', 'type', 'language', 'index', 'tags', 'tactics', 'techniques', 'description'
)
for row, rule in enumerate(rules, 1):
tactic_names, _, _, technique_ids = rule.get_flat_mitre()
rule_contents = {'tactics': tactic_names, 'techniques': technique_ids}
rule_contents.update(rule.contents.copy())
for column, field in enumerate(metadata_fields):
value = rule_contents.get(field)
if value is None:
continue
elif isinstance(value, list):
value = ', '.join(value)
worksheet.write(row, column, value)
column_max_widths[column] = max(column_max_widths[column], len(str(value)))
# cap description width at 80
column_max_widths[-1] = 80
# this is still not perfect because the font used is not monospaced, but it gets it close
for index, width in enumerate(column_max_widths):
worksheet.set_column(index, index, width)
worksheet.autofilter(0, 0, len(rules) + 1, len(headers) - 1)
def add_rta_mapping(self):
"""Add a worksheet for the RTA/Rule RTA mapping."""
from .rule_loader import rta_mappings
worksheet = self.add_worksheet('RTA Mapping')
worksheet.freeze_panes(1, 0)
headers = ('Rule ID', 'Rule Name', 'RTA')
for column, header in enumerate(headers):
worksheet.write(0, column, header, self.default_header_format)
row = 1
for rule_id, mapping in rta_mappings.get_rta_mapping().items():
worksheet.write(row, 0, rule_id)
worksheet.write(row, 1, mapping['rule_name'])
worksheet.write(row, 2, mapping['rta_name'])
row += 1
worksheet.set_column(0, 0, 35)
worksheet.set_column(1, 1, 50)
worksheet.set_column(2, 2, 35)
def add_attack_matrix(self):
"""Add a worksheet for ATT&CK coverage."""
worksheet = self.add_worksheet(attack_tm + ' Coverage')
worksheet.freeze_panes(1, 0)
header = self.add_format({'font_size': 12, 'bold': True, 'bg_color': '#005B94', 'font_color': 'white'})
default = self.add_format({'font_size': 10, 'text_wrap': True})
bold = self.add_format({'font_size': 10, 'bold': True, 'text_wrap': True})
technique_url = 'https://attack.mitre.org/techniques/'
for column, tactic in enumerate(tactics):
worksheet.write(0, column, tactic, header)
worksheet.set_column(column, column, 20)
for row, technique_id in enumerate(matrix[tactic], 1):
technique = technique_lookup[technique_id]
fmt = bold if technique_id in self._coverage[tactic] else default
coverage = self._coverage[tactic].get(technique_id)
coverage_str = ''
if coverage:
coverage_str = '\n\n'
coverage_str += '\n'.join(f'{sub_dir}: {count}' for sub_dir, count in coverage.items())
worksheet.write_url(row, column, technique_url + technique_id.replace('.', '/'), cell_format=fmt,
string=technique['name'], tip=f'{technique_id}{coverage_str}')
worksheet.autofilter(0, 0, max([len(v) for k, v in matrix.items()]) + 1, len(tactics) - 1)
+1 -1
View File
@@ -50,7 +50,7 @@ class RtaMappings(object):
def get_rta_mapping(self):
"""Build the rule<-->rta mapping based off the mapping file."""
if not self._rta_mapping:
self._rta_mapping = {rule_id: data['rta'] for rule_id, data in self.mapping.items()}
self._rta_mapping = self.mapping.copy()
return self._rta_mapping
+178 -50
View File
@@ -4,26 +4,26 @@
"""Packaging and preparation for releases."""
import base64
import datetime
import hashlib
import json
import os
import shutil
from collections import OrderedDict
from collections import defaultdict, OrderedDict
import click
from . import rule_loader
from .misc import JS_LICENSE
from .rule import Rule # noqa: F401
from .utils import get_path, get_etc_path
from .utils import get_path, get_etc_path, load_etc_dump, save_etc_dump
RELEASE_DIR = get_path("releases")
PACKAGE_FILE = get_etc_path('packages.yml')
RULE_VERSIONS = get_etc_path('version.lock.json')
NOTICE_FILE = get_path('NOTICE.txt')
def filter_rule(rule, config_filter, exclude_fields): # type: (Rule,dict,dict) -> bool
def filter_rule(rule: Rule, config_filter: dict, exclude_fields: dict) -> bool:
"""Filter a rule based off metadata and a package configuration."""
flat_rule = rule.flattened_contents
for key, values in config_filter.items():
@@ -49,16 +49,14 @@ def filter_rule(rule, config_filter, exclude_fields): # type: (Rule,dict,dict)
return True
def manage_versions(rules, current_versions=None, exclude_version_update=False, add_new=True, save_changes=False,
verbose=True):
# type: (list[Rule], dict, bool, bool, bool, bool) -> [list, list]
def manage_versions(rules: list, deprecated_rules: list = None, current_versions: dict = None,
exclude_version_update=False, add_new=True, save_changes=False, verbose=True) -> (list, list, list):
"""Update the contents of the version.lock file and optionally save changes."""
new_rules = {}
changed_rules = []
if current_versions is None:
with open(RULE_VERSIONS, 'r') as f:
current_versions = json.load(f)
current_versions = load_etc_dump('version.lock.json')
for rule in rules:
# it is a new rule, so add it if specified, and add an initial version to the rule
@@ -82,44 +80,72 @@ def manage_versions(rules, current_versions=None, exclude_version_update=False,
else:
rule.contents['version'] = version
# manage deprecated rules
newly_deprecated = []
rule_deprecations = {}
if deprecated_rules:
rule_deprecations = load_etc_dump('deprecated_rules.json')
deprecation_date = str(datetime.date.today())
for rule in deprecated_rules:
if rule.id not in rule_deprecations:
rule_deprecations[rule.id] = {
'rule_name': rule.name,
'deprecation_date': deprecation_date
}
newly_deprecated.append(rule.id)
# update the document with the new rules
if new_rules or changed_rules:
if new_rules or changed_rules or newly_deprecated:
if verbose:
click.echo('Rule hash changes detected!')
if save_changes:
current_versions.update(new_rules if add_new else {})
current_versions = OrderedDict(sorted(current_versions.items(), key=lambda x: x[1]['rule_name']))
if changed_rules or (new_rules and add_new):
current_versions.update(new_rules if add_new else {})
current_versions = OrderedDict(sorted(current_versions.items(), key=lambda x: x[1]['rule_name']))
with open(RULE_VERSIONS, 'w') as f:
json.dump(current_versions, f, indent=2, sort_keys=True)
save_etc_dump(current_versions, 'version.lock.json')
if verbose:
click.echo('Updated version.lock.json file with:')
click.echo('Updated version.lock.json file')
if newly_deprecated:
save_etc_dump(sorted(OrderedDict(rule_deprecations)), 'deprecated_rules.json')
if verbose:
click.echo('Updated deprecated_rules.json file')
else:
if verbose:
click.echo('run `build-release --update-version-lock` to update the version.lock.json file')
click.echo('run `build-release --update-version-lock` to update the version.lock.json and '
'deprecated_rules.json files')
if verbose:
if changed_rules:
click.echo(' - {} changed rule version(s)'.format(len(changed_rules)))
click.echo(f' - {len(changed_rules)} changed rule version(s)')
if new_rules:
click.echo(' - {} new rule version addition(s)'.format(len(new_rules)))
click.echo(f' - {len(new_rules)} new rule version addition(s)')
if newly_deprecated:
click.echo(f' - {len(newly_deprecated)} newly deprecated rule(s)')
return changed_rules, new_rules.keys()
return changed_rules, list(new_rules), newly_deprecated
class Package(object):
"""Packaging object for siem rules and releases."""
def __init__(self, rules, name, release=False, current_versions=None, min_version=None, max_version=None,
update_version_lock=False):
def __init__(self, rules, name, deprecated_rules=None, release=False, current_versions=None, min_version=None,
max_version=None, update_version_lock=False):
"""Initialize a package."""
self.rules = [r.copy() for r in rules] # type: list[Rule]
self.name = name
self.deprecated_rules = [r.copy() for r in deprecated_rules or []] # type: list[Rule]
self.release = release
self.changed_rules, self.new_rules = self._add_versions(current_versions, update_version_lock)
self.changed_rule_ids, self.new_rules_ids, self.removed_rule_ids = self._add_versions(current_versions,
update_version_lock)
if min_version or max_version:
self.rules = [r for r in self.rules
@@ -127,7 +153,8 @@ class Package(object):
def _add_versions(self, current_versions, update_versions_lock=False):
"""Add versions to rules at load time."""
return manage_versions(self.rules, current_versions=current_versions, save_changes=update_versions_lock)
return manage_versions(self.rules, deprecated_rules=self.deprecated_rules, current_versions=current_versions,
save_changes=update_versions_lock)
@staticmethod
def _package_notice_file(save_dir):
@@ -167,12 +194,17 @@ class Package(object):
with open(os.path.join(save_dir, 'index.ts'), 'wt') as f:
f.write('\n'.join(index_ts))
def save_release_files(self, directory, changed_rules, new_rules):
def save_release_files(self, directory, changed_rules, new_rules, removed_rules):
"""Release a package."""
with open(os.path.join(directory, '{}-summary.txt'.format(self.name)), 'w') as f:
f.write(self.generate_summary(changed_rules, new_rules))
with open(os.path.join(directory, '{}-consolidated.json'.format(self.name)), 'w') as f:
summary, changelog = self.generate_summary_and_changelog(changed_rules, new_rules, removed_rules)
with open(os.path.join(directory, f'{self.name}-summary.txt'), 'w') as f:
f.write(summary)
with open(os.path.join(directory, f'{self.name}-changelog-entry.md'), 'w') as f:
f.write(changelog)
with open(os.path.join(directory, f'{self.name}-consolidated.json'), 'w') as f:
json.dump(json.loads(self.get_consolidated()), f, sort_keys=True, indent=2)
self.generate_xslx(os.path.join(directory, f'{self.name}-summary.xlsx'))
def get_consolidated(self, as_api=True):
"""Get a consolidated package of the rules in a single file."""
@@ -200,7 +232,7 @@ class Package(object):
self._package_index_file(rules_dir)
if self.release:
self.save_release_files(extras_dir, self.changed_rules, self.new_rules)
self.save_release_files(extras_dir, self.changed_rule_ids, self.new_rules_ids, self.removed_rule_ids)
# zip all rules only and place in extras
shutil.make_archive(os.path.join(extras_dir, self.name), 'zip', root_dir=os.path.dirname(rules_dir),
@@ -229,18 +261,17 @@ class Package(object):
all_rules = rule_loader.load_rules(verbose=False).values()
config = config or {}
exclude_fields = config.pop('exclude_fields', {})
log_deprecated = config.pop('log_deprecated', False)
rule_filter = config.pop('filter', {})
min_version = config.pop('min_version', None)
max_version = config.pop('max_version', None)
deprecated_rules = [r for r in all_rules if r.metadata['maturity'] == 'deprecated'] if log_deprecated else []
rules = list(filter(lambda rule: filter_rule(rule, rule_filter, exclude_fields), all_rules))
if verbose:
click.echo(f' - {len(all_rules) - len(rules)} rules excluded from package')
update = config.pop('update', {})
package = cls(rules, min_version=min_version, max_version=max_version, update_version_lock=update_version_lock,
**config)
package = cls(rules, deprecated_rules=deprecated_rules, update_version_lock=update_version_lock, **config)
# Allow for some fields to be overwritten
if update.get('data', {}):
@@ -250,29 +281,126 @@ class Package(object):
return package
def generate_summary(self, changed_rules, new_rules):
def generate_summary_and_changelog(self, changed_rule_ids, new_rule_ids, removed_rules):
"""Generate stats on package."""
ecs_versions = set()
indices = set()
changed = []
new = []
from string import ascii_lowercase, ascii_uppercase
summary = {
'changed': defaultdict(list),
'added': defaultdict(list),
'removed': defaultdict(list),
'unchanged': defaultdict(list)
}
changelog = {
'changed': defaultdict(list),
'added': defaultdict(list),
'removed': defaultdict(list),
'unchanged': defaultdict(list)
}
# build an index map first
longest_name = 0
indexes = set()
for rule in self.rules:
longest_name = max(longest_name, len(rule.name))
index_list = rule.contents.get('index')
if index_list:
indexes.update(index_list)
letters = ascii_uppercase + ascii_lowercase
index_map = {index: letters[i] for i, index in enumerate(sorted(indexes))}
def get_summary_rule_info(r: Rule):
rule_str = f'{r.name:<{longest_name}} (v:{r.contents.get("version")} t:{r.type}'
rule_str += f'-{r.contents["language"]})' if r.contents.get('language') else ')'
rule_str += f'(indexes:{"".join(index_map[i] for i in r.contents.get("index"))})' \
if r.contents.get('index') else ''
return rule_str
def get_markdown_rule_info(r: Rule, sd):
rules_dir_link = f'https://github.com/elastic/detection-rules/tree/v{self.name}/rules/{sd}/'
rule_type = r.contents['language'] if r.type in ('query', 'eql') else r.type
return f'`{r.id}` **[{r.name}]({rules_dir_link + os.path.basename(r.path)})** (_{rule_type}_)'
for rule in self.rules:
ecs_versions.update(rule.ecs_version)
indices.update(rule.contents.get('index', ''))
sub_dir = os.path.basename(os.path.dirname(rule.path))
if rule.id in changed_rules:
changed.append('{} (v{})'.format(rule.name, rule.contents.get('version')))
elif rule.id in new_rules:
new.append('{} (v{})'.format(rule.name, rule.contents.get('version')))
if rule.id in changed_rule_ids:
summary['changed'][sub_dir].append(get_summary_rule_info(rule))
changelog['changed'][sub_dir].append(get_markdown_rule_info(rule, sub_dir))
elif rule.id in new_rule_ids:
summary['added'][sub_dir].append(get_summary_rule_info(rule))
changelog['added'][sub_dir].append(get_markdown_rule_info(rule, sub_dir))
else:
summary['unchanged'][sub_dir].append(get_summary_rule_info(rule))
changelog['unchanged'][sub_dir].append(get_markdown_rule_info(rule, sub_dir))
total = 'Total Rules: {}'.format(len(self.rules))
sha256 = 'Package Hash: {}'.format(self.get_package_hash(verbose=False))
ecs_versions = 'ECS Versions: {}'.format(', '.join(ecs_versions))
indices = 'Included Indexes: {}'.format(', '.join(indices))
new_rules = 'New Rules: \n{}'.format('\n'.join(' - ' + s for s in sorted(new)) if new else 'N/A')
modified_rules = 'Modified Rules: \n{}'.format('\n'.join(' - ' + s for s in sorted(changed)) if new else 'N/A')
return '\n'.join([total, sha256, ecs_versions, indices, new_rules, modified_rules])
for rule in self.deprecated_rules:
sub_dir = os.path.basename(os.path.dirname(rule.path))
if rule.id in removed_rules:
summary['removed'][sub_dir].append(rule.name)
changelog['removed'][sub_dir].append(rule.name)
def format_summary_rule_str(rule_dict):
str_fmt = ''
for sd, rules in sorted(rule_dict.items(), key=lambda x: x[0]):
str_fmt += f'\n{sd} ({len(rules)})\n'
str_fmt += '\n'.join(' - ' + s for s in sorted(rules))
return str_fmt or '\nNone'
def format_changelog_rule_str(rule_dict):
str_fmt = ''
for sd, rules in sorted(rule_dict.items(), key=lambda x: x[0]):
str_fmt += f'\n- **{sd}** ({len(rules)})\n'
str_fmt += '\n'.join(' - ' + s for s in sorted(rules))
return str_fmt or '\nNone'
def rule_count(rule_dict):
count = 0
for _, rules in rule_dict.items():
count += len(rules)
return count
today = str(datetime.date.today())
summary_fmt = [f'{sf.capitalize()} ({rule_count(summary[sf])}): \n{format_summary_rule_str(summary[sf])}\n'
for sf in ('added', 'changed', 'removed', 'unchanged') if summary[sf]]
change_fmt = [f'{sf.capitalize()} ({rule_count(changelog[sf])}): \n{format_changelog_rule_str(changelog[sf])}\n'
for sf in ('added', 'changed', 'removed') if changelog[sf]]
summary_str = '\n'.join([
f'Version {self.name}',
f'Generated: {today}',
f'Total Rules: {len(self.rules)}',
f'Package Hash: {self.get_package_hash(verbose=False)}',
'---',
'(v: version, t: rule_type-language)',
'Index Map:\n{}'.format("\n".join(f" {v}: {k}" for k, v in index_map.items())),
'',
'Rules',
*summary_fmt
])
changelog_str = '\n'.join([
f'# Version {self.name}',
f'_Released {today}_',
'',
'### Rules',
*change_fmt,
'',
'### CLI'
])
return summary_str, changelog_str
def generate_xslx(self, path):
"""Generate a detailed breakdown of a package in an excel file."""
from .docs import PackageDocument
doc = PackageDocument(path, self)
doc.populate()
doc.close()
def bump_versions(self, save_changes=False, current_versions=None):
"""Bump the versions of all production rules included in a release and optionally save changes."""
+14
View File
@@ -104,6 +104,20 @@ class Rule(object):
if self.query and self.contents['language'] == 'kuery':
return kql.to_eql(self.query)
def get_flat_mitre(self):
"""Get flat lists of tactic and technique info."""
tactic_names = []
tactic_ids = []
technique_ids = set()
technique_names = set()
for entry in self.contents.get('threat', []):
tactic_names.append(entry['tactic']['name'])
tactic_ids.append(entry['tactic']['id'])
technique_names.update([t['name'] for t in entry['technique']])
technique_ids.update([t['id'] for t in entry['technique']])
return sorted(tactic_names), sorted(tactic_ids), sorted(technique_names), sorted(technique_ids)
@classmethod
def get_unique_query_fields(cls, rule_contents):
"""Get a list of unique fields used in a rule query from rule contents."""
+4 -2
View File
@@ -69,14 +69,16 @@ def load_etc_dump(*path):
return eql.utils.load_dump(get_etc_path(*path))
def save_etc_dump(contents, *path):
def save_etc_dump(contents, *path, **kwargs):
"""Load a json/yml/toml file from the etc/ folder."""
path = get_etc_path(*path)
_, ext = os.path.splitext(path)
sort_keys = kwargs.pop('sort_keys', True)
indent = kwargs.pop('indent', 2)
if ext == ".json":
with open(path, "wt") as f:
json.dump(contents, f, cls=DateTimeEncoder, sort_keys=True, indent=2)
json.dump(contents, f, cls=DateTimeEncoder, sort_keys=sort_keys, indent=indent, **kwargs)
else:
return eql.utils.save_dump(contents, path)
+1
View File
@@ -0,0 +1 @@
{}
+5
View File
@@ -16,3 +16,8 @@ package:
# - 1.5.0
maturity:
- production
# log deprecated rules in summary and change logs
log_deprecated: true
# rule version scoping
# min_version: 1
# max_version: 5
+1
View File
@@ -7,6 +7,7 @@ Click==7.0
PyYAML~=5.3
eql~=0.9
elasticsearch~=7.5.1
XlsxWriter==1.3.6
# test deps
pyflakes==2.2.0
+7 -7
View File
@@ -8,7 +8,7 @@ import uuid
import yaml
from detection_rules import rule_loader
from detection_rules.packaging import Package, PACKAGE_FILE
from detection_rules.packaging import PACKAGE_FILE, Package
class TestPackages(unittest.TestCase):
@@ -58,10 +58,10 @@ class TestPackages(unittest.TestCase):
@rule_loader.mock_loader
def test_package_summary(self):
"""Test the generation of the package summary."""
rules = list(rule_loader.load_rules().values())
rules = rule_loader.get_production_rules()
package = Package(rules, 'test-package')
changed_rules, new_rules = package.bump_versions(save_changes=False)
package.generate_summary(changed_rules, new_rules)
changed_rule_ids, new_rule_ids, deprecated_rule_ids = package.bump_versions(save_changes=False)
package.generate_summary_and_changelog(changed_rule_ids, new_rule_ids, deprecated_rule_ids)
def test_versioning_diffs(self):
"""Test that versioning is detecting diffs as expected."""
@@ -69,7 +69,7 @@ class TestPackages(unittest.TestCase):
package = Package(rules, 'test', current_versions=version_info)
# test versioning doesn't falsely detect changes
changed_rules, new_rules = package.changed_rules, package.new_rules
changed_rules, new_rules = package.changed_rule_ids, package.new_rules_ids
self.assertEqual(0, len(changed_rules), 'Package version bumping is improperly detecting changed rules')
self.assertEqual(0, len(new_rules), 'Package version bumping is improperly detecting new rules')
@@ -77,7 +77,7 @@ class TestPackages(unittest.TestCase):
# test versioning detects a new rule
package.rules[0].contents.pop('version')
changed_rules, new_rules = package.bump_versions(current_versions={})
changed_rules, new_rules, _ = package.bump_versions(current_versions={})
self.assertEqual(0, len(changed_rules), 'Package version bumping is improperly detecting changed rules')
self.assertEqual(1, len(new_rules), 'Package version bumping is not detecting new rules')
@@ -87,7 +87,7 @@ class TestPackages(unittest.TestCase):
# test versioning detects a hash changes
package.rules[0].contents.pop('version')
package.rules[0].contents['query'] = 'process.name:changed.test.query'
changed_rules, new_rules = package.bump_versions(current_versions=version_info)
changed_rules, new_rules, _ = package.bump_versions(current_versions=version_info)
self.assertEqual(1, len(changed_rules), 'Package version bumping is not detecting changed rules')
self.assertEqual(0, len(new_rules), 'Package version bumping is improperly detecting new rules')