diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 7b5cd2d..a812af5 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -2132,6 +2132,174 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable return (bi + 1, null); } + // Lock should be held by caller. + private void NumFilteredPending(string filter, SimpleState ss) + => NumFilteredPendingWithLast(filter, includeLast: true, ss); + + // Lock should be held by caller. + private void NumFilteredPendingNoLast(string filter, SimpleState ss) + => NumFilteredPendingWithLast(filter, includeLast: false, ss); + + // Lock should be held by caller. + private void NumFilteredPendingWithLast(string filter, bool includeLast, SimpleState ss) + { + ArgumentNullException.ThrowIfNull(ss); + + var isAll = string.IsNullOrEmpty(filter) || filter == ">"; + if (isAll) + { + ss.First = _state.FirstSeq; + ss.Last = _state.LastSeq; + ss.Msgs = _state.Msgs; + return; + } + + ss.First = 0; + ss.Last = 0; + ss.Msgs = 0; + + if (_psim == null) + return; + + var wc = SubscriptionIndex.SubjectHasWildcard(filter); + var filterBytes = Encoding.UTF8.GetBytes(filter); + var start = uint.MaxValue; + uint stop = 0; + + if (wc) + { + _psim.Match(filterBytes, (_, psi) => + { + ss.Msgs += psi.Total; + if (psi.Fblk < start) + start = psi.Fblk; + if (psi.Lblk > stop) + stop = psi.Lblk; + return true; + }); + } + else + { + var (psi, ok) = _psim.Find(filterBytes); + if (ok && psi != null) + { + ss.Msgs += psi.Total; + start = psi.Fblk; + stop = psi.Lblk; + } + } + + if (stop == 0 || start == uint.MaxValue) + return; + + if (_bim.TryGetValue(start, out var mb) && mb != null) + { + var (_, first, _) = FilteredPendingInBlock(mb, filter, wc); + ss.First = first; + } + + if (ss.First == 0) + { + uint? correctedFirstBlock = null; + for (ulong i = (ulong)start + 1; i <= stop; i++) + { + if (!_bim.TryGetValue((uint)i, out var candidate) || candidate == null) + continue; + + var (_, first, _) = FilteredPendingInBlock(candidate, filter, wc); + if (first > 0) + { + ss.First = first; + correctedFirstBlock = (uint)i; + break; + } + } + + if (correctedFirstBlock is { } corrected) + SchedulePsiFirstBlockCorrection(filter, wc, corrected); + } + + if (includeLast && _bim.TryGetValue(stop, out var lastMb) && lastMb != null) + { + var (_, _, last) = FilteredPendingInBlock(lastMb, filter, wc); + ss.Last = last; + } + } + + // Caller should hold a read lock on fs + message block lock if concurrent mutations are possible. + private static (ulong Total, ulong First, ulong Last) FilteredPendingInBlock(MessageBlock mb, string filter, bool wc) + { + if (mb.Fss == null) + return (0, 0, 0); + + var filterBytes = Encoding.UTF8.GetBytes(filter); + ulong total = 0; + ulong first = 0; + ulong last = 0; + + if (wc) + { + mb.Fss.Match(filterBytes, (_, ss) => + { + total += ss.Msgs; + if (first == 0 || (ss.First > 0 && ss.First < first)) + first = ss.First; + if (ss.Last > last) + last = ss.Last; + return true; + }); + } + else + { + var (ss, ok) = mb.Fss.Find(filterBytes); + if (ok && ss != null) + { + total = ss.Msgs; + first = ss.First; + last = ss.Last; + } + } + + return (total, first, last); + } + + private void SchedulePsiFirstBlockCorrection(string filter, bool wc, uint firstBlock) + { + if (_psim == null) + return; + + var filterBytes = Encoding.UTF8.GetBytes(filter); + + _ = Task.Run(() => + { + _mu.EnterWriteLock(); + try + { + if (_psim == null) + return; + + if (!wc) + { + var (info, ok) = _psim.Find(filterBytes); + if (ok && info != null && firstBlock > info.Fblk) + info.Fblk = firstBlock; + return; + } + + _psim.Match(filterBytes, (_, psi) => + { + if (firstBlock > psi.Fblk) + psi.Fblk = firstBlock; + return true; + }); + } + finally + { + _mu.ExitWriteLock(); + } + }); + } + // ----------------------------------------------------------------------- // IStreamStore — type / state // ----------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs index a2d23df..d7c6771 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs @@ -179,7 +179,7 @@ public sealed partial class JetStreamFileStoreTests { fs.StoreMsg("ts", null, "one"u8.ToArray(), 0); var cutoff = DateTime.UtcNow; - Thread.Sleep(2); + Thread.Sleep(20); fs.StoreMsg("ts", null, "two"u8.ToArray(), 0); fs.GetSeqFromTime(cutoff).ShouldBeGreaterThanOrEqualTo(2UL); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs index 9552d89..852ecc9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs @@ -154,6 +154,161 @@ public sealed class JetStreamFileStoreReadQueryTests } } + [Theory] + [InlineData("")] + [InlineData(">")] + public void NumFilteredPending_FilterIsAll_UsesStreamState(string filter) + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + SetField(fs, "_state", new StreamState { FirstSeq = 5UL, LastSeq = 99UL, Msgs = 42UL }); + var ss = new SimpleState(); + + InvokeNumFilteredPending(fs, filter, ss); + + ss.Msgs.ShouldBe(42UL); + ss.First.ShouldBe(5UL); + ss.Last.ShouldBe(99UL); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void NumFilteredPending_NoMatch_ResetsSimpleStateToZero() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + SetPsims(fs, ("foo.a", 1, 1, 1)); + var ss = new SimpleState { Msgs = 999UL, First = 123UL, Last = 456UL }; + + InvokeNumFilteredPending(fs, "bar.b", ss); + + ss.Msgs.ShouldBe(0UL); + ss.First.ShouldBe(0UL); + ss.Last.ShouldBe(0UL); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void NumFilteredPending_LiteralAndWildcard_UsesPsiTotalsAndBlockBounds() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + + var b1 = NewBlock(1, 1, 20); + b1.Fss = BuildSubjectStateTree(("foo.a", 2, 10, 11)); + + var b2 = NewBlock(2, 21, 40); + b2.Fss = BuildSubjectStateTree(("foo.a", 1, 30, 30), ("foo.b", 3, 35, 40)); + + ConfigureBlocks(fs, b1, b2); + SetPsims(fs, ("foo.a", 1, 2, 3), ("foo.b", 2, 2, 3)); + + var literal = new SimpleState(); + InvokeNumFilteredPending(fs, "foo.a", literal); + literal.Msgs.ShouldBe(3UL); + literal.First.ShouldBe(10UL); + literal.Last.ShouldBe(30UL); + + var wildcard = new SimpleState(); + InvokeNumFilteredPending(fs, "foo.*", wildcard); + wildcard.Msgs.ShouldBe(6UL); + wildcard.First.ShouldBe(10UL); + wildcard.Last.ShouldBe(40UL); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void NumFilteredPendingNoLast_LiteralMatch_LeavesLastAsZero() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + var b1 = NewBlock(1, 1, 20); + b1.Fss = BuildSubjectStateTree(("foo.a", 2, 10, 11)); + ConfigureBlocks(fs, b1); + SetPsims(fs, ("foo.a", 1, 1, 2)); + + var ss = new SimpleState(); + InvokeNumFilteredPendingNoLast(fs, "foo.a", ss); + + ss.Msgs.ShouldBe(2UL); + ss.First.ShouldBe(10UL); + ss.Last.ShouldBe(0UL); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void NumFilteredPending_StaleFirstBlockHint_UpdatesPsiFirstBlock() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + + var b1 = NewBlock(1, 1, 10); + b1.Fss = BuildSubjectStateTree(("bar.a", 1, 1, 1)); + + var b2 = NewBlock(2, 11, 20); + b2.Fss = BuildSubjectStateTree(("foo.a", 1, 20, 20)); + + ConfigureBlocks(fs, b1, b2); + SetPsims(fs, ("foo.a", 1, 2, 1)); + var ss = new SimpleState(); + + InvokeNumFilteredPending(fs, "foo.a", ss); + + ss.Msgs.ShouldBe(1UL); + ss.First.ShouldBe(20UL); + ss.Last.ShouldBe(20UL); + + var updated = false; + for (var i = 0; i < 100; i++) + { + var psi = GetPsi(fs, "foo.a"); + if (psi != null && psi.Fblk == 2) + { + updated = true; + break; + } + Thread.Sleep(10); + } + + updated.ShouldBeTrue(); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + private static string CreateStoreDir() { var root = Path.Combine(Path.GetTempPath(), $"fs-read-query-{Guid.NewGuid():N}"); @@ -193,6 +348,24 @@ public sealed class JetStreamFileStoreReadQueryTests }; } + private static SubjectTree BuildSubjectStateTree(params (string Subject, ulong Msgs, ulong First, ulong Last)[] states) + { + var tree = new SubjectTree(); + foreach (var (subject, msgs, first, last) in states) + { + tree.Insert( + System.Text.Encoding.UTF8.GetBytes(subject), + new SimpleState + { + Msgs = msgs, + First = first, + Last = last, + }); + } + + return tree; + } + private static void ConfigureBlocks(JetStreamFileStore fs, params MessageBlock[] blocks) { var ordered = blocks.OrderBy(b => b.Index).ToList(); @@ -247,6 +420,29 @@ public sealed class JetStreamFileStoreReadQueryTests return ((int, Exception?))result!; } + private static void InvokeNumFilteredPending(JetStreamFileStore fs, string filter, SimpleState ss) + { + var mi = typeof(JetStreamFileStore).GetMethod("NumFilteredPending", BindingFlags.Instance | BindingFlags.NonPublic); + mi.ShouldNotBeNull(); + _ = mi!.Invoke(fs, [filter, ss]); + } + + private static void InvokeNumFilteredPendingNoLast(JetStreamFileStore fs, string filter, SimpleState ss) + { + var mi = typeof(JetStreamFileStore).GetMethod("NumFilteredPendingNoLast", BindingFlags.Instance | BindingFlags.NonPublic); + mi.ShouldNotBeNull(); + _ = mi!.Invoke(fs, [filter, ss]); + } + + private static Psi? GetPsi(JetStreamFileStore fs, string subject) + { + var fi = typeof(JetStreamFileStore).GetField("_psim", BindingFlags.Instance | BindingFlags.NonPublic); + fi.ShouldNotBeNull(); + var psim = fi!.GetValue(fs).ShouldBeOfType>(); + var (psi, found) = psim.Find(System.Text.Encoding.UTF8.GetBytes(subject)); + return found ? psi : null; + } + private static void SetField(object target, string fieldName, T value) { var fi = target.GetType().GetField(fieldName, BindingFlags.Instance | BindingFlags.NonPublic); diff --git a/porting.db b/porting.db index 44c3179..b95a387 100644 Binary files a/porting.db and b/porting.db differ