feat(batch13): port filestore skip-first-block query helpers

This commit is contained in:
Joseph Doherty
2026-02-28 14:36:58 -05:00
parent 2cec58f559
commit 430ba17f42
3 changed files with 356 additions and 0 deletions

View File

@@ -2032,6 +2032,106 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
_lmb = _blks.Count > 0 ? _blks[^1] : null;
}
// -----------------------------------------------------------------------
// Read/query helper methods (Batch 13)
// -----------------------------------------------------------------------
// Lock should be held by caller.
private (int Next, Exception? Error) CheckSkipFirstBlock(string filter, bool wc, int bi)
{
if (string.IsNullOrEmpty(filter) || filter == ">")
return (bi + 1, null);
var start = uint.MaxValue;
uint stop = 0;
if (_psim is { } psim)
{
if (wc)
{
psim.Match(Encoding.UTF8.GetBytes(filter), (_, psi) =>
{
if (psi.Fblk < start)
start = psi.Fblk;
if (psi.Lblk > stop)
stop = psi.Lblk;
return true;
});
}
else
{
var (psi, ok) = psim.Find(Encoding.UTF8.GetBytes(filter));
if (ok && psi != null)
{
start = psi.Fblk;
stop = psi.Lblk;
}
}
}
if (start == uint.MaxValue)
return (-1, StoreErrors.ErrStoreEOF);
return SelectSkipFirstBlock(bi, start, stop);
}
// Lock should be held by caller.
private (int Next, Exception? Error) CheckSkipFirstBlockMulti(SimpleSublist? sl, int bi)
{
if (_psim == null || sl == null)
return (-1, StoreErrors.ErrStoreEOF);
var start = uint.MaxValue;
uint stop = 0;
_psim.IterFast((subj, psi) =>
{
var matched = false;
sl.Match(Encoding.UTF8.GetString(subj), _ => matched = true);
if (matched)
{
if (psi.Fblk < start)
start = psi.Fblk;
if (psi.Lblk > stop)
stop = psi.Lblk;
}
return true;
});
if (start == uint.MaxValue)
return (-1, StoreErrors.ErrStoreEOF);
return SelectSkipFirstBlock(bi, start, stop);
}
// Lock should be held by caller.
private (int Next, Exception? Error) SelectSkipFirstBlock(int bi, uint start, uint stop)
{
if (bi < 0 || bi >= _blks.Count)
return (-1, StoreErrors.ErrStoreEOF);
var mbi = _blks[bi].Index;
if (stop <= mbi)
return (-1, StoreErrors.ErrStoreEOF);
if (start > mbi && _bim.TryGetValue(start, out var mb) && mb != null)
{
var ni = -1;
for (var i = 0; i < _blks.Count; i++)
{
if (mb.Last.Seq <= _blks[i].Last.Seq)
{
ni = i;
break;
}
}
return (ni, null);
}
return (bi + 1, null);
}
// -----------------------------------------------------------------------
// IStreamStore — type / state
// -----------------------------------------------------------------------

View File

@@ -0,0 +1,256 @@
using System.Reflection;
using Shouldly;
using ZB.MOM.NatsNet.Server;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public sealed class JetStreamFileStoreReadQueryTests
{
[Theory]
[InlineData("")]
[InlineData(">")]
public void CheckSkipFirstBlock_FilterIsAll_ReturnsNextBlock(string filter)
{
var storeDir = CreateStoreDir();
try
{
var fs = CreateStore(storeDir);
ConfigureBlocks(fs, NewBlock(1, 1, 10), NewBlock(2, 11, 20));
var (next, error) = InvokeCheckSkipFirstBlock(fs, filter, wc: true, bi: 0);
error.ShouldBeNull();
next.ShouldBe(1);
fs.Stop();
}
finally
{
DeleteStoreDir(storeDir);
}
}
[Fact]
public void CheckSkipFirstBlock_LiteralFilterWithoutPsiMatch_ReturnsStoreEof()
{
var storeDir = CreateStoreDir();
try
{
var fs = CreateStore(storeDir);
ConfigureBlocks(fs, NewBlock(1, 1, 10), NewBlock(2, 11, 20));
SetPsims(fs, ("bar", 1, 2, 2));
var (next, error) = InvokeCheckSkipFirstBlock(fs, "foo", wc: false, bi: 0);
next.ShouldBe(-1);
error.ShouldBe(StoreErrors.ErrStoreEOF);
fs.Stop();
}
finally
{
DeleteStoreDir(storeDir);
}
}
[Fact]
public void SelectSkipFirstBlock_StopAtCurrentBlock_ReturnsStoreEof()
{
var storeDir = CreateStoreDir();
try
{
var fs = CreateStore(storeDir);
ConfigureBlocks(fs, NewBlock(1, 1, 10), NewBlock(3, 11, 20));
var (next, error) = InvokeSelectSkipFirstBlock(fs, bi: 1, start: 1, stop: 3);
next.ShouldBe(-1);
error.ShouldBe(StoreErrors.ErrStoreEOF);
fs.Stop();
}
finally
{
DeleteStoreDir(storeDir);
}
}
[Fact]
public void SelectSkipFirstBlock_StartAfterCurrentBlock_ReturnsSelectedBlock()
{
var storeDir = CreateStoreDir();
try
{
var fs = CreateStore(storeDir);
ConfigureBlocks(
fs,
NewBlock(1, 1, 10),
NewBlock(5, 11, 20),
NewBlock(9, 21, 30));
var (next, error) = InvokeSelectSkipFirstBlock(fs, bi: 0, start: 5, stop: 9);
error.ShouldBeNull();
next.ShouldBe(1);
fs.Stop();
}
finally
{
DeleteStoreDir(storeDir);
}
}
[Fact]
public void CheckSkipFirstBlock_StopBeforeOrAtCurrentBlock_ReturnsStoreEof()
{
var storeDir = CreateStoreDir();
try
{
var fs = CreateStore(storeDir);
ConfigureBlocks(fs, NewBlock(1, 1, 10), NewBlock(2, 11, 20), NewBlock(3, 21, 30));
SetPsims(fs, ("foo", 1, 3, 3));
var (next, error) = InvokeCheckSkipFirstBlock(fs, "foo", wc: false, bi: 2);
next.ShouldBe(-1);
error.ShouldBe(StoreErrors.ErrStoreEOF);
fs.Stop();
}
finally
{
DeleteStoreDir(storeDir);
}
}
[Fact]
public void CheckSkipFirstBlockMulti_IntersectingSubjectsOnly_SelectsMatchingBlock()
{
var storeDir = CreateStoreDir();
try
{
var fs = CreateStore(storeDir);
ConfigureBlocks(
fs,
NewBlock(1, 1, 10),
NewBlock(2, 11, 20),
NewBlock(5, 21, 30),
NewBlock(9, 31, 40));
SetPsims(
fs,
("zoo.c", 2, 2, 1),
("foo.a", 5, 5, 1),
("bar.b", 9, 9, 1));
var sl = GenericSublist<EmptyStruct>.NewSimpleSublist();
sl.Insert("foo.*", EmptyStruct.Value);
var (next, error) = InvokeCheckSkipFirstBlockMulti(fs, sl, bi: 0);
error.ShouldBeNull();
next.ShouldBe(2);
fs.Stop();
}
finally
{
DeleteStoreDir(storeDir);
}
}
private static string CreateStoreDir()
{
var root = Path.Combine(Path.GetTempPath(), $"fs-read-query-{Guid.NewGuid():N}");
Directory.CreateDirectory(root);
return root;
}
private static void DeleteStoreDir(string storeDir)
{
if (Directory.Exists(storeDir))
Directory.Delete(storeDir, recursive: true);
}
private static JetStreamFileStore CreateStore(string storeDir)
{
return new JetStreamFileStore(
new FileStoreConfig { StoreDir = storeDir, BlockSize = 1024 },
new FileStreamInfo
{
Created = DateTime.UtcNow,
Config = new StreamConfig
{
Name = "S",
Storage = StorageType.FileStorage,
Subjects = ["foo.*", "bar.*", "zoo.*"],
},
});
}
private static MessageBlock NewBlock(uint index, ulong first, ulong last)
{
return new MessageBlock
{
Index = index,
First = new MsgId { Seq = first },
Last = new MsgId { Seq = last },
};
}
private static void ConfigureBlocks(JetStreamFileStore fs, params MessageBlock[] blocks)
{
var ordered = blocks.OrderBy(b => b.Index).ToList();
var bim = ordered.ToDictionary(b => b.Index, b => b);
SetField(fs, "_blks", ordered);
SetField(fs, "_bim", bim);
SetField(fs, "_lmb", ordered.Count > 0 ? ordered[^1] : null);
}
private static void SetPsims(JetStreamFileStore fs, params (string Subject, uint Fblk, uint Lblk, ulong Total)[] entries)
{
var psim = new SubjectTree<Psi>();
foreach (var (subject, fblk, lblk, total) in entries)
{
psim.Insert(
System.Text.Encoding.UTF8.GetBytes(subject),
new Psi
{
Fblk = fblk,
Lblk = lblk,
Total = total,
});
}
SetField(fs, "_psim", psim);
}
private static (int Next, Exception? Error) InvokeCheckSkipFirstBlock(JetStreamFileStore fs, string filter, bool wc, int bi)
{
var mi = typeof(JetStreamFileStore).GetMethod("CheckSkipFirstBlock", BindingFlags.Instance | BindingFlags.NonPublic);
mi.ShouldNotBeNull();
var result = mi!.Invoke(fs, [filter, wc, bi]);
result.ShouldNotBeNull();
return ((int, Exception?))result!;
}
private static (int Next, Exception? Error) InvokeCheckSkipFirstBlockMulti(JetStreamFileStore fs, SimpleSublist sl, int bi)
{
var mi = typeof(JetStreamFileStore).GetMethod("CheckSkipFirstBlockMulti", BindingFlags.Instance | BindingFlags.NonPublic);
mi.ShouldNotBeNull();
var result = mi!.Invoke(fs, [sl, bi]);
result.ShouldNotBeNull();
return ((int, Exception?))result!;
}
private static (int Next, Exception? Error) InvokeSelectSkipFirstBlock(JetStreamFileStore fs, int bi, uint start, uint stop)
{
var mi = typeof(JetStreamFileStore).GetMethod("SelectSkipFirstBlock", BindingFlags.Instance | BindingFlags.NonPublic);
mi.ShouldNotBeNull();
var result = mi!.Invoke(fs, [bi, start, stop]);
result.ShouldNotBeNull();
return ((int, Exception?))result!;
}
private static void SetField<T>(object target, string fieldName, T value)
{
var fi = target.GetType().GetField(fieldName, BindingFlags.Instance | BindingFlags.NonPublic);
fi.ShouldNotBeNull();
fi!.SetValue(target, value);
}
}

Binary file not shown.