在 Redis cluster 中故障转移是个很重要的功能,下面就从故障发现到故障转移整个流程做一下详细分析。
故障检测
PFAIL 标记
集群中每个节点都会定期向其他节点发送 PING 消息,以此来检测对方是否在线,如果接收 PING 消息的节点 B 没有在规定时间(cluster_node_timeout)内回应节点 A PONG 消息,那么节点 A 就会将节点 B 标记为疑似下线(probable fail, PFAIL)。
void clusterCron(void) { // ... di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ // ... delay = now - node->ping_sent; if (delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) { node->flags |= CLUSTER_NODE_PFAIL; update_state = 1; } } } dictReleaseIterator(di); // ...}
可以看到,在 clusterCron
函数中如果对节点 B 发出 PING 消息,在 server.cluster_node_timeout 时间内没有收到其返回的 PONG 消息,如果节点 B 现在没有被标记成 CLUSTER_NODE_PFAIL 状态,那么现在就做下这个标记。
可以根据 ping_sent 参数进行判断的依据如下,
int clusterProcessPacket(clusterLink *link) { // ... if (link->node && type == CLUSTERMSG_TYPE_PONG) { link->node->pong_received = mstime(); link->node->ping_sent = 0; // ... } // ...}
当节点 A 接收到节点 B 的 PONG 消息时,会把 ping_sent 更新成 0,同时记下收到本次 PONG 消息的时间。
上面提到的 clusterNode 与 clusterLink 有如下关联关系:
可以看出, clusterLink 就是为了接收对端 gossip 消息而设置的。
另外,我们发现, 在上面的 clusterCron
函数中将节点标记成 PFAIL 时,会将 update_state 变量置为 1,这会引发后面更改集群状态的逻辑。
if (update_state || server.cluster->state == CLUSTER_FAIL) clusterUpdateState();
集群有两个状态,CLUSTER_OK 和 CLUSTER_FAIL,如果集群目前状态是 CLUSTER_FAIL,且设置了参数 cluster-require-full-coverage yes
,那么此时访问集群会返回错误,意思是可能有某些 slot 没有被 server 接管。
clusterUpdateState
函数负责更新集群状态,该部分逻辑与本篇博文要讲的主逻辑关系不大,所以放到了后面的补充章节中了。
FAIL 标记
主动标记 FAIL
被节点 A 标记成 FAIL/ PFAIL 的节点如何让节点 C 知道呢?这主要是通过平常发送的 PING/PONG 消息实现的,在 3.x 的版本时,会尽最大努力把这样的节点放到 gossip 消息的流言部分,到后面的 4.x 版本的代码中每次的 PING/PONG 消息都会把 PFAIL 节点都带上。
clusterProcessGossipSection
函数用来处理 gossip 消息的流言部分。
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { uint16_t count = ntohs(hdr->count); clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip; clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender); while(count--) { // ... node = clusterLookupNode(g->nodename); if (node) { if (sender && nodeIsMaster(sender) && node != myself) { if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) { if (clusterNodeAddFailureReport(node,sender)) { serverLog(LL_VERBOSE, "Node %.40s reported node %.40s as not reachable.", sender->name, node->name); } markNodeAsFailingIfNeeded(node); } else { // ... } } // ... } // ... } // ...}
该函数依次处理 gossip 消息流言部分携带的各节点信息(总节点数的1/10)。当发现带有 CLUSTER_NODE_FAIL 或者 CLUSTER_NODE_PFAIL 时会调用 clusterNodeAddFailureReport
函数。
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) { list *l = failing->fail_reports; listNode *ln; listIter li; clusterNodeFailReport *fr; /* If a failure report from the same sender already exists, just update * the timestamp. */ listRewind(l,&li); while ((ln = listNext(&li)) != NULL) { fr = ln->value; if (fr->node == sender) { fr->time = mstime(); return 0; } } /* Otherwise create a new report. */ fr = zmalloc(sizeof(*fr)); fr->node = sender; fr->time = mstime(); listAddNodeTail(l,fr); return 1;}
每一个节点都有一个名为 fail_reports 的 list 结构的变量,用来搜集该异常节点获得了集群中哪些节点的 PFAIL 状态投票。fail_reports 每个成员都是一个 clusterNodeFailReport 结构。
typedef struct clusterNodeFailReport { struct clusterNode *node; /* Node reporting the failure condition. */ mstime_t time; /* Time of the last report from this node. */} clusterNodeFailReport;
clusterNodeFailReport 中带有时间戳,标记这个节点上一次被报上来处于异常状态的时间。
每次调用 clusterNodeAddFailureReport
函数时,先会检查sender 是否已经为该异常节点投票过了,如果有,更新时间戳,如果没有,把 sender 加入到投票节点中。
简单点说就是,在 A 节点看来 B 节点是 PFAIL 状态,在 gossip 通信中把它告诉了 C 节点,C 节点发现这个异常状态的节点,检查一下为 B 节点投过票的节点中有没有 A 节点,如果没有就加进去。
然后下面就是判断 PFAIL 状态是不是要转变成 FAIL 状态的关键。
void markNodeAsFailingIfNeeded(clusterNode *node) { int failures; int needed_quorum = (server.cluster->size / 2) + 1; if (!nodeTimedOut(node)) return; /* We can reach it. */ if (nodeFailed(node)) return; /* Already FAILing. */ failures = clusterNodeFailureReportsCount(node); /* Also count myself as a voter if I'm a master. */ if (nodeIsMaster(myself)) failures++; if (failures < needed_quorum) return; /* No weak agreement from masters. */ serverLog(LL_NOTICE, "Marking node %.40s as failing (quorum reached).", node->name); /* Mark the node as failing. */ node->flags &= ~CLUSTER_NODE_PFAIL; node->flags |= CLUSTER_NODE_FAIL; node->fail_time = mstime(); /* Broadcast the failing node name to everybody, forcing all the other * reachable nodes to flag the node as FAIL. */ if (nodeIsMaster(myself)) clusterSendFail(node->name); /* 广播这个节点的 fail 消息 */ clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);}
C 节点收到消息,检查下 A 报过来的异常节点 B,在自己看来是否也是 PFAIL 状态的,如果不是,那么不理会 A 节点本次 report。如果在节点 C 看来,节点 B 已经被标记成 FAIL 了,那么就不需要进行下面的判定了。
在函数 clusterNodeFailureReportsCount
中会判断计算出把 B 节点标记成 PFAIL 状态的节点的数量 sum,如果 sum 值小于集群 size 的一半,为防止误判,忽略掉这条信息。在函数 clusterNodeFailureReportsCount
中会检查关于 B 节点的 clusterNodeFailReport,清理掉那些过期的投票,过期时间为 2 倍的 server.cluster_node_timeout。
如果满足条件,节点 C 将节点 B 的 PFAIL 状态消除,标记成 FAIL,同时记下 fail_time,如果 C 节点是个 master,那么将 B 节点 FAIL 的消息广播出去,以便让集群中其他节点尽快知道。
void clusterSendFail(char *nodename) { unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg*) buf; clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL); memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN); clusterBroadcastMessage(buf,ntohl(hdr->totlen));}
发送的 gossip 消息类型为 CLUSTERMSG_TYPE_FAIL,广播的节点排除自身和处于 HANDSHAKE 状态节点。
Gossip 被动感知 FAIL
前面说过,gossip 消息的处理函数为 clusterProcessPacket
,下面看 CLUSTERMSG_TYPE_FAIL 类型的消息如何处理。
int clusterProcessPacket(clusterLink *link) { // ... uint16_t type = ntohs(hdr->type); // ... if (type == CLUSTERMSG_TYPE_FAIL) { // fail clusterNode *failing; if (sender) { failing = clusterLookupNode(hdr->data.fail.about.nodename); if (failing && !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF))) { serverLog(LL_NOTICE, "FAIL message received from %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); failing->flags |= CLUSTER_NODE_FAIL; failing->fail_time = mstime(); failing->flags &= ~CLUSTER_NODE_PFAIL; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); } } else { serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); } } // ...}
集群中另一个节点 D 收到节点 B 广播过来的消息:B 节点 FAIL 了。如果 D 还没有把 B 标记成 FAIL,那么标记成 CLUSTER_NODE_FAIL,并取消 CLUSTER_NODE_PFAIL 标记;否则,忽略,因为D已经知道 B 是 FAIL 节点了。
故障转移
failover 分为两类,主动 failover(主动切主从)以及被动 failover(被动切主从),下面挨个进行分析。
被动 failover
先验条件及初始化
void clusterCron(void) { // ... if (nodeIsSlave(myself)) { clusterHandleSlaveFailover(); // ... } // ...}
是否要做被动主从切换,在 clusterHandleSlaveFailover
函数中有如下的判断逻辑,
if (nodeIsMaster(myself) || myself->slaveof == NULL || (!nodeFailed(myself->slaveof) && !manual_failover) || myself->slaveof->numslots == 0){ /* There are no reasons to failover, so we set the reason why we * are returning without failing over to NONE. */ server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; return;}
只有满足如下条件的节点才有资格做 failover:
- slave 节点
- master 不为空
- master 负责的 slot 数量不为空
- master 被标记成了 FAIL,或者这是一个主动 failover(manual_failover 为真)
假设,现在 B 节点的 slave Bx 节点检测到 B 节点挂掉了,通过了以上的条件测试,接下来就会进行 failover。
那么下面 Bx 节点就开始在集群中进行拉票,该逻辑也在 clusterHandleSlaveFailover
函数中。
mstime_t auth_age = mstime() - server.cluster->failover_auth_time; int needed_quorum = (server.cluster->size / 2) + 1;mstime_t auth_timeout, auth_retry_time;auth_timeout = server.cluster_node_timeout*2;if (auth_timeout < 2000) auth_timeout =2000 ;auth_retry_time = auth_timeout*2;
cluster 的 failover_auth_time 属性,表示 slave 节点开始进行故障转移的时刻。集群初始化时该属性置为 0,一旦满足 failover 的条件后,该属性就置为未来的某个时间点(不是立马执行),在该时间点,slave 节点才开始进行拉票。
auth_age 变量表示从发起 failover 流程开始到现在,已经过去了多长时间。
needed_quorum 变量表示当前 slave 节点必须至少获得多少选票,才能成为新的 master。
auth_timeout 变量表示当前 slave 发起投票后,等待回应的超时时间,至少为 2s。如果超过该时间还没有获得足够的选票,那么表示本次 failover 失败。
auth_retry_time 变量用来判断是否可以开始发起下一次 failover 的时间间隔。
if (server.repl_state == REPL_STATE_CONNECTED) { data_age = (mstime_t)(server.unixtime - server.master->lastinteraction) * 1000;} else { data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;}if (data_age > server.cluster_node_timeout) data_age -= server.cluster_node_timeout;
data_age 变量表示距离上一次与我的 master 节点交互过去了多长时间。经过 cluster_node_timeout 时间还没有收到 PONG 消息才会将节点标记为 PFAIL 状态。实际上 data_age 表示在 master 节点下线之前,当前 slave 节点有多长时间没有与其交互过了。
data_age 主要用于判断当前 slave 节点的数据新鲜度;如果 data_age 超过了一定时间,表示当前 slave 节点的数据已经太老了,不能替换掉下线 master 节点,因此在不是手动强制故障转移的情况下,直接返回。
制定 failover 时间
void clusterHandleSlaveFailover(void) { // ... if (auth_age > auth_retry_time) { server.cluster->failover_auth_time = mstime() + 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */ random() % 500; /* Random delay between 0 and 500 milliseconds. */ server.cluster->failover_auth_count = 0; server.cluster->failover_auth_sent = 0; server.cluster->failover_auth_rank = clusterGetSlaveRank(); /* We add another delay that is proportional to the slave rank. * Specifically 1 second * rank. This way slaves that have a probably * less updated replication offset, are penalized. * */ server.cluster->failover_auth_time += server.cluster->failover_auth_rank * 1000; if (server.cluster->mf_end) { server.cluster->failover_auth_time = mstime(); server.cluster->failover_auth_rank = 0; } // ... clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES); return; } // ...}
满足条件(auth_age > auth_retry_time)后,发起故障转移流程。
首先设置故障转移发起时刻,即设置 failover_auth_time 时间。
mstime() + 500 + random()%500 + rank*1000
固定延时 500ms 是为了让 master fail 的消息能够广泛传播到集群,这样集群中的其他节点才可能投票。
随机延时是为了避免多个你 slave 节点同时发起 failover 流程。
rank 表示 slave 节点的排名,计算方式如下,
int clusterGetSlaveRank(void) { long long myoffset; int j, rank = 0; clusterNode *master; serverAssert(nodeIsSlave(myself)); master = myself->slaveof; if (master == NULL) return 0; /* Never called by slaves without master. */ myoffset = replicationGetSlaveOffset(); for (j = 0; j < master->numslaves; j++) if (master->slaves[j] != myself && master->slaves[j]->repl_offset > myoffset) rank++; return rank;}
可以看出,排名主要是根据复制数据量来定,复制数据量越多,排名越靠前(rank 值越小)。这样做是为了做 failover 时尽量选择一个复制数据量较多的 slave,以尽最大努力保留数据。在没有开始拉选票之前,每隔一段时间(每次调用clusterHandleSlaveFailover
函数,也就是每次 cron 的时间)就会调用一次 clusterGetSlaveRank
函数,以更新当前 slave 节点的排名。
slave 拉选票
首先对于一些不合理的 failover 要过滤掉。
/* Return ASAP if we can't still start the election. */if (mstime() < server.cluster->failover_auth_time) { clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY); return;}/* Return ASAP if the election is too old to be valid. * failover 超时 */if (auth_age > auth_timeout) { clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED); return;}
然后开始拉选票。
if (server.cluster->failover_auth_sent == 0) { server.cluster->currentEpoch++; // 增加当前节点的currentEpoch的值,表示要开始新一轮选举了 server.cluster->failover_auth_epoch = server.cluster->currentEpoch; serverLog(LL_WARNING,"Starting a failover election for epoch %llu.", (unsigned long long) server.cluster->currentEpoch); /* 向所有节点发送 CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 消息,开始拉票*/ clusterRequestFailoverAuth(); server.cluster->failover_auth_sent = 1; // 表示已经发起了故障转移流程 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE| CLUSTER_TODO_FSYNC_CONFIG); return; /* Wait for replies. */}
如果 failover_auth_sent 为 0,表示没有发起过投票,那么将 currentEpoch 加 1,记录 failover_auth_epoch 为 currentEpoch,函数 clusterRequestFailoverAuth
用来发起投票,failover_auth_sent 置 1,表示该 slave 已经发起过投票了。
void clusterRequestFailoverAuth(void) { unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST); /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit * in the header to communicate the nodes receiving the message that * they should authorized the failover even if the master is working. */ if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK; totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); clusterBroadcastMessage(buf,totlen);}
clusterRequestFailoverAuth
函数向集群广播 CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 类型的 gossip 信息,这类型的信息就是向集群中的 master 节点索要本轮选举中的选票。另外,如果是 mf,那么会在 gossip hdr 中带上 CLUSTERMSG_FLAG0_FORCEACK 信息。
其他 master 投票
else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. */ clusterSendFailoverAuthIfNeeded(sender,hdr);}
在 clusterProcessPacket
函数中处理 gossip 消息,当接收到 CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 类型的消息时,调用 clusterSendFailoverAuthIfNeeded
函数处理,在满足条件的基础上,给 sender 投票。
注:以下若不进行特殊说明,都是 clusterSendFailoverAuthIfNeeded
函数处理逻辑。
筛掉没资格投票的节点
if (nodeIsSlave(myself) || myself->numslots == 0) return;
slave 节点或者不负责 slot 的 master 节点
筛掉不需要投票的 sender
uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);if (requestCurrentEpoch < server.cluster->currentEpoch) { serverLog(LL_WARNING, "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)", node->name, (unsigned long long) requestCurrentEpoch, (unsigned long long) server.cluster->currentEpoch); return;}
sender 节点集群信息过旧。
正常来说,如果 receiver 在接收到 sender 的 CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 消息之前接收了 PING/PONG 消息,会更新自己的 currentEpoch,这时 currentEpoch 会增加,因为 sender 发起选举之前,会先增加自身的currentEpoch;否则的话,receiver 的 currentEpoch 应该小于 sender。因此 sender 的 currentEpoch 应该 >= receiver 的。有可能 sender 是个长时间下线的节点刚刚上线,这样的节点不能给他投票,因为它的集群信息过旧。if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) { serverLog(LL_WARNING, "Failover auth denied to %.40s: already voted for epoch %llu", node->name, (unsigned long long) server.cluster->currentEpoch); return;}
receiver 节点在本轮选举中已经投过票了,避免两个 slave 节点同时赢得本界选举。
lastVoteEpoch 记录了在本轮投票中 receiver 投过票的 sender 的 currentEpoch。各 slave 节点独立发起选举,currentEpoch 是相同的,都在原来的基础上加 1。clusterNode *master = node->slaveof;if (nodeIsMaster(node) || master == NULL || (!nodeFailed(master) && !force_ack)){ if (nodeIsMaster(node)) { serverLog(LL_WARNING, "Failover auth denied to %.40s: it is a master node", node->name); } else if (master == NULL) { serverLog(LL_WARNING, "Failover auth denied to %.40s: I don't know its master", node->name); } else if (!nodeFailed(master)) { serverLog(LL_WARNING, "Failover auth denied to %.40s: its master is up", node->name); } return;}
sender 是个 master。
sender 是个没有 master 的 slave。 sender 的 master 没有 fail,且不是个 mf。if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2){ serverLog(LL_WARNING, "Failover auth denied to %.40s: " "can't vote about this master before %lld milliseconds", node->name, (long long) ((server.cluster_node_timeout*2) - (mstime() - node->slaveof->voted_time))); return;}
两次投票时间间隔不能少于 2 倍 的 cluster_node_timeout。
这个裕量时间,使得获得赢得选举的 slave 将新的主从关系周知集群其他节点,避免其他 slave 发起新一轮的投票。uint64_t requestConfigEpoch = ntohu64(request->configEpoch);unsigned char *claimed_slots = request->myslots;for (j = 0; j < CLUSTER_SLOTS; j++) { if (bitmapTestBit(claimed_slots, j) == 0) continue; if (server.cluster->slots[j] == NULL || server.cluster->slots[j]->configEpoch <= requestConfigEpoch) { continue; } /* If we reached this point we found a slot that in our current slots * is served by a master with a greater configEpoch than the one claimed * by the slave requesting our vote. Refuse to vote for this slave. */ serverLog(LL_WARNING, "Failover auth denied to %.40s: " "slot %d epoch (%llu) > reqEpoch (%llu)", node->name, j, (unsigned long long) server.cluster->slots[j]->configEpoch, (unsigned long long) requestConfigEpoch); return;}
sender 节点声称要接管的 slots,在 receiver 节点看来其中有个别 slot 原来负责节点的 configEpoch 要比 sender 的大,这说明 sender 看到的集群消息太旧了,这可能是一个长时间下线又重新上线的节点。
在本轮选举投票
clusterSendFailoverAuth(node);server.cluster->lastVoteEpoch = server.cluster->currentEpoch;node->slaveof->voted_time = mstime(); // 更新投票时间
clusterSendFailoverAuth
函数中发送 CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 类型的 gossip 消息,这就算在本轮选举中投票了,并记录本轮投票的 epoch以及投票时间。
slave 统计选票
slave 接收到 CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 类型的 gossip 消息,就算统计到一票。
else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { // slave 统计票数 if (!sender) return 1; /* We don't know that node. */ /* We consider this vote only if the sender is a master serving * a non zero number of slots, and its currentEpoch is greater or * equal to epoch where this node started the election. */ if (nodeIsMaster(sender) && sender->numslots > 0 && senderCurrentEpoch >= server.cluster->failover_auth_epoch) { server.cluster->failover_auth_count++; /* Maybe we reached a quorum here, set a flag to make sure * we check ASAP. */ clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); }}
sender 是个负责 slot 的 master 并且满足 currentEpoch 的要求,那么这张选票有效。出现 senderCurrentEpoch < server.cluster->failover_auth_epoch
的情况时有可能的,如果这张选票是上一轮选举的获得选票,就不能作数。
slave 做主从切换
void clusterHandleSlaveFailover(void) { // ... int needed_quorum = (server.cluster->size / 2) + 1; if (server.cluster->failover_auth_count >= needed_quorum) { /* We have the quorum, we can finally failover the master. */ serverLog(LL_WARNING, "Failover election won: I'm the new master."); /* Update my configEpoch to the epoch of the election. */ if (myself->configEpoch < server.cluster->failover_auth_epoch) { myself->configEpoch = server.cluster->failover_auth_epoch; serverLog(LL_WARNING, "configEpoch set to %llu after successful failover", (unsigned long long) myself->configEpoch); } /* Take responsability for the cluster slots. */ clusterFailoverReplaceYourMaster(); } else { clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES); }}
slave 节点获得足够多选票后, 成为新的 master 节点。
更新自己的 configEpoch 为选举协商的 failover_auth_epoch,这是本节点就获得了最新当前集群最大的 configEpoch,表明它看到的集群信息现在是最新的。最后调用clusterFailoverReplaceYourMaster
函数取代下线主节点,成为新的主节点,并向其他节点广播这种变化。 void clusterFailoverReplaceYourMaster(void) { int j; clusterNode *oldmaster = myself->slaveof; if (nodeIsMaster(myself) || oldmaster == NULL) return; /* 1) Turn this node into a master. */ /* 把 myself 标记为 master,并从原 master 里删掉,更新原 master 的涉及 slave 的参数, * 如果 slave 数量为0,去掉它的 CLUSTER_NODE_MIGRATE_TO 标记 */ clusterSetNodeAsMaster(myself); /* 取消主从复制过程,将当前节点升级为主节点 *、 replicationUnsetMaster(); /* 2) Claim all the slots assigned to our master. * 接手老的 master 节点负责的槽位 */ for (j = 0; j < CLUSTER_SLOTS; j++) { if (clusterNodeGetSlotBit(oldmaster,j)) { clusterDelSlot(j); clusterAddSlot(myself,j); } } /* 3) Update state and save config. */ clusterUpdateState(); clusterSaveConfigOrDie(1); /* 4) Pong all the other nodes so that they can update the state * accordingly and detect that we switched to master role. */ clusterBroadcastPong(CLUSTER_BROADCAST_ALL); /* 5) If there was a manual failover in progress, clear the state. */ resetManualFailover();}
进行必要的 flag 设置和 slots 交接,向集群广播 PONG 消息,并进行善后处理。
集群其他节点感知主从变化
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { // ... /* Check for role switch: slave -> master or master -> slave. */ if (sender) { if (!memcmp(hdr->slaveof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->slaveof))) { /* Node is a master. set master flag for sender */ clusterSetNodeAsMaster(sender); } // ... } clusterNode *sender_master = NULL; /* Sender or its master if slave. */ int dirty_slots = 0; /* Sender claimed slots don't match my view? */ if (sender) { sender_master = nodeIsMaster(sender) ? sender : sender->slaveof; if (sender_master) { dirty_slots = memcmp(sender_master->slots, hdr->myslots, sizeof(hdr->myslots)) != 0; } } if (sender && nodeIsMaster(sender) && dirty_slots) clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots); // ...}
集群中其他节点接收到 PONG 消息后,对 sender 进行正确的 role 标记,以某节点 D 为例。
对于刚刚做完故障转移的 slave,也即现在 master,在节点 D 看来它负责的 slot 是空的,所以 dirty_slots 为 1。之后调用clusterUpdateSlotsConfigWith
函数处理 slots 的 dirty diff 信息。至此 failover 的逻辑就已经基本完成。 主动 failover
除了上面的发现故障后集群自动 failover,也可以进行主动的主从切换。
slave 节点接受 cluster failover 命令
主动 failover 是通过 redis 命令实现的,命令格式为 CLUSTER FAILOVER [FORCE|TAKEOVER]
,该命令使用详情可以参考这篇。
#define CLUSTER_MF_TIMEOUT 5000 else if (!strcasecmp(c->argv[1]->ptr,"failover") && (c->argc == 2 || c->argc == 3)){ /* CLUSTER FAILOVER [FORCE|TAKEOVER] */ int force = 0, takeover = 0; if (c->argc == 3) { /* 不与 master 沟通,主节点也不会阻塞其客户端,需要经过选举 */ if (!strcasecmp(c->argv[2]->ptr,"force")) { force = 1; /* 不与 master 沟通,不经过选举 */ } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) { takeover = 1; force = 1; /* Takeover also implies force. */ /* 与 master 沟通,需要经过选举 */ } else { addReply(c,shared.syntaxerr); return; } } // ... server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; // mf 的超时时间为 5s}
cluster failover 命令有三种不同的选项,各有不同的含义,如上面注释所说。takeover 变量标记是否要经过选举, force 变量标记是否需要与 master 沟通。
另外,mf 过程有一个过期时间,目前定义为 5s,同时, mf_end 也表示现在正在做 mf。
不同的选项有不同的处理方式,如下,if (takeover) { // takeover 不会做任何初始化校验。 // 不经过其他节点选举协商,直接将该节点的 current epoch 加 1,然后广播这个新的配置 serverLog(LL_WARNING,"Taking over the master (user request)."); clusterBumpConfigEpochWithoutConsensus(); clusterFailoverReplaceYourMaster();} else if (force) { /* If this is a forced failover, we don't need to talk with our * master to agree about the offset. We just failover taking over * it without coordination. */ serverLog(LL_WARNING,"Forced failover user request accepted."); server.cluster->mf_can_start = 1;// 可以直接开始选举过程} else { serverLog(LL_WARNING,"Manual failover user request accepted."); clusterSendMFStart(myself->slaveof); // 发送带有 CLUSTERMSG_TYPE_MFSTART 标记的 gossip 包(只有消息头)给我的 master}
takeover 方式最为粗暴,slave 节点不发起选举,而是直接将自己升级为master,接手原主节点的槽位,增加自己的 configEpoch 后更新配置。clusterFailoverReplaceYourMaster
的逻辑在前面讲过,只有在本轮选举中获得足够多的选票才会调用该函数。
force 方式表示可以直接开始选举过程,选举过程也在前面说过了。
现在来看看默认方式,处理逻辑为 clusterSendMFStart
函数。该函数主要逻辑就是发送向要做 failover 的 slave 的 master 发送 CLUSTERMSG_TYPE_MFSTART
类型的 gossip 消息。
master 节点做 mf 准备
else if (type == CLUSTERMSG_TYPE_MFSTART) { /* This message is acceptable only if I'm a master and the sender * is one of my slaves. */ if (!sender || sender->slaveof != myself) return 1; /* Manual failover requested from slaves. * Initialize the state accordingly. * master 收到消息,重置 mf 状态 */ resetManualFailover(); server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; server.cluster->mf_slave = sender; pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2)); // 阻塞客户端 10s serverLog(LL_WARNING,"Manual failover requested by slave %.40s.", sender->name);}
resetManualFailover
函数中重置与 mf 相关的参数,表示这是一次新的 mf。
设置 mf_end,将它的 master 指向 sender(就是那个搞事情的 slave),同时阻塞 client 10s 钟。
随后,标记在做 mf 的 master 发送 PING 信息时 hdr 会带上 CLUSTERMSG_FLAG0_PAUSED 标记。
void clusterBuildMessageHdr(clusterMsg *hdr, int type) { // ... /* Set the message flags. */ if (nodeIsMaster(myself) && server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED; // ...}
mflags 记录与 mf 相关的 flag。
slave 处理
获得 master 的 repl offset
slave 节点处理带有 CLUSTERMSG_FLAG0_PAUSED 标记的 gossip 消息。
int clusterProcessPacket(clusterLink *link) { // ... sender = clusterLookupNode(hdr->sender); if (sender && !nodeInHandshake(sender)) { // ... if (server.cluster->mf_end && // 处于 mf 状态 nodeIsSlave(myself) && // 我是 slave myself->slaveof == sender && // 我的 master 是 sender hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_master_offset == 0) // 还没有正式开始时,mf_master_offset 设置为 0 { server.cluster->mf_master_offset = sender->repl_offset; // 从 sender 获得 repl_offset serverLog(LL_WARNING, "Received replication offset for paused " "master manual failover: %lld", server.cluster->mf_master_offset); } } // ...}
对于那个发起 failover 的 slave,记下其 master 的 repl_offset,如果之前还没有记录下的话。
向 maser 追平 repl offset
void clusterCron(void) { // ... if (nodeIsSlave(myself)) { clusterHandleManualFailover(); // ... } // ...}void clusterHandleManualFailover(void) { /* Return ASAP if no manual failover is in progress. */ if (server.cluster->mf_end == 0) return; /* If mf_can_start is non-zero, the failover was already triggered so the * next steps are performed by clusterHandleSlaveFailover(). */ if (server.cluster->mf_can_start) return; if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */ if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) { /* Our replication offset matches the master replication offset * announced after clients were paused. We can start the failover. */ server.cluster->mf_can_start = 1; serverLog(LL_WARNING, "All master replication stream processed, " "manual failover can start."); }}
在 clusterCron
函数里有 clusterHandleManualFailover
的逻辑。
mf_end 为 0,说明此时没有 mf 发生。
mf_can_start 非 0 值,表示现在可以此 slave 可以发起选举了。
mf_master_offset 为 0,说明现在还没有获得 master 的复制偏移量,需要等一会儿。当 mf_master_offset 值等于 replicationGetSlaveOffset
函数的返回值时,把 mf_can_start 置为 1。另外,应该记得,使用带有 force 选项的 CLUSTER FAILOVER
命令,直接就会把 mf_can_start 置为 1,而 replicationGetSlaveOffset
函数的作用就是检查当前的主从复制偏移量,也就是说主从复制偏移量一定要达到 mf_master_offset 时,slave 才会发起选举,即默认选项有一个追平 repl offset 的过程。
其他一些选举什么的流程跟被动 failover 没有区别。
过期清理 mf
主从节点在周期性的clusterCron
中都有一个检查本次 mf 是否过期的函数。
void manualFailoverCheckTimeout(void) { if (server.cluster->mf_end && server.cluster->mf_end < mstime()) { serverLog(LL_WARNING,"Manual failover timed out."); resetManualFailover(); }}void resetManualFailover(void) { if (server.cluster->mf_end && clientsArePaused()) { server.clients_pause_end_time = 0; clientsArePaused(); /* Just use the side effect of the function. */ } server.cluster->mf_end = 0; /* No manual failover in progress. */ server.cluster->mf_can_start = 0; server.cluster->mf_slave = NULL; server.cluster->mf_master_offset = 0;}
如果过期没有做 mf ,那么就会重置它的相关参数。
附录
epoch 概念
在 Redis cluster 里 epoch 是个非常重要的概念,类似于 raft 算法中的 term 概念。Redis cluster 里主要是两种:currentEpoch 和 configEpoch。
currentEpoch
这是一个集群状态相关的概念,可以当做记录集群状态变更的递增版本号。每个集群节点,都会通过server.cluster->currentEpoch 记录当前的 currentEpoch。集群节点创建时,不管是主节点还是从节点,都置currentEpoch 为 0。当前节点接收到来自其他节点的包时,如果发送者的currentEpoch(消息头部会包含发送者的currentEpoch)大于当前节点的currentEpoch,那么当前节点会更新 currentEpoch 为发送者的 currentEpoch。因此,集群中所有节点的currentEpoch最终会达成一致,相当于对集群状态的认知达成了一致。
currentEpoch 作用在于,集群状态发生改变时,某节点会先增加自身 currentEpoch 的值,然后向集群中其他节点征求同意,以便执行某些动作。目前,仅用于 slave 节点的故障转移流程,在上面分析中也看到了,在发起选举之前,slave 会增加自己的 currentEpoch,并且得到的 currentEpoch 表示这一轮选举的 voteEpoch,当获得了足够多的选票后才会执行故障转移。
configEpoch
这是一个集群节点配置相关的概念,每个集群节点都有自己独一无二的 configepoch。所谓的节点配置,实际上是指节点所负责的 slot 信息。
configEpoch 主要用于解决不同的节点就 slot 归属认知发生冲突的情况。公说公有理婆说婆有理,到底听谁的,configEpoch 越大,看到的集群节点配置信息越新,就越有话语权。对于冲突的情况,后面会有博客进行详细分析。
以下几种情况 configEpoch 会更新:
- 新节点加入;
- 槽节点映射冲突检测;(slot 归属变更)
- 从节点投票选举冲突检测。(主从切换)
递增 node epoch 称为 bump epoch。
关于 configEpoch 有三个原则:
- 如果 epoch 不变, 集群就不应该有变更(包括选举和迁移槽位)。
- 每个节点的 node epoch 都是独一无二的。
- 拥有越高 epoch 的节点, 集群信息越新。
clusterUpdateState 函数逻辑
#define CLUSTER_MAX_REJOIN_DELAY 5000#define CLUSTER_MIN_REJOIN_DELAY 500#define CLUSTER_WRITABLE_DELAY 2000void clusterUpdateState(void) { // ... static mstime_t among_minority_time; static mstime_t first_call_time = 0; server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE; /* 时间从第一次调用该函数算起,是为了跳过 DB load 时间。 * cluster 启动时,状态为 CLUSTER_FAIL, * 这里要等待一定的时间(2s)让 cluster 变为 CLUSTER_OK 状态。 */ if (first_call_time == 0) first_call_time = mstime(); if (nodeIsMaster(myself) && server.cluster->state == CLUSTER_FAIL && mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return; /* 先假设集群状态为 CLUSTER_OK, * 然后遍历 16384 个 slot,如果发现有 slot 被有被接管, * 或者接管某 slot 的 node 是 fail 状态,那么把集群设置为 CLUSTER_FAIL,退出循环 */ new_state = CLUSTER_OK; if (server.cluster_require_full_coverage) { for (j = 0; j < CLUSTER_SLOTS; j++) { if (server.cluster->slots[j] == NULL || server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL)) { new_state = CLUSTER_FAIL; break; } } } { /* 计算 cluster size,计数的是那些至少负责一个 slot 的 node * 计算 reachable_masters,计数基于 cluster size, * 加入筛选条件(不带有 CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) 标记 */ dictIterator *di; dictEntry *de; server.cluster->size = 0; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); if (nodeIsMaster(node) && node->numslots) { server.cluster->size++; if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0) reachable_masters++; } } dictReleaseIterator(di); } { /* 如果 reachable_masters 不到 cluster size 一半(a minority partition), * 就将集群标记为 CLUSTER_FAIL */ int needed_quorum = (server.cluster->size / 2) + 1; if (reachable_masters < needed_quorum) { new_state = CLUSTER_FAIL; among_minority_time = mstime(); } } if (new_state != server.cluster->state) { mstime_t rejoin_delay = server.cluster_node_timeout; if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY) rejoin_delay = CLUSTER_MAX_REJOIN_DELAY; if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY) rejoin_delay = CLUSTER_MIN_REJOIN_DELAY; /* 处于 minority partition 的时间没有超过 cluster_node_timeout, * 那么此次不更新集群状态。 */ if (new_state == CLUSTER_OK && nodeIsMaster(myself) && mstime() - among_minority_time < rejoin_delay) { return; } /* Change the state and log the event. */ serverLog(LL_WARNING,"Cluster state changed: %s", new_state == CLUSTER_OK ? "ok" : "fail"); server.cluster->state = new_state; }
4. 参考