feat(batch16): port outbound and parser-facing client core features

This commit is contained in:
Joseph Doherty
2026-02-28 18:58:52 -05:00
parent 46ea749ad0
commit a0b0b45f5e
4 changed files with 301 additions and 2 deletions

View File

@@ -122,8 +122,8 @@ public sealed partial class ClientConnection
internal bool Headers; // mirrors c.headers
// Limits (int32 allows atomic access).
private int _mpay; // mirrors c.mpay — max payload (signed, jwt.NoLimit = -1)
private int _msubs; // mirrors c.msubs — max subscriptions
private int _mpay = -1; // mirrors c.mpay — max payload (signed, jwt.NoLimit = -1)
private int _msubs = -1; // mirrors c.msubs — max subscriptions
private int _mcl; // mirrors c.mcl — max control line
// Subscriptions.
@@ -136,6 +136,9 @@ public sealed partial class ClientConnection
internal long OutPb; // pending bytes
internal long OutMp; // max pending snapshot
internal TimeSpan OutWdl; // write deadline snapshot
internal WriteTimeoutPolicy OutWtp = WriteTimeoutPolicy.Close;
internal List<OutboundChunk> OutNb = [];
internal List<OutboundChunk> OutWnb = [];
// Timing.
internal DateTime Start;
@@ -1474,6 +1477,206 @@ public sealed partial class ClientConnection
internal static ClosedState ClosedStateForErr(Exception err) =>
err is EndOfStreamException ? ClosedState.ClientClosed : ClosedState.ReadError;
internal (List<OutboundChunk> chunks, long attempted) CollapsePtoNB()
{
long attempted = 0;
foreach (var chunk in OutNb)
attempted += chunk.Count;
return (OutNb, attempted);
}
internal bool FlushOutbound()
{
if (Flags.IsSet(ClientFlags.FlushOutbound))
return false;
Flags = Flags.Set(ClientFlags.FlushOutbound);
try
{
if (_nc is null || Server is null || OutPb == 0)
return true;
var (collapsed, attempted) = CollapsePtoNB();
OutNb = [];
if (collapsed.Count > 0)
OutWnb.AddRange(collapsed);
long written = 0;
try
{
foreach (var chunk in OutWnb)
{
_nc.Write(chunk.Buffer, 0, chunk.Count);
written += chunk.Count;
}
_nc.Flush();
}
catch (IOException ioEx) when (ioEx.InnerException is SocketException se &&
(se.SocketErrorCode == SocketError.TimedOut ||
se.SocketErrorCode == SocketError.WouldBlock))
{
if (HandleWriteTimeout(written, attempted, OutWnb.Count))
return true;
}
catch (Exception ex)
{
Debugf("Error flushing: {0}", ex.Message);
MarkConnAsClosed(ClosedState.WriteError);
return true;
}
if (written > 0)
{
OutPb = Math.Max(0, OutPb - written);
if (OutPb == 0)
OutWnb.Clear();
}
return true;
}
finally
{
Flags = Flags.Clear(ClientFlags.FlushOutbound);
}
}
internal bool HandleWriteTimeout(long written, long attempted, int numChunks)
{
if (Flags.IsSet(ClientFlags.ExpectConnect) && !Flags.IsSet(ClientFlags.ConnectReceived))
{
MarkConnAsClosed(ClosedState.SlowConsumerWriteDeadline);
return true;
}
if (OutWtp == WriteTimeoutPolicy.Close || written == 0)
{
MarkConnAsClosed(ClosedState.SlowConsumerWriteDeadline);
return true;
}
Flags = Flags.Set(ClientFlags.IsSlowConsumer);
Noticef("Slow Consumer State: WriteDeadline exceeded with {0} chunks of {1} bytes.", numChunks, attempted);
return false;
}
internal void MarkConnAsClosed(ClosedState reason)
{
if (reason is ClosedState.ReadError or ClosedState.WriteError or ClosedState.SlowConsumerPendingBytes
or ClosedState.SlowConsumerWriteDeadline or ClosedState.TlsHandshakeError)
{
Flags = Flags.Set(ClientFlags.SkipFlushOnClose);
}
if (Flags.IsSet(ClientFlags.ConnMarkedClosed))
return;
Flags = Flags.Set(ClientFlags.ConnMarkedClosed);
CloseConnection(reason);
}
internal void QueueOutbound(byte[] data)
{
if (IsClosed())
return;
OutPb += data.Length;
var remaining = data;
while (remaining.Length > 0)
{
var rented = NbPool.Get(remaining.Length);
var count = Math.Min(rented.Length, remaining.Length);
Buffer.BlockCopy(remaining, 0, rented, 0, count);
OutNb.Add(new OutboundChunk(rented, count));
remaining = remaining[count..];
}
if (Kind == ClientKind.Client && OutMp > 0 && OutPb > OutMp)
{
OutPb -= data.Length;
Noticef("Slow Consumer Detected: MaxPending of {0} exceeded", OutMp);
MarkConnAsClosed(ClosedState.SlowConsumerPendingBytes);
}
}
internal Exception? ProcessHeaderPub(byte[] arg, byte[]? remaining)
{
ParseCtx.Kind = Kind;
ParseCtx.HasHeaders = Headers;
ParseCtx.MaxPayload = _mpay == 0 ? -1 : _mpay;
return ProtocolParser.ProcessHeaderPub(ParseCtx, arg, remaining);
}
internal Exception? ProcessPub(byte[] arg)
{
ParseCtx.Kind = Kind;
ParseCtx.MaxPayload = _mpay == 0 ? -1 : _mpay;
return ProtocolParser.ProcessPub(ParseCtx, arg);
}
internal static List<byte[]> SplitArg(byte[] arg) => ProtocolParser.SplitArgs(arg);
internal Exception? ParseSub(byte[] arg, bool noForward)
{
var copied = arg.ToArray();
var args = SplitArg(copied);
byte[] subject;
byte[]? queue;
byte[] sid;
switch (args.Count)
{
case 2:
subject = args[0];
queue = null;
sid = args[1];
break;
case 3:
subject = args[0];
queue = args[1];
sid = args[2];
break;
default:
return new FormatException($"processSub Parse Error: {Encoding.ASCII.GetString(arg)}");
}
ProcessSub(subject, queue, sid, noForward);
return null;
}
internal (Subscription? sub, Exception? err) ProcessSub(byte[] subject, byte[]? queue, byte[] sid, bool noForward)
{
return ProcessSubEx(subject, queue, sid, noForward, false, false);
}
internal (Subscription? sub, Exception? err) ProcessSubEx(
byte[] subject, byte[]? queue, byte[] sid, bool noForward, bool si, bool rsi)
{
lock (_mu)
{
_in.Subs++;
if ((IsClosed() && !ClientKindHelpers.IsInternalClient(Kind)) || Subs is null)
return (null, new InvalidOperationException("connection closed"));
if (SubsAtLimit())
return (null, new InvalidOperationException("too many subs"));
var sidText = Encoding.ASCII.GetString(sid);
if (Subs.TryGetValue(sidText, out var existing))
return (existing, null);
var sub = new Subscription
{
Subject = subject,
Queue = queue,
Sid = sid,
};
Subs[sidText] = sub;
return (sub, null);
}
}
// features 440-441: processInfo, processErr
internal void ProcessInfo(string info)
{

View File

@@ -355,6 +355,8 @@ internal sealed class RespEntry
public int N { get; set; }
}
internal readonly record struct OutboundChunk(byte[] Buffer, int Count);
// ============================================================================
// Buffer pool constants
// ============================================================================