# 2020-09-21 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add possibility of multiple questions and answers
# http://www.privacyidea.org
# 2015-12-16 Initial writeup.
# Cornelius Kölbel <cornelius@privacyidea.org>
#
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__doc__ = """The questionnaire token is a challenge response token.
The user can define a set of answers to questions. Within the challenge the
user is asked one of these questions and can respond with the corresponding
answer.
"""
from privacyidea.lib.params import get_required
from privacyidea.lib.config import get_from_config
from privacyidea.lib.tokenclass import TokenClass
from privacyidea.lib.log import log_with
from privacyidea.lib.error import TokenAdminError
import logging
from privacyidea.models import Challenge
from privacyidea.lib.challenge import get_challenges
from privacyidea.lib import _
from privacyidea.lib.decorators import check_token_locked
from privacyidea.lib.policy import SCOPE, GROUP, get_action_values_from_options
from privacyidea.lib.policies.actions import PolicyAction
from privacyidea.lib.crypto import safe_compare
import secrets
import json
import datetime
# Module-level logger named after this module; also handed to the
# ``log_with`` decorator on several methods below.
log = logging.getLogger(__name__)
# Fallback for the minimum number of question/answer pairs required at
# enrollment, used when the "question.num_answers" config entry is not set.
DEFAULT_NUM_ANSWERS = 5
class QUESTACTION:
    """Policy action names that are specific to the questionnaire token."""

    # Name of the authentication policy action that defines how many
    # questions the user has to answer during one authentication. The full
    # policy action name is built as "<tokentype>_<NUM_QUESTIONS>".
    NUM_QUESTIONS = "number"
class QuestionnaireTokenClass(TokenClass):
    """
    This is a Questionnaire Token. The token stores a list of questions and
    answers in the tokeninfo database table. The answers are stored with the
    value type "password".
    During authentication a random question is selected and presented as
    challenge.
    The user has to remember and pass the right answer.
    """

    @staticmethod
    def get_class_type():
        """
        Returns the internal token type identifier

        :return: question
        :rtype: str
        """
        return "question"

    @staticmethod
    def get_class_prefix():
        """
        Return the prefix, that is used as a prefix for the serial numbers.

        :return: QUST
        :rtype: str
        """
        return "QUST"

    @classmethod
    @log_with(log)
    def get_class_info(cls, key=None, ret='all'):
        """
        returns a subtree of the token definition

        :param key: subsection identifier
        :type key: string
        :param ret: default return value, if nothing is found
        :type ret: user defined
        :return: subsection if key exists or user defined
        :rtype: dict or scalar
        """
        # NOTE: the description strings are passed to the translation
        # function and are therefore kept exactly as they are.
        res = {'type': cls.get_class_type(),
               'title': 'Questionnaire Token',
               'description': _('Questionnaire: Enroll Questions for the '
                                'user.'),
               'init': {},
               'config': {},
               'user': ['enroll'],
               # This tokentype is enrollable in the UI for...
               'ui_enroll': ["admin", "user"],
               'policy': {
                   SCOPE.AUTH: {
                       QUESTACTION.NUM_QUESTIONS: {
                           'type': 'int',
                           'desc': _("The user has to answer this number of questions during authentication."),
                           'group': GROUP.TOKEN,
                           'value': list(range(1, 31))
                       }
                   },
                   SCOPE.ENROLL: {
                       PolicyAction.MAXTOKENUSER: {
                           'type': 'int',
                           'desc': _("The user may only have this maximum number of questionaire tokens assigned."),
                           'group': GROUP.TOKEN
                       },
                       PolicyAction.MAXACTIVETOKENUSER: {
                           'type': 'int',
                           'desc': _("The user may only have this maximum number of active questionaire "
                                     "tokens assigned."),
                           'group': GROUP.TOKEN
                       }
                   }
               },
               }

        if key:
            # A given key always wins; an unknown key yields an empty dict
            ret = res.get(key, {})
        elif ret == 'all':
            ret = res
        return ret

    @log_with(log)
    def __init__(self, db_token):
        """
        Create a new QUST Token object from a database token

        :param db_token: instance of the orm db object
        :type db_token: DB object
        """
        TokenClass.__init__(self, db_token)
        self.set_type(self.get_class_type())
        self.hKeyRequired = False

    def update(self, param):
        """
        This method is called during the initialization process.

        The parameter "questions" is required. It is either a JSON string or
        an already decoded mapping of question -> answer.

        :param param: parameters from the token init
        :type param: dict
        :return: None
        :raises TokenAdminError: if fewer answers are given than required by
            the config entry "question.num_answers"
        """
        j_questions = get_required(param, "questions")
        try:
            # If we have a string, we load the json format
            questions = json.loads(j_questions)
        except TypeError:
            # Obviously we have a dict...
            questions = j_questions

        num_answers = get_from_config("question.num_answers",
                                      DEFAULT_NUM_ANSWERS)
        if len(questions) < int(num_answers):
            raise TokenAdminError(_("You need to provide at least %s "
                                    "answers.") % num_answers)

        # Save all questions and answers. The value type "password" is what
        # create_challenge() later uses to recognise the stored questions.
        for question, answer in questions.items():
            self.add_tokeninfo(question, answer, value_type="password")

        TokenClass.update(self, param)

    def is_challenge_request(self, passw, user=None, options=None):
        """
        The questionnaire token is always a challenge response token.
        The challenge is triggered by providing the PIN as the password.

        :param passw: password, which might be pin or pin+otp
        :type passw: string
        :param user: The user from the authentication request
        :type user: User object
        :param options: dictionary of additional request parameters
        :type options: dict
        :return: true or false
        :rtype: bool
        """
        options = options or {}
        return self.check_pin(passw, user=user, options=options)

    @staticmethod
    def _parse_used_questions(data):
        """
        Convert the comma separated id string from the challenge data field
        into a list of integers.

        :param data: comma separated question ids, may be empty or None
        :return: list of question ids in the order they were asked
        :rtype: list
        """
        if not data:
            return []
        return [int(q_id) for q_id in data.split(",") if q_id.strip()]

    def create_challenge(self, transactionid=None, options=None):
        """
        This method creates a challenge, which is submitted to the user.
        The submitted challenge will be preserved in the challenge
        database.

        The challenge is a randomly selected question of the available
        questions for this token. Questions that were already asked in this
        transaction (``options["data"]``, a comma separated list of ids) are
        skipped. Once every question has been asked, a new round starts.

        If no transaction id is given, the system will create a transaction
        id and return it, so that the response can refer to this transaction.

        :param transactionid: the id of this challenge
        :param options: the request context parameters / data
        :type options: dict
        :return: tuple of (bool, message, transactionid, reply_dict)
        :rtype: tuple

        The return tuple builds up like this:
        ``bool`` if submit was successful;
        ``message`` which is displayed in the JSON response;
        additional challenge ``reply_dict``, which are displayed in the JSON challenges response.
        """
        options = options or {}
        # Get an integer list of the already used questions
        used_questions = self._parse_used_questions(options.get("data"))

        # Map the DB id of each stored question to the question text
        questions = {tinfo.id: tinfo.Key
                     for tinfo in self.token.info_list
                     if tinfo.Type == "password"}

        # Reduce the allowed questions
        remaining_questions = {q_id: text for q_id, text in questions.items()
                               if q_id not in used_questions}
        if not remaining_questions:
            # All questions are used up, make a new round. The list of used
            # questions restarts as well, so that the data stored with the
            # challenge reflects the new round and later challenges of this
            # transaction still find a remaining question.
            log.info(f"User has only {len(questions)!s} questions in his token. Reusing questions now.")
            used_questions = []
            remaining_questions = questions

        message_id = secrets.choice(list(remaining_questions))
        message = remaining_questions[message_id]
        used_questions.append(message_id)
        challenge_data = ",".join(str(q_id) for q_id in used_questions)

        validity = int(get_from_config('DefaultChallengeValidityTime', 120))
        tokentype = self.get_tokentype().lower()
        # Maybe there is a QUESTIONChallengeValidityTime...
        lookup_for = tokentype.capitalize() + 'ChallengeValidityTime'
        validity = int(get_from_config(lookup_for, validity))

        # Create the challenge in the database
        db_challenge = Challenge(self.token.serial,
                                 transaction_id=transactionid,
                                 data=challenge_data,
                                 session=options.get("session"),
                                 challenge=message,
                                 validitytime=validity)
        db_challenge.save()

        expiry_date = datetime.datetime.now() + \
            datetime.timedelta(seconds=validity)
        reply_dict = {'attributes': {'valid_until': f"{expiry_date!s}"}}
        return True, message, db_challenge.transaction_id, reply_dict

    def check_answer(self, given_answer, challenge_object):
        """
        Check if the given answer is the answer to the sent question.
        The question for this challenge response was stored in the
        challenge_object.

        Then we get the answer from the tokeninfo.

        :param given_answer: The answer given by the user
        :param challenge_object: The challenge object as stored in the database
        :return: in case of success: 1, otherwise -1
        :rtype: int
        """
        question = challenge_object.challenge
        answer = self.get_tokeninfo(question)
        # NOTE(review): presumably get_tokeninfo() returns None for a question
        # that is not stored (anymore) - confirm. Without an answer on either
        # side there is nothing to compare.
        if answer is None or given_answer is None:
            log.debug(f"No answer to compare for token {self.get_serial()!s}.")
            return -1

        # We need to compare two unicode strings
        if safe_compare(answer, given_answer):
            return 1

        log.debug(f"The answer for token {self.get_serial()!s} does not match.")
        return -1

    @check_token_locked
    def check_challenge_response(self, user=None, passw=None, options=None):
        """
        This method verifies if there is a matching question for the given
        passw and also verifies if the answer is correct.

        It then returns the otp_counter = 1

        :param user: the requesting user
        :type user: User object
        :param passw: the password - in fact it is the answer to the question
        :type passw: string
        :param options: additional arguments from the request, which could
            be token specific. Usually "transaction_id"
        :type options: dict
        :return: return 1 if the answer to the question is correct, -1 otherwise.
        :rtype: int
        """
        options = options or {}
        r_success = -1

        # fetch the transaction_id, "state" is the fallback
        transaction_id = options.get('transaction_id')
        if transaction_id is None:
            transaction_id = options.get('state')

        # get the challenges for this transaction ID
        if transaction_id is not None:
            challengeobject_list = get_challenges(serial=self.token.serial,
                                                  transaction_id=transaction_id)

            for challengeobject in challengeobject_list:
                if challengeobject.is_valid():
                    # challenge is still valid
                    if self.check_answer(passw, challengeobject) > 0:
                        r_success = 1
                        # Set valid OTP to true. We must not delete the challenge now,
                        # since we need it for further multichallenges
                        challengeobject.set_otp_status(True)
                        log.debug("The presented answer was correct.")
                        break
                    else:
                        # increase the received_count
                        challengeobject.set_otp_status()

        self.challenge_janitor()
        return r_success

    @log_with(log)
    def has_further_challenge(self, options=None):
        """
        Check if there are still more questions to be asked.

        As a side effect the entries "session" and "data" of the given
        options dict are updated from the existing challenge.

        :param options: Options dict
        :return: True, if further challenge is required.
        :rtype: bool
        """
        # Only replace a missing dict: an empty dict passed by the caller must
        # stay the same object, since "session" and "data" are written to it.
        if options is None:
            options = {}
        transaction_id = options.get('transaction_id')
        challengeobject_list = get_challenges(serial=self.token.serial,
                                              transaction_id=transaction_id)
        # Number of questions to answer as defined by policy, default is 1
        question_number = int(get_action_values_from_options(SCOPE.AUTH,
                                                             f"{self.get_class_type()!s}_{QUESTACTION.NUM_QUESTIONS!s}",
                                                             options) or 1)
        if len(challengeobject_list) == 1:
            # The session field of the challenge counts the answered questions
            session = int(challengeobject_list[0].session or "0") + 1
            options["session"] = f"{session!s}"
            # write the used questions to the data field
            options["data"] = challengeobject_list[0].data or ""
            if session < question_number:
                return True
        return False

    @staticmethod
    def get_setting_type(key):
        """
        The setting type of questions is public, so that the user can also
        read the questions.

        :param key: The key of the setting
        :return: "public" for question settings, otherwise an empty string
        :rtype: str
        """
        if key.startswith("question.question.") or key == "question.num_answers":
            return "public"
        return ""