using System.Buffers;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;

namespace NATS.Server;

/// <summary>
/// Receives published messages from a client and is told when a client goes away.
/// </summary>
public interface IMessageRouter
{
    /// <summary>Handles a message published by <paramref name="sender"/>.</summary>
    /// <param name="subject">Subject the message was published on.</param>
    /// <param name="replyTo">Optional reply subject.</param>
    /// <param name="headers">Header bytes; empty when the message carries no headers.</param>
    /// <param name="payload">Message body bytes.</param>
    /// <param name="sender">The client that published the message.</param>
    void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, NatsClient sender);

    /// <summary>Called by a client when its <see cref="NatsClient.RunAsync"/> loop ends.</summary>
    void RemoveClient(NatsClient client);
}

/// <summary>
/// Optional capability of a router: exposes the <see cref="Subscriptions.SubList"/> that
/// client subscriptions are inserted into and removed from.
/// </summary>
public interface ISubListAccess
{
    SubList SubList { get; }
}

/// <summary>
/// A single client connection: sends INFO, reads and parses protocol commands from the
/// socket, dispatches them, and writes MSG/HMSG frames back to the client.
/// </summary>
public sealed class NatsClient : IDisposable
{
    private readonly Socket _socket;
    private readonly NetworkStream _stream;
    private readonly NatsOptions _options;
    private readonly ServerInfo _serverInfo;
    private readonly NatsParser _parser;

    // Serializes writers so the multi-part MSG/HMSG frames are not interleaved on the stream.
    private readonly SemaphoreSlim _writeLock = new(1, 1);

    // Subscriptions of this client, keyed by sid.
    private readonly Dictionary<string, Subscription> _subs = new();
    private readonly ILogger _logger;

    public ulong Id { get; }
    public ClientOptions? ClientOpts { get; private set; }
    public IMessageRouter? Router { get; set; }
    public bool ConnectReceived { get; private set; }

    // Stats. These stay public fields (not properties) because they are updated
    // through Interlocked, which needs a ref to the storage location.
    public long InMsgs;
    public long OutMsgs;
    public long InBytes;
    public long OutBytes;

    /// <summary>Read-only view of this client's subscriptions, keyed by sid.</summary>
    public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;

    /// <summary>
    /// Creates a client over an already-connected socket. The socket is wrapped in a
    /// <see cref="NetworkStream"/> that does not own it; both are released in <see cref="Dispose"/>.
    /// </summary>
    public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, ILogger logger)
    {
        Id = id;
        _socket = socket;
        _stream = new NetworkStream(socket, ownsSocket: false);
        _options = options;
        _serverInfo = serverInfo;
        _logger = logger;
        _parser = new NatsParser(options.MaxPayload);
    }

    /// <summary>
    /// Runs the connection: sends INFO, then pumps socket bytes through a pipe into the
    /// command processor until either side finishes or <paramref name="ct"/> is cancelled.
    /// Cancellation is swallowed, other failures are logged at debug level, and the router
    /// is always notified via <see cref="IMessageRouter.RemoveClient"/> on exit.
    /// </summary>
    public async Task RunAsync(CancellationToken ct)
    {
        var pipe = new Pipe();

        // Lets us stop the read pump ourselves when command processing ends first.
        using var pumpCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        try
        {
            // Send INFO
            await SendInfoAsync(ct);

            // Start read pump and command processing in parallel
            var fillTask = FillPipeAsync(pipe.Writer, pumpCts.Token);
            var processTask = ProcessCommandsAsync(pipe.Reader, pumpCts.Token);

            var first = await Task.WhenAny(fillTask, processTask);
            if (first == processTask)
            {
                // The read pump may be parked in a socket read; cancel it so it does not
                // outlive this method. When the read pump ends first, the processor sees
                // the completed pipe and drains what is already buffered on its own.
                pumpCts.Cancel();
            }

            // Await both so a fault in either pump reaches the catch blocks below
            // instead of being dropped by WhenAny.
            await Task.WhenAll(fillTask, processTask);
        }
        catch (OperationCanceledException)
        {
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Client {ClientId} connection error", Id);
        }
        finally
        {
            Router?.RemoveClient(this);
        }
    }

    /// <summary>
    /// Copies bytes from the network stream into the pipe until the peer closes the
    /// connection (zero-byte read), the reader completes, or cancellation is requested.
    /// The writer is always completed on exit.
    /// </summary>
    private async Task FillPipeAsync(PipeWriter writer, CancellationToken ct)
    {
        try
        {
            while (!ct.IsCancellationRequested)
            {
                var memory = writer.GetMemory(4096);
                int bytesRead = await _stream.ReadAsync(memory, ct);
                if (bytesRead == 0)
                {
                    break;
                }

                writer.Advance(bytesRead);

                var result = await writer.FlushAsync(ct);
                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        finally
        {
            await writer.CompleteAsync();
        }
    }

    /// <summary>
    /// Reads buffered bytes from the pipe, parses as many complete commands as are
    /// available, and dispatches each one. The reader is always completed on exit.
    /// </summary>
    private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct)
    {
        try
        {
            while (!ct.IsCancellationRequested)
            {
                var result = await reader.ReadAsync(ct);
                var buffer = result.Buffer;

                while (_parser.TryParse(ref buffer, out var cmd))
                {
                    await DispatchCommandAsync(cmd, ct);
                }

                // Everything before buffer.Start was consumed by the parser; the rest was
                // examined but is incomplete, so the pipe waits for more data before
                // handing it back.
                reader.AdvanceTo(buffer.Start, buffer.End);

                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        finally
        {
            await reader.CompleteAsync();
        }
    }

    /// <summary>Routes a parsed command to its handler. Unlisted command types are ignored.</summary>
    private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct)
    {
        switch (cmd.Type)
        {
            case CommandType.Connect:
                ProcessConnect(cmd);
                break;

            case CommandType.Ping:
                await WriteAsync(NatsProtocol.PongBytes, ct);
                break;

            case CommandType.Pong:
                // Update RTT tracking (placeholder)
                break;

            case CommandType.Sub:
                ProcessSub(cmd);
                break;

            case CommandType.Unsub:
                ProcessUnsub(cmd);
                break;

            case CommandType.Pub:
            case CommandType.HPub:
                ProcessPub(cmd);
                break;
        }
    }

    /// <summary>
    /// Deserializes the CONNECT payload into <see cref="ClientOpts"/> (falling back to a
    /// default instance when the JSON deserializes to null) and marks CONNECT as received.
    /// </summary>
    private void ProcessConnect(ParsedCommand cmd)
    {
        ClientOpts = JsonSerializer.Deserialize<ClientOptions>(cmd.Payload.Span) ?? new ClientOptions();
        ConnectReceived = true;
        _logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
    }

    /// <summary>
    /// Registers a subscription under its sid and, when the router exposes a
    /// <see cref="SubList"/>, inserts it there. A previous subscription with the same sid
    /// is replaced.
    /// </summary>
    private void ProcessSub(ParsedCommand cmd)
    {
        var sid = cmd.Sid!;
        var subList = (Router as ISubListAccess)?.SubList;

        // A repeated sid overwrites the dictionary entry below; drop the old entry from the
        // SubList first, otherwise it stays there with no way for UNSUB to reach it.
        if (_subs.TryGetValue(sid, out var previous))
        {
            subList?.Remove(previous);
        }

        var sub = new Subscription
        {
            Subject = cmd.Subject!,
            Queue = cmd.Queue,
            Sid = sid,
        };

        _subs[sid] = sub;
        sub.Client = this;

        _logger.LogDebug("SUB {Subject} {Sid} from client {ClientId}", cmd.Subject, cmd.Sid, Id);

        subList?.Insert(sub);
    }

    /// <summary>
    /// Handles UNSUB. With a positive max-message count the subscription is kept and only
    /// its limit is recorded; otherwise it is removed from this client and from the
    /// router's <see cref="SubList"/>. Unknown sids are ignored.
    /// </summary>
    private void ProcessUnsub(ParsedCommand cmd)
    {
        _logger.LogDebug("UNSUB {Sid} from client {ClientId}", cmd.Sid, Id);

        if (!_subs.TryGetValue(cmd.Sid!, out var sub))
        {
            return;
        }

        if (cmd.MaxMessages > 0)
        {
            sub.MaxMessages = cmd.MaxMessages;
            // Will be cleaned up when MessageCount reaches MaxMessages
            return;
        }

        _subs.Remove(cmd.Sid!);

        if (Router is ISubListAccess sl)
        {
            sl.SubList.Remove(sub);
        }
    }

    /// <summary>
    /// Updates inbound stats and forwards the published message to the router. For HPUB
    /// with a non-zero header size, the payload is split into header and body parts.
    /// </summary>
    private void ProcessPub(ParsedCommand cmd)
    {
        Interlocked.Increment(ref InMsgs);
        Interlocked.Add(ref InBytes, cmd.Payload.Length);

        ReadOnlyMemory<byte> headers = default;
        ReadOnlyMemory<byte> payload = cmd.Payload;

        if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
        {
            headers = cmd.Payload[..cmd.HeaderSize];
            payload = cmd.Payload[cmd.HeaderSize..];
        }

        Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
    }

    /// <summary>Serializes the server info to JSON and writes it as an INFO line.</summary>
    private async Task SendInfoAsync(CancellationToken ct)
    {
        var infoJson = JsonSerializer.Serialize(_serverInfo);
        var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
        await WriteAsync(infoLine, ct);
    }

    /// <summary>
    /// Writes one message to the client: an HMSG frame when <paramref name="headers"/> is
    /// non-empty, otherwise a MSG frame, followed by the body and a trailing CRLF.
    /// Outbound stats are updated before the write is attempted.
    /// </summary>
    /// <param name="subject">Subject to put on the control line.</param>
    /// <param name="sid">Subscription id to put on the control line.</param>
    /// <param name="replyTo">Optional reply subject; omitted from the control line when null.</param>
    /// <param name="headers">Header bytes; may be empty.</param>
    /// <param name="payload">Body bytes; may be empty.</param>
    /// <param name="ct">Cancels waiting for the write lock and the writes themselves.</param>
    public async Task SendMessageAsync(string subject, string sid, string? replyTo, ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct)
    {
        Interlocked.Increment(ref OutMsgs);
        Interlocked.Add(ref OutBytes, payload.Length + headers.Length);

        byte[] line;
        if (headers.Length > 0)
        {
            int totalSize = headers.Length + payload.Length;
            line = Encoding.ASCII.GetBytes($"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n");
        }
        else
        {
            line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
        }

        await _writeLock.WaitAsync(ct);
        try
        {
            await _stream.WriteAsync(line, ct);

            if (headers.Length > 0)
            {
                await _stream.WriteAsync(headers, ct);
            }

            if (payload.Length > 0)
            {
                await _stream.WriteAsync(payload, ct);
            }

            await _stream.WriteAsync(NatsProtocol.CrLf, ct);
            await _stream.FlushAsync(ct);
        }
        finally
        {
            _writeLock.Release();
        }
    }

    /// <summary>Writes and flushes a pre-built frame while holding the write lock.</summary>
    private async Task WriteAsync(byte[] data, CancellationToken ct)
    {
        await _writeLock.WaitAsync(ct);
        try
        {
            await _stream.WriteAsync(data, ct);
            await _stream.FlushAsync(ct);
        }
        finally
        {
            _writeLock.Release();
        }
    }

    /// <summary>
    /// Removes every subscription of this client from <paramref name="subList"/> and clears
    /// the client's own subscription table.
    /// </summary>
    public void RemoveAllSubscriptions(SubList subList)
    {
        foreach (var sub in _subs.Values)
        {
            subList.Remove(sub);
        }

        _subs.Clear();
    }

    /// <summary>Disposes the network stream, the socket, and the write lock.</summary>
    public void Dispose()
    {
        _stream.Dispose();
        _socket.Dispose();
        _writeLock.Dispose();
    }
}