forked from RedisLabs/redisraft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjoin.c
138 lines (115 loc) · 4.48 KB
/
join.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
* This file is part of RedisRaft.
*
* Copyright (c) 2020-2021 Redis Ltd.
*
* RedisRaft is licensed under the Redis Source Available License (RSAL).
*/
/* This is the implementation of RAFT.CLUSTER JOIN.
*
* It involves creating a Connection object linked to a JoinState, which is
* populated with one or more NodeAddr.
*
* We then iterate the address list, establish a connection and attempt to
* perform a RAFT.NODE ADD operation. If -MOVED replies are received they
* are processed by adding the new node address to our list.
*
* We currently continue iterating the address list forever.
* FIXME: Change the RAFT.CLUSTER JOIN implementation so it will (optionally?)
* block, hold a reference from the JoinState to the RaftReq and produce a
* reply only when successful - making this operation (optionally?) synchronous.
*/
#include <string.h>
#include <assert.h>
#include "redisraft.h"
/* Report a failed cluster join to the requesting client.
 *
 * Sends a generic error reply on the request's context and then hands the
 * request to RaftReqFree().
 *
 * rr  - RedisRaft context; not used by this function.
 * req - the pending join request.
 */
void HandleClusterJoinFailed(RedisRaftCtx *rr, RaftReq *req)
{
    (void) rr;

    RedisModule_ReplyWithError(req->ctx,
                               "ERR Failed to join cluster, check logs");
    RaftReqFree(req);
}
/* Reply callback for the RAFT.NODE ADD command issued by sendNodeAddRequest().
 *
 * c        - hiredis async context the reply arrived on.
 * r        - the redisReply, or NULL when no reply was delivered.
 * privdata - the Connection that was passed to redisAsyncCommand().
 *
 * Outcomes:
 *  - NULL reply: the connection is marked disconnected.
 *  - "MOVED " error: the redirect address is parsed and appended to the join
 *    state's address list.
 *  - "CLUSTERDOWN " error: logged only.
 *  - any other error: logged and state->failed is set.
 *  - two-element array whose second element carries string data: node id and
 *    dbid are stored, the join is reported as completed and the connection
 *    is asynchronously terminated.
 *  - anything else: logged as an invalid reply.
 *
 * In every case the hiredis context is disconnected before returning.
 */
static void handleNodeAddResponse(redisAsyncContext *c, void *r, void *privdata)
{
    Connection *conn = privdata;
    JoinLinkState *state = ConnGetPrivateData(conn);
    RedisRaftCtx *rr = ConnGetRedisRaftCtx(conn);
    redisReply *reply = r;

    if (!reply) {
        LOG_ERROR("RAFT.NODE ADD failed: connection dropped.");
        ConnMarkDisconnected(conn);
    } else if (reply->type == REDIS_REPLY_ERROR) {
        /* -MOVED? */
        if (strlen(reply->str) > 6 && !strncmp(reply->str, "MOVED ", 6)) {
            NodeAddr addr;
            if (!parseMovedReply(reply->str, &addr)) {
                LOG_ERROR("RAFT.NODE ADD failed: invalid MOVED response: %s", reply->str);
            } else {
                LOG_VERBOSE("Join redirected to leader: %s:%d", addr.host, addr.port);
                NodeAddrListAddElement(&state->addr, &addr);
            }
        } else if (strlen(reply->str) > 12 && !strncmp(reply->str, "CLUSTERDOWN ", 12)) {
            LOG_ERROR("RAFT.NODE ADD error: %s, retrying.", reply->str);
        } else {
            LOG_ERROR("RAFT.NODE ADD failed: %s", reply->str);
            state->failed = true;
        }
    } else if (reply->type != REDIS_REPLY_ARRAY || reply->elements != 2 ||
               !reply->element[0] || !reply->element[1] ||
               !reply->element[1]->str) {
        /* Also rejects a second element without string data, which would
         * otherwise be dereferenced as the dbid below. */
        LOG_ERROR("RAFT.NODE ADD invalid reply.");
    } else {
        const redisReply *id_reply = reply->element[0];
        const redisReply *dbid_reply = reply->element[1];

        LOG_INFO("Joined Raft cluster, node id: %lu, dbid: %.*s",
                 (unsigned long) id_reply->integer,
                 (int) dbid_reply->len, dbid_reply->str);

        /* The reply length comes from the remote peer: clamp it so the copy
         * can never run past the dbid buffer, and terminate right after the
         * copied bytes so no stale data from a previous value is left in
         * the string. */
        size_t dbid_len = dbid_reply->len;
        if (dbid_len > RAFT_DBID_LEN) {
            dbid_len = RAFT_DBID_LEN;
        }
        memcpy(rr->snapshot_info.dbid, dbid_reply->str, dbid_len);
        rr->snapshot_info.dbid[dbid_len] = '\0';

        rr->config->id = id_reply->integer;

        HandleClusterJoinCompleted(rr, state->req);
        assert(rr->state == REDIS_RAFT_UP);

        ConnAsyncTerminate(conn);
    }

    redisAsyncDisconnect(c);
}
/* Connect callback: when the connection is established, send the
 * RAFT.NODE ADD command carrying this node's configured id and address.
 *
 * Nothing is sent when the connection is not in a connected state. If
 * redisAsyncCommand() does not return REDIS_OK, the hiredis context is
 * disconnected and the connection is marked disconnected.
 */
static void sendNodeAddRequest(Connection *conn)
{
    RedisRaftCtx *rr = ConnGetRedisRaftCtx(conn);

    if (ConnIsConnected(conn)) {
        int rc = redisAsyncCommand(ConnGetRedisCtx(conn),
                                   handleNodeAddResponse, conn,
                                   "RAFT.NODE %s %d %s:%u",
                                   "ADD",
                                   rr->config->id,
                                   rr->config->addr.host,
                                   rr->config->addr.port);

        if (rc != REDIS_OK) {
            redisAsyncDisconnect(ConnGetRedisCtx(conn));
            ConnMarkDisconnected(conn);
        }
    }
}
/* Handle a RAFT.CLUSTER JOIN request.
 *
 * The request is handed to RaftReqFree() without starting a join when
 * checkRaftNotLoading() returns RR_ERROR, or when the node is not in the
 * REDIS_RAFT_UNINITIALIZED state (in which case an error reply is sent
 * first).
 *
 * Otherwise a JoinLinkState is allocated and filled with the "join" type
 * label, the connect callback, the start time, the requested addresses and
 * the request itself; a Connection is created for it and the node state
 * becomes REDIS_RAFT_JOINING. The request stays referenced by the join
 * state on this path.
 */
void handleClusterJoin(RedisRaftCtx *rr, RaftReq *req)
{
    static const char join_type[] = "join";

    if (checkRaftNotLoading(rr, req) == RR_ERROR) {
        RaftReqFree(req);
        return;
    }

    if (rr->state != REDIS_RAFT_UNINITIALIZED) {
        RedisModule_ReplyWithError(req->ctx, "ERR Already cluster member");
        RaftReqFree(req);
        return;
    }

    JoinLinkState *state = RedisModule_Calloc(1, sizeof(*state));

    /* Private, heap-allocated copy of the type label (terminator included). */
    state->type = RedisModule_Calloc(1, sizeof(join_type));
    memcpy(state->type, join_type, sizeof(join_type));

    state->connect_callback = sendNodeAddRequest;
    state->start = time(NULL);
    state->req = req;
    NodeAddrListConcat(&state->addr, req->r.cluster_join.addr);

    /* The connection is created with an idle callback only; connection
     * setup is driven from that callback once it fires.
     */
    state->conn = ConnCreate(rr, state, joinLinkIdleCallback, joinLinkFreeCallback);

    rr->state = REDIS_RAFT_JOINING;
}