Source code for privacyidea.lib.eventhandler.base

#  2017-08-11 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add condition for detail->error->message
#  2017-07-19 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add possibility to compare tokeninfo field against fixed time
#             and also {now} with offset.
#  2016-05-04 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Initial writeup
#
# License:  AGPLv3
# (c) 2016. Cornelius Kölbel
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
__doc__ = """This is the base class for an event handler module.
The event handler module is bound to an event together with

* a condition and
* an action
* optional options ;-)
"""

import datetime
import logging
import re

from dateutil.tz import tzlocal

from privacyidea.lib import _
from privacyidea.lib.auth import ROLE
from privacyidea.lib.config import get_token_types
from privacyidea.lib.container import (find_container_by_serial, find_container_for_token, get_all_containers,
                                       get_container_classes)
from privacyidea.lib.containerclass import TokenContainerClass
from privacyidea.lib.counter import read as counter_read
from privacyidea.lib.realm import get_realms
from privacyidea.lib.resolver import get_resolver_list
from privacyidea.lib.token import get_token_owner, get_tokens
from privacyidea.lib.user import User
from privacyidea.lib.utils import (compare_condition, compare_generic_condition,
                                   parse_time_offset_from_now, is_true,
                                   check_ip_in_policy, AUTH_RESPONSE)
from privacyidea.lib.tokenclass import DATE_FORMAT
from privacyidea.lib.challenge import get_challenges

log = logging.getLogger(__name__)


class CONDITION(object):
    """
    Possible conditions
    """
    TOKEN_HAS_OWNER = "token_has_owner"  # nosec B105 # condition name
    TOKEN_IS_ORPHANED = "token_is_orphaned"  # nosec B105 # condition name
    TOKEN_VALIDITY_PERIOD = "token_validity_period"  # nosec B105 # condition name
    USER_TOKEN_NUMBER = "user_token_number"  # nosec B105 # condition name
    USER_CONTAINER_NUMBER = "user_container_number"
    OTP_COUNTER = "otp_counter"
    TOKENTYPE = "tokentype"
    LAST_AUTH = "last_auth"
    COUNT_AUTH = "count_auth"
    COUNT_AUTH_SUCCESS = "count_auth_success"
    COUNT_AUTH_FAIL = "count_auth_fail"
    COUNTER = "counter"
    FAILCOUNTER = 'failcounter'
    TOKENINFO = "tokeninfo"
    DETAIL_ERROR_MESSAGE = "detail_error_message"
    DETAIL_MESSAGE = "detail_message"
    RESULT_VALUE = "result_value"
    RESULT_STATUS = "result_status"
    RESULT_AUTHENTICATION = "result_authentication"
    TOKENREALM = "tokenrealm"
    TOKENRESOLVER = "tokenresolver"
    TOKEN_IS_IN_CONTAINER = "token_is_in_container"
    REALM = "realm"
    RESOLVER = "resolver"
    CLIENT_IP = "client_ip"
    ROLLOUT_STATE = "rollout_state"
    CHALLENGE_SESSION = "challenge_session"
    CHALLENGE_EXPIRED = "challenge_expired"
    CONTAINER_STATE = "container_state"
    CONTAINER_EXACT_STATE = "container_exact_state"
    CONTAINER_HAS_OWNER = "container_has_owner"
    CONTAINER_TYPE = "container_type"
    CONTAINER_HAS_TOKEN = "container_has_token"
    SERIAL = "serial"


class GROUP(object):
    """
    These are the event handler groups. The conditions
    will be grouped in the UI.
    """
    TOKEN = "token"  # nosec B105 # group name
    GENERAL = "general"
    USER = "user"
    COUNTER = "counter"
    CHALLENGE = "challenge"
    CONTAINER = "container"


[docs]class BaseEventHandler(object): """ An Eventhandler needs to return a list of actions, which it can handle. It also returns a list of allowed action and conditions It returns an identifier, which can be used in the event-handling definitions """ identifier = "BaseEventHandler" description = "This is the base class of an EventHandler with no " \ "functionality" def __init__(self): pass self.run_details = None @property def allowed_positions(self): """ This returns the allowed positions of the event handler definition. This can be "post" or "pre" or both. :return: list of allowed positions """ return ["post"] @property def actions(self): """ This method returns a list of available actions, that are provided by this event handler. :return: dictionary of actions. """ actions = ["sample_action_1", "sample_action_2"] return actions @property def conditions(self): """ The UserNotification can filter for conditions like * type of logged-in user and * successful or failed value.success allowed types are str, multi, text, regexp :return: dict """ realms = get_realms() resolvers = get_resolver_list() container_states = [{"name": state} for state in TokenContainerClass.get_state_types().keys()] cond = { CONDITION.CHALLENGE_SESSION: { "type": "str", "desc": _("The challenge session matches the string or regular " "expression (like 'challenge_declined' or 'enrollment')"), "group": GROUP.CHALLENGE }, CONDITION.CHALLENGE_EXPIRED: { "type": "str", "desc": _("The challenge of a token during the authentication process" " is expired."), "value": ("True", "False"), "group": GROUP.CHALLENGE }, CONDITION.ROLLOUT_STATE: { "type": "str", "desc": _("The rollout_state of the token has a certain value like 'clientwait' or 'enrolled'."), "group": GROUP.TOKEN }, CONDITION.REALM: { "type": "multi", "desc": _("The realm of the user, for which this event should apply."), "value": [{"name": r} for r in realms], "group": GROUP.USER }, CONDITION.RESOLVER: { "type": "multi", "desc": _("The resolver of the user, for which this event should apply."), "value": [{"name": r} for r in resolvers], "group": GROUP.USER }, CONDITION.TOKENREALM: { "type": "multi", "desc": _("The realm of the token, for which this event should " "apply."), "value": [{"name": r} for r in realms], "group": GROUP.TOKEN }, CONDITION.TOKENRESOLVER: { "type": "multi", "desc": _("The resolver of the token, for which this event should " "apply."), "value": [{"name": r} for r in resolvers], "group": GROUP.TOKEN }, CONDITION.TOKENTYPE: { "type": "multi", "desc": _("The type of the token."), "value": [{"name": r} for r in get_token_types()], "group": GROUP.TOKEN }, "logged_in_user": { "type": "str", "desc": _("The logged in user is of the following type."), "value": (ROLE.ADMIN, ROLE.USER), "group": GROUP.USER }, CONDITION.RESULT_VALUE: { "type": "str", "desc": _("The result.value within the response is " "True or False."), "value": ("True", "False"), "group": GROUP.GENERAL }, CONDITION.RESULT_STATUS: { "type": "str", "desc": _("The result.status within the response is " "True or False."), "value": ("True", "False"), "group": GROUP.GENERAL }, CONDITION.RESULT_AUTHENTICATION: { "type": "str", "desc": _("The result.authentication within the response is the given value."), "value": ( AUTH_RESPONSE.ACCEPT, AUTH_RESPONSE.REJECT, AUTH_RESPONSE.CHALLENGE, AUTH_RESPONSE.DECLINED), "group": GROUP.GENERAL }, "token_locked": { "type": "str", "desc": _("Check if the max failcounter of the token is " "reached."), "value": ("True", "False"), "group": GROUP.TOKEN }, CONDITION.TOKEN_HAS_OWNER: { "type": "str", "desc": _("The token has a user assigned."), "value": ("True", "False"), "group": GROUP.TOKEN }, CONDITION.TOKEN_IS_ORPHANED: { "type": "str", "desc": _("The token has a user assigned, but the user does " "not exist in the userstore anymore."), "value": ("True", "False"), "group": GROUP.TOKEN }, CONDITION.TOKEN_VALIDITY_PERIOD: { "type": "str", "desc": _("Check if the token is within its validity period."), "value": ("True", "False"), "group": GROUP.TOKEN }, CONDITION.SERIAL: { "type": "regexp", "desc": _("Action is triggered, if the serial matches this " "regular expression."), "group": GROUP.TOKEN }, CONDITION.TOKEN_IS_IN_CONTAINER: { "type": "str", "desc": _("The token is in a container."), "value": ("True", "False"), "group": GROUP.TOKEN }, CONDITION.USER_TOKEN_NUMBER: { "type": "str", "desc": _("Action is triggered, if the user has this number " "of tokens assigned."), "group": GROUP.USER }, CONDITION.USER_CONTAINER_NUMBER: { "type": "str", "desc": _("Action is triggered, if the user has this number " "of containers assigned."), "group": GROUP.USER }, CONDITION.OTP_COUNTER: { "type": "str", "desc": _("Action is triggered, if the counter of the token " "equals this setting. Can also be " "'>100' or '<99' for no exact match."), "group": GROUP.COUNTER }, CONDITION.LAST_AUTH: { "type": "str", "desc": _("Action is triggered, if the last authentication of " "the token is older than 7h, 10d or 1y."), "group": GROUP.TOKEN }, CONDITION.COUNT_AUTH: { "type": "str", "desc": _("This can be '>100', '<99', or '=100', to trigger " "the action, if the tokeninfo field 'count_auth' is " "bigger than 100, less than 99 or exactly 100."), "group": GROUP.COUNTER }, CONDITION.COUNT_AUTH_SUCCESS: { "type": "str", "desc": _("This can be '>100', '<99', or '=100', to trigger " "the action, if the tokeninfo field " "'count_auth_success' is " "bigger than 100, less than 99 or exactly 100."), "group": GROUP.COUNTER }, CONDITION.COUNT_AUTH_FAIL: { "type": "str", "desc": _("This can be '>100', '<99', or '=100', to trigger " "the action, if the difference between the tokeninfo " "field 'count_auth' and 'count_auth_success is " "bigger than 100, less than 99 or exactly 100."), "group": GROUP.COUNTER }, CONDITION.FAILCOUNTER: { "type": "str", "desc": _("This can be '>9', '<9', or '=10', to trigger " "the action, if the failcounter of a token matches this value. " "Note that the failcounter stops increasing, if the max_failcount is " "reached."), "group": GROUP.COUNTER }, CONDITION.TOKENINFO: { "type": "str", "desc": _("This condition can check any arbitrary tokeninfo " "field. You need to enter something like " "'<fieldname> == <fieldvalue>', '<fieldname> > " "<fieldvalue>' or '<fieldname> < <fieldvalue>'."), "group": GROUP.TOKEN }, CONDITION.COUNTER: { "type": "str", "desc": _("This condition can check the value of an arbitrary event counter and " "compare it like 'myCounter == 1000', 'myCounter > 1000' or " "'myCounter < 1000'."), "group": GROUP.COUNTER }, CONDITION.DETAIL_ERROR_MESSAGE: { "type": "str", "desc": _("Here you can enter a regular expression. The " "condition only applies if the regular expression " "matches the detail->error->message in the response."), "group": GROUP.GENERAL }, CONDITION.DETAIL_MESSAGE: { "type": "str", "desc": _("Here you can enter a regular expression. The " "condition only applies if the regular expression " "matches the detail->message in the response."), "group": GROUP.GENERAL }, CONDITION.CLIENT_IP: { "type": "str", "desc": _("Trigger the action, if the client IP matches."), "group": GROUP.GENERAL }, CONDITION.CONTAINER_STATE: { "type": "multi", "desc": _("The container is in the specified states, but can additionally be in other states."), "value": container_states, "group": GROUP.CONTAINER }, CONDITION.CONTAINER_EXACT_STATE: { "type": "multi", "desc": _("The container is only in the specified states."), "value": container_states, "group": GROUP.CONTAINER }, CONDITION.CONTAINER_HAS_OWNER: { "type": "str", "desc": _("The container has a user assigned."), "value": ("True", "False"), "group": GROUP.CONTAINER }, CONDITION.CONTAINER_HAS_TOKEN: { "type": "str", "desc": _("The container has at least one token assigned."), "value": ("True", "False"), "group": GROUP.CONTAINER }, CONDITION.CONTAINER_TYPE: { "type": "str", "desc": _("The container is of a certain type."), "value": list(get_container_classes().keys()), "group": GROUP.CONTAINER } } return cond @property def events(self): """ This method returns a list allowed events, that this event handler can be bound to and which it can handle with the corresponding actions. An eventhandler may return an asterisk ["*"] indicating, that it can be used in all events. :return: list of events """ events = ["*"] return events @staticmethod def _get_tokenowner(request): user = User() if hasattr(request, "User"): user = request.User if user is None: user = User() serial = request.all_data.get("serial") if user.is_empty() and serial: # maybe the user is empty, but a serial was passed. # Then we determine the user by the serial try: user = get_token_owner(serial) or User() except Exception as exx: user = User() # This can happen for orphaned tokens. log.info("Could not determine tokenowner for {0!s}. Maybe the " "user does not exist anymore.".format(serial)) log.debug(exx) # If the user does not exist, we set an empty user if not user.exist(): user = User() return user @staticmethod def _get_container_owners(request): users = [] user = User(login='', realm='') if hasattr(request, "User"): user = request.User if user: users.append(user) serial = request.all_data.get("container_serial") if not user and serial: # maybe the user is empty, but a serial was passed. # Then we determine the user by the serial container = find_container_by_serial(serial) users = container.get_users() return users @classmethod def _get_users_from_request(cls, request): """ Extracts the user information from the request or searches for the container owners or token owner. If no user is found, an empty user is returned. :param request: The request object :return: List of user objects """ users = cls._get_container_owners(request) if len(users) == 0: user = cls._get_tokenowner(request) if not user.is_empty(): users.append(user) return users @classmethod def _get_token_serials(cls, request, content, g): """ Extracts the token serials from the request, content or audit object. :param request: The request object :param content: The content of the response :param g: The g object :return: Comma separated string of token serials """ # Get single token serial serial = request.all_data.get("serial") or \ content.get("detail", {}).get("serial") or \ g.audit_object.audit_data.get("serial") return serial @classmethod def _get_container_serial(cls, request, content): """ Get the container serial from the request, content or audit object. :param request: The request object :param content: The content of the response :return: The container serial or None if no serial could be found """ # get serial from request container_serial = request.all_data.get("container_serial") if not cls._serial_is_from_container(container_serial): container_serial = None if not container_serial: # get serial from response value = content.get("result", {}).get('value', {}) if isinstance(value, dict): container_serial = value.get("container_serial") if not cls._serial_is_from_container(container_serial): container_serial = None return container_serial @classmethod def _get_container_serial_from_token(cls, request, content, g): """ Tries to get a token serial. If a token is found, it tries to get the container serial from this token. :param request: The request object :param content: The content of the response :param g: The g object :return: The container serial or None if no serial could be found """ container_serial = None serials = cls._get_token_serials(request, content, g) if serials: serial_list = serials.replace(' ', '').split(',') if len(serial_list): # Only if one token serial is provided a corresponding container can be found serial = serial_list[0] container_serial = cls._get_container_serial_from_token_serial(serial) return container_serial @classmethod def _serial_is_from_container(cls, serial): """ Validates whether the given serial is from a container. :param serial: The serial to validate :return: True if a container with the given serial exists, False otherwise """ container_exists = False if serial: # Check if a container exists with this serial containers = get_all_containers(serial=serial, pagesize=0, page=0)['containers'] if len(containers) > 0: container_exists = True return container_exists @classmethod def _get_container_serial_from_token_serial(cls, token_serial): """ Tries to get the container serial from a token. :param token_serial: Serial of a token :return: container serial or None if the token is not part of a container or the token does not exist """ container_serial = None if token_serial: tokens = get_tokens(serial=token_serial) if len(tokens) > 0: token = tokens[0] container = find_container_for_token(token.get_serial()) if container: container_serial = container.serial return container_serial @staticmethod def _get_response_content(response): content = {} if response: if response.is_json: content = response.json return content
[docs] def check_condition(self, options): """ Check if all conditions are met and if the action should be executed. If the conditions are met, we return "True" :return: True """ g = options.get("g") request = options.get("request") response = options.get("response") e_handler_def = options.get("handler_def") if not e_handler_def: # options is the handler definition return True # conditions can be corresponding to the property conditions conditions = e_handler_def.get("conditions") content = self._get_response_content(response) user = self._get_tokenowner(request) serial = request.all_data.get("serial") or content.get("detail", {}).get("serial") transaction_id = request.all_data.get("transaction_id") tokenrealms = [] tokenresolvers = [] tokentype = None token_obj = None if serial: # We have determined the serial number from the request. token_obj_list = get_tokens(serial=serial) elif user: # We have to determine the token via the user object. But only if # the user has only one token token_obj_list = get_tokens(user=user) else: token_obj_list = [] if len(token_obj_list) == 1: # There is a token involved, so we determine its resolvers and realms token_obj = token_obj_list[0] tokenrealms = token_obj.get_realms() tokentype = token_obj.get_tokentype() all_realms = get_realms() for tokenrealm in tokenrealms: resolvers = all_realms.get(tokenrealm, {}).get("resolver", {}) tokenresolvers.extend([r.get("name") for r in resolvers]) tokenresolvers = list(set(tokenresolvers)) # Get container container_serial = self._get_container_serial(request, content) container = None if container_serial: container = find_container_by_serial(container_serial) elif serial and token_obj: container = find_container_for_token(serial) if CONDITION.CLIENT_IP in conditions: if g and g.client_ip: ip_policy = [ip.strip() for ip in conditions.get(CONDITION.CLIENT_IP).split(",")] found, excluded = check_ip_in_policy(g.client_ip, ip_policy) if not found or excluded: return False if CONDITION.REALM in conditions: if user.realm not in conditions.get(CONDITION.REALM).split(","): return False if CONDITION.RESOLVER in conditions: if user.resolver not in conditions.get(CONDITION.RESOLVER).split(","): return False if "logged_in_user" in conditions: # Determine the role of the user try: logged_in_user = g.logged_in_user user_role = logged_in_user.get("role") except Exception: # A non-logged-in-user is a User, not an admin user_role = ROLE.USER if user_role != conditions.get("logged_in_user"): return False if CONDITION.RESULT_VALUE in conditions: condition_value = conditions.get(CONDITION.RESULT_VALUE) result_value = content.get("result", {}).get("value") if is_true(condition_value) != is_true(result_value): return False if CONDITION.RESULT_STATUS in conditions: condition_value = conditions.get(CONDITION.RESULT_STATUS) result_status = content.get("result", {}).get("status") if is_true(condition_value) != is_true(result_status): return False if CONDITION.RESULT_AUTHENTICATION in conditions: condition_value = conditions.get(CONDITION.RESULT_AUTHENTICATION) result_auth = content.get("result", {}).get("authentication") if condition_value != result_auth: return False # checking of max-failcounter state of the token if "token_locked" in conditions: if token_obj: locked = token_obj.get_failcount() >= \ token_obj.get_max_failcount() if (conditions.get("token_locked") in ["True", True]) != \ locked: return False else: # check all tokens of the user, if any token is maxfail token_objects = get_tokens(user=user, maxfail=True) if not ','.join([tok.get_serial() for tok in token_objects]): return False if CONDITION.TOKENREALM in conditions and tokenrealms: res = False for trealm in tokenrealms: if trealm in conditions.get(CONDITION.TOKENREALM).split(","): res = True break if not res: return False if CONDITION.TOKENRESOLVER in conditions and tokenresolvers: res = False for tres in tokenresolvers: if tres in conditions.get(CONDITION.TOKENRESOLVER).split(","): res = True break if not res: return False if "serial" in conditions and serial: serial_match = conditions.get("serial") if not bool(re.match(serial_match, serial)): return False if CONDITION.USER_TOKEN_NUMBER in conditions and user: num_tokens = get_tokens(user=user, count=True) if num_tokens != int(conditions.get(CONDITION.USER_TOKEN_NUMBER)): return False if CONDITION.USER_CONTAINER_NUMBER in conditions and user: container_list = get_all_containers(user=user, page=1, pagesize=1) num_containers = container_list['count'] if num_containers != int(conditions.get(CONDITION.USER_CONTAINER_NUMBER)): return False if CONDITION.DETAIL_ERROR_MESSAGE in conditions: message = content.get("detail", {}).get("error", {}).get("message", "") search_exp = conditions.get(CONDITION.DETAIL_ERROR_MESSAGE) m = re.search(search_exp, message) if not bool(m): return False if CONDITION.DETAIL_MESSAGE in conditions: message = content.get("detail", {}).get("message", "") search_exp = conditions.get(CONDITION.DETAIL_MESSAGE) m = re.search(search_exp, message) if not bool(m): return False if CONDITION.COUNTER in conditions: # Can be counter==1000 if not compare_generic_condition(conditions.get(CONDITION.COUNTER), lambda x: counter_read(x) or 0, "Misconfiguration in your counter " "condition: {0!s}" ): return False # Token specific conditions if token_obj: if CONDITION.TOKENTYPE in conditions: if tokentype not in conditions.get(CONDITION.TOKENTYPE).split( ","): return False if CONDITION.TOKEN_HAS_OWNER in conditions: uid = token_obj.get_user_id() check = conditions.get(CONDITION.TOKEN_HAS_OWNER) if uid and check in ["True", True]: res = True elif not uid and check in ["False", False]: res = True else: log.debug("Condition token_has_owner for token {0!r} " "not fulfilled.".format(token_obj)) return False if CONDITION.TOKEN_IS_ORPHANED in conditions: orphaned = token_obj.is_orphaned() check = conditions.get(CONDITION.TOKEN_IS_ORPHANED) if orphaned and check in ["True", True]: res = True elif not orphaned and check in ["False", False]: res = True else: log.debug("Condition token_is_orphaned for token {0!r} not " "fulfilled.".format(token_obj)) return False if CONDITION.TOKEN_VALIDITY_PERIOD in conditions: valid = token_obj.check_validity_period() if (conditions.get(CONDITION.TOKEN_VALIDITY_PERIOD) in ["True", True]) != valid: return False if CONDITION.OTP_COUNTER in conditions: cond = conditions.get(CONDITION.OTP_COUNTER) if not compare_condition(cond, token_obj.token.count): return False if CONDITION.LAST_AUTH in conditions: if token_obj.check_last_auth_newer(conditions.get(CONDITION.LAST_AUTH)): return False if CONDITION.COUNT_AUTH in conditions: count = token_obj.get_count_auth() cond = conditions.get(CONDITION.COUNT_AUTH) if not compare_condition(cond, count): return False if CONDITION.COUNT_AUTH_SUCCESS in conditions: count = token_obj.get_count_auth_success() cond = conditions.get(CONDITION.COUNT_AUTH_SUCCESS) if not compare_condition(cond, count): return False if CONDITION.COUNT_AUTH_FAIL in conditions: count = token_obj.get_count_auth() c_success = token_obj.get_count_auth_success() c_fail = count - c_success cond = conditions.get(CONDITION.COUNT_AUTH_FAIL) if not compare_condition(cond, c_fail): return False if CONDITION.FAILCOUNTER in conditions: failcount = token_obj.get_failcount() cond = conditions.get(CONDITION.FAILCOUNTER) if not compare_condition(cond, failcount): return False if CONDITION.TOKENINFO in conditions: cond = conditions.get(CONDITION.TOKENINFO) # replace {now} in condition cond, td = parse_time_offset_from_now(cond) s_now = (datetime.datetime.now(tzlocal()) + td).strftime( DATE_FORMAT) cond = cond.format(now=s_now) if not compare_generic_condition(cond, token_obj.get_tokeninfo, "Misconfiguration in your tokeninfo " "condition: {0!s}"): return False if CONDITION.ROLLOUT_STATE in conditions: cond = conditions.get(CONDITION.ROLLOUT_STATE) if not cond == token_obj.token.rollout_state: return False # We also put the challenge condition here. If we do not have a # token-obj we can not identify challenges. if CONDITION.CHALLENGE_SESSION or CONDITION.CHALLENGE_EXPIRED in conditions: chals = get_challenges(serial=token_obj.token.serial, transaction_id=transaction_id) if len(chals) == 1: chal = chals[0] if CONDITION.CHALLENGE_SESSION in conditions: cond_match = conditions.get(CONDITION.CHALLENGE_SESSION) if not bool(re.match(cond_match, chal.session)): return False if CONDITION.CHALLENGE_EXPIRED in conditions: condition_value = conditions.get(CONDITION.CHALLENGE_EXPIRED) if is_true(condition_value) == chal.is_valid(): return False elif len(chals) > 1: # If there is more than one challenge, the conditions seams awkward log.warning("There are more than one challenge for token {0!s} " "and transaction_id {1!s}".format(token_obj.token.serial, transaction_id)) return False if CONDITION.TOKEN_IS_IN_CONTAINER in conditions: cond = conditions.get(CONDITION.TOKEN_IS_IN_CONTAINER) container = find_container_for_token(serial) token_is_in_container = container is not None if token_is_in_container and cond in ["True", True]: res = True elif not token_is_in_container and cond in ["False", False]: res = True else: log.debug(f"Condition {CONDITION.TOKEN_IS_IN_CONTAINER} for token {token_obj} " "not fulfilled.") return False # Container specific conditions if container: if CONDITION.CONTAINER_STATE in conditions: cond = conditions.get(CONDITION.CONTAINER_STATE).split(',') container_states = container.get_states() for cond_state in cond: if cond_state not in container_states: log.debug(f"Condition container_state {cond_state} for container {container.serial} " "not fulfilled.") return False if CONDITION.CONTAINER_EXACT_STATE in conditions: cond = conditions.get(CONDITION.CONTAINER_EXACT_STATE).split(',') container_states = container.get_states() if len(cond) != len(container_states): log.debug(f"Condition container_single_state {cond} for container {container.serial} " "not fulfilled.") return False for cond_state in cond: if cond_state not in container_states: log.debug(f"Condition container_state {cond_state} for container {container.serial} " "not fulfilled.") return False if CONDITION.CONTAINER_HAS_OWNER in conditions: has_container_owner = len(container.get_users()) > 0 cond = conditions.get(CONDITION.CONTAINER_HAS_OWNER) if has_container_owner and cond in ["True", True]: res = True elif not has_container_owner and cond in ["False", False]: res = True else: log.debug(f"Condition container_has_owner for container {container.serial} " "not fulfilled.") return False if CONDITION.CONTAINER_TYPE in conditions: cond = conditions.get(CONDITION.CONTAINER_TYPE) if container.type != cond: log.debug(f"Condition container_type {cond} for container {container.serial} " "not fulfilled.") return False if CONDITION.CONTAINER_HAS_TOKEN in conditions: tokens = [token.get_serial() for token in container.get_tokens()] cond = conditions.get(CONDITION.CONTAINER_HAS_TOKEN) if len(tokens) > 0 and cond in ["True", True]: res = True elif len(tokens) == 0 and cond in ["False", False]: res = True else: log.debug(f"Condition container_has_token for container {container.serial} " "not fulfilled.") return False return True
[docs] def do(self, action, options=None): """ This method executes the defined action in the given event. :param action: :param options: Contains the flask parameters g and request and the handler_def configuration :type options: dict :return: """ log.info("In fact we are doing nothing, be we presume we are doing" "{0!s}".format(action)) return True