From aa762141e6fa1000f016c3adee33e66e62de7086 Mon Sep 17 00:00:00 2001 From: peterbolha <xbolha@fi.muni.cz> Date: Wed, 10 Jan 2024 11:19:23 +0100 Subject: [PATCH] feat: sync script for privacyidea tokens to perun --- README.md | 21 +++ perun/proxy/utils/sync_usable_token_types.py | 135 +++++++++++++++++++ setup.py | 3 + 3 files changed, 159 insertions(+) create mode 100644 perun/proxy/utils/sync_usable_token_types.py diff --git a/README.md b/README.md index d219faf..5c904d7 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,27 @@ run_probes - 1... - machines to run the script on in the form of user@adress, the user needs root privileges to execute the script +### sync_usable_token_types.py + +Collects information about the usable token types of each privacyIDEA user and sends it +to Perun. Each user with usable tokens in privacyIDEA is assigned a list of their types, for example: `['backupcode', 'totp']`. A token is considered usable when it is **active** and it is not **locked** or +**revoked** and its rollout state allows logging in. + +For more usage instructions, run: + +```sh +sync_usable_token_types --help +``` + +Example: + +```sh +python3 sync_usable_token_types.py + --mfa-active-tokens-attr-name "attr_name" + --perun-user-id-regex "\d+" + --perun-connector-config-path "/path/to/file" +``` + ## Nagios probes All nagios scripts are located under `nagios` directory. diff --git a/perun/proxy/utils/sync_usable_token_types.py b/perun/proxy/utils/sync_usable_token_types.py new file mode 100644 index 0000000..86037cc --- /dev/null +++ b/perun/proxy/utils/sync_usable_token_types.py @@ -0,0 +1,135 @@ +import argparse +import os +import re +from typing import List, Tuple, Optional, Pattern + +import yaml +from perun.connector import AdaptersManager +from privacyidea.models import db, TokenOwner, Token + +# supplied as default values for arguments +PERUN_USER_ID_REGEX = r"perunUserId=(?P<perun_user_id>\d+)," +MFA_ACTIVE_TOKENS_ATTR_NAME = "urn:perun:user:attribute-def:def:mfaTokenTypes:mu" +PERUN_CONNECTOR_CONFIG_PATH = "/etc/perun-connector.yaml" + + +class ROLLOUTSTATE(object): + CLIENTWAIT = "clientwait" + PENDING = "pending" + VERIFYPENDING = "verify" + ENROLLED = "enrolled" + BROKEN = "broken" + FAILED = "failed" + DENIED = "denied" + + +UNUSABLE_ROLLOUT_STATES = [ + ROLLOUTSTATE.CLIENTWAIT, + ROLLOUTSTATE.PENDING, + ROLLOUTSTATE.VERIFYPENDING, + ROLLOUTSTATE.BROKEN, + ROLLOUTSTATE.FAILED, + ROLLOUTSTATE.DENIED, +] + + +def load_attrs_manager_config(config_filepath): + if os.path.exists(config_filepath): + with open(config_filepath, "r") as f: + config = yaml.safe_load(f) + return config + else: + raise FileNotFoundError( + f"Attempted to load attributes manager config from '{config_filepath}' " + f"but the file was not found." + ) + + +def get_adapters_manager(config_path: str) -> AdaptersManager: + cfg = load_attrs_manager_config(config_path) + + if not cfg: + raise ValueError("Was not able to load the attributes manager config.") + + adapters_manager = AdaptersManager( + cfg["attributes_manager_config"], cfg["attributes_map"] + ) + + return adapters_manager + + +def get_args(): + """ + Supports the command-line arguments listed below. + """ + parser = argparse.ArgumentParser(description="SAML authentication check") + parser._optionals.title = "Options" + parser.add_argument( + "--mfa-active-tokens-attr-name", + "-a", + default=MFA_ACTIVE_TOKENS_ATTR_NAME, + help="name of Perun attribute containing user's active MFA tokens", + ) + parser.add_argument( + "--perun-user-id-regex", + "-r", + default=PERUN_USER_ID_REGEX, + help="regex for parsing Perun user ID from privacyIDEA user ID", + ) + parser.add_argument( + "--perun-connector-config-path", + "-c", + default=PERUN_CONNECTOR_CONFIG_PATH, + help="path to config for Perun Connector", + ) + + args = parser.parse_args() + return args + + +def get_user_token_types() -> List[Tuple[str, str]]: + user_token_types = ( + db.session.query(TokenOwner.user_id, Token.tokentype) + .join(Token, Token.id == TokenOwner.token_id) + .filter(Token.active.is_(True)) + .filter(Token.locked.is_(False)) + .filter(Token.revoked.is_(False)) + .filter(Token.rollout_state.notin_(UNUSABLE_ROLLOUT_STATES)) + .distinct(TokenOwner.user_id, Token.tokentype) + .order_by(TokenOwner.user_id) + .all() + ) + return user_token_types + + +def parse_perun_user_id( + perun_user_id_regex: Pattern[str], privacyidea_user_id: str +) -> Optional[str]: + match = re.search(perun_user_id_regex, privacyidea_user_id) + if match: + return match.group("perun_user_id") + return None + + +def main(args): + perun_user_id_regex = re.compile(args.perun_user_id_regex) + mfa_active_tokens_attr_name = args.mfa_active_tokens_attr_name + adapters_manager = get_adapters_manager(args.perun_connector_config_path) + + user_token_types = get_user_token_types() + current_user_id = "" + current_user_token_types = [] + for privacyidea_user_id, token_type in user_token_types: + perun_user_id = parse_perun_user_id(perun_user_id_regex, privacyidea_user_id) + if perun_user_id and perun_user_id != current_user_id: + if current_user_id: + current_user_token_types.sort() + attr_to_set = {mfa_active_tokens_attr_name: current_user_token_types} + adapters_manager.set_user_attributes(int(perun_user_id), attr_to_set) + current_user_id = perun_user_id + current_user_token_types = [] + current_user_token_types.append(token_type) + + +if __name__ == "__main__": + main(get_args()) diff --git a/setup.py b/setup.py index 1730994..be8501f 100644 --- a/setup.py +++ b/setup.py @@ -17,6 +17,8 @@ setuptools.setup( "PyYAML>=5.4,<7.0", "check_nginx_status~=1.0", "pyotp~=2.9", + "perun.connector~=3.8", + "privacyidea~=3.9", ], extras_require={ "ldap": [ @@ -51,6 +53,7 @@ setuptools.setup( "run_version_script=perun.proxy.utils.run_version_script:main", "separate_oidc_logs=perun.proxy.utils.separate_oidc_logs:main", "separate_ssp_logs=perun.proxy.utils.separate_ssp_logs:main", + "sync_usable_token_types=perun.proxy.utils.sync_usable_token_types:main", ] }, ) -- GitLab