diff --git a/src/NATS.Server/Monitoring/JszHandler.cs b/src/NATS.Server/Monitoring/JszHandler.cs new file mode 100644 index 0000000..b45f661 --- /dev/null +++ b/src/NATS.Server/Monitoring/JszHandler.cs @@ -0,0 +1,62 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Monitoring; + +public sealed class JszHandler +{ + private readonly NatsServer _server; + private readonly NatsOptions _options; + + public JszHandler(NatsServer server, NatsOptions options) + { + _server = server; + _options = options; + } + + public JszResponse Build() + { + return new JszResponse + { + ServerId = _server.ServerId, + Now = DateTime.UtcNow, + Enabled = _server.Stats.JetStreamEnabled, + Memory = 0, + Storage = 0, + Streams = _server.JetStreamStreams, + Consumers = _server.JetStreamConsumers, + Config = new JetStreamConfig + { + MaxMemory = _options.JetStream?.MaxMemoryStore ?? 0, + MaxStorage = _options.JetStream?.MaxFileStore ?? 0, + StoreDir = _options.JetStream?.StoreDir ?? string.Empty, + }, + }; + } +} + +public sealed class JszResponse +{ + [JsonPropertyName("server_id")] + public string ServerId { get; set; } = string.Empty; + + [JsonPropertyName("now")] + public DateTime Now { get; set; } + + [JsonPropertyName("enabled")] + public bool Enabled { get; set; } + + [JsonPropertyName("memory")] + public ulong Memory { get; set; } + + [JsonPropertyName("storage")] + public ulong Storage { get; set; } + + [JsonPropertyName("streams")] + public int Streams { get; set; } + + [JsonPropertyName("consumers")] + public int Consumers { get; set; } + + [JsonPropertyName("config")] + public JetStreamConfig Config { get; set; } = new(); +} diff --git a/src/NATS.Server/Monitoring/MonitorServer.cs b/src/NATS.Server/Monitoring/MonitorServer.cs index ad06f91..829d29a 100644 --- a/src/NATS.Server/Monitoring/MonitorServer.cs +++ b/src/NATS.Server/Monitoring/MonitorServer.cs @@ -16,6 +16,7 @@ public sealed class MonitorServer : IAsyncDisposable private readonly VarzHandler _varzHandler; private readonly ConnzHandler _connzHandler; private readonly SubszHandler _subszHandler; + private readonly JszHandler _jszHandler; public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory) { @@ -31,6 +32,7 @@ public sealed class MonitorServer : IAsyncDisposable _varzHandler = new VarzHandler(server, options); _connzHandler = new ConnzHandler(server); _subszHandler = new SubszHandler(server); + _jszHandler = new JszHandler(server, options); _app.MapGet(basePath + "/", () => { @@ -100,7 +102,7 @@ public sealed class MonitorServer : IAsyncDisposable _app.MapGet(basePath + "/jsz", () => { stats.HttpReqStats.AddOrUpdate("/jsz", 1, (_, v) => v + 1); - return Results.Ok(new { }); + return Results.Ok(_jszHandler.Build()); }); } diff --git a/src/NATS.Server/Monitoring/Varz.cs b/src/NATS.Server/Monitoring/Varz.cs index 3e85374..e845746 100644 --- a/src/NATS.Server/Monitoring/Varz.cs +++ b/src/NATS.Server/Monitoring/Varz.cs @@ -422,6 +422,12 @@ public sealed class JetStreamStats [JsonPropertyName("ha_assets")] public int HaAssets { get; set; } + [JsonPropertyName("streams")] + public int Streams { get; set; } + + [JsonPropertyName("consumers")] + public int Consumers { get; set; } + [JsonPropertyName("api")] public JetStreamApiStats Api { get; set; } = new(); } diff --git a/src/NATS.Server/Monitoring/VarzHandler.cs b/src/NATS.Server/Monitoring/VarzHandler.cs index 3bdbe6d..3a581bd 100644 --- a/src/NATS.Server/Monitoring/VarzHandler.cs +++ b/src/NATS.Server/Monitoring/VarzHandler.cs @@ -121,6 +121,22 @@ public sealed class VarzHandler : IDisposable Subscriptions = _server.SubList.Count, ConfigLoadTime = _server.StartTime, HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value), + JetStream = new JetStreamVarz + { + Config = new JetStreamConfig + { + MaxMemory = _options.JetStream?.MaxMemoryStore ?? 0, + MaxStorage = _options.JetStream?.MaxFileStore ?? 0, + StoreDir = _options.JetStream?.StoreDir ?? string.Empty, + }, + Stats = new JetStreamStats + { + Accounts = _options.JetStream is null ? 0 : 1, + HaAssets = _server.JetStreamStreams, + Streams = _server.JetStreamStreams, + Consumers = _server.JetStreamConsumers, + }, + }, }; } finally diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index d7396c0..293b4b8 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -91,6 +91,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0; public string? ClusterListen => _routeManager?.ListenEndpoint; public JetStreamApiRouter? JetStreamApiRouter => _jetStreamApiRouter; + public int JetStreamStreams => _jetStreamStreamManager?.StreamNames.Count ?? 0; + public int JetStreamConsumers => _jetStreamConsumerManager?.ConsumerCount ?? 0; public Action? ReOpenLogFile { get; set; } public IEnumerable GetClients() => _clients.Values; diff --git a/tests/NATS.Server.Tests/JszMonitorTests.cs b/tests/NATS.Server.Tests/JszMonitorTests.cs new file mode 100644 index 0000000..cb05804 --- /dev/null +++ b/tests/NATS.Server.Tests/JszMonitorTests.cs @@ -0,0 +1,112 @@ +using System.Net; +using System.Net.Http.Json; +using System.Net.Sockets; +using System.Text; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests; + +public class JszMonitorTests +{ + [Fact] + public async Task Jsz_reports_live_stream_and_consumer_counts() + { + await using var fixture = await JetStreamMonitoringFixture.StartWithStreamAndConsumerAsync(); + + var jsz = await fixture.GetJszAsync(); + jsz.Streams.ShouldBeGreaterThan(0); + jsz.Consumers.ShouldBeGreaterThan(0); + } +} + +internal sealed class JetStreamMonitoringFixture : IAsyncDisposable +{ + private readonly NatsServer _server; + private readonly int _monitorPort; + private readonly CancellationTokenSource _cts = new(); + private readonly HttpClient _http = new(); + + private JetStreamMonitoringFixture(NatsServer server, int monitorPort) + { + _server = server; + _monitorPort = monitorPort; + } + + public static async Task StartWithStreamAndConsumerAsync() + { + var natsPort = GetFreePort(); + var monitorPort = GetFreePort(); + var options = new NatsOptions + { + Host = "127.0.0.1", + Port = natsPort, + MonitorHost = "127.0.0.1", + MonitorPort = monitorPort, + JetStream = new JetStreamOptions + { + StoreDir = Path.Combine(Path.GetTempPath(), "natsdotnet-jsz"), + MaxMemoryStore = 1_024 * 1_024, + MaxFileStore = 10 * 1_024 * 1_024, + }, + }; + + var server = new NatsServer(options, NullLoggerFactory.Instance); + var fixture = new JetStreamMonitoringFixture(server, monitorPort); + + _ = server.StartAsync(fixture._cts.Token); + await server.WaitForReadyAsync(); + await fixture.WaitForHealthAsync(); + + var router = server.JetStreamApiRouter ?? throw new InvalidOperationException("JetStream API router unavailable."); + _ = router.Route("$JS.API.STREAM.CREATE.ORDERS", Encoding.UTF8.GetBytes("{\"name\":\"ORDERS\",\"subjects\":[\"orders.*\"]}")); + _ = router.Route("$JS.API.CONSUMER.CREATE.ORDERS.DUR", Encoding.UTF8.GetBytes("{\"durable_name\":\"DUR\",\"filter_subject\":\"orders.*\"}")); + + return fixture; + } + + public async Task GetJszAsync() + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/jsz"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + var jsz = await response.Content.ReadFromJsonAsync(); + return jsz ?? throw new InvalidOperationException("Failed to deserialize /jsz."); + } + + public async ValueTask DisposeAsync() + { + _http.Dispose(); + await _cts.CancelAsync(); + _server.Dispose(); + } + + private async Task WaitForHealthAsync() + { + for (int i = 0; i < 50; i++) + { + try + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz"); + if (response.IsSuccessStatusCode) + return; + } + catch (HttpRequestException) + { + // server not ready + } + + await Task.Delay(50); + } + + throw new TimeoutException("Monitoring endpoint did not become healthy."); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); + return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port; + } +}