Skip to content
Snippets Groups Projects
Verified Commit aa762141 authored by Peter Bolha's avatar Peter Bolha :ok_hand_tone1:
Browse files

feat: sync script for privacyidea tokens to perun

parent 2dad7f66
No related branches found
No related tags found
1 merge request!49feat: sync script for privacyidea tokens to perun
Pipeline #375875 passed
...@@ -77,6 +77,27 @@ run_probes ...@@ -77,6 +77,27 @@ run_probes
- 1... - machines to run the script on in the form of user@adress, the user needs - 1... - machines to run the script on in the form of user@adress, the user needs
root privileges to execute the script root privileges to execute the script
### sync_usable_token_types.py
Collects information about the usable token types of each privacyIDEA user and sends it
to Perun. Each user with usable tokens in privacyIDEA is assigned a list of their types, for example: `['backupcode', 'totp']`. A token is considered usable when it is **active** and it is not **locked** or
**revoked** and its rollout state allows logging in.
For more usage instructions, run:
```sh
sync_usable_token_types --help
```
Example:
```sh
python3 sync_usable_token_types.py
--mfa-active-tokens-attr-name "attr_name"
--perun-user-id-regex "\d+"
--perun-connector-config-path "/path/to/file"
```
## Nagios probes ## Nagios probes
All nagios scripts are located under `nagios` directory. All nagios scripts are located under `nagios` directory.
......
import argparse
import os
import re
from typing import List, Tuple, Optional, Pattern
import yaml
from perun.connector import AdaptersManager
from privacyidea.models import db, TokenOwner, Token
# supplied as default values for arguments
PERUN_USER_ID_REGEX = r"perunUserId=(?P<perun_user_id>\d+),"
MFA_ACTIVE_TOKENS_ATTR_NAME = "urn:perun:user:attribute-def:def:mfaTokenTypes:mu"
PERUN_CONNECTOR_CONFIG_PATH = "/etc/perun-connector.yaml"
class ROLLOUTSTATE(object):
CLIENTWAIT = "clientwait"
PENDING = "pending"
VERIFYPENDING = "verify"
ENROLLED = "enrolled"
BROKEN = "broken"
FAILED = "failed"
DENIED = "denied"
UNUSABLE_ROLLOUT_STATES = [
ROLLOUTSTATE.CLIENTWAIT,
ROLLOUTSTATE.PENDING,
ROLLOUTSTATE.VERIFYPENDING,
ROLLOUTSTATE.BROKEN,
ROLLOUTSTATE.FAILED,
ROLLOUTSTATE.DENIED,
]
def load_attrs_manager_config(config_filepath):
if os.path.exists(config_filepath):
with open(config_filepath, "r") as f:
config = yaml.safe_load(f)
return config
else:
raise FileNotFoundError(
f"Attempted to load attributes manager config from '{config_filepath}' "
f"but the file was not found."
)
def get_adapters_manager(config_path: str) -> AdaptersManager:
cfg = load_attrs_manager_config(config_path)
if not cfg:
raise ValueError("Was not able to load the attributes manager config.")
adapters_manager = AdaptersManager(
cfg["attributes_manager_config"], cfg["attributes_map"]
)
return adapters_manager
def get_args():
"""
Supports the command-line arguments listed below.
"""
parser = argparse.ArgumentParser(description="SAML authentication check")
parser._optionals.title = "Options"
parser.add_argument(
"--mfa-active-tokens-attr-name",
"-a",
default=MFA_ACTIVE_TOKENS_ATTR_NAME,
help="name of Perun attribute containing user's active MFA tokens",
)
parser.add_argument(
"--perun-user-id-regex",
"-r",
default=PERUN_USER_ID_REGEX,
help="regex for parsing Perun user ID from privacyIDEA user ID",
)
parser.add_argument(
"--perun-connector-config-path",
"-c",
default=PERUN_CONNECTOR_CONFIG_PATH,
help="path to config for Perun Connector",
)
args = parser.parse_args()
return args
def get_user_token_types() -> List[Tuple[str, str]]:
user_token_types = (
db.session.query(TokenOwner.user_id, Token.tokentype)
.join(Token, Token.id == TokenOwner.token_id)
.filter(Token.active.is_(True))
.filter(Token.locked.is_(False))
.filter(Token.revoked.is_(False))
.filter(Token.rollout_state.notin_(UNUSABLE_ROLLOUT_STATES))
.distinct(TokenOwner.user_id, Token.tokentype)
.order_by(TokenOwner.user_id)
.all()
)
return user_token_types
def parse_perun_user_id(
perun_user_id_regex: Pattern[str], privacyidea_user_id: str
) -> Optional[str]:
match = re.search(perun_user_id_regex, privacyidea_user_id)
if match:
return match.group("perun_user_id")
return None
def main(args):
perun_user_id_regex = re.compile(args.perun_user_id_regex)
mfa_active_tokens_attr_name = args.mfa_active_tokens_attr_name
adapters_manager = get_adapters_manager(args.perun_connector_config_path)
user_token_types = get_user_token_types()
current_user_id = ""
current_user_token_types = []
for privacyidea_user_id, token_type in user_token_types:
perun_user_id = parse_perun_user_id(perun_user_id_regex, privacyidea_user_id)
if perun_user_id and perun_user_id != current_user_id:
if current_user_id:
current_user_token_types.sort()
attr_to_set = {mfa_active_tokens_attr_name: current_user_token_types}
adapters_manager.set_user_attributes(int(perun_user_id), attr_to_set)
current_user_id = perun_user_id
current_user_token_types = []
current_user_token_types.append(token_type)
if __name__ == "__main__":
main(get_args())
...@@ -17,6 +17,8 @@ setuptools.setup( ...@@ -17,6 +17,8 @@ setuptools.setup(
"PyYAML>=5.4,<7.0", "PyYAML>=5.4,<7.0",
"check_nginx_status~=1.0", "check_nginx_status~=1.0",
"pyotp~=2.9", "pyotp~=2.9",
"perun.connector~=3.8",
"privacyidea~=3.9",
], ],
extras_require={ extras_require={
"ldap": [ "ldap": [
...@@ -51,6 +53,7 @@ setuptools.setup( ...@@ -51,6 +53,7 @@ setuptools.setup(
"run_version_script=perun.proxy.utils.run_version_script:main", "run_version_script=perun.proxy.utils.run_version_script:main",
"separate_oidc_logs=perun.proxy.utils.separate_oidc_logs:main", "separate_oidc_logs=perun.proxy.utils.separate_oidc_logs:main",
"separate_ssp_logs=perun.proxy.utils.separate_ssp_logs:main", "separate_ssp_logs=perun.proxy.utils.separate_ssp_logs:main",
"sync_usable_token_types=perun.proxy.utils.sync_usable_token_types:main",
] ]
}, },
) )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment