using System.Collections.Concurrent; using System.Diagnostics; using System.Reflection; using Shouldly; using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests2 { [Fact] // T:2505 public void NoRaceStoreReverseWalkWithDeletesPerf_ShouldSucceed() { var root = NewRoot(); Directory.CreateDirectory(root); var fileCfg = new StreamConfig { Name = "zzz", Subjects = ["foo.*"], Storage = StorageType.FileStorage, MaxMsgs = -1, MaxBytes = -1, MaxAge = TimeSpan.Zero, MaxMsgsPer = -1, Discard = DiscardPolicy.DiscardOld, Retention = RetentionPolicy.LimitsPolicy, }; var memCfg = fileCfg.Clone(); memCfg.Storage = StorageType.MemoryStorage; var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, fileCfg); var ms = JetStreamMemStore.NewMemStore(memCfg); try { var msg = "Hello"u8.ToArray(); foreach (var store in new IStreamStore[] { fs, ms }) { store.StoreMsg("foo.A", null, msg, 0).Seq.ShouldBeGreaterThan(0UL); for (var i = 0; i < 150_000; i++) store.StoreMsg("foo.B", null, msg, 0); store.StoreMsg("foo.C", null, msg, 0); var state = store.State(); state.Msgs.ShouldBe(150_002UL); var (purged, purgeErr) = store.PurgeEx("foo.B", 1, 0); purgeErr.ShouldBeNull(); purged.ShouldBe(150_000UL); if (store is JetStreamFileStore fileStore) PreloadFileStoreCaches(fileStore); var timer = Stopwatch.StartNew(); var scratch = new StoreMsg(); for (var seq = state.LastSeq; seq > 0; seq--) { try { _ = store.LoadMsg(seq, scratch); } catch (Exception ex) when (ReferenceEquals(ex, StoreErrors.ErrStoreMsgNotFound)) { continue; } } timer.Stop(); var reverseWalkElapsed = timer.Elapsed; if (store is JetStreamFileStore fileStore2) PreloadFileStoreCaches(fileStore2); var seen = 0; var cursor = state.LastSeq; timer.Restart(); while (true) { var (sm, err) = store.LoadPrevMsg(cursor, scratch); if (err == StoreErrors.ErrStoreEOF) break; err.ShouldBeNull(); sm.ShouldNotBeNull(); cursor = sm!.Seq > 0 ? sm.Seq - 1 : 0; seen++; } timer.Stop(); seen.ShouldBe(2); if (store is JetStreamMemStore) { timer.Elapsed.ShouldBeLessThan(reverseWalkElapsed); } else { (timer.Elapsed.Ticks * 10L).ShouldBeLessThan(reverseWalkElapsed.Ticks); } } } finally { fs.Stop(); ms.Stop(); if (Directory.Exists(root)) Directory.Delete(root, recursive: true); } } [Fact] // T:2510 public void NoRaceFileStorePurgeExAsyncTombstones_ShouldSucceed() { var cfg = DefaultStreamConfig(); cfg.Subjects = ["*.*"]; WithStore((fs, _) => { var msg = "zzz"u8.ToArray(); fs.StoreMsg("foo.A", null, msg, 0); fs.StoreMsg("foo.B", null, msg, 0); for (var i = 0; i < 500; i++) fs.StoreMsg("foo.C", null, msg, 0); fs.StoreMsg("foo.D", null, msg, 0); PreloadFileStoreCaches(fs); var sw = Stopwatch.StartNew(); var (purgedOne, errOne) = fs.PurgeEx("foo.B", 0, 0); sw.Stop(); errOne.ShouldBeNull(); purgedOne.ShouldBe(1UL); var singleElapsed = sw.Elapsed; sw.Restart(); var (purgedMany, errMany) = fs.PurgeEx("foo.C", 0, 0); sw.Stop(); errMany.ShouldBeNull(); purgedMany.ShouldBe(500UL); var manyElapsed = sw.Elapsed; // Large subject purges should not degenerate to per-message sync behavior. var scaledSingle = Math.Max(1L, singleElapsed.Ticks) * 80L; scaledSingle.ShouldBeGreaterThan(manyElapsed.Ticks); }, cfg); } [Fact] // T:2491 public void NoRaceFileStoreMsgLoadNextMsgMultiPerf_ShouldSucceed() { WithStore((fs, _) => { for (var i = 0; i < 150; i++) fs.StoreMsg($"ln.{i % 6}", null, "x"u8.ToArray(), 0); var errors = new ConcurrentQueue(); Parallel.For(0, 400, _ => { try { var (sm, _) = fs.LoadNextMsgMulti(new[] { "ln.1", "ln.*" }, 1, null); if (sm != null) sm.Subject.ShouldStartWith("ln."); } catch (Exception ex) { errors.Enqueue(ex); } }); errors.ShouldBeEmpty(); fs.State().Msgs.ShouldBeGreaterThan(0UL); }); } [Fact] // T:2501 public void NoRaceFileStoreMsgLimitsAndOldRecoverState_ShouldSucceed() { var root = NewRoot(); Directory.CreateDirectory(root); try { var cfg = DefaultStreamConfig(maxMsgs: 60); var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); Parallel.For(0, 180, i => fs1.StoreMsg($"lm.{i % 4}", null, "x"u8.ToArray(), 0)); fs1.Stop(); var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg); var (seq, _) = fs2.StoreMsg("lm.tail", null, "tail"u8.ToArray(), 0); seq.ShouldBeGreaterThan(0UL); fs2.State().Msgs.ShouldBeLessThanOrEqualTo((ulong)cfg.MaxMsgs); fs2.Stop(); } finally { Directory.Delete(root, recursive: true); } } [Fact] // T:2476 public void NoRaceFilestoreBinaryStreamSnapshotEncodingLargeGaps_ShouldSucceed() { WithStore((fs, _) => { const int numMsgs = 5000; var payload = new byte[128]; fs.StoreMsg("zzz", null, payload, 0).Seq.ShouldBe(1UL); for (var i = 2; i < numMsgs; i++) { var (seq, _) = fs.StoreMsg("zzz", null, null, 0); seq.ShouldBeGreaterThan(1UL); fs.RemoveMsg(seq).Removed.ShouldBeTrue(); } fs.StoreMsg("zzz", null, payload, 0).Seq.ShouldBe((ulong)numMsgs); Should.NotThrow(() => InvokePrivate(fs, "SyncBlocks")); var (snapshot, err) = fs.EncodedStreamState(0); err.ShouldBeNull(); StoreParity.IsEncodedStreamState(snapshot).ShouldBeTrue(); snapshot.Length.ShouldBeLessThan(2048); var state = fs.State(); state.FirstSeq.ShouldBe(1UL); state.LastSeq.ShouldBe((ulong)numMsgs); state.Msgs.ShouldBe(2UL); state.NumDeleted.ShouldBe(numMsgs - 2); }); } [Fact] // T:2480 public void NoRaceFileStoreLargeMsgsAndFirstMatching_ShouldSucceed() { var cfg = DefaultStreamConfig(); cfg.Subjects = [">"]; WithStore((fs, _) => { for (var i = 0; i < 4_000; i++) fs.StoreMsg($"foo.bar.{i}", null, null, 0); for (var i = 0; i < 4_000; i++) fs.StoreMsg($"foo.baz.{i}", null, null, 0); var blocks = InvokePrivate(fs, "NumMsgBlocks"); blocks.ShouldBeGreaterThanOrEqualTo(1); var start = fs.State().FirstSeq; for (var seq = start; seq < start + 7_600; seq++) fs.RemoveMsg(seq).Removed.ShouldBeTrue(); var sw = System.Diagnostics.Stopwatch.StartNew(); var (sm, _) = fs.LoadNextMsg("*.baz.*", true, start, null); sw.Stop(); sm.ShouldNotBeNull(); sm!.Subject.ShouldContain(".baz."); sw.ElapsedMilliseconds.ShouldBeLessThan(50); }, cfg); } [Fact] // T:2494 public void NoRaceFileStoreWriteFullStateUniqueSubjects_ShouldSucceed() { var cfg = new StreamConfig { Name = "TEST", Storage = StorageType.FileStorage, Subjects = ["records.>"], MaxMsgs = -1, MaxBytes = 15L * 1024 * 1024 * 1024, MaxAge = TimeSpan.Zero, MaxMsgsPer = 1, Discard = DiscardPolicy.DiscardOld, Retention = RetentionPolicy.LimitsPolicy, }; WithStore((fs, root) => { var payload = Enumerable.Repeat((byte)'Z', 128).ToArray(); var errors = new ConcurrentQueue(); using var cts = new CancellationTokenSource(); var writer = Task.Run(async () => { while (!cts.Token.IsCancellationRequested) { try { var err = InvokePrivate(fs, "WriteFullState"); if (err != null) errors.Enqueue(err); } catch (Exception ex) { errors.Enqueue(ex); } try { await Task.Delay(10, cts.Token); } catch (OperationCanceledException) { break; } } }); for (var i = 0; i < 2_000; i++) { var subject = $"records.{Guid.NewGuid():N}.{i % 5}"; var sw = Stopwatch.StartNew(); fs.StoreMsg(subject, null, payload, 0).Seq.ShouldBeGreaterThan(0UL); sw.Stop(); sw.Elapsed.ShouldBeLessThan(TimeSpan.FromMilliseconds(500)); } cts.Cancel(); Should.NotThrow(() => writer.Wait(TimeSpan.FromSeconds(2))); errors.ShouldBeEmpty(); fs.Stop(); var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); File.Exists(stateFile).ShouldBeTrue(); new FileInfo(stateFile).Length.ShouldBeGreaterThan(0L); }, cfg); } [Fact] // T:2488 public void NoRaceJetStreamSnapshotsWithSlowAckDontSlowConsumer_ShouldSucceed() { var cfg = DefaultStreamConfig(); cfg.Subjects = ["snap.>"]; WithStore((fs, _) => { var errors = new ConcurrentQueue(); using var cts = new CancellationTokenSource(); var payload = "snapshot"u8.ToArray(); var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; var consumer = fs.ConsumerStore("snap-consumer", DateTime.UtcNow, new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); var slowAcker = Task.Run(async () => { for (ulong i = 1; i <= 100; i++) { try { consumer.UpdateDelivered(i, i, 1, ts + (long)i); await Task.Delay(2, cts.Token); } catch (OperationCanceledException) { break; } catch (Exception ex) { errors.Enqueue(ex); break; } } }); for (var i = 0; i < 100; i++) fs.StoreMsg($"snap.{i % 5}", null, payload, 0); var sw = Stopwatch.StartNew(); var (snapshot, err) = fs.Snapshot(TimeSpan.FromSeconds(2), includeConsumers: true, checkMsgs: true); sw.Stop(); err.ShouldBeNull(); snapshot.ShouldNotBeNull(); snapshot!.State.Msgs.ShouldBeGreaterThan(0UL); sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2)); using (snapshot.Reader) { } cts.Cancel(); Should.NotThrow(() => slowAcker.Wait(TimeSpan.FromSeconds(1))); errors.ShouldBeEmpty(); consumer.Stop(); }, cfg); } private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); Directory.CreateDirectory(root); JetStreamFileStore? fs = null; try { fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, cfg ?? DefaultStreamConfig()); action(fs, root); } finally { fs?.Stop(); if (Directory.Exists(root)) Directory.Delete(root, recursive: true); } } private static StreamConfig DefaultStreamConfig(long maxMsgs = -1) { return new StreamConfig { Name = "TEST", Storage = StorageType.FileStorage, Subjects = ["test.>"], MaxMsgs = maxMsgs, MaxBytes = -1, MaxAge = TimeSpan.Zero, MaxMsgsPer = -1, Discard = DiscardPolicy.DiscardOld, Retention = RetentionPolicy.LimitsPolicy, }; } private static void InvokePrivate(object target, string methodName, params object[] args) { var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); method.ShouldNotBeNull(); method!.Invoke(target, args); } private static T InvokePrivate(object target, string methodName, params object[] args) { var method = target.GetType().GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic); method.ShouldNotBeNull(); var result = method!.Invoke(target, args); if (result == null) return default!; return (T)result; } private static void PreloadFileStoreCaches(JetStreamFileStore fs) { var blksField = typeof(JetStreamFileStore).GetField("_blks", BindingFlags.Instance | BindingFlags.NonPublic); blksField.ShouldNotBeNull(); var blocks = blksField!.GetValue(fs) as System.Collections.IEnumerable; blocks.ShouldNotBeNull(); foreach (var mb in blocks!) { var load = mb!.GetType().GetMethod("LoadMsgs", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); load.ShouldNotBeNull(); var result = load!.Invoke(mb, []); if (result is Exception err) throw err; } } private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); }