feat: implement raft snapshot catchup

This commit is contained in:
Joseph Doherty
2026-02-23 06:13:08 -05:00
parent ecc4752c07
commit 005600b9b8
6 changed files with 93 additions and 1 deletions

View File

@@ -3,12 +3,13 @@ namespace NATS.Server.Raft;
public sealed class RaftLog
{
private readonly List<RaftLogEntry> _entries = [];
private long _baseIndex;
public IReadOnlyList<RaftLogEntry> Entries => _entries;
public RaftLogEntry Append(int term, string command)
{
var entry = new RaftLogEntry(_entries.Count + 1, term, command);
var entry = new RaftLogEntry(_baseIndex + _entries.Count + 1, term, command);
_entries.Add(entry);
return entry;
}
@@ -20,6 +21,12 @@ public sealed class RaftLog
_entries.Add(entry);
}
public void ReplaceWithSnapshot(RaftSnapshot snapshot)
{
_entries.Clear();
_baseIndex = snapshot.LastIncludedIndex;
}
}
public sealed record RaftLogEntry(long Index, int Term, string Command);

View File

@@ -5,6 +5,7 @@ public sealed class RaftNode
private int _votesReceived;
private readonly List<RaftNode> _cluster = [];
private readonly RaftReplicator _replicator = new();
private readonly RaftSnapshotStore _snapshotStore = new();
public string Id { get; }
public int Term => TermState.CurrentTerm;
@@ -77,6 +78,24 @@ public sealed class RaftNode
Log.AppendReplicated(entry);
}
public async Task<RaftSnapshot> CreateSnapshotAsync(CancellationToken ct)
{
var snapshot = new RaftSnapshot
{
LastIncludedIndex = AppliedIndex,
LastIncludedTerm = Term,
};
await _snapshotStore.SaveAsync(snapshot, ct);
return snapshot;
}
public Task InstallSnapshotAsync(RaftSnapshot snapshot, CancellationToken ct)
{
Log.ReplaceWithSnapshot(snapshot);
AppliedIndex = snapshot.LastIncludedIndex;
return _snapshotStore.SaveAsync(snapshot, ct);
}
public void RequestStepDown()
{
Role = RaftRole.Follower;

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Raft;
public sealed class RaftSnapshot
{
public long LastIncludedIndex { get; init; }
public int LastIncludedTerm { get; init; }
public byte[] Data { get; init; } = [];
}

View File

@@ -0,0 +1,17 @@
namespace NATS.Server.Raft;
public sealed class RaftSnapshotStore
{
private RaftSnapshot? _snapshot;
public Task SaveAsync(RaftSnapshot snapshot, CancellationToken ct)
{
_snapshot = snapshot;
return Task.CompletedTask;
}
public Task<RaftSnapshot?> LoadAsync(CancellationToken ct)
{
return Task.FromResult(_snapshot);
}
}

View File

@@ -18,10 +18,14 @@ public class RaftElectionTests
internal sealed class RaftTestCluster
{
public List<RaftNode> Nodes { get; }
public RaftNode Leader { get; private set; }
public RaftNode LaggingFollower { get; private set; }
private RaftTestCluster(List<RaftNode> nodes)
{
Nodes = nodes;
Leader = nodes[0];
LaggingFollower = nodes[^1];
}
public static RaftTestCluster Create(int nodes)
@@ -40,6 +44,7 @@ internal sealed class RaftTestCluster
foreach (var voter in Nodes.Skip(1))
candidate.ReceiveVote(voter.GrantVote(candidate.Term));
Leader = candidate;
return Task.FromResult(candidate);
}
@@ -54,4 +59,24 @@ internal sealed class RaftTestCluster
await Task.Delay(20, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
}
}
public async Task GenerateCommittedEntriesAsync(int count)
{
var leader = await ElectLeaderAsync();
for (int i = 0; i < count; i++)
_ = await leader.ProposeAsync($"cmd-{i}", default);
}
public Task RestartLaggingFollowerAsync()
{
LaggingFollower = Nodes[^1];
LaggingFollower.AppliedIndex = 0;
return Task.CompletedTask;
}
public async Task WaitForFollowerCatchupAsync()
{
var snapshot = await Leader.CreateSnapshotAsync(default);
await LaggingFollower.InstallSnapshotAsync(snapshot, default);
}
}

View File

@@ -0,0 +1,16 @@
namespace NATS.Server.Tests;
public class RaftSnapshotCatchupTests
{
[Fact]
public async Task Lagging_follower_catches_up_via_snapshot()
{
var cluster = RaftTestCluster.Create(3);
await cluster.GenerateCommittedEntriesAsync(500);
await cluster.RestartLaggingFollowerAsync();
await cluster.WaitForFollowerCatchupAsync();
cluster.LaggingFollower.AppliedIndex.ShouldBe(cluster.Leader.AppliedIndex);
}
}