Python runner checks dependencies and run cleanup (#1011)
Co-authored-by: hypnoticpattern <> Co-authored-by: Carrie Roberts <clr2of8@gmail.com>
This commit is contained in:
@@ -76,6 +76,8 @@ You will be dropped in a REPL that will allow you to interact with the system.
|
||||
|
||||
|
||||
Do you want to run this? (Y/n): y
|
||||
Do you want to check dependencies? (Y/n): y
|
||||
Do you want to run the cleanup after the executor completes? (Y/n): y
|
||||
Please provide a parameter for 'computer_name' (blank for default): MY_COMPUTER
|
||||
|
||||
------------------------------------------------
|
||||
@@ -106,7 +108,14 @@ To use in a Python script, simply import the script as a package, create an `Ato
|
||||
# This is a dictionary of the arguments to pass to the executor. Both the key and value are
|
||||
# to be passed as strings. You may also launch an interactive execution, and the script will
|
||||
# tell you what needs to be entered to invoke this test.
|
||||
techniques.execute("T1033", position=0, parameters={"computer_name": "DA2CTC"})
|
||||
# Forth parameter (True): Check dependencies before running the executor.
|
||||
# This is a boolean value that indicates if the runner has to check if the dependencies are met.
|
||||
# If there are missing dependencies the runner tries to fetch them if the `get_prereq_command` section
|
||||
# in the yaml file is specified.
|
||||
# Fifth parameter (True): Cleanup after the executor completed.
|
||||
# This is a boolean value that indicates if the runner has execute the cleanup section after the executor completes.
|
||||
# The cleanup commands are specified in the `cleanup_command` section within the executor
|
||||
techniques.execute("T1033", position=0, parameters={"computer_name": "DA2CTC"}, True, True)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -114,10 +123,10 @@ To use in a Python script, simply import the script as a package, create an `Ato
|
||||
In Shell Scripts
|
||||
----------------
|
||||
|
||||
To use in a shell script, a CLI interface has been produced. YOu may use it the following way:
|
||||
To use in a shell script, a CLI interface has been produced. You may use it the following way:
|
||||
|
||||
```
|
||||
python runner.py run T1033 0 --args '{"computer_name": "DA2CTC"}'
|
||||
python runner.py run T1033 0 --args '{"computer_name": "DA2CTC"}' --dependencies --cleanup
|
||||
```
|
||||
|
||||
If you're unsure of how to use this, you may also launch an interactive execution, and just before actually launching the command, the system will report the line that needs to be added to your shell script in order to run it as desired.
|
||||
|
||||
@@ -70,7 +70,7 @@ def load_technique(path_to_dir):
|
||||
|
||||
# Load and parses its content.
|
||||
with open(file_entry, 'r', encoding="utf-8") as f:
|
||||
return yaml.load(unidecode.unidecode(f.read()))
|
||||
return yaml.load(unidecode.unidecode(f.read()), Loader=yaml.SafeLoader)
|
||||
|
||||
|
||||
def load_techniques():
|
||||
@@ -85,13 +85,14 @@ def load_techniques():
|
||||
|
||||
# Create a dict to accept the techniques that will be loaded.
|
||||
techniques = {}
|
||||
print("Loading Technique", end="")
|
||||
|
||||
# For each tech directory in the main directory.
|
||||
for atomic_entry in os.listdir(normalized_atomics_path):
|
||||
|
||||
# Make sure that it matches the current pattern.
|
||||
if fnmatch.fnmatch(atomic_entry, TECHNIQUE_DIRECTORY_PATTERN):
|
||||
print("Loading Technique {}...".format(atomic_entry))
|
||||
print(", {}".format(atomic_entry), end="")
|
||||
|
||||
# Get path to tech dir.
|
||||
path_to_dir = os.path.join(normalized_atomics_path, atomic_entry)
|
||||
@@ -102,9 +103,47 @@ def load_techniques():
|
||||
|
||||
# Add path to technique's directory.
|
||||
techniques[atomic_entry]["path"] = path_to_dir
|
||||
|
||||
print(".")
|
||||
return techniques
|
||||
|
||||
def check_dependencies(executor, cwd):
|
||||
dependencies = "dependencies"
|
||||
dependencies_executor = "dependency_executor_name"
|
||||
prereq_command = "prereq_command"
|
||||
get_prereq_command = "get_prereq_command"
|
||||
input_arguments = "input_arguments"
|
||||
|
||||
# If the executor doesn't have dependencies_executor key it doesn't have dependencies. Skip
|
||||
if dependencies not in executor or dependencies not in executor:
|
||||
print("No '{}' or '{}' section found in the yaml file. Skipping dependencies check.".format(dependencies_executor,dependencies))
|
||||
return True
|
||||
|
||||
launcher = executor[dependencies_executor]
|
||||
|
||||
for dep in executor[dependencies]:
|
||||
args = executor[input_arguments] if input_arguments in executor else {}
|
||||
final_parameters = set_parameters(args, {})
|
||||
command = build_command(launcher, dep[prereq_command], final_parameters, cwd)
|
||||
|
||||
p = subprocess.Popen(launcher, shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, env=os.environ, cwd=cwd)
|
||||
|
||||
p.communicate(bytes(command, "utf-8") + b"\n", timeout=COMMAND_TIMEOUT)
|
||||
# If the dependencies are not satisfied the command will exit with code 1, 0 otherwise.
|
||||
if p.returncode != 0:
|
||||
print("Dependencies not found. Fetching them...")
|
||||
if get_prereq_command not in dep:
|
||||
print("Missing {} commands in the yaml file. Can't fetch requirements".format(get_prereq_command))
|
||||
return False
|
||||
command = build_command(launcher, dep[get_prereq_command], final_parameters, cwd)
|
||||
d = subprocess.Popen(launcher, shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, env=os.environ, cwd=cwd)
|
||||
out, err = d.communicate(bytes(command, "utf-8") + b"\n", timeout=COMMAND_TIMEOUT)
|
||||
p.terminate()
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
##########################################
|
||||
# Executors
|
||||
@@ -175,14 +214,23 @@ def executor_get_input_arguments(input_arguments):
|
||||
return parameters
|
||||
|
||||
|
||||
def print_non_interactive_command_line(technique_name, executor_number, parameters):
|
||||
def print_non_interactive_command_line(technique_name, executor_number, parameters, check_dep, run_cleanup):
|
||||
"""Prints the comand line to use in order to launch the technique non-interactively."""
|
||||
flag_dep = ""
|
||||
flag_cleanup = ""
|
||||
|
||||
if check_dep:
|
||||
flag_dep = "--dependencies"
|
||||
|
||||
if run_cleanup:
|
||||
flag_cleanup = "--cleanup"
|
||||
|
||||
print("In order to run this non-interactively:")
|
||||
print(" Python:")
|
||||
print(" techniques = runner.AtomicRunner()")
|
||||
print(" techniques.execute(\"{name}\", position={pos}, parameters={params})".format(name=technique_name, pos=executor_number, params=parameters))
|
||||
print(" techniques.execute(\"{name}\", position={pos}, parameters={params}, dependencies={dep}, cleanup={cleanup})".format(name=technique_name, pos=executor_number, params=parameters, dep=check_dep, cleanup=run_cleanup))
|
||||
print(" Shell Script:")
|
||||
print(" python3 runner.py run {name} {pos} --args '{params}' \n".format(name=technique_name, pos=executor_number, params=json.dumps(parameters)))
|
||||
print(" python3 runner.py run {name} {pos} --args '{params}' {dep} {cleanup}\n".format(name=technique_name, pos=executor_number, params=json.dumps(parameters), dep=flag_dep, cleanup=flag_cleanup))
|
||||
|
||||
|
||||
def interactive_apply_executor(executor, path, technique_name, executor_number):
|
||||
@@ -196,22 +244,35 @@ def interactive_apply_executor(executor, path, technique_name, executor_number):
|
||||
print("Cancelled.")
|
||||
return
|
||||
|
||||
# Request if we want to check the dependencies before running the executor.
|
||||
check_dep = yes_or_no("Do you want to check dependencies? ")
|
||||
|
||||
# Request if we want to cleanup after the executor completes.
|
||||
run_cleanup = yes_or_no("Do you want to run the cleanup after the executor completes? ")
|
||||
|
||||
# If so, get the input parameters.
|
||||
if "input_arguments" in executor:
|
||||
parameters = executor_get_input_arguments(executor["input_arguments"])
|
||||
else:
|
||||
parameters = {}
|
||||
|
||||
# Prints the Command line to enter for non-interactive execution.
|
||||
print_non_interactive_command_line(technique_name, executor_number, parameters)
|
||||
if check_dep:
|
||||
if not check_dependencies(executor, path):
|
||||
print("Check dependencies failed. Cancelling...")
|
||||
return
|
||||
|
||||
# Prints the Command line to enter for non-interactive execution.
|
||||
print_non_interactive_command_line(technique_name, executor_number, parameters, check_dep, run_cleanup)
|
||||
launcher = convert_launcher(executor["executor"]["name"])
|
||||
command = executor["executor"]["command"]
|
||||
built_command = build_command(launcher, command, parameters)
|
||||
built_command = build_command(launcher, command, parameters, path)
|
||||
|
||||
# begin execution with the above parameters.
|
||||
execute_command(launcher, built_command, path)
|
||||
|
||||
if run_cleanup:
|
||||
apply_cleanup(executor, path, parameters)
|
||||
|
||||
|
||||
def get_default_parameters(args):
|
||||
"""Build a default parameters dictionary from the content of the YAML file."""
|
||||
@@ -243,11 +304,23 @@ def apply_executor(executor, path, parameters):
|
||||
|
||||
launcher = convert_launcher(executor["executor"]["name"])
|
||||
command = executor["executor"]["command"]
|
||||
built_command = build_command(launcher, command, final_parameters)
|
||||
built_command = build_command(launcher, command, final_parameters, path)
|
||||
|
||||
# begin execution with the above parameters.
|
||||
execute_command(launcher, built_command, path)
|
||||
|
||||
def apply_cleanup(executor, path, parameters):
|
||||
if "cleanup_command" not in executor["executor"] or executor["executor"]["cleanup_command"] == None:
|
||||
print("No cleanup section found in the yaml file. Skipping...")
|
||||
return
|
||||
|
||||
args = executor["input_arguments"] if "input_arguments" in executor else {}
|
||||
final_parameters = set_parameters(args, parameters)
|
||||
launcher = convert_launcher(executor["executor"]["name"])
|
||||
command = executor["executor"]["cleanup_command"]
|
||||
built_command = build_command(launcher, command, final_parameters, path)
|
||||
# begin execution with the above parameters.
|
||||
execute_command(launcher, built_command, path)
|
||||
|
||||
##########################################
|
||||
# Text Input
|
||||
@@ -316,6 +389,9 @@ def convert_launcher(launcher):
|
||||
|
||||
elif launcher == "sh":
|
||||
return "/bin/sh"
|
||||
|
||||
elif launcher == "bash":
|
||||
return "/bin/bash"
|
||||
|
||||
elif launcher == "manual":
|
||||
# We cannot process manual execution with this script. Raise an exception.
|
||||
@@ -323,11 +399,11 @@ def convert_launcher(launcher):
|
||||
|
||||
else:
|
||||
# This launcher is not known. Returning it directly.
|
||||
print("Warning: Launcher '{}' has no specific case! Returning as is.")
|
||||
print("Warning: Launcher '{}' has no specific case! Invoking as is.".format(launcher))
|
||||
return launcher
|
||||
|
||||
|
||||
def build_command(launcher, command, parameters): #pylint: disable=unused-argument
|
||||
def build_command(launcher, command, parameters, path): #pylint: disable=unused-argument
|
||||
"""Builds the command line that will eventually be run."""
|
||||
|
||||
# Using a closure! We use the replace to match found objects
|
||||
@@ -347,6 +423,11 @@ def build_command(launcher, command, parameters): #pylint: disable=unused-argume
|
||||
# Fix string interpolation (from ruby to Python!) -- #{}
|
||||
command = re.sub(r"\#\{(.+?)\}", replacer, command)
|
||||
|
||||
# Replace instances of PathToAtomicsFolder
|
||||
atomics = os.path.join(path, "..")
|
||||
command = command.replace("$PathToAtomicsFolder", atomics)
|
||||
command = command.replace("PathToAtomicsFolder", atomics)
|
||||
|
||||
return command
|
||||
|
||||
|
||||
@@ -393,11 +474,6 @@ def execute_command(launcher, command, cwd):
|
||||
|
||||
print("\n------------------------------------------------")
|
||||
|
||||
# Replace instances of PathToAtomicsFolder
|
||||
atomics = os.path.join(cwd, "..")
|
||||
command = command.replace("$PathToAtomicsFolder", atomics)
|
||||
command = command.replace("PathToAtomicsFolder", atomics)
|
||||
|
||||
# If launcher is powershell we execute all commands under a single process
|
||||
# powershell.exe -Command - (Tell powershell to read scripts from stdin)
|
||||
if "powershell" in launcher:
|
||||
@@ -518,20 +594,35 @@ class AtomicRunner():
|
||||
i = input("> ").strip()
|
||||
|
||||
|
||||
def execute(self, technique_name, position=0, parameters=None):
|
||||
def execute(self, technique_name, position=0, parameters=None, dependencies=False, cleanup=False):
|
||||
"""Runs a technique non-interactively."""
|
||||
|
||||
parameters = parameters or {}
|
||||
|
||||
print("================================================")
|
||||
print("Executing {}/{}\n".format(technique_name, position))
|
||||
|
||||
if technique_name not in self.techniques:
|
||||
print("No technique {} found. Skipping...".format(technique_name))
|
||||
return False
|
||||
|
||||
# Gets the tech.
|
||||
tech = self.techniques[technique_name]
|
||||
|
||||
# Gets Executors.
|
||||
executors = get_valid_executors(tech)
|
||||
|
||||
if len(executors) < position:
|
||||
print("The position '{}' couldn't be found.".format(position))
|
||||
print("The teqhnique {} has {} available tests for the current platform. Skipping...".format(technique_name,len(executors)))
|
||||
return False
|
||||
|
||||
print("================================================")
|
||||
if dependencies:
|
||||
print("Checking dependencies {}/{}\n".format(technique_name, position))
|
||||
if not check_dependencies(executors[position], tech["path"]):
|
||||
return False
|
||||
|
||||
print("Executing {}/{}\n".format(technique_name, position))
|
||||
|
||||
|
||||
try:
|
||||
# Get executor at given position.
|
||||
executor = executors[position]
|
||||
@@ -557,6 +648,10 @@ class AtomicRunner():
|
||||
except ManualExecutorException:
|
||||
print("Cannot launch a technique with a manual executor. Aborting.")
|
||||
return False
|
||||
finally:
|
||||
if cleanup:
|
||||
print("Running cleanup commands.")
|
||||
apply_cleanup(executor, tech["path"], parameters)
|
||||
|
||||
return True
|
||||
|
||||
@@ -615,7 +710,7 @@ def interactive(args): #pylint: disable=unused-argument
|
||||
def run(args):
|
||||
"""Launch the runner in non-interactive mode."""
|
||||
runner = AtomicRunner()
|
||||
runner.execute(args.technique, args.position, json.loads(args.args))
|
||||
runner.execute(args.technique, args.position, json.loads(args.args), args.dependencies, args.cleanup, )
|
||||
|
||||
|
||||
def clear(args):
|
||||
@@ -634,6 +729,8 @@ def main():
|
||||
parser_run = subparsers.add_parser('run', help="Ponctually runs a single technique / executor pair.")
|
||||
parser_run.add_argument('technique', type=str, help="Technique to run.")
|
||||
parser_run.add_argument('position', type=int, help="Position of the executor in technique to run.")
|
||||
parser_run.add_argument("--dependencies", action='store_true', help="Check for dependencies, in any, and fetch them if necessary.")
|
||||
parser_run.add_argument("--cleanup", action='store_true', help="Run cleanup commands, if any, after executor completed.")
|
||||
parser_run.add_argument('--args', type=str, default="{}", help="JSON string representing a dictionary of arguments (eg. '{ \"arg1\": \"val1\", \"arg2\": \"val2\" }' )")
|
||||
parser_run.set_defaults(func=run)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user