# (c) NetKnights GmbH 2024, https://netknights.it
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-FileCopyrightText: 2024 Jelina Unger <jelina.unger@netknights.it>
# SPDX-License-Identifier: AGPL-3.0-or-later
#
__doc__ = """This is the event handler module for container actions.
"""
from privacyidea.lib.container import (get_container_classes, delete_container_by_serial, init_container,
find_container_by_serial, set_container_states, add_container_states,
set_container_description, add_container_info, delete_container_info,
assign_user, unassign_user, set_container_info,
unregister, add_token_to_container)
from privacyidea.lib.containerclass import TokenContainerClass
from privacyidea.lib.eventhandler.base import BaseEventHandler
from privacyidea.lib import _
import logging
from privacyidea.lib.token import enable_token
log = logging.getLogger(__name__)
class ACTION_TYPE(object):
    """
    Allowed actions of the container event handler.

    Each attribute holds the action name as it is compared (lower-cased)
    against the ``action`` argument of ``ContainerEventHandler.do``.
    """
    # Create a new container (the only action that needs no container serial).
    INIT = "create"
    # Delete the container.
    DELETE = "delete"
    # Remove all users from the container.
    UNASSIGN = "unassign"
    # Assign the users found in the request to the container.
    ASSIGN = "assign"
    # Set the container states to the states selected in the handler options.
    SET_STATES = "set states"
    # Add the states selected in the handler options to the container.
    ADD_STATES = "add states"
    # Set the container description.
    SET_DESCRIPTION = "set description"
    # Set one container info key/value pair.
    SET_CONTAINER_INFO = "set container info"
    # Add one key/value pair to the container info.
    ADD_CONTAINER_INFO = "add container info"
    # Delete the container info.
    DELETE_CONTAINER_INFO = "delete container info"
    # Remove every token from the container.
    REMOVE_TOKENS = "remove all tokens"
    # Disable every token of the container.
    DISABLE_TOKENS = "disable all tokens"
    # Enable every token of the container.
    ENABLE_TOKENS = "enable all tokens"
    # Unregister the container.
    UNREGISTER = "unregister"
class ContainerEventHandler(BaseEventHandler):
    """
    This is the event handler for container actions.

    It can create and delete containers, assign and unassign users, change
    states, description and container info, unregister a container and
    remove, enable or disable all tokens of a container.
    """

    identifier = "Container"
    description = "This event handler can trigger new actions on containers."

    # Actions that work on an existing container and therefore need a
    # container serial. ACTION_TYPE.INIT is deliberately not part of this set.
    _SERIAL_ACTIONS = frozenset({
        ACTION_TYPE.DELETE,
        ACTION_TYPE.UNASSIGN,
        ACTION_TYPE.ASSIGN,
        ACTION_TYPE.SET_STATES,
        ACTION_TYPE.ADD_STATES,
        ACTION_TYPE.SET_DESCRIPTION,
        ACTION_TYPE.REMOVE_TOKENS,
        ACTION_TYPE.SET_CONTAINER_INFO,
        ACTION_TYPE.ADD_CONTAINER_INFO,
        ACTION_TYPE.DELETE_CONTAINER_INFO,
        ACTION_TYPE.DISABLE_TOKENS,
        ACTION_TYPE.ENABLE_TOKENS,
        ACTION_TYPE.UNREGISTER,
    })

    @property
    def allowed_positions(self):
        """
        This returns the allowed positions of the event handler definition.
        This can be "post" or "pre" or both.

        :return: list of allowed positions
        """
        return ["pre", "post"]

    @property
    def actions(self):
        """
        This method returns a list of available actions, that are provided
        by this event handler.

        The result maps every action name to a dictionary of its options;
        each option is described by "type", "description" and, where given,
        "required" and the selectable "value" list.

        :return: dictionary of actions.
        """
        container_types = list(get_container_classes().keys())
        # One boolean option per known container state; used by both state actions.
        action_states = {
            state: {"type": "bool",
                    "required": False,
                    "description": _("Set the state {state}").format(state=state)}
            for state in TokenContainerClass.get_state_types().keys()
        }

        actions = {
            ACTION_TYPE.INIT:
                {"type":
                     {"type": "str",
                      "required": True,
                      "description": _("Container type to create"),
                      "value": container_types
                      },
                 "description":
                     {"type": "str",
                      "required": False,
                      "description": _("Description of the container")
                      },
                 "user":
                     {"type": "bool",
                      "required": False,
                      "description": _("Assign container to user in request or to token/container owner")
                      },
                 "token":
                     {"type": "bool",
                      "required": False,
                      "description": _("Add token from request to container")
                      }
                 },
            ACTION_TYPE.DELETE: {},
            ACTION_TYPE.UNASSIGN: {},
            ACTION_TYPE.ASSIGN: {},
            ACTION_TYPE.SET_STATES: action_states,
            ACTION_TYPE.ADD_STATES: action_states,
            ACTION_TYPE.SET_DESCRIPTION:
                {"description":
                     {"type": "str",
                      "required": True,
                      "description": _("Description of the container")
                      }
                 },
            ACTION_TYPE.REMOVE_TOKENS: {},
            ACTION_TYPE.SET_CONTAINER_INFO:
                {"key":
                     {"type": "str",
                      "required": True,
                      "description": _("Set this container info key (deletes all existing keys).")
                      },
                 "value":
                     {"type": "str",
                      "description": _("Set the value for the key above.")}
                 },
            ACTION_TYPE.ADD_CONTAINER_INFO:
                {"key":
                     {"type": "str",
                      "required": True,
                      "description": _("Add this key to the container info.")
                      },
                 "value":
                     {"type": "str",
                      "description": _("Set the value for the key above.")}
                 },
            ACTION_TYPE.DELETE_CONTAINER_INFO: {},
            ACTION_TYPE.DISABLE_TOKENS: {},
            ACTION_TYPE.ENABLE_TOKENS: {},
            ACTION_TYPE.UNREGISTER: {}
        }
        return actions

    def do(self, action, options=None):
        """
        Executes the defined action in the given event.

        The container serial is looked up via ``_get_container_serial`` and,
        if that yields nothing, via ``_get_container_serial_from_token``
        (both are not defined in this class; presumably inherited from
        ``BaseEventHandler``). Every action except "create" requires such a
        serial. An action name that matches none of the known actions is
        ignored and reported as successful.

        :param action: name of the action to execute, matched case-insensitively
            against the values of ``ACTION_TYPE``
        :param options: Contains the flask parameters g, request, response
            and the handler_def configuration
        :type options: dict
        :return: True if the action was successful, False otherwise (missing information, e.g. serial, user)
        """
        # The signature allows options to be omitted; do not fail on the first lookup.
        options = options or {}
        g = options.get("g")
        request = options.get("request")
        response = options.get("response")
        content = self._get_response_content(response)
        handler_def = options.get("handler_def") or {}
        handler_options = handler_def.get("options") or {}

        container_serial = self._get_container_serial(request, content)
        if not container_serial:
            container_serial = self._get_container_serial_from_token(request, content, g)

        action_name = action.lower()

        if action_name in self._SERIAL_ACTIONS:
            if not container_serial:
                log.debug(f"Action {action} requires serial number. But no valid serial "
                          f"number could be found in request {request}.")
                return False
            log.info(f"{action} for container {container_serial}")
            return self._run_container_action(action_name, container_serial, handler_options, request)

        if action_name == ACTION_TYPE.INIT:
            return self._create_container(action, handler_options, request, content, g)

        return True

    @staticmethod
    def _selected_states(handler_options):
        """Return the known container states that have a truthy value in the handler options."""
        return [state for state in TokenContainerClass.get_state_types().keys()
                if handler_options.get(state)]

    @staticmethod
    def _set_tokens_enabled(container_serial, enable):
        """Enable (``enable=True``) or disable (``enable=False``) every token of the container."""
        container = find_container_by_serial(container_serial)
        tokens = container.get_tokens()
        for token in tokens:
            enable_token(token.get_serial(), enable=enable)
        if not tokens:
            verb = "enable" if enable else "disable"
            log.debug(f"No tokens found to {verb} in container {container_serial}")

    def _run_container_action(self, action_name, container_serial, handler_options, request):
        """
        Execute one action that operates on an existing container.

        :param action_name: lower-cased action name, one of ``_SERIAL_ACTIONS``
        :param container_serial: serial of the container to work on
        :param handler_options: options of the handler definition
        :param request: the request object, used to determine the users to assign
        :return: False if information required by the action is missing, True otherwise
        """
        ret = True
        if action_name == ACTION_TYPE.DELETE:
            delete_container_by_serial(container_serial)

        elif action_name == ACTION_TYPE.UNASSIGN:
            container = find_container_by_serial(container_serial)
            container_owners = container.get_users()
            if not container_owners:
                ret = False
                log.debug(f"No user found to unassign from container {container_serial}")
            for c_user in container_owners:
                unassign_user(container_serial, c_user)

        elif action_name == ACTION_TYPE.ASSIGN:
            users = self._get_users_from_request(request)
            if not users:
                ret = False
                log.debug(f"No user found to assign to container {container_serial}")
            for c_user in users:
                assign_user(container_serial, c_user)

        elif action_name == ACTION_TYPE.SET_STATES:
            selected_states = self._selected_states(handler_options)
            if selected_states:
                set_container_states(container_serial, selected_states)
            else:
                ret = False
                log.debug(
                    f"No valid state found in the handler options {handler_options} "
                    f"to set in container {container_serial}")

        elif action_name == ACTION_TYPE.ADD_STATES:
            selected_states = self._selected_states(handler_options)
            if selected_states:
                add_container_states(container_serial, selected_states)
            else:
                ret = False
                log.debug(f"No states found to add to container {container_serial}")

        elif action_name == ACTION_TYPE.SET_DESCRIPTION:
            new_description = handler_options.get("description")
            if new_description:
                set_container_description(container_serial, new_description)
            else:
                ret = False
                log.debug(f"No description found to set in container {container_serial}")

        elif action_name == ACTION_TYPE.REMOVE_TOKENS:
            container = find_container_by_serial(container_serial)
            tokens = container.get_tokens()
            if tokens:
                # Iterate over a snapshot in case remove_token() modifies the
                # list returned by get_tokens().
                for token in list(tokens):
                    container.remove_token(token.get_serial())
            else:
                log.debug(f"No tokens found to remove from container {container_serial}")

        elif action_name == ACTION_TYPE.SET_CONTAINER_INFO:
            key = handler_options.get("key")
            if key:
                value = handler_options.get("value") or ""
                set_container_info(container_serial, {key: value})
            else:
                # "key" is a required option; do not pass an empty key on to the library.
                ret = False
                log.debug(f"No key found to set in the info of container {container_serial}")

        elif action_name == ACTION_TYPE.ADD_CONTAINER_INFO:
            key = handler_options.get("key")
            if key:
                value = handler_options.get("value") or ""
                add_container_info(container_serial, key, value)
            else:
                # "key" is a required option; do not pass an empty key on to the library.
                ret = False
                log.debug(f"No key found to add to the info of container {container_serial}")

        elif action_name == ACTION_TYPE.DELETE_CONTAINER_INFO:
            delete_container_info(container_serial, ikey=None)

        elif action_name == ACTION_TYPE.DISABLE_TOKENS:
            self._set_tokens_enabled(container_serial, enable=False)

        elif action_name == ACTION_TYPE.ENABLE_TOKENS:
            self._set_tokens_enabled(container_serial, enable=True)

        elif action_name == ACTION_TYPE.UNREGISTER:
            container = find_container_by_serial(container_serial)
            unregister(container)

        return ret

    def _create_container(self, action, handler_options, request, content, g):
        """
        Create a new container as configured in the handler options.

        If the "user" option is set, the first non-empty user from the request
        is passed to ``init_container`` and the remaining users are added
        afterwards. If the "token" option is set, the token serial is taken
        from the request data, the response detail or the audit data (in this
        order) and the token is added to the new container.

        :param action: the action name as passed to ``do``, used for logging only
        :param handler_options: options of the handler definition
        :param request: the request object
        :param content: the response content
        :param g: the flask g object
        :return: False if no container type is configured, True otherwise
        """
        container_type = handler_options.get("type")
        if not container_type:
            log.debug(f"Action {action} requires container type. But no type "
                      f"could be found in the handler options {handler_options}.")
            return False

        params = {
            "type": container_type,
            "description": handler_options.get("description"),
        }

        users = []
        if handler_options.get("user"):
            users = self._get_users_from_request(request)
            if users and not users[0].is_empty():
                params["user"] = users[0].login
                params["realm"] = users[0].realm
            else:
                # The container does not exist yet, so there is no serial to report here.
                log.debug("No user found to assign to the new container")

        new_serial = init_container(params)["container_serial"]

        # assign remaining users to container
        remaining_users = users[1:]
        if remaining_users:
            container = find_container_by_serial(new_serial)
            if container:
                for user in remaining_users:
                    if not user.is_empty():
                        container.add_user(user)

        if handler_options.get("token"):
            token_serial = (request.all_data.get("serial")
                            or content.get("detail", {}).get("serial")
                            or g.audit_object.audit_data.get("serial"))
            if token_serial:
                add_token_to_container(new_serial, token_serial)
            else:
                log.debug(f"No token found to add to container {new_serial}")

        return True