# Source code for privacyidea.models.machine

# SPDX-FileCopyrightText: (C) 2025 NetKnights GmbH <https://netknights.it>
# SPDX-FileCopyrightText: (C) 2025 Paul Lettich <paul.lettich@netknights.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
import logging

from sqlalchemy import (
    Sequence,
    and_,
    select,
    Unicode,
    Integer,
    ForeignKey,
    UniqueConstraint
)
from sqlalchemy.orm import Mapped, mapped_column, relationship

from privacyidea.lib.log import log_with
from privacyidea.lib.utils import convert_column_to_unicode
# Project-internal imports: database handle, token model and model helpers
from privacyidea.models import db
from privacyidea.models.token import Token, get_token_id
from privacyidea.models.utils import MethodsMixin

log = logging.getLogger(__name__)


class MachineResolver(MethodsMixin, db.Model):
    """
    This model holds the definition of a machine store.

    Machines could be located in flat files, an LDAP directory, in puppet
    services or in other sources. The usual MachineResolver just holds a
    name and a type; its configuration entries are reachable through the
    ``rconfig`` relationship to ``MachineResolverConfig``.
    """
    __tablename__ = 'machineresolver'
    # Primary key, drawn from the "machineresolver_seq" sequence.
    id: Mapped[int] = mapped_column(Integer, Sequence("machineresolver_seq"),
                                    primary_key=True, nullable=False)
    # Name of the resolver; declared unique, so it identifies the resolver.
    name: Mapped[str] = mapped_column(Unicode(255), default="",
                                      unique=True, nullable=False)
    # Type of the resolver, stored as a plain string.
    rtype: Mapped[str] = mapped_column(Unicode(255), default="",
                                       nullable=False)
    # The cascade option handles automatic deletion of related
    # MachineResolverConfig when a MachineResolver is deleted.
    # The backref adds a "machineresolver" attribute to each config entry.
    rconfig = relationship('MachineResolverConfig',
                           lazy='dynamic',
                           backref='machineresolver',
                           cascade="all, delete-orphan")

    def __init__(self, name: str, rtype: str) -> None:
        """
        Create a new machine resolver definition.

        :param name: The name of the machine resolver
        :param rtype: The type of the machine resolver
        """
        self.name = name
        self.rtype = rtype
class MachineResolverConfig(db.Model):
    """
    A single configuration entry of a machine resolver.

    One machine resolver can own several of these entries. Every entry
    refers to its resolver via ``resolver_id``; the combination of
    ``resolver_id`` and ``Key`` is unique.
    """
    __tablename__ = 'machineresolverconfig'
    __table_args__ = (UniqueConstraint('resolver_id', 'Key', name='mrcix_2'),)

    id: Mapped[int] = mapped_column(
        Integer, Sequence("machineresolverconf_seq"), primary_key=True
    )
    resolver_id: Mapped[int | None] = mapped_column(
        Integer, ForeignKey('machineresolver.id')
    )
    Key: Mapped[str] = mapped_column(Unicode(255), nullable=False)
    Value: Mapped[str | None] = mapped_column(Unicode(2000), default='')
    Type: Mapped[str | None] = mapped_column(Unicode(2000), default='')
    Description: Mapped[str | None] = mapped_column(Unicode(2000), default='')

    def __init__(self, resolver_id=None, Key=None, Value=None, resolver=None,
                 Type="", Description=""):
        """
        Create a configuration entry for a machine resolver.

        The resolver can be given either by its database id or by its name.
        If both are passed, the id takes precedence. If neither is passed,
        ``resolver_id`` is not set by the constructor.

        :param resolver_id: The database id of the machine resolver
        :param Key: The name of the configuration entry
        :param Value: The value of the configuration entry
        :param resolver: The name of the machine resolver
        :param Type: The type of the configuration entry
        :param Description: A description of the configuration entry
        """
        if resolver_id:
            self.resolver_id = resolver_id
        elif resolver:
            # Only the name is known: look up the matching id. The result
            # is None if no resolver with this name exists.
            id_lookup = select(MachineResolver.id).where(
                MachineResolver.name == resolver
            )
            self.resolver_id = db.session.execute(id_lookup).scalar_one_or_none()

        self.Key = Key
        self.Value = convert_column_to_unicode(Value)
        self.Type = Type
        self.Description = Description
class MachineToken(MethodsMixin, db.Model):
    """
    Assignment of a token and an application type to a machine.

    A machine is identified by the tuple of ``machineresolver_id`` and
    ``machine_id``, where the ``machine_id`` is defined by the machine
    resolver. This can be an n:m mapping.
    """
    __tablename__ = 'machinetoken'

    id: Mapped[int] = mapped_column(
        Integer, Sequence("machinetoken_seq"), primary_key=True, nullable=False
    )
    token_id: Mapped[int | None] = mapped_column(Integer, ForeignKey('token.id'))
    machineresolver_id: Mapped[int | None] = mapped_column(Integer)
    machine_id: Mapped[str | None] = mapped_column(Unicode(255))
    application: Mapped[str | None] = mapped_column(Unicode(64))

    # Connects the machine with the token; the other side of this
    # relationship is the "machine_list" attribute of the token.
    token = relationship('Token', lazy='joined', back_populates='machine_list')
    # Deleting a MachineToken also deletes its MachineTokenOptions
    # (cascade="all, delete-orphan").
    option_list = relationship(
        'MachineTokenOptions',
        lazy='joined',
        backref='machinetoken',
        cascade="all, delete-orphan",
    )

    @log_with(log)
    def __init__(self, machineresolver_id=None,
                 machineresolver=None, machine_id=None, token_id=None,
                 serial=None, application=None):
        """
        Attach a token to a machine for a given application.

        Resolver and token can each be given either by database id or by
        name/serial. A given id takes precedence over the name/serial.

        :param machineresolver_id: The database id of the machine resolver
        :param machineresolver: The name of the machine resolver
        :param machine_id: The resolver dependent id of the machine
        :param token_id: The database id of the token
        :param serial: The serial number of the token
        :param application: The application type
        """
        if machineresolver_id:
            self.machineresolver_id = machineresolver_id
        elif machineresolver:
            # Translate the resolver name to its id (None if unknown).
            resolver_lookup = select(MachineResolver.id).where(
                MachineResolver.name == machineresolver
            )
            self.machineresolver_id = (
                db.session.execute(resolver_lookup).scalar_one_or_none()
            )

        if token_id:
            self.token_id = token_id
        elif serial:
            # Translate the token serial to its id (None if unknown).
            token_lookup = select(Token.id).where(Token.serial == serial)
            self.token_id = db.session.execute(token_lookup).scalar_one_or_none()

        self.machine_id = machine_id
        self.application = application
class MachineTokenOptions(db.Model):
    """
    This class holds an option for the token assigned to a certain
    client machine.

    Each token/client machine combination (a ``MachineToken`` entry) can
    have several options, each stored as a key/value pair.
    """
    __tablename__ = 'machinetokenoptions'
    id: Mapped[int] = mapped_column(Integer, Sequence("machtokenopt_seq"),
                                    primary_key=True, nullable=False)
    # Reference to the MachineToken entry this option belongs to.
    machinetoken_id: Mapped[int | None] = mapped_column(
        Integer, ForeignKey('machinetoken.id'))
    mt_key: Mapped[str] = mapped_column(Unicode(64), nullable=False)
    mt_value: Mapped[str] = mapped_column(Unicode(64), nullable=False)

    def __init__(self, machinetoken_id, key, value):
        """
        Create an option for a machine token.

        :param machinetoken_id: The database id of the MachineToken entry
        :param key: The name of the option
        :param value: The value of the option
        """
        # Pass the values as logging arguments, so that the message is only
        # formatted if debug logging is actually enabled.
        log.debug("setting %r to %r for MachineToken %s",
                  key, value, machinetoken_id)
        self.machinetoken_id = machinetoken_id
        self.mt_key = convert_column_to_unicode(key)
        self.mt_value = convert_column_to_unicode(value)
def get_machineresolver_id(resolvername):
    """
    Return the database ID of the machine resolver with the given name.

    :param resolvername: The name of the machine resolver
    :type resolvername: str
    :return: The ID of the machine resolver or None, if no machine resolver
        with this name exists
    :rtype: int or None
    """
    # Only the id column is selected, the full entity is not needed. An
    # unknown name yields None instead of failing with an AttributeError
    # on a missing result.
    stmt = select(MachineResolver.id).where(MachineResolver.name == resolvername)
    return db.session.execute(stmt).scalar_one_or_none()
def get_machinetoken_ids(machine_id, resolver_name, serial, application):
    """
    Return a list of the IDs in the machinetoken table that match the
    given machine, token and application.

    :param machine_id: The resolver dependent machine_id
    :type machine_id: str
    :param resolver_name: The name of the resolver
    :type resolver_name: str
    :param serial: The serial number of the token
    :type serial: str
    :param application: The application type
    :type application: str
    :return: A list of IDs of the matching machinetoken entries (empty if
        nothing matches)
    :rtype: list of int
    """
    token_id = get_token_id(serial)

    # Without a resolver name (or with an unknown one) the resolver id stays
    # None, so the query below compares machineresolver_id against None.
    resolver_id = None
    if resolver_name:
        stmt = select(MachineResolver.id).where(MachineResolver.name == resolver_name)
        resolver_id = db.session.execute(stmt).scalar_one_or_none()

    # Select only the id column: loading complete MachineToken entities
    # would also pull in their eagerly joined token and options.
    stmt = select(MachineToken.id).where(
        and_(
            MachineToken.token_id == token_id,
            MachineToken.machineresolver_id == resolver_id,
            MachineToken.machine_id == machine_id,
            MachineToken.application == application
        )
    )
    return list(db.session.scalars(stmt).all())