Files
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

142 lines
6.3 KiB
C#

using NATS.Server.TestUtilities;
// Ported from golang/nats-server/server/jetstream_test.go
// Reference Go tests: TestJetStreamAddStream, TestJetStreamAddStreamSameConfigOK,
// TestJetStreamUpdateStream, TestJetStreamStreamPurge, TestJetStreamDeleteMsg
namespace NATS.Server.JetStream.Tests;
// Stream lifecycle tests driven through the local JetStream API fixture:
// create, idempotent re-create, update, purge and delete.
public class StreamLifecycleTests
{
    // Go ref: TestJetStreamAddStream (line 178)
    // A freshly created stream must be visible through INFO with the configured
    // name and subjects and with zero stored messages. The stream itself is
    // created by the fixture; only the INFO response is asserted here.
    [Fact]
    public async Task Stream_create_returns_config_and_zero_message_state()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EVENTS", "events.*");

        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.EVENTS", "{}");

        info.Error.ShouldBeNull();
        info.StreamInfo.ShouldNotBeNull();
        info.StreamInfo.Config.Name.ShouldBe("EVENTS");
        info.StreamInfo.Config.Subjects.ShouldContain("events.*");
        info.StreamInfo.State.Messages.ShouldBe(0UL);
    }

    // Go ref: TestJetStreamAddStreamSameConfigOK (line 701)
    // Creating a stream twice with an identical config must be idempotent: the
    // Go test calls acc.addStream twice with the same mconfig and expects the
    // second call to succeed.
    [Fact]
    public async Task Stream_create_with_same_config_is_idempotent()
    {
        // The fixture performs the first CREATE; the request below is the repeat.
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*");

        var repeat = await fx.RequestLocalAsync(
            "$JS.API.STREAM.CREATE.ORDERS",
            "{\"name\":\"ORDERS\",\"subjects\":[\"orders.*\"]}");

        repeat.Error.ShouldBeNull();
        repeat.StreamInfo.ShouldNotBeNull();
        repeat.StreamInfo.Config.Name.ShouldBe("ORDERS");
    }

    // Go ref: TestJetStreamUpdateStream (line 6409)
    // Updating a stream's subjects and max_msgs must succeed, the new config
    // must be reported both by the UPDATE response and by a later INFO call,
    // and messages stored before the update must still be counted afterwards.
    // The Go test updates MaxMsgs and checks mset.config().MaxMsgs.
    [Fact]
    public async Task Stream_update_replaces_subjects_and_max_msgs()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*");

        // Seed two messages so the test can prove the update leaves stored data alone.
        _ = await fx.PublishAndGetAckAsync("orders.created", "msg1");
        _ = await fx.PublishAndGetAckAsync("orders.created", "msg2");
        var stateBefore = await fx.GetStreamStateAsync("ORDERS");
        stateBefore.Messages.ShouldBe(2UL);

        // Switch the subject filter and set max_msgs to 100.
        var update = await fx.RequestLocalAsync(
            "$JS.API.STREAM.UPDATE.ORDERS",
            "{\"name\":\"ORDERS\",\"subjects\":[\"orders.v2.*\"],\"max_msgs\":100}");

        update.Error.ShouldBeNull();
        update.StreamInfo.ShouldNotBeNull();
        update.StreamInfo.Config.Subjects.ShouldContain("orders.v2.*");
        update.StreamInfo.Config.MaxMsgs.ShouldBe(100);

        // INFO must report the same updated config as the UPDATE response.
        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.ORDERS", "{}");
        info.Error.ShouldBeNull();
        info.StreamInfo.ShouldNotBeNull();
        info.StreamInfo.Config.Subjects.ShouldContain("orders.v2.*");
        info.StreamInfo.Config.MaxMsgs.ShouldBe(100);

        // The two messages published before the update must still be present.
        var stateAfter = await fx.GetStreamStateAsync("ORDERS");
        stateAfter.Messages.ShouldBe(2UL);
    }

    // Go ref: TestJetStreamStreamPurge (line 4182)
    // Purging removes every stored message (Go: state.Msgs == 0 after
    // mset.purge(nil)), and the stream keeps accepting publishes afterwards
    // (Msgs == 1 after one more publish).
    [Fact]
    public async Task Stream_purge_clears_all_messages_and_resets_state()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DC", "dc.*");

        for (var i = 0; i < 5; i++)
        {
            _ = await fx.PublishAndGetAckAsync("dc.msg", $"payload-{i}");
        }

        var beforePurge = await fx.GetStreamStateAsync("DC");
        beforePurge.Messages.ShouldBe(5UL);

        var purge = await fx.RequestLocalAsync("$JS.API.STREAM.PURGE.DC", "{}");
        purge.Success.ShouldBeTrue();
        purge.Error.ShouldBeNull();

        var afterPurge = await fx.GetStreamStateAsync("DC");
        afterPurge.Messages.ShouldBe(0UL);

        // A publish after the purge must be acknowledged by the same stream and
        // bring the message count back up to exactly one.
        var ack = await fx.PublishAndGetAckAsync("dc.msg", "after-purge");
        ack.Stream.ShouldBe("DC");

        var afterPublish = await fx.GetStreamStateAsync("DC");
        afterPublish.Messages.ShouldBe(1UL);
    }

    // Go ref: TestJetStreamUpdateStream (line 6409) — deletion side,
    // TestJetStreamAddStream (line 229) — mset.delete() check.
    // Deleting a stream must succeed, and a later INFO request for it must come
    // back with an error and no stream info. Only the presence of an error is
    // asserted; the specific error code is not checked here.
    [Fact]
    public async Task Stream_delete_removes_stream_and_info_returns_not_found()
    {
        await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*");

        _ = await fx.PublishAndGetAckAsync("orders.placed", "order-1");
        var stateBefore = await fx.GetStreamStateAsync("ORDERS");
        stateBefore.Messages.ShouldBe(1UL);

        var deleteResponse = await fx.RequestLocalAsync("$JS.API.STREAM.DELETE.ORDERS", "{}");
        deleteResponse.Success.ShouldBeTrue();
        deleteResponse.Error.ShouldBeNull();

        // The stream no longer exists, so INFO must fail.
        var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.ORDERS", "{}");
        info.Error.ShouldNotBeNull();
        info.StreamInfo.ShouldBeNull();
    }
}