Files
sigma-rules/kql/dsl.py
T
Ross Wolf 5f867dbb72 Add KQL -> DSL conversion (#81)
* Add KQL -> DSL converter
* Lint with black to 120 chars
* Add more tests and flatten shoulds
* Fix NotValue conversion to DSL
2020-07-22 11:05:45 -06:00

118 lines
4.1 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
from collections import defaultdict
from eql import Walker
from .errors import KqlCompileError
def boolean(**kwargs):
    """Wrap a query in a boolean term and optimize while building.

    Exactly one keyword argument must be given. Its name is the bool clause
    type (``must``, ``filter``, ``should`` or ``must_not``) and its value is
    either a single clause dict or a list of clause dicts.

    Optimizations applied:

    * ``must`` / ``filter``: children of the form ``{"bool": {...}}`` are merged
      into the parent. The first nested ``should`` group is hoisted into the
      parent; later ones are re-wrapped as their own ``should`` bool so that
      independent OR groups are not mixed together.
    * ``should``: children that are bools holding only ``should`` terms are
      flattened into the parent ``should`` list.
    * ``must_not`` with one child: ``{"bool": {"must"|"filter": [x]}}`` is
      unwrapped to ``x``, but only when there is a single inner term.

    Any other input is passed through unchanged. ``minimum_should_match=1`` is
    added whenever the resulting clause has ``should`` terms.

    Returns:
        A dict of the form ``{"bool": {...}}``.

    Raises:
        ValueError: if a nested bool being merged into ``must``/``filter``
            contains an unrecognized key.
    """
    assert len(kwargs) == 1
    [(boolean_type, children)] = kwargs.items()

    if not isinstance(children, list):
        children = [children]

    dsl = defaultdict(list)

    if boolean_type in ("must", "filter"):
        # safe to convert and(and(x), y) -> and(x, y)
        for child in children:
            if list(child) == ["bool"]:
                for child_type, child_terms in child["bool"].items():
                    if child_type in ("must", "filter"):
                        dsl[child_type].extend(child_terms)
                    elif child_type == "should":
                        if "should" not in dsl:
                            dsl[child_type].extend(child_terms)
                        else:
                            # a second OR group must stay separate from the hoisted one
                            dsl[boolean_type].append(boolean(should=child_terms))
                    elif child_type == "must_not":
                        dsl[child_type].extend(child_terms)
                    elif child_type != "minimum_should_match":
                        raise ValueError("Unknown term {}: {}".format(child_type, child_terms))
            else:
                dsl[boolean_type].append(child)

    elif boolean_type == "should":
        # can flatten `should` of `should`
        for child in children:
            if list(child) == ["bool"] and set(child["bool"]).issubset({"should", "minimum_should_match"}):
                dsl["should"].extend(child["bool"]["should"])
            else:
                dsl[boolean_type].append(child)

    elif boolean_type == "must_not" and len(children) == 1:
        # must_not: [{bool: {must: [x]}}] -> {must_not: [x]}
        child = children[0]
        negated = None
        if list(child) == ["bool"] and list(child["bool"]) in (["filter"], ["must"]):
            (inner_terms,) = child["bool"].values()
            # Only a single inner term may be unwrapped. `must_not` rejects a
            # document when ANY of its clauses match, so unwrapping [x, y] would
            # turn NOT (x AND y) into (NOT x) AND (NOT y).
            if not isinstance(inner_terms, list) or len(inner_terms) == 1:
                negated = inner_terms
        dsl = {"must_not": children if negated is None else negated}

    else:
        dsl = dict(kwargs)

    if "should" in dsl:
        dsl.update(minimum_should_match=1)

    dsl = {"bool": dict(dsl)}
    return dsl
class ToDsl(Walker):
    """Convert a parsed KQL tree into an Elasticsearch query DSL dictionary.

    Handlers for value-like nodes return a callable that takes a field name and
    produces the DSL for that field; ``_walk_field_comparison`` supplies the
    field name. Handlers for expression nodes return the DSL dictionary itself.

    NOTE(review): the ``_walk_<node>`` methods are presumably dispatched by the
    ``Walker`` base class according to node type - confirm against eql.
    """

    def _walk_default(self, node, *args, **kwargs):
        """Fail for any node that has no dedicated handler."""
        message = "Unable to convert {}".format(node)
        raise KqlCompileError(message)

    def _walk_exists(self, _):
        """Return a builder for an ``exists`` query on a field."""

        def build(field):
            return {"exists": {"field": field}}

        return build

    def _walk_wildcard(self, tree):
        """Return a builder for a ``query_string`` query restricted to one field."""

        def build(field):
            return {"query_string": {"fields": [field], "query": tree.value}}

        return build

    def _walk_value(self, tree):
        """Return a builder for a ``match`` query against a literal value."""

        def build(field):
            return {"match": {field: tree.value}}

        return build

    def _walk_field(self, field):
        """Return the name of the field node."""
        return field.name

    def _walk_field_range(self, tree):
        """Build a ``range`` query from a comparison operator and a value."""
        range_keys = {"<": "lt", "<=": "lte", ">=": "gte", ">": "gt"}
        field_name = self.walk(tree.field)
        bound = {range_keys[tree.operator]: tree.value.value}
        return {"range": {field_name: bound}}

    def _walk_not_expr(self, tree):
        """Negate the converted sub-expression with ``must_not``."""
        negated = self.walk(tree.expr)
        return boolean(must_not=[negated])

    def _walk_and_expr(self, tree):
        """Combine the converted sub-expressions with ``filter``."""
        terms = [self.walk(item) for item in tree.items]
        return boolean(filter=terms)

    def _walk_or_expr(self, tree):
        """Combine the converted sub-expressions with ``should``."""
        terms = [self.walk(item) for item in tree.items]
        return boolean(should=terms)

    def _walk_and_values(self, tree):
        """Return a builder requiring every value to match the field."""
        builders = [self.walk(item) for item in tree.items]

        def build(field):
            return boolean(filter=[make(field) for make in builders])

        return build

    def _walk_or_values(self, tree):
        """Return a builder requiring at least one value to match the field."""
        builders = [self.walk(item) for item in tree.items]

        def build(field):
            return boolean(should=[make(field) for make in builders])

        return build

    def _walk_not_value(self, tree):
        """Return a builder that negates the wrapped value for the field."""
        inner = self.walk(tree.value)

        def build(field):
            return boolean(must_not=[inner(field)])

        return build

    def _walk_field_comparison(self, tree):
        """Apply the value builder to the field name it is compared against."""
        field_name = self.walk(tree.field)
        build = self.walk(tree.value)
        return build(field_name)

    @classmethod
    def convert(cls, tree):
        """Convert a KQL tree and wrap the result in a top-level ``filter`` bool."""
        walker = cls()
        converted = walker.walk(tree)
        return boolean(filter=[converted])