Compare commits
5 Commits
codex/stub
...
codex/defe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b94a67be6e | ||
|
|
c0aaae9236 | ||
|
|
4e96fb2ba8 | ||
|
|
ae0a553ab8 | ||
|
|
a660e38575 |
@@ -153,15 +153,105 @@ public interface IOcspResponseCache
|
||||
void Remove(string key);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Runtime counters for OCSP response cache behavior.
|
||||
/// Mirrors Go <c>OCSPResponseCacheStats</c> shape.
|
||||
/// </summary>
|
||||
public sealed class OcspResponseCacheStats
|
||||
{
|
||||
public long Responses { get; set; }
|
||||
public long Hits { get; set; }
|
||||
public long Misses { get; set; }
|
||||
public long Revokes { get; set; }
|
||||
public long Goods { get; set; }
|
||||
public long Unknowns { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A no-op OCSP cache that never stores anything.
|
||||
/// Mirrors Go <c>NoOpCache</c> in server/ocsp_responsecache.go.
|
||||
/// </summary>
|
||||
internal sealed class NoOpCache : IOcspResponseCache
|
||||
{
|
||||
public byte[]? Get(string key) => null;
|
||||
public void Put(string key, byte[] response) { }
|
||||
public void Remove(string key) { }
|
||||
private readonly Lock _mu = new();
|
||||
private readonly OcspResponseCacheConfig _config;
|
||||
private OcspResponseCacheStats? _stats;
|
||||
private bool _online;
|
||||
|
||||
public NoOpCache()
|
||||
: this(new OcspResponseCacheConfig { Type = "none" })
|
||||
{
|
||||
}
|
||||
|
||||
public NoOpCache(OcspResponseCacheConfig config)
|
||||
{
|
||||
_config = config;
|
||||
}
|
||||
|
||||
public byte[]? Get(string key) => null;
|
||||
|
||||
public void Put(string key, byte[] response) { }
|
||||
|
||||
public void Remove(string key) => Delete(key);
|
||||
|
||||
public void Delete(string key)
|
||||
{
|
||||
_ = key;
|
||||
}
|
||||
|
||||
public void Start(NatsServer? server = null)
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
_stats = new OcspResponseCacheStats();
|
||||
_online = true;
|
||||
}
|
||||
}
|
||||
|
||||
public void Stop(NatsServer? server = null)
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
_online = false;
|
||||
}
|
||||
}
|
||||
|
||||
public bool Online()
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
return _online;
|
||||
}
|
||||
}
|
||||
|
||||
public string Type() => "none";
|
||||
|
||||
public OcspResponseCacheConfig Config()
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
return _config;
|
||||
}
|
||||
}
|
||||
|
||||
public OcspResponseCacheStats? Stats()
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
if (_stats is null)
|
||||
return null;
|
||||
|
||||
return new OcspResponseCacheStats
|
||||
{
|
||||
Responses = _stats.Responses,
|
||||
Hits = _stats.Hits,
|
||||
Misses = _stats.Misses,
|
||||
Revokes = _stats.Revokes,
|
||||
Goods = _stats.Goods,
|
||||
Unknowns = _stats.Unknowns,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -25,6 +25,18 @@ public static class AccessTimeService
|
||||
// Mirror Go's init(): nothing to pre-allocate in .NET.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Explicit init hook for Go parity.
|
||||
/// Mirrors package <c>init()</c> in server/ats/ats.go.
|
||||
/// This method is intentionally idempotent.
|
||||
/// </summary>
|
||||
public static void Init()
|
||||
{
|
||||
// Ensure a non-zero cached timestamp is present.
|
||||
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
|
||||
Interlocked.CompareExchange(ref _utime, now, 0);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Registers a user. Starts the background timer when the first registrant calls this.
|
||||
/// Each call to <see cref="Register"/> must be paired with a call to <see cref="Unregister"/>.
|
||||
|
||||
@@ -40,6 +40,24 @@ public sealed class IpQueue<T>
|
||||
/// <summary>Default maximum size of the recycled backing-list capacity.</summary>
|
||||
public const int DefaultMaxRecycleSize = 4 * 1024;
|
||||
|
||||
/// <summary>
|
||||
/// Functional option type used by <see cref="NewIPQueue"/>.
|
||||
/// Mirrors Go <c>ipQueueOpt</c>.
|
||||
/// </summary>
|
||||
public delegate void IpQueueOption(IpQueueOptions options);
|
||||
|
||||
/// <summary>
|
||||
/// Option bag used by <see cref="NewIPQueue"/>.
|
||||
/// Mirrors Go <c>ipQueueOpts</c>.
|
||||
/// </summary>
|
||||
public sealed class IpQueueOptions
|
||||
{
|
||||
public int MaxRecycleSize { get; set; } = DefaultMaxRecycleSize;
|
||||
public Func<T, ulong>? SizeCalc { get; set; }
|
||||
public ulong MaxSize { get; set; }
|
||||
public int MaxLen { get; set; }
|
||||
}
|
||||
|
||||
private long _inprogress;
|
||||
private readonly object _lock = new();
|
||||
|
||||
@@ -68,6 +86,56 @@ public sealed class IpQueue<T>
|
||||
/// <summary>Notification channel reader — wait on this to learn items were added.</summary>
|
||||
public ChannelReader<bool> Ch => _ch.Reader;
|
||||
|
||||
/// <summary>
|
||||
/// Option helper that configures maximum recycled backing-list size.
|
||||
/// Mirrors Go <c>ipqMaxRecycleSize</c>.
|
||||
/// </summary>
|
||||
public static IpQueueOption IpqMaxRecycleSize(int max) =>
|
||||
options => options.MaxRecycleSize = max;
|
||||
|
||||
/// <summary>
|
||||
/// Option helper that enables size accounting for queue elements.
|
||||
/// Mirrors Go <c>ipqSizeCalculation</c>.
|
||||
/// </summary>
|
||||
public static IpQueueOption IpqSizeCalculation(Func<T, ulong> calc) =>
|
||||
options => options.SizeCalc = calc;
|
||||
|
||||
/// <summary>
|
||||
/// Option helper that limits queue pushes by total accounted size.
|
||||
/// Mirrors Go <c>ipqLimitBySize</c>.
|
||||
/// </summary>
|
||||
public static IpQueueOption IpqLimitBySize(ulong max) =>
|
||||
options => options.MaxSize = max;
|
||||
|
||||
/// <summary>
|
||||
/// Option helper that limits queue pushes by element count.
|
||||
/// Mirrors Go <c>ipqLimitByLen</c>.
|
||||
/// </summary>
|
||||
public static IpQueueOption IpqLimitByLen(int max) =>
|
||||
options => options.MaxLen = max;
|
||||
|
||||
/// <summary>
|
||||
/// Factory wrapper for Go parity.
|
||||
/// Mirrors <c>newIPQueue</c>.
|
||||
/// </summary>
|
||||
public static IpQueue<T> NewIPQueue(
|
||||
string name,
|
||||
ConcurrentDictionary<string, object>? registry = null,
|
||||
params IpQueueOption[] options)
|
||||
{
|
||||
var opts = new IpQueueOptions();
|
||||
foreach (var option in options)
|
||||
option(opts);
|
||||
|
||||
return new IpQueue<T>(
|
||||
name,
|
||||
registry,
|
||||
opts.MaxRecycleSize,
|
||||
opts.SizeCalc,
|
||||
opts.MaxSize,
|
||||
opts.MaxLen);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new queue, optionally registering it in <paramref name="registry"/>.
|
||||
/// Mirrors <c>newIPQueue</c>.
|
||||
|
||||
@@ -38,6 +38,12 @@ public sealed class RateCounter
|
||||
Interval = TimeSpan.FromSeconds(1);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Factory wrapper for Go parity.
|
||||
/// Mirrors <c>newRateCounter</c>.
|
||||
/// </summary>
|
||||
public static RateCounter NewRateCounter(long limit) => new(limit);
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if the event is within the rate limit for the current window.
|
||||
/// Mirrors <c>rateCounter.allow</c>.
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
// Adapted from server/util.go in the NATS server Go source.
|
||||
|
||||
using System.Net;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Text.RegularExpressions;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.Internal;
|
||||
@@ -268,6 +270,25 @@ public static class ServerUtilities
|
||||
return client;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parity wrapper for Go <c>natsDialTimeout</c>.
|
||||
/// Accepts a network label (tcp/tcp4/tcp6) and host:port address.
|
||||
/// </summary>
|
||||
public static Task<System.Net.Sockets.TcpClient> NatsDialTimeout(
|
||||
string network, string address, TimeSpan timeout)
|
||||
{
|
||||
if (!string.Equals(network, "tcp", StringComparison.OrdinalIgnoreCase) &&
|
||||
!string.Equals(network, "tcp4", StringComparison.OrdinalIgnoreCase) &&
|
||||
!string.Equals(network, "tcp6", StringComparison.OrdinalIgnoreCase))
|
||||
throw new NotSupportedException($"unsupported network: {network}");
|
||||
|
||||
var (host, port, err) = ParseHostPort(address, defaultPort: 0);
|
||||
if (err != null || port <= 0)
|
||||
throw new InvalidOperationException($"invalid dial address: {address}", err);
|
||||
|
||||
return NatsDialTimeoutAsync(host, port, timeout);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// URL redaction
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -337,6 +358,54 @@ public static class ServerUtilities
|
||||
return result;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// RefCountedUrlSet wrappers (Go parity mapping)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Parity wrapper for <see cref="RefCountedUrlSet.AddUrl"/>.
|
||||
/// Mirrors <c>refCountedUrlSet.addUrl</c>.
|
||||
/// </summary>
|
||||
public static bool AddUrl(RefCountedUrlSet urlSet, string urlStr)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(urlSet);
|
||||
return urlSet.AddUrl(urlStr);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parity wrapper for <see cref="RefCountedUrlSet.RemoveUrl"/>.
|
||||
/// Mirrors <c>refCountedUrlSet.removeUrl</c>.
|
||||
/// </summary>
|
||||
public static bool RemoveUrl(RefCountedUrlSet urlSet, string urlStr)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(urlSet);
|
||||
return urlSet.RemoveUrl(urlStr);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parity wrapper for <see cref="RefCountedUrlSet.GetAsStringSlice"/>.
|
||||
/// Mirrors <c>refCountedUrlSet.getAsStringSlice</c>.
|
||||
/// </summary>
|
||||
public static string[] GetAsStringSlice(RefCountedUrlSet urlSet)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(urlSet);
|
||||
return urlSet.GetAsStringSlice();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// INFO helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Serialises <paramref name="info"/> into an INFO line (<c>INFO {...}\r\n</c>).
|
||||
/// Mirrors <c>generateInfoJSON</c>.
|
||||
/// </summary>
|
||||
public static byte[] GenerateInfoJSON(global::ZB.MOM.NatsNet.Server.ServerInfo info)
|
||||
{
|
||||
var json = JsonSerializer.Serialize(info);
|
||||
return Encoding.UTF8.GetBytes($"INFO {json}\r\n");
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Copy helpers
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -391,6 +460,13 @@ public static class ServerUtilities
|
||||
|
||||
return channel.Writer;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parity wrapper for <see cref="CreateParallelTaskQueue"/>.
|
||||
/// Mirrors <c>parallelTaskQueue</c>.
|
||||
/// </summary>
|
||||
public static System.Threading.Channels.ChannelWriter<Action> ParallelTaskQueue(int maxParallelism = 0) =>
|
||||
CreateParallelTaskQueue(maxParallelism);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@@ -187,6 +187,12 @@ public static class SignalHandler
|
||||
_ => throw new ArgumentOutOfRangeException(nameof(command), $"unknown signal \"{CommandToString(command)}\""),
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Go parity alias for <see cref="CommandToUnixSignal"/>.
|
||||
/// Mirrors <c>CommandToSignal</c> in signal.go.
|
||||
/// </summary>
|
||||
public static UnixSignal CommandToSignal(ServerCommand command) => CommandToUnixSignal(command);
|
||||
|
||||
private static Exception? SendSignal(int pid, UnixSignal signal)
|
||||
{
|
||||
try
|
||||
|
||||
@@ -24,8 +24,27 @@ namespace ZB.MOM.NatsNet.Server;
|
||||
/// <summary>Stub: stored message type — full definition in session 20.</summary>
|
||||
public sealed class StoredMsg { }
|
||||
|
||||
/// <summary>Priority group for pull consumers — full definition in session 20.</summary>
|
||||
public sealed class PriorityGroup { }
|
||||
/// <summary>
|
||||
/// Priority group for pull consumers.
|
||||
/// Mirrors <c>PriorityGroup</c> in server/consumer.go.
|
||||
/// </summary>
|
||||
public sealed class PriorityGroup
|
||||
{
|
||||
[JsonPropertyName("group")]
|
||||
public string Group { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("min_pending")]
|
||||
public long MinPending { get; set; }
|
||||
|
||||
[JsonPropertyName("min_ack_pending")]
|
||||
public long MinAckPending { get; set; }
|
||||
|
||||
[JsonPropertyName("id")]
|
||||
public string Id { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("priority")]
|
||||
public int Priority { get; set; }
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// API subject constants
|
||||
|
||||
@@ -970,20 +970,21 @@ public static class DiskAvailability
|
||||
private const long JetStreamMaxStoreDefault = 1L * 1024 * 1024 * 1024 * 1024;
|
||||
|
||||
/// <summary>
|
||||
/// Returns approximately 75% of available disk space at <paramref name="path"/>.
|
||||
/// Returns <see cref="JetStreamMaxStoreDefault"/> (1 TB) if the check fails.
|
||||
/// Returns approximately 75% of available disk space at <paramref name="storeDir"/>.
|
||||
/// Ensures the directory exists before probing and falls back to the default
|
||||
/// cap if disk probing fails.
|
||||
/// </summary>
|
||||
public static long Available(string path)
|
||||
public static long DiskAvailable(string storeDir)
|
||||
{
|
||||
// TODO: session 17 — implement via DriveInfo or P/Invoke statvfs on non-Windows.
|
||||
try
|
||||
{
|
||||
var drive = new DriveInfo(Path.GetPathRoot(Path.GetFullPath(path)) ?? path);
|
||||
if (!string.IsNullOrWhiteSpace(storeDir))
|
||||
Directory.CreateDirectory(storeDir);
|
||||
|
||||
var root = Path.GetPathRoot(Path.GetFullPath(storeDir));
|
||||
var drive = new DriveInfo(root ?? storeDir);
|
||||
if (drive.IsReady)
|
||||
{
|
||||
// Estimate 75% of available free space, matching Go behaviour.
|
||||
return drive.AvailableFreeSpace / 4 * 3;
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
@@ -993,8 +994,14 @@ public static class DiskAvailability
|
||||
return JetStreamMaxStoreDefault;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns approximately 75% of available disk space at <paramref name="path"/>.
|
||||
/// Returns <see cref="JetStreamMaxStoreDefault"/> (1 TB) if the check fails.
|
||||
/// </summary>
|
||||
public static long Available(string path) => DiskAvailable(path);
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if at least <paramref name="needed"/> bytes are available at <paramref name="path"/>.
|
||||
/// </summary>
|
||||
public static bool Check(string path, long needed) => Available(path) >= needed;
|
||||
public static bool Check(string path, long needed) => DiskAvailable(path) >= needed;
|
||||
}
|
||||
|
||||
@@ -409,6 +409,9 @@ public sealed class WaitingRequest
|
||||
|
||||
/// <summary>Bytes accumulated so far.</summary>
|
||||
public int B { get; set; }
|
||||
|
||||
/// <summary>Optional pull request priority group metadata.</summary>
|
||||
public PriorityGroup? PriorityGroup { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -418,9 +421,15 @@ public sealed class WaitingRequest
|
||||
public sealed class WaitQueue
|
||||
{
|
||||
private readonly List<WaitingRequest> _reqs = new();
|
||||
private readonly int _max;
|
||||
private int _head;
|
||||
private int _tail;
|
||||
|
||||
public WaitQueue(int max = 0)
|
||||
{
|
||||
_max = max;
|
||||
}
|
||||
|
||||
/// <summary>Number of pending requests in the queue.</summary>
|
||||
public int Len => _tail - _head;
|
||||
|
||||
@@ -432,6 +441,43 @@ public sealed class WaitQueue
|
||||
_tail++;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a waiting request ordered by priority while preserving FIFO order
|
||||
/// within each priority level.
|
||||
/// </summary>
|
||||
public bool AddPrioritized(WaitingRequest req)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(req);
|
||||
if (IsFull(_max))
|
||||
return false;
|
||||
InsertSorted(req);
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>Insert a request in priority order (lower number = higher priority).</summary>
|
||||
public void InsertSorted(WaitingRequest req)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(req);
|
||||
|
||||
if (Len == 0)
|
||||
{
|
||||
Add(req);
|
||||
return;
|
||||
}
|
||||
|
||||
var priority = PriorityOf(req);
|
||||
var insertAt = _head;
|
||||
while (insertAt < _tail)
|
||||
{
|
||||
if (PriorityOf(_reqs[insertAt]) > priority)
|
||||
break;
|
||||
insertAt++;
|
||||
}
|
||||
|
||||
_reqs.Insert(insertAt, req);
|
||||
_tail++;
|
||||
}
|
||||
|
||||
/// <summary>Peek at the head request without removing it.</summary>
|
||||
public WaitingRequest? Peek()
|
||||
{
|
||||
@@ -443,13 +489,123 @@ public sealed class WaitQueue
|
||||
/// <summary>Remove and return the head request.</summary>
|
||||
public WaitingRequest? Pop()
|
||||
{
|
||||
if (Len == 0)
|
||||
var wr = Peek();
|
||||
if (wr is null)
|
||||
return null;
|
||||
|
||||
var req = _reqs[_head++];
|
||||
wr.D++;
|
||||
wr.N--;
|
||||
if (wr.N > 0 && Len > 1)
|
||||
{
|
||||
RemoveCurrent();
|
||||
Add(wr);
|
||||
}
|
||||
else if (wr.N <= 0)
|
||||
{
|
||||
RemoveCurrent();
|
||||
}
|
||||
|
||||
return wr;
|
||||
}
|
||||
|
||||
/// <summary>Returns true if the queue contains no active requests.</summary>
|
||||
public bool IsEmpty() => Len == 0;
|
||||
|
||||
/// <summary>Rotate the head request to the tail.</summary>
|
||||
public void Cycle()
|
||||
{
|
||||
var wr = Peek();
|
||||
if (wr is null)
|
||||
return;
|
||||
|
||||
RemoveCurrent();
|
||||
Add(wr);
|
||||
}
|
||||
|
||||
/// <summary>Pop strategy used by pull consumers based on priority policy.</summary>
|
||||
public WaitingRequest? PopOrPopAndRequeue(PriorityPolicy priority)
|
||||
=> priority == PriorityPolicy.PriorityPrioritized ? PopAndRequeue() : Pop();
|
||||
|
||||
/// <summary>
|
||||
/// Pop and requeue to the end of the same priority band while preserving
|
||||
/// stable order within that band.
|
||||
/// </summary>
|
||||
public WaitingRequest? PopAndRequeue()
|
||||
{
|
||||
var wr = Peek();
|
||||
if (wr is null)
|
||||
return null;
|
||||
|
||||
wr.D++;
|
||||
wr.N--;
|
||||
|
||||
if (wr.N > 0 && Len > 1)
|
||||
{
|
||||
// Remove the current head and insert it back in priority order.
|
||||
_reqs.RemoveAt(_head);
|
||||
_tail--;
|
||||
InsertSorted(wr);
|
||||
}
|
||||
else if (wr.N <= 0)
|
||||
{
|
||||
RemoveCurrent();
|
||||
}
|
||||
|
||||
return wr;
|
||||
}
|
||||
|
||||
/// <summary>Remove the current head request from the queue.</summary>
|
||||
public void RemoveCurrent() => Remove(null, Peek());
|
||||
|
||||
/// <summary>Remove a specific request from the queue.</summary>
|
||||
public void Remove(WaitingRequest? pre, WaitingRequest? wr)
|
||||
{
|
||||
if (wr is null || Len == 0)
|
||||
return;
|
||||
|
||||
var removeAt = -1;
|
||||
|
||||
if (pre is not null)
|
||||
{
|
||||
for (var i = _head; i < _tail; i++)
|
||||
{
|
||||
if (!ReferenceEquals(_reqs[i], pre))
|
||||
continue;
|
||||
|
||||
var candidate = i + 1;
|
||||
if (candidate < _tail && ReferenceEquals(_reqs[candidate], wr))
|
||||
removeAt = candidate;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (removeAt < 0)
|
||||
{
|
||||
for (var i = _head; i < _tail; i++)
|
||||
{
|
||||
if (ReferenceEquals(_reqs[i], wr))
|
||||
{
|
||||
removeAt = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (removeAt < 0)
|
||||
return;
|
||||
|
||||
if (removeAt == _head)
|
||||
{
|
||||
_head++;
|
||||
}
|
||||
else
|
||||
{
|
||||
_reqs.RemoveAt(removeAt);
|
||||
_tail--;
|
||||
}
|
||||
|
||||
if (_head > 32 && _head * 2 >= _tail)
|
||||
Compress();
|
||||
return req;
|
||||
}
|
||||
|
||||
/// <summary>Compact the internal backing list to reclaim removed slots.</summary>
|
||||
@@ -470,6 +626,8 @@ public sealed class WaitQueue
|
||||
return false;
|
||||
return Len >= max;
|
||||
}
|
||||
|
||||
private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -31,13 +31,30 @@ public sealed class OcspResponseCacheTests
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NoOpCache_AndMonitor_ShouldNoOpSafely()
|
||||
public void NoOpCache_LifecycleAndStats_ShouldNoOpSafely()
|
||||
{
|
||||
var noOp = new NoOpCache();
|
||||
noOp.Online().ShouldBeFalse();
|
||||
noOp.Type().ShouldBe("none");
|
||||
noOp.Config().ShouldNotBeNull();
|
||||
noOp.Stats().ShouldBeNull();
|
||||
|
||||
noOp.Start();
|
||||
noOp.Online().ShouldBeTrue();
|
||||
noOp.Stats().ShouldNotBeNull();
|
||||
|
||||
noOp.Put("k", [5]);
|
||||
noOp.Get("k").ShouldBeNull();
|
||||
noOp.Remove("k");
|
||||
noOp.Remove("k"); // alias to Delete
|
||||
noOp.Delete("k");
|
||||
|
||||
noOp.Stop();
|
||||
noOp.Online().ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void OcspMonitor_StartAndStop_ShouldLoadStaple()
|
||||
{
|
||||
var dir = Path.Combine(Path.GetTempPath(), $"ocsp-monitor-{Guid.NewGuid():N}");
|
||||
Directory.CreateDirectory(dir);
|
||||
try
|
||||
|
||||
@@ -77,4 +77,16 @@ public sealed class AccessTimeServiceTests : IDisposable
|
||||
// Mirror: TestUnbalancedUnregister
|
||||
Should.Throw<InvalidOperationException>(() => AccessTimeService.Unregister());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Init_ShouldBeIdempotentAndNonThrowing()
|
||||
{
|
||||
Should.NotThrow(() => AccessTimeService.Init());
|
||||
var first = AccessTimeService.AccessTime();
|
||||
first.ShouldBeGreaterThan(0);
|
||||
|
||||
Should.NotThrow(() => AccessTimeService.Init());
|
||||
var second = AccessTimeService.AccessTime();
|
||||
second.ShouldBeGreaterThan(0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,62 @@ namespace ZB.MOM.NatsNet.Server.Tests.Internal;
|
||||
/// </summary>
|
||||
public sealed class IpQueueTests
|
||||
{
|
||||
[Fact]
|
||||
public void IpqMaxRecycleSize_ShouldAffectQueueConfig()
|
||||
{
|
||||
var q = IpQueue<int>.NewIPQueue("opt-max-recycle", null, IpQueue<int>.IpqMaxRecycleSize(123));
|
||||
q.MaxRecycleSize.ShouldBe(123);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void IpqSizeCalculation_AndLimitBySize_ShouldEnforceLimit()
|
||||
{
|
||||
var q = IpQueue<byte[]>.NewIPQueue(
|
||||
"opt-size-limit",
|
||||
null,
|
||||
IpQueue<byte[]>.IpqSizeCalculation(e => (ulong)e.Length),
|
||||
IpQueue<byte[]>.IpqLimitBySize(8));
|
||||
|
||||
var (_, err1) = q.Push(new byte[4]);
|
||||
err1.ShouldBeNull();
|
||||
|
||||
var (_, err2) = q.Push(new byte[4]);
|
||||
err2.ShouldBeNull();
|
||||
|
||||
var (_, err3) = q.Push(new byte[1]);
|
||||
err3.ShouldBeSameAs(IpQueueErrors.SizeLimitReached);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void IpqLimitByLen_ShouldEnforceLengthLimit()
|
||||
{
|
||||
var q = IpQueue<int>.NewIPQueue("opt-len-limit", null, IpQueue<int>.IpqLimitByLen(2));
|
||||
|
||||
q.Push(1).error.ShouldBeNull();
|
||||
q.Push(2).error.ShouldBeNull();
|
||||
q.Push(3).error.ShouldBeSameAs(IpQueueErrors.LenLimitReached);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NewIPQueue_ShouldApplyOptionsAndRegister()
|
||||
{
|
||||
var registry = new ConcurrentDictionary<string, object>();
|
||||
var q = IpQueue<int>.NewIPQueue(
|
||||
"opt-factory",
|
||||
registry,
|
||||
IpQueue<int>.IpqMaxRecycleSize(55),
|
||||
IpQueue<int>.IpqLimitByLen(1));
|
||||
|
||||
q.MaxRecycleSize.ShouldBe(55);
|
||||
registry.TryGetValue("opt-factory", out var registered).ShouldBeTrue();
|
||||
registered.ShouldBeSameAs(q);
|
||||
|
||||
var (_, err1) = q.Push(1);
|
||||
err1.ShouldBeNull();
|
||||
var (_, err2) = q.Push(2);
|
||||
err2.ShouldBeSameAs(IpQueueErrors.LenLimitReached);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Basic_ShouldInitialiseCorrectly()
|
||||
{
|
||||
|
||||
@@ -22,6 +22,17 @@ namespace ZB.MOM.NatsNet.Server.Tests.Internal;
|
||||
/// </summary>
|
||||
public sealed class RateCounterTests
|
||||
{
|
||||
[Fact]
|
||||
public void NewRateCounter_ShouldCreateWithDefaultInterval()
|
||||
{
|
||||
var counter = RateCounter.NewRateCounter(2);
|
||||
counter.Interval.ShouldBe(TimeSpan.FromSeconds(1));
|
||||
|
||||
counter.Allow().ShouldBeTrue();
|
||||
counter.Allow().ShouldBeTrue();
|
||||
counter.Allow().ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RateCounter_ShouldAllowUpToLimitThenBlockAndReset()
|
||||
{
|
||||
|
||||
@@ -11,7 +11,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
using System.Net;
|
||||
using System.Text.Json;
|
||||
using Shouldly;
|
||||
using ZB.MOM.NatsNet.Server;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.Tests.Internal;
|
||||
@@ -191,4 +194,86 @@ public sealed class ServerUtilitiesTests
|
||||
$"VersionAtLeast({version}, {major}, {minor}, {update})");
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RefCountedUrlSet_Wrappers_ShouldTrackRefCounts()
|
||||
{
|
||||
var set = new RefCountedUrlSet();
|
||||
ServerUtilities.AddUrl(set, "nats://a:4222").ShouldBeTrue();
|
||||
ServerUtilities.AddUrl(set, "nats://a:4222").ShouldBeFalse();
|
||||
ServerUtilities.AddUrl(set, "nats://b:4222").ShouldBeTrue();
|
||||
|
||||
ServerUtilities.RemoveUrl(set, "nats://a:4222").ShouldBeFalse();
|
||||
ServerUtilities.RemoveUrl(set, "nats://a:4222").ShouldBeTrue();
|
||||
|
||||
var urls = ServerUtilities.GetAsStringSlice(set);
|
||||
urls.Length.ShouldBe(1);
|
||||
urls[0].ShouldBe("nats://b:4222");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task NatsDialTimeout_ShouldConnectWithinTimeout()
|
||||
{
|
||||
using var listener = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var port = ((IPEndPoint)listener.LocalEndpoint).Port;
|
||||
var acceptTask = listener.AcceptTcpClientAsync();
|
||||
|
||||
using var client = await ServerUtilities.NatsDialTimeout(
|
||||
"tcp",
|
||||
$"127.0.0.1:{port}",
|
||||
TimeSpan.FromSeconds(2));
|
||||
|
||||
client.Connected.ShouldBeTrue();
|
||||
using var accepted = await acceptTask;
|
||||
accepted.Connected.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GenerateInfoJSON_ShouldEmitInfoLineWithCRLF()
|
||||
{
|
||||
var info = new ServerInfo
|
||||
{
|
||||
Id = "S1",
|
||||
Name = "n1",
|
||||
Host = "127.0.0.1",
|
||||
Port = 4222,
|
||||
Version = "2.0.0",
|
||||
Proto = 1,
|
||||
GoVersion = "go1.23",
|
||||
};
|
||||
|
||||
var bytes = ServerUtilities.GenerateInfoJSON(info);
|
||||
var line = System.Text.Encoding.UTF8.GetString(bytes);
|
||||
line.ShouldStartWith("INFO ");
|
||||
line.ShouldEndWith("\r\n");
|
||||
|
||||
var json = line["INFO ".Length..^2];
|
||||
var payload = JsonSerializer.Deserialize<ServerInfo>(json);
|
||||
payload.ShouldNotBeNull();
|
||||
payload!.Id.ShouldBe("S1");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ParallelTaskQueue_ShouldExecuteQueuedActions()
|
||||
{
|
||||
var writer = ServerUtilities.ParallelTaskQueue(maxParallelism: 2);
|
||||
var ran = 0;
|
||||
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
for (var i = 0; i < 4; i++)
|
||||
{
|
||||
var accepted = writer.TryWrite(() =>
|
||||
{
|
||||
if (Interlocked.Increment(ref ran) == 4)
|
||||
tcs.TrySetResult();
|
||||
});
|
||||
accepted.ShouldBeTrue();
|
||||
}
|
||||
|
||||
writer.TryComplete().ShouldBeTrue();
|
||||
var finished = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(2)));
|
||||
finished.ShouldBe(tcs.Task);
|
||||
ran.ShouldBe(4);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,16 @@ public sealed class SignalHandlerTests : IDisposable
|
||||
SignalHandler.CommandToUnixSignal(ServerCommand.LameDuckMode).ShouldBe(UnixSignal.SigUsr2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CommandToSignal_ShouldMatchCommandToUnixSignal()
|
||||
{
|
||||
foreach (var command in Enum.GetValues<ServerCommand>())
|
||||
{
|
||||
SignalHandler.CommandToSignal(command)
|
||||
.ShouldBe(SignalHandler.CommandToUnixSignal(command));
|
||||
}
|
||||
}
|
||||
|
||||
[Fact] // T:3155
|
||||
public void SetProcessName_ShouldNotThrow()
|
||||
{
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
// Copyright 2012-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0
|
||||
|
||||
using Shouldly;
|
||||
using ZB.MOM.NatsNet.Server;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
|
||||
|
||||
public sealed class DiskAvailabilityTests
|
||||
{
|
||||
private const long JetStreamMaxStoreDefault = 1L * 1024 * 1024 * 1024 * 1024;
|
||||
|
||||
[Fact]
|
||||
public void DiskAvailable_MissingDirectory_ShouldCreateDirectory()
|
||||
{
|
||||
var root = Path.Combine(Path.GetTempPath(), $"disk-avail-{Guid.NewGuid():N}");
|
||||
var target = Path.Combine(root, "nested");
|
||||
try
|
||||
{
|
||||
Directory.Exists(target).ShouldBeFalse();
|
||||
|
||||
var available = DiskAvailability.DiskAvailable(target);
|
||||
|
||||
Directory.Exists(target).ShouldBeTrue();
|
||||
available.ShouldBeGreaterThan(0L);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (Directory.Exists(root))
|
||||
Directory.Delete(root, recursive: true);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DiskAvailable_InvalidPath_ShouldReturnFallback()
|
||||
{
|
||||
var available = DiskAvailability.DiskAvailable("\0");
|
||||
available.ShouldBe(JetStreamMaxStoreDefault);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Check_ShouldUseDiskAvailableThreshold()
|
||||
{
|
||||
var root = Path.Combine(Path.GetTempPath(), $"disk-check-{Guid.NewGuid():N}");
|
||||
try
|
||||
{
|
||||
var available = DiskAvailability.DiskAvailable(root);
|
||||
|
||||
DiskAvailability.Check(root, Math.Max(0, available - 1)).ShouldBeTrue();
|
||||
DiskAvailability.Check(root, available + 1).ShouldBeFalse();
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (Directory.Exists(root))
|
||||
Directory.Delete(root, recursive: true);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -35,4 +35,82 @@ public sealed class NatsConsumerTests
|
||||
consumer.Stop();
|
||||
consumer.IsLeader().ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact] // T:1364
|
||||
public void SortingConsumerPullRequests_ShouldSucceed()
|
||||
{
|
||||
var q = new WaitQueue(max: 100);
|
||||
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "1a", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "2a", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "1b", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "2b", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "1c", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "3a", PriorityGroup = new PriorityGroup { Priority = 3 }, N = 1 })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "2c", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 })
|
||||
.ShouldBeTrue();
|
||||
|
||||
var expectedOrder = new[]
|
||||
{
|
||||
("1a", 1),
|
||||
("1b", 1),
|
||||
("1c", 1),
|
||||
("2a", 2),
|
||||
("2b", 2),
|
||||
("2c", 2),
|
||||
("3a", 3),
|
||||
};
|
||||
|
||||
q.Len.ShouldBe(expectedOrder.Length);
|
||||
foreach (var (reply, priority) in expectedOrder)
|
||||
{
|
||||
var current = q.Peek();
|
||||
current.ShouldNotBeNull();
|
||||
current!.Reply.ShouldBe(reply);
|
||||
current.PriorityGroup.ShouldNotBeNull();
|
||||
current.PriorityGroup!.Priority.ShouldBe(priority);
|
||||
q.RemoveCurrent();
|
||||
}
|
||||
|
||||
q.IsEmpty().ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact] // T:1365
|
||||
public void WaitQueuePopAndRequeue_ShouldSucceed()
|
||||
{
|
||||
var q = new WaitQueue(max: 100);
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "1a", N = 2, PriorityGroup = new PriorityGroup { Priority = 1 } })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "1b", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "2a", N = 3, PriorityGroup = new PriorityGroup { Priority = 2 } })
|
||||
.ShouldBeTrue();
|
||||
|
||||
var wr = q.PopAndRequeue();
|
||||
wr.ShouldNotBeNull();
|
||||
wr!.Reply.ShouldBe("1a");
|
||||
wr.N.ShouldBe(1);
|
||||
q.Len.ShouldBe(3);
|
||||
|
||||
wr = q.PopAndRequeue();
|
||||
wr.ShouldNotBeNull();
|
||||
wr!.Reply.ShouldBe("1b");
|
||||
wr.N.ShouldBe(0);
|
||||
q.Len.ShouldBe(2);
|
||||
|
||||
wr = q.PopAndRequeue();
|
||||
wr.ShouldNotBeNull();
|
||||
wr!.Reply.ShouldBe("1a");
|
||||
wr.N.ShouldBe(0);
|
||||
q.Len.ShouldBe(1);
|
||||
|
||||
q.Peek()!.Reply.ShouldBe("2a");
|
||||
q.Peek()!.N.ShouldBe(3);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,8 +24,28 @@ public sealed class WaitQueueTests
|
||||
q.Peek()!.Subject.ShouldBe("A");
|
||||
|
||||
q.Pop()!.Subject.ShouldBe("A");
|
||||
q.Pop()!.Subject.ShouldBe("B");
|
||||
q.Len.ShouldBe(1);
|
||||
|
||||
q.Pop()!.Subject.ShouldBe("B");
|
||||
q.Len.ShouldBe(0);
|
||||
q.IsFull(1).ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AddPrioritized_AndCycle_ShouldPreserveStableOrder()
|
||||
{
|
||||
var q = new WaitQueue(max: 10);
|
||||
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "2a", N = 1, PriorityGroup = new PriorityGroup { Priority = 2 } })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "1a", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } })
|
||||
.ShouldBeTrue();
|
||||
q.AddPrioritized(new WaitingRequest { Reply = "1b", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } })
|
||||
.ShouldBeTrue();
|
||||
|
||||
q.Peek()!.Reply.ShouldBe("1a");
|
||||
q.Cycle();
|
||||
q.Peek()!.Reply.ShouldBe("1b");
|
||||
}
|
||||
}
|
||||
|
||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
@@ -1,6 +1,6 @@
|
||||
# NATS .NET Porting Status Report
|
||||
|
||||
Generated: 2026-02-27 13:56:27 UTC
|
||||
Generated: 2026-02-27 15:27:06 UTC
|
||||
|
||||
## Modules (12 total)
|
||||
|
||||
@@ -12,17 +12,18 @@ Generated: 2026-02-27 13:56:27 UTC
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2461 |
|
||||
| n_a | 18 |
|
||||
| verified | 1194 |
|
||||
| deferred | 2377 |
|
||||
| n_a | 24 |
|
||||
| stub | 1 |
|
||||
| verified | 1271 |
|
||||
|
||||
## Unit Tests (3257 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2662 |
|
||||
| deferred | 2660 |
|
||||
| n_a | 187 |
|
||||
| verified | 408 |
|
||||
| verified | 410 |
|
||||
|
||||
## Library Mappings (36 total)
|
||||
|
||||
@@ -33,4 +34,4 @@ Generated: 2026-02-27 13:56:27 UTC
|
||||
|
||||
## Overall Progress
|
||||
|
||||
**1819/6942 items complete (26.2%)**
|
||||
**1904/6942 items complete (27.4%)**
|
||||
|
||||
37
reports/report_4e96fb2.md
Normal file
37
reports/report_4e96fb2.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# NATS .NET Porting Status Report
|
||||
|
||||
Generated: 2026-02-27 15:04:33 UTC
|
||||
|
||||
## Modules (12 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| verified | 12 |
|
||||
|
||||
## Features (3673 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2397 |
|
||||
| n_a | 18 |
|
||||
| stub | 1 |
|
||||
| verified | 1257 |
|
||||
|
||||
## Unit Tests (3257 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2660 |
|
||||
| n_a | 187 |
|
||||
| verified | 410 |
|
||||
|
||||
## Library Mappings (36 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| mapped | 36 |
|
||||
|
||||
|
||||
## Overall Progress
|
||||
|
||||
**1884/6942 items complete (27.1%)**
|
||||
36
reports/report_8849265.md
Normal file
36
reports/report_8849265.md
Normal file
@@ -0,0 +1,36 @@
|
||||
# NATS .NET Porting Status Report
|
||||
|
||||
Generated: 2026-02-27 14:58:38 UTC
|
||||
|
||||
## Modules (12 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| verified | 12 |
|
||||
|
||||
## Features (3673 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2440 |
|
||||
| n_a | 18 |
|
||||
| verified | 1215 |
|
||||
|
||||
## Unit Tests (3257 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2660 |
|
||||
| n_a | 187 |
|
||||
| verified | 410 |
|
||||
|
||||
## Library Mappings (36 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| mapped | 36 |
|
||||
|
||||
|
||||
## Overall Progress
|
||||
|
||||
**1842/6942 items complete (26.5%)**
|
||||
36
reports/report_ae0a553.md
Normal file
36
reports/report_ae0a553.md
Normal file
@@ -0,0 +1,36 @@
|
||||
# NATS .NET Porting Status Report
|
||||
|
||||
Generated: 2026-02-27 14:59:29 UTC
|
||||
|
||||
## Modules (12 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| verified | 12 |
|
||||
|
||||
## Features (3673 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2440 |
|
||||
| n_a | 18 |
|
||||
| verified | 1215 |
|
||||
|
||||
## Unit Tests (3257 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2660 |
|
||||
| n_a | 187 |
|
||||
| verified | 410 |
|
||||
|
||||
## Library Mappings (36 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| mapped | 36 |
|
||||
|
||||
|
||||
## Overall Progress
|
||||
|
||||
**1842/6942 items complete (26.5%)**
|
||||
37
reports/report_c0aaae9.md
Normal file
37
reports/report_c0aaae9.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# NATS .NET Porting Status Report
|
||||
|
||||
Generated: 2026-02-27 15:27:06 UTC
|
||||
|
||||
## Modules (12 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| verified | 12 |
|
||||
|
||||
## Features (3673 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2377 |
|
||||
| n_a | 24 |
|
||||
| stub | 1 |
|
||||
| verified | 1271 |
|
||||
|
||||
## Unit Tests (3257 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2660 |
|
||||
| n_a | 187 |
|
||||
| verified | 410 |
|
||||
|
||||
## Library Mappings (36 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| mapped | 36 |
|
||||
|
||||
|
||||
## Overall Progress
|
||||
|
||||
**1904/6942 items complete (27.4%)**
|
||||
@@ -256,6 +256,10 @@ func (a *Analyzer) parseTestFile(filePath string) ([]TestFunc, []ImportInfo, int
|
||||
}
|
||||
|
||||
test.FeatureName = a.inferFeatureName(name)
|
||||
test.BestFeatureIdx = -1
|
||||
if fn.Body != nil {
|
||||
test.Calls = a.extractCalls(fn.Body)
|
||||
}
|
||||
tests = append(tests, test)
|
||||
}
|
||||
|
||||
@@ -331,6 +335,210 @@ func (a *Analyzer) inferFeatureName(testName string) string {
|
||||
return name
|
||||
}
|
||||
|
||||
// extractCalls walks an AST block statement and extracts all function/method calls.
|
||||
func (a *Analyzer) extractCalls(body *ast.BlockStmt) []CallInfo {
|
||||
seen := make(map[string]bool)
|
||||
var calls []CallInfo
|
||||
|
||||
ast.Inspect(body, func(n ast.Node) bool {
|
||||
callExpr, ok := n.(*ast.CallExpr)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
var ci CallInfo
|
||||
switch fun := callExpr.Fun.(type) {
|
||||
case *ast.Ident:
|
||||
ci = CallInfo{FuncName: fun.Name}
|
||||
case *ast.SelectorExpr:
|
||||
ci = CallInfo{
|
||||
RecvOrPkg: extractIdent(fun.X),
|
||||
MethodName: fun.Sel.Name,
|
||||
IsSelector: true,
|
||||
}
|
||||
default:
|
||||
return true
|
||||
}
|
||||
|
||||
key := ci.callKey()
|
||||
if !seen[key] && !isFilteredCall(ci) {
|
||||
seen[key] = true
|
||||
calls = append(calls, ci)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
return calls
|
||||
}
|
||||
|
||||
// extractIdent extracts an identifier name from an expression (handles X in X.Y).
|
||||
func extractIdent(expr ast.Expr) string {
|
||||
switch e := expr.(type) {
|
||||
case *ast.Ident:
|
||||
return e.Name
|
||||
case *ast.SelectorExpr:
|
||||
return extractIdent(e.X) + "." + e.Sel.Name
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
// isFilteredCall returns true if a call should be excluded from feature matching.
|
||||
func isFilteredCall(c CallInfo) bool {
|
||||
if c.IsSelector {
|
||||
recv := c.RecvOrPkg
|
||||
// testing.T/B methods
|
||||
if recv == "t" || recv == "b" || recv == "tb" {
|
||||
return true
|
||||
}
|
||||
// stdlib packages
|
||||
if stdlibPkgs[recv] {
|
||||
return true
|
||||
}
|
||||
// NATS client libs
|
||||
if recv == "nats" || recv == "nuid" || recv == "nkeys" || recv == "jwt" {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Go builtins
|
||||
name := c.FuncName
|
||||
if builtinFuncs[name] {
|
||||
return true
|
||||
}
|
||||
|
||||
// Test assertion helpers
|
||||
lower := strings.ToLower(name)
|
||||
if strings.HasPrefix(name, "require_") {
|
||||
return true
|
||||
}
|
||||
for _, prefix := range []string{"check", "verify", "assert", "expect"} {
|
||||
if strings.HasPrefix(lower, prefix) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// featureRef identifies a feature within the analysis result.
|
||||
type featureRef struct {
|
||||
moduleIdx int
|
||||
featureIdx int
|
||||
goFile string
|
||||
goClass string
|
||||
}
|
||||
|
||||
// resolveCallGraph matches test calls against known features across all modules.
|
||||
func resolveCallGraph(result *AnalysisResult) {
|
||||
// Build method index: go_method name → list of feature refs
|
||||
methodIndex := make(map[string][]featureRef)
|
||||
for mi, mod := range result.Modules {
|
||||
for fi, feat := range mod.Features {
|
||||
ref := featureRef{
|
||||
moduleIdx: mi,
|
||||
featureIdx: fi,
|
||||
goFile: feat.GoFile,
|
||||
goClass: feat.GoClass,
|
||||
}
|
||||
methodIndex[feat.GoMethod] = append(methodIndex[feat.GoMethod], ref)
|
||||
}
|
||||
}
|
||||
|
||||
// For each test, resolve calls to features
|
||||
for mi := range result.Modules {
|
||||
mod := &result.Modules[mi]
|
||||
for ti := range mod.Tests {
|
||||
test := &mod.Tests[ti]
|
||||
seen := make(map[int]bool) // feature indices already linked
|
||||
var linked []int
|
||||
|
||||
testFileBase := sourceFileBase(test.GoFile)
|
||||
|
||||
for _, call := range test.Calls {
|
||||
// Look up the method name
|
||||
name := call.MethodName
|
||||
if !call.IsSelector {
|
||||
name = call.FuncName
|
||||
}
|
||||
|
||||
candidates := methodIndex[name]
|
||||
if len(candidates) == 0 {
|
||||
continue
|
||||
}
|
||||
// Ambiguity threshold: skip very common method names
|
||||
if len(candidates) > 10 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter to same module
|
||||
var sameModule []featureRef
|
||||
for _, ref := range candidates {
|
||||
if ref.moduleIdx == mi {
|
||||
sameModule = append(sameModule, ref)
|
||||
}
|
||||
}
|
||||
if len(sameModule) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, ref := range sameModule {
|
||||
if !seen[ref.featureIdx] {
|
||||
seen[ref.featureIdx] = true
|
||||
linked = append(linked, ref.featureIdx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test.LinkedFeatures = linked
|
||||
|
||||
// Set BestFeatureIdx using priority:
|
||||
// (a) existing inferFeatureName match
|
||||
// (b) same-file-base match
|
||||
// (c) first remaining candidate
|
||||
if test.BestFeatureIdx < 0 && len(linked) > 0 {
|
||||
// Try same-file-base match first
|
||||
for _, fi := range linked {
|
||||
featFileBase := sourceFileBase(mod.Features[fi].GoFile)
|
||||
if featFileBase == testFileBase {
|
||||
test.BestFeatureIdx = fi
|
||||
break
|
||||
}
|
||||
}
|
||||
// Fall back to first candidate
|
||||
if test.BestFeatureIdx < 0 {
|
||||
test.BestFeatureIdx = linked[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sourceFileBase strips _test.go suffix and path to get the base file name.
|
||||
func sourceFileBase(goFile string) string {
|
||||
base := filepath.Base(goFile)
|
||||
base = strings.TrimSuffix(base, "_test.go")
|
||||
base = strings.TrimSuffix(base, ".go")
|
||||
return base
|
||||
}
|
||||
|
||||
var stdlibPkgs = map[string]bool{
|
||||
"fmt": true, "time": true, "strings": true, "bytes": true, "errors": true,
|
||||
"os": true, "math": true, "sort": true, "reflect": true, "sync": true,
|
||||
"context": true, "io": true, "filepath": true, "strconv": true,
|
||||
"encoding": true, "json": true, "binary": true, "hex": true, "rand": true,
|
||||
"runtime": true, "atomic": true, "slices": true, "testing": true,
|
||||
"net": true, "bufio": true, "crypto": true, "log": true, "regexp": true,
|
||||
"unicode": true, "http": true, "url": true,
|
||||
}
|
||||
|
||||
var builtinFuncs = map[string]bool{
|
||||
"make": true, "append": true, "len": true, "cap": true, "close": true,
|
||||
"delete": true, "panic": true, "recover": true, "print": true,
|
||||
"println": true, "copy": true, "new": true,
|
||||
}
|
||||
|
||||
// isStdlib checks if an import path is a Go standard library package.
|
||||
func isStdlib(importPath string) bool {
|
||||
firstSlash := strings.Index(importPath, "/")
|
||||
|
||||
@@ -11,28 +11,47 @@ func main() {
|
||||
sourceDir := flag.String("source", "", "Path to Go source root (e.g., ../../golang/nats-server)")
|
||||
dbPath := flag.String("db", "", "Path to SQLite database file (e.g., ../../porting.db)")
|
||||
schemaPath := flag.String("schema", "", "Path to SQL schema file (e.g., ../../porting-schema.sql)")
|
||||
mode := flag.String("mode", "full", "Analysis mode: 'full' (default) or 'call-graph' (incremental)")
|
||||
flag.Parse()
|
||||
|
||||
if *sourceDir == "" || *dbPath == "" || *schemaPath == "" {
|
||||
fmt.Fprintf(os.Stderr, "Usage: go-analyzer --source <path> --db <path> --schema <path>\n")
|
||||
if *sourceDir == "" || *dbPath == "" {
|
||||
fmt.Fprintf(os.Stderr, "Usage: go-analyzer --source <path> --db <path> [--schema <path>] [--mode full|call-graph]\n")
|
||||
flag.PrintDefaults()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
switch *mode {
|
||||
case "full":
|
||||
runFull(*sourceDir, *dbPath, *schemaPath)
|
||||
case "call-graph":
|
||||
runCallGraph(*sourceDir, *dbPath)
|
||||
default:
|
||||
log.Fatalf("Unknown mode %q: must be 'full' or 'call-graph'", *mode)
|
||||
}
|
||||
}
|
||||
|
||||
func runFull(sourceDir, dbPath, schemaPath string) {
|
||||
if schemaPath == "" {
|
||||
log.Fatal("--schema is required for full mode")
|
||||
}
|
||||
|
||||
// Open DB and apply schema
|
||||
db, err := OpenDB(*dbPath, *schemaPath)
|
||||
db, err := OpenDB(dbPath, schemaPath)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Run analysis
|
||||
analyzer := NewAnalyzer(*sourceDir)
|
||||
analyzer := NewAnalyzer(sourceDir)
|
||||
result, err := analyzer.Analyze()
|
||||
if err != nil {
|
||||
log.Fatalf("Analysis failed: %v", err)
|
||||
}
|
||||
|
||||
// Resolve call graph before writing
|
||||
resolveCallGraph(result)
|
||||
|
||||
// Write to DB
|
||||
writer := NewDBWriter(db)
|
||||
if err := writer.WriteAll(result); err != nil {
|
||||
@@ -46,3 +65,35 @@ func main() {
|
||||
fmt.Printf(" Dependencies: %d\n", len(result.Dependencies))
|
||||
fmt.Printf(" Imports: %d\n", len(result.Imports))
|
||||
}
|
||||
|
||||
func runCallGraph(sourceDir, dbPath string) {
|
||||
// Open existing DB without schema
|
||||
db, err := OpenDBNoSchema(dbPath)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open database: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Run analysis (parse Go source)
|
||||
analyzer := NewAnalyzer(sourceDir)
|
||||
result, err := analyzer.Analyze()
|
||||
if err != nil {
|
||||
log.Fatalf("Analysis failed: %v", err)
|
||||
}
|
||||
|
||||
// Resolve call graph
|
||||
resolveCallGraph(result)
|
||||
|
||||
// Update DB incrementally
|
||||
writer := NewDBWriter(db)
|
||||
stats, err := writer.UpdateCallGraph(result)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to update call graph: %v", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Call graph analysis complete:\n")
|
||||
fmt.Printf(" Tests analyzed: %d\n", stats.TestsAnalyzed)
|
||||
fmt.Printf(" Tests linked: %d\n", stats.TestsLinked)
|
||||
fmt.Printf(" Dependency rows: %d\n", stats.DependencyRows)
|
||||
fmt.Printf(" Feature IDs set: %d\n", stats.FeatureIDsSet)
|
||||
}
|
||||
|
||||
@@ -152,3 +152,176 @@ func (w *DBWriter) insertLibrary(tx *sql.Tx, imp *ImportInfo) error {
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// OpenDBNoSchema opens an existing SQLite database without applying schema.
|
||||
// It verifies that the required tables exist.
|
||||
func OpenDBNoSchema(dbPath string) (*sql.DB, error) {
|
||||
db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_foreign_keys=ON")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening database: %w", err)
|
||||
}
|
||||
|
||||
// Verify required tables exist
|
||||
for _, table := range []string{"modules", "features", "unit_tests", "dependencies"} {
|
||||
var name string
|
||||
err := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name=?", table).Scan(&name)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("required table %q not found: %w", table, err)
|
||||
}
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// CallGraphStats holds summary statistics from a call-graph update.
|
||||
type CallGraphStats struct {
|
||||
TestsAnalyzed int
|
||||
TestsLinked int
|
||||
DependencyRows int
|
||||
FeatureIDsSet int
|
||||
}
|
||||
|
||||
// UpdateCallGraph writes call-graph analysis results to the database incrementally.
|
||||
func (w *DBWriter) UpdateCallGraph(result *AnalysisResult) (*CallGraphStats, error) {
|
||||
stats := &CallGraphStats{}
|
||||
|
||||
// Load module name→ID mapping
|
||||
moduleIDs := make(map[string]int64)
|
||||
rows, err := w.db.Query("SELECT id, name FROM modules")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying modules: %w", err)
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var name string
|
||||
if err := rows.Scan(&id, &name); err != nil {
|
||||
rows.Close()
|
||||
return nil, err
|
||||
}
|
||||
moduleIDs[name] = id
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
// Load feature DB IDs: "module_name:go_method:go_class" → id
|
||||
type featureKey struct {
|
||||
moduleName string
|
||||
goMethod string
|
||||
goClass string
|
||||
}
|
||||
featureDBIDs := make(map[featureKey]int64)
|
||||
rows, err = w.db.Query(`
|
||||
SELECT f.id, m.name, f.go_method, COALESCE(f.go_class, '')
|
||||
FROM features f
|
||||
JOIN modules m ON f.module_id = m.id
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying features: %w", err)
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var modName, goMethod, goClass string
|
||||
if err := rows.Scan(&id, &modName, &goMethod, &goClass); err != nil {
|
||||
rows.Close()
|
||||
return nil, err
|
||||
}
|
||||
featureDBIDs[featureKey{modName, goMethod, goClass}] = id
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
// Load test DB IDs: "module_name:go_method" → id
|
||||
testDBIDs := make(map[string]int64)
|
||||
rows, err = w.db.Query(`
|
||||
SELECT ut.id, m.name, ut.go_method
|
||||
FROM unit_tests ut
|
||||
JOIN modules m ON ut.module_id = m.id
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying unit_tests: %w", err)
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var modName, goMethod string
|
||||
if err := rows.Scan(&id, &modName, &goMethod); err != nil {
|
||||
rows.Close()
|
||||
return nil, err
|
||||
}
|
||||
testDBIDs[modName+":"+goMethod] = id
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
// Begin transaction
|
||||
tx, err := w.db.Begin()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("beginning transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Clear old call-graph data
|
||||
if _, err := tx.Exec("DELETE FROM dependencies WHERE source_type='unit_test' AND dependency_kind='calls'"); err != nil {
|
||||
return nil, fmt.Errorf("clearing old dependencies: %w", err)
|
||||
}
|
||||
if _, err := tx.Exec("UPDATE unit_tests SET feature_id = NULL"); err != nil {
|
||||
return nil, fmt.Errorf("clearing old feature_ids: %w", err)
|
||||
}
|
||||
|
||||
// Prepare statements
|
||||
insertDep, err := tx.Prepare("INSERT OR IGNORE INTO dependencies (source_type, source_id, target_type, target_id, dependency_kind) VALUES ('unit_test', ?, 'feature', ?, 'calls')")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("preparing insert dependency: %w", err)
|
||||
}
|
||||
defer insertDep.Close()
|
||||
|
||||
updateFeatureID, err := tx.Prepare("UPDATE unit_tests SET feature_id = ? WHERE id = ?")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("preparing update feature_id: %w", err)
|
||||
}
|
||||
defer updateFeatureID.Close()
|
||||
|
||||
// Process each module's tests
|
||||
for _, mod := range result.Modules {
|
||||
for _, test := range mod.Tests {
|
||||
stats.TestsAnalyzed++
|
||||
|
||||
testDBID, ok := testDBIDs[mod.Name+":"+test.GoMethod]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Insert dependency rows for linked features
|
||||
if len(test.LinkedFeatures) > 0 {
|
||||
stats.TestsLinked++
|
||||
}
|
||||
for _, fi := range test.LinkedFeatures {
|
||||
feat := mod.Features[fi]
|
||||
featDBID, ok := featureDBIDs[featureKey{mod.Name, feat.GoMethod, feat.GoClass}]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if _, err := insertDep.Exec(testDBID, featDBID); err != nil {
|
||||
return nil, fmt.Errorf("inserting dependency for test %s: %w", test.GoMethod, err)
|
||||
}
|
||||
stats.DependencyRows++
|
||||
}
|
||||
|
||||
// Set feature_id for best match
|
||||
if test.BestFeatureIdx >= 0 {
|
||||
feat := mod.Features[test.BestFeatureIdx]
|
||||
featDBID, ok := featureDBIDs[featureKey{mod.Name, feat.GoMethod, feat.GoClass}]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if _, err := updateFeatureID.Exec(featDBID, testDBID); err != nil {
|
||||
return nil, fmt.Errorf("updating feature_id for test %s: %w", test.GoMethod, err)
|
||||
}
|
||||
stats.FeatureIDsSet++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return nil, fmt.Errorf("committing transaction: %w", err)
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
@@ -58,6 +58,28 @@ type TestFunc struct {
|
||||
GoLineCount int
|
||||
// FeatureName links this test to a feature by naming convention
|
||||
FeatureName string
|
||||
// Calls holds raw function/method calls extracted from the test body AST
|
||||
Calls []CallInfo
|
||||
// LinkedFeatures holds indices into the parent module's Features slice
|
||||
LinkedFeatures []int
|
||||
// BestFeatureIdx is the primary feature match index (-1 = none)
|
||||
BestFeatureIdx int
|
||||
}
|
||||
|
||||
// CallInfo represents a function or method call extracted from a test body.
|
||||
type CallInfo struct {
|
||||
FuncName string // direct call name: "newMemStore"
|
||||
RecvOrPkg string // selector receiver/pkg: "ms", "fmt", "t"
|
||||
MethodName string // selector method: "StoreMsg", "Fatalf"
|
||||
IsSelector bool // true for X.Y() form
|
||||
}
|
||||
|
||||
// callKey returns a deduplication key for this call.
|
||||
func (c CallInfo) callKey() string {
|
||||
if c.IsSelector {
|
||||
return c.RecvOrPkg + "." + c.MethodName
|
||||
}
|
||||
return c.FuncName
|
||||
}
|
||||
|
||||
// Dependency represents a call relationship between two items.
|
||||
|
||||
Reference in New Issue
Block a user