Files
atomic-red-team/atomic_red_team/atomic_red_team.rb
T
2023-08-15 16:53:26 -06:00

272 lines
13 KiB
Ruby
Executable File

require 'yaml'
require 'erb'
require 'attack_api'
require 'securerandom'
class AtomicRedTeam
ATTACK_API = Attack.new
ATOMICS_DIRECTORY = "#{File.dirname(File.dirname(__FILE__))}/atomics"
# TODO- should these all be relative URLs?
ROOT_GITHUB_URL = "https://github.com/redcanaryco/atomic-red-team"
#
# Returns a list of paths that contain Atomic Tests
#
def atomic_test_paths
Dir["#{ATOMICS_DIRECTORY}/T*/T*.yaml"].sort
end
#
# Returns a list of Atomic Tests in Atomic Red Team (as Hashes from source YAML)
#
def atomic_tests
@atomic_tests ||= atomic_test_paths.collect do |path|
atomic_yaml = YAML.load(File.read path)
atomic_yaml['atomic_yaml_path'] = path
atomic_yaml
end
end
#
# Returns the individual Atomic Tests for a given identifer, passed as either a string (T1234) or an ATT&CK technique object
#
def atomic_tests_for_technique_by_platform(technique_or_technique_identifier, platform)
technique_identifier = if technique_or_technique_identifier.is_a? Hash
ATTACK_API.technique_identifier_for_technique technique_or_technique_identifier
else
technique_or_technique_identifier
end
test_list = Array.new
atomic_tests.find do |atomic_yaml|
if atomic_yaml.fetch('attack_technique').upcase == technique_identifier.upcase
atomic_yaml['atomic_tests'].each do |a_test|
if a_test["supported_platforms"].include?(platform[:platform])
test_list.append(a_test)
end
end
end
end
test_list
end
#
# Returns the individual Atomic Tests for a given identifer, passed as either a string (T1234) or an ATT&CK technique object
#
def atomic_tests_for_technique(technique_or_technique_identifier)
technique_identifier = if technique_or_technique_identifier.is_a? Hash
ATTACK_API.technique_identifier_for_technique technique_or_technique_identifier
else
technique_or_technique_identifier
end
atomic_tests.find do |atomic_yaml|
atomic_yaml.fetch('attack_technique').upcase == technique_identifier.upcase
end.to_h.fetch('atomic_tests', [])
end
#
# Returns a Markdown formatted Github link to a technique. This will be to the edit page for
# techniques that already have one or more Atomic Red Team tests, or the create page for
# techniques that have no existing tests for the given OS.
#
def github_link_to_technique(technique, include_identifier: false, only_platform: self.only_platform)
technique_identifier = ATTACK_API.technique_identifier_for_technique(technique).upcase
link_display = "#{"#{technique_identifier.upcase} " if include_identifier}#{technique['name']}"
yaml_file = "#{ATOMICS_DIRECTORY}/#{technique_identifier}/#{technique_identifier}.yaml"
markdown_file = "#{ATOMICS_DIRECTORY}/#{technique_identifier}/#{technique_identifier}.md"
if atomic_yaml_has_test_for_platform(yaml_file, only_platform) && (File.exist? markdown_file)
# we have a file for this technique, so link to it's Markdown file
"[#{link_display}](../../#{technique_identifier}/#{technique_identifier}.md)"
else
# we don't have a file for this technique, or there are not tests for the given platform, so link to an edit page
"#{link_display} [CONTRIBUTE A TEST](https://github.com/redcanaryco/atomic-red-team/wiki/Contributing)"
end
end
def atomic_yaml_has_test_for_platform(yaml_file, only_platform)
has_test_for_platform = false
if File.exist? yaml_file
yaml = YAML.load_file(yaml_file)
yaml['atomic_tests'].each_with_index do |atomic, i|
if atomic["supported_platforms"].any? {|platform| platform.downcase =~ only_platform}
has_test_for_platform = true
break
end
end
end
return has_test_for_platform
end
def validate_atomic_yaml!(yaml, used_guids_file, unique_guid_array)
raise("YAML file has no elements") if yaml.nil?
raise('`attack_technique` element is required') unless yaml.has_key?('attack_technique')
raise('`attack_technique` element must be a string') unless yaml['attack_technique'].is_a?(String)
raise('`display_name` element is required') unless yaml.has_key?('display_name')
raise('`display_name` element must be an array') unless yaml['display_name'].is_a?(String)
raise('`atomic_tests` element is required') unless yaml.has_key?('atomic_tests')
raise('`atomic_tests` element must be an array') unless yaml['atomic_tests'].is_a?(Array)
raise('`atomic_tests` element is empty - you have no tests') unless yaml['atomic_tests'].count > 0
yaml['atomic_tests'].each_with_index do |atomic, i|
raise("`atomic_tests[#{i}].name` element is required") unless atomic.has_key?('name')
raise("`atomic_tests[#{i}].name` element must be a string") unless atomic['name'].is_a?(String)
if atomic.has_key?('auto_generated_guid')
guid = atomic["auto_generated_guid"].to_s
raise("`atomic_tests[#{i}].auto_generated_guid` element not a proper guid") unless /[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89aAbB][a-f0-9]{3}-[a-f0-9]{12}/.match(guid)
raise("`atomic_tests[#{i}].auto_generated_guid` element must be unique") unless !unique_guid_array.include?(guid)
unique_guid_array << guid
end
raise("`atomic_tests[#{i}].description` element is required") unless atomic.has_key?('description')
raise("`atomic_tests[#{i}].description` element must be a string") unless atomic['description'].is_a?(String)
raise("`atomic_tests[#{i}].supported_platforms` element is required") unless atomic.has_key?('supported_platforms')
raise("`atomic_tests[#{i}].supported_platforms` element must be an Array (was a #{atomic['supported_platforms'].class.name})") unless atomic['supported_platforms'].is_a?(Array)
valid_supported_platforms = ['windows', 'macos', 'linux', 'office-365', 'azure-ad', 'google-workspace', 'saas', 'iaas', 'containers', 'iaas:aws', 'iaas:azure', 'iaas:gcp']
atomic['supported_platforms'].each do |platform|
if !valid_supported_platforms.include?(platform)
raise("`atomic_tests[#{i}].supported_platforms` '#{platform}' must be one of #{valid_supported_platforms.join(', ')}")
end
end
if atomic['dependencies']
atomic['dependencies'].each do |dependency|
raise("`atomic_tests[#{i}].dependencies` '#{dependency}' must be have a description}") unless dependency.has_key?('description')
raise("`atomic_tests[#{i}].dependencies` '#{dependency}' must be have a prereq_command}") unless dependency.has_key?('prereq_command')
raise("`atomic_tests[#{i}].dependencies` '#{dependency}' must be have a get_prereq_command}") unless dependency.has_key?('get_prereq_command')
end
end
(atomic['input_arguments'] || {}).each_with_index do |arg_kvp, iai|
arg_name, arg = arg_kvp
raise("`atomic_tests[#{i}].input_arguments[#{iai}].description` element is required") unless arg.has_key?('description')
raise("`atomic_tests[#{i}].input_arguments[#{iai}].description` element must be a string") unless arg['description'].is_a?(String)
raise("`atomic_tests[#{i}].input_arguments[#{iai}].type` element is required") unless arg.has_key?('type')
raise("`atomic_tests[#{i}].input_arguments[#{iai}].type` element must be a string") unless arg['type'].is_a?(String)
raise("`atomic_tests[#{i}].input_arguments[#{iai}].type` element must be lowercased and underscored (was #{arg['type']})") unless arg['type'] =~ /[a-z_]+/
# TODO: determine if we think default values are required for EVERY input argument
# raise("`atomic_tests[#{i}].input_arguments[#{iai}].default` element is required") unless arg.has_key?('default')
# raise("`atomic_tests[#{i}].input_arguments[#{iai}].default` element must be a string (was a #{arg['default'].class.name})") unless arg['default'].is_a?(String)
end
raise("`atomic_tests[#{i}].executor` element is required") unless atomic.has_key?('executor')
executor = atomic['executor']
raise("`atomic_tests[#{i}].executor.name` element is required") unless executor.has_key?('name')
raise("`atomic_tests[#{i}].executor.name` element must be a string") unless executor['name'].is_a?(String)
raise("`atomic_tests[#{i}].executor.name` element must be lowercased and underscored (was #{executor['name']})") unless executor['name'] =~ /[a-z_]+/
valid_executor_types = ['command_prompt', 'sh', 'bash', 'powershell', 'manual', 'aws', 'az', 'gcloud', 'kubectl']
case executor['name']
when 'manual'
raise("`atomic_tests[#{i}].executor.steps` element is required") unless executor.has_key?('steps')
raise("`atomic_tests[#{i}].executor.steps` element must be a string") unless executor['steps'].is_a?(String)
validate_input_args_vs_string! input_args: (atomic['input_arguments'] || {}).keys,
string: executor['steps'],
string_description: "atomic_tests[#{i}].executor.steps"
when 'command_prompt', 'sh', 'bash', 'powershell', 'aws', 'az', 'gcloud', 'kubectl'
raise("`atomic_tests[#{i}].executor.command` element is required") unless executor.has_key?('command')
raise("`atomic_tests[#{i}].executor.command` element must be a string") unless executor['command'].is_a?(String)
validate_input_args_vs_string! input_args: (atomic['input_arguments'] || {}).keys,
string: executor['command'],
string_description: "atomic_tests[#{i}].executor.command"
else
raise("`atomic_tests[#{i}].executor.name` '#{executor['name']}' must be one of #{valid_executor_types.join(', ')}")
end
validate_no_todos!(atomic, path: "atomic_tests[#{i}]")
end
end
def record_used_guids!(yaml, used_guids_file)
return unless !yaml.nil?
yaml['atomic_tests'].each_with_index do |atomic, i|
next unless atomic.has_key?('auto_generated_guid')
guid = atomic["auto_generated_guid"].to_s
add_guid_to_used_guid_file(guid, used_guids_file) unless guid == ''
end
end
def generate_guids_for_yaml!(path, used_guids_file)
text = File.read(path)
# add the "auto_generated_guid:" element after the "- name:" element if it isn't already there
text.gsub!(/(?i)(^([ \t]*-[ \t]*)name:.*$(?!\s*auto_generated_guid))/) { |m| "#{$1}\n#{$2.gsub(/-/," ")}auto_generated_guid:"}
# fill the "auto_generated_guid:" element in if it doesn't contain a guid
text.gsub!(/(?i)^([ \t]*auto_generated_guid:)(?!([ \t]*[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89aAbB][a-f0-9]{3}-[a-f0-9]{12})).*$/) { |m| "#{$1} #{get_unique_guid!(used_guids_file)}"}
File.open(path, "w") { |file| file << text }
end
# generates a unique guid and records the guid as having been used by writing it to the used_guids_file
def get_unique_guid!(used_guids_file)
new_guid = ''
20.times do |i| # if it takes more than 20 tries to get a unique guid, there must be something else going on
new_guid = SecureRandom.uuid
break unless !is_unique_guid(new_guid, used_guids_file)
end
# add this new unique guid to the used guids file
add_guid_to_used_guid_file(new_guid, used_guids_file)
return new_guid
end
# add guid to used guid file if it is the proper format and is not already in the file. raises an exception if guid isn't valid
def add_guid_to_used_guid_file(guid, used_guids_file)
open(used_guids_file, 'a') { |f|
raise("the GUID (#{guid}) does not match the required format for the `auto_generated_guid` element") unless /[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89aAbB][a-f0-9]{3}-[a-f0-9]{12}/ =~ guid
f.puts guid unless !is_unique_guid(guid, used_guids_file)
}
end
def is_unique_guid(guid, used_guids_file)
return !File.foreach(used_guids_file).grep(/#{guid}/).any?
end
#
# Validates that the arguments (specified in "#{arg}" format) in a string
# match the input_arguments for a test
#
def validate_input_args_vs_string!(input_args:, string:, string_description:)
input_args_in_string = string.scan(/#\{([^}]+)\}/).to_a.flatten
input_args_in_string_and_not_specced = input_args_in_string - input_args
if input_args_in_string_and_not_specced.count > 0
raise("`#{string_description}` contains args #{input_args_in_string_and_not_specced} not in input_arguments")
end
input_args_in_spec_not_string = input_args - input_args_in_string
if input_args_in_string_and_not_specced.count > 0
raise("`atomic_tests[#{i}].input_arguments` contains args #{input_args_in_spec_not_string} not in command")
end
end
#
# Recursively validates that the hash (or something) doesn't contain a TODO
#
def validate_no_todos!(hashish, path:)
if hashish.is_a? String
raise "`#{path}` contains a TODO" if hashish.include? 'TODO'
elsif hashish.is_a? Array
hashish.each_with_index do |item, i|
validate_no_todos! item, path: "#{path}[#{i}]"
end
elsif hashish.is_a? Hash
hashish.each do |k, v|
validate_no_todos! v, path: "#{path}.#{k}"
end
end
end
end