diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs index b2c77d4..fe6742a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Auth/Ocsp/OcspTypes.cs @@ -153,15 +153,105 @@ public interface IOcspResponseCache void Remove(string key); } +/// +/// Runtime counters for OCSP response cache behavior. +/// Mirrors Go OCSPResponseCacheStats shape. +/// +public sealed class OcspResponseCacheStats +{ + public long Responses { get; set; } + public long Hits { get; set; } + public long Misses { get; set; } + public long Revokes { get; set; } + public long Goods { get; set; } + public long Unknowns { get; set; } +} + /// /// A no-op OCSP cache that never stores anything. /// Mirrors Go NoOpCache in server/ocsp_responsecache.go. /// internal sealed class NoOpCache : IOcspResponseCache { - public byte[]? Get(string key) => null; - public void Put(string key, byte[] response) { } - public void Remove(string key) { } + private readonly Lock _mu = new(); + private readonly OcspResponseCacheConfig _config; + private OcspResponseCacheStats? _stats; + private bool _online; + + public NoOpCache() + : this(new OcspResponseCacheConfig { Type = "none" }) + { + } + + public NoOpCache(OcspResponseCacheConfig config) + { + _config = config; + } + + public byte[]? Get(string key) => null; + + public void Put(string key, byte[] response) { } + + public void Remove(string key) => Delete(key); + + public void Delete(string key) + { + _ = key; + } + + public void Start(NatsServer? server = null) + { + lock (_mu) + { + _stats = new OcspResponseCacheStats(); + _online = true; + } + } + + public void Stop(NatsServer? server = null) + { + lock (_mu) + { + _online = false; + } + } + + public bool Online() + { + lock (_mu) + { + return _online; + } + } + + public string Type() => "none"; + + public OcspResponseCacheConfig Config() + { + lock (_mu) + { + return _config; + } + } + + public OcspResponseCacheStats? Stats() + { + lock (_mu) + { + if (_stats is null) + return null; + + return new OcspResponseCacheStats + { + Responses = _stats.Responses, + Hits = _stats.Hits, + Misses = _stats.Misses, + Revokes = _stats.Revokes, + Goods = _stats.Goods, + Unknowns = _stats.Unknowns, + }; + } + } } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs index 53ea072..7234cbc 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs @@ -24,8 +24,27 @@ namespace ZB.MOM.NatsNet.Server; /// Stub: stored message type — full definition in session 20. public sealed class StoredMsg { } -/// Priority group for pull consumers — full definition in session 20. -public sealed class PriorityGroup { } +/// +/// Priority group for pull consumers. +/// Mirrors PriorityGroup in server/consumer.go. +/// +public sealed class PriorityGroup +{ + [JsonPropertyName("group")] + public string Group { get; set; } = string.Empty; + + [JsonPropertyName("min_pending")] + public long MinPending { get; set; } + + [JsonPropertyName("min_ack_pending")] + public long MinAckPending { get; set; } + + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("priority")] + public int Priority { get; set; } +} // --------------------------------------------------------------------------- // API subject constants diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index 46434ec..442847e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -970,20 +970,21 @@ public static class DiskAvailability private const long JetStreamMaxStoreDefault = 1L * 1024 * 1024 * 1024 * 1024; /// - /// Returns approximately 75% of available disk space at . - /// Returns (1 TB) if the check fails. + /// Returns approximately 75% of available disk space at . + /// Ensures the directory exists before probing and falls back to the default + /// cap if disk probing fails. /// - public static long Available(string path) + public static long DiskAvailable(string storeDir) { - // TODO: session 17 — implement via DriveInfo or P/Invoke statvfs on non-Windows. try { - var drive = new DriveInfo(Path.GetPathRoot(Path.GetFullPath(path)) ?? path); + if (!string.IsNullOrWhiteSpace(storeDir)) + Directory.CreateDirectory(storeDir); + + var root = Path.GetPathRoot(Path.GetFullPath(storeDir)); + var drive = new DriveInfo(root ?? storeDir); if (drive.IsReady) - { - // Estimate 75% of available free space, matching Go behaviour. return drive.AvailableFreeSpace / 4 * 3; - } } catch { @@ -993,8 +994,14 @@ public static class DiskAvailability return JetStreamMaxStoreDefault; } + /// + /// Returns approximately 75% of available disk space at . + /// Returns (1 TB) if the check fails. + /// + public static long Available(string path) => DiskAvailable(path); + /// /// Returns true if at least bytes are available at . /// - public static bool Check(string path, long needed) => Available(path) >= needed; + public static bool Check(string path, long needed) => DiskAvailable(path) >= needed; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index c68dafc..5244c6c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -409,6 +409,9 @@ public sealed class WaitingRequest /// Bytes accumulated so far. public int B { get; set; } + + /// Optional pull request priority group metadata. + public PriorityGroup? PriorityGroup { get; set; } } /// @@ -418,9 +421,15 @@ public sealed class WaitingRequest public sealed class WaitQueue { private readonly List _reqs = new(); + private readonly int _max; private int _head; private int _tail; + public WaitQueue(int max = 0) + { + _max = max; + } + /// Number of pending requests in the queue. public int Len => _tail - _head; @@ -432,6 +441,43 @@ public sealed class WaitQueue _tail++; } + /// + /// Add a waiting request ordered by priority while preserving FIFO order + /// within each priority level. + /// + public bool AddPrioritized(WaitingRequest req) + { + ArgumentNullException.ThrowIfNull(req); + if (IsFull(_max)) + return false; + InsertSorted(req); + return true; + } + + /// Insert a request in priority order (lower number = higher priority). + public void InsertSorted(WaitingRequest req) + { + ArgumentNullException.ThrowIfNull(req); + + if (Len == 0) + { + Add(req); + return; + } + + var priority = PriorityOf(req); + var insertAt = _head; + while (insertAt < _tail) + { + if (PriorityOf(_reqs[insertAt]) > priority) + break; + insertAt++; + } + + _reqs.Insert(insertAt, req); + _tail++; + } + /// Peek at the head request without removing it. public WaitingRequest? Peek() { @@ -443,13 +489,123 @@ public sealed class WaitQueue /// Remove and return the head request. public WaitingRequest? Pop() { - if (Len == 0) + var wr = Peek(); + if (wr is null) return null; - var req = _reqs[_head++]; + wr.D++; + wr.N--; + if (wr.N > 0 && Len > 1) + { + RemoveCurrent(); + Add(wr); + } + else if (wr.N <= 0) + { + RemoveCurrent(); + } + + return wr; + } + + /// Returns true if the queue contains no active requests. + public bool IsEmpty() => Len == 0; + + /// Rotate the head request to the tail. + public void Cycle() + { + var wr = Peek(); + if (wr is null) + return; + + RemoveCurrent(); + Add(wr); + } + + /// Pop strategy used by pull consumers based on priority policy. + public WaitingRequest? PopOrPopAndRequeue(PriorityPolicy priority) + => priority == PriorityPolicy.PriorityPrioritized ? PopAndRequeue() : Pop(); + + /// + /// Pop and requeue to the end of the same priority band while preserving + /// stable order within that band. + /// + public WaitingRequest? PopAndRequeue() + { + var wr = Peek(); + if (wr is null) + return null; + + wr.D++; + wr.N--; + + if (wr.N > 0 && Len > 1) + { + // Remove the current head and insert it back in priority order. + _reqs.RemoveAt(_head); + _tail--; + InsertSorted(wr); + } + else if (wr.N <= 0) + { + RemoveCurrent(); + } + + return wr; + } + + /// Remove the current head request from the queue. + public void RemoveCurrent() => Remove(null, Peek()); + + /// Remove a specific request from the queue. + public void Remove(WaitingRequest? pre, WaitingRequest? wr) + { + if (wr is null || Len == 0) + return; + + var removeAt = -1; + + if (pre is not null) + { + for (var i = _head; i < _tail; i++) + { + if (!ReferenceEquals(_reqs[i], pre)) + continue; + + var candidate = i + 1; + if (candidate < _tail && ReferenceEquals(_reqs[candidate], wr)) + removeAt = candidate; + break; + } + } + + if (removeAt < 0) + { + for (var i = _head; i < _tail; i++) + { + if (ReferenceEquals(_reqs[i], wr)) + { + removeAt = i; + break; + } + } + } + + if (removeAt < 0) + return; + + if (removeAt == _head) + { + _head++; + } + else + { + _reqs.RemoveAt(removeAt); + _tail--; + } + if (_head > 32 && _head * 2 >= _tail) Compress(); - return req; } /// Compact the internal backing list to reclaim removed slots. @@ -470,6 +626,8 @@ public sealed class WaitQueue return false; return Len >= max; } + + private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue; } /// diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Auth/OcspResponseCacheTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Auth/OcspResponseCacheTests.cs index 718270e..d568544 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Auth/OcspResponseCacheTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Auth/OcspResponseCacheTests.cs @@ -31,13 +31,30 @@ public sealed class OcspResponseCacheTests } [Fact] - public void NoOpCache_AndMonitor_ShouldNoOpSafely() + public void NoOpCache_LifecycleAndStats_ShouldNoOpSafely() { var noOp = new NoOpCache(); + noOp.Online().ShouldBeFalse(); + noOp.Type().ShouldBe("none"); + noOp.Config().ShouldNotBeNull(); + noOp.Stats().ShouldBeNull(); + + noOp.Start(); + noOp.Online().ShouldBeTrue(); + noOp.Stats().ShouldNotBeNull(); + noOp.Put("k", [5]); noOp.Get("k").ShouldBeNull(); - noOp.Remove("k"); + noOp.Remove("k"); // alias to Delete + noOp.Delete("k"); + noOp.Stop(); + noOp.Online().ShouldBeFalse(); + } + + [Fact] + public void OcspMonitor_StartAndStop_ShouldLoadStaple() + { var dir = Path.Combine(Path.GetTempPath(), $"ocsp-monitor-{Guid.NewGuid():N}"); Directory.CreateDirectory(dir); try diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs new file mode 100644 index 0000000..c7c8936 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/DiskAvailabilityTests.cs @@ -0,0 +1,58 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class DiskAvailabilityTests +{ + private const long JetStreamMaxStoreDefault = 1L * 1024 * 1024 * 1024 * 1024; + + [Fact] + public void DiskAvailable_MissingDirectory_ShouldCreateDirectory() + { + var root = Path.Combine(Path.GetTempPath(), $"disk-avail-{Guid.NewGuid():N}"); + var target = Path.Combine(root, "nested"); + try + { + Directory.Exists(target).ShouldBeFalse(); + + var available = DiskAvailability.DiskAvailable(target); + + Directory.Exists(target).ShouldBeTrue(); + available.ShouldBeGreaterThan(0L); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } + + [Fact] + public void DiskAvailable_InvalidPath_ShouldReturnFallback() + { + var available = DiskAvailability.DiskAvailable("\0"); + available.ShouldBe(JetStreamMaxStoreDefault); + } + + [Fact] + public void Check_ShouldUseDiskAvailableThreshold() + { + var root = Path.Combine(Path.GetTempPath(), $"disk-check-{Guid.NewGuid():N}"); + try + { + var available = DiskAvailability.DiskAvailable(root); + + DiskAvailability.Check(root, Math.Max(0, available - 1)).ShouldBeTrue(); + DiskAvailability.Check(root, available + 1).ShouldBeFalse(); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs index e73f9f1..2ae7f32 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -35,4 +35,82 @@ public sealed class NatsConsumerTests consumer.Stop(); consumer.IsLeader().ShouldBeFalse(); } + + [Fact] // T:1364 + public void SortingConsumerPullRequests_ShouldSucceed() + { + var q = new WaitQueue(max: 100); + + q.AddPrioritized(new WaitingRequest { Reply = "1a", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "2a", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1b", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "2b", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1c", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "3a", PriorityGroup = new PriorityGroup { Priority = 3 }, N = 1 }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "2c", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 }) + .ShouldBeTrue(); + + var expectedOrder = new[] + { + ("1a", 1), + ("1b", 1), + ("1c", 1), + ("2a", 2), + ("2b", 2), + ("2c", 2), + ("3a", 3), + }; + + q.Len.ShouldBe(expectedOrder.Length); + foreach (var (reply, priority) in expectedOrder) + { + var current = q.Peek(); + current.ShouldNotBeNull(); + current!.Reply.ShouldBe(reply); + current.PriorityGroup.ShouldNotBeNull(); + current.PriorityGroup!.Priority.ShouldBe(priority); + q.RemoveCurrent(); + } + + q.IsEmpty().ShouldBeTrue(); + } + + [Fact] // T:1365 + public void WaitQueuePopAndRequeue_ShouldSucceed() + { + var q = new WaitQueue(max: 100); + q.AddPrioritized(new WaitingRequest { Reply = "1a", N = 2, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1b", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "2a", N = 3, PriorityGroup = new PriorityGroup { Priority = 2 } }) + .ShouldBeTrue(); + + var wr = q.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1a"); + wr.N.ShouldBe(1); + q.Len.ShouldBe(3); + + wr = q.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1b"); + wr.N.ShouldBe(0); + q.Len.ShouldBe(2); + + wr = q.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1a"); + wr.N.ShouldBe(0); + q.Len.ShouldBe(1); + + q.Peek()!.Reply.ShouldBe("2a"); + q.Peek()!.N.ShouldBe(3); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs index 6c12bb9..215360b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs @@ -24,8 +24,28 @@ public sealed class WaitQueueTests q.Peek()!.Subject.ShouldBe("A"); q.Pop()!.Subject.ShouldBe("A"); + q.Pop()!.Subject.ShouldBe("B"); + q.Len.ShouldBe(1); + q.Pop()!.Subject.ShouldBe("B"); q.Len.ShouldBe(0); q.IsFull(1).ShouldBeFalse(); } + + [Fact] + public void AddPrioritized_AndCycle_ShouldPreserveStableOrder() + { + var q = new WaitQueue(max: 10); + + q.AddPrioritized(new WaitingRequest { Reply = "2a", N = 1, PriorityGroup = new PriorityGroup { Priority = 2 } }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1a", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + q.AddPrioritized(new WaitingRequest { Reply = "1b", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + + q.Peek()!.Reply.ShouldBe("1a"); + q.Cycle(); + q.Peek()!.Reply.ShouldBe("1b"); + } } diff --git a/porting.db b/porting.db index 9d9fd66..3e0840d 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 5239520..3fab8dd 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-27 13:56:27 UTC +Generated: 2026-02-27 14:58:38 UTC ## Modules (12 total) @@ -12,17 +12,17 @@ Generated: 2026-02-27 13:56:27 UTC | Status | Count | |--------|-------| -| deferred | 2461 | +| deferred | 2440 | | n_a | 18 | -| verified | 1194 | +| verified | 1215 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 2662 | +| deferred | 2660 | | n_a | 187 | -| verified | 408 | +| verified | 410 | ## Library Mappings (36 total) @@ -33,4 +33,4 @@ Generated: 2026-02-27 13:56:27 UTC ## Overall Progress -**1819/6942 items complete (26.2%)** +**1842/6942 items complete (26.5%)** diff --git a/reports/report_8849265.md b/reports/report_8849265.md new file mode 100644 index 0000000..3fab8dd --- /dev/null +++ b/reports/report_8849265.md @@ -0,0 +1,36 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-27 14:58:38 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| verified | 12 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| deferred | 2440 | +| n_a | 18 | +| verified | 1215 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| deferred | 2660 | +| n_a | 187 | +| verified | 410 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**1842/6942 items complete (26.5%)**