# (c) NetKnights GmbH 2025, https://netknights.it
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-FileCopyrightText: 2025 Agustin Prediger
# SPDX-FileCopyrightText: 2025 Jelina Unger <jelina.unger@netknights.it>
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import logging
from requests import Response
from privacyidea.lib.error import ResolverError
from privacyidea.lib.resolvers.HTTPResolver import (HTTPResolver, METHOD, ENDPOINT, CONFIG_GET_USER_BY_ID,
CONFIG_GET_USER_BY_NAME, CONFIG_GET_USER_LIST, REQUEST_MAPPING,
RequestConfig, HEADERS, ADVANCED, RESPONSE_MAPPING,
CONFIG_CREATE_USER, CONFIG_DELETE_USER, CONFIG_EDIT_USER,
CONFIG_USER_AUTH, Error, ACTIVE,
USER_GROUPS_ATTRIBUTE, PI_USER_GROUPS_KEY)
from privacyidea.lib.resolvers.util import delete_user_error_handling_no_content
log = logging.getLogger(__name__)
REALM = "realm"
[docs]
class KeycloakResolver(HTTPResolver):
def __init__(self):
super().__init__()
self.config_get_user_by_id = {METHOD: "GET", ENDPOINT: "/admin/realms/{realm}/users/{userid}"}
self.config.update({CONFIG_GET_USER_BY_ID: self.config_get_user_by_id,
CONFIG_GET_USER_BY_NAME: {METHOD: "GET", ENDPOINT: "/admin/realms/{realm}/users",
REQUEST_MAPPING: '{"username": "{username}", "exact": true}'},
CONFIG_GET_USER_LIST: {METHOD: "GET", ENDPOINT: "/admin/realms/{realm}/users"},
CONFIG_CREATE_USER: {METHOD: "POST", ENDPOINT: "/admin/realms/{realm}/users",
REQUEST_MAPPING: '{"enabled": true}'},
CONFIG_EDIT_USER: {METHOD: "PUT", ENDPOINT: "/admin/realms/{realm}/users/{userid}"},
CONFIG_DELETE_USER: {METHOD: "DELETE", ENDPOINT: "/admin/realms/{realm}/users/{userid}"},
CONFIG_USER_AUTH: {METHOD: "POST",
HEADERS: '{"Content-Type": "application/x-www-form-urlencoded"}',
ENDPOINT: "/realms/{realm}/protocol/openid-connect/token",
REQUEST_MAPPING: "grant_type=password&client_id=admin-cli&"
"username={username}&password={password}"}
})
self.config_get_user_groups = {ACTIVE: True, METHOD: "get",
ENDPOINT: "/admin/realms/{realm}/users/{userid}/groups",
USER_GROUPS_ATTRIBUTE: "name", PI_USER_GROUPS_KEY: self.pi_user_groups_key}
self.attribute_mapping_pi_to_user_store = {"username": "username",
"userid": "id",
"givenname": "firstName",
"surname": "lastName",
"email": "email"}
self.attribute_mapping_user_store_to_pi = {rh_sso_key: pi_key for pi_key, rh_sso_key in
self.attribute_mapping_pi_to_user_store.items()}
self.base_url = "http://localhost:8080"
self.authorization_config = {METHOD: "POST",
ENDPOINT: "/realms/{realm}/protocol/openid-connect/token",
REQUEST_MAPPING: ("grant_type=password&client_id=admin-cli"
"&username={username}&password={password}"),
RESPONSE_MAPPING: '{"Authorization": "Bearer {access_token}"}',
HEADERS: '{"Content-Type": "application/x-www-form-urlencoded"}'}
# No wildcard required
self.wildcard = ""
# custom attributes
self.realm = None
[docs]
def loadConfig(self, config: dict):
"""
Load the configuration for the resolver.
"""
self.config.update(config)
super().loadConfig(self.config)
self.realm = self.config.get(REALM)
[docs]
def get_config(self) -> dict:
"""
Returns the configuration of the resolver.
"""
censored_config = super().get_config()
censored_config[ADVANCED] = True
return censored_config
[docs]
@staticmethod
def getResolverClassType() -> str:
"""
provide the resolver type for registration
"""
return "keycloakresolver"
[docs]
@staticmethod
def getResolverType() -> str:
"""
Returns the type of the resolver
"""
return KeycloakResolver.getResolverClassType()
[docs]
@classmethod
def getResolverClassDescriptor(cls) -> dict:
"""
Returns the class descriptor which is a dictionary with the resolver type as key and a dictionary containing
the data type for each configuration parameter as value.
"""
resolver_type = cls.getResolverType()
descriptor = super().getResolverClassDescriptor()[resolver_type]
descriptor['clazz'] = "useridresolver.KeycloakResolver.KeycloakResolver"
descriptor['config'].update({REALM: "string"})
return {resolver_type: descriptor}
[docs]
@staticmethod
def getResolverDescriptor() -> dict:
"""
Returns the descriptor of the resolver, which is the class name and the config description.
"""
return KeycloakResolver.getResolverClassDescriptor()
[docs]
def getUserId(self, login_name: str) -> str:
"""
Searches for a user by its name. Keycloak does not have an explicit endpoint for this purpose. Hence, we use
the endpoint to get all users but filter for the username.
"""
config_get_user_by_name = self.config.get(CONFIG_GET_USER_BY_NAME)
config = RequestConfig(config_get_user_by_name, self.headers, {"username": login_name}, "")
try:
users = self._get_user_list({}, config, ["userid"])
except ResolverError as error:
log.info(f"Failed to resolve user '{login_name}': {error}")
return ""
if len(users) == 1:
user_id = users[0].get("userid")
elif len(users) > 1:
raise ResolverError(f"Multiple users found for username '{login_name}'")
else:
user_id = ""
log.info(f"No user found for username '{login_name}'")
return user_id
def _replace_resolver_specific_tags(self, config: RequestConfig):
"""
Replaces resolver-specific tags in the configuration with their actual values.
:param config: The configuration dictionary for the request
:return: The configuration dictionary with tags replaced
"""
if self.realm:
config.endpoint = config.endpoint.replace("{realm}", self.realm)
return config
@staticmethod
def _is_wildcard_search(search_dict: dict | None) -> bool:
"""
Checks if the search dict contains a wildcard search. Since substring search can only be applied for username,
given name, surname, and email, we only check if asterisk is contained in at least one of the values of the
search dict.
If the search dict is None or empty, it is not a wildcard search.
:param search_dict: The search dictionary containing the search parameters
:return: True if it is a wildcard search, False otherwise
"""
if not search_dict:
return False
wildcard_keys = {"username", "givenname", "surname", "email"}
return any("*" in str(search_dict.get(key, "")) for key in wildcard_keys if key in search_dict)
def _get_user_list(self, search_dict: dict, config: RequestConfig, attributes: list[str] = None) -> list[dict]:
"""
Fetches a list of users from the user store.
:param search_dict: Dictionary containing search parameters that are added as query to the endpoint url
:param config: Configuration contains all information of the api endpoint to fetch the users.
:param attributes: List of attributes that should be included in the response. If None, all attributes
are included.
:return: List of dictionaries containing pi conform user attributes
"""
request_params = config.request_mapping or {}
request_params.update(self._get_search_params(search_dict))
if search_dict:
request_params["exact"] = str(not self._is_wildcard_search(search_dict)).lower()
config.headers.update(self._get_auth_header())
response = self._do_request(config, request_params)
self._get_user_list_error_handling(response, config)
# Map user store attributes to pi attributes
json_result = response.json()
json_result = self._apply_response_mapping(config, json_result)
user_store_users = self._get_user_list_from_response(json_result)
users = [self._user_store_user_to_pi_user(user, attributes) for user in user_store_users]
return users
# Error Handling
[docs]
@staticmethod
def get_error(response: Response) -> Error | None:
"""
Extracts the error message from the response if available.
It tries to get the error message under the key "errorMessage" or "error".
:param response: The response object from the HTTP request
:return: The error message if available, otherwise None
"""
error = Error(False, "", "")
try:
data = response.json()
error_message = data.get("errorMessage")
if not error_message:
error_code = data.get("error", "")
error_message = data.get("error_description", "")
if error_code or error_message:
error = Error(True, error_code, error_message)
else:
error = Error(True, "", error_message)
except ValueError:
error = None
return error
def _get_user_list_error_handling(self, response: Response, config: RequestConfig):
"""
Handles the error response from the user store
:param response: The response object from the HTTP request
:param config: The configuration for the user list request
"""
if response.status_code != 200:
error = self.get_error(response)
if error.error:
success = False
log.info(f"Failed to get the user list: {error.code} - {error.message}")
else:
# There is no error message in the expected format. Execute generic error handling.
success = super()._get_user_list_error_handling(response, config)
else:
# Custom errors can also occur in successful responses
success = self._custom_error_handling(response, config)
if not success:
raise ResolverError("Failed to get the user list!")
def _get_user_error_handling(self, response: Response, config: RequestConfig, user_identifier: str) -> bool:
"""
Handles the error response from the user store
:param response: The response object from the HTTP request
:param config: Configuration for the endpoint containing information about special error handling
:param user_identifier: Either the username or user id (used for logging)
:return: True if the request was successful, False otherwise
"""
if not response.status_code == 200:
# extract error messages
error = self.get_error(response)
if error.error:
success = False
log.info(f"Failed to resolve user: {error.code} - {error.message}")
if response.status_code == 404:
log.info(f"User '{user_identifier}' does not exist!")
else:
# There is no error message in the expected format. Execute generic error handling.
success = super()._get_user_error_handling(response, config, user_identifier)
else:
# Custom errors can also occur in successful responses
success = self._custom_error_handling(response, config)
return success
def _create_user_error_handling(self, response: Response, config: RequestConfig) -> bool:
"""
Handles the error response from the user store when creating a user
:param response: The response object from the HTTP request
:param config: Configuration for the endpoint containing information about special error handling
:return: True on success, False otherwise
"""
if not response.status_code == 201:
# extract error messages
error = self.get_error(response)
if error.error:
log.info(f"Failed to create user: {error.code} - {error.message}")
raise ResolverError(f"Failed to create user: {error.message}")
else:
# There is no error message in the expected format. Execute generic error handling.
success = super()._create_user_error_handling(response, config)
else:
# Custom errors can also occur in successful responses
success = self._custom_error_handling(response, config)
if not success:
raise ResolverError("Failed to create user!")
return success
def _update_user_error_handling(self, response: Response, config: RequestConfig, user_identifier: str) -> bool:
"""
Handles the error response from the user store when updating a user.
Does not raise an exception, as this is handled from the API function.
:param response: The response object from the HTTP request
:param config: Configuration for the endpoint containing information about special error handling
:return: True on success, False otherwise
"""
if response.status_code == 204:
return True
# extract error messages
error = self.get_error(response)
if error.error:
success = False
log.info(f"Failed to update user {user_identifier}: {error.code} - {error.message}")
else:
# There is no error message in the expected format. Execute generic error handling.
success = super()._update_user_error_handling(response, config, user_identifier)
return success
def _delete_user_error_handling(self, response: Response, config: RequestConfig, user_identifier: str) -> bool:
"""
Handles the error response from the user store when deleting a user
:param response: The response object from the HTTP request
:param config: Configuration for the endpoint containing information about special error handling
:param user_identifier: The identifier of the user to be deleted#
:return: True on success, False otherwise
"""
return delete_user_error_handling_no_content(self, response, config, user_identifier)
def _auth_header_error_handling(self, response: Response, config: RequestConfig) -> bool:
"""
Handles the error response from the user store when trying to get the authorization header.
:param response: The response object from the HTTP request
:param config: Configuration for the endpoint containing information about special error handling
:return: True if the password check was successful, False otherwise
"""
if not response.status_code == 200:
# extract error code and messages
error = self.get_error(response)
if error.error:
success = False
log.debug(f"Failed to get authorization header: {error.code} - {error.message}")
else:
success = super()._auth_header_error_handling(response, config)
else:
# Custom errors can also occur in successful responses
success = self._custom_error_handling(response, config)
return success
def _user_auth_error_handling(self, response: Response, config: RequestConfig, user_identifier: str) -> bool:
"""
Handles the error response from the user store when checking a user's password
:param response: The response object from the HTTP request
:param config: Configuration for the endpoint containing information about special error handling
:param user_identifier: The user identifier (username or user id)
:return: True if the password check was successful, False otherwise
"""
if not response.status_code == 200:
# extract error code and messages
error = self.get_error(response)
if error.error:
success = False
log.debug(f"Failed to authenticate user {user_identifier}: {error.code} - {error.message}")
else:
success = super()._user_auth_error_handling(response, config, user_identifier)
else:
# Custom errors can also occur in successful responses
success = self._custom_error_handling(response, config)
return success