feat(batch15): complete group 4 msgblock/consumerfilestore

This commit is contained in:
Joseph Doherty
2026-02-28 18:03:56 -05:00
parent ced3e7e9b8
commit 43d914bf83
6 changed files with 939 additions and 53 deletions

View File

@@ -282,6 +282,13 @@ internal sealed class ErrBadMsg : Exception
Detail = detail;
}
/// <summary>
/// Gets the error text of this exception by returning its <c>Message</c>.
/// Counterpart of Go's <c>errBadMsg.Error()</c>.
/// </summary>
/// <returns>The exception message.</returns>
internal string Error()
{
    return Message;
}
private static string BuildMessage(string fileName, string detail)
{
var baseName = Path.GetFileName(fileName);

View File

@@ -1057,7 +1057,9 @@ internal sealed class MessageBlock
if (flush || Werr != null)
{
var flushErr = FlushPendingToDiskLocked();
var (lostData, flushErr) = FlushPendingMsgsLocked();
if (lostData != null && Fs != null)
Fs.RebuildState(lostData);
if (flushErr != null)
return flushErr;
}
@@ -1415,73 +1417,752 @@ internal sealed class MessageBlock
if (original.Length == 0)
return null;
var checkErr = CheckAndLoadEncryption();
if (checkErr != null)
return checkErr;
if (Bek != null)
Bek.XorKeyStream(original.AsSpan());
var desired = Cmp;
var meta = new CompressionInfo();
var consumed = meta.UnmarshalMetadata(original);
if (consumed > 0 && meta.Type == desired)
var desired = Cmp;
if (consumed > 0 && meta.Type == desired && desired == StoreCompression.NoCompression)
return null;
byte[] rewritten;
if (desired == StoreCompression.NoCompression && consumed > 0)
byte[] rewritten = original;
if (consumed > 0)
{
if (meta.Type != StoreCompression.NoCompression)
return new NotSupportedException("compressed block decompression is not available");
if (meta.Type == StoreCompression.NoCompression)
{
rewritten = original[consumed..];
}
else
{
var (decompressed, decompressErr) = DecompressIfNeeded(original);
if (decompressErr != null)
return decompressErr;
rewritten = original[consumed..];
}
else
{
// Compression transforms are introduced separately with StoreCompression helpers.
rewritten = original;
rewritten = decompressed ?? Array.Empty<byte>();
}
}
if (Bek != null)
Bek.XorKeyStream(rewritten.AsSpan());
return AtomicOverwriteFile(rewritten, allowCompress: true);
}
// Replaces the contents of the block file with buf by writing a temporary file
// (Mfn + FileStoreDefaults.BlkTmpSuffix), flushing it to disk, and moving it over Mfn.
//
// buf is passed through EncryptOrDecryptIfNeeded before it is written, so it is
// transformed in place when a block encryption key is loaded.
// allowCompress: when true and Cmp requests compression the call is rejected with a
// NotSupportedException, because no compression transform is available here.
//
// Returns null on success, otherwise the error. The file handle is closed for the
// duration of the rewrite and re-enabled for writing afterwards if it had been open;
// a rejected request is detected before the handle is touched, and any later failure
// removes the temporary file (best effort) and re-enables writing.
// Lock should be held.
internal Exception? AtomicOverwriteFile(byte[] buf, bool allowCompress)
{
    ArgumentNullException.ThrowIfNull(buf);

    // Validate first so a rejected request does not leave the block file closed.
    if (allowCompress && Cmp != StoreCompression.NoCompression)
    {
        return Cmp != StoreCompression.S2Compression
            ? new NotSupportedException($"unsupported compression algorithm: {Cmp}")
            : new NotSupportedException("message block compression is not available");
    }

    var hadOpenFile = Mfd != null;
    if (hadOpenFile)
        CloseFDsLockedNoCheck();

    var origFileName = Mfn;
    var tempFileName = Mfn + FileStoreDefaults.BlkTmpSuffix;

    // Shared failure path: drop the temp file and restore the write handle.
    Exception Fail(Exception error)
    {
        try
        {
            if (File.Exists(tempFileName))
                File.Delete(tempFileName);
        }
        catch
        {
            // Best-effort cleanup only.
        }

        if (hadOpenFile && Mfd == null)
            _ = EnableForWriting(false);

        return error;
    }

    try
    {
        var encryptErr = EncryptOrDecryptIfNeeded(buf);
        if (encryptErr != null)
            return Fail(encryptErr);

        // The stream must be closed before the move, hence the explicit scope.
        using (var tmp = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None))
        {
            tmp.Write(buf, 0, buf.Length);
            tmp.Flush(flushToDisk: true);
        }

        File.Move(tempFileName, origFileName, overwrite: true);

        // Nothing was compressed, so the block is now stored uncompressed.
        Cmp = StoreCompression.NoCompression;
        RBytes = (ulong)buf.Length;

        if (hadOpenFile)
        {
            var enableErr = EnableForWriting(false);
            if (enableErr != null)
                return enableErr;
        }

        return null;
    }
    catch (Exception ex)
    {
        return Fail(ex);
    }
}
// Strips compression metadata from buf when present.
// Returns buf unchanged when no metadata is found, the bytes after the metadata when the
// recorded type is NoCompression, an InvalidDataException when buf starts with "cmp" but
// the metadata does not parse, and a NotSupportedException for any other compression type.
// Lock should be held.
internal (byte[]? Buffer, Exception? Error) DecompressIfNeeded(byte[] buf)
{
    ArgumentNullException.ThrowIfNull(buf);

    var startsWithMagic = false;
    if (buf.Length >= 3)
        startsWithMagic = buf[0] == (byte)'c' && buf[1] == (byte)'m' && buf[2] == (byte)'p';

    var info = new CompressionInfo();
    var metadataLength = info.UnmarshalMetadata(buf);

    if (metadataLength != 0)
    {
        if (info.Type != StoreCompression.NoCompression)
            return (null, new NotSupportedException("message block decompression is not available"));

        return (buf[metadataLength..], null);
    }

    if (startsWithMagic)
        return (null, new InvalidDataException("invalid compression metadata"));

    return (buf, null);
}
// Applies the block key stream (Bek) to buf in place when a key is loaded and buf is
// non-empty; otherwise leaves buf untouched. Always returns null.
// Lock should be held.
internal Exception? EncryptOrDecryptIfNeeded(byte[] buf)
{
    ArgumentNullException.ThrowIfNull(buf);

    if (Bek == null || buf.Length == 0)
        return null;

    Bek.XorKeyStream(buf.AsSpan());
    return null;
}
// Makes sure RBytes is populated. When RBytes is still zero the block file is opened
// through OpenBlock and its length is stored in RBytes; the handle is disposed afterwards.
// Returns null on success, or the open error.
// Lock should be held.
internal Exception? EnsureRawBytesLoaded()
{
    if (RBytes > 0)
        return null;

    var (file, openErr) = OpenBlock();
    if (openErr != null)
        return openErr;
    if (file == null)
        return new InvalidOperationException("unable to open message block");

    using (file)
    {
        RBytes = (ulong)file.Length;
    }

    return null;
}
// Index a raw message buffer.
// Walks the records in buf, builds the per-sequence offset index for the block's
// [First.Seq, Last.Seq] range and installs buf plus that index as the block cache.
// Returns null on success, or an InvalidDataException("corrupt state") when the
// block bounds, a record header, or the resulting index are inconsistent.
// Note: cache fields, Dmap, Fss, Lsts, Ttls and Schedules are modified before all
// validation has completed, so a failed call can leave them partially updated.
// Lock should be held.
internal Exception? IndexCacheBuf(byte[] buf)
{
ArgumentNullException.ThrowIfNull(buf);
var blockFirstSeq = First.Seq;
var blockLastSeq = Last.Seq;
// first == last + 1 is accepted (it yields an index size of zero below).
if (blockFirstSeq > blockLastSeq + 1)
{
Fs?.Warn("indexCacheBuf corrupt state in {0}: first {1} last {2}", Mfn, blockFirstSeq, blockLastSeq);
return new InvalidDataException("corrupt state");
}
// Number of index slots expected for this block; must fit in an int for the list capacity.
var idxSize = blockLastSeq - blockFirstSeq + 1;
if (idxSize > int.MaxValue)
return new InvalidDataException("corrupt state");
// Hand any previous cache buffer back before resetting the cache to an empty state.
if (CacheData != null && CacheData.Buf.Length > 0)
JetStreamFileStore.RecycleMsgBlockBuf(CacheData.Buf);
CacheData ??= new Cache();
CacheData.Buf = Array.Empty<byte>();
CacheData.Idx = Array.Empty<uint>();
CacheData.Fseq = 0;
CacheData.Wp = 0;
CacheData.Nra = false;
var idx = new List<uint>((int)idxSize);
// Per-subject state is rebuilt here only when FssNotLoaded() reports it is missing.
var populateFss = false;
if (FssNotLoaded())
{
Fss = new SubjectTree<SimpleState>();
populateFss = true;
}
Lsts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
Ttls = 0;
Schedules = 0;
var offset = 0;
ulong firstSeen = 0;
ulong lastSeen = 0;
while (offset < buf.Length)
{
if (!TryReadRecordHeader(buf.AsSpan(offset), out var seqRaw, out _, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out _))
return new InvalidDataException("corrupt state");
// A record must have a positive length and lie entirely inside buf.
if (recordLength <= 0 || offset + recordLength > buf.Length)
return new InvalidDataException("corrupt state");
// Records flagged with Tbit are not indexed.
if ((seqRaw & FileStoreDefaults.Tbit) != 0)
{
offset += recordLength;
continue;
}
var erased = (seqRaw & FileStoreDefaults.Ebit) != 0;
var seq = seqRaw & ~FileStoreDefaults.Ebit & ~FileStoreDefaults.Tbit;
// Only strictly increasing sequences are indexed; anything else is skipped.
if (seq <= lastSeen)
{
offset += recordLength;
continue;
}
if (seq >= blockFirstSeq)
{
lastSeen = seq;
if (firstSeen == 0)
{
firstSeen = seq;
// The first indexed record has to be the block's first sequence.
if (firstSeen != blockFirstSeq)
return new InvalidDataException("corrupt state");
}
var expectedSlot = (int)(seq - blockFirstSeq);
if (expectedSlot != idx.Count)
{
// Fill the gap: every missing sequence gets a Dbit slot and is recorded in Dmap.
for (var dseq = blockFirstSeq + (ulong)idx.Count; dseq < seq; dseq++)
{
idx.Add(FileStoreDefaults.Dbit);
if (dseq != 0)
Dmap.Insert(dseq);
}
}
idx.Add((uint)offset);
// An erased record that Dmap does not know about is treated as corruption.
if (erased && seq != 0 && !Dmap.Exists(seq))
return new InvalidDataException("corrupt state");
if (populateFss && subjectLength > 0 && !NoTrack && !erased && !Dmap.Exists(seq))
{
// NOTE(review): 8 + 8 + 4 + 4 + 4 is presumably the fixed record header size
// preceding the subject bytes - confirm against TryReadRecordHeader.
var subjectOffset = offset + 8 + 8 + 4 + 4 + 4;
if (subjectOffset < 0 || subjectOffset + subjectLength > buf.Length)
return new InvalidDataException("corrupt state");
var subjectSpan = buf.AsSpan(subjectOffset, subjectLength);
var (state, exists) = Fss!.Find(subjectSpan);
if (exists && state != null)
{
state.Msgs++;
state.Last = seq;
state.LastNeedsUpdate = false;
}
else
{
Fss!.Insert(subjectSpan, new SimpleState
{
Msgs = 1,
First = seq,
Last = seq,
LastNeedsUpdate = false,
});
}
}
}
offset += recordLength;
}
// The index must cover exactly the block range and start/end on the block bounds.
if (idx.Count != (int)idxSize ||
(firstSeen > 0 && firstSeen != blockFirstSeq) ||
(lastSeen > 0 && lastSeen != blockLastSeq))
return new InvalidDataException("corrupt state");
// Publish the buffer and index; the write pointer is set to the end of buf.
CacheData.Buf = buf;
CacheData.Idx = idx.ToArray();
CacheData.Fseq = blockFirstSeq;
CacheData.Wp = buf.Length;
return null;
}
// Flushes pending messages for this block.
// Takes the block write lock, runs FlushPendingMsgsLocked and, when that reports lost
// data and a file store is attached, asks the store to rebuild its state from it.
// Returns the flush error, or null.
// NOTE(review): Fs.RebuildState is invoked while the block write lock is still held -
// confirm that it never needs to re-acquire this block's lock.
internal Exception? FlushPendingMsgs()
{
    Mu.EnterWriteLock();
    try
    {
        var outcome = FlushPendingMsgsLocked();

        if (outcome.LostData is { } lost && Fs is { } store)
            store.RebuildState(lost);

        return outcome.Error;
    }
    finally
    {
        Mu.ExitWriteLock();
    }
}
// Writes the buffer to disk at an explicit offset.
// When MockWriteErr is set, the flag is cleared and an IOException("mock write error")
// is returned without touching the file. Returns (buf.Length, null) on success and
// (0, error) when the file is not open or the seek/write throws.
// Lock should be held.
internal (int Written, Exception? Error) WriteAt(byte[] buf, long writeOffset)
{
    ArgumentNullException.ThrowIfNull(buf);

    if (MockWriteErr)
    {
        MockWriteErr = false;
        return (0, new IOException("mock write error"));
    }

    if (Mfd == null)
        return (0, new InvalidOperationException("message block file is not open"));

    Exception? failure = null;
    try
    {
        Mfd.Seek(writeOffset, SeekOrigin.Begin);
        Mfd.Write(buf, 0, buf.Length);
    }
    catch (Exception ex)
    {
        failure = ex;
    }

    return failure == null ? (buf.Length, null) : (0, failure);
}
// flushPendingMsgsLocked writes pending bytes to disk.
// Returns (lost data, error). Lost data is only produced when a write fails and the
// block state is rebuilt; on every other path the first element is null.
// After a successful write the cache write pointer is advanced, the file is synced
// (or NeedSync is set), and the cache is either kept alive or expired.
// Lock should be held.
internal (LostStreamData? LostData, Exception? Error) FlushPendingMsgsLocked()
{
// Never assigned after this point; returned as the "no lost data" value.
LostStreamData? fsLostData = null;
if (CacheData == null || Mfd == null)
return (null, new InvalidOperationException("no message cache"));
var (pending, pendingErr) = BytesPending();
if (pendingErr != null || pending == null || pending.Length == 0)
{
// An InvalidOperationException from BytesPending is not reported to the caller.
// NOTE(review): presumably this means "nothing pending / flush already running" - confirm.
if (pendingErr is InvalidOperationException)
return (null, null);
return (null, pendingErr);
}
// Writing resumes at the current cache write pointer.
var writeOffset = (long)CacheData.Wp;
var writeBuffer = pending;
var checkErr = CheckAndLoadEncryption();
if (checkErr != null)
return (null, checkErr);
if (Bek != null && writeBuffer.Length > 0)
{
// Encrypt a copy so the pending bytes themselves are left unmodified.
var encrypted = new byte[writeBuffer.Length];
writeBuffer.AsSpan().CopyTo(encrypted);
Bek.XorKeyStream(encrypted.AsSpan());
writeBuffer = encrypted;
}
while (writeBuffer.Length > 0)
{
var (written, writeErr) = WriteAt(writeBuffer, writeOffset);
if (writeErr != null)
{
// On a failed write: close the descriptors, rebuild block state, remember the error in Werr.
CloseFDsLockedNoCheck();
var (lostData, _, _) = RebuildStateLocked();
Werr = writeErr;
return (lostData, writeErr);
}
writeOffset += written;
writeBuffer = writeBuffer[written..];
}
Werr = null;
// NOTE(review): re-check after the write loop; presumably guards against the cache or
// descriptor having been cleared in the meantime - confirm.
if (CacheData == null || Mfd == null)
return (fsLostData, Werr);
CacheData.Wp = (int)writeOffset;
if (SyncAlways)
Mfd.Flush(flushToDisk: true);
else
NeedSync = true;
// More bytes are still pending: keep the cache as is.
if (PendingWriteSizeLocked() > 0)
return (fsLostData, Werr);
var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
var cexpNs = TimeSpanToNanoseconds(Cexp);
// If Llts is in the future or within the Cexp window, re-arm the expire timer instead of expiring.
if (now < Llts || (cexpNs > 0 && (now - Llts) <= cexpNs))
{
ResetCacheExpireTimerLocked(TimeSpan.Zero);
return (fsLostData, Werr);
}
CacheData.Nra = false;
ExpireCacheLocked();
return (fsLostData, Werr);
}
// Resets the Loading flag to false.
// Lock should be held.
internal void ClearLoading() => Loading = false;
// Loads messages from disk while holding the block lock.
// Acquires the block write lock around LoadMsgsWithLock and returns its result.
internal Exception? LoadMsgs()
{
    Exception? outcome;

    Mu.EnterWriteLock();
    try
    {
        outcome = LoadMsgsWithLock();
    }
    finally
    {
        Mu.ExitWriteLock();
    }

    return outcome;
}
// Reports whether the cache currently holds a complete index for this block.
// The cache counts as loaded when it exists with a non-zero Fseq and a non-empty buffer,
// First.Seq is not below the cache's Fseq, and the index length equals
// Msgs + Dmap.Size + (First.Seq - cache.Fseq).
// Lock should be held.
internal bool CacheAlreadyLoaded()
{
    var loaded = CacheData;
    if (loaded == null)
        return false;
    if (loaded.Fseq == 0 || loaded.Buf.Length == 0)
        return false;
    if (First.Seq < loaded.Fseq)
        return false;

    var skippedAtFront = First.Seq - loaded.Fseq;
    var expectedEntries = Msgs + (ulong)Dmap.Size + skippedAtFront;
    return (ulong)loaded.Idx.Length == expectedEntries;
}
// Negation of CacheAlreadyLoaded.
// Lock should be held.
internal bool CacheNotLoaded()
{
    return !CacheAlreadyLoaded();
}
// True when Fss is null and NoTrack is false.
// Lock should be held.
internal bool FssNotLoaded()
{
    if (Fss != null)
        return false;

    return !NoTrack;
}
// Opens the block file (Mfn) for reading, sharing it for read and write.
// Returns (stream, null) on success or (null, exception) when opening throws.
// Lock should be held.
internal (FileStream? File, Exception? Error) OpenBlock()
{
    FileStream? stream = null;
    Exception? failure = null;

    try
    {
        stream = new FileStream(Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
    }
    catch (Exception ex)
    {
        failure = ex;
    }

    return (stream, failure);
}
// Loads a block file into memory.
// Uses the already-open Mfd when it can be rewound to offset 0; otherwise the descriptors
// are closed and the file is opened through OpenBlock (that handle is disposed on exit).
// buf is reused when it is large enough, otherwise a buffer is taken from
// JetStreamFileStore.GetMsgBlockBuf. On success RBytes is set to the number of bytes read.
// Returns the bytes read; a short read returns the partial bytes together with an
// EndOfStreamException, and any thrown exception is returned as (null, exception).
// Lock should be held.
internal (byte[]? Buffer, Exception? Error) LoadBlock(byte[]? buf)
{
    FileStream? source = null;
    var ownsSource = false;

    try
    {
        if (Mfd != null)
        {
            source = Mfd;
            if (source.Seek(0, SeekOrigin.Begin) != 0)
            {
                source = null;
                CloseFDsLockedNoCheck();
            }
        }

        if (source == null)
        {
            var (opened, openErr) = OpenBlock();
            if (openErr is FileNotFoundException)
                return (null, new FileNotFoundException("message block data missing", Mfn, openErr));
            if (openErr != null)
                return (null, openErr);
            if (opened == null)
                return (null, new IOException("failed to open message block"));

            source = opened;
            ownsSource = true;
        }

        var length64 = source.Length;
        if (length64 > int.MaxValue)
            return (null, new InvalidOperationException("message block size exceeded int capacity"));

        var length = (int)length64;
        if (buf == null || buf.Length < length)
            buf = JetStreamFileStore.GetMsgBlockBuf(length);

        var filled = 0;
        while (filled < length)
        {
            var chunk = source.Read(buf, filled, length - filled);
            if (chunk <= 0)
                return (buf[..filled], new EndOfStreamException("short read while loading message block"));

            filled += chunk;
        }

        RBytes = (ulong)filled;
        return (buf[..filled], null);
    }
    catch (Exception ex)
    {
        return (null, ex);
    }
    finally
    {
        if (ownsSource)
            source?.Dispose();
    }
}
// Loads and indexes this block's messages into the cache.
// Returns null when the cache is (or becomes) loaded, or when another load is already
// in progress (Loading is set). Otherwise returns the first error encountered.
//
// Each pass of the loop: stop if the cache is already loaded; flush pending bytes first
// if there are any; otherwise read the block file, decrypt, strip compression metadata
// and index the result. A corrupt index triggers a block state rebuild and another pass.
// After more than 8 passes the call gives up with InvalidOperationException("no message cache").
// Lock should be held.
internal Exception? LoadMsgsWithLock()
{
    var checkErr = CheckAndLoadEncryption();
    if (checkErr != null)
        return checkErr;

    // Another load is in flight; do not clear its flag.
    if (Loading)
        return null;

    Loading = true;
    try
    {
        var checks = 0;
        while (true)
        {
            checks++;
            if (checks > 8)
                return new InvalidOperationException("no message cache");

            if (CacheAlreadyLoaded())
                return null;

            Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);

            // Pending bytes must reach disk before the file is read back.
            var (pending, _) = BytesPending();
            if (pending is { Length: > 0 })
            {
                var (lostOnFlush, flushErr) = FlushPendingMsgsLocked();
                if (lostOnFlush != null && Fs != null)
                    Fs.RebuildState(lostOnFlush);
                if (flushErr != null)
                    return flushErr;
                continue;
            }

            var (buffer, loadErr) = LoadBlock(null);
            if (loadErr != null || buffer == null)
            {
                Fs?.Warn("loadBlock error: {0}", loadErr?.Message ?? "unknown");
                if (loadErr is FileNotFoundException)
                {
                    var (lostOnMissing, _, missingRebuildErr) = RebuildStateLocked();
                    if (missingRebuildErr != null && lostOnMissing != null && Fs != null)
                        Fs.RebuildState(lostOnMissing);
                }

                return loadErr ?? new IOException("failed to load message block");
            }

            var cryptErr = EncryptOrDecryptIfNeeded(buffer);
            if (cryptErr != null)
                return cryptErr;

            var (decompressed, decompressErr) = DecompressIfNeeded(buffer);
            if (decompressErr != null || decompressed == null)
                return decompressErr ?? new InvalidDataException("failed to decompress message block");

            var indexErr = IndexCacheBuf(decompressed);
            if (indexErr != null)
            {
                if (indexErr is not InvalidDataException)
                    return indexErr;

                // Corrupt index: rebuild the block state and try again.
                var (lostOnCorrupt, _, corruptRebuildErr) = RebuildStateLocked();

                // Only propagate to the store when the rebuild actually reported lost data,
                // matching every other RebuildState call site.
                if (lostOnCorrupt != null && Fs != null)
                    Fs.RebuildState(lostOnCorrupt);
                if (corruptRebuildErr != null)
                    return corruptRebuildErr;
                continue;
            }

            if (decompressed.Length > 0)
            {
                Cloads++;
                StartCacheExpireTimer();
            }

            return null;
        }
    }
    finally
    {
        ClearLoading();
    }
}
// Fetches a message from this block.
// Forwards to FetchMsgEx with doCopy set to true.
internal (StoreMsg? Message, bool ExpireOk, Exception? Error) FetchMsg(ulong seq, StoreMsg? sm)
{
    return FetchMsgEx(seq, sm, doCopy: true);
}
// Fetches a message from this block without copy preference.
// Forwards to FetchMsgEx with doCopy set to false.
internal (StoreMsg? Message, bool ExpireOk, Exception? Error) FetchMsgNoCopy(ulong seq, StoreMsg? sm)
{
    return FetchMsgEx(seq, sm, doCopy: false);
}
// Fetches a message from this block.
// seq: sequence to look up; must lie within [First.Seq, Last.Seq].
// sm: optional StoreMsg to fill; a new one is allocated when null.
// doCopy: currently has no effect - both branches copy the header and payload into new arrays.
// Returns (message, expireOk, error). error is ErrStoreMsgNotFound when the sequence is out of
// range or not in the cache, InvalidOperationException("deleted message") for deleted
// sequences, ErrBadMsg for malformed records or checksum mismatches, or a cache load error.
// Takes the block write lock for the whole call and calls FinishedWithCache on every exit path.
internal (StoreMsg? Message, bool ExpireOk, Exception? Error) FetchMsgEx(ulong seq, StoreMsg? sm, bool doCopy)
{
Mu.EnterWriteLock();
try
{
var firstSeq = First.Seq;
var lastSeq = Last.Seq;
if (seq < firstSeq || seq > lastSeq)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
// Known-deleted sequences are answered from Dmap without loading the cache.
if (Dmap.Exists(seq))
{
// Update Llseq the same way the successful lookup path at the end of this method does.
var previous = Llseq;
if (Llseq == 0 || seq < Llseq || seq == Llseq + 1 || seq + 1 == Llseq)
Llseq = seq;
var deletedExpireOk = (seq == lastSeq && previous == seq - 1) || (seq == firstSeq && previous == seq + 1);
return (null, deletedExpireOk, new InvalidOperationException("deleted message"));
}
if (CacheNotLoaded())
{
var loadErr = LoadMsgsWithLock();
if (loadErr != null)
return (null, false, loadErr);
}
var cache = CacheData;
if (cache == null || cache.Buf.Length == 0 || cache.Idx.Length == 0 || seq < cache.Fseq)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
// Index slots are relative to the cache's first sequence.
var slot = seq - cache.Fseq;
if (slot >= (ulong)cache.Idx.Length)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
var raw = cache.Idx[(int)slot];
// A slot carrying Dbit is reported as a deleted message.
if ((raw & FileStoreDefaults.Dbit) != 0)
return (null, false, new InvalidOperationException("deleted message"));
// Strip the flag bits to obtain the record's byte offset in the cache buffer.
var offset = (int)(raw & ~(FileStoreDefaults.Dbit | FileStoreDefaults.Cbit));
if (offset < 0 || offset >= cache.Buf.Length)
return (null, false, new ErrBadMsg(Mfn, "invalid cache index"));
if (!TryReadRecordHeader(cache.Buf.AsSpan(offset), out var seqRaw, out var ts, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out var parseErr))
return (null, false, parseErr ?? new ErrBadMsg(Mfn, "malformed record"));
if (offset + recordLength > cache.Buf.Length)
return (null, false, new ErrBadMsg(Mfn, "record extends past cache buffer"));
// The last RecordHashSize bytes of the record are the stored checksum.
var record = cache.Buf.AsSpan(offset, recordLength);
var payloadLength = recordLength - FileStoreDefaults.RecordHashSize;
var payload = record[..payloadLength];
var checksum = record[payloadLength..];
var storedSeq = seqRaw & ~FileStoreDefaults.Ebit & ~FileStoreDefaults.Tbit;
if (storedSeq != seq)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
// Verify the checksum once per slot; Cbit on the index entry records that it passed.
if ((raw & FileStoreDefaults.Cbit) == 0)
{
var computed = SHA256.HashData(payload);
if (!computed.AsSpan(0, FileStoreDefaults.RecordHashSize).SequenceEqual(checksum))
return (null, false, new ErrBadMsg(Mfn, "invalid checksum"));
cache.Idx[(int)slot] = raw | FileStoreDefaults.Cbit;
}
// NOTE(review): 8 + 8 + 4 + 4 + 4 is presumably the fixed header size in front of the
// subject bytes - confirm against TryReadRecordHeader.
var subjectOffset = 8 + 8 + 4 + 4 + 4;
var headerOffset = subjectOffset + subjectLength;
var msgOffset = headerOffset + headerLength;
var parsed = sm ?? new StoreMsg();
parsed.Seq = seq;
parsed.Ts = ts;
parsed.Subject = subjectLength > 0
? Encoding.UTF8.GetString(record.Slice(subjectOffset, subjectLength))
: string.Empty;
var headerSlice = record.Slice(headerOffset, headerLength);
var msgSlice = record.Slice(msgOffset, msgLength);
if (doCopy)
{
parsed.Hdr = headerSlice.ToArray();
parsed.Msg = msgSlice.ToArray();
}
else
{
// StoreMsg uses byte[] fields; preserving that shape still requires arrays.
parsed.Hdr = headerSlice.ToArray();
parsed.Msg = msgSlice.ToArray();
}
// Buf holds the header bytes followed by the payload bytes.
parsed.Buf = new byte[parsed.Hdr.Length + parsed.Msg.Length];
if (parsed.Hdr.Length > 0)
Buffer.BlockCopy(parsed.Hdr, 0, parsed.Buf, 0, parsed.Hdr.Length);
if (parsed.Msg.Length > 0)
Buffer.BlockCopy(parsed.Msg, 0, parsed.Buf, parsed.Hdr.Length, parsed.Msg.Length);
// Llseq is updated when unset, when seq is lower, or when seq is adjacent to it.
var prevSeq = Llseq;
if (Llseq == 0 || seq < Llseq || seq == Llseq + 1 || seq + 1 == Llseq)
Llseq = seq;
Lrts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
// expireOk: the fetch hit the last (or first) sequence right after its neighbour was accessed.
var expireOk = (seq == lastSeq && prevSeq == seq - 1) || (seq == firstSeq && prevSeq == seq + 1);
return (parsed, expireOk, null);
}
finally
{
FinishedWithCache();
Mu.ExitWriteLock();
}
}
// Flushes pending bytes by delegating to FlushPendingMsgsLocked and returning only its
// error; any lost-data result is discarded here.
// The previous inline implementation returned on every path, which made this delegation
// unreachable, and it appended at end-of-file without the encryption and write-pointer
// handling that FlushPendingMsgsLocked performs.
// Lock should be held.
private Exception? FlushPendingToDiskLocked()
{
    var (_, err) = FlushPendingMsgsLocked();
    return err;
}
private void ResetCacheExpireTimerLocked(TimeSpan due)