Files
natsdotnet/tests/NATS.Server.JetStream.Tests/JetStream/JetStreamStreamEdgeCaseTests.cs
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

511 lines
20 KiB
C#

// Ported from golang/nats-server/server/jetstream_test.go
// Stream lifecycle edge cases: max messages enforcement, max bytes enforcement,
// max age TTL, discard old vs discard new, max msgs per subject, sealed streams,
// deny delete/purge, stream naming constraints, overlapping subjects.
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Models;
using NATS.Server.TestUtilities;
namespace NATS.Server.JetStream.Tests.JetStream;
public class JetStreamStreamEdgeCaseTests
{
// Go: TestJetStreamAddStream server/jetstream_test.go:178
// A create request that carries only a name (no "subjects" field) must be
// accepted, and the returned stream info must echo that name.
// NOTE(review): the default subject mentioned in the test name is not asserted here.
[Fact]
public async Task Create_stream_without_subjects_uses_default_subject()
{
    await using var fixture = new JetStreamApiFixture();

    var created = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.CREATE.NOSUB",
        """{"name":"NOSUB"}""");

    created.Error.ShouldBeNull();
    created.StreamInfo.ShouldNotBeNull();
    var config = created.StreamInfo!.Config;
    config.Name.ShouldBe("NOSUB");
}
// Go: TestJetStreamAddStreamBadSubjects server/jetstream_test.go:550
// Sends a create request whose JSON "name" is empty while the API subject ends
// in the token "X". Per the original author's note, the name is filled from that
// subject token, so the call is expected to succeed despite the empty field.
// NOTE(review): only a non-null response is asserted, so neither success nor the
// "returns_error" outcome in the test name is actually verified.
[Fact]
public async Task Create_stream_with_empty_name_returns_error()
{
    await using var fixture = new JetStreamApiFixture();

    var response = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.CREATE.X",
        """{"name":"","subjects":["x.>"]}""");

    // Smoke check: the request must produce a response object.
    response.ShouldNotBeNull();
}
// Go: TestJetStreamAddStreamSameConfigOK server/jetstream_test.go:701
// Re-issuing a create request for a stream that already exists, with the same
// name and subjects, must return the stream info without an error.
[Fact]
public async Task Create_same_stream_twice_is_idempotent()
{
    // The fixture starts with stream IDEM already created on "idem.>".
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("IDEM", "idem.>");

    var repeat = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.CREATE.IDEM",
        """{"name":"IDEM","subjects":["idem.>"]}""");

    repeat.Error.ShouldBeNull();
    repeat.StreamInfo.ShouldNotBeNull();
    var config = repeat.StreamInfo!.Config;
    config.Name.ShouldBe("IDEM");
}
// Go: TestJetStreamAddStreamMaxMsgSize server/jetstream_test.go:450
// With MaxMsgSize = 5, a 2-character payload is acknowledged without an error
// code while a much longer payload comes back with an error code.
[Fact]
public async Task Max_msg_size_rejects_oversized_payload()
{
    var config = new StreamConfig
    {
        Name = "MAXSIZE",
        Subjects = ["maxsize.>"],
        MaxMsgSize = 5,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    var smallAck = await fixture.PublishAndGetAckAsync("maxsize.small", "hi");
    smallAck.ErrorCode.ShouldBeNull();

    var largeAck = await fixture.PublishAndGetAckAsync("maxsize.big", "this-is-way-too-large");
    largeAck.ErrorCode.ShouldNotBeNull();
}
// Go: TestJetStreamAddStreamMaxMsgSize — exact boundary
// Boundary check for MaxMsgSize = 10: a 10-character payload is accepted and an
// 11-character payload is rejected.
[Fact]
public async Task Max_msg_size_accepts_payload_at_exact_limit()
{
    var config = new StreamConfig
    {
        Name = "EXACT",
        Subjects = ["exact.>"],
        MaxMsgSize = 10,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Ten ASCII characters: sits exactly on the limit.
    var atLimit = await fixture.PublishAndGetAckAsync("exact.x", "0123456789");
    atLimit.ErrorCode.ShouldBeNull();

    // Eleven ASCII characters: one past the limit.
    var overLimit = await fixture.PublishAndGetAckAsync("exact.y", "01234567890");
    overLimit.ErrorCode.ShouldNotBeNull();
}
// Go: TestJetStreamAddStreamDiscardNew server/jetstream_test.go:236
// With DiscardPolicy.New and MaxBytes = 50, the first publish is stored and the
// second is answered with an error code.
[Fact]
public async Task Discard_new_rejects_when_stream_at_max_bytes()
{
    // Sizing per the original author's accounting: subject "discnew.a" (9) +
    // payload (20) + overhead (16) = 45 bytes, so MaxBytes = 50 fits one message.
    var config = new StreamConfig
    {
        Name = "DISCNEW",
        Subjects = ["discnew.>"],
        MaxBytes = 50,
        Discard = DiscardPolicy.New,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // First message fills the stream.
    var firstAck = await fixture.PublishAndGetAckAsync("discnew.a", "12345678901234567890");
    firstAck.ErrorCode.ShouldBeNull();

    // Second message does not fit and must be refused rather than evicting the first.
    var secondAck = await fixture.PublishAndGetAckAsync("discnew.b", "overflow-message-payload");
    secondAck.ErrorCode.ShouldNotBeNull();
}
// Go: TestJetStreamAddStreamDiscardNew — discard old allows eviction
// With DiscardPolicy.Old and MaxBytes = 50, publishing beyond the byte budget must
// still be acknowledged without an error code.
// NOTE(review): the final assertion only checks the stream is non-empty; the byte
// bound itself is not verified.
[Fact]
public async Task Discard_old_evicts_old_messages_when_at_max_bytes()
{
    var config = new StreamConfig
    {
        Name = "DISCOLD",
        Subjects = ["discold.>"],
        MaxBytes = 50,
        Discard = DiscardPolicy.Old,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Five publishes with 9-character payloads ("payload-0" .. "payload-4").
    for (var n = 0; n < 5; n++)
    {
        _ = await fixture.PublishAndGetAckAsync("discold.msg", $"payload-{n}");
    }

    // A further publish must still be accepted.
    var latestAck = await fixture.PublishAndGetAckAsync("discold.new", "new-data");
    latestAck.ErrorCode.ShouldBeNull();

    var streamState = await fixture.GetStreamStateAsync("DISCOLD");
    streamState.Messages.ShouldBeGreaterThan(0UL);
}
// Go: TestJetStreamStreamStorageTrackingAndLimits server/jetstream_test.go:5273
// With MaxMsgs = 3 and DiscardPolicy.Old, publishing five messages must leave
// exactly three in the stream.
[Fact]
public async Task Max_msgs_evicts_oldest_when_limit_reached_with_discard_old()
{
    var config = new StreamConfig
    {
        Name = "MAXMSGS",
        Subjects = ["maxmsgs.>"],
        MaxMsgs = 3,
        Discard = DiscardPolicy.Old,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Payloads are "payload-1" through "payload-5".
    for (var n = 1; n <= 5; n++)
    {
        _ = await fixture.PublishAndGetAckAsync("maxmsgs.msg", $"payload-{n}");
    }

    var streamState = await fixture.GetStreamStateAsync("MAXMSGS");
    streamState.Messages.ShouldBe(3UL);
}
// Go: TestJetStreamAddStream — max messages discard new
// Original author's note: the .NET implementation enforces MaxMsgs by evicting
// after the store (EnforceRuntimePolicies) rather than rejecting before it, so
// DiscardNew + MaxMsgs drops the oldest message instead of refusing the new one.
// This test therefore uses MaxBytes + DiscardNew to reach the rejection path
// (described by the author as a pre-store check in Capture()).
[Fact]
public async Task Max_msgs_with_discard_new_via_bytes_rejects_when_bytes_exceeded()
{
    // Sizing per the original author's accounting: subject "maxnew.a" (8) +
    // payload "1234567890" (10) + overhead (16) = 34 bytes, so MaxBytes = 40
    // fits one message.
    var config = new StreamConfig
    {
        Name = "MAXNEW",
        Subjects = ["maxnew.>"],
        MaxBytes = 40,
        Discard = DiscardPolicy.New,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // First message occupies the byte budget.
    _ = await fixture.PublishAndGetAckAsync("maxnew.a", "1234567890");

    // Second message exceeds the remaining budget and must be refused.
    var overflowAck = await fixture.PublishAndGetAckAsync("maxnew.c", "extra-data-overflows");
    overflowAck.ErrorCode.ShouldNotBeNull();
}
// Go: TestJetStreamChangeMaxMessagesPerSubject server/jetstream_test.go:16281
// With MaxMsgsPer = 2, three publishes to the same subject must leave at most
// two messages in the stream.
[Fact]
public async Task Max_msgs_per_subject_evicts_old_messages_for_same_subject()
{
    var config = new StreamConfig
    {
        Name = "PERMSG",
        Subjects = ["permsg.>"],
        MaxMsgsPer = 2,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    _ = await fixture.PublishAndGetAckAsync("permsg.foo", "first");
    _ = await fixture.PublishAndGetAckAsync("permsg.foo", "second");
    // Third publish on the same subject is expected to push out "first".
    _ = await fixture.PublishAndGetAckAsync("permsg.foo", "third");

    var streamState = await fixture.GetStreamStateAsync("PERMSG");

    // All publishes used permsg.foo, so the per-subject cap bounds the total.
    streamState.Messages.ShouldBeLessThanOrEqualTo(2UL);
}
// Go: TestJetStreamStreamLimitUpdate server/jetstream_test.go:5234
// A max_msgs limit introduced through STREAM.UPDATE must apply to a stream that
// already holds more messages than the new limit.
[Fact]
public async Task Update_stream_max_msgs_is_enforced_after_update()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("UPLIM", "uplim.>");

    // Seed five messages ("m0" .. "m4") before any limit exists.
    for (var n = 0; n < 5; n++)
    {
        _ = await fixture.PublishAndGetAckAsync("uplim.msg", $"m{n}");
    }

    // Tighten the stream to three messages.
    var updated = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.UPDATE.UPLIM",
        """{"name":"UPLIM","subjects":["uplim.>"],"max_msgs":3}""");
    updated.Error.ShouldBeNull();

    // One more publish so the new limit gets applied.
    _ = await fixture.PublishAndGetAckAsync("uplim.new", "newest");

    var streamState = await fixture.GetStreamStateAsync("UPLIM");
    streamState.Messages.ShouldBeLessThanOrEqualTo(3UL);
}
// Go: TestJetStreamAddStreamOverlappingSubjects server/jetstream_test.go:615
// Requests a second stream on "overlap.foo" while stream FIRST already covers
// "overlap.>".
// NOTE(review): despite the "fails" in the test name, only a non-null response is
// asserted; the original author accepts either outcome as long as nothing crashes.
[Fact]
public async Task Create_stream_with_overlapping_subject_fails()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("FIRST", "overlap.>");

    // "overlap.foo" falls inside the wildcard already owned by FIRST.
    var response = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.CREATE.SECOND",
        """{"name":"SECOND","subjects":["overlap.foo"]}""");

    // Success or failure is implementation-defined here; it just must not blow up.
    response.ShouldNotBeNull();
}
// Go: TestJetStreamAddStream — sealed stream purge is blocked
// Original author's note: in the .NET implementation the "sealed" flag blocks
// purge and delete operations but not publishing (Capture() does not check
// Sealed), i.e. it guards administrative operations rather than ingest.
// This test only checks that the flag round-trips through STREAM.INFO.
[Fact]
public async Task Sealed_stream_info_shows_sealed_true()
{
    var config = new StreamConfig
    {
        Name = "SEALED",
        Subjects = ["sealed.>"],
        Sealed = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    var infoResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.SEALED", "{}");

    infoResponse.Error.ShouldBeNull();
    var reported = infoResponse.StreamInfo!.Config;
    reported.Sealed.ShouldBeTrue();
}
// Go: TestJetStreamAddStream — deny delete prevents deletion
// With DenyDelete = true, a MSG.DELETE request for a stored sequence must report
// Success = false.
[Fact]
public async Task Deny_delete_prevents_individual_message_deletion()
{
    var config = new StreamConfig
    {
        Name = "NODELDEL",
        Subjects = ["nodeldel.>"],
        DenyDelete = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Store one message so there is a real sequence to target.
    var stored = await fixture.PublishAndGetAckAsync("nodeldel.x", "data");
    stored.ErrorCode.ShouldBeNull();

    var deleteBody = $$$"""{ "seq": {{{stored.Seq}}} }""";
    var deleteResponse = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.MSG.DELETE.NODELDEL",
        deleteBody);

    deleteResponse.Success.ShouldBeFalse();
}
// Go: TestJetStreamAddStream — deny purge prevents purge
// With DenyPurge = true, a STREAM.PURGE request must report Success = false.
[Fact]
public async Task Deny_purge_prevents_stream_purge()
{
    var config = new StreamConfig
    {
        Name = "NOPURGE",
        Subjects = ["nopurge.>"],
        DenyPurge = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Publish one message before attempting the purge.
    _ = await fixture.PublishAndGetAckAsync("nopurge.x", "data");

    var purgeResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.PURGE.NOPURGE", "{}");

    purgeResponse.Success.ShouldBeFalse();
}
// Go: TestJetStreamStateTimestamps server/jetstream_test.go:770
// After two publishes the stream state must report two messages and a non-zero
// byte count.
[Fact]
public async Task Stream_state_tracks_messages_and_bytes()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("STATE", "state.>");

    _ = await fixture.PublishAndGetAckAsync("state.a", "hello");
    _ = await fixture.PublishAndGetAckAsync("state.b", "world");

    var streamState = await fixture.GetStreamStateAsync("STATE");

    streamState.Messages.ShouldBe(2UL);
    streamState.Bytes.ShouldBeGreaterThan(0UL);
}
// Go: TestJetStreamStateTimestamps — first seq and last seq
// FirstSeq and LastSeq in the stream state must match the sequences returned in
// the acks of the first and second publish respectively.
[Fact]
public async Task Stream_state_reports_first_and_last_seq()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("SEQSTATE", "seqstate.>");

    var firstAck = await fixture.PublishAndGetAckAsync("seqstate.a", "first");
    var secondAck = await fixture.PublishAndGetAckAsync("seqstate.b", "second");

    var streamState = await fixture.GetStreamStateAsync("SEQSTATE");

    streamState.FirstSeq.ShouldBe(firstAck.Seq);
    streamState.LastSeq.ShouldBe(secondAck.Seq);
}
// Go: TestJetStreamStreamPurgeWithConsumer server/jetstream_test.go:4238
// A successful purge takes the message count from ten down to zero.
[Fact]
public async Task Purge_stream_resets_state_to_empty()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("PURGESTATE", "purge.>");

    // Seed ten messages ("data-0" .. "data-9").
    for (var n = 0; n < 10; n++)
    {
        _ = await fixture.PublishAndGetAckAsync("purge.msg", $"data-{n}");
    }

    var stateBeforePurge = await fixture.GetStreamStateAsync("PURGESTATE");
    stateBeforePurge.Messages.ShouldBe(10UL);

    var purgeResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.PURGE.PURGESTATE", "{}");
    purgeResponse.Success.ShouldBeTrue();

    var stateAfterPurge = await fixture.GetStreamStateAsync("PURGESTATE");
    stateAfterPurge.Messages.ShouldBe(0UL);
}
// Go: TestJetStreamStreamPurge — subsequent publish after purge continues
// A publish issued after a purge must be acknowledged with a positive sequence and
// be the only message left in the stream.
[Fact]
public async Task After_purge_new_publishes_are_accepted()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("POSTPURGE", "postpurge.>");

    // One message in, then wipe the stream; neither result is inspected.
    _ = await fixture.PublishAndGetAckAsync("postpurge.a", "before-purge");
    _ = await fixture.RequestLocalAsync("$JS.API.STREAM.PURGE.POSTPURGE", "{}");

    var postPurgeAck = await fixture.PublishAndGetAckAsync("postpurge.b", "after-purge");
    postPurgeAck.ErrorCode.ShouldBeNull();
    postPurgeAck.Seq.ShouldBeGreaterThan(0UL);

    var streamState = await fixture.GetStreamStateAsync("POSTPURGE");
    streamState.Messages.ShouldBe(1UL);
}
// Go: TestJetStreamUpdateStream server/jetstream_test.go:6409
// Updating a stream with a new subject list must succeed and the returned config
// must contain the new subject.
// NOTE(review): removal of the old subject "subupd.old.*" is not asserted.
[Fact]
public async Task Update_stream_replaces_subject_list()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("SUBUPD", "subupd.old.*");

    var updated = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.UPDATE.SUBUPD",
        """{"name":"SUBUPD","subjects":["subupd.new.*"]}""");

    updated.Error.ShouldBeNull();
    var subjects = updated.StreamInfo!.Config.Subjects;
    subjects.ShouldContain("subupd.new.*");
}
// Go: TestJetStreamUpdateStream — max age update
// A max_age_ms value supplied through STREAM.UPDATE must be reflected in the
// returned stream config.
[Fact]
public async Task Update_stream_can_set_max_age()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("AGEUPD", "ageupd.>");

    var updated = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.UPDATE.AGEUPD",
        """{"name":"AGEUPD","subjects":["ageupd.>"],"max_age_ms":60000}""");

    updated.Error.ShouldBeNull();
    var reported = updated.StreamInfo!.Config;
    reported.MaxAgeMs.ShouldBe(60000);
}
// Go: TestJetStreamDeleteMsg server/jetstream_test.go:6616
// Deleting one stored message by sequence must succeed and lower the message
// count from three to two.
[Fact]
public async Task Delete_message_decrements_message_count()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("DELMSG", "delmsg.>");

    // Only the first ack is kept; its sequence is the delete target.
    var firstAck = await fixture.PublishAndGetAckAsync("delmsg.a", "1");
    _ = await fixture.PublishAndGetAckAsync("delmsg.b", "2");
    _ = await fixture.PublishAndGetAckAsync("delmsg.c", "3");

    var stateBeforeDelete = await fixture.GetStreamStateAsync("DELMSG");
    stateBeforeDelete.Messages.ShouldBe(3UL);

    var deleteBody = $$$"""{ "seq": {{{firstAck.Seq}}} }""";
    var deleteResponse = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.MSG.DELETE.DELMSG",
        deleteBody);
    deleteResponse.Success.ShouldBeTrue();

    var stateAfterDelete = await fixture.GetStreamStateAsync("DELMSG");
    stateAfterDelete.Messages.ShouldBe(2UL);
}
// Go: TestJetStreamDeleteMsg — deleting nonexistent sequence returns error
// A MSG.DELETE request for sequence 9999, issued after a single publish, must
// report Success = false.
[Fact]
public async Task Delete_nonexistent_sequence_returns_not_found()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("DELMISS", "delmiss.>");

    // A single publish, so sequence 9999 was never assigned by this test.
    _ = await fixture.PublishAndGetAckAsync("delmiss.a", "1");

    var deleteResponse = await fixture.RequestLocalAsync(
        "$JS.API.STREAM.MSG.DELETE.DELMISS",
        """{ "seq": 9999 }""");

    deleteResponse.Success.ShouldBeFalse();
}
// Go: TestJetStreamNoAckStream server/jetstream_test.go:809
// With a consumer using AckPolicy.None attached, three publishes must still
// result in three stored messages.
[Fact]
public async Task Stream_with_no_ack_consumer_stores_messages()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("NOACK", "noack.>");

    // Attach consumer PLAIN with no acknowledgement policy before publishing.
    _ = await fixture.CreateConsumerAsync("NOACK", "PLAIN", "noack.>", ackPolicy: AckPolicy.None);

    for (var n = 0; n < 3; n++)
    {
        _ = await fixture.PublishAndGetAckAsync("noack.msg", $"data-{n}");
    }

    var streamState = await fixture.GetStreamStateAsync("NOACK");
    streamState.Messages.ShouldBe(3UL);
}
// Go: TestJetStreamStreamStorageTrackingAndLimits — interest retention with work queue
// A stream configured with RetentionPolicy.WorkQueue must report that policy
// through STREAM.INFO.
[Fact]
public async Task Work_queue_retention_stream_is_created_successfully()
{
    var config = new StreamConfig
    {
        Name = "WQ",
        Subjects = ["wq.>"],
        Retention = RetentionPolicy.WorkQueue,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    var infoResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.WQ", "{}");

    infoResponse.Error.ShouldBeNull();
    var reported = infoResponse.StreamInfo!.Config;
    reported.Retention.ShouldBe(RetentionPolicy.WorkQueue);
}
// Go: TestJetStreamInterestRetentionStream server/jetstream_test.go:4411
// A stream configured with RetentionPolicy.Interest must report that policy
// through STREAM.INFO.
[Fact]
public async Task Interest_retention_stream_is_created_successfully()
{
    var config = new StreamConfig
    {
        Name = "INT",
        Subjects = ["int.>"],
        Retention = RetentionPolicy.Interest,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    var infoResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.INT", "{}");

    infoResponse.Error.ShouldBeNull();
    var reported = infoResponse.StreamInfo!.Config;
    reported.Retention.ShouldBe(RetentionPolicy.Interest);
}
// Go: TestJetStreamAddStream — limits retention is the default
// A stream started through StartWithStreamAsync with only a name and a subject
// (no retention specified by this test) must report RetentionPolicy.Limits
// through STREAM.INFO.
[Fact]
public async Task Stream_default_retention_is_limits()
{
    await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DEFLIM", "deflim.>");

    var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.DEFLIM", "{}");

    // Check the INFO call succeeded before dereferencing StreamInfo, so a failed
    // request shows up as an assertion failure instead of a NullReferenceException.
    info.Error.ShouldBeNull();
    info.StreamInfo.ShouldNotBeNull();
    info.StreamInfo!.Config.Retention.ShouldBe(RetentionPolicy.Limits);
}
// Go: TestJetStreamAddStreamCanonicalNames server/jetstream_test.go:502
// A mixed-case stream name must be addressable under that exact spelling and be
// reported back unchanged.
[Fact]
public async Task Stream_name_preserves_case()
{
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CamelCase", "camel.>");

    var infoResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.CamelCase", "{}");

    infoResponse.Error.ShouldBeNull();
    var reported = infoResponse.StreamInfo!.Config;
    reported.Name.ShouldBe("CamelCase");
}
// Go: TestJetStreamMaxConsumers server/jetstream_test.go:553
// A MaxConsumers value set at creation must be reported back through STREAM.INFO.
// NOTE(review): this checks only that the value is stored; it does not try to
// exceed the limit.
[Fact]
public async Task Max_consumers_on_stream_config_is_stored()
{
    var config = new StreamConfig
    {
        Name = "MAXCON",
        Subjects = ["maxcon.>"],
        MaxConsumers = 2,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    var infoResponse = await fixture.RequestLocalAsync("$JS.API.STREAM.INFO.MAXCON", "{}");

    infoResponse.Error.ShouldBeNull();
    var reported = infoResponse.StreamInfo!.Config;
    reported.MaxConsumers.ShouldBe(2);
}
}