Files
Sergey Polzunov 1fb60d6475 fix: type hinting fixes and additional code checks (#4790)
* first pass

* Adding a dedicated code checking workflow

* Type fixes

* linting config and python version bump

* Type hints

* Drop incorrect config option

* More fixes

* Style fixes

* CI adjustments

* Pyproject fixes

* CI & pyproject fixes

* Proper version bump

* Tests formatting

* Resolve circular dependency

* Test fixes

* Make sure the tests are formatted correctly

* Check tweaks

* Bumping python version in CI images

* Pin marshmallow to 3.x because 4.x is not supported

* License fix

* Convert path to str

* Making myself a codeowner

* Missing kwargs param

* Adding a missing kwargs to `set_score`

* Update .github/CODEOWNERS

Co-authored-by: Mika Ayenson, PhD <Mikaayenson@users.noreply.github.com>

* Dropping unnecessary raise

* Dropping skipped test

* Drop unnecessary var

* Drop unused commented-out func

* Disable typehinting for the whole func

* Update linting command

* Invalid type hint on the input param

* Incorrect field type

* Incorrect value used fix

* Stricter values check

* Simpler function call

* Type condition fix

* TOML formatter fix

* Simplify output conditions

* Formatting

* Use proper types instead of aliases

* MITRE attack fixes

* Using pathlib.Path for an argument

* Use proper method to update a set from a dict

* First round of `ruff` fixes

* More fixes

* More fixes

* Hack against cyclic dependency

* Ignore `PLC0415`

* Remove unused markers

* Cleanup

* Fixing the incorrect condition

* Update .github/CODEOWNERS

Co-authored-by: Mika Ayenson, PhD <Mikaayenson@users.noreply.github.com>

* Set explicit default values for optional fields

* Update the guidelines

* Adding None Defaults

---------

Co-authored-by: Mika Ayenson, PhD <Mikaayenson@users.noreply.github.com>
Co-authored-by: eric-forte-elastic <eric.forte@elastic.co>
2025-07-01 08:20:55 -05:00

105 lines
3.9 KiB
Python

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Test util time functions."""
import random
import time
import unittest
from detection_rules.ecs import get_kql_schema
from detection_rules.eswrap import Events
from detection_rules.utils import cached, normalize_timing_and_sort
class TestTimeUtils(unittest.TestCase):
    """Test timestamp normalization, KQL schema loading, and the ``cached`` helper."""

    @staticmethod
    def get_events(timestamp_field: str = "@timestamp") -> dict:
        """Build shuffled sample events for each date format listed below.

        Args:
            timestamp_field: Key under which each event stores its timestamp.

        Returns:
            A dict mapping each date-format name to a list of six event dicts.
            Events are generated with offsets 0..5 and ids 1..6, then shuffled.
        """
        date_formats = {
            # NOTE(review): int() is applied before the * 1000, so this value has
            # whole-second granularity -- confirm that is intended.
            "epoch_millis": lambda x: int(round(time.time(), 3) + x) * 1000,
            "epoch_second": lambda x: round(time.time()) + x,
            "unix_micros": lambda x: time.time() + x,
            "unix_millis": lambda x: round(time.time(), 3) + x,
            "strict_date_optional_time": lambda x: "2020-05-13T04:36:" + str(15 + x) + ".394Z",
        }

        def _get_data(func) -> list:
            # The offset passed to ``func`` grows together with ``id``, so a
            # correct timestamp sort must return the ids in ascending order.
            data = [{timestamp_field: func(offset), "foo": "bar", "id": offset + 1} for offset in range(6)]
            # Shuffle so the events do not start out already ordered.
            random.shuffle(data)
            return data

        return {fmt: _get_data(func) for fmt, func in date_formats.items()}

    def assert_sort(self, normalized_events, date_format) -> None:
        """Assert that the events' ids are exactly 1 through 6, in order.

        Args:
            normalized_events: Iterable of event dicts, each with an ``id`` key.
            date_format: Date-format name, used only in the failure message.
        """
        order = [e["id"] for e in normalized_events]
        self.assertListEqual([1, 2, 3, 4, 5, 6], order, f"Sorting failed for date_format: {date_format}")

    def test_time_normalize(self) -> None:
        """Test normalize_timing_and_sort against every sample date format."""
        for date_format, events in self.get_events().items():
            # One sub-test per format so a failure in one does not hide the rest.
            with self.subTest(date_format=date_format):
                normalized = normalize_timing_and_sort(events)
                self.assert_sort(normalized, date_format)

    def test_event_class_normalization(self) -> None:
        """Test that events are normalized properly within Events."""
        for date_format, events in self.get_events().items():
            with self.subTest(date_format=date_format):
                normalized = Events({"winlogbeat": events})
                self.assert_sort(normalized.events["winlogbeat"], date_format)

    def test_schema_multifields(self) -> None:
        """Test that schemas are loading multifields correctly."""
        schema = get_kql_schema(version="1.4.0")
        self.assertEqual(schema.get("process.name"), "keyword")
        self.assertEqual(schema.get("process.name.text"), "text")

    def test_caching(self) -> None:
        """Test that ``cached`` reuses results per argument set until cleared."""
        counter = 0

        @cached
        def increment(*args, **kwargs) -> int:
            # The arguments are ignored; they only vary the call signature.
            # The counter advances once per real (uncached) invocation.
            nonlocal counter
            counter += 1
            return counter

        # Repeated calls with no arguments return the first result.
        self.assertEqual(increment(), 1)
        self.assertEqual(increment(), 1)
        self.assertEqual(increment(), 1)

        # A list argument is a distinct call and is likewise reused.
        self.assertEqual(increment(["hello", "world"]), 2)
        self.assertEqual(increment(["hello", "world"]), 2)
        self.assertEqual(increment(["hello", "world"]), 2)

        # Earlier results are still returned after other arguments were used.
        self.assertEqual(increment(), 1)
        self.assertEqual(increment(["hello", "world"]), 2)

        # Nested containers (dict -> list -> tuple) work as arguments too.
        self.assertEqual(increment({"hello": [("world",)]}), 3)
        self.assertEqual(increment({"hello": [("world",)]}), 3)

        self.assertEqual(increment(), 1)
        self.assertEqual(increment(["hello", "world"]), 2)
        self.assertEqual(increment({"hello": [("world",)]}), 3)

        # After clear(), every call runs the function again, so the counter advances.
        increment.clear()
        self.assertEqual(increment({"hello": [("world",)]}), 4)
        self.assertEqual(increment(["hello", "world"]), 5)
        self.assertEqual(increment(), 6)
        self.assertEqual(increment(None), 7)
        self.assertEqual(increment(1), 8)