# Section 2: Client/Connection Handling — Design

> Implements all in-scope gaps from differences.md Section 2.

## Scope

8 features, all single-server client-facing (no clustering/routes/gateways/leaf):

1. Close reason tracking (`ClientClosedReason` enum)
2. Connection state flags (bitfield replacing `_connectReceived`)
3. Channel-based write loop with batch flush
4. Slow consumer detection (pending bytes + write deadline)
5. Write deadline / timeout
6. Verbose mode (`+OK` responses)
7. No-responders validation and notification
8. Per-read-cycle stat batching

## A. Close Reasons

New `ClientClosedReason` enum with 16 values scoped to single-server:

```
ClientClosed, AuthenticationTimeout, AuthenticationViolation, TLSHandshakeError,
SlowConsumerPendingBytes, SlowConsumerWriteDeadline, WriteError, ReadError,
ParseError, StaleConnection, ProtocolViolation, MaxPayloadExceeded,
MaxSubscriptionsExceeded, ServerShutdown, MsgHeaderViolation, NoRespondersRequiresHeaders
```

Go has 37 values; excluded: route/gateway/leaf/JWT/operator-mode values.

Per-client `CloseReason` property set before closing. Available in monitoring (`/connz`).

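Several close paths (read loop, write loop, server shutdown) can race, so it may help if only the first reason is recorded. The sketch below shows one way to do that with a compare-and-swap. The trimmed enum, the `None = 0` sentinel, and the `CloseReasonHolder` / `TryMarkClosed` names are assumptions made for illustration, not part of the design above.

```csharp
using System.Threading;

// Trimmed copy of the enum for illustration. None = 0 is an assumed
// "not closed yet" sentinel; the design above does not specify one.
public enum ClientClosedReason
{
    None = 0,
    ClientClosed,
    SlowConsumerPendingBytes,
    ServerShutdown,
}

// Hypothetical holder; in the design this state would live on the client.
public sealed class CloseReasonHolder
{
    private int _closeReason; // stores a ClientClosedReason as int

    public ClientClosedReason CloseReason =>
        (ClientClosedReason)Volatile.Read(ref _closeReason);

    // Records the reason only if none has been recorded yet.
    // Returns true for the single caller that won the race.
    public bool TryMarkClosed(ClientClosedReason reason) =>
        Interlocked.CompareExchange(
            ref _closeReason, (int)reason, (int)ClientClosedReason.None)
        == (int)ClientClosedReason.None;
}
```

With this shape, a later `ServerShutdown` cannot overwrite an earlier `SlowConsumerPendingBytes`, so `/connz` would report the original cause.
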
## B. Connection State Flags

`ClientFlags` bitfield enum backed by `int`, manipulated via `Interlocked.Or`/`Interlocked.And`:

```
ConnectReceived = 1,
FirstPongSent = 2,
HandshakeComplete = 4,
CloseConnection = 8,
WriteLoopStarted = 16,
IsSlowConsumer = 32,
ConnectProcessFinished = 64
```

Replaces current `_connectReceived` (int with Volatile.Read/Write).

Helper methods: `SetFlag(flag)`, `ClearFlag(flag)`, `HasFlag(flag)`.

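A minimal sketch of the three helpers, assuming .NET 5 or later (where `Interlocked.Or` and `Interlocked.And` accept `int`). The `ClientFlagState` class name and the extra `TrySetFlag` helper are hypothetical and only illustrate how the previous value returned by `Interlocked.Or` can be used.

```csharp
using System;
using System.Threading;

[Flags]
public enum ClientFlags
{
    ConnectReceived = 1,
    FirstPongSent = 2,
    HandshakeComplete = 4,
    CloseConnection = 8,
    WriteLoopStarted = 16,
    IsSlowConsumer = 32,
    ConnectProcessFinished = 64,
}

// Hypothetical wrapper; in the design these members would sit on the client.
public sealed class ClientFlagState
{
    private int _flags;

    // Atomically sets the bit(s) in flag.
    public void SetFlag(ClientFlags flag) =>
        Interlocked.Or(ref _flags, (int)flag);

    // Atomically clears the bit(s) in flag.
    public void ClearFlag(ClientFlags flag) =>
        Interlocked.And(ref _flags, ~(int)flag);

    // True when every bit in flag is currently set.
    public bool HasFlag(ClientFlags flag) =>
        (Volatile.Read(ref _flags) & (int)flag) == (int)flag;

    // Assumed extra helper: Interlocked.Or returns the previous value, so
    // this is true only for the caller that actually flipped the bit.
    public bool TrySetFlag(ClientFlags flag) =>
        (Interlocked.Or(ref _flags, (int)flag) & (int)flag) == 0;
}
```

A helper like `TrySetFlag(ClientFlags.CloseConnection)` could guard the close path so that teardown runs once even when several threads request it.
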
## C. Channel-based Write Loop

### Architecture

Replace inline `_writeLock` + direct stream writes:

```
Producer threads → QueueOutbound(bytes) → Channel<ReadOnlyMemory<byte>> → WriteLoop → Stream
```

### Components

- `Channel<ReadOnlyMemory<byte>>`: bounded (capacity derived from MaxPending / avg message size, or 8192 items)
- `_pendingBytes` (long): tracks queued but unflushed bytes via `Interlocked.Add`
- `RunWriteLoopAsync`: background task (`WaitToReadAsync` → drain all via `TryRead` → single `FlushAsync`)
- `QueueOutbound(ReadOnlyMemory<byte>)`: enqueue, update pending bytes, check slow consumer

### Coalescing

The write loop drains all available items from the channel before flushing:

```
while (await reader.WaitToReadAsync(ct))
{
    while (reader.TryRead(out var data))
        await stream.WriteAsync(data, ct); // buffered writes, no flush yet
    await stream.FlushAsync(ct); // single flush after batch
}
```

### Migration

All existing write paths refactored:
- `SendMessageAsync` → serialize MSG/HMSG to byte array → `QueueOutbound`
- `WriteAsync` → serialize protocol message → `QueueOutbound`
- Remove `_writeLock` SemaphoreSlim

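The components above can be sketched together as a small self-contained class. This is an illustration under assumptions, not the planned `NatsClient` code. The `OutboundQueue` name, the `bool` return from `QueueOutbound` (the design closes the client instead), and the `Complete` method are hypothetical. The sketch also shows where `_pendingBytes` could be decremented once a batch has been flushed.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class OutboundQueue
{
    private readonly Channel<ReadOnlyMemory<byte>> _channel;
    private readonly Stream _stream;
    private readonly long _maxPending;
    private long _pendingBytes;

    public OutboundQueue(Stream stream, long maxPending, int capacity = 8192)
    {
        _stream = stream;
        _maxPending = maxPending;
        _channel = Channel.CreateBounded<ReadOnlyMemory<byte>>(
            new BoundedChannelOptions(capacity) { SingleReader = true });
    }

    public long PendingBytes => Interlocked.Read(ref _pendingBytes);

    // Returns false when the data was rejected (over the byte limit or the
    // channel is full); the caller would then close as a slow consumer.
    public bool QueueOutbound(ReadOnlyMemory<byte> data)
    {
        long pending = Interlocked.Add(ref _pendingBytes, data.Length);
        if (pending > _maxPending || !_channel.Writer.TryWrite(data))
        {
            Interlocked.Add(ref _pendingBytes, -data.Length); // undo reservation
            return false;
        }
        return true;
    }

    // Signals that no more data will be queued; the loop exits after draining.
    public void Complete() => _channel.Writer.TryComplete();

    public async Task RunWriteLoopAsync(CancellationToken ct)
    {
        var reader = _channel.Reader;
        while (await reader.WaitToReadAsync(ct))
        {
            long batchBytes = 0;
            while (reader.TryRead(out var data))
            {
                await _stream.WriteAsync(data, ct); // no flush per item
                batchBytes += data.Length;
            }
            await _stream.FlushAsync(ct);           // one flush per batch
            Interlocked.Add(ref _pendingBytes, -batchBytes);
        }
    }
}
```

Reserving the bytes with `Interlocked.Add` before checking the limit keeps the check and the update in one atomic step, at the cost of undoing the reservation on rejection.
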
## D. Slow Consumer Detection

### Pending Bytes (Hard Limit)

In `QueueOutbound`, before writing to channel:

```
// Atomic read: _pendingBytes is updated concurrently via Interlocked.Add
if (Interlocked.Read(ref _pendingBytes) + data.Length > _maxPending)
{
    SetFlag(ClientFlags.IsSlowConsumer);
    CloseWithReason(ClientClosedReason.SlowConsumerPendingBytes);
    return;
}
```

- `MaxPending` default: 64 MiB (matching Go's `MAX_PENDING_SIZE`)
- New option in `NatsOptions`

### Write Deadline (Timeout)

In write loop flush:

```
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(_writeDeadline);
await stream.FlushAsync(cts.Token);
```

On timeout → close with `SlowConsumerWriteDeadline`.

- `WriteDeadline` default: 10 seconds
- New option in `NatsOptions`

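Because the linked token fires both on the deadline and on normal shutdown, the flush needs a way to tell the two apart. One possible approach, sketched here, is an exception filter on the outer token. `FlushOutcome`, `WriteDeadline.FlushWithDeadlineAsync`, and `BlockedStream` are hypothetical names; `BlockedStream` is only a test double that simulates a peer that never drains.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public enum FlushOutcome { Flushed, DeadlineExceeded }

public static class WriteDeadline
{
    public static async Task<FlushOutcome> FlushWithDeadlineAsync(
        Stream stream, TimeSpan writeDeadline, CancellationToken ct)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(writeDeadline);
        try
        {
            await stream.FlushAsync(cts.Token);
            return FlushOutcome.Flushed;
        }
        catch (OperationCanceledException) when (!ct.IsCancellationRequested)
        {
            // The outer token is still live, so only the deadline fired.
            // The caller would close with SlowConsumerWriteDeadline here.
            return FlushOutcome.DeadlineExceeded;
        }
        // If ct itself was cancelled, the exception propagates as shutdown.
    }
}

// Test double: a stream whose flush never completes until cancelled.
public sealed class BlockedStream : MemoryStream
{
    public override Task FlushAsync(CancellationToken cancellationToken) =>
        Task.Delay(Timeout.Infinite, cancellationToken);
}
```

One caveat worth checking during implementation: `NetworkStream` is unbuffered and its flush is a no-op, so if writes go straight to the socket, back-pressure would surface in `WriteAsync` and the same deadline token would likely need to cover the batch writes as well.
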
### Monitoring

- `IsSlowConsumer` flag readable for `/connz`
- Server-level `SlowConsumerCount` stat incremented

## E. Verbose Mode

After successful command processing (CONNECT, SUB, UNSUB, PUB), check `ClientOpts?.Verbose`:

```
if (ClientOpts?.Verbose == true)
    QueueOutbound(OkBytes);
```

`OkBytes` = pre-encoded `+OK\r\n` static byte array in `NatsProtocol`.

## F. No-Responders

### CONNECT Validation

```
if (clientOpts.NoResponders && !clientOpts.Headers)
{
    CloseWithReason(ClientClosedReason.NoRespondersRequiresHeaders);
    return;
}
```

### Publish-time Notification

In `NatsServer` message delivery, after `Match()` returns zero subscribers:

```
if (!delivered && reply.Length > 0 && publisher.ClientOpts?.NoResponders == true)
{
    // Send HMSG with NATS/1.0 503 status back to publisher
    var header = $"NATS/1.0 503\r\nNats-Subject: {subject}\r\n\r\n";
    publisher.SendNoRespondersAsync(reply, sid, header);
}
```

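The 503 notification is an `HMSG` with headers and an empty payload, so the header-size and total-size fields on the control line carry the same value. The sketch below builds that frame. The `NoRespondersFrame.Build` helper is hypothetical, and it assumes `sid` is the ID of the publisher's own subscription that matches the reply subject.

```csharp
using System.Text;

public static class NoRespondersFrame
{
    // Builds: HMSG <reply> <sid> <hdr-bytes> <total-bytes>\r\n<header>\r\n
    // The header argument must already end with the blank line ("\r\n\r\n").
    public static byte[] Build(string replySubject, string sid, string header)
    {
        int headerBytes = Encoding.ASCII.GetByteCount(header);
        int totalBytes = headerBytes; // empty payload: total == header size

        // The trailing "\r\n" terminates the (empty) payload.
        string frame =
            $"HMSG {replySubject} {sid} {headerBytes} {totalBytes}\r\n{header}\r\n";
        return Encoding.ASCII.GetBytes(frame);
    }
}
```

For a bare `NATS/1.0 503\r\n\r\n` header both sizes come out as 16; adding a `Nats-Subject` line as in the snippet above only increases both counts by the length of that line.
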
## G. Stat Batching

In read loop, accumulate locally:

```
long localInMsgs = 0, localInBytes = 0;
// ... per message: localInMsgs++; localInBytes += size;
// End of read cycle:
Interlocked.Add(ref _inMsgs, localInMsgs);
Interlocked.Add(ref _inBytes, localInBytes);
// Same for server stats
```

Reduces atomic operations from per-message to per-read-cycle.

## Files

| File | Change | Size |
|------|--------|------|
| `ClientClosedReason.cs` | New | Small |
| `ClientFlags.cs` | New | Small |
| `NatsClient.cs` | Major rewrite of write path | Large |
| `NatsServer.cs` | No-responders, close reason | Medium |
| `NatsOptions.cs` | MaxPending, WriteDeadline | Small |
| `NatsProtocol.cs` | +OK bytes, NoResponders | Small |
| `ClientTests.cs` | Verbose, close reasons, flags | Medium |
| `ServerTests.cs` | No-responders, slow consumer | Medium |

## Test Plan

- **Verbose mode**: Connect with `verbose:true`, send SUB/PUB, verify `+OK` responses
- **Close reasons**: Trigger each close path, verify reason is set
- **State flags**: Set/clear/check flags concurrently
- **Slow consumer (pending bytes)**: Queue more than MaxPending, verify close
- **Slow consumer (write deadline)**: Use a slow/blocked stream, verify timeout close
- **No-responders**: Publish with a reply subject to a subject that has no subscribers, verify 503 HMSG
- **Write coalescing**: Send multiple messages rapidly, verify batched flush
- **Stat batching**: Send N messages, verify stats match after read cycle