Skip to content

Commit

Permalink
Get make test to a good state
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Murphy <[email protected]>
  • Loading branch information
murphyjacob4 committed Nov 2, 2024
1 parent d21a62c commit 2cac076
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 16 deletions.
4 changes: 2 additions & 2 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2950,6 +2950,7 @@ static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc,
}

freeReplicationLink(server.primary_replication_link);
server.primary_replication_link = NULL;

if (!strcasecmp(argv[0], "no") && !strcasecmp(argv[1], "one")) {
return 1;
Expand All @@ -2961,8 +2962,7 @@ static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc,
*err = "Invalid primary port";
return 0;
}
sds primary_host = sdsnew(argv[0]);
server.primary_replication_link = createReplicationLink(primary_host, primary_port, -1);
server.primary_replication_link = createReplicationLink(argv[0], primary_port, -1);
server.primary_replication_link->state = REPL_STATE_CONNECT;
return 1;
}
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,7 @@ void freeClient(client *c) {
} else {
serverLog(LL_NOTICE, "Connection with primary lost.");
}
if (!c->flag.dont_cache_primary && !(c->flag.protocol_error || c->flag.blocked)) {
if (!c->flag.dont_cache_primary && !(c->flag.protocol_error || c->flag.blocked) && c->primary_slot_num == -1) {
c->flag.close_asap = 0;
c->flag.close_after_reply = 0;
replicationCachePrimary(c);
Expand Down
23 changes: 10 additions & 13 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1927,10 +1927,6 @@ void replicationCreateSourceClientWithHandler(replicationLink *link, int dbid, C
* connection. */
link->client->flag.primary = 1;
link->client->flag.authenticated = 1;
if (link != server.primary_replication_link) {
/* Don't try to cache our non-primary replication link. */
link->client->flag.dont_cache_primary = 1;
}

/* Allocate a private query buffer for the replication link client instead of using the shared query buffer.
* This is done because the replication link's query buffer data needs to be preserved for my sub-replicas to use. */
Expand Down Expand Up @@ -3987,6 +3983,7 @@ int cancelReplicationHandshake(replicationLink *link, int reconnect) {
/* Set replication to the specified primary address and port. */
void replicationSetPrimary(char *ip, int port, int full_sync_required) {
int was_primary = server.primary_replication_link == NULL;
int was_connected = !was_primary && server.primary_replication_link->state == REPL_STATE_CONNECTED;

if (server.primary_replication_link) {
if (server.primary_replication_link->client) server.primary_replication_link->client->flag.dont_cache_primary = full_sync_required;
Expand All @@ -3996,6 +3993,8 @@ void replicationSetPrimary(char *ip, int port, int full_sync_required) {

disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */

server.primary_replication_link = createReplicationLink(ip, port, -1);

/* Update oom_score_adj */
setOOMScoreAdj(-1);

Expand All @@ -4017,11 +4016,9 @@ void replicationSetPrimary(char *ip, int port, int full_sync_required) {
NULL);

/* Fire the primary link modules event. */
if (server.primary_replication_link && server.primary_replication_link->state == REPL_STATE_CONNECTED)
if (was_connected)
moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_DOWN, NULL);

server.primary_replication_link = createReplicationLink(ip, port, -1);

/* Allow trying dual-channel-replication with the new primary. If new primary doesn't
* support dual-channel-replication, we will set to 0 afterwards. */
serverLog(LL_NOTICE, "Connecting to PRIMARY %s:%d", ip, port);
Expand Down Expand Up @@ -4263,7 +4260,7 @@ void replicationSendAck(replicationLink *link) {
* handshake in order to reactivate the cached primary.
*/
void replicationCachePrimary(client *c) {
serverAssert(server.primary_replication_link && server.primary_replication_link->client != NULL && server.cached_primary == NULL);
serverAssert(server.primary_replication_link && server.primary_replication_link->client != NULL && c == server.primary_replication_link->client && server.cached_primary == NULL);
serverLog(LL_NOTICE, "Caching the disconnected primary state.");

/* Unlink the client from the server structures. */
Expand All @@ -4273,10 +4270,10 @@ void replicationCachePrimary(client *c) {
* we want to discard the non processed query buffers and non processed
* offsets, including pending transactions, already populated arguments,
* pending outputs to the primary. */
sdsclear(server.primary_replication_link->client->querybuf);
server.primary_replication_link->client->qb_pos = 0;
server.primary_replication_link->client->repl_applied = 0;
server.primary_replication_link->client->read_reploff = server.primary_replication_link->client->reploff;
sdsclear(c->querybuf);
c->qb_pos = 0;
c->repl_applied = 0;
c->read_reploff = server.primary_replication_link->client->reploff;
if (c->flag.multi) discardTransaction(c);
listEmpty(c->reply);
c->sentlen = 0;
Expand All @@ -4286,7 +4283,7 @@ void replicationCachePrimary(client *c) {

/* Save the primary. Server.primary_replication_link->client will be set to null later by
* replicationHandlePrimaryDisconnection(). */
server.cached_primary = server.primary_replication_link->client;
server.cached_primary = c;

/* Invalidate the Peer ID cache. */
if (c->peerid) {
Expand Down

0 comments on commit 2cac076

Please sign in to comment.