Files
Sergey Polzunov c7246313f7 feat: ESQL query validation against Elastic cluster (#4955)
* Add remote ESQL validation
---------

Co-authored-by: Eric Forte <119343520+eric-forte-elastic@users.noreply.github.com>
Co-authored-by: eric-forte-elastic <eric.forte@elastic.co>
Co-authored-by: Mika Ayenson <mika.ayenson@elastic.co>
Co-authored-by: Mika Ayenson, PhD <Mikaayenson@users.noreply.github.com>
2025-10-15 15:17:07 -04:00

61 lines
2.1 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""ESQL Query Parsing Classes."""
import re
from dataclasses import dataclass
@dataclass
class EventDataset:
    """A two-part dataset name, stored as its package and integration halves.

    The string form joins both halves with a dot: ``<package>.<integration>``.
    """

    # Text before the dot in the dataset name.
    package: str
    # Text after the dot in the dataset name.
    integration: str

    def __str__(self) -> str:
        """Return the dotted ``package.integration`` representation."""
        return "{}.{}".format(self.package, self.integration)
def get_esql_query_event_dataset_integrations(query: str) -> list[EventDataset]:
"""Extract event.dataset and data_stream.dataset integrations from an ES|QL query."""
number_of_parts = 2
# Regex patterns for event.dataset, and data_stream.dataset
# This mimics the logic in get_datasets_and_modules but for ES|QL as we do not have an ast
regex_patterns = {
"in": [
re.compile(r"event\.dataset\s+in\s*\(\s*([^)]+)\s*\)"),
re.compile(r"data_stream\.dataset\s+in\s*\(\s*([^)]+)\s*\)"),
],
"eq": [
re.compile(r'event\.dataset\s*==\s*"([^"]+)"'),
re.compile(r'data_stream\.dataset\s*==\s*"([^"]+)"'),
],
}
# Extract datasets
datasets: list[str] = []
for regex_list in regex_patterns.values():
for regex in regex_list:
matches = regex.findall(query)
if matches:
for match in matches:
if "," in match:
# Handle `in` case with multiple values
datasets.extend([ds.strip().strip('"') for ds in match.split(",")])
else:
# Handle `==` case
datasets.append(match.strip().strip('"'))
event_datasets: list[EventDataset] = []
for dataset in datasets:
parts = dataset.split(".")
if len(parts) == number_of_parts: # Ensure there are exactly two parts
event_datasets.append(EventDataset(package=parts[0], integration=parts[1]))
return event_datasets