Auth to Kibana connector using an existing cookie (#711)

This commit is contained in:
Justin Ibarra
2020-12-12 19:10:52 -06:00
committed by Ross Wolf
parent 3042cbb5d6
commit b6aa6c6548
3 changed files with 35 additions and 12 deletions
+9 -5
View File
@@ -13,17 +13,21 @@ from .rule_loader import load_rule_files, load_rules
from .utils import format_command_options
def get_kibana_client(cloud_id, kibana_url, kibana_user, kibana_password, **kwargs):
def get_kibana_client(cloud_id, kibana_url, kibana_user, kibana_password, kibana_cookie, **kwargs):
"""Get an authenticated Kibana client."""
if not (cloud_id or kibana_url):
client_error("Missing required --cloud-id or --kibana-url")
# don't prompt for these until there's a cloud id or Kibana URL
kibana_user = kibana_user or click.prompt("kibana_user")
kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True)
if not kibana_cookie:
# don't prompt for these until there's a cloud id or Kibana URL
kibana_user = kibana_user or click.prompt("kibana_user")
kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True)
with Kibana(cloud_id=cloud_id, kibana_url=kibana_url, **kwargs) as kibana:
kibana.login(kibana_user, kibana_password)
if kibana_cookie:
kibana.add_cookie(kibana_cookie)
else:
kibana.login(kibana_user, kibana_password)
return kibana
+5 -3
View File
@@ -564,15 +564,17 @@ def getdefault(name):
client_options = {
'kibana': {
'kibana_url': click.Option(['--kibana-url'], default=getdefault('kibana_url')),
'cloud_id': click.Option(['--cloud-id'], default=getdefault('cloud_id')),
'kibana_user': click.Option(['--kibana-user', '-ku'], default=getdefault('kibana_user')),
'kibana_cookie': click.Option(['--kibana-cookie', '-kc'], default=getdefault('kibana_cookie'),
help='Cookie from an authed session'),
'kibana_password': click.Option(['--kibana-password', '-kp'], default=getdefault('kibana_password')),
'kibana_url': click.Option(['--kibana-url'], default=getdefault('kibana_url')),
'kibana_user': click.Option(['--kibana-user', '-ku'], default=getdefault('kibana_user')),
'space': click.Option(['--space'], default=None, help='Kibana space')
},
'elasticsearch': {
'elasticsearch_url': click.Option(['--elasticsearch-url'], default=getdefault("elasticsearch_url")),
'cloud_id': click.Option(['--cloud-id'], default=getdefault("cloud_id")),
'elasticsearch_url': click.Option(['--elasticsearch-url'], default=getdefault("elasticsearch_url")),
'es_user': click.Option(['--es-user', '-eu'], default=getdefault("es_user")),
'es_password': click.Option(['--es-password', '-ep'], default=getdefault("es_password")),
'timeout': click.Option(['--timeout', '-et'], default=60, help='Timeout for elasticsearch client')
+21 -4
View File
@@ -27,7 +27,7 @@ class Kibana(object):
self.session.verify = verify
self.cloud_id = cloud_id
self.kibana_url = kibana_url
self.kibana_url = kibana_url.rstrip('/')
self.elastic_url = None
self.space = space if space and space.lower() != 'default' else None
self.status = None
@@ -37,7 +37,12 @@ class Kibana(object):
self.domain, self.es_uuid, self.kibana_uuid = \
base64.b64decode(cloud_info.encode("utf-8")).decode("utf-8").split("$")
self.kibana_url = f"https://{self.kibana_uuid}.{self.domain}:9243"
kibana_url_from_cloud = f"https://{self.kibana_uuid}.{self.domain}:9243"
if self.kibana_url and self.kibana_url != kibana_url_from_cloud:
raise ValueError(f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id '
f'{kibana_url_from_cloud}')
self.kibana_url = kibana_url_from_cloud
self.elastic_url = f"https://{self.es_uuid}.{self.domain}:9243"
self.session.headers.update({'Content-Type': "application/json", "kbn-xsrf": str(uuid.uuid4())})
@@ -62,7 +67,7 @@ class Kibana(object):
uri = "s/{}/{}".format(self.space, uri)
return f"{self.kibana_url}/{uri}"
def request(self, method, uri, params=None, data=None, error=True, verbose=True, raw=False):
def request(self, method, uri, params=None, data=None, error=True, verbose=True, raw=False, **kwargs):
"""Perform a RESTful HTTP request with JSON responses."""
params = params or {}
url = self.url(uri)
@@ -71,7 +76,7 @@ class Kibana(object):
if data is not None:
body = json.dumps(data)
response = self.session.request(method, url, params=params, data=body)
response = self.session.request(method, url, params=params, data=body, **kwargs)
if error:
try:
response.raise_for_status()
@@ -135,6 +140,13 @@ class Kibana(object):
# make chaining easier
return self
def add_cookie(self, cookie):
"""Add cookie to be used for auth (such as from an SSO session)."""
# the request to /api/status will also add the cookie to the cookie jar upon a successful response
self.session.headers['cookie'] = cookie
self.status = self.get('/api/status')
self.authenticated = True
def logout(self):
"""Quit the current session."""
try:
@@ -179,3 +191,8 @@ class Kibana(object):
space_names = [s['name'] for s in spaces]
if space not in space_names:
raise ValueError(f'Unknown Kibana space: {space}')
def current_user(self):
"""Retrieve info for currently authenticated user."""
if self.authenticated:
return self.get('/internal/security/me')