feat(batch15): complete group 2 msgblock/consumerfilestore

This commit is contained in:
Joseph Doherty
2026-02-28 17:10:41 -05:00
parent f36bc3111b
commit fd0170b22c
5 changed files with 361 additions and 9 deletions

View File

@@ -795,6 +795,299 @@ internal sealed class MessageBlock
return null;
}
internal void FinishedWithCache()
{
if (CacheData != null && PendingWriteSizeLocked() == 0)
CacheData = null;
}
internal (ulong Seq, Exception? Error) SkipMsg(ulong seq)
{
var fs = Fs;
if (fs == null)
return (0, StoreErrors.ErrStoreClosed);
return fs.SkipMsg(seq);
}
internal bool ShouldCompactInline()
{
if (NoCompact || Msgs == 0 || Dmap.Size == 0)
return false;
var deletedBytes = RBytes > Bytes ? RBytes - Bytes : 0;
return deletedBytes >= (ulong)FileStoreDefaults.CompactMinimum ||
Dmap.Size >= Math.Max(1, (int)(Msgs / 2));
}
internal bool ShouldCompactSync()
=> ShouldCompactInline() && SyncAlways;
internal Exception? Compact()
=> CompactWithFloor(0);
internal Exception? CompactWithFloor(ulong floor)
{
var fs = Fs;
if (fs == null)
return StoreErrors.ErrStoreClosed;
var start = floor == 0 ? Math.Max(First.Seq, 1UL) : floor;
var (_, err) = fs.Compact(start);
return err;
}
internal (uint Slot, bool Deleted) SlotInfo(ulong seq)
{
var cache = CacheData;
if (cache == null || cache.Idx.Length == 0 || seq < cache.Fseq)
return (0, false);
var slot = seq - cache.Fseq;
if (slot >= (ulong)cache.Idx.Length)
return (0, false);
var raw = cache.Idx[slot];
var deleted = (raw & FileStoreDefaults.Dbit) != 0;
return (raw & ~(FileStoreDefaults.Dbit | FileStoreDefaults.Cbit), deleted);
}
internal void SpinUpFlushLoop()
{
Mu.EnterWriteLock();
try
{
SpinUpFlushLoopLocked();
}
finally
{
Mu.ExitWriteLock();
}
}
internal void SpinUpFlushLoopLocked()
{
if (Flusher || Closed)
return;
Flusher = true;
Fch = Channel.CreateBounded<byte>(1);
Qch = Channel.CreateUnbounded<byte>();
var fch = Fch;
var qch = Qch;
_ = Task.Run(() => FlushLoop(fch, qch));
}
internal void KickFlusher()
{
Mu.EnterReadLock();
try
{
_ = Fch?.Writer.TryWrite(0);
}
finally
{
Mu.ExitReadLock();
}
}
internal void SetInFlusher()
{
Mu.EnterWriteLock();
try
{
Flusher = true;
}
finally
{
Mu.ExitWriteLock();
}
}
internal void ClearInFlusher()
{
Mu.EnterWriteLock();
try
{
Flusher = false;
Qch?.Writer.TryComplete();
Qch = null;
Fch?.Writer.TryComplete();
Fch = null;
}
finally
{
Mu.ExitWriteLock();
}
}
internal void FlushLoop(Channel<byte>? flushChannel = null, Channel<byte>? quitChannel = null)
{
SetInFlusher();
try
{
var fch = flushChannel ?? Fch;
var qch = quitChannel ?? Qch;
if (fch == null)
return;
while (true)
{
if (qch?.Reader.Completion.IsCompleted == true)
break;
var canRead = fch.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult();
if (!canRead)
break;
while (fch.Reader.TryRead(out _)) { }
Mu.EnterWriteLock();
try
{
if (Closed)
break;
if (Mfd != null)
{
Mfd.Flush(SyncAlways);
NeedSync = false;
}
}
catch (Exception ex)
{
Werr = ex;
}
finally
{
Mu.ExitWriteLock();
}
}
}
finally
{
ClearInFlusher();
}
}
internal (bool Removed, Exception? Error) EraseMsg(ulong seq)
{
var fs = Fs;
if (fs == null)
return (false, StoreErrors.ErrStoreClosed);
return fs.EraseMsg(seq);
}
internal void Truncate(ulong seq)
{
Fs?.Truncate(seq);
}
internal bool IsEmpty()
{
Mu.EnterReadLock();
try
{
return Msgs == 0;
}
finally
{
Mu.ExitReadLock();
}
}
internal void ResetCacheExpireTimer()
{
Mu.EnterWriteLock();
try
{
if (Ctmr == null)
StartCacheExpireTimer();
else
_ = Ctmr.Change(Cexp, Timeout.InfiniteTimeSpan);
}
finally
{
Mu.ExitWriteLock();
}
}
internal void StartCacheExpireTimer()
{
if (Cexp <= TimeSpan.Zero)
return;
Ctmr?.Dispose();
Ctmr = new Timer(_ =>
{
Mu.EnterWriteLock();
try
{
if (!Closed)
TryForceExpireCacheLocked();
}
finally
{
Mu.ExitWriteLock();
}
}, null, Cexp, Timeout.InfiniteTimeSpan);
}
internal void ClearCacheAndOffset()
{
CacheData = null;
Fss = null;
Ctmr?.Dispose();
Ctmr = null;
try
{
if (!string.IsNullOrWhiteSpace(Mfn))
{
var dir = Path.GetDirectoryName(Mfn);
if (!string.IsNullOrWhiteSpace(dir))
{
var idxPath = Path.Combine(dir, string.Format(FileStoreDefaults.IndexScan, Index));
if (File.Exists(idxPath))
File.Delete(idxPath);
var fssPath = Path.Combine(dir, $"{Index}.fss");
if (File.Exists(fssPath))
File.Delete(fssPath);
}
}
}
catch
{
// best effort cleanup for stale metadata files
}
}
internal ulong PendingWriteSize()
{
Mu.EnterReadLock();
try
{
return PendingWriteSizeLocked();
}
finally
{
Mu.ExitReadLock();
}
}
internal ulong PendingWriteSizeLocked()
{
var cache = CacheData;
if (cache == null)
return 0;
var wp = Math.Clamp(cache.Wp, 0, cache.Buf.Length);
return (ulong)(cache.Buf.Length - wp);
}
private static bool HasChecksum(byte[]? checksum)
{
if (checksum == null || checksum.Length != FileStoreDefaults.RecordHashSize)