diff --git a/lib/kibana/pyproject.toml b/lib/kibana/pyproject.toml index a9f01f974..38066719a 100644 --- a/lib/kibana/pyproject.toml +++ b/lib/kibana/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection-rules-kibana" -version = "0.4.4" +version = "0.4.5" description = "Kibana API utilities for Elastic Detection Rules" license = {text = "Elastic License v2"} keywords = ["Elastic", "Kibana", "Detection Rules", "Security", "Elasticsearch"] diff --git a/lib/kql/kql/kql.g b/lib/kql/kql/kql.g index 8f2cec115..acc8ac4dc 100644 --- a/lib/kql/kql/kql.g +++ b/lib/kql/kql/kql.g @@ -15,7 +15,9 @@ field_value_expression: field ":" list_of_values ?value_expression: value ?list_of_values: "(" or_list_of_values ")" - | value + | optional_not value +?optional_not: NOT optional_not + | ?or_list_of_values: and_list_of_values (OR and_list_of_values)* ?and_list_of_values: not_list_of_values (AND not_list_of_values)* ?not_list_of_values: NOT? list_of_values @@ -23,6 +25,7 @@ field_value_expression: field ":" list_of_values field: literal value: QUOTED_STRING + | WILDCARD_LITERAL | UNQUOTED_LITERAL @@ -34,6 +37,21 @@ RANGE_OPERATOR: "<=" | "<" | ">" +// Wildcard literal - for wildcard values containing spaces +// Priority 3 ensures it matches before keywords (priority 2) and unquoted literals +// Uses word boundary \b to stop before 'or', 'and', 'not' keywords +// MUST contain at least one space to differentiate from field names like common.* +// Must not start with not/or/and so "not /tmp/go-build*" is not matched (value is UNQUOTED_LITERAL) +// All patterns end with (?=\s|$|[()":{}]) to prevent capturing trailing whitespace +// Pattern 1: Starts with * (e.g., *S3 Browser, *S3 Browser*) +// Pattern 2: Ends with * but doesn't start with * (e.g., S3 Browser*) +// Pattern 3a: Middle * - star appears AFTER a space (e.g., S3 B*owser) +// Pattern 3b: Middle * - star appears BEFORE a space (e.g., S3* Browser) +WILDCARD_LITERAL.3: /\*[^\s\r\n()"':{}]*(?:\s+(?!(?:or|and|not)\b)[^\s\r\n()"':{}]+)+\*?(?=\s|$|[()":{}])/i + | /(?!(?:not|or|and)\b)[^\s\r\n()"':{}][^\s\r\n()"':{}]*(?:\s+(?!(?:or|and|not)\b)[^\s\r\n()"':{}]+)+\*(?=\s|$|[()":{}])/i + | /(?!(?:not|or|and)\b)[^\s\r\n()"'*:{}][^\s\r\n()"':{}]*\s+(?!(?:or|and|not)\b)[^\s\r\n()"':{}]*\*[^\s\r\n()"':{}]+(?:\s+(?!(?:or|and|not)\b)[^\s\r\n()"':{}]+)*(?"*{}]/ // escaped specials diff --git a/lib/kql/kql/kql2eql.py b/lib/kql/kql/kql2eql.py index 7a275c00f..5395a0a8a 100755 --- a/lib/kql/kql/kql2eql.py +++ b/lib/kql/kql/kql2eql.py @@ -72,19 +72,49 @@ class KqlToEQL(BaseKqlParser): def not_list_of_values(self, tree): return eql.ast.Not(self.visit(tree.children[-1])) + def list_of_values(self, tree): + if len(tree.children) == 2: + # optional_not value + opt_not_tree, value_tree = tree.children + not_count = self.visit(opt_not_tree) + value = self.visit(value_tree) + for _ in range(not_count): + value = eql.ast.Not(value) + return value + else: + # "(" or_list_of_values ")" + return self.visit(tree.children[1]) + + def optional_not(self, tree): + count = 0 + for child in tree.children: + if hasattr(child, "type") and child.type == "NOT": + count += 1 + else: + count += self.visit(child) + return count + def field(self, tree): literal = self.visit(tree.children[0]) return eql.utils.to_unicode(literal) def value(self, tree): - # TODO: check the logic for kuery.peg - value = self.unescape_literal(tree.children[0]) - if self.scoped_field is None: raise self.error(tree, "Value not tied to field") + token = tree.children[0] + value = self.unescape_literal(token) + field_name = self.scoped_field field = self.to_eql_field(field_name) + + # Handle wildcard literals (may contain spaces) and unquoted literals with wildcards + if token.type == "WILDCARD_LITERAL" or (token.type == "UNQUOTED_LITERAL" and "*" in str(token.value)): + if eql.utils.is_string(value) and value.replace("*", "").strip() == "": + return eql.ast.IsNotNull(field) + value_ast = eql.ast.Literal.from_python(value) + return eql.ast.FunctionCall("wildcard", [field, value_ast]) + value = self.convert_value(field_name, value, tree) value_ast = eql.ast.Literal.from_python(value) diff --git a/lib/kql/kql/parser.py b/lib/kql/kql/parser.py index e0ec03492..020d0d304 100644 --- a/lib/kql/kql/parser.py +++ b/lib/kql/kql/parser.py @@ -338,6 +338,28 @@ class KqlParser(BaseKqlParser): self.assert_lower_token(*tree.child_tokens) return NotValue(self.visit(tree.children[-1])) + def list_of_values(self, tree): + if len(tree.children) == 2: + # optional_not value + opt_not_tree, value_tree = tree.children + not_count = self.visit(opt_not_tree) + value = self.visit(value_tree) + for _ in range(not_count): + value = NotValue(value) + return value + else: + # "(" or_list_of_values ")" + return self.visit(tree.children[1]) + + def optional_not(self, tree): + count = 0 + for child in tree.children: + if hasattr(child, "type") and child.type == "NOT": + count += 1 + else: + count += self.visit(child) + return count + def literal(self, tree): return self.unescape_literal(tree.children[0]) @@ -353,9 +375,11 @@ class KqlParser(BaseKqlParser): token = tree.children[0] value = self.unescape_literal(token) - if token.type == "UNQUOTED_LITERAL" and "*" in token.value: + # Handle wildcard literals (may contain spaces) and unquoted literals with wildcards + if token.type == "WILDCARD_LITERAL" or (token.type == "UNQUOTED_LITERAL" and "*" in token.value): field_type = self.get_field_type(field_name) - if len(value.replace("*", "")) == 0: + + if len(token.value.replace("*", "").strip()) == 0: return Exists() if field_type is not None and field_type not in ("keyword", "wildcard"): @@ -364,6 +388,12 @@ class KqlParser(BaseKqlParser): return Wildcard(token.value) + # For quoted strings, treat as literal values (wildcards in quotes are literal in Kibana) + # This bypasses Value.from_python's wildcard conversion for quoted strings + if token.type == "QUOTED_STRING": + value = self.convert_value(field_name, value, tree) + return String(eql.utils.to_unicode(value)) if eql.utils.is_string(value) else Value.from_python(value) + # try to convert the value to the appropriate type # example: 1 -> "1" if the field is actually keyword value = self.convert_value(field_name, value, tree) diff --git a/lib/kql/pyproject.toml b/lib/kql/pyproject.toml index daced3b23..422852262 100644 --- a/lib/kql/pyproject.toml +++ b/lib/kql/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection-rules-kql" -version = "0.1.9" +version = "0.1.10" description = "Kibana Query Language parser for Elastic Detection Rules" license = {text = "Elastic License v2"} keywords = ["Elastic", "sour", "Detection Rules", "Security", "Elasticsearch", "kql"] diff --git a/pyproject.toml b/pyproject.toml index 7ecb21069..254438509 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.6.4" +version = "1.6.5" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12" diff --git a/tests/kuery/test_parser.py b/tests/kuery/test_parser.py index 40cdea0b7..48e31af92 100644 --- a/tests/kuery/test_parser.py +++ b/tests/kuery/test_parser.py @@ -7,12 +7,16 @@ import unittest import kql from kql.ast import ( + AndExpr, Exists, Field, FieldComparison, FieldRange, + NotExpr, + NotValue, Number, String, + Wildcard, ) @@ -112,3 +116,59 @@ class ParserTests(unittest.TestCase): kql.lark_parse('"Test-ServiceDaclPermission" \nor "Update-ExeFunctions"') kql.lark_parse('"Test-ServiceDaclPermission" or\n "Update-ExeFunctions"') kql.lark_parse('"Test-ServiceDaclPermissionOr" or\n "Update-ExeAndFunctions"') + + def test_wildcard_with_spaces(self): + """Test wildcard values containing spaces (WILDCARD_LITERAL patterns).""" + # Pattern 1: Starts with * (e.g., *S3 Browser, *S3 Browser*) + self.validate("field: *S3 Browser*", FieldComparison(Field("field"), Wildcard("*S3 Browser*"))) + self.validate("field: *S3 Browser", FieldComparison(Field("field"), Wildcard("*S3 Browser"))) + + # Pattern 2: Ends with * but doesn't start with * (e.g., S3 Browser*) + self.validate("field: S3 Browser*", FieldComparison(Field("field"), Wildcard("S3 Browser*"))) + + # Pattern 3a: Middle * - star appears AFTER a space (e.g., S3 B*owser) + self.validate("field: S3 B*owser", FieldComparison(Field("field"), Wildcard("S3 B*owser"))) + + # Pattern 3b: Middle * - star appears BEFORE a space (e.g., S3* Browser) + self.validate("field: S3* Browser", FieldComparison(Field("field"), Wildcard("S3* Browser"))) + + # Multiple wildcards with spaces + self.validate("field: foo* bar* baz", FieldComparison(Field("field"), Wildcard("foo* bar* baz"))) + + def test_wildcard_with_spaces_and_keywords(self): + """Test wildcard values containing spaces followed by keywords.""" + # Wildcard followed by 'and' keyword + result = kql.parse("field: *S3 Browser* and other: value", optimize=False) + self.assertIsInstance(result, AndExpr) + + # Wildcard followed by 'or' keyword + result = kql.parse("field: S3 Browser* or other: value", optimize=False) + self.assertIsNotNone(result) + + # Keywords inside wildcard values (should be part of the wildcard) + self.validate("field: *or something*", FieldComparison(Field("field"), Wildcard("*or something*"))) + self.validate("field: *not this*", FieldComparison(Field("field"), Wildcard("*not this*"))) + + def test_not_prefix_with_wildcard(self): + """Test NOT keyword is not consumed as part of wildcard literal.""" + # NOT prefix should create NotExpr, not be part of the wildcard + result = kql.parse("process.executable: not /test/go-build*", optimize=False) + self.assertIsInstance(result, FieldComparison) + self.assertIsInstance(result.value, NotValue) + self.assertIsInstance(result.value.value, Wildcard) + self.assertEqual(result.value.value.value, "/test/go-build*") + + def test_quoted_wildcard_as_literal(self): + """Test that quoted wildcards are treated as literal strings, not wildcards.""" + # Quoted wildcard should be a String, not a Wildcard + self.validate('field: "*text*"', FieldComparison(Field("field"), String("*text*"))) + + def test_triple_not_optimization(self): + """Test that triple NOT optimizes correctly: not(not(not(x))) = not(x).""" + # Triple NOT should optimize to single NOT + result = kql.parse("process.name: not not not foo", optimize=True) + # After optimization, not(not(not(foo))) should become not(foo) + # The structure is NotExpr(FieldComparison(..., String(...))) + self.assertIsInstance(result, NotExpr) + self.assertIsInstance(result.expr, FieldComparison) + self.assertEqual(result.expr.value.value, "foo")