Merge pull request #1274 from redcanaryco/remove-execution-framworks
[WIP] Remove execution frameworks from Atomic Red Team
This commit is contained in:
@@ -28,7 +28,7 @@
|
||||
<a href="use-cases" class="btn">Use Cases</a>
|
||||
<a href="testing" class="btn">Get Started</a>
|
||||
<a href="contributing" class="btn">Contributing</a>
|
||||
<a href="apis-execution-frameworks" class="btn">APIs & Execution Frameworks</a>
|
||||
<a href="apis" class="btn">APIs</a>
|
||||
<a href="related" class="btn">Related</a>
|
||||
<a href="{{ site.github.repository_url }}" class="btn">View on GitHub</a>
|
||||
<a href="https://slack.atomicredteam.io" class="btn">Join on Slack</a>
|
||||
|
||||
@@ -6,14 +6,10 @@ layout: default
|
||||
Atomic Red Team includes a Ruby API we use to validate atomic tests, generate docs, and
|
||||
[interact with ATT&CK](#bonus-apis-ruby-attck-api).
|
||||
|
||||
> Want to contribute APIs for another language such as Python or Powershell?
|
||||
Follow the interface in `atomic_red_team/atomic_red_team.rb` and submit a pull request!
|
||||
|
||||
## Ruby API
|
||||
|
||||
Atomic Red Team comes with a Ruby API that we use when validating tests again our spec, generating
|
||||
documentation in Markdown format, etc. You too can use the API to use Atomic Red Team tests
|
||||
in your test execution framework.
|
||||
documentation in Markdown format, etc.
|
||||
|
||||
### Installing
|
||||
Add atomic-red-team to your Gemfile:
|
||||
@@ -93,7 +89,7 @@ command-and-control has 21 techniques
|
||||
initial-access has 10 techniques
|
||||
```
|
||||
|
||||
### Example (my favorite): Getting a 2D array of the ATT&CK matrix of Tactic columns and Technique rows:
|
||||
### Example: Getting a 2D array of the ATT&CK matrix of Tactic columns and Technique rows:
|
||||
```ruby
|
||||
2.2.0 :062 > Attack.new.ordered_tactics
|
||||
=> ["initial-access", "execution", "persistence", "privilege-escalation", "defense-evasion", "credential-access",
|
||||
@@ -1,24 +0,0 @@
|
||||
# Atomic Red Team Execution Frameworks
|
||||
Execution frameworks help you run Atomic Tests in your environment.
|
||||
Each atomic test is defined in the [atomics folder](https://github.com/redcanaryco/atomic-red-team/tree/master/execution-frameworks) inside their respective Mitre Att&ck T# folders.
|
||||
Within each T# folder you will find a yaml file that defines the commands to be run during the test, and an easier to read markdown (md) version of the same thing.
|
||||
Here is an [example markdown file](https://github.com/redcanaryco/atomic-red-team/blob/master/atomics/T1003/T1003.md) describing some of the tests that can be run using one of the below execution frameworks.
|
||||
|
||||
## Invoke-AtomicRedTeam
|
||||
|
||||
Invoke-AtomicRedTeam is written in PowerShell, which can be executed cross-platform using PowerShell Core for Linux and MacOS.
|
||||
For detailed installation and usage instructions refer to the [README](https://github.com/redcanaryco/invoke-atomicredteam) file. Note that this execution framework exists in a separate GitHub Repository [here](https://github.com/redcanaryco/invoke-atomicredteam).
|
||||
|
||||
## Python
|
||||
|
||||
Surprise, this framework is written in Python. For detailed installation and usage instructions refer to the [README](https://github.com/redcanaryco/atomic-red-team/tree/master/execution-frameworks/contrib/python) file inside of the **_contrib/python_** folder.
|
||||
|
||||
## Ruby
|
||||
|
||||
Ruby version of the execution framework.
|
||||
|
||||
## Golang
|
||||
|
||||
go-atomicredteam is written in Golang, and [binaries](https://github.com/activeshadow/go-atomicredteam/releases) have been compiled for Windows, MacOS, and Linux.
|
||||
|
||||
For detailed installation and usage instructions refer to the [README](https://github.com/activeshadow/go-atomicredteam) file. Note that this execution framework exists in a separate GitHub Repository [here](https://github.com/activeshadow/go-atomicredteam).
|
||||
@@ -1,158 +0,0 @@
|
||||
Introduction
|
||||
============
|
||||
|
||||
ART Attack Runner is a wrapper made to automate the attacks described in the [Atomic Red Team Git Repository](https://github.com/redcanaryco/atomic-red-team). It allows running the various techniques via shell scripts (bash or powershell), via Python scripts or interactively.
|
||||
|
||||
Installing
|
||||
==========
|
||||
|
||||
This script runs on Windows and Linux at the moment, and was tested against Python 3.6. Python 2.7 is unsupported at the moment.
|
||||
|
||||
To install the script, pull the Git repo from [here](https://github.com/redcanaryco/atomic-red-team), move to the execution-frameworks/python directory and install the tool's dependencies:
|
||||
|
||||
```
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
This will pull in the external packages that are required to run.
|
||||
|
||||
Installing on Windows
|
||||
---------------------
|
||||
|
||||
Here are some more extensive instructions to install this software on Windows.
|
||||
|
||||
|
||||
- Ensure that Git is installed. If it is not, get it from the [official site](https://git-scm.com/downloads). If you right-click within a folder, you should have a "Git Bash Here" option.
|
||||
- Ensure Python 3 is installed (3.7 is known to work). If it is not installed, get it [here](https://www.python.org/getit/).
|
||||
- Ensure pip (Python package manager) is installed. It is normally installed when the python package is installed, so make sure that the step above has completed properly.
|
||||
- Add "%USERPROFILE%\AppData\Local\Programs\Python\Python37" and "%USERPROFILE%\AppData\Local\Programs\Python\Python37\Scripts" to the PATH environment variable. This adds pip and the python executable to the PATH, allowing them to be run more conveniently from the command line.
|
||||
- Open a shell with the "Git Bash Here" contextual menu option. You should be able to run the "python" command successfully. "python -V" should indicate Python 3 rather than python 2.7.
|
||||
- Upgrade pip to the latest version: "pip install --upgrade pip"
|
||||
- Clone the ART Attack Runner Git repository on the Desktop directly. Open Git Bash on the Desktop and type the following: "git clone https://github.com/redcanaryco/atomic-red-team". - Move into the cloned repository, and run the installation commands: "pip install -r requirements.txt". This will install the dependencies of the project.
|
||||
- Try to run the script: "python runner.py interactive". Try to run the technique of your choice.
|
||||
|
||||
|
||||
Usage
|
||||
=====
|
||||
|
||||
The tool may be used in three different ways: interactively, in Python scripts, or via shell scripts.
|
||||
|
||||
Interactive usage
|
||||
-----------------
|
||||
|
||||
Run the tool like follows:
|
||||
|
||||
```
|
||||
python runner.py interactive
|
||||
```
|
||||
|
||||
You will be dropped in a REPL that will allow you to interact with the system. Here is an example session:
|
||||
|
||||
Enter the name of the technique that you would like to execute (eg. T1033). Type 'exit' to quit.
|
||||
> T1033
|
||||
|
||||
===========================================================
|
||||
System Owner/User Discovery - T1033
|
||||
|
||||
-----------------------------------------------------------
|
||||
Name: System Owner/User Discovery
|
||||
Description: Identify System owner or users on an endpoint
|
||||
Platforms: windows
|
||||
|
||||
Arguments:
|
||||
computer_name: Name of remote computer (default: computer1)
|
||||
|
||||
Launcher: command_prompt
|
||||
Command: cmd.exe /C whoami
|
||||
quser
|
||||
quser /SERVER:"${computer_name}"
|
||||
wmic useraccount get /ALL
|
||||
qwinsta.exe /server:${computer_name}
|
||||
qwinsta.exe
|
||||
for /F "tokens=1,2" %i in ('qwinsta /server:${computer_name} ^| findstr "Active Disc"') do @echo %i | find /v "#" | find /v "c
|
||||
onsole" || echo %j > usernames.txt
|
||||
@FOR /F %n in (computers.txt) DO @FOR /F "tokens=1,2" %i in ('qwinsta /server:%n ^| findstr "Active Disc"') do @echo %i | find
|
||||
/v "#" | find /v "console" || echo %j > usernames.txt
|
||||
|
||||
|
||||
Do you want to run this? (Y/n): y
|
||||
Do you want to check dependencies? (Y/n): y
|
||||
Do you want to run the cleanup after the executor completes? (Y/n): y
|
||||
Please provide a parameter for 'computer_name' (blank for default): MY_COMPUTER
|
||||
|
||||
------------------------------------------------
|
||||
|
||||
Running: cmd.exe /C whoami
|
||||
Output: olivier.lemelin
|
||||
|
||||
|
||||
In Python Scripts
|
||||
-----------------
|
||||
|
||||
To use in a Python script, simply import the script as a package, create an `AtomicRunner` class instance, and use it as follows:
|
||||
|
||||
# Import the runner (the script "runner.py")
|
||||
import runner
|
||||
def main():
|
||||
|
||||
# Instantiate the AtomicRunner class instance.
|
||||
techniques = runner.AtomicRunner()
|
||||
|
||||
# Execute the chosen technique the following way:
|
||||
# First parameter ("T1033"): Name of the technique to execute.
|
||||
# Second parameter (0): Position of the executor in the list.
|
||||
# In order to find this number, launch an interactive execution. Just before actually launching
|
||||
# the command, the system will display what line needs to be added to your script.
|
||||
# If there is only one executor, you may omit this parameter.
|
||||
# Third parameter ({"computer_name": "DA2CTC"}): Arguments to pass to the executor.
|
||||
# This is a dictionary of the arguments to pass to the executor. Both the key and value are
|
||||
# to be passed as strings. You may also launch an interactive execution, and the script will
|
||||
# tell you what needs to be entered to invoke this test.
|
||||
# Forth parameter (True): Check dependencies before running the executor.
|
||||
# This is a boolean value that indicates if the runner has to check if the dependencies are met.
|
||||
# If there are missing dependencies the runner tries to fetch them if the `get_prereq_command` section
|
||||
# in the yaml file is specified.
|
||||
# Fifth parameter (True): Cleanup after the executor completed.
|
||||
# This is a boolean value that indicates if the runner has execute the cleanup section after the executor completes.
|
||||
# The cleanup commands are specified in the `cleanup_command` section within the executor
|
||||
techniques.execute("T1033", position=0, parameters={"computer_name": "DA2CTC"}, True, True)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
In Shell Scripts
|
||||
----------------
|
||||
|
||||
To use in a shell script, a CLI interface has been produced. You may use it the following way:
|
||||
|
||||
```
|
||||
python runner.py run T1033 0 --args '{"computer_name": "DA2CTC"}' --dependencies --cleanup
|
||||
```
|
||||
|
||||
If you're unsure of how to use this, you may also launch an interactive execution, and just before actually launching the command, the system will report the line that needs to be added to your shell script in order to run it as desired.
|
||||
|
||||
Help!
|
||||
=====
|
||||
|
||||
If you're ever unsure of the sytax to invoke the script, you may use the help function from the script:
|
||||
|
||||
`python runner.py -h`
|
||||
|
||||
`python runner.py interactive -h`
|
||||
|
||||
`python runner.py run -h`
|
||||
|
||||
Gotchas
|
||||
-------
|
||||
|
||||
Here are a few things you might want to keep in mind as you go along:
|
||||
|
||||
- The script "moves" into the ART technique's directory before launching the command. If your parameters include a Path, it will need to take this in consideration (or you may want to use fully qualified paths)
|
||||
|
||||
|
||||
FAQ
|
||||
---
|
||||
|
||||
Q. I am receiving the following error, what does this mean? : "Warning: new executor fingerprint does not match the old one! Skipping this execution"
|
||||
|
||||
A. Since executors are not associated to an ID, but only to a position in a YAML file, we need to make sure that we are running the right executor at any time. As such, we take a hash of an executor before it is run, and if they're different, we spit out this error. To fix this, simply do what the script asks you to do: Make sure that you are executing the right executor with the right parameters using the interactive mode, and run the clearhash function, which removes the hash from the database. You should be good to go afterwards.
|
||||
@@ -1,2 +0,0 @@
|
||||
Unidecode==1.0.22
|
||||
PyYAML>=4.2b1
|
||||
@@ -1,750 +0,0 @@
|
||||
"""
|
||||
ART Attack Runner
|
||||
Version: 1.0
|
||||
Author: Olivier Lemelin
|
||||
|
||||
Script that was built in order to automate the execution of ART.
|
||||
"""
|
||||
|
||||
import os
|
||||
import os.path
|
||||
import fnmatch
|
||||
import platform
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import hashlib
|
||||
import json
|
||||
import argparse
|
||||
|
||||
import yaml
|
||||
import unidecode
|
||||
|
||||
# pylint: disable=line-too-long, invalid-name
|
||||
|
||||
TECHNIQUE_DIRECTORY_PATTERN = 'T*'
|
||||
ATOMICS_DIR_RELATIVE_PATH = os.path.join("..", "..", "..", "atomics")
|
||||
HASH_DB_RELATIVE_PATH = "techniques_hash.db"
|
||||
COMMAND_TIMEOUT = 20
|
||||
|
||||
##########################################
|
||||
# Filesystem & Helpers
|
||||
##########################################
|
||||
|
||||
def get_platform():
|
||||
"""Gets the current platform."""
|
||||
|
||||
# We need to handle the platform a bit differently in certain cases.
|
||||
# Otherwise, we simply return the value that's given here.
|
||||
plat = platform.system().lower()
|
||||
|
||||
if plat == "darwin":
|
||||
# 'macos' is the term that is being used within the .yaml files.
|
||||
plat = "macos"
|
||||
|
||||
return plat
|
||||
|
||||
|
||||
def get_self_path():
|
||||
"""Gets the full path to this script's directory."""
|
||||
return os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
def get_yaml_file_from_dir(path_to_dir):
|
||||
"""Returns path of the first file that matches "*.yaml" in a directory."""
|
||||
|
||||
for entry in os.listdir(path_to_dir):
|
||||
if fnmatch.fnmatch(entry, '*.yaml'):
|
||||
# Found the file!
|
||||
return os.path.join(path_to_dir, entry)
|
||||
|
||||
print("No YAML file describing the technique in {}!".format(path_to_dir))
|
||||
return None
|
||||
|
||||
|
||||
def load_technique(path_to_dir):
|
||||
"""Loads the YAML content of a technique from its directory. (T*)"""
|
||||
|
||||
# Get path to YAML file.
|
||||
file_entry = get_yaml_file_from_dir(path_to_dir)
|
||||
|
||||
# Load and parses its content.
|
||||
with open(file_entry, 'r', encoding="utf-8") as f:
|
||||
return yaml.load(unidecode.unidecode(f.read()), Loader=yaml.SafeLoader)
|
||||
|
||||
|
||||
def load_techniques():
|
||||
"""Loads multiple techniques from the 'atomics' directory."""
|
||||
|
||||
# Get path to atomics directory.
|
||||
atomics_path = os.path.join(get_self_path(),
|
||||
ATOMICS_DIR_RELATIVE_PATH)
|
||||
normalized_atomics_path = os.path.normpath(atomics_path)
|
||||
|
||||
print("Loading techniques from {}...".format(normalized_atomics_path))
|
||||
|
||||
# Create a dict to accept the techniques that will be loaded.
|
||||
techniques = {}
|
||||
print("Loading Technique", end="")
|
||||
|
||||
# For each tech directory in the main directory.
|
||||
for atomic_entry in os.listdir(normalized_atomics_path):
|
||||
|
||||
# Make sure that it matches the current pattern.
|
||||
if fnmatch.fnmatch(atomic_entry, TECHNIQUE_DIRECTORY_PATTERN):
|
||||
print(", {}".format(atomic_entry), end="")
|
||||
|
||||
# Get path to tech dir.
|
||||
path_to_dir = os.path.join(normalized_atomics_path, atomic_entry)
|
||||
|
||||
# Load, parse and add to dict.
|
||||
tech = load_technique(path_to_dir)
|
||||
techniques[atomic_entry] = tech
|
||||
|
||||
# Add path to technique's directory.
|
||||
techniques[atomic_entry]["path"] = path_to_dir
|
||||
print(".")
|
||||
return techniques
|
||||
|
||||
def check_dependencies(executor, cwd):
|
||||
dependencies = "dependencies"
|
||||
dependencies_executor = "dependency_executor_name"
|
||||
prereq_command = "prereq_command"
|
||||
get_prereq_command = "get_prereq_command"
|
||||
input_arguments = "input_arguments"
|
||||
|
||||
# If the executor doesn't have dependencies_executor key it doesn't have dependencies. Skip
|
||||
if dependencies not in executor or dependencies not in executor:
|
||||
print("No '{}' or '{}' section found in the yaml file. Skipping dependencies check.".format(dependencies_executor,dependencies))
|
||||
return True
|
||||
|
||||
launcher = executor[dependencies_executor]
|
||||
|
||||
for dep in executor[dependencies]:
|
||||
args = executor[input_arguments] if input_arguments in executor else {}
|
||||
final_parameters = set_parameters(args, {})
|
||||
command = build_command(launcher, dep[prereq_command], final_parameters, cwd)
|
||||
|
||||
p = subprocess.Popen(launcher, shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, env=os.environ, cwd=cwd)
|
||||
|
||||
p.communicate(bytes(command, "utf-8") + b"\n", timeout=COMMAND_TIMEOUT)
|
||||
# If the dependencies are not satisfied the command will exit with code 1, 0 otherwise.
|
||||
if p.returncode != 0:
|
||||
print("Dependencies not found. Fetching them...")
|
||||
if get_prereq_command not in dep:
|
||||
print("Missing {} commands in the yaml file. Can't fetch requirements".format(get_prereq_command))
|
||||
return False
|
||||
command = build_command(launcher, dep[get_prereq_command], final_parameters, cwd)
|
||||
d = subprocess.Popen(launcher, shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, env=os.environ, cwd=cwd)
|
||||
out, err = d.communicate(bytes(command, "utf-8") + b"\n", timeout=COMMAND_TIMEOUT)
|
||||
p.terminate()
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
##########################################
|
||||
# Executors
|
||||
##########################################
|
||||
|
||||
def is_valid_executor(exe, self_platform):
|
||||
"""Validates that the executor can be run on the current platform."""
|
||||
if self_platform not in exe["supported_platforms"]:
|
||||
return False
|
||||
|
||||
# The "manual" executors need to be run by hand, normally.
|
||||
# This script should not be running them.
|
||||
if exe["executor"]["name"] == "manual":
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def get_valid_executors(tech):
|
||||
"""From a loaded technique, get all executors appropriate for the current platform."""
|
||||
return list(filter(lambda x: is_valid_executor(x, get_platform()), tech['atomic_tests']))
|
||||
|
||||
|
||||
def get_executors(tech):
|
||||
"""From a loaded technique, get all executors."""
|
||||
return tech['atomic_tests']
|
||||
|
||||
|
||||
def print_input_arguments(executor):
|
||||
"""Prints out the input arguments of an executor in a human-readable manner."""
|
||||
if "input_arguments" in executor:
|
||||
for name, values in executor["input_arguments"].items():
|
||||
print("{name}: {description} (default: {default})".format(name=name,
|
||||
description=values["description"],
|
||||
default=values["default"]))
|
||||
|
||||
|
||||
def print_executor(executor):
|
||||
"""Prints an executor in a human-readable manner."""
|
||||
|
||||
print("\n-----------------------------------------------------------")
|
||||
print("Name: " + executor["name"].strip())
|
||||
print("Description: " + executor["description"].strip())
|
||||
print("Platforms: " + ", ".join(map(lambda x: x.strip(), executor["supported_platforms"])))
|
||||
print("\nArguments:")
|
||||
print_input_arguments(executor)
|
||||
print("\nLauncher: " + executor["executor"]["name"])
|
||||
print("Command: " + executor["executor"]["command"] + "\n")
|
||||
|
||||
|
||||
def executor_get_input_arguments(input_arguments):
|
||||
"""Gets the input arguments from the user, displaying a prompt and converting them."""
|
||||
|
||||
# Empty dict to hold on the parameters.
|
||||
parameters = {}
|
||||
for name, values in input_arguments.items():
|
||||
|
||||
# If answer, use that.
|
||||
answer = input_string("Please provide a parameter for '{name}' (blank for default)".format(name=name))
|
||||
|
||||
# If no answer, use the default.
|
||||
if not answer:
|
||||
answer = values["default"]
|
||||
|
||||
# Cast parameter to string
|
||||
parameters[name] = str(answer)
|
||||
|
||||
return parameters
|
||||
|
||||
|
||||
def print_non_interactive_command_line(technique_name, executor_number, parameters, check_dep, run_cleanup):
|
||||
"""Prints the comand line to use in order to launch the technique non-interactively."""
|
||||
flag_dep = ""
|
||||
flag_cleanup = ""
|
||||
|
||||
if check_dep:
|
||||
flag_dep = "--dependencies"
|
||||
|
||||
if run_cleanup:
|
||||
flag_cleanup = "--cleanup"
|
||||
|
||||
print("In order to run this non-interactively:")
|
||||
print(" Python:")
|
||||
print(" techniques = runner.AtomicRunner()")
|
||||
print(" techniques.execute(\"{name}\", position={pos}, parameters={params}, dependencies={dep}, cleanup={cleanup})".format(name=technique_name, pos=executor_number, params=parameters, dep=check_dep, cleanup=run_cleanup))
|
||||
print(" Shell Script:")
|
||||
print(" python3 runner.py run {name} {pos} --args '{params}' {dep} {cleanup}\n".format(name=technique_name, pos=executor_number, params=json.dumps(parameters), dep=flag_dep, cleanup=flag_cleanup))
|
||||
|
||||
|
||||
def interactive_apply_executor(executor, path, technique_name, executor_number):
|
||||
"""Interactively run a given executor."""
|
||||
|
||||
# Prints information about the executor.
|
||||
print_executor(executor)
|
||||
|
||||
# Request if we still want to run this.
|
||||
if not yes_or_no("Do you want to run this? "):
|
||||
print("Cancelled.")
|
||||
return
|
||||
|
||||
# Request if we want to check the dependencies before running the executor.
|
||||
check_dep = yes_or_no("Do you want to check dependencies? ")
|
||||
|
||||
# Request if we want to cleanup after the executor completes.
|
||||
run_cleanup = yes_or_no("Do you want to run the cleanup after the executor completes? ")
|
||||
|
||||
# If so, get the input parameters.
|
||||
if "input_arguments" in executor:
|
||||
parameters = executor_get_input_arguments(executor["input_arguments"])
|
||||
else:
|
||||
parameters = {}
|
||||
|
||||
if check_dep:
|
||||
if not check_dependencies(executor, path):
|
||||
print("Check dependencies failed. Cancelling...")
|
||||
return
|
||||
|
||||
# Prints the Command line to enter for non-interactive execution.
|
||||
print_non_interactive_command_line(technique_name, executor_number, parameters, check_dep, run_cleanup)
|
||||
launcher = convert_launcher(executor["executor"]["name"])
|
||||
command = executor["executor"]["command"]
|
||||
built_command = build_command(launcher, command, parameters, path)
|
||||
|
||||
# begin execution with the above parameters.
|
||||
execute_command(launcher, built_command, path)
|
||||
|
||||
if run_cleanup:
|
||||
apply_cleanup(executor, path, parameters)
|
||||
|
||||
|
||||
def get_default_parameters(args):
|
||||
"""Build a default parameters dictionary from the content of the YAML file."""
|
||||
return {name: values["default"] for name, values in args.items()}
|
||||
|
||||
|
||||
def set_parameters(executor_input_arguments, given_arguments):
|
||||
"""Sets the default parameters if no value was given."""
|
||||
|
||||
# Default parameters as decribed in the executor.
|
||||
default_parameters = get_default_parameters(executor_input_arguments)
|
||||
|
||||
# Merging default parameters with the given parameters, giving precedence
|
||||
# to the given params.
|
||||
final_parameters = {**default_parameters, **given_arguments}
|
||||
|
||||
# Cast parameters to string
|
||||
for name, value in final_parameters.items():
|
||||
final_parameters[name] = str(value)
|
||||
|
||||
return final_parameters
|
||||
|
||||
|
||||
def apply_executor(executor, path, parameters):
|
||||
"""Non-interactively run a given executor."""
|
||||
|
||||
args = executor["input_arguments"] if "input_arguments" in executor else {}
|
||||
final_parameters = set_parameters(args, parameters)
|
||||
|
||||
launcher = convert_launcher(executor["executor"]["name"])
|
||||
command = executor["executor"]["command"]
|
||||
built_command = build_command(launcher, command, final_parameters, path)
|
||||
|
||||
# begin execution with the above parameters.
|
||||
execute_command(launcher, built_command, path)
|
||||
|
||||
def apply_cleanup(executor, path, parameters):
|
||||
if "cleanup_command" not in executor["executor"] or executor["executor"]["cleanup_command"] == None:
|
||||
print("No cleanup section found in the yaml file. Skipping...")
|
||||
return
|
||||
|
||||
args = executor["input_arguments"] if "input_arguments" in executor else {}
|
||||
final_parameters = set_parameters(args, parameters)
|
||||
launcher = convert_launcher(executor["executor"]["name"])
|
||||
command = executor["executor"]["cleanup_command"]
|
||||
built_command = build_command(launcher, command, final_parameters, path)
|
||||
# begin execution with the above parameters.
|
||||
execute_command(launcher, built_command, path)
|
||||
|
||||
##########################################
|
||||
# Text Input
|
||||
##########################################
|
||||
|
||||
def yes_or_no(question):
|
||||
"""Asks a yes or no question, and captures input. Blank input is interpreted as Y."""
|
||||
reply = str(input(question+' (Y/n): ')).capitalize().strip()
|
||||
|
||||
if reply == "": #pylint: disable=no-else-return
|
||||
return True
|
||||
elif reply[0] == 'Y':
|
||||
return True
|
||||
elif reply[0] == 'N':
|
||||
return False
|
||||
|
||||
return yes_or_no("Please enter Y or N.")
|
||||
|
||||
|
||||
def input_string(message):
|
||||
"""Asks a question and captures the string output."""
|
||||
return str(input(message + ': ')).strip()
|
||||
|
||||
|
||||
def parse_number_input(user_input):
|
||||
"""Converts a string of space-separated numbers to an array of numbers."""
|
||||
lst_str = user_input.strip().split(' ')
|
||||
return list(map(int, lst_str))
|
||||
|
||||
|
||||
##########################################
|
||||
# Commands
|
||||
##########################################
|
||||
|
||||
class ManualExecutorException(Exception):
|
||||
"""Custom Exception that we trigger triggered when we encounter manual executors."""
|
||||
pass
|
||||
|
||||
def convert_launcher(launcher):
|
||||
"""Takes the YAML launcher, and outputs an appropriate executable
|
||||
to run the command."""
|
||||
|
||||
plat = get_platform()
|
||||
|
||||
# Regular command prompt.
|
||||
if launcher == "command_prompt": #pylint: disable=no-else-return
|
||||
if plat == "windows": #pylint: disable=no-else-return
|
||||
# This is actually a 64bit CMD.EXE. Do not change this to a 32bits CMD.EXE
|
||||
return "C:\\Windows\\System32\\cmd.exe"
|
||||
|
||||
elif plat == "linux":
|
||||
# Good ol' Bourne Shell.
|
||||
return "/bin/sh"
|
||||
|
||||
elif plat == "macos":
|
||||
# I assume /bin/sh is available on OSX.
|
||||
return "/bin/sh"
|
||||
|
||||
else:
|
||||
# We hit a non-Linux, non-Windows OS. Use sh.
|
||||
print("Warning: Unsupported platform {}! Using /bin/sh.".format(plat))
|
||||
return "/bin/sh"
|
||||
|
||||
elif launcher == "powershell":
|
||||
return "C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\powershell.exe"
|
||||
|
||||
elif launcher == "sh":
|
||||
return "/bin/sh"
|
||||
|
||||
elif launcher == "bash":
|
||||
return "/bin/bash"
|
||||
|
||||
elif launcher == "manual":
|
||||
# We cannot process manual execution with this script. Raise an exception.
|
||||
raise ManualExecutorException()
|
||||
|
||||
else:
|
||||
# This launcher is not known. Returning it directly.
|
||||
print("Warning: Launcher '{}' has no specific case! Invoking as is.".format(launcher))
|
||||
return launcher
|
||||
|
||||
|
||||
def build_command(launcher, command, parameters, path): #pylint: disable=unused-argument
|
||||
"""Builds the command line that will eventually be run."""
|
||||
|
||||
# Using a closure! We use the replace to match found objects
|
||||
# and replace them with the corresponding passed parameter.
|
||||
def replacer(matchobj):
|
||||
if matchobj.group(1) in parameters:
|
||||
val = parameters[matchobj.group(1)]
|
||||
else:
|
||||
print("Warning: no match found while building the replacement string.")
|
||||
val = None
|
||||
|
||||
return val
|
||||
|
||||
# Fix string interpolation (from ruby to Python!) -- ${}
|
||||
command = re.sub(r"\$\{(.+?)\}", replacer, command)
|
||||
|
||||
# Fix string interpolation (from ruby to Python!) -- #{}
|
||||
command = re.sub(r"\#\{(.+?)\}", replacer, command)
|
||||
|
||||
# Replace instances of PathToAtomicsFolder
|
||||
atomics = os.path.join(path, "..")
|
||||
command = command.replace("$PathToAtomicsFolder", atomics)
|
||||
command = command.replace("PathToAtomicsFolder", atomics)
|
||||
|
||||
return command
|
||||
|
||||
|
||||
def execute_subprocess(launcher, command, cwd):
|
||||
p = subprocess.Popen(launcher, shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, env=os.environ, cwd=cwd)
|
||||
try:
|
||||
|
||||
outs, errs = p.communicate(bytes(command, "utf-8") + b"\n", timeout=COMMAND_TIMEOUT)
|
||||
return outs, errs
|
||||
except subprocess.TimeoutExpired as e:
|
||||
|
||||
# Display output if it exists.
|
||||
if e.output:
|
||||
print(e.output)
|
||||
if e.stdout:
|
||||
print(e.stdout)
|
||||
if e.stderr:
|
||||
print(e.stderr)
|
||||
print("Command timed out!")
|
||||
|
||||
# Kill the process.
|
||||
p.kill()
|
||||
return "", ""
|
||||
|
||||
|
||||
def print_process_output(outs, errs):
|
||||
def clean_output(s):
|
||||
# Remove Windows CLI garbage
|
||||
s = re.sub(r"Microsoft\ Windows\ \[version .+\]\r?\nCopyright.*(\r?\n)+[A-Z]\:.+?\>", "", s)
|
||||
return re.sub(r"(\r?\n)*[A-Z]\:.+?\>", "", s)
|
||||
|
||||
# Output the appropriate outputs if they exist.
|
||||
if outs:
|
||||
print("Output: {}".format(clean_output(outs.decode("utf-8", "ignore"))), flush=True)
|
||||
else:
|
||||
print("(No output)")
|
||||
if errs:
|
||||
print("Errors: {}".format(clean_output(errs.decode("utf-8", "ignore"))), flush=True)
|
||||
|
||||
|
||||
def execute_command(launcher, command, cwd):
|
||||
"""Executes a command with the given launcher."""
|
||||
|
||||
print("\n------------------------------------------------")
|
||||
|
||||
# If launcher is powershell we execute all commands under a single process
|
||||
# powershell.exe -Command - (Tell powershell to read scripts from stdin)
|
||||
if "powershell" in launcher:
|
||||
outs, errs = execute_subprocess([launcher, '-Command', '-'], command, cwd)
|
||||
print_process_output(outs, errs)
|
||||
|
||||
else:
|
||||
for comm in command.split("\n"):
|
||||
|
||||
# We skip empty lines. This is due to the split just above.
|
||||
if comm == "":
|
||||
continue
|
||||
|
||||
# # We actually run the command itself.
|
||||
outs, errs = execute_subprocess(launcher, comm, cwd)
|
||||
print_process_output(outs, errs)
|
||||
|
||||
continue
|
||||
|
||||
|
||||
#########################################
|
||||
# Hash database
|
||||
#########################################
|
||||
|
||||
def load_hash_db():
|
||||
"""Loads the hash database from a file, or create the empty file if it did not already exist."""
|
||||
hash_db_path = os.path.join(get_self_path(), HASH_DB_RELATIVE_PATH)
|
||||
try:
|
||||
with open(hash_db_path, 'r') as f:
|
||||
return json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
print("Could not decode the JSON Hash DB! Please fix the syntax of the file.")
|
||||
sys.exit(3)
|
||||
except IOError:
|
||||
print("File did not exist. Created a new empty Hash DB.")
|
||||
empty_db = {}
|
||||
write_hash_db(hash_db_path, empty_db)
|
||||
return empty_db
|
||||
|
||||
|
||||
def write_hash_db(hash_db_path, db):
|
||||
"""Writes the hash DB dictionary to a file."""
|
||||
with open(hash_db_path, 'w') as f:
|
||||
json.dump(db, f, sort_keys=True, indent=4, separators=(',', ': '))
|
||||
|
||||
|
||||
def check_hash_db(hash_db_path, executor_data, technique_name, executor_position):
|
||||
"""Checks the hash DB for a hash, and verifies that it corresponds to the current executor data's
|
||||
hash. Adds the hash to the current database if it does not already exist."""
|
||||
hash_db = load_hash_db()
|
||||
executor_position = str(executor_position)
|
||||
|
||||
# Tries to load the technique section.
|
||||
if not technique_name in hash_db:
|
||||
print("Technique section '{}' did not exist. Creating.".format(technique_name))
|
||||
# Create section
|
||||
hash_db[technique_name] = {}
|
||||
|
||||
new_hash = hashlib.sha256(json.dumps(executor_data).encode()).hexdigest()
|
||||
|
||||
# Tries to load the executor hash.
|
||||
if not executor_position in hash_db[technique_name]:
|
||||
print("Hash was not in DB. Adding.")
|
||||
# Create the hash, since it does not exist. Return OK.
|
||||
hash_db[technique_name][executor_position] = new_hash
|
||||
|
||||
# Write DB to file.
|
||||
write_hash_db(hash_db_path, hash_db)
|
||||
return True
|
||||
|
||||
old_hash = hash_db[technique_name][executor_position]
|
||||
|
||||
# If a previous hash already exists, compare both hashes.
|
||||
return old_hash == new_hash
|
||||
|
||||
def clear_hash(hash_db_path, technique_to_clear, position_to_clear=-1):
|
||||
"""Clears a hash from the DB, then saves the DB to a file."""
|
||||
hash_db = load_hash_db()
|
||||
|
||||
if position_to_clear == -1:
|
||||
# We clear out the whole technique.
|
||||
del hash_db[technique_to_clear]
|
||||
else:
|
||||
# We clear the position.
|
||||
del hash_db[technique_to_clear][str(position_to_clear)]
|
||||
|
||||
print("Hash cleared.")
|
||||
|
||||
write_hash_db(hash_db_path, hash_db)
|
||||
|
||||
#########################################
|
||||
# Atomic Runner and Main
|
||||
#########################################
|
||||
|
||||
class AtomicRunner():
|
||||
"""Class that allows the execution, interactive or not, of the various techniques that are part of ART."""
|
||||
|
||||
def __init__(self):
|
||||
"""Constructor. Ensures that the techniques are loaded before we can run them."""
|
||||
# Loads techniques.
|
||||
self.techniques = load_techniques()
|
||||
|
||||
|
||||
def repl(self):
|
||||
"""Presents a REPL to the user so that they may interactively run certain techniques."""
|
||||
print("Enter the name of the technique that you would like to execute (eg. T1033). Type 'exit' to quit.")
|
||||
i = input("> ").strip()
|
||||
|
||||
while True:
|
||||
if i == "exit":
|
||||
break
|
||||
else:
|
||||
if i in self.techniques:
|
||||
self.interactive_execute(i)
|
||||
else:
|
||||
print("Technique '{}' does not exist.".format(i))
|
||||
|
||||
i = input("> ").strip()
|
||||
|
||||
|
||||
def execute(self, technique_name, position=0, parameters=None, dependencies=False, cleanup=False):
|
||||
"""Runs a technique non-interactively."""
|
||||
|
||||
parameters = parameters or {}
|
||||
|
||||
if technique_name not in self.techniques:
|
||||
print("No technique {} found. Skipping...".format(technique_name))
|
||||
return False
|
||||
|
||||
# Gets the tech.
|
||||
tech = self.techniques[technique_name]
|
||||
|
||||
# Gets Executors.
|
||||
executors = get_valid_executors(tech)
|
||||
|
||||
if len(executors) - 1 < position:
|
||||
print("The position '{}' couldn't be found.".format(position))
|
||||
print("The technique {} has {} available tests for the current platform. Skipping...".format(technique_name,len(executors)))
|
||||
return False
|
||||
|
||||
print("================================================")
|
||||
if dependencies:
|
||||
print("Checking dependencies {}/{}\n".format(technique_name, position))
|
||||
if not check_dependencies(executors[position], tech["path"]):
|
||||
return False
|
||||
|
||||
print("Executing {}/{}\n".format(technique_name, position))
|
||||
|
||||
|
||||
try:
|
||||
# Get executor at given position.
|
||||
executor = executors[position]
|
||||
except IndexError:
|
||||
print("Out of bounds: this executor is not part of that technique's list!")
|
||||
return False
|
||||
|
||||
# Make sure that it is compatible.
|
||||
if not is_valid_executor(executor, get_platform()):
|
||||
print("Warning: This executor is not compatible with the current platform!")
|
||||
return False
|
||||
|
||||
# Check that hash matches previous executor hash or that this is a new hash.
|
||||
if not check_hash_db(HASH_DB_RELATIVE_PATH, executor, technique_name, position):
|
||||
print("Warning: new executor fingerprint does not match the old one! Skipping this execution.")
|
||||
print("To re-enable this test, review this specific executor, test your payload, and clear out this executor's hash from the database.")
|
||||
print("Run this: python runner.py clearhash {} {}.".format(technique_name, position))
|
||||
return False
|
||||
|
||||
# Launch execution.
|
||||
try:
|
||||
apply_executor(executor, tech["path"], parameters)
|
||||
except ManualExecutorException:
|
||||
print("Cannot launch a technique with a manual executor. Aborting.")
|
||||
return False
|
||||
finally:
|
||||
if cleanup:
|
||||
print("Running cleanup commands.")
|
||||
apply_cleanup(executor, tech["path"], parameters)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def interactive_execute(self, technique_name):
|
||||
"""Interactively execute a single technique."""
|
||||
|
||||
# Gets the tech.
|
||||
tech = self.techniques[technique_name]
|
||||
|
||||
# Gets the compatible executors for this current platform.
|
||||
executors = get_valid_executors(tech)
|
||||
|
||||
# If there are none.
|
||||
if not executors:
|
||||
print("No valid executors for this platform/technique combination!")
|
||||
return
|
||||
|
||||
# Display technique info
|
||||
print("\n===========================================================")
|
||||
print("{} - {}".format(tech["display_name"], tech["attack_technique"]))
|
||||
|
||||
# Get number of executors.
|
||||
nb_executors = len(executors)
|
||||
if nb_executors > 1:
|
||||
|
||||
# Displays all executors with the index (for the number choice).
|
||||
for idx, executor in enumerate(executors):
|
||||
# Make it better!
|
||||
print("{}. ".format(idx))
|
||||
print_executor(executor)
|
||||
|
||||
# Display prompt, and get input as number list.
|
||||
while True:
|
||||
user_input = input("Please choose your executors: (space-separated list of numbers): ")
|
||||
try:
|
||||
numbers = parse_number_input(user_input)
|
||||
for i in numbers:
|
||||
# Interactively apply all chosen executors.
|
||||
interactive_apply_executor(executors[i], tech["path"], tech["attack_technique"], i)
|
||||
break
|
||||
except Exception as e: #pylint: disable=broad-except
|
||||
print("Could not parse the input. make sure this is a space-separated list of integers.")
|
||||
print(e)
|
||||
else:
|
||||
# We only have one executor in this case.
|
||||
interactive_apply_executor(executors[0], tech["path"], tech["attack_technique"], 0)
|
||||
|
||||
|
||||
def interactive(args): #pylint: disable=unused-argument
|
||||
"""Launch the runner in interactive mode."""
|
||||
runner = AtomicRunner()
|
||||
runner.repl()
|
||||
|
||||
|
||||
def run(args):
|
||||
"""Launch the runner in non-interactive mode."""
|
||||
runner = AtomicRunner()
|
||||
runner.execute(args.technique, args.position, json.loads(args.args), args.dependencies, args.cleanup, )
|
||||
|
||||
|
||||
def clear(args):
|
||||
"""Clears a stale hash from the Hash DB."""
|
||||
clear_hash(HASH_DB_RELATIVE_PATH, args.technique, args.position)
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function, called every time this script is launched rather than imported."""
|
||||
parser = argparse.ArgumentParser(description="Allows the automation of tests in the Atomic Red Team repository.")
|
||||
subparsers = parser.add_subparsers()
|
||||
|
||||
parser_int = subparsers.add_parser('interactive', help='Runs the techniques interactively.')
|
||||
parser_int.set_defaults(func=interactive)
|
||||
|
||||
parser_run = subparsers.add_parser('run', help="Ponctually runs a single technique / executor pair.")
|
||||
parser_run.add_argument('technique', type=str, help="Technique to run.")
|
||||
parser_run.add_argument('position', type=int, help="Position of the executor in technique to run.")
|
||||
parser_run.add_argument("--dependencies", action='store_true', help="Check for dependencies, in any, and fetch them if necessary.")
|
||||
parser_run.add_argument("--cleanup", action='store_true', help="Run cleanup commands, if any, after executor completed.")
|
||||
parser_run.add_argument('--args', type=str, default="{}", help="JSON string representing a dictionary of arguments (eg. '{ \"arg1\": \"val1\", \"arg2\": \"val2\" }' )")
|
||||
parser_run.set_defaults(func=run)
|
||||
|
||||
parser_clear = subparsers.add_parser('clearhash', help="Clears a hash from the database, allowing the technique to be run once again.")
|
||||
parser_clear.add_argument('technique', type=str, help="Technique to run.")
|
||||
parser_clear.add_argument('--position', '-p', type=int, default=-1, help="Position of the executor in technique to run.")
|
||||
parser_clear.set_defaults(func=clear)
|
||||
|
||||
try:
|
||||
args = parser.parse_args()
|
||||
args.func(args)
|
||||
except AttributeError:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,3 +0,0 @@
|
||||
*.yaml
|
||||
*.yml
|
||||
*.json
|
||||
@@ -1,269 +0,0 @@
|
||||
#!/usr/bin/env ruby
|
||||
#
|
||||
# USAGE: ./go-atomic.rb -t T1087 -n 'Enumerate all accounts' --input-output_file=bar
|
||||
#
|
||||
#
|
||||
# Example output:
|
||||
#
|
||||
#
|
||||
# ___ __ _ ____ __ ______
|
||||
# / | / /_____ ____ ___ (_)____ / __ \___ ____/ / /_ __/__ ____ _____ ___
|
||||
# / /| |/ __/ __ \/ __ `__ \/ / ___/ / /_/ / _ \/ __ / / / / _ \/ __ `/ __ `__ \
|
||||
# / ___ / /_/ /_/ / / / / / / / /__ / _, _/ __/ /_/ / / / / __/ /_/ / / / / / /
|
||||
# /_/ |_\__/\____/_/ /_/ /_/_/\___/ /_/ |_|\___/\__,_/ /_/ \___/\__,_/_/ /_/ /_/
|
||||
#
|
||||
# ***** EXECUTION PLAN IS *****
|
||||
# Technique T1087
|
||||
# Test Enumerate all accounts
|
||||
# Inputs output_file = bar
|
||||
# foo = bar
|
||||
#
|
||||
# * Use at your own risk :) *
|
||||
# ***** ***************** *****
|
||||
#
|
||||
# Getting Atomic Tests technique=T1087 from Github repo_org_branch=redcanaryco/master ...
|
||||
# - technique has 10 tests
|
||||
# - found test named 'Enumerate all accounts'
|
||||
#
|
||||
# Checking arguments...
|
||||
# - supplied on command line: ["output_file", "foo"]
|
||||
# - checking for argument name=output_file
|
||||
# * OK - found argument in supplied args
|
||||
# * using name=output_file value=bar
|
||||
#
|
||||
# Checking platform vs our platform (macos)...
|
||||
# - OK - our platform is supported!
|
||||
#
|
||||
# Interpolating command with input arguments...
|
||||
# - interpolating [#{output_file}] => [bar]
|
||||
#
|
||||
# Executing executor=sh command=[cat /etc/passwd > bar]
|
||||
#
|
||||
# Execution Results:
|
||||
# **************************************************
|
||||
#
|
||||
# **************************************************
|
||||
#
|
||||
#
|
||||
# EXECUTION COMPLETE
|
||||
# - Writing results to atomic-test-executor-execution-2018-06-23T04:05:06Z.yaml
|
||||
#
|
||||
require 'yaml'
|
||||
require 'rbconfig'
|
||||
require 'time'
|
||||
require 'optparse'
|
||||
require 'net/http'
|
||||
|
||||
class AtomicTestExecutor
|
||||
# executes a test and returns the recorded Execution Plan
|
||||
def execute!(technique_id:, test_name:, repo_org_branch:, input_args: {})
|
||||
puts <<-'EOF'
|
||||
___ __ _ ____ __ ______
|
||||
/ | / /_____ ____ ___ (_)____ / __ \___ ____/ / /_ __/__ ____ _____ ___
|
||||
/ /| |/ __/ __ \/ __ `__ \/ / ___/ / /_/ / _ \/ __ / / / / _ \/ __ `/ __ `__ \
|
||||
/ ___ / /_/ /_/ / / / / / / / /__ / _, _/ __/ /_/ / / / / __/ /_/ / / / / / /
|
||||
/_/ |_\__/\____/_/ /_/ /_/_/\___/ /_/ |_|\___/\__,_/ /_/ \___/\__,_/_/ /_/ /_/
|
||||
|
||||
EOF
|
||||
|
||||
puts "***** EXECUTION PLAN IS *****"
|
||||
puts " Technique #{technique_id}"
|
||||
puts " Test #{test_name}"
|
||||
puts " Inputs #{input_args.collect {|name, val| "#{name} = #{val}\n "}.join}"
|
||||
puts " * Use at your own risk :) *"
|
||||
puts "***** ***************** *****"
|
||||
|
||||
# find the test
|
||||
test = get_test technique_id: technique_id, test_name: test_name, repo_org_branch: repo_org_branch
|
||||
|
||||
# check our args to make sure we have them all, and get defaults if so
|
||||
input_args = check_args_and_get_defaults atomic_test: test, input_args: input_args
|
||||
|
||||
# check if we're on the right platform for the test
|
||||
check_platform atomic_test: test
|
||||
|
||||
raise "Test has no executor" unless test.has_key? 'executor'
|
||||
test_executor_name = test.fetch('executor').fetch('name')
|
||||
supported_executors = ['command_prompt', 'sh', 'bash', 'powershell']
|
||||
raise "Executor #{test_executor_name} is not supported" unless supported_executors.include? test_executor_name
|
||||
|
||||
# interpolate our input args into the test's command
|
||||
command_to_exec = interpolate_with_args interpolatee: test.fetch('executor').fetch('command').strip,
|
||||
input_args: input_args
|
||||
|
||||
# run the command and get the results
|
||||
executor_results = case test_executor_name
|
||||
when 'command_prompt'
|
||||
execute_command_prompt!(atomic_test: test, command: command_to_exec)
|
||||
when 'sh'
|
||||
execute_sh!(atomic_test: test, command: command_to_exec)
|
||||
when 'bash'
|
||||
execute_bash!(atomic_test: test, command: command_to_exec)
|
||||
when 'powershell'
|
||||
execute_powershell!(atomic_test: test, command: command_to_exec)
|
||||
end
|
||||
|
||||
puts
|
||||
puts "Execution Results:\n#{'*' * 50}\n#{executor_results}\n#{'*' * 50}"
|
||||
|
||||
# mix the results into the Atomic Test so we have an "execution plan"
|
||||
test.fetch('input_arguments', []).each do |arg, options|
|
||||
options['executed_value'] = input_args[arg['name']]
|
||||
end
|
||||
test.fetch('executor')['executed_command'] = {
|
||||
'command' => command_to_exec,
|
||||
'results' => executor_results
|
||||
}
|
||||
|
||||
# return the execution plan
|
||||
test
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def get_test(technique_id:, test_name:, repo_org_branch:)
|
||||
repo_org, branch = repo_org_branch.split('/', 2)
|
||||
raise "REPO/BRANCH must be in format <repo>/<branch>" unless (repo_org && branch)
|
||||
|
||||
technique_id.upcase!
|
||||
technique_id = "T#{technique_id}" unless technique_id.start_with? 'T'
|
||||
|
||||
puts "\nGetting Atomic Tests technique=#{technique_id} from Github repo_org_branch=#{repo_org_branch} ..."
|
||||
url = "https://raw.githubusercontent.com/#{repo_org}/atomic-red-team/#{branch}/atomics/#{technique_id}/#{technique_id}.yaml"
|
||||
atomic_yaml = YAML.safe_load Net::HTTP.get(URI(url))
|
||||
|
||||
puts " - technique has #{atomic_yaml['atomic_tests'].count} tests"
|
||||
test = atomic_yaml['atomic_tests'].find do |test|
|
||||
test['name'] == test_name
|
||||
end
|
||||
raise "Could not find test #{technique_id}/[#{test_name}]" unless test
|
||||
puts " - found test named '#{test_name}'"
|
||||
test
|
||||
end
|
||||
|
||||
def check_args_and_get_defaults(atomic_test:, input_args:)
|
||||
puts "\nChecking arguments..."
|
||||
puts " - supplied on command line: #{input_args.keys}"
|
||||
updated_args = {}
|
||||
atomic_test.fetch('input_arguments', []).each do |arg_name, arg_options|
|
||||
puts " - checking for argument name=#{arg_name}"
|
||||
arg_value = input_args[arg_name]
|
||||
if arg_value
|
||||
puts " * OK - found argument in supplied args"
|
||||
else
|
||||
puts " * XX not found, trying default arg"
|
||||
arg_value = arg_options['default']
|
||||
if arg_value
|
||||
puts " * OK - found argument in defaults"
|
||||
else
|
||||
raise "Argument [#{arg}] is required but not set and has no default" unless arg_value
|
||||
end
|
||||
end
|
||||
|
||||
updated_args[arg_name] = arg_value
|
||||
puts " * using name=#{arg_name} value=#{arg_value}"
|
||||
end
|
||||
updated_args
|
||||
end
|
||||
|
||||
# checks our platform vs test supported platforms, raise exception if not
|
||||
def check_platform(atomic_test:)
|
||||
our_platform = case RbConfig::CONFIG['host_os']
|
||||
when /mswin|msys|mingw|cygwin|bccwin|wince|emc/
|
||||
'windows'
|
||||
when /darwin|mac os/
|
||||
'macos'
|
||||
when /linux|solaris|bsd/
|
||||
'linux'
|
||||
end
|
||||
|
||||
puts "\nChecking platform vs our platform (#{our_platform})..."
|
||||
test_supported_platforms = atomic_test['supported_platforms']
|
||||
|
||||
if !test_supported_platforms.include? our_platform
|
||||
raise "Unable to run test that supports platforms #{test_supported_platforms} because we are on #{our_platform}"
|
||||
end
|
||||
puts " - OK - our platform is supported!"
|
||||
end
|
||||
|
||||
def interpolate_with_args(interpolatee:, input_args:)
|
||||
puts "\nInterpolating command with input arguments..."
|
||||
interpolated = interpolatee
|
||||
input_args.each do |name, value|
|
||||
puts " - interpolating [\#{#{name}}] => [#{value}]"
|
||||
interpolated = interpolated.gsub("\#{#{name}}", value)
|
||||
end
|
||||
interpolated
|
||||
end
|
||||
|
||||
def execute_command_prompt!(atomic_test:, command:)
|
||||
puts "\nExecuting executor=cmd command=[#{command}]"
|
||||
command_results = `cmd.exe /c #{command}`
|
||||
end
|
||||
|
||||
def execute_sh!(atomic_test:, command:)
|
||||
puts "\nExecuting executor=sh command=[#{command}]"
|
||||
command_results = `sh -c "#{command}"`
|
||||
end
|
||||
|
||||
def execute_bash!(atomic_test:, command:)
|
||||
puts "\nExecuting executor=bash command=[#{command}]"
|
||||
command_results = `bash -c #{command}`
|
||||
end
|
||||
|
||||
def execute_powershell!(atomic_test:, command:)
|
||||
puts "\nExecuting executor=powershell command=[#{command}]"
|
||||
command_results = `powershell -iex #{command}`
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
cli_args = []
|
||||
input_args = {}
|
||||
ARGV.each do |arg|
|
||||
if arg.start_with? '--input-'
|
||||
name = arg.split('=', 2).first.gsub(/--input-/, '')
|
||||
value = arg.split('=', 2).last
|
||||
input_args[name] = value
|
||||
else
|
||||
cli_args << arg
|
||||
end
|
||||
end
|
||||
|
||||
options = {
|
||||
repo: 'redcanaryco/master'
|
||||
}
|
||||
parser = OptionParser.new do |opts|
|
||||
opts.banner = "Usage: ./go-atomic.rb -t T1087 -n 'Enumerate all accounts' --input-output_file=bar"
|
||||
|
||||
opts.on('-tTECHNIQUE_ID', '--techniqueTECHNIQUE_ID', 'Technique identifier') do |opt|
|
||||
options[:technique_id] = opt
|
||||
end
|
||||
|
||||
opts.on('-nTEST_NAME', '--nameTEST_NAME', 'Test name') do |opt|
|
||||
options[:test_name] = opt
|
||||
end
|
||||
|
||||
opts.on('-rREPO', '--repoREPO', 'Atomic Red Team repo/branch name (ie, redcanaryco/master)') do |opt|
|
||||
options[:repo] = opt
|
||||
end
|
||||
end
|
||||
parser.parse! cli_args
|
||||
|
||||
begin
|
||||
execution_plan = AtomicTestExecutor.new.execute! technique_id: options[:technique_id],
|
||||
test_name: options[:test_name],
|
||||
repo_org_branch: options[:repo],
|
||||
input_args: input_args
|
||||
|
||||
formatted_time = Time.now.utc.iso8601.gsub(/:/, '.')
|
||||
output_filename = "atomic-test-executor-execution-#{formatted_time}.yaml"
|
||||
puts "\n\nEXECUTION COMPLETE"
|
||||
puts " - Writing results to #{output_filename}"
|
||||
File.write(output_filename, YAML.dump(execution_plan))
|
||||
|
||||
rescue => ex
|
||||
puts "\n\nFATAL ERROR: #{ex.message}"
|
||||
puts ex.backtrace.join("\n")
|
||||
exit 1
|
||||
end
|
||||
Reference in New Issue
Block a user