Skip to content
Snippets Groups Projects
Commit 97e988e0 authored by Peter Bolha's avatar Peter Bolha :ok_hand_tone1:
Browse files

feat: satosa oidc probe

parent ca4be0a5
No related branches found
No related tags found
No related merge requests found
Pipeline #340594 passed
#!/usr/bin/env python3
"""
make a full roundtrip test.yaml for SAML based SSO
"""
import argparse
import base64
import hmac
import http.cookiejar
import http.server
import json
import logging
import os
import struct
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from enum import Enum
from html.parser import HTMLParser
from typing import Dict, Any, List
from urllib.parse import urlparse
from flask import Flask, Response, request
from idpyoidc.client.oauth2.stand_alone_client import StandAloneClient
CALLBACK_ENDPOINT_CALLED = threading.Event()
class Status(Enum):
OK = 0, "OK"
WARNING = 1, "WARNING"
CRITICAL = 2, "CRITICAL"
UNKNOWN = 3, "UNKNOWN"
class Evaluator:
def __init__(self):
self.start_time = None
def start_test(self):
self.start_time = time.time()
def finish_test(self, message: str, status: Status = Status.OK):
if not self.start_time:
raise ValueError("Test has not been started.")
auth_time = None
if status == Status.OK:
auth_time = round(time.time() - self.start_time, 2)
message = f"{message}|authtime={auth_time or ''};"
status_code, status_name = status.value
print(f"{status_name} - {message}")
os._exit(status_code)
def silence_flask_logs(app: Flask) -> None:
log = logging.getLogger("werkzeug")
log.setLevel(logging.ERROR)
app.logger.disabled = True
log.disabled = True
def get_flask_app(
client: StandAloneClient, evaluator: Evaluator, callback_path: str, verbose: int
) -> Flask:
app = Flask(__name__)
app.config["client"] = client
app.config["evaluator"] = evaluator
silence_flask_logs(app)
@app.route(callback_path)
def signin_oidc():
CALLBACK_ENDPOINT_CALLED.set()
client = app.config["client"]
query_params = request.args.to_dict()
if verbose >= 3:
print(f"Parameters returned to callback after OIDC auth '{query_params}'")
auth_response = client.finalize(query_params)
if verbose >= 3:
print(
f"Final authentication response after exchange of code for token '"
f"{auth_response}'"
)
response = Response(status=200, headers={"Content-type": "text/plain"})
@response.call_on_close
def trigger_evaluation():
# trigger auth response evaluation after the flask response has been sent
if not auth_response.get("id_token"):
evaluator.finish_test(
"ID token was not found in OP's response.", Status.CRITICAL
)
if not auth_response.get("userinfo"):
evaluator.finish_test(
"User info was not found in OP's response.", Status.CRITICAL
)
evaluator.finish_test("Authentication was successful.", Status.OK)
return response
return app
def start_flask_app(
client: StandAloneClient, evaluator: Evaluator, redirect_uri: str, verbose: int
):
parsed_redirect_uri = urlparse(redirect_uri)
app = get_flask_app(client, evaluator, parsed_redirect_uri.path, verbose)
app.run(host=parsed_redirect_uri.hostname, port=parsed_redirect_uri.port)
def get_query_params(url_path: str) -> Dict[Any, Any]:
query_params = urllib.parse.urlparse(url_path).query
query_params_dict = urllib.parse.parse_qs(query_params)
query_params_dict_parsed = {key: val[0] for key, val in query_params_dict.items()}
return query_params_dict_parsed
# TODO possibly extract these methods shared with check_saml or use reflector backend
# to avoid real logging in
# https://github.com/susam/mintotp/blob/master/mintotp.py
def hotp(key, counter, digits=6, digest="sha1"):
key = base64.b32decode(key.upper() + "=" * ((8 - len(key)) % 8))
counter = struct.pack(">Q", counter)
mac = hmac.new(key, counter, digest).digest()
offset = mac[-1] & 0x0F
endoffset = offset + 4
binary = struct.unpack(">L", mac[offset:endoffset])[0] & 0x7FFFFFFF
return str(binary)[-digits:].zfill(digits)
def totp(key, time_step=30, digits=6, digest="sha1"):
return hotp(key, int(time.time() / time_step), digits, digest)
class FormParser(HTMLParser):
form_action = None
form_data = {}
_form_in_progress = False
def __init__(self, *args, **kwargs):
self.form_action = None
self.form_data = {}
self._form_in_progress = False
super(FormParser, self).__init__(*args, **kwargs)
def handle_starttag(self, tag, attrs):
if tag.lower() == "form":
self._form_in_progress = True
for name, value in attrs:
if name.lower() == "action" and self.form_action is None:
self.form_action = value
elif tag.lower() == "input":
input_name = None
input_value = None
for name, value in attrs:
if name.lower() == "name":
input_name = value
elif name.lower() == "value":
input_value = value
if input_name and input_value:
self.form_data[input_name] = input_value
def handle_endtag(self, tag):
if tag.lower() == "form":
self._form_in_progress = False
def parse_form(html):
parser = FormParser()
parser.feed(html)
return parser.form_action, parser.form_data
def get_args():
"""
Supports the command-line arguments listed below.
"""
parser = argparse.ArgumentParser(description="SAML authentication check")
parser._optionals.title = "Options"
parser.add_argument(
"--username",
"-u",
required=True,
help="username for IdP",
)
parser.add_argument(
"--password",
"-p",
required=True,
help="password for IdP",
)
parser.add_argument(
"--redirect-uri",
"-r",
required=True,
help="URI where OIDC callback will be redirected",
)
parser.add_argument(
"--client-id",
"-cid",
required=True,
help="ID of OIDC client registered with OP",
)
parser.add_argument(
"--client-secret",
"-cs",
required=False,
help="secret of OIDC client registered with OP",
)
parser.add_argument(
"--issuer",
"-i",
required=True,
help="Issuer used for OIDC auth",
)
parser.add_argument(
"--scopes",
"-s",
nargs="+",
help="List of scopes client will ask to access",
default=["openid", "email", "profile"],
)
parser.add_argument(
"--username-field",
help="name of the username field on the login page",
default="username",
)
parser.add_argument(
"--password-field",
help="name of the password field on the login page",
default="password",
)
parser.add_argument(
"--totp",
help="secret key (seed) for TOTP in Base32 encoding",
default="ZYTYYE5FOAGW5ML7LRWUL4WTZLNJAMZS",
)
parser.add_argument(
"--verbose",
"-v",
action="count",
default=0,
help="verbose mode (for debugging)",
)
parser.add_argument(
"--remember-me",
action="store_true",
help="check the Remember me option when logging in",
)
return parser.parse_args()
class OIDCChecker:
def curl(self, url, data=None):
if self.args.verbose >= 1:
print("curl: {}".format(url))
req = urllib.request.Request(
url=url,
data=urllib.parse.urlencode(data).encode("ascii") if data else None,
)
if self.args.verbose >= 1:
print("")
response = None
try:
response = self.opener.open(req)
return response
except urllib.error.URLError as e:
if self.args.verbose >= 1:
print(e)
if self.args.verbose >= 2:
print(response)
self.evaluator.finish_test(e.reason, Status.CRITICAL)
def js_form_redirect(self, html, url, force=False):
if (
force
or "document.forms[0].submit()" in html
or "javascript:DoSubmit();" in html
):
form_action, form_data = parse_form(html)
return self.send_form(url, form_action, form_data)
return None, None
def initial_request(self, url):
response = self.curl(url)
response_html = response.read().decode("utf-8")
response_url = response.url
if not self.is_using_pkce:
response_html, response_url = self.js_form_redirect(
response_html, response.url
)
return response_html, response_url
def send_form(self, url, action, data):
target_url = urllib.parse.urljoin(url, action)
response = self.curl(target_url, data)
return response.read().decode("utf-8"), response.url
def send_credentials(self, login_form_url, login_form_action, login_form_data):
login_form_data[self.args.username_field] = self.args.username
login_form_data[self.args.password_field] = self.args.password
if self.args.remember_me:
login_form_data["remember_me"] = "Yes"
response_html, response_url = self.send_form(
login_form_url, login_form_action, login_form_data
)
if self.args.verbose >= 1:
print(response_url)
if self.args.verbose >= 3:
print(response_html)
# MFA
if "totp" in response_html.lower() or "privacyidea" in response_html.lower():
if self.args.verbose >= 1:
print("MFA is required")
totp_form_action, totp_form_data = parse_form(response_html)
totp_code = totp(self.args.totp)
totp_form_data["code"] = totp_code
totp_form_data["otp"] = totp_code
response_html, response_url = self.send_form(
response_url, totp_form_action, totp_form_data
)
if self.args.verbose >= 1:
print(response_url)
if "TOTP" in response_html or "privacyIDEA" in response_html:
if self.args.verbose >= 2:
print(response_html)
self.evaluator.finish_test("TOTP MFA failed", Status.CRITICAL)
if self.args.verbose >= 3:
print(response_html)
if "consent" in response_html:
self.evaluator.finish_test("Consent is required", Status.UNKNOWN)
elif "Wrong UČO or password" in response_html:
self.evaluator.finish_test(
"Login was not successful, invalid username or password",
Status.CRITICAL,
)
elif "Unhandled exception" in response_html:
self.evaluator.finish_test(
"Login was not successful, unhandled exception occured", Status.CRITICAL
)
elif "SAMLResponse" not in response_html:
self.evaluator.finish_test(
"Login was not successful, unknown error", Status.CRITICAL
)
form_action, form_data = parse_form(response_html)
if "SAMLResponse" not in form_data:
self.evaluator.finish_test(
"Login was not successful, unknown error", Status.CRITICAL
)
saml_response = base64.b64decode(form_data["SAMLResponse"]).decode("utf-8")
if (
'<samlp:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success"/>'
not in saml_response
):
self.evaluator.finish_test(
"Login was not successful, non-success response", Status.CRITICAL
)
return self.js_form_redirect(response_html, response_url, True)
def main(self):
"""
CMD Line tool
"""
self.evaluator.start_test()
# 1. start the Flask endpoint which handles OIDC callback
thread = threading.Thread(
target=start_flask_app,
args=[
self.client,
self.evaluator,
self.args.redirect_uri,
self.args.verbose,
],
)
thread.start()
# 2. initiate auth process
req_args = {"redirect_uri": self.args.redirect_uri}
login_url = self.client.init_authorization(req_args=req_args)
if self.args.verbose >= 3:
print(f"OIDC initiating url: '{login_url}'")
# 3. further proceed with auth
login_form_html, login_form_url = self.initial_request(login_url)
if self.args.verbose >= 3:
print(f"Login form url: '{login_form_url}'")
print(f"Login form html: '{login_form_html}'")
# 4. log in and wait for the response to be processed by Flask where exchange
# of code for token happens
# correct redirect ends the test in Flask endpoint
login_form_action, login_form_data = parse_form(login_form_html)
html, response_url = self.send_credentials(
login_form_url, login_form_action, login_form_data
)
if self.args.verbose >= 3:
print(html)
# This point should not be reached if callback endpoint was called
if not CALLBACK_ENDPOINT_CALLED.is_set():
self.evaluator.finish_test(
"Callback endpoint was not called.", Status.CRITICAL
)
def get_issuer_info(self, issuer):
well_known_endpoint = f"{issuer}/.well-known/openid-configuration"
urllib.request.Request(well_known_endpoint)
response = self.curl(well_known_endpoint)
return json.loads(response.read().decode("utf-8"))
def get_pkce_client_cfg(
self, client_id: str, issuer: str, scopes: List[str]
) -> Dict[str, Any]:
issuer_info = self.get_issuer_info(issuer)
return {
"provider_info": {
"issuer": issuer,
"jwks_uri": issuer_info.get("jwks_uri"),
"userinfo_endpoint": issuer_info.get("userinfo_endpoint"),
"token_endpoint": issuer_info.get("token_endpoint"),
"authorization_endpoint": issuer_info.get("authorization_endpoint"),
},
"add_ons": {
"pkce": {
"function": "idpyoidc.client.oauth2.add_on.pkce.add_support",
"kwargs": {
"code_challenge_length": 64,
"code_challenge_method": "S256",
},
},
},
"client_id": client_id,
"scopes_supported": scopes,
"client_type": "oidc",
}
def get_non_pkce_client_cfg(
self, client_id: str, client_secret: str, issuer: str, scopes: List[str]
) -> Dict[str, Any]:
return {
"provider_info": {
"issuer": issuer,
},
"client_id": client_id,
"client_secret": client_secret,
"scopes_supported": scopes,
"client_type": "oidc",
}
def get_registered_client(
self, client_id: str, client_secret: str, issuer: str, scopes: List[str]
) -> StandAloneClient:
if self.is_using_pkce:
client_cfg = self.get_pkce_client_cfg(client_id, issuer, scopes)
else:
client_cfg = self.get_non_pkce_client_cfg(
client_id, client_secret, issuer, scopes
)
client = StandAloneClient(config=client_cfg)
client.do_provider_info() # get provider info based on issuer in client's
# config
client.do_client_registration() # client is configured statically if
# client_id is provided in the config
return client
def __init__(self, args):
self.args = args
self.cookiejar = http.cookiejar.CookieJar()
self.opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(self.cookiejar),
)
self.is_using_pkce = args.client_secret is None
self.client = self.get_registered_client(
args.client_id, args.client_secret, args.issuer, args.scopes
)
self.evaluator = Evaluator()
def main():
checker = OIDCChecker(get_args())
checker.main()
if __name__ == "__main__":
main()
......@@ -38,6 +38,7 @@ setuptools.setup(
"check_ldap_syncrepl=check_syncrepl_extended.check_syncrepl_extended:main",
"check_mongodb=perun.proxy.utils.nagios.check_mongodb:main",
"check_nginx=check_nginx_status.check_nginx_status:main",
"check_oidc_login=perun.proxy.utils.nagios.check_oidc_login:main",
"check_rpc_status=perun.proxy.utils.nagios.check_rpc_status:main",
"check_saml=perun.proxy.utils.nagios.check_saml:main",
"check_user_logins=perun.proxy.utils.nagios.check_user_logins:main",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment