[FR] Updates to KQL Lib Parsing and Install (#3605)

* Bump Version

* updated

* Bump patch version

* Optimization should only occur on single values

* Wildcard semantically equivalent to query_string*

* Add unit test for optimization

* Move code-checks to yml

* Add tests path to code-checks

* Add lib path for code-checks

* Install deps from local

* Update DSL optimization unit test

---------

Co-authored-by: Terrance DeJesus <99630311+terrancedejesus@users.noreply.github.com>
This commit is contained in:
Eric Forte
2025-07-10 15:03:08 -04:00
committed by GitHub
parent b70792082a
commit 03f977246f
8 changed files with 37 additions and 7 deletions
@@ -8,6 +8,8 @@ on:
paths:
- 'detection_rules/**/*.py'
- 'hunting/**/*.py'
- 'tests/**/*.py'
- 'lib/**/*.py'
jobs:
code-checks:
+2
View File
@@ -30,6 +30,8 @@ jobs:
python -m pip install --upgrade pip
pip cache purge
pip install .[dev]
pip install lib/kibana
pip install lib/kql
- name: Unit tests
env:
+1 -1
View File
@@ -13,7 +13,7 @@ from .evaluator import FilterGenerator
from .kql2eql import KqlToEQL
from .parser import lark_parse, KqlParser
__version__ = '0.1.7'
__version__ = '0.1.8'
__all__ = (
"ast",
"from_eql",
+11 -3
View File
@@ -47,9 +47,18 @@ def boolean(**kwargs):
elif boolean_type == "must_not" and len(children) == 1:
# must_not: [{bool: {must: x}}] -> {must_not: x}
# optimize can only occur with one term
# e.g. the following would not be valid
# must_not: [{bool: {must: x} and {bool: {must: y} }] -> {must_not: x} {must_not: y}
child = children[0]
if list(child) == ["bool"] and list(child["bool"]) in (["filter"], ["must"]):
negated, = child["bool"].values()
is_bool = list(child) == ["bool"]
bool_keys = list(child.get("bool", {}))
has_valid_keys = bool_keys in (["filter"], ["must"])
has_single_filter = len(child.get("bool", {}).get("filter", [])) == 1
has_single_must = len(child.get("bool", {}).get("must", [])) == 1
if is_bool and has_valid_keys and (has_single_filter or has_single_must):
(negated,) = child["bool"].values()
dsl = {"must_not": negated}
else:
dsl = {"must_not": children}
@@ -65,7 +74,6 @@ def boolean(**kwargs):
class ToDsl(Walker):
def _walk_default(self, node, *args, **kwargs):
raise KqlCompileError("Unable to convert {}".format(node))
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "detection-rules-kql"
version = "0.1.7"
version = "0.1.8"
description = "Kibana Query Language parser for Elastic Detection Rules"
license = {text = "Elastic License v2"}
keywords = ["Elastic", "sour", "Detection Rules", "Security", "Elasticsearch", "kql"]
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.3.6"
version = "1.3.7"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Securitys Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
+1 -1
View File
@@ -76,7 +76,7 @@ class TestKQLtoDSL(unittest.TestCase):
self.validate(
"not (field:value and field2:value2)",
{"must_not": [{"match": {"field": "value"}}, {"match": {"field2": "value2"}}]},
{"must_not": [{"bool": {"filter": [{"match": {"field": "value"}}, {"match": {"field2": "value2"}}]}}]},
)
def test_optimizations(self):
+18
View File
@@ -85,3 +85,21 @@ class ParserTests(unittest.TestCase):
with self.assertRaises(kql.KqlParseError):
kql.parse("@time > 5", schema=schema)
def test_optimization(self):
query = 'host.name: test-* and not (destination.ip : "127.0.0.53" and destination.ip : "169.254.169.254")'
dsl_str = str(kql.to_dsl(query))
bad_case = (
"{'bool': {'filter': [{'query_string': {'fields': ['host.name'], 'query': 'test-*'}}], "
"'must_not': [{'match': {'destination.ip': '127.0.0.53'}}, "
"{'match': {'destination.ip': '169.254.169.254'}}]}}"
)
self.assertNotEqual(dsl_str, bad_case, "DSL string matches the bad case, optimization failed.")
good_case = (
"{'bool': {'filter': [{'query_string': {'fields': ['host.name'], 'query': 'test-*'}}], "
"'must_not': [{'bool': {'filter': [{'match': {'destination.ip': '127.0.0.53'}}, "
"{'match': {'destination.ip': '169.254.169.254'}}]}}]}}"
)
self.assertEqual(dsl_str, good_case, "DSL string does not match the good case, optimization failed.")