diff --git a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs index 57d30b5..cf9b16c 100644 --- a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs +++ b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs @@ -84,6 +84,12 @@ public sealed class SourceCoordinator : IAsyncDisposable /// The source configuration driving this coordinator. public StreamSourceConfig Config => _sourceConfig; + /// + /// Current delivery sequence counter — number of messages successfully written to the target store. + /// Go reference: server/stream.go si.dseq field + /// + public ulong GetDeliverySequence => _deliverySeq; + public SourceCoordinator(IStreamStore targetStore, StreamSourceConfig sourceConfig) { _targetStore = targetStore; @@ -95,6 +101,47 @@ public sealed class SourceCoordinator : IAsyncDisposable }); } + /// + /// Builds the consumer configuration that would be sent to the source stream to set up + /// a push consumer for consumption. This models the consumer create request generated by + /// Go's setupSourceConsumer / trySetupSourceConsumer. + /// Go reference: server/stream.go:3474-3720 (setupSourceConsumer, trySetupSourceConsumer) + /// + public ConsumerConfig BuildConsumerCreateRequest() + { + // Go: server/stream.go:3597-3598 — if ssi.FilterSubject != _EMPTY_ { req.Config.FilterSubject = ssi.FilterSubject } + var cfg = new ConsumerConfig + { + AckPolicy = AckPolicy.None, + Push = true, + FlowControl = true, + HeartbeatMs = (int)HeartbeatInterval.TotalMilliseconds, + }; + + if (!string.IsNullOrWhiteSpace(_sourceConfig.FilterSubject)) + cfg.FilterSubject = _sourceConfig.FilterSubject; + + // Go: server/stream.go:3573-3582 — resume from LastOriginSequence+1, or DeliverAll when starting fresh + if (LastOriginSequence == 0) + { + cfg.DeliverPolicy = DeliverPolicy.All; + } + else + { + cfg.DeliverPolicy = DeliverPolicy.ByStartSequence; + cfg.OptStartSeq = LastOriginSequence + 1; + } + + return cfg; + } + + /// + /// Returns the JetStream API subject used to create a consumer on the source stream. + /// Go reference: server/stream.go:3531 — $JS.API.CONSUMER.CREATE.{sourceName} + /// + public string BuildConsumerCreateSubject() => + $"$JS.API.CONSUMER.CREATE.{_sourceConfig.Name}"; + /// /// Processes a single inbound message from the origin stream. /// This is the direct-call path used when the origin and target are in the same process. diff --git a/tests/NATS.Server.Tests/JetStream/Streams/SourceConsumerSetupTests.cs b/tests/NATS.Server.Tests/JetStream/Streams/SourceConsumerSetupTests.cs new file mode 100644 index 0000000..c88ac6e --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Streams/SourceConsumerSetupTests.cs @@ -0,0 +1,213 @@ +using NATS.Server.JetStream.MirrorSource; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; +using Shouldly; + +namespace NATS.Server.Tests.JetStream.Streams; + +// Go reference: server/stream.go:3474-3720 (setupSourceConsumer, trySetupSourceConsumer) +// Go reference: server/stream.go:3531 ($JS.API.CONSUMER.CREATE.{sourceName}) +// Go reference: server/stream.go:3573-3598 (DeliverPolicy, FilterSubject, AckPolicy assignment) + +public class SourceConsumerSetupTests +{ + // ------------------------------------------------------------------------- + // BuildConsumerCreateRequest — FilterSubject + // Go reference: server/stream.go:3597-3598 + // ------------------------------------------------------------------------- + + [Fact] + public void BuildConsumerCreateRequest_sets_filter_subject() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "SOURCE", + FilterSubject = "orders.>", + }); + + var req = coordinator.BuildConsumerCreateRequest(); + + req.FilterSubject.ShouldBe("orders.>"); + } + + [Fact] + public void BuildConsumerCreateRequest_no_filter_leaves_null() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "SOURCE", + }); + + var req = coordinator.BuildConsumerCreateRequest(); + + req.FilterSubject.ShouldBeNull(); + } + + // ------------------------------------------------------------------------- + // BuildConsumerCreateRequest — DeliverPolicy + // Go reference: server/stream.go:3573-3582 + // ------------------------------------------------------------------------- + + [Fact] + public void BuildConsumerCreateRequest_starts_from_beginning_when_no_progress() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "SOURCE", + }); + + // LastOriginSequence is 0 — no messages have been processed yet + coordinator.LastOriginSequence.ShouldBe(0UL); + + var req = coordinator.BuildConsumerCreateRequest(); + + req.DeliverPolicy.ShouldBe(DeliverPolicy.All); + } + + [Fact] + public async Task BuildConsumerCreateRequest_resumes_from_last_sequence() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "SOURCE", + }); + + // Advance LastOriginSequence by processing messages + await coordinator.OnOriginAppendAsync(MakeMessage(3, "orders.created", "a"), default); + await coordinator.OnOriginAppendAsync(MakeMessage(7, "orders.updated", "b"), default); + + coordinator.LastOriginSequence.ShouldBe(7UL); + + var req = coordinator.BuildConsumerCreateRequest(); + + req.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartSequence); + req.OptStartSeq.ShouldBe(8UL); // LastOriginSequence + 1 + } + + // ------------------------------------------------------------------------- + // BuildConsumerCreateRequest — AckPolicy + // Go reference: server/stream.go:3586 + // ------------------------------------------------------------------------- + + [Fact] + public void BuildConsumerCreateRequest_sets_ack_none() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "SOURCE", + }); + + var req = coordinator.BuildConsumerCreateRequest(); + + req.AckPolicy.ShouldBe(AckPolicy.None); + } + + // ------------------------------------------------------------------------- + // BuildConsumerCreateRequest — Push + FlowControl + // Go reference: server/stream.go:3589-3592 + // ------------------------------------------------------------------------- + + [Fact] + public void BuildConsumerCreateRequest_enables_push_and_flow_control() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "SOURCE", + }); + + var req = coordinator.BuildConsumerCreateRequest(); + + req.Push.ShouldBeTrue(); + req.FlowControl.ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // BuildConsumerCreateRequest — HeartbeatMs + // Go reference: server/stream.go:3593 — sourceHealthHB = 1 * time.Second + // ------------------------------------------------------------------------- + + [Fact] + public void BuildConsumerCreateRequest_sets_heartbeat() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "SOURCE", + }); + + var req = coordinator.BuildConsumerCreateRequest(); + + // HeartbeatInterval is 1 second = 1000 ms + req.HeartbeatMs.ShouldBe(1000); + } + + // ------------------------------------------------------------------------- + // BuildConsumerCreateSubject + // Go reference: server/stream.go:3531 + // ------------------------------------------------------------------------- + + [Fact] + public void BuildConsumerCreateSubject_formats_correctly() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "MY_SOURCE", + }); + + var subject = coordinator.BuildConsumerCreateSubject(); + + subject.ShouldBe("$JS.API.CONSUMER.CREATE.MY_SOURCE"); + } + + // ------------------------------------------------------------------------- + // GetDeliverySequence + // Go reference: server/stream.go si.dseq field + // ------------------------------------------------------------------------- + + [Fact] + public void GetDeliverySequence_starts_at_zero() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "SOURCE", + }); + + coordinator.GetDeliverySequence.ShouldBe(0UL); + } + + [Fact] + public async Task GetDeliverySequence_increments_after_processing() + { + var target = new MemStore(); + var coordinator = new SourceCoordinator(target, new StreamSourceConfig + { + Name = "SOURCE", + }); + + await coordinator.OnOriginAppendAsync(MakeMessage(1, "orders.created", "x"), default); + await coordinator.OnOriginAppendAsync(MakeMessage(2, "orders.updated", "y"), default); + await coordinator.OnOriginAppendAsync(MakeMessage(3, "orders.shipped", "z"), default); + + coordinator.GetDeliverySequence.ShouldBe(3UL); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private static StoredMessage MakeMessage(ulong seq, string subject, string payload) => new() + { + Sequence = seq, + Subject = subject, + Payload = System.Text.Encoding.UTF8.GetBytes(payload), + TimestampUtc = DateTime.UtcNow, + }; +}