Source code for privacyidea.models.machine

# SPDX-FileCopyrightText: (C) 2025 NetKnights GmbH <https://netknights.it>
# SPDX-FileCopyrightText: (C) 2025 Paul Lettich <paul.lettich@netknights.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
from sqlalchemy import Sequence, and_

from privacyidea.models import db
from privacyidea.models.token import Token, get_token_id
from privacyidea.models.config import save_config_timestamp
from privacyidea.models.utils import MethodsMixin
from privacyidea.lib.utils import convert_column_to_unicode
from privacyidea.lib.log import log_with

log = logging.getLogger(__name__)


class MachineToken(MethodsMixin, db.Model):
    """
    Assignment of a token together with an application type to a machine.

    A machine is identified by the pair of ``machineresolver_id`` and
    ``machine_id``; the ``machine_id`` itself is defined by the machine
    resolver. Tokens and machines can be combined in an n:m fashion.
    """
    __tablename__ = 'machinetoken'

    id = db.Column(db.Integer(), Sequence("machinetoken_seq"),
                   primary_key=True, nullable=False)
    token_id = db.Column(db.Integer(), db.ForeignKey('token.id'))
    machineresolver_id = db.Column(db.Integer())
    machine_id = db.Column(db.Unicode(255))
    application = db.Column(db.Unicode(64))
    # Relationship to the token. The backref exposes the machine assignments
    # on the Token object under the attribute "machine_list".
    token = db.relationship('Token', lazy='joined', backref='machine_list')

    @log_with(log)
    def __init__(self, machineresolver_id=None, machineresolver=None,
                 machine_id=None, token_id=None, serial=None,
                 application=None):
        """
        Create a new machine/token assignment.

        The resolver can be given either by its database id or by its name,
        the token either by its database id or by its serial. An id takes
        precedence over the corresponding name/serial.

        :param machineresolver_id: database id of the machine resolver
        :param machineresolver: name of the machine resolver; only looked
            up if ``machineresolver_id`` is not given
        :param machine_id: the resolver specific id of the machine
        :param token_id: database id of the token
        :param serial: serial of the token; only looked up if ``token_id``
            is not given
        :param application: the application type
        :raises AttributeError: if a resolver name or a serial is looked up
            and no matching database entry exists
        """
        if machineresolver_id:
            self.machineresolver_id = machineresolver_id
        elif machineresolver:
            # Resolve the name of the machine resolver to its database id
            resolver_row = MachineResolver.query.filter_by(
                name=machineresolver).first()
            self.machineresolver_id = resolver_row.id

        if token_id:
            self.token_id = token_id
        elif serial:
            # Resolve the serial of the token to its database id
            token_row = Token.query.filter(Token.serial == serial).first()
            self.token_id = token_row.id

        self.machine_id = machine_id
        self.application = application

    def delete(self):
        """
        Remove this assignment including all of its options from the
        database, write a new config timestamp and commit.

        :return: the database id of the removed entry
        """
        removed_id = self.id
        # The options referencing this entry are removed first
        option_query = db.session.query(MachineTokenOptions).filter_by(
            machinetoken_id=removed_id)
        option_query.delete()
        db.session.delete(self)
        save_config_timestamp()
        db.session.commit()
        return removed_id
class MachineTokenOptions(db.Model):
    """
    A single option of a token that is assigned to a client machine.

    Every token/machine combination (a MachineToken entry) may carry
    several of these options.
    """
    __tablename__ = 'machinetokenoptions'

    id = db.Column(db.Integer(), Sequence("machtokenopt_seq"),
                   primary_key=True, nullable=False)
    machinetoken_id = db.Column(db.Integer(),
                                db.ForeignKey('machinetoken.id'))
    mt_key = db.Column(db.Unicode(64), nullable=False)
    mt_value = db.Column(db.Unicode(64), nullable=False)
    # Relationship to the MachineToken. The backref exposes the options on
    # the MachineToken object under the attribute "option_list".
    machinetoken = db.relationship('MachineToken', lazy='joined',
                                   backref='option_list')

    def __init__(self, machinetoken_id, key, value):
        """
        Set an option for a MachineToken entry and commit it right away.

        If an option with the same ``machinetoken_id`` and key is already
        stored, its value is updated in the database; otherwise this object
        is added to the session as a new row.

        :param machinetoken_id: database id of the MachineToken entry
        :param key: name of the option
        :param value: value of the option
        """
        log.debug("setting {0!r} to {1!r} for MachineToken {2!s}".format(key, value, machinetoken_id))
        self.machinetoken_id = machinetoken_id
        self.mt_key = convert_column_to_unicode(key)
        self.mt_value = convert_column_to_unicode(value)

        # Look for an already stored option with the same
        # machinetoken_id / mt_key combination.
        stored_option = MachineTokenOptions.query.filter_by(
            machinetoken_id=self.machinetoken_id, mt_key=self.mt_key)
        if stored_option.first() is not None:
            # The option exists: only its value is changed
            stored_option.update({'mt_value': self.mt_value})
        else:
            # The option is unknown: this object becomes a new row
            db.session.add(self)
        db.session.commit()
class MachineResolver(MethodsMixin, db.Model):
    """
    Definition of a machine store.

    Machines may be located in flat files, an LDAP directory, puppet
    services or elsewhere. A MachineResolver consists of a name, a type
    and a reference to its configuration entries.
    """
    __tablename__ = 'machineresolver'

    id = db.Column(db.Integer, Sequence("machineresolver_seq"),
                   primary_key=True, nullable=False)
    name = db.Column(db.Unicode(255), default="",
                     unique=True, nullable=False)
    rtype = db.Column(db.Unicode(255), default="", nullable=False)
    # The configuration entries of this resolver. The backref exposes the
    # resolver on each MachineResolverConfig as "machineresolver".
    rconfig = db.relationship('MachineResolverConfig',
                              lazy='dynamic',
                              backref='machineresolver')

    def __init__(self, name, rtype):
        """
        :param name: the name of the machine resolver
        :param rtype: the type of the machine resolver
        """
        self.name = name
        self.rtype = rtype

    def delete(self):
        """
        Remove this machine resolver together with all of its
        configuration entries from the database and commit.

        :return: the database id of the removed machine resolver
        """
        removed_id = self.id
        # First drop every MachineResolverConfig row of this resolver ...
        config_query = db.session.query(MachineResolverConfig).filter_by(
            resolver_id=removed_id)
        config_query.delete()
        # ... then the resolver entry itself
        db.session.delete(self)
        db.session.commit()
        return removed_id
class MachineResolverConfig(db.Model):
    """
    A single configuration entry of a machine resolver.

    A machine resolver can own several entries; each entry references its
    resolver via the database id of the machine resolver.
    """
    __tablename__ = 'machineresolverconfig'

    id = db.Column(db.Integer, Sequence("machineresolverconf_seq"),
                   primary_key=True)
    resolver_id = db.Column(db.Integer,
                            db.ForeignKey('machineresolver.id'))
    Key = db.Column(db.Unicode(255), nullable=False)
    Value = db.Column(db.Unicode(2000), default='')
    Type = db.Column(db.Unicode(2000), default='')
    Description = db.Column(db.Unicode(2000), default='')
    # A key may exist only once per resolver
    __table_args__ = (db.UniqueConstraint('resolver_id', 'Key',
                                          name='mrcix_2'),)

    def __init__(self, resolver_id=None, Key=None, Value=None,
                 resolver=None, Type="", Description=""):
        """
        :param resolver_id: database id of the machine resolver
        :param Key: name of the configuration entry
        :param Value: value of the configuration entry
        :param resolver: name of the machine resolver; only looked up if
            ``resolver_id`` is not given
        :param Type: type of the configuration entry
        :param Description: description of the configuration entry
        :raises AttributeError: if the resolver name is looked up and no
            machine resolver with this name exists
        """
        if resolver_id:
            self.resolver_id = resolver_id
        elif resolver:
            # Resolve the name of the machine resolver to its database id
            resolver_row = MachineResolver.query.filter(
                MachineResolver.name == resolver).first()
            self.resolver_id = resolver_row.id
        self.Key = Key
        self.Value = convert_column_to_unicode(Value)
        self.Type = Type
        self.Description = Description

    def save(self):
        """
        Store this configuration entry and commit.

        If an entry with the same ``resolver_id`` and ``Key`` already
        exists, its Value, Type and Description are updated; otherwise
        this object is added as a new row.

        :return: the database id of the created or updated entry
        """
        stored_entry = MachineResolverConfig.query.filter_by(
            resolver_id=self.resolver_id, Key=self.Key)
        existing = stored_entry.first()

        if existing is not None:
            # Known key: overwrite the stored values
            new_values = {'Value': self.Value,
                          'Type': self.Type,
                          'Description': self.Description}
            stored_entry.update(new_values)
            # The id is read before the commit
            existing_id = existing.id
            db.session.commit()
            return existing_id

        # Unknown key: this object becomes a new row
        db.session.add(self)
        db.session.commit()
        return self.id
def get_machineresolver_id(resolvername):
    """
    Look up the database id of a machine resolver by its name.

    :param resolvername: the name of the machine resolver
    :return: the database id of the first machine resolver with this name
    :raises AttributeError: if no machine resolver with this name exists
    """
    resolver_row = MachineResolver.query.filter_by(name=resolvername).first()
    return resolver_row.id
def get_machinetoken_ids(machine_id, resolver_name, serial, application):
    """
    Collect the ids of all machinetoken entries that match the given
    machine, resolver, token and application.

    :param machine_id: The resolver dependent machine_id
    :type machine_id: str
    :param resolver_name: The name of the resolver. If it is empty, the
        entries are matched against a resolver id of ``None``.
    :type resolver_name: str
    :param serial: The serial number of the token
    :type serial: str
    :param application: The application type
    :type application: str
    :return: The ids of the matching machinetoken entries
    :rtype: list of int
    :raises AttributeError: if ``resolver_name`` is given but no machine
        resolver with this name exists
    """
    token_id = get_token_id(serial)

    resolver_id = None
    if resolver_name:
        resolver_row = MachineResolver.query.filter_by(
            name=resolver_name).first()
        resolver_id = resolver_row.id

    # Several criteria passed to filter() are combined with AND
    matching_entries = MachineToken.query.filter(
        MachineToken.token_id == token_id,
        MachineToken.machineresolver_id == resolver_id,
        MachineToken.machine_id == machine_id,
        MachineToken.application == application,
    ).all()
    return [entry.id for entry in matching_entries]