diff --git a/src/NATS.Server/Configuration/ConfigProcessor.cs b/src/NATS.Server/Configuration/ConfigProcessor.cs index 88b36ae..ae593b1 100644 --- a/src/NATS.Server/Configuration/ConfigProcessor.cs +++ b/src/NATS.Server/Configuration/ConfigProcessor.cs @@ -245,6 +245,12 @@ public static class ConfigProcessor opts.ReconnectErrorReports = ToInt(value); break; + // MQTT + case "mqtt": + if (value is Dictionary mqttDict) + ParseMqtt(mqttDict, opts, errors); + break; + // Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.) default: break; @@ -620,6 +626,145 @@ public static class ConfigProcessor opts.Tags = tags; } + // ─── MQTT parsing ──────────────────────────────────────────────── + // Reference: Go server/opts.go parseMQTT (lines ~5443-5541) + + private static void ParseMqtt(Dictionary dict, NatsOptions opts, List errors) + { + var mqtt = opts.Mqtt ?? new MqttOptions(); + + foreach (var (key, value) in dict) + { + switch (key.ToLowerInvariant()) + { + case "listen": + var (host, port) = ParseHostPort(value); + if (host is not null) mqtt.Host = host; + if (port is not null) mqtt.Port = port.Value; + break; + case "port": + mqtt.Port = ToInt(value); + break; + case "host" or "net": + mqtt.Host = ToString(value); + break; + case "no_auth_user": + mqtt.NoAuthUser = ToString(value); + break; + case "tls": + if (value is Dictionary tlsDict) + ParseMqttTls(tlsDict, mqtt, errors); + break; + case "authorization" or "authentication": + if (value is Dictionary authDict) + ParseMqttAuth(authDict, mqtt, errors); + break; + case "ack_wait" or "ackwait": + mqtt.AckWait = ParseDuration(value); + break; + case "js_api_timeout" or "api_timeout": + mqtt.JsApiTimeout = ParseDuration(value); + break; + case "max_ack_pending" or "max_pending" or "max_inflight": + var pending = ToInt(value); + if (pending < 0 || pending > 0xFFFF) + errors.Add($"mqtt max_ack_pending invalid value {pending}, should be in [0..{0xFFFF}] range"); + else + mqtt.MaxAckPending = (ushort)pending; + break; + case "js_domain": + mqtt.JsDomain = ToString(value); + break; + case "stream_replicas": + mqtt.StreamReplicas = ToInt(value); + break; + case "consumer_replicas": + mqtt.ConsumerReplicas = ToInt(value); + break; + case "consumer_memory_storage": + mqtt.ConsumerMemoryStorage = ToBool(value); + break; + case "consumer_inactive_threshold" or "consumer_auto_cleanup": + mqtt.ConsumerInactiveThreshold = ParseDuration(value); + break; + default: + break; + } + } + + opts.Mqtt = mqtt; + } + + private static void ParseMqttAuth(Dictionary dict, MqttOptions mqtt, List errors) + { + foreach (var (key, value) in dict) + { + switch (key.ToLowerInvariant()) + { + case "user" or "username": + mqtt.Username = ToString(value); + break; + case "pass" or "password": + mqtt.Password = ToString(value); + break; + case "token": + mqtt.Token = ToString(value); + break; + case "timeout": + mqtt.AuthTimeout = ToDouble(value); + break; + default: + break; + } + } + } + + private static void ParseMqttTls(Dictionary dict, MqttOptions mqtt, List errors) + { + foreach (var (key, value) in dict) + { + switch (key.ToLowerInvariant()) + { + case "cert_file": + mqtt.TlsCert = ToString(value); + break; + case "key_file": + mqtt.TlsKey = ToString(value); + break; + case "ca_file": + mqtt.TlsCaCert = ToString(value); + break; + case "verify": + mqtt.TlsVerify = ToBool(value); + break; + case "verify_and_map": + var map = ToBool(value); + mqtt.TlsMap = map; + if (map) mqtt.TlsVerify = true; + break; + case "timeout": + mqtt.TlsTimeout = ToDouble(value); + break; + case "pinned_certs": + if (value is List pinnedList) + { + var certs = new HashSet(StringComparer.OrdinalIgnoreCase); + foreach (var item in pinnedList) + { + if (item is string s) + certs.Add(s.ToLowerInvariant()); + } + + mqtt.TlsPinnedCerts = certs; + } + + break; + default: + break; + } + } + } + // ─── Type conversion helpers ─────────────────────────────────── private static int ToInt(object? value) => value switch @@ -653,6 +798,15 @@ public static class ConfigProcessor _ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to string"), }; + private static double ToDouble(object? value) => value switch + { + double d => d, + long l => l, + int i => i, + string s when double.TryParse(s, NumberStyles.Float, CultureInfo.InvariantCulture, out var d) => d, + _ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"), + }; + private static IReadOnlyList ToStringList(object? value) { if (value is List list) diff --git a/src/NATS.Server/MqttOptions.cs b/src/NATS.Server/MqttOptions.cs new file mode 100644 index 0000000..c47e15e --- /dev/null +++ b/src/NATS.Server/MqttOptions.cs @@ -0,0 +1,43 @@ +namespace NATS.Server; + +/// +/// MQTT protocol configuration options. +/// Corresponds to Go server/opts.go MQTTOpts struct. +/// Config is parsed and stored but no MQTT listener is started yet. +/// +public sealed class MqttOptions +{ + // Network + public string Host { get; set; } = ""; + public int Port { get; set; } + + // Auth override (MQTT-specific, separate from global auth) + public string? NoAuthUser { get; set; } + public string? Username { get; set; } + public string? Password { get; set; } + public string? Token { get; set; } + public double AuthTimeout { get; set; } + + // TLS + public string? TlsCert { get; set; } + public string? TlsKey { get; set; } + public string? TlsCaCert { get; set; } + public bool TlsVerify { get; set; } + public double TlsTimeout { get; set; } = 2.0; + public bool TlsMap { get; set; } + public HashSet? TlsPinnedCerts { get; set; } + + // JetStream integration + public string? JsDomain { get; set; } + public int StreamReplicas { get; set; } + public int ConsumerReplicas { get; set; } + public bool ConsumerMemoryStorage { get; set; } + public TimeSpan ConsumerInactiveThreshold { get; set; } + + // QoS + public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30); + public ushort MaxAckPending { get; set; } + public TimeSpan JsApiTimeout { get; set; } = TimeSpan.FromSeconds(5); + + public bool HasTls => TlsCert != null && TlsKey != null; +} diff --git a/src/NATS.Server/NatsOptions.cs b/src/NATS.Server/NatsOptions.cs index 1e3820e..981f6f7 100644 --- a/src/NATS.Server/NatsOptions.cs +++ b/src/NATS.Server/NatsOptions.cs @@ -115,6 +115,9 @@ public sealed class NatsOptions // Subject mapping / transforms (source pattern -> destination template) public Dictionary? SubjectMappings { get; set; } + // MQTT configuration (parsed from config, no listener yet) + public MqttOptions? Mqtt { get; set; } + public bool HasTls => TlsCert != null && TlsKey != null; // WebSocket