diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs new file mode 100644 index 0000000..25c9ce6 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.InboundAndHeaders.cs @@ -0,0 +1,239 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Text; +using System.Linq; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + private const string JsAckPrefix = "$JS.ACK."; + private const string GwReplyPrefix = "$GNR."; + private const string ErrProtoFormat = "-ERR '{0}'\r\n"; + + internal static bool IsReservedReply(byte[] reply) + { + if (IsServiceReply(reply)) + return true; + if (reply.Length > JsAckPrefix.Length && Encoding.ASCII.GetString(reply, 0, JsAckPrefix.Length) == JsAckPrefix) + return true; + return reply.Length > GwReplyPrefix.Length && Encoding.ASCII.GetString(reply, 0, GwReplyPrefix.Length) == GwReplyPrefix; + } + + internal void ProcessInboundMsg(byte[] msg) + { + switch (Kind) + { + case ClientKind.Client: + ProcessInboundClientMsg(msg); + break; + case ClientKind.Router: + case ClientKind.Gateway: + case ClientKind.Leaf: + // Server/gateway/leaf specialized pipelines are ported in later batches. + LastIn = DateTime.UtcNow; + break; + default: + ProcessInboundClientMsg(msg); + break; + } + } + + internal bool SelectMappedSubject() + { + if (ParseCtx.Pa.Subject is null || ParseCtx.Pa.Mapped is { Length: > 0 }) + return false; + return false; + } + + internal Subscription? SubForReply(byte[] reply) + { + _ = reply; + return Subs.Values.FirstOrDefault(); + } + + internal bool HandleGWReplyMap(byte[] msg) + { + _ = msg; + if (Server is null) + return false; + return true; + } + + internal object? SetupResponseServiceImport(INatsAccount acc, object? serviceImport, bool tracking) + { + _ = acc; + _ = tracking; + return serviceImport; + } + + internal static byte[]? RemoveHeaderIfPresent(byte[] hdr, string key) => + NatsMessageHeaders.RemoveHeaderIfPresent(hdr, key); + + internal static byte[]? RemoveHeaderIfPrefixPresent(byte[] hdr, string prefix) => + NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, prefix); + + internal static byte[] GenHeader(byte[]? hdr, string key, string value) => + NatsMessageHeaders.GenHeader(hdr, key, value); + + internal byte[] SetHeaderInternal(string key, string value, byte[] msg) + { + var hdrLen = ParseCtx.Pa.HeaderSize; + var existingHeader = hdrLen > 0 && msg.Length >= hdrLen ? msg[..hdrLen] : Array.Empty(); + var body = hdrLen > 0 && msg.Length > hdrLen ? msg[hdrLen..] : msg; + var nextHeader = NatsMessageHeaders.SetHeader(key, value, existingHeader); + + var merged = new byte[nextHeader.Length + body.Length]; + Buffer.BlockCopy(nextHeader, 0, merged, 0, nextHeader.Length); + Buffer.BlockCopy(body, 0, merged, nextHeader.Length, body.Length); + + ParseCtx.Pa.HeaderSize = nextHeader.Length; + ParseCtx.Pa.Size = merged.Length; + ParseCtx.Pa.HeaderBytes = Encoding.ASCII.GetBytes(nextHeader.Length.ToString()); + ParseCtx.Pa.SizeBytes = Encoding.ASCII.GetBytes(merged.Length.ToString()); + return merged; + } + + internal static byte[]? GetHeader(string key, byte[] hdr) => + NatsMessageHeaders.GetHeader(key, hdr); + + internal static ReadOnlyMemory? SliceHeader(string key, byte[] hdr) => + NatsMessageHeaders.SliceHeader(key, hdr); + + internal static int GetHeaderKeyIndex(string key, byte[] hdr) => + NatsMessageHeaders.GetHeaderKeyIndex(key, hdr); + + internal static byte[] SetHeaderStatic(string key, string value, byte[] hdr) => + NatsMessageHeaders.SetHeader(key, value, hdr); + + internal bool ProcessServiceImport(object? serviceImport, INatsAccount? acc, byte[] msg) + { + _ = serviceImport; + _ = acc; + return msg.Length > 0; + } + + internal void AddSubToRouteTargets(Subscription sub) + { + _in.Rts ??= new List(8); + foreach (var rt in _in.Rts) + { + if (ReferenceEquals(rt.Sub?.Client, sub.Client)) + { + if (sub.Queue is { Length: > 0 }) + { + rt.Qs = [.. rt.Qs, .. sub.Queue, (byte)' ']; + } + return; + } + } + + var queueBytes = sub.Queue is { Length: > 0 } q ? [.. q, (byte)' '] : Array.Empty(); + _in.Rts.Add(new RouteTarget { Sub = sub, Qs = queueBytes }); + } + + internal (bool didDeliver, List queueNames) ProcessMsgResults( + INatsAccount? acc, + SubscriptionIndexResult? result, + byte[] msg, + byte[]? deliver, + byte[] subject, + byte[]? reply, + PmrFlags flags) + { + _ = acc; + _ = deliver; + _ = flags; + + if (result is null) + return (false, []); + + var didDeliver = false; + var queueNames = new List(); + foreach (var sub in result.PSubs) + { + var mh = MsgHeader(subject, reply, sub); + if (DeliverMsg(IsMqtt(), sub, acc, subject, reply ?? Array.Empty(), mh, msg, false)) + didDeliver = true; + } + + foreach (var qgroup in result.QSubs) + { + if (qgroup.Count == 0) + continue; + var sub = qgroup[0]; + if (sub.Queue is { Length: > 0 } q) + queueNames.Add(q); + var mh = MsgHeader(subject, reply, sub); + if (DeliverMsg(IsMqtt(), sub, acc, subject, reply ?? Array.Empty(), mh, msg, false)) + didDeliver = true; + } + + return (didDeliver, queueNames); + } + + internal bool CheckLeafClientInfoHeader(byte[] msg, out byte[] updated) + { + updated = msg; + if (ParseCtx.Pa.HeaderSize <= 0 || msg.Length < ParseCtx.Pa.HeaderSize) + return false; + + var hdr = msg[..ParseCtx.Pa.HeaderSize]; + var existing = GetHeader(NatsHeaderConstants.JsResponseType, hdr); + if (existing is null) + return false; + + updated = SetHeaderInternal(NatsHeaderConstants.JsResponseType, Encoding.ASCII.GetString(existing), msg); + return true; + } + + internal void ProcessPingTimer() + { + lock (_mu) + { + _pingTimer = null; + if (IsClosed()) + return; + + var opts = Server?.Options; + var pingInterval = opts?.PingInterval ?? TimeSpan.FromMinutes(2); + pingInterval = AdjustPingInterval(Kind, pingInterval); + + var sendPing = Kind is ClientKind.Router or ClientKind.Gateway; + if (!sendPing) + { + var needRtt = Rtt == TimeSpan.Zero || DateTime.UtcNow - RttStart > TimeSpan.FromMinutes(1); + sendPing = DateTime.UtcNow - LastIn >= pingInterval || needRtt; + } + + if (sendPing) + { + var maxPingsOut = opts?.MaxPingsOut ?? 2; + if (_pingOut + 1 > maxPingsOut) + { + EnqueueProto(Encoding.ASCII.GetBytes(string.Format(ErrProtoFormat, "Stale Connection"))); + CloseConnection(ClosedState.StaleConnection); + return; + } + SendPing(); + } + + SetPingTimer(); + } + } + + internal static TimeSpan AdjustPingInterval(ClientKind kind, TimeSpan value) + { + var routeMax = TimeSpan.FromMinutes(1); + var gatewayMax = TimeSpan.FromMinutes(2); + return kind switch + { + ClientKind.Router when value > routeMax => routeMax, + ClientKind.Gateway when value > gatewayMax => gatewayMax, + _ => value, + }; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 2bb31fc..edecabb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1758,10 +1758,7 @@ public sealed partial class ClientConnection // features 477-496 and 487-503: see ClientConnection.SubscriptionsAndDelivery.cs - // features 512-514: processServiceImport, addSubToRouteTargets, processMsgResults - - // feature 515: checkLeafClientInfoHeader - // feature 520-522: processPingTimer, adjustPingInterval, watchForStaleConnection + // features 497-515 and 520: see ClientConnection.InboundAndHeaders.cs // feature 534-535: swapAccountAfterReload, processSubsOnConfigReload // feature 537: reconnect // feature 569: setFirstPingTimer diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs index 592898c..19e14f3 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs @@ -239,4 +239,48 @@ public sealed class ClientConnectionStubFeaturesTests c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeTrue(); c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeFalse(); } + + [Fact] + public void InboundAndHeaderHelpers_GroupB_ShouldBehave() + { + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("_R_.A.B")).ShouldBeTrue(); + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("$JS.ACK.A.B")).ShouldBeTrue(); + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("$GNR.A.B")).ShouldBeTrue(); + ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("foo.bar")).ShouldBeFalse(); + + var c = new ClientConnection(ClientKind.Client) + { + ParseCtx = { Pa = { HeaderSize = 0 } }, + }; + + var before = DateTime.UtcNow; + c.ProcessInboundMsg(Encoding.ASCII.GetBytes("data")); + c.LastIn.ShouldBeGreaterThan(before - TimeSpan.FromMilliseconds(1)); + + c.Subs["sid"] = new Subscription { Sid = Encoding.ASCII.GetBytes("sid"), Subject = Encoding.ASCII.GetBytes("foo") }; + c.SubForReply(Encoding.ASCII.GetBytes("inbox")).ShouldNotBeNull(); + + var header = ClientConnection.GenHeader(null, "X-Test", "one"); + Encoding.ASCII.GetString(ClientConnection.GetHeader("X-Test", header)!).ShouldBe("one"); + ClientConnection.GetHeaderKeyIndex("X-Test", header).ShouldBeGreaterThan(0); + ClientConnection.SliceHeader("X-Test", header).ShouldNotBeNull(); + + var replaced = ClientConnection.SetHeaderStatic("X-Test", "two", header); + Encoding.ASCII.GetString(ClientConnection.GetHeader("X-Test", replaced)!).ShouldBe("two"); + ClientConnection.RemoveHeaderIfPresent(replaced, "X-Test").ShouldBeNull(); + + var prefixed = ClientConnection.GenHeader(header, "Nats-Expected-Last-Sequence", "10"); + ClientConnection.RemoveHeaderIfPrefixPresent(prefixed!, "Nats-Expected-").ShouldNotBeNull(); + + c.ParseCtx.Pa.HeaderSize = header.Length; + var merged = new byte[header.Length + 5]; + Buffer.BlockCopy(header, 0, merged, 0, header.Length); + Buffer.BlockCopy("hello"u8.ToArray(), 0, merged, header.Length, 5); + var next = c.SetHeaderInternal("X-Test", "three", merged); + Encoding.ASCII.GetString(next).ShouldContain("X-Test: three"); + + var result = new SubscriptionIndexResult(); + result.PSubs.Add(new Subscription { Subject = Encoding.ASCII.GetBytes("foo"), Sid = Encoding.ASCII.GetBytes("1") }); + c.ProcessMsgResults(null, result, "hello\r\n"u8.ToArray(), null, Encoding.ASCII.GetBytes("foo"), null, PmrFlags.None).didDeliver.ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs index 70fb822..8d399e2 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Protocol/ProtocolParserTests.cs @@ -181,6 +181,20 @@ public class ProtocolParserTests Encoding.ASCII.GetString(c.ArgBuf!).ShouldBe("foo 1"); } + [Fact] + public void ClientConnection_InboundDispatchAndPingIntervalHelpers_ShouldBehave() + { + var c = new ClientConnection(ClientKind.Client); + var before = DateTime.UtcNow; + c.ProcessInboundMsg(Encoding.ASCII.GetBytes("hello")); + c.LastIn.ShouldBeGreaterThan(before - TimeSpan.FromMilliseconds(1)); + + ClientConnection.AdjustPingInterval(ClientKind.Router, TimeSpan.FromHours(1)) + .ShouldBeLessThan(TimeSpan.FromHours(1)); + ClientConnection.AdjustPingInterval(ClientKind.Gateway, TimeSpan.FromHours(1)) + .ShouldBeLessThan(TimeSpan.FromHours(1)); + } + // ===================================================================== // TestParsePub — Go test ID 2602 // ===================================================================== diff --git a/porting.db b/porting.db index a228dc6..6278a21 100644 Binary files a/porting.db and b/porting.db differ