diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs index bb4241b..9ca53c9 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs @@ -102,15 +102,11 @@ public sealed class JetStreamApiRouter public JetStreamApiResponse Route(string subject, ReadOnlySpan payload) { - // TODO: Re-enable leader check once ForwardToLeader is implemented with actual - // request forwarding to the leader node. Currently ForwardToLeader is a stub that - // returns a not-leader error, which breaks single-node simulation tests where - // the meta group's selfIndex doesn't track the rotating leader. // Go reference: jetstream_api.go:200-300 — leader check + forwarding. - // if (_metaGroup is not null && IsLeaderRequired(subject) && !_metaGroup.IsLeader()) - // { - // return ForwardToLeader(subject, payload, _metaGroup.Leader); - // } + if (_metaGroup is not null && IsLeaderRequired(subject) && !_metaGroup.IsLeader()) + { + return ForwardToLeader(subject, payload, _metaGroup.Leader); + } if (subject.Equals(JetStreamApiSubjects.Info, StringComparison.Ordinal)) return AccountApiHandlers.HandleInfo(_streamManager, _consumerManager); diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index b2e9783..1732ec2 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -12,7 +12,7 @@ namespace NATS.Server.JetStream.Cluster; public sealed class JetStreamMetaGroup { private readonly int _nodes; - private readonly int _selfIndex; + private int _selfIndex; // Backward-compatible stream name set used by existing GetState().Streams. private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); @@ -50,6 +50,13 @@ public sealed class JetStreamMetaGroup /// public bool IsLeader() => _leaderIndex == _selfIndex; + /// + /// Simulates this node winning the leader election after a stepdown. + /// Used in single-process test fixtures where only one "node" exists. + /// Go reference: jetstream_cluster.go — after stepdown, a new leader is elected. + /// + public void BecomeLeader() => _selfIndex = _leaderIndex; + /// /// Returns the leader identifier string, e.g. "meta-1". /// Used to populate the leader_hint field in not-leader error responses. diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamMetaControllerTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamMetaControllerTests.cs index 1f7806d..eac7700 100644 --- a/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamMetaControllerTests.cs +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamMetaControllerTests.cs @@ -625,7 +625,17 @@ internal sealed class MetaControllerFixture : IAsyncDisposable public MetaGroupState GetMetaState() => _metaGroup.GetState(); public Task RequestAsync(string subject, string payload) - => Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload))); + { + var response = _router.Route(subject, Encoding.UTF8.GetBytes(payload)); + + // In a real cluster, after stepdown a new leader is elected. + // Simulate this node becoming the new leader so subsequent mutating + // operations through the router succeed. + if (subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal) && response.Success) + _metaGroup.BecomeLeader(); + + return Task.FromResult(response); + } public ValueTask DisposeAsync() => ValueTask.CompletedTask; }