feat: add JetStream cluster replication and leaf node solicited reconnect

Add JetStream stream/consumer config and data replication across cluster
peers via $JS.INTERNAL.* subjects with BroadcastRoutedMessageAsync (sends
to all peers, bypassing pool routing). Capture routed data messages into
local JetStream stores in DeliverRemoteMessage. Fix leaf node solicited
reconnect by re-launching the retry loop in WatchConnectionAsync after
disconnect.

Unskips 4 of 5 E2E cluster tests (LeaderDies_NewLeaderElected,
R3Stream_NodeDies_PublishContinues, Consumer_NodeDies_PullContinuesOnSurvivor,
Leaf_HubRestart_LeafReconnects). The 5th (LeaderRestart_RejoinsAsFollower)
requires RAFT log catchup which is a separate feature.
This commit is contained in:
Joseph Doherty
2026-03-13 01:02:00 -04:00
parent ab805c883b
commit 3445a055eb
8 changed files with 164 additions and 5 deletions

View File

@@ -300,6 +300,86 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
return false;
}
/// <summary>
/// Replicates a successful JetStream mutating API call to cluster peers via internal subjects.
/// Maps API subjects to $JS.INTERNAL.* subjects and forwards to routes/leafnodes.
/// Go reference: jetstream_cluster.go — RAFT proposal broadcast.
/// </summary>
private void TryReplicateJetStreamMutation(string apiSubject, ReadOnlyMemory<byte> payload)
{
string? internalSubject = null;
if (apiSubject.StartsWith(JetStreamApiSubjects.StreamCreate, StringComparison.Ordinal))
internalSubject = JetStreamApiSubjects.InternalStreamCreate + apiSubject[JetStreamApiSubjects.StreamCreate.Length..];
else if (apiSubject.StartsWith(JetStreamApiSubjects.StreamDelete, StringComparison.Ordinal))
internalSubject = JetStreamApiSubjects.InternalStreamDelete + apiSubject[JetStreamApiSubjects.StreamDelete.Length..];
else if (apiSubject.StartsWith(JetStreamApiSubjects.StreamPurge, StringComparison.Ordinal))
internalSubject = JetStreamApiSubjects.InternalStreamPurge + apiSubject[JetStreamApiSubjects.StreamPurge.Length..];
else if (apiSubject.StartsWith(JetStreamApiSubjects.ConsumerCreate, StringComparison.Ordinal))
internalSubject = JetStreamApiSubjects.InternalConsumerCreate + apiSubject[JetStreamApiSubjects.ConsumerCreate.Length..];
else if (apiSubject.StartsWith(JetStreamApiSubjects.ConsumerDelete, StringComparison.Ordinal))
internalSubject = JetStreamApiSubjects.InternalConsumerDelete + apiSubject[JetStreamApiSubjects.ConsumerDelete.Length..];
if (internalSubject != null)
ReplicateJetStreamOperation("$G", internalSubject, null, payload);
}
/// <summary>
/// Forwards a JetStream replication message to all route and leaf node peers.
/// Bypasses interest checks since replication subjects have no client subscribers.
/// Go reference: jetstream_cluster.go — proposal broadcast to peers.
/// </summary>
private void ReplicateJetStreamOperation(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload)
{
_routeManager?.BroadcastRoutedMessageAsync(account, subject, replyTo, payload, default)
.GetAwaiter().GetResult();
if (_leafNodeManager != null)
{
var markedSubject = LeafLoopDetector.Mark(subject, ServerId);
_leafNodeManager.ForwardMessageAsync(account, markedSubject, replyTo, payload, default)
.GetAwaiter().GetResult();
}
}
/// <summary>
/// Handles incoming JetStream internal replication messages from cluster peers.
/// Dispatches to the appropriate handler based on the internal subject prefix.
/// Called from DeliverRemoteMessage for $JS.INTERNAL.* subjects.
/// Go reference: jetstream_cluster.go — RAFT proposal apply.
/// </summary>
private void HandleJetStreamReplication(string subject, ReadOnlyMemory<byte> payload)
{
if (_jetStreamStreamManager == null || _jetStreamConsumerManager == null)
return;
if (subject.StartsWith(JetStreamApiSubjects.InternalStreamCreate, StringComparison.Ordinal))
{
var apiSubject = JetStreamApiSubjects.StreamCreate + subject[JetStreamApiSubjects.InternalStreamCreate.Length..];
JetStream.Api.Handlers.StreamApiHandlers.HandleCreate(apiSubject, payload.Span, _jetStreamStreamManager);
}
else if (subject.StartsWith(JetStreamApiSubjects.InternalStreamDelete, StringComparison.Ordinal))
{
var streamName = subject[JetStreamApiSubjects.InternalStreamDelete.Length..];
_jetStreamStreamManager.Delete(streamName);
}
else if (subject.StartsWith(JetStreamApiSubjects.InternalStreamPurge, StringComparison.Ordinal))
{
var streamName = subject[JetStreamApiSubjects.InternalStreamPurge.Length..];
_jetStreamStreamManager.Purge(streamName);
}
else if (subject.StartsWith(JetStreamApiSubjects.InternalConsumerCreate, StringComparison.Ordinal))
{
var apiSubject = JetStreamApiSubjects.ConsumerCreate + subject[JetStreamApiSubjects.InternalConsumerCreate.Length..];
JetStream.Api.Handlers.ConsumerApiHandlers.HandleCreate(apiSubject, payload.Span, _jetStreamConsumerManager);
}
else if (subject.StartsWith(JetStreamApiSubjects.InternalConsumerDelete, StringComparison.Ordinal))
{
var parts = subject[JetStreamApiSubjects.InternalConsumerDelete.Length..].Split('.');
if (parts.Length >= 2)
_jetStreamConsumerManager.Delete(parts[0], parts[1]);
}
}
public Task WaitForReadyAsync() => _listeningStarted.Task;
public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult();
@@ -1185,6 +1265,19 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private void DeliverRemoteMessage(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload)
{
// Handle internal JetStream cluster replication messages.
// Go reference: jetstream_cluster.go — RAFT proposal apply dispatches to handlers.
if (subject.StartsWith(JetStreamApiSubjects.InternalPrefix, StringComparison.Ordinal))
{
HandleJetStreamReplication(subject, payload);
return;
}
// Capture routed data messages into local JetStream streams.
// Skip $JS. subjects — those are API calls, not stream data.
if (!subject.StartsWith("$JS.", StringComparison.Ordinal))
TryCaptureJetStreamPublish(subject, payload, out _);
var targetAccount = GetOrCreateAccount(account);
var result = targetAccount.SubList.Match(subject);
@@ -1225,6 +1318,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (response.Error != null)
Interlocked.Increment(ref _stats.JetStreamApiErrors);
// Replicate successful mutating operations to cluster peers.
// Go reference: jetstream_cluster.go — RAFT proposal replication.
if (response.Error == null)
TryReplicateJetStreamMutation(subject, payload);
var data = JsonSerializer.SerializeToUtf8Bytes(response.ToWireFormat(), s_jetStreamJsonOptions);
ProcessMessage(replyTo, null, default, data, sender);
return;
@@ -1234,6 +1332,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
sender.RecordJetStreamPubAck(pubAck);
// Replicate data messages to cluster peers so their JetStream stores also capture them.
// Route forwarding below is gated on subscriber interest, which JetStream streams don't
// create, so we must explicitly push data to replicas.
// Go reference: jetstream_cluster.go — RAFT propose entry replication.
if (pubAck.ErrorCode == null)
ReplicateJetStreamOperation("$G", subject, null, payload);
// Send pub ack response to the reply subject (request-reply pattern).
// Go reference: server/jetstream.go — jsPubAckResponse sent to reply.
if (replyTo != null)