using NATS.Server.JetStream;
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;

namespace NATS.Server.JetStream.Tests.JetStream;

// Go reference: server/jetstream_test.go — Mirror, Source & Transform parity tests
// Each test documents the Go function name and line number it ports.

/// <summary>
/// Parity tests ported from the Go server's <c>jetstream_test.go</c>, covering mirror,
/// source, input-subject-transform and republish behaviour of <see cref="StreamManager"/>,
/// <see cref="MirrorCoordinator"/> and <see cref="SourceCoordinator"/>.
/// </summary>
public class MirrorSourceGoParityTests
{
    // -------------------------------------------------------------------------
    // Mirror: basic sync
    // Go reference: TestJetStreamMirrorStripExpectedHeaders — jetstream_test.go:9361
    // -------------------------------------------------------------------------

    [Fact]
    // Go: TestJetStreamMirrorStripExpectedHeaders — mirror receives messages published
    // to the origin. In .NET we verify the basic in-process mirror sync path:
    // publish to origin → stored in mirror via RebuildReplicationCoordinators.
    public async Task Mirror_syncs_messages_from_origin_through_stream_manager()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig { Name = "S", Subjects = ["foo"] });
        mgr.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "S" });

        var ack = mgr.Capture("foo", "hello"u8.ToArray());
        ack.ShouldNotBeNull();
        ack!.ErrorCode.ShouldBeNull();
        ack.Seq.ShouldBe(1UL);

        var state = await mgr.GetStateAsync("M", default);
        state.Messages.ShouldBe(1UL);

        var msg = mgr.GetMessage("M", 1);
        msg.ShouldNotBeNull();
        msg!.Subject.ShouldBe("foo");
    }

    // -------------------------------------------------------------------------
    // Mirror: promote (updates) and FirstSeq validation
    // Go reference: TestJetStreamMirrorUpdatesNotSupported — jetstream_test.go:14127
    // Go reference: TestJetStreamMirrorFirstSeqNotSupported — jetstream_test.go:14150
    // -------------------------------------------------------------------------

    [Fact]
    // Go: mirror can be promoted by updating it with Mirror = null and adding subjects.
    public void Mirror_can_be_promoted_by_removing_mirror_field()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig { Name = "SOURCE" });
        mgr.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "SOURCE" });

        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "M",
            Mirror = null,
            Subjects = ["m.>"],
        });

        result.Error.ShouldBeNull();
        result.StreamInfo.ShouldNotBeNull();
        result.StreamInfo!.Config.Mirror.ShouldBeNull();
    }

    [Fact]
    // Go: TestJetStreamMirrorFirstSeqNotSupported — mirror + FirstSeq is invalid.
    // Reference: server/stream.go:1028-1031 (NewJSMirrorWithFirstSeqError)
    public void Mirror_with_first_seq_is_rejected()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig { Name = "SOURCE" });

        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "M",
            Mirror = "SOURCE",
            FirstSeq = 123,
        });

        result.Error.ShouldNotBeNull();
        result.Error!.Description.ShouldContain("first sequence");
    }

    // -------------------------------------------------------------------------
    // Mirror: clipping OptStartSeq to origin bounds
    // Go reference: TestJetStreamMirroringClipStartSeq — jetstream_test.go:18203
    // -------------------------------------------------------------------------

    [Fact]
    // Go: when OptStartSeq for a mirror exceeds the origin's last sequence the mirror
    // coordinator should still sync all available messages without crashing.
    // NOTE(review): this port does not configure a start sequence; it only checks that
    // a pull sync over 10 origin messages lands all 10 in the target.
    public async Task Mirror_coordinator_clips_start_seq_beyond_origin_end()
    {
        var origin = new MemStore();
        var target = new MemStore();

        for (var i = 0; i < 10; i++)
        {
            await origin.AppendAsync($"test.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"), default);
        }

        await using var mirror = new MirrorCoordinator(target);
        mirror.StartPullSyncLoop(origin);

        await WaitForConditionAsync(() => mirror.LastOriginSequence >= 10, TimeSpan.FromSeconds(5));

        mirror.LastOriginSequence.ShouldBe(10UL);
        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(10UL);
    }

    // -------------------------------------------------------------------------
    // Mirror: AllowMsgTtl accepted; SubjectDeleteMarkerTtl + Mirror rejected
    // Go reference: TestJetStreamMessageTTLWhenMirroring — jetstream_test.go:18753
    // Go reference: TestJetStreamSubjectDeleteMarkersWithMirror — jetstream_test.go:19052
    // -------------------------------------------------------------------------

    [Fact]
    // Go: mirror can have AllowMsgTtl=true (per-message TTL is forwarded to mirror).
    public void Mirror_with_allow_msg_ttl_is_accepted()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig { Name = "ORIGIN" });

        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "M",
            Mirror = "ORIGIN",
            AllowMsgTtl = true,
        });

        result.Error.ShouldBeNull();
        result.StreamInfo!.Config.AllowMsgTtl.ShouldBeTrue();
    }

    [Fact]
    // Go: SubjectDeleteMarkerTtlMs + Mirror is invalid.
    // Reference: server/stream.go:1050-1053
    public void Mirror_with_subject_delete_marker_ttl_is_rejected()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig { Name = "ORIGIN" });

        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "M",
            Mirror = "ORIGIN",
            SubjectDeleteMarkerTtlMs = 5000,
        });

        result.Error.ShouldNotBeNull();
        result.Error!.Description.ShouldContain("subject delete marker TTL");
    }

    // -------------------------------------------------------------------------
    // Mirror: promote after deleting origin
    // Go reference: TestJetStreamPromoteMirrorDeletingOrigin — jetstream_test.go:21462
    // -------------------------------------------------------------------------

    [Fact]
    // Go: after origin is deleted the subject conflict goes away so the mirror can be
    // promoted to a regular stream with those subjects.
    public void Promote_mirror_succeeds_after_deleting_conflicting_origin()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig { Name = "O", Subjects = ["foo"] });
        mgr.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "O" });

        mgr.Delete("O");

        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "M",
            Mirror = null,
            Subjects = ["foo"],
        });

        result.Error.ShouldBeNull();
        result.StreamInfo!.Config.Mirror.ShouldBeNull();
        result.StreamInfo.Config.Subjects.ShouldContain("foo");
    }

    [Fact]
    // Go: after promoting the mirror, new publishes to the promoted stream work.
    // Go reference: TestJetStreamPromoteMirrorUpdatingOrigin — jetstream_test.go:21550
    public void Promote_mirror_allows_new_publishes()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig { Name = "O", Subjects = ["foo"] });
        mgr.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "O" });

        mgr.Capture("foo", "msg1"u8.ToArray());

        mgr.Delete("O");
        mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "M",
            Mirror = null,
            Subjects = ["foo"],
        });

        var ack = mgr.Capture("foo", "msg2"u8.ToArray());
        ack.ShouldNotBeNull();
        ack!.ErrorCode.ShouldBeNull();
    }

    // -------------------------------------------------------------------------
    // Mirror / Source: AllowMsgSchedules incompatibility
    // Go reference: TestJetStreamScheduledMirrorOrSource — jetstream_test.go:21643
    // -------------------------------------------------------------------------

    [Fact]
    // Go: NewJSMirrorWithMsgSchedulesError — mirror + AllowMsgSchedules is invalid.
    // Reference: server/stream.go:1040-1046
    public void Mirror_with_allow_msg_schedules_is_rejected()
    {
        var mgr = new StreamManager();

        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "TEST",
            Mirror = "M",
            AllowMsgSchedules = true,
        });

        result.Error.ShouldNotBeNull();
        result.Error!.Description.ShouldContain("message schedules");
    }

    [Fact]
    // Go: NewJSSourceWithMsgSchedulesError — sources + AllowMsgSchedules is invalid.
    // Reference: server/stream.go:1040-1046
    public void Source_with_allow_msg_schedules_is_rejected()
    {
        var mgr = new StreamManager();

        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "TEST",
            Sources = [new StreamSourceConfig { Name = "S" }],
            AllowMsgSchedules = true,
        });

        result.Error.ShouldNotBeNull();
        result.Error!.Description.ShouldContain("message schedules");
    }

    // -------------------------------------------------------------------------
    // Mirror: normalization strips subjects (recovery from bad config)
    // Go reference: TestJetStreamRecoverBadMirrorConfigWithSubjects — jetstream_test.go:11255
    // -------------------------------------------------------------------------

    [Fact]
    // Go: mirror streams must not carry subject lists — they inherit subjects from origin.
    // When recovering a bad mirror config that has subjects, the server clears them.
    // Reference: server/stream.go:1020-1025 (clearMirrorSubjects recovery path)
    public void Mirror_stream_subjects_are_cleared_on_creation()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig { Name = "S", Subjects = ["foo"] });

        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "M",
            Subjects = ["foo", "bar", "baz"],
            Mirror = "S",
        });

        result.Error.ShouldBeNull();
        result.StreamInfo!.Config.Subjects.ShouldBeEmpty();
    }

    // -------------------------------------------------------------------------
    // Source: work queue with limit
    // Go reference: TestJetStreamSourceWorkingQueueWithLimit — jetstream_test.go:9677
    // -------------------------------------------------------------------------

    [Fact]
    // Go: sourcing into a WorkQueue-retention stream with MaxMsgs limit.
    // We verify the source coordinator applies the subject filter.
    // NOTE(review): no retention policy or MaxMsgs limit is configured here, and all 20
    // origin subjects match "orders.*", so nothing is actually filtered out.
    public async Task Source_work_queue_with_limit_retains_filtered_messages()
    {
        var origin = new MemStore();
        var target = new MemStore();

        for (var i = 1; i <= 20; i++)
        {
            await origin.AppendAsync($"orders.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"), default);
        }

        var sourceCfg = new StreamSourceConfig { Name = "ORIGIN", FilterSubject = "orders.*" };
        await using var source = new SourceCoordinator(target, sourceCfg);
        source.StartPullSyncLoop(origin);

        await WaitForConditionAsync(() => source.LastOriginSequence >= 20, TimeSpan.FromSeconds(5));

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(20UL);
    }

    // -------------------------------------------------------------------------
    // Source: from KV bucket stream
    // Go reference: TestJetStreamStreamSourceFromKV — jetstream_test.go:9749
    // -------------------------------------------------------------------------

    [Fact]
    // Go: sourcing from a KV bucket stream (streams with $KV.<bucket>.> subjects).
    public async Task Source_from_kv_bucket_stream_pulls_key_value_messages()
    {
        var kvOrigin = new MemStore();
        await kvOrigin.AppendAsync("$KV.BUCKET.key1", "val1"u8.ToArray(), default);
        await kvOrigin.AppendAsync("$KV.BUCKET.key2", "val2"u8.ToArray(), default);
        await kvOrigin.AppendAsync("$KV.BUCKET.key1", "val1-updated"u8.ToArray(), default);

        var target = new MemStore();
        var sourceCfg = new StreamSourceConfig { Name = "KV_BUCKET", FilterSubject = "$KV.BUCKET.>" };
        await using var source = new SourceCoordinator(target, sourceCfg);
        source.StartPullSyncLoop(kvOrigin);

        await WaitForConditionAsync(() => source.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(3UL);
    }

    // -------------------------------------------------------------------------
    // Source: removal and re-add
    // Go reference: TestJetStreamSourceRemovalAndReAdd — jetstream_test.go:17931
    // -------------------------------------------------------------------------

    [Fact]
    // Go: removing a source from a stream stops new messages being forwarded;
    // re-adding the source makes new messages flow again.
    public async Task Source_removal_and_readd_resumes_forwarding()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig { Name = "SRC", Subjects = ["foo.*"] });
        mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "TEST",
            Subjects = [],
            Sources = [new StreamSourceConfig { Name = "SRC" }],
        });

        for (var i = 0; i < 10; i++)
        {
            mgr.Capture($"foo.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"));
        }

        var stateAfterFirst = await mgr.GetStateAsync("TEST", default);
        stateAfterFirst.Messages.ShouldBe(10UL);

        // Remove source — rebuild coordinators removes the source link
        mgr.CreateOrUpdate(new StreamConfig { Name = "TEST", Subjects = [] });

        for (var i = 10; i < 20; i++)
        {
            mgr.Capture($"foo.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"));
        }

        var stateAfterRemove = await mgr.GetStateAsync("TEST", default);
        stateAfterRemove.Messages.ShouldBe(10UL);

        // Re-add source — new messages flow again
        mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "TEST",
            Subjects = [],
            Sources = [new StreamSourceConfig { Name = "SRC" }],
        });

        mgr.Capture("foo.99", "new"u8.ToArray());

        var stateAfterReadd = await mgr.GetStateAsync("TEST", default);
        stateAfterReadd.Messages.ShouldBe(11UL);
    }

    // -------------------------------------------------------------------------
    // Source: clipping OptStartSeq
    // Go reference: TestJetStreamSourcingClipStartSeq — jetstream_test.go:18160
    // -------------------------------------------------------------------------

    [Fact]
    // Go: when OptStartSeq for a source exceeds the origin's last sequence, the source
    // coordinator starts from the origin's end without crashing.
    // NOTE(review): this port does not configure a start sequence; it only checks that
    // a pull sync over 10 origin messages lands all 10 in the target.
    public async Task Source_coordinator_clips_start_seq_beyond_origin_end()
    {
        var origin = new MemStore();
        var target = new MemStore();

        for (var i = 0; i < 10; i++)
        {
            await origin.AppendAsync($"test.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"), default);
        }

        var sourceCfg = new StreamSourceConfig { Name = "ORIGIN" };
        await using var source = new SourceCoordinator(target, sourceCfg);
        source.StartPullSyncLoop(origin);

        await WaitForConditionAsync(() => source.LastOriginSequence >= 10, TimeSpan.FromSeconds(5));

        source.LastOriginSequence.ShouldBe(10UL);
        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(10UL);
    }

    // -------------------------------------------------------------------------
    // Input subject transform
    // Go reference: TestJetStreamInputTransform — jetstream_test.go:9803
    // -------------------------------------------------------------------------

    [Fact]
    // Go: TestJetStreamInputTransform — when a stream has SubjectTransform configured,
    // messages published to the original subject are stored under the transformed subject.
    // E.g., source=">" dest="transformed.>" → "foo" stored as "transformed.foo".
    public async Task Input_transform_stores_message_under_transformed_subject()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "T1",
            Subjects = ["foo"],
            SubjectTransformSource = ">",
            SubjectTransformDest = "transformed.>",
        });

        var ack = mgr.Capture("foo", "OK"u8.ToArray());
        ack.ShouldNotBeNull();
        ack!.ErrorCode.ShouldBeNull();

        var msg = mgr.GetMessage("T1", 1);
        msg.ShouldNotBeNull();
        msg!.Subject.ShouldBe("transformed.foo");

        // The state is fetched but deliberately not asserted on; the call only has to succeed.
        _ = await mgr.GetStateAsync("T1", default);
    }

    [Fact]
    // Go: TestJetStreamImplicitRePublishAfterSubjectTransform — jetstream_test.go:22180
    // After input transform the stored subject is the transformed one.
    public async Task Input_transform_followed_by_correct_stored_subject()
    {
        var mgr = new StreamManager();
        mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "T2",
            Subjects = ["events.>"],
            SubjectTransformSource = "events.>",
            SubjectTransformDest = "stored.>",
        });

        mgr.Capture("events.login", "data"u8.ToArray());

        var msg = mgr.GetMessage("T2", 1);
        msg.ShouldNotBeNull();
        msg!.Subject.ShouldBe("stored.login");

        var state = await mgr.GetStateAsync("T2", default);
        state.Messages.ShouldBe(1UL);
    }

    // -------------------------------------------------------------------------
    // Republish: cycle detection
    // Go reference: TestJetStreamStreamRepublishCycle — jetstream_test.go:13230
    // -------------------------------------------------------------------------

    [Fact]
    // Go: stream configuration for republish destination must not form a cycle.
    // Reference: server/stream.go:1060-1080 (checkRePublish)
    public void Republish_cycle_detection_rejects_cyclic_destination()
    {
        var mgr = new StreamManager();

        // Case 1: source=foo.> dest=foo.> — exact cycle
        var result1 = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "RPC1",
            Subjects = ["foo.>", "bar.*", "baz"],
            RePublishSource = "foo.>",
            RePublishDest = "foo.>",
        });
        result1.Error.ShouldNotBeNull();
        result1.Error!.Description.ShouldContain("cycle");

        // Case 2: dest=foo.bar matches foo.> stream subject → cycle
        var result2 = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "RPC2",
            Subjects = ["foo.>", "bar.*", "baz"],
            RePublishSource = "bar.bar",
            RePublishDest = "foo.bar",
        });
        result2.Error.ShouldNotBeNull();

        // Case 3: dest=bar.bar matches bar.* stream subject → cycle
        var result3 = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "RPC3",
            Subjects = ["foo.>", "bar.*", "baz"],
            RePublishSource = "baz",
            RePublishDest = "bar.bar",
        });
        result3.Error.ShouldNotBeNull();
    }

    // -------------------------------------------------------------------------
    // Republish: single-token match
    // Go reference: TestJetStreamStreamRepublishOneTokenMatch — jetstream_test.go:13283
    // -------------------------------------------------------------------------

    [Fact]
    // Go: when RePublishSource="one" and dest="uno", messages captured to "one" are stored.
    public async Task Republish_single_token_match_accepted_and_captures()
    {
        var mgr = new StreamManager();
        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "Stream1",
            Subjects = ["one", "four"],
            RePublishSource = "one",
            RePublishDest = "uno",
            RePublishHeadersOnly = false,
        });
        result.Error.ShouldBeNull();

        for (var i = 0; i < 10; i++)
        {
            mgr.Capture("one", "msg"u8.ToArray());
        }

        var state = await mgr.GetStateAsync("Stream1", default);
        state.Messages.ShouldBe(10UL);
    }

    // -------------------------------------------------------------------------
    // Republish: multi-token match
    // Go reference: TestJetStreamStreamRepublishMultiTokenMatch — jetstream_test.go:13325
    // -------------------------------------------------------------------------

    [Fact]
    // Go: RePublishSource="one.two.>" dest="uno.dos.>" — captures work for "one.two.three".
    public async Task Republish_multi_token_match_accepted_and_captures()
    {
        var mgr = new StreamManager();
        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "Stream1",
            Subjects = ["one.>", "four.>"],
            RePublishSource = "one.two.>",
            RePublishDest = "uno.dos.>",
            RePublishHeadersOnly = false,
        });
        result.Error.ShouldBeNull();

        for (var i = 0; i < 10; i++)
        {
            mgr.Capture("one.two.three", "msg"u8.ToArray());
        }

        var state = await mgr.GetStateAsync("Stream1", default);
        state.Messages.ShouldBe(10UL);
    }

    // -------------------------------------------------------------------------
    // Republish: any-subject match (empty source filter)
    // Go reference: TestJetStreamStreamRepublishAnySubjectMatch — jetstream_test.go:13367
    // -------------------------------------------------------------------------

    [Fact]
    // Go: when RePublishSource is null/empty all subjects are republished.
    public async Task Republish_any_subject_match_accepted_when_source_is_empty()
    {
        var mgr = new StreamManager();
        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "Stream1",
            Subjects = ["one.>", "four.>"],
            RePublishSource = null,
            RePublishDest = "any.>",
            RePublishHeadersOnly = false,
        });
        result.Error.ShouldBeNull();

        mgr.Capture("one.two.three", "msg"u8.ToArray());
        mgr.Capture("four.five.six", "msg"u8.ToArray());

        var state = await mgr.GetStateAsync("Stream1", default);
        state.Messages.ShouldBe(2UL);
    }

    // -------------------------------------------------------------------------
    // Republish: multi-token no match
    // Go reference: TestJetStreamStreamRepublishMultiTokenNoMatch — jetstream_test.go:13408
    // -------------------------------------------------------------------------

    [Fact]
    // Go: publishing to "four.five.six" when filter is "one.two.>" should NOT
    // trigger republish — message is still stored normally in stream.
    public async Task Republish_multi_token_no_match_still_captures_to_stream()
    {
        var mgr = new StreamManager();
        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "Stream1",
            Subjects = ["one.>", "four.>"],
            RePublishSource = "one.two.>",
            RePublishDest = "uno.dos.>",
            RePublishHeadersOnly = true,
        });
        result.Error.ShouldBeNull();

        for (var i = 0; i < 10; i++)
        {
            mgr.Capture("four.five.six", "msg"u8.ToArray());
        }

        var state = await mgr.GetStateAsync("Stream1", default);
        state.Messages.ShouldBe(10UL);
    }

    // -------------------------------------------------------------------------
    // Republish: single-token no match
    // Go reference: TestJetStreamStreamRepublishOneTokenNoMatch — jetstream_test.go:13445
    // -------------------------------------------------------------------------

    [Fact]
    // Go: publishing to "four" when source="one" should not trigger republish.
    // Message is still stored in stream normally.
    public async Task Republish_single_token_no_match_still_captures_to_stream()
    {
        var mgr = new StreamManager();
        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "Stream1",
            Subjects = ["one", "four"],
            RePublishSource = "one",
            RePublishDest = "uno",
            RePublishHeadersOnly = true,
        });
        result.Error.ShouldBeNull();

        for (var i = 0; i < 10; i++)
        {
            mgr.Capture("four", "msg"u8.ToArray());
        }

        var state = await mgr.GetStateAsync("Stream1", default);
        state.Messages.ShouldBe(10UL);
    }

    // -------------------------------------------------------------------------
    // Republish: headers-only config
    // Go reference: TestJetStreamStreamRepublishHeadersOnly — jetstream_test.go:13482
    // -------------------------------------------------------------------------

    [Fact]
    // Go: with RePublishHeadersOnly=true config is accepted; body omission is external.
    public async Task Republish_headers_only_config_accepted_and_captures()
    {
        var mgr = new StreamManager();
        var result = mgr.CreateOrUpdate(new StreamConfig
        {
            Name = "RPC",
            Subjects = ["foo", "bar", "baz"],
            RePublishDest = "RP.>",
            RePublishHeadersOnly = true,
        });
        result.Error.ShouldBeNull();
        result.StreamInfo!.Config.RePublishHeadersOnly.ShouldBeTrue();

        for (var i = 0; i < 10; i++)
        {
            mgr.Capture("foo", "msg"u8.ToArray());
        }

        var state = await mgr.GetStateAsync("RPC", default);
        state.Messages.ShouldBe(10UL);
    }

    // -------------------------------------------------------------------------
    // MirrorCoordinator: lag tracking
    // Go reference: server/stream.go:2739-2743 (mirrorInfo)
    // -------------------------------------------------------------------------

    [Fact]
    // Verify mirror coordinator lag calculation used by health reporting.
    public async Task Mirror_coordinator_tracks_lag_from_origin()
    {
        var target = new MemStore();

        // Disposed like every other coordinator in this file so nothing outlives the test.
        await using var mirror = new MirrorCoordinator(target);

        var report = mirror.GetHealthReport(originLastSeq: 10);
        report.Lag.ShouldBe(10UL);
        report.IsRunning.ShouldBeFalse();

        await mirror.OnOriginAppendAsync(
            new StoredMessage { Sequence = 7, Subject = "a", Payload = "p"u8.ToArray() },
            default);

        report = mirror.GetHealthReport(originLastSeq: 10);
        report.LastOriginSequence.ShouldBe(7UL);
        report.Lag.ShouldBe(3UL);

        await mirror.OnOriginAppendAsync(
            new StoredMessage { Sequence = 10, Subject = "b", Payload = "p"u8.ToArray() },
            default);

        report = mirror.GetHealthReport(originLastSeq: 10);
        report.Lag.ShouldBe(0UL);
    }

    // -------------------------------------------------------------------------
    // SourceCoordinator: filter + transform
    // Go reference: server/stream.go:3860-4007 (processInboundSourceMsg)
    // -------------------------------------------------------------------------

    [Fact]
    // Verify SourceCoordinator filters by subject and applies prefix transform.
    public async Task Source_coordinator_filters_and_transforms_subjects()
    {
        var target = new MemStore();
        var sourceCfg = new StreamSourceConfig
        {
            Name = "SRC",
            FilterSubject = "orders.*",
            SubjectTransformPrefix = "copy.",
        };

        // Disposed like every other coordinator in this file so nothing outlives the test.
        await using var source = new SourceCoordinator(target, sourceCfg);

        await source.OnOriginAppendAsync(
            new StoredMessage { Sequence = 1, Subject = "orders.created", Payload = "1"u8.ToArray() },
            default);
        await source.OnOriginAppendAsync(
            new StoredMessage { Sequence = 2, Subject = "inventory.updated", Payload = "2"u8.ToArray() },
            default);
        await source.OnOriginAppendAsync(
            new StoredMessage { Sequence = 3, Subject = "orders.deleted", Payload = "3"u8.ToArray() },
            default);

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(2UL);

        var msg1 = await target.LoadAsync(1, default);
        msg1!.Subject.ShouldBe("copy.orders.created");

        var msg2 = await target.LoadAsync(2, default);
        msg2!.Subject.ShouldBe("copy.orders.deleted");

        source.FilteredOutCount.ShouldBe(1L);
    }

    // -------------------------------------------------------------------------
    // SourceCoordinator: pull sync loop with filter
    // Go reference: server/stream.go:3474-3720 (setupSourceConsumer)
    // -------------------------------------------------------------------------

    [Fact]
    public async Task Source_coordinator_pull_sync_loop_syncs_filtered_messages()
    {
        var origin = new MemStore();
        var target = new MemStore();

        await origin.AppendAsync("orders.1", "p1"u8.ToArray(), default);
        await origin.AppendAsync("inventory.1", "p2"u8.ToArray(), default);
        await origin.AppendAsync("orders.2", "p3"u8.ToArray(), default);
        await origin.AppendAsync("inventory.2", "p4"u8.ToArray(), default);
        await origin.AppendAsync("orders.3", "p5"u8.ToArray(), default);

        var sourceCfg = new StreamSourceConfig { Name = "ORIGIN", FilterSubject = "orders.*" };
        await using var source = new SourceCoordinator(target, sourceCfg);
        source.StartPullSyncLoop(origin);

        await WaitForConditionAsync(() => source.LastOriginSequence >= 5, TimeSpan.FromSeconds(5));

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(3UL);
    }

    // -------------------------------------------------------------------------
    // MirrorCoordinator: ignores redelivered messages
    // Go reference: server/stream.go:2924 (dc > 1 check)
    // -------------------------------------------------------------------------

    [Fact]
    public async Task Mirror_coordinator_ignores_redelivered_messages_in_channel_loop()
    {
        var target = new MemStore();
        await using var mirror = new MirrorCoordinator(target);
        mirror.StartSyncLoop();

        mirror.TryEnqueue(new StoredMessage { Sequence = 1, Subject = "a", Payload = "1"u8.ToArray() });
        mirror.TryEnqueue(new StoredMessage
        {
            Sequence = 1,
            Subject = "a",
            Payload = "1"u8.ToArray(),
            Redelivered = true,
        });
        mirror.TryEnqueue(new StoredMessage { Sequence = 2, Subject = "b", Payload = "2"u8.ToArray() });

        await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(2UL);
    }

    // -------------------------------------------------------------------------
    // Skipped tests (require real multi-server / external infrastructure)
    // -------------------------------------------------------------------------

    [Fact(Skip = "Requires real server restart to test consumer failover — TestJetStreamMirroredConsumerFailAfterRestart:10835")]
    public Task Mirror_consumer_fails_after_restart_and_recovers() => Task.CompletedTask;

    [Fact(Skip = "Requires real external source/leaf node — TestJetStreamRemoveExternalSource:12150")]
    public Task Remove_external_source_stops_forwarding() => Task.CompletedTask;

    [Fact(Skip = "Requires real server restart — TestJetStreamWorkQueueSourceRestart:13010")]
    public Task Work_queue_source_recovers_after_restart() => Task.CompletedTask;

    [Fact(Skip = "Requires real server restart — TestJetStreamWorkQueueSourceNamingRestart:13111")]
    public Task Work_queue_source_naming_recovers_after_restart() => Task.CompletedTask;

    [Fact(Skip = "Requires real external source stream — TestJetStreamStreamUpdateWithExternalSource:15607")]
    public Task Stream_update_with_external_source_works() => Task.CompletedTask;

    [Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceAggregates:20759")]
    public Task Allow_msg_counter_source_aggregates() => Task.CompletedTask;

    [Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceVerbatim:20844")]
    public Task Allow_msg_counter_source_verbatim() => Task.CompletedTask;

    [Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceStartingAboveZero:20944")]
    public Task Allow_msg_counter_source_starting_above_zero() => Task.CompletedTask;

    // -------------------------------------------------------------------------
    // Helpers
    // -------------------------------------------------------------------------

    /// <summary>
    /// Polls <paramref name="condition"/> every 25 ms until it returns <c>true</c> or
    /// <paramref name="timeout"/> elapses.
    /// </summary>
    /// <param name="condition">Predicate evaluated before each delay; polling stops once it is true.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <exception cref="TimeoutException">
    /// The condition was still false when the timeout expired.
    /// </exception>
    private static async Task WaitForConditionAsync(Func<bool> condition, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);

        try
        {
            while (!condition())
            {
                await Task.Delay(25, cts.Token);
            }
        }
        catch (OperationCanceledException ex) when (cts.IsCancellationRequested)
        {
            // Surface a descriptive failure instead of a bare TaskCanceledException.
            throw new TimeoutException($"Condition was not met within {timeout}.", ex);
        }
    }
}