diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index a812af5..c80f15a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -2300,6 +2300,92 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable }); } + // Lock should be held by caller. + private (ulong[] Seqs, Exception? Error) AllLastSeqsLocked() + { + if (_state.Msgs == 0 || NoTrackSubjects() || _psim == null) + return (Array.Empty(), null); + + var numSubjects = _psim.Size(); + if (numSubjects == 0) + return (Array.Empty(), null); + + var seqs = new List(numSubjects); + var seen = new HashSet(numSubjects, StringComparer.Ordinal); + + for (var i = _blks.Count - 1; i >= 0; i--) + { + if (seen.Count == numSubjects) + break; + + var mb = _blks[i]; + if (mb.Fss == null) + continue; + + mb.Fss.IterFast((subjectBytes, ss) => + { + var subject = Encoding.UTF8.GetString(subjectBytes); + if (!seen.Add(subject)) + return true; + + if (ss.LastNeedsUpdate) + RecalculateLastForSubject(subject, ss); + + seqs.Add(ss.Last); + return true; + }); + } + + seqs.Sort(); + return (seqs.ToArray(), null); + } + + // Lock should be held by caller. + private bool FilterIsAll(string[] filters) + { + ArgumentNullException.ThrowIfNull(filters); + + var streamSubjects = _cfg.Config.Subjects ?? []; + if (filters.Length != streamSubjects.Length) + return false; + + var sortedFilters = filters.ToArray(); + var sortedSubjects = streamSubjects.ToArray(); + Array.Sort(sortedFilters, StringComparer.Ordinal); + Array.Sort(sortedSubjects, StringComparer.Ordinal); + + for (var i = 0; i < sortedFilters.Length; i++) + { + if (!SubscriptionIndex.SubjectIsSubsetMatch(sortedSubjects[i], sortedFilters[i])) + return false; + } + + return true; + } + + // Lock should be held by caller. + private void RecalculateLastForSubject(string subject, SimpleState ss) + { + var subjectBytes = Encoding.UTF8.GetBytes(subject); + for (var i = _blks.Count - 1; i >= 0; i--) + { + var fss = _blks[i].Fss; + if (fss == null) + continue; + + var (candidate, ok) = fss.Find(subjectBytes); + if (!ok || candidate == null || candidate.Last == 0) + continue; + + ss.Last = candidate.Last; + ss.LastNeedsUpdate = false; + return; + } + + ss.Last = 0; + ss.LastNeedsUpdate = false; + } + // ----------------------------------------------------------------------- // IStreamStore — type / state // ----------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs index 852ecc9..0953873 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs @@ -309,6 +309,141 @@ public sealed class JetStreamFileStoreReadQueryTests } } + [Fact] + public void AllLastSeqsLocked_MultipleSubjects_ReturnsSortedLastSequences() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + SetField(fs, "_state", new StreamState { Msgs = 6UL }); + + var b1 = NewBlock(1, 1, 20); + b1.Fss = BuildSubjectStateTree(("foo.a", 2, 10, 10), ("foo.b", 2, 20, 20)); + var b2 = NewBlock(2, 21, 40); + b2.Fss = BuildSubjectStateTree(("foo.b", 1, 25, 25), ("foo.c", 1, 15, 15)); + + ConfigureBlocks(fs, b1, b2); + SetPsims(fs, ("foo.a", 1, 1, 2), ("foo.b", 1, 2, 3), ("foo.c", 2, 2, 1)); + + var (seqs, error) = InvokeAllLastSeqsLocked(fs); + + error.ShouldBeNull(); + seqs.ShouldBe([10UL, 15UL, 25UL]); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void AllLastSeqsLocked_NoMessagesOrNoTracking_ReturnsEmpty() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + SetField(fs, "_state", new StreamState { Msgs = 0UL }); + SetPsims(fs, ("foo.a", 1, 1, 1)); + + var (noMsgsSeqs, noMsgsError) = InvokeAllLastSeqsLocked(fs); + noMsgsError.ShouldBeNull(); + noMsgsSeqs.ShouldBeEmpty(); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + + var noTrackDir = CreateStoreDir(); + try + { + var fs = CreateStore(noTrackDir, subjects: []); + SetField(fs, "_state", new StreamState { Msgs = 1UL }); + SetField(fs, "_psim", new SubjectTree()); + + var (noTrackSeqs, noTrackError) = InvokeAllLastSeqsLocked(fs); + noTrackError.ShouldBeNull(); + noTrackSeqs.ShouldBeEmpty(); + fs.Stop(); + } + finally + { + DeleteStoreDir(noTrackDir); + } + } + + [Fact] + public void AllLastSeqsLocked_LastNeedsUpdate_RecalculatesBeforeCollecting() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + SetField(fs, "_state", new StreamState { Msgs = 3UL }); + SetPsims(fs, ("foo.a", 1, 2, 3)); + + var b1 = NewBlock(1, 1, 12); + b1.Fss = BuildSubjectStateTree(("foo.a", 3, 1, 12)); + + var b2 = NewBlock(2, 13, 20); + var stale = new SimpleState { Msgs = 3UL, First = 1UL, Last = 0UL, LastNeedsUpdate = true }; + var staleTree = new SubjectTree(); + staleTree.Insert(System.Text.Encoding.UTF8.GetBytes("foo.a"), stale); + b2.Fss = staleTree; + + ConfigureBlocks(fs, b1, b2); + + var (seqs, error) = InvokeAllLastSeqsLocked(fs); + + error.ShouldBeNull(); + seqs.ShouldBe([12UL]); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void FilterIsAll_ReorderedEquivalentFilters_ReturnsTrue() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir, ["foo.*", "bar.>"]); + + InvokeFilterIsAll(fs, ["bar.>", "foo.*"]).ShouldBeTrue(); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void FilterIsAll_CountMismatchOrNonSubset_ReturnsFalse() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir, ["foo.*", "bar.>"]); + + InvokeFilterIsAll(fs, ["foo.A"]).ShouldBeFalse(); + InvokeFilterIsAll(fs, ["bar.>", "baz.*"]).ShouldBeFalse(); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + private static string CreateStoreDir() { var root = Path.Combine(Path.GetTempPath(), $"fs-read-query-{Guid.NewGuid():N}"); @@ -322,8 +457,10 @@ public sealed class JetStreamFileStoreReadQueryTests Directory.Delete(storeDir, recursive: true); } - private static JetStreamFileStore CreateStore(string storeDir) + private static JetStreamFileStore CreateStore(string storeDir, string[]? subjects = null) { + subjects ??= ["foo.*", "bar.*", "zoo.*"]; + return new JetStreamFileStore( new FileStoreConfig { StoreDir = storeDir, BlockSize = 1024 }, new FileStreamInfo @@ -333,7 +470,7 @@ public sealed class JetStreamFileStoreReadQueryTests { Name = "S", Storage = StorageType.FileStorage, - Subjects = ["foo.*", "bar.*", "zoo.*"], + Subjects = subjects, }, }); } @@ -434,6 +571,24 @@ public sealed class JetStreamFileStoreReadQueryTests _ = mi!.Invoke(fs, [filter, ss]); } + private static (ulong[] Seqs, Exception? Error) InvokeAllLastSeqsLocked(JetStreamFileStore fs) + { + var mi = typeof(JetStreamFileStore).GetMethod("AllLastSeqsLocked", BindingFlags.Instance | BindingFlags.NonPublic); + mi.ShouldNotBeNull(); + var result = mi!.Invoke(fs, []); + result.ShouldNotBeNull(); + return ((ulong[], Exception?))result!; + } + + private static bool InvokeFilterIsAll(JetStreamFileStore fs, string[] filters) + { + var mi = typeof(JetStreamFileStore).GetMethod("FilterIsAll", BindingFlags.Instance | BindingFlags.NonPublic); + mi.ShouldNotBeNull(); + var result = mi!.Invoke(fs, [filters]); + result.ShouldNotBeNull(); + return (bool)result; + } + private static Psi? GetPsi(JetStreamFileStore fs, string subject) { var fi = typeof(JetStreamFileStore).GetField("_psim", BindingFlags.Instance | BindingFlags.NonPublic); diff --git a/porting.db b/porting.db index b95a387..7af46c7 100644 Binary files a/porting.db and b/porting.db differ