diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 09752d5..7b5cd2d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -2032,6 +2032,106 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable _lmb = _blks.Count > 0 ? _blks[^1] : null; } + // ----------------------------------------------------------------------- + // Read/query helper methods (Batch 13) + // ----------------------------------------------------------------------- + + // Lock should be held by caller. + private (int Next, Exception? Error) CheckSkipFirstBlock(string filter, bool wc, int bi) + { + if (string.IsNullOrEmpty(filter) || filter == ">") + return (bi + 1, null); + + var start = uint.MaxValue; + uint stop = 0; + + if (_psim is { } psim) + { + if (wc) + { + psim.Match(Encoding.UTF8.GetBytes(filter), (_, psi) => + { + if (psi.Fblk < start) + start = psi.Fblk; + if (psi.Lblk > stop) + stop = psi.Lblk; + return true; + }); + } + else + { + var (psi, ok) = psim.Find(Encoding.UTF8.GetBytes(filter)); + if (ok && psi != null) + { + start = psi.Fblk; + stop = psi.Lblk; + } + } + } + + if (start == uint.MaxValue) + return (-1, StoreErrors.ErrStoreEOF); + + return SelectSkipFirstBlock(bi, start, stop); + } + + // Lock should be held by caller. + private (int Next, Exception? Error) CheckSkipFirstBlockMulti(SimpleSublist? sl, int bi) + { + if (_psim == null || sl == null) + return (-1, StoreErrors.ErrStoreEOF); + + var start = uint.MaxValue; + uint stop = 0; + + _psim.IterFast((subj, psi) => + { + var matched = false; + sl.Match(Encoding.UTF8.GetString(subj), _ => matched = true); + if (matched) + { + if (psi.Fblk < start) + start = psi.Fblk; + if (psi.Lblk > stop) + stop = psi.Lblk; + } + return true; + }); + + if (start == uint.MaxValue) + return (-1, StoreErrors.ErrStoreEOF); + + return SelectSkipFirstBlock(bi, start, stop); + } + + // Lock should be held by caller. + private (int Next, Exception? Error) SelectSkipFirstBlock(int bi, uint start, uint stop) + { + if (bi < 0 || bi >= _blks.Count) + return (-1, StoreErrors.ErrStoreEOF); + + var mbi = _blks[bi].Index; + if (stop <= mbi) + return (-1, StoreErrors.ErrStoreEOF); + + if (start > mbi && _bim.TryGetValue(start, out var mb) && mb != null) + { + var ni = -1; + for (var i = 0; i < _blks.Count; i++) + { + if (mb.Last.Seq <= _blks[i].Last.Seq) + { + ni = i; + break; + } + } + + return (ni, null); + } + + return (bi + 1, null); + } + // ----------------------------------------------------------------------- // IStreamStore — type / state // ----------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs new file mode 100644 index 0000000..9552d89 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreReadQueryTests.cs @@ -0,0 +1,256 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class JetStreamFileStoreReadQueryTests +{ + [Theory] + [InlineData("")] + [InlineData(">")] + public void CheckSkipFirstBlock_FilterIsAll_ReturnsNextBlock(string filter) + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + ConfigureBlocks(fs, NewBlock(1, 1, 10), NewBlock(2, 11, 20)); + + var (next, error) = InvokeCheckSkipFirstBlock(fs, filter, wc: true, bi: 0); + + error.ShouldBeNull(); + next.ShouldBe(1); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void CheckSkipFirstBlock_LiteralFilterWithoutPsiMatch_ReturnsStoreEof() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + ConfigureBlocks(fs, NewBlock(1, 1, 10), NewBlock(2, 11, 20)); + SetPsims(fs, ("bar", 1, 2, 2)); + + var (next, error) = InvokeCheckSkipFirstBlock(fs, "foo", wc: false, bi: 0); + + next.ShouldBe(-1); + error.ShouldBe(StoreErrors.ErrStoreEOF); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void SelectSkipFirstBlock_StopAtCurrentBlock_ReturnsStoreEof() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + ConfigureBlocks(fs, NewBlock(1, 1, 10), NewBlock(3, 11, 20)); + + var (next, error) = InvokeSelectSkipFirstBlock(fs, bi: 1, start: 1, stop: 3); + + next.ShouldBe(-1); + error.ShouldBe(StoreErrors.ErrStoreEOF); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void SelectSkipFirstBlock_StartAfterCurrentBlock_ReturnsSelectedBlock() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + ConfigureBlocks( + fs, + NewBlock(1, 1, 10), + NewBlock(5, 11, 20), + NewBlock(9, 21, 30)); + + var (next, error) = InvokeSelectSkipFirstBlock(fs, bi: 0, start: 5, stop: 9); + + error.ShouldBeNull(); + next.ShouldBe(1); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void CheckSkipFirstBlock_StopBeforeOrAtCurrentBlock_ReturnsStoreEof() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + ConfigureBlocks(fs, NewBlock(1, 1, 10), NewBlock(2, 11, 20), NewBlock(3, 21, 30)); + SetPsims(fs, ("foo", 1, 3, 3)); + + var (next, error) = InvokeCheckSkipFirstBlock(fs, "foo", wc: false, bi: 2); + + next.ShouldBe(-1); + error.ShouldBe(StoreErrors.ErrStoreEOF); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + [Fact] + public void CheckSkipFirstBlockMulti_IntersectingSubjectsOnly_SelectsMatchingBlock() + { + var storeDir = CreateStoreDir(); + try + { + var fs = CreateStore(storeDir); + ConfigureBlocks( + fs, + NewBlock(1, 1, 10), + NewBlock(2, 11, 20), + NewBlock(5, 21, 30), + NewBlock(9, 31, 40)); + SetPsims( + fs, + ("zoo.c", 2, 2, 1), + ("foo.a", 5, 5, 1), + ("bar.b", 9, 9, 1)); + + var sl = GenericSublist.NewSimpleSublist(); + sl.Insert("foo.*", EmptyStruct.Value); + + var (next, error) = InvokeCheckSkipFirstBlockMulti(fs, sl, bi: 0); + + error.ShouldBeNull(); + next.ShouldBe(2); + fs.Stop(); + } + finally + { + DeleteStoreDir(storeDir); + } + } + + private static string CreateStoreDir() + { + var root = Path.Combine(Path.GetTempPath(), $"fs-read-query-{Guid.NewGuid():N}"); + Directory.CreateDirectory(root); + return root; + } + + private static void DeleteStoreDir(string storeDir) + { + if (Directory.Exists(storeDir)) + Directory.Delete(storeDir, recursive: true); + } + + private static JetStreamFileStore CreateStore(string storeDir) + { + return new JetStreamFileStore( + new FileStoreConfig { StoreDir = storeDir, BlockSize = 1024 }, + new FileStreamInfo + { + Created = DateTime.UtcNow, + Config = new StreamConfig + { + Name = "S", + Storage = StorageType.FileStorage, + Subjects = ["foo.*", "bar.*", "zoo.*"], + }, + }); + } + + private static MessageBlock NewBlock(uint index, ulong first, ulong last) + { + return new MessageBlock + { + Index = index, + First = new MsgId { Seq = first }, + Last = new MsgId { Seq = last }, + }; + } + + private static void ConfigureBlocks(JetStreamFileStore fs, params MessageBlock[] blocks) + { + var ordered = blocks.OrderBy(b => b.Index).ToList(); + var bim = ordered.ToDictionary(b => b.Index, b => b); + SetField(fs, "_blks", ordered); + SetField(fs, "_bim", bim); + SetField(fs, "_lmb", ordered.Count > 0 ? ordered[^1] : null); + } + + private static void SetPsims(JetStreamFileStore fs, params (string Subject, uint Fblk, uint Lblk, ulong Total)[] entries) + { + var psim = new SubjectTree(); + foreach (var (subject, fblk, lblk, total) in entries) + { + psim.Insert( + System.Text.Encoding.UTF8.GetBytes(subject), + new Psi + { + Fblk = fblk, + Lblk = lblk, + Total = total, + }); + } + + SetField(fs, "_psim", psim); + } + + private static (int Next, Exception? Error) InvokeCheckSkipFirstBlock(JetStreamFileStore fs, string filter, bool wc, int bi) + { + var mi = typeof(JetStreamFileStore).GetMethod("CheckSkipFirstBlock", BindingFlags.Instance | BindingFlags.NonPublic); + mi.ShouldNotBeNull(); + var result = mi!.Invoke(fs, [filter, wc, bi]); + result.ShouldNotBeNull(); + return ((int, Exception?))result!; + } + + private static (int Next, Exception? Error) InvokeCheckSkipFirstBlockMulti(JetStreamFileStore fs, SimpleSublist sl, int bi) + { + var mi = typeof(JetStreamFileStore).GetMethod("CheckSkipFirstBlockMulti", BindingFlags.Instance | BindingFlags.NonPublic); + mi.ShouldNotBeNull(); + var result = mi!.Invoke(fs, [sl, bi]); + result.ShouldNotBeNull(); + return ((int, Exception?))result!; + } + + private static (int Next, Exception? Error) InvokeSelectSkipFirstBlock(JetStreamFileStore fs, int bi, uint start, uint stop) + { + var mi = typeof(JetStreamFileStore).GetMethod("SelectSkipFirstBlock", BindingFlags.Instance | BindingFlags.NonPublic); + mi.ShouldNotBeNull(); + var result = mi!.Invoke(fs, [bi, start, stop]); + result.ShouldNotBeNull(); + return ((int, Exception?))result!; + } + + private static void SetField(object target, string fieldName, T value) + { + var fi = target.GetType().GetField(fieldName, BindingFlags.Instance | BindingFlags.NonPublic); + fi.ShouldNotBeNull(); + fi!.SetValue(target, value); + } +} diff --git a/porting.db b/porting.db index 3774516..44c3179 100644 Binary files a/porting.db and b/porting.db differ