Source code for privacyidea.models.subscription
# SPDX-FileCopyrightText: (C) 2025 NetKnights GmbH <https://netknights.it>
# SPDX-FileCopyrightText: (C) 2025 Paul Lettich <paul.lettich@netknights.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import traceback
from datetime import datetime
from sqlalchemy import Sequence
from sqlalchemy.exc import IntegrityError
from privacyidea.models import db
from privacyidea.models.utils import MethodsMixin
log = logging.getLogger(__name__)
[docs]
class ClientApplication(MethodsMixin, db.Model):
"""
This table stores the clients, which sent an authentication request to
privacyIDEA.
This table is filled automatically by authentication requests.
"""
__tablename__ = 'clientapplication'
id = db.Column(db.Integer, Sequence("clientapp_seq"), primary_key=True)
ip = db.Column(db.Unicode(255), nullable=False, index=True)
hostname = db.Column(db.Unicode(255))
clienttype = db.Column(db.Unicode(255), nullable=False, index=True)
lastseen = db.Column(db.DateTime, index=True, default=datetime.utcnow())
node = db.Column(db.Unicode(255), nullable=False)
__table_args__ = (db.UniqueConstraint('ip',
'clienttype',
'node',
name='caix'),)
def save(self):
clientapp = ClientApplication.query.filter(
ClientApplication.ip == self.ip,
ClientApplication.clienttype == self.clienttype,
ClientApplication.node == self.node).first()
self.lastseen = datetime.now()
if clientapp is None:
# create a new one
db.session.add(self)
else:
# update
values = {"lastseen": self.lastseen}
if self.hostname is not None:
values["hostname"] = self.hostname
ClientApplication.query.filter(ClientApplication.id == clientapp.id).update(values)
try:
db.session.commit()
except IntegrityError as e: # pragma: no cover
log.info('Unable to write ClientApplication entry to db: {0!s}'.format(e))
log.debug(traceback.format_exc())
def __repr__(self):
return "<ClientApplication [{0!s}][{1!s}:{2!s}] on {3!s}>".format(
self.id, self.ip, self.clienttype, self.node)
[docs]
class Subscription(MethodsMixin, db.Model):
"""
This table stores the imported subscription files.
"""
__tablename__ = 'subscription'
id = db.Column(db.Integer, Sequence("subscription_seq"), primary_key=True)
application = db.Column(db.Unicode(80), index=True)
for_name = db.Column(db.Unicode(80), nullable=False)
for_address = db.Column(db.Unicode(128))
for_email = db.Column(db.Unicode(128), nullable=False)
for_phone = db.Column(db.Unicode(50), nullable=False)
for_url = db.Column(db.Unicode(80))
for_comment = db.Column(db.Unicode(255))
by_name = db.Column(db.Unicode(50), nullable=False)
by_email = db.Column(db.Unicode(128), nullable=False)
by_address = db.Column(db.Unicode(128))
by_phone = db.Column(db.Unicode(50))
by_url = db.Column(db.Unicode(80))
date_from = db.Column(db.DateTime)
date_till = db.Column(db.DateTime)
num_users = db.Column(db.Integer)
num_tokens = db.Column(db.Integer)
num_clients = db.Column(db.Integer)
level = db.Column(db.Unicode(80))
signature = db.Column(db.Unicode(640))
def save(self):
subscription = Subscription.query.filter(
Subscription.application == self.application).first()
if subscription is None:
# create a new one
db.session.add(self)
db.session.commit()
ret = self.id
else:
# update
values = self.get()
Subscription.query.filter(
Subscription.id == subscription.id).update(values)
ret = subscription.id
db.session.commit()
return ret
def __repr__(self):
return "<Subscription [{0!s}][{1!s}:{2!s}:{3!s}]>".format(
self.id, self.application, self.for_name, self.by_name)
[docs]
def get(self):
"""
Return the database object as dict
:return:
"""
d = {}
for attr in Subscription.__table__.columns.keys():
if getattr(self, attr) is not None:
d[attr] = getattr(self, attr)
return d