# SPDX-FileCopyrightText: (C) 2025 NetKnights GmbH <https://netknights.it>
# SPDX-FileCopyrightText: (C) 2025 Paul Lettich <paul.lettich@netknights.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
import binascii
import logging
from typing import TYPE_CHECKING
from sqlalchemy import Sequence, Unicode, Integer, Boolean, select, UnicodeText
from sqlalchemy.orm import Mapped, mapped_column, relationship
if TYPE_CHECKING:
from privacyidea.models.machine import MachineToken
from privacyidea.models.tokengroup import Tokengroup
from privacyidea.lib.crypto import (geturandom, encrypt, hexlify_and_unicode,
pass_hash, encryptPin, decryptPin, hash,
verify_pass_hash, SecretObj)
from privacyidea.lib.error import ResourceNotFoundError
from privacyidea.lib.log import log_with
from privacyidea.lib.utils import convert_column_to_unicode
from privacyidea.models import db
from privacyidea.models.realm import Realm
from privacyidea.models.utils import MethodsMixin
log = logging.getLogger(__name__)
[docs]
class TokenCredentialIdHash(MethodsMixin, db.Model):
__tablename__ = "tokencredentialidhash"
id: Mapped[int] = mapped_column("id", Integer, Sequence("tokencredentialidhash_seq"), primary_key=True)
credential_id_hash: Mapped[str] = mapped_column(db.String(256), nullable=False)
token_id: Mapped[int] = mapped_column(db.ForeignKey("token.id"), nullable=False)
__table_args__ = (db.Index('ix_tokencredentialidhash_credentialidhash',
'credential_id_hash', unique=True),)
def __init__(self, credential_id_hash, token_id):
self.credential_id_hash = credential_id_hash
self.token_id = token_id
[docs]
class Token(MethodsMixin, db.Model):
"""
The "Token" table contains the basic token data.
It contains data like
* serial number
* secret key
* PINs
* ...
The table :py:class:`privacyidea.models.TokenOwner` contains the owner
information of the specified token.
The table :py:class:`privacyidea.models.TokenInfo` contains additional information
that is specific to the tokentype.
"""
__tablename__ = 'token'
id: Mapped[int] = mapped_column(Integer, Sequence("token_seq"),
primary_key=True,
nullable=False)
description: Mapped[str | None] = mapped_column(Unicode(80), default='')
serial: Mapped[str] = mapped_column(Unicode(40), default='',
unique=True,
nullable=False,
index=True)
tokentype: Mapped[str | None] = mapped_column(Unicode(30),
default='HOTP',
index=True)
user_pin: Mapped[str | None] = mapped_column(Unicode(512),
default='') # encrypt
user_pin_iv: Mapped[str | None] = mapped_column(Unicode(32),
default='') # encrypt
so_pin: Mapped[str | None] = mapped_column(Unicode(512),
default='') # encrypt
so_pin_iv: Mapped[str | None] = mapped_column(Unicode(32),
default='') # encrypt
pin_seed: Mapped[str | None] = mapped_column(Unicode(32),
default='')
otplen: Mapped[int | None] = mapped_column(Integer,
default=6)
pin_hash: Mapped[str | None] = mapped_column(Unicode(512),
default='') # hashed
key_enc: Mapped[str | None] = mapped_column(Unicode(2800),
default='') # encrypt
key_iv: Mapped[str | None] = mapped_column(Unicode(32),
default='')
maxfail: Mapped[int | None] = mapped_column(Integer,
default=10)
active: Mapped[bool] = mapped_column(Boolean,
nullable=False,
default=True)
revoked: Mapped[bool | None] = mapped_column(Boolean,
default=False)
locked: Mapped[bool | None] = mapped_column(Boolean,
default=False)
failcount: Mapped[int | None] = mapped_column(Integer,
default=0)
count: Mapped[int | None] = mapped_column(Integer,
default=0)
count_window: Mapped[int | None] = mapped_column(Integer,
default=10)
sync_window: Mapped[int | None] = mapped_column(Integer,
default=1000)
rollout_state: Mapped[str | None] = mapped_column(Unicode(10),
default='')
info_list = relationship('TokenInfo', lazy='select', back_populates='token', cascade="all, delete-orphan")
owners = relationship('TokenOwner', lazy='dynamic', back_populates='token', cascade="all, delete-orphan")
# Container
container = relationship('TokenContainer', secondary='tokencontainertoken', back_populates='tokens')
# This creates an attribute "realm_list" in the Token object
# TODO: could be updated to a modern relationship that stores a list of realms here and not of the association
# table TokenRealm (requires changes in the token query, etc.)
realm_list = relationship('TokenRealm', lazy='joined', back_populates='token')
tokengroup_list: Mapped[list['Tokengroup']] = relationship('Tokengroup', secondary='tokentokengroup',
back_populates='tokens', single_parent=True)
machine_list: Mapped[list['MachineToken']] = relationship('MachineToken', back_populates='token',
cascade="all, delete-orphan")
def __init__(self, serial, tokentype="",
isactive=True, otplen=6,
otpkey="",
**kwargs):
super().__init__(**kwargs)
self.serial = '' + serial
self.tokentype = tokentype
self.count = 0
self.failcount = 0
self.maxfail = 10
self.active = isactive
self.revoked = False
self.locked = False
self.count_window = 10
self.otplen = otplen
self.pin_seed = ""
self.set_otpkey(otpkey)
@property
def first_owner(self):
return self.owners.first()
@property
def all_owners(self):
return self.owners.all()
@staticmethod
def _fix_spaces(data):
"""
On MS SQL server empty fields ("") like the info
are returned as a string with a space (" ").
This functions helps to fix this.
Also avoids running into errors, if the data is a None Type.
:param data: a string from the database
:type data: str
:return: a stripped string
:rtype: str
"""
if data:
data = data.strip()
return data
@log_with(log, hide_args=[1])
def set_otpkey(self, otpkey, reset_failcount=True):
iv = geturandom(16)
self.key_enc = encrypt(otpkey, iv)
length = len(self.key_enc)
if length > Token.key_enc.property.columns[0].type.length:
log.error(f"Key for token {self.serial} exceeds database field with length {length}!")
self.key_iv = hexlify_and_unicode(iv)
self.count = 0
if reset_failcount is True:
self.failcount = 0
[docs]
def get_realms(self):
"""
return a list of the assigned realms
:return: the realms of the token
:rtype: list
"""
realms = []
for tokenrealm in self.realm_list:
realms.append(tokenrealm.realm.name)
return realms
@log_with(log)
def set_user_pin(self, user_pin):
iv = geturandom(16)
self.user_pin = encrypt(user_pin, iv)
self.user_pin_iv = hexlify_and_unicode(iv)
@log_with(log)
def get_otpkey(self):
key = binascii.unhexlify(self.key_enc)
iv = binascii.unhexlify(self.key_iv)
secret = SecretObj(key, iv)
return secret
[docs]
@log_with(log)
def get_user_pin(self):
"""
return the user_pin
:rtype : the PIN as a secretObject
"""
user_pin = self.user_pin or ''
user_pin_iv = self.user_pin_iv or ''
key = binascii.unhexlify(user_pin)
iv = binascii.unhexlify(user_pin_iv)
secret = SecretObj(key, iv)
return secret
[docs]
def set_hashed_pin(self, pin):
"""
Set the pin of the token in hashed format
:param pin: the pin to hash
:type pin: str
:return: the hashed pin
:rtype: str
"""
self.pin_hash = pass_hash(pin)
return self.pin_hash
[docs]
def get_hashed_pin(self, pin):
"""
Calculate a hash from a pin
Fix for working with MS SQL servers
MS SQL servers sometimes return a '<space>' when the
column is empty: ''
:param pin: the pin to hash
:type pin: str
:return: hashed pin with current pin_seed
:rtype: str
"""
seed_str = self._fix_spaces(self.pin_seed)
seed = binascii.unhexlify(seed_str)
hashed_pin = hash(pin, seed)
log.debug(f"hashed_pin: {hashed_pin}, pin: {pin!r}, seed: {self.pin_seed}")
return hashed_pin
@log_with(log)
def set_description(self, desc):
if desc is None:
desc = ""
length = len(desc)
if length > Token.description.property.columns[0].type.length:
desc = desc[:Token.description.property.columns[0].type.length]
self.description = convert_column_to_unicode(desc)
return self.description
[docs]
def set_pin(self, pin, hashed=True):
"""
Set the OTP pin in a hashed way
"""
real_pin = pin or ""
if hashed:
self.set_hashed_pin(real_pin)
log.debug(f"set_pin hash: {self.pin_hash!r}")
else:
self.pin_hash = "@@" + encryptPin(real_pin)
log.debug(f"set_pin encrypted: {self.pin_hash!r}")
return self.pin_hash
def check_pin(self, pin):
res = False
# check for a valid input
if pin is not None:
if self.is_pin_encrypted() is True:
log.debug("we got an encrypted PIN!")
token_pin = self.pin_hash[2:]
decrypted_token_pin = decryptPin(token_pin)
if decrypted_token_pin == pin:
res = True
else:
log.debug("we got a hashed PIN!")
if self.pin_hash:
try:
# New PIN verification
return verify_pass_hash(pin, self.pin_hash)
except ValueError as _e:
# old PIN verification
pin_hash = self.get_hashed_pin(pin)
else:
pin_hash = pin
if pin_hash == (self.pin_hash or ""):
res = True
return res
def is_pin_encrypted(self, pin=None):
ret = False
if pin is None:
pin = self.pin_hash or ""
if pin.startswith("@@"):
ret = True
return ret
def get_pin(self):
ret = -1
if self.is_pin_encrypted() is True:
token_pin = self.pin_hash[2:]
ret = decryptPin(token_pin)
return ret
[docs]
def set_so_pin(self, security_officer_pin):
"""
For smartcards this sets the security officer pin of the token
:rtype : None
"""
iv = geturandom(16)
self.so_pin = encrypt(security_officer_pin, iv)
self.so_pin_iv = hexlify_and_unicode(iv)
return self.so_pin, self.so_pin_iv
[docs]
@log_with(log)
def get(self, key=None, fallback=None, save=False):
"""
simulate the dict behaviour to make challenge processing
easier, as this will have to deal as well with
'dict only challenges'
:param key: the attribute name - in case of key is not provided, a dict
of all class attributes are returned
:param fallback: if the attribute is not found,
the fallback is returned
:param save: in case of all attributes and save==True, the timestamp is
converted to a string representation
"""
if key is None:
return self.get_vars(save=save)
td = self.get_vars(save=save)
return td.get(key, fallback)
@log_with(log)
def get_vars(self, save=False):
log.debug('get_vars()')
tokenowner = self.first_owner
ret = {
'id': self.id,
'description': self.description,
'serial': self.serial,
'tokentype': self.tokentype,
'info': self.get_info(),
'resolver': "" if not tokenowner else tokenowner.resolver,
'user_id': "" if not tokenowner else tokenowner.user_id,
'otplen': self.otplen,
'maxfail': self.maxfail,
'active': self.active,
'revoked': self.revoked,
'locked': self.locked,
'failcount': self.failcount,
'count': self.count,
'count_window': self.count_window,
'sync_window': self.sync_window,
'rollout_state': self.rollout_state}
# list of Realm names
realm_list = []
for realm_entry in self.realm_list:
realm_list.append(realm_entry.realm.name)
ret['realms'] = realm_list
# list of tokengroups
tokengroup_list = []
for token_group in self.tokengroup_list:
tokengroup_list.append(token_group.name)
ret['tokengroup'] = tokengroup_list
return ret
def __str__(self):
return self.serial
def __repr__(self):
"""
return the token state as text
:return: token state as string representation
:rtype: str
"""
ldict = {}
for attr in self.__dict__:
key = f"{attr!r}"
val = f"{getattr(self, attr)!r}"
ldict[key] = val
res = f"<{self.__class__!r} {ldict!r}>"
return res
[docs]
def get_info(self):
"""
:return: The token info as dictionary
"""
ret = {}
for tokeninfo in self.info_list:
if tokeninfo.Type:
ret[tokeninfo.Key + ".type"] = tokeninfo.Type
ret[tokeninfo.Key] = tokeninfo.Value
return ret
[docs]
def update_type(self, typ):
"""
in case the previous has been different type
we must reset the counters
But be aware, ray, this could also be upper and lower case mixing...
"""
if self.tokentype.lower() != typ.lower():
self.count = 0
self.failcount = 0
self.tokentype = typ
return
[docs]
def update_otpkey(self, otpkey):
"""
in case of a new hOtpKey we have to do some more things
"""
if otpkey is not None:
otp_key_secret = self.get_otpkey()
if otp_key_secret.compare(otpkey) is False:
log.debug('update token OtpKey - counter reset')
self.set_otpkey(otpkey)
def update_token(self, description=None, otpkey=None, pin=None):
if description is not None:
self.set_description(description)
if pin is not None:
self.set_pin(pin)
if otpkey is not None:
self.update_otpkey(otpkey)
[docs]
class TokenInfo(MethodsMixin, db.Model):
"""
The table "tokeninfo" is used to store additional, long information that
is specific to the tokentype.
E.g. the tokentype "TOTP" has additional entries in the tokeninfo table
for "timeStep" and "timeWindow", which are stored in the
column "Key" and "Value".
The tokeninfo is reference by the foreign key to the "token" table.
"""
__tablename__ = 'tokeninfo'
id: Mapped[int] = mapped_column(Integer, Sequence("tokeninfo_seq"), primary_key=True)
Key: Mapped[str] = mapped_column(Unicode(255), nullable=False)
Value: Mapped[str | None] = mapped_column(UnicodeText(), default='')
Type: Mapped[str | None] = mapped_column(Unicode(100), default='')
Description: Mapped[str | None] = mapped_column(Unicode(2000), default='')
token_id: Mapped[int | None] = mapped_column(Integer, db.ForeignKey('token.id'), index=True)
token = relationship("Token", back_populates="info_list")
__table_args__ = (db.UniqueConstraint('token_id',
'Key',
name='tiix_2'),)
def __init__(self, token_id, Key, Value, Type=None, Description=None):
"""
Create a new tokeninfo for a given token_id
"""
self.token_id = token_id
self.Key = Key
self.Value = convert_column_to_unicode(Value)
self.Type = Type
self.Description = Description
[docs]
class TokenOwner(MethodsMixin, db.Model):
"""
This tables stores the owner of a token.
A token can be assigned to several users.
"""
__tablename__ = 'tokenowner'
id: Mapped[int] = mapped_column(Integer, Sequence("tokenowner_seq"), primary_key=True)
token_id: Mapped[int | None] = mapped_column(Integer, db.ForeignKey('token.id'))
resolver: Mapped[str | None] = mapped_column(Unicode(120), default='', index=True)
user_id: Mapped[str | None] = mapped_column(Unicode(320), default='', index=True)
realm_id: Mapped[int | None] = mapped_column(Integer, db.ForeignKey('realm.id'))
token = relationship('Token', lazy='joined', back_populates='owners')
realm = relationship('Realm', lazy='joined', back_populates='tokenowners')
def __init__(self, token_id=None, serial=None, user_id=None, resolver=None,
realm_id=None, realmname=None):
"""
Create a new token assignment to a user.
:param token_id: The database ID of the token
:param serial: The alternate serial number of the token
:param resolver: The identifying name of the resolver
:param realm_id: The database ID of the realm
:param realmname: The alternate name of realm
"""
if realm_id is not None:
self.realm_id = realm_id
elif realmname:
stmt = select(Realm).filter_by(name=realmname)
r = db.session.execute(stmt).scalar_one_or_none()
if not r:
raise ResourceNotFoundError(f"Realm '{realmname}' does not exist.")
self.realm_id = r.id
if token_id is not None:
self.token_id = token_id
elif serial:
stmt = select(Token).filter_by(serial=serial)
r = db.session.execute(stmt).scalar_one_or_none()
if not r: # pragma: no cover
# usually this is already covered by the lib / token class functions
raise ResourceNotFoundError(f"Token with serial '{serial}' does not exist.")
self.token_id = r.id
self.resolver = resolver
self.user_id = user_id
[docs]
class TokenRealm(MethodsMixin, db.Model):
"""
This table stores to which realms a token is assigned. A token is in the
realm of the user it is assigned to. But a token can also be put into
many additional realms.
"""
__tablename__ = 'tokenrealm'
id: Mapped[int] = mapped_column(Integer, Sequence("tokenrealm_seq"), primary_key=True)
token_id: Mapped[int | None] = mapped_column(Integer,
db.ForeignKey('token.id'))
realm_id: Mapped[int | None] = mapped_column(Integer,
db.ForeignKey('realm.id'))
# This creates an attribute "realm_list" in the Token object
token = relationship('Token',
lazy='joined',
back_populates='realm_list')
# This creates an attribute "token_list" in the Realm object
realm = relationship('Realm',
lazy='joined',
back_populates='token_list')
__table_args__ = (db.UniqueConstraint('token_id',
'realm_id',
name='trix_2'),)
def __init__(self, realm_id=0, token_id=0, realmname=None):
log.debug(f"setting realm_id to {realm_id}")
if realm_id:
self.realm_id = realm_id
elif realmname:
stmt = select(Realm).filter_by(name=realmname)
r = db.session.execute(stmt).scalar_one_or_none()
self.realm_id = r.id
self.token_id = token_id
def get_token_id(serial):
stmt = select(Token).filter(Token.serial == serial)
token = db.session.scalars(stmt).unique().one_or_none()
return token.id if token else None