373 lines
11 KiB
C#
373 lines
11 KiB
C#
// Copyright 2021-2025 The NATS Authors
|
||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
// you may not use this file except in compliance with the License.
|
||
// You may obtain a copy of the License at
|
||
//
|
||
// http://www.apache.org/licenses/LICENSE-2.0
|
||
//
|
||
// Unless required by applicable law or agreed to in writing, software
|
||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
// See the License for the specific language governing permissions and
|
||
// limitations under the License.
|
||
|
||
using System.Collections.Concurrent;
|
||
using Shouldly;
|
||
using ZB.MOM.NatsNet.Server.Internal;
|
||
|
||
namespace ZB.MOM.NatsNet.Server.Tests.Internal;
|
||
|
||
/// <summary>
|
||
/// Tests for <see cref="IpQueue{T}"/>.
|
||
/// Mirrors server/ipqueue_test.go:
|
||
/// TestIPQueueBasic (ID 688), TestIPQueuePush (ID 689), TestIPQueuePop (ID 690),
|
||
/// TestIPQueuePopOne (ID 691), TestIPQueueMultiProducers (ID 692),
|
||
/// TestIPQueueRecycle (ID 693), TestIPQueueDrain (ID 694),
|
||
/// TestIPQueueSizeCalculation (ID 695), TestIPQueueSizeCalculationWithLimits (ID 696).
|
||
/// Benchmarks (IDs 697–715) are n/a.
|
||
/// </summary>
|
||
public sealed class IpQueueTests
|
||
{
|
||
[Fact]
|
||
public void IpqMaxRecycleSize_ShouldAffectQueueConfig()
|
||
{
|
||
var q = IpQueue<int>.NewIPQueue("opt-max-recycle", null, IpQueue<int>.IpqMaxRecycleSize(123));
|
||
q.MaxRecycleSize.ShouldBe(123);
|
||
}
|
||
|
||
[Fact]
|
||
public void IpqSizeCalculation_AndLimitBySize_ShouldEnforceLimit()
|
||
{
|
||
var q = IpQueue<byte[]>.NewIPQueue(
|
||
"opt-size-limit",
|
||
null,
|
||
IpQueue<byte[]>.IpqSizeCalculation(e => (ulong)e.Length),
|
||
IpQueue<byte[]>.IpqLimitBySize(8));
|
||
|
||
var (_, err1) = q.Push(new byte[4]);
|
||
err1.ShouldBeNull();
|
||
|
||
var (_, err2) = q.Push(new byte[4]);
|
||
err2.ShouldBeNull();
|
||
|
||
var (_, err3) = q.Push(new byte[1]);
|
||
err3.ShouldBeSameAs(IpQueueErrors.SizeLimitReached);
|
||
}
|
||
|
||
[Fact]
|
||
public void IpqLimitByLen_ShouldEnforceLengthLimit()
|
||
{
|
||
var q = IpQueue<int>.NewIPQueue("opt-len-limit", null, IpQueue<int>.IpqLimitByLen(2));
|
||
|
||
q.Push(1).error.ShouldBeNull();
|
||
q.Push(2).error.ShouldBeNull();
|
||
q.Push(3).error.ShouldBeSameAs(IpQueueErrors.LenLimitReached);
|
||
}
|
||
|
||
[Fact]
|
||
public void NewIPQueue_ShouldApplyOptionsAndRegister()
|
||
{
|
||
var registry = new ConcurrentDictionary<string, object>();
|
||
var q = IpQueue<int>.NewIPQueue(
|
||
"opt-factory",
|
||
registry,
|
||
IpQueue<int>.IpqMaxRecycleSize(55),
|
||
IpQueue<int>.IpqLimitByLen(1));
|
||
|
||
q.MaxRecycleSize.ShouldBe(55);
|
||
registry.TryGetValue("opt-factory", out var registered).ShouldBeTrue();
|
||
registered.ShouldBeSameAs(q);
|
||
|
||
var (_, err1) = q.Push(1);
|
||
err1.ShouldBeNull();
|
||
var (_, err2) = q.Push(2);
|
||
err2.ShouldBeSameAs(IpQueueErrors.LenLimitReached);
|
||
}
|
||
|
||
[Fact]
|
||
public void Basic_ShouldInitialiseCorrectly()
|
||
{
|
||
// Mirror: TestIPQueueBasic
|
||
var registry = new ConcurrentDictionary<string, object>();
|
||
var q = new IpQueue<int>("test", registry);
|
||
|
||
q.MaxRecycleSize.ShouldBe(IpQueue<int>.DefaultMaxRecycleSize);
|
||
q.Ch.TryRead(out _).ShouldBeFalse("channel should be empty on creation");
|
||
q.Len().ShouldBe(0);
|
||
|
||
// Create a second queue with custom max recycle size.
|
||
var q2 = new IpQueue<int>("test2", registry, maxRecycleSize: 10);
|
||
q2.MaxRecycleSize.ShouldBe(10);
|
||
|
||
// Both should be in the registry.
|
||
registry.ContainsKey("test").ShouldBeTrue();
|
||
registry.ContainsKey("test2").ShouldBeTrue();
|
||
|
||
// Unregister both.
|
||
q.Unregister();
|
||
q2.Unregister();
|
||
registry.IsEmpty.ShouldBeTrue("registry should be empty after unregister");
|
||
|
||
// Push/pop should still work after unregister.
|
||
q.Push(1);
|
||
var elts = q.Pop();
|
||
elts.ShouldNotBeNull();
|
||
elts!.Length.ShouldBe(1);
|
||
|
||
q2.Push(2);
|
||
var (e, ok) = q2.PopOne();
|
||
ok.ShouldBeTrue();
|
||
e.ShouldBe(2);
|
||
}
|
||
|
||
[Fact]
|
||
public void Push_ShouldNotifyOnFirstElement()
|
||
{
|
||
// Mirror: TestIPQueuePush
|
||
var q = new IpQueue<int>("test");
|
||
|
||
q.Push(1);
|
||
q.Len().ShouldBe(1);
|
||
q.Ch.TryRead(out _).ShouldBeTrue("should have been notified after first push");
|
||
|
||
// Second push should NOT send another notification.
|
||
q.Push(2);
|
||
q.Len().ShouldBe(2);
|
||
q.Ch.TryRead(out _).ShouldBeFalse("should not notify again when queue was not empty");
|
||
}
|
||
|
||
[Fact]
|
||
public void Pop_ShouldReturnElementsAndTrackInProgress()
|
||
{
|
||
// Mirror: TestIPQueuePop
|
||
var q = new IpQueue<int>("test");
|
||
q.Push(1);
|
||
q.Ch.TryRead(out _); // consume signal
|
||
|
||
var elts = q.Pop();
|
||
elts.ShouldNotBeNull();
|
||
elts!.Length.ShouldBe(1);
|
||
q.Len().ShouldBe(0);
|
||
|
||
// Channel should still be empty after pop.
|
||
q.Ch.TryRead(out _).ShouldBeFalse();
|
||
|
||
// InProgress should be 1 — pop increments it.
|
||
q.InProgress().ShouldBe(1L);
|
||
|
||
// Recycle decrements it.
|
||
q.Recycle(elts);
|
||
q.InProgress().ShouldBe(0L);
|
||
|
||
// Pop on empty queue returns null.
|
||
var empty = q.Pop();
|
||
empty.ShouldBeNull();
|
||
q.InProgress().ShouldBe(0L);
|
||
}
|
||
|
||
[Fact]
|
||
public void PopOne_ShouldReturnOneAtATime()
|
||
{
|
||
// Mirror: TestIPQueuePopOne
|
||
var q = new IpQueue<int>("test");
|
||
q.Push(1);
|
||
q.Ch.TryRead(out _); // consume signal
|
||
|
||
var (e, ok) = q.PopOne();
|
||
ok.ShouldBeTrue();
|
||
e.ShouldBe(1);
|
||
q.Len().ShouldBe(0);
|
||
q.InProgress().ShouldBe(0L, "popOne does not increment inprogress");
|
||
q.Ch.TryRead(out _).ShouldBeFalse("no notification when queue is emptied by popOne");
|
||
|
||
q.Push(2);
|
||
q.Push(3);
|
||
|
||
var (e2, ok2) = q.PopOne();
|
||
ok2.ShouldBeTrue();
|
||
e2.ShouldBe(2);
|
||
q.Len().ShouldBe(1);
|
||
q.Ch.TryRead(out _).ShouldBeTrue("should re-notify when more items remain");
|
||
|
||
var (e3, ok3) = q.PopOne();
|
||
ok3.ShouldBeTrue();
|
||
e3.ShouldBe(3);
|
||
q.Len().ShouldBe(0);
|
||
q.Ch.TryRead(out _).ShouldBeFalse("no notification after last element removed");
|
||
|
||
var (_, okEmpty) = q.PopOne();
|
||
okEmpty.ShouldBeFalse("popOne on empty queue returns false");
|
||
}
|
||
|
||
[Fact]
|
||
public async Task MultiProducers_ShouldReceiveAllElements()
|
||
{
|
||
// Mirror: TestIPQueueMultiProducers
|
||
var q = new IpQueue<int>("test");
|
||
const int itemsPerProducer = 100;
|
||
const int numProducers = 3;
|
||
|
||
var tasks = Enumerable.Range(0, numProducers).Select(p =>
|
||
Task.Run(() =>
|
||
{
|
||
for (var i = p * itemsPerProducer + 1; i <= (p + 1) * itemsPerProducer; i++)
|
||
q.Push(i);
|
||
})).ToArray();
|
||
|
||
var received = new HashSet<int>();
|
||
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||
|
||
while (received.Count < numProducers * itemsPerProducer &&
|
||
!cts.Token.IsCancellationRequested)
|
||
{
|
||
if (q.Ch.TryRead(out _))
|
||
{
|
||
var batch = q.Pop();
|
||
if (batch != null)
|
||
{
|
||
foreach (var v in batch) received.Add(v);
|
||
q.Recycle(batch);
|
||
q.InProgress().ShouldBe(0L);
|
||
}
|
||
}
|
||
else
|
||
{
|
||
await Task.Delay(1, cts.Token);
|
||
}
|
||
}
|
||
|
||
await Task.WhenAll(tasks);
|
||
received.Count.ShouldBe(numProducers * itemsPerProducer, "all elements should be received");
|
||
}
|
||
|
||
[Fact]
|
||
public void Recycle_ShouldDecrementInProgressAndAllowReuse()
|
||
{
|
||
// Mirror: TestIPQueueRecycle (behavioral aspects)
|
||
var q = new IpQueue<int>("test");
|
||
const int total = 1000;
|
||
|
||
for (var i = 0; i < total; i++)
|
||
{
|
||
var (len, err) = q.Push(i);
|
||
err.ShouldBeNull();
|
||
len.ShouldBe(i + 1);
|
||
}
|
||
|
||
var values = q.Pop();
|
||
values.ShouldNotBeNull();
|
||
values!.Length.ShouldBe(total);
|
||
q.InProgress().ShouldBe((long)total);
|
||
|
||
q.Recycle(values);
|
||
q.InProgress().ShouldBe(0L, "recycle should decrement inprogress");
|
||
|
||
// Should be able to push/pop again after recycle.
|
||
var (l, err2) = q.Push(1001);
|
||
err2.ShouldBeNull();
|
||
l.ShouldBe(1);
|
||
var values2 = q.Pop();
|
||
values2.ShouldNotBeNull();
|
||
values2!.Length.ShouldBe(1);
|
||
values2[0].ShouldBe(1001);
|
||
|
||
// Recycle with small max recycle size: large arrays should not be pooled
|
||
// (behavioral: push/pop still works correctly).
|
||
var q2 = new IpQueue<int>("test2", maxRecycleSize: 10);
|
||
for (var i = 0; i < 100; i++) q2.Push(i);
|
||
var bigBatch = q2.Pop();
|
||
bigBatch.ShouldNotBeNull();
|
||
bigBatch!.Length.ShouldBe(100);
|
||
q2.Recycle(bigBatch);
|
||
q2.InProgress().ShouldBe(0L);
|
||
|
||
q2.Push(1001);
|
||
var small = q2.Pop();
|
||
small.ShouldNotBeNull();
|
||
small!.Length.ShouldBe(1);
|
||
q2.Recycle(small);
|
||
}
|
||
|
||
[Fact]
|
||
public void Drain_ShouldEmptyQueueAndConsumeSignal()
|
||
{
|
||
// Mirror: TestIPQueueDrain
|
||
var q = new IpQueue<int>("test");
|
||
for (var i = 1; i <= 100; i++) q.Push(i);
|
||
|
||
var drained = q.Drain();
|
||
drained.ShouldBe(100);
|
||
|
||
// Signal should have been consumed.
|
||
q.Ch.TryRead(out _).ShouldBeFalse("drain should consume the notification signal");
|
||
q.Len().ShouldBe(0);
|
||
}
|
||
|
||
[Fact]
|
||
public void SizeCalculation_ShouldTrackTotalSize()
|
||
{
|
||
// Mirror: TestIPQueueSizeCalculation
|
||
const int elemSize = 16;
|
||
var q = new IpQueue<byte[]>("test", sizeCalc: e => (ulong)e.Length);
|
||
|
||
for (var i = 0; i < 10; i++)
|
||
{
|
||
q.Push(new byte[elemSize]);
|
||
q.Len().ShouldBe(i + 1);
|
||
q.Size().ShouldBe((ulong)(i + 1) * elemSize);
|
||
}
|
||
|
||
for (var i = 10; i > 5; i--)
|
||
{
|
||
q.PopOne();
|
||
q.Len().ShouldBe(i - 1);
|
||
q.Size().ShouldBe((ulong)(i - 1) * elemSize);
|
||
}
|
||
|
||
q.Pop();
|
||
q.Len().ShouldBe(0);
|
||
q.Size().ShouldBe(0UL);
|
||
}
|
||
|
||
[Fact]
|
||
public void SizeCalculationWithLimits_ShouldEnforceLimits()
|
||
{
|
||
// Mirror: TestIPQueueSizeCalculationWithLimits
|
||
const int elemSize = 16;
|
||
Func<byte[], ulong> calc = e => (ulong)e.Length;
|
||
var elem = new byte[elemSize];
|
||
|
||
// LimitByLen
|
||
var q1 = new IpQueue<byte[]>("test-len", sizeCalc: calc, maxLen: 5);
|
||
for (var i = 0; i < 10; i++)
|
||
{
|
||
var (n, err) = q1.Push(elem);
|
||
if (i >= 5)
|
||
{
|
||
err.ShouldBeSameAs(IpQueueErrors.LenLimitReached, $"iteration {i}");
|
||
}
|
||
else
|
||
{
|
||
err.ShouldBeNull($"iteration {i}");
|
||
}
|
||
n.ShouldBeLessThan(6);
|
||
}
|
||
|
||
// LimitBySize
|
||
var q2 = new IpQueue<byte[]>("test-size", sizeCalc: calc, maxSize: elemSize * 5);
|
||
for (var i = 0; i < 10; i++)
|
||
{
|
||
var (n, err) = q2.Push(elem);
|
||
if (i >= 5)
|
||
{
|
||
err.ShouldBeSameAs(IpQueueErrors.SizeLimitReached, $"iteration {i}");
|
||
}
|
||
else
|
||
{
|
||
err.ShouldBeNull($"iteration {i}");
|
||
}
|
||
n.ShouldBeLessThan(6);
|
||
}
|
||
}
|
||
}
|