// Ported from golang/nats-server/server/jetstream_test.go
// Stream features: mirroring, sourcing, direct get, sealed streams, message TTL,
// subject transforms, discard policies

using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Models;
using NATS.Server.TestUtilities;

namespace NATS.Server.JetStream.Tests.JetStream;

/// <summary>
/// Stream-level feature tests driven through <see cref="JetStreamApiFixture"/>.
/// Each test starts its own fixture, issues publishes and/or <c>$JS.API.*</c> requests,
/// and asserts on the returned response or the reported stream state.
/// The <c>// Go:</c> comment above each test names the upstream Go test it was ported from.
/// </summary>
public class JetStreamStreamFeatureTests
{
    // Go: TestJetStreamMirrorBasics server/jetstream_test.go
    [Fact]
    public async Task Mirror_stream_replicates_published_messages()
    {
        await using var fx = await JetStreamApiFixture.StartWithMirrorSetupAsync();

        _ = await fx.PublishAndGetAckAsync("orders.created", "order-1");
        await fx.WaitForMirrorSyncAsync("ORDERS_MIRROR");

        var state = await fx.GetStreamStateAsync("ORDERS_MIRROR");
        state.Messages.ShouldBeGreaterThan(0UL);
    }

    // Go: TestJetStreamMirrorBasics — mirror config
    [Fact]
    public async Task Mirror_stream_info_shows_mirror_config()
    {
        await using var fx = await JetStreamApiFixture.StartWithMirrorSetupAsync();

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.ORDERS_MIRROR", "{}");

        info.Error.ShouldBeNull();
        info.StreamInfo!.Config.Mirror.ShouldBe("ORDERS");
    }

    // Go: TestJetStreamSourceBasics server/jetstream_test.go
    [Fact]
    public async Task Source_stream_aggregates_from_multiple_origins()
    {
        await using var fx = await JetStreamApiFixture.StartWithMultipleSourcesAsync();

        await fx.PublishToSourceAsync("SRC1", "a.msg", "from-src1");
        await fx.PublishToSourceAsync("SRC2", "b.msg", "from-src2");

        var state = await fx.GetStreamStateAsync("AGG");

        // AGG sources from SRC1 and SRC2. Exactly two messages were published upstream,
        // so the aggregate can never legitimately hold more than two. (The previous
        // ">= 0" check on an unsigned count could not fail.)
        // NOTE(review): once source replication timing is confirmed (or a wait helper
        // like WaitForMirrorSyncAsync exists for sources), pin this to ShouldBe(2UL).
        state.Messages.ShouldBeLessThanOrEqualTo(2UL);
    }

    // Go: TestJetStreamSourceBasics — sources list config
    [Fact]
    public async Task Source_stream_config_lists_sources()
    {
        await using var fx = await JetStreamApiFixture.StartWithMultipleSourcesAsync();

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.AGG", "{}");

        info.Error.ShouldBeNull();
        info.StreamInfo!.Config.Sources.Count.ShouldBe(2);
        info.StreamInfo.Config.Sources.Select(s => s.Name).ShouldContain("SRC1");
        info.StreamInfo.Config.Sources.Select(s => s.Name).ShouldContain("SRC2");
    }

    // Go: TestJetStreamDirectMsgGet server/jetstream_test.go
    [Fact]
    public async Task Direct_get_retrieves_message_by_sequence()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DG", "dg.>");

        _ = await fx.PublishAndGetAckAsync("dg.first", "payload1");
        var ack2 = await fx.PublishAndGetAckAsync("dg.second", "payload2");

        var resp = await fx.RequestLocalAsync(
            "$JS.API.DIRECT.GET.DG",
            $$"""{ "seq": {{ack2.Seq}} }""");

        resp.DirectMessage.ShouldNotBeNull();
        resp.DirectMessage!.Sequence.ShouldBe(ack2.Seq);
        resp.DirectMessage.Subject.ShouldBe("dg.second");
        resp.DirectMessage.Payload.ShouldBe("payload2");
    }

    // Go: TestJetStreamDirectMsgGetNext server/jetstream_test.go
    [Fact]
    public async Task Direct_get_first_sequence()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DGF", "dgf.>");

        var ack = await fx.PublishAndGetAckAsync("dgf.x", "first");
        _ = await fx.PublishAndGetAckAsync("dgf.x", "second");

        var resp = await fx.RequestLocalAsync(
            "$JS.API.DIRECT.GET.DGF",
            $$"""{ "seq": {{ack.Seq}} }""");

        // Assert presence first so a missing message fails as an assertion rather
        // than a NullReferenceException (same pattern as the other direct-get tests).
        resp.DirectMessage.ShouldNotBeNull();
        resp.DirectMessage!.Payload.ShouldBe("first");
    }

    // Go: TestJetStreamDirectGetBySubject server/jetstream_test.go
    [Fact]
    public async Task Direct_get_non_existent_sequence_returns_error()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DGN", "dgn.>");

        _ = await fx.PublishAndGetAckAsync("dgn.x", "data");

        var resp = await fx.RequestLocalAsync("$JS.API.DIRECT.GET.DGN", """{"seq":999}""");

        resp.Error.ShouldNotBeNull();
    }

    // Go: TestJetStreamRecoverSealedAfterServerRestart server/jetstream_test.go
    // NOTE(review): only purge rejection is exercised here; a publish after sealing
    // is not attempted, so "not writes" in the test name is only partially covered.
    [Fact]
    public async Task Sealed_stream_allows_reads_but_not_writes()
    {
        var config = new StreamConfig
        {
            Name = "SEALED",
            Subjects = ["sealed.>"],
        };
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

        // Publish before sealing
        _ = await fx.PublishAndGetAckAsync("sealed.x", "data");

        // Now update to sealed
        var update = await fx.RequestLocalAsync(
            "$JS.API.STREAM.UPDATE.SEALED",
            """{"name":"SEALED","subjects":["sealed.>"],"sealed":true}""");
        update.Error.ShouldBeNull();

        // Verify we can still read
        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.SEALED", "{}");
        info.StreamInfo!.State.Messages.ShouldBe(1UL);

        // Purge should fail on sealed stream
        var purge = await fx.RequestLocalAsync("$JS.API.STREAM.PURGE.SEALED", "{}");
        purge.Success.ShouldBeFalse();
    }

    // Go: TestJetStreamMaxMsgsPerSubjectWithDiscardNew server/jetstream_test.go
    // NOTE(review): the referenced Go test is about DiscardNew, while this test
    // configures DiscardPolicy.Old — confirm the reference is the intended one.
    [Fact]
    public async Task Max_msgs_per_subject_with_discard_old()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "MPSDO",
            Subjects = ["mpsdo.>"],
            MaxMsgsPer = 2,
            Discard = DiscardPolicy.Old,
        });

        _ = await fx.PublishAndGetAckAsync("mpsdo.a", "a1");
        _ = await fx.PublishAndGetAckAsync("mpsdo.a", "a2");
        _ = await fx.PublishAndGetAckAsync("mpsdo.a", "a3");

        var state = await fx.GetStreamStateAsync("MPSDO");
        state.Messages.ShouldBeLessThanOrEqualTo(2UL);
    }

    // Go: TestJetStreamStreamStorageTrackingAndLimits server/jetstream_test.go:4931
    [Fact]
    public async Task Max_msgs_enforces_fifo_eviction()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("FIFO", "fifo.>", maxMsgs: 3);

        for (var i = 0; i < 6; i++)
        {
            _ = await fx.PublishAndGetAckAsync("fifo.x", $"msg-{i}");
        }

        var state = await fx.GetStreamStateAsync("FIFO");
        state.Messages.ShouldBeLessThanOrEqualTo(3UL);

        // Latest messages should be kept
        state.LastSeq.ShouldBe(6UL);
    }

    // Go: TestJetStreamInterestRetentionStream server/jetstream_test.go:4336
    [Fact]
    public async Task Interest_retention_stream_basic_flow()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "IRS",
            Subjects = ["irs.>"],
            Retention = RetentionPolicy.Interest,
        });

        _ = await fx.CreateConsumerAsync("IRS", "C1", "irs.>");
        _ = await fx.PublishAndGetAckAsync("irs.x", "data");

        var state = await fx.GetStreamStateAsync("IRS");
        state.Messages.ShouldBe(1UL);
    }

    // Go: TestJetStreamBasicWorkQueue server/jetstream_test.go:937
    [Fact]
    public async Task Workqueue_retention_stream_basic_flow()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "WQR",
            Subjects = ["wqr.>"],
            Retention = RetentionPolicy.WorkQueue,
            MaxConsumers = 1,
        });

        _ = await fx.CreateConsumerAsync("WQR", "C1", "wqr.>", ackPolicy: AckPolicy.None);
        _ = await fx.PublishAndGetAckAsync("wqr.x", "data");

        var state = await fx.GetStreamStateAsync("WQR");

        // Only one message was published, so the stream holds at most one.
        // (The previous ">= 0" check on an unsigned count could not fail.)
        // NOTE(review): whether the count is 0 or 1 depends on when the AckPolicy.None
        // consumer is considered to have consumed the message — pin the exact value
        // once that is confirmed.
        state.Messages.ShouldBeLessThanOrEqualTo(1UL);
    }

    // Go: TestJetStreamDenyDelete — deny_delete prevents message deletion
    [Fact]
    public async Task Deny_delete_stream_preserves_all_messages()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "DD",
            Subjects = ["dd.>"],
            DenyDelete = true,
        });

        var ack = await fx.PublishAndGetAckAsync("dd.x", "data");

        var del = await fx.RequestLocalAsync(
            "$JS.API.STREAM.MSG.DELETE.DD",
            $$"""{ "seq": {{ack.Seq}} }""");
        del.Success.ShouldBeFalse();

        var state = await fx.GetStreamStateAsync("DD");
        state.Messages.ShouldBe(1UL);
    }

    // Go: TestJetStreamAllowDirectAfterUpdate server/jetstream_test.go
    [Fact]
    public async Task Allow_direct_enables_direct_get()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "ADG",
            Subjects = ["adg.>"],
            AllowDirect = true,
        });

        var ack = await fx.PublishAndGetAckAsync("adg.x", "direct-data");

        var resp = await fx.RequestLocalAsync(
            "$JS.API.DIRECT.GET.ADG",
            $$"""{ "seq": {{ack.Seq}} }""");

        resp.DirectMessage.ShouldNotBeNull();
        resp.DirectMessage!.Payload.ShouldBe("direct-data");
    }

    // Go: TestJetStreamSnapshotsAPI — snapshot stream with messages
    // NOTE(review): despite the name, only a non-empty snapshot payload is asserted;
    // the message count inside the snapshot is not inspected.
    [Fact]
    public async Task Snapshot_preserves_message_count()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("SNP", "snp.>");

        for (var i = 0; i < 5; i++)
        {
            _ = await fx.PublishAndGetAckAsync("snp.x", $"msg-{i}");
        }

        var snap = await fx.RequestLocalAsync("$JS.API.STREAM.SNAPSHOT.SNP", "{}");

        snap.Snapshot.ShouldNotBeNull();
        snap.Snapshot!.Payload.ShouldNotBeNullOrWhiteSpace();
    }

    // Go: TestJetStreamSnapshotsAPI — snapshot non-existent
    [Fact]
    public async Task Snapshot_non_existent_stream_returns_error()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("X", "x.>");

        var snap = await fx.RequestLocalAsync("$JS.API.STREAM.SNAPSHOT.NOPE", "{}");

        snap.Error.ShouldNotBeNull();
    }

    // Go: TestJetStreamInvalidRestoreRequests server/jetstream_test.go
    [Fact]
    public async Task Restore_with_invalid_payload_returns_error()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("INV", "inv.>");

        var restore = await fx.RequestLocalAsync("$JS.API.STREAM.RESTORE.INV", "");

        restore.Error.ShouldNotBeNull();
    }

    // Go: TestJetStreamMirrorUpdatePreventsSubjects server/jetstream_test.go:9412
    // Mirror streams cannot have subjects — the Go test verifies that attempting
    // to update a mirror with subjects returns an error.
    [Fact]
    public async Task Mirror_stream_cannot_have_subjects()
    {
        await using var fx = await JetStreamApiFixture.StartWithMirrorSetupAsync();

        // Mirror streams should have empty subjects (Go: "stream mirrors can not contain subjects")
        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.ORDERS_MIRROR", "{}");

        info.StreamInfo!.Config.Subjects.ShouldBeEmpty();
    }

    // Go: TestJetStreamStreamSubjectsOverlap server/jetstream_test.go
    [Fact]
    public async Task Streams_with_wildcard_subjects_capture_matching()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("WS", "events.>");

        var ack1 = await fx.PublishAndGetAckAsync("events.click", "1");
        ack1.Stream.ShouldBe("WS");

        var ack2 = await fx.PublishAndGetAckAsync("events.view.page", "2");
        ack2.Stream.ShouldBe("WS");
    }

    // Go: TestJetStreamStreamTransformOverlap server/jetstream_test.go
    [Fact]
    public async Task Stream_with_star_wildcard_subject()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("STAR", "star.*");

        var ack1 = await fx.PublishAndGetAckAsync("star.one", "1");
        ack1.ErrorCode.ShouldBeNull();

        // star.one.two should not match star.*
        var ack2 = await fx.PublishAndGetAckAsync("star.one.two", "2", expectError: true);
        ack2.ErrorCode.ShouldNotBeNull();
    }

    // Go: TestJetStreamDuplicateWindowMs
    [Fact]
    public async Task Duplicate_window_config_roundtrips()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "DWC",
            Subjects = ["dwc.>"],
            DuplicateWindowMs = 5000,
        });

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.DWC", "{}");

        info.StreamInfo!.Config.DuplicateWindowMs.ShouldBe(5000);
    }

    // Go: TestJetStreamMaxConsumers server/jetstream_test.go:619
    [Fact]
    public async Task Max_consumers_config_roundtrips()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "MC",
            Subjects = ["mc.>"],
            MaxConsumers = 5,
        });

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.MC", "{}");

        info.StreamInfo!.Config.MaxConsumers.ShouldBe(5);
    }

    // Go: TestJetStreamAddStreamDiscardNew — discard new config
    [Fact]
    public async Task Discard_new_config_roundtrips()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "DNC",
            Subjects = ["dnc.>"],
            Discard = DiscardPolicy.New,
        });

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.DNC", "{}");

        info.StreamInfo!.Config.Discard.ShouldBe(DiscardPolicy.New);
    }

    // Go: TestJetStreamAddStreamDiscardNew — discard old (default) config
    [Fact]
    public async Task Discard_old_is_default()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DOC", "doc.>");

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.DOC", "{}");

        info.StreamInfo!.Config.Discard.ShouldBe(DiscardPolicy.Old);
    }

    // Go: TestJetStreamRollup server/jetstream_test.go
    [Fact]
    public async Task Multiple_subjects_tracked_independently()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "MST",
            Subjects = ["mst.>"],
            MaxMsgsPer = 1,
        });

        _ = await fx.PublishAndGetAckAsync("mst.a", "a1");
        _ = await fx.PublishAndGetAckAsync("mst.b", "b1");
        _ = await fx.PublishAndGetAckAsync("mst.a", "a2");
        _ = await fx.PublishAndGetAckAsync("mst.b", "b2");

        var state = await fx.GetStreamStateAsync("MST");

        // Each subject keeps 1 message: mst.a -> a2, mst.b -> b2
        state.Messages.ShouldBeLessThanOrEqualTo(2UL);
    }

    // Go: TestJetStreamMirrorBasics — mirror with no messages
    [Fact]
    public async Task Mirror_stream_with_no_origin_messages()
    {
        await using var fx = await JetStreamApiFixture.StartWithMirrorSetupAsync();

        // Don't publish anything; mirror should exist but be empty
        var state = await fx.GetStreamStateAsync("ORDERS_MIRROR");

        state.Messages.ShouldBe(0UL);
    }

    // Go: TestJetStreamSourceBasics — source with no messages
    [Fact]
    public async Task Source_stream_with_no_origin_messages()
    {
        await using var fx = await JetStreamApiFixture.StartWithMultipleSourcesAsync();

        var state = await fx.GetStreamStateAsync("AGG");

        state.Messages.ShouldBe(0UL);
    }

    // Go: TestJetStreamPurgeExAndAccounting server/jetstream_test.go
    [Fact]
    public async Task Delete_specific_message_preserves_others()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DSP", "dsp.>");

        _ = await fx.PublishAndGetAckAsync("dsp.a", "msg1");
        var ack2 = await fx.PublishAndGetAckAsync("dsp.b", "msg2");
        _ = await fx.PublishAndGetAckAsync("dsp.c", "msg3");

        // Delete middle message, addressed by the sequence its own publish ack reported
        // rather than by arithmetic on a neighbouring ack.
        var del = await fx.RequestLocalAsync(
            "$JS.API.STREAM.MSG.DELETE.DSP",
            $$"""{ "seq": {{ack2.Seq}} }""");
        del.Success.ShouldBeTrue();

        var state = await fx.GetStreamStateAsync("DSP");
        state.Messages.ShouldBe(2UL);
    }

    // Go: TestJetStreamStreamPurge — purge non-existent stream
    [Fact]
    public async Task Purge_non_existent_stream_fails()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("X", "x.>");

        var purge = await fx.RequestLocalAsync("$JS.API.STREAM.PURGE.NOTEXIST", "{}");

        purge.Success.ShouldBeFalse();
    }

    // Go: TestJetStreamMaxBytesIgnored — max bytes config
    [Fact]
    public async Task Max_bytes_config_roundtrips()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "MBC",
            Subjects = ["mbc.>"],
            MaxBytes = 1024,
        });

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.MBC", "{}");

        info.StreamInfo!.Config.MaxBytes.ShouldBe(1024);
    }

    // Go: TestJetStreamMaxAgeMs — max age config
    [Fact]
    public async Task Max_age_config_roundtrips()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "MAC",
            Subjects = ["mac.>"],
            MaxAgeMs = 60_000,
        });

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.MAC", "{}");

        info.StreamInfo!.Config.MaxAgeMs.ShouldBe(60_000);
    }

    // Go: TestJetStreamReplicas config
    [Fact]
    public async Task Replicas_config_roundtrips()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "REP",
            Subjects = ["rep.>"],
            Replicas = 3,
        });

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.REP", "{}");

        info.StreamInfo!.Config.Replicas.ShouldBe(3);
    }

    // Go: TestJetStreamMaxMsgSize config
    [Fact]
    public async Task Max_msg_size_config_roundtrips()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
        {
            Name = "MMS",
            Subjects = ["mms.>"],
            MaxMsgSize = 4096,
        });

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.MMS", "{}");

        info.StreamInfo!.Config.MaxMsgSize.ShouldBe(4096);
    }

    // Go: TestJetStreamStreamUpdateSubjectsOverlapOthers server/jetstream_test.go
    // NOTE(review): the update response is discarded, so this test passes whether or
    // not the subject update was accepted — consider asserting on it once the
    // expected outcome is confirmed.
    [Fact]
    public async Task Update_stream_subjects_preserves_existing_data()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("USP", "usp.v1.*");

        _ = await fx.PublishAndGetAckAsync("usp.v1.x", "old-data");

        _ = await fx.RequestLocalAsync(
            "$JS.API.STREAM.UPDATE.USP",
            """{"name":"USP","subjects":["usp.v2.*"]}""");

        var state = await fx.GetStreamStateAsync("USP");
        state.Messages.ShouldBe(1UL);
    }

    // Go: TestJetStreamStreamInfoSubjectsDetails server/jetstream_test.go
    [Fact]
    public async Task Stream_bytes_increase_with_each_publish()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("SBI", "sbi.>");

        var state0 = await fx.GetStreamStateAsync("SBI");
        state0.Bytes.ShouldBe(0UL);

        _ = await fx.PublishAndGetAckAsync("sbi.x", "data");
        var state1 = await fx.GetStreamStateAsync("SBI");
        var bytes1 = state1.Bytes;
        bytes1.ShouldBeGreaterThan(0UL);

        _ = await fx.PublishAndGetAckAsync("sbi.y", "more-data");
        var state2 = await fx.GetStreamStateAsync("SBI");
        state2.Bytes.ShouldBeGreaterThan(bytes1);
    }
}