diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py index 921305650..dec19acf4 100644 --- a/detection_rules/kbwrap.py +++ b/detection_rules/kbwrap.py @@ -13,17 +13,21 @@ from .rule_loader import load_rule_files, load_rules from .utils import format_command_options -def get_kibana_client(cloud_id, kibana_url, kibana_user, kibana_password, **kwargs): +def get_kibana_client(cloud_id, kibana_url, kibana_user, kibana_password, kibana_cookie, **kwargs): """Get an authenticated Kibana client.""" if not (cloud_id or kibana_url): client_error("Missing required --cloud-id or --kibana-url") - # don't prompt for these until there's a cloud id or Kibana URL - kibana_user = kibana_user or click.prompt("kibana_user") - kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True) + if not kibana_cookie: + # don't prompt for these until there's a cloud id or Kibana URL + kibana_user = kibana_user or click.prompt("kibana_user") + kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True) with Kibana(cloud_id=cloud_id, kibana_url=kibana_url, **kwargs) as kibana: - kibana.login(kibana_user, kibana_password) + if kibana_cookie: + kibana.add_cookie(kibana_cookie) + else: + kibana.login(kibana_user, kibana_password) return kibana diff --git a/detection_rules/misc.py b/detection_rules/misc.py index 58a3772c1..6dd4de0ee 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -564,15 +564,17 @@ def getdefault(name): client_options = { 'kibana': { - 'kibana_url': click.Option(['--kibana-url'], default=getdefault('kibana_url')), 'cloud_id': click.Option(['--cloud-id'], default=getdefault('cloud_id')), - 'kibana_user': click.Option(['--kibana-user', '-ku'], default=getdefault('kibana_user')), + 'kibana_cookie': click.Option(['--kibana-cookie', '-kc'], default=getdefault('kibana_cookie'), + help='Cookie from an authed session'), 'kibana_password': click.Option(['--kibana-password', '-kp'], default=getdefault('kibana_password')), + 'kibana_url': click.Option(['--kibana-url'], default=getdefault('kibana_url')), + 'kibana_user': click.Option(['--kibana-user', '-ku'], default=getdefault('kibana_user')), 'space': click.Option(['--space'], default=None, help='Kibana space') }, 'elasticsearch': { - 'elasticsearch_url': click.Option(['--elasticsearch-url'], default=getdefault("elasticsearch_url")), 'cloud_id': click.Option(['--cloud-id'], default=getdefault("cloud_id")), + 'elasticsearch_url': click.Option(['--elasticsearch-url'], default=getdefault("elasticsearch_url")), 'es_user': click.Option(['--es-user', '-eu'], default=getdefault("es_user")), 'es_password': click.Option(['--es-password', '-ep'], default=getdefault("es_password")), 'timeout': click.Option(['--timeout', '-et'], default=60, help='Timeout for elasticsearch client') diff --git a/kibana/connector.py b/kibana/connector.py index b88630364..cbf00e6a1 100644 --- a/kibana/connector.py +++ b/kibana/connector.py @@ -27,7 +27,7 @@ class Kibana(object): self.session.verify = verify self.cloud_id = cloud_id - self.kibana_url = kibana_url + self.kibana_url = kibana_url.rstrip('/') self.elastic_url = None self.space = space if space and space.lower() != 'default' else None self.status = None @@ -37,7 +37,12 @@ class Kibana(object): self.domain, self.es_uuid, self.kibana_uuid = \ base64.b64decode(cloud_info.encode("utf-8")).decode("utf-8").split("$") - self.kibana_url = f"https://{self.kibana_uuid}.{self.domain}:9243" + kibana_url_from_cloud = f"https://{self.kibana_uuid}.{self.domain}:9243" + if self.kibana_url and self.kibana_url != kibana_url_from_cloud: + raise ValueError(f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id ' + f'{kibana_url_from_cloud}') + self.kibana_url = kibana_url_from_cloud + self.elastic_url = f"https://{self.es_uuid}.{self.domain}:9243" self.session.headers.update({'Content-Type': "application/json", "kbn-xsrf": str(uuid.uuid4())}) @@ -62,7 +67,7 @@ class Kibana(object): uri = "s/{}/{}".format(self.space, uri) return f"{self.kibana_url}/{uri}" - def request(self, method, uri, params=None, data=None, error=True, verbose=True, raw=False): + def request(self, method, uri, params=None, data=None, error=True, verbose=True, raw=False, **kwargs): """Perform a RESTful HTTP request with JSON responses.""" params = params or {} url = self.url(uri) @@ -71,7 +76,7 @@ class Kibana(object): if data is not None: body = json.dumps(data) - response = self.session.request(method, url, params=params, data=body) + response = self.session.request(method, url, params=params, data=body, **kwargs) if error: try: response.raise_for_status() @@ -135,6 +140,13 @@ class Kibana(object): # make chaining easier return self + def add_cookie(self, cookie): + """Add cookie to be used for auth (such as from an SSO session).""" + # the request to /api/status will also add the cookie to the cookie jar upon a successful response + self.session.headers['cookie'] = cookie + self.status = self.get('/api/status') + self.authenticated = True + def logout(self): """Quit the current session.""" try: @@ -179,3 +191,8 @@ class Kibana(object): space_names = [s['name'] for s in spaces] if space not in space_names: raise ValueError(f'Unknown Kibana space: {space}') + + def current_user(self): + """Retrieve info for currently authenticated user.""" + if self.authenticated: + return self.get('/internal/security/me')