Source code for privacyidea.lib.tokens.certificatetoken

# -*- coding: utf-8 -*-
#
#  privacyIDEA
#  Aug 12, 2014 Cornelius Kölbel
#  License:  AGPLv3
#  contact:  http://www.privacyidea.org
#
#  2016-04-26 Cornelius Kölbel <cornelius@privacyidea.org>
#             Add the possibility to create key pair on server side
#             Provide download for pkcs12 file
#
#  2015-05-15 Adapt during migration to flask
#             Cornelius Kölbel <cornelius@privacyidea.org>
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
This file contains the definition of the CertificateToken class.

The code is tested in test_lib_tokens_certificate.py.
"""

import logging
from privacyidea.lib.tokenclass import TokenClass
from privacyidea.lib.log import log_with
from privacyidea.api.lib.utils import getParam
from privacyidea.lib.caconnector import get_caconnector_object
from privacyidea.lib.user import get_user_from_param
from OpenSSL import crypto
from privacyidea.lib.decorators import check_token_locked
import base64
from privacyidea.lib import _

# Readability aliases for the positional "optional" flag handed to getParam()
# and get_user_from_param() in this module.
# NOTE(review): presumably ``optional`` means the parameter may be absent and
# ``required`` means it must be present; the exact behaviour for a missing
# required parameter is decided by those helpers, which are defined
# elsewhere — confirm there.
optional = True
required = False

# Module-level logger; it is also passed to the log_with decorator below.
log = logging.getLogger(__name__)


class CertificateTokenClass(TokenClass):
    """
    Token to implement an X509 certificate.

    The certificate can be enrolled by sending a CSR to the server or the
    keypair is created by the server. If the server creates the keypair,
    the user can download a PKCS12 file.
    The OTP PIN is used as passphrase for the PKCS12 file.

    privacyIDEA is capable of working with different CA connectors.

    Valid parameters are *request* or *certificate*, both PEM encoded.
    If you pass a *request* you also need to pass the *ca* that should be
    used to sign the request. Passing a *certificate* just uploads the
    certificate to a new token object.

    A certificate token can be created by an administrative task with the
    token/init api like this:

      **Example Initialization Request**:

        .. sourcecode:: http

           POST /token/init HTTP/1.1
           Host: example.com
           Accept: application/json

           type=certificate
           user=cornelius
           realm=realm1
           request=<PEM encoded request>
           ca=<name of the ca connector>

      **Example Initialization Request, key generation on server side**

      In this case the certificate is created on behalf of another user.

        .. sourcecode:: http

           POST /token/init HTTP/1.1
           Host: example.com
           Accept: application/json

           type=certificate
           user=cornelius
           realm=realm1
           genkey=1
           ca=<name of the ca connector>

      **Example response**:

        .. sourcecode:: http

           HTTP/1.1 200 OK
           Content-Type: application/json

           {
             "detail": {
               "certificate": "...PEM..."
             },
             "id": 1,
             "jsonrpc": "2.0",
             "result": {
               "status": true,
               "value": true
             },
             "version": "privacyIDEA unknown"
           }

    """
    using_pin = False
    hKeyRequired = False

    def __init__(self, aToken):
        """
        Create the token object and mark it as a certificate token.

        :param aToken: the database token object this class wraps
        """
        TokenClass.__init__(self, aToken)
        self.set_type(u"certificate")
        # A certificate token does not produce OTP values.
        self.otp_len = 0

    @staticmethod
    def get_class_type():
        """
        Return the token type identifier.

        :return: the string "certificate"
        :rtype: string
        """
        return "certificate"

    @staticmethod
    def get_class_prefix():
        """
        Return the prefix of the token class.

        :return: the string "CRT"
        :rtype: string
        """
        return "CRT"

    @staticmethod
    @log_with(log)
    def get_class_info(key=None, ret='all'):
        """
        returns a subtree of the token definition

        :param key: subsection identifier
        :type key: string
        :param ret: default return value, if nothing is found
        :type ret: user defined
        :return: subsection if key exists or user defined
        :rtype: dict or scalar
        """
        res = {'type': 'certificate',
               'title': 'Certificate Token',
               'description': _('Certificate: Enroll an x509 Certificate '
                                'Token.'),
               'init': {},
               'config': {},
               'user': ['enroll'],
               # This tokentype is enrollable in the UI for...
               'ui_enroll': ["admin", "user"],
               'policy': {},
               }

        if key:
            # An unknown key yields an empty dict, not the ``ret`` default.
            ret = res.get(key, {})
        else:
            if ret == 'all':
                ret = res
        return ret

    def update(self, param):
        """
        This method is called during the initialization process.

        Depending on the parameters, one of three things happens:

        * *request*: the CSR is signed by the CA connector named in *ca*.
        * *genkey*: a key pair and CSR are created on the server for the
          user given in the parameters, the CSR is signed by the CA
          connector named in *ca* and the private key is stored in the
          token info.
        * *certificate*: the given certificate is stored as is.

        If *pin* is contained in the parameters, it is set as token PIN.

        :param param: parameters from the token init
        :type param: dict
        :return: None
        """
        TokenClass.update(self, param)

        request = getParam(param, "request", optional)
        spkac = getParam(param, "spkac", optional)
        certificate = getParam(param, "certificate", optional)
        generate = getParam(param, "genkey", optional)
        template_name = getParam(param, "template", optional)

        if request or generate:
            # If we do not upload a user certificate, then we need a CA to
            # sign the uploaded request or generated certificate.
            ca = getParam(param, "ca", required)
            self.add_tokeninfo("CA", ca)
            cacon = get_caconnector_object(ca)

        if request:
            # During the initialization process, we need to create the
            # certificate
            x509object = cacon.sign_request(request,
                                            options={"spkac": spkac,
                                                     "template": template_name})
            certificate = crypto.dump_certificate(crypto.FILETYPE_PEM,
                                                  x509object)
        elif generate:
            # Create the certificate on behalf of another user.
            # Now we need to create the key pair,
            # the request
            # and the certificate
            # We need the user for whom the certificate should be created
            user = get_user_from_param(param, optionalOrRequired=required)

            # The key size may arrive as a string (e.g. from an HTTP form),
            # but the key generation needs an integer number of bits.
            keysize = int(getParam(param, "keysize", optional, 2048))
            key = crypto.PKey()
            key.generate_key(crypto.TYPE_RSA, keysize)

            req = crypto.X509Req()
            subject = req.get_subject()
            subject.CN = user.login
            # Add email to subject
            email = user.info.get("email")
            if email:
                subject.emailAddress = email
            subject.organizationalUnitName = user.realm
            # TODO: Add Country, Organization, Email
            # req.get_subject().countryName = 'xxx'
            # req.get_subject().stateOrProvinceName = 'xxx'
            # req.get_subject().localityName = 'xxx'
            # req.get_subject().organizationName = 'xxx'
            req.set_pubkey(key)
            req.sign(key, "sha256")

            csr_pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM,
                                                      req)
            x509object = cacon.sign_request(csr_pem,
                                            options={"template": template_name})
            certificate = crypto.dump_certificate(crypto.FILETYPE_PEM,
                                                  x509object)
            # Save the private key to the encrypted key field of the token
            s = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
            self.add_tokeninfo("privatekey", s, value_type="password")

        if "pin" in param:
            self.set_pin(param.get("pin"), encrypt=True)

        if certificate:
            self.add_tokeninfo("certificate", certificate)

    @log_with(log)
    def get_init_detail(self, params=None, user=None):
        """
        At the end of the initialization we return the certificate and the
        PKCS12 file, if the private key exists.

        :param params: the parameters of the init request, passed on
            unchanged to the parent class
        :param user: the user, passed on unchanged to the parent class
        :return: the init details of the parent class, extended by the key
            "certificate" and - if a private key is stored in the token
            info - the key "pkcs12" with the base64 encoded PKCS12 file
        :rtype: dict
        """
        response_detail = TokenClass.get_init_detail(self, params, user)
        response_detail["certificate"] = self.get_tokeninfo("certificate")

        # If there is a private key, we dump a PKCS12
        if self.get_tokeninfo("privatekey"):
            response_detail["pkcs12"] = base64.b64encode(
                self._create_pkcs12_bin())

        return response_detail

    def _create_pkcs12_bin(self):
        """
        Helper function to create an encrypted pkcs12 binary for download

        The certificate and the private key are read from the token info,
        the token PIN is used as passphrase.

        :return: PKCS12 binary
        """
        certificate = self.get_tokeninfo("certificate")
        privatekey = self.get_tokeninfo("privatekey")

        pkcs12 = crypto.PKCS12()
        pkcs12.set_certificate(crypto.load_certificate(
            crypto.FILETYPE_PEM, certificate))
        pkcs12.set_privatekey(crypto.load_privatekey(crypto.FILETYPE_PEM,
                                                     privatekey))

        # TODO define a random passphrase and hand it to the user
        passphrase = self.token.get_pin()
        # NOTE(review): -1 is presumably the marker of the database token
        # for "PIN can not be retrieved" - then an empty passphrase is used.
        if passphrase == -1:
            passphrase = ""

        pkcs12_bin = pkcs12.export(passphrase=passphrase)
        return pkcs12_bin

    def get_as_dict(self):
        """
        This returns the token data as a dictionary.
        It is used to display the token list at /token/list.

        The certificate token can add the PKCS12 file if it exists

        :return: The token data as dict
        :rtype: dict
        """
        # first get the database values as dict
        token_dict = self.token.get()

        # Tolerate a missing or empty "info" entry instead of failing on the
        # membership test.
        info = token_dict.get("info") or {}
        if "privatekey" in info:
            token_dict["info"]["pkcs12"] = base64.b64encode(
                self._create_pkcs12_bin())

        return token_dict

    @check_token_locked
    def set_pin(self, pin, encrypt=False):
        """
        set the PIN of a token.
        The PIN of the certificate token is stored encrypted. It is used as
        passphrase for the PKCS12 file.

        :param pin: the pin to be set for the token
        :type pin: basestring
        :param encrypt: accepted for compatibility with the parent class
            interface but not evaluated: the PIN is always stored
            non-hashed, so that it can be retrieved from the database again
        :type encrypt: bool
        """
        # The PIN must be retrievable to serve as the PKCS12 passphrase,
        # so it is never stored hashed.
        storeHashed = False
        self.token.set_pin(pin, storeHashed)

    def revoke(self):
        """
        This revokes the token. We need to determine the CA, which issues the
        certificate, contact the connector and revoke the certificate

        After the revocation a new CRL is created on the CA connector.

        Some token types may revoke a token without locking it.

        :return: the return value of the ``revoke_cert`` call of the CA
            connector
        """
        TokenClass.revoke(self)

        # determine the CA and its connector.
        ti = self.get_tokeninfo()
        ca_specifier = ti.get("CA")
        log.debug("Revoking certificate {0!s} on CA {1!s}.".format(
            self.token.serial, ca_specifier))
        certificate_pem = ti.get("certificate")

        # call CAConnector.revoke_cert()
        ca_obj = get_caconnector_object(ca_specifier)
        revoked = ca_obj.revoke_cert(certificate_pem)
        log.info("Certificate {0!s} revoked on CA {1!s}.".format(revoked,
                                                                ca_specifier))

        # call CAConnector.create_crl()
        crl = ca_obj.create_crl()
        log.info("CRL {0!s} created.".format(crl))

        return revoked