Source code for privacyidea.lib.user

# -*- coding: utf-8 -*-
#  privacyIDEA is a fork of LinOTP
#
#  2015-11-03   Cornelius Kölbel <cornelius@privacyidea.org>
#               Add memberfunction "exist"
#  2015-06-06   Cornelius Kölbel <cornelius@privacyidea.org>
#               Add the possibility to update the user data.
#  Nov 27, 2014 Cornelius Kölbel <cornelius@privacyidea.org>
#               Migration to flask
#               Rewrite of methods
#               100% test code coverage
#  May 08, 2014 Cornelius Kölbel
#
#  License:  AGPLv3
#  contact:  http://www.privacyidea.org
#
# 2014-10-03 fix getUsername function
#            Cornelius Kölbel <cornelius@privcyidea.org>
#
#  Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#  License:  AGPLv3
#  contact:  http://www.linotp.org
#            http://www.lsexperts.de
#            linotp@lsexperts.de
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = '''There are the library functions for user functions.
It depends on the lib.resolver and lib.realm.

There are and must be no dependencies to the token functions (lib.token)
or to webservices!

This code is tested in tests/test_lib_user.py
'''

import logging
import traceback

from .error import UserError
from ..api.lib.utils import (getParam,
                             optional)
from .log import log_with
from .resolver import (get_resolver_object,
                       get_resolver_type)

from .realm import (get_realms,
                    get_default_realm,
                    get_realm)
from .config import get_from_config

ENCODING = 'utf-8'

log = logging.getLogger(__name__)


@log_with(log)
[docs]class User(object): """ The user has the attributes login, realm and resolver. Usually a user can be found via "login@realm". A user object with an empty login and realm should not exist, whereas a user object could have an empty resolver. """ # In some test case the login attribute from a not # initialized user is requested. This is why we need # these dummy class attributes. login = "" realm = "" resolver = "" def __init__(self, login="", realm="", resolver=""): self.login = login or "" self.realm = (realm or "").lower() self.resolver = resolver or "" if not self.resolver: # set the resolver implicitly! self.get_resolvers() self.Resolvers_list = [] def is_empty(self): # ignore if only resolver is set! as it makes no sense if len(self.login or "") + len(self.realm or "") == 0: return True else: return False def __eq__(self, other): """ Compare two User Objects. :param other: The other User object, to which this very object is compared. :type other: User object :return: True or False :rtype: bool """ return (self.login == other.login) and (self.resolver == other.resolver) and ( self.realm == other.realm) def __str__(self): ret = "<empty user>" if self.is_empty() is False: loginname = "" try: loginname = unicode(self.login) except UnicodeEncodeError: # pragma: no cover loginname = unicode(self.login.encode(ENCODING)) conf = '' if self.resolver is not None and self.resolver: conf = '.%s' % (unicode(self.resolver)) ret = '<%s%s@%s>' % (loginname, conf, unicode(self.realm)) return ret def __repr__(self): ret = ('User(login=%r, realm=%r, resolver=%r)' % (self.login, self.realm, self.resolver)) return ret @log_with(log) def get_realm_resolvers(self): """ This returns a dictionary of the resolvernames in the realm. It does not take into account, the self.login (username)! The key is the resolvername. :return: dict of resolvers in self.realm :rtype: dict """ resolvers = {} realm_config = get_realms(self.realm) resolvers_in_realm = realm_config.get(self.realm, {})\ .get("resolver", {}) for resolver in resolvers_in_realm: resolvername = resolver.get("name") resolvers[resolvername] = {"priority": resolver.get("priority"), "type": resolver.get("type")} return resolvers def get_resolvers(self): """ This returns the list of the resolvernames of the user. If no resolver attribute exists at the moment, the user is searched in the realm and according to this the resolver attribute is set. It will only return one resolver in the list for backward compatibilty .. note:: If the user does not exist in the realm, then an empty list is returned! :return: list of resolvers for self.login :rtype: list of strings """ if self.resolver: return [self.resolver] resolvers = [] resolver_with_highest_priority = None resolvers_in_realm = self.get_realm_resolvers() highest_priority = 1000 for resolvername in resolvers_in_realm.keys(): # test, if the user is contained in this resolver y = get_resolver_object(resolvername) if y is None: # pragma: no cover log.info("Resolver %r not found!" % resolvername) else: uid = y.getUserId(self.login) if uid not in ["", None]: log.info("user %r found in resolver %r" % (self.login, resolvername)) log.info("userid resolved to %r " % uid) priority = resolvers_in_realm.get(resolvername).get( "priority", 999) log.debug("priority of the resolver is %s" % priority) log.debug("The highest priority is %s" % highest_priority) if priority < highest_priority: highest_priority = priority resolver_with_highest_priority = resolvername else: log.debug("user %r not found" " in resolver %r" % (self.login, resolvername)) if resolver_with_highest_priority: self.resolver = resolver_with_highest_priority resolvers = [self.resolver] return resolvers def get_user_identifiers(self): """ This returns the UserId information from the resolver object and the resolvertype and the resolvername (former: getUserId) (former: getUserResolverId) :return: The userid, the resolver type and the resolver name like (1000, "passwdresolver", "resolver1") :rtype: tuple """ if not self.resolver: self.get_resolvers() if not self.resolver: # The resolver list is empty raise UserError("The user can not be found in any resolver in " "this realm!") rtype = get_resolver_type(self.resolver) y = get_resolver_object(self.resolver) if y is None: raise UserError("The resolver '%s' does not exist!" % self.resolver) uid = y.getUserId(self.login) return uid, rtype, self.resolver def exist(self): """ Check if the user object exists in the user store :return: True or False """ success = True try: uid, _rtype, _resolver = self.get_user_identifiers() except UserError: log.debug("User %s does not exist." % self) success = False if not uid: # The SQL resolver does not raise an exception but returns an # empty UID. success = False return success @property def info(self): """ return the detailed information for the user :return: a dict with all the userinformation :rtype: dict """ (uid, _rtype, _resolver) = self.get_user_identifiers() y = get_resolver_object(self.resolver) userInfo = y.getUserInfo(uid) return userInfo @log_with(log) def get_user_phone(self, phone_type='phone'): """ Returns the phone numer of a user :param phone_type: The type of the phone, i.e. either mobile or phone (land line) :type phone_type: string :returns: list with phone numbers of this user object """ userinfo = self.info if phone_type in userinfo: log.debug("got user phone %r of type %r" % (userinfo[phone_type], phone_type)) return userinfo[phone_type] else: log.warning("userobject (%r) has no phone of type %r." % (self, phone_type)) return "" @log_with(log) def get_user_realms(self): """ Returns a list of the realms, a user belongs to. Usually this will only be one realm. But if the user object has no realm but only a resolver, than all realms, containing this resolver are returned. This function is used for the policy module :return: realms of the user :rtype: list """ allRealms = get_realms() Realms = [] if self.realm == "" and self.resolver == "": defRealm = get_default_realm().lower() Realms.append(defRealm) self.realm = defRealm elif self.realm != "": Realms.append(self.realm.lower()) else: # User has not realm! # we have got a resolver and will get all realms # the resolver belongs to. for key, val in allRealms.items(): log.debug("evaluating realm %r: %r " % (key, val)) for reso in val.get('resolver', []): resoname = reso.get("name") if resoname == self.resolver: Realms.append(key.lower()) log.debug("added realm %r to Realms due to " "resolver %r" % (key, self.resolver)) return Realms @log_with(log) def check_password(self, password): """ The password of the user is checked against the user source :param password: The clear text password :return: the username of the authenticated user. If unsuccessful, returns None :rtype: string/None """ success = None try: log.info("User %r from realm %r tries to " "authenticate" % (self.login, self.realm)) if type(self.login) != unicode: self.login = self.login.decode(ENCODING) res = self.get_resolvers() # Now we know, the resolvers of this user and we can verify the # password if len(res) == 1: y = get_resolver_object(self.resolver) uid, _rtype, _rname = self.get_user_identifiers() if y.checkPass(uid, password): success = "%s@%s" % (self.login, self.realm) log.debug("Successfully authenticated user %r." % self) else: log.info("user %r failed to authenticate." % self) elif not res: log.error("The user %r exists in NO resolver." % self) except UserError as e: # pragma: no cover log.error("Error while trying to verify the username: %r" % e) except Exception as e: # pragma: no cover log.error("Error checking password within module %r" % e) log.debug("%s" % traceback.format_exc()) return success @log_with(log) def get_search_fields(self): """ Return the valid search fields of a user. The search fields are defined in the UserIdResolver class. :return: searchFields with name (key) and type (value) :rtype: dict """ searchFields = {} for reso in self.get_resolvers(): # try to load the UserIdResolver Class try: y = get_resolver_object(reso) sf = y.getSearchFields() searchFields[reso] = sf except Exception as e: # pragma: no cover log.warning("module %r: %r" % (reso, e)) return searchFields @log_with(log) def update_user_info(self, attributes): """ This updates the given attributes of a user. The attributes can be "username", "surname", "givenname", "email", "mobile", "phone", "password" :param attributes: A dictionary of the attributes to be updated :type attributes: dict :return: True in case of success """ success = False try: log.info("User info for user %s@%s about to be updated." % (self.login, self.realm)) if type(self.login) != unicode: self.login = self.login.decode(ENCODING) res = self.get_resolvers() # Now we know, the resolvers of this user and we can update the # user if len(res) == 1: y = get_resolver_object(self.resolver) if not y.updateable: # pragma: no cover log.warning("The resolver %s is not updateable." % y) else: uid, _rtype, _rname = self.get_user_identifiers() if y.update_user(uid, attributes): success = True # If necessary, update the username if attributes.get("username"): self.login = attributes.get("username") log.info("Successfully updated user %r." % self) else: # pragma: no cover log.info("user %r failed to update." % self) elif not res: # pragma: no cover log.error("The user %r exists in NO resolver." % self) except UserError as exx: # pragma: no cover log.error("Error while trying to verify the username: %s" % exx) return success @log_with(log) def delete(self): """ This deletes the user in the user store. I.e. the user in the SQL database or the LDAP gets deleted. Returns True in case of success """ success = False try: log.info("User %s@%s about to be deleted." % (self.login, self.realm)) if type(self.login) != unicode: self.login = self.login.decode(ENCODING) res = self.get_resolvers() # Now we know, the resolvers of this user and we can delete it if len(res) == 1: y = get_resolver_object(self.resolver) if not y.updateable: # pragma: no cover log.warning("The resolver %s is not updateable." % y) else: uid, _rtype, _rname = self.get_user_identifiers() if y.delete_user(uid): success = True log.info("Successfully deleted user %r." % self) else: # pragma: no cover log.info("user %r failed to update." % self) elif not res: # pragma: no cover log.error("The user %r exists in NO resolver." % self) except UserError as exx: # pragma: no cover log.error("Error while trying to verify the username: %r" % exx) except Exception as exx: # pragma: no cover log.error("Error checking password within module %r" % exx) log.debug("%s" % traceback.format_exc()) return success ####################################################################
@log_with(log)
[docs]def create_user(resolvername, attributes): """ This creates a new user in the given resolver. The resolver must be editable to do so. The attributes is a dictionary containing the keys "username", "email", "phone", "mobile", "surname", "givenname", "password". We return the UID and not the user object, since the user could be located in several realms! :param resolvername: The name of the resolver, in which the user should be created :type resolvername: basestring :param attributes: Attributes of the user :type attributes: dict :return: The uid of the user object """ y = get_resolver_object(resolvername) uid = y.add_user(attributes) return uid
@log_with(log)
[docs]def split_user(username): """ Split the username of the form user@realm into the username and the realm splitting myemail@emailprovider.com@realm is also possible and will return (myemail@emailprovider, realm). If for a user@domain the "domain" does not exist as realm, the name is not split, since it might be the user@domain in the default realm We can also split realm\\user to (user, realm) :param username: the username to split :type username: string :return: username and realm :rtype: tuple """ from privacyidea.lib.realm import realm_is_defined user = username.strip() realm = "" l = user.split('@') if len(l) >= 2: if realm_is_defined(l[-1]): # split the last only if the last part is really a realm (user, realm) = user.rsplit('@', 1) else: l = user.split('\\') if len(l) >= 2: (realm, user) = user.rsplit('\\', 1) return user, realm
[docs]def get_user_from_param(param, optionalOrRequired=optional): """ Find the parameters user, realm and resolver and create a user object from these parameters. An exception is raised, if a user in a realm is found in more than one resolvers. :param param: The dictionary of request parameters :type param: dict :return: User as found in the parameters :rtype: User object """ realm = "" username = getParam(param, "user", optionalOrRequired) if username is None: username = "" else: splitAtSign = get_from_config("splitAtSign") if splitAtSign.lower() in ["true", "1"]: (username, realm) = split_user(username) if "realm" in param: realm = param["realm"] if username != "": if realm is None or realm == "": realm = get_default_realm() user_object = User(login=username, realm=realm) if "resolver" in param: user_object.resolver = param["resolver"] else: user_object.get_resolvers() return user_object
@log_with(log)
[docs]def get_user_list(param=None, user=None): users = [] resolvers = [] searchDict = {"username": "*"} param = param or {} # we have to recreate a new searchdict without the realm key # as delete does not work for key in param: lval = param[key] if key == "realm": continue if key == "resolver": continue if key == "user": # If "user" is in the param we overwrite the username key = "username" searchDict[key] = lval log.debug("Parameter key:%r=%r" % (key, lval)) # determine which scope we want to show param_resolver = getParam(param, "resolver") param_realm = getParam(param, "realm") user_resolver = None user_realm = None if user is not None: user_resolver = user.resolver user_realm = user.realm # Append all possible resolvers if param_resolver: resolvers.append(param_resolver) if user_resolver: resolvers.append(user_resolver) for pu_realm in [param_realm, user_realm]: if pu_realm: realm_config = get_realm(pu_realm) for r in realm_config.get("resolver", {}): if r.get("name"): resolvers.append(r.get("name")) if not (param_resolver or user_resolver or param_realm or user_realm): # if no realm or resolver was specified, we search the resolvers # in all realms all_realms = get_realms() for _name, res_list in all_realms.items(): for resolver_entry in res_list.get("resolver"): resolvers.append(resolver_entry.get("name")) for resolver_name in set(resolvers): try: log.debug("Check for resolver class: %r" % resolver_name) y = get_resolver_object(resolver_name) log.debug("with this search dictionary: %r " % searchDict) ulist = y.getUserList(searchDict) # Add resolvername to the list for ue in ulist: ue["resolver"] = resolver_name ue["editable"] = y.updateable log.debug("Found this userlist: %r" % ulist) users.extend(ulist) except KeyError as exx: # pragma: no cover log.error("%r" % (exx)) log.debug("%s" % traceback.format_exc()) raise exx except Exception as exx: # pragma: no cover log.error("%r" % (exx)) log.debug("%s" % traceback.format_exc()) continue return users
@log_with(log)
[docs]def get_user_info(userid, resolvername): """ return the detailed information for a user in a resolver :param userid: The id of the user in a resolver :type userid: string :param resolvername: The name of the resolver :return: a dict with all the userinformation :rtype: dict """ userInfo = {} if userid: y = get_resolver_object(resolvername) userInfo = y.getUserInfo(userid) return userInfo
@log_with(log)
[docs]def get_username(userid, resolvername): """ Determine the username for a given id and a resolvername. :param userid: The id of the user in a resolver :type userid: string :param resolvername: The name of the resolver :return: the username or "" if it does not exist :rtype: string """ username = "" if userid: y = get_resolver_object(resolvername) if y: username = y.getUsername(userid) return username