using Shouldly;
using ZB.MOM.NatsNet.Server;

namespace ZB.MOM.NatsNet.Server.Tests.JetStream;

/// <summary>
/// Unit tests exercising the message helpers, outbound queue, and consumer
/// bookkeeping surface of <see cref="NatsStream"/>.
/// </summary>
public sealed class NatsStreamConsumersTests
{
    /// <summary>
    /// A message created through <c>NewCMsg</c> carries the supplied subject and
    /// sequence, and reports an empty subject and a zero sequence after
    /// <c>ReturnToPool</c> has been called on it.
    /// </summary>
    [Fact]
    public void NewCMsg_ReturnToPool_ClearsValues()
    {
        var pooled = NatsStream.NewCMsg("orders.created", 22);

        pooled.Subject.ShouldBe("orders.created");
        pooled.Seq.ShouldBe(22UL);

        pooled.ReturnToPool();

        pooled.Subject.ShouldBe(string.Empty);
        pooled.Seq.ShouldBe(0UL);
    }

    /// <summary>
    /// A publish message built with both header and payload bytes keeps its
    /// subject and reports a size greater than zero.
    /// </summary>
    [Fact]
    public void NewJSPubMsg_WithHeaderAndData_ReturnsSizedMessage()
    {
        var published = NatsStream.NewJSPubMsg("inbox.x", "orders", "reply", [1, 2], [3, 4, 5], null, 12);

        published.Subject.ShouldBe("inbox.x");
        published.Size().ShouldBeGreaterThan(0);
    }

    /// <summary>
    /// The first send on a fresh queue succeeds with a positive length; a send
    /// issued after <c>Unregister</c> comes back with an error.
    /// </summary>
    [Fact]
    public void JsOutQ_SendThenUnregister_RejectsFutureSends()
    {
        var queue = new JsOutQ();

        var beforeUnregister = queue.SendMsg("inbox.1", [1, 2, 3]);

        beforeUnregister.Error.ShouldBeNull();
        beforeUnregister.Len.ShouldBeGreaterThan(0);

        queue.Unregister();
        var afterUnregister = queue.SendMsg("inbox.2", [4]);

        afterUnregister.Error.ShouldNotBeNull();
    }

    /// <summary>
    /// <c>AccName</c> and <c>NameLocked</c> return the account name ("A") and the
    /// stream name ("S") that <see cref="CreateStream"/> configures.
    /// </summary>
    [Fact]
    public void AccName_AndNameLocked_ReturnConfiguredValues()
    {
        var sut = CreateStream();

        sut.AccName().ShouldBe("A");
        sut.NameLocked().ShouldBe("S");
    }

    /// <summary>
    /// Smoke test: neither <c>SetupSendCapabilities</c> nor
    /// <c>ResetAndWaitOnConsumers</c> throws on a freshly created stream.
    /// </summary>
    [Fact]
    public void SetupSendCapabilities_AndResetConsumers_DoNotThrow()
    {
        var sut = CreateStream();

        Should.NotThrow(sut.SetupSendCapabilities);
        Should.NotThrow(sut.ResetAndWaitOnConsumers);
    }

    /// <summary>
    /// After registering one non-direct and one direct consumer, the total,
    /// public, and direct counts are 2, 1, and 1 respectively, and a lookup by
    /// name yields the registered direct consumer.
    /// </summary>
    [Fact]
    public void SetConsumer_LookupAndCounts_ReturnExpectedValues()
    {
        var sut = CreateStream();
        var publicConsumer = new NatsConsumer(
            "S",
            new ConsumerConfig { Name = "c1", Direct = false },
            DateTime.UtcNow);
        var directConsumer = new NatsConsumer(
            "S",
            new ConsumerConfig { Name = "c2", Direct = true },
            DateTime.UtcNow);

        sut.SetConsumer(publicConsumer);
        sut.SetConsumer(directConsumer);

        sut.NumConsumers().ShouldBe(2);
        sut.NumPublicConsumers().ShouldBe(1);
        sut.NumDirectConsumers().ShouldBe(1);
        sut.LookupConsumer("c2").ShouldBe(directConsumer);
    }

    /// <summary>
    /// After the store is set up and one message is written to it, <c>GetMsg(1)</c>
    /// returns a message with the stored subject and sequence 1.
    /// </summary>
    [Fact]
    public void GetMsg_WhenStored_ReturnsStoredMessage()
    {
        var sut = CreateStream();
        sut.SetupStore(null).ShouldBeNull();
        sut.Store!.StoreMsg("events", null, [1, 2], ttl: 0);

        var fetched = sut.GetMsg(1);

        fetched.ShouldNotBeNull();
        fetched!.Subject.ShouldBe("events");
        fetched.Sequence.ShouldBe(1UL);
    }

    /// <summary>
    /// With an existing consumer filtered on "orders.*", a candidate partition of
    /// "orders.created" is reported as not unique.
    /// </summary>
    [Fact]
    public void PartitionUnique_WithCollidingFilters_ReturnsFalse()
    {
        var sut = CreateStream();
        var registered = new NatsConsumer(
            "S",
            new ConsumerConfig { Name = "existing", FilterSubject = "orders.*" },
            DateTime.UtcNow);
        sut.SetConsumer(registered);

        var isUnique = sut.PartitionUnique("new", ["orders.created"]);

        isUnique.ShouldBeFalse();
    }

    /// <summary>
    /// A stream whose subject is the wildcard "orders.&gt;" and which has one
    /// registered consumer reports potential filtered consumers.
    /// </summary>
    [Fact]
    public void PotentialFilteredConsumers_WithWildcardSubjectAndConsumer_ReturnsTrue()
    {
        var sut = CreateStream(["orders.>"]);
        sut.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow));

        sut.PotentialFilteredConsumers().ShouldBeTrue();
    }

    /// <summary>
    /// When the only registered consumer is the one passed as the observer,
    /// <c>NoInterest</c> for sequence 1 returns true.
    /// </summary>
    [Fact]
    public void NoInterest_WithOnlyObservingConsumer_ReturnsTrue()
    {
        var sut = CreateStream();
        var soleConsumer = new NatsConsumer(
            "S",
            new ConsumerConfig { Name = "observer" },
            DateTime.UtcNow);
        sut.SetConsumer(soleConsumer);

        sut.NoInterest(1, soleConsumer).ShouldBeTrue();
    }

    /// <summary>
    /// A stream configured with <see cref="RetentionPolicy.InterestPolicy"/>
    /// reports itself as interest-retained.
    /// </summary>
    [Fact]
    public void IsInterestRetention_WhenPolicyIsInterest_ReturnsTrue()
    {
        var sut = CreateStream(retention: RetentionPolicy.InterestPolicy);

        sut.IsInterestRetention().ShouldBeTrue();
    }

    /// <summary>
    /// Builds a memory-backed stream named "S" owned by account "A".
    /// </summary>
    /// <param name="subjects">
    /// Subjects for the stream configuration; "events.&gt;" is used when omitted.
    /// </param>
    /// <param name="retention">Retention policy placed in the stream configuration.</param>
    /// <returns>A new <see cref="NatsStream"/> created at the current UTC time.</returns>
    private static NatsStream CreateStream(
        string[]? subjects = null,
        RetentionPolicy retention = RetentionPolicy.LimitsPolicy)
    {
        var owner = new Account { Name = "A" };

        var streamConfig = new StreamConfig
        {
            Name = "S",
            Storage = StorageType.MemoryStorage,
            Subjects = subjects ?? ["events.>"],
            Retention = retention,
        };

        return new NatsStream(owner, streamConfig, DateTime.UtcNow);
    }
}