From 43d914bf833cdffb838bd8837060c143e1813598 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 18:03:56 -0500 Subject: [PATCH] feat(batch15): complete group 4 msgblock/consumerfilestore --- .../JetStream/FileStoreTypes.cs | 7 + .../JetStream/MessageBlock.cs | 775 ++++++++++++++++-- .../ConcurrencyTests1.Impltests.cs | 40 + .../ConcurrencyTests2.Impltests.cs | 158 ++++ porting.db | Bin 6643712 -> 6647808 bytes reports/current.md | 12 +- 6 files changed, 939 insertions(+), 53 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs index 84f1a59..e742192 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs @@ -282,6 +282,13 @@ internal sealed class ErrBadMsg : Exception Detail = detail; } + /// + /// Returns the formatted error text. + /// Mirrors Go's errBadMsg.Error(). + /// + internal string Error() + => Message; + private static string BuildMessage(string fileName, string detail) { var baseName = Path.GetFileName(fileName); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index ae856a0..e049499 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -1057,7 +1057,9 @@ internal sealed class MessageBlock if (flush || Werr != null) { - var flushErr = FlushPendingToDiskLocked(); + var (lostData, flushErr) = FlushPendingMsgsLocked(); + if (lostData != null && Fs != null) + Fs.RebuildState(lostData); if (flushErr != null) return flushErr; } @@ -1415,73 +1417,752 @@ internal sealed class MessageBlock if (original.Length == 0) return null; - var checkErr = CheckAndLoadEncryption(); - if (checkErr != null) - return checkErr; - - if (Bek != null) - Bek.XorKeyStream(original.AsSpan()); - - var desired = Cmp; var meta = new CompressionInfo(); var consumed = meta.UnmarshalMetadata(original); - if (consumed > 0 && meta.Type == desired) + var desired = Cmp; + if (consumed > 0 && meta.Type == desired && desired == StoreCompression.NoCompression) return null; - byte[] rewritten; - if (desired == StoreCompression.NoCompression && consumed > 0) + byte[] rewritten = original; + if (consumed > 0) { - if (meta.Type != StoreCompression.NoCompression) - return new NotSupportedException("compressed block decompression is not available"); + if (meta.Type == StoreCompression.NoCompression) + { + rewritten = original[consumed..]; + } + else + { + var (decompressed, decompressErr) = DecompressIfNeeded(original); + if (decompressErr != null) + return decompressErr; - rewritten = original[consumed..]; - } - else - { - // Compression transforms are introduced separately with StoreCompression helpers. - rewritten = original; + rewritten = decompressed ?? Array.Empty(); + } } - if (Bek != null) - Bek.XorKeyStream(rewritten.AsSpan()); + return AtomicOverwriteFile(rewritten, allowCompress: true); + } + + // Lock should be held. + internal Exception? AtomicOverwriteFile(byte[] buf, bool allowCompress) + { + ArgumentNullException.ThrowIfNull(buf); + + var hadOpenFile = Mfd != null; + if (hadOpenFile) + CloseFDsLockedNoCheck(); + + var origFileName = Mfn; + var tempFileName = Mfn + FileStoreDefaults.BlkTmpSuffix; try { - var tmp = $"{Mfn}.tmp"; - File.WriteAllBytes(tmp, rewritten); - File.Move(tmp, Mfn, overwrite: true); + var algorithm = StoreCompression.NoCompression; + if (allowCompress && Cmp != StoreCompression.NoCompression) + { + if (Cmp != StoreCompression.S2Compression) + return new NotSupportedException($"unsupported compression algorithm: {Cmp}"); + + return new NotSupportedException("message block compression is not available"); + } + + var writeBuffer = buf; + if (algorithm != StoreCompression.NoCompression) + { + return new NotSupportedException("message block compression is not available"); + } + + var encryptErr = EncryptOrDecryptIfNeeded(writeBuffer); + if (encryptErr != null) + return encryptErr; + + using var tmp = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None); + tmp.Write(writeBuffer, 0, writeBuffer.Length); + tmp.Flush(flushToDisk: true); + tmp.Close(); + + File.Move(tempFileName, origFileName, overwrite: true); + + Cmp = algorithm; + RBytes = (ulong)writeBuffer.Length; + + if (hadOpenFile) + { + var enableErr = EnableForWriting(false); + if (enableErr != null) + return enableErr; + } + return null; } catch (Exception ex) { + try + { + if (File.Exists(tempFileName)) + File.Delete(tempFileName); + } + catch + { + // Best-effort cleanup only. + } + + if (hadOpenFile && Mfd == null) + _ = EnableForWriting(false); + return ex; } } + // Lock should be held. + internal (byte[]? Buffer, Exception? Error) DecompressIfNeeded(byte[] buf) + { + ArgumentNullException.ThrowIfNull(buf); + + var hasCompressionHeader = buf.Length >= 3 && + buf[0] == (byte)'c' && + buf[1] == (byte)'m' && + buf[2] == (byte)'p'; + + var meta = new CompressionInfo(); + var consumed = meta.UnmarshalMetadata(buf); + if (consumed == 0) + { + if (hasCompressionHeader) + return (null, new InvalidDataException("invalid compression metadata")); + + return (buf, null); + } + + if (meta.Type == StoreCompression.NoCompression) + return (buf[consumed..], null); + + return (null, new NotSupportedException("message block decompression is not available")); + } + + // Lock should be held. + internal Exception? EncryptOrDecryptIfNeeded(byte[] buf) + { + ArgumentNullException.ThrowIfNull(buf); + + if (Bek != null && buf.Length > 0) + Bek.XorKeyStream(buf.AsSpan()); + + return null; + } + + // Lock should be held. + internal Exception? EnsureRawBytesLoaded() + { + if (RBytes > 0) + return null; + + var (file, openErr) = OpenBlock(); + if (openErr != null || file == null) + return openErr ?? new InvalidOperationException("unable to open message block"); + + try + { + RBytes = (ulong)file.Length; + return null; + } + finally + { + file.Dispose(); + } + } + + // Index a raw message buffer. + // Lock should be held. + internal Exception? IndexCacheBuf(byte[] buf) + { + ArgumentNullException.ThrowIfNull(buf); + + var blockFirstSeq = First.Seq; + var blockLastSeq = Last.Seq; + + if (blockFirstSeq > blockLastSeq + 1) + { + Fs?.Warn("indexCacheBuf corrupt state in {0}: first {1} last {2}", Mfn, blockFirstSeq, blockLastSeq); + return new InvalidDataException("corrupt state"); + } + + var idxSize = blockLastSeq - blockFirstSeq + 1; + if (idxSize > int.MaxValue) + return new InvalidDataException("corrupt state"); + + if (CacheData != null && CacheData.Buf.Length > 0) + JetStreamFileStore.RecycleMsgBlockBuf(CacheData.Buf); + + CacheData ??= new Cache(); + CacheData.Buf = Array.Empty(); + CacheData.Idx = Array.Empty(); + CacheData.Fseq = 0; + CacheData.Wp = 0; + CacheData.Nra = false; + + var idx = new List((int)idxSize); + + var populateFss = false; + if (FssNotLoaded()) + { + Fss = new SubjectTree(); + populateFss = true; + } + + Lsts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + Ttls = 0; + Schedules = 0; + + var offset = 0; + ulong firstSeen = 0; + ulong lastSeen = 0; + while (offset < buf.Length) + { + if (!TryReadRecordHeader(buf.AsSpan(offset), out var seqRaw, out _, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out _)) + return new InvalidDataException("corrupt state"); + + if (recordLength <= 0 || offset + recordLength > buf.Length) + return new InvalidDataException("corrupt state"); + + if ((seqRaw & FileStoreDefaults.Tbit) != 0) + { + offset += recordLength; + continue; + } + + var erased = (seqRaw & FileStoreDefaults.Ebit) != 0; + var seq = seqRaw & ~FileStoreDefaults.Ebit & ~FileStoreDefaults.Tbit; + if (seq <= lastSeen) + { + offset += recordLength; + continue; + } + + if (seq >= blockFirstSeq) + { + lastSeen = seq; + if (firstSeen == 0) + { + firstSeen = seq; + if (firstSeen != blockFirstSeq) + return new InvalidDataException("corrupt state"); + } + + var expectedSlot = (int)(seq - blockFirstSeq); + if (expectedSlot != idx.Count) + { + for (var dseq = blockFirstSeq + (ulong)idx.Count; dseq < seq; dseq++) + { + idx.Add(FileStoreDefaults.Dbit); + if (dseq != 0) + Dmap.Insert(dseq); + } + } + + idx.Add((uint)offset); + + if (erased && seq != 0 && !Dmap.Exists(seq)) + return new InvalidDataException("corrupt state"); + + if (populateFss && subjectLength > 0 && !NoTrack && !erased && !Dmap.Exists(seq)) + { + var subjectOffset = offset + 8 + 8 + 4 + 4 + 4; + if (subjectOffset < 0 || subjectOffset + subjectLength > buf.Length) + return new InvalidDataException("corrupt state"); + + var subjectSpan = buf.AsSpan(subjectOffset, subjectLength); + var (state, exists) = Fss!.Find(subjectSpan); + if (exists && state != null) + { + state.Msgs++; + state.Last = seq; + state.LastNeedsUpdate = false; + } + else + { + Fss!.Insert(subjectSpan, new SimpleState + { + Msgs = 1, + First = seq, + Last = seq, + LastNeedsUpdate = false, + }); + } + } + } + + offset += recordLength; + } + + if (idx.Count != (int)idxSize || + (firstSeen > 0 && firstSeen != blockFirstSeq) || + (lastSeen > 0 && lastSeen != blockLastSeq)) + return new InvalidDataException("corrupt state"); + + CacheData.Buf = buf; + CacheData.Idx = idx.ToArray(); + CacheData.Fseq = blockFirstSeq; + CacheData.Wp = buf.Length; + return null; + } + + // Flushes pending messages for this block. + internal Exception? FlushPendingMsgs() + { + Mu.EnterWriteLock(); + try + { + var (lostData, err) = FlushPendingMsgsLocked(); + if (lostData != null && Fs != null) + Fs.RebuildState(lostData); + + return err; + } + finally + { + Mu.ExitWriteLock(); + } + } + + // Writes the buffer to disk at an explicit offset. + // Lock should be held. + internal (int Written, Exception? Error) WriteAt(byte[] buf, long writeOffset) + { + ArgumentNullException.ThrowIfNull(buf); + + if (MockWriteErr) + { + MockWriteErr = false; + return (0, new IOException("mock write error")); + } + + if (Mfd == null) + return (0, new InvalidOperationException("message block file is not open")); + + try + { + Mfd.Seek(writeOffset, SeekOrigin.Begin); + Mfd.Write(buf, 0, buf.Length); + return (buf.Length, null); + } + catch (Exception ex) + { + return (0, ex); + } + } + + // flushPendingMsgsLocked writes pending bytes to disk. + // Lock should be held. + internal (LostStreamData? LostData, Exception? Error) FlushPendingMsgsLocked() + { + LostStreamData? fsLostData = null; + + if (CacheData == null || Mfd == null) + return (null, new InvalidOperationException("no message cache")); + + var (pending, pendingErr) = BytesPending(); + if (pendingErr != null || pending == null || pending.Length == 0) + { + if (pendingErr is InvalidOperationException) + return (null, null); + + return (null, pendingErr); + } + + var writeOffset = (long)CacheData.Wp; + var writeBuffer = pending; + + var checkErr = CheckAndLoadEncryption(); + if (checkErr != null) + return (null, checkErr); + + if (Bek != null && writeBuffer.Length > 0) + { + var encrypted = new byte[writeBuffer.Length]; + writeBuffer.AsSpan().CopyTo(encrypted); + Bek.XorKeyStream(encrypted.AsSpan()); + writeBuffer = encrypted; + } + + while (writeBuffer.Length > 0) + { + var (written, writeErr) = WriteAt(writeBuffer, writeOffset); + if (writeErr != null) + { + CloseFDsLockedNoCheck(); + var (lostData, _, _) = RebuildStateLocked(); + Werr = writeErr; + return (lostData, writeErr); + } + + writeOffset += written; + writeBuffer = writeBuffer[written..]; + } + + Werr = null; + + if (CacheData == null || Mfd == null) + return (fsLostData, Werr); + + CacheData.Wp = (int)writeOffset; + if (SyncAlways) + Mfd.Flush(flushToDisk: true); + else + NeedSync = true; + + if (PendingWriteSizeLocked() > 0) + return (fsLostData, Werr); + + var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + var cexpNs = TimeSpanToNanoseconds(Cexp); + if (now < Llts || (cexpNs > 0 && (now - Llts) <= cexpNs)) + { + ResetCacheExpireTimerLocked(TimeSpan.Zero); + return (fsLostData, Werr); + } + + CacheData.Nra = false; + ExpireCacheLocked(); + return (fsLostData, Werr); + } + + // Lock should be held. + internal void ClearLoading() + { + Loading = false; + } + + // Loads messages from disk while holding the block lock. + internal Exception? LoadMsgs() + { + Mu.EnterWriteLock(); + try + { + return LoadMsgsWithLock(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + // Lock should be held. + internal bool CacheAlreadyLoaded() + { + var cache = CacheData; + if (cache == null || cache.Fseq == 0 || cache.Buf.Length == 0) + return false; + + if (First.Seq < cache.Fseq) + return false; + + var expectedEntries = Msgs + (ulong)Dmap.Size + (First.Seq - cache.Fseq); + return expectedEntries == (ulong)cache.Idx.Length; + } + + // Lock should be held. + internal bool CacheNotLoaded() + => !CacheAlreadyLoaded(); + + // Lock should be held. + internal bool FssNotLoaded() + => Fss == null && !NoTrack; + + // Lock should be held. + internal (FileStream? File, Exception? Error) OpenBlock() + { + try + { + return (new FileStream(Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite), null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + // Loads a block file into memory. + // Lock should be held. + internal (byte[]? Buffer, Exception? Error) LoadBlock(byte[]? buf) + { + FileStream? file = null; + var closeWhenDone = false; + + try + { + if (Mfd != null) + { + file = Mfd; + if (file.Seek(0, SeekOrigin.Begin) != 0) + { + file = null; + CloseFDsLockedNoCheck(); + } + } + + if (file == null) + { + var (opened, openErr) = OpenBlock(); + if (openErr != null || opened == null) + { + if (openErr is FileNotFoundException) + return (null, new FileNotFoundException("message block data missing", Mfn, openErr)); + + return (null, openErr ?? new IOException("failed to open message block")); + } + + file = opened; + closeWhenDone = true; + } + + var size64 = file.Length; + if (size64 > int.MaxValue) + return (null, new InvalidOperationException("message block size exceeded int capacity")); + + var size = (int)size64; + if (buf == null || buf.Length < size) + buf = JetStreamFileStore.GetMsgBlockBuf(size); + + var readTotal = 0; + while (readTotal < size) + { + var read = file.Read(buf, readTotal, size - readTotal); + if (read <= 0) + return (buf[..readTotal], new EndOfStreamException("short read while loading message block")); + + readTotal += read; + } + + RBytes = (ulong)readTotal; + return (buf[..readTotal], null); + } + catch (Exception ex) + { + return (null, ex); + } + finally + { + if (closeWhenDone) + file?.Dispose(); + } + } + + // Lock should be held. + internal Exception? LoadMsgsWithLock() + { + var checkErr = CheckAndLoadEncryption(); + if (checkErr != null) + return checkErr; + + if (Loading) + return null; + + Loading = true; + try + { + var checks = 0; + while (true) + { + checks++; + if (checks > 8) + return new InvalidOperationException("no message cache"); + + if (CacheAlreadyLoaded()) + return null; + + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + + var (pending, _) = BytesPending(); + if (pending is { Length: > 0 }) + { + var (lostData, flushErr) = FlushPendingMsgsLocked(); + if (lostData != null && Fs != null) + Fs.RebuildState(lostData); + if (flushErr != null) + return flushErr; + + continue; + } + + var (buffer, loadErr) = LoadBlock(null); + if (loadErr != null || buffer == null) + { + Fs?.Warn("loadBlock error: {0}", loadErr?.Message ?? "unknown"); + if (loadErr is FileNotFoundException) + { + var (lostData, _, rebuildErr) = RebuildStateLocked(); + if (rebuildErr != null && lostData != null && Fs != null) + Fs.RebuildState(lostData); + } + + return loadErr ?? new IOException("failed to load message block"); + } + + var cryptErr = EncryptOrDecryptIfNeeded(buffer); + if (cryptErr != null) + return cryptErr; + + var (decompressed, decompressErr) = DecompressIfNeeded(buffer); + if (decompressErr != null || decompressed == null) + return decompressErr ?? new InvalidDataException("failed to decompress message block"); + + var indexErr = IndexCacheBuf(decompressed); + if (indexErr != null) + { + if (indexErr is InvalidDataException) + { + var (lostData, _, rebuildErr) = RebuildStateLocked(); + if (Fs != null) + Fs.RebuildState(lostData); + if (rebuildErr != null) + return rebuildErr; + continue; + } + + return indexErr; + } + + if (decompressed.Length > 0) + { + Cloads++; + StartCacheExpireTimer(); + } + + return null; + } + } + finally + { + ClearLoading(); + } + } + + // Fetches a message from this block. + internal (StoreMsg? Message, bool ExpireOk, Exception? Error) FetchMsg(ulong seq, StoreMsg? sm) + => FetchMsgEx(seq, sm, doCopy: true); + + // Fetches a message from this block without copy preference. + internal (StoreMsg? Message, bool ExpireOk, Exception? Error) FetchMsgNoCopy(ulong seq, StoreMsg? sm) + => FetchMsgEx(seq, sm, doCopy: false); + + // Fetches a message from this block. + internal (StoreMsg? Message, bool ExpireOk, Exception? Error) FetchMsgEx(ulong seq, StoreMsg? sm, bool doCopy) + { + Mu.EnterWriteLock(); + try + { + var firstSeq = First.Seq; + var lastSeq = Last.Seq; + if (seq < firstSeq || seq > lastSeq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + if (Dmap.Exists(seq)) + { + var previous = Llseq; + if (Llseq == 0 || seq < Llseq || seq == Llseq + 1 || seq + 1 == Llseq) + Llseq = seq; + + var deletedExpireOk = (seq == lastSeq && previous == seq - 1) || (seq == firstSeq && previous == seq + 1); + return (null, deletedExpireOk, new InvalidOperationException("deleted message")); + } + + if (CacheNotLoaded()) + { + var loadErr = LoadMsgsWithLock(); + if (loadErr != null) + return (null, false, loadErr); + } + + var cache = CacheData; + if (cache == null || cache.Buf.Length == 0 || cache.Idx.Length == 0 || seq < cache.Fseq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var slot = seq - cache.Fseq; + if (slot >= (ulong)cache.Idx.Length) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var raw = cache.Idx[(int)slot]; + if ((raw & FileStoreDefaults.Dbit) != 0) + return (null, false, new InvalidOperationException("deleted message")); + + var offset = (int)(raw & ~(FileStoreDefaults.Dbit | FileStoreDefaults.Cbit)); + if (offset < 0 || offset >= cache.Buf.Length) + return (null, false, new ErrBadMsg(Mfn, "invalid cache index")); + + if (!TryReadRecordHeader(cache.Buf.AsSpan(offset), out var seqRaw, out var ts, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out var parseErr)) + return (null, false, parseErr ?? new ErrBadMsg(Mfn, "malformed record")); + + if (offset + recordLength > cache.Buf.Length) + return (null, false, new ErrBadMsg(Mfn, "record extends past cache buffer")); + + var record = cache.Buf.AsSpan(offset, recordLength); + var payloadLength = recordLength - FileStoreDefaults.RecordHashSize; + var payload = record[..payloadLength]; + var checksum = record[payloadLength..]; + var storedSeq = seqRaw & ~FileStoreDefaults.Ebit & ~FileStoreDefaults.Tbit; + if (storedSeq != seq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + if ((raw & FileStoreDefaults.Cbit) == 0) + { + var computed = SHA256.HashData(payload); + if (!computed.AsSpan(0, FileStoreDefaults.RecordHashSize).SequenceEqual(checksum)) + return (null, false, new ErrBadMsg(Mfn, "invalid checksum")); + + cache.Idx[(int)slot] = raw | FileStoreDefaults.Cbit; + } + + var subjectOffset = 8 + 8 + 4 + 4 + 4; + var headerOffset = subjectOffset + subjectLength; + var msgOffset = headerOffset + headerLength; + + var parsed = sm ?? new StoreMsg(); + parsed.Seq = seq; + parsed.Ts = ts; + parsed.Subject = subjectLength > 0 + ? Encoding.UTF8.GetString(record.Slice(subjectOffset, subjectLength)) + : string.Empty; + + var headerSlice = record.Slice(headerOffset, headerLength); + var msgSlice = record.Slice(msgOffset, msgLength); + if (doCopy) + { + parsed.Hdr = headerSlice.ToArray(); + parsed.Msg = msgSlice.ToArray(); + } + else + { + // StoreMsg uses byte[] fields; preserving that shape still requires arrays. + parsed.Hdr = headerSlice.ToArray(); + parsed.Msg = msgSlice.ToArray(); + } + + parsed.Buf = new byte[parsed.Hdr.Length + parsed.Msg.Length]; + if (parsed.Hdr.Length > 0) + Buffer.BlockCopy(parsed.Hdr, 0, parsed.Buf, 0, parsed.Hdr.Length); + if (parsed.Msg.Length > 0) + Buffer.BlockCopy(parsed.Msg, 0, parsed.Buf, parsed.Hdr.Length, parsed.Msg.Length); + + var prevSeq = Llseq; + if (Llseq == 0 || seq < Llseq || seq == Llseq + 1 || seq + 1 == Llseq) + Llseq = seq; + + Lrts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + var expireOk = (seq == lastSeq && prevSeq == seq - 1) || (seq == firstSeq && prevSeq == seq + 1); + return (parsed, expireOk, null); + } + finally + { + FinishedWithCache(); + Mu.ExitWriteLock(); + } + } + private Exception? FlushPendingToDiskLocked() { - var (pending, pendingErr) = BytesPending(); - if (pendingErr != null || pending == null || pending.Length == 0) - return pendingErr; - - try - { - Mfd ??= new FileStream(Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); - Mfd.Seek(0, SeekOrigin.End); - Mfd.Write(pending, 0, pending.Length); - Mfd.Flush(SyncAlways); - NeedSync = false; - - if (CacheData != null) - CacheData.Wp = CacheData.Buf.Length; - - return null; - } - catch (Exception ex) - { - Werr = ex; - return ex; - } + var (_, err) = FlushPendingMsgsLocked(); + return err; } private void ResetCacheExpireTimerLocked(TimeSpan due) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index cf6102a..8543bec 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -7,6 +7,46 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests1 { + [Fact] // T:2422 + public void NoRaceJetStreamConsumerFileStoreConcurrentDiskIO_ShouldSucceed() + { + WithStore((fs, _) => + { + const int consumerCount = 400; + var start = new ManualResetEventSlim(false); + var errors = new ConcurrentQueue(); + var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + + var workers = new List(consumerCount); + for (var i = 0; i < consumerCount; i++) + { + var consumer = fs.ConsumerStore( + $"o{i}", + DateTime.UtcNow, + new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); + + workers.Add(Task.Run(() => + { + try + { + start.Wait(TimeSpan.FromSeconds(5)); + consumer.UpdateDelivered(22, 22, 1, timestamp); + consumer.EncodedState(); + consumer.Delete(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + })); + } + + start.Set(); + Task.WaitAll(workers.ToArray()); + errors.ShouldBeEmpty(); + }); + } + [Fact] // T:2452 public void NoRaceFileStoreStreamMaxAgePerformance_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index 8f8b5bd..1d9d2a2 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -8,6 +8,145 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests2 { + [Fact] // T:2505 + public void NoRaceStoreReverseWalkWithDeletesPerf_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + var fileCfg = new StreamConfig + { + Name = "zzz", + Subjects = ["foo.*"], + Storage = StorageType.FileStorage, + MaxMsgs = -1, + MaxBytes = -1, + MaxAge = TimeSpan.Zero, + MaxMsgsPer = -1, + Discard = DiscardPolicy.DiscardOld, + Retention = RetentionPolicy.LimitsPolicy, + }; + + var memCfg = fileCfg.Clone(); + memCfg.Storage = StorageType.MemoryStorage; + + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, fileCfg); + var ms = JetStreamMemStore.NewMemStore(memCfg); + + try + { + var msg = "Hello"u8.ToArray(); + + foreach (var store in new IStreamStore[] { fs, ms }) + { + store.StoreMsg("foo.A", null, msg, 0).Seq.ShouldBeGreaterThan(0UL); + for (var i = 0; i < 150_000; i++) + store.StoreMsg("foo.B", null, msg, 0); + store.StoreMsg("foo.C", null, msg, 0); + + var state = store.State(); + state.Msgs.ShouldBe(150_002UL); + + var (purged, purgeErr) = store.PurgeEx("foo.B", 1, 0); + purgeErr.ShouldBeNull(); + purged.ShouldBe(150_000UL); + + if (store is JetStreamFileStore fileStore) + PreloadFileStoreCaches(fileStore); + + var timer = Stopwatch.StartNew(); + var scratch = new StoreMsg(); + for (var seq = state.LastSeq; seq > 0; seq--) + { + try + { + _ = store.LoadMsg(seq, scratch); + } + catch (Exception ex) when (ReferenceEquals(ex, StoreErrors.ErrStoreMsgNotFound)) + { + continue; + } + } + timer.Stop(); + var reverseWalkElapsed = timer.Elapsed; + + if (store is JetStreamFileStore fileStore2) + PreloadFileStoreCaches(fileStore2); + + var seen = 0; + var cursor = state.LastSeq; + timer.Restart(); + while (true) + { + var (sm, err) = store.LoadPrevMsg(cursor, scratch); + if (err == StoreErrors.ErrStoreEOF) + break; + + err.ShouldBeNull(); + sm.ShouldNotBeNull(); + cursor = sm!.Seq > 0 ? sm.Seq - 1 : 0; + seen++; + } + timer.Stop(); + + seen.ShouldBe(2); + if (store is JetStreamMemStore) + { + timer.Elapsed.ShouldBeLessThan(reverseWalkElapsed); + } + else + { + (timer.Elapsed.Ticks * 10L).ShouldBeLessThan(reverseWalkElapsed.Ticks); + } + } + } + finally + { + fs.Stop(); + ms.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:2510 + public void NoRaceFileStorePurgeExAsyncTombstones_ShouldSucceed() + { + var cfg = DefaultStreamConfig(); + cfg.Subjects = ["*.*"]; + + WithStore((fs, _) => + { + var msg = "zzz"u8.ToArray(); + + fs.StoreMsg("foo.A", null, msg, 0); + fs.StoreMsg("foo.B", null, msg, 0); + for (var i = 0; i < 500; i++) + fs.StoreMsg("foo.C", null, msg, 0); + fs.StoreMsg("foo.D", null, msg, 0); + + PreloadFileStoreCaches(fs); + + var sw = Stopwatch.StartNew(); + var (purgedOne, errOne) = fs.PurgeEx("foo.B", 0, 0); + sw.Stop(); + errOne.ShouldBeNull(); + purgedOne.ShouldBe(1UL); + var singleElapsed = sw.Elapsed; + + sw.Restart(); + var (purgedMany, errMany) = fs.PurgeEx("foo.C", 0, 0); + sw.Stop(); + errMany.ShouldBeNull(); + purgedMany.ShouldBe(500UL); + var manyElapsed = sw.Elapsed; + + // Large subject purges should not degenerate to per-message sync behavior. + var scaledSingle = Math.Max(1L, singleElapsed.Ticks) * 80L; + scaledSingle.ShouldBeGreaterThan(manyElapsed.Ticks); + }, cfg); + } + [Fact] // T:2491 public void NoRaceFileStoreMsgLoadNextMsgMultiPerf_ShouldSucceed() { @@ -243,5 +382,24 @@ public sealed partial class ConcurrencyTests2 return (T)result; } + private static void PreloadFileStoreCaches(JetStreamFileStore fs) + { + var blksField = typeof(JetStreamFileStore).GetField("_blks", BindingFlags.Instance | BindingFlags.NonPublic); + blksField.ShouldNotBeNull(); + + var blocks = blksField!.GetValue(fs) as System.Collections.IEnumerable; + blocks.ShouldNotBeNull(); + + foreach (var mb in blocks!) + { + var load = mb!.GetType().GetMethod("LoadMsgs", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); + load.ShouldNotBeNull(); + + var result = load!.Invoke(mb, []); + if (result is Exception err) + throw err; + } + } + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); } diff --git a/porting.db b/porting.db index 451d763437338ac6e1b6616f4f10cfe844f00ab7..0104dd880f58cdffbfe1a28f2c0dd8b5b965eeb1 100644 GIT binary patch delta 4166 zcmc(ii&qm@8o=k?nIyxIt@T)Y>Yg53?d~!w_HV%1X?e*tcq0 zfUGi;!Eop+vXhL0F&D_vQ5?rvkkCKWgWQfNg zxHvz3pM#tNS%Djqs2S38=!f8(L~|j35*?u843-XL1ZdF_4$+G))^XkD6>qrIb98QMY3O3~UiD?w|~tQc*rW(&~j zG@CDuvyw7R=V4>MW<_X|H7i8R)T{t)h?+K)ezu73rEb-5gsztaV*=Txwsg{VBH*N( zRoA<;sz|28d?h%~-M#|6ruuR>w=*6Q5FMgN42XzCsO?Ppay7g#kB^5PEs}e{7-xD$ zc3MVu#^{Z5OSP3F85_&NvPp8N7h9w_0@he_JoMa+UyS+93@NZMN=|+Tmilbiet`&ah^GtK@Hu z^tT%Qt)`AvbJ>2%uK`aQn;`Iu`BL>j8aqH^w5%e36lxyk7}*G($xJZ^v-`o%MrNv; zGgtv31ir{*$%1Ynm#6ARu}}3R1-i$v0;|#Ua0FU{W>&{N%0B$v$cRer0yR>8GM~v0 z;T`-zzMXI3*Yb7zdHyutWq8kU#BjjyqQNgdBUX!-#m~Xx!Brytz?zAn%ba6=QmlrL zudy+z{zs+|fx7V*?mkXOL&dLl4^dmm!^SO%*K-4O&Yl=QJf!Gu;oyvuGH4DWFl z_%7TUj?#7Tb`LSNwaN#n?jkfCp;Oc~M6MNJLMDw>Z|md}_gBIRgFI1Wk#BLQshOj* z@MXtPH}s`Z2c%kLha9mj;<<O0IdPxZD*F1T zcb+%{aMdCHZknx1n9ZRa-qZ~n_=rsHk3RChiQ z@JqhA&P=eWj}3)C#me7XjZK{UK{^?N`&k`y#mR#<2$f+2SA>}-%*w;8EX+#7tR&2e z!)!sAZI~aX^TMnMll;(wpoL1bJ(^XZZPUzy_MB$rXbqZ`p;c;DiZ)BLl04KrO^eaS zXtn@Nn=@fP+QS7o56!GSSp>=P@^`q>$ayt3LGC8dW3?Em(AwdYg)Z_u=z3%yZ0V6# z2*wM>GobI4yCJ7n)~jFj%Bd8~zUi`@EC~O?U2E)Rd6`Ju@a9!{DsgYVc2!odUA4%U zlB9ahOifh0CEA$NJlV9;SQ$A>e@53SoZ`Qhp5;x{q$WDaH$-(EbDbmVXUAQ`1l6`W zzMM)XtP5PJDa%v}zjV5~1)t;_50>A%V$@~p;&%y~g&r$nQ-z)Z>s|&LVNHUb!LCGT zU168t)H-|4{jEPeJ2PF4de;8iYVg@&k|5_hyAx_I+8waP6gAv{1+V{!nV0M_I%u@H zk|5A!kAnSI?98xe_sFitV0XZt2Kh_uI(67L_OqY6f_0bskYl{*j>%%gkJ62Z z3u2NXN1v+860-R`3W*K2*X+KEwiz}&{LI9+nMrBVTbyY4jE>gbnLFH ztgBzR*jweSUFw`$vC`|z%yw3J8@x-amOJa0I;*`E&#Y+lI+I3@9GmG(Ub4Jqa`5@o zg?07Iy(z<-tDdf1__VXpyQHFa>2mCN@08?RvGkeRI_I>;`V|e%(az*q70VYpM`oo! z@9WY)?bTCHIP5xj{LhX{>X45eS9w)B<@hfZn%A`D!KC*c10cW4@o#^?;SWUl1JV9K zj6V?D5r~WbAf!^~wJFJ=wRUxzvIKT*iNsp#h%^CfS5j;b3wd*4OPPfuQ$|7WfXFDQ zY*%JO;Vvvv{SWuHE5k#_iUy_n-YKQO>c0U>*rDJoD77Jx)VvfP$?lhw58&uwDFrs~ zR?Rj8lzmV*2b=p(K&xL? zhK4pDep%Uc?`sz9RZ>D*w(nKefVD69cELl-h*y-f`-v4iX*E~16asreii4I{luEd` z_QCPQwol0n9j0KPvI2g5?LqVR`;^qsVP?1FLr==UILLcdnHS9C`(O9btICLwVLJ9J zB=AUti!%{7Ck{2(ba!=ef{Bl!cH%a_V>)c?j65Zs=`U^50oP`tI{k?23o*D+ysZv7 z=6aK;h3Bp3sag@R>WJMp!#6XS+7mIgCsGnWWn>YicIgq<8~?wA_BX}*n-cs@1N}|T zj;2IizJEo z_L_UKfAs5zt!*Wz&n0G@H5d!INLoWd4p;|5V!)mqOqOerm_ziuI_JFgO@1@!av@2` zAY?Ey1bGBWMp6(rG87qx3`bIt5l9*`5=logkW6G0G8)N3#vo&nY-AkrC^8V zAf-qdQm&FN5AP(`vATLHsAz@i9*tL!YWPWgvB%R%;D@swqw2Zn$7o1&2)@3rie0;4fAje`hpaRt&HV{U1=-A>sf4 delta 2705 zcmY*a4RBLc7Jm1>m*nLodG976uRnPyO@AmUr4$OZP${L@DhgQs1OXw`rcxFusEeRh z`@`I36tL>Fhd+^F*p;0{1>NmcW(V&eE3BiXfMNkxN{fm!h(&kYT156G1s31rOXl43 z^Ugiz)i~{6f?N3ggRn~s;I)y*YpQD^Rkr6%ttWO;6{(Iq{&=KH`jMSG zkA{V9mWjePd_;hfS;T<;DOiOuCHNy~K_nXnN^mKwnt~5u_<1M!9;P>u5o}@^{&b~fVvtdHMeP^ z3CV=Tgw=%2glxjj+BD_dI`EWB0oWW>vVkOt4m*ifdjyo ze#Hf3lVXG17nGDiYdIP*t&5rpprO^!+1eKr4Z&fmgh1P*$k6_xk_Njzbvo?xqgAHi zsK)gW1VabBQMq{AE(7{nL>u?hEQ>Wgol~)o&_#H!GQg z9^RLfRyh2OH3hn<;(!w`nF{j~3T-hz4%*_REw1(!b;&MlK@iT|_O?@5me#%-y|l+@c&4iHb@qH*wCLw4}CNB2#PudOb?t}2NXfrx`4I9D75uTUn%}hTg9`bE_KVHZBb*nx`JXv}pMkf#k*l?Z?18)n zQpVOJQX{EmX>a$-F^u(DNC$%Au{gxOwUK1mru``Vin9gSM@auL+gbTOd4c?-TrIcA zjrdo5Mcx@JxR@v^crGr-g*aO*g%mjM%&(0UA~#fc zNiqys6*7bjq!En++D+EVF1z!*^Mv!m zwhN?-eRYB4V)SotT_nRns>Oa*agi*PkpX|bOv;eaeDyM6S1-FsuZF4xO<0D7WonUY zrgJvP`_qO(sM9xwE!&@V)xl0Y9rzMM?gwe7no+mMgqNZ2Ax z({QdZ|5n5Q|J$&>wH)5suaAb>iMzs6h~8G>kO}e@I#GjV9j2AD4TIiKP-w* zw%!NM*E38mLGoPR zYOIW%<$|}($b{6%hSL^Z;ug@R}0ytB>N0rpox^_r0v_YkY1vq5`B@k8`AqZ07D=H826z`N5J8^1ry*=alhp6TYYiz|NE zY2*&N&Wlz+cW9^|%3n7Y#AfKG#nIP|QSp`2mMU49b(eANxKF7SY_b>21F~03RZCrK zot2KcESQSwY%tK}sb}6U&r*!Oh7U`UFG!6_<0NoD8AxJl&Us#ynx*=Hi4+q779l!$&8A9`IIr##r<7?3;u-*#2d3eL_R&h6S;K z8|j+h8W|tBDdxb*u>Z^86H-IvhNW;|e7cv_oeG9T>{`n9o(`5_siAJeVz}BBtcB0c z1ee490zFGyykQY6n3$dl#V3Ppu=;?O#g3m1j`=*xoLC}E6VyYlH=G9@r<7|-nmtV( zVe?vUSo|h?Ctx4S7N&uD?I4@BI&em2u9Kd#2(2xluw|1BIQMt-$nT( z%8MzVOnC|AQz$Q`d@AL4Q(i`SIpz0IelO+ID8G;L`zfDJ`3%ZuQvLwt6_n4Sd^Y8C zD4$FDJjx%Wd_Lt1D1V6Zg_JL%d@d3QKllD^S$*|zTR wLG*Hk75Y98J6X6VTwp;PVavDS6|g&dQFe3ArLc)*CMr!l!g4NEefHG<0H>v(`2YX_ diff --git a/reports/current.md b/reports/current.md index b1c631a..bf9aaeb 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 22:45:05 UTC +Generated: 2026-02-28 23:03:57 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 22:45:05 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1758 | +| deferred | 1738 | | n_a | 24 | | stub | 1 | -| verified | 1868 | +| verified | 1888 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1673 | +| deferred | 1670 | | n_a | 249 | -| verified | 1335 | +| verified | 1338 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 22:45:05 UTC ## Overall Progress -**3510/6942 items complete (50.6%)** +**3533/6942 items complete (50.9%)**