Files
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

214 lines
7.1 KiB
C#

using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using Shouldly;
namespace NATS.Server.JetStream.Tests.JetStream.Streams;
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer, trySetupSourceConsumer)
// Go reference: server/stream.go:3531 ($JS.API.CONSUMER.CREATE.{sourceName})
// Go reference: server/stream.go:3573-3598 (DeliverPolicy, FilterSubject, AckPolicy assignment)
public class SourceConsumerSetupTests
{
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — FilterSubject
// Go reference: server/stream.go:3597-3598
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_sets_filter_subject()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
FilterSubject = "orders.>",
});
var req = coordinator.BuildConsumerCreateRequest();
req.FilterSubject.ShouldBe("orders.>");
}
[Fact]
public void BuildConsumerCreateRequest_no_filter_leaves_null()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
var req = coordinator.BuildConsumerCreateRequest();
req.FilterSubject.ShouldBeNull();
}
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — DeliverPolicy
// Go reference: server/stream.go:3573-3582
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_starts_from_beginning_when_no_progress()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
// LastOriginSequence is 0 — no messages have been processed yet
coordinator.LastOriginSequence.ShouldBe(0UL);
var req = coordinator.BuildConsumerCreateRequest();
req.DeliverPolicy.ShouldBe(DeliverPolicy.All);
}
[Fact]
public async Task BuildConsumerCreateRequest_resumes_from_last_sequence()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
// Advance LastOriginSequence by processing messages
await coordinator.OnOriginAppendAsync(MakeMessage(3, "orders.created", "a"), default);
await coordinator.OnOriginAppendAsync(MakeMessage(7, "orders.updated", "b"), default);
coordinator.LastOriginSequence.ShouldBe(7UL);
var req = coordinator.BuildConsumerCreateRequest();
req.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartSequence);
req.OptStartSeq.ShouldBe(8UL); // LastOriginSequence + 1
}
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — AckPolicy
// Go reference: server/stream.go:3586
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_sets_ack_none()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
var req = coordinator.BuildConsumerCreateRequest();
req.AckPolicy.ShouldBe(AckPolicy.None);
}
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — Push + FlowControl
// Go reference: server/stream.go:3589-3592
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_enables_push_and_flow_control()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
var req = coordinator.BuildConsumerCreateRequest();
req.Push.ShouldBeTrue();
req.FlowControl.ShouldBeTrue();
}
// -------------------------------------------------------------------------
// BuildConsumerCreateRequest — HeartbeatMs
// Go reference: server/stream.go:3593 — sourceHealthHB = 1 * time.Second
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateRequest_sets_heartbeat()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
var req = coordinator.BuildConsumerCreateRequest();
// HeartbeatInterval is 1 second = 1000 ms
req.HeartbeatMs.ShouldBe(1000);
}
// -------------------------------------------------------------------------
// BuildConsumerCreateSubject
// Go reference: server/stream.go:3531
// -------------------------------------------------------------------------
[Fact]
public void BuildConsumerCreateSubject_formats_correctly()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "MY_SOURCE",
});
var subject = coordinator.BuildConsumerCreateSubject();
subject.ShouldBe("$JS.API.CONSUMER.CREATE.MY_SOURCE");
}
// -------------------------------------------------------------------------
// GetDeliverySequence
// Go reference: server/stream.go si.dseq field
// -------------------------------------------------------------------------
[Fact]
public void GetDeliverySequence_starts_at_zero()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
coordinator.GetDeliverySequence.ShouldBe(0UL);
}
[Fact]
public async Task GetDeliverySequence_increments_after_processing()
{
var target = new MemStore();
var coordinator = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SOURCE",
});
await coordinator.OnOriginAppendAsync(MakeMessage(1, "orders.created", "x"), default);
await coordinator.OnOriginAppendAsync(MakeMessage(2, "orders.updated", "y"), default);
await coordinator.OnOriginAppendAsync(MakeMessage(3, "orders.shipped", "z"), default);
coordinator.GetDeliverySequence.ShouldBe(3UL);
}
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
private static StoredMessage MakeMessage(ulong seq, string subject, string payload) => new()
{
Sequence = seq,
Subject = subject,
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
TimestampUtc = DateTime.UtcNow,
};
}