# SPDX-FileCopyrightText: (C) 2025 NetKnights GmbH <https://netknights.it>
# SPDX-FileCopyrightText: (C) 2025 Paul Lettich <paul.lettich@netknights.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
import binascii
import logging
from sqlalchemy import Sequence
from privacyidea.lib.error import ResourceNotFoundError
from privacyidea.models import db
from privacyidea.models.realm import Realm
from privacyidea.models.utils import MethodsMixin
from privacyidea.models.challenge import Challenge
from privacyidea.models.config import SAFE_STORE
from privacyidea.models.tokengroup import Tokengroup, TokenTokengroup
from privacyidea.lib.crypto import (geturandom, encrypt, hexlify_and_unicode,
pass_hash, encryptPin, decryptPin, hash,
verify_pass_hash, SecretObj, encryptPassword)
from privacyidea.lib.framework import get_app_config_value
from privacyidea.lib.utils import convert_column_to_unicode
from privacyidea.lib.log import log_with
log = logging.getLogger(__name__)
[docs]
class TokenCredentialIdHash(MethodsMixin, db.Model):
__tablename__ = "tokencredentialidhash"
id = db.Column("id", db.Integer, db.Identity(), primary_key=True)
credential_id_hash = db.Column(db.String(256), nullable=False)
token_id = db.Column(db.Integer(), db.ForeignKey("token.id"), nullable=False)
__table_args__ = (db.Index('ix_tokencredentialidhash_credentialidhash',
'credential_id_hash', unique=True),)
def __init__(self, credential_id_hash, token_id):
self.credential_id_hash = credential_id_hash
self.token_id = token_id
[docs]
class Token(MethodsMixin, db.Model):
"""
The "Token" table contains the basic token data.
It contains data like
* serial number
* secret key
* PINs
* ...
The table :py:class:`privacyidea.models.TokenOwner` contains the owner
information of the specified token.
The table :py:class:`privacyidea.models.TokenInfo` contains additional information
that is specific to the tokentype.
"""
__tablename__ = 'token'
id = db.Column(db.Integer, Sequence("token_seq"),
primary_key=True,
nullable=False)
description = db.Column(db.Unicode(80), default='')
serial = db.Column(db.Unicode(40), default='',
unique=True,
nullable=False,
index=True)
tokentype = db.Column(db.Unicode(30),
default='HOTP',
index=True)
user_pin = db.Column(db.Unicode(512),
default='') # encrypt
user_pin_iv = db.Column(db.Unicode(32),
default='') # encrypt
so_pin = db.Column(db.Unicode(512),
default='') # encrypt
so_pin_iv = db.Column(db.Unicode(32),
default='') # encrypt
pin_seed = db.Column(db.Unicode(32),
default='')
otplen = db.Column(db.Integer(),
default=6)
pin_hash = db.Column(db.Unicode(512),
default='') # hashed
key_enc = db.Column(db.Unicode(2800),
default='') # encrypt
key_iv = db.Column(db.Unicode(32),
default='')
maxfail = db.Column(db.Integer(),
default=10)
active = db.Column(db.Boolean(),
nullable=False,
default=True)
revoked = db.Column(db.Boolean(),
default=False)
locked = db.Column(db.Boolean(),
default=False)
failcount = db.Column(db.Integer(),
default=0)
count = db.Column(db.Integer(),
default=0)
count_window = db.Column(db.Integer(),
default=10)
sync_window = db.Column(db.Integer(),
default=1000)
rollout_state = db.Column(db.Unicode(10),
default='')
info_list = db.relationship('TokenInfo', lazy='select', backref='token')
# This creates an attribute "token" in the TokenOwner object
owners = db.relationship('TokenOwner', lazy='dynamic', backref='token')
# Container
container = db.relationship('TokenContainer', secondary='tokencontainertoken', back_populates='tokens')
def __init__(self, serial, tokentype="",
isactive=True, otplen=6,
otpkey="",
userid=None, resolver=None, realm=None,
**kwargs):
super(Token, self).__init__(**kwargs)
self.serial = '' + serial
self.tokentype = tokentype
self.count = 0
self.failcount = 0
self.maxfail = 10
self.active = isactive
self.revoked = False
self.locked = False
self.count_window = 10
self.otplen = otplen
self.pin_seed = ""
self.set_otpkey(otpkey)
# also create the user assignment
if userid and resolver and realm:
# We can not create the tokenrealm-connection and owner-connection, yet
# since we need to token_id.
token_id = self.save()
realm_id = Realm.query.filter_by(name=realm).first().id
tr = TokenRealm(realm_id=realm_id, token_id=token_id)
if tr:
db.session.add(tr)
to = TokenOwner(token_id=token_id, user_id=userid, resolver=resolver, realm_id=realm_id)
if to:
db.session.add(to)
if tr or to:
db.session.commit()
@property
def first_owner(self):
return self.owners.first()
@property
def all_owners(self):
return self.owners.all()
@log_with(log)
def delete(self):
from .machine import MachineToken
# some DBs (e.g. DB2) run in a deadlock, if the TokenRealm entry
# is deleted via key relation, so we delete it explicitly
ret = self.id
db.session.query(TokenRealm) \
.filter(TokenRealm.token_id == self.id) \
.delete()
db.session.query(TokenOwner) \
.filter(TokenOwner.token_id == self.id) \
.delete()
for mt in db.session.execute(db.select(MachineToken).filter(MachineToken.token_id == self.id)).scalars():
mt.delete()
db.session.query(Challenge) \
.filter(Challenge.serial == self.serial) \
.delete()
db.session.query(TokenInfo) \
.filter(TokenInfo.token_id == self.id) \
.delete()
db.session.query(TokenTokengroup) \
.filter(TokenTokengroup.token_id == self.id) \
.delete()
if self.tokentype.lower() in ["webauthn", "passkey"]:
db.session.query(TokenCredentialIdHash).filter(TokenCredentialIdHash.token_id == self.id).delete()
db.session.delete(self)
db.session.commit()
return ret
@staticmethod
def _fix_spaces(data):
"""
On MS SQL server empty fields ("") like the info
are returned as a string with a space (" ").
This functions helps to fix this.
Also avoids running into errors, if the data is a None Type.
:param data: a string from the database
:type data: str
:return: a stripped string
:rtype: str
"""
if data:
data = data.strip()
return data
@log_with(log, hide_args=[1])
def set_otpkey(self, otpkey, reset_failcount=True):
iv = geturandom(16)
self.key_enc = encrypt(otpkey, iv)
length = len(self.key_enc)
if length > Token.key_enc.property.columns[0].type.length:
log.error(f"Key for token {self.serial} exceeds database field with length {length}!")
self.key_iv = hexlify_and_unicode(iv)
self.count = 0
if reset_failcount is True:
self.failcount = 0
[docs]
def set_tokengroups(self, tokengroups, add=False):
"""
Set the list of the tokengroups.
This is done by filling the :py:class:`privacyidea.models.TokenTokengroup` table.
:param tokengroups: the tokengroups
:type tokengroups: list[str]
:param add: If set, the tokengroups are added. I.e. old tokengroups are not deleted
:type add: bool
"""
# delete old Tokengroups
if not add:
db.session.query(TokenTokengroup) \
.filter(TokenTokengroup.token_id == self.id) \
.delete()
# add new Tokengroups
# We must not set the same tokengroup more than once...
# uniquify: tokengroups -> set(tokengroups)
for tokengroup in set(tokengroups):
# Get the id of the realm to add
g = Tokengroup.query.filter_by(name=tokengroup).first()
if g:
# Check if TokenTokengroup already exists
tg = TokenTokengroup.query.filter_by(token_id=self.id,
tokengroup_id=g.id).first()
if not tg:
# If the Tokengroup is not yet attached to the token
token_group = TokenTokengroup(token_id=self.id, tokengroup_id=g.id)
db.session.add(token_group)
db.session.commit()
[docs]
def set_realms(self, realms, add=False):
"""
Set the list of the realms.
This is done by filling the :py:class:`privacyidea.models.TokenRealm` table.
:param realms: realms
:type realms: list[str]
:param add: If set, the realms are added. I.e. old realms are not deleted
:type add: bool
"""
# delete old TokenRealms
if not add:
db.session.query(TokenRealm).filter(TokenRealm.token_id == self.id).delete()
# add new TokenRealms
# We must not set the same realm more than once...
# uniquify: realms -> set(realms)
if self.first_owner and self.first_owner.realm:
if self.first_owner.realm.name not in realms:
realms.append(self.first_owner.realm.name)
log.info(f"The realm of an assigned user cannot be removed from "
f"token {self.first_owner.token.serial} "
f"(realm: {self.first_owner.realm.name})")
for realm in set(realms):
# Get the id of the realm to add
realm_db = Realm.query.filter_by(name=realm).first()
if realm_db:
# Check if tokenrealm already exists
token_realm_db = TokenRealm.query.filter_by(token_id=self.id, realm_id=realm_db.id).first()
if not token_realm_db:
# If the realm is not yet attached to the token
token_realm = TokenRealm(token_id=self.id, realm_id=realm_db.id)
db.session.add(token_realm)
db.session.commit()
[docs]
def get_realms(self):
"""
return a list of the assigned realms
:return: the realms of the token
:rtype: list
"""
realms = []
for tokenrealm in self.realm_list:
realms.append(tokenrealm.realm.name)
return realms
@log_with(log)
def set_user_pin(self, user_pin):
iv = geturandom(16)
self.user_pin = encrypt(user_pin, iv)
self.user_pin_iv = hexlify_and_unicode(iv)
@log_with(log)
def get_otpkey(self):
key = binascii.unhexlify(self.key_enc)
iv = binascii.unhexlify(self.key_iv)
secret = SecretObj(key, iv)
return secret
[docs]
@log_with(log)
def get_user_pin(self):
"""
return the user_pin
:rtype : the PIN as a secretObject
"""
user_pin = self.user_pin or ''
user_pin_iv = self.user_pin_iv or ''
key = binascii.unhexlify(user_pin)
iv = binascii.unhexlify(user_pin_iv)
secret = SecretObj(key, iv)
return secret
[docs]
def set_hashed_pin(self, pin):
"""
Set the pin of the token in hashed format
:param pin: the pin to hash
:type pin: str
:return: the hashed pin
:rtype: str
"""
self.pin_hash = pass_hash(pin)
return self.pin_hash
[docs]
def get_hashed_pin(self, pin):
"""
Calculate a hash from a pin
Fix for working with MS SQL servers
MS SQL servers sometimes return a '<space>' when the
column is empty: ''
:param pin: the pin to hash
:type pin: str
:return: hashed pin with current pin_seed
:rtype: str
"""
seed_str = self._fix_spaces(self.pin_seed)
seed = binascii.unhexlify(seed_str)
hashed_pin = hash(pin, seed)
log.debug(f"hashed_pin: {hashed_pin}, pin: {pin!r}, seed: {self.pin_seed}")
return hashed_pin
@log_with(log)
def set_description(self, desc):
if desc is None:
desc = ""
length = len(desc)
if length > Token.description.property.columns[0].type.length:
desc = desc[:Token.description.property.columns[0].type.length]
self.description = convert_column_to_unicode(desc)
return self.description
[docs]
def set_pin(self, pin, hashed=True):
"""
Set the OTP pin in a hashed way
"""
real_pin = pin or ""
if hashed is True:
self.set_hashed_pin(real_pin)
log.debug(f"set_pin hash: {self.pin_hash!r}")
else:
self.pin_hash = "@@" + encryptPin(real_pin)
log.debug(f"set_pin encrypted: {self.pin_hash!r}")
return self.pin_hash
def check_pin(self, pin):
res = False
# check for a valid input
if pin is not None:
if self.is_pin_encrypted() is True:
log.debug("we got an encrypted PIN!")
token_pin = self.pin_hash[2:]
decrypted_token_pin = decryptPin(token_pin)
if decrypted_token_pin == pin:
res = True
else:
log.debug("we got a hashed PIN!")
if self.pin_hash:
try:
# New PIN verification
return verify_pass_hash(pin, self.pin_hash)
except ValueError as _e:
# old PIN verification
pin_hash = self.get_hashed_pin(pin)
else:
pin_hash = pin
if pin_hash == (self.pin_hash or ""):
res = True
return res
def is_pin_encrypted(self, pin=None):
ret = False
if pin is None:
pin = self.pin_hash or ""
if pin.startswith("@@"):
ret = True
return ret
def get_pin(self):
ret = -1
if self.is_pin_encrypted() is True:
token_pin = self.pin_hash[2:]
ret = decryptPin(token_pin)
return ret
[docs]
def set_so_pin(self, security_officer_pin):
"""
For smartcards this sets the security officer pin of the token
:rtype : None
"""
iv = geturandom(16)
self.so_pin = encrypt(security_officer_pin, iv)
self.so_pin_iv = hexlify_and_unicode(iv)
return self.so_pin, self.so_pin_iv
[docs]
@log_with(log)
def get(self, key=None, fallback=None, save=False):
"""
simulate the dict behaviour to make challenge processing
easier, as this will have to deal as well with
'dict only challenges'
:param key: the attribute name - in case of key is not provided, a dict
of all class attributes are returned
:param fallback: if the attribute is not found,
the fallback is returned
:param save: in case of all attributes and save==True, the timestamp is
converted to a string representation
"""
if key is None:
return self.get_vars(save=save)
td = self.get_vars(save=save)
return td.get(key, fallback)
@log_with(log)
def get_vars(self, save=False):
log.debug('get_vars()')
tokenowner = self.first_owner
ret = {
'id': self.id,
'description': self.description,
'serial': self.serial,
'tokentype': self.tokentype,
'info': self.get_info(),
'resolver': "" if not tokenowner else tokenowner.resolver,
'user_id': "" if not tokenowner else tokenowner.user_id,
'otplen': self.otplen,
'maxfail': self.maxfail,
'active': self.active,
'revoked': self.revoked,
'locked': self.locked,
'failcount': self.failcount,
'count': self.count,
'count_window': self.count_window,
'sync_window': self.sync_window,
'rollout_state': self.rollout_state}
# list of Realm names
realm_list = []
for realm_entry in self.realm_list:
realm_list.append(realm_entry.realm.name)
ret['realms'] = realm_list
# list of tokengroups
tokengroup_list = []
for tg_entry in self.tokengroup_list:
tokengroup_list.append(tg_entry.tokengroup.name)
ret['tokengroup'] = tokengroup_list
return ret
def __str__(self):
return self.serial
def __repr__(self):
"""
return the token state as text
:return: token state as string representation
:rtype: str
"""
ldict = {}
for attr in self.__dict__:
key = "{0!r}".format(attr)
val = "{0!r}".format(getattr(self, attr))
ldict[key] = val
res = "<{0!r} {1!r}>".format(self.__class__, ldict)
return res
[docs]
def set_info(self, info):
"""
Set the additional token info for this token
Entries that end with ".type" are used as type for the keys.
I.e. two entries sshkey="XYZ" and sshkey.type="password" will store
the key sshkey as type "password".
:param info: The key-values to set for this token
:type info: dict
"""
if not self.id:
# If there is no ID to reference the token, we need to save the token
self.save()
types = {}
for k, v in info.items():
if k.endswith(".type"):
key = ".".join(k.split(".")[:-1])
types[key] = v
if v == "password":
# If the type is password, we need to encrypt the value
# as it is a secret.
info[key] = encryptPassword(info[key])
for k, v in info.items():
if not k.endswith(".type"):
TokenInfo(self.id, k, v, Type=types.get(k)).save(persistent=False)
db.session.commit()
[docs]
def del_info(self, key=None):
"""
Deletes tokeninfo for a given token.
If the key is omitted, all Tokeninfo is deleted.
:param key: searches for the given key to delete the entry
:return:
"""
if key:
tokeninfos = TokenInfo.query.filter_by(token_id=self.id, Key=key)
else:
tokeninfos = TokenInfo.query.filter_by(token_id=self.id)
for ti in tokeninfos:
ti.delete()
[docs]
def del_tokengroup(self, tokengroup=None, tokengroup_id=None):
"""
Deletes the tokengroup from the given token.
If tokengroup name and id are omitted, all tokengroups are deleted.
:param tokengroup: The name of the tokengroup
:type tokengroup: str
:param tokengroup_id: The id of the tokengroup
:type tokengroup_id: int
:return:
"""
if tokengroup:
# We need to resolve the id of the tokengroup
t = Tokengroup.query.filter_by(name=tokengroup).first()
if not t:
raise Exception("tokengroup does not exist")
tokengroup_id = t.id
if tokengroup_id:
tokengroups = TokenTokengroup.query.filter_by(tokengroup_id=tokengroup_id, token_id=self.id)
else:
tokengroups = TokenTokengroup.query.filter_by(token_id=self.id)
for tokengroup in tokengroups:
tokengroup.delete()
[docs]
def get_info(self):
"""
:return: The token info as dictionary
"""
ret = {}
for tokeninfo in self.info_list:
if tokeninfo.Type:
ret[tokeninfo.Key + ".type"] = tokeninfo.Type
ret[tokeninfo.Key] = tokeninfo.Value
return ret
[docs]
def update_type(self, typ):
"""
in case the previous has been different type
we must reset the counters
But be aware, ray, this could also be upper and lower case mixing...
"""
if self.tokentype.lower() != typ.lower():
self.count = 0
self.failcount = 0
self.tokentype = typ
return
[docs]
def update_otpkey(self, otpkey):
"""
in case of a new hOtpKey we have to do some more things
"""
if otpkey is not None:
otp_key_secret = self.get_otpkey()
if otp_key_secret.compare(otpkey) is False:
log.debug('update token OtpKey - counter reset')
self.set_otpkey(otpkey)
def update_token(self, description=None, otpkey=None, pin=None):
if description is not None:
self.set_description(description)
if pin is not None:
self.set_pin(pin)
if otpkey is not None:
self.update_otpkey(otpkey)
[docs]
class TokenInfo(MethodsMixin, db.Model):
"""
The table "tokeninfo" is used to store additional, long information that
is specific to the tokentype.
E.g. the tokentype "TOTP" has additional entries in the tokeninfo table
for "timeStep" and "timeWindow", which are stored in the
column "Key" and "Value".
The tokeninfo is reference by the foreign key to the "token" table.
"""
__tablename__ = 'tokeninfo'
id = db.Column(db.Integer, Sequence("tokeninfo_seq"), primary_key=True)
Key = db.Column(db.Unicode(255),
nullable=False)
Value = db.Column(db.UnicodeText(), default='')
Type = db.Column(db.Unicode(100), default='')
Description = db.Column(db.Unicode(2000), default='')
token_id = db.Column(db.Integer(),
db.ForeignKey('token.id'), index=True)
__table_args__ = (db.UniqueConstraint('token_id',
'Key',
name='tiix_2'),)
def __init__(self, token_id, Key, Value, Type=None, Description=None):
"""
Create a new tokeninfo for a given token_id
"""
self.token_id = token_id
self.Key = Key
self.Value = convert_column_to_unicode(Value)
self.Type = Type
self.Description = Description
def save(self, persistent=True):
ti_func = TokenInfo.query.filter_by(token_id=self.token_id, Key=self.Key).first
ti = ti_func()
if ti is None:
# create a new one
db.session.add(self)
db.session.commit()
if get_app_config_value(SAFE_STORE, False):
ti = ti_func()
ret = ti.id
else:
ret = self.id
else:
# update
TokenInfo.query.filter_by(token_id=self.token_id,
Key=self.Key).update({'Value': self.Value,
'Description': self.Description,
'Type': self.Type})
ret = ti.id
if persistent:
db.session.commit()
return ret
[docs]
class TokenOwner(MethodsMixin, db.Model):
"""
This tables stores the owner of a token.
A token can be assigned to several users.
"""
__tablename__ = 'tokenowner'
id = db.Column(db.Integer(), Sequence("tokenowner_seq"), primary_key=True)
token_id = db.Column(db.Integer(), db.ForeignKey('token.id'))
resolver = db.Column(db.Unicode(120), default='', index=True)
user_id = db.Column(db.Unicode(320), default='', index=True)
realm_id = db.Column(db.Integer(), db.ForeignKey('realm.id'))
# This creates an attribute "tokenowners" in the realm objects
realm = db.relationship('Realm', lazy='joined', backref='tokenowners')
def __init__(self, token_id=None, serial=None, user_id=None, resolver=None,
realm_id=None, realmname=None):
"""
Create a new token assignment to a user.
:param token_id: The database ID of the token
:param serial: The alternate serial number of the token
:param resolver: The identifying name of the resolver
:param realm_id: The database ID of the realm
:param realmname: The alternate name of realm
"""
if realm_id is not None:
self.realm_id = realm_id
elif realmname:
realm = Realm.query.filter_by(name=realmname).first()
if not realm:
raise ResourceNotFoundError(f"Realm '{realmname}' does not exist.")
self.realm_id = realm.id
if token_id is not None:
self.token_id = token_id
elif serial:
token = Token.query.filter_by(serial=serial).first()
if not token: # pragma: no cover
# usually this is already covered by the lib / token class functions
raise ResourceNotFoundError(f"Token with serial '{serial}' does not exist.")
self.token_id = token.id
self.resolver = resolver
self.user_id = user_id
def save(self, persistent=True):
to_func = TokenOwner.query.filter_by(token_id=self.token_id,
user_id=self.user_id,
realm_id=self.realm_id,
resolver=self.resolver).first
to = to_func()
if to is None:
# This very assignment does not exist, yet:
db.session.add(self)
db.session.commit()
if get_app_config_value(SAFE_STORE, False):
to = to_func()
ret = to.id
else:
ret = self.id
else:
ret = to.id
# There is nothing to update
if persistent:
db.session.commit()
return ret
[docs]
class TokenRealm(MethodsMixin, db.Model):
"""
This table stores to which realms a token is assigned. A token is in the
realm of the user it is assigned to. But a token can also be put into
many additional realms.
"""
__tablename__ = 'tokenrealm'
id = db.Column(db.Integer(), Sequence("tokenrealm_seq"), primary_key=True)
token_id = db.Column(db.Integer(),
db.ForeignKey('token.id'))
realm_id = db.Column(db.Integer(),
db.ForeignKey('realm.id'))
# This creates an attribute "realm_list" in the Token object
token = db.relationship('Token',
lazy='joined',
backref='realm_list')
# This creates an attribute "token_list" in the Realm object
realm = db.relationship('Realm',
lazy='joined',
backref='token_list')
__table_args__ = (db.UniqueConstraint('token_id',
'realm_id',
name='trix_2'),)
def __init__(self, realm_id=0, token_id=0, realmname=None):
"""
Create a new TokenRealm entry.
:param realm_id: The id of the realm
:param token_id: The id of the token
"""
log.debug("setting realm_id to {0:d}".format(realm_id))
if realmname:
r = Realm.query.filter_by(name=realmname).first()
self.realm_id = r.id
if realm_id:
self.realm_id = realm_id
self.token_id = token_id
[docs]
def get_token_id(serial):
"""
Return the database token ID for a given serial number
:param serial:
:return: token ID
:rtpye: int
"""
token = Token.query.filter(Token.serial == serial).first()
return token.id