Skip to content

Commit

Permalink
Merge pull request #2314 from bagerard/abarto-feature/allow-setting-r…
Browse files Browse the repository at this point in the history
…ead-concern-queryset

Abarto feature/allow setting read concern queryset
  • Loading branch information
bagerard authored Apr 26, 2020
2 parents 9b73be2 + 78c9e97 commit 130e9c5
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 6 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,4 @@ that much better:
* Eric Timmons (https://github.com/daewok)
* Matthew Simpson (https://github.com/mcsimps2)
* Leonardo Domingues (https://github.com/leodmgs)
* Agustin Barto (https://github.com/abarto)
1 change: 1 addition & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Development
- ``Queryset._ensure_indexes`` and ``Queryset.ensure_indexes``, the right method to use is ``Document.ensure_indexes``
- Added pre-commit #2212
- Renamed requirements-lint.txt to requirements-dev.txt #2212
- Support for setting ReadConcern #2255

Changes in 0.19.1
=================
Expand Down
20 changes: 20 additions & 0 deletions mongoengine/context_managers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from contextlib import contextmanager

from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern

from mongoengine.common import _import_class
Expand All @@ -13,6 +14,7 @@
"no_sub_classes",
"query_counter",
"set_write_concern",
"set_read_write_concern",
)


Expand Down Expand Up @@ -256,3 +258,21 @@ def set_write_concern(collection, write_concerns):
combined_concerns = dict(collection.write_concern.document.items())
combined_concerns.update(write_concerns)
yield collection.with_options(write_concern=WriteConcern(**combined_concerns))


@contextmanager
def set_read_write_concern(collection, write_concerns, read_concerns):
combined_write_concerns = dict(collection.write_concern.document.items())

if write_concerns is not None:
combined_write_concerns.update(write_concerns)

combined_read_concerns = dict(collection.read_concern.document.items())

if read_concerns is not None:
combined_read_concerns.update(read_concerns)

yield collection.with_options(
write_concern=WriteConcern(**combined_write_concerns),
read_concern=ReadConcern(**combined_read_concerns),
)
47 changes: 41 additions & 6 deletions mongoengine/queryset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,25 @@
import re
import warnings

from collections.abc import Mapping

from bson import SON, json_util
from bson.code import Code
import pymongo
import pymongo.errors
from pymongo.collection import ReturnDocument
from pymongo.common import validate_read_preference
from pymongo.read_concern import ReadConcern

from mongoengine import signals
from mongoengine.base import get_document
from mongoengine.common import _import_class
from mongoengine.connection import get_db
from mongoengine.context_managers import set_write_concern, switch_db
from mongoengine.context_managers import (
set_read_write_concern,
set_write_concern,
switch_db,
)
from mongoengine.errors import (
BulkWriteError,
InvalidQueryError,
Expand Down Expand Up @@ -57,6 +64,7 @@ def __init__(self, document, collection):
self._snapshot = False
self._timeout = True
self._read_preference = None
self._read_concern = None
self._iter = False
self._scalar = []
self._none = False
Expand Down Expand Up @@ -484,7 +492,13 @@ def delete(self, write_concern=None, _from_doc_delete=False, cascade_refs=None):
return result.deleted_count

def update(
self, upsert=False, multi=True, write_concern=None, full_result=False, **update
self,
upsert=False,
multi=True,
write_concern=None,
read_concern=None,
full_result=False,
**update
):
"""Perform an atomic update on the fields matched by the query.
Expand All @@ -496,6 +510,7 @@ def update(
``save(..., write_concern={w: 2, fsync: True}, ...)`` will
wait until at least two servers have recorded the write and
will force an fsync on the primary server.
:param read_concern: Override the read concern for the operation
:param full_result: Return the associated ``pymongo.UpdateResult`` rather than just the number
updated items
:param update: Django-style update keyword arguments
Expand All @@ -522,7 +537,9 @@ def update(
else:
update["$set"] = {"_cls": queryset._document._class_name}
try:
with set_write_concern(queryset._collection, write_concern) as collection:
with set_read_write_concern(
queryset._collection, write_concern, read_concern
) as collection:
update_func = collection.update_one
if multi:
update_func = collection.update_many
Expand All @@ -539,7 +556,7 @@ def update(
raise OperationError(message)
raise OperationError("Update failed (%s)" % err)

def upsert_one(self, write_concern=None, **update):
def upsert_one(self, write_concern=None, read_concern=None, **update):
"""Overwrite or add the first document matched by the query.
:param write_concern: Extra keyword arguments are passed down which
Expand All @@ -548,6 +565,7 @@ def upsert_one(self, write_concern=None, **update):
``save(..., write_concern={w: 2, fsync: True}, ...)`` will
wait until at least two servers have recorded the write and
will force an fsync on the primary server.
:param read_concern: Override the read concern for the operation
:param update: Django-style update keyword arguments
:returns the new or overwritten document
Expand All @@ -559,6 +577,7 @@ def upsert_one(self, write_concern=None, **update):
multi=False,
upsert=True,
write_concern=write_concern,
read_concern=read_concern,
full_result=True,
**update
)
Expand Down Expand Up @@ -1177,6 +1196,22 @@ def read_preference(self, read_preference):
queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_preference
return queryset

def read_concern(self, read_concern):
"""Change the read_concern when querying.
:param read_concern: override ReplicaSetConnection-level
preference.
"""
if read_concern is not None and not isinstance(read_concern, Mapping):
raise TypeError("%r is not a valid read concern." % (read_concern,))

queryset = self.clone()
queryset._read_concern = (
ReadConcern(**read_concern) if read_concern is not None else None
)
queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_concern
return queryset

def scalar(self, *fields):
"""Instead of returning Document instances, return either a specific
value or a tuple of values in order.
Expand Down Expand Up @@ -1623,9 +1658,9 @@ def _cursor(self):
# XXX In PyMongo 3+, we define the read preference on a collection
# level, not a cursor level. Thus, we need to get a cloned collection
# object using `with_options` first.
if self._read_preference is not None:
if self._read_preference is not None or self._read_concern is not None:
self._cursor_obj = self._collection.with_options(
read_preference=self._read_preference
read_preference=self._read_preference, read_concern=self._read_concern
).find(self._query, **self._cursor_args)
else:
self._cursor_obj = self._collection.find(self._query, **self._cursor_args)
Expand Down
41 changes: 41 additions & 0 deletions tests/queryset/test_queryset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from bson import DBRef, ObjectId
import pymongo
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
from pymongo.results import UpdateResult
import pytest
Expand Down Expand Up @@ -4726,6 +4727,46 @@ def assert_read_pref(qs, expected_read_pref):
)
assert_read_pref(bars, ReadPreference.SECONDARY_PREFERRED)

def test_read_concern(self):
class Bar(Document):
txt = StringField()

meta = {"indexes": ["txt"]}

Bar.drop_collection()
bar = Bar.objects.create(txt="xyz")

bars = list(Bar.objects.read_concern(None))
assert bars == [bar]

bars = Bar.objects.read_concern({"level": "local"})
assert bars._read_concern.document == {"level": "local"}
assert bars._cursor.collection.read_concern.document == {"level": "local"}

# Make sure that `.read_concern(...)` does not accept string values.
with pytest.raises(TypeError):
Bar.objects.read_concern("local")

def assert_read_concern(qs, expected_read_concern):
assert qs._read_concern.document == expected_read_concern
assert qs._cursor.collection.read_concern.document == expected_read_concern

# Make sure read concern is respected after a `.skip(...)`.
bars = Bar.objects.skip(1).read_concern({"level": "local"})
assert_read_concern(bars, {"level": "local"})

# Make sure read concern is respected after a `.limit(...)`.
bars = Bar.objects.limit(1).read_concern({"level": "local"})
assert_read_concern(bars, {"level": "local"})

# Make sure read concern is respected after an `.order_by(...)`.
bars = Bar.objects.order_by("txt").read_concern({"level": "local"})
assert_read_concern(bars, {"level": "local"})

# Make sure read concern is respected after a `.hint(...)`.
bars = Bar.objects.hint([("txt", 1)]).read_concern({"level": "majority"})
assert_read_concern(bars, {"level": "majority"})

def test_json_simple(self):
class Embedded(EmbeddedDocument):
string = StringField()
Expand Down
48 changes: 48 additions & 0 deletions tests/test_context_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,61 @@
no_dereference,
no_sub_classes,
query_counter,
set_read_write_concern,
set_write_concern,
switch_collection,
switch_db,
)
from mongoengine.pymongo_support import count_documents


class TestContextManagers:
def test_set_write_concern(self):
connect("mongoenginetest")

class User(Document):
name = StringField()

collection = User._get_collection()
original_write_concern = collection.write_concern

with set_write_concern(
collection, {"w": "majority", "j": True, "wtimeout": 1234}
) as updated_collection:
assert updated_collection.write_concern.document == {
"w": "majority",
"j": True,
"wtimeout": 1234,
}

assert original_write_concern.document == collection.write_concern.document

def test_set_read_write_concern(self):
connect("mongoenginetest")

class User(Document):
name = StringField()

collection = User._get_collection()

original_read_concern = collection.read_concern
original_write_concern = collection.write_concern

with set_read_write_concern(
collection,
{"w": "majority", "j": True, "wtimeout": 1234},
{"level": "local"},
) as update_collection:
assert update_collection.read_concern.document == {"level": "local"}
assert update_collection.write_concern.document == {
"w": "majority",
"j": True,
"wtimeout": 1234,
}

assert original_read_concern.document == collection.read_concern.document
assert original_write_concern.document == collection.write_concern.document

def test_switch_db_context_manager(self):
connect("mongoenginetest")
register_connection("testdb-1", "mongoenginetest2")
Expand Down

0 comments on commit 130e9c5

Please sign in to comment.