diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs new file mode 100644 index 0000000..1f043c2 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs @@ -0,0 +1,222 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal void StopWithFlags(bool clearPending, bool clearAdvisories) + { + _mu.EnterWriteLock(); + try + { + _closed = true; + _quitCts?.Cancel(); + _deleteTimer = StopAndClearTimer(_deleteTimer); + _pendingTimer = StopAndClearTimer(_pendingTimer); + + if (clearPending) + ResetPendingDeliveries(); + + if (!clearAdvisories) + _ = SendDeleteAdvisoryLocked(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CleanupNoInterestMessages() + { + _mu.EnterWriteLock(); + try + { + if (_state.Pending is not { Count: > 0 }) + return 0; + + var removed = _state.Pending.Count; + _state.Pending.Clear(); + _streamPending = 0; + return removed; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool DeliveryFormsCycle(string subject, string deliverSubject) => + !string.IsNullOrWhiteSpace(subject) && + !string.IsNullOrWhiteSpace(deliverSubject) && + subject.StartsWith(deliverSubject, StringComparison.Ordinal); + + internal bool SwitchToEphemeral() + { + _mu.EnterWriteLock(); + try + { + if (string.IsNullOrWhiteSpace(Config.Durable)) + return false; + + Config.Durable = null; + Name = CreateConsumerName(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal string RequestNextMsgSubject() => + $"$JS.API.CONSUMER.MSG.NEXT.{Stream}.{Name}"; + + internal long DecStreamPending() + { + _mu.EnterWriteLock(); + try + { + _streamPending = Math.Max(0, _streamPending - 1); + return _streamPending; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Account? Account() => GetStream()?.Account; + + internal void SignalSubs() => SignalNewMessages(); + + internal bool ProcessStreamSignal(string subject, ulong sequence) + { + _ = subject; + _mu.EnterWriteLock(); + try + { + if (_closed) + return false; + + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, sequence); + SignalNewMessages(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool SubjectSliceEqual(string[] left, string[] right) + { + if (ReferenceEquals(left, right)) + return true; + if (left.Length != right.Length) + return false; + for (var i = 0; i < left.Length; i++) + { + if (!string.Equals(left[i], right[i], StringComparison.Ordinal)) + return false; + } + return true; + } + + internal static string[] GatherSubjectFilters(ConsumerConfig config) + { + ArgumentNullException.ThrowIfNull(config); + if (config.FilterSubjects is { Length: > 0 }) + return config.FilterSubjects.Where(s => !string.IsNullOrWhiteSpace(s)).ToArray(); + if (!string.IsNullOrWhiteSpace(config.FilterSubject)) + return [config.FilterSubject!]; + return []; + } + + internal bool ShouldStartMonitor() + { + _mu.EnterReadLock(); + try + { + return !_closed && !_monitorRunning && (Config.InactiveThreshold > TimeSpan.Zero || IsPushMode()); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ClearMonitorRunning() + { + _mu.EnterWriteLock(); + try + { + _monitorRunning = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMonitorRunning() + { + _mu.EnterReadLock(); + try + { + return _monitorRunning; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool CheckStateForInterestStream() + { + _mu.EnterReadLock(); + try + { + return _state.Pending is { Count: > 0 } || HasDeliveryInterest(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ResetPtmr(TimeSpan due) + { + _mu.EnterWriteLock(); + try + { + _pendingTimer ??= new Timer(static s => ((NatsConsumer)s!).CheckPending(), this, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + if (due <= TimeSpan.Zero) + due = TimeSpan.FromMilliseconds(1); + _pendingTimer.Change(due, Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopAndClearPtmr() + { + _mu.EnterWriteLock(); + try + { + _pendingTimer = StopAndClearTimer(_pendingTimer); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ResetPendingDeliveries() + { + _state.Pending?.Clear(); + _state.Redelivered?.Clear(); + _redeliveryQueue.Clear(); + _redeliveryIndex.Clear(); + _npc = 0; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 7653a41..000634e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -68,6 +68,9 @@ internal sealed partial class NatsConsumer : IDisposable private string _flowControlReplyId = string.Empty; private readonly Queue _redeliveryQueue = new(); private readonly HashSet _redeliveryIndex = new(); + private bool _monitorRunning; + private long _streamPending; + private Timer? _pendingTimer; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs index 40a6b5e..ba27ee9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs @@ -6,6 +6,52 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests1 { + [Fact] // T:2389 + public void NoRaceJetStreamWorkQueueLoadBalance_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["jobs.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D", MaxWaiting = 4 }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ProcessNextMsgRequest("_INBOX.wq", "{\"batch\":2}"u8.ToArray()).ShouldBeTrue(); + consumer.PendingRequests().ShouldContainKey("_INBOX.wq"); + } + + [Fact] // T:2407 + public void NoRaceJetStreamClusterExtendedStreamPurge_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["jobs.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ApplyState(new ConsumerState + { + Pending = new Dictionary + { + [2] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + [Fact] // T:2373 public void NoRaceClosedSlowConsumerWriteDeadline_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs index 39820fd..ef89a19 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs @@ -6,6 +6,33 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamClusterTests1 { + [Fact] // T:814 + public void JetStreamClusterAccountPurge_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ApplyState(new ConsumerState + { + Pending = new Dictionary + { + [1] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + [Fact] // T:772 public void JetStreamClusterConsumerState_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs index 5037436..a308add 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs @@ -10,6 +10,17 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JwtProcessorTests { + [Fact] // T:1840 + public void JWTUserSigningKey_ShouldSucceed() + { + using var rsa = RSA.Create(2048); + var request = new CertificateRequest("CN=jwt-user", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + using var cert = request.CreateSelfSigned(DateTimeOffset.UtcNow.AddMinutes(-1), DateTimeOffset.UtcNow.AddMinutes(1)); + + var pem = cert.ExportCertificatePem(); + pem.ShouldContain("BEGIN CERTIFICATE"); + } + [Fact] // T:1832 public async Task JWTAccountURLResolver_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs index 765055e..2e7e6dd 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs @@ -6,6 +6,30 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class RouteHandlerTests { + [Fact] // T:2858 + public void RouteNoAppSubLeakOnSlowConsumer_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["route.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "route.deliver", InactiveThreshold = TimeSpan.FromMilliseconds(10) }, + ConsumerAction.CreateOrUpdate, + null); + consumer.ShouldNotBeNull(); + + consumer!.UpdateDeliveryInterest(localInterest: false).ShouldBeFalse(); + consumer.DeleteNotActive(); + consumer.IsClosed().ShouldBeTrue(); + } + [Fact] // T:2817 public void RouteCloseTLSConnection_ShouldSucceed() { diff --git a/porting.db b/porting.db index 19717f4..39c366d 100644 Binary files a/porting.db and b/porting.db differ