Skip to content

Commits on Source 3

# [7.2.0](https://gitlab.ics.muni.cz/perun/perun-proxyidp/proxyidp-gui/compare/v7.1.0...v7.2.0) (2024-05-31)
### Features
* mitre sessions logouts ([9741e8c](https://gitlab.ics.muni.cz/perun/perun-proxyidp/proxyidp-gui/commit/9741e8c578e61c3959b06a9c265ab35051c766ba))
# [7.1.0](https://gitlab.ics.muni.cz/perun/perun-proxyidp/proxyidp-gui/compare/v7.0.0...v7.1.0) (2024-05-29)
......
......@@ -294,26 +294,26 @@ class UserManager:
"""
Returns list of unique client ids retrieved from active user's
sessions.
:param user_id: user, whose sessions are retrieved
:param sub: user, whose sessions are retrieved
:return: list of client ids
"""
# todo -- when user_id is stored in SSP db, this conversion will be needed
# subject = self.extract_user_attribute(self._SUBJECT_ATTRIBUTE, int(user_id))
# subject = self.extract_user_attribute(self._SUBJECT_ATTRIBUTE, int(sub))
ssp_clients = self._get_ssp_entity_ids_by_user(sub)
satosa_clients = self._get_satosa_client_ids_by_user(sub)
# mitre_clients = self._get_mitre_client_ids_by_user(user_id)
mitre_clients = self._get_mitre_client_ids_by_attribute(user_id=sub)
return ssp_clients + satosa_clients
return ssp_clients + satosa_clients + mitre_clients
def get_active_client_ids_for_session(self, session_id: str):
ssp_clients = self._get_ssp_entity_ids_by_session(session_id)
satosa_clients = self._get_satosa_client_ids_by_session(session_id)
# SKIP - mitre
mitre_clients = self._get_mitre_client_ids_by_attribute(session_id=session_id)
return ssp_clients + satosa_clients
return ssp_clients + satosa_clients + mitre_clients
def _get_mitre_client_ids_by_user(self, user_id: str) -> list[str]:
# todo - remove ? probably won't be used
def _get_mitre_client_ids_by_attribute(self, session_id=None, user_id=None):
engine = self.database_service.get_postgres_engine("mitre_database")
meta_data = MetaData()
meta_data.reflect(engine)
......@@ -324,25 +324,79 @@ class UserManager:
ACCESS_TOKEN_TBL = meta_data.tables["access_token"]
CLIENT_DETAILS_TBL = meta_data.tables["client_details"]
session_id_attr = (
self._cfg["mitre_database"]["ssp_session_id_attribute"]
or "urn:cesnet:proxyidp:attribute:sspSessionID"
)
# Not clear according witch attribute to search
if (session_id is None) == (user_id is None):
return []
matching_attr = False
# Search by session id
if session_id is not None:
matching_attr = SAVED_USER_AUTH_TBL.c.authentication_attributes.like(
f'%"{session_id_attr}":["{session_id}"]%'
)
# Search by user id
if user_id is not None:
matching_attr = SAVED_USER_AUTH_TBL.c.name == user_id
with engine.connect() as cnxn:
with cnxn.begin():
stmt = select(CLIENT_DETAILS_TBL.c.client_id).where(
CLIENT_DETAILS_TBL.c.id.in_(
session.query(ACCESS_TOKEN_TBL.c.client_id).filter(
# Get pair of user_id and session_id based on input attribute
stmt = select(
SAVED_USER_AUTH_TBL.c.id,
SAVED_USER_AUTH_TBL.c.authentication_attributes,
).where(matching_attr)
result = cnxn.execute(stmt)
result_dict = [
r for r in result
] # [(id, attrs(for session_id retrieval)))]
uid_sid_dict = []
# Retrieve right format of session id from auth attrs
for uid, auth_attrs in result_dict:
sid = json.loads(auth_attrs).get(session_id_attr)
uid_sid_dict.append((uid, sid))
uid_sid_dict = list(set(uid_sid_dict))
combined_result_dict = []
# Retrieve token value based on each user_ids
# -------------------------------------------
for uid, sid in uid_sid_dict:
stmt = select(
ACCESS_TOKEN_TBL.c.client_id, ACCESS_TOKEN_TBL.c.token_value
).where(
ACCESS_TOKEN_TBL.c.auth_holder_id.in_(
session.query(AUTH_HOLDER_TBL.c.id).filter(
AUTH_HOLDER_TBL.c.user_auth_id.in_(
session.query(SAVED_USER_AUTH_TBL.c.id).filter(
SAVED_USER_AUTH_TBL.c.name == user_id
)
)
)
AUTH_HOLDER_TBL.c.user_auth_id == uid
)
)
)
result = cnxn.execute(stmt)
result_dict = [r for r in result] # [(token_client_id, token_value)]
# Retrieve client_id for each token_client_id
# -------------------------------------------
for token_client_id, token_value in result_dict:
# Get issuer from token_value
issuer = self._get_issuer_from_id_token(token_value)
# Another select for clients_ids
stmt = select(CLIENT_DETAILS_TBL.c.client_id).where(
CLIENT_DETAILS_TBL.c.id == token_client_id
)
result = cnxn.execute(stmt)
return [r[0] for r in result]
client_ids = [r[0] for r in result]
for client_id in client_ids:
combined_result_dict.append((client_id, sid, issuer, None))
return list(set(combined_result_dict))
def get_user_id_by_ssp_session_id(self, ssp_session_id: str) -> Union[str, None]:
if ssp_session_id is None:
......
[metadata]
version = 7.1.0
version = 7.2.0
license_files = LICENSE