Reconcile close reason tracking: feature branch's MarkClosed() and ShouldSkipFlush/FlushAndCloseAsync now use main's ClientClosedReason enum. ClosedState enum retained for forward compatibility.
Section 2: Client/Connection Handling — Design
Implements all in-scope gaps from differences.md Section 2.
Scope
8 features, all single-server client-facing (no clustering/routes/gateways/leaf):
- Close reason tracking (ClientClosedReason enum)
- Connection state flags (bitfield replacing _connectReceived)
- Channel-based write loop with batch flush
- Slow consumer detection (pending bytes + write deadline)
- Write deadline / timeout
- Verbose mode (+OK responses)
- No-responders validation and notification
- Per-read-cycle stat batching
A. Close Reasons
New ClientClosedReason enum with 16 values scoped to single-server:
ClientClosed, AuthenticationTimeout, AuthenticationViolation, TLSHandshakeError,
SlowConsumerPendingBytes, SlowConsumerWriteDeadline, WriteError, ReadError,
ParseError, StaleConnection, ProtocolViolation, MaxPayloadExceeded,
MaxSubscriptionsExceeded, ServerShutdown, MsgHeaderViolation, NoRespondersRequiresHeaders
The Go server defines 37 values; the route, gateway, leaf node, JWT, and operator-mode values are excluded here.
Per-client CloseReason property set before closing. Available in monitoring (/connz).
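Because several paths (read loop, write loop, server shutdown) may try to close the same client at once, one way to keep "set before closing" deterministic is to let the first reported reason win. The following is a minimal sketch under assumptions: the CloseReasonTracker class, the TrySetReason method, and the None = 0 member are hypothetical and not part of the design above, and the enum is abbreviated.

```csharp
using System.Threading;

// Abbreviated for illustration. None = 0 is an assumption meaning "not closed yet".
public enum ClientClosedReason
{
    None = 0,
    ClientClosed,
    SlowConsumerPendingBytes,
    ServerShutdown,
}

// Hypothetical helper: records only the first close reason that is reported.
public sealed class CloseReasonTracker
{
    private int _reason; // holds a ClientClosedReason as int so Interlocked can be used

    public ClientClosedReason CloseReason =>
        (ClientClosedReason)Volatile.Read(ref _reason);

    // Returns true if this call recorded the reason, false if one was already set.
    public bool TrySetReason(ClientClosedReason reason) =>
        Interlocked.CompareExchange(ref _reason, (int)reason, (int)ClientClosedReason.None)
            == (int)ClientClosedReason.None;
}
```

With this shape, a later ServerShutdown would not overwrite an earlier SlowConsumerPendingBytes, so /connz reports the original cause.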
B. Connection State Flags
ClientFlags bitfield enum backed by int, manipulated via Interlocked.Or/Interlocked.And:
ConnectReceived = 1,
FirstPongSent = 2,
HandshakeComplete = 4,
CloseConnection = 8,
WriteLoopStarted = 16,
IsSlowConsumer = 32,
ConnectProcessFinished = 64
Replaces current _connectReceived (int with Volatile.Read/Write).
Helper methods: SetFlag(flag), ClearFlag(flag), HasFlag(flag).
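The three helpers could look roughly like the sketch below. It assumes .NET 5 or later, where Interlocked.Or and Interlocked.And are available for int; the ClientFlagHolder class name is hypothetical and stands in for the relevant part of NatsClient.

```csharp
using System;
using System.Threading;

[Flags]
public enum ClientFlags
{
    ConnectReceived = 1,
    FirstPongSent = 2,
    HandshakeComplete = 4,
    CloseConnection = 8,
    WriteLoopStarted = 16,
    IsSlowConsumer = 32,
    ConnectProcessFinished = 64,
}

// Hypothetical holder; in the design these members would live on the client.
public sealed class ClientFlagHolder
{
    private int _flags;

    // Atomically sets the bit(s) in flag.
    public void SetFlag(ClientFlags flag) => Interlocked.Or(ref _flags, (int)flag);

    // Atomically clears the bit(s) in flag.
    public void ClearFlag(ClientFlags flag) => Interlocked.And(ref _flags, ~(int)flag);

    // Intended for a single flag; for a multi-bit argument it is true if any bit is set.
    public bool HasFlag(ClientFlags flag) => (Volatile.Read(ref _flags) & (int)flag) != 0;
}
```

Since Interlocked.Or returns the previous value, a caller that needs "set and tell me whether I was first" semantics could build that on the same primitive.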
C. Channel-based Write Loop
Architecture
Replace inline _writeLock + direct stream writes:
Producer threads → QueueOutbound(bytes) → Channel<ReadOnlyMemory<byte>> → WriteLoop → Stream
Components
- Channel<ReadOnlyMemory<byte>>: bounded (capacity derived from MaxPending / avg message size, or 8192 items)
- _pendingBytes (long): tracks queued but unflushed bytes via Interlocked.Add
- RunWriteLoopAsync: background task: WaitToReadAsync → drain all via TryRead → single FlushAsync
- QueueOutbound(ReadOnlyMemory<byte>): enqueue, update pending bytes, check slow consumer
Coalescing
The write loop drains all available items from the channel before flushing:
while (await reader.WaitToReadAsync(ct))
{
while (reader.TryRead(out var data))
await stream.WriteAsync(data, ct); // buffered writes, no flush yet
await stream.FlushAsync(ct); // single flush after batch
}
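A self-contained version of the loop, together with one possible way to create the bounded channel, is sketched below. The WriteLoopSketch class, the 8192 default, the FullMode choice, and the returned flush count are assumptions for illustration. The sketch also wraps the target in a BufferedStream: NetworkStream itself is unbuffered and its flush has no effect, so some buffering layer is needed for the "write many, flush once" pattern to coalesce anything. Whether the real write path uses BufferedStream or another buffer is not stated in this design.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WriteLoopSketch
{
    // Hypothetical factory: many producers, exactly one consumer (the write loop).
    public static Channel<ReadOnlyMemory<byte>> CreateOutboundChannel(int capacity = 8192) =>
        Channel.CreateBounded<ReadOnlyMemory<byte>>(new BoundedChannelOptions(capacity)
        {
            SingleReader = true,
            SingleWriter = false,
            FullMode = BoundedChannelFullMode.Wait, // assumption: TryWrite fails when full
        });

    // Drains everything currently queued, then flushes once per batch.
    // Returns the number of flushes performed (for illustration and testing only).
    public static async Task<int> RunAsync(
        ChannelReader<ReadOnlyMemory<byte>> reader, Stream target, CancellationToken ct = default)
    {
        // Not disposed here, so the caller keeps ownership of target.
        var buffered = new BufferedStream(target, 64 * 1024);
        var flushes = 0;
        while (await reader.WaitToReadAsync(ct))
        {
            while (reader.TryRead(out var data))
                await buffered.WriteAsync(data, ct); // lands in the buffer, not the socket
            await buffered.FlushAsync(ct);           // one flush for the whole batch
            flushes++;
        }
        return flushes;
    }
}
```

When several items are already queued before the loop wakes up, they are written and flushed as one batch, which is the behavior the write-coalescing test in the test plan would observe.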
Migration
All existing write paths refactored:
- SendMessageAsync → serialize MSG/HMSG to byte array → QueueOutbound
- WriteAsync → serialize protocol message → QueueOutbound
- Remove _writeLock SemaphoreSlim
D. Slow Consumer Detection
Pending Bytes (Hard Limit)
In QueueOutbound, before writing to channel:
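// Read the 64-bit counter atomically; producers update it via Interlocked.Add.
if (Interlocked.Read(ref _pendingBytes) + data.Length > _maxPending)
{
SetFlag(ClientFlags.IsSlowConsumer);
CloseWithReason(ClientClosedReason.SlowConsumerPendingBytes);
return;
}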
- MaxPending default: 64MB (matching Go's MAX_PENDING_SIZE)
- New option in NatsOptions
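With multiple producer threads, a separate "read, compare, then add" sequence lets two producers both pass the check and jointly exceed the limit. One possible way to close that gap is to reserve the bytes first and roll back on failure, as in the sketch below. PendingBytesGate, TryReserve, and Release are hypothetical names used only for this illustration.

```csharp
using System.Threading;

// Hypothetical helper that owns the pending-bytes counter.
public sealed class PendingBytesGate
{
    private readonly long _maxPending;
    private long _pendingBytes;

    // Default mirrors the 64MB MaxPending default described above.
    public PendingBytesGate(long maxPending = 64L * 1024 * 1024) => _maxPending = maxPending;

    public long PendingBytes => Interlocked.Read(ref _pendingBytes);

    // Reserve first, then check, so concurrent producers cannot both slip past the limit.
    public bool TryReserve(int length)
    {
        var after = Interlocked.Add(ref _pendingBytes, length);
        if (after > _maxPending)
        {
            Interlocked.Add(ref _pendingBytes, -length); // roll back the failed reservation
            return false; // caller would set IsSlowConsumer and close the client
        }
        return true;
    }

    // Called by the write loop once the bytes have been written out.
    public void Release(int length) => Interlocked.Add(ref _pendingBytes, -length);
}
```

In QueueOutbound, a false result from TryReserve would take the slow-consumer path, and the write loop would call Release for each item it drains.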
Write Deadline (Timeout)
In write loop flush:
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(_writeDeadline);
await stream.FlushAsync(cts.Token);
On timeout → close with SlowConsumerWriteDeadline.
- WriteDeadline default: 10 seconds
- New option in NatsOptions
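Since the linked token fires both on the deadline and on ordinary shutdown, the write loop has to tell the two apart before choosing a close reason. A minimal sketch of one way to do that follows; the FlushOutcome enum and the DeadlineFlush helper are hypothetical and not part of the design above.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public enum FlushOutcome { Flushed, DeadlineExceeded, Cancelled }

public static class DeadlineFlush
{
    public static async Task<FlushOutcome> FlushWithDeadlineAsync(
        Stream stream, TimeSpan writeDeadline, CancellationToken ct)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(writeDeadline);
        try
        {
            await stream.FlushAsync(cts.Token);
            return FlushOutcome.Flushed;
        }
        catch (OperationCanceledException) when (!ct.IsCancellationRequested)
        {
            // The outer token is still live, so the cancellation came from the deadline timer.
            return FlushOutcome.DeadlineExceeded;
        }
        catch (OperationCanceledException)
        {
            // The outer token was cancelled, e.g. client or server shutdown.
            return FlushOutcome.Cancelled;
        }
    }
}
```

Only DeadlineExceeded would map to SlowConsumerWriteDeadline; a Cancelled outcome would leave the close reason to whichever path requested the shutdown.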
Monitoring
- IsSlowConsumer flag readable for /connz
- Server-level SlowConsumerCount stat incremented
E. Verbose Mode
After successful command processing (CONNECT, SUB, UNSUB, PUB), check ClientOpts?.Verbose:
if (ClientOpts?.Verbose == true)
QueueOutbound(OkBytes);
OkBytes = pre-encoded +OK\r\n static byte array in NatsProtocol.
F. No-Responders
CONNECT Validation
if (clientOpts.NoResponders && !clientOpts.Headers)
{
CloseWithReason(NoRespondersRequiresHeaders);
return;
}
Publish-time Notification
In NatsServer message delivery, after Match() returns zero subscribers:
if (!delivered && reply.Length > 0 && publisher.ClientOpts?.NoResponders == true)
{
// Send HMSG with NATS/1.0 503 status back to publisher
var header = $"NATS/1.0 503\r\nNats-Subject: {subject}\r\n\r\n";
publisher.SendNoRespondersAsync(reply, sid, header);
}
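For reference, the bytes that end up on the wire for such a notification could be assembled as in the sketch below. It follows the NATS HMSG framing (control line with header size and total size, then the header block, then an empty payload terminated by CRLF). The NoRespondersFrame class and its Build method are hypothetical, and treating sid as a string is an assumption, since its type is not given above.

```csharp
using System.Text;

// Hypothetical helper: builds a complete 503 HMSG frame with an empty payload.
public static class NoRespondersFrame
{
    public static byte[] Build(string reply, string sid, string subject)
    {
        // Header block, including the blank line that terminates it.
        var header = Encoding.UTF8.GetBytes($"NATS/1.0 503\r\nNats-Subject: {subject}\r\n\r\n");

        // Empty payload, so total size equals header size.
        var control = Encoding.UTF8.GetBytes(
            $"HMSG {reply} {sid} {header.Length} {header.Length}\r\n");

        var frame = new byte[control.Length + header.Length + 2];
        control.CopyTo(frame, 0);
        header.CopyTo(frame, control.Length);
        frame[^2] = (byte)'\r'; // CRLF that terminates the (empty) payload
        frame[^1] = (byte)'\n';
        return frame;
    }
}
```

The message is addressed to the reply subject, so the publisher receives it on the subscription it created for the request.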
G. Stat Batching
In read loop, accumulate locally:
long localInMsgs = 0, localInBytes = 0;
// ... per message: localInMsgs++; localInBytes += size;
// End of read cycle:
Interlocked.Add(ref _inMsgs, localInMsgs);
Interlocked.Add(ref _inBytes, localInBytes);
// Same for server stats
Reduces atomic operations from per-message to per-read-cycle.
Files
| File | Change | Size |
|---|---|---|
| ClientClosedReason.cs | New | Small |
| ClientFlags.cs | New | Small |
| NatsClient.cs | Major rewrite of write path | Large |
| NatsServer.cs | No-responders, close reason | Medium |
| NatsOptions.cs | MaxPending, WriteDeadline | Small |
| NatsProtocol.cs | +OK bytes, NoResponders | Small |
| ClientTests.cs | Verbose, close reasons, flags | Medium |
| ServerTests.cs | No-responders, slow consumer | Medium |
Test Plan
- Verbose mode: Connect with verbose:true, send SUB/PUB, verify +OK responses
- Close reasons: Trigger each close path, verify reason is set
- State flags: Set/clear/check flags concurrently
- Slow consumer (pending bytes): Queue more than MaxPending, verify close
- Slow consumer (write deadline): Use a slow/blocked stream, verify timeout close
- No-responders: Publish with a reply subject to a subject that has no subscribers, verify 503 HMSG
- Write coalescing: Send multiple messages rapidly, verify batched flush
- Stat batching: Send N messages, verify stats match after read cycle