Implementing a distributed key-value store on top of implementing Raft in Go

2023-05-25 08:26:47

As part of bringing myself up to speed after joining TigerBeetle, I
wanted some background on how distributed consensus and replicated
state machine protocols work. TigerBeetle uses Viewstamped
Replication. But I wanted to understand all popular protocols and I
decided to start with Raft.

We’ll implement two key components of Raft in this post (leader
election and log replication). Around 1k lines of Go. It took me
around 7 months of sporadic studying to come to (what I hope is) an
understanding of the basics.

Disclaimer: I’m not an expert. My implementation isn’t yet hooked
up to Jepsen. I’ve run it through a mix of manual and automated
tests and it seems generally correct. This is not intended to be used
in production. It’s just for my education.

All code for this project is available on GitHub.

Let’s dig in!

The algorithm

The Raft paper itself is quite readable. Give it a read and you’ll
get the basic idea.

The gist is that nodes in a cluster conduct elections to pick
a leader. Users of the Raft cluster send messages to the leader. The
leader passes the message to followers and waits for a majority to
store the message. Once the message is committed (majority consensus
has been reached), the message is applied to a state machine the user
supplies. Followers learn about the latest committed message from the
leader and apply each new committed message to their local
user-supplied state machine.

There’s more to it including reconfiguration and snapshotting, which I
won’t get into in this post. But you can get the gist of Raft by
thinking about 1) leader election and 2) replicated logs powering
replicated state machines.

Modeling with state machines and key-value stores

I’ve written before about how you can build a key-value store on top
of Raft. How you can build a SQL database on top of a key-value
store. And how you can build a distributed SQL database on top of
Raft.

This post will start quite similarly to that first post, except
that we won’t stop at the Raft layer.

A distributed key-value store

To build on top of the Raft library we’ll build, we need to create a
state machine and commands that are sent to the state machine.

Our state machine will have two operations: get a value from a key,
and set a key to a value.

This will go in cmd/kvapi/main.go.

package main

import (
    "bytes"
    crypto "crypto/rand"
    "encoding/binary"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "os"
    "strconv"
    "strings"
    "sync"

    "github.com/eatonphil/goraft" // Assumed import path for the Raft library built below.
)

type statemachine struct {
    db     *sync.Map
    server int
}

type commandKind uint8

const (
    setCommand commandKind = iota
    getCommand
)

type command struct {
    kind  commandKind
    key   string
    value string
}

func (s *statemachine) Apply(cmd []byte) ([]byte, error) {
    c := decodeCommand(cmd)

    switch c.kind {
    case setCommand:
        s.db.Store(c.key, c.value)
    case getCommand:
        value, ok := s.db.Load(c.key)
        if !ok {
            return nil, fmt.Errorf("Key not found")
        }
        return []byte(value.(string)), nil
    default:
        return nil, fmt.Errorf("Unknown command: %x", cmd)
    }

    return nil, nil
}

But the Raft library we’ll build needs to deal with various state
machines. So commands passed from the user into the Raft cluster must
be serialized to bytes.

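func encodeCommand(c command) []byte {
    msg := bytes.NewBuffer(nil)
    err := msg.WriteByte(uint8(c.kind))
    if err != nil {
        panic(err)
    }

    err = binary.Write(msg, binary.LittleEndian, uint64(len(c.key)))
    if err != nil {
        panic(err)
    }

    // The key bytes follow the key length.
    msg.WriteString(c.key)

    err = binary.Write(msg, binary.LittleEndian, uint64(len(c.value)))
    if err != nil {
        panic(err)
    }

    // The value bytes follow the value length.
    msg.WriteString(c.value)

    return msg.Bytes()
}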

And the Apply() function from above needs to be able to decode the
bytes:

func decodeCommand(msg []byte) command {
    var c command
    c.kind = commandKind(msg[0])

    keyLen := binary.LittleEndian.Uint64(msg[1:9])
    c.key = string(msg[9 : 9+keyLen])

    if c.kind == setCommand {
        valLen := binary.LittleEndian.Uint64(msg[9+keyLen : 9+keyLen+8])
        c.value = string(msg[9+keyLen+8 : 9+keyLen+8+valLen])
    }

    return c
}
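
As a quick sanity check on the wire format, here is a minimal standalone sketch of a round trip through encoding and decoding. The names `cmd`, `encode`, and `decode` are hypothetical stand-ins for the post's `command`, `encodeCommand`, and `decodeCommand`. The sketch always reads the value back, which is a simplification of the set-only check above.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// cmd is a hypothetical stand-in for the command struct above.
type cmd struct {
	kind  uint8 // 0 = set, 1 = get
	key   string
	value string
}

// encode lays a command out as:
// 1 byte kind | 8 bytes key length | key | 8 bytes value length | value.
func encode(c cmd) []byte {
	msg := bytes.NewBuffer(nil)
	msg.WriteByte(c.kind)
	var n [8]byte
	binary.LittleEndian.PutUint64(n[:], uint64(len(c.key)))
	msg.Write(n[:])
	msg.WriteString(c.key)
	binary.LittleEndian.PutUint64(n[:], uint64(len(c.value)))
	msg.Write(n[:])
	msg.WriteString(c.value)
	return msg.Bytes()
}

// decode reverses encode. It assumes msg is well formed.
func decode(msg []byte) cmd {
	var c cmd
	c.kind = msg[0]
	keyLen := binary.LittleEndian.Uint64(msg[1:9])
	c.key = string(msg[9 : 9+keyLen])
	valStart := 9 + keyLen
	valLen := binary.LittleEndian.Uint64(msg[valStart : valStart+8])
	c.value = string(msg[valStart+8 : valStart+8+valLen])
	return c
}

func main() {
	in := cmd{kind: 0, key: "x", value: "1"}
	out := decode(encode(in))
	fmt.Println("round trip ok:", out == in, "encoded bytes:", len(encode(in)))
}
```

The length prefixes are what let the decoder find where the key ends and the value begins without any delimiter.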


Now that we’ve modeled the key-value store as a state machine, let’s
build the HTTP endpoints that allow the user to operate the state
machine through the Raft cluster.

First, let’s implement the set operation. We need to grab the key
and value the user passes in and call Apply() on the Raft
cluster. Calling Apply() on the Raft cluster will eventually call
the Apply() function we just wrote, but not until the message sent
to the Raft cluster is actually replicated.

type httpServer struct {
    raft *goraft.Server
    db   *sync.Map
}

// Example:
//  curl http://localhost:2020/set?key=x&value=1
func (hs httpServer) setHandler(w http.ResponseWriter, r *http.Request) {
    var c command
    c.kind = setCommand
    c.key = r.URL.Query().Get("key")
    c.value = r.URL.Query().Get("value")

    _, err := hs.raft.Apply([][]byte{encodeCommand(c)})
    if err != nil {
        log.Printf("Could not write key-value: %s", err)
        http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
        return
    }
}

To reiterate, we tell the Raft cluster we want this message
replicated. The message contains the operation type (set) and the
operation details (key and value). These messages are custom to
the state machine we wrote. And they will be interpreted by the state
machine we wrote, on each node in the cluster.

Next we handle get-ing values from the cluster. There are two ways
to do this. We already embed a local copy of the distributed key-value
map. We could just read from that map in the current process. But it
might not be up-to-date or correct. It would be fast to read
though. And convenient for debugging.

But the only correct way to read from a Raft cluster is to pass the
read through the log replication too.

So we’ll support both.

// Example:
//  curl http://localhost:2020/get?key=x
//  1
//  curl http://localhost:2020/get?key=x&relaxed=true # Skips consensus for the read.
//  1
func (hs httpServer) getHandler(w http.ResponseWriter, r *http.Request) {
    var c command
    c.kind = getCommand
    c.key = r.URL.Query().Get("key")

    var value []byte
    var err error
    if r.URL.Query().Get("relaxed") == "true" {
        v, ok := hs.db.Load(c.key)
        if !ok {
            err = fmt.Errorf("Key not found")
        } else {
            value = []byte(v.(string))
        }
    } else {
        var results []goraft.ApplyResult
        results, err = hs.raft.Apply([][]byte{encodeCommand(c)})
        if err == nil {
            if len(results) != 1 {
                err = fmt.Errorf("Expected single response from Raft, got: %d.", len(results))
            } else if results[0].Error != nil {
                err = results[0].Error
            } else {
                value = results[0].Result
            }
        }
    }

    if err != nil {
        log.Printf("Could not encode key-value in http response: %s", err)
        http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
        return
    }

    written := 0
    for written < len(value) {
        n, err := w.Write(value[written:])
        if err != nil {
            log.Printf("Could not encode key-value in http response: %s", err)
            http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
            return
        }

        written += n
    }
}


Now that we’ve set up our custom state machine and our HTTP API for
interacting with the Raft cluster, we’ll tie it together with reading
configuration from the command-line and actually starting the Raft
node and the HTTP API.

type config struct {
    cluster []goraft.ClusterMember
    index   int
    id      string
    address string
    http    string
}

func getConfig() config {
    cfg := config{}
    var node string
    for i, arg := range os.Args[1:] {
        if arg == "--node" {
            var err error
            node = os.Args[i+2]
            cfg.index, err = strconv.Atoi(node)
            if err != nil {
                log.Fatalf("Expected $value to be a valid integer in `--node $value`, got: %s", node)
            }
        }

        if arg == "--http" {
            cfg.http = os.Args[i+2]
        }

        if arg == "--cluster" {
            cluster := os.Args[i+2]
            var clusterEntry goraft.ClusterMember
            for _, part := range strings.Split(cluster, ";") {
                idAddress := strings.Split(part, ",")
                var err error
                clusterEntry.Id, err = strconv.ParseUint(idAddress[0], 10, 64)
                if err != nil {
                    log.Fatalf("Expected $id to be a valid integer in `--cluster $id,$ip`, got: %s", idAddress[0])
                }

                clusterEntry.Address = idAddress[1]
                cfg.cluster = append(cfg.cluster, clusterEntry)
            }
        }
    }

    if node == "" {
        log.Fatal("Missing required parameter: --node $index")
    }

    if cfg.http == "" {
        log.Fatal("Missing required parameter: --http $address")
    }

    if len(cfg.cluster) == 0 {
        log.Fatal("Missing required parameter: --cluster $node1Id,$node1Address;...;$nodeNId,$nodeNAddress")
    }

    return cfg
}

func main() {
    var b [8]byte
    _, err := crypto.Read(b[:])
    if err != nil {
        panic("cannot seed math/rand package with cryptographically secure random number generator")
    }
    rand.Seed(int64(binary.LittleEndian.Uint64(b[:])))

    cfg := getConfig()

    var db sync.Map

    var sm statemachine
    sm.db = &db
    sm.server = cfg.index

    s := goraft.NewServer(cfg.cluster, &sm, ".", cfg.index)
    go s.Start()

    hs := httpServer{s, &db}

    http.HandleFunc("/set", hs.setHandler)
    http.HandleFunc("/get", hs.getHandler)
    err = http.ListenAndServe(cfg.http, nil)
    if err != nil {
        panic(err)
    }
}

And that’s it for the easy part: a distributed key-value store on top
of a Raft cluster.

Next we need to implement Raft.

A Raft server

If we take a look at Figure 2 in the Raft paper, we get an idea for
all the state we need to model.

Raft Figure 2

We’ll dig into the details as we go. But for now let’s turn that model
into a few Go types. This goes in raft.go in the base directory,
not cmd/kvapi.

package goraft

import (
    "bufio"
    "encoding/binary"
    "errors"
    "fmt"
    "io"
    "math/rand"
    "net"
    "net/http"
    "net/rpc"
    "os"
    "path"
    "sync"
    "time"
)

type StateMachine interface {
    Apply(cmd []byte) ([]byte, error)
}

type ApplyResult struct {
    Result []byte
    Error  error
}

type Entry struct {
    Command []byte
    Term    uint64

    // Set by the primary so it can learn about the result of
    // applying this command to the state machine
    result chan ApplyResult
}

type ClusterMember struct {
    Id      uint64
    Address string

    // Index of the next log entry to send
    nextIndex uint64
    // Highest log entry known to be replicated
    matchIndex uint64

    // Who was voted for in the most recent term
    votedFor uint64

    // TCP connection
    rpcClient *rpc.Client
}

type ServerState string

const (
    leaderState    ServerState = "leader"
    followerState  ServerState = "follower"
    candidateState ServerState = "candidate"
)

type Server struct {
    // These variables are for shutting down.
    done   bool
    server *http.Server

    Debug bool

    mu sync.Mutex
    // ----------- PERSISTENT STATE -----------

    // The current term
    currentTerm uint64

    log []Entry

    // votedFor is stored in `cluster []ClusterMember` below,
    // mapped by `clusterIndex` below

    // ----------- READONLY STATE -----------

    // Unique identifier for this Server
    id uint64

    // The TCP address for RPC
    address string

    // When to start elections after no append entry messages
    electionTimeout time.Time

    // How often to send empty messages
    heartbeatMs int

    // When to next send empty message
    heartbeatTimeout time.Time

    // User-provided state machine
    statemachine StateMachine

    // Metadata directory
    metadataDir string

    // Metadata store
    fd *os.File

    // ----------- VOLATILE STATE -----------

    // Index of highest log entry known to be committed
    commitIndex uint64

    // Index of highest log entry applied to state machine
    lastApplied uint64

    // Candidate, follower, or leader
    state ServerState

    // Servers in the cluster, including this one
    cluster []ClusterMember

    // Index of this server
    clusterIndex int
}

And let’s build a constructor to initialize the state for all servers
in the cluster, as well as local server state.

func NewServer(
    clusterConfig []ClusterMember,
    statemachine StateMachine,
    metadataDir string,
    clusterIndex int,
) *Server {
    // Explicitly make a copy of the cluster because we'll be
    // modifying it in this server.
    var cluster []ClusterMember
    for _, c := range clusterConfig {
        if c.Id == 0 {
            panic("Id must not be 0.")
        }
        cluster = append(cluster, c)
    }

    return &Server{
        id:           cluster[clusterIndex].Id,
        address:      cluster[clusterIndex].Address,
        cluster:      cluster,
        statemachine: statemachine,
        metadataDir:  metadataDir,
        clusterIndex: clusterIndex,
        heartbeatMs:  300,
        mu:           sync.Mutex{},
    }
}

And add a few debugging and assertion helpers.

func (s *Server) debugmsg(msg string) string {
    return fmt.Sprintf("%s [Id: %d, Term: %d] %s", time.Now().Format(time.RFC3339Nano), s.id, s.currentTerm, msg)
}

func (s *Server) debug(msg string) {
    if !s.Debug {
        return
    }
    fmt.Println(s.debugmsg(msg))
}

func (s *Server) debugf(msg string, args ...any) {
    if !s.Debug {
        return
    }

    s.debug(fmt.Sprintf(msg, args...))
}

func (s *Server) warn(msg string) {
    fmt.Println("[WARN] " + s.debugmsg(msg))
}

func (s *Server) warnf(msg string, args ...any) {
    // Route through warn so formatted warnings get the same prefix.
    s.warn(fmt.Sprintf(msg, args...))
}

func Assert[T comparable](msg string, a, b T) {
    if a != b {
        panic(fmt.Sprintf("%s. Got a = %#v, b = %#v", msg, a, b))
    }
}

func Server_assert[T comparable](s *Server, msg string, a, b T) {
    Assert(s.debugmsg(msg), a, b)
}

Persistent state

As Figure 2 says, currentTerm, log, and votedFor must be
persisted to disk as they’re edited.

I like to initially do the stupidest thing possible. So in the
first version of this project I used encoding/gob to write these
three fields to disk every time s.persist() was called.

Here is what this first version looked like:

func (s *Server) persist() {
    s.fd.Seek(0, 0)
    enc := gob.NewEncoder(s.fd)
    err := enc.Encode(PersistentState{
        CurrentTerm: s.currentTerm,
        Log:         s.log,
        VotedFor:    s.votedFor,
    })
    if err != nil {
        panic(err)
    }
    if err = s.fd.Sync(); err != nil {
        panic(err)
    }
    s.debug(fmt.Sprintf("Persisted. Term: %d. Log Len: %d. Voted For: %s.", s.currentTerm, len(s.log), s.votedFor))
}

But doing so means the cost of every persist is a function of the size
of the log. And that was terrible for throughput.

I also noticed that encoding/gob is pretty inefficient.

For a simple struct like:

type X struct {
    A uint64
    B []uint64
    C bool
}

encoding/gob uses 68 bytes to store that data when B has two
entries. If we wrote the encoder/decoder ourselves we could store
that struct in 33 bytes (8 (sizeof(A)) + 8 (sizeof(len(B))) + 16
(len(B) * sizeof(B)) + 1 (sizeof(C))).

It’s not that encoding/gob is bad. It just likely has different
constraints than we are party to.
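
To make the size comparison concrete, here is a small sketch that encodes the struct both ways and prints the two lengths. The helper names `manualEncode` and `gobSize` are made up for this illustration, and the hand-rolled layout is just one way to reach the 33 bytes computed above.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/gob"
	"fmt"
)

type X struct {
	A uint64
	B []uint64
	C bool
}

// manualEncode writes A, len(B), each element of B, then C as one byte.
func manualEncode(x X) []byte {
	buf := make([]byte, 8+8+8*len(x.B)+1)
	binary.LittleEndian.PutUint64(buf[0:8], x.A)
	binary.LittleEndian.PutUint64(buf[8:16], uint64(len(x.B)))
	off := 16
	for _, b := range x.B {
		binary.LittleEndian.PutUint64(buf[off:off+8], b)
		off += 8
	}
	if x.C {
		buf[off] = 1
	}
	return buf
}

// gobSize returns how many bytes a fresh gob encoder emits for x.
// A fresh encoder also sends a description of the type, which is
// where much of the overhead for a single small value comes from.
func gobSize(x X) int {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(x); err != nil {
		panic(err)
	}
	return buf.Len()
}

func main() {
	x := X{A: 1, B: []uint64{2, 3}, C: true}
	fmt.Println("manual bytes:", len(manualEncode(x)))
	fmt.Println("gob bytes:", gobSize(x))
}
```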

So I decided to swap out encoding/gob for simply binary encoding the
fields and also, importantly, keeping track of exactly how many
entries in the log must be written and only writing that many.

Here’s what that looks like.

const PAGE_SIZE = 4096
const ENTRY_HEADER = 16
const ENTRY_SIZE = 128

// Must be called within s.mu.Lock()
func (s *Server) persist(writeLog bool, nNewEntries int) {
    t := time.Now()

    if nNewEntries == 0 && writeLog {
        nNewEntries = len(s.log)
    }

    s.fd.Seek(0, 0)

    var page [PAGE_SIZE]byte
    // Bytes 0  - 8:   Current term
    // Bytes 8  - 16:  Voted for
    // Bytes 16 - 24:  Log length
    // Bytes 4096 - N: Log

    binary.LittleEndian.PutUint64(page[:8], s.currentTerm)
    binary.LittleEndian.PutUint64(page[8:16], s.getVotedFor())
    binary.LittleEndian.PutUint64(page[16:24], uint64(len(s.log)))
    n, err := s.fd.Write(page[:])
    if err != nil {
        panic(err)
    }
    Server_assert(s, "Wrote full page", n, PAGE_SIZE)

    if writeLog && nNewEntries > 0 {
        newLogOffset := max(len(s.log)-nNewEntries, 0)

        s.fd.Seek(int64(PAGE_SIZE+ENTRY_SIZE*newLogOffset), 0)
        bw := bufio.NewWriter(s.fd)

        var entryBytes [ENTRY_SIZE]byte
        for i := newLogOffset; i < len(s.log); i++ {
            // Bytes 0 - 8:    Entry term
            // Bytes 8 - 16:   Entry command length
            // Bytes 16 - ENTRY_SIZE: Entry command

            if len(s.log[i].Command) > ENTRY_SIZE-ENTRY_HEADER {
                panic(fmt.Sprintf("Command is too large (%d). Must be at most %d bytes.", len(s.log[i].Command), ENTRY_SIZE-ENTRY_HEADER))
            }

            binary.LittleEndian.PutUint64(entryBytes[:8], s.log[i].Term)
            binary.LittleEndian.PutUint64(entryBytes[8:16], uint64(len(s.log[i].Command)))
            copy(entryBytes[16:], s.log[i].Command)

            n, err := bw.Write(entryBytes[:])
            if err != nil {
                panic(err)
            }
            Server_assert(s, "Wrote full entry", n, ENTRY_SIZE)
        }

        err = bw.Flush()
        if err != nil {
            panic(err)
        }
    }

    if err = s.fd.Sync(); err != nil {
        panic(err)
    }
    s.debugf("Persisted in %s. Term: %d. Log Len: %d (%d new). Voted For: %d.", time.Since(t), s.currentTerm, len(s.log), nNewEntries, s.getVotedFor())
}

Again the important thing is that only the entries that need to be
written are written. We do that by seek-ing to the offset of the
first entry that needs to be written.

And we accumulate writes of entries in a bufio.Writer so we don’t waste
write syscalls. Don’t forget to flush the buffered writer!

And don’t forget to flush all writes to disk with fd.Sync().

ENTRY_SIZE is something that I could see being configurable based
on the workload. Some workloads truly need only 128 bytes. But a
key-value store probably wants much more than that. This
implementation doesn’t try to handle the case of completely
arbitrary sized keys and values.
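
Because every entry occupies a fixed-size slot, the file offset of any entry is simple arithmetic. The sketch below isolates that layout from the file I/O. The helper names `entryOffset`, `encodeEntry`, and `decodeEntry` are hypothetical and the constants simply mirror the ones above.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	pageSize    = 4096 // metadata page at the start of the file
	entryHeader = 16   // 8 bytes term + 8 bytes command length
	entrySize   = 128  // fixed size of every log entry on disk
)

// entryOffset returns the byte offset of log entry i in the file.
func entryOffset(i int) int64 {
	return int64(pageSize + entrySize*i)
}

// encodeEntry packs a term and command into one fixed-size slot.
func encodeEntry(term uint64, command []byte) ([entrySize]byte, error) {
	var slot [entrySize]byte
	if len(command) > entrySize-entryHeader {
		return slot, fmt.Errorf("command is too large (%d bytes)", len(command))
	}
	binary.LittleEndian.PutUint64(slot[:8], term)
	binary.LittleEndian.PutUint64(slot[8:16], uint64(len(command)))
	copy(slot[entryHeader:], command)
	return slot, nil
}

// decodeEntry reads the term and command back out of a slot.
// It assumes the stored length fits inside the slot.
func decodeEntry(slot [entrySize]byte) (uint64, []byte) {
	term := binary.LittleEndian.Uint64(slot[:8])
	n := binary.LittleEndian.Uint64(slot[8:16])
	command := make([]byte, n)
	copy(command, slot[entryHeader:entryHeader+n])
	return term, command
}

func main() {
	slot, err := encodeEntry(3, []byte("set x 1"))
	if err != nil {
		panic(err)
	}
	term, command := decodeEntry(slot)
	fmt.Println(entryOffset(2), term, string(command))
}
```

The trade-off of fixed slots is wasted space for short commands and a hard cap on long ones, in exchange for being able to seek straight to the first entry that changed.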

Finally, a few helpers used in there:

func min[T ~int | ~uint64](a, b T) T {
    if a < b {
        return a
    }

    return b
}

func max[T ~int | ~uint64](a, b T) T {
    if a > b {
        return a
    }

    return b
}

// Must be called within s.mu.Lock()
func (s *Server) getVotedFor() uint64 {
    for i := range s.cluster {
        if i == s.clusterIndex {
            return s.cluster[i].votedFor
        }
    }

    Server_assert(s, "Invalid cluster", true, false)
    return 0
}


Now let’s do the reverse operation, restoring from disk. This will
only be called once on startup.

func (s *Server) restore() {
    s.mu.Lock()
    defer s.mu.Unlock()

    if s.fd == nil {
        var err error
        // The open flags and file mode here are an assumption.
        s.fd, err = os.OpenFile(
            path.Join(s.metadataDir, fmt.Sprintf("md_%d.dat", s.id)),
            os.O_SYNC|os.O_CREATE|os.O_RDWR,
            0755)
        if err != nil {
            panic(err)
        }
    }

    s.fd.Seek(0, 0)

    // Bytes 0  - 8:   Current term
    // Bytes 8  - 16:  Voted for
    // Bytes 16 - 24:  Log length
    // Bytes 4096 - N: Log
    var page [PAGE_SIZE]byte
    n, err := s.fd.Read(page[:])
    if err == io.EOF {
        // Nothing persisted yet: start with an empty log.
        s.ensureLog()
        return
    } else if err != nil {
        panic(err)
    }
    Server_assert(s, "Read full page", n, PAGE_SIZE)

    s.currentTerm = binary.LittleEndian.Uint64(page[:8])
    s.setVotedFor(binary.LittleEndian.Uint64(page[8:16]))
    lenLog := binary.LittleEndian.Uint64(page[16:24])
    s.log = nil

    if lenLog > 0 {
        s.fd.Seek(int64(PAGE_SIZE), 0)

        var e Entry
        for i := 0; uint64(i) < lenLog; i++ {
            var entryBytes [ENTRY_SIZE]byte
            n, err := s.fd.Read(entryBytes[:])
            if err != nil {
                panic(err)
            }
            Server_assert(s, "Read full entry", n, ENTRY_SIZE)

            // Bytes 0 - 8:    Entry term
            // Bytes 8 - 16:   Entry command length
            // Bytes 16 - ENTRY_SIZE: Entry command
            e.Term = binary.LittleEndian.Uint64(entryBytes[:8])
            lenValue := binary.LittleEndian.Uint64(entryBytes[8:16])
            e.Command = entryBytes[16 : 16+lenValue]
            s.log = append(s.log, e)
        }
    }

    s.ensureLog()
}

And a few helpers it calls:

func (s *Server) ensureLog() {
    if len(s.log) == 0 {
        // Always has at least one log entry.
        s.log = append(s.log, Entry{})
    }
}

// Must be called within s.mu.Lock()
func (s *Server) setVotedFor(id uint64) {
    for i := range s.cluster {
        if i == s.clusterIndex {
            s.cluster[i].votedFor = id
            return
        }
    }

    Server_assert(s, "Invalid cluster", true, false)
}

The main loop

Now let’s think about the main loop. Before starting the loop we need
to 1) restore persistent state from disk and 2) kick off an RPC
server so servers in the cluster can send and receive messages to and
from each other.

// Make sure rand is seeded
func (s *Server) Start() {
    s.mu.Lock()
    s.state = followerState
    s.done = false
    s.mu.Unlock()

    s.restore()

    rpcServer := rpc.NewServer()
    rpcServer.Register(s)
    l, err := net.Listen("tcp", s.address)
    if err != nil {
        panic(err)
    }
    mux := http.NewServeMux()
    mux.Handle(rpc.DefaultRPCPath, rpcServer)

    s.server = &http.Server{Handler: mux}
    go s.server.Serve(l)

    go func() {
        for {
            s.mu.Lock()
            if s.done {
                s.mu.Unlock()
                return
            }
            state := s.state
            s.mu.Unlock()

In the main loop we are either in the leader state, follower state or
candidate state.

All states will potentially receive RPC messages from other servers in
the cluster but that won’t be modeled in this main loop.

The only thing going on in the main loop is that:

  • We send heartbeat RPCs (leader state)
  • We try to advance the commit index (leader state only) and apply commands to the state machine (leader and follower states)
  • We trigger a new election if we haven’t received a message in some time (candidate and follower states)
  • Or we become the leader (candidate state)
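            switch state {
            case leaderState: // heartbeats, advance commit index, apply commands
            case followerState: // election timeout, apply commands
            case candidateState: // election timeout, or become the leader
            }
        }
    }()
}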

Let’s deal with leader election first.

Leader election

Leader election happens every time nodes haven’t received a message
from a valid leader in some time.

I’ll break this up into four major pieces:

  1. Timing out and becoming a candidate after a random (but bounded)
    period of time of not hearing a message from a valid leader: s.timeout().
  2. The candidate requests votes from all other servers: s.requestVote().
  3. All servers handle vote requests: s.HandleRequestVoteRequest().
  4. A candidate with a quorum of votes becomes the leader: s.becomeLeader().

You increment currentTerm, vote for yourself and send RPC vote
requests to other nodes in the cluster.
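
The "random (but bounded)" part of step 1 can be sketched on its own. This is a minimal standalone version of the interval calculation, assuming a heartbeat given in milliseconds. `electionInterval` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// electionInterval picks a random duration in the half-open range
// [2*heartbeat, 4*heartbeat), so a follower waits for at least two
// missed heartbeats before starting an election.
func electionInterval(heartbeatMs int) time.Duration {
	ms := rand.Intn(heartbeatMs*2) + heartbeatMs*2
	return time.Duration(ms) * time.Millisecond
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(electionInterval(300))
	}
}
```

The randomness is what keeps servers from all timing out at the same moment and splitting the vote over and over.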

func (s *Server) resetElectionTimeout() {
    interval := time.Duration(rand.Intn(s.heartbeatMs*2) + s.heartbeatMs*2)
    s.debugf("New interval: %s.", interval*time.Millisecond)
    s.electionTimeout = time.Now().Add(interval * time.Millisecond)
}

func (s *Server) timeout() {
    s.mu.Lock()
    defer s.mu.Unlock()

    hasTimedOut := time.Now().After(s.electionTimeout)
    if hasTimedOut {
        s.debug("Timed out, starting new election.")
        s.state = candidateState
        s.currentTerm++
        for i := range s.cluster {
            if i == s.clusterIndex {
                s.cluster[i].votedFor = s.id
            } else {
                s.cluster[i].votedFor = 0
            }
        }

        s.resetElectionTimeout()
        s.persist(false, 0)
        s.requestVote()
    }
}

Everything in there is implemented already except for
s.requestVote(). Let’s dig into that.


By referring back to Figure 2 from the Raft paper we can see how to
model the request vote request and response. Let’s turn that into some Go types.

type RPCMessage struct {
    Term uint64
}

type RequestVoteRequest struct {
    RPCMessage

    // Candidate requesting vote
    CandidateId uint64

    // Index of candidate's last log entry
    LastLogIndex uint64

    // Term of candidate's last log entry
    LastLogTerm uint64
}

type RequestVoteResponse struct {
    RPCMessage

    // True means candidate received vote
    VoteGranted bool
}

Now we just need to fill the RequestVoteRequest struct out and send
it to each other node in the cluster in parallel. As we iterate
through nodes in the cluster, we skip ourselves (we always immediately
vote for ourselves).

func (s *Server) requestVote() {
    for i := range s.cluster {
        if i == s.clusterIndex {
            continue
        }

        go func(i int) {
            s.mu.Lock()

            s.debugf("Requesting vote from %d.", s.cluster[i].Id)

            lastLogIndex := uint64(len(s.log) - 1)
            lastLogTerm := s.log[len(s.log)-1].Term

            req := RequestVoteRequest{
                RPCMessage: RPCMessage{
                    Term: s.currentTerm,
                },
                CandidateId:  s.id,
                LastLogIndex: lastLogIndex,
                LastLogTerm:  lastLogTerm,
            }
            s.mu.Unlock()

            var rsp RequestVoteResponse
            ok := s.rpcCall(i, "Server.HandleRequestVoteRequest", req, &rsp)
            if !ok {
                // Will retry later
                return
            }

Now remember from Figure 2 in the Raft paper that we must always check
that the RPC request and response is still valid. If the term of the
response is greater than our own term, we must immediately stop
processing and revert to follower state.

Otherwise, only if the response is still relevant to us at the moment
(the response term is the same as the request term) and the request
has succeeded do we count the vote.


            s.mu.Lock()
            defer s.mu.Unlock()

            if s.updateTerm(rsp.RPCMessage) {
                return
            }

            dropStaleResponse := rsp.Term != req.Term
            if dropStaleResponse {
                return
            }

            if rsp.VoteGranted {
                s.debugf("Vote granted by %d.", s.cluster[i].Id)
                s.cluster[i].votedFor = s.id
            }
        }(i)
    }
}

And that’s it for the candidate side of requesting a vote.

The implementation of s.updateTerm() is simple. It just takes care
of transitioning to follower state if the term of an RPC message is
greater than the node’s current term.

// Must be called within a s.mu.Lock()
func (s *Server) updateTerm(msg RPCMessage) bool {
    transitioned := false
    if msg.Term > s.currentTerm {
        s.currentTerm = msg.Term
        s.state = followerState
        // A new term means we have not voted in it yet.
        s.setVotedFor(0)
        transitioned = true
        s.debug("Transitioned to follower")
        s.persist(false, 0)
    }
    return transitioned
}
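
The order of checks a candidate applies to a vote response (step down, drop if stale, then count) can be written as one pure function. This is only an illustrative sketch. The `action` type and `classify` function are hypothetical names that don't appear in the implementation.

```go
package main

import "fmt"

type action string

const (
	stepDown  action = "step down to follower"
	dropStale action = "drop stale response"
	countVote action = "count vote"
	noVote    action = "vote not granted"
)

// classify decides what a candidate does with a vote response, given
// its own current term, the term it sent in the request, the term in
// the response, and whether the vote was granted.
func classify(currentTerm, reqTerm, rspTerm uint64, granted bool) action {
	switch {
	case rspTerm > currentTerm:
		// Someone is ahead of us: revert to follower.
		return stepDown
	case rspTerm != reqTerm:
		// The response belongs to an election we've moved on from.
		return dropStale
	case granted:
		return countVote
	default:
		return noVote
	}
}

func main() {
	fmt.Println(classify(3, 3, 4, false))
	fmt.Println(classify(4, 3, 4, true))
	fmt.Println(classify(3, 3, 3, true))
}
```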

And the implementation of s.rpcCall() is a wrapper around net/rpc
to lazily connect.

func (s *Server) rpcCall(i int, name string, req, rsp any) bool {
    s.mu.Lock()
    c := s.cluster[i]
    var err error
    rpcClient := c.rpcClient
    if rpcClient == nil {
        rpcClient, err = rpc.DialHTTP("tcp", c.Address)
        if err == nil {
            // Cache the connection on the cluster entry; c is only a copy.
            s.cluster[i].rpcClient = rpcClient
        }
    }
    s.mu.Unlock()

    // TODO: where/how to reconnect if the connection must be reestablished?

    if err == nil {
        err = rpcClient.Call(name, req, rsp)
    }

    if err != nil {
        s.warnf("Error calling %s on %d: %s.", name, c.Id, err)
    }

    return err == nil
}

Let’s dig into the other side of request vote: what happens when a
node receives a vote request?


First off, as discussed above, we must always check the RPC term
versus our own and revert to follower if the term is greater than our
own. (Remember that since this is an RPC request it could come to a
server in any state: leader, candidate, or follower.)

func (s *Server) HandleRequestVoteRequest(req RequestVoteRequest, rsp *RequestVoteResponse) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    s.updateTerm(req.RPCMessage)

    s.debugf("Received vote request from %d.", req.CandidateId)

Then we can return immediately if the request term is lower than our
own (that means it’s an old request).

    rsp.VoteGranted = false
    rsp.Term = s.currentTerm

    if req.Term < s.currentTerm {
        s.debugf("Not granting vote request from %d.", req.CandidateId)
        Server_assert(s, "VoteGranted = false", rsp.VoteGranted, false)
        return nil
    }

And finally, we check to make sure the requester’s log is at least as
up-to-date as our own and that we haven’t already voted for
a different candidate (ourselves included).

The first condition (up-to-date log) was not described in
the Raft paper that I could find. But the author of the paper
published a Raft TLA+ spec that does have it.

And the second condition you might think could never happen since we
already wrote the code that said when we trigger an election we vote
for ourselves. But since each server has a random election timeout,
the one who starts the election will differ in timing enough to
catch other servers and allow them to vote for it.

    lastLogTerm := s.log[len(s.log)-1].Term
    logLen := uint64(len(s.log) - 1)
    logOk := req.LastLogTerm > lastLogTerm ||
        (req.LastLogTerm == lastLogTerm && req.LastLogIndex >= logLen)
    grant := req.Term == s.currentTerm &&
        logOk &&
        (s.getVotedFor() == 0 || s.getVotedFor() == req.CandidateId)
    if grant {
        s.debugf("Voted for %d.", req.CandidateId)
        // Record the vote before persisting it.
        s.setVotedFor(req.CandidateId)
        rsp.VoteGranted = true
        s.persist(false, 0)
    } else {
        s.debugf("Not granting vote request from %d.", req.CandidateId)
    }

    return nil
}
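
The two voting conditions are easy to get subtly wrong, so here they are as small pure functions that can be tested without a running server. `logUpToDate` and `canVote` are hypothetical names. As in the code above, a votedFor of 0 is assumed to mean "no vote yet".

```go
package main

import "fmt"

// logUpToDate reports whether the candidate's log is at least as
// up-to-date as ours. A higher last term wins outright. On equal
// terms, the log whose last index is at least as large wins.
func logUpToDate(candLastTerm, candLastIndex, ourLastTerm, ourLastIndex uint64) bool {
	if candLastTerm != ourLastTerm {
		return candLastTerm > ourLastTerm
	}
	return candLastIndex >= ourLastIndex
}

// canVote reports whether our vote is still available to this
// candidate: either we haven't voted, or we already voted for it.
func canVote(votedFor, candidateID uint64) bool {
	return votedFor == 0 || votedFor == candidateID
}

func main() {
	fmt.Println(logUpToDate(2, 1, 1, 5)) // higher last term, shorter log
	fmt.Println(logUpToDate(1, 3, 1, 5)) // same last term, shorter log
	fmt.Println(canVote(0, 7), canVote(3, 7))
}
```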

Finally, we need to address how the candidate who sent out vote
requests actually becomes the leader.


This is a relatively simple method. If we have a quorum of votes, we
become the leader!
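
As a standalone illustration of the quorum check, the sketch below counts votes from a slice with one slot per cluster member. `quorumSize` and `hasQuorum` are hypothetical helpers. The real method keeps this state on the cluster members instead.

```go
package main

import "fmt"

// quorumSize is the smallest strict majority of a cluster of n servers.
func quorumSize(n int) int {
	return n/2 + 1
}

// hasQuorum counts the entries of votedFor that equal candidateID and
// reports whether they reach a quorum. votedFor holds one slot per
// cluster member, including the candidate's own vote for itself.
func hasQuorum(votedFor []uint64, candidateID uint64) bool {
	votes := 0
	for _, v := range votedFor {
		if v == candidateID {
			votes++
		}
	}
	return votes >= quorumSize(len(votedFor))
}

func main() {
	fmt.Println(quorumSize(3), quorumSize(5))
	fmt.Println(hasQuorum([]uint64{1, 1, 0}, 1))
	fmt.Println(hasQuorum([]uint64{1, 0, 0}, 1))
}
```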

func (s *Server) becomeLeader() {
    s.mu.Lock()
    defer s.mu.Unlock()

    quorum := len(s.cluster)/2 + 1
    for i := range s.cluster {
        if s.cluster[i].votedFor == s.id && quorum > 0 {
            quorum--
        }
    }

There’s a bit of bookkeeping we need to do like resetting nextIndex
and matchIndex for each server (noted in Figure 2). And we also need
to append a blank entry for the new term.

Despite the section quoted below in code, I still don’t understand
why this blank entry is necessary.

    if quorum == 0 {
        // Reset all cluster state
        for i := range s.cluster {
            s.cluster[i].nextIndex = uint64(len(s.log) + 1)
            // Yes, even matchIndex is reset. Figure 2
            // from Raft shows both nextIndex and
            // matchIndex are reset after every election.
            s.cluster[i].matchIndex = 0
        }

        s.debug("New leader.")
        s.state = leaderState

        // From Section 8 Client Interaction:
        // > First, a leader must have the latest information on
        // > which entries are committed. The Leader
        // > Completeness Property guarantees that a leader has
        // > all committed entries, but at the start of its
        // > term, it may not know which those are. To find out,
        // > it needs to commit an entry from its term. Raft
        // > handles this by having each leader commit a blank
        // > no-op entry into the log at the start of its term.
        s.log = append(s.log, Entry{Term: s.currentTerm, Command: nil})
        s.persist(true, 1)

        // Triggers s.appendEntries() in the next tick of the
        // main state loop.
        s.heartbeatTimeout = time.Now()
    }
}

And we’re done with elections!

When I was working on this for the first time, I just stopped here and
made sure I could get to a stable leader quickly. If it takes more
than 1 term to establish a leader when you run three servers in the
cluster on localhost, you’ve probably got a bug.

In an ideal environment (which three processes on one machine most
likely is), leadership should be established quite quickly and without
many term changes. As the environment gets more adversarial
(e.g. processes crash frequently or network latency is high and
variable), leadership (and log replication) will take longer.

But just because we have leader election working when there are no
logs doesn’t mean we’ll have it working when we introduce log
replication since parts of voting depend on log analysis.

I had leader election working at one time but then it broke when I
got log replication working until I found some more bugs in leader
election and fixed them. Of course, there may be bugs even now.
Log replication

I am going to break up log replication into 4 main items:

  1. User submits a message to the leader to be replicated: s.Apply().
  2. The leader sends uncommitted messages (messages from
    nextIndex) to all followers: s.appendEntries().
  3. A follower receives an AppendEntriesRequest and stores new
    messages if appropriate, letting the leader know when it does store
    the messages: s.HandleAppendEntriesRequest().
  4. The leader tries to update commitIndex for the last uncommitted
    message by seeing if it’s been replicated on a quorum of servers:
    s.advanceCommitIndex().

Let’s dig in in that order.


s.Apply()

This is the entry point for a user of the cluster to attempt to get
messages replicated into the cluster.

It must be called on the current leader of the cluster. In the future
the failure response might include the current leader. Or the user
could submit messages in parallel to all nodes in the cluster and
ignore ErrApplyToLeader. In the meantime we just assume the user can
figure out which server in the cluster is the leader.

var ErrApplyToLeader = errors.New("Cannot apply message to follower, apply to leader.")

func (s *Server) Apply(commands [][]byte) ([]ApplyResult, error) {
    if s.state != leaderState {
        return nil, ErrApplyToLeader
    }
    s.debugf("Processing %d new entry!", len(commands))

Next we’ll store the message in the leader’s log along with a
Go channel that we must block on for the result of applying
the message in the state machine after the message has been committed
to the cluster.

    resultChans := make([]chan ApplyResult, len(commands))
    for i, command := range commands {
        resultChans[i] = make(chan ApplyResult)
        s.log = append(s.log, Entry{
            Term:    s.currentTerm,
            Command: command,
            result:  resultChans[i],
        })
    }

    s.persist(true, len(commands))

Then we kick off the replication process (this will not block).

    s.debug("Waiting to be applied!")

    s.appendEntries()


And then we block until we receive results from each of the channels we created.

    // TODO: What happens if this takes too long?
    results := make([]ApplyResult, len(commands))
    var wg sync.WaitGroup
    wg.Add(len(commands))
    for i, ch := range resultChans {
        go func(i int, c chan ApplyResult) {
            results[i] = <-c
            wg.Done()
        }(i, ch)
    }

    // Block until every channel has delivered its result.
    wg.Wait()

    return results, nil
}

The interesting thing here is that appending entries is detached from
the messages we just received. s.appendEntries() will probably
include at least the messages we just appended to our log, but it
might include more too if some servers are not very up-to-date. It may
even include fewer than the messages we appended to our log, since we’ll
restrict the number of entries to send at one time so we keep latency
down.


s.appendEntries()

This is the meat of log replication on the leader side. We send
unreplicated messages to each other server in the cluster.

By again referring back to Figure 2 from the Raft paper we can see how
to model the append entries request and response. Let’s turn that into
some Go types too.

type AppendEntriesRequest struct {
    RPCMessage

    // So follower can redirect clients
    LeaderId uint64

    // Index of log entry immediately preceding new ones
    PrevLogIndex uint64

    // Term of prevLogIndex entry
    PrevLogTerm uint64

    // Log entries to store. Empty for heartbeat.
    Entries []Entry

    // Leader's commitIndex
    LeaderCommit uint64
}

type AppendEntriesResponse struct {
    RPCMessage

    // true if follower contained entry matching prevLogIndex and
    // prevLogTerm
    Success bool
}

For the method itself, we start optimistically sending no entries and
decrement nextIndex for each server as the server fails to replicate
messages. This means that we might eventually end up sending the
entire log to one or all servers.

We’ll set a max number of entries to send per request so we avoid
unbounded latency as followers store entries to disk. But we still
want to send a large batch so that we amortize the cost of fsync.
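
The batching idea can be sketched in isolation. This assumes, as the indexing in the post suggests, that index 0 of the log is a placeholder and real entries start at index 1. The names nextBatch and maxBatch are hypothetical, and the value 3 is purely illustrative since the post does not show what MAX_APPEND_ENTRIES_BATCH is set to.

```go
package main

import "fmt"

// maxBatch is an illustrative value only.
const maxBatch = 3

// nextBatch returns the slice of log to send to a follower whose next
// expected index is next, capped at max entries.
func nextBatch(log []string, next uint64, max int) []string {
	if next >= uint64(len(log)) {
		return nil // Nothing new: this request is just a heartbeat.
	}
	entries := log[next:]
	if len(entries) > max {
		entries = entries[:max]
	}
	return entries
}

func main() {
	// Index 0 is a placeholder so that real entries start at index 1.
	log := []string{"", "a", "b", "c", "d", "e"}
	for next := uint64(1); next < uint64(len(log)); {
		batch := nextBatch(log, next, maxBatch)
		fmt.Println(next, batch)
		next += uint64(len(batch))
	}
}
```

A follower that is far behind catches up over several requests instead of one huge one, which is the latency trade-off described above.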


func (s *Server) appendEntries() {
    for i := range s.cluster {
        // Don't need to send message to self
        if i == s.clusterIndex {
            continue
        }

        go func(i int) {

            next := s.cluster[i].nextIndex
            prevLogIndex := next - 1
            prevLogTerm := s.log[prevLogIndex].Term

            var entries []Entry
            if uint64(len(s.log)-1) >= s.cluster[i].nextIndex {
                s.debugf("len: %d, next: %d, server: %d", len(s.log), next, s.cluster[i].Id)
                entries = s.log[next:]
            }

            // Keep latency down by only applying N at a time.
            if len(entries) > MAX_APPEND_ENTRIES_BATCH {
                entries = entries[:MAX_APPEND_ENTRIES_BATCH]
            }

            lenEntries := uint64(len(entries))
            req := AppendEntriesRequest{
                RPCMessage: RPCMessage{
                    Term: s.currentTerm,
                },
                LeaderId:     s.cluster[s.clusterIndex].Id,
                PrevLogIndex: prevLogIndex,
                PrevLogTerm:  prevLogTerm,
                Entries:      entries,
                LeaderCommit: s.commitIndex,
            }

            var rsp AppendEntriesResponse
            s.debugf("Sending %d entries to %d for term %d.", len(entries), s.cluster[i].Id, req.Term)
            ok := s.rpcCall(i, "Server.HandleAppendEntriesRequest", req, &rsp)
            if !ok {
                // Will retry next tick
                return
            }

Now, as with every RPC request and response, we must check terms and
potentially drop the message if it’s outdated.

            if s.updateTerm(rsp.RPCMessage) {
                return
            }

            dropStaleResponse := rsp.Term != req.Term && s.state == leaderState
            if dropStaleResponse {
                return
            }

Otherwise, if the message was successful, we’ll update matchIndex
(the last confirmed message stored on the follower) and nextIndex
(the next likely message to send to the follower).

If the message was not successful, we decrement nextIndex. Next time
s.appendEntries() is called it will include one more previous
message for this replica.

            if rsp.Success {
                prev := s.cluster[i].nextIndex
                s.cluster[i].nextIndex = max(req.PrevLogIndex+lenEntries+1, 1)
                s.cluster[i].matchIndex = s.cluster[i].nextIndex - 1
                s.debugf("Message accepted for %d. Prev Index: %d, Next Index: %d, Match Index: %d.", s.cluster[i].Id, prev, s.cluster[i].nextIndex, s.cluster[i].matchIndex)
            } else {
                s.cluster[i].nextIndex = max(s.cluster[i].nextIndex-1, 1)
                s.debugf("Forced to go back to %d for: %d.", s.cluster[i].nextIndex, s.cluster[i].Id)
            }
        }(i)
    }
}

And we’re done with the leader side of append entries!


s.HandleAppendEntriesRequest()

Now for the follower side of log replication. This is, again, an RPC
handler that could be called at any moment. So we need to potentially
update the term (and transition to follower).

func (s *Server) HandleAppendEntriesRequest(req AppendEntriesRequest, rsp *AppendEntriesResponse) error {
    s.updateTerm(req.RPCMessage)


“Hidden” in the “Candidates (§5.2):” section of Figure 2 is an additional rule:

If AppendEntries RPC received from new leader: convert to follower

So we also need to handle that here. And if we’re still not a
follower, we’ll return immediately.

    // From Candidates (§5.2) in Figure 2
    // If AppendEntries RPC received from new leader: convert to follower
    if req.Term == s.currentTerm && s.state == candidateState {
        s.state = followerState
    }

    rsp.Term = s.currentTerm
    rsp.Success = false

    if s.state != followerState {
        s.debugf("Non-follower cannot append entries.")
        return nil
    }

Next, we also return early if the request term is less than our
own. This would represent an old request.

    if req.Term < s.currentTerm {
        s.debugf("Dropping request from old leader %d: term %d.", req.LeaderId, req.Term)
        // Not a valid leader.
        return nil
    }

Now, finally, we know we’re receiving a request from a valid
leader. So we need to immediately bump the election timeout.

    // Valid leader so reset election.

Then we do the log comparison to see if we can add the entries sent
from this request. Specifically, we make sure that our log at
req.PrevLogIndex exists and has the same term as req.PrevLogTerm.

    logLen := uint64(len(s.log))
    validPreviousLog := req.PrevLogIndex == 0 /* This is the induction step */ ||
        (req.PrevLogIndex < logLen &&
            s.log[req.PrevLogIndex].Term == req.PrevLogTerm)
    if !validPreviousLog {
        s.debug("Not a valid log.")
        return nil
    }
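
To make the consistency check easier to reason about, here it is pulled out as a standalone function with a few cases. This is a sketch: the free function validPreviousLog and the one-field Entry type are simplifications for illustration, not the post’s actual types.

```go
package main

import "fmt"

// Entry is reduced to the one field the check needs.
type Entry struct {
	Term uint64
}

// validPreviousLog reports whether log contains an entry at
// prevLogIndex whose term is prevLogTerm. Index 0 always matches.
func validPreviousLog(log []Entry, prevLogIndex, prevLogTerm uint64) bool {
	if prevLogIndex == 0 {
		return true
	}
	return prevLogIndex < uint64(len(log)) && log[prevLogIndex].Term == prevLogTerm
}

func main() {
	// Index 0 is a sentinel entry; real entries start at index 1.
	log := []Entry{{0}, {1}, {1}, {2}}
	fmt.Println(validPreviousLog(log, 3, 2)) // entry exists with same term
	fmt.Println(validPreviousLog(log, 3, 3)) // entry exists, different term
	fmt.Println(validPreviousLog(log, 7, 2)) // follower log is too short
}
```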

Next, we’ve got valid entries that we need to add to our log. This
implementation is a little more complex because we’ll make use of Go
slice capacity so that append() never allocates.

Importantly, we must truncate the log if a new entry ever conflicts
with an existing one:

If an existing entry conflicts with a new one (same index
but different terms), delete the existing entry and all that
follow it (§5.3)

    next := req.PrevLogIndex + 1
    nNewEntries := 0

    for i := next; i < next+uint64(len(req.Entries)); i++ {
        e := req.Entries[i-next]
        if i >= uint64(cap(s.log)) {
            newTotal := next + uint64(len(req.Entries))
            // Second argument must actually be `i`
            // not `0` otherwise the copy after this
            // doesn't work.
            // Only copy until `i`, not `newTotal` since
            // we'll continue appending after this.
            newLog := make([]Entry, i, newTotal*2)
            copy(newLog, s.log)
            s.log = newLog
        }

        if i < uint64(len(s.log)) && s.log[i].Term != e.Term {
            prevCap := cap(s.log)
            // If an existing entry conflicts with a new
            // one (same index but different terms),
            // delete the existing entry and all that
            // follow it (§5.3)
            s.log = s.log[:i]
            Server_assert(s, "Capacity remains the same while we truncated.", cap(s.log), prevCap)
        }

        s.debugf("Appending entry: %s. At index: %d.", string(e.Command), len(s.log))

        if i < uint64(len(s.log)) {
            Server_assert(s, "Existing log is the same as new log", s.log[i].Term, e.Term)
        } else {
            s.log = append(s.log, e)
            Server_assert(s, "Length is directly related to the index.", uint64(len(s.log)), i+1)
            nNewEntries++
        }
    }
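
Stripped of the capacity management and assertions, the truncate-on-conflict rule looks roughly like the sketch below. The function mergeEntries and this simplified Entry are hypothetical, and the sketch assumes the caller has already run the previous-log check.

```go
package main

import "fmt"

// Entry is simplified from the post for this sketch.
type Entry struct {
	Term    uint64
	Command string
}

// mergeEntries stores entries starting at index prevLogIndex+1. It
// assumes the caller already verified that prevLogIndex < len(log).
// It returns the new log and how many entries were newly appended.
func mergeEntries(log []Entry, prevLogIndex uint64, entries []Entry) ([]Entry, int) {
	appended := 0
	for j, e := range entries {
		i := prevLogIndex + 1 + uint64(j)
		if i < uint64(len(log)) && log[i].Term != e.Term {
			// Conflict: drop this entry and everything after it.
			log = log[:i]
		}
		if i >= uint64(len(log)) {
			log = append(log, e)
			appended++
		}
		// Otherwise the entry is already stored; nothing to do.
	}
	return log, appended
}

func main() {
	// Follower has stale entries from term 2 at indexes 2 and 3.
	log := []Entry{{0, ""}, {1, "a"}, {2, "x"}, {2, "y"}}
	log, n := mergeEntries(log, 1, []Entry{{3, "b"}})
	fmt.Println(len(log), n, log[2].Command)
}
```

Receiving the same request twice appends nothing the second time, which is what lets the leader safely retry.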

Finally, we update the server’s local commitIndex to the min of
req.LeaderCommit and our own log length.

And then we persist all these changes and mark the response as
successful.
    if req.LeaderCommit > s.commitIndex {
        s.commitIndex = min(req.LeaderCommit, uint64(len(s.log)-1))
    }

    s.persist(nNewEntries != 0, nNewEntries)

    rsp.Success = true
    return nil
}

So the combined behavior of the leader and follower when replicating
is that a follower not in sync with the leader may eventually go down
to the beginning of the log so the leader and follower have some first
N messages of the log that match.
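
That back-and-forth can be simulated without any networking. In this sketch, logs are reduced to slices of terms with a sentinel at index 0, and findNextIndex is a hypothetical helper that plays both the leader (decrementing nextIndex) and the follower (running the previous-log check).

```go
package main

import "fmt"

// findNextIndex simulates the leader decrementing nextIndex after each
// rejected AppendEntries until the follower's consistency check
// passes. Logs are slices of terms; index 0 is a sentinel, so leader
// must have at least one element.
func findNextIndex(leader, follower []uint64) (next uint64, attempts int) {
	next = uint64(len(leader))
	for {
		attempts++
		prev := next - 1
		ok := prev == 0 ||
			(prev < uint64(len(follower)) && follower[prev] == leader[prev])
		if ok {
			return next, attempts
		}
		next--
	}
}

func main() {
	leader := []uint64{0, 1, 1, 3, 3}
	follower := []uint64{0, 1, 2, 2}
	next, attempts := findNextIndex(leader, follower)
	fmt.Println(next, attempts)
}
```

Here the logs agree only up to index 1, so the leader has to back off to nextIndex 2 before the follower accepts anything. One request per step is simple but slow for a follower that is far behind.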



s.advanceCommitIndex()

Now when not just one follower but a quorum of followers all have a
matching first N messages, the leader can advance the cluster’s
commitIndex.

func (s *Server) advanceCommitIndex() {

    // Leader can update commitIndex on quorum.
    if s.state == leaderState {
        lastLogIndex := uint64(len(s.log) - 1)

        for i := lastLogIndex; i > s.commitIndex; i-- {
            quorum := len(s.cluster)/2 + 1

            for j := range s.cluster {
                if quorum == 0 {
                    break
                }

                isLeader := j == s.clusterIndex
                if s.cluster[j].matchIndex >= i || isLeader {
                    quorum--
                }
            }

            if quorum == 0 {
                s.commitIndex = i
                s.debugf("New commit index: %d.", i)
                break
            }
        }
    }

And for every state a server can be in, if there are messages
committed but not applied, we’ll apply one here. And importantly,
we’ll pass the result back to the message’s result channel if it
exists, so that s.Apply() can learn about the result.

    if s.lastApplied <= s.commitIndex {
        log := s.log[s.lastApplied]

        // len(log.Command) == 0 is a noop committed by the leader.
        if len(log.Command) != 0 {
            s.debugf("Entry applied: %d.", s.lastApplied)
            // TODO: what if Apply() takes too long?
            res, err := s.statemachine.Apply(log.Command)

            // Will be nil for follower entries and for no-op entries.
            // Not nil for all user submitted messages.
            if log.result != nil {
                log.result <- ApplyResult{
                    Result: res,
                    Error:  err,
                }
            }
        }

        // Move on so the same entry is not applied twice.
        s.lastApplied++
    }
}


Heartbeats

Heartbeats combine log replication and leader election. Heartbeats
stave off leader election (follower timeouts). And heartbeats also
bring followers up-to-date if they are behind.

And it’s a simple method. If it’s time to heartbeat, we call
s.appendEntries(). That’s it.

func (s *Server) heartbeat() {

    timeForHeartbeat := time.Now().After(s.heartbeatTimeout)
    if timeForHeartbeat {
        s.heartbeatTimeout = time.Now().Add(time.Duration(s.heartbeatMs) * time.Millisecond)
        s.debug("Sending heartbeat")
        s.appendEntries()
    }
}

The reason this staves off leader election is that any number of
entries (0 or N) will come from a valid leader and will thus cause the
followers to reset their election timeout.

And that’s the entirety of (the basics of) Raft.

There are probably bugs.

Running kvapi

Now let’s run the key-value API.

$ cd cmd/kvapi && go build
$ rm *.dat

Terminal 1

$ ./kvapi --node 0 --http :2020 --cluster "0,:3030;1,:3031;2,:3032"

Terminal 2

$ ./kvapi --node 1 --http :2021 --cluster "0,:3030;1,:3031;2,:3032"

Terminal 3

$ ./kvapi --node 2 --http :2022 --cluster "0,:3030;1,:3031;2,:3032"

Terminal 4

Remember that requests will go through the leader (except if we
turn that off in the /get request). So you’ll have to try sending a
message to each server until you find the leader.

To set a key:

$ curl "http://localhost:2020/set?key=y&value=hello"

To get a key:

$ curl http://localhost:2020/get?key=y

And that’s that! Try killing a server and restarting it. A new leader
will be elected so you’ll need to find the right one to send requests
to again. But all existing entries should still be there.

A test rig

I won’t cover the implementation of my test rig in
this post but I will describe it.

It’s nowhere near Jepsen but it does have a specific focus:

  1. Can the cluster elect a leader?
  2. Can the cluster store logs correctly?
  3. Can the cluster of three nodes tolerate one node down?
  4. How fast can it store N messages?
  5. Are messages recovered correctly when the nodes shut down and start back up?
  6. If a node’s logs are deleted, is the log for that node recovered after it’s restarted?

This implementation passes these tests and handles around 20k-40k entries/second.


This was quite a challenging project. Normally when I hack on stuff
like this I have TV (The Simpsons) on in the background. It’s kind of
dumb but this was the first project where I absolutely could not focus
with that background noise.

There are a tedious number of conditions and I am not sure I got them
all (right). There are numerous ways for subtle bugs to creep in.

Race conditions and deadlocks

It’s very easy to program in race conditions. Thankfully Go has the
-race flag that detects this. This makes sure that you are locking
read and write access to shared variables when necessary.

On the other side of race conditions, Go doesn’t help you out with
deadlocks. Once you’ve got locks in place for shared variables, you
need to make sure you’re releasing the locks appropriately too.

Thankfully someone wrote a swap-in replacement for the Go sync
package called
go-deadlock. When you import
this package instead of the default sync package, it will panic and
give you a stacktrace when it thinks you hit a deadlock.

Sometimes it thinks you hit a deadlock because a method that needs a
lock takes too long. Sometimes that time it takes is legitimate (or
something you haven’t optimized yet). But actually its default of
30s is not really aggressive at all.

So I normally set the deadlock timeout to 2s and eventually would
like to make that more like 100ms:

sync.Opts.DeadlockTimeout = 2000 * time.Millisecond

It’s mostly the persist() function that causes go-deadlock to
think there’s a deadlock because it tries to synchronously write a
bunch of data to disk.

go-deadlock is slow

The go-deadlock package is incredibly useful. But don’t forget to
turn it off for benchmarks. With it on I get around 4-8k
entries/second. With it off I get around 20k-40k entries/second.

Unbounded memory

Another issue in this implementation is that the log keeps growing
indefinitely and the entire log is duplicated in memory.

There are two ways to deal with that:

  1. Implement Raft snapshotting so the log can be truncated safely.
  2. Keep only some number of entries in memory (say, 1 million) and
    read from disk as needed when logs need to be verified. In ideal
    operation this would never happen since ideally all servers are
    always on, never miss entries, and just keep appending. But “ideal”
    won’t always happen.

Similarly, there is unbounded and unreused channel creation for
notifying s.Apply() when the user-submitted message(s) finish.
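
The second option might look something like the sketch below: keep a bounded window of recent entries and track the log index of the first one. Everything here (the window type and its methods) is hypothetical, and reading older entries back from disk is deliberately left out.

```go
package main

import "fmt"

// window keeps only the most recent max entries in memory. offset is
// the log index of entries[0]. Older entries are assumed to be
// readable from disk, which this sketch does not implement.
type window struct {
	offset  uint64
	max     int
	entries []string
}

func (w *window) append(e string) {
	w.entries = append(w.entries, e)
	if over := len(w.entries) - w.max; over > 0 {
		// Copy so the dropped prefix can be garbage collected.
		w.entries = append([]string(nil), w.entries[over:]...)
		w.offset += uint64(over)
	}
}

// get returns the entry at log index i, or false if it is no longer
// (or not yet) in memory.
func (w *window) get(i uint64) (string, bool) {
	if i < w.offset || i >= w.offset+uint64(len(w.entries)) {
		return "", false
	}
	return w.entries[i-w.offset], true
}

func main() {
	w := &window{max: 3}
	for _, e := range []string{"a", "b", "c", "d", "e"} {
		w.append(e)
	}
	fmt.Println(w.get(1))
	fmt.Println(w.get(4))
}
```

Copying on every append past the limit is written for clarity, not speed. A ring buffer would avoid the copy.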

net/rpc and encoding/gob

In the persist() section above I already mentioned how I prototyped
this using Go’s builtin gob encoding. And I mentioned how inefficient
it was. It’s also pretty slow, and I learned that because net/rpc
uses it, and after everything I did net/rpc started to be the
bottleneck in my benchmarks. This isn’t incredibly surprising.

So a future version of this code might implement its own protocol and
own encoding (like we did for disk) on top of TCP rather than use
net/rpc.


Everyone wants to know how a distributed algorithm does against
Jepsen, which tests
linearizability of distributed systems in the face of network and
process faults.

But the setup is not trivial so I haven’t hooked it up to this project
yet. This would be a good area for future work.

Election timeout and the environment

One thing I noticed as I was trying out alternatives to net/rpc
(alternatives that injected latency to simulate a bad environment) is
that election timeouts should probably be tuned with the latency of the
cluster in mind.

If the election timeout is 300ms but the latency of the
cluster is near 1s, you’re going to have nonstop leader election.

When I adjusted the election timeout to be 2s when the latency
of the cluster is near 1s, everything was fine. Maybe this means
there’s a bug in my code but I don’t think so.

Client request serial identifier

One major part of the Raft protocol I did not cover is that the client
is supposed to send a serial identifier for each message sent to the
cluster. This is to make sure that messages are not accidentally
duplicated at any level of the entire software stack.

Diego Ongaro’s thesis
goes into more detail about this than the Raft paper. Search in that
PDF for “session”.

Again, I have just completely ignored the possibility of duplicate messages
in this implementation so far.
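
For a rough idea of what handling this could look like, here is a sketch of a state machine wrapper that remembers the last serial number it applied for each client. All names (request, session, dedupingMachine) are hypothetical. It assumes each client has at most one outstanding request, and it caches only the most recent result per client.

```go
package main

import "fmt"

// request carries a hypothetical client ID and per-client serial
// number alongside the command.
type request struct {
	ClientID uint64
	Serial   uint64
	Command  string
}

type session struct {
	lastSerial uint64
	lastResult string
}

// dedupingMachine wraps a toy state machine and remembers the most
// recent serial number applied for each client.
type dedupingMachine struct {
	sessions map[uint64]session
	applied  int // Number of commands that actually ran.
}

func (m *dedupingMachine) Apply(r request) string {
	s, seen := m.sessions[r.ClientID]
	if seen && r.Serial <= s.lastSerial {
		// Duplicate (e.g. a client retry): return the cached result.
		return s.lastResult
	}
	m.applied++
	result := fmt.Sprintf("ok:%s", r.Command)
	m.sessions[r.ClientID] = session{lastSerial: r.Serial, lastResult: result}
	return result
}

func main() {
	m := &dedupingMachine{sessions: map[uint64]session{}}
	fmt.Println(m.Apply(request{1, 1, "set x 1"}))
	fmt.Println(m.Apply(request{1, 1, "set x 1"})) // retry, not re-applied
	fmt.Println(m.applied)
}
```

Because the session table lives inside the replicated state machine, every server would make the same deduplication decision when applying the same log.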


Finally, I could not have done this without a bunch of internet
help. This project took me about 7 months in total. The first 5 months
I was trying to figure it out mostly on my own, just looking at the
Raft paper.

The biggest breakthrough came from discovering the author of Raft’s
TLA+ spec for Raft. Formal methods sound scary but it was actually not
too bad! This was the first “implementation” of Raft that was in a
single file of code. And under 500 lines.

Jack Vanlightly’s guide to reading TLA+ helped a bunch.

Finally, I needed to peek at other implementations, especially to figure
out locking and avoiding deadlocks.

Here’s everything that helped me out.

And useful implementations I looked at for inspiration and clarity.

  • Hashicorp’s Raft implementation in Go: Although it’s often quite complicated to learn from since it actually is intended for production.
  • Eli Bendersky’s Raft implementation in Go: Although it gets confusing because it uses negative numbers for terms whereas the paper does not.
  • Jing Yang’s Raft implementation in Rust.

And I haven’t tried these but they look cool:

