Implement deferred WaitQueue, DiskAvailability, and NoOpCache behavior with tests

This commit is contained in:
Joseph Doherty
2026-02-27 09:58:37 -05:00
parent 8849265780
commit a660e38575
11 changed files with 508 additions and 25 deletions

View File

@@ -153,15 +153,105 @@ public interface IOcspResponseCache
void Remove(string key);
}
/// <summary>
/// Runtime counters for OCSP response cache behavior.
/// Mirrors Go <c>OCSPResponseCacheStats</c> shape.
/// </summary>
public sealed class OcspResponseCacheStats
{
public long Responses { get; set; }
public long Hits { get; set; }
public long Misses { get; set; }
public long Revokes { get; set; }
public long Goods { get; set; }
public long Unknowns { get; set; }
}
/// <summary>
/// A no-op OCSP cache that never stores anything.
/// Mirrors Go <c>NoOpCache</c> in server/ocsp_responsecache.go.
/// </summary>
internal sealed class NoOpCache : IOcspResponseCache
{
public byte[]? Get(string key) => null;
public void Put(string key, byte[] response) { }
public void Remove(string key) { }
private readonly Lock _mu = new();
private readonly OcspResponseCacheConfig _config;
private OcspResponseCacheStats? _stats;
private bool _online;
public NoOpCache()
: this(new OcspResponseCacheConfig { Type = "none" })
{
}
public NoOpCache(OcspResponseCacheConfig config)
{
_config = config;
}
public byte[]? Get(string key) => null;
public void Put(string key, byte[] response) { }
public void Remove(string key) => Delete(key);
public void Delete(string key)
{
_ = key;
}
public void Start(NatsServer? server = null)
{
lock (_mu)
{
_stats = new OcspResponseCacheStats();
_online = true;
}
}
public void Stop(NatsServer? server = null)
{
lock (_mu)
{
_online = false;
}
}
public bool Online()
{
lock (_mu)
{
return _online;
}
}
public string Type() => "none";
public OcspResponseCacheConfig Config()
{
lock (_mu)
{
return _config;
}
}
public OcspResponseCacheStats? Stats()
{
lock (_mu)
{
if (_stats is null)
return null;
return new OcspResponseCacheStats
{
Responses = _stats.Responses,
Hits = _stats.Hits,
Misses = _stats.Misses,
Revokes = _stats.Revokes,
Goods = _stats.Goods,
Unknowns = _stats.Unknowns,
};
}
}
}
/// <summary>

View File

@@ -24,8 +24,27 @@ namespace ZB.MOM.NatsNet.Server;
/// <summary>Stub: stored message type — full definition in session 20.</summary>
public sealed class StoredMsg { }
/// <summary>Priority group for pull consumers — full definition in session 20.</summary>
public sealed class PriorityGroup { }
/// <summary>
/// Priority group for pull consumers.
/// Mirrors <c>PriorityGroup</c> in server/consumer.go.
/// </summary>
public sealed class PriorityGroup
{
[JsonPropertyName("group")]
public string Group { get; set; } = string.Empty;
[JsonPropertyName("min_pending")]
public long MinPending { get; set; }
[JsonPropertyName("min_ack_pending")]
public long MinAckPending { get; set; }
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("priority")]
public int Priority { get; set; }
}
// ---------------------------------------------------------------------------
// API subject constants

View File

@@ -970,20 +970,21 @@ public static class DiskAvailability
private const long JetStreamMaxStoreDefault = 1L * 1024 * 1024 * 1024 * 1024;
/// <summary>
/// Returns approximately 75% of available disk space at <paramref name="path"/>.
/// Returns <see cref="JetStreamMaxStoreDefault"/> (1 TB) if the check fails.
/// Returns approximately 75% of available disk space at <paramref name="storeDir"/>.
/// Ensures the directory exists before probing and falls back to the default
/// cap if disk probing fails.
/// </summary>
public static long Available(string path)
public static long DiskAvailable(string storeDir)
{
// TODO: session 17 — implement via DriveInfo or P/Invoke statvfs on non-Windows.
try
{
var drive = new DriveInfo(Path.GetPathRoot(Path.GetFullPath(path)) ?? path);
if (!string.IsNullOrWhiteSpace(storeDir))
Directory.CreateDirectory(storeDir);
var root = Path.GetPathRoot(Path.GetFullPath(storeDir));
var drive = new DriveInfo(root ?? storeDir);
if (drive.IsReady)
{
// Estimate 75% of available free space, matching Go behaviour.
return drive.AvailableFreeSpace / 4 * 3;
}
}
catch
{
@@ -993,8 +994,14 @@ public static class DiskAvailability
return JetStreamMaxStoreDefault;
}
/// <summary>
/// Returns approximately 75% of available disk space at <paramref name="path"/>.
/// Returns <see cref="JetStreamMaxStoreDefault"/> (1 TB) if the check fails.
/// </summary>
public static long Available(string path) => DiskAvailable(path);
/// <summary>
/// Returns true if at least <paramref name="needed"/> bytes are available at <paramref name="path"/>.
/// </summary>
public static bool Check(string path, long needed) => Available(path) >= needed;
public static bool Check(string path, long needed) => DiskAvailable(path) >= needed;
}

View File

@@ -409,6 +409,9 @@ public sealed class WaitingRequest
/// <summary>Bytes accumulated so far.</summary>
public int B { get; set; }
/// <summary>Optional pull request priority group metadata.</summary>
public PriorityGroup? PriorityGroup { get; set; }
}
/// <summary>
@@ -418,9 +421,15 @@ public sealed class WaitingRequest
public sealed class WaitQueue
{
private readonly List<WaitingRequest> _reqs = new();
private readonly int _max;
private int _head;
private int _tail;
public WaitQueue(int max = 0)
{
_max = max;
}
/// <summary>Number of pending requests in the queue.</summary>
public int Len => _tail - _head;
@@ -432,6 +441,43 @@ public sealed class WaitQueue
_tail++;
}
/// <summary>
/// Add a waiting request ordered by priority while preserving FIFO order
/// within each priority level.
/// </summary>
public bool AddPrioritized(WaitingRequest req)
{
ArgumentNullException.ThrowIfNull(req);
if (IsFull(_max))
return false;
InsertSorted(req);
return true;
}
/// <summary>Insert a request in priority order (lower number = higher priority).</summary>
public void InsertSorted(WaitingRequest req)
{
ArgumentNullException.ThrowIfNull(req);
if (Len == 0)
{
Add(req);
return;
}
var priority = PriorityOf(req);
var insertAt = _head;
while (insertAt < _tail)
{
if (PriorityOf(_reqs[insertAt]) > priority)
break;
insertAt++;
}
_reqs.Insert(insertAt, req);
_tail++;
}
/// <summary>Peek at the head request without removing it.</summary>
public WaitingRequest? Peek()
{
@@ -443,13 +489,123 @@ public sealed class WaitQueue
/// <summary>Remove and return the head request.</summary>
public WaitingRequest? Pop()
{
if (Len == 0)
var wr = Peek();
if (wr is null)
return null;
var req = _reqs[_head++];
wr.D++;
wr.N--;
if (wr.N > 0 && Len > 1)
{
RemoveCurrent();
Add(wr);
}
else if (wr.N <= 0)
{
RemoveCurrent();
}
return wr;
}
/// <summary>Returns true if the queue contains no active requests.</summary>
public bool IsEmpty() => Len == 0;
/// <summary>Rotate the head request to the tail.</summary>
public void Cycle()
{
var wr = Peek();
if (wr is null)
return;
RemoveCurrent();
Add(wr);
}
/// <summary>Pop strategy used by pull consumers based on priority policy.</summary>
public WaitingRequest? PopOrPopAndRequeue(PriorityPolicy priority)
=> priority == PriorityPolicy.PriorityPrioritized ? PopAndRequeue() : Pop();
/// <summary>
/// Pop and requeue to the end of the same priority band while preserving
/// stable order within that band.
/// </summary>
public WaitingRequest? PopAndRequeue()
{
var wr = Peek();
if (wr is null)
return null;
wr.D++;
wr.N--;
if (wr.N > 0 && Len > 1)
{
// Remove the current head and insert it back in priority order.
_reqs.RemoveAt(_head);
_tail--;
InsertSorted(wr);
}
else if (wr.N <= 0)
{
RemoveCurrent();
}
return wr;
}
/// <summary>Remove the current head request from the queue.</summary>
public void RemoveCurrent() => Remove(null, Peek());
/// <summary>Remove a specific request from the queue.</summary>
public void Remove(WaitingRequest? pre, WaitingRequest? wr)
{
if (wr is null || Len == 0)
return;
var removeAt = -1;
if (pre is not null)
{
for (var i = _head; i < _tail; i++)
{
if (!ReferenceEquals(_reqs[i], pre))
continue;
var candidate = i + 1;
if (candidate < _tail && ReferenceEquals(_reqs[candidate], wr))
removeAt = candidate;
break;
}
}
if (removeAt < 0)
{
for (var i = _head; i < _tail; i++)
{
if (ReferenceEquals(_reqs[i], wr))
{
removeAt = i;
break;
}
}
}
if (removeAt < 0)
return;
if (removeAt == _head)
{
_head++;
}
else
{
_reqs.RemoveAt(removeAt);
_tail--;
}
if (_head > 32 && _head * 2 >= _tail)
Compress();
return req;
}
/// <summary>Compact the internal backing list to reclaim removed slots.</summary>
@@ -470,6 +626,8 @@ public sealed class WaitQueue
return false;
return Len >= max;
}
private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue;
}
/// <summary>