diff --git a/README.md b/README.md index 5610d1ffecebc02b12428a8336161bfb36afbee8..de59d49308e07de2de442d9a9f916ee05ef0925d 100644 --- a/README.md +++ b/README.md @@ -102,3 +102,14 @@ python3 check_user_logins.py ```sh python3 check_ldap.py --help ``` + +### check_privacyidea.py + +- check whether privacyidea is available +- use caching arguments for avoiding failure when one TOTP code is used two times + +- for usage run: + + ```sh + python3 check_privacyidea.py --help + ``` diff --git a/perun/proxy/utils/nagios/check_privacyidea.py b/perun/proxy/utils/nagios/check_privacyidea.py new file mode 100644 index 0000000000000000000000000000000000000000..29646171e6cf2f2c2ece6dfa787a4c57d245d3f8 --- /dev/null +++ b/perun/proxy/utils/nagios/check_privacyidea.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 + +import argparse +import json +import sys +import tempfile +import time + +import pyotp + +import requests + +# nagios return codes +OK = 0 +CRITICAL = 2 + + +def get_args(): + parser = argparse.ArgumentParser( + description="Privacyidea TOTP authentication check" + ) + parser.add_argument( + "--hostname", + required=True, + type=str, + help="Privacyidea server hostname", + ) + parser.add_argument( + "--username", + required=True, + type=str, + help="The loginname/username of the user to authenticate with.", + ) + parser.add_argument("--pin", type=str, help="TOTP pin", default="") + parser.add_argument( + "--realm", + type=str, + help="The realm of the user to authenticate with. " + "If the realm is omitted, the user is looked up in the default realm.", + ) + parser.add_argument( + "--serial-number", + type=str, + dest="serial_number", + help="The serial number of the token.", + ) + parser.add_argument( + "--totp", + type=str, + help="secret key (seed) for TOTP in Base32 encoding", + required=True, + ) + parser.add_argument( + "--timeout", + type=int, + help="timeout for authentication request", + default=10, + ) + parser.add_argument( + "--otp-only", + action="store_true", + dest="otp_only", + default=False, + help="If set, only the OTP value is verified. Only used with " + "the parameter --serial-number.", + ) + parser.add_argument( + "--cache-timeout", + type=int, + dest="cache_timeout", + help="specify the time after which the cache will be wiped", + default=30, + ) + parser.add_argument( + "--cache-file", + dest="cache_file", + default="check_privacyidea_cache.txt", + type=str, + help="name of the file used for the cache stored in /tmp", + ) + + return parser.parse_args() + + +def main(): + args = get_args() + + if args.cache_timeout > 0: + try: + tempdir = tempfile.gettempdir() + file_path = tempdir + "/" + args.cache_file + with open(file_path, "r") as f: + data = json.load(f) + cached_time = data.get("time", time.time()) + exit_code = data.get("exit_code") + message = data.get("message") + time_diff = time.time() - cached_time + if time_diff < args.cache_timeout: + print(f"{exit_code} check_privacyidea - Cached: {message}") + return exit_code + except (OSError, ValueError): + pass + + request_data = { + "user": args.username, + "type": "totp", + } + + if args.otp_only and not args.serial_number: + raise argparse.ArgumentTypeError( + args.otp_only, "--otp-only cannot be used without --serial-number." + ) + + otp = pyotp.TOTP(args.totp) + + if args.realm: + request_data["realm"] = args.realm + + if args.serial_number: + request_data["serial"] = args.serial_number + + if args.otp_only: + request_data["otponly"] = 1 + request_data["pass"] = otp.now() + else: + request_data["pass"] = args.pin + otp.now() + + try: + response = requests.post( + f"https://{args.hostname}/validate/check", + json=request_data, + timeout=args.timeout, + ) + if response.status_code == 200: + result = response.json().get("result") + if not result.get("status"): + exit_code = CRITICAL + result_msg = "Server has problem." + elif not result.get("value"): + exit_code = CRITICAL + result_msg = "Authentication failed." + else: + exit_code = OK + result_msg = "Authentication was successful." + else: + exit_code = CRITICAL + result_msg = f"Response status code was {response.status_code}." + except requests.Timeout: + result_msg = f"Request timed out in {args.timeout} seconds." + exit_code = CRITICAL + + if args.cache_timeout > 0: + file_path = tempfile.gettempdir() + "/" + args.cache_file + with open(file_path, "w") as f: + f.write( + json.dumps( + {"time": time.time(), "exit_code": exit_code, "message": result_msg} + ) + ) + print(f"{exit_code} check_privacyidea - {result_msg}") + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/setup.py b/setup.py index 7bf8c55d2929928e6de870dcef805bced0842ffa..76f96ca942d86789b4a1cdedf076701f23874bd8 100644 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ setuptools.setup( "PyYAML>=5.4,<7.0", "check_syncrepl_extended~=2020.13", "check_nginx_status~=1.0", + "pyotp~=2.9", ], extras_require={ "ldap": [ @@ -40,6 +41,7 @@ setuptools.setup( "check_php_syntax=perun.proxy.utils.nagios.check_php_syntax:main", "check_webserver_availability=" "perun.proxy.utils.nagios.webserver_availability:main", + "check_privacyidea=perun.proxy.utils.nagios.check_privacyidea:main", "metadata_expiration=perun.proxy.utils.metadata_expiration:main", "print_docker_versions=perun.proxy.utils.print_docker_versions:main", "run_version_script=perun.proxy.utils.run_version_script:main",