feat(batch14): complete group1 write path and wave1 tests

This commit is contained in:
Joseph Doherty
2026-02-28 15:10:46 -05:00
parent 7670488369
commit 4f0a7f40fc
7 changed files with 869 additions and 6 deletions

View File

@@ -14,6 +14,7 @@
// Adapted from server/filestore.go (fileStore struct and methods)
using System.Buffers;
using System.Buffers.Binary;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
@@ -2032,6 +2033,576 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
_lmb = _blks.Count > 0 ? _blks[^1] : null;
}
// -----------------------------------------------------------------------
// Write/lifecycle helper methods (Batch 14 Group 1)
// -----------------------------------------------------------------------
// Lock should be held.
private Dictionary<string, ulong>? SubjectsTotalsLocked(string filter)
{
if (_psim == null || _psim.Size() == 0)
return null;
if (string.IsNullOrEmpty(filter))
filter = ">";
var totals = new Dictionary<string, ulong>(StringComparer.Ordinal);
_psim.Match(Encoding.UTF8.GetBytes(filter), (subj, psi) =>
{
totals[Encoding.UTF8.GetString(subj)] = psi.Total;
return true;
});
return totals;
}
// Lock should be held.
private byte[] HashKeyForBlock(uint index)
=> Encoding.UTF8.GetBytes($"{_cfg.Config.Name}-{index}");
// Lock should be held.
private (MessageBlock? Mb, Exception? Error) NewMsgBlockForWrite()
{
uint index = 1;
byte[]? recycledBuf = null;
if (_lmb != null)
{
_lmb.Mu.EnterWriteLock();
try
{
index = _lmb.Index + 1;
_lmb.Qch = null;
_lmb.CloseFDs();
if (_lmb.CacheData?.Buf is { Length: > 0 } buf)
{
recycledBuf = buf;
_lmb.CacheData = null;
}
}
finally
{
_lmb.Mu.ExitWriteLock();
}
}
var mb = InitMsgBlock(index);
mb.Mu.EnterWriteLock();
try
{
mb.Fss = new SubjectTree<SimpleState>();
mb.Llts = 0;
mb.Lwts = TimestampNormalized(DateTime.UtcNow);
mb.First = new MsgId { Seq = _state.LastSeq + 1, Ts = 0 };
mb.Last = new MsgId { Seq = _state.LastSeq, Ts = TimestampNormalized(_state.LastTime) };
mb.CacheData = new Cache
{
Buf = recycledBuf ?? GetMsgBlockBuf((int)FileStoreDefaults.DefaultTinyBlockSize),
Fseq = mb.First.Seq,
Idx = Array.Empty<uint>(),
Nra = false,
Wp = 0,
};
}
finally
{
mb.Mu.ExitWriteLock();
}
try
{
mb.Mfd = new FileStream(mb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
}
catch (Exception ex)
{
return (null, ex);
}
if (_prf != null)
{
var encErr = GenEncryptionKeysForBlock(mb);
if (encErr != null)
return (null, encErr);
}
_ = SHA256.HashData(HashKeyForBlock(index));
AddMsgBlock(mb);
return (mb, null);
}
// Lock should be held.
private Exception? GenEncryptionKeysForBlock(MessageBlock? mb)
{
if (mb == null)
return null;
try
{
var (key, seed, encrypted) = GenEncryptionKeys($"{_cfg.Config.Name}:{mb.Index}");
mb.Seed = seed;
mb.Nonce = encrypted[..key.NonceSize];
var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
var keyFile = Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, mb.Index));
WriteFileWithOptionalSync(keyFile, encrypted);
mb.Kfn = keyFile;
return null;
}
catch (Exception ex)
{
return ex;
}
}
// Lock should be held.
private Exception? StoreRawMsgInternal(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck)
{
if (IsClosed())
return StoreErrors.ErrStoreClosed;
hdr ??= Array.Empty<byte>();
msg ??= Array.Empty<byte>();
if (_lmb == null)
{
var (newMb, newErr) = NewMsgBlockForWrite();
if (newErr != null)
return newErr;
_lmb = newMb;
}
var projectedSize = _lmb!.RBytes + FileStoreMsgSize(subject, hdr, msg);
if (_fcfg.BlockSize > 0 && projectedSize > _fcfg.BlockSize)
{
var (newMb, newErr) = NewMsgBlockForWrite();
if (newErr != null)
return newErr;
_lmb = newMb;
}
try
{
_memStore.StoreRawMsg(subject, hdr, msg, seq, ts, ttl, discardNewCheck);
}
catch (Exception ex)
{
return ex;
}
_state = _memStore.State();
_receivedAny = true;
var record = BuildMessageRecord(subject, hdr, msg, seq, ts);
_lmb!.Mu.EnterWriteLock();
try
{
_lmb.Mfd ??= new FileStream(_lmb.Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
_lmb.Mfd.Seek(0, SeekOrigin.End);
_lmb.Mfd.Write(record, 0, record.Length);
if (_fcfg.SyncAlways)
_lmb.Mfd.Flush(true);
var msgSize = FileStoreMsgSize(subject, hdr, msg);
_lmb.Msgs++;
_lmb.Bytes += msgSize;
_lmb.RBytes += (ulong)record.Length;
_lmb.CBytes = _lmb.Bytes;
_lmb.NeedSync = !_fcfg.SyncAlways;
_lmb.Lwts = ts;
_lmb.Lchk = record.AsSpan(record.Length - FileStoreDefaults.RecordHashSize).ToArray();
if (_lmb.First.Seq == 0 || _lmb.Msgs == 1 || seq < _lmb.First.Seq)
_lmb.First = new MsgId { Seq = seq, Ts = ts };
if (seq >= _lmb.Last.Seq)
_lmb.Last = new MsgId { Seq = seq, Ts = ts };
_lmb.Fss ??= new SubjectTree<SimpleState>();
if (!string.IsNullOrEmpty(subject))
{
var subj = Encoding.UTF8.GetBytes(subject);
var (ss, ok) = _lmb.Fss.Find(subj);
if (ok && ss != null)
{
ss.Msgs++;
if (ss.First == 0 || seq < ss.First)
ss.First = seq;
if (seq > ss.Last)
ss.Last = seq;
ss.LastNeedsUpdate = false;
}
else
{
_lmb.Fss.Insert(subj, new SimpleState
{
Msgs = 1,
First = seq,
Last = seq,
LastNeedsUpdate = false,
});
}
}
}
finally
{
_lmb.Mu.ExitWriteLock();
}
RefreshPsiFromMemStoreLocked();
EnforceMsgLimit();
EnforceBytesLimit();
EnforceMsgPerSubjectLimit(fireCallback: false);
return null;
}
// Lock should be held.
private void RebuildFirst()
{
_state = _memStore.State();
if (_state.Msgs == 0)
{
_state.FirstSeq = _state.LastSeq + 1;
_state.FirstTime = default;
return;
}
var ss = _memStore.FilteredState(_state.FirstSeq, ">");
if (ss.Msgs == 0 || ss.First == 0)
{
_state.FirstSeq = _state.LastSeq + 1;
_state.FirstTime = default;
return;
}
_state.FirstSeq = ss.First;
var sm = _memStore.LoadMsg(ss.First, null);
if (sm != null)
_state.FirstTime = FromUnixNanosUtc(sm.Ts);
SelectNextFirst();
}
// Lock should be held.
private (ulong Seq, Exception? Error) FirstSeqForSubj(string subj)
{
if (string.IsNullOrEmpty(subj))
return (0, null);
try
{
var ss = _memStore.FilteredState(_state.FirstSeq, subj);
return (ss.Msgs > 0 ? ss.First : 0, null);
}
catch (Exception ex)
{
return (0, ex);
}
}
// Lock should be held.
private void EnforceMsgLimit()
{
if (_cfg.Config.MaxMsgs <= 0)
return;
var maxMsgs = (ulong)_cfg.Config.MaxMsgs;
while (_state.Msgs > maxMsgs)
{
var (removed, err) = DeleteFirstMsg();
if (err != null || !removed)
{
RebuildFirst();
break;
}
}
}
// Lock should be held.
private void EnforceBytesLimit()
{
if (_cfg.Config.MaxBytes <= 0)
return;
var maxBytes = (ulong)_cfg.Config.MaxBytes;
while (_state.Bytes > maxBytes)
{
var (removed, err) = DeleteFirstMsg();
if (err != null || !removed)
{
RebuildFirst();
break;
}
}
}
// Lock should be held.
private void EnforceMsgPerSubjectLimit(bool fireCallback)
{
if (_cfg.Config.MaxMsgsPer <= 0)
return;
var limit = (ulong)_cfg.Config.MaxMsgsPer;
var totals = _memStore.SubjectsTotals(">");
foreach (var entry in totals.ToArray())
{
var subj = entry.Key;
var remaining = entry.Value;
while (remaining > limit)
{
var (fseq, seqErr) = FirstSeqForSubj(subj);
if (seqErr != null || fseq == 0)
break;
var (removed, remErr) = RemoveMsgViaLimits(fseq);
if (remErr != null || !removed)
break;
remaining--;
}
}
if (fireCallback)
_state = _memStore.State();
}
// Lock should be held.
private (bool Removed, Exception? Error) DeleteFirstMsg()
=> RemoveMsgViaLimits(_state.FirstSeq);
// Lock should be held.
private (bool Removed, Exception? Error) RemoveMsgViaLimits(ulong seq)
=> RemoveMsgInternal(seq, secure: false, viaLimits: true, needFsLock: false);
// Lock should be held.
private ulong RemovePerSubject(string subj)
{
if (_psim == null || string.IsNullOrEmpty(subj))
return 0;
var subjBytes = Encoding.UTF8.GetBytes(subj);
var (psi, ok) = _psim.Find(subjBytes);
if (!ok || psi == null)
return 0;
_psim.Delete(subjBytes);
_tsl = Math.Max(0, _tsl - subj.Length);
return psi.Total;
}
private (bool Removed, Exception? Error) RemoveMsgInternal(ulong seq, bool secure, bool viaLimits, bool needFsLock)
{
if (needFsLock)
_mu.EnterWriteLock();
try
{
if (IsClosed())
return (false, StoreErrors.ErrStoreClosed);
var sm = _memStore.LoadMsg(seq, null);
var (removed, err) = secure ? _memStore.EraseMsg(seq) : _memStore.RemoveMsg(seq);
if (err != null || !removed)
return (removed, err);
_state = _memStore.State();
var mb = FindMsgBlockForSeq(seq);
if (mb != null)
{
var (_, blockErr) = RemoveMsgFromBlock(mb, seq, secure, viaLimits, sm);
if (blockErr != null)
return (true, blockErr);
}
RefreshPsiFromMemStoreLocked();
if (_state.Msgs == 0)
_firstMoved = true;
return (true, null);
}
finally
{
if (needFsLock)
_mu.ExitWriteLock();
}
}
// Lock should be held.
private (bool Removed, Exception? Error) RemoveMsgFromBlock(MessageBlock mb, ulong seq, bool secure, bool viaLimits)
=> RemoveMsgFromBlock(mb, seq, secure, viaLimits, null);
// Lock should be held.
private (bool Removed, Exception? Error) RemoveMsgFromBlock(MessageBlock mb, ulong seq, bool secure, bool viaLimits, StoreMsg? sm)
{
ArgumentNullException.ThrowIfNull(mb);
mb.Mu.EnterWriteLock();
try
{
if (seq < mb.First.Seq || seq > mb.Last.Seq)
return (false, null);
if (mb.Msgs > 0)
mb.Msgs--;
mb.Dmap.Insert(seq);
if (sm != null)
{
var size = FileStoreMsgSize(sm.Subject, sm.Hdr, sm.Msg);
if (size <= mb.Bytes)
mb.Bytes -= size;
else
mb.Bytes = 0;
if (!mb.NoTrack && mb.Fss != null && !string.IsNullOrEmpty(sm.Subject))
{
var subjBytes = Encoding.UTF8.GetBytes(sm.Subject);
var (ss, ok) = mb.Fss.Find(subjBytes);
if (ok && ss != null)
{
if (ss.Msgs > 0)
ss.Msgs--;
if (ss.Msgs == 0)
mb.Fss.Delete(subjBytes);
else if (seq == ss.First || seq == ss.Last)
ss.LastNeedsUpdate = true;
}
}
}
mb.SelectNextFirst();
if (mb.Msgs == 0)
{
mb.First = new MsgId { Seq = mb.Last.Seq + 1, Ts = 0 };
mb.Last = new MsgId { Seq = mb.First.Seq - 1, Ts = 0 };
}
mb.NeedSync = mb.NeedSync || viaLimits || secure;
return (true, null);
}
finally
{
mb.Mu.ExitWriteLock();
}
}
// Lock should be held.
private void RemoveMsgsInRange(ulong first, ulong last, bool viaLimits)
{
if (first == 0 || last == 0 || last < first)
return;
for (var seq = first; seq <= last; seq++)
{
_ = RemoveMsgInternal(seq, secure: false, viaLimits: viaLimits, needFsLock: false);
if (seq == ulong.MaxValue)
break;
}
RebuildFirst();
}
private bool IsClosed()
=> _closed;
// Lock should be held.
private void SelectNextFirst()
{
foreach (var mb in _blks)
{
mb.Mu.EnterWriteLock();
try
{
mb.SelectNextFirst();
if (mb.Msgs == 0)
continue;
_state.FirstSeq = mb.First.Seq;
var sm = _memStore.LoadMsg(_state.FirstSeq, null);
_state.FirstTime = sm == null ? default : FromUnixNanosUtc(sm.Ts);
return;
}
finally
{
mb.Mu.ExitWriteLock();
}
}
_state.FirstSeq = _state.LastSeq + 1;
_state.FirstTime = default;
}
// Lock should be held.
private MessageBlock? FindMsgBlockForSeq(ulong seq)
{
foreach (var mb in _blks)
{
if (seq >= mb.First.Seq && seq <= mb.Last.Seq)
return mb;
}
return null;
}
// Lock should be held.
private void RefreshPsiFromMemStoreLocked()
{
_psim ??= new SubjectTree<Psi>();
_psim.Reset();
_tsl = 0;
var totals = _memStore.SubjectsTotals(">");
if (totals.Count == 0)
return;
var fblk = _blks.Count > 0 ? _blks[0].Index : 1U;
var lblk = _lmb?.Index ?? fblk;
foreach (var kvp in totals)
{
if (kvp.Value == 0)
continue;
var subject = kvp.Key;
_psim.Insert(Encoding.UTF8.GetBytes(subject), new Psi
{
Total = kvp.Value,
Fblk = fblk,
Lblk = lblk,
});
_tsl += subject.Length;
}
}
private static byte[] BuildMessageRecord(string subject, ReadOnlySpan<byte> hdr, ReadOnlySpan<byte> msg, ulong seq, long ts)
{
var subjectBytes = Encoding.UTF8.GetBytes(subject);
var bodyLength = (8 + 8 + 4 + 4 + 4) + subjectBytes.Length + hdr.Length + msg.Length;
var record = new byte[bodyLength + FileStoreDefaults.RecordHashSize];
var span = record.AsSpan();
var index = 0;
BinaryPrimitives.WriteUInt64LittleEndian(span[index..], seq);
index += 8;
BinaryPrimitives.WriteInt64LittleEndian(span[index..], ts);
index += 8;
BinaryPrimitives.WriteInt32LittleEndian(span[index..], subjectBytes.Length);
index += 4;
BinaryPrimitives.WriteInt32LittleEndian(span[index..], hdr.Length);
index += 4;
BinaryPrimitives.WriteInt32LittleEndian(span[index..], msg.Length);
index += 4;
subjectBytes.CopyTo(span[index..]);
index += subjectBytes.Length;
hdr.CopyTo(span[index..]);
index += hdr.Length;
msg.CopyTo(span[index..]);
index += msg.Length;
var hash = SHA256.HashData(record.AsSpan(0, index));
hash.AsSpan(0, FileStoreDefaults.RecordHashSize).CopyTo(span[index..]);
return record;
}
// -----------------------------------------------------------------------
// Read/query helper methods (Batch 13)
// -----------------------------------------------------------------------

View File

@@ -267,6 +267,26 @@ internal sealed class MessageBlock
CacheData = null;
Fss = null;
}
// Lock should be held.
internal void SelectNextFirst()
{
if (Msgs == 0 || First.Seq == 0 || Last.Seq == 0)
return;
var seq = First.Seq;
while (seq <= Last.Seq && Dmap.Exists(seq))
{
if (seq == ulong.MaxValue)
break;
seq++;
}
if (seq > Last.Seq)
First = new MsgId { Seq = Last.Seq + 1, Ts = 0 };
else
First = new MsgId { Seq = seq, Ts = First.Ts };
}
}
// ---------------------------------------------------------------------------