Raft: snapshot creation/installation and a follower catch-up test.

NOTE(review): this patch was reconstructed from a whitespace-collapsed copy.
Line breaks, indentation, blank context lines and generic type arguments
(List<RaftLogEntry>, List<RaftNode>, Task<RaftSnapshot>, Task<RaftSnapshot?>)
were restored by inference from the visible code. Run `git apply --check`
before applying. In particular, the blank context line in the ElectLeaderAsync
hunk may belong before `foreach` rather than before `return`.

Changes relative to the collapsed original:
- RaftNode.InstallSnapshotAsync saves the snapshot before touching the log or
  AppliedIndex, so a failed or cancelled save leaves the node unchanged.
- RaftNode.CreateSnapshotAsync records the term of the entry at AppliedIndex
  (falling back to the current term when that entry is not in the log).
- RaftSnapshotStore honours the CancellationToken; null snapshots are rejected.
- The first RaftNode hunk carries less leading context, because the element
  type of `_cluster` could not be recovered.
- The `index` blob ids are the original ones and do not describe the modified
  post-images.

diff --git a/src/NATS.Server/Raft/RaftLog.cs b/src/NATS.Server/Raft/RaftLog.cs
index 949cd9f..47e6bd3 100644
--- a/src/NATS.Server/Raft/RaftLog.cs
+++ b/src/NATS.Server/Raft/RaftLog.cs
@@ -3,12 +3,14 @@ namespace NATS.Server.Raft;
 public sealed class RaftLog
 {
     private readonly List<RaftLogEntry> _entries = [];
+    // Index of the last entry covered by an installed snapshot; 0 until one is installed.
+    private long _baseIndex;
 
     public IReadOnlyList<RaftLogEntry> Entries => _entries;
 
     public RaftLogEntry Append(int term, string command)
     {
-        var entry = new RaftLogEntry(_entries.Count + 1, term, command);
+        var entry = new RaftLogEntry(_baseIndex + _entries.Count + 1, term, command);
         _entries.Add(entry);
         return entry;
     }
@@ -20,6 +22,19 @@ public sealed class RaftLog
 
         _entries.Add(entry);
     }
+
+    /// <summary>
+    /// Discards every in-memory entry and rebases the log on <paramref name="snapshot"/>,
+    /// so the next appended entry is numbered <c>LastIncludedIndex + 1</c>.
+    /// </summary>
+    /// <exception cref="ArgumentNullException"><paramref name="snapshot"/> is null.</exception>
+    public void ReplaceWithSnapshot(RaftSnapshot snapshot)
+    {
+        ArgumentNullException.ThrowIfNull(snapshot);
+
+        _entries.Clear();
+        _baseIndex = snapshot.LastIncludedIndex;
+    }
 }
 
 public sealed record RaftLogEntry(long Index, int Term, string Command);
diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs
index ecb89e7..e39ce91 100644
--- a/src/NATS.Server/Raft/RaftNode.cs
+++ b/src/NATS.Server/Raft/RaftNode.cs
@@ -7,4 +7,5 @@ public sealed class RaftNode
     private readonly RaftReplicator _replicator = new();
+    private readonly RaftSnapshotStore _snapshotStore = new();
 
     public string Id { get; }
     public int Term => TermState.CurrentTerm;
@@ -77,6 +78,50 @@ public sealed class RaftNode
         Log.AppendReplicated(entry);
     }
 
+    /// <summary>
+    /// Builds a snapshot positioned at <see cref="AppliedIndex"/> and saves it to the snapshot store.
+    /// </summary>
+    /// <param name="ct">Cancellation token passed through to the store.</param>
+    /// <returns>The snapshot that was saved.</returns>
+    public async Task<RaftSnapshot> CreateSnapshotAsync(CancellationToken ct)
+    {
+        var snapshot = new RaftSnapshot
+        {
+            LastIncludedIndex = AppliedIndex,
+            LastIncludedTerm = TermOfAppliedEntry(),
+        };
+        await _snapshotStore.SaveAsync(snapshot, ct);
+        return snapshot;
+    }
+
+    /// <summary>
+    /// Saves <paramref name="snapshot"/>, then replaces the log with it and moves
+    /// <see cref="AppliedIndex"/> to the snapshot's last included index.
+    /// </summary>
+    /// <exception cref="ArgumentNullException"><paramref name="snapshot"/> is null.</exception>
+    public async Task InstallSnapshotAsync(RaftSnapshot snapshot, CancellationToken ct)
+    {
+        ArgumentNullException.ThrowIfNull(snapshot);
+
+        // Save before mutating so a failed or cancelled save leaves the log and AppliedIndex untouched.
+        await _snapshotStore.SaveAsync(snapshot, ct);
+        Log.ReplaceWithSnapshot(snapshot);
+        AppliedIndex = snapshot.LastIncludedIndex;
+    }
+
+    // Term recorded on the log entry at AppliedIndex; falls back to the current term when that
+    // entry is not in the log (nothing applied yet, or the log was replaced by a snapshot).
+    private int TermOfAppliedEntry()
+    {
+        foreach (var entry in Log.Entries)
+        {
+            if (entry.Index == AppliedIndex)
+                return entry.Term;
+        }
+
+        return Term;
+    }
+
     public void RequestStepDown()
     {
         Role = RaftRole.Follower;
diff --git a/src/NATS.Server/Raft/RaftSnapshot.cs b/src/NATS.Server/Raft/RaftSnapshot.cs
new file mode 100644
index 0000000..9e8c99b
--- /dev/null
+++ b/src/NATS.Server/Raft/RaftSnapshot.cs
@@ -0,0 +1,8 @@
+namespace NATS.Server.Raft;
+
+public sealed class RaftSnapshot
+{
+    public long LastIncludedIndex { get; init; }
+    public int LastIncludedTerm { get; init; }
+    public byte[] Data { get; init; } = [];
+}
diff --git a/src/NATS.Server/Raft/RaftSnapshotStore.cs b/src/NATS.Server/Raft/RaftSnapshotStore.cs
new file mode 100644
index 0000000..0fd98ca
--- /dev/null
+++ b/src/NATS.Server/Raft/RaftSnapshotStore.cs
@@ -0,0 +1,29 @@
+namespace NATS.Server.Raft;
+
+/// <summary>
+/// In-memory snapshot holder that keeps only the most recently saved snapshot.
+/// </summary>
+public sealed class RaftSnapshotStore
+{
+    private RaftSnapshot? _snapshot;
+
+    /// <summary>Replaces the stored snapshot with <paramref name="snapshot"/>.</summary>
+    /// <exception cref="ArgumentNullException"><paramref name="snapshot"/> is null.</exception>
+    public Task SaveAsync(RaftSnapshot snapshot, CancellationToken ct)
+    {
+        ArgumentNullException.ThrowIfNull(snapshot);
+        if (ct.IsCancellationRequested)
+            return Task.FromCanceled(ct);
+
+        _snapshot = snapshot;
+        return Task.CompletedTask;
+    }
+
+    /// <summary>Returns the last saved snapshot, or null when nothing has been saved.</summary>
+    public Task<RaftSnapshot?> LoadAsync(CancellationToken ct)
+    {
+        return ct.IsCancellationRequested
+            ? Task.FromCanceled<RaftSnapshot?>(ct)
+            : Task.FromResult(_snapshot);
+    }
+}
diff --git a/tests/NATS.Server.Tests/RaftElectionTests.cs b/tests/NATS.Server.Tests/RaftElectionTests.cs
index a81cc13..a93a155 100644
--- a/tests/NATS.Server.Tests/RaftElectionTests.cs
+++ b/tests/NATS.Server.Tests/RaftElectionTests.cs
@@ -18,10 +18,14 @@ public class RaftElectionTests
 internal sealed class RaftTestCluster
 {
     public List<RaftNode> Nodes { get; }
+    public RaftNode Leader { get; private set; }
+    public RaftNode LaggingFollower { get; private set; }
 
     private RaftTestCluster(List<RaftNode> nodes)
     {
         Nodes = nodes;
+        Leader = nodes[0];
+        LaggingFollower = nodes[^1];
     }
 
     public static RaftTestCluster Create(int nodes)
@@ -40,6 +44,7 @@ internal sealed class RaftTestCluster
         foreach (var voter in Nodes.Skip(1))
             candidate.ReceiveVote(voter.GrantVote(candidate.Term));
 
+        Leader = candidate;
         return Task.FromResult(candidate);
     }
 
@@ -54,4 +59,27 @@ internal sealed class RaftTestCluster
             await Task.Delay(20, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
         }
     }
+
+    // Elects a leader and proposes `count` commands ("cmd-0", "cmd-1", ...) through it.
+    public async Task GenerateCommittedEntriesAsync(int count)
+    {
+        var leader = await ElectLeaderAsync();
+        for (int i = 0; i < count; i++)
+            _ = await leader.ProposeAsync($"cmd-{i}", default);
+    }
+
+    // Re-selects the last node as the lagging follower and resets its AppliedIndex to 0.
+    public Task RestartLaggingFollowerAsync()
+    {
+        LaggingFollower = Nodes[^1];
+        LaggingFollower.AppliedIndex = 0;
+        return Task.CompletedTask;
+    }
+
+    // Has the leader create a snapshot and installs it on the lagging follower.
+    public async Task WaitForFollowerCatchupAsync()
+    {
+        var snapshot = await Leader.CreateSnapshotAsync(default);
+        await LaggingFollower.InstallSnapshotAsync(snapshot, default);
+    }
 }
diff --git a/tests/NATS.Server.Tests/RaftSnapshotCatchupTests.cs b/tests/NATS.Server.Tests/RaftSnapshotCatchupTests.cs
new file mode 100644
index 0000000..e7245cc
--- /dev/null
+++ b/tests/NATS.Server.Tests/RaftSnapshotCatchupTests.cs
@@ -0,0 +1,16 @@
+namespace NATS.Server.Tests;
+
+public class RaftSnapshotCatchupTests
+{
+    [Fact]
+    public async Task Lagging_follower_catches_up_via_snapshot()
+    {
+        var cluster = RaftTestCluster.Create(3);
+        await cluster.GenerateCommittedEntriesAsync(500);
+
+        await cluster.RestartLaggingFollowerAsync();
+        await cluster.WaitForFollowerCatchupAsync();
+
+        cluster.LaggingFollower.AppliedIndex.ShouldBe(cluster.Leader.AppliedIndex);
+    }
+}