diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 5111b61..90fc0c0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -122,8 +122,8 @@ public sealed partial class ClientConnection internal bool Headers; // mirrors c.headers // Limits (int32 allows atomic access). - private int _mpay; // mirrors c.mpay — max payload (signed, jwt.NoLimit = -1) - private int _msubs; // mirrors c.msubs — max subscriptions + private int _mpay = -1; // mirrors c.mpay — max payload (signed, jwt.NoLimit = -1) + private int _msubs = -1; // mirrors c.msubs — max subscriptions private int _mcl; // mirrors c.mcl — max control line // Subscriptions. @@ -136,6 +136,9 @@ public sealed partial class ClientConnection internal long OutPb; // pending bytes internal long OutMp; // max pending snapshot internal TimeSpan OutWdl; // write deadline snapshot + internal WriteTimeoutPolicy OutWtp = WriteTimeoutPolicy.Close; + internal List OutNb = []; + internal List OutWnb = []; // Timing. internal DateTime Start; @@ -1474,6 +1477,206 @@ public sealed partial class ClientConnection internal static ClosedState ClosedStateForErr(Exception err) => err is EndOfStreamException ? ClosedState.ClientClosed : ClosedState.ReadError; + internal (List chunks, long attempted) CollapsePtoNB() + { + long attempted = 0; + foreach (var chunk in OutNb) + attempted += chunk.Count; + return (OutNb, attempted); + } + + internal bool FlushOutbound() + { + if (Flags.IsSet(ClientFlags.FlushOutbound)) + return false; + + Flags = Flags.Set(ClientFlags.FlushOutbound); + try + { + if (_nc is null || Server is null || OutPb == 0) + return true; + + var (collapsed, attempted) = CollapsePtoNB(); + OutNb = []; + if (collapsed.Count > 0) + OutWnb.AddRange(collapsed); + + long written = 0; + try + { + foreach (var chunk in OutWnb) + { + _nc.Write(chunk.Buffer, 0, chunk.Count); + written += chunk.Count; + } + _nc.Flush(); + } + catch (IOException ioEx) when (ioEx.InnerException is SocketException se && + (se.SocketErrorCode == SocketError.TimedOut || + se.SocketErrorCode == SocketError.WouldBlock)) + { + if (HandleWriteTimeout(written, attempted, OutWnb.Count)) + return true; + } + catch (Exception ex) + { + Debugf("Error flushing: {0}", ex.Message); + MarkConnAsClosed(ClosedState.WriteError); + return true; + } + + if (written > 0) + { + OutPb = Math.Max(0, OutPb - written); + if (OutPb == 0) + OutWnb.Clear(); + } + + return true; + } + finally + { + Flags = Flags.Clear(ClientFlags.FlushOutbound); + } + } + + internal bool HandleWriteTimeout(long written, long attempted, int numChunks) + { + if (Flags.IsSet(ClientFlags.ExpectConnect) && !Flags.IsSet(ClientFlags.ConnectReceived)) + { + MarkConnAsClosed(ClosedState.SlowConsumerWriteDeadline); + return true; + } + + if (OutWtp == WriteTimeoutPolicy.Close || written == 0) + { + MarkConnAsClosed(ClosedState.SlowConsumerWriteDeadline); + return true; + } + + Flags = Flags.Set(ClientFlags.IsSlowConsumer); + Noticef("Slow Consumer State: WriteDeadline exceeded with {0} chunks of {1} bytes.", numChunks, attempted); + return false; + } + + internal void MarkConnAsClosed(ClosedState reason) + { + if (reason is ClosedState.ReadError or ClosedState.WriteError or ClosedState.SlowConsumerPendingBytes + or ClosedState.SlowConsumerWriteDeadline or ClosedState.TlsHandshakeError) + { + Flags = Flags.Set(ClientFlags.SkipFlushOnClose); + } + + if (Flags.IsSet(ClientFlags.ConnMarkedClosed)) + return; + + Flags = Flags.Set(ClientFlags.ConnMarkedClosed); + CloseConnection(reason); + } + + internal void QueueOutbound(byte[] data) + { + if (IsClosed()) + return; + + OutPb += data.Length; + var remaining = data; + while (remaining.Length > 0) + { + var rented = NbPool.Get(remaining.Length); + var count = Math.Min(rented.Length, remaining.Length); + Buffer.BlockCopy(remaining, 0, rented, 0, count); + OutNb.Add(new OutboundChunk(rented, count)); + remaining = remaining[count..]; + } + + if (Kind == ClientKind.Client && OutMp > 0 && OutPb > OutMp) + { + OutPb -= data.Length; + Noticef("Slow Consumer Detected: MaxPending of {0} exceeded", OutMp); + MarkConnAsClosed(ClosedState.SlowConsumerPendingBytes); + } + } + + internal Exception? ProcessHeaderPub(byte[] arg, byte[]? remaining) + { + ParseCtx.Kind = Kind; + ParseCtx.HasHeaders = Headers; + ParseCtx.MaxPayload = _mpay == 0 ? -1 : _mpay; + return ProtocolParser.ProcessHeaderPub(ParseCtx, arg, remaining); + } + + internal Exception? ProcessPub(byte[] arg) + { + ParseCtx.Kind = Kind; + ParseCtx.MaxPayload = _mpay == 0 ? -1 : _mpay; + return ProtocolParser.ProcessPub(ParseCtx, arg); + } + + internal static List SplitArg(byte[] arg) => ProtocolParser.SplitArgs(arg); + + internal Exception? ParseSub(byte[] arg, bool noForward) + { + var copied = arg.ToArray(); + var args = SplitArg(copied); + byte[] subject; + byte[]? queue; + byte[] sid; + + switch (args.Count) + { + case 2: + subject = args[0]; + queue = null; + sid = args[1]; + break; + case 3: + subject = args[0]; + queue = args[1]; + sid = args[2]; + break; + default: + return new FormatException($"processSub Parse Error: {Encoding.ASCII.GetString(arg)}"); + } + + ProcessSub(subject, queue, sid, noForward); + return null; + } + + internal (Subscription? sub, Exception? err) ProcessSub(byte[] subject, byte[]? queue, byte[] sid, bool noForward) + { + return ProcessSubEx(subject, queue, sid, noForward, false, false); + } + + internal (Subscription? sub, Exception? err) ProcessSubEx( + byte[] subject, byte[]? queue, byte[] sid, bool noForward, bool si, bool rsi) + { + lock (_mu) + { + _in.Subs++; + + if ((IsClosed() && !ClientKindHelpers.IsInternalClient(Kind)) || Subs is null) + return (null, new InvalidOperationException("connection closed")); + + if (SubsAtLimit()) + return (null, new InvalidOperationException("too many subs")); + + var sidText = Encoding.ASCII.GetString(sid); + if (Subs.TryGetValue(sidText, out var existing)) + return (existing, null); + + var sub = new Subscription + { + Subject = subject, + Queue = queue, + Sid = sid, + }; + Subs[sidText] = sub; + + return (sub, null); + } + } + // features 440-441: processInfo, processErr internal void ProcessInfo(string info) { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs index 9835751..608761e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs @@ -355,6 +355,8 @@ internal sealed class RespEntry public int N { get; set; } } +internal readonly record struct OutboundChunk(byte[] Buffer, int Count); + // ============================================================================ // Buffer pool constants // ============================================================================ diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs index eab7bcb..57b3e75 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs @@ -3,6 +3,7 @@ using System.Reflection; using System.Text; +using System.Linq; using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; @@ -58,4 +59,97 @@ public sealed class ClientConnectionStubFeaturesTests .GetField(field, BindingFlags.Instance | BindingFlags.NonPublic)! .GetValue(c); } + + [Fact] + public void QueueOutbound_ChunkingAndPendingBytes_ShouldTrackState() + { + var c = new ClientConnection(ClientKind.Client) + { + OutMp = 100_000, + }; + + c.QueueOutbound(new byte[70_000]); + + c.OutPb.ShouldBe(70_000); + c.OutNb.Count.ShouldBeGreaterThan(1); + c.OutNb.Sum(chunk => chunk.Count).ShouldBe(70_000); + } + + [Fact] + public void FlushOutbound_WithoutServerOrConn_ShouldNoOpTrue() + { + var c = new ClientConnection(ClientKind.Client); + c.QueueOutbound(Encoding.ASCII.GetBytes("hello")); + + c.FlushOutbound().ShouldBeTrue(); + c.OutPb.ShouldBe(5); + } + + [Fact] + public void HandleWriteTimeout_ClosePolicy_ShouldMarkClosed() + { + var c = new ClientConnection(ClientKind.Client) + { + OutWtp = WriteTimeoutPolicy.Close, + }; + + c.HandleWriteTimeout(0, 100, 1).ShouldBeTrue(); + c.Flags.IsSet(ClientFlags.ConnMarkedClosed).ShouldBeTrue(); + c.Flags.IsSet(ClientFlags.SkipFlushOnClose).ShouldBeTrue(); + } + + [Fact] + public void HandleWriteTimeout_RetryPolicy_ShouldSetSlowConsumerFlag() + { + var c = new ClientConnection(ClientKind.Client) + { + OutWtp = WriteTimeoutPolicy.Retry, + }; + + c.HandleWriteTimeout(1, 100, 2).ShouldBeFalse(); + c.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeTrue(); + } + + [Fact] + public void ProcessPubAndHeaderPubWrappers_ShouldPopulateParseContext() + { + var c = new ClientConnection(ClientKind.Client) + { + Headers = true, + }; + + c.ProcessPub(Encoding.ASCII.GetBytes("foo 5")).ShouldBeNull(); + Encoding.ASCII.GetString(c.ParseCtx.Pa.Subject!).ShouldBe("foo"); + c.ParseCtx.Pa.Size.ShouldBe(5); + + c.ProcessHeaderPub(Encoding.ASCII.GetBytes("foo 3 5"), null).ShouldBeNull(); + Encoding.ASCII.GetString(c.ParseCtx.Pa.Subject!).ShouldBe("foo"); + c.ParseCtx.Pa.HeaderSize.ShouldBe(3); + c.ParseCtx.Pa.Size.ShouldBe(5); + } + + [Fact] + public void SplitArgParseSubAndProcessSub_ShouldCreateSubscriptions() + { + var tokens = ClientConnection.SplitArg(Encoding.ASCII.GetBytes("foo queue sid\r\n")); + tokens.Count.ShouldBe(3); + Encoding.ASCII.GetString(tokens[0]).ShouldBe("foo"); + Encoding.ASCII.GetString(tokens[1]).ShouldBe("queue"); + Encoding.ASCII.GetString(tokens[2]).ShouldBe("sid"); + + var c = new ClientConnection(ClientKind.Client); + c.ParseSub(Encoding.ASCII.GetBytes("foo queue sid"), noForward: true).ShouldBeNull(); + c.Subs.Count.ShouldBe(1); + + var result = c.ProcessSubEx( + Encoding.ASCII.GetBytes("bar"), + null, + Encoding.ASCII.GetBytes("sid2"), + noForward: false, + si: false, + rsi: false); + result.err.ShouldBeNull(); + result.sub.ShouldNotBeNull(); + c.Subs.Count.ShouldBe(2); + } } diff --git a/porting.db b/porting.db index aaa4dcc..bda06b2 100644 Binary files a/porting.db and b/porting.db differ