feat: implement /subz endpoint with account filter, test subject, and pagination
This commit is contained in:
@@ -15,6 +15,7 @@ public sealed class MonitorServer : IAsyncDisposable
|
|||||||
private readonly ILogger<MonitorServer> _logger;
|
private readonly ILogger<MonitorServer> _logger;
|
||||||
private readonly VarzHandler _varzHandler;
|
private readonly VarzHandler _varzHandler;
|
||||||
private readonly ConnzHandler _connzHandler;
|
private readonly ConnzHandler _connzHandler;
|
||||||
|
private readonly SubszHandler _subszHandler;
|
||||||
|
|
||||||
public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory)
|
public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory)
|
||||||
{
|
{
|
||||||
@@ -29,6 +30,7 @@ public sealed class MonitorServer : IAsyncDisposable
|
|||||||
|
|
||||||
_varzHandler = new VarzHandler(server, options);
|
_varzHandler = new VarzHandler(server, options);
|
||||||
_connzHandler = new ConnzHandler(server);
|
_connzHandler = new ConnzHandler(server);
|
||||||
|
_subszHandler = new SubszHandler(server);
|
||||||
|
|
||||||
_app.MapGet(basePath + "/", () =>
|
_app.MapGet(basePath + "/", () =>
|
||||||
{
|
{
|
||||||
@@ -75,15 +77,15 @@ public sealed class MonitorServer : IAsyncDisposable
|
|||||||
stats.HttpReqStats.AddOrUpdate("/leafz", 1, (_, v) => v + 1);
|
stats.HttpReqStats.AddOrUpdate("/leafz", 1, (_, v) => v + 1);
|
||||||
return Results.Ok(new { });
|
return Results.Ok(new { });
|
||||||
});
|
});
|
||||||
_app.MapGet(basePath + "/subz", () =>
|
_app.MapGet(basePath + "/subz", (HttpContext ctx) =>
|
||||||
{
|
{
|
||||||
stats.HttpReqStats.AddOrUpdate("/subz", 1, (_, v) => v + 1);
|
stats.HttpReqStats.AddOrUpdate("/subz", 1, (_, v) => v + 1);
|
||||||
return Results.Ok(new { });
|
return Results.Ok(_subszHandler.HandleSubsz(ctx));
|
||||||
});
|
});
|
||||||
_app.MapGet(basePath + "/subscriptionsz", () =>
|
_app.MapGet(basePath + "/subscriptionsz", (HttpContext ctx) =>
|
||||||
{
|
{
|
||||||
stats.HttpReqStats.AddOrUpdate("/subscriptionsz", 1, (_, v) => v + 1);
|
stats.HttpReqStats.AddOrUpdate("/subscriptionsz", 1, (_, v) => v + 1);
|
||||||
return Results.Ok(new { });
|
return Results.Ok(_subszHandler.HandleSubsz(ctx));
|
||||||
});
|
});
|
||||||
_app.MapGet(basePath + "/accountz", () =>
|
_app.MapGet(basePath + "/accountz", () =>
|
||||||
{
|
{
|
||||||
|
|||||||
45
src/NATS.Server/Monitoring/Subsz.cs
Normal file
45
src/NATS.Server/Monitoring/Subsz.cs
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
using System.Text.Json.Serialization;
|
||||||
|
|
||||||
|
namespace NATS.Server.Monitoring;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscription information response. Corresponds to Go server/monitor.go Subsz struct.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class Subsz
|
||||||
|
{
|
||||||
|
[JsonPropertyName("server_id")]
|
||||||
|
public string Id { get; set; } = "";
|
||||||
|
|
||||||
|
[JsonPropertyName("now")]
|
||||||
|
public DateTime Now { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("num_subscriptions")]
|
||||||
|
public uint NumSubs { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("num_cache")]
|
||||||
|
public int NumCache { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("total")]
|
||||||
|
public int Total { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("offset")]
|
||||||
|
public int Offset { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("limit")]
|
||||||
|
public int Limit { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("subscriptions")]
|
||||||
|
public SubDetail[] Subs { get; set; } = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Options passed to Subsz() for filtering.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class SubszOptions
|
||||||
|
{
|
||||||
|
public int Offset { get; set; }
|
||||||
|
public int Limit { get; set; } = 1024;
|
||||||
|
public bool Subscriptions { get; set; }
|
||||||
|
public string Account { get; set; } = "";
|
||||||
|
public string Test { get; set; } = "";
|
||||||
|
}
|
||||||
93
src/NATS.Server/Monitoring/SubszHandler.cs
Normal file
93
src/NATS.Server/Monitoring/SubszHandler.cs
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
using Microsoft.AspNetCore.Http;
|
||||||
|
using NATS.Server.Subscriptions;
|
||||||
|
|
||||||
|
namespace NATS.Server.Monitoring;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handles /subz endpoint requests, returning subscription information.
|
||||||
|
/// Corresponds to Go server/monitor.go handleSubsz.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class SubszHandler(NatsServer server)
|
||||||
|
{
|
||||||
|
public Subsz HandleSubsz(HttpContext ctx)
|
||||||
|
{
|
||||||
|
var opts = ParseQueryParams(ctx);
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
|
|
||||||
|
// Collect subscriptions from all accounts (or filtered)
|
||||||
|
var allSubs = new List<Subscription>();
|
||||||
|
foreach (var account in server.GetAccounts())
|
||||||
|
{
|
||||||
|
if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account)
|
||||||
|
continue;
|
||||||
|
allSubs.AddRange(account.SubList.GetAllSubscriptions());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter by test subject if provided
|
||||||
|
if (!string.IsNullOrEmpty(opts.Test))
|
||||||
|
{
|
||||||
|
allSubs = allSubs.Where(s => SubjectMatch.MatchLiteral(opts.Test, s.Subject)).ToList();
|
||||||
|
}
|
||||||
|
|
||||||
|
var total = allSubs.Count;
|
||||||
|
var numSubs = server.GetAccounts()
|
||||||
|
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
|
||||||
|
.Aggregate(0u, (sum, a) => sum + a.SubList.Count);
|
||||||
|
var numCache = server.GetAccounts()
|
||||||
|
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
|
||||||
|
.Sum(a => a.SubList.CacheCount);
|
||||||
|
|
||||||
|
SubDetail[] details = [];
|
||||||
|
if (opts.Subscriptions)
|
||||||
|
{
|
||||||
|
details = allSubs
|
||||||
|
.Skip(opts.Offset)
|
||||||
|
.Take(opts.Limit)
|
||||||
|
.Select(s => new SubDetail
|
||||||
|
{
|
||||||
|
Subject = s.Subject,
|
||||||
|
Queue = s.Queue ?? "",
|
||||||
|
Sid = s.Sid,
|
||||||
|
Msgs = Interlocked.Read(ref s.MessageCount),
|
||||||
|
Max = s.MaxMessages,
|
||||||
|
Cid = s.Client?.Id ?? 0,
|
||||||
|
})
|
||||||
|
.ToArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Subsz
|
||||||
|
{
|
||||||
|
Id = server.ServerId,
|
||||||
|
Now = now,
|
||||||
|
NumSubs = numSubs,
|
||||||
|
NumCache = numCache,
|
||||||
|
Total = total,
|
||||||
|
Offset = opts.Offset,
|
||||||
|
Limit = opts.Limit,
|
||||||
|
Subs = details,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SubszOptions ParseQueryParams(HttpContext ctx)
|
||||||
|
{
|
||||||
|
var q = ctx.Request.Query;
|
||||||
|
var opts = new SubszOptions();
|
||||||
|
|
||||||
|
if (q.TryGetValue("subs", out var subs))
|
||||||
|
opts.Subscriptions = subs == "true" || subs == "1" || subs == "detail";
|
||||||
|
|
||||||
|
if (q.TryGetValue("offset", out var offset) && int.TryParse(offset, out var o))
|
||||||
|
opts.Offset = o;
|
||||||
|
|
||||||
|
if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l))
|
||||||
|
opts.Limit = l;
|
||||||
|
|
||||||
|
if (q.TryGetValue("acc", out var acc))
|
||||||
|
opts.Account = acc.ToString();
|
||||||
|
|
||||||
|
if (q.TryGetValue("test", out var test))
|
||||||
|
opts.Test = test.ToString();
|
||||||
|
|
||||||
|
return opts;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -64,6 +64,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
public Action? ReOpenLogFile { get; set; }
|
public Action? ReOpenLogFile { get; set; }
|
||||||
public IEnumerable<NatsClient> GetClients() => _clients.Values;
|
public IEnumerable<NatsClient> GetClients() => _clients.Values;
|
||||||
|
|
||||||
|
public IEnumerable<Auth.Account> GetAccounts() => _accounts.Values;
|
||||||
|
|
||||||
public Task WaitForReadyAsync() => _listeningStarted.Task;
|
public Task WaitForReadyAsync() => _listeningStarted.Task;
|
||||||
|
|
||||||
public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult();
|
public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult();
|
||||||
|
|||||||
@@ -33,6 +33,62 @@ public sealed class SubList : IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns all subscriptions in the trie. For monitoring only.
|
||||||
|
/// </summary>
|
||||||
|
public List<Subscription> GetAllSubscriptions()
|
||||||
|
{
|
||||||
|
_lock.EnterReadLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var result = new List<Subscription>();
|
||||||
|
CollectAll(_root, result);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_lock.ExitReadLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void CollectAll(TrieLevel level, List<Subscription> result)
|
||||||
|
{
|
||||||
|
foreach (var (_, node) in level.Nodes)
|
||||||
|
{
|
||||||
|
foreach (var sub in node.PlainSubs) result.Add(sub);
|
||||||
|
foreach (var (_, qset) in node.QueueSubs)
|
||||||
|
foreach (var sub in qset) result.Add(sub);
|
||||||
|
if (node.Next != null) CollectAll(node.Next, result);
|
||||||
|
}
|
||||||
|
if (level.Pwc != null)
|
||||||
|
{
|
||||||
|
foreach (var sub in level.Pwc.PlainSubs) result.Add(sub);
|
||||||
|
foreach (var (_, qset) in level.Pwc.QueueSubs)
|
||||||
|
foreach (var sub in qset) result.Add(sub);
|
||||||
|
if (level.Pwc.Next != null) CollectAll(level.Pwc.Next, result);
|
||||||
|
}
|
||||||
|
if (level.Fwc != null)
|
||||||
|
{
|
||||||
|
foreach (var sub in level.Fwc.PlainSubs) result.Add(sub);
|
||||||
|
foreach (var (_, qset) in level.Fwc.QueueSubs)
|
||||||
|
foreach (var sub in qset) result.Add(sub);
|
||||||
|
if (level.Fwc.Next != null) CollectAll(level.Fwc.Next, result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns the current number of entries in the cache.
|
||||||
|
/// </summary>
|
||||||
|
public int CacheCount
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
_lock.EnterReadLock();
|
||||||
|
try { return _cache?.Count ?? 0; }
|
||||||
|
finally { _lock.ExitReadLock(); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void Insert(Subscription sub)
|
public void Insert(Subscription sub)
|
||||||
{
|
{
|
||||||
var subject = sub.Subject;
|
var subject = sub.Subject;
|
||||||
|
|||||||
137
tests/NATS.Server.Tests/SubszTests.cs
Normal file
137
tests/NATS.Server.Tests/SubszTests.cs
Normal file
@@ -0,0 +1,137 @@
|
|||||||
|
using System.Net;
|
||||||
|
using System.Net.Http.Json;
|
||||||
|
using System.Net.Sockets;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NATS.Server.Monitoring;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class SubszTests : IAsyncLifetime
|
||||||
|
{
|
||||||
|
private readonly NatsServer _server;
|
||||||
|
private readonly int _natsPort;
|
||||||
|
private readonly int _monitorPort;
|
||||||
|
private readonly CancellationTokenSource _cts = new();
|
||||||
|
private readonly HttpClient _http = new();
|
||||||
|
|
||||||
|
public SubszTests()
|
||||||
|
{
|
||||||
|
_natsPort = GetFreePort();
|
||||||
|
_monitorPort = GetFreePort();
|
||||||
|
_server = new NatsServer(
|
||||||
|
new NatsOptions { Port = _natsPort, MonitorPort = _monitorPort },
|
||||||
|
NullLoggerFactory.Instance);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task InitializeAsync()
|
||||||
|
{
|
||||||
|
_ = _server.StartAsync(_cts.Token);
|
||||||
|
await _server.WaitForReadyAsync();
|
||||||
|
for (int i = 0; i < 50; i++)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var resp = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz");
|
||||||
|
if (resp.IsSuccessStatusCode) break;
|
||||||
|
}
|
||||||
|
catch (HttpRequestException) { }
|
||||||
|
await Task.Delay(50);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task DisposeAsync()
|
||||||
|
{
|
||||||
|
_http.Dispose();
|
||||||
|
await _cts.CancelAsync();
|
||||||
|
_server.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Subz_returns_empty_when_no_subscriptions()
|
||||||
|
{
|
||||||
|
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz");
|
||||||
|
response.StatusCode.ShouldBe(HttpStatusCode.OK);
|
||||||
|
|
||||||
|
var subz = await response.Content.ReadFromJsonAsync<Subsz>();
|
||||||
|
subz.ShouldNotBeNull();
|
||||||
|
subz.NumSubs.ShouldBe(0u);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Subz_returns_count_with_subscriptions()
|
||||||
|
{
|
||||||
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
|
||||||
|
using var stream = new NetworkStream(sock);
|
||||||
|
var buf = new byte[4096];
|
||||||
|
_ = await stream.ReadAsync(buf);
|
||||||
|
await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\nSUB bar 2\r\nSUB baz.* 3\r\n"u8.ToArray());
|
||||||
|
await Task.Delay(200);
|
||||||
|
|
||||||
|
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz");
|
||||||
|
var subz = await response.Content.ReadFromJsonAsync<Subsz>();
|
||||||
|
subz.ShouldNotBeNull();
|
||||||
|
subz.NumSubs.ShouldBeGreaterThanOrEqualTo(3u);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Subz_subs_true_returns_subscription_details()
|
||||||
|
{
|
||||||
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
|
||||||
|
using var stream = new NetworkStream(sock);
|
||||||
|
var buf = new byte[4096];
|
||||||
|
_ = await stream.ReadAsync(buf);
|
||||||
|
await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\n"u8.ToArray());
|
||||||
|
await Task.Delay(200);
|
||||||
|
|
||||||
|
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true");
|
||||||
|
var subz = await response.Content.ReadFromJsonAsync<Subsz>();
|
||||||
|
subz.ShouldNotBeNull();
|
||||||
|
subz.Subs.ShouldNotBeEmpty();
|
||||||
|
subz.Subs.ShouldContain(s => s.Subject == "foo");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Subz_test_subject_filters_matching_subs()
|
||||||
|
{
|
||||||
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
|
||||||
|
using var stream = new NetworkStream(sock);
|
||||||
|
var buf = new byte[4096];
|
||||||
|
_ = await stream.ReadAsync(buf);
|
||||||
|
await stream.WriteAsync("CONNECT {}\r\nSUB foo.* 1\r\nSUB bar 2\r\n"u8.ToArray());
|
||||||
|
await Task.Delay(200);
|
||||||
|
|
||||||
|
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true&test=foo.hello");
|
||||||
|
var subz = await response.Content.ReadFromJsonAsync<Subsz>();
|
||||||
|
subz.ShouldNotBeNull();
|
||||||
|
subz.Subs.ShouldContain(s => s.Subject == "foo.*");
|
||||||
|
subz.Subs.ShouldNotContain(s => s.Subject == "bar");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Subz_pagination()
|
||||||
|
{
|
||||||
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
|
||||||
|
using var stream = new NetworkStream(sock);
|
||||||
|
var buf = new byte[4096];
|
||||||
|
_ = await stream.ReadAsync(buf);
|
||||||
|
await stream.WriteAsync("CONNECT {}\r\nSUB a 1\r\nSUB b 2\r\nSUB c 3\r\n"u8.ToArray());
|
||||||
|
await Task.Delay(200);
|
||||||
|
|
||||||
|
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true&offset=0&limit=2");
|
||||||
|
var subz = await response.Content.ReadFromJsonAsync<Subsz>();
|
||||||
|
subz.ShouldNotBeNull();
|
||||||
|
subz.Subs.Length.ShouldBe(2);
|
||||||
|
subz.Total.ShouldBeGreaterThanOrEqualTo(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int GetFreePort()
|
||||||
|
{
|
||||||
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
||||||
|
return ((IPEndPoint)sock.LocalEndPoint!).Port;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user