# 2018-05-07 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Use tags in email subject.
# 2017-10-27 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add additional tags for notification: date, time, client_ip,
# ua_string, ua_browser
# 2016-10-12 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add tokentype, tokenrealm and serial
# Add multi and regexp
# 2016-07-18 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add notification conditions
# 2016-05-06 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Initial writeup
#
# License: AGPLv3
# (c) 2016. Cornelius Kölbel
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
__doc__ = """This is the event handler module for user notifications.
It can be bound to each event and can perform the action:
* sendmail: Send an email to the user/token owner
* sendsms: We can also notify the user with an SMS.
* savefile: Create a file which can be processed later
The module is tested in tests/test_lib_eventhandler_usernotification.py
"""
import logging
import os
import traceback
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from urllib.request import urlopen
from privacyidea.lib import _
from privacyidea.lib.auth import get_db_admin, get_all_db_admins
from privacyidea.lib.crypto import get_alphanum_str
from privacyidea.lib.eventhandler.base import BaseEventHandler
from privacyidea.lib.framework import get_app_config_value
from privacyidea.lib.smsprovider.SMSProvider import get_smsgateway
from privacyidea.lib.smsprovider.SMSProvider import send_sms_identifier
from privacyidea.lib.smtpserver import get_smtpservers
from privacyidea.lib.smtpserver import send_email_identifier
from privacyidea.lib.token import get_tokens
from privacyidea.lib.user import User, get_user_list, is_attribute_at_all
from privacyidea.lib.utils import create_tag_dict, to_unicode, is_true
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Fallback notification template. The handler uses it when the event handler
# definition has no "body" option. The {placeholders} are filled in with
# str.format() from the tag dictionary that is built while handling the event.
DEFAULT_BODY = """
Hello {user},
the administrator {admin}@{realm} performed the action
{action} on your token {serial}.
To check your tokens you may login to the Web UI:
{url}
"""
class NOTIFY_TYPE:
    """
    Identifiers of the possible notification recipients.

    The values are offered as choices of the "To" and "reply_to" options
    of the UserNotification event handler and are compared against the
    configured option value when a notification is sent.
    """
    # An empty value means that no Reply-To header is requested.
    NO_REPLY_TO = ""
    # A freely configurable email address.
    EMAIL = "email"
    # All users of a configured admin realm.
    ADMIN_REALM = "admin realm"
    # An administrator from the local admin database.
    INTERNAL_ADMIN = "internal admin"
    # The user or administrator who triggered the request.
    LOGGED_IN_USER = "logged_in_user"
    # The owner of the token the request deals with.
    TOKENOWNER = "tokenowner"
class UserNotificationEventHandler(BaseEventHandler):
    """
    Event handler that notifies users or administrators about an event.

    It offers the actions ``sendmail``, ``sendsms`` and ``savefile``. The
    notification text is a template whose tags are filled with information
    about the request, the token and the recipient.

    The ``identifier`` can be used in the event handling definitions.
    """

    identifier = "UserNotification"
    description = "This eventhandler notifies the user about actions on his tokens"

    @property
    def allowed_positions(self):
        """
        This returns the allowed positions of the event handler definition.

        :return: list of allowed positions
        """
        return ["post", "pre"]

    @staticmethod
    def attach_qr(body, url_img, mimetype, serial):
        """
        Attach the QR-Code image to the email body.

        :param body: email body
        :param url_img: URL of the QR-Code image
        :param mimetype: mimetype of the email
        :param serial: serial number of the token, used as the file name of
            the attached image
        :return: a ``multipart/related`` message containing the text body and
            the image, which is referenced by the Content-ID ``token_image``
        """
        # Fetch the image data. The response is closed explicitly, so that
        # the underlying connection or file handle is not leaked.
        response = urlopen(url_img)  # nosec B310 # no user input
        try:
            image_data = response.read()
        finally:
            response.close()

        mail_body = MIMEMultipart('related')
        mail_body.attach(MIMEText(body, mimetype))
        mail_img = MIMEImage(image_data)
        mail_img.add_header('Content-ID', '<token_image>')
        mail_img.add_header('Content-Disposition',
                            f'inline; filename="{serial}.png"')
        mail_body.attach(mail_img)
        return mail_body

    @staticmethod
    def _recipient_detail_options(parent, admin_realms, admin_names):
        """
        Build the dependent options that refine a recipient selection.

        The options are identical for the "To" and the "reply_to" setting and
        only differ in the name of the option they depend on.

        :param parent: name of the selecting option ("To" or "reply_to")
        :param admin_realms: value offered for the admin realm selection
        :param admin_names: list of the names of the internal administrators
        :return: dict of option definitions in display order
        """
        return {
            f"{parent} {NOTIFY_TYPE.ADMIN_REALM}": {
                "type": "str",
                "value": admin_realms,
                "visibleIf": parent,
                "visibleValue": NOTIFY_TYPE.ADMIN_REALM},
            f"{parent} {NOTIFY_TYPE.INTERNAL_ADMIN}": {
                "type": "str",
                # every option gets its own list object
                "value": list(admin_names),
                "visibleIf": parent,
                "visibleValue": NOTIFY_TYPE.INTERNAL_ADMIN},
            f"{parent} {NOTIFY_TYPE.EMAIL}": {
                "type": "str",
                "description": _("Any email address, to which the notification "
                                 "should be sent."),
                "visibleIf": parent,
                "visibleValue": NOTIFY_TYPE.EMAIL},
            f"{parent} {NOTIFY_TYPE.TOKENOWNER}": {
                "type": "str",
                "description": _(
                    "You can enter a user attribute here to use an email address other than the default."),
                "required": False,
                "visibleIf": parent,
                "visibleValue": NOTIFY_TYPE.TOKENOWNER},
            f"{parent} {NOTIFY_TYPE.LOGGED_IN_USER}": {
                "type": "str",
                "description": _(
                    "You can enter a user attribute here to use an email address other than the default."),
                "required": False,
                "visibleIf": parent,
                "visibleValue": NOTIFY_TYPE.LOGGED_IN_USER}
        }

    @property
    def actions(self):
        """
        This method returns a dictionary of allowed actions and possible
        options in this handler module.

        :return: dict with actions
        """
        smtpservers = [s.config.identifier for s in get_smtpservers()]
        smsgateways = [sms.identifier for sms in get_smsgateway()]
        # These values are needed for "To" as well as for "reply_to", so they
        # are only determined once.
        admin_realms = get_app_config_value("SUPERUSER_REALM", [])
        admin_names = [a.username for a in get_all_db_admins()]

        # The insertion order of the options is kept as it always was.
        sendmail_options = {
            "emailconfig": {
                "type": "str",
                "required": True,
                "description": _("Send notification email via this email server."),
                "value": smtpservers},
            "mimetype": {
                "type": "str",
                "description": _("Either send email as plain text or HTML."),
                "value": ["plain", "html"]},
            "attach_qrcode": {
                "type": "bool",
                "description": _("Send QR-Code image as an attachment "
                                 "(cid URL: token_image)")},
            "subject": {
                "type": "str",
                "required": False,
                "description": _("The subject of the mail that is sent.")},
            "reply_to": {
                "type": "str",
                "required": False,
                "description": _("The Reply-To header in the sent email."),
                "value": [
                    NOTIFY_TYPE.NO_REPLY_TO,
                    NOTIFY_TYPE.TOKENOWNER,
                    NOTIFY_TYPE.LOGGED_IN_USER,
                    NOTIFY_TYPE.INTERNAL_ADMIN,
                    NOTIFY_TYPE.ADMIN_REALM,
                    NOTIFY_TYPE.EMAIL]},
        }
        sendmail_options.update(
            self._recipient_detail_options("reply_to", admin_realms, admin_names))
        sendmail_options["body"] = {
            "type": "text",
            "required": False,
            "description": _("The template of the mail body that will be sent. It may contain the following "
                             "tags as specified in the documentation: <code>{admin}, {realm}, {action}, "
                             "{serial}, {url}, {user}, {givenname}, {surname}, {username}, {userrealm}, "
                             "{tokentype}, {tokendescription}, {registrationcode}, {recipient_givenname}, "
                             "{recipient_surname}, {googleurl_value}, {googleurl_img}, {pushurl_value}, "
                             "{pushurl_img}, {container_url_value}, {container_url_img}, {time}, {date}, "
                             "{client_ip}, {ua_browser}, {ua_string}, {pin}."
                             "</code>")}
        sendmail_options["To"] = {
            "type": "str",
            "required": True,
            "description": _("Send notification to this user."),
            "value": [
                NOTIFY_TYPE.TOKENOWNER,
                NOTIFY_TYPE.LOGGED_IN_USER,
                NOTIFY_TYPE.INTERNAL_ADMIN,
                NOTIFY_TYPE.ADMIN_REALM,
                NOTIFY_TYPE.EMAIL]}
        sendmail_options.update(
            self._recipient_detail_options("To", admin_realms, admin_names))

        actions = {
            "sendmail": sendmail_options,
            "sendsms": {
                "smsconfig": {
                    "type": "str",
                    "required": True,
                    "description": _("Send the user notification via a "
                                     "predefined SMS gateway."),
                    "value": smsgateways},
                "body": {
                    "type": "text",
                    "required": False,
                    "description": _("The text template of the SMS. It may contain the following tags "
                                     "as specified in the documentation: <code>{admin}, {realm}, {action}, "
                                     "{serial}, {url}, {user}, {givenname}, {surname}, {username}, {userrealm}, "
                                     "{tokentype}, {tokendescription}, {registrationcode}, {recipient_givenname}, "
                                     "{recipient_surname}, {googleurl_value}, {googleurl_img}, {pushurl_value}, "
                                     "{pushurl_img}, {container_url_value}, {container_url_img}, {time}, {date}, "
                                     "{client_ip}, {ua_browser}, {ua_string}, "
                                     "{pin}.</code>")},
                "To": {
                    "type": "str",
                    "required": True,
                    "description": _("Send notification to this user."),
                    "value": [NOTIFY_TYPE.TOKENOWNER]}
            },
            "savefile": {
                "body": {
                    "type": "text",
                    "required": True,
                    "description": _("This is the template content of the new file. It may contain the following tags "
                                     "as specified in the documentation: <code>{admin}, {realm}, {action}, "
                                     "{serial}, {url}, {user}, {givenname}, {surname}, {username}, {userrealm}, "
                                     "{tokentype}, {tokendescription}, {registrationcode}, {recipient_givenname}, "
                                     "{recipient_surname}, {googleurl_value}, {googleurl_img}, {pushurl_value}, "
                                     "{pushurl_img}, {container_url_value}, {container_url_img}, {time}, {date}, "
                                     "{client_ip}, {ua_browser}, {ua_string}, {pin}."
                                     "</code>")},
                "filename": {
                    "type": "str",
                    "required": True,
                    "description": _("The filename of the notification. Existing files "
                                     "are overwritten. The name can contain tags as specified "
                                     "in the documentation and can also contain the tag {random}.")}
            }
        }
        return actions

    @staticmethod
    def _determine_recipient(notify_type, handler_options, tokenowner, logged_in_user):
        """
        Determine the recipient of the notification.

        :param notify_type: the configured "To" value, one of NOTIFY_TYPE
        :param handler_options: the options of the event handler definition
        :param tokenowner: the owner of the token of the request
        :param logged_in_user: dict describing the logged in user
        :return: dict with the recipient data ("email" may be a single
            address or a list of addresses) or None, if no recipient could
            be determined
        """
        if notify_type == NOTIFY_TYPE.TOKENOWNER and not tokenowner.is_empty():
            # An empty attribute name falls back to the default attribute.
            email = handler_options.get("To " + NOTIFY_TYPE.TOKENOWNER) or "email"
            user_info = tokenowner.get_specific_info(["givenname", "surname", email, "mobile"])
            return {
                "givenname": user_info.get("givenname"),
                "surname": user_info.get("surname"),
                "username": tokenowner.login,
                "userrealm": tokenowner.realm,
                "email": user_info.get(email),
                "mobile": user_info.get("mobile")
            }

        if notify_type == NOTIFY_TYPE.INTERNAL_ADMIN:
            username = handler_options.get("To " + NOTIFY_TYPE.INTERNAL_ADMIN)
            internal_admin = get_db_admin(username)
            return {
                "givenname": username,
                "email": internal_admin.email if internal_admin else ""
            }

        if notify_type == NOTIFY_TYPE.ADMIN_REALM:
            # Send emails to all the users in the specified admin realm
            admin_realm = handler_options.get("To " + NOTIFY_TYPE.ADMIN_REALM)
            attr = is_attribute_at_all()
            ulist = get_user_list({"realm": admin_realm}, include_custom_attributes=attr)
            # create a list of all user-emails, if the user has an email
            emails = [u.get("email") for u in ulist if u.get("email")]
            return {
                "givenname": f"admin of realm {admin_realm}",
                "email": emails
            }

        if notify_type == NOTIFY_TYPE.LOGGED_IN_USER:
            # Send notification to the logged in user
            if logged_in_user.get("username") and not logged_in_user.get("realm"):
                # internal admins have no realm
                internal_admin = get_db_admin(logged_in_user.get("username"))
                if internal_admin:
                    return {
                        "givenname": logged_in_user.get("username"),
                        "email": internal_admin.email
                    }
                return None
            # Try to find the user in the specified realm
            user = User(logged_in_user.get("username"),
                        logged_in_user.get("realm"))
            email = handler_options.get("To " + NOTIFY_TYPE.LOGGED_IN_USER) or "email"
            if user:
                user_info = user.get_specific_info(["givenname", "surname", email, "mobile"])
                return {
                    "givenname": user_info.get("givenname"),
                    "surname": user_info.get("surname"),
                    "email": user_info.get(email),
                    "mobile": user_info.get("mobile")
                }
            return None

        if notify_type == NOTIFY_TYPE.EMAIL:
            return {
                "email": handler_options.get("To " + NOTIFY_TYPE.EMAIL, "").split(",")
            }

        return None

    @staticmethod
    def _determine_reply_to(reply_to_type, handler_options, tokenowner, logged_in_user,
                            handler_def):
        """
        Determine the value of the Reply-To header of the notification email.

        :param reply_to_type: the configured "reply_to" value, one of NOTIFY_TYPE
        :param handler_options: the options of the event handler definition
        :param tokenowner: the owner of the token of the request
        :param logged_in_user: dict describing the logged in user
        :param handler_def: the event handler definition, only used for logging
        :return: the reply-to address(es) as string or None
        """
        if not reply_to_type:
            # Nothing configured or NOTIFY_TYPE.NO_REPLY_TO (the empty string)
            return None

        if reply_to_type == NOTIFY_TYPE.TOKENOWNER and not tokenowner.is_empty():
            email = handler_options.get("reply_to " + NOTIFY_TYPE.TOKENOWNER) or "email"
            return tokenowner.info.get(email)

        if reply_to_type == NOTIFY_TYPE.INTERNAL_ADMIN:
            username = handler_options.get("reply_to " + NOTIFY_TYPE.INTERNAL_ADMIN)
            internal_admin = get_db_admin(username)
            return internal_admin.email if internal_admin else ""

        if reply_to_type == NOTIFY_TYPE.ADMIN_REALM:
            # Adds all email addresses from a specific admin realm to the reply-to-header
            admin_realm = handler_options.get("reply_to " + NOTIFY_TYPE.ADMIN_REALM)
            attr = is_attribute_at_all()
            ulist = get_user_list({"realm": admin_realm}, include_custom_attributes=attr)
            # create a list of all user-emails, if the user has an email
            emails = [u.get("email") for u in ulist if u.get("email")]
            return ",".join(emails)

        if reply_to_type == NOTIFY_TYPE.LOGGED_IN_USER:
            # Add email address from the logged in user into the reply-to header
            email = handler_options.get("reply_to " + NOTIFY_TYPE.LOGGED_IN_USER) or "email"
            if logged_in_user.get("username") and not logged_in_user.get("realm"):
                # internal admins have no realm
                internal_admin = get_db_admin(logged_in_user.get("username"))
                return internal_admin.email if internal_admin else None
            # Try to find the user in the specified realm
            user = User(logged_in_user.get("username"),
                        logged_in_user.get("realm"))
            return user.get_specific_info([email]).get(email) if user else None

        if reply_to_type == NOTIFY_TYPE.EMAIL:
            # Only the first of several configured addresses is used.
            return handler_options.get("reply_to " + NOTIFY_TYPE.EMAIL, "").split(",")[0]

        log.warning(f"Unable to determine the email for the reply-to header: {handler_def}")
        return None

    @staticmethod
    def _save_notification_file(filename_template, tags, body):
        """
        Write the notification text to a file in the spool directory.

        Errors are logged and not raised.

        :param filename_template: configured file name, may contain tags and
            the additional tag ``{random}``
        :param tags: the tag dictionary used to fill the file name template
        :param body: the text to write
        :return: None
        """
        if filename_template is None:
            log.error("Failed to write notification file: no filename configured.")
            return
        spooldir = get_app_config_value("PI_NOTIFICATION_HANDLER_SPOOLDIRECTORY",
                                        "/var/lib/privacyidea/notifications/")
        random = get_alphanum_str(16)
        filename = filename_template.format(random=random, **tags).lstrip(os.path.sep)
        outfile = os.path.normpath(os.path.join(spooldir, filename))
        # Compare absolute, normalised paths, and make sure that the spool
        # directory ends with a separator. A plain prefix check would accept
        # "/spool_other/file" for the spool directory "/spool".
        spool_root = os.path.join(os.path.abspath(spooldir), "")
        if not os.path.abspath(outfile).startswith(spool_root):
            log.error(f'Cannot write outside of spooldir {spooldir}!')
            return
        try:
            # NOTE(review): the file is written with the default encoding of
            # the platform, while templates are read as UTF-8.
            with open(outfile, "w") as f:
                f.write(body)
        except Exception as err:
            log.error(f"Failed to write notification file: {err}")

    def do(self, action, options=None):
        """
        This method executes the defined action in the given event.

        :param action: the action to perform: "sendmail", "sendsms" or
            "savefile" (case-insensitive)
        :param options: Contains the flask parameters g, request, response
            and the handler_def configuration. Although it defaults to None,
            the dictionary is required.
        :type options: dict
        :return: False, if sending the email or SMS failed. Otherwise True,
            also if no recipient could be determined or the file could not
            be written.
        """
        ret = True
        g = options.get("g")
        request = options.get("request")
        response = options.get("response")
        content = self._get_response_content(response)
        handler_def = options.get("handler_def")
        handler_options = handler_def.get("options", {})
        notify_type = handler_options.get("To", NOTIFY_TYPE.TOKENOWNER)
        reply_to_type = handler_options.get("reply_to")
        try:
            logged_in_user = g.logged_in_user
        except Exception:
            logged_in_user = {}

        tokenowner = self._get_tokenowner(request)
        log.debug(f"Executing event for action {action}, user {tokenowner}, logged_in_user {logged_in_user}")

        action_name = action.lower()
        recipient = self._determine_recipient(notify_type, handler_options,
                                              tokenowner, logged_in_user)
        # In case of "savefile" we do not need a recipient
        if not recipient and action_name != "savefile":
            log.warning("Unable to determine the recipient for the user notification!")
            return ret

        # Collect all data
        body = handler_options.get("body") or DEFAULT_BODY
        if body.startswith("file:"):  # pragma no cover
            # We read the template from the file.
            filename = body[5:]
            try:
                with open(filename, encoding="utf-8") as f:
                    body = f.read()
            except Exception as e:
                log.warning(f"Failed to read email template from file {filename}: {e}")
                log.debug(traceback.format_exc())
        subject = handler_options.get("subject") or "An action was performed on your token."

        # "or {}" also copes with keys that are present but set to None
        detail = content.get("detail") or {}
        googleurl = detail.get("googleurl") or {}
        pushurl = detail.get("pushurl") or {}
        serial = (request.all_data.get("serial")
                  or detail.get("serial")
                  or g.audit_object.audit_data.get("serial"))
        container_serial = request.all_data.get("container_serial")
        registrationcode = detail.get("registrationcode")
        pin = detail.get("pin")
        googleurl_value = googleurl.get("value")
        googleurl_img = googleurl.get("img")
        pushurl_value = pushurl.get("value")
        pushurl_img = pushurl.get("img")
        container_url_value = container_url_img = None
        container_content = (content.get("result") or {}).get("value", {})
        if isinstance(container_content, dict):
            container_url = container_content.get("container_url") or {}
            container_url_value = container_url.get("value")
            container_url_img = container_url.get("img")

        tokentype = None
        tokendescription = None
        if serial:
            tokens = get_tokens(serial=serial)
            if tokens:
                tokentype = tokens[0].get_tokentype()
                tokendescription = tokens[0].token.description
        else:
            # Without a serial in the request, all tokens of the owner are listed
            token_objects = get_tokens(user=tokenowner)
            serial = ','.join([tok.get_serial() for tok in token_objects])

        tags = create_tag_dict(logged_in_user=logged_in_user,
                               request=request,
                               client_ip=g.client_ip,
                               pin=pin,
                               googleurl_value=googleurl_value,
                               googleurl_img=googleurl_img,
                               pushurl_value=pushurl_value,
                               pushurl_img=pushurl_img,
                               recipient=recipient,
                               tokenowner=tokenowner,
                               serial=serial,
                               container_serial=container_serial,
                               tokentype=tokentype,
                               tokendescription=tokendescription,
                               registrationcode=registrationcode,
                               escape_html=(action_name == "sendmail"
                                            and handler_options.get("mimetype", "").lower() == "html"),
                               container_url_value=container_url_value,
                               container_url_img=container_url_img)
        body = to_unicode(body).format(**tags)
        subject = subject.format(**tags)

        # Send notification
        if action_name == "sendmail":
            reply_to = self._determine_reply_to(reply_to_type, handler_options,
                                                tokenowner, logged_in_user, handler_def)
            emailconfig = handler_options.get("emailconfig")
            mimetype = handler_options.get("mimetype", "plain")
            useremail = recipient.get("email")
            if is_true(handler_options.get("attach_qrcode")):
                # Attach the first available image
                qr_image = googleurl_img or pushurl_img or container_url_img
                if qr_image:
                    body = self.attach_qr(body, qr_image, mimetype, serial)
            try:
                ret = send_email_identifier(emailconfig,
                                            recipient=useremail,
                                            subject=subject, body=body,
                                            reply_to=reply_to,
                                            mimetype=mimetype)
            except Exception as e:  # pragma: no cover
                log.error(f"Failed to send email: {e}")
                self.run_details = str(e)
                ret = False
            if ret:
                log.info(f"Sent a notification email to user {recipient}")
            else:
                log.warning(f"Failed to send a notification email to user {recipient}")
                self.run_details = f"Failed: {useremail}."

        elif action_name == "savefile":
            self._save_notification_file(handler_options.get("filename"), tags, body)

        elif action_name == "sendsms":
            smsconfig = handler_options.get("smsconfig")
            userphone = recipient.get("mobile")
            try:
                ret = send_sms_identifier(smsconfig, userphone, body)
            except Exception as e:
                log.error(f"Failed to send sms: {e}")
                ret = False
            if ret:
                log.info(f"Sent a notification sms to user {recipient}")
            else:
                log.warning(f"Failed to send a notification sms to user {recipient}")

        return ret