401 lines
12 KiB
C#
401 lines
12 KiB
C#
using System.Buffers;
|
|
using System.IO.Pipelines;
|
|
using System.Net.Sockets;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using Microsoft.Extensions.Logging;
|
|
using NATS.Server.Protocol;
|
|
using NATS.Server.Subscriptions;
|
|
|
|
namespace NATS.Server;
|
|
|
|
public interface IMessageRouter
|
|
{
|
|
void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
|
ReadOnlyMemory<byte> payload, NatsClient sender);
|
|
void RemoveClient(NatsClient client);
|
|
}
|
|
|
|
public interface ISubListAccess
|
|
{
|
|
SubList SubList { get; }
|
|
}
|
|
|
|
public sealed class NatsClient : IDisposable
|
|
{
|
|
private readonly Socket _socket;
|
|
private readonly NetworkStream _stream;
|
|
private readonly NatsOptions _options;
|
|
private readonly ServerInfo _serverInfo;
|
|
private readonly NatsParser _parser;
|
|
private readonly SemaphoreSlim _writeLock = new(1, 1);
|
|
private CancellationTokenSource? _clientCts;
|
|
private readonly Dictionary<string, Subscription> _subs = new();
|
|
private readonly ILogger _logger;
|
|
|
|
public ulong Id { get; }
|
|
public ClientOptions? ClientOpts { get; private set; }
|
|
public IMessageRouter? Router { get; set; }
|
|
public bool ConnectReceived { get; private set; }
|
|
|
|
// Stats
|
|
public long InMsgs;
|
|
public long OutMsgs;
|
|
public long InBytes;
|
|
public long OutBytes;
|
|
|
|
// PING keepalive state
|
|
private int _pingsOut;
|
|
private long _lastIn;
|
|
|
|
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
|
|
|
|
public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, ILogger logger)
|
|
{
|
|
Id = id;
|
|
_socket = socket;
|
|
_stream = new NetworkStream(socket, ownsSocket: false);
|
|
_options = options;
|
|
_serverInfo = serverInfo;
|
|
_logger = logger;
|
|
_parser = new NatsParser(options.MaxPayload);
|
|
}
|
|
|
|
public async Task RunAsync(CancellationToken ct)
|
|
{
|
|
_clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
|
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
|
|
var pipe = new Pipe();
|
|
try
|
|
{
|
|
// Send INFO
|
|
await SendInfoAsync(_clientCts.Token);
|
|
|
|
// Start read pump, command processing, and ping timer in parallel
|
|
var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
|
|
var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
|
|
var pingTask = RunPingTimerAsync(_clientCts.Token);
|
|
|
|
await Task.WhenAny(fillTask, processTask, pingTask);
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
_logger.LogDebug("Client {ClientId} operation cancelled", Id);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogDebug(ex, "Client {ClientId} connection error", Id);
|
|
}
|
|
finally
|
|
{
|
|
try { _socket.Shutdown(SocketShutdown.Both); }
|
|
catch (SocketException) { }
|
|
catch (ObjectDisposedException) { }
|
|
Router?.RemoveClient(this);
|
|
}
|
|
}
|
|
|
|
private async Task FillPipeAsync(PipeWriter writer, CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
while (!ct.IsCancellationRequested)
|
|
{
|
|
var memory = writer.GetMemory(4096);
|
|
int bytesRead = await _stream.ReadAsync(memory, ct);
|
|
if (bytesRead == 0)
|
|
break;
|
|
|
|
writer.Advance(bytesRead);
|
|
var result = await writer.FlushAsync(ct);
|
|
if (result.IsCompleted)
|
|
break;
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
await writer.CompleteAsync();
|
|
}
|
|
}
|
|
|
|
private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
while (!ct.IsCancellationRequested)
|
|
{
|
|
var result = await reader.ReadAsync(ct);
|
|
var buffer = result.Buffer;
|
|
|
|
while (_parser.TryParse(ref buffer, out var cmd))
|
|
{
|
|
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
|
|
await DispatchCommandAsync(cmd, ct);
|
|
}
|
|
|
|
reader.AdvanceTo(buffer.Start, buffer.End);
|
|
|
|
if (result.IsCompleted)
|
|
break;
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
await reader.CompleteAsync();
|
|
}
|
|
}
|
|
|
|
private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct)
|
|
{
|
|
switch (cmd.Type)
|
|
{
|
|
case CommandType.Connect:
|
|
ProcessConnect(cmd);
|
|
break;
|
|
|
|
case CommandType.Ping:
|
|
await WriteAsync(NatsProtocol.PongBytes, ct);
|
|
break;
|
|
|
|
case CommandType.Pong:
|
|
Interlocked.Exchange(ref _pingsOut, 0);
|
|
break;
|
|
|
|
case CommandType.Sub:
|
|
ProcessSub(cmd);
|
|
break;
|
|
|
|
case CommandType.Unsub:
|
|
ProcessUnsub(cmd);
|
|
break;
|
|
|
|
case CommandType.Pub:
|
|
case CommandType.HPub:
|
|
await ProcessPubAsync(cmd);
|
|
break;
|
|
}
|
|
}
|
|
|
|
private void ProcessConnect(ParsedCommand cmd)
|
|
{
|
|
ClientOpts = JsonSerializer.Deserialize<ClientOptions>(cmd.Payload.Span)
|
|
?? new ClientOptions();
|
|
ConnectReceived = true;
|
|
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
|
|
}
|
|
|
|
private void ProcessSub(ParsedCommand cmd)
|
|
{
|
|
var sub = new Subscription
|
|
{
|
|
Subject = cmd.Subject!,
|
|
Queue = cmd.Queue,
|
|
Sid = cmd.Sid!,
|
|
};
|
|
|
|
_subs[cmd.Sid!] = sub;
|
|
sub.Client = this;
|
|
|
|
_logger.LogDebug("SUB {Subject} {Sid} from client {ClientId}", cmd.Subject, cmd.Sid, Id);
|
|
|
|
if (Router is ISubListAccess sl)
|
|
sl.SubList.Insert(sub);
|
|
}
|
|
|
|
private void ProcessUnsub(ParsedCommand cmd)
|
|
{
|
|
_logger.LogDebug("UNSUB {Sid} from client {ClientId}", cmd.Sid, Id);
|
|
|
|
if (!_subs.TryGetValue(cmd.Sid!, out var sub))
|
|
return;
|
|
|
|
if (cmd.MaxMessages > 0)
|
|
{
|
|
sub.MaxMessages = cmd.MaxMessages;
|
|
// Will be cleaned up when MessageCount reaches MaxMessages
|
|
return;
|
|
}
|
|
|
|
_subs.Remove(cmd.Sid!);
|
|
|
|
if (Router is ISubListAccess sl)
|
|
sl.SubList.Remove(sub);
|
|
}
|
|
|
|
private async ValueTask ProcessPubAsync(ParsedCommand cmd)
|
|
{
|
|
Interlocked.Increment(ref InMsgs);
|
|
Interlocked.Add(ref InBytes, cmd.Payload.Length);
|
|
|
|
// Max payload validation (always, hard close)
|
|
if (cmd.Payload.Length > _options.MaxPayload)
|
|
{
|
|
_logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}",
|
|
Id, cmd.Payload.Length, _options.MaxPayload);
|
|
await SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation);
|
|
return;
|
|
}
|
|
|
|
// Pedantic mode: validate publish subject
|
|
if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!))
|
|
{
|
|
_logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject);
|
|
await SendErrAsync(NatsProtocol.ErrInvalidPublishSubject);
|
|
return;
|
|
}
|
|
|
|
ReadOnlyMemory<byte> headers = default;
|
|
ReadOnlyMemory<byte> payload = cmd.Payload;
|
|
|
|
if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
|
|
{
|
|
headers = cmd.Payload[..cmd.HeaderSize];
|
|
payload = cmd.Payload[cmd.HeaderSize..];
|
|
}
|
|
|
|
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
|
|
}
|
|
|
|
private async Task SendInfoAsync(CancellationToken ct)
|
|
{
|
|
var infoJson = JsonSerializer.Serialize(_serverInfo);
|
|
var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
|
|
await WriteAsync(infoLine, ct);
|
|
}
|
|
|
|
public async Task SendMessageAsync(string subject, string sid, string? replyTo,
|
|
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
|
{
|
|
Interlocked.Increment(ref OutMsgs);
|
|
Interlocked.Add(ref OutBytes, payload.Length + headers.Length);
|
|
|
|
byte[] line;
|
|
if (headers.Length > 0)
|
|
{
|
|
int totalSize = headers.Length + payload.Length;
|
|
line = Encoding.ASCII.GetBytes($"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n");
|
|
}
|
|
else
|
|
{
|
|
line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
|
|
}
|
|
|
|
await _writeLock.WaitAsync(ct);
|
|
try
|
|
{
|
|
await _stream.WriteAsync(line, ct);
|
|
if (headers.Length > 0)
|
|
await _stream.WriteAsync(headers, ct);
|
|
if (payload.Length > 0)
|
|
await _stream.WriteAsync(payload, ct);
|
|
await _stream.WriteAsync(NatsProtocol.CrLf, ct);
|
|
await _stream.FlushAsync(ct);
|
|
}
|
|
finally
|
|
{
|
|
_writeLock.Release();
|
|
}
|
|
}
|
|
|
|
private async Task WriteAsync(byte[] data, CancellationToken ct)
|
|
{
|
|
await _writeLock.WaitAsync(ct);
|
|
try
|
|
{
|
|
await _stream.WriteAsync(data, ct);
|
|
await _stream.FlushAsync(ct);
|
|
}
|
|
finally
|
|
{
|
|
_writeLock.Release();
|
|
}
|
|
}
|
|
|
|
public async Task SendErrAsync(string message)
|
|
{
|
|
var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n");
|
|
try
|
|
{
|
|
await WriteAsync(errLine, _clientCts?.Token ?? CancellationToken.None);
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
// Expected during shutdown
|
|
}
|
|
catch (IOException ex)
|
|
{
|
|
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR", Id);
|
|
}
|
|
catch (ObjectDisposedException ex)
|
|
{
|
|
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR (disposed)", Id);
|
|
}
|
|
}
|
|
|
|
public async Task SendErrAndCloseAsync(string message)
|
|
{
|
|
await SendErrAsync(message);
|
|
if (_clientCts is { } cts)
|
|
await cts.CancelAsync();
|
|
else
|
|
_socket.Close();
|
|
}
|
|
|
|
private async Task RunPingTimerAsync(CancellationToken ct)
|
|
{
|
|
using var timer = new PeriodicTimer(_options.PingInterval);
|
|
try
|
|
{
|
|
while (await timer.WaitForNextTickAsync(ct))
|
|
{
|
|
var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastIn);
|
|
if (elapsed < (long)_options.PingInterval.TotalMilliseconds)
|
|
{
|
|
// Client was recently active, skip ping
|
|
Interlocked.Exchange(ref _pingsOut, 0);
|
|
continue;
|
|
}
|
|
|
|
var currentPingsOut = Interlocked.Increment(ref _pingsOut);
|
|
if (currentPingsOut > _options.MaxPingsOut)
|
|
{
|
|
_logger.LogDebug("Client {ClientId} stale connection — closing", Id);
|
|
await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection);
|
|
return;
|
|
}
|
|
|
|
_logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})",
|
|
Id, currentPingsOut, _options.MaxPingsOut);
|
|
try
|
|
{
|
|
await WriteAsync(NatsProtocol.PingBytes, ct);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogDebug(ex, "Client {ClientId} failed to send PING", Id);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
// Normal shutdown
|
|
}
|
|
}
|
|
|
|
public void RemoveAllSubscriptions(SubList subList)
|
|
{
|
|
foreach (var sub in _subs.Values)
|
|
subList.Remove(sub);
|
|
_subs.Clear();
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
_clientCts?.Dispose();
|
|
_stream.Dispose();
|
|
_socket.Dispose();
|
|
_writeLock.Dispose();
|
|
}
|
|
}
|