feat(jetstream): add mirror sync loop and source coordination with filtering (C9+C10)

This commit is contained in:
Joseph Doherty
2026-02-24 15:41:35 -05:00
parent 6ad8ab69bf
commit 18acd6f4e2
8 changed files with 1709 additions and 3 deletions

View File

@@ -0,0 +1,341 @@
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Tests.JetStream.MirrorSource;
// Go reference: server/stream.go:2788-2854 (processMirrorMsgs)
// Go reference: server/stream.go:2863-3014 (processInboundMirrorMsg)
// Go reference: server/stream.go:3125-3400 (setupMirrorConsumer)
public class MirrorSyncTests
{
// -------------------------------------------------------------------------
// Direct in-process synchronization tests
// -------------------------------------------------------------------------
[Fact]
// Go reference: server/stream.go:2915 — sseq == mset.mirror.sseq+1 (normal in-order)
public async Task Mirror_applies_single_message_and_tracks_sequence()
{
var target = new MemStore();
var mirror = new MirrorCoordinator(target);
var msg = MakeMessage(seq: 1, subject: "orders.created", payload: "order-1");
await mirror.OnOriginAppendAsync(msg, default);
mirror.LastOriginSequence.ShouldBe(1UL);
mirror.LastSyncUtc.ShouldNotBe(default(DateTime));
mirror.Lag.ShouldBe(0UL);
var stored = await target.LoadAsync(1, default);
stored.ShouldNotBeNull();
stored.Subject.ShouldBe("orders.created");
}
[Fact]
// Go reference: server/stream.go:2915-2917 — sequential messages increment sseq/dseq
public async Task Mirror_applies_sequential_messages_in_order()
{
var target = new MemStore();
var mirror = new MirrorCoordinator(target);
for (ulong i = 1; i <= 5; i++)
{
await mirror.OnOriginAppendAsync(
MakeMessage(seq: i, subject: $"orders.{i}", payload: $"payload-{i}"), default);
}
mirror.LastOriginSequence.ShouldBe(5UL);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(5UL);
}
[Fact]
// Go reference: server/stream.go:2918-2921 — sseq <= mset.mirror.sseq (ignore older)
public async Task Mirror_ignores_older_duplicate_messages()
{
var target = new MemStore();
var mirror = new MirrorCoordinator(target);
await mirror.OnOriginAppendAsync(MakeMessage(seq: 5, subject: "a", payload: "1"), default);
await mirror.OnOriginAppendAsync(MakeMessage(seq: 3, subject: "b", payload: "2"), default); // older
await mirror.OnOriginAppendAsync(MakeMessage(seq: 5, subject: "c", payload: "3"), default); // same
mirror.LastOriginSequence.ShouldBe(5UL);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(1UL); // only seq 5 stored
}
[Fact]
// Go reference: server/stream.go:2927-2936 — gap handling (sseq > mirror.sseq+1)
public async Task Mirror_handles_sequence_gaps_from_origin()
{
var target = new MemStore();
var mirror = new MirrorCoordinator(target);
await mirror.OnOriginAppendAsync(MakeMessage(seq: 1, subject: "a", payload: "1"), default);
// Gap: origin deleted seq 2-4
await mirror.OnOriginAppendAsync(MakeMessage(seq: 5, subject: "b", payload: "2"), default);
mirror.LastOriginSequence.ShouldBe(5UL);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
[Fact]
public async Task Mirror_first_message_at_arbitrary_sequence()
{
var target = new MemStore();
var mirror = new MirrorCoordinator(target);
// First message arrives at seq 100 (origin has prior history)
await mirror.OnOriginAppendAsync(MakeMessage(seq: 100, subject: "a", payload: "1"), default);
mirror.LastOriginSequence.ShouldBe(100UL);
var stored = await target.LoadAsync(1, default);
stored.ShouldNotBeNull();
}
// -------------------------------------------------------------------------
// Health reporting tests
// -------------------------------------------------------------------------
[Fact]
// Go reference: server/stream.go:2739-2743 (mirrorInfo)
public async Task Health_report_reflects_current_state()
{
var target = new MemStore();
var mirror = new MirrorCoordinator(target);
var report = mirror.GetHealthReport(originLastSeq: 10);
report.LastOriginSequence.ShouldBe(0UL);
report.Lag.ShouldBe(10UL);
report.IsRunning.ShouldBeFalse();
await mirror.OnOriginAppendAsync(MakeMessage(seq: 7, subject: "a", payload: "1"), default);
report = mirror.GetHealthReport(originLastSeq: 10);
report.LastOriginSequence.ShouldBe(7UL);
report.Lag.ShouldBe(3UL);
}
[Fact]
public async Task Health_report_shows_zero_lag_when_caught_up()
{
var target = new MemStore();
var mirror = new MirrorCoordinator(target);
await mirror.OnOriginAppendAsync(MakeMessage(seq: 10, subject: "a", payload: "1"), default);
var report = mirror.GetHealthReport(originLastSeq: 10);
report.Lag.ShouldBe(0UL);
}
// -------------------------------------------------------------------------
// Background sync loop: channel-based
// Go reference: server/stream.go:2788-2854 (processMirrorMsgs goroutine)
// -------------------------------------------------------------------------
[Fact]
public async Task Channel_sync_loop_processes_enqueued_messages()
{
var target = new MemStore();
await using var mirror = new MirrorCoordinator(target);
mirror.StartSyncLoop();
mirror.IsRunning.ShouldBeTrue();
mirror.TryEnqueue(MakeMessage(seq: 1, subject: "a", payload: "1"));
mirror.TryEnqueue(MakeMessage(seq: 2, subject: "b", payload: "2"));
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));
mirror.LastOriginSequence.ShouldBe(2UL);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
[Fact]
public async Task Channel_sync_loop_can_be_stopped()
{
var target = new MemStore();
await using var mirror = new MirrorCoordinator(target);
mirror.StartSyncLoop();
mirror.IsRunning.ShouldBeTrue();
await mirror.StopAsync();
mirror.IsRunning.ShouldBeFalse();
}
[Fact]
public async Task Channel_sync_loop_ignores_duplicates()
{
var target = new MemStore();
await using var mirror = new MirrorCoordinator(target);
mirror.StartSyncLoop();
mirror.TryEnqueue(MakeMessage(seq: 1, subject: "a", payload: "1"));
mirror.TryEnqueue(MakeMessage(seq: 1, subject: "a", payload: "1")); // duplicate
mirror.TryEnqueue(MakeMessage(seq: 2, subject: "b", payload: "2"));
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
// -------------------------------------------------------------------------
// Background sync loop: pull-based
// Go reference: server/stream.go:3125-3400 (setupMirrorConsumer)
// -------------------------------------------------------------------------
[Fact]
public async Task Pull_sync_loop_fetches_from_origin_store()
{
var origin = new MemStore();
var target = new MemStore();
await using var mirror = new MirrorCoordinator(target);
// Pre-populate origin
await origin.AppendAsync("a", "1"u8.ToArray(), default);
await origin.AppendAsync("b", "2"u8.ToArray(), default);
await origin.AppendAsync("c", "3"u8.ToArray(), default);
mirror.StartPullSyncLoop(origin);
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));
mirror.LastOriginSequence.ShouldBe(3UL);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(3UL);
}
[Fact]
public async Task Pull_sync_loop_catches_up_after_restart()
{
var origin = new MemStore();
var target = new MemStore();
// Phase 1: sync first 2 messages
{
await using var mirror = new MirrorCoordinator(target);
await origin.AppendAsync("a", "1"u8.ToArray(), default);
await origin.AppendAsync("b", "2"u8.ToArray(), default);
mirror.StartPullSyncLoop(origin);
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));
await mirror.StopAsync();
}
// Phase 2: add more messages and restart with new coordinator
await origin.AppendAsync("c", "3"u8.ToArray(), default);
await origin.AppendAsync("d", "4"u8.ToArray(), default);
{
// Simulate restart: new coordinator, same target store
await using var mirror2 = new MirrorCoordinator(target);
// Manually sync to simulate catchup from seq 2
await mirror2.OnOriginAppendAsync(
new StoredMessage { Sequence = 3, Subject = "c", Payload = "3"u8.ToArray() }, default);
await mirror2.OnOriginAppendAsync(
new StoredMessage { Sequence = 4, Subject = "d", Payload = "4"u8.ToArray() }, default);
mirror2.LastOriginSequence.ShouldBe(4UL);
}
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(4UL);
}
[Fact]
public async Task Pull_sync_loop_updates_lag()
{
var origin = new MemStore();
var target = new MemStore();
await using var mirror = new MirrorCoordinator(target);
// Pre-populate origin with 10 messages
for (var i = 0; i < 10; i++)
await origin.AppendAsync($"subj.{i}", System.Text.Encoding.UTF8.GetBytes($"payload-{i}"), default);
mirror.StartPullSyncLoop(origin, batchSize: 3);
// Wait for some progress
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));
// Eventually should catch up to all 10
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 10, TimeSpan.FromSeconds(10));
var report = mirror.GetHealthReport(originLastSeq: 10);
report.Lag.ShouldBe(0UL);
}
[Fact]
public async Task Pull_sync_loop_handles_empty_origin()
{
var origin = new MemStore();
var target = new MemStore();
await using var mirror = new MirrorCoordinator(target);
mirror.StartPullSyncLoop(origin);
// Wait a bit to ensure it doesn't crash
await Task.Delay(200);
mirror.IsRunning.ShouldBeTrue();
mirror.LastOriginSequence.ShouldBe(0UL);
}
// -------------------------------------------------------------------------
// Dispose / lifecycle tests
// -------------------------------------------------------------------------
[Fact]
public async Task Dispose_stops_running_sync_loop()
{
var target = new MemStore();
var mirror = new MirrorCoordinator(target);
mirror.StartSyncLoop();
mirror.IsRunning.ShouldBeTrue();
await mirror.DisposeAsync();
mirror.IsRunning.ShouldBeFalse();
}
[Fact]
public async Task Multiple_start_calls_are_idempotent()
{
var target = new MemStore();
await using var mirror = new MirrorCoordinator(target);
mirror.StartSyncLoop();
mirror.StartSyncLoop(); // second call should be no-op
mirror.IsRunning.ShouldBeTrue();
}
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
private static StoredMessage MakeMessage(ulong seq, string subject, string payload) => new()
{
Sequence = seq,
Subject = subject,
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
TimestampUtc = DateTime.UtcNow,
};
private static async Task WaitForConditionAsync(Func<bool> condition, TimeSpan timeout)
{
using var cts = new CancellationTokenSource(timeout);
while (!condition())
{
await Task.Delay(25, cts.Token);
}
}
}

View File

@@ -0,0 +1,569 @@
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Tests.JetStream.MirrorSource;
// Go reference: server/stream.go:3860-4007 (processInboundSourceMsg)
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer, trySetupSourceConsumer)
// Go reference: server/stream.go:3752-3833 (processAllSourceMsgs)
public class SourceFilterTests
{
// -------------------------------------------------------------------------
// Subject filtering
// Go reference: server/stream.go:3597-3598 — FilterSubject on consumer creation
// -------------------------------------------------------------------------
[Fact]
public async Task Source_with_filter_only_forwards_matching_messages()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
FilterSubject = "orders.*",
});
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
await source.OnOriginAppendAsync(MakeMessage(2, "events.login", "2"), default); // filtered out
await source.OnOriginAppendAsync(MakeMessage(3, "orders.updated", "3"), default);
source.LastOriginSequence.ShouldBe(3UL);
source.FilteredOutCount.ShouldBe(1);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
[Fact]
public async Task Source_with_wildcard_filter_matches_multi_token()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
FilterSubject = "orders.>",
});
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
await source.OnOriginAppendAsync(MakeMessage(2, "orders.us.created", "2"), default);
await source.OnOriginAppendAsync(MakeMessage(3, "events.login", "3"), default); // filtered out
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
source.FilteredOutCount.ShouldBe(1);
}
[Fact]
public async Task Source_without_filter_forwards_all_messages()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
});
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
await source.OnOriginAppendAsync(MakeMessage(2, "events.login", "2"), default);
await source.OnOriginAppendAsync(MakeMessage(3, "anything.goes", "3"), default);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(3UL);
source.FilteredOutCount.ShouldBe(0);
}
// -------------------------------------------------------------------------
// Subject transform prefix
// Go reference: server/stream.go:3943-3956 (subject transform for the source)
// -------------------------------------------------------------------------
[Fact]
public async Task Source_applies_subject_transform_prefix()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
SubjectTransformPrefix = "agg.",
});
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
var stored = await target.LoadAsync(1, default);
stored.ShouldNotBeNull();
stored.Subject.ShouldBe("agg.orders.created");
}
[Fact]
public async Task Source_with_filter_and_transform_applies_both()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
FilterSubject = "orders.*",
SubjectTransformPrefix = "mirror.",
});
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
await source.OnOriginAppendAsync(MakeMessage(2, "events.login", "2"), default);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(1UL);
var stored = await target.LoadAsync(1, default);
stored.ShouldNotBeNull();
stored.Subject.ShouldBe("mirror.orders.created");
}
// -------------------------------------------------------------------------
// Account isolation
// -------------------------------------------------------------------------
[Fact]
public async Task Source_with_account_filter_skips_wrong_account()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
SourceAccount = "PROD",
});
await source.OnOriginAppendAsync(MakeMessage(1, "a", "1", account: "PROD"), default);
await source.OnOriginAppendAsync(MakeMessage(2, "b", "2", account: "DEV"), default); // wrong account
await source.OnOriginAppendAsync(MakeMessage(3, "c", "3", account: "PROD"), default);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
[Fact]
public async Task Source_with_account_allows_null_account_messages()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
SourceAccount = "PROD",
});
// Messages with no account set should pass through (Go: account field empty means skip check)
await source.OnOriginAppendAsync(MakeMessage(1, "a", "1", account: null), default);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(1UL);
}
[Fact]
public async Task Source_without_account_filter_passes_all_accounts()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
});
await source.OnOriginAppendAsync(MakeMessage(1, "a", "1", account: "A"), default);
await source.OnOriginAppendAsync(MakeMessage(2, "b", "2", account: "B"), default);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
// -------------------------------------------------------------------------
// Deduplication via Nats-Msg-Id header
// -------------------------------------------------------------------------
[Fact]
public async Task Source_deduplicates_messages_by_msg_id()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
DuplicateWindowMs = 60_000, // 60 second window
});
await source.OnOriginAppendAsync(MakeMessageWithMsgId(1, "a", "1", "msg-001"), default);
await source.OnOriginAppendAsync(MakeMessageWithMsgId(2, "a", "1", "msg-001"), default); // duplicate
source.DeduplicatedCount.ShouldBe(1);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(1UL);
}
[Fact]
public async Task Source_allows_different_msg_ids()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
DuplicateWindowMs = 60_000,
});
await source.OnOriginAppendAsync(MakeMessageWithMsgId(1, "a", "1", "msg-001"), default);
await source.OnOriginAppendAsync(MakeMessageWithMsgId(2, "b", "2", "msg-002"), default);
source.DeduplicatedCount.ShouldBe(0);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
[Fact]
public async Task Source_dedup_disabled_when_window_is_zero()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
DuplicateWindowMs = 0, // disabled
});
// Same msg-id should NOT be deduped when window is 0
await source.OnOriginAppendAsync(MakeMessageWithMsgId(1, "a", "1", "msg-001"), default);
await source.OnOriginAppendAsync(MakeMessageWithMsgId(2, "a", "1", "msg-001"), default);
source.DeduplicatedCount.ShouldBe(0);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
[Fact]
public async Task Source_dedup_ignores_messages_without_msg_id()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
DuplicateWindowMs = 60_000,
});
// Messages without Nats-Msg-Id header bypass dedup
await source.OnOriginAppendAsync(MakeMessage(1, "a", "1"), default);
await source.OnOriginAppendAsync(MakeMessage(2, "a", "2"), default);
source.DeduplicatedCount.ShouldBe(0);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
// -------------------------------------------------------------------------
// Multiple sources per stream
// -------------------------------------------------------------------------
[Fact]
public async Task Multiple_sources_aggregate_into_single_target()
{
var target = new MemStore();
var src1 = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC1",
SubjectTransformPrefix = "agg.",
});
var src2 = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC2",
SubjectTransformPrefix = "agg.",
});
await src1.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
await src2.OnOriginAppendAsync(MakeMessage(1, "events.login", "2"), default);
await src1.OnOriginAppendAsync(MakeMessage(2, "orders.updated", "3"), default);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(3UL);
var msg1 = await target.LoadAsync(1, default);
msg1.ShouldNotBeNull();
msg1.Subject.ShouldBe("agg.orders.created");
var msg2 = await target.LoadAsync(2, default);
msg2.ShouldNotBeNull();
msg2.Subject.ShouldBe("agg.events.login");
}
[Fact]
public async Task Multiple_sources_with_different_filters()
{
var target = new MemStore();
var src1 = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC1",
FilterSubject = "orders.*",
});
var src2 = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC2",
FilterSubject = "events.*",
});
await src1.OnOriginAppendAsync(MakeMessage(1, "orders.created", "1"), default);
await src1.OnOriginAppendAsync(MakeMessage(2, "events.login", "2"), default); // filtered by src1
await src2.OnOriginAppendAsync(MakeMessage(1, "events.login", "3"), default);
await src2.OnOriginAppendAsync(MakeMessage(2, "orders.created", "4"), default); // filtered by src2
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
}
// -------------------------------------------------------------------------
// Lag tracking per source
// -------------------------------------------------------------------------
[Fact]
public async Task Source_lag_tracking_reflects_origin_position()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
await source.OnOriginAppendAsync(MakeMessage(5, "a", "1"), default);
var report = source.GetHealthReport(originLastSeq: 10);
report.Lag.ShouldBe(5UL);
report.SourceName.ShouldBe("SRC");
}
[Fact]
public async Task Source_lag_zero_when_caught_up()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
await source.OnOriginAppendAsync(MakeMessage(10, "a", "1"), default);
var report = source.GetHealthReport(originLastSeq: 10);
report.Lag.ShouldBe(0UL);
}
// -------------------------------------------------------------------------
// Sequence tracking — ignores older messages
// -------------------------------------------------------------------------
[Fact]
public async Task Source_ignores_older_sequences()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
await source.OnOriginAppendAsync(MakeMessage(5, "a", "1"), default);
await source.OnOriginAppendAsync(MakeMessage(3, "b", "2"), default); // older, ignored
await source.OnOriginAppendAsync(MakeMessage(5, "c", "3"), default); // same, ignored
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(1UL);
source.LastOriginSequence.ShouldBe(5UL);
}
// -------------------------------------------------------------------------
// Background sync loop: channel-based
// Go reference: server/stream.go:3752-3833 (processAllSourceMsgs)
// -------------------------------------------------------------------------
[Fact]
public async Task Channel_sync_loop_processes_enqueued_source_messages()
{
var target = new MemStore();
await using var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
FilterSubject = "orders.*",
});
source.StartSyncLoop();
source.IsRunning.ShouldBeTrue();
source.TryEnqueue(MakeMessage(1, "orders.created", "1"));
source.TryEnqueue(MakeMessage(2, "events.login", "2")); // filtered
source.TryEnqueue(MakeMessage(3, "orders.updated", "3"));
await WaitForConditionAsync(() => source.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
source.FilteredOutCount.ShouldBe(1);
}
// -------------------------------------------------------------------------
// Background sync loop: pull-based
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer)
// -------------------------------------------------------------------------
[Fact]
public async Task Pull_sync_loop_fetches_filtered_from_origin()
{
var origin = new MemStore();
var target = new MemStore();
await using var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
FilterSubject = "orders.*",
SubjectTransformPrefix = "agg.",
});
await origin.AppendAsync("orders.created", "1"u8.ToArray(), default);
await origin.AppendAsync("events.login", "2"u8.ToArray(), default);
await origin.AppendAsync("orders.updated", "3"u8.ToArray(), default);
source.StartPullSyncLoop(origin);
await WaitForConditionAsync(() => source.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
// Verify transform was applied
var msg1 = await target.LoadAsync(1, default);
msg1.ShouldNotBeNull();
msg1.Subject.ShouldBe("agg.orders.created");
}
// -------------------------------------------------------------------------
// Combined: filter + account + transform + dedup
// -------------------------------------------------------------------------
[Fact]
public async Task Source_applies_all_filters_together()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
FilterSubject = "orders.*",
SubjectTransformPrefix = "agg.",
SourceAccount = "PROD",
DuplicateWindowMs = 60_000,
});
// Pass: correct account, matching subject, unique msg-id
await source.OnOriginAppendAsync(
MakeMessageWithMsgId(1, "orders.created", "1", "m1", account: "PROD"), default);
// Fail: wrong account
await source.OnOriginAppendAsync(
MakeMessageWithMsgId(2, "orders.created", "2", "m2", account: "DEV"), default);
// Fail: wrong subject
await source.OnOriginAppendAsync(
MakeMessageWithMsgId(3, "events.login", "3", "m3", account: "PROD"), default);
// Fail: duplicate msg-id
await source.OnOriginAppendAsync(
MakeMessageWithMsgId(4, "orders.updated", "4", "m1", account: "PROD"), default);
// Pass: everything checks out
await source.OnOriginAppendAsync(
MakeMessageWithMsgId(5, "orders.updated", "5", "m5", account: "PROD"), default);
var state = await target.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
source.FilteredOutCount.ShouldBe(1);
source.DeduplicatedCount.ShouldBe(1);
}
// -------------------------------------------------------------------------
// Health report
// -------------------------------------------------------------------------
[Fact]
public async Task Health_report_includes_filter_and_dedup_stats()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig
{
Name = "SRC",
FilterSubject = "orders.*",
DuplicateWindowMs = 60_000,
});
await source.OnOriginAppendAsync(MakeMessage(1, "events.login", "1"), default); // filtered
await source.OnOriginAppendAsync(
MakeMessageWithMsgId(2, "orders.created", "2", "m1"), default);
await source.OnOriginAppendAsync(
MakeMessageWithMsgId(3, "orders.updated", "3", "m1"), default); // deduped
var report = source.GetHealthReport(originLastSeq: 10);
report.SourceName.ShouldBe("SRC");
report.FilterSubject.ShouldBe("orders.*");
report.FilteredOutCount.ShouldBe(1);
report.DeduplicatedCount.ShouldBe(1);
report.Lag.ShouldBeGreaterThan(0UL);
}
// -------------------------------------------------------------------------
// Dispose / lifecycle tests
// -------------------------------------------------------------------------
[Fact]
public async Task Dispose_stops_running_source_sync_loop()
{
var target = new MemStore();
var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
source.StartSyncLoop();
source.IsRunning.ShouldBeTrue();
await source.DisposeAsync();
source.IsRunning.ShouldBeFalse();
}
[Fact]
public async Task Config_property_exposes_source_configuration()
{
var target = new MemStore();
var config = new StreamSourceConfig
{
Name = "MY_SOURCE",
FilterSubject = "orders.*",
SubjectTransformPrefix = "agg.",
SourceAccount = "PROD",
DuplicateWindowMs = 5000,
};
var source = new SourceCoordinator(target, config);
source.Config.Name.ShouldBe("MY_SOURCE");
source.Config.FilterSubject.ShouldBe("orders.*");
source.Config.SubjectTransformPrefix.ShouldBe("agg.");
source.Config.SourceAccount.ShouldBe("PROD");
source.Config.DuplicateWindowMs.ShouldBe(5000);
await source.DisposeAsync();
}
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
private static StoredMessage MakeMessage(ulong seq, string subject, string payload, string? account = null) => new()
{
Sequence = seq,
Subject = subject,
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
TimestampUtc = DateTime.UtcNow,
Account = account,
};
private static StoredMessage MakeMessageWithMsgId(
ulong seq, string subject, string payload, string msgId, string? account = null) => new()
{
Sequence = seq,
Subject = subject,
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
TimestampUtc = DateTime.UtcNow,
Account = account,
Headers = new Dictionary<string, string> { ["Nats-Msg-Id"] = msgId },
};
private static async Task WaitForConditionAsync(Func<bool> condition, TimeSpan timeout)
{
using var cts = new CancellationTokenSource(timeout);
while (!condition())
{
await Task.Delay(25, cts.Token);
}
}
}