Files
natsnet/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/IpQueueTests.cs

373 lines
11 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright 2021-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System.Collections.Concurrent;
using Shouldly;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server.Tests.Internal;
/// <summary>
/// Tests for <see cref="IpQueue{T}"/>.
/// Mirrors server/ipqueue_test.go:
/// TestIPQueueBasic (ID 688), TestIPQueuePush (ID 689), TestIPQueuePop (ID 690),
/// TestIPQueuePopOne (ID 691), TestIPQueueMultiProducers (ID 692),
/// TestIPQueueRecycle (ID 693), TestIPQueueDrain (ID 694),
/// TestIPQueueSizeCalculation (ID 695), TestIPQueueSizeCalculationWithLimits (ID 696).
/// Benchmarks (IDs 697715) are n/a.
/// </summary>
public sealed class IpQueueTests
{
[Fact]
public void IpqMaxRecycleSize_ShouldAffectQueueConfig()
{
var q = IpQueue<int>.NewIPQueue("opt-max-recycle", null, IpQueue<int>.IpqMaxRecycleSize(123));
q.MaxRecycleSize.ShouldBe(123);
}
[Fact]
public void IpqSizeCalculation_AndLimitBySize_ShouldEnforceLimit()
{
var q = IpQueue<byte[]>.NewIPQueue(
"opt-size-limit",
null,
IpQueue<byte[]>.IpqSizeCalculation(e => (ulong)e.Length),
IpQueue<byte[]>.IpqLimitBySize(8));
var (_, err1) = q.Push(new byte[4]);
err1.ShouldBeNull();
var (_, err2) = q.Push(new byte[4]);
err2.ShouldBeNull();
var (_, err3) = q.Push(new byte[1]);
err3.ShouldBeSameAs(IpQueueErrors.SizeLimitReached);
}
[Fact]
public void IpqLimitByLen_ShouldEnforceLengthLimit()
{
var q = IpQueue<int>.NewIPQueue("opt-len-limit", null, IpQueue<int>.IpqLimitByLen(2));
q.Push(1).error.ShouldBeNull();
q.Push(2).error.ShouldBeNull();
q.Push(3).error.ShouldBeSameAs(IpQueueErrors.LenLimitReached);
}
[Fact]
public void NewIPQueue_ShouldApplyOptionsAndRegister()
{
var registry = new ConcurrentDictionary<string, object>();
var q = IpQueue<int>.NewIPQueue(
"opt-factory",
registry,
IpQueue<int>.IpqMaxRecycleSize(55),
IpQueue<int>.IpqLimitByLen(1));
q.MaxRecycleSize.ShouldBe(55);
registry.TryGetValue("opt-factory", out var registered).ShouldBeTrue();
registered.ShouldBeSameAs(q);
var (_, err1) = q.Push(1);
err1.ShouldBeNull();
var (_, err2) = q.Push(2);
err2.ShouldBeSameAs(IpQueueErrors.LenLimitReached);
}
[Fact]
public void Basic_ShouldInitialiseCorrectly()
{
// Mirror: TestIPQueueBasic
var registry = new ConcurrentDictionary<string, object>();
var q = new IpQueue<int>("test", registry);
q.MaxRecycleSize.ShouldBe(IpQueue<int>.DefaultMaxRecycleSize);
q.Ch.TryRead(out _).ShouldBeFalse("channel should be empty on creation");
q.Len().ShouldBe(0);
// Create a second queue with custom max recycle size.
var q2 = new IpQueue<int>("test2", registry, maxRecycleSize: 10);
q2.MaxRecycleSize.ShouldBe(10);
// Both should be in the registry.
registry.ContainsKey("test").ShouldBeTrue();
registry.ContainsKey("test2").ShouldBeTrue();
// Unregister both.
q.Unregister();
q2.Unregister();
registry.IsEmpty.ShouldBeTrue("registry should be empty after unregister");
// Push/pop should still work after unregister.
q.Push(1);
var elts = q.Pop();
elts.ShouldNotBeNull();
elts!.Length.ShouldBe(1);
q2.Push(2);
var (e, ok) = q2.PopOne();
ok.ShouldBeTrue();
e.ShouldBe(2);
}
[Fact]
public void Push_ShouldNotifyOnFirstElement()
{
// Mirror: TestIPQueuePush
var q = new IpQueue<int>("test");
q.Push(1);
q.Len().ShouldBe(1);
q.Ch.TryRead(out _).ShouldBeTrue("should have been notified after first push");
// Second push should NOT send another notification.
q.Push(2);
q.Len().ShouldBe(2);
q.Ch.TryRead(out _).ShouldBeFalse("should not notify again when queue was not empty");
}
[Fact]
public void Pop_ShouldReturnElementsAndTrackInProgress()
{
// Mirror: TestIPQueuePop
var q = new IpQueue<int>("test");
q.Push(1);
q.Ch.TryRead(out _); // consume signal
var elts = q.Pop();
elts.ShouldNotBeNull();
elts!.Length.ShouldBe(1);
q.Len().ShouldBe(0);
// Channel should still be empty after pop.
q.Ch.TryRead(out _).ShouldBeFalse();
// InProgress should be 1 — pop increments it.
q.InProgress().ShouldBe(1L);
// Recycle decrements it.
q.Recycle(elts);
q.InProgress().ShouldBe(0L);
// Pop on empty queue returns null.
var empty = q.Pop();
empty.ShouldBeNull();
q.InProgress().ShouldBe(0L);
}
[Fact]
public void PopOne_ShouldReturnOneAtATime()
{
// Mirror: TestIPQueuePopOne
var q = new IpQueue<int>("test");
q.Push(1);
q.Ch.TryRead(out _); // consume signal
var (e, ok) = q.PopOne();
ok.ShouldBeTrue();
e.ShouldBe(1);
q.Len().ShouldBe(0);
q.InProgress().ShouldBe(0L, "popOne does not increment inprogress");
q.Ch.TryRead(out _).ShouldBeFalse("no notification when queue is emptied by popOne");
q.Push(2);
q.Push(3);
var (e2, ok2) = q.PopOne();
ok2.ShouldBeTrue();
e2.ShouldBe(2);
q.Len().ShouldBe(1);
q.Ch.TryRead(out _).ShouldBeTrue("should re-notify when more items remain");
var (e3, ok3) = q.PopOne();
ok3.ShouldBeTrue();
e3.ShouldBe(3);
q.Len().ShouldBe(0);
q.Ch.TryRead(out _).ShouldBeFalse("no notification after last element removed");
var (_, okEmpty) = q.PopOne();
okEmpty.ShouldBeFalse("popOne on empty queue returns false");
}
[Fact]
public async Task MultiProducers_ShouldReceiveAllElements()
{
// Mirror: TestIPQueueMultiProducers
var q = new IpQueue<int>("test");
const int itemsPerProducer = 100;
const int numProducers = 3;
var tasks = Enumerable.Range(0, numProducers).Select(p =>
Task.Run(() =>
{
for (var i = p * itemsPerProducer + 1; i <= (p + 1) * itemsPerProducer; i++)
q.Push(i);
})).ToArray();
var received = new HashSet<int>();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
while (received.Count < numProducers * itemsPerProducer &&
!cts.Token.IsCancellationRequested)
{
if (q.Ch.TryRead(out _))
{
var batch = q.Pop();
if (batch != null)
{
foreach (var v in batch) received.Add(v);
q.Recycle(batch);
q.InProgress().ShouldBe(0L);
}
}
else
{
await Task.Delay(1, cts.Token);
}
}
await Task.WhenAll(tasks);
received.Count.ShouldBe(numProducers * itemsPerProducer, "all elements should be received");
}
[Fact]
public void Recycle_ShouldDecrementInProgressAndAllowReuse()
{
// Mirror: TestIPQueueRecycle (behavioral aspects)
var q = new IpQueue<int>("test");
const int total = 1000;
for (var i = 0; i < total; i++)
{
var (len, err) = q.Push(i);
err.ShouldBeNull();
len.ShouldBe(i + 1);
}
var values = q.Pop();
values.ShouldNotBeNull();
values!.Length.ShouldBe(total);
q.InProgress().ShouldBe((long)total);
q.Recycle(values);
q.InProgress().ShouldBe(0L, "recycle should decrement inprogress");
// Should be able to push/pop again after recycle.
var (l, err2) = q.Push(1001);
err2.ShouldBeNull();
l.ShouldBe(1);
var values2 = q.Pop();
values2.ShouldNotBeNull();
values2!.Length.ShouldBe(1);
values2[0].ShouldBe(1001);
// Recycle with small max recycle size: large arrays should not be pooled
// (behavioral: push/pop still works correctly).
var q2 = new IpQueue<int>("test2", maxRecycleSize: 10);
for (var i = 0; i < 100; i++) q2.Push(i);
var bigBatch = q2.Pop();
bigBatch.ShouldNotBeNull();
bigBatch!.Length.ShouldBe(100);
q2.Recycle(bigBatch);
q2.InProgress().ShouldBe(0L);
q2.Push(1001);
var small = q2.Pop();
small.ShouldNotBeNull();
small!.Length.ShouldBe(1);
q2.Recycle(small);
}
[Fact]
public void Drain_ShouldEmptyQueueAndConsumeSignal()
{
// Mirror: TestIPQueueDrain
var q = new IpQueue<int>("test");
for (var i = 1; i <= 100; i++) q.Push(i);
var drained = q.Drain();
drained.ShouldBe(100);
// Signal should have been consumed.
q.Ch.TryRead(out _).ShouldBeFalse("drain should consume the notification signal");
q.Len().ShouldBe(0);
}
[Fact]
public void SizeCalculation_ShouldTrackTotalSize()
{
// Mirror: TestIPQueueSizeCalculation
const int elemSize = 16;
var q = new IpQueue<byte[]>("test", sizeCalc: e => (ulong)e.Length);
for (var i = 0; i < 10; i++)
{
q.Push(new byte[elemSize]);
q.Len().ShouldBe(i + 1);
q.Size().ShouldBe((ulong)(i + 1) * elemSize);
}
for (var i = 10; i > 5; i--)
{
q.PopOne();
q.Len().ShouldBe(i - 1);
q.Size().ShouldBe((ulong)(i - 1) * elemSize);
}
q.Pop();
q.Len().ShouldBe(0);
q.Size().ShouldBe(0UL);
}
[Fact]
public void SizeCalculationWithLimits_ShouldEnforceLimits()
{
// Mirror: TestIPQueueSizeCalculationWithLimits
const int elemSize = 16;
Func<byte[], ulong> calc = e => (ulong)e.Length;
var elem = new byte[elemSize];
// LimitByLen
var q1 = new IpQueue<byte[]>("test-len", sizeCalc: calc, maxLen: 5);
for (var i = 0; i < 10; i++)
{
var (n, err) = q1.Push(elem);
if (i >= 5)
{
err.ShouldBeSameAs(IpQueueErrors.LenLimitReached, $"iteration {i}");
}
else
{
err.ShouldBeNull($"iteration {i}");
}
n.ShouldBeLessThan(6);
}
// LimitBySize
var q2 = new IpQueue<byte[]>("test-size", sizeCalc: calc, maxSize: elemSize * 5);
for (var i = 0; i < 10; i++)
{
var (n, err) = q2.Push(elem);
if (i >= 5)
{
err.ShouldBeSameAs(IpQueueErrors.SizeLimitReached, $"iteration {i}");
}
else
{
err.ShouldBeNull($"iteration {i}");
}
n.ShouldBeLessThan(6);
}
}
}