feat: add stream replica groups and leader stepdown
This commit is contained in:
65
src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs
Normal file
65
src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
using NATS.Server.Raft;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream.Cluster;
|
||||||
|
|
||||||
|
public sealed class StreamReplicaGroup
|
||||||
|
{
|
||||||
|
private readonly List<RaftNode> _nodes;
|
||||||
|
|
||||||
|
public string StreamName { get; }
|
||||||
|
public IReadOnlyList<RaftNode> Nodes => _nodes;
|
||||||
|
public RaftNode Leader { get; private set; }
|
||||||
|
|
||||||
|
public StreamReplicaGroup(string streamName, int replicas)
|
||||||
|
{
|
||||||
|
StreamName = streamName;
|
||||||
|
|
||||||
|
var nodeCount = Math.Max(replicas, 1);
|
||||||
|
_nodes = Enumerable.Range(1, nodeCount)
|
||||||
|
.Select(i => new RaftNode($"{streamName.ToLowerInvariant()}-r{i}"))
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
foreach (var node in _nodes)
|
||||||
|
node.ConfigureCluster(_nodes);
|
||||||
|
|
||||||
|
Leader = ElectLeader(_nodes[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (!Leader.IsLeader)
|
||||||
|
Leader = ElectLeader(SelectNextCandidate(Leader));
|
||||||
|
|
||||||
|
return await Leader.ProposeAsync(command, ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task StepDownAsync(CancellationToken ct)
|
||||||
|
{
|
||||||
|
_ = ct;
|
||||||
|
var previous = Leader;
|
||||||
|
previous.RequestStepDown();
|
||||||
|
Leader = ElectLeader(SelectNextCandidate(previous));
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private RaftNode SelectNextCandidate(RaftNode currentLeader)
|
||||||
|
{
|
||||||
|
if (_nodes.Count == 1)
|
||||||
|
return _nodes[0];
|
||||||
|
|
||||||
|
var index = _nodes.FindIndex(n => n.Id == currentLeader.Id);
|
||||||
|
if (index < 0)
|
||||||
|
return _nodes[0];
|
||||||
|
|
||||||
|
return _nodes[(index + 1) % _nodes.Count];
|
||||||
|
}
|
||||||
|
|
||||||
|
private RaftNode ElectLeader(RaftNode candidate)
|
||||||
|
{
|
||||||
|
candidate.StartElection(_nodes.Count);
|
||||||
|
foreach (var voter in _nodes.Where(n => n.Id != candidate.Id))
|
||||||
|
candidate.ReceiveVote(voter.GrantVote(candidate.Term), _nodes.Count);
|
||||||
|
|
||||||
|
return candidate;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -14,6 +14,8 @@ public sealed class StreamManager
|
|||||||
private readonly JetStreamMetaGroup? _metaGroup;
|
private readonly JetStreamMetaGroup? _metaGroup;
|
||||||
private readonly ConcurrentDictionary<string, StreamHandle> _streams =
|
private readonly ConcurrentDictionary<string, StreamHandle> _streams =
|
||||||
new(StringComparer.Ordinal);
|
new(StringComparer.Ordinal);
|
||||||
|
private readonly ConcurrentDictionary<string, StreamReplicaGroup> _replicaGroups =
|
||||||
|
new(StringComparer.Ordinal);
|
||||||
private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin =
|
private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin =
|
||||||
new(StringComparer.Ordinal);
|
new(StringComparer.Ordinal);
|
||||||
private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin =
|
private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin =
|
||||||
@@ -36,6 +38,12 @@ public sealed class StreamManager
|
|||||||
normalized.Name,
|
normalized.Name,
|
||||||
_ => new StreamHandle(normalized, new MemStore()),
|
_ => new StreamHandle(normalized, new MemStore()),
|
||||||
(_, existing) => existing with { Config = normalized });
|
(_, existing) => existing with { Config = normalized });
|
||||||
|
_replicaGroups.AddOrUpdate(
|
||||||
|
normalized.Name,
|
||||||
|
_ => new StreamReplicaGroup(normalized.Name, normalized.Replicas),
|
||||||
|
(_, existing) => existing.Nodes.Count == Math.Max(normalized.Replicas, 1)
|
||||||
|
? existing
|
||||||
|
: new StreamReplicaGroup(normalized.Name, normalized.Replicas));
|
||||||
RebuildReplicationCoordinators();
|
RebuildReplicationCoordinators();
|
||||||
_metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult();
|
_metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult();
|
||||||
|
|
||||||
@@ -77,6 +85,9 @@ public sealed class StreamManager
|
|||||||
if (stream == null)
|
if (stream == null)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
|
if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
|
||||||
|
_ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
|
||||||
|
|
||||||
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
|
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
|
||||||
EnforceLimits(stream);
|
EnforceLimits(stream);
|
||||||
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
|
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
|
||||||
@@ -90,6 +101,14 @@ public sealed class StreamManager
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Task StepDownStreamLeaderAsync(string stream, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (_replicaGroups.TryGetValue(stream, out var replicaGroup))
|
||||||
|
return replicaGroup.StepDownAsync(ct);
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
private static StreamConfig NormalizeConfig(StreamConfig config)
|
private static StreamConfig NormalizeConfig(StreamConfig config)
|
||||||
{
|
{
|
||||||
var copy = new StreamConfig
|
var copy = new StreamConfig
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ public sealed class RaftNode
|
|||||||
|
|
||||||
public string Id { get; }
|
public string Id { get; }
|
||||||
public int Term => TermState.CurrentTerm;
|
public int Term => TermState.CurrentTerm;
|
||||||
|
public bool IsLeader => Role == RaftRole.Leader;
|
||||||
public RaftRole Role { get; private set; } = RaftRole.Follower;
|
public RaftRole Role { get; private set; } = RaftRole.Follower;
|
||||||
public RaftTermState TermState { get; } = new();
|
public RaftTermState TermState { get; } = new();
|
||||||
public long AppliedIndex { get; set; }
|
public long AppliedIndex { get; set; }
|
||||||
@@ -99,6 +100,8 @@ public sealed class RaftNode
|
|||||||
public void RequestStepDown()
|
public void RequestStepDown()
|
||||||
{
|
{
|
||||||
Role = RaftRole.Follower;
|
Role = RaftRole.Follower;
|
||||||
|
_votesReceived = 0;
|
||||||
|
TermState.VotedFor = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void TryBecomeLeader(int clusterSize)
|
private void TryBecomeLeader(int clusterSize)
|
||||||
|
|||||||
71
tests/NATS.Server.Tests/JetStreamStreamReplicaGroupTests.cs
Normal file
71
tests/NATS.Server.Tests/JetStreamStreamReplicaGroupTests.cs
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
using System.Text;
|
||||||
|
using NATS.Server.JetStream;
|
||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
using NATS.Server.JetStream.Publish;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class JetStreamStreamReplicaGroupTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Leader_stepdown_preserves_stream_write_availability_after_new_election()
|
||||||
|
{
|
||||||
|
await using var fixture = await JetStreamReplicaFixture.StartAsync(nodes: 3);
|
||||||
|
await fixture.CreateStreamAsync("ORDERS", replicas: 3);
|
||||||
|
|
||||||
|
await fixture.StepDownStreamLeaderAsync("ORDERS");
|
||||||
|
var ack = await fixture.PublishAndGetAckAsync("orders.created", "1");
|
||||||
|
|
||||||
|
ack.Stream.ShouldBe("ORDERS");
|
||||||
|
ack.Seq.ShouldBeGreaterThan((ulong)0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal sealed class JetStreamReplicaFixture : IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly StreamManager _streamManager;
|
||||||
|
private readonly JetStreamPublisher _publisher;
|
||||||
|
|
||||||
|
private JetStreamReplicaFixture(StreamManager streamManager)
|
||||||
|
{
|
||||||
|
_streamManager = streamManager;
|
||||||
|
_publisher = new JetStreamPublisher(_streamManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Task<JetStreamReplicaFixture> StartAsync(int nodes)
|
||||||
|
{
|
||||||
|
_ = nodes;
|
||||||
|
var streamManager = new StreamManager();
|
||||||
|
return Task.FromResult(new JetStreamReplicaFixture(streamManager));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task CreateStreamAsync(string name, int replicas)
|
||||||
|
{
|
||||||
|
var response = _streamManager.CreateOrUpdate(new StreamConfig
|
||||||
|
{
|
||||||
|
Name = name,
|
||||||
|
Subjects = ["orders.*"],
|
||||||
|
Replicas = replicas,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (response.Error is not null)
|
||||||
|
throw new InvalidOperationException(response.Error.Description);
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task StepDownStreamLeaderAsync(string stream)
|
||||||
|
{
|
||||||
|
return _streamManager.StepDownStreamLeaderAsync(stream, default);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload)
|
||||||
|
{
|
||||||
|
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
|
||||||
|
return Task.FromResult(ack);
|
||||||
|
|
||||||
|
throw new InvalidOperationException("Publish did not match a stream.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user