[FR] Add white space checking for KQL parse (#3789)
* Add whitespace checking for KQL parse * Add unit test for blank space check * Bump patch version * Add test cases for newline blank space * Add additional unit tests * Update to only walk tree once --------- Co-authored-by: Terrance DeJesus <99630311+terrancedejesus@users.noreply.github.com>
This commit is contained in:
@@ -16,6 +16,7 @@ from lark.visitors import Interpreter
|
||||
|
||||
from kql.errors import KqlParseError
|
||||
from .ast import * # noqa: F403
|
||||
from .utils import check_whitespace, collect_token_positions
|
||||
|
||||
|
||||
STRING_FIELDS = ("keyword", "text")
|
||||
@@ -376,7 +377,13 @@ def lark_parse(text):
|
||||
walker = BaseKqlParser(text)
|
||||
|
||||
try:
|
||||
return lark_parser.parse(text)
|
||||
tree = lark_parser.parse(text)
|
||||
|
||||
# Check for whitespace around "and" and "or" tokens
|
||||
lines = text.split('\n')
|
||||
check_whitespace(collect_token_positions(tree, ["and", "or"]), lines)
|
||||
|
||||
return tree
|
||||
except UnexpectedEOF:
|
||||
raise KqlParseError("Unexpected EOF", len(walker.lines), len(walker.lines[-1].strip()), walker.lines[-1])
|
||||
except LarkError as exc:
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
# or more contributor license agreements. Licensed under the Elastic License
|
||||
# 2.0; you may not use this file except in compliance with the Elastic License
|
||||
# 2.0.
|
||||
|
||||
import re
|
||||
|
||||
from lark import (
|
||||
Token,
|
||||
Tree,
|
||||
)
|
||||
|
||||
from kql.errors import KqlParseError
|
||||
|
||||
|
||||
def check_whitespace(token_positions: list[tuple[int, int, str]], lines: list[str]) -> None:
    """Ensure every listed token is delimited by whitespace on both sides.

    The start or end of a line counts as whitespace, so a token that sits at
    the very beginning of a line, at the very end of a line, or alone on its
    own line is accepted.

    Args:
        token_positions: ``(line, column, token)`` tuples. Line and column are
            1-based, matching the indexing arithmetic below.
        lines: The query text split into individual lines.

    Raises:
        KqlParseError: If a token is directly adjacent to a non-whitespace
            character on either side.
    """
    for line_num, column, token in token_positions:
        line = lines[line_num - 1]
        start = column - 1  # 0-based index of the token's first character
        end = start + len(token)  # 0-based index just past the token

        # A line edge is as good as a whitespace character: the token cannot be
        # glued to a neighbour that does not exist on this line.
        leading_ok = start == 0 or line[start - 1].isspace()
        trailing_ok = end == len(line) or (end < len(line) and line[end].isspace())
        if leading_ok and trailing_ok:
            continue

        raise KqlParseError(
            error_msg=f"Missing whitespace around '{token}' token",
            line=line_num,
            column=column,
            source=line,
            width=len(token),
            trailer=None
        )
|
||||
|
||||
|
||||
def collect_token_positions(tree: Tree, token_list: list[str]) -> list[tuple[int, int, str]]:
    """Collect the positions of selected tokens found anywhere under ``tree``.

    Matching is case-insensitive. Results are returned in pre-order
    (left-to-right, depth-first), i.e. the order the tokens appear while
    walking the children of each node.

    Args:
        tree: Root node whose descendants are searched.
        token_list: Token values to look for.

    Returns:
        A list of ``(line, column, value)`` tuples, one per matching token,
        taken from the token's own ``line``, ``column`` and ``value``.
    """
    # Build the lookup once instead of re-lowercasing the list for every child.
    wanted = {token.lower() for token in token_list}

    token_positions = []
    # Explicit stack instead of recursion so deeply nested trees cannot hit the
    # interpreter's recursion limit; children are pushed reversed so they are
    # popped in their original left-to-right order.
    pending = list(reversed(tree.children))
    while pending:
        node = pending.pop()
        if isinstance(node, Token):
            if node.value.lower() in wanted:
                token_positions.append((node.line, node.column, node.value))
        elif isinstance(node, Tree):
            pending.extend(reversed(node.children))
    return token_positions
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "detection-rules-kql"
|
||||
version = "0.1.8"
|
||||
version = "0.1.9"
|
||||
description = "Kibana Query Language parser for Elastic Detection Rules"
|
||||
license = {text = "Elastic License v2"}
|
||||
keywords = ["Elastic", "sour", "Detection Rules", "Security", "Elasticsearch", "kql"]
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "detection_rules"
|
||||
version = "1.3.16"
|
||||
version = "1.3.17"
|
||||
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
|
||||
@@ -103,3 +103,12 @@ class ParserTests(unittest.TestCase):
|
||||
"{'match': {'destination.ip': '169.254.169.254'}}]}}]}}"
|
||||
)
|
||||
self.assertEqual(dsl_str, good_case, "DSL string does not match the good case, optimization failed.")
|
||||
|
||||
def test_blank_space(self):
    """Check that "and"/"or" must be surrounded by whitespace to parse."""
    # Each invalid query gets its own assertRaises: when several calls share
    # one context manager, only the first one ever executes.
    invalid_queries = (
        '"Test-ServiceDaclPermission" or"Update-ExeFunctions"',
        '"Test-ServiceDaclPermission"and "Update-ExeFunctions"',
    )
    for query in invalid_queries:
        with self.subTest(query=query), self.assertRaises(kql.KqlParseError):
            kql.lark_parse(query)

    # Properly spaced operators, including ones next to a line break and
    # values that merely contain "Or"/"And", must parse without error.
    valid_queries = (
        '"Test-ServiceDaclPermission" or "Update-ExeFunctions"',
        '"Test-ServiceDaclPermission" \nor "Update-ExeFunctions"',
        '"Test-ServiceDaclPermission" or\n "Update-ExeFunctions"',
        '"Test-ServiceDaclPermissionOr" or\n "Update-ExeAndFunctions"',
    )
    for query in valid_queries:
        with self.subTest(query=query):
            kql.lark_parse(query)
|
||||
|
||||
Reference in New Issue
Block a user