refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a dedicated NATS.Server.JetStream.Tests project. This includes root-level JetStream*.cs files, storage test files (FileStore, MemStore, StreamStoreContract), and the full JetStream/ subfolder tree (Api, Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams). Updated all namespaces, added InternalsVisibleTo, registered in the solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
This commit is contained in:
@@ -0,0 +1,341 @@
|
||||
using NATS.Server.JetStream.MirrorSource;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.JetStream.Tests.JetStream.MirrorSource;
|
||||
|
||||
// Go reference: server/stream.go:2788-2854 (processMirrorMsgs)
|
||||
// Go reference: server/stream.go:2863-3014 (processInboundMirrorMsg)
|
||||
// Go reference: server/stream.go:3125-3400 (setupMirrorConsumer)
|
||||
|
||||
public class MirrorSyncTests
|
||||
{
|
||||
// -------------------------------------------------------------------------
|
||||
// Direct in-process synchronization tests
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2915 — sseq == mset.mirror.sseq+1 (normal in-order)
|
||||
public async Task Mirror_applies_single_message_and_tracks_sequence()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var mirror = new MirrorCoordinator(target);
|
||||
|
||||
var msg = MakeMessage(seq: 1, subject: "orders.created", payload: "order-1");
|
||||
await mirror.OnOriginAppendAsync(msg, default);
|
||||
|
||||
mirror.LastOriginSequence.ShouldBe(1UL);
|
||||
mirror.LastSyncUtc.ShouldNotBe(default(DateTime));
|
||||
mirror.Lag.ShouldBe(0UL);
|
||||
|
||||
var stored = await target.LoadAsync(1, default);
|
||||
stored.ShouldNotBeNull();
|
||||
stored.Subject.ShouldBe("orders.created");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2915-2917 — sequential messages increment sseq/dseq
|
||||
public async Task Mirror_applies_sequential_messages_in_order()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var mirror = new MirrorCoordinator(target);
|
||||
|
||||
for (ulong i = 1; i <= 5; i++)
|
||||
{
|
||||
await mirror.OnOriginAppendAsync(
|
||||
MakeMessage(seq: i, subject: $"orders.{i}", payload: $"payload-{i}"), default);
|
||||
}
|
||||
|
||||
mirror.LastOriginSequence.ShouldBe(5UL);
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(5UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2918-2921 — sseq <= mset.mirror.sseq (ignore older)
|
||||
public async Task Mirror_ignores_older_duplicate_messages()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var mirror = new MirrorCoordinator(target);
|
||||
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(seq: 5, subject: "a", payload: "1"), default);
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(seq: 3, subject: "b", payload: "2"), default); // older
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(seq: 5, subject: "c", payload: "3"), default); // same
|
||||
|
||||
mirror.LastOriginSequence.ShouldBe(5UL);
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(1UL); // only seq 5 stored
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2927-2936 — gap handling (sseq > mirror.sseq+1)
|
||||
public async Task Mirror_handles_sequence_gaps_from_origin()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var mirror = new MirrorCoordinator(target);
|
||||
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(seq: 1, subject: "a", payload: "1"), default);
|
||||
// Gap: origin deleted seq 2-4
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(seq: 5, subject: "b", payload: "2"), default);
|
||||
|
||||
mirror.LastOriginSequence.ShouldBe(5UL);
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Mirror_first_message_at_arbitrary_sequence()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var mirror = new MirrorCoordinator(target);
|
||||
|
||||
// First message arrives at seq 100 (origin has prior history)
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(seq: 100, subject: "a", payload: "1"), default);
|
||||
|
||||
mirror.LastOriginSequence.ShouldBe(100UL);
|
||||
var stored = await target.LoadAsync(1, default);
|
||||
stored.ShouldNotBeNull();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Health reporting tests
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2739-2743 (mirrorInfo)
|
||||
public async Task Health_report_reflects_current_state()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var mirror = new MirrorCoordinator(target);
|
||||
|
||||
var report = mirror.GetHealthReport(originLastSeq: 10);
|
||||
report.LastOriginSequence.ShouldBe(0UL);
|
||||
report.Lag.ShouldBe(10UL);
|
||||
report.IsRunning.ShouldBeFalse();
|
||||
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(seq: 7, subject: "a", payload: "1"), default);
|
||||
|
||||
report = mirror.GetHealthReport(originLastSeq: 10);
|
||||
report.LastOriginSequence.ShouldBe(7UL);
|
||||
report.Lag.ShouldBe(3UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Health_report_shows_zero_lag_when_caught_up()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var mirror = new MirrorCoordinator(target);
|
||||
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(seq: 10, subject: "a", payload: "1"), default);
|
||||
|
||||
var report = mirror.GetHealthReport(originLastSeq: 10);
|
||||
report.Lag.ShouldBe(0UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Background sync loop: channel-based
|
||||
// Go reference: server/stream.go:2788-2854 (processMirrorMsgs goroutine)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Channel_sync_loop_processes_enqueued_messages()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
mirror.StartSyncLoop();
|
||||
mirror.IsRunning.ShouldBeTrue();
|
||||
|
||||
mirror.TryEnqueue(MakeMessage(seq: 1, subject: "a", payload: "1"));
|
||||
mirror.TryEnqueue(MakeMessage(seq: 2, subject: "b", payload: "2"));
|
||||
|
||||
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));
|
||||
|
||||
mirror.LastOriginSequence.ShouldBe(2UL);
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Channel_sync_loop_can_be_stopped()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
mirror.StartSyncLoop();
|
||||
mirror.IsRunning.ShouldBeTrue();
|
||||
|
||||
await mirror.StopAsync();
|
||||
mirror.IsRunning.ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Channel_sync_loop_ignores_duplicates()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
mirror.StartSyncLoop();
|
||||
|
||||
mirror.TryEnqueue(MakeMessage(seq: 1, subject: "a", payload: "1"));
|
||||
mirror.TryEnqueue(MakeMessage(seq: 1, subject: "a", payload: "1")); // duplicate
|
||||
mirror.TryEnqueue(MakeMessage(seq: 2, subject: "b", payload: "2"));
|
||||
|
||||
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Background sync loop: pull-based
|
||||
// Go reference: server/stream.go:3125-3400 (setupMirrorConsumer)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Pull_sync_loop_fetches_from_origin_store()
|
||||
{
|
||||
var origin = new MemStore();
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
// Pre-populate origin
|
||||
await origin.AppendAsync("a", "1"u8.ToArray(), default);
|
||||
await origin.AppendAsync("b", "2"u8.ToArray(), default);
|
||||
await origin.AppendAsync("c", "3"u8.ToArray(), default);
|
||||
|
||||
mirror.StartPullSyncLoop(origin);
|
||||
|
||||
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));
|
||||
|
||||
mirror.LastOriginSequence.ShouldBe(3UL);
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(3UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Pull_sync_loop_catches_up_after_restart()
|
||||
{
|
||||
var origin = new MemStore();
|
||||
var target = new MemStore();
|
||||
|
||||
// Phase 1: sync first 2 messages
|
||||
{
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
await origin.AppendAsync("a", "1"u8.ToArray(), default);
|
||||
await origin.AppendAsync("b", "2"u8.ToArray(), default);
|
||||
|
||||
mirror.StartPullSyncLoop(origin);
|
||||
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));
|
||||
await mirror.StopAsync();
|
||||
}
|
||||
|
||||
// Phase 2: add more messages and restart with new coordinator
|
||||
await origin.AppendAsync("c", "3"u8.ToArray(), default);
|
||||
await origin.AppendAsync("d", "4"u8.ToArray(), default);
|
||||
|
||||
{
|
||||
// Simulate restart: new coordinator, same target store
|
||||
await using var mirror2 = new MirrorCoordinator(target);
|
||||
|
||||
// Manually sync to simulate catchup from seq 2
|
||||
await mirror2.OnOriginAppendAsync(
|
||||
new StoredMessage { Sequence = 3, Subject = "c", Payload = "3"u8.ToArray() }, default);
|
||||
await mirror2.OnOriginAppendAsync(
|
||||
new StoredMessage { Sequence = 4, Subject = "d", Payload = "4"u8.ToArray() }, default);
|
||||
|
||||
mirror2.LastOriginSequence.ShouldBe(4UL);
|
||||
}
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(4UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Pull_sync_loop_updates_lag()
|
||||
{
|
||||
var origin = new MemStore();
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
// Pre-populate origin with 10 messages
|
||||
for (var i = 0; i < 10; i++)
|
||||
await origin.AppendAsync($"subj.{i}", System.Text.Encoding.UTF8.GetBytes($"payload-{i}"), default);
|
||||
|
||||
mirror.StartPullSyncLoop(origin, batchSize: 3);
|
||||
|
||||
// Wait for some progress
|
||||
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));
|
||||
|
||||
// Eventually should catch up to all 10
|
||||
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 10, TimeSpan.FromSeconds(10));
|
||||
|
||||
var report = mirror.GetHealthReport(originLastSeq: 10);
|
||||
report.Lag.ShouldBe(0UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Pull_sync_loop_handles_empty_origin()
|
||||
{
|
||||
var origin = new MemStore();
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
mirror.StartPullSyncLoop(origin);
|
||||
|
||||
// Wait a bit to ensure it doesn't crash
|
||||
await Task.Delay(200);
|
||||
|
||||
mirror.IsRunning.ShouldBeTrue();
|
||||
mirror.LastOriginSequence.ShouldBe(0UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Dispose / lifecycle tests
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Dispose_stops_running_sync_loop()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var mirror = new MirrorCoordinator(target);
|
||||
|
||||
mirror.StartSyncLoop();
|
||||
mirror.IsRunning.ShouldBeTrue();
|
||||
|
||||
await mirror.DisposeAsync();
|
||||
mirror.IsRunning.ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Multiple_start_calls_are_idempotent()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
mirror.StartSyncLoop();
|
||||
mirror.StartSyncLoop(); // second call should be no-op
|
||||
|
||||
mirror.IsRunning.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private static StoredMessage MakeMessage(ulong seq, string subject, string payload) => new()
|
||||
{
|
||||
Sequence = seq,
|
||||
Subject = subject,
|
||||
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
|
||||
TimestampUtc = DateTime.UtcNow,
|
||||
};
|
||||
|
||||
private static async Task WaitForConditionAsync(Func<bool> condition, TimeSpan timeout)
|
||||
{
|
||||
using var cts = new CancellationTokenSource(timeout);
|
||||
while (!condition())
|
||||
{
|
||||
await Task.Delay(25, cts.Token);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,569 @@
|
||||
using NATS.Server.JetStream.MirrorSource;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.JetStream.Tests.JetStream.MirrorSource;
|
||||
|
||||
// Go reference: server/stream.go:3860-4007 (processInboundSourceMsg)
|
||||
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer, trySetupSourceConsumer)
|
||||
// Go reference: server/stream.go:3752-3833 (processAllSourceMsgs)
|
||||
|
||||
public class SourceFilterTests
|
||||
{
|
||||
// -------------------------------------------------------------------------
|
||||
// Subject filtering
|
||||
// Go reference: server/stream.go:3597-3598 — FilterSubject on consumer creation
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Source_with_filter_only_forwards_matching_messages()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
FilterSubject = "orders.*",
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(2, "events.login", "2"), default); // filtered out
|
||||
await source.OnOriginAppendAsync(MakeMessage(3, "orders.updated", "3"), default);
|
||||
|
||||
source.LastOriginSequence.ShouldBe(3UL);
|
||||
source.FilteredOutCount.ShouldBe(1);
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Source_with_wildcard_filter_matches_multi_token()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
FilterSubject = "orders.>",
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(2, "orders.us.created", "2"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(3, "events.login", "3"), default); // filtered out
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
source.FilteredOutCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Source_without_filter_forwards_all_messages()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(2, "events.login", "2"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(3, "anything.goes", "3"), default);
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(3UL);
|
||||
source.FilteredOutCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Subject transform prefix
|
||||
// Go reference: server/stream.go:3943-3956 (subject transform for the source)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Source_applies_subject_transform_prefix()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
SubjectTransformPrefix = "agg.",
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
|
||||
|
||||
var stored = await target.LoadAsync(1, default);
|
||||
stored.ShouldNotBeNull();
|
||||
stored.Subject.ShouldBe("agg.orders.created");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Source_with_filter_and_transform_applies_both()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
FilterSubject = "orders.*",
|
||||
SubjectTransformPrefix = "mirror.",
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(2, "events.login", "2"), default);
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(1UL);
|
||||
|
||||
var stored = await target.LoadAsync(1, default);
|
||||
stored.ShouldNotBeNull();
|
||||
stored.Subject.ShouldBe("mirror.orders.created");
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Account isolation
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Source_with_account_filter_skips_wrong_account()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
SourceAccount = "PROD",
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "a", "1", account: "PROD"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(2, "b", "2", account: "DEV"), default); // wrong account
|
||||
await source.OnOriginAppendAsync(MakeMessage(3, "c", "3", account: "PROD"), default);
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Source_with_account_allows_null_account_messages()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
SourceAccount = "PROD",
|
||||
});
|
||||
|
||||
// Messages with no account set should pass through (Go: account field empty means skip check)
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "a", "1", account: null), default);
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(1UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Source_without_account_filter_passes_all_accounts()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "a", "1", account: "A"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(2, "b", "2", account: "B"), default);
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Deduplication via Nats-Msg-Id header
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Source_deduplicates_messages_by_msg_id()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
DuplicateWindowMs = 60_000, // 60 second window
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessageWithMsgId(1, "a", "1", "msg-001"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessageWithMsgId(2, "a", "1", "msg-001"), default); // duplicate
|
||||
|
||||
source.DeduplicatedCount.ShouldBe(1);
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(1UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Source_allows_different_msg_ids()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
DuplicateWindowMs = 60_000,
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessageWithMsgId(1, "a", "1", "msg-001"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessageWithMsgId(2, "b", "2", "msg-002"), default);
|
||||
|
||||
source.DeduplicatedCount.ShouldBe(0);
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Source_dedup_disabled_when_window_is_zero()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
DuplicateWindowMs = 0, // disabled
|
||||
});
|
||||
|
||||
// Same msg-id should NOT be deduped when window is 0
|
||||
await source.OnOriginAppendAsync(MakeMessageWithMsgId(1, "a", "1", "msg-001"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessageWithMsgId(2, "a", "1", "msg-001"), default);
|
||||
|
||||
source.DeduplicatedCount.ShouldBe(0);
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Source_dedup_ignores_messages_without_msg_id()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
DuplicateWindowMs = 60_000,
|
||||
});
|
||||
|
||||
// Messages without Nats-Msg-Id header bypass dedup
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "a", "1"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(2, "a", "2"), default);
|
||||
|
||||
source.DeduplicatedCount.ShouldBe(0);
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Multiple sources per stream
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Multiple_sources_aggregate_into_single_target()
|
||||
{
|
||||
var target = new MemStore();
|
||||
|
||||
var src1 = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC1",
|
||||
SubjectTransformPrefix = "agg.",
|
||||
});
|
||||
var src2 = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC2",
|
||||
SubjectTransformPrefix = "agg.",
|
||||
});
|
||||
|
||||
await src1.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
|
||||
await src2.OnOriginAppendAsync(MakeMessage(1, "events.login", "2"), default);
|
||||
await src1.OnOriginAppendAsync(MakeMessage(2, "orders.updated", "3"), default);
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(3UL);
|
||||
|
||||
var msg1 = await target.LoadAsync(1, default);
|
||||
msg1.ShouldNotBeNull();
|
||||
msg1.Subject.ShouldBe("agg.orders.created");
|
||||
|
||||
var msg2 = await target.LoadAsync(2, default);
|
||||
msg2.ShouldNotBeNull();
|
||||
msg2.Subject.ShouldBe("agg.events.login");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Multiple_sources_with_different_filters()
|
||||
{
|
||||
var target = new MemStore();
|
||||
|
||||
var src1 = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC1",
|
||||
FilterSubject = "orders.*",
|
||||
});
|
||||
var src2 = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC2",
|
||||
FilterSubject = "events.*",
|
||||
});
|
||||
|
||||
await src1.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
|
||||
await src1.OnOriginAppendAsync(MakeMessage(2, "events.login", "2"), default); // filtered by src1
|
||||
await src2.OnOriginAppendAsync(MakeMessage(1, "events.login", "3"), default);
|
||||
await src2.OnOriginAppendAsync(MakeMessage(2, "orders.created", "4"), default); // filtered by src2
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Lag tracking per source
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Source_lag_tracking_reflects_origin_position()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(5, "a", "1"), default);
|
||||
|
||||
var report = source.GetHealthReport(originLastSeq: 10);
|
||||
report.Lag.ShouldBe(5UL);
|
||||
report.SourceName.ShouldBe("SRC");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Source_lag_zero_when_caught_up()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(10, "a", "1"), default);
|
||||
|
||||
var report = source.GetHealthReport(originLastSeq: 10);
|
||||
report.Lag.ShouldBe(0UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Sequence tracking — ignores older messages
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Source_ignores_older_sequences()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(5, "a", "1"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(3, "b", "2"), default); // older, ignored
|
||||
await source.OnOriginAppendAsync(MakeMessage(5, "c", "3"), default); // same, ignored
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(1UL);
|
||||
source.LastOriginSequence.ShouldBe(5UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Background sync loop: channel-based
|
||||
// Go reference: server/stream.go:3752-3833 (processAllSourceMsgs)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Channel_sync_loop_processes_enqueued_source_messages()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
FilterSubject = "orders.*",
|
||||
});
|
||||
|
||||
source.StartSyncLoop();
|
||||
source.IsRunning.ShouldBeTrue();
|
||||
|
||||
source.TryEnqueue(MakeMessage(1, "orders.created", "1"));
|
||||
source.TryEnqueue(MakeMessage(2, "events.login", "2")); // filtered
|
||||
source.TryEnqueue(MakeMessage(3, "orders.updated", "3"));
|
||||
|
||||
await WaitForConditionAsync(() => source.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
source.FilteredOutCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Background sync loop: pull-based
|
||||
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Pull_sync_loop_fetches_filtered_from_origin()
|
||||
{
|
||||
var origin = new MemStore();
|
||||
var target = new MemStore();
|
||||
await using var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
FilterSubject = "orders.*",
|
||||
SubjectTransformPrefix = "agg.",
|
||||
});
|
||||
|
||||
await origin.AppendAsync("orders.created", "1"u8.ToArray(), default);
|
||||
await origin.AppendAsync("events.login", "2"u8.ToArray(), default);
|
||||
await origin.AppendAsync("orders.updated", "3"u8.ToArray(), default);
|
||||
|
||||
source.StartPullSyncLoop(origin);
|
||||
|
||||
await WaitForConditionAsync(() => source.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
|
||||
// Verify transform was applied
|
||||
var msg1 = await target.LoadAsync(1, default);
|
||||
msg1.ShouldNotBeNull();
|
||||
msg1.Subject.ShouldBe("agg.orders.created");
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Combined: filter + account + transform + dedup
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Source_applies_all_filters_together()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
FilterSubject = "orders.*",
|
||||
SubjectTransformPrefix = "agg.",
|
||||
SourceAccount = "PROD",
|
||||
DuplicateWindowMs = 60_000,
|
||||
});
|
||||
|
||||
// Pass: correct account, matching subject, unique msg-id
|
||||
await source.OnOriginAppendAsync(
|
||||
MakeMessageWithMsgId(1, "orders.created", "1", "m1", account: "PROD"), default);
|
||||
|
||||
// Fail: wrong account
|
||||
await source.OnOriginAppendAsync(
|
||||
MakeMessageWithMsgId(2, "orders.created", "2", "m2", account: "DEV"), default);
|
||||
|
||||
// Fail: wrong subject
|
||||
await source.OnOriginAppendAsync(
|
||||
MakeMessageWithMsgId(3, "events.login", "3", "m3", account: "PROD"), default);
|
||||
|
||||
// Fail: duplicate msg-id
|
||||
await source.OnOriginAppendAsync(
|
||||
MakeMessageWithMsgId(4, "orders.updated", "4", "m1", account: "PROD"), default);
|
||||
|
||||
// Pass: everything checks out
|
||||
await source.OnOriginAppendAsync(
|
||||
MakeMessageWithMsgId(5, "orders.updated", "5", "m5", account: "PROD"), default);
|
||||
|
||||
var state = await target.GetStateAsync(default);
|
||||
state.Messages.ShouldBe(2UL);
|
||||
source.FilteredOutCount.ShouldBe(1);
|
||||
source.DeduplicatedCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Health report
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Health_report_includes_filter_and_dedup_stats()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
FilterSubject = "orders.*",
|
||||
DuplicateWindowMs = 60_000,
|
||||
});
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "events.login", "1"), default); // filtered
|
||||
await source.OnOriginAppendAsync(
|
||||
MakeMessageWithMsgId(2, "orders.created", "2", "m1"), default);
|
||||
await source.OnOriginAppendAsync(
|
||||
MakeMessageWithMsgId(3, "orders.updated", "3", "m1"), default); // deduped
|
||||
|
||||
var report = source.GetHealthReport(originLastSeq: 10);
|
||||
report.SourceName.ShouldBe("SRC");
|
||||
report.FilterSubject.ShouldBe("orders.*");
|
||||
report.FilteredOutCount.ShouldBe(1);
|
||||
report.DeduplicatedCount.ShouldBe(1);
|
||||
report.Lag.ShouldBeGreaterThan(0UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Dispose / lifecycle tests
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Dispose_stops_running_source_sync_loop()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
|
||||
|
||||
source.StartSyncLoop();
|
||||
source.IsRunning.ShouldBeTrue();
|
||||
|
||||
await source.DisposeAsync();
|
||||
source.IsRunning.ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Config_property_exposes_source_configuration()
|
||||
{
|
||||
var target = new MemStore();
|
||||
var config = new StreamSourceConfig
|
||||
{
|
||||
Name = "MY_SOURCE",
|
||||
FilterSubject = "orders.*",
|
||||
SubjectTransformPrefix = "agg.",
|
||||
SourceAccount = "PROD",
|
||||
DuplicateWindowMs = 5000,
|
||||
};
|
||||
var source = new SourceCoordinator(target, config);
|
||||
|
||||
source.Config.Name.ShouldBe("MY_SOURCE");
|
||||
source.Config.FilterSubject.ShouldBe("orders.*");
|
||||
source.Config.SubjectTransformPrefix.ShouldBe("agg.");
|
||||
source.Config.SourceAccount.ShouldBe("PROD");
|
||||
source.Config.DuplicateWindowMs.ShouldBe(5000);
|
||||
|
||||
await source.DisposeAsync();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private static StoredMessage MakeMessage(ulong seq, string subject, string payload, string? account = null) => new()
|
||||
{
|
||||
Sequence = seq,
|
||||
Subject = subject,
|
||||
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
|
||||
TimestampUtc = DateTime.UtcNow,
|
||||
Account = account,
|
||||
};
|
||||
|
||||
private static StoredMessage MakeMessageWithMsgId(
|
||||
ulong seq, string subject, string payload, string msgId, string? account = null) => new()
|
||||
{
|
||||
Sequence = seq,
|
||||
Subject = subject,
|
||||
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
|
||||
TimestampUtc = DateTime.UtcNow,
|
||||
Account = account,
|
||||
Headers = new Dictionary<string, string> { ["Nats-Msg-Id"] = msgId },
|
||||
};
|
||||
|
||||
private static async Task WaitForConditionAsync(Func<bool> condition, TimeSpan timeout)
|
||||
{
|
||||
using var cts = new CancellationTokenSource(timeout);
|
||||
while (!condition())
|
||||
{
|
||||
await Task.Delay(25, cts.Token);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user