# -*- coding: utf-8 -*-
#
# 2015-03-15 Cornelius Kölbel <cornelius@privacyidea.org>
# Add decorator for losttoken
# 2015-02-06 Cornelius Kölbel <cornelius@privacyidea.org>
# Rewrite for flask migration.
# Policies handled by decorators as
# 1. precondition for API calls
# 2. internal modifications of LIB-functions
# 3. postcondition for API calls
#
# Jul 07, 2014 add check_machine_policy, Cornelius Kölbel
# May 08, 2014 Cornelius Kölbel
#
# License: AGPLv3
# contact: http://www.privacyidea.org
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNE7SS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
These are the policy decorator functions for internal (lib) policy decorators.
policy decorators for the API (pre/post) are defined in api/lib/policy
The functions of this module are tested in tests/test_lib_policy_decorator.py
"""
import logging
from privacyidea.lib.error import PolicyError
import functools
from privacyidea.lib.policy import ACTION, SCOPE, ACTIONVALUE, LOGINMODE
from privacyidea.lib.user import User
log = logging.getLogger(__name__)
[docs]class libpolicy(object):
"""
This is the decorator wrapper to call a specific function before a
library call in contrast to prepolicy and postpolicy, which are to be
called in API Calls.
The decorator expects a named parameter "options". In this options dict
it will look for the flask global "g".
"""
def __init__(self, decorator_function):
"""
:param decorator_function: This is the policy function the is to be
called
:type decorator_function: function
"""
self.decorator_function = decorator_function
def __call__(self, wrapped_function):
"""
This decorates the given function.
If some error occur the a PolicyException is raised.
The decorator function takes the options parameter and can modify
the behaviour of the original function.
:param wrapped_function: The function, that is decorated.
:type wrapped_function: API function
:return: None
"""
@functools.wraps(wrapped_function)
def policy_wrapper(*args, **kwds):
return self.decorator_function(wrapped_function, *args, **kwds)
return policy_wrapper
[docs]def auth_user_has_no_token(wrapped_function, user_object, passw,
options=None):
"""
This decorator checks if the user has a token at all.
If the user has a token, the wrapped function is called.
The wrapped function is usually token.check_user_pass, which takes the
arguments (user, passw, options={})
:param wrapped_function:
:param user_object:
:param passw:
:param options: Dict containing values for "g" and "clientip"
:return: Tuple of True/False and reply-dictionary
"""
from privacyidea.lib.token import get_tokens
options = options or {}
g = options.get("g")
if g:
clientip = options.get("clientip")
policy_object = g.policy_object
pass_no_token = policy_object.get_policies(action=ACTION.PASSNOTOKEN,
scope=SCOPE.AUTH,
realm=user_object.realm,
user=user_object.login,
client=clientip)
if len(pass_no_token) > 0:
# Now we need to check, if the user really has no token.
tokencount = get_tokens(user=user_object, count=True)
if tokencount == 0:
return True, {"message": "The user has not token, but is "
"accepted due to policy '%s'." %
pass_no_token[0].get("name")}
# If nothing else returned, we return the wrapped function
return wrapped_function(user_object, passw, options)
[docs]def auth_user_does_not_exist(wrapped_function, user_object, passw,
options=None):
"""
This decorator checks, if the user does exist at all.
If the user does exist, the wrapped function is called.
The wrapped function is usually token.check_user_pass, which takes the
arguments (user, passw, options={})
:param wrapped_function:
:param user_object:
:param passw:
:param options: Dict containing values for "g" and "clientip"
:return: Tuple of True/False and reply-dictionary
"""
options = options or {}
g = options.get("g")
if g:
clientip = options.get("clientip")
policy_object = g.policy_object
pass_no_user = policy_object.get_policies(action=ACTION.PASSNOUSER,
scope=SCOPE.AUTH,
realm=user_object.realm,
user=user_object.login,
client=clientip)
if len(pass_no_user) > 0:
return True, {"message": "The user does not exist, but is "
"accepted due to policy '%s'." %
pass_no_user[0].get("name")}
# If nothing else returned, we return the wrapped function
return wrapped_function(user_object, passw, options)
[docs]def auth_user_passthru(wrapped_function, user_object, passw, options=None):
"""
This decorator checks the policy settings of ACTION.PASSTHRU.
If the authentication against the userstore is not successful,
the wrapped function is called.
The wrapped function is usually token.check_user_pass, which takes the
arguments (user, passw, options={})
:param wrapped_function:
:param user_object:
:param passw:
:param options: Dict containing values for "g" and "clientip"
:return: Tuple of True/False and reply-dictionary
"""
from privacyidea.lib.token import get_tokens
options = options or {}
g = options.get("g")
if g:
clientip = options.get("clientip")
policy_object = g.policy_object
pass_thru = policy_object.get_policies(action=ACTION.PASSTHRU,
scope=SCOPE.AUTH,
realm=user_object.realm,
user=user_object.login,
client=clientip)
if len(pass_thru) > 0:
# If the user has NO Token, authenticate against the user store
if get_tokens(user=user_object, count=True) == 0:
# Now we need to check the userstore password
if user_object.check_password(passw):
return True, {"message": "The user authenticated against his "
"userstore according to "
"policy '%s'." %
pass_thru[0].get("name")}
# If nothing else returned, we return the wrapped function
return wrapped_function(user_object, passw, options)
[docs]def login_mode(wrapped_function, *args, **kwds):
"""
Decorator to decorate the lib.auth.check_webui_user function.
Depending on ACTION.LOGINMODE it sets the check_otp parameter, to signal
that the authentication should be performed against privacyIDEA.
:param wrapped_function: Usually the function check_webui_user
:param args: arguments user_obj and password
:param kwds: keyword arguments like options and !check_otp!
kwds["options"] contains the flask g
:return: calls the original function with the modified "check_otp" argument
"""
ERROR = "There are contradicting policies for the action %s!" % \
ACTION.LOGINMODE
# if tokenclass.check_pin is called in any other way, options may be None
# or it might have no element "g".
options = kwds.get("options") or {}
g = options.get("g")
if g:
# We need the user but we do not need the password
user_object = args[0]
clientip = options.get("clientip")
# get the policy
policy_object = g.policy_object
login_mode_list = policy_object.get_action_values(ACTION.LOGINMODE,
scope=SCOPE.WEBUI,
realm=user_object.realm,
user=user_object.login,
client=clientip)
if len(login_mode_list) > 0:
# There is a login mode policy
# reduce the list:
login_mode_list = list(set(login_mode_list))
if len(login_mode_list) > 1: # pragma: no cover
# We can not decide how to handle the request, so we raise an
# exception
raise PolicyError(ERROR)
if login_mode_list[0] == LOGINMODE.PRIVACYIDEA:
# The original function should check against privacyidea!
kwds["check_otp"] = True
return wrapped_function(*args, **kwds)
[docs]def auth_otppin(wrapped_function, *args, **kwds):
"""
Decorator to decorate the tokenclass.check_pin function.
Depending on the ACTION.OTPPIN it
* either simply accepts an empty pin
* checks the pin against the userstore
* or passes the request to the wrapped_function
:param wrapped_function: In this case the wrapped function should be
tokenclass.check_ping
:param *args: args[1] is the pin
:param **kwds: kwds["options"] contains the flask g
:return: True or False
"""
ERROR = "There are contradicting policies for the action %s!" % \
ACTION.OTPPIN
# if tokenclass.check_pin is called in any other way, options may be None
# or it might have no element "g".
options = kwds.get("options") or {}
g = options.get("g")
if g:
token = args[0]
pin = args[1]
clientip = options.get("clientip")
user_object = kwds.get("user")
if not user_object:
# No user in the parameters, so we need to determine the owner of
# the token
user_object = token.get_user()
realms = token.get_realms()
if not user_object and len(realms):
# if the token has not owner, we take a realm.
user_object = User("", realm=realms[0])
if not user_object:
# If we still have no user and no tokenrealm, we create an empty
# user object.
user_object=User("", realm="")
# get the policy
policy_object = g.policy_object
otppin_list = policy_object.get_action_values(ACTION.OTPPIN,
scope=SCOPE.AUTH,
realm=user_object.realm,
user=user_object.login,
client=clientip)
if len(otppin_list) > 0:
# There is an otppin policy
# reduce the list:
otppin_list = list(set(otppin_list))
if len(otppin_list) > 1:
# We can not decide how to handle the request, so we raise an
# exception
raise PolicyError(ERROR)
if otppin_list[0] == ACTIONVALUE.NONE:
if pin == "":
# No PIN checking, we expect an empty PIN!
return True
else:
return False
if otppin_list[0] == ACTIONVALUE.USERSTORE:
rv = user_object.check_password(pin)
return rv is not None
# call and return the original check_pin function
return wrapped_function(*args, **kwds)
[docs]def config_lost_token(wrapped_function, *args, **kwds):
"""
Decorator to decorate the lib.token.lost_token function.
Depending on ACTION.LOSTTOKENVALID, ACTION.LOSTTOKENPWCONTENTS,
ACTION.LOSTTOKENPWLEN it sets the check_otp parameter, to signal
how the lostToken should be generated.
:param wrapped_function: Usually the function lost_token()
:param args: argument "serial" as the old serial number
:param kwds: keyword arguments like "validity", "contents", "pw_len"
kwds["options"] contains the flask g
:return: calls the original function with the modified "validity",
"contents" and "pw_len" argument
"""
# if called in any other way, options may be None
# or it might have no element "g".
from privacyidea.lib.token import get_tokens
options = kwds.get("options") or {}
g = options.get("g")
if g:
# We need the old serial number, to determine the user - if it exist.
serial = args[0]
toks = get_tokens(serial=serial)
if len(toks) == 1:
username = None
realm = None
user_object = toks[0].get_user()
if user_object:
username = user_object.login
realm = user_object.realm
clientip = options.get("clientip")
# get the policy
policy_object = g.policy_object
contents_list = policy_object.get_action_values(
ACTION.LOSTTOKENPWCONTENTS,
scope=SCOPE.ENROLL,
realm=realm,
user=username,
client=clientip)
validity_list = policy_object.get_action_values(
ACTION.LOSTTOKENVALID,
scope=SCOPE.ENROLL,
realm=realm,
user=username,
client=clientip)
pw_len_list = policy_object.get_action_values(
ACTION.LOSTTOKENPWLEN,
scope=SCOPE.ENROLL,
realm=realm,
user=username,
client=clientip)
if len(contents_list) > 0:
contents_list = list(set(contents_list))
if len(contents_list) > 1: # pragma: no cover
# We can not decide how to handle the request, so we raise an
# exception
raise PolicyError("There are contradicting policies for the "
"action %s" % ACTION.LOSTTOKENPWCONTENTS)
kwds["contents"] = contents_list[0]
if len(validity_list) > 0:
validity_list = list(set(validity_list))
if len(validity_list) > 1: # pragma: no cover
# We can not decide how to handle the request, so we raise an
# exception
raise PolicyError("There are contradicting policies for the "
"action %s" % ACTION.LOSTTOKENVALID)
kwds["validity"] = int(validity_list[0])
if len(pw_len_list) > 0:
pw_len_list = list(set(pw_len_list))
if len(pw_len_list) > 1: # pragma: no cover
# We can not decide how to handle the request, so we raise an
# exception
raise PolicyError("There are contradicting policies for the "
"action %s" % ACTION.LOSTTOKENPWLEN)
kwds["pw_len"] = int(pw_len_list[0])
return wrapped_function(*args, **kwds)