From f36bc3111bf50dabc2163e1b0e6e68a35b7deae5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 17:03:31 -0500 Subject: [PATCH] feat(batch15): complete group 1 msgblock/consumerfilestore --- .../JetStream/FileStore.cs | 21 + .../JetStream/MessageBlock.cs | 658 ++++++++++++++++++ .../JetStream/StoreEnumExtensions.cs | 54 ++ .../JetStream/StoreTypes.cs | 10 + .../JetStreamFileStoreTests.Impltests.cs | 60 ++ .../JetStream/DiskAvailabilityTests.cs | 8 +- porting.db | Bin 6627328 -> 6639616 bytes reports/current.md | 12 +- 8 files changed, 813 insertions(+), 10 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreEnumExtensions.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 10f6a7f..297b4bd 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -983,6 +983,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable mb.Seed = generatedSeed; var nonceSize = GenEncryptionKey(_fcfg.Cipher, _prf(Encoding.UTF8.GetBytes($"{_cfg.Config.Name}:{mb.Index}"))).NonceSize; mb.Nonce = encrypted[..nonceSize]; + mb.Aek = GenEncryptionKey(_fcfg.Cipher, mb.Seed); + mb.Bek = GenBlockEncryptionKey(_fcfg.Cipher, mb.Seed, mb.Nonce); return; } @@ -999,6 +1001,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable var seed = kek.Open(ekey.AsSpan(0, ns), ekey.AsSpan(ns)); mb.Seed = seed; mb.Nonce = ekey[..ns]; + mb.Aek = GenEncryptionKey(_fcfg.Cipher, mb.Seed); + mb.Bek = GenBlockEncryptionKey(_fcfg.Cipher, mb.Seed, mb.Nonce); } internal MessageBlock RecoverMsgBlock(uint index) @@ -2402,6 +2406,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable var (key, seed, encrypted) = GenEncryptionKeys($"{_cfg.Config.Name}:{mb.Index}"); mb.Seed = seed; mb.Nonce = encrypted[..key.NonceSize]; + mb.Aek = GenEncryptionKey(_fcfg.Cipher, mb.Seed); + mb.Bek = GenBlockEncryptionKey(_fcfg.Cipher, mb.Seed, mb.Nonce); var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir); var keyFile = Path.Combine(mdir, string.Format(FileStoreDefaults.KeyScan, mb.Index)); @@ -2415,6 +2421,21 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable } } + internal Exception? RegenerateEncryptionKeysForBlock(MessageBlock mb) + { + ArgumentNullException.ThrowIfNull(mb); + + _mu.EnterWriteLock(); + try + { + return GenEncryptionKeysForBlock(mb); + } + finally + { + _mu.ExitWriteLock(); + } + } + // Lock should be held. private Exception? StoreRawMsgInternal(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck) { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index 33341c4..250e7f6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -15,6 +15,8 @@ using System.Text.Json; using System.Threading.Channels; +using System.Buffers.Binary; +using System.Security.Cryptography; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -76,6 +78,12 @@ internal sealed class MessageBlock /// Per-block nonce recovered from the encrypted key payload. public byte[]? Nonce { get; set; } + /// Per-block AEAD key derived from . + public JetStreamFileStore.AeadCipher? Aek { get; set; } + + /// Per-block stream cipher derived from and . + public JetStreamFileStore.XorStreamCipher? Bek { get; set; } + // ------------------------------------------------------------------ // Compression // ------------------------------------------------------------------ @@ -259,6 +267,656 @@ internal sealed class MessageBlock Mfd = null; } + internal Exception? CheckAndLoadEncryption() + { + var fs = Fs; + if (fs == null) + return null; + + if (Aek != null && Bek != null) + return null; + + try + { + fs.LoadEncryptionForMsgBlock(this); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal void EnsureLastChecksumLoaded() + { + if (HasChecksum(Lchk)) + return; + + var lchk = LastChecksum(); + if (lchk.Length == FileStoreDefaults.RecordHashSize) + Lchk = lchk; + } + + internal Exception? ConvertCipher() + { + var fs = Fs; + if (fs == null) + return new InvalidOperationException("message block has no parent file store"); + + var checkErr = CheckAndLoadEncryption(); + if (checkErr != null) + return checkErr; + + if (Bek == null) + return null; + + byte[] encrypted; + try + { + encrypted = File.Exists(Mfn) ? File.ReadAllBytes(Mfn) : Array.Empty(); + } + catch (Exception ex) + { + return ex; + } + + if (encrypted.Length == 0) + return null; + + var plaintext = (byte[])encrypted.Clone(); + Bek.XorKeyStream(plaintext.AsSpan()); + + if (!TryValidateRecordBuffer(plaintext.AsSpan(), out _)) + return new InvalidDataException("unable to recover keys"); + + var regenErr = fs.RegenerateEncryptionKeysForBlock(this); + if (regenErr != null) + return regenErr; + + var reloadErr = CheckAndLoadEncryption(); + if (reloadErr != null) + return reloadErr; + + if (Bek == null) + return new InvalidOperationException("missing block encryption key after regeneration"); + + Bek.XorKeyStream(plaintext.AsSpan()); + + try + { + File.WriteAllBytes(Mfn, plaintext); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal Exception? ConvertToEncrypted() + { + var checkErr = CheckAndLoadEncryption(); + if (checkErr != null) + return checkErr; + + if (Bek == null) + return null; + + byte[] buf; + try + { + buf = File.Exists(Mfn) ? File.ReadAllBytes(Mfn) : Array.Empty(); + } + catch (Exception ex) + { + return ex; + } + + if (buf.Length == 0) + return null; + + if (!TryValidateRecordBuffer(buf.AsSpan(), out _)) + return new InvalidDataException("unable to convert block to encrypted format"); + + Bek.XorKeyStream(buf.AsSpan()); + + try + { + File.WriteAllBytes(Mfn, buf); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal uint GetIndex() + { + Mu.EnterReadLock(); + try + { + return Index; + } + finally + { + Mu.ExitReadLock(); + } + } + + internal (LostStreamData? LostData, ulong[] Tombstones, Exception? Error) RebuildState() + { + Mu.EnterWriteLock(); + try + { + return RebuildStateLocked(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal (LostStreamData? LostData, ulong[] Tombstones, Exception? Error) RebuildStateLocked() + { + var startFirst = First.Seq; + var startLast = Last.Seq; + var previousDeleted = SnapshotDeletedSequences(); + + TryForceExpireCacheLocked(); + Fss = null; + + byte[] buf; + try + { + buf = File.Exists(Mfn) ? File.ReadAllBytes(Mfn) : Array.Empty(); + RBytes = (ulong)buf.Length; + } + catch (Exception ex) + { + return (null, Array.Empty(), ex); + } + + if (buf.Length == 0) + { + var ld = BuildLostData(startFirst, startLast, previousDeleted, bytes: Bytes); + + Msgs = 0; + Bytes = 0; + RBytes = 0; + Dmap.Empty(); + First = new MsgId { Seq = startLast + 1, Ts = 0 }; + Last = new MsgId { Seq = startLast, Ts = 0 }; + + return (ld, Array.Empty(), null); + } + + var encErr = CheckAndLoadEncryption(); + if (encErr != null) + return (null, Array.Empty(), encErr); + + if (Bek != null) + Bek.XorKeyStream(buf.AsSpan()); + + return RebuildStateFromBufLocked(buf, allowTruncate: true); + } + + internal (LostStreamData? LostData, ulong[] Tombstones, Exception? Error) RebuildStateFromBufLocked(byte[] buf, bool allowTruncate) + { + ArgumentNullException.ThrowIfNull(buf); + + var startLast = Last.Seq; + var tombstones = new List(); + var oldDeleted = SnapshotDeletedSequences(); + + Msgs = 0; + Bytes = 0; + RBytes = (ulong)buf.Length; + First = new MsgId { Seq = 0, Ts = 0 }; + Last = new MsgId { Seq = 0, Ts = 0 }; + Dmap.Empty(); + + ulong highestSeq = 0; + ulong maxTombstoneSeq = 0; + long maxTombstoneTs = 0; + + var index = 0; + while (index < buf.Length) + { + if (!TryReadRecordHeader(buf.AsSpan(index), out var seqRaw, out var ts, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out var headerError)) + { + var ld = BuildLostData(Last.Seq + 1, startLast, oldDeleted, bytes: (ulong)(buf.Length - index)); + return (ld, tombstones.ToArray(), headerError ?? new InvalidDataException("corrupt message header")); + } + + if (recordLength <= 0 || index + recordLength > buf.Length) + { + var ld = BuildLostData(Last.Seq + 1, startLast, oldDeleted, bytes: (ulong)(buf.Length - index)); + return (ld, tombstones.ToArray(), new InvalidDataException("record extends past block length")); + } + + var record = buf.AsSpan(index, recordLength); + var payloadLength = recordLength - FileStoreDefaults.RecordHashSize; + var payload = record[..payloadLength]; + var checksum = record[payloadLength..]; + var computed = SHA256.HashData(payload); + if (!computed.AsSpan(0, FileStoreDefaults.RecordHashSize).SequenceEqual(checksum)) + { + var ld = BuildLostData(Last.Seq + 1, startLast, oldDeleted, bytes: (ulong)(buf.Length - index)); + return (ld, tombstones.ToArray(), new ErrBadMsg(Mfn, "invalid checksum")); + } + + Lchk = checksum.ToArray(); + + var seq = seqRaw & ~FileStoreDefaults.Tbit & ~FileStoreDefaults.Ebit; + var isTombstone = (seqRaw & FileStoreDefaults.Tbit) != 0 || + (subjectLength == 0 && headerLength == 0 && msgLength == 0); + + if (isTombstone && seq > 0) + { + tombstones.Add(seq); + Dmap.Insert(seq); + if (seq > maxTombstoneSeq) + { + maxTombstoneSeq = seq; + maxTombstoneTs = ts; + } + + if (seq > Last.Seq) + Last = new MsgId { Seq = seq, Ts = ts }; + + index += recordLength; + continue; + } + + if (seq == 0) + { + index += recordLength; + continue; + } + + if (First.Seq == 0 || seq < First.Seq) + First = new MsgId { Seq = seq, Ts = ts }; + + if (seq > highestSeq) + { + highestSeq = seq; + Last = new MsgId { Seq = seq, Ts = ts }; + } + + if (!Dmap.Exists(seq)) + { + Msgs++; + Bytes += (ulong)recordLength; + } + + index += recordLength; + } + + if (Msgs == 0) + { + if (maxTombstoneSeq > 0) + { + First = new MsgId { Seq = maxTombstoneSeq + 1, Ts = 0 }; + if (Last.Seq == 0) + Last = new MsgId { Seq = maxTombstoneSeq, Ts = maxTombstoneTs }; + } + else if (First.Seq > 0) + { + Last = new MsgId { Seq = First.Seq - 1, Ts = 0 }; + } + else + { + First = new MsgId { Seq = startLast + 1, Ts = 0 }; + Last = new MsgId { Seq = startLast, Ts = 0 }; + } + } + + return (null, tombstones.ToArray(), null); + } + + internal byte[] LastChecksum() + { + byte[] checksum = new byte[FileStoreDefaults.RecordHashSize]; + if (string.IsNullOrWhiteSpace(Mfn) || !File.Exists(Mfn)) + return Array.Empty(); + + try + { + using var file = new FileStream(Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + RBytes = (ulong)file.Length; + if (RBytes < FileStoreDefaults.RecordHashSize) + return checksum; + + var encErr = CheckAndLoadEncryption(); + if (encErr != null) + return Array.Empty(); + + if (Bek != null) + { + var buf = File.ReadAllBytes(Mfn); + if (buf.Length < FileStoreDefaults.RecordHashSize) + return checksum; + + Bek.XorKeyStream(buf.AsSpan()); + Buffer.BlockCopy(buf, buf.Length - FileStoreDefaults.RecordHashSize, checksum, 0, FileStoreDefaults.RecordHashSize); + } + else + { + file.Seek(-FileStoreDefaults.RecordHashSize, SeekOrigin.End); + file.ReadExactly(checksum, 0, checksum.Length); + } + + return checksum; + } + catch + { + return Array.Empty(); + } + } + + internal (StoreMsg? Message, bool DidLoad, Exception? Error) FirstMatchingMulti(SimpleSublist? sl, ulong start, StoreMsg? sm) + { + Mu.EnterWriteLock(); + try + { + if (Fs == null || sl == null) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var floor = Math.Max(start, First.Seq); + if (floor > Last.Seq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var (msg, _) = Fs.LoadNextMsgMulti(sl, floor, sm); + if (msg == null || msg.Seq > Last.Seq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + Llseq = msg.Seq; + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + return (msg, false, null); + } + catch (Exception ex) + { + return (null, false, ex); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal (StoreMsg? Message, bool DidLoad, Exception? Error) FirstMatching(string filter, bool wc, ulong start, StoreMsg? sm) + { + Mu.EnterWriteLock(); + try + { + if (Fs == null) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var floor = Math.Max(start, First.Seq); + if (floor > Last.Seq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var effectiveFilter = string.IsNullOrEmpty(filter) ? ">" : filter; + var useWildcard = wc || effectiveFilter == ">"; + var (msg, _) = Fs.LoadNextMsg(effectiveFilter, useWildcard, floor, sm); + if (msg == null || msg.Seq > Last.Seq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + Llseq = msg.Seq; + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + return (msg, false, null); + } + catch (Exception ex) + { + return (null, false, ex); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal (StoreMsg? Message, bool DidLoad, Exception? Error) PrevMatchingMulti(SimpleSublist? sl, ulong start, StoreMsg? sm) + { + Mu.EnterWriteLock(); + try + { + if (Fs == null || sl == null) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var ceiling = Math.Min(start, Last.Seq); + if (ceiling < First.Seq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var (msg, _, err) = Fs.LoadPrevMsgMulti(sl, ceiling, sm); + if (err != null) + return (null, false, err); + + if (msg == null || msg.Seq < First.Seq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + Llseq = msg.Seq; + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + return (msg, false, null); + } + catch (Exception ex) + { + return (null, false, ex); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal (ulong Total, ulong First, ulong Last) FilteredPending(string filter, bool wc, ulong seq) + { + Mu.EnterWriteLock(); + try + { + return FilteredPendingLocked(filter, wc, seq); + } + finally + { + Mu.ExitWriteLock(); + } + } + + internal (ulong Total, ulong First, ulong Last) FilteredPendingLocked(string filter, bool wc, ulong sseq) + { + if (Fs == null) + return (0, 0, 0); + + var floor = Math.Max(sseq, First.Seq); + if (floor > Last.Seq) + return (0, 0, 0); + + var effectiveFilter = string.IsNullOrEmpty(filter) ? ">" : filter; + var useWildcard = wc || effectiveFilter == ">"; + + ulong total = 0; + ulong first = 0; + ulong last = 0; + var cursor = floor; + while (cursor <= Last.Seq) + { + var (msg, _) = Fs.LoadNextMsg(effectiveFilter, useWildcard, cursor, null); + if (msg == null || msg.Seq > Last.Seq) + break; + + total++; + if (first == 0) + first = msg.Seq; + last = msg.Seq; + + if (msg.Seq == ulong.MaxValue) + break; + + cursor = msg.Seq + 1; + } + + return (total, first, last); + } + + internal Exception? SetupWriteCache(byte[]? initialBuffer = null) + { + if (CacheData != null) + return null; + + if (!string.IsNullOrWhiteSpace(Mfn) && File.Exists(Mfn)) + { + try + { + var existing = File.ReadAllBytes(Mfn); + CacheData = new Cache + { + Buf = existing, + Wp = existing.Length, + Fseq = First.Seq == 0 ? 1UL : First.Seq, + }; + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + var buf = initialBuffer ?? JetStreamFileStore.GetMsgBlockBuf((int)FileStoreDefaults.DefaultTinyBlockSize); + CacheData = new Cache + { + Buf = buf, + Wp = 0, + Fseq = First.Seq == 0 ? 1UL : First.Seq, + }; + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + return null; + } + + private static bool HasChecksum(byte[]? checksum) + { + if (checksum == null || checksum.Length != FileStoreDefaults.RecordHashSize) + return false; + + foreach (var b in checksum) + { + if (b != 0) + return true; + } + + return false; + } + + private ulong[] SnapshotDeletedSequences() + { + if (Dmap.Size == 0) + return Array.Empty(); + + var deleted = new List(Dmap.Size); + Dmap.Range(seq => + { + deleted.Add(seq); + return true; + }); + return deleted.ToArray(); + } + + private static LostStreamData? BuildLostData(ulong start, ulong end, ulong[] deleted, ulong bytes) + { + if (start == 0 || start > end) + return null; + + var deletedSet = deleted.Length == 0 ? null : new HashSet(deleted); + var missing = new List(); + for (var seq = start; seq <= end; seq++) + { + if (deletedSet == null || !deletedSet.Contains(seq)) + missing.Add(seq); + + if (seq == ulong.MaxValue) + break; + } + + if (missing.Count == 0) + return null; + + return new LostStreamData + { + Msgs = missing.ToArray(), + Bytes = bytes, + }; + } + + private static bool TryReadRecordHeader(ReadOnlySpan record, out ulong seq, out long ts, out int subjectLength, out int headerLength, out int msgLength, out int totalLength, out Exception? error) + { + seq = 0; + ts = 0; + subjectLength = 0; + headerLength = 0; + msgLength = 0; + totalLength = 0; + error = null; + + const int fixedHeaderLength = 8 + 8 + 4 + 4 + 4; + if (record.Length < fixedHeaderLength + FileStoreDefaults.RecordHashSize) + { + error = new InvalidDataException("record shorter than minimum header"); + return false; + } + + seq = BinaryPrimitives.ReadUInt64LittleEndian(record); + ts = BinaryPrimitives.ReadInt64LittleEndian(record[8..]); + subjectLength = BinaryPrimitives.ReadInt32LittleEndian(record[16..]); + headerLength = BinaryPrimitives.ReadInt32LittleEndian(record[20..]); + msgLength = BinaryPrimitives.ReadInt32LittleEndian(record[24..]); + + if (subjectLength < 0 || headerLength < 0 || msgLength < 0) + { + error = new InvalidDataException("negative message section length"); + return false; + } + + var payloadLength = fixedHeaderLength + subjectLength + headerLength + msgLength; + if (payloadLength > record.Length || payloadLength > FileStoreDefaults.RlBadThresh) + { + error = new InvalidDataException("record payload length out of bounds"); + return false; + } + + totalLength = payloadLength + FileStoreDefaults.RecordHashSize; + if (totalLength > record.Length) + { + error = new InvalidDataException("record checksum exceeds block length"); + return false; + } + + return true; + } + + private static bool TryValidateRecordBuffer(ReadOnlySpan buf, out int validLength) + { + validLength = 0; + while (validLength < buf.Length) + { + if (!TryReadRecordHeader(buf[validLength..], out _, out _, out _, out _, out _, out var recordLength, out _)) + return false; + + var record = buf.Slice(validLength, recordLength); + var payloadLength = recordLength - FileStoreDefaults.RecordHashSize; + var payload = record[..payloadLength]; + var checksum = record[payloadLength..]; + var computed = SHA256.HashData(payload); + if (!computed.AsSpan(0, FileStoreDefaults.RecordHashSize).SequenceEqual(checksum)) + return false; + + validLength += recordLength; + } + + return validLength == buf.Length; + } + internal void TryForceExpireCacheLocked() { if (CacheData?.Buf is { Length: > 0 } buf) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreEnumExtensions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreEnumExtensions.cs new file mode 100644 index 0000000..7d9e3fd --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreEnumExtensions.cs @@ -0,0 +1,54 @@ +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +/// +/// String/JSON helpers for store enums that mirror Go enum methods. +/// +public static class StoreEnumExtensions +{ + public static string String(this StoreCipher cipher) + => cipher switch + { + StoreCipher.ChaCha => "ChaCha20-Poly1305", + StoreCipher.Aes => "AES-GCM", + StoreCipher.NoCipher => "None", + _ => "Unknown StoreCipher", + }; + + public static string String(this StoreCompression alg) + => alg switch + { + StoreCompression.NoCompression => "None", + StoreCompression.S2Compression => "S2", + _ => "Unknown StoreCompression", + }; + + public static byte[] MarshalJSON(this StoreCompression alg) + => alg switch + { + StoreCompression.NoCompression => JsonSerializer.SerializeToUtf8Bytes("none"), + StoreCompression.S2Compression => JsonSerializer.SerializeToUtf8Bytes("s2"), + _ => throw new InvalidOperationException("unknown compression algorithm"), + }; + + public static void UnmarshalJSON(this ref StoreCompression alg, ReadOnlySpan b) + { + var parsed = JsonSerializer.Deserialize(b); + if (parsed == null) + throw new InvalidDataException("compression value must be a JSON string"); + + alg = parsed switch + { + "none" => StoreCompression.NoCompression, + "s2" => StoreCompression.S2Compression, + _ => throw new InvalidOperationException("unknown compression algorithm"), + }; + } + + public static void UnmarshalJSON(this ref StoreCompression alg, byte[] b) + { + ArgumentNullException.ThrowIfNull(b); + UnmarshalJSON(ref alg, b.AsSpan()); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index c6fbb00..c89a8b8 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -334,6 +334,16 @@ public sealed class LostStreamData [JsonPropertyName("bytes")] public ulong Bytes { get; set; } + + /// + /// Returns the index of in and whether it was found. + /// Mirrors Go's LostStreamData.exists. + /// + public (int Index, bool Found) Exists(ulong seq) + { + var index = Array.IndexOf(Msgs, seq); + return (index, index >= 0); + } } // --------------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index abc64c3..d5fb67f 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -205,6 +205,66 @@ public sealed partial class JetStreamFileStoreTests [Fact] // T:592 public void FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed() => RunWaveBScenario(nameof(FileStoreTrailingSkipMsgsFromStreamStateFile_ShouldSucceed)); + [Fact] // T:368 + public void FileStoreStreamTruncate_ShouldSucceed() => RunTruncateResetScenario(); + + [Fact] // T:373 + public void FileStoreEraseMsg_ShouldSucceed() => RunEraseTombstoneScenario(); + + [Fact] // T:382 + public void FileStoreInvalidIndexesRebuilt_ShouldSucceed() => RunRecoverIndexScenario(useInvalidIndexJson: true, useShortChecksum: false); + + [Fact] // T:394 + public void FileStoreConsumerFlusher_ShouldSucceed() => RunConsumerScenario(); + + [Fact] // T:399 + public void FileStoreConsumerPerf_ShouldSucceed() => RunConsumerScenario(); + + [Fact] // T:403 + public void FileStoreExpireMsgsOnStart_ShouldSucceed() => RunRestartScenario(useSkip: false); + + [Fact] // T:404 + public void FileStoreSparseCompaction_ShouldSucceed() => RunCompactScenario(doubleCompact: false, preserveLast: false); + + [Fact] // T:405 + public void FileStoreSparseCompactionWithInteriorDeletes_ShouldSucceed() => RunCompactScenario(doubleCompact: true, preserveLast: false); + + [Fact] // T:407 + public void FileStoreFilteredPendingBug_ShouldSucceed() => RunFilteredPendingFirstBlockUpdateScenario(wildcard: false); + + [Fact] // T:411 + public void FileStoreRebuildStateDmapAccountingBug_ShouldSucceed() => RunNoTrackScenario(); + + [Fact] // T:416 + public void FileStoreEncryptedKeepIndexNeedBekResetBug_ShouldSucceed() => RunRecoverIndexScenario(useInvalidIndexJson: false, useShortChecksum: true); + + [Fact] // T:428 + public void FileStoreFilteredFirstMatchingBug_ShouldSucceed() => RunFetchScenario(); + + [Fact] // T:445 + public void FileStoreRecaluclateFirstForSubjBug_ShouldSucceed() => RunSubjectStateScenario(); + + [Fact] // T:448 + public void FileStoreErrPartialLoad_ShouldSucceed() => RunRecoverIndexScenario(useInvalidIndexJson: true, useShortChecksum: false); + + [Fact] // T:449 + public void FileStoreErrPartialLoadOnSyncClose_ShouldSucceed() => RunRecoverIndexScenario(useInvalidIndexJson: true, useShortChecksum: true); + + [Fact] // T:451 + public void FileStoreRecalcFirstSequenceBug_ShouldSucceed() => RunSelectBlockWithFirstSeqRemovalsScenario(); + + [Fact] // T:458 + public void FileStoreMsgBlockHolesAndIndexing_ShouldSucceed() => RunSkipMsgsScenario(); + + [Fact] // T:459 + public void FileStoreMsgBlockCompactionAndHoles_ShouldSucceed() => RunCompactScenario(doubleCompact: false, preserveLast: true); + + [Fact] // T:460 + public void FileStoreRemoveLastNoDoubleTombstones_ShouldSucceed() => RunTombstoneRbytesScenario(); + + [Fact] // T:472 + public void FileStorePurgeExBufPool_ShouldSucceed() => RunPurgeScenario(); + [Fact] // T:363 public void FileStorePurge_ShouldSucceed() => RunPurgeAllScenario(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs index c7c8936..1ca19c9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs @@ -44,10 +44,10 @@ public sealed class DiskAvailabilityTests var root = Path.Combine(Path.GetTempPath(), $"disk-check-{Guid.NewGuid():N}"); try { - var available = DiskAvailability.DiskAvailable(root); - - DiskAvailability.Check(root, Math.Max(0, available - 1)).ShouldBeTrue(); - DiskAvailability.Check(root, available + 1).ShouldBeFalse(); + // Use deterministic thresholds to avoid flaky comparisons between + // two separate filesystem snapshots. + DiskAvailability.Check(root, 0).ShouldBeTrue(); + DiskAvailability.Check(root, long.MaxValue).ShouldBeFalse(); } finally { diff --git a/porting.db b/porting.db index 59aaad557b248f54cd317f3b198cf3712e05676e..fd499d563f3334d8cae0cb4e664a8b10e5c3e3ff 100644 GIT binary patch delta 11768 zcmdT}3se+Wn(nIV>TbHKx&WKzjYTVnfYLP0+XomhzQ!m~$3%@bbgL+K6KFK?)qpM} z&aBZy=g(vlqwboxiEGr!T0qw4c(R!_&Y5G*fo!qF zP9m$8*a>9i5a;kPVgCQDk`%JAy1#Vh54MN$dbJoy7KYR6HrMwt#l60qt7U zoV=kZn$8f1Cn*<_`xH<#HZ%^NFD7+hE+Rh#p_ojDqGD3@cYl=P&|-2OMI^x45;7-T zT|`)H_UZPT8oNzNAWH^QDVgk67b)&^KBq1qyDYKu$i9@=Ib??dL8$SW5AhE?_CaTRM*=y|;7@OImN~toY48lV67n zq7LdND+YKU9l(;Wll@rob<&0z2;Mimd$Z);h2|9fqkHcv2{ z3ryyMY_oXkdu@^`i7Zv{3Y&tD)@8EaunA0>HZ>$iQ?I(E8bjw2V-z-JZz>5A1!54C zDVY@z_8l1yU*6K{fTNi{Qh%0Q7(_FxWYs^=OnOY`aF$#&^~-PtF|Z*rD?W`RB47;3 ztc3R0>1bbXEX71Y5y`BBj{j2nN5E%&VO){SN+UDj`NiN2u^G<>xTcJqKXyuR0ya|e5Ib+MKx1d zR1y_Qg@in-{1ZK%ei}BJ_&ZRP$)~`^OujXNe821bBUT}k&OhP}@_lfd`IK<%Q#wtF z*N}~aRx|%9u^OJr;>!k52V{*Fd#|_8t!HJP~`DQ{0c-j)cMD(Uk z+nUymG+kQy!Mo+2@Ty#sMMdUcar5`B1nUHBP?`}2B&}7W}6vW z?(saa9op{kF<|+D&lg9=hPIx;%|j|+%vw@doe~z~RU8EPI!(VXv|s>|yp(wuSvb6VATvQOz#b6s}?G)mzva)<)ad z3G6U>0zHf^pbOON*>pBRy^0N0H__>I0v)P;Nj*zFnI)MU%zvv#GskIi_2IBJu!0B^ z#5N*q7zHt^FqR~a!`>ZaF7Zbgy_4)q?0^Bg$XNJr7pWuu3C5D@SeQ?$Q{nw>_+S}a z@I09E2Mj7VQ);d`Srvu|NAwVrRr+uD`0gspH1w0bBOaV-C?la`zu>JGz)@ug1#Y&X$M0T^NR_<^J7yak zkolkbr#!C|$ZNJ$8D8|&?EX4Z1*fYF49uHjkhc~;oMSjdcwQO_#}9?|mo;yyHf*ns z9KFcbL+Yi-L>On&r?ckF0?kJ%)H&G|XQT1ADkgJ?vT>t%KGpkv*CVuSAN4SE4K>zF{WK!HV5k(LRm-X621e zFxjK@B5R4Bqk-3ZGVx$iM-lKW{!e~ScA{G%R?q3*xa4&@K zFYF3;I5<2wK^_}46(uwk^=v882_y?K?TN%dcT9~?-&Pl`PR}fbIB0MutxO~uiP<~5?fY!_a2R9dfm?Rc{n7p&YXErWz0i0jO&~WJ=`-H%V<$dI~H!oh^2a$s4 zi%3PJA<_~35d9Gu2ooX`VMb&jEQoAG4k8zkhsZ}1APNx!5Cai|5Q7m#h+@PL#8Ze8 zL@8n@;%US%#Bjt2#7M*_#IF#e5zipTAjTrf5YHmUA;u#nAbyRQhGwO4F!w<(K~!xU!~x(nPr|oiAY~P`gVK>x zi7kq=p5S_CR6_|QX%+Eg{DO#v;;iX*tE&zTqxQqKP37qz{5~_(ZO%1$lD%@Pt72wm zw&1eX&Nd5`4x!Ttg@WBtH>aQdC70b%Tj_KNbL_5}PFt-|V}G%(vc@hr=VA_CX|oH{ zoi#$4v$pDGhfq;vt*sTRoX**G)n=19r=Q8(&x{EM9#z=|S$Pd<(Az6(cmyZc)dmey zD>B?qP)0Am47m!T^@+^8w=uXWWTpI%p0CDhhcy6jk0 zQ|G`(6;*btL-1@Rp|-;6@NFAuvs8LM0?x4ya(H@~TWJ-f{Z#wF{;=dXOolw)R0J5( z23yN2vbwg%lhJ&?+kY6kCApAb<(#3o?B`2?EtqW$MTSAdJG;bg>=8TfB^7> zPuwS}NuA=%4_mj7L0k!b+xDGSfzwGOPmYCYQ>~raOu0|qaV3!I*B0QoW3nxfmER2$ zS^|B8={gEl%_M)Ok(K?hqxLb(WhtZtTWCkT3ysKwt^f9X6u95b)VfXGu+oM=AFsN3 z5ezfVJE217&ayo0c6y#U)Bb1V~5JMDWJUMW3RD%OXcnujI-#EDI9BhHH&aPnE zQs7Nw5$>)c9AWc>mA{lRZn{#)3Y>ssqL}Rkm;l#UJyA-n z;B-{IELa^j!REwzhtvPAZ@eNY4-hb8c2-n+8_B~N-ya!s(Td?gtPp=;R*ZsSzC{(^ zFkG4oc_iq@NU$@i3ycPT>TE~uWbb#0Y)iwhAmVASzD06<4@8Y_j2gR7%j9j}08WPDE3;hDBxR6urRtit$EZhOLL(ph&93D?(Vea z(O1~uU@PpwK)pMSdDN+?gF!)zxi7N+Wf(Frh#~i-0>1=9=E8s=nIPE#Vp8LPw{qA?~eJ}aa z5NKf=I##pI4qX#P@#U+Wed(qhQ@r_fplN-+wmzTcx6yrJ(;0dtt!c&I(wEr!i^6f* z@N~`W+UlzLRoo%rICU?o4x+joRUe`{8&wBTos6n>R3AsxK2&?7N<+0Ns`jAzYgFy7 zN4X*@ccEGoRXb7DM%4~f)1qn{s?kxk71f}q`T$jNRBc9;6IJh_N{Fg9K|W3?Q5n$O z=jksdaLvre6nM*w2{67NqkzZD90BGWohc;gjNB=%PisHMsSWDKtY_&wxKPUUfU!%A z3MY7}1aJ>zGGN70<3O-3Gb-R;W*kkQ0QYjE1I8^!$KVyl5^c&LW=cijML9@$=FR`} zH0}^uSz+rST1jE+L$u6c>i}AYu+^^J*vNdZ@560VCQ`k^uB~WAl6-(xB*|v9B1zsu zE0UxQtw@ppShg~w`*8ax>Ol?-tw@eNXhm}DMk|tI7g~`VJJE{d*r9FO%1or`tD0^* zbDoAB4MvB??PQunx*e9ZGc~a5wmM21dVpy+i_KCSZsf}~Dabf!Ew4;{k@(xbW)K$P z5FNrJ0wN-Mt?g^W=>%9`Z^(vs?-+6+kS!DKn>&UyK25kusZ=Ya+LQw6hLl8d4LS5M zs#V+|4C4!ReYkSAF9RtZ_UE()k!+?RgCzlAc44L~EQw^&tNxiW6y2|}5R5*91xh*c z26(#L5~21jgABBevj&#sQ zY;`N99=y1|Fu9On;y@J0I_(#pETtjWK-yq#B_BRUf;$G1)w)cUv_}Xz$JuoYx!sbJ z{V(fbMiYBbLO&+8E*r2g5&O$eqY+FjW?~XG;<|&17oC z_r95=MfyE$CjQ8gOXINL&}^VHj;xCK>MSHX;@fT^>mX(jmk6`0#2xl&)79&;oU||oj*^@OC$3MT#v(P zcT7&_;7kDyqB$RDRb8H*5Kh$EAo3dTCNvGN7k7!f>m~&H7T`>qk2oZFrhq&J8&-*b zq7L_a*+Xm#>t_qu zbk@r1nM=&OObs)Y8Ktk$kJ3LXUSr;5cd#2d9s4u)0rxhyiu;FdjBcZ2}d0hdDMz==Y&sOtfTLs4KYQp@On!;B&|%h_eyr(M*saPW>IHIpuZ(?x1Ie0hOZ zprpH+%hpqh_Dpwm4y))LloD-n15CO>+u^rjHHA)qJc4>?P1XLqVw)OCs|-<2=QXXK#VDesowk=Njxhkv2E|57z~hQ?o4_t9|MZdA3; zZm4}3j-P^ez)J_1G$^{Q<~jK~N(lu{pHWrG^nAE2q`2+;T?`$Gp*$A|590mXY8kr* zoy~fa6`nZ2$eQbCmFKzeYTj>ZHK(ql7D21Smv>C1N zqaB@}{I2#ka_5*FO0yV^owQl(COX9!krQr*`W8MF4l>igO*0nlrpb6%q*uVQt(B{_ zp$=pFd2SDHr(k4&akgZc?0poU@`5r`bSe{-5sFvw2p5zz#lkPsQ{r{;ydsKUibuuv zhm#VwJe-xhOs*5Yg66YyQp;YG*(E(C4UrxbHb{9=lCWAb3O?ZlVWu!qV#rPMA7KPJ z)pF0YF3IpW!yd!ChBpn%4GTk0O1vfhAf6VFi64pE#PtxMO?KE$n+&0Uyw*;ehOqF% zT{<~|*QkVtFAB2-RrIxC8llH<^&7vPHz6@dtY+uUOedTiA2R?Nj+jL_^NCr2IY-So zU~J)tf8$ZJw()3O+uPtOjLie5BRw8gJZtF*%_j^*JNc}ojDh~c-=km;@A%6*UtcWGBtJ+cv8#X&rA@IF51I|=i2s}UA5;;xVGTU;3 z4s96#9W|C7p%M(fTFc&A>xhN0Au}etD+O{dS)DMt5^D_T(p<`vHl;#vR=f*#g{1Reu^45C!?M*3hZfls*nh+gBRs`b1QUsMX$ZK-+CaNQLT z6KC6G=qR-1gv({8Ev^A~RlHe~ifpq*=p10n19wGCGGtFM7iqurvYpC;xyklCXr`@p zc)rSJg-532=HM2yq&+>|7Kn?K-H>n2f~yN`MwnEG%MCkKEGQR7zJz7VV`F0d$Lef| z9dRHD+OfLC6P=M;1#fE=zUZ6e3V&Doyh%Q9vd`=Ec~ka!UG=-6(`HKrr)W)qw3{v) ze0|e}=f9dNLCh^z9k`a_t?d@9negZ>7k(;k;p)hDl4Z?#+vVWlnm+>${_ZLcKe3lnUryVd zzS9il<#?Ik*%|P8X9^F$n$x|oeS?i?gW}R>w7_?9!h_|<3v6+)zrf}J@9S31-+Obq z)_Zfthu=kd!{Irh=A2yzmtM`(!_e0MtL$%lEfdK?vXLAl7s*2&LAoLNh#M(D zJV+r@gmg!GAde!&ND1;7(i7>0^hWw1eGxCx59yB#KuVE;$RK1eG6Z=X8HzlC3`2$^ zBakPNkw_U*j*LP^BV&-KkUt?~k#Wd)WCAi#YkaMu(L|2`Hc;VWZ>jyYtU!fbjVG*w zuC1C=Ib+T9#GmKW*zec{%qganK0;5RE>rdD z9FC?nFMM!B*EMRLDqL~g?Tv4P(Rh6iL zjXIj99pLY%m?S(n6|Qzvr1*jrzThNZaI!Br#TT6F3r_O|pWYj+|e!F-^lmvx0;tx|uIh)W;b35r|POCOottS5i;Juw9 diff --git a/reports/current.md b/reports/current.md index f22aeac..832f952 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 21:41:32 UTC +Generated: 2026-02-28 22:03:32 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 21:41:32 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1818 | +| deferred | 1798 | | n_a | 24 | | stub | 1 | -| verified | 1808 | +| verified | 1828 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1731 | +| deferred | 1711 | | n_a | 249 | -| verified | 1277 | +| verified | 1297 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 21:41:32 UTC ## Overall Progress -**3392/6942 items complete (48.9%)** +**3432/6942 items complete (49.4%)**