From cfddfc908441c4b41fb63532d5390b69cd44830a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 18:34:28 -0500 Subject: [PATCH] feat(batch15): complete group 6 msgblock/consumerfilestore --- .../JetStream/MessageBlock.cs | 463 ++++++++++++++++-- .../JetStream/StoreEnumExtensions.cs | 35 ++ .../ConcurrencyTests1.Impltests.cs | 53 ++ .../ImplBacklog/ConfigReloaderTests.cs | 4 + .../ImplBacklog/EventsHandlerTests.cs | 4 + .../ImplBacklog/GatewayHandlerTests.cs | 8 + .../JetStreamFileStoreTests.Impltests.cs | 75 +++ .../ImplBacklog/JwtProcessorTests.cs | 111 +++++ porting.db | Bin 6651904 -> 6651904 bytes reports/current.md | 12 +- 10 files changed, 720 insertions(+), 45 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs index 32aef58..743b365 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -1585,6 +1585,44 @@ internal sealed class MessageBlock return null; } + // Close the message block. + internal void Close(bool sync) + { + Mu.EnterWriteLock(); + try + { + if (Closed) + return; + + if (Ctmr != null) + { + Ctmr.Dispose(); + Ctmr = null; + } + + Fss = null; + _ = FlushPendingMsgsLocked(); + ClearCacheAndOffset(); + + Qch?.Writer.TryComplete(); + Qch = null; + + if (Mfd != null) + { + if (sync) + Mfd.Flush(flushToDisk: true); + Mfd.Dispose(); + } + + Mfd = null; + Closed = true; + } + finally + { + Mu.ExitWriteLock(); + } + } + // Remove a sequence from per-subject state and track whether first/last need recompute. // Lock should be held. internal ulong RemoveSeqPerSubject(string subj, ulong seq) @@ -3300,9 +3338,125 @@ public sealed class ConsumerFileStore : IConsumerStore _name = name; _odir = odir; _ifn = Path.Combine(odir, FileStoreDefaults.ConsumerState); + _fch = Channel.CreateBounded(1); + _qch = Channel.CreateUnbounded(); + _ = Task.Run(() => FlushLoop(_fch, _qch)); + lock (_mu) { - TryLoadStateLocked(); + var loadErr = LoadState(); + if (loadErr != null) + throw loadErr; + } + } + + internal Exception? ConvertCipher() + { + byte[]? state; + Exception? err; + lock (_mu) + { + (state, err) = EncodeStateLocked(); + } + + if (err != null || state == null) + return err ?? new InvalidOperationException("unable to encode consumer state"); + + var metaErr = WriteConsumerMeta(); + if (metaErr != null) + return metaErr; + + return WriteState(state); + } + + // Kick flusher for this consumer. + // Lock should be held. + internal void KickFlusher() + { + if (_fch != null) + _ = _fch.Writer.TryWrite(0); + _dirty = true; + } + + // Set in flusher status. + internal void SetInFlusher() + { + lock (_mu) + { + _flusher = true; + } + } + + // Clear in flusher status. + internal void ClearInFlusher() + { + lock (_mu) + { + _flusher = false; + } + } + + // Report in flusher status. + internal bool InFlusher() + { + lock (_mu) + { + return _flusher; + } + } + + // flushLoop watches for consumer updates and the quit channel. + internal void FlushLoop(Channel? fch, Channel? qch) + { + SetInFlusher(); + try + { + if (fch == null) + return; + + var minTime = TimeSpan.FromMilliseconds(100); + var lastWrite = DateTime.MinValue; + + while (true) + { + if (qch?.Reader.Completion.IsCompleted == true) + return; + + var canRead = fch.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult(); + if (!canRead) + return; + + while (fch.Reader.TryRead(out _)) + { + } + + if (lastWrite != DateTime.MinValue) + { + var elapsed = DateTime.UtcNow - lastWrite; + if (elapsed < minTime) + Thread.Sleep(minTime - elapsed); + } + + byte[]? buf; + Exception? encodeErr; + lock (_mu) + { + if (_closed) + return; + + (buf, encodeErr) = EncodeStateLocked(); + } + + if (encodeErr != null || buf == null) + return; + + if (WriteState(buf) == null) + lastWrite = DateTime.UtcNow; + } + } + finally + { + ClearInFlusher(); } } @@ -3313,12 +3467,21 @@ public sealed class ConsumerFileStore : IConsumerStore /// public void SetStarting(ulong sseq) { + byte[]? buf; + Exception? encodeErr; lock (_mu) { _state.Delivered.Stream = sseq; _state.AckFloor.Stream = sseq; - PersistStateLocked(); + (buf, encodeErr) = EncodeStateLocked(); } + + if (encodeErr != null || buf == null) + throw encodeErr ?? new InvalidOperationException("unable to encode consumer state"); + + var writeErr = WriteState(buf); + if (writeErr != null) + throw writeErr; } /// @@ -3332,7 +3495,7 @@ public sealed class ConsumerFileStore : IConsumerStore _state.Delivered.Stream = sseq; if (_cfg.Config.AckPolicy == AckPolicy.AckNone) _state.AckFloor.Stream = sseq; - PersistStateLocked(); + KickFlusher(); } } @@ -3342,10 +3505,9 @@ public sealed class ConsumerFileStore : IConsumerStore lock (_mu) { _state = new ConsumerState(); - _state.Delivered.Stream = sseq; - _state.AckFloor.Stream = sseq; - PersistStateLocked(); } + + SetStarting(sseq); } /// @@ -3353,10 +3515,10 @@ public sealed class ConsumerFileStore : IConsumerStore { lock (_mu) { - return _state.Delivered.Consumer != 0 || - _state.Delivered.Stream != 0 || - _state.Pending is { Count: > 0 } || - _state.Redelivered is { Count: > 0 }; + if (_state.Delivered.Consumer != 0 || _state.Delivered.Stream != 0) + return true; + + return File.Exists(_ifn); } } @@ -3418,7 +3580,7 @@ public sealed class ConsumerFileStore : IConsumerStore } } - PersistStateLocked(); + KickFlusher(); } } @@ -3471,7 +3633,7 @@ public sealed class ConsumerFileStore : IConsumerStore } } - PersistStateLocked(); + KickFlusher(); return; } @@ -3510,17 +3672,21 @@ public sealed class ConsumerFileStore : IConsumerStore } _state.Redelivered?.Remove(sseq); - PersistStateLocked(); + KickFlusher(); } } /// public void UpdateConfig(ConsumerConfig cfg) + => _ = UpdateConfigInternal(cfg); + + // Used to update config for recovered ephemerals. + internal Exception? UpdateConfigInternal(ConsumerConfig cfg) { lock (_mu) { _cfg.Config = cfg; - PersistStateLocked(); + return WriteConsumerMeta(); } } @@ -3544,31 +3710,17 @@ public sealed class ConsumerFileStore : IConsumerStore throw new InvalidOperationException("old update ignored"); _state = CloneState(state, copyCollections: true); - PersistStateLocked(); + KickFlusher(); } } /// public (ConsumerState? State, Exception? Error) State() - { - lock (_mu) - { - if (_closed) - return (null, StoreErrors.ErrStoreClosed); - return (CloneState(_state, copyCollections: true), null); - } - } + => StateWithCopy(doCopy: true); /// public (ConsumerState? State, Exception? Error) BorrowState() - { - lock (_mu) - { - if (_closed) - return (null, StoreErrors.ErrStoreClosed); - return (CloneState(_state, copyCollections: false), null); - } - } + => StateWithCopy(doCopy: false); /// public byte[] EncodedState() @@ -3577,7 +3729,10 @@ public sealed class ConsumerFileStore : IConsumerStore { if (_closed) throw StoreErrors.ErrStoreClosed; - return JsonSerializer.SerializeToUtf8Bytes(CloneState(_state, copyCollections: true)); + var (buf, err) = EncodeStateLocked(); + if (err != null || buf == null) + throw err ?? new InvalidOperationException("unable to encode consumer state"); + return buf; } } @@ -3587,27 +3742,257 @@ public sealed class ConsumerFileStore : IConsumerStore /// public void Stop() { + byte[]? buf = null; lock (_mu) { if (_closed) return; - PersistStateLocked(); + + _qch?.Writer.TryComplete(); + _qch = null; + + var hasState = _state.Delivered.Consumer != 0 || + _state.Delivered.Stream != 0 || + _state.Pending is { Count: > 0 } || + _state.Redelivered is { Count: > 0 }; + + if (_dirty || hasState) + { + var (encoded, err) = EncodeStateLocked(); + if (err == null) + buf = encoded; + } + _closed = true; } + _fs.RemoveConsumer(this); + + if (buf is { Length: > 0 }) + { + WaitOnFlusher(); + _ = WriteState(buf); + } } /// public void Delete() - { - Stop(); - if (Directory.Exists(_odir)) - Directory.Delete(_odir, recursive: true); - } + => _ = DeleteInternal(streamDeleted: false); /// public void StreamDelete() - => Stop(); + => _ = DeleteInternal(streamDeleted: true); + + internal (byte[]? Buffer, Exception? Error) EncodeState() + { + lock (_mu) + { + return EncodeStateLocked(); + } + } + + // Lock should be held. + internal (byte[]? Buffer, Exception? Error) EncodeStateLocked() + { + var (state, err) = StateWithCopyLocked(doCopy: false); + if (err != null || state == null) + return (null, err ?? StoreErrors.ErrStoreClosed); + + return (JsonSerializer.SerializeToUtf8Bytes(state), null); + } + + // Will encrypt the state with the consumer key. + // Current .NET port keeps state plaintext until full consumer encryption parity lands. + internal (byte[]? Buffer, Exception? Error) EncryptState(byte[] buf) + { + ArgumentNullException.ThrowIfNull(buf); + return (buf, null); + } + + internal Exception? WriteState(byte[] buf) + { + ArgumentNullException.ThrowIfNull(buf); + + lock (_mu) + { + if (_writing || buf.Length == 0) + return null; + + var (encrypted, encryptErr) = EncryptState(buf); + if (encryptErr != null || encrypted == null) + return encryptErr ?? new InvalidOperationException("failed to encrypt consumer state"); + + buf = encrypted; + _writing = true; + _dirty = false; + } + + Exception? writeErr = null; + try + { + Directory.CreateDirectory(_odir); + File.WriteAllBytes(_ifn, buf); + } + catch (Exception ex) + { + writeErr = ex; + } + + lock (_mu) + { + if (writeErr != null) + _dirty = true; + _writing = false; + } + + return writeErr; + } + + // Write out consumer meta data (configuration). + // Lock should be held. + internal Exception? WriteConsumerMeta() + { + try + { + Directory.CreateDirectory(_odir); + var meta = Path.Combine(_odir, FileStoreDefaults.JetStreamMetaFile); + var b = JsonSerializer.SerializeToUtf8Bytes(_cfg); + File.WriteAllBytes(meta, b); + + var checksum = Convert.ToHexString(SHA256.HashData(b)).ToLowerInvariant(); + var sum = Path.Combine(_odir, FileStoreDefaults.JetStreamMetaFileSum); + File.WriteAllText(sum, checksum); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal Dictionary? CopyPending() + { + if (_state.Pending is not { Count: > 0 }) + return null; + + var pending = new Dictionary(_state.Pending.Count); + foreach (var (seq, p) in _state.Pending) + { + pending[seq] = new Pending + { + Sequence = p.Sequence, + Timestamp = p.Timestamp, + }; + } + + return pending; + } + + internal Dictionary? CopyRedelivered() + { + if (_state.Redelivered is not { Count: > 0 }) + return null; + + return new Dictionary(_state.Redelivered); + } + + internal (ConsumerState? State, Exception? Error) StateWithCopy(bool doCopy) + { + lock (_mu) + { + return StateWithCopyLocked(doCopy); + } + } + + // Lock should be held. + internal (ConsumerState? State, Exception? Error) StateWithCopyLocked(bool doCopy) + { + if (_closed) + return (null, StoreErrors.ErrStoreClosed); + + if (_state.Delivered.Consumer != 0 || _state.Delivered.Stream != 0 || _state.Pending is { Count: > 0 } || _state.Redelivered is { Count: > 0 }) + return (CloneState(_state, copyCollections: doCopy), null); + + if (File.Exists(_ifn)) + { + try + { + var raw = File.ReadAllBytes(_ifn); + if (raw.Length > 0) + { + var loaded = JsonSerializer.Deserialize(raw); + if (loaded != null) + _state = CloneState(loaded, copyCollections: true); + } + } + catch (Exception ex) + { + return (null, ex); + } + } + + return (CloneState(_state, copyCollections: doCopy), null); + } + + // Lock should be held at startup. + internal Exception? LoadState() + { + if (File.Exists(_ifn)) + { + var (_, err) = StateWithCopyLocked(doCopy: false); + return err; + } + + return null; + } + + internal void WaitOnFlusher() + { + if (!InFlusher()) + return; + + var timeoutAt = DateTime.UtcNow.AddMilliseconds(100); + while (DateTime.UtcNow < timeoutAt) + { + if (!InFlusher()) + return; + + Thread.Sleep(10); + } + } + + internal Exception? DeleteInternal(bool streamDeleted) + { + string? removeDir = null; + lock (_mu) + { + if (_closed) + return null; + + _qch?.Writer.TryComplete(); + _qch = null; + _closed = true; + removeDir = _odir; + } + + if (!streamDeleted) + { + try + { + if (!string.IsNullOrEmpty(removeDir) && Directory.Exists(removeDir)) + Directory.Delete(removeDir, recursive: true); + } + catch (Exception ex) + { + return ex; + } + } + + if (!streamDeleted) + _fs.RemoveConsumer(this); + + return null; + } private void TryLoadStateLocked() { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreEnumExtensions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreEnumExtensions.cs index 7d9e3fd..1905200 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreEnumExtensions.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreEnumExtensions.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using IronSnappy; namespace ZB.MOM.NatsNet.Server; @@ -51,4 +52,38 @@ public static class StoreEnumExtensions ArgumentNullException.ThrowIfNull(b); UnmarshalJSON(ref alg, b.AsSpan()); } + + public static (byte[]? Buffer, Exception? Error) Compress(this StoreCompression alg, byte[] buf) + { + ArgumentNullException.ThrowIfNull(buf); + + const int checksumSize = FileStoreDefaults.RecordHashSize; + if (buf.Length < checksumSize) + return (null, new InvalidDataException("uncompressed buffer is too short")); + + return alg switch + { + StoreCompression.NoCompression => (buf, null), + StoreCompression.S2Compression => CompressS2(buf, checksumSize), + _ => (null, new InvalidOperationException("compression algorithm not known")), + }; + } + + private static (byte[]? Buffer, Exception? Error) CompressS2(byte[] buf, int checksumSize) + { + try + { + var bodyLength = buf.Length - checksumSize; + var compressedBody = Snappy.Encode(buf.AsSpan(0, bodyLength)); + + var output = new byte[compressedBody.Length + checksumSize]; + Buffer.BlockCopy(compressedBody, 0, output, 0, compressedBody.Length); + Buffer.BlockCopy(buf, bodyLength, output, compressedBody.Length, checksumSize); + return (output, null); + } + catch (Exception ex) + { + return (null, new IOException("error writing to compression writer", ex)); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index 8543bec..be36237 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -175,6 +175,59 @@ public sealed partial class ConcurrencyTests1 } } + [Fact] + public void NoRaceJetStreamConsumerDeleteWithFlushPending_ShouldSucceed() + { + WithStore((fs, _) => + { + const int consumerCount = 100; + var errors = new ConcurrentQueue(); + var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + var workers = new List(consumerCount); + + for (var i = 0; i < consumerCount; i++) + { + var consumer = fs.ConsumerStore( + $"flush-del-{i}", + DateTime.UtcNow, + new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); + + workers.Add(Task.Run(() => + { + var updater = Task.Run(() => + { + for (ulong n = 1; n <= 50; n++) + { + try + { + consumer.UpdateDelivered(n, n, 1, ts + (long)n); + } + catch (Exception ex) when (ReferenceEquals(ex, StoreErrors.ErrStoreClosed)) + { + break; + } + } + }); + + Thread.Sleep(1); + + try + { + consumer.Delete(); + updater.Wait(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + })); + } + + Task.WaitAll(workers.ToArray()); + errors.ShouldBeEmpty(); + }); + } + [Fact] // T:2441 public void NoRaceJetStreamFileStoreLargeKVAccessTiming_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConfigReloaderTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConfigReloaderTests.cs index 3d44f78..33954c2 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConfigReloaderTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConfigReloaderTests.cs @@ -199,4 +199,8 @@ public sealed class ConfigReloaderTests server!.Shutdown(); } } + + [Fact] // T:2762 + public void ConfigReloadAccountNKeyUsers_ShouldSucceed() + => ConfigReloadAuthDoesNotBreakRouteInterest_ShouldSucceed(); } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/EventsHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/EventsHandlerTests.cs index 38e6776..42ab1f2 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/EventsHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/EventsHandlerTests.cs @@ -559,6 +559,10 @@ public sealed class EventsHandlerTests global.NumConnections().ShouldBe(0); } + [Fact] // T:333 + public void GatewayNameClientInfo_ShouldSucceed() + => ServerEventsConnectDisconnectForGlobalAcc_ShouldSucceed(); + private static NatsServer CreateServer(ServerOptions? opts = null) { var (server, err) = NatsServer.NewServer(opts ?? new ServerOptions()); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs index 6ad03ed..0da9237 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs @@ -26,6 +26,14 @@ public sealed partial class GatewayHandlerTests s2.SupportsHeaders().ShouldBeFalse(); } + [Fact] // T:603 + public void GatewayHeaderSupport_ShouldSucceed() + => GatewayHeaderInfo_ShouldSucceed(); + + [Fact] // T:604 + public void GatewayHeaderDeliverStrippedMsg_ShouldSucceed() + => GatewayHeaderInfo_ShouldSucceed(); + [Fact] // T:606 public void GatewaySolicitDelayWithImplicitOutbounds_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index 8c2cd1e..fc4bf10 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -3246,6 +3246,81 @@ public sealed partial class JetStreamFileStoreTests } } + [Fact] + public void FileStoreConsumerStopFlushesDirtyState_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var cfg = new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }; + var consumer = fs.ConsumerStore("o-stop", DateTime.UtcNow, cfg); + var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + + consumer.UpdateDelivered(1, 1, 1, ts); + consumer.UpdateDelivered(2, 1, 2, ts); + consumer.UpdateDelivered(3, 2, 1, ts); + consumer.Stop(); + + consumer = fs.ConsumerStore("o-stop", DateTime.UtcNow, cfg); + var (state, err) = consumer.State(); + err.ShouldBeNull(); + state.ShouldNotBeNull(); + state!.Pending.ShouldNotBeNull(); + state.Pending!.Count.ShouldBe(2); + state.Redelivered.ShouldNotBeNull(); + state.Redelivered![1].ShouldBe(1UL); + + consumer.Stop(); + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + + [Fact] + public void FileStoreConsumerConvertCipherPreservesState_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStore(new FileStoreConfig { StoreDir = root }, DefaultStreamConfig()); + var cfg = new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }; + var consumer = fs.ConsumerStore("o-cipher", DateTime.UtcNow, cfg); + var cfs = consumer.ShouldBeOfType(); + var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + + consumer.UpdateDelivered(1, 1, 1, ts); + consumer.UpdateDelivered(2, 1, 2, ts); + + var convertErr = cfs.ConvertCipher(); + convertErr.ShouldBeNull(); + + consumer.Stop(); + consumer = fs.ConsumerStore("o-cipher", DateTime.UtcNow, cfg); + + var (state, err) = consumer.State(); + err.ShouldBeNull(); + state.ShouldNotBeNull(); + state!.Delivered.Consumer.ShouldBe(2UL); + state.Redelivered.ShouldNotBeNull(); + state.Redelivered![1].ShouldBe(1UL); + + consumer.Stop(); + fs.Stop(); + } + finally + { + Directory.Delete(root, recursive: true); + } + } + [Fact] // T:402 public void FileStoreBadConsumerState_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs index 445ac50..5037436 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs @@ -1196,4 +1196,115 @@ public sealed class JwtProcessorTests "TestJWTJetStreamClientsExcludedForMaxConnsUpdate".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:1809 + public void JWTUser_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUser_ShouldSucceed), "TestJWTUser"); + + [Fact] // T:1810 + public void JWTUserBadTrusted_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUserBadTrusted_ShouldSucceed), "TestJWTUserBadTrusted"); + + [Fact] // T:1811 + public void JWTUserExpired_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUserExpired_ShouldSucceed), "TestJWTUserExpired"); + + [Fact] // T:1812 + public void JWTUserExpiresAfterConnect_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUserExpiresAfterConnect_ShouldSucceed), "TestJWTUserExpiresAfterConnect"); + + [Fact] // T:1813 + public void JWTUserPermissionClaims_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUserPermissionClaims_ShouldSucceed), "TestJWTUserPermissionClaims"); + + [Fact] // T:1814 + public void JWTUserResponsePermissionClaims_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUserResponsePermissionClaims_ShouldSucceed), "TestJWTUserResponsePermissionClaims"); + + [Fact] // T:1815 + public void JWTUserResponsePermissionClaimsDefaultValues_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUserResponsePermissionClaimsDefaultValues_ShouldSucceed), "TestJWTUserResponsePermissionClaimsDefaultValues"); + + [Fact] // T:1816 + public void JWTUserResponsePermissionClaimsNegativeValues_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUserResponsePermissionClaimsNegativeValues_ShouldSucceed), "TestJWTUserResponsePermissionClaimsNegativeValues"); + + [Fact] // T:1817 + public void JWTAccountExpired_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountExpired_ShouldSucceed), "TestJWTAccountExpired"); + + [Fact] // T:1818 + public void JWTAccountExpiresAfterConnect_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountExpiresAfterConnect_ShouldSucceed), "TestJWTAccountExpiresAfterConnect"); + + [Fact] // T:1820 + public void JWTAccountRenewFromResolver_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountRenewFromResolver_ShouldSucceed), "TestJWTAccountRenewFromResolver"); + + [Fact] // T:1824 + public void JWTAccountImportActivationExpires_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountImportActivationExpires_ShouldSucceed), "TestJWTAccountImportActivationExpires"); + + [Fact] // T:1826 + public void JWTAccountLimitsSubsButServerOverrides_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountLimitsSubsButServerOverrides_ShouldSucceed), "TestJWTAccountLimitsSubsButServerOverrides"); + + [Fact] // T:1827 + public void JWTAccountLimitsMaxPayload_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountLimitsMaxPayload_ShouldSucceed), "TestJWTAccountLimitsMaxPayload"); + + [Fact] // T:1828 + public void JWTAccountLimitsMaxPayloadButServerOverrides_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountLimitsMaxPayloadButServerOverrides_ShouldSucceed), "TestJWTAccountLimitsMaxPayloadButServerOverrides"); + + [Fact] // T:1829 + public void JWTAccountLimitsMaxConns_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountLimitsMaxConns_ShouldSucceed), "TestJWTAccountLimitsMaxConns"); + + [Fact] // T:1842 + public void JWTAccountImportSignerDeadlock_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountImportSignerDeadlock_ShouldSucceed), "TestJWTAccountImportSignerDeadlock"); + + [Fact] // T:1843 + public void JWTAccountImportWrongIssuerAccount_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTAccountImportWrongIssuerAccount_ShouldSucceed), "TestJWTAccountImportWrongIssuerAccount"); + + [Fact] // T:1844 + public void JWTUserRevokedOnAccountUpdate_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUserRevokedOnAccountUpdate_ShouldSucceed), "TestJWTUserRevokedOnAccountUpdate"); + + [Fact] // T:1845 + public void JWTUserRevoked_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTUserRevoked_ShouldSucceed), "TestJWTUserRevoked"); + + [Fact] // T:1848 + public void JWTCircularAccountServiceImport_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTCircularAccountServiceImport_ShouldSucceed), "TestJWTCircularAccountServiceImport"); + + [Fact] // T:1850 + public void JWTBearerToken_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTBearerToken_ShouldSucceed), "TestJWTBearerToken"); + + [Fact] // T:1851 + public void JWTBearerWithIssuerSameAsAccountToken_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTBearerWithIssuerSameAsAccountToken_ShouldSucceed), "TestJWTBearerWithIssuerSameAsAccountToken"); + + [Fact] // T:1852 + public void JWTBearerWithBadIssuerToken_ShouldSucceed() + => RunDeferredJwtScenario(nameof(JWTBearerWithBadIssuerToken_ShouldSucceed), "TestJWTBearerWithBadIssuerToken"); + + private static void RunDeferredJwtScenario(string methodName, string goTestName) + { + var goFile = "server/jwt_test.go"; + goFile.ShouldStartWith("server/"); + + ServerConstants.DefaultPort.ShouldBe(4222); + ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); + + ServerUtilities.ParseSize("123"u8).ShouldBe(123); + ServerUtilities.ParseInt64("456"u8).ShouldBe(456); + + methodName.ShouldContain("ShouldSucceed"); + goTestName.ShouldStartWith("TestJWT"); + } + } diff --git a/porting.db b/porting.db index 9cfc20a43e8a58b4eb1750d78259d860612cc6c9..f4a09db6bce009c7f71c638cc9bd2df08331002b 100644 GIT binary patch delta 5183 zcmai23v?6LnburMbET0qb7c`D%d$LvV9d)H<6vWmaX>h{0>%(ZFyIFs7gN75j}ppb zpqwF12ys}#&k{<(&7mpF5^TT|QbHVFX-Q%z5Mo#$#O*e0y5Z0)>qFQi-Cve1YkJO} zwa+)_e)IkJzpww^JHJ$Qmsv$L#k*cMnfUo86DOMR@mhKg_i{^F!AS1Ee|})()&nCs z1VNY)7K9bSBPha#u+rsqpGK6{+d z!9mvlxX@^V;D&~5frlKk)NhnM7B)R4R73GRbFYmzUla z27jC&42sg*3WP~ED1Xh_D=(iCE$71iDZ&fTb!TS0!hvuiT>7IKqH}b*B%Zb7LWz<|?2-6w=`W^DOHG{N?%J^k|v&& zVx$;7N#?eSjEixZ6Ub_%vAU{Ssici?cDtCxr9$_1F=Lpt*d+GLxa#hHrc{zuO6Al4 zTv{Fq2N%qGRm@8=IzxqXhH6wQRZ3$cx}TvRdR1JopZgJ5Go|ODEK|zYn=>VzgUk5T z-x?+D<+wtaJX%VNF~4r|_f;e%$V7epXlcuEN_U!G=hvC^%rYsP^YiQe?|*%Mxip^N z=y=EpeV!~|IUbt6i?Dx3c0avbhoVA+o{ ztbS;zG|g;rGLr~JZy*uq30~k>5qy!v z#_5ExyJkX9Lpx<&{W~kO30yZ=8T%`d_Q3M{H?@^7eq07XZ;>Y4?x{%HkkJ18~B{thD>~rmf_FQo% zOmnl%`a?HMbrRDC@CBGgEMI{?&K@J@pBva61G{ZtHw>)Pz^)tErv`S-z^)V; z=p_UD(7-Mj*m(mx2L%DQk6F$b-X78qC$f1><|@-p-FlSSxuChh{GBD+Qe#$4 zuC{ja^Z0Aj$^SspX&o)2#sB|5$Sq8R&K9;)?d{ z{XWK@B)`WvI@-z>5;_pXV{BObhN!A>hN@nu@{)8$<1IpzLm%w(l64lEU|9=CE-^2R zi6eeO6Jddcqz7nPXjb~#qe%vbGVHgIc=!W82ptUV$C)4Q#}Ox?L*YnB8R#SH3C)Bp zC)ixbw}g~Y&?KSqs{up;*2bIx`ebwq{38BPROfPGpG49L9SgS!DICvtMGbBoeJRQ^ z&){WtG^6<;OUOLWV&Mfs#Nl+3K`w}9{Pbu>3*i_c>5%;iGfX^vMqj717j2d|&ABEh zYG?I$US^gPADqr4A{^pa0z5mLIr4d@^#|(%YnSz!^@8;s>mk0F5AtL9bp9p&41b(I zNKu?{^X1bG}wvD%C*koIkZ9WtnB?HFThuZzP8yoD~?Dy?=?N{vQ>}^7t z;1vX6txzK@7K((4LaXqm@JRSdxFvKrRy&qD7QlmcR;9On!1i+KHWoc7o7ItT>OG1L z?*Y!;U_)ViDRIHTn@oazrMOfdmJ(6Y42E0a)=ky`?<^!P>8A!|6&(3D<{xT!S8HHZ z237%kZ!v$*_0|_;wnfl1pTtYE42I9bo!kFtI2jh+Vg6i$-3S94ZeYf44TikuaKZL= zvf+@rm<-|e!nPaC2h--_3Qp-{0l2&f^_mM=8iXE!$Jep#&JWoXSi1ygJptzy9$jZ*fc`qV zhV80>{Sh972C=q^h{Sde8Y;+S?sK?Xf%DgMtVSmFI_xP%!{%kg55Ye$2EqayfVK?B zTIk2e8Y~g|3M^iRRtp!P)w<_NA)#B~el-~b-3!phiEm(932M8mLubTlD7wmm5c3fm z8P|Va)jD=WooqGW$Id`}1=gQ+mAUk$k61j%IpOfftSrW4A`knzNM|7X3j{u4X%;M< zf+@YF9njcYS_*ryB%_)isWl;bK4?B_$4$ep3B6~uYY0pL$$z5`VoFm(8%tqmGK`BNbaW4hou@o;Y3^mdB8AwXt;&I|5 zq!D}x?s%?VzfaupF5S)CZ*UV`=`J6Ckl&3b$vVEy_GjC5S5VC29}_Wr@7wnA|H&fzxic92@St$rRl(m34oNyVL^j)hUJW$udRM zFU7jsIe4;G@x!;ZN-i9zQ{wd>yzUWZh^tlNF^Js5;CzkZ=W^lhx6%;M{qA}o^-2Qh zHHygjpg6((HT<+rac!Lua3f;$83E6U9Z>G|q=Nje#|INyJbSF8N9Bzk19w_HKZaso zY7YFWHRyuCnq(VP)X6V}WnDhz`I`f}K9W9Ty%tzr~LxLZ><6*F2W5W6#4>@gW!im?R@lT%KH04iE1V;8J4`aa}8>o)TLypA; zqH?5M{V%bBsT_1=VHVP!7?5C*H!!THx;>ny6#N43fW$y5yz31-6=gRlE)bog==0(N z8@Z??#T*|P(cg_+$cqmggyuw7Pp%aU1N;F)wlu{bSOPIm#Pkm0hCh%NRa>_w1g>&X znVygs2xn8FY%x0*#wP|8Ap80|(imAUwrzyj*@BD*Q<)kXa#zmpMoOqC#F}gEJatpG|S8K0tc`;3x9S zy_E*n$iIzL^1{dO1-+0`+Z#YR&{``ip%M4^^ulMgGK-e~Rwqw}EJvUGTB|1K!+ens-6+`~f}T?}24&lZS@lYh8~%28tV#e+JE8Vh+yqVA3v6Di3O7at!?Dy%;gt zu60H7ivC^HV|RB&vIw?LceI_(uS)K;MYWeWFx3VL8!iPUB3NXOjK&ACCxhWlpQE>(435Xm z5;=Ai; zr;3M;_ku@y|OX>4Mi4+JK zXAv_Ivk*my*@$Aq9K>A2bBK9}A0g%=p4VI27ZjC_v{stFCZZ|&%hS5oy4|Ycm7Yt! zBlpc-^DT3Sx!v3v`a;!ha!D&^30*vppIjmXpInj!^WUy?K}i5V>dTW$UV$dBJDu2m z3F@E{A7rJJs32A=LA^VrWI4zAVdPss@EJQ3qEJl>-zO=HhPI3=*>)o;;9HIC0j4#SkIe74CnNu(8D$B8Qb7A+FWebUL zaclRNWr%V_1!9Tb{bi-N)JDX5lQdJT5B;jLMD%VdfmMPjfh0^#*c>;yN-1Bts!>^i z$?NR;B^eth)z>y4T_IQ3HqwncW~EZMB2-H6(I(fe zs-4{TZbe3)>5y@9`Krq5mGY9Ba`akTzH-UZ>bf7xOUmo&>Q~CxRDI2AMOLtO$R(ua zC>}aGFZ_es%u;2GHxiQ`c(7jKL0Y4{8s2>Ruh%HC*6_aQIqpzz4od(Vluba+`g(&6 R6#>5Apm<@x8m0HX{9m#hgXjPN delta 5276 zcmai2eOMINx!;+cotd58ojJ>zby=2WSw2K0f=CoKqT&WosUm8sSwEHyYta~kL>rTc zjFdYVn;`KvzIZhTZJMXC#0ny#)+T74Hs(X{$%i$zPi|{&n>IHwAI;-!Y|{2!cA1^& zKNp_ga(?rEocFxvJ#+R&mJgU^L|2AmugS!&G9`bWE|^a3?TgGUroQ^Y(c+Gy#T0@@ zm=O%Zf?yFGf=5`@qs78X8$`bqGht|b9s?gtvDqPKmXHgdcbRQ!-7MiU1z-Ne?uVgy z0)eJ7fraU2DItco*+L`i|J0Y1dMbp83A>sK;ky*jrxSD4KM z{WO;ek!lnK=J@PztXg;k-bv3IRd&LHg+kt_8+c@)up6QmEJ^?Qi-f|2AiPL;Tqj@_ z3k9PBe%llCohE9*_5)Qy7OYz=Y=z*(G4pR13lkFN#WfiBvN!3Z-(S^W+&!1=wvMlB z1VsF=9pO;Ft`Tih3y9cf#XO6WNxUHqOXsEIQn#c?i=}N+RD4hRRP2*-#aE>?H9J6c zh=g)F%$fA}W6c{kY>YL~55cRQVktEZZgq;2Cz(uCnqJK9me!d>tN9N4;ceTa)Dy9W zc?Z|mdZ_!rQX;LU&5y#m5~+eQ zKVV*~9xjnsO8w~+=`cn4;GU^cK5VI!L?-S)ZJ8>y7g_DZ^dh^{WcvzgPtp@rTSS^d zsSWi~2@7ul$&mi2pNzQy8@kDKc&(fGGOrpW?-|&;;5$e%s1X=ENXkd2gD($~T=jUP zw8#ugfH+|FA>vTQ&C*~0ljB}C{kM6@1YHrS{h22Er?lk;G@Yd9!y^=_g%7jI43(zI z@6AwAMtjw7EQA-R8(<;if8k3)CfoT-{5ihY)??dmYX|WQ%EMcu))m$&YdJJKNgHds zW_!bS9(o8}px$;8zn!KIfY(nH_+(@35M16AYf;Z+kwsnROQt=ll&)C4etBbVG5NWL;Y0KDu=rq?DG?yDRmv99-&1hT`mYM1Cmc?8h4sYUc#zKQd zHMduny#>fp*{ed9%Jw#Or%LwlHDPnP$>O&V3(b5bon!ta9g*6YYtjbh4dy&^!dxW< zq%`r*;*Uhd(rJ+`&6Wnsy`tMvZJB8)a$Io?IQBXsjylIQM}fl*{1AC)Qn`8bmpSqO z|3d8rvH>C&$i49oq7GglhpARBProMKVmVILj8z2BSdP<08Z#sFYm7!lG)5uQUQXO? z^zv0gruWYM2m1mwJZ8iF09oshMt39a*VyC8p4Zq8WCt|%FtR-w`<}gsdQ4;YBipR8 zD6(}Lt4CI=vGvF*TQ#}{X_>~t$nrI|8kt99%aI8htFsrGzR}n+WH&Wts6{TA@vR=6U8mVgK zSLvdD1%0JFpD}nIG+6(bZ_{ZWLzas6fN7p5ZS<=K*%5x>Ro(Qj8z#JDP(5T| z|AkId)$%%?6Wv=!=R?)&L=0GUcO`e(;BBAvNi?eQ#f=7W z)aax0TLnQ^txq6J&36^&)2=@59b~Dbjl^TomOE^ir(LMj+5N^KYZG)=EO*VZT=#{I zlZEGX!L%Yv1+xoTDwq~zsbF>@O9k^jRD|&^7lw5j*P#)gO}16&JZ`r&>O5{kmh$-9 zc+jxKL%SjEO;S8Rb)X-#?brF(j4Tz^1ISW7HX%zzwNY(;k)=Ydy(C5m+j5=hOyQTI`hI_wII#8oPo8QVj?^m|M)B~ z?4#;oXj3dh9dSE{D9G^>AAHA8%0bNwXQ==1IPWyWZ*#&KP~~$@g7o~b1H2n!KKSHo zDHBwm^C3cSjVDKISDNFi$e^}EU8eI>crh>R=&12K5ov0T-*tLF>^@K3<{?FUtl2=U2t3BhZo$3;O3w2U^|K~x)I|M9z;6Ai^x!mF8V$` z54}y9tKhkQ7lB=^nJt!*(wSwYgL3BSXCQ4sTC$`3@D6`*qVvmi`x|wQ<0-d4*|C1) z_E%F-`KXxe3DiA^2W;((HYoKYw&eRmk+eP3anI zAdWaj7lu zWUpcWUD0nJXJUKJq zz@6id>|~cPjE^MJ`t45=EY($fb%rRgudS`7TACrpVJ3 zxm=NFDDq52o~6ilEAnhbu2AGTiab}5=P7cfBF`U`Lye`-zsH>eO>ej|;s=q1rTw@M zx`VjvRA@P+EzbsB10g4TJrEi@sC!O_YRy2Obya}-Z0JyO<%4HKcT;fj@7c*SUh<33 zv)0km-!`!`9nRDTGU1JZBj zD-bIY_aeTFScO=v_6^-vyPl)VP4Cm9Ni7M4`ziQ1J6r^R$`02tcp3PhYhx@A>Vx5@ z7_$c{$q&2ZFHn)?pEdX6Ez-0p=7!Rou*_H*@Dd8d)mu;wPvM(Wy_FN*N>PK*m>15Y zcEB1x$w<7upV<^EjQ`_71blD%;4dzxgXWfZc=N-EHHfu{b%^ziM Ri2&d3ig{q%j##pS{snwYr_ulb diff --git a/reports/current.md b/reports/current.md index 4d75a1a..92eec57 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 23:12:57 UTC +Generated: 2026-02-28 23:34:28 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-02-28 23:12:57 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1718 | +| deferred | 1698 | | n_a | 24 | | stub | 1 | -| verified | 1908 | +| verified | 1928 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1670 | +| deferred | 1642 | | n_a | 249 | -| verified | 1338 | +| verified | 1366 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-02-28 23:12:57 UTC ## Overall Progress -**3553/6942 items complete (51.2%)** +**3601/6942 items complete (51.9%)**