feat(batch15): complete group 1 msgblock/consumerfilestore

This commit is contained in:
Joseph Doherty
2026-02-28 17:03:31 -05:00
parent 5367c3f34d
commit f36bc3111b
8 changed files with 813 additions and 10 deletions

View File

@@ -983,6 +983,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
mb.Seed = generatedSeed;
var nonceSize = GenEncryptionKey(_fcfg.Cipher, _prf(Encoding.UTF8.GetBytes($"{_cfg.Config.Name}:{mb.Index}"))).NonceSize;
mb.Nonce = encrypted[..nonceSize];
mb.Aek = GenEncryptionKey(_fcfg.Cipher, mb.Seed);
mb.Bek = GenBlockEncryptionKey(_fcfg.Cipher, mb.Seed, mb.Nonce);
return;
}
@@ -999,6 +1001,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
var seed = kek.Open(ekey.AsSpan(0, ns), ekey.AsSpan(ns));
mb.Seed = seed;
mb.Nonce = ekey[..ns];
mb.Aek = GenEncryptionKey(_fcfg.Cipher, mb.Seed);
mb.Bek = GenBlockEncryptionKey(_fcfg.Cipher, mb.Seed, mb.Nonce);
}
internal MessageBlock RecoverMsgBlock(uint index)
@@ -2402,6 +2406,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
var (key, seed, encrypted) = GenEncryptionKeys($"{_cfg.Config.Name}:{mb.Index}");
mb.Seed = seed;
mb.Nonce = encrypted[..key.NonceSize];
mb.Aek = GenEncryptionKey(_fcfg.Cipher, mb.Seed);
mb.Bek = GenBlockEncryptionKey(_fcfg.Cipher, mb.Seed, mb.Nonce);
var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
var keyFile = Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, mb.Index));
@@ -2415,6 +2421,21 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
}
}
internal Exception? RegenerateEncryptionKeysForBlock(MessageBlock mb)
{
ArgumentNullException.ThrowIfNull(mb);
_mu.EnterWriteLock();
try
{
return GenEncryptionKeysForBlock(mb);
}
finally
{
_mu.ExitWriteLock();
}
}
// Lock should be held.
private Exception? StoreRawMsgInternal(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck)
{

View File

@@ -15,6 +15,8 @@
using System.Text.Json;
using System.Threading.Channels;
using System.Buffers.Binary;
using System.Security.Cryptography;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
@@ -76,6 +78,12 @@ internal sealed class MessageBlock
/// <summary>Per-block nonce recovered from the encrypted key payload.</summary>
public byte[]? Nonce { get; set; }
/// <summary>Per-block AEAD key derived from <see cref="Seed"/>.</summary>
public JetStreamFileStore.AeadCipher? Aek { get; set; }
/// <summary>Per-block stream cipher derived from <see cref="Seed"/> and <see cref="Nonce"/>.</summary>
public JetStreamFileStore.XorStreamCipher? Bek { get; set; }
// ------------------------------------------------------------------
// Compression
// ------------------------------------------------------------------
@@ -259,6 +267,656 @@ internal sealed class MessageBlock
Mfd = null;
}
internal Exception? CheckAndLoadEncryption()
{
var fs = Fs;
if (fs == null)
return null;
if (Aek != null && Bek != null)
return null;
try
{
fs.LoadEncryptionForMsgBlock(this);
return null;
}
catch (Exception ex)
{
return ex;
}
}
internal void EnsureLastChecksumLoaded()
{
if (HasChecksum(Lchk))
return;
var lchk = LastChecksum();
if (lchk.Length == FileStoreDefaults.RecordHashSize)
Lchk = lchk;
}
internal Exception? ConvertCipher()
{
var fs = Fs;
if (fs == null)
return new InvalidOperationException("message block has no parent file store");
var checkErr = CheckAndLoadEncryption();
if (checkErr != null)
return checkErr;
if (Bek == null)
return null;
byte[] encrypted;
try
{
encrypted = File.Exists(Mfn) ? File.ReadAllBytes(Mfn) : Array.Empty<byte>();
}
catch (Exception ex)
{
return ex;
}
if (encrypted.Length == 0)
return null;
var plaintext = (byte[])encrypted.Clone();
Bek.XorKeyStream(plaintext.AsSpan());
if (!TryValidateRecordBuffer(plaintext.AsSpan(), out _))
return new InvalidDataException("unable to recover keys");
var regenErr = fs.RegenerateEncryptionKeysForBlock(this);
if (regenErr != null)
return regenErr;
var reloadErr = CheckAndLoadEncryption();
if (reloadErr != null)
return reloadErr;
if (Bek == null)
return new InvalidOperationException("missing block encryption key after regeneration");
Bek.XorKeyStream(plaintext.AsSpan());
try
{
File.WriteAllBytes(Mfn, plaintext);
return null;
}
catch (Exception ex)
{
return ex;
}
}
internal Exception? ConvertToEncrypted()
{
var checkErr = CheckAndLoadEncryption();
if (checkErr != null)
return checkErr;
if (Bek == null)
return null;
byte[] buf;
try
{
buf = File.Exists(Mfn) ? File.ReadAllBytes(Mfn) : Array.Empty<byte>();
}
catch (Exception ex)
{
return ex;
}
if (buf.Length == 0)
return null;
if (!TryValidateRecordBuffer(buf.AsSpan(), out _))
return new InvalidDataException("unable to convert block to encrypted format");
Bek.XorKeyStream(buf.AsSpan());
try
{
File.WriteAllBytes(Mfn, buf);
return null;
}
catch (Exception ex)
{
return ex;
}
}
internal uint GetIndex()
{
Mu.EnterReadLock();
try
{
return Index;
}
finally
{
Mu.ExitReadLock();
}
}
internal (LostStreamData? LostData, ulong[] Tombstones, Exception? Error) RebuildState()
{
Mu.EnterWriteLock();
try
{
return RebuildStateLocked();
}
finally
{
Mu.ExitWriteLock();
}
}
internal (LostStreamData? LostData, ulong[] Tombstones, Exception? Error) RebuildStateLocked()
{
var startFirst = First.Seq;
var startLast = Last.Seq;
var previousDeleted = SnapshotDeletedSequences();
TryForceExpireCacheLocked();
Fss = null;
byte[] buf;
try
{
buf = File.Exists(Mfn) ? File.ReadAllBytes(Mfn) : Array.Empty<byte>();
RBytes = (ulong)buf.Length;
}
catch (Exception ex)
{
return (null, Array.Empty<ulong>(), ex);
}
if (buf.Length == 0)
{
var ld = BuildLostData(startFirst, startLast, previousDeleted, bytes: Bytes);
Msgs = 0;
Bytes = 0;
RBytes = 0;
Dmap.Empty();
First = new MsgId { Seq = startLast + 1, Ts = 0 };
Last = new MsgId { Seq = startLast, Ts = 0 };
return (ld, Array.Empty<ulong>(), null);
}
var encErr = CheckAndLoadEncryption();
if (encErr != null)
return (null, Array.Empty<ulong>(), encErr);
if (Bek != null)
Bek.XorKeyStream(buf.AsSpan());
return RebuildStateFromBufLocked(buf, allowTruncate: true);
}
internal (LostStreamData? LostData, ulong[] Tombstones, Exception? Error) RebuildStateFromBufLocked(byte[] buf, bool allowTruncate)
{
ArgumentNullException.ThrowIfNull(buf);
var startLast = Last.Seq;
var tombstones = new List<ulong>();
var oldDeleted = SnapshotDeletedSequences();
Msgs = 0;
Bytes = 0;
RBytes = (ulong)buf.Length;
First = new MsgId { Seq = 0, Ts = 0 };
Last = new MsgId { Seq = 0, Ts = 0 };
Dmap.Empty();
ulong highestSeq = 0;
ulong maxTombstoneSeq = 0;
long maxTombstoneTs = 0;
var index = 0;
while (index < buf.Length)
{
if (!TryReadRecordHeader(buf.AsSpan(index), out var seqRaw, out var ts, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out var headerError))
{
var ld = BuildLostData(Last.Seq + 1, startLast, oldDeleted, bytes: (ulong)(buf.Length - index));
return (ld, tombstones.ToArray(), headerError ?? new InvalidDataException("corrupt message header"));
}
if (recordLength <= 0 || index + recordLength > buf.Length)
{
var ld = BuildLostData(Last.Seq + 1, startLast, oldDeleted, bytes: (ulong)(buf.Length - index));
return (ld, tombstones.ToArray(), new InvalidDataException("record extends past block length"));
}
var record = buf.AsSpan(index, recordLength);
var payloadLength = recordLength - FileStoreDefaults.RecordHashSize;
var payload = record[..payloadLength];
var checksum = record[payloadLength..];
var computed = SHA256.HashData(payload);
if (!computed.AsSpan(0, FileStoreDefaults.RecordHashSize).SequenceEqual(checksum))
{
var ld = BuildLostData(Last.Seq + 1, startLast, oldDeleted, bytes: (ulong)(buf.Length - index));
return (ld, tombstones.ToArray(), new ErrBadMsg(Mfn, "invalid checksum"));
}
Lchk = checksum.ToArray();
var seq = seqRaw & ~FileStoreDefaults.Tbit & ~FileStoreDefaults.Ebit;
var isTombstone = (seqRaw & FileStoreDefaults.Tbit) != 0 ||
(subjectLength == 0 && headerLength == 0 && msgLength == 0);
if (isTombstone && seq > 0)
{
tombstones.Add(seq);
Dmap.Insert(seq);
if (seq > maxTombstoneSeq)
{
maxTombstoneSeq = seq;
maxTombstoneTs = ts;
}
if (seq > Last.Seq)
Last = new MsgId { Seq = seq, Ts = ts };
index += recordLength;
continue;
}
if (seq == 0)
{
index += recordLength;
continue;
}
if (First.Seq == 0 || seq < First.Seq)
First = new MsgId { Seq = seq, Ts = ts };
if (seq > highestSeq)
{
highestSeq = seq;
Last = new MsgId { Seq = seq, Ts = ts };
}
if (!Dmap.Exists(seq))
{
Msgs++;
Bytes += (ulong)recordLength;
}
index += recordLength;
}
if (Msgs == 0)
{
if (maxTombstoneSeq > 0)
{
First = new MsgId { Seq = maxTombstoneSeq + 1, Ts = 0 };
if (Last.Seq == 0)
Last = new MsgId { Seq = maxTombstoneSeq, Ts = maxTombstoneTs };
}
else if (First.Seq > 0)
{
Last = new MsgId { Seq = First.Seq - 1, Ts = 0 };
}
else
{
First = new MsgId { Seq = startLast + 1, Ts = 0 };
Last = new MsgId { Seq = startLast, Ts = 0 };
}
}
return (null, tombstones.ToArray(), null);
}
internal byte[] LastChecksum()
{
byte[] checksum = new byte[FileStoreDefaults.RecordHashSize];
if (string.IsNullOrWhiteSpace(Mfn) || !File.Exists(Mfn))
return Array.Empty<byte>();
try
{
using var file = new FileStream(Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
RBytes = (ulong)file.Length;
if (RBytes < FileStoreDefaults.RecordHashSize)
return checksum;
var encErr = CheckAndLoadEncryption();
if (encErr != null)
return Array.Empty<byte>();
if (Bek != null)
{
var buf = File.ReadAllBytes(Mfn);
if (buf.Length < FileStoreDefaults.RecordHashSize)
return checksum;
Bek.XorKeyStream(buf.AsSpan());
Buffer.BlockCopy(buf, buf.Length - FileStoreDefaults.RecordHashSize, checksum, 0, FileStoreDefaults.RecordHashSize);
}
else
{
file.Seek(-FileStoreDefaults.RecordHashSize, SeekOrigin.End);
file.ReadExactly(checksum, 0, checksum.Length);
}
return checksum;
}
catch
{
return Array.Empty<byte>();
}
}
internal (StoreMsg? Message, bool DidLoad, Exception? Error) FirstMatchingMulti(SimpleSublist? sl, ulong start, StoreMsg? sm)
{
Mu.EnterWriteLock();
try
{
if (Fs == null || sl == null)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
var floor = Math.Max(start, First.Seq);
if (floor > Last.Seq)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
var (msg, _) = Fs.LoadNextMsgMulti(sl, floor, sm);
if (msg == null || msg.Seq > Last.Seq)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
Llseq = msg.Seq;
Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
return (msg, false, null);
}
catch (Exception ex)
{
return (null, false, ex);
}
finally
{
Mu.ExitWriteLock();
}
}
internal (StoreMsg? Message, bool DidLoad, Exception? Error) FirstMatching(string filter, bool wc, ulong start, StoreMsg? sm)
{
Mu.EnterWriteLock();
try
{
if (Fs == null)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
var floor = Math.Max(start, First.Seq);
if (floor > Last.Seq)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
var effectiveFilter = string.IsNullOrEmpty(filter) ? ">" : filter;
var useWildcard = wc || effectiveFilter == ">";
var (msg, _) = Fs.LoadNextMsg(effectiveFilter, useWildcard, floor, sm);
if (msg == null || msg.Seq > Last.Seq)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
Llseq = msg.Seq;
Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
return (msg, false, null);
}
catch (Exception ex)
{
return (null, false, ex);
}
finally
{
Mu.ExitWriteLock();
}
}
internal (StoreMsg? Message, bool DidLoad, Exception? Error) PrevMatchingMulti(SimpleSublist? sl, ulong start, StoreMsg? sm)
{
Mu.EnterWriteLock();
try
{
if (Fs == null || sl == null)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
var ceiling = Math.Min(start, Last.Seq);
if (ceiling < First.Seq)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
var (msg, _, err) = Fs.LoadPrevMsgMulti(sl, ceiling, sm);
if (err != null)
return (null, false, err);
if (msg == null || msg.Seq < First.Seq)
return (null, false, StoreErrors.ErrStoreMsgNotFound);
Llseq = msg.Seq;
Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
return (msg, false, null);
}
catch (Exception ex)
{
return (null, false, ex);
}
finally
{
Mu.ExitWriteLock();
}
}
internal (ulong Total, ulong First, ulong Last) FilteredPending(string filter, bool wc, ulong seq)
{
Mu.EnterWriteLock();
try
{
return FilteredPendingLocked(filter, wc, seq);
}
finally
{
Mu.ExitWriteLock();
}
}
internal (ulong Total, ulong First, ulong Last) FilteredPendingLocked(string filter, bool wc, ulong sseq)
{
if (Fs == null)
return (0, 0, 0);
var floor = Math.Max(sseq, First.Seq);
if (floor > Last.Seq)
return (0, 0, 0);
var effectiveFilter = string.IsNullOrEmpty(filter) ? ">" : filter;
var useWildcard = wc || effectiveFilter == ">";
ulong total = 0;
ulong first = 0;
ulong last = 0;
var cursor = floor;
while (cursor <= Last.Seq)
{
var (msg, _) = Fs.LoadNextMsg(effectiveFilter, useWildcard, cursor, null);
if (msg == null || msg.Seq > Last.Seq)
break;
total++;
if (first == 0)
first = msg.Seq;
last = msg.Seq;
if (msg.Seq == ulong.MaxValue)
break;
cursor = msg.Seq + 1;
}
return (total, first, last);
}
internal Exception? SetupWriteCache(byte[]? initialBuffer = null)
{
if (CacheData != null)
return null;
if (!string.IsNullOrWhiteSpace(Mfn) && File.Exists(Mfn))
{
try
{
var existing = File.ReadAllBytes(Mfn);
CacheData = new Cache
{
Buf = existing,
Wp = existing.Length,
Fseq = First.Seq == 0 ? 1UL : First.Seq,
};
Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
return null;
}
catch (Exception ex)
{
return ex;
}
}
var buf = initialBuffer ?? JetStreamFileStore.GetMsgBlockBuf((int)FileStoreDefaults.DefaultTinyBlockSize);
CacheData = new Cache
{
Buf = buf,
Wp = 0,
Fseq = First.Seq == 0 ? 1UL : First.Seq,
};
Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
return null;
}
private static bool HasChecksum(byte[]? checksum)
{
if (checksum == null || checksum.Length != FileStoreDefaults.RecordHashSize)
return false;
foreach (var b in checksum)
{
if (b != 0)
return true;
}
return false;
}
private ulong[] SnapshotDeletedSequences()
{
if (Dmap.Size == 0)
return Array.Empty<ulong>();
var deleted = new List<ulong>(Dmap.Size);
Dmap.Range(seq =>
{
deleted.Add(seq);
return true;
});
return deleted.ToArray();
}
private static LostStreamData? BuildLostData(ulong start, ulong end, ulong[] deleted, ulong bytes)
{
if (start == 0 || start > end)
return null;
var deletedSet = deleted.Length == 0 ? null : new HashSet<ulong>(deleted);
var missing = new List<ulong>();
for (var seq = start; seq <= end; seq++)
{
if (deletedSet == null || !deletedSet.Contains(seq))
missing.Add(seq);
if (seq == ulong.MaxValue)
break;
}
if (missing.Count == 0)
return null;
return new LostStreamData
{
Msgs = missing.ToArray(),
Bytes = bytes,
};
}
private static bool TryReadRecordHeader(ReadOnlySpan<byte> record, out ulong seq, out long ts, out int subjectLength, out int headerLength, out int msgLength, out int totalLength, out Exception? error)
{
seq = 0;
ts = 0;
subjectLength = 0;
headerLength = 0;
msgLength = 0;
totalLength = 0;
error = null;
const int fixedHeaderLength = 8 + 8 + 4 + 4 + 4;
if (record.Length < fixedHeaderLength + FileStoreDefaults.RecordHashSize)
{
error = new InvalidDataException("record shorter than minimum header");
return false;
}
seq = BinaryPrimitives.ReadUInt64LittleEndian(record);
ts = BinaryPrimitives.ReadInt64LittleEndian(record[8..]);
subjectLength = BinaryPrimitives.ReadInt32LittleEndian(record[16..]);
headerLength = BinaryPrimitives.ReadInt32LittleEndian(record[20..]);
msgLength = BinaryPrimitives.ReadInt32LittleEndian(record[24..]);
if (subjectLength < 0 || headerLength < 0 || msgLength < 0)
{
error = new InvalidDataException("negative message section length");
return false;
}
var payloadLength = fixedHeaderLength + subjectLength + headerLength + msgLength;
if (payloadLength > record.Length || payloadLength > FileStoreDefaults.RlBadThresh)
{
error = new InvalidDataException("record payload length out of bounds");
return false;
}
totalLength = payloadLength + FileStoreDefaults.RecordHashSize;
if (totalLength > record.Length)
{
error = new InvalidDataException("record checksum exceeds block length");
return false;
}
return true;
}
private static bool TryValidateRecordBuffer(ReadOnlySpan<byte> buf, out int validLength)
{
validLength = 0;
while (validLength < buf.Length)
{
if (!TryReadRecordHeader(buf[validLength..], out _, out _, out _, out _, out _, out var recordLength, out _))
return false;
var record = buf.Slice(validLength, recordLength);
var payloadLength = recordLength - FileStoreDefaults.RecordHashSize;
var payload = record[..payloadLength];
var checksum = record[payloadLength..];
var computed = SHA256.HashData(payload);
if (!computed.AsSpan(0, FileStoreDefaults.RecordHashSize).SequenceEqual(checksum))
return false;
validLength += recordLength;
}
return validLength == buf.Length;
}
internal void TryForceExpireCacheLocked()
{
if (CacheData?.Buf is { Length: > 0 } buf)

View File

@@ -0,0 +1,54 @@
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
/// <summary>
/// String/JSON helpers for store enums that mirror Go enum methods.
/// </summary>
public static class StoreEnumExtensions
{
public static string String(this StoreCipher cipher)
=> cipher switch
{
StoreCipher.ChaCha => "ChaCha20-Poly1305",
StoreCipher.Aes => "AES-GCM",
StoreCipher.NoCipher => "None",
_ => "Unknown StoreCipher",
};
public static string String(this StoreCompression alg)
=> alg switch
{
StoreCompression.NoCompression => "None",
StoreCompression.S2Compression => "S2",
_ => "Unknown StoreCompression",
};
public static byte[] MarshalJSON(this StoreCompression alg)
=> alg switch
{
StoreCompression.NoCompression => JsonSerializer.SerializeToUtf8Bytes("none"),
StoreCompression.S2Compression => JsonSerializer.SerializeToUtf8Bytes("s2"),
_ => throw new InvalidOperationException("unknown compression algorithm"),
};
public static void UnmarshalJSON(this ref StoreCompression alg, ReadOnlySpan<byte> b)
{
var parsed = JsonSerializer.Deserialize<string>(b);
if (parsed == null)
throw new InvalidDataException("compression value must be a JSON string");
alg = parsed switch
{
"none" => StoreCompression.NoCompression,
"s2" => StoreCompression.S2Compression,
_ => throw new InvalidOperationException("unknown compression algorithm"),
};
}
public static void UnmarshalJSON(this ref StoreCompression alg, byte[] b)
{
ArgumentNullException.ThrowIfNull(b);
UnmarshalJSON(ref alg, b.AsSpan());
}
}

View File

@@ -334,6 +334,16 @@ public sealed class LostStreamData
[JsonPropertyName("bytes")]
public ulong Bytes { get; set; }
/// <summary>
/// Returns the index of <paramref name="seq"/> in <see cref="Msgs"/> and whether it was found.
/// Mirrors Go's <c>LostStreamData.exists</c>.
/// </summary>
public (int Index, bool Found) Exists(ulong seq)
{
var index = Array.IndexOf(Msgs, seq);
return (index, index >= 0);
}
}
// ---------------------------------------------------------------------------