diff --git a/src/NATS.Server/Raft/RaftLog.cs b/src/NATS.Server/Raft/RaftLog.cs new file mode 100644 index 0000000..949cd9f --- /dev/null +++ b/src/NATS.Server/Raft/RaftLog.cs @@ -0,0 +1,25 @@ +namespace NATS.Server.Raft; + +public sealed class RaftLog +{ + private readonly List _entries = []; + + public IReadOnlyList Entries => _entries; + + public RaftLogEntry Append(int term, string command) + { + var entry = new RaftLogEntry(_entries.Count + 1, term, command); + _entries.Add(entry); + return entry; + } + + public void AppendReplicated(RaftLogEntry entry) + { + if (_entries.Any(e => e.Index == entry.Index)) + return; + + _entries.Add(entry); + } +} + +public sealed record RaftLogEntry(long Index, int Term, string Command); diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index b49f005..ecb89e7 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -3,18 +3,27 @@ namespace NATS.Server.Raft; public sealed class RaftNode { private int _votesReceived; + private readonly List _cluster = []; + private readonly RaftReplicator _replicator = new(); public string Id { get; } public int Term => TermState.CurrentTerm; public RaftRole Role { get; private set; } = RaftRole.Follower; public RaftTermState TermState { get; } = new(); public long AppliedIndex { get; set; } + public RaftLog Log { get; } = new(); public RaftNode(string id) { Id = id; } + public void ConfigureCluster(IEnumerable peers) + { + _cluster.Clear(); + _cluster.AddRange(peers); + } + public void StartElection(int clusterSize) { Role = RaftRole.Candidate; @@ -42,15 +51,41 @@ public sealed class RaftNode TryBecomeLeader(clusterSize); } - private void TryBecomeLeader(int clusterSize) + public async ValueTask ProposeAsync(string command, CancellationToken ct) { - var quorum = (clusterSize / 2) + 1; - if (_votesReceived >= quorum) - Role = RaftRole.Leader; + if (Role != RaftRole.Leader) + throw new InvalidOperationException("Only leader can propose entries."); + + var entry = Log.Append(TermState.CurrentTerm, command); + var followers = _cluster.Where(n => n.Id != Id).ToList(); + var acknowledgements = _replicator.Replicate(entry, followers); + + var quorum = (_cluster.Count / 2) + 1; + if (acknowledgements + 1 >= quorum) + { + AppliedIndex = entry.Index; + foreach (var node in _cluster) + node.AppliedIndex = Math.Max(node.AppliedIndex, entry.Index); + } + + await Task.CompletedTask; + return entry.Index; + } + + public void ReceiveReplicatedEntry(RaftLogEntry entry) + { + Log.AppendReplicated(entry); } public void RequestStepDown() { Role = RaftRole.Follower; } + + private void TryBecomeLeader(int clusterSize) + { + var quorum = (clusterSize / 2) + 1; + if (_votesReceived >= quorum) + Role = RaftRole.Leader; + } } diff --git a/src/NATS.Server/Raft/RaftReplicator.cs b/src/NATS.Server/Raft/RaftReplicator.cs new file mode 100644 index 0000000..e261f7c --- /dev/null +++ b/src/NATS.Server/Raft/RaftReplicator.cs @@ -0,0 +1,16 @@ +namespace NATS.Server.Raft; + +public sealed class RaftReplicator +{ + public int Replicate(RaftLogEntry entry, IReadOnlyList followers) + { + var acknowledgements = 0; + foreach (var follower in followers) + { + follower.ReceiveReplicatedEntry(entry); + acknowledgements++; + } + + return acknowledgements; + } +} diff --git a/tests/NATS.Server.Tests/RaftElectionTests.cs b/tests/NATS.Server.Tests/RaftElectionTests.cs index 9925dab..a81cc13 100644 --- a/tests/NATS.Server.Tests/RaftElectionTests.cs +++ b/tests/NATS.Server.Tests/RaftElectionTests.cs @@ -27,6 +27,8 @@ internal sealed class RaftTestCluster public static RaftTestCluster Create(int nodes) { var created = Enumerable.Range(1, nodes).Select(i => new RaftNode($"n{i}")).ToList(); + foreach (var node in created) + node.ConfigureCluster(created); return new RaftTestCluster(created); } @@ -40,4 +42,16 @@ internal sealed class RaftTestCluster return Task.FromResult(candidate); } + + public async Task WaitForAppliedAsync(long index) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + while (!timeout.IsCancellationRequested) + { + if (Nodes.All(n => n.AppliedIndex >= index)) + return; + + await Task.Delay(20, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + } + } } diff --git a/tests/NATS.Server.Tests/RaftReplicationTests.cs b/tests/NATS.Server.Tests/RaftReplicationTests.cs new file mode 100644 index 0000000..5e44717 --- /dev/null +++ b/tests/NATS.Server.Tests/RaftReplicationTests.cs @@ -0,0 +1,19 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests; + +public class RaftReplicationTests +{ + [Fact] + public async Task Leader_replicates_entry_to_quorum_and_applies() + { + var cluster = RaftTestCluster.Create(3); + var leader = await cluster.ElectLeaderAsync(); + + var idx = await leader.ProposeAsync("create-stream", default); + idx.ShouldBeGreaterThan(0); + + await cluster.WaitForAppliedAsync(idx); + cluster.Nodes.All(n => n.AppliedIndex >= idx).ShouldBeTrue(); + } +}