angrybeanie_wagtail/env/lib/python3.12/site-packages/wagtail/blocks/migrations/utils.py

292 lines
9.3 KiB
Python
Raw Permalink Normal View History

2025-07-25 21:32:16 +10:00
from wagtail.blocks import ListBlock, StreamBlock, StructBlock
class InvalidBlockDefError(Exception):
"""Exception for invalid block definitions"""
def __init__(self, *args, instance=None, revision=None, **kwargs):
# in the case of a revision pass both instance and revision
self.instance = instance
self.revision = revision
super().__init__(*args, **kwargs)
def __str__(self):
message = ""
if self.instance is not None:
message += "Invalid block def in {} object ({})".format(
self.instance.__class__.__name__, self.instance.id
)
if self.revision is not None:
message += " for revision id ({}) created at {}".format(
self.revision.id,
self.revision.created_at,
)
if self.args:
message += "\n"
message += super().__str__()
return message
def should_alter_block(block_name, block_path):
# If the block is not at the start of `block_path`, then neither it nor its children are
# blocks that we need to alter.
return block_name == block_path[0]
def map_block_value(block_value, block_def, block_path, operation, **kwargs):
"""
Maps the value of a block.
Args:
block_value:
The value of the block. This would be a list or dict of children for structural blocks.
block_def:
The definition of the block.
block_path:
A '.' separated list of names of the blocks from the current block (not included) to
the nested block of which the value will be passed to the operation.
operation:
An Operation class instance (extends `BaseBlockOperation`), which has an `apply` method
for mapping values.
Returns:
mapped_value:
"""
# If the `block_path` length is 0, that means we've reached the end of the block path, that
# is, the block where we need to apply the operation. Note that we are asking the user to
# pass "item" as part of the block path for list children, so it won't give rise to any
# problems here.
if len(block_path) == 0:
return operation.apply(block_value)
# Depending on whether the block is a ListBlock, StructBlock or StreamBlock we call a
# different function to alter its children.
if isinstance(block_def, StreamBlock):
return map_stream_block_value(
block_value,
operation=operation,
block_def=block_def,
block_path=block_path,
**kwargs,
)
elif isinstance(block_def, ListBlock):
return map_list_block_value(
block_value,
operation=operation,
block_def=block_def,
block_path=block_path,
**kwargs,
)
elif isinstance(block_def, StructBlock):
return map_struct_block_value(
block_value,
operation=operation,
block_def=block_def,
block_path=block_path,
**kwargs,
)
else:
raise ValueError(f"Unexpected Structural Block: {block_value}")
def map_stream_block_value(stream_block_value, block_def, block_path, **kwargs):
"""
Maps each child block in a StreamBlock value.
Args:
stream_block_value:
The value of the StreamBlock, a list of child blocks
block_def:
The definition of the StreamBlock
block_path:
A '.' separated list of names of the blocks from the current block (not included) to
the nested block of which the value will be passed to the operation.
Returns
mapped_value:
The value of the StreamBlock after mapping all the children.
"""
mapped_value = []
for child_block in stream_block_value:
if not should_alter_block(child_block["type"], block_path):
mapped_value.append(child_block)
else:
try:
child_block_def = block_def.child_blocks[child_block["type"]]
except KeyError:
raise InvalidBlockDefError(
"No current block def named {}".format(child_block["type"])
)
mapped_child_value = map_block_value(
child_block["value"],
block_def=child_block_def,
block_path=block_path[1:],
**kwargs,
)
mapped_value.append({**child_block, "value": mapped_child_value})
return mapped_value
def map_struct_block_value(struct_block_value, block_def, block_path, **kwargs):
"""
Maps each child block in a StructBlock value.
Args:
stream_block_value:
The value of the StructBlock, a dict of child blocks
block_def:
The definition of the StructBlock
block_path:
A '.' separated list of names of the blocks from the current block (not included) to
the nested block of which the value will be passed to the operation.
Returns
mapped_value:
The value of the StructBlock after mapping all the children.
"""
mapped_value = {}
for key, child_value in struct_block_value.items():
if not should_alter_block(key, block_path):
mapped_value[key] = child_value
else:
try:
child_block_def = block_def.child_blocks[key]
except KeyError:
raise InvalidBlockDefError(f"No current block def named {key}")
altered_child_value = map_block_value(
child_value,
block_def=child_block_def,
block_path=block_path[1:],
**kwargs,
)
mapped_value[key] = altered_child_value
return mapped_value
def map_list_block_value(list_block_value, block_def, block_path, **kwargs):
"""
Maps each child block in a ListBlock value.
Args:
stream_block_value:
The value of the ListBlock, a list of child blocks
block_def:
The definition of the ListBlock
block_path:
A '.' separated list of names of the blocks from the current block (not included) to
the nested block of which the value will be passed to the operation.
Returns
mapped_value:
The value of the ListBlock after mapping all the children.
"""
mapped_value = []
# In case data is in old list format
for child_block in formatted_list_child_generator(list_block_value):
mapped_child_value = map_block_value(
child_block["value"],
block_def=block_def.child_block,
block_path=block_path[1:],
**kwargs,
)
mapped_value.append({**child_block, "value": mapped_child_value})
return mapped_value
def formatted_list_child_generator(list_block_value):
is_old_format = False
if not isinstance(list_block_value[0], dict):
is_old_format = True
elif "type" not in list_block_value[0] or list_block_value[0]["type"] != "item":
is_old_format = True
for child in list_block_value:
if not is_old_format:
yield child
else:
yield {"type": "item", "value": child}
def apply_changes_to_raw_data(
raw_data, block_path_str, operation, streamfield, **kwargs
):
"""
Applies changes to raw stream data
Args:
raw_data:
The current stream data (a list of top level blocks)
block_path_str:
A '.' separated list of names of the blocks from the top level block to the nested
block of which the value will be passed to the operation.
eg:- 'simplestream.struct1' would point to,
[..., { type: simplestream, value: [..., { type: struct1, value: {...} }] }]
NOTE: If we're directly applying changes on the top level stream block, then this will
be "".
NOTE: When the path contains a ListBlock child, 'item' must be added to the block as
the name of said child.
eg:- 'list1.item.stream1' where the list child is a StructBlock would point to,
[
...,
{
type: list1,
value: [
{
type: item,
value: { ..., stream1: [...] }
},
...
]
}
]
operation:
A subclass of `operations.BaseBlockOperation`. It will have the `apply` method
for applying changes to the matching block values.
streamfield:
The streamfield for which data is being migrated. This is used to get the definitions
of the blocks.
Returns:
altered_raw_data:
"""
if block_path_str == "":
# If block_path_str is "", we're directly applying the operation on the top level
# streamblock.
block_path = []
else:
block_path = block_path_str.split(".")
block_def = streamfield.field.stream_block
altered_raw_data = map_block_value(
raw_data,
block_def=block_def,
block_path=block_path,
operation=operation,
**kwargs,
)
return altered_raw_data