feat: complete source consumer API request generation (Gap 4.3)
Add BuildConsumerCreateRequest, BuildConsumerCreateSubject, and
GetDeliverySequence to SourceCoordinator, modelling Go's
setupSourceConsumer/trySetupSourceConsumer. Covers DeliverPolicy
resume-from-sequence, AckPolicy.None, push/flow-control/heartbeat
consumer fields, and the $JS.API.CONSUMER.CREATE.{name} subject format.
This commit is contained in:
@@ -84,6 +84,12 @@ public sealed class SourceCoordinator : IAsyncDisposable
|
||||
/// <summary>The source configuration driving this coordinator.</summary>
|
||||
public StreamSourceConfig Config => _sourceConfig;
|
||||
|
||||
/// <summary>
|
||||
/// Current delivery sequence counter — number of messages successfully written to the target store.
|
||||
/// Go reference: server/stream.go si.dseq field
|
||||
/// </summary>
|
||||
public ulong GetDeliverySequence => _deliverySeq;
|
||||
|
||||
public SourceCoordinator(IStreamStore targetStore, StreamSourceConfig sourceConfig)
|
||||
{
|
||||
_targetStore = targetStore;
|
||||
@@ -95,6 +101,47 @@ public sealed class SourceCoordinator : IAsyncDisposable
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Builds the consumer configuration that would be sent to the source stream to set up
|
||||
/// a push consumer for consumption. This models the consumer create request generated by
|
||||
/// Go's setupSourceConsumer / trySetupSourceConsumer.
|
||||
/// Go reference: server/stream.go:3474-3720 (setupSourceConsumer, trySetupSourceConsumer)
|
||||
/// </summary>
|
||||
public ConsumerConfig BuildConsumerCreateRequest()
|
||||
{
|
||||
// Go: server/stream.go:3597-3598 — if ssi.FilterSubject != _EMPTY_ { req.Config.FilterSubject = ssi.FilterSubject }
|
||||
var cfg = new ConsumerConfig
|
||||
{
|
||||
AckPolicy = AckPolicy.None,
|
||||
Push = true,
|
||||
FlowControl = true,
|
||||
HeartbeatMs = (int)HeartbeatInterval.TotalMilliseconds,
|
||||
};
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(_sourceConfig.FilterSubject))
|
||||
cfg.FilterSubject = _sourceConfig.FilterSubject;
|
||||
|
||||
// Go: server/stream.go:3573-3582 — resume from LastOriginSequence+1, or DeliverAll when starting fresh
|
||||
if (LastOriginSequence == 0)
|
||||
{
|
||||
cfg.DeliverPolicy = DeliverPolicy.All;
|
||||
}
|
||||
else
|
||||
{
|
||||
cfg.DeliverPolicy = DeliverPolicy.ByStartSequence;
|
||||
cfg.OptStartSeq = LastOriginSequence + 1;
|
||||
}
|
||||
|
||||
return cfg;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the JetStream API subject used to create a consumer on the source stream.
|
||||
/// Go reference: server/stream.go:3531 — $JS.API.CONSUMER.CREATE.{sourceName}
|
||||
/// </summary>
|
||||
public string BuildConsumerCreateSubject() =>
|
||||
$"$JS.API.CONSUMER.CREATE.{_sourceConfig.Name}";
|
||||
|
||||
/// <summary>
|
||||
/// Processes a single inbound message from the origin stream.
|
||||
/// This is the direct-call path used when the origin and target are in the same process.
|
||||
|
||||
@@ -0,0 +1,213 @@
|
||||
using NATS.Server.JetStream.MirrorSource;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
using Shouldly;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Streams;
|
||||
|
||||
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer, trySetupSourceConsumer)
|
||||
// Go reference: server/stream.go:3531 ($JS.API.CONSUMER.CREATE.{sourceName})
|
||||
// Go reference: server/stream.go:3573-3598 (DeliverPolicy, FilterSubject, AckPolicy assignment)
|
||||
|
||||
public class SourceConsumerSetupTests
|
||||
{
|
||||
// -------------------------------------------------------------------------
|
||||
// BuildConsumerCreateRequest — FilterSubject
|
||||
// Go reference: server/stream.go:3597-3598
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void BuildConsumerCreateRequest_sets_filter_subject()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SOURCE",
|
||||
FilterSubject = "orders.>",
|
||||
});
|
||||
|
||||
var req = coordinator.BuildConsumerCreateRequest();
|
||||
|
||||
req.FilterSubject.ShouldBe("orders.>");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BuildConsumerCreateRequest_no_filter_leaves_null()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SOURCE",
|
||||
});
|
||||
|
||||
var req = coordinator.BuildConsumerCreateRequest();
|
||||
|
||||
req.FilterSubject.ShouldBeNull();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// BuildConsumerCreateRequest — DeliverPolicy
|
||||
// Go reference: server/stream.go:3573-3582
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void BuildConsumerCreateRequest_starts_from_beginning_when_no_progress()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SOURCE",
|
||||
});
|
||||
|
||||
// LastOriginSequence is 0 — no messages have been processed yet
|
||||
coordinator.LastOriginSequence.ShouldBe(0UL);
|
||||
|
||||
var req = coordinator.BuildConsumerCreateRequest();
|
||||
|
||||
req.DeliverPolicy.ShouldBe(DeliverPolicy.All);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task BuildConsumerCreateRequest_resumes_from_last_sequence()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SOURCE",
|
||||
});
|
||||
|
||||
// Advance LastOriginSequence by processing messages
|
||||
await coordinator.OnOriginAppendAsync(MakeMessage(3, "orders.created", "a"), default);
|
||||
await coordinator.OnOriginAppendAsync(MakeMessage(7, "orders.updated", "b"), default);
|
||||
|
||||
coordinator.LastOriginSequence.ShouldBe(7UL);
|
||||
|
||||
var req = coordinator.BuildConsumerCreateRequest();
|
||||
|
||||
req.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartSequence);
|
||||
req.OptStartSeq.ShouldBe(8UL); // LastOriginSequence + 1
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// BuildConsumerCreateRequest — AckPolicy
|
||||
// Go reference: server/stream.go:3586
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void BuildConsumerCreateRequest_sets_ack_none()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SOURCE",
|
||||
});
|
||||
|
||||
var req = coordinator.BuildConsumerCreateRequest();
|
||||
|
||||
req.AckPolicy.ShouldBe(AckPolicy.None);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// BuildConsumerCreateRequest — Push + FlowControl
|
||||
// Go reference: server/stream.go:3589-3592
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void BuildConsumerCreateRequest_enables_push_and_flow_control()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SOURCE",
|
||||
});
|
||||
|
||||
var req = coordinator.BuildConsumerCreateRequest();
|
||||
|
||||
req.Push.ShouldBeTrue();
|
||||
req.FlowControl.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// BuildConsumerCreateRequest — HeartbeatMs
|
||||
// Go reference: server/stream.go:3593 — sourceHealthHB = 1 * time.Second
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void BuildConsumerCreateRequest_sets_heartbeat()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SOURCE",
|
||||
});
|
||||
|
||||
var req = coordinator.BuildConsumerCreateRequest();
|
||||
|
||||
// HeartbeatInterval is 1 second = 1000 ms
|
||||
req.HeartbeatMs.ShouldBe(1000);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// BuildConsumerCreateSubject
|
||||
// Go reference: server/stream.go:3531
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void BuildConsumerCreateSubject_formats_correctly()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "MY_SOURCE",
|
||||
});
|
||||
|
||||
var subject = coordinator.BuildConsumerCreateSubject();
|
||||
|
||||
subject.ShouldBe("$JS.API.CONSUMER.CREATE.MY_SOURCE");
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// GetDeliverySequence
|
||||
// Go reference: server/stream.go si.dseq field
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void GetDeliverySequence_starts_at_zero()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SOURCE",
|
||||
});
|
||||
|
||||
coordinator.GetDeliverySequence.ShouldBe(0UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetDeliverySequence_increments_after_processing()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SOURCE",
|
||||
});
|
||||
|
||||
await coordinator.OnOriginAppendAsync(MakeMessage(1, "orders.created", "x"), default);
|
||||
await coordinator.OnOriginAppendAsync(MakeMessage(2, "orders.updated", "y"), default);
|
||||
await coordinator.OnOriginAppendAsync(MakeMessage(3, "orders.shipped", "z"), default);
|
||||
|
||||
coordinator.GetDeliverySequence.ShouldBe(3UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private static StoredMessage MakeMessage(ulong seq, string subject, string payload) => new()
|
||||
{
|
||||
Sequence = seq,
|
||||
Subject = subject,
|
||||
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
|
||||
TimestampUtc = DateTime.UtcNow,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user