""" Base model definitions for audit logging. These may be subclassed to accommodate specific models such as Page, but the definitions here should remain generic and not depend on the base wagtail.models module or specific models such as Page. """ from collections import defaultdict from django.conf import settings from django.contrib.auth import get_user_model from django.contrib.auth.models import Permission from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError from django.core.serializers.json import DjangoJSONEncoder from django.db import models from django.utils import timezone from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ from wagtail.log_actions import registry as log_action_registry from wagtail.users.utils import get_deleted_user_display_name class LogEntryQuerySet(models.QuerySet): def get_actions(self): """ Returns a set of actions used by at least one log entry in this QuerySet """ return set(self.order_by().values_list("action", flat=True).distinct()) def get_user_ids(self): """ Returns a set of user IDs of users who have created at least one log entry in this QuerySet """ return set(self.order_by().values_list("user_id", flat=True).distinct()) def get_users(self): """ Returns a QuerySet of Users who have created at least one log entry in this QuerySet. The returned queryset is ordered by the username. """ User = get_user_model() return User.objects.filter(pk__in=self.get_user_ids()).order_by( User.USERNAME_FIELD ) def get_content_type_ids(self): """ Returns a set of IDs of content types with logged actions in this QuerySet """ return set(self.order_by().values_list("content_type_id", flat=True).distinct()) def filter_on_content_type(self, content_type): # custom method for filtering by content type, to allow overriding on log entry models # that have a concept of object types that doesn't correspond directly to ContentType # instances (e.g. PageLogEntry, which treats all page types as a single Page type) return self.filter(content_type_id=content_type.id) def with_instances(self): # return an iterable of (log_entry, instance) tuples for all log entries in this queryset. # instance is None if the instance does not exist. # Note: This is an expensive operation and should only be done on small querysets # (e.g. after pagination). # evaluate the queryset in full now, as we'll be iterating over it multiple times log_entries = list(self) ids_by_content_type = defaultdict(list) for log_entry in log_entries: ids_by_content_type[log_entry.content_type_id].append(log_entry.object_id) instances_by_id = {} # lookup of (content_type_id, stringified_object_id) to instance for content_type_id, object_ids in ids_by_content_type.items(): try: content_type = ContentType.objects.get_for_id(content_type_id) model = content_type.model_class() except ContentType.DoesNotExist: model = None if model: model_instances = model.objects.in_bulk(object_ids) else: # The model class for the logged instance no longer exists, # so we have no instance to return. Return None instead. model_instances = {object_id: None for object_id in object_ids} for object_id, instance in model_instances.items(): instances_by_id[(content_type_id, str(object_id))] = instance for log_entry in log_entries: lookup_key = (log_entry.content_type_id, str(log_entry.object_id)) yield (log_entry, instances_by_id.get(lookup_key)) class BaseLogEntryManager(models.Manager): def get_queryset(self): return LogEntryQuerySet(self.model, using=self._db) def get_instance_title(self, instance): return str(instance) or f"{instance.__class__.__name__} object ({instance.pk})" def log_action(self, instance, action, **kwargs): """ :param instance: The model instance we are logging an action for :param action: The action. Should be namespaced to app (e.g. wagtail.create, wagtail.workflow.start) :param kwargs: Addition fields to for the model deriving from BaseLogEntry - user: The user performing the action - uuid: uuid shared between log entries from the same user action - title: the instance title - data: any additional metadata - content_changed, deleted - Boolean flags :return: The new log entry """ if instance.pk is None: raise ValueError( "Attempted to log an action for object %r with empty primary key" % (instance,) ) data = kwargs.pop("data", None) or {} title = kwargs.pop("title", None) if not title: title = self.get_instance_title(instance) timestamp = kwargs.pop("timestamp", timezone.now()) return self.model.objects.create( content_type=ContentType.objects.get_for_model( instance, for_concrete_model=False ), label=title, action=action, timestamp=timestamp, data=data, **kwargs, ) def viewable_by_user(self, user): if user.is_superuser: return self.all() # This will be called multiple times per request, so we cache those ids once. if not hasattr(user, "_allowed_content_type_ids"): # 1) Only query those permissions, where log entries exist for their content # types. used_content_type_ids = self.values_list( "content_type_id", flat=True ).distinct() permissions = Permission.objects.filter( content_type_id__in=used_content_type_ids ) # 2) If the user has at least one permission for a content type, we add its # id to the allowed-set. allowed_content_type_ids = set() for permission in permissions: if permission.content_type_id in allowed_content_type_ids: continue content_type = ContentType.objects.get_for_id( permission.content_type_id ) if user.has_perm(f"{content_type.app_label}.{permission.codename}"): allowed_content_type_ids.add(permission.content_type_id) user._allowed_content_type_ids = allowed_content_type_ids return self.filter(content_type_id__in=user._allowed_content_type_ids) def get_for_model(self, model): # Return empty queryset if the given object is not valid. if not issubclass(model, models.Model): return self.none() ct = ContentType.objects.get_for_model(model) return self.filter(content_type=ct) def get_for_user(self, user_id): return self.filter(user=user_id) def for_instance(self, instance): """ Return a queryset of log entries from this log model that relate to the given object instance """ raise NotImplementedError # must be implemented by subclass class BaseLogEntry(models.Model): content_type = models.ForeignKey( ContentType, models.SET_NULL, verbose_name=_("content type"), blank=True, null=True, related_name="+", ) label = models.TextField() action = models.CharField(max_length=255, blank=True, db_index=True) data = models.JSONField(blank=True, default=dict, encoder=DjangoJSONEncoder) timestamp = models.DateTimeField(verbose_name=_("timestamp (UTC)"), db_index=True) uuid = models.UUIDField( blank=True, null=True, editable=False, help_text="Log entries that happened as part of the same user action are assigned the same UUID", ) user = models.ForeignKey( settings.AUTH_USER_MODEL, null=True, # Null if actioned by system blank=True, on_delete=models.DO_NOTHING, db_constraint=False, related_name="+", ) # Pointer to a specific page revision revision = models.ForeignKey( "wagtailcore.Revision", null=True, blank=True, on_delete=models.DO_NOTHING, db_constraint=False, related_name="+", ) # Flags for additional context to the 'action' made by the user (or system). content_changed = models.BooleanField(default=False, db_index=True) deleted = models.BooleanField(default=False) objects = BaseLogEntryManager() wagtail_reference_index_ignore = True class Meta: abstract = True verbose_name = _("log entry") verbose_name_plural = _("log entries") ordering = ["-timestamp"] def save(self, *args, **kwargs): self.full_clean() return super().save(*args, **kwargs) def clean(self): if not log_action_registry.action_exists(self.action): raise ValidationError( { "action": _( "The log action '%(action_name)s' has not been registered." ) % {"action_name": self.action} } ) def __str__(self): return "LogEntry %d: '%s' on '%s'" % ( self.pk, self.action, self.object_verbose_name(), ) @cached_property def user_display_name(self): """ Returns the display name of the associated user; get_full_name if available and non-empty, otherwise get_username. Defaults to 'system' when none is provided """ if self.user_id: user = self.user if user is None: return get_deleted_user_display_name(self.user_id) try: full_name = user.get_full_name().strip() except AttributeError: full_name = "" return full_name or user.get_username() else: return _("system") @cached_property def object_verbose_name(self): model_class = self.content_type.model_class() if model_class is None: return self.content_type_id return model_class._meta.verbose_name.title def object_id(self): raise NotImplementedError @cached_property def formatter(self): return log_action_registry.get_formatter(self) @cached_property def message(self): if self.formatter: return self.formatter.format_message(self) else: return _("Unknown %(action)s") % {"action": self.action} @cached_property def comment(self): if self.formatter: return self.formatter.format_comment(self) else: return "" class ModelLogEntryManager(BaseLogEntryManager): def log_action(self, instance, action, **kwargs): kwargs.update(object_id=str(instance.pk)) return super().log_action(instance, action, **kwargs) def for_instance(self, instance): return self.filter( content_type=ContentType.objects.get_for_model( instance, for_concrete_model=False ), object_id=str(instance.pk), ) class ModelLogEntry(BaseLogEntry): """ Simple logger for generic Django models """ object_id = models.CharField(max_length=255, blank=False, db_index=True) objects = ModelLogEntryManager() class Meta: ordering = ["-timestamp", "-id"] verbose_name = _("model log entry") verbose_name_plural = _("model log entries") def __str__(self): return "ModelLogEntry %d: '%s' on '%s' with id %s" % ( self.pk, self.action, self.object_verbose_name(), self.object_id, )