Files
natsdotnet/tests/NATS.Server.JetStream.Tests/JetStream/MirrorSourceGoParityTests.cs
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

826 lines
32 KiB
C#

using NATS.Server.JetStream;
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Tests.JetStream;
// Go reference: server/jetstream_test.go — Mirror, Source & Transform parity tests
// Each test documents the Go function name and line number it ports.
public class MirrorSourceGoParityTests
{
// -------------------------------------------------------------------------
// Mirror: basic sync
// Go reference: TestJetStreamMirrorStripExpectedHeaders — jetstream_test.go:9361
// -------------------------------------------------------------------------
/// <summary>
/// Go: TestJetStreamMirrorStripExpectedHeaders — a mirror receives messages published
/// to its origin. This port only exercises the in-process path: a message captured on
/// origin stream "S" is expected to be readable from mirror stream "M".
/// </summary>
[Fact]
public async Task Mirror_syncs_messages_from_origin_through_stream_manager()
{
    var streams = new StreamManager();
    streams.CreateOrUpdate(new StreamConfig { Name = "S", Subjects = ["foo"] });
    streams.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "S" });

    // Publish a single message to the origin's subject.
    var publishAck = streams.Capture("foo", "hello"u8.ToArray());
    publishAck.ShouldNotBeNull();
    publishAck!.ErrorCode.ShouldBeNull();
    publishAck.Seq.ShouldBe(1UL);

    // The mirror must now hold exactly that message under the same subject.
    var mirrorState = await streams.GetStateAsync("M", default);
    mirrorState.Messages.ShouldBe(1UL);

    var mirrored = streams.GetMessage("M", 1);
    mirrored.ShouldNotBeNull();
    mirrored!.Subject.ShouldBe("foo");
}
// -------------------------------------------------------------------------
// Mirror: promote (updates) and FirstSeq validation
// Go reference: TestJetStreamMirrorUpdatesNotSupported — jetstream_test.go:14127
// Go reference: TestJetStreamMirrorFirstSeqNotSupported — jetstream_test.go:14150
// -------------------------------------------------------------------------
/// <summary>
/// Go: a mirror can be promoted by updating it with Mirror = null and adding subjects.
/// </summary>
[Fact]
public void Mirror_can_be_promoted_by_removing_mirror_field()
{
    var streams = new StreamManager();
    streams.CreateOrUpdate(new StreamConfig { Name = "SOURCE" });
    streams.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "SOURCE" });

    // Re-submit "M" without a mirror reference and with subjects of its own.
    var promotedConfig = new StreamConfig { Name = "M", Mirror = null, Subjects = ["m.>"] };
    var promotion = streams.CreateOrUpdate(promotedConfig);

    promotion.Error.ShouldBeNull();
    promotion.StreamInfo.ShouldNotBeNull();
    promotion.StreamInfo!.Config.Mirror.ShouldBeNull();
}
/// <summary>
/// Go: TestJetStreamMirrorFirstSeqNotSupported — combining Mirror with FirstSeq is invalid.
/// Reference: server/stream.go:1028-1031 (NewJSMirrorWithFirstSeqError).
/// </summary>
[Fact]
public void Mirror_with_first_seq_is_rejected()
{
    var streams = new StreamManager();
    streams.CreateOrUpdate(new StreamConfig { Name = "SOURCE" });

    var invalidMirror = new StreamConfig { Name = "M", Mirror = "SOURCE", FirstSeq = 123 };
    var outcome = streams.CreateOrUpdate(invalidMirror);

    // The request must fail and the error text must name the offending field.
    outcome.Error.ShouldNotBeNull();
    outcome.Error!.Description.ShouldContain("first sequence");
}
// -------------------------------------------------------------------------
// Mirror: clipping OptStartSeq to origin bounds
// Go reference: TestJetStreamMirroringClipStartSeq — jetstream_test.go:18203
// -------------------------------------------------------------------------
/// <summary>
/// Go: when OptStartSeq for a mirror exceeds the origin's last sequence the mirror
/// coordinator should still sync all available messages without crashing.
/// NOTE(review): this port does not configure a start sequence; it checks that a pull
/// sync over a 10-message origin copies all 10 messages into the target store.
/// </summary>
[Fact]
public async Task Mirror_coordinator_clips_start_seq_beyond_origin_end()
{
    var origin = new MemStore();
    var target = new MemStore();
    var utf8 = System.Text.Encoding.UTF8;

    // Seed the origin with subjects test.0 .. test.9.
    for (var index = 0; index < 10; index++)
    {
        await origin.AppendAsync($"test.{index}", utf8.GetBytes($"p{index}"), default);
    }

    await using var mirror = new MirrorCoordinator(target);
    mirror.StartPullSyncLoop(origin);

    bool CaughtUp() => mirror.LastOriginSequence >= 10;
    await WaitForConditionAsync(CaughtUp, TimeSpan.FromSeconds(5));

    mirror.LastOriginSequence.ShouldBe(10UL);
    var targetState = await target.GetStateAsync(default);
    targetState.Messages.ShouldBe(10UL);
}
// -------------------------------------------------------------------------
// Mirror: AllowMsgTtl accepted; SubjectDeleteMarkerTtl + Mirror rejected
// Go reference: TestJetStreamMessageTTLWhenMirroring — jetstream_test.go:18753
// Go reference: TestJetStreamSubjectDeleteMarkersWithMirror — jetstream_test.go:19052
// -------------------------------------------------------------------------
/// <summary>
/// Go: a mirror can have AllowMsgTtl=true (per-message TTL is forwarded to the mirror).
/// </summary>
[Fact]
public void Mirror_with_allow_msg_ttl_is_accepted()
{
    var streams = new StreamManager();
    streams.CreateOrUpdate(new StreamConfig { Name = "ORIGIN" });

    var ttlMirror = new StreamConfig { Name = "M", Mirror = "ORIGIN", AllowMsgTtl = true };
    var outcome = streams.CreateOrUpdate(ttlMirror);

    // Creation succeeds and the flag survives in the returned configuration.
    outcome.Error.ShouldBeNull();
    outcome.StreamInfo!.Config.AllowMsgTtl.ShouldBeTrue();
}
/// <summary>
/// Go: SubjectDeleteMarkerTtlMs combined with Mirror is invalid.
/// Reference: server/stream.go:1050-1053.
/// </summary>
[Fact]
public void Mirror_with_subject_delete_marker_ttl_is_rejected()
{
    var streams = new StreamManager();
    streams.CreateOrUpdate(new StreamConfig { Name = "ORIGIN" });

    var invalidMirror = new StreamConfig
    {
        Name = "M",
        Mirror = "ORIGIN",
        SubjectDeleteMarkerTtlMs = 5000,
    };
    var outcome = streams.CreateOrUpdate(invalidMirror);

    outcome.Error.ShouldNotBeNull();
    outcome.Error!.Description.ShouldContain("subject delete marker TTL");
}
// -------------------------------------------------------------------------
// Mirror: promote after deleting origin
// Go reference: TestJetStreamPromoteMirrorDeletingOrigin — jetstream_test.go:21462
// -------------------------------------------------------------------------
/// <summary>
/// Go: after the origin is deleted the subject conflict goes away, so the mirror can be
/// promoted to a regular stream that owns those subjects.
/// </summary>
[Fact]
public void Promote_mirror_succeeds_after_deleting_conflicting_origin()
{
    var streams = new StreamManager();
    streams.CreateOrUpdate(new StreamConfig { Name = "O", Subjects = ["foo"] });
    streams.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "O" });

    // Drop the origin first, then promote "M" onto the origin's former subject.
    streams.Delete("O");
    var promotedConfig = new StreamConfig { Name = "M", Mirror = null, Subjects = ["foo"] };
    var promotion = streams.CreateOrUpdate(promotedConfig);

    promotion.Error.ShouldBeNull();
    promotion.StreamInfo!.Config.Mirror.ShouldBeNull();
    promotion.StreamInfo.Config.Subjects.ShouldContain("foo");
}
/// <summary>
/// Go: after promoting the mirror, new publishes to the promoted stream work.
/// Go reference: TestJetStreamPromoteMirrorUpdatingOrigin — jetstream_test.go:21550.
/// </summary>
[Fact]
public void Promote_mirror_allows_new_publishes()
{
    var streams = new StreamManager();
    streams.CreateOrUpdate(new StreamConfig { Name = "O", Subjects = ["foo"] });
    streams.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "O" });

    // One message goes in while "M" is still a mirror of "O".
    streams.Capture("foo", "msg1"u8.ToArray());

    // Remove the origin and promote "M" so it owns "foo" itself.
    streams.Delete("O");
    var promotedConfig = new StreamConfig { Name = "M", Mirror = null, Subjects = ["foo"] };
    streams.CreateOrUpdate(promotedConfig);

    var secondAck = streams.Capture("foo", "msg2"u8.ToArray());
    secondAck.ShouldNotBeNull();
    secondAck!.ErrorCode.ShouldBeNull();
}
// -------------------------------------------------------------------------
// Mirror / Source: AllowMsgSchedules incompatibility
// Go reference: TestJetStreamScheduledMirrorOrSource — jetstream_test.go:21643
// -------------------------------------------------------------------------
/// <summary>
/// Go: NewJSMirrorWithMsgSchedulesError — Mirror combined with AllowMsgSchedules is invalid.
/// Reference: server/stream.go:1040-1046.
/// </summary>
[Fact]
public void Mirror_with_allow_msg_schedules_is_rejected()
{
    var streams = new StreamManager();
    var scheduledMirror = new StreamConfig
    {
        Name = "TEST",
        Mirror = "M",
        AllowMsgSchedules = true,
    };

    var outcome = streams.CreateOrUpdate(scheduledMirror);

    outcome.Error.ShouldNotBeNull();
    outcome.Error!.Description.ShouldContain("message schedules");
}
/// <summary>
/// Go: NewJSSourceWithMsgSchedulesError — Sources combined with AllowMsgSchedules is invalid.
/// Reference: server/stream.go:1040-1046.
/// </summary>
[Fact]
public void Source_with_allow_msg_schedules_is_rejected()
{
    var streams = new StreamManager();
    var scheduledSourcing = new StreamConfig
    {
        Name = "TEST",
        Sources = [new StreamSourceConfig { Name = "S" }],
        AllowMsgSchedules = true,
    };

    var outcome = streams.CreateOrUpdate(scheduledSourcing);

    outcome.Error.ShouldNotBeNull();
    outcome.Error!.Description.ShouldContain("message schedules");
}
// -------------------------------------------------------------------------
// Mirror: normalization strips subjects (recovery from bad config)
// Go reference: TestJetStreamRecoverBadMirrorConfigWithSubjects — jetstream_test.go:11255
// -------------------------------------------------------------------------
/// <summary>
/// Go: mirror streams must not carry subject lists — they inherit subjects from the origin.
/// When recovering a bad mirror config that has subjects, the server clears them.
/// Reference: server/stream.go:1020-1025 (clearMirrorSubjects recovery path).
/// </summary>
[Fact]
public void Mirror_stream_subjects_are_cleared_on_creation()
{
    var streams = new StreamManager();
    streams.CreateOrUpdate(new StreamConfig { Name = "S", Subjects = ["foo"] });

    // Deliberately submit a mirror that also lists subjects.
    var badMirror = new StreamConfig
    {
        Name = "M",
        Subjects = ["foo", "bar", "baz"],
        Mirror = "S",
    };
    var outcome = streams.CreateOrUpdate(badMirror);

    // Creation is accepted, but the stored config must have no subjects left.
    outcome.Error.ShouldBeNull();
    outcome.StreamInfo!.Config.Subjects.ShouldBeEmpty();
}
// -------------------------------------------------------------------------
// Source: work queue with limit
// Go reference: TestJetStreamSourceWorkingQueueWithLimit — jetstream_test.go:9677
// -------------------------------------------------------------------------
/// <summary>
/// Go: sourcing into a WorkQueue-retention stream with a MaxMsgs limit.
/// This port verifies that the source coordinator, filtering on "orders.*", copies all
/// 20 matching origin messages into the target store.
/// </summary>
[Fact]
public async Task Source_work_queue_with_limit_retains_filtered_messages()
{
    var origin = new MemStore();
    var target = new MemStore();
    var utf8 = System.Text.Encoding.UTF8;

    // Seed orders.1 .. orders.20; every one matches the filter below.
    for (var order = 1; order <= 20; order++)
    {
        await origin.AppendAsync($"orders.{order}", utf8.GetBytes($"p{order}"), default);
    }

    var filterConfig = new StreamSourceConfig { Name = "ORIGIN", FilterSubject = "orders.*" };
    await using var source = new SourceCoordinator(target, filterConfig);
    source.StartPullSyncLoop(origin);

    bool CaughtUp() => source.LastOriginSequence >= 20;
    await WaitForConditionAsync(CaughtUp, TimeSpan.FromSeconds(5));

    var targetState = await target.GetStateAsync(default);
    targetState.Messages.ShouldBe(20UL);
}
// -------------------------------------------------------------------------
// Source: from KV bucket stream
// Go reference: TestJetStreamStreamSourceFromKV — jetstream_test.go:9749
// -------------------------------------------------------------------------
/// <summary>
/// Go: sourcing from a KV bucket stream (streams with $KV.&lt;bucket&gt;.&gt; subjects).
/// Three KV writes (two keys, one of them updated) must all reach the target store.
/// </summary>
[Fact]
public async Task Source_from_kv_bucket_stream_pulls_key_value_messages()
{
    var kvOrigin = new MemStore();
    (string Subject, byte[] Value)[] writes =
    [
        ("$KV.BUCKET.key1", "val1"u8.ToArray()),
        ("$KV.BUCKET.key2", "val2"u8.ToArray()),
        ("$KV.BUCKET.key1", "val1-updated"u8.ToArray()),
    ];
    foreach (var (subject, value) in writes)
    {
        await kvOrigin.AppendAsync(subject, value, default);
    }

    var target = new MemStore();
    var bucketFilter = new StreamSourceConfig { Name = "KV_BUCKET", FilterSubject = "$KV.BUCKET.>" };
    await using var source = new SourceCoordinator(target, bucketFilter);
    source.StartPullSyncLoop(kvOrigin);

    bool CaughtUp() => source.LastOriginSequence >= 3;
    await WaitForConditionAsync(CaughtUp, TimeSpan.FromSeconds(5));

    var targetState = await target.GetStateAsync(default);
    targetState.Messages.ShouldBe(3UL);
}
// -------------------------------------------------------------------------
// Source: removal and re-add
// Go reference: TestJetStreamSourceRemovalAndReAdd — jetstream_test.go:17931
// -------------------------------------------------------------------------
/// <summary>
/// Go: removing a source from a stream stops new messages being forwarded;
/// re-adding the source makes new messages flow again.
/// </summary>
[Fact]
public async Task Source_removal_and_readd_resumes_forwarding()
{
    var streams = new StreamManager();

    // Publishes foo.{n} with payload p{n} for n in [first, endExclusive).
    void PublishRange(int first, int endExclusive)
    {
        for (var n = first; n < endExclusive; n++)
        {
            streams.Capture($"foo.{n}", System.Text.Encoding.UTF8.GetBytes($"p{n}"));
        }
    }

    streams.CreateOrUpdate(new StreamConfig { Name = "SRC", Subjects = ["foo.*"] });
    var sourcingConfig = new StreamConfig
    {
        Name = "TEST",
        Subjects = [],
        Sources = [new StreamSourceConfig { Name = "SRC" }],
    };
    streams.CreateOrUpdate(sourcingConfig);

    // Phase 1: with the source attached, all 10 messages arrive in TEST.
    PublishRange(0, 10);
    var withSource = await streams.GetStateAsync("TEST", default);
    withSource.Messages.ShouldBe(10UL);

    // Phase 2: remove the source; 10 more publishes must not reach TEST.
    streams.CreateOrUpdate(new StreamConfig { Name = "TEST", Subjects = [] });
    PublishRange(10, 20);
    var withoutSource = await streams.GetStateAsync("TEST", default);
    withoutSource.Messages.ShouldBe(10UL);

    // Phase 3: re-add the source; the next publish is forwarded again.
    var reattachedConfig = new StreamConfig
    {
        Name = "TEST",
        Subjects = [],
        Sources = [new StreamSourceConfig { Name = "SRC" }],
    };
    streams.CreateOrUpdate(reattachedConfig);
    streams.Capture("foo.99", "new"u8.ToArray());
    var afterReadd = await streams.GetStateAsync("TEST", default);
    afterReadd.Messages.ShouldBe(11UL);
}
// -------------------------------------------------------------------------
// Source: clipping OptStartSeq
// Go reference: TestJetStreamSourcingClipStartSeq — jetstream_test.go:18160
// -------------------------------------------------------------------------
/// <summary>
/// Go: when OptStartSeq for a source exceeds the origin's last sequence, the source
/// coordinator starts from the origin's end without crashing.
/// NOTE(review): this port does not configure a start sequence; it checks that an
/// unfiltered pull sync over a 10-message origin copies all 10 messages.
/// </summary>
[Fact]
public async Task Source_coordinator_clips_start_seq_beyond_origin_end()
{
    var origin = new MemStore();
    var target = new MemStore();
    var utf8 = System.Text.Encoding.UTF8;

    for (var index = 0; index < 10; index++)
    {
        await origin.AppendAsync($"test.{index}", utf8.GetBytes($"p{index}"), default);
    }

    var unfiltered = new StreamSourceConfig { Name = "ORIGIN" };
    await using var source = new SourceCoordinator(target, unfiltered);
    source.StartPullSyncLoop(origin);

    bool CaughtUp() => source.LastOriginSequence >= 10;
    await WaitForConditionAsync(CaughtUp, TimeSpan.FromSeconds(5));

    source.LastOriginSequence.ShouldBe(10UL);
    var targetState = await target.GetStateAsync(default);
    targetState.Messages.ShouldBe(10UL);
}
// -------------------------------------------------------------------------
// Input subject transform
// Go reference: TestJetStreamInputTransform — jetstream_test.go:9803
// -------------------------------------------------------------------------
/// <summary>
/// Go: TestJetStreamInputTransform — when a stream has a subject transform configured,
/// messages published to the original subject are stored under the transformed subject.
/// Here source=">" and dest="transformed.>", so "foo" is expected to be stored as
/// "transformed.foo", and the stream is expected to hold exactly that one message.
/// </summary>
[Fact]
public async Task Input_transform_stores_message_under_transformed_subject()
{
    var mgr = new StreamManager();
    mgr.CreateOrUpdate(new StreamConfig
    {
        Name = "T1",
        Subjects = ["foo"],
        SubjectTransformSource = ">",
        SubjectTransformDest = "transformed.>",
    });

    var ack = mgr.Capture("foo", "OK"u8.ToArray());
    ack.ShouldNotBeNull();
    ack!.ErrorCode.ShouldBeNull();

    var msg = mgr.GetMessage("T1", 1);
    msg.ShouldNotBeNull();
    msg!.Subject.ShouldBe("transformed.foo");

    // Previously the state was fetched and discarded; assert the count so the call
    // actually verifies something (matches the sibling transform test for "T2").
    var state = await mgr.GetStateAsync("T1", default);
    state.Messages.ShouldBe(1UL);
}
/// <summary>
/// Go: TestJetStreamImplicitRePublishAfterSubjectTransform — jetstream_test.go:22180.
/// After the input transform, the stored subject is the transformed one
/// ("events.login" is expected to be stored as "stored.login").
/// </summary>
[Fact]
public async Task Input_transform_followed_by_correct_stored_subject()
{
    var streams = new StreamManager();
    var transformConfig = new StreamConfig
    {
        Name = "T2",
        Subjects = ["events.>"],
        SubjectTransformSource = "events.>",
        SubjectTransformDest = "stored.>",
    };
    streams.CreateOrUpdate(transformConfig);

    streams.Capture("events.login", "data"u8.ToArray());

    var stored = streams.GetMessage("T2", 1);
    stored.ShouldNotBeNull();
    stored!.Subject.ShouldBe("stored.login");

    var streamState = await streams.GetStateAsync("T2", default);
    streamState.Messages.ShouldBe(1UL);
}
// -------------------------------------------------------------------------
// Republish: cycle detection
// Go reference: TestJetStreamStreamRepublishCycle — jetstream_test.go:13230
// -------------------------------------------------------------------------
/// <summary>
/// Go: the republish destination in a stream configuration must not form a cycle with
/// the stream's own subjects.
/// Reference: server/stream.go:1060-1080 (checkRePublish).
/// </summary>
[Fact]
public void Republish_cycle_detection_rejects_cyclic_destination()
{
    var streams = new StreamManager();

    // Every case uses the same subject set and varies only name/source/destination.
    var tryCreate = (string name, string source, string dest) => streams.CreateOrUpdate(new StreamConfig
    {
        Name = name,
        Subjects = ["foo.>", "bar.*", "baz"],
        RePublishSource = source,
        RePublishDest = dest,
    });

    // Case 1: source=foo.> dest=foo.> — exact cycle.
    var exactCycle = tryCreate("RPC1", "foo.>", "foo.>");
    exactCycle.Error.ShouldNotBeNull();
    exactCycle.Error!.Description.ShouldContain("cycle");

    // Case 2: dest=foo.bar matches the foo.> stream subject → cycle.
    var wildcardTailCycle = tryCreate("RPC2", "bar.bar", "foo.bar");
    wildcardTailCycle.Error.ShouldNotBeNull();

    // Case 3: dest=bar.bar matches the bar.* stream subject → cycle.
    var singleTokenCycle = tryCreate("RPC3", "baz", "bar.bar");
    singleTokenCycle.Error.ShouldNotBeNull();
}
// -------------------------------------------------------------------------
// Republish: single-token match
// Go reference: TestJetStreamStreamRepublishOneTokenMatch — jetstream_test.go:13283
// -------------------------------------------------------------------------
/// <summary>
/// Go: when RePublishSource="one" and dest="uno", messages captured to "one" are stored.
/// </summary>
[Fact]
public async Task Republish_single_token_match_accepted_and_captures()
{
    var streams = new StreamManager();
    var republishConfig = new StreamConfig
    {
        Name = "Stream1",
        Subjects = ["one", "four"],
        RePublishSource = "one",
        RePublishDest = "uno",
        RePublishHeadersOnly = false,
    };
    var creation = streams.CreateOrUpdate(republishConfig);
    creation.Error.ShouldBeNull();

    // Publish ten identical messages on the matching subject.
    for (var remaining = 10; remaining > 0; remaining--)
    {
        streams.Capture("one", "msg"u8.ToArray());
    }

    var streamState = await streams.GetStateAsync("Stream1", default);
    streamState.Messages.ShouldBe(10UL);
}
// -------------------------------------------------------------------------
// Republish: multi-token match
// Go reference: TestJetStreamStreamRepublishMultiTokenMatch — jetstream_test.go:13325
// -------------------------------------------------------------------------
/// <summary>
/// Go: RePublishSource="one.two.>" dest="uno.dos.>" — captures work for "one.two.three".
/// </summary>
[Fact]
public async Task Republish_multi_token_match_accepted_and_captures()
{
    var streams = new StreamManager();
    var republishConfig = new StreamConfig
    {
        Name = "Stream1",
        Subjects = ["one.>", "four.>"],
        RePublishSource = "one.two.>",
        RePublishDest = "uno.dos.>",
        RePublishHeadersOnly = false,
    };
    var creation = streams.CreateOrUpdate(republishConfig);
    creation.Error.ShouldBeNull();

    // Publish ten identical messages on a subject matching the republish source.
    for (var remaining = 10; remaining > 0; remaining--)
    {
        streams.Capture("one.two.three", "msg"u8.ToArray());
    }

    var streamState = await streams.GetStateAsync("Stream1", default);
    streamState.Messages.ShouldBe(10UL);
}
// -------------------------------------------------------------------------
// Republish: any-subject match (empty source filter)
// Go reference: TestJetStreamStreamRepublishAnySubjectMatch — jetstream_test.go:13367
// -------------------------------------------------------------------------
/// <summary>
/// Go: when RePublishSource is null/empty all subjects are republished.
/// This port checks that such a config is accepted and both publishes are stored.
/// </summary>
[Fact]
public async Task Republish_any_subject_match_accepted_when_source_is_empty()
{
    var streams = new StreamManager();
    var republishConfig = new StreamConfig
    {
        Name = "Stream1",
        Subjects = ["one.>", "four.>"],
        RePublishSource = null,
        RePublishDest = "any.>",
        RePublishHeadersOnly = false,
    };
    var creation = streams.CreateOrUpdate(republishConfig);
    creation.Error.ShouldBeNull();

    // One publish per subject family owned by the stream.
    streams.Capture("one.two.three", "msg"u8.ToArray());
    streams.Capture("four.five.six", "msg"u8.ToArray());

    var streamState = await streams.GetStateAsync("Stream1", default);
    streamState.Messages.ShouldBe(2UL);
}
// -------------------------------------------------------------------------
// Republish: multi-token no match
// Go reference: TestJetStreamStreamRepublishMultiTokenNoMatch — jetstream_test.go:13408
// -------------------------------------------------------------------------
/// <summary>
/// Go: publishing to "four.five.six" when the republish filter is "one.two.>" should NOT
/// trigger republish — the message is still stored normally in the stream.
/// </summary>
[Fact]
public async Task Republish_multi_token_no_match_still_captures_to_stream()
{
    var streams = new StreamManager();
    var republishConfig = new StreamConfig
    {
        Name = "Stream1",
        Subjects = ["one.>", "four.>"],
        RePublishSource = "one.two.>",
        RePublishDest = "uno.dos.>",
        RePublishHeadersOnly = true,
    };
    var creation = streams.CreateOrUpdate(republishConfig);
    creation.Error.ShouldBeNull();

    // Publish ten messages on a subject outside the republish source filter.
    for (var remaining = 10; remaining > 0; remaining--)
    {
        streams.Capture("four.five.six", "msg"u8.ToArray());
    }

    var streamState = await streams.GetStateAsync("Stream1", default);
    streamState.Messages.ShouldBe(10UL);
}
// -------------------------------------------------------------------------
// Republish: single-token no match
// Go reference: TestJetStreamStreamRepublishOneTokenNoMatch — jetstream_test.go:13445
// -------------------------------------------------------------------------
/// <summary>
/// Go: publishing to "four" when the republish source is "one" should not trigger
/// republish. The message is still stored in the stream normally.
/// </summary>
[Fact]
public async Task Republish_single_token_no_match_still_captures_to_stream()
{
    var streams = new StreamManager();
    var republishConfig = new StreamConfig
    {
        Name = "Stream1",
        Subjects = ["one", "four"],
        RePublishSource = "one",
        RePublishDest = "uno",
        RePublishHeadersOnly = true,
    };
    var creation = streams.CreateOrUpdate(republishConfig);
    creation.Error.ShouldBeNull();

    // Publish ten messages on the subject that does not match the republish source.
    for (var remaining = 10; remaining > 0; remaining--)
    {
        streams.Capture("four", "msg"u8.ToArray());
    }

    var streamState = await streams.GetStateAsync("Stream1", default);
    streamState.Messages.ShouldBe(10UL);
}
// -------------------------------------------------------------------------
// Republish: headers-only config
// Go reference: TestJetStreamStreamRepublishHeadersOnly — jetstream_test.go:13482
// -------------------------------------------------------------------------
/// <summary>
/// Go: with RePublishHeadersOnly=true the config is accepted; body omission is external.
/// This port checks the flag round-trips in the returned config and captures still store.
/// </summary>
[Fact]
public async Task Republish_headers_only_config_accepted_and_captures()
{
    var streams = new StreamManager();
    var headersOnlyConfig = new StreamConfig
    {
        Name = "RPC",
        Subjects = ["foo", "bar", "baz"],
        RePublishDest = "RP.>",
        RePublishHeadersOnly = true,
    };
    var creation = streams.CreateOrUpdate(headersOnlyConfig);
    creation.Error.ShouldBeNull();
    creation.StreamInfo!.Config.RePublishHeadersOnly.ShouldBeTrue();

    for (var remaining = 10; remaining > 0; remaining--)
    {
        streams.Capture("foo", "msg"u8.ToArray());
    }

    var streamState = await streams.GetStateAsync("RPC", default);
    streamState.Messages.ShouldBe(10UL);
}
// -------------------------------------------------------------------------
// MirrorCoordinator: lag tracking
// Go reference: server/stream.go:2739-2743 (mirrorInfo)
// -------------------------------------------------------------------------
/// <summary>
/// Verifies the mirror coordinator lag calculation used by health reporting.
/// With an origin last sequence of 10, the expected lag is 10 before any message is
/// applied, 3 after applying sequence 7, and 0 after applying sequence 10.
/// </summary>
[Fact]
public async Task Mirror_coordinator_tracks_lag_from_origin()
{
    var target = new MemStore();

    // Dispose the coordinator like the other tests in this class do; it was
    // previously created without `await using` and never disposed.
    await using var mirror = new MirrorCoordinator(target);

    // Nothing applied yet and no sync loop started.
    var report = mirror.GetHealthReport(originLastSeq: 10);
    report.Lag.ShouldBe(10UL);
    report.IsRunning.ShouldBeFalse();

    await mirror.OnOriginAppendAsync(
        new StoredMessage { Sequence = 7, Subject = "a", Payload = "p"u8.ToArray() }, default);
    report = mirror.GetHealthReport(originLastSeq: 10);
    report.LastOriginSequence.ShouldBe(7UL);
    report.Lag.ShouldBe(3UL);

    await mirror.OnOriginAppendAsync(
        new StoredMessage { Sequence = 10, Subject = "b", Payload = "p"u8.ToArray() }, default);
    report = mirror.GetHealthReport(originLastSeq: 10);
    report.Lag.ShouldBe(0UL);
}
// -------------------------------------------------------------------------
// SourceCoordinator: filter + transform
// Go reference: server/stream.go:3860-4007 (processInboundSourceMsg)
// -------------------------------------------------------------------------
/// <summary>
/// Verifies that SourceCoordinator filters by subject and applies the prefix transform.
/// Of three origin messages, the two matching "orders.*" are expected in the target with
/// the "copy." prefix, and the non-matching one is expected to be counted as filtered out.
/// </summary>
[Fact]
public async Task Source_coordinator_filters_and_transforms_subjects()
{
    var target = new MemStore();
    var sourceCfg = new StreamSourceConfig
    {
        Name = "SRC",
        FilterSubject = "orders.*",
        SubjectTransformPrefix = "copy.",
    };

    // Dispose the coordinator like the other tests in this class do; it was
    // previously created without `await using` and never disposed.
    await using var source = new SourceCoordinator(target, sourceCfg);

    await source.OnOriginAppendAsync(
        new StoredMessage { Sequence = 1, Subject = "orders.created", Payload = "1"u8.ToArray() }, default);
    await source.OnOriginAppendAsync(
        new StoredMessage { Sequence = 2, Subject = "inventory.updated", Payload = "2"u8.ToArray() }, default);
    await source.OnOriginAppendAsync(
        new StoredMessage { Sequence = 3, Subject = "orders.deleted", Payload = "3"u8.ToArray() }, default);

    var state = await target.GetStateAsync(default);
    state.Messages.ShouldBe(2UL);

    // Target sequences 1 and 2 hold the two forwarded messages.
    var msg1 = await target.LoadAsync(1, default);
    msg1!.Subject.ShouldBe("copy.orders.created");
    var msg2 = await target.LoadAsync(2, default);
    msg2!.Subject.ShouldBe("copy.orders.deleted");

    source.FilteredOutCount.ShouldBe(1L);
}
// -------------------------------------------------------------------------
// SourceCoordinator: pull sync loop with filter
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer)
// -------------------------------------------------------------------------
/// <summary>
/// Runs the source coordinator's pull sync loop over an origin with interleaved
/// "orders.*" and "inventory.*" messages and expects only the three "orders.*"
/// messages to be stored in the target.
/// </summary>
[Fact]
public async Task Source_coordinator_pull_sync_loop_syncs_filtered_messages()
{
    var origin = new MemStore();
    var target = new MemStore();

    (string Subject, byte[] Payload)[] seed =
    [
        ("orders.1", "p1"u8.ToArray()),
        ("inventory.1", "p2"u8.ToArray()),
        ("orders.2", "p3"u8.ToArray()),
        ("inventory.2", "p4"u8.ToArray()),
        ("orders.3", "p5"u8.ToArray()),
    ];
    foreach (var (subject, payload) in seed)
    {
        await origin.AppendAsync(subject, payload, default);
    }

    var ordersOnly = new StreamSourceConfig { Name = "ORIGIN", FilterSubject = "orders.*" };
    await using var source = new SourceCoordinator(target, ordersOnly);
    source.StartPullSyncLoop(origin);

    // Wait until the coordinator has advanced through origin sequence 5.
    bool CaughtUp() => source.LastOriginSequence >= 5;
    await WaitForConditionAsync(CaughtUp, TimeSpan.FromSeconds(5));

    var targetState = await target.GetStateAsync(default);
    targetState.Messages.ShouldBe(3UL);
}
// -------------------------------------------------------------------------
// MirrorCoordinator: ignores redelivered messages
// Go reference: server/stream.go:2924 (dc > 1 check)
// -------------------------------------------------------------------------
/// <summary>
/// Enqueues sequence 1, a redelivered copy of sequence 1, and sequence 2 into the
/// mirror's sync loop and expects only two messages to end up in the target store.
/// </summary>
[Fact]
public async Task Mirror_coordinator_ignores_redelivered_messages_in_channel_loop()
{
    var target = new MemStore();
    await using var mirror = new MirrorCoordinator(target);
    mirror.StartSyncLoop();

    var first = new StoredMessage { Sequence = 1, Subject = "a", Payload = "1"u8.ToArray() };
    var firstRedelivered = new StoredMessage
    {
        Sequence = 1,
        Subject = "a",
        Payload = "1"u8.ToArray(),
        Redelivered = true,
    };
    var second = new StoredMessage { Sequence = 2, Subject = "b", Payload = "2"u8.ToArray() };

    // Enqueue in origin order, with the redelivery sandwiched in between.
    mirror.TryEnqueue(first);
    mirror.TryEnqueue(firstRedelivered);
    mirror.TryEnqueue(second);

    bool CaughtUp() => mirror.LastOriginSequence >= 2;
    await WaitForConditionAsync(CaughtUp, TimeSpan.FromSeconds(5));

    var targetState = await target.GetStateAsync(default);
    targetState.Messages.ShouldBe(2UL);
}
// -------------------------------------------------------------------------
// Skipped tests (require real multi-server / external infrastructure)
// -------------------------------------------------------------------------
// Placeholder for a Go test that is not ported; the Skip reason names the Go test.
[Fact(Skip = "Requires real server restart to test consumer failover — TestJetStreamMirroredConsumerFailAfterRestart:10835")]
public Task Mirror_consumer_fails_after_restart_and_recovers()
{
    return Task.CompletedTask;
}

// Placeholder for a Go test that is not ported; the Skip reason names the Go test.
[Fact(Skip = "Requires real external source/leaf node — TestJetStreamRemoveExternalSource:12150")]
public Task Remove_external_source_stops_forwarding()
{
    return Task.CompletedTask;
}

// Placeholder for a Go test that is not ported; the Skip reason names the Go test.
[Fact(Skip = "Requires real server restart — TestJetStreamWorkQueueSourceRestart:13010")]
public Task Work_queue_source_recovers_after_restart()
{
    return Task.CompletedTask;
}

// Placeholder for a Go test that is not ported; the Skip reason names the Go test.
[Fact(Skip = "Requires real server restart — TestJetStreamWorkQueueSourceNamingRestart:13111")]
public Task Work_queue_source_naming_recovers_after_restart()
{
    return Task.CompletedTask;
}

// Placeholder for a Go test that is not ported; the Skip reason names the Go test.
[Fact(Skip = "Requires real external source stream — TestJetStreamStreamUpdateWithExternalSource:15607")]
public Task Stream_update_with_external_source_works()
{
    return Task.CompletedTask;
}

// Placeholder for a Go test that is not ported; the Skip reason names the Go test.
[Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceAggregates:20759")]
public Task Allow_msg_counter_source_aggregates()
{
    return Task.CompletedTask;
}

// Placeholder for a Go test that is not ported; the Skip reason names the Go test.
[Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceVerbatim:20844")]
public Task Allow_msg_counter_source_verbatim()
{
    return Task.CompletedTask;
}

// Placeholder for a Go test that is not ported; the Skip reason names the Go test.
[Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceStartingAboveZero:20944")]
public Task Allow_msg_counter_source_starting_above_zero()
{
    return Task.CompletedTask;
}
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
/// <summary>
/// Polls <paramref name="condition"/> every 25 ms until it returns <c>true</c> or
/// <paramref name="timeout"/> elapses.
/// </summary>
/// <param name="condition">Predicate evaluated before each 25 ms delay.</param>
/// <param name="timeout">Maximum total time to keep polling.</param>
/// <exception cref="TimeoutException">
/// The condition was still false when the timeout elapsed. This replaces the bare
/// <see cref="TaskCanceledException"/> from <see cref="Task.Delay(int, CancellationToken)"/>
/// so a failing test reports why it failed.
/// </exception>
private static async Task WaitForConditionAsync(Func<bool> condition, TimeSpan timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    while (!condition())
    {
        try
        {
            await Task.Delay(25, cts.Token);
        }
        catch (OperationCanceledException)
        {
            // The condition may have flipped during the final delay; check once more
            // before declaring a timeout.
            if (condition())
            {
                return;
            }

            throw new TimeoutException($"Condition was not met within {timeout}.");
        }
    }
}
}