# (c) NetKnights GmbH 2024, https://netknights.it
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-FileCopyrightText: 2024 Nils Behlen <nils.behlen@netknights.it>
# SPDX-FileCopyrightText: 2024 Jelina Unger <jelina.unger@netknights.it>
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import copy
import importlib
import json
import logging
import os
from datetime import timezone, datetime
from typing import Union, Generator, Any
from flask import g
from sqlalchemy import func, select, and_
from sqlalchemy.orm import aliased
from sqlalchemy.sql import Select
from privacyidea.api.lib.utils import send_result
from privacyidea.lib.challenge import delete_challenges, get_challenges
from privacyidea.lib.config import get_from_config
from privacyidea.lib.containerclass import TokenContainerClass
from privacyidea.lib.containers.container_info import (PI_INTERNAL, TokenContainerInfoData, RegistrationState,
SERVER_URL, CHALLENGE_TTL)
from privacyidea.lib.containertemplate.containertemplatebase import ContainerTemplateBase
from privacyidea.lib.error import (ResourceNotFoundError, ParameterError, EnrollmentError, UserError, PolicyError,
ContainerNotRegistered, ContainerError)
from privacyidea.lib.log import log_with
from privacyidea.lib.machine import is_offline_token
from privacyidea.lib.token import (get_tokens_from_serial_or_user, get_tokens,
convert_token_objects_to_dicts, init_token)
from privacyidea.lib.user import User
from privacyidea.lib.utils import hexlify_and_unicode, parse_timedelta
from privacyidea.models import (TokenContainer, TokenContainerOwner, Token, TokenContainerToken,
Realm, TokenContainerTemplate, TokenContainerInfo, TokenContainerStates, db)
log = logging.getLogger(__name__)
[docs]
def delete_container_by_id(container_id: int) -> int:
"""
Delete the container with the given id. If it does not exist, raises a ResourceNotFoundError.
:param container_id: The id of the container to delete
:return: ID of the deleted container on success
"""
if not container_id:
raise ParameterError("Unable to delete container without id.")
container = find_container_by_id(container_id)
# Delete challenges
delete_challenges(serial=container.serial)
return container.delete()
[docs]
def delete_container_by_serial(serial: str) -> int:
"""
Delete the container with the given serial. If it does not exist, raises a ResourceNotFoundError.
:param serial: The serial of the container to delete
:return: ID of the deleted container on success
"""
if not serial:
raise ParameterError("Unable to delete container without serial.")
container = find_container_by_serial(serial)
# Delete challenges
delete_challenges(serial=serial)
return container.delete()
def _gen_serial(container_type: str) -> str:
"""
Generate a new serial for a container of the given type
:param container_type: The type of the container
:return: The generated serial
"""
serial_len = int(get_from_config("SerialLength") or 8)
prefix = "CONT"
for ctype, cls in get_container_classes().items():
if ctype.lower() == container_type.lower():
prefix = cls.get_class_prefix()
session = db.session
container_num = session.execute(
select(func.count()).select_from(TokenContainer).where(TokenContainer.type == container_type)
).scalar()
while True:
count = '{:04d}'.format(container_num)
rnd_len = serial_len - len(count)
rnd = ""
if rnd_len > 0:
rnd = hexlify_and_unicode(os.urandom(rnd_len)).upper()[0:rnd_len]
serial = f"{prefix}{count}{rnd}"
exists = session.execute(
select(TokenContainer).where(TokenContainer.serial == serial)
).scalar_one_or_none()
if not exists:
break
return serial
[docs]
def create_container_from_db_object(db_container: TokenContainer) -> Union[TokenContainerClass, None]:
"""
Create a TokenContainerClass object from the given db object.
:param db_container: The db object to create the container from
:return: The created container object or None if the container type is not supported
"""
for ctypes, cls in get_container_classes().items():
if ctypes.lower() == db_container.type.lower():
try:
container = cls(db_container)
except Exception as ex: # pragma: no cover
log.warning(f"Error creating container from db object: {ex}")
return None
return container
return None
[docs]
@log_with(log)
def find_container_by_id(container_id: int) -> TokenContainerClass:
"""
Returns the TokenContainerClass object for the given container id or raises a ResourceNotFoundError.
"""
db_container = db.session.execute(
select(TokenContainer).where(TokenContainer.id == container_id)
).scalar_one_or_none()
if not db_container:
raise ResourceNotFoundError(f"Unable to find container with id {container_id}.")
return create_container_from_db_object(db_container)
[docs]
def find_container_by_serial(serial: str) -> TokenContainerClass:
"""
Returns the TokenContainerClass object for the given container serial or raises a ResourceNotFoundError.
"""
if not serial:
db_container = None
else:
db_container = db.session.scalars(
select(TokenContainer).where(func.upper(TokenContainer.serial) == serial.upper())
).unique().one_or_none()
if not db_container:
raise ResourceNotFoundError(f"Unable to find container with serial {serial}.")
return create_container_from_db_object(db_container)
def _create_container_query(user: User = None, serial: str = None, ctype: str = None, token_serial: str = None,
realm: str = None, allowed_realms: list[str] = None, template: str = None,
description: str = None, assigned: bool = None, resolver: str = None, info: dict = None,
last_auth_delta: str = None, last_sync_delta: str = None, state: str = None,
sortby: str = 'serial', sortdir: str = 'asc') -> Select:
"""
Generates a sql query to filter containers by the given parameters.
:param user: container owner, optional
:param serial: container serial (case-insensitive and allows '*' as wildcard), optional
:param ctype: container type (case-insensitive and allows '*' as wildcard), optional
:param token_serial: serial of a token which is assigned to the container (case-insensitive and allows '*' as
wildcard), optional
:param realm: realm name to filter by (case-insensitive and allows '*' as wildcard), optional
:param allowed_realms: list of realms the user is allowed to see (None if all realms are allowed), optional
If realms and allowed_realms are given, the intersection of both lists is used. If there is no intersection, no
container is returned.
:param template: The name of the template the container was created with (case-sensitive and allows '*' as
wildcard), optional
:param description: The description of the container (case-insensitive and allows '*' as wildcard), optional
:param assigned: True if the container is assigned to a user, False if not, optional
:param resolver: The resolver of the user (case-insensitive and allows '*' as wildcard), optional
:param info: The info dictionary in the format {key: value} of length 1, optional
Both key and value can contain '*' as wildcard. key is case-sensitive, value is case-insensitive.
:param last_auth_delta: The maximum time difference the last authentication may have to now, e.g. "1y", "14d", "1h"
The following units are supported: y (years), d (days), h (hours), m (minutes), s (seconds)
:param last_sync_delta: The maximum time difference the last synchronization may have to now, e.g. "1y", "14d", "1h"
The following units are supported: y (years), d (days), h (hours), m (minutes), s (seconds)
:param state: State the container should have (case-insensitive and allows "*" as wildcard), optional
:param sortby: column to sort by, default is the container serial
:param sortdir: sort direction, default is ascending
:return: sql query
"""
stmt = select(TokenContainer)
realm1 = aliased(Realm)
if user:
stmt = stmt.join(TokenContainer.owners).where(TokenContainerOwner.user_id == user.uid)
if user.realm:
realm_db = db.session.execute(
select(realm1).where(func.lower(realm1.name) == user.realm.lower())
).scalar_one_or_none()
if realm_db:
stmt = stmt.where(TokenContainerOwner.realm_id == realm_db.id)
if user.resolver:
stmt = stmt.where(func.lower(TokenContainerOwner.resolver) == user.resolver.lower())
if serial and serial.strip("*"):
if "*" in serial:
stmt = stmt.where(TokenContainer.serial.ilike(serial.replace("*", "%")))
else:
stmt = stmt.where(func.upper(TokenContainer.serial) == serial.upper())
if ctype and ctype.strip("*"):
if "*" in ctype:
stmt = stmt.where(TokenContainer.type.ilike(ctype.replace("*", "%")))
else:
stmt = stmt.where(func.upper(TokenContainer.type) == ctype.upper())
if token_serial and token_serial.strip("*"):
if "*" in token_serial:
stmt = stmt.where(TokenContainer.tokens.any(Token.serial.ilike(token_serial.replace("*", "%"))))
else:
stmt = stmt.join(TokenContainer.tokens, isouter=True).where(
func.upper(Token.serial) == token_serial.upper()
)
if realm and realm.strip("*"):
# Correctly join with the aliased Realm table
stmt = stmt.join(TokenContainer.realms.of_type(realm1), isouter=True)
if "*" in realm:
# Use the input parameter 'realm' for the wildcard filter
stmt = stmt.where(realm1.name.ilike(realm.replace("*", "%")))
else:
# Use the input parameter 'realm' for the exact match filter
stmt = stmt.where(func.lower(realm1.name) == realm.lower())
# Use separate alias for each join to avoid conflicts
realm_alias_allowed = aliased(Realm)
if allowed_realms:
allowed_realms = [r.lower() for r in allowed_realms]
stmt = stmt.join(TokenContainer.realms.of_type(realm_alias_allowed), isouter=True).where(
func.lower(realm_alias_allowed.name).in_(allowed_realms)
)
if resolver and resolver.strip("*"):
if "*" in resolver:
stmt = stmt.where(TokenContainer.owners.any(
TokenContainerOwner.resolver.ilike(resolver.replace("*", "%"))
))
else:
stmt = stmt.where(TokenContainer.owners.any(
func.lower(TokenContainerOwner.resolver) == resolver.lower()
))
if template and template.strip("*"):
if "*" in template:
stmt = stmt.where(TokenContainer.template.has(
TokenContainerTemplate.name.ilike(template.replace("*", "%"))
))
else:
stmt = stmt.where(TokenContainer.template.has(
TokenContainerTemplate.name == template
))
if description and description.strip("*"):
if "*" in description:
stmt = stmt.where(TokenContainer.description.ilike(description.replace("*", "%")))
else:
stmt = stmt.where(func.lower(TokenContainer.description) == description.lower())
if assigned:
stmt = stmt.where(TokenContainer.owners.any())
elif assigned is False:
stmt = stmt.where(~TokenContainer.owners.any())
if info:
if len(info) == 1:
key, value = list(info.items())[0]
# Start with a list to hold all the filter conditions
conditions = []
if key and key.strip("*"):
conditions.append(TokenContainerInfo.key.ilike(key.replace("*", "%")))
if value and value.strip("*"):
conditions.append(func.lower(TokenContainerInfo.value).ilike(value.replace("*", "%").lower()))
# If there are conditions, apply them with `and_`
if conditions:
stmt = stmt.where(
TokenContainer.info_list.any(and_(*conditions))
)
if last_auth_delta:
time_delta = parse_timedelta(last_auth_delta)
max_time = datetime.now(timezone.utc).replace(tzinfo=None) - time_delta
stmt = stmt.where(TokenContainer.last_seen > max_time)
if last_sync_delta:
time_delta = parse_timedelta(last_sync_delta)
max_time = datetime.now(timezone.utc).replace(tzinfo=None) - time_delta
stmt = stmt.where(TokenContainer.last_updated > max_time)
if state and state.strip("*"):
if "*" in state:
stmt = stmt.where(TokenContainer.states.any(
TokenContainerStates.state.ilike(state.replace("*", "%"))
))
else:
stmt = stmt.where(TokenContainer.states.any(
func.lower(TokenContainerStates.state) == state.lower()
))
# Sorting
cols = TokenContainer.__table__.columns
if isinstance(sortby, str):
if sortby in cols:
sort_col = cols.get(sortby)
else:
sort_col = TokenContainer.serial
else:
sort_col = sortby
if sortdir == "desc":
stmt = stmt.order_by(sort_col.desc())
else:
stmt = stmt.order_by(sort_col.asc())
#print("----------------------------- CREATE CONTAINER QUERY -----------------------------")
#from sqlalchemy.dialects import postgresql
#print(stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
#print("----------------------------------------------------------------------------------")
return stmt
[docs]
def get_all_containers(user: User = None, serial: str = None, ctype: str = None, token_serial: str = None,
realm: str = None, allowed_realms: list[str] = None, sortby: str = 'serial',
sortdir: str = 'asc', template: str = None, description: str = None, assigned: bool = None,
resolver: str = None, info: dict = None, last_auth_delta: str = None,
last_sync_delta: str = None, state: str = None, page: int = 0,
pagesize: int = 0) -> dict[str, Union[int, None, list[TokenContainerClass]]]:
"""
This function is used to retrieve a container list, that can be displayed in
the Web UI. It supports pagination if either page or pagesize is given (e.g. >0).
Each retrieved page will also contain a "next" and a "prev", indicating
the next or previous page. If page and pagesize are both smaller than 0, no pagination is used.
The containers are filtered by the given parameters.
:param user: container owner, optional
:param serial: container serial (case-insensitive and allows '*' as wildcard), optional
:param ctype: container type (case-insensitive and allows '*' as wildcard), optional
:param token_serial: serial of a token which is assigned to the container (case-insensitive and allows '*'
as wildcard), optional
:param realm: name of the realm the container is assigned to (case-insensitive and allows '*' as wildcard), optional
:param allowed_realms: list of realms the user is allowed to see (None if all realms are allowed), optional
If realms and allowed_realms are given, the intersection of both lists is used. If there is no intersection, no
container is returned.
:param template: The name of the template the container was created with (case-sensitive and allows '*' as wildcard)
, optional
:param description: The description of the container (case-insensitive and allows '*' as wildcard), optional
:param assigned: True if the container is assigned to a user, False otherwise, optional
:param resolver: The resolver of the user (case-insensitive and allows '*' as wildcard), optional
:param info: The info dictionary in the format {key: value} of length 1, optional
Both key and value can contain '*' as wildcard. key is case-sensitive, value is case-insensitive.
:param last_auth_delta: The maximum time difference the last authentication may have to now, e.g. "1y", "14d", "1h"
The following units are supported: y (years), d (days), h (hours), m (minutes), s (seconds)
:param last_sync_delta: The maximum time difference the last synchronization may have to now, e.g. "1y", "14d", "1h"
The following units are supported: y (years), d (days), h (hours), m (minutes), s (seconds)
:param state: State the container should have (case-insensitive and allows "*" as wildcard), optional
:param sortby: column to sort by, default is the container serial
:param sortdir: sort direction, default is ascending
:param page: The number of the page to view. Starts with 1 ;-)
:param pagesize: The size of the page
:returns: A dictionary with a list of containers at the key 'containers' and optionally pagination entries ('prev',
'next', 'current', 'count')
"""
sql_query: Select = _create_container_query(user=user, serial=serial, ctype=ctype, token_serial=token_serial, realm=realm,
allowed_realms=allowed_realms, template=template, description=description,
assigned=assigned, resolver=resolver, info=info,
last_auth_delta=last_auth_delta,
last_sync_delta=last_sync_delta, state=state, sortby=sortby, sortdir=sortdir)
ret = {}
# Paginate if requested
if page > 0 or pagesize > 0:
ret = create_pagination(page, pagesize, sql_query, "containers")
else: # No pagination
ret["containers"] = db.session.scalars(sql_query).unique().all()
container_list = [create_container_from_db_object(db_container) for db_container in ret["containers"]]
ret["containers"] = container_list
return ret
[docs]
def get_container_generator(pagesize: int = 10, **kwargs) -> Generator[list[TokenContainerClass], None, None]:
"""
Generator that yields pages of containers.
:param page_size: Number of containers per page
:param kwargs: Filter arguments for get_all_containers
:yield: List of TokenContainerClass objects for each page
"""
page = 1
while True:
result = get_all_containers(page=page, pagesize=pagesize, **kwargs)
containers = result.get("containers", [])
if not containers:
break
yield containers
if not result.get("next"):
break
page += 1
[docs]
def find_container_for_token(serial: str) -> Union[TokenContainerClass, None]:
"""
Returns a TokenContainerClass object for the given token or raises a ResourceNotFoundError
if the token does not exist.
:param serial: Serial of the token
:return: container object or None if the token is not in a container
"""
session = db.session
db_token = session.execute(
select(Token).where(Token.serial == serial)
).unique().scalar_one_or_none()
if not db_token:
raise ResourceNotFoundError(f"Unable to find token with serial {serial}.")
token_id = db_token.id
row = session.execute(
select(TokenContainerToken).where(TokenContainerToken.token_id == token_id)
).scalar_one_or_none()
if row:
container_id = row.container_id
return find_container_by_id(container_id)
return None
[docs]
def get_container_classes() -> dict[str, type[TokenContainerClass]]:
"""
Returns a dictionary of all available container classes in the format: { type: class }.
New container types have to be added here.
"""
# className: module
classes = {
"TokenContainerClass": "privacyidea.lib.containerclass",
"SmartphoneContainer": "privacyidea.lib.containers.smartphone",
"YubikeyContainer": "privacyidea.lib.containers.yubikey"
}
ret = {}
for cls, mod in classes.items():
try:
m = importlib.import_module(mod)
c = getattr(m, cls)
ret[c.get_class_type().lower()] = c
except Exception as ex: # pragma: no cover
log.warning(f"Error importing module {cls}: {ex}")
return ret
[docs]
def init_container(params: dict[str, any]) -> dict[str, Union[str, list]]:
"""
Create a new container with the given parameters. Requires at least the type.
:param params: The parameters for the new container as dictionary like
::
{
"type": ...,
"description": ..., (optional)
"container_serial": ..., (optional)
"user": ..., Name of the user (optional)
"realm": ..., Name of the realm (optional)
"template": {...}, Template as dictionary (optional)
"template_name": ..., Name of the template (optional)
}
To assign a user to the container, the user and realm are required.
:return: Dictionary containing the serial of the created container and a list of init details for tokens if the
container is created from a template
::
{
"container_serial": "CONT0001",
"template_tokens": [{"type": "hotp", ...}, ...]
}
"""
ctype = params.get("type")
if not ctype:
raise EnrollmentError("Type parameter is required!")
if ctype.lower() not in get_container_classes().keys():
raise EnrollmentError(f"Type '{ctype}' is not a valid type!")
template_dict = params.get("template")
template_name = params.get("template_name")
if template_dict and template_name:
raise ParameterError("Both template and template_name are given. Choose only one!")
desc = params.get("description") or ""
serial = params.get("container_serial")
if serial:
# Check if a container with this serial already exists
containers = get_all_containers(serial=serial)["containers"]
if len(containers) > 0:
raise EnrollmentError(f"Container with serial {serial} already exists!")
else:
serial = _gen_serial(ctype)
db_container = TokenContainer(serial=serial, container_type=ctype.lower(), description=desc)
db_container.save()
container = create_container_from_db_object(db_container)
# Creation Date
creation_date = datetime.now(timezone.utc).isoformat(timespec="seconds")
container.update_container_info(
[TokenContainerInfoData(key="creation_date", value=creation_date, info_type=PI_INTERNAL)])
# Template handling
template_tokens = []
if template_name:
# Use template from db
try:
template = get_template_obj(template_name)
except ResourceNotFoundError as ex:
template = None
log.warning(f"Template {template_name} does not exists, create container without template: {ex}")
if template:
if template.container_type == ctype:
template_options = template.get_template_options_as_dict()
template_tokens = template_options.get("tokens", [])
container.template = template_name
else:
log.warning(f"Template {template_name} is not of type {ctype}, create container without template.")
elif template_dict:
# Use template dictionary
if template_dict.get("container_type") == ctype:
# check if the template was modified, otherwise save the template name
stored_templates = get_templates_by_query(name=template_dict["name"])["templates"]
if len(stored_templates) > 0:
original_template = stored_templates[0]
original_template_used = compare_template_dicts(template_dict, original_template)
if original_template_used:
container.template = original_template["name"]
template_options = template_dict.get("template_options", {})
# tokens from template
template_tokens = template_options.get("tokens", [])
else:
log.warning(f"Template {template_name} is not of type {ctype}, create container without template.")
user = params.get("user")
realm = params.get("realm")
realms = []
if user and not realm:
log.info(f"Assigning container {container.serial} to user {user} on "
f"creation requires both user and realm parameters!")
elif realm and not user:
realms.append(realm)
container.set_realms(realms, add=True)
elif user and realm:
try:
container.add_user(User(login=user, realm=realm))
except UserError as ex:
log.warning(f"Error setting user for container {serial}: {ex}")
container.set_states(['active'])
res = {"container_serial": serial, "template_tokens": template_tokens}
return res
[docs]
def create_container_tokens_from_template(container_serial: str, template_tokens: list, request,
user_role: str) -> dict[str, dict]:
"""
Create tokens for the container from the given template. The token policies are checked and the enroll information
is read from the policies for each token. The tokens owner and the enroll information are added to the request
object to check the corresponding policies. All errors are caught and logged to be able to create the remaining
tokens.
:param container_serial: The serial of the container
:param template_tokens: The template to create the tokens from as list of dictionaries where each dictionary
contains the details for a token to be enrolled
:param request: The request object
:param user_role: The role of the user ('admin' or 'user')
:return: A dictionary containing the enroll details for each created token in the format:
::
{
<token_serial>: {"serial": <token_serial>,
"type": <token_type>,
"init_params": <params used for the enrollment>, ...},
}
"""
container = find_container_by_serial(container_serial)
users = container.get_users()
if len(users) > 0:
container_owner = users[0]
else:
container_owner = User()
realms = get_container_realms(container_serial)
init_result = {}
request_all_data_original = copy.deepcopy(request.all_data)
policies = copy.deepcopy(g.policies)
# Get policies for the token
from privacyidea.api.lib.prepolicy import (check_max_token_realm, sms_identifiers,
indexedsecret_force_attribute, pushtoken_add_config,
tantoken_count, papertoken_count, init_token_length_contents,
init_token_defaults, check_external, check_otp_pin, encrypt_pin,
init_random_pin, twostep_enrollment_parameters,
twostep_enrollment_activation, enroll_pin,
init_tokenlabel, check_token_init, check_max_token_user,
require_description, force_server_generate_key)
from privacyidea.api.lib.postpolicy import check_verify_enrollment, save_pin_change
# Create each token defined in the template. The template contains the enroll information for each token.
for token_info in template_tokens:
token = None
user = User()
# If the user flag is set, the token is assigned to the container owner: set the full user information in the
# enroll information
if token_info.get("user"):
if container_owner:
token_info["user"] = container_owner.login
token_info["realm"] = container_owner.realm
token_info["resolver"] = container_owner.resolver
elif realms:
token_info["realm"] = realms[0]
del token_info["user"]
else:
del token_info["user"]
user = container_owner
elif user_role == "user" and request.User:
# Users are always assigned to the tokens, only admins can create tokens without a user
user = request.User
token_info["user"] = user.login
token_info["realm"] = user.realm
token_info["resolver"] = user.resolver
elif token_info.get("user") is not None:
del token_info["user"]
# The pre-policy decorator functions require a request object containing the enroll information.
# Hence, we need to clear the data in the request object from the previous token and set the new enroll
# information for the current token.
request.all_data = {}
request.all_data.update(token_info)
g.policies = {}
# Pre-policy checks
# TODO: Refactor including original uses of these functions (decorators on token init endpoint)
try:
check_max_token_realm(request, None)
require_description(request, None)
check_max_token_user(request, None)
check_token_init(request, None)
init_tokenlabel(request, None)
enroll_pin(request, None)
twostep_enrollment_activation(request, None)
twostep_enrollment_parameters(request, None)
init_random_pin(request, None)
encrypt_pin(request, None)
check_otp_pin(request, None)
check_external(request, None)
init_token_defaults(request, None)
init_token_length_contents(request, None)
papertoken_count(request, None)
sms_identifiers(request, None)
tantoken_count(request, None)
pushtoken_add_config(request, None)
indexedsecret_force_attribute(request, None)
force_server_generate_key(request, None)
except Exception as ex:
log.warning(f"Error checking pre-policies for token {token_info} created from template: {ex}")
continue
init_params = request.all_data
init_params["policies"] = g.policies
try:
token = init_token(init_params, user)
init_result[token.get_serial()] = {"type": token.get_type()}
init_result[token.get_serial()].update(token.get_init_detail(init_params, user))
container.add_token(token)
except Exception as ex:
log.warning(f"Error creating token {token_info} from template: {ex}")
if token:
if init_result.get(token.get_serial()):
del init_result[token.get_serial()]
token.delete_token()
continue
# Post-policy checks
try:
# Post-policy decorators require a response object containing the result of the token creation.
response = send_result(True, details=init_result[token.get_serial()])
check_verify_enrollment(request, response)
save_pin_change(request, response)
except Exception as ex:
log.warning(f"Error checking post-policy for token {token_info} created from template: {ex}")
continue
init_result[token.get_serial()].update(response.json["detail"])
init_result[token.get_serial()]["init_params"] = init_params
request.all_data = request_all_data_original
g.policies = policies
return init_result
[docs]
def add_token_to_container(container_serial: str, token_serial: str) -> bool:
"""
Add a single token to a container. If a token is already in a container it is removed from the old container.
Raises a ResourceNotFoundError if either the container or token does not exist.
:param container_serial: The serial of the container
:param token_serial: The serial of the token
:return: True on success
"""
container = find_container_by_serial(container_serial)
# Get the token object
token = get_tokens_from_serial_or_user(token_serial, None)[0]
# Check if the token is in a container
old_container = find_container_for_token(token_serial)
if old_container and old_container.serial != container.serial:
# Remove token from old container
remove_token_from_container(old_container.serial, token_serial)
log.info(f"Adding token {token.get_serial()} to container {container_serial}: "
f"Token removed from previous container {old_container.serial}.")
res = container.add_token(token)
return res
[docs]
def add_multiple_tokens_to_container(container_serial: str, token_serials: list) -> dict[str, bool]:
"""
Add the given tokens to the container with the given serial. Raises a ResourceNotFoundError if the container does
not exist. If a token is already in a container it is removed from the old container.
:param container_serial: The serial of the container
:param token_serials: A list of token serials to add
:return: A dictionary in the format {<token_serial>: <success>}
"""
# Raises ResourceNotFound if container does not exist
find_container_by_serial(container_serial)
ret = {}
for token_serial in token_serials:
try:
res = add_token_to_container(container_serial, token_serial)
except Exception as ex:
# We are catching the exception here to be able to add the remaining tokens
log.warning(f"Error adding token {token_serial} to container {container_serial}: {ex}")
res = False
ret[token_serial] = res
return ret
[docs]
def add_not_authorized_tokens_result(result: dict, not_authorized_serials: list) -> dict[str, bool]:
"""
Add the result False for all tokens the user is not authorized to manage.
:param result: The result dictionary in the format {token_serial: success}
:param not_authorized_serials: A list of token serials the user is not authorized to manage
:return: The result dictionary with the not authorized tokens added like {<token_serial>: False}
"""
if not_authorized_serials:
for serial in not_authorized_serials:
result[serial] = False
return result
[docs]
def get_container_classes_descriptions() -> dict[str, str]:
"""
Returns a dictionary of {"type": "Type: description"} entries for all container types.
Used to list the container types.
"""
ret = {}
classes = get_container_classes()
for container_type, container_class in classes.items():
ret[container_type] = f"{container_type.capitalize()}: {container_class.get_class_description()}"
return ret
[docs]
def get_container_token_types() -> dict[str, list[str]]:
"""
Returns a dictionary of {"type": ["tokentype0", "tokentype1", ...]} entries for all container types.
Used to list the supported token types for each container type.
"""
ret = {}
classes = get_container_classes()
for container_type, container_class in classes.items():
ret[container_type] = container_class.get_supported_token_types()
return ret
[docs]
def remove_token_from_container(container_serial: str, token_serial: str) -> bool:
"""
Remove the given token from the container with the given serial.
Raises a ResourceNotFoundError if the container or token does not exist.
:param container_serial: The serial of the container
:param token_serial: the serial of the token to remove
:return: True on success
"""
container = find_container_by_serial(container_serial)
res = container.remove_token(token_serial)
return res
[docs]
def remove_multiple_tokens_from_container(container_serial: str, token_serials: str) -> dict[str, bool]:
"""
Remove the given tokens from the container with the given serial.
Raises a ResourceNotFoundError if no container for the given serial exist.
Errors of removing tokens are caught and only logged, in order to be able to remove the remaining
tokens in the list.
:param container_serial: The serial of the container
:param token_serials: A list of token serials to remove
:return: A dictionary in the format {token_serial: success}
"""
# Check that container exists
find_container_by_serial(container_serial)
ret = {}
for token_serial in token_serials:
try:
res = remove_token_from_container(container_serial, token_serial)
except Exception as ex:
# We are catching the exception here to be able to remove the remaining tokens
log.warning(f"Error removing token {token_serial} from container {container_serial}: {ex}")
res = False
ret[token_serial] = res
return ret
[docs]
def add_container_info(serial: str, ikey: str, ivalue) -> bool:
"""
Add the given info to the container with the given serial.
If the key already exists, the value is updated. However, if the entry is of type PI_INTERNAL, the value can not be
modified.
:param serial: The serial of the container
:param ikey: The info key
:param ivalue: The info value
:returns: True on success
"""
container = find_container_by_serial(serial)
# Check if key already exists and if it is an internal key
internal_keys = container.get_internal_info_keys()
if ikey in internal_keys:
raise PolicyError(f"The key {ikey} is an internal entry and can not be modified.")
container.update_container_info([TokenContainerInfoData(key=ikey, value=ivalue)])
return True
[docs]
def set_container_info(serial: str, info: dict) -> dict[str, bool]:
"""
Set the given info to the container with the given serial.
Keys of type PI_INTERNAL can not be modified and will be ignored.
:param serial: The serial of the container
:param info: The info dictionary in the format {key: value}
:returns: Dictionary with the success state for each info key
"""
container = find_container_by_serial(serial)
result = {}
# Remove internal keys from the info dictionary, they can not be modified by the user
internal_keys = container.get_internal_info_keys()
not_internal_info = []
for key, value in info.items():
if key not in internal_keys:
info_type = info.get(f"{key}.type")
not_internal_info.append(TokenContainerInfoData(key=key, value=value, info_type=info_type))
result[key] = True
else:
result[key] = False
log.warning(f"The key {key} is an internal entry and can not be modified.")
container.set_container_info(not_internal_info)
return result
[docs]
def get_container_info_dict(serial: str, ikey: str = None) -> dict[str, Union[str, None]]:
"""
Returns the info of the given key or all infos if no key is given for the container with the given serial.
:param serial: The serial of the container
:param ikey: The info key or None to get all info keys
:return: The info dict
"""
container = find_container_by_serial(serial)
container_info = {container_info.key: container_info.value for container_info in container.get_container_info()}
if ikey:
if ikey in container_info.keys():
container_info = {ikey: container_info[ikey]}
else:
container_info = {ikey: None}
log.warning(f"Info key {ikey} not found in container {serial}.")
return container_info
[docs]
def delete_container_info(serial: str, ikey: str = None) -> dict[str, bool]:
"""
Delete the info of the given key or all infos if no key is given.
Internal infos are not deleted
:param serial: The serial of the container
:param ikey: The info key or None to delete all info keys
:return: Dictionary with all info keys or only ikey if given and the value True on success, False otherwise
"""
container = find_container_by_serial(serial)
res = container.delete_container_info(ikey, keep_internal=True)
return res
[docs]
def assign_user(serial: str, user: User) -> bool:
"""
Assign a user to a container.
:param serial: container serial
:param user: user to assign to the container
:return: True on success, False otherwise
"""
container = find_container_by_serial(serial)
res = container.add_user(user)
return res
[docs]
def unassign_user(serial: str, user: User) -> bool:
"""
Unassign a user from a container.
:param serial: container serial
:param user: user to unassign from the container
:return: True on success, False otherwise
"""
container = find_container_by_serial(serial)
res = container.remove_user(user)
return res
[docs]
def set_container_description(serial: str, description: str):
"""
Set the description of a container.
:param serial: serial of the container
:param description: new description
"""
container = find_container_by_serial(serial)
container.description = description
[docs]
def set_container_states(serial: str, states: list[str]) -> dict[str, bool]:
"""
Set the states of a container.
:param serial: serial of the container
:param states: new states as list of str
:returns: Dictionary in the format {state: success}
"""
container = find_container_by_serial(serial)
res = container.set_states(states)
return res
[docs]
def add_container_states(serial: str, states: list[str]) -> dict[str, bool]:
"""
Add the states to a container.
:param serial: serial of the container
:param states: additional states as list of str
:returns: Dictionary in the format {state: success}
"""
container = find_container_by_serial(serial)
res = container.add_states(states)
return res
[docs]
def set_container_realms(serial: str, realms: list[str],
allowed_realms: Union[list[str], None] = []) -> dict[str, bool]:
"""
Set the realms of a container.
:param serial: serial of the container
:param realms: new realms as list of str
:param allowed_realms: A list of realms the admin is allowed to set (None if all realms are allowed), optional
:returns: Dictionary in the format {realm: success}, the entry 'deleted' indicates whether existing realms were
deleted.
"""
container = find_container_by_serial(serial)
old_realms = [realm.name for realm in container.realms]
# Check if admin is allowed to set the realms
matching_realms = realms
res_failed = {}
if allowed_realms:
matching_realms = list(set(realms).intersection(allowed_realms))
excluded_realms = list(set(realms) - set(matching_realms))
if len(excluded_realms) > 0:
log.info(f"User is not allowed to set realms {excluded_realms} for container {serial}.")
res_failed = {realm: False for realm in excluded_realms}
# Check if admin is allowed to remove the old realms
not_allowed_realms = set(old_realms) - set(allowed_realms)
# Add realms that are not allowed to be removed to the set list
matching_realms = list(set(matching_realms).union(not_allowed_realms))
# Set realms
res = container.set_realms(matching_realms, add=False)
res.update(res_failed)
return res
[docs]
def add_container_realms(serial: str, realms: list[str], allowed_realms: Union[list[str], None]) -> dict[str, bool]:
"""
Add the realms to the container realms.
:param serial: serial of the container
:param realms: new realms as list of str
:param allowed_realms: A list of realms the admin is allowed to set, optional
:returns: Dictionary in the format {realm: success}, the entry 'deleted' indicates whether existing realms were
deleted.
"""
container = find_container_by_serial(serial)
# Check if admin is allowed to set the realms
matching_realms = realms
res_failed = {}
if allowed_realms:
matching_realms = list(set(realms).intersection(allowed_realms))
excluded_realms = list(set(realms) - set(matching_realms))
if len(excluded_realms) > 0:
log.info(f"User is not allowed to set realms {excluded_realms} for container {serial}.")
res_failed = {realm: False for realm in excluded_realms}
# Add realms
res = container.set_realms(matching_realms, add=True)
res.update(res_failed)
return res
[docs]
def get_container_realms(serial: str) -> list[str]:
"""
Get the realms of the container.
:param serial: serial of the container
:returns: List of realm names
"""
container = find_container_by_serial(serial)
return [realm.name for realm in container.realms]
[docs]
def create_container_dict(container_list: list[TokenContainerClass], no_token: bool = False, user: User = None,
logged_in_user_role: str = 'user', allowed_token_realms: Union[list[str], None] = [],
hide_token_info: list[str] = None, hide_container_info: list[str] = None) -> list[dict]:
"""
Create a dictionary for each container in the list.
It contains the container properties, owners, realms, tokens and info.
The information is only provided if the user is allowed to see it.
:param container_list: List of container objects
:param no_token: If True, the token information is not included
:param user: The user object requesting the containers
:param logged_in_user_role: The role of the logged-in user ('admin' or 'user')
:param allowed_token_realms: A list of realms the admin is allowed to see tokens from
:param hide_token_info: List of token info keys to hide in the response, optional
:param hide_container_info: List of container info keys to hide in the response, optional
:return: List of container dictionaries
Example of a returned list:
::
[
{
"type": "generic",
"serial": "CONT0001",
"description": "Container description",
"last_authentication": "2021-06-01T12:00:00+00:00",
"last_synchronization": "2021-06-01T12:00:00+00:00",
"states": ["active"],
"users": [
{
"user_name": "user1",
"user_realm": "realm1",
"user_resolver": "resolver1",
"user_id": 1
}
],
"tokens": [
{
"serial": "TOTP0001",
"type": "totp",
"active": true,
...
}],
"info": {"hash_algorithm": "SHA256", ...},
"internal_info_keys": ["hash_algorithm"],
"realms": ["realm1", "realm2"],
"template": "template1"
}, ...
]
"""
res: list = []
for container in container_list:
container_dict = container.get_as_dict(include_tokens=not no_token, public_info=True,
additional_hide_info=hide_container_info)
if not no_token:
token_serials = ",".join(container_dict["tokens"])
tokens_dict_list = []
if len(token_serials) > 0:
tokens = get_tokens(serial=token_serials)
tokens_dict_list = convert_token_objects_to_dicts(tokens, user=user, user_role=logged_in_user_role,
allowed_realms=allowed_token_realms,
hidden_token_info=hide_token_info)
container_dict["tokens"] = tokens_dict_list
res.append(container_dict)
return res
[docs]
def create_endpoint_url(base_url: str, endpoint: str) -> str:
"""
Creates the url for an endpoint. It concat the base_url and the endpoint if the endpoint is not already in the
base_url. base_url and endpoint are separated by a slash.
:param base_url: The base url of the host
:param endpoint: The endpoint
:return: The url for the endpoint
:rtype: str
"""
if endpoint not in base_url:
if base_url[-1] != "/":
base_url += "/"
endpoint_url = base_url + endpoint
else:
endpoint_url = base_url
return endpoint_url
[docs]
def init_registration(container: TokenContainerClass, container_rollover: bool, server_url: str, registration_ttl: int,
ssl_verify: bool, challenge_ttl: int, params: dict) -> dict:
"""
Initiates the registration or rollover of a container. Checks if the container is in a valid registration state to
do so. The last synchronization and authentication timestamps from a potential previous old registration are
deleted and according registration data is written to the container info.
:param container: The container to be registered
:param container_rollover: True if a rollover should be performed instead of a fresh registration
:param server_url: The base url of the privacyIDEA server the container can contact
:param registration_ttl: The time in minutes the registration link is valid
:param ssl_verify: True if SSL should be used for the communication between the client and the server
:param challenge_ttl: Time in minutes a challenge is valid
:param params: Further container type specific parameters required for the registration
:return: A dictionary with the registration data (container type specific)
An example of a returned dictionary for a smartphone container:
::
{
"container_url": {
"description": "URL for privacyIDEA Container Registration",
"value": <url>,
"img": <qr code of the url>
},
"nonce": "ajhbdsuiuojno49877n4no3u09on38r98n",
"time_stamp": "2020-08-25T14:00:00.000000+00:00",
"key_algorithm": "secp384r1",
"hash_algorithm": "SHA256",
"ssl_verify": "True",
"ttl": 10,
"passphrase": <Passphrase prompt displayed to the user in the app> (optional)
}
"""
# Check registration state: registration init is only allowed for None (not yet registered) and "client_wait"
# otherwise do a rollover
registration_state = container.registration_state
if container_rollover:
if registration_state not in [RegistrationState.REGISTERED, RegistrationState.ROLLOVER,
RegistrationState.ROLLOVER_COMPLETED]:
raise ContainerNotRegistered("Container is not registered.")
elif registration_state not in [RegistrationState.NOT_REGISTERED, RegistrationState.CLIENT_WAIT]:
raise ContainerError("Container is already registered.")
# Reset last synchronization and authentication time stamps from possible previous registration
container.reset_last_synchronization()
container.reset_last_authentication()
# registration
scope = create_endpoint_url(server_url, "container/register/finalize")
res = container.init_registration(server_url, scope, registration_ttl, ssl_verify, params)
if container_rollover:
# Set registration state
info = [TokenContainerInfoData(key=RegistrationState.get_key(), value=RegistrationState.ROLLOVER.value,
info_type=PI_INTERNAL),
TokenContainerInfoData(key="rollover_server_url", value=server_url, info_type=PI_INTERNAL),
TokenContainerInfoData(key="rollover_challenge_ttl", value=str(challenge_ttl), info_type=PI_INTERNAL)]
else:
# save policy values in container info
info = [TokenContainerInfoData(key="server_url", value=server_url, info_type=PI_INTERNAL),
TokenContainerInfoData(key="challenge_ttl", value=str(challenge_ttl), info_type=PI_INTERNAL)]
container.update_container_info(info)
return res
[docs]
def finalize_registration(container_serial: str, params: dict) -> dict:
"""
Finalize the registration of a container if the challenge response is valid.
If the container is in the registration_state `rollover`, it finalizes the container rollover.
:param container_serial: The serial of the container
:param params: The parameters for the registration as dictionary
:return: dictionary with container specific information
"""
# Get container
container = find_container_by_serial(container_serial)
container_info = container.get_container_info_dict()
registration_state = container.registration_state
# Update params with registration url
if registration_state == RegistrationState.ROLLOVER:
server_url = container_info.get("rollover_server_url")
else:
server_url = container_info.get("server_url")
if server_url is None:
log.debug("Server url is not set in the container info. Ensure that registration/init is called first.")
server_url = " "
scope = create_endpoint_url(server_url, "container/register/finalize")
params.update({'scope': scope})
res = container.finalize_registration(params)
if registration_state == RegistrationState.ROLLOVER:
# container registration rolled over: set rollover info as correct info
for key, value in container_info.items():
if key.find("rollover_") == 0:
original_key = key.replace("rollover_", "")
container.update_container_info(
[TokenContainerInfoData(key=original_key, value=value, info_type=PI_INTERNAL)])
container.delete_container_info(key, keep_internal=False)
finalize_container_rollover(container)
container.update_container_info([TokenContainerInfoData(key=RegistrationState.get_key(),
value=RegistrationState.ROLLOVER_COMPLETED.value,
info_type=PI_INTERNAL)])
return res
[docs]
def finalize_container_rollover(container: TokenContainerClass):
"""
Finalize the rollover of a container. For each token in the container a rollover is performed.
All previous challenges are deleted.
:param container: The container object
"""
tokens = container.get_tokens()
# Offline tokens can not be rolled over, that would invalidate the offline otp values
offline_serials = [token.get_serial() for token in tokens if is_offline_token(token.get_serial())]
online_tokens = [token for token in tokens if token.get_serial() not in offline_serials]
if len(offline_serials) > 0:
log.info(f"The following offline tokens are in the container: {offline_serials}. "
"They can not be rolled over.")
for token in online_tokens:
params = {"serial": token.get_serial(),
"type": token.get_type(),
"genkey": True,
"rollover": True}
token_info = token.get_tokeninfo()
params.update(token_info)
try:
token = init_token(params)
except Exception as ex:
# Do not block the rollover process
log.debug(f"Error during rollover of token {token.get_serial()} in container rollover: {ex}")
# Delete previous challenges of the container
delete_challenges(container.serial)
[docs]
def init_container_rollover(container: TokenContainerClass, server_url: str, challenge_ttl: int, registration_ttl: int,
ssl_verify: bool, params: dict) -> dict:
"""
Initializes the rollover of a container.
First the response to the challenge is validated. If it is valid, the registration is initialized.
The new registration info is not finally set until the new container successfully finalized the registration.
:param container: The container object
:param server_url: The server url of the privacyIDEA server the client can contact
:param challenge_ttl: The time to live of the challenge in minutes
:param registration_ttl: The time to live of the challenge for the registration in minutes
:param ssl_verify: If the client has to verify the ssl certificate of the server
:param params: Container type specific parameters for the registration as dictionary
:return: dictionary with container specific information for the client
"""
# Check challenge if rollover is allowed
rollover_scope = create_endpoint_url(server_url, "container/rollover")
params.update({"scope": rollover_scope})
container.check_challenge_response(params)
registration_scope = create_endpoint_url(server_url, "container/register/finalize")
params.update({"scope": registration_scope})
# Get registration data
res = container.init_registration(server_url, registration_scope, registration_ttl, ssl_verify, params)
# Set registration state
info = [TokenContainerInfoData(key=RegistrationState.get_key(), value=RegistrationState.ROLLOVER.value,
info_type=PI_INTERNAL),
TokenContainerInfoData(key=f"rollover_{SERVER_URL}", value=server_url, info_type=PI_INTERNAL),
TokenContainerInfoData(key=f"rollover_{CHALLENGE_TTL}", value=str(challenge_ttl), info_type=PI_INTERNAL)]
container.update_container_info(info)
return res
[docs]
def unregister(container: TokenContainerClass) -> bool:
"""
Unregister a container from the synchronization and deletes all challenges for the container.
:param container: The container object
:return: True on success
"""
# terminate registration
container.terminate_registration()
# Delete all challenges of the container
delete_challenges(serial=container.serial)
return True
[docs]
def set_options(serial: str, options: dict):
"""
Set the options of a container. The user has to be an admin or the owner of the container.
:param serial: The serial of the container
:param options: The options as dictionary
"""
container = find_container_by_serial(serial)
container.add_options(options)
[docs]
def get_container_template_classes() -> dict[str, type[ContainerTemplateBase]]:
"""
Returns a dictionary of all available container template classes in the format: { type: class }.
New container template types have to be added here.
"""
# className: module
classes = {
"ContainerTemplateBase": "privacyidea.lib.containertemplate.containertemplatebase",
"SmartphoneContainerTemplate": "privacyidea.lib.containertemplate.smartphonetemplate",
"YubikeyContainerTemplate": "privacyidea.lib.containertemplate.yubikeytemplate"
}
ret = {}
for cls, mod in classes.items():
try:
m = importlib.import_module(mod)
c = getattr(m, cls)
ret[c.get_class_type().lower()] = c
except Exception as ex: # pragma: no cover
log.warning(f"Error importing module {cls}: {ex}")
return ret
[docs]
def delete_container_template(template_name: str) -> bool:
"""
Delete a container template by its name.
"""
try:
template = get_template_obj(template_name)
template.delete()
return True
except ResourceNotFoundError:
log.warning(f"Template with name '{template_name}' does not exist.")
return False
[docs]
def create_container_template(container_type: str, template_name: str, options: dict, default: bool = False) -> int:
"""
Create a new container template.
:param container_type: The type of the container
:param template_name: The name of the template
:param options: The options for the template as dictionary
:param default: If True, the template is set as default, optional
Example for the options dictionary:
::
{
"tokens": [{"type": "hotp", "genkey": True, "hashlib": "sha256"}, ...]
}
:return: ID of the created template
"""
# Check container type
if container_type.lower() not in get_container_classes().keys():
raise EnrollmentError(f"Type '{container_type}' is not a valid type!")
# Check if the template name already exists
try:
if get_template_obj(template_name):
raise EnrollmentError(f"Template with name '{template_name}' already exists!")
except ResourceNotFoundError:
pass
TokenContainerTemplate(name=template_name, container_type=container_type).save()
template = get_template_obj(template_name)
try:
if options:
template.template_options = options
if default:
template.default = default
except Exception as ex:
# We need to delete the template on error, but still want to raise the original exception
template.delete()
raise ex
return template.id
[docs]
def create_container_template_from_db_object(db_template: TokenContainerTemplate) -> Union[ContainerTemplateBase, None]:
"""
Create a TokenContainerTemplate object from the given db object.
:param db_template: The DB object to create the container template from
:return: The created container template object or None if the container template type is not supported
"""
for ctypes, cls in get_container_template_classes().items():
if ctypes.lower() == db_template.container_type.lower():
try:
template = cls(db_template)
except Exception as ex: # pragma: no cover
log.warning(f"Error creating container template from db object: {ex}")
return None
return template
return None
[docs]
def get_all_templates_with_type():
"""
Returns a list of display strings containing the name and type of all templates.
"""
session = db.session
stmt = select(TokenContainerTemplate)
templates = session.execute(stmt).scalars().all()
template_list = []
for template in templates:
template_list.append(f"{template.name}({template.container_type})")
return template_list
[docs]
def get_templates_by_query(name: str = None, container_type: str = None, default: bool = None, page: int = 0,
pagesize: int = 0, sortdir: str = "asc",
sortby: str = "name") -> dict[str, Union[int, list[dict], None]]:
"""
Returns a list of all templates or a list filtered by the given parameters.
:param name: The name of the template, optional
:param container_type: The type of the container, optional
:param default: Filters for default templates if True or non-default if False, optional
:param page: The number of the page to view. 0 if no pagination shall be used
:param pagesize: The size of the page. 0 if no pagination shall be used
:param sortdir: The sort direction, either 'asc' or 'desc'
:param sortby: The attribute to sort by
:return: a dictionary with a list of templates at the key 'templates' and optionally pagination entries ('prev',
'next', 'current', 'count')
"""
session = db.session
stmt = select(TokenContainerTemplate)
if name:
stmt = stmt.where(TokenContainerTemplate.name == name)
if container_type:
stmt = stmt.where(TokenContainerTemplate.container_type == container_type)
if default is not None:
stmt = stmt.where(TokenContainerTemplate.default == default)
if isinstance(sortby, str):
cols = TokenContainerTemplate.__table__.columns
if sortby in cols:
sort_col = cols.get(sortby)
else:
log.info(f'Unknown sort column "{sortby}". Using "name" instead.')
sort_col = TokenContainerTemplate.name
else:
sort_col = sortby
if sortdir == "desc":
stmt = stmt.order_by(sort_col.desc())
else:
stmt = stmt.order_by(sort_col.asc())
if page > 0 or pagesize > 0:
ret = create_pagination(page, pagesize, stmt, "templates")
else:
ret = {"templates": session.execute(stmt).scalars().all()}
# create class objects from db objects
template_obj_list = [create_container_template_from_db_object(template) for template in ret["templates"]]
# convert to dict
template_list = []
for template in template_obj_list:
template_options = {}
if template.template_options != "":
template_options = json.loads(template.template_options)
template_dict = {"name": template.name,
"container_type": template.container_type,
"template_options": template_options,
"default": template.default}
template_list.append(template_dict)
ret["templates"] = template_list
return ret
[docs]
def get_template_obj(template_name: str) -> ContainerTemplateBase:
"""
Returns the template class object for the given template name.
Raises a ResourceNotFoundError if no template with this name exists.
"""
session = db.session
stmt = select(TokenContainerTemplate).where(TokenContainerTemplate.name == template_name)
db_template = session.execute(stmt).scalar_one_or_none()
if not db_template:
raise ResourceNotFoundError(f"Template {template_name} does not exist.")
template = create_container_template_from_db_object(db_template)
return template
[docs]
def set_default_template(name: str):
"""
Sets the template of the given name as default and all other templates for the container type as non-default.
:param name: The name of the template to be the new default template
"""
default_template = get_template_obj(name)
# Get all default templates for the container type and reset them to non-default
old_default_templates = get_templates_by_query(container_type=default_template.container_type, default=True)
for template in old_default_templates["templates"]:
template_obj = get_template_obj(template["name"])
template_obj.default = False
default_template.default = True
[docs]
def compare_template_dicts(template_a: dict, template_b: dict) -> bool:
"""
Compares two template dictionaries for equal tokens.
:param template_a: The first template dictionary
:param template_b: The second template dictionary
:return: True if the templates contain the same tokens, False otherwise.
"""
if template_a is None or template_b is None:
return False
# get template options
template_options_a = template_a.get("template_options", {})
template_options_b = template_b.get("template_options", {})
# compare tokens
tokens_a = template_options_a.get("tokens", [])
tokens_b = template_options_b.get("tokens", [])
if len(tokens_a) != len(tokens_b):
# different number of tokens, templates can not be equal
return False
unique_tokens_a = [token for token in tokens_a if token not in tokens_b]
unique_tokens_b = [token for token in tokens_b if token not in tokens_a]
if len(unique_tokens_a) > 0 or len(unique_tokens_b) > 0:
return False
return True
[docs]
def compare_template_with_container(template: ContainerTemplateBase, container: TokenContainerClass) -> dict:
"""
Compares the template with the container. It is only evaluated if the token types are equal.
:param template: The template object
:param container: The container object
:return: A dictionary with the differences between the template and the container
Example of a returned dictionary:
::
{
"tokens": {
"missing": ["hotp"],
"additional": ["totp"]
}
}
"""
result = {"tokens": {"missing": [], "additional": []}}
template_options = json.loads(template.template_options)
# compare tokens
template_tokens = template_options.get("tokens", [])
template_token_types = [token["type"] for token in template_tokens]
template_token_count = {ttype: template_token_types.count(ttype) for ttype in template_token_types}
container_token_types = [token.type for token in container.get_tokens()]
container_token_count = {ttype: container_token_types.count(ttype) for ttype in container_token_types}
for ttype, count_template in template_token_count.items():
count_container = container_token_count.get(ttype, 0)
if count_template > count_container:
result["tokens"]["missing"].extend([ttype] * (count_template - count_container))
for ttype, count_container in container_token_count.items():
count_template = template_token_count.get(ttype, 0)
if count_template < count_container:
result["tokens"]["additional"].extend([ttype] * (count_container - count_template))
# Check if container and template are equal
if len(result["tokens"]["missing"]) == 0 and len(result["tokens"]["additional"]) == 0:
result["tokens"]["equal"] = True
else:
result["tokens"]["equal"] = False
return result
[docs]
def get_offline_token_serials(container: TokenContainerClass) -> list[str]:
"""
Returns a list of serials of offline tokens in the container.
:param container: A TokenContainerClass object
:return: List of serials of offline tokens in the container
"""
tokens = container.get_tokens()
offline_serials = [token.get_serial() for token in tokens if is_offline_token(token.get_serial())]
return offline_serials
[docs]
def check_container_challenge(transaction_id: str) -> dict:
"""
Check if the challenge for the given transaction_id belongs to a container.
If this is the case it checks if the challenge is valid and was already answered. Then it deletes the challenge
and returns a successful authentication response.
This function is used as last step during enroll via multi challenge.
:param transaction_id: The transaction ID of the challenge
:return: A dictionary with the success state and details of the authentication in the format
::
{
"success": True,
"details": {"serial": "CONT0001", "message": "Found matching challenge"}
}
"""
success = False
details = {}
challenge_type = None
if transaction_id:
challenges = get_challenges(transaction_id=transaction_id)
challenge = challenges[0] if challenges else None
if challenge:
if challenge.data:
# check if the challenge is for a container
try:
challenge_data = json.loads(challenge.data)
if isinstance(challenge_data, dict):
challenge_type = challenge_data.get("type")
except json.JSONDecodeError:
pass
if challenge_type and challenge_type == "container":
# The challenge belongs to a container, if the challenge is already answered, we can delete it and
# return a successful authentication
if challenge.is_valid():
_, status = challenge.get_otp_status()
success = status
if success:
details = {"serial": challenge.serial, "message": "Found matching challenge"}
challenge.delete()
return {"success": success, "details": details}