# (c) NetKnights GmbH 2024, https://netknights.it
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-FileCopyrightText: 2024 Nils Behlen <nils.behlen@netknights.it>
# SPDX-FileCopyrightText: 2024 Jelina Unger <jelina.unger@netknights.it>
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import logging
from datetime import datetime, timezone
from typing import List
from privacyidea.lib import _
from privacyidea.lib.config import get_token_types
from privacyidea.lib.error import ParameterError, ResourceNotFoundError, TokenAdminError
from privacyidea.lib.log import log_with
from privacyidea.lib.token import create_tokenclass_object
from privacyidea.lib.tokenclass import TokenClass
from privacyidea.lib.user import User
from privacyidea.models import (TokenContainerOwner, Realm, Token, db, TokenContainerStates,
TokenContainerInfo, TokenContainerRealm)
log = logging.getLogger(__name__)
[docs]class TokenContainerClass:
@log_with(log)
def __init__(self, db_container):
self._db_container = db_container
# Create the TokenClass objects from the database objects
token_list = []
for t in db_container.tokens:
token_object = create_tokenclass_object(t)
if isinstance(token_object, TokenClass):
token_list.append(token_object)
self.tokens = token_list
@property
def serial(self):
return self._db_container.serial
@property
def description(self):
return self._db_container.description
@description.setter
def description(self, value: str):
if not value:
value = ""
self._db_container.description = value
self._db_container.save()
self.update_last_updated()
@property
def type(self):
return self._db_container.type
@property
def last_seen(self):
return self._db_container.last_seen
[docs] def update_last_seen(self):
"""
Updates the timestamp of the last seen field in the database.
"""
self._db_container.last_seen = datetime.now(timezone.utc)
self._db_container.save()
@property
def last_updated(self):
return self._db_container.last_updated
[docs] def update_last_updated(self):
"""
Updates the timestamp of the last updated field in the database.
"""
self._db_container.last_updated = datetime.now(timezone.utc)
self._db_container.save()
self.update_last_seen()
@property
def realms(self):
return self._db_container.realms
[docs] def set_realms(self, realms, add=False):
"""
Set the realms of the container. If `add` is True, the realms will be added to the existing realms, otherwise
the existing realms will be removed.
:param realms: List of realm names
:param add: False if the existing realms shall be removed, True otherwise
:return: Dictionary in the format {realm: success}, the entry 'deleted' indicates whether existing realms were
deleted.
"""
result = {}
if not realms:
realms = []
# delete all container realms
if not add:
TokenContainerRealm.query.filter_by(container_id=self._db_container.id).delete()
result["deleted"] = True
self._db_container.save()
# Check that user realms are kept
user_realms = self._get_user_realms()
missing_realms = list(set(user_realms).difference(realms))
realms.extend(missing_realms)
for realm in missing_realms:
log.warning(
f"Realm {realm} can not be removed from container {self.serial} "
f"because a user from this realm is assigned th the container.")
else:
result["deleted"] = False
for realm in realms:
if realm:
realm_db = Realm.query.filter_by(name=realm).first()
if not realm_db:
result[realm] = False
log.warning(f"Realm {realm} does not exist.")
else:
realm_id = realm_db.id
# Check if realm is already assigned to the container
if not TokenContainerRealm.query.filter_by(container_id=self._db_container.id,
realm_id=realm_id).first():
self._db_container.realms.append(realm_db)
result[realm] = True
else:
log.info(f"Realm {realm} is already assigned to container {self.serial}.")
result[realm] = False
self._db_container.save()
self.update_last_updated()
return result
def _get_user_realms(self):
"""
Returns a list of the realms of the users that are assigned to the container.
"""
owners = self.get_users()
realms = [owner.realm for owner in owners]
return realms
[docs] def remove_token(self, serial: str):
"""
Remove a token from the container. Raises a ResourceNotFoundError if the token does not exist.
:param serial: Serial of the token
:return: True if the token was successfully removed, False if the token was not found in the container
"""
token = Token.query.filter(Token.serial == serial).first()
if not token:
raise ResourceNotFoundError(f"Token with serial {serial} does not exist.")
if token not in self._db_container.tokens:
log.info(f"Token with serial {serial} not found in container {self.serial}.")
return False
self._db_container.tokens.remove(token)
self._db_container.save()
self.tokens = [t for t in self.tokens if t.get_serial() != serial]
self.update_last_updated()
return True
[docs] def add_token(self, token: TokenClass):
"""
Add a token to the container.
Raises a ParameterError if the token type is not supported by the container.
:param token: TokenClass object
:return: True if the token was successfully added, False if the token is already in the container
"""
if token.get_type() not in self.get_supported_token_types():
raise ParameterError(f"Token type {token.get_type()} not supported for container type {self.type}. "
f"Supported types are {self.get_supported_token_types()}.")
if token.get_serial() not in [t.get_serial() for t in self.tokens]:
self.tokens.append(token)
self._db_container.tokens = [t.token for t in self.tokens]
self._db_container.save()
self.update_last_updated()
return True
return False
[docs] def get_tokens(self):
"""
Returns the tokens of the container as a list of TokenClass objects.
"""
return self.tokens
[docs] def delete(self):
"""
Deletes the container and all associated objects from the database.
"""
return self._db_container.delete()
[docs] def add_user(self, user: User):
"""
Assign a user to the container.
Raises a UserError if the user does not exist.
Raises a TokenAdminError if the container already has an owner.
:param user: User object
:return: True if the user was assigned
"""
(user_id, resolver_type, resolver_name) = user.get_user_identifiers()
if not self._db_container.owners.first():
TokenContainerOwner(container_id=self._db_container.id,
user_id=user_id,
resolver=resolver_name,
realm_id=user.realm_id).save()
# Add user realm to container realms
realm_db = Realm.query.filter_by(name=user.realm).first()
self._db_container.realms.append(realm_db)
self.update_last_updated()
return True
log.info(f"Container {self.serial} already has an owner.")
raise TokenAdminError("This container is already assigned to another user.")
[docs] def remove_user(self, user: User):
"""
Remove a user from the container. Raises a ResourceNotFoundError if the user does not exist.
:param user: User object to be removed
:return: True if the user was removed, False if the user was not found in the container
"""
(user_id, resolver_type, resolver_name) = user.get_user_identifiers()
count = TokenContainerOwner.query.filter_by(container_id=self._db_container.id,
user_id=user_id,
resolver=resolver_name).delete()
db.session.commit()
if count > 0:
self.update_last_updated()
return count > 0
[docs] def get_users(self):
"""
Returns a list of users that are assigned to the container.
"""
db_container_owners: List[TokenContainerOwner] = TokenContainerOwner.query.filter_by(
container_id=self._db_container.id).all()
users: List[User] = []
for owner in db_container_owners:
realm = Realm.query.filter_by(id=owner.realm_id).first()
user = User(uid=owner.user_id, realm=realm.name, resolver=owner.resolver)
users.append(user)
return users
[docs] def get_states(self):
"""
Returns the states of the container as a list of strings.
"""
db_states = self._db_container.states
states = [state.state for state in db_states]
return states
def _check_excluded_states(self, states):
"""
Validates whether the state list contains states that excludes each other
:param states: list of states
:returns: True if the state list contains exclusive states, False otherwise
"""
state_types = self.get_state_types()
for state in states:
if state in state_types:
excluded_states = state_types[state]
same_states = list(set(states).intersection(excluded_states))
if len(same_states) > 0:
return True
return False
[docs] def set_states(self, state_list: List[str]):
"""
Set the states of the container. Previous states will be removed.
Raises a ParameterError if the state list contains exclusive states.
:param state_list: List of states as strings
:returns: Dictionary in the format {state: success}
"""
if not state_list:
state_list = []
# Check for exclusive states
exclusive_states = self._check_excluded_states(state_list)
if exclusive_states:
raise ParameterError(f"The state list {state_list} contains exclusive states!")
# Remove old state entries
TokenContainerStates.query.filter_by(container_id=self._db_container.id).delete()
# Set new states
state_types = self.get_state_types().keys()
res = {}
for state in state_list:
if state not in state_types:
# We do not raise an error here to allow following states to be set
log.warning(f"State {state} not supported. Supported states are {state_types}.")
res[state] = False
else:
TokenContainerStates(container_id=self._db_container.id, state=state).save()
res[state] = True
self.update_last_updated()
return res
[docs] def add_states(self, state_list: List[str]):
"""
Add states to the container. Previous states are only removed if a new state excludes them.
Raises a ParameterError if the state list contains exclusive states.
:param state_list: List of states as strings
:returns: Dictionary in the format {state: success}
"""
if not state_list or len(state_list) == 0:
return {}
# Check for exclusive states
exclusive_states = self._check_excluded_states(state_list)
if exclusive_states:
raise ParameterError(f"The state list {state_list} contains exclusive states!")
# Add new states
res = {}
state_types = self.get_state_types()
for state in state_list:
if state not in state_types.keys():
# We do not raise an error here to allow following states to be set
res[state] = False
log.warning(f"State {state} not supported. Supported states are {state_types}.")
else:
# Remove old states that are excluded from the new state
for excluded_state in state_types[state]:
TokenContainerStates.query.filter_by(container_id=self._db_container.id,
state=excluded_state).delete()
log.debug(
f"Removed state {excluded_state} from container {self.serial} "
f"because it is excluded by the new state {state}.")
TokenContainerStates(container_id=self._db_container.id, state=state).save()
res[state] = True
self.update_last_updated()
return res
[docs] @classmethod
def get_state_types(cls):
"""
Returns the state types that are supported by this container class and the states that are exclusive
to each of these states.
:return: Dictionary in the format: {state: [excluded_states]}
"""
state_types_exclusions = {
"active": ["disabled"],
"disabled": ["active"],
"lost": [],
"damaged": []
}
return state_types_exclusions
[docs] def set_container_info(self, info):
"""
Set the containerinfo field in the DB. Old values will be deleted.
:param info: dictionary in the format: {key: value}
"""
self.delete_container_info()
if info:
self._db_container.set_info(info)
[docs] def add_container_info(self, key, value):
"""
Add a key and a value to the DB tokencontainerinfo
:param key: key
:param value: value
"""
self._db_container.set_info({key: value})
[docs] def get_container_info(self):
"""
Return the tokencontainerinfo from the DB
:return: list of tokencontainerinfo objects
"""
return self._db_container.info_list
[docs] def delete_container_info(self, key=None):
"""
Delete the tokencontainerinfo from the DB
:param key: key to delete, if None all keys are deleted
"""
res = {}
if key:
container_infos = TokenContainerInfo.query.filter_by(container_id=self._db_container.id, key=key)
else:
container_infos = TokenContainerInfo.query.filter_by(container_id=self._db_container.id)
for ci in container_infos:
ci.delete()
res[ci.key] = True
if container_infos.count() == 0:
log.debug(f"Container {self.serial} has no info with key {key} or no info at all.")
return res
[docs] @classmethod
def get_class_type(cls):
"""
Returns the type of the container class.
"""
return "generic"
[docs] @classmethod
def get_supported_token_types(cls):
"""
Returns the token types that are supported by the container class.
"""
return get_token_types()
[docs] @classmethod
def get_class_prefix(cls):
"""
Returns the container class specific prefix for the serial.
"""
return "CONT"
[docs] @classmethod
def get_class_description(cls):
"""
Returns a description of the container class.
"""
return _("General purpose container that can hold any type and any number of token.")