Source code for privacyidea.lib.auditmodules.sqlaudit

# (c) NetKnights GmbH 2025,  https://netknights.it
#
#  2016-04-08 Cornelius Kölbel <cornelius@privacyidea.org>
#             Avoid consecutive if statements
#
#  privacyIDEA
#  May 11, 2014 Cornelius Kölbel, info@privacyidea.org
#  http://www.privacyidea.org
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-FileCopyrightText: 2025 Paul Lettich <paul.lettich@netknights.it>
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""The SQL Audit Module is used to write audit entries to an SQL
database.
The SQL Audit Module is configured like this::

    PI_AUDIT_MODULE = "privacyidea.lib.auditmodules.sqlaudit"
    PI_AUDIT_KEY_PRIVATE = "tests/testdata/private.pem"
    PI_AUDIT_KEY_PUBLIC = "tests/testdata/public.pem"
    PI_AUDIT_SERVERNAME = "your choice"

Optional::

    PI_AUDIT_SQL_URI = "sqlite://"
    PI_AUDIT_SQL_TRUNCATE = True | False
    PI_AUDIT_SQL_COLUMN_LENGTH = {"user": 60, "info": 10 ...}

If the PI_AUDIT_SQL_URI is omitted the Audit data is written to the
token database.
"""

import datetime
import logging
import traceback
from collections import OrderedDict

from sqlalchemy import asc, desc, and_, or_, select, delete
from sqlalchemy import create_engine
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.sql.expression import FunctionElement

from privacyidea.config import ConfigKey
from privacyidea.lib.auditmodules.base import Audit as AuditBase, Paginate
from privacyidea.lib.crypto import Sign
from privacyidea.lib.lifecycle import register_finalizer
from privacyidea.lib.pooling import get_engine
from privacyidea.lib.utils import censor_connect_string
from privacyidea.lib.utils import truncate_comma_list, is_true
from privacyidea.models import Audit as LogEntry
from privacyidea.models import audit_column_length as column_length

log = logging.getLogger(__name__)


# Define function to convert SQL DateTime objects to an ISO-format string
# By using <https://docs.sqlalchemy.org/en/14/core/compiler.html> we can
# differentiate between different dialects.
[docs] class to_isodate(FunctionElement): name = 'to_isodate' inherit_cache = True
[docs] @compiles(to_isodate, 'oracle') @compiles(to_isodate, 'postgresql') def fn_to_isodate_oracle_pg(element, compiler, **kw): return f"to_char({compiler.process(element.clauses, **kw)}, 'IYYY-MM-DD HH24:MI:SS')"
[docs] @compiles(to_isodate, 'sqlite') def fn_to_isodate_sqlite(element, compiler, **kw): # sqlite does not have a DateTime type, they are already in ISO format return f"{compiler.process(element.clauses, **kw)}"
[docs] @compiles(to_isodate) def fn_to_isodate_default(element, compiler, **kw): # %% escapes a literal % past the DBAPI driver's pyformat paramstyle. return "date_format({}, '%%Y-%%m-%%d %%H:%%i:%%s')".format(compiler.process( element.clauses, **kw))
def _now(): """ Returns the current local date and time. This function is required to be able to mock datetime.now() in tests. """ return datetime.datetime.now()
[docs] class Audit(AuditBase): """ This is the SQLAudit module, which writes the audit entries to an SQL database table. It requires the following configuration parameters in :ref:`cfgfile`: * ``PI_AUDIT_KEY_PUBLIC`` * ``PI_AUDIT_KEY_PRIVATE`` If you want to host the SQL Audit database in another DB than the token DB, you can use: * ``PI_AUDIT_SQL_URI`` and * ``PI_AUDIT_SQL_OPTIONS`` With ``PI_AUDIT_SQL_OPTIONS = {}`` You can pass options to the DB engine creation. If ``PI_AUDIT_SQL_OPTIONS`` is not set, ``SQLALCHEMY_ENGINE_OPTIONS`` will be used. This module also takes the following optional parameters: * ``PI_AUDIT_POOL_SIZE`` * ``PI_AUDIT_POOL_RECYCLE`` * ``PI_AUDIT_SQL_TRUNCATE`` * ``PI_AUDIT_NO_SIGN`` * ``PI_CHECK_OLD_SIGNATURES`` You can use ``PI_AUDIT_NO_SIGN = True`` to avoid signing of the audit log. If ``PI_CHECK_OLD_SIGNATURES = True`` old style signatures (text-book RSA) will be checked as well, otherwise they will be marked as ``FAIL``. """ is_readable = True def __init__(self, config=None, startdate=None): super().__init__(config, startdate) self.name = "sqlaudit" self.sign_data = not self.config.get(ConfigKey.AUDIT_NO_SIGN) self.sign_object = None self.verify_old_sig = self.config.get(ConfigKey.CHECK_OLD_SIGNATURES) # Disable the costly checking of private RSA keys when loading them. self.check_private_key = not self.config.get(ConfigKey.AUDIT_NO_PRIVATE_KEY_CHECK, False) if self.sign_data: self.read_keys(self.config.get(ConfigKey.AUDIT_KEY_PUBLIC), self.config.get(ConfigKey.AUDIT_KEY_PRIVATE)) self.sign_object = Sign(self.private, self.public, check_private_key=self.check_private_key) # Read column_length from the config file config_column_length = self.config.get(ConfigKey.AUDIT_SQL_COLUMN_LENGTH, {}) # fill the missing parts with the default from the models self.custom_column_length = {k: (v if k not in config_column_length else config_column_length[k]) for k, v in column_length.items()} # We can use "sqlaudit" as the key because the SQLAudit connection # string is fixed for a running privacyIDEA instance. # In other words, we will not run into any problems with changing connect strings. self.engine = get_engine(self.name, self._create_engine) # create a configured "Session" class. ``scoped_session`` is not # necessary because we do not share session objects among threads. # We use it anyway as a safety measure. Session = scoped_session(sessionmaker(bind=self.engine)) self.session = Session() # Ensure that the connection gets returned to the pool when the request has # been handled. This may close an already-closed session, but this is not a problem. register_finalizer(self._finalize_session) self.session._model_changes = {} def _create_engine(self): """ :return: a new SQLAlchemy engine connecting to the database specified in PI_AUDIT_SQL_URI. """ # an Engine, which the Session will use for connection # resources connect_string = self.config.get(ConfigKey.AUDIT_SQL_URI, self.config.get( ConfigKey.SQLALCHEMY_DATABASE_URI)) log.debug(f"using the connect string {censor_connect_string(connect_string)!s}") # if no specific audit engine options are given, use the default from # SQLALCHEMY_ENGINE_OPTIONS or none sqa_options = self.config.get(ConfigKey.AUDIT_SQL_OPTIONS, self.config.get(ConfigKey.SQLALCHEMY_ENGINE_OPTIONS, {})) log.debug(f"Using Audit SQLAlchemy engine options: {sqa_options!s}") try: pool_size = self.config.get(ConfigKey.AUDIT_POOL_SIZE, 20) engine = create_engine( connect_string, pool_size=pool_size, pool_recycle=self.config.get(ConfigKey.AUDIT_POOL_RECYCLE, 600), **sqa_options) log.debug(f"Using SQL pool size of {pool_size}") except TypeError: # SQLite does not support pool_size engine = create_engine(connect_string, **sqa_options) log.debug("Using no SQL pool_size.") return engine def _finalize_session(self): """ Close current session and dispose connections of db engine""" self.session.close() self.engine.dispose() def _truncate_data(self): """ Truncate self.audit_data according to the self.custom_column_length. :return: None """ for column, length in self.custom_column_length.items(): if column in self.audit_data: data = self.audit_data[column] if isinstance(data, str): if column == "policies": # The policies column is shortened per comma entry data = truncate_comma_list(data, length) else: data = data[:length] self.audit_data[column] = data @staticmethod def _create_filter(param: dict, admin_params: dict | None = None, timelimit: datetime.timedelta | None = None): """ create a filter condition for the logentry :param param: Filter parameters that are concatenated with a logical AND :param admin_params: Optional admin parameters containing the admin name, admin realm and a list of realms the admin is allowed to see, such as :: {"admin": "admin_name", "admin_realm": "realm_of_the_admin", "allowed_audit_realms": ["realm1", "realm2"]} :param timelimit: Only audit entries newer than this timedelta """ conditions = [] param = param or {} # For admins, only get audit entries of their allowed realms and their own realm filter_realm = None if admin_params: admin = admin_params.get("admin") admin_realm = admin_params.get("admin_realm") allowed_audit_realms = admin_params.get("allowed_audit_realms", []) if allowed_audit_realms: # search condition realm_conditions = [] for realm in allowed_audit_realms: realm_conditions.append(LogEntry.realm == realm) # If the admin is limited to some realms, we need to additionally filter for their own audit entries if admin and admin_realm: realm_conditions.append(and_(LogEntry.administrator == admin, LogEntry.realm == admin_realm)) elif admin: realm_conditions.append(LogEntry.administrator == admin) filter_realm = or_(*realm_conditions) for search_key in param.keys(): search_value = param.get(search_key) if search_key == "allowed_audit_realm": # Add each realm in the allowed_audit_realm list to the search condition realm_conditions = [] for realm in search_value: realm_conditions.append(LogEntry.realm == realm) filter_realm = or_(*realm_conditions) conditions.append(filter_realm) # We do not search if the search value only consists of '*' elif search_value.strip() != '' and search_value.strip('*') != '': try: if search_key == "success": # "success" is the only integer. search_value = search_value.strip("*") conditions.append(getattr(LogEntry, search_key) == int(is_true(search_value))) else: # All other keys are compared as strings column = getattr(LogEntry, search_key) if search_key in ["date", "startdate"]: # but we cast a column with a DateTime type to an # ISO-format string first column = to_isodate(column) search_value = search_value.replace('*', '%') if '%' in search_value: if search_value.startswith("!"): conditions.append(column.notlike(search_value[1:])) else: conditions.append(column.like(search_value)) else: if search_value.startswith("!"): conditions.append(column != search_value[1:]) else: conditions.append(column == search_value) except Exception as exx: # The search_key was no search key but some # bullshit stuff in the param log.debug(f"Not a valid searchkey: {exx!s}") if timelimit: conditions.append(LogEntry.date >= datetime.datetime.now() - timelimit) # Combine them with or to a BooleanClauseList if conditions: filter_condition = and_(*conditions) else: filter_condition = and_(True, *conditions) if filter_realm is not None: filter_condition = and_(filter_condition, filter_realm) return filter_condition
[docs] def get_total(self, param: dict, admin_params: dict | None = None, AND: bool = True, display_error: bool = True, timelimit: datetime.timedelta | None = None) -> int: """ This method returns the total number of audit entries in the audit store """ count = 0 # if param contains search filters, we build the search filter # to only return the number of those entries filter_condition = self._create_filter(param, admin_params, timelimit=timelimit) try: count = self.session.query(LogEntry.id).filter(filter_condition).count() finally: self.session.close() return count
[docs] def finalize_log(self): """ This method is used to log the data. It should hash the data and do a hash chain and sign the data """ try: for entry, value in self.audit_data.items(): if isinstance(value, list): self.audit_data[entry] = ",".join(value) if self.config.get(ConfigKey.AUDIT_SQL_TRUNCATE): self._truncate_data() if "tokentype" in self.audit_data: log.warning("We have a wrong 'tokentype' key. This should not happen. Fix it!. " "Error occurs in action: {!r}.".format(self.audit_data.get("action"))) if "token_type" not in self.audit_data: self.audit_data["token_type"] = self.audit_data.get("tokentype") end_date = _now() if self.audit_data.get("startdate"): duration = end_date - self.audit_data.get("startdate") else: duration = None le = LogEntry(action=self.audit_data.get("action"), success=int(self.audit_data.get("success", 0)), authentication=self.audit_data.get("authentication"), serial=self.audit_data.get("serial"), token_type=self.audit_data.get("token_type"), container_serial=self.audit_data.get("container_serial"), container_type=self.audit_data.get("container_type"), user=self.audit_data.get("user"), realm=self.audit_data.get("realm"), resolver=self.audit_data.get("resolver"), administrator=self.audit_data.get("administrator"), action_detail=self.audit_data.get("action_detail"), info=self.audit_data.get("info"), privacyidea_server=self.audit_data.get("privacyidea_server"), client=self.audit_data.get("client", ""), user_agent=self.audit_data.get("user_agent"), user_agent_version=self.audit_data.get("user_agent_version"), loglevel=self.audit_data.get("log_level"), clearance_level=self.audit_data.get("clearance_level"), policies=self.audit_data.get("policies"), startdate=self.audit_data.get("startdate"), duration=duration, date=end_date, thread_id=self.audit_data.get("thread_id") ) self.session.add(le) self.session.commit() # Add the signature if self.sign_data and self.sign_object: s = self._log_to_string(le) sign = self.sign_object.sign(s) le.signature = sign self.session.merge(le) self.session.commit() except Exception as exx: # pragma: no cover # in case of a Unicode Error in _log_to_string() we won't have # a signature, but the log entry is available log.error(f"exception {exx!r}") log.error(f"DATA: {self.audit_data!s}") log.debug(f"{traceback.format_exc()!s}") self.session.rollback() finally: self.session.close() # clear the audit data self.audit_data = {}
def _check_missing(self, audit_id): """ Check if the audit log contains the entries before and after the given id. TODO: We can not check at the moment if the first or the last entries were deleted. If we want to do this, we need to store some signed meta information: 1. Which one was the first entry. (use initialize_log) 2. Which one was the last entry. """ res = False try: id_bef = self.session.query(LogEntry.id ).filter(LogEntry.id == int(audit_id) - 1).count() id_aft = self.session.query(LogEntry.id ).filter(LogEntry.id == int(audit_id) + 1).count() # We may not do a commit! # self.session.commit() if id_bef and id_aft: res = True except Exception as exx: # pragma: no cover log.error(f"exception {exx!r}") log.debug(f"{traceback.format_exc()!s}") # self.session.rollback() finally: # self.session.close() pass return res @staticmethod def _log_to_string(le): """ This function creates a string from the logentry so that this string can be signed. Note: Not all elements of the LogEntry are used to generate the string (the Signature is not!), otherwise we could have used pickle :param le: LogEntry object containing the data :type le: LogEntry :rtype str """ # TODO: Add thread_id. We really should add a versioning to identify which audit data is signed. s = f"id={le.id},date={le.date},action={le.action},succ={le.success},serial={le.serial}," \ f"t={le.token_type},u={le.user},r={le.realm},adm={le.administrator},ad={le.action_detail}," \ f"i={le.info},ps={le.privacyidea_server},c={le.client},l={le.loglevel},cl={le.clearance_level}" # If we have the new log entries, we also add them for signing and verification. if le.startdate: s += f",{le.startdate!s}" if le.duration: s += f",{le.duration!s}" if le.container_serial: s += f",c_serial={le.container_serial}" if le.container_type: s += f",ct={le.container_type}" return s @staticmethod def _get_logentry_attribute(key): """ This function returns the LogEntry attribute for the given key value """ sortname = {'number': LogEntry.id, 'action': LogEntry.action, 'success': LogEntry.success, 'serial': LogEntry.serial, 'date': LogEntry.date, 'startdate': LogEntry.startdate, 'duration': LogEntry.duration, 'token_type': LogEntry.token_type, 'user': LogEntry.user, 'realm': LogEntry.realm, 'administrator': LogEntry.administrator, 'action_detail': LogEntry.action_detail, 'info': LogEntry.info, 'privacyidea_server': LogEntry.privacyidea_server, 'client': LogEntry.client, 'log_level': LogEntry.loglevel, 'policies': LogEntry.policies, 'clearance_level': LogEntry.clearance_level, 'thread_id': LogEntry.thread_id, 'container_serial': LogEntry.container_serial, 'container_type': LogEntry.container_type} return sortname.get(key)
[docs] def csv_generator(self, param: dict | None = None, admin_params: dict | None = None, user=None, timelimit: datetime.timedelta | None = None): """ Returns the audit log as csv file. :param timelimit: Limit the number of dumped entries by time :param param: The request parameters :param admin_params: Optional admin parameters containing the admin name, admin realm and a list of realms the admin is allowed to see, such as :: {"admin": "admin_name", "admin_realm": "realm_of_the_admin", "allowed_audit_realms": ["realm1", "realm2"]} :param user: The user, who issued the request :return: None. It yields results as a generator """ filter_condition = self._create_filter(param, admin_params=admin_params, timelimit=timelimit) stmt = select(LogEntry).where(filter_condition).order_by(LogEntry.date) logentries = self.session.scalars(stmt).all() for le in logentries: audit_dict = self.audit_entry_to_dict(le) yield ",".join([f"'{x!s}'" for x in audit_dict.values()]) + "\n"
[docs] def get_count(self, search_dict, timedelta=None, success=None): # create filter condition filter_condition = self._create_filter(search_dict) conditions = [filter_condition] if success is not None: conditions.append(LogEntry.success == int(is_true(success))) if timedelta is not None: conditions.append(LogEntry.date >= datetime.datetime.now() - timedelta) if conditions: filter_condition = and_(*conditions) else: filter_condition = and_(True, *conditions) log_count = self.session.query(LogEntry).filter(filter_condition).count() return log_count
[docs] def search(self, search_dict: dict, admin_params: dict | None = None, page_size: int = 15, page: int = 1, sortorder: str = "asc", timelimit: datetime.timedelta | None = None): """ This function returns the audit log as a Pagination object. :param search_dict: Filter parameters that are concatenated with a logical AND :param admin_params: Optional admin parameters containing the admin name, admin realm and a list of realms the admin is allowed to see, such as :: {"admin": "admin_name", "admin_realm": "realm_of_the_admin", "allowed_audit_realms": ["realm1", "realm2"]} :param page_size: Number of entries per page :param page: The page number :param sortorder: "asc" - ascending or "desc" - descending :param timelimit: Only audit entries newer than this timedelta will be searched """ page = page page_size = page_size paging_object = Paginate() paging_object.page = page paging_object.total = self.get_total(search_dict, admin_params=admin_params, timelimit=timelimit) if page > 1: paging_object.prev = page - 1 if paging_object.total > (page_size * page): paging_object.next = page + 1 auditIter = self.search_query(search_dict, admin_params=admin_params, page_size=page_size, page=page, sortorder=sortorder, timelimit=timelimit) while True: try: le = next(auditIter) # Fill the list paging_object.auditdata.append(self.audit_entry_to_dict(le)) except StopIteration as _e: log.debug("Iteration stopped.") break except UnicodeDecodeError as _e: # Unfortunately if one of the audit entries fails, the whole # iteration stops and we return an empty paging_object. # TODO: Check if we can return the other entries in the auditIter # or some meaningful error for the user. log.warning('Could not read audit log entry! ' 'Possible database encoding mismatch.') log.debug(f"{traceback.format_exc()!s}") return paging_object
[docs] def search_query(self, search_dict: dict, admin_params: dict | None = None, page_size: int = 15, page: int = 1, sortorder: str = "asc", sortname: str = "number", timelimit: datetime.timedelta | None = None): """ This function returns the audit log as an iterator on the result :param search_dict: Filter parameters that are concatenated with a logical AND :param admin_params: Optional admin parameters containing the admin name, admin realm and a list of realms the admin is allowed to see, such as :: {"admin": "admin_name", "admin_realm": "realm_of_the_admin", "allowed_audit_realms": ["realm1", "realm2"]} :param page_size: Number of entries per page :param page: The page number :param sortorder: "asc" - ascending or "desc" - descending :param sortname: The column name to sort after. E.g. "number", "date", "user", ... :param timelimit: Only audit entries newer than this timedelta will be searched :type timelimit: timedelta """ logentries = None try: limit = page_size offset = (page - 1) * limit # create filter condition filter_condition = self._create_filter(search_dict, admin_params, timelimit=timelimit) stmt = select(LogEntry).where(filter_condition) if sortorder == "desc": stmt = stmt.order_by( desc(self._get_logentry_attribute("number"))) else: stmt = stmt.order_by( asc(self._get_logentry_attribute("number"))) stmt = stmt.limit(limit).offset(offset) logentries = self.session.scalars(stmt).all() except Exception as exx: # pragma: no cover log.error(f"exception {exx!r}") log.debug(f"{traceback.format_exc()!s}") self.session.rollback() finally: self.session.close() if logentries is None: return iter([]) else: return iter(logentries)
[docs] def clear(self): """ Deletes all entries in the database table. This is only used for test cases! :return: """ stmt = delete(LogEntry) self.session.execute(stmt) self.session.commit()
[docs] def audit_entry_to_dict(self, audit_entry): sig = None if self.sign_data: try: sig = self.sign_object.verify(self._log_to_string(audit_entry), audit_entry.signature, self.verify_old_sig) except UnicodeDecodeError as _e: # TODO: Unless we trace and eliminate the broken unicode in the # audit_entry, we will get issues when packing the response. log.warning('Could not verify log entry! We get invalid values ' 'from the database, please check the encoding.') log.debug(f'{traceback.format_exc()!s}') is_not_missing = self._check_missing(int(audit_entry.id)) # is_not_missing = True audit_dict = OrderedDict() audit_dict['number'] = audit_entry.id audit_dict['date'] = audit_entry.date.isoformat() audit_dict['sig_check'] = "OK" if sig else "FAIL" audit_dict['missing_line'] = "OK" if is_not_missing else "FAIL" audit_dict['action'] = audit_entry.action audit_dict['authentication'] = audit_entry.authentication audit_dict['success'] = audit_entry.success audit_dict['serial'] = audit_entry.serial audit_dict['token_type'] = audit_entry.token_type audit_dict['container_serial'] = audit_entry.container_serial audit_dict['container_type'] = audit_entry.container_type audit_dict['user'] = audit_entry.user audit_dict['realm'] = audit_entry.realm audit_dict['resolver'] = audit_entry.resolver audit_dict['administrator'] = audit_entry.administrator audit_dict['action_detail'] = audit_entry.action_detail audit_dict['info'] = audit_entry.info audit_dict['privacyidea_server'] = audit_entry.privacyidea_server audit_dict['policies'] = audit_entry.policies audit_dict['client'] = audit_entry.client audit_dict['user_agent'] = audit_entry.user_agent audit_dict['user_agent_version'] = audit_entry.user_agent_version audit_dict['log_level'] = audit_entry.loglevel audit_dict['clearance_level'] = audit_entry.clearance_level audit_dict['startdate'] = audit_entry.startdate.isoformat() if audit_entry.startdate else None audit_dict['duration'] = audit_entry.duration.total_seconds() if audit_entry.duration else None audit_dict['thread_id'] = audit_entry.thread_id return audit_dict