Skip to content

Commit

Permalink
Updates for ES 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Sinchok committed Jun 18, 2014
1 parent 3d17110 commit fe1ed22
Show file tree
Hide file tree
Showing 20 changed files with 485 additions and 212 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
*.py[cod]
.DS_Store

share
bin
lib
include
.Python

*.db
.coverage
coverage.xml
Expand Down
2 changes: 1 addition & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "precise32"

config.vm.network :private_network, ip: "192.168.33.101"
config.vm.network "forwarded_port", guest: 9200, host: 9200
config.vm.network "forwarded_port", guest: 9200, host: 9220

config.vm.provider :virtualbox do |v, override|
override.vm.box_url = "http://files.vagrantup.com/precise32.box"
Expand Down
2 changes: 1 addition & 1 deletion docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ Once you have models that inherit from PolymorphicIndexable, you'll need to crea
python manage.py synces

Migrating Indexes
================
=================

Lorem ipsum
5 changes: 2 additions & 3 deletions elastimorphic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from .base import PolymorphicIndexable, SearchManager
from .base import Indexable, PolymorphicIndexable, SearchManager # noqa


__version__ = "0.1.0-dev"
__version__ = "0.1.0"
__all__ = [PolymorphicIndexable, SearchManager]
225 changes: 112 additions & 113 deletions elastimorphic/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from .conf import settings
from .models import polymorphic_indexable_registry
from .mappings.doctype import DocumentType, search_field_factory


class ModelSearchResults(SearchResults):
Expand Down Expand Up @@ -105,12 +106,12 @@ def s(self):

@property
def es(self):
"""Returns a pyelasticsearch object, using the ES URL from the Django settings"""
"""Returns an elasticsearch object, using the ES URL from the Django settings"""
return get_es(urls=settings.ES_URLS)

def refresh(self):
"""Refreshes the index for this object"""
return self.es.refresh(index=self.model.get_index_name())
return self.es.indices.refresh(index=self.model.get_index_name())

def query(self, **kwargs):
"""Just a simple bridge to elasticutils' S().query(), prepopulating the URL
Expand All @@ -123,86 +124,84 @@ def filter(self, **kwargs):
return self.s().filter(**kwargs)


class PolymorphicIndexable(object):
"""Mixin for PolymorphicModel, allowing easy indexing and querying.
class Indexable(object):
"""A mixing for Django's Model, allowing easy indexing and querying."""

This class is a mixin, intended to be used on PolymorphicModel classes. To use it,
you just mix it in, and implement a few methods. For example:
@classmethod
def get_es(cls):
return get_es(urls=settings.ES_URLS)

.. code-block:: python
@classmethod
def get_mapping_type_name(cls):
"""By default, we'll be using the app_label and module_name properties to get the ES doctype for this object"""
return "%s_%s" % (cls._meta.app_label, cls._meta.module_name)

from django.db import models
from elastimorphic import PolymorphicIndexable, SearchManager
from polymorphic import PolymorphicModel
@classmethod
def get_doctype_class(cls):
doctype_class = type("{}_Mapping".format(cls.__name__), (DocumentType,), {})
if hasattr(cls, "Mapping"):
doctype_class = cls.Mapping

class ParentIndexable(PolymorphicIndexable, PolymorphicModel):
foo = models.CharField(max_length=255)
exclude = getattr(doctype_class, "exclude", [])

search_objects = SearchManager()
for field_pair in doctype_class.fields:
exclude.append(field_pair[0])

def extract_document(self):
doc = super(ParentIndexable, self).extract_document()
doc["foo"] = self.foo
return doc
for field in cls._meta.fields:
if field.name in exclude:
continue

@classmethod
def get_mapping_properties(cls):
properties = super(ParentIndexable, cls).get_mapping_properties()
properties.update({
"foo": {"type": "string"}
})
return properties
field_tuple = search_field_factory(field)
if field_tuple:
doctype_class.fields.append(field_tuple)

class ChildIndexable(ParentIndexable):
bar = models.IntegerField()
return doctype_class

def extract_document(self):
doc = super(ChildIndexable, self).extract_document()
doc["bar"] = self.bar
return doc
@classmethod
def get_mapping(cls):
doctype_class = cls.get_doctype_class()

@classmethod
def get_mapping_properties(cls):
properties = super(ChildIndexable, cls).get_mapping_properties()
properties.update({
"bar": {"type": "integer"}
})
return properties
mapping = doctype_class().get_mapping()
mapping["dynamic"] = "strict"
mapping["_id"] = {"path": cls._meta.pk.get_attname()}
return {cls.get_mapping_type_name(): mapping}

With this example code, after syncdb a new Elasticsearch index named `example_parentindexable` would
be created, with two mappings: `example_parentindexable` and `example_childindexable`. At minimum, you
should implement the :func:`extract_document` instance method, and the :func:`get_mapping_properties` classmethod.
"""
@classmethod
def get_index_name(cls):
index_prefix = slugify(settings.DATABASES[DEFAULT_DB_ALIAS].get("NAME", "bulbs"))
return "%s_%s" % (index_prefix, cls._meta.db_table)

def extract_document(self):
"""Returns a python dictionary, representing the Elasticseach document for this model instance.
doctype_class = self.get_doctype_class()

By default, this just includes the `polymorphic_ctype id`_, and the primary key, e.g.::
doctype = doctype_class()
document = {}
for name, field in doctype.fields:
value = getattr(self, name, None)
document[name] = field.to_es(value)
return document

{
"polymorphic_ctype": 1,
"id": 1
}
If when you override this method, be sure to at least return the default fields. This is best
done by simply updating the parent's data. For example::
def index(self, refresh=False):
es = self.get_es()
doc = self.extract_document()
es.update(
index=self.get_index_name(),
doc_type=self.get_mapping_type_name(),
id=self.pk,
body=dict(doc=doc, doc_as_upsert=True)
)

def extract_document(self):
doc = super(ParentModel, self).extract_document()
doc.update({
"bar": self.bar
})
return doc
def save(self, index=True, refresh=False, *args, **kwargs):
result = super(Indexable, self).save(*args, **kwargs)
if index:
self.index(refresh=refresh)
self._index = index
return result

It's also wise to be sure that your data is properly modeled (by overriding :func:`get_mapping`), so that
you're not letting Elasticseach decide your mappings for you.

.. _polymorphic_ctype id: https://github.com/chrisglass/django_polymorphic/blob/master/polymorphic/query.py#L190
"""
return {
"polymorphic_ctype": self.polymorphic_ctype_id,
self.polymorphic_primary_key_name: self.id
}
class PolymorphicIndexable(Indexable):
"""Mixin for PolymorphicModel, allowing easy indexing and querying.
"""

@classmethod
def get_base_class(cls):
Expand All @@ -216,23 +215,18 @@ def get_index_name(cls):
return "%s_%s" % (index_prefix, cls.get_base_class()._meta.db_table)

@classmethod
def get_es(cls):
return get_es(urls=settings.ES_URLS)
def get_mapping_type_names(cls, exclude_base=False):
"""Returns the mapping type name of this class and all of its descendants."""
names = []
if not exclude_base:
names.append(cls.get_mapping_type_name())
for subclass in cls.__subclasses__():
names.extend(subclass.get_mapping_type_names())
return names

@classmethod
def get_mapping(cls):
return {
cls.get_mapping_type_name(): {
"_id": {
"path": cls.polymorphic_primary_key_name
},
"properties": cls.get_mapping_properties(),
"dynamic": "strict",
"_all": {
"analyzer": "html"
}
}
}
def get_doctypes(cls):
return polymorphic_indexable_registry.get_doctypes(cls)

@classmethod
def get_mapping_properties(cls):
Expand Down Expand Up @@ -265,41 +259,46 @@ def get_mapping_properties(cls):
}

@classmethod
def get_mapping_type_name(cls):
"""By default, we'll be using the app_label and module_name properties to get the ES doctype for this object"""
return "%s_%s" % (cls._meta.app_label, cls._meta.module_name)
def get_mapping(cls):
return {
cls.get_mapping_type_name(): {
"_id": {
"path": cls.polymorphic_primary_key_name
},
"properties": cls.get_mapping_properties(),
"dynamic": "strict",
"_all": {
"analyzer": "html"
}
}
}

@classmethod
def get_mapping_type_names(cls, exclude_base=False):
"""Returns the mapping type name of this class and all of its descendants."""
names = []
if not exclude_base:
names.append(cls.get_mapping_type_name())
for subclass in cls.__subclasses__():
names.extend(subclass.get_mapping_type_names())
return names
def extract_document(self):
"""Returns a python dictionary, representing the Elasticseach document for this model instance.
@classmethod
def get_doctypes(cls):
return polymorphic_indexable_registry.get_doctypes(cls)
By default, this just includes the `polymorphic_ctype id`_, and the primary key, e.g.::
def index(self, refresh=False):
es = self.get_es()
doc = self.extract_document()
# NOTE: this could be made more efficient with the `doc_as_upsert`
# param when the following pull request is merged into pyelasticsearch:
# https://github.com/rhec/pyelasticsearch/pull/132
es.update(
self.get_index_name(),
self.get_mapping_type_name(),
self.id,
doc=doc,
upsert=doc
)
{
"polymorphic_ctype": 1,
"id": 1
}
def save(self, index=True, refresh=False, *args, **kwargs):
result = super(PolymorphicIndexable, self).save(*args, **kwargs)
if index:
self.index(refresh=refresh)
self._index = index
return result
If when you override this method, be sure to at least return the default fields. This is best
done by simply updating the parent's data. For example::
def extract_document(self):
doc = super(ParentModel, self).extract_document()
doc.update({
"bar": self.bar
})
return doc
It's also wise to be sure that your data is properly modeled (by overriding :func:`get_mapping`), so that
you're not letting Elasticseach decide your mappings for you.
.. _polymorphic_ctype id: https://github.com/chrisglass/django_polymorphic/blob/master/polymorphic/query.py#L190
"""
return {
"polymorphic_ctype": self.polymorphic_ctype_id,
self.polymorphic_primary_key_name: self.id
}
32 changes: 11 additions & 21 deletions elastimorphic/management/commands/bulk_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from django.core.management.base import BaseCommand
from django.db import models
from elasticutils import get_es
from pyelasticsearch.client import JsonEncoder
# from pyelasticsearch.client import JsonEncoder

from elastimorphic import PolymorphicIndexable
from elastimorphic.conf import settings
Expand All @@ -29,7 +29,6 @@ class Command(BaseCommand):

def handle(self, *args, **options):
self.es = get_es(urls=settings.ES_URLS)
bulk_endpoint = "%s/_bulk" % settings.ES_URLS[0]

chunk_size = options.get("chunk")
index_suffix = options.get("index_suffix")
Expand Down Expand Up @@ -73,31 +72,22 @@ def handle(self, *args, **options):
"_id": instance.pk
}
}
payload.append(json.dumps(meta, cls=JsonEncoder, use_decimal=True))
payload.append(meta)
doc = instance.extract_document()
payload.append(json.dumps(doc, cls=JsonEncoder, use_decimal=True))
payload.append(doc)
if len(payload) / 2 == chunk_size:
r = requests.post(bulk_endpoint, data="\n".join(payload) + "\n")
if r.status_code != 200:
print(payload)
print(r.json())
else:
# make sure it indexed everything:
result = r.json()
good_items = [item for item in result["items"] if item["index"].get("ok", False)]
if len(good_items) != len(payload) // 2:
self.stdout.write("Bulk indexing error! Item count mismatch.")
bad_items = [item for item in result["items"] if not item["index"].get("ok", False)]
self.stdout.write("These were rejected: %s" % str(bad_items))
return "Bulk indexing failed."
response = self.es.bulk(body=payload)
good_items = [item for item in response["items"] if item["index"].get("ok", False)]
if len(good_items) != len(payload) // 2:
self.stdout.write("Bulk indexing error! Item count mismatch.")
bad_items = [item for item in response["items"] if not item["index"].get("ok", False)]
self.stdout.write("These were rejected: %s" % str(bad_items))
return "Bulk indexing failed."
num_processed += (len(payload) / 2)
self.stdout.write("Indexed %d items" % num_processed)
payload = []

if payload:
r = requests.post(bulk_endpoint, data="\n".join(payload) + "\n")
if r.status_code != 200:
print(payload)
print(r.json())
response = self.es.bulk(body=payload)
num_processed += (len(payload) / 2)
self.stdout.write("Indexed %d items" % num_processed)
4 changes: 2 additions & 2 deletions elastimorphic/management/commands/es_swap_aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def handle(self, index_suffix, **options):
es = get_es()
alias_actions = []
# remove existing indexes using the aliases we want
existing_aliases = es.aliases()
existing_aliases = es.indices.get_aliases()
for index, aliases in existing_aliases.items():
for alias, new_index in indexes.items():
if alias in aliases['aliases']:
Expand All @@ -38,4 +38,4 @@ def handle(self, index_suffix, **options):
"index": index
}
})
es.update_aliases(dict(actions=alias_actions))
es.indices.update_aliases(body=dict(actions=alias_actions))
Loading

0 comments on commit fe1ed22

Please sign in to comment.