Source code for privacyidea.models.server

# SPDX-FileCopyrightText: (C) 2025 NetKnights GmbH <https://netknights.it>
# SPDX-FileCopyrightText: (C) 2025 Paul Lettich <paul.lettich@netknights.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sqlalchemy import Sequence, CheckConstraint

from privacyidea.models import db
from privacyidea.models.utils import MethodsMixin


[docs] class PrivacyIDEAServer(MethodsMixin, db.Model): """ This table can store remote privacyIDEA server definitions """ __tablename__ = 'privacyideaserver' id = db.Column(db.Integer, Sequence("privacyideaserver_seq"), primary_key=True) # This is a name to refer to identifier = db.Column(db.Unicode(255), nullable=False, unique=True) # This is the FQDN or the IP address url = db.Column(db.Unicode(255), nullable=False) tls = db.Column(db.Boolean, default=False) description = db.Column(db.Unicode(2000), default='') def save(self): pi = PrivacyIDEAServer.query.filter(PrivacyIDEAServer.identifier == self.identifier).first() if pi is None: # create a new one db.session.add(self) db.session.commit() ret = self.id else: # update values = {"url": self.url} if self.tls is not None: values["tls"] = self.tls if self.description is not None: values["description"] = self.description PrivacyIDEAServer.query.filter(PrivacyIDEAServer.identifier == self.identifier).update(values) ret = pi.id db.session.commit() return ret
[docs] class RADIUSServer(MethodsMixin, db.Model): """ This table can store configurations of RADIUS servers. https://github.com/privacyidea/privacyidea/issues/321 It saves * a unique name * a description * an IP address a * a Port * a secret * timeout in seconds (default 5) * retries (default 3) These RADIUS server definitions can be used in RADIUS tokens or in a radius passthru policy. """ __tablename__ = 'radiusserver' __table_args__ = ( CheckConstraint("options IS JSON", name="radiusserver_options_is_json", _create_rule=lambda compiler: compiler.dialect.name == "oracle"), ) id = db.Column(db.Integer, Sequence("radiusserver_seq"), primary_key=True) # This is a name to refer to identifier = db.Column(db.Unicode(255), nullable=False, unique=True) # This is the FQDN or the IP address server = db.Column(db.Unicode(255), nullable=False) port = db.Column(db.Integer, default=25) secret = db.Column(db.Unicode(255), default="") dictionary = db.Column(db.Unicode(255), default="/etc/privacyidea/dictionary") description = db.Column(db.Unicode(2000), default='') timeout = db.Column(db.Integer, default=5) retries = db.Column(db.Integer, default=3) options = db.Column(db.JSON)
[docs] def save(self): """ If a RADIUS server with a given name is save, then the existing RADIUS server is updated. """ radius = RADIUSServer.query.filter(RADIUSServer.identifier == self.identifier).first() if radius is None: # create a new one db.session.add(self) db.session.commit() ret = self.id else: # update values = {"server": self.server} if self.port is not None: values["port"] = self.port if self.secret is not None: values["secret"] = self.secret if self.dictionary is not None: values["dictionary"] = self.dictionary if self.description is not None: values["description"] = self.description if self.timeout is not None: values["timeout"] = int(self.timeout) if self.retries is not None: values["retries"] = int(self.retries) if self.options is not None: values["options"] = self.options RADIUSServer.query.filter(RADIUSServer.identifier == self.identifier).update(values) ret = radius.id db.session.commit() return ret
[docs] class SMTPServer(MethodsMixin, db.Model): """ This table can store configurations for SMTP servers. Each entry represents an SMTP server. EMail Token, SMS SMTP Gateways or Notifications like PIN handlers are supposed to use a reference to a server definition. Each Machine Resolver can have multiple configuration entries. The config entries are referenced by the id of the machine resolver """ __tablename__ = 'smtpserver' id = db.Column(db.Integer, Sequence("smtpserver_seq"), primary_key=True) # This is a name to refer to identifier = db.Column(db.Unicode(255), nullable=False) # This is the FQDN or the IP address server = db.Column(db.Unicode(255), nullable=False) port = db.Column(db.Integer, default=25) username = db.Column(db.Unicode(255), default="") password = db.Column(db.Unicode(255), default="") sender = db.Column(db.Unicode(255), default="") tls = db.Column(db.Boolean, default=False) description = db.Column(db.Unicode(2000), default='') timeout = db.Column(db.Integer, default=10) enqueue_job = db.Column(db.Boolean, nullable=False, default=False)
[docs] def get(self): """ :return: the configuration as a dictionary """ return { "id": self.id, "identifier": self.identifier, "server": self.server, "port": self.port, "username": self.username, "password": self.password, "sender": self.sender, "tls": self.tls, "description": self.description, "timeout": self.timeout, "enqueue_job": self.enqueue_job, }
def save(self): smtp = SMTPServer.query.filter(SMTPServer.identifier == self.identifier).first() if smtp is None: # create a new one db.session.add(self) db.session.commit() ret = self.id else: # update values = {"server": self.server} if self.port is not None: values["port"] = self.port if self.username is not None: values["username"] = self.username if self.password is not None: values["password"] = self.password if self.sender is not None: values["sender"] = self.sender if self.tls is not None: values["tls"] = self.tls if self.description is not None: values["description"] = self.description if self.timeout is not None: values["timeout"] = self.timeout if self.enqueue_job is not None: values["enqueue_job"] = self.enqueue_job SMTPServer.query.filter(SMTPServer.identifier == self.identifier).update(values) ret = smtp.id db.session.commit() return ret