diff --git a/detection_rules/__main__.py b/detection_rules/__main__.py index e045df810..8d576e3e0 100644 --- a/detection_rules/__main__.py +++ b/detection_rules/__main__.py @@ -5,18 +5,19 @@ # coding=utf-8 """Shell for detection-rules.""" -import os import sys +from pathlib import Path import click assert (3, 12) <= sys.version_info < (4, 0), "Only Python 3.12+ supported" + from .main import root # noqa: E402 -CURR_DIR = os.path.dirname(os.path.abspath(__file__)) -CLI_DIR = os.path.dirname(CURR_DIR) -ROOT_DIR = os.path.dirname(CLI_DIR) +CURR_DIR = Path(__file__).resolve().parent +CLI_DIR = CURR_DIR.parent +ROOT_DIR = CLI_DIR.parent BANNER = r""" █▀▀▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄ ▄ █▀▀▄ ▄ ▄ ▄ ▄▄▄ ▄▄▄ diff --git a/detection_rules/attack.py b/detection_rules/attack.py index a5908d186..2178d2c8c 100644 --- a/detection_rules/attack.py +++ b/detection_rules/attack.py @@ -17,8 +17,8 @@ from semver import Version from .utils import cached, clear_caches, get_etc_path, get_etc_glob_path, read_gzip, gzip_compress PLATFORMS = ['Windows', 'macOS', 'Linux'] -CROSSWALK_FILE = Path(get_etc_path('attack-crosswalk.json')) -TECHNIQUES_REDIRECT_FILE = Path(get_etc_path('attack-technique-redirects.json')) +CROSSWALK_FILE = get_etc_path('attack-crosswalk.json') +TECHNIQUES_REDIRECT_FILE = get_etc_path('attack-technique-redirects.json') tactics_map = {} @@ -28,17 +28,17 @@ def load_techniques_redirect() -> dict: return json.loads(TECHNIQUES_REDIRECT_FILE.read_text())['mapping'] -def get_attack_file_path() -> str: +def get_attack_file_path() -> Path: pattern = 'attack-v*.json.gz' attack_file = get_etc_glob_path(pattern) if len(attack_file) < 1: raise FileNotFoundError(f'Missing required {pattern} file') elif len(attack_file) != 1: raise FileExistsError(f'Multiple files found with {pattern} pattern. Only one is allowed') - return attack_file[0] + return Path(attack_file[0]) -_, _attack_path_base = get_attack_file_path().split('-v') +_, _attack_path_base = str(get_attack_file_path()).split('-v') _ext_length = len('.json.gz') CURRENT_ATTACK_VERSION = _attack_path_base[:-_ext_length] @@ -98,7 +98,7 @@ sub_technique_id_list = [t for t in technique_lookup if '.' in t] def refresh_attack_data(save=True) -> (Optional[dict], Optional[bytes]): """Refresh ATT&CK data from Mitre.""" - attack_path = Path(get_attack_file_path()) + attack_path = get_attack_file_path() filename, _, _ = attack_path.name.rsplit('.', 2) def get_version_from_tag(name, pattern='att&ck-v'): @@ -126,7 +126,7 @@ def refresh_attack_data(save=True) -> (Optional[dict], Optional[bytes]): compressed = gzip_compress(json.dumps(attack_data, sort_keys=True)) if save: - new_path = Path(get_etc_path(f'attack-v{latest_version}.json.gz')) + new_path = get_etc_path(f'attack-v{latest_version}.json.gz') new_path.write_bytes(compressed) attack_path.unlink() print(f'Replaced file: {attack_path} with {new_path}') diff --git a/detection_rules/devtools.py b/detection_rules/devtools.py index 7befb9d1a..ab87cc619 100644 --- a/detection_rules/devtools.py +++ b/detection_rules/devtools.py @@ -90,7 +90,7 @@ def dev_group(): def build_release(config_file, update_version_lock: bool, generate_navigator: bool, generate_docs: str, update_message: str, release=None, verbose=True): """Assemble all the rules into Kibana-ready release files.""" - config = load_dump(config_file)['package'] + config = load_dump(str(config_file))['package'] registry_data = config['registry_data'] if generate_navigator: @@ -313,7 +313,7 @@ def prune_staging_area(target_stack_version: str, dry_run: bool, exception_list: continue # it's a change to a rule file, load it and check the version - if str(change.path.absolute()).startswith(RULES_DIR) and change.path.suffix == ".toml": + if str(change.path.absolute()).startswith(str(RULES_DIR)) and change.path.suffix == ".toml": # bypass TOML validation in case there were schema changes dict_contents = RuleCollection.deserialize_toml_string(change.read()) min_stack_version: Optional[str] = dict_contents.get("metadata", {}).get("min_stack_version") @@ -441,7 +441,7 @@ def integrations_pr(ctx: click.Context, local_repo: str, token: str, draft: bool stack_version = Package.load_configs()["name"] package_version = Package.load_configs()["registry_data"]["version"] - release_dir = Path(RELEASE_DIR) / stack_version / "fleet" / package_version + release_dir = RELEASE_DIR / stack_version / "fleet" / package_version message = f"[Security Rules] Update security rules package to v{package_version}" if not release_dir.exists(): @@ -581,7 +581,7 @@ def license_check(ctx, ignore_directory): """Check that all code files contain a valid license.""" ignore_directory += ("env",) failed = False - base_path = Path(get_path()) + base_path = get_path() for path in base_path.rglob('*.py'): relative_path = path.relative_to(base_path) @@ -622,7 +622,7 @@ def test_version_lock(branches: tuple, remote: str): finally: diff = git('--no-pager', 'diff', get_etc_path('version.lock.json')) - outfile = Path(get_path()).joinpath('lock-diff.txt') + outfile = get_path() / 'lock-diff.txt' outfile.write_text(diff) click.echo(f'diff saved to {outfile}') @@ -740,7 +740,7 @@ def deprecate_rule(ctx: click.Context, rule_file: Path): deprecation_date=today, maturity='deprecated') contents = dataclasses.replace(rule.contents, metadata=new_meta) - new_rule = TOMLRule(contents=contents, path=Path(deprecated_path)) + new_rule = TOMLRule(contents=contents, path=deprecated_path) new_rule.save_toml() # remove the old rule diff --git a/detection_rules/ecs.py b/detection_rules/ecs.py index 1418c9647..4ec6bdb15 100644 --- a/detection_rules/ecs.py +++ b/detection_rules/ecs.py @@ -9,7 +9,6 @@ import glob import json import os import shutil -from pathlib import Path import eql import eql.types @@ -88,7 +87,7 @@ def get_max_version(include_master=False): versions = get_schema_map().keys() if include_master and any([v.startswith('master') for v in versions]): - return list(Path(ECS_SCHEMAS_DIR).glob('master*'))[0].name + return list(ECS_SCHEMAS_DIR.glob('master*'))[0].name return str(max([Version.parse(v) for v in versions if not v.startswith('master')])) @@ -303,9 +302,9 @@ def download_endpoint_schemas(target: str, overwrite: bool = True) -> None: flattened[f"{root_name}.{f['name']}"] = f['type'] # save schema to disk - Path(ENDPOINT_SCHEMAS_DIR).mkdir(parents=True, exist_ok=True) + ENDPOINT_SCHEMAS_DIR.mkdir(parents=True, exist_ok=True) compressed = gzip_compress(json.dumps(flattened, sort_keys=True, cls=DateTimeEncoder)) - new_path = Path(ENDPOINT_SCHEMAS_DIR) / f"endpoint_{target}.json.gz" + new_path = ENDPOINT_SCHEMAS_DIR / f"endpoint_{target}.json.gz" if overwrite: shutil.rmtree(new_path, ignore_errors=True) with open(new_path, 'wb') as f: diff --git a/detection_rules/endgame.py b/detection_rules/endgame.py index 249c515cd..4ed6bd624 100644 --- a/detection_rules/endgame.py +++ b/detection_rules/endgame.py @@ -7,13 +7,12 @@ import json import shutil import sys -from pathlib import Path import eql from .utils import ETC_DIR, DateTimeEncoder, cached, gzip_compress, read_gzip -ENDGAME_SCHEMA_DIR = Path(ETC_DIR) / "endgame_schemas" +ENDGAME_SCHEMA_DIR = ETC_DIR / "endgame_schemas" class EndgameSchemaManager: diff --git a/detection_rules/integrations.py b/detection_rules/integrations.py index 28a1b93d3..0c0d11e85 100644 --- a/detection_rules/integrations.py +++ b/detection_rules/integrations.py @@ -25,8 +25,8 @@ from .misc import load_current_package_version from .utils import cached, get_etc_path, read_gzip, unzip from .schemas import definitions -MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz')) -SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz')) +MANIFEST_FILE_PATH = get_etc_path('integration-manifests.json.gz') +SCHEMA_FILE_PATH = get_etc_path('integration-schemas.json.gz') _notified_integrations = set() diff --git a/detection_rules/main.py b/detection_rules/main.py index 882e6278e..4a3806010 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -79,7 +79,7 @@ def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True): bulk_upload_docs, importable_rules_docs = package.create_bulk_index_body() if save_files: - path = Path(get_path('enriched-rule-indexes', package_hash)) + path = get_path('enriched-rule-indexes', package_hash) path.mkdir(parents=True, exist_ok=overwrite) bulk_upload_docs.dump(path.joinpath('enriched-rules-index-uploadable.ndjson'), sort_keys=True) importable_rules_docs.dump(path.joinpath('enriched-rules-index-importable.ndjson'), sort_keys=True) @@ -431,7 +431,7 @@ def create_dnstwist_index(ctx: click.Context, input_file: click.Path): es_client: Elasticsearch = ctx.obj['es'] click.echo(f'Attempting to load dnstwist data from {input_file}') - dnstwist_data: dict = load_dump(input_file) + dnstwist_data: dict = load_dump(str(input_file)) click.echo(f'{len(dnstwist_data)} records loaded') original_domain = next(r['domain-name'] for r in dnstwist_data if r.get('fuzzer', '') == 'original*') @@ -496,10 +496,10 @@ def create_dnstwist_index(ctx: click.Context, input_file: click.Path): @click.argument('author') def prep_rule(author: str): """Prep the detection threat match rule for dnstwist data with a rule_id and author.""" - rule_template_file = Path(get_etc_path('rule_template_typosquatting_domain.json')) + rule_template_file = get_etc_path('rule_template_typosquatting_domain.json') template_rule = json.loads(rule_template_file.read_text()) template_rule.update(author=[author], rule_id=str(uuid4())) - updated_rule = Path(get_path('rule_typosquatting_domain.ndjson')) + updated_rule = get_path('rule_typosquatting_domain.ndjson') updated_rule.write_text(json.dumps(template_rule, sort_keys=True)) click.echo(f'Rule saved to: {updated_rule}. Import this to Kibana to create alerts on all dnstwist-* indexes') click.echo('Note: you only need to import and enable this rule one time for all dnstwist-* indexes') diff --git a/detection_rules/mappings.py b/detection_rules/mappings.py index ae7317ff5..54a9ca1c2 100644 --- a/detection_rules/mappings.py +++ b/detection_rules/mappings.py @@ -4,8 +4,8 @@ # 2.0. """RTA to rule mappings.""" -import os from collections import defaultdict +from pathlib import Path from rta import get_available_tests @@ -71,9 +71,9 @@ class RtaMappings: for rta_name in rta_list: # rip off the extension and add .py - rta_name, _ = os.path.splitext(os.path.basename(rta_name)) - rta_path = os.path.abspath(os.path.join(RTA_DIR, rta_name + ".py")) - if os.path.exists(rta_path): + rta_name = Path(rta_name).stem + rta_path = (RTA_DIR / rta_name).with_suffix(".py").resolve() + if rta_path.exists(): rta_files.add(rta_path) return list(sorted(rta_files)) diff --git a/detection_rules/misc.py b/detection_rules/misc.py index 5c35a722d..22bd349cb 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -271,7 +271,7 @@ def load_current_package_version() -> str: def get_default_config() -> Optional[Path]: - return next(Path(get_path()).glob('.detection-rules-cfg.*'), None) + return next(get_path().glob('.detection-rules-cfg.*'), None) @cached diff --git a/detection_rules/ml.py b/detection_rules/ml.py index 6b8ea821b..d4c2a5adc 100644 --- a/detection_rules/ml.py +++ b/detection_rules/ml.py @@ -27,7 +27,7 @@ from .schemas import definitions from .utils import get_path, unzip_to_dict -ML_PATH = Path(get_path('machine-learning')) +ML_PATH = get_path('machine-learning') def info_from_tag(tag: str) -> (Literal['ml'], definitions.MachineLearningType, str, int): diff --git a/detection_rules/packaging.py b/detection_rules/packaging.py index c8b9804e2..5dbfd3b89 100644 --- a/detection_rules/packaging.py +++ b/detection_rules/packaging.py @@ -69,7 +69,7 @@ def filter_rule(rule: TOMLRule, config_filter: dict, exclude_fields: Optional[di return True -CURRENT_RELEASE_PATH = Path(RELEASE_DIR) / load_current_package_version() +CURRENT_RELEASE_PATH = RELEASE_DIR / load_current_package_version() class Package(object): @@ -99,8 +99,8 @@ class Package(object): @classmethod def load_configs(cls): - """Load configs from packages.yaml.""" - return load_etc_dump(PACKAGE_FILE)['package'] + """Load configs from packages.yml.""" + return load_etc_dump(str(PACKAGE_FILE))['package'] @staticmethod def _package_kibana_notice_file(save_dir): @@ -175,17 +175,17 @@ class Package(object): def save(self, verbose=True): """Save a package and all artifacts.""" - save_dir = os.path.join(RELEASE_DIR, self.name) - rules_dir = os.path.join(save_dir, 'rules') - extras_dir = os.path.join(save_dir, 'extras') + save_dir = RELEASE_DIR / self.name + rules_dir = save_dir / 'rules' + extras_dir = save_dir / 'extras' # remove anything that existed before shutil.rmtree(save_dir, ignore_errors=True) - os.makedirs(rules_dir, exist_ok=True) - os.makedirs(extras_dir, exist_ok=True) + rules_dir.mkdir(parents=True, exist_ok=True) + extras_dir.mkdir(parents=True, exist_ok=True) for rule in self.rules: - rule.save_json(Path(rules_dir).joinpath(rule.path.name).with_suffix('.json')) + rule.save_json(rules_dir / Path(rule.path.name).with_suffix('.json')) self._package_kibana_notice_file(rules_dir) self._package_kibana_index_file(rules_dir) @@ -195,15 +195,15 @@ class Package(object): self.save_release_files(extras_dir, self.changed_ids, self.new_ids, self.removed_ids) # zip all rules only and place in extras - shutil.make_archive(os.path.join(extras_dir, self.name), 'zip', root_dir=os.path.dirname(rules_dir), - base_dir=os.path.basename(rules_dir)) + shutil.make_archive(extras_dir / self.name, 'zip', root_dir=rules_dir.parent, base_dir=rules_dir.name) # zip everything and place in release root - shutil.make_archive(os.path.join(save_dir, '{}-all'.format(self.name)), 'zip', - root_dir=os.path.dirname(extras_dir), base_dir=os.path.basename(extras_dir)) + shutil.make_archive( + save_dir / f"{self.name}-all", "zip", root_dir=extras_dir.parent, base_dir=extras_dir.name + ) if verbose: - click.echo('Package saved to: {}'.format(save_dir)) + click.echo(f'Package saved to: {save_dir}') def export(self, outfile, downgrade_version=None, verbose=True, skip_unsupported=False): """Export rules into a consolidated ndjson file.""" @@ -419,7 +419,7 @@ class Package(object): asset_path = rules_dir / f'{asset["id"]}.json' asset_path.write_text(json.dumps(asset, indent=4, sort_keys=True), encoding="utf-8") - notice_contents = Path(NOTICE_FILE).read_text() + notice_contents = NOTICE_FILE.read_text() readme_text = textwrap.dedent(""" # Prebuilt Security Detection Rules diff --git a/detection_rules/rule_loader.py b/detection_rules/rule_loader.py index fbd0364cf..51c7b824a 100644 --- a/detection_rules/rule_loader.py +++ b/detection_rules/rule_loader.py @@ -24,8 +24,8 @@ from .rule import ( from .schemas import definitions from .utils import cached, get_path -DEFAULT_RULES_DIR = Path(get_path("rules")) -DEFAULT_BBR_DIR = Path(get_path("rules_building_block")) +DEFAULT_RULES_DIR = get_path("rules") +DEFAULT_BBR_DIR = get_path("rules_building_block") DEFAULT_DEPRECATED_DIR = DEFAULT_RULES_DIR / '_deprecated' RTA_DIR = get_path("rta") FILE_PATTERN = r'^([a-z0-9_])+\.(json|toml)$' diff --git a/detection_rules/schemas/__init__.py b/detection_rules/schemas/__init__.py index b91ee6b0b..acd66982a 100644 --- a/detection_rules/schemas/__init__.py +++ b/detection_rules/schemas/__init__.py @@ -4,7 +4,6 @@ # 2.0. import json from collections import OrderedDict -from pathlib import Path from typing import List, Optional from typing import OrderedDict as OrderedDictType @@ -29,7 +28,7 @@ __all__ = ( "all_versions", ) -SCHEMA_DIR = Path(get_etc_path("api_schemas")) +SCHEMA_DIR = get_etc_path("api_schemas") migrations = {} @@ -54,7 +53,7 @@ def migrate(version: str): @cached def get_schema_file(version: Version, rule_type: str) -> dict: - path = Path(SCHEMA_DIR) / str(version) / f"{version}.{rule_type}.json" + path = SCHEMA_DIR / str(version) / f"{version}.{rule_type}.json" if not path.exists(): raise ValueError(f"Unsupported rule type {rule_type}. Unable to downgrade to {version}") diff --git a/detection_rules/utils.py b/detection_rules/utils.py index abb5c011a..701f4132a 100644 --- a/detection_rules/utils.py +++ b/detection_rules/utils.py @@ -30,10 +30,10 @@ from eql.utils import load_dump, stream_json_lines import kql -CURR_DIR = os.path.dirname(os.path.abspath(__file__)) -ROOT_DIR = os.path.dirname(CURR_DIR) -ETC_DIR = os.path.join(ROOT_DIR, "detection_rules", "etc") -INTEGRATION_RULE_DIR = os.path.join(ROOT_DIR, "rules", "integrations") +CURR_DIR = Path(__file__).resolve().parent +ROOT_DIR = CURR_DIR.parent +ETC_DIR = ROOT_DIR / "detection_rules" / "etc" +INTEGRATION_RULE_DIR = ROOT_DIR / "rules" / "integrations" class NonelessDict(dict): @@ -85,20 +85,20 @@ def get_json_iter(f): return data -def get_path(*paths) -> str: +def get_path(*paths) -> Path: """Get a file by relative path.""" - return os.path.join(ROOT_DIR, *paths) + return ROOT_DIR.joinpath(*paths) -def get_etc_path(*paths): +def get_etc_path(*paths) -> Path: """Load a file from the detection_rules/etc/ folder.""" - return os.path.join(ETC_DIR, *paths) + return ETC_DIR.joinpath(*paths) -def get_etc_glob_path(*patterns): +def get_etc_glob_path(*patterns) -> list: """Load a file from the detection_rules/etc/ folder.""" pattern = os.path.join(*patterns) - return glob.glob(os.path.join(ETC_DIR, pattern)) + return glob.glob(str(ETC_DIR / pattern)) def get_etc_file(name, mode="r"): @@ -109,12 +109,12 @@ def get_etc_file(name, mode="r"): def load_etc_dump(*path): """Load a json/yml/toml file from the detection_rules/etc/ folder.""" - return eql.utils.load_dump(get_etc_path(*path)) + return eql.utils.load_dump(str(get_etc_path(*path))) def save_etc_dump(contents, *path, **kwargs): """Save a json/yml/toml file from the detection_rules/etc/ folder.""" - path = get_etc_path(*path) + path = str(get_etc_path(*path)) _, ext = os.path.splitext(path) sort_keys = kwargs.pop('sort_keys', True) indent = kwargs.pop('indent', 2) diff --git a/detection_rules/version_lock.py b/detection_rules/version_lock.py index 4424d03ec..227021acd 100644 --- a/detection_rules/version_lock.py +++ b/detection_rules/version_lock.py @@ -17,9 +17,9 @@ from .schemas import definitions from .utils import cached, get_etc_path ETC_VERSION_LOCK_FILE = "version.lock.json" -ETC_VERSION_LOCK_PATH = Path(get_etc_path()) / ETC_VERSION_LOCK_FILE +ETC_VERSION_LOCK_PATH = get_etc_path() / ETC_VERSION_LOCK_FILE ETC_DEPRECATED_RULES_FILE = "deprecated_rules.json" -ETC_DEPRECATED_RULES_PATH = Path(get_etc_path()) / ETC_DEPRECATED_RULES_FILE +ETC_DEPRECATED_RULES_PATH = get_etc_path() / ETC_DEPRECATED_RULES_FILE # This was the original version the lock was created under. This constant has been replaced by # schemas.get_min_supported_stack_version to dynamically determine the minimum