Move version lock code to object for portability (#1553)
* Move version lock code to object for portability * use cached_property to bypass frozen dataclass and set property * replace load_versions function
This commit is contained in:
@@ -29,7 +29,7 @@ from .ghwrap import GithubClient
|
||||
from .main import root
|
||||
from .misc import PYTHON_LICENSE, add_client, client_error
|
||||
from .packaging import PACKAGE_FILE, Package, RELEASE_DIR, current_stack_version
|
||||
from .version_lock import manage_versions, load_versions
|
||||
from .version_lock import default_version_lock
|
||||
from .rule import AnyRuleData, BaseRuleData, QueryRuleData, TOMLRule
|
||||
from .rule_loader import RuleCollection, production_filter
|
||||
from .schemas import definitions
|
||||
@@ -71,7 +71,7 @@ def build_release(config_file, update_version_lock, release=None, verbose=True):
|
||||
package = Package.from_config(config, verbose=verbose)
|
||||
|
||||
if update_version_lock:
|
||||
manage_versions(package.rules, save_changes=True, verbose=verbose)
|
||||
default_version_lock.manage_versions(package.rules, save_changes=True, verbose=verbose)
|
||||
|
||||
package.save(verbose=verbose)
|
||||
|
||||
@@ -176,8 +176,6 @@ def prune_staging_area(target_stack_version: str, dry_run: bool):
|
||||
@click.argument('rule-ids', nargs=-1, required=False)
|
||||
def update_lock_versions(rule_ids):
|
||||
"""Update rule hashes in version.lock.json file without bumping version."""
|
||||
from .packaging import manage_versions
|
||||
|
||||
rules = RuleCollection.default()
|
||||
|
||||
if rule_ids:
|
||||
@@ -189,7 +187,7 @@ def update_lock_versions(rule_ids):
|
||||
return
|
||||
|
||||
# this command may not function as expected anymore due to previous changes eliminating the use of add_new=False
|
||||
changed, new, _ = manage_versions(rules, exclude_version_update=True, save_changes=True)
|
||||
changed, new, _ = default_version_lock.manage_versions(rules, exclude_version_update=True, save_changes=True)
|
||||
|
||||
if not changed:
|
||||
click.echo('No hashes updated')
|
||||
@@ -648,7 +646,7 @@ def search_rule_prs(ctx, no_loop, query, columns, language, token, threads):
|
||||
@click.pass_context
|
||||
def deprecate_rule(ctx: click.Context, rule_file: Path):
|
||||
"""Deprecate a rule."""
|
||||
version_info = load_versions()
|
||||
version_info = default_version_lock.version_lock
|
||||
rule_collection = RuleCollection()
|
||||
contents = rule_collection.load_file(rule_file).contents
|
||||
rule = TOMLRule(path=rule_file, contents=contents)
|
||||
@@ -659,12 +657,6 @@ def deprecate_rule(ctx: click.Context, rule_file: Path):
|
||||
ctx.exit()
|
||||
|
||||
today = time.strftime('%Y/%m/%d')
|
||||
|
||||
new_meta = {
|
||||
'updated_date': today,
|
||||
'deprecation_date': today,
|
||||
'maturity': 'deprecated'
|
||||
}
|
||||
deprecated_path = get_path('rules', '_deprecated', rule_file.name)
|
||||
|
||||
# create the new rule and save it
|
||||
|
||||
@@ -23,7 +23,7 @@ from .rule import TOMLRule, QueryRuleData, ThreatMapping
|
||||
from .rule_loader import DeprecatedCollection, RuleCollection, DEFAULT_RULES_DIR
|
||||
from .schemas import definitions
|
||||
from .utils import Ndjson, get_path, get_etc_path, load_etc_dump
|
||||
from .version_lock import manage_versions
|
||||
from .version_lock import default_version_lock
|
||||
|
||||
RELEASE_DIR = get_path("releases")
|
||||
PACKAGE_FILE = get_etc_path('packages.yml')
|
||||
@@ -93,7 +93,7 @@ class Package(object):
|
||||
self.rules = self.rules.filter(lambda r: max_version >= r.contents.latest_version)
|
||||
|
||||
self.changed_ids, self.new_ids, self.removed_ids = \
|
||||
manage_versions(self.rules, verbose=verbose, save_changes=False)
|
||||
default_version_lock.manage_versions(self.rules, verbose=verbose, save_changes=False)
|
||||
|
||||
@classmethod
|
||||
def load_configs(cls):
|
||||
|
||||
+72
-34
@@ -382,6 +382,11 @@ class BaseRuleContents(ABC):
|
||||
def name(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def version_lock(self):
|
||||
pass
|
||||
|
||||
def lock_info(self, bump=True) -> dict:
|
||||
version = self.autobumped_version if bump else (self.latest_version or 1)
|
||||
contents = {"rule_name": self.name, "sha256": self.sha256(), "version": version}
|
||||
@@ -391,9 +396,7 @@ class BaseRuleContents(ABC):
|
||||
@property
|
||||
def is_dirty(self) -> Optional[bool]:
|
||||
"""Determine if the rule has changed since its version was locked."""
|
||||
from .version_lock import get_locked_hash
|
||||
|
||||
existing_sha256 = get_locked_hash(self.id, self.metadata.get('min_stack_version'))
|
||||
existing_sha256 = self.version_lock.get_locked_hash(self.id, self.metadata.get('min_stack_version'))
|
||||
|
||||
if existing_sha256 is not None:
|
||||
return existing_sha256 != self.sha256()
|
||||
@@ -401,9 +404,7 @@ class BaseRuleContents(ABC):
|
||||
@property
|
||||
def latest_version(self) -> Optional[int]:
|
||||
"""Retrieve the latest known version of the rule."""
|
||||
from .version_lock import get_locked_version
|
||||
|
||||
return get_locked_version(self.id, self.metadata.get('min_stack_version'))
|
||||
return self.version_lock.get_locked_version(self.id, self.metadata.get('min_stack_version'))
|
||||
|
||||
@property
|
||||
def autobumped_version(self) -> Optional[int]:
|
||||
@@ -443,6 +444,23 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin):
|
||||
"""Rule object which maps directly to the TOML layout."""
|
||||
metadata: RuleMeta
|
||||
data: AnyRuleData = field(metadata=dict(data_key="rule"))
|
||||
# _version_lock: Optional[Any] = None
|
||||
|
||||
@cached_property
|
||||
def version_lock(self):
|
||||
# VersionLock
|
||||
from .version_lock import default_version_lock
|
||||
|
||||
return getattr(self, '_version_lock', None) or default_version_lock
|
||||
|
||||
def set_version_lock(self, value):
|
||||
from .version_lock import VersionLock
|
||||
|
||||
if value and not isinstance(value, VersionLock):
|
||||
raise TypeError(f'version lock property must be set with VersionLock objects only. Got {type(value)}')
|
||||
|
||||
# circumvent frozen class
|
||||
self.__dict__['_version_lock'] = value
|
||||
|
||||
@classmethod
|
||||
def all_rule_types(cls) -> set:
|
||||
@@ -534,12 +552,59 @@ class TOMLRule:
|
||||
f.write('\n')
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DeprecatedRuleContents(BaseRuleContents):
|
||||
metadata: dict
|
||||
data: dict
|
||||
|
||||
@cached_property
|
||||
def version_lock(self):
|
||||
# VersionLock
|
||||
from .version_lock import default_version_lock
|
||||
|
||||
return getattr(self, '_version_lock', None) or default_version_lock
|
||||
|
||||
def set_version_lock(self, value):
|
||||
from .version_lock import VersionLock
|
||||
|
||||
if value and not isinstance(value, VersionLock):
|
||||
raise TypeError(f'version lock property must be set with VersionLock objects only. Got {type(value)}')
|
||||
|
||||
# circumvent frozen class
|
||||
self.__dict__['_version_lock'] = value
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return self.data.get('rule_id')
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.data.get('name')
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, obj: dict):
|
||||
return cls(metadata=obj['metadata'], data=obj['rule'])
|
||||
|
||||
def to_api_format(self, include_version=True) -> dict:
|
||||
"""Convert the TOML rule to the API format."""
|
||||
converted = copy.deepcopy(self.data)
|
||||
if include_version:
|
||||
converted["version"] = self.autobumped_version
|
||||
|
||||
converted = self._post_dict_transform(converted)
|
||||
return converted
|
||||
|
||||
|
||||
class DeprecatedRule(dict):
|
||||
"""Minimal dict object for deprecated rule."""
|
||||
|
||||
def __init__(self, path: Path, *args, **kwargs):
|
||||
def __init__(self, path: Path, contents: DeprecatedRuleContents, *args, **kwargs):
|
||||
super(DeprecatedRule, self).__init__(*args, **kwargs)
|
||||
self.path = path
|
||||
self.contents = contents
|
||||
|
||||
def __repr__(self):
|
||||
return f'{type(self).__name__}(contents={self.contents}, path={self.path})'
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
@@ -549,33 +614,6 @@ class DeprecatedRule(dict):
|
||||
def name(self) -> str:
|
||||
return self.contents.name
|
||||
|
||||
@property
|
||||
def contents(self):
|
||||
@dataclass
|
||||
class Contents(BaseRuleContents):
|
||||
metadata: dict
|
||||
data: dict
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return self.data.get('rule_id')
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.data.get('name')
|
||||
|
||||
def to_api_format(self, include_version=True) -> dict:
|
||||
"""Convert the TOML rule to the API format."""
|
||||
converted = copy.deepcopy(self.data)
|
||||
if include_version:
|
||||
converted["version"] = self.autobumped_version
|
||||
|
||||
converted = self._post_dict_transform(converted)
|
||||
return converted
|
||||
|
||||
contents = Contents(self.get('metadata'), self.get('rule'))
|
||||
return contents
|
||||
|
||||
|
||||
def downgrade_contents_from_rule(rule: TOMLRule, target_version: str) -> dict:
|
||||
"""Generate the downgraded contents from a rule."""
|
||||
|
||||
@@ -15,7 +15,7 @@ import pytoml
|
||||
|
||||
from . import utils
|
||||
from .mappings import RtaMappings
|
||||
from .rule import DeprecatedRule, TOMLRule, TOMLRuleContents
|
||||
from .rule import DeprecatedRule, DeprecatedRuleContents, TOMLRule, TOMLRuleContents
|
||||
from .schemas import definitions
|
||||
from .utils import get_path, cached
|
||||
|
||||
@@ -76,6 +76,32 @@ def metadata_filter(**metadata) -> Callable[[TOMLRule], bool]:
|
||||
production_filter = metadata_filter(maturity="production")
|
||||
|
||||
|
||||
def load_locks_from_tag(remote: str, tag: str) -> (str, dict, dict):
|
||||
"""Loads version and deprecated lock files from git tag."""
|
||||
import json
|
||||
git = utils.make_git()
|
||||
|
||||
exists_args = ['ls-remote']
|
||||
if remote:
|
||||
exists_args.append(remote)
|
||||
exists_args.append(f'refs/tags/{tag}')
|
||||
|
||||
assert git(*exists_args), f'tag: {tag} does not exist in {remote or "local"}'
|
||||
|
||||
fetch_tags = ['fetch']
|
||||
if remote:
|
||||
fetch_tags += [remote, '--tags', '-f', tag]
|
||||
else:
|
||||
fetch_tags += ['--tags', '-f', tag]
|
||||
|
||||
git(*fetch_tags)
|
||||
|
||||
commit_hash = git('rev-list', '-1', tag)
|
||||
version = json.loads(git('show', f'{tag}:etc/version.lock.json'))
|
||||
deprecated = json.loads(git('show', f'{tag}:etc/deprecated_rules.json'))
|
||||
return commit_hash, version, deprecated
|
||||
|
||||
|
||||
@dataclass
|
||||
class BaseCollection:
|
||||
"""Base class for collections."""
|
||||
@@ -127,6 +153,7 @@ class RuleCollection(BaseCollection):
|
||||
self.frozen = False
|
||||
|
||||
self._toml_load_cache: Dict[Path, dict] = {}
|
||||
self._version_lock: Optional[dict] = None
|
||||
|
||||
for rule in (rules or []):
|
||||
self.add_rule(rule)
|
||||
@@ -193,15 +220,18 @@ class RuleCollection(BaseCollection):
|
||||
def load_dict(self, obj: dict, path: Optional[Path] = None) -> Union[TOMLRule, DeprecatedRule]:
|
||||
# bypass rule object load (load_dict) and load as a dict only
|
||||
if obj.get('metadata', {}).get('maturity', '') == 'deprecated':
|
||||
deprecated_rule = DeprecatedRule(path, **obj)
|
||||
contents = DeprecatedRuleContents.from_dict(obj)
|
||||
contents.set_version_lock(self._version_lock)
|
||||
deprecated_rule = DeprecatedRule(path, contents)
|
||||
self.add_deprecated_rule(deprecated_rule)
|
||||
return deprecated_rule
|
||||
else:
|
||||
# obj['_version_lock'] = self._version_lock
|
||||
contents = TOMLRuleContents.from_dict(obj)
|
||||
contents.set_version_lock(self._version_lock)
|
||||
rule = TOMLRule(path=path, contents=contents)
|
||||
self.add_rule(rule)
|
||||
|
||||
return rule
|
||||
return rule
|
||||
|
||||
def load_file(self, path: Path) -> Union[TOMLRule, DeprecatedRule]:
|
||||
try:
|
||||
@@ -225,11 +255,21 @@ class RuleCollection(BaseCollection):
|
||||
print(f"Error loading rule in {path}")
|
||||
raise
|
||||
|
||||
def load_git_branch(self, branch: str):
|
||||
def load_git_tag(self, branch: str, remote: Optional[str] = None):
|
||||
"""Load rules from a Git branch."""
|
||||
from .version_lock import VersionLock
|
||||
|
||||
commit_hash, v_lock, d_lock = load_locks_from_tag(remote, branch)
|
||||
|
||||
v_lock_name_prefix = f'{remote}/' if remote else ''
|
||||
v_lock_name = f'{v_lock_name_prefix}{branch}-{commit_hash}'
|
||||
|
||||
version_lock = VersionLock(version_lock=v_lock, deprecated_lock=d_lock, name=v_lock_name)
|
||||
self._version_lock = version_lock
|
||||
|
||||
git = utils.make_git()
|
||||
rules_dir = DEFAULT_RULES_DIR.relative_to(get_path("."))
|
||||
paths = git("ls-files", "--with-tree", branch, rules_dir).splitlines()
|
||||
paths = git("ls-tree", "-r", "--name-only", branch, rules_dir).splitlines()
|
||||
|
||||
for path in paths:
|
||||
path = Path(path)
|
||||
|
||||
+150
-134
@@ -24,143 +24,159 @@ def _convert_lock_version(stack_version: Optional[str]) -> Version:
|
||||
return max(Version(stack_version), MIN_LOCK_VERSION_DEFAULT)
|
||||
|
||||
|
||||
def get_locked_version(rule_id: str, min_stack_version: Optional[str] = None) -> Optional[int]:
|
||||
rules_versions = load_versions()
|
||||
|
||||
if rule_id in rules_versions:
|
||||
latest_version_info = rules_versions[rule_id]
|
||||
stack_version_info = latest_version_info.get("previous", {}).get(min_stack_version, latest_version_info)
|
||||
return stack_version_info['version']
|
||||
|
||||
|
||||
def get_locked_hash(rule_id: str, min_stack_version: Optional[str] = None) -> Optional[str]:
|
||||
rules_versions = load_versions()
|
||||
|
||||
# Get the version info matching the min_stack_version if present
|
||||
if rule_id in rules_versions:
|
||||
latest_version_info = rules_versions[rule_id]
|
||||
stack_version_info = latest_version_info.get("previous", {}).get(min_stack_version, latest_version_info)
|
||||
existing_sha256: str = stack_version_info['sha256']
|
||||
return existing_sha256
|
||||
|
||||
|
||||
def manage_versions(rules: RuleCollection,
|
||||
exclude_version_update=False, save_changes=False,
|
||||
verbose=True) -> (List[str], List[str], List[str]):
|
||||
"""Update the contents of the version.lock file and optionally save changes."""
|
||||
from .packaging import current_stack_version
|
||||
|
||||
current_versions = deepcopy(load_versions())
|
||||
versions_hash = dict_hash(current_versions)
|
||||
rule_deprecations = load_etc_dump(ETC_DEPRECATED_RULES_FILE)
|
||||
|
||||
verbose_echo = click.echo if verbose else (lambda x: None)
|
||||
|
||||
already_deprecated = set(rule_deprecations)
|
||||
deprecated_rules = set(rules.deprecated.id_map)
|
||||
new_rules = set(rule.id for rule in rules if rule.contents.latest_version is None) - deprecated_rules
|
||||
changed_rules = set(rule.id for rule in rules if rule.contents.is_dirty) - deprecated_rules
|
||||
|
||||
# manage deprecated rules
|
||||
newly_deprecated = deprecated_rules - already_deprecated
|
||||
|
||||
if not (new_rules or changed_rules or newly_deprecated):
|
||||
return list(changed_rules), list(new_rules), list(newly_deprecated)
|
||||
|
||||
verbose_echo('Rule changes detected!')
|
||||
|
||||
for rule in rules:
|
||||
if rule.contents.metadata.maturity == "production" or rule.id in newly_deprecated:
|
||||
# assume that older stacks are always locked first
|
||||
min_stack = _convert_lock_version(rule.contents.metadata.min_stack_version)
|
||||
|
||||
lock_info = rule.contents.lock_info(bump=not exclude_version_update)
|
||||
current_rule_lock: dict = current_versions.setdefault(rule.id, {})
|
||||
|
||||
# scenarios to handle, assuming older stacks are always locked first:
|
||||
# 1) no breaking changes ever made or the first time a rule is created
|
||||
# 2) on the latest, after a breaking change has been locked
|
||||
# 3) on the latest stack, locking in a breaking change
|
||||
# 4) on an old stack, after a breaking change has been made
|
||||
latest_locked_stack_version = _convert_lock_version(current_rule_lock.get("min_stack_version"))
|
||||
|
||||
if not current_rule_lock or min_stack == latest_locked_stack_version:
|
||||
# 1) no breaking changes ever made or the first time a rule is created
|
||||
# 2) on the latest, after a breaking change has been locked
|
||||
current_rule_lock.update(lock_info)
|
||||
|
||||
# add the min_stack_version to the lock if it's explicitly set
|
||||
if rule.contents.metadata.min_stack_version is not None:
|
||||
current_rule_lock["min_stack_version"] = str(min_stack)
|
||||
|
||||
elif min_stack > latest_locked_stack_version:
|
||||
# 3) on the latest stack, locking in a breaking change
|
||||
previous_lock_info = {
|
||||
"rule_name": current_rule_lock["rule_name"],
|
||||
"sha256": current_rule_lock["sha256"],
|
||||
"version": current_rule_lock["version"],
|
||||
}
|
||||
current_rule_lock.setdefault("previous", {})
|
||||
|
||||
# move the current locked info into the previous section
|
||||
current_rule_lock["previous"][str(latest_locked_stack_version)] = previous_lock_info
|
||||
|
||||
# overwrite the "latest" part of the lock at the top level
|
||||
current_rule_lock.update(lock_info, min_stack_version=str(min_stack))
|
||||
|
||||
elif min_stack < latest_locked_stack_version:
|
||||
# 4) on an old stack, after a breaking change has been made
|
||||
assert str(min_stack) in current_rule_lock.get("previous", {}), \
|
||||
f"Expected {rule.id} @ v{min_stack} in the rule lock"
|
||||
|
||||
# TODO: Figure out whether we support locking old versions and if we want to
|
||||
# "leave room" by skipping versions when breaking changes are made.
|
||||
# We can still inspect the version lock manually after locks are made,
|
||||
# since it's a good summary of everything that happens
|
||||
current_rule_lock["previous"][str(min_stack)] = lock_info
|
||||
continue
|
||||
else:
|
||||
raise RuntimeError("Unreachable code")
|
||||
|
||||
for rule in rules.deprecated:
|
||||
if rule.id in newly_deprecated:
|
||||
rule_deprecations[rule.id] = {
|
||||
"rule_name": rule.name,
|
||||
"stack_version": current_stack_version,
|
||||
"deprecation_date": rule.contents.metadata['deprecation_date']
|
||||
}
|
||||
|
||||
if save_changes or verbose:
|
||||
click.echo(f' - {len(changed_rules)} changed rules')
|
||||
click.echo(f' - {len(new_rules)} new rules')
|
||||
click.echo(f' - {len(newly_deprecated)} newly deprecated rules')
|
||||
|
||||
if not save_changes:
|
||||
verbose_echo('run `build-release --update-version-lock` to update version.lock.json and deprecated_rules.json')
|
||||
return list(changed_rules), list(new_rules), list(newly_deprecated)
|
||||
|
||||
new_hash = dict_hash(current_versions)
|
||||
|
||||
if versions_hash != new_hash:
|
||||
save_etc_dump(current_versions, ETC_VERSION_LOCK_FILE)
|
||||
click.echo('Updated version.lock.json file')
|
||||
|
||||
# reset the cache
|
||||
load_versions.clear()
|
||||
|
||||
if newly_deprecated:
|
||||
save_etc_dump(rule_deprecations, ETC_DEPRECATED_RULES_FILE)
|
||||
click.echo('Updated deprecated_rules.json file')
|
||||
|
||||
return changed_rules, list(new_rules), newly_deprecated
|
||||
|
||||
|
||||
@cached
|
||||
def load_versions():
|
||||
"""Load the versions file."""
|
||||
return load_etc_dump(ETC_VERSION_LOCK_FILE)
|
||||
|
||||
|
||||
def save_versions(current_versions: dict):
|
||||
save_etc_dump(current_versions, ETC_VERSION_LOCK_FILE)
|
||||
print('Updated version.lock.json file')
|
||||
class VersionLock:
|
||||
"""Version handling for rule files and collections."""
|
||||
|
||||
def __init__(self, version_lock_file: Optional[str] = None, deprecated_lock_file: Optional[str] = None,
|
||||
version_lock: Optional[dict] = None, deprecated_lock: Optional[dict] = None,
|
||||
name: Optional[str] = None):
|
||||
assert (version_lock_file or version_lock), 'Must provide version lock file or contents'
|
||||
assert (deprecated_lock_file or deprecated_lock), 'Must provide deprecated lock file or contents'
|
||||
|
||||
self.name = name
|
||||
self.version_lock_file = version_lock_file
|
||||
self.deprecated_lock_file = deprecated_lock_file
|
||||
|
||||
self.version_lock = load_etc_dump(self.version_lock_file) if version_lock_file else version_lock
|
||||
self.deprecated_lock = load_etc_dump(self.deprecated_lock_file) if deprecated_lock_file else deprecated_lock
|
||||
|
||||
def save_versions(self, current_versions: dict):
|
||||
save_etc_dump(current_versions, self.version_lock_file)
|
||||
print('Updated version.lock.json file')
|
||||
|
||||
def get_locked_version(self, rule_id: str, min_stack_version: Optional[str] = None) -> Optional[int]:
|
||||
if rule_id in self.version_lock:
|
||||
latest_version_info = self.version_lock[rule_id]
|
||||
stack_version_info = latest_version_info.get("previous", {}).get(min_stack_version, latest_version_info)
|
||||
return stack_version_info['version']
|
||||
|
||||
def get_locked_hash(self, rule_id: str, min_stack_version: Optional[str] = None) -> Optional[str]:
|
||||
"""Get the version info matching the min_stack_version if present."""
|
||||
if rule_id in self.version_lock:
|
||||
latest_version_info = self.version_lock[rule_id]
|
||||
stack_version_info = latest_version_info.get("previous", {}).get(min_stack_version, latest_version_info)
|
||||
existing_sha256: str = stack_version_info['sha256']
|
||||
return existing_sha256
|
||||
|
||||
def manage_versions(self, rules: RuleCollection,
|
||||
exclude_version_update=False, save_changes=False,
|
||||
verbose=True) -> (List[str], List[str], List[str]):
|
||||
"""Update the contents of the version.lock file and optionally save changes."""
|
||||
from .packaging import current_stack_version
|
||||
|
||||
current_version_lock = deepcopy(self.version_lock)
|
||||
version_lock_hash = dict_hash(current_version_lock)
|
||||
current_deprecated_lock = deepcopy(self.deprecated_lock)
|
||||
|
||||
verbose_echo = click.echo if verbose else (lambda x: None)
|
||||
|
||||
already_deprecated = set(current_deprecated_lock)
|
||||
deprecated_rules = set(rules.deprecated.id_map)
|
||||
new_rules = set(rule.id for rule in rules if rule.contents.latest_version is None) - deprecated_rules
|
||||
changed_rules = set(rule.id for rule in rules if rule.contents.is_dirty) - deprecated_rules
|
||||
|
||||
# manage deprecated rules
|
||||
newly_deprecated = deprecated_rules - already_deprecated
|
||||
|
||||
if not (new_rules or changed_rules or newly_deprecated):
|
||||
return list(changed_rules), list(new_rules), list(newly_deprecated)
|
||||
|
||||
verbose_echo('Rule changes detected!')
|
||||
|
||||
for rule in rules:
|
||||
if rule.contents.metadata.maturity == "production" or rule.id in newly_deprecated:
|
||||
# assume that older stacks are always locked first
|
||||
min_stack = _convert_lock_version(rule.contents.metadata.min_stack_version)
|
||||
|
||||
lock_info = rule.contents.lock_info(bump=not exclude_version_update)
|
||||
current_rule_lock: dict = current_version_lock.setdefault(rule.id, {})
|
||||
|
||||
# scenarios to handle, assuming older stacks are always locked first:
|
||||
# 1) no breaking changes ever made or the first time a rule is created
|
||||
# 2) on the latest, after a breaking change has been locked
|
||||
# 3) on the latest stack, locking in a breaking change
|
||||
# 4) on an old stack, after a breaking change has been made
|
||||
latest_locked_stack_version = _convert_lock_version(current_rule_lock.get("min_stack_version"))
|
||||
|
||||
if not current_rule_lock or min_stack == latest_locked_stack_version:
|
||||
# 1) no breaking changes ever made or the first time a rule is created
|
||||
# 2) on the latest, after a breaking change has been locked
|
||||
current_rule_lock.update(lock_info)
|
||||
|
||||
# add the min_stack_version to the lock if it's explicitly set
|
||||
if rule.contents.metadata.min_stack_version is not None:
|
||||
current_rule_lock["min_stack_version"] = str(min_stack)
|
||||
|
||||
elif min_stack > latest_locked_stack_version:
|
||||
# 3) on the latest stack, locking in a breaking change
|
||||
previous_lock_info = {
|
||||
"rule_name": current_rule_lock["rule_name"],
|
||||
"sha256": current_rule_lock["sha256"],
|
||||
"version": current_rule_lock["version"],
|
||||
}
|
||||
current_rule_lock.setdefault("previous", {})
|
||||
|
||||
# move the current locked info into the previous section
|
||||
current_rule_lock["previous"][str(latest_locked_stack_version)] = previous_lock_info
|
||||
|
||||
# overwrite the "latest" part of the lock at the top level
|
||||
current_rule_lock.update(lock_info, min_stack_version=str(min_stack))
|
||||
|
||||
elif min_stack < latest_locked_stack_version:
|
||||
# 4) on an old stack, after a breaking change has been made
|
||||
assert str(min_stack) in current_rule_lock.get("previous", {}), \
|
||||
f"Expected {rule.id} @ v{min_stack} in the rule lock"
|
||||
|
||||
# TODO: Figure out whether we support locking old versions and if we want to
|
||||
# "leave room" by skipping versions when breaking changes are made.
|
||||
# We can still inspect the version lock manually after locks are made,
|
||||
# since it's a good summary of everything that happens
|
||||
current_rule_lock["previous"][str(min_stack)] = lock_info
|
||||
continue
|
||||
else:
|
||||
raise RuntimeError("Unreachable code")
|
||||
|
||||
for rule in rules.deprecated:
|
||||
if rule.id in newly_deprecated:
|
||||
current_deprecated_lock[rule.id] = {
|
||||
"rule_name": rule.name,
|
||||
"stack_version": current_stack_version,
|
||||
"deprecation_date": rule.contents.metadata['deprecation_date']
|
||||
}
|
||||
|
||||
if save_changes or verbose:
|
||||
click.echo(f' - {len(changed_rules)} changed rules')
|
||||
click.echo(f' - {len(new_rules)} new rules')
|
||||
click.echo(f' - {len(newly_deprecated)} newly deprecated rules')
|
||||
|
||||
if not save_changes:
|
||||
verbose_echo(
|
||||
'run `build-release --update-version-lock` to update version.lock.json and deprecated_rules.json')
|
||||
return list(changed_rules), list(new_rules), list(newly_deprecated)
|
||||
|
||||
new_hash = dict_hash(current_version_lock)
|
||||
|
||||
if version_lock_hash != new_hash:
|
||||
save_etc_dump(current_version_lock, ETC_VERSION_LOCK_FILE)
|
||||
click.echo('Updated version.lock.json file')
|
||||
|
||||
# reset local version lock
|
||||
self.version_lock = current_version_lock
|
||||
|
||||
if newly_deprecated:
|
||||
save_etc_dump(current_deprecated_lock, ETC_DEPRECATED_RULES_FILE)
|
||||
click.echo('Updated deprecated_rules.json file')
|
||||
|
||||
# reset local version lock
|
||||
self.deprecated_lock = current_deprecated_lock
|
||||
|
||||
return changed_rules, list(new_rules), newly_deprecated
|
||||
|
||||
|
||||
default_version_lock = VersionLock(ETC_VERSION_LOCK_FILE, ETC_DEPRECATED_RULES_FILE, name='default')
|
||||
|
||||
@@ -14,7 +14,7 @@ import eql
|
||||
|
||||
import kql
|
||||
from detection_rules import attack
|
||||
from detection_rules.version_lock import load_versions
|
||||
from detection_rules.version_lock import default_version_lock
|
||||
from detection_rules.rule import QueryRuleData
|
||||
from detection_rules.rule_loader import FILE_PATTERN
|
||||
from detection_rules.schemas import definitions
|
||||
@@ -377,7 +377,7 @@ class TestRuleMetadata(BaseRuleTest):
|
||||
|
||||
def test_deprecated_rules(self):
|
||||
"""Test that deprecated rules are properly handled."""
|
||||
versions = load_versions()
|
||||
versions = default_version_lock.version_lock
|
||||
deprecations = load_etc_dump('deprecated_rules.json')
|
||||
deprecated_rules = {}
|
||||
rules_path = get_path('rules')
|
||||
@@ -443,7 +443,7 @@ class TestRuleMetadata(BaseRuleTest):
|
||||
|
||||
def test_rule_demotions(self):
|
||||
"""Test to ensure a locked rule is not dropped to development, only deprecated"""
|
||||
versions = load_versions()
|
||||
versions = default_version_lock.version_lock
|
||||
failures = []
|
||||
|
||||
for rule in self.all_rules:
|
||||
|
||||
Reference in New Issue
Block a user