feat(batch18): implement group-a server core helpers
This commit is contained in:
@@ -265,6 +265,22 @@ public sealed partial class NatsServer
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns S2 writer options for the selected route compression mode.
|
||||||
|
/// Mirrors Go <c>s2WriterOptions</c>.
|
||||||
|
/// </summary>
|
||||||
|
internal static string[]? S2WriterOptions(string cm)
|
||||||
|
{
|
||||||
|
var opts = new List<string> { "writer_concurrency=1" };
|
||||||
|
return cm switch
|
||||||
|
{
|
||||||
|
CompressionMode.S2Uncompressed => [.. opts, "writer_uncompressed"],
|
||||||
|
CompressionMode.S2Best => [.. opts, "writer_best_compression"],
|
||||||
|
CompressionMode.S2Better => [.. opts, "writer_better_compression"],
|
||||||
|
_ => null,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
// Factory methods (features 2983–2985)
|
// Factory methods (features 2983–2985)
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
@@ -509,20 +525,27 @@ public sealed partial class NatsServer
|
|||||||
/// Background loop that logs TLS rate-limited connection rejections every second.
|
/// Background loop that logs TLS rate-limited connection rejections every second.
|
||||||
/// Mirrors Go <c>Server.logRejectedTLSConns</c>.
|
/// Mirrors Go <c>Server.logRejectedTLSConns</c>.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal async Task LogRejectedTlsConnsAsync(CancellationToken ct)
|
internal Task LogRejectedTlsConnsAsync(CancellationToken ct) =>
|
||||||
|
LogRejectedTLSConns(ct);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Background loop that logs TLS rate-limited connection rejections every second.
|
||||||
|
/// Mirrors Go <c>Server.logRejectedTLSConns</c>.
|
||||||
|
/// </summary>
|
||||||
|
internal async Task LogRejectedTLSConns(CancellationToken ct, TimeSpan? interval = null)
|
||||||
{
|
{
|
||||||
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
|
using var timer = new PeriodicTimer(interval ?? TimeSpan.FromSeconds(1));
|
||||||
while (!ct.IsCancellationRequested)
|
while (!ct.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
try { await timer.WaitForNextTickAsync(ct); }
|
|
||||||
catch (OperationCanceledException) { break; }
|
|
||||||
|
|
||||||
if (_connRateCounter is not null)
|
if (_connRateCounter is not null)
|
||||||
{
|
{
|
||||||
var blocked = _connRateCounter.CountBlocked();
|
var blocked = _connRateCounter.CountBlocked();
|
||||||
if (blocked > 0)
|
if (blocked > 0)
|
||||||
Warnf("Rejected {0} connections due to TLS rate limiting", blocked);
|
Warnf("Rejected {0} connections due to TLS rate limiting", blocked);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try { await timer.WaitForNextTickAsync(ct); }
|
||||||
|
catch (OperationCanceledException) { break; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -178,7 +178,13 @@ public sealed partial class NatsServer
|
|||||||
/// Returns true if the goroutine was started, false if the server is already stopped.
|
/// Returns true if the goroutine was started, false if the server is already stopped.
|
||||||
/// Mirrors Go <c>Server.startGoRoutine(f)</c>.
|
/// Mirrors Go <c>Server.startGoRoutine(f)</c>.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal bool StartGoRoutine(Action f)
|
internal bool StartGoRoutine(Action f) => StartGoRoutine(f, []);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Starts a background Task with goroutine labels.
|
||||||
|
/// Mirrors Go <c>Server.startGoRoutine(f, tags...)</c>.
|
||||||
|
/// </summary>
|
||||||
|
internal bool StartGoRoutine(Action f, params IReadOnlyDictionary<string, string>[] tags)
|
||||||
{
|
{
|
||||||
lock (_grMu)
|
lock (_grMu)
|
||||||
{
|
{
|
||||||
@@ -186,13 +192,41 @@ public sealed partial class NatsServer
|
|||||||
_grWg.Add(1);
|
_grWg.Add(1);
|
||||||
Task.Run(() =>
|
Task.Run(() =>
|
||||||
{
|
{
|
||||||
try { f(); }
|
try
|
||||||
|
{
|
||||||
|
SetGoRoutineLabels(tags);
|
||||||
|
f();
|
||||||
|
}
|
||||||
finally { _grWg.Done(); }
|
finally { _grWg.Done(); }
|
||||||
});
|
});
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Optional test-only sink used to observe goroutine labels during unit tests.
|
||||||
|
/// </summary>
|
||||||
|
internal static Action<IReadOnlyList<KeyValuePair<string, string>>>? SetGoRoutineLabelsHookForTest { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sets goroutine labels for diagnostics when tags are present.
|
||||||
|
/// Mirrors Go <c>setGoRoutineLabels</c>.
|
||||||
|
/// </summary>
|
||||||
|
internal static void SetGoRoutineLabels(params IReadOnlyDictionary<string, string>[] tags)
|
||||||
|
{
|
||||||
|
var labels = new List<KeyValuePair<string, string>>();
|
||||||
|
foreach (var tag in tags)
|
||||||
|
{
|
||||||
|
foreach (var pair in tag)
|
||||||
|
{
|
||||||
|
labels.Add(new KeyValuePair<string, string>(pair.Key, pair.Value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (labels.Count > 0)
|
||||||
|
SetGoRoutineLabelsHookForTest?.Invoke(labels);
|
||||||
|
}
|
||||||
|
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
// Client / connection management (features 3081–3084)
|
// Client / connection management (features 3081–3084)
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
// Session 10: accept loops, client creation, connect URL management, port helpers.
|
// Session 10: accept loops, client creation, connect URL management, port helpers.
|
||||||
|
|
||||||
using System.Net;
|
using System.Net;
|
||||||
|
using System.Net.Security;
|
||||||
using System.Net.Sockets;
|
using System.Net.Sockets;
|
||||||
using System.Security.Cryptography;
|
using System.Security.Cryptography;
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
@@ -461,6 +462,41 @@ public sealed partial class NatsServer
|
|||||||
private string BasePath(string p) =>
|
private string BasePath(string p) =>
|
||||||
string.IsNullOrEmpty(_httpBasePath) ? p : System.IO.Path.Combine(_httpBasePath, p.TrimStart('/'));
|
string.IsNullOrEmpty(_httpBasePath) ? p : System.IO.Path.Combine(_httpBasePath, p.TrimStart('/'));
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns a monitoring TLS config cloned from server TLS config, with client certs disabled.
|
||||||
|
/// Mirrors Go <c>Server.getMonitoringTLSConfig()</c>.
|
||||||
|
/// </summary>
|
||||||
|
internal SslServerAuthenticationOptions? GetMonitoringTLSConfig()
|
||||||
|
{
|
||||||
|
var opts = GetOpts();
|
||||||
|
if (opts.TlsConfig == null)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
var clone = new SslServerAuthenticationOptions
|
||||||
|
{
|
||||||
|
EnabledSslProtocols = opts.TlsConfig.EnabledSslProtocols,
|
||||||
|
AllowRenegotiation = opts.TlsConfig.AllowRenegotiation,
|
||||||
|
CertificateRevocationCheckMode = opts.TlsConfig.CertificateRevocationCheckMode,
|
||||||
|
CipherSuitesPolicy = opts.TlsConfig.CipherSuitesPolicy,
|
||||||
|
ServerCertificate = opts.TlsConfig.ServerCertificate,
|
||||||
|
ClientCertificateRequired = false,
|
||||||
|
EncryptionPolicy = opts.TlsConfig.EncryptionPolicy,
|
||||||
|
CertificateChainPolicy = opts.TlsConfig.CertificateChainPolicy,
|
||||||
|
};
|
||||||
|
return clone;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns the monitoring handler object while monitoring is active, otherwise null.
|
||||||
|
/// Mirrors Go <c>Server.HTTPHandler()</c>.
|
||||||
|
/// </summary>
|
||||||
|
public object? HTTPHandler()
|
||||||
|
{
|
||||||
|
_mu.EnterReadLock();
|
||||||
|
try { return _httpHandler; }
|
||||||
|
finally { _mu.ExitReadLock(); }
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Starts the HTTP/HTTPS monitoring listener.
|
/// Starts the HTTP/HTTPS monitoring listener.
|
||||||
/// Stub — full monitoring handler registration deferred to session 17.
|
/// Stub — full monitoring handler registration deferred to session 17.
|
||||||
@@ -504,10 +540,12 @@ public sealed partial class NatsServer
|
|||||||
|
|
||||||
_mu.EnterWriteLock();
|
_mu.EnterWriteLock();
|
||||||
_http = httpListener;
|
_http = httpListener;
|
||||||
|
_httpHandler = new object();
|
||||||
_mu.ExitWriteLock();
|
_mu.ExitWriteLock();
|
||||||
|
|
||||||
// Full HTTP handler registration in session 17; for now just drain the listener.
|
// Full HTTP handler registration in session 17; for now just drain the listener.
|
||||||
_ = Task.Run(() =>
|
// Use a long-running task so teardown does not depend on thread-pool availability.
|
||||||
|
_ = Task.Factory.StartNew(() =>
|
||||||
{
|
{
|
||||||
// Accept and immediately close connections until shutdown.
|
// Accept and immediately close connections until shutdown.
|
||||||
while (!IsShuttingDown())
|
while (!IsShuttingDown())
|
||||||
@@ -524,11 +562,11 @@ public sealed partial class NatsServer
|
|||||||
}
|
}
|
||||||
|
|
||||||
_mu.EnterWriteLock();
|
_mu.EnterWriteLock();
|
||||||
// Don't null _http — ProfilerAddr etc. still read it.
|
_httpHandler = null;
|
||||||
_mu.ExitWriteLock();
|
_mu.ExitWriteLock();
|
||||||
|
|
||||||
_done.Writer.TryWrite(default);
|
_done.Writer.TryWrite(default);
|
||||||
});
|
}, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@@ -799,6 +837,26 @@ public sealed partial class NatsServer
|
|||||||
_ => (0, new ArgumentException($"unknown version: {ver}")),
|
_ => (0, new ArgumentException($"unknown version: {ver}")),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handles TLS handshake timeout by closing the connection if auth is incomplete.
|
||||||
|
/// Mirrors Go <c>tlsTimeout</c>.
|
||||||
|
/// </summary>
|
||||||
|
internal static void TlsTimeout(ClientConnection c, SslStream conn)
|
||||||
|
{
|
||||||
|
lock (c)
|
||||||
|
{
|
||||||
|
if (c.IsClosed())
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!conn.IsAuthenticated)
|
||||||
|
{
|
||||||
|
c.Errorf("TLS handshake timeout");
|
||||||
|
c.SendErr("Secure Connection - TLS Required");
|
||||||
|
c.CloseConnection(ClosedState.TlsHandshakeError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
// Connect URL helpers (features 3074–3076)
|
// Connect URL helpers (features 3074–3076)
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
|
|||||||
@@ -210,6 +210,7 @@ public sealed partial class NatsServer : INatsServer
|
|||||||
|
|
||||||
private string _httpBasePath = string.Empty;
|
private string _httpBasePath = string.Empty;
|
||||||
private readonly Dictionary<string, ulong> _httpReqStats = [];
|
private readonly Dictionary<string, ulong> _httpReqStats = [];
|
||||||
|
private object? _httpHandler;
|
||||||
|
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
// Client connect URLs
|
// Client connect URLs
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
using System.Security.Cryptography;
|
using System.Security.Cryptography;
|
||||||
using System.Security.Cryptography.X509Certificates;
|
using System.Security.Cryptography.X509Certificates;
|
||||||
|
using System.Net.Security;
|
||||||
|
using System.Net.Sockets;
|
||||||
|
using System.Reflection;
|
||||||
using Shouldly;
|
using Shouldly;
|
||||||
using ZB.MOM.NatsNet.Server;
|
using ZB.MOM.NatsNet.Server;
|
||||||
using ZB.MOM.NatsNet.Server.Internal;
|
using ZB.MOM.NatsNet.Server.Internal;
|
||||||
@@ -10,6 +13,99 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|||||||
|
|
||||||
public sealed class MonitoringHandlerTests
|
public sealed class MonitoringHandlerTests
|
||||||
{
|
{
|
||||||
|
[Fact]
|
||||||
|
public void GetMonitoringTLSConfig_WithServerTlsConfig_DisablesClientCertificateRequirementOnClone()
|
||||||
|
{
|
||||||
|
var (certFile, keyFile, tempDir, _) = CreatePemCertificate(DateTimeOffset.UtcNow.AddMinutes(10));
|
||||||
|
var (tlsOpts, parseErr) = ServerOptions.ParseTLS(
|
||||||
|
new Dictionary<string, object?>
|
||||||
|
{
|
||||||
|
["cert_file"] = certFile,
|
||||||
|
["key_file"] = keyFile,
|
||||||
|
["verify"] = true,
|
||||||
|
},
|
||||||
|
isClientCtx: false);
|
||||||
|
parseErr.ShouldBeNull();
|
||||||
|
tlsOpts.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var (tlsConfig, tlsErr) = ServerOptions.GenTLSConfig(tlsOpts!);
|
||||||
|
tlsErr.ShouldBeNull();
|
||||||
|
tlsConfig.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var opts = new ServerOptions { TlsConfig = tlsConfig };
|
||||||
|
|
||||||
|
var (server, error) = NatsServer.NewServer(opts);
|
||||||
|
|
||||||
|
error.ShouldBeNull();
|
||||||
|
server.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var monitoringTls = server!.GetMonitoringTLSConfig();
|
||||||
|
monitoringTls.ShouldNotBeNull();
|
||||||
|
monitoringTls!.ClientCertificateRequired.ShouldBeFalse();
|
||||||
|
monitoringTls.CertificateRevocationCheckMode.ShouldBe(opts.TlsConfig!.CertificateRevocationCheckMode);
|
||||||
|
opts.TlsConfig!.ClientCertificateRequired.ShouldBeTrue();
|
||||||
|
|
||||||
|
Directory.Delete(tempDir, recursive: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void HTTPHandler_WhenMonitoringListenerStops_TransitionsToNull()
|
||||||
|
{
|
||||||
|
var opts = new ServerOptions
|
||||||
|
{
|
||||||
|
HttpHost = "127.0.0.1",
|
||||||
|
HttpPort = -1,
|
||||||
|
};
|
||||||
|
var (server, error) = NatsServer.NewServer(opts);
|
||||||
|
error.ShouldBeNull();
|
||||||
|
server.ShouldNotBeNull();
|
||||||
|
|
||||||
|
server!.HTTPHandler().ShouldBeNull();
|
||||||
|
|
||||||
|
var startError = server.StartMonitoring();
|
||||||
|
startError.ShouldBeNull();
|
||||||
|
server.HTTPHandler().ShouldNotBeNull();
|
||||||
|
|
||||||
|
var listenerField = typeof(NatsServer).GetField("_http", BindingFlags.Instance | BindingFlags.NonPublic);
|
||||||
|
listenerField.ShouldNotBeNull();
|
||||||
|
var listener = listenerField!.GetValue(server).ShouldBeOfType<TcpListener>();
|
||||||
|
listener.Stop();
|
||||||
|
|
||||||
|
var transitioned = SpinWait.SpinUntil(() => server.HTTPHandler() == null, TimeSpan.FromSeconds(5));
|
||||||
|
transitioned.ShouldBeTrue();
|
||||||
|
server.HTTPHandler().ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task LogRejectedTLSConns_WhenRateCounterHasBlockedConnections_EmitsWarning()
|
||||||
|
{
|
||||||
|
var opts = new ServerOptions { TlsRateLimit = 1 };
|
||||||
|
var (server, error) = NatsServer.NewServer(opts);
|
||||||
|
error.ShouldBeNull();
|
||||||
|
server.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var logger = new MonitoringCaptureLogger();
|
||||||
|
server!.SetLogger(logger, debugFlag: false, traceFlag: false);
|
||||||
|
|
||||||
|
var rateCounterField = typeof(NatsServer).GetField("_connRateCounter", BindingFlags.Instance | BindingFlags.NonPublic);
|
||||||
|
rateCounterField.ShouldNotBeNull();
|
||||||
|
var rateCounter = rateCounterField!.GetValue(server).ShouldBeOfType<RateCounter>();
|
||||||
|
|
||||||
|
rateCounter.Allow();
|
||||||
|
rateCounter.Allow();
|
||||||
|
|
||||||
|
using var cts = new CancellationTokenSource();
|
||||||
|
var loop = server.LogRejectedTLSConns(cts.Token, TimeSpan.FromMilliseconds(10));
|
||||||
|
|
||||||
|
var warned = SpinWait.SpinUntil(
|
||||||
|
() => logger.WarningEntries.Any(w => w.Contains("Rejected", StringComparison.OrdinalIgnoreCase)),
|
||||||
|
TimeSpan.FromSeconds(2));
|
||||||
|
cts.Cancel();
|
||||||
|
await loop;
|
||||||
|
|
||||||
|
warned.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
[Fact] // T:2108
|
[Fact] // T:2108
|
||||||
public void MonitorConnzClosedConnsBadTLSClient_ShouldSucceed()
|
public void MonitorConnzClosedConnsBadTLSClient_ShouldSucceed()
|
||||||
{
|
{
|
||||||
@@ -228,6 +324,18 @@ public sealed class MonitoringHandlerTests
|
|||||||
return (certFile, keyFile, tempDir, notAfter);
|
return (certFile, keyFile, tempDir, notAfter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private sealed class MonitoringCaptureLogger : INatsLogger
|
||||||
|
{
|
||||||
|
public List<string> WarningEntries { get; } = [];
|
||||||
|
|
||||||
|
public void Noticef(string format, params object[] args) { }
|
||||||
|
public void Warnf(string format, params object[] args) => WarningEntries.Add(string.Format(format, args));
|
||||||
|
public void Fatalf(string format, params object[] args) { }
|
||||||
|
public void Errorf(string format, params object[] args) { }
|
||||||
|
public void Debugf(string format, params object[] args) { }
|
||||||
|
public void Tracef(string format, params object[] args) { }
|
||||||
|
}
|
||||||
|
|
||||||
[Fact] // T:2065
|
[Fact] // T:2065
|
||||||
public void MonitorNoPort_ShouldSucceed()
|
public void MonitorNoPort_ShouldSucceed()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -17,6 +17,8 @@
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using System.Text.RegularExpressions;
|
using System.Text.RegularExpressions;
|
||||||
|
using System.Net.Security;
|
||||||
|
using System.Threading;
|
||||||
using NSubstitute;
|
using NSubstitute;
|
||||||
using NSubstitute.ExceptionExtensions;
|
using NSubstitute.ExceptionExtensions;
|
||||||
using Shouldly;
|
using Shouldly;
|
||||||
@@ -280,6 +282,27 @@ public sealed class ServerTests
|
|||||||
[Fact]
|
[Fact]
|
||||||
public void NeedsCompression_S2Fast_ReturnsTrue()
|
public void NeedsCompression_S2Fast_ReturnsTrue()
|
||||||
=> NatsServer.NeedsCompression(CompressionMode.S2Fast).ShouldBeTrue();
|
=> NatsServer.NeedsCompression(CompressionMode.S2Fast).ShouldBeTrue();
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(CompressionMode.S2Uncompressed, "writer_concurrency=1", "writer_uncompressed")]
|
||||||
|
[InlineData(CompressionMode.S2Best, "writer_concurrency=1", "writer_best_compression")]
|
||||||
|
[InlineData(CompressionMode.S2Better, "writer_concurrency=1", "writer_better_compression")]
|
||||||
|
public void S2WriterOptions_KnownModes_ReturnExpectedOptions(
|
||||||
|
string mode,
|
||||||
|
string expectedFirst,
|
||||||
|
string expectedSecond)
|
||||||
|
{
|
||||||
|
var options = NatsServer.S2WriterOptions(mode);
|
||||||
|
|
||||||
|
options.ShouldNotBeNull();
|
||||||
|
options!.ShouldBe([expectedFirst, expectedSecond]);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void S2WriterOptions_UnsupportedMode_ReturnsNull()
|
||||||
|
{
|
||||||
|
NatsServer.S2WriterOptions(CompressionMode.S2Fast).ShouldBeNull();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
@@ -292,6 +315,57 @@ public sealed class ServerTests
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class ServerListenersTests
|
public sealed class ServerListenersTests
|
||||||
{
|
{
|
||||||
|
[Fact]
|
||||||
|
public void TlsTimeout_IncompleteHandshake_ClosesConnection()
|
||||||
|
{
|
||||||
|
var c = new ClientConnection(ClientKind.Client, nc: new MemoryStream());
|
||||||
|
using var tls = new SslStream(new MemoryStream(), leaveInnerStreamOpen: false);
|
||||||
|
|
||||||
|
c.IsClosed().ShouldBeFalse();
|
||||||
|
|
||||||
|
NatsServer.TlsTimeout(c, tls);
|
||||||
|
|
||||||
|
c.IsClosed().ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void StartGoRoutine_WithLabels_InvokesSetGoRoutineLabels()
|
||||||
|
{
|
||||||
|
var (s, err) = NatsServer.NewServer(new ServerOptions());
|
||||||
|
err.ShouldBeNull();
|
||||||
|
s.ShouldNotBeNull();
|
||||||
|
s!.Start();
|
||||||
|
|
||||||
|
var signal = new ManualResetEventSlim(false);
|
||||||
|
IReadOnlyList<KeyValuePair<string, string>>? observed = null;
|
||||||
|
|
||||||
|
NatsServer.SetGoRoutineLabelsHookForTest = labels =>
|
||||||
|
{
|
||||||
|
observed = labels;
|
||||||
|
signal.Set();
|
||||||
|
};
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var started = s.StartGoRoutine(
|
||||||
|
() => { },
|
||||||
|
new Dictionary<string, string> { ["component"] = "server", ["loop"] = "tls" });
|
||||||
|
|
||||||
|
started.ShouldBeTrue();
|
||||||
|
signal.Wait(TimeSpan.FromSeconds(2)).ShouldBeTrue();
|
||||||
|
|
||||||
|
observed.ShouldNotBeNull();
|
||||||
|
observed!.ShouldContain(kv => kv.Key == "component" && kv.Value == "server");
|
||||||
|
observed.ShouldContain(kv => kv.Key == "loop" && kv.Value == "tls");
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
NatsServer.SetGoRoutineLabelsHookForTest = null;
|
||||||
|
s.Shutdown();
|
||||||
|
s.WaitForShutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
// GenerateInfoJson (feature 3069) — Test ID 2906
|
// GenerateInfoJson (feature 3069) — Test ID 2906
|
||||||
// Mirrors Go TestServerJsonMarshalNestedStructsPanic (guards against
|
// Mirrors Go TestServerJsonMarshalNestedStructsPanic (guards against
|
||||||
|
|||||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
Reference in New Issue
Block a user