task6(batch39): implement shutdown and signal flow paths

This commit is contained in:
Joseph Doherty
2026-03-01 01:30:17 -05:00
parent c0ec1f3341
commit 09f73a0d2f
7 changed files with 333 additions and 0 deletions

View File

@@ -0,0 +1,222 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal void StopWithFlags(bool clearPending, bool clearAdvisories)
{
_mu.EnterWriteLock();
try
{
_closed = true;
_quitCts?.Cancel();
_deleteTimer = StopAndClearTimer(_deleteTimer);
_pendingTimer = StopAndClearTimer(_pendingTimer);
if (clearPending)
ResetPendingDeliveries();
if (!clearAdvisories)
_ = SendDeleteAdvisoryLocked();
}
finally
{
_mu.ExitWriteLock();
}
}
internal int CleanupNoInterestMessages()
{
_mu.EnterWriteLock();
try
{
if (_state.Pending is not { Count: > 0 })
return 0;
var removed = _state.Pending.Count;
_state.Pending.Clear();
_streamPending = 0;
return removed;
}
finally
{
_mu.ExitWriteLock();
}
}
internal static bool DeliveryFormsCycle(string subject, string deliverSubject) =>
!string.IsNullOrWhiteSpace(subject) &&
!string.IsNullOrWhiteSpace(deliverSubject) &&
subject.StartsWith(deliverSubject, StringComparison.Ordinal);
internal bool SwitchToEphemeral()
{
_mu.EnterWriteLock();
try
{
if (string.IsNullOrWhiteSpace(Config.Durable))
return false;
Config.Durable = null;
Name = CreateConsumerName();
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal string RequestNextMsgSubject() =>
$"$JS.API.CONSUMER.MSG.NEXT.{Stream}.{Name}";
internal long DecStreamPending()
{
_mu.EnterWriteLock();
try
{
_streamPending = Math.Max(0, _streamPending - 1);
return _streamPending;
}
finally
{
_mu.ExitWriteLock();
}
}
internal Account? Account() => GetStream()?.Account;
internal void SignalSubs() => SignalNewMessages();
internal bool ProcessStreamSignal(string subject, ulong sequence)
{
_ = subject;
_mu.EnterWriteLock();
try
{
if (_closed)
return false;
_state.Delivered.Stream = Math.Max(_state.Delivered.Stream, sequence);
SignalNewMessages();
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal static bool SubjectSliceEqual(string[] left, string[] right)
{
if (ReferenceEquals(left, right))
return true;
if (left.Length != right.Length)
return false;
for (var i = 0; i < left.Length; i++)
{
if (!string.Equals(left[i], right[i], StringComparison.Ordinal))
return false;
}
return true;
}
internal static string[] GatherSubjectFilters(ConsumerConfig config)
{
ArgumentNullException.ThrowIfNull(config);
if (config.FilterSubjects is { Length: > 0 })
return config.FilterSubjects.Where(s => !string.IsNullOrWhiteSpace(s)).ToArray();
if (!string.IsNullOrWhiteSpace(config.FilterSubject))
return [config.FilterSubject!];
return [];
}
internal bool ShouldStartMonitor()
{
_mu.EnterReadLock();
try
{
return !_closed && !_monitorRunning && (Config.InactiveThreshold > TimeSpan.Zero || IsPushMode());
}
finally
{
_mu.ExitReadLock();
}
}
internal void ClearMonitorRunning()
{
_mu.EnterWriteLock();
try
{
_monitorRunning = false;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool IsMonitorRunning()
{
_mu.EnterReadLock();
try
{
return _monitorRunning;
}
finally
{
_mu.ExitReadLock();
}
}
internal bool CheckStateForInterestStream()
{
_mu.EnterReadLock();
try
{
return _state.Pending is { Count: > 0 } || HasDeliveryInterest();
}
finally
{
_mu.ExitReadLock();
}
}
internal void ResetPtmr(TimeSpan due)
{
_mu.EnterWriteLock();
try
{
_pendingTimer ??= new Timer(static s => ((NatsConsumer)s!).CheckPending(), this, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
if (due <= TimeSpan.Zero)
due = TimeSpan.FromMilliseconds(1);
_pendingTimer.Change(due, Timeout.InfiniteTimeSpan);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void StopAndClearPtmr()
{
_mu.EnterWriteLock();
try
{
_pendingTimer = StopAndClearTimer(_pendingTimer);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ResetPendingDeliveries()
{
_state.Pending?.Clear();
_state.Redelivered?.Clear();
_redeliveryQueue.Clear();
_redeliveryIndex.Clear();
_npc = 0;
}
}

View File

@@ -68,6 +68,9 @@ internal sealed partial class NatsConsumer : IDisposable
private string _flowControlReplyId = string.Empty;
private readonly Queue<ulong> _redeliveryQueue = new();
private readonly HashSet<ulong> _redeliveryIndex = new();
private bool _monitorRunning;
private long _streamPending;
private Timer? _pendingTimer;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;