Source code for privacyidea.api.lib.prepolicy

# -*- coding: utf-8 -*-
#
#  2015-11-04 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add check for REMOTE_USER
#  2015-04-13 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add hook for external decorator for init and assign
#  2015-02-06 Cornelius Kölbel <cornelius@privacyidea.org>
#             Create this module for enabling decorators for API calls
#
#  License:  AGPLv3
#  contact:  http://www.privacyidea.org
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
These are the policy decorators as PRE conditions for the API calls.
I.e. these conditions are executed before the wrapped API call.
This module uses the policy base functions from
privacyidea.lib.policy but also components from flask like g.

Wrapping the functions in a decorator class enables easy modular testing.

The functions of this module are tested in tests/test_api_lib_policy.py
"""
import logging
log = logging.getLogger(__name__)
from privacyidea.lib.error import PolicyError
from flask import g, current_app
from privacyidea.lib.policy import SCOPE, ACTION, PolicyClass
from privacyidea.lib.user import (get_user_from_param, get_default_realm,
                                  split_user)
from privacyidea.lib.token import (get_tokens, get_realms_of_token)
from privacyidea.lib.utils import generate_password
from privacyidea.lib.auth import ROLE
import functools
import jwt
import re

# Module-level flags. They are not referenced by any code visible in this
# module.
# NOTE(review): presumably imported elsewhere to mark parameters as
# optional/required - confirm before changing or removing them.
optional = True
required = False


class prepolicy(object):
    """
    Decorator class that runs a policy function *before* an API call.

    An instance is created with the policy function, the request object and
    an optional action. Applying the instance to an API function returns a
    wrapper which first calls the policy function and afterwards the API
    function. The policy function may modify the request data or raise an
    exception, in which case the API function is never reached.
    """

    def __init__(self, function, request, action=None):
        """
        :param function: The policy function that is to be called before
            the decorated API function
        :type function: function
        :param request: The original request object, which is handed to the
            policy function
        :type request: Request Object
        :param action: Optional action that is handed to the policy function
        """
        self.function = function
        self.request = request
        self.action = action

    def __call__(self, wrapped_function):
        """
        Decorate the given API function.

        :param wrapped_function: The function that is decorated
        :type wrapped_function: API function
        :return: A wrapper that calls the policy function and then the
            decorated function, returning the result of the latter
        """
        @functools.wraps(wrapped_function)
        def run_policy_then_call(*args, **kwds):
            # The attributes are read at call time, not at decoration time.
            # Any exception raised by the policy function propagates and
            # prevents the API function from being executed.
            self.function(request=self.request, action=self.action)
            result = wrapped_function(*args, **kwds)
            return result

        return run_policy_then_call
def init_random_pin(request=None, action=None):
    """
    Pre policy for the API init function: set a random OTP PIN.

    If exactly one value for the policy SCOPE.ENROLL, ACTION.OTPPINRANDOM
    is found, a random PIN of that length is written to
    ``request.all_data["pin"]``. The new PIN is then passed to every PIN
    handler class configured via ACTION.PINHANDLING.

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: Not used by this function
    :return: Always True
    """
    user = get_user_from_param(request.all_data)
    policies = g.policy_object

    # The value of the policy is the length of the random PIN.
    pin_lengths = policies.get_action_values(action=ACTION.OTPPINRANDOM,
                                             scope=SCOPE.ENROLL,
                                             user=user.login,
                                             realm=user.realm,
                                             client=request.remote_addr,
                                             unique=True)
    if len(pin_lengths) != 1:
        # No random PIN policy: leave the request untouched.
        return True

    log.debug("Creating random OTP PIN with length %s" % pin_lengths[0])
    request.all_data["pin"] = generate_password(size=int(pin_lengths[0]))

    # There can be more than one PIN handler policy, so the PIN may be
    # processed in several ways.
    handler_names = policies.get_action_values(action=ACTION.PINHANDLING,
                                               scope=SCOPE.ENROLL,
                                               user=user.login,
                                               realm=user.realm,
                                               client=request.remote_addr)
    for handler_name in handler_names:
        log.debug("Handle the random PIN with the class %s" % handler_name)
        # The policy value is a dotted path "package.module.ClassName".
        package_name, _sep, class_name = handler_name.rpartition(".")
        module = __import__(package_name, globals(), locals(), [class_name])
        handler = getattr(module, class_name)()
        # Send the PIN
        handler.send(request.all_data["pin"],
                     request.all_data.get("serial", "N/A"),
                     user,
                     tokentype=request.all_data.get("type", "hotp"),
                     logged_in_user=g.logged_in_user)

    return True
def check_otp_pin(request=None, action=None):
    """
    Pre policy that checks whether the OTP PIN that is about to be set
    follows the OTP PIN policies ACTION.OTPPINMAXLEN, ACTION.OTPPINMINLEN
    and ACTION.OTPPINCONTENTS in the SCOPE.USER.

    The PIN is read from the request parameter ``otppin`` or, if that is
    empty, from ``pin``. The check is only performed for logged in users
    with the role "user" and is skipped for a token that does not use a PIN.

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: Not used by this function
    :return: True, if the PIN matches the requirements
    :raises PolicyError: if the PIN is too short, too long or lacks a
        required character class
    """
    # This policy is only used for USER roles at the moment:
    if g.logged_in_user.get("role") != "user":
        return True

    params = request.all_data
    # Fall back to an empty string, so that a missing or None PIN is
    # treated as a PIN of length 0 instead of failing in len().
    pin = params.get("otppin") or params.get("pin") or ""
    serial = params.get("serial")
    if serial:
        # If this is a token that does not use a PIN, we ignore this check
        # and immediately return True.
        tokensobject_list = get_tokens(serial=serial)
        if len(tokensobject_list) == 1:
            if tokensobject_list[0].using_pin is False:
                return True

    policy_object = g.policy_object
    user_object = get_user_from_param(params)
    # get the policies for minimum length, maximum length and PIN contents
    pol_minlen = policy_object.get_action_values(action=ACTION.OTPPINMINLEN,
                                                 scope=SCOPE.USER,
                                                 user=user_object.login,
                                                 realm=user_object.realm,
                                                 client=request.remote_addr,
                                                 unique=True)
    pol_maxlen = policy_object.get_action_values(action=ACTION.OTPPINMAXLEN,
                                                 scope=SCOPE.USER,
                                                 user=user_object.login,
                                                 realm=user_object.realm,
                                                 client=request.remote_addr,
                                                 unique=True)
    pol_contents = policy_object.get_action_values(
        action=ACTION.OTPPINCONTENTS,
        scope=SCOPE.USER,
        user=user_object.login,
        realm=user_object.realm,
        client=request.remote_addr,
        unique=True)

    if len(pol_minlen) == 1:
        # check the minimum length requirement
        if len(pin) < int(pol_minlen[0]):
            raise PolicyError("The minimum OTP PIN length is %s" %
                              pol_minlen[0])

    if len(pol_maxlen) == 1:
        # check the maximum length requirement
        if len(pin) > int(pol_maxlen[0]):
            # Report the maximum here. Using the minimum policy would show
            # the wrong number and fail if no minimum policy is defined.
            raise PolicyError("The maximum OTP PIN length is %s" %
                              pol_maxlen[0])

    if len(pol_contents) == 1:
        # check the contents requirement
        chars = "[a-zA-Z]"  # c
        digits = "[0-9]"  # n
        # The hyphen is placed last, so that it is a literal "-" and does
        # not form a range like ";-_", which would also match all
        # uppercase letters.
        special = r"[.:,;_<>+*!/()=?$§%&#~^-]"  # s
        contents = pol_contents[0]
        # A leading "-" (subtraction) or "+" (grouping) is a modifier of
        # the contents definition and not a character class itself.
        # TODO implement grouping and substraction
        if contents[:1] in ("-", "+"):
            contents = contents[1:]
        if "c" in contents and not re.search(chars, pin):
            raise PolicyError("Missing character in PIN: %s" % chars)
        if "n" in contents and not re.search(digits, pin):
            raise PolicyError("Missing character in PIN: %s" % digits)
        if "s" in contents and not re.search(special, pin):
            raise PolicyError("Missing character in PIN: %s" % special)

    return True
def encrypt_pin(request=None, action=None):
    """
    Pre policy for several API functions, e.g. token/assign, token/setpin
    and token/init.

    If an active policy SCOPE.ENROLL, ACTION.ENCRYPTPIN matches the user of
    the request, ``request.all_data["encryptpin"]`` is set to the string
    "True". Otherwise an ``encryptpin`` parameter passed in the request is
    removed.

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: Not used by this function
    :return: Always True
    """
    user = get_user_from_param(request.all_data)
    # Find the policies that demand an encrypted PIN for this user.
    matching_policies = g.policy_object.get_policies(
        action=ACTION.ENCRYPTPIN,
        scope=SCOPE.ENROLL,
        user=user.login,
        realm=user.realm,
        client=request.remote_addr,
        active=True)

    if len(matching_policies) > 0:
        request.all_data["encryptpin"] = "True"
    elif "encryptpin" in request.all_data:
        # Without a policy the caller must not decide about the encryption.
        del request.all_data["encryptpin"]

    return True
def init_tokenlabel(request=None, action=None):
    """
    Pre policy for the API init function: add the tokenlabel definition.

    If exactly one value for the policy SCOPE.ENROLL, ACTION.TOKENLABEL is
    found, it is written to the request parameters like this::

        params : { "tokenlabel": "<u>@<r>" }

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: Not used by this function
    :return: Always True
    """
    user = get_user_from_param(request.all_data)
    # get the label definition from the policies
    labels = g.policy_object.get_action_values(action=ACTION.TOKENLABEL,
                                               scope=SCOPE.ENROLL,
                                               user=user.login,
                                               realm=user.realm,
                                               client=request.remote_addr,
                                               unique=True)
    if len(labels) != 1:
        return True

    # The policy was set, so we need to set the tokenlabel in the request.
    request.all_data["tokenlabel"] = labels[0]
    return True
def check_max_token_user(request=None, action=None):
    """
    Pre Policy
    This checks the maximum token per user policy.
    Check ACTION.MAXTOKENUSER

    This decorator can wrap:
        /token/init  (with a realm and user)
        /token/assign

    The check is only performed if the request resolves to a user with a
    login name. If several policies match, the highest limit applies.

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: Not used by this function
    :return: True otherwise raises an Exception
    :raises PolicyError: if the user already has as many tokens as the
        highest matching limit allows
    """
    ERROR = "The number of tokens for this user is limited!"
    params = request.all_data
    user_object = get_user_from_param(params)
    if user_object.login:
        policy_object = g.policy_object
        limit_list = policy_object.get_action_values(
            ACTION.MAXTOKENUSER,
            scope=SCOPE.ENROLL,
            realm=user_object.realm,
            user=user_object.login,
            client=request.remote_addr)
        if len(limit_list) > 0:
            # we need to check how many tokens the user already has assigned!
            tokenobject_list = get_tokens(user=user_object)
            already_assigned_tokens = len(tokenobject_list)
            # Convert each limit to an integer *before* taking the maximum.
            # Taking the maximum of string values first would compare them
            # lexicographically, so that "9" would win over "10".
            highest_limit = max(int(limit) for limit in limit_list)
            if already_assigned_tokens >= highest_limit:
                raise PolicyError(ERROR)
    return True
def check_max_token_realm(request=None, action=None):
    """
    Pre Policy
    This checks the maximum token per realm.
    Check ACTION.MAXTOKENREALM

    This decorator can wrap:
        /token/init  (with a realm and user)
        /token/assign
        /token/tokenrealms

    The realm is taken from the user of the request or, if there is no user
    object, from the ``realm`` parameter. If several policies match, the
    highest limit applies.

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: An optional Action (not used by this function)
    :type action: basestring
    :return: True otherwise raises an Exception
    :raises PolicyError: if the realm already contains as many tokens as
        the highest matching limit allows
    """
    ERROR = "The number of tokens in this realm is limited!"
    params = request.all_data
    user_object = get_user_from_param(params)
    if user_object:
        realm = user_object.realm
    else:  # pragma: no cover
        realm = params.get("realm")

    if realm:
        policy_object = g.policy_object
        limit_list = policy_object.get_action_values(
            ACTION.MAXTOKENREALM,
            scope=SCOPE.ENROLL,
            realm=realm,
            client=request.remote_addr)
        if len(limit_list) > 0:
            # we need to check how many tokens are already in the realm!
            tokenobject_list = get_tokens(realm=realm)
            already_assigned_tokens = len(tokenobject_list)
            # Convert each limit to an integer *before* taking the maximum.
            # Taking the maximum of string values first would compare them
            # lexicographically, so that "9" would win over "10".
            highest_limit = max(int(limit) for limit in limit_list)
            if already_assigned_tokens >= highest_limit:
                raise PolicyError(ERROR)
    return True
def set_realm(request=None, action=None):
    """
    Pre Policy
    This pre condition takes the realm of the request and - if a policy
    matches - replaces it with the realm defined in the policy.

    Check ACTION.SETREALM

    This decorator should wrap
        /validate/check

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: An optional Action (not used by this function)
    :type action: basestring
    :returns: Always True. Modifies the parameter request
    :raises PolicyError: if the matching policies define different realms
    """
    user = get_user_from_param(request.all_data)
    # At the moment a realm parameter with no user parameter returns a user
    # object like "@realm". If this is changed one day, we need to also fetch
    # the realm from the request parameters.
    if user:
        current_realm = user.realm
    else:  # pragma: no cover
        current_realm = request.all_data.get("realm")

    configured = g.policy_object.get_action_values(
        ACTION.SETREALM,
        scope=SCOPE.AUTHZ,
        realm=current_realm,
        client=request.remote_addr)
    # Several policies may define the very same realm, so only the distinct
    # values matter.
    distinct_realms = list(set(configured))

    if len(distinct_realms) > 1:
        raise PolicyError("I do not know, to which realm I should set the "
                          "new realm. Conflicting policies exist.")
    if len(distinct_realms) == 1:
        # There is one specific realm, which we set in the request
        request.all_data["realm"] = distinct_realms[0]

    return True
def mangle(request=None, action=None):
    """
    Pre condition that rewrites request parameters of a validate/check
    request based on authentication policies with the action "mangle".
    See :ref:`policy_mangle` for an example.

    Check ACTION.MANGLE

    This decorator should wrap
        /validate/check

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: An optional Action (not used by this function)
    :type action: basestring
    :returns: Always True. Modifies the parameter request
    """
    user = get_user_from_param(request.all_data)
    policy_values = g.policy_object.get_action_values(
        ACTION.MANGLE,
        scope=SCOPE.AUTH,
        realm=user.realm,
        user=user.login,
        client=request.remote_addr)

    # There can be several mangle policies - one for user, one for realm
    # and one for pass - so no conflict check is done. Identical entries
    # are only applied once.
    for policy_value in list(set(policy_values)):
        # A policy value looks like this: keyword/search/replace/
        # where "keyword" can be "user", "pass" or "realm".
        key, pattern, replacement, _rest = policy_value.split("/", 3)
        current_value = request.all_data.get(key)
        if not current_value:
            # Nothing to rewrite for this keyword.
            continue
        log.debug("mangling authentication data: %s" % key)
        request.all_data[key] = re.sub(pattern, replacement, current_value)

    return True
def check_base_action(request=None, action=None):
    """
    Pre policy that verifies that the given action is allowed for the
    logged in user in the SCOPE ADMIN or - for the role "user" - in the
    SCOPE USER.

    The action is only rejected if there are active policies in the scope
    at all, but none of them matches the action.

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: The action that is to be checked
    :return: True otherwise raises an Exception
    :raises PolicyError: if policies are defined in the scope, but the
        action is not allowed
    """
    ERROR = {"user": "User actions are defined, but this action is not "
                     "allowed!",
             "admin": "Admin actions are defined, but this action is not "
                      "allowed!"}
    params = request.all_data
    policies = g.policy_object
    logged_in_user = g.logged_in_user
    username = logged_in_user.get("username")
    role = logged_in_user.get("role")

    if role == "user":
        # A user has no admin realm.
        scope, admin_realm = SCOPE.USER, None
    else:
        scope, admin_realm = SCOPE.ADMIN, logged_in_user.get("realm")

    # If no realm was passed, try to get the realm by the serial.
    realm = params.get("realm")
    serial = params.get("serial")
    if serial and not realm:
        token_realms = get_realms_of_token(serial)
        realm = token_realms[0] if len(token_realms) > 0 else None

    matching_policies = policies.get_policies(action=action,
                                              user=username,
                                              realm=realm,
                                              scope=scope,
                                              client=request.remote_addr,
                                              adminrealm=admin_realm,
                                              active=True)
    policies_in_scope = policies.get_policies(scope=scope, active=True)

    if len(policies_in_scope) > 0 and len(matching_policies) == 0:
        raise PolicyError(ERROR.get(role))
    return True
def check_token_upload(request=None, action=None):
    """
    Pre policy that verifies that the logged in administrator is allowed to
    import token files, i.e. checks ACTION.IMPORT in the SCOPE ADMIN.

    The upload is only rejected if there are active admin policies at all,
    but none of them allows the import.

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: Not used by this function
    :return: True otherwise raises an Exception
    :raises PolicyError: if admin policies are defined, but the import is
        not allowed
    """
    policies = g.policy_object
    logged_in_user = g.logged_in_user
    import_policies = policies.get_policies(
        action=ACTION.IMPORT,
        user=logged_in_user.get("username"),
        realm=request.all_data.get("realm"),
        scope=SCOPE.ADMIN,
        client=request.remote_addr,
        adminrealm=logged_in_user.get("realm"),
        active=True)
    admin_policies = policies.get_policies(scope=SCOPE.ADMIN, active=True)

    if len(admin_policies) > 0 and len(import_policies) == 0:
        raise PolicyError("Admin actions are defined, but you are not allowed"
                          " to upload token files.")
    return True
def check_token_init(request=None, action=None):
    """
    Pre policy that verifies that the requested tokentype is allowed to be
    enrolled in the SCOPE ADMIN or - for the role "user" - in the SCOPE USER.

    The checked action is "enroll" followed by the upper case tokentype,
    which is read from the request parameter ``type`` and defaults to HOTP.
    The enrollment is only rejected if there are active policies in the
    scope at all, but none of them matches.

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: Not used by this function
    :return: True or an Exception is raised
    :raises PolicyError: if policies are defined in the scope, but the
        token type may not be enrolled
    """
    ERROR = {"user": "User actions are defined, you are not allowed to "
                     "enroll this token type!",
             "admin": "Admin actions are defined, but you are not allowed to "
                      "enroll this token type!"}
    params = request.all_data
    policies = g.policy_object
    logged_in_user = g.logged_in_user
    username = logged_in_user.get("username")
    role = logged_in_user.get("role")

    if role == "user":
        # A user has no admin realm.
        scope, admin_realm = SCOPE.USER, None
    else:
        scope, admin_realm = SCOPE.ADMIN, logged_in_user.get("realm")

    enroll_action = "enroll%s" % params.get("type", "HOTP").upper()
    enroll_policies = policies.get_policies(action=enroll_action,
                                            user=username,
                                            realm=params.get("realm"),
                                            scope=scope,
                                            client=request.remote_addr,
                                            adminrealm=admin_realm,
                                            active=True)
    policies_in_scope = policies.get_policies(scope=scope, active=True)

    if len(policies_in_scope) > 0 and len(enroll_policies) == 0:
        raise PolicyError(ERROR.get(role))
    return True
def check_external(request=None, action="init"):
    """
    This decorator is a hook to an external check function, that is called
    before the token/init or token/assign API.

    The function is configured as a dotted path "package.module.function"
    in the application config entry ``PI_INIT_CHECK_HOOK``. If the entry is
    not set or the module can not be imported, no external check is done.
    An import problem is only written to the log.

    :param request: The REST request
    :type request: flask Request object
    :param action: This is either "init" or "assign"
    :type action: basestring
    :return: either True or an Exception is raised. Exceptions raised by
        the external function itself are not caught here.
    """
    # Imported locally, since this is the only user in this module.
    import importlib

    hook_module = None
    function_name = None
    try:
        module_func = current_app.config.get("PI_INIT_CHECK_HOOK")
        if module_func:
            module_name, _sep, name = module_func.rpartition(".")
            # Import the module directly instead of using exec() and
            # eval(). This does not depend on exec() being able to modify
            # the local namespace of this function and does not execute
            # the configured string as code.
            hook_module = importlib.import_module(module_name)
            # Only remember the function name, if the import succeeded.
            function_name = name
    except Exception as exx:
        log.error("Error importing external check function: %s" % exx)

    # Import of function was successful
    if function_name:
        external_func = getattr(hook_module, function_name)
        external_func(request, action)
    return True
def api_key_required(request=None, action=None):
    """
    This is a decorator for check_user_pass and check_serial_pass.
    It checks, if an active policy with scope=SCOPE.AUTHZ and
    action=ACTION.APIKEY matches the user of the request. If so, the
    validate request will only be performed, if a JWT is passed with
    role=validate.

    The JWT is read from the header ``PI-Authorization`` or, if that is
    missing, from the header ``Authorization``. If it can be decoded,
    ``g.logged_in_user`` is set from its contents.

    :param request: The request that is intercepted during the API call
    :type request: Request Object
    :param action: Not used by this function
    :return: Always True, if no exception is raised
    :raises PolicyError: if an API key is required, but no decodable JWT
        was passed or the JWT does not have the role validate
    """
    params = request.all_data
    policy_object = g.policy_object
    user_object = get_user_from_param(params)

    # Get the policies
    api_key_policies = policy_object.get_policies(action=ACTION.APIKEY,
                                                  user=user_object.login,
                                                  realm=user_object.realm,
                                                  scope=SCOPE.AUTHZ,
                                                  client=request.remote_addr,
                                                  active=True)
    # Do we have a policy?
    if len(api_key_policies) > 0:
        # check if we were passed a correct JWT
        # Get the Authorization token from the header
        auth_token = request.headers.get('PI-Authorization', None)
        if not auth_token:
            auth_token = request.headers.get('Authorization', None)
        if not auth_token:
            # Do not rely on the JWT library to reject a missing token.
            raise PolicyError("No valid API key was passed.")
        try:
            r = jwt.decode(auth_token, current_app.secret_key)
            g.logged_in_user = {"username": r.get("username", ""),
                                "realm": r.get("realm", ""),
                                "role": r.get("role", "")}
        except (AttributeError, jwt.DecodeError):
            # Depending on the version of the JWT library a token that can
            # not be decoded results in an AttributeError or in a
            # DecodeError. Both mean that no usable API key was passed.
            raise PolicyError("No valid API key was passed.")

        role = g.logged_in_user.get("role")
        if role != ROLE.VALIDATE:
            raise PolicyError("A correct JWT was passed, but it was no API "
                              "key.")

    # If everything went fine, we call the original function
    return True
def mock_success(req, action):
    """
    Example of an external check function for check_external that always
    succeeds.

    :param req: The request (not used)
    :param action: The action (not used)
    :return: Always True, so that the API call goes on unmodified
    """
    return True
def mock_fail(req, action):
    """
    Example of an external check function for check_external that always
    fails.

    The raised exception makes the token/init or token/assign call show
    this problem accordingly.

    :param req: The request (not used)
    :param action: The action (not used)
    :raises Exception: always
    """
    message = "This is an Exception in an external check function"
    raise Exception(message)
def is_remote_user_allowed(req):
    """
    Checks if the REMOTE_USER server variable is allowed to be used.

    .. note:: This is not used as a decorator!

    :param req: The flask request, containing the remote user and the client
        IP
    :return: False, if the request contains no remote user. Otherwise the
        values of the policy SCOPE.WEBUI, ACTION.REMOTE_USER that match the
        remote user and the client IP.
    """
    if not req.remote_user:
        return False

    loginname, realm = split_user(req.remote_user)
    if not realm:
        # A remote user without a realm belongs to the default realm.
        realm = get_default_realm()

    # Check if the remote user is allowed. The policy object may not have
    # been set up for this request yet.
    if "policy_object" not in g:
        g.policy_object = PolicyClass()
    return g.policy_object.get_action_values(ACTION.REMOTE_USER,
                                             scope=SCOPE.WEBUI,
                                             user=loginname,
                                             realm=realm,
                                             client=req.remote_addr)