# -*- coding: utf-8 -*-
#
# privacyIDEA
# Aug 12, 2014 Cornelius Kölbel
# License: AGPLv3
# contact: http://www.privacyidea.org
#
# 2020-10-16 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add attestation certificate functionality
# 2016-04-26 Cornelius Kölbel <cornelius@privacyidea.org>
# Add the possibility to create key pair on server side
# Provide download for pkcs12 file
#
# 2015-05-15 Adapt during migration to flask
# Cornelius Kölbel <cornelius@privacyidea.org>
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
This file contains the definition of the CertificateToken class.
The code is tested in test_lib_tokens_certificate.py.
"""
import logging
from privacyidea.lib.utils import to_unicode, b64encode_and_unicode, to_byte_string
from privacyidea.lib.tokenclass import TokenClass, ROLLOUTSTATE
from privacyidea.lib.log import log_with
from privacyidea.api.lib.utils import getParam
from privacyidea.lib.caconnector import get_caconnector_object, get_caconnector_list
from privacyidea.lib.user import get_user_from_param
from privacyidea.lib.utils import determine_logged_in_userparams
from OpenSSL import crypto
from cryptography.x509 import load_pem_x509_certificate, load_pem_x509_csr
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding
from privacyidea.lib.decorators import check_token_locked
from privacyidea.lib import _
from privacyidea.lib.policy import SCOPE, ACTION as BASE_ACTION, GROUP, Match
from privacyidea.lib.error import privacyIDEAError, CSRError, CSRPending
import traceback
optional = True
required = False
log = logging.getLogger(__name__)
DEFAULT_CA_PATH = ["/etc/privacyidea/trusted_attestation_ca"]
# This is the key of the tokeninfo, where the request Id of a pending certificate is stored
REQUEST_ID = "requestId"
class ACTION(BASE_ACTION):
__doc__ = """This is the list of special certificate actions."""
TRUSTED_CA_PATH = "certificate_trusted_Attestation_CA_path"
REQUIRE_ATTESTATION = "certificate_require_attestation"
CA_CONNECTOR = "ca_connector"
CERTIFICATE_TEMPLATE = "certificate_template"
CERTIFICATE_REQUEST_SUBJECT_COMPONENT = "certificate_request_subject_component"
class REQUIRE_ACTIONS(object):
IGNORE = "ignore"
VERIFY = "verify"
REQUIRE_AND_VERIFY = "require_and_verify"
def verify_certificate_path(certificate, trusted_ca_paths):
"""
Verify a certificate against the list of directories each containing files with
a certificate chain.
:param certificate: The PEM certificate to verify
:param trusted_ca_paths: A list of directories
:return: True or False
"""
from os import listdir
from os.path import isfile, join, isdir
for capath in trusted_ca_paths:
if isdir(capath):
chainfiles = [join(capath, f) for f in listdir(capath) if isfile(join(capath, f))]
for chainfile in chainfiles:
chain = parse_chainfile(chainfile)
try:
verify_certificate(to_byte_string(certificate), chain)
return True
except Exception as exx:
log.debug("Can not verify attestation certificate against chain {0!s}.".format(chain))
else:
log.warning("The configured attestation CA directory does not exist.")
return False
def parse_chainfile(chainfile):
"""
Parse a text file, that contains a list of CA files.
The topmost being the trusted Root CA followed by intermediate
:param chainfile: The filename to parse
:return: A list of PEM certificates
"""
cacerts = []
cacert = ""
with open(chainfile) as f:
lines = f.readlines()
for line in lines:
if line.startswith("-----BEGIN CERTIFICATE-----"):
cacert = line
elif line.startswith("-----END CERTIFICATE-----"):
cacert += line
# End of certificate
cacerts.append(cacert)
elif line.startswith("#") or line == "":
# Empty line or comment
pass
else:
cacert += line
return cacerts
def verify_certificate(certificate, chain):
"""
Verify a certificate against the certificate chain, which can be of any length
The certificate chain starts with the root certificate and contains further
intermediate certificates
:param certificate: The certificate
:type certificate: PEM encoded string
:param chain: A list of PEM encoded certificates
:type chain: list
:return: raises an exception
"""
# first reverse the list, since it can be popped better
chain = list(reversed(chain))
if not chain:
raise privacyIDEAError("Can not verify certificate against an empty chain.")
certificate = load_pem_x509_certificate(to_byte_string(certificate), default_backend())
chain = [load_pem_x509_certificate(to_byte_string(c), default_backend()) for c in chain]
# verify chain
while chain:
signer = chain.pop()
if chain:
# There is another element in the list, so we check the intermediate:
signee = chain.pop()
signer.public_key().verify(
signee.signature,
signee.tbs_certificate_bytes,
padding.PKCS1v15(),
signee.signature_hash_algorithm
)
signer = signee
# This was the last certificate in the chain, so we check the certificate
signer.public_key().verify(
certificate.signature,
certificate.tbs_certificate_bytes,
padding.PKCS1v15(),
certificate.signature_hash_algorithm
)
[docs]class CertificateTokenClass(TokenClass):
"""
Token to implement an X509 certificate.
The certificate can be enrolled by sending a CSR to the server or the
keypair is created by the server. If the server creates the keypair,
the user can download a PKCS12 file.
The OTP PIN is used as passphrase for the PKCS12 file.
privacyIDEA is capable of working with different CA connectors.
Valid parameters are *request* or *certificate*, both PEM encoded.
If you pass a *request* you also need to pass the *ca* that should be
used to sign the request. Passing a *certificate* just uploads the
certificate to a new token object.
A certificate token can be created by an administrative task with the
token/init api like this:
**Example Initialization Request**:
.. sourcecode:: http
POST /auth HTTP/1.1
Host: example.com
Accept: application/json
type=certificate
user=cornelius
realm=realm1
request=<PEM encoded request>
attestation=<PEM encoded attestation certificate>
ca=<name of the ca connector>
**Example Initialization Request, key generation on servers side**
In this case the certificate is created on behalf of another user.
.. sourcecode:: http
POST /auth HTTP/1.1
Host: example.com
Accept: application/json
type=certificate
user=cornelius
realm=realm1
generate=1
ca=<name of the ca connector>
**Example response**:
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"detail": {
"certificate": "...PEM..."
},
"id": 1,
"jsonrpc": "2.0",
"result": {
"status": true,
"value": true
},
"version": "privacyIDEA unknown"
}
"""
using_pin = False
hKeyRequired = False
def __init__(self, aToken):
TokenClass.__init__(self, aToken)
self.set_type("certificate")
self.otp_len = 0
try:
self._update_rollout_state()
except Exception as e:
log.warning("Failed to check for pending update. {0!s}".format(e))
[docs] @staticmethod
def get_class_type():
return "certificate"
[docs] @staticmethod
def get_class_prefix():
return "CRT"
[docs] @staticmethod
@log_with(log)
def get_class_info(key=None, ret='all'):
"""
returns a subtree of the token definition
:param key: subsection identifier
:type key: string
:param ret: default return value, if nothing is found
:type ret: user defined
:return: subsection if key exists or user defined
:rtype: dict or scalar
"""
res = {'type': 'certificate',
'title': 'Certificate Token',
'description': _('Certificate: Enroll an x509 Certificate '
'Token.'),
'init': {},
'config': {},
'user': ['enroll'],
# This tokentype is enrollable in the UI for...
'ui_enroll': ["admin", "user"],
'policy': {
SCOPE.ENROLL: {
ACTION.MAXTOKENUSER: {
'type': 'int',
'desc': _("The user may only have this maximum number of certificates assigned."),
'group': GROUP.TOKEN
},
ACTION.MAXACTIVETOKENUSER: {
'type': 'int',
'desc': _("The user may only have this maximum number of active certificates assigned."),
'group': GROUP.TOKEN
},
ACTION.REQUIRE_ATTESTATION: {
'type': 'str',
'desc': _("Enrolling a certificate token can require an attestation certificate. "
"(Default: ignore)"),
'group': GROUP.TOKEN,
'value': [REQUIRE_ACTIONS.IGNORE,
REQUIRE_ACTIONS.VERIFY,
REQUIRE_ACTIONS.REQUIRE_AND_VERIFY]
},
ACTION.CA_CONNECTOR: {
'type': 'str',
'desc': _("The CA connector that should be used during certificate enrollment."),
'group': GROUP.TOKEN,
'value': [x.get("connectorname") for x in get_caconnector_list()]
},
ACTION.CERTIFICATE_TEMPLATE: {
'type': 'str',
'desc': _("The template that should be used to issue a certificate."),
'group': GROUP.TOKEN
},
ACTION.CERTIFICATE_REQUEST_SUBJECT_COMPONENT: {
'type': 'str',
'desc': _("This takes a space separated list of elements to be added to the subject. "
"Can be 'email' and 'realm'."),
'group': GROUP.TOKEN
}
},
SCOPE.USER: {
ACTION.TRUSTED_CA_PATH: {
'type': 'str',
'desc': _("The directory containing attestation certificate chains."),
'group': GROUP.TOKEN
}
},
SCOPE.ADMIN: {
ACTION.TRUSTED_CA_PATH: {
'type': 'str',
'desc': _("The directory containing attestation certificate chains."),
'group': GROUP.TOKEN
}
}
}
}
if key:
ret = res.get(key, {})
else:
if ret == 'all':
ret = res
return ret
[docs] @classmethod
def get_default_settings(cls, g, params):
"""
This method returns a dictionary with additional settings for token
enrollment.
The settings that are evaluated are
SCOPE.ADMIN|SCOPE.USER, action=trusted_Assertion_CA_path
It sets a list of configured paths.
The returned dictionary is added to the parameters of the API call.
:param g: context object, see documentation of ``Match``
:param params: The call parameters
:type params: dict
:return: default parameters
"""
ret = {ACTION.TRUSTED_CA_PATH: DEFAULT_CA_PATH}
(role, username, userrealm, adminuser, adminrealm) = determine_logged_in_userparams(g.logged_in_user,
params)
# Now we fetch CA-pathes from the policies
paths = Match.generic(g, scope=role,
action=ACTION.TRUSTED_CA_PATH,
user=username,
realm=userrealm,
adminuser=adminuser,
adminrealm=adminrealm).action_values(unique=False,
allow_white_space_in_action=True)
if paths:
ret[ACTION.TRUSTED_CA_PATH] = list(paths)
return ret
def _update_rollout_state(self):
"""
This is a certificate specific method, that communicates to the CA and checks,
if a pending certificate has been enrolled, yet.
If the certificate is enrolled, it fetches the certificate from the CA and
updates the certificate token.
A return code of -1 means that the status is unchanged.
:return: the status of the rollout
"""
status = -1
if self.rollout_state == ROLLOUTSTATE.PENDING:
request_id = self.get_tokeninfo(REQUEST_ID)
ca = self.get_tokeninfo("CA")
if ca and request_id:
request_id = int(request_id)
cacon = get_caconnector_object(ca)
status = cacon.get_cr_status(request_id)
# TODO: Later we need to make the status CA dependent. Different CAs could return
# different codes. So each CA Connector needs a mapper for its specific codes.
if status in [3, 4]: # issued or "issued out of band"
log.info("The certificate {0!s} has been issued by the CA.".format(self.token.serial))
certificate = cacon.get_issued_certificate(request_id)
# Update the rollout state
self.token.rollout_state = ROLLOUTSTATE.ENROLLED
self.add_tokeninfo("certificate", certificate)
elif status == 2: # denied
log.warning("The certificate {0!s} has been denied by the CA.".format(self.token.serial))
self.token.rollout_state = ROLLOUTSTATE.DENIED
self.token.save()
else:
log.info("The certificate {0!s} is still pending.".format(self.token.serial))
else:
log.warning("The certificate token in rollout_state pending, but either the CA ({0!s}) "
"or the requestId ({1!s}) is missing.".format(ca, request_id))
return status
[docs] def update(self, param):
"""
This method is called during the initialization process.
:param param: parameters from the token init
:type param: dict
:return: None
"""
TokenClass.update(self, param)
request = getParam(param, "request", optional)
spkac = getParam(param, "spkac", optional)
certificate = getParam(param, "certificate", optional)
generate = getParam(param, "genkey", optional)
template_name = getParam(param, "template", optional)
subject_components = getParam(param, "subject_components", optional=optional, default=[])
request_id = None
if request or generate:
# If we do not upload a user certificate, then we need a CA do
# sign the uploaded request or generated certificate.
ca = getParam(param, "ca", required)
self.add_tokeninfo("CA", ca)
cacon = get_caconnector_object(ca)
if request:
if not spkac:
# We only do the whole attestation checking in case we have no SPKAC
request_csr = load_pem_x509_csr(to_byte_string(request), default_backend())
if not request_csr.is_signature_valid:
raise privacyIDEAError("request has invalid signature.")
# If a request is sent, we can have an attestation certificate
attestation = getParam(param, "attestation", optional)
verify_attestation = getParam(param, "verify_attestation", optional)
if attestation:
request_numbers = request_csr.public_key().public_numbers()
attestation_cert = load_pem_x509_certificate(to_byte_string(attestation), default_backend())
attestation_numbers = attestation_cert.public_key().public_numbers()
if request_numbers != attestation_numbers:
log.warning("certificate request does not match attestation certificate.")
raise privacyIDEAError("certificate request does not match attestation certificate.")
try:
verified = verify_certificate_path(attestation,
param.get(ACTION.TRUSTED_CA_PATH))
except Exception as exx:
# We could have file system errors during verification.
log.debug("{0!s}".format(traceback.format_exc()))
verified = False
if not verified:
log.warning("Failed to verify certificate chain of attestation certificate.")
if verify_attestation:
raise privacyIDEAError("Failed to verify certificate chain of attestation certificate.")
# During the initialization process, we need to create the certificate
request_id, x509object = cacon.sign_request(request,
options={"spkac": spkac,
"template": template_name})
certificate = crypto.dump_certificate(crypto.FILETYPE_PEM,
x509object)
elif generate:
"""
Create the certificate on behalf of another user. Now we need to create
* the key pair,
* the request
* and the certificate
We need the user for whom the certificate should be created
"""
user = get_user_from_param(param, optionalOrRequired=required)
keysize = getParam(param, "keysize", optional, 2048)
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, keysize)
req = crypto.X509Req()
req.get_subject().CN = user.login
# Add components to subject
if subject_components:
if "email" in subject_components and user.info.get("email"):
req.get_subject().emailAddress = user.info.get("email")
if "realm" in subject_components:
req.get_subject().organizationalUnitName = user.realm
# TODO: Add Country, Organization
"""
req.get_subject().countryName = 'xxx'
req.get_subject().stateOrProvinceName = 'xxx'
req.get_subject().localityName = 'xxx'
req.get_subject().organizationName = 'xxx'
"""
req.set_pubkey(key)
r = req.sign(key, "sha256")
csr = to_unicode(crypto.dump_certificate_request(crypto.FILETYPE_PEM, req))
try:
request_id, x509object = cacon.sign_request(csr, options={"template": template_name})
certificate = crypto.dump_certificate(crypto.FILETYPE_PEM, x509object)
except CSRError:
# Mark the token as broken
self.token.rollout_state = ROLLOUTSTATE.FAILED
# Reraise the error
raise CSRError()
except CSRPending as e:
self.token.rollout_state = ROLLOUTSTATE.PENDING
if hasattr(e, "requestId"):
request_id = e.requestId
finally:
# Save the private key to the encrypted key field of the token
s = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
self.add_tokeninfo("privatekey", s, value_type="password")
if "pin" in param:
self.set_pin(param.get("pin"), encrypt=True)
if certificate:
self.add_tokeninfo("certificate", certificate)
if request_id:
self.add_tokeninfo(REQUEST_ID, request_id)
[docs] @log_with(log)
def get_init_detail(self, params=None, user=None):
"""
At the end of the initialization we return the certificate and the
PKCS12 file, if the private key exists.
"""
response_detail = TokenClass.get_init_detail(self, params, user)
params = params or {}
certificate = self.get_tokeninfo("certificate")
response_detail["certificate"] = certificate
response_detail["rollout_state"] = self.token.rollout_state
privatekey = self.get_tokeninfo("privatekey")
# If there is a private key, we dump a PKCS12
if privatekey:
try:
response_detail["pkcs12"] = b64encode_and_unicode(self._create_pkcs12_bin())
except Exception:
log.warning("Can not create PKCS12 for token {0!s}.".format(self.token.serial))
return response_detail
def _create_pkcs12_bin(self):
"""
Helper function to create an encrypted pkcs12 binary for download
:return: PKCS12 binary
"""
certificate = self.get_tokeninfo("certificate")
privatekey = self.get_tokeninfo("privatekey")
pkcs12 = crypto.PKCS12()
pkcs12.set_certificate(crypto.load_certificate(
crypto.FILETYPE_PEM, certificate))
pkcs12.set_privatekey(crypto.load_privatekey(crypto.FILETYPE_PEM,
privatekey))
# TODO define a random passphrase and hand it to the user
passphrase = self.token.get_pin()
if passphrase == -1:
passphrase = ""
pkcs12_bin = pkcs12.export(passphrase=passphrase.encode('utf8'))
return pkcs12_bin
[docs] def get_as_dict(self):
"""
This returns the token data as a dictionary.
It is used to display the token list at /token/list.
The certificate token can add the PKCS12 file if it exists
:return: The token data as dict
:rtype: dict
"""
# first get the database values as dict
token_dict = self.token.get()
if "privatekey" in token_dict.get("info"):
try:
token_dict["info"]["pkcs12"] = b64encode_and_unicode(self._create_pkcs12_bin())
except Exception:
log.warning("Can not create PKCS12 for token {0!s}.".format(self.token.serial))
return token_dict
[docs] @check_token_locked
def set_pin(self, pin, encrypt=False):
"""
set the PIN of a token.
The PIN of the certificate token is stored encrypted. It is used as
passphrase for the PKCS12 file.
:param pin: the pin to be set for the token
:type pin: basestring
:param encrypt: If set to True, the pin is stored encrypted and
can be retrieved from the database again
:type encrypt: bool
"""
storeHashed = False
self.token.set_pin(pin, storeHashed)
[docs] def revoke(self):
"""
This revokes the token. We need to determine the CA, which issues the
certificate, contact the connector and revoke the certificate
Some token types may revoke a token without locking it.
"""
TokenClass.revoke(self)
# determine the CA and its connector.
ti = self.get_tokeninfo()
ca_specifier = ti.get("CA")
log.debug("Revoking certificate {0!s} on CA {1!s}.".format(
self.token.serial, ca_specifier))
certificate_pem = ti.get("certificate")
# call CAConnector.revoke_cert()
ca_obj = get_caconnector_object(ca_specifier)
revoked = ca_obj.revoke_cert(certificate_pem,
request_id=ti.get(REQUEST_ID))
log.info("Certificate {0!s} revoked on CA {1!s}.".format(revoked,
ca_specifier))
# call CAConnector.create_crl()
crl = ca_obj.create_crl()
log.info("CRL {0!s} created.".format(crl))
return revoked