admin 管理员组文章数量: 1184232
2A and 2B
先占个坑,磨磨蹭蹭大半个月了,总算是独立写完了这两个lab,还是有很多想写的。
这里我个人比较建议把2A 与 2B 一起做完。因为如果你是完全按照paper的实现去构建的2A的话,2B的实现也就相对简单了。相反如果如果有一些与paper不一样的(特别是没有完全实现paper Figure2),可能2A 也会通过,但是实现2B的过程会让你重复修改2A的代码,这样就麻烦了,所以这里建议一起实现一步到位。当然实现过程仍然是先2A 然后 再 2B,只不过实现2A的过程要同时思考2B的实现。
这篇文章中,我会尽力回想当时的想法,以及一步一步的实现,而不是简单的贴出最终代码和解释。默认在看这篇文章的同学有精读 paper的 相关章节,以及大致理解raft算法轮廓。这里推荐一个b站的视频
结合视频能够较快的理解raft算法大致的过程,当然很多细节有待自己的慢慢思考。
前排叠甲,之前都是学c++的,go最近才学,目前代码也就能跑,还有很多能够改善与待优美化的地方,大佬轻喷。
让我们回到实验
2A
首先第一步就是理解目前给出的代码框架,也就是
/raft/raft.go
这个初始文件。从上到下看:
type ApplyMsg struct{
CommandValid bool
Command interface{}
CommandIndex int// For 2D:
SnapshotValid bool
Snapshot []byte
SnapshotTerm int
SnapshotIndex int}给出的 ApplyMsg ,看代码中的英文解释,应该猜到这个与raft支持的服务有关,也就是我们构建的raft算法为哪个service服务,这个struct 通过某个channel 来进行发送通信。 不懂也没关系 后面会重提提到。
然后就是核心的 raft struct
type Raft struct{
mu sync.Mutex // 每个raft的大锁
peers []*labrpc.ClientEnd // 拥有所有的RPC成员
persister *Persister // 不管
me int// 自己再peers中属于成员的第几位
dead int32// 自己是否活着// Your data here (2A, 2B, 2C).// Look at the paper's Figure 2 for a description of what// state a Raft server must maintain.}显然我们要为raft添加一些字段,那么添加哪些呢? 让我们从figure 2 来获取灵感,在state栏中的字段肯定都需要,全部加上
type Raft struct{
mu sync.Mutex // 每个raft的大锁
peers []*labrpc.ClientEnd // 拥有所有的RPC成员
persister *Persister // 不管
me int// 自己再peers中属于成员的第几位
dead int32// 自己是否活着// Your data here (2A, 2B, 2C).// Look at the paper's Figure 2 for a description of what// state a Raft server must maintain.
currentTerm int
votedFor int
log ~~~~~
}加到一半发现 log 这个结构体还没实现呢,我们先实现log结构体。显然Log struct中肯定包含 currentTerm, Index, 以及command, 如果需要其他的我们后面再看。前面两个用int类型,那么command用啥呢? 如果留意到前面的ApplyMsg的话这里command应该用interface{},没留意也没关系,直接用先用string。
type Log struct{
CurrentTerm int//这个log的任期
Index int//这个log的索引,与任期一起确定一个唯一的log
Command string//log包含的指令}踩坑小提示: 经由RPC发送的struct字段一定要用大写字母开头,一定要用! 不然后面RPC发送log的时候默认小写字段不发送,鬼知道我踩了几次这个坑…
好log实现后我们继续来写raft
type Raft struct{
mu sync.Mutex // 每个raft的大锁
peers []*labrpc.ClientEnd // 拥有所有的RPC成员
persister *Persister // 不管
me int// 自己再peers中属于成员的第几位
dead int32// 自己是否活着// Your data here (2A, 2B, 2C).// Look at the paper's Figure 2 for a description of what// state a Raft server must maintain.
currentTerm int//当前raft所在任期
votedFor int//当前任期下给谁投票了,没有投就是-1
logs Log[]//logs
commitIndex int//已经commit的索引
lastApplied int//最后一个applied的索引
nextIndex int[]
matchIndex int[]
state int}最后两对字段详细解释一下。首先是commitIndex 和 lastAppliedIndex,注意这里的具体实现与文章中说的有一点不一样,文章中都是用一个commit来断定某个log是否能提交,但是实际log提交也需要一个过程,所以实现中需要两个指针,一个指向能够提交的最大索引,也就是commitIndex,一个指向已经应用的最后一个索引appliedIndex,这两个索引之间的log 就是 能够应用,但是还没来得及应用的。然后就是nextIndex 与 matchIndex,任何server / peer都可能成为Leader,而作为Leader就需要维护所有follower的 log状态,这两个就起这个作用。 具体详细的解释可以看看 lab的 ,或者直接看我的翻译与理解 。
踩坑小提示:刚开始看next 和 match感觉这两个字段就有点懵逼,并且我最终实现中最多的bug出现就来自于这两个字段,所以一定要在使用两个字段的时候多多思考,bug才会少。
此外默认的Log index是从1开始而不是从零开始也让实现多了一些烦恼。
此外还有一个明显的字段要添加那就是state,代表当前的server / peer 处于什么状态,Candidate/Leader/Follower 用枚举就好,ok 就先这样,后面有需要的字段再添加。
下面到了RequestVoteArgs struct和 RequestVoteReply struct 这两个RPC字段,仍然是根据Figure2来写,注意大写
type RequestVoteArgs struct{// Your data here (2A, 2B).
Term int//发起投票的candidate的任期
CandidateId int//candidate的ID
LastLogIndex int//candidate的上一个entry
LastLogTerm int//candidate的任期}// example RequestVote RPC reply structure.// field names must start with capital letters!type RequestVoteReply struct{// Your data here (2A).
Term int//投票者的任期
Vote bool//是否投给你了}然后是 func RequestVote 和 func sendRequestVote。前者是RPC handler,用来处理选举请求,后者是啥呢,框架中帮我们写了一行,好像是用来调用前者,发送RPC请求的,看不懂先不管。
继续是func start,看英文注释好像是用来给leader 添加log的,也先不管。然后是func kill 和 func killed,应该是用来杀死server 以及判断server是否被杀死,上面的英文注释写着 在一个长循环中,每次循环开始都要判断该server 是否被杀死。好这个作用就很明显了,后面又长循环照做就行。
到了最后两个函数,func ticker 和 func Make。前者就是用来触发选举超时(election time out)以及心跳(heartbeat),那么为啥是这两个作用呢,首先根据英文提示可以知道这里要开启选举超时,实现上应该与一个定时器有关,然后还会又一个无线循环持续来等待触发定时器。 进一步思考一下,如果是leader的话那么就不会触发这个选举超时,而是会在心跳超时后触发心跳RPC,所以这里也要加上心跳检测。
我要吐槽一下election timeout这个名字,明明是等待超时后开始进行选举,但名字却是选举超时,很难不让人以为是选举的时候没获得大部分的选票而超时了。。。。
后者就是创建一个新的raft并且开始工作。 好,这样框架全部看完了,我们继续看lab的提示。前四个hint 我们已经完成。从第五个开始,实现AppendEntries RPC struct,ok 继续照着figure 2 来实现
type AppendEntriesArgs struct{
Term int
LeaderId int
PreLogIndex int
PreLogTerm int
Logs []Log
LeaderCommit int}type AppendEntriesReply struct{
Term int
Success bool}继续来看func Make的实现。
funcMake(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg)*Raft {
rf :=&Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
rf.currentTerm =0
rf.state = Follwer
rf.logs =make([]Log,0)
rf.nextIndex =make([]int,len(rf.peers))
rf.voteFor =-1for i :=range rf.nextIndex {
rf.nextIndex[i]=1}for i :=range rf.matchIndex {
rf.matchIndex[i]=0}
rf.matchIndex =make([]int,len(rf.peers))//上面都是初始化相关的字段
rf.electionTimeoutTimer = time.NewTimer(rf.resetRandomTimer())//初始化选举超时时间
rf.heartBeatTimer = time.NewTimer(100* time.Millisecond)//初始化心跳时间
rf.readPersist(persister.ReadRaftState())//开启选举超时检测go rf.ticker()//开启定期的执行apply的操作go rf.applier(applyCh)return rf
}先初始化添加的相关的字段,然后这里我们将election timeout 时间与 heartBeat 时间的触发器作为了与rf绑定的字段,当然也可也不这样设计,毕竟heartBeat time 可以认为是一个默认值,而election timeout time 是一个随机值,可以不和rf绑定。这里我们根据hint将心跳时间设置为 100 ms, 而选举超时时间设置如下:
// 生成随机时间间隔,范围为 500ms 到 700ms//这里按照hint的时间来写就好func(rf *Raft)resetRandomTimer() time.Duration {
randomDuration := time.Duration(rand.Intn(501)+200)* time.Millisecond
return randomDuration
}然后我们在raft中添加这两个时间触发器的字段:
type Raft struct{
mu sync.Mutex // 每个raft的大锁
peers []*labrpc.ClientEnd // 拥有所有的RPC成员
persister *Persister // 不管
me int// 自己再peers中属于成员的第几位
dead int32// 自己是否活着// Your data here (2A, 2B, 2C).// Look at the paper's Figure 2 for a description of what// state a Raft server must maintain.
currentTerm int//当前raft所在任期
votedFor int//当前任期下给谁投票了,没有投就是-1
logs Log[]//logs
commitIndex int//已经commit的索引
lastApplied int//最后一个applied的索引
nextIndex int[]
matchIndex int[]
state int
electionTimeoutTimer *time.Timer //选举超时定时器 (random)
heartBeatTimer *time.Timer //心跳发送定时器 (stable)}随后就是ticker的构造了,我们现在的轮廓就是 一个无限循环,持续判断是否有超时,然后根据不同的定时器超时来做不同的事情
// The ticker go routine starts a new election if this peer hasn't received// heartsbeats recently.func(rf *Raft)ticker(){for!rf.killed(){//在每一个长循环中都要判断该rf有没有被kill// Your code here to check if a leader election should// be started and to randomize sleeping time using// time.Sleep().//感觉用time.sleep轮询检测会相对慢 这里用定时器select{case<-rf.electionTimeoutTimer.C://如果到选取时间到要开启candidate了
rf.mu.Lock()if rf.state == Follwer || rf.state == Candidate {//只有folloer才会开启投票,candidate也会再次开启DPrintf("server%d开启选举超时", rf.me)go rf.candidateWork()//这里用不用Go程都可以,不用的话记得把函数里面的lock去掉不然会死锁
rf.electionTimeoutTimer.Reset(rf.resetRandomTimer())//重置选举超时时间}
rf.mu.Unlock()case<-rf.heartBeatTimer.C://心跳时间到了 要开启心跳了
rf.mu.Lock()if rf.state == Leader {//只有leader 才会心跳DPrintf("开始广播心跳:%d", rf.me)go rf.broadCastHeartBeat()}
rf.mu.Unlock()}}}通过一个select来接收定时器超时信息,如果对go不熟悉的同学可以去了解一下 select 以及 timer or tricker。当前选举超时触发,判断当前节点是不是Follwer 或者 Candidate,然后执行触发工作,Dprint用来debug,同时访问共享字段的时候要加锁就行,开启选举后,要重置选举超时时间。
这里就涉及到一个关键点,即:在哪些时候要重置选举超时时间?(可以先自己想想)
- 当我们开启一个新的选举的时候,重置选举超时时间
- 当follower收到一个voterequest,并且同意给这个candidate投票的时候
- 任何state下,收到appendRPC 都可以重置 选举超时时间 (可能没必要,但是暂时就这样说)
- 当任何服务器收到比自己更大的currentTerm的服务器的rpc回复时,重置选举超时时间
好,这样我们就把这个框架完善了一点,我们继续看看整个函数的实现。
选举超时后,服务器要做的就是给所有其他的server发送投票请求,并且将自己的state变成candidate,然后任期加一,等待RPC回复消息,根据不同的RPC回复来进行不同的操作。
// 向全部的服务器发送 votefunc(rf *Raft)candidateWork(){// rf.mu.Lock()// defer rf.mu.Unlock()
rf.state = Candidate
rf.currentTerm++
rf.voteSum =1//给自己投票
rf.voteFor = rf.me
currentTerm := rf.currentTerm
for i :=0; i <len(rf.peers); i++{if i == rf.me {continue}
args := RequestVoteArgs{}//置空RPC参数
relpy := RequestVoteReply{}
args.CandidateId = rf.me
args.Term = rf.currentTerm //这里整体的操作是原子性的 所以可以直接用rf.currentTermiflen(rf.logs)==0{
args.LastLogIndex =0
args.LastLogTerm =0}else{
args.LastLogIndex =len(rf.logs)//index是从1开始的
args.LastLogTerm = rf.logs[len(rf.logs)-1].CurrentTerm
}//开启go程要注意参数的变化,如果外部参数有改变的话 传入的参数最好是copy版本go rf.sendRequestVote(i,&args,&relpy)}}在实现的过程中我们发现还需要一个值来记录总共得票数目,这里所有继续为raft添加 voteSum字段,每个开启选举的server都为自己投票。然后遍历所有的peers,构建RequestVoteRPC 的args和 reply。 这里需要注意的是LastLogTerm 是由最后一个log来获取的,所有当没有log的时候我们直接将其设置为0。这里注意LogIndex是实际数组索引+1。然后对于每一个Peers 发送RPC并且等待返回,由于需要等待RPC的返回,所以必须开启一个新的go程来等待。继续看一下这个函数。
func rf.sendRequestVote() 主要作用就是发送 requestvote,然后根据回复来做出相应的操作。
func(rf *Raft)sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply){DPrintf("%d 给服务器 %d 发送voteRPC请求", rf.me, server)
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)if!ok {DPrintf("requestVote rpc发生问题 server:%d", server)return}
rf.mu.Lock()defer rf.mu.Unlock()DPrintf("当前状态:%d", rf.state)if rf.killed()|| rf.state != Candidate {//如果不是candidate或者rf结束那么就直接结束DPrintf("直接结束")return}//如果currenTerm发生过变化而且还是Candidate 那说明本次的投票过期了,直接结束if args.Term != rf.currentTerm {DPrintf("投票过期")return}if reply.Term > args.Term {//如果收到的term大于自己的
rf.state = Follwer
rf.currentTerm = reply.Term
rf.voteFor =-1
rf.electionTimeoutTimer.Reset(rf.resetRandomTimer())//重置任期时间return}if reply.Vote {//增加得票
rf.voteSum++DPrintf("此时得票:%d, 总共有:%d", rf.voteSum,len(rf.peers))}if rf.voteSum >len(rf.peers)/2{//获得过半票数DPrintf("获得过半票数成为leader,server:%d", rf.me)
rf.state = Leader
//重置相关的nextindexfor i :=range rf.nextIndex {
rf.nextIndex[i]=len(rf.logs)+1}go rf.broadCastHeartBeat()//直接就广播
rf.currentTerm++//任期+1
rf.electionTimeoutTimer.Reset(rf.resetRandomTimer())//重置任期时间}}这里第一个问题就是加锁的位置,显然不能在RPC之前加锁,这样太费性能。但是我们初看好像rf.peers[] 这个是共享变量,这里好像有data race的风险。但是仔细一想,修改这个变量的原因就是添加server,而且应该不会并发的添加server,所以这里不存在data race的风险。随后我们判读当前rf的状态,如果不是candidate 或者被Kill 那么就退出。同时guide还提醒我们,注意可能收到就的RPC,也就是args中的term 与当前的term不一样,当前的term更大,说明此时的rpc过期了,直接不管就好。如果收到大于自己term的reply那么就返回follower状态,修改任期,重置选举超时时间,选举人。
总结,guide中提到了一点就是Figure2中 If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower 。也即是 任何时候 收到的rpc消息中的term大于自己的term,那么都会同步currentTerm 并且转变成follower, 此外我总结在这种情况下还会将voteFor重置 ,以及重置选举超时时间。
注意这里voteFor重置的条件,比较重要,踩过一些坑。
后面就是正常的统计选票,当选票过半后,就成为Leader增加任期,立马开始广播。
好,这样发送requestVoteRPC一端我们写完了,开始写收到requestVoteRPC handler。 这里需要注意的是,仔细实现figure2 和文中说的能够给让自己投票的条件。
func(rf *Raft)RequestVote(args *RequestVoteArgs, reply *RequestVoteReply){// Your code here (2A, 2B).DPrintf("收到投票请求, 自己是:%d", rf.me)
rf.mu.Lock()defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if rf.currentTerm > args.Term {DPrintf("候选人的term小于自己的, 不投票")
reply.Vote =false}else{if rf.currentTerm < args.Term {//这里的情况 上述的总结中有提到,及任何时候
rf.voteFor =-1
rf.currentTerm = args.Term
rf.state = Follwer
}DPrintf("此时的votedfor:%d", rf.voteFor)if(rf.voteFor ==-1|| rf.voteFor == args.CandidateId)&&((len(rf.logs)<1|| args.LastLogTerm > rf.logs[len(rf.logs)-1].CurrentTerm)||(args.LastLogTerm == rf.logs[len(rf.logs)-1].CurrentTerm)&& args.LastLogIndex >=len(rf.logs)){//可以投票给他,并且重置选举超时DPrintf("可投, 投票sever:%d", rf.me)
reply.Vote =true
rf.electionTimeoutTimer.Reset(rf.resetRandomTimer())}else{DPrintf("不可投, 不投票sever:%d", rf.me)
reply.Vote =false}}}这里复杂的判断条件列出来就是 :
(此时的rf.voteFor为-1,或者为args.CandidateId)&& ((传过来的最后一个log的term大于自己最后一个log的term 或者 自己没有log)||(两个服务器的最后一个log的term相等,但是传来的服务器的最后一个logIndex更大))
仔细理解下这个判断条件,也就是涉及到文章中的对log更**“新”**的判断。 这样我们就写完了requestRPC的相关内容。、
接下来是appendEntriesRPC的内容,首先是 func broadCastHeartBeat 内容,向所有的peers发送心跳,并且在心跳中加入logs(如果有的话),这里不需要把 心跳RPC 和 携带Logs的RPC 分开处理。 此外由于可能在有添加新的server raft,所有这里可以检测一下,如果有新的server raft 就让nextIndex 和 matchIndex 扩容一下。
// leader 向所有的server 广播自己id心跳func(rf *Raft)broadCastHeartBeat(){
rf.mu.Lock()defer rf.mu.Unlock()DPrintf("一共有%d心跳要发送",len(rf.peers)-1)//扩充nextIndex 和 matchIndexiflen(rf.nextIndex)<len(rf.peers){
n :=len(rf.peers)-len(rf.nextIndex)for i :=0; i < n; i++{
rf.nextIndex =append(rf.nextIndex,len(rf.logs)+1)}for i :=0; i < n; i++{
rf.matchIndex =append(rf.matchIndex,0)}}for index :=range rf.peers {//index
args := AppendEntriesArgs{}
reply := AppendEntriesReply{}if index == rf.me {continue}//对于每一个server服务器都根据nextIndex来发送
args.Term = rf.currentTerm
args.LeaderCommit = rf.commitIndex
args.PreLogIndex = rf.nextIndex[index]-1//要发送log的前一个if args.PreLogIndex <1{
args.PreLogTerm =0}else{
args.PreLogTerm = rf.logs[rf.nextIndex[index]-2].CurrentTerm //index从1开始 而数组索引从0开始}
args.Logs = rf.logs[rf.nextIndex[index]-1:]DPrintf("发送的arg.logs的长度是%v",len(args.Logs))iflen(args.Logs)>0{DPrintf("发送的arg.logs[0].index是%v", args.Logs[0].Index)}go rf.sendAppendEntries(&args,&reply, index)}
rf.heartBeatTimer.Reset(100* time.Millisecond)}这里要非常非常注意 数组下标和logIndex之间的关系 ,以及深入理解nextIndex[],Figure 2中的解释是 要发送的下一个log entries, 我觉得说成是 要发送的第一个log entries的index 更加好理解,前者我自己有点容易搞混。 所以这里 args 中携带的logs就可以根据nextIndex 来选择,而且我是将所有nextIndex后面的logs全部写入进rpc中(可能有其他实现)。 此外args的preLogTern 和 PreLogIndex也是通过nextIndex来获得,LeaderCommit就根据自己的commitIndex来写入就可。
跟requestVoteRPC 一样发送RPC 以及等待回复的过程也需要用go程来开启。前面部分跟sendRequestVoteRPC类似,同样有判断状态,判断是否是旧RPC,判断任期是不是大于自己。
// 发送心跳rpc,并且根据收到的回复进行相应的处理func(rf *Raft)sendAppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply, server int){DPrintf("给server%d发送心跳", server)
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)if!ok {DPrintf("AppendEntries rpc发生问题, server:%d", server)return}
rf.mu.Lock()defer rf.mu.Unlock()//如果不是Leader 直接不管就好if rf.killed()|| rf.state != Leader {//如果不是candidate或者rf结束那么就直接结束return}//如果当前term发生变化,if args.Term != rf.currentTerm {return}//如果收到的消息的term大于自己的//变成follower,任期更改,重置选选举时间if reply.Term > args.Term {
rf.state = Follwer
rf.voteFor =-1
rf.currentTerm = reply.Term
rf.electionTimeoutTimer.Reset(rf.resetRandomTimer())return}if!reply.Success {//如果回复false,那么就需要把发送的next给向前移动一位if rf.nextIndex[server]>1{
rf.nextIndex[server]= rf.nextIndex[server]-1}DPrintf("收到false,向前移动一位next,now:%v", rf.nextIndex[server])}else{//如果回复true,那么修改该server的matchIndexDPrintf("收到true")
rf.matchIndex[server]= args.PreLogIndex +len(args.Logs)
rf.nextIndex[server]= rf.matchIndex[server]+1DPrintf("matchIndex更新为%v, nextIndex更新为:%v", rf.matchIndex[server], rf.nextIndex[server])//每次能修改matchIndex后,都判断一下是否是当前的term的log,并且有没有超过半数//Leader只能修改commitIndex到当前的term值的log,不能确认之前任期的term值的logiflen(args.Logs)!=0&& args.Logs[len(args.Logs)-1].CurrentTerm == rf.currentTerm {//有发送log,并且最后的log term当前的任期一致DPrintf("并且这次心跳有发送log")
sum :=1for i :=0; i <len(rf.peers); i++{if i == rf.me {continue}if rf.matchIndex[i]>= args.Logs[len(args.Logs)-1].Index {
sum++}if sum*2>len(rf.peers)&& rf.commitIndex < args.Logs[len(args.Logs)-1].Index {
rf.commitIndex = args.Logs[len(args.Logs)-1].Index
DPrintf("该log已经被复制到大部分的server中了")DPrintf("rf.commitIndex变成%v", rf.commitIndex)}}}}}
然后就是核心的内容,如果回复的是false,那么说明当前的nextIndex过于激进,我们选择后退一个(这里我们参考原始的算法,没有有优化的算法);如果是true,那么就说明收到该心跳的server将logs全部添加到自己的logs中了(如果本次心跳有logs的话),所以这里就可以更新 nextIndex 以及 matchIndex了,guide中提到了一个易错点是,这里直接将nextIndex 与 matchIndex跟据最新的状态修改,也就是nextIndex直接跟新为len(rf.logs) + 1 ,matchIndex变成len(rf.logs),这里是错误的,因为在发出RPC的过程到回复的过程中,很可能会有新的logs添加到leader中,所以我们只能根据 args里里面的参数来修改这两个Index。最后,在我的实现中,每一次收到true更改两个Index后,都要判断一次是否有过半的server 提交了这次RPC发送logs的内容,同时为了实现 安全章节中的提到的:leader只能在提交当前任期内的logs的时候才能够顺带一起提交前面的所有logs,所以有了这个条件
args.Logs[len(args.Logs)-1].CurrentTerm == rf.currentTerm
,如果条件都满足那么就会修改Leadercommit,然后在下次的心跳RPC中将leaderRPC发送给follower。
ok 继续看一下appendEntriesRPC hanler的实现:
不同state的server收到心跳rpc 有不同的处理方法,
当处于candidate状态时,收到的所有心跳RPC都返回false(log的处理就留给下一次的心跳吧,nextIndex 往前移动一个也没啥问题),当收到的Term 大于等于 自己的term的时候,变成follower,前文提到了转变的时候要做的事情,即:修改state,同步term,重置voteFor,重置选举超时时间。
处于Leader状态时类似的,唯一的不一样时只有收到 大于 自己的term的时候,才会变成follower。
处于Follower状态就要处理核心事情了。这里处理logs,如何判断冲突logs,以及如何截断logs在guide中也有提示。我们直接照着做就行。 简单来说就是,遍历发送来的Logs,根据Index来与本server中的log进行比较,如果相同,那么就下一个,如果冲突,那么就删除全部,直接全部添加,如果现在server的logs太少了也直接添加。这里我就不详细解释了,仔细看看我的实现,以及旁边的注释。
func(rf *Raft)AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply){
rf.mu.Lock()defer rf.mu.Unlock()DPrintf("处理心跳 server:%d", rf.me)//这里candidate收到心跳后 永远返回的是false,就算自己变成了follower 也暂时不处理log//等待下一个心跳来后再处理if rf.state == Candidate {//如果当前状态是candidateif args.Term >= rf.currentTerm {//收到大于等于自己任期的心跳
rf.state = Follwer
rf.currentTerm = args.Term
rf.voteFor =-1//将自己的投票人清除//重置选举超时时间
rf.electionTimeoutTimer.Reset(rf.resetRandomTimer())}
reply.Success =false
reply.Term = rf.currentTerm
}elseif rf.state == Leader {//如果当前状态是leader 处理方法跟candidate类似if args.Term > rf.currentTerm {
rf.state = Follwer
rf.currentTerm = args.Term
rf.voteFor =-1//将自己的投票人清除
rf.electionTimeoutTimer.Reset(rf.resetRandomTimer())//重置时间?}
reply.Success =false
reply.Term = rf.currentTerm
}else{//如果当前状态是follower//收到的term小于自己的 让他滚 不用重置选举超时时间DPrintf("当前的term:%d, 传入的term:%d, 当前的server:%d", rf.currentTerm, args.Term, rf.me)if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.Success =falsereturn}//将自己的term更新if args.Term > rf.currentTerm {
rf.voteFor =-1//将自己的投票人清除}
rf.electionTimeoutTimer.Reset(rf.resetRandomTimer())
rf.currentTerm = args.Term
reply.Term = rf.currentTerm
DPrintf("收到的preIndex:%v", args.PreLogIndex)//如果没有目标索引的日志iflen(rf.logs)< args.PreLogIndex {DPrintf("索引过大返回false")
reply.Success =falsereturn}//如果preLogIndex 为0,那么直接全部直接添加就好。if args.PreLogIndex <=0{
reply.Success =true
rf.logs =append(rf.logs, args.Logs...)return}//如果目标索引的日志term不一样 全删了if rf.logs[args.PreLogIndex-1].CurrentTerm != args.PreLogTerm {
rf.logs = rf.logs[:args.PreLogIndex-1]DPrintf("preIndexlog内容不一样 还需要往前移动,返回false")
reply.Success =false}else{//如果找到了就要挨个判断是否相同 不相同 还是要全删掉,如果到结尾了 那么就添加for_, log :=range args.Logs {if log.Index >len(rf.logs){
rf.logs =append(rf.logs, log)DPrintf("server:%v添加日志index%v", rf.me, log.Index)continue}DPrintf("当前的log.index为%v", log.Index)if rf.logs[log.Index-1].CurrentTerm != log.CurrentTerm {DPrintf("只保留0-%v之间的log,然后继续添加", log.Index-2)
rf.logs = rf.logs[:log.Index-1]//如果只是删减不添加的话,后面的commit会出现问题(踩坑)
rf.logs =append(rf.logs, log)}}//如果可以追加了那么就可以将commit 更新一下了//注意这里min的另一个对象是此时发送的最新的logDPrintf("leadercommit为%v", args.LeaderCommit)if args.LeaderCommit > rf.commitIndex {DPrintf("可以更新当前server的commit")iflen(args.Logs)==0{//如果没有发送log 说明该follower 与log一致(起码是leadercommit之前的一致)
rf.commitIndex = args.LeaderCommit
}elseif args.LeaderCommit > args.Logs[len(args.Logs)-1].Index {
rf.commitIndex = args.Logs[len(args.Logs)-1].Index
}else{
rf.commitIndex = args.LeaderCommit
}}DPrintf("更新commit为%v", rf.commitIndex)
reply.Success =true}}}到此我们绝大部分的任务都完成了,还差一个appiler,最开始提到的ApplyMsg,作用就是提醒上层的的service 可以执行该命令,也即我们apply的方式就是发送ApplyMsg。我们这里根据hint 来新建一个applier 工程,来定时判断是否能够应用相关的log。
// 执行applierfunc(rf *Raft)applier(applyCh chan ApplyMsg){DPrintf("server%v开始执行app", rf.me)for!rf.killed(){//每次间隔10ms 判断一次是否有需要提交的
rf.mu.Lock()for rf.commitIndex > rf.lastApplied {//执行
applyMsg := ApplyMsg{}
applyMsg.CommandValid =true
applyMsg.Command = rf.logs[rf.lastApplied].Command
applyMsg.CommandIndex = rf.lastApplied +1
applyCh <- applyMsg
rf.lastApplied++DPrintf("server%v应用%dlog", rf.me, applyMsg.CommandIndex)}
rf.mu.Unlock()
time.Sleep(10* time.Millisecond)}}同时还有跟客户端用来发送请求的func start函数
func(rf *Raft)Start(command interface{})(int,int,bool){
index :=-1
term :=-1
isLeader :=true// Your code here (2B).
rf.mu.Lock()defer rf.mu.Unlock()if rf.state != Leader {//如果当前的raft不是leader
isLeader =false}else{//如果当前的raft是leader
addLog := Log{
Command: command,
CurrentTerm: rf.currentTerm,
Index:len(rf.logs)+1,}
rf.logs =append(rf.logs, addLog)
term = rf.currentTerm
index = addLog.Index
// for i := 0; i < len(rf.nextIndex); i++ {// rf.nextIndex[i] = len(rf.logs) + 1// }//直接开启发送DPrintf("当前有Leader有%d个log entries",len(rf.logs))// go rf.broadCastHeartBeat()}return index, term, isLeader
}这里直接添加logs就好了,同时这里我们看到command用的是interface{},所以我们也将最开始的log struct 修改一致。需要注意的是,不需要每一次都重置一下所有的nextIndex(不知道我当时脑袋抽风了,这样写了),以及也没有必要每一次start都立马广播一次,交给心跳超时就好。(不然可能同时有超级多个start被调用,这样就会发出太多了RPC了)。
至此2A & 2B都能完成了。这里是全部代码
心得:最大的体会就是自己独立完成两个lab成就感还是有不少的,就是这段时间拖拖拉拉,效率太低。最大的提升就是debug能力,每次疯狂看几万行log,短短700+行的代码debug就让我有的难顶。当然最后还是顺利通过,很开心。
还有未完成的点就是想看看如何设置测试函数的,之前都很少设置,好像最开始学java 的时候有设置过。再然后可以看看这里模拟RPC通信,并且模拟控制网络分区 是如何实现的,感觉也有点意思,虽然之前c++的RPC的项目我还没怎么看哈哈,就先这样吧。
版权声明:本文标题:掌握Flash艺术:轻松上手Adobe Flash Player技巧 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/p/1770809645a3537823.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论