From 0efae3a52ed20d46c48245f15273dc6083d2e951 Mon Sep 17 00:00:00 2001 From: Justin Ibarra Date: Mon, 15 Nov 2021 08:46:12 -0900 Subject: [PATCH] Move version lock code to object for portability (#1553) * Move version lock code to object for portability * use cached_property to bypass frozen dataclass and set property * replace load_versions function --- detection_rules/devtools.py | 16 +- detection_rules/packaging.py | 4 +- detection_rules/rule.py | 106 ++++++++---- detection_rules/rule_loader.py | 52 +++++- detection_rules/version_lock.py | 284 +++++++++++++++++--------------- tests/test_all_rules.py | 6 +- 6 files changed, 277 insertions(+), 191 deletions(-) diff --git a/detection_rules/devtools.py b/detection_rules/devtools.py index 6d74b76da..40fe30b5c 100644 --- a/detection_rules/devtools.py +++ b/detection_rules/devtools.py @@ -29,7 +29,7 @@ from .ghwrap import GithubClient from .main import root from .misc import PYTHON_LICENSE, add_client, client_error from .packaging import PACKAGE_FILE, Package, RELEASE_DIR, current_stack_version -from .version_lock import manage_versions, load_versions +from .version_lock import default_version_lock from .rule import AnyRuleData, BaseRuleData, QueryRuleData, TOMLRule from .rule_loader import RuleCollection, production_filter from .schemas import definitions @@ -71,7 +71,7 @@ def build_release(config_file, update_version_lock, release=None, verbose=True): package = Package.from_config(config, verbose=verbose) if update_version_lock: - manage_versions(package.rules, save_changes=True, verbose=verbose) + default_version_lock.manage_versions(package.rules, save_changes=True, verbose=verbose) package.save(verbose=verbose) @@ -176,8 +176,6 @@ def prune_staging_area(target_stack_version: str, dry_run: bool): @click.argument('rule-ids', nargs=-1, required=False) def update_lock_versions(rule_ids): """Update rule hashes in version.lock.json file without bumping version.""" - from .packaging import manage_versions - rules = RuleCollection.default() if rule_ids: @@ -189,7 +187,7 @@ def update_lock_versions(rule_ids): return # this command may not function as expected anymore due to previous changes eliminating the use of add_new=False - changed, new, _ = manage_versions(rules, exclude_version_update=True, save_changes=True) + changed, new, _ = default_version_lock.manage_versions(rules, exclude_version_update=True, save_changes=True) if not changed: click.echo('No hashes updated') @@ -648,7 +646,7 @@ def search_rule_prs(ctx, no_loop, query, columns, language, token, threads): @click.pass_context def deprecate_rule(ctx: click.Context, rule_file: Path): """Deprecate a rule.""" - version_info = load_versions() + version_info = default_version_lock.version_lock rule_collection = RuleCollection() contents = rule_collection.load_file(rule_file).contents rule = TOMLRule(path=rule_file, contents=contents) @@ -659,12 +657,6 @@ def deprecate_rule(ctx: click.Context, rule_file: Path): ctx.exit() today = time.strftime('%Y/%m/%d') - - new_meta = { - 'updated_date': today, - 'deprecation_date': today, - 'maturity': 'deprecated' - } deprecated_path = get_path('rules', '_deprecated', rule_file.name) # create the new rule and save it diff --git a/detection_rules/packaging.py b/detection_rules/packaging.py index 0137f7dce..479e12258 100644 --- a/detection_rules/packaging.py +++ b/detection_rules/packaging.py @@ -23,7 +23,7 @@ from .rule import TOMLRule, QueryRuleData, ThreatMapping from .rule_loader import DeprecatedCollection, RuleCollection, DEFAULT_RULES_DIR from .schemas import definitions from .utils import Ndjson, get_path, get_etc_path, load_etc_dump -from .version_lock import manage_versions +from .version_lock import default_version_lock RELEASE_DIR = get_path("releases") PACKAGE_FILE = get_etc_path('packages.yml') @@ -93,7 +93,7 @@ class Package(object): self.rules = self.rules.filter(lambda r: max_version >= r.contents.latest_version) self.changed_ids, self.new_ids, self.removed_ids = \ - manage_versions(self.rules, verbose=verbose, save_changes=False) + default_version_lock.manage_versions(self.rules, verbose=verbose, save_changes=False) @classmethod def load_configs(cls): diff --git a/detection_rules/rule.py b/detection_rules/rule.py index b99fea990..6523f36e3 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -382,6 +382,11 @@ class BaseRuleContents(ABC): def name(self): pass + @property + @abstractmethod + def version_lock(self): + pass + def lock_info(self, bump=True) -> dict: version = self.autobumped_version if bump else (self.latest_version or 1) contents = {"rule_name": self.name, "sha256": self.sha256(), "version": version} @@ -391,9 +396,7 @@ class BaseRuleContents(ABC): @property def is_dirty(self) -> Optional[bool]: """Determine if the rule has changed since its version was locked.""" - from .version_lock import get_locked_hash - - existing_sha256 = get_locked_hash(self.id, self.metadata.get('min_stack_version')) + existing_sha256 = self.version_lock.get_locked_hash(self.id, self.metadata.get('min_stack_version')) if existing_sha256 is not None: return existing_sha256 != self.sha256() @@ -401,9 +404,7 @@ class BaseRuleContents(ABC): @property def latest_version(self) -> Optional[int]: """Retrieve the latest known version of the rule.""" - from .version_lock import get_locked_version - - return get_locked_version(self.id, self.metadata.get('min_stack_version')) + return self.version_lock.get_locked_version(self.id, self.metadata.get('min_stack_version')) @property def autobumped_version(self) -> Optional[int]: @@ -443,6 +444,23 @@ class TOMLRuleContents(BaseRuleContents, MarshmallowDataclassMixin): """Rule object which maps directly to the TOML layout.""" metadata: RuleMeta data: AnyRuleData = field(metadata=dict(data_key="rule")) + # _version_lock: Optional[Any] = None + + @cached_property + def version_lock(self): + # VersionLock + from .version_lock import default_version_lock + + return getattr(self, '_version_lock', None) or default_version_lock + + def set_version_lock(self, value): + from .version_lock import VersionLock + + if value and not isinstance(value, VersionLock): + raise TypeError(f'version lock property must be set with VersionLock objects only. Got {type(value)}') + + # circumvent frozen class + self.__dict__['_version_lock'] = value @classmethod def all_rule_types(cls) -> set: @@ -534,12 +552,59 @@ class TOMLRule: f.write('\n') +@dataclass(frozen=True) +class DeprecatedRuleContents(BaseRuleContents): + metadata: dict + data: dict + + @cached_property + def version_lock(self): + # VersionLock + from .version_lock import default_version_lock + + return getattr(self, '_version_lock', None) or default_version_lock + + def set_version_lock(self, value): + from .version_lock import VersionLock + + if value and not isinstance(value, VersionLock): + raise TypeError(f'version lock property must be set with VersionLock objects only. Got {type(value)}') + + # circumvent frozen class + self.__dict__['_version_lock'] = value + + @property + def id(self) -> str: + return self.data.get('rule_id') + + @property + def name(self) -> str: + return self.data.get('name') + + @classmethod + def from_dict(cls, obj: dict): + return cls(metadata=obj['metadata'], data=obj['rule']) + + def to_api_format(self, include_version=True) -> dict: + """Convert the TOML rule to the API format.""" + converted = copy.deepcopy(self.data) + if include_version: + converted["version"] = self.autobumped_version + + converted = self._post_dict_transform(converted) + return converted + + class DeprecatedRule(dict): """Minimal dict object for deprecated rule.""" - def __init__(self, path: Path, *args, **kwargs): + def __init__(self, path: Path, contents: DeprecatedRuleContents, *args, **kwargs): super(DeprecatedRule, self).__init__(*args, **kwargs) self.path = path + self.contents = contents + + def __repr__(self): + return f'{type(self).__name__}(contents={self.contents}, path={self.path})' @property def id(self) -> str: @@ -549,33 +614,6 @@ class DeprecatedRule(dict): def name(self) -> str: return self.contents.name - @property - def contents(self): - @dataclass - class Contents(BaseRuleContents): - metadata: dict - data: dict - - @property - def id(self) -> str: - return self.data.get('rule_id') - - @property - def name(self) -> str: - return self.data.get('name') - - def to_api_format(self, include_version=True) -> dict: - """Convert the TOML rule to the API format.""" - converted = copy.deepcopy(self.data) - if include_version: - converted["version"] = self.autobumped_version - - converted = self._post_dict_transform(converted) - return converted - - contents = Contents(self.get('metadata'), self.get('rule')) - return contents - def downgrade_contents_from_rule(rule: TOMLRule, target_version: str) -> dict: """Generate the downgraded contents from a rule.""" diff --git a/detection_rules/rule_loader.py b/detection_rules/rule_loader.py index de35d131c..129abc32e 100644 --- a/detection_rules/rule_loader.py +++ b/detection_rules/rule_loader.py @@ -15,7 +15,7 @@ import pytoml from . import utils from .mappings import RtaMappings -from .rule import DeprecatedRule, TOMLRule, TOMLRuleContents +from .rule import DeprecatedRule, DeprecatedRuleContents, TOMLRule, TOMLRuleContents from .schemas import definitions from .utils import get_path, cached @@ -76,6 +76,32 @@ def metadata_filter(**metadata) -> Callable[[TOMLRule], bool]: production_filter = metadata_filter(maturity="production") +def load_locks_from_tag(remote: str, tag: str) -> (str, dict, dict): + """Loads version and deprecated lock files from git tag.""" + import json + git = utils.make_git() + + exists_args = ['ls-remote'] + if remote: + exists_args.append(remote) + exists_args.append(f'refs/tags/{tag}') + + assert git(*exists_args), f'tag: {tag} does not exist in {remote or "local"}' + + fetch_tags = ['fetch'] + if remote: + fetch_tags += [remote, '--tags', '-f', tag] + else: + fetch_tags += ['--tags', '-f', tag] + + git(*fetch_tags) + + commit_hash = git('rev-list', '-1', tag) + version = json.loads(git('show', f'{tag}:etc/version.lock.json')) + deprecated = json.loads(git('show', f'{tag}:etc/deprecated_rules.json')) + return commit_hash, version, deprecated + + @dataclass class BaseCollection: """Base class for collections.""" @@ -127,6 +153,7 @@ class RuleCollection(BaseCollection): self.frozen = False self._toml_load_cache: Dict[Path, dict] = {} + self._version_lock: Optional[dict] = None for rule in (rules or []): self.add_rule(rule) @@ -193,15 +220,18 @@ class RuleCollection(BaseCollection): def load_dict(self, obj: dict, path: Optional[Path] = None) -> Union[TOMLRule, DeprecatedRule]: # bypass rule object load (load_dict) and load as a dict only if obj.get('metadata', {}).get('maturity', '') == 'deprecated': - deprecated_rule = DeprecatedRule(path, **obj) + contents = DeprecatedRuleContents.from_dict(obj) + contents.set_version_lock(self._version_lock) + deprecated_rule = DeprecatedRule(path, contents) self.add_deprecated_rule(deprecated_rule) return deprecated_rule else: + # obj['_version_lock'] = self._version_lock contents = TOMLRuleContents.from_dict(obj) + contents.set_version_lock(self._version_lock) rule = TOMLRule(path=path, contents=contents) self.add_rule(rule) - - return rule + return rule def load_file(self, path: Path) -> Union[TOMLRule, DeprecatedRule]: try: @@ -225,11 +255,21 @@ class RuleCollection(BaseCollection): print(f"Error loading rule in {path}") raise - def load_git_branch(self, branch: str): + def load_git_tag(self, branch: str, remote: Optional[str] = None): """Load rules from a Git branch.""" + from .version_lock import VersionLock + + commit_hash, v_lock, d_lock = load_locks_from_tag(remote, branch) + + v_lock_name_prefix = f'{remote}/' if remote else '' + v_lock_name = f'{v_lock_name_prefix}{branch}-{commit_hash}' + + version_lock = VersionLock(version_lock=v_lock, deprecated_lock=d_lock, name=v_lock_name) + self._version_lock = version_lock + git = utils.make_git() rules_dir = DEFAULT_RULES_DIR.relative_to(get_path(".")) - paths = git("ls-files", "--with-tree", branch, rules_dir).splitlines() + paths = git("ls-tree", "-r", "--name-only", branch, rules_dir).splitlines() for path in paths: path = Path(path) diff --git a/detection_rules/version_lock.py b/detection_rules/version_lock.py index fa940d5ca..66dae674f 100644 --- a/detection_rules/version_lock.py +++ b/detection_rules/version_lock.py @@ -24,143 +24,159 @@ def _convert_lock_version(stack_version: Optional[str]) -> Version: return max(Version(stack_version), MIN_LOCK_VERSION_DEFAULT) -def get_locked_version(rule_id: str, min_stack_version: Optional[str] = None) -> Optional[int]: - rules_versions = load_versions() - - if rule_id in rules_versions: - latest_version_info = rules_versions[rule_id] - stack_version_info = latest_version_info.get("previous", {}).get(min_stack_version, latest_version_info) - return stack_version_info['version'] - - -def get_locked_hash(rule_id: str, min_stack_version: Optional[str] = None) -> Optional[str]: - rules_versions = load_versions() - - # Get the version info matching the min_stack_version if present - if rule_id in rules_versions: - latest_version_info = rules_versions[rule_id] - stack_version_info = latest_version_info.get("previous", {}).get(min_stack_version, latest_version_info) - existing_sha256: str = stack_version_info['sha256'] - return existing_sha256 - - -def manage_versions(rules: RuleCollection, - exclude_version_update=False, save_changes=False, - verbose=True) -> (List[str], List[str], List[str]): - """Update the contents of the version.lock file and optionally save changes.""" - from .packaging import current_stack_version - - current_versions = deepcopy(load_versions()) - versions_hash = dict_hash(current_versions) - rule_deprecations = load_etc_dump(ETC_DEPRECATED_RULES_FILE) - - verbose_echo = click.echo if verbose else (lambda x: None) - - already_deprecated = set(rule_deprecations) - deprecated_rules = set(rules.deprecated.id_map) - new_rules = set(rule.id for rule in rules if rule.contents.latest_version is None) - deprecated_rules - changed_rules = set(rule.id for rule in rules if rule.contents.is_dirty) - deprecated_rules - - # manage deprecated rules - newly_deprecated = deprecated_rules - already_deprecated - - if not (new_rules or changed_rules or newly_deprecated): - return list(changed_rules), list(new_rules), list(newly_deprecated) - - verbose_echo('Rule changes detected!') - - for rule in rules: - if rule.contents.metadata.maturity == "production" or rule.id in newly_deprecated: - # assume that older stacks are always locked first - min_stack = _convert_lock_version(rule.contents.metadata.min_stack_version) - - lock_info = rule.contents.lock_info(bump=not exclude_version_update) - current_rule_lock: dict = current_versions.setdefault(rule.id, {}) - - # scenarios to handle, assuming older stacks are always locked first: - # 1) no breaking changes ever made or the first time a rule is created - # 2) on the latest, after a breaking change has been locked - # 3) on the latest stack, locking in a breaking change - # 4) on an old stack, after a breaking change has been made - latest_locked_stack_version = _convert_lock_version(current_rule_lock.get("min_stack_version")) - - if not current_rule_lock or min_stack == latest_locked_stack_version: - # 1) no breaking changes ever made or the first time a rule is created - # 2) on the latest, after a breaking change has been locked - current_rule_lock.update(lock_info) - - # add the min_stack_version to the lock if it's explicitly set - if rule.contents.metadata.min_stack_version is not None: - current_rule_lock["min_stack_version"] = str(min_stack) - - elif min_stack > latest_locked_stack_version: - # 3) on the latest stack, locking in a breaking change - previous_lock_info = { - "rule_name": current_rule_lock["rule_name"], - "sha256": current_rule_lock["sha256"], - "version": current_rule_lock["version"], - } - current_rule_lock.setdefault("previous", {}) - - # move the current locked info into the previous section - current_rule_lock["previous"][str(latest_locked_stack_version)] = previous_lock_info - - # overwrite the "latest" part of the lock at the top level - current_rule_lock.update(lock_info, min_stack_version=str(min_stack)) - - elif min_stack < latest_locked_stack_version: - # 4) on an old stack, after a breaking change has been made - assert str(min_stack) in current_rule_lock.get("previous", {}), \ - f"Expected {rule.id} @ v{min_stack} in the rule lock" - - # TODO: Figure out whether we support locking old versions and if we want to - # "leave room" by skipping versions when breaking changes are made. - # We can still inspect the version lock manually after locks are made, - # since it's a good summary of everything that happens - current_rule_lock["previous"][str(min_stack)] = lock_info - continue - else: - raise RuntimeError("Unreachable code") - - for rule in rules.deprecated: - if rule.id in newly_deprecated: - rule_deprecations[rule.id] = { - "rule_name": rule.name, - "stack_version": current_stack_version, - "deprecation_date": rule.contents.metadata['deprecation_date'] - } - - if save_changes or verbose: - click.echo(f' - {len(changed_rules)} changed rules') - click.echo(f' - {len(new_rules)} new rules') - click.echo(f' - {len(newly_deprecated)} newly deprecated rules') - - if not save_changes: - verbose_echo('run `build-release --update-version-lock` to update version.lock.json and deprecated_rules.json') - return list(changed_rules), list(new_rules), list(newly_deprecated) - - new_hash = dict_hash(current_versions) - - if versions_hash != new_hash: - save_etc_dump(current_versions, ETC_VERSION_LOCK_FILE) - click.echo('Updated version.lock.json file') - - # reset the cache - load_versions.clear() - - if newly_deprecated: - save_etc_dump(rule_deprecations, ETC_DEPRECATED_RULES_FILE) - click.echo('Updated deprecated_rules.json file') - - return changed_rules, list(new_rules), newly_deprecated - - @cached def load_versions(): """Load the versions file.""" return load_etc_dump(ETC_VERSION_LOCK_FILE) -def save_versions(current_versions: dict): - save_etc_dump(current_versions, ETC_VERSION_LOCK_FILE) - print('Updated version.lock.json file') +class VersionLock: + """Version handling for rule files and collections.""" + + def __init__(self, version_lock_file: Optional[str] = None, deprecated_lock_file: Optional[str] = None, + version_lock: Optional[dict] = None, deprecated_lock: Optional[dict] = None, + name: Optional[str] = None): + assert (version_lock_file or version_lock), 'Must provide version lock file or contents' + assert (deprecated_lock_file or deprecated_lock), 'Must provide deprecated lock file or contents' + + self.name = name + self.version_lock_file = version_lock_file + self.deprecated_lock_file = deprecated_lock_file + + self.version_lock = load_etc_dump(self.version_lock_file) if version_lock_file else version_lock + self.deprecated_lock = load_etc_dump(self.deprecated_lock_file) if deprecated_lock_file else deprecated_lock + + def save_versions(self, current_versions: dict): + save_etc_dump(current_versions, self.version_lock_file) + print('Updated version.lock.json file') + + def get_locked_version(self, rule_id: str, min_stack_version: Optional[str] = None) -> Optional[int]: + if rule_id in self.version_lock: + latest_version_info = self.version_lock[rule_id] + stack_version_info = latest_version_info.get("previous", {}).get(min_stack_version, latest_version_info) + return stack_version_info['version'] + + def get_locked_hash(self, rule_id: str, min_stack_version: Optional[str] = None) -> Optional[str]: + """Get the version info matching the min_stack_version if present.""" + if rule_id in self.version_lock: + latest_version_info = self.version_lock[rule_id] + stack_version_info = latest_version_info.get("previous", {}).get(min_stack_version, latest_version_info) + existing_sha256: str = stack_version_info['sha256'] + return existing_sha256 + + def manage_versions(self, rules: RuleCollection, + exclude_version_update=False, save_changes=False, + verbose=True) -> (List[str], List[str], List[str]): + """Update the contents of the version.lock file and optionally save changes.""" + from .packaging import current_stack_version + + current_version_lock = deepcopy(self.version_lock) + version_lock_hash = dict_hash(current_version_lock) + current_deprecated_lock = deepcopy(self.deprecated_lock) + + verbose_echo = click.echo if verbose else (lambda x: None) + + already_deprecated = set(current_deprecated_lock) + deprecated_rules = set(rules.deprecated.id_map) + new_rules = set(rule.id for rule in rules if rule.contents.latest_version is None) - deprecated_rules + changed_rules = set(rule.id for rule in rules if rule.contents.is_dirty) - deprecated_rules + + # manage deprecated rules + newly_deprecated = deprecated_rules - already_deprecated + + if not (new_rules or changed_rules or newly_deprecated): + return list(changed_rules), list(new_rules), list(newly_deprecated) + + verbose_echo('Rule changes detected!') + + for rule in rules: + if rule.contents.metadata.maturity == "production" or rule.id in newly_deprecated: + # assume that older stacks are always locked first + min_stack = _convert_lock_version(rule.contents.metadata.min_stack_version) + + lock_info = rule.contents.lock_info(bump=not exclude_version_update) + current_rule_lock: dict = current_version_lock.setdefault(rule.id, {}) + + # scenarios to handle, assuming older stacks are always locked first: + # 1) no breaking changes ever made or the first time a rule is created + # 2) on the latest, after a breaking change has been locked + # 3) on the latest stack, locking in a breaking change + # 4) on an old stack, after a breaking change has been made + latest_locked_stack_version = _convert_lock_version(current_rule_lock.get("min_stack_version")) + + if not current_rule_lock or min_stack == latest_locked_stack_version: + # 1) no breaking changes ever made or the first time a rule is created + # 2) on the latest, after a breaking change has been locked + current_rule_lock.update(lock_info) + + # add the min_stack_version to the lock if it's explicitly set + if rule.contents.metadata.min_stack_version is not None: + current_rule_lock["min_stack_version"] = str(min_stack) + + elif min_stack > latest_locked_stack_version: + # 3) on the latest stack, locking in a breaking change + previous_lock_info = { + "rule_name": current_rule_lock["rule_name"], + "sha256": current_rule_lock["sha256"], + "version": current_rule_lock["version"], + } + current_rule_lock.setdefault("previous", {}) + + # move the current locked info into the previous section + current_rule_lock["previous"][str(latest_locked_stack_version)] = previous_lock_info + + # overwrite the "latest" part of the lock at the top level + current_rule_lock.update(lock_info, min_stack_version=str(min_stack)) + + elif min_stack < latest_locked_stack_version: + # 4) on an old stack, after a breaking change has been made + assert str(min_stack) in current_rule_lock.get("previous", {}), \ + f"Expected {rule.id} @ v{min_stack} in the rule lock" + + # TODO: Figure out whether we support locking old versions and if we want to + # "leave room" by skipping versions when breaking changes are made. + # We can still inspect the version lock manually after locks are made, + # since it's a good summary of everything that happens + current_rule_lock["previous"][str(min_stack)] = lock_info + continue + else: + raise RuntimeError("Unreachable code") + + for rule in rules.deprecated: + if rule.id in newly_deprecated: + current_deprecated_lock[rule.id] = { + "rule_name": rule.name, + "stack_version": current_stack_version, + "deprecation_date": rule.contents.metadata['deprecation_date'] + } + + if save_changes or verbose: + click.echo(f' - {len(changed_rules)} changed rules') + click.echo(f' - {len(new_rules)} new rules') + click.echo(f' - {len(newly_deprecated)} newly deprecated rules') + + if not save_changes: + verbose_echo( + 'run `build-release --update-version-lock` to update version.lock.json and deprecated_rules.json') + return list(changed_rules), list(new_rules), list(newly_deprecated) + + new_hash = dict_hash(current_version_lock) + + if version_lock_hash != new_hash: + save_etc_dump(current_version_lock, ETC_VERSION_LOCK_FILE) + click.echo('Updated version.lock.json file') + + # reset local version lock + self.version_lock = current_version_lock + + if newly_deprecated: + save_etc_dump(current_deprecated_lock, ETC_DEPRECATED_RULES_FILE) + click.echo('Updated deprecated_rules.json file') + + # reset local version lock + self.deprecated_lock = current_deprecated_lock + + return changed_rules, list(new_rules), newly_deprecated + + +default_version_lock = VersionLock(ETC_VERSION_LOCK_FILE, ETC_DEPRECATED_RULES_FILE, name='default') diff --git a/tests/test_all_rules.py b/tests/test_all_rules.py index bf67f650a..a18e63c52 100644 --- a/tests/test_all_rules.py +++ b/tests/test_all_rules.py @@ -14,7 +14,7 @@ import eql import kql from detection_rules import attack -from detection_rules.version_lock import load_versions +from detection_rules.version_lock import default_version_lock from detection_rules.rule import QueryRuleData from detection_rules.rule_loader import FILE_PATTERN from detection_rules.schemas import definitions @@ -377,7 +377,7 @@ class TestRuleMetadata(BaseRuleTest): def test_deprecated_rules(self): """Test that deprecated rules are properly handled.""" - versions = load_versions() + versions = default_version_lock.version_lock deprecations = load_etc_dump('deprecated_rules.json') deprecated_rules = {} rules_path = get_path('rules') @@ -443,7 +443,7 @@ class TestRuleMetadata(BaseRuleTest): def test_rule_demotions(self): """Test to ensure a locked rule is not dropped to development, only deprecated""" - versions = load_versions() + versions = default_version_lock.version_lock failures = [] for rule in self.all_rules: