import json
from collections import OrderedDict
from copy import deepcopy
from urllib.parse import urlparse

from django.db import DEFAULT_DB_ALIAS, models
from django.db.models import Subquery
from django.db.models.sql import Query
from django.db.models.sql.constants import MULTI, SINGLE
from django.utils.crypto import get_random_string
from elasticsearch import VERSION as ELASTICSEARCH_VERSION
from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch.helpers import bulk

from wagtail.search.backends.base import (
    BaseSearchBackend,
    BaseSearchQueryCompiler,
    BaseSearchResults,
    FilterFieldError,
    get_model_root,
)
from wagtail.search.index import (
    AutocompleteField,
    FilterField,
    Indexed,
    RelatedFields,
    SearchField,
    class_is_indexed,
    get_indexed_models,
)
from wagtail.search.query import And, Boost, Fuzzy, MatchAll, Not, Or, Phrase, PlainText
from wagtail.utils.utils import deep_update

use_new_elasticsearch_api = ELASTICSEARCH_VERSION >= (7, 15)


class Field:
    def __init__(self, field_name, boost=1):
        self.field_name = field_name
        self.boost = boost

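    # e.g. (illustrative) Field("title", 2).field_name_with_boost == "title^2",
    # which is the "field^boost" syntax Elasticsearch expects in multi_match "fields" lists.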
    @property
    def field_name_with_boost(self):
        if self.boost == 1:
            return self.field_name
        else:
            return f"{self.field_name}^{self.boost}"


class Elasticsearch7Mapping:
    all_field_name = "_all_text"
    edgengrams_field_name = "_edgengrams"

    type_map = {
        "AutoField": "integer",
        "SmallAutoField": "integer",
        "BigAutoField": "long",
        "BinaryField": "binary",
        "BooleanField": "boolean",
        "CharField": "string",
        "CommaSeparatedIntegerField": "string",
        "DateField": "date",
        "DateTimeField": "date",
        "DecimalField": "double",
        "FileField": "string",
        "FilePathField": "string",
        "FloatField": "double",
        "IntegerField": "integer",
        "BigIntegerField": "long",
        "IPAddressField": "string",
        "GenericIPAddressField": "string",
        "NullBooleanField": "boolean",
        "PositiveIntegerField": "integer",
        "PositiveSmallIntegerField": "integer",
        "PositiveBigIntegerField": "long",
        "SlugField": "string",
        "SmallIntegerField": "integer",
        "TextField": "string",
        "TimeField": "date",
        "URLField": "string",
    }

    keyword_type = "keyword"
    text_type = "text"
    edgengram_analyzer_config = {
        "analyzer": "edgengram_analyzer",
        "search_analyzer": "standard",
    }

    def __init__(self, model):
        self.model = model

    def get_parent(self):
        for base in self.model.__bases__:
            if issubclass(base, Indexed) and issubclass(base, models.Model):
                return type(self)(base)

    def get_document_type(self):
        return "doc"

    def get_field_column_name(self, field):
        # Fields in derived models get prefixed with their model name, fields
        # in the root model don't get prefixed at all
        # This is to prevent mapping clashes in cases where two page types have
        # a field with the same name but a different type.
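        # For example (illustrative, for a "blog.BlogPage" subclass of the root
        # Page model): FilterField("date") maps to "blog_blogpage__date_filter",
        # while a field defined on the root model gets no prefix (e.g. "title_filter").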
        root_model = get_model_root(self.model)
        definition_model = field.get_definition_model(self.model)

        if definition_model != root_model:
            prefix = (
                definition_model._meta.app_label.lower()
                + "_"
                + definition_model.__name__.lower()
                + "__"
            )
        else:
            prefix = ""

        if isinstance(field, FilterField):
            return prefix + field.get_attname(self.model) + "_filter"
        elif isinstance(field, AutocompleteField):
            return prefix + field.get_attname(self.model) + "_edgengrams"
        elif isinstance(field, SearchField):
            return prefix + field.get_attname(self.model)
        elif isinstance(field, RelatedFields):
            return prefix + field.field_name

    def get_boost_field_name(self, boost):
        # replace . with _ to avoid issues with . in field names
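        # e.g. (illustrative) a boost of 2.5 becomes "_all_text_boost_2_5"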
        boost = str(float(boost)).replace(".", "_")
        return f"{self.all_field_name}_boost_{boost}"

    def get_content_type(self):
        """
        Returns the content type as a string for the model.

        For example: "wagtailcore.Page"
                     "myapp.MyModel"
        """
        return self.model._meta.app_label + "." + self.model.__name__

    def get_all_content_types(self):
        """
        Returns all the content type strings that apply to this model.
        This includes the model's content type and all concrete ancestor
        models that inherit from Indexed.

        For example: ["myapp.MyPageModel", "wagtailcore.Page"]
                     ["myapp.MyModel"]
        """
        # Add our content type
        content_types = [self.get_content_type()]

        # Add all ancestor classes content types as well
        ancestor = self.get_parent()
        while ancestor:
            content_types.append(ancestor.get_content_type())
            ancestor = ancestor.get_parent()

        return content_types

    def get_field_mapping(self, field):
        if isinstance(field, RelatedFields):
            mapping = {"type": "nested", "properties": {}}
            nested_model = field.get_field(self.model).related_model
            nested_mapping = type(self)(nested_model)

            for sub_field in field.fields:
                sub_field_name, sub_field_mapping = nested_mapping.get_field_mapping(
                    sub_field
                )
                mapping["properties"][sub_field_name] = sub_field_mapping

            return self.get_field_column_name(field), mapping
        else:
            mapping = {"type": self.type_map.get(field.get_type(self.model), "string")}

            if isinstance(field, SearchField):
                if mapping["type"] == "string":
                    mapping["type"] = self.text_type

                if field.boost:
                    mapping["boost"] = field.boost

                mapping["include_in_all"] = True

            if isinstance(field, AutocompleteField):
                mapping["type"] = self.text_type
                mapping.update(self.edgengram_analyzer_config)

            elif isinstance(field, FilterField):
                if mapping["type"] == "string":
                    mapping["type"] = self.keyword_type

            if "es_extra" in field.kwargs:
                for key, value in field.kwargs["es_extra"].items():
                    mapping[key] = value

            return self.get_field_column_name(field), mapping

    def get_mapping(self):
        # Make field list
        fields = {
            "pk": {"type": self.keyword_type, "store": True},
            "content_type": {"type": self.keyword_type},
            self.edgengrams_field_name: {"type": self.text_type},
        }
        fields[self.edgengrams_field_name].update(self.edgengram_analyzer_config)

        for field in self.model.get_search_fields():
            key, val = self.get_field_mapping(field)
            fields[key] = val

        # Add _all_text field
        fields[self.all_field_name] = {"type": "text"}

        unique_boosts = set()

        # Replace {"include_in_all": true} with {"copy_to": ["_all_text", "_all_text_boost_2"]}
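        # e.g. (illustrative) {"type": "text", "boost": 2.0, "include_in_all": True}
        # becomes {"type": "text", "copy_to": ["_all_text", "_all_text_boost_2_0"]}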
        def replace_include_in_all(properties):
            for field_mapping in properties.values():
                if "include_in_all" in field_mapping:
                    if field_mapping["include_in_all"]:
                        field_mapping["copy_to"] = self.all_field_name

                        if "boost" in field_mapping:
                            # Add to unique_boosts so duplicate boost values (e.g. 2.0 and 2)
                            # only create one boost field
                            unique_boosts.add(field_mapping["boost"])
                            field_mapping["copy_to"] = [
                                field_mapping["copy_to"],
                                self.get_boost_field_name(field_mapping["boost"]),
                            ]
                            del field_mapping["boost"]

                    del field_mapping["include_in_all"]

                if field_mapping["type"] == "nested":
                    replace_include_in_all(field_mapping["properties"])

        replace_include_in_all(fields)
        for boost in unique_boosts:
            fields[self.get_boost_field_name(boost)] = {"type": "text"}

        return {
            "properties": fields,
        }

    def get_document_id(self, obj):
        return str(obj.pk)

    def _get_nested_document(self, fields, obj):
        doc = {}
        edgengrams = []
        model = type(obj)
        mapping = type(self)(model)

        for field in fields:
            value = field.get_value(obj)
            doc[mapping.get_field_column_name(field)] = value

            # Check if this field should be added into _edgengrams
            if isinstance(field, AutocompleteField):
                edgengrams.append(value)

        return doc, edgengrams

    def get_document(self, obj):
        # Build document
        doc = {"pk": str(obj.pk), "content_type": self.get_all_content_types()}
        edgengrams = []
        for field in self.model.get_search_fields():
            value = field.get_value(obj)

            if isinstance(field, RelatedFields):
                if isinstance(value, (models.Manager, models.QuerySet)):
                    nested_docs = []

                    for nested_obj in value.all():
                        nested_doc, extra_edgengrams = self._get_nested_document(
                            field.fields, nested_obj
                        )
                        nested_docs.append(nested_doc)
                        edgengrams.extend(extra_edgengrams)

                    value = nested_docs
                elif isinstance(value, models.Model):
                    value, extra_edgengrams = self._get_nested_document(
                        field.fields, value
                    )
                    edgengrams.extend(extra_edgengrams)
            elif isinstance(field, FilterField):
                if isinstance(value, (models.Manager, models.QuerySet)):
                    value = list(value.values_list("pk", flat=True))
                elif isinstance(value, models.Model):
                    value = value.pk
                elif isinstance(value, (list, tuple)):
                    value = [
                        item.pk if isinstance(item, models.Model) else item
                        for item in value
                    ]

            doc[self.get_field_column_name(field)] = value

            # Check if this field should be added into _edgengrams
            if isinstance(field, AutocompleteField):
                edgengrams.append(value)

        # Add partials to document
        doc[self.edgengrams_field_name] = edgengrams

        return doc

    def __repr__(self):
        return f"<ElasticsearchMapping: {self.model.__name__}>"


class Elasticsearch7Index:
    def __init__(self, backend, name):
        self.backend = backend
        self.es = backend.es
        self.mapping_class = backend.mapping_class
        self.name = name

    if use_new_elasticsearch_api:

        def put(self):
            self.es.indices.create(index=self.name, **self.backend.settings)

        def delete(self):
            try:
                self.es.indices.delete(index=self.name)
            except NotFoundError:
                pass

        def refresh(self):
            self.es.indices.refresh(index=self.name)

    else:

        def put(self):
            self.es.indices.create(self.name, self.backend.settings)

        def delete(self):
            try:
                self.es.indices.delete(self.name)
            except NotFoundError:
                pass

        def refresh(self):
            self.es.indices.refresh(self.name)

    def exists(self):
        return self.es.indices.exists(self.name)

    def is_alias(self):
        return self.es.indices.exists_alias(name=self.name)

    def aliased_indices(self):
        """
        If this index object represents an alias (which appears the same in the
        Elasticsearch API), this method can be used to fetch the list of indices
        the alias points to.

        Use the is_alias method if you need to find out if this is an alias. This
        returns an empty list if called on an index.
        """
        return [
            self.backend.index_class(self.backend, index_name)
            for index_name in self.es.indices.get_alias(name=self.name).keys()
        ]

    def put_alias(self, name):
        """
        Creates a new alias to this index. If the alias already exists it will
        be repointed to this index.
        """
        self.es.indices.put_alias(name=name, index=self.name)

    def add_model(self, model):
        # Get mapping
        mapping = self.mapping_class(model)

        # Put mapping
        self.es.indices.put_mapping(index=self.name, body=mapping.get_mapping())

    if use_new_elasticsearch_api:

        def add_item(self, item):
            # Make sure the object can be indexed
            if not class_is_indexed(item.__class__):
                return

            # Get mapping
            mapping = self.mapping_class(item.__class__)

            # Add document to index
            self.es.index(
                index=self.name,
                document=mapping.get_document(item),
                id=mapping.get_document_id(item),
            )

    else:

        def add_item(self, item):
            # Make sure the object can be indexed
            if not class_is_indexed(item.__class__):
                return
            # Get mapping
            mapping = self.mapping_class(item.__class__)

            # Add document to index
            self.es.index(
                self.name, mapping.get_document(item), id=mapping.get_document_id(item)
            )

    def add_items(self, model, items):
        if not class_is_indexed(model):
            return

        # Get mapping
        mapping = self.mapping_class(model)

        # Create list of actions
        actions = []
        for item in items:
            # Create the action
            action = {"_id": mapping.get_document_id(item)}
            action.update(mapping.get_document(item))
            actions.append(action)

        # Run the actions
        bulk(self.es, actions, index=self.name)

    def delete_item(self, item):
        # Make sure the object can be indexed
        if not class_is_indexed(item.__class__):
            return

        # Get mapping
        mapping = self.mapping_class(item.__class__)

        # Delete document
        try:
            self.es.delete(index=self.name, id=mapping.get_document_id(item))
        except NotFoundError:
            pass  # Document doesn't exist, ignore this exception

    def reset(self):
        # Delete old index
        self.delete()

        # Create new index
        self.put()


class Elasticsearch7SearchQueryCompiler(BaseSearchQueryCompiler):
    mapping_class = Elasticsearch7Mapping
    DEFAULT_OPERATOR = "or"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.mapping = self.mapping_class(self.queryset.model)
        self.remapped_fields = self._remap_fields(self.fields)

    def _remap_fields(self, fields):
        """Convert field names into index column names and add boosts."""

        remapped_fields = []
        if fields:
            searchable_fields = {f.field_name: f for f in self.get_searchable_fields()}
            for field_name in fields:
                field = searchable_fields.get(field_name)
                if field:
                    field_name = self.mapping.get_field_column_name(field)
                    remapped_fields.append(Field(field_name, field.boost or 1))
        else:
            remapped_fields.append(Field(self.mapping.all_field_name))

            models = get_indexed_models()
            unique_boosts = set()
            for model in models:
                if not issubclass(model, self.queryset.model):
                    continue
                for field in model.get_searchable_search_fields():
                    if field.boost:
                        unique_boosts.add(float(field.boost))

            remapped_fields.extend(
                [
                    Field(self.mapping.get_boost_field_name(boost), boost)
                    for boost in unique_boosts
                ]
            )

        return remapped_fields

    def _process_lookup(self, field, lookup, value):
        column_name = self.mapping.get_field_column_name(field)

        if lookup == "exact":
            if value is None:
                return {
                    "missing": {
                        "field": column_name,
                    }
                }
            else:
                if isinstance(value, (Query, Subquery)):
                    db_alias = self.queryset._db or DEFAULT_DB_ALIAS
                    query = value.query if isinstance(value, Subquery) else value
                    value = query.get_compiler(db_alias).execute_sql(result_type=SINGLE)
                    # The result is either a tuple with one element or None
                    if value:
                        value = value[0]

                return {
                    "term": {
                        column_name: value,
                    }
                }

        if lookup == "isnull":
            query = {
                "exists": {
                    "field": column_name,
                }
            }

            if value:
                query = {"bool": {"mustNot": query}}

            return query

        if lookup in ["startswith", "prefix"]:
            return {
                "prefix": {
                    column_name: value,
                }
            }

        if lookup in ["gt", "gte", "lt", "lte"]:
            return {
                "range": {
                    column_name: {
                        lookup: value,
                    }
                }
            }

        if lookup == "range":
            lower, upper = value

            return {
                "range": {
                    column_name: {
                        "gte": lower,
                        "lte": upper,
                    }
                }
            }

        if lookup == "in":
            if isinstance(value, (Query, Subquery)):
                db_alias = self.queryset._db or DEFAULT_DB_ALIAS
                query = value.query if isinstance(value, Subquery) else value
                resultset = query.get_compiler(db_alias).execute_sql(result_type=MULTI)
                value = [row[0] for chunk in resultset for row in chunk]

            elif not isinstance(value, list):
                value = list(value)
            return {
                "terms": {
                    column_name: value,
                }
            }

    def _process_match_none(self):
        return {"bool": {"mustNot": {"match_all": {}}}}

    def _connect_filters(self, filters, connector, negated):
        if filters:
            if len(filters) == 1:
                filter_out = filters[0]
            elif connector == "AND":
                filter_out = {
                    "bool": {"must": [fil for fil in filters if fil is not None]}
                }
            elif connector == "OR":
                filter_out = {
                    "bool": {"should": [fil for fil in filters if fil is not None]}
                }

            if negated:
                filter_out = {"bool": {"mustNot": filter_out}}

            return filter_out

    def _compile_plaintext_query(self, query, fields, boost=1.0):
        match_query = {"query": query.query_string}

        if query.operator != "or":
            match_query["operator"] = query.operator

        if len(fields) == 1:
            if boost != 1.0 or fields[0].boost != 1.0:
                match_query["boost"] = boost * fields[0].boost
            return {"match": {fields[0].field_name: match_query}}
        else:
            if boost != 1.0:
                match_query["boost"] = boost
            match_query["fields"] = [field.field_name_with_boost for field in fields]

            return {"multi_match": match_query}

    def _compile_fuzzy_query(self, query, fields):
        match_query = {
            "query": query.query_string,
            "fuzziness": "AUTO",
        }

        if query.operator != "or":
            match_query["operator"] = query.operator

        if len(fields) == 1:
            if fields[0].boost != 1.0:
                match_query["boost"] = fields[0].boost
            return {"match": {fields[0].field_name: match_query}}
        else:
            match_query["fields"] = [field.field_name_with_boost for field in fields]
            return {"multi_match": match_query}

    def _compile_phrase_query(self, query, fields):
        if len(fields) == 1:
            if fields[0].boost != 1.0:
                return {
                    "match_phrase": {
                        fields[0].field_name: {
                            "query": query.query_string,
                            "boost": fields[0].boost,
                        }
                    }
                }
            else:
                return {"match_phrase": {fields[0].field_name: query.query_string}}
        else:
            return {
                "multi_match": {
                    "query": query.query_string,
                    "fields": [field.field_name_with_boost for field in fields],
                    "type": "phrase",
                }
            }

    def _compile_query(self, query, field, boost=1.0):
        if isinstance(query, MatchAll):
            match_all_query = {}

            if boost != 1.0:
                match_all_query["boost"] = boost

            return {"match_all": match_all_query}

        elif isinstance(query, And):
            return {
                "bool": {
                    "must": [
                        self._compile_query(child_query, field, boost)
                        for child_query in query.subqueries
                    ]
                }
            }

        elif isinstance(query, Or):
            return {
                "bool": {
                    "should": [
                        self._compile_query(child_query, field, boost)
                        for child_query in query.subqueries
                    ]
                }
            }

        elif isinstance(query, Not):
            return {
                "bool": {"mustNot": self._compile_query(query.subquery, field, boost)}
            }

        elif isinstance(query, PlainText):
            return self._compile_plaintext_query(query, [field], boost)

        elif isinstance(query, Fuzzy):
            return self._compile_fuzzy_query(query, [field])

        elif isinstance(query, Phrase):
            return self._compile_phrase_query(query, [field])

        elif isinstance(query, Boost):
            return self._compile_query(query.subquery, field, boost * query.boost)

        else:
            raise NotImplementedError(
                "`%s` is not supported by the Elasticsearch search backend."
                % query.__class__.__name__
            )

    def get_inner_query(self):
        if self.remapped_fields:
            fields = self.remapped_fields
        else:
            fields = [self.mapping.all_field_name]

        if len(fields) == 0:
            # No fields. Return a query that'll match nothing
            return {"bool": {"mustNot": {"match_all": {}}}}

        # Handle MatchAll and PlainText separately as they were supported
        # before "search query classes" was implemented and we'd like to
        # keep the query the same as before
        if isinstance(self.query, MatchAll):
            return {"match_all": {}}

        elif isinstance(self.query, PlainText):
            return self._compile_plaintext_query(self.query, fields)

        elif isinstance(self.query, Phrase):
            return self._compile_phrase_query(self.query, fields)

        elif isinstance(self.query, Fuzzy):
            return self._compile_fuzzy_query(self.query, fields)

        elif isinstance(self.query, Not):
            return {
                "bool": {
                    "mustNot": [
                        self._compile_query(self.query.subquery, field)
                        for field in fields
                    ]
                }
            }

        else:
            return self._join_and_compile_queries(self.query, fields)

    def _join_and_compile_queries(self, query, fields, boost=1.0):
        if len(fields) == 1:
            return self._compile_query(query, fields[0], boost)
        else:
            # Compile a query for each field then combine with disjunction
            # max (or operator which takes the max score out of each of the
            # field queries)
            field_queries = []
            for field in fields:
                field_queries.append(self._compile_query(query, field, boost))

            return {"dis_max": {"queries": field_queries}}

    def get_content_type_filter(self):
        # Query content_type using a "match" query. See comment in
        # Elasticsearch7Mapping.get_document for more details
        content_type = self.mapping_class(self.queryset.model).get_content_type()

        return {"match": {"content_type": content_type}}

    def get_filters(self):
        # Filter by content type
        filters = [self.get_content_type_filter()]

        # Apply filters from queryset
        queryset_filters = self._get_filters_from_queryset()
        if queryset_filters:
            filters.append(queryset_filters)

        return filters

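    # For a plain-text search with no extra queryset filters, get_query() produces
    # roughly (illustrative): {"bool": {"must": ...inner query...,
    # "filter": {"match": {"content_type": "myapp.MyModel"}}}}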
    def get_query(self):
        inner_query = self.get_inner_query()
        filters = self.get_filters()

        if len(filters) == 1:
            return {
                "bool": {
                    "must": inner_query,
                    "filter": filters[0],
                }
            }
        elif len(filters) > 1:
            return {
                "bool": {
                    "must": inner_query,
                    "filter": filters,
                }
            }
        else:
            return inner_query

    def get_searchable_fields(self):
        return self.queryset.model.get_searchable_search_fields()

    def get_sort(self):
        # Ordering by relevance is the default in Elasticsearch
        if self.order_by_relevance:
            return

        # Get queryset and make sure it's ordered
        if self.queryset.ordered:
            sort = []

            for reverse, field in self._get_order_by():
                column_name = self.mapping.get_field_column_name(field)

                sort.append({column_name: "desc" if reverse else "asc"})

            return sort

        else:
            # Order by pk field
            return ["pk"]

    def __repr__(self):
        return json.dumps(self.get_query())


class Elasticsearch7SearchResults(BaseSearchResults):
    fields_param_name = "stored_fields"
    supports_facet = True

    def facet(self, field_name):
        # Get field
        field = self.query_compiler._get_filterable_field(field_name)
        if field is None:
            raise FilterFieldError(
                'Cannot facet search results with field "'
                + field_name
                + "\". Please add index.FilterField('"
                + field_name
                + "') to "
                + self.query_compiler.queryset.model.__name__
                + ".search_fields.",
                field_name=field_name,
            )

        # Build body
        body = self._get_es_body()
        column_name = self.query_compiler.mapping.get_field_column_name(field)

        body["aggregations"] = {
            field_name: {
                "terms": {
                    "field": column_name,
                    "missing": 0,
                }
            }
        }

        # Send to Elasticsearch
        response = self._backend_do_search(
            body,
            index=self.backend.get_index_for_model(
                self.query_compiler.queryset.model
            ).name,
            size=0,
        )

        return OrderedDict(
            [
                (bucket["key"] if bucket["key"] != 0 else None, bucket["doc_count"])
                for bucket in response["aggregations"][field_name]["buckets"]
            ]
        )

    def _get_es_body(self, for_count=False):
        body = {"query": self.query_compiler.get_query()}

        if not for_count:
            sort = self.query_compiler.get_sort()

            if sort is not None:
                body["sort"] = sort

        return body

    def _get_results_from_hits(self, hits):
        """
        Yields Django model instances from a page of hits returned by Elasticsearch
        """
        # Get pks from results
        pks = [hit["fields"]["pk"][0] for hit in hits]
        scores = {str(hit["fields"]["pk"][0]): hit["_score"] for hit in hits}

        # Initialise results dictionary
        results = {str(pk): None for pk in pks}

        # Find objects in database and add them to dict
        for obj in self.query_compiler.queryset.filter(pk__in=pks):
            results[str(obj.pk)] = obj

            if self._score_field:
                setattr(obj, self._score_field, scores.get(str(obj.pk)))

        # Yield results in order given by Elasticsearch
        for pk in pks:
            result = results[str(pk)]
            if result:
                yield result

    if use_new_elasticsearch_api:

        def _backend_do_search(self, body, **kwargs):
            # As of Elasticsearch 7.15, the 'body' parameter is deprecated; instead, the top-level
            # keys of the body dict are now kwargs in their own right
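            # e.g. (illustrative) es.search(query={...}, sort=[...], index=...) rather than
            # es.search(body={"query": {...}, "sort": [...]}, index=...)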
            return self.backend.es.search(**body, **kwargs)

    else:

        def _backend_do_search(self, body, **kwargs):
            # Send the search query to the backend.
            return self.backend.es.search(body=body, **kwargs)

    def _do_search(self):
        PAGE_SIZE = 100

        if self.stop is not None:
            limit = self.stop - self.start
        else:
            limit = None

        use_scroll = limit is None or limit > PAGE_SIZE

        body = self._get_es_body()
        params = {
            "index": self.backend.get_index_for_model(
                self.query_compiler.queryset.model
            ).name,
            "_source": False,
            self.fields_param_name: "pk",
        }

        if use_scroll:
            params.update(
                {
                    "scroll": "2m",
                    "size": PAGE_SIZE,
                }
            )

            # The scroll API doesn't support offset, manually skip the first results
            skip = self.start

            # Send to Elasticsearch
            page = self._backend_do_search(body, **params)

            while True:
                hits = page["hits"]["hits"]

                if len(hits) == 0:
                    break

                # Get results
                if skip < len(hits):
                    for result in self._get_results_from_hits(hits):
                        if limit is not None and limit == 0:
                            break

                        if skip == 0:
                            yield result

                            if limit is not None:
                                limit -= 1
                        else:
                            skip -= 1

                    if limit is not None and limit == 0:
                        break
                else:
                    # Skip whole page
                    skip -= len(hits)

                # Fetch next page of results
                if "_scroll_id" not in page:
                    break

                page = self.backend.es.scroll(scroll_id=page["_scroll_id"], scroll="2m")

            # Clear the scroll
            if "_scroll_id" in page:
                self.backend.es.clear_scroll(scroll_id=page["_scroll_id"])
        else:
            params.update(
                {
                    "from_": self.start,
                    "size": limit or PAGE_SIZE,
                }
            )

            # Send to Elasticsearch
            hits = self._backend_do_search(body, **params)["hits"]["hits"]

            # Get results
            for result in self._get_results_from_hits(hits):
                yield result

    def _do_count(self):
        # Get count
        hit_count = self.backend.es.count(
            index=self.backend.get_index_for_model(
                self.query_compiler.queryset.model
            ).name,
            body=self._get_es_body(for_count=True),
        )["count"]

        # Add limits
        hit_count -= self.start
        if self.stop is not None:
            hit_count = min(hit_count, self.stop - self.start)

        return max(hit_count, 0)


class ElasticsearchAutocompleteQueryCompilerImpl:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Convert field names into index column names
        # Note: this overrides Elasticsearch7SearchQueryCompiler by using autocomplete fields instead of searchable fields
        if self.fields:
            fields = []
            autocomplete_fields = {
                f.field_name: f
                for f in self.queryset.model.get_autocomplete_search_fields()
            }
            for field_name in self.fields:
                if field_name in autocomplete_fields:
                    field_name = self.mapping.get_field_column_name(
                        autocomplete_fields[field_name]
                    )

                fields.append(field_name)

            self.remapped_fields = fields
        else:
            self.remapped_fields = None

    def get_inner_query(self):
        fields = self.remapped_fields or [self.mapping.edgengrams_field_name]
        fields = [Field(field) for field in fields]
        if len(fields) == 0:
            # No fields. Return a query that'll match nothing
            return {"bool": {"mustNot": {"match_all": {}}}}
        elif isinstance(self.query, PlainText):
            return self._compile_plaintext_query(self.query, fields)
        elif isinstance(self.query, MatchAll):
            return {"match_all": {}}
        else:
            raise NotImplementedError(
                "`%s` is not supported for autocomplete queries."
                % self.query.__class__.__name__
            )


class Elasticsearch7AutocompleteQueryCompiler(
    ElasticsearchAutocompleteQueryCompilerImpl, Elasticsearch7SearchQueryCompiler
):
    pass


class ElasticsearchIndexRebuilder:
    def __init__(self, index):
        self.index = index

    def reset_index(self):
        self.index.reset()

    def start(self):
        # Reset the index
        self.reset_index()

        return self.index

    def finish(self):
        self.index.refresh()


class ElasticsearchAtomicIndexRebuilder(ElasticsearchIndexRebuilder):
    def __init__(self, index):
        self.alias = index
        self.index = index.backend.index_class(
            index.backend, self.alias.name + "_" + get_random_string(7).lower()
        )

    def reset_index(self):
        # Delete old index using the alias
        # This should delete both the alias and the index
        self.alias.delete()

        # Create new index
        self.index.put()

        # Create a new alias
        self.index.put_alias(self.alias.name)

    def start(self):
        # Create the new index
        self.index.put()

        return self.index

    def finish(self):
        self.index.refresh()

        if self.alias.is_alias():
            # Update existing alias, then delete the old index

            # Find index that alias currently points to, we'll delete it after
            # updating the alias
            old_index = self.alias.aliased_indices()

            # Update alias to point to new index
            self.index.put_alias(self.alias.name)

            # Delete old index
            # aliased_indices() can return multiple indices. Delete them all
            for index in old_index:
                if index.name != self.index.name:
                    index.delete()

        else:
            # self.alias doesn't currently refer to an alias in Elasticsearch.
            # This means that either nothing exists in ES with that name or
            # there is currently an index with that name

            # Run delete on the alias, just in case it is currently an index.
            # This happens on the first rebuild after switching ATOMIC_REBUILD on
            self.alias.delete()

            # Create the alias
            self.index.put_alias(self.alias.name)


class Elasticsearch7SearchBackend(BaseSearchBackend):
    mapping_class = Elasticsearch7Mapping
    index_class = Elasticsearch7Index
    query_compiler_class = Elasticsearch7SearchQueryCompiler
    autocomplete_query_compiler_class = Elasticsearch7AutocompleteQueryCompiler
    results_class = Elasticsearch7SearchResults
    basic_rebuilder_class = ElasticsearchIndexRebuilder
    atomic_rebuilder_class = ElasticsearchAtomicIndexRebuilder
    catch_indexing_errors = True
    timeout_kwarg_name = "timeout"

    settings = {
        "settings": {
            "analysis": {
                "analyzer": {
                    "ngram_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["asciifolding", "lowercase", "ngram"],
                    },
                    "edgengram_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["asciifolding", "lowercase", "edgengram"],
                    },
                },
                "tokenizer": {
                    "ngram_tokenizer": {
                        "type": "ngram",
                        "min_gram": 3,
                        "max_gram": 15,
                    },
                    "edgengram_tokenizer": {
                        "type": "edge_ngram",
                        "min_gram": 2,
                        "max_gram": 15,
                        "side": "front",
                    },
                },
                "filter": {
                    "ngram": {"type": "ngram", "min_gram": 3, "max_gram": 15},
                    "edgengram": {"type": "edge_ngram", "min_gram": 1, "max_gram": 15},
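                    # Illustrative: the "edgengram" filter above emits edge n-grams of
                    # each token ("w", "wa", "wag", ... up to 15 characters), which is
                    # what the _edgengrams autocomplete field is matched against.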
                },
            },
            "index": {
                "max_ngram_diff": 12,
            },
        }
    }

    def _get_host_config_from_url(self, url):
        """Given a parsed URL, return the host configuration to be added to self.hosts"""
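        # e.g. (illustrative) urlparse("https://user:secret@search.example.com:9243/es")
        # yields {"host": "search.example.com", "port": 9243, "url_prefix": "/es",
        # "use_ssl": True, "verify_certs": True, "http_auth": ("user", "secret")}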
        use_ssl = url.scheme == "https"
        port = url.port or (443 if use_ssl else 80)

        http_auth = None
        if url.username is not None and url.password is not None:
            http_auth = (url.username, url.password)

        return {
            "host": url.hostname,
            "port": port,
            "url_prefix": url.path,
            "use_ssl": use_ssl,
            "verify_certs": use_ssl,
            "http_auth": http_auth,
        }

    def _get_options_from_host_urls(self, urls):
        """Given a list of parsed URLs, return a dict of additional options to be passed into the
        Elasticsearch constructor; necessary for options that aren't valid as part of the 'hosts' config
        """
        return {}

    def __init__(self, params):
        super().__init__(params)

        # Get settings
        self.hosts = params.pop("HOSTS", None)
        self.index_name = params.pop("INDEX", "wagtail")
        self.timeout = params.pop("TIMEOUT", 10)

        if params.pop("ATOMIC_REBUILD", False):
            self.rebuilder_class = self.atomic_rebuilder_class
        else:
            self.rebuilder_class = self.basic_rebuilder_class

        self.settings = deepcopy(
            self.settings
        )  # Copy the class-level settings onto the instance so per-backend overrides don't mutate the class attribute
        self.settings = deep_update(self.settings, params.pop("INDEX_SETTINGS", {}))

        # Get Elasticsearch interface
        # Any remaining params are passed into the Elasticsearch constructor
        options = params.pop("OPTIONS", {})

        # If HOSTS is not set, convert URLS setting to HOSTS
        if self.hosts is None:
            es_urls = params.pop("URLS", ["http://localhost:9200"])
            # if es_urls is not a list, convert it to a list
            if isinstance(es_urls, str):
                es_urls = [es_urls]

            parsed_urls = [urlparse(url) for url in es_urls]

            self.hosts = [self._get_host_config_from_url(url) for url in parsed_urls]
            options.update(self._get_options_from_host_urls(parsed_urls))

        options[self.timeout_kwarg_name] = self.timeout

        self.es = Elasticsearch(hosts=self.hosts, **options)

    def get_index_for_model(self, model):
        # Split models up into separate indices based on their root model.
        # For example, all page-derived models get put together in one index,
        # while images and documents each have their own index.
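        # e.g. (illustrative, with the default INDEX name "wagtail"): all page
        # models share the index "wagtail__wagtailcore_page".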
        root_model = get_model_root(model)
        index_suffix = (
            "__"
            + root_model._meta.app_label.lower()
            + "_"
            + root_model.__name__.lower()
        )

        return self.index_class(self, self.index_name + index_suffix)

    def get_index(self):
        return self.index_class(self, self.index_name)

    def get_rebuilder(self):
        return self.rebuilder_class(self.get_index())

    def reset_index(self):
        # Use the rebuilder to reset the index
        self.get_rebuilder().reset_index()


SearchBackend = Elasticsearch7SearchBackend
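
# A minimal sketch of how a Django project might point Wagtail at this backend.
# The dotted module path and the URL below are illustrative assumptions, not
# taken from this file; the setting names (HOSTS, URLS, INDEX, TIMEOUT,
# ATOMIC_REBUILD, OPTIONS, INDEX_SETTINGS) are the ones read in
# Elasticsearch7SearchBackend.__init__ above.
#
# WAGTAILSEARCH_BACKENDS = {
#     "default": {
#         "BACKEND": "wagtail.search.backends.elasticsearch7",  # assumed path
#         "URLS": ["http://localhost:9200"],
#         "INDEX": "wagtail",
#         "TIMEOUT": 10,
#         "ATOMIC_REBUILD": True,
#         "OPTIONS": {},
#         "INDEX_SETTINGS": {},
#     }
# }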