diff --git a/README.md b/README.md index 4c0b221b47a1c778895dac137de9603ef1384f87..2cb552107f7237fdecc80895adb5aab9ac07fdb5 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,6 @@ # ProxyIdP scripts -All nagios scripts are located under `nagios` directory. - -## List scripts +## Scripts ### separate_ssp_script.py @@ -43,11 +41,40 @@ All nagios scripts are located under `nagios` directory. - Params: - 1... - machines to run the script on in the form of user@adress, the user needs root privileges to execute the script +## Nagios probes + +All nagios scripts are located under `nagios` directory. + ### check_mongodb.py - nagios monitoring probe for mongodb - connect, connections, replication_lag, replset_state monitoring options are tested (some possible options may not work since there are constructs which are not supported by latest mongodb versions) -- For usage run: +- for usage run: `python3 check_mongodb.py --help` + +### check_saml.py + +- SAML authentication check + +- for usage run: + `python3 check_saml.py --help` + +### check_user_logins.py + +- check users which login in repeatedly more often than a specified threshold (logins per seconds) + +- for usage run: + `python3 check_user_logins.py --help` + +- example: + +``` +python3 check_user_logins.py + -p /var/log/proxyaai/simplesamlphp/simplesamlphp/simplesamlphp.log + -l 5 + -s 60 + -r "^(?P<datetime>.{20}).*audit-login.* (?P<userid>[0-9]+)@muni\.cz$" + -d "%b %d %Y %H:%M:%S" +``` diff --git a/perun/proxy/utils/nagios/check_saml.py b/perun/proxy/utils/nagios/check_saml.py new file mode 100755 index 0000000000000000000000000000000000000000..fc4601d745ae48ec94e4ec154780594c64ea4958 --- /dev/null +++ b/perun/proxy/utils/nagios/check_saml.py @@ -0,0 +1,543 @@ +#!/usr/bin/env python3 + +""" +make a full roundtrip test for SAML based SSO +""" + +import base64 +import hmac +import os +import struct +import argparse +import sys +import time +import urllib.request +import urllib.error +import urllib.parse +import ssl +import tempfile +import re +from html.parser import HTMLParser +import http.cookiejar + +STATUS = {"OK": 0, "WARNING": 1, "CRITICAL": 2, "UNKNOWN": 3} + +DEFAULT_HEADERS = { + "Accept": "text/html,application/xhtml+xml," + + "application/xml;q=0.9,image/webp,*/*;q=0.8", +} + +CACHE_REGEX = ".*_(OK|WARNING|CRITICAL|UNKNOWN)_.*" + + +# https://github.com/susam/mintotp/blob/master/mintotp.py +def hotp(key, counter, digits=6, digest="sha1"): + key = base64.b32decode(key.upper() + "=" * ((8 - len(key)) % 8)) + counter = struct.pack(">Q", counter) + mac = hmac.new(key, counter, digest).digest() + offset = mac[-1] & 0x0F + endoffset = offset + 4 + binary = struct.unpack(">L", mac[offset:endoffset])[0] & 0x7FFFFFFF + return str(binary)[-digits:].zfill(digits) + + +def totp(key, time_step=30, digits=6, digest="sha1"): + return hotp(key, int(time.time() / time_step), digits, digest) + + +class FormParser(HTMLParser): + form_action = None + form_data = {} + _form_in_progress = False + + def __init__(self, *args, **kwargs): + self.form_action = None + self.form_data = {} + self._form_in_progress = False + super(FormParser, self).__init__(*args, **kwargs) + + def handle_starttag(self, tag, attrs): + if tag.lower() == "form": + self._form_in_progress = True + for name, value in attrs: + if name.lower() == "action" and self.form_action is None: + self.form_action = value + elif tag.lower() == "input": + input_name = None + input_value = None + for name, value in attrs: + if name.lower() == "name": + input_name = value + elif name.lower() == "value": + input_value = value + if input_name and input_value: + self.form_data[input_name] = input_value + + def handle_endtag(self, tag): + if tag.lower() == "form": + self._form_in_progress = False + + +def parse_form(html): + parser = FormParser() + parser.feed(html) + return parser.form_action, parser.form_data + + +def get_host_from_url(url): + return urllib.parse.urlparse(url).hostname + + +def get_args(): + """ + Supports the command-line arguments listed below. + """ + parser = argparse.ArgumentParser(description="SAML authentication check") + parser._optionals.title = "Options" + parser.add_argument( + "--username", + required=True, + help="username for IdP", + ) + parser.add_argument( + "--password", + required=True, + help="password for IdP", + ) + parser.add_argument( + "--url", + help="URL that starts authentication", + default="https://inet.muni.cz/sys/servertest", + ) + parser.add_argument( + "--string", + help="string to expect after successful authentication", + default="OSCIS", + ) + parser.add_argument("--idp-host", help="hostname of IdP", default="id.muni.cz") + parser.add_argument( + "--hosts", + nargs="*", + default=[], + help="space separated list of hostname:ip or hostname:hostname pairs " + + "for replacing in all URLs", + ) + """ + parser.add_argument( + "--other-urls", + nargs="*", + default=[], + help="list of more services for testing single sign-on", + ) + parser.add_argument( + "--other-urls-final", + nargs="*", + default=[], + help="list of final URLs of the other services for testing single sign-on", + ) + """ + parser.add_argument( + "--warn-time", + type=int, + help="warning threshold in seconds", + default=5, + ) + parser.add_argument( + "--critical-time", + type=int, + help="critical threshold in seconds", + default=15, + ) + parser.add_argument( + "--insecure", + action="store_true", + help="ignore server name in SSL/TLS certificates", + ) + parser.add_argument( + "--username-field", + help="name of the username field on the login page", + default="username", + ) + parser.add_argument( + "--password-field", + help="name of the password field on the login page", + default="password", + ) + parser.add_argument( + "--totp", + help="secret key (seed) for TOTP in Base32 encoding", + default="ZYTYYE5FOAGW5ML7LRWUL4WTZLNJAMZS", + ) + parser.add_argument( + "--verbose", + "-v", + action="count", + default=0, + help="verbose mode (for debugging)", + ) + parser.add_argument( + "--remember-me", + action="store_true", + help="check the Remember me option when logging in", + ) + parser.add_argument( + "--skip-security-image-check", + action="store_true", + help="skip security image check when remember me is used", + ) + parser.add_argument( + "--cache-timeout", + type=int, + help="specify the time after which the cache will be wiped", + default=0, + ) + parser.add_argument( + "--cache-file", + default="check_saml_cache", + help="name of the file used for the cache", + ) + + return parser.parse_args() + + +def replace_host_in_url(hosts, url, headers): + host = get_host_from_url(url) + headers["Host"] = host + if host in hosts: + parsed = urllib.parse.urlparse(url) + url = parsed._replace(netloc=hosts[host]).geturl() + headers["Host"] = host + return url, headers + + +class ResolvingHTTPRedirectHandler(urllib.request.HTTPRedirectHandler): + def __init__(self, hosts, verbose=0): + self.hosts = hosts + self.verbose = verbose + + def redirect_request(self, req, fp, code, msg, headers, newurl): + """Check whether the host should be replaced with an IP""" + referrer = req.headers["Referer"] if "Referer" in req.headers else None + newurl, newheaders = replace_host_in_url(self.hosts, newurl, DEFAULT_HEADERS) + req.headers = newheaders + if referrer: + req.headers["Referer"] = referrer + if self.verbose >= 1: + print("Redirecting to {}".format(newurl)) + return super().redirect_request(req, fp, code, msg, headers, newurl) + + def http_error_308(self, req, fp, code, msg, hdrs): + return self.http_error_301(req, fp, 301, msg, hdrs) + + +class SAMLChecker: + def curl(self, url, data=None, referrer=None): + url, headers = replace_host_in_url(self.hosts, url, DEFAULT_HEADERS) + + if referrer: + headers["Referer"], _ = replace_host_in_url( + dict(map(reversed, self.hosts.items())), + urllib.parse.urlparse(referrer) + ._replace(fragment="") + ._replace(query="") + .geturl(), + {}, + ) + if self.args.verbose >= 1: + print("curl: {}".format(url)) + if "Referer" in headers: + print("Referrer: {}".format(headers["Referer"])) + req = urllib.request.Request( + url=url, + data=urllib.parse.urlencode(data).encode("ascii") if data else None, + headers=headers, + ) + if self.args.verbose >= 1: + print("") + response = None + try: + response = self.opener.open(req) + return response + except urllib.error.URLError as e: + if self.args.verbose >= 1: + print(e) + if self.args.verbose >= 2: + print(response) + self.finish(e.reason, "CRITICAL") + + def js_form_redirect(self, html, url, force=False): + if ( + force + or "document.forms[0].submit()" in html + or "javascript:DoSubmit();" in html + ): + form_action, form_data = parse_form(html) + return self.send_form(url, form_action, form_data) + return None, None + + def initial_request(self, url): + response = self.curl(url) + response_html = response.read().decode("utf-8") + response_url = response.url + if get_host_from_url(response_url) != ( + self.hosts[self.args.idp_host] + if self.args.idp_host in self.hosts + else self.args.idp_host + ): + response_html, response_url = self.js_form_redirect( + response_html, response_url + ) + if response_html is None: + self.finish( + "Initial URL does not redirect to IdP and JS redirect not detected", + "CRITICAL", + ) + return response_html, response_url + + def send_form(self, url, action, data): + target_url = urllib.parse.urljoin(url, action) + response = self.curl(target_url, data, url) + return response.read().decode("utf-8"), response.url + + def send_credentials(self, login_form_url, login_form_action, login_form_data): + login_form_data[self.args.username_field] = self.args.username + login_form_data[self.args.password_field] = self.args.password + if self.args.remember_me: + login_form_data["remember_me"] = "Yes" + response_html, response_url = self.send_form( + login_form_url, login_form_action, login_form_data + ) + + if self.args.verbose >= 1: + print(response_url) + if self.args.verbose >= 3: + print(response_html) + + # MFA + if "TOTP" in response_html or "privacyIDEA" in response_html: + if self.args.verbose >= 1: + print("MFA is required") + totp_form_action, totp_form_data = parse_form(response_html) + totp_code = totp(self.args.totp) + totp_form_data["code"] = totp_code + totp_form_data["otp"] = totp_code + response_html, response_url = self.send_form( + response_url, totp_form_action, totp_form_data + ) + if self.args.verbose >= 1: + print(response_url) + if "TOTP" in response_html or "privacyIDEA" in response_html: + if self.args.verbose >= 2: + print(response_html) + self.finish("TOTP MFA failed", "CRITICAL") + if self.args.verbose >= 3: + print(response_html) + + if "consent" in response_html: + self.finish("Consent is required", "UNKNOWN") + elif "Wrong UČO or password" in response_html: + self.finish( + "Login was not successful, invalid username or password", "CRITICAL" + ) + elif "Unhandled exception" in response_html: + self.finish( + "Login was not successful, unhandled exception occured", "CRITICAL" + ) + elif "SAMLResponse" not in response_html: + self.finish("Login was not successful, unknown error", "CRITICAL") + + form_action, form_data = parse_form(response_html) + if "SAMLResponse" not in form_data: + self.finish("Login was not successful, unknown error", "CRITICAL") + saml_response = base64.b64decode(form_data["SAMLResponse"]).decode("utf-8") + + if ( + '<samlp:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success"/>' + not in saml_response + ): + self.finish("Login was not successful, non-success response", "CRITICAL") + + return self.js_form_redirect(response_html, response_url, True) + + def js_form_redirect_all(self, html, url): + for _ in range(10): + try_html, try_url = self.js_form_redirect(html, url) + if try_html is not None and try_url is not None: + html = try_html + url = try_url + else: + return (html, url) + return (html, url) + + def finish( + self, + message, + status="OK", + cache_time=time.time(), + from_cache=False, + auth_time=None, + ): + if auth_time is not None and isinstance(auth_time, float): + message = "{}|authtime={:.2f};{};{};;".format( + message, auth_time, self.args.warn_time, self.args.critical_time + ) + if auth_time is None and from_cache is False: + message = "{}|authtime=;{};{};;".format( + message, self.args.warn_time, self.args.critical_time + ) + if self.args.cache_timeout > 0: + try: + file_path = tempfile.gettempdir() + "/" + args.cache_file + f = open(file_path, "w") + f.write("{}_{}_{}".format(cache_time, status, message)) + f.close() + except (OSError, ValueError): + pass + if from_cache: + message = "Cached: " + message + print("{} - {}".format(status, message)) + sys.exit(STATUS[status]) + + def check_cache(self): + try: + tempdir = tempfile.gettempdir() + file_path = tempdir + "/" + args.cache_file + if os.path.isfile(file_path): + with open(file_path, "r") as f: + res_b = f.read() + if not re.match(CACHE_REGEX, res_b): + raise ValueError("Bad cache content!") + res = res_b.split("_") + cached_time = float(res[0]) + status = res[1] + message = res[2] + actual_time = time.time() + time_diff = actual_time - float(cached_time) + if time_diff < args.cache_timeout: + self.finish( + message=message, + status=status, + cache_time=cached_time, + from_cache=True, + ) + except (OSError, ValueError): + pass + + def main(self): + """ + CMD Line tool + """ + + if self.args.cache_timeout > 0: + self.check_cache() + + start_time = time.time() + + # 1. start authentication + login_form_html, login_form_url = self.initial_request(self.args.url) + if self.args.verbose >= 3: + print(login_form_html) + + # 2. log in and post response back + login_form_action, login_form_data = parse_form(login_form_html) + html, response_url = self.send_credentials( + login_form_url, login_form_action, login_form_data + ) + + # 3. follow all JS redirects + html, response_url = self.js_form_redirect_all(html, response_url) + + if self.args.string not in html: + if self.args.verbose >= 2: + print(html) + self.finish( + "Missing the testing string {} in the response.".format( + self.args.string + ), + "CRITICAL", + ) + + if self.args.verbose >= 3: + print(html) + + elapsed_seconds = time.time() - start_time + status = "OK" + if elapsed_seconds >= self.args.critical_time: + status = "CRITICAL" + if elapsed_seconds >= self.args.warn_time: + status = "WARNING" + + # test whether security image is shown + if self.args.remember_me: + # logout from SP and IdP but keep username cookie + self.cookiejar.clear_session_cookies() + self.cookiejar.clear(get_host_from_url(self.args.url)) + self.cookiejar.clear( + ( + self.hosts[self.args.idp_host] + if self.args.idp_host in self.hosts + else self.args.idp_host + ), + "/", + "SimpleSAMLAuthToken", + ) + self.cookiejar.clear( + ( + self.hosts[self.args.idp_host] + if self.args.idp_host in self.hosts + else self.args.idp_host + ), + "/", + "SimpleSAMLSessionID", + ) + login_form_html, login_form_url = self.initial_request(self.args.url) + if self.args.verbose >= 3: + print(login_form_html) + if ( + not self.args.skip_security_image_check + and "class='antiphishing-img'" not in login_form_html + ): + self.finish( + "Missing security image on the login page.", + "CRITICAL", + ) + if self.args.username not in login_form_html: + self.finish( + "Missing remembered username on the login page.", + "WARNING", + ) + + self.finish( + "Authentication took {:.2f} seconds".format(elapsed_seconds), + status, + auth_time=elapsed_seconds, + ) + + def __init__(self, args): + self.args = args + self.hosts = { + host.split(":", 1)[0]: host.split(":", 1)[1] + for host in ( + self.args.hosts[0].strip("\"'").split(" ") + if self.args.hosts and " " in self.args.hosts[0] + else self.args.hosts + ) + } + self.cookiejar = http.cookiejar.CookieJar() + if self.args.insecure: + self.opener = urllib.request.build_opener( + urllib.request.HTTPCookieProcessor(self.cookiejar), + ResolvingHTTPRedirectHandler(self.hosts, self.args.verbose), + urllib.request.HTTPSHandler(context=ssl.SSLContext()), + ) + else: + self.opener = urllib.request.build_opener( + urllib.request.HTTPCookieProcessor(self.cookiejar), + ResolvingHTTPRedirectHandler(self.hosts, self.args.verbose), + ) + + +if __name__ == "__main__": + args = get_args() + checker = SAMLChecker(args) + checker.main() diff --git a/perun/proxy/utils/nagios/check_user_logins.py b/perun/proxy/utils/nagios/check_user_logins.py new file mode 100644 index 0000000000000000000000000000000000000000..29867e62eb0053d7bc7b41e0059cf5dbaf77c929 --- /dev/null +++ b/perun/proxy/utils/nagios/check_user_logins.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 + +import datetime +import getopt +import sys +import re + +# nagios return codes +UNKNOWN = -1 +OK = 0 +WARNING = 1 +CRITICAL = 2 +usage = ( + "usage: ./check_user_logins.py\n" + " -p/--path <string> path to log file\n" + " -r/--regex <string> parsing regex of logfile, must include userid group and datetime group\n" + " -d/--datetime_format <string> datetime format of log file\n" + " -l/--logins <integer> maximal number of logins\n" + " -s/--seconds <integer> time interval for logins check\n" +) + + +def parse_log_data(log_path, regex, date_format): + file = open(log_path, "r", encoding="utf-8") + lines = file.readlines() + user_dict = {} + for line in lines: + result = re.match(regex, line) + if result: + user_id = result.group("userid") + if user_id not in user_dict.keys(): + user_dict[user_id] = [ + datetime.datetime.strptime( + result.group("datetime"), date_format + ).timestamp() + ] + else: + user_dict[user_id].append( + datetime.datetime.strptime( + result.group("datetime"), date_format + ).timestamp() + ) + return user_dict + + +def check_log_data(user_dict, limits, seconds): + warning = False + for user, date_times in user_dict.items(): + final_count = 0 + count = 0 + date_times.sort() + for check_date_time in range(len(date_times)): + for i in range(len(date_times)): + if check_date_time <= i: + if date_times[i] - date_times[check_date_time] <= seconds: + count += 1 + else: + break + if final_count < count: + final_count = count + count = 0 + + if final_count > limits: + print("WARNING - User: {} logins count: {}".format(user, final_count)) + warning = True + if warning: + sys.exit(WARNING) + + +# define command lnie options and validate data. Show usage or provide info on required options +def command_line_validate(argv): + try: + opts, args = getopt.getopt( + argv, + "p:r:d:l:s:", + ["path=", "regex=", "datetime_format=" "logins=", "seconds="], + ) + except getopt.GetoptError: + print(usage) + try: + for opt, arg in opts: + if opt in ("-l", "--logins"): + try: + logins = int(arg) + if logins <= 0: + sys.exit(CRITICAL) + except: + print("***logins value must be an integer higher than 0 ***") + sys.exit(CRITICAL) + elif opt in ("-s", "--seconds"): + try: + seconds = int(arg) + if seconds <= 0: + sys.exit(CRITICAL) + except: + print("***seconds value must be an integer higher than 0 ***") + sys.exit(CRITICAL) + elif opt in ("-p", "--path"): + path = arg + elif opt in ("-r", "--regex"): + regex = arg + elif opt in ("-d", "--datetime_format"): + datetime_format = arg + else: + print(usage) + try: + isinstance(logins, int) + except: + print("***logins is required***") + print(usage) + sys.exit(CRITICAL) + try: + isinstance(seconds, int) + except: + print("***seconds is required***") + print(usage) + sys.exit(CRITICAL) + try: + isinstance(path, str) + except: + print("***path to log file is required***") + print(usage) + sys.exit(CRITICAL) + try: + isinstance(regex, str) + except: + print("***parsing regex is required***") + print(usage) + sys.exit(CRITICAL) + try: + isinstance(datetime_format, str) + except: + print("***parsing datetime format is required***") + print(usage) + sys.exit(CRITICAL) + except: + sys.exit(CRITICAL) + return path, regex, datetime_format, logins, seconds + + +def main(): + argv = sys.argv[1:] + path, regex, datetime_format, logins, seconds = command_line_validate(argv) + user_dict = parse_log_data(path, regex, datetime_format) + check_log_data(user_dict, logins, seconds) + print("OK", logins, seconds) + return 0 + + +if __name__ == "__main__": + main()