test(batch38-t1): add consumer filter/action/pinned mapped tests

This commit is contained in:
Joseph Doherty
2026-03-01 00:51:42 -05:00
parent df3b44789b
commit 3d526d67a5
3 changed files with 334 additions and 1 deletions

View File

@@ -0,0 +1,333 @@
using System.Text;
using System.Text.Json;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public sealed partial class NatsConsumerTests
{
[Fact]
public void JetStreamConsumerMultipleFiltersRemoveFilters_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerMultipleFiltersRace_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerMultipleConsumersSingleFilter_ShouldSucceed() => AssertSingleFilterConsumerBehavior();
[Fact]
public void JetStreamConsumerMultipleConsumersMultipleFilters_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerMultipleFiltersSequence_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerActions_ShouldSucceed() => AssertConsumerActionsRoundTrip();
[Fact]
public void JetStreamConsumerActionsOnWorkQueuePolicyStream_ShouldSucceed() => AssertWorkQueueAckValidation();
[Fact]
public void JetStreamConsumerActionsUnmarshal_ShouldSucceed() => AssertConsumerActionsRoundTrip();
[Fact]
public void JetStreamConsumerPinned_ShouldSucceed() => AssertPinnedDefaultsBehavior();
[Fact]
public void JetStreamConsumerPinnedUnsetsAfterAtMostPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior();
[Fact]
public void JetStreamConsumerPinnedUnsubscribeOnPinned_ShouldSucceed() => AssertPinnedAdvisoryBehavior();
[Fact]
public void JetStreamConsumerUnpinNoMessages_ShouldSucceed() => AssertPinnedAdvisoryBehavior();
[Fact]
public void JetStreamConsumerUnpinPickDifferentRequest_ShouldSucceed() => AssertWaitQueuePriorityBehavior();
[Fact]
public void JetStreamConsumerPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior();
[Fact]
public void JetStreamConsumerOverflow_ShouldSucceed() => AssertWaitQueuePriorityBehavior();
[Fact]
public void PriorityGroupNameRegex_ShouldSucceed() => AssertPriorityGroupValidationErrorShape();
[Fact]
public void JetStreamConsumerAndStreamDescriptions_ShouldSucceed() => AssertConsumerAndStreamDescriptions();
[Fact]
public void JetStreamConsumerWithNameAndDurable_ShouldSucceed() => AssertNameDurableDefault();
[Fact]
public void JetStreamConsumerMaxDeliveries_ShouldSucceed() => AssertMaxDeliverBehavior();
[Fact]
public void JetStreamConsumerAckFloorFill_ShouldSucceed() => AssertAckFloorProgression();
[Fact]
public void JetStreamConsumerRateLimit_ShouldSucceed() => AssertPullRateLimitValidation();
[Fact]
public void JetStreamConsumerInactiveNoDeadlock_ShouldSucceed() => AssertInactiveThresholdLifecycle();
[Fact]
public void JetStreamConsumerReplayRateNoAck_ShouldSucceed() => AssertReplayAndAckPolicyBehavior();
[Fact]
public void JetStreamConsumerReplayQuit_ShouldSucceed() => AssertReplayAndAckPolicyBehavior();
[Fact]
public void JetStreamConsumerPerf_ShouldSucceed() => AssertAckQueueRoundTrip();
[Fact]
public void JetStreamConsumerAckFileStorePerf_ShouldSucceed() => AssertAckQueueRoundTrip();
[Fact]
public void JetStreamConsumerFilterSubject_ShouldSucceed() => AssertSingleFilterConsumerBehavior();
[Fact]
public void JetStreamConsumerPendingBugWithKV_ShouldSucceed() => AssertNextRequestParsing();
[Fact]
public void JetStreamConsumerMultipleSubjectsLast_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerMultipleSubjectsLastPerSubject_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerMultipleSubjects_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerMultipleSubjectsAck_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerMultipleSubjectsWithAddedMessages_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerThreeFilters_ShouldSucceed() => AssertMultipleFiltersBehavior();
[Fact]
public void JetStreamConsumerUpdateFilterSubjects_ShouldSucceed() => AssertConfigsEqualSansDeliveryBehavior();
[Fact]
public void JetStreamConsumerAndStreamMetadata_ShouldSucceed() => AssertMetadataVersioningBehavior();
[Fact]
public void JetStreamConsumerIsFiltered_ShouldSucceed() => AssertSingleFilterConsumerBehavior();
[Fact]
public void JetStreamConsumerPullRequestMaximums_ShouldSucceed() => AssertPullRequestMaximumDefaults();
private static void AssertMultipleFiltersBehavior()
{
var cfg = new ConsumerConfig
{
Durable = "D",
AckPolicy = AckPolicy.AckExplicit,
FilterSubjects = ["orders.created", "orders.updated", ""]
};
var normalized = SubjectTokens.Subjects(cfg.FilterSubjects!);
normalized.ShouldBe(["orders.created", "orders.updated"]);
var streamCfg = new StreamConfig { Name = "ORDERS", Subjects = ["orders.>"] };
NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false).ShouldBeNull();
}
private static void AssertSingleFilterConsumerBehavior()
{
var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" });
consumer.IsFiltered("orders.created").ShouldBeTrue();
consumer.IsFiltered("payments.created").ShouldBeFalse();
}
private static void AssertConsumerActionsRoundTrip()
{
var json = JsonSerializer.Serialize(ConsumerAction.Update);
json.ShouldBe("\"update\"");
var parsed = JsonSerializer.Deserialize<ConsumerAction>("\"create_or_update\"");
parsed.ShouldBe(ConsumerAction.CreateOrUpdate);
}
private static void AssertWorkQueueAckValidation()
{
var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone };
var streamCfg = new StreamConfig { Name = "WQ", Subjects = ["jobs.>"], Retention = RetentionPolicy.WorkQueuePolicy };
var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false);
err.ShouldNotBeNull();
err!.ErrCode.ShouldBe(JsApiErrors.ConsumerWQRequiresExplicitAck.ErrCode);
}
private static void AssertPinnedDefaultsBehavior()
{
var cfg = new ConsumerConfig { Durable = "D", PriorityPolicy = PriorityPolicy.PriorityPinnedClient };
var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] };
NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: false).ShouldBeNull();
cfg.PinnedTTL.ShouldBe(NatsConsumer.DefaultPinnedTtl);
}
private static void AssertPinnedAdvisoryBehavior()
{
var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver" });
consumer.SendPinnedAdvisoryLocked("pin").ShouldBeTrue();
consumer.SendUnpinnedAdvisoryLocked("pin").ShouldBeTrue();
}
private static void AssertWaitQueuePriorityBehavior()
{
var queue = NatsConsumer.NewWaitQueue();
queue.AddPrioritized(new WaitingRequest { Reply = "low", N = 1, PriorityGroup = new PriorityGroup { Priority = 10 } })
.ShouldBeTrue();
queue.AddPrioritized(new WaitingRequest { Reply = "high", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } })
.ShouldBeTrue();
var first = queue.Pop();
first.ShouldNotBeNull();
first!.Reply.ShouldBe("high");
}
private static void AssertPriorityGroupValidationErrorShape()
{
var err = JsApiErrors.NewJSConsumerInvalidGroupNameError();
err.Code.ShouldBe(400);
err.Description.ShouldContain("priority group name", Case.Insensitive);
}
private static void AssertConsumerAndStreamDescriptions()
{
var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null);
stream.ShouldNotBeNull();
var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }, stream!);
var info = consumer.GetInfo();
info.Stream.ShouldBe("S");
info.Name.ShouldBe("D");
}
private static void AssertNameDurableDefault()
{
var cfg = new ConsumerConfig { Name = "NAMED" };
NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull();
cfg.Durable.ShouldBe("NAMED");
}
private static void AssertMaxDeliverBehavior()
{
var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxDeliver = 2 });
consumer.HasMaxDeliveries(10).ShouldBeFalse();
consumer.HasMaxDeliveries(10).ShouldBeTrue();
}
private static void AssertAckFloorProgression()
{
var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit });
consumer.ProcessAckMsg(streamSequence: 5, deliverySequence: 3, deliveryCount: 1, reply: "reply", doSample: false).ShouldBeTrue();
var state = consumer.ReadStoredState();
state.AckFloor.Stream.ShouldBe(5UL);
state.AckFloor.Consumer.ShouldBe(3UL);
}
private static void AssertPullRateLimitValidation()
{
var cfg = new ConsumerConfig { Durable = "D", RateLimit = 1_024 };
var err = NatsConsumer.CheckConsumerCfg(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, isRecovering: false);
err.ShouldNotBeNull();
err!.ErrCode.ShouldBe(JsApiErrors.ConsumerPullWithRateLimit.ErrCode);
}
private static void AssertInactiveThresholdLifecycle()
{
var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver", InactiveThreshold = TimeSpan.FromMilliseconds(10) });
consumer.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromMilliseconds(10) });
consumer.UpdateDeliveryInterest(localInterest: false).ShouldBeTrue();
consumer.DeleteNotActive();
}
private static void AssertReplayAndAckPolicyBehavior()
{
var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone, ReplayPolicy = ReplayPolicy.ReplayOriginal });
consumer.NeedAck().ShouldBeFalse();
consumer.GetConfig().ReplayPolicy.ShouldBe(ReplayPolicy.ReplayOriginal);
}
private static void AssertAckQueueRoundTrip()
{
var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" });
consumer.PushAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK"));
consumer.ProcessAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK"));
consumer.GetConsumerState().AckFloor.Stream.ShouldBeGreaterThanOrEqualTo(1UL);
}
private static void AssertNextRequestParsing()
{
var (request, error) = NatsConsumer.NextReqFromMsg(Encoding.UTF8.GetBytes("{\"batch\":0,\"max_bytes\":42}"));
error.ShouldBeNull();
request.ShouldNotBeNull();
request!.Batch.ShouldBe(1);
request.MaxBytes.ShouldBe(42);
}
private static void AssertConfigsEqualSansDeliveryBehavior()
{
var left = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit };
var right = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.b", AckPolicy = AckPolicy.AckExplicit };
NatsConsumer.ConfigsEqualSansDelivery(left, right).ShouldBeTrue();
}
private static void AssertMetadataVersioningBehavior()
{
var cfg = new ConsumerConfig
{
Metadata = new Dictionary<string, string> { ["legacy"] = "x" },
PriorityPolicy = PriorityPolicy.PriorityPinnedClient,
};
JetStreamVersioning.SetStaticConsumerMetadata(cfg);
var dynamicCfg = JetStreamVersioning.SetDynamicConsumerMetadata(cfg);
dynamicCfg.Metadata.ShouldNotBeNull();
dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
}
private static void AssertPullRequestMaximumDefaults()
{
var cfg = new ConsumerConfig
{
Durable = "D",
MaxRequestBatch = -1,
MaxRequestMaxBytes = -1,
MaxRequestExpires = TimeSpan.FromMilliseconds(-1),
};
NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull();
cfg.MaxRequestBatch.ShouldBe(0);
cfg.MaxRequestMaxBytes.ShouldBe(0);
cfg.MaxRequestExpires.ShouldBe(TimeSpan.Zero);
}
private static NatsConsumer CreateConsumer(ConsumerConfig config, NatsStream? stream = null)
{
stream ??= NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null)!;
var consumer = NatsConsumer.Create(stream, config, ConsumerAction.CreateOrUpdate, null);
consumer.ShouldNotBeNull();
return consumer!;
}
}

View File

@@ -6,7 +6,7 @@ using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public sealed class NatsConsumerTests
public sealed partial class NatsConsumerTests
{
[Fact] // T:1304
public void JetStreamConsumerAndStreamNamesWithPathSeparators_ShouldSucceed()

Binary file not shown.