Skip to content

Commit

Permalink
Stability fixes to PUBSUB code
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Jun 8, 2022
1 parent 746c04f commit e646209
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 14 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ jobs:
update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 40 \
--slave /usr/bin/g++ g++ /usr/bin/g++-9
run: |
# Work around https://github.com/actions/checkout/issues/766
git config --global --add safe.directory /src
cd /src
git describe --always --tags ${{ github.sha }}
if [ -d build-opt ]; then
chown -R root build-opt
ls -l ./build-opt
Expand Down Expand Up @@ -96,6 +100,12 @@ jobs:
submodules: true
- name: Build artifacts
run: |
# Work around https://github.com/actions/checkout/issues/766
git config --global --add safe.directory "$GITHUB_WORKSPACE"
git describe --always --tags ${{ github.sha }}
VERSION=$(git describe --always --tags ${{ github.sha }})
echo "::set-output name=version::${VERSION}"
./tools/release.sh
- name: Upload
uses: actions/upload-artifact@v3
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ if(ENABLE_GIT_VERSION)
else()
set(GIT_CLEAN_DIRTY "")
endif()
Message(STATUS "GIT_SHA1 ${GIT_SHA1}")
git_describe(GIT_VER --always)
Message(STATUS "GIT_VER ${GIT_VER}")
string(TIMESTAMP PRJ_BUILD_TIME "%Y-%m-%d %H:%M:%S" UTC)
else(ENABLE_GIT_VERSION)
set(GIT_VER "dev")
Expand Down
7 changes: 6 additions & 1 deletion src/server/channel_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ extern "C" {
#include "redis/util.h"
}

#include "base/logging.h"

namespace dfly {
using namespace std;

Expand Down Expand Up @@ -72,7 +74,10 @@ auto ChannelSlice::FetchSubscribers(string_view channel) -> vector<Subscriber> {
void ChannelSlice::CopySubscribers(const SubscribeMap& src, const std::string& pattern,
vector<Subscriber>* dest) {
for (const auto& sub : src) {
Subscriber s(sub.first, sub.second.thread_id);
ConnectionContext* cntx = sub.first;
CHECK(cntx->conn_state.subscribe_info);

Subscriber s(cntx, sub.second.thread_id);
s.pattern = pattern;
s.borrow_token.Inc();

Expand Down
28 changes: 16 additions & 12 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
}
}

if (!to_add && conn_state.subscribe_info->IsEmpty()) {
conn_state.subscribe_info.reset();
force_dispatch = false;
}

sort(channels.begin(), channels.end());

// prepare the array in order to distribute the updates to the shards.
Expand Down Expand Up @@ -90,6 +85,12 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
// Update subscription
shard_set->RunBriefInParallel(move(cb),
[&](ShardId sid) { return shard_idx[sid + 1] > shard_idx[sid]; });

// It's important to reset
if (!to_add && conn_state.subscribe_info->IsEmpty()) {
conn_state.subscribe_info.reset();
force_dispatch = false;
}
}

if (to_reply) {
Expand Down Expand Up @@ -139,15 +140,10 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args)
}
}

if (!to_add && conn_state.subscribe_info->IsEmpty()) {
conn_state.subscribe_info.reset();
force_dispatch = false;
}

int32_t tid = util::ProactorBase::GetIndex();
DCHECK_GE(tid, 0);

// Update the subscribers on publisher's side.
// Update the subscribers on channel-slice side.
auto cb = [&](EngineShard* shard) {
ChannelSlice& cs = shard->channel_slice();
for (string_view pattern : patterns) {
Expand All @@ -161,6 +157,13 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args)

// Update pattern subscription. Run on all shards.
shard_set->RunBriefInParallel(move(cb));

// Important to reset conn_state.subscribe_info only after all references to it were
// removed from channel slices.
if (!to_add && conn_state.subscribe_info->IsEmpty()) {
conn_state.subscribe_info.reset();
force_dispatch = false;
}
}

if (to_reply) {
Expand Down Expand Up @@ -209,7 +212,8 @@ void ConnectionContext::SendSubscriptionChangedResponse(string_view action,
}

void ConnectionContext::OnClose() {
if (!conn_state.subscribe_info) return;
if (!conn_state.subscribe_info)
return;

if (!conn_state.subscribe_info->channels.empty()) {
auto token = conn_state.subscribe_info->borrow_token;
Expand Down
2 changes: 1 addition & 1 deletion src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ TEST_F(DflyEngineTest, Hello) {
auto resp = Run({"hello", "2"});
ASSERT_THAT(resp, ArrLen(12));
EXPECT_THAT(resp.GetVec(),
ElementsAre("server", "redis", "version", "df-dev", "proto",
ElementsAre("server", "redis", "version", ArgType(RespExpr::STRING), "proto",
IntArg(2), "id", ArgType(RespExpr::INT64), "mode",
"standalone", "role", "master"));

Expand Down
6 changes: 6 additions & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,9 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {

auto cb = [&] { return EngineShard::tlocal()->channel_slice().FetchSubscribers(channel); };

// How do we know that subsribers did not disappear after we fetched them?
// Each subscriber object hold a borrow_token.
// ConnectionContext::OnClose does not reset subscribe_info before all tokens are returned.
vector<ChannelSlice::Subscriber> subscriber_arr = shard_set->Await(sid, std::move(cb));
atomic_uint32_t published{0};

Expand All @@ -912,6 +915,8 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
}

fibers_ext::BlockingCounter bc(subscriber_arr.size());

// We run publish_cb in each subsriber's thread.
auto publish_cb = [&, bc](unsigned idx, util::ProactorBase*) mutable {
unsigned start = slices[idx];

Expand All @@ -921,6 +926,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
break;

published.fetch_add(1, memory_order_relaxed);

facade::Connection* conn = subscriber_arr[i].conn_cntx->owner();
DCHECK(conn);
facade::Connection::PubMessage pmsg;
Expand Down

0 comments on commit e646209

Please sign in to comment.