# Source code for privacyidea.lib.eventhandler.usernotification

# -*- coding: utf-8 -*-
#
#  2016-10-12 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add tokentype, tokenrealm and serial
#             Add multi and regexp
#  2016-07-18 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Add notification conditions
#  2016-05-06 Cornelius Kölbel <cornelius.koelbel@netknights.it>
#             Initial writeup
#
# License:  AGPLv3
# (c) 2016. Cornelius Kölbel
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#
# Module documentation, assigned explicitly to __doc__. The two actions listed
# here ("sendmail" and "sendsms") are the ones dispatched in
# UserNotificationEventHandler.do().
__doc__ = """This is the event handler module for user notifications.
It can be bound to each event and can perform the action:

  * sendmail: Send an email to the user/token owner
  * sendsms: We can also notify the user with an SMS.

The module is tested in tests/test_lib_events.py
"""
from privacyidea.lib.eventhandler.base import BaseEventHandler
from privacyidea.lib.smtpserver import send_email_identifier
from privacyidea.lib.smsprovider.SMSProvider import send_sms_identifier
from privacyidea.lib.auth import ROLE, get_db_admins, get_db_admin
from privacyidea.lib.token import get_token_owner, get_tokens
from privacyidea.lib.realm import get_realms
from privacyidea.lib.smtpserver import get_smtpservers
from privacyidea.lib.smsprovider.SMSProvider import get_smsgateway
from privacyidea.lib.config import get_token_types
from privacyidea.lib.user import User, get_user_list
from gettext import gettext as _
from flask import current_app
import json
import logging
import re
log = logging.getLogger(__name__)

# Fallback notification text. UserNotificationEventHandler.do() uses it when
# the handler options carry no "body" and fills the {placeholders} with
# str.format().
DEFAULT_BODY = """
Hello {user},

the administrator {admin}@{realm} performed the action
{action} on your token {serial}.

To check your tokens you may login to the Web UI:
{url}
"""


class NOTIFY_TYPE(object):
    """
    Allowed recipient types of a notification.

    The values are offered as choices of the "To" option of the "sendmail"
    action and are also used to build the names of the dependent options
    (e.g. ``"To " + NOTIFY_TYPE.EMAIL``).
    """
    # The owner of the token the action was performed on
    TOKENOWNER = "tokenowner"
    # The user (or administrator) who triggered the request
    LOGGED_IN_USER = "logged_in_user"
    # One administrator from the internal admin database
    INTERNAL_ADMIN = "internal admin"
    # All users of an admin realm that have an email address
    ADMIN_REALM = "admin realm"
    # Explicitly configured email address(es)
    EMAIL = "email"
class UserNotificationEventHandler(BaseEventHandler):
    """
    Event handler that notifies a user or an administrator by email or SMS.

    An event handler needs to return a list of actions which it can handle.
    It also returns a list of allowed actions and conditions.
    It provides an identifier which can be used in the event handling
    definitions.
    """

    identifier = "UserNotification"
    description = "This eventhandler notifies the user about actions on his " \
                  "tokens"

    @property
    def actions(self):
        """
        Return a dictionary of the allowed actions and the possible options
        of each action in this handler module.

        The selectable values are collected on every access from the
        configured SMTP servers, SMS gateways, internal admins and the
        ``SUPERUSER_REALM`` application setting.

        :return: dict with actions
        """
        smtpservers = [s.config.identifier for s in get_smtpservers()]
        smsgateways = [sms.identifier for sms in get_smsgateway()]

        sendmail = {
            "emailconfig": {
                "type": "str",
                "required": True,
                "description": _("Send notification email via this "
                                 "email server."),
                "value": smtpservers},
            "subject": {
                "type": "str",
                "required": False,
                "description": _("The subject of the mail that is sent.")},
            "body": {
                "type": "text",
                "required": False,
                "description": _("The body of the mail that is sent.")},
            "To": {
                "type": "str",
                "required": True,
                "description": _("Send notification to this user."),
                "value": [
                    NOTIFY_TYPE.TOKENOWNER,
                    NOTIFY_TYPE.LOGGED_IN_USER,
                    NOTIFY_TYPE.INTERNAL_ADMIN,
                    NOTIFY_TYPE.ADMIN_REALM,
                    NOTIFY_TYPE.EMAIL]},
            # The following options are only shown if the matching value
            # was chosen for "To".
            "To " + NOTIFY_TYPE.ADMIN_REALM: {
                "type": "str",
                "value": current_app.config.get("SUPERUSER_REALM", []),
                "visibleIf": "To",
                "visibleValue": NOTIFY_TYPE.ADMIN_REALM},
            "To " + NOTIFY_TYPE.INTERNAL_ADMIN: {
                "type": "str",
                "value": [a.username for a in get_db_admins()],
                "visibleIf": "To",
                "visibleValue": NOTIFY_TYPE.INTERNAL_ADMIN},
            "To " + NOTIFY_TYPE.EMAIL: {
                "type": "str",
                "description": _("Any email address, to which the "
                                 "notification should be sent."),
                "visibleIf": "To",
                "visibleValue": NOTIFY_TYPE.EMAIL},
        }
        sendsms = {
            "smsconfig": {
                "type": "str",
                "required": True,
                "description": _("Send the user notification via a "
                                 "predefined SMS gateway."),
                "value": smsgateways},
            "body": {
                "type": "text",
                "required": False,
                "description": _("The text of the SMS.")},
            "To": {
                "type": "str",
                "required": True,
                "description": _("Send notification to this user."),
                "value": [NOTIFY_TYPE.TOKENOWNER]},
        }
        return {"sendmail": sendmail, "sendsms": sendsms}

    @property
    def conditions(self):
        """
        Return the conditions this handler can filter for, like

         * type of logged in user and
         * successful or failed value.success

        Allowed types are str, multi, text, regexp.

        :return: dict
        """
        # A plain list (instead of a dict view) keeps the realm names
        # serializable on Python 3 as well.
        realm_names = list(get_realms())
        return {
            "realm": {
                "type": "str",
                "desc": _("The user realm, for which this event should "
                          "apply."),
                "value": realm_names
            },
            "tokenrealm": {
                "type": "multi",
                "desc": _("The token realm, for which this event should "
                          "apply."),
                "value": [{"name": r} for r in realm_names]
            },
            "tokentype": {
                "type": "multi",
                "desc": _("The type of the token."),
                "value": [{"name": r} for r in get_token_types()]
            },
            "logged_in_user": {
                "type": "str",
                "desc": _("The logged in user is of the following type."),
                "value": (ROLE.ADMIN, ROLE.USER)
            },
            "result_value": {
                "type": "str",
                "desc": _("The result.value within the response is "
                          "True or False."),
                "value": ("True", "False")
            },
            "token_locked": {
                "type": "str",
                "desc": _("Check if the max failcounter of the token is "
                          "reached."),
                "value": ("True", "False")
            },
            "serial": {
                "type": "regexp",
                "desc": _("Action is triggered, if the serial matches this "
                          "regular expression.")
            }
        }

    @staticmethod
    def _get_tokenowner(request):
        """
        Determine the user the request is about.

        :param request: the request; its ``User`` attribute and the
            "serial" entry of ``all_data`` are read
        :return: ``request.User``, or - if that user is empty but a serial
            was passed - the owner of that token (an empty ``User`` if the
            token has no owner)
        """
        user = request.User
        serial = request.all_data.get("serial")
        if user.is_empty() and serial:
            # maybe the user is empty, but a serial was passed.
            # Then we determine the user by the serial
            user = get_token_owner(serial) or User()
        return user

    def check_condition(self, options):
        """
        Check if all conditions are met and if the action should be executed.
        If the conditions are met, we return "True".

        Conditions are evaluated in a fixed order; as soon as one condition
        fails the remaining ones are skipped.

        :param options: Contains the flask parameters g, request, response
            and the handler_def configuration
        :type options: dict
        :return: True if the action should be executed
        """
        res = True
        g = options.get("g")
        request = options.get("request")
        response = options.get("response")
        e_handler_def = options.get("handler_def")
        # conditions correspond to the property "conditions". A definition
        # without conditions always matches.
        conditions = e_handler_def.get("conditions") or {}
        content = json.loads(response.data)
        user = self._get_tokenowner(request)

        serial = request.all_data.get("serial") or \
            content.get("detail", {}).get("serial")
        # Only look up the token if a serial is known, like the other token
        # lookups in this class do.
        # NOTE(review): presumably get_tokens(serial=None) is not restricted
        # to a single token - verify against get_tokens.
        token_obj_list = get_tokens(serial=serial) if serial else []
        tokenrealms = []
        tokentype = None
        if token_obj_list:
            tokenrealms = token_obj_list[0].get_realms()
            tokentype = token_obj_list[0].get_tokentype()

        if "realm" in conditions:
            res = user.realm == conditions.get("realm")

        if "logged_in_user" in conditions and res:
            # Determine the role of the user
            try:
                logged_in_user = g.logged_in_user
                user_role = logged_in_user.get("role")
            except Exception:
                # A non-logged-in-user is a User, not an admin
                user_role = ROLE.USER
            res = user_role == conditions.get("logged_in_user")

        # Check result_value only if the check condition is still True
        if "result_value" in conditions and res:
            result_value = content.get("result", {}).get("value")
            condition_value = conditions.get("result_value")
            # The condition is offered as the strings "True"/"False" while
            # the JSON response carries a boolean, so the string forms are
            # compared as well.
            res = (result_value == condition_value or
                   str(result_value) == str(condition_value))

        # checking of max-failcounter state of the token
        if "token_locked" in conditions and res:
            if serial:
                # There is a distinct serial number
                if token_obj_list:
                    token_obj = token_obj_list[0]
                    locked = token_obj.get_failcount() >= \
                        token_obj.get_max_failcount()
                    res = (conditions.get("token_locked") in
                           ["True", True]) == locked
            else:
                # check all tokens of the user, if any token is maxfail
                # NOTE(review): this branch ignores whether the condition
                # asks for "True" or "False" - confirm that this is intended.
                token_objects = get_tokens(user=user, maxfail=True)
                if not ','.join(tok.get_serial() for tok in token_objects):
                    res = False

        if "tokenrealm" in conditions and res:
            if tokenrealms:
                wanted_realms = conditions.get("tokenrealm").split(",")
                res = any(trealm in wanted_realms for trealm in tokenrealms)

        if "tokentype" in conditions and res:
            if tokentype:
                res = tokentype in conditions.get("tokentype").split(",")

        if "serial" in conditions and res:
            serial_match = conditions.get("serial")
            if serial:
                res = bool(re.match(serial_match, serial))

        return res

    def do(self, action, options=None):
        """
        This method executes the defined action in the given event.

        The recipient is determined from the handler option "To", the
        message body is rendered with ``str.format`` and then sent either
        as email ("sendmail") or as SMS ("sendsms").

        :param action: the action to perform, "sendmail" or "sendsms"
            (compared case-insensitively)
        :param options: Contains the flask parameters g, request, response
            and the handler_def configuration
        :type options: dict
        :return: False if sending the notification failed, otherwise True
            (also if no recipient could be determined)
        """
        ret = True
        g = options.get("g")
        request = options.get("request")
        response = options.get("response")
        content = json.loads(response.data)
        handler_def = options.get("handler_def")
        handler_options = handler_def.get("options", {})
        notify_type = handler_options.get("To", NOTIFY_TYPE.TOKENOWNER)
        try:
            logged_in_user = g.logged_in_user
        except Exception:
            logged_in_user = {}
        # The name of the logged in user used to be read from the key "user"
        # in one place and from "username" in another. Accept both, so the
        # recipient lookup and the {admin} tag agree.
        # NOTE(review): confirm which key g.logged_in_user really carries.
        login_name = logged_in_user.get("username") or \
            logged_in_user.get("user")

        tokenowner = self._get_tokenowner(request)
        log.debug("Executing event for action {0!s}, user {1!s}, "
                  "logged_in_user {2!s}".format(action, tokenowner,
                                                logged_in_user))

        # Determine recipient
        recipient = None

        if notify_type == NOTIFY_TYPE.TOKENOWNER and not tokenowner.is_empty():
            recipient = {
                "givenname": tokenowner.info.get("givenname"),
                "surname": tokenowner.info.get("surname"),
                "username": tokenowner.login,
                "userrealm": tokenowner.realm,
                "email": tokenowner.info.get("email"),
                "mobile": tokenowner.info.get("mobile")
            }
        elif notify_type == NOTIFY_TYPE.INTERNAL_ADMIN:
            username = handler_options.get("To " + NOTIFY_TYPE.INTERNAL_ADMIN)
            internal_admin = get_db_admin(username)
            recipient = {
                "givenname": username,
                "email": internal_admin.email if internal_admin else ""
            }
        elif notify_type == NOTIFY_TYPE.ADMIN_REALM:
            # Send emails to all the users in the specified admin realm
            admin_realm = handler_options.get("To " + NOTIFY_TYPE.ADMIN_REALM)
            ulist = get_user_list({"realm": admin_realm})
            # create a list of all user-emails, if the user has an email
            emails = [u.get("email") for u in ulist if u.get("email")]
            recipient = {
                "givenname": "admin of realm {0!s}".format(admin_realm),
                "email": emails
            }
        elif notify_type == NOTIFY_TYPE.LOGGED_IN_USER:
            # Send notification to the logged in user
            if login_name and not logged_in_user.get("realm"):
                # internal admins have no realm
                internal_admin = get_db_admin(login_name)
                if internal_admin:
                    recipient = {
                        "givenname": login_name,
                        "email": internal_admin.email
                    }
            else:
                # Try to find the user in the specified realm
                user_obj = User(login_name, logged_in_user.get("realm"))
                if user_obj:
                    recipient = {
                        "givenname": user_obj.info.get("givenname"),
                        "surname": user_obj.info.get("surname"),
                        "email": user_obj.info.get("email"),
                        "mobile": user_obj.info.get("mobile")
                    }
        elif notify_type == NOTIFY_TYPE.EMAIL:
            email = handler_options.get("To " + NOTIFY_TYPE.EMAIL,
                                        "").split(",")
            recipient = {
                "email": email
            }
        else:
            log.warning("Was not able to determine the recipient for the user "
                        "notification: {0!s}".format(handler_def))

        if recipient:
            # Collect all data
            body = handler_options.get("body") or DEFAULT_BODY
            serial = request.all_data.get("serial") or \
                content.get("detail", {}).get("serial") or \
                g.audit_object.audit_data.get("serial")
            registrationcode = content.get("detail", {}).get(
                "registrationcode")
            tokentype = None
            if serial:
                tokens = get_tokens(serial=serial)
                if tokens:
                    tokentype = tokens[0].get_tokentype()
            else:
                # No distinct token: list the serials of all tokens of the
                # token owner instead.
                token_objects = get_tokens(user=tokenowner)
                serial = ','.join(tok.get_serial() for tok in token_objects)

            body = body.format(
                admin=login_name,
                realm=logged_in_user.get("realm"),
                action=request.path,
                serial=serial,
                url=request.url_root,
                user=tokenowner.info.get("givenname"),
                surname=tokenowner.info.get("surname"),
                givenname=recipient.get("givenname"),
                username=tokenowner.login,
                userrealm=tokenowner.realm,
                tokentype=tokentype,
                registrationcode=registrationcode,
                recipient_givenname=recipient.get("givenname"),
                recipient_surname=recipient.get("surname")
            )

            # Send notification
            if action.lower() == "sendmail":
                emailconfig = handler_options.get("emailconfig")
                useremail = recipient.get("email")
                subject = handler_options.get("subject") or \
                    "An action was performed on your token."
                try:
                    ret = send_email_identifier(emailconfig,
                                                recipient=useremail,
                                                subject=subject, body=body)
                except Exception as exx:
                    # A failing notification must not break the request
                    log.error("Failed to send email: {0!s}".format(exx))
                    ret = False
                if ret:
                    log.info("Sent a notification email to user {0}".format(
                        recipient))
                else:
                    log.warning("Failed to send a notification email to user "
                                "{0}".format(recipient))

            elif action.lower() == "sendsms":
                smsconfig = handler_options.get("smsconfig")
                userphone = recipient.get("mobile")
                try:
                    ret = send_sms_identifier(smsconfig, userphone, body)
                except Exception as exx:
                    # A failing notification must not break the request
                    log.error("Failed to send sms: {0!s}".format(exx))
                    ret = False
                if ret:
                    log.info("Sent a notification sms to user {0}".format(
                        recipient))
                else:
                    log.warning("Failed to send a notification sms to user "
                                "{0}".format(recipient))

        return ret