From 44d426a7c5b1732bb0ad1374a066c6074efc952b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 05:43:04 -0500 Subject: [PATCH] feat: parse cluster and jetstream config blocks --- .../Configuration/ClusterOptions.cs | 8 + .../Configuration/ConfigProcessor.cs | 186 +++++++++++++++++- .../Configuration/GatewayOptions.cs | 8 + .../Configuration/JetStreamOptions.cs | 8 + .../Configuration/LeafNodeOptions.cs | 7 + src/NATS.Server/NatsOptions.cs | 7 + .../ClusterJetStreamConfigProcessorTests.cs | 21 ++ 7 files changed, 244 insertions(+), 1 deletion(-) create mode 100644 src/NATS.Server/Configuration/ClusterOptions.cs create mode 100644 src/NATS.Server/Configuration/GatewayOptions.cs create mode 100644 src/NATS.Server/Configuration/JetStreamOptions.cs create mode 100644 src/NATS.Server/Configuration/LeafNodeOptions.cs create mode 100644 tests/NATS.Server.Tests/ClusterJetStreamConfigProcessorTests.cs diff --git a/src/NATS.Server/Configuration/ClusterOptions.cs b/src/NATS.Server/Configuration/ClusterOptions.cs new file mode 100644 index 0000000..4571522 --- /dev/null +++ b/src/NATS.Server/Configuration/ClusterOptions.cs @@ -0,0 +1,8 @@ +namespace NATS.Server.Configuration; + +public sealed class ClusterOptions +{ + public string? Name { get; set; } + public string Host { get; set; } = "0.0.0.0"; + public int Port { get; set; } = 6222; +} diff --git a/src/NATS.Server/Configuration/ConfigProcessor.cs b/src/NATS.Server/Configuration/ConfigProcessor.cs index 88b36ae..fa2ae1c 100644 --- a/src/NATS.Server/Configuration/ConfigProcessor.cs +++ b/src/NATS.Server/Configuration/ConfigProcessor.cs @@ -217,6 +217,26 @@ public static class ConfigProcessor opts.AllowNonTls = ToBool(value); break; + // Cluster / inter-server / JetStream + case "cluster": + if (value is Dictionary clusterDict) + opts.Cluster = ParseCluster(clusterDict, errors); + break; + case "gateway": + if (value is Dictionary gatewayDict) + opts.Gateway = ParseGateway(gatewayDict, errors); + break; + case "leaf": + case "leafnode": + case "leafnodes": + if (value is Dictionary leafDict) + opts.LeafNode = ParseLeafNode(leafDict, errors); + break; + case "jetstream": + if (value is Dictionary jsDict) + opts.JetStream = ParseJetStream(jsDict, errors); + break; + // Tags case "server_tags": if (value is Dictionary tagsDict) @@ -245,7 +265,7 @@ public static class ConfigProcessor opts.ReconnectErrorReports = ToInt(value); break; - // Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.) + // Unknown keys silently ignored default: break; } @@ -342,6 +362,9 @@ public static class ConfigProcessor private static readonly Regex DurationPattern = new( @"^(-?\d+(?:\.\d+)?)\s*(ms|s|m|h)$", RegexOptions.Compiled | RegexOptions.IgnoreCase); + private static readonly Regex ByteSizePattern = new( + @"^(\d+)\s*(b|kb|mb|gb|tb)?$", + RegexOptions.Compiled | RegexOptions.IgnoreCase); private static TimeSpan ParseDurationString(string s) { @@ -362,6 +385,133 @@ public static class ConfigProcessor }; } + // ─── Cluster / gateway / leafnode / JetStream parsing ──────── + + private static ClusterOptions ParseCluster(Dictionary dict, List errors) + { + var options = new ClusterOptions(); + foreach (var (key, value) in dict) + { + switch (key.ToLowerInvariant()) + { + case "name": + options.Name = ToString(value); + break; + case "listen": + try + { + var (host, port) = ParseHostPort(value); + if (host is not null) + options.Host = host; + if (port is not null) + options.Port = port.Value; + } + catch (Exception ex) + { + errors.Add($"Invalid cluster.listen: {ex.Message}"); + } + + break; + } + } + + return options; + } + + private static GatewayOptions ParseGateway(Dictionary dict, List errors) + { + var options = new GatewayOptions(); + foreach (var (key, value) in dict) + { + switch (key.ToLowerInvariant()) + { + case "name": + options.Name = ToString(value); + break; + case "listen": + try + { + var (host, port) = ParseHostPort(value); + if (host is not null) + options.Host = host; + if (port is not null) + options.Port = port.Value; + } + catch (Exception ex) + { + errors.Add($"Invalid gateway.listen: {ex.Message}"); + } + + break; + } + } + + return options; + } + + private static LeafNodeOptions ParseLeafNode(Dictionary dict, List errors) + { + var options = new LeafNodeOptions(); + foreach (var (key, value) in dict) + { + if (key.Equals("listen", StringComparison.OrdinalIgnoreCase)) + { + try + { + var (host, port) = ParseHostPort(value); + if (host is not null) + options.Host = host; + if (port is not null) + options.Port = port.Value; + } + catch (Exception ex) + { + errors.Add($"Invalid leafnode.listen: {ex.Message}"); + } + } + } + + return options; + } + + private static JetStreamOptions ParseJetStream(Dictionary dict, List errors) + { + var options = new JetStreamOptions(); + foreach (var (key, value) in dict) + { + switch (key.ToLowerInvariant()) + { + case "store_dir": + options.StoreDir = ToString(value); + break; + case "max_mem_store": + try + { + options.MaxMemoryStore = ParseByteSize(value); + } + catch (Exception ex) + { + errors.Add($"Invalid jetstream.max_mem_store: {ex.Message}"); + } + + break; + case "max_file_store": + try + { + options.MaxFileStore = ParseByteSize(value); + } + catch (Exception ex) + { + errors.Add($"Invalid jetstream.max_file_store: {ex.Message}"); + } + + break; + } + } + + return options; + } + // ─── Authorization parsing ───────────────────────────────────── private static void ParseAuthorization(Dictionary dict, NatsOptions opts, List errors) @@ -640,6 +790,40 @@ public static class ConfigProcessor _ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to long"), }; + private static long ParseByteSize(object? value) + { + if (value is long l) + return l; + if (value is int i) + return i; + if (value is double d) + return (long)d; + if (value is not string s) + throw new FormatException($"Cannot parse byte size from {value?.GetType().Name ?? "null"}"); + + var trimmed = s.Trim(); + var match = ByteSizePattern.Match(trimmed); + if (!match.Success) + throw new FormatException($"Cannot parse byte size: '{s}'"); + + var amount = long.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture); + var unit = match.Groups[2].Value.ToLowerInvariant(); + var multiplier = unit switch + { + "" or "b" => 1L, + "kb" => 1024L, + "mb" => 1024L * 1024L, + "gb" => 1024L * 1024L * 1024L, + "tb" => 1024L * 1024L * 1024L * 1024L, + _ => throw new FormatException($"Unknown byte-size unit: '{unit}'"), + }; + + checked + { + return amount * multiplier; + } + } + private static bool ToBool(object? value) => value switch { bool b => b, diff --git a/src/NATS.Server/Configuration/GatewayOptions.cs b/src/NATS.Server/Configuration/GatewayOptions.cs new file mode 100644 index 0000000..cfdbd15 --- /dev/null +++ b/src/NATS.Server/Configuration/GatewayOptions.cs @@ -0,0 +1,8 @@ +namespace NATS.Server.Configuration; + +public sealed class GatewayOptions +{ + public string? Name { get; set; } + public string Host { get; set; } = "0.0.0.0"; + public int Port { get; set; } +} diff --git a/src/NATS.Server/Configuration/JetStreamOptions.cs b/src/NATS.Server/Configuration/JetStreamOptions.cs new file mode 100644 index 0000000..1301f60 --- /dev/null +++ b/src/NATS.Server/Configuration/JetStreamOptions.cs @@ -0,0 +1,8 @@ +namespace NATS.Server.Configuration; + +public sealed class JetStreamOptions +{ + public string StoreDir { get; set; } = string.Empty; + public long MaxMemoryStore { get; set; } + public long MaxFileStore { get; set; } +} diff --git a/src/NATS.Server/Configuration/LeafNodeOptions.cs b/src/NATS.Server/Configuration/LeafNodeOptions.cs new file mode 100644 index 0000000..59d8421 --- /dev/null +++ b/src/NATS.Server/Configuration/LeafNodeOptions.cs @@ -0,0 +1,7 @@ +namespace NATS.Server.Configuration; + +public sealed class LeafNodeOptions +{ + public string Host { get; set; } = "0.0.0.0"; + public int Port { get; set; } +} diff --git a/src/NATS.Server/NatsOptions.cs b/src/NATS.Server/NatsOptions.cs index 1e3820e..9e1281e 100644 --- a/src/NATS.Server/NatsOptions.cs +++ b/src/NATS.Server/NatsOptions.cs @@ -1,5 +1,6 @@ using System.Security.Authentication; using NATS.Server.Auth; +using NATS.Server.Configuration; using NATS.Server.Tls; namespace NATS.Server; @@ -115,6 +116,12 @@ public sealed class NatsOptions // Subject mapping / transforms (source pattern -> destination template) public Dictionary? SubjectMappings { get; set; } + // Cluster and JetStream settings + public ClusterOptions? Cluster { get; set; } + public GatewayOptions? Gateway { get; set; } + public LeafNodeOptions? LeafNode { get; set; } + public JetStreamOptions? JetStream { get; set; } + public bool HasTls => TlsCert != null && TlsKey != null; // WebSocket diff --git a/tests/NATS.Server.Tests/ClusterJetStreamConfigProcessorTests.cs b/tests/NATS.Server.Tests/ClusterJetStreamConfigProcessorTests.cs new file mode 100644 index 0000000..dba2216 --- /dev/null +++ b/tests/NATS.Server.Tests/ClusterJetStreamConfigProcessorTests.cs @@ -0,0 +1,21 @@ +using NATS.Server.Configuration; + +namespace NATS.Server.Tests; + +public class ClusterJetStreamConfigProcessorTests +{ + [Fact] + public void ConfigProcessor_maps_jetstream_and_cluster_blocks() + { + var cfg = """ + cluster { name: C1; listen: 127.0.0.1:6222 } + jetstream { store_dir: /tmp/js; max_mem_store: 1GB; max_file_store: 10GB } + """; + + var opts = ConfigProcessor.ProcessConfig(cfg); + + opts.Cluster.ShouldNotBeNull(); + opts.JetStream.ShouldNotBeNull(); + opts.JetStream!.StoreDir.ShouldBe("/tmp/js"); + } +}