# (c) NetKnights GmbH 2025, https://netknights.it
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-FileCopyrightText: 2017 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# SPDX-License-Identifier: AGPL-3.0-or-later
__doc__ = """This is the base class for an event handler module.
The event handler module is bound to an event together with
* a condition and
* an action
* optional options ;-)
"""
import datetime
import logging
import re
from dateutil.tz import tzlocal
from privacyidea.lib import _
from privacyidea.lib.auth import ROLE
from privacyidea.lib.challenge import get_challenges
from privacyidea.lib.config import get_token_types
from privacyidea.lib.container import (find_container_by_serial, find_container_for_token, get_all_containers,
get_container_classes, get_container_realms)
from privacyidea.lib.containers.container_states import ContainerStates
from privacyidea.lib.counter import read as counter_read
from privacyidea.lib.realm import get_realms
from privacyidea.lib.resolver import get_resolver_list
from privacyidea.lib.token import get_token_owner, get_tokens
from privacyidea.lib.tokenclass import DATE_FORMAT
from privacyidea.lib.user import User
from privacyidea.lib.utils import parse_time_offset_from_now, is_true, check_ip_in_policy, AUTH_RESPONSE
from privacyidea.lib.utils.compare import compare_time, compare_ints, compare_generic
from privacyidea.models import TokenContainer
log = logging.getLogger(__name__)
class CONDITION:
"""
Possible conditions
"""
TOKEN_HAS_OWNER = "token_has_owner" # nosec B105 # condition name
TOKEN_IS_ORPHANED = "token_is_orphaned" # nosec B105 # condition name
TOKEN_LOCKED = "token_locked" # nosec B105 # condition name
TOKEN_VALIDITY_PERIOD = "token_validity_period" # nosec B105 # condition name
USER_TOKEN_NUMBER = "user_token_number" # nosec B105 # condition name
USER_CONTAINER_NUMBER = "user_container_number"
OTP_COUNTER = "otp_counter"
TOKENTYPE = "tokentype"
LAST_AUTH = "last_auth"
COUNT_AUTH = "count_auth"
COUNT_AUTH_SUCCESS = "count_auth_success"
COUNT_AUTH_FAIL = "count_auth_fail"
COUNTER = "counter"
FAILCOUNTER = 'failcounter'
TOKENINFO = "tokeninfo"
DETAIL_ERROR_MESSAGE = "detail_error_message"
DETAIL_MESSAGE = "detail_message"
RESULT_VALUE = "result_value"
RESULT_STATUS = "result_status"
RESULT_AUTHENTICATION = "result_authentication"
TOKENREALM = "tokenrealm"
TOKENRESOLVER = "tokenresolver"
TOKEN_IS_IN_CONTAINER = "token_is_in_container"
REALM = "realm"
RESOLVER = "resolver"
CLIENT_IP = "client_ip"
ROLLOUT_STATE = "rollout_state"
CHALLENGE_SESSION = "challenge_session"
CHALLENGE_EXPIRED = "challenge_expired"
CONTAINER_STATE = "container_state"
CONTAINER_EXACT_STATE = "container_exact_state"
CONTAINER_HAS_OWNER = "container_has_owner"
CONTAINER_TYPE = "container_type"
CONTAINER_HAS_TOKEN = "container_has_token"
SERIAL = "serial"
CONTAINER_INFO = "container_info"
CONTAINER_REALM = "container_realm"
CONTAINER_RESOLVER = "container_resolver"
CONTAINER_LAST_AUTH = "container_last_authentication"
CONTAINER_LAST_SYNC = "container_last_synchronization"
class GROUP:
"""
These are the event handler groups. The conditions
will be grouped in the UI.
"""
TOKEN = "token" # nosec B105 # group name
GENERAL = "general"
USER = "user"
COUNTER = "counter"
CHALLENGE = "challenge"
CONTAINER = "container"
class EventHandlerContext:
"""
Context to pass information from the check condition function to the do action function.
"""
def __init__(self, user: User | None = None, token_serials: list[str] | None = None,
container: TokenContainer | None = None):
self.user = user
self.token_serials = token_serials
self.container = container
[docs]
class BaseEventHandler:
"""
An Eventhandler needs to return a list of actions, which it can handle.
It also returns a list of allowed action and conditions
It returns an identifier, which can be used in the event-handling definitions
"""
identifier = "BaseEventHandler"
description = "This is the base class of an EventHandler with no " \
"functionality"
run_details = None
def __init__(self):
pass
@property
def allowed_positions(self):
"""
This returns the allowed positions of the event handler definition.
This can be "post" or "pre" or both.
:return: list of allowed positions
"""
return ["post"]
@property
def actions(self):
"""
This method returns a list of available actions, that are provided
by this event handler.
:return: dictionary of actions.
"""
actions = ["sample_action_1", "sample_action_2"]
return actions
@property
def conditions(self):
"""
The UserNotification can filter for conditions like
* type of logged-in user and
* successful or failed value.success
allowed types are str, multi, text, regexp
:return: dict
"""
realms = get_realms()
resolvers = get_resolver_list()
container_states = [{"name": state} for state in ContainerStates.get_supported_states()]
cond = {
CONDITION.CHALLENGE_SESSION:
{
"type": "str",
"desc": _("The challenge session matches the string or regular "
"expression (like 'challenge_declined' or 'enrollment')"),
"group": GROUP.CHALLENGE
},
CONDITION.CHALLENGE_EXPIRED:
{
"type": "str",
"desc": _("The challenge of a token during the authentication process"
" is expired."),
"value": ("True", "False"),
"group": GROUP.CHALLENGE
},
CONDITION.ROLLOUT_STATE:
{
"type": "str",
"desc": _("The rollout_state of the token has a certain value like 'clientwait' or 'enrolled'."),
"group": GROUP.TOKEN
},
CONDITION.REALM:
{
"type": "multi",
"desc": _("The realm of the user, for which this event should apply."),
"value": [{"name": r} for r in realms],
"group": GROUP.USER
},
CONDITION.RESOLVER:
{
"type": "multi",
"desc": _("The resolver of the user, for which this event should apply."),
"value": [{"name": r} for r in resolvers],
"group": GROUP.USER
},
CONDITION.TOKENREALM:
{
"type": "multi",
"desc": _("The realm of the token, for which this event should "
"apply."),
"value": [{"name": r} for r in realms],
"group": GROUP.TOKEN
},
CONDITION.TOKENRESOLVER:
{
"type": "multi",
"desc": _("The resolver of the token, for which this event should "
"apply."),
"value": [{"name": r} for r in resolvers],
"group": GROUP.TOKEN
},
CONDITION.TOKENTYPE:
{
"type": "multi",
"desc": _("The type of the token."),
"value": [{"name": r} for r in get_token_types()],
"group": GROUP.TOKEN
},
"logged_in_user":
{
"type": "str",
"desc": _("The logged in user is of the following type."),
"value": (ROLE.ADMIN, ROLE.USER),
"group": GROUP.USER
},
CONDITION.RESULT_VALUE:
{
"type": "str",
"desc": _("The result.value within the response is "
"True or False."),
"value": ("True", "False"),
"group": GROUP.GENERAL
},
CONDITION.RESULT_STATUS:
{
"type": "str",
"desc": _("The result.status within the response is "
"True or False."),
"value": ("True", "False"),
"group": GROUP.GENERAL
},
CONDITION.RESULT_AUTHENTICATION:
{
"type": "str",
"desc": _("The result.authentication within the response is the given value."),
"value": (
AUTH_RESPONSE.ACCEPT, AUTH_RESPONSE.REJECT, AUTH_RESPONSE.CHALLENGE, AUTH_RESPONSE.DECLINED),
"group": GROUP.GENERAL
},
CONDITION.TOKEN_LOCKED:
{
"type": "str",
"desc": _("Check if the max failcounter of the token is "
"reached."),
"value": ("True", "False"),
"group": GROUP.TOKEN
},
CONDITION.TOKEN_HAS_OWNER:
{
"type": "str",
"desc": _("The token has a user assigned."),
"value": ("True", "False"),
"group": GROUP.TOKEN
},
CONDITION.TOKEN_IS_ORPHANED:
{
"type": "str",
"desc": _("The token has a user assigned, but the user does "
"not exist in the userstore anymore."),
"value": ("True", "False"),
"group": GROUP.TOKEN
},
CONDITION.TOKEN_VALIDITY_PERIOD:
{
"type": "str",
"desc": _("Check if the token is within its validity period."),
"value": ("True", "False"),
"group": GROUP.TOKEN
},
CONDITION.SERIAL:
{
"type": "regexp",
"desc": _("Action is triggered, if the serial matches this "
"regular expression."),
"group": GROUP.TOKEN
},
CONDITION.TOKEN_IS_IN_CONTAINER:
{
"type": "str",
"desc": _("The token is in a container."),
"value": ("True", "False"),
"group": GROUP.TOKEN
},
CONDITION.USER_TOKEN_NUMBER:
{
"type": "str",
"desc": _("Action is triggered if the user has this number of tokens assigned. > and < can be "
"used, like <8."),
"group": GROUP.USER
},
CONDITION.USER_CONTAINER_NUMBER:
{
"type": "str",
"desc": _("Action is triggered if the number of containers assigned to the user matches the "
"condition. The condition should start with a comparator such as '==', '<', '>', "
"followed by a number, e.g. '==0', '>3'."),
"group": GROUP.USER
},
CONDITION.OTP_COUNTER:
{
"type": "str",
"desc": _("Action is triggered, if the counter of the token "
"equals this setting. Can also be "
"'>100' or '<99' for no exact match."),
"group": GROUP.COUNTER
},
CONDITION.LAST_AUTH:
{
"type": "str",
"desc": _("Action is triggered, if the last authentication of "
"the token is older than 7h, 10d or 1y."),
"group": GROUP.TOKEN
},
CONDITION.COUNT_AUTH:
{
"type": "str",
"desc": _("This can be '>100', '<99', or '=100', to trigger "
"the action, if the tokeninfo field 'count_auth' is "
"bigger than 100, less than 99 or exactly 100."),
"group": GROUP.COUNTER
},
CONDITION.COUNT_AUTH_SUCCESS:
{
"type": "str",
"desc": _("This can be '>100', '<99', or '=100', to trigger "
"the action, if the tokeninfo field "
"'count_auth_success' is "
"bigger than 100, less than 99 or exactly 100."),
"group": GROUP.COUNTER
},
CONDITION.COUNT_AUTH_FAIL:
{
"type": "str",
"desc": _("This can be '>100', '<99', or '=100', to trigger "
"the action, if the difference between the tokeninfo "
"field 'count_auth' and 'count_auth_success is "
"bigger than 100, less than 99 or exactly 100."),
"group": GROUP.COUNTER
},
CONDITION.FAILCOUNTER:
{
"type": "str",
"desc": _("This can be '>9', '<9', or '=10', to trigger "
"the action, if the failcounter of a token matches this value. "
"Note that the failcounter stops increasing, if the max_failcount is "
"reached."),
"group": GROUP.COUNTER
},
CONDITION.TOKENINFO:
{
"type": "str",
"desc": _("This condition can check any arbitrary tokeninfo "
"field. You need to enter something like "
"'<fieldname> == <fieldvalue>', '<fieldname> > "
"<fieldvalue>' or '<fieldname> < <fieldvalue>'."),
"group": GROUP.TOKEN
},
CONDITION.COUNTER:
{
"type": "str",
"desc": _("This condition can check the value of an arbitrary event counter and "
"compare it like 'myCounter == 1000', 'myCounter > 1000' or "
"'myCounter < 1000'."),
"group": GROUP.COUNTER
},
CONDITION.DETAIL_ERROR_MESSAGE:
{
"type": "str",
"desc": _("Here you can enter a regular expression. The "
"condition only applies if the regular expression "
"matches the detail->error->message in the response."),
"group": GROUP.GENERAL
},
CONDITION.DETAIL_MESSAGE:
{
"type": "str",
"desc": _("Here you can enter a regular expression. The "
"condition only applies if the regular expression "
"matches the detail->message in the response."),
"group": GROUP.GENERAL
},
CONDITION.CLIENT_IP:
{
"type": "str",
"desc": _("Trigger the action, if the client IP matches."),
"group": GROUP.GENERAL
},
CONDITION.CONTAINER_STATE:
{
"type": "multi",
"desc": _("The container is in the specified states, but can additionally be in other states."),
"value": container_states,
"group": GROUP.CONTAINER
},
CONDITION.CONTAINER_EXACT_STATE:
{
"type": "multi",
"desc": _("The container is only in the specified states."),
"value": container_states,
"group": GROUP.CONTAINER
},
CONDITION.CONTAINER_HAS_OWNER:
{
"type": "str",
"desc": _("The container has a user assigned."),
"value": ("True", "False"),
"group": GROUP.CONTAINER
},
CONDITION.CONTAINER_HAS_TOKEN:
{
"type": "str",
"desc": _("The container has at least one token assigned."),
"value": ("True", "False"),
"group": GROUP.CONTAINER
},
CONDITION.CONTAINER_TYPE:
{
"type": "str",
"desc": _("The container is of a certain type."),
"value": list(get_container_classes().keys()),
"group": GROUP.CONTAINER
},
CONDITION.CONTAINER_REALM:
{
"type": "multi",
"desc": _("The container is in this realm or in no realm at all. If multiple realms are selected, "
"the condition is fulfilled if the container is in at least one realm of the list. The "
"condition is not checked if the container has no realm, hence the action would be "
"triggered."),
"value": [{"name": r} for r in realms],
"group": GROUP.CONTAINER
},
CONDITION.CONTAINER_RESOLVER:
{
"type": "multi",
"desc": _("An owner of the container is in this resolver. If multiple resolvers are selected, the "
"condition is fulfilled if at least one owner is in one resolver. The condition is not "
"checked if the container has no owner, hence the action would be triggered."),
"value": [{"name": r} for r in resolvers],
"group": GROUP.CONTAINER
},
CONDITION.CONTAINER_INFO:
{
"type": "str",
"desc": _("This condition can check any arbitrary container info field. You need to enter "
"something like 'fieldname == fieldvalue', 'fieldname > fieldvalue' or 'fieldname < "
"fieldvalue'."),
"group": GROUP.CONTAINER
},
CONDITION.CONTAINER_LAST_AUTH:
{
"type": "str",
"desc": _(
"Action is triggered, if the last authentication of the container is older than the specified "
"time. The time value has to be an integer followed by a time unit. Supported units are "
"'y' (years), 'd' (days), 'h' (hours), 'm' (minutes), 's' (seconds). Only one unit is allowed. "
"Examples: '8h', '7d', '1y'"),
"group": GROUP.CONTAINER
},
CONDITION.CONTAINER_LAST_SYNC:
{
"type": "str",
"desc": _(
"Action is triggered, if the last synchronization of the container is older than the specified "
"time. The time value has to be an integer followed by a time unit. Supported units are "
"'y' (years), 'd' (days), 'h' (hours), 'm' (minutes), 's' (seconds). Only one unit is allowed. "
"Examples: '8h', '7d', '1y'"),
"group": GROUP.CONTAINER
},
}
return cond
@property
def events(self):
"""
This method returns a list allowed events, that this event handler
can be bound to and which it can handle with the corresponding actions.
An eventhandler may return an asterisk ["*"] indicating, that it can
be used in all events.
:return: list of events
"""
events = ["*"]
return events
@staticmethod
def _get_tokenowner(request):
user = User()
if hasattr(request, "User"):
user = request.User
if user is None:
user = User()
serial = request.all_data.get("serial")
if user.is_empty() and serial:
# maybe the user is empty, but a serial was passed.
# Then we determine the user by the serial
try:
user = get_token_owner(serial) or User()
except Exception as exx:
user = User()
# This can happen for orphaned tokens.
log.info(f"Could not determine tokenowner for {serial!s}. Maybe the "
"user does not exist anymore.")
log.debug(exx)
# If the user does not exist, we set an empty user
if not user.exist():
user = User()
return user
@staticmethod
def _get_container_owners(request):
users = []
user = User(login='', realm='')
if hasattr(request, "User"):
user = request.User
if user:
users.append(user)
serial = request.all_data.get("container_serial")
if not user and serial:
# maybe the user is empty, but a serial was passed.
# Then we determine the user by the serial
container = find_container_by_serial(serial)
users = container.get_users()
return users
@classmethod
def _get_users_from_request(cls, request):
"""
Extracts the user information from the request or searches for the container owners or token owner.
If no user is found, an empty user is returned.
:param request: The request object
:return: List of user objects
"""
users = cls._get_container_owners(request)
if len(users) == 0:
user = cls._get_tokenowner(request)
if not user.is_empty():
users.append(user)
return users
@classmethod
def _get_token_serials(cls, request, content, g):
"""
Extracts the token serials from the request, content or audit object.
:param request: The request object
:param content: The content of the response
:param g: The g object
:return: Comma separated string of token serials
"""
# Get single token serial
serial = (request.all_data.get("serial") or content.get("detail", {}).get("serial") or
g.audit_object.audit_data.get("serial"))
return serial
@classmethod
def _get_container_serial(cls, request, content):
"""
Get the container serial from the request, content or audit object.
:param request: The request object
:param content: The content of the response
:return: The container serial or None if no serial could be found
"""
# get serial from request
container_serial = request.all_data.get("container_serial")
if not cls._serial_is_from_container(container_serial):
container_serial = None
if not container_serial:
# get serial from response
value = content.get("result", {}).get('value', {})
if isinstance(value, dict):
container_serial = value.get("container_serial")
if not cls._serial_is_from_container(container_serial):
container_serial = None
return container_serial
@classmethod
def _get_container_serial_from_token(cls, request, content, g):
"""
Tries to get a token serial. If a token is found, it tries to get the container serial from this token.
:param request: The request object
:param content: The content of the response
:param g: The g object
:return: The container serial or None if no serial could be found
"""
container_serial = None
serials = cls._get_token_serials(request, content, g)
if serials:
serial_list = serials.replace(' ', '').split(',')
if len(serial_list):
# Only if one token serial is provided a corresponding container can be found
serial = serial_list[0]
container_serial = cls._get_container_serial_from_token_serial(serial)
return container_serial
@classmethod
def _serial_is_from_container(cls, serial):
"""
Validates whether the given serial is from a container.
:param serial: The serial to validate
:return: True if a container with the given serial exists, False otherwise
"""
container_exists = False
if serial:
# Check if a container exists with this serial
containers = get_all_containers(serial=serial, pagesize=0, page=0)['containers']
if len(containers) > 0:
container_exists = True
return container_exists
@classmethod
def _get_container_serial_from_token_serial(cls, token_serial):
"""
Tries to get the container serial from a token.
:param token_serial: Serial of a token
:return: container serial or None if the token is not part of a container or the token does not exist
"""
container_serial = None
if token_serial:
tokens = get_tokens(serial=token_serial)
if len(tokens) > 0:
token = tokens[0]
container = find_container_for_token(token.get_serial())
if container:
container_serial = container.serial
return container_serial
@staticmethod
def _get_response_content(response):
content = {}
if response:
if response.is_json:
content = response.json
return content
[docs]
def check_condition(self, options):
"""
Check if all conditions are met and if the action should be executed.
If the conditions are met, we return "True"
:return: True
"""
g = options.get("g")
request = options.get("request")
response = options.get("response")
e_handler_def = options.get("handler_def")
if not e_handler_def:
options["context"] = EventHandlerContext()
# options is the handler definition
return True
# conditions can be corresponding to the property conditions
conditions = e_handler_def.get("conditions")
content = self._get_response_content(response)
user = self._get_tokenowner(request)
serial = self._get_token_serials(request, content, g)
transaction_id = request.all_data.get("transaction_id")
tokenrealms = []
tokenresolvers = []
tokentype = None
token_obj = None
if serial:
# We have determined the serial number from the request.
token_obj_list = get_tokens(serial=serial)
elif user:
# We have to determine the token via the user object. But only if
# the user has only one token
token_obj_list = get_tokens(user=user)
else:
token_obj_list = []
if len(token_obj_list) == 1:
# There is a token involved, so we determine its resolvers and realms
token_obj = token_obj_list[0]
tokenrealms = token_obj.get_realms()
tokentype = token_obj.get_tokentype()
serial = token_obj.get_serial()
all_realms = get_realms()
for tokenrealm in tokenrealms:
resolvers = all_realms.get(tokenrealm, {}).get("resolver", {})
tokenresolvers.extend([r.get("name") for r in resolvers])
tokenresolvers = list(set(tokenresolvers))
# Get container
container_serial = self._get_container_serial(request, content)
container = None
if container_serial:
container = find_container_by_serial(container_serial)
elif serial and token_obj:
container = find_container_for_token(serial)
# context to pass information to the action
token_serials = serial.replace(" ", "").split(",") if serial else None
options["context"] = EventHandlerContext(user=user, token_serials=token_serials, container=container)
if CONDITION.CLIENT_IP in conditions:
if g and g.client_ip:
ip_policy = [ip.strip() for ip in conditions.get(CONDITION.CLIENT_IP).split(",")]
found, excluded = check_ip_in_policy(g.client_ip, ip_policy)
if not found or excluded:
return False
if CONDITION.REALM in conditions:
if user.realm not in conditions.get(CONDITION.REALM).split(","):
return False
if CONDITION.RESOLVER in conditions:
if user.resolver not in conditions.get(CONDITION.RESOLVER).split(","):
return False
if "logged_in_user" in conditions:
# Determine the role of the user
try:
logged_in_user = g.logged_in_user
user_role = logged_in_user.get("role")
except Exception:
# A non-logged-in-user is a User, not an admin
user_role = ROLE.USER
if user_role != conditions.get("logged_in_user"):
return False
if CONDITION.RESULT_VALUE in conditions:
condition_value = conditions.get(CONDITION.RESULT_VALUE)
result_value = content.get("result", {}).get("value")
if is_true(condition_value) != is_true(result_value):
return False
if CONDITION.RESULT_STATUS in conditions:
condition_value = conditions.get(CONDITION.RESULT_STATUS)
result_status = content.get("result", {}).get("status")
if is_true(condition_value) != is_true(result_status):
return False
if CONDITION.RESULT_AUTHENTICATION in conditions:
condition_value = conditions.get(CONDITION.RESULT_AUTHENTICATION)
result_auth = content.get("result", {}).get("authentication")
if condition_value != result_auth:
return False
# checking of max-failcounter state of the token
if CONDITION.TOKEN_LOCKED in conditions:
if token_obj:
locked = token_obj.get_failcount() >= token_obj.get_max_failcount()
if (conditions.get(CONDITION.TOKEN_LOCKED) in ["True", True]) != locked:
return False
else:
# check all tokens of the user, if any token is maxfail
token_objects = get_tokens(user=user, maxfail=True)
if not ','.join([tok.get_serial() for tok in token_objects]):
return False
if CONDITION.TOKENREALM in conditions and tokenrealms:
res = False
for trealm in tokenrealms:
if trealm in conditions.get(CONDITION.TOKENREALM).split(","):
res = True
break
if not res:
return False
if CONDITION.TOKENRESOLVER in conditions and tokenresolvers:
res = False
for tres in tokenresolvers:
if tres in conditions.get(CONDITION.TOKENRESOLVER).split(","):
res = True
break
if not res:
return False
if "serial" in conditions and serial:
serial_match = conditions.get("serial")
if not bool(re.match(serial_match, serial)):
return False
if CONDITION.USER_TOKEN_NUMBER in conditions and user:
num_tokens = get_tokens(user=user, count=True)
cond = conditions.get(CONDITION.USER_TOKEN_NUMBER)
if not compare_ints(cond, num_tokens):
return False
if CONDITION.USER_CONTAINER_NUMBER in conditions and user:
container_list = get_all_containers(user=user, page=1, pagesize=1)
num_containers = container_list['count']
if not compare_ints(conditions.get(CONDITION.USER_CONTAINER_NUMBER), num_containers):
return False
if CONDITION.DETAIL_ERROR_MESSAGE in conditions:
message = content.get("detail", {}).get("error", {}).get("message", "")
search_exp = conditions.get(CONDITION.DETAIL_ERROR_MESSAGE)
m = re.search(search_exp, message)
if not bool(m):
return False
if CONDITION.DETAIL_MESSAGE in conditions:
message = content.get("detail", {}).get("message", "")
search_exp = conditions.get(CONDITION.DETAIL_MESSAGE)
m = re.search(search_exp, message)
if not bool(m):
return False
if CONDITION.COUNTER in conditions:
# Can be counter==1000
if not compare_generic(conditions.get(CONDITION.COUNTER),
lambda x: counter_read(x) or 0,
"Misconfiguration in your counter "
"condition: {0!s}"
):
return False
# Token specific conditions
if token_obj:
if CONDITION.TOKENTYPE in conditions:
if tokentype not in conditions.get(CONDITION.TOKENTYPE).split(
","):
return False
if CONDITION.TOKEN_HAS_OWNER in conditions:
uid = token_obj.get_user_id()
check = conditions.get(CONDITION.TOKEN_HAS_OWNER)
if (not uid and is_true(check)) or (uid and not is_true(check)):
log.debug(f"Condition token_has_owner for token {token_obj!r} not fulfilled.")
return False
if CONDITION.TOKEN_IS_ORPHANED in conditions:
orphaned = token_obj.is_orphaned()
check = conditions.get(CONDITION.TOKEN_IS_ORPHANED)
if (not orphaned and is_true(check)) or (orphaned and not is_true(check)):
log.debug(f"Condition token_is_orphaned for token {token_obj!r} not fulfilled.")
return False
if CONDITION.TOKEN_VALIDITY_PERIOD in conditions:
valid = token_obj.check_validity_period()
if (conditions.get(CONDITION.TOKEN_VALIDITY_PERIOD) in ["True", True]) != valid:
return False
if CONDITION.OTP_COUNTER in conditions:
cond = conditions.get(CONDITION.OTP_COUNTER)
if not compare_ints(cond, token_obj.token.count):
return False
if CONDITION.LAST_AUTH in conditions:
if token_obj.check_last_auth_newer(conditions.get(CONDITION.LAST_AUTH)):
return False
if CONDITION.COUNT_AUTH in conditions:
count = token_obj.get_count_auth()
cond = conditions.get(CONDITION.COUNT_AUTH)
if not compare_ints(cond, count):
return False
if CONDITION.COUNT_AUTH_SUCCESS in conditions:
count = token_obj.get_count_auth_success()
cond = conditions.get(CONDITION.COUNT_AUTH_SUCCESS)
if not compare_ints(cond, count):
return False
if CONDITION.COUNT_AUTH_FAIL in conditions:
count = token_obj.get_count_auth()
c_success = token_obj.get_count_auth_success()
c_fail = count - c_success
cond = conditions.get(CONDITION.COUNT_AUTH_FAIL)
if not compare_ints(cond, c_fail):
return False
if CONDITION.FAILCOUNTER in conditions:
failcount = token_obj.get_failcount()
cond = conditions.get(CONDITION.FAILCOUNTER)
res = compare_ints(cond, failcount)
if not res:
return False
if CONDITION.TOKENINFO in conditions:
cond = conditions.get(CONDITION.TOKENINFO)
# replace {now} in condition
cond, td = parse_time_offset_from_now(cond)
s_now = (datetime.datetime.now(tzlocal()) + td).strftime(
DATE_FORMAT)
cond = cond.format(now=s_now)
if not compare_generic(cond,
token_obj.get_tokeninfo,
"Misconfiguration in your tokeninfo "
"condition: {0!s}"):
return False
if CONDITION.ROLLOUT_STATE in conditions:
cond = conditions.get(CONDITION.ROLLOUT_STATE)
if not cond == token_obj.token.rollout_state:
return False
if CONDITION.TOKEN_IS_IN_CONTAINER in conditions:
cond = conditions.get(CONDITION.TOKEN_IS_IN_CONTAINER)
container = find_container_for_token(serial)
token_is_in_container = container is not None
if (not token_is_in_container and is_true(cond)) or (token_is_in_container and not is_true(cond)):
log.debug(f"Condition {CONDITION.TOKEN_IS_IN_CONTAINER} for token {token_obj} "
"not fulfilled.")
return False
# Evaluates the challenge either for a token or a container object
if token_obj or container:
serial = token_obj.get_serial() if token_obj else None
if not serial:
serial = container.serial
if CONDITION.CHALLENGE_SESSION or CONDITION.CHALLENGE_EXPIRED in conditions:
chals = get_challenges(serial=serial, transaction_id=transaction_id)
if len(chals) == 1:
chal = chals[0]
if CONDITION.CHALLENGE_SESSION in conditions:
cond_match = conditions.get(CONDITION.CHALLENGE_SESSION)
if not bool(re.match(cond_match, chal.session)):
return False
if CONDITION.CHALLENGE_EXPIRED in conditions:
condition_value = conditions.get(CONDITION.CHALLENGE_EXPIRED)
if is_true(condition_value) == chal.is_valid():
return False
elif len(chals) > 1:
# If there is more than one challenge, the conditions seams awkward
if token_obj:
log.warning(f"There is more than one challenge for token {token_obj.token.serial} "
f"and transaction_id {transaction_id}.")
else:
log.warning(f"There is more than one challenge for container {container.serial} "
f"and transaction_id {transaction_id}.")
return False
# Container specific conditions
if container:
if CONDITION.CONTAINER_STATE in conditions:
cond = conditions.get(CONDITION.CONTAINER_STATE).split(',')
container_states = container.get_states()
for cond_state in cond:
if cond_state not in container_states:
log.debug(f"Condition container_state {cond_state} for container {container.serial} not "
"fulfilled.")
return False
if CONDITION.CONTAINER_EXACT_STATE in conditions:
cond = conditions.get(CONDITION.CONTAINER_EXACT_STATE).split(',')
container_states = container.get_states()
if len(cond) != len(container_states):
log.debug(f"Condition container_single_state {cond} for container {container.serial} not "
"fulfilled.")
return False
for cond_state in cond:
if cond_state not in container_states:
log.debug(f"Condition container_state {cond_state} for container {container.serial} not "
"fulfilled.")
return False
if CONDITION.CONTAINER_HAS_OWNER in conditions:
has_container_owner = len(container.get_users()) > 0
cond = conditions.get(CONDITION.CONTAINER_HAS_OWNER)
if (not has_container_owner and is_true(cond)) or (has_container_owner and not is_true(cond)):
log.debug(f"Condition container_has_owner for container {container.serial} not fulfilled.")
return False
if CONDITION.CONTAINER_TYPE in conditions:
cond = conditions.get(CONDITION.CONTAINER_TYPE)
if container.type != cond:
log.debug(f"Condition container_type {cond} for container {container.serial} not fulfilled.")
return False
if CONDITION.CONTAINER_HAS_TOKEN in conditions:
tokens = [token.get_serial() for token in container.get_tokens()]
cond = conditions.get(CONDITION.CONTAINER_HAS_TOKEN)
if (len(tokens) == 0 and is_true(cond)) or (len(tokens) > 0 and not is_true(cond)):
log.debug(f"Condition container_has_token for container {container.serial} not fulfilled.")
return False
if CONDITION.CONTAINER_REALM in conditions:
condition_realms = conditions.get(CONDITION.CONTAINER_REALM).split(",")
container_realms = get_container_realms(container.serial)
# If the container is in no realm, the condition is not checked, it is fulfilled
if len(container_realms) > 0:
# check if at least one realm of the condition is also a realm of the container
matching_realms = list(set(condition_realms).intersection(container_realms))
if len(matching_realms) == 0:
log.debug(f"Condition container_realm {condition_realms} for container {container.serial} "
"not fulfilled.")
return False
if CONDITION.CONTAINER_RESOLVER in conditions:
condition_resolvers = conditions.get(CONDITION.CONTAINER_RESOLVER).split(",")
container_resolvers = [user.resolver for user in container.get_users()]
# If the container has no owner, the condition is not checked, it is fulfilled
if len(container_resolvers) > 0:
# check if at least one resolver of the condition is also a resolver of a container owner
matching_resolvers = list(set(condition_resolvers).intersection(container_resolvers))
if len(matching_resolvers) == 0:
log.debug(f"Condition container_resolver {condition_resolvers} for container "
f"{container.serial} not fulfilled.")
return False
if CONDITION.CONTAINER_INFO in conditions:
cond = conditions.get(CONDITION.CONTAINER_INFO)
if not compare_generic(cond, container.get_container_info_dict().get,
"Misconfiguration in your container info condition: {0!s}"):
return False
if CONDITION.CONTAINER_LAST_AUTH in conditions:
cond = conditions.get(CONDITION.CONTAINER_LAST_AUTH)
last_auth = container.last_authentication
res = False
if last_auth:
res = compare_time(cond, last_auth)
if not res:
return False
if CONDITION.CONTAINER_LAST_SYNC in conditions:
cond = conditions.get(CONDITION.CONTAINER_LAST_SYNC)
last_sync = container.last_synchronization
res = False
if last_sync:
res = compare_time(cond, last_sync)
if not res:
return False
return True
[docs]
def do(self, action, options=None):
"""
This method executes the defined action in the given event.
:param action:
:param options: Contains the flask parameters g and request and the
handler_def configuration
:type options: dict
:return:
"""
log.info("In fact we are doing nothing, be we presume we are doing"
f"{action!s}")
return True