diff --git a/src/NATS.Server/Monitoring/MonitorServer.cs b/src/NATS.Server/Monitoring/MonitorServer.cs index af23506..ad06f91 100644 --- a/src/NATS.Server/Monitoring/MonitorServer.cs +++ b/src/NATS.Server/Monitoring/MonitorServer.cs @@ -15,6 +15,7 @@ public sealed class MonitorServer : IAsyncDisposable private readonly ILogger _logger; private readonly VarzHandler _varzHandler; private readonly ConnzHandler _connzHandler; + private readonly SubszHandler _subszHandler; public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory) { @@ -29,6 +30,7 @@ public sealed class MonitorServer : IAsyncDisposable _varzHandler = new VarzHandler(server, options); _connzHandler = new ConnzHandler(server); + _subszHandler = new SubszHandler(server); _app.MapGet(basePath + "/", () => { @@ -75,15 +77,15 @@ public sealed class MonitorServer : IAsyncDisposable stats.HttpReqStats.AddOrUpdate("/leafz", 1, (_, v) => v + 1); return Results.Ok(new { }); }); - _app.MapGet(basePath + "/subz", () => + _app.MapGet(basePath + "/subz", (HttpContext ctx) => { stats.HttpReqStats.AddOrUpdate("/subz", 1, (_, v) => v + 1); - return Results.Ok(new { }); + return Results.Ok(_subszHandler.HandleSubsz(ctx)); }); - _app.MapGet(basePath + "/subscriptionsz", () => + _app.MapGet(basePath + "/subscriptionsz", (HttpContext ctx) => { stats.HttpReqStats.AddOrUpdate("/subscriptionsz", 1, (_, v) => v + 1); - return Results.Ok(new { }); + return Results.Ok(_subszHandler.HandleSubsz(ctx)); }); _app.MapGet(basePath + "/accountz", () => { diff --git a/src/NATS.Server/Monitoring/Subsz.cs b/src/NATS.Server/Monitoring/Subsz.cs new file mode 100644 index 0000000..472b728 --- /dev/null +++ b/src/NATS.Server/Monitoring/Subsz.cs @@ -0,0 +1,45 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Monitoring; + +/// +/// Subscription information response. Corresponds to Go server/monitor.go Subsz struct. +/// +public sealed class Subsz +{ + [JsonPropertyName("server_id")] + public string Id { get; set; } = ""; + + [JsonPropertyName("now")] + public DateTime Now { get; set; } + + [JsonPropertyName("num_subscriptions")] + public uint NumSubs { get; set; } + + [JsonPropertyName("num_cache")] + public int NumCache { get; set; } + + [JsonPropertyName("total")] + public int Total { get; set; } + + [JsonPropertyName("offset")] + public int Offset { get; set; } + + [JsonPropertyName("limit")] + public int Limit { get; set; } + + [JsonPropertyName("subscriptions")] + public SubDetail[] Subs { get; set; } = []; +} + +/// +/// Options passed to Subsz() for filtering. +/// +public sealed class SubszOptions +{ + public int Offset { get; set; } + public int Limit { get; set; } = 1024; + public bool Subscriptions { get; set; } + public string Account { get; set; } = ""; + public string Test { get; set; } = ""; +} diff --git a/src/NATS.Server/Monitoring/SubszHandler.cs b/src/NATS.Server/Monitoring/SubszHandler.cs new file mode 100644 index 0000000..1de4f97 --- /dev/null +++ b/src/NATS.Server/Monitoring/SubszHandler.cs @@ -0,0 +1,93 @@ +using Microsoft.AspNetCore.Http; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Monitoring; + +/// +/// Handles /subz endpoint requests, returning subscription information. +/// Corresponds to Go server/monitor.go handleSubsz. +/// +public sealed class SubszHandler(NatsServer server) +{ + public Subsz HandleSubsz(HttpContext ctx) + { + var opts = ParseQueryParams(ctx); + var now = DateTime.UtcNow; + + // Collect subscriptions from all accounts (or filtered) + var allSubs = new List(); + foreach (var account in server.GetAccounts()) + { + if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account) + continue; + allSubs.AddRange(account.SubList.GetAllSubscriptions()); + } + + // Filter by test subject if provided + if (!string.IsNullOrEmpty(opts.Test)) + { + allSubs = allSubs.Where(s => SubjectMatch.MatchLiteral(opts.Test, s.Subject)).ToList(); + } + + var total = allSubs.Count; + var numSubs = server.GetAccounts() + .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Aggregate(0u, (sum, a) => sum + a.SubList.Count); + var numCache = server.GetAccounts() + .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Sum(a => a.SubList.CacheCount); + + SubDetail[] details = []; + if (opts.Subscriptions) + { + details = allSubs + .Skip(opts.Offset) + .Take(opts.Limit) + .Select(s => new SubDetail + { + Subject = s.Subject, + Queue = s.Queue ?? "", + Sid = s.Sid, + Msgs = Interlocked.Read(ref s.MessageCount), + Max = s.MaxMessages, + Cid = s.Client?.Id ?? 0, + }) + .ToArray(); + } + + return new Subsz + { + Id = server.ServerId, + Now = now, + NumSubs = numSubs, + NumCache = numCache, + Total = total, + Offset = opts.Offset, + Limit = opts.Limit, + Subs = details, + }; + } + + private static SubszOptions ParseQueryParams(HttpContext ctx) + { + var q = ctx.Request.Query; + var opts = new SubszOptions(); + + if (q.TryGetValue("subs", out var subs)) + opts.Subscriptions = subs == "true" || subs == "1" || subs == "detail"; + + if (q.TryGetValue("offset", out var offset) && int.TryParse(offset, out var o)) + opts.Offset = o; + + if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l)) + opts.Limit = l; + + if (q.TryGetValue("acc", out var acc)) + opts.Account = acc.ToString(); + + if (q.TryGetValue("test", out var test)) + opts.Test = test.ToString(); + + return opts; + } +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index c80fa74..2eff8de 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -64,6 +64,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public Action? ReOpenLogFile { get; set; } public IEnumerable GetClients() => _clients.Values; + public IEnumerable GetAccounts() => _accounts.Values; + public Task WaitForReadyAsync() => _listeningStarted.Task; public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult(); diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index 047bef2..b999403 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -33,6 +33,62 @@ public sealed class SubList : IDisposable } } + /// + /// Returns all subscriptions in the trie. For monitoring only. + /// + public List GetAllSubscriptions() + { + _lock.EnterReadLock(); + try + { + var result = new List(); + CollectAll(_root, result); + return result; + } + finally + { + _lock.ExitReadLock(); + } + } + + private static void CollectAll(TrieLevel level, List result) + { + foreach (var (_, node) in level.Nodes) + { + foreach (var sub in node.PlainSubs) result.Add(sub); + foreach (var (_, qset) in node.QueueSubs) + foreach (var sub in qset) result.Add(sub); + if (node.Next != null) CollectAll(node.Next, result); + } + if (level.Pwc != null) + { + foreach (var sub in level.Pwc.PlainSubs) result.Add(sub); + foreach (var (_, qset) in level.Pwc.QueueSubs) + foreach (var sub in qset) result.Add(sub); + if (level.Pwc.Next != null) CollectAll(level.Pwc.Next, result); + } + if (level.Fwc != null) + { + foreach (var sub in level.Fwc.PlainSubs) result.Add(sub); + foreach (var (_, qset) in level.Fwc.QueueSubs) + foreach (var sub in qset) result.Add(sub); + if (level.Fwc.Next != null) CollectAll(level.Fwc.Next, result); + } + } + + /// + /// Returns the current number of entries in the cache. + /// + public int CacheCount + { + get + { + _lock.EnterReadLock(); + try { return _cache?.Count ?? 0; } + finally { _lock.ExitReadLock(); } + } + } + public void Insert(Subscription sub) { var subject = sub.Subject; diff --git a/tests/NATS.Server.Tests/SubszTests.cs b/tests/NATS.Server.Tests/SubszTests.cs new file mode 100644 index 0000000..b049181 --- /dev/null +++ b/tests/NATS.Server.Tests/SubszTests.cs @@ -0,0 +1,137 @@ +using System.Net; +using System.Net.Http.Json; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests; + +public class SubszTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _natsPort; + private readonly int _monitorPort; + private readonly CancellationTokenSource _cts = new(); + private readonly HttpClient _http = new(); + + public SubszTests() + { + _natsPort = GetFreePort(); + _monitorPort = GetFreePort(); + _server = new NatsServer( + new NatsOptions { Port = _natsPort, MonitorPort = _monitorPort }, + NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + for (int i = 0; i < 50; i++) + { + try + { + var resp = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz"); + if (resp.IsSuccessStatusCode) break; + } + catch (HttpRequestException) { } + await Task.Delay(50); + } + } + + public async Task DisposeAsync() + { + _http.Dispose(); + await _cts.CancelAsync(); + _server.Dispose(); + } + + [Fact] + public async Task Subz_returns_empty_when_no_subscriptions() + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.NumSubs.ShouldBe(0u); + } + + [Fact] + public async Task Subz_returns_count_with_subscriptions() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\nSUB bar 2\r\nSUB baz.* 3\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.NumSubs.ShouldBeGreaterThanOrEqualTo(3u); + } + + [Fact] + public async Task Subz_subs_true_returns_subscription_details() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.Subs.ShouldNotBeEmpty(); + subz.Subs.ShouldContain(s => s.Subject == "foo"); + } + + [Fact] + public async Task Subz_test_subject_filters_matching_subs() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo.* 1\r\nSUB bar 2\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true&test=foo.hello"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.Subs.ShouldContain(s => s.Subject == "foo.*"); + subz.Subs.ShouldNotContain(s => s.Subject == "bar"); + } + + [Fact] + public async Task Subz_pagination() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB a 1\r\nSUB b 2\r\nSUB c 3\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true&offset=0&limit=2"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.Subs.Length.ShouldBe(2); + subz.Total.ShouldBeGreaterThanOrEqualTo(3); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } +}