From 371e24b2ed002deea60939d7d17687908b759ce4 Mon Sep 17 00:00:00 2001 From: Mika Ayenson Date: Tue, 21 May 2024 16:14:45 -0500 Subject: [PATCH] Revert "[FR] Update Utility Path Computation to use Pathlib (#3659)" This reverts commit 23567c1d0cd855978cb266e583e1bac9c686b410. --- detection_rules/__main__.py | 9 ++++----- detection_rules/attack.py | 14 +++++++------- detection_rules/devtools.py | 10 +++++----- detection_rules/ecs.py | 7 ++++--- detection_rules/endgame.py | 3 ++- detection_rules/integrations.py | 4 ++-- detection_rules/main.py | 8 ++++---- detection_rules/mappings.py | 8 ++++---- detection_rules/misc.py | 2 +- detection_rules/ml.py | 2 +- detection_rules/packaging.py | 30 ++++++++++++++--------------- detection_rules/rule_loader.py | 4 ++-- detection_rules/schemas/__init__.py | 5 +++-- detection_rules/utils.py | 24 +++++++++++------------ detection_rules/version_lock.py | 4 ++-- 15 files changed, 68 insertions(+), 66 deletions(-) diff --git a/detection_rules/__main__.py b/detection_rules/__main__.py index 8d576e3e0..e045df810 100644 --- a/detection_rules/__main__.py +++ b/detection_rules/__main__.py @@ -5,19 +5,18 @@ # coding=utf-8 """Shell for detection-rules.""" +import os import sys -from pathlib import Path import click assert (3, 12) <= sys.version_info < (4, 0), "Only Python 3.12+ supported" - from .main import root # noqa: E402 -CURR_DIR = Path(__file__).resolve().parent -CLI_DIR = CURR_DIR.parent -ROOT_DIR = CLI_DIR.parent +CURR_DIR = os.path.dirname(os.path.abspath(__file__)) +CLI_DIR = os.path.dirname(CURR_DIR) +ROOT_DIR = os.path.dirname(CLI_DIR) BANNER = r""" █▀▀▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄ ▄ █▀▀▄ ▄ ▄ ▄ ▄▄▄ ▄▄▄ diff --git a/detection_rules/attack.py b/detection_rules/attack.py index 2178d2c8c..a5908d186 100644 --- a/detection_rules/attack.py +++ b/detection_rules/attack.py @@ -17,8 +17,8 @@ from semver import Version from .utils import cached, clear_caches, get_etc_path, get_etc_glob_path, read_gzip, gzip_compress PLATFORMS = ['Windows', 'macOS', 'Linux'] -CROSSWALK_FILE = get_etc_path('attack-crosswalk.json') -TECHNIQUES_REDIRECT_FILE = get_etc_path('attack-technique-redirects.json') +CROSSWALK_FILE = Path(get_etc_path('attack-crosswalk.json')) +TECHNIQUES_REDIRECT_FILE = Path(get_etc_path('attack-technique-redirects.json')) tactics_map = {} @@ -28,17 +28,17 @@ def load_techniques_redirect() -> dict: return json.loads(TECHNIQUES_REDIRECT_FILE.read_text())['mapping'] -def get_attack_file_path() -> Path: +def get_attack_file_path() -> str: pattern = 'attack-v*.json.gz' attack_file = get_etc_glob_path(pattern) if len(attack_file) < 1: raise FileNotFoundError(f'Missing required {pattern} file') elif len(attack_file) != 1: raise FileExistsError(f'Multiple files found with {pattern} pattern. Only one is allowed') - return Path(attack_file[0]) + return attack_file[0] -_, _attack_path_base = str(get_attack_file_path()).split('-v') +_, _attack_path_base = get_attack_file_path().split('-v') _ext_length = len('.json.gz') CURRENT_ATTACK_VERSION = _attack_path_base[:-_ext_length] @@ -98,7 +98,7 @@ sub_technique_id_list = [t for t in technique_lookup if '.' in t] def refresh_attack_data(save=True) -> (Optional[dict], Optional[bytes]): """Refresh ATT&CK data from Mitre.""" - attack_path = get_attack_file_path() + attack_path = Path(get_attack_file_path()) filename, _, _ = attack_path.name.rsplit('.', 2) def get_version_from_tag(name, pattern='att&ck-v'): @@ -126,7 +126,7 @@ def refresh_attack_data(save=True) -> (Optional[dict], Optional[bytes]): compressed = gzip_compress(json.dumps(attack_data, sort_keys=True)) if save: - new_path = get_etc_path(f'attack-v{latest_version}.json.gz') + new_path = Path(get_etc_path(f'attack-v{latest_version}.json.gz')) new_path.write_bytes(compressed) attack_path.unlink() print(f'Replaced file: {attack_path} with {new_path}') diff --git a/detection_rules/devtools.py b/detection_rules/devtools.py index c6da762c4..71bd2f218 100644 --- a/detection_rules/devtools.py +++ b/detection_rules/devtools.py @@ -90,7 +90,7 @@ def dev_group(): def build_release(config_file, update_version_lock: bool, generate_navigator: bool, generate_docs: str, update_message: str, release=None, verbose=True): """Assemble all the rules into Kibana-ready release files.""" - config = load_dump(str(config_file))['package'] + config = load_dump(config_file)['package'] registry_data = config['registry_data'] if generate_navigator: @@ -441,7 +441,7 @@ def integrations_pr(ctx: click.Context, local_repo: str, token: str, draft: bool stack_version = Package.load_configs()["name"] package_version = Package.load_configs()["registry_data"]["version"] - release_dir = RELEASE_DIR / stack_version / "fleet" / package_version + release_dir = Path(RELEASE_DIR) / stack_version / "fleet" / package_version message = f"[Security Rules] Update security rules package to v{package_version}" if not release_dir.exists(): @@ -581,7 +581,7 @@ def license_check(ctx, ignore_directory): """Check that all code files contain a valid license.""" ignore_directory += ("env",) failed = False - base_path = get_path() + base_path = Path(get_path()) for path in base_path.rglob('*.py'): relative_path = path.relative_to(base_path) @@ -622,7 +622,7 @@ def test_version_lock(branches: tuple, remote: str): finally: diff = git('--no-pager', 'diff', get_etc_path('version.lock.json')) - outfile = get_path() / 'lock-diff.txt' + outfile = Path(get_path()).joinpath('lock-diff.txt') outfile.write_text(diff) click.echo(f'diff saved to {outfile}') @@ -740,7 +740,7 @@ def deprecate_rule(ctx: click.Context, rule_file: Path): deprecation_date=today, maturity='deprecated') contents = dataclasses.replace(rule.contents, metadata=new_meta) - new_rule = TOMLRule(contents=contents, path=deprecated_path) + new_rule = TOMLRule(contents=contents, path=Path(deprecated_path)) new_rule.save_toml() # remove the old rule diff --git a/detection_rules/ecs.py b/detection_rules/ecs.py index 4ec6bdb15..1418c9647 100644 --- a/detection_rules/ecs.py +++ b/detection_rules/ecs.py @@ -9,6 +9,7 @@ import glob import json import os import shutil +from pathlib import Path import eql import eql.types @@ -87,7 +88,7 @@ def get_max_version(include_master=False): versions = get_schema_map().keys() if include_master and any([v.startswith('master') for v in versions]): - return list(ECS_SCHEMAS_DIR.glob('master*'))[0].name + return list(Path(ECS_SCHEMAS_DIR).glob('master*'))[0].name return str(max([Version.parse(v) for v in versions if not v.startswith('master')])) @@ -302,9 +303,9 @@ def download_endpoint_schemas(target: str, overwrite: bool = True) -> None: flattened[f"{root_name}.{f['name']}"] = f['type'] # save schema to disk - ENDPOINT_SCHEMAS_DIR.mkdir(parents=True, exist_ok=True) + Path(ENDPOINT_SCHEMAS_DIR).mkdir(parents=True, exist_ok=True) compressed = gzip_compress(json.dumps(flattened, sort_keys=True, cls=DateTimeEncoder)) - new_path = ENDPOINT_SCHEMAS_DIR / f"endpoint_{target}.json.gz" + new_path = Path(ENDPOINT_SCHEMAS_DIR) / f"endpoint_{target}.json.gz" if overwrite: shutil.rmtree(new_path, ignore_errors=True) with open(new_path, 'wb') as f: diff --git a/detection_rules/endgame.py b/detection_rules/endgame.py index 4ed6bd624..249c515cd 100644 --- a/detection_rules/endgame.py +++ b/detection_rules/endgame.py @@ -7,12 +7,13 @@ import json import shutil import sys +from pathlib import Path import eql from .utils import ETC_DIR, DateTimeEncoder, cached, gzip_compress, read_gzip -ENDGAME_SCHEMA_DIR = ETC_DIR / "endgame_schemas" +ENDGAME_SCHEMA_DIR = Path(ETC_DIR) / "endgame_schemas" class EndgameSchemaManager: diff --git a/detection_rules/integrations.py b/detection_rules/integrations.py index 0c0d11e85..28a1b93d3 100644 --- a/detection_rules/integrations.py +++ b/detection_rules/integrations.py @@ -25,8 +25,8 @@ from .misc import load_current_package_version from .utils import cached, get_etc_path, read_gzip, unzip from .schemas import definitions -MANIFEST_FILE_PATH = get_etc_path('integration-manifests.json.gz') -SCHEMA_FILE_PATH = get_etc_path('integration-schemas.json.gz') +MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz')) +SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz')) _notified_integrations = set() diff --git a/detection_rules/main.py b/detection_rules/main.py index 4a3806010..882e6278e 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -79,7 +79,7 @@ def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True): bulk_upload_docs, importable_rules_docs = package.create_bulk_index_body() if save_files: - path = get_path('enriched-rule-indexes', package_hash) + path = Path(get_path('enriched-rule-indexes', package_hash)) path.mkdir(parents=True, exist_ok=overwrite) bulk_upload_docs.dump(path.joinpath('enriched-rules-index-uploadable.ndjson'), sort_keys=True) importable_rules_docs.dump(path.joinpath('enriched-rules-index-importable.ndjson'), sort_keys=True) @@ -431,7 +431,7 @@ def create_dnstwist_index(ctx: click.Context, input_file: click.Path): es_client: Elasticsearch = ctx.obj['es'] click.echo(f'Attempting to load dnstwist data from {input_file}') - dnstwist_data: dict = load_dump(str(input_file)) + dnstwist_data: dict = load_dump(input_file) click.echo(f'{len(dnstwist_data)} records loaded') original_domain = next(r['domain-name'] for r in dnstwist_data if r.get('fuzzer', '') == 'original*') @@ -496,10 +496,10 @@ def create_dnstwist_index(ctx: click.Context, input_file: click.Path): @click.argument('author') def prep_rule(author: str): """Prep the detection threat match rule for dnstwist data with a rule_id and author.""" - rule_template_file = get_etc_path('rule_template_typosquatting_domain.json') + rule_template_file = Path(get_etc_path('rule_template_typosquatting_domain.json')) template_rule = json.loads(rule_template_file.read_text()) template_rule.update(author=[author], rule_id=str(uuid4())) - updated_rule = get_path('rule_typosquatting_domain.ndjson') + updated_rule = Path(get_path('rule_typosquatting_domain.ndjson')) updated_rule.write_text(json.dumps(template_rule, sort_keys=True)) click.echo(f'Rule saved to: {updated_rule}. Import this to Kibana to create alerts on all dnstwist-* indexes') click.echo('Note: you only need to import and enable this rule one time for all dnstwist-* indexes') diff --git a/detection_rules/mappings.py b/detection_rules/mappings.py index 54a9ca1c2..ae7317ff5 100644 --- a/detection_rules/mappings.py +++ b/detection_rules/mappings.py @@ -4,8 +4,8 @@ # 2.0. """RTA to rule mappings.""" +import os from collections import defaultdict -from pathlib import Path from rta import get_available_tests @@ -71,9 +71,9 @@ class RtaMappings: for rta_name in rta_list: # rip off the extension and add .py - rta_name = Path(rta_name).stem - rta_path = (RTA_DIR / rta_name).with_suffix(".py").resolve() - if rta_path.exists(): + rta_name, _ = os.path.splitext(os.path.basename(rta_name)) + rta_path = os.path.abspath(os.path.join(RTA_DIR, rta_name + ".py")) + if os.path.exists(rta_path): rta_files.add(rta_path) return list(sorted(rta_files)) diff --git a/detection_rules/misc.py b/detection_rules/misc.py index 22bd349cb..5c35a722d 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -271,7 +271,7 @@ def load_current_package_version() -> str: def get_default_config() -> Optional[Path]: - return next(get_path().glob('.detection-rules-cfg.*'), None) + return next(Path(get_path()).glob('.detection-rules-cfg.*'), None) @cached diff --git a/detection_rules/ml.py b/detection_rules/ml.py index d4c2a5adc..6b8ea821b 100644 --- a/detection_rules/ml.py +++ b/detection_rules/ml.py @@ -27,7 +27,7 @@ from .schemas import definitions from .utils import get_path, unzip_to_dict -ML_PATH = get_path('machine-learning') +ML_PATH = Path(get_path('machine-learning')) def info_from_tag(tag: str) -> (Literal['ml'], definitions.MachineLearningType, str, int): diff --git a/detection_rules/packaging.py b/detection_rules/packaging.py index 5dbfd3b89..c8b9804e2 100644 --- a/detection_rules/packaging.py +++ b/detection_rules/packaging.py @@ -69,7 +69,7 @@ def filter_rule(rule: TOMLRule, config_filter: dict, exclude_fields: Optional[di return True -CURRENT_RELEASE_PATH = RELEASE_DIR / load_current_package_version() +CURRENT_RELEASE_PATH = Path(RELEASE_DIR) / load_current_package_version() class Package(object): @@ -99,8 +99,8 @@ class Package(object): @classmethod def load_configs(cls): - """Load configs from packages.yml.""" - return load_etc_dump(str(PACKAGE_FILE))['package'] + """Load configs from packages.yaml.""" + return load_etc_dump(PACKAGE_FILE)['package'] @staticmethod def _package_kibana_notice_file(save_dir): @@ -175,17 +175,17 @@ class Package(object): def save(self, verbose=True): """Save a package and all artifacts.""" - save_dir = RELEASE_DIR / self.name - rules_dir = save_dir / 'rules' - extras_dir = save_dir / 'extras' + save_dir = os.path.join(RELEASE_DIR, self.name) + rules_dir = os.path.join(save_dir, 'rules') + extras_dir = os.path.join(save_dir, 'extras') # remove anything that existed before shutil.rmtree(save_dir, ignore_errors=True) - rules_dir.mkdir(parents=True, exist_ok=True) - extras_dir.mkdir(parents=True, exist_ok=True) + os.makedirs(rules_dir, exist_ok=True) + os.makedirs(extras_dir, exist_ok=True) for rule in self.rules: - rule.save_json(rules_dir / Path(rule.path.name).with_suffix('.json')) + rule.save_json(Path(rules_dir).joinpath(rule.path.name).with_suffix('.json')) self._package_kibana_notice_file(rules_dir) self._package_kibana_index_file(rules_dir) @@ -195,15 +195,15 @@ class Package(object): self.save_release_files(extras_dir, self.changed_ids, self.new_ids, self.removed_ids) # zip all rules only and place in extras - shutil.make_archive(extras_dir / self.name, 'zip', root_dir=rules_dir.parent, base_dir=rules_dir.name) + shutil.make_archive(os.path.join(extras_dir, self.name), 'zip', root_dir=os.path.dirname(rules_dir), + base_dir=os.path.basename(rules_dir)) # zip everything and place in release root - shutil.make_archive( - save_dir / f"{self.name}-all", "zip", root_dir=extras_dir.parent, base_dir=extras_dir.name - ) + shutil.make_archive(os.path.join(save_dir, '{}-all'.format(self.name)), 'zip', + root_dir=os.path.dirname(extras_dir), base_dir=os.path.basename(extras_dir)) if verbose: - click.echo(f'Package saved to: {save_dir}') + click.echo('Package saved to: {}'.format(save_dir)) def export(self, outfile, downgrade_version=None, verbose=True, skip_unsupported=False): """Export rules into a consolidated ndjson file.""" @@ -419,7 +419,7 @@ class Package(object): asset_path = rules_dir / f'{asset["id"]}.json' asset_path.write_text(json.dumps(asset, indent=4, sort_keys=True), encoding="utf-8") - notice_contents = NOTICE_FILE.read_text() + notice_contents = Path(NOTICE_FILE).read_text() readme_text = textwrap.dedent(""" # Prebuilt Security Detection Rules diff --git a/detection_rules/rule_loader.py b/detection_rules/rule_loader.py index 51c7b824a..fbd0364cf 100644 --- a/detection_rules/rule_loader.py +++ b/detection_rules/rule_loader.py @@ -24,8 +24,8 @@ from .rule import ( from .schemas import definitions from .utils import cached, get_path -DEFAULT_RULES_DIR = get_path("rules") -DEFAULT_BBR_DIR = get_path("rules_building_block") +DEFAULT_RULES_DIR = Path(get_path("rules")) +DEFAULT_BBR_DIR = Path(get_path("rules_building_block")) DEFAULT_DEPRECATED_DIR = DEFAULT_RULES_DIR / '_deprecated' RTA_DIR = get_path("rta") FILE_PATTERN = r'^([a-z0-9_])+\.(json|toml)$' diff --git a/detection_rules/schemas/__init__.py b/detection_rules/schemas/__init__.py index acd66982a..b91ee6b0b 100644 --- a/detection_rules/schemas/__init__.py +++ b/detection_rules/schemas/__init__.py @@ -4,6 +4,7 @@ # 2.0. import json from collections import OrderedDict +from pathlib import Path from typing import List, Optional from typing import OrderedDict as OrderedDictType @@ -28,7 +29,7 @@ __all__ = ( "all_versions", ) -SCHEMA_DIR = get_etc_path("api_schemas") +SCHEMA_DIR = Path(get_etc_path("api_schemas")) migrations = {} @@ -53,7 +54,7 @@ def migrate(version: str): @cached def get_schema_file(version: Version, rule_type: str) -> dict: - path = SCHEMA_DIR / str(version) / f"{version}.{rule_type}.json" + path = Path(SCHEMA_DIR) / str(version) / f"{version}.{rule_type}.json" if not path.exists(): raise ValueError(f"Unsupported rule type {rule_type}. Unable to downgrade to {version}") diff --git a/detection_rules/utils.py b/detection_rules/utils.py index 701f4132a..abb5c011a 100644 --- a/detection_rules/utils.py +++ b/detection_rules/utils.py @@ -30,10 +30,10 @@ from eql.utils import load_dump, stream_json_lines import kql -CURR_DIR = Path(__file__).resolve().parent -ROOT_DIR = CURR_DIR.parent -ETC_DIR = ROOT_DIR / "detection_rules" / "etc" -INTEGRATION_RULE_DIR = ROOT_DIR / "rules" / "integrations" +CURR_DIR = os.path.dirname(os.path.abspath(__file__)) +ROOT_DIR = os.path.dirname(CURR_DIR) +ETC_DIR = os.path.join(ROOT_DIR, "detection_rules", "etc") +INTEGRATION_RULE_DIR = os.path.join(ROOT_DIR, "rules", "integrations") class NonelessDict(dict): @@ -85,20 +85,20 @@ def get_json_iter(f): return data -def get_path(*paths) -> Path: +def get_path(*paths) -> str: """Get a file by relative path.""" - return ROOT_DIR.joinpath(*paths) + return os.path.join(ROOT_DIR, *paths) -def get_etc_path(*paths) -> Path: +def get_etc_path(*paths): """Load a file from the detection_rules/etc/ folder.""" - return ETC_DIR.joinpath(*paths) + return os.path.join(ETC_DIR, *paths) -def get_etc_glob_path(*patterns) -> list: +def get_etc_glob_path(*patterns): """Load a file from the detection_rules/etc/ folder.""" pattern = os.path.join(*patterns) - return glob.glob(str(ETC_DIR / pattern)) + return glob.glob(os.path.join(ETC_DIR, pattern)) def get_etc_file(name, mode="r"): @@ -109,12 +109,12 @@ def get_etc_file(name, mode="r"): def load_etc_dump(*path): """Load a json/yml/toml file from the detection_rules/etc/ folder.""" - return eql.utils.load_dump(str(get_etc_path(*path))) + return eql.utils.load_dump(get_etc_path(*path)) def save_etc_dump(contents, *path, **kwargs): """Save a json/yml/toml file from the detection_rules/etc/ folder.""" - path = str(get_etc_path(*path)) + path = get_etc_path(*path) _, ext = os.path.splitext(path) sort_keys = kwargs.pop('sort_keys', True) indent = kwargs.pop('indent', 2) diff --git a/detection_rules/version_lock.py b/detection_rules/version_lock.py index 227021acd..4424d03ec 100644 --- a/detection_rules/version_lock.py +++ b/detection_rules/version_lock.py @@ -17,9 +17,9 @@ from .schemas import definitions from .utils import cached, get_etc_path ETC_VERSION_LOCK_FILE = "version.lock.json" -ETC_VERSION_LOCK_PATH = get_etc_path() / ETC_VERSION_LOCK_FILE +ETC_VERSION_LOCK_PATH = Path(get_etc_path()) / ETC_VERSION_LOCK_FILE ETC_DEPRECATED_RULES_FILE = "deprecated_rules.json" -ETC_DEPRECATED_RULES_PATH = get_etc_path() / ETC_DEPRECATED_RULES_FILE +ETC_DEPRECATED_RULES_PATH = Path(get_etc_path()) / ETC_DEPRECATED_RULES_FILE # This was the original version the lock was created under. This constant has been replaced by # schemas.get_min_supported_stack_version to dynamically determine the minimum