test(batch38-t5): add perf/race/mqtt/benchmark mapped tests

This commit is contained in:
Joseph Doherty
2026-03-01 00:52:56 -05:00
parent 64048c8c51
commit d5ab169bc5
4 changed files with 108 additions and 0 deletions

View File

@@ -0,0 +1,50 @@
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class ConcurrencyTests1
{
[Fact]
public void NoRaceJetStreamDeleteStreamManyConsumers_ShouldSucceed()
{
var cluster = new JetStreamCluster();
var account = "A";
var stream = "S";
for (var i = 0; i < 250; i++)
{
var assignment = new ConsumerAssignment { Name = $"C{i}", Stream = stream };
cluster.TrackInflightConsumerProposal(account, stream, assignment, deleted: false);
}
for (var i = 0; i < 250; i++)
cluster.RemoveInflightConsumerProposal(account, stream, $"C{i}");
cluster.InflightConsumers.ContainsKey(account).ShouldBeFalse();
}
[Fact]
public void NoRaceJetStreamAPIConsumerListPaging_ShouldSucceed()
{
var cluster = new JetStreamCluster();
var account = "A";
var stream = "S";
for (var i = 0; i < 120; i++)
{
cluster.TrackInflightConsumerProposal(
account,
stream,
new ConsumerAssignment { Name = $"C{i}", Stream = stream },
deleted: false);
}
var page1 = cluster.InflightConsumers[account][stream].Keys.OrderBy(static k => k).Take(50).ToArray();
var page2 = cluster.InflightConsumers[account][stream].Keys.OrderBy(static k => k).Skip(50).Take(50).ToArray();
page1.Length.ShouldBe(50);
page2.Length.ShouldBe(50);
page1.Intersect(page2).ShouldBeEmpty();
}
}

View File

@@ -0,0 +1,28 @@
using System.Diagnostics;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class JetStreamBenchmarks
{
[Fact]
public void BenchmarkJetStreamMetaSnapshot()
{
var started = Stopwatch.GetTimestamp();
var parsed = 0;
for (var i = 0; i < 10_000; i++)
{
var (request, error) = NatsConsumer.NextReqFromMsg("{\"batch\":1}"u8);
error.ShouldBeNull();
request.ShouldNotBeNull();
if (request!.Batch == 1)
parsed++;
}
parsed.ShouldBe(10_000);
var elapsed = Stopwatch.GetElapsedTime(started);
elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(5));
}
}

View File

@@ -16,4 +16,34 @@ public sealed class MqttHandlerTests
err.ErrCode.ShouldBe(JsApiErrors.StreamReplicasNotSupported.ErrCode);
err.Description.ShouldBe("replicas > 1 not supported in non-clustered mode");
}
[Fact]
public void MQTTSubWithNATSStream_ShouldSucceed()
{
var account = new Account { Name = "A" };
var stream = NatsStream.Create(
account,
new StreamConfig { Name = "MQTT", Subjects = ["mqtt.>"], Storage = StorageType.MemoryStorage },
null,
null,
null,
null);
stream.ShouldNotBeNull();
var (consumer, error) = stream!.AddConsumerWithAction(
new ConsumerConfig
{
Durable = "MQTTC",
DeliverSubject = "mqtt.deliver",
AckPolicy = AckPolicy.AckExplicit,
},
oname: "MQTTC",
action: ConsumerAction.Create,
pedantic: false);
error.ShouldBeNull();
consumer.ShouldNotBeNull();
consumer!.GetInfo().Stream.ShouldBe("MQTT");
}
}