angrybeanie_wagtail/env/lib/python3.12/site-packages/wagtail/models/pages.py

2759 lines
100 KiB
Python
Raw Normal View History

2025-07-25 21:32:16 +10:00
from __future__ import annotations
import functools
import logging
import posixpath
import uuid
from django.conf import settings
from django.contrib.auth.models import Group, Permission
from django.contrib.contenttypes.fields import GenericRelation
from django.contrib.contenttypes.models import ContentType
from django.core import checks
from django.core.exceptions import (
FieldDoesNotExist,
ValidationError,
)
from django.db import models, transaction
from django.db.models import Q, Value
from django.db.models.expressions import Subquery
from django.db.models.functions import Concat, Substr
from django.dispatch import receiver
from django.http import Http404, HttpRequest, HttpResponse, HttpResponseNotAllowed
from django.template.response import TemplateResponse
from django.urls import NoReverseMatch, reverse
from django.utils import translation as translation
from django.utils.encoding import force_bytes, force_str
from django.utils.functional import Promise, cached_property
from django.utils.log import log_response
from django.utils.text import capfirst, slugify
from django.utils.translation import gettext_lazy as _
from modelcluster.fields import ParentalKey
from modelcluster.models import (
ClusterableModel,
)
from treebeard.mp_tree import MP_Node
from wagtail.actions.copy_for_translation import CopyPageForTranslationAction
from wagtail.actions.copy_page import CopyPageAction
from wagtail.actions.create_alias import CreatePageAliasAction
from wagtail.actions.delete_page import DeletePageAction
from wagtail.actions.move_page import MovePageAction
from wagtail.actions.publish_page_revision import PublishPageRevisionAction
from wagtail.actions.unpublish_page import UnpublishPageAction
from wagtail.compat import HTTPMethod
from wagtail.coreutils import (
WAGTAIL_APPEND_SLASH,
camelcase_to_underscore,
get_supported_content_language_variant,
resolve_model_string,
safe_md5,
)
from wagtail.fields import StreamField
from wagtail.log_actions import log
from wagtail.query import PageQuerySet
from wagtail.search import index
from wagtail.signals import (
page_published,
page_slug_changed,
pre_validate_delete,
)
from wagtail.url_routing import RouteResult
from wagtail.utils.timestamps import ensure_utc
from .audit_log import BaseLogEntry, BaseLogEntryManager, LogEntryQuerySet
from .content_types import get_default_page_content_type
from .copying import _copy_m2m_relations
from .draft_state import DraftStateMixin
from .i18n import Locale, TranslatableMixin
from .locking import LockableMixin
from .panels import CommentPanelPlaceholder, PanelPlaceholder
from .preview import PreviewableMixin
from .revisions import Revision, RevisionMixin
from .sites import Site
from .specific import SpecificMixin
from .view_restrictions import BaseViewRestriction
from .workflows import WorkflowMixin
# Module-level logger; records are emitted under the "wagtail" logger name.
logger = logging.getLogger("wagtail")
# NOTE(review): presumably the template context variable name under which a page is
# exposed when rendered; it is not used in this section of the file - confirm.
PAGE_TEMPLATE_VAR = "page"
# Name of the comments relation on page instances (looked up with getattr() when saving
# revisions and listed among the fields excluded from copying). Projects can override it
# with the WAGTAIL_COMMENTS_RELATION_NAME setting; the default is "wagtail_admin_comments".
COMMENTS_RELATION_NAME = getattr(
settings, "WAGTAIL_COMMENTS_RELATION_NAME", "wagtail_admin_comments"
)
@receiver(pre_validate_delete, sender=Locale)
def reassign_root_page_locale_on_delete(sender, instance, **kwargs):
    """
    Move the tree root page(s) off a locale that is about to be deleted.

    Connected to ``pre_validate_delete`` for ``Locale``. When a depth-1 page uses
    the locale being deleted, it is re-pointed at the default locale, or at any
    other remaining locale when the default is unavailable or is the one being deleted.
    """
    affected_roots = Page.objects.filter(depth=1, locale=instance)
    if not affected_roots.exists():
        # No root page uses this locale, so there is nothing to reassign.
        return

    # Prefer the default locale, provided it exists and is not the one being deleted.
    try:
        replacement = Locale.get_default()
        can_use_default = replacement != instance
    except (Locale.DoesNotExist, LookupError):
        can_use_default = False

    if not can_use_default:
        # Otherwise settle for any locale that will survive the deletion.
        replacement = Locale.all_objects.exclude(pk=instance.pk).first()

    affected_roots.update(locale=replacement)
# Registry of concrete Page model classes; the PageBase metaclass appends to it
# whenever a non-abstract page class is created.
PAGE_MODEL_CLASSES = []


def get_page_models():
    """
    Return the non-abstract Page model classes defined in this project.

    A new list is returned on every call, so callers may mutate it freely
    without affecting the registry.
    """
    return list(PAGE_MODEL_CLASSES)
def get_page_content_types(include_base_page_type=True):
    """
    Return a queryset of the ContentType objects for all Page model classes,
    ordered by ``model``.

    Pass ``include_base_page_type=False`` to leave out the content type of the
    base ``Page`` model itself.
    """
    page_models = get_page_models()
    if not include_base_page_type:
        page_models.remove(Page)

    types_by_model = ContentType.objects.get_for_models(*page_models)
    wanted_pks = [content_type.pk for content_type in types_by_model.values()]
    return ContentType.objects.filter(pk__in=wanted_pks).order_by("model")
@functools.cache
def get_streamfield_names(model_class):
    """
    Return a tuple with the names of the concrete StreamField fields on ``model_class``.

    The result is memoised per model class by ``functools.cache``.
    """
    names = []
    for field in model_class._meta.concrete_fields:
        if isinstance(field, StreamField):
            names.append(field.name)
    return tuple(names)
class BasePageManager(models.Manager):
    """Manager base for pages; querysets it returns are ordered by tree ``path``."""

    def get_queryset(self):
        """Return the manager's queryset for ``self.model``, ordered by ``path``."""
        queryset = self._queryset_class(self.model)
        return queryset.order_by("path")

    def first_common_ancestor_of(self, pages, include_self=False, strict=False):
        """
        Find the deepest page that is an ancestor of every page in ``pages``.

        This is similar to ``PageQuerySet.first_common_ancestor`` but works
        for a list of pages instead of a queryset.

        With ``include_self=True`` a page may count as its own ancestor. When
        ``pages`` is empty or no shared ancestor exists, the first root node is
        returned, unless ``strict=True``, in which case ``DoesNotExist`` is raised.
        """
        if pages:
            if include_self:
                unique_paths = {page.path for page in pages}
            else:
                # Drop the final path step so that each page contributes its parent's path.
                unique_paths = {page.path[: -self.model.steplen] for page in pages}

            # commonprefix is a plain character-wise prefix; it works on any strings,
            # not just file system paths.
            shared_path = posixpath.commonprefix(list(unique_paths))

            # A character-wise prefix can end part-way through a path step; trim it
            # back to a whole number of steps.
            overhang = len(shared_path) % self.model.steplen
            if overhang != 0:
                shared_path = shared_path[:-overhang]

            if shared_path != "":
                return self.get(path=shared_path)
            failure_message = "No common ancestor found!"
        else:
            failure_message = "Can not find ancestor of empty list"

        if strict:
            raise self.model.DoesNotExist(failure_message)
        return self.model.get_first_root_node()

    def annotate_parent_page(self, pages):
        """
        Attach each page's parent to it as ``_parent_page``.

        This lives on the manager rather than the QuerySet so that it can also be
        used with search results.

        A QuerySet passed in will be evaluated, so only call this once you are ready
        to consume the results, e.g. after pagination has been applied (typically in
        the view's ``get_context_data`` using ``context["object_list"]``).

        Nothing is returned: the given pages are modified in place, so existing
        references to the same collection in the view's context (e.g. through
        ``context_object_name``) see the annotation too.
        """
        parent_path_of = Page._get_parent_path_from_path

        wanted_paths = {parent_path_of(page.path) for page in pages}
        parents = Page.objects.filter(path__in=wanted_paths).specific(defer=True)
        parents_by_path = {parent.path: parent for parent in parents}

        for page in pages:
            # Pages whose parent path has no match (e.g. the tree root) get None.
            page._parent_page = parents_by_path.get(parent_path_of(page.path))


PageManager = BasePageManager.from_queryset(PageQuerySet)
class PageBase(models.base.ModelBase):
    """
    Metaclass for Page.

    Fills in per-class defaults (template paths, creatability, cached
    subpage/parent type lists) and registers every concrete page class in
    ``PAGE_MODEL_CLASSES``.
    """

    def __init__(cls, name, bases, dct):
        super().__init__(name, bases, dct)

        if "template" not in dct:
            # Default template path: "<app_label>/<model_name_in_snake_case>.html"
            app_label = cls._meta.app_label
            template_stem = camelcase_to_underscore(name)
            cls.template = "{}/{}.html".format(app_label, template_stem)

        if "ajax_template" not in dct:
            cls.ajax_template = None

        # Both caches are filled in lazily, on the first call to
        # cls.clean_subpage_models / cls.clean_parent_page_models respectively.
        cls._clean_subpage_models = None
        cls._clean_parent_page_models = None

        is_abstract = cls._meta.abstract

        # Creatability is decided per class from its own body, never inherited:
        # concrete page types are creatable unless they explicitly say otherwise.
        if "is_creatable" not in dct:
            cls.is_creatable = not is_abstract

        if not is_abstract:
            # Record this type in the list of page content types.
            PAGE_MODEL_CLASSES.append(cls)
# Abstract base that combines the page mixins with treebeard's MP_Node and carries the
# page manager; the concrete Page model is derived from it.
class AbstractPage(
WorkflowMixin,
PreviewableMixin,
DraftStateMixin,
LockableMixin,
RevisionMixin,
TranslatableMixin,
SpecificMixin,
MP_Node,
):
"""
Abstract superclass for Page. According to Django's inheritance rules, managers set on
abstract models are inherited by subclasses, but managers set on concrete models that are extended
via multi-table inheritance are not. We therefore need to attach PageManager to an abstract
superclass to ensure that it is retained by subclasses of Page.
"""
# Manager built from BasePageManager and PageQuerySet; declared here (rather than on
# Page) for the inheritance reason given in the docstring above.
objects = PageManager()
class Meta:
# Abstract model: no database table is created for AbstractPage itself.
abstract = True
# Each entry is (codename, short label, long label).
# Make sure that this list is sorted by the codename (first item in the tuple)
# so that we can follow the same order when querying the Permission objects.
PAGE_PERMISSION_TYPES = [
    ("add_page", _("Add"), _("Add/edit pages you own")),
    ("bulk_delete_page", _("Bulk delete"), _("Delete pages with children")),
    ("change_page", _("Edit"), _("Edit any page")),
    ("lock_page", _("Lock"), _("Lock/unlock pages you've locked")),
    ("publish_page", _("Publish"), _("Publish any page")),
    ("unlock_page", _("Unlock"), _("Unlock any page")),
]

# (permission type, long label) pairs; the permission type is the codename with
# its trailing "_page" (5 characters) removed.
PAGE_PERMISSION_TYPE_CHOICES = [
    (codename[:-5], description)
    for codename, _short_label, description in PAGE_PERMISSION_TYPES
]

# Just the codenames, in the same order as PAGE_PERMISSION_TYPES.
PAGE_PERMISSION_CODENAMES = [entry[0] for entry in PAGE_PERMISSION_TYPES]
# Concrete base model for all pages. This part of the class declares the database fields
# shared by every page type, followed by class-level configuration (search fields, admin
# panel placeholders, copy exclusions, privacy options and allowed HTTP methods).
class Page(AbstractPage, index.Indexed, ClusterableModel, metaclass=PageBase):
title = models.CharField(
verbose_name=_("title"),
max_length=255,
help_text=_("The page title as you'd like it to be seen by the public"),
)
# NOTE(review): this flag is read outside this section; presumably it marks the title as
# required even when saving a draft - confirm against the code that consumes it.
title.required_on_save = True
# to reflect title of a current draft in the admin UI
draft_title = models.CharField(max_length=255, editable=False)
slug = models.SlugField(
verbose_name=_("slug"),
allow_unicode=True,
max_length=255,
help_text=_(
"The name of the page as it will appear in URLs e.g http://domain.com/blog/[my-slug]/"
),
)
# If the referenced ContentType row is deleted, the page is re-pointed at whatever
# get_default_page_content_type returns instead of being deleted with it.
content_type = models.ForeignKey(
ContentType,
verbose_name=_("content type"),
related_name="pages",
on_delete=models.SET(get_default_page_content_type),
)
# NOTE(review): presumably tells Wagtail's reference index to skip this relation - confirm.
content_type.wagtail_reference_index_ignore = True
# Stored URL path of the page; maintained by set_url_path() and save(), not edited directly.
url_path = models.TextField(verbose_name=_("URL path"), blank=True, editable=False)
owner = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_("owner"),
null=True,
blank=True,
editable=True,
on_delete=models.SET_NULL,
related_name="owned_pages",
)
owner.wagtail_reference_index_ignore = True
seo_title = models.CharField(
verbose_name=_("title tag"),
max_length=255,
blank=True,
help_text=_(
"The name of the page displayed on search engine results as the clickable headline."
),
)
# Value that __init__ assigns to show_in_menus on newly created instances when the caller
# did not pass show_in_menus explicitly; page types can override it.
show_in_menus_default = False
show_in_menus = models.BooleanField(
verbose_name=_("show in menus"),
default=False,
help_text=_(
"Whether a link to this page will appear in automatically generated menus"
),
)
search_description = models.TextField(
verbose_name=_("meta description"),
blank=True,
help_text=_(
"The descriptive text displayed underneath a headline in search engine results."
),
)
latest_revision_created_at = models.DateTimeField(
verbose_name=_("latest revision created at"), null=True, editable=False
)
# Backing relation for the `revisions` property defined on this class.
_revisions = GenericRelation("wagtailcore.Revision", related_query_name="page")
# Add GenericRelation to allow WorkflowState.objects.filter(page=...) queries.
# There is no need to override the workflow_states property, as the default
# implementation in WorkflowMixin already ensures that the queryset uses the
# base Page content type.
_workflow_states = GenericRelation(
"wagtailcore.WorkflowState",
content_type_field="base_content_type",
object_id_field="object_id",
related_query_name="page",
for_concrete_model=False,
)
# When using a specific queryset, accessing the _workflow_states GenericRelation
# will yield no results. This is because the _workflow_states GenericRelation
# uses the base_content_type as the content_type_field, which is not the same
# as the content type of the specific queryset. To work around this, we define
# a second GenericRelation that uses the specific content_type to be used
# when working with specific querysets.
_specific_workflow_states = GenericRelation(
"wagtailcore.WorkflowState",
content_type_field="content_type",
object_id_field="object_id",
related_query_name="page",
for_concrete_model=False,
)
# If non-null, this page is an alias of the linked page
# This means the page is kept in sync with the live version
# of the linked pages and is not editable by users.
alias_of = models.ForeignKey(
"self",
on_delete=models.SET_NULL,
null=True,
blank=True,
editable=False,
related_name="aliases",
)
alias_of.wagtail_reference_index_ignore = True
# Search configuration: title is a boosted full-text field and an autocomplete field;
# every other entry is declared as a filter field only.
search_fields = [
index.SearchField("title", boost=2),
index.AutocompleteField("title"),
index.FilterField("title"),
index.FilterField("id"),
index.FilterField("live"),
index.FilterField("owner"),
index.FilterField("content_type"),
index.FilterField("path"),
index.FilterField("depth"),
index.FilterField("locked"),
index.FilterField("show_in_menus"),
index.FilterField("first_published_at"),
index.FilterField("last_published_at"),
index.FilterField("latest_revision_created_at"),
index.FilterField("locale"),
index.FilterField("translation_key"),
]
# Do not allow plain Page instances to be created through the Wagtail admin
is_creatable = False
# Define the maximum number of instances this page type can have. Default to unlimited.
max_count = None
# Define the maximum number of instances this page can have under a specific parent. Default to unlimited.
max_count_per_parent = None
# Set the default order for child pages to be shown in the Page index listing
admin_default_ordering = "-latest_revision_created_at"
# An array of additional field names that will not be included when a Page is copied.
exclude_fields_in_copy = []
# NOTE(review): presumably the built-in exclusions that the copy code combines with
# exclude_fields_in_copy above - confirm against the copy implementation.
default_exclude_fields_in_copy = [
"id",
"depth",
"numchild",
"url_path",
"path",
"postgres_index_entries",
"index_entries",
"latest_revision",
COMMENTS_RELATION_NAME,
]
# Real panel classes are defined in wagtail.admin.panels, which we can't import here
# because it would create a circular import. Instead, define them with placeholders
# to be replaced with the real classes by `wagtail.admin.panels.model_utils.expand_panel_list`.
content_panels = [
PanelPlaceholder("wagtail.admin.panels.TitleFieldPanel", ["title"], {}),
]
promote_panels = [
PanelPlaceholder(
"wagtail.admin.panels.MultiFieldPanel",
[
[
"slug",
"seo_title",
"search_description",
],
_("For search engines"),
],
{},
),
PanelPlaceholder(
"wagtail.admin.panels.MultiFieldPanel",
[
[
"show_in_menus",
],
_("For site menus"),
],
{},
),
]
settings_panels = [
PanelPlaceholder("wagtail.admin.panels.PublishingPanel", [], {}),
CommentPanelPlaceholder(),
]
# Privacy options for page
private_page_options = ["password", "groups", "login"]
# Allows page types to specify a list of HTTP method names that page instances will
# respond to. When the request type doesn't match, Wagtail should return a response
# with a status code of 405.
allowed_http_methods = [
HTTPMethod.DELETE,
HTTPMethod.GET,
HTTPMethod.HEAD,
HTTPMethod.OPTIONS,
HTTPMethod.PATCH,
HTTPMethod.POST,
HTTPMethod.PUT,
]
@staticmethod
def route_for_request(request: HttpRequest, path: str) -> RouteResult | None:
"""
Find the page route for the given HTTP request object, and URL path. The route
result (`page`, `args`, and `kwargs`) will be cached via
``request._wagtail_route_for_request``.
"""
if not hasattr(request, "_wagtail_route_for_request"):
try:
# we need a valid Site object for this request in order to proceed
if site := Site.find_for_request(request):
path_components = [
component for component in path.split("/") if component
]
request._wagtail_route_for_request = (
site.root_page.localized.specific.route(
request, path_components
)
)
else:
request._wagtail_route_for_request = None
except Http404:
# .route() can raise Http404
request._wagtail_route_for_request = None
return request._wagtail_route_for_request
@staticmethod
def find_for_request(request: HttpRequest, path: str) -> Page | None:
    """
    Return the page matching the given HTTP request object and URL path, or
    ``None`` when nothing matches.

    The full route is resolved through ``Page.route_for_request`` and therefore
    cached via ``request._wagtail_route_for_request``.
    """
    route_result = Page.route_for_request(request, path)
    if route_result is None:
        return None
    # The page is the first element of the route result.
    return route_result[0]
@classmethod
def allowed_http_method_names(cls):
    """
    Return ``allowed_http_methods`` as a list of plain method names.

    Entries that expose a ``value`` attribute (enum members) are unwrapped to
    that value; anything else is passed through unchanged.
    """
    return [getattr(method, "value", method) for method in cls.allowed_http_methods]
def __init__(self, *args, **kwargs):
    """
    Initialise the page, filling in defaults for instances that have no id yet
    (newly created rather than loaded from the database).
    """
    super().__init__(*args, **kwargs)

    if self.id:
        # Retrieved from the db; leave the stored values alone.
        return

    if not self.content_type_id:
        # Record the content type of the model class this was created as.
        self.content_type = ContentType.objects.get_for_model(self)

    if "show_in_menus" not in kwargs:
        # No explicit value supplied, so defer to the model-level setting.
        self.show_in_menus = self.show_in_menus_default
def __str__(self):
    """Return the page title as the string representation."""
    return self.title
@property
def revisions(self):
    """
    Revisions of this page.

    Revisions are always saved with the specific content_type, so the lookup
    always goes through the specific (deferred) page instance.
    """
    specific_page = self.specific_deferred
    return specific_page._revisions
def get_base_content_type(self):
    """
    Return the ContentType to use as ``base_content_type``.

    This is always the default Page model's ContentType, so that page revisions
    can be queried without having to know the specific Page type.
    """
    return get_default_page_content_type()
def get_content_type(self):
    """Return the value of this page's ``content_type`` field."""
    return self.content_type
@classmethod
def get_streamfield_names(cls):
    """
    Return the names of the StreamField fields on this page class.

    Delegates to the module-level ``get_streamfield_names`` helper.
    """
    return get_streamfield_names(cls)
def set_url_path(self, parent):
    """
    Populate the url_path field from this page's slug and the given parent page,
    and return the new value.

    The parent is passed in explicitly (rather than being retrieved via
    get_parent) so that new, unsaved pages can be given a meaningful URL when
    previewing them; at that point the page has not been assigned a position in
    the tree, as far as treebeard is concerned.

    A page without a parent is the tree root, which always has a url_path of '/'.
    """
    self.url_path = (parent.url_path + self.slug + "/") if parent else "/"
    return self.url_path
@staticmethod
def _slug_is_available(slug, parent_page, page=None):
    """
    Report whether ``slug`` is free to use for a child page of ``parent_page``.

    When ``page`` is given, the slug is intended for that page, so it is left
    out of the duplicate check. With no parent (the root page) any slug is
    acceptable.
    """
    if parent_page is None:
        return True

    candidates = parent_page.get_children()
    if page:
        candidates = candidates.not_page(page)

    slug_taken = candidates.filter(slug=slug).exists()
    return not slug_taken
def _get_autogenerated_slug(self, base_slug):
    """
    Return ``base_slug`` if it is available under this page's parent, otherwise
    the first available ``<base_slug>-<n>``, with n counting up from 2.
    """
    parent = self.get_parent()
    slug = base_slug
    counter = 1
    while not Page._slug_is_available(slug, parent, self):
        # Taken: move on to the next numeric suffix.
        counter += 1
        slug = "%s-%d" % (base_slug, counter)
    return slug
def get_default_locale(self):
    """
    Find the default locale to use for this page.

    This will be called just before the initial save. A page with a parent
    takes the parent's locale; otherwise the inherited default applies.
    """
    parent = self.get_parent()
    if parent is None:
        return super().get_default_locale()

    # Re-fetch the parent with its locale joined in and read the locale from that row.
    parent_lookup = parent.specific_class.objects.defer().select_related("locale")
    return parent_lookup.get(id=parent.id).locale
def get_admin_default_ordering(self):
    """
    Return the default ordering for child pages in the admin index listing.

    The value is a string (e.g. 'latest_revision_created_at, title, ord' or
    'live'), taken from the ``admin_default_ordering`` attribute.
    """
    ordering = self.admin_default_ordering
    return ordering
def _set_core_field_defaults(self):
    """
    Fill in the core fields (slug, draft_title, locale) that must be in place
    before the page is validated or saved. Values already set are left alone.
    """
    if not self.slug:
        # Derive a slug from the title, honouring the unicode-slugs setting.
        unicode_allowed = getattr(settings, "WAGTAIL_ALLOW_UNICODE_SLUGS", True)
        slug_stem = slugify(self.title, allow_unicode=unicode_allowed)
        # slugify can return an empty string; only use a non-empty result.
        if slug_stem:
            self.slug = self._get_autogenerated_slug(slug_stem)

    if not self.draft_title:
        self.draft_title = self.title

    if self.locale_id is None:
        self.locale = self.get_default_locale()
def full_clean(self, *args, **kwargs):
    """Populate the core field defaults, then run the inherited full validation."""
    self._set_core_field_defaults()
    super().full_clean(*args, **kwargs)
def _check_slug_is_unique(self):
    """
    Raise ``ValidationError`` (keyed on ``slug``) when another child of this
    page's parent already uses this page's slug.
    """
    parent = self.get_parent()
    if Page._slug_is_available(self.slug, parent, self):
        return

    message = _(
        "The slug '%(page_slug)s' is already in use within the parent page at '%(parent_url_path)s'"
    ) % {"page_slug": self.slug, "parent_url_path": parent.url}
    raise ValidationError({"slug": message})
def clean(self):
    """Run the inherited model validation, then check the slug is unique among siblings."""
    super().clean()
    self._check_slug_is_unique()
def minimal_clean(self):
    """
    Perform the reduced validation used when full validation is not run:
    set the core field defaults, clean the title through its model field, and
    check that the slug is unique among siblings.
    """
    self._set_core_field_defaults()

    title_field = self._meta.get_field("title")
    self.title = title_field.clean(self.title, self)

    self._check_slug_is_unique()
def is_site_root(self):
    """
    Return True if this page is the root of any site.

    Translations of site root pages count as site roots as well, because the
    lookup matches on ``translation_key``.
    """
    if not hasattr(self, "_is_site_root"):
        matching_sites = Site.objects.filter(
            root_page__translation_key=self.translation_key
        )
        return matching_sites.exists()

    # `_is_site_root` may have been populated by `annotate_site_root_state` on
    # `PageQuerySet` as a performance optimisation.
    return self._is_site_root
@transaction.atomic
# ensure that changes are only committed when we have updated all descendant URL paths, to preserve consistency
def save(self, clean=True, user=None, log_action=False, **kwargs):
"""
Writes the page to the database, performing additional housekeeping tasks to ensure data
integrity:
* ``locale``, ``draft_title`` and ``slug`` are set to default values if not provided, with ``slug``
being generated from the title with a suffix to ensure uniqueness within the parent page
where necessary
* The ``url_path`` field is set based on the ``slug`` and the parent page
* If the ``slug`` has changed, the ``url_path`` of this page and all descendants is updated and
a :ref:`page_slug_changed` signal is sent
New pages should be saved by passing the unsaved page instance to the
:meth:`~treebeard.mp_tree.MP_Node.add_child`
or :meth:`~treebeard.mp_tree.MP_Node.add_sibling` method of an existing page, which will correctly update
the fields responsible for tracking the page's location in the tree.
If ``clean=False`` is passed, the page is saved without validation. This is appropriate for updates that only
change metadata such as `latest_revision` while keeping content and page location unchanged.
If ``clean=True`` is passed (the default), and the page has ``live=True`` set, the page is validated using
:meth:`~django.db.models.Model.full_clean` before saving.
If ``clean=True`` is passed, and the page has ``live=False`` set, only the title and slug fields are validated.
``user`` is the user recorded on any log entry written by this call; for the creation log entry it
falls back to the page's ``owner`` when not given.
``log_action`` controls logging: the default ``False`` logs nothing except page creation, ``None``
suppresses the creation log entry as well, and any other truthy value is used as the action name of
the log entry written for an existing page.
Remaining keyword arguments are passed on to the inherited ``save()``, whose result is returned.
.. versionchanged:: 7.0
``clean=True`` now only performs full validation when the page is live. When the page is not live, only
the title and slug fields are validated. Previously, full validation was always performed.
"""
if clean:
if self.live:
self.full_clean()
else:
# Saving as draft; only perform the minimal validation to satisfy data integrity
self.minimal_clean()
slug_changed = False
is_new = self.id is None
if is_new:
# we are creating a record. If we're doing things properly, this should happen
# through a treebeard method like add_child, in which case the 'path' field
# has been set and so we can safely call get_parent
self.set_url_path(self.get_parent())
else:
# Check that we are committing the slug to the database
# Basically: If update_fields has been specified, and slug is not included, skip this step
if not (
"update_fields" in kwargs and "slug" not in kwargs["update_fields"]
):
# see if the slug has changed from the record in the db, in which case we need to
# update url_path of self and all descendants. Even though we might not need it,
# the specific page is fetched here for sending to the 'page_slug_changed' signal.
old_record = Page.objects.get(id=self.id).specific
if old_record.slug != self.slug:
self.set_url_path(self.get_parent())
slug_changed = True
# old_url_path / new_url_path (and old_record) are only bound on this path;
# every later use is guarded by slug_changed.
old_url_path = old_record.url_path
new_url_path = self.url_path
result = super().save(**kwargs)
if slug_changed:
self._update_descendant_url_paths(old_url_path, new_url_path)
# Emit page_slug_changed signal on successful db commit
transaction.on_commit(
lambda: page_slug_changed.send(
sender=self.specific_class or self.__class__,
instance=self.specific,
instance_before=old_record,
)
)
# Check if this is a root page of any sites and clear the 'wagtail_site_root_paths' key if so
# Note: New translations of existing site roots are considered site roots as well, so we must
# always check if this page is a site root, even if it's new.
if self.is_site_root():
Site.clear_site_root_paths_cache()
# Log
if is_new:
cls = type(self)
logger.info(
'Page created: "%s" id=%d content_type=%s.%s path=%s',
self.title,
self.id,
cls._meta.app_label,
cls.__name__,
self.url_path,
)
if log_action is not None:
# The default for log_action is False. i.e. don't log unless specifically instructed
# Page creation is a special case that we want logged by default, but allow skipping it
# explicitly by passing log_action=None
if is_new:
log(
instance=self,
action="wagtail.create",
# attribute the creation to the page owner when no acting user was passed
user=user or self.owner,
content_changed=True,
)
elif log_action:
log(instance=self, action=log_action, user=user)
return result
def delete(self, *args, **kwargs):
    """
    Delete this page by running ``DeletePageAction``.

    An optional ``user`` keyword argument is handed to the action; all other
    arguments are passed through to the action's ``execute()``, whose result is
    returned.
    """
    acting_user = kwargs.pop("user", None)
    action = DeletePageAction(self, user=acting_user)
    return action.execute(*args, **kwargs)
@classmethod
def check(cls, **kwargs):
    """
    Extend the inherited system checks with page-specific ones and return the
    combined list of messages:

    * a warning for each foreign key (other than parent links) that cascades on delete;
    * an error if the ``objects`` manager is not a ``PageManager``;
    * an error if ``subpage_types`` or ``parent_page_types`` cannot be resolved.
    """
    errors = super().check(**kwargs)

    # Foreign keys from pages must not cascade on delete. CASCADE is the default
    # Django behaviour and has to be overridden explicitly, otherwise pages can
    # disappear unexpectedly and corrupt the tree.

    # Names of the foreign keys pointing to parent classes (such as page_ptr),
    # which are exempt from the rule.
    parent_link_names = []
    for model in [cls, *cls._meta.get_parent_list()]:
        for parent_link in model._meta.parents.values():
            if parent_link:
                parent_link_names.append(parent_link.name)

    for field in cls._meta.fields:
        cascades_on_delete = (
            isinstance(field, models.ForeignKey)
            and field.name not in parent_link_names
            and field.remote_field.on_delete == models.CASCADE
        )
        if cascades_on_delete:
            errors.append(
                checks.Warning(
                    "Field hasn't specified on_delete action",
                    hint="Set on_delete=models.SET_NULL and make sure the field is nullable or set on_delete=models.PROTECT. Wagtail does not allow simple database CASCADE because it will corrupt its tree storage.",
                    obj=field,
                    id="wagtailcore.W001",
                )
            )

    if not isinstance(cls.objects, PageManager):
        errors.append(
            checks.Error(
                "Manager does not inherit from PageManager",
                hint="Ensure that custom Page managers inherit from wagtail.models.PageManager",
                obj=cls,
                id="wagtailcore.E002",
            )
        )

    # Resolving the page type lists raises if a model reference is invalid;
    # surface that as a check error instead.
    try:
        cls.clean_subpage_models()
    except (ValueError, LookupError) as exc:
        subpage_error = checks.Error(
            "Invalid subpage_types setting for %s" % cls,
            hint=str(exc),
            id="wagtailcore.E002",
        )
        errors.append(subpage_error)

    try:
        cls.clean_parent_page_models()
    except (ValueError, LookupError) as exc:
        parent_error = checks.Error(
            "Invalid parent_page_types setting for %s" % cls,
            hint=str(exc),
            id="wagtailcore.E002",
        )
        errors.append(parent_error)

    return errors
def _update_descendant_url_paths(self, old_url_path, new_url_path):
    """
    Rewrite the ``url_path`` of every descendant of this page in one UPDATE,
    replacing the leading ``old_url_path`` with ``new_url_path``.
    """
    # Everything whose tree path starts with ours, apart from this page itself.
    descendants = Page.objects.filter(path__startswith=self.path).exclude(pk=self.pk)

    # Keep whatever follows the old prefix (the Substr position is 1-based).
    remainder = Substr("url_path", len(old_url_path) + 1)
    descendants.update(url_path=Concat(Value(new_url_path), remainder))
@property
def page_type_display_name(self):
    """
    A human-readable version of this page's type.

    Empty for the tree root and for pages with no specific class.
    """
    if self.specific_class and not self.is_root():
        return self.specific_class.get_verbose_name()
    return ""
def route(self, request, path_components):
    """
    Resolve ``path_components`` relative to this page.

    With components remaining, routing is delegated to the child page whose slug
    matches the first component; with none left, this page is the target and is
    returned in a ``RouteResult`` if it is live. Raises ``Http404`` when the child
    does not exist or the target page is not live.
    """
    if not path_components:
        # request is for this very page
        if self.live:
            return RouteResult(self)
        raise Http404

    # request is for a child of this page
    child_slug, remaining_components = path_components[0], path_components[1:]
    try:
        subpage = self.get_children().get(slug=child_slug)
        # Cache the parent page on the subpage to avoid another db query:
        # Treebeard's get_parent will use the `_cached_parent_obj` attribute
        # if it exists and update = False.
        subpage._cached_parent_obj = self
    except Page.DoesNotExist:
        raise Http404

    return subpage.specific.route(request, remaining_components)
def get_admin_display_title(self):
    """
    Return the title for this page as it should appear in the admin backend.

    Override this to display extra contextual information about the page, such
    as language. By default, returns ``draft_title``.
    """
    if self.draft_title:
        return self.draft_title
    # draft_title can be blank, e.g. when the page was created in a fixture or
    # migration that didn't explicitly handle it; fall back on the title.
    return self.title
def save_revision(
    self,
    user=None,
    approved_go_live_at=None,
    changed=True,
    log_action=False,
    previous_revision=None,
    clean=True,
):
    """
    Create a new revision from the current state of this page and return it.

    The page row itself is then saved (without validation) to record the new
    ``latest_revision``, ``latest_revision_created_at`` and ``draft_title``,
    along with the comments relation.

    :param user: the user stored on the revision and on any log entry
    :param approved_go_live_at: stored on the revision as ``approved_go_live_at``
    :param changed: if true, ``has_unpublished_changes`` is set on the page; the value
        is also recorded as ``content_changed`` on the log entry
    :param log_action: if truthy, a log entry is written. A string is used as the
        action name; any other truthy value selects ``wagtail.edit``, or
        ``wagtail.revert`` when ``previous_revision`` is given
    :param previous_revision: the revision being reverted to, if any; its id and
        creation time are included in the log entry data
    :param clean: if true (the default), ``full_clean()`` is run before the revision
        is created
    :raises RuntimeError: if called on a non-specific page instance, or on an alias page
    """
    # Raise error if this is not the specific version of the page
    if not isinstance(self, self.specific_class):
        raise RuntimeError(
            "page.save_revision() must be called on the specific version of the page. "
            "Call page.specific.save_revision() instead."
        )

    # Raise an error if this page is an alias.
    if self.alias_of_id:
        raise RuntimeError(
            "page.save_revision() was called on an alias page. "
            "Revisions are not required for alias pages as they are an exact copy of another page."
        )

    if clean:
        self.full_clean()

    new_comments = getattr(self, COMMENTS_RELATION_NAME).filter(pk__isnull=True)
    for comment in new_comments:
        # We need to ensure comments have an id in the revision, so positions can be identified correctly
        comment.save()

    revision = Revision.objects.create(
        content_object=self,
        base_content_type=self.get_base_content_type(),
        user=user,
        approved_go_live_at=approved_go_live_at,
        content=self.serializable_data(),
        object_str=str(self),
    )

    for comment in new_comments:
        comment.revision_created = revision

    self.latest_revision_created_at = revision.created_at
    self.draft_title = self.title
    self.latest_revision = revision

    update_fields = [
        COMMENTS_RELATION_NAME,
        "latest_revision_created_at",
        "draft_title",
        "latest_revision",
    ]
    if changed:
        self.has_unpublished_changes = True
        update_fields.append("has_unpublished_changes")

    # update_fields is never empty here, so the save is unconditional.
    # clean=False because the fields we're updating don't need validation
    self.save(update_fields=update_fields, clean=False)

    # Log
    logger.info(
        'Page edited: "%s" id=%d revision_id=%d', self.title, self.id, revision.id
    )
    if log_action:
        # A revert differs from a plain edit only in its default action name and
        # in carrying details of the revision that was reverted to.
        extra_log_kwargs = {}
        if previous_revision:
            default_action = "wagtail.revert"
            extra_log_kwargs["data"] = {
                "revision": {
                    "id": previous_revision.id,
                    "created": ensure_utc(previous_revision.created_at),
                }
            }
        else:
            default_action = "wagtail.edit"

        log(
            instance=self,
            action=log_action if isinstance(log_action, str) else default_action,
            user=user,
            revision=revision,
            content_changed=changed,
            **extra_log_kwargs,
        )

    return revision
def get_latest_revision_as_object(self):
    """
    Return the page as it stands in its latest revision, or the specific page
    from the database when there are no unpublished changes or no revision exists.
    """
    if self.has_unpublished_changes:
        newest_revision = self.get_latest_revision()
        if newest_revision:
            return newest_revision.as_object()

    # Without unpublished changes the live database copy is used in preference
    # to the revision record, as:
    # 1) this will pick up any changes that have been made directly to the model,
    #    such as automated data imports;
    # 2) it ensures that inline child objects pick up real database IDs even if
    #    those are absent from the revision data. (If this wasn't the case, the child
    #    objects would be recreated with new IDs on next publish - see #1853)
    return self.specific
def update_aliases(self, *, revision=None, _content=None, _updated_ids=None):
    """
    Publishes all aliases that follow this page with the latest content from this page.

    This is called by Wagtail whenever a page with aliases is published.

    :param revision: The revision of the original page that we are updating to (used for logging purposes)
    :type revision: Revision, Optional
    :param _content: Internal. Serialised content of this page; passed down the
        recursion so that ``serializable_data()`` is only computed once.
    :param _updated_ids: Internal. IDs of the pages already handled during this
        update run; shared with recursive calls to guard against alias loops.
    """
    specific_self = self.specific

    # Only compute this if necessary since it's quite a heavy operation
    if _content is None:
        _content = self.serializable_data()

    # A list of IDs that have already been updated. This is just in case someone has
    # created an alias loop (which is impossible to do with the UI Wagtail provides).
    # The same list object is handed to every recursive call, so IDs recorded
    # further down the chain are visible here as well.
    if not isinstance(_updated_ids, list):
        _updated_ids = list(_updated_ids or ())

    # The page whose content is being propagated must never be overwritten by
    # one of its own (indirect) aliases.
    if self.id not in _updated_ids:
        _updated_ids.append(self.id)

    # FIXME: Switch to the same fields that are excluded from copy
    # We can't do this right now because we can't exclude fields from with_content_json
    exclude_fields = [
        "id",
        "path",
        "depth",
        "numchild",
        "url_path",
        "index_entries",
        "postgres_index_entries",
    ]

    for alias in self.specific_class.objects.filter(alias_of=self).exclude(
        id__in=list(_updated_ids)
    ):
        # Copy field content
        alias_updated = alias.with_content_json(_content)

        # Publish the alias if it's currently in draft
        alias_updated.live = True
        alias_updated.has_unpublished_changes = False

        # Copy child relations
        child_object_map = specific_self.copy_all_child_relations(
            target=alias_updated, exclude=exclude_fields
        )

        # Process child objects
        # This has two jobs:
        #  - If the alias is in a different locale, this updates the
        #    locale of any translatable child objects to match
        #  - If the alias is not a translation of the original, this
        #    changes the translation_key field of all child objects
        #    so they do not clash
        if child_object_map:
            alias_is_translation = alias.translation_key == self.translation_key

            def process_child_object(child_object):
                if isinstance(child_object, TranslatableMixin):
                    # Child object's locale must always match the page
                    child_object.locale = alias_updated.locale

                    # If the alias isn't a translation of the original page,
                    # change the child object's translation_keys so they are
                    # not either
                    if not alias_is_translation:
                        child_object.translation_key = uuid.uuid4()

            for (rel, previous_id), child_objects in child_object_map.items():
                if previous_id is None:
                    for child_object in child_objects:
                        process_child_object(child_object)
                else:
                    process_child_object(child_objects)

        # Copy M2M relations
        _copy_m2m_relations(
            specific_self, alias_updated, exclude_fields=exclude_fields
        )

        # Don't change the aliases slug
        # Aliases can have their own slugs so they can be siblings of the original
        alias_updated.slug = alias.slug
        alias_updated.set_url_path(alias_updated.get_parent())

        # Aliases don't have revisions, so update fields that would normally be updated by save_revision
        alias_updated.draft_title = alias_updated.title
        alias_updated.latest_revision_created_at = self.latest_revision_created_at

        alias_updated.save(clean=False)

        page_published.send(
            sender=alias_updated.specific_class,
            instance=alias_updated,
            revision=revision,
            alias=True,
        )

        # Record the alias as handled *before* recursing so that a loop leading
        # back to it is cut short instead of recursing forever.
        _updated_ids.append(alias.id)

        # Update any aliases of that alias

        # Design note:
        # It could be argued that this will be faster if we just changed these alias-of-alias
        # pages to all point to the original page and avoid having to update them recursively.
        #
        # But, it's useful to have a record of how aliases have been chained.
        # For example, In Wagtail Localize, we use aliases to create mirrored trees, but those
        # trees themselves could have aliases within them. If an alias within a tree is
        # converted to a regular page, we want the alias in the mirrored tree to follow that
        # new page and stop receiving updates from the original page.
        #
        # Doing it this way requires an extra lookup query per alias but this is small in
        # comparison to the work required to update the alias.
        alias.update_aliases(
            revision=revision,
            _content=_content,
            _updated_ids=_updated_ids,
        )


update_aliases.alters_data = True
def publish(
    self,
    revision,
    user=None,
    changed=True,
    log_action=True,
    previous_revision=None,
    skip_permission_checks=False,
):
    """
    Publish ``revision`` by running a ``PublishPageRevisionAction``.

    All arguments except ``skip_permission_checks`` are forwarded to the
    action's constructor; ``skip_permission_checks`` is forwarded to its
    ``execute()`` call, whose result is returned.
    """
    action_options = {
        "user": user,
        "changed": changed,
        "log_action": log_action,
        "previous_revision": previous_revision,
    }
    action = PublishPageRevisionAction(revision, **action_options)
    return action.execute(skip_permission_checks=skip_permission_checks)
def unpublish(self, set_expired=False, commit=True, user=None, log_action=True):
    """
    Unpublish this page by running an ``UnpublishPageAction`` and return
    whatever the action's ``execute()`` returns.
    """
    action = UnpublishPageAction(
        self,
        set_expired=set_expired,
        commit=commit,
        user=user,
        log_action=log_action,
    )
    return action.execute()
# Optional extra template variable name under which the page is also exposed
# by get_context(); nothing extra is added while this is falsy.
context_object_name = None


def get_context(self, request, *args, **kwargs):
    """
    Build the template context for this page.

    The page is exposed under ``PAGE_TEMPLATE_VAR`` and ``"self"``, the request
    under ``"request"`` and, when ``context_object_name`` is set, the page is
    additionally exposed under that name.
    """
    context = {}
    context[PAGE_TEMPLATE_VAR] = self
    context["self"] = self
    context["request"] = request

    extra_name = self.context_object_name
    if extra_name:
        context[extra_name] = self

    return context
def get_preview_context(self, request, mode_name):
    """
    Return the template context used for previews.

    ``mode_name`` is ignored here; the regular ``get_context()`` result is used.
    """
    preview_context = self.get_context(request)
    return preview_context
def get_template(self, request, *args, **kwargs):
    """
    Return the template for this request.

    Requests carrying an ``x-requested-with: XMLHttpRequest`` header get
    ``ajax_template`` when it is set; everything else gets ``template``.
    """
    is_ajax = request.headers.get("x-requested-with") == "XMLHttpRequest"
    if not is_ajax:
        return self.template
    return self.ajax_template or self.template
def get_preview_template(self, request, mode_name):
    """
    Return the template used for previews.

    ``mode_name`` is ignored here; the regular ``get_template()`` result is used.
    """
    preview_template = self.get_template(request)
    return preview_template
def serve(self, request, *args, **kwargs):
    """
    Render this page: flag the request as a non-preview one and return a
    ``TemplateResponse`` built from ``get_template()`` and ``get_context()``.
    """
    request.is_preview = False

    template = self.get_template(request, *args, **kwargs)
    context = self.get_context(request, *args, **kwargs)
    return TemplateResponse(request, template, context)
def check_request_method(self, request, *args, **kwargs):
    """
    Compare the request's ``method`` with the methods this page supports
    (as reported by ``allowed_http_method_names()``).

    Returns ``None`` for a supported method so that request handling carries
    on as usual. For an unsupported method the rejection is logged and an
    ``HttpResponseNotAllowed`` listing the supported methods is returned.
    """
    permitted = self.allowed_http_method_names()
    if request.method in permitted:
        return None

    response = HttpResponseNotAllowed(permitted)
    log_response(
        "Method Not Allowed (%s): %s",
        request.method,
        request.path,
        request=request,
        response=response,
    )
    return response
def handle_options_request(self, request, *args, **kwargs):
    """
    Answer an ``OPTIONS`` request.

    Returns an empty ``HttpResponse`` whose ``"Allow"`` header is the
    comma-separated list produced by ``allowed_http_method_names()``. This is
    used in place of :meth:`serve` when the ``OPTIONS`` verb is detected (and
    :class:`HTTPMethod.OPTIONS <python:http.HTTPMethod>` is present in
    :attr:`allowed_http_methods` for this type of page).
    """
    allow_header = ", ".join(self.allowed_http_method_names())
    return HttpResponse(headers={"Allow": allow_header})
def is_navigable(self):
    """
    Tell whether browsing the subpages of this page makes sense.

    That is the case when the page has subpages, or when it sits at depth 2
    (the top level), so that empty out-of-the-box sites still get working
    navigation.
    """
    if self.is_leaf():
        # No children: only top-level pages remain navigable
        return self.depth == 2
    return True
def _get_site_root_paths(self, request=None):
"""
Return ``Site.get_site_root_paths()``, using the cached copy on the
request object if available.
"""
# if we have a request, use that to cache site_root_paths; otherwise, use self
cache_object = request if request else self
try:
return cache_object._wagtail_cached_site_root_paths
except AttributeError:
cache_object._wagtail_cached_site_root_paths = Site.get_site_root_paths()
return cache_object._wagtail_cached_site_root_paths
def _get_relevant_site_root_paths(self, cache_object=None):
"""
Returns a tuple of root paths for all sites this page belongs to.
"""
return tuple(
srp
for srp in self._get_site_root_paths(cache_object)
if self.url_path.startswith(srp.root_path)
)
def get_url_parts(self, request=None):
    """
    Work out the URL of this page, expressed as the tuple
    ``(site_id, site_root_url, page_url_relative_to_site_root)``.

    ``None`` is returned when the page is not routable, and
    ``(site_id, None, None)`` when reversing the URL raises ``NoReverseMatch``.

    The ``full_url``, ``url``, ``relative_url`` and ``get_site`` properties and
    methods are built on top of this; pages with custom URL routing should
    override this method so that those operations return the custom URLs.

    The optional ``request`` keyword argument may be used to avoid repeated
    database / cache lookups. A page model overriding ``get_url_parts``
    normally does not need to look at ``request`` itself and should simply
    pass it on when calling ``super``.
    """
    candidates = self._get_relevant_site_root_paths(request)
    if not candidates:
        return None

    # Thanks to the ordering applied by Site.get_site_root_paths(),
    # the first item is ideal in the vast majority of setups.
    site_id, root_path, root_url, language_code = candidates[0]

    distinct_site_ids = {entry[0] for entry in candidates}
    if len(distinct_site_ids) > 1 and isinstance(request, HttpRequest):
        # The page somehow belongs to more than one site (rare, but possible).
        # With a genuine HttpRequest we can identify the 'current' site and
        # prefer the entry that belongs to it (where present).
        current = Site.find_for_request(request)
        if current:
            preferred = next(
                (entry for entry in candidates if entry[0] == current.pk), None
            )
            if preferred is not None:
                site_id, root_path, root_url, language_code = preferred

    i18n_enabled = getattr(settings, "WAGTAIL_I18N_ENABLED", False)

    if i18n_enabled:
        # If the active language code is a variant of the page's language, then
        # use that instead
        # This is used when LANGUAGES contain more languages than WAGTAIL_CONTENT_LANGUAGES
        active_language = translation.get_language()
        try:
            content_variant = get_supported_content_language_variant(
                active_language
            )
        except LookupError:
            # active language code is not a recognised content language, so leave
            # page's language code unchanged
            pass
        else:
            if content_variant == language_code:
                language_code = active_language

    # Portion of url_path below the site's root page
    path_below_root = self.url_path[len(root_path) :]

    # The page may not be routable because wagtail_serve is not registered
    # This may be the case if Wagtail is used headless
    try:
        if i18n_enabled:
            with translation.override(language_code):
                page_path = reverse("wagtail_serve", args=(path_below_root,))
        else:
            page_path = reverse("wagtail_serve", args=(path_below_root,))
    except NoReverseMatch:
        return (site_id, None, None)

    # Remove the trailing slash from the URL reverse generates if
    # WAGTAIL_APPEND_SLASH is False and we're not trying to serve
    # the root path
    if page_path != "/" and not WAGTAIL_APPEND_SLASH:
        page_path = page_path.rstrip("/")

    return (site_id, root_url, page_path)
def get_full_url(self, request=None):
    """
    Return the full URL (including protocol / domain) to this page, or ``None`` if it is not routable.
    """
    parts = self.get_url_parts(request=request)

    # ``and`` binds tighter than ``or``: the page counts as unroutable when
    # there are no parts at all, or when both the root URL and the path are missing.
    if parts is None or (parts[1] is None and parts[2] is None):
        return None

    _site_id, root_url, page_path = parts
    return root_url + page_path


full_url = property(get_full_url)
def get_url(self, request=None, current_site=None):
    """
    Return the 'most appropriate' URL for linking to this page from the pages
    we serve, both in the Wagtail backend and in actual website templates.

    That is the local URL (starting with '/') when the link is known to stay on
    the same domain - either because the page belongs to the current site or
    because only a single site is configured - and the full URL (with domain)
    otherwise. ``None`` is returned if the page is not routable.

    The optional but recommended ``request`` keyword argument is used to cache
    site-level URL information (thereby avoiding repeated database / cache
    lookups) and, through ``Site.find_for_request()``, to decide whether a
    relative or full URL is most appropriate.
    """
    # ``current_site`` is purposefully undocumented, as one can simply pass the request and get
    # a relative URL based on ``Site.find_for_request()``. Nonetheless, support it here to avoid
    # copy/pasting the code to the ``relative_url`` method below.
    if request is not None and current_site is None:
        current_site = Site.find_for_request(request)

    parts = self.get_url_parts(request=request)
    if parts is None or (parts[1] is None and parts[2] is None):
        # page is not routable
        return None

    site_id, root_url, page_path = parts

    # Count the distinct sites among the root paths
    # Note: there may be more root paths than sites if there are multiple languages
    site_ids = {entry[0] for entry in self._get_site_root_paths(request)}

    on_current_site = current_site is not None and site_id == current_site.id
    if on_current_site or len(site_ids) == 1:
        # the site matches OR we're only running a single site, so a local URL is sufficient
        return page_path

    return root_url + page_path


url = property(get_url)
def relative_url(self, current_site, request=None):
    """
    Return the 'most appropriate' URL for this page relative to ``current_site``:
    a local URL if the page belongs to that site, a fully qualified one otherwise.
    ``None`` is returned if the page is not routable.

    The optional but recommended ``request`` keyword argument is used to cache
    site-level URL information (thereby avoiding repeated database / cache
    lookups).
    """
    url = self.get_url(request=request, current_site=current_site)
    return url
def get_site(self):
    """
    Return the Site object that this page belongs to, or ``None`` when
    ``get_url_parts()`` reports the page as not routable.
    """
    parts = self.get_url_parts()
    if parts is None:
        # page is not routable
        return None

    site_id, _root_url, _page_path = parts
    return Site.objects.get(id=site_id)
@classmethod
def get_indexed_objects(cls):
    """
    Return the parent implementation's indexed objects, restricted to rows
    whose ``content_type`` is the content type of ``cls``.
    """
    own_content_type = ContentType.objects.get_for_model(cls)
    indexed = super().get_indexed_objects()
    return indexed.filter(content_type=own_content_type)
def get_indexed_instance(self):
    """
    Return the specific instance of this page for indexing, or ``None`` if
    that instance does not exist (yet).
    """
    # This is accessed on save by the wagtailsearch signal handler, and in edge
    # cases (e.g. loading test fixtures), may be called before the specific instance's
    # entry has been created. In those cases, we aren't ready to be indexed yet, so
    # return None.
    try:
        specific_page = self.specific
    except self.specific_class.DoesNotExist:
        return None
    return specific_page
def get_default_privacy_setting(self, request: HttpRequest):
    """
    Return the default privacy setting for a page, as a dict.

    This implementation always returns ``{"type": BaseViewRestriction.NONE}``
    and does not look at ``request``.
    """
    default_setting = {"type": BaseViewRestriction.NONE}
    return default_setting
@classmethod
def clean_subpage_models(cls):
    """
    Returns the list of subpage types, normalized as model classes.
    Throws ValueError if any entry in subpage_types cannot be recognized as a model name,
    or LookupError if a model does not exist (or is not a Page subclass).

    The result is cached on the class in ``_clean_subpage_models``. The cache
    is only populated once the list has been validated, so a failing call
    keeps failing instead of leaving an invalid list behind for later calls.
    """
    if cls._clean_subpage_models is None:
        subpage_types = getattr(cls, "subpage_types", None)
        if subpage_types is None:
            # if subpage_types is not specified on the Page class, allow all page types as subpages
            resolved_models = get_page_models()
        else:
            resolved_models = [
                resolve_model_string(model_string, cls._meta.app_label)
                for model_string in subpage_types
            ]

            for model in resolved_models:
                if not issubclass(model, Page):
                    raise LookupError("%s is not a Page subclass" % model)

        # Only cache after validation succeeded
        cls._clean_subpage_models = resolved_models

    return cls._clean_subpage_models
@classmethod
def clean_parent_page_models(cls):
    """
    Returns the list of parent page types, normalized as model classes.
    Throws ValueError if any entry in parent_page_types cannot be recognized as a model name,
    or LookupError if a model does not exist (or is not a Page subclass).

    The result is cached on the class in ``_clean_parent_page_models``. The
    cache is only populated once the list has been validated, so a failing
    call keeps failing instead of leaving an invalid list behind for later calls.
    """
    if cls._clean_parent_page_models is None:
        parent_page_types = getattr(cls, "parent_page_types", None)
        if parent_page_types is None:
            # if parent_page_types is not specified on the Page class, allow all page types as parents
            resolved_models = get_page_models()
        else:
            resolved_models = [
                resolve_model_string(model_string, cls._meta.app_label)
                for model_string in parent_page_types
            ]

            for model in resolved_models:
                if not issubclass(model, Page):
                    raise LookupError("%s is not a Page subclass" % model)

        # Only cache after validation succeeded
        cls._clean_parent_page_models = resolved_models

    return cls._clean_parent_page_models
@classmethod
def allowed_parent_page_models(cls):
    """
    Returns, as a list of model classes, the page types this page type can be
    a subpage of: those of its own parent page types that in turn list this
    type among their subpage types.
    """
    allowed = []
    for candidate in cls.clean_parent_page_models():
        if cls in candidate.clean_subpage_models():
            allowed.append(candidate)
    return allowed
@classmethod
def allowed_subpage_models(cls):
    """
    Returns, as a list of model classes, the page types this page type can
    have as subpages: those of its own subpage types that in turn list this
    type among their parent page types.
    """
    allowed = []
    for candidate in cls.clean_subpage_models():
        if cls in candidate.clean_parent_page_models():
            allowed.append(candidate)
    return allowed
@classmethod
def creatable_subpage_models(cls):
    """
    Returns, as a list of model classes, the allowed subpage types of this
    page type whose ``is_creatable`` flag is set.
    """
    creatable = []
    for candidate in cls.allowed_subpage_models():
        if candidate.is_creatable:
            creatable.append(candidate)
    return creatable
@classmethod
def can_exist_under(cls, parent):
    """
    Checks whether this page type is among the allowed subpage types of the
    specific class of the ``parent`` page instance.

    See also: :func:`Page.can_create_at` and :func:`Page.can_move_to`
    """
    parent_type = parent.specific_class
    return cls in parent_type.allowed_subpage_models()
@classmethod
def can_create_at(cls, parent):
    """
    Checks whether a new page of this type may be created under the
    ``parent`` page instance.

    The type must be creatable and allowed under ``parent``; when set,
    ``max_count`` (all instances of the type) and ``max_count_per_parent``
    (instances of the type directly under ``parent``) must not be reached yet.
    """
    allowed = cls.is_creatable and cls.can_exist_under(parent)

    # Each limit is only counted while creation is still allowed
    if allowed and cls.max_count is not None:
        allowed = cls.objects.count() < cls.max_count

    if allowed and cls.max_count_per_parent is not None:
        existing_here = parent.get_children().type(cls).count()
        allowed = existing_here < cls.max_count_per_parent

    return allowed
def can_move_to(self, parent):
    """
    Checks whether this page instance may be moved to become a subpage of
    the ``parent`` page instance.
    """
    # Prevent pages from being moved to different language sections
    # The only page that can have multi-lingual children is the root page (depth 1)
    crosses_locale = parent.depth != 1 and parent.locale_id != self.locale_id
    if crosses_locale:
        return False

    return self.can_exist_under(parent)
@classmethod
def get_verbose_name(cls):
    """
    Returns the human-readable "verbose name" of this page model e.g "Blog page".
    """
    # capfirst() is similar to cls._meta.verbose_name.title(),
    # except that it doesn't convert any characters to lowercase
    verbose_name = cls._meta.verbose_name
    return capfirst(verbose_name)
@classmethod
def get_page_description(cls):
    """
    Returns the page description if one is set, for example
    "A multi-purpose web page", and an empty string otherwise.
    """
    description = getattr(cls, "page_description", None)

    # make sure that page_description is actually a string rather than a model field
    if isinstance(description, str):
        return description

    if isinstance(description, Promise):
        # description is a lazy object (e.g. the result of gettext_lazy())
        return str(description)

    return ""
@property
def approved_schedule(self):
    """
    Whether this page has a scheduled revision.

    ``_approved_schedule`` may be populated by ``annotate_approved_schedule`` on
    ``PageQuerySet`` as a performance optimization; when present it is returned as is.
    """
    if not hasattr(self, "_approved_schedule"):
        return self.scheduled_revision is not None
    return self._approved_schedule
def has_unpublished_subtree(self):
    """
    An awkwardly-defined flag used in determining whether unprivileged editors have
    permission to delete this article. Returns true if and only if this page is non-live,
    and none of its descendants is live.
    """
    if self.live:
        return False

    live_descendants = self.get_descendants().filter(live=True)
    return not live_descendants.exists()
def move(self, target, pos=None, user=None):
    """
    Extension to the treebeard 'move' method to ensure that url_path is updated,
    and to emit a 'pre_page_move' and 'post_page_move' signals.

    The work is delegated to a ``MovePageAction``, whose ``execute()`` result
    is returned.
    """
    action = MovePageAction(self, target, pos=pos, user=user)
    return action.execute()
def copy(
    self,
    recursive=False,
    to=None,
    update_attrs=None,
    copy_revisions=True,
    keep_live=True,
    user=None,
    process_child_object=None,
    exclude_fields=None,
    log_action="wagtail.copy",
    reset_translation_key=True,
):
    """
    Copies a given page

    All arguments are forwarded to a ``CopyPageAction``, which is executed
    with permission checks skipped; its result is returned.

    :param log_action: flag for logging the action. Pass None to skip logging. Can be passed an action string. Defaults to ``'wagtail.copy'``.
    """
    action = CopyPageAction(
        self,
        to=to,
        update_attrs=update_attrs,
        exclude_fields=exclude_fields,
        recursive=recursive,
        copy_revisions=copy_revisions,
        keep_live=keep_live,
        user=user,
        process_child_object=process_child_object,
        log_action=log_action,
        reset_translation_key=reset_translation_key,
    )
    return action.execute(skip_permission_checks=True)


copy.alters_data = True
def create_alias(
    self,
    *,
    recursive=False,
    parent=None,
    update_slug=None,
    update_locale=None,
    user=None,
    log_action="wagtail.create_alias",
    reset_translation_key=True,
    _mpnode_attrs=None,
):
    """
    Create an alias of this page by running a ``CreatePageAliasAction`` with
    the given (keyword-only) options, and return the action's result.
    """
    action = CreatePageAliasAction(
        self,
        recursive=recursive,
        parent=parent,
        update_slug=update_slug,
        update_locale=update_locale,
        user=user,
        log_action=log_action,
        reset_translation_key=reset_translation_key,
        _mpnode_attrs=_mpnode_attrs,
    )
    return action.execute()


create_alias.alters_data = True
def copy_for_translation(
    self, locale, copy_parents=False, alias=False, exclude_fields=None
):
    """
    Creates a copy of this page in the specified locale.

    The work is delegated to a ``CopyPageForTranslationAction``, whose
    ``execute()`` result is returned.
    """
    action = CopyPageForTranslationAction(
        self,
        locale,
        copy_parents=copy_parents,
        alias=alias,
        exclude_fields=exclude_fields,
    )
    return action.execute()


copy_for_translation.alters_data = True
def permissions_for_user(self, user):
    """
    Return a PagePermissionsTester object defining what actions the user can perform on this page.
    """
    # Specific classes may override this method. We only switch to the
    # specific instance when this object is not already one and the specific
    # class really overrides the method, which keeps things cheap when
    # working with base Page querysets.
    page_type = self.specific_class
    if (
        page_type
        and page_type.permissions_for_user != type(self).permissions_for_user
        and not isinstance(self, page_type)
    ):
        return self.specific_deferred.permissions_for_user(user)

    return PagePermissionTester(user, self)
def is_previewable(self):
    """Returns True if at least one preview mode is specified"""
    # This may be called from a listing page using a plain Page queryset. In
    # that case self.preview_modes would wrongly give us the default set from
    # Page.preview_modes, while always going through self.specific would cause
    # an N+1 query problem. As a compromise (good for the general case),
    # .specific is only used when a class-level comparison shows that
    # preview_modes is overridden relative to the type we currently are.
    overridden = self.specific_class.preview_modes != type(self).preview_modes
    target = self.specific if overridden else self
    return bool(target.preview_modes)
def get_route_paths(self):
    """
    Returns a list of paths that this page can be viewed at.

    These values are combined with the dynamic portion of the page URL to
    automatically create redirects when the page's URL changes.

    .. note::

        If using ``RoutablePageMixin``, you may want to override this method
        to include the paths of popular routes.

    .. note::

        Redirect paths are 'normalized' to apply consistent ordering to GET parameters,
        so you don't need to include every variation. Fragment identifiers are discarded
        too, so should be avoided.
    """
    # By default a page is only viewable at its own root
    route_paths = ["/"]
    return route_paths
def get_cached_paths(self):
    """
    Returns the list of paths to invalidate in a frontend cache;
    by default just ``"/"``.
    """
    cached_paths = ["/"]
    return cached_paths
def get_cache_key_components(self):
    """
    The components of a :class:`Page` which make up the :attr:`cache_key`. Any change to a
    page should be reflected in a change to at least one of these components.

    Here these are the page's ``id``, its ``url_path`` and the ISO-formatted
    ``last_published_at`` (``None`` when that is not set).
    """
    published_at = self.last_published_at
    published_stamp = published_at.isoformat() if published_at else None
    return [self.id, self.url_path, published_stamp]
@property
def cache_key(self):
    """
    A generic cache key to identify a page in its current state.
    Should the page change, so will the key.

    The key is the hex digest obtained by feeding every component, converted
    to bytes, into a ``safe_md5()`` hasher. Customizations to the cache key
    should be made in :attr:`get_cache_key_components`.
    """
    digest = safe_md5()
    for part in self.get_cache_key_components():
        part_bytes = force_bytes(part)
        digest.update(part_bytes)
    return digest.hexdigest()
def get_sitemap_urls(self, request=None):
    """
    Return the sitemap entries for this page: a one-item list holding a dict
    with the page's full URL (``location``) and modification time (``lastmod``).
    """
    location = self.get_full_url(request)
    # fall back on latest_revision_created_at if last_published_at is null
    # (for backwards compatibility from before last_published_at was added)
    last_modified = self.last_published_at or self.latest_revision_created_at

    entry = {"location": location, "lastmod": last_modified}
    return [entry]
def get_ancestors(self, inclusive=False):
    """
    Returns a queryset of the current page's ancestors, starting at the root page
    and descending to the parent, or to the current page itself if ``inclusive`` is true.
    """
    ancestors = Page.objects.ancestor_of(self, inclusive)
    return ancestors
def get_descendants(self, inclusive=False):
    """
    Returns a queryset of all pages underneath the current page, any number of levels deep.
    If ``inclusive`` is true, the current page itself is included in the queryset.
    """
    descendants = Page.objects.descendant_of(self, inclusive)
    return descendants
def get_siblings(self, inclusive=True):
    """
    Returns a queryset of the pages with the same parent as the current page.
    If ``inclusive`` is true (the default), the current page itself is included in the queryset.
    """
    siblings = Page.objects.sibling_of(self, inclusive)
    return siblings
def get_next_siblings(self, inclusive=False):
    """
    Return the siblings whose ``path`` is greater than or equal to this
    page's path, ordered by ascending ``path``. ``inclusive`` is passed on to
    ``get_siblings()``.
    """
    siblings = self.get_siblings(inclusive)
    following = siblings.filter(path__gte=self.path)
    return following.order_by("path")
def get_prev_siblings(self, inclusive=False):
    """
    Return the siblings whose ``path`` is less than or equal to this page's
    path, ordered by descending ``path``. ``inclusive`` is passed on to
    ``get_siblings()``.
    """
    siblings = self.get_siblings(inclusive)
    preceding = siblings.filter(path__lte=self.path)
    return preceding.order_by("-path")
def get_view_restrictions(self):
    """
    Return a query set of all page view restrictions that apply to this page.

    The current page and all of its ancestors are taken into account. Any of
    those pages that is an alias is first resolved to its source page, so
    alias pages share the view restrictions of their source page and cannot
    have their own.
    """

    def source_page_id(page):
        # Follow the alias chain down to the page that is not an alias itself
        if page.alias_of:
            return source_page_id(page.alias_of)
        return page.id

    # Start with the current page ...
    restricted_page_ids = {source_page_id(self)}

    # ... and add every ancestor
    for ancestor in self.get_ancestors().only("alias_of"):
        restricted_page_ids.add(source_page_id(ancestor))

    return PageViewRestriction.objects.filter(page_id__in=restricted_page_ids)
# Per-page-type template for the password prompt; while falsy, the
# WAGTAIL_PASSWORD_REQUIRED_TEMPLATE setting (or its default) is used instead.
password_required_template = None


def serve_password_required_response(self, request, form, action_url):
    """
    Serve a response indicating that the user has been denied access to view this page,
    and must supply a password.

    ``form`` = a Django form object containing the password input
    (and zero or more hidden fields that also need to be output on the template)
    ``action_url`` = URL that this form should be POSTed to
    """
    template = self.password_required_template
    if not template:
        template = getattr(
            settings,
            "WAGTAIL_PASSWORD_REQUIRED_TEMPLATE",
            "wagtailcore/password_required.html",
        )

    context = self.get_context(request)
    context["form"] = form
    context["action_url"] = action_url

    return TemplateResponse(request, template, context)
def with_content_json(self, content):
    """
    Returns a new version of the page with field values updated to reflect changes
    in the provided ``content`` (which usually comes from a previously-saved
    page revision).

    Certain field values are preserved in order to prevent errors if the returned
    page is saved, such as ``id``, ``content_type`` and some tree-related values.
    The following field values are also preserved, as they are considered to be
    meaningful to the page as a whole, rather than to a specific revision:

    * ``draft_title``
    * ``live``
    * ``has_unpublished_changes``
    * ``owner``
    * ``locked``
    * ``locked_by``
    * ``locked_at``
    * ``latest_revision``
    * ``latest_revision_created_at``
    * ``first_published_at``
    * ``alias_of``
    * ``wagtail_admin_comments`` (COMMENTS_RELATION_NAME)

    Note that ``content`` itself is modified when it still stores comments
    under the legacy ``"comments"`` key.
    """
    # Old revisions (pre Wagtail 2.15) may have saved comment data under the name 'comments'
    # rather than the current relation name as set by COMMENTS_RELATION_NAME;
    # if a 'comments' field exists and looks like our comments model, alter the data to use
    # COMMENTS_RELATION_NAME before restoring
    if COMMENTS_RELATION_NAME not in content and "comments" in content:
        legacy_comments = content["comments"]
        if (
            isinstance(legacy_comments, list)
            and legacy_comments
            and isinstance(legacy_comments[0], dict)
            and "contentpath" in legacy_comments[0]
        ):
            content[COMMENTS_RELATION_NAME] = content.pop("comments")

    obj = self.specific_class.from_serializable_data(content)

    # These should definitely never change between revisions
    obj.id = self.id
    obj.pk = self.pk
    obj.content_type_id = self.content_type_id

    # Override possibly-outdated tree parameter fields
    for tree_attr in ("path", "depth", "numchild"):
        setattr(obj, tree_attr, getattr(self, tree_attr))

    # Update url_path to reflect potential slug changes, but maintaining the page's
    # existing tree position
    obj.set_url_path(self.get_parent())

    # Ensure other values that are meaningful for the page as a whole (rather than
    # to a specific revision) are preserved
    page_level_attrs = (
        "draft_title",
        "live",
        "has_unpublished_changes",
        "owner_id",
        "locked",
        "locked_by_id",
        "locked_at",
        "latest_revision_id",
        "latest_revision_created_at",
        "first_published_at",
        "translation_key",
        "locale_id",
        "alias_of_id",
    )
    for attr in page_level_attrs:
        setattr(obj, attr, getattr(self, attr))

    # Comment positions as stored with the revision, keyed by comment id
    positions_in_revision = dict(
        getattr(obj, COMMENTS_RELATION_NAME).values_list("id", "position")
    )
    unresolved_comments = (
        getattr(self, COMMENTS_RELATION_NAME)
        .filter(resolved_at__isnull=True)
        .defer("position")
    )
    for comment in unresolved_comments:
        # take the comment position from the revision's stored version of the
        # comment, if the revision knows about it
        if comment.id in positions_in_revision:
            comment.position = positions_in_revision[comment.id]

    setattr(obj, COMMENTS_RELATION_NAME, unresolved_comments)

    return obj
@property
def has_workflow(self):
    """
    Returns ``True`` if the page or an ancestor has an active workflow assigned, otherwise ``False``.
    Always ``False`` when the ``WAGTAIL_WORKFLOW_ENABLED`` setting is falsy.
    """
    workflows_enabled = getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True)
    if not workflows_enabled:
        return False

    lineage = self.get_ancestors(inclusive=True)
    with_active_workflow = lineage.filter(workflowpage__isnull=False).filter(
        workflowpage__workflow__active=True
    )
    return with_active_workflow.exists()
def get_workflow(self):
    """
    Returns the active workflow assigned to the page or its nearest ancestor,
    or ``None`` if there is none or if the ``WAGTAIL_WORKFLOW_ENABLED``
    setting is falsy.
    """
    if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
        return None

    # A workflow assigned directly to this page wins
    if hasattr(self, "workflowpage") and self.workflowpage.workflow.active:
        return self.workflowpage.workflow

    try:
        nearest_ancestor = (
            self.get_ancestors()
            .filter(workflowpage__isnull=False)
            .filter(workflowpage__workflow__active=True)
            .order_by("-depth")
            .first()
        )
        # first() yields None without a match; the attribute access below
        # then raises the AttributeError handled here
        return nearest_ancestor.workflowpage.workflow
    except AttributeError:
        return None
class Meta:
    verbose_name = _("page")
    verbose_name_plural = _("pages")
    unique_together = [("translation_key", "locale")]
    # Auto-create the Permission objects defined in PAGE_PERMISSION_TYPES,
    # leaving out the codenames of Django's default_permissions.
    permissions = [
        (codename, name)
        for codename, _short_label, name in PAGE_PERMISSION_TYPES
        if codename not in {"add_page", "change_page", "delete_page", "view_page"}
    ]
# Set the module path of Page so that when Sphinx autodoc sees Page in type annotations
# it won't complain that there's no target for wagtail.models.pages.Page
Page.__module__ = "wagtail.models"
class GroupPagePermissionManager(models.Manager):
    """
    Manager for GroupPagePermission whose ``create()`` also accepts a
    ``permission_type`` shorthand in place of a ``permission`` object.
    """

    def create(self, **kwargs):
        """
        Create a GroupPagePermission.

        ``permission_type`` is always removed from ``kwargs``. When it is
        given and no ``permission`` is passed, the permission is looked up by
        the codename ``"<permission_type>_page"`` on the default page content type.
        """
        # Simplify creation of GroupPagePermission objects by allowing one
        # of permission or permission_type to be passed in.
        permission_type = kwargs.pop("permission_type", None)
        if permission_type and not kwargs.get("permission"):
            kwargs["permission"] = Permission.objects.get(
                content_type=get_default_page_content_type(),
                codename=f"{permission_type}_page",
            )
        return super().create(**kwargs)
class GroupPagePermission(models.Model):
    """
    Links a group to a permission on a page; each
    (group, page, permission) combination may exist only once.
    """

    group = models.ForeignKey(
        Group,
        verbose_name=_("group"),
        related_name="page_permissions",
        on_delete=models.CASCADE,
    )
    page = models.ForeignKey(
        "Page",
        verbose_name=_("page"),
        related_name="group_permissions",
        on_delete=models.CASCADE,
    )
    permission = models.ForeignKey(
        Permission,
        verbose_name=_("permission"),
        on_delete=models.CASCADE,
    )

    objects = GroupPagePermissionManager()

    class Meta:
        constraints = [
            models.UniqueConstraint(
                fields=("group", "page", "permission"),
                name="unique_permission",
            ),
        ]
        verbose_name = _("group page permission")
        verbose_name_plural = _("group page permissions")

    def __str__(self):
        """Describe the grant: group, permission codename and page."""
        template = "Group %d ('%s') has permission '%s' on page %d ('%s')"
        values = (
            self.group.id,
            self.group,
            self.permission.codename,
            self.page.id,
            self.page,
        )
        return template % values
class PagePermissionTester:
def __init__(self, user, page):
    """
    Remember ``user`` and ``page`` and, for active non-superusers, collect
    the permission actions that apply to the page.

    ``self.permissions`` is only set for active users that are not superusers.
    """
    from wagtail.permissions import page_permission_policy

    self.user = user
    self.permission_policy = page_permission_policy
    self.page = page
    self.page_is_root = page.depth == 1  # Equivalent to page.is_root()

    if not self.user.is_active or self.user.is_superuser:
        return

    granted_actions = set()
    for perm in self.permission_policy.get_cached_permissions_for_user(user):
        # A permission applies when it is set on this page or on an ancestor,
        # i.e. when its page's path is a prefix of this page's path
        if self.page.path.startswith(perm.page.path):
            # Keep the 'action' part of the permission codename, e.g.
            # 'add' instead of 'add_page'
            action = perm.permission.codename.rsplit("_", maxsplit=1)[0]
            granted_actions.add(action)
    self.permissions = granted_actions
def user_has_lock(self):
    """Tell whether the page's ``locked_by_id`` is this user's primary key."""
    lock_owner_id = self.page.locked_by_id
    return lock_owner_id == self.user.pk
def page_locked(self):
    """
    Return the result of the page lock's ``for_user()`` check for this user,
    or the (falsy) lock value itself when the page has no lock.
    """
    lock = self.page.get_lock()
    if not lock:
        return lock
    return lock.for_user(self.user)
def can_add_subpage(self):
    """
    Tell whether the user may add a subpage here: the user must be active,
    the page's specific class must have creatable subpage types, and the user
    must be a superuser or hold the ``add`` permission.
    """
    if not self.user.is_active:
        return False

    page_type = self.page.specific_class
    has_creatable_subpages = (
        page_type is not None and page_type.creatable_subpage_models()
    )
    if not has_creatable_subpages:
        return False

    return self.user.is_superuser or ("add" in self.permissions)
def can_edit(self):
    """
    Tell whether the user may edit the page.

    Inactive users can edit nothing and the root node can be edited by nobody.
    Otherwise editing is open to superusers, to holders of the ``change``
    permission, to holders of the ``add`` permission who own the page, and to
    users the page's current workflow task lets into the editor.
    """
    user = self.user

    # root node is not a page and can never be edited, even by superusers
    if not user.is_active or self.page_is_root:
        return False

    if user.is_superuser or "change" in self.permissions:
        return True

    if "add" in self.permissions and self.page.owner_id == user.pk:
        return True

    task = self.page.current_workflow_task
    return bool(task and task.user_can_access_editor(self.page, user))
def can_delete(self, ignore_bulk=False):
if not self.user.is_active:
return False
if (
self.page_is_root
): # root node is not a page and can never be deleted, even by superusers
return False
if self.user.is_superuser:
# superusers require no further checks
return True
# if the user does not have bulk_delete permission, they may only delete leaf pages
if (
"bulk_delete" not in self.permissions
and not self.page.is_leaf()
and not ignore_bulk
):
return False
if "change" in self.permissions:
# if the user does not have publish permission, we also need to confirm that there
# are no published pages here
if "publish" not in self.permissions:
pages_to_delete = self.page.get_descendants(inclusive=True)
if pages_to_delete.live().exists():
return False
return True
elif "add" in self.permissions:
pages_to_delete = self.page.get_descendants(inclusive=True)
if "publish" in self.permissions:
# we don't care about live state, but all pages must be owned by this user
# (i.e. eliminating pages owned by this user must give us the empty set)
return not pages_to_delete.exclude(owner=self.user).exists()
else:
# all pages must be owned by this user and non-live
# (i.e. eliminating non-live pages owned by this user must give us the empty set)
return not pages_to_delete.exclude(live=False, owner=self.user).exists()
else:
return False
def can_unpublish(self):
if not self.user.is_active:
return False
if (not self.page.live) or self.page_is_root:
return False
if self.page_locked():
return False
return self.user.is_superuser or ("publish" in self.permissions)
def can_publish(self):
if not self.user.is_active:
return False
if self.page_is_root:
return False
return self.user.is_superuser or ("publish" in self.permissions)
def can_submit_for_moderation(self):
return (
not self.page_locked()
and self.page.has_workflow
and not self.page.workflow_in_progress
)
def can_set_view_restrictions(self):
return self.can_publish()
def can_unschedule(self):
return self.can_publish()
def can_lock(self):
if self.user.is_superuser:
return True
current_workflow_task = self.page.current_workflow_task
if current_workflow_task:
return current_workflow_task.user_can_lock(self.page, self.user)
if "lock" in self.permissions:
return True
return False
def can_unlock(self):
if self.user.is_superuser:
return True
if self.user_has_lock():
return True
current_workflow_task = self.page.current_workflow_task
if current_workflow_task:
return current_workflow_task.user_can_unlock(self.page, self.user)
if "unlock" in self.permissions:
return True
return False
def can_publish_subpage(self):
"""
Niggly special case for creating and publishing a page in one go.
Differs from can_publish in that we want to be able to publish subpages of root, but not
to be able to publish root itself. (Also, can_publish_subpage returns false if the page
does not allow subpages at all.)
"""
if not self.user.is_active:
return False
specific_class = self.page.specific_class
if specific_class is None or not specific_class.creatable_subpage_models():
return False
return self.user.is_superuser or ("publish" in self.permissions)
def can_reorder_children(self):
"""
Reorder permission checking is similar to publishing a subpage, since it immediately
affects published pages. However, it shouldn't care about the 'creatability' of
page types, because the action only ever updates existing pages.
"""
if not self.user.is_active:
return False
return self.user.is_superuser or ("publish" in self.permissions)
def can_move(self):
"""
Moving a page should be logically equivalent to deleting and re-adding it (and all its children).
As such, the permission test for 'can this be moved at all?' should be the same as for deletion.
(Further constraints will then apply on where it can be moved *to*.)
"""
return self.can_delete(ignore_bulk=True)
def can_copy(self):
return not self.page_is_root
def can_move_to(self, destination):
# reject the logically impossible cases first
if self.page == destination or destination.is_descendant_of(self.page):
return False
# reject moves that are forbidden by subpage_types / parent_page_types rules
# (these rules apply to superusers too)
# but only check this if the page is not already under the target parent.
# If it already is, then the user is just reordering the page, and we want
# to allow it even if the page currently violates the subpage_type /
# parent_page_type rules. This can happen if it was either created before
# the rules were specified, or it was done programmatically (e.g. to
# predefine a set of pages and disallow the creation of new subpages by
# setting subpage_types = []).
if (not self.page.is_child_of(destination)) and (
not self.page.specific.can_move_to(destination)
):
return False
# shortcut the trivial 'everything' / 'nothing' permissions
if not self.user.is_active:
return False
if self.user.is_superuser:
return True
# check that the page can be moved at all
if not self.can_move():
return False
# Inspect permissions on the destination
destination_perms = destination.permissions_for_user(self.user)
# we always need at least add permission in the target
if "add" not in destination_perms.permissions:
return False
if self.page.live or self.page.get_descendants().filter(live=True).exists():
# moving this page will entail publishing within the destination section
return "publish" in destination_perms.permissions
else:
# no publishing required, so the already-tested 'add' permission is sufficient
return True
def can_copy_to(self, destination, recursive=False):
# reject the logically impossible cases first
# recursive can't copy to the same tree otherwise it will be on infinite loop
if recursive and (
self.page == destination or destination.is_descendant_of(self.page)
):
return False
# reject inactive users early
if not self.user.is_active:
return False
# reject early if pages of this type cannot be created at the destination
if not self.page.specific_class.can_create_at(destination):
return False
# skip permission checking for super users
if self.user.is_superuser:
return True
# Inspect permissions on the destination
destination_perms = destination.permissions_for_user(self.user)
if not destination.specific_class.creatable_subpage_models():
return False
# we always need at least add permission in the target
if "add" not in destination_perms.permissions:
return False
return True
def can_view_revisions(self):
return not self.page_is_root
class PageViewRestriction(BaseViewRestriction):
    """
    A view restriction attached to a page. Creating, editing and deleting a
    restriction is recorded in the log against the page's specific instance.
    """

    page = models.ForeignKey(
        "Page",
        verbose_name=_("page"),
        related_name="view_restrictions",
        on_delete=models.CASCADE,
    )

    # Name of the session key associated with page view restrictions
    passed_view_restrictions_session_key = "passed_page_view_restrictions"

    class Meta:
        verbose_name = _("page view restriction")
        verbose_name_plural = _("page view restrictions")

    def save(self, user=None, **kwargs):
        """
        Custom save handler to include logging.

        :param user: the user adding/updating the view restriction
        """
        logged_page = self.page.specific
        # Must be decided before saving, since saving assigns the id
        if self.id is None:
            action_name = "wagtail.view_restriction.create"
        else:
            action_name = "wagtail.view_restriction.edit"

        super().save(**kwargs)

        if not logged_page:
            return

        type_labels = dict(self.RESTRICTION_CHOICES)
        log(
            instance=logged_page,
            action=action_name,
            user=user,
            data={
                "restriction": {
                    "type": self.restriction_type,
                    "title": force_str(type_labels.get(self.restriction_type)),
                }
            },
        )

    def delete(self, user=None, **kwargs):
        """
        Custom delete handler to aid in logging.

        :param user: the user removing the view restriction
        """
        logged_page = self.page.specific
        if logged_page:
            # The title is taken from the restriction type currently stored in
            # the database rather than from the in-memory value
            stored_types = PageViewRestriction.objects.filter(id=self.id).values_list(
                "restriction_type", flat=True
            )
            removed_restriction_type = stored_types[0]
            type_labels = dict(self.RESTRICTION_CHOICES)
            log(
                instance=logged_page,
                action="wagtail.view_restriction.delete",
                user=user,
                data={
                    "restriction": {
                        "type": self.restriction_type,
                        "title": force_str(type_labels.get(removed_restriction_type)),
                    }
                },
            )
        return super().delete(**kwargs)
class WorkflowPage(models.Model):
    """Links a page to a workflow; the page itself is the primary key."""

    page = models.OneToOneField(
        "Page",
        verbose_name=_("page"),
        on_delete=models.CASCADE,
        primary_key=True,
        unique=True,
    )
    workflow = models.ForeignKey(
        "Workflow",
        related_name="workflow_pages",
        verbose_name=_("workflow"),
        on_delete=models.CASCADE,
    )

    class Meta:
        verbose_name = _("workflow page")
        verbose_name_plural = _("workflow pages")

    def get_pages(self):
        """
        Returns a queryset of pages that are affected by this ``WorkflowPage`` link.

        This includes all descendants of the page excluding any that have other ``WorkflowPage``(s).
        """
        affected = Page.objects.descendant_of(self.page, inclusive=True)

        # Other WorkflowPage links that sit inside this link's subtree
        nested_links = WorkflowPage.objects.filter(
            page_id__in=affected.values_list("id", flat=True)
        ).exclude(pk=self.pk)

        # Carve the subtree rooted at each nested link out of the result
        nested_roots = nested_links.values_list("page__path", "page__depth")
        for root_path, root_depth in nested_roots:
            affected = affected.exclude(
                path__startswith=root_path, depth__gte=root_depth
            )

        return affected
class PageLogEntryQuerySet(LogEntryQuerySet):
    """Log entry queryset that reports every page type under the single ``Page`` content type."""

    def get_content_type_ids(self):
        """Return the ``Page`` content type id in a set, or an empty set if there are no entries."""
        # for reporting purposes, pages of all types are combined under a single "Page"
        # object type
        if not self.exists():
            return set()
        return {ContentType.objects.get_for_model(Page).pk}

    def filter_on_content_type(self, content_type):
        """Return this queryset for the ``Page`` content type, and an empty one for any other."""
        page_content_type = ContentType.objects.get_for_model(Page)
        return self if content_type == page_content_type else self.none()
class PageLogEntryManager(BaseLogEntryManager):
    """Manager for page log entries."""

    def get_queryset(self):
        """Return a ``PageLogEntryQuerySet`` bound to this manager's model and database."""
        return PageLogEntryQuerySet(self.model, using=self._db)

    def get_instance_title(self, instance):
        """Return the admin display title of the page's deferred specific instance."""
        return instance.specific_deferred.get_admin_display_title()

    def log_action(self, instance, action, **kwargs):
        """Log ``action`` for ``instance``, also passing the instance on as ``page``."""
        kwargs.update(page=instance)
        return super().log_action(instance, action, **kwargs)

    def _can_view_deleted_entries(self, user):
        """
        Return True if ``user`` may see log entries flagged as deleted:
        superusers, and users who can add a subpage to (or edit) the first
        root node.
        """
        if user.is_superuser:
            # No need to look up the root node or build a permission tester
            return True

        root_page = Page.get_first_root_node()
        if root_page is None:
            # Empty page tree: there is no root to hold permissions on
            return False

        root_page_permissions = root_page.permissions_for_user(user)
        return (
            root_page_permissions.can_add_subpage() or root_page_permissions.can_edit()
        )

    def viewable_by_user(self, user):
        """
        Return the log entries ``user`` may see: those for pages the user can
        explore, plus entries flagged as deleted when the user is allowed to
        see them.
        """
        from wagtail.permissions import page_permission_policy

        explorable_instances = page_permission_policy.explorable_instances(user)
        q = Q(page__in=explorable_instances.values_list("pk", flat=True))

        if self._can_view_deleted_entries(user):
            # Include deleted entries
            q = q | Q(
                page_id__in=Subquery(
                    PageLogEntry.objects.filter(deleted=True).values("page_id")
                )
            )

        return PageLogEntry.objects.filter(q)

    def for_instance(self, instance):
        """Return the log entries recorded for the given page."""
        return self.filter(page=instance)
class PageLogEntry(BaseLogEntry):
    """A log entry recorded against a page."""

    # No database constraint and DO_NOTHING on delete are configured for this
    # link, and no reverse accessor is created on Page
    page = models.ForeignKey(
        "wagtailcore.Page",
        on_delete=models.DO_NOTHING,
        db_constraint=False,
        related_name="+",
    )

    objects = PageLogEntryManager()

    class Meta:
        ordering = ["-timestamp", "-id"]
        verbose_name = _("page log entry")
        verbose_name_plural = _("page log entries")

    def __str__(self):
        template = "PageLogEntry %d: '%s' on '%s' with id %s"
        values = (
            self.pk,
            self.action,
            self.object_verbose_name(),
            self.page_id,
        )
        return template % values

    @cached_property
    def object_id(self):
        """The id of the page this entry refers to."""
        return self.page_id

    @cached_property
    def message(self):
        """The message for this entry; 'wagtail.edit' is reported as 'Draft saved'."""
        if self.action != "wagtail.edit":
            return super().message
        # for page log entries, the 'edit' action should show as 'Draft saved'
        return _("Draft saved")
class Comment(ClusterableModel):
    """
    A comment on a field, or a field within a streamfield block
    """

    page = ParentalKey(
        Page, on_delete=models.CASCADE, related_name=COMMENTS_RELATION_NAME
    )
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name=COMMENTS_RELATION_NAME,
    )
    text = models.TextField()

    # This stores the field or field within a streamfield block that the comment is applied on, in the form: 'field', or 'field.block_id.field'
    # This must be unchanging across all revisions, so we will not support (current-format) ListBlock or the contents of InlinePanels initially.
    contentpath = models.TextField()

    # This stores the position within a field, to be interpreted by the field's frontend widget. It may change between revisions
    position = models.TextField(blank=True)

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    revision_created = models.ForeignKey(
        Revision,
        on_delete=models.CASCADE,
        related_name="created_comments",
        null=True,
        blank=True,
    )

    resolved_at = models.DateTimeField(null=True, blank=True)
    resolved_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.SET_NULL,
        related_name="comments_resolved",
        null=True,
        blank=True,
    )

    class Meta:
        verbose_name = _("comment")
        verbose_name_plural = _("comments")

    def __str__(self):
        return f"Comment on Page '{self.page}', left by {self.user}: '{self.text}'"

    def save(self, update_position=False, **kwargs):
        """
        Save the comment, leaving ``position`` out of the update unless asked.

        :param update_position: when True, ``position`` is saved like any other
            field. It is also saved when ``update_fields`` names ``"position"``.
        :param kwargs: passed on to the parent ``save()``; ``update_fields`` may
            contain field names or field objects
        """
        # Don't save the position unless specifically instructed to, as the position will normally be retrieved from the revision
        update_fields = kwargs.pop("update_fields", None)
        if not update_position and (
            not update_fields or "position" not in update_fields
        ):
            if self.id:
                # The instance is already saved; we can use `update_fields`
                candidates = update_fields if update_fields else self._meta.get_fields()
                # Callers pass field *names* while _meta.get_fields() yields
                # field objects: reduce both to names before filtering
                candidate_names = (
                    getattr(field, "name", field) for field in candidates
                )
                update_fields = [
                    name
                    for name in candidate_names
                    if name not in {"position", "id"}
                ]
            else:
                # This is a new instance, we have to preserve and then restore the position via a variable
                position = self.position
                result = super().save(**kwargs)
                self.position = position
                return result
        return super().save(update_fields=update_fields, **kwargs)

    def _log(self, action, page_revision=None, user=None):
        """Record ``action`` in the log against this comment's page."""
        log(
            instance=self.page,
            action=action,
            user=user,
            revision=page_revision,
            data={
                "comment": {
                    "id": self.pk,
                    "contentpath": self.contentpath,
                    "text": self.text,
                }
            },
        )

    def log_create(self, **kwargs):
        self._log("wagtail.comments.create", **kwargs)

    def log_edit(self, **kwargs):
        self._log("wagtail.comments.edit", **kwargs)

    def log_resolve(self, **kwargs):
        self._log("wagtail.comments.resolve", **kwargs)

    def log_delete(self, **kwargs):
        self._log("wagtail.comments.delete", **kwargs)

    def has_valid_contentpath(self, page):
        """
        Return True if this comment's contentpath corresponds to a valid field or
        StreamField block on the given page object.
        """
        field_name, *remainder = self.contentpath.split(".")
        try:
            field = page._meta.get_field(field_name)
        except FieldDoesNotExist:
            return False

        if not remainder:
            # comment applies to the field as a whole
            return True

        if not isinstance(field, StreamField):
            # only StreamField supports content paths that are deeper than one level
            return False

        stream_value = getattr(page, field_name)
        block = field.get_block_by_content_path(stream_value, remainder)
        # content path is valid if this returns a BoundBlock rather than None
        return bool(block)
class CommentReply(models.Model):
    """A reply left by a user on a ``Comment``."""

    comment = ParentalKey(Comment, on_delete=models.CASCADE, related_name="replies")
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name="comment_replies",
    )
    text = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        verbose_name = _("comment reply")
        verbose_name_plural = _("comment replies")

    def __str__(self):
        return f"CommentReply left by '{self.user}': '{self.text}'"

    def _log(self, action, page_revision=None, user=None):
        """Record ``action`` in the log against the page of the parent comment."""
        parent = self.comment
        comment_details = {
            "id": parent.pk,
            "contentpath": parent.contentpath,
            "text": parent.text,
        }
        reply_details = {
            "id": self.pk,
            "text": self.text,
        }
        log(
            instance=parent.page,
            action=action,
            user=user,
            revision=page_revision,
            data={
                "comment": comment_details,
                "reply": reply_details,
            },
        )

    def log_create(self, **kwargs):
        self._log("wagtail.comments.create_reply", **kwargs)

    def log_edit(self, **kwargs):
        self._log("wagtail.comments.edit_reply", **kwargs)

    def log_delete(self, **kwargs):
        self._log("wagtail.comments.delete_reply", **kwargs)
class PageSubscription(models.Model):
    """
    Per-user subscription settings for a page; at most one row exists for
    each (page, user) pair.
    """

    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name="page_subscriptions",
    )
    page = models.ForeignKey(
        Page,
        on_delete=models.CASCADE,
        related_name="subscribers",
    )

    comment_notifications = models.BooleanField()

    # NOTE(review): presumably tells the reference index to skip this model;
    # confirm against the reference index implementation
    wagtail_reference_index_ignore = True

    class Meta:
        unique_together = [("page", "user")]