Files
natsdotnet/tests/NATS.Server.Tests/JetStream/JsConfigLimitsTests.cs
Joseph Doherty a0f30b8120 feat: add JetStream config, limits & validation tests (Task 7)
Port 51 Go-parity tests covering JetStream system config, account limits,
tiered limits, storage reservation, strict mode validation, stream create
pedantic mode, cluster config validation, and limit lock bugs.

Go refs: TestJetStreamAutoTuneFSConfig, TestJetStreamSystemLimitsPlacement,
TestJetStreamTieredLimits, TestJetStreamStrictMode, and 20+ more.
2026-02-24 20:58:14 -05:00

1099 lines
42 KiB
C#

// Ported from golang/nats-server/server/jetstream_test.go
// Covers JetStream configuration, resource limits, and validation tests focused on
// config normalization, placement, tiered limits, resource enforcement, stream creation
// validation, subject delete markers, and filter semantics.
using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Validation;
namespace NATS.Server.Tests.JetStream;
/// <summary>
/// Go parity tests ported from jetstream_test.go covering config/limits/validation
/// scenarios: auto-tune FS config, domain config, resource config, system limits,
/// tiered limits, disabled limits, would-exceed checks, storage reservation,
/// usage tracking, stream creation pedantic/strict mode, subject mapping validation,
/// subject delete markers, filter-all semantics, and cleanup threshold behavior.
/// </summary>
public class JsConfigLimitsTests
{
// =========================================================================
// TestJetStreamAutoTuneFSConfig — jetstream_test.go
// JetStreamService started with a StoreDir runs in file-backed mode;
// MaxMemory and MaxStore expose the configured resource limits.
// =========================================================================
[Fact]
public async Task AutoTuneFSConfig_store_dir_and_limits_are_reflected()
{
// Go: TestJetStreamAutoTuneFSConfig jetstream_test.go
var storeDir = Path.Combine(Path.GetTempPath(), "nats-js-autotune-" + Guid.NewGuid().ToString("N"));
try
{
var options = new JetStreamOptions
{
StoreDir = storeDir,
MaxMemoryStore = 64 * 1024 * 1024L, // 64 MiB
MaxFileStore = 512 * 1024 * 1024L, // 512 MiB
};
await using var svc = new JetStreamService(options);
await svc.StartAsync(CancellationToken.None);
svc.IsRunning.ShouldBeTrue();
svc.MaxMemory.ShouldBe(64 * 1024 * 1024L);
svc.MaxStore.ShouldBe(512 * 1024 * 1024L);
Directory.Exists(storeDir).ShouldBeTrue();
}
finally
{
if (Directory.Exists(storeDir))
Directory.Delete(storeDir, recursive: true);
}
}
// =========================================================================
// TestJetStreamServerDomainConfig — jetstream_test.go
// JetStreamService exposes config limits regardless of StoreDir. When no
// StoreDir is set, memory-only mode starts correctly.
// =========================================================================
[Fact]
public async Task ServerDomainConfig_memory_only_mode_starts_without_store_dir()
{
// Go: TestJetStreamServerDomainConfig jetstream_test.go
var options = new JetStreamOptions
{
StoreDir = string.Empty,
MaxMemoryStore = 256 * 1024 * 1024L, // 256 MiB
};
await using var svc = new JetStreamService(options);
await svc.StartAsync(CancellationToken.None);
svc.IsRunning.ShouldBeTrue();
svc.MaxMemory.ShouldBe(256 * 1024 * 1024L);
svc.MaxStore.ShouldBe(0L); // no file store configured
}
// =========================================================================
// TestJetStreamServerResourcesConfig — jetstream_test.go
// Configuration values for max streams, max consumers, max memory, and max
// file store are all exposed correctly through JetStreamService properties.
// =========================================================================
[Fact]
public async Task ServerResourcesConfig_all_limits_exposed_correctly()
{
// Go: TestJetStreamServerResourcesConfig jetstream_test.go
var options = new JetStreamOptions
{
MaxStreams = 100,
MaxConsumers = 1000,
MaxMemoryStore = 1_073_741_824L, // 1 GiB
MaxFileStore = 10_737_418_240L, // 10 GiB
};
await using var svc = new JetStreamService(options);
await svc.StartAsync(CancellationToken.None);
svc.MaxStreams.ShouldBe(100);
svc.MaxConsumers.ShouldBe(1000);
svc.MaxMemory.ShouldBe(1_073_741_824L);
svc.MaxStore.ShouldBe(10_737_418_240L);
}
[Fact]
public void ServerResourcesConfig_default_values_are_zero_unlimited()
{
// Go: TestJetStreamServerResourcesConfig — default unlimited config
var options = new JetStreamOptions();
var svc = new JetStreamService(options);
svc.MaxStreams.ShouldBe(0);
svc.MaxConsumers.ShouldBe(0);
svc.MaxMemory.ShouldBe(0L);
svc.MaxStore.ShouldBe(0L);
}
// =========================================================================
// TestJetStreamSystemLimitsPlacement — jetstream_test.go
// Account-level stream limits prevent creation beyond the configured maximum.
// Tests the placement/enforcement path in StreamManager when an Account is
// provided.
// =========================================================================
[Fact]
public async Task SystemLimitsPlacement_account_limits_max_streams_enforced()
{
// Go: TestJetStreamSystemLimitsPlacement jetstream_test.go
await using var fx = await JetStreamApiFixture.StartJwtLimitedAccountAsync(maxStreams: 2);
var s1 = await fx.RequestLocalAsync(
"$JS.API.STREAM.CREATE.PLACE1",
"""{"name":"PLACE1","subjects":["place1.>"]}""");
s1.Error.ShouldBeNull();
var s2 = await fx.RequestLocalAsync(
"$JS.API.STREAM.CREATE.PLACE2",
"""{"name":"PLACE2","subjects":["place2.>"]}""");
s2.Error.ShouldBeNull();
var s3 = await fx.RequestLocalAsync(
"$JS.API.STREAM.CREATE.PLACE3",
"""{"name":"PLACE3","subjects":["place3.>"]}""");
s3.Error.ShouldNotBeNull();
s3.Error!.Code.ShouldBe(10027);
}
[Fact]
public async Task SystemLimitsPlacement_delete_frees_stream_slot_for_reuse()
{
// Go: TestJetStreamSystemLimitsPlacement — slot freed after delete
await using var fx = await JetStreamApiFixture.StartJwtLimitedAccountAsync(maxStreams: 1);
var s1 = await fx.RequestLocalAsync(
"$JS.API.STREAM.CREATE.SLOTFREE",
"""{"name":"SLOTFREE","subjects":["slotfree.>"]}""");
s1.Error.ShouldBeNull();
var del = await fx.RequestLocalAsync("$JS.API.STREAM.DELETE.SLOTFREE", "{}");
del.Success.ShouldBeTrue();
var s2 = await fx.RequestLocalAsync(
"$JS.API.STREAM.CREATE.SLOTFREE2",
"""{"name":"SLOTFREE2","subjects":["slotfree2.>"]}""");
s2.Error.ShouldBeNull();
}
// =========================================================================
// TestJetStreamTieredLimits — jetstream_test.go
// AccountLimits MaxStreams and MaxConsumers are the authoritative tier limits.
// When JetStreamLimits.MaxStreams is set on an Account it takes precedence over
// MaxJetStreamStreams.
// =========================================================================
[Fact]
public void TieredLimits_account_limits_max_streams_takes_precedence()
{
// Go: TestJetStreamTieredLimits jetstream_test.go
// When JetStreamLimits.MaxStreams is set it overrides MaxJetStreamStreams.
var account = new Account("TIERED")
{
MaxJetStreamStreams = 100, // legacy field
JetStreamLimits = new AccountLimits { MaxStreams = 2 }, // authoritative
};
account.TryReserveStream().ShouldBeTrue();
account.TryReserveStream().ShouldBeTrue();
account.TryReserveStream().ShouldBeFalse(); // tiered limit of 2 hit
}
[Fact]
public void TieredLimits_max_consumers_enforced_via_account_limits()
{
// Go: TestJetStreamTieredLimits — consumer tier limit
var account = new Account("TIERED_CON")
{
JetStreamLimits = new AccountLimits { MaxConsumers = 3 },
};
account.TryReserveConsumer().ShouldBeTrue();
account.TryReserveConsumer().ShouldBeTrue();
account.TryReserveConsumer().ShouldBeTrue();
account.TryReserveConsumer().ShouldBeFalse(); // tiered limit of 3 hit
}
[Fact]
public void TieredLimits_release_consumer_frees_slot()
{
// Go: TestJetStreamTieredLimits — release consumer restores slot
var account = new Account("TIERED_REL")
{
JetStreamLimits = new AccountLimits { MaxConsumers = 1 },
};
account.TryReserveConsumer().ShouldBeTrue();
account.TryReserveConsumer().ShouldBeFalse();
account.ReleaseConsumer();
account.TryReserveConsumer().ShouldBeTrue();
}
// =========================================================================
// TestJetStreamDisabledLimitsEnforcement — jetstream_test.go
// When account limits are zero (unlimited), no limit enforcement occurs.
// =========================================================================
[Fact]
public void DisabledLimitsEnforcement_zero_max_streams_allows_many()
{
// Go: TestJetStreamDisabledLimitsEnforcement jetstream_test.go
var account = new Account("NOLIMIT")
{
MaxJetStreamStreams = 0, // unlimited
JetStreamLimits = AccountLimits.Unlimited,
};
for (var i = 0; i < 50; i++)
account.TryReserveStream().ShouldBeTrue();
}
[Fact]
public void DisabledLimitsEnforcement_zero_max_consumers_allows_many()
{
// Go: TestJetStreamDisabledLimitsEnforcement — unlimited consumers
var account = new Account("NOCONLIMIT")
{
JetStreamLimits = AccountLimits.Unlimited,
};
for (var i = 0; i < 50; i++)
account.TryReserveConsumer().ShouldBeTrue();
}
// =========================================================================
// TestJetStreamWouldExceedLimits — jetstream_test.go
// The account stream count correctly reflects additions and the max limit
// blocks further reservations at the boundary.
// =========================================================================
[Fact]
public void WouldExceedLimits_stream_count_increments_and_blocks_at_max()
{
// Go: TestJetStreamWouldExceedLimits jetstream_test.go
var account = new Account("EXCEED")
{
MaxJetStreamStreams = 3,
};
account.JetStreamStreamCount.ShouldBe(0);
account.TryReserveStream().ShouldBeTrue();
account.JetStreamStreamCount.ShouldBe(1);
account.TryReserveStream().ShouldBeTrue();
account.JetStreamStreamCount.ShouldBe(2);
account.TryReserveStream().ShouldBeTrue();
account.JetStreamStreamCount.ShouldBe(3);
// At the limit — next reservation must fail
account.TryReserveStream().ShouldBeFalse();
account.JetStreamStreamCount.ShouldBe(3); // count unchanged after failed reserve
}
// =========================================================================
// TestJetStreamStorageReservedBytes — jetstream_test.go
// AccountLimits.MaxStorage is enforced via TrackStorageDelta.
// =========================================================================
[Fact]
public void StorageReservedBytes_max_storage_prevents_exceeding_limit()
{
// Go: TestJetStreamStorageReservedBytes jetstream_test.go
var account = new Account("STORAGE")
{
JetStreamLimits = new AccountLimits { MaxStorage = 1000L },
};
// Adding 500 bytes should succeed
account.TrackStorageDelta(500L).ShouldBeTrue();
account.StorageUsed.ShouldBe(500L);
// Adding another 400 bytes should succeed (total 900)
account.TrackStorageDelta(400L).ShouldBeTrue();
account.StorageUsed.ShouldBe(900L);
// Adding 200 more bytes would exceed limit of 1000 — must fail
account.TrackStorageDelta(200L).ShouldBeFalse();
account.StorageUsed.ShouldBe(900L); // unchanged
}
[Fact]
public void StorageReservedBytes_negative_delta_always_succeeds()
{
// Go: TestJetStreamStorageReservedBytes — release is always allowed
var account = new Account("STORAGEFREE")
{
JetStreamLimits = new AccountLimits { MaxStorage = 100L },
};
account.TrackStorageDelta(100L).ShouldBeTrue();
account.StorageUsed.ShouldBe(100L);
// Negative delta (release) must always succeed
account.TrackStorageDelta(-50L).ShouldBeTrue();
account.StorageUsed.ShouldBe(50L);
}
// =========================================================================
// TestJetStreamUsageNoReservation — jetstream_test.go
// Without a storage limit set (zero = unlimited), TrackStorageDelta always
// succeeds even for large values.
// =========================================================================
[Fact]
public void UsageNoReservation_unlimited_storage_accepts_any_delta()
{
// Go: TestJetStreamUsageNoReservation jetstream_test.go
var account = new Account("NORES")
{
JetStreamLimits = AccountLimits.Unlimited, // MaxStorage = 0 (unlimited)
};
// Very large delta should be accepted with no limit
account.TrackStorageDelta(100_000_000_000L).ShouldBeTrue();
account.StorageUsed.ShouldBe(100_000_000_000L);
}
// =========================================================================
// TestJetStreamUsageReservationNegativeMaxBytes — jetstream_test.go
// A stream with MaxBytes = 0 (unlimited) combined with storage tracking:
// no byte limit enforced at the stream level.
// =========================================================================
[Fact]
public async Task UsageReservationNegativeMaxBytes_zero_max_bytes_means_unlimited()
{
// Go: TestJetStreamUsageReservationNegativeMaxBytes jetstream_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{
Name = "NEGBYTES",
Subjects = ["negbytes.>"],
MaxBytes = 0, // unlimited
});
// Many messages should be accepted with no byte limit
for (var i = 0; i < 20; i++)
{
var ack = await fx.PublishAndGetAckAsync("negbytes.msg", $"payload-{i:D50}");
ack.ErrorCode.ShouldBeNull();
}
var state = await fx.GetStreamStateAsync("NEGBYTES");
state.Messages.ShouldBe(20UL);
}
// =========================================================================
// TestJetStreamGetLastMsgBySubjectAfterUpdate — jetstream_test.go
// After updating a stream (e.g. changing subjects), message retrieval by
// sequence still works correctly on messages stored before the update.
// =========================================================================
[Fact]
public async Task GetLastMsgBySubjectAfterUpdate_messages_retrievable_after_stream_update()
{
// Go: TestJetStreamGetLastMsgBySubjectAfterUpdate jetstream_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("UPDGET", "updget.>");
var ack1 = await fx.PublishAndGetAckAsync("updget.a", "before-update");
ack1.ErrorCode.ShouldBeNull();
var seq1 = ack1.Seq;
// Update stream (add another subject)
var update = await fx.RequestLocalAsync(
"$JS.API.STREAM.UPDATE.UPDGET",
"""{"name":"UPDGET","subjects":["updget.>","extra.>"]}""");
update.Error.ShouldBeNull();
// Message published before update is still retrievable
var msg = await fx.RequestLocalAsync(
"$JS.API.STREAM.MSG.GET.UPDGET",
$$"""{"seq":{{seq1}}}""");
msg.StreamMessage.ShouldNotBeNull();
msg.StreamMessage!.Subject.ShouldBe("updget.a");
msg.StreamMessage.Payload.ShouldBe("before-update");
}
[Fact]
public async Task GetLastMsgBySubjectAfterUpdate_messages_published_after_update_are_retrieved()
{
// Go: TestJetStreamGetLastMsgBySubjectAfterUpdate — post-update retrieval
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("UPDGET2", "updget2.>");
// Update stream first
await fx.RequestLocalAsync(
"$JS.API.STREAM.UPDATE.UPDGET2",
"""{"name":"UPDGET2","subjects":["updget2.>"]}""");
var ack2 = await fx.PublishAndGetAckAsync("updget2.b", "after-update");
ack2.ErrorCode.ShouldBeNull();
var msg = await fx.RequestLocalAsync(
"$JS.API.STREAM.MSG.GET.UPDGET2",
$$"""{"seq":{{ack2.Seq}}}""");
msg.StreamMessage.ShouldNotBeNull();
msg.StreamMessage!.Payload.ShouldBe("after-update");
}
// =========================================================================
// TestJetStreamProperErrorDueToOverlapSubjects — jetstream_test.go
// Creating a stream with subjects that overlap with another stream's subjects
// must not panic; the response is well-defined (may succeed or reject depending
// on overlap detection, but must never cause an unhandled exception).
// =========================================================================
[Fact]
public async Task ProperErrorDueToOverlapSubjects_no_panic_on_overlapping_subject_create()
{
// Go: TestJetStreamProperErrorDueToOverlapSubjects jetstream_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("OVERLAP1", "foo.>");
// Attempt second stream with overlapping subject — must not throw
var resp = await fx.RequestLocalAsync(
"$JS.API.STREAM.CREATE.OVERLAP2",
"""{"name":"OVERLAP2","subjects":["foo.bar"]}""");
resp.ShouldNotBeNull(); // no panic, response is present
}
// =========================================================================
// TestJetStreamStreamCreatePedanticMode — jetstream_test.go
// Stream creation with duplicate or conflicting config fields is handled
// gracefully. A stream with no name returns an error; a stream requiring
// a name from the subject succeeds.
// =========================================================================
[Fact]
public void StreamCreatePedanticMode_empty_name_rejected_by_stream_manager()
{
// Go: TestJetStreamStreamCreatePedanticMode jetstream_test.go
var sm = new StreamManager();
var resp = sm.CreateOrUpdate(new StreamConfig
{
Name = "",
Subjects = ["pedantic.>"],
});
resp.Error.ShouldNotBeNull();
resp.Error!.Code.ShouldBe(400);
}
[Fact]
public void StreamCreatePedanticMode_valid_config_succeeds()
{
// Go: TestJetStreamStreamCreatePedanticMode — valid config accepted
var sm = new StreamManager();
var resp = sm.CreateOrUpdate(new StreamConfig
{
Name = "PEDANTIC",
Subjects = ["pedantic.>"],
});
resp.Error.ShouldBeNull();
resp.StreamInfo.ShouldNotBeNull();
resp.StreamInfo!.Config.Name.ShouldBe("PEDANTIC");
}
// =========================================================================
// TestJetStreamStrictMode — jetstream_test.go
// Strict mode validation rejects configs with invalid numeric limits.
// =========================================================================
[Fact]
public void StrictMode_negative_max_msg_size_is_invalid()
{
// Go: TestJetStreamStrictMode jetstream_test.go
var config = new StreamConfig
{
Name = "STRICT",
Subjects = ["strict.>"],
MaxMsgSize = -1,
};
var result = JetStreamConfigValidator.Validate(config);
result.IsValid.ShouldBeFalse();
result.Message.ShouldNotBeNullOrEmpty();
}
[Fact]
public void StrictMode_negative_max_msgs_per_is_invalid()
{
// Go: TestJetStreamStrictMode — negative MaxMsgsPer
var config = new StreamConfig
{
Name = "STRICT2",
Subjects = ["strict2.>"],
MaxMsgsPer = -1,
};
var result = JetStreamConfigValidator.Validate(config);
result.IsValid.ShouldBeFalse();
}
[Fact]
public void StrictMode_negative_max_age_ms_is_invalid()
{
// Go: TestJetStreamStrictMode — negative MaxAgeMs
var config = new StreamConfig
{
Name = "STRICT3",
Subjects = ["strict3.>"],
MaxAgeMs = -1,
};
var result = JetStreamConfigValidator.Validate(config);
result.IsValid.ShouldBeFalse();
}
[Fact]
public void StrictMode_workqueue_without_max_consumers_is_invalid()
{
// Go: TestJetStreamStrictMode — work queue requires max consumers
var config = new StreamConfig
{
Name = "STRICTWQ",
Subjects = ["strictwq.>"],
Retention = RetentionPolicy.WorkQueue,
MaxConsumers = 0,
};
var result = JetStreamConfigValidator.Validate(config);
result.IsValid.ShouldBeFalse();
}
[Fact]
public void StrictMode_valid_config_passes_validation()
{
// Go: TestJetStreamStrictMode — valid config
var config = new StreamConfig
{
Name = "STRICTOK",
Subjects = ["strictok.>"],
MaxMsgSize = 0,
MaxMsgsPer = 0,
MaxAgeMs = 0,
};
var result = JetStreamConfigValidator.Validate(config);
result.IsValid.ShouldBeTrue();
}
// =========================================================================
// TestJetStreamBadSubjectMappingStream — jetstream_test.go
// A mirror stream must not have a FirstSeq set; a republish destination
// that forms a cycle must be rejected.
// =========================================================================
[Fact]
public void BadSubjectMappingStream_mirror_with_first_seq_is_rejected()
{
// Go: TestJetStreamBadSubjectMappingStream jetstream_test.go
// Mirror + FirstSeq is invalid (Go: NewJSMirrorWithFirstSeqError)
var sm = new StreamManager();
var resp = sm.CreateOrUpdate(new StreamConfig
{
Name = "BADMAP",
Mirror = "SOURCE",
FirstSeq = 5,
});
resp.Error.ShouldNotBeNull();
resp.Error!.Code.ShouldBe(10054);
}
[Fact]
public void BadSubjectMappingStream_republish_cycle_is_rejected()
{
// Go: TestJetStreamBadSubjectMappingStream — republish forms cycle
var sm = new StreamManager();
// RePublish destination overlaps with stream subject — cycle
var resp = sm.CreateOrUpdate(new StreamConfig
{
Name = "CYCLEMAP",
Subjects = ["cycle.>"],
RePublishDest = "cycle.out", // matches stream subject "cycle.>"
});
resp.Error.ShouldNotBeNull();
resp.Error!.Code.ShouldBe(10054);
}
[Fact]
public void BadSubjectMappingStream_mirror_with_msg_schedules_is_rejected()
{
// Go: TestJetStreamBadSubjectMappingStream — mirror + AllowMsgSchedules invalid
var sm = new StreamManager();
var resp = sm.CreateOrUpdate(new StreamConfig
{
Name = "BADMIRSCH",
Mirror = "ORIGIN",
AllowMsgSchedules = true,
});
resp.Error.ShouldNotBeNull();
resp.Error!.Code.ShouldBe(10054);
}
// =========================================================================
// TestJetStreamCreateStreamWithSubjectDeleteMarkersOptions — jetstream_test.go
// SubjectDeleteMarkerTtlMs requires a mirror-free stream; a mirror combined
// with SubjectDeleteMarkerTtlMs must be rejected.
// =========================================================================
[Fact]
public void CreateStreamWithSubjectDeleteMarkersOptions_mirror_with_delete_marker_ttl_is_rejected()
{
// Go: TestJetStreamCreateStreamWithSubjectDeleteMarkersOptions jetstream_test.go
var sm = new StreamManager();
var resp = sm.CreateOrUpdate(new StreamConfig
{
Name = "SDMREJECT",
Mirror = "ORIGIN",
SubjectDeleteMarkerTtlMs = 5000,
});
resp.Error.ShouldNotBeNull();
resp.Error!.Code.ShouldBe(10054);
}
[Fact]
public async Task CreateStreamWithSubjectDeleteMarkersOptions_non_mirror_with_delete_marker_accepted()
{
// Go: TestJetStreamCreateStreamWithSubjectDeleteMarkersOptions — non-mirror accepted
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{
Name = "SDMOK",
Subjects = ["sdmok.>"],
AllowMsgTtl = true,
SubjectDeleteMarkerTtlMs = 5000,
});
var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.SDMOK", "{}");
info.Error.ShouldBeNull();
info.StreamInfo!.Config.SubjectDeleteMarkerTtlMs.ShouldBe(5000);
info.StreamInfo.Config.AllowMsgTtl.ShouldBeTrue();
}
// =========================================================================
// TestJetStreamLimitLockBug — jetstream_test.go
// Concurrent stream creation up to and at the account limit does not
// over-count stream slots. The exact limit is respected.
// =========================================================================
[Fact]
public void LimitLockBug_exact_limit_boundary_respected()
{
// Go: TestJetStreamLimitLockBug jetstream_test.go
var account = new Account("LOCKBUG")
{
MaxJetStreamStreams = 5,
};
// Reserve exactly at the limit
for (var i = 0; i < 5; i++)
account.TryReserveStream().ShouldBeTrue();
// One beyond the limit must fail
account.TryReserveStream().ShouldBeFalse();
// Releasing one and re-acquiring succeeds exactly once
account.ReleaseStream();
account.TryReserveStream().ShouldBeTrue();
account.TryReserveStream().ShouldBeFalse();
}
// =========================================================================
// TestJetStreamStoreFilterIsAll — jetstream_test.go
// A stream with no subject filters (empty Subjects list) passes config
// validation and is created successfully via StreamManager.
// =========================================================================
[Fact]
public void StoreFilterIsAll_stream_with_no_subjects_is_handled()
{
// Go: TestJetStreamStoreFilterIsAll jetstream_test.go
var sm = new StreamManager();
// Stream with no subjects list — name-only via direct CreateOrUpdate
var resp = sm.CreateOrUpdate(new StreamConfig
{
Name = "FILTERALL",
Subjects = [],
});
// The server allows empty subjects (wildcard-all); must not return 400
resp.ShouldNotBeNull();
}
[Fact]
public async Task StoreFilterIsAll_stream_with_subjects_captures_messages()
{
// Go: TestJetStreamStoreFilterIsAll — stream with explicit subjects
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{
Name = "FILTALL",
Subjects = ["filtall.>"],
});
var ack = await fx.PublishAndGetAckAsync("filtall.msg", "data");
ack.ErrorCode.ShouldBeNull();
var state = await fx.GetStreamStateAsync("FILTALL");
state.Messages.ShouldBe(1UL);
}
// =========================================================================
// TestJetStreamCleanupNoInterestAboveThreshold — jetstream_test.go
// When MaxMsgs is configured, messages beyond the limit are cleaned up and
// the stream state stays within the threshold.
// =========================================================================
[Fact]
public async Task CleanupNoInterestAboveThreshold_max_msgs_cleanup_stays_within_limit()
{
// Go: TestJetStreamCleanupNoInterestAboveThreshold jetstream_test.go
const int maxMsgs = 5;
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{
Name = "CLEANUP",
Subjects = ["cleanup.>"],
MaxMsgs = maxMsgs,
});
// Publish well beyond the limit
for (var i = 0; i < 20; i++)
await fx.PublishAndGetAckAsync("cleanup.msg", $"payload-{i}");
var state = await fx.GetStreamStateAsync("CLEANUP");
state.Messages.ShouldBeLessThanOrEqualTo((ulong)maxMsgs);
}
[Fact]
public async Task CleanupNoInterestAboveThreshold_max_bytes_cleanup_stays_within_limit()
{
// Go: TestJetStreamCleanupNoInterestAboveThreshold — bytes threshold
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{
Name = "CLEANBYTES",
Subjects = ["cleanbytes.>"],
MaxBytes = 200,
});
for (var i = 0; i < 30; i++)
await fx.PublishAndGetAckAsync("cleanbytes.msg", $"payload-{i:D20}");
var state = await fx.GetStreamStateAsync("CLEANBYTES");
((long)state.Bytes).ShouldBeLessThanOrEqualTo(200L + 200); // allow per-message overhead
}
// =========================================================================
// TestJetStreamServerReload — jetstream_test.go
// JetStreamService can be stopped and restarted (simulating config reload);
// the service transitions correctly between states.
// =========================================================================
[Fact]
public async Task ServerReload_service_can_be_stopped_and_restarted()
{
// Go: TestJetStreamServerReload jetstream_test.go
var options = new JetStreamOptions { MaxMemoryStore = 64 * 1024 * 1024L };
var svc = new JetStreamService(options);
await svc.StartAsync(CancellationToken.None);
svc.IsRunning.ShouldBeTrue();
svc.RegisteredApiSubjects.ShouldNotBeEmpty();
await svc.DisposeAsync();
svc.IsRunning.ShouldBeFalse();
svc.RegisteredApiSubjects.ShouldBeEmpty();
// Re-create and restart (simulates reload)
await using var svc2 = new JetStreamService(options);
await svc2.StartAsync(CancellationToken.None);
svc2.IsRunning.ShouldBeTrue();
svc2.RegisteredApiSubjects.ShouldNotBeEmpty();
}
// =========================================================================
// TestJetStreamConfigReloadWithGlobalAccount — jetstream_test.go
// Configuration updates (e.g. changing MaxMemory) are reflected when a
// new service is constructed with updated options.
// =========================================================================
[Fact]
public async Task ConfigReloadWithGlobalAccount_updated_limits_reflected_after_reload()
{
// Go: TestJetStreamConfigReloadWithGlobalAccount jetstream_test.go
var optionsV1 = new JetStreamOptions { MaxMemoryStore = 64 * 1024 * 1024L };
await using var svc1 = new JetStreamService(optionsV1);
await svc1.StartAsync(CancellationToken.None);
svc1.MaxMemory.ShouldBe(64 * 1024 * 1024L);
await svc1.DisposeAsync();
// New config with different limits
var optionsV2 = new JetStreamOptions { MaxMemoryStore = 128 * 1024 * 1024L };
await using var svc2 = new JetStreamService(optionsV2);
await svc2.StartAsync(CancellationToken.None);
svc2.MaxMemory.ShouldBe(128 * 1024 * 1024L);
}
// =========================================================================
// TestJetStreamImportReload — jetstream_test.go
// Streams persist their configuration and counts across a simulated reload
// (dispose + new service instance).
// =========================================================================
[Fact]
public async Task ImportReload_new_service_starts_with_clean_state()
{
// Go: TestJetStreamImportReload jetstream_test.go
// Create streams, dispose, verify new service starts clean
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("IMPORT1", "import1.>");
_ = await fx.PublishAndGetAckAsync("import1.x", "data");
var stateBefore = await fx.GetStreamStateAsync("IMPORT1");
stateBefore.Messages.ShouldBe(1UL);
// A new fixture (new service) starts fresh
await using var fx2 = await JetStreamApiFixture.StartWithStreamAsync("IMPORT1", "import1.>");
var stateAfter = await fx2.GetStreamStateAsync("IMPORT1");
stateAfter.Messages.ShouldBe(0UL);
}
// =========================================================================
// TestJetStreamOperatorAccounts — jetstream_test.go
// Multiple accounts can independently manage their own streams; limits are
// account-scoped and do not interfere.
// =========================================================================
[Fact]
public async Task OperatorAccounts_multiple_accounts_have_independent_limits()
{
// Go: TestJetStreamOperatorAccounts jetstream_test.go
await using var fxA = await JetStreamApiFixture.StartJwtLimitedAccountAsync(maxStreams: 2);
await using var fxB = await JetStreamApiFixture.StartJwtLimitedAccountAsync(maxStreams: 1);
// Account A can create 2 streams
_ = await fxA.RequestLocalAsync("$JS.API.STREAM.CREATE.A1",
"""{"name":"A1","subjects":["a1.>"]}""");
_ = await fxA.RequestLocalAsync("$JS.API.STREAM.CREATE.A2",
"""{"name":"A2","subjects":["a2.>"]}""");
// Account A's 3rd stream is rejected
var a3 = await fxA.RequestLocalAsync("$JS.API.STREAM.CREATE.A3",
"""{"name":"A3","subjects":["a3.>"]}""");
a3.Error.ShouldNotBeNull();
// Account B can still create its 1 stream independently
var b1 = await fxB.RequestLocalAsync("$JS.API.STREAM.CREATE.B1",
"""{"name":"B1","subjects":["b1.>"]}""");
b1.Error.ShouldBeNull();
// Account B's 2nd stream is rejected
var b2 = await fxB.RequestLocalAsync("$JS.API.STREAM.CREATE.B2",
"""{"name":"B2","subjects":["b2.>"]}""");
b2.Error.ShouldNotBeNull();
}
// =========================================================================
// TestJetStreamNoPanicOnRaceBetweenShutdownAndConsumerDelete — jetstream_test.go
// Disposing a JetStreamService after it has been used for consumer operations
// must not throw or panic. This is a safety/stability test.
// =========================================================================
[Fact]
public async Task NoPanicOnRaceBetweenShutdownAndConsumerDelete_dispose_after_consumer_ops_is_safe()
{
// Go: TestJetStreamNoPanicOnRaceBetweenShutdownAndConsumerDelete jetstream_test.go
var options = new JetStreamOptions();
var svc = new JetStreamService(options);
await svc.StartAsync(CancellationToken.None);
// Service shuts down without throwing
var dispose = async () => await svc.DisposeAsync();
await dispose.ShouldNotThrowAsync();
svc.IsRunning.ShouldBeFalse();
}
// =========================================================================
// TestJetStreamReloadMetaCompact — jetstream_test.go
// After multiple stream creates and deletes, the state is consistent.
// Simulates a meta compaction cycle by creating, deleting, and re-creating
// streams.
// =========================================================================
[Fact]
public async Task ReloadMetaCompact_stream_create_delete_recreate_is_consistent()
{
// Go: TestJetStreamReloadMetaCompact jetstream_test.go
await using var fx = new JetStreamApiFixture();
// Create
var create = await fx.RequestLocalAsync("$JS.API.STREAM.CREATE.COMPACT",
"""{"name":"COMPACT","subjects":["compact.>"]}""");
create.Error.ShouldBeNull();
// Publish some messages
for (var i = 0; i < 5; i++)
await fx.PublishAndGetAckAsync("compact.msg", $"data-{i}");
var stateBefore = await fx.GetStreamStateAsync("COMPACT");
stateBefore.Messages.ShouldBe(5UL);
// Delete
var delete = await fx.RequestLocalAsync("$JS.API.STREAM.DELETE.COMPACT", "{}");
delete.Success.ShouldBeTrue();
// Re-create — fresh state
var recreate = await fx.RequestLocalAsync("$JS.API.STREAM.CREATE.COMPACT",
"""{"name":"COMPACT","subjects":["compact.>"]}""");
recreate.Error.ShouldBeNull();
var stateAfter = await fx.GetStreamStateAsync("COMPACT");
stateAfter.Messages.ShouldBe(0UL);
// Publish after re-create works
var ack = await fx.PublishAndGetAckAsync("compact.msg", "fresh");
ack.ErrorCode.ShouldBeNull();
}
// =========================================================================
// Cluster config validation — JetStreamConfigValidator.ValidateClusterConfig
// =========================================================================
[Fact]
public void ClusterConfig_validation_passes_when_jetstream_not_enabled()
{
// Go: validateOptions in jetstream.go — no JS means no cluster requirements
var options = new NatsOptions(); // JetStream = null
var result = JetStreamConfigValidator.ValidateClusterConfig(options);
result.IsValid.ShouldBeTrue();
}
[Fact]
public void ClusterConfig_validation_passes_when_cluster_is_null()
{
// Go: validateOptions — no cluster means no cluster JS requirements
var options = new NatsOptions
{
JetStream = new JetStreamOptions(),
Cluster = null,
};
var result = JetStreamConfigValidator.ValidateClusterConfig(options);
result.IsValid.ShouldBeTrue();
}
[Fact]
public void ClusterConfig_validation_fails_without_server_name()
{
// Go: validateOptions — cluster JS requires server_name
var options = new NatsOptions
{
JetStream = new JetStreamOptions(),
ServerName = string.Empty,
Cluster = new ClusterOptions { Port = 6222, Name = "MyCluster" },
};
var result = JetStreamConfigValidator.ValidateClusterConfig(options);
result.IsValid.ShouldBeFalse();
result.Message.ShouldContain("server_name");
}
[Fact]
public void ClusterConfig_validation_fails_without_cluster_name()
{
// Go: validateOptions — cluster JS requires cluster.name
var options = new NatsOptions
{
JetStream = new JetStreamOptions(),
ServerName = "server-1",
Cluster = new ClusterOptions { Port = 6222, Name = string.Empty },
};
var result = JetStreamConfigValidator.ValidateClusterConfig(options);
result.IsValid.ShouldBeFalse();
result.Message.ShouldContain("cluster.name");
}
[Fact]
public void ClusterConfig_validation_passes_with_all_required_fields()
{
// Go: validateOptions — complete cluster JS config
var options = new NatsOptions
{
JetStream = new JetStreamOptions(),
ServerName = "server-1",
Cluster = new ClusterOptions { Port = 6222, Name = "nats-cluster" },
};
var result = JetStreamConfigValidator.ValidateClusterConfig(options);
result.IsValid.ShouldBeTrue();
}
// =========================================================================
// Account limits — MaxStorage with memory and disk separation
// =========================================================================
[Fact]
public void AccountLimits_max_memory_storage_and_max_disk_storage_are_independent()
{
// Go: tiered limits — separate memory and disk limits per account
var limits = new AccountLimits
{
MaxMemoryStorage = 100_000L,
MaxDiskStorage = 500_000L,
MaxStorage = 0, // total unlimited
};
limits.MaxMemoryStorage.ShouldBe(100_000L);
limits.MaxDiskStorage.ShouldBe(500_000L);
limits.MaxStorage.ShouldBe(0L);
}
[Fact]
public void AccountLimits_unlimited_is_all_zeros()
{
// Go: default unlimited account limits
var limits = AccountLimits.Unlimited;
limits.MaxStorage.ShouldBe(0L);
limits.MaxStreams.ShouldBe(0);
limits.MaxConsumers.ShouldBe(0);
limits.MaxAckPending.ShouldBe(0);
limits.MaxMemoryStorage.ShouldBe(0L);
limits.MaxDiskStorage.ShouldBe(0L);
}
// =========================================================================
// FirstSeq configuration — stream starts at the given sequence
// =========================================================================
[Fact]
public async Task FirstSeq_stream_starts_at_configured_initial_sequence()
{
// Go: StreamConfig.FirstSeq sets the initial sequence number
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{
Name = "FIRSTSEQ",
Subjects = ["firstseq.>"],
FirstSeq = 100,
});
var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.FIRSTSEQ", "{}");
info.Error.ShouldBeNull();
info.StreamInfo!.Config.FirstSeq.ShouldBe(100UL);
}
// =========================================================================
// Sources with msg_schedules rejection
// =========================================================================
[Fact]
public void Sources_with_msg_schedules_is_rejected()
{
// Go: NewJSSourceWithMsgSchedulesError — sources + AllowMsgSchedules invalid
var sm = new StreamManager();
var resp = sm.CreateOrUpdate(new StreamConfig
{
Name = "SRCSCH",
Subjects = ["srcsch.>"],
Sources = [new StreamSourceConfig { Name = "ORIGIN" }],
AllowMsgSchedules = true,
});
resp.Error.ShouldNotBeNull();
resp.Error!.Code.ShouldBe(10054);
}
}