diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs index 84f1a59..e742192 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs @@ -282,6 +282,13 @@ internal sealed class ErrBadMsg : Exception Detail = detail; } + /// + /// Returns the formatted error text. + /// Mirrors Go's errBadMsg.Error(). + /// + internal string Error() + => Message; + private static string BuildMessage(string fileName, string detail) { var baseName = Path.GetFileName(fileName); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index ae856a0..e049499 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -1057,7 +1057,9 @@ internal sealed class MessageBlock if (flush || Werr != null) { - var flushErr = FlushPendingToDiskLocked(); + var (lostData, flushErr) = FlushPendingMsgsLocked(); + if (lostData != null && Fs != null) + Fs.RebuildState(lostData); if (flushErr != null) return flushErr; } @@ -1415,73 +1417,752 @@ internal sealed class MessageBlock if (original.Length == 0) return null; - var checkErr = CheckAndLoadEncryption(); - if (checkErr != null) - return checkErr; - - if (Bek != null) - Bek.XorKeyStream(original.AsSpan()); - - var desired = Cmp; var meta = new CompressionInfo(); var consumed = meta.UnmarshalMetadata(original); - if (consumed > 0 && meta.Type == desired) + var desired = Cmp; + if (consumed > 0 && meta.Type == desired && desired == StoreCompression.NoCompression) return null; - byte[] rewritten; - if (desired == StoreCompression.NoCompression && consumed > 0) + byte[] rewritten = original; + if (consumed > 0) { - if (meta.Type != StoreCompression.NoCompression) - return new NotSupportedException("compressed block decompression is not available"); + if (meta.Type == StoreCompression.NoCompression) + { + rewritten = original[consumed..]; + } + else + { + var (decompressed, decompressErr) = DecompressIfNeeded(original); + if (decompressErr != null) + return decompressErr; - rewritten = original[consumed..]; - } - else - { - // Compression transforms are introduced separately with StoreCompression helpers. - rewritten = original; + rewritten = decompressed ?? Array.Empty(); + } } - if (Bek != null) - Bek.XorKeyStream(rewritten.AsSpan()); + return AtomicOverwriteFile(rewritten, allowCompress: true); + } + + // Lock should be held. + internal Exception? AtomicOverwriteFile(byte[] buf, bool allowCompress) + { + ArgumentNullException.ThrowIfNull(buf); + + var hadOpenFile = Mfd != null; + if (hadOpenFile) + CloseFDsLockedNoCheck(); + + var origFileName = Mfn; + var tempFileName = Mfn + FileStoreDefaults.BlkTmpSuffix; try { - var tmp = $"{Mfn}.tmp"; - File.WriteAllBytes(tmp, rewritten); - File.Move(tmp, Mfn, overwrite: true); + var algorithm = StoreCompression.NoCompression; + if (allowCompress && Cmp != StoreCompression.NoCompression) + { + if (Cmp != StoreCompression.S2Compression) + return new NotSupportedException($"unsupported compression algorithm: {Cmp}"); + + return new NotSupportedException("message block compression is not available"); + } + + var writeBuffer = buf; + if (algorithm != StoreCompression.NoCompression) + { + return new NotSupportedException("message block compression is not available"); + } + + var encryptErr = EncryptOrDecryptIfNeeded(writeBuffer); + if (encryptErr != null) + return encryptErr; + + using var tmp = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None); + tmp.Write(writeBuffer, 0, writeBuffer.Length); + tmp.Flush(flushToDisk: true); + tmp.Close(); + + File.Move(tempFileName, origFileName, overwrite: true); + + Cmp = algorithm; + RBytes = (ulong)writeBuffer.Length; + + if (hadOpenFile) + { + var enableErr = EnableForWriting(false); + if (enableErr != null) + return enableErr; + } + return null; } catch (Exception ex) { + try + { + if (File.Exists(tempFileName)) + File.Delete(tempFileName); + } + catch + { + // Best-effort cleanup only. + } + + if (hadOpenFile && Mfd == null) + _ = EnableForWriting(false); + return ex; } } + // Lock should be held. + internal (byte[]? Buffer, Exception? Error) DecompressIfNeeded(byte[] buf) + { + ArgumentNullException.ThrowIfNull(buf); + + var hasCompressionHeader = buf.Length >= 3 && + buf[0] == (byte)'c' && + buf[1] == (byte)'m' && + buf[2] == (byte)'p'; + + var meta = new CompressionInfo(); + var consumed = meta.UnmarshalMetadata(buf); + if (consumed == 0) + { + if (hasCompressionHeader) + return (null, new InvalidDataException("invalid compression metadata")); + + return (buf, null); + } + + if (meta.Type == StoreCompression.NoCompression) + return (buf[consumed..], null); + + return (null, new NotSupportedException("message block decompression is not available")); + } + + // Lock should be held. + internal Exception? EncryptOrDecryptIfNeeded(byte[] buf) + { + ArgumentNullException.ThrowIfNull(buf); + + if (Bek != null && buf.Length > 0) + Bek.XorKeyStream(buf.AsSpan()); + + return null; + } + + // Lock should be held. + internal Exception? EnsureRawBytesLoaded() + { + if (RBytes > 0) + return null; + + var (file, openErr) = OpenBlock(); + if (openErr != null || file == null) + return openErr ?? new InvalidOperationException("unable to open message block"); + + try + { + RBytes = (ulong)file.Length; + return null; + } + finally + { + file.Dispose(); + } + } + + // Index a raw message buffer. + // Lock should be held. + internal Exception? IndexCacheBuf(byte[] buf) + { + ArgumentNullException.ThrowIfNull(buf); + + var blockFirstSeq = First.Seq; + var blockLastSeq = Last.Seq; + + if (blockFirstSeq > blockLastSeq + 1) + { + Fs?.Warn("indexCacheBuf corrupt state in {0}: first {1} last {2}", Mfn, blockFirstSeq, blockLastSeq); + return new InvalidDataException("corrupt state"); + } + + var idxSize = blockLastSeq - blockFirstSeq + 1; + if (idxSize > int.MaxValue) + return new InvalidDataException("corrupt state"); + + if (CacheData != null && CacheData.Buf.Length > 0) + JetStreamFileStore.RecycleMsgBlockBuf(CacheData.Buf); + + CacheData ??= new Cache(); + CacheData.Buf = Array.Empty(); + CacheData.Idx = Array.Empty(); + CacheData.Fseq = 0; + CacheData.Wp = 0; + CacheData.Nra = false; + + var idx = new List((int)idxSize); + + var populateFss = false; + if (FssNotLoaded()) + { + Fss = new SubjectTree(); + populateFss = true; + } + + Lsts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + Ttls = 0; + Schedules = 0; + + var offset = 0; + ulong firstSeen = 0; + ulong lastSeen = 0; + while (offset < buf.Length) + { + if (!TryReadRecordHeader(buf.AsSpan(offset), out var seqRaw, out _, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out _)) + return new InvalidDataException("corrupt state"); + + if (recordLength <= 0 || offset + recordLength > buf.Length) + return new InvalidDataException("corrupt state"); + + if ((seqRaw & FileStoreDefaults.Tbit) != 0) + { + offset += recordLength; + continue; + } + + var erased = (seqRaw & FileStoreDefaults.Ebit) != 0; + var seq = seqRaw & ~FileStoreDefaults.Ebit & ~FileStoreDefaults.Tbit; + if (seq <= lastSeen) + { + offset += recordLength; + continue; + } + + if (seq >= blockFirstSeq) + { + lastSeen = seq; + if (firstSeen == 0) + { + firstSeen = seq; + if (firstSeen != blockFirstSeq) + return new InvalidDataException("corrupt state"); + } + + var expectedSlot = (int)(seq - blockFirstSeq); + if (expectedSlot != idx.Count) + { + for (var dseq = blockFirstSeq + (ulong)idx.Count; dseq < seq; dseq++) + { + idx.Add(FileStoreDefaults.Dbit); + if (dseq != 0) + Dmap.Insert(dseq); + } + } + + idx.Add((uint)offset); + + if (erased && seq != 0 && !Dmap.Exists(seq)) + return new InvalidDataException("corrupt state"); + + if (populateFss && subjectLength > 0 && !NoTrack && !erased && !Dmap.Exists(seq)) + { + var subjectOffset = offset + 8 + 8 + 4 + 4 + 4; + if (subjectOffset < 0 || subjectOffset + subjectLength > buf.Length) + return new InvalidDataException("corrupt state"); + + var subjectSpan = buf.AsSpan(subjectOffset, subjectLength); + var (state, exists) = Fss!.Find(subjectSpan); + if (exists && state != null) + { + state.Msgs++; + state.Last = seq; + state.LastNeedsUpdate = false; + } + else + { + Fss!.Insert(subjectSpan, new SimpleState + { + Msgs = 1, + First = seq, + Last = seq, + LastNeedsUpdate = false, + }); + } + } + } + + offset += recordLength; + } + + if (idx.Count != (int)idxSize || + (firstSeen > 0 && firstSeen != blockFirstSeq) || + (lastSeen > 0 && lastSeen != blockLastSeq)) + return new InvalidDataException("corrupt state"); + + CacheData.Buf = buf; + CacheData.Idx = idx.ToArray(); + CacheData.Fseq = blockFirstSeq; + CacheData.Wp = buf.Length; + return null; + } + + // Flushes pending messages for this block. + internal Exception? FlushPendingMsgs() + { + Mu.EnterWriteLock(); + try + { + var (lostData, err) = FlushPendingMsgsLocked(); + if (lostData != null && Fs != null) + Fs.RebuildState(lostData); + + return err; + } + finally + { + Mu.ExitWriteLock(); + } + } + + // Writes the buffer to disk at an explicit offset. + // Lock should be held. + internal (int Written, Exception? Error) WriteAt(byte[] buf, long writeOffset) + { + ArgumentNullException.ThrowIfNull(buf); + + if (MockWriteErr) + { + MockWriteErr = false; + return (0, new IOException("mock write error")); + } + + if (Mfd == null) + return (0, new InvalidOperationException("message block file is not open")); + + try + { + Mfd.Seek(writeOffset, SeekOrigin.Begin); + Mfd.Write(buf, 0, buf.Length); + return (buf.Length, null); + } + catch (Exception ex) + { + return (0, ex); + } + } + + // flushPendingMsgsLocked writes pending bytes to disk. + // Lock should be held. + internal (LostStreamData? LostData, Exception? Error) FlushPendingMsgsLocked() + { + LostStreamData? fsLostData = null; + + if (CacheData == null || Mfd == null) + return (null, new InvalidOperationException("no message cache")); + + var (pending, pendingErr) = BytesPending(); + if (pendingErr != null || pending == null || pending.Length == 0) + { + if (pendingErr is InvalidOperationException) + return (null, null); + + return (null, pendingErr); + } + + var writeOffset = (long)CacheData.Wp; + var writeBuffer = pending; + + var checkErr = CheckAndLoadEncryption(); + if (checkErr != null) + return (null, checkErr); + + if (Bek != null && writeBuffer.Length > 0) + { + var encrypted = new byte[writeBuffer.Length]; + writeBuffer.AsSpan().CopyTo(encrypted); + Bek.XorKeyStream(encrypted.AsSpan()); + writeBuffer = encrypted; + } + + while (writeBuffer.Length > 0) + { + var (written, writeErr) = WriteAt(writeBuffer, writeOffset); + if (writeErr != null) + { + CloseFDsLockedNoCheck(); + var (lostData, _, _) = RebuildStateLocked(); + Werr = writeErr; + return (lostData, writeErr); + } + + writeOffset += written; + writeBuffer = writeBuffer[written..]; + } + + Werr = null; + + if (CacheData == null || Mfd == null) + return (fsLostData, Werr); + + CacheData.Wp = (int)writeOffset; + if (SyncAlways) + Mfd.Flush(flushToDisk: true); + else + NeedSync = true; + + if (PendingWriteSizeLocked() > 0) + return (fsLostData, Werr); + + var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + var cexpNs = TimeSpanToNanoseconds(Cexp); + if (now < Llts || (cexpNs > 0 && (now - Llts) <= cexpNs)) + { + ResetCacheExpireTimerLocked(TimeSpan.Zero); + return (fsLostData, Werr); + } + + CacheData.Nra = false; + ExpireCacheLocked(); + return (fsLostData, Werr); + } + + // Lock should be held. + internal void ClearLoading() + { + Loading = false; + } + + // Loads messages from disk while holding the block lock. + internal Exception? LoadMsgs() + { + Mu.EnterWriteLock(); + try + { + return LoadMsgsWithLock(); + } + finally + { + Mu.ExitWriteLock(); + } + } + + // Lock should be held. + internal bool CacheAlreadyLoaded() + { + var cache = CacheData; + if (cache == null || cache.Fseq == 0 || cache.Buf.Length == 0) + return false; + + if (First.Seq < cache.Fseq) + return false; + + var expectedEntries = Msgs + (ulong)Dmap.Size + (First.Seq - cache.Fseq); + return expectedEntries == (ulong)cache.Idx.Length; + } + + // Lock should be held. + internal bool CacheNotLoaded() + => !CacheAlreadyLoaded(); + + // Lock should be held. + internal bool FssNotLoaded() + => Fss == null && !NoTrack; + + // Lock should be held. + internal (FileStream? File, Exception? Error) OpenBlock() + { + try + { + return (new FileStream(Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite), null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + // Loads a block file into memory. + // Lock should be held. + internal (byte[]? Buffer, Exception? Error) LoadBlock(byte[]? buf) + { + FileStream? file = null; + var closeWhenDone = false; + + try + { + if (Mfd != null) + { + file = Mfd; + if (file.Seek(0, SeekOrigin.Begin) != 0) + { + file = null; + CloseFDsLockedNoCheck(); + } + } + + if (file == null) + { + var (opened, openErr) = OpenBlock(); + if (openErr != null || opened == null) + { + if (openErr is FileNotFoundException) + return (null, new FileNotFoundException("message block data missing", Mfn, openErr)); + + return (null, openErr ?? new IOException("failed to open message block")); + } + + file = opened; + closeWhenDone = true; + } + + var size64 = file.Length; + if (size64 > int.MaxValue) + return (null, new InvalidOperationException("message block size exceeded int capacity")); + + var size = (int)size64; + if (buf == null || buf.Length < size) + buf = JetStreamFileStore.GetMsgBlockBuf(size); + + var readTotal = 0; + while (readTotal < size) + { + var read = file.Read(buf, readTotal, size - readTotal); + if (read <= 0) + return (buf[..readTotal], new EndOfStreamException("short read while loading message block")); + + readTotal += read; + } + + RBytes = (ulong)readTotal; + return (buf[..readTotal], null); + } + catch (Exception ex) + { + return (null, ex); + } + finally + { + if (closeWhenDone) + file?.Dispose(); + } + } + + // Lock should be held. + internal Exception? LoadMsgsWithLock() + { + var checkErr = CheckAndLoadEncryption(); + if (checkErr != null) + return checkErr; + + if (Loading) + return null; + + Loading = true; + try + { + var checks = 0; + while (true) + { + checks++; + if (checks > 8) + return new InvalidOperationException("no message cache"); + + if (CacheAlreadyLoaded()) + return null; + + Llts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + + var (pending, _) = BytesPending(); + if (pending is { Length: > 0 }) + { + var (lostData, flushErr) = FlushPendingMsgsLocked(); + if (lostData != null && Fs != null) + Fs.RebuildState(lostData); + if (flushErr != null) + return flushErr; + + continue; + } + + var (buffer, loadErr) = LoadBlock(null); + if (loadErr != null || buffer == null) + { + Fs?.Warn("loadBlock error: {0}", loadErr?.Message ?? "unknown"); + if (loadErr is FileNotFoundException) + { + var (lostData, _, rebuildErr) = RebuildStateLocked(); + if (rebuildErr != null && lostData != null && Fs != null) + Fs.RebuildState(lostData); + } + + return loadErr ?? new IOException("failed to load message block"); + } + + var cryptErr = EncryptOrDecryptIfNeeded(buffer); + if (cryptErr != null) + return cryptErr; + + var (decompressed, decompressErr) = DecompressIfNeeded(buffer); + if (decompressErr != null || decompressed == null) + return decompressErr ?? new InvalidDataException("failed to decompress message block"); + + var indexErr = IndexCacheBuf(decompressed); + if (indexErr != null) + { + if (indexErr is InvalidDataException) + { + var (lostData, _, rebuildErr) = RebuildStateLocked(); + if (Fs != null) + Fs.RebuildState(lostData); + if (rebuildErr != null) + return rebuildErr; + continue; + } + + return indexErr; + } + + if (decompressed.Length > 0) + { + Cloads++; + StartCacheExpireTimer(); + } + + return null; + } + } + finally + { + ClearLoading(); + } + } + + // Fetches a message from this block. + internal (StoreMsg? Message, bool ExpireOk, Exception? Error) FetchMsg(ulong seq, StoreMsg? sm) + => FetchMsgEx(seq, sm, doCopy: true); + + // Fetches a message from this block without copy preference. + internal (StoreMsg? Message, bool ExpireOk, Exception? Error) FetchMsgNoCopy(ulong seq, StoreMsg? sm) + => FetchMsgEx(seq, sm, doCopy: false); + + // Fetches a message from this block. + internal (StoreMsg? Message, bool ExpireOk, Exception? Error) FetchMsgEx(ulong seq, StoreMsg? sm, bool doCopy) + { + Mu.EnterWriteLock(); + try + { + var firstSeq = First.Seq; + var lastSeq = Last.Seq; + if (seq < firstSeq || seq > lastSeq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + if (Dmap.Exists(seq)) + { + var previous = Llseq; + if (Llseq == 0 || seq < Llseq || seq == Llseq + 1 || seq + 1 == Llseq) + Llseq = seq; + + var deletedExpireOk = (seq == lastSeq && previous == seq - 1) || (seq == firstSeq && previous == seq + 1); + return (null, deletedExpireOk, new InvalidOperationException("deleted message")); + } + + if (CacheNotLoaded()) + { + var loadErr = LoadMsgsWithLock(); + if (loadErr != null) + return (null, false, loadErr); + } + + var cache = CacheData; + if (cache == null || cache.Buf.Length == 0 || cache.Idx.Length == 0 || seq < cache.Fseq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var slot = seq - cache.Fseq; + if (slot >= (ulong)cache.Idx.Length) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + var raw = cache.Idx[(int)slot]; + if ((raw & FileStoreDefaults.Dbit) != 0) + return (null, false, new InvalidOperationException("deleted message")); + + var offset = (int)(raw & ~(FileStoreDefaults.Dbit | FileStoreDefaults.Cbit)); + if (offset < 0 || offset >= cache.Buf.Length) + return (null, false, new ErrBadMsg(Mfn, "invalid cache index")); + + if (!TryReadRecordHeader(cache.Buf.AsSpan(offset), out var seqRaw, out var ts, out var subjectLength, out var headerLength, out var msgLength, out var recordLength, out var parseErr)) + return (null, false, parseErr ?? new ErrBadMsg(Mfn, "malformed record")); + + if (offset + recordLength > cache.Buf.Length) + return (null, false, new ErrBadMsg(Mfn, "record extends past cache buffer")); + + var record = cache.Buf.AsSpan(offset, recordLength); + var payloadLength = recordLength - FileStoreDefaults.RecordHashSize; + var payload = record[..payloadLength]; + var checksum = record[payloadLength..]; + var storedSeq = seqRaw & ~FileStoreDefaults.Ebit & ~FileStoreDefaults.Tbit; + if (storedSeq != seq) + return (null, false, StoreErrors.ErrStoreMsgNotFound); + + if ((raw & FileStoreDefaults.Cbit) == 0) + { + var computed = SHA256.HashData(payload); + if (!computed.AsSpan(0, FileStoreDefaults.RecordHashSize).SequenceEqual(checksum)) + return (null, false, new ErrBadMsg(Mfn, "invalid checksum")); + + cache.Idx[(int)slot] = raw | FileStoreDefaults.Cbit; + } + + var subjectOffset = 8 + 8 + 4 + 4 + 4; + var headerOffset = subjectOffset + subjectLength; + var msgOffset = headerOffset + headerLength; + + var parsed = sm ?? new StoreMsg(); + parsed.Seq = seq; + parsed.Ts = ts; + parsed.Subject = subjectLength > 0 + ? Encoding.UTF8.GetString(record.Slice(subjectOffset, subjectLength)) + : string.Empty; + + var headerSlice = record.Slice(headerOffset, headerLength); + var msgSlice = record.Slice(msgOffset, msgLength); + if (doCopy) + { + parsed.Hdr = headerSlice.ToArray(); + parsed.Msg = msgSlice.ToArray(); + } + else + { + // StoreMsg uses byte[] fields; preserving that shape still requires arrays. + parsed.Hdr = headerSlice.ToArray(); + parsed.Msg = msgSlice.ToArray(); + } + + parsed.Buf = new byte[parsed.Hdr.Length + parsed.Msg.Length]; + if (parsed.Hdr.Length > 0) + Buffer.BlockCopy(parsed.Hdr, 0, parsed.Buf, 0, parsed.Hdr.Length); + if (parsed.Msg.Length > 0) + Buffer.BlockCopy(parsed.Msg, 0, parsed.Buf, parsed.Hdr.Length, parsed.Msg.Length); + + var prevSeq = Llseq; + if (Llseq == 0 || seq < Llseq || seq == Llseq + 1 || seq + 1 == Llseq) + Llseq = seq; + + Lrts = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow); + var expireOk = (seq == lastSeq && prevSeq == seq - 1) || (seq == firstSeq && prevSeq == seq + 1); + return (parsed, expireOk, null); + } + finally + { + FinishedWithCache(); + Mu.ExitWriteLock(); + } + } + private Exception? FlushPendingToDiskLocked() { - var (pending, pendingErr) = BytesPending(); - if (pendingErr != null || pending == null || pending.Length == 0) - return pendingErr; - - try - { - Mfd ??= new FileStream(Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); - Mfd.Seek(0, SeekOrigin.End); - Mfd.Write(pending, 0, pending.Length); - Mfd.Flush(SyncAlways); - NeedSync = false; - - if (CacheData != null) - CacheData.Wp = CacheData.Buf.Length; - - return null; - } - catch (Exception ex) - { - Werr = ex; - return ex; - } + var (_, err) = FlushPendingMsgsLocked(); + return err; } private void ResetCacheExpireTimerLocked(TimeSpan due) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index cf6102a..8543bec 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -7,6 +7,46 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests1 { + [Fact] // T:2422 + public void NoRaceJetStreamConsumerFileStoreConcurrentDiskIO_ShouldSucceed() + { + WithStore((fs, _) => + { + const int consumerCount = 400; + var start = new ManualResetEventSlim(false); + var errors = new ConcurrentQueue(); + var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + + var workers = new List(consumerCount); + for (var i = 0; i < consumerCount; i++) + { + var consumer = fs.ConsumerStore( + $"o{i}", + DateTime.UtcNow, + new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); + + workers.Add(Task.Run(() => + { + try + { + start.Wait(TimeSpan.FromSeconds(5)); + consumer.UpdateDelivered(22, 22, 1, timestamp); + consumer.EncodedState(); + consumer.Delete(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + })); + } + + start.Set(); + Task.WaitAll(workers.ToArray()); + errors.ShouldBeEmpty(); + }); + } + [Fact] // T:2452 public void NoRaceFileStoreStreamMaxAgePerformance_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index 8f8b5bd..1d9d2a2 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -8,6 +8,145 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests2 { + [Fact] // T:2505 + public void NoRaceStoreReverseWalkWithDeletesPerf_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + var fileCfg = new StreamConfig + { + Name = "zzz", + Subjects = ["foo.*"], + Storage = StorageType.FileStorage, + MaxMsgs = -1, + MaxBytes = -1, + MaxAge = TimeSpan.Zero, + MaxMsgsPer = -1, + Discard = DiscardPolicy.DiscardOld, + Retention = RetentionPolicy.LimitsPolicy, + }; + + var memCfg = fileCfg.Clone(); + memCfg.Storage = StorageType.MemoryStorage; + + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, fileCfg); + var ms = JetStreamMemStore.NewMemStore(memCfg); + + try + { + var msg = "Hello"u8.ToArray(); + + foreach (var store in new IStreamStore[] { fs, ms }) + { + store.StoreMsg("foo.A", null, msg, 0).Seq.ShouldBeGreaterThan(0UL); + for (var i = 0; i < 150_000; i++) + store.StoreMsg("foo.B", null, msg, 0); + store.StoreMsg("foo.C", null, msg, 0); + + var state = store.State(); + state.Msgs.ShouldBe(150_002UL); + + var (purged, purgeErr) = store.PurgeEx("foo.B", 1, 0); + purgeErr.ShouldBeNull(); + purged.ShouldBe(150_000UL); + + if (store is JetStreamFileStore fileStore) + PreloadFileStoreCaches(fileStore); + + var timer = Stopwatch.StartNew(); + var scratch = new StoreMsg(); + for (var seq = state.LastSeq; seq > 0; seq--) + { + try + { + _ = store.LoadMsg(seq, scratch); + } + catch (Exception ex) when (ReferenceEquals(ex, StoreErrors.ErrStoreMsgNotFound)) + { + continue; + } + } + timer.Stop(); + var reverseWalkElapsed = timer.Elapsed; + + if (store is JetStreamFileStore fileStore2) + PreloadFileStoreCaches(fileStore2); + + var seen = 0; + var cursor = state.LastSeq; + timer.Restart(); + while (true) + { + var (sm, err) = store.LoadPrevMsg(cursor, scratch); + if (err == StoreErrors.ErrStoreEOF) + break; + + err.ShouldBeNull(); + sm.ShouldNotBeNull(); + cursor = sm!.Seq > 0 ? sm.Seq - 1 : 0; + seen++; + } + timer.Stop(); + + seen.ShouldBe(2); + if (store is JetStreamMemStore) + { + timer.Elapsed.ShouldBeLessThan(reverseWalkElapsed); + } + else + { + (timer.Elapsed.Ticks * 10L).ShouldBeLessThan(reverseWalkElapsed.Ticks); + } + } + } + finally + { + fs.Stop(); + ms.Stop(); + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:2510 + public void NoRaceFileStorePurgeExAsyncTombstones_ShouldSucceed() + { + var cfg = DefaultStreamConfig(); + cfg.Subjects = ["*.*"]; + + WithStore((fs, _) => + { + var msg = "zzz"u8.ToArray(); + + fs.StoreMsg("foo.A", null, msg, 0); + fs.StoreMsg("foo.B", null, msg, 0); + for (var i = 0; i < 500; i++) + fs.StoreMsg("foo.C", null, msg, 0); + fs.StoreMsg("foo.D", null, msg, 0); + + PreloadFileStoreCaches(fs); + + var sw = Stopwatch.StartNew(); + var (purgedOne, errOne) = fs.PurgeEx("foo.B", 0, 0); + sw.Stop(); + errOne.ShouldBeNull(); + purgedOne.ShouldBe(1UL); + var singleElapsed = sw.Elapsed; + + sw.Restart(); + var (purgedMany, errMany) = fs.PurgeEx("foo.C", 0, 0); + sw.Stop(); + errMany.ShouldBeNull(); + purgedMany.ShouldBe(500UL); + var manyElapsed = sw.Elapsed; + + // Large subject purges should not degenerate to per-message sync behavior. + var scaledSingle = Math.Max(1L, singleElapsed.Ticks) * 80L; + scaledSingle.ShouldBeGreaterThan(manyElapsed.Ticks); + }, cfg); + } + [Fact] // T:2491 public void NoRaceFileStoreMsgLoadNextMsgMultiPerf_ShouldSucceed() { @@ -243,5 +382,24 @@ public sealed partial class ConcurrencyTests2 return (T)result; } + private static void PreloadFileStoreCaches(JetStreamFileStore fs) + { + var blksField = typeof(JetStreamFileStore).GetField("_blks", BindingFlags.Instance | BindingFlags.NonPublic); + blksField.ShouldNotBeNull(); + + var blocks = blksField!.GetValue(fs) as System.Collections.IEnumerable; + blocks.ShouldNotBeNull(); + + foreach (var mb in blocks!) + { + var load = mb!.GetType().GetMethod("LoadMsgs", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); + load.ShouldNotBeNull(); + + var result = load!.Invoke(mb, []); + if (result is Exception err) + throw err; + } + } + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); } diff --git a/porting.db b/porting.db index 451d763..0104dd8 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index b1c631a..bf9aaeb 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 22:45:05 UTC +Generated: 2026-02-28 23:03:57 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 22:45:05 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1758 | +| deferred | 1738 | | n_a | 24 | | stub | 1 | -| verified | 1868 | +| verified | 1888 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1673 | +| deferred | 1670 | | n_a | 249 | -| verified | 1335 | +| verified | 1338 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 22:45:05 UTC ## Overall Progress -**3510/6942 items complete (50.6%)** +**3533/6942 items complete (50.9%)**