diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 55f0303..6aa676a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -32,9 +32,12 @@ namespace ZB.MOM.NatsNet.Server; /// internal sealed class JetStreamCluster { + internal const string JscAllSubj = "$JSC.>"; private static readonly Exception ErrBadStreamMsg = new("jetstream cluster bad replicated stream msg"); private const int CompressThreshold = 8192; private const ulong MsgFlagFromSourceOrMirror = 1UL; + private const int ReplySuffixLength = 10; + private const string Base62 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; /// The meta-controller Raft node. public IRaftNode? Meta { get; set; } @@ -865,6 +868,27 @@ internal sealed class JetStreamCluster internal static byte[] EncodeStreamMsgAllowCompress(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced) => EncodeStreamMsgAllowCompressAndBatch(subject, reply, header, message, sequence, timestamp, sourced, string.Empty, 0, false); + internal static string SyncSubjForStream() => SyncSubject("$JSC.SYNC"); + + internal static string SyncReplySubject() => SyncSubject("$JSC.R"); + + internal static string InfoReplySubject() => SyncSubject("$JSC.R"); + + internal static string SyncAckSubject() => $"{SyncSubject("$JSC.ACK")}.*"; + + internal static string SyncSubject(string prefix) + { + var suffix = new char[ReplySuffixLength]; + var value = Random.Shared.NextInt64(long.MaxValue); + for (var i = 0; i < suffix.Length; i++) + { + suffix[i] = Base62[(int)(value % Base62.Length)]; + value /= Base62.Length; + } + + return $"{prefix}.{new string(suffix)}"; + } + internal static byte[] EncodeStreamMsgAllowCompressAndBatch( string subject, string reply, diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 1ff05d6..f22a899 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -969,6 +969,65 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal string[] StreamAlternates(ClientInfo clientInfo, string streamName) + { + if (_state.Server is not NatsServer server) + return []; + + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var (account, _) = server.LookupAccount(clientInfo.ServiceAccount()); + if (account == null) + return []; + + if (!cluster.Streams.TryGetValue(account.Name, out var accountStreams)) + return []; + + var weights = new Dictionary(StringComparer.Ordinal); + if (clientInfo.Cluster is { Length: > 0 }) + { + for (var i = 0; i < clientInfo.Cluster.Length; i++) + weights[clientInfo.Cluster[i]] = clientInfo.Cluster.Length - i; + } + + if (clientInfo.Alternates is { Count: > 0 }) + { + for (var i = 0; i < clientInfo.Alternates.Count; i++) + weights[clientInfo.Alternates[i]] = clientInfo.Alternates.Count - i; + } + + var candidates = new List<(string Name, string Cluster)>(); + foreach (var assignment in accountStreams.Values) + { + if (assignment.Unsupported != null || assignment.Config == null) + continue; + + if (string.Equals(assignment.Config.Name, streamName, StringComparison.Ordinal) || + string.Equals(assignment.Config.Mirror?.Name, streamName, StringComparison.Ordinal)) + { + candidates.Add((assignment.Config.Name, assignment.Group?.Cluster ?? string.Empty)); + } + } + + if (candidates.Count <= 1) + return []; + + return candidates + .OrderByDescending(c => weights.TryGetValue(c.Cluster, out var weight) ? weight : 0) + .Select(c => c.Name) + .Distinct(StringComparer.Ordinal) + .ToArray(); + } + finally + { + _state.Lock.ExitReadLock(); + } + } + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot() { _state.Lock.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs index 2b7cfbb..aa39009 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs @@ -447,4 +447,60 @@ internal sealed partial class NatsStream ClearCatchupPeer(request.Peer); } } + + internal void CheckClusterInfo(ClusterInfo? clusterInfo) + { + if (clusterInfo?.Replicas == null || clusterInfo.Replicas.Length == 0) + return; + + foreach (var replica in clusterInfo.Replicas) + { + var peer = NatsServer.GetHash(replica.Name); + var lag = LagForCatchupPeer(peer); + if (lag == 0) + continue; + + replica.Current = false; + replica.Lag = lag; + } + } + + internal void HandleClusterStreamInfoRequest(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message) + { + _ = sub; + _ = client; + _ = account; + _ = subject; + _ = message; + _ = Task.Run(() => ProcessClusterStreamInfoRequest(reply)); + } + + internal void ProcessClusterStreamInfoRequest(string reply) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrWhiteSpace(reply)) + return; + + var streamInfo = new StreamInfo + { + Created = CreatedTime(), + State = State(), + Config = Config.Clone(), + Cluster = null, + Sources = SourcesInfo(), + Mirror = MirrorInfo(), + }; + + if (HasCatchupPeers()) + CheckClusterInfo(streamInfo.Cluster); + + _outq.SendMsg(reply, JsonSerializer.SerializeToUtf8Bytes(streamInfo)); + } + finally + { + _mu.ExitReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs index 965fa1c..0ab8ed1 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs @@ -1,9 +1,13 @@ using System.Text.Json; +using System.Threading.Channels; namespace ZB.MOM.NatsNet.Server; public sealed partial class NatsServer { + private Channel? _gcbKickCh; + private const long DefaultMaxTotalCatchupOutBytes = 64L * 1024 * 1024; + internal void JsClusteredConsumerRequest( ClientInfo clientInfo, Account account, @@ -142,4 +146,109 @@ public sealed partial class NatsServer _ = requestMessage; } + + internal long GcbTotal() + { + _gcbMu.EnterReadLock(); + try + { + return _gcbOut; + } + finally + { + _gcbMu.ExitReadLock(); + } + } + + internal bool GcbBelowMax() + { + _gcbMu.EnterReadLock(); + try + { + var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes; + return _gcbOut <= limit; + } + finally + { + _gcbMu.ExitReadLock(); + } + } + + internal void GcbAdd(ref long localOutstandingBytes, long size) + { + _gcbMu.EnterWriteLock(); + try + { + localOutstandingBytes += size; + _gcbOut += size; + var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes; + if (_gcbOut >= limit && _gcbKickCh == null) + _gcbKickCh = Channel.CreateBounded(1); + } + finally + { + _gcbMu.ExitWriteLock(); + } + } + + internal void GcbSubLocked(ref long localOutstandingBytes, long size) + { + if (localOutstandingBytes == 0) + return; + + localOutstandingBytes -= size; + if (localOutstandingBytes < 0) + localOutstandingBytes = 0; + _gcbOut -= size; + if (_gcbOut < 0) + _gcbOut = 0; + + var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes; + if (_gcbKickCh != null && _gcbOut < limit) + { + _gcbKickCh.Writer.TryWrite(true); + _gcbKickCh.Writer.TryComplete(); + _gcbKickCh = null; + } + } + + internal void GcbSub(ref long localOutstandingBytes, long size) + { + _gcbMu.EnterWriteLock(); + try + { + GcbSubLocked(ref localOutstandingBytes, size); + } + finally + { + _gcbMu.ExitWriteLock(); + } + } + + internal void GcbSubLast(ref long localOutstandingBytes) + { + _gcbMu.EnterWriteLock(); + try + { + GcbSubLocked(ref localOutstandingBytes, localOutstandingBytes); + localOutstandingBytes = 0; + } + finally + { + _gcbMu.ExitWriteLock(); + } + } + + internal ChannelReader? CbKickChan() + { + _gcbMu.EnterReadLock(); + try + { + return _gcbKickCh?.Reader; + } + finally + { + _gcbMu.ExitReadLock(); + } + } } diff --git a/porting.db b/porting.db index 3fbeccc..5802cd1 100644 Binary files a/porting.db and b/porting.db differ