feat: parse cluster and jetstream config blocks
This commit is contained in:
8
src/NATS.Server/Configuration/ClusterOptions.cs
Normal file
8
src/NATS.Server/Configuration/ClusterOptions.cs
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
namespace NATS.Server.Configuration;
|
||||||
|
|
||||||
|
public sealed class ClusterOptions
|
||||||
|
{
|
||||||
|
public string? Name { get; set; }
|
||||||
|
public string Host { get; set; } = "0.0.0.0";
|
||||||
|
public int Port { get; set; } = 6222;
|
||||||
|
}
|
||||||
@@ -217,6 +217,26 @@ public static class ConfigProcessor
|
|||||||
opts.AllowNonTls = ToBool(value);
|
opts.AllowNonTls = ToBool(value);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
// Cluster / inter-server / JetStream
|
||||||
|
case "cluster":
|
||||||
|
if (value is Dictionary<string, object?> clusterDict)
|
||||||
|
opts.Cluster = ParseCluster(clusterDict, errors);
|
||||||
|
break;
|
||||||
|
case "gateway":
|
||||||
|
if (value is Dictionary<string, object?> gatewayDict)
|
||||||
|
opts.Gateway = ParseGateway(gatewayDict, errors);
|
||||||
|
break;
|
||||||
|
case "leaf":
|
||||||
|
case "leafnode":
|
||||||
|
case "leafnodes":
|
||||||
|
if (value is Dictionary<string, object?> leafDict)
|
||||||
|
opts.LeafNode = ParseLeafNode(leafDict, errors);
|
||||||
|
break;
|
||||||
|
case "jetstream":
|
||||||
|
if (value is Dictionary<string, object?> jsDict)
|
||||||
|
opts.JetStream = ParseJetStream(jsDict, errors);
|
||||||
|
break;
|
||||||
|
|
||||||
// Tags
|
// Tags
|
||||||
case "server_tags":
|
case "server_tags":
|
||||||
if (value is Dictionary<string, object?> tagsDict)
|
if (value is Dictionary<string, object?> tagsDict)
|
||||||
@@ -245,7 +265,7 @@ public static class ConfigProcessor
|
|||||||
opts.ReconnectErrorReports = ToInt(value);
|
opts.ReconnectErrorReports = ToInt(value);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
// Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.)
|
// Unknown keys silently ignored
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -342,6 +362,9 @@ public static class ConfigProcessor
|
|||||||
private static readonly Regex DurationPattern = new(
|
private static readonly Regex DurationPattern = new(
|
||||||
@"^(-?\d+(?:\.\d+)?)\s*(ms|s|m|h)$",
|
@"^(-?\d+(?:\.\d+)?)\s*(ms|s|m|h)$",
|
||||||
RegexOptions.Compiled | RegexOptions.IgnoreCase);
|
RegexOptions.Compiled | RegexOptions.IgnoreCase);
|
||||||
|
private static readonly Regex ByteSizePattern = new(
|
||||||
|
@"^(\d+)\s*(b|kb|mb|gb|tb)?$",
|
||||||
|
RegexOptions.Compiled | RegexOptions.IgnoreCase);
|
||||||
|
|
||||||
private static TimeSpan ParseDurationString(string s)
|
private static TimeSpan ParseDurationString(string s)
|
||||||
{
|
{
|
||||||
@@ -362,6 +385,133 @@ public static class ConfigProcessor
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── Cluster / gateway / leafnode / JetStream parsing ────────
|
||||||
|
|
||||||
|
private static ClusterOptions ParseCluster(Dictionary<string, object?> dict, List<string> errors)
|
||||||
|
{
|
||||||
|
var options = new ClusterOptions();
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
switch (key.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "name":
|
||||||
|
options.Name = ToString(value);
|
||||||
|
break;
|
||||||
|
case "listen":
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var (host, port) = ParseHostPort(value);
|
||||||
|
if (host is not null)
|
||||||
|
options.Host = host;
|
||||||
|
if (port is not null)
|
||||||
|
options.Port = port.Value;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
errors.Add($"Invalid cluster.listen: {ex.Message}");
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return options;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GatewayOptions ParseGateway(Dictionary<string, object?> dict, List<string> errors)
|
||||||
|
{
|
||||||
|
var options = new GatewayOptions();
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
switch (key.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "name":
|
||||||
|
options.Name = ToString(value);
|
||||||
|
break;
|
||||||
|
case "listen":
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var (host, port) = ParseHostPort(value);
|
||||||
|
if (host is not null)
|
||||||
|
options.Host = host;
|
||||||
|
if (port is not null)
|
||||||
|
options.Port = port.Value;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
errors.Add($"Invalid gateway.listen: {ex.Message}");
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return options;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static LeafNodeOptions ParseLeafNode(Dictionary<string, object?> dict, List<string> errors)
|
||||||
|
{
|
||||||
|
var options = new LeafNodeOptions();
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
if (key.Equals("listen", StringComparison.OrdinalIgnoreCase))
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var (host, port) = ParseHostPort(value);
|
||||||
|
if (host is not null)
|
||||||
|
options.Host = host;
|
||||||
|
if (port is not null)
|
||||||
|
options.Port = port.Value;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
errors.Add($"Invalid leafnode.listen: {ex.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return options;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JetStreamOptions ParseJetStream(Dictionary<string, object?> dict, List<string> errors)
|
||||||
|
{
|
||||||
|
var options = new JetStreamOptions();
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
switch (key.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "store_dir":
|
||||||
|
options.StoreDir = ToString(value);
|
||||||
|
break;
|
||||||
|
case "max_mem_store":
|
||||||
|
try
|
||||||
|
{
|
||||||
|
options.MaxMemoryStore = ParseByteSize(value);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
errors.Add($"Invalid jetstream.max_mem_store: {ex.Message}");
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
case "max_file_store":
|
||||||
|
try
|
||||||
|
{
|
||||||
|
options.MaxFileStore = ParseByteSize(value);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
errors.Add($"Invalid jetstream.max_file_store: {ex.Message}");
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return options;
|
||||||
|
}
|
||||||
|
|
||||||
// ─── Authorization parsing ─────────────────────────────────────
|
// ─── Authorization parsing ─────────────────────────────────────
|
||||||
|
|
||||||
private static void ParseAuthorization(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
|
private static void ParseAuthorization(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
|
||||||
@@ -640,6 +790,40 @@ public static class ConfigProcessor
|
|||||||
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to long"),
|
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to long"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private static long ParseByteSize(object? value)
|
||||||
|
{
|
||||||
|
if (value is long l)
|
||||||
|
return l;
|
||||||
|
if (value is int i)
|
||||||
|
return i;
|
||||||
|
if (value is double d)
|
||||||
|
return (long)d;
|
||||||
|
if (value is not string s)
|
||||||
|
throw new FormatException($"Cannot parse byte size from {value?.GetType().Name ?? "null"}");
|
||||||
|
|
||||||
|
var trimmed = s.Trim();
|
||||||
|
var match = ByteSizePattern.Match(trimmed);
|
||||||
|
if (!match.Success)
|
||||||
|
throw new FormatException($"Cannot parse byte size: '{s}'");
|
||||||
|
|
||||||
|
var amount = long.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture);
|
||||||
|
var unit = match.Groups[2].Value.ToLowerInvariant();
|
||||||
|
var multiplier = unit switch
|
||||||
|
{
|
||||||
|
"" or "b" => 1L,
|
||||||
|
"kb" => 1024L,
|
||||||
|
"mb" => 1024L * 1024L,
|
||||||
|
"gb" => 1024L * 1024L * 1024L,
|
||||||
|
"tb" => 1024L * 1024L * 1024L * 1024L,
|
||||||
|
_ => throw new FormatException($"Unknown byte-size unit: '{unit}'"),
|
||||||
|
};
|
||||||
|
|
||||||
|
checked
|
||||||
|
{
|
||||||
|
return amount * multiplier;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static bool ToBool(object? value) => value switch
|
private static bool ToBool(object? value) => value switch
|
||||||
{
|
{
|
||||||
bool b => b,
|
bool b => b,
|
||||||
|
|||||||
8
src/NATS.Server/Configuration/GatewayOptions.cs
Normal file
8
src/NATS.Server/Configuration/GatewayOptions.cs
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
namespace NATS.Server.Configuration;
|
||||||
|
|
||||||
|
public sealed class GatewayOptions
|
||||||
|
{
|
||||||
|
public string? Name { get; set; }
|
||||||
|
public string Host { get; set; } = "0.0.0.0";
|
||||||
|
public int Port { get; set; }
|
||||||
|
}
|
||||||
8
src/NATS.Server/Configuration/JetStreamOptions.cs
Normal file
8
src/NATS.Server/Configuration/JetStreamOptions.cs
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
namespace NATS.Server.Configuration;
|
||||||
|
|
||||||
|
public sealed class JetStreamOptions
|
||||||
|
{
|
||||||
|
public string StoreDir { get; set; } = string.Empty;
|
||||||
|
public long MaxMemoryStore { get; set; }
|
||||||
|
public long MaxFileStore { get; set; }
|
||||||
|
}
|
||||||
7
src/NATS.Server/Configuration/LeafNodeOptions.cs
Normal file
7
src/NATS.Server/Configuration/LeafNodeOptions.cs
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
namespace NATS.Server.Configuration;
|
||||||
|
|
||||||
|
public sealed class LeafNodeOptions
|
||||||
|
{
|
||||||
|
public string Host { get; set; } = "0.0.0.0";
|
||||||
|
public int Port { get; set; }
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
using System.Security.Authentication;
|
using System.Security.Authentication;
|
||||||
using NATS.Server.Auth;
|
using NATS.Server.Auth;
|
||||||
|
using NATS.Server.Configuration;
|
||||||
using NATS.Server.Tls;
|
using NATS.Server.Tls;
|
||||||
|
|
||||||
namespace NATS.Server;
|
namespace NATS.Server;
|
||||||
@@ -115,6 +116,12 @@ public sealed class NatsOptions
|
|||||||
// Subject mapping / transforms (source pattern -> destination template)
|
// Subject mapping / transforms (source pattern -> destination template)
|
||||||
public Dictionary<string, string>? SubjectMappings { get; set; }
|
public Dictionary<string, string>? SubjectMappings { get; set; }
|
||||||
|
|
||||||
|
// Cluster and JetStream settings
|
||||||
|
public ClusterOptions? Cluster { get; set; }
|
||||||
|
public GatewayOptions? Gateway { get; set; }
|
||||||
|
public LeafNodeOptions? LeafNode { get; set; }
|
||||||
|
public JetStreamOptions? JetStream { get; set; }
|
||||||
|
|
||||||
public bool HasTls => TlsCert != null && TlsKey != null;
|
public bool HasTls => TlsCert != null && TlsKey != null;
|
||||||
|
|
||||||
// WebSocket
|
// WebSocket
|
||||||
|
|||||||
@@ -0,0 +1,21 @@
|
|||||||
|
using NATS.Server.Configuration;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class ClusterJetStreamConfigProcessorTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void ConfigProcessor_maps_jetstream_and_cluster_blocks()
|
||||||
|
{
|
||||||
|
var cfg = """
|
||||||
|
cluster { name: C1; listen: 127.0.0.1:6222 }
|
||||||
|
jetstream { store_dir: /tmp/js; max_mem_store: 1GB; max_file_store: 10GB }
|
||||||
|
""";
|
||||||
|
|
||||||
|
var opts = ConfigProcessor.ProcessConfig(cfg);
|
||||||
|
|
||||||
|
opts.Cluster.ShouldNotBeNull();
|
||||||
|
opts.JetStream.ShouldNotBeNull();
|
||||||
|
opts.JetStream!.StoreDir.ShouldBe("/tmp/js");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user