# SPDX-FileCopyrightText: 2026 NetKnights GmbH <https://netknights.it>
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# contact: http://www.privacyidea.org
#
# 2019-02-08 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Start the pushtoken class
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
The pushtoken sends a push notification via Firebase service to the registered smartphone.
The token is a challenge response token. The smartphone will sign the challenge
and send it back to the authentication endpoint.
This code is tested in tests/test_lib_tokens_push
"""
import logging
import random
import secrets
import string
import time
import traceback
from base64 import b32decode
from binascii import Error as BinasciiError
from datetime import datetime, timedelta, timezone
from typing import Any
from urllib.parse import quote
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from dateutil.parser import isoparse
from privacyidea.api.lib.policyhelper import get_pushtoken_add_config, get_init_tokenlabel_parameters
from privacyidea.lib.params import get_optional, get_required
from privacyidea.lib import _, lazy_gettext
from privacyidea.lib.apps import _construct_extra_parameters
from privacyidea.lib.challenge import get_challenges
from privacyidea.lib.config import get_from_config
from privacyidea.lib.crypto import geturandom, generate_keypair
from privacyidea.lib.decorators import check_token_locked
from privacyidea.lib.error import ParameterError
from privacyidea.lib.error import (ResourceNotFoundError, ValidateError,
PrivacyIDEAError, ConfigAdminError, PolicyError)
from privacyidea.lib.log import log_with
from privacyidea.lib.policies.actions import PolicyAction
from privacyidea.lib.policy import (SCOPE, GROUP, Match,
get_action_values_from_options)
from privacyidea.lib.smsprovider.SMSProvider import get_smsgateway, create_sms_instance
from privacyidea.lib.token import get_one_token, init_token
from privacyidea.lib.tokenclass import (TokenClass, AuthenticationMode, ClientMode,
RolloutState, ChallengeSession)
from privacyidea.lib.tokens.push_types import (PushMode, PushPresenceOptions,
PushAction, PushAllowPolling,
CODE_TO_PHONE_DISPLAY_CODE_LENGTH)
from privacyidea.lib.user import User
from privacyidea.lib.utils import create_img, b32encode_and_unicode
from privacyidea.lib.utils import prepare_result, to_bytes, is_true, create_tag_dict
from privacyidea.models import Challenge, Token, db
log = logging.getLogger(__name__)
DEFAULT_CHALLENGE_TEXT = lazy_gettext("Please confirm the authentication on your mobile device!")
ERROR_CHALLENGE_TEXT = lazy_gettext("Use the polling feature of your privacyIDEA Authenticator App"
" to check for a new Login request.")
DEFAULT_MOBILE_TEXT = lazy_gettext("Do you want to confirm the login?")
DEFAULT_MOBILE_TEXT_CODE_TO_PHONE = lazy_gettext("Enter the code to log in")
PRIVATE_KEY_SERVER = "private_key_server"
PUBLIC_KEY_SERVER = "public_key_server"
PUBLIC_KEY_SMARTPHONE = "public_key_smartphone"
POLLING_ALLOWED = "polling_allowed"
GWTYPE = 'privacyidea.lib.smsprovider.FirebaseProvider.FirebaseProvider'
POLL_INTERVAL = 1.0
# Timedelta in minutes
POLL_TIME_WINDOW = 1
UPDATE_FB_TOKEN_WINDOW = 5
POLL_ONLY = "poll only"
AVAILABLE_PRESENCE_OPTIONS_ALPHABETIC = list(string.ascii_uppercase)
AVAILABLE_PRESENCE_OPTIONS_NUMERIC = [f'{x:02}' for x in range(100)]
ALLOWED_NUMBER_OF_OPTIONS = list(range(2, 11))
DEFAULT_NUMBER_OF_PRESENCE_OPTIONS = 3
def strip_pem_headers(key: str) -> str:
"""
strip the headers and footers like
-----BEGIN PUBLIC RSA KEY-----
-----END PUBLIC KEY-----
-----BEGIN PRIVATE RSA KEY-----
as well as whitespace
:param key: key as a string
:return: stripped key
"""
if key.startswith("-----BEGIN"):
return "\n".join(key.strip().splitlines()[1:-1]).strip()
else:
return key.strip()
@log_with(log)
def create_push_token_url(url: str | None = None, ttl: int | str = 10, issuer: str = "privacyIDEA",
serial: str = "mylabel", tokenlabel: str = "<s>", user_obj: User | None = None,
extra_data: dict | None = None, user: str | None = None, realm: str | None = None,
pia_scheme: bool = False) -> str:
"""
:param url:
:param ttl:
:param issuer:
:param serial:
:param tokenlabel:
:param user_obj:
:param extra_data:
:param user:
:param realm:
:param pia_scheme: Use the privacyIDEA App URL scheme "pia"
:return:
"""
extra_data = extra_data or {}
# policy depends on some lib.util
user_obj = user_obj or User()
# We need realm und user to be a string
realm = realm or ""
user = user or ""
# Deprecated
label = tokenlabel.replace("<s>",
serial).replace("<u>",
user).replace("<r>", realm)
user_info = user_obj.get_specific_info(["givenname", "surname"])
label = label.format(serial=serial, user=user, realm=realm,
givenname=user_info.get("givenname", ""),
surname=user_info.get("surname", ""))
issuer = issuer.format(serial=serial, user=user, realm=realm,
givenname=user_info.get("givenname", ""),
surname=user_info.get("surname", ""))
url_label = quote(label.encode("utf-8"))
url_issuer = quote(issuer.encode("utf-8"))
url_url = quote(url.encode("utf-8"))
scheme = "pia" if pia_scheme else "otpauth"
token_url = (f"{scheme}://pipush/{url_label}?url={url_url}&ttl={ttl}&issuer={url_issuer}"
f"{_construct_extra_parameters(extra_data)}")
return token_url
def _get_presence_options(options) -> list:
"""
Get the available presence options for the user to confirm the login based on the push policy configurations.
:param options: the request context parameters / data
:type options: dict
:return: The list of available presence characters/numbers
"""
try:
push_presence_option = PushPresenceOptions(get_action_values_from_options(
SCOPE.AUTH, PushAction.PRESENCE_OPTIONS, options))
except ValueError:
push_presence_option = PushPresenceOptions.ALPHABETIC
if push_presence_option == PushPresenceOptions.NUMERIC:
available_presence_options = list(AVAILABLE_PRESENCE_OPTIONS_NUMERIC)
elif push_presence_option == PushPresenceOptions.CUSTOM:
custom_presence_options = get_action_values_from_options(
SCOPE.AUTH, PushAction.PRESENCE_CUSTOM_OPTIONS, options)
available_presence_options = custom_presence_options.split(":")
# Default push_presence_option is "ALPHABETIC":
else:
available_presence_options = list(AVAILABLE_PRESENCE_OPTIONS_ALPHABETIC)
return available_presence_options
def _build_smartphone_data(token: TokenClass, challenge: str, registration_url: str, private_key_pem: str,
options: dict, presence_options: list = None) -> dict:
"""
Create the dictionary to be sent to the smartphone as challenge
:param token: The token object for which to create the smartphone data
:type token: A tokenclass object
:param challenge: base32 encoded random data string
:type challenge: str
:param registration_url: The privacyIDEA URL, to which the Push token communicates
:type registration_url: str
:param options: the options dictionary
:type options: dict
:param presence_options: Require the user to confirm with the correct button from the list of options.
:type presence_options: list
:return: the created smartphone_data dictionary
:rtype: dict
"""
sslverify = get_action_values_from_options(SCOPE.AUTH, PushAction.SSL_VERIFY,
options) or "1"
if sslverify not in ["0", "1"]:
sslverify = "1"
default_message = str(DEFAULT_MOBILE_TEXT)
message_on_mobile = get_action_values_from_options(SCOPE.AUTH,
PushAction.MOBILE_TEXT,
options) or default_message
# Get the request object
_g = options.get("g", {})
req_headers = None
request = None
if hasattr(_g, "request_headers"):
req_headers = _g.request_headers
if req_headers:
req_environment = req_headers.environ
request = req_environment.get("werkzeug.request")
if request:
user = request.User
else:
# Get owner from token object
try:
user = token.user
except Exception:
user = None
tags = create_tag_dict(serial=token.token.serial,
request=request,
client_ip=options.get("clientip"),
tokenowner=user,
tokentype="push",
recipient={"givenname": user.info.get("givenname") if user else "",
"surname": user.info.get("surname") if user else ""},
challenge=options.get("challenge"))
try:
message_on_mobile = message_on_mobile.format(**tags)
except KeyError as e:
log.warning(f"Could not format the message: {e}. Using default message.")
message_on_mobile = default_message
log.debug(f"Sending to mobile: {message_on_mobile}")
title = get_action_values_from_options(SCOPE.AUTH, PushAction.MOBILE_TITLE, options) or "privacyIDEA"
smartphone_data = {
"nonce": challenge,
"question": message_on_mobile,
"serial": token.token.serial,
"title": title,
"sslverify": sslverify,
"url": registration_url
}
# Create the signature.
sign_string = "{nonce}|{url}|{serial}|{question}|{title}|{sslverify}"
if presence_options is not None:
smartphone_data["require_presence"] = ",".join(presence_options)
smartphone_data["version"] = "2"
sign_string += f"|{smartphone_data['require_presence']}"
sign_string = sign_string.format(**smartphone_data)
# Since the private key is generated by privacyIDEA and only stored
# encrypted in the database, we can disable the costly key check here
private_key = serialization.load_pem_private_key(to_bytes(private_key_pem),
None, default_backend(),
unsafe_skip_rsa_key_validation=True)
# Sign the data with PKCS1 padding. Not all Androids support PSS padding.
signature = private_key.sign(sign_string.encode("utf8"),
padding.PKCS1v15(),
hashes.SHA256())
smartphone_data["signature"] = b32encode_and_unicode(signature)
return smartphone_data
def _load_public_key(pubkey_pem: str) -> Any:
"""
Load the given stripped and urlsafe public key and return the verify object
:param pubkey_pem:
:return:
"""
# The public key of the smartphone was probably sent as urlsafe:
pubkey_pem = pubkey_pem.replace("-", "+").replace("_", "/")
# The public key was sent without any header
pubkey_pem = "-----BEGIN PUBLIC KEY-----\n" \
f"{pubkey_pem.strip().replace(' ', '+')}\n" \
"-----END PUBLIC KEY-----"
return serialization.load_pem_public_key(to_bytes(pubkey_pem), default_backend())
[docs]
class PushTokenClass(TokenClass):
"""
The :ref:`push_token` uses the Firebase service to send challenges to the
user's smartphone. The user confirms on the smartphone, signs the
challenge and sends it back to privacyIDEA.
The enrollment occurs in two enrollment steps:
**Step 1**:
The device is enrolled using a QR code, which encodes the following URI::
otpauth://pipush/PIPU0006EF85?url=https://yourprivacyideaserver/enroll/this/token&ttl=120
**Step 2**:
In the QR code is a URL, where the smartphone sends the remaining data for the enrollment:
.. sourcecode:: http
POST /ttype/push HTTP/1.1
Host: https://yourprivacyideaserver/
enrollment_credential=<hex nonce>
serial=<token serial>
fbtoken=<Firebase token>
pubkey=<public key>
For more information see:
- https://github.com/privacyidea/privacyidea/issues/1342
- https://github.com/privacyidea/privacyidea/wiki/concept%3A-PushToken
"""
mode = [AuthenticationMode.AUTHENTICATE, AuthenticationMode.CHALLENGE, AuthenticationMode.OUTOFBAND]
client_mode = ClientMode.POLL
def __init__(self, db_token: Token):
TokenClass.__init__(self, db_token)
self.set_type("push")
self.hKeyRequired = False
[docs]
@staticmethod
def get_class_type() -> str:
"""
return the generic token class identifier
"""
return "push"
[docs]
@staticmethod
def get_class_prefix() -> str:
return "PIPU"
[docs]
@staticmethod
def get_class_info(key: str = None, ret: str = 'all') -> Any:
"""
returns all or a subtree of the token definition
:param key: subsection identifier
:type key: str
:param ret: default return value, if nothing is found
:type ret: user defined
:return: subsection if key exists or user defined
:rtype: dict
"""
gws = get_smsgateway(gwtype=GWTYPE)
res = {'type': 'push',
'title': _('PUSH Token'),
'description':
_('PUSH: Send a push notification to a smartphone.'),
'user': ['enroll'],
# This tokentype is enrollable in the UI for...
'ui_enroll': ["admin", "user"],
'policy':
{
SCOPE.ENROLL: {
PushAction.FIREBASE_CONFIG: {
'type': 'str',
'desc': _('The configuration of your Firebase application.'),
'group': "PUSH",
'value': [POLL_ONLY] + [gw.identifier for gw in gws]
},
PushAction.REGISTRATION_URL: {
"required": True,
'type': 'str',
'group': "PUSH",
'desc': _('The URL the Push App should contact in the second enrollment step.'
' Usually it is the endpoint /ttype/push of the privacyIDEA server.')
},
PushAction.TTL: {
'type': 'int',
'group': "PUSH",
'desc': _('The second enrollment step must be completed within this time (in minutes).')
},
PushAction.SSL_VERIFY: {
'type': 'str',
'desc': _('The smartphone needs to verify SSL during the enrollment. (default 1)'),
'group': "PUSH",
'value': ["0", "1"]
},
PolicyAction.MAXTOKENUSER: {
'type': 'int',
'desc': _("The user may only have this maximum number of Push tokens assigned."),
'group': GROUP.TOKEN
},
PolicyAction.MAXACTIVETOKENUSER: {
'type': 'int',
'desc': _("The user may only have this maximum number of active Push tokens assigned."),
'group': GROUP.TOKEN
},
'push_' + PolicyAction.FORCE_APP_PIN: {
'type': 'bool',
'group': "PUSH",
'desc': _('Require to unlock the Smartphone before Push requests can be accepted')
},
'push_' + PolicyAction.APP_FORCE_UNLOCK: {
'type': 'str',
'value': ["any",
"biometric",
"pin"],
'desc': _('Enforces the privacyIDEA Authenticator App that the token has to '
'be unlocked with pin or biometric. This needs the privacyIDEA '
'Authenticator app 4.6.1 or higher.')
},
PushAction.USE_PIA_SCHEME: {
'type': 'bool',
'desc': _("Use the privacyIDEA app URL scheme 'pia' in the enroll URL for push tokens "
"to open the privacyIDEA app."),
'group': "PUSH",
}
},
SCOPE.AUTH: {
PushAction.MOBILE_TEXT: {
'type': 'str',
'desc': _(
'The question the user sees on his mobile phone. Several tags like {serial} and '
'{client_ip} can be used as parameters.'),
'group': 'PUSH'
},
PushAction.MOBILE_TITLE: {
'type': 'str',
'desc': _('The title of the notification, the user sees on his mobile phone.'),
'group': 'PUSH'
},
PushAction.SSL_VERIFY: {
'type': 'str',
'desc': _('The smartphone needs to verify SSL during authentication. (default 1)'),
'group': "PUSH",
'value': ["0", "1"]
},
PushAction.REQUIRE_PRESENCE: {
'type': 'bool',
'desc': _('Require the user to confirm the login with a presence check.'),
'group': 'PUSH'
},
PushAction.PUSH_CODE_TO_PHONE: {
'type': 'bool',
'desc': _('Require the user to confirm the login with an OTP that is received in the'
' smartphone app. push_require_presence has precedence over this. '
'It is not compatible with push_wait.'),
'group': 'PUSH'
},
PushAction.PUSH_CODE_TO_PHONE_MESSAGE: {
'type': 'str',
'desc': _('The message that is shown above the code on the smartphone.'),
'group': 'PUSH'
},
PushAction.PRESENCE_OPTIONS: {
'type': 'str',
'desc': _('The options that can be presented to the user to confirm the login. '
'<code>ALPHABETIC</code>: A-Z, <code>NUMERIC</code>: 01-99, '
'<code>CUSTOM</code>: user defined. '
'Does only apply if <em>{0!s}</em> is set.').format(
PushAction.REQUIRE_PRESENCE),
'group': 'PUSH',
'value': [x for x in PushPresenceOptions.__members__],
},
PushAction.PRESENCE_CUSTOM_OPTIONS: {
'type': 'str',
'desc': _('Custom options that can be presented to the user to confirm the login. '
'The string must contain at least 2 options and should be unique. '
'The options are separated by <code>:</code>. '
'e.g.: <code>01:02:03:1A:1B:1C</code>. '
'Does only apply if <em>{0!s}</em> is set '
'to <code>CUSTOM</code>.').format(PushAction.PRESENCE_OPTIONS),
'group': 'PUSH'
},
PushAction.PRESENCE_NUM_OPTIONS: {
'type': 'int',
'desc': _('The number of options the user is presented with to confirm the login. '
'Does only apply if <em>{0!s}</em> is set.').format(
PushAction.REQUIRE_PRESENCE),
'group': 'PUSH',
'value': ALLOWED_NUMBER_OF_OPTIONS
},
PushAction.WAIT: {
'type': 'int',
'desc': _('Wait for number of seconds for the user to confirm the challenge in the '
'first request.'),
'group': 'PUSH'
},
PushAction.ALLOW_POLLING: {
'type': 'str',
'desc': _('Configure whether to allow push tokens to poll for challenges'),
'group': 'PUSH',
'value': [PushAllowPolling.ALLOW,
PushAllowPolling.DENY,
PushAllowPolling.TOKEN],
'default': PushAllowPolling.ALLOW
}
}
},
}
if key:
ret = res.get(key, {})
else:
if ret == 'all':
ret = res
return ret
[docs]
@log_with(log)
def use_for_authentication(self, options: dict) -> bool:
# A disabled PUSH token has to be removed from the list of checked tokens.
return self.is_active()
[docs]
@log_with(log)
def update(self, param: dict, reset_failcount: bool = True) -> None:
"""
process the initialization parameters
We need to distinguish the first authentication step
and the second authentication step.
1. step:
``param`` contains:
- ``type``
- ``genkey``
2. step:
``param`` contains:
- ``serial``
- ``fbtoken``
- ``pubkey``
:param param: dict of initialization parameters
:type param: dict
:return: nothing
"""
upd_param = {}
for k, v in param.items():
upd_param[k] = v
if "serial" in upd_param and "fbtoken" in upd_param and "pubkey" in upd_param:
# We are in step 2:
if self.token.rollout_state != RolloutState.CLIENTWAIT:
raise ParameterError("Invalid state! The token you want to enroll is not in the state 'clientwait'.")
enrollment_credential = get_required(upd_param, "enrollment_credential")
if enrollment_credential != self.get_tokeninfo("enrollment_credential"):
raise ParameterError("Invalid enrollment credential. You are not authorized to finalize this token.")
self.delete_tokeninfo("enrollment_credential")
self.token.rollout_state = "enrolled"
self.token.active = True
self.add_tokeninfo(PUBLIC_KEY_SMARTPHONE, upd_param.get("pubkey"))
self.add_tokeninfo("firebase_token", upd_param.get("fbtoken"))
# create a keypair for the server side.
pub_key, priv_key = generate_keypair(4096)
self.add_tokeninfo(PUBLIC_KEY_SERVER, pub_key)
self.add_tokeninfo(PRIVATE_KEY_SERVER, priv_key, "password")
elif "genkey" in upd_param:
# We are in step 1:
upd_param["2stepinit"] = 1
self.add_tokeninfo("enrollment_credential", geturandom(20, hex=True))
# We also store the Firebase config, that was used during the enrollment.
self.add_tokeninfo(PushAction.FIREBASE_CONFIG, param.get(PushAction.FIREBASE_CONFIG))
else:
raise ParameterError("Invalid Parameters. Either provide (genkey) or (serial, fbtoken, pubkey).")
TokenClass.update(self, upd_param, reset_failcount)
[docs]
@log_with(log)
def get_init_detail(self, params: dict = None, user: User = None) -> dict:
"""
This returns the init details during enrollment.
In the 1st step the QR Code is returned.
"""
response_detail = TokenClass.get_init_detail(self, params, user)
if "otpkey" in response_detail:
del response_detail["otpkey"]
params = params or {}
policy_params = params.get("policies", {})
user = user or User()
tokenlabel = params.get("tokenlabel", "<s>")
tokenissuer = params.get("tokenissuer", "privacyIDEA")
sslverify = get_optional(policy_params, PushAction.SSL_VERIFY, default="1")
if sslverify not in ["0", "1"]:
sslverify = "1"
# Add rollout state the response
response_detail['rollout_state'] = self.token.rollout_state
extra_data = {"enrollment_credential": self.get_tokeninfo("enrollment_credential")}
imageurl = params.get("appimageurl")
if imageurl:
extra_data.update({"image": imageurl})
if self.token.rollout_state == RolloutState.CLIENTWAIT:
# Get enrollment values from the policy
registration_url = get_required(policy_params, PushAction.REGISTRATION_URL)
ttl = policy_params.get(PushAction.TTL, "10")
# Get the values from the configured PUSH config
fb_identifier = policy_params.get(PushAction.FIREBASE_CONFIG)
if fb_identifier != POLL_ONLY:
# If do not do poll_only, then we load all the Firebase configuration
firebase_configs = get_smsgateway(identifier=fb_identifier, gwtype=GWTYPE)
if len(firebase_configs) != 1:
raise ParameterError("Unknown Firebase configuration!")
# this allows to upgrade our crypto
extra_data["v"] = 1
extra_data["serial"] = self.get_serial()
extra_data["sslverify"] = sslverify
extra_data["poll_only"] = fb_identifier == POLL_ONLY
# enforce App pin
if params.get(PolicyAction.FORCE_APP_PIN):
extra_data.update({'pin': True})
if params.get(PolicyAction.APP_FORCE_UNLOCK):
extra_data.update({'app_force_unlock': params.get(PolicyAction.APP_FORCE_UNLOCK)})
# Get scheme to use
pia_scheme = policy_params.get(PushAction.USE_PIA_SCHEME, False)
# We display this during the first enrollment step!
qr_url = create_push_token_url(url=registration_url,
user=user.login,
realm=user.realm,
serial=self.get_serial(),
tokenlabel=tokenlabel,
issuer=tokenissuer,
user_obj=user,
extra_data=extra_data,
ttl=ttl,
pia_scheme=pia_scheme)
response_detail["pushurl"] = {"description": _("URL for privacyIDEA Push Token"),
"value": qr_url,
"img": create_img(qr_url)
}
response_detail["enrollment_credential"] = self.get_tokeninfo("enrollment_credential")
elif self.token.rollout_state == RolloutState.ENROLLED:
# in the second enrollment step we return the public key of the server to the smartphone.
pubkey = strip_pem_headers(self.get_tokeninfo(PUBLIC_KEY_SERVER))
response_detail["public_key"] = pubkey
return response_detail
@staticmethod
def _check_timestamp_in_range(timestamp: str, window: int) -> None:
""" Check if the timestamp is a valid timestamp and if it matches the time window.
If the check fails a privacyIDEA error is thrown.
:param timestamp: A timestamp in iso format, either with a timezone or UTC is assumed
:type timestamp: str
:param window: Time window in minutes. The timestamp must lie within the
range of -window to +window of the current time.
:type window: int
"""
try:
ts = isoparse(timestamp)
except (ValueError, TypeError) as _e:
log.debug(f'{traceback.format_exc()}')
raise PrivacyIDEAError(f'Could not parse timestamp {timestamp}. ISO-Format required.')
td = timedelta(minutes=window)
# We don't know if the passed timestamp is timezone aware. If no
# timezone is passed, we assume UTC
if ts.tzinfo:
# We consider the timezone of the given timestamp
now = datetime.now(ts.tzinfo)
else:
# If a timestamp without timezone is given, we assume it is UTC
now = datetime.now(timezone.utc)
# We need to add the timezone UTC to the naive timestamp
ts = ts.replace(tzinfo=timezone.utc)
if not (now - td <= ts <= now + td):
raise PrivacyIDEAError(f'Timestamp {timestamp} not in valid range.')
@classmethod
def _handle_enrollment_step2(cls, serial: str, request_data: dict) -> tuple[bool, dict]:
log.debug("Do the 2nd step of the enrollment.")
try:
token = get_one_token(serial=serial, tokentype="push", rollout_state=RolloutState.CLIENTWAIT)
token.update(request_data)
# in case of validate/check enrollment
challenges = get_challenges(serial=serial)
if (challenges and challenges[0].is_valid()
and challenges[0].get_session() == ChallengeSession.ENROLLMENT):
challenges[0].set_otp_status(True)
challenges[0].save()
except ResourceNotFoundError:
raise ResourceNotFoundError("No token with this serial number in the rollout state 'clientwait'.")
init_detail_dict = request_data
details = token.get_init_detail(init_detail_dict)
return True, details
@classmethod
def _handle_auth_response(cls, serial: str, request_data: dict) -> tuple[bool, dict]:
log.debug("Handling the authentication response from the smartphone.")
signature = get_optional(request_data, "signature")
decline = is_true(get_optional(request_data, "decline", default=False))
presence_answer = get_optional(request_data, "presence_answer")
token = get_one_token(serial=serial, tokentype="push")
public_key = _load_public_key(token.get_tokeninfo(PUBLIC_KEY_SMARTPHONE))
challenges = get_challenges(serial=serial)
result = False
details = {}
if challenges:
# There are valid challenges, so we check this signature
for challenge in challenges:
# Re-construct the signature data and then verify the signature
sign_data = f"{challenge.challenge}|{serial}"
if decline:
sign_data += "|decline"
if presence_answer:
sign_data += f"|{presence_answer}"
try:
public_key.verify(b32decode(signature),
sign_data.encode("utf8"),
padding.PKCS1v15(),
hashes.SHA256())
log.debug(f"Found matching challenge {challenge}.")
result = True
if decline:
challenge.set_session(ChallengeSession.DECLINED)
else:
# Verify the presence_answer which is stored in the challenge data (json).
# Legacy format: the correct choice is stored as the last entry in the data (str)separated by
# a comma. Make sure that the presence_answer is given if it is set in the challenge so that
# a response with a valid signature but no presence_answer does not pass!
challenge_data = challenge.get_data()
if (isinstance(challenge_data, dict) and
challenge_data.get("mode") == PushMode.REQUIRE_PRESENCE and presence_answer):
correct_answer = challenge_data.get("correct_answer")
if presence_answer != correct_answer:
log.debug(f"Challenge data ({challenge_data}) does not match "
f"given presence answer ({presence_answer})!")
result = False
else:
# Presence answer matches, mark the challenge as answered
challenge.set_otp_status(True)
elif isinstance(challenge_data, str) and presence_answer:
# Legacy handling
if presence_answer != challenge_data.split(",").pop():
log.debug(f"Challenge data ({challenge_data}) does not match "
f"given presence answer ({presence_answer})!")
result = False
else:
challenge.set_otp_status(True)
# Check if presence_answer is missing, but it is required
elif (isinstance(challenge_data, dict)
and challenge_data.get("mode") == PushMode.REQUIRE_PRESENCE
and not presence_answer):
log.warning("'push_require_presence' Policy is set but the presence answer "
"is not present in the smartphone request!")
result = False
# Code to Phone: smartphone has confirmed, now generate a number to show and
# save it to the challenge.
elif (isinstance(challenge_data, dict) and challenge_data.get(
"mode") == PushMode.CODE_TO_PHONE):
display_code = "".join([str(secrets.randbelow(10))
for _ in range(CODE_TO_PHONE_DISPLAY_CODE_LENGTH)])
challenge_data["smartphone_confirmed"] = True
challenge_data["display_code"] = display_code
challenge.set_data(challenge_data)
# Do NOT mark otp_valid yet; the client still needs to send the display_code.
details["display_code"] = display_code
details["message"] = request_data.get(PushAction.PUSH_CODE_TO_PHONE_MESSAGE) or str(
DEFAULT_MOBILE_TEXT_CODE_TO_PHONE)
else:
challenge.set_otp_status(True)
challenge.save()
except InvalidSignature as _e:
pass
return result, details
@classmethod
def _handle_firebase_update(cls, serial: str, request_data: dict) -> tuple[bool, dict]:
log.debug("Updating the firebase token of the smartphone.")
timestamp = get_required(request_data, 'timestamp')
signature = get_required(request_data, 'signature')
# First, check if the timestamp is in the required span
cls._check_timestamp_in_range(timestamp, UPDATE_FB_TOKEN_WINDOW)
try:
token = get_one_token(serial=serial, tokentype=cls.get_class_type())
public_key = _load_public_key(token.get_tokeninfo(PUBLIC_KEY_SMARTPHONE))
sign_data = "{new_fb_token}|{serial}|{timestamp}".format(**request_data)
public_key.verify(b32decode(signature),
sign_data.encode("utf8"),
padding.PKCS1v15(),
hashes.SHA256())
# If the timestamp and signature are valid, we update the token
token.add_tokeninfo('firebase_token', request_data['new_fb_token'])
return True, {}
except (ResourceNotFoundError, ParameterError, TypeError,
InvalidSignature, ConfigAdminError, BinasciiError) as e:
# To avoid disclosing information, we always fail with an invalid
# signature error, even if the token with the serial could not be found
log.debug(f'{traceback.format_exc()}')
log.info(f'The following error occurred during the signature check: "{e}"')
raise PrivacyIDEAError('Could not verify signature!')
@classmethod
def _api_endpoint_post(cls, g, request_data: dict) -> tuple[bool, dict]:
""" Handle all POST requests to the api endpoint
:param request_data: Dictionary containing the parameters of the request
:type request_data: dict
:returns: The result of handling the request and a dictionary containing
the details of the request handling
:rtype: (bool, dict)
"""
serial = get_required(request_data, "serial")
if all(k in request_data for k in ("fbtoken", "pubkey")):
return cls._handle_enrollment_step2(serial, request_data)
elif "signature" in request_data and "new_fb_token" not in request_data:
return cls._handle_auth_response(serial, request_data)
elif all(k in request_data for k in ('new_fb_token', 'timestamp', 'signature')):
return cls._handle_firebase_update(serial, request_data)
else:
raise ParameterError("Missing parameters!")
def _get_existing_challenge_data(self, transaction_id: str, push_mode: PushMode) -> dict | None:
"""
Load challenge data either in the current format (json/dict) or in the legacy format, which was a string
that was used for require_presence.
"""
challenges = get_challenges(transaction_id=transaction_id)
for c in challenges:
c_data = c.get_data()
if (isinstance(c_data, dict) and c_data.get("type", "") == "push"
and c_data.get("mode") == push_mode):
return c_data
elif isinstance(c_data, str) and c.serial.startswith("PIPU") and push_mode == PushMode.REQUIRE_PRESENCE:
# Challenge data is string => legacy handling for presence
split_presence_options = c_data.split(",")
return {
"mode": PushMode.REQUIRE_PRESENCE,
"options": split_presence_options[:-1],
"correct_answer": split_presence_options[-1]
}
return None
def _handle_presence_challenge(self, options: dict, transaction_id: str) -> tuple[dict, list, str]:
current_presence_options = []
correct_presence_option = ""
if options.get("push_triggered"):
c_data = self._get_existing_challenge_data(transaction_id, PushMode.REQUIRE_PRESENCE)
if c_data:
current_presence_options = c_data.get("options")
correct_presence_option = c_data.get("correct_answer")
if not current_presence_options:
available_presence_options = _get_presence_options(options)
num_option = int(get_action_values_from_options(
SCOPE.AUTH, PushAction.PRESENCE_NUM_OPTIONS,
options) or DEFAULT_NUMBER_OF_PRESENCE_OPTIONS)
num_option = (num_option if num_option in ALLOWED_NUMBER_OF_OPTIONS
else DEFAULT_NUMBER_OF_PRESENCE_OPTIONS)
if num_option > len(available_presence_options):
log.warning(f"The required number of presence options exceeds "
f"the number of available presence options ({num_option} "
f"!= {len(available_presence_options)})")
num_option = len(available_presence_options)
current_presence_options = random.sample(available_presence_options,
num_option)
correct_presence_option = secrets.choice(current_presence_options)
data = {
"type": "push",
"mode": PushMode.REQUIRE_PRESENCE,
"options": current_presence_options,
"correct_answer": correct_presence_option
}
return data, current_presence_options, correct_presence_option
@classmethod
def _api_endpoint_get(cls, g: Any, request_data: dict) -> list:
"""
Handle all GET requests to the api endpoint.
Currently, this is only used for polling.
:param g: The Flask context
:param request_data: Dictionary containing the parameters of the request
:type request_data: dict
:returns: Result of the polling operation, 'True' if an unanswered and
matching challenge exists, 'False' otherwise.
:rtype: bool
"""
# By default, we allow polling if the policy is not set.
allow_polling = get_action_values_from_options(
SCOPE.AUTH, PushAction.ALLOW_POLLING,
options={'g': g}) or PushAllowPolling.ALLOW
if allow_polling == PushAllowPolling.DENY:
raise PolicyError('Polling not allowed!')
serial = get_required(request_data, "serial")
timestamp = get_required(request_data, 'timestamp')
signature = get_required(request_data, 'signature')
# first check if the timestamp is in the required span
cls._check_timestamp_in_range(timestamp, POLL_TIME_WINDOW)
# now check the signature
# first get the token
try:
token = get_one_token(serial=serial, tokentype=cls.get_class_type())
# If the push_allow_polling policy is set to "token" we also
# need to check the POLLING_ALLOWED tokeninfo. If it evaluated
# to 'False', polling is not allowed for this token. If the
# tokeninfo value evaluates to 'True' or is not set at all,
# polling is allowed for this token.
if allow_polling == PushAllowPolling.TOKEN:
if not is_true(token.get_tokeninfo(POLLING_ALLOWED, default='True')):
log.debug(f'Polling not allowed for pushtoken {serial} due to tokeninfo.')
raise PolicyError('Polling not allowed!')
public_key = _load_public_key(token.get_tokeninfo(PUBLIC_KEY_SMARTPHONE))
sign_data = "{serial}|{timestamp}".format(**request_data)
public_key.verify(b32decode(signature),
sign_data.encode("utf8"),
padding.PKCS1v15(),
hashes.SHA256())
# The signature was valid now check for an open challenge
# we need the private server key to sign the smartphone data
private_key = token.get_tokeninfo(PRIVATE_KEY_SERVER)
# We need the registration URL for the challenge
registration_url = get_action_values_from_options(SCOPE.ENROLL, PushAction.REGISTRATION_URL,
options={'g': g})
if not registration_url:
raise ResourceNotFoundError('There is no registration_url defined for the '
f' pushtoken {serial}. You need to define a push_registration_url '
'in an enrollment policy.')
options = {"g": g}
open_challenges = []
db_challenges = get_challenges(serial=serial)
for challenge in db_challenges:
if challenge.get_session() == ChallengeSession.DECLINED:
continue
# check if the challenge is active and not already answered
_, answered = challenge.get_otp_status()
if not answered and challenge.is_valid():
# Check if the challenge has mode set to require_presence or code_to_phone. This will
# change what we need to return to the smartphone.
challenge_data = challenge.get_data()
presence_options = None
if isinstance(challenge_data, dict):
if challenge_data.get("mode") == PushMode.REQUIRE_PRESENCE:
presence_options = challenge_data.get("options")
elif challenge_data.get("mode") == PushMode.CODE_TO_PHONE:
if challenge_data.get("smartphone_confirmed"):
# Smartphone already confirmed this challenge, skip it
continue
# code_to_phone step 1: present as standard challenge for the smartphone
# to confirm. No display_code is sent.
elif isinstance(challenge_data, str) and challenge_data:
# Legacy handling, when require_presence was a string of options with the correct one at the end
presence_options = challenge_data.split(",")[:-1]
# then return the necessary smartphone data to answer the challenge
smartphone_data = _build_smartphone_data(token, challenge.challenge, registration_url, private_key,
options, presence_options)
open_challenges.append(smartphone_data)
# return the challenges as a list in the result value
result = open_challenges
except (ResourceNotFoundError, ParameterError,
InvalidSignature, ConfigAdminError, BinasciiError) as e:
# to avoid disclosing information we always fail with an invalid
# signature error even if the token with the serial could not be found
log.debug(f'{traceback.format_exc()}')
log.info(f'The following error occurred during the signature check: "{e}"')
raise PrivacyIDEAError('Could not verify signature!')
return result
[docs]
@classmethod
def api_endpoint(cls, request: Any, g: Any) -> tuple[str, str]:
"""
This provides a function which is called by the API endpoint
``/ttype/push`` which is defined in :doc:`../../api/ttype`
The method returns a tuple ``("json", {})``
This endpoint provides several functionalities:
- It is used for the 2nd enrollment step of the smartphone.
It accepts the following parameters:
.. sourcecode:: http
POST /ttype/push HTTP/1.1
Host: https://yourprivacyideaserver
serial=<token serial>
fbtoken=<Firebase token>
pubkey=<public key>
- It is also used when the smartphone sends the signed response
to the challenge during authentication. The following parameters are accepted:
.. sourcecode:: http
POST /ttype/push HTTP/1.1
Host: https://yourprivacyideaserver
serial=<token serial>
signature=<signature over {challenge}|{serial}>
- The smartphone can also decline the authentication request, by sending
a response to the server:
.. sourcecode:: http
POST /ttype/push HTTP/1.1
Host: https://yourprivacyideaserver
serial=<token serial>
decline=1
signature=<signature over {challenge}|{serial}|decline
- In some cases the Firebase service changes the token of a device. This
needs to be communicated to privacyIDEA through this endpoint
(https://github.com/privacyidea/privacyidea/wiki/concept%3A-pushtoken-poll#update
-firebase-token):
.. sourcecode:: http
POST /ttype/push HTTP/1.1
Host: https://yourprivacyideaserver
new_fb_token=<new Firebase token>
serial=<token serial>
timestamp=<timestamp>
signature=SIGNATURE(<new_fb_token>|<serial>|<timestamp>)
- And it also acts as an endpoint for polling challenges:
.. sourcecode:: http
GET /ttype/push HTTP/1.1
Host: https://yourprivacyideaserver
serial=<tokenserial>
timestamp=<timestamp>
signature=SIGNATURE(<tokenserial>|<timestamp>)
More on polling can be found here: https://github.com/privacyidea/privacyidea/wiki/concept%3A-pushtoken-poll
:param request: The Flask request
:param g: The Flask global object g
:return: The json string representing the result dictionary
:rtype: tuple("json", str)
"""
details = {}
if request.method == 'POST':
result, details = cls._api_endpoint_post(g, request.all_data)
elif request.method == 'GET':
result = cls._api_endpoint_get(g, request.all_data)
else:
raise PrivacyIDEAError(f'Method {request.method} not allowed in \'api_endpoint\' for push token.')
return "json", prepare_result(result, details=details)
[docs]
@log_with(log, hide_args=[1])
def is_challenge_request(self, passw: str, user: User = None, options: dict = None) -> bool:
"""
check, if the request would start a challenge
We need to define the function again, to get rid of the
is_challenge_request-decorator of the base class
:param passw: password, which might be the pin or pin+otp
:param user: the user object
:param options: dictionary of additional request parameters
:return: returns true or false
"""
if options.get(PushAction.WAIT):
# We have a push_wait in the parameters
return False
return self.check_pin(passw, user=user, options=options)
[docs]
def create_challenge(self, transactionid: str = None, options: dict = None) -> tuple[bool, str, str, dict]:
"""
This method creates a challenge, which is submitted to the user.
The submitted challenge will be preserved in the challenge
database.
If no transaction id is given, the system will create a transaction
id and return it, so that the response can refer to this transaction.
:param transactionid: the id of this challenge
:param options: the request context parameters / data
:type options: dict
:return: tuple of (bool, message, transactionid, attributes)
:rtype: tuple
The return tuple builds up like this:
``bool`` if submit was successful;
``message`` which is displayed in the JSON response;
additional challenge ``reply_dict``, which is displayed in the JSON challenges response.
"""
options = options or {}
message = get_action_values_from_options(SCOPE.AUTH,
PolicyAction.CHALLENGETEXT,
options) or str(DEFAULT_CHALLENGE_TEXT)
message = message.replace(r'\,', ',')
# Determine if require presence is enabled
g = options.get("g")
require_presence = Match.user(g, scope=SCOPE.AUTH, action=PushAction.REQUIRE_PRESENCE,
user_object=options.get("user")).any()
code_to_phone_enabled = Match.user(g, scope=SCOPE.AUTH, action=PushAction.PUSH_CODE_TO_PHONE,
user_object=options.get("user")).any()
data = {"type": "push", "mode": PushMode.STANDARD}
current_presence_options = None
reply_dict = {}
client_mode = self.client_mode
# Handle require_presence or code_to_phone if enabled
if is_true(require_presence) and not options.get(PushAction.WAIT):
data, current_presence_options, correct_presence_option = self._handle_presence_challenge(options,
transactionid)
reply_dict.update({"presence_answer": correct_presence_option})
elif is_true(require_presence) and options.get(PushAction.WAIT):
log.warning("Unable to use 'require_presence' policy with 'push_wait'. "
"Disabling 'require_presence' policy!")
elif is_true(code_to_phone_enabled) and not options.get(PushAction.WAIT):
if options.get("push_triggered"):
data = self._get_existing_challenge_data(transactionid, PushMode.CODE_TO_PHONE)
else:
data = {
"type": "push",
"mode": PushMode.CODE_TO_PHONE,
"smartphone_confirmed": False,
}
message = _("Please enter the code displayed on your smartphone.")
client_mode = ClientMode.INTERACTIVE
# Initially we assume there is no error from Firebase
res = True
fb_identifier = self.get_tokeninfo(PushAction.FIREBASE_CONFIG)
if fb_identifier:
challenge = b32encode_and_unicode(geturandom())
if options.get("session") != ChallengeSession.ENROLLMENT:
if fb_identifier != POLL_ONLY:
# We only push to Firebase if this token is NOT POLL_ONLY.
fb_gateway = create_sms_instance(fb_identifier)
registration_url = get_action_values_from_options(
SCOPE.ENROLL, PushAction.REGISTRATION_URL, options=options)
private_key_pem = self.get_tokeninfo(PRIVATE_KEY_SERVER)
smartphone_data = _build_smartphone_data(self,
challenge, registration_url,
private_key_pem, options, current_presence_options)
log.debug(f"Sending to firebase the smartphone_data: {smartphone_data}")
res = fb_gateway.submit_message(self.get_tokeninfo("firebase_token"), smartphone_data)
# Create the challenge in the challenge table if either the message
# was successfully submitted to the Firebase API or if polling is
# allowed in general or for this specific token.
allow_polling = get_action_values_from_options(
SCOPE.AUTH, PushAction.ALLOW_POLLING, options=options) or PushAllowPolling.ALLOW
if ((allow_polling == PushAllowPolling.ALLOW or
(allow_polling == PushAllowPolling.TOKEN and
is_true(self.get_tokeninfo(POLLING_ALLOWED, default='True')))) or res):
validity = int(get_from_config('DefaultChallengeValidityTime', 120))
tokentype = self.get_tokentype().lower()
# Maybe there is a PushChallengeValidityTime...
lookup_for = tokentype.capitalize() + 'ChallengeValidityTime'
validity = int(get_from_config(lookup_for, validity))
# Create the challenge in the database
db_challenge = Challenge(self.token.serial,
transaction_id=transactionid,
challenge=challenge,
data=data,
session=options.get("session"),
validitytime=validity)
db_challenge.save()
self.challenge_janitor()
transactionid = db_challenge.transaction_id
# If sending the Push message failed, we log a warning
if not res:
log.warning(f"Failed to submit message to Firebase service for token {self.token.serial}.")
message += " " + ERROR_CHALLENGE_TEXT
if is_true(options.get("exception")):
raise ValidateError("Failed to submit message to Firebase service.")
else:
log.warning(f"The token {self.token.serial} has no tokeninfo {PushAction.FIREBASE_CONFIG}. "
f"The message could not be sent.")
message += " " + ERROR_CHALLENGE_TEXT
if is_true(options.get("exception")):
raise ValidateError("The token has no tokeninfo. Can not send via Firebase service.")
reply_dict.update({"attributes": {"hideResponseInput": client_mode != ClientMode.INTERACTIVE},
"client_mode": client_mode})
return True, message, transactionid, reply_dict
[docs]
@check_token_locked
def authenticate(self, passw: str, user: User = None, options: dict = None) -> tuple[bool, int, dict | None]:
"""
High level interface which covers the check_pin and check_otp
This is the method that verifies single shot authentication.
The challenge is sent to the smartphone app and privacyIDEA
waits for the response to arrive.
:param passw: the password which could be pin+otp value
:type passw: string
:param user: The authenticating user
:type user: User object
:param options: dictionary of additional request parameters
:type options: dict
:return: returns tuple of
1. true or false for the pin match,
2. the otpcounter (int) and the
3. reply (dict) that will be added as additional information in the
JSON response of ``/validate/check``.
:rtype: tuple
"""
otp_counter = -1
reply = None
pin_match = self.check_pin(passw, user=user, options=options)
# Check policies
g = options.get("g")
require_presence = Match.user(g, scope=SCOPE.AUTH, action=PushAction.REQUIRE_PRESENCE,
user_object=user).any()
code_to_phone = Match.user(g, scope=SCOPE.AUTH, action=PushAction.PUSH_CODE_TO_PHONE,
user_object=user).any()
code_to_phone_enabled = (is_true(code_to_phone) and not is_true(require_presence)
and not options.get(PushAction.WAIT, False))
if pin_match:
if not options.get("valid_token_num"):
# We should only do push_wait, if we do not already have successfully authenticated tokens!
waiting = int(options.get(PushAction.WAIT, 20))
# Trigger the challenge
_t, message, transaction_id, _attr = self.create_challenge(options=options)
if code_to_phone_enabled:
# code_to_phone: return challenge immediately, no waiting.
# The client_mode is INTERACTIVE so the client shows an input field.
# The user will enter the display_code after the smartphone confirms.
return True, -1, {"transaction_id": transaction_id, "message": message}
# Standard / require_presence: wait for the challenge to be answered
start_time = time.time()
while True:
db.session.commit()
otp_counter = self.check_challenge_response(options={"transaction_id": transaction_id})
elapsed_time = time.time() - start_time
if otp_counter >= 0 or elapsed_time > waiting or elapsed_time < 0:
break
time.sleep(POLL_INTERVAL - (elapsed_time % POLL_INTERVAL))
elif code_to_phone_enabled and options.get("transaction_id"):
# Step 2 of code_to_phone: the user submits the display_code shown after
# the smartphone confirmed. Delegate entirely to check_challenge_response,
# which enforces transaction_id binding and increments the failcount on
# wrong codes — avoiding both the unbounded-challenge-scan and the missing
# failcount increment that a hand-rolled loop here would have.
otp_counter = self.check_challenge_response(passw=passw, options=options)
if otp_counter >= 0:
return True, otp_counter, {}
return pin_match, otp_counter, reply
[docs]
@check_token_locked
def check_challenge_response(self, user: User = None, passw: str = None, options: dict = None) -> int:
"""
This function checks if the challenge for the given transaction_id was marked as answered correctly, if the mode
is standard or require_presence. In this case, the passw parameter does not matter because the challenge
has been answered by the smartphone, and we just check if that has happened correctly.
If the mode of the challenge is code_to_phone, this is a 2-step process:
1. Wait for the smartphone to confirm (smartphone_confirmed becomes True in the challenge data).
2. After confirmation, a display_code is stored in the challenge data. The client must send this
display_code via /validate/check. The display_code is only used for synchronization, the security
lies in the smartphone confirmation.
:param user: the requesting user
:type user: User object
:param passw: the password (pin+otp)
:type passw: string
:param options: additional arguments from the request, which could
be token specific. Usually "transaction_id"
:type options: dict
:return: return otp_counter. If -1, challenge does not match
:rtype: int
"""
options = options or {}
otp_counter = -1
transaction_id = options.get('transaction_id')
if transaction_id is None:
transaction_id = options.get('state')
if transaction_id is not None:
challenges = get_challenges(serial=self.token.serial, transaction_id=transaction_id)
for challenge in challenges:
# Check that the challenge is not expired
if challenge.is_valid():
data = challenge.get_data()
if isinstance(data, dict) and data.get("mode") == PushMode.CODE_TO_PHONE:
if not data.get("smartphone_confirmed"):
# Step 1 not completed yet; smartphone has not confirmed.
log.debug("code_to_phone: waiting for smartphone confirmation.")
return -1
# Step 2: smartphone has confirmed, check the display_code
display_code = data.get("display_code", "")
if display_code and display_code == passw:
return 1
elif passw is not None:
log.debug("Received the wrong display code for push code_to_phone!")
self.inc_failcount()
return -1
else:
# No passw provided yet (e.g. during polling)
return -1
else:
_, status = challenge.get_otp_status()
if status is True:
# create a positive response
otp_counter = 1
# do not delete the challenge yet, that is the job of the calling context
break
return otp_counter
[docs]
@classmethod
def enroll_via_validate(cls, g: Any, content: dict, user_obj: User, message: str = None) -> None:
"""
This class method is used in the policy ENROLL_VIA_MULTICHALLENGE.
It enrolls a new token of this type and returns the necessary information
to the client by modifying the content.
:param g: context object
:param content: The content of a response
:param user_obj: A user object
:param message: An alternative message displayed to the user during enrollment
:return: None, the content is modified
"""
# Get the firebase configuration from the policies
push_params = get_pushtoken_add_config(g, user_obj=user_obj)
token = init_token({"type": cls.get_class_type(), "genkey": 1, "2stepinit": 1}, user=user_obj)
# We are in step 1:
token.add_tokeninfo("enrollment_credential", geturandom(20, hex=True))
# We also store the Firebase config, that was used during the enrollment.
token.add_tokeninfo(PushAction.FIREBASE_CONFIG, push_params.get(PushAction.FIREBASE_CONFIG))
content.get("result")["value"] = False
content.get("result")["authentication"] = "CHALLENGE"
detail = content.setdefault("detail", {})
# Create a challenge!
challenge = token.create_challenge(options={"g": g, "user": user_obj, "session": ChallengeSession.ENROLLMENT})
# get details of token
params = get_init_tokenlabel_parameters(g, user_object=user_obj, token_type=cls.get_class_type())
params["policies"] = push_params
init_details = token.get_init_detail(params=params, user=user_obj)
detail["transaction_ids"] = [challenge[2]]
challenge_dict = {"transaction_id": challenge[2],
"image": init_details.get("pushurl", {}).get("img"),
"link": init_details.get("pushurl", {}).get("value"),
"client_mode": ClientMode.POLL,
"serial": token.token.serial,
"type": token.type,
"message": message or _("Please scan the QR code!")}
detail["multi_challenge"] = [challenge_dict]
detail.update(challenge_dict)
[docs]
@classmethod
def is_multichallenge_enrollable(cls) -> bool:
return True
[docs]
def get_enroll_url(self, user: User, params: dict) -> str:
"""
Return the URL to enroll this token. It is not supported by all token types.
:param user: The user object
:param params: Further parameters
:return: The URL containing all required information to enroll the token
"""
init_details = self.get_init_detail(params, user)
enroll_url = init_details.get("pushurl").get("value")
return enroll_url