# Source code for privacyidea.lib.token

# SPDX-FileCopyrightText: (C) 2014 NetKnights GmbH <https://netknights.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
#  privacyIDEA is a fork of LinOTP
#
#  2018-12-10 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add Base58
#  2018-01-21 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add tokenkind
#  2017-08-11 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add auth_cache
#  2017-04-19 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add support for multiple challenge response token
#  2016-08-31 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Reset failcounter of all user tokens.
#  2016-06-21 Cornelius Kölbel <cornelius@privacyidea.org>
#             Add next pin change response
#  2016-06-13 Cornelius Kölbel <cornelius@privacyidea.org>
#             Add otp length to detail response
#  2015-10-14 Cornelius Kölbel <cornelius@privacyidea.org>
#             Add timelimit to user auth.
#  2015-08-31 Cornelius Kölbel <cornelius@privacyidea.org>
#             Add check_realm_pass for 4-eyes policy
#  2015-03-20 Cornelius Kölbel, <cornelius@privacyidea.org>
#             Add policy decorator for encryption
#  2015-03-15 Cornelius Kölbel, <cornelius@privacyidea.org>
#             Add policy decorator for lost_token password
#  2014-12-08 Cornelius Kölbel, <cornelius@privacyidea.org>
#             Rewrite the module for operation with flask
#             assure >95% code coverage
#  2014-07-02 Cornelius Kölbel, <cornelius@privacyidea.org>
#             remove references to machines, when a token is deleted
#  2014-05-08 Cornelius Kölbel, <cornelius@privacyidea.org>
#
#  License:  AGPLv3
#  contact:  http://www.privacyidea.org
#
#  Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#  License:  AGPLv3
#  contact:  http://www.linotp.org
#            http://www.lsexperts.de
#            linotp@lsexperts.de
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
This module contains all top level token functions.
It depends on the models, lib.user and lib.tokenclass (which depends on the
tokenclass implementations like lib.tokens.hotptoken)

This is the middleware/glue between the HTTP API and the database
"""
import datetime
import logging
import os
import random
import string
import traceback
from collections import defaultdict
from dataclasses import dataclass
from typing import Union

from dateutil.tz import tzlocal
from flask import Request
from flask_sqlalchemy.session import Session
from sqlalchemy import and_, func, or_, select
from sqlalchemy.sql import Select
from sqlalchemy.sql.expression import delete

from privacyidea.api.lib.utils import send_result
from privacyidea.lib import _
from privacyidea.lib.challengeresponsedecorators import (generic_challenge_response_reset_pin,
                                                         generic_challenge_response_resync)
from privacyidea.lib.config import (get_token_class, get_token_prefix,
                                    get_token_types, get_from_config,
                                    get_inc_fail_count_on_false_pin, SYSCONF,
                                    get_enrollable_token_types)
from privacyidea.lib.crypto import generate_password
from privacyidea.lib.decorators import (check_user_or_serial,
                                        check_copy_serials)
from privacyidea.lib.error import (TokenAdminError,
                                   ParameterError,
                                   PrivacyIDEAError, ResourceNotFoundError, PolicyError, UserError)
from privacyidea.lib.framework import get_app_config_value
from privacyidea.lib.log import log_with
from privacyidea.lib.policies.actions import PolicyAction
from privacyidea.lib.policydecorators import (libpolicy,
                                              auth_user_does_not_exist,
                                              auth_user_has_no_token,
                                              auth_user_passthru,
                                              auth_user_timelimit,
                                              auth_lastauth,
                                              auth_cache,
                                              config_lost_token,
                                              reset_all_user_tokens, force_challenge_response)
from privacyidea.lib.realm import realm_is_defined, get_realms
from privacyidea.lib.resolver import get_resolver_object
from privacyidea.lib.tokenclass import DATE_FORMAT, Tokenkind, TokenClass
from privacyidea.lib.user import User
from privacyidea.lib.utils import (is_true, BASE58, hexlify_and_unicode, check_serial_valid, create_tag_dict,
                                   redacted_phone_number, redacted_email)
from privacyidea.models import (db, Token, Realm, TokenRealm, Challenge,
                                TokenInfo, TokenOwner, TokenTokengroup, Tokengroup, TokenContainer,
                                TokenContainerToken)
from privacyidea.models.utils import clob_to_varchar

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# NOTE(review): these two flags are not referenced in the visible part of this
# file; presumably legacy markers for optional/required parameters - confirm
# before relying on them (note that ``required`` is False).
optional = True
required = False

# Name of the character encoding used by this module.
ENCODING = "utf-8"

# Configuration to generate a complete random serial
# (presumably the name of the app config key, hence the bandit exemption
# for a "hardcoded password string" - confirm)
PI_TOKEN_SERIAL_RANDOM = "PI_TOKEN_SERIAL_RANDOM"  # nosec B105

# 32-symbol alphabet: the upper-case letters A-Z followed by the digits 2-7.
B32_ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567'


@dataclass(frozen=True)
class TokenImportResult:
    """
    Immutable summary of a token import, split into three lists of strings.

    NOTE(review): the entries are presumably token serials - confirm against
    the code that creates this result.
    """
    # Tokens that were imported successfully
    successful_tokens: list[str]
    # Tokens that were updated by the import
    updated_tokens: list[str]
    # Tokens for which the import failed
    failed_tokens: list[str]
@dataclass(frozen=True)
class TokenExportResult:
    """
    Immutable summary of a token export.
    """
    # The serialized tokens for which the export succeeded
    successful_tokens: list[str]
    # The serials of the tokens for which the export failed
    failed_tokens: list[str]
@log_with(log)
def create_tokenclass_object(db_token):
    """
    (was createTokenClassObject)
    Create a token class object for the given database token.

    The token class is looked up by the (lower-cased) token type stored in the
    database. If no token class exists for this type, an error is logged and
    None is returned.

    :param db_token: the database referenced token
    :type db_token: database token object
    :return: instance of the token class object or None
    :rtype: tokenclass object
    :raises TokenAdminError: if the token class could not be instantiated
    """
    # We use the tokentype from the database
    tokentype = db_token.tokentype.lower()
    token_class = get_token_class(tokentype)
    if not token_class:
        log.error('type {0!r} not found in tokenclasses'.format(tokentype))
        return None

    try:
        return token_class(db_token)
    except Exception as e:  # pragma: no cover
        # Chain the original exception explicitly, so that the root cause is
        # reported as the direct cause of the TokenAdminError.
        raise TokenAdminError(_("create_tokenclass_object failed: {0!r}").format(e), id=1609) from e
def _create_token_query(tokentype=None, token_type_list=None, realm=None, assigned=None, user=None,
                        serial_exact=None, serial_wildcard=None, serial_list=None, active=None,
                        resolver=None, rollout_state=None, description=None, revoked=None,
                        locked=None, userid=None, tokeninfo=None, maxfail=None,
                        allowed_realms=None, container_serial=None, all_nodes=False) -> Select:
    """
    Build the ``select(Token)`` statement for the given filter criteria.

    All given filters are combined with AND. For the string filters that
    support wildcards, "*" is translated to the SQL wildcard "%"; a value that
    consists only of "*" is treated as "no filter".

    Side effect: all objects in the current database session are expired
    before the statement is built.

    :param tokentype: token type, may contain "*"
    :param token_type_list: list of token types; a token must have one of them
    :param realm: realm name (case-insensitive), may contain "*"
    :param assigned: True for tokens with an owner, False for tokens without
        an owner, None for both
    :param user: User object; filters for the tokens owned by this user
    :param serial_exact: exact serial number
    :param serial_wildcard: serial number pattern, may contain "*"
    :param serial_list: list of exact serial numbers
    :param active: filter on the ``active`` flag of the token
    :param resolver: resolver name of the token owner, may contain "*"
    :param rollout_state: rollout state (case-insensitive), may contain "*"
    :param description: token description (case-insensitive), may contain "*"
    :param revoked: filter on the ``revoked`` flag of the token
    :param locked: filter on the ``locked`` flag of the token
    :param userid: user id of the token owner, may contain "*"
    :param tokeninfo: dictionary with exactly one tokeninfo key/value pair
    :param maxfail: True for tokens whose fail counter reached ``maxfail``,
        False for tokens whose fail counter is below ``maxfail``
    :param allowed_realms: list of realm names (case-insensitive); if not
        None, only tokens in one of these realms are matched. If ``realm`` is
        given as well, it must be one of the allowed realms, otherwise nothing
        is matched.
    :param container_serial: serial of a container (case-insensitive); an
        empty value matches the tokens that are in no container
    :param all_nodes: if False, tokens are additionally restricted by the
        node specific realm/resolver configuration of the local node
    :return: the select statement
    :raises UserError: if ``user`` has a login name but no resolver
    :raises ResourceNotFoundError: if the realm of ``user`` does not exist
    :raises PrivacyIDEAError: if ``tokeninfo`` does not contain exactly one
        entry
    """
    session = db.session
    session.expire_all()
    sql_query = select(Token)

    # Conditional Joins at the top to avoid re-joining
    should_join_token_realm = (bool(realm and realm.strip("*")) or
                               allowed_realms is not None)
    should_join_token_owner = (bool(userid and userid.strip("*")) or
                               bool(resolver and resolver.strip("*")) or
                               bool(user) or
                               assigned is not None or
                               not all_nodes)

    if should_join_token_realm:
        sql_query = sql_query.outerjoin(TokenRealm, TokenRealm.token_id == Token.id)
    if should_join_token_owner:
        sql_query = sql_query.outerjoin(TokenOwner, Token.id == TokenOwner.token_id)

    # Filtering by realm and allowed_realms.
    # Both conditions are applied to the same joined TokenRealm row, so if
    # both are given, a token only matches via a realm that satisfies the
    # realm filter *and* is one of the allowed realms.
    if realm and realm.strip("*"):
        if "*" in realm:
            sql_query = sql_query.where(
                TokenRealm.realm_id.in_(
                    select(Realm.id).where(
                        func.lower(Realm.name).like(realm.lower().replace("*", "%")))
                )
            )
        else:
            sql_query = sql_query.where(
                TokenRealm.realm_id == select(Realm.id).where(
                    func.lower(Realm.name) == realm.lower()).scalar_subquery())

    if allowed_realms is not None:
        # The realm name column is lower-cased, so the values need to be
        # lower-cased as well.
        sql_query = sql_query.where(
            TokenRealm.realm_id.in_(
                select(Realm.id).where(
                    func.lower(Realm.name).in_([r.lower() for r in allowed_realms]))
            )
        )

    # Filtering by tokentype
    if tokentype and tokentype.strip("*"):
        if "*" in tokentype:
            sql_query = sql_query.where(
                Token.tokentype.like(tokentype.lower().replace("*", "%")))
        else:
            sql_query = sql_query.where(
                func.lower(Token.tokentype) == tokentype.lower())

    # Filtering by token_type_list
    if token_type_list:
        sql_query = sql_query.where(
            Token.tokentype.in_([t.lower() for t in token_type_list]))

    # Filtering by description
    if description and description.strip("*"):
        if "*" in description:
            sql_query = sql_query.where(
                func.lower(Token.description).like(description.lower().replace("*", "%")))
        else:
            sql_query = sql_query.where(
                func.lower(Token.description) == description.lower())

    # Filtering by assigned status
    if assigned is not None:
        if assigned:
            sql_query = sql_query.where(TokenOwner.id.is_not(None))
        else:
            sql_query = sql_query.where(TokenOwner.id.is_(None))

    # Filtering by the resolver of the token owner
    if resolver and resolver.strip("*"):
        if "*" in resolver:
            sql_query = sql_query.where(
                TokenOwner.resolver.like(resolver.replace("*", "%")))
        else:
            sql_query = sql_query.where(TokenOwner.resolver == resolver)

    # Filtering by the user id of the token owner
    if userid and userid.strip("*"):
        if "*" in userid:
            sql_query = sql_query.where(
                TokenOwner.user_id.like(userid.replace("*", "%")))
        else:
            sql_query = sql_query.where(TokenOwner.user_id == userid)

    # Filtering by serial
    if serial_wildcard and serial_wildcard.strip("*"):
        sql_query = sql_query.where(
            Token.serial.like(serial_wildcard.replace("*", "%")))
    if serial_exact:
        sql_query = sql_query.where(Token.serial == serial_exact)
    if serial_list:
        sql_query = sql_query.where(Token.serial.in_(serial_list))

    # Filtering by user object
    if user and not user.is_empty():
        if user.login and not user.resolver:
            # A specific username was requested but could not be found in any
            # resolver. Raise the user error here instead of in the user
            # class. The condition is the same.
            raise UserError("The user can not be found in any resolver in this realm!")

        if user.realm:
            # The realm is resolved right away, so that an unknown realm can
            # be reported as an error.
            realm_db_result = session.execute(
                select(Realm).where(func.lower(Realm.name) == user.realm.lower())
            ).scalars().first()
            if not realm_db_result:
                raise ResourceNotFoundError(f"Realm '{user.realm}' does not exist.")
            sql_query = sql_query.where(TokenOwner.realm_id == realm_db_result.id)

        if user.resolver:
            sql_query = sql_query.where(TokenOwner.resolver == user.resolver)

        (uid, _rtype, _resolver) = user.get_user_identifiers()
        if uid:
            # The user id is compared as a string
            uid_str = str(uid) if isinstance(uid, int) else uid
            sql_query = sql_query.where(TokenOwner.user_id == uid_str)

    # Filtering by token status flags
    if active is not None:
        sql_query = sql_query.where(Token.active == active)
    if revoked is not None:
        sql_query = sql_query.where(Token.revoked == revoked)
    if locked is not None:
        sql_query = sql_query.where(Token.locked == locked)
    if maxfail is not None:
        if maxfail:
            sql_query = sql_query.where(Token.failcount >= Token.maxfail)
        else:
            sql_query = sql_query.where(Token.failcount < Token.maxfail)

    # Filtering by rollout state
    if rollout_state and rollout_state.strip("*"):
        if "*" in rollout_state:
            sql_query = sql_query.where(
                func.lower(Token.rollout_state).like(rollout_state.lower().replace("*", "%")))
        else:
            sql_query = sql_query.where(
                func.lower(Token.rollout_state) == rollout_state.lower())

    # Filtering by tokeninfo
    if tokeninfo is not None:
        if len(tokeninfo) != 1:
            raise PrivacyIDEAError(_("I can only create SQL filters from tokeninfo of length 1."))
        key, value = list(tokeninfo.items())[0]
        sql_query = sql_query.join(TokenInfo, TokenInfo.token_id == Token.id)
        sql_query = sql_query.where(TokenInfo.Key == key)
        sql_query = sql_query.where(clob_to_varchar(TokenInfo.Value) == value)

    # Filtering by container_serial
    if container_serial is not None:
        if not container_serial:
            # An empty container serial matches the tokens without container
            sql_query = sql_query.outerjoin(
                TokenContainerToken, TokenContainerToken.token_id == Token.id
            ).where(TokenContainerToken.container_id.is_(None))
        else:
            subquery = select(TokenContainerToken.token_id).join(
                TokenContainer, TokenContainer.id == TokenContainerToken.container_id
            ).where(
                func.upper(TokenContainer.serial) == container_serial.upper()
            )
            sql_query = sql_query.where(Token.id.in_(subquery))

    # Node-specific resolver and realm configuration.
    if not all_nodes:
        local_node_uuid = get_app_config_value("PI_NODE_UUID")
        realms = get_realms()
        # Resolvers that are either not bound to a node or bound to this node
        resolvers = []
        # Realms that do not contribute any resolver for this node
        realms_to_filter = []
        for realm_name, realm_data in realms.items():
            added = False
            for res in realm_data.get("resolver", []):
                if res.get("name"):
                    if not res.get("node") or res["node"] == local_node_uuid:
                        resolvers.append(res["name"])
                        added = True
            if not added:
                realms_to_filter.append(realm_name)

        # Unassigned tokens always pass, assigned tokens only if the resolver
        # of the owner is available on this node
        resolver_filter = or_(
            TokenOwner.id.is_(None),
            TokenOwner.resolver.in_(resolvers),
        )

        # Join the realm of the token owner and explicitly include the join
        # conditions in the filter to handle unassigned tokens
        sql_query = sql_query.outerjoin(Realm, TokenOwner.realm_id == Realm.id)
        realm_filter = or_(
            TokenOwner.realm_id.is_(None),
            and_(
                func.lower(Realm.name).not_in([r.lower() for r in realms_to_filter]),
                TokenOwner.realm_id == Realm.id,
                TokenOwner.token_id == Token.id,
            )
        )

        sql_query = sql_query.where(and_(resolver_filter, realm_filter))

    return sql_query
def get_tokens_paginated_generator(tokentype=None, realm=None, assigned=None, user=None,
                                   serial_wildcard=None, active=None, resolver=None,
                                   rollout_state=None, revoked=None, locked=None,
                                   tokeninfo=None, maxfail=None, psize=1000):
    """
    Fetch chunks of ``psize`` tokens that match the filter criteria from the
    database and generate lists of token objects.
    See ``get_tokens`` for information on the arguments.

    The chunks are fetched ordered by the database id of the token. Each chunk
    continues after the highest id of the previous chunk.

    Note that individual lists may contain less than ``psize`` elements if
    a token entry has an invalid type or if the database returned several rows
    for the same token. Chunks without any valid token object are skipped.

    :param psize: Maximum size of chunks that are fetched from the database
    :param assigned: Whether the token is assigned to a user
    :type assigned: bool or None
    :return: This is a generator that generates non-empty lists of token objects.
    """
    session = db.session
    main_sql_query = _create_token_query(
        tokentype=tokentype, realm=realm, assigned=assigned, user=user,
        serial_wildcard=serial_wildcard, active=active, resolver=resolver,
        rollout_state=rollout_state, revoked=revoked, locked=locked,
        tokeninfo=tokeninfo, maxfail=maxfail
    ).order_by(Token.id)

    last_id = None
    while True:
        sql_query = main_sql_query
        if last_id is not None:
            sql_query = sql_query.where(Token.id > last_id)
        tokens = session.scalars(sql_query.limit(psize)).unique().all()
        # Only an empty result ends the iteration. The limit is applied to
        # the rows before duplicates are removed, so a chunk with less than
        # ``psize`` tokens does not prove that there are no further tokens.
        if not tokens:
            break

        token_objects = []
        for token in tokens:
            token_obj = create_tokenclass_object(token)
            if isinstance(token_obj, TokenClass):
                token_objects.append(token_obj)
        # Do not hand out empty lists (all tokens of the chunk were invalid)
        if token_objects:
            yield token_objects

        last_id = tokens[-1].id
def convert_token_objects_to_dicts(tokens, user, user_role="user", allowed_realms=None,
                                   hidden_token_info=None):
    """
    Turn token objects into dictionaries, enriched with owner and container
    information.

    Entries of ``tokens`` that are no token objects are skipped. If the
    requesting user is not allowed to see a token, its dictionary is reduced
    to the serial of the token:

    * a non-admin only sees the full information of tokens whose owner has
      the same login name and realm,
    * an admin with ``allowed_realms`` only sees the full information of
      tokens that are in at least one of these realms.

    :param tokens: A list of token objects
    :type tokens: list
    :param user: The user object performing the request
    :type user: User object
    :param user_role: The role of the logged-in user
    :type user_role: str
    :param allowed_realms: A list of the realms the admin is allowed to see,
        None if the admin is allowed to see all realms
    :param hidden_token_info: List of token-info keys to remove from the results
    :return: A list of dictionaries
    :rtype: list
    """
    result = []
    for token in tokens:
        if not isinstance(token, TokenClass):
            continue

        entry = token.get_as_dict()

        # Owner information. The LDAP or SQL server behind the resolver might
        # not be reachable, in this case an exception is raised.
        entry["username"] = ""
        entry["user_realm"] = ""
        try:
            owner = token.user
            if owner:
                entry["username"] = owner.login
                entry["user_realm"] = owner.realm
                entry["user_editable"] = get_resolver_object(owner.resolver).editable
        except Exception as exx:
            log.error("User information can not be retrieved: {0!s}".format(exx))
            log.debug(traceback.format_exc())
            entry["username"] = "**resolver error**"

        # Strip the token-info keys that must not be shown
        if hidden_token_info:
            info = entry['info']
            for hidden_key in [key for key in info if key in hidden_token_info]:
                info.pop(hidden_key)

        # Container membership. The import is done here and not at module
        # level (presumably to avoid a circular import).
        entry["container_serial"] = ""
        from privacyidea.lib.container import find_container_for_token
        container = find_container_for_token(token.get_serial())
        if container:
            entry["container_serial"] = container.serial

        serial_only = {"serial": entry["serial"]}
        if user_role != "admin":
            # Reduce token info if the user is not the owner
            if not user or user.login != entry["username"] or user.realm != entry["user_realm"]:
                entry = serial_only
        elif allowed_realms is not None:
            # The token is in no realm the admin is allowed to see
            if set(entry["realms"]).isdisjoint(allowed_realms):
                entry = serial_only

        result.append(entry)

    return result
@log_with(log)
# @cache.memoize(10)
def get_tokens(tokentype=None, token_type_list=None, realm=None, assigned=None, user=None,
               serial=None, serial_wildcard=None, active=None, resolver=None,
               rollout_state=None, count=False, revoked=None, locked=None,
               tokeninfo=None, maxfail=None, all_nodes=False):
    """
    (was getTokensOfType)
    Return the token objects (or their number) that match all given filters,
    e.g. all assigned tokens of type totp.

    :param tokentype: The type of the token. If None, all tokens are returned.
    :type tokentype: basestring
    :param token_type_list: A list of token types. If None or empty, all token
        types are returned.
    :type token_type_list: list
    :param realm: get tokens of a realm. If None, all tokens are returned.
    :type realm: basestring
    :param assigned: Get either assigned (True) or unassigned (False) tokens.
        If None, gets all tokens.
    :type assigned: bool
    :param user: Filter for the Owner of the token
    :type user: User Object
    :param serial: The exact serial number of a token. A comma separated list
        of serials (without "*") is treated as a list of exact serials; blanks
        in this list are removed.
    :type serial: basestring
    :param serial_wildcard: A wildcard to match token serials
    :type serial_wildcard: basestring
    :param active: Whether only active (True) or inactive (False) tokens
        should be returned
    :type active: bool
    :param resolver: filter for the given resolver name
    :type resolver: basestring
    :param rollout_state: returns a list of the tokens in the certain rollout
        state. Some tokens are not enrolled in a single step but in multiple
        steps. These tokens are then identified by the DB-column rollout_state.
    :param count: If set to True, only the number of the result and not the
        list is returned.
    :type count: bool
    :param revoked: Only search for revoked tokens or only for not revoked
        tokens
    :type revoked: bool
    :param locked: Only search for locked tokens or only for not locked tokens
    :type locked: bool
    :param tokeninfo: Return tokens with the given tokeninfo. The tokeninfo
        is a key/value dictionary
    :type tokeninfo: dict
    :param maxfail: If only tokens should be returned, which failcounter
        reached maxfail
    :param all_nodes: If True, ignore node specific realm configurations
        (default: False)
    :type all_nodes: bool
    :return: A list of lib.tokenclass objects or, if ``count`` is set, the
        number of matching database rows. Database tokens for which no token
        object can be created are left out of the list.
    :rtype: list or int
    """
    # A comma separated value without wildcard is a list of exact serials
    serial_list = None
    if serial and "," in serial and "*" not in serial:
        serial_list = serial.replace(" ", "").split(",")
        serial = None

    query = _create_token_query(tokentype=tokentype, token_type_list=token_type_list,
                                realm=realm, assigned=assigned, user=user,
                                serial_exact=serial, serial_wildcard=serial_wildcard,
                                serial_list=serial_list, active=active, resolver=resolver,
                                rollout_state=rollout_state, revoked=revoked, locked=locked,
                                tokeninfo=tokeninfo, maxfail=maxfail, all_nodes=all_nodes)

    # Hint for unintentional exact serial matches
    if serial is not None and "*" in serial:
        log.info("Exact match on a serial containing a wildcard: {!r}".format(serial))

    # Hint for unintentional wildcard serial matches
    if serial_wildcard is not None and "*" not in serial_wildcard:
        log.info("Wildcard match on serial without a wildcard: {!r}".format(serial_wildcard))

    session: Session = db.session
    if count:
        count_query = select(func.count()).select_from(query.subquery())
        return session.execute(count_query).scalar_one()

    db_tokens = session.execute(query).unique().scalars().all()
    candidates = (create_tokenclass_object(db_token) for db_token in db_tokens)
    return [candidate for candidate in candidates if isinstance(candidate, TokenClass)]
@log_with(log)
def get_tokens_paginate(tokentype=None, token_type_list=None, realm=None, assigned=None, user=None,
                        serial=None, active=None, resolver=None, rollout_state=None,
                        sortby=Token.serial, sortdir="asc", psize=15, page=1, description=None,
                        userid=None, allowed_realms=None, tokeninfo=None, hidden_tokeninfo=None,
                        container_serial=None):
    """
    This function is used to retrieve a token list, that can be displayed in
    the Web UI. It supports pagination.
    Each retrieved page will also contain a "next" and a "prev", indicating
    the next or previous page. If either does not exist, it is None.

    :param tokentype: The type of the token, see ``get_tokens``
    :param token_type_list: A list of token types
    :param realm: A realm the token is assigned to (if allowed_realms is not
        None, it must contain this realm, otherwise no matching tokens will be
        found)
    :param assigned: Returns assigned (True) or not assigned (False) tokens
    :type assigned: bool
    :param user: The user, whose token should be displayed
    :type user: User object
    :param serial: a pattern for matching the serial or a comma separated list
        of exact serials
    :param active: Returns active (True) or inactive (False) tokens
    :param resolver: A resolver name, which may contain "*" for filtering.
    :type resolver: basestring
    :param userid: A userid, which may contain "*" for filtering.
    :type userid: basestring
    :param rollout_state: The rollout state of the token, see ``get_tokens``
    :param sortby: Sort by a certain Token DB field. The default is
        Token.serial. If a string like "serial" is provided, we try to convert
        it to the DB column. An unknown column name falls back to the serial.
    :type sortby: A Token column or a string.
    :param sortdir: Can be "asc" (default) or "desc"
    :type sortdir: basestring
    :param psize: The size of the page
    :type psize: int
    :param page: The number of the page to view. Starts with 1 ;-)
    :type page: int
    :param allowed_realms: A list of realms, that the admin is allowed to see
    :type allowed_realms: list
    :param tokeninfo: Return tokens with the given tokeninfo. The tokeninfo
        is a key/value dictionary
    :param description: Take the description of the token into the query
    :type description: str
    :param hidden_tokeninfo: List of token-info keys to remove from the results
    :type hidden_tokeninfo: list
    :param container_serial: The serial number of a container
    :type container_serial: basestring
    :return: dict with tokens, prev, next and count
    :rtype: dict
    """
    # Imported here and not at module level (presumably to avoid a circular
    # import); once per call instead of once per token.
    from privacyidea.lib.container import find_container_for_token

    # A comma separated value without wildcard is a list of exact serials
    serial_list = None
    if serial and "*" not in serial and "," in serial:
        serial_list = serial.replace(" ", "").split(",")
        serial = None

    session: Session = db.session
    # NOTE(review): presumably ends the current transaction, so that the list
    # is read from a fresh one - confirm.
    session.commit()
    sql_query: Select = _create_token_query(tokentype=tokentype, token_type_list=token_type_list,
                                            realm=realm, assigned=assigned, user=user,
                                            serial_wildcard=serial, serial_list=serial_list,
                                            active=active, resolver=resolver, tokeninfo=tokeninfo,
                                            rollout_state=rollout_state, description=description,
                                            userid=userid, allowed_realms=allowed_realms,
                                            container_serial=container_serial)

    # Get the total count from the query without ordering and limit/offset.
    # The ordering has no influence on the number of rows.
    total_count = session.execute(
        select(func.count()).select_from(sql_query.subquery())
    ).scalar_one()

    if isinstance(sortby, str):
        # Convert the column name to the column of the token table
        cols = Token.__table__.columns
        if sortby in cols:
            sortby = cols.get(sortby)
        else:
            log.warning(f'Unknown sort column "{sortby}". Using "serial" instead.')
            sortby = Token.serial

    if sortdir == "desc":
        sql_query = sql_query.order_by(sortby.desc())
    else:
        sql_query = sql_query.order_by(sortby.asc())

    # Now apply the limit and offset for the current page
    offset = (page - 1) * psize
    tokens = session.scalars(sql_query.limit(psize).offset(offset)).unique().all()

    token_list = []
    for db_token in tokens:
        # TODO first creating the object and then converting it to a dict, probably not efficient
        token = create_tokenclass_object(db_token)
        if not isinstance(token, TokenClass):
            continue

        token_dict = token.get_as_dict()
        # add user information
        # In certain cases the LDAP or SQL server might not be reachable.
        # Then an exception is raised
        token_dict["username"] = ""
        token_dict["user_realm"] = ""
        try:
            # Use a dedicated name to not shadow the ``user`` filter argument
            token_owner = token.user
            if token_owner:
                token_dict["username"] = token_owner.login
                token_dict["user_realm"] = token_owner.realm
                token_dict["user_editable"] = get_resolver_object(token_owner.resolver).editable
        except Exception as ex:
            log.error(f"User information can not be retrieved: {ex!r}")
            log.debug(traceback.format_exc())
            token_dict["username"] = "**resolver error**"

        if hidden_tokeninfo:
            # Iterate over a copy of the keys, since entries are removed
            for key in list(token_dict['info']):
                if key in hidden_tokeninfo:
                    token_dict['info'].pop(key)

        # check if token is in a container
        token_dict["container_serial"] = ""
        container = find_container_for_token(token.get_serial())
        if container:
            token_dict["container_serial"] = container.serial

        token_list.append(token_dict)

    previous_page = page - 1 if page > 1 else None
    next_page = page + 1 if offset + psize < total_count else None

    return {
        "tokens": token_list,
        "prev": previous_page,
        "next": next_page,
        "count": total_count
    }
def get_one_token(*args, silent_fail=False, **kwargs):
    """
    Return the single token that matches the given filter arguments. The
    arguments are handed over to ``get_tokens``.

    :param silent_fail: If set, None is returned instead of raising an
        exception if no token or more than one token was found
    :returns: Token object
    :rtype: privacyidea.lib.tokenclass.TokenClass
    :raises ResourceNotFoundError: if no token was found
    :raises ParameterError: if more than one token was found
    """
    matches = get_tokens(*args, **kwargs)

    if not matches:
        if silent_fail:
            return None
        raise ResourceNotFoundError(_("The requested token could not be found."))

    if len(matches) > 1:
        if silent_fail:
            log.warning("More than one matching token was found.")
            return None
        raise ParameterError(_("More than one matching token was found."))

    return matches[0]
def get_tokens_from_serial_or_user(serial, user, **kwargs):
    """
    Fetch the token with the given (exact) serial or, if no serial is given,
    all tokens that match the given user.

    With a serial exactly one token has to match (see ``get_one_token`` for
    the exceptions that are raised otherwise). Without a serial the result
    may be empty.

    :param serial: exact serial number or None
    :param user: a user object or None
    :param kwargs: additional arguments to ``get_tokens``
    :return: a (possibly empty) list of tokens
    :rtype: list
    """
    if not serial:
        return get_tokens(serial=serial, user=user, **kwargs)
    single_token = get_one_token(serial=serial, user=user, **kwargs)
    return [single_token]
@log_with(log)
def get_token_type(serial):
    """
    Returns the tokentype of a given serial number. If the token does
    not exist or can not be determined, an empty string is returned.

    No serial at all and a serial with a wildcard can not identify a token,
    so in these cases an empty string is returned without a database lookup.

    :param serial: the serial number of the to be searched token
    :type serial: string
    :return: tokentype
    :rtype: string
    """
    # Without this check an empty serial would not filter at all and the
    # lookup could return the type of an unrelated token.
    if not serial or "*" in serial:
        return ""

    try:
        return get_one_token(serial=serial).type
    except ResourceNotFoundError:
        return ""
@log_with(log)
def check_serial(serial):
    """
    This checks, if the given serial number can be used for a new token.
    it returns a tuple (result, new_serial)
    result being True if the serial does not exist, yet.
    new_serial is a suggestion for a new serial number, that does not
    exist, yet. The suggestions are built by appending a two-digit counter
    ("_01", "_02", ...) to the given serial.

    Only the number of matching tokens is queried, no token objects are
    created. This way a serial is also regarded as taken, if its token has
    a type for which no token object can be created.

    :param serial: Serial number to check if it can be used for a new token.
    :type serial: str
    :result: result of check and (new) serial number
    :rtype: tuple(bool, str)
    """
    # serial does not exist, yet
    result = True
    new_serial = serial

    i = 0
    while get_tokens(serial=new_serial, count=True):
        # as long as we find a token, modify the serial:
        i += 1
        result = False
        new_serial = "{0!s}_{1:02d}".format(serial, i)

    return result, new_serial
@log_with(log)
def get_num_tokens_in_realm(realm, active=True):
    """
    Count the tokens of one realm via ``get_tokens`` with ``count=True``.

    :param realm: The name of the realm
    :type realm: basestring
    :param active: The value for the ``active`` filter of ``get_tokens``
        (default: only active tokens are counted)
    :type active: bool
    :return: The number of tokens in the realm
    :rtype: int
    """
    number_of_tokens = get_tokens(realm=realm, active=active, count=True)
    return number_of_tokens
@log_with(log)
def get_realms_of_token(serial, only_first_realm=False):
    """
    This function returns a list of the realms of a token

    If the token can not be identified (no serial, a serial containing a
    wildcard or an unknown serial), the token is regarded as being in no
    realm.

    :param serial: the exact serial number of the token
    :type serial: basestring
    :param only_first_realm: Whether we should only return the first realm
    :type only_first_realm: bool
    :return: list of the realm names. If ``only_first_realm`` is set, the
        name of the first realm or None, if there is no realm.
    :rtype: list, or str/None if ``only_first_realm`` is set
    """
    realms = []
    # An empty serial would not filter at all and a wildcard is not an exact
    # serial, so both can not identify a token.
    if serial and "*" not in serial:
        try:
            token = get_one_token(serial=serial)
            realms = token.get_realms()
        except ResourceNotFoundError:
            realms = []

    if len(realms) > 1:
        log.debug(f"Token {serial} in more than one realm: {realms}")

    if only_first_realm:
        return realms[0] if realms else None

    return realms
@log_with(log)
def token_exist(serial):
    """
    Check whether a token with exactly the given serial number exists.

    :param serial: the serial number of the token
    :return: True if at least one token matches, otherwise False. Without
        a serial the result is always False.
    """
    if not serial:
        # If we have no serial we return false anyway!
        return False
    return get_tokens(serial=serial, count=True) > 0
@log_with(log)
def get_token_owner(serial):
    """
    Return the user object, to which the token with the given serial number
    is assigned. If the token has no owner, None is returned.

    The token is fetched with ``get_one_token``, which raises
    ``ResourceNotFoundError`` if the token could not be found.

    Wildcards in the serial number are ignored.

    :param serial: serial number of the token
    :type serial: basestring
    :return: The owner of the token
    :rtype: User object or None
    """
    return get_one_token(serial=serial).user
@log_with(log)
def is_token_owner(serial, user):
    """
    Check whether the token with the given serial number is assigned to the
    given user. A token without owner belongs to nobody.

    :param serial: The serial number of the token
    :type serial: str
    :param user: The user that needs to be checked
    :type user: User object
    :return: Return True or False
    :rtype: bool
    """
    owner = get_token_owner(serial)
    if owner is None:
        return False
    return owner == user
@log_with(log)
def get_tokens_in_resolver(resolver):
    """
    Return the token objects that contain this very resolver.

    :param resolver: The resolver, the tokens should be in
    :type resolver: str
    :return: list of tokens with this resolver
    :rtype: list of token objects
    """
    return get_tokens(resolver=resolver)
@log_with(log)
def get_tokenclass_info(tokentype, section=None):
    """
    Return the config definition of a dynamic token.

    :param tokentype: the tokentype of the token like "totp" or "hotp"
    :type tokentype: str
    :param section: subsection of the token definition - optional
    :type section: str
    :return: dictionary with the configuration definition of the token. If
        the token type is not found, an empty dictionary is returned
    :rtype: dict
    """
    token_class = get_token_class(tokentype)
    if not token_class:
        # Unknown token type
        return {}
    return token_class.get_class_info(section)
@log_with(log)
def get_otp(serial, current_time=None):
    """
    Return the current OTP value for a given token.

    The call is delegated to the ``get_otp`` method of the token object, so
    the token type needs to support this function. According to the token
    class contract a result of -2 signals that the token type does not
    support getting the OTP value.

    If the token could not be found, ResourceNotFoundError is raised.

    :param serial: serial number of the token
    :param current_time: a fake servertime for testing of TOTP token
    :type current_time: datetime.datetime
    :return: tuple with (result, pin, otpval, passw)
    :rtype: tuple
    """
    otp_result = get_one_token(serial=serial).get_otp(current_time=current_time)
    return otp_result
@log_with(log)
def get_multi_otp(serial, count=0, epoch_start=0, epoch_end=0, current_time=None, timestamp=None):
    """
    Return several OTP values for the given token.

    Please note, that the tokentype needs to support this function.

    :param serial: the serial number of the token
    :type serial: str
    :param count: number of the next otp values (to be used with event or
        time based tokens)
    :param epoch_start: unix time start date (used with time based tokens)
    :param epoch_end: unix time end date (used with time based tokens)
    :param current_time: Simulate the servertime
    :type current_time: datetime
    :param timestamp: Simulate the servertime (unix time in seconds)
    :type timestamp: int
    :return: On success the dictionary of otp values delivered by the token,
        extended by ``"result": True``. On failure a dictionary with
        ``"result": False`` and the ``"error"`` reported by the token.
    :rtype: dict
    """
    token = get_one_token(serial=serial)
    log.debug(f"Getting multiple otp values for token {token}. curTime={current_time}")

    success, error, otp_values = token.get_multi_otp(count=count,
                                                     epoch_start=epoch_start,
                                                     epoch_end=epoch_end,
                                                     curTime=current_time,
                                                     timestamp=timestamp)
    log.debug(f"Received {success!r}, {error!r}, and {len(otp_values)} otp values")

    if not success:
        return {"result": False, "error": error}

    # The dictionary returned by the token is reused and flagged as successful
    otp_values["result"] = True
    return otp_values
@log_with(log)
def get_token_by_otp(token_list, otp="", window=10):
    """
    Search the token in the token_list that creates the given OTP value.

    :param token_list: the list of token objects to be investigated
    :type token_list: list of token objects
    :param otp: the otp value, that needs to be found
    :type otp: str
    :param window: the window of search
    :type window: int
    :return: The token that creates this OTP value, or None if no token
        matches
    :rtype: TokenClass
    :raises TokenAdminError: if more than one token matches the OTP value
    """
    matches = []
    for candidate in token_list:
        log.debug(f"Checking token {candidate.get_serial()}")
        try:
            position = candidate.check_otp_exist(otp=otp, window=window)
            log.debug(f"Result = {int(position):d}")
            # A non-negative result means the OTP value was found
            if position >= 0:
                matches.append(candidate)
        except Exception as err:
            # A flaw in a single token should not stop privacyidea from finding the right token
            log.warning(f"Error calculating OTP for token {candidate.get_serial()}: {err}")

    if len(matches) > 1:
        raise TokenAdminError(_('multiple tokens are matching this OTP value!'), id=1200)
    return matches[0] if matches else None
@log_with(log)
def get_serial_by_otp(token_list, otp="", window=10):
    """
    Return the serial of the token that creates the given OTP value.

    The token_list would be created by get_tokens(). The search itself is
    done by ``get_token_by_otp``.

    :param token_list: the list of token objects to be investigated
    :type token_list: list of token objects
    :param otp: the otp value, that needs to be found
    :param window: the window of search
    :type window: int
    :return: the serial for the given OTP value, or None if no token matches
    :rtype: str
    """
    matching_token = get_token_by_otp(token_list, otp=otp, window=window)
    return None if matching_token is None else matching_token.get_serial()
@log_with(log)
def get_serial_by_otp_list(token_list: list, otp_list: list, window: int = 10,
                           counter: int = None) -> list[str]:
    """
    Return the serials of all tokens that match a list of OTP values.

    The token_list would be created by get_tokens(). The list of candidates
    is narrowed down with every OTP value: only tokens that matched the
    previous OTP values are checked against the next one. The token counters
    are not increased by this check (``inc_counter=False``).

    If ``otp_list`` is empty, no filtering takes place and the serials of all
    given tokens are returned.

    :param token_list: the list of token objects to be investigated
    :param otp_list: a list of otp values, that need to be found
    :param window: the window of search
    :param counter: the counter value to be used for the OTP calculation, if
        None the actual counter of the token is used. This is only passed to
        tokens of type "hotp".
    :return: a list of serials for the given OTP values
    """
    candidates = token_list
    for otp in otp_list:
        matching = []
        for token in candidates:
            log.debug(f"checking token {token.get_serial()}")
            try:
                if token.type == "hotp":
                    r = token.check_otp_exist(otp=otp, window=window, inc_counter=False,
                                              counter=counter)
                else:
                    r = token.check_otp_exist(otp=otp, window=window, inc_counter=False)
                # A result of 0 is a valid match (counter 0), so the log has to
                # use the same condition as the filter below.
                log.debug(f"otp_exists = {r >= 0}")
                if r >= 0:
                    matching.append(token)
            except Exception as err:
                # A flaw in a single token should not stop privacyidea from finding
                # the right token
                log.warning(f"error in calculating OTP for token {token.get_serial()}: {err}")
        # Only the tokens matching this OTP value are checked against the next one
        candidates = matching

    return [token.get_serial() for token in candidates]
@log_with(log)
def gen_serial(tokentype: str, prefix: str = None) -> str:
    """
    Generate a serial for a given token type.

    The serial consists of the token type prefix and a randomly generated
    string of characters. By default, the random string contains 8 characters.
    If no prefix is given, it is determined by the token class prefix.

    The generation of the random part of the serial is determined by the
    ``PI_TOKEN_SERIAL_RANDOM`` setting in
    :ref:`the config file <picfg_token_serial_random>`.

    The default is to calculate a two-part serial with the first 4 characters
    containing the current token count at the time of the generation and the
    next 4 characters containing a random hexadecimal value. This severely
    limits the number of generated serials since for every count value only
    16\\ :sup:`4` possible values for the random part exist. Specific count
    values are only ever reused if tokens are deleted.

    Setting ``PI_TOKEN_SERIAL_RANDOM`` to ``True`` enables to completely
    generate the random string with 4 random digits and the rest using the
    Base32 character table (See :rfc:`4648#section-6`) thus allowing more
    than 10\\ :sup:`10` different serials.

    Due to the required uniqueness of the serial, each generated serial is
    checked if it already exists in the database. If the number of
    possibilities for generated serials decreases, this can lead to excessive
    queries on the database.

    On a collision the counter used for the next candidate is advanced. This
    guarantees progress even if the serial contains no random part (e.g. a
    configured ``SerialLength`` of 4 or less in the default mode).

    :param tokentype: the token type prefix is done by a lookup on the tokens
    :type tokentype: str
    :param prefix: A prefix to the serial number
    :type prefix: str
    :return: serial number
    :rtype: str
    """
    random_serial = get_app_config_value(PI_TOKEN_SERIAL_RANDOM, False)
    # TODO: the serial length is currently not configurable through the UI
    serial_len = int(get_from_config("SerialLength") or 8)
    if not prefix:
        prefix = get_token_prefix(tokentype.lower(), tokentype.upper())

    if random_serial:
        def _gen_serial(_tokennum) -> str:
            # 4 random digits followed by random Base32 characters
            digit_part = random.randrange(10000)  # nosec B311
            b32_part = "".join(random.choice(B32_ALPHABET)  # nosec B311
                               for _ in range(serial_len - 4))
            return f"{prefix}{digit_part:04}{b32_part}"
    else:
        def _gen_serial(_tokennum):
            # Zero padded token count followed by random hex characters
            h_serial = ''
            num_str = '{:04d}'.format(_tokennum)
            h_len = serial_len - len(num_str)
            if h_len > 0:
                h_serial = hexlify_and_unicode(os.urandom(h_len)).upper()[0:h_len]
            return "{0!s}{1!s}{2!s}".format(prefix, num_str, h_serial)

    # now search the number of tokens of tokentype in the token database
    session = db.session
    tokennum = session.execute(
        select(func.count()).select_from(Token).where(Token.tokentype == tokentype)
    ).scalar_one()

    # Now create the serial
    serial = _gen_serial(tokennum)

    # now test if serial already exists
    while True:
        numtokens = session.execute(
            select(func.count()).select_from(Token).where(Token.serial == serial)
        ).scalar_one()
        if numtokens == 0:
            # ok, there is no such token, so we're done
            break
        # Advance the counter, otherwise a serial without random part would
        # be regenerated unchanged and the loop would never terminate.
        tokennum += numtokens  # pragma: no cover
        serial = _gen_serial(tokennum)  # pragma: no cover
    return serial
@log_with(log)
def import_token(serial, token_dict, tokenrealms=None):
    """
    Create a token from a dictionary. This function is used during the import
    of a PSKC file.

    :param serial: The serial number of the token
    :type serial: str
    :param token_dict: A dictionary describing the token like

        ::

            {
              "type": ...,
              "description": ...,
              "otpkey": ...,
              "counter": ...,
              "timeShift": ...
            }

        An optional ``"user"`` entry (a dictionary with ``username``,
        ``realm`` and ``resolver``) assigns the token to a user.
    :type token_dict: dict
    :param tokenrealms: List of realms to set as realms of the token
    :type tokenrealms: list
    :return: the token object
    """
    init_param = {'serial': serial,
                  'description': token_dict.get("description", "imported")}
    # Copy only those optional init parameters, which are actually given
    optional_keys = ('type', 'otpkey', 'otplen', 'timeStep', 'hashlib', 'tans')
    init_param.update({key: token_dict[key] for key in optional_keys if key in token_dict})

    user_obj = None
    user_info = token_dict.get("user")
    if user_info:
        user_obj = User(user_info.get("username"),
                        user_info.get("realm"),
                        user_info.get("resolver"))

    # Imported tokens are usually hardware tokens
    token = init_token(init_param, user=user_obj, tokenrealms=tokenrealms,
                       tokenkind=Tokenkind.HARDWARE)

    counter = token_dict.get("counter")
    if counter:
        token.set_otp_count(counter)

    time_shift = token_dict.get("timeShift")
    if time_shift:
        token.add_tokeninfo("timeShift", time_shift)

    return token
@log_with(log, hide_args_keywords={'param': 'pin'})
def init_token(param: dict, user: User = None, tokenrealms: list[str] = None,
               tokenkind: str = None) -> TokenClass:
    """
    Create a new token or update an existing token with the specified
    parameters.

    If no token with the given serial exists, a new database token is
    created. If anything fails while setting up a newly created token, this
    token is removed from the database again and the original exception is
    re-raised.

    :param param: initialization parameters like

        ::

            {
              "serial": ..., (optional)
              "type": ...., (optional, default=hotp)
              "otpkey": ...
            }

        The optional keys ``prefix``, ``realm``, ``validity_period_start``
        and ``validity_period_end`` are evaluated as well.
    :type param: dict
    :param user: the token owner
    :type user: User Object
    :param tokenrealms: the realms, to which the token should belong
    :type tokenrealms: list
    :param tokenkind: The kind of the token, can be "software", "hardware"
        or "virtual"
    :return: token object
    :rtype: TokenClass
    :raises TokenAdminError: if the token type is unknown or if an existing
        token should be initialized with a different type
    """
    token_type = param.get("type") or "hotp"

    # Check for unsupported token type
    token_types = get_enrollable_token_types()
    if token_type.lower() not in token_types:
        log.error(f"type {token_type} not found in tokentypes: {token_types}")
        raise TokenAdminError(_("init token failed. Unknown token type:") + f" {token_type}", id=1610)

    serial = param.get("serial") or gen_serial(token_type, param.get("prefix"))
    check_serial_valid(serial)
    realms = []

    # Check if a token with this serial already exists and
    # create a list of the found tokens
    tokens = get_tokens(serial=serial)
    token_count = len(tokens)
    if token_count == 0:
        # A token with the serial was not found, so we create a new one
        db_token = Token(serial, tokentype=token_type.lower())
    else:
        # The token already exist, so we update the token
        db_token = tokens[0].token
        # Make sure the type is not changed between the initialization and the update
        old_type = db_token.tokentype
        if old_type.lower() != token_type.lower():
            msg = _("Token {serial} already exists with type {old_type}. "
                    "Can not initialize token with new type "
                    "{token_type}").format(serial=serial, old_type=old_type,
                                           token_type=token_type)
            log.error(msg)
            raise TokenAdminError(_("init token failed:") + " " + msg)

    # If there is a realm as parameter (and the realm is not empty), but no
    # user, we assign the token to this realm.
    if param.get("realm") and 'user' not in param:
        realms.append(param.get("realm"))
    # Assign the token to all tokenrealms and to the user realm
    if tokenrealms and isinstance(tokenrealms, list):
        realms.extend(tokenrealms)
    if user and user.realm:
        realms.append(user.realm)

    # Must be bound before the try block: the error handler below inspects it
    # and would otherwise fail with an UnboundLocalError (masking the real
    # error) if saving the db token or creating the token object fails.
    token = None
    try:
        # Save the token to the database
        if token_count == 0:
            db_token.save()

        # The tokenclass object is created
        token = create_tokenclass_object(db_token)

        if token_count == 0:
            # If this token is a newly created one, we have to set up the defaults,
            # which later might be overwritten by the token.update(param)
            token.set_defaults()

        # Set the user of the token
        if user is not None and user.login != "":
            token.add_user(user)

        # Set the token realms (updates the TokenRealm table)
        if realms or user:
            token.set_realms(realms)

        token.update(param)

    except Exception as e:
        log.error(f"token create failed: {e}")
        log.debug(f"{traceback.format_exc()}")
        # Delete the newly created token from the db
        if token_count == 0:
            if token:
                token.delete_token()
            else:
                db_token.delete()
        raise

    # We only set the tokenkind here if it was explicitly set in the init_token call.
    # In all other cases it is set in the update method of the tokenclass.
    if tokenkind:
        token.add_tokeninfo("tokenkind", tokenkind)

    # Set the validity period
    validity_period_start = param.get("validity_period_start")
    validity_period_end = param.get("validity_period_end")
    if validity_period_end:
        token.set_validity_period_end(validity_period_end)
    if validity_period_start:
        token.set_validity_period_start(validity_period_start)

    # Creation Date
    token.add_tokeninfo("creation_date",
                        datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds"))

    # Save the token object to make sure all changes are persisted in the db
    token.save()
    return token
@log_with(log)
@check_user_or_serial
def remove_token(serial=None, user=None):
    """
    Remove the token that matches the serial number or all tokens of the
    given user. The deletion itself is delegated to ``delete_token`` of each
    token object.

    :param user: The user, who's tokens should be deleted.
    :type user: User object
    :param serial: The serial number of the token to delete (exact)
    :type serial: str
    :return: The number of deleted token
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    # Remember the number of tokens before they are deleted
    deleted = len(matching_tokens)

    for matching_token in matching_tokens:
        matching_token.delete_token()

    return deleted
@log_with(log)
def set_realms(serial, realms=None, add=False, allowed_realms: list = None):
    """
    Set all realms of a token. This sets the realms new. I.e. it does not add
    realms. So realms that are not contained in the list will not be assigned
    to the token anymore.

    If the token could not be found, a ResourceNotFoundError is raised.

    Thus, setting ``realms=[]`` clears all realms assignments.

    Realms that are not defined are silently dropped. If ``allowed_realms``
    is given, only realms contained in it are set, and realms the token
    already has but which are not in ``allowed_realms`` are kept.

    :param serial: the serial number of the token (exact)
    :type serial: str
    :param realms: A list of realm names
    :type realms: list
    :param add: if the realms should be added and not replaced
    :type add: bool
    :param allowed_realms: A list of realms, that the admin is allowed to manage
    """
    # get rid of non-defined realms
    defined_realms = [realm for realm in (realms or []) if realm_is_defined(realm)]

    token = get_one_token(serial=serial)
    current_realms = token.get_realms()

    realms_to_set = defined_realms
    if allowed_realms:
        # Check if admin is allowed to set the realms
        realms_to_set = list(set(defined_realms).intersection(allowed_realms))
        rejected_realms = list(set(defined_realms) - set(realms_to_set))
        if rejected_realms:
            log.info(f"User is not allowed to set realms {rejected_realms} for token {serial}.")

        # Check if admin is allowed to remove the old realms
        protected_realms = set(current_realms) - set(allowed_realms)
        # Add realms that are not allowed to be removed to the set list
        realms_to_set = list(set(realms_to_set).union(protected_realms))

    token.set_realms(realms_to_set, add=add)
    token.save()
@log_with(log)
def set_defaults(serial):
    """
    Set the default values for the token with the given serial number (exact).

    The OTP length, count window, max fail count and sync window are read
    from the configuration. The token type is reset to "hotp".

    :param serial: token serial
    :type serial: str
    :return: None
    """
    db_token = get_one_token(serial=serial).token

    # (token attribute, config key, fallback if the config key is not set)
    config_defaults = (
        ("otplen", "DefaultOtpLen", 6),
        ("count_window", "DefaultCountWindow", 15),
        ("maxfail", "DefaultMaxFailCount", 15),
        ("sync_window", "DefaultSyncWindow", 1000),
    )
    for attribute, config_key, fallback in config_defaults:
        setattr(db_token, attribute, int(get_from_config(config_key, fallback)))

    db_token.tokentype = "hotp"
    db_token.save()
@log_with(log)
def assign_token(serial, user, pin=None, encrypt_pin=False, error_message=None):
    """
    Assign token to a user. If the PIN is given, the PIN is reset. The fail
    counter of the token is always reset.

    :param serial: The serial number of the token
    :type serial: str
    :param user: The user, to whom the token should be assigned.
    :type user: User object
    :param pin: The PIN for the newly assigned token.
    :type pin: str
    :param encrypt_pin: Whether the PIN should be stored in an encrypted way
    :type encrypt_pin: bool
    :param error_message: The error message, that is displayed in case the
        token is already assigned
    :type error_message: str
    :return: True
    :rtype: bool
    :raises TokenAdminError: if the token is already assigned to a user or if
        the token could not be saved
    """
    token = get_one_token(serial=serial)

    # Check if the token already belongs to another user
    current_owner = token.user
    if current_owner:
        log.warning(f"token already assigned to user: {current_owner!r}")
        if not error_message:
            error_message = _("Token already assigned to user {old_user!r}").format(old_user=current_owner)
        raise TokenAdminError(error_message, id=1103)

    token.add_user(user)
    if pin is not None:
        token.set_pin(pin, encrypt=encrypt_pin)

    # reset the OtpFailCounter
    token.set_failcount(0)

    try:
        token.save()
    except Exception as e:  # pragma: no cover
        log.error('update Token DB failed')
        raise TokenAdminError(_("Token assign failed for {0!r}/{1!s} : {2!r}").format(user, serial, e),
                              id=1105)

    log.debug("successfully assigned token with serial "
              "{0!r} to user {1!r}".format(serial, user))
    return True
@log_with(log)
@check_user_or_serial
def unassign_token(serial, user=None):
    """
    Unassign the user from the token, or from all tokens of a user.

    For every matching token the PIN is cleared, the fail counter is reset
    and the token owner entries are deleted from the database.

    :param serial: The serial number of the token to unassign (exact). Can
        be None
    :param user: A user whose tokens should be unassigned
    :return: number of unassigned tokens
    :raises TokenAdminError: if the database update fails
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)

    for matching_token in matching_tokens:
        matching_token.set_pin("")
        matching_token.set_failcount(0)

        try:
            # Delete the tokenowner entry
            session = db.session
            delete_owner = delete(TokenOwner).where(TokenOwner.token_id == matching_token.token.id)
            session.execute(delete_owner)
            session.commit()
        except Exception as e:  # pragma: no cover
            log.error('update token DB failed')
            raise TokenAdminError(_("Token unassign failed for") + f" {serial!r}/{user!r}: {e!r}", id=1105)

        log.debug(f"successfully unassigned token with serial {matching_token.get_serial()!r}")

    # TODO: test with more than 1 token
    return len(matching_tokens)
@log_with(log)
def resync_token(serial, otp1, otp2, options=None, user=None):
    """
    Resynchronize the token of the given serial number and user by searching
    the otp1 and otp2 in the future otp values.

    If several tokens match, all of them are resynced and the result of the
    last one is returned. If no token matches, False is returned.

    :param serial: token serial number (exact)
    :type serial: str
    :param otp1: first OTP value
    :type otp1: str
    :param otp2: second OTP value, directly after the first
    :type otp2: str
    :param options: additional options like the servertime for TOTP token
    :type options: dict
    :param user: The user, who's token should be resynced
    :type user: User object
    :return: result of the resync
    :rtype: bool
    """
    resync_result = False
    for matching_token in get_tokens_from_serial_or_user(serial=serial, user=user):
        resync_result = matching_token.resync(otp1, otp2, options)
        matching_token.save()
    return resync_result
@log_with(log)
@check_user_or_serial
def reset_token(serial, user=None):
    """
    Reset the failcounter of a single token, or of all tokens of one user.
    The reset is delegated to the ``reset`` method of each token object.

    :param serial: serial number (exact)
    :param user: The owner of the tokens which should be reset
    :return: The number of tokens, that were reset
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.reset()
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def set_pin(serial, pin, user=None, encrypt_pin=False):
    """
    Set the token PIN of the token. This is the static part that can be used
    to authenticate.

    :param pin: The pin of the token
    :type pin: str
    :param user: If the user is specified, the pins for all tokens of this
        user will be set
    :type user: User object
    :param serial: If the serial is specified, the PIN for this very token
        will be set. (exact)
    :param encrypt_pin: Whether the PIN should be stored in an encrypted way
    :type encrypt_pin: bool
    :return: The number of PINs set (usually 1)
    :rtype: int
    :raises ParameterError: if ``user`` is a string instead of a User object
    """
    if isinstance(user, str):
        # check if by accident the wrong parameter (like PIN)
        # is put into the user attribute
        log.warning(f"Parameter user must not be a string: {user!r}")
        raise ParameterError(_("Parameter user must not be a string:") + f" {user!r}", id=1212)

    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_pin(pin, encrypt=encrypt_pin)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
def set_pin_user(serial, user_pin, user=None):
    """
    Set the user pin of a token. This just stores the information of the user
    pin for (e.g. an eTokenNG, Smartcard) in the database.

    :param serial: The serial number of the token (exact)
    :type serial: str
    :param user_pin: The user PIN
    :type user_pin: str
    :param user: The user, for who's token the PIN should be set
    :type user: User object
    :return: The number of PINs set (usually 1)
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_user_pin(user_pin)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
def set_pin_so(serial, so_pin, user=None):
    """
    Set the SO PIN of a smartcard. The SO Pin can be used to reset the PIN of
    a smartcard. The SO PIN is stored in the database, so that it could be
    used for automatic processes for User PIN resetting.

    :param serial: The serial number of the token (exact)
    :type serial: str
    :param so_pin: The Security Officer PIN
    :type so_pin: str
    :param user: The user, for who's token the SO PIN should be set
    :type user: User object
    :return: The number of SO PINs set. (usually 1)
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_so_pin(so_pin)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def revoke_token(serial, user=None):
    """
    Revoke a token, or all tokens of a single user. The revocation is
    delegated to the ``revoke`` method of each token object.

    :param serial: The serial number of the token (exact)
    :type serial: str
    :param user: all tokens of the user will be revoked
    :type user: User object
    :return: Number of tokens that were revoked
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(user=user, serial=serial)
    for matching_token in matching_tokens:
        matching_token.revoke()
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def enable_token(serial, enable=True, user=None):
    """
    Enable or disable a token, or all tokens of a single user.
    This can be checked with is_token_active.

    Only tokens whose state actually changes are counted, so enabling an
    already active token will return 0.

    :param serial: The serial number of the token
    :type serial: str
    :param enable: False is the token should be disabled
    :type enable: bool
    :param user: all tokens of the user will be enabled or disabled
    :type user: User object
    :return: Number of tokens that were enabled/disabled
    :rtype: int
    """
    # We search for all matching tokens first, in case the user has
    # provided a wrong serial number. Then we filter for the desired tokens.
    matching_tokens = get_tokens_from_serial_or_user(user=user, serial=serial)

    changed = 0
    for matching_token in matching_tokens:
        # Only touch tokens that are currently in the opposite state
        state_differs = matching_token.is_active() == (not enable)
        if not state_differs:
            continue
        matching_token.enable(enable)
        matching_token.save()
        changed += 1
    return changed
def is_token_active(serial):
    """
    Return the ``active`` flag of the token with the given serial.

    The token is looked up with ``get_one_token``, which is expected to raise
    an error if the token could not be found.

    :param serial: The serial number of the token
    :type serial: str
    :return: True or False
    :rtype: bool
    """
    db_token = get_one_token(serial=serial).token
    return db_token.active
@log_with(log)
@check_user_or_serial
def set_otplen(serial, otplen=6, user=None):
    """
    Set the otp length of the token defined by serial or for all tokens of
    the user. The OTP length is usually 6 or 8.

    :param serial: The serial number of the token (exact)
    :type serial: str
    :param otplen: The length of the OTP value
    :type otplen: int
    :param user: The owner of the tokens
    :type user: User object
    :return: number of modified tokens
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_otplen(otplen)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def set_hashlib(serial, hashlib="sha1", user=None):
    """
    Set the hashlib in the tokeninfo. Can be something like sha1, sha256...

    :param serial: The serial number of the token (exact)
    :type serial: str
    :param hashlib: The hashlib of the token
    :type hashlib: str
    :param user: The User, for who's token the hashlib should be set
    :type user: User object
    :return: the number of token infos set
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_hashlib(hashlib)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def set_count_auth(serial, count, user=None, max=False, success=False):
    """
    Set one of the auth counters of the matching tokens.

    There are different counters that can be set::

        count_auth -> max=False, success=False
        count_auth_max -> max=True, success=False
        count_auth_success -> max=False, success=True
        count_auth_success_max -> max=True, success=True

    :param count: The counter value
    :type count: int
    :param user: The user owner of the tokens to modify
    :type user: User object
    :param serial: The serial number of the one token to modify (exact)
    :type serial: str
    :param max: True, if either count_auth_max or count_auth_success_max are
        to be modified
    :type max: bool
    :param success: True, if either ``count_auth_success`` or
        ``count_auth_success_max`` are to be modified
    :type success: bool
    :return: number of modified tokens
    :rtype: int
    """
    # Map the (max, success) flags to the name of the token setter to call
    setter_names = {
        (True, True): "set_count_auth_success_max",
        (True, False): "set_count_auth_max",
        (False, True): "set_count_auth_success",
        (False, False): "set_count_auth",
    }
    setter_name = setter_names[(bool(max), bool(success))]

    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        getattr(matching_token, setter_name)(count)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def get_tokeninfo(serial, info):
    """
    Get a token info field from the database.

    :param serial: The serial number of the token
    :type serial: str
    :param info: The key of the info in the dict
    :return: The value of the token info. None is returned unless exactly
        one token matches the serial.
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=None)
    if len(matching_tokens) != 1:
        return None
    return matching_tokens[0].get_tokeninfo(info)
@log_with(log)
@check_user_or_serial
def add_tokeninfo(serial, info, value=None, value_type=None, user=None):
    """
    Set a token info field in the database. The info is a dict for each token
    of key/value pairs.

    :param serial: The serial number of the token
    :type serial: str
    :param info: The key of the info in the dict
    :param value: The value of the info
    :param value_type: The type of the value. If set to "password" the value
        is stored encrypted
    :type value_type: str
    :param user: The owner of the tokens, that should be modified
    :type user: User object
    :return: the number of modified tokens
    :rtype: int
    """
    # NOTE(review): value_type is accepted but not forwarded to the token
    # object here - confirm whether this is intended.
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.add_tokeninfo(info, value)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def delete_tokeninfo(serial, key, user=None):
    """
    Delete a specific token info field in the database.

    :param serial: The serial number of the token
    :type serial: str
    :param key: The key of the info in the dict
    :param user: The owner of the tokens, that should be modified
    :type user: User object
    :return: the number of tokens matching the serial and user. This number
        also includes tokens that did not have the token info *key* set in
        the first place!
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.delete_tokeninfo(key)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def set_validity_period_start(serial, user, start):
    """
    Set the start of the validity period for the matching tokens.

    :param serial: serial number (exact)
    :param user: The owner of the tokens which should be modified
    :param start: Timestamp in the format DD/MM/YY HH:MM
    :type start: str
    :return: number of modified tokens
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_validity_period_start(start)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def set_validity_period_end(serial, user, end):
    """
    Set the end of the validity period for the matching tokens.

    :param serial: serial number (exact)
    :param user: The owner of the tokens which should be modified
    :param end: Timestamp in the format DD/MM/YY HH:MM
    :type end: str
    :return: number of modified tokens
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_validity_period_end(end)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def set_sync_window(serial, syncwindow=1000, user=None):
    """
    Set the sync window of the matching tokens.

    The sync window is the window that is used during resync of a token.
    Such many OTP values are calculated ahead, to find the matching otp value
    and counter.

    :param serial: The serial number of the token (exact)
    :type serial: str
    :param syncwindow: The size of the sync window
    :type syncwindow: int
    :param user: The owner of the tokens, which should be modified
    :type user: User object
    :return: number of modified tokens
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_sync_window(syncwindow)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def set_count_window(serial, countwindow=10, user=None):
    """
    Set the count window of the matching tokens.

    The count window is used during authentication to find the matching OTP
    value. This sets the count window per token.

    :param serial: The serial number of the token (exact)
    :type serial: str
    :param countwindow: the size of the window
    :type countwindow: int
    :param user: The owner of the tokens, which should be modified
    :type user: User object
    :return: number of modified tokens
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_count_window(countwindow)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def set_description(serial, description, user=None, token=None):
    """
    Set the description of a token.

    :param serial: The serial number of the token (exact)
    :type serial: str
    :param description: The description for the token
    :type description: str
    :param user: The owner of the tokens, which should be modified
    :type user: User object
    :param token: An already loaded token object. If given, it is used
        directly and no lookup by serial and user takes place.
    :return: True. In case of an error raise an exception
    :rtype: bool
    """
    target = token if token is not None else get_one_token(serial=serial, user=user)
    target.set_description(description)
    target.save()
    return True
@log_with(log)
@check_user_or_serial
def set_failcounter(serial, counter, user=None):
    """
    Set the fail counter of a token.

    :param serial: The serial number of the token (exact)
    :param counter: The counter to which the fail counter should be set
    :param user: An optional user
    :return: Number of tokens, where the fail counter was set.
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_failcount(counter)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_user_or_serial
def set_max_failcount(serial, maxfail, user=None):
    """
    Set the maximum fail counts of tokens. This is the maximum number a
    failed authentication is allowed.

    :param serial: The serial number of the token (exact)
    :type serial: str
    :param maxfail: The maximum allowed failed authentications
    :type maxfail: int
    :param user: The owner of the tokens, which should be modified
    :type user: User object
    :return: number of modified tokens
    :rtype: int
    """
    matching_tokens = get_tokens_from_serial_or_user(serial=serial, user=user)
    for matching_token in matching_tokens:
        matching_token.set_maxfail(maxfail)
        matching_token.save()
    return len(matching_tokens)
@log_with(log)
@check_copy_serials
def copy_token_pin(serial_from, serial_to):
    """
    Copy the token PIN from one token to another token.

    This can be used for workflows like lost token. The PIN itself is not
    read: the PIN hash and the PIN seed of the source token are written to
    the target token, which is then saved.

    :param serial_from: The token to copy from
    :type serial_from: basestring
    :param serial_to: The token to copy to
    :type serial_to: basestring
    :return: True. In case of an error raise an exception
    :rtype: bool
    """
    source = get_one_token(serial=serial_from)
    target = get_one_token(serial=serial_to)
    pin_hash, pin_seed = source.get_pin_hash_seed()
    target.set_pin_hash_seed(pin_hash, pin_seed)
    target.save()
    return True
@check_copy_serials
def copy_token_user(serial_from, serial_to):
    """
    Copy the user from one token to another token.

    The user_id, the realm_id and the resolver of the first owner of the
    source token are written to a new owner entry of the target token.
    Any previous assignment of the target token is removed first.
    Afterwards the realms of the source token are copied as well.

    :param serial_from: The token to copy from
    :type serial_from: basestring
    :param serial_to: The token to copy to
    :type serial_to: basestring
    :return: True. In case of an error raise an exception
    :rtype: bool
    """
    source = get_one_token(serial=serial_from)
    target = get_one_token(serial=serial_to)
    # For backward compatibility we remove the potentially old users from the token.
    # TODO: Later we probably want to be able to "add" new users to a token.
    unassign_token(serial_to)
    source_owner = source.token.first_owner
    new_owner = TokenOwner(token_id=target.token.id,
                           user_id=source_owner.user_id,
                           realm_id=source_owner.realm_id,
                           resolver=source_owner.resolver)
    new_owner.save()
    # Also copy other assigned realms of the token.
    copy_token_realms(serial_from, serial_to)
    return True
@check_copy_serials
def copy_token_realms(serial_from, serial_to):
    """
    Copy the realms of one token to another token.

    The realm list of the source token replaces the realms of the target
    token via its ``set_realms`` method.

    :param serial_from: The token to copy from
    :param serial_to: The token to copy to
    :return: None
    """
    source = get_one_token(serial=serial_from)
    target = get_one_token(serial=serial_to)
    target.set_realms(source.token.get_realms())
@log_with(log)
@libpolicy(config_lost_token)
def lost_token(serial, new_serial=None, password=None, validity=10, contents="8", pw_len=16, options=None):
    """
    This is the workflow to handle a lost token.

    The token <serial> is lost and will be disabled. A new token of type
    password token will be created and assigned to the user.
    The PIN of the lost token will be copied to the new token.
    The new token will have a certain validity period.

    :param serial: Token serial number
    :param new_serial: new serial number. Defaults to ``lost<serial>``.
    :param password: new password. If it is None, a password is generated.
    :param validity: Number of days, the new token should be valid
    :type validity: int
    :param contents: The contents of the generated password. Can be a
        string like ``"Ccn"``.

            * "C": upper case characters
            * "c": lower case characters
            * "n": digits
            * "s": special characters
            * "8": base58

        If the value is empty or contains none of these flags, lower case
        characters, upper case characters and digits are used.
    :type contents: str
    :param pw_len: The length of the generated password
    :type pw_len: int
    :param options: optional values for the decorator passed from the upper
        API level
    :type options: dict
    :return: result dictionary
    :rtype: dict
    :raises TokenAdminError: if the lost token is not assigned to a user
    """
    res = {}
    new_serial = new_serial or "lost{0!s}".format(serial)
    user = get_token_owner(serial)

    log.debug("doing lost token for serial {0!r} and user {1!r}".format(serial, user))

    if user is None or user.is_empty():
        err = _("You can only define a lost token for an assigned token.")
        log.warning("{0!s}".format(err))
        raise TokenAdminError(err, id=2012)

    # The order of the flags determines the order of the pool, it is kept
    # as it always was: c, C, n, s, 8.
    pool_by_flag = (
        ("c", string.ascii_lowercase),
        ("C", string.ascii_uppercase),
        ("n", string.digits),
        ("s", "!#$%&()*+,-./:;<=>?@[]^_"),
        ("8", BASE58),
    )
    requested = contents or ""
    character_pool = "".join(chars for flag, chars in pool_by_flag if flag in requested)
    if not character_pool:
        # An empty pool can not produce a password. This happens for an empty
        # contents string, but also for a value without any known flag.
        if requested:
            log.warning("No known character class in lost token contents {0!r}. "
                        "Using letters and digits.".format(contents))
        character_pool = "{0!s}{1!s}{2!s}".format(string.ascii_lowercase,
                                                  string.ascii_uppercase,
                                                  string.digits)

    if password is None:
        password = generate_password(size=pw_len, characters=character_pool)

    res['serial'] = new_serial

    tokenobject = init_token({"otpkey": password, "serial": new_serial,
                              "type": "pw",
                              "description": _("temporary replacement for {0!s}").format(
                                  serial)})

    res['init'] = tokenobject is not None
    if res['init']:
        res['user'] = copy_token_user(serial, new_serial)
        res['pin'] = copy_token_pin(serial, new_serial)

        # set validity period
        end_date = (datetime.datetime.now(tzlocal())
                    + datetime.timedelta(days=validity)).strftime(DATE_FORMAT)
        tokenobject_list = get_tokens(serial=new_serial)
        for tokenobject in tokenobject_list:
            tokenobject.set_validity_period_end(end_date)

        # fill results
        res['valid_to'] = "xxxx"
        res['password'] = password
        res['end_date'] = end_date
        # disable token
        res['disable'] = enable_token(serial, enable=False)

    return res
# The password (positional argument 1) is hidden from the log, in the same
# way as it is done for check_serial_pass and check_token_list.
@log_with(log, hide_args=[1])
def check_realm_pass(realm, passw, options=None, include_types=None, exclude_types=None):
    """
    This function checks, if the given passw matches any token in the given
    realm. This can be used for the 4-eyes token.
    Only tokens that are assigned are tested.

    The token types to search can be restricted with ``include_types`` or
    ``exclude_types``. If ``include_types`` is given, ``exclude_types`` is
    not evaluated.

    It returns the res True/False and a reply_dict, which contains the
    serial number of the matching token.

    :param realm: The realm of the user
    :param passw: The password containing PIN+OTP
    :param options: Additional options that are passed to the tokens
    :type options: dict
    :param include_types: List of token types to use
    :type include_types: list or str
    :param exclude_types: List to token types *not* to use
    :type exclude_types: list or str
    :return: tuple of bool and dict
    """
    reply_dict = {}
    # since an attacker does not know, which token is tested, we restrict to
    # only active tokens. He would not guess that the given OTP value is that
    # of an inactive token.
    tokenobject_list = get_tokens(realm=realm, assigned=True, active=True)
    if not tokenobject_list:
        reply_dict["message"] = _("There is no active and assigned token in this realm")
        return False, reply_dict

    # reduce tokens by type
    if include_types:
        incl = include_types if isinstance(include_types, list) else [include_types]
        tokenobject_list = [tok for tok in tokenobject_list if tok.type in incl]
    elif exclude_types:
        excl = exclude_types if isinstance(exclude_types, list) else [exclude_types]
        tokenobject_list = [tok for tok in tokenobject_list if tok.type not in excl]

    if not tokenobject_list:
        reply_dict["message"] = _('There is no active and assigned token in '
                                  'this realm, included types: {0!s}, excluded '
                                  'types: {1!s}').format(include_types, exclude_types)
        return False, reply_dict

    return check_token_list(tokenobject_list, passw, options=options,
                            allow_reset_all_tokens=False)
@log_with(log, hide_args=[1])
@libpolicy(auth_lastauth)
def check_serial_pass(serial, passw, options=None):
    """
    Check the given password against the token with the given serial.

    The token is looked up by its serial and handed to check_token_list
    as the only candidate. The owner of the token is passed along as the
    user, so that additional policies can be derived from the user.

    :param serial: The serial number of the token
    :type serial: basestring
    :param passw: The password usually consisting of pin + otp
    :type passw: basestring
    :param options: Additional options. Token specific.
    :type options: dict
    :return: tuple of result (True, False) and additional dict
    :rtype: tuple
    """
    token = get_one_token(serial=serial)
    success, details = check_token_list([token], passw,
                                        user=token.user,
                                        options=options,
                                        allow_reset_all_tokens=True)
    return success, details
@log_with(log)
def check_otp(serial, otpval):
    """
    Check an OTP value against the token with the given serial number.

    The check is successful, if the ``check_otp`` method of the token
    returns a value greater than or equal to 0.

    :param serial: The serial number of the token
    :param otpval: The OTP value to check
    :return: tuple of result and dictionary containing a message if the verification failed
    :rtype: tuple(bool, dict)
    """
    token = get_one_token(serial=serial)
    verified = token.check_otp(otpval) >= 0
    details = {}
    if not verified:
        details["message"] = _("OTP verification failed.")
    return verified, details
@libpolicy(auth_cache)
@libpolicy(auth_user_does_not_exist)
@libpolicy(auth_user_has_no_token)
@libpolicy(auth_user_timelimit)
@libpolicy(auth_lastauth)
@libpolicy(auth_user_passthru)
@libpolicy(force_challenge_response)
@log_with(log, hide_kwargs=["passw"])
def check_user_pass(user, passw, options=None):
    """
    This function checks the otp for a given user.
    It is called by the API /validate/check

    If the OTP matches, True is returned and the otp counter is increased.

    :param user: The user who is trying to authenticate
    :type user: User object
    :param passw: The password usually consisting of pin + otp
    :type passw: basestring
    :param options: Additional options. Token specific. The key
        ``token_type`` is removed from this dictionary and used to restrict
        the tokens of the user to this token type.
    :type options: dict
    :return: tuple of result (True, False) and additional dict
    :rtype: tuple
    """
    if options is None:
        # The default is None, but the token type is popped from the options
        # below, which requires a dictionary.
        options = {}
    token_type = options.pop("token_type", None)
    token_objects = get_tokens(user=user, tokentype=token_type)
    reply_dict = {}
    if not token_objects:
        # The user has no tokens assigned
        res = False
        reply_dict["message"] = _("The user has no tokens assigned")
    else:
        token_object = token_objects[0]
        res, reply_dict = check_token_list(token_objects, passw,
                                           user=token_object.user,
                                           options=options,
                                           allow_reset_all_tokens=True)

    return res, reply_dict
def create_challenges_from_tokens(token_list, reply_dict, options=None):
    """
    Get a list of active tokens and create challenges for these tokens.
    The reply_dict is modified accordingly.
    The transaction_id and the messages are added to the reply_dict.

    The following keys of the reply_dict are written: ``multi_challenge``
    (one entry per created challenge), ``transaction_ids`` and, if any token
    produced a message, ``message`` and ``messages``. In addition the
    information of each created challenge is merged into the reply_dict.

    :param token_list: The list of the token objects, that can do challenge response
    :param reply_dict: The dictionary that is passed to the API response
    :param options: Additional options. Passed from the upper layer. The keys
        ``push_triggered`` and ``passkey_nonce`` are set in this dictionary.
    :return: None
    """
    options = options or {}
    options["push_triggered"] = False
    options["passkey_nonce"] = None
    reply_dict["multi_challenge"] = []
    transaction_id = None
    message_list = []
    for token in token_list:
        # Check if the max auth is succeeded
        if not token.check_all(message_list):
            continue
        challenge_created, message, new_transaction_id, challenge_info = token.create_challenge(
            transactionid=transaction_id, options=options)
        # A token may return no challenge info at all. Normalize it here, so
        # that the lookup of the presence answer below can not fail.
        challenge_info = challenge_info or {}
        if message:
            # We need to pass the info if a push token has been triggered, so that require presence can re-use the
            # challenge instead of creating a new one with a different answer
            # Also check the challenge info if the presence answer is returned to pass it on for tag replacement
            additional_tags = {}
            if token.get_type() == "push":
                options["push_triggered"] = True
                if "presence_answer" in challenge_info:
                    additional_tags["presence_answer"] = challenge_info["presence_answer"]
            # Add the reply to the response
            message = challenge_text_replace(message, user=token.user, token_obj=token,
                                             additional_tags=additional_tags)
            message_list.append(message)
        if challenge_created:
            # All challenges of this call share the first created transaction id
            if new_transaction_id:
                transaction_id = new_transaction_id
            challenge_info["transaction_id"] = transaction_id
            challenge_info["serial"] = token.token.serial
            token_type = token.get_tokentype()
            challenge_info["type"] = token_type
            # Only set client_mode if it has not been returned by the tokenclass creating the challenge
            challenge_info["client_mode"] = challenge_info.get("client_mode") or token.client_mode
            challenge_info["message"] = message
            # If they exist, add next pin and next password change
            next_pin = token.get_tokeninfo("next_pin_change")
            if next_pin:
                challenge_info["next_pin_change"] = next_pin
                challenge_info["pin_change"] = token.is_pin_change()
            next_passw = token.get_tokeninfo("next_password_change")
            if next_passw:
                challenge_info["next_password_change"] = next_passw
                challenge_info["password_change"] = token.is_pin_change(password=True)
            # If a passkey challenge has been triggered, reuse the nonce for all other passkey challenges
            # Normally, you would use allowCredentials, but we do one challenge per one token
            if token_type == "passkey":
                passkey_nonce = challenge_info["challenge"]
                options["passkey_nonce"] = passkey_nonce
            reply_dict["multi_challenge"].append(challenge_info)
            reply_dict.update(challenge_info)  # FIXME: This is deprecated and should be removed one day
    if message_list:
        # Remove duplicate messages, but keep the order in which the tokens
        # produced them, so that the response text is reproducible.
        unique_messages = list(dict.fromkeys(message_list))
        reply_dict["message"] = ", ".join(unique_messages)
        # The "messages" element is needed by some decorators
        reply_dict["messages"] = message_list
    # TODO: This line is deprecated: Add the information for the old administrative triggerchallenge
    reply_dict["transaction_ids"] = [challenge.get("transaction_id") for challenge in
                                     reply_dict.get("multi_challenge", [])]
def weigh_token_type(token_obj):
    """
    This method returns a weight of a token type, which is used to sort the
    tokentype list. Other weighing functions can be implemented.

    The Push token weighs the most (1000), so that it will be sorted to the
    end. All other token types weigh the code point of the first character
    of their type. A token without a type weighs 0.

    :param token_obj: token object
    :return: weight of the tokentype
    :rtype: int
    """
    token_type = token_obj.type or ""
    if token_type.upper() == "PUSH":
        return 1000
    if not token_type:
        # There is no first character to weigh. Do not let the sort fail
        # because of a single token without a type.
        return 0
    return ord(token_type[0])
@log_with(log, hide_args=[1])
@libpolicy(reset_all_user_tokens)
@libpolicy(generic_challenge_response_reset_pin)
@libpolicy(generic_challenge_response_resync)
def check_token_list(token_object_list, passw, user=None, options=None, allow_reset_all_tokens=False):
    """
    Takes a list of token objects and tries to find the matching token for the
    given passw. It also tests

    * if the token is active or
    * the max fail count is reached,
    * if the validity period is ok...

    This function is called by check_serial_pass, check_user_pass and
    check_yubikey_pass.

    The tokens are sorted into the groups "valid", "challenge response",
    "challenge request", "PIN matching" and "invalid". Only the first
    non-empty group, in this order, determines the result and the reply.

    :param token_object_list: list of identified tokens
    :param passw: the provided password, can be just the PIN or PIN+OTP
    :param user: the identified user - as class object
    :param options: additional parameters, which are passed to the token.
        A copy is used, the dictionary of the caller is not modified.
    :param allow_reset_all_tokens: If set to True, the policy reset_all_user_tokens is evaluated to
        reset all user tokens accordingly. Note: This parameter is used in the decorator.
        # TODO not good
    :return: tuple of success and optional response
    :rtype: (bool, dict)
    :raises TokenAdminError: if tokens were passed, but all of them are revoked
    """
    res = False
    reply_dict = {}
    increase_auth_counters = not is_true(get_from_config(key="no_auth_counter"))

    # Add the user to the options, so that every token with access to options can see the user
    if options:
        options = options.copy()
    else:
        options = {}
    options.update({'user': user})

    # If there has been one token in challenge mode, we only handle challenges
    challenge_response_token_list = []
    challenge_request_token_list = []
    pin_matching_token_list = []
    invalid_token_list = []
    valid_token_list = []
    messages = []

    # Remove locked tokens from token_object_list
    if len(token_object_list) > 0:
        token_object_list = [token for token in token_object_list if not token.is_revoked()]

        if len(token_object_list) == 0:
            # If there is no unlocked token left.
            raise TokenAdminError(_("This action is not possible, since the token is locked"), id=1007)

    # Remove disabled token types from token_object_list
    if PolicyAction.DISABLED_TOKEN_TYPES in options and options[PolicyAction.DISABLED_TOKEN_TYPES]:
        token_object_list = [token for token in token_object_list if
                             token.type not in options[PolicyAction.DISABLED_TOKEN_TYPES]]

    # Remove certain disabled tokens from token_object_list
    if len(token_object_list) > 0:
        token_object_list = [token for token in token_object_list if token.use_for_authentication(options)]

    # The tokens are processed in the order given by weigh_token_type
    for token_object in sorted(token_object_list, key=weigh_token_type):
        if log.isEnabledFor(logging.DEBUG):
            # Avoid a SQL query triggered by ``token_object.user`` in case the log level is not DEBUG
            log.debug(f"Found user with loginId {token_object.user}: {token_object.get_serial()}")

        # Reset exceeded fail counter if reset timeout is reached
        token_object.check_reset_failcount()

        if not token_object.check_all(messages):
            # token can not be used for authentication (e.g. maxfail exceeded, disabled, not within validity period)
            pass
        elif token_object.is_challenge_response(passw, user=user, options=options):
            # This is a challenge response, and it still has a challenge DB entry
            if token_object.has_db_challenge_response(passw, user=user, options=options):
                challenge_response_token_list.append(token_object)
            else:
                # This is a transaction_id, that either never existed or has expired or is not for this token.
                # We add this to the invalid_token_list
                invalid_token_list.append(token_object)
        elif token_object.is_challenge_request(passw, user=user, options=options):
            # This is a challenge request
            challenge_request_token_list.append(token_object)
        else:
            if not (PolicyAction.FORCE_CHALLENGE_RESPONSE in options and is_true(
                    options[PolicyAction.FORCE_CHALLENGE_RESPONSE])):
                # This is a normal authentication attempt
                try:
                    # Pass the length of the valid_token_list to ``authenticate`` so that
                    # the push token can react accordingly
                    options["valid_token_num"] = len(valid_token_list)
                    pin_match, otp_count, repl = token_object.authenticate(passw, user, options=options)
                except TokenAdminError as tae:
                    # Token is locked
                    pin_match = False
                    otp_count = -1
                    repl = {'message': tae.message}
                repl = repl or {}
                reply_dict.update(repl)
                if otp_count >= 0:
                    # This is a successful authentication
                    valid_token_list.append(token_object)
                elif pin_match:
                    # The PIN of the token matches
                    pin_matching_token_list.append(token_object)
                else:
                    # Nothing matches at all
                    invalid_token_list.append(token_object)
            else:
                invalid_token_list.append(token_object)
                log.info(f"Skipping authentication try for token {token_object.get_serial()}"
                         f" because policy force_challenge_response is set.")

    """
    There might be
    2 in pin_matching_token_list
    0 in valid_token_list
    10 in invalid_token_list
    0 in challenge_token_list.

    in this case, the failcounter of the 2 tokens in pin_matching_token_list needs to be increased.
    And return False

    If there is
    0 pin_matching
    0 valid
    10 invalid
    0 challenge
    AND incFailCountOnFalsePin is True, then the failcounter of the 10 invalid tokens need to be increased.
    And return False

    If there is
    X pin_matching
    1+ valid
    X invalid
    0 challenge
    Then the authentication with the valid tokens was successful and the <count> of the valid tokens
    need to be increased to the new count.
    """
    if valid_token_list:
        # One or more successfully authenticating tokens found
        # We need to return success
        message_list = [_("matching {0:d} tokens").format(len(valid_token_list))]
        # write serial numbers or something to audit log
        for token_obj in valid_token_list:
            # Reset the failcounter, if there is a timeout set
            token_obj.check_reset_failcount()
            # Check if the max auth is succeeded.
            # We need to set the offsets, since we are in the n+1st authentication.
            if token_obj.check_all(message_list):
                if increase_auth_counters:
                    token_obj.inc_count_auth_success()
                # The token is active and the auth counters are ok.
                res = True
                # The reply type is only kept, if all successful tokens are of the same type
                if not reply_dict.get("type"):
                    reply_dict["type"] = token_obj.token.tokentype
                if reply_dict["type"] != token_obj.token.tokentype:
                    reply_dict["type"] = "undetermined"
                # reset the failcounter of valid token
                token_obj.reset()
                # Run the token post method. e.g. registration token deletes itself.
                token_obj.post_success()
        if len(valid_token_list) == 1:
            # If only one token was found, we add the serial number,
            # the token type and the OTP length
            reply_dict["serial"] = valid_token_list[0].token.serial
            reply_dict["type"] = valid_token_list[0].token.tokentype
            reply_dict["otplen"] = valid_token_list[0].token.otplen
            # If they exist, add next pin and next password change
            next_pin = valid_token_list[0].get_tokeninfo("next_pin_change")
            if next_pin:
                reply_dict["next_pin_change"] = next_pin
                reply_dict["pin_change"] = valid_token_list[0].is_pin_change()
            next_passw = valid_token_list[0].get_tokeninfo("next_password_change")
            if next_passw:
                reply_dict["next_password_change"] = next_passw
                reply_dict["password_change"] = valid_token_list[0].is_pin_change(password=True)
        reply_dict["message"] = ", ".join(message_list)

    elif challenge_response_token_list:
        # The RESPONSE for a previous request of a challenge response token was found.
        matching_challenge = False
        further_challenge = False
        for token_object in challenge_response_token_list:
            if token_object.check_challenge_response(passw=passw, options=options) >= 0:
                reply_dict["serial"] = token_object.token.serial
                matching_challenge = True
                messages = []
                if not token_object.is_fit_for_challenge(messages, options=options):
                    messages.insert(0, _("Challenge matches, but token is not fit for challenge"))
                    reply_dict["message"] = ". ".join(messages)
                    log.info("Received a valid response to a "
                             "challenge for a non-fit token {0!s}. {1!s}".format(token_object.token.serial,
                                                                                 reply_dict["message"]))
                else:
                    # Challenge matches, token is active and token is fit for challenge
                    res = True
                    if increase_auth_counters:
                        token_object.inc_count_auth_success()
                    reply_dict["message"] = _("Found matching challenge")
                    # If they exist, add next pin and next password change
                    next_pin = token_object.get_tokeninfo("next_pin_change")
                    if next_pin:
                        reply_dict["next_pin_change"] = next_pin
                        reply_dict["pin_change"] = token_object.is_pin_change()
                    next_passw = token_object.get_tokeninfo("next_password_change")
                    if next_passw:
                        reply_dict["next_password_change"] = next_passw
                        reply_dict["password_change"] = token_object.is_pin_change(password=True)
                    token_object.challenge_janitor()
                    if token_object.has_further_challenge(options):
                        # The token creates further challenges, so create the new challenge
                        # and new transaction_id
                        create_challenges_from_tokens([token_object], reply_dict, options)
                        further_challenge = True
                        # The authentication is not finished yet
                        res = False
                    else:
                        # This was the last successful challenge, so
                        # reset the fail counter of the challenge response token
                        token_object.reset()
                        token_object.post_success()

                    # Clean up all challenges with this transaction_id
                    transaction_id = options.get("transaction_id") or options.get("state")
                    session = db.session
                    stmt = delete(Challenge).where(Challenge.transaction_id == str(transaction_id))
                    session.execute(stmt)
                    session.commit()
                    # Authentication is successful, stop here
                    break

        if not res and not further_challenge:
            # We did not find any successful response, so we need to increase the failcounters
            for token_obj in challenge_response_token_list:
                if not token_obj.is_outofband():
                    token_obj.inc_failcount()
            if not matching_challenge:
                if len(challenge_response_token_list) == 1:
                    reply_dict["serial"] = challenge_response_token_list[0].token.serial
                    reply_dict["type"] = challenge_response_token_list[0].token.tokentype
                    reply_dict["message"] = _("Response did not match the challenge.")
                else:
                    reply_dict["message"] = _("Response did not match for "
                                              "{0!s} tokens.").format(len(challenge_response_token_list))

    elif challenge_request_token_list:
        # This is the initial REQUEST of a challenge response token
        active_challenge_token = [t for t in challenge_request_token_list if t.token.active]
        if len(active_challenge_token) == 0:
            reply_dict["message"] = _("No active challenge response token found")
        else:
            for token_obj in challenge_request_token_list:
                token_obj.check_reset_failcount()
                if is_true(options.get("increase_failcounter_on_challenge")):
                    token_obj.inc_failcount()
            create_challenges_from_tokens(active_challenge_token, reply_dict, options)

    elif pin_matching_token_list:
        # We did not find a valid token and no challenge.
        # But there are tokens, with a matching pin.
        # So we increase the failcounter. Return failure.
        for token_object in pin_matching_token_list:
            token_object.inc_failcount()
            if get_from_config(SYSCONF.RESET_FAILCOUNTER_ON_PIN_ONLY, False, return_bool=True):
                token_object.check_reset_failcount()
            reply_dict["message"] = _("wrong otp value")
            if len(pin_matching_token_list) == 1:
                # If there is only one pin matching token, we look if it was
                # a previous OTP value
                token = pin_matching_token_list[0]
                _r, pin, otp = token.split_pin_pass(passw)
                if token.is_previous_otp(otp):
                    reply_dict["message"] += _(". previous otp used again")
        if increase_auth_counters:
            for token_obj in pin_matching_token_list:
                token_obj.inc_count_auth()
        # write the serial numbers to the audit log
        if len(pin_matching_token_list) == 1:
            reply_dict["serial"] = pin_matching_token_list[0].token.serial
            reply_dict["type"] = pin_matching_token_list[0].token.tokentype
            reply_dict["otplen"] = pin_matching_token_list[0].token.otplen

    elif invalid_token_list:
        # There were only tokens, that did not match the OTP value and
        # not even the PIN.
        # Depending on IncFailCountOnFalsePin, we increase the failcounter.
        reply_dict["message"] = _("wrong otp pin")
        if get_inc_fail_count_on_false_pin():
            for token_object in invalid_token_list:
                token_object.inc_failcount()
                if increase_auth_counters:
                    token_object.inc_count_auth()
    elif messages:
        # No token was sorted into any group, but check_all reported reasons
        reply_dict["message"] = ", ".join(set(messages))
    else:
        # There is no suitable token for authentication
        reply_dict["message"] = _("No suitable token found for authentication.")

    return res, reply_dict
def get_dynamic_policy_definitions(scope: str = None) -> dict:
    """
    Return the dynamic policy definitions that come with the loaded token
    classes.

    For each token type the enrollment policies, the policies defined by the
    token class itself (prefixed with the token type, if they are not
    already) and the PIN policies for the scopes requested by the token class
    are collected.

    :param scope: an optional scope parameter. Only return the policies of
        this scope. If the scope is given, but it is not one of the known
        scopes, an empty dictionary is returned. Without a scope the
        definitions of all scopes are returned.
    :return: The policy definition for the token or only for the scope.
    """
    from privacyidea.lib.policy import SCOPE, MAIN_MENU, GROUP

    known_scopes = (SCOPE.ADMIN, SCOPE.USER, SCOPE.AUTH, SCOPE.ENROLL, SCOPE.WEBUI, SCOPE.AUTHZ)
    definitions = {known_scope: {} for known_scope in known_scopes}

    enrollable = get_enrollable_token_types()
    for ttype in get_token_types():
        type_upper = ttype.upper()
        type_lower = ttype.lower()

        if ttype in enrollable:
            definitions[SCOPE.ADMIN][f"enroll{type_upper}"] = {
                'type': 'bool',
                'desc': _("Admin is allowed to initialize {0!s} tokens.").format(type_upper),
                'mainmenu': [MAIN_MENU.TOKENS],
                'group': GROUP.ENROLLMENT
            }

            user_section = get_tokenclass_info(ttype, section='user')
            if 'enroll' in user_section:
                definitions[SCOPE.USER][f"enroll{type_upper}"] = {
                    'type': 'bool',
                    'desc': _("The user is allowed to enroll a {0!s} token.").format(type_upper),
                    'mainmenu': [MAIN_MENU.TOKENS],
                    'group': GROUP.ENROLLMENT
                }

        # Merge the policy definitions of the token class into the global
        # definitions. Sections that are not a known scope are ignored.
        token_policy = get_tokenclass_info(ttype, section='policy')
        for section in token_policy.keys():
            if section not in definitions:
                continue
            section_entries = token_policy.get(section)
            for entry_name in section_entries:
                # Make sure that the policy name is prefixed with the token type
                if entry_name.startswith(ttype):
                    prefixed_name = entry_name
                else:
                    prefixed_name = '{0!s}_{1!s}'.format(ttype, entry_name)
                definitions[section][prefixed_name] = section_entries.get(entry_name)

        # If the token class should provide specific PIN policies, now merge
        # PIN policies
        for pin_scope in get_tokenclass_info(ttype, section='pin_scopes') or []:
            scope_definitions = definitions[pin_scope]
            scope_definitions[f'{type_lower}_otp_pin_maxlength'] = {
                'type': 'int',
                'value': list(range(0, 32)),
                "desc": _("Set the maximum allowed PIN length of the {0!s} token.").format(type_upper),
                'group': GROUP.PIN
            }
            scope_definitions[f'{type_lower}_otp_pin_minlength'] = {
                'type': 'int',
                'value': list(range(0, 32)),
                "desc": _("Set the minimum required PIN length of the {0!s} token.").format(type_upper),
                'group': GROUP.PIN
            }
            scope_definitions[f'{type_lower}_otp_pin_contents'] = {
                'type': 'str',
                "desc": _("Specify the required PIN contents of the {0!s} token. (c)haracters, (n)umeric, (s)pecial, "
                          "(o)thers. [+/-]!").format(type_upper),
                'group': GROUP.PIN
            }

    # Without a scope everything is returned, otherwise only the subsection
    if not scope:
        return definitions
    if scope not in definitions:
        log.debug(f"Scope '{scope}' is not defined in the dynamic policy definitions.")
        return {}
    return definitions[scope]
def set_tokengroups(serial, tokengroups=None, add=False):
    """
    Set a list of tokengroups for one token.

    :param serial: The serial of the token
    :param tokengroups: The list of tokengroups (names). If nothing is
        given, an empty list is used.
    :param add: Whether the list of tokengroups should be added
    :return: None
    """
    group_names = tokengroups if tokengroups else []
    token = get_one_token(serial=serial)
    token.set_tokengroups(group_names, add=add)
def assign_tokengroup(serial, tokengroup=None, tokengroup_id=None):
    """
    Assign a new tokengroup to a token.

    The token is looked up first, errors of this lookup are passed on
    unchanged. Any error while adding the tokengroup is reported as a
    missing tokengroup.

    :param serial: The serial number of the token
    :param tokengroup: The name of the tokengroup
    :param tokengroup_id: alternatively the id of the tokengroup
    :return: True
    :raises ResourceNotFoundError: if the tokengroup could not be added
    """
    token = get_one_token(serial=serial)
    try:
        added = token.add_tokengroup(tokengroup, tokengroup_id)
    except Exception:
        raise ResourceNotFoundError(_("The tokengroup does not exist."))
    return added
def unassign_tokengroup(serial, tokengroup=None, tokengroup_id=None):
    """
    Remove a tokengroup from a token.

    Any error, including an error while looking up the token, is reported
    as a missing tokengroup.

    :param serial: The serial number of the token
    :param tokengroup: The name of the tokengroup
    :param tokengroup_id: alternatively the id of the tokengroup
    :return: True
    :raises ResourceNotFoundError: if the token lookup or the removal failed
    """
    try:
        token = get_one_token(serial=serial)
        removed = token.delete_tokengroup(tokengroup, tokengroup_id)
    except Exception:
        raise ResourceNotFoundError(_("The tokengroup does not exist."))
    return removed
def list_tokengroups(tokengroup=None):
    """
    Return the token/tokengroup assignments of a certain tokengroup.

    If no tokengroup is specified, the assignments of all groups are
    returned. The same is true, if a name is given, but no tokengroup with
    this name exists.

    :param tokengroup: The name of the token group
    :return: list of TokenTokengroup objects
    """
    session = db.session
    assignment_stmt = select(TokenTokengroup)
    if tokengroup:
        group_stmt = select(Tokengroup).where(Tokengroup.name == tokengroup)
        group = session.execute(group_stmt).scalar_one_or_none()
        if group:
            # Restrict the result to the assignments of this group
            assignment_stmt = assignment_stmt.where(TokenTokengroup.tokengroup_id == group.id)
    return session.scalars(assignment_stmt).unique().all()
def challenge_text_replace(message, user, token_obj, additional_tags: dict = None):
    """
    Replace the tags in a challenge message with the values of the token
    and the user.

    The tags are created from the serial, the token owner and the token type.
    For SMS tokens the tags ``phone`` and ``phone_redacted`` and for email
    tokens the tags ``email`` and ``email_redacted`` are added, if a value
    is available. Tags in the message, for which no value exists, are
    replaced with an empty string.

    :param message: The challenge message, which may contain tags like ``{serial}``
    :param user: The token owner, which is used for the tags
    :param token_obj: The token object, for which the challenge was created
    :param additional_tags: Further tags, which are added to the tags
    :return: The message with the replaced tags
    """
    # TODO this function should be a token function since most of the info is from that token anyway, optionally pass
    # TODO environment stuff into that function
    serial = token_obj.token.serial or None
    tokenowner = user or None
    token_type = token_obj.token.tokentype or ""
    tags = create_tag_dict(serial=serial, tokenowner=tokenowner, tokentype=token_type)
    if additional_tags:
        tags.update(additional_tags)

    if token_type == "sms":
        if is_true(token_obj.get_tokeninfo("dynamic_phone")):
            phone = token_obj.user.get_user_phone("mobile")
            if isinstance(phone, list) and phone:
                # if there is a non-empty list, we use the first entry
                phone = phone[0]
        else:
            phone = token_obj.get_tokeninfo("phone")
        if phone:
            tags["phone"] = phone
            tags["phone_redacted"] = redacted_phone_number(phone)

    if token_type == "email":
        if is_true(TokenClass.get_tokeninfo(token_obj, "dynamic_email")):
            email = token_obj.user.get_specific_info([token_obj.EMAIL_ADDRESS_KEY]).get(token_obj.EMAIL_ADDRESS_KEY)
            if isinstance(email, list) and email:
                # If there is a non-empty list, we use the first entry
                email = email[0]
        else:
            email = TokenClass.get_tokeninfo(token_obj, token_obj.EMAIL_ADDRESS_KEY)
        if email:
            tags["email"] = email
            tags["email_redacted"] = redacted_email(email)

    # If the message is for a pushtoken and the presence_answer is set, but there is no tag for placing that answer,
    # Append the answer to the message
    presence_answer = tags.get("presence_answer", None)
    if presence_answer and token_type == "push" and "{presence_answer}" not in message:
        # The untouched template has to be passed to gettext, otherwise the
        # translation can never be found. The tag is filled by the
        # replacement below, like every other tag.
        message += _(" Please press: {presence_answer}")

    message = message.format_map(defaultdict(str, tags))
    return message
def regenerate_enroll_url(serial: str, request: Request, g) -> Union[str, None]:
    """
    Return the enroll URL for the already enrolled token with the given
    serial number.

    The request is prepared with the token owner, the serial and the token
    type. Then the enrollment policies are applied to it and the token is
    rolled over with a newly generated key. Policy errors are only logged.

    :param serial: The serial number of the token
    :param request: The request, its ``User`` and ``all_data`` are modified
    :param g: The context object, ``g.serial`` is set and ``g.policies`` is read
    :return: The result of ``get_enroll_url`` of the rolled over token
    """
    # NOTE(review): The previous documentation stated, that None is returned
    # for tokens in the rollout state 'enrolled'. This is not decided here,
    # it would have to be the behaviour of get_enroll_url - confirm.
    token = get_one_token(serial=serial)
    token_owner = token.user or User()
    request.User = token_owner
    if token_owner:
        request.all_data.update({"user": token_owner.login,
                                 "realm": token_owner.realm,
                                 "resolver": token_owner.resolver})
    request.all_data.update({"serial": serial, "type": token.get_type()})
    g.serial = serial

    # Get policies for the token
    # TODO: Refactor including original uses of these functions (decorators on token init endpoint)
    from privacyidea.api.lib.prepolicy import (pushtoken_add_config, tantoken_count, papertoken_count, init_tokenlabel)
    from privacyidea.api.lib.postpolicy import check_verify_enrollment

    try:
        # The first failing policy stops the evaluation of the following ones
        for apply_prepolicy in (pushtoken_add_config, tantoken_count, papertoken_count, init_tokenlabel):
            apply_prepolicy(request, None)
    except PolicyError as ex:
        log.warning(f"{ex}")

    params = request.all_data
    params.update({"genkey": True, "rollover": True, "policies": g.policies})

    token = init_token(params)
    enroll_url = token.get_enroll_url(token_owner, params)

    # Check post policies
    init_details = {"type": token.get_type()}
    init_details.update(token.get_init_detail(params, token_owner))
    try:
        response = send_result(True, details=init_details)
        check_verify_enrollment(request, response)
    except PolicyError as ex:
        log.warning(f"{ex}")

    return enroll_url
def export_tokens(tokens: list[TokenClass], export_user: bool = True) -> TokenExportResult:
    """
    Export the given tokens one by one.

    A token whose export raises an exception does not abort the export of the
    remaining tokens: the error is logged and the serial of the token is
    collected in the list of failed tokens.

    :param tokens: The token objects to export
    :param export_user: Passed on to ``export_token`` of each token
    :return: TokenExportResult with the exported token data in
        ``successful_tokens`` and the serials of the tokens that could not be
        exported in ``failed_tokens``
    """
    exported_tokens = []
    failed_serials = []

    for token in tokens:
        try:
            token_data = token.export_token(export_user=export_user)
        except Exception as ex:
            log.error(f"Failed to export token {token.get_serial()}: {ex}")
            failed_serials.append(token.get_serial())
        else:
            exported_tokens.append(token_data)

    return TokenExportResult(successful_tokens=exported_tokens, failed_tokens=failed_serials)
def import_tokens(tokens: list[dict], update_existing_tokens: bool = True,
                  assign_to_user: bool = True) -> TokenImportResult:
    """
    Import a list of token dictionaries.

    For each dictionary the token with the given serial is either created or,
    if it already exists and ``update_existing_tokens`` is set, updated.
    A token that was newly created by this function is deleted again if the
    user assignment or the import of the token data fails. A token that
    already existed before the import is never deleted, a failed update only
    adds its serial to the failed tokens.

    :param tokens: list of dict with token information
    :param update_existing_tokens: If True, existing tokens will be updated with the new data.
        If False, the serials of existing tokens are reported as failed.
    :param assign_to_user: If True, the user from the token data will be assigned to the token.
    :return: TokenImportResult containing the serials of the newly created
        tokens (``successful_tokens``), the updated tokens (``updated_tokens``)
        and the tokens that could not be imported (``failed_tokens``)
    """
    successful_tokens = []
    updated_tokens = []
    failed_tokens = []

    for token_info_dict in tokens:
        serial = token_info_dict.get("serial")
        if not serial:
            # A token can neither be looked up nor created without a serial.
            # NOTE(review): presumably a lookup without a serial would not be restricted
            # to a single token, so we do not even try - confirm against get_one_token.
            log.error("Could not import token: The token data does not contain a serial.")
            failed_tokens.append(serial)
            continue

        existing_token = get_one_token(serial=serial, silent_fail=True)

        if existing_token and not update_existing_tokens:
            log.info(f"Token with serial {serial} already exists.")
            failed_tokens.append(serial)
            continue

        if existing_token:
            # We use the existing token and update it
            token = existing_token
        else:
            # We create a new token, if there is no existing token
            try:
                token_type = token_info_dict.get("type")
                db_token = Token(serial, tokentype=token_type.lower())
                db_token.save()
                token = create_tokenclass_object(db_token)
            except Exception as e:
                log.error(f"Could not create token {serial}: {e}")
                failed_tokens.append(serial)
                continue

        # Assign the user, if wanted and if there is a user in the token info dict
        user_info = token_info_dict.get("user")
        if assign_to_user and user_info:
            try:
                owner = User(login=user_info.get("login"),
                             resolver=user_info.get("resolver"),
                             realm=user_info.get("realm"),
                             uid=user_info.get("uid"))
                token.add_user(owner, override=True)
            except Exception as e:
                log.error(f"Could not assign user to token {serial}: {e}. "
                          f"The token will not be imported.")
                failed_tokens.append(serial)
                if not existing_token:
                    # Only clean up the token created above, an existing token must survive
                    token.delete_token()
                continue

        try:
            token.import_token(token_info_dict)
        except Exception as e:
            log.exception(f"Could not import token {serial}: {e}")
            failed_tokens.append(serial)
            if not existing_token:
                # Only clean up the token created above, an existing token must survive
                token.delete_token()
            continue

        if existing_token:
            updated_tokens.append(serial)
        else:
            successful_tokens.append(serial)

    return TokenImportResult(successful_tokens=successful_tokens, updated_tokens=updated_tokens,
                             failed_tokens=failed_tokens)