diff --git a/perun/proxy/utils/nagios/check_oidc_login.py b/perun/proxy/utils/nagios/check_oidc_login.py new file mode 100644 index 0000000000000000000000000000000000000000..f4981052edcf200373ca055811cc90dcf138293e --- /dev/null +++ b/perun/proxy/utils/nagios/check_oidc_login.py @@ -0,0 +1,514 @@ +#!/usr/bin/env python3 + +""" +make a full roundtrip test.yaml for SAML based SSO +""" + +import argparse +import base64 +import hmac +import http.cookiejar +import http.server +import json +import logging +import os +import struct +import threading +import time +import urllib.error +import urllib.parse +import urllib.request +from enum import Enum +from html.parser import HTMLParser +from typing import Dict, Any, List +from urllib.parse import urlparse + +from flask import Flask, Response, request +from idpyoidc.client.oauth2.stand_alone_client import StandAloneClient + +CALLBACK_ENDPOINT_CALLED = threading.Event() + + +class Status(Enum): + OK = 0, "OK" + WARNING = 1, "WARNING" + CRITICAL = 2, "CRITICAL" + UNKNOWN = 3, "UNKNOWN" + + +class Evaluator: + def __init__(self): + self.start_time = None + + def start_test(self): + self.start_time = time.time() + + def finish_test(self, message: str, status: Status = Status.OK): + if not self.start_time: + raise ValueError("Test has not been started.") + + auth_time = None + if status == Status.OK: + auth_time = round(time.time() - self.start_time, 2) + + message = f"{message}|authtime={auth_time or ''};" + status_code, status_name = status.value + + print(f"{status_name} - {message}") + os._exit(status_code) + + +def silence_flask_logs(app: Flask) -> None: + log = logging.getLogger("werkzeug") + log.setLevel(logging.ERROR) + app.logger.disabled = True + log.disabled = True + + +def get_flask_app( + client: StandAloneClient, evaluator: Evaluator, callback_path: str, verbose: int +) -> Flask: + app = Flask(__name__) + app.config["client"] = client + app.config["evaluator"] = evaluator + silence_flask_logs(app) + + @app.route(callback_path) + def signin_oidc(): + CALLBACK_ENDPOINT_CALLED.set() + client = app.config["client"] + query_params = request.args.to_dict() + + if verbose >= 3: + print(f"Parameters returned to callback after OIDC auth '{query_params}'") + + auth_response = client.finalize(query_params) + + if verbose >= 3: + print( + f"Final authentication response after exchange of code for token '" + f"{auth_response}'" + ) + + response = Response(status=200, headers={"Content-type": "text/plain"}) + + @response.call_on_close + def trigger_evaluation(): + # trigger auth response evaluation after the flask response has been sent + if not auth_response.get("id_token"): + evaluator.finish_test( + "ID token was not found in OP's response.", Status.CRITICAL + ) + + if not auth_response.get("userinfo"): + evaluator.finish_test( + "User info was not found in OP's response.", Status.CRITICAL + ) + + evaluator.finish_test("Authentication was successful.", Status.OK) + + return response + + return app + + +def start_flask_app( + client: StandAloneClient, evaluator: Evaluator, redirect_uri: str, verbose: int +): + parsed_redirect_uri = urlparse(redirect_uri) + app = get_flask_app(client, evaluator, parsed_redirect_uri.path, verbose) + + app.run(host=parsed_redirect_uri.hostname, port=parsed_redirect_uri.port) + + +def get_query_params(url_path: str) -> Dict[Any, Any]: + query_params = urllib.parse.urlparse(url_path).query + query_params_dict = urllib.parse.parse_qs(query_params) + query_params_dict_parsed = {key: val[0] for key, val in query_params_dict.items()} + + return query_params_dict_parsed + + +# TODO possibly extract these methods shared with check_saml or use reflector backend +# to avoid real logging in +# https://github.com/susam/mintotp/blob/master/mintotp.py +def hotp(key, counter, digits=6, digest="sha1"): + key = base64.b32decode(key.upper() + "=" * ((8 - len(key)) % 8)) + counter = struct.pack(">Q", counter) + mac = hmac.new(key, counter, digest).digest() + offset = mac[-1] & 0x0F + endoffset = offset + 4 + binary = struct.unpack(">L", mac[offset:endoffset])[0] & 0x7FFFFFFF + return str(binary)[-digits:].zfill(digits) + + +def totp(key, time_step=30, digits=6, digest="sha1"): + return hotp(key, int(time.time() / time_step), digits, digest) + + +class FormParser(HTMLParser): + form_action = None + form_data = {} + _form_in_progress = False + + def __init__(self, *args, **kwargs): + self.form_action = None + self.form_data = {} + self._form_in_progress = False + super(FormParser, self).__init__(*args, **kwargs) + + def handle_starttag(self, tag, attrs): + if tag.lower() == "form": + self._form_in_progress = True + for name, value in attrs: + if name.lower() == "action" and self.form_action is None: + self.form_action = value + elif tag.lower() == "input": + input_name = None + input_value = None + for name, value in attrs: + if name.lower() == "name": + input_name = value + elif name.lower() == "value": + input_value = value + if input_name and input_value: + self.form_data[input_name] = input_value + + def handle_endtag(self, tag): + if tag.lower() == "form": + self._form_in_progress = False + + +def parse_form(html): + parser = FormParser() + parser.feed(html) + return parser.form_action, parser.form_data + + +def get_args(): + """ + Supports the command-line arguments listed below. + """ + parser = argparse.ArgumentParser(description="SAML authentication check") + parser._optionals.title = "Options" + parser.add_argument( + "--username", + "-u", + required=True, + help="username for IdP", + ) + parser.add_argument( + "--password", + "-p", + required=True, + help="password for IdP", + ) + parser.add_argument( + "--redirect-uri", + "-r", + required=True, + help="URI where OIDC callback will be redirected", + ) + parser.add_argument( + "--client-id", + "-cid", + required=True, + help="ID of OIDC client registered with OP", + ) + parser.add_argument( + "--client-secret", + "-cs", + required=False, + help="secret of OIDC client registered with OP", + ) + parser.add_argument( + "--issuer", + "-i", + required=True, + help="Issuer used for OIDC auth", + ) + parser.add_argument( + "--scopes", + "-s", + nargs="+", + help="List of scopes client will ask to access", + default=["openid", "email", "profile"], + ) + parser.add_argument( + "--username-field", + help="name of the username field on the login page", + default="username", + ) + parser.add_argument( + "--password-field", + help="name of the password field on the login page", + default="password", + ) + parser.add_argument( + "--totp", + help="secret key (seed) for TOTP in Base32 encoding", + default="ZYTYYE5FOAGW5ML7LRWUL4WTZLNJAMZS", + ) + parser.add_argument( + "--verbose", + "-v", + action="count", + default=0, + help="verbose mode (for debugging)", + ) + parser.add_argument( + "--remember-me", + action="store_true", + help="check the Remember me option when logging in", + ) + + return parser.parse_args() + + +class OIDCChecker: + def curl(self, url, data=None): + if self.args.verbose >= 1: + print("curl: {}".format(url)) + req = urllib.request.Request( + url=url, + data=urllib.parse.urlencode(data).encode("ascii") if data else None, + ) + if self.args.verbose >= 1: + print("") + response = None + try: + response = self.opener.open(req) + return response + except urllib.error.URLError as e: + if self.args.verbose >= 1: + print(e) + if self.args.verbose >= 2: + print(response) + self.evaluator.finish_test(e.reason, Status.CRITICAL) + + def js_form_redirect(self, html, url, force=False): + if ( + force + or "document.forms[0].submit()" in html + or "javascript:DoSubmit();" in html + ): + form_action, form_data = parse_form(html) + return self.send_form(url, form_action, form_data) + return None, None + + def initial_request(self, url): + response = self.curl(url) + response_html = response.read().decode("utf-8") + response_url = response.url + + if not self.is_using_pkce: + response_html, response_url = self.js_form_redirect( + response_html, response.url + ) + + return response_html, response_url + + def send_form(self, url, action, data): + target_url = urllib.parse.urljoin(url, action) + response = self.curl(target_url, data) + + return response.read().decode("utf-8"), response.url + + def send_credentials(self, login_form_url, login_form_action, login_form_data): + login_form_data[self.args.username_field] = self.args.username + login_form_data[self.args.password_field] = self.args.password + if self.args.remember_me: + login_form_data["remember_me"] = "Yes" + response_html, response_url = self.send_form( + login_form_url, login_form_action, login_form_data + ) + + if self.args.verbose >= 1: + print(response_url) + if self.args.verbose >= 3: + print(response_html) + + # MFA + if "totp" in response_html.lower() or "privacyidea" in response_html.lower(): + if self.args.verbose >= 1: + print("MFA is required") + totp_form_action, totp_form_data = parse_form(response_html) + totp_code = totp(self.args.totp) + totp_form_data["code"] = totp_code + totp_form_data["otp"] = totp_code + response_html, response_url = self.send_form( + response_url, totp_form_action, totp_form_data + ) + if self.args.verbose >= 1: + print(response_url) + if "TOTP" in response_html or "privacyIDEA" in response_html: + if self.args.verbose >= 2: + print(response_html) + self.evaluator.finish_test("TOTP MFA failed", Status.CRITICAL) + if self.args.verbose >= 3: + print(response_html) + + if "consent" in response_html: + self.evaluator.finish_test("Consent is required", Status.UNKNOWN) + elif "Wrong UÄŚO or password" in response_html: + self.evaluator.finish_test( + "Login was not successful, invalid username or password", + Status.CRITICAL, + ) + elif "Unhandled exception" in response_html: + self.evaluator.finish_test( + "Login was not successful, unhandled exception occured", Status.CRITICAL + ) + elif "SAMLResponse" not in response_html: + self.evaluator.finish_test( + "Login was not successful, unknown error", Status.CRITICAL + ) + + form_action, form_data = parse_form(response_html) + if "SAMLResponse" not in form_data: + self.evaluator.finish_test( + "Login was not successful, unknown error", Status.CRITICAL + ) + saml_response = base64.b64decode(form_data["SAMLResponse"]).decode("utf-8") + + if ( + '<samlp:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success"/>' + not in saml_response + ): + self.evaluator.finish_test( + "Login was not successful, non-success response", Status.CRITICAL + ) + + return self.js_form_redirect(response_html, response_url, True) + + def main(self): + """ + CMD Line tool + """ + + self.evaluator.start_test() + + # 1. start the Flask endpoint which handles OIDC callback + thread = threading.Thread( + target=start_flask_app, + args=[ + self.client, + self.evaluator, + self.args.redirect_uri, + self.args.verbose, + ], + ) + thread.start() + + # 2. initiate auth process + req_args = {"redirect_uri": self.args.redirect_uri} + login_url = self.client.init_authorization(req_args=req_args) + if self.args.verbose >= 3: + print(f"OIDC initiating url: '{login_url}'") + + # 3. further proceed with auth + login_form_html, login_form_url = self.initial_request(login_url) + if self.args.verbose >= 3: + print(f"Login form url: '{login_form_url}'") + print(f"Login form html: '{login_form_html}'") + + # 4. log in and wait for the response to be processed by Flask where exchange + # of code for token happens + # correct redirect ends the test in Flask endpoint + login_form_action, login_form_data = parse_form(login_form_html) + html, response_url = self.send_credentials( + login_form_url, login_form_action, login_form_data + ) + + if self.args.verbose >= 3: + print(html) + + # This point should not be reached if callback endpoint was called + if not CALLBACK_ENDPOINT_CALLED.is_set(): + self.evaluator.finish_test( + "Callback endpoint was not called.", Status.CRITICAL + ) + + def get_issuer_info(self, issuer): + well_known_endpoint = f"{issuer}/.well-known/openid-configuration" + urllib.request.Request(well_known_endpoint) + response = self.curl(well_known_endpoint) + return json.loads(response.read().decode("utf-8")) + + def get_pkce_client_cfg( + self, client_id: str, issuer: str, scopes: List[str] + ) -> Dict[str, Any]: + issuer_info = self.get_issuer_info(issuer) + return { + "provider_info": { + "issuer": issuer, + "jwks_uri": issuer_info.get("jwks_uri"), + "userinfo_endpoint": issuer_info.get("userinfo_endpoint"), + "token_endpoint": issuer_info.get("token_endpoint"), + "authorization_endpoint": issuer_info.get("authorization_endpoint"), + }, + "add_ons": { + "pkce": { + "function": "idpyoidc.client.oauth2.add_on.pkce.add_support", + "kwargs": { + "code_challenge_length": 64, + "code_challenge_method": "S256", + }, + }, + }, + "client_id": client_id, + "scopes_supported": scopes, + "client_type": "oidc", + } + + def get_non_pkce_client_cfg( + self, client_id: str, client_secret: str, issuer: str, scopes: List[str] + ) -> Dict[str, Any]: + return { + "provider_info": { + "issuer": issuer, + }, + "client_id": client_id, + "client_secret": client_secret, + "scopes_supported": scopes, + "client_type": "oidc", + } + + def get_registered_client( + self, client_id: str, client_secret: str, issuer: str, scopes: List[str] + ) -> StandAloneClient: + if self.is_using_pkce: + client_cfg = self.get_pkce_client_cfg(client_id, issuer, scopes) + else: + client_cfg = self.get_non_pkce_client_cfg( + client_id, client_secret, issuer, scopes + ) + + client = StandAloneClient(config=client_cfg) + client.do_provider_info() # get provider info based on issuer in client's + # config + client.do_client_registration() # client is configured statically if + # client_id is provided in the config + + return client + + def __init__(self, args): + self.args = args + self.cookiejar = http.cookiejar.CookieJar() + self.opener = urllib.request.build_opener( + urllib.request.HTTPCookieProcessor(self.cookiejar), + ) + self.is_using_pkce = args.client_secret is None + self.client = self.get_registered_client( + args.client_id, args.client_secret, args.issuer, args.scopes + ) + self.evaluator = Evaluator() + + +def main(): + checker = OIDCChecker(get_args()) + checker.main() + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 1730994c0688b482591511ae26c3ca3f3d688b27..10cab7ea8bc4292b5eef79ffc4e03e84e251f8c3 100644 --- a/setup.py +++ b/setup.py @@ -38,6 +38,7 @@ setuptools.setup( "check_ldap_syncrepl=check_syncrepl_extended.check_syncrepl_extended:main", "check_mongodb=perun.proxy.utils.nagios.check_mongodb:main", "check_nginx=check_nginx_status.check_nginx_status:main", + "check_oidc_login=perun.proxy.utils.nagios.check_oidc_login:main", "check_rpc_status=perun.proxy.utils.nagios.check_rpc_status:main", "check_saml=perun.proxy.utils.nagios.check_saml:main", "check_user_logins=perun.proxy.utils.nagios.check_user_logins:main",