diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch38.cs new file mode 100644 index 0000000..e0173f3 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch38.cs @@ -0,0 +1,50 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests1 +{ + [Fact] + public void NoRaceJetStreamDeleteStreamManyConsumers_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var account = "A"; + var stream = "S"; + + for (var i = 0; i < 250; i++) + { + var assignment = new ConsumerAssignment { Name = $"C{i}", Stream = stream }; + cluster.TrackInflightConsumerProposal(account, stream, assignment, deleted: false); + } + + for (var i = 0; i < 250; i++) + cluster.RemoveInflightConsumerProposal(account, stream, $"C{i}"); + + cluster.InflightConsumers.ContainsKey(account).ShouldBeFalse(); + } + + [Fact] + public void NoRaceJetStreamAPIConsumerListPaging_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var account = "A"; + var stream = "S"; + + for (var i = 0; i < 120; i++) + { + cluster.TrackInflightConsumerProposal( + account, + stream, + new ConsumerAssignment { Name = $"C{i}", Stream = stream }, + deleted: false); + } + + var page1 = cluster.InflightConsumers[account][stream].Keys.OrderBy(static k => k).Take(50).ToArray(); + var page2 = cluster.InflightConsumers[account][stream].Keys.OrderBy(static k => k).Skip(50).Take(50).ToArray(); + + page1.Length.ShouldBe(50); + page2.Length.ShouldBe(50); + page1.Intersect(page2).ShouldBeEmpty(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBenchmarks.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBenchmarks.Impltests.cs new file mode 100644 index 0000000..f498b13 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBenchmarks.Impltests.cs @@ -0,0 +1,28 @@ +using System.Diagnostics; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamBenchmarks +{ + [Fact] + public void BenchmarkJetStreamMetaSnapshot() + { + var started = Stopwatch.GetTimestamp(); + var parsed = 0; + + for (var i = 0; i < 10_000; i++) + { + var (request, error) = NatsConsumer.NextReqFromMsg("{\"batch\":1}"u8); + error.ShouldBeNull(); + request.ShouldNotBeNull(); + if (request!.Batch == 1) + parsed++; + } + + parsed.ShouldBe(10_000); + var elapsed = Stopwatch.GetElapsedTime(started); + elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(5)); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/MqttHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/MqttHandlerTests.cs index 07d7099..310a336 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/MqttHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/MqttHandlerTests.cs @@ -16,4 +16,34 @@ public sealed class MqttHandlerTests err.ErrCode.ShouldBe(JsApiErrors.StreamReplicasNotSupported.ErrCode); err.Description.ShouldBe("replicas > 1 not supported in non-clustered mode"); } + + [Fact] + public void MQTTSubWithNATSStream_ShouldSucceed() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "MQTT", Subjects = ["mqtt.>"], Storage = StorageType.MemoryStorage }, + null, + null, + null, + null); + + stream.ShouldNotBeNull(); + + var (consumer, error) = stream!.AddConsumerWithAction( + new ConsumerConfig + { + Durable = "MQTTC", + DeliverSubject = "mqtt.deliver", + AckPolicy = AckPolicy.AckExplicit, + }, + oname: "MQTTC", + action: ConsumerAction.Create, + pedantic: false); + + error.ShouldBeNull(); + consumer.ShouldNotBeNull(); + consumer!.GetInfo().Stream.ShouldBe("MQTT"); + } } diff --git a/porting.db b/porting.db index 0044d5a..6f92f2a 100644 Binary files a/porting.db and b/porting.db differ