Remove unreachable and legacy code

Co-Authored-By: Justin Ibarra <brokensound77@users.noreply.github.com>
This commit is contained in:
Ross Wolf
2020-06-30 10:12:20 -06:00
parent fac5473aca
commit e2d97b0a74
8 changed files with 11 additions and 347 deletions
+4 -9
View File
@@ -110,17 +110,14 @@ def mass_update(ctx, query, field):
rule.validate(as_rule=True)
rule.save()
ctx.invoke(search_rules, query=query, columns=[k[0].split('.')[-1] for k in field])
return
return ctx.invoke(search_rules, query=query, columns=[k[0].split('.')[-1] for k in field])
@root.command('view-rule')
@click.argument('rule-id', required=False)
@click.option('--rule-file', '-f', type=click.Path(dir_okay=False), help='Optionally view a rule from a specified file')
@click.option('--as-api/--as-rule', default=True, help='Print the rule in final api or rule format')
@click.option('--optimize/--no-optimize', default=False, help='When viewing in api format, include optimizations')
def view_rule(rule_id, rule_file, as_api, optimize):
def view_rule(rule_id, rule_file, as_api):
"""View an internal rule or specified rule file."""
if rule_id:
rule = rule_loader.get_rule(rule_id, verbose=False)
@@ -134,9 +131,6 @@ def view_rule(rule_id, rule_file, as_api, optimize):
click.secho('Unknown format!', fg='red')
return
if optimize and as_api:
rule.tune()
click.echo(toml_write(rule.rule_format()) if not as_api else json.dumps(rule.contents, indent=2, sort_keys=True))
return rule
@@ -213,7 +207,7 @@ def validate_all(fail):
@click.option('--columns', '-c', multiple=True, help='Specify columns to add the table')
@click.option('--language', type=click.Choice(["eql", "kql"]), default="kql")
def search_rules(query, columns, language, verbose=True):
"""Use KQL to find matching rules."""
"""Use KQL or EQL to find matching rules."""
from kql import get_evaluator
from eql.table import Table
from eql.build import get_engine
@@ -235,6 +229,7 @@ def search_rules(query, columns, language, verbose=True):
flattened_rules.sort(key=lambda dct: dct["name"])
filtered = []
if language == "kql":
evaluator = get_evaluator(query) if query else lambda x: True
filtered = list(filter(evaluator, flattened_rules))
+2 -18
View File
@@ -105,8 +105,8 @@ def manage_versions(rules, current_versions=None, exclude_version_update=False,
class Package(object):
"""Packaging object for siem rules and releases."""
def __init__(self, rules, name, tune=False, release=False, current_versions=None, min_version=None,
max_version=None, update_version_lock=False):
def __init__(self, rules, name, release=False, current_versions=None, min_version=None, max_version=None,
update_version_lock=False):
"""Initialize a package."""
self.rules = [r.copy() for r in rules] # type: list[Rule]
self.name = name
@@ -118,10 +118,6 @@ class Package(object):
self.rules = [r for r in self.rules
if (min_version or 0) <= r.contents['version'] <= (max_version or r.contents['version'])]
if tune:
for rule in rules:
rule.tune()
def _add_versions(self, current_versions, update_versions_lock=False):
"""Add versions to rules at load time."""
return manage_versions(self.rules, current_versions=current_versions, save_changes=update_versions_lock)
@@ -232,18 +228,6 @@ class Package(object):
modified_rules = 'Modified Rules: \n{}'.format('\n'.join(' - ' + s for s in sorted(changed)) if new else 'N/A')
return '\n'.join([total, sha256, ecs_versions, indices, new_rules, modified_rules])
def generate_mitre(self):
"""Create an excel file based on mitre coverage."""
# mapping with highlights of covered cells - links to pivot table with technique id selected
def reconcile_changes(self):
"""Parse and generate changes since previous release based on changed.toml file."""
# at packaging, generate flat changes file to standard, based on consolidated and deduped interpretation of
# changed.toml and clear out changes.toml
# - all based on api_format only
# see packages.yml - can update management.changed = True:
# until released in package, then added with filter and changed to False
def generate_change_notes(self):
"""Generate change release notes."""
+2 -58
View File
@@ -26,7 +26,7 @@ _META_SCHEMA_REQ_DEFAULTS = {}
class Rule(object):
"""Rule class containing all the information about a rule."""
def __init__(self, path, contents, tune=False):
def __init__(self, path, contents):
"""Create a Rule from a toml management format."""
self.path = os.path.realpath(path)
self.contents = contents.get('rule', contents)
@@ -36,19 +36,13 @@ class Rule(object):
self.validate()
self.unoptimized_query = self.contents.get('query')
if tune:
self.tune_rule = True
self.tune()
self._original_hash = self.get_hash()
def __str__(self):
return 'name={}, path={}, query={}'.format(self.name, self.path, self.query)
def __repr__(self):
return '{}(path={}, contents={}, tune={})'.format(type(self).__name__, repr(self.path), repr(self.contents),
repr(self.tune_rule))
return '{}(path={}, contents={})'.format(type(self).__name__, repr(self.path), repr(self.contents))
def __eq__(self, other):
if type(self) == type(other):
@@ -122,48 +116,6 @@ class Rule(object):
"""Normalize the (api only) contents and return a serialized dump of it."""
return json.dumps(nested_normalize(self.contents), sort_keys=True, indent=indent)
def tune(self):
"""Tune query by including applicable fields derived from metadata."""
# if not self.query:
# return
#
# self.unoptimized_query = self.contents.get('query')
#
# if not hasattr(self.parsed_query, 'terms'):
# # can prepend here if we want
# return
#
# # TODO: This is error prone and absolutely can/should be better done with a custom walker to:
# # - find these fields
# # - move them to the front/highest precedence
# # - dedup+update them with these values from metadata
# # I am going to leave it for now as a good mechanism for testing the theory and since it only impacts at
# # "package" time and will open an issue in the meantime
#
# # add os version
# # many os ecs fields - will optimize later
# # if not any(str(term.left) == '' for term in parsed_query.terms) and self.metadata.get('os_type_list'):
# # self.contents['query'] = ':({}) and '.format(' or '.join(self.metadata['_os_type_list'])) + self.query
#
# # add ecs version
# # handle these better with eql2kql
# compares = [str(term.left) == 'ecs.version' for term in self.parsed_query.terms
# if isinstance(term, Comparison)]
# in_sets = [str(term.expression) == 'ecs.version' for term in self.parsed_query.terms
# if isinstance(term, InSet)]
#
# if any(in_sets):
# pass
# elif any(compares):
# pass
# elif not (any(compares) or any(in_sets)):
# ecs_query = ' or '.join(self.metadata['ecs_version'])
# self.contents['query'] = 'ecs.version:({}) and '.format(ecs_query) + self.query
def untune(self):
"""Restore query to pre-tuned state."""
# self.contents['query'] = self.unoptimized_query
def get_path(self):
"""Wrapper around getting path."""
if not self.path:
@@ -175,14 +127,6 @@ class Rule(object):
"""Determines if the rule was changed from original or was never saved."""
return self._original_hash != self.get_hash()
@classmethod # TODO
def from_eql_rule(cls, path, contents, validate=False):
"""Create a rule from loaded rule (toml) contents."""
# if validate:
# jsonschema.validate(contents, rule_schema)
return cls(path, contents)
def bump_version(self):
"""Bump the version of the rule."""
self.contents['version'] += 1
-5
View File
@@ -61,11 +61,6 @@ def save_etc_dump(contents, *path):
return eql.utils.save_dump(contents, get_etc_path(*path))
def get_ecs_fields(endgame_field):
ecs_mapping = load_etc_dump('ecs_mappings.json')
return ecs_mapping.get(endgame_field)
def save_gzip(contents):
gz_file = io.BytesIO()
-216
View File
@@ -1,216 +0,0 @@
{
"channel_name": [
"winlog.channel"
],
"command_line": [
"process.command_line",
"process.args"
],
"destination_address": [
"destination.ip"
],
"destination_port": [
"destination.port",
"server.port"
],
"effective_gid": [
"user.group.id"
],
"effective_group_name": [
"user.group.name"
],
"effective_uid": [
"user.id"
],
"effective_user_name": [
"user.name"
],
"endpoint.core_os": [
"host.os.platform"
],
"endpoint.hostname": [
"host.hostname"
],
"endpoint.ip_address": [
"host.ip"
],
"endpoint.mac_address": [
"host.mac"
],
"endpoint.name": [
"host.name"
],
"endpoint.operating_system": [
"host.os.full"
],
"event_id": [
"winlog.event_id"
],
"event_message": [
"winlog.message",
"winlog.event_data.*"
],
"eventlog_user_sid": [
"winlog.user.sid"
],
"file_name": [
"file.name",
"file.extension"
],
"file_path": [
"file.path"
],
"fileid": [
"file.inode"
],
"http_request": [
"http.request.method",
"http.request.referrer",
"http.version",
"http.*"
],
"imphash": [
"file.hash.imphash"
],
"in_packet_count": [
"source.packets",
"destination.packets"
],
"ip_address": [
"source.ip"
],
"logon_type": [
"winlog.event_data.LogonType"
],
"machine_id": [
"host.id"
],
"md5": [
"process.hash.md5",
"file.hash.md5"
],
"opcode": [
"winlog.opcode"
],
"out_packet_count": [
"source.packets",
"destination.packets"
],
"parent_pid": [
"process.parent.id"
],
"parent_process_name": [
"process.parent.name"
],
"parent_process_path": [
"process.parent.executable"
],
"pid": [
"process.pid"
],
"ppid": [
"process.parent.pid"
],
"process_name": [
"process.name"
],
"process_path": [
"process.executable"
],
"protocol": [
"network.transport"
],
"provider_guid": [
"winlog.provider_guid"
],
"provider_name": [
"winlog.provider_name"
],
"query_name": [
"dns.question.name"
],
"query_options": [
"dns.flags"
],
"query_results": [
"dns.answers.data"
],
"query_status": [
"dns.response_code"
],
"query_type": [
"dns.type"
],
"severity": [
"event.severity"
],
"sha1": [
"process.hash.sha1",
"file.hash.sha1"
],
"sha256": [
"process.hash.sha256",
"file.hash.sha256"
],
"source_address": [
"source.address",
"source.ip",
"client.address"
],
"source_port": [
"source.port",
"client.port"
],
"source_process_name": [
"process.name"
],
"source_process_path": [
"process.executable"
],
"subject_domain_name": [
"winlog.event_data.SubjectDomainName"
],
"subject_logon_id": [
"winlog.event_data.SubjectLogonId"
],
"subject_user_name": [
"winlog.event_data.UserName"
],
"subject_user_sid": [
"winlog.event_data.UserSid"
],
"target_domain_name": [
"winlog.event_data.TargetDomainName",
"user.domain"
],
"target_logon_id": [
"winlog.event_data.TargetLogonId"
],
"target_user_name": [
"user.name"
],
"task": [
"winlog.task"
],
"tid": [
"process.thread.id"
],
"timestamp": [
"@timestamp"
],
"total_in_bytes": [
"destination.bytes"
],
"total_out_bytes": [
"source.bytes"
],
"user_domain": [
"user.domain"
],
"user_name": [
"user.name"
],
"user_sid": [
"user.identifer"
]
}
-6
View File
@@ -1,7 +1,6 @@
---
package:
name: "7.9"
tune: false
release: true
# as_eql: true
filter:
@@ -10,8 +9,3 @@ package:
- 1.5.0
maturity:
- production
# need to add to schema - updated rules get this set to true when changed
# and reset when added to a package
# changed: true - maybe
# update:
# severity: low
+3 -3
View File
@@ -17,7 +17,7 @@ from . import common
@common.requires_os(common.WINDOWS)
def main(remote_host=None):
def main(username="rta-tester", remote_host=None):
if not remote_host:
common.log('A remote host is required to detonate this RTA', '!')
return common.MISSING_REMOTE_HOST
@@ -26,11 +26,11 @@ def main(remote_host=None):
common.log('Brute forcing login with invalid password against {}'.format(remote_host))
ps_command = '''
$PW = ConvertTo-SecureString "Lose-y0urse1f" -AsPlainText -Force
$PW = ConvertTo-SecureString "such-secure-passW0RD!" -AsPlainText -Force
$CREDS = New-Object System.Management.Automation.PsCredential {username}, $PW
Invoke-WmiMethod -ComputerName {host} -Class Win32_process -Name create -ArgumentList ipconfig -Credential $CREDS
'''
command = ['powershell', '-c', ps_command.format(username='zeus', host=remote_host)]
command = ['powershell', '-c', ps_command.format(username=username, host=remote_host)]
# fail 4 times - the first 3 concurrently and wait for the final to complete
for i in range(4):
-32
View File
@@ -68,12 +68,6 @@ class TestValidRules(unittest.TestCase):
rule = Rule(file_name, contents)
rule.validate(as_rule=True)
def test_all_rules_tuned(self):
"""Ensure that every rule file validates against the rule schema."""
for file_name, contents in rule_loader.load_rule_files().items():
rule = Rule(file_name, contents, tune=True)
rule.validate(as_rule=True)
def test_all_rule_queries_optimized(self):
"""Ensure that every rule query is in optimized form."""
for file_name, contents in rule_loader.load_rule_files().items():
@@ -86,32 +80,6 @@ class TestValidRules(unittest.TestCase):
rule.name, rule.id, optimized, rule.query)
self.assertEqual(tree, optimized, err_message)
def test_ecs_version_in_query(self):
"""Ensure that all rule queries have ecs version."""
# rule_loader.reset()
# rules = list(rule_loader.load_rules().values())
#
# for rule in rules:
# ecs_ver = rule.metadata.get('ecs_version')
# if ecs_ver:
# self.assertTrue('ecs.version:{}'.format(ecs_ver) in rule.query,
# 'ecs_version specified but missing from query')
def test_rules_lint_integrity(self):
"""Verify that linting is not compromising integrity of a rule."""
'''def validate(source, linted, *args):
self.assertEqual(kql.lint(source), linted, *args)
rules = rule_loader.load_rules().values()
for rule in rules:
try:
linted = eql2kql.convert(kql2eql.parse(rule.query).render())
validate(rule.query, linted, 'Linting improperly modified the query from: \n\t{} \nto \n\t{}'.format(
rule.query, linted))
except Exception as e:
raise Exception('{} - {}:\n{}'.format(rule.name, rule.query, e))'''
def test_no_unrequired_defaults(self):
"""Test that values that are not required in the schema are not set with default values."""
rules_with_hits = {}