411 lines
15 KiB
Python
411 lines
15 KiB
Python
import logging
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.mail import get_connection
|
|
from django.core.mail.message import EmailMultiAlternatives
|
|
from django.template.loader import render_to_string
|
|
from django.utils.translation import override
|
|
|
|
from wagtail.coreutils import camelcase_to_underscore
|
|
from wagtail.models import AbstractGroupApprovalTask, Page, TaskState, WorkflowState
|
|
from wagtail.users.models import UserProfile
|
|
|
|
logger = logging.getLogger("wagtail.admin")
|
|
|
|
|
|
class OpenedConnection:
    """Context manager that opens a mail connection on entry and closes it on exit.

    Wraps an already-constructed connection object exposing ``open()`` and
    ``close()`` so that a manually opened connection is closed again when the
    ``with`` block finishes, whether it finishes normally or by raising.
    """

    def __init__(self, connection):
        # The connection is only stored here; it is opened in __enter__.
        self.connection = connection

    def __enter__(self):
        """Open the wrapped connection and return it to the ``with`` body."""
        self.connection.open()
        return self.connection

    def __exit__(self, type, value, traceback):
        """Close the wrapped connection; never suppress an in-flight exception.

        The return value of ``__exit__`` tells Python whether to swallow the
        exception raised inside the ``with`` body. Returning the (truthy)
        connection object here would silently discard every such exception,
        so an explicit ``False`` is returned instead.
        """
        self.connection.close()
        return False
|
|
|
|
|
|
def send_mail(subject, message, recipient_list, from_email=None, **kwargs):
    """
    Build and send an EmailMultiAlternatives message, mirroring Django's send_mail().

    Differences from Django's helper: the sender falls back to
    WAGTAILADMIN_NOTIFICATION_FROM_EMAIL, then DEFAULT_FROM_EMAIL, then
    "webmaster@localhost"; and every message carries an
    "Auto-Submitted: auto-generated" header.

    Optional keyword arguments read from ``kwargs``: ``connection``,
    ``auth_user``, ``auth_password``, ``fail_silently``, ``bcc``, ``cc``,
    ``reply_to`` and ``html_message``.

    Returns whatever ``EmailMultiAlternatives.send()`` returns.
    """
    if not from_email:
        # First setting that is defined wins, even if its value is falsy.
        for setting_name in (
            "WAGTAILADMIN_NOTIFICATION_FROM_EMAIL",
            "DEFAULT_FROM_EMAIL",
        ):
            if hasattr(settings, setting_name):
                from_email = getattr(settings, setting_name)
                break
        else:
            # We are no longer using the term `webmaster` except in this case, where we continue to match Django's default: https://github.com/django/django/blob/stable/3.2.x/django/conf/global_settings.py#L223
            from_email = "webmaster@localhost"

    # Reuse a caller-supplied connection when given, otherwise create one.
    connection = kwargs.get("connection", False)
    if not connection:
        connection = get_connection(
            username=kwargs.get("auth_user"),
            password=kwargs.get("auth_password"),
            fail_silently=kwargs.get("fail_silently"),
        )

    mail = EmailMultiAlternatives(
        subject,
        message,
        from_email,
        recipient_list,
        connection=connection,
        headers={"Auto-Submitted": "auto-generated"},
        bcc=kwargs.get("bcc"),
        cc=kwargs.get("cc"),
        reply_to=kwargs.get("reply_to"),
    )

    html_body = kwargs.get("html_message")
    if html_body:
        mail.attach_alternative(html_body, "text/html")

    return mail.send()
|
|
|
|
|
|
def send_notification(recipient_users, notification, extra_context):
    """
    Email a templated notification to every eligible user in ``recipient_users``.

    A user is eligible when they are active, have an email address, and the
    ``<notification>_notifications`` attribute on their UserProfile is truthy.
    Subject, text body and (when WAGTAILADMIN_NOTIFICATION_USE_HTML is set)
    HTML body are rendered from
    ``wagtailadmin/notifications/<notification>{_subject.txt,.txt,.html}``
    under each recipient's preferred language.

    ``extra_context`` is merged over the base template context, which
    contains ``settings``; each recipient is exposed to templates as ``user``.

    Returns True when there was nobody to notify or every message was sent,
    and False otherwise. Per-message send failures and mail connection
    errors (TimeoutError / ConnectionError) are logged, not raised.
    """
    preference_attr = notification + "_notifications"

    # Get list of eligible recipients
    email_recipients = [
        recipient
        for recipient in recipient_users
        if recipient.is_active
        and recipient.email
        and getattr(UserProfile.get_for_user(recipient), preference_attr)
    ]

    # Return if there is nobody to email
    if not email_recipients:
        return True

    # Get templates
    template_base = "wagtailadmin/notifications/" + notification
    template_subject = template_base + "_subject.txt"
    template_text = template_base + ".txt"
    template_html = template_base + ".html"

    # Common context to template
    context = {"settings": settings}
    context.update(extra_context)

    # Loop-invariant: decide once whether an HTML alternative is attached.
    use_html = getattr(settings, "WAGTAILADMIN_NOTIFICATION_USE_HTML", False)

    sent_count = 0
    try:
        with OpenedConnection(get_connection()) as open_connection:
            for recipient in email_recipients:
                # update context with this recipient
                context["user"] = recipient

                # Translate text to the recipient language settings
                with override(recipient.wagtail_userprofile.get_preferred_language()):
                    email_subject = render_to_string(template_subject, context).strip()
                    email_content = render_to_string(template_text, context).strip()

                    mail_options = {}
                    if use_html:
                        mail_options["html_message"] = render_to_string(
                            template_html, context
                        )

                    try:
                        send_mail(
                            email_subject,
                            email_content,
                            [recipient.email],
                            connection=open_connection,
                            **mail_options,
                        )
                        sent_count += 1
                    except Exception:
                        # One failed recipient must not stop the others.
                        logger.exception(
                            "Failed to send notification email '%s' to %s",
                            email_subject,
                            recipient.email,
                        )
    except (TimeoutError, ConnectionError):
        # Same handling as EmailNotificationMixin.send_emails: a mail server
        # that cannot be reached is logged and reported through the return
        # value rather than crashing the caller.
        logger.exception("Mail connection error, notification sending skipped")

    return sent_count == len(email_recipients)
|
|
|
|
|
|
class Notifier:
    """Callable base class for event notifications rendered from templates.

    Instances are meant to be connected to a signal; calling one with the
    signal sender runs the can_handle -> recipients -> templates -> context ->
    send pipeline. Subclasses set ``notification`` and implement
    ``send_notifications``.
    """

    # Name of the notification; used as the suffix of the template base name.
    notification = ""
    # Directory prefix prepended to every template path.
    template_directory = "wagtailadmin/notifications/"

    def __init__(self, valid_classes):
        # Classes of the calling instance that this notifier can handle
        # (passed straight to isinstance in can_handle).
        self.valid_classes = valid_classes

    def can_handle(self, instance, **kwargs):
        """Return True when ``instance`` is an instance of one of ``valid_classes``."""
        handled = isinstance(instance, self.valid_classes)
        return handled

    def get_valid_recipients(self, instance, **kwargs):
        """Return the final set of recipients; the base implementation has none."""
        return set()

    def get_template_base_prefix(self, instance, **kwargs):
        """Return the underscored class name of ``instance`` followed by ``_``."""
        class_name = type(instance).__name__
        return camelcase_to_underscore(class_name) + "_"

    def get_context(self, instance, **kwargs):
        """Return the base template context, containing only ``settings``."""
        return {"settings": settings}

    def get_template_set(self, instance, **kwargs):
        """Return a dict of template paths; by default only a ``text`` template."""
        prefix = self.get_template_base_prefix(instance)
        text_path = "".join(
            (self.template_directory, prefix, self.notification, ".txt")
        )
        return {"text": text_path}

    def send_notifications(self, template_set, context, recipients, **kwargs):
        """Deliver the notification; must be implemented by subclasses."""
        raise NotImplementedError

    def __call__(self, instance=None, **kwargs):
        """Send notifications for ``instance`` (intended to be the signal sender).

        Returns False when the instance cannot be handled, True when there are
        no recipients, and otherwise the result of ``send_notifications``.
        """
        if not self.can_handle(instance, **kwargs):
            return False

        recipients = self.get_valid_recipients(instance, **kwargs)
        if recipients:
            template_set = self.get_template_set(instance, **kwargs)
            context = self.get_context(instance, **kwargs)
            return self.send_notifications(
                template_set, context, recipients, **kwargs
            )

        # Nothing to send counts as success.
        return True
|
|
|
|
|
|
class EmailNotificationMixin:
    """Mixin that delivers notifications as emails.

    Relies on the class it is mixed into for ``notification``,
    ``template_directory`` and ``get_template_base_prefix``.
    """

    def get_recipient_users(self, instance, **kwargs):
        """Return the ideal set of recipient users, before notification
        preferences or missing email addresses are taken into account."""
        return set()

    def get_valid_recipients(self, instance, **kwargs):
        """Narrow the ideal recipients down to active users that have an email
        address and whose UserProfile allows this notification type."""
        valid = set()
        for user in self.get_recipient_users(instance, **kwargs):
            if not (user and user.is_active and user.email):
                continue
            profile = UserProfile.get_for_user(user)
            if getattr(profile, self.notification + "_notifications"):
                valid.add(user)
        return valid

    def get_template_set(self, instance, **kwargs):
        """Return template paths for the email subject and for the text and
        html alternatives, keyed ``subject`` / ``text`` / ``html``."""
        base_name = self.get_template_base_prefix(instance) + self.notification
        base_path = self.template_directory + base_name
        return {
            "subject": base_path + "_subject.txt",
            "text": base_path + ".txt",
            "html": base_path + ".html",
        }

    def send_emails(self, template_set, context, recipients, **kwargs):
        """Render and send one email per recipient over a single connection.

        Returns True only when every recipient's message was sent. Failures of
        individual messages and mail connection errors are logged.
        """
        sent_count = 0
        connection = get_connection()
        try:
            with OpenedConnection(connection) as open_connection:
                for user in recipients:
                    # Templates see the current recipient as ``user``.
                    context["user"] = user

                    # Render in the recipient's preferred language.
                    language = user.wagtail_userprofile.get_preferred_language()
                    with override(language):
                        email_subject = render_to_string(
                            template_set["subject"], context
                        ).strip()
                        email_content = render_to_string(
                            template_set["text"], context
                        ).strip()

                        mail_options = {}
                        if getattr(
                            settings, "WAGTAILADMIN_NOTIFICATION_USE_HTML", False
                        ):
                            mail_options["html_message"] = render_to_string(
                                template_set["html"], context
                            )

                        try:
                            send_mail(
                                email_subject,
                                email_content,
                                [user.email],
                                connection=open_connection,
                                **mail_options,
                            )
                            sent_count += 1
                        except Exception:
                            # Keep going: one bad address must not block the rest.
                            logger.exception(
                                "Failed to send notification email '%s' to %s",
                                email_subject,
                                user.email,
                            )
        except (TimeoutError, ConnectionError):
            logger.exception("Mail connection error, notification sending skipped")

        return sent_count == len(recipients)

    def send_notifications(self, template_set, context, recipients, **kwargs):
        """Deliver the notification by email; delegates to ``send_emails``."""
        return self.send_emails(template_set, context, recipients, **kwargs)
|
|
|
|
|
|
class BaseWorkflowStateEmailNotifier(EmailNotificationMixin, Notifier):
    """Shared base for notifiers that email updates about WorkflowState events."""

    def __init__(self):
        # This notifier only handles WorkflowState instances.
        handled_classes = (WorkflowState,)
        super().__init__(handled_classes)

    def get_context(self, workflow_state: WorkflowState, **kwargs):
        """Extend the base context with ``workflow``, ``object`` and
        ``model_name``, plus ``page`` when the object is a Page."""
        context = super().get_context(workflow_state, **kwargs)
        context["workflow"] = workflow_state.workflow

        target = workflow_state.content_object
        context["object"] = target
        context["model_name"] = target._meta.verbose_name

        # Pages are additionally exposed in their most specific form.
        if isinstance(target, Page):
            context["page"] = target.specific
        return context
|
|
|
|
|
|
class WorkflowStateApprovalEmailNotifier(BaseWorkflowStateEmailNotifier):
    """Notifier that emails updates when a WorkflowState is approved."""

    notification = "approved"

    def get_recipient_users(self, workflow_state: WorkflowState, **kwargs):
        """Return the user who requested the workflow, unless there is no
        requester or the requester is the user who triggered the event."""
        triggering_user = kwargs.get("user", None)
        requester = workflow_state.requested_by

        if requester is None:
            return set()
        if requester != triggering_user:
            return {requester}
        return set()
|
|
|
|
|
|
class WorkflowStateRejectionEmailNotifier(BaseWorkflowStateEmailNotifier):
    """Notifier that emails updates when a WorkflowState is rejected."""

    notification = "rejected"

    def get_recipient_users(self, workflow_state: WorkflowState, **kwargs):
        """Return the user who requested the workflow, unless there is no
        requester or the requester is the user who triggered the event."""
        triggering_user = kwargs.get("user", None)
        requester = workflow_state.requested_by

        if requester is None:
            return set()
        if requester != triggering_user:
            return {requester}
        return set()

    def get_context(self, workflow_state, **kwargs):
        """Extend the workflow context with the current task, its task state
        and the comment stored on that task state."""
        context = super().get_context(workflow_state, **kwargs)

        current_state = workflow_state.current_task_state.specific
        context["task"] = current_state.task
        context["task_state"] = current_state
        context["comment"] = current_state.get_comment()
        return context
|
|
|
|
|
|
class WorkflowStateSubmissionEmailNotifier(BaseWorkflowStateEmailNotifier):
    """A notifier to send email updates for WorkflowState submission events"""

    notification = "submitted"

    def get_recipient_users(self, workflow_state: WorkflowState, **kwargs):
        """Return the queryset of users to notify about a submission.

        The queryset contains all superusers when
        WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS is truthy (the default)
        and is empty otherwise. The user passed as the ``user`` keyword
        argument, if any, is excluded.
        """
        triggering_user = kwargs.get("user", None)
        user_model = get_user_model()

        include_superusers = getattr(
            settings, "WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS", True
        )
        if include_superusers:
            recipients = user_model.objects.filter(is_superuser=True)
        else:
            recipients = user_model.objects.none()

        if triggering_user:
            # exclude() returns a new queryset, so the result must be
            # reassigned or the triggering user is never actually removed.
            recipients = recipients.exclude(pk=triggering_user.pk)

        return recipients

    def get_context(self, workflow_state, **kwargs):
        """Extend the workflow context with ``requested_by``."""
        context = super().get_context(workflow_state, **kwargs)
        context["requested_by"] = workflow_state.requested_by
        return context
|
|
|
|
|
|
class BaseGroupApprovalTaskStateEmailNotifier(EmailNotificationMixin, Notifier):
    """Shared base for notifiers that email updates about GroupApprovalTask events."""

    def __init__(self):
        # This notifier only handles TaskState instances.
        handled_classes = (TaskState,)
        super().__init__(handled_classes)

    def can_handle(self, instance, **kwargs):
        """Return True only for instances the parent accepts whose specific
        task is an AbstractGroupApprovalTask."""
        if not super().can_handle(instance, **kwargs):
            return False
        return isinstance(instance.task.specific, AbstractGroupApprovalTask)

    def get_context(self, task_state, **kwargs):
        """Extend the base context with ``task``, ``object`` and
        ``model_name``, plus ``page`` when the object is a Page."""
        context = super().get_context(task_state, **kwargs)
        context["task"] = task_state.task.specific

        target = task_state.workflow_state.content_object
        context["object"] = target
        context["model_name"] = target._meta.verbose_name

        # Pages are additionally exposed in their most specific form.
        if isinstance(target, Page):
            context["page"] = target.specific
        return context

    def get_recipient_users(self, task_state: TaskState, **kwargs):
        """Return members of the task's groups, plus superusers when
        WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS is truthy (the default),
        minus the user passed as the ``user`` keyword argument."""
        triggering_user = kwargs.get("user", None)
        user_model = get_user_model()

        recipients = user_model.objects.filter(
            groups__in=task_state.task.specific.groups.all()
        )

        if getattr(settings, "WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS", True):
            recipients = recipients | user_model.objects.filter(is_superuser=True)

        if triggering_user:
            recipients = recipients.exclude(pk=triggering_user.pk)

        return recipients
|
|
|
|
|
|
class GroupApprovalTaskStateSubmissionEmailNotifier(
    BaseGroupApprovalTaskStateEmailNotifier
):
    """Notifier that emails updates when a GroupApprovalTask is submitted."""

    # Selects the "submitted" template set and notification preference.
    notification = "submitted"
|