batch35: implement and verify feature group C

This commit is contained in:
Joseph Doherty
2026-03-01 02:30:35 -05:00
parent 35becf549a
commit 8baa604dce
5 changed files with 248 additions and 0 deletions

View File

@@ -32,9 +32,12 @@ namespace ZB.MOM.NatsNet.Server;
/// </summary>
internal sealed class JetStreamCluster
{
internal const string JscAllSubj = "$JSC.>";
private static readonly Exception ErrBadStreamMsg = new("jetstream cluster bad replicated stream msg");
private const int CompressThreshold = 8192;
private const ulong MsgFlagFromSourceOrMirror = 1UL;
private const int ReplySuffixLength = 10;
private const string Base62 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
/// <summary>The meta-controller Raft node.</summary>
public IRaftNode? Meta { get; set; }
@@ -865,6 +868,27 @@ internal sealed class JetStreamCluster
internal static byte[] EncodeStreamMsgAllowCompress(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced)
=> EncodeStreamMsgAllowCompressAndBatch(subject, reply, header, message, sequence, timestamp, sourced, string.Empty, 0, false);
internal static string SyncSubjForStream() => SyncSubject("$JSC.SYNC");
internal static string SyncReplySubject() => SyncSubject("$JSC.R");
internal static string InfoReplySubject() => SyncSubject("$JSC.R");
internal static string SyncAckSubject() => $"{SyncSubject("$JSC.ACK")}.*";
internal static string SyncSubject(string prefix)
{
var suffix = new char[ReplySuffixLength];
var value = Random.Shared.NextInt64(long.MaxValue);
for (var i = 0; i < suffix.Length; i++)
{
suffix[i] = Base62[(int)(value % Base62.Length)];
value /= Base62.Length;
}
return $"{prefix}.{new string(suffix)}";
}
internal static byte[] EncodeStreamMsgAllowCompressAndBatch(
string subject,
string reply,

View File

@@ -969,6 +969,65 @@ internal sealed class JetStreamEngine(JetStream state)
}
}
internal string[] StreamAlternates(ClientInfo clientInfo, string streamName)
{
if (_state.Server is not NatsServer server)
return [];
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return [];
var (account, _) = server.LookupAccount(clientInfo.ServiceAccount());
if (account == null)
return [];
if (!cluster.Streams.TryGetValue(account.Name, out var accountStreams))
return [];
var weights = new Dictionary<string, int>(StringComparer.Ordinal);
if (clientInfo.Cluster is { Length: > 0 })
{
for (var i = 0; i < clientInfo.Cluster.Length; i++)
weights[clientInfo.Cluster[i]] = clientInfo.Cluster.Length - i;
}
if (clientInfo.Alternates is { Count: > 0 })
{
for (var i = 0; i < clientInfo.Alternates.Count; i++)
weights[clientInfo.Alternates[i]] = clientInfo.Alternates.Count - i;
}
var candidates = new List<(string Name, string Cluster)>();
foreach (var assignment in accountStreams.Values)
{
if (assignment.Unsupported != null || assignment.Config == null)
continue;
if (string.Equals(assignment.Config.Name, streamName, StringComparison.Ordinal) ||
string.Equals(assignment.Config.Mirror?.Name, streamName, StringComparison.Ordinal))
{
candidates.Add((assignment.Config.Name, assignment.Group?.Cluster ?? string.Empty));
}
}
if (candidates.Count <= 1)
return [];
return candidates
.OrderByDescending(c => weights.TryGetValue(c.Cluster, out var weight) ? weight : 0)
.Select(c => c.Name)
.Distinct(StringComparer.Ordinal)
.ToArray();
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot()
{
_state.Lock.EnterReadLock();

View File

@@ -447,4 +447,60 @@ internal sealed partial class NatsStream
ClearCatchupPeer(request.Peer);
}
}
internal void CheckClusterInfo(ClusterInfo? clusterInfo)
{
if (clusterInfo?.Replicas == null || clusterInfo.Replicas.Length == 0)
return;
foreach (var replica in clusterInfo.Replicas)
{
var peer = NatsServer.GetHash(replica.Name);
var lag = LagForCatchupPeer(peer);
if (lag == 0)
continue;
replica.Current = false;
replica.Lag = lag;
}
}
internal void HandleClusterStreamInfoRequest(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message)
{
_ = sub;
_ = client;
_ = account;
_ = subject;
_ = message;
_ = Task.Run(() => ProcessClusterStreamInfoRequest(reply));
}
internal void ProcessClusterStreamInfoRequest(string reply)
{
_mu.EnterReadLock();
try
{
if (string.IsNullOrWhiteSpace(reply))
return;
var streamInfo = new StreamInfo
{
Created = CreatedTime(),
State = State(),
Config = Config.Clone(),
Cluster = null,
Sources = SourcesInfo(),
Mirror = MirrorInfo(),
};
if (HasCatchupPeers())
CheckClusterInfo(streamInfo.Cluster);
_outq.SendMsg(reply, JsonSerializer.SerializeToUtf8Bytes(streamInfo));
}
finally
{
_mu.ExitReadLock();
}
}
}

View File

@@ -1,9 +1,13 @@
using System.Text.Json;
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
private Channel<bool>? _gcbKickCh;
private const long DefaultMaxTotalCatchupOutBytes = 64L * 1024 * 1024;
internal void JsClusteredConsumerRequest(
ClientInfo clientInfo,
Account account,
@@ -142,4 +146,109 @@ public sealed partial class NatsServer
_ = requestMessage;
}
internal long GcbTotal()
{
_gcbMu.EnterReadLock();
try
{
return _gcbOut;
}
finally
{
_gcbMu.ExitReadLock();
}
}
internal bool GcbBelowMax()
{
_gcbMu.EnterReadLock();
try
{
var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes;
return _gcbOut <= limit;
}
finally
{
_gcbMu.ExitReadLock();
}
}
internal void GcbAdd(ref long localOutstandingBytes, long size)
{
_gcbMu.EnterWriteLock();
try
{
localOutstandingBytes += size;
_gcbOut += size;
var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes;
if (_gcbOut >= limit && _gcbKickCh == null)
_gcbKickCh = Channel.CreateBounded<bool>(1);
}
finally
{
_gcbMu.ExitWriteLock();
}
}
internal void GcbSubLocked(ref long localOutstandingBytes, long size)
{
if (localOutstandingBytes == 0)
return;
localOutstandingBytes -= size;
if (localOutstandingBytes < 0)
localOutstandingBytes = 0;
_gcbOut -= size;
if (_gcbOut < 0)
_gcbOut = 0;
var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes;
if (_gcbKickCh != null && _gcbOut < limit)
{
_gcbKickCh.Writer.TryWrite(true);
_gcbKickCh.Writer.TryComplete();
_gcbKickCh = null;
}
}
internal void GcbSub(ref long localOutstandingBytes, long size)
{
_gcbMu.EnterWriteLock();
try
{
GcbSubLocked(ref localOutstandingBytes, size);
}
finally
{
_gcbMu.ExitWriteLock();
}
}
internal void GcbSubLast(ref long localOutstandingBytes)
{
_gcbMu.EnterWriteLock();
try
{
GcbSubLocked(ref localOutstandingBytes, localOutstandingBytes);
localOutstandingBytes = 0;
}
finally
{
_gcbMu.ExitWriteLock();
}
}
internal ChannelReader<bool>? CbKickChan()
{
_gcbMu.EnterReadLock();
try
{
return _gcbKickCh?.Reader;
}
finally
{
_gcbMu.ExitReadLock();
}
}
}