fix: removing outdated code in Kibana client auth (#4495)

* Simplify kibana session management

* Drop removed options from `kibana_args` set

* Style fix

* Patch version bump

* Bumping kibana lib version

* Relax CLI requirement, making `api_key` optional, to allow `help` to run
This commit is contained in:
Sergey Polzunov
2025-03-24 12:28:36 +01:00
committed by GitHub
parent db78756062
commit 65170c394b
5 changed files with 59 additions and 142 deletions
+25 -72
View File
@@ -12,23 +12,30 @@ import threading
import uuid
from typing import List, Optional, Union
from urllib.parse import urljoin
import requests
from elasticsearch import Elasticsearch
_context = threading.local()
class Kibana(object):
class Kibana:
"""Wrapper around the Kibana SIEM APIs."""
CACHED = False
def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=None, space=None):
def __init__(self, cloud_id=None, kibana_url=None, api_key=None, verify=True, elasticsearch=None, space=None):
""""Open a session to the platform."""
self.authenticated = False
self.session = requests.Session()
self.session.verify = verify
if api_key:
self.session.headers.update(
{
"kbn-xsrf": "true",
"Authorization": f"ApiKey {api_key}",
}
)
self.verify = verify
self.cloud_id = cloud_id
@@ -37,9 +44,6 @@ class Kibana(object):
self.space = space if space and space.lower() != 'default' else None
self.status = None
self.provider_name = None
self.provider_type = None
if self.cloud_id:
self.cluster_name, cloud_info = self.cloud_id.split(":")
self.domain, self.es_uuid, self.kibana_uuid = \
@@ -50,18 +54,24 @@ class Kibana(object):
kibana_url_from_cloud = f"https://{self.kibana_uuid}.{self.domain}:9243"
if self.kibana_url and self.kibana_url != kibana_url_from_cloud:
raise ValueError(f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id '
f'{kibana_url_from_cloud}')
raise ValueError(
f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id '
f'{kibana_url_from_cloud}'
)
self.kibana_url = kibana_url_from_cloud
self.elastic_url = f"https://{self.es_uuid}.{self.domain}:9243"
self.provider_name = 'cloud-basic'
self.provider_type = 'basic'
self.session.headers.update({'Content-Type': "application/json", "kbn-xsrf": str(uuid.uuid4())})
self.elasticsearch = elasticsearch
if not self.elasticsearch and self.elastic_url:
self.elasticsearch = Elasticsearch(
hosts=[self.elastic_url],
api_key=api_key,
verify_certs=self.verify,
)
self.elasticsearch.info()
if not verify:
from requests.packages.urllib3.exceptions import \
InsecureRequestWarning
@@ -75,7 +85,7 @@ class Kibana(object):
return self.status.get("version", {}).get("number")
@staticmethod
def ndjson_file_data_prep(lines: List[dict], filename: str) -> (dict, str):
def ndjson_file_data_prep(lines: List[dict], filename: str) -> tuple[dict, str]:
"""Prepare a request for an ndjson file upload to Kibana."""
data = ('\n'.join(json.dumps(r) for r in lines) + '\n')
boundary = '----JustAnotherBoundary'
@@ -144,63 +154,6 @@ class Kibana(object):
"""Perform an HTTP DELETE."""
return self.request('DELETE', uri, params=params, error=error, **kwargs)
def login(self, kibana_username, kibana_password, provider_type=None, provider_name=None):
"""Authenticate to Kibana using the API to update our cookies."""
payload = {'username': kibana_username, 'password': kibana_password}
path = '/internal/security/login'
try:
self.post(path, data=payload, error=True, verbose=False)
except requests.HTTPError as e:
# 7.10 changed the structure of the auth data
# providers dictated by Kibana configs in:
# https://www.elastic.co/guide/en/kibana/current/security-settings-kb.html#authentication-security-settings
# more details: https://discuss.elastic.co/t/kibana-7-10-login-issues/255201/2
if e.response.status_code == 400 and '[undefined]' in e.response.text:
provider_type = provider_type or self.provider_type or 'basic'
provider_name = provider_name or self.provider_name or 'basic'
payload = {
'params': payload,
'currentURL': '',
'providerType': provider_type,
'providerName': provider_name
}
self.post(path, data=payload, error=True)
else:
raise
# Kibana will authenticate against URLs which contain invalid spaces
if self.space:
self.verify_space(self.space)
self.authenticated = True
self.status = self.get("/api/status")
# create ES and force authentication
if self.elasticsearch is None and self.elastic_url is not None:
self.elasticsearch = Elasticsearch(hosts=[self.elastic_url], http_auth=(kibana_username, kibana_password),
verify_certs=self.verify)
self.elasticsearch.info()
# make chaining easier
return self
def add_cookie(self, cookie):
"""Add cookie to be used for auth (such as from an SSO session)."""
# https://www.elastic.co/guide/en/kibana/7.10/security-settings-kb.html#security-session-and-cookie-settings
self.session.headers['sid'] = cookie
self.session.cookies.set('sid', cookie)
self.status = self.get('/api/status')
self.authenticated = True
def add_api_key(self, api_key: str) -> bool:
"""Add an API key to be used for auth."""
self.session.headers['Authorization'] = f'ApiKey {api_key}'
self.status = self.get('/api/status')
self.authenticated = True
return bool(self.status)
def logout(self):
"""Quit the current session."""
try:
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "detection-rules-kibana"
version = "0.4.1"
version = "0.4.2"
description = "Kibana API utilities for Elastic Detection Rules"
license = {text = "Elastic License v2"}
keywords = ["Elastic", "Kibana", "Detection Rules", "Security", "Elasticsearch"]