# -*- coding: utf-8 -*-
#
# 2019-08-15 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Allow RADIUS challenge / response
# Credits to @droobah, who provided the first pull request
# https://github.com/privacyidea/privacyidea/pull/1389
# 2018-01-21 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add tokenkind
# 2016-02-22 Cornelius Kölbel <cornelius@privacyidea.org>
# Add the RADIUS identifier, which points to the system wide list
# of RADIUS servers.
# 2015-10-09 Cornelius Kölbel <cornelius@privacyidea.org>
# Add the RADIUS-System-Config, so that not each
# RADIUS-token needs his own secret. -> change the
# secret globally
# 2015-01-29 Adapt for migration to flask
# Cornelius Kölbel <cornelius@privacyidea.org>
#
# May 08, 2014 Cornelius Kölbel
# License: AGPLv3
# contact: http://www.privacyidea.org
#
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
# License: LSE
# contact: http://www.linotp.org
# http://www.lsexperts.de
# linotp@lsexperts.de
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """This module defines the RadiusTokenClass. The RADIUS token
forwards the authentication request to another RADIUS server.
The code is tested in tests/test_lib_tokens_radius
"""
import logging
import traceback
import binascii
from privacyidea.lib.utils import is_true, to_bytes, hexlify_and_unicode, to_unicode
from privacyidea.lib.tokens.remotetoken import RemoteTokenClass
from privacyidea.lib.tokenclass import TokenClass, TOKENKIND
from privacyidea.api.lib.utils import getParam, ParameterError
from privacyidea.lib.log import log_with
from privacyidea.lib.config import get_from_config
from privacyidea.lib.decorators import check_token_locked
from privacyidea.lib.radiusserver import get_radius
from privacyidea.models import Challenge
from privacyidea.lib.challenge import get_challenges
from privacyidea.lib.policydecorators import challenge_response_allowed
import pyrad.packet
from pyrad.client import Client, Timeout
from pyrad.dictionary import Dictionary
from pyrad.packet import AccessChallenge, AccessAccept, AccessReject
from privacyidea.lib import _
from privacyidea.lib.policy import SCOPE, ACTION, GROUP
optional = True
required = False
log = logging.getLogger(__name__)
###############################################
[docs]class RadiusTokenClass(RemoteTokenClass):
def __init__(self, db_token):
RemoteTokenClass.__init__(self, db_token)
self.set_type("radius")
self.mode = ['authenticate', 'challenge']
[docs] @staticmethod
def get_class_type():
return "radius"
[docs] @staticmethod
def get_class_prefix():
return "PIRA"
[docs] @staticmethod
@log_with(log)
def get_class_info(key=None, ret='all'):
"""
returns a subtree of the token definition
:param key: subsection identifier
:type key: string
:param ret: default return value, if nothing is found
:type ret: user defined
:return: subsection if key exists or user defined
:rtype: dict or string
"""
res = {'type': 'radius',
'title': 'RADIUS Token',
'description': _('RADIUS: Forward authentication request to a '
'RADIUS server.'),
'user': ['enroll'],
# This tokentype is enrollable in the UI for...
'ui_enroll': ["admin", "user"],
'policy': {
SCOPE.ENROLL: {
ACTION.MAXTOKENUSER: {
'type': 'int',
'desc': _("The user may only have this maximum number of RADIUS tokens assigned."),
'group': GROUP.TOKEN
},
ACTION.MAXACTIVETOKENUSER: {
'type': 'int',
'desc': _(
"The user may only have this maximum number of active RADIUS tokens assigned."),
'group': GROUP.TOKEN
}
}
},
}
if key:
ret = res.get(key, {})
else:
if ret == 'all':
ret = res
return ret
[docs] @log_with(log)
def update(self, param):
# New value
radius_identifier = getParam(param, "radius.identifier")
self.add_tokeninfo("radius.identifier", radius_identifier)
# old values
if not radius_identifier:
radiusServer = getParam(param, "radius.server", optional=required)
self.add_tokeninfo("radius.server", radiusServer)
radius_secret = getParam(param, "radius.secret", optional=required)
self.token.set_otpkey(hexlify_and_unicode(radius_secret))
system_settings = getParam(param, "radius.system_settings",
default=False)
self.add_tokeninfo("radius.system_settings", system_settings)
if not (radiusServer or radius_secret) and not system_settings:
raise ParameterError("Missing parameter: radius.identifier", id=905)
# if another OTP length would be specified in /admin/init this would
# be overwritten by the parent class, which is ok.
self.set_otplen(6)
TokenClass.update(self, param)
val = getParam(param, "radius.local_checkpin", optional) or 0
self.add_tokeninfo("radius.local_checkpin", val)
val = getParam(param, "radius.user", required)
self.add_tokeninfo("radius.user", val)
self.add_tokeninfo("tokenkind", TOKENKIND.VIRTUAL)
[docs] @log_with(log)
@challenge_response_allowed
def is_challenge_request(self, passw, user=None, options=None):
"""
This method checks, if this is a request, that triggers a challenge.
It depends on the way, the pin is checked - either locally or remotely.
In addition, the RADIUS token has to be configured to allow challenge response.
communication with RADIUS server: yes
modification of options: The communication with the RADIUS server can
change the options, radius_state, radius_result, radius_message
:param passw: password, which might be pin or pin+otp
:type passw: string
:param user: The user from the authentication request
:type user: User object
:param options: dictionary of additional request parameters
:type options: dict
:return: true or false
"""
if options is None:
options = {}
# should we check the pin locally?
if self.check_pin_local:
# With a local PIN the challenge response is always a privacyIDEA challenge response!
res = self.check_pin(passw, user=user, options=options)
return res
else:
state = options.get('radius_state')
# The pin is checked remotely
res = options.get('radius_result')
if res is None:
res = self._check_radius(passw, options=options, radius_state=state)
return res == AccessChallenge
[docs] @log_with(log)
def create_challenge(self, transactionid=None, options=None):
"""
create a challenge, which is submitted to the user
This method is called after ``is_challenge_request`` has verified,
that a challenge needs to be created.
communication with RADIUS server: no
modification of options: no
:param transactionid: the id of this challenge
:param options: the request context parameters / data
:return: tuple of (bool, message and data)
bool, if submit was successful
message is submitted to the user
data is preserved in the challenge
reply_dict - additional attributes, which are displayed in the
output
"""
if options is None:
options = {}
message = options.get('radius_message') or "Enter your RADIUS tokencode:"
state = hexlify_and_unicode(options.get('radius_state') or b'')
reply_dict = {'attributes': {'state': transactionid}}
validity = int(get_from_config('DefaultChallengeValidityTime', 120))
db_challenge = Challenge(self.token.serial,
transaction_id=transactionid,
data=state,
challenge=message,
validitytime=validity)
db_challenge.save()
self.challenge_janitor()
return True, message, db_challenge.transaction_id, reply_dict
[docs] @log_with(log)
def is_challenge_response(self, passw, user=None, options=None):
"""
This method checks, if this is a request, that is the response to
a previously sent challenge. But we do not query the RADIUS server.
This is the first method in the loop ``check_token_list``.
communication with RADIUS server: no
modification of options: The "radius_result" key is set to None
:param passw: password, which might be pin or pin+otp
:type passw: string
:param user: the requesting user
:type user: User object
:param options: dictionary of additional request parameters
:type options: dict
:return: true or false
:rtype: bool
"""
if options is None:
options = {}
challenge_response = False
# clear the radius_result since this is the first function called in the chain
# this value will be utilized to ensure we do not _check_radius more than once in the loop
options.update({'radius_result': None})
# fetch the transaction_id
transaction_id = options.get('transaction_id')
if transaction_id is None:
transaction_id = options.get('state')
if transaction_id:
# get the challenges for this transaction ID
challengeobject_list = get_challenges(serial=self.token.serial,
transaction_id=transaction_id)
for challengeobject in challengeobject_list:
if challengeobject.is_valid():
challenge_response = True
return challenge_response
[docs] @log_with(log)
@check_token_locked
def check_challenge_response(self, user=None, passw=None, options=None):
"""
This method verifies if there is a matching question for the given
passw and also verifies if the answer is correct.
It then returns the the otp_counter = 1
:param user: the requesting user
:type user: User object
:param passw: the password - in fact it is the answer to the question
:type passw: string
:param options: additional arguments from the request, which could
be token specific. Usually "transaction_id"
:type options: dict
:return: return otp_counter. If -1, challenge does not match
:rtype: int
"""
if options is None:
options = {}
otp_counter = -1
# fetch the transaction_id
transaction_id = options.get('transaction_id') or options.get('state')
# get the challenges for this transaction ID
if transaction_id is not None:
challengeobject_list = get_challenges(serial=self.token.serial,
transaction_id=transaction_id)
for challengeobject in challengeobject_list:
if challengeobject.is_valid():
state = binascii.unhexlify(challengeobject.data)
# challenge is still valid
radius_response = self._check_radius(passw, options=options, radius_state=state)
if radius_response == AccessAccept:
# We found the matching challenge,
# and the RADIUS server returned AccessAccept
challengeobject.delete()
otp_counter = 1
break
elif radius_response == AccessChallenge:
# The response was valid but triggered a new challenge
# Note: The second challenge currently does not work correctly
# see https://github.com/privacyidea/privacyidea/issues/1792
challengeobject.delete()
_, _, transaction_id, _ = self.create_challenge(options=options)
options["transaction_id"] = transaction_id
otp_counter = -1
break
else:
otp_counter = -1
# increase the received_count
challengeobject.set_otp_status()
self.challenge_janitor()
return otp_counter
@property
def check_pin_local(self):
"""
lookup if pin should be checked locally or on radius host
:return: bool
"""
local_check = is_true(self.get_tokeninfo("radius.local_checkpin"))
log.debug("local checking pin? {0!r}".format(local_check))
return local_check
[docs] @log_with(log)
def split_pin_pass(self, passw, user=None, options=None):
"""
Split the PIN and the OTP value.
Only if it is locally checked and not remotely.
"""
res = 0
pin = ""
otpval = passw
if self.check_pin_local:
(res, pin, otpval) = TokenClass.split_pin_pass(self, passw)
return res, pin, otpval
[docs] @log_with(log)
@check_token_locked
def authenticate(self, passw, user=None, options=None):
"""
do the authentication on base of password / otp and user and
options, the request parameters.
This is only called after it is verified, that the upper level is no challenge-request
or challenge-response
The "options" are read-only in this method. They are not modified here. authenticate
is the last method in the loop ``check_token_list``.
communication with RADIUS server: yes, if is no previous "radius_result"
If there is a "radius" result in the options, we do not query the radius server
modification of options: options can be modified if we query the radius server.
However, this is not important since authenticate is the last call.
:param passw: the password / otp
:param user: the requesting user
:param options: the additional request parameters
:return: tuple of (success, otp_count - 0 or -1, reply)
"""
options = options or {}
res = False
otp_counter = -1
reply = None
otpval = passw
# should we check the pin locally?
if self.check_pin_local:
(_res, pin, otpval) = self.split_pin_pass(passw, user,
options=options)
if not self.check_pin(pin, user=user, options=options):
return False, -1, {'message': "Wrong PIN"}
# attempt to retrieve saved state/result
state = options.get('radius_state')
result = options.get('radius_result')
if result is None:
radius_response = self._check_radius(otpval, options=options, radius_state=state)
else:
radius_response = result
if radius_response == AccessAccept:
res = True
otp_counter = 1
return res, otp_counter, reply
[docs] @log_with(log)
@check_token_locked
def check_otp(self, otpval, counter=None, window=None, options=None):
"""
Originally check_otp returns an OTP counter. I.e. in a failed attempt
we return -1. In case of success we return 1
:param otpval:
:param counter:
:param window:
:param options:
:return:
"""
res = self._check_radius(otpval, options=options)
if res == AccessAccept:
return 1
else:
return -1
@log_with(log)
@check_token_locked
def _check_radius(self, otpval, options=None, radius_state=None):
"""
run the RADIUS request against the RADIUS server
:param otpval: the OTP value
:param options: additional token specific options
:type options: dict
:return: counter of the matching OTP value.
:rtype: AccessAccept, AccessReject, AccessChallenge
"""
result = AccessReject
radius_message = None
if options is None:
options = {}
radius_dictionary = None
radius_identifier = self.get_tokeninfo("radius.identifier")
radius_user = self.get_tokeninfo("radius.user")
system_radius_settings = self.get_tokeninfo("radius.system_settings")
radius_timeout = 5
radius_retries = 3
if radius_identifier:
# New configuration
radius_server_object = get_radius(radius_identifier)
radius_server = radius_server_object.config.server
radius_port = radius_server_object.config.port
radius_server = "{0!s}:{1!s}".format(radius_server, radius_port)
radius_secret = radius_server_object.get_secret()
radius_dictionary = radius_server_object.config.dictionary
radius_timeout = int(radius_server_object.config.timeout or 10)
radius_retries = int(radius_server_object.config.retries or 1)
elif system_radius_settings:
# system configuration
radius_server = get_from_config("radius.server")
radius_secret = get_from_config("radius.secret")
else:
# individual token settings
radius_server = self.get_tokeninfo("radius.server")
# Read the secret
secret = self.token.get_otpkey()
radius_secret = binascii.unhexlify(secret.getKey())
# here we also need to check for radius.user
log.debug("checking OTP len:{0!s} on radius server: "
"{1!s}, user: {2!r}".format(len(otpval), radius_server,
radius_user))
try:
# pyrad does not allow to set timeout and retries.
# it defaults to retries=3, timeout=5
# TODO: At the moment we support only one radius server.
# No round robin.
server = radius_server.split(':')
r_server = server[0]
r_authport = 1812
if len(server) >= 2:
r_authport = int(server[1])
nas_identifier = get_from_config("radius.nas_identifier",
"privacyIDEA")
if not radius_dictionary:
radius_dictionary = get_from_config("radius.dictfile",
"/etc/privacyidea/dictionary")
log.debug("NAS Identifier: %r, "
"Dictionary: %r" % (nas_identifier, radius_dictionary))
log.debug("constructing client object "
"with server: %r, port: %r, secret: %r" %
(r_server, r_authport, to_unicode(radius_secret)))
srv = Client(server=r_server,
authport=r_authport,
secret=to_bytes(radius_secret),
dict=Dictionary(radius_dictionary))
# Set retries and timeout of the client
srv.timeout = radius_timeout
srv.retries = radius_retries
req = srv.CreateAuthPacket(code=pyrad.packet.AccessRequest,
User_Name=radius_user.encode('utf-8'),
NAS_Identifier=nas_identifier.encode('ascii'))
req["User-Password"] = req.PwCrypt(otpval)
if radius_state:
req["State"] = radius_state
log.info("Sending saved challenge to radius server: {0!r} ".format(radius_state))
try:
response = srv.SendPacket(req)
except Timeout:
log.warning("The remote RADIUS server {0!s} timeout out for user {1!s}.".format(
r_server, radius_user))
return AccessReject
# handle the RADIUS challenge
if response.code == pyrad.packet.AccessChallenge:
# now we map this to a privacyidea challenge
if "State" in response:
radius_state = response["State"][0]
if "Reply-Message" in response:
radius_message = response["Reply-Message"][0]
result = AccessChallenge
elif response.code == pyrad.packet.AccessAccept:
radius_state = '<SUCCESS>'
radius_message = 'RADIUS authentication succeeded'
log.info("RADIUS server {0!s} granted "
"access to user {1!s}.".format(r_server, radius_user))
result = AccessAccept
else:
radius_state = '<REJECTED>'
radius_message = 'RADIUS authentication failed'
log.debug('radius response code {0!s}'.format(response.code))
log.info("Radiusserver {0!s} "
"rejected access to user {1!s}.".format(r_server, radius_user))
result = AccessReject
except Exception as ex: # pragma: no cover
log.error("Error contacting radius Server: {0!r}".format((ex)))
log.info("{0!s}".format(traceback.format_exc()))
options.update({'radius_result': result})
options.update({'radius_state': radius_state})
options.update({'radius_message': radius_message})
return result