feat: port sessions 12 & 13 — Events/Monitoring/MsgTrace + Config Reload
Session 12 (218 features, IDs 854-950, 2166-2251, 2405-2439): - EventTypes: system subjects, event message types, InternalState, ConnectEventMsg, DisconnectEventMsg, AccountNumConns, ServerIdentity, DataStats - MonitorTypes: Connz, ConnInfo, ConnzOptions, ConnState, ProxyInfo, TlsPeerCert - MonitorSortOptions: SortOpt, ConnInfos, all 13 sort comparers - MsgTraceTypes: IMsgTrace, MsgTraceBase + 6 concrete types, custom JSON converter Session 13 (89 features, IDs 2800-2888): - ReloadOptions: IReloadOption interface, NoopReloadOption base, 50 option classes covering logging, TLS, auth, cluster, JetStream, MQTT, OCSP, misc
This commit is contained in:
@@ -19,38 +19,6 @@ using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// ============================================================================
|
||||
// AccountNumConns — remote server connection/leafnode count message
|
||||
// Mirrors Go `AccountNumConns` struct used in updateRemoteServer.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Carries the number of client connections and leaf nodes that a remote server
|
||||
/// has for a given account, along with the remote server's identity.
|
||||
/// Mirrors Go <c>AccountNumConns</c> in server/accounts.go.
|
||||
/// </summary>
|
||||
internal sealed class AccountNumConns
|
||||
{
|
||||
/// <summary>Remote server identity. Mirrors Go <c>Server ServerInfo</c>.</summary>
|
||||
public ServerIdentity Server { get; set; } = new();
|
||||
|
||||
/// <summary>Number of client connections on the remote server. Mirrors Go <c>Conns int</c>.</summary>
|
||||
public int Conns { get; set; }
|
||||
|
||||
/// <summary>Number of leaf nodes on the remote server. Mirrors Go <c>LeafNodes int</c>.</summary>
|
||||
public int LeafNodes { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Minimal remote server identity stub used by <see cref="AccountNumConns"/>.
|
||||
/// Full implementation lives with the server cluster sessions.
|
||||
/// </summary>
|
||||
internal sealed class ServerIdentity
|
||||
{
|
||||
/// <summary>Unique server ID. Mirrors Go <c>ID string</c>.</summary>
|
||||
public string ID { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Account — full implementation
|
||||
// Mirrors Go `Account` struct in server/accounts.go lines 52-119.
|
||||
@@ -666,8 +634,8 @@ public sealed class Account : INatsAccount
|
||||
{
|
||||
_strack ??= new Dictionary<string, SConns>();
|
||||
|
||||
_strack.TryGetValue(m.Server.ID, out var prev);
|
||||
_strack[m.Server.ID] = new SConns
|
||||
_strack.TryGetValue(m.Server.Id, out var prev);
|
||||
_strack[m.Server.Id] = new SConns
|
||||
{
|
||||
Conns = m.Conns,
|
||||
Leafs = m.LeafNodes,
|
||||
|
||||
996
dotnet/src/ZB.MOM.NatsNet.Server/Config/ReloadOptions.cs
Normal file
996
dotnet/src/ZB.MOM.NatsNet.Server/Config/ReloadOptions.cs
Normal file
@@ -0,0 +1,996 @@
|
||||
// Copyright 2017-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/reload.go in the NATS server Go source.
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// =============================================================================
|
||||
// IReloadOption — mirrors Go `option` interface in reload.go
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Represents a hot-swappable configuration setting that can be applied to a
|
||||
/// running server. Mirrors Go <c>option</c> interface in server/reload.go.
|
||||
/// </summary>
|
||||
public interface IReloadOption
|
||||
{
|
||||
/// <summary>Apply this option to the running server.</summary>
|
||||
void Apply(NatsServer server);
|
||||
|
||||
/// <summary>Returns true if this option requires reloading the logger.</summary>
|
||||
bool IsLoggingChange();
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if this option requires reloading the cached trace level.
|
||||
/// Clients store trace level separately.
|
||||
/// </summary>
|
||||
bool IsTraceLevelChange();
|
||||
|
||||
/// <summary>Returns true if this option requires reloading authorization.</summary>
|
||||
bool IsAuthChange();
|
||||
|
||||
/// <summary>Returns true if this option requires reloading TLS.</summary>
|
||||
bool IsTlsChange();
|
||||
|
||||
/// <summary>Returns true if this option requires reloading cluster permissions.</summary>
|
||||
bool IsClusterPermsChange();
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if this option requires special handling for changes in
|
||||
/// cluster pool size or accounts list.
|
||||
/// </summary>
|
||||
bool IsClusterPoolSizeOrAccountsChange();
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if this option indicates a change in the server's JetStream config.
|
||||
/// Account changes are handled separately in reloadAuthorization.
|
||||
/// </summary>
|
||||
bool IsJetStreamChange();
|
||||
|
||||
/// <summary>Returns true if this change requires publishing the server's statz.</summary>
|
||||
bool IsStatszChange();
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// NoopReloadOption — mirrors Go `noopOption` struct in reload.go
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Base class providing no-op implementations for all <see cref="IReloadOption"/>
|
||||
/// methods. Concrete option types override only the methods relevant to them.
|
||||
/// Mirrors Go <c>noopOption</c> struct in server/reload.go.
|
||||
/// </summary>
|
||||
public abstract class NoopReloadOption : IReloadOption
|
||||
{
|
||||
/// <inheritdoc/>
|
||||
public virtual void Apply(NatsServer server) { }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual bool IsLoggingChange() => false;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual bool IsTraceLevelChange() => false;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual bool IsAuthChange() => false;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual bool IsTlsChange() => false;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual bool IsClusterPermsChange() => false;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual bool IsClusterPoolSizeOrAccountsChange() => false;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual bool IsJetStreamChange() => false;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual bool IsStatszChange() => false;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Intermediate base classes (mirrors Go loggingOption / traceLevelOption)
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Base for all logging-related reload options.
|
||||
/// Mirrors Go <c>loggingOption</c> struct.
|
||||
/// </summary>
|
||||
internal abstract class LoggingReloadOption : NoopReloadOption
|
||||
{
|
||||
public override bool IsLoggingChange() => true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Base for all trace-level reload options.
|
||||
/// Mirrors Go <c>traceLevelOption</c> struct.
|
||||
/// </summary>
|
||||
internal abstract class TraceLevelReloadOption : LoggingReloadOption
|
||||
{
|
||||
public override bool IsTraceLevelChange() => true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Base for all authorization-related reload options.
|
||||
/// Mirrors Go <c>authOption</c> struct.
|
||||
/// </summary>
|
||||
internal abstract class AuthReloadOption : NoopReloadOption
|
||||
{
|
||||
public override bool IsAuthChange() => true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Base for TLS reload options.
|
||||
/// Mirrors Go <c>tlsOption</c> (as a base, not the concrete type).
|
||||
/// </summary>
|
||||
internal abstract class TlsBaseReloadOption : NoopReloadOption
|
||||
{
|
||||
public override bool IsTlsChange() => true;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Logging & Trace option types
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>trace</c> setting.
|
||||
/// Mirrors Go <c>traceOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class TraceReloadOption : TraceLevelReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public TraceReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: trace = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>trace_verbose</c> setting.
|
||||
/// Mirrors Go <c>traceVerboseOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class TraceVerboseReloadOption : TraceLevelReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public TraceVerboseReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: trace_verbose = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>trace_headers</c> setting.
|
||||
/// Mirrors Go <c>traceHeadersOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class TraceHeadersReloadOption : TraceLevelReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public TraceHeadersReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: trace_headers = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>debug</c> setting.
|
||||
/// Mirrors Go <c>debugOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class DebugReloadOption : LoggingReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public DebugReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
server.Noticef("Reloaded: debug = {0}", _newValue);
|
||||
// TODO: session 13 — call server.ReloadDebugRaftNodes(_newValue)
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>logtime</c> setting.
|
||||
/// Mirrors Go <c>logtimeOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class LogtimeReloadOption : LoggingReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public LogtimeReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: logtime = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>logtime_utc</c> setting.
|
||||
/// Mirrors Go <c>logtimeUTCOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class LogtimeUtcReloadOption : LoggingReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public LogtimeUtcReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: logtime_utc = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>log_file</c> setting.
|
||||
/// Mirrors Go <c>logfileOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class LogFileReloadOption : LoggingReloadOption
|
||||
{
|
||||
private readonly string _newValue;
|
||||
public LogFileReloadOption(string newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: log_file = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>syslog</c> setting.
|
||||
/// Mirrors Go <c>syslogOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class SyslogReloadOption : LoggingReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public SyslogReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: syslog = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>remote_syslog</c> setting.
|
||||
/// Mirrors Go <c>remoteSyslogOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class RemoteSyslogReloadOption : LoggingReloadOption
|
||||
{
|
||||
private readonly string _newValue;
|
||||
public RemoteSyslogReloadOption(string newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: remote_syslog = {0}", _newValue);
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// TLS option types
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>tls</c> setting.
|
||||
/// Mirrors Go <c>tlsOption</c> struct in reload.go.
|
||||
/// The TLS config is stored as <c>object?</c> because the full
|
||||
/// <c>TlsConfig</c> type is not yet ported.
|
||||
/// TODO: session 13 — replace object? with the ported TlsConfig type.
|
||||
/// </summary>
|
||||
internal sealed class TlsReloadOption : NoopReloadOption
|
||||
{
|
||||
// TODO: session 13 — replace object? with ported TlsConfig type
|
||||
private readonly object? _newValue;
|
||||
public TlsReloadOption(object? newValue) => _newValue = newValue;
|
||||
|
||||
public override bool IsTlsChange() => true;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
var message = _newValue is null ? "disabled" : "enabled";
|
||||
server.Noticef("Reloaded: tls = {0}", message);
|
||||
// TODO: session 13 — update server.Info.TLSRequired / TLSVerify
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the TLS <c>timeout</c> setting.
|
||||
/// Mirrors Go <c>tlsTimeoutOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class TlsTimeoutReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly double _newValue;
|
||||
public TlsTimeoutReloadOption(double newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: tls timeout = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the TLS <c>pinned_certs</c> setting.
|
||||
/// Mirrors Go <c>tlsPinnedCertOption</c> struct in reload.go.
|
||||
/// The pinned cert set is stored as <c>object?</c> pending the port
|
||||
/// of the PinnedCertSet type.
|
||||
/// TODO: session 13 — replace object? with ported PinnedCertSet type.
|
||||
/// </summary>
|
||||
internal sealed class TlsPinnedCertReloadOption : NoopReloadOption
|
||||
{
|
||||
// TODO: session 13 — replace object? with ported PinnedCertSet type
|
||||
private readonly object? _newValue;
|
||||
public TlsPinnedCertReloadOption(object? newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: pinned_certs");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the TLS <c>handshake_first</c> setting.
|
||||
/// Mirrors Go <c>tlsHandshakeFirst</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class TlsHandshakeFirstReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public TlsHandshakeFirstReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: Client TLS handshake first: {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the TLS <c>handshake_first_fallback</c> delay setting.
|
||||
/// Mirrors Go <c>tlsHandshakeFirstFallback</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class TlsHandshakeFirstFallbackReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly TimeSpan _newValue;
|
||||
public TlsHandshakeFirstFallbackReloadOption(TimeSpan newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: Client TLS handshake first fallback delay: {0}", _newValue);
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Authorization option types
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>username</c> authorization setting.
|
||||
/// Mirrors Go <c>usernameOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class UsernameReloadOption : AuthReloadOption
|
||||
{
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: authorization username");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>password</c> authorization setting.
|
||||
/// Mirrors Go <c>passwordOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class PasswordReloadOption : AuthReloadOption
|
||||
{
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: authorization password");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>token</c> authorization setting.
|
||||
/// Mirrors Go <c>authorizationOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class AuthorizationReloadOption : AuthReloadOption
|
||||
{
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: authorization token");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the authorization <c>timeout</c> setting.
|
||||
/// Note: this is a NoopReloadOption (not auth) because authorization
|
||||
/// will be reloaded with options separately.
|
||||
/// Mirrors Go <c>authTimeoutOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class AuthTimeoutReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly double _newValue;
|
||||
public AuthTimeoutReloadOption(double newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: authorization timeout = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>tags</c> setting.
|
||||
/// Mirrors Go <c>tagsOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class TagsReloadOption : NoopReloadOption
|
||||
{
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: tags");
|
||||
|
||||
public override bool IsStatszChange() => true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>metadata</c> setting.
|
||||
/// Mirrors Go <c>metadataOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MetadataReloadOption : NoopReloadOption
|
||||
{
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: metadata");
|
||||
|
||||
public override bool IsStatszChange() => true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the authorization <c>users</c> setting.
|
||||
/// Mirrors Go <c>usersOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class UsersReloadOption : AuthReloadOption
|
||||
{
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: authorization users");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the authorization <c>nkeys</c> setting.
|
||||
/// Mirrors Go <c>nkeysOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class NkeysReloadOption : AuthReloadOption
|
||||
{
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: authorization nkey users");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>accounts</c> setting.
|
||||
/// Mirrors Go <c>accountsOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class AccountsReloadOption : AuthReloadOption
|
||||
{
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: accounts");
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Cluster option types
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>cluster</c> setting.
|
||||
/// Stores cluster options as <c>object?</c> pending the port of <c>ClusterOpts</c>.
|
||||
/// Mirrors Go <c>clusterOption</c> struct in reload.go.
|
||||
/// TODO: session 13 — replace object? with ported ClusterOpts type.
|
||||
/// </summary>
|
||||
internal sealed class ClusterReloadOption : AuthReloadOption
|
||||
{
|
||||
// TODO: session 13 — replace object? with ported ClusterOpts type
|
||||
private readonly object? _newValue;
|
||||
private readonly bool _permsChanged;
|
||||
private readonly bool _poolSizeChanged;
|
||||
private readonly bool _compressChanged;
|
||||
private readonly string[] _accsAdded;
|
||||
private readonly string[] _accsRemoved;
|
||||
|
||||
public ClusterReloadOption(
|
||||
object? newValue,
|
||||
bool permsChanged,
|
||||
bool poolSizeChanged,
|
||||
bool compressChanged,
|
||||
string[] accsAdded,
|
||||
string[] accsRemoved)
|
||||
{
|
||||
_newValue = newValue;
|
||||
_permsChanged = permsChanged;
|
||||
_poolSizeChanged = poolSizeChanged;
|
||||
_compressChanged = compressChanged;
|
||||
_accsAdded = accsAdded;
|
||||
_accsRemoved = accsRemoved;
|
||||
}
|
||||
|
||||
public override bool IsClusterPermsChange()
|
||||
=> _permsChanged;
|
||||
|
||||
public override bool IsClusterPoolSizeOrAccountsChange()
|
||||
=> _poolSizeChanged || _accsAdded.Length > 0 || _accsRemoved.Length > 0;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — full cluster apply logic (TLS, route info, compression)
|
||||
server.Noticef("Reloaded: cluster");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the cluster <c>routes</c> setting.
|
||||
/// Routes to add/remove are stored as <c>object[]</c> pending the port of URL handling.
|
||||
/// Mirrors Go <c>routesOption</c> struct in reload.go.
|
||||
/// TODO: session 13 — replace object[] with Uri[] when route types are ported.
|
||||
/// </summary>
|
||||
internal sealed class RoutesReloadOption : NoopReloadOption
|
||||
{
|
||||
// TODO: session 13 — replace object[] with Uri[] when route URL types are ported
|
||||
private readonly object[] _add;
|
||||
private readonly object[] _remove;
|
||||
|
||||
public RoutesReloadOption(object[] add, object[] remove)
|
||||
{
|
||||
_add = add;
|
||||
_remove = remove;
|
||||
}
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — add/remove routes, update varzUpdateRouteURLs
|
||||
server.Noticef("Reloaded: cluster routes");
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Connection limit & network option types
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>max_connections</c> setting.
|
||||
/// Mirrors Go <c>maxConnOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MaxConnReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public MaxConnReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — close random connections if over limit
|
||||
server.Noticef("Reloaded: max_connections = {0}", _newValue);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>pid_file</c> setting.
|
||||
/// Mirrors Go <c>pidFileOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class PidFileReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly string _newValue;
|
||||
public PidFileReloadOption(string newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
if (string.IsNullOrEmpty(_newValue))
|
||||
return;
|
||||
// TODO: session 13 — call server.LogPid()
|
||||
server.Noticef("Reloaded: pid_file = {0}", _newValue);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>ports_file_dir</c> setting.
|
||||
/// Mirrors Go <c>portsFileDirOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class PortsFileDirReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly string _oldValue;
|
||||
private readonly string _newValue;
|
||||
|
||||
public PortsFileDirReloadOption(string oldValue, string newValue)
|
||||
{
|
||||
_oldValue = oldValue;
|
||||
_newValue = newValue;
|
||||
}
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — call server.DeletePortsFile(_oldValue) and server.LogPorts()
|
||||
server.Noticef("Reloaded: ports_file_dir = {0}", _newValue);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>max_control_line</c> setting.
|
||||
/// Mirrors Go <c>maxControlLineOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MaxControlLineReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public MaxControlLineReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — update mcl on each connected client
|
||||
server.Noticef("Reloaded: max_control_line = {0}", _newValue);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>max_payload</c> setting.
|
||||
/// Mirrors Go <c>maxPayloadOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MaxPayloadReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public MaxPayloadReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — update server info and mpay on each client
|
||||
server.Noticef("Reloaded: max_payload = {0}", _newValue);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>ping_interval</c> setting.
|
||||
/// Mirrors Go <c>pingIntervalOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class PingIntervalReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly TimeSpan _newValue;
|
||||
public PingIntervalReloadOption(TimeSpan newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: ping_interval = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>ping_max</c> setting.
|
||||
/// Mirrors Go <c>maxPingsOutOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MaxPingsOutReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public MaxPingsOutReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: ping_max = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>write_deadline</c> setting.
|
||||
/// Mirrors Go <c>writeDeadlineOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class WriteDeadlineReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly TimeSpan _newValue;
|
||||
public WriteDeadlineReloadOption(TimeSpan newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: write_deadline = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>client_advertise</c> setting.
|
||||
/// Mirrors Go <c>clientAdvertiseOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class ClientAdvertiseReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly string _newValue;
|
||||
public ClientAdvertiseReloadOption(string newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — call server.SetInfoHostPort()
|
||||
server.Noticef("Reload: client_advertise = {0}", _newValue);
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// JetStream option type
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>jetstream</c> setting.
|
||||
/// Mirrors Go <c>jetStreamOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class JetStreamReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public JetStreamReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override bool IsJetStreamChange() => true;
|
||||
public override bool IsStatszChange() => true;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: JetStream");
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Miscellaneous option types
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>default_sentinel</c> setting.
|
||||
/// Mirrors Go <c>defaultSentinelOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class DefaultSentinelReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly string _newValue;
|
||||
public DefaultSentinelReloadOption(string newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: default_sentinel = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the OCSP setting.
|
||||
/// The new value is stored as <c>object?</c> pending the port of <c>OCSPConfig</c>.
|
||||
/// Mirrors Go <c>ocspOption</c> struct in reload.go.
|
||||
/// TODO: session 13 — replace object? with ported OcspConfig type.
|
||||
/// </summary>
|
||||
internal sealed class OcspReloadOption : TlsBaseReloadOption
|
||||
{
|
||||
// TODO: session 13 — replace object? with ported OcspConfig type
|
||||
private readonly object? _newValue;
|
||||
public OcspReloadOption(object? newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: OCSP");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the OCSP response cache setting.
|
||||
/// The new value is stored as <c>object?</c> pending the port of
|
||||
/// <c>OCSPResponseCacheConfig</c>.
|
||||
/// Mirrors Go <c>ocspResponseCacheOption</c> struct in reload.go.
|
||||
/// TODO: session 13 — replace object? with ported OcspResponseCacheConfig type.
|
||||
/// </summary>
|
||||
internal sealed class OcspResponseCacheReloadOption : TlsBaseReloadOption
|
||||
{
|
||||
// TODO: session 13 — replace object? with ported OcspResponseCacheConfig type
|
||||
private readonly object? _newValue;
|
||||
public OcspResponseCacheReloadOption(object? newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded OCSP peer cache");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>connect_error_reports</c> setting.
|
||||
/// Mirrors Go <c>connectErrorReports</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class ConnectErrorReportsReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public ConnectErrorReportsReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: connect_error_reports = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>reconnect_error_reports</c> setting.
|
||||
/// Mirrors Go <c>reconnectErrorReports</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class ReconnectErrorReportsReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public ReconnectErrorReportsReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: reconnect_error_reports = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>max_traced_msg_len</c> setting.
|
||||
/// Mirrors Go <c>maxTracedMsgLenOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MaxTracedMsgLenReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public MaxTracedMsgLenReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — update server.Opts.MaxTracedMsgLen under lock
|
||||
server.Noticef("Reloaded: max_traced_msg_len = {0}", _newValue);
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// MQTT option types
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the MQTT <c>ack_wait</c> setting.
|
||||
/// Mirrors Go <c>mqttAckWaitReload</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MqttAckWaitReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly TimeSpan _newValue;
|
||||
public MqttAckWaitReloadOption(TimeSpan newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: MQTT ack_wait = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the MQTT <c>max_ack_pending</c> setting.
|
||||
/// Mirrors Go <c>mqttMaxAckPendingReload</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MqttMaxAckPendingReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly ushort _newValue;
|
||||
public MqttMaxAckPendingReloadOption(ushort newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — call server.MqttUpdateMaxAckPending(_newValue)
|
||||
server.Noticef("Reloaded: MQTT max_ack_pending = {0}", _newValue);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the MQTT <c>stream_replicas</c> setting.
|
||||
/// Mirrors Go <c>mqttStreamReplicasReload</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MqttStreamReplicasReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public MqttStreamReplicasReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: MQTT stream_replicas = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the MQTT <c>consumer_replicas</c> setting.
|
||||
/// Mirrors Go <c>mqttConsumerReplicasReload</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MqttConsumerReplicasReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public MqttConsumerReplicasReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: MQTT consumer_replicas = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the MQTT <c>consumer_memory_storage</c> setting.
|
||||
/// Mirrors Go <c>mqttConsumerMemoryStorageReload</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MqttConsumerMemoryStorageReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly bool _newValue;
|
||||
public MqttConsumerMemoryStorageReloadOption(bool newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: MQTT consumer_memory_storage = {0}", _newValue);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the MQTT <c>consumer_inactive_threshold</c> setting.
|
||||
/// Mirrors Go <c>mqttInactiveThresholdReload</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class MqttInactiveThresholdReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly TimeSpan _newValue;
|
||||
public MqttInactiveThresholdReloadOption(TimeSpan newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
=> server.Noticef("Reloaded: MQTT consumer_inactive_threshold = {0}", _newValue);
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Profiling option type
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>prof_block_rate</c> setting.
|
||||
/// Mirrors Go <c>profBlockRateReload</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class ProfBlockRateReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly int _newValue;
|
||||
public ProfBlockRateReloadOption(int newValue) => _newValue = newValue;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — call server.SetBlockProfileRate(_newValue)
|
||||
server.Noticef("Reloaded: prof_block_rate = {0}", _newValue);
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// LeafNode option type
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for leaf-node settings (TLS handshake-first, compression, disabled).
|
||||
/// Mirrors Go <c>leafNodeOption</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class LeafNodeReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly bool _tlsFirstChanged;
|
||||
private readonly bool _compressionChanged;
|
||||
private readonly bool _disabledChanged;
|
||||
|
||||
public LeafNodeReloadOption(bool tlsFirstChanged, bool compressionChanged, bool disabledChanged)
|
||||
{
|
||||
_tlsFirstChanged = tlsFirstChanged;
|
||||
_compressionChanged = compressionChanged;
|
||||
_disabledChanged = disabledChanged;
|
||||
}
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — full leaf-node apply logic from Go leafNodeOption.Apply()
|
||||
if (_tlsFirstChanged)
|
||||
server.Noticef("Reloaded: LeafNode TLS HandshakeFirst settings");
|
||||
if (_compressionChanged)
|
||||
server.Noticef("Reloaded: LeafNode compression settings");
|
||||
if (_disabledChanged)
|
||||
server.Noticef("Reloaded: LeafNode disabled/enabled state");
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// NoFastProducerStall option type
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>no_fast_producer_stall</c> setting.
|
||||
/// Mirrors Go <c>noFastProdStallReload</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class NoFastProducerStallReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly bool _noStall;
|
||||
public NoFastProducerStallReloadOption(bool noStall) => _noStall = noStall;
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
var not = _noStall ? "not " : string.Empty;
|
||||
server.Noticef("Reloaded: fast producers will {0}be stalled", not);
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Proxies option type
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reload option for the <c>proxies</c> trusted keys setting.
|
||||
/// Mirrors Go <c>proxiesReload</c> struct in reload.go.
|
||||
/// </summary>
|
||||
internal sealed class ProxiesReloadOption : NoopReloadOption
|
||||
{
|
||||
private readonly string[] _add;
|
||||
private readonly string[] _del;
|
||||
|
||||
public ProxiesReloadOption(string[] add, string[] del)
|
||||
{
|
||||
_add = add;
|
||||
_del = del;
|
||||
}
|
||||
|
||||
public override void Apply(NatsServer server)
|
||||
{
|
||||
// TODO: session 13 — disconnect proxied clients for removed keys,
|
||||
// call server.ProcessProxiesTrustedKeys()
|
||||
if (_del.Length > 0)
|
||||
server.Noticef("Reloaded: proxies trusted keys {0} were removed", string.Join(", ", _del));
|
||||
if (_add.Length > 0)
|
||||
server.Noticef("Reloaded: proxies trusted keys {0} were added", string.Join(", ", _add));
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// ConfigReloader — stub for server/reload.go Reload() / ReloadOptions()
|
||||
// =============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Stub for the configuration reloader.
|
||||
/// Full reload logic (diffOptions, applyOptions, recheckPinnedCerts) will be
|
||||
/// implemented in a future session.
|
||||
/// Mirrors Go <c>Server.Reload()</c> and <c>Server.ReloadOptions()</c> in
|
||||
/// server/reload.go.
|
||||
/// </summary>
|
||||
internal sealed class ConfigReloader
|
||||
{
|
||||
// TODO: session 13 — full reload logic
|
||||
// Mirrors Go server.Reload() / server.ReloadOptions() in server/reload.go
|
||||
|
||||
/// <summary>
|
||||
/// Stub: read and apply the server config file.
|
||||
/// Returns null on success; a non-null Exception describes the failure.
|
||||
/// </summary>
|
||||
public Exception? Reload(NatsServer server) => null;
|
||||
}
|
||||
778
dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs
Normal file
778
dotnet/src/ZB.MOM.NatsNet.Server/Events/EventTypes.cs
Normal file
@@ -0,0 +1,778 @@
|
||||
// Copyright 2018-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/events.go in the NATS server Go source.
|
||||
|
||||
using System.Text.Json.Serialization;
|
||||
using ZB.MOM.NatsNet.Server.Auth.CertificateIdentityProvider;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// ============================================================================
|
||||
// System subject constants
|
||||
// Mirrors the const block at the top of server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// System-account subject templates and constants used for internal NATS server
|
||||
/// event routing. All format-string fields use <see cref="string.Format"/> with
|
||||
/// the appropriate server/account ID substituted at call time.
|
||||
/// Mirrors the const block in server/events.go.
|
||||
/// </summary>
|
||||
public static class SystemSubjects
|
||||
{
|
||||
// Account lookup / claims
|
||||
public const string AccLookupReqSubj = "$SYS.REQ.ACCOUNT.{0}.CLAIMS.LOOKUP";
|
||||
public const string AccPackReqSubj = "$SYS.REQ.CLAIMS.PACK";
|
||||
public const string AccListReqSubj = "$SYS.REQ.CLAIMS.LIST";
|
||||
public const string AccClaimsReqSubj = "$SYS.REQ.CLAIMS.UPDATE";
|
||||
public const string AccDeleteReqSubj = "$SYS.REQ.CLAIMS.DELETE";
|
||||
|
||||
// Connection events
|
||||
public const string ConnectEventSubj = "$SYS.ACCOUNT.{0}.CONNECT";
|
||||
public const string DisconnectEventSubj = "$SYS.ACCOUNT.{0}.DISCONNECT";
|
||||
|
||||
// Direct request routing
|
||||
public const string AccDirectReqSubj = "$SYS.REQ.ACCOUNT.{0}.{1}";
|
||||
public const string AccPingReqSubj = "$SYS.REQ.ACCOUNT.PING.{0}";
|
||||
|
||||
// Account update events (both old and new forms kept for backward compatibility)
|
||||
public const string AccUpdateEventSubjOld = "$SYS.ACCOUNT.{0}.CLAIMS.UPDATE";
|
||||
public const string AccUpdateEventSubjNew = "$SYS.REQ.ACCOUNT.{0}.CLAIMS.UPDATE";
|
||||
|
||||
public const string ConnsRespSubj = "$SYS._INBOX_.{0}";
|
||||
public const string AccConnsEventSubjNew = "$SYS.ACCOUNT.{0}.SERVER.CONNS";
|
||||
public const string AccConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.{0}.CONNS"; // backward compat
|
||||
|
||||
// Server lifecycle events
|
||||
public const string LameDuckEventSubj = "$SYS.SERVER.{0}.LAMEDUCK";
|
||||
public const string ShutdownEventSubj = "$SYS.SERVER.{0}.SHUTDOWN";
|
||||
|
||||
// Client control
|
||||
public const string ClientKickReqSubj = "$SYS.REQ.SERVER.{0}.KICK";
|
||||
public const string ClientLdmReqSubj = "$SYS.REQ.SERVER.{0}.LDM";
|
||||
|
||||
// Auth error events
|
||||
public const string AuthErrorEventSubj = "$SYS.SERVER.{0}.CLIENT.AUTH.ERR";
|
||||
public const string AuthErrorAccountEventSubj = "$SYS.ACCOUNT.CLIENT.AUTH.ERR";
|
||||
|
||||
// Stats
|
||||
public const string ServerStatsSubj = "$SYS.SERVER.{0}.STATSZ";
|
||||
public const string ServerDirectReqSubj = "$SYS.REQ.SERVER.{0}.{1}";
|
||||
public const string ServerPingReqSubj = "$SYS.REQ.SERVER.PING.{0}";
|
||||
public const string ServerStatsPingReqSubj = "$SYS.REQ.SERVER.PING"; // deprecated; use STATSZ variant
|
||||
public const string ServerReloadReqSubj = "$SYS.REQ.SERVER.{0}.RELOAD";
|
||||
|
||||
// Leaf node
|
||||
public const string LeafNodeConnectEventSubj = "$SYS.ACCOUNT.{0}.LEAFNODE.CONNECT"; // internal only
|
||||
|
||||
// Latency
|
||||
public const string RemoteLatencyEventSubj = "$SYS.LATENCY.M2.{0}";
|
||||
public const string InboxRespSubj = "$SYS._INBOX.{0}.{1}";
|
||||
|
||||
// User info
|
||||
public const string UserDirectInfoSubj = "$SYS.REQ.USER.INFO";
|
||||
public const string UserDirectReqSubj = "$SYS.REQ.USER.{0}.INFO";
|
||||
|
||||
// Subscription count
|
||||
public const string AccNumSubsReqSubj = "$SYS.REQ.ACCOUNT.NSUBS";
|
||||
|
||||
// Debug
|
||||
public const string AccSubsSubj = "$SYS.DEBUG.SUBSCRIBERS";
|
||||
|
||||
// OCSP peer events
|
||||
public const string OcspPeerRejectEventSubj = "$SYS.SERVER.{0}.OCSP.PEER.CONN.REJECT";
|
||||
public const string OcspPeerChainlinkInvalidEventSubj = "$SYS.SERVER.{0}.OCSP.PEER.LINK.INVALID";
|
||||
|
||||
// Parsing constants (token indexes / counts)
|
||||
public const int AccLookupReqTokens = 6;
|
||||
public const int ShutdownEventTokens = 4;
|
||||
public const int ServerSubjectIndex = 2;
|
||||
public const int AccUpdateTokensNew = 6;
|
||||
public const int AccUpdateTokensOld = 5;
|
||||
public const int AccUpdateAccIdxOld = 2;
|
||||
public const int AccReqTokens = 5;
|
||||
public const int AccReqAccIndex = 3;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Advisory message type schema URI constants
|
||||
// Mirrors the const string variables near each struct in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
public static class EventMsgTypes
|
||||
{
|
||||
public const string ConnectEventMsgType = "io.nats.server.advisory.v1.client_connect";
|
||||
public const string DisconnectEventMsgType = "io.nats.server.advisory.v1.client_disconnect";
|
||||
public const string OcspPeerRejectEventMsgType = "io.nats.server.advisory.v1.ocsp_peer_reject";
|
||||
public const string OcspPeerChainlinkInvalidEventMsgType = "io.nats.server.advisory.v1.ocsp_peer_link_invalid";
|
||||
public const string AccountNumConnsMsgType = "io.nats.server.advisory.v1.account_connections";
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Heartbeat / rate-limit intervals (mirrors package-level vars in events.go)
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Default timing constants for server event heartbeats and rate limiting.
|
||||
/// Mirrors Go package-level <c>var</c> declarations in events.go.
|
||||
/// </summary>
|
||||
public static class EventIntervals
|
||||
{
|
||||
/// <summary>Default HB interval for events. Mirrors Go <c>eventsHBInterval = 30s</c>.</summary>
|
||||
public static readonly TimeSpan EventsHbInterval = TimeSpan.FromSeconds(30);
|
||||
|
||||
/// <summary>Default HB interval for stats. Mirrors Go <c>statsHBInterval = 10s</c>.</summary>
|
||||
public static readonly TimeSpan StatsHbInterval = TimeSpan.FromSeconds(10);
|
||||
|
||||
/// <summary>Minimum interval between statsz publishes. Mirrors Go <c>defaultStatszRateLimit = 1s</c>.</summary>
|
||||
public static readonly TimeSpan DefaultStatszRateLimit = TimeSpan.FromSeconds(1);
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// SysMsgHandler — delegate for internal system message dispatch
|
||||
// Mirrors Go <c>sysMsgHandler</c> func type in events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Callback invoked when an internal system message is dispatched.
|
||||
/// Mirrors Go <c>sysMsgHandler</c> in server/events.go.
|
||||
/// </summary>
|
||||
public delegate void SysMsgHandler(
|
||||
Subscription sub,
|
||||
NatsClient client,
|
||||
Account acc,
|
||||
string subject,
|
||||
string reply,
|
||||
byte[] hdr,
|
||||
byte[] msg);
|
||||
|
||||
// ============================================================================
|
||||
// InSysMsg — queued internal system message
|
||||
// Mirrors Go <c>inSysMsg</c> struct in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Holds a system message queued for internal delivery, avoiding the
|
||||
/// route/gateway path.
|
||||
/// Mirrors Go <c>inSysMsg</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
internal sealed class InSysMsg
|
||||
{
|
||||
public Subscription? Sub { get; set; }
|
||||
public NatsClient? Client { get; set; }
|
||||
public Account? Acc { get; set; }
|
||||
public string Subject { get; set; } = string.Empty;
|
||||
public string Reply { get; set; } = string.Empty;
|
||||
public byte[]? Hdr { get; set; }
|
||||
public byte[]? Msg { get; set; }
|
||||
public SysMsgHandler? Cb { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// InternalState — server internal/sys state
|
||||
// Mirrors Go <c>internal</c> struct in server/events.go.
|
||||
// Uses Monitor lock (lock(this)) in place of Go's embedded sync.Mutex.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Holds all internal state used by the server's system-account event
|
||||
/// machinery: account reference, client, send/receive queues, timers,
|
||||
/// reply handlers, and heartbeat configuration.
|
||||
/// Mirrors Go <c>internal</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
internal sealed class InternalState
|
||||
{
|
||||
// ---- identity / sequencing ----
|
||||
public Account? Account { get; set; }
|
||||
public NatsClient? Client { get; set; }
|
||||
public ulong Seq { get; set; }
|
||||
public int Sid { get; set; }
|
||||
|
||||
// ---- remote server tracking ----
|
||||
/// <summary>Map of server ID → serverUpdate. Mirrors Go <c>servers map[string]*serverUpdate</c>.</summary>
|
||||
public Dictionary<string, ServerUpdate> Servers { get; set; } = new();
|
||||
|
||||
// ---- timers ----
|
||||
/// <summary>Sweeper timer. Mirrors Go <c>sweeper *time.Timer</c>.</summary>
|
||||
public System.Threading.Timer? Sweeper { get; set; }
|
||||
|
||||
/// <summary>Stats heartbeat timer. Mirrors Go <c>stmr *time.Timer</c>.</summary>
|
||||
public System.Threading.Timer? StatsMsgTimer { get; set; }
|
||||
|
||||
// ---- reply handlers ----
|
||||
/// <summary>
|
||||
/// Pending reply subject → handler map.
|
||||
/// Mirrors Go <c>replies map[string]msgHandler</c>.
|
||||
/// </summary>
|
||||
public Dictionary<string, Action<Subscription, NatsClient, Account, string, string, byte[], byte[]>> Replies { get; set; } = new();
|
||||
|
||||
// ---- queues ----
|
||||
/// <summary>Outbound message send queue. Mirrors Go <c>sendq *ipQueue[*pubMsg]</c>.</summary>
|
||||
public IpQueue<PubMsg>? SendQueue { get; set; }
|
||||
|
||||
/// <summary>Inbound receive queue. Mirrors Go <c>recvq *ipQueue[*inSysMsg]</c>.</summary>
|
||||
public IpQueue<InSysMsg>? RecvQueue { get; set; }
|
||||
|
||||
/// <summary>Priority receive queue for STATSZ/Pings. Mirrors Go <c>recvqp *ipQueue[*inSysMsg]</c>.</summary>
|
||||
public IpQueue<InSysMsg>? RecvQueuePriority { get; set; }
|
||||
|
||||
/// <summary>Reset channel used to restart the send loop. Mirrors Go <c>resetCh chan struct{}</c>.</summary>
|
||||
public System.Threading.Channels.Channel<bool>? ResetChannel { get; set; }
|
||||
|
||||
// ---- durations ----
|
||||
/// <summary>Maximum time before an orphaned server entry is removed. Mirrors Go <c>orphMax</c>.</summary>
|
||||
public TimeSpan OrphanMax { get; set; }
|
||||
|
||||
/// <summary>Interval at which orphan checks run. Mirrors Go <c>chkOrph</c>.</summary>
|
||||
public TimeSpan CheckOrphan { get; set; }
|
||||
|
||||
/// <summary>Interval between statsz publishes. Mirrors Go <c>statsz</c>.</summary>
|
||||
public TimeSpan StatszInterval { get; set; }
|
||||
|
||||
/// <summary>Client-facing statsz interval. Mirrors Go <c>cstatsz</c>.</summary>
|
||||
public TimeSpan ClientStatszInterval { get; set; }
|
||||
|
||||
// ---- misc ----
|
||||
/// <summary>Short hash used for shared-inbox routing. Mirrors Go <c>shash string</c>.</summary>
|
||||
public string ShortHash { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Inbox prefix for this server's internal client. Mirrors Go <c>inboxPre string</c>.</summary>
|
||||
public string InboxPrefix { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Subscription for remote stats. Mirrors Go <c>remoteStatsSub *subscription</c>.</summary>
|
||||
public Subscription? RemoteStatsSub { get; set; }
|
||||
|
||||
/// <summary>Time of the last statsz publish. Mirrors Go <c>lastStatsz time.Time</c>.</summary>
|
||||
public DateTime LastStatsz { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ServerUpdate — remote server heartbeat tracking
|
||||
// Mirrors Go <c>serverUpdate</c> struct in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Tracks the sequence number and last-seen timestamp of a remote server's
|
||||
/// system heartbeat. Used to detect orphaned servers.
|
||||
/// Mirrors Go <c>serverUpdate</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
internal sealed class ServerUpdate
|
||||
{
|
||||
/// <summary>Last sequence number received from the remote server.</summary>
|
||||
public ulong Seq { get; set; }
|
||||
|
||||
/// <summary>Wall-clock time of the last heartbeat.</summary>
|
||||
public DateTime LTime { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// PubMsg — internally-queued outbound publish message
|
||||
// Mirrors Go <c>pubMsg</c> struct in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Holds an outbound message that the server wants to publish via the internal
|
||||
/// send loop, avoiding direct route/gateway writes.
|
||||
/// Mirrors Go <c>pubMsg</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
internal sealed class PubMsg
|
||||
{
|
||||
public NatsClient? Client { get; set; }
|
||||
public string Subject { get; set; } = string.Empty;
|
||||
public string Reply { get; set; } = string.Empty;
|
||||
public ServerInfo? Si { get; set; }
|
||||
public byte[]? Hdr { get; set; }
|
||||
public object? Msg { get; set; }
|
||||
|
||||
/// <summary>Compression type. TODO: session 12 — wire up compressionType enum.</summary>
|
||||
public int Oct { get; set; }
|
||||
|
||||
public bool Echo { get; set; }
|
||||
public bool Last { get; set; }
|
||||
|
||||
// TODO: session 12 — add pool return helper (returnToPool).
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// DataStats — message/byte counter pair (sent or received)
|
||||
// Mirrors Go <c>DataStats</c> struct in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reports how many messages and bytes were sent or received.
|
||||
/// Optionally breaks out gateway, route, and leaf-node traffic.
|
||||
/// Mirrors Go <c>DataStats</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class DataStats
|
||||
{
|
||||
[JsonPropertyName("msgs")]
|
||||
public long Msgs { get; set; }
|
||||
|
||||
[JsonPropertyName("bytes")]
|
||||
public long Bytes { get; set; }
|
||||
|
||||
[JsonPropertyName("gateways")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public MsgBytes? Gateways { get; set; }
|
||||
|
||||
[JsonPropertyName("routes")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public MsgBytes? Routes { get; set; }
|
||||
|
||||
[JsonPropertyName("leafs")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public MsgBytes? Leafs { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgBytes — simple message+byte pair used inside DataStats
|
||||
// Mirrors Go <c>MsgBytes</c> struct in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// A simple pair of message and byte counts, used as a nested breakdown
|
||||
/// inside <see cref="DataStats"/>.
|
||||
/// Mirrors Go <c>MsgBytes</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class MsgBytes
|
||||
{
|
||||
[JsonPropertyName("msgs")]
|
||||
public long Msgs { get; set; }
|
||||
|
||||
[JsonPropertyName("bytes")]
|
||||
public long Bytes { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// RouteStat / GatewayStat — per-route and per-gateway stat snapshots
|
||||
// Mirrors Go <c>RouteStat</c> and <c>GatewayStat</c> in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Statistics snapshot for a single cluster route connection.
|
||||
/// Mirrors Go <c>RouteStat</c> in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class RouteStat
|
||||
{
|
||||
[JsonPropertyName("rid")]
|
||||
public ulong Id { get; set; }
|
||||
|
||||
[JsonPropertyName("name")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Name { get; set; }
|
||||
|
||||
[JsonPropertyName("sent")]
|
||||
public DataStats Sent { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("received")]
|
||||
public DataStats Received { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("pending")]
|
||||
public int Pending { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Statistics snapshot for a gateway connection.
|
||||
/// Mirrors Go <c>GatewayStat</c> in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class GatewayStat
|
||||
{
|
||||
[JsonPropertyName("gwid")]
|
||||
public ulong Id { get; set; }
|
||||
|
||||
[JsonPropertyName("name")]
|
||||
public string Name { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("sent")]
|
||||
public DataStats Sent { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("received")]
|
||||
public DataStats Received { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("inbound_connections")]
|
||||
public int NumInbound { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ServerStatsMsg — periodic stats advisory published on $SYS.SERVER.{id}.STATSZ
|
||||
// Mirrors Go <c>ServerStatsMsg</c> struct in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Periodic advisory message containing the current server statistics.
|
||||
/// Mirrors Go <c>ServerStatsMsg</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class ServerStatsMsg
|
||||
{
|
||||
[JsonPropertyName("server")]
|
||||
public ServerInfo Server { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("statsz")]
|
||||
public ServerStatsAdvisory Stats { get; set; } = new();
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ServerStatsAdvisory — the statsz payload inside ServerStatsMsg
|
||||
// Mirrors Go <c>ServerStats</c> struct (advisory form) in server/events.go.
|
||||
// NOTE: distinct from the internal ServerStats in NatsServerTypes.cs.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// The JSON-serialisable statistics payload included inside <see cref="ServerStatsMsg"/>.
|
||||
/// Mirrors Go <c>ServerStats</c> struct (advisory form) in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class ServerStatsAdvisory
|
||||
{
|
||||
[JsonPropertyName("start")]
|
||||
public DateTime Start { get; set; }
|
||||
|
||||
[JsonPropertyName("mem")]
|
||||
public long Mem { get; set; }
|
||||
|
||||
[JsonPropertyName("cores")]
|
||||
public int Cores { get; set; }
|
||||
|
||||
[JsonPropertyName("cpu")]
|
||||
public double Cpu { get; set; }
|
||||
|
||||
[JsonPropertyName("connections")]
|
||||
public int Connections { get; set; }
|
||||
|
||||
[JsonPropertyName("total_connections")]
|
||||
public ulong TotalConnections { get; set; }
|
||||
|
||||
[JsonPropertyName("active_accounts")]
|
||||
public int ActiveAccounts { get; set; }
|
||||
|
||||
[JsonPropertyName("subscriptions")]
|
||||
public uint NumSubs { get; set; }
|
||||
|
||||
[JsonPropertyName("sent")]
|
||||
public DataStats Sent { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("received")]
|
||||
public DataStats Received { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("slow_consumers")]
|
||||
public long SlowConsumers { get; set; }
|
||||
|
||||
[JsonPropertyName("slow_consumer_stats")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public SlowConsumersStats? SlowConsumersStats { get; set; }
|
||||
|
||||
[JsonPropertyName("stale_connections")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public long StaleConnections { get; set; }
|
||||
|
||||
[JsonPropertyName("stale_connection_stats")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public StaleConnectionStats? StaleConnectionStats { get; set; }
|
||||
|
||||
[JsonPropertyName("stalled_clients")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public long StalledClients { get; set; }
|
||||
|
||||
[JsonPropertyName("routes")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public List<RouteStat>? Routes { get; set; }
|
||||
|
||||
[JsonPropertyName("gateways")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public List<GatewayStat>? Gateways { get; set; }
|
||||
|
||||
[JsonPropertyName("active_servers")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public int ActiveServers { get; set; }
|
||||
|
||||
/// <summary>JetStream stats. TODO: session 19 — wire JetStreamVarz type.</summary>
|
||||
[JsonPropertyName("jetstream")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public object? JetStream { get; set; }
|
||||
|
||||
[JsonPropertyName("gomemlimit")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public long MemLimit { get; set; }
|
||||
|
||||
[JsonPropertyName("gomaxprocs")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public int MaxProcs { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// SlowConsumersStats / StaleConnectionStats — advisory-layer per-kind counters
|
||||
// These are the JSON-serialisable variants used in ServerStatsAdvisory.
|
||||
// The internal atomic counters live in NatsServerTypes.cs (SlowConsumerStats /
|
||||
// StaleConnectionStats with different casing).
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Per-kind slow-consumer counters included in stats advisories.
|
||||
/// Mirrors Go <c>SlowConsumersStats</c> in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class SlowConsumersStats
|
||||
{
|
||||
[JsonPropertyName("clients")]
|
||||
public ulong Clients { get; set; }
|
||||
|
||||
[JsonPropertyName("routes")]
|
||||
public ulong Routes { get; set; }
|
||||
|
||||
[JsonPropertyName("gateways")]
|
||||
public ulong Gateways { get; set; }
|
||||
|
||||
[JsonPropertyName("leafs")]
|
||||
public ulong Leafs { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Per-kind stale-connection counters included in stats advisories.
|
||||
/// Mirrors Go <c>StaleConnectionStats</c> in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class StaleConnectionStats
|
||||
{
|
||||
[JsonPropertyName("clients")]
|
||||
public ulong Clients { get; set; }
|
||||
|
||||
[JsonPropertyName("routes")]
|
||||
public ulong Routes { get; set; }
|
||||
|
||||
[JsonPropertyName("gateways")]
|
||||
public ulong Gateways { get; set; }
|
||||
|
||||
[JsonPropertyName("leafs")]
|
||||
public ulong Leafs { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ConnectEventMsg / DisconnectEventMsg — client lifecycle advisories
|
||||
// Mirrors Go structs in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Advisory published on <c>$SYS.ACCOUNT.{acc}.CONNECT</c> when a new
|
||||
/// client connection is established within a tracked account.
|
||||
/// Mirrors Go <c>ConnectEventMsg</c> in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class ConnectEventMsg : TypedEvent
|
||||
{
|
||||
[JsonPropertyName("server")]
|
||||
public ServerInfo Server { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("client")]
|
||||
public ClientInfo Client { get; set; } = new();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Advisory published on <c>$SYS.ACCOUNT.{acc}.DISCONNECT</c> when a
|
||||
/// previously-tracked client connection closes.
|
||||
/// Mirrors Go <c>DisconnectEventMsg</c> in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class DisconnectEventMsg : TypedEvent
|
||||
{
|
||||
[JsonPropertyName("server")]
|
||||
public ServerInfo Server { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("client")]
|
||||
public ClientInfo Client { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("sent")]
|
||||
public DataStats Sent { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("received")]
|
||||
public DataStats Received { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("reason")]
|
||||
public string Reason { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// OCSPPeerRejectEventMsg / OCSPPeerChainlinkInvalidEventMsg
|
||||
// Mirrors Go structs in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Advisory published when a peer TLS handshake is rejected due to OCSP
|
||||
/// invalidation of the peer's leaf certificate.
|
||||
/// Mirrors Go <c>OCSPPeerRejectEventMsg</c> in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class OcspPeerRejectEventMsg : TypedEvent
|
||||
{
|
||||
[JsonPropertyName("kind")]
|
||||
public string Kind { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("peer")]
|
||||
public CertInfo Peer { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("server")]
|
||||
public ServerInfo Server { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("reason")]
|
||||
public string Reason { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Advisory published when a certificate in a valid TLS chain is found to be
|
||||
/// OCSP-invalid during a peer handshake. Both the invalid link and the
|
||||
/// peer's leaf cert are included.
|
||||
/// Mirrors Go <c>OCSPPeerChainlinkInvalidEventMsg</c> in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class OcspPeerChainlinkInvalidEventMsg : TypedEvent
|
||||
{
|
||||
[JsonPropertyName("link")]
|
||||
public CertInfo Link { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("peer")]
|
||||
public CertInfo Peer { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("server")]
|
||||
public ServerInfo Server { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("reason")]
|
||||
public string Reason { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// AccountNumConns / AccountStat — account connection count advisories
|
||||
// Mirrors Go structs in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Advisory heartbeat published when the connection count for a tracked
|
||||
/// account changes, or on a periodic schedule.
|
||||
/// Mirrors Go <c>AccountNumConns</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class AccountNumConns : TypedEvent
|
||||
{
|
||||
[JsonPropertyName("server")]
|
||||
public ServerInfo Server { get; set; } = new();
|
||||
|
||||
// Embedded AccountStat fields are inlined via composition.
|
||||
[JsonPropertyName("acc")]
|
||||
public string Account { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("name")]
|
||||
public string Name { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("conns")]
|
||||
public int Conns { get; set; }
|
||||
|
||||
[JsonPropertyName("leafnodes")]
|
||||
public int LeafNodes { get; set; }
|
||||
|
||||
[JsonPropertyName("total_conns")]
|
||||
public int TotalConns { get; set; }
|
||||
|
||||
[JsonPropertyName("num_subscriptions")]
|
||||
public uint NumSubs { get; set; }
|
||||
|
||||
[JsonPropertyName("sent")]
|
||||
public DataStats Sent { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("received")]
|
||||
public DataStats Received { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("slow_consumers")]
|
||||
public long SlowConsumers { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Statistic data common to <see cref="AccountNumConns"/> and account-level
|
||||
/// monitoring responses.
|
||||
/// Mirrors Go <c>AccountStat</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class AccountStat
|
||||
{
|
||||
[JsonPropertyName("acc")]
|
||||
public string Account { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("name")]
|
||||
public string Name { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("conns")]
|
||||
public int Conns { get; set; }
|
||||
|
||||
[JsonPropertyName("leafnodes")]
|
||||
public int LeafNodes { get; set; }
|
||||
|
||||
[JsonPropertyName("total_conns")]
|
||||
public int TotalConns { get; set; }
|
||||
|
||||
[JsonPropertyName("num_subscriptions")]
|
||||
public uint NumSubs { get; set; }
|
||||
|
||||
[JsonPropertyName("sent")]
|
||||
public DataStats Sent { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("received")]
|
||||
public DataStats Received { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("slow_consumers")]
|
||||
public long SlowConsumers { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Internal request payload sent when this server first starts tracking an
|
||||
/// account, asking peer servers for their local connection counts.
|
||||
/// Mirrors Go <c>accNumConnsReq</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
internal sealed class AccNumConnsReq
|
||||
{
|
||||
[JsonPropertyName("server")]
|
||||
public ServerInfo Server { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("acc")]
|
||||
public string Account { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ServerCapability / ServerID — server identity and capability flags
|
||||
// Mirrors Go types in server/events.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Bit-flag capability set for a remote server.
|
||||
/// Mirrors Go <c>ServerCapability uint64</c> in server/events.go.
|
||||
/// </summary>
|
||||
[Flags]
|
||||
public enum ServerCapability : ulong
|
||||
{
|
||||
/// <summary>No capabilities.</summary>
|
||||
None = 0,
|
||||
|
||||
/// <summary>Server has JetStream enabled. Mirrors Go <c>JetStreamEnabled</c>.</summary>
|
||||
JetStreamEnabled = 1UL << 0,
|
||||
|
||||
/// <summary>New stream snapshot capability. Mirrors Go <c>BinaryStreamSnapshot</c>.</summary>
|
||||
BinaryStreamSnapshot = 1UL << 1,
|
||||
|
||||
/// <summary>Move NRG traffic out of system account. Mirrors Go <c>AccountNRG</c>.</summary>
|
||||
AccountNrg = 1UL << 2,
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Minimal static identity for a remote server (name, host, ID).
|
||||
/// Mirrors Go <c>ServerID</c> struct in server/events.go.
|
||||
/// </summary>
|
||||
public sealed class ServerIdentity
|
||||
{
|
||||
[JsonPropertyName("name")]
|
||||
public string Name { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("host")]
|
||||
public string Host { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("id")]
|
||||
public string Id { get; set; } = string.Empty;
|
||||
}
|
||||
465
dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTraceTypes.cs
Normal file
465
dotnet/src/ZB.MOM.NatsNet.Server/MessageTrace/MsgTraceTypes.cs
Normal file
@@ -0,0 +1,465 @@
|
||||
// Copyright 2024-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/msgtrace.go in the NATS server Go source.
|
||||
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// ============================================================================
|
||||
// Message-trace header name constants
|
||||
// Mirrors Go const block at top of server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// NATS message-trace header names and special sentinel values.
|
||||
/// Mirrors Go const block in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public static class MsgTraceHeaders
|
||||
{
|
||||
/// <summary>Header that carries the trace destination subject. Mirrors Go <c>MsgTraceDest</c>.</summary>
|
||||
public const string MsgTraceDest = "Nats-Trace-Dest";
|
||||
|
||||
/// <summary>
|
||||
/// Sentinel value placed in the trace-dest header to disable tracing
|
||||
/// (must be an invalid NATS subject). Mirrors Go <c>MsgTraceDestDisabled</c>.
|
||||
/// </summary>
|
||||
public const string MsgTraceDestDisabled = "trace disabled";
|
||||
|
||||
/// <summary>Header used for hop-count tracking across servers. Mirrors Go <c>MsgTraceHop</c>.</summary>
|
||||
public const string MsgTraceHop = "Nats-Trace-Hop";
|
||||
|
||||
/// <summary>Header that carries the originating account name. Mirrors Go <c>MsgTraceOriginAccount</c>.</summary>
|
||||
public const string MsgTraceOriginAccount = "Nats-Trace-Origin-Account";
|
||||
|
||||
/// <summary>
|
||||
/// When set to a truthy value, the message is consumed only for tracing
|
||||
/// and not delivered to subscribers. Mirrors Go <c>MsgTraceOnly</c>.
|
||||
/// </summary>
|
||||
public const string MsgTraceOnly = "Nats-Trace-Only";
|
||||
|
||||
/// <summary>
|
||||
/// W3C trace-context parent header. NATS no longer lower-cases this but
|
||||
/// accepts it in any case. Mirrors Go <c>traceParentHdr</c> (internal).
|
||||
/// </summary>
|
||||
public const string TraceParentHdr = "traceparent";
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceType — discriminator string for polymorphic trace event lists
|
||||
// Mirrors Go <c>MsgTraceType string</c> in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Discriminator string identifying the concrete type of a trace event
|
||||
/// within a <see cref="MsgTraceEvents"/> list.
|
||||
/// Mirrors Go <c>MsgTraceType string</c> and its constants in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceType
|
||||
{
|
||||
private readonly string _value;
|
||||
private MsgTraceType(string value) => _value = value;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public override string ToString() => _value;
|
||||
|
||||
public static implicit operator MsgTraceType(string value) => new(value);
|
||||
public static implicit operator string(MsgTraceType t) => t._value;
|
||||
|
||||
public override bool Equals(object? obj) =>
|
||||
obj is MsgTraceType other && _value == other._value;
|
||||
|
||||
public override int GetHashCode() => _value.GetHashCode();
|
||||
|
||||
// ---- Well-known type constants (mirror Go const block) ----
|
||||
|
||||
/// <summary>Ingress event. Mirrors Go <c>MsgTraceIngressType = "in"</c>.</summary>
|
||||
public static readonly MsgTraceType Ingress = new("in");
|
||||
|
||||
/// <summary>Subject-mapping event. Mirrors Go <c>MsgTraceSubjectMappingType = "sm"</c>.</summary>
|
||||
public static readonly MsgTraceType SubjectMapping = new("sm");
|
||||
|
||||
/// <summary>Stream-export event. Mirrors Go <c>MsgTraceStreamExportType = "se"</c>.</summary>
|
||||
public static readonly MsgTraceType StreamExport = new("se");
|
||||
|
||||
/// <summary>Service-import event. Mirrors Go <c>MsgTraceServiceImportType = "si"</c>.</summary>
|
||||
public static readonly MsgTraceType ServiceImport = new("si");
|
||||
|
||||
/// <summary>JetStream storage event. Mirrors Go <c>MsgTraceJetStreamType = "js"</c>.</summary>
|
||||
public static readonly MsgTraceType JetStream = new("js");
|
||||
|
||||
/// <summary>Egress (delivery) event. Mirrors Go <c>MsgTraceEgressType = "eg"</c>.</summary>
|
||||
public static readonly MsgTraceType Egress = new("eg");
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// IMsgTrace — interface for polymorphic trace events
|
||||
// Mirrors Go <c>MsgTrace interface</c> in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Marker interface implemented by all concrete message-trace event types.
|
||||
/// Enables polymorphic handling of the <see cref="MsgTraceEvents"/> list.
|
||||
/// Mirrors Go <c>MsgTrace interface</c> in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public interface IMsgTrace
|
||||
{
|
||||
/// <summary>Returns the discriminator type string for this trace event.</summary>
|
||||
string Typ();
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceBase — shared fields present in every trace event
|
||||
// Mirrors Go <c>MsgTraceBase</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Common base fields shared by all concrete message-trace event types.
|
||||
/// Mirrors Go <c>MsgTraceBase</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public class MsgTraceBase : IMsgTrace
|
||||
{
|
||||
[JsonPropertyName("type")]
|
||||
public string Type { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("ts")]
|
||||
public DateTime Timestamp { get; set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public virtual string Typ() => Type;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceIngress — client / route / gateway / leaf connection ingress event
|
||||
// Mirrors Go <c>MsgTraceIngress</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Records the point at which a message was received by the server from a
|
||||
/// client, route, gateway, or leaf connection.
|
||||
/// Mirrors Go <c>MsgTraceIngress</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceIngress : MsgTraceBase
|
||||
{
|
||||
[JsonPropertyName("kind")]
|
||||
public int Kind { get; set; }
|
||||
|
||||
[JsonPropertyName("cid")]
|
||||
public ulong Cid { get; set; }
|
||||
|
||||
[JsonPropertyName("name")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Name { get; set; }
|
||||
|
||||
[JsonPropertyName("acc")]
|
||||
public string Account { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("subj")]
|
||||
public string Subject { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("error")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Error { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceSubjectMapping — subject-mapping rewrite event
|
||||
// Mirrors Go <c>MsgTraceSubjectMapping</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Records a subject-mapping rewrite applied to an in-flight message.
|
||||
/// Mirrors Go <c>MsgTraceSubjectMapping</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceSubjectMapping : MsgTraceBase
|
||||
{
|
||||
[JsonPropertyName("to")]
|
||||
public string MappedTo { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceStreamExport — stream export / cross-account delivery event
|
||||
// Mirrors Go <c>MsgTraceStreamExport</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Records delivery of a message to a stream-export destination account.
|
||||
/// Mirrors Go <c>MsgTraceStreamExport</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceStreamExport : MsgTraceBase
|
||||
{
|
||||
[JsonPropertyName("acc")]
|
||||
public string Account { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("to")]
|
||||
public string To { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceServiceImport — service import routing event
|
||||
// Mirrors Go <c>MsgTraceServiceImport</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Records routing of a message via a service-import from one account to
|
||||
/// another.
|
||||
/// Mirrors Go <c>MsgTraceServiceImport</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceServiceImport : MsgTraceBase
|
||||
{
|
||||
[JsonPropertyName("acc")]
|
||||
public string Account { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("from")]
|
||||
public string From { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("to")]
|
||||
public string To { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceJetStream — JetStream storage event
|
||||
// Mirrors Go <c>MsgTraceJetStream</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Records the attempt (and outcome) of storing or delivering a message
|
||||
/// to a JetStream stream.
|
||||
/// Mirrors Go <c>MsgTraceJetStream</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceJetStream : MsgTraceBase
|
||||
{
|
||||
[JsonPropertyName("stream")]
|
||||
public string Stream { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("subject")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Subject { get; set; }
|
||||
|
||||
[JsonPropertyName("nointerest")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public bool NoInterest { get; set; }
|
||||
|
||||
[JsonPropertyName("error")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Error { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceEgress — outbound delivery event
|
||||
// Mirrors Go <c>MsgTraceEgress</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Records the outbound delivery of a message to a subscriber, route,
|
||||
/// gateway, or leaf connection.
|
||||
/// Mirrors Go <c>MsgTraceEgress</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceEgress : MsgTraceBase
|
||||
{
|
||||
[JsonPropertyName("kind")]
|
||||
public int Kind { get; set; }
|
||||
|
||||
[JsonPropertyName("cid")]
|
||||
public ulong Cid { get; set; }
|
||||
|
||||
[JsonPropertyName("name")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Name { get; set; }
|
||||
|
||||
[JsonPropertyName("hop")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Hop { get; set; }
|
||||
|
||||
[JsonPropertyName("acc")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Account { get; set; }
|
||||
|
||||
[JsonPropertyName("sub")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Subscription { get; set; }
|
||||
|
||||
[JsonPropertyName("queue")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Queue { get; set; }
|
||||
|
||||
[JsonPropertyName("error")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Error { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Optional link to the <see cref="MsgTraceEvent"/> produced by the remote
|
||||
/// server that received this egress message (route/leaf/gateway hop).
|
||||
/// Not serialised. Mirrors Go <c>Link *MsgTraceEvent</c>.
|
||||
/// </summary>
|
||||
[JsonIgnore]
|
||||
public MsgTraceEvent? Link { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceEvents — polymorphic list with custom JSON deserialiser
|
||||
// Mirrors Go <c>MsgTraceEvents []MsgTrace</c> and its UnmarshalJSON in msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Custom JSON converter that deserialises a <c>MsgTraceEvents</c> JSON array
|
||||
/// into the correct concrete <see cref="IMsgTrace"/> subtype, using the
|
||||
/// <c>"type"</c> discriminator field.
|
||||
/// Mirrors Go <c>MsgTraceEvents.UnmarshalJSON</c> in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceEventsConverter : JsonConverter<List<IMsgTrace>>
|
||||
{
|
||||
private static readonly Dictionary<string, Func<JsonElement, IMsgTrace>> Factories = new()
|
||||
{
|
||||
["in"] = e => e.Deserialize<MsgTraceIngress>()!,
|
||||
["sm"] = e => e.Deserialize<MsgTraceSubjectMapping>()!,
|
||||
["se"] = e => e.Deserialize<MsgTraceStreamExport>()!,
|
||||
["si"] = e => e.Deserialize<MsgTraceServiceImport>()!,
|
||||
["js"] = e => e.Deserialize<MsgTraceJetStream>()!,
|
||||
["eg"] = e => e.Deserialize<MsgTraceEgress>()!,
|
||||
};
|
||||
|
||||
public override List<IMsgTrace> Read(
|
||||
ref Utf8JsonReader reader,
|
||||
Type typeToConvert,
|
||||
JsonSerializerOptions options)
|
||||
{
|
||||
var result = new List<IMsgTrace>();
|
||||
using var doc = JsonDocument.ParseValue(ref reader);
|
||||
|
||||
foreach (var element in doc.RootElement.EnumerateArray())
|
||||
{
|
||||
if (!element.TryGetProperty("type", out var typeProp))
|
||||
throw new JsonException("MsgTrace element missing 'type' field.");
|
||||
|
||||
var typeStr = typeProp.GetString() ?? string.Empty;
|
||||
|
||||
if (!Factories.TryGetValue(typeStr, out var factory))
|
||||
throw new JsonException($"Unknown MsgTrace type '{typeStr}'.");
|
||||
|
||||
result.Add(factory(element));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public override void Write(
|
||||
Utf8JsonWriter writer,
|
||||
List<IMsgTrace> value,
|
||||
JsonSerializerOptions options)
|
||||
{
|
||||
writer.WriteStartArray();
|
||||
foreach (var item in value)
|
||||
JsonSerializer.Serialize(writer, item, item.GetType(), options);
|
||||
writer.WriteEndArray();
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceRequest — the original request metadata included in a trace event
|
||||
// Mirrors Go <c>MsgTraceRequest</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Captures the headers and size of the original message that triggered a
|
||||
/// trace event.
|
||||
/// Mirrors Go <c>MsgTraceRequest</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceRequest
|
||||
{
|
||||
/// <summary>
|
||||
/// Original message headers, preserving header-name casing.
|
||||
/// Mirrors Go <c>Header map[string][]string</c> (not http.Header, so casing is preserved).
|
||||
/// </summary>
|
||||
[JsonPropertyName("header")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public Dictionary<string, List<string>>? Header { get; set; }
|
||||
|
||||
[JsonPropertyName("msgsize")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public int MsgSize { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MsgTraceEvent — top-level trace event published to the trace destination
|
||||
// Mirrors Go <c>MsgTraceEvent</c> struct in server/msgtrace.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// The top-level message-trace advisory published to the trace destination
|
||||
/// subject. Contains server identity, the original request metadata, the
|
||||
/// hop count, and the ordered list of trace events.
|
||||
/// Mirrors Go <c>MsgTraceEvent</c> struct in server/msgtrace.go.
|
||||
/// </summary>
|
||||
public sealed class MsgTraceEvent
|
||||
{
|
||||
[JsonPropertyName("server")]
|
||||
public ServerInfo Server { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("request")]
|
||||
public MsgTraceRequest Request { get; set; } = new();
|
||||
|
||||
[JsonPropertyName("hops")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public int Hops { get; set; }
|
||||
|
||||
[JsonPropertyName("events")]
|
||||
[JsonConverter(typeof(MsgTraceEventsConverter))]
|
||||
public List<IMsgTrace> Events { get; set; } = [];
|
||||
|
||||
// ---- Convenience accessors (mirrors Go helper methods on MsgTraceEvent) ----
|
||||
|
||||
/// <summary>
|
||||
/// Returns the first event if it is a <see cref="MsgTraceIngress"/>, else null.
|
||||
/// Mirrors Go <c>MsgTraceEvent.Ingress()</c>.
|
||||
/// </summary>
|
||||
public MsgTraceIngress? Ingress() =>
|
||||
Events.Count > 0 ? Events[0] as MsgTraceIngress : null;
|
||||
|
||||
/// <summary>
|
||||
/// Returns the first <see cref="MsgTraceSubjectMapping"/> in the event list, or null.
|
||||
/// Mirrors Go <c>MsgTraceEvent.SubjectMapping()</c>.
|
||||
/// </summary>
|
||||
public MsgTraceSubjectMapping? SubjectMapping() =>
|
||||
Events.OfType<MsgTraceSubjectMapping>().FirstOrDefault();
|
||||
|
||||
/// <summary>
|
||||
/// Returns all <see cref="MsgTraceStreamExport"/> events.
|
||||
/// Mirrors Go <c>MsgTraceEvent.StreamExports()</c>.
|
||||
/// </summary>
|
||||
public IReadOnlyList<MsgTraceStreamExport> StreamExports() =>
|
||||
Events.OfType<MsgTraceStreamExport>().ToList();
|
||||
|
||||
/// <summary>
|
||||
/// Returns all <see cref="MsgTraceServiceImport"/> events.
|
||||
/// Mirrors Go <c>MsgTraceEvent.ServiceImports()</c>.
|
||||
/// </summary>
|
||||
public IReadOnlyList<MsgTraceServiceImport> ServiceImports() =>
|
||||
Events.OfType<MsgTraceServiceImport>().ToList();
|
||||
|
||||
/// <summary>
|
||||
/// Returns the first <see cref="MsgTraceJetStream"/> event, or null.
|
||||
/// Mirrors Go <c>MsgTraceEvent.JetStream()</c>.
|
||||
/// </summary>
|
||||
public MsgTraceJetStream? JetStream() =>
|
||||
Events.OfType<MsgTraceJetStream>().FirstOrDefault();
|
||||
|
||||
/// <summary>
|
||||
/// Returns all <see cref="MsgTraceEgress"/> events.
|
||||
/// Mirrors Go <c>MsgTraceEvent.Egresses()</c>.
|
||||
/// </summary>
|
||||
public IReadOnlyList<MsgTraceEgress> Egresses() =>
|
||||
Events.OfType<MsgTraceEgress>().ToList();
|
||||
}
|
||||
294
dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorSortOptions.cs
Normal file
294
dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorSortOptions.cs
Normal file
@@ -0,0 +1,294 @@
|
||||
// Copyright 2013-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/monitor_sort_opts.go in the NATS server Go source.
|
||||
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// ============================================================================
|
||||
// SortOpt — string wrapper type for connection-list sort options
|
||||
// Mirrors Go <c>SortOpt string</c> in server/monitor_sort_opts.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// A strongly-typed sort option for <see cref="ConnzOptions.Sort"/>.
|
||||
/// Wraps a raw string value corresponding to the JSON sort key.
|
||||
/// Mirrors Go <c>SortOpt string</c> in server/monitor_sort_opts.go.
|
||||
/// </summary>
|
||||
public sealed class SortOpt
|
||||
{
|
||||
private readonly string _value;
|
||||
|
||||
private SortOpt(string value) => _value = value;
|
||||
|
||||
/// <summary>Returns the raw sort-option string value.</summary>
|
||||
public override string ToString() => _value;
|
||||
|
||||
/// <summary>Allows implicit conversion from a string literal.</summary>
|
||||
public static implicit operator SortOpt(string value) => new(value);
|
||||
|
||||
/// <summary>Allows implicit conversion back to a plain string.</summary>
|
||||
public static implicit operator string(SortOpt opt) => opt._value;
|
||||
|
||||
public override bool Equals(object? obj) =>
|
||||
obj is SortOpt other && _value == other._value;
|
||||
|
||||
public override int GetHashCode() => _value.GetHashCode();
|
||||
|
||||
// ---- Well-known sort-option constants ----
|
||||
// Mirrors Go const block in monitor_sort_opts.go.
|
||||
|
||||
/// <summary>Sort by connection ID (ascending). Mirrors Go <c>ByCid = "cid"</c>.</summary>
|
||||
public static readonly SortOpt ByCid = new("cid");
|
||||
|
||||
/// <summary>Sort by connection start time (same as ByCid). Mirrors Go <c>ByStart = "start"</c>.</summary>
|
||||
public static readonly SortOpt ByStart = new("start");
|
||||
|
||||
/// <summary>Sort by number of subscriptions (descending). Mirrors Go <c>BySubs = "subs"</c>.</summary>
|
||||
public static readonly SortOpt BySubs = new("subs");
|
||||
|
||||
/// <summary>Sort by pending bytes waiting to be sent (descending). Mirrors Go <c>ByPending = "pending"</c>.</summary>
|
||||
public static readonly SortOpt ByPending = new("pending");
|
||||
|
||||
/// <summary>Sort by number of outbound messages (descending). Mirrors Go <c>ByOutMsgs = "msgs_to"</c>.</summary>
|
||||
public static readonly SortOpt ByOutMsgs = new("msgs_to");
|
||||
|
||||
/// <summary>Sort by number of inbound messages (descending). Mirrors Go <c>ByInMsgs = "msgs_from"</c>.</summary>
|
||||
public static readonly SortOpt ByInMsgs = new("msgs_from");
|
||||
|
||||
/// <summary>Sort by bytes sent (descending). Mirrors Go <c>ByOutBytes = "bytes_to"</c>.</summary>
|
||||
public static readonly SortOpt ByOutBytes = new("bytes_to");
|
||||
|
||||
/// <summary>Sort by bytes received (descending). Mirrors Go <c>ByInBytes = "bytes_from"</c>.</summary>
|
||||
public static readonly SortOpt ByInBytes = new("bytes_from");
|
||||
|
||||
/// <summary>Sort by last activity time (descending). Mirrors Go <c>ByLast = "last"</c>.</summary>
|
||||
public static readonly SortOpt ByLast = new("last");
|
||||
|
||||
/// <summary>Sort by idle duration (descending). Mirrors Go <c>ByIdle = "idle"</c>.</summary>
|
||||
public static readonly SortOpt ByIdle = new("idle");
|
||||
|
||||
/// <summary>Sort by uptime (descending). Mirrors Go <c>ByUptime = "uptime"</c>.</summary>
|
||||
public static readonly SortOpt ByUptime = new("uptime");
|
||||
|
||||
/// <summary>Sort by stop time — only valid on closed connections. Mirrors Go <c>ByStop = "stop"</c>.</summary>
|
||||
public static readonly SortOpt ByStop = new("stop");
|
||||
|
||||
/// <summary>Sort by close reason — only valid on closed connections. Mirrors Go <c>ByReason = "reason"</c>.</summary>
|
||||
public static readonly SortOpt ByReason = new("reason");
|
||||
|
||||
/// <summary>Sort by round-trip time (descending). Mirrors Go <c>ByRTT = "rtt"</c>.</summary>
|
||||
public static readonly SortOpt ByRtt = new("rtt");
|
||||
|
||||
private static readonly HashSet<string> ValidValues =
|
||||
[
|
||||
"", "cid", "start", "subs", "pending",
|
||||
"msgs_to", "msgs_from", "bytes_to", "bytes_from",
|
||||
"last", "idle", "uptime", "stop", "reason", "rtt"
|
||||
];
|
||||
|
||||
/// <summary>
|
||||
/// Returns true if this sort option is a recognised value.
|
||||
/// Mirrors Go <c>SortOpt.IsValid()</c> in monitor_sort_opts.go.
|
||||
/// </summary>
|
||||
public bool IsValid() => ValidValues.Contains(_value);
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ConnInfos — sortable list wrapper for ConnInfo pointers
|
||||
// Mirrors Go <c>ConnInfos []*ConnInfo</c> in monitor_sort_opts.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// A list of <see cref="ConnInfo"/> objects that can be sorted using one of
|
||||
/// the <c>SortBy*</c> comparers defined in this file.
|
||||
/// Mirrors Go <c>ConnInfos []*ConnInfo</c> in server/monitor_sort_opts.go.
|
||||
/// </summary>
|
||||
public sealed class ConnInfos : List<ConnInfo>
|
||||
{
|
||||
public ConnInfos() { }
|
||||
public ConnInfos(IEnumerable<ConnInfo> items) : base(items) { }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// IComparer<ConnInfo> implementations — one per sort option
|
||||
// Each class mirrors the corresponding Less() method in monitor_sort_opts.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>Sort by connection ID (ascending). Mirrors Go <c>SortByCid</c>.</summary>
|
||||
public sealed class SortByCid : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByCid Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return x.Cid.CompareTo(y.Cid);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Sort by number of subscriptions (ascending for underlying sort; caller reverses if needed).</summary>
|
||||
/// Mirrors Go <c>SortBySubs</c>.
|
||||
public sealed class SortBySubs : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortBySubs Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return x.NumSubs.CompareTo(y.NumSubs);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Sort by pending bytes. Mirrors Go <c>SortByPending</c>.</summary>
|
||||
public sealed class SortByPending : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByPending Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return x.Pending.CompareTo(y.Pending);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Sort by outbound message count. Mirrors Go <c>SortByOutMsgs</c>.</summary>
|
||||
public sealed class SortByOutMsgs : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByOutMsgs Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return x.OutMsgs.CompareTo(y.OutMsgs);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Sort by inbound message count. Mirrors Go <c>SortByInMsgs</c>.</summary>
|
||||
public sealed class SortByInMsgs : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByInMsgs Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return x.InMsgs.CompareTo(y.InMsgs);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Sort by outbound bytes. Mirrors Go <c>SortByOutBytes</c>.</summary>
|
||||
public sealed class SortByOutBytes : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByOutBytes Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return x.OutBytes.CompareTo(y.OutBytes);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Sort by inbound bytes. Mirrors Go <c>SortByInBytes</c>.</summary>
|
||||
public sealed class SortByInBytes : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByInBytes Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return x.InBytes.CompareTo(y.InBytes);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Sort by last activity timestamp. Mirrors Go <c>SortByLast</c>.</summary>
|
||||
public sealed class SortByLast : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByLast Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return x.LastActivity.CompareTo(y.LastActivity);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sort by idle duration (time since last activity), relative to a supplied
|
||||
/// reference time. Mirrors Go <c>SortByIdle</c>.
|
||||
/// </summary>
|
||||
public sealed class SortByIdle : IComparer<ConnInfo>
|
||||
{
|
||||
private readonly DateTime _now;
|
||||
|
||||
public SortByIdle(DateTime now) => _now = now;
|
||||
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
var idleX = _now - x.LastActivity;
|
||||
var idleY = _now - y.LastActivity;
|
||||
return idleX.CompareTo(idleY);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sort by uptime (time the connection has been open), relative to a supplied
|
||||
/// reference time. Mirrors Go <c>SortByUptime</c>.
|
||||
/// </summary>
|
||||
public sealed class SortByUptime : IComparer<ConnInfo>
|
||||
{
|
||||
private readonly DateTime _now;
|
||||
|
||||
public SortByUptime(DateTime now) => _now = now;
|
||||
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
var uptimeX = (x.Stop is null || x.Stop == default) ? _now - x.Start : x.Stop.Value - x.Start;
|
||||
var uptimeY = (y.Stop is null || y.Stop == default) ? _now - y.Start : y.Stop.Value - y.Start;
|
||||
return uptimeX.CompareTo(uptimeY);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Sort by stop time (closed connections only). Mirrors Go <c>SortByStop</c>.</summary>
|
||||
public sealed class SortByStop : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByStop Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
// If either stop is null treat as zero (shouldn't happen for closed-only queries)
|
||||
var stopX = x.Stop ?? DateTime.MinValue;
|
||||
var stopY = y.Stop ?? DateTime.MinValue;
|
||||
return stopX.CompareTo(stopY);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Sort by close reason string. Mirrors Go <c>SortByReason</c>.</summary>
|
||||
public sealed class SortByReason : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByReason Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return string.Compare(x.Reason, y.Reason, StringComparison.Ordinal);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sort by round-trip time (nanoseconds, internal field).
|
||||
/// Mirrors Go <c>SortByRTT</c>.
|
||||
/// </summary>
|
||||
public sealed class SortByRtt : IComparer<ConnInfo>
|
||||
{
|
||||
public static readonly SortByRtt Instance = new();
|
||||
public int Compare(ConnInfo? x, ConnInfo? y)
|
||||
{
|
||||
if (x is null || y is null) return 0;
|
||||
return x.RttNanos.CompareTo(y.RttNanos);
|
||||
}
|
||||
}
|
||||
387
dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorTypes.cs
Normal file
387
dotnet/src/ZB.MOM.NatsNet.Server/Monitor/MonitorTypes.cs
Normal file
@@ -0,0 +1,387 @@
|
||||
// Copyright 2013-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/monitor.go in the NATS server Go source.
|
||||
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// ============================================================================
|
||||
// Monitor list size defaults
|
||||
// Mirrors Go const block near top of monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Default sizes for monitoring API response lists.
|
||||
/// Mirrors Go constants in server/monitor.go.
|
||||
/// </summary>
|
||||
public static class MonitorDefaults
|
||||
{
|
||||
/// <summary>Default maximum number of connection entries returned. Mirrors Go <c>DefaultConnListSize = 1024</c>.</summary>
|
||||
public const int DefaultConnListSize = 1024;
|
||||
|
||||
/// <summary>Default maximum number of subscription entries returned. Mirrors Go <c>DefaultSubListSize = 1024</c>.</summary>
|
||||
public const int DefaultSubListSize = 1024;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ConnState — connection state filter for Connz queries
|
||||
// Mirrors Go <c>ConnState</c> and its iota constants in monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Filter applied to connection-list queries to select open, closed, or
|
||||
/// all connections.
|
||||
/// Mirrors Go <c>ConnState</c> in server/monitor.go.
|
||||
/// </summary>
|
||||
public enum ConnState
|
||||
{
|
||||
/// <summary>Only return open (active) connections. Mirrors Go <c>ConnOpen = 0</c>.</summary>
|
||||
ConnOpen = 0,
|
||||
|
||||
/// <summary>Only return closed connections. Mirrors Go <c>ConnClosed</c>.</summary>
|
||||
ConnClosed = 1,
|
||||
|
||||
/// <summary>Return all connections, open or closed. Mirrors Go <c>ConnAll</c>.</summary>
|
||||
ConnAll = 2,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ConnzOptions — query options for the Connz endpoint
|
||||
// Mirrors Go <c>ConnzOptions</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Options that control the output of a <c>Connz</c> monitoring query.
|
||||
/// Mirrors Go <c>ConnzOptions</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class ConnzOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// How to sort results. Only <c>ByCid</c> is ascending; all others are
|
||||
/// descending. Mirrors Go <c>Sort SortOpt</c>.
|
||||
/// </summary>
|
||||
[JsonPropertyName("sort")]
|
||||
public SortOpt Sort { get; set; } = SortOpt.ByCid;
|
||||
|
||||
/// <summary>When true, usernames are included in results. Mirrors Go <c>Username bool</c>.</summary>
|
||||
[JsonPropertyName("auth")]
|
||||
public bool Username { get; set; }
|
||||
|
||||
/// <summary>When true, subscription subjects are listed. Mirrors Go <c>Subscriptions bool</c>.</summary>
|
||||
[JsonPropertyName("subscriptions")]
|
||||
public bool Subscriptions { get; set; }
|
||||
|
||||
/// <summary>When true, verbose subscription detail is included. Mirrors Go <c>SubscriptionsDetail bool</c>.</summary>
|
||||
[JsonPropertyName("subscriptions_detail")]
|
||||
public bool SubscriptionsDetail { get; set; }
|
||||
|
||||
/// <summary>Zero-based offset for pagination. Mirrors Go <c>Offset int</c>.</summary>
|
||||
[JsonPropertyName("offset")]
|
||||
public int Offset { get; set; }
|
||||
|
||||
/// <summary>Maximum number of connections to return. Mirrors Go <c>Limit int</c>.</summary>
|
||||
[JsonPropertyName("limit")]
|
||||
public int Limit { get; set; }
|
||||
|
||||
/// <summary>Filter for a specific client connection by CID. Mirrors Go <c>CID uint64</c>.</summary>
|
||||
[JsonPropertyName("cid")]
|
||||
public ulong Cid { get; set; }
|
||||
|
||||
/// <summary>Filter for a specific MQTT client ID. Mirrors Go <c>MQTTClient string</c>.</summary>
|
||||
[JsonPropertyName("mqtt_client")]
|
||||
public string MqttClient { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Connection state filter. Mirrors Go <c>State ConnState</c>.</summary>
|
||||
[JsonPropertyName("state")]
|
||||
public ConnState State { get; set; } = ConnState.ConnOpen;
|
||||
|
||||
/// <summary>Filter by username. Mirrors Go <c>User string</c>.</summary>
|
||||
[JsonPropertyName("user")]
|
||||
public string User { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Filter by account name. Mirrors Go <c>Account string</c>.</summary>
|
||||
[JsonPropertyName("acc")]
|
||||
public string Account { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Filter by subject interest (requires Account filter). Mirrors Go <c>FilterSubject string</c>.</summary>
|
||||
[JsonPropertyName("filter_subject")]
|
||||
public string FilterSubject { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Connz — top-level connection list monitoring response
|
||||
// Mirrors Go <c>Connz</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Top-level response type for the <c>/connz</c> monitoring endpoint.
|
||||
/// Contains the current connection list and pagination metadata.
|
||||
/// Mirrors Go <c>Connz</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class Connz
|
||||
{
|
||||
[JsonPropertyName("server_id")]
|
||||
public string Id { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("now")]
|
||||
public DateTime Now { get; set; }
|
||||
|
||||
[JsonPropertyName("num_connections")]
|
||||
public int NumConns { get; set; }
|
||||
|
||||
[JsonPropertyName("total")]
|
||||
public int Total { get; set; }
|
||||
|
||||
[JsonPropertyName("offset")]
|
||||
public int Offset { get; set; }
|
||||
|
||||
[JsonPropertyName("limit")]
|
||||
public int Limit { get; set; }
|
||||
|
||||
[JsonPropertyName("connections")]
|
||||
public List<ConnInfo> Conns { get; set; } = [];
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ConnInfo — per-connection detail record
|
||||
// Mirrors Go <c>ConnInfo</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Detailed information about a single client connection, as returned by the
|
||||
/// <c>/connz</c> monitoring endpoint.
|
||||
/// Mirrors Go <c>ConnInfo</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class ConnInfo
|
||||
{
|
||||
[JsonPropertyName("cid")]
|
||||
public ulong Cid { get; set; }
|
||||
|
||||
[JsonPropertyName("kind")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Kind { get; set; }
|
||||
|
||||
[JsonPropertyName("type")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Type { get; set; }
|
||||
|
||||
[JsonPropertyName("ip")]
|
||||
public string Ip { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("port")]
|
||||
public int Port { get; set; }
|
||||
|
||||
[JsonPropertyName("start")]
|
||||
public DateTime Start { get; set; }
|
||||
|
||||
[JsonPropertyName("last_activity")]
|
||||
public DateTime LastActivity { get; set; }
|
||||
|
||||
[JsonPropertyName("stop")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public DateTime? Stop { get; set; }
|
||||
|
||||
[JsonPropertyName("reason")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Reason { get; set; }
|
||||
|
||||
[JsonPropertyName("rtt")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Rtt { get; set; }
|
||||
|
||||
[JsonPropertyName("uptime")]
|
||||
public string Uptime { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("idle")]
|
||||
public string Idle { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("pending_bytes")]
|
||||
public int Pending { get; set; }
|
||||
|
||||
[JsonPropertyName("in_msgs")]
|
||||
public long InMsgs { get; set; }
|
||||
|
||||
[JsonPropertyName("out_msgs")]
|
||||
public long OutMsgs { get; set; }
|
||||
|
||||
[JsonPropertyName("in_bytes")]
|
||||
public long InBytes { get; set; }
|
||||
|
||||
[JsonPropertyName("out_bytes")]
|
||||
public long OutBytes { get; set; }
|
||||
|
||||
[JsonPropertyName("stalls")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public long Stalls { get; set; }
|
||||
|
||||
[JsonPropertyName("subscriptions")]
|
||||
public uint NumSubs { get; set; }
|
||||
|
||||
[JsonPropertyName("name")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Name { get; set; }
|
||||
|
||||
[JsonPropertyName("lang")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Lang { get; set; }
|
||||
|
||||
[JsonPropertyName("version")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Version { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_version")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? TlsVersion { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_cipher_suite")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? TlsCipher { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_peer_certs")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public List<TlsPeerCert>? TlsPeerCerts { get; set; }
|
||||
|
||||
[JsonPropertyName("tls_first")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public bool TlsFirst { get; set; }
|
||||
|
||||
[JsonPropertyName("authorized_user")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? AuthorizedUser { get; set; }
|
||||
|
||||
[JsonPropertyName("account")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Account { get; set; }
|
||||
|
||||
[JsonPropertyName("subscriptions_list")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public List<string>? Subs { get; set; }
|
||||
|
||||
[JsonPropertyName("subscriptions_list_detail")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public List<SubDetail>? SubsDetail { get; set; }
|
||||
|
||||
[JsonPropertyName("jwt")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Jwt { get; set; }
|
||||
|
||||
[JsonPropertyName("issuer_key")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? IssuerKey { get; set; }
|
||||
|
||||
[JsonPropertyName("name_tag")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? NameTag { get; set; }
|
||||
|
||||
[JsonPropertyName("tags")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string[]? Tags { get; set; }
|
||||
|
||||
[JsonPropertyName("mqtt_client")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? MqttClient { get; set; }
|
||||
|
||||
[JsonPropertyName("proxy")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public ProxyInfo? Proxy { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Internal field used for fast RTT-based sorting.
|
||||
/// Mirrors Go <c>rtt int64</c> unexported field in ConnInfo.
|
||||
/// Not serialised.
|
||||
/// </summary>
|
||||
[JsonIgnore]
|
||||
internal long RttNanos { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ProxyInfo — proxy connection metadata
|
||||
// Mirrors Go <c>ProxyInfo</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Information about a proxied connection (e.g. HAProxy PROXY protocol).
|
||||
/// Mirrors Go <c>ProxyInfo</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class ProxyInfo
|
||||
{
|
||||
[JsonPropertyName("key")]
|
||||
public string Key { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// TlsPeerCert — TLS peer certificate summary
|
||||
// Mirrors Go <c>TLSPeerCert</c> struct in server/monitor.go.
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Basic information about a TLS peer certificate.
|
||||
/// Mirrors Go <c>TLSPeerCert</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class TlsPeerCert
|
||||
{
|
||||
[JsonPropertyName("subject")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Subject { get; set; }
|
||||
|
||||
[JsonPropertyName("spki_sha256")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? SubjectPkiSha256 { get; set; }
|
||||
|
||||
[JsonPropertyName("cert_sha256")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? CertSha256 { get; set; }
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// SubDetail — verbose subscription information
|
||||
// Mirrors Go <c>SubDetail</c> struct in server/monitor.go (line ~961).
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Verbose information about a single subscription, included in detailed
|
||||
/// connection or account monitoring responses.
|
||||
/// Mirrors Go <c>SubDetail</c> struct in server/monitor.go.
|
||||
/// </summary>
|
||||
public sealed class SubDetail
|
||||
{
|
||||
[JsonPropertyName("account")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Account { get; set; }
|
||||
|
||||
[JsonPropertyName("account_tag")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? AccountTag { get; set; }
|
||||
|
||||
[JsonPropertyName("subject")]
|
||||
public string Subject { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("qgroup")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
public string? Queue { get; set; }
|
||||
|
||||
[JsonPropertyName("sid")]
|
||||
public string Sid { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("msgs")]
|
||||
public long Msgs { get; set; }
|
||||
|
||||
[JsonPropertyName("max")]
|
||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||
public long Max { get; set; }
|
||||
|
||||
[JsonPropertyName("cid")]
|
||||
public ulong Cid { get; set; }
|
||||
}
|
||||
@@ -57,7 +57,7 @@ public sealed partial class NatsServer : INatsServer
|
||||
|
||||
private readonly ServerStats _stats = new();
|
||||
private readonly SlowConsumerStats _scStats = new();
|
||||
private readonly StaleConnectionStats _staleStats = new();
|
||||
private readonly InternalStaleStats _staleStats = new();
|
||||
|
||||
// =========================================================================
|
||||
// Core identity
|
||||
|
||||
@@ -122,10 +122,12 @@ internal sealed class SlowConsumerStats
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Per-kind stale-connection counters (atomic).
|
||||
/// Per-kind stale-connection counters (atomic, internal use only).
|
||||
/// Mirrors Go embedded <c>staleStats</c> in server.go.
|
||||
/// NOTE: The public JSON-serialisable monitoring equivalent is <c>StaleConnectionStats</c>
|
||||
/// in Events/EventTypes.cs.
|
||||
/// </summary>
|
||||
internal sealed class StaleConnectionStats
|
||||
internal sealed class InternalStaleStats
|
||||
{
|
||||
public long Clients;
|
||||
public long Routes;
|
||||
@@ -204,12 +206,7 @@ public static class CompressionMode
|
||||
// These stubs will be replaced with full implementations in later sessions.
|
||||
// They are declared here to allow the NatsServer class to compile.
|
||||
|
||||
/// <summary>Stub for the system/internal messaging state (session 12).</summary>
|
||||
internal sealed class InternalState
|
||||
{
|
||||
public Account? Account { get; set; }
|
||||
// Full implementation in session 12 (events.go)
|
||||
}
|
||||
// InternalState is now fully defined in Events/EventTypes.cs (session 12).
|
||||
|
||||
/// <summary>Stub for JetStream state pointer (session 19).</summary>
|
||||
internal sealed class JetStreamState { }
|
||||
|
||||
Reference in New Issue
Block a user