using System.Diagnostics; using Shouldly; using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamClusterLongTests { [Fact] // T:1219 public void LongFileStoreEnforceMsgPerSubjectLimit_ShouldSucceed() { var root = Path.Combine(Path.GetTempPath(), $"impl-fs-long-{Guid.NewGuid():N}"); Directory.CreateDirectory(root); JetStreamFileStore? fs = null; try { fs = JetStreamFileStore.NewFileStore( new FileStoreConfig { StoreDir = root, BlockSize = 1024 }, new StreamConfig { Name = "zzz", Storage = StorageType.FileStorage, Subjects = ["test.>"], MaxMsgsPer = 1, MaxMsgs = -1, MaxBytes = -1, Discard = DiscardPolicy.DiscardOld, Retention = RetentionPolicy.LimitsPolicy, }); const int keys = 4_000; const int rewriteCount = 30_000; for (var i = 0; i < keys; i++) { fs.StoreMsg($"test.{i:000000}", null, "seed"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); } var sw = Stopwatch.StartNew(); for (var i = 0; i < rewriteCount; i++) { var n = Random.Shared.Next(keys); fs.StoreMsg($"test.{n:000000}", null, "rewrite"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); } sw.Stop(); var totals = fs.SubjectsTotals("test.*"); totals.Count.ShouldBe(keys); totals.Values.All(v => v <= 1UL).ShouldBeTrue(); sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(30)); } finally { fs?.Stop(); if (Directory.Exists(root)) Directory.Delete(root, recursive: true); } } }