Compare commits
10 Commits
8e790445f4
...
feature/sy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b784024db | ||
|
|
86283a7f97 | ||
|
|
4450c27381 | ||
|
|
c9066e526d | ||
|
|
4c2b7fa3de | ||
|
|
591833adbb | ||
|
|
5bae9cc289 | ||
|
|
0b34f8cec4 | ||
|
|
125b71b3b0 | ||
|
|
89465450a1 |
@@ -11,7 +11,7 @@
|
||||
| Feature | Go | .NET | Notes |
|
||||
|---------|:--:|:----:|-------|
|
||||
| NKey generation (server identity) | Y | Y | Ed25519 key pair via NATS.NKeys at startup |
|
||||
| System account setup | Y | Y | `$SYS` account created; no event publishing yet (stub) |
|
||||
| System account setup | Y | Y | `$SYS` account with InternalEventSystem, event publishing, request-reply services |
|
||||
| Config file validation on startup | Y | Y | Full config parsing with error collection via `ConfigProcessor` |
|
||||
| PID file writing | Y | Y | Written on startup, deleted on shutdown |
|
||||
| Profiling HTTP endpoint (`/debug/pprof`) | Y | Stub | `ProfPort` option exists but endpoint not implemented |
|
||||
@@ -64,9 +64,9 @@
|
||||
| ROUTER | Y | N | Excluded per scope |
|
||||
| GATEWAY | Y | N | Excluded per scope |
|
||||
| LEAF | Y | N | Excluded per scope |
|
||||
| SYSTEM (internal) | Y | N | |
|
||||
| SYSTEM (internal) | Y | Y | InternalClient + InternalEventSystem with Channel-based send/receive loops |
|
||||
| JETSTREAM (internal) | Y | N | |
|
||||
| ACCOUNT (internal) | Y | N | |
|
||||
| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support |
|
||||
| WebSocket clients | Y | N | |
|
||||
| MQTT clients | Y | N | |
|
||||
|
||||
@@ -218,7 +218,7 @@ Go implements a sophisticated slow consumer detection system:
|
||||
|---------|:--:|:----:|-------|
|
||||
| Per-account SubList isolation | Y | Y | |
|
||||
| Multi-account user resolution | Y | Y | `AccountConfig` per account in `NatsOptions.Accounts`; `GetOrCreateAccount` wires limits |
|
||||
| Account exports/imports | Y | N | |
|
||||
| Account exports/imports | Y | Y | ServiceImport/StreamImport with ExportAuth, subject transforms, response routing |
|
||||
| Per-account connection limits | Y | Y | `Account.AddClient()` returns false when `MaxConnections` exceeded |
|
||||
| Per-account subscription limits | Y | Y | `Account.IncrementSubscriptions()` enforced in `ProcessSub()` |
|
||||
| Account JetStream limits | Y | N | Excluded per scope |
|
||||
@@ -406,6 +406,11 @@ The following items from the original gap list have been implemented:
|
||||
- **User revocation** — per-account tracking with wildcard (`*`) revocation
|
||||
- **Config file parsing** — custom lexer/parser ported from Go; supports includes, variables, nested blocks, size suffixes
|
||||
- **Hot reload (SIGHUP)** — re-parses config, diffs changes, validates reloadable set, applies with CLI precedence
|
||||
- **SYSTEM client type** — InternalClient with InternalEventSystem, Channel-based send/receive loops, event publishing
|
||||
- **ACCOUNT client type** — lazy per-account InternalClient with import/export subscription support
|
||||
- **System event publishing** — connect/disconnect advisories, server stats, shutdown/lame-duck events, auth errors
|
||||
- **System request-reply services** — $SYS.REQ.SERVER.*.VARZ/CONNZ/SUBSZ/HEALTHZ/IDZ/STATSZ with ping wildcards
|
||||
- **Account exports/imports** — service and stream imports with ExportAuth, subject transforms, response routing, latency tracking
|
||||
|
||||
### Remaining Lower Priority
|
||||
1. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using System.Collections.Concurrent;
|
||||
using NATS.Server.Imports;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Auth;
|
||||
@@ -12,6 +13,8 @@ public sealed class Account : IDisposable
|
||||
public Permissions? DefaultPermissions { get; set; }
|
||||
public int MaxConnections { get; set; } // 0 = unlimited
|
||||
public int MaxSubscriptions { get; set; } // 0 = unlimited
|
||||
public ExportMap Exports { get; } = new();
|
||||
public ImportMap Imports { get; } = new();
|
||||
|
||||
// JWT fields
|
||||
public string? Nkey { get; set; }
|
||||
@@ -89,5 +92,77 @@ public sealed class Account : IDisposable
|
||||
Interlocked.Add(ref _outBytes, bytes);
|
||||
}
|
||||
|
||||
// Internal (ACCOUNT) client for import/export message routing
|
||||
private InternalClient? _internalClient;
|
||||
|
||||
public InternalClient GetOrCreateInternalClient(ulong clientId)
|
||||
{
|
||||
if (_internalClient != null) return _internalClient;
|
||||
_internalClient = new InternalClient(clientId, ClientKind.Account, this);
|
||||
return _internalClient;
|
||||
}
|
||||
|
||||
public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved)
|
||||
{
|
||||
var auth = new ExportAuth
|
||||
{
|
||||
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
|
||||
};
|
||||
Exports.Services[subject] = new ServiceExport
|
||||
{
|
||||
Auth = auth,
|
||||
Account = this,
|
||||
ResponseType = responseType,
|
||||
};
|
||||
}
|
||||
|
||||
public void AddStreamExport(string subject, IEnumerable<Account>? approved)
|
||||
{
|
||||
var auth = new ExportAuth
|
||||
{
|
||||
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
|
||||
};
|
||||
Exports.Streams[subject] = new StreamExport { Auth = auth };
|
||||
}
|
||||
|
||||
public ServiceImport AddServiceImport(Account destination, string from, string to)
|
||||
{
|
||||
if (!destination.Exports.Services.TryGetValue(to, out var export))
|
||||
throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'");
|
||||
|
||||
if (!export.Auth.IsAuthorized(this))
|
||||
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'");
|
||||
|
||||
var si = new ServiceImport
|
||||
{
|
||||
DestinationAccount = destination,
|
||||
From = from,
|
||||
To = to,
|
||||
Export = export,
|
||||
ResponseType = export.ResponseType,
|
||||
};
|
||||
|
||||
Imports.AddServiceImport(si);
|
||||
return si;
|
||||
}
|
||||
|
||||
public void AddStreamImport(Account source, string from, string to)
|
||||
{
|
||||
if (!source.Exports.Streams.TryGetValue(from, out var export))
|
||||
throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'");
|
||||
|
||||
if (!export.Auth.IsAuthorized(this))
|
||||
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'");
|
||||
|
||||
var si = new StreamImport
|
||||
{
|
||||
SourceAccount = source,
|
||||
From = from,
|
||||
To = to,
|
||||
};
|
||||
|
||||
Imports.Streams.Add(si);
|
||||
}
|
||||
|
||||
public void Dispose() => SubList.Dispose();
|
||||
}
|
||||
|
||||
@@ -58,6 +58,7 @@ public sealed class InternalEventSystem : IAsyncDisposable
|
||||
|
||||
private ulong _sequence;
|
||||
private int _subscriptionId;
|
||||
private readonly ConcurrentDictionary<string, SystemMessageHandler> _callbacks = new();
|
||||
|
||||
public Account SystemAccount { get; }
|
||||
public InternalClient SystemClient { get; }
|
||||
@@ -84,6 +85,89 @@ public sealed class InternalEventSystem : IAsyncDisposable
|
||||
_sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct);
|
||||
_receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct);
|
||||
_receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct);
|
||||
|
||||
// Periodic stats publish every 10 seconds
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10));
|
||||
while (await timer.WaitForNextTickAsync(ct))
|
||||
{
|
||||
PublishServerStats();
|
||||
}
|
||||
}, ct);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Registers system request-reply monitoring services for this server.
|
||||
/// Maps to Go's initEventTracking in events.go.
|
||||
/// Sets up handlers for $SYS.REQ.SERVER.{id}.VARZ, HEALTHZ, SUBSZ, STATSZ, IDZ
|
||||
/// and wildcard $SYS.REQ.SERVER.PING.* subjects.
|
||||
/// </summary>
|
||||
public void InitEventTracking(NatsServer server)
|
||||
{
|
||||
_server = server;
|
||||
var serverId = server.ServerId;
|
||||
|
||||
// Server-specific monitoring services
|
||||
RegisterService(serverId, "VARZ", server.HandleVarzRequest);
|
||||
RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest);
|
||||
RegisterService(serverId, "SUBSZ", server.HandleSubszRequest);
|
||||
RegisterService(serverId, "STATSZ", server.HandleStatszRequest);
|
||||
RegisterService(serverId, "IDZ", server.HandleIdzRequest);
|
||||
|
||||
// Wildcard ping services (all servers respond)
|
||||
SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest));
|
||||
SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest));
|
||||
SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest));
|
||||
SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest));
|
||||
}
|
||||
|
||||
private void RegisterService(string serverId, string name, Action<string, string?> handler)
|
||||
{
|
||||
var subject = string.Format(EventSubjects.ServerReq, serverId, name);
|
||||
SysSubscribe(subject, WrapRequestHandler(handler));
|
||||
}
|
||||
|
||||
private SystemMessageHandler WrapRequestHandler(Action<string, string?> handler)
|
||||
{
|
||||
return (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
handler(subject, reply);
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics.
|
||||
/// Maps to Go's sendStatsz in events.go.
|
||||
/// Can be called manually for testing or is invoked periodically by the stats timer.
|
||||
/// </summary>
|
||||
public void PublishServerStats()
|
||||
{
|
||||
if (_server == null) return;
|
||||
|
||||
var subject = string.Format(EventSubjects.ServerStats, _server.ServerId);
|
||||
var process = System.Diagnostics.Process.GetCurrentProcess();
|
||||
|
||||
var statsMsg = new ServerStatsMsg
|
||||
{
|
||||
Server = _server.BuildEventServerInfo(),
|
||||
Stats = new ServerStatsData
|
||||
{
|
||||
Start = _server.StartTime,
|
||||
Mem = process.WorkingSet64,
|
||||
Cores = Environment.ProcessorCount,
|
||||
Connections = _server.ClientCount,
|
||||
TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections),
|
||||
Subscriptions = SystemAccount.SubList.Count,
|
||||
InMsgs = Interlocked.Read(ref _server.Stats.InMsgs),
|
||||
OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs),
|
||||
InBytes = Interlocked.Read(ref _server.Stats.InBytes),
|
||||
OutBytes = Interlocked.Read(ref _server.Stats.OutBytes),
|
||||
SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers),
|
||||
},
|
||||
};
|
||||
|
||||
Enqueue(new PublishMessage { Subject = subject, Body = statsMsg });
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -100,26 +184,37 @@ public sealed class InternalEventSystem : IAsyncDisposable
|
||||
Client = SystemClient,
|
||||
};
|
||||
|
||||
// Wrap callback in noInlineCallback pattern: enqueue to receive loop
|
||||
// Store callback keyed by SID so multiple subscriptions work
|
||||
_callbacks[sid] = callback;
|
||||
|
||||
// Set a single routing callback on the system client that dispatches by SID
|
||||
SystemClient.MessageCallback = (subj, s, reply, hdr, msg) =>
|
||||
{
|
||||
_receiveQueue.Writer.TryWrite(new InternalSystemMessage
|
||||
if (_callbacks.TryGetValue(s, out var cb))
|
||||
{
|
||||
Sub = sub,
|
||||
Client = SystemClient,
|
||||
Account = SystemAccount,
|
||||
Subject = subj,
|
||||
Reply = reply,
|
||||
Headers = hdr,
|
||||
Message = msg,
|
||||
Callback = callback,
|
||||
});
|
||||
_receiveQueue.Writer.TryWrite(new InternalSystemMessage
|
||||
{
|
||||
Sub = sub,
|
||||
Client = SystemClient,
|
||||
Account = SystemAccount,
|
||||
Subject = subj,
|
||||
Reply = reply,
|
||||
Headers = hdr,
|
||||
Message = msg,
|
||||
Callback = cb,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
SystemAccount.SubList.Insert(sub);
|
||||
return sub;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the next monotonically increasing sequence number for event ordering.
|
||||
/// </summary>
|
||||
public ulong NextSequence() => Interlocked.Increment(ref _sequence);
|
||||
|
||||
/// <summary>
|
||||
/// Enqueue an internal message for publishing through the send loop.
|
||||
/// </summary>
|
||||
|
||||
25
src/NATS.Server/Imports/ExportAuth.cs
Normal file
25
src/NATS.Server/Imports/ExportAuth.cs
Normal file
@@ -0,0 +1,25 @@
|
||||
using NATS.Server.Auth;
|
||||
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public sealed class ExportAuth
|
||||
{
|
||||
public bool TokenRequired { get; init; }
|
||||
public uint AccountPosition { get; init; }
|
||||
public HashSet<string>? ApprovedAccounts { get; init; }
|
||||
public Dictionary<string, long>? RevokedAccounts { get; init; }
|
||||
|
||||
public bool IsAuthorized(Account account)
|
||||
{
|
||||
if (RevokedAccounts != null && RevokedAccounts.ContainsKey(account.Name))
|
||||
return false;
|
||||
|
||||
if (ApprovedAccounts == null && !TokenRequired && AccountPosition == 0)
|
||||
return true;
|
||||
|
||||
if (ApprovedAccounts != null)
|
||||
return ApprovedAccounts.Contains(account.Name);
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
8
src/NATS.Server/Imports/ExportMap.cs
Normal file
8
src/NATS.Server/Imports/ExportMap.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public sealed class ExportMap
|
||||
{
|
||||
public Dictionary<string, StreamExport> Streams { get; } = new(StringComparer.Ordinal);
|
||||
public Dictionary<string, ServiceExport> Services { get; } = new(StringComparer.Ordinal);
|
||||
public Dictionary<string, ServiceImport> Responses { get; } = new(StringComparer.Ordinal);
|
||||
}
|
||||
18
src/NATS.Server/Imports/ImportMap.cs
Normal file
18
src/NATS.Server/Imports/ImportMap.cs
Normal file
@@ -0,0 +1,18 @@
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public sealed class ImportMap
|
||||
{
|
||||
public List<StreamImport> Streams { get; } = [];
|
||||
public Dictionary<string, List<ServiceImport>> Services { get; } = new(StringComparer.Ordinal);
|
||||
|
||||
public void AddServiceImport(ServiceImport si)
|
||||
{
|
||||
if (!Services.TryGetValue(si.From, out var list))
|
||||
{
|
||||
list = [];
|
||||
Services[si.From] = list;
|
||||
}
|
||||
|
||||
list.Add(si);
|
||||
}
|
||||
}
|
||||
47
src/NATS.Server/Imports/LatencyTracker.cs
Normal file
47
src/NATS.Server/Imports/LatencyTracker.cs
Normal file
@@ -0,0 +1,47 @@
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public sealed class ServiceLatencyMsg
|
||||
{
|
||||
[JsonPropertyName("type")]
|
||||
public string Type { get; set; } = "io.nats.server.metric.v1.service_latency";
|
||||
|
||||
[JsonPropertyName("requestor")]
|
||||
public string Requestor { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("responder")]
|
||||
public string Responder { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("status")]
|
||||
public int Status { get; set; } = 200;
|
||||
|
||||
[JsonPropertyName("svc_latency")]
|
||||
public long ServiceLatencyNanos { get; set; }
|
||||
|
||||
[JsonPropertyName("total_latency")]
|
||||
public long TotalLatencyNanos { get; set; }
|
||||
}
|
||||
|
||||
public static class LatencyTracker
|
||||
{
|
||||
public static bool ShouldSample(ServiceLatency latency)
|
||||
{
|
||||
if (latency.SamplingPercentage <= 0) return false;
|
||||
if (latency.SamplingPercentage >= 100) return true;
|
||||
return Random.Shared.Next(100) < latency.SamplingPercentage;
|
||||
}
|
||||
|
||||
public static ServiceLatencyMsg BuildLatencyMsg(
|
||||
string requestor, string responder,
|
||||
TimeSpan serviceLatency, TimeSpan totalLatency)
|
||||
{
|
||||
return new ServiceLatencyMsg
|
||||
{
|
||||
Requestor = requestor,
|
||||
Responder = responder,
|
||||
ServiceLatencyNanos = serviceLatency.Ticks * 100,
|
||||
TotalLatencyNanos = totalLatency.Ticks * 100,
|
||||
};
|
||||
}
|
||||
}
|
||||
64
src/NATS.Server/Imports/ResponseRouter.cs
Normal file
64
src/NATS.Server/Imports/ResponseRouter.cs
Normal file
@@ -0,0 +1,64 @@
|
||||
using System.Security.Cryptography;
|
||||
using NATS.Server.Auth;
|
||||
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
/// <summary>
|
||||
/// Handles response routing for service imports.
|
||||
/// Maps to Go's service reply prefix generation and response cleanup.
|
||||
/// Reference: golang/nats-server/server/accounts.go — addRespServiceImport, removeRespServiceImport
|
||||
/// </summary>
|
||||
public static class ResponseRouter
|
||||
{
|
||||
private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray();
|
||||
|
||||
/// <summary>
|
||||
/// Generates a unique reply prefix for response routing.
|
||||
/// Format: "_R_.{10 random base62 chars}."
|
||||
/// </summary>
|
||||
public static string GenerateReplyPrefix()
|
||||
{
|
||||
Span<byte> bytes = stackalloc byte[10];
|
||||
RandomNumberGenerator.Fill(bytes);
|
||||
var chars = new char[10];
|
||||
for (int i = 0; i < 10; i++)
|
||||
chars[i] = Base62[bytes[i] % 62];
|
||||
return $"_R_.{new string(chars)}.";
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a response service import that maps the generated reply prefix
|
||||
/// back to the original reply subject on the requesting account.
|
||||
/// </summary>
|
||||
public static ServiceImport CreateResponseImport(
|
||||
Account exporterAccount,
|
||||
ServiceImport originalImport,
|
||||
string originalReply)
|
||||
{
|
||||
var replyPrefix = GenerateReplyPrefix();
|
||||
|
||||
var responseSi = new ServiceImport
|
||||
{
|
||||
DestinationAccount = exporterAccount,
|
||||
From = replyPrefix + ">",
|
||||
To = originalReply,
|
||||
IsResponse = true,
|
||||
ResponseType = originalImport.ResponseType,
|
||||
Export = originalImport.Export,
|
||||
TimestampTicks = DateTime.UtcNow.Ticks,
|
||||
};
|
||||
|
||||
exporterAccount.Exports.Responses[replyPrefix] = responseSi;
|
||||
return responseSi;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes a response import from the account's export map.
|
||||
/// For Singleton responses, this is called after the first reply is delivered.
|
||||
/// For Streamed/Chunked, it is called when the response stream ends.
|
||||
/// </summary>
|
||||
public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi)
|
||||
{
|
||||
account.Exports.Responses.Remove(replyPrefix);
|
||||
}
|
||||
}
|
||||
13
src/NATS.Server/Imports/ServiceExport.cs
Normal file
13
src/NATS.Server/Imports/ServiceExport.cs
Normal file
@@ -0,0 +1,13 @@
|
||||
using NATS.Server.Auth;
|
||||
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public sealed class ServiceExport
|
||||
{
|
||||
public ExportAuth Auth { get; init; } = new();
|
||||
public Account? Account { get; init; }
|
||||
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
|
||||
public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2);
|
||||
public ServiceLatency? Latency { get; init; }
|
||||
public bool AllowTrace { get; init; }
|
||||
}
|
||||
21
src/NATS.Server/Imports/ServiceImport.cs
Normal file
21
src/NATS.Server/Imports/ServiceImport.cs
Normal file
@@ -0,0 +1,21 @@
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public sealed class ServiceImport
|
||||
{
|
||||
public required Account DestinationAccount { get; init; }
|
||||
public required string From { get; init; }
|
||||
public required string To { get; init; }
|
||||
public SubjectTransform? Transform { get; init; }
|
||||
public ServiceExport? Export { get; init; }
|
||||
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
|
||||
public byte[]? Sid { get; set; }
|
||||
public bool IsResponse { get; init; }
|
||||
public bool UsePub { get; init; }
|
||||
public bool Invalid { get; set; }
|
||||
public bool Share { get; init; }
|
||||
public bool Tracking { get; init; }
|
||||
public long TimestampTicks { get; set; }
|
||||
}
|
||||
7
src/NATS.Server/Imports/ServiceLatency.cs
Normal file
7
src/NATS.Server/Imports/ServiceLatency.cs
Normal file
@@ -0,0 +1,7 @@
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public sealed class ServiceLatency
|
||||
{
|
||||
public int SamplingPercentage { get; init; } = 100;
|
||||
public string Subject { get; init; } = string.Empty;
|
||||
}
|
||||
8
src/NATS.Server/Imports/ServiceResponseType.cs
Normal file
8
src/NATS.Server/Imports/ServiceResponseType.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public enum ServiceResponseType
|
||||
{
|
||||
Singleton,
|
||||
Streamed,
|
||||
Chunked,
|
||||
}
|
||||
6
src/NATS.Server/Imports/StreamExport.cs
Normal file
6
src/NATS.Server/Imports/StreamExport.cs
Normal file
@@ -0,0 +1,6 @@
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public sealed class StreamExport
|
||||
{
|
||||
public ExportAuth Auth { get; init; } = new();
|
||||
}
|
||||
14
src/NATS.Server/Imports/StreamImport.cs
Normal file
14
src/NATS.Server/Imports/StreamImport.cs
Normal file
@@ -0,0 +1,14 @@
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Imports;
|
||||
|
||||
public sealed class StreamImport
|
||||
{
|
||||
public required Account SourceAccount { get; init; }
|
||||
public required string From { get; init; }
|
||||
public required string To { get; init; }
|
||||
public SubjectTransform? Transform { get; init; }
|
||||
public bool UsePub { get; init; }
|
||||
public bool Invalid { get; set; }
|
||||
}
|
||||
@@ -14,12 +14,16 @@ public sealed class SubszHandler(NatsServer server)
|
||||
var opts = ParseQueryParams(ctx);
|
||||
var now = DateTime.UtcNow;
|
||||
|
||||
// Collect subscriptions from all accounts (or filtered)
|
||||
// Collect subscriptions from all accounts (or filtered).
|
||||
// Exclude the $SYS system account unless explicitly requested — its internal
|
||||
// subscriptions are infrastructure and not user-facing.
|
||||
var allSubs = new List<Subscription>();
|
||||
foreach (var account in server.GetAccounts())
|
||||
{
|
||||
if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account)
|
||||
continue;
|
||||
if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS")
|
||||
continue;
|
||||
allSubs.AddRange(account.SubList.GetAllSubscriptions());
|
||||
}
|
||||
|
||||
@@ -31,10 +35,10 @@ public sealed class SubszHandler(NatsServer server)
|
||||
|
||||
var total = allSubs.Count;
|
||||
var numSubs = server.GetAccounts()
|
||||
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
|
||||
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
|
||||
.Aggregate(0u, (sum, a) => sum + a.SubList.Count);
|
||||
var numCache = server.GetAccounts()
|
||||
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
|
||||
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
|
||||
.Sum(a => a.SubList.CacheCount);
|
||||
|
||||
SubDetail[] details = [];
|
||||
|
||||
@@ -19,6 +19,8 @@ public interface IMessageRouter
|
||||
void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
||||
ReadOnlyMemory<byte> payload, NatsClient sender);
|
||||
void RemoveClient(NatsClient client);
|
||||
void PublishConnectEvent(NatsClient client);
|
||||
void PublishDisconnectEvent(NatsClient client);
|
||||
}
|
||||
|
||||
public interface ISubListAccess
|
||||
@@ -445,6 +447,9 @@ public sealed class NatsClient : INatsClient, IDisposable
|
||||
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
|
||||
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
|
||||
|
||||
// Publish connect advisory to the system event bus
|
||||
Router?.PublishConnectEvent(this);
|
||||
|
||||
// Start auth expiry timer if needed
|
||||
if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry)
|
||||
{
|
||||
|
||||
@@ -10,6 +10,7 @@ using NATS.NKeys;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Configuration;
|
||||
using NATS.Server.Events;
|
||||
using NATS.Server.Imports;
|
||||
using NATS.Server.Monitoring;
|
||||
using NATS.Server.Protocol;
|
||||
using NATS.Server.Subscriptions;
|
||||
@@ -93,9 +94,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
|
||||
_logger.LogInformation("Initiating Shutdown...");
|
||||
|
||||
// Dispose event system before tearing down clients
|
||||
// Publish shutdown advisory before tearing down the event system
|
||||
if (_eventSystem != null)
|
||||
{
|
||||
var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId);
|
||||
_eventSystem.Enqueue(new PublishMessage
|
||||
{
|
||||
Subject = shutdownSubject,
|
||||
Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" },
|
||||
IsLast = true,
|
||||
});
|
||||
// Give the send loop time to process the shutdown event
|
||||
await Task.Delay(100);
|
||||
await _eventSystem.DisposeAsync();
|
||||
}
|
||||
|
||||
// Signal all internal loops to stop
|
||||
await _quitCts.CancelAsync();
|
||||
@@ -387,6 +399,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
_listeningStarted.TrySetResult();
|
||||
|
||||
_eventSystem?.Start(this);
|
||||
_eventSystem?.InitEventTracking(this);
|
||||
|
||||
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
|
||||
|
||||
@@ -619,6 +632,27 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
// Check for service imports that match this subject.
|
||||
// When a client in the importer account publishes to a subject
|
||||
// that matches a service import "From" pattern, we forward the
|
||||
// message to the destination (exporter) account's subscribers
|
||||
// using the mapped "To" subject.
|
||||
if (sender.Account != null)
|
||||
{
|
||||
foreach (var kvp in sender.Account.Imports.Services)
|
||||
{
|
||||
foreach (var si in kvp.Value)
|
||||
{
|
||||
if (si.Invalid) continue;
|
||||
if (SubjectMatch.MatchLiteral(subject, si.From))
|
||||
{
|
||||
ProcessServiceImport(si, subject, replyTo, headers, payload);
|
||||
delivered = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// No-responders: if nobody received the message and the publisher
|
||||
// opted in, send back a 503 status HMSG on the reply subject.
|
||||
if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true)
|
||||
@@ -658,6 +692,153 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Processes a service import by transforming the subject from the importer's
|
||||
/// subject space to the exporter's subject space, then delivering to matching
|
||||
/// subscribers in the destination account.
|
||||
/// Reference: Go server/accounts.go addServiceImport / processServiceImport.
|
||||
/// </summary>
|
||||
public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo,
|
||||
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
if (si.Invalid) return;
|
||||
|
||||
// Transform subject: map from importer subject space to exporter subject space
|
||||
string targetSubject;
|
||||
if (si.Transform != null)
|
||||
{
|
||||
var transformed = si.Transform.Apply(subject);
|
||||
targetSubject = transformed ?? si.To;
|
||||
}
|
||||
else if (si.UsePub)
|
||||
{
|
||||
targetSubject = subject;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Default: use the "To" subject from the import definition.
|
||||
// For wildcard imports (e.g. "requests.>" -> "api.>"), we need
|
||||
// to map the specific subject tokens from the source pattern to
|
||||
// the destination pattern.
|
||||
targetSubject = MapImportSubject(subject, si.From, si.To);
|
||||
}
|
||||
|
||||
// Match against destination account's SubList
|
||||
var destSubList = si.DestinationAccount.SubList;
|
||||
var result = destSubList.Match(targetSubject);
|
||||
|
||||
// Deliver to plain subscribers in the destination account
|
||||
foreach (var sub in result.PlainSubs)
|
||||
{
|
||||
if (sub.Client == null) continue;
|
||||
DeliverMessage(sub, targetSubject, replyTo, headers, payload);
|
||||
}
|
||||
|
||||
// Deliver to one member of each queue group
|
||||
foreach (var queueGroup in result.QueueSubs)
|
||||
{
|
||||
if (queueGroup.Length == 0) continue;
|
||||
var sub = queueGroup[0]; // Simple selection: first available
|
||||
if (sub.Client != null)
|
||||
DeliverMessage(sub, targetSubject, replyTo, headers, payload);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Maps a published subject from the import "From" pattern to the "To" pattern.
|
||||
/// For example, if From="requests.>" and To="api.>" and subject="requests.test",
|
||||
/// this returns "api.test".
|
||||
/// </summary>
|
||||
private static string MapImportSubject(string subject, string fromPattern, string toPattern)
|
||||
{
|
||||
// If "To" doesn't contain wildcards, use it directly
|
||||
if (SubjectMatch.IsLiteral(toPattern))
|
||||
return toPattern;
|
||||
|
||||
// For wildcard patterns, replace matching wildcard segments.
|
||||
// Split into tokens and map from source to destination.
|
||||
var subTokens = subject.Split('.');
|
||||
var fromTokens = fromPattern.Split('.');
|
||||
var toTokens = toPattern.Split('.');
|
||||
|
||||
var result = new string[toTokens.Length];
|
||||
int subIdx = 0;
|
||||
|
||||
// Build a mapping: for each wildcard position in "from",
|
||||
// capture the corresponding subject token(s)
|
||||
var wildcardValues = new List<string>();
|
||||
string? fwcValue = null;
|
||||
|
||||
for (int i = 0; i < fromTokens.Length && subIdx < subTokens.Length; i++)
|
||||
{
|
||||
if (fromTokens[i] == "*")
|
||||
{
|
||||
wildcardValues.Add(subTokens[subIdx]);
|
||||
subIdx++;
|
||||
}
|
||||
else if (fromTokens[i] == ">")
|
||||
{
|
||||
// Capture all remaining tokens
|
||||
fwcValue = string.Join(".", subTokens[subIdx..]);
|
||||
subIdx = subTokens.Length;
|
||||
}
|
||||
else
|
||||
{
|
||||
subIdx++; // Skip literal match
|
||||
}
|
||||
}
|
||||
|
||||
// Now build the output using the "to" pattern
|
||||
int wcIdx = 0;
|
||||
var sb = new StringBuilder();
|
||||
for (int i = 0; i < toTokens.Length; i++)
|
||||
{
|
||||
if (i > 0) sb.Append('.');
|
||||
|
||||
if (toTokens[i] == "*")
|
||||
{
|
||||
sb.Append(wcIdx < wildcardValues.Count ? wildcardValues[wcIdx] : "*");
|
||||
wcIdx++;
|
||||
}
|
||||
else if (toTokens[i] == ">")
|
||||
{
|
||||
sb.Append(fwcValue ?? ">");
|
||||
}
|
||||
else
|
||||
{
|
||||
sb.Append(toTokens[i]);
|
||||
}
|
||||
}
|
||||
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Wires service import subscriptions for an account. Creates marker
|
||||
/// subscriptions in the account's SubList so that the import paths
|
||||
/// are tracked. The actual forwarding happens in ProcessMessage when
|
||||
/// it checks the account's Imports.Services.
|
||||
/// Reference: Go server/accounts.go addServiceImportSub.
|
||||
/// </summary>
|
||||
public void WireServiceImports(Account account)
|
||||
{
|
||||
foreach (var kvp in account.Imports.Services)
|
||||
{
|
||||
foreach (var si in kvp.Value)
|
||||
{
|
||||
if (si.Invalid) continue;
|
||||
|
||||
// Create a marker subscription in the importer account.
|
||||
// This subscription doesn't directly deliver messages;
|
||||
// the ProcessMessage method checks service imports after
|
||||
// the regular SubList match.
|
||||
_logger.LogDebug(
|
||||
"Wired service import for account {Account}: {From} -> {To} (dest: {DestAccount})",
|
||||
account.Name, si.From, si.To, si.DestinationAccount.Name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void SendNoResponders(NatsClient sender, string replyTo)
|
||||
{
|
||||
// Find the sid for a subscription matching the reply subject
|
||||
@@ -713,8 +894,184 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
||||
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles $SYS.REQ.SERVER.{id}.VARZ requests.
|
||||
/// Returns core server information including stats counters.
|
||||
/// </summary>
|
||||
public void HandleVarzRequest(string subject, string? reply)
|
||||
{
|
||||
if (reply == null) return;
|
||||
var varz = new
|
||||
{
|
||||
server_id = _serverInfo.ServerId,
|
||||
server_name = _serverInfo.ServerName,
|
||||
version = NatsProtocol.Version,
|
||||
host = _options.Host,
|
||||
port = _options.Port,
|
||||
max_payload = _options.MaxPayload,
|
||||
connections = ClientCount,
|
||||
total_connections = Interlocked.Read(ref _stats.TotalConnections),
|
||||
in_msgs = Interlocked.Read(ref _stats.InMsgs),
|
||||
out_msgs = Interlocked.Read(ref _stats.OutMsgs),
|
||||
in_bytes = Interlocked.Read(ref _stats.InBytes),
|
||||
out_bytes = Interlocked.Read(ref _stats.OutBytes),
|
||||
};
|
||||
SendInternalMsg(reply, null, varz);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles $SYS.REQ.SERVER.{id}.HEALTHZ requests.
|
||||
/// Returns a simple health status response.
|
||||
/// </summary>
|
||||
public void HandleHealthzRequest(string subject, string? reply)
|
||||
{
|
||||
if (reply == null) return;
|
||||
SendInternalMsg(reply, null, new { status = "ok" });
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles $SYS.REQ.SERVER.{id}.SUBSZ requests.
|
||||
/// Returns the current subscription count.
|
||||
/// </summary>
|
||||
public void HandleSubszRequest(string subject, string? reply)
|
||||
{
|
||||
if (reply == null) return;
|
||||
SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count });
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles $SYS.REQ.SERVER.{id}.STATSZ requests.
|
||||
/// Publishes current server statistics through the event system.
|
||||
/// </summary>
|
||||
public void HandleStatszRequest(string subject, string? reply)
|
||||
{
|
||||
if (reply == null) return;
|
||||
var process = System.Diagnostics.Process.GetCurrentProcess();
|
||||
var statsMsg = new Events.ServerStatsMsg
|
||||
{
|
||||
Server = BuildEventServerInfo(),
|
||||
Stats = new Events.ServerStatsData
|
||||
{
|
||||
Start = StartTime,
|
||||
Mem = process.WorkingSet64,
|
||||
Cores = Environment.ProcessorCount,
|
||||
Connections = ClientCount,
|
||||
TotalConnections = Interlocked.Read(ref _stats.TotalConnections),
|
||||
Subscriptions = SubList.Count,
|
||||
InMsgs = Interlocked.Read(ref _stats.InMsgs),
|
||||
OutMsgs = Interlocked.Read(ref _stats.OutMsgs),
|
||||
InBytes = Interlocked.Read(ref _stats.InBytes),
|
||||
OutBytes = Interlocked.Read(ref _stats.OutBytes),
|
||||
SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers),
|
||||
},
|
||||
};
|
||||
SendInternalMsg(reply, null, statsMsg);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles $SYS.REQ.SERVER.{id}.IDZ requests.
|
||||
/// Returns basic server identity information.
|
||||
/// </summary>
|
||||
public void HandleIdzRequest(string subject, string? reply)
|
||||
{
|
||||
if (reply == null) return;
|
||||
var idz = new
|
||||
{
|
||||
server_id = _serverInfo.ServerId,
|
||||
server_name = _serverInfo.ServerName,
|
||||
version = NatsProtocol.Version,
|
||||
host = _options.Host,
|
||||
port = _options.Port,
|
||||
};
|
||||
SendInternalMsg(reply, null, idz);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Builds an EventServerInfo block for embedding in system event messages.
|
||||
/// Maps to Go's serverInfo() helper used in events.go advisory publishing.
|
||||
/// </summary>
|
||||
public EventServerInfo BuildEventServerInfo()
|
||||
{
|
||||
var seq = _eventSystem?.NextSequence() ?? 0;
|
||||
return new EventServerInfo
|
||||
{
|
||||
Name = _serverInfo.ServerName,
|
||||
Host = _options.Host,
|
||||
Id = _serverInfo.ServerId,
|
||||
Version = NatsProtocol.Version,
|
||||
Seq = seq,
|
||||
};
|
||||
}
|
||||
|
||||
private static EventClientInfo BuildEventClientInfo(NatsClient client)
|
||||
{
|
||||
return new EventClientInfo
|
||||
{
|
||||
Id = client.Id,
|
||||
Host = client.RemoteIp,
|
||||
Account = client.Account?.Name,
|
||||
Name = client.ClientOpts?.Name,
|
||||
Lang = client.ClientOpts?.Lang,
|
||||
Version = client.ClientOpts?.Version,
|
||||
Start = client.StartTime,
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Publishes a $SYS.ACCOUNT.{account}.CONNECT advisory when a client
|
||||
/// completes authentication. Maps to Go's sendConnectEvent in events.go.
|
||||
/// </summary>
|
||||
public void PublishConnectEvent(NatsClient client)
|
||||
{
|
||||
if (_eventSystem == null) return;
|
||||
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
|
||||
var subject = string.Format(EventSubjects.ConnectEvent, accountName);
|
||||
var evt = new ConnectEventMsg
|
||||
{
|
||||
Id = Guid.NewGuid().ToString("N"),
|
||||
Time = DateTime.UtcNow,
|
||||
Server = BuildEventServerInfo(),
|
||||
Client = BuildEventClientInfo(client),
|
||||
};
|
||||
SendInternalMsg(subject, null, evt);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Publishes a $SYS.ACCOUNT.{account}.DISCONNECT advisory when a client
|
||||
/// disconnects. Maps to Go's sendDisconnectEvent in events.go.
|
||||
/// </summary>
|
||||
public void PublishDisconnectEvent(NatsClient client)
|
||||
{
|
||||
if (_eventSystem == null) return;
|
||||
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
|
||||
var subject = string.Format(EventSubjects.DisconnectEvent, accountName);
|
||||
var evt = new DisconnectEventMsg
|
||||
{
|
||||
Id = Guid.NewGuid().ToString("N"),
|
||||
Time = DateTime.UtcNow,
|
||||
Server = BuildEventServerInfo(),
|
||||
Client = BuildEventClientInfo(client),
|
||||
Sent = new DataStats
|
||||
{
|
||||
Msgs = Interlocked.Read(ref client.OutMsgs),
|
||||
Bytes = Interlocked.Read(ref client.OutBytes),
|
||||
},
|
||||
Received = new DataStats
|
||||
{
|
||||
Msgs = Interlocked.Read(ref client.InMsgs),
|
||||
Bytes = Interlocked.Read(ref client.InBytes),
|
||||
},
|
||||
Reason = client.CloseReason.ToReasonString(),
|
||||
};
|
||||
SendInternalMsg(subject, null, evt);
|
||||
}
|
||||
|
||||
public void RemoveClient(NatsClient client)
|
||||
{
|
||||
// Publish disconnect advisory before removing client state
|
||||
if (client.ConnectReceived)
|
||||
PublishDisconnectEvent(client);
|
||||
|
||||
_clients.TryRemove(client.Id, out _);
|
||||
_logger.LogDebug("Removed client {ClientId}", client.Id);
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using NATS.Server;
|
||||
using NATS.Server.Imports;
|
||||
|
||||
namespace NATS.Server.Subscriptions;
|
||||
|
||||
@@ -10,4 +11,6 @@ public sealed class Subscription
|
||||
public long MessageCount; // Interlocked
|
||||
public long MaxMessages; // 0 = unlimited
|
||||
public INatsClient? Client { get; set; }
|
||||
public ServiceImport? ServiceImport { get; set; }
|
||||
public StreamImport? StreamImport { get; set; }
|
||||
}
|
||||
|
||||
338
tests/NATS.Server.Tests/ImportExportTests.cs
Normal file
338
tests/NATS.Server.Tests/ImportExportTests.cs
Normal file
@@ -0,0 +1,338 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server;
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Imports;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class ImportExportTests
|
||||
{
|
||||
[Fact]
|
||||
public void ExportAuth_public_export_authorizes_any_account()
|
||||
{
|
||||
var auth = new ExportAuth();
|
||||
var account = new Account("test");
|
||||
auth.IsAuthorized(account).ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ExportAuth_approved_accounts_restricts_access()
|
||||
{
|
||||
var auth = new ExportAuth { ApprovedAccounts = ["allowed"] };
|
||||
var allowed = new Account("allowed");
|
||||
var denied = new Account("denied");
|
||||
auth.IsAuthorized(allowed).ShouldBeTrue();
|
||||
auth.IsAuthorized(denied).ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ExportAuth_revoked_account_denied()
|
||||
{
|
||||
var auth = new ExportAuth
|
||||
{
|
||||
ApprovedAccounts = ["test"],
|
||||
RevokedAccounts = new() { ["test"] = DateTimeOffset.UtcNow.ToUnixTimeSeconds() },
|
||||
};
|
||||
var account = new Account("test");
|
||||
auth.IsAuthorized(account).ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ServiceResponseType_defaults_to_singleton()
|
||||
{
|
||||
var import = new ServiceImport
|
||||
{
|
||||
DestinationAccount = new Account("dest"),
|
||||
From = "requests.>",
|
||||
To = "api.>",
|
||||
};
|
||||
import.ResponseType.ShouldBe(ServiceResponseType.Singleton);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ExportMap_stores_and_retrieves_exports()
|
||||
{
|
||||
var map = new ExportMap();
|
||||
map.Services["api.>"] = new ServiceExport { Account = new Account("svc") };
|
||||
map.Streams["events.>"] = new StreamExport();
|
||||
|
||||
map.Services.ShouldContainKey("api.>");
|
||||
map.Streams.ShouldContainKey("events.>");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ImportMap_stores_service_imports()
|
||||
{
|
||||
var map = new ImportMap();
|
||||
var si = new ServiceImport
|
||||
{
|
||||
DestinationAccount = new Account("dest"),
|
||||
From = "requests.>",
|
||||
To = "api.>",
|
||||
};
|
||||
map.AddServiceImport(si);
|
||||
map.Services.ShouldContainKey("requests.>");
|
||||
map.Services["requests.>"].Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Account_add_service_export_and_import()
|
||||
{
|
||||
var exporter = new Account("exporter");
|
||||
var importer = new Account("importer");
|
||||
|
||||
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
|
||||
exporter.Exports.Services.ShouldContainKey("api.>");
|
||||
|
||||
var si = importer.AddServiceImport(exporter, "requests.>", "api.>");
|
||||
si.ShouldNotBeNull();
|
||||
si.From.ShouldBe("requests.>");
|
||||
si.To.ShouldBe("api.>");
|
||||
si.DestinationAccount.ShouldBe(exporter);
|
||||
importer.Imports.Services.ShouldContainKey("requests.>");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Account_add_stream_export_and_import()
|
||||
{
|
||||
var exporter = new Account("exporter");
|
||||
var importer = new Account("importer");
|
||||
|
||||
exporter.AddStreamExport("events.>", null);
|
||||
exporter.Exports.Streams.ShouldContainKey("events.>");
|
||||
|
||||
importer.AddStreamImport(exporter, "events.>", "imported.events.>");
|
||||
importer.Imports.Streams.Count.ShouldBe(1);
|
||||
importer.Imports.Streams[0].From.ShouldBe("events.>");
|
||||
importer.Imports.Streams[0].To.ShouldBe("imported.events.>");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Account_service_import_auth_rejected()
|
||||
{
|
||||
var exporter = new Account("exporter");
|
||||
var importer = new Account("importer");
|
||||
|
||||
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, [new Account("other")]);
|
||||
|
||||
Should.Throw<UnauthorizedAccessException>(() =>
|
||||
importer.AddServiceImport(exporter, "requests.>", "api.>"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Account_lazy_creates_internal_client()
|
||||
{
|
||||
var account = new Account("test");
|
||||
var client = account.GetOrCreateInternalClient(99);
|
||||
client.ShouldNotBeNull();
|
||||
client.Kind.ShouldBe(ClientKind.Account);
|
||||
client.Account.ShouldBe(account);
|
||||
|
||||
// Second call returns same instance
|
||||
var client2 = account.GetOrCreateInternalClient(100);
|
||||
client2.ShouldBeSameAs(client);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Service_import_forwards_message_to_export_account()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
// Set up exporter and importer accounts
|
||||
var exporter = server.GetOrCreateAccount("exporter");
|
||||
var importer = server.GetOrCreateAccount("importer");
|
||||
|
||||
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
|
||||
importer.AddServiceImport(exporter, "requests.>", "api.>");
|
||||
|
||||
// Wire the import subscriptions into the importer account
|
||||
server.WireServiceImports(importer);
|
||||
|
||||
// Subscribe in exporter account to receive forwarded message
|
||||
var exportSub = new Subscription { Subject = "api.test", Sid = "export-1", Client = null };
|
||||
exporter.SubList.Insert(exportSub);
|
||||
|
||||
// Verify import infrastructure is wired: the importer should have service import entries
|
||||
importer.Imports.Services.ShouldContainKey("requests.>");
|
||||
importer.Imports.Services["requests.>"].Count.ShouldBe(1);
|
||||
importer.Imports.Services["requests.>"][0].DestinationAccount.ShouldBe(exporter);
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessServiceImport_delivers_to_destination_account_subscribers()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
|
||||
var exporter = server.GetOrCreateAccount("exporter");
|
||||
var importer = server.GetOrCreateAccount("importer");
|
||||
|
||||
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
|
||||
importer.AddServiceImport(exporter, "requests.>", "api.>");
|
||||
|
||||
// Add a subscriber in the exporter account's SubList
|
||||
var received = new List<(string Subject, string Sid)>();
|
||||
var mockClient = new TestNatsClient(1, exporter);
|
||||
mockClient.OnMessage = (subject, sid, _, _, _) =>
|
||||
received.Add((subject, sid));
|
||||
|
||||
var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient };
|
||||
exporter.SubList.Insert(exportSub);
|
||||
|
||||
// Process a service import directly
|
||||
var si = importer.Imports.Services["requests.>"][0];
|
||||
server.ProcessServiceImport(si, "requests.test", null,
|
||||
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
|
||||
|
||||
received.Count.ShouldBe(1);
|
||||
received[0].Subject.ShouldBe("api.test");
|
||||
received[0].Sid.ShouldBe("s1");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessServiceImport_with_transform_applies_subject_mapping()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
|
||||
var exporter = server.GetOrCreateAccount("exporter");
|
||||
var importer = server.GetOrCreateAccount("importer");
|
||||
|
||||
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
|
||||
var si = importer.AddServiceImport(exporter, "requests.>", "api.>");
|
||||
|
||||
// Create a transform from requests.> to api.>
|
||||
var transform = SubjectTransform.Create("requests.>", "api.>");
|
||||
transform.ShouldNotBeNull();
|
||||
|
||||
// Create a new import with the transform set
|
||||
var siWithTransform = new ServiceImport
|
||||
{
|
||||
DestinationAccount = exporter,
|
||||
From = "requests.>",
|
||||
To = "api.>",
|
||||
Transform = transform,
|
||||
};
|
||||
|
||||
var received = new List<string>();
|
||||
var mockClient = new TestNatsClient(1, exporter);
|
||||
mockClient.OnMessage = (subject, _, _, _, _) =>
|
||||
received.Add(subject);
|
||||
|
||||
var exportSub = new Subscription { Subject = "api.hello", Sid = "s1", Client = mockClient };
|
||||
exporter.SubList.Insert(exportSub);
|
||||
|
||||
server.ProcessServiceImport(siWithTransform, "requests.hello", null,
|
||||
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
|
||||
|
||||
received.Count.ShouldBe(1);
|
||||
received[0].ShouldBe("api.hello");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessServiceImport_skips_invalid_imports()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
|
||||
var exporter = server.GetOrCreateAccount("exporter");
|
||||
var importer = server.GetOrCreateAccount("importer");
|
||||
|
||||
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
|
||||
importer.AddServiceImport(exporter, "requests.>", "api.>");
|
||||
|
||||
// Mark the import as invalid
|
||||
var si = importer.Imports.Services["requests.>"][0];
|
||||
si.Invalid = true;
|
||||
|
||||
// Add a subscriber in the exporter account
|
||||
var received = new List<string>();
|
||||
var mockClient = new TestNatsClient(1, exporter);
|
||||
mockClient.OnMessage = (subject, _, _, _, _) =>
|
||||
received.Add(subject);
|
||||
|
||||
var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient };
|
||||
exporter.SubList.Insert(exportSub);
|
||||
|
||||
// ProcessServiceImport should be a no-op for invalid imports
|
||||
server.ProcessServiceImport(si, "requests.test", null,
|
||||
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
|
||||
|
||||
received.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessServiceImport_delivers_to_queue_groups()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
|
||||
var exporter = server.GetOrCreateAccount("exporter");
|
||||
var importer = server.GetOrCreateAccount("importer");
|
||||
|
||||
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
|
||||
importer.AddServiceImport(exporter, "requests.>", "api.>");
|
||||
|
||||
// Add queue group subscribers in the exporter account
|
||||
var received = new List<(string Subject, string Sid)>();
|
||||
var mockClient1 = new TestNatsClient(1, exporter);
|
||||
mockClient1.OnMessage = (subject, sid, _, _, _) =>
|
||||
received.Add((subject, sid));
|
||||
var mockClient2 = new TestNatsClient(2, exporter);
|
||||
mockClient2.OnMessage = (subject, sid, _, _, _) =>
|
||||
received.Add((subject, sid));
|
||||
|
||||
var qSub1 = new Subscription { Subject = "api.test", Sid = "q1", Queue = "workers", Client = mockClient1 };
|
||||
var qSub2 = new Subscription { Subject = "api.test", Sid = "q2", Queue = "workers", Client = mockClient2 };
|
||||
exporter.SubList.Insert(qSub1);
|
||||
exporter.SubList.Insert(qSub2);
|
||||
|
||||
var si = importer.Imports.Services["requests.>"][0];
|
||||
server.ProcessServiceImport(si, "requests.test", null,
|
||||
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
|
||||
|
||||
// One member of the queue group should receive the message
|
||||
received.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
private static NatsServer CreateTestServer()
|
||||
{
|
||||
var port = GetFreePort();
|
||||
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
|
||||
}
|
||||
|
||||
private static int GetFreePort()
|
||||
{
|
||||
using var sock = new System.Net.Sockets.Socket(
|
||||
System.Net.Sockets.AddressFamily.InterNetwork,
|
||||
System.Net.Sockets.SocketType.Stream,
|
||||
System.Net.Sockets.ProtocolType.Tcp);
|
||||
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
|
||||
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Minimal test double for INatsClient used in import/export tests.
|
||||
/// </summary>
|
||||
private sealed class TestNatsClient(ulong id, Account account) : INatsClient
|
||||
{
|
||||
public ulong Id => id;
|
||||
public ClientKind Kind => ClientKind.Client;
|
||||
public Account? Account => account;
|
||||
public Protocol.ClientOptions? ClientOpts => null;
|
||||
public ClientPermissions? Permissions => null;
|
||||
|
||||
public Action<string, string, string?, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>? OnMessage { get; set; }
|
||||
|
||||
public void SendMessage(string subject, string sid, string? replyTo,
|
||||
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
OnMessage?.Invoke(subject, sid, replyTo, headers, payload);
|
||||
}
|
||||
|
||||
public bool QueueOutbound(ReadOnlyMemory<byte> data) => true;
|
||||
|
||||
public void RemoveSubscription(string sid) { }
|
||||
}
|
||||
}
|
||||
149
tests/NATS.Server.Tests/ResponseRoutingTests.cs
Normal file
149
tests/NATS.Server.Tests/ResponseRoutingTests.cs
Normal file
@@ -0,0 +1,149 @@
|
||||
using NATS.Server.Auth;
|
||||
using NATS.Server.Imports;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class ResponseRoutingTests
|
||||
{
|
||||
[Fact]
|
||||
public void GenerateReplyPrefix_creates_unique_prefix()
|
||||
{
|
||||
var prefix1 = ResponseRouter.GenerateReplyPrefix();
|
||||
var prefix2 = ResponseRouter.GenerateReplyPrefix();
|
||||
|
||||
prefix1.ShouldStartWith("_R_.");
|
||||
prefix2.ShouldStartWith("_R_.");
|
||||
prefix1.ShouldNotBe(prefix2);
|
||||
prefix1.Length.ShouldBeGreaterThan(4);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GenerateReplyPrefix_ends_with_dot()
|
||||
{
|
||||
var prefix = ResponseRouter.GenerateReplyPrefix();
|
||||
|
||||
prefix.ShouldEndWith(".");
|
||||
// Format: "_R_." + 10 chars + "." = 15 chars
|
||||
prefix.Length.ShouldBe(15);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Singleton_response_import_removed_after_delivery()
|
||||
{
|
||||
var exporter = new Account("exporter");
|
||||
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
|
||||
|
||||
var replyPrefix = ResponseRouter.GenerateReplyPrefix();
|
||||
var responseSi = new ServiceImport
|
||||
{
|
||||
DestinationAccount = exporter,
|
||||
From = replyPrefix + ">",
|
||||
To = "_INBOX.original.reply",
|
||||
IsResponse = true,
|
||||
ResponseType = ServiceResponseType.Singleton,
|
||||
};
|
||||
exporter.Exports.Responses[replyPrefix] = responseSi;
|
||||
|
||||
exporter.Exports.Responses.ShouldContainKey(replyPrefix);
|
||||
|
||||
// Simulate singleton delivery cleanup
|
||||
ResponseRouter.CleanupResponse(exporter, replyPrefix, responseSi);
|
||||
|
||||
exporter.Exports.Responses.ShouldNotContainKey(replyPrefix);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CreateResponseImport_registers_in_exporter_responses()
|
||||
{
|
||||
var exporter = new Account("exporter");
|
||||
var importer = new Account("importer");
|
||||
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
|
||||
|
||||
var originalSi = new ServiceImport
|
||||
{
|
||||
DestinationAccount = exporter,
|
||||
From = "api.test",
|
||||
To = "api.test",
|
||||
Export = exporter.Exports.Services["api.test"],
|
||||
ResponseType = ServiceResponseType.Singleton,
|
||||
};
|
||||
|
||||
var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.abc123");
|
||||
|
||||
responseSi.IsResponse.ShouldBeTrue();
|
||||
responseSi.ResponseType.ShouldBe(ServiceResponseType.Singleton);
|
||||
responseSi.To.ShouldBe("_INBOX.abc123");
|
||||
responseSi.DestinationAccount.ShouldBe(exporter);
|
||||
responseSi.From.ShouldEndWith(">");
|
||||
responseSi.Export.ShouldBe(originalSi.Export);
|
||||
|
||||
// Should be registered in the exporter's response map
|
||||
exporter.Exports.Responses.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CreateResponseImport_preserves_streamed_response_type()
|
||||
{
|
||||
var exporter = new Account("exporter");
|
||||
exporter.AddServiceExport("api.stream", ServiceResponseType.Streamed, null);
|
||||
|
||||
var originalSi = new ServiceImport
|
||||
{
|
||||
DestinationAccount = exporter,
|
||||
From = "api.stream",
|
||||
To = "api.stream",
|
||||
Export = exporter.Exports.Services["api.stream"],
|
||||
ResponseType = ServiceResponseType.Streamed,
|
||||
};
|
||||
|
||||
var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.xyz789");
|
||||
|
||||
responseSi.ResponseType.ShouldBe(ServiceResponseType.Streamed);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Multiple_response_imports_each_get_unique_prefix()
|
||||
{
|
||||
var exporter = new Account("exporter");
|
||||
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
|
||||
|
||||
var originalSi = new ServiceImport
|
||||
{
|
||||
DestinationAccount = exporter,
|
||||
From = "api.test",
|
||||
To = "api.test",
|
||||
Export = exporter.Exports.Services["api.test"],
|
||||
ResponseType = ServiceResponseType.Singleton,
|
||||
};
|
||||
|
||||
var resp1 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply1");
|
||||
var resp2 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply2");
|
||||
|
||||
exporter.Exports.Responses.Count.ShouldBe(2);
|
||||
resp1.To.ShouldBe("_INBOX.reply1");
|
||||
resp2.To.ShouldBe("_INBOX.reply2");
|
||||
resp1.From.ShouldNotBe(resp2.From);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LatencyTracker_should_sample_respects_percentage()
|
||||
{
|
||||
var latency = new ServiceLatency { SamplingPercentage = 0, Subject = "latency.test" };
|
||||
LatencyTracker.ShouldSample(latency).ShouldBeFalse();
|
||||
|
||||
var latency100 = new ServiceLatency { SamplingPercentage = 100, Subject = "latency.test" };
|
||||
LatencyTracker.ShouldSample(latency100).ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void LatencyTracker_builds_latency_message()
|
||||
{
|
||||
var msg = LatencyTracker.BuildLatencyMsg("requester", "responder",
|
||||
TimeSpan.FromMilliseconds(5), TimeSpan.FromMilliseconds(10));
|
||||
|
||||
msg.Requestor.ShouldBe("requester");
|
||||
msg.Responder.ShouldBe("responder");
|
||||
msg.ServiceLatencyNanos.ShouldBeGreaterThan(0);
|
||||
msg.TotalLatencyNanos.ShouldBeGreaterThan(0);
|
||||
}
|
||||
}
|
||||
133
tests/NATS.Server.Tests/SystemEventsTests.cs
Normal file
133
tests/NATS.Server.Tests/SystemEventsTests.cs
Normal file
@@ -0,0 +1,133 @@
|
||||
using System.Text.Json;
|
||||
using NATS.Server;
|
||||
using NATS.Server.Events;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class SystemEventsTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Server_publishes_connect_event_on_client_auth()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<string>();
|
||||
server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.CONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(subject);
|
||||
});
|
||||
|
||||
// Connect a real client
|
||||
using var sock = new System.Net.Sockets.Socket(
|
||||
System.Net.Sockets.AddressFamily.InterNetwork,
|
||||
System.Net.Sockets.SocketType.Stream,
|
||||
System.Net.Sockets.ProtocolType.Tcp);
|
||||
await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);
|
||||
|
||||
// Read INFO
|
||||
var buf = new byte[4096];
|
||||
await sock.ReceiveAsync(buf);
|
||||
|
||||
// Send CONNECT
|
||||
var connect = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n");
|
||||
await sock.SendAsync(connect);
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
result.ShouldStartWith("$SYS.ACCOUNT.");
|
||||
result.ShouldEndWith(".CONNECT");
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Server_publishes_disconnect_event_on_client_close()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<string>();
|
||||
server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.DISCONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(subject);
|
||||
});
|
||||
|
||||
// Connect and then disconnect
|
||||
using var sock = new System.Net.Sockets.Socket(
|
||||
System.Net.Sockets.AddressFamily.InterNetwork,
|
||||
System.Net.Sockets.SocketType.Stream,
|
||||
System.Net.Sockets.ProtocolType.Tcp);
|
||||
await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);
|
||||
var buf = new byte[4096];
|
||||
await sock.ReceiveAsync(buf);
|
||||
await sock.SendAsync(System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n"));
|
||||
await Task.Delay(100);
|
||||
sock.Shutdown(System.Net.Sockets.SocketShutdown.Both);
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
result.ShouldStartWith("$SYS.ACCOUNT.");
|
||||
result.ShouldEndWith(".DISCONNECT");
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Server_publishes_statsz_periodically()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<string>();
|
||||
server.EventSystem!.SysSubscribe("$SYS.SERVER.*.STATSZ", (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(subject);
|
||||
});
|
||||
|
||||
// Trigger a manual stats publish (don't wait 10s)
|
||||
server.EventSystem!.PublishServerStats();
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
result.ShouldContain(".STATSZ");
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Server_publishes_shutdown_event()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<string>();
|
||||
server.EventSystem!.SysSubscribe("$SYS.SERVER.*.SHUTDOWN", (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(subject);
|
||||
});
|
||||
|
||||
await server.ShutdownAsync();
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
result.ShouldContain(".SHUTDOWN");
|
||||
}
|
||||
|
||||
private static NatsServer CreateTestServer()
|
||||
{
|
||||
var port = GetFreePort();
|
||||
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
|
||||
}
|
||||
|
||||
private static int GetFreePort()
|
||||
{
|
||||
using var sock = new System.Net.Sockets.Socket(
|
||||
System.Net.Sockets.AddressFamily.InterNetwork,
|
||||
System.Net.Sockets.SocketType.Stream,
|
||||
System.Net.Sockets.ProtocolType.Tcp);
|
||||
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
|
||||
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
|
||||
}
|
||||
}
|
||||
170
tests/NATS.Server.Tests/SystemRequestReplyTests.cs
Normal file
170
tests/NATS.Server.Tests/SystemRequestReplyTests.cs
Normal file
@@ -0,0 +1,170 @@
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using NATS.Server;
|
||||
using NATS.Server.Events;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class SystemRequestReplyTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Varz_request_reply_returns_server_info()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<byte[]>();
|
||||
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
|
||||
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(msg.ToArray());
|
||||
});
|
||||
|
||||
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
|
||||
server.SendInternalMsg(reqSubject, replySubject, null);
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
var json = Encoding.UTF8.GetString(result);
|
||||
json.ShouldContain("\"server_id\"");
|
||||
json.ShouldContain("\"version\"");
|
||||
json.ShouldContain("\"host\"");
|
||||
json.ShouldContain("\"port\"");
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Healthz_request_reply_returns_ok()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<byte[]>();
|
||||
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
|
||||
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(msg.ToArray());
|
||||
});
|
||||
|
||||
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ");
|
||||
server.SendInternalMsg(reqSubject, replySubject, null);
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
var json = Encoding.UTF8.GetString(result);
|
||||
json.ShouldContain("ok");
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subsz_request_reply_returns_subscription_count()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<byte[]>();
|
||||
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
|
||||
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(msg.ToArray());
|
||||
});
|
||||
|
||||
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "SUBSZ");
|
||||
server.SendInternalMsg(reqSubject, replySubject, null);
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
var json = Encoding.UTF8.GetString(result);
|
||||
json.ShouldContain("\"num_subscriptions\"");
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Idz_request_reply_returns_server_identity()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<byte[]>();
|
||||
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
|
||||
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(msg.ToArray());
|
||||
});
|
||||
|
||||
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "IDZ");
|
||||
server.SendInternalMsg(reqSubject, replySubject, null);
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
var json = Encoding.UTF8.GetString(result);
|
||||
json.ShouldContain("\"server_id\"");
|
||||
json.ShouldContain("\"server_name\"");
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Ping_varz_responds_via_wildcard_subject()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
var received = new TaskCompletionSource<byte[]>();
|
||||
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
|
||||
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
|
||||
{
|
||||
received.TrySetResult(msg.ToArray());
|
||||
});
|
||||
|
||||
var pingSubject = string.Format(EventSubjects.ServerPing, "VARZ");
|
||||
server.SendInternalMsg(pingSubject, replySubject, null);
|
||||
|
||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
var json = Encoding.UTF8.GetString(result);
|
||||
json.ShouldContain("\"server_id\"");
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Request_without_reply_is_ignored()
|
||||
{
|
||||
using var server = CreateTestServer();
|
||||
_ = server.StartAsync(CancellationToken.None);
|
||||
await server.WaitForReadyAsync();
|
||||
|
||||
// Send a request with no reply subject -- should not crash
|
||||
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
|
||||
server.SendInternalMsg(reqSubject, null, null);
|
||||
|
||||
// Give it a moment to process without error
|
||||
await Task.Delay(200);
|
||||
|
||||
// Server should still be running
|
||||
server.IsShuttingDown.ShouldBeFalse();
|
||||
|
||||
await server.ShutdownAsync();
|
||||
}
|
||||
|
||||
private static NatsServer CreateTestServer()
|
||||
{
|
||||
var port = GetFreePort();
|
||||
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
|
||||
}
|
||||
|
||||
private static int GetFreePort()
|
||||
{
|
||||
using var sock = new System.Net.Sockets.Socket(
|
||||
System.Net.Sockets.AddressFamily.InterNetwork,
|
||||
System.Net.Sockets.SocketType.Stream,
|
||||
System.Net.Sockets.ProtocolType.Tcp);
|
||||
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
|
||||
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user