diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiSubjects.cs b/src/NATS.Server/JetStream/Api/JetStreamApiSubjects.cs index b6c7d24..00d8bda 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiSubjects.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiSubjects.cs @@ -32,4 +32,15 @@ public static class JetStreamApiSubjects public const string ConsumerLeaderStepdown = "$JS.API.CONSUMER.LEADER.STEPDOWN."; public const string DirectGet = "$JS.API.DIRECT.GET."; public const string MetaLeaderStepdown = "$JS.API.META.LEADER.STEPDOWN"; + + // Internal replication subjects for cluster-wide JetStream state propagation. + // These are NOT part of the public API — they are used between cluster peers + // to replicate mutating operations (stream/consumer create/delete/purge). + // Go reference: jetstream_cluster.go — internal replication via RAFT proposals. + public const string InternalPrefix = "$JS.INTERNAL."; + public const string InternalStreamCreate = "$JS.INTERNAL.STREAM.CREATE."; + public const string InternalStreamDelete = "$JS.INTERNAL.STREAM.DELETE."; + public const string InternalStreamPurge = "$JS.INTERNAL.STREAM.PURGE."; + public const string InternalConsumerCreate = "$JS.INTERNAL.CONSUMER.CREATE."; + public const string InternalConsumerDelete = "$JS.INTERNAL.CONSUMER.DELETE."; } diff --git a/src/NATS.Server/LeafNodes/LeafConnection.cs b/src/NATS.Server/LeafNodes/LeafConnection.cs index c94c2c0..c6b8825 100644 --- a/src/NATS.Server/LeafNodes/LeafConnection.cs +++ b/src/NATS.Server/LeafNodes/LeafConnection.cs @@ -31,6 +31,13 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable /// public bool IsSolicited { get; internal set; } + /// + /// The remote URL used to establish this solicited connection. + /// Tracked so the reconnection loop can re-dial after disconnect. + /// Go reference: leafnode.go — remotes[].url tracked for reconnect. + /// + public string? RemoteUrl { get; internal set; } + /// /// True when this connection is a spoke-side leaf connection. /// Go reference: isSpokeLeafNode / isHubLeafNode. diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index 9fe5947..d211573 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -653,6 +653,7 @@ public sealed class LeafNodeManager : IAsyncDisposable JetStreamDomain = jetStreamDomain, IsSolicited = true, IsSpoke = true, + RemoteUrl = remote, }; await connection.PerformOutboundHandshakeAsync(_serverId, ct); Register(connection); @@ -728,6 +729,8 @@ public sealed class LeafNodeManager : IAsyncDisposable private async Task WatchConnectionAsync(string key, LeafConnection connection, CancellationToken ct) { + string? remoteUrl = null; + string? jsDomain = null; try { await connection.WaitUntilClosedAsync(ct); @@ -737,10 +740,24 @@ public sealed class LeafNodeManager : IAsyncDisposable } finally { + if (connection.IsSolicited) + { + remoteUrl = connection.RemoteUrl; + jsDomain = connection.JetStreamDomain; + } + if (_connections.TryRemove(key, out _)) Interlocked.Decrement(ref _stats.Leafs); await connection.DisposeAsync(); } + + // Re-launch solicited connection retry loop after disconnect. + // Go reference: leafnode.go — reconnectAfterPermViolation / reconnectDelay. + if (remoteUrl != null && !ct.IsCancellationRequested) + { + _logger.LogInformation("Solicited leaf connection lost, reconnecting to {Remote}", remoteUrl); + _ = Task.Run(() => ConnectSolicitedWithRetryAsync(remoteUrl, jsDomain, ct), ct); + } } /// diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index f9949ad..bb5e497 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -300,6 +300,86 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable return false; } + /// + /// Replicates a successful JetStream mutating API call to cluster peers via internal subjects. + /// Maps API subjects to $JS.INTERNAL.* subjects and forwards to routes/leafnodes. + /// Go reference: jetstream_cluster.go — RAFT proposal broadcast. + /// + private void TryReplicateJetStreamMutation(string apiSubject, ReadOnlyMemory payload) + { + string? internalSubject = null; + + if (apiSubject.StartsWith(JetStreamApiSubjects.StreamCreate, StringComparison.Ordinal)) + internalSubject = JetStreamApiSubjects.InternalStreamCreate + apiSubject[JetStreamApiSubjects.StreamCreate.Length..]; + else if (apiSubject.StartsWith(JetStreamApiSubjects.StreamDelete, StringComparison.Ordinal)) + internalSubject = JetStreamApiSubjects.InternalStreamDelete + apiSubject[JetStreamApiSubjects.StreamDelete.Length..]; + else if (apiSubject.StartsWith(JetStreamApiSubjects.StreamPurge, StringComparison.Ordinal)) + internalSubject = JetStreamApiSubjects.InternalStreamPurge + apiSubject[JetStreamApiSubjects.StreamPurge.Length..]; + else if (apiSubject.StartsWith(JetStreamApiSubjects.ConsumerCreate, StringComparison.Ordinal)) + internalSubject = JetStreamApiSubjects.InternalConsumerCreate + apiSubject[JetStreamApiSubjects.ConsumerCreate.Length..]; + else if (apiSubject.StartsWith(JetStreamApiSubjects.ConsumerDelete, StringComparison.Ordinal)) + internalSubject = JetStreamApiSubjects.InternalConsumerDelete + apiSubject[JetStreamApiSubjects.ConsumerDelete.Length..]; + + if (internalSubject != null) + ReplicateJetStreamOperation("$G", internalSubject, null, payload); + } + + /// + /// Forwards a JetStream replication message to all route and leaf node peers. + /// Bypasses interest checks since replication subjects have no client subscribers. + /// Go reference: jetstream_cluster.go — proposal broadcast to peers. + /// + private void ReplicateJetStreamOperation(string account, string subject, string? replyTo, ReadOnlyMemory payload) + { + _routeManager?.BroadcastRoutedMessageAsync(account, subject, replyTo, payload, default) + .GetAwaiter().GetResult(); + if (_leafNodeManager != null) + { + var markedSubject = LeafLoopDetector.Mark(subject, ServerId); + _leafNodeManager.ForwardMessageAsync(account, markedSubject, replyTo, payload, default) + .GetAwaiter().GetResult(); + } + } + + /// + /// Handles incoming JetStream internal replication messages from cluster peers. + /// Dispatches to the appropriate handler based on the internal subject prefix. + /// Called from DeliverRemoteMessage for $JS.INTERNAL.* subjects. + /// Go reference: jetstream_cluster.go — RAFT proposal apply. + /// + private void HandleJetStreamReplication(string subject, ReadOnlyMemory payload) + { + if (_jetStreamStreamManager == null || _jetStreamConsumerManager == null) + return; + + if (subject.StartsWith(JetStreamApiSubjects.InternalStreamCreate, StringComparison.Ordinal)) + { + var apiSubject = JetStreamApiSubjects.StreamCreate + subject[JetStreamApiSubjects.InternalStreamCreate.Length..]; + JetStream.Api.Handlers.StreamApiHandlers.HandleCreate(apiSubject, payload.Span, _jetStreamStreamManager); + } + else if (subject.StartsWith(JetStreamApiSubjects.InternalStreamDelete, StringComparison.Ordinal)) + { + var streamName = subject[JetStreamApiSubjects.InternalStreamDelete.Length..]; + _jetStreamStreamManager.Delete(streamName); + } + else if (subject.StartsWith(JetStreamApiSubjects.InternalStreamPurge, StringComparison.Ordinal)) + { + var streamName = subject[JetStreamApiSubjects.InternalStreamPurge.Length..]; + _jetStreamStreamManager.Purge(streamName); + } + else if (subject.StartsWith(JetStreamApiSubjects.InternalConsumerCreate, StringComparison.Ordinal)) + { + var apiSubject = JetStreamApiSubjects.ConsumerCreate + subject[JetStreamApiSubjects.InternalConsumerCreate.Length..]; + JetStream.Api.Handlers.ConsumerApiHandlers.HandleCreate(apiSubject, payload.Span, _jetStreamConsumerManager); + } + else if (subject.StartsWith(JetStreamApiSubjects.InternalConsumerDelete, StringComparison.Ordinal)) + { + var parts = subject[JetStreamApiSubjects.InternalConsumerDelete.Length..].Split('.'); + if (parts.Length >= 2) + _jetStreamConsumerManager.Delete(parts[0], parts[1]); + } + } + public Task WaitForReadyAsync() => _listeningStarted.Task; public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult(); @@ -1185,6 +1265,19 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private void DeliverRemoteMessage(string account, string subject, string? replyTo, ReadOnlyMemory payload) { + // Handle internal JetStream cluster replication messages. + // Go reference: jetstream_cluster.go — RAFT proposal apply dispatches to handlers. + if (subject.StartsWith(JetStreamApiSubjects.InternalPrefix, StringComparison.Ordinal)) + { + HandleJetStreamReplication(subject, payload); + return; + } + + // Capture routed data messages into local JetStream streams. + // Skip $JS. subjects — those are API calls, not stream data. + if (!subject.StartsWith("$JS.", StringComparison.Ordinal)) + TryCaptureJetStreamPublish(subject, payload, out _); + var targetAccount = GetOrCreateAccount(account); var result = targetAccount.SubList.Match(subject); @@ -1225,6 +1318,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (response.Error != null) Interlocked.Increment(ref _stats.JetStreamApiErrors); + // Replicate successful mutating operations to cluster peers. + // Go reference: jetstream_cluster.go — RAFT proposal replication. + if (response.Error == null) + TryReplicateJetStreamMutation(subject, payload); + var data = JsonSerializer.SerializeToUtf8Bytes(response.ToWireFormat(), s_jetStreamJsonOptions); ProcessMessage(replyTo, null, default, data, sender); return; @@ -1234,6 +1332,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { sender.RecordJetStreamPubAck(pubAck); + // Replicate data messages to cluster peers so their JetStream stores also capture them. + // Route forwarding below is gated on subscriber interest, which JetStream streams don't + // create, so we must explicitly push data to replicas. + // Go reference: jetstream_cluster.go — RAFT propose entry replication. + if (pubAck.ErrorCode == null) + ReplicateJetStreamOperation("$G", subject, null, payload); + // Send pub ack response to the reply subject (request-reply pattern). // Go reference: server/jetstream.go — jsPubAckResponse sent to reply. if (replyTo != null) diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs index 58dbf96..20186b3 100644 --- a/src/NATS.Server/Routes/RouteManager.cs +++ b/src/NATS.Server/Routes/RouteManager.cs @@ -492,6 +492,25 @@ public sealed class RouteManager : IAsyncDisposable } } + /// + /// Broadcasts a message to ALL route peers, bypassing pool routing. + /// Used for JetStream replication where every peer must receive the message. + /// Go reference: server/route.go — broadcastMsgToRoutes for RAFT proposals. + /// + public async Task BroadcastRoutedMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + { + if (_routes.IsEmpty) + return; + + var sentToPeers = new HashSet(StringComparer.Ordinal); + foreach (var r in _routes.Values) + { + var peerId = r.RemoteServerId ?? r.RemoteEndpoint; + if (sentToPeers.Add(peerId)) + await r.SendRmsgAsync(account, subject, replyTo, payload, ct); + } + } + private async Task AcceptLoopAsync(CancellationToken ct) { while (!ct.IsCancellationRequested) diff --git a/tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs b/tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs index 2b836cf..e8f7ec8 100644 --- a/tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs +++ b/tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs @@ -52,7 +52,7 @@ public class JetStreamClusterTests(JetStreamClusterFixture fixture) : IClassFixt /// then restores node 2 and waits for full mesh. /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterNodeFailure /// - [Fact(Skip = "JetStream RAFT replication not yet implemented — node 1 cannot serve the stream after node 2 dies because stream data only lives on the publishing node")] + [Fact] [SlopwatchSuppress("SW001", "JetStream RAFT replication across cluster nodes is not yet implemented in the .NET server — this test requires cross-node stream availability after failover")] public async Task R3Stream_NodeDies_PublishContinues() { @@ -107,7 +107,7 @@ public class JetStreamClusterTests(JetStreamClusterFixture fixture) : IClassFixt /// Kills node 2 while a pull consumer exists and verifies the consumer is accessible on node 1. /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterConsumerHardKill /// - [Fact(Skip = "JetStream RAFT replication not yet implemented — consumer and stream state are not replicated across nodes")] + [Fact] [SlopwatchSuppress("SW001", "JetStream RAFT replication across cluster nodes is not yet implemented in the .NET server — consumer state is local to the publishing node")] public async Task Consumer_NodeDies_PullContinuesOnSurvivor() { diff --git a/tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs b/tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs index cd22c0e..9d0bacc 100644 --- a/tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs +++ b/tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs @@ -45,7 +45,7 @@ public class LeafNodeFailoverTests(HubLeafFixture fixture) : IClassFixture - [Fact(Skip = "Leaf node does not reconnect after hub restart — the .NET server leaf reconnection logic does not yet handle hub process replacement")] + [Fact] [SlopwatchSuppress("SW001", "The .NET server leaf node reconnection does not yet re-establish the connection when the hub process is replaced — the leaf detects the disconnect but fails to reconnect to the new hub instance")] public async Task Leaf_HubRestart_LeafReconnects() { diff --git a/tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs b/tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs index eea525b..1dde1c0 100644 --- a/tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs +++ b/tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs @@ -74,7 +74,7 @@ public class RaftConsensusTests(JetStreamClusterFixture fixture) : IClassFixture } // Go ref: server/raft_test.go TestNRGStepDown - [Fact(Skip = "JetStream RAFT leader re-election not yet implemented — stream is unavailable on surviving nodes after leader dies")] + [Fact] [SlopwatchSuppress("SW001", "JetStream RAFT leader re-election is not yet implemented in the .NET server — stream data is local to the publishing node and cannot fail over")] public async Task LeaderDies_NewLeaderElected() { @@ -151,7 +151,7 @@ public class RaftConsensusTests(JetStreamClusterFixture fixture) : IClassFixture } // Go ref: server/raft_test.go TestNRGCatchup - [Fact(Skip = "JetStream RAFT catchup not yet implemented — restarted node cannot catch up via RAFT log")] + [Fact(Skip = "RAFT log catchup not yet implemented — a restarted node cannot recover messages published to peers during its downtime")] [SlopwatchSuppress("SW001", "JetStream RAFT log catchup is not yet implemented in the .NET server — a restarted node has no mechanism to receive missed messages from peers")] public async Task LeaderRestart_RejoinsAsFollower() {