Add BuildConsumerCreateRequest, BuildConsumerCreateSubject, and
GetDeliverySequence to SourceCoordinator, modelling Go's
setupSourceConsumer/trySetupSourceConsumer. Covers DeliverPolicy
resume-from-sequence, AckPolicy.None, push/flow-control/heartbeat
consumer fields, and the $JS.API.CONSUMER.CREATE.{name} subject format.
214 lines
7.1 KiB
C#
214 lines
7.1 KiB
C#
using NATS.Server.JetStream.MirrorSource;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Storage;
|
|
using Shouldly;
|
|
|
|
namespace NATS.Server.Tests.JetStream.Streams;
|
|
|
|
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer, trySetupSourceConsumer)
|
|
// Go reference: server/stream.go:3531 ($JS.API.CONSUMER.CREATE.{sourceName})
|
|
// Go reference: server/stream.go:3573-3598 (DeliverPolicy, FilterSubject, AckPolicy assignment)
|
|
|
|
// Exercises the consumer-setup helpers exposed by SourceCoordinator:
// BuildConsumerCreateRequest, BuildConsumerCreateSubject and GetDeliverySequence.
// Every test builds a fresh coordinator over an in-memory store, so tests share no state.
public class SourceConsumerSetupTests
{
    // =========================================================================
    // BuildConsumerCreateRequest: FilterSubject
    // Go reference: server/stream.go:3597-3598
    // =========================================================================

    // A filter subject set on the source config must show up on the request.
    [Fact]
    public void BuildConsumerCreateRequest_sets_filter_subject()
    {
        var sut = NewCoordinator(new StreamSourceConfig
        {
            Name = "SOURCE",
            FilterSubject = "orders.>",
        });

        var request = sut.BuildConsumerCreateRequest();

        request.FilterSubject.ShouldBe("orders.>");
    }

    // With no filter configured, the request's filter subject stays null.
    [Fact]
    public void BuildConsumerCreateRequest_no_filter_leaves_null()
    {
        var sut = NewCoordinator(new StreamSourceConfig { Name = "SOURCE" });

        var request = sut.BuildConsumerCreateRequest();

        request.FilterSubject.ShouldBeNull();
    }

    // =========================================================================
    // BuildConsumerCreateRequest: DeliverPolicy
    // Go reference: server/stream.go:3573-3582
    // =========================================================================

    // A coordinator that has processed nothing asks for delivery of all messages.
    [Fact]
    public void BuildConsumerCreateRequest_starts_from_beginning_when_no_progress()
    {
        var sut = NewCoordinator(new StreamSourceConfig { Name = "SOURCE" });

        // Precondition: nothing has been processed, so the last origin sequence is 0.
        sut.LastOriginSequence.ShouldBe(0UL);

        var request = sut.BuildConsumerCreateRequest();

        request.DeliverPolicy.ShouldBe(DeliverPolicy.All);
    }

    // After processing up to origin sequence 7, the request resumes at sequence 8.
    [Fact]
    public async Task BuildConsumerCreateRequest_resumes_from_last_sequence()
    {
        var sut = NewCoordinator(new StreamSourceConfig { Name = "SOURCE" });

        // Feed two origin messages so that LastOriginSequence moves forward.
        await sut.OnOriginAppendAsync(MakeMessage(3, "orders.created", "a"), default);
        await sut.OnOriginAppendAsync(MakeMessage(7, "orders.updated", "b"), default);

        sut.LastOriginSequence.ShouldBe(7UL);

        var request = sut.BuildConsumerCreateRequest();

        request.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartSequence);
        request.OptStartSeq.ShouldBe(8UL); // one past LastOriginSequence
    }

    // =========================================================================
    // BuildConsumerCreateRequest: AckPolicy
    // Go reference: server/stream.go:3586
    // =========================================================================

    // The source consumer request uses AckPolicy.None.
    [Fact]
    public void BuildConsumerCreateRequest_sets_ack_none()
    {
        var sut = NewCoordinator(new StreamSourceConfig { Name = "SOURCE" });

        var request = sut.BuildConsumerCreateRequest();

        request.AckPolicy.ShouldBe(AckPolicy.None);
    }

    // =========================================================================
    // BuildConsumerCreateRequest: Push + FlowControl
    // Go reference: server/stream.go:3589-3592
    // =========================================================================

    // The request is a push consumer with flow control switched on.
    [Fact]
    public void BuildConsumerCreateRequest_enables_push_and_flow_control()
    {
        var sut = NewCoordinator(new StreamSourceConfig { Name = "SOURCE" });

        var request = sut.BuildConsumerCreateRequest();

        request.Push.ShouldBeTrue();
        request.FlowControl.ShouldBeTrue();
    }

    // =========================================================================
    // BuildConsumerCreateRequest: HeartbeatMs
    // Go reference: server/stream.go:3593 (sourceHealthHB = 1 * time.Second)
    // =========================================================================

    // The heartbeat is expected to be one second, expressed in milliseconds.
    [Fact]
    public void BuildConsumerCreateRequest_sets_heartbeat()
    {
        var sut = NewCoordinator(new StreamSourceConfig { Name = "SOURCE" });

        var request = sut.BuildConsumerCreateRequest();

        // 1 second == 1000 ms
        request.HeartbeatMs.ShouldBe(1000);
    }

    // =========================================================================
    // BuildConsumerCreateSubject
    // Go reference: server/stream.go:3531
    // =========================================================================

    // The create subject is the $JS.API.CONSUMER.CREATE prefix followed by the source name.
    [Fact]
    public void BuildConsumerCreateSubject_formats_correctly()
    {
        var sut = NewCoordinator(new StreamSourceConfig { Name = "MY_SOURCE" });

        var createSubject = sut.BuildConsumerCreateSubject();

        createSubject.ShouldBe("$JS.API.CONSUMER.CREATE.MY_SOURCE");
    }

    // =========================================================================
    // GetDeliverySequence
    // Go reference: server/stream.go si.dseq field
    // =========================================================================

    // A freshly constructed coordinator reports a delivery sequence of zero.
    [Fact]
    public void GetDeliverySequence_starts_at_zero()
    {
        var sut = NewCoordinator(new StreamSourceConfig { Name = "SOURCE" });

        sut.GetDeliverySequence.ShouldBe(0UL);
    }

    // After three appended origin messages the delivery sequence reads 3.
    [Fact]
    public async Task GetDeliverySequence_increments_after_processing()
    {
        var sut = NewCoordinator(new StreamSourceConfig { Name = "SOURCE" });

        await sut.OnOriginAppendAsync(MakeMessage(1, "orders.created", "x"), default);
        await sut.OnOriginAppendAsync(MakeMessage(2, "orders.updated", "y"), default);
        await sut.OnOriginAppendAsync(MakeMessage(3, "orders.shipped", "z"), default);

        sut.GetDeliverySequence.ShouldBe(3UL);
    }

    // =========================================================================
    // Helpers
    // =========================================================================

    // Creates a coordinator for the given source config, backed by a new in-memory store.
    private static SourceCoordinator NewCoordinator(StreamSourceConfig config) =>
        new(new MemStore(), config);

    // Builds a stored message with the given origin sequence and subject; the payload
    // string is UTF-8 encoded and the timestamp is the current UTC time.
    private static StoredMessage MakeMessage(ulong seq, string subject, string payload)
    {
        return new StoredMessage
        {
            Sequence = seq,
            Subject = subject,
            Payload = System.Text.Encoding.UTF8.GetBytes(payload),
            TimestampUtc = DateTime.UtcNow,
        };
    }
}
|