Redis-sync

背景介绍

Redis是一个内存型KV存储,以其高性能特性得到广泛应用。作为数据库,主从同步一直是一个非常核心的功能。其主从同步是基于最终一致性。

本章主要是主从同步相关流程和代码的分析,内容写于2015年基于Redis 2.6版本。

流程分析

主库视角

Redis-master-sync
站在Redis-Master视角,根据代码整理出了主从同步过程的函数调用关系。同步过程有两种可能的情况:

  • 主从同步:正常状态下的实时同步
  • 主从复制:主从相差(参数控制)过大时,尝试进行部分复制或全量复制

从库视角

Redis-slave-sync
对于从库来说,它会主动向主库发起复制请求,对应主库的主从复制场景。

代码解析

主从同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/* From the master's point of view, call() is the entry point of command
 * propagation. The call path is:
 *   call() -> propagate() -> replicationFeedSlaves()
 * According to the original author's notes, every executed command goes
 * through call(); call() invokes propagate(), which does two things:
 *   a. appends the command to the AOF (when AOF is enabled)
 *   b. forwards the command to the slaves (replicationFeedSlaves())
 */
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    long long dirty, start = ustime(), duration;
     
    ... // part of the function is omitted in this excerpt (presumably where the command runs and `dirty` is computed -- not visible here)
 
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) { // the caller-supplied flags decide whether propagation is considered at all
        int flags = REDIS_PROPAGATE_NONE; // NOTE: this local shadows the `flags` parameter; it collects the targets handed to propagate()
        if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) { // the command is flagged for forced replication
            flags |= REDIS_PROPAGATE_REPL;
        }
        if (dirty) { // non-zero dirty: propagate to both the slaves and the AOF
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        }
        if (flags != REDIS_PROPAGATE_NONE) {  // at least one target was selected, so call propagate()
            propagate(c->cmd,c->argv,c->argc,flags);
        }
    }
    /* Handle the alsoPropagate() API to handle commands that want to propagate
     * multiple separated commands. */
    if (server.also_propagate.numops) { // propagate every extra operation queued in server.also_propagate, then release the array
        int j;
        redisOp *rop;
        for (j = 0; j < server.also_propagate.numops; j++) {
            rop = &server.also_propagate.ops[j];
            propagate(rop->cmd, rop->argv, rop->argc, rop->target);
        }
        redisOpArrayFree(&server.also_propagate);
    }
    server.stat_numcommands++;
}
1
2
3
4
5
6
7
8
/* Propagate a command to the targets selected in 'flags':
 *   REDIS_PROPAGATE_AOF  - append to the AOF, but only while the AOF is
 *                          not in the REDIS_AOF_OFF state.
 *   REDIS_PROPAGATE_REPL - feed the command to the slaves in server.slaves.
 * 'argv'/'argc' are the command arguments that get propagated. */
void propagate(struct redisCommand *cmd, robj **argv, int argc,
               int flags)
{
    int to_aof = (flags & REDIS_PROPAGATE_AOF) != 0;
    int to_slaves = (flags & REDIS_PROPAGATE_REPL) != 0;

    if (to_aof && server.aof_state != REDIS_AOF_OFF) {
        feedAppendOnlyFile(cmd, argv, argc);
    }
    if (to_slaves) {
        replicationFeedSlaves(server.slaves, argv, argc);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/* Feed one command (argv/argc) into the replication stream: the command is
 * appended to the replication backlog (when one exists) in multi-bulk
 * protocol form, and then queued on every slave in 'slaves' except those
 * still in the REDIS_REPL_WAIT_BGSAVE_START state. */
void replicationFeedSlaves(list *slaves, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return; // return early only when there is neither a backlog nor any slave
/* We can't have slaves attached and no backlog. */
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
...// code omitted in this excerpt: per the original author it is the upstream SELECT handling, commented out in their in-house build because SELECT is not needed there
if (server.repl_backlog) { // a backlog buffer exists: append this command to it
char aux[REDIS_LONGSTR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc); // write argc after the '*', giving the "*<argc>" multi-bulk header
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3); // append "*<argc>\r\n" to the backlog
for (j = 0; j < argc; j++) { // append each argument in turn
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* ad add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3); // same pattern as above: append the "$<len>\r\n" bulk header
feedReplicationBacklogWithObject(argv[j]); // append the argument payload (per the original author this converts the object to a string and ends up in feedReplicationBacklog -- not visible here)
feedReplicationBacklog(aux+len+1,2); // trailing CRLF, reusing the "\r\n" still stored in aux
}
}
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) { // walk the slaves list
redisClient *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; // skip slaves whose BGSAVE has not started yet
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyMultiBulkLen(slave,argc); // multi-bulk header: the number of arguments
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]); // queue each argument on the slave as a bulk reply
}
}

主从复制

主从复制主要是两个入口函数
对于master来说,syncCommand()用来处理slave的复制请求
对于slave来说,syncWithMaster()向master发起复制请求;master在BGSAVE结束后,通过updateSlavesWaitingBgsave()将RDB文件发送给等待中的slave

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/* SYNC and PSYNC command implementation (master side).
 *
 * Handles a replication request coming from client 'c':
 *   1. Rejects the request when it cannot be served (client already a
 *      slave, we are a slave not connected to our master, or the client
 *      has pending output).
 *   2. For PSYNC, tries a partial resynchronization first and returns if
 *      masterTryPartialResynchronization() reports REDIS_OK.
 *   3. Otherwise sets up a full resynchronization: reuse an in-progress
 *      BGSAVE when another slave is already waiting for it, wait for the
 *      next BGSAVE, or start a new one.
 *   4. Registers the client in server.slaves and creates the replication
 *      backlog when this is the only slave and no backlog exists. */
void syncCommand(redisClient *c) {
    /* Ignore the command if this client is already flagged as a slave. */
    if (c->flags & REDIS_SLAVE) return;

    /* We are ourselves a slave and the link with our master is not
     * established: refuse to serve SYNC. */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* The client must have no pending output. */
    if (listLength(c->reply) != 0 || c->bufpos != 0) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave asks for synchronization");

    if (strcasecmp(c->argv[0]->ptr,"psync") != 0) {
        /* Plain SYNC: we are dealing with an old implementation of the
         * replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC_SLAVE;
    } else if (masterTryPartialResynchronization(c) == REDIS_OK) {
        /* Partial resync handled the request: no full resync needed. */
        server.stat_sync_partial_ok++;
        return;
    } else {
        /* Count the failed PSYNC, but only if the requested run id is not
         * "?", which slaves send on purpose to force a full resync. */
        const char *requested_runid = c->argv[1]->ptr;

        if (requested_runid[0] != '?') server.stat_sync_partial_err++;
    }

    /* From here on we are performing a full resynchronization. */
    server.stat_sync_full++;

    if (server.rdb_child_pid == -1) {
        /* No BGSAVE in progress: start one now. */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    } else {
        /* A BGSAVE is already running. It is usable for this client only
         * if another slave is already waiting for its end, i.e. the
         * server has been registering differences since the fork. */
        redisClient *donor = NULL;
        listIter it;
        listNode *node;

        listRewind(server.slaves,&it);
        while ((node = listNext(&it)) != NULL) {
            redisClient *candidate = node->value;

            if (candidate->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
                donor = candidate;
                break;
            }
        }

        if (donor != NULL) {
            /* The differences are already being accumulated for 'donor':
             * copy its output buffer and wait for the same BGSAVE. */
            copyClientOutputBuffer(c,donor);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* Nobody is registering differences for the running BGSAVE:
             * this client has to wait for the next one. */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    }

    /* When repl_disable_tcp_nodelay is set, TCP_NODELAY is turned OFF on
     * the slave socket. A failure here is not critical. */
    if (server.repl_disable_tcp_nodelay) {
        anetDisableTcpNoDelay(NULL, c->fd);
    }

    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);

    /* This is the only attached slave and there is no backlog yet:
     * create one. */
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
        createReplicationBacklog();
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/* Master side: try to serve the PSYNC request of client 'c' with a partial
 * resynchronization.
 *
 * The request carries the master run id the slave was replicating from
 * (argv[1]) and the offset it wants to continue from (argv[2]).
 *
 * Returns REDIS_OK when the caller has nothing more to do: either the
 * partial resync was accepted (client registered as an online slave,
 * "+CONTINUE" and the backlog data sent), or a write to the client socket
 * failed and the client was handed to freeClientAsync().
 * Returns REDIS_ERR after "+FULLRESYNC <runid> <offset>" was written to the
 * client, meaning the caller must go on with a full resynchronization. */
int masterTryPartialResynchronization(redisClient *c) {
    char *slave_runid = c->argv[1]->ptr;
    long long req_offset, sent_bytes, announce_offset;
    char line[128];
    int linelen;
    int in_backlog;

    /* The run id advertised by the slave must match ours, otherwise we are
     * a different instance and there is no way to continue. */
    if (strcasecmp(slave_runid, server.runid) != 0) {
        if (slave_runid[0] == '?') {
            /* Run id "?" is used by slaves that want to force a full
             * resync. */
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        } else {
            /* A real run id that is not ours. */
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                slave_runid, server.runid);
        }
        goto full_resync;
    }

    /* Parse the requested offset; a parse failure also means full resync. */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&req_offset,NULL) !=
        REDIS_OK)
    {
        goto full_resync;
    }

    /* Do we still have the data the slave is asking for? The offset must
     * fall inside the window currently held by the backlog. */
    in_backlog = server.repl_backlog != NULL &&
                 req_offset >= server.repl_backlog_off &&
                 req_offset <= (server.repl_backlog_off +
                                server.repl_backlog_histlen);
    if (!in_backlog) {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", req_offset);
        if (req_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto full_resync;
    }

    /* Partial resync is possible:
     * 1) Turn the client into an online slave.
     * 2) Tell it to continue with +CONTINUE.
     * 3) Send the backlog data from the requested offset to the end. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);

    /* The reply is written straight to the socket: the connection buffers
     * are used to accumulate new commands at this stage. */
    linelen = snprintf(line,sizeof(line),"+CONTINUE\r\n");
    if (write(c->fd,line,linelen) != linelen) {
        freeClientAsync(c);
        return REDIS_OK;
    }

    sent_bytes = addReplyReplicationBacklog(c,req_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", sent_bytes, req_offset);
    /* server.slaveseldb is intentionally left untouched: the slave already
     * has the selected DB state from its previous connection. */
    return REDIS_OK; /* The caller can return, no full resync needed. */

full_resync:
    /* A full resync is needed for some reason: notify the client with our
     * run id and the current replication offset. */
    announce_offset = server.master_repl_offset;
    /* If the backlog does not exist yet, add one: the offset will be
     * incremented by one when the backlog is created later. */
    if (server.repl_backlog == NULL) announce_offset++;

    /* Again, written directly to the socket (see above). */
    linelen = snprintf(line,sizeof(line),"+FULLRESYNC %s %lld\r\n",
                       server.runid,announce_offset);
    if (write(c->fd,line,linelen) != linelen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/* Master side: update the state of the slaves after a background save
 * terminated, and try to start shipping the RDB file to them.
 *
 * 'bgsaveerr' is REDIS_OK when the BGSAVE child succeeded; any other value
 * means the save failed.
 *
 * - Slaves in REDIS_REPL_WAIT_BGSAVE_END were waiting for the save that
 *   just ended. On error they are freed; on success the RDB file is opened
 *   for them, they move to REDIS_REPL_SEND_BULK and a writable event with
 *   sendBulkToSlave as handler is installed.
 * - Slaves in REDIS_REPL_WAIT_BGSAVE_START need a new BGSAVE. They are
 *   moved to REDIS_REPL_WAIT_BGSAVE_END only once that BGSAVE has actually
 *   been started; if it cannot be started they are freed.
 *
 * Fixes over the previous version:
 * - The START -> END transition used to happen before rdbSaveBackground()
 *   was attempted, so the failure path (which frees slaves still in
 *   REDIS_REPL_WAIT_BGSAVE_START) never matched any slave and they were
 *   left waiting for a save that was never started.
 * - errno is saved before freeClient() so that the logged error is the one
 *   produced by open()/fstat(). */
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    listIter li;
    int startbgsave = 0;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            /* Needs a fresh BGSAVE; the state is changed below, after we
             * know whether that BGSAVE could be started. */
            startbgsave = 1;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            if (bgsaveerr != REDIS_OK) {
                /* The save this slave was waiting for failed. */
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                /* Capture errno now: freeClient() may overwrite it. */
                int saved_errno = errno;

                /* NOTE(review): when open() succeeded but fstat failed,
                 * repldbfd is a valid descriptor here; confirm that
                 * freeClient() closes it in this state. */
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(saved_errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            /* Replace any existing writable handler with the one that
             * streams the RDB file to the slave. */
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }

    if (startbgsave) {
        /* At least one slave is waiting for a new BGSAVE: start it. */
        int bgsave_started = (rdbSaveBackground(server.rdb_filename) == REDIS_OK);

        if (!bgsave_started) {
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
        }

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            if (slave->replstate != REDIS_REPL_WAIT_BGSAVE_START) continue;
            if (bgsave_started) {
                /* The save is running: wait for its end. */
                slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            } else {
                /* No save could be started for this slave. */
                freeClient(slave);
            }
        }
    }
}

以下是站在从库视角代码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/* --------------------------- REPLICATION CRON ----------------------------
 * Periodic replication housekeeping. In order, it:
 *   1. Applies the repl_timeout to the connection handshake, the bulk
 *      transfer and an established master link (slave role).
 *   2. Starts a connection to the master when in REDIS_REPL_CONNECT.
 *   3. Sends an ACK to the master when one is attached.
 *   4. Every repl_ping_slave_period * hz cron loops, PINGs the slaves and
 *      sends a newline to the ones still waiting for the RDB file.
 *   5. Frees online slaves whose last ACK is older than repl_timeout.
 *   6. Frees the replication backlog after repl_backlog_time_limit
 *      seconds without slaves.
 */
void replicationCron(void) {
    /* Non blocking connection timeout? */
    if (server.masterhost) {
        if (server.repl_state == REDIS_REPL_CONNECTING ||
            server.repl_state == REDIS_REPL_RECEIVE_PONG)
        {
            if ((time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) {
                redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
                undoConnectWithMaster();
            }
        }
    }

    /* Bulk transfer I/O timeout? */
    if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER) {
        if ((time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) {
            redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
            replicationAbortSyncTransfer();
        }
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED) {
        if ((time(NULL)-server.master->lastinteraction) > server.repl_timeout) {
            redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
            freeClient(server.master);
        }
    }

    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }

    /* Send ACK to master from time to time. */
    if (server.masterhost && server.master) {
        replicationSendAck();
    }

    /* PING the attached slaves from time to time, so that they can
     * implement an explicit timeout towards the master and detect a broken
     * link even when the TCP connection does not actually go down. */
    if ((server.cronloops % (server.repl_ping_slave_period * server.hz)) == 0) {
        listIter it;
        listNode *node;
        robj *ping = createStringObject("PING",4);

        /* First, send PING through the normal replication feed. */
        replicationFeedSlaves(server.slaves, &ping, 1);
        decrRefCount(ping);

        /* Second, send a newline to all the slaves in pre-synchronization
         * stage, that is, slaves waiting for the master to create the RDB
         * file. The newline will be ignored by the slave but will refresh
         * the last-io timer preventing a timeout. */
        listRewind(server.slaves,&it);
        while ((node = listNext(&it)) != NULL) {
            redisClient *waiting = node->value;

            if (waiting->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
                waiting->replstate == REDIS_REPL_WAIT_BGSAVE_END)
            {
                if (write(waiting->fd, "\n", 1) == -1) {
                    /* Don't worry, it's just a ping. */
                }
            }
        }
    }

    /* Disconnect timedout slaves: only online slaves that are expected to
     * send ACKs (not flagged REDIS_PRE_PSYNC_SLAVE) are checked. */
    if (listLength(server.slaves)) {
        listIter it;
        listNode *node;

        listRewind(server.slaves,&it);
        while ((node = listNext(&it)) != NULL) {
            redisClient *slave = node->value;

            if (slave->replstate == REDIS_REPL_ONLINE &&
                !(slave->flags & REDIS_PRE_PSYNC_SLAVE) &&
                (server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                char ip[32];
                int port;

                if (anetPeerToString(slave->fd,ip,&port) != -1) {
                    redisLog(REDIS_WARNING,
                        "Disconnecting timedout slave: %s:%d",
                        ip, slave->slave_listening_port);
                }
                freeClient(slave);
            }
        }
    }

    /* If we have no attached slaves and there is a replication backlog
     * using memory, free it after some (configured) time. */
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog)
    {
        time_t idle = server.unixtime - server.repl_no_slaves_since;

        if (idle > server.repl_backlog_time_limit) {
            freeReplicationBacklog();
            redisLog(REDIS_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected slaves.",
                (int) server.repl_backlog_time_limit);
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/* Slave side: event handler that issues the replication request to the
 * master over socket 'fd'. A partial resynchronization is tried first; if
 * the master does not accept it, a full resynchronization is set up:
 * a temp file is created to receive the RDB payload and
 * readSyncBulkPayload is registered as the readable handler for 'fd'.
 *
 * 'el', 'privdata' and 'mask' are unused.
 *
 * On any failure the socket is closed and the replication state goes back
 * to REDIS_REPL_CONNECT. The temp file descriptor is now also closed on the
 * error path: previously it leaked when aeCreateFileEvent() failed after
 * the temp file had been opened. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd = -1, maxtries = 5; /* dfd stays -1 until the temp file is open */
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);
    ... // socket and event handling code omitted in this excerpt

    /* Try a partial resynchonization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        /* The master accepted the partial resync: nothing else to do. */
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer: up to 'maxtries'
     * attempts, one second apart. O_EXCL makes the open fail if the file
     * already exists. */
    while(maxtries--) {
        snprintf(tmpfile,sizeof(tmpfile),
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file: the payload is
     * consumed by readSyncBulkPayload when 'fd' becomes readable. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    /* Release the temp file descriptor if it was opened before failing. */
    if (dfd != -1) close(dfd);
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/* Slave side: send PSYNC to the master over 'fd' and interpret the reply.
 *
 * With a cached master, its run id and (reploff + 1) are sent so that the
 * master can continue from there; without one, "PSYNC ? -1" is sent.
 *
 * Returns:
 *   PSYNC_CONTINUE      - the master replied +CONTINUE; the cached master
 *                         is resurrected on 'fd'.
 *   PSYNC_FULLRESYNC    - the master replied +FULLRESYNC; the run id and
 *                         initial offset are stored in 'server' when the
 *                         reply is well formed, and the cached master is
 *                         discarded.
 *   PSYNC_NOT_SUPPORTED - any other reply (error or unexpected); the cached
 *                         master is discarded. */
int slaveTryPartialResynchronization(int fd) {
    char *runid_arg;
    char offset_arg[32];
    sds reply;

    /* Mark the current master run_id and offset as not valid. If a FULL
     * resync is later negotiated through PSYNC, the offset is set to the
     * right value so that it gets propagated to the client structure
     * representing the master (server.master). */
    server.repl_master_initial_offset = -1;

    if (server.cached_master == NULL) {
        /* Nothing to continue from: ask with "?" and offset -1. */
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        runid_arg = "?";
        memcpy(offset_arg,"-1",3);
    } else {
        /* Ask to continue right after the last offset of the cached
         * master. */
        runid_arg = server.cached_master->replrunid;
        snprintf(offset_arg,sizeof(offset_arg),"%lld", server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", runid_arg, offset_arg);
    }

    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,"PSYNC",runid_arg,offset_arg,NULL);

    if (strncmp(reply,"+FULLRESYNC",11) == 0) {
        /* FULL RESYNC: the reply has the form
         * "+FULLRESYNC <runid> <offset>"; extract both fields. */
        char *runid = strchr(reply,' ');
        char *offset = NULL;
        int well_formed;

        if (runid != NULL) {
            runid++;
            offset = strchr(runid,' ');
            if (offset != NULL) offset++;
        }

        well_formed = runid != NULL && offset != NULL &&
                      (offset-runid-1) == REDIS_RUN_ID_SIZE;
        if (well_formed) {
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        } else {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* Unexpected: +FULLRESYNC means the master supports PSYNC, but
             * the reply format looks wrong. To stay safe, blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        }

        /* We are going to full resync, discard the cached master
         * structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (strncmp(reply,"+CONTINUE",9) == 0) {
        /* Partial resync was accepted, set the replication state
         * accordingly. */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }

    /* Neither +FULLRESYNC nor +CONTINUE: either an error because the master
     * does not understand PSYNC, or an unexpected reply. Both cases are
     * reported as PSYNC_NOT_SUPPORTED. */
    if (strncmp(reply,"-ERR",4) == 0) {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    } else {
        /* Not an error reply: log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}