Liftbridge relies on Raft for metadata replication and leadership election. However, for replication of streams, rather than using Raft or other quorum-based techniques, we use a technique similar to Kafka. For each stream partition, we maintain an in-sync replica set (ISR), which is all of the replicas currently up to date with the leader (at partition creation time, this is all of the replicas). During replication, the leader writes messages to a write-ahead log, and we only wait on replicas in the ISR before committing. If a replica falls behind or fails, it's removed from the ISR. If the leader fails, any replica in the ISR can take its place. If a failed replica catches back up, it rejoins the ISR. The general partition replication process is as follows:
- Client creates a partition with a
- Metadata leader selects
nreplicas to participate and one leader at random (this comprises the initial ISR).
- Metadata leader replicates the partition via Raft to the entire cluster. This
process also assigns a monotonically increasing
Epochto the partition as well as a
LeaderEpoch. All replicas store a durable cache of each
LeaderEpochand the offset of the first message for the epoch used for recovery purposes described below.
- The nodes participating in the partition initialize it, and the leader subscribes to the NATS subject.
- The leader initializes the high watermark (
HW) to -1. This is the offset of the last committed message in the partition (-1 meaning, initially, no messages are committed).
- The leader begins sequencing messages from NATS and writes them to the log uncommitted.
- Followers consume from the leader's log to replicate messages to their own
log. We piggyback the leader's
LeaderEpochon these responses. Replicas update their
LeaderEpochcache as needed and periodically checkpoint the
HWto stable storage. If the
LeaderEpochon the response does not match the expected
LeaderEpoch, the follower drops the response. The same is true if the offset of the first message in the response is less than the expected next offset. The follower uses the leader's
HWto update their own
- Followers acknowledge they've replicated the message via replication requests to the leader.
- Once the leader has heard from the ISR, the message is committed and the
Note that clients only see committed messages in the log.
When a follower has caught up with the leader's log (i.e. it reaches the
leader's log end offset, or LEO), the leader will register a data waiter for
the follower on the log. This waiter is used to signal when new messages have
been written to the log. Upon catching up, followers will receive an empty
replication response from the leader and sleep for a duration
replica.max.idle.wait to avoid overloading the leader with replication
requests. After sleeping, they will make subsequent replication requests. Even
if data is not available, this will ensure timely health checks occur to avoid
spurious ISR shrinking. If new messages are written while a follower is idle,
the data waiter is signalled which causes the leader to send a notification to
the follower to preempt the sleep and begin replicating again.
There are a variety of failures that can occur in the replication process. A few of them are described below along with how they are mitigated.
Partition Leader Failure
If a follower suspects the leader has failed, it will notify the metadata
leader. If the metadata leader receives a notification from the majority of the
ISR within a bounded period, it will select a new leader for the partition, apply
this update to the Raft group, and notify the replica set. Leader reports
include the current partition
LeaderEpoch which is checked against
by the metadata leader.
Committed messages are always preserved during a leadership change, but
uncommitted messages could be lost. When a partition leader is changed, the
Epoch is incremented.
Metadata Leader Failure
If the metadata leader fails, Raft will handle electing a new leader. The metadata Raft group stores the leader and ISR for every stream, so failover of the metadata leader is not a problem.
Partition Follower Failure
If the partition leader detects that a follower has failed or fallen too far
behind, it removes the replica from the ISR by notifying the metadata leader.
This results in incrementing the partition's
Epoch. The metadata leader
replicates this fact via Raft. The partition leader continues to commit messages
with fewer replicas in the ISR, entering an under-replicated state.
When a failed follower is restarted, it requests the last offset for the
LeaderEpoch from the partition leader, called a
request. It then truncates its log up to this offset, which removes any
potentially uncommitted messages from the log. The replica then begins
fetching messages from the leader starting at this offset. Once the replica has
caught up, it's added back into the ISR and the system resumes its fully
replicated state. Adding a replica back into the ISR results in incrementing
We truncate uncommitted messages from the log using the
than the HW because there are certain edge cases that can result in data loss
or divergent log lineages with the latter. This has been described in
by the maintainers of Kafka.
Replication RPC Protocol
Replication RPCs are made over internal NATS subjects. Replication requests for
a partition are sent to
<namespace>.<stream>.<partition>.replicate which is a
subject the partition leader subscribes to. Request and response payloads are
prefixed with the Liftbridge envelope header. The
request data is a protobuf
containing the ID of the follower and the offset they want to begin fetching
from. The NATS message also includes a random reply
leader uses to send the response to.
The leader response uses a binary format consisting of the following:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+... | LeaderEpoch | HW |... +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+... |<-------------------8 bytes------------------->|<-------------------8 bytes------------------->|
The remainder of the response consists of message data. If the follower is
caught up, there won't be any message data. The
always present, so there are 16 bytes guaranteed in the response after the
LeaderEpoch offset requests also use internal NATS subjects similar to
replication RPCs. These requests are sent to
<namespace>.<stream>.<partition>.offset. The request and response both use a
LeaderEpoch and offset, respectively. Like replication,
responses are sent to a random reply subject included on the NATS request
Notifications of new data are sent on the NATS subject
<namespace>.notify.<serverID>. This is also a protobuf containing the stream
name and ID of the partition with new data available. Upon receiving this
notification, the follower preempts the replication loop for the respective
partition, if idle, to begin replicating.