feat: implement NatsClient connection handler with read/write pipeline
This commit is contained in:
284
src/NATS.Server/NatsClient.cs
Normal file
284
src/NATS.Server/NatsClient.cs
Normal file
@@ -0,0 +1,284 @@
|
||||
using System.Buffers;
|
||||
using System.IO.Pipelines;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using NATS.Server.Protocol;
|
||||
using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server;
|
||||
|
||||
public interface IMessageRouter
|
||||
{
|
||||
void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
|
||||
ReadOnlyMemory<byte> payload, NatsClient sender);
|
||||
void RemoveClient(NatsClient client);
|
||||
}
|
||||
|
||||
public interface ISubListAccess
|
||||
{
|
||||
SubList SubList { get; }
|
||||
}
|
||||
|
||||
public sealed class NatsClient : IDisposable
|
||||
{
|
||||
private readonly Socket _socket;
|
||||
private readonly NetworkStream _stream;
|
||||
private readonly NatsOptions _options;
|
||||
private readonly ServerInfo _serverInfo;
|
||||
private readonly NatsParser _parser;
|
||||
private readonly SemaphoreSlim _writeLock = new(1, 1);
|
||||
private readonly Dictionary<string, Subscription> _subs = new();
|
||||
|
||||
public ulong Id { get; }
|
||||
public ClientOptions? ClientOpts { get; private set; }
|
||||
public IMessageRouter? Router { get; set; }
|
||||
public bool ConnectReceived { get; private set; }
|
||||
|
||||
// Stats
|
||||
public long InMsgs;
|
||||
public long OutMsgs;
|
||||
public long InBytes;
|
||||
public long OutBytes;
|
||||
|
||||
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
|
||||
|
||||
public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo)
|
||||
{
|
||||
Id = id;
|
||||
_socket = socket;
|
||||
_stream = new NetworkStream(socket, ownsSocket: false);
|
||||
_options = options;
|
||||
_serverInfo = serverInfo;
|
||||
_parser = new NatsParser(options.MaxPayload);
|
||||
}
|
||||
|
||||
public async Task RunAsync(CancellationToken ct)
|
||||
{
|
||||
var pipe = new Pipe();
|
||||
try
|
||||
{
|
||||
// Send INFO
|
||||
await SendInfoAsync(ct);
|
||||
|
||||
// Start read pump and command processing in parallel
|
||||
var fillTask = FillPipeAsync(pipe.Writer, ct);
|
||||
var processTask = ProcessCommandsAsync(pipe.Reader, ct);
|
||||
|
||||
await Task.WhenAny(fillTask, processTask);
|
||||
}
|
||||
catch (OperationCanceledException) { }
|
||||
catch (Exception) { /* connection error -- clean up */ }
|
||||
finally
|
||||
{
|
||||
Router?.RemoveClient(this);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task FillPipeAsync(PipeWriter writer, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var memory = writer.GetMemory(4096);
|
||||
int bytesRead = await _stream.ReadAsync(memory, ct);
|
||||
if (bytesRead == 0)
|
||||
break;
|
||||
|
||||
writer.Advance(bytesRead);
|
||||
var result = await writer.FlushAsync(ct);
|
||||
if (result.IsCompleted)
|
||||
break;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await writer.CompleteAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var result = await reader.ReadAsync(ct);
|
||||
var buffer = result.Buffer;
|
||||
|
||||
while (_parser.TryParse(ref buffer, out var cmd))
|
||||
{
|
||||
await DispatchCommandAsync(cmd, ct);
|
||||
}
|
||||
|
||||
reader.AdvanceTo(buffer.Start, buffer.End);
|
||||
|
||||
if (result.IsCompleted)
|
||||
break;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await reader.CompleteAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct)
|
||||
{
|
||||
switch (cmd.Type)
|
||||
{
|
||||
case CommandType.Connect:
|
||||
ProcessConnect(cmd);
|
||||
break;
|
||||
|
||||
case CommandType.Ping:
|
||||
await WriteAsync(NatsProtocol.PongBytes, ct);
|
||||
break;
|
||||
|
||||
case CommandType.Pong:
|
||||
// Update RTT tracking (placeholder)
|
||||
break;
|
||||
|
||||
case CommandType.Sub:
|
||||
ProcessSub(cmd);
|
||||
break;
|
||||
|
||||
case CommandType.Unsub:
|
||||
ProcessUnsub(cmd);
|
||||
break;
|
||||
|
||||
case CommandType.Pub:
|
||||
case CommandType.HPub:
|
||||
ProcessPub(cmd);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private void ProcessConnect(ParsedCommand cmd)
|
||||
{
|
||||
ClientOpts = JsonSerializer.Deserialize<ClientOptions>(cmd.Payload.Span)
|
||||
?? new ClientOptions();
|
||||
ConnectReceived = true;
|
||||
}
|
||||
|
||||
private void ProcessSub(ParsedCommand cmd)
|
||||
{
|
||||
var sub = new Subscription
|
||||
{
|
||||
Subject = cmd.Subject!,
|
||||
Queue = cmd.Queue,
|
||||
Sid = cmd.Sid!,
|
||||
};
|
||||
|
||||
_subs[cmd.Sid!] = sub;
|
||||
sub.Client = this;
|
||||
|
||||
if (Router is ISubListAccess sl)
|
||||
sl.SubList.Insert(sub);
|
||||
}
|
||||
|
||||
private void ProcessUnsub(ParsedCommand cmd)
|
||||
{
|
||||
if (!_subs.TryGetValue(cmd.Sid!, out var sub))
|
||||
return;
|
||||
|
||||
if (cmd.MaxMessages > 0)
|
||||
{
|
||||
sub.MaxMessages = cmd.MaxMessages;
|
||||
// Will be cleaned up when MessageCount reaches MaxMessages
|
||||
return;
|
||||
}
|
||||
|
||||
_subs.Remove(cmd.Sid!);
|
||||
|
||||
if (Router is ISubListAccess sl)
|
||||
sl.SubList.Remove(sub);
|
||||
}
|
||||
|
||||
private void ProcessPub(ParsedCommand cmd)
|
||||
{
|
||||
Interlocked.Increment(ref InMsgs);
|
||||
Interlocked.Add(ref InBytes, cmd.Payload.Length);
|
||||
|
||||
ReadOnlyMemory<byte> headers = default;
|
||||
ReadOnlyMemory<byte> payload = cmd.Payload;
|
||||
|
||||
if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
|
||||
{
|
||||
headers = cmd.Payload[..cmd.HeaderSize];
|
||||
payload = cmd.Payload[cmd.HeaderSize..];
|
||||
}
|
||||
|
||||
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
|
||||
}
|
||||
|
||||
private async Task SendInfoAsync(CancellationToken ct)
|
||||
{
|
||||
var infoJson = JsonSerializer.Serialize(_serverInfo);
|
||||
var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n");
|
||||
await WriteAsync(infoLine, ct);
|
||||
}
|
||||
|
||||
public async Task SendMessageAsync(string subject, string sid, string? replyTo,
|
||||
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct)
|
||||
{
|
||||
Interlocked.Increment(ref OutMsgs);
|
||||
Interlocked.Add(ref OutBytes, payload.Length + headers.Length);
|
||||
|
||||
byte[] line;
|
||||
if (headers.Length > 0)
|
||||
{
|
||||
int totalSize = headers.Length + payload.Length;
|
||||
line = Encoding.ASCII.GetBytes($"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
|
||||
}
|
||||
|
||||
await _writeLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
await _stream.WriteAsync(line, ct);
|
||||
if (headers.Length > 0)
|
||||
await _stream.WriteAsync(headers, ct);
|
||||
if (payload.Length > 0)
|
||||
await _stream.WriteAsync(payload, ct);
|
||||
await _stream.WriteAsync(NatsProtocol.CrLf, ct);
|
||||
await _stream.FlushAsync(ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writeLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task WriteAsync(byte[] data, CancellationToken ct)
|
||||
{
|
||||
await _writeLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
await _stream.WriteAsync(data, ct);
|
||||
await _stream.FlushAsync(ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writeLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public void RemoveAllSubscriptions(SubList subList)
|
||||
{
|
||||
foreach (var sub in _subs.Values)
|
||||
subList.Remove(sub);
|
||||
_subs.Clear();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_stream.Dispose();
|
||||
_socket.Dispose();
|
||||
_writeLock.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
using NATS.Server;
|
||||
|
||||
namespace NATS.Server.Subscriptions;
|
||||
|
||||
public sealed class Subscription
|
||||
@@ -7,4 +9,5 @@ public sealed class Subscription
|
||||
public required string Sid { get; init; }
|
||||
public long MessageCount; // Interlocked
|
||||
public long MaxMessages; // 0 = unlimited
|
||||
public NatsClient? Client { get; set; }
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user