Source code for privacyidea.lib.tokens.radiustoken

# -*- coding: utf-8 -*-
#
#  2016-02-22 Cornelius Kölbel <cornelius@privacyidea.org>
#             Add the RADIUS identifier, which points to the system wide list
#             of RADIUS servers.
#  2015-10-09 Cornelius Kölbel <cornelius@privacyidea.org>
#             Add the RADIUS-System-Config, so that not each
#             RADIUS-token needs its own secret. -> change the
#             secret globally
#  2015-01-29 Adapt for migration to flask
#             Cornelius Kölbel <cornelius@privacyidea.org>
#
#  May 08, 2014 Cornelius Kölbel
#  License:  AGPLv3
#  contact:  http://www.privacyidea.org
#
#  Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
#  License:  LSE
#  contact:  http://www.linotp.org
#            http://www.lsexperts.de
#            linotp@lsexperts.de
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """This module defines the RadiusTokenClass. The RADIUS token
forwards the authentication request to another RADIUS server.

The code is tested in tests/test_lib_tokens_radius
"""

import logging

import traceback
import binascii
from privacyidea.lib.tokenclass import TokenClass
from privacyidea.lib.tokens.remotetoken import RemoteTokenClass
from privacyidea.api.lib.utils import getParam, ParameterError
from privacyidea.lib.log import log_with
from privacyidea.lib.config import get_from_config
from privacyidea.lib.decorators import check_token_locked
from privacyidea.lib.radiusserver import get_radius

import pyrad.packet
from pyrad.client import Client
from pyrad.dictionary import Dictionary
from privacyidea.lib import _


# Readable aliases that this module passes as the ``optional`` argument of
# getParam(): ``optional`` (True) and ``required`` (False, i.e.
# optional=False).
optional = True
required = False

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


###############################################
class RadiusTokenClass(RemoteTokenClass):
    """
    Token that forwards the authentication request to a RADIUS server.

    The RADIUS server to use is taken from one of three places, checked in
    this order (see :meth:`check_otp`):

    1. a RADIUS server definition referenced by the token info
       ``radius.identifier``,
    2. the system configuration (``radius.server`` / ``radius.secret``), if
       the token info ``radius.system_settings`` is set,
    3. the token's own ``radius.server`` token info and its OTP key, which
       holds the hexlified shared secret.
    """

    def __init__(self, db_token):
        """
        :param db_token: the database token object, passed on unchanged to
            ``RemoteTokenClass.__init__``
        """
        RemoteTokenClass.__init__(self, db_token)
        self.set_type(u"radius")
        self.mode = ['authenticate', 'challenge']

    @staticmethod
    def get_class_type():
        """
        :return: the token type identifier ``"radius"``
        :rtype: string
        """
        return "radius"

    @staticmethod
    def get_class_prefix():
        """
        :return: the class prefix ``"PIRA"``
        :rtype: string
        """
        return "PIRA"

    @staticmethod
    @log_with(log)
    def get_class_info(key=None, ret='all'):
        """
        returns a subtree of the token definition

        :param key: subsection identifier
        :type key: string
        :param ret: default return value, if nothing is found
        :type ret: user defined
        :return: subsection if key exists or user defined
        :rtype: dict or string
        """
        res = {'type': 'radius',
               'title': 'RADIUS Token',
               'description': _('RADIUS: Forward authentication request to a '
                                'RADIUS server.'),
               'user': ['enroll'],
               # This tokentype is enrollable in the UI for...
               'ui_enroll': ["admin", "user"],
               'policy': {},
               }

        if key:
            # An unknown key yields an empty dict, not the ``ret`` default.
            ret = res.get(key, {})
        elif ret == 'all':
            ret = res

        return ret

    def update(self, param):
        """
        Store the RADIUS specific enrollment parameters of the token.

        Reads ``radius.identifier`` from ``param``. If no identifier is
        given, the legacy parameters ``radius.server``, ``radius.secret``
        and ``radius.system_settings`` are read instead. Afterwards the
        generic ``TokenClass.update`` is run and ``radius.local_checkpin``
        and ``radius.user`` are stored as token info.

        :param param: the enrollment parameters
        :type param: dict
        :raises ParameterError: if there is no ``radius.identifier``,
            neither a server nor a secret, and no system settings flag.
            getParam() is called with ``optional=False`` for
            ``radius.server``, ``radius.secret`` and ``radius.user`` and
            presumably raises on its own when one of them is missing.
        """
        # New value
        radius_identifier = getParam(param, "radius.identifier")
        self.add_tokeninfo("radius.identifier", radius_identifier)

        # old values
        if not radius_identifier:
            radius_server = getParam(param, "radius.server",
                                     optional=required)
            self.add_tokeninfo("radius.server", radius_server)
            radius_secret = getParam(param, "radius.secret",
                                     optional=required)
            # The shared secret is stored hexlified as the OTP key.
            self.token.set_otpkey(binascii.hexlify(radius_secret))
            system_settings = getParam(param, "radius.system_settings",
                                       default=False)
            self.add_tokeninfo("radius.system_settings", system_settings)

            # This check only applies without an identifier, so it lives in
            # this branch, where the legacy values are guaranteed to be bound.
            if not (radius_server or radius_secret) and not system_settings:
                raise ParameterError("Missing parameter: radius.identifier",
                                     id=905)

        # if another OTP length would be specified in /admin/init this would
        # be overwritten by the parent class, which is ok.
        self.set_otplen(6)
        TokenClass.update(self, param)

        val = getParam(param, "radius.local_checkpin", optional) or 0
        self.add_tokeninfo("radius.local_checkpin", val)

        val = getParam(param, "radius.user", required)
        self.add_tokeninfo("radius.user", val)

    @property
    def check_pin_local(self):
        """
        lookup if pin should be checked locally or on radius host

        A missing or empty ``radius.local_checkpin`` token info is treated
        as 0, i.e. the PIN is not checked locally.

        :return: bool
        """
        # "or 0" guards int() against None / "" for an unset token info.
        local_check = 1 == int(
            self.get_tokeninfo("radius.local_checkpin") or 0)
        log.debug("local checking pin? %r", local_check)

        return local_check

    @log_with(log)
    def split_pin_pass(self, passw, user=None, options=None):
        """
        Split the PIN and the OTP value.
        Only if it is locally checked and not remotely.

        :param passw: the value entered by the user
        :param user: not used by this method
        :param options: not used by this method
        :return: tuple of (res, pin, otpval). Without a local PIN check this
            is ``(0, "", passw)``.
        """
        res = 0
        pin = ""
        otpval = passw
        if self.check_pin_local:
            (res, pin, otpval) = TokenClass.split_pin_pass(self, passw)

        return res, pin, otpval

    @log_with(log)
    @check_token_locked
    def check_otp(self, otpval, counter=None, window=None, options=None):
        """
        run the RADIUS request against the RADIUS server

        :param otpval: the OTP value
        :param counter: The counter for counter based otp values
        :type counter: int
        :param window: a counter window
        :type window: int
        :param options: additional token specific options. If it contains
            ``transactionid``, this is sent as the ``State`` attribute.
        :type options: dict
        :return: counter of the matching OTP value: 0 if the RADIUS server
            answered with AccessAccept, -1 otherwise (including errors while
            talking to the server).
        :rtype: int
        """
        otp_count = -1
        options = options or {}

        radius_dictionary = None
        radius_identifier = self.get_tokeninfo("radius.identifier")
        radius_user = self.get_tokeninfo("radius.user")
        system_radius_settings = self.get_tokeninfo("radius.system_settings")
        if radius_identifier:
            # New configuration
            radius_server_object = get_radius(radius_identifier)
            radius_server = "{0!s}:{1!s}".format(
                radius_server_object.config.server,
                radius_server_object.config.port)
            radius_secret = radius_server_object.get_secret()
            radius_dictionary = radius_server_object.config.dictionary
        elif system_radius_settings:
            # system configuration
            radius_server = get_from_config("radius.server")
            radius_secret = get_from_config("radius.secret")
            # Is returned as unicode, so we convert it to utf-8
            radius_secret = radius_secret.encode("utf-8")
        else:
            # individual token settings
            radius_server = self.get_tokeninfo("radius.server")
            # Read the secret
            secret = self.token.get_otpkey()
            radius_secret = binascii.unhexlify(secret.getKey())

        # here we also need to check for radius.user
        log.debug("checking OTP len:%s on radius server: %s, user: %s",
                  len(otpval), radius_server, radius_user)

        try:
            # pyrad does not allow to set timeout and retries.
            # it defaults to retries=3, timeout=5

            # TODO: At the moment we support only one radius server.
            # No round robin.
            server = radius_server.split(':')
            r_server = server[0]
            r_authport = 1812
            if len(server) >= 2:
                r_authport = int(server[1])
            nas_identifier = get_from_config("radius.nas_identifier",
                                             "privacyIDEA")
            if not radius_dictionary:
                radius_dictionary = get_from_config("radius.dictfile",
                                                    "/etc/privacyidea/"
                                                    "dictionary")
            log.debug("NAS Identifier: %r, "
                      "Dictionary: %r", nas_identifier, radius_dictionary)
            # The shared secret is deliberately NOT written to the log.
            log.debug("constructing client object "
                      "with server: %r, port: %r", r_server, r_authport)

            srv = Client(server=r_server,
                         authport=r_authport,
                         secret=radius_secret,
                         dict=Dictionary(radius_dictionary))

            req = srv.CreateAuthPacket(
                code=pyrad.packet.AccessRequest,
                User_Name=radius_user.encode('ascii'),
                NAS_Identifier=nas_identifier.encode('ascii'))

            req["User-Password"] = req.PwCrypt(otpval)
            if "transactionid" in options:
                req["State"] = str(options.get("transactionid"))

            response = srv.SendPacket(req)
            # TODO: handle the RADIUS challenge
            # if response.code == pyrad.packet.AccessChallenge:
            #     opt = {}
            #     for attr in response.keys():
            #         opt[attr] = response[attr]
            #     res = False
            #     log.debug("challenge returned %r " % opt)
            #     # now we map this to a privacyidea challenge
            #     if "State" in opt:
            #         reply["transactionid"] = opt["State"][0]
            #     if "Reply-Message" in opt:
            #         reply["message"] = opt["Reply-Message"][0]
            if response.code == pyrad.packet.AccessAccept:
                log.info("Radiusserver %s granted "
                         "access to user %s.", r_server, radius_user)
                otp_count = 0
            else:
                log.warning("Radiusserver %s "
                            "rejected access to user %s.",
                            r_server, radius_user)

        except Exception as ex:  # pragma: no cover
            # Any error here fails closed: otp_count stays at -1 and the
            # error is only logged.
            log.error("Error contacting radius Server: %r", ex)
            log.debug("%s", traceback.format_exc())

        return otp_count