diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs index 283cae6..b31ba3c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -111,6 +111,17 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable // to the memory store implementation while file-specific APIs are added on top. private readonly JetStreamMemStore _memStore; private static readonly ArrayPool MsgBlockBufferPool = ArrayPool.Shared; + private static readonly object InitLock = new(); + private static SemaphoreSlim? _diskIoSlots; + private static int _diskIoCount; + private const int ConsumerHeaderLength = 2; + private const int MaxVarIntLength = 10; + private const long NanosecondsPerSecond = 1_000_000_000L; + + static JetStreamFileStore() + { + Init(); + } // ----------------------------------------------------------------------- // Constructor @@ -468,6 +479,224 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable return SubjectsEqual; } + internal static long TimestampNormalized(DateTime t) + { + if (t == default) + return 0; + + var utc = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime(); + return (utc - DateTime.UnixEpoch).Ticks * 100L; + } + + internal static void Init() + { + var mp = Environment.ProcessorCount; + var nIo = Math.Min(16, Math.Max(4, mp)); + if (mp > 32) + nIo = Math.Max(16, Math.Min(mp, mp / 2)); + + lock (InitLock) + { + if (_diskIoSlots != null && _diskIoCount == nIo) + return; + + _diskIoSlots?.Dispose(); + _diskIoSlots = new SemaphoreSlim(nIo, nIo); + _diskIoCount = nIo; + } + } + + internal static (byte Version, Exception? Error) CheckConsumerHeader(byte[]? hdr) + { + if (hdr is not { Length: >= ConsumerHeaderLength } || hdr[0] != FileStoreDefaults.FileStoreMagic) + return (0, new InvalidDataException("corrupt state")); + + var version = hdr[1]; + return version switch + { + FileStoreDefaults.FileStoreVersion or FileStoreDefaults.NewVersion => (version, null), + _ => (0, new InvalidDataException($"unsupported version: {version}")), + }; + } + + internal static (ConsumerState? State, Exception? Error) DecodeConsumerState(byte[]? buf) + { + if (buf == null) + return (null, new InvalidDataException("corrupt state")); + + var (version, headerErr) = CheckConsumerHeader(buf); + if (headerErr != null) + return (null, headerErr); + + var index = ConsumerHeaderLength; + if (!TryReadUVarInt(buf, ref index, out var ackConsumer) || + !TryReadUVarInt(buf, ref index, out var ackStream) || + !TryReadUVarInt(buf, ref index, out var deliveredConsumer) || + !TryReadUVarInt(buf, ref index, out var deliveredStream)) + { + return (null, new InvalidDataException("corrupt state")); + } + + var state = new ConsumerState + { + AckFloor = new SequencePair { Consumer = ackConsumer, Stream = ackStream }, + Delivered = new SequencePair { Consumer = deliveredConsumer, Stream = deliveredStream }, + }; + + if (version == FileStoreDefaults.FileStoreVersion) + { + if (state.AckFloor.Consumer > 1) + state.Delivered.Consumer += state.AckFloor.Consumer - 1; + if (state.AckFloor.Stream > 1) + state.Delivered.Stream += state.AckFloor.Stream - 1; + } + + const ulong highBit = 1UL << 63; + if ((state.AckFloor.Stream & highBit) != 0 || (state.Delivered.Stream & highBit) != 0) + return (null, new InvalidDataException("corrupt state")); + + if (!TryReadUVarInt(buf, ref index, out var pendingCount)) + return (null, new InvalidDataException("corrupt state")); + + if (pendingCount > 0) + { + if (!TryReadVarInt(buf, ref index, out var minTs)) + return (null, new InvalidDataException("corrupt state")); + + state.Pending = new Dictionary((int)pendingCount); + for (var i = 0; i < (int)pendingCount; i++) + { + if (!TryReadUVarInt(buf, ref index, out var sseq)) + return (null, new InvalidDataException("corrupt state")); + + var dseq = 0UL; + if (version == FileStoreDefaults.NewVersion && !TryReadUVarInt(buf, ref index, out dseq)) + return (null, new InvalidDataException("corrupt state")); + + if (!TryReadVarInt(buf, ref index, out var ts)) + return (null, new InvalidDataException("corrupt state")); + + sseq += state.AckFloor.Stream; + if (sseq == 0) + return (null, new InvalidDataException("corrupt state")); + + if (version == FileStoreDefaults.NewVersion) + dseq += state.AckFloor.Consumer; + + var adjustedTs = version == FileStoreDefaults.FileStoreVersion + ? (ts + minTs) * NanosecondsPerSecond + : (minTs - ts) * NanosecondsPerSecond; + + state.Pending[sseq] = new Pending + { + Sequence = dseq, + Timestamp = adjustedTs, + }; + } + } + + if (!TryReadUVarInt(buf, ref index, out var redeliveredCount)) + return (null, new InvalidDataException("corrupt state")); + + if (redeliveredCount > 0) + { + state.Redelivered = new Dictionary((int)redeliveredCount); + for (var i = 0; i < (int)redeliveredCount; i++) + { + if (!TryReadUVarInt(buf, ref index, out var seq) || + !TryReadUVarInt(buf, ref index, out var count)) + { + return (null, new InvalidDataException("corrupt state")); + } + + if (seq > 0 && count > 0) + { + seq += state.AckFloor.Stream; + state.Redelivered[seq] = count; + } + } + } + + return (state, null); + } + + internal static Exception? WriteFileWithSync(string name, byte[] data, UnixFileMode perm) + => WriteAtomically(name, data, perm, sync: true); + + internal static Exception? WriteAtomically(string name, byte[] data, UnixFileMode perm, bool sync) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentNullException.ThrowIfNull(data); + + Init(); + var slots = _diskIoSlots!; + var tmp = name + ".tmp"; + + slots.Wait(); + try + { + var options = sync ? FileOptions.WriteThrough : FileOptions.None; + using (var stream = new FileStream( + tmp, + FileMode.Create, + FileAccess.Write, + FileShare.None, + bufferSize: 4096, + options)) + { + stream.Write(data, 0, data.Length); + stream.Flush(sync); + } + + try + { + File.SetUnixFileMode(tmp, perm); + } + catch (PlatformNotSupportedException) + { + } + + File.Move(tmp, name, overwrite: true); + + if (sync) + { + var dir = Path.GetDirectoryName(Path.GetFullPath(name)); + if (!string.IsNullOrEmpty(dir)) + { + try + { + using var handle = File.OpenHandle(dir, FileMode.Open, FileAccess.Read, FileShare.ReadWrite | FileShare.Delete); + RandomAccess.FlushToDisk(handle); + } + catch + { + // Best-effort directory metadata sync. + } + } + } + + return null; + } + catch (Exception ex) + { + try + { + if (File.Exists(tmp)) + File.Delete(tmp); + } + catch + { + // Best-effort cleanup. + } + + return ex; + } + finally + { + slots.Release(); + } + } + internal static AeadCipher GenEncryptionKey(StoreCipher sc, byte[] seed) { ArgumentNullException.ThrowIfNull(seed); @@ -557,6 +786,57 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable return DateTimeOffset.FromUnixTimeSeconds(seconds).AddTicks(remainderNanos / 100L).UtcDateTime; } + private static bool TryReadUVarInt(ReadOnlySpan source, ref int index, out ulong value) + { + value = 0; + var shift = 0; + for (var i = 0; i < MaxVarIntLength; i++) + { + if ((uint)index >= (uint)source.Length) + { + index = -1; + value = 0; + return false; + } + + var b = source[index++]; + if (b < 0x80) + { + if (i == MaxVarIntLength - 1 && b > 1) + { + index = -1; + value = 0; + return false; + } + + value |= (ulong)b << shift; + return true; + } + + value |= (ulong)(b & 0x7F) << shift; + shift += 7; + } + + index = -1; + value = 0; + return false; + } + + private static bool TryReadVarInt(ReadOnlySpan source, ref int index, out long value) + { + if (!TryReadUVarInt(source, ref index, out var unsigned)) + { + value = 0; + return false; + } + + value = (long)(unsigned >> 1); + if ((unsigned & 1) != 0) + value = ~value; + + return true; + } + internal void RecoverAEK() { if (_prf == null || _aek != null) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index 72f96d7..5192cf3 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -81,6 +81,59 @@ public sealed partial class ConcurrencyTests1 }); } + [Fact] // T:2427 + public void NoRaceJetStreamFileStoreKeyFileCleanup_ShouldSucceed() + { + WithStore((_, root) => + { + var msgDir = Path.Combine(root, FileStoreDefaults.MsgDir); + Directory.CreateDirectory(msgDir); + var perm = UnixFileMode.UserRead | UnixFileMode.UserWrite; + + var errors = new ConcurrentQueue(); + Parallel.For(0, 300, i => + { + var payload = BitConverter.GetBytes(i); + var keyFile = Path.Combine(msgDir, string.Format(FileStoreDefaults.KeyScan, (uint)(i + 1))); + var err = JetStreamFileStore.WriteAtomically(keyFile, payload, perm, sync: true); + if (err != null) + errors.Enqueue(err); + }); + + errors.ShouldBeEmpty(); + var keyFiles = Directory.GetFiles(msgDir, "*.key"); + keyFiles.Length.ShouldBe(300); + + foreach (var key in keyFiles.Skip(1)) + File.Delete(key); + + Directory.GetFiles(msgDir, "*.key").Length.ShouldBe(1); + Directory.GetFiles(msgDir, "*.tmp").ShouldBeEmpty(); + }); + } + + [Fact] // T:2447 + public void NoRaceEncodeConsumerStateBug_ShouldSucceed() + { + for (var i = 0; i < 5_000; i++) + { + var pending = new Pending + { + Sequence = 1, + Timestamp = DateTimeOffset.UtcNow.AddSeconds(1).ToUnixTimeSeconds() * 1_000_000_000L, + }; + var state = new ConsumerState + { + Delivered = new SequencePair { Consumer = 1, Stream = 1 }, + Pending = new Dictionary { [1] = pending }, + }; + + var encoded = StoreParity.EncodeConsumerState(state); + var (_, err) = JetStreamFileStore.DecodeConsumerState(encoded); + err.ShouldBeNull(); + } + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index 8bd6b52..368a098 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -910,4 +910,278 @@ public sealed partial class JetStreamFileStoreTests Directory.Delete(root, recursive: true); } } + + [Fact] // T:385 + public void FileStoreConsumer_ShouldSucceed() + { + WithStore((fs, _) => + { + var consumer = fs.ConsumerStore("obs22", DateTime.UtcNow, new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); + var (initial, initialErr) = consumer.State(); + initialErr.ShouldBeNull(); + initial.ShouldNotBeNull(); + initial!.Delivered.Consumer.ShouldBe(0UL); + + var state = new ConsumerState + { + Delivered = new SequencePair { Consumer = 100, Stream = 122 }, + AckFloor = new SequencePair { Consumer = 50, Stream = 72 }, + Pending = new Dictionary + { + [75] = new() { Sequence = 75, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L }, + [80] = new() { Sequence = 80, Timestamp = DateTimeOffset.UtcNow.AddSeconds(1).ToUnixTimeSeconds() * 1_000_000_000L }, + }, + Redelivered = new Dictionary + { + [90] = 2, + }, + }; + + consumer.Update(state); + var (updated, updateErr) = consumer.State(); + updateErr.ShouldBeNull(); + updated.ShouldNotBeNull(); + updated!.Delivered.Consumer.ShouldBe(state.Delivered.Consumer); + updated.Delivered.Stream.ShouldBe(state.Delivered.Stream); + updated.AckFloor.Consumer.ShouldBe(state.AckFloor.Consumer); + updated.AckFloor.Stream.ShouldBe(state.AckFloor.Stream); + updated.Pending!.Count.ShouldBe(2); + updated.Redelivered!.Count.ShouldBe(1); + + state.AckFloor = new SequencePair { Consumer = 200, Stream = 100 }; + Should.Throw(() => consumer.Update(state)); + + consumer.Stop(); + }); + } + + [Fact] // T:386 + public void FileStoreConsumerEncodeDecodeRedelivered_ShouldSucceed() + { + var state = new ConsumerState + { + Delivered = new SequencePair { Consumer = 100, Stream = 100 }, + AckFloor = new SequencePair { Consumer = 50, Stream = 50 }, + Redelivered = new Dictionary + { + [122] = 3, + [144] = 8, + }, + }; + + var buf = StoreParity.EncodeConsumerState(state); + var (decoded, err) = JetStreamFileStore.DecodeConsumerState(buf); + err.ShouldBeNull(); + decoded.ShouldNotBeNull(); + decoded!.Delivered.Consumer.ShouldBe(state.Delivered.Consumer); + decoded.Delivered.Stream.ShouldBe(state.Delivered.Stream); + decoded.AckFloor.Consumer.ShouldBe(state.AckFloor.Consumer); + decoded.AckFloor.Stream.ShouldBe(state.AckFloor.Stream); + decoded.Redelivered.ShouldNotBeNull(); + decoded.Redelivered![122].ShouldBe(3UL); + decoded.Redelivered[144].ShouldBe(8UL); + } + + [Fact] // T:387 + public void FileStoreConsumerEncodeDecodePendingBelowStreamAckFloor_ShouldSucceed() + { + var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + var state = new ConsumerState + { + Delivered = new SequencePair { Consumer = 1192, Stream = 10185 }, + AckFloor = new SequencePair { Consumer = 1189, Stream = 10815 }, + Pending = new Dictionary + { + [10782] = new() { Sequence = 1190, Timestamp = now }, + [10810] = new() { Sequence = 1191, Timestamp = now + 1_000_000_000L }, + [10815] = new() { Sequence = 1192, Timestamp = now + 2_000_000_000L }, + }, + }; + + var buf = StoreParity.EncodeConsumerState(state); + var (decoded, err) = JetStreamFileStore.DecodeConsumerState(buf); + err.ShouldBeNull(); + decoded.ShouldNotBeNull(); + decoded!.Pending.ShouldNotBeNull(); + decoded.Pending.Count.ShouldBe(3); + decoded.Pending.ContainsKey(10782).ShouldBeTrue(); + decoded.Pending.ContainsKey(10810).ShouldBeTrue(); + decoded.Pending.ContainsKey(10815).ShouldBeTrue(); + } + + [Fact] // T:393 + public void FileStoreConsumerRedeliveredLost_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var cfg = new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }; + var consumer = fs.ConsumerStore("o22", DateTime.UtcNow, cfg); + var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + + consumer.UpdateDelivered(1, 1, 1, ts); + consumer.UpdateDelivered(2, 1, 2, ts); + consumer.UpdateDelivered(3, 1, 3, ts); + consumer.UpdateDelivered(4, 1, 4, ts); + consumer.UpdateDelivered(5, 2, 1, ts); + consumer.Stop(); + + consumer = fs.ConsumerStore("o22", DateTime.UtcNow, cfg); + var (state, err) = consumer.State(); + err.ShouldBeNull(); + state.ShouldNotBeNull(); + state!.Redelivered.ShouldNotBeNull(); + state.Redelivered.Count.ShouldBeGreaterThan(0); + + consumer.UpdateAcks(2, 1); + consumer.UpdateAcks(5, 2); + + var (afterAcks, afterErr) = consumer.State(); + afterErr.ShouldBeNull(); + afterAcks.ShouldNotBeNull(); + afterAcks!.Pending.ShouldBeNull(); + afterAcks.Redelivered.ShouldBeNull(); + + consumer.Stop(); + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:395 + public void FileStoreConsumerDeliveredUpdates_ShouldSucceed() + { + WithStore((fs, _) => + { + var consumer = fs.ConsumerStore("o22", DateTime.UtcNow, new ConsumerConfig()); + + var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + consumer.UpdateDelivered(1, 100, 1, ts); + consumer.UpdateDelivered(2, 110, 1, ts); + consumer.UpdateDelivered(5, 130, 1, ts); + + var (state, err) = consumer.State(); + err.ShouldBeNull(); + state.ShouldNotBeNull(); + state!.Delivered.Consumer.ShouldBe(5UL); + state.Delivered.Stream.ShouldBe(130UL); + state.AckFloor.Consumer.ShouldBe(5UL); + state.AckFloor.Stream.ShouldBe(130UL); + state.Pending.ShouldBeNull(); + + Should.Throw(() => consumer.UpdateAcks(1, 100)); + Should.Throw(() => consumer.UpdateDelivered(6, 131, 2, ts)); + + consumer.Stop(); + }); + } + + [Fact] // T:396 + public void FileStoreConsumerDeliveredAndAckUpdates_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var cfg = new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }; + var consumer = fs.ConsumerStore("o22", DateTime.UtcNow, cfg); + var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + + consumer.UpdateDelivered(1, 100, 1, ts); + consumer.UpdateDelivered(2, 110, 1, ts); + consumer.UpdateDelivered(3, 130, 1, ts); + consumer.UpdateDelivered(4, 150, 1, ts); + consumer.UpdateDelivered(5, 165, 1, ts); + + var (beforeAcks, beforeErr) = consumer.State(); + beforeErr.ShouldBeNull(); + beforeAcks.ShouldNotBeNull(); + beforeAcks!.Pending!.Count.ShouldBe(5); + + Should.Throw(() => consumer.UpdateAcks(3, 101)); + Should.Throw(() => consumer.UpdateAcks(1, 1)); + + consumer.UpdateAcks(1, 100); + consumer.UpdateAcks(2, 110); + consumer.UpdateAcks(3, 130); + var (afterAcks, afterErr) = consumer.State(); + afterErr.ShouldBeNull(); + afterAcks.ShouldNotBeNull(); + afterAcks!.Pending!.Count.ShouldBe(2); + + consumer.Stop(); + consumer = fs.ConsumerStore("o22", DateTime.UtcNow, cfg); + var (restored, restoredErr) = consumer.State(); + restoredErr.ShouldBeNull(); + restored.ShouldNotBeNull(); + restored!.Pending!.Count.ShouldBe(2); + + consumer.Stop(); + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] // T:402 + public void FileStoreBadConsumerState_ShouldSucceed() + { + var bs = new byte[] + { + 0x16, 0x02, 0x01, 0x01, 0x03, 0x02, 0x01, 0x98, 0xF4, 0x8A, + 0x8A, 0x0C, 0x01, 0x03, 0x86, 0xFA, 0x0A, 0x01, 0x00, 0x01, + }; + + var (state, err) = JetStreamFileStore.DecodeConsumerState(bs); + err.ShouldBeNull(); + state.ShouldNotBeNull(); + } + + [Fact] // T:440 + public void FileStoreConsumerStoreEncodeAfterRestart_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var persisted = new ConsumerState + { + Delivered = new SequencePair { Consumer = 22, Stream = 22 }, + AckFloor = new SequencePair { Consumer = 11, Stream = 11 }, + }; + + var fs1 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var c1 = fs1.ConsumerStore("o22", DateTime.UtcNow, new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); + c1.Update(persisted); + c1.Stop(); + fs1.Stop(); + + var fs2 = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var c2 = fs2.ConsumerStore("o22", DateTime.UtcNow, new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); + var (state, err) = c2.State(); + err.ShouldBeNull(); + state.ShouldNotBeNull(); + state!.Delivered.Consumer.ShouldBe(persisted.Delivered.Consumer); + state.Delivered.Stream.ShouldBe(persisted.Delivered.Stream); + state.AckFloor.Consumer.ShouldBe(persisted.AckFloor.Consumer); + state.AckFloor.Stream.ShouldBe(persisted.AckFloor.Stream); + c2.Stop(); + fs2.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } } diff --git a/porting.db b/porting.db index 02375e6..10ec7b4 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index b5ab70e..75f620f 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,38 +1,26 @@ -# NATS .NET Porting Status Report +# Current Porting Status -Generated: 2026-02-28 18:17:03 UTC +Generated: 2026-02-28 19:08:58Z -## Modules (12 total) +=== Porting Status Summary === -| Status | Count | -|--------|-------| -| verified | 12 | +Modules (12 total): + verified 12 -## Features (3673 total) +Features (3673 total): + complete 14 + deferred 1910 + n_a 24 + stub 1 + verified 1724 -| Status | Count | -|--------|-------| -| complete | 14 | -| deferred | 1949 | -| n_a | 24 | -| stub | 1 | -| verified | 1685 | +Unit Tests (3257 total): + deferred 1795 + n_a 241 + verified 1221 -## Unit Tests (3257 total) - -| Status | Count | -|--------|-------| -| deferred | 1918 | -| n_a | 240 | -| verified | 1099 | - -## Library Mappings (36 total) - -| Status | Count | -|--------|-------| -| mapped | 36 | +Library Mappings (36 total): + mapped 36 -## Overall Progress - -**3074/6942 items complete (44.3%)** +Overall Progress: 3236/6942 (46.6%)