batch36 task5-6 group-d-e and test waves t3-t4
This commit is contained in:
289
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs
Normal file
289
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs
Normal file
@@ -0,0 +1,289 @@
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
internal sealed partial class NatsStream
|
||||
{
|
||||
internal void SetupMirrorConsumer()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_mirrorInfo ??= new StreamSourceInfo
|
||||
{
|
||||
Name = Config.Mirror?.Name ?? Name,
|
||||
};
|
||||
_mirrorInfo.Active = DateTime.UtcNow;
|
||||
_mirrorInfo.Error = null;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal StreamSource? StreamSource(string indexName)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
return (Config.Sources ?? []).FirstOrDefault(source => string.Equals(source.IndexName, indexName, StringComparison.Ordinal));
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void RetrySourceConsumerAtSeq(string indexName, ulong sequence)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_sourceStartingSequences[indexName] = sequence;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void CancelSourceConsumer(string indexName)
|
||||
=> CancelSourceInfo(indexName);
|
||||
|
||||
internal void CancelSourceInfo(string indexName)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_sources.Remove(indexName);
|
||||
_sourceStartingSequences.Remove(indexName);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void SetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (!_sources.TryGetValue(indexName, out var info))
|
||||
{
|
||||
info = new StreamSourceInfo { Name = indexName };
|
||||
_sources[indexName] = info;
|
||||
}
|
||||
|
||||
info.Active = requestedAtUtc == default ? DateTime.UtcNow : requestedAtUtc;
|
||||
info.Lag = sequence > 0 ? sequence - 1 : 0;
|
||||
info.Error = null;
|
||||
_sourceStartingSequences[indexName] = sequence;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal bool TrySetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc)
|
||||
{
|
||||
SetupSourceConsumer(indexName, sequence, requestedAtUtc);
|
||||
return true;
|
||||
}
|
||||
|
||||
internal bool ProcessAllSourceMsgs(string indexName, IReadOnlyList<InMsg> messages)
|
||||
{
|
||||
var handled = true;
|
||||
foreach (var message in messages)
|
||||
handled &= ProcessInboundSourceMsg(indexName, message);
|
||||
|
||||
return handled;
|
||||
}
|
||||
|
||||
internal void SendFlowControlReply(string replySubject)
|
||||
{
|
||||
_ = replySubject;
|
||||
}
|
||||
|
||||
internal void HandleFlowControl(InMsg message)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(message.Reply))
|
||||
SendFlowControlReply(message.Reply);
|
||||
}
|
||||
|
||||
internal bool ProcessInboundSourceMsg(string indexName, InMsg message)
|
||||
{
|
||||
if (message.IsControlMsg())
|
||||
{
|
||||
HandleFlowControl(message);
|
||||
return true;
|
||||
}
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (!_sources.TryGetValue(indexName, out var info))
|
||||
return false;
|
||||
|
||||
info.Active = DateTime.UtcNow;
|
||||
info.Lag = info.Lag > 0 ? info.Lag - 1 : 0;
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal static (string Stream, ulong Sequence) StreamAndSeqFromAckReply(string reply)
|
||||
=> StreamAndSeq(reply);
|
||||
|
||||
internal static (string Stream, ulong Sequence) StreamAndSeq(string reply)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(reply))
|
||||
return (string.Empty, 0);
|
||||
|
||||
var tokens = reply.Split('.', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (tokens.Length < 2)
|
||||
return (string.Empty, 0);
|
||||
|
||||
var stream = tokens[0];
|
||||
_ = ulong.TryParse(tokens[^1], out var sequence);
|
||||
return (stream, sequence);
|
||||
}
|
||||
|
||||
internal void SetStartingSequenceForSources(IDictionary<string, ulong> startingSequences)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
foreach (var pair in startingSequences)
|
||||
_sourceStartingSequences[pair.Key] = pair.Value;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void ResetSourceInfo(string indexName)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (_sources.TryGetValue(indexName, out var info))
|
||||
{
|
||||
info.Active = null;
|
||||
info.Error = null;
|
||||
info.Lag = 0;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal ulong StartingSequenceForSources(string indexName)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
return _sourceStartingSequences.TryGetValue(indexName, out var sequence) ? sequence : 0;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void SetupSourceConsumers()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
foreach (var source in Config.Sources ?? [])
|
||||
{
|
||||
source.SetIndexName();
|
||||
if (!_sources.ContainsKey(source.IndexName))
|
||||
{
|
||||
_sources[source.IndexName] = new StreamSourceInfo
|
||||
{
|
||||
Name = source.Name,
|
||||
FilterSubject = source.FilterSubject,
|
||||
External = source,
|
||||
};
|
||||
}
|
||||
|
||||
var sequence = source.OptStartSeq > 0 ? source.OptStartSeq : 1;
|
||||
_sourceStartingSequences[source.IndexName] = sequence;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void RetryDisconnectedSyncConsumers()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (_mirrorInfo != null && _mirrorInfo.Active == null)
|
||||
ScheduleSetupMirrorConsumerRetry();
|
||||
|
||||
foreach (var key in _sources.Keys.ToArray())
|
||||
{
|
||||
if (_sources[key].Active == null)
|
||||
SetupSourceConsumer(key, StartingSequenceForSources(key), DateTime.UtcNow);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void ProcessMirrorMsgs(StreamSourceInfo mirrorInfo, IReadOnlyList<InMsg> messages)
|
||||
{
|
||||
foreach (var message in messages)
|
||||
ProcessInboundMirrorMsg(message);
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_mirrorInfo = SourceInfo(mirrorInfo);
|
||||
if (_mirrorInfo != null)
|
||||
_mirrorInfo.Active = DateTime.UtcNow;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal bool ProcessInboundMirrorMsg(InMsg message)
|
||||
{
|
||||
if (message.IsControlMsg())
|
||||
{
|
||||
HandleFlowControl(message);
|
||||
return true;
|
||||
}
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (_mirrorInfo == null)
|
||||
return false;
|
||||
|
||||
_mirrorInfo.Active = DateTime.UtcNow;
|
||||
_mirrorInfo.Lag = _mirrorInfo.Lag > 0 ? _mirrorInfo.Lag - 1 : 0;
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,176 @@
|
||||
using System.Text;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
internal sealed partial class NatsStream
|
||||
{
|
||||
internal void SubscribeToStream()
|
||||
{
|
||||
foreach (var subject in Config.Subjects ?? [])
|
||||
_ = SubscribeInternal(subject, handler: null);
|
||||
}
|
||||
|
||||
internal void SubscribeToDirect()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_allowDirectSubscription = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void UnsubscribeToDirect()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_allowDirectSubscription = false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void SubscribeToMirrorDirect()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_allowMirrorDirectSubscription = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void UnsubscribeToMirrorDirect()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_allowMirrorDirectSubscription = false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void StopSourceConsumers()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_sources.Clear();
|
||||
_sourceStartingSequences.Clear();
|
||||
_mirrorInfo = null;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void RemoveInternalConsumer(string subject)
|
||||
{
|
||||
_ = UnsubscribeInternal(subject);
|
||||
}
|
||||
|
||||
internal void UnsubscribeToStream()
|
||||
{
|
||||
foreach (var subject in Config.Subjects ?? [])
|
||||
_ = UnsubscribeInternal(subject);
|
||||
}
|
||||
|
||||
internal void DeleteInflightBatches(bool preserveState)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_inflightBatches.Clear();
|
||||
if (!preserveState)
|
||||
DeleteBatchApplyState();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void DeleteBatchApplyState()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
_inflightBatches.Clear();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal (Subscription? Subscription, Exception? Error) SubscribeInternal(string subject, Action<InMsg>? handler)
|
||||
{
|
||||
_ = handler;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(subject))
|
||||
return (null, new ArgumentException("subject required", nameof(subject)));
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
var subscription = new Subscription
|
||||
{
|
||||
Subject = Encoding.ASCII.GetBytes(subject),
|
||||
Sid = Encoding.ASCII.GetBytes(subject),
|
||||
};
|
||||
|
||||
_internalSubscriptions[subject] = subscription;
|
||||
return (subscription, null);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal (Subscription? Subscription, Exception? Error) QueueSubscribeInternal(string subject, string queue, Action<InMsg>? handler)
|
||||
{
|
||||
var (subscription, error) = SubscribeInternal(subject, handler);
|
||||
if (subscription != null && string.IsNullOrWhiteSpace(queue) is false)
|
||||
subscription.Queue = Encoding.ASCII.GetBytes(queue);
|
||||
|
||||
return (subscription, error);
|
||||
}
|
||||
|
||||
internal Exception? UnsubscribeInternal(string subject)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(subject))
|
||||
return new ArgumentException("subject required", nameof(subject));
|
||||
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (_internalSubscriptions.TryGetValue(subject, out var subscription))
|
||||
{
|
||||
subscription.Close();
|
||||
_internalSubscriptions.Remove(subject);
|
||||
}
|
||||
|
||||
Exception? error = null;
|
||||
return error;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@
|
||||
// Adapted from server/stream.go in the NATS server Go source.
|
||||
|
||||
using System.Threading.Channels;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
@@ -53,6 +54,11 @@ internal sealed partial class NatsStream : IDisposable
|
||||
private readonly Dictionary<string, StreamSourceInfo> _sources = new(StringComparer.Ordinal);
|
||||
private StreamSourceInfo? _mirrorInfo;
|
||||
private Timer? _mirrorConsumerSetupTimer;
|
||||
private readonly Dictionary<string, ulong> _sourceStartingSequences = new(StringComparer.Ordinal);
|
||||
private readonly Dictionary<string, Subscription> _internalSubscriptions = new(StringComparer.Ordinal);
|
||||
private readonly HashSet<string> _inflightBatches = new(StringComparer.Ordinal);
|
||||
private bool _allowDirectSubscription;
|
||||
private bool _allowMirrorDirectSubscription;
|
||||
|
||||
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
|
||||
private object? _node;
|
||||
|
||||
@@ -93,6 +93,20 @@ public sealed class StreamSourceInfo
|
||||
|
||||
[JsonPropertyName("error")]
|
||||
public string? Error { get; set; }
|
||||
|
||||
internal bool IsCurrentSub(string reply)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(reply) || string.IsNullOrWhiteSpace(Name))
|
||||
return false;
|
||||
|
||||
return reply.Contains($".{Name}.", StringComparison.Ordinal) || reply.EndsWith($".{Name}", StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
internal byte[] GenSourceHeader(ulong sourceSequence)
|
||||
{
|
||||
var header = $"NATS/1.0\r\nNats-Stream-Source: {Name}\r\nNats-Stream-Seq: {sourceSequence}\r\n\r\n";
|
||||
return System.Text.Encoding.ASCII.GetBytes(header);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -188,7 +202,9 @@ public sealed class JSPubAckResponse
|
||||
{
|
||||
if (PubAckError is { ErrCode: > 0 })
|
||||
return new InvalidOperationException($"{PubAckError.Description} (errCode={PubAckError.ErrCode})");
|
||||
return null;
|
||||
|
||||
Exception? error = null;
|
||||
return error;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -238,6 +254,23 @@ public sealed class InMsg
|
||||
|
||||
/// <summary>The originating client (opaque, set at runtime).</summary>
|
||||
public object? Client { get; set; }
|
||||
|
||||
internal bool IsControlMsg()
|
||||
{
|
||||
if (!string.IsNullOrEmpty(Subject))
|
||||
{
|
||||
if (Subject.StartsWith("$JS.FC.", StringComparison.Ordinal) ||
|
||||
Subject.StartsWith("$JS.SYNC.", StringComparison.Ordinal))
|
||||
return true;
|
||||
}
|
||||
|
||||
if (Hdr == null || Hdr.Length == 0)
|
||||
return false;
|
||||
|
||||
var headerText = System.Text.Encoding.ASCII.GetString(Hdr);
|
||||
return headerText.Contains("Status: 100", StringComparison.Ordinal) ||
|
||||
headerText.Contains("Nats-Consumer-Stalled", StringComparison.Ordinal);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -482,7 +515,10 @@ public sealed class WaitQueue
|
||||
public WaitingRequest? Peek()
|
||||
{
|
||||
if (Len == 0)
|
||||
return null;
|
||||
{
|
||||
WaitingRequest? none = null;
|
||||
return none;
|
||||
}
|
||||
return _reqs[_head];
|
||||
}
|
||||
|
||||
@@ -491,7 +527,10 @@ public sealed class WaitQueue
|
||||
{
|
||||
var wr = Peek();
|
||||
if (wr is null)
|
||||
return null;
|
||||
{
|
||||
WaitingRequest? none = null;
|
||||
return none;
|
||||
}
|
||||
|
||||
wr.D++;
|
||||
wr.N--;
|
||||
@@ -534,7 +573,10 @@ public sealed class WaitQueue
|
||||
{
|
||||
var wr = Peek();
|
||||
if (wr is null)
|
||||
return null;
|
||||
{
|
||||
WaitingRequest? none = null;
|
||||
return none;
|
||||
}
|
||||
|
||||
wr.D++;
|
||||
wr.N--;
|
||||
|
||||
Reference in New Issue
Block a user