Files
natsdotnet/tests/NATS.Server.Tests/JetStream/Streams/SourceConsumerSetupTests.cs
Joseph Doherty a113dd686d feat: complete source consumer API request generation (Gap 4.3)
Add BuildConsumerCreateRequest, BuildConsumerCreateSubject, and
GetDeliverySequence to SourceCoordinator, modelling Go's
setupSourceConsumer/trySetupSourceConsumer. Covers DeliverPolicy
resume-from-sequence, AckPolicy.None, push/flow-control/heartbeat
consumer fields, and the $JS.API.CONSUMER.CREATE.{name} subject format.
2026-02-25 11:21:21 -05:00

214 lines
7.1 KiB
C#

using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using Shouldly;
namespace NATS.Server.Tests.JetStream.Streams;
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer, trySetupSourceConsumer)
// Go reference: server/stream.go:3531 ($JS.API.CONSUMER.CREATE.{sourceName})
// Go reference: server/stream.go:3573-3598 (DeliverPolicy, FilterSubject, AckPolicy assignment)
public class SourceConsumerSetupTests
{
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — FilterSubject
// Go reference: server/stream.go:3597-3598
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_sets_filter_subject()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
FilterSubject = "orders.>",
});
var req = coordinator.BuildConsumerCreateRequest();
req.FilterSubject.ShouldBe("orders.>");
}
[Fact]
public void BuildConsumerCreateRequest_no_filter_leaves_null()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
var req = coordinator.BuildConsumerCreateRequest();
req.FilterSubject.ShouldBeNull();
}
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — DeliverPolicy
// Go reference: server/stream.go:3573-3582
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_starts_from_beginning_when_no_progress()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
// LastOriginSequence is 0 — no messages have been processed yet
coordinator.LastOriginSequence.ShouldBe(0UL);
var req = coordinator.BuildConsumerCreateRequest();
req.DeliverPolicy.ShouldBe(DeliverPolicy.All);
}
[Fact]
public async Task BuildConsumerCreateRequest_resumes_from_last_sequence()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
// Advance LastOriginSequence by processing messages
await coordinator.OnOriginAppendAsync(MakeMessage(3, "orders.created", "a"), default);
await coordinator.OnOriginAppendAsync(MakeMessage(7, "orders.updated", "b"), default);
coordinator.LastOriginSequence.ShouldBe(7UL);
var req = coordinator.BuildConsumerCreateRequest();
req.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartSequence);
req.OptStartSeq.ShouldBe(8UL); // LastOriginSequence + 1
}
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — AckPolicy
// Go reference: server/stream.go:3586
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_sets_ack_none()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
var req = coordinator.BuildConsumerCreateRequest();
req.AckPolicy.ShouldBe(AckPolicy.None);
}
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — Push + FlowControl
// Go reference: server/stream.go:3589-3592
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_enables_push_and_flow_control()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
var req = coordinator.BuildConsumerCreateRequest();
req.Push.ShouldBeTrue();
req.FlowControl.ShouldBeTrue();
}
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — HeartbeatMs
// Go reference: server/stream.go:3593 — sourceHealthHB = 1 * time.Second
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_sets_heartbeat()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
var req = coordinator.BuildConsumerCreateRequest();
// HeartbeatInterval is 1 second = 1000 ms
req.HeartbeatMs.ShouldBe(1000);
}
// -------------------------------------------------------------------------
// BuildConsumerCreateSubject
// Go reference: server/stream.go:3531
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateSubject_formats_correctly()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "MY_SOURCE",
});
var subject = coordinator.BuildConsumerCreateSubject();
subject.ShouldBe("$JS.API.CONSUMER.CREATE.MY_SOURCE");
}
// -------------------------------------------------------------------------
// GetDeliverySequence
// Go reference: server/stream.go si.dseq field
// -------------------------------------------------------------------------
[Fact]
public void GetDeliverySequence_starts_at_zero()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
coordinator.GetDeliverySequence.ShouldBe(0UL);
}
[Fact]
public async Task GetDeliverySequence_increments_after_processing()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
await coordinator.OnOriginAppendAsync(MakeMessage(1, "orders.created", "x"), default);
await coordinator.OnOriginAppendAsync(MakeMessage(2, "orders.updated", "y"), default);
await coordinator.OnOriginAppendAsync(MakeMessage(3, "orders.shipped", "z"), default);
coordinator.GetDeliverySequence.ShouldBe(3UL);
}
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
private static StoredMessage MakeMessage(ulong seq, string subject, string payload) => new()
{
Sequence = seq,
Subject = subject,
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
TimestampUtc = DateTime.UtcNow,
};
}