Skip to content

Commit 0da4e2a

Browse files
dranikpg and romange
authored and committed
chore: fix search replication (#3547)
chore: fix search replication Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent f54d755 commit 0da4e2a

File tree

3 files changed

+44
-0
lines changed

3 files changed

+44
-0
lines changed

src/server/search/doc_index.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,22 @@ void ShardDocIndex::Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr)
182182
auto cb = [this](string_view key, BaseAccessor* doc) { indices_.Add(key_index_.Add(key), doc); };
183183
TraverseAllMatching(*base_, op_args, cb);
184184

185+
was_built_ = true;
185186
VLOG(1) << "Indexed " << key_index_.Size() << " docs on " << base_->prefix;
186187
}
187188

188189
void ShardDocIndex::AddDoc(string_view key, const DbContext& db_cntx, const PrimeValue& pv) {
190+
if (!was_built_)
191+
return;
192+
189193
auto accessor = GetAccessor(db_cntx, pv);
190194
indices_.Add(key_index_.Add(key), accessor.get());
191195
}
192196

193197
void ShardDocIndex::RemoveDoc(string_view key, const DbContext& db_cntx, const PrimeValue& pv) {
198+
if (!was_built_)
199+
return;
200+
194201
auto accessor = GetAccessor(db_cntx, pv);
195202
DocId id = key_index_.Remove(key);
196203
indices_.Remove(id, accessor.get());

src/server/search/doc_index.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ class ShardDocIndex {
143143
void Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr);
144144

145145
private:
146+
bool was_built_ = false;
146147
std::shared_ptr<const DocIndex> base_;
147148
search::FieldIndices indices_;
148149
DocKeyIndex key_index_;

tests/dragonfly/replication_test.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,6 +1858,42 @@ async def test_search(df_factory):
18581858
].id == "k0"
18591859

18601860

1861+
@dfly_args({"proactor_threads": 4})
1862+
async def test_search_with_stream(df_factory: DflyInstanceFactory):
1863+
master = df_factory.create()
1864+
replica = df_factory.create()
1865+
1866+
df_factory.start_all([master, replica])
1867+
1868+
c_master = master.client()
1869+
c_replica = replica.client()
1870+
1871+
# fill master with hsets and create index
1872+
p = c_master.pipeline(transaction=False)
1873+
for i in range(10_000):
1874+
p.hset(f"k{i}", mapping={"name": f"name of {i}"})
1875+
await p.execute()
1876+
1877+
await c_master.execute_command("FT.CREATE i1 SCHEMA name text")
1878+
1879+
# start replication and issue one add command and delete commands on master in parallel
1880+
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
1881+
await c_master.hset("secret-key", mapping={"name": "new-secret"})
1882+
for i in range(1_000):
1883+
await c_master.delete(f"k{i}")
1884+
1885+
# expect replica to see only 10k - 1k + 1 = 9001 keys in its index
1886+
await wait_available_async(c_replica)
1887+
assert await c_replica.execute_command("FT.SEARCH i1 * LIMIT 0 0") == [9_001]
1888+
assert await c_replica.execute_command('FT.SEARCH i1 "secret"') == [
1889+
1,
1890+
"secret-key",
1891+
["name", "new-secret"],
1892+
]
1893+
1894+
await close_clients(c_master, c_replica)
1895+
1896+
18611897
# @pytest.mark.slow
18621898
@pytest.mark.asyncio
18631899
async def test_client_pause_with_replica(df_factory, df_seeder_factory):

0 commit comments

Comments
 (0)