Source code for privacyidea.lib.resolvers.SQLIdResolver

#  Copyright (C) 2014 Cornelius Kölbel
#  License:  AGPLv3
#  contact:  cornelius@privacyidea.org
#
#  2019-47-14 Paul Lettich <paul.lettich@netknights.it>
#             Remove hash calculation and switch to passlib
#  2016-07-15 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add sha512 PHP hash as suggested by Rick Romero
#  2016-04-08 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Simplifying out of bounds check
#             Avoid repetition in comparison
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """This is the resolver to find users in SQL databases.

The file is tested in tests/test_lib_resolver.py
"""

import logging
import yaml
import binascii
import re

from privacyidea.lib.resolvers.UserIdResolver import UserIdResolver

from sqlalchemy import (Integer, cast, String, MetaData, Table, and_,
                        create_engine, select, insert, delete, update, RowMapping)
from sqlalchemy.orm import sessionmaker, scoped_session

import traceback
import hashlib
from privacyidea.lib.pooling import get_engine
from privacyidea.lib.lifecycle import register_finalizer
from privacyidea.lib.utils import (is_true, censor_connect_string,
                                   convert_column_to_unicode)
from privacyidea.lib.error import ParameterError, ResolverError

# TODO passlib has to be replaced before the next release, this is just a workaround that can not stay
# passlib 1.7.4 compatibility with modern bcrypt. Two breaking changes:
# 1. bcrypt 4.1.0 removed __about__.__version__ — passlib uses it for version detection.
#    https://github.com/pyca/bcrypt/issues/684
# 2. bcrypt 5.0.0 raises ValueError for passwords > 72 bytes instead of silently
#    truncating. passlib's wrap-bug detection passes a long password to hashpw during
#    backend initialization, which crashes and causes all bcrypt verification to silently
#    return False.
import bcrypt as _bcrypt
if not hasattr(_bcrypt, '__about__'):
    _bcrypt.__about__ = type('__about__', (), {'__version__': _bcrypt.__version__})()
_orig_hashpw = _bcrypt.hashpw
def _hashpw_compat(password, salt):
    if isinstance(password, bytes) and len(password) > 72:
        password = password[:72]
    return _orig_hashpw(password, salt)
_bcrypt.hashpw = _hashpw_compat

# passlib imports must stay below the bcrypt monkey-patch above (E402 suppressed).
from passlib.context import CryptContext  # noqa: E402
from passlib.utils import h64  # noqa: E402
from passlib.utils.compat import uascii_to_str  # noqa: E402
from passlib.utils.compat import unicode as pl_unicode  # noqa: E402
from passlib.utils import to_unicode  # noqa: E402
import passlib.utils.handlers as uh  # noqa: E402
import passlib.exc as exc  # noqa: E402
from passlib.registry import register_crypt_handler  # noqa: E402
from passlib.handlers.ldap_digests import _SaltedBase64DigestHelper  # noqa: E402


class phpass_drupal(uh.HasRounds, uh.HasSalt, uh.GenericHandler):  # pragma: no cover
    """This class implements the PHPass Portable Hash (Drupal version), and follows the
    :ref:`password-hash-api`.
    """
    name = "phpass_drupal"
    setting_kwds = ("salt", "rounds")
    checksum_chars = uh.HASH64_CHARS
    checksum_size = 43

    min_salt_size = max_salt_size = 8
    salt_chars = uh.HASH64_CHARS

    default_rounds = 19
    min_rounds = 7
    max_rounds = 30
    rounds_cost = "log2"

    ident = '$S$'

    @classmethod
    def from_string(cls, hash):
        hash = to_unicode(hash, "ascii", "hash")
        ident, data = hash[0:3], hash[3:]
        if ident != cls.ident:
            raise exc.InvalidHashError()
        rounds, salt, chk = data[0], data[1:9], data[9:]
        return cls(
            rounds=h64.decode_int6(rounds.encode("ascii")),
            salt=salt,
            checksum=chk or None,
        )

    def to_string(self):
        hash = "{}{}{}{}".format(self.ident,
                              h64.encode_int6(self.rounds).decode("ascii"),
                              self.salt,
                              self.checksum or '')
        return uascii_to_str(hash)

    def _calc_checksum(self, secret):
        if isinstance(secret, pl_unicode):
            secret = secret.encode("utf-8")
        real_rounds = 1 << self.rounds
        result = hashlib.sha512(self.salt.encode("ascii") + secret).digest()
        r = 0
        while r < real_rounds:
            result = hashlib.sha512(result + secret).digest()
            r += 1
        return h64.encode_bytes(result).decode("ascii")[:self.checksum_size]


class ldap_salted_sha256_pi(_SaltedBase64DigestHelper):
    name = 'ldap_salted_sha256_pi'
    ident = '{SSHA256}'
    checksum_size = 32
    # passlib sets the max_salt_size for SSHA256 to 16:
    # <https://foss.heptapod.net/python-libs/passlib/-/blob/branch/stable/passlib/handlers/ldap_digests.py#L71>
    # But we have encountered hashes with longer salt sizes. Since we set this
    # password hashing function when we create an internal db resolver,
    # we should be able to verify these hashes as well.
    max_salt_size = 32
    _hash_func = hashlib.sha256
    _hash_regex = re.compile(r"^\{SSHA256\}(?P<tmp>[+/a-zA-Z0-9]{48,}={0,2})$")


register_crypt_handler(phpass_drupal)
register_crypt_handler(ldap_salted_sha256_pi)


# The list of supported password hash types for verification (passlib handler)
pw_ctx = CryptContext(schemes=['phpass',
                               'phpass_drupal',
                               'ldap_salted_sha1',
                               'ldap_salted_sha256_pi',
                               'ldap_salted_sha512',
                               'ldap_sha1',
                               'md5_crypt',
                               'bcrypt',
                               'sha512_crypt',
                               'sha256_crypt',
                               'hex_sha256',
                               ])

# List of supported password hash types for hash generation (name to passlib handler id)
hash_type_dict = {"PHPASS": 'phpass',
                  "SHA": 'ldap_sha1',
                  "SSHA": 'ldap_salted_sha1',
                  "SSHA256": 'ldap_salted_sha256_pi',
                  "SSHA512": 'ldap_salted_sha512',
                  "OTRS": 'hex_sha256',
                  "SHA256CRYPT": 'sha256_crypt',
                  "SHA512CRYPT": 'sha512_crypt',
                  "MD5CRYPT": 'md5_crypt',
                  }

log = logging.getLogger(__name__)

[docs] class IdResolver (UserIdResolver): searchFields = {"username": "text", "userid": "numeric", "phone": "text", "mobile": "text", "surname": "text", "givenname": "text", "email": "text", "description": "text", } # If the resolver could be configured editable updateable = True def __init__(self): self.resolverId = "" self.server = "" self.driver = "" self.database = "" self.port = 0 self.limit = 100 self.user = "" self.password = "" # nosec B105 # default parameter self.table = "" self.TABLE = None self.map = {} self.reverse_map = {} self.where = "" self.encoding = "" self.conParams = "" self.connect_string = "" self.session = None self.pool_size = 10 self.pool_timeout = 120 self.pool_recycle = 7200 self.engine = None self._editable = False self.password_hash_type = None return def getSearchFields(self): return self.searchFields @staticmethod def _append_where_filter(conditions, table, where): """ Append contents of WHERE statement to the list of filter conditions :param conditions: filter conditions :type conditions: list :return: list of filter conditions """ if where: parts = re.split(' and ', where, flags=re.IGNORECASE) for part in parts: # this might result in errors if the # administrator enters nonsense (w_column, w_cond, w_value) = part.split() if w_cond.lower() == "like": conditions.append(table.columns[w_column].like(w_value)) elif w_cond == "==": conditions.append(table.columns[w_column] == w_value) elif w_cond == ">": conditions.append(table.columns[w_column] > w_value) elif w_cond == "<": conditions.append(table.columns[w_column] < w_value) return conditions
[docs] def checkPass(self, uid, password): """ This function checks the password for a given uid. :param uid: uid of the user for which the password should be checked :type uid: str :param password: the password to check :type password: str :return: True if password matches the saved password hash, False otherwise :rtype: bool """ res = False userinfo = self.get_user_info(uid) database_pw = userinfo.get("password", "XXXXXXX") # remove owncloud hash format identifier (currently only version 1) database_pw = re.sub(r'^1\|', '', database_pw) # translate lower case hash identifier to uppercase database_pw = re.sub(r'^{([a-z0-9]+)}', lambda match: f'{{{match.group(1).upper()}}}', database_pw) try: res = pw_ctx.verify(password, database_pw) except ValueError as _e: # if the hash could not be identified / verified, just return False pass return res
[docs] def get_user_info(self, user_id: int or str, attributes: list[str] = None) -> dict: """ This function returns all user info for a given userid/object. :param user_id: The userid of the object :param attributes: list of attribute names to be returned for the user. If None, all attributes are returned. :return: A dictionary with the keys defined in self.map """ userinfo = {} try: conditions = [self._get_userid_filter(user_id)] conditions = self._append_where_filter(conditions, self.TABLE, self.where) filter_condition = and_(*conditions) result = self.session.execute(select(self.TABLE).filter(filter_condition)) for r in result.mappings(): if userinfo: # pragma: no cover raise Exception(f"More than one user with userid {user_id!s} found!") userinfo = self._get_user_from_mapped_object(r, attributes) except Exception as exx: # pragma: no cover log.error(f"Could not get the user information: {exx!r}") return userinfo
[docs] def get_available_info_keys(self) -> list[str]: """ This function returns a list of known privacyIDEA user attributes which can be used, e.g. for getUserList or get_user_info :return: list of possible keys for searching users """ info_keys = list(self.map.keys()) info_keys.append("id") # id is always added return info_keys
def _get_userid_filter(self, userId): column = self.TABLE.columns[self.map.get("userid")] if isinstance(column.type, String): return column == str(userId) elif isinstance(column.type, Integer): # since our user ID is usually a string we need to cast return column == int(userId) # otherwise we cast the column to string (in case of postgres UUIDs) return cast(column, String).like(userId)
[docs] def getUsername(self, userId): """ Returns the username/loginname for a given userid :param userId: The userid in this resolver :type userId: string :return: username :rtype: string """ info = self.get_user_info(userId) return info.get('username', "")
[docs] def getUserId(self, LoginName): """ resolve the loginname to the userid. :param LoginName: The login name from the credentials :type LoginName: string :return: UserId as found for the LoginName :rtype: str """ userid = "" try: conditions = [] column = self.map.get("username") conditions.append(self.TABLE.columns[column].like(LoginName)) conditions = self._append_where_filter(conditions, self.TABLE, self.where) filter_condition = and_(*conditions) result = self.session.execute(select(self.TABLE).filter(filter_condition)) for r in result.mappings(): if userid != "": # pragma: no cover raise Exception("More than one user with loginname" f" {LoginName} found!") user = self._get_user_from_mapped_object(r) userid = convert_column_to_unicode(user["userid"]) except Exception as exx: # pragma: no cover log.error(f"Could not get the user ID: {exx!r}") return userid
def _get_user_from_mapped_object(self, row: RowMapping, attributes: list[str] = None) -> dict: """ :param row: row :param attributes: list of attribute names to be returned for the user. If None or an empty list, all attributes are returned. :return: user info as dictionary """ user = {} try: if self.map.get("userid") in row: user["id"] = row[self.map.get("userid")] except UnicodeEncodeError: # pragma: no cover log.error(f"Failed to convert user: {row!r}") log.debug(f"{traceback.format_exc()!s}") for key in self.map.keys(): if attributes and key not in attributes: # only include the requested attributes continue try: raw_value = row.get(self.map.get(key)) if raw_value: if key == 'userid': val = convert_column_to_unicode(raw_value) elif isinstance(raw_value, bytes): val = raw_value.decode(self.encoding) else: val = raw_value user[key] = val except UnicodeDecodeError: # pragma: no cover user[key] = "decoding_error" log.error(f"Failed to convert user: {row!r}") log.debug(f"{traceback.format_exc()!s}") return user
[docs] def getUserList(self, search_dict: dict = None, attributes: list[str] = None) -> list[dict]: """ :param search_dict: A dictionary with search parameters :type search_dict: dict :param attributes: list of attributes to be returned for each user (id and userid are always returned). If None or an empty list, all attributes are returned. :return: list of users, where each user is a dictionary :raises ParameterError: when the search key does not exist in the mapping or database """ users = [] conditions = [] if search_dict is None: search_dict = {} # Check if all the search keys are available in the mapping unknown_search_keys = [x for x in search_dict.keys() if x not in self.map.keys()] if unknown_search_keys: log.error(f"Could not find search key ({unknown_search_keys}) in " f"the column mapping keys ({list(self.map.keys())}).") raise ParameterError(f"Search parameter ({unknown_search_keys}) not available " f"in column mapping.") for key, value in search_dict.items(): column = self.map.get(key) value = value.replace("*", "%") if column in self.TABLE.columns: conditions.append(self.TABLE.columns[column].like(value)) else: # This is a configuration error, the mapping does not correspond with the table definition log.error(f"Mapped column ('{column}') is not available in the database " f"table '{self.table}' ({list(self.TABLE.columns.keys())}).") raise ResolverError(f"Search parameter ({key}) not available in resolver.") conditions = self._append_where_filter(conditions, self.TABLE, self.where) if conditions: filter_condition = and_(*conditions) else: filter_condition = and_(True, *conditions) result = self.session.execute(select(self.TABLE). filter(filter_condition). limit(self.limit)) if attributes and "userid" not in attributes: attributes.append("userid") for r in result.mappings(): user = self._get_user_from_mapped_object(r, attributes) if "userid" in user: # Remove the "password" attribute user.pop("password", None) users.append(user) return users
[docs] def getResolverId(self): """ Returns the resolver Id This should be an Identifier of the resolver, preferable the type and the name of the resolver. :return: identifier of the resolver :rtype: str """ # Take the following parts, join them with the NULL byte and return # the hexlified SHA-1 digest id_parts = (self.connect_string, str(self.pool_size), str(self.pool_recycle), str(self.pool_timeout)) id_str = "\x00".join(id_parts) resolver_id = binascii.hexlify(hashlib.sha1(id_str.encode('utf8')).digest()) # nosec B324 # hash used as unique identifier return "sql." + resolver_id.decode('utf8')
[docs] @staticmethod def getResolverClassType(): return 'sqlresolver'
[docs] @staticmethod def getResolverType(): return IdResolver.getResolverClassType()
[docs] def loadConfig(self, config): """ Load the config from conf. :param config: The configuration from the Config Table :type config: dict """ self.server = config.get('Server', "") self.driver = config.get('Driver', "") self.database = config.get('Database', "") self.resolverId = self.database self.port = config.get('Port', "") self.limit = config.get('Limit', 100) self.user = config.get('User', "") self.password = config.get('Password', "") self.table = config.get('Table', "") self._editable = config.get("Editable", False) self.password_hash_type = config.get("Password_Hash_Type", "SSHA256") usermap = config.get('Map', {}) self.map = yaml.safe_load(usermap) self.reverse_map = {v: k for k, v in self.map.items()} self.where = config.get('Where', "") self.encoding = str(config.get('Encoding') or "latin1") self.conParams = config.get('conParams', "") self.pool_size = int(config.get('poolSize') or 5) self.pool_timeout = int(config.get('poolTimeout') or 10) # recycle SQL connections after 2 hours by default # (necessary for MySQL servers, which terminate idle connections after some hours) self.pool_recycle = int(config.get('poolRecycle') or 7200) # create the connect-string like params = {'Port': self.port, 'Password': self.password, 'conParams': self.conParams, 'Driver': self.driver, 'User': self.user, 'Server': self.server, 'Database': self.database} self.connect_string = self._create_connect_string(params) # get an engine from the engine registry, using self.getResolverId() as the key, # which involves the connect-string and the pool settings. self.engine = get_engine(self.getResolverId(), self._create_engine) # We use ``scoped_session``. self.session = scoped_session(sessionmaker(bind=self.engine))() # Session should be closed on teardown register_finalizer(self.session.close) self.session._model_changes = {} table_parts = self.table.split(".") schema = table_parts[0] if len(table_parts) > 1 else None self.table = table_parts[-1] log.debug(f"Loading table {self.table!s} from schema {schema!s}") self.TABLE = Table(self.table, MetaData(), autoload_with=self.engine, schema=schema) return self
def _create_engine(self): log.debug("using the connect string " f"{censor_connect_string(self.connect_string)!s}") log.debug(f"using pool_size={self.pool_size!s}, pool_timeout={self.pool_timeout!s}, " f"pool_recycle={self.pool_recycle!s}") try: engine = create_engine(self.connect_string, pool_size=self.pool_size, pool_recycle=self.pool_recycle, pool_timeout=self.pool_timeout) except TypeError: # The DB Engine/Poolclass might not support the pool_size. log.debug("connecting without pool_size.") engine = create_engine(self.connect_string) return engine
[docs] @classmethod def getResolverClassDescriptor(cls): descriptor = {} typ = cls.getResolverType() descriptor['clazz'] = "useridresolver.SQLIdResolver.IdResolver" descriptor['config'] = {'Server': 'string', 'Driver': 'string', 'Database': 'string', 'User': 'string', 'Password': 'password', 'Password_Hash_Type': 'string', 'Port': 'int', 'Limit': 'int', 'Table': 'string', 'Map': 'string', 'Where': 'string', 'Editable': 'int', 'poolTimeout': 'int', 'poolSize': 'int', 'poolRecycle': 'int', 'Encoding': 'string', 'conParams': 'string'} return {typ: descriptor}
[docs] @staticmethod def getResolverDescriptor(): return IdResolver.getResolverClassDescriptor()
@staticmethod def _create_connect_string(param): """ create the connect-string. Port, Password, conParams, Driver, User, Server, Database """ port = "" password = "" # nosec B105 # default parameter conParams = "" if param.get("Port"): port = ":{!s}".format(param.get("Port")) if param.get("Password"): password = ":{!s}".format(param.get("Password")) if param.get("conParams"): conParams = "?{!s}".format(param.get("conParams")) connect_string = "{!s}://{!s}{!s}{!s}{!s}{!s}/{!s}{!s}".format(param.get("Driver") or "", param.get("User") or "", password, "@" if (param.get("User") or password) else "", param.get("Server") or "", port, param.get("Database") or "", conParams) return connect_string
[docs] @classmethod def testconnection(cls, param): """ This function lets you test the to be saved SQL connection. :param param: A dictionary with all necessary parameter to test the connection. :type param: dict :return: Tuple of success and a description :rtype: (bool, string) Parameters are: Server, Driver, Database, User, Password, Port, Limit, Table, Map Where, Encoding, conParams """ num = -1 try: connect_string = cls._create_connect_string(param) log.info(f"using the connect string {censor_connect_string(connect_string)!s}") engine = create_engine(connect_string) # create a configured "Session" class session = scoped_session(sessionmaker(bind=engine))() except Exception as e: log.warning(f"Unable to connect to database: {e}") return -1, "Unable to connect to database." table_parts = param.get("Table").split(".") schema = table_parts[0] if len(table_parts) > 1 else None table_name = table_parts[-1] log.debug(f"Loading table {table_name!s} from schema {schema!s}") try: TABLE = Table(table_name, MetaData(), autoload_with=engine, schema=schema) conditions = cls._append_where_filter([], TABLE, param.get("Where")) if conditions: filter_condition = and_(*conditions) else: filter_condition = and_(True, *conditions) result = session.query(TABLE).filter(filter_condition).count() num = result desc = f"Found {num:d} users." except Exception as e: log.warning(f"Failed to retrieve users: {e!r}") desc = "Failed to retrieve users." finally: # We do not want any leftover DB connection, so we first need to close # the session such that the DB connection gets returned to the pool (it # is still open at that point!) and then dispose the engine such that the # checked-in connection gets closed. session.close() engine.dispose() return num, desc
[docs] def add_user(self, attributes: dict=None): """ Add a new user to the SQL database. attributes are these "username", "surname", "givenname", "email", "mobile", "phone", "password" :param attributes: Attributes according to the attribute mapping :return: The new UID of the user. The UserIdResolver needs to determine the way how to create the UID. """ attributes = attributes or {} # TODO: add try/except kwargs = self.prepare_attributes_for_db(attributes) log.info(f"Insert new user with attributes {kwargs!s}") r = self.session.execute(insert(self.TABLE).values(**kwargs)) self.session.commit() # Return the UID of the new object primary_key_dict = r.inserted_primary_key._asdict() return primary_key_dict[self.map.get("userid")]
[docs] def prepare_attributes_for_db(self, attributes): """ Given a dictionary of attributes, return a dictionary mapping columns to values. If the attributes contain a password, hash the password according to the configured password hash type. :param attributes: attributes dictionary :return: dictionary with column name as keys """ attributes = attributes.copy() if "password" in attributes: attributes["password"] = hash_password(attributes["password"], self.password_hash_type) columns = {} for fieldname in attributes: if fieldname in self.map: columns[self.map[fieldname]] = attributes[fieldname] return columns
[docs] def delete_user(self, uid): """ Delete a user from the SQL database. The user is referenced by the user id. :param uid: The uid of the user object, that should be deleted. :type uid: basestring :return: Returns True in case of success :rtype: bool """ res = True try: conditions = [self._get_userid_filter(uid)] conditions = self._append_where_filter(conditions, self.TABLE, self.where) filter_condition = and_(*conditions) self.session.execute(delete(self.TABLE).where(filter_condition)) self.session.commit() log.info(f'Deleted user with uid: {uid!s}') except Exception as exx: log.error(f"Error deleting user: {exx!s}") res = False return res
[docs] def update_user(self, uid, attributes=None): """ Update an existing user. This function is also used to update the password. Since the attribute mapping know, which field contains the password, this function can also take care for password changing. Attributes that are not contained in the attributes dict are not modified. :param uid: The uid of the user object in the resolver. :type uid: basestring :param attributes: Attributes to be updated. :type attributes: dict :return: True in case of success """ success = False attributes = attributes or {} try: params = self.prepare_attributes_for_db(attributes) filter_condition = self._get_userid_filter(uid) stmt = update(self.TABLE).filter(filter_condition).values(**params) result = self.session.execute(stmt) success = result.rowcount > 0 self.session.commit() log.info(f'Updated user attributes for user with uid {uid!s}') except Exception as exx: log.error(f'Error updating user attributes for user with uid {uid!s}: ' f'{exx!s}') log.debug(f'Error updating attributes {attributes!s}', exc_info=True) return success
@property def editable(self): """ Return true, if the instance of the resolver is configured editable :return: :rtype: bool """ # Depending on the database this might look different # Usually this is "1" return is_true(self._editable)
def hash_password(password, hashtype): """ Hash a password with phppass, SHA, SSHA, SSHA256, SSHA512, OTRS :param password: The password in plain text :type password: str :param hashtype: One of the hash types as string :type hashtype: str :return: The hashed password :rtype: str """ hashtype = hashtype.upper() try: password = pw_ctx.handler(hash_type_dict[hashtype]).hash(password) except KeyError as _e: # pragma: no cover raise Exception(f"Unsupported password hashtype '{hashtype!s}'. " f"Use one of {hash_type_dict.keys()!s}.") return password