diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index 4cf0bad..0dc8db3 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -7,6 +7,8 @@ public sealed class JetStreamMetaGroup { private readonly int _nodes; private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); + private int _leaderIndex = 1; + private long _leadershipVersion = 1; public JetStreamMetaGroup(int nodes) { @@ -25,13 +27,18 @@ public sealed class JetStreamMetaGroup { Streams = _streams.Keys.OrderBy(x => x, StringComparer.Ordinal).ToArray(), ClusterSize = _nodes, + LeaderId = $"meta-{_leaderIndex}", + LeadershipVersion = _leadershipVersion, }; } public void StepDown() { - // Placeholder for parity API behavior; current in-memory meta group - // does not track explicit leader state. + _leaderIndex++; + if (_leaderIndex > Math.Max(_nodes, 1)) + _leaderIndex = 1; + + Interlocked.Increment(ref _leadershipVersion); } } @@ -39,4 +46,6 @@ public sealed class MetaGroupState { public IReadOnlyList Streams { get; init; } = []; public int ClusterSize { get; init; } + public string LeaderId { get; init; } = string.Empty; + public long LeadershipVersion { get; init; } } diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 4293ccf..0f67f61 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -34,6 +34,7 @@ public sealed class StreamManager } public IReadOnlyCollection StreamNames => _streams.Keys.ToArray(); + public MetaGroupState? GetMetaState() => _metaGroup?.GetState(); public IReadOnlyList ListNames() => [.. _streams.Keys.OrderBy(x => x, StringComparer.Ordinal)]; diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamMetaGovernanceStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamMetaGovernanceStrictParityTests.cs new file mode 100644 index 0000000..59686b1 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamMetaGovernanceStrictParityTests.cs @@ -0,0 +1,25 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamMetaGovernanceStrictParityTests +{ + [Fact] + public async Task Meta_and_replica_governance_actions_reflect_committed_state_transitions() + { + var meta = new JetStreamMetaGroup(3); + var before = meta.GetState(); + + await meta.ProposeCreateStreamAsync(new NATS.Server.JetStream.Models.StreamConfig + { + Name = "ORDERS_GOV", + Subjects = ["orders.gov"], + }, default); + meta.StepDown(); + var after = meta.GetState(); + + after.Streams.ShouldContain("ORDERS_GOV"); + after.LeaderId.ShouldNotBe(before.LeaderId); + after.LeadershipVersion.ShouldBe(before.LeadershipVersion + 1); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamReplicaGovernanceStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamReplicaGovernanceStrictParityTests.cs new file mode 100644 index 0000000..b2e8da8 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamReplicaGovernanceStrictParityTests.cs @@ -0,0 +1,19 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamReplicaGovernanceStrictParityTests +{ + [Fact] + public async Task Meta_and_replica_governance_actions_reflect_committed_state_transitions() + { + var group = new StreamReplicaGroup("ORDERS_GOV", replicas: 3); + var beforeLeader = group.Leader.Id; + + await group.StepDownAsync(default); + group.Leader.Id.ShouldNotBe(beforeLeader); + + var index = await group.ProposeAsync("set placement", default); + index.ShouldBeGreaterThan(0); + } +}