# -*- coding: utf-8 -*-
#
# 2016-05-04 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Initial writup
#
# License: AGPLv3
# (c) 2016. Cornelius Kölbel
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
__doc__ = """This is the base class for an event handler module.
The event handler module is bound to an event together with
* a condition and
* an action
* optional options ;-)
"""
from privacyidea.lib import _
from privacyidea.lib.config import get_token_types
from privacyidea.lib.realm import get_realms
from privacyidea.lib.auth import ROLE
from privacyidea.lib.policy import ACTION
from privacyidea.lib.token import get_token_owner, get_tokens
from privacyidea.lib.user import User, UserError
from privacyidea.lib.utils import compare_condition
import re
import json
import logging
log = logging.getLogger(__name__)
class CONDITION(object):
"""
Possible conditions
"""
TOKEN_HAS_OWNER = "token_has_owner"
TOKEN_IS_ORPHANED = "token_is_orphaned"
TOKEN_VALIDITY_PERIOD = "token_validity_period"
USER_TOKEN_NUMBER = "user_token_number"
OTP_COUNTER = "otp_counter"
TOKENTYPE = "tokentype"
LAST_AUTH = "last_auth"
COUNT_AUTH = "count_auth"
COUNT_AUTH_SUCCESS = "count_auth_success"
COUNT_AUTH_FAIL = "count_auth_fail"
[docs]class BaseEventHandler(object):
"""
An Eventhandler needs to return a list of actions, which it can handle.
It also returns a list of allowed action and conditions
It returns an identifier, which can be used in the eventhandlig definitions
"""
identifier = "BaseEventHandler"
description = "This is the base class of an EventHandler with no " \
"functionality"
def __init__(self):
pass
@property
def actions(cls):
"""
This method returns a list of available actions, that are provided
by this event handler.
:return: dictionary of actions.
"""
actions = ["sample_action_1", "sample_action_2"]
return actions
@property
def conditions(cls):
"""
The UserNotification can filter for conditions like
* type of logged in user and
* successful or failed value.success
allowed types are str, multi, text, regexp
:return: dict
"""
realms = get_realms()
cond = {
"realm": {
"type": "str",
"desc": _("The user realm, for which this event should apply."),
"value": realms.keys()
},
"tokenrealm": {
"type": "multi",
"desc": _("The token realm, for which this event should "
"apply."),
"value": [{"name": r} for r in realms.keys()]
},
CONDITION.TOKENTYPE: {
"type": "multi",
"desc": _("The type of the token."),
"value": [{"name": r} for r in get_token_types()]
},
"logged_in_user": {
"type": "str",
"desc": _("The logged in user is of the following type."),
"value": (ROLE.ADMIN, ROLE.USER)
},
"result_value": {
"type": "str",
"desc": _("The result.value within the response is "
"True or False."),
"value": ("True", "False")
},
"token_locked": {
"type": "str",
"desc": _("Check if the max failcounter of the token is "
"reached."),
"value": ("True", "False")
},
CONDITION.TOKEN_HAS_OWNER: {
"type": "str",
"desc": _("The token has a user assigned."),
"value": ("True", "False")
},
CONDITION.TOKEN_IS_ORPHANED: {
"type": "str",
"desc": _("The token has a user assigned, but the user does "
"not exist in the userstore anymore."),
"value": ("True", "False")
},
CONDITION.TOKEN_VALIDITY_PERIOD: {
"type": "str",
"desc": _("Check if the token is within its validity period."),
"value": ("True", "False")
},
"serial": {
"type": "regexp",
"desc": _("Action is triggered, if the serial matches this "
"regular expression.")
},
CONDITION.USER_TOKEN_NUMBER: {
"type": "str",
"desc": _("Action is triggered, if the user has this number "
"of tokens assigned.")
},
CONDITION.OTP_COUNTER: {
"type": "str",
"desc": _("Action is triggered, if the counter of the token "
"equals this setting.")
},
CONDITION.LAST_AUTH: {
"type": "str",
"desc": _("Action is triggered, if the last authentication of "
"the token is older than 7h, 10d or 1y.")
},
CONDITION.COUNT_AUTH: {
"type": "str",
"desc": _("This can be '>100', '<99', or '=100', to trigger "
"the action, if the tokeninfo field 'count_auth' is "
"bigger than 100, less than 99 or exactly 100.")
},
CONDITION.COUNT_AUTH_SUCCESS: {
"type": "str",
"desc": _("This can be '>100', '<99', or '=100', to trigger "
"the action, if the tokeninfo field "
"'count_auth_success' is "
"bigger than 100, less than 99 or exactly 100.")
},
CONDITION.COUNT_AUTH_FAIL: {
"type": "str",
"desc": _("This can be '>100', '<99', or '=100', to trigger "
"the action, if the difference between the tokeninfo "
"field 'count_auth' and 'count_auth_success is "
"bigger than 100, less than 99 or exactly 100.")
}
}
return cond
@property
def events(cls):
"""
This method returns a list allowed events, that this event handler
can be bound to and which it can handle with the corresponding actions.
An eventhandler may return an asterisk ["*"] indicating, that it can
be used in all events.
:return: list of events
"""
events = ["*"]
return events
@staticmethod
def _get_tokenowner(request):
user = request.User
serial = request.all_data.get("serial")
if user.is_empty() and serial:
# maybe the user is empty, but a serial was passed.
# Then we determine the user by the serial
try:
user = get_token_owner(serial) or User()
except Exception as exx:
user = User()
# This can happen for orphaned tokens.
log.info("Could not determine tokenowner for {0!s}. Maybe the "
"user does not exist anymore.".format(serial))
log.debug(exx)
# We now check, if the user exists at all!
try:
ui = user.info
except UserError as exx:
if exx.id == 905:
user = User()
else:
raise exx
return user
[docs] def check_condition(self, options):
"""
Check if all conditions are met and if the action should be executed.
The the conditions are met, we return "True"
:return: True
"""
res = True
g = options.get("g")
request = options.get("request")
response = options.get("response")
e_handler_def = options.get("handler_def")
if not response or not e_handler_def:
# options is missing a response and the handler definition
# We are probably in test mode.
return True
# conditions can be correspnding to the property conditions
conditions = e_handler_def.get("conditions")
content = json.loads(response.data)
user = self._get_tokenowner(request)
serial = request.all_data.get("serial") or \
content.get("detail", {}).get("serial")
tokenrealms = []
tokentype = None
token_obj = None
if serial:
# We have determined the serial number from the request.
token_obj_list = get_tokens(serial=serial)
else:
# We have to determine the token via the user object. But only if
# the user has only one token
token_obj_list = get_tokens(user=user)
if len(token_obj_list) == 1:
token_obj = token_obj_list[0]
tokenrealms = token_obj.get_realms()
tokentype = token_obj.get_tokentype()
if "realm" in conditions:
res = user.realm == conditions.get("realm")
if "logged_in_user" in conditions and res:
# Determine the role of the user
try:
logged_in_user = g.logged_in_user
user_role = logged_in_user.get("role")
except Exception:
# A non-logged-in-user is a User, not an admin
user_role = ROLE.USER
res = user_role == conditions.get("logged_in_user")
# Check result_value only if the check condition is still True
if "result_value" in conditions and res:
condition_value = conditions.get("result_value")
result_value = content.get("result", {}).get("value")
res = condition_value == str(result_value)
# checking of max-failcounter state of the token
if "token_locked" in conditions and res:
if token_obj:
locked = token_obj.get_failcount() >= \
token_obj.get_max_failcount()
res = (conditions.get("token_locked") in ["True", True]) == \
locked
else:
# check all tokens of the user, if any token is maxfail
token_objects = get_tokens(user=user, maxfail=True)
if not ','.join([tok.get_serial() for tok in token_objects]):
res = False
if "tokenrealm" in conditions and res and tokenrealms:
res = False
for trealm in tokenrealms:
if trealm in conditions.get("tokenrealm").split(","):
res = True
break
if "serial" in conditions and res and serial:
serial_match = conditions.get("serial")
res = bool(re.match(serial_match, serial))
if CONDITION.USER_TOKEN_NUMBER in conditions and res and user:
num_tokens = get_tokens(user=user, count=True)
res = num_tokens == int(conditions.get(CONDITION.USER_TOKEN_NUMBER))
# Token specific conditions
if token_obj:
if CONDITION.TOKENTYPE in conditions and res:
res = False
if tokentype in conditions.get(CONDITION.TOKENTYPE).split(","):
res = True
if CONDITION.TOKEN_HAS_OWNER in conditions and res:
uid = token_obj.get_user_id()
check = conditions.get(CONDITION.TOKEN_HAS_OWNER)
if uid and check in ["True", True]:
res = True
elif not uid and check in ["False", False]:
res = True
else:
log.debug("Condition token_has_owner for token {0!r} "
"not fulfilled.".format(token_obj))
res = False
if CONDITION.TOKEN_IS_ORPHANED in conditions and res:
uid = token_obj.get_user_id()
orphaned = uid and not user
check = conditions.get(CONDITION.TOKEN_IS_ORPHANED)
if orphaned and check in ["True", True]:
res = True
elif not orphaned and check in ["False", False]:
res = True
else:
log.debug("Condition token_is_orphaned for token {0!r} not "
"fulfilled.".format(token_obj))
res = False
if CONDITION.TOKEN_VALIDITY_PERIOD in conditions and res:
valid = token_obj.check_validity_period()
res = (conditions.get(CONDITION.TOKEN_VALIDITY_PERIOD)
in ["True", True]) == valid
if CONDITION.OTP_COUNTER in conditions and res:
res = token_obj.token.count == \
int(conditions.get(CONDITION.OTP_COUNTER))
if CONDITION.LAST_AUTH in conditions and res:
res = not token_obj.check_last_auth_newer(conditions.get(
CONDITION.LAST_AUTH))
if CONDITION.COUNT_AUTH in conditions and res:
count = token_obj.get_count_auth()
cond = conditions.get(CONDITION.COUNT_AUTH)
res = compare_condition(cond, count)
if CONDITION.COUNT_AUTH_SUCCESS in conditions and res:
count = token_obj.get_count_auth_success()
cond = conditions.get(CONDITION.COUNT_AUTH_SUCCESS)
res = compare_condition(cond, count)
if CONDITION.COUNT_AUTH_FAIL in conditions and res:
count = token_obj.get_count_auth()
c_success = token_obj.get_count_auth_success()
c_fail = count - c_success
cond = conditions.get(CONDITION.COUNT_AUTH_FAIL)
res = compare_condition(cond, c_fail)
return res
[docs] def do(self, action, options=None):
"""
This method executes the defined action in the given event.
:param action:
:param options: Contains the flask parameters g and request and the
handler_def configuration
:type options: dict
:return:
"""
log.info("In fact we are doing nothing, be we presume we are doing"
"{0!s}".format(action))
return True