Files
natsdotnet/tests/NATS.Server.Tests/JetStream/JetStreamStreamFeatureTests.cs
Joseph Doherty 5c608f07e3 Move shared fixtures and parity utilities to TestUtilities project
- git mv JetStreamApiFixture, JetStreamClusterFixture, LeafFixture,
  Parity utilities, and TestData from NATS.Server.Tests to
  NATS.Server.TestUtilities
- Update namespaces to NATS.Server.TestUtilities (and .Parity sub-ns)
- Make fixture classes public for cross-project access
- Add PollHelper to replace Task.Delay polling with SemaphoreSlim waits
- Refactor all fixture polling loops to use PollHelper
- Add 'using NATS.Server.TestUtilities;' to ~75 consuming test files
- Rename local fixture duplicates (MetaGroupTestFixture,
  LeafProtocolTestFixture) to avoid shadowing shared fixtures
- Remove TestData entry from NATS.Server.Tests.csproj (moved to
  TestUtilities)
2026-03-12 14:45:21 -04:00

544 lines
19 KiB
C#

// Ported from golang/nats-server/server/jetstream_test.go
// Stream features: mirroring, sourcing, direct get, sealed streams, message TTL,
// subject transforms, discard policies
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Models;
using NATS.Server.TestUtilities;
namespace NATS.Server.Tests.JetStream;
public class JetStreamStreamFeatureTests
{
// Go: TestJetStreamMirrorBasics server/jetstream_test.go
// Publishes one message on "orders.created", awaits the fixture's mirror-sync helper
// for ORDERS_MIRROR, then expects the mirror to report a non-zero message count.
[Fact]
public async Task Mirror_stream_replicates_published_messages()
{
    await using var fixture = await JetStreamApiFixture.StartWithMirrorSetupAsync();

    _ = await fixture.PublishAndGetAckAsync("orders.created", "order-1");
    await fixture.WaitForMirrorSyncAsync("ORDERS_MIRROR");

    var mirrorState = await fixture.GetStreamStateAsync("ORDERS_MIRROR");
    mirrorState.Messages.ShouldBeGreaterThan(0UL);
}
// Go: TestJetStreamMirrorBasics — mirror config
// STREAM.INFO for ORDERS_MIRROR must come back without an error and name "ORDERS"
// as the mirrored stream.
[Fact]
public async Task Mirror_stream_info_shows_mirror_config()
{
    await using var fixture = await JetStreamApiFixture.StartWithMirrorSetupAsync();

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.ORDERS_MIRROR", "{}");

    response.Error.ShouldBeNull();
    response.StreamInfo!.Config.Mirror.ShouldBe("ORDERS");
}
// Go: TestJetStreamSourceBasics server/jetstream_test.go
// One message is published to each origin (SRC1 and SRC2); the aggregate stream AGG
// is expected to end up holding both of them.
[Fact]
public async Task Source_stream_aggregates_from_multiple_origins()
{
    await using var fx = await JetStreamApiFixture.StartWithMultipleSourcesAsync();

    await fx.PublishToSourceAsync("SRC1", "a.msg", "from-src1");
    await fx.PublishToSourceAsync("SRC2", "b.msg", "from-src2");

    var state = await fx.GetStreamStateAsync("AGG");

    // The earlier check (>= 0 on an unsigned counter) could never fail, so the test
    // proved nothing about aggregation.
    // NOTE(review): assumes PublishToSourceAsync has propagated to AGG by the time it
    // completes; if sourcing is asynchronous in the fixture, wait before asserting.
    state.Messages.ShouldBe(2UL);
}
// Go: TestJetStreamSourceBasics — sources list config
// STREAM.INFO for AGG must list exactly two sources, named SRC1 and SRC2.
[Fact]
public async Task Source_stream_config_lists_sources()
{
    await using var fixture = await JetStreamApiFixture.StartWithMultipleSourcesAsync();

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.AGG", "{}");
    response.Error.ShouldBeNull();

    var sources = response.StreamInfo!.Config.Sources;
    sources.Count.ShouldBe(2);
    sources.Select(source => source.Name).ShouldContain("SRC1");
    sources.Select(source => source.Name).ShouldContain("SRC2");
}
// Go: TestJetStreamDirectMsgGet server/jetstream_test.go
// Publishes two messages and fetches the second one by its acked sequence through
// DIRECT.GET, checking sequence, subject and payload of the returned message.
[Fact]
public async Task Direct_get_retrieves_message_by_sequence()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("DG", "dg.>");
    _ = await fixture.PublishAndGetAckAsync("dg.first", "payload1");
    var secondAck = await fixture.PublishAndGetAckAsync("dg.second", "payload2");

    var request = $$"""{ "seq": {{secondAck.Seq}} }""";
    var response = await fixture.RequestLocalAsync("$JS.API.DIRECT.GET.DG", request);

    response.DirectMessage.ShouldNotBeNull();
    var message = response.DirectMessage!;
    message.Sequence.ShouldBe(secondAck.Seq);
    message.Subject.ShouldBe("dg.second");
    message.Payload.ShouldBe("payload2");
}
// Go: TestJetStreamDirectMsgGetNext server/jetstream_test.go
// Publishes two messages on the same subject and fetches the first one by its acked
// sequence through DIRECT.GET.
[Fact]
public async Task Direct_get_first_sequence()
{
    await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DGF", "dgf.>");
    var ack = await fx.PublishAndGetAckAsync("dgf.x", "first");
    _ = await fx.PublishAndGetAckAsync("dgf.x", "second");

    var resp = await fx.RequestLocalAsync(
        "$JS.API.DIRECT.GET.DGF",
        $$"""{ "seq": {{ack.Seq}} }""");

    // Assert presence explicitly so a missing message surfaces as an assertion failure
    // rather than a NullReferenceException, matching the other direct-get tests.
    resp.DirectMessage.ShouldNotBeNull();
    resp.DirectMessage!.Sequence.ShouldBe(ack.Seq);
    resp.DirectMessage.Payload.ShouldBe("first");
}
// Go: TestJetStreamDirectGetBySubject server/jetstream_test.go
// Requests sequence 999 from a stream that received a single publish and expects the
// response to carry an error.
[Fact]
public async Task Direct_get_non_existent_sequence_returns_error()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("DGN", "dgn.>");
    _ = await fixture.PublishAndGetAckAsync("dgn.x", "data");

    var response = await fixture.RequestLocalAsync("$JS.API.DIRECT.GET.DGN", """{"seq":999}""");

    response.Error.ShouldNotBeNull();
}
// Go: TestJetStreamRecoverSealedAfterServerRestart server/jetstream_test.go
// Publishes one message, flips the stream to sealed via STREAM.UPDATE, then checks
// that STREAM.INFO still reports the message and that STREAM.PURGE does not succeed.
[Fact]
public async Task Sealed_stream_allows_reads_but_not_writes()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
    {
        Name = "SEALED",
        Subjects = ["sealed.>"],
    });

    // Data goes in while the stream is still writable.
    _ = await fixture.PublishAndGetAckAsync("sealed.x", "data");

    // Seal the stream through an update request.
    var sealResponse = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.UPDATE.SEALED",
        """{"name":"SEALED","subjects":["sealed.>"],"sealed":true}""");
    sealResponse.Error.ShouldBeNull();

    // Reading stream info must still work and show the stored message.
    var infoResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.SEALED", "{}");
    infoResponse.StreamInfo!.State.Messages.ShouldBe(1UL);

    // Purge is the mutating operation exercised here; it must be refused.
    var purgeResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.PURGE.SEALED", "{}");
    purgeResponse.Success.ShouldBeFalse();
}
// Go: TestJetStreamMaxMsgsPerSubjectWithDiscardNew server/jetstream_test.go
// This port configures DiscardPolicy.Old with MaxMsgsPer = 2 and publishes three
// messages to one subject; exactly two must remain.
[Fact]
public async Task Max_msgs_per_subject_with_discard_old()
{
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
    {
        Name = "MPSDO",
        Subjects = ["mpsdo.>"],
        MaxMsgsPer = 2,
        Discard = DiscardPolicy.Old,
    });

    _ = await fx.PublishAndGetAckAsync("mpsdo.a", "a1");
    _ = await fx.PublishAndGetAckAsync("mpsdo.a", "a2");
    _ = await fx.PublishAndGetAckAsync("mpsdo.a", "a3");

    var state = await fx.GetStreamStateAsync("MPSDO");

    // An upper bound alone would also pass for an empty stream (over-eviction or lost
    // publishes), so pin the exact count the per-subject limit should leave behind.
    state.Messages.ShouldBe(2UL);
}
// Go: TestJetStreamStreamStorageTrackingAndLimits server/jetstream_test.go:4931
// Publishes six messages into a stream capped at three and expects exactly three to
// remain, with the last sequence still at 6.
[Fact]
public async Task Max_msgs_enforces_fifo_eviction()
{
    await using var fx = await JetStreamApiFixture.StartWithStreamAsync("FIFO", "fifo.>", maxMsgs: 3);

    for (var i = 0; i < 6; i++)
    {
        _ = await fx.PublishAndGetAckAsync("fifo.x", $"msg-{i}");
    }

    var state = await fx.GetStreamStateAsync("FIFO");

    // An upper bound alone would also accept an empty stream; the cap should leave
    // exactly maxMsgs messages once more than that have been published.
    state.Messages.ShouldBe(3UL);
    // Latest messages should be kept
    state.LastSeq.ShouldBe(6UL);
}
// Go: TestJetStreamInterestRetentionStream server/jetstream_test.go:4336
// Creates consumer C1 on an interest-retention stream, publishes one message and
// expects the stream state to report one stored message.
[Fact]
public async Task Interest_retention_stream_basic_flow()
{
    var streamConfig = new StreamConfig
    {
        Name = "IRS",
        Subjects = ["irs.>"],
        Retention = RetentionPolicy.Interest,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    _ = await fixture.CreateConsumerAsync("IRS", "C1", "irs.>");
    _ = await fixture.PublishAndGetAckAsync("irs.x", "data");

    var streamState = await fixture.GetStreamStateAsync("IRS");
    streamState.Messages.ShouldBe(1UL);
}
// Go: TestJetStreamBasicWorkQueue server/jetstream_test.go:937
// Creates a work-queue stream with a single AckPolicy.None consumer, publishes one
// message and checks that the publish was accepted by the stream.
[Fact]
public async Task Workqueue_retention_stream_basic_flow()
{
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
    {
        Name = "WQR",
        Subjects = ["wqr.>"],
        Retention = RetentionPolicy.WorkQueue,
        MaxConsumers = 1,
    });
    _ = await fx.CreateConsumerAsync("WQR", "C1", "wqr.>",
        ackPolicy: AckPolicy.None);

    var ack = await fx.PublishAndGetAckAsync("wqr.x", "data");

    // The publish itself must land in the work-queue stream without an error.
    ack.ErrorCode.ShouldBeNull();
    ack.Stream.ShouldBe("WQR");

    var state = await fx.GetStreamStateAsync("WQR");

    // The previous ">= 0" check on an unsigned counter could never fail. A single
    // publish can leave at most one stored message.
    // NOTE(review): whether the message is retained (1) or already removed (0) with an
    // AckPolicy.None consumer is not pinned here — tighten once confirmed.
    state.Messages.ShouldBeLessThanOrEqualTo(1UL);
}
// Go: TestJetStreamDenyDelete — deny_delete prevents message deletion
// With DenyDelete set, a MSG.DELETE request for the published sequence must not
// succeed and the stream must still report one message.
[Fact]
public async Task Deny_delete_stream_preserves_all_messages()
{
    var streamConfig = new StreamConfig
    {
        Name = "DD",
        Subjects = ["dd.>"],
        DenyDelete = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);
    var publishAck = await fixture.PublishAndGetAckAsync("dd.x", "data");

    var deleteRequest = $$"""{ "seq": {{publishAck.Seq}} }""";
    var deleteResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.MSG.DELETE.DD", deleteRequest);
    deleteResponse.Success.ShouldBeFalse();

    var streamState = await fixture.GetStreamStateAsync("DD");
    streamState.Messages.ShouldBe(1UL);
}
// Go: TestJetStreamAllowDirectAfterUpdate server/jetstream_test.go
// On a stream created with AllowDirect = true, DIRECT.GET by sequence must return the
// published payload.
[Fact]
public async Task Allow_direct_enables_direct_get()
{
    var streamConfig = new StreamConfig
    {
        Name = "ADG",
        Subjects = ["adg.>"],
        AllowDirect = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);
    var publishAck = await fixture.PublishAndGetAckAsync("adg.x", "direct-data");

    var request = $$"""{ "seq": {{publishAck.Seq}} }""";
    var response = await fixture.RequestLocalAsync("$JS.API.DIRECT.GET.ADG", request);

    response.DirectMessage.ShouldNotBeNull();
    response.DirectMessage!.Payload.ShouldBe("direct-data");
}
// Go: TestJetStreamSnapshotsAPI — snapshot stream with messages
// Publishes five messages and requests a snapshot. Only the presence of a non-blank
// snapshot payload is checked; the message count itself is not inspected here.
[Fact]
public async Task Snapshot_preserves_message_count()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("SNP", "snp.>");

    for (var index = 0; index < 5; index++)
    {
        _ = await fixture.PublishAndGetAckAsync("snp.x", $"msg-{index}");
    }

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.SNAPSHOT.SNP", "{}");

    response.Snapshot.ShouldNotBeNull();
    response.Snapshot!.Payload.ShouldNotBeNullOrWhiteSpace();
}
// Go: TestJetStreamSnapshotsAPI — snapshot non-existent
// The fixture only creates stream "X"; a snapshot request for "NOPE" must return an error.
[Fact]
public async Task Snapshot_non_existent_stream_returns_error()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("X", "x.>");

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.SNAPSHOT.NOPE", "{}");

    response.Error.ShouldNotBeNull();
}
// Go: TestJetStreamInvalidRestoreRequests server/jetstream_test.go
// Sends an empty payload to STREAM.RESTORE and expects an error in the response.
[Fact]
public async Task Restore_with_invalid_payload_returns_error()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("INV", "inv.>");

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.RESTORE.INV", "");

    response.Error.ShouldNotBeNull();
}
// Go: TestJetStreamMirrorUpdatePreventsSubjects server/jetstream_test.go:9412
// The Go test verifies that updating a mirror with subjects is rejected
// ("stream mirrors can not contain subjects"). This port checks the resulting
// invariant instead: the mirror's configured subject list is empty.
[Fact]
public async Task Mirror_stream_cannot_have_subjects()
{
    await using var fixture = await JetStreamApiFixture.StartWithMirrorSetupAsync();

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.ORDERS_MIRROR", "{}");

    response.StreamInfo!.Config.Subjects.ShouldBeEmpty();
}
// Go: TestJetStreamStreamSubjectsOverlap server/jetstream_test.go
// Subjects one and two tokens below "events." must both be acknowledged by stream WS,
// which is bound to "events.>".
[Fact]
public async Task Streams_with_wildcard_subjects_capture_matching()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("WS", "events.>");

    var singleTokenAck = await fixture.PublishAndGetAckAsync("events.click", "1");
    singleTokenAck.Stream.ShouldBe("WS");

    var multiTokenAck = await fixture.PublishAndGetAckAsync("events.view.page", "2");
    multiTokenAck.Stream.ShouldBe("WS");
}
// Go: TestJetStreamStreamTransformOverlap server/jetstream_test.go
// A stream bound to "star.*" accepts "star.one" without an error code, while a publish
// to "star.one.two" is expected to come back with one.
[Fact]
public async Task Stream_with_star_wildcard_subject()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("STAR", "star.*");

    var matchingAck = await fixture.PublishAndGetAckAsync("star.one", "1");
    matchingAck.ErrorCode.ShouldBeNull();

    // "star.*" covers exactly one trailing token, so the two-token subject is rejected.
    var rejectedAck = await fixture.PublishAndGetAckAsync("star.one.two", "2", expectError: true);
    rejectedAck.ErrorCode.ShouldNotBeNull();
}
// Go: TestJetStreamDuplicateWindowMs
// A DuplicateWindowMs of 5000 given at creation must be reported back by STREAM.INFO.
[Fact]
public async Task Duplicate_window_config_roundtrips()
{
    var streamConfig = new StreamConfig
    {
        Name = "DWC",
        Subjects = ["dwc.>"],
        DuplicateWindowMs = 5000,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.DWC", "{}");

    response.StreamInfo!.Config.DuplicateWindowMs.ShouldBe(5000);
}
// Go: TestJetStreamMaxConsumers server/jetstream_test.go:619
// A MaxConsumers of 5 given at creation must be reported back by STREAM.INFO.
[Fact]
public async Task Max_consumers_config_roundtrips()
{
    var streamConfig = new StreamConfig
    {
        Name = "MC",
        Subjects = ["mc.>"],
        MaxConsumers = 5,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.MC", "{}");

    response.StreamInfo!.Config.MaxConsumers.ShouldBe(5);
}
// Go: TestJetStreamAddStreamDiscardNew — discard new config
// DiscardPolicy.New given at creation must be reported back by STREAM.INFO.
[Fact]
public async Task Discard_new_config_roundtrips()
{
    var streamConfig = new StreamConfig
    {
        Name = "DNC",
        Subjects = ["dnc.>"],
        Discard = DiscardPolicy.New,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.DNC", "{}");

    response.StreamInfo!.Config.Discard.ShouldBe(DiscardPolicy.New);
}
// Go: TestJetStreamAddStreamDiscardNew — discard old (default) config
// A stream created without an explicit discard policy must report DiscardPolicy.Old.
[Fact]
public async Task Discard_old_is_default()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("DOC", "doc.>");

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.DOC", "{}");

    response.StreamInfo!.Config.Discard.ShouldBe(DiscardPolicy.Old);
}
// Go: TestJetStreamRollup server/jetstream_test.go
// With MaxMsgsPer = 1, two publishes on each of two subjects must leave one message
// per subject, i.e. two messages in total.
[Fact]
public async Task Multiple_subjects_tracked_independently()
{
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
    {
        Name = "MST",
        Subjects = ["mst.>"],
        MaxMsgsPer = 1,
    });

    _ = await fx.PublishAndGetAckAsync("mst.a", "a1");
    _ = await fx.PublishAndGetAckAsync("mst.b", "b1");
    _ = await fx.PublishAndGetAckAsync("mst.a", "a2");
    _ = await fx.PublishAndGetAckAsync("mst.b", "b2");

    var state = await fx.GetStreamStateAsync("MST");

    // Each subject keeps 1 message: mst.a -> a2, mst.b -> b2.
    // An upper bound alone would also pass if one subject's limit wiped out the other
    // subject's message, which is exactly what "tracked independently" must rule out.
    state.Messages.ShouldBe(2UL);
}
// Go: TestJetStreamMirrorBasics — mirror with no messages
// Nothing is published, so the mirror is expected to exist and report zero messages.
[Fact]
public async Task Mirror_stream_with_no_origin_messages()
{
    await using var fixture = await JetStreamApiFixture.StartWithMirrorSetupAsync();

    var mirrorState = await fixture.GetStreamStateAsync("ORDERS_MIRROR");

    mirrorState.Messages.ShouldBe(0UL);
}
// Go: TestJetStreamSourceBasics — source with no messages
// Nothing is published to the origins, so AGG is expected to report zero messages.
[Fact]
public async Task Source_stream_with_no_origin_messages()
{
    await using var fixture = await JetStreamApiFixture.StartWithMultipleSourcesAsync();

    var aggregateState = await fixture.GetStreamStateAsync("AGG");

    aggregateState.Messages.ShouldBe(0UL);
}
// Go: TestJetStreamPurgeExAndAccounting server/jetstream_test.go
// Publishes three messages, deletes the second one by its acked sequence and expects
// the other two to remain.
[Fact]
public async Task Delete_specific_message_preserves_others()
{
    await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DSP", "dsp.>");

    _ = await fx.PublishAndGetAckAsync("dsp.a", "msg1");
    var middleAck = await fx.PublishAndGetAckAsync("dsp.b", "msg2");
    _ = await fx.PublishAndGetAckAsync("dsp.c", "msg3");

    // Delete middle message, addressed by the sequence its own ack reported rather
    // than by assuming it is "first sequence + 1".
    var del = await fx.RequestLocalAsync(
        "$JS.API.STREAM.MSG.DELETE.DSP",
        $$"""{ "seq": {{middleAck.Seq}} }""");
    del.Success.ShouldBeTrue();

    var state = await fx.GetStreamStateAsync("DSP");
    state.Messages.ShouldBe(2UL);
}
// Go: TestJetStreamStreamPurge — purge non-existent stream
// The fixture only creates stream "X"; purging "NOTEXIST" must not report success.
[Fact]
public async Task Purge_non_existent_stream_fails()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("X", "x.>");

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.PURGE.NOTEXIST", "{}");

    response.Success.ShouldBeFalse();
}
// Go: TestJetStreamMaxBytesIgnored — max bytes config
// A MaxBytes of 1024 given at creation must be reported back by STREAM.INFO.
[Fact]
public async Task Max_bytes_config_roundtrips()
{
    var streamConfig = new StreamConfig
    {
        Name = "MBC",
        Subjects = ["mbc.>"],
        MaxBytes = 1024,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.MBC", "{}");

    response.StreamInfo!.Config.MaxBytes.ShouldBe(1024);
}
// Go: TestJetStreamMaxAgeMs — max age config
// A MaxAgeMs of 60_000 given at creation must be reported back by STREAM.INFO.
[Fact]
public async Task Max_age_config_roundtrips()
{
    var streamConfig = new StreamConfig
    {
        Name = "MAC",
        Subjects = ["mac.>"],
        MaxAgeMs = 60_000,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.MAC", "{}");

    response.StreamInfo!.Config.MaxAgeMs.ShouldBe(60_000);
}
// Go: TestJetStreamReplicas config
// A Replicas value of 3 given at creation must be reported back by STREAM.INFO.
[Fact]
public async Task Replicas_config_roundtrips()
{
    var streamConfig = new StreamConfig
    {
        Name = "REP",
        Subjects = ["rep.>"],
        Replicas = 3,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.REP", "{}");

    response.StreamInfo!.Config.Replicas.ShouldBe(3);
}
// Go: TestJetStreamMaxMsgSize config
// A MaxMsgSize of 4096 given at creation must be reported back by STREAM.INFO.
[Fact]
public async Task Max_msg_size_config_roundtrips()
{
    var streamConfig = new StreamConfig
    {
        Name = "MMS",
        Subjects = ["mms.>"],
        MaxMsgSize = 4096,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    var response = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.MMS", "{}");

    response.StreamInfo!.Config.MaxMsgSize.ShouldBe(4096);
}
// Go: TestJetStreamStreamUpdateSubjectsOverlapOthers server/jetstream_test.go
// Publishes one message under the original subject filter, switches the stream to a
// different filter via STREAM.UPDATE, and expects the stored message to survive.
[Fact]
public async Task Update_stream_subjects_preserves_existing_data()
{
    await using var fx = await JetStreamApiFixture.StartWithStreamAsync("USP", "usp.v1.*");
    _ = await fx.PublishAndGetAckAsync("usp.v1.x", "old-data");

    var update = await fx.RequestLocalAsync(
        "$JS.API.STREAM.UPDATE.USP",
        """{"name":"USP","subjects":["usp.v2.*"]}""");

    // Without this check a rejected update would leave the stream untouched and the
    // message-count assertion below would pass without any update having happened.
    update.Error.ShouldBeNull();

    var state = await fx.GetStreamStateAsync("USP");
    state.Messages.ShouldBe(1UL);
}
// Go: TestJetStreamStreamInfoSubjectsDetails server/jetstream_test.go
// The reported byte count starts at zero, becomes positive after the first publish and
// grows again after the second.
[Fact]
public async Task Stream_bytes_increase_with_each_publish()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("SBI", "sbi.>");

    var emptyState = await fixture.GetStreamStateAsync("SBI");
    emptyState.Bytes.ShouldBe(0UL);

    _ = await fixture.PublishAndGetAckAsync("sbi.x", "data");
    var afterFirstPublish = await fixture.GetStreamStateAsync("SBI");
    var bytesAfterFirst = afterFirstPublish.Bytes;
    bytesAfterFirst.ShouldBeGreaterThan(0UL);

    _ = await fixture.PublishAndGetAckAsync("sbi.y", "more-data");
    var afterSecondPublish = await fixture.GetStreamStateAsync("SBI");
    afterSecondPublish.Bytes.ShouldBeGreaterThan(bytesAfterFirst);
}
}