// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0

using Shouldly;
using ZB.MOM.NatsNet.Server;

namespace ZB.MOM.NatsNet.Server.Tests.JetStream;

/// <summary>
/// Tests for <see cref="JetStreamFileStore"/>. Each test creates its own uniquely named
/// directory under the system temp path and deletes it when the test finishes.
/// </summary>
public sealed class JetStreamFileStoreTests
{
    /// <summary>
    /// Stores one message in a stream configured with <c>MaxAge</c> and
    /// <c>SubjectDeleteMarkerTTL</c> of one second, removes it by sequence, and checks
    /// that the store then reports zero messages.
    /// </summary>
    [Fact]
    public void FileStoreSubjectDeleteMarkers_ShouldSucceed()
    {
        var root = Path.Combine(Path.GetTempPath(), $"fs-sdm-{Guid.NewGuid():N}");
        Directory.CreateDirectory(root);
        try
        {
            var fs = new JetStreamFileStore(
                new FileStoreConfig { StoreDir = root },
                new FileStreamInfo
                {
                    Created = DateTime.UtcNow,
                    Config = new StreamConfig
                    {
                        Name = "SDM",
                        Storage = StorageType.FileStorage,
                        Subjects = ["test"],
                        MaxAge = TimeSpan.FromSeconds(1),
                        AllowMsgTTL = true,
                        SubjectDeleteMarkerTTL = TimeSpan.FromSeconds(1),
                    },
                });
            try
            {
                var (seq, _) = fs.StoreMsg("test", null, [1], 0);
                seq.ShouldBe(1UL);

                var (removed, err) = fs.RemoveMsg(seq);
                removed.ShouldBeTrue();
                err.ShouldBeNull();

                fs.State().Msgs.ShouldBe(0UL);
            }
            finally
            {
                // Stop the store even when an assertion fails, so it is always
                // stopped before the directory is deleted below.
                fs.Stop();
            }
        }
        finally
        {
            Directory.Delete(root, recursive: true);
        }
    }

    /// <summary>
    /// Stores one message carrying a <c>JsMessageTtl</c> header, stops the store, then
    /// opens a second store over the same directory and reads its state.
    /// </summary>
    /// <remarks>
    /// NOTE(review): despite the name, nothing in this test corrupts a block. The final
    /// assertion compares the message count against an unsigned zero, so it appears to be
    /// trivially true; the effective check is that reopening and reading state do not throw.
    /// </remarks>
    [Fact]
    public void FileStoreNoPanicOnRecoverTTLWithCorruptBlocks_ShouldSucceed()
    {
        var root = Path.Combine(Path.GetTempPath(), $"fs-ttl-{Guid.NewGuid():N}");
        Directory.CreateDirectory(root);
        try
        {
            var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMessageTtl, "1");

            // Both the original and the reopened store use the same stream settings.
            Action<StreamConfig> withTtl = cfg =>
            {
                cfg.AllowMsgTTL = true;
                cfg.Subjects = ["foo"];
            };

            var fs = NewStore(root, withTtl);
            try
            {
                fs.StoreMsg("foo", hdr, [1], 1).Seq.ShouldBe(1UL);
            }
            finally
            {
                fs.Stop();
            }

            var reopened = NewStore(root, withTtl);
            try
            {
                reopened.State().Msgs.ShouldBeGreaterThanOrEqualTo(0UL);
            }
            finally
            {
                reopened.Stop();
            }
        }
        finally
        {
            Directory.Delete(root, recursive: true);
        }
    }

    /// <summary>
    /// Stores ten messages that carry schedule pattern and schedule target headers,
    /// purges the store, and checks that ten messages were purged and none remain.
    /// </summary>
    [Fact]
    public void FileStorePurgeMsgBlockRemovesSchedules_ShouldSucceed()
    {
        var root = Path.Combine(Path.GetTempPath(), $"fs-purge-sched-{Guid.NewGuid():N}");
        Directory.CreateDirectory(root);
        try
        {
            var fs = NewStore(root, cfg =>
            {
                cfg.AllowMsgSchedules = true;
                cfg.Subjects = ["foo.*"];
            });
            try
            {
                var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsSchedulePattern, "@every 10s");
                hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduleTarget, "foo.target");

                for (var i = 0; i < 10; i++)
                {
                    fs.StoreMsg($"foo.schedule.{i}", hdr, [1], 0);
                }

                var (purged, err) = fs.Purge();
                err.ShouldBeNull();
                purged.ShouldBe(10UL);
                fs.State().Msgs.ShouldBe(0UL);
            }
            finally
            {
                fs.Stop();
            }
        }
        finally
        {
            Directory.Delete(root, recursive: true);
        }
    }

    /// <summary>
    /// Exercises store, load, subject lookup, remove, purge, snapshot and utilization
    /// against a single store holding two messages on subjects "foo" and "bar".
    /// </summary>
    [Fact]
    public void StoreMsg_LoadAndPurge_ShouldRoundTrip()
    {
        var root = Path.Combine(Path.GetTempPath(), $"fs-{Guid.NewGuid():N}");
        Directory.CreateDirectory(root);
        try
        {
            var fs = NewStore(root);
            try
            {
                var (seq1, _) = fs.StoreMsg("foo", [1], [2, 3], 0);
                var (seq2, _) = fs.StoreMsg("bar", null, [4, 5], 0);
                seq1.ShouldBe(1UL);
                seq2.ShouldBe(2UL);
                fs.State().Msgs.ShouldBe(2UL);

                var msg = fs.LoadMsg(1, null);
                msg.ShouldNotBeNull();
                msg!.Subject.ShouldBe("foo");

                fs.SubjectForSeq(2).Subject.ShouldBe("bar");
                fs.SubjectsTotals(string.Empty).Count.ShouldBe(2);

                var (removed, remErr) = fs.RemoveMsg(1);
                removed.ShouldBeTrue();
                remErr.ShouldBeNull();
                fs.State().Msgs.ShouldBe(1UL);

                var (purged, purgeErr) = fs.Purge();
                purgeErr.ShouldBeNull();
                purged.ShouldBe(1UL);
                fs.State().Msgs.ShouldBe(0UL);

                var (snapshot, snapErr) = fs.Snapshot(TimeSpan.FromSeconds(1), includeConsumers: false, checkMsgs: false);
                snapErr.ShouldBeNull();
                snapshot.ShouldNotBeNull();
                snapshot!.Reader.ShouldNotBeNull();

                var (total, reported, utilErr) = fs.Utilization();
                utilErr.ShouldBeNull();
                total.ShouldBe(reported);
            }
            finally
            {
                fs.Stop();
            }
        }
        finally
        {
            Directory.Delete(root, recursive: true);
        }
    }

    /// <summary>
    /// Creates a <see cref="JetStreamFileStore"/> rooted at <paramref name="root"/> for a
    /// file-storage stream named "S" with subjects "foo" and "bar".
    /// </summary>
    /// <param name="root">Directory passed to the store as <c>StoreDir</c>.</param>
    /// <param name="configure">
    /// Optional callback invoked with the default <see cref="StreamConfig"/> before the
    /// store is constructed, allowing a test to override individual settings.
    /// </param>
    /// <returns>The newly constructed store; the caller is responsible for stopping it.</returns>
    private static JetStreamFileStore NewStore(string root, Action<StreamConfig>? configure = null)
    {
        var config = new StreamConfig
        {
            Name = "S",
            Storage = StorageType.FileStorage,
            Subjects = ["foo", "bar"],
        };

        configure?.Invoke(config);

        return new JetStreamFileStore(
            new FileStoreConfig { StoreDir = root },
            new FileStreamInfo
            {
                Created = DateTime.UtcNow,
                Config = config,
            });
    }
}