# privacyIDEA is a fork of LinOTP
#
# 2015-04-15 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Migrate SCIM Resolver to work with privacyidea 2 (Flask)
#
# May 08, 2014 Cornelius Kölbel
# contact: http://www.privacyidea.org
#
# product: LinOTP2
# module: useridresolver
# tool: SCIMIdResolver
# edition: Community Edition
#
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
# License: AGPLv3
# contact: http://www.linotp.org
# http://www.lsexperts.de
# linotp@lsexperts.de
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """This is the resolver to find users in a SCIM service.
The file is tested in tests/test_lib_resolver.py
"""
import logging
import traceback
from .UserIdResolver import UserIdResolver
import yaml
import requests
import base64
from urllib.parse import urlencode
from privacyidea.lib.utils import to_bytes, to_unicode, convert_column_to_unicode
log = logging.getLogger(__name__)
[docs]
class IdResolver (UserIdResolver):
def __init__(self):
self.auth_server = ''
self.resource_server = ''
self.auth_client = 'localhost'
self.auth_secret = '' # nosec B105 # default parameter
self.access_token = None
[docs]
def checkPass(self, uid, password):
"""
This function checks the password for a given uid.
- returns true in case of success
- false if password does not match
"""
# TODO: Implement password checking with SCIM
return False
[docs]
def get_user_info(self, user_id: int or str, attributes: list[str] = None) -> dict:
"""
returns the user information for a given uid.
"""
ret = {}
# The SCIM ID is always /Users/ID
# Alas, we can not map the ID to any other attribute
res = self._get_user(self.resource_server,
self.access_token,
user_id)
user = res
ret = self._fill_user_schema_1_0(user, attributes)
return ret
[docs]
def get_available_info_keys(self) -> list[str]:
"""
This function returns a list of known privacyIDEA user attributes which can be used, e.g. for getUserList or
get_user_info
:return: list of possible keys for searching users
"""
return ["username", "givenname", "surname", "phone", "email", "mobile"]
def _fill_user_schema_1_0(self, user: dict, attributes: list[str] = None) -> dict:
# We assume the schema:
# "schemas": ["urn:scim:schemas:core:1.0"]
#ret['username'] = user.get(self.mapping.get("username"))
#ret['givenname'] = user.get(self.mapping.get("givenname"), "")
#ret['surname'] = user.get(self.mapping.get("surname"), "")
#ret['phone'] = user.get(self.mapping.get("phone"), "")
#ret['mobile'] = user.get(self.mapping.get("mobile"), "")
#ret['email'] = user.get(self.mapping.get("email"), "")
attributes = attributes or self.get_available_info_keys()
ret = {}
if "username" in attributes:
ret['username'] = user.get("userName", {})
if "givenname" in attributes:
ret['givenname'] = user.get("name", {}).get("givenName", "")
if "surname" in attributes:
ret['surname'] = user.get("name", {}).get("familyName", "")
if "phone" in attributes:
ret["phone"] = ""
if user.get("phoneNumbers", {}):
ret['phone'] = user.get("phoneNumbers")[0].get("value")
if "email" in attributes:
ret["email"] = ""
if user.get("emails", {}):
ret['email'] = user.get("emails")[0].get("value")
if "mobile" in attributes:
ret["mobile"] = ""
return ret
[docs]
def getUsername(self, userid):
"""
Returns the username/loginname for a given userid
:param userid: The userid in this resolver
:type userid: string
:return: username
:rtype: string
"""
#user = self.getUserInfo(userid)
#return user.get("username", "")
# It seems that the userName is the UserId
return userid
[docs]
def getUserId(self, loginName):
"""
returns the uid for a given loginname/username
:rtype: str
"""
#res = {}
#if self.access_token:
# res = self._search_users(self.resource_server,
# self.access_token,
# {'filter': '%s eq "%s"' %
# ("userName", loginName)})
#return res.get("Resources", [{}])[0].get("externalId")
# It seems that the userName is the userId
return convert_column_to_unicode(loginName)
[docs]
def getUserList(self, search_dict=None, attributes: list[str] = None) -> list[dict]:
"""
Return the list of users
"""
ret = []
# TODO: search dict is not used at the moment
res = {}
if self.access_token:
res = self._search_users(self.resource_server,
self.access_token)
for user in res.get("Resources"):
ret_user = self._fill_user_schema_1_0(user, attributes)
ret.append(ret_user)
return ret
[docs]
def getResolverId(self):
"""
:return: the resolver identifier string, empty string if not exist
"""
return self.auth_server
[docs]
@staticmethod
def getResolverClassType():
return 'scimresolver'
[docs]
@staticmethod
def getResolverDescriptor():
return IdResolver.getResolverClassDescriptor()
[docs]
@staticmethod
def getResolverType():
return IdResolver.getResolverClassType()
[docs]
@classmethod
def getResolverClassDescriptor(cls):
"""
return the descriptor of the resolver, which is
- the class name and
- the config description
:return: resolver description dict
:rtype: dict
"""
descriptor = {}
typ = cls.getResolverClassType()
descriptor['clazz'] = "useridresolver.SCIMIdResolver.IdResolver"
descriptor['config'] = {'authserver': 'string',
'resourceserver': 'string',
'authclient': 'string',
'authsecret': 'string',
'mapping': 'string'}
return {typ: descriptor}
[docs]
def loadConfig(self, config):
"""load the configuration to the Resolver instance
Keys in the dict are
* Authserver
* Resouceserver
* Client
* Secret
* Mapping
:param config: the configuration dictionary
:type config: dict
:return: the resolver instance
"""
self.auth_server = config.get('Authserver')
self.resource_server = config.get('Resourceserver')
self.auth_client = config.get('Client')
self.auth_secret = config.get('Secret')
self.mapping = yaml.safe_load(config.get('Mapping'))
self.create_scim_object()
return self
[docs]
@classmethod
def testconnection(cls, param):
"""
This function lets you test the to be saved SCIM connection.
:param param: A dictionary with all necessary parameter to test the
connection.
:type param: dict
:return: Tuple of success and a description
:rtype: (bool, string)
Parameters are: Authserver, Resourceserver, Client, Secret, Mapping
"""
desc = None
success = False
try:
access_token = cls.get_access_token(str(param.get("Authserver")),
param.get("Client"),
param.get("Secret"))
content = cls._search_users(param.get("Resourceserver"),
access_token, "")
num = content.get("totalResults", -1)
desc = f"Found {num!s} users"
success = True
except Exception as exx:
log.error(f"Failed to retrieve users: {exx!s}")
log.debug(f"{traceback.format_exc()!s}")
desc = f"failed to retrieve users: {exx!s}"
return success, desc
@staticmethod
def _search_users(resource_server, access_token, params=None):
"""
:param params: Additional http parameters added to the URL
:type params: dictionary
"""
params = params or {}
headers = {'Authorization': f"Bearer {access_token}",
'content-type': 'application/json'}
url = f'{resource_server}/Users?{urlencode(params)}'
resp = requests.get(url, headers=headers, timeout=60)
if resp.status_code != 200:
info = f"Could not get user list: {resp.status_code!s}"
log.error(info)
raise Exception(info)
j_content = yaml.safe_load(resp.content)
return j_content
@staticmethod
def _get_user(resource_server, access_token, userid):
"""
Get a User from the SCIM service
:param resource_server: The Resource Server
:type resource_server: basestring / URI
:param access_token: Access Token
:type access_token: basestring
:param userid: The userid to fetch
:type userid: basestring
:return: Dictionary of User object.
"""
headers = {'Authorization': f"Bearer {access_token}",
'content-type': 'application/json'}
url = f'{resource_server}/Users/{userid}'
resp = requests.get(url, headers=headers, timeout=60)
if resp.status_code != 200:
info = f"Could not get user: {resp.status_code!s}"
log.error(info)
raise Exception(info)
j_content = yaml.safe_load(resp.content)
return j_content
@staticmethod
def get_access_token(server=None, client='', secret=''):
auth = to_unicode(base64.b64encode(to_bytes(client + ':' + secret)))
url = f"{server!s}/oauth/token?grant_type=client_credentials"
resp = requests.get(url,
headers={'Authorization': 'Basic ' + auth},
timeout=60)
if resp.status_code != 200:
info = f"Could not get access token: {resp.status_code!s}"
log.error(info)
raise Exception(info)
access_token = yaml.safe_load(resp.content).get('access_token')
return access_token
def create_scim_object(self):
self.access_token = self.get_access_token(self.auth_server,
self.auth_client,
self.auth_secret)