feat: add mqtt config model and parser for all Go MQTTOpts fields
This commit is contained in:
@@ -245,6 +245,12 @@ public static class ConfigProcessor
|
||||
opts.ReconnectErrorReports = ToInt(value);
|
||||
break;
|
||||
|
||||
// MQTT
|
||||
case "mqtt":
|
||||
if (value is Dictionary<string, object?> mqttDict)
|
||||
ParseMqtt(mqttDict, opts, errors);
|
||||
break;
|
||||
|
||||
// Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.)
|
||||
default:
|
||||
break;
|
||||
@@ -620,6 +626,145 @@ public static class ConfigProcessor
|
||||
opts.Tags = tags;
|
||||
}
|
||||
|
||||
// ─── MQTT parsing ────────────────────────────────────────────────
|
||||
// Reference: Go server/opts.go parseMQTT (lines ~5443-5541)
|
||||
|
||||
private static void ParseMqtt(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
|
||||
{
|
||||
var mqtt = opts.Mqtt ?? new MqttOptions();
|
||||
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "listen":
|
||||
var (host, port) = ParseHostPort(value);
|
||||
if (host is not null) mqtt.Host = host;
|
||||
if (port is not null) mqtt.Port = port.Value;
|
||||
break;
|
||||
case "port":
|
||||
mqtt.Port = ToInt(value);
|
||||
break;
|
||||
case "host" or "net":
|
||||
mqtt.Host = ToString(value);
|
||||
break;
|
||||
case "no_auth_user":
|
||||
mqtt.NoAuthUser = ToString(value);
|
||||
break;
|
||||
case "tls":
|
||||
if (value is Dictionary<string, object?> tlsDict)
|
||||
ParseMqttTls(tlsDict, mqtt, errors);
|
||||
break;
|
||||
case "authorization" or "authentication":
|
||||
if (value is Dictionary<string, object?> authDict)
|
||||
ParseMqttAuth(authDict, mqtt, errors);
|
||||
break;
|
||||
case "ack_wait" or "ackwait":
|
||||
mqtt.AckWait = ParseDuration(value);
|
||||
break;
|
||||
case "js_api_timeout" or "api_timeout":
|
||||
mqtt.JsApiTimeout = ParseDuration(value);
|
||||
break;
|
||||
case "max_ack_pending" or "max_pending" or "max_inflight":
|
||||
var pending = ToInt(value);
|
||||
if (pending < 0 || pending > 0xFFFF)
|
||||
errors.Add($"mqtt max_ack_pending invalid value {pending}, should be in [0..{0xFFFF}] range");
|
||||
else
|
||||
mqtt.MaxAckPending = (ushort)pending;
|
||||
break;
|
||||
case "js_domain":
|
||||
mqtt.JsDomain = ToString(value);
|
||||
break;
|
||||
case "stream_replicas":
|
||||
mqtt.StreamReplicas = ToInt(value);
|
||||
break;
|
||||
case "consumer_replicas":
|
||||
mqtt.ConsumerReplicas = ToInt(value);
|
||||
break;
|
||||
case "consumer_memory_storage":
|
||||
mqtt.ConsumerMemoryStorage = ToBool(value);
|
||||
break;
|
||||
case "consumer_inactive_threshold" or "consumer_auto_cleanup":
|
||||
mqtt.ConsumerInactiveThreshold = ParseDuration(value);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
opts.Mqtt = mqtt;
|
||||
}
|
||||
|
||||
private static void ParseMqttAuth(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||
{
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "user" or "username":
|
||||
mqtt.Username = ToString(value);
|
||||
break;
|
||||
case "pass" or "password":
|
||||
mqtt.Password = ToString(value);
|
||||
break;
|
||||
case "token":
|
||||
mqtt.Token = ToString(value);
|
||||
break;
|
||||
case "timeout":
|
||||
mqtt.AuthTimeout = ToDouble(value);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void ParseMqttTls(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||
{
|
||||
foreach (var (key, value) in dict)
|
||||
{
|
||||
switch (key.ToLowerInvariant())
|
||||
{
|
||||
case "cert_file":
|
||||
mqtt.TlsCert = ToString(value);
|
||||
break;
|
||||
case "key_file":
|
||||
mqtt.TlsKey = ToString(value);
|
||||
break;
|
||||
case "ca_file":
|
||||
mqtt.TlsCaCert = ToString(value);
|
||||
break;
|
||||
case "verify":
|
||||
mqtt.TlsVerify = ToBool(value);
|
||||
break;
|
||||
case "verify_and_map":
|
||||
var map = ToBool(value);
|
||||
mqtt.TlsMap = map;
|
||||
if (map) mqtt.TlsVerify = true;
|
||||
break;
|
||||
case "timeout":
|
||||
mqtt.TlsTimeout = ToDouble(value);
|
||||
break;
|
||||
case "pinned_certs":
|
||||
if (value is List<object?> pinnedList)
|
||||
{
|
||||
var certs = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||
foreach (var item in pinnedList)
|
||||
{
|
||||
if (item is string s)
|
||||
certs.Add(s.ToLowerInvariant());
|
||||
}
|
||||
|
||||
mqtt.TlsPinnedCerts = certs;
|
||||
}
|
||||
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Type conversion helpers ───────────────────────────────────
|
||||
|
||||
private static int ToInt(object? value) => value switch
|
||||
@@ -653,6 +798,15 @@ public static class ConfigProcessor
|
||||
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to string"),
|
||||
};
|
||||
|
||||
private static double ToDouble(object? value) => value switch
|
||||
{
|
||||
double d => d,
|
||||
long l => l,
|
||||
int i => i,
|
||||
string s when double.TryParse(s, NumberStyles.Float, CultureInfo.InvariantCulture, out var d) => d,
|
||||
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"),
|
||||
};
|
||||
|
||||
private static IReadOnlyList<string> ToStringList(object? value)
|
||||
{
|
||||
if (value is List<object?> list)
|
||||
|
||||
43
src/NATS.Server/MqttOptions.cs
Normal file
43
src/NATS.Server/MqttOptions.cs
Normal file
@@ -0,0 +1,43 @@
|
||||
namespace NATS.Server;
|
||||
|
||||
/// <summary>
|
||||
/// MQTT protocol configuration options.
|
||||
/// Corresponds to Go server/opts.go MQTTOpts struct.
|
||||
/// Config is parsed and stored but no MQTT listener is started yet.
|
||||
/// </summary>
|
||||
public sealed class MqttOptions
|
||||
{
|
||||
// Network
|
||||
public string Host { get; set; } = "";
|
||||
public int Port { get; set; }
|
||||
|
||||
// Auth override (MQTT-specific, separate from global auth)
|
||||
public string? NoAuthUser { get; set; }
|
||||
public string? Username { get; set; }
|
||||
public string? Password { get; set; }
|
||||
public string? Token { get; set; }
|
||||
public double AuthTimeout { get; set; }
|
||||
|
||||
// TLS
|
||||
public string? TlsCert { get; set; }
|
||||
public string? TlsKey { get; set; }
|
||||
public string? TlsCaCert { get; set; }
|
||||
public bool TlsVerify { get; set; }
|
||||
public double TlsTimeout { get; set; } = 2.0;
|
||||
public bool TlsMap { get; set; }
|
||||
public HashSet<string>? TlsPinnedCerts { get; set; }
|
||||
|
||||
// JetStream integration
|
||||
public string? JsDomain { get; set; }
|
||||
public int StreamReplicas { get; set; }
|
||||
public int ConsumerReplicas { get; set; }
|
||||
public bool ConsumerMemoryStorage { get; set; }
|
||||
public TimeSpan ConsumerInactiveThreshold { get; set; }
|
||||
|
||||
// QoS
|
||||
public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30);
|
||||
public ushort MaxAckPending { get; set; }
|
||||
public TimeSpan JsApiTimeout { get; set; } = TimeSpan.FromSeconds(5);
|
||||
|
||||
public bool HasTls => TlsCert != null && TlsKey != null;
|
||||
}
|
||||
@@ -115,6 +115,9 @@ public sealed class NatsOptions
|
||||
// Subject mapping / transforms (source pattern -> destination template)
|
||||
public Dictionary<string, string>? SubjectMappings { get; set; }
|
||||
|
||||
// MQTT configuration (parsed from config, no listener yet)
|
||||
public MqttOptions? Mqtt { get; set; }
|
||||
|
||||
public bool HasTls => TlsCert != null && TlsKey != null;
|
||||
|
||||
// WebSocket
|
||||
|
||||
Reference in New Issue
Block a user