// Copyright 2021-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Concurrent;
using Shouldly;
using ZB.MOM.NatsNet.Server.Internal;

namespace ZB.MOM.NatsNet.Server.Tests.Internal;

/// <summary>
/// Tests for <see cref="IpQueue{T}"/>.
/// Mirrors server/ipqueue_test.go:
/// TestIPQueueBasic (ID 688), TestIPQueuePush (ID 689), TestIPQueuePop (ID 690),
/// TestIPQueuePopOne (ID 691), TestIPQueueMultiProducers (ID 692),
/// TestIPQueueRecycle (ID 693), TestIPQueueDrain (ID 694),
/// TestIPQueueSizeCalculation (ID 695), TestIPQueueSizeCalculationWithLimits (ID 696).
/// Benchmarks (IDs 697–715) are n/a.
/// </summary>
/// <remarks>Every test constructs its own queue instance, so the tests do not share state.</remarks>
public sealed class IpQueueTests
{
    // NOTE(review): the registry value type is assumed to be object — confirm against the
    // registry parameter of the IpQueue<T> constructor / NewIPQueue factory.
    private static ConcurrentDictionary<string, object> NewRegistry() => new();

    /// <summary>The max-recycle-size option passed to the factory is reflected on the queue.</summary>
    [Fact]
    public void IpqMaxRecycleSize_ShouldAffectQueueConfig()
    {
        var q = IpQueue<int>.NewIPQueue("opt-max-recycle", null, IpQueue<int>.IpqMaxRecycleSize(123));

        q.MaxRecycleSize.ShouldBe(123);
    }

    /// <summary>With a size calculator and an 8-byte limit, a push that would exceed the limit is rejected.</summary>
    [Fact]
    public void IpqSizeCalculation_AndLimitBySize_ShouldEnforceLimit()
    {
        var q = IpQueue<byte[]>.NewIPQueue(
            "opt-size-limit",
            null,
            IpQueue<byte[]>.IpqSizeCalculation(e => (ulong)e.Length),
            IpQueue<byte[]>.IpqLimitBySize(8));

        var (_, err1) = q.Push(new byte[4]);
        err1.ShouldBeNull();

        var (_, err2) = q.Push(new byte[4]);
        err2.ShouldBeNull();

        // 4 + 4 bytes already fill the 8-byte budget, so even a single extra byte is refused.
        var (_, err3) = q.Push(new byte[1]);
        err3.ShouldBeSameAs(IpQueueErrors.SizeLimitReached);
    }

    /// <summary>With a length limit of 2, the third push is rejected.</summary>
    [Fact]
    public void IpqLimitByLen_ShouldEnforceLengthLimit()
    {
        var q = IpQueue<int>.NewIPQueue("opt-len-limit", null, IpQueue<int>.IpqLimitByLen(2));

        q.Push(1).error.ShouldBeNull();
        q.Push(2).error.ShouldBeNull();
        q.Push(3).error.ShouldBeSameAs(IpQueueErrors.LenLimitReached);
    }

    /// <summary>The factory applies every option and registers the queue under its name.</summary>
    [Fact]
    public void NewIPQueue_ShouldApplyOptionsAndRegister()
    {
        var registry = NewRegistry();
        var q = IpQueue<int>.NewIPQueue(
            "opt-factory",
            registry,
            IpQueue<int>.IpqMaxRecycleSize(55),
            IpQueue<int>.IpqLimitByLen(1));

        q.MaxRecycleSize.ShouldBe(55);
        registry.TryGetValue("opt-factory", out var registered).ShouldBeTrue();
        registered.ShouldBeSameAs(q);

        var (_, err1) = q.Push(1);
        err1.ShouldBeNull();

        var (_, err2) = q.Push(2);
        err2.ShouldBeSameAs(IpQueueErrors.LenLimitReached);
    }

    /// <summary>Construction defaults, registry registration/unregistration, and use after unregister.</summary>
    [Fact]
    public void Basic_ShouldInitialiseCorrectly()
    {
        // Mirror: TestIPQueueBasic
        var registry = NewRegistry();
        var q = new IpQueue<int>("test", registry);

        q.MaxRecycleSize.ShouldBe(IpQueue<int>.DefaultMaxRecycleSize);
        q.Ch.TryRead(out _).ShouldBeFalse("channel should be empty on creation");
        q.Len().ShouldBe(0);

        // Create a second queue with custom max recycle size.
        var q2 = new IpQueue<int>("test2", registry, maxRecycleSize: 10);
        q2.MaxRecycleSize.ShouldBe(10);

        // Both should be in the registry.
        registry.ContainsKey("test").ShouldBeTrue();
        registry.ContainsKey("test2").ShouldBeTrue();

        // Unregister both.
        q.Unregister();
        q2.Unregister();
        registry.IsEmpty.ShouldBeTrue("registry should be empty after unregister");

        // Push/pop should still work after unregister.
        q.Push(1);
        var elts = q.Pop();
        elts.ShouldNotBeNull();
        elts!.Length.ShouldBe(1);

        q2.Push(2);
        var (e, ok) = q2.PopOne();
        ok.ShouldBeTrue();
        e.ShouldBe(2);
    }

    /// <summary>Only the push that makes the queue non-empty posts a notification.</summary>
    [Fact]
    public void Push_ShouldNotifyOnFirstElement()
    {
        // Mirror: TestIPQueuePush
        var q = new IpQueue<int>("test");

        q.Push(1);
        q.Len().ShouldBe(1);
        q.Ch.TryRead(out _).ShouldBeTrue("should have been notified after first push");

        // Second push should NOT send another notification.
        q.Push(2);
        q.Len().ShouldBe(2);
        q.Ch.TryRead(out _).ShouldBeFalse("should not notify again when queue was not empty");
    }

    /// <summary>Pop hands back the queued elements and counts them as in progress until recycled.</summary>
    [Fact]
    public void Pop_ShouldReturnElementsAndTrackInProgress()
    {
        // Mirror: TestIPQueuePop
        var q = new IpQueue<int>("test");
        q.Push(1);
        q.Ch.TryRead(out _); // consume signal

        var elts = q.Pop();
        elts.ShouldNotBeNull();
        elts!.Length.ShouldBe(1);
        q.Len().ShouldBe(0);

        // Channel should still be empty after pop.
        q.Ch.TryRead(out _).ShouldBeFalse();

        // InProgress should be 1 — pop increments it.
        q.InProgress().ShouldBe(1L);

        // Recycle decrements it.
        q.Recycle(elts);
        q.InProgress().ShouldBe(0L);

        // Pop on empty queue returns null.
        var empty = q.Pop();
        empty.ShouldBeNull();
        q.InProgress().ShouldBe(0L);
    }

    /// <summary>PopOne removes a single element and re-notifies only while elements remain.</summary>
    [Fact]
    public void PopOne_ShouldReturnOneAtATime()
    {
        // Mirror: TestIPQueuePopOne
        var q = new IpQueue<int>("test");
        q.Push(1);
        q.Ch.TryRead(out _); // consume signal

        var (e, ok) = q.PopOne();
        ok.ShouldBeTrue();
        e.ShouldBe(1);
        q.Len().ShouldBe(0);
        q.InProgress().ShouldBe(0L, "popOne does not increment inprogress");
        q.Ch.TryRead(out _).ShouldBeFalse("no notification when queue is emptied by popOne");

        q.Push(2);
        q.Push(3);

        var (e2, ok2) = q.PopOne();
        ok2.ShouldBeTrue();
        e2.ShouldBe(2);
        q.Len().ShouldBe(1);
        q.Ch.TryRead(out _).ShouldBeTrue("should re-notify when more items remain");

        var (e3, ok3) = q.PopOne();
        ok3.ShouldBeTrue();
        e3.ShouldBe(3);
        q.Len().ShouldBe(0);
        q.Ch.TryRead(out _).ShouldBeFalse("no notification after last element removed");

        var (_, okEmpty) = q.PopOne();
        okEmpty.ShouldBeFalse("popOne on empty queue returns false");
    }

    /// <summary>Three concurrent producers push distinct values; the consumer must observe all of them.</summary>
    [Fact]
    public async Task MultiProducers_ShouldReceiveAllElements()
    {
        // Mirror: TestIPQueueMultiProducers
        var q = new IpQueue<int>("test");
        const int itemsPerProducer = 100;
        const int numProducers = 3;
        const int expectedTotal = numProducers * itemsPerProducer;

        // Producer p pushes the contiguous range (p * 100 + 1) .. ((p + 1) * 100), so all values are distinct.
        var tasks = Enumerable.Range(0, numProducers).Select(p => Task.Run(() =>
        {
            for (var i = p * itemsPerProducer + 1; i <= (p + 1) * itemsPerProducer; i++)
            {
                q.Push(i);
            }
        })).ToArray();

        var received = new HashSet<int>();

        // The source only serves as a 5-second deadline; dispose it so its timer is released.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        while (received.Count < expectedTotal && !cts.IsCancellationRequested)
        {
            if (!q.Ch.TryRead(out _))
            {
                // Deliberately not token-aware: a cancelled delay would throw on timeout and
                // hide the descriptive count assertion at the end of the test.
                await Task.Delay(1);
                continue;
            }

            var batch = q.Pop();
            if (batch is null)
            {
                continue;
            }

            foreach (var v in batch)
            {
                received.Add(v);
            }

            q.Recycle(batch);
            q.InProgress().ShouldBe(0L);
        }

        await Task.WhenAll(tasks);
        received.Count.ShouldBe(expectedTotal, "all elements should be received");
    }

    /// <summary>Recycle returns the in-progress count to zero and the queue stays usable afterwards.</summary>
    [Fact]
    public void Recycle_ShouldDecrementInProgressAndAllowReuse()
    {
        // Mirror: TestIPQueueRecycle (behavioral aspects)
        var q = new IpQueue<int>("test");
        const int total = 1000;

        for (var i = 0; i < total; i++)
        {
            var (len, err) = q.Push(i);
            err.ShouldBeNull();
            len.ShouldBe(i + 1);
        }

        var values = q.Pop();
        values.ShouldNotBeNull();
        values!.Length.ShouldBe(total);
        q.InProgress().ShouldBe((long)total);

        q.Recycle(values);
        q.InProgress().ShouldBe(0L, "recycle should decrement inprogress");

        // Should be able to push/pop again after recycle.
        var (l, err2) = q.Push(1001);
        err2.ShouldBeNull();
        l.ShouldBe(1);

        var values2 = q.Pop();
        values2.ShouldNotBeNull();
        values2!.Length.ShouldBe(1);
        values2[0].ShouldBe(1001);

        // Recycle with small max recycle size: large arrays should not be pooled
        // (behavioral: push/pop still works correctly).
        var q2 = new IpQueue<int>("test2", maxRecycleSize: 10);
        for (var i = 0; i < 100; i++)
        {
            q2.Push(i);
        }

        var bigBatch = q2.Pop();
        bigBatch.ShouldNotBeNull();
        bigBatch!.Length.ShouldBe(100);
        q2.Recycle(bigBatch);
        q2.InProgress().ShouldBe(0L);

        q2.Push(1001);
        var small = q2.Pop();
        small.ShouldNotBeNull();
        small!.Length.ShouldBe(1);
        q2.Recycle(small);
    }

    /// <summary>Drain reports how many elements it removed and leaves no pending notification.</summary>
    [Fact]
    public void Drain_ShouldEmptyQueueAndConsumeSignal()
    {
        // Mirror: TestIPQueueDrain
        var q = new IpQueue<int>("test");
        for (var i = 1; i <= 100; i++)
        {
            q.Push(i);
        }

        var drained = q.Drain();
        drained.ShouldBe(100);

        // Signal should have been consumed.
        q.Ch.TryRead(out _).ShouldBeFalse("drain should consume the notification signal");
        q.Len().ShouldBe(0);
    }

    /// <summary>Size follows the sum of element sizes across Push, PopOne and Pop.</summary>
    [Fact]
    public void SizeCalculation_ShouldTrackTotalSize()
    {
        // Mirror: TestIPQueueSizeCalculation
        const int elemSize = 16;
        var q = new IpQueue<byte[]>("test", sizeCalc: e => (ulong)e.Length);

        for (var i = 0; i < 10; i++)
        {
            q.Push(new byte[elemSize]);
            q.Len().ShouldBe(i + 1);
            q.Size().ShouldBe((ulong)(i + 1) * elemSize);
        }

        // Remove five elements one at a time; length and size must shrink in lock-step.
        for (var i = 10; i > 5; i--)
        {
            q.PopOne();
            q.Len().ShouldBe(i - 1);
            q.Size().ShouldBe((ulong)(i - 1) * elemSize);
        }

        q.Pop();
        q.Len().ShouldBe(0);
        q.Size().ShouldBe(0UL);
    }

    /// <summary>Length and size limits both reject pushes once five 16-byte elements are queued.</summary>
    [Fact]
    public void SizeCalculationWithLimits_ShouldEnforceLimits()
    {
        // Mirror: TestIPQueueSizeCalculationWithLimits
        const int elemSize = 16;
        Func<byte[], ulong> calc = e => (ulong)e.Length;
        var elem = new byte[elemSize];

        // LimitByLen
        var q1 = new IpQueue<byte[]>("test-len", sizeCalc: calc, maxLen: 5);
        for (var i = 0; i < 10; i++)
        {
            var (n, err) = q1.Push(elem);
            if (i >= 5)
            {
                err.ShouldBeSameAs(IpQueueErrors.LenLimitReached, $"iteration {i}");
            }
            else
            {
                err.ShouldBeNull($"iteration {i}");
            }

            n.ShouldBeLessThan(6);
        }

        // LimitBySize
        var q2 = new IpQueue<byte[]>("test-size", sizeCalc: calc, maxSize: elemSize * 5);
        for (var i = 0; i < 10; i++)
        {
            var (n, err) = q2.Push(elem);
            if (i >= 5)
            {
                err.ShouldBeSameAs(IpQueueErrors.SizeLimitReached, $"iteration {i}");
            }
            else
            {
                err.ShouldBeNull($"iteration {i}");
            }

            n.ShouldBeLessThan(6);
        }
    }
}