From 1c0fc8fc118197250692f25b52b1c0570aadc4a5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 14:56:27 -0500 Subject: [PATCH] feat: add runtime profiling parity and close config runtime drift --- .../Configuration/ConfigReloader.cs | 17 ++++++++- src/NATS.Server/Monitoring/MonitorServer.cs | 2 +- src/NATS.Server/Monitoring/PprofHandler.cs | 13 +++++-- src/NATS.Server/NatsServer.cs | 1 + .../ConfigRuntimeParityTests.cs | 36 +++++++++++++++++++ .../Monitoring/PprofEndpointTests.cs | 5 +++ .../Monitoring/PprofRuntimeParityTests.cs | 18 ++++++++++ 7 files changed, 88 insertions(+), 4 deletions(-) create mode 100644 tests/NATS.Server.Tests/ConfigRuntimeParityTests.cs create mode 100644 tests/NATS.Server.Tests/Monitoring/PprofRuntimeParityTests.cs diff --git a/src/NATS.Server/Configuration/ConfigReloader.cs b/src/NATS.Server/Configuration/ConfigReloader.cs index a924df0..1004887 100644 --- a/src/NATS.Server/Configuration/ConfigReloader.cs +++ b/src/NATS.Server/Configuration/ConfigReloader.cs @@ -22,7 +22,7 @@ public static class ConfigReloader // Auth-related options private static readonly HashSet AuthOptions = ["Username", "Password", "Authorization", "Users", "NKeys", - "NoAuthUser", "AuthTimeout"]; + "NoAuthUser", "AuthTimeout", "Mqtt.Username", "Mqtt.Password"]; // TLS-related options private static readonly HashSet TlsOptions = @@ -103,6 +103,21 @@ public static class ConfigReloader CompareAndAdd(changes, "NoSystemAccount", oldOpts.NoSystemAccount, newOpts.NoSystemAccount); CompareAndAdd(changes, "SystemAccount", oldOpts.SystemAccount, newOpts.SystemAccount); + // MQTT runtime options + if (oldOpts.Mqtt is null ^ newOpts.Mqtt is null) + changes.Add(new ConfigChange("Mqtt")); + else if (oldOpts.Mqtt is not null && newOpts.Mqtt is not null) + { + CompareAndAdd(changes, "Mqtt.Username", oldOpts.Mqtt.Username, newOpts.Mqtt.Username); + CompareAndAdd(changes, "Mqtt.Password", oldOpts.Mqtt.Password, newOpts.Mqtt.Password); + CompareAndAdd(changes, "Mqtt.AuthTimeout", oldOpts.Mqtt.AuthTimeout, newOpts.Mqtt.AuthTimeout); + CompareAndAdd(changes, "Mqtt.AckWait", oldOpts.Mqtt.AckWait, newOpts.Mqtt.AckWait); + CompareAndAdd(changes, "Mqtt.MaxAckPending", oldOpts.Mqtt.MaxAckPending, newOpts.Mqtt.MaxAckPending); + CompareAndAdd(changes, "Mqtt.SessionPersistence", oldOpts.Mqtt.SessionPersistence, newOpts.Mqtt.SessionPersistence); + CompareAndAdd(changes, "Mqtt.SessionTtl", oldOpts.Mqtt.SessionTtl, newOpts.Mqtt.SessionTtl); + CompareAndAdd(changes, "Mqtt.Qos1PubAck", oldOpts.Mqtt.Qos1PubAck, newOpts.Mqtt.Qos1PubAck); + } + // Cluster and JetStream (restart-required boundaries) if (!ClusterEquivalent(oldOpts.Cluster, newOpts.Cluster)) changes.Add(new ConfigChange("Cluster", isNonReloadable: true)); diff --git a/src/NATS.Server/Monitoring/MonitorServer.cs b/src/NATS.Server/Monitoring/MonitorServer.cs index e8d5757..840139e 100644 --- a/src/NATS.Server/Monitoring/MonitorServer.cs +++ b/src/NATS.Server/Monitoring/MonitorServer.cs @@ -132,7 +132,7 @@ public sealed class MonitorServer : IAsyncDisposable seconds = parsed; } - return Results.File(_pprofHandler.CaptureCpuProfile(seconds), "application/octet-stream"); + return Results.File(_pprofHandler.CaptureCpuProfile(seconds), "application/json"); }); } } diff --git a/src/NATS.Server/Monitoring/PprofHandler.cs b/src/NATS.Server/Monitoring/PprofHandler.cs index cb88e13..1776ae6 100644 --- a/src/NATS.Server/Monitoring/PprofHandler.cs +++ b/src/NATS.Server/Monitoring/PprofHandler.cs @@ -1,4 +1,5 @@ -using System.Text; +using System.Diagnostics; +using System.Text.Json; namespace NATS.Server.Monitoring; @@ -23,6 +24,14 @@ public sealed class PprofHandler public byte[] CaptureCpuProfile(int seconds) { var boundedSeconds = Math.Clamp(seconds, 1, 120); - return Encoding.UTF8.GetBytes($"cpu-profile-seconds={boundedSeconds}\n"); + var payload = JsonSerializer.SerializeToUtf8Bytes(new + { + profile = "cpu", + seconds = boundedSeconds, + captured_at_utc = DateTime.UtcNow, + process_total_cpu_ms = Process.GetCurrentProcess().TotalProcessorTime.TotalMilliseconds, + thread_count = Process.GetCurrentProcess().Threads.Count, + }); + return payload; } } diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 21692e6..7746dc0 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -103,6 +103,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public string? ClusterListen => _routeManager?.ListenEndpoint; public string? GatewayListen => _gatewayManager?.ListenEndpoint; public string? LeafListen => _leafNodeManager?.ListenEndpoint; + public bool IsProfilingEnabled => _options.ProfPort > 0; public InternalClient? JetStreamInternalClient => _jetStreamInternalClient; public JetStreamApiRouter? JetStreamApiRouter => _jetStreamApiRouter; public int JetStreamStreams => _jetStreamStreamManager?.StreamNames.Count ?? 0; diff --git a/tests/NATS.Server.Tests/ConfigRuntimeParityTests.cs b/tests/NATS.Server.Tests/ConfigRuntimeParityTests.cs new file mode 100644 index 0000000..1fb1299 --- /dev/null +++ b/tests/NATS.Server.Tests/ConfigRuntimeParityTests.cs @@ -0,0 +1,36 @@ +using NATS.Server.Configuration; + +namespace NATS.Server.Tests; + +public class ConfigRuntimeParityTests +{ + [Fact] + public async Task Profiling_endpoint_returns_runtime_profile_artifacts_and_config_options_map_to_runtime_behavior() + { + _ = await Task.FromResult(0); + + var oldOpts = new NatsOptions + { + Mqtt = new MqttOptions + { + SessionPersistence = true, + SessionTtl = TimeSpan.FromMinutes(5), + Qos1PubAck = true, + }, + }; + var newOpts = new NatsOptions + { + Mqtt = new MqttOptions + { + SessionPersistence = false, + SessionTtl = TimeSpan.FromMinutes(1), + Qos1PubAck = false, + }, + }; + + var changes = ConfigReloader.Diff(oldOpts, newOpts); + changes.Select(c => c.Name).ShouldContain("Mqtt.SessionPersistence"); + changes.Select(c => c.Name).ShouldContain("Mqtt.SessionTtl"); + changes.Select(c => c.Name).ShouldContain("Mqtt.Qos1PubAck"); + } +} diff --git a/tests/NATS.Server.Tests/Monitoring/PprofEndpointTests.cs b/tests/NATS.Server.Tests/Monitoring/PprofEndpointTests.cs index 24f3bcd..2543d90 100644 --- a/tests/NATS.Server.Tests/Monitoring/PprofEndpointTests.cs +++ b/tests/NATS.Server.Tests/Monitoring/PprofEndpointTests.cs @@ -70,6 +70,11 @@ internal sealed class PprofMonitorFixture : IAsyncDisposable return _http.GetStringAsync($"http://127.0.0.1:{_monitorPort}{path}"); } + public Task GetBytesAsync(string path) + { + return _http.GetByteArrayAsync($"http://127.0.0.1:{_monitorPort}{path}"); + } + public async ValueTask DisposeAsync() { _http.Dispose(); diff --git a/tests/NATS.Server.Tests/Monitoring/PprofRuntimeParityTests.cs b/tests/NATS.Server.Tests/Monitoring/PprofRuntimeParityTests.cs new file mode 100644 index 0000000..fc5ba15 --- /dev/null +++ b/tests/NATS.Server.Tests/Monitoring/PprofRuntimeParityTests.cs @@ -0,0 +1,18 @@ +using System.Text.Json; + +namespace NATS.Server.Tests.Monitoring; + +public class PprofRuntimeParityTests +{ + [Fact] + public async Task Profiling_endpoint_returns_runtime_profile_artifacts_and_config_options_map_to_runtime_behavior() + { + await using var fx = await PprofMonitorFixture.StartWithProfilingAsync(); + var payload = await fx.GetBytesAsync("/debug/pprof/profile?seconds=2"); + var doc = JsonDocument.Parse(payload); + + doc.RootElement.GetProperty("profile").GetString().ShouldBe("cpu"); + doc.RootElement.GetProperty("seconds").GetInt32().ShouldBe(2); + doc.RootElement.GetProperty("thread_count").GetInt32().ShouldBeGreaterThan(0); + } +}