Source code for privacyidea.lib.tokens.pushtoken

# SPDX-FileCopyrightText: 2026 NetKnights GmbH <https://netknights.it>
# SPDX-License-Identifier: AGPL-3.0-or-later
#
#  contact:  http://www.privacyidea.org
#
#  2019-02-08   Cornelius Kölbel <cornelius.koelbel@netknights.it>
#               Start the pushtoken class
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
The pushtoken sends a push notification via Firebase service to the registered smartphone.
The token is a challenge response token. The smartphone will sign the challenge
and send it back to the authentication endpoint.

This code is tested in tests/test_lib_tokens_push
"""

import logging
import random
import secrets
import string
import time
import traceback
from base64 import b32decode
from binascii import Error as BinasciiError
from datetime import datetime, timedelta, timezone
from typing import Any
from urllib.parse import quote

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from dateutil.parser import isoparse

from privacyidea.api.lib.policyhelper import get_pushtoken_add_config, get_init_tokenlabel_parameters
from privacyidea.lib.params import get_optional, get_required
from privacyidea.lib import _, lazy_gettext
from privacyidea.lib.apps import _construct_extra_parameters
from privacyidea.lib.challenge import get_challenges
from privacyidea.lib.config import get_from_config
from privacyidea.lib.crypto import geturandom, generate_keypair
from privacyidea.lib.decorators import check_token_locked
from privacyidea.lib.error import ParameterError
from privacyidea.lib.error import (ResourceNotFoundError, ValidateError,
                                   PrivacyIDEAError, ConfigAdminError, PolicyError)
from privacyidea.lib.log import log_with
from privacyidea.lib.policies.actions import PolicyAction
from privacyidea.lib.policy import (SCOPE, GROUP, Match,
                                    get_action_values_from_options)
from privacyidea.lib.smsprovider.SMSProvider import get_smsgateway, create_sms_instance
from privacyidea.lib.token import get_one_token, init_token
from privacyidea.lib.tokenclass import (TokenClass, AuthenticationMode, ClientMode,
                                        RolloutState, ChallengeSession)
from privacyidea.lib.tokens.push_types import (PushMode, PushPresenceOptions,
                                               PushAction, PushAllowPolling,
                                               CODE_TO_PHONE_DISPLAY_CODE_LENGTH)
from privacyidea.lib.user import User
from privacyidea.lib.utils import create_img, b32encode_and_unicode
from privacyidea.lib.utils import prepare_result, to_bytes, is_true, create_tag_dict
from privacyidea.models import Challenge, Token, db

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# User-facing default texts, wrapped in lazy_gettext.
DEFAULT_CHALLENGE_TEXT = lazy_gettext("Please confirm the authentication on your mobile device!")
ERROR_CHALLENGE_TEXT = lazy_gettext("Use the polling feature of your privacyIDEA Authenticator App"
                                    " to check for a new Login request.")
# Fallback for the "question" sent to the smartphone, see _build_smartphone_data().
DEFAULT_MOBILE_TEXT = lazy_gettext("Do you want to confirm the login?")
DEFAULT_MOBILE_TEXT_CODE_TO_PHONE = lazy_gettext("Enter the code to log in")
# Tokeninfo keys under which the key material of a push token is stored
# (written in PushTokenClass.update()).
PRIVATE_KEY_SERVER = "private_key_server"
PUBLIC_KEY_SERVER = "public_key_server"
PUBLIC_KEY_SMARTPHONE = "public_key_smartphone"
# NOTE(review): presumably also a tokeninfo key; its use is not visible in this part of the file.
POLLING_ALLOWED = "polling_allowed"
# Gateway type passed to get_smsgateway() to select the Firebase provider configurations.
GWTYPE = 'privacyidea.lib.smsprovider.FirebaseProvider.FirebaseProvider'
# NOTE(review): presumably seconds between two polls - confirm at the place of use.
POLL_INTERVAL = 1.0

# Timedelta in minutes
POLL_TIME_WINDOW = 1
UPDATE_FB_TOKEN_WINDOW = 5
# Special value of the Firebase config policy: it is offered next to the
# configured gateways and compared against in get_init_detail() to set "poll_only".
POLL_ONLY = "poll only"
# The upper case letters "A" to "Z"
AVAILABLE_PRESENCE_OPTIONS_ALPHABETIC = list(string.ascii_uppercase)
# The zero padded two-digit strings "00" to "99"
AVAILABLE_PRESENCE_OPTIONS_NUMERIC = [f'{x:02}' for x in range(100)]
# 2 to 10 (inclusive); offered as allowed values of the PRESENCE_NUM_OPTIONS policy
ALLOWED_NUMBER_OF_OPTIONS = list(range(2, 11))
DEFAULT_NUMBER_OF_PRESENCE_OPTIONS = 3


def strip_pem_headers(key: str) -> str:
    """
    Strip the PEM header and footer lines like

    -----BEGIN PUBLIC RSA KEY-----
    -----END PUBLIC KEY-----
    -----BEGIN PRIVATE RSA KEY-----

    as well as surrounding whitespace from a key.

    The surrounding whitespace is removed *before* the key is checked for a
    header. This way a key with leading blanks or newlines is also recognized
    as a PEM block.

    :param key: key as a string
    :return: stripped key
    """
    key = key.strip()
    if key.startswith("-----BEGIN"):
        # Drop the first (BEGIN) and the last (END) line
        return "\n".join(key.splitlines()[1:-1]).strip()
    return key


@log_with(log)
def create_push_token_url(url: str | None = None, ttl: int | str = 10, issuer: str = "privacyIDEA",
                          serial: str = "mylabel", tokenlabel: str = "<s>", user_obj: User | None = None,
                          extra_data: dict | None = None, user: str | None = None, realm: str | None = None,
                          pia_scheme: bool = False) -> str:
    """
    Build the URL which is used to enroll a push token.

    The URL has the form
    ``<scheme>://pipush/<label>?url=<url>&ttl=<ttl>&issuer=<issuer><extra parameters>``

    :param url: The URL which is written to the ``url`` parameter. It is
        UTF-8 encoded and quoted, so a string is needed although the default is None.
    :param ttl: The value of the ``ttl`` parameter
    :param issuer: The issuer. The placeholders ``{serial}``, ``{user}``, ``{realm}``,
        ``{givenname}`` and ``{surname}`` are replaced.
    :param serial: The serial number of the token
    :param tokenlabel: The template of the label. It may contain the same placeholders
        as the issuer and the deprecated tags ``<s>`` (serial), ``<u>`` (user) and ``<r>`` (realm).
    :param user_obj: The user object to read the given name and the surname from.
        An empty ``User()`` is used, if it is not given.
    :param extra_data: Additional data, which is appended by ``_construct_extra_parameters``
    :param user: The login name to be used in label and issuer
    :param realm: The realm to be used in label and issuer
    :param pia_scheme: Use the privacyIDEA App URL scheme "pia"
    :return: The enrollment URL
    """
    if not extra_data:
        extra_data = {}

    # policy depends on some lib.util

    if not user_obj:
        user_obj = User()

    # We need realm and user to be a string
    if not realm:
        realm = ""
    if not user:
        user = ""

    # Deprecated: the tags <s>, <u> and <r>
    label = tokenlabel
    for tag, replacement in (("<s>", serial), ("<u>", user), ("<r>", realm)):
        label = label.replace(tag, replacement)

    user_info = user_obj.get_specific_info(["givenname", "surname"])
    label = label.format(serial=serial, user=user, realm=realm,
                         givenname=user_info.get("givenname", ""),
                         surname=user_info.get("surname", ""))
    issuer = issuer.format(serial=serial, user=user, realm=realm,
                           givenname=user_info.get("givenname", ""),
                           surname=user_info.get("surname", ""))

    quoted_label = quote(label.encode("utf-8"))
    quoted_issuer = quote(issuer.encode("utf-8"))
    quoted_url = quote(url.encode("utf-8"))

    if pia_scheme:
        scheme = "pia"
    else:
        scheme = "otpauth"

    extra_parameters = _construct_extra_parameters(extra_data)
    return (f"{scheme}://pipush/{quoted_label}?url={quoted_url}&ttl={ttl}&issuer={quoted_issuer}"
            f"{extra_parameters}")


def _get_presence_options(options) -> list:
    """
    Get the available presence options for the user to confirm the login based on the push policy configurations.

    The kind of options is read from the policy ``PushAction.PRESENCE_OPTIONS``:

    * ``NUMERIC``: a copy of ``AVAILABLE_PRESENCE_OPTIONS_NUMERIC``
    * ``CUSTOM``: the value of the policy ``PushAction.PRESENCE_CUSTOM_OPTIONS`` split at ``:``
    * anything else (including a missing or unknown value): a copy of
      ``AVAILABLE_PRESENCE_OPTIONS_ALPHABETIC``

    If ``CUSTOM`` is chosen, but no custom options are configured, a warning
    is logged and the alphabetic default is used instead of failing.

    :param options: the request context parameters / data
    :type options: dict
    :return: The list of available presence characters/numbers
    """
    try:
        push_presence_option = PushPresenceOptions(get_action_values_from_options(
            SCOPE.AUTH, PushAction.PRESENCE_OPTIONS, options))
    except ValueError:
        # The policy is not set or contains an unknown value
        push_presence_option = PushPresenceOptions.ALPHABETIC

    if push_presence_option == PushPresenceOptions.NUMERIC:
        return list(AVAILABLE_PRESENCE_OPTIONS_NUMERIC)

    if push_presence_option == PushPresenceOptions.CUSTOM:
        custom_presence_options = get_action_values_from_options(
            SCOPE.AUTH, PushAction.PRESENCE_CUSTOM_OPTIONS, options)
        # The policy value may be missing (other callers in this module also
        # guard the return value with "or <default>"), so we must not call
        # split() on it unconditionally.
        if isinstance(custom_presence_options, str) and custom_presence_options:
            return custom_presence_options.split(":")
        log.warning(f"The presence options are set to {PushPresenceOptions.CUSTOM!s}, but no custom "
                    f"presence options are configured. Using the alphabetic default.")

    # Default push_presence_option is "ALPHABETIC":
    return list(AVAILABLE_PRESENCE_OPTIONS_ALPHABETIC)


def _build_smartphone_data(token: TokenClass, challenge: str, registration_url: str, private_key_pem: str,
                           options: dict, presence_options: list | None = None) -> dict:
    """
    Create the dictionary to be sent to the smartphone as challenge

    The dictionary contains the keys ``nonce``, ``question``, ``serial``, ``title``,
    ``sslverify``, ``url`` and ``signature``. If ``presence_options`` are given, it
    additionally contains ``require_presence`` (the options joined with ``,``) and
    ``version``.

    The signature is created over the string
    ``nonce|url|serial|question|title|sslverify`` (with ``|require_presence`` appended, if
    presence options are given) using PKCS1v15 padding and SHA256. It is base32 encoded.

    :param token: The token object for which to create the smartphone data
    :type token: A tokenclass object
    :param challenge: base32 encoded random data string
    :type challenge: str
    :param registration_url: The privacyIDEA URL, to which the Push token communicates
    :type registration_url: str
    :param private_key_pem: The unencrypted private key of the server in PEM format, which
        is used to sign the data
    :type private_key_pem: str
    :param options: the options dictionary
    :type options: dict
    :param presence_options: Require the user to confirm with the correct button from the list of options.
    :type presence_options: list
    :return: the created smartphone_data dictionary
    :rtype: dict
    """
    sslverify = get_action_values_from_options(SCOPE.AUTH, PushAction.SSL_VERIFY,
                                               options) or "1"
    if sslverify not in ["0", "1"]:
        sslverify = "1"
    default_message = str(DEFAULT_MOBILE_TEXT)

    message_on_mobile = get_action_values_from_options(SCOPE.AUTH,
                                                       PushAction.MOBILE_TEXT,
                                                       options) or default_message
    # Get the request object
    _g = options.get("g", {})
    req_headers = None
    request = None
    if hasattr(_g, "request_headers"):
        req_headers = _g.request_headers
    if req_headers:
        req_environment = req_headers.environ
        request = req_environment.get("werkzeug.request")
    if request:
        user = request.User
    else:
        # Get owner from token object
        try:
            user = token.user
        except Exception:
            # Best effort: the message is built without the owner
            user = None

    tags = create_tag_dict(serial=token.token.serial,
                           request=request,
                           client_ip=options.get("clientip"),
                           tokenowner=user,
                           tokentype="push",
                           recipient={"givenname": user.info.get("givenname") if user else "",
                                      "surname": user.info.get("surname") if user else ""},
                           challenge=options.get("challenge"))
    try:
        message_on_mobile = message_on_mobile.format(**tags)
    except (KeyError, IndexError, ValueError, AttributeError, TypeError) as e:
        # The text is configured by the administrator. An unknown tag raises a
        # KeyError, but a positional field like "{0}", an unbalanced brace or
        # an invalid attribute/index access raise other errors. None of them
        # should break the authentication.
        log.warning(f"Could not format the message: {e}. Using default message.")
        message_on_mobile = default_message
    log.debug(f"Sending to mobile: {message_on_mobile}")

    title = get_action_values_from_options(SCOPE.AUTH, PushAction.MOBILE_TITLE, options) or "privacyIDEA"
    smartphone_data = {
        "nonce": challenge,
        "question": message_on_mobile,
        "serial": token.token.serial,
        "title": title,
        "sslverify": sslverify,
        "url": registration_url
    }

    # Create the signature.
    sign_string = "{nonce}|{url}|{serial}|{question}|{title}|{sslverify}"
    if presence_options is not None:
        smartphone_data["require_presence"] = ",".join(presence_options)
        smartphone_data["version"] = "2"
        # Add a placeholder and not the value itself: The value would be
        # interpreted as part of the format template, which fails if a
        # (custom) presence option contains curly braces.
        sign_string += "|{require_presence}"

    sign_string = sign_string.format(**smartphone_data)
    # Since the private key is generated by privacyIDEA and only stored
    # encrypted in the database, we can disable the costly key check here
    private_key = serialization.load_pem_private_key(to_bytes(private_key_pem),
                                                     None, default_backend(),
                                                     unsafe_skip_rsa_key_validation=True)

    # Sign the data with PKCS1 padding. Not all Androids support PSS padding.
    signature = private_key.sign(sign_string.encode("utf8"),
                                 padding.PKCS1v15(),
                                 hashes.SHA256())
    smartphone_data["signature"] = b32encode_and_unicode(signature)
    return smartphone_data


def _load_public_key(pubkey_pem: str) -> Any:
    """
    Load the given stripped and urlsafe public key and return the verify object

    The urlsafe characters ``-`` and ``_`` are mapped back to ``+`` and ``/``,
    surrounding whitespace is removed, remaining blanks are replaced
    with ``+`` and the PEM header and footer are added.

    :param pubkey_pem: The public key without PEM header and footer
    :return: The public key object as returned by ``load_pem_public_key``
    """
    # The public key of the smartphone was probably sent as urlsafe:
    key_body = pubkey_pem.translate(str.maketrans("-_", "+/"))
    key_body = key_body.strip().replace(" ", "+")
    # The public key was sent without any header
    pem_lines = ("-----BEGIN PUBLIC KEY-----",
                 key_body,
                 "-----END PUBLIC KEY-----")
    pem_data = to_bytes("\n".join(pem_lines))

    return serialization.load_pem_public_key(pem_data, default_backend())


class PushTokenClass(TokenClass):
    """
    The :ref:`push_token` uses the Firebase service to send challenges to the
    user's smartphone. The user confirms on the smartphone, signs the
    challenge and sends it back to privacyIDEA.

    The enrollment occurs in two enrollment steps:

    **Step 1**:
      The device is enrolled using a QR code, which encodes the following URI::

          otpauth://pipush/PIPU0006EF85?url=https://yourprivacyideaserver/enroll/this/token&ttl=120

    **Step 2**:
      In the QR code is a URL, where the smartphone sends the remaining data for the enrollment:

        .. sourcecode:: http

            POST /ttype/push HTTP/1.1
            Host: https://yourprivacyideaserver/

            enrollment_credential=<hex nonce>
            serial=<token serial>
            fbtoken=<Firebase token>
            pubkey=<public key>

    For more information see:

    - https://github.com/privacyidea/privacyidea/issues/1342
    - https://github.com/privacyidea/privacyidea/wiki/concept%3A-PushToken
    """

    mode = [AuthenticationMode.AUTHENTICATE,
            AuthenticationMode.CHALLENGE,
            AuthenticationMode.OUTOFBAND]
    client_mode = ClientMode.POLL

    def __init__(self, db_token: Token):
        """
        Create the push token object from the database token.

        :param db_token: the token database object
        """
        TokenClass.__init__(self, db_token)
        self.set_type("push")
        self.hKeyRequired = False
[docs] @staticmethod def get_class_type() -> str: """ return the generic token class identifier """ return "push"
[docs] @staticmethod def get_class_prefix() -> str: return "PIPU"
@staticmethod
def get_class_info(key: str = None, ret: str = 'all') -> Any:
    """
    Return the token definition or a subsection of it.

    The definition contains the type, title and description of the token
    and the policies of the scopes enroll and auth.

    :param key: subsection identifier
    :type key: str
    :param ret: default return value, if nothing is found
    :type ret: user defined
    :return: If ``key`` is given, the subsection (or an empty dict, if the key
        does not exist). Without ``key`` the complete definition is returned
        if ``ret`` is 'all', otherwise ``ret`` is returned unchanged.
    :rtype: dict
    """
    gws = get_smsgateway(gwtype=GWTYPE)

    # The policies of the enrollment
    enroll_policies = {
        PushAction.FIREBASE_CONFIG: {
            'type': 'str',
            'desc': _('The configuration of your Firebase application.'),
            'group': "PUSH",
            'value': [POLL_ONLY] + [gw.identifier for gw in gws]
        },
        PushAction.REGISTRATION_URL: {
            "required": True,
            'type': 'str',
            'group': "PUSH",
            'desc': _('The URL the Push App should contact in the second enrollment step.'
                      ' Usually it is the endpoint /ttype/push of the privacyIDEA server.')
        },
        PushAction.TTL: {
            'type': 'int',
            'group': "PUSH",
            'desc': _('The second enrollment step must be completed within this time (in minutes).')
        },
        PushAction.SSL_VERIFY: {
            'type': 'str',
            'desc': _('The smartphone needs to verify SSL during the enrollment. (default 1)'),
            'group': "PUSH",
            'value': ["0", "1"]
        },
        PolicyAction.MAXTOKENUSER: {
            'type': 'int',
            'desc': _("The user may only have this maximum number of Push tokens assigned."),
            'group': GROUP.TOKEN
        },
        PolicyAction.MAXACTIVETOKENUSER: {
            'type': 'int',
            'desc': _("The user may only have this maximum number of active Push tokens assigned."),
            'group': GROUP.TOKEN
        },
        'push_' + PolicyAction.FORCE_APP_PIN: {
            'type': 'bool',
            'group': "PUSH",
            'desc': _('Require to unlock the Smartphone before Push requests can be accepted')
        },
        'push_' + PolicyAction.APP_FORCE_UNLOCK: {
            'type': 'str',
            'value': ["any", "biometric", "pin"],
            'desc': _('Enforces the privacyIDEA Authenticator App that the token has to '
                      'be unlocked with pin or biometric. This needs the privacyIDEA '
                      'Authenticator app 4.6.1 or higher.')
        },
        PushAction.USE_PIA_SCHEME: {
            'type': 'bool',
            'desc': _("Use the privacyIDEA app URL scheme 'pia' in the enroll URL for push tokens "
                      "to open the privacyIDEA app."),
            'group': "PUSH",
        }
    }

    # The policies of the authentication
    auth_policies = {
        PushAction.MOBILE_TEXT: {
            'type': 'str',
            'desc': _(
                'The question the user sees on his mobile phone. Several tags like {serial} and '
                '{client_ip} can be used as parameters.'),
            'group': 'PUSH'
        },
        PushAction.MOBILE_TITLE: {
            'type': 'str',
            'desc': _('The title of the notification, the user sees on his mobile phone.'),
            'group': 'PUSH'
        },
        PushAction.SSL_VERIFY: {
            'type': 'str',
            'desc': _('The smartphone needs to verify SSL during authentication. (default 1)'),
            'group': "PUSH",
            'value': ["0", "1"]
        },
        PushAction.REQUIRE_PRESENCE: {
            'type': 'bool',
            'desc': _('Require the user to confirm the login with a presence check.'),
            'group': 'PUSH'
        },
        PushAction.PUSH_CODE_TO_PHONE: {
            'type': 'bool',
            'desc': _('Require the user to confirm the login with an OTP that is received in the'
                      ' smartphone app. push_require_presence has precedence over this. '
                      'It is not compatible with push_wait.'),
            'group': 'PUSH'
        },
        PushAction.PUSH_CODE_TO_PHONE_MESSAGE: {
            'type': 'str',
            'desc': _('The message that is shown above the code on the smartphone.'),
            'group': 'PUSH'
        },
        PushAction.PRESENCE_OPTIONS: {
            'type': 'str',
            'desc': _('The options that can be presented to the user to confirm the login. '
                      '<code>ALPHABETIC</code>: A-Z, <code>NUMERIC</code>: 01-99, '
                      '<code>CUSTOM</code>: user defined. '
                      'Does only apply if <em>{0!s}</em> is set.').format(
                PushAction.REQUIRE_PRESENCE),
            'group': 'PUSH',
            'value': list(PushPresenceOptions.__members__),
        },
        PushAction.PRESENCE_CUSTOM_OPTIONS: {
            'type': 'str',
            'desc': _('Custom options that can be presented to the user to confirm the login. '
                      'The string must contain at least 2 options and should be unique. '
                      'The options are separated by <code>:</code>. '
                      'e.g.: <code>01:02:03:1A:1B:1C</code>. '
                      'Does only apply if <em>{0!s}</em> is set '
                      'to <code>CUSTOM</code>.').format(PushAction.PRESENCE_OPTIONS),
            'group': 'PUSH'
        },
        PushAction.PRESENCE_NUM_OPTIONS: {
            'type': 'int',
            'desc': _('The number of options the user is presented with to confirm the login. '
                      'Does only apply if <em>{0!s}</em> is set.').format(
                PushAction.REQUIRE_PRESENCE),
            'group': 'PUSH',
            'value': ALLOWED_NUMBER_OF_OPTIONS
        },
        PushAction.WAIT: {
            'type': 'int',
            'desc': _('Wait for number of seconds for the user to confirm the challenge in the '
                      'first request.'),
            'group': 'PUSH'
        },
        PushAction.ALLOW_POLLING: {
            'type': 'str',
            'desc': _('Configure whether to allow push tokens to poll for challenges'),
            'group': 'PUSH',
            'value': [PushAllowPolling.ALLOW,
                      PushAllowPolling.DENY,
                      PushAllowPolling.TOKEN],
            'default': PushAllowPolling.ALLOW
        }
    }

    res = {
        'type': 'push',
        'title': _('PUSH Token'),
        'description': _('PUSH: Send a push notification to a smartphone.'),
        'user': ['enroll'],
        # This tokentype is enrollable in the UI for...
        'ui_enroll': ["admin", "user"],
        'policy': {
            SCOPE.ENROLL: enroll_policies,
            SCOPE.AUTH: auth_policies
        },
    }

    if key:
        return res.get(key, {})
    if ret == 'all':
        return res
    return ret
@log_with(log)
def use_for_authentication(self, options: dict) -> bool:
    """
    Tell if the token is to be used for the authentication.

    :param options: the options dictionary (not evaluated by this token type)
    :return: the result of ``is_active()``
    """
    # A disabled PUSH token has to be removed from the list of checked tokens.
    token_is_active = self.is_active()
    return token_is_active
@log_with(log)
def update(self, param: dict, reset_failcount: bool = True) -> None:
    """
    Process the initialization parameters of the two enrollment steps.

    1. step:
        ``param`` contains:

        - ``type``
        - ``genkey``

        A random enrollment credential and the Firebase configuration
        are written to the tokeninfo.

    2. step:
        ``param`` contains:

        - ``serial``
        - ``fbtoken``
        - ``pubkey``
        - ``enrollment_credential``

        The token has to be in the rollout state ``clientwait`` and the
        enrollment credential has to match the stored one. The public key
        and the Firebase token of the smartphone are stored and the
        key pair of the server is created.

    :param param: dict of initialization parameters
    :type param: dict
    :param reset_failcount: passed on to ``TokenClass.update``
    :type reset_failcount: bool
    :raises ParameterError: if the parameters match neither of the steps, if the
        token is in the wrong rollout state or if the enrollment credential is wrong
    :return: nothing
    """
    # Work on a copy, since we might add parameters
    upd_param = {name: value for name, value in param.items()}

    second_step = all(name in upd_param for name in ("serial", "fbtoken", "pubkey"))
    if second_step:
        # We are in step 2:
        if self.token.rollout_state != RolloutState.CLIENTWAIT:
            raise ParameterError("Invalid state! The token you want to enroll is not in the state 'clientwait'.")
        enrollment_credential = get_required(upd_param, "enrollment_credential")
        stored_credential = self.get_tokeninfo("enrollment_credential")
        if enrollment_credential != stored_credential:
            raise ParameterError("Invalid enrollment credential. You are not authorized to finalize this token.")
        # The credential can only be used once
        self.delete_tokeninfo("enrollment_credential")
        self.token.rollout_state = "enrolled"
        self.token.active = True
        self.add_tokeninfo(PUBLIC_KEY_SMARTPHONE, upd_param["pubkey"])
        self.add_tokeninfo("firebase_token", upd_param["fbtoken"])
        # create a keypair for the server side.
        pub_key, priv_key = generate_keypair(4096)
        self.add_tokeninfo(PUBLIC_KEY_SERVER, pub_key)
        self.add_tokeninfo(PRIVATE_KEY_SERVER, priv_key, "password")
    elif "genkey" in upd_param:
        # We are in step 1:
        upd_param["2stepinit"] = 1
        self.add_tokeninfo("enrollment_credential", geturandom(20, hex=True))
        # We also store the Firebase config, that was used during the enrollment.
        firebase_config = param.get(PushAction.FIREBASE_CONFIG)
        self.add_tokeninfo(PushAction.FIREBASE_CONFIG, firebase_config)
    else:
        raise ParameterError("Invalid Parameters. Either provide (genkey) or (serial, fbtoken, pubkey).")

    TokenClass.update(self, upd_param, reset_failcount)
@log_with(log)
def get_init_detail(self, params: dict = None, user: User = None) -> dict:
    """
    Return the init details during the enrollment.

    In the rollout state ``clientwait`` (1st step) the URL for the QR code,
    its image and the enrollment credential are added to the response.
    In the rollout state ``enrolled`` (2nd step) the public key of the server
    is added to the response.

    :param params: The request parameters. The policy values are
        read from the entry ``policies``.
    :param user: The owner of the token
    :raises ParameterError: if the Firebase configuration is unknown
    :return: the dictionary with the init details
    """
    response_detail = TokenClass.get_init_detail(self, params, user)
    response_detail.pop("otpkey", None)

    if not params:
        params = {}
    policy_params = params.get("policies", {})
    if not user:
        user = User()
    tokenlabel = params.get("tokenlabel", "<s>")
    tokenissuer = params.get("tokenissuer", "privacyIDEA")
    sslverify = get_optional(policy_params, PushAction.SSL_VERIFY, default="1")
    if sslverify not in ("0", "1"):
        sslverify = "1"

    # Add rollout state the response
    rollout_state = self.token.rollout_state
    response_detail['rollout_state'] = rollout_state

    extra_data = {"enrollment_credential": self.get_tokeninfo("enrollment_credential")}
    imageurl = params.get("appimageurl")
    if imageurl:
        extra_data["image"] = imageurl

    if rollout_state == RolloutState.CLIENTWAIT:
        # Get enrollment values from the policy
        registration_url = get_required(policy_params, PushAction.REGISTRATION_URL)
        ttl = policy_params.get(PushAction.TTL, "10")
        # Get the values from the configured PUSH config
        fb_identifier = policy_params.get(PushAction.FIREBASE_CONFIG)
        poll_only = fb_identifier == POLL_ONLY
        if not poll_only:
            # If do not do poll_only, then we load all the Firebase configuration
            firebase_configs = get_smsgateway(identifier=fb_identifier, gwtype=GWTYPE)
            if len(firebase_configs) != 1:
                raise ParameterError("Unknown Firebase configuration!")

        # this allows to upgrade our crypto
        extra_data["v"] = 1
        extra_data["serial"] = self.get_serial()
        extra_data["sslverify"] = sslverify
        extra_data["poll_only"] = poll_only

        # enforce App pin
        if params.get(PolicyAction.FORCE_APP_PIN):
            extra_data["pin"] = True
        if params.get(PolicyAction.APP_FORCE_UNLOCK):
            extra_data["app_force_unlock"] = params.get(PolicyAction.APP_FORCE_UNLOCK)

        # Get scheme to use
        pia_scheme = policy_params.get(PushAction.USE_PIA_SCHEME, False)

        # We display this during the first enrollment step!
        qr_url = create_push_token_url(url=registration_url,
                                       user=user.login,
                                       realm=user.realm,
                                       serial=self.get_serial(),
                                       tokenlabel=tokenlabel,
                                       issuer=tokenissuer,
                                       user_obj=user,
                                       extra_data=extra_data,
                                       ttl=ttl,
                                       pia_scheme=pia_scheme)
        response_detail["pushurl"] = {"description": _("URL for privacyIDEA Push Token"),
                                      "value": qr_url,
                                      "img": create_img(qr_url)
                                      }
        response_detail["enrollment_credential"] = self.get_tokeninfo("enrollment_credential")
    elif rollout_state == RolloutState.ENROLLED:
        # in the second enrollment step we return the public key of the server to the smartphone.
        server_public_key = self.get_tokeninfo(PUBLIC_KEY_SERVER)
        response_detail["public_key"] = strip_pem_headers(server_public_key)

    return response_detail
@staticmethod
def _check_timestamp_in_range(timestamp: str, window: int) -> None:
    """
    Check if the timestamp is a valid timestamp and if it matches the time window.

    If the check fails a privacyIDEA error is thrown.

    :param timestamp: A timestamp in iso format, either with a timezone or UTC is assumed
    :type timestamp: str
    :param window: Time window in minutes. The timestamp must lie within the
                   range of -window to +window of the current time.
    :type window: int
    :raises PrivacyIDEAError: If the timestamp can not be parsed or if it lies
                              outside of the time window
    """
    try:
        ts = isoparse(timestamp)
    except (ValueError, TypeError) as err:
        log.debug(f'{traceback.format_exc()}')
        # Chain the parser error so that the original cause is not lost
        raise PrivacyIDEAError(f'Could not parse timestamp {timestamp}. ISO-Format required.') from err
    td = timedelta(minutes=window)
    # We don't know if the passed timestamp is timezone aware. If no
    # timezone is passed, we assume UTC
    if ts.tzinfo:
        # We consider the timezone of the given timestamp
        now = datetime.now(ts.tzinfo)
    else:
        # If a timestamp without timezone is given, we assume it is UTC
        now = datetime.now(timezone.utc)
        # We need to add the timezone UTC to the naive timestamp
        ts = ts.replace(tzinfo=timezone.utc)
    if not (now - td <= ts <= now + td):
        raise PrivacyIDEAError(f'Timestamp {timestamp} not in valid range.')


@classmethod
def _handle_enrollment_step2(cls, serial: str, request_data: dict) -> tuple[bool, dict]:
    """
    Perform the second enrollment step, which is triggered by the smartphone.

    The push token with the given serial, which must be in the rollout state
    ``clientwait``, is updated with the request data. If the first challenge
    of the token is a valid enrollment challenge, it is marked as answered.

    :param serial: The serial number of the token
    :param request_data: The parameters of the request
    :return: A tuple of ``True`` and the init details of the token
    :raises ResourceNotFoundError: If a ResourceNotFoundError occurs while the
                                   token is looked up and updated
    """
    log.debug("Do the 2nd step of the enrollment.")
    try:
        token = get_one_token(serial=serial, tokentype="push", rollout_state=RolloutState.CLIENTWAIT)
        token.update(request_data)
        # in case of validate/check enrollment
        challenges = get_challenges(serial=serial)
        if (challenges and challenges[0].is_valid()
                and challenges[0].get_session() == ChallengeSession.ENROLLMENT):
            challenges[0].set_otp_status(True)
            challenges[0].save()
    except ResourceNotFoundError:
        raise ResourceNotFoundError("No token with this serial number in the rollout state 'clientwait'.")
    init_detail_dict = request_data
    details = token.get_init_detail(init_detail_dict)
    return True, details


@classmethod
def _handle_auth_response(cls, serial: str, request_data: dict) -> tuple[bool, dict]:
    """
    Handle the signed response of the smartphone to an authentication challenge.

    The signature is checked against all challenges of the token. For a
    matching challenge the outcome depends on the request and on the mode
    stored in the challenge data: the challenge is marked as declined, marked
    as answered, rejected due to a wrong or missing presence answer, or - in
    mode code_to_phone - a display code is generated and stored in the
    challenge.

    A signature which can not be decoded is treated like a signature that
    does not match any challenge.

    :param serial: The serial number of the token
    :param request_data: The parameters of the request. ``signature`` is read
                         as well as the optional ``decline`` and ``presence_answer``.
    :return: A tuple of the result and the details. The details contain
             ``display_code`` and ``message`` in mode code_to_phone.
    """
    log.debug("Handling the authentication response from the smartphone.")
    signature = get_optional(request_data, "signature")
    decline = is_true(get_optional(request_data, "decline", default=False))
    presence_answer = get_optional(request_data, "presence_answer")

    token = get_one_token(serial=serial, tokentype="push")
    public_key = _load_public_key(token.get_tokeninfo(PUBLIC_KEY_SMARTPHONE))
    challenges = get_challenges(serial=serial)
    result = False
    details = {}
    if challenges:
        # The signature is identical for all challenges, so it is decoded only
        # once. A value that is not valid base32 can not match any challenge.
        try:
            signature_bytes = b32decode(signature)
        except (ValueError, TypeError) as err:
            # binascii.Error, raised for malformed base32, is a ValueError
            log.info(f"The signature of the authentication response for token {serial} "
                     f"could not be decoded: {err}")
            return result, details

        # There are valid challenges, so we check this signature
        for challenge in challenges:
            # Re-construct the signature data and then verify the signature
            sign_data = f"{challenge.challenge}|{serial}"
            if decline:
                sign_data += "|decline"
            if presence_answer:
                sign_data += f"|{presence_answer}"
            try:
                public_key.verify(signature_bytes,
                                  sign_data.encode("utf8"),
                                  padding.PKCS1v15(),
                                  hashes.SHA256())
                log.debug(f"Found matching challenge {challenge}.")
                result = True
                if decline:
                    challenge.set_session(ChallengeSession.DECLINED)
                else:
                    # Verify the presence_answer which is stored in the challenge data (json).
                    # Legacy format: the correct choice is stored as the last entry in the
                    # data (str) separated by a comma. Make sure that the presence_answer is
                    # given if it is set in the challenge so that a response with a valid
                    # signature but no presence_answer does not pass!
                    challenge_data = challenge.get_data()
                    if (isinstance(challenge_data, dict)
                            and challenge_data.get("mode") == PushMode.REQUIRE_PRESENCE
                            and presence_answer):
                        correct_answer = challenge_data.get("correct_answer")
                        if presence_answer != correct_answer:
                            log.debug(f"Challenge data ({challenge_data}) does not match "
                                      f"given presence answer ({presence_answer})!")
                            result = False
                        else:
                            # Presence answer matches, mark the challenge as answered
                            challenge.set_otp_status(True)
                    elif isinstance(challenge_data, str) and presence_answer:
                        # Legacy handling
                        if presence_answer != challenge_data.split(",").pop():
                            log.debug(f"Challenge data ({challenge_data}) does not match "
                                      f"given presence answer ({presence_answer})!")
                            result = False
                        else:
                            challenge.set_otp_status(True)
                    # Check if presence_answer is missing, but it is required
                    elif (isinstance(challenge_data, dict)
                          and challenge_data.get("mode") == PushMode.REQUIRE_PRESENCE
                          and not presence_answer):
                        log.warning("'push_require_presence' Policy is set but the presence answer "
                                    "is not present in the smartphone request!")
                        result = False
                    # Code to Phone: smartphone has confirmed, now generate a number to show and
                    # save it to the challenge.
                    elif (isinstance(challenge_data, dict)
                          and challenge_data.get("mode") == PushMode.CODE_TO_PHONE):
                        display_code = "".join([str(secrets.randbelow(10))
                                                for _ in range(CODE_TO_PHONE_DISPLAY_CODE_LENGTH)])
                        challenge_data["smartphone_confirmed"] = True
                        challenge_data["display_code"] = display_code
                        challenge.set_data(challenge_data)
                        # Do NOT mark otp_valid yet; the client still needs to send the display_code.
                        details["display_code"] = display_code
                        details["message"] = (request_data.get(PushAction.PUSH_CODE_TO_PHONE_MESSAGE)
                                              or str(DEFAULT_MOBILE_TEXT_CODE_TO_PHONE))
                    else:
                        challenge.set_otp_status(True)
                challenge.save()
            except InvalidSignature:
                # The response was signed for another challenge, try the next one
                log.debug(f"The signature does not match challenge {challenge}.")

    return result, details


@classmethod
def _handle_firebase_update(cls, serial: str, request_data: dict) -> tuple[bool, dict]:
    """
    Store a new Firebase token, which was sent by the smartphone, in the token.

    The request must contain a timestamp within the allowed time window and a
    valid signature over ``<new_fb_token>|<serial>|<timestamp>``.

    :param serial: The serial number of the token
    :param request_data: The parameters of the request
    :return: A tuple of ``True`` and an empty dictionary
    :raises PrivacyIDEAError: If the timestamp is not valid or if the
                              signature could not be verified
    """
    log.debug("Updating the firebase token of the smartphone.")
    timestamp = get_required(request_data, 'timestamp')
    signature = get_required(request_data, 'signature')
    # First, check if the timestamp is in the required span
    cls._check_timestamp_in_range(timestamp, UPDATE_FB_TOKEN_WINDOW)
    try:
        token = get_one_token(serial=serial, tokentype=cls.get_class_type())
        public_key = _load_public_key(token.get_tokeninfo(PUBLIC_KEY_SMARTPHONE))
        sign_data = "{new_fb_token}|{serial}|{timestamp}".format(**request_data)
        public_key.verify(b32decode(signature),
                          sign_data.encode("utf8"),
                          padding.PKCS1v15(),
                          hashes.SHA256())
        # If the timestamp and signature are valid, we update the token
        token.add_tokeninfo('firebase_token', request_data['new_fb_token'])
        return True, {}
    except (ResourceNotFoundError, ParameterError, TypeError,
            InvalidSignature, ConfigAdminError, BinasciiError) as e:
        # To avoid disclosing information, we always fail with an invalid
        # signature error, even if the token with the serial could not be found
        log.debug(f'{traceback.format_exc()}')
        log.info(f'The following error occurred during the signature check: "{e}"')
        raise PrivacyIDEAError('Could not verify signature!')


@classmethod
def _api_endpoint_post(cls, g, request_data: dict) -> tuple[bool, dict]:
    """
    Handle all POST requests to the api endpoint

    The parameters of the request decide which handler is called: the second
    enrollment step (``fbtoken`` and ``pubkey``), the authentication response
    (``signature`` without ``new_fb_token``) or the update of the Firebase
    token (``new_fb_token``, ``timestamp`` and ``signature``).

    :param g: The Flask context, it is not used by this method
    :param request_data: Dictionary containing the parameters of the request
    :type request_data: dict
    :returns: The result of handling the request and a dictionary containing
              the details of the request handling
    :rtype: (bool, dict)
    :raises ParameterError: If the parameters do not match any of the handlers
    """
    serial = get_required(request_data, "serial")
    if all(k in request_data for k in ("fbtoken", "pubkey")):
        return cls._handle_enrollment_step2(serial, request_data)
    elif "signature" in request_data and "new_fb_token" not in request_data:
        return cls._handle_auth_response(serial, request_data)
    elif all(k in request_data for k in ('new_fb_token', 'timestamp', 'signature')):
        return cls._handle_firebase_update(serial, request_data)
    else:
        raise ParameterError("Missing parameters!")


def _get_existing_challenge_data(self, transaction_id: str, push_mode: PushMode) -> dict | None:
    """
    Load challenge data either in the current format (json/dict) or in the legacy format, which was a
    string that was used for require_presence.

    :param transaction_id: The transaction id of the challenges to search
    :param push_mode: The mode the challenge data has to match
    :return: The data of the first matching challenge or None. Legacy data is
             converted to a dictionary with the keys ``mode``, ``options``
             and ``correct_answer``.
    """
    challenges = get_challenges(transaction_id=transaction_id)
    for c in challenges:
        c_data = c.get_data()
        if (isinstance(c_data, dict) and c_data.get("type", "") == "push"
                and c_data.get("mode") == push_mode):
            return c_data
        elif (isinstance(c_data, str) and c.serial.startswith("PIPU")
              and push_mode == PushMode.REQUIRE_PRESENCE):
            # Challenge data is string => legacy handling for presence
            split_presence_options = c_data.split(",")
            return {
                "mode": PushMode.REQUIRE_PRESENCE,
                "options": split_presence_options[:-1],
                "correct_answer": split_presence_options[-1]
            }
    return None


def _handle_presence_challenge(self, options: dict, transaction_id: str) -> tuple[dict, list, str]:
    """
    Determine the presence options and the correct answer for a challenge.

    If a push was already triggered in this transaction, the options of the
    existing challenge are reused. Otherwise, the options are drawn from the
    available presence options and one of them is chosen as the correct answer.

    :param options: The request context parameters / data
    :param transaction_id: The transaction id of the challenge
    :return: A tuple of the challenge data, the list of presence options and
             the correct presence option
    """
    current_presence_options = []
    correct_presence_option = ""
    if options.get("push_triggered"):
        c_data = self._get_existing_challenge_data(transaction_id, PushMode.REQUIRE_PRESENCE)
        if c_data:
            current_presence_options = c_data.get("options")
            correct_presence_option = c_data.get("correct_answer")

    if not current_presence_options:
        available_presence_options = _get_presence_options(options)
        num_option = int(get_action_values_from_options(
            SCOPE.AUTH, PushAction.PRESENCE_NUM_OPTIONS, options) or DEFAULT_NUMBER_OF_PRESENCE_OPTIONS)
        num_option = (num_option if num_option in ALLOWED_NUMBER_OF_OPTIONS
                      else DEFAULT_NUMBER_OF_PRESENCE_OPTIONS)
        if num_option > len(available_presence_options):
            log.warning(f"The required number of presence options exceeds "
                        f"the number of available presence options ({num_option} "
                        f"!= {len(available_presence_options)})")
            num_option = len(available_presence_options)
        # NOTE(review): the set of displayed options is drawn with ``random``,
        # only the correct answer is chosen with ``secrets``.
        current_presence_options = random.sample(available_presence_options, num_option)
        correct_presence_option = secrets.choice(current_presence_options)

    data = {
        "type": "push",
        "mode": PushMode.REQUIRE_PRESENCE,
        "options": current_presence_options,
        "correct_answer": correct_presence_option
    }
    return data, current_presence_options, correct_presence_option


@classmethod
def _api_endpoint_get(cls, g: Any, request_data: dict) -> list:
    """
    Handle all GET requests to the api endpoint.

    Currently, this is only used for polling.

    :param g: The Flask context
    :param request_data: Dictionary containing the parameters of the request
    :type request_data: dict
    :returns: The smartphone data of all challenges of the token, which are
              valid, not declined and not answered yet. The list is empty if
              there is no such challenge.
    :rtype: list
    :raises PolicyError: If polling is denied by policy
    :raises PrivacyIDEAError: If the timestamp is not valid or if the
                              signature could not be verified
    """
    # By default, we allow polling if the policy is not set.
    allow_polling = get_action_values_from_options(
        SCOPE.AUTH, PushAction.ALLOW_POLLING, options={'g': g}) or PushAllowPolling.ALLOW
    if allow_polling == PushAllowPolling.DENY:
        raise PolicyError('Polling not allowed!')
    serial = get_required(request_data, "serial")
    timestamp = get_required(request_data, 'timestamp')
    signature = get_required(request_data, 'signature')
    # first check if the timestamp is in the required span
    cls._check_timestamp_in_range(timestamp, POLL_TIME_WINDOW)
    # now check the signature
    # first get the token
    try:
        token = get_one_token(serial=serial, tokentype=cls.get_class_type())
        # If the push_allow_polling policy is set to "token" we also
        # need to check the POLLING_ALLOWED tokeninfo. If it evaluated
        # to 'False', polling is not allowed for this token. If the
        # tokeninfo value evaluates to 'True' or is not set at all,
        # polling is allowed for this token.
        if allow_polling == PushAllowPolling.TOKEN:
            if not is_true(token.get_tokeninfo(POLLING_ALLOWED, default='True')):
                log.debug(f'Polling not allowed for pushtoken {serial} due to tokeninfo.')
                raise PolicyError('Polling not allowed!')

        public_key = _load_public_key(token.get_tokeninfo(PUBLIC_KEY_SMARTPHONE))
        sign_data = "{serial}|{timestamp}".format(**request_data)
        public_key.verify(b32decode(signature),
                          sign_data.encode("utf8"),
                          padding.PKCS1v15(),
                          hashes.SHA256())
        # The signature was valid now check for an open challenge
        # we need the private server key to sign the smartphone data
        private_key = token.get_tokeninfo(PRIVATE_KEY_SERVER)
        # We need the registration URL for the challenge
        registration_url = get_action_values_from_options(SCOPE.ENROLL, PushAction.REGISTRATION_URL,
                                                          options={'g': g})
        if not registration_url:
            raise ResourceNotFoundError('There is no registration_url defined for the '
                                        f' pushtoken {serial}. You need to define a push_registration_url '
                                        'in an enrollment policy.')
        options = {"g": g}
        open_challenges = []
        db_challenges = get_challenges(serial=serial)
        for challenge in db_challenges:
            if challenge.get_session() == ChallengeSession.DECLINED:
                continue
            # check if the challenge is active and not already answered
            # (the first value is not named ``_`` to not shadow the gettext alias)
            _unused, answered = challenge.get_otp_status()
            if not answered and challenge.is_valid():
                # Check if the challenge has mode set to require_presence or code_to_phone. This will
                # change what we need to return to the smartphone.
                challenge_data = challenge.get_data()
                presence_options = None
                if isinstance(challenge_data, dict):
                    if challenge_data.get("mode") == PushMode.REQUIRE_PRESENCE:
                        presence_options = challenge_data.get("options")
                    elif challenge_data.get("mode") == PushMode.CODE_TO_PHONE:
                        if challenge_data.get("smartphone_confirmed"):
                            # Smartphone already confirmed this challenge, skip it
                            continue
                        # code_to_phone step 1: present as standard challenge for the smartphone
                        # to confirm. No display_code is sent.
                elif isinstance(challenge_data, str) and challenge_data:
                    # Legacy handling, when require_presence was a string of options with the
                    # correct one at the end
                    presence_options = challenge_data.split(",")[:-1]

                # then return the necessary smartphone data to answer the challenge
                smartphone_data = _build_smartphone_data(token, challenge.challenge, registration_url,
                                                         private_key, options, presence_options)
                open_challenges.append(smartphone_data)
        # return the challenges as a list in the result value
        result = open_challenges
    except (ResourceNotFoundError, ParameterError,
            InvalidSignature, ConfigAdminError, BinasciiError) as e:
        # to avoid disclosing information we always fail with an invalid
        # signature error even if the token with the serial could not be found
        log.debug(f'{traceback.format_exc()}')
        log.info(f'The following error occurred during the signature check: "{e}"')
        raise PrivacyIDEAError('Could not verify signature!')

    return result
@classmethod
def api_endpoint(cls, request: Any, g: Any) -> tuple[str, str]:
    """
    Entry point for the API endpoint ``/ttype/push``, which is defined in
    :doc:`../../api/ttype`

    The method returns a tuple of the string ``"json"`` and the prepared result.

    The endpoint serves several purposes:

    - The smartphone performs the 2nd enrollment step:

      .. sourcecode:: http

        POST /ttype/push HTTP/1.1
        Host: https://yourprivacyideaserver

        serial=<token serial>
        fbtoken=<Firebase token>
        pubkey=<public key>

    - The smartphone sends the signed response to the challenge during
      authentication:

      .. sourcecode:: http

        POST /ttype/push HTTP/1.1
        Host: https://yourprivacyideaserver

        serial=<token serial>
        signature=<signature over {challenge}|{serial}>

    - The smartphone declines the authentication request:

      .. sourcecode:: http

        POST /ttype/push HTTP/1.1
        Host: https://yourprivacyideaserver

        serial=<token serial>
        decline=1
        signature=<signature over {challenge}|{serial}|decline>

    - The smartphone announces a new Firebase token, since the Firebase service
      may change the token of a device
      (https://github.com/privacyidea/privacyidea/wiki/concept%3A-pushtoken-poll#update-firebase-token):

      .. sourcecode:: http

        POST /ttype/push HTTP/1.1
        Host: https://yourprivacyideaserver

        new_fb_token=<new Firebase token>
        serial=<token serial>
        timestamp=<timestamp>
        signature=SIGNATURE(<new_fb_token>|<serial>|<timestamp>)

    - The smartphone polls for challenges:

      .. sourcecode:: http

        GET /ttype/push HTTP/1.1
        Host: https://yourprivacyideaserver

        serial=<tokenserial>
        timestamp=<timestamp>
        signature=SIGNATURE(<tokenserial>|<timestamp>)

      More on polling can be found here:
      https://github.com/privacyidea/privacyidea/wiki/concept%3A-pushtoken-poll

    :param request: The Flask request
    :param g: The Flask global object g
    :return: The string ``"json"`` and the result of the request as it is
        returned by ``prepare_result``
    :raises PrivacyIDEAError: If the HTTP method is neither POST nor GET
    """
    if request.method == 'GET':
        # Polling does not provide any details
        details = {}
        result = cls._api_endpoint_get(g, request.all_data)
    elif request.method == 'POST':
        result, details = cls._api_endpoint_post(g, request.all_data)
    else:
        raise PrivacyIDEAError(f'Method {request.method} not allowed in \'api_endpoint\' for push token.')

    return "json", prepare_result(result, details=details)
@log_with(log, hide_args=[1])
def is_challenge_request(self, passw: str, user: User = None, options: dict = None) -> bool:
    """
    check, if the request would start a challenge

    We need to define the function again, to get rid of the
    is_challenge_request-decorator of the base class

    :param passw: password, which might be the pin or pin+otp
    :param user: the user object
    :param options: dictionary of additional request parameters
    :return: ``False`` if ``push_wait`` is set in the options, otherwise the
        result of the PIN check
    """
    # ``options`` defaults to None, so it must not be dereferenced unconditionally
    if options and options.get(PushAction.WAIT):
        # We have a push_wait in the parameters
        return False
    return self.check_pin(passw, user=user, options=options)
def create_challenge(self, transactionid: str = None, options: dict = None) -> tuple[bool, str, str, dict]:
    """
    This method creates a challenge, which is submitted to the user.
    The submitted challenge will be preserved in the challenge
    database.

    If no transaction id is given, the system will create a transaction
    id and return it, so that the response can refer to this transaction.

    :param transactionid: the id of this challenge
    :param options: the request context parameters / data
    :type options: dict
    :return: tuple of (bool, message, transactionid, attributes)
    :rtype: tuple
    :raises ValidateError: If the message could not be sent and ``exception``
        is set in the options

    The return tuple builds up like this:
    ``bool`` if submit was successful;
    ``message`` which is displayed in the JSON response;
    additional challenge ``reply_dict``, which is displayed in the JSON challenges response.
    """
    options = options or {}
    message = get_action_values_from_options(SCOPE.AUTH,
                                             PolicyAction.CHALLENGETEXT,
                                             options) or str(DEFAULT_CHALLENGE_TEXT)
    message = message.replace(r'\,', ',')

    # Determine if require presence or code to phone is enabled
    g = options.get("g")
    require_presence = Match.user(g, scope=SCOPE.AUTH, action=PushAction.REQUIRE_PRESENCE,
                                  user_object=options.get("user")).any()
    code_to_phone_enabled = Match.user(g, scope=SCOPE.AUTH, action=PushAction.PUSH_CODE_TO_PHONE,
                                       user_object=options.get("user")).any()

    data = {"type": "push", "mode": PushMode.STANDARD}
    current_presence_options = None
    reply_dict = {}
    client_mode = self.client_mode
    push_wait = options.get(PushAction.WAIT)

    # Handle require_presence or code_to_phone if enabled
    if is_true(require_presence):
        if push_wait:
            log.warning("Unable to use 'require_presence' policy with 'push_wait'. "
                        "Disabling 'require_presence' policy!")
        else:
            data, current_presence_options, correct_presence_option = self._handle_presence_challenge(
                options, transactionid)
            reply_dict["presence_answer"] = correct_presence_option
    elif is_true(code_to_phone_enabled) and not push_wait:
        existing_data = None
        if options.get("push_triggered"):
            existing_data = self._get_existing_challenge_data(transactionid, PushMode.CODE_TO_PHONE)
        # The lookup returns None if the transaction has no matching challenge.
        # Storing None would drop the mode of the challenge, so we fall back
        # to fresh code_to_phone data.
        data = existing_data or {
            "type": "push",
            "mode": PushMode.CODE_TO_PHONE,
            "smartphone_confirmed": False,
        }
        message = _("Please enter the code displayed on your smartphone.")
        client_mode = ClientMode.INTERACTIVE

    # Initially we assume there is no error from Firebase
    res = True
    fb_identifier = self.get_tokeninfo(PushAction.FIREBASE_CONFIG)
    if fb_identifier:
        challenge = b32encode_and_unicode(geturandom())
        if options.get("session") != ChallengeSession.ENROLLMENT:
            if fb_identifier != POLL_ONLY:
                # We only push to Firebase if this token is NOT POLL_ONLY.
                fb_gateway = create_sms_instance(fb_identifier)
                registration_url = get_action_values_from_options(
                    SCOPE.ENROLL, PushAction.REGISTRATION_URL, options=options)
                private_key_pem = self.get_tokeninfo(PRIVATE_KEY_SERVER)
                smartphone_data = _build_smartphone_data(self, challenge, registration_url,
                                                         private_key_pem, options,
                                                         current_presence_options)
                log.debug(f"Sending to firebase the smartphone_data: {smartphone_data}")
                res = fb_gateway.submit_message(self.get_tokeninfo("firebase_token"), smartphone_data)

        # Create the challenge in the challenge table if either the message
        # was successfully submitted to the Firebase API or if polling is
        # allowed in general or for this specific token.
        allow_polling = get_action_values_from_options(
            SCOPE.AUTH, PushAction.ALLOW_POLLING, options=options) or PushAllowPolling.ALLOW
        if ((allow_polling == PushAllowPolling.ALLOW
             or (allow_polling == PushAllowPolling.TOKEN
                 and is_true(self.get_tokeninfo(POLLING_ALLOWED, default='True'))))
                or res):
            validity = int(get_from_config('DefaultChallengeValidityTime', 120))
            tokentype = self.get_tokentype().lower()
            # Maybe there is a PushChallengeValidityTime...
            lookup_for = tokentype.capitalize() + 'ChallengeValidityTime'
            validity = int(get_from_config(lookup_for, validity))

            # Create the challenge in the database
            db_challenge = Challenge(self.token.serial,
                                     transaction_id=transactionid,
                                     challenge=challenge,
                                     data=data,
                                     session=options.get("session"),
                                     validitytime=validity)
            db_challenge.save()
            self.challenge_janitor()
            transactionid = db_challenge.transaction_id

        # If sending the Push message failed, we log a warning
        if not res:
            log.warning(f"Failed to submit message to Firebase service for token {self.token.serial}.")
            message += " " + ERROR_CHALLENGE_TEXT
            if is_true(options.get("exception")):
                raise ValidateError("Failed to submit message to Firebase service.")
    else:
        log.warning(f"The token {self.token.serial} has no tokeninfo {PushAction.FIREBASE_CONFIG}. "
                    f"The message could not be sent.")
        message += " " + ERROR_CHALLENGE_TEXT
        if is_true(options.get("exception")):
            raise ValidateError("The token has no tokeninfo. Can not send via Firebase service.")

    reply_dict.update({"attributes": {"hideResponseInput": client_mode != ClientMode.INTERACTIVE},
                       "client_mode": client_mode})
    return True, message, transactionid, reply_dict
@check_token_locked
def authenticate(self, passw: str, user: User = None, options: dict = None) -> tuple[bool, int, dict | None]:
    """
    High level interface which covers the check_pin and check_otp
    This is the method that verifies single shot authentication.
    The challenge is sent to the smartphone app and privacyIDEA
    waits for the response to arrive.

    :param passw: the password which could be pin+otp value
    :type passw: string
    :param user: The authenticating user
    :type user: User object
    :param options: dictionary of additional request parameters
    :type options: dict

    :return: returns tuple of

      1. true or false for the pin match,
      2. the otpcounter (int) and the
      3. reply (dict) that will be added as additional information in the
         JSON response of ``/validate/check``.

    :rtype: tuple
    """
    # ``options`` defaults to None but is read below, so normalise it once
    if options is None:
        options = {}
    otp_counter = -1
    reply = None
    pin_match = self.check_pin(passw, user=user, options=options)

    # Check policies
    g = options.get("g")
    require_presence = Match.user(g, scope=SCOPE.AUTH, action=PushAction.REQUIRE_PRESENCE,
                                  user_object=user).any()
    code_to_phone = Match.user(g, scope=SCOPE.AUTH, action=PushAction.PUSH_CODE_TO_PHONE,
                               user_object=user).any()
    code_to_phone_enabled = (is_true(code_to_phone) and not is_true(require_presence)
                             and not options.get(PushAction.WAIT, False))

    if pin_match:
        if not options.get("valid_token_num"):
            # We should only do push_wait, if we do not already have successfully authenticated tokens!
            waiting = int(options.get(PushAction.WAIT, 20))
            # Trigger the challenge
            _t, message, transaction_id, _attr = self.create_challenge(options=options)

            if code_to_phone_enabled:
                # code_to_phone: return challenge immediately, no waiting.
                # The client_mode is INTERACTIVE so the client shows an input field.
                # The user will enter the display_code after the smartphone confirms.
                return True, -1, {"transaction_id": transaction_id, "message": message}

            # Standard / require_presence: wait for the challenge to be answered
            start_time = time.time()
            while True:
                db.session.commit()
                otp_counter = self.check_challenge_response(options={"transaction_id": transaction_id})
                elapsed_time = time.time() - start_time
                # A negative elapsed time also ends the loop
                if otp_counter >= 0 or elapsed_time > waiting or elapsed_time < 0:
                    break
                time.sleep(POLL_INTERVAL - (elapsed_time % POLL_INTERVAL))
    elif code_to_phone_enabled and options.get("transaction_id"):
        # Step 2 of code_to_phone: the user submits the display_code shown after
        # the smartphone confirmed. Delegate entirely to check_challenge_response,
        # which enforces transaction_id binding and increments the failcount on
        # wrong codes.
        otp_counter = self.check_challenge_response(passw=passw, options=options)
        if otp_counter >= 0:
            return True, otp_counter, {}

    return pin_match, otp_counter, reply
@check_token_locked
def check_challenge_response(self, user: User = None, passw: str = None, options: dict = None) -> int:
    """
    Check if the challenge of the given transaction was answered successfully.

    For the modes standard and require_presence the challenge is answered by
    the smartphone. The ``passw`` is not evaluated then, only the status of the
    challenge is checked.

    The mode code_to_phone is a 2-step process:

    1. The smartphone confirms the challenge (``smartphone_confirmed`` is set
       in the challenge data).
    2. With the confirmation a ``display_code`` is stored in the challenge
       data, which the client has to send via /validate/check. The
       display_code is only used for synchronization, the security lies in
       the smartphone confirmation.

    :param user: the requesting user
    :type user: User object
    :param passw: the password (pin+otp)
    :type passw: string
    :param options: additional arguments from the request, which could be token specific.
        The transaction is read from ``transaction_id`` or, as fallback, from ``state``.
    :type options: dict
    :return: 1 if the challenge was answered successfully, otherwise -1
    :rtype: int
    """
    request_options = options or {}
    transaction_id = request_options.get('transaction_id')
    if transaction_id is None:
        transaction_id = request_options.get('state')
    if transaction_id is None:
        # Without a transaction there is nothing to check
        return -1

    for challenge in get_challenges(serial=self.token.serial, transaction_id=transaction_id):
        # Expired challenges are ignored
        if not challenge.is_valid():
            continue

        data = challenge.get_data()
        if not (isinstance(data, dict) and data.get("mode") == PushMode.CODE_TO_PHONE):
            _unused, answered = challenge.get_otp_status()
            if answered is True:
                # Positive response. The challenge is not deleted here, that is
                # the job of the calling context.
                return 1
            continue

        if not data.get("smartphone_confirmed"):
            # Step 1 not completed yet; smartphone has not confirmed.
            log.debug("code_to_phone: waiting for smartphone confirmation.")
            return -1

        # Step 2: the smartphone has confirmed, compare the display code
        expected_code = data.get("display_code", "")
        if expected_code and expected_code == passw:
            return 1
        if passw is not None:
            log.debug("Received the wrong display code for push code_to_phone!")
            self.inc_failcount()
        # Either a wrong code or no passw at all (e.g. during polling)
        return -1

    return -1
@classmethod
def enroll_via_validate(cls, g: Any, content: dict, user_obj: User, message: str = None) -> None:
    """
    Enroll a new token of this type during authentication.

    This class method is used in the policy ENROLL_VIA_MULTICHALLENGE. The
    information, which the client requires for the enrollment, is written
    into the given content.

    :param g: context object
    :param content: The content of a response
    :param user_obj: A user object
    :param message: An alternative message displayed to the user during enrollment
    :return: None, the content is modified
    """
    token_type = cls.get_class_type()
    # The firebase configuration is taken from the policies
    push_params = get_pushtoken_add_config(g, user_obj=user_obj)
    token = init_token({"type": token_type, "genkey": 1, "2stepinit": 1}, user=user_obj)

    # First enrollment step: create the enrollment credential and remember
    # the Firebase config, which is used for this enrollment
    token.add_tokeninfo("enrollment_credential", geturandom(20, hex=True))
    token.add_tokeninfo(PushAction.FIREBASE_CONFIG, push_params.get(PushAction.FIREBASE_CONFIG))

    result = content.get("result")
    result["value"] = False
    result["authentication"] = "CHALLENGE"
    detail = content.setdefault("detail", {})

    # The challenge provides the transaction id for the enrollment
    challenge_options = {"g": g, "user": user_obj, "session": ChallengeSession.ENROLLMENT}
    transaction_id = token.create_challenge(options=challenge_options)[2]

    # Get the init details of the token
    params = get_init_tokenlabel_parameters(g, user_object=user_obj, token_type=cls.get_class_type())
    params["policies"] = push_params
    push_url = token.get_init_detail(params=params, user=user_obj).get("pushurl", {})

    detail["transaction_ids"] = [transaction_id]
    challenge_dict = {
        "transaction_id": transaction_id,
        "image": push_url.get("img"),
        "link": push_url.get("value"),
        "client_mode": ClientMode.POLL,
        "serial": token.token.serial,
        "type": token.type,
        "message": message or _("Please scan the QR code!"),
    }
    detail["multi_challenge"] = [challenge_dict]
    detail.update(challenge_dict)
@classmethod
def is_multichallenge_enrollable(cls) -> bool:
    """
    Tell whether this token type can be enrolled via multichallenge.

    :return: Always ``True`` for this token type
    """
    return True
def get_enroll_url(self, user: User, params: dict) -> str:
    """
    Return the URL to enroll this token. It is not supported by all token types.

    The URL is the ``value`` of the ``pushurl`` entry of the init details.

    :param user: The user object
    :param params: Further parameters
    :return: The URL containing all required information to enroll the token
    """
    push_url = self.get_init_detail(params, user).get("pushurl")
    return push_url.get("value")