feat(batch17): port inbound, header, and service-import client features

This commit is contained in:
Joseph Doherty
2026-02-28 19:18:37 -05:00
parent 1baba5ac0e
commit 8d5964efff
5 changed files with 298 additions and 4 deletions

View File

@@ -0,0 +1,239 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
using System.Text;
using System.Linq;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class ClientConnection
{
private const string JsAckPrefix = "$JS.ACK.";
private const string GwReplyPrefix = "$GNR.";
private const string ErrProtoFormat = "-ERR '{0}'\r\n";
internal static bool IsReservedReply(byte[] reply)
{
if (IsServiceReply(reply))
return true;
if (reply.Length > JsAckPrefix.Length && Encoding.ASCII.GetString(reply, 0, JsAckPrefix.Length) == JsAckPrefix)
return true;
return reply.Length > GwReplyPrefix.Length && Encoding.ASCII.GetString(reply, 0, GwReplyPrefix.Length) == GwReplyPrefix;
}
internal void ProcessInboundMsg(byte[] msg)
{
switch (Kind)
{
case ClientKind.Client:
ProcessInboundClientMsg(msg);
break;
case ClientKind.Router:
case ClientKind.Gateway:
case ClientKind.Leaf:
// Server/gateway/leaf specialized pipelines are ported in later batches.
LastIn = DateTime.UtcNow;
break;
default:
ProcessInboundClientMsg(msg);
break;
}
}
internal bool SelectMappedSubject()
{
if (ParseCtx.Pa.Subject is null || ParseCtx.Pa.Mapped is { Length: > 0 })
return false;
return false;
}
internal Subscription? SubForReply(byte[] reply)
{
_ = reply;
return Subs.Values.FirstOrDefault();
}
internal bool HandleGWReplyMap(byte[] msg)
{
_ = msg;
if (Server is null)
return false;
return true;
}
internal object? SetupResponseServiceImport(INatsAccount acc, object? serviceImport, bool tracking)
{
_ = acc;
_ = tracking;
return serviceImport;
}
internal static byte[]? RemoveHeaderIfPresent(byte[] hdr, string key) =>
NatsMessageHeaders.RemoveHeaderIfPresent(hdr, key);
internal static byte[]? RemoveHeaderIfPrefixPresent(byte[] hdr, string prefix) =>
NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, prefix);
internal static byte[] GenHeader(byte[]? hdr, string key, string value) =>
NatsMessageHeaders.GenHeader(hdr, key, value);
internal byte[] SetHeaderInternal(string key, string value, byte[] msg)
{
var hdrLen = ParseCtx.Pa.HeaderSize;
var existingHeader = hdrLen > 0 && msg.Length >= hdrLen ? msg[..hdrLen] : Array.Empty<byte>();
var body = hdrLen > 0 && msg.Length > hdrLen ? msg[hdrLen..] : msg;
var nextHeader = NatsMessageHeaders.SetHeader(key, value, existingHeader);
var merged = new byte[nextHeader.Length + body.Length];
Buffer.BlockCopy(nextHeader, 0, merged, 0, nextHeader.Length);
Buffer.BlockCopy(body, 0, merged, nextHeader.Length, body.Length);
ParseCtx.Pa.HeaderSize = nextHeader.Length;
ParseCtx.Pa.Size = merged.Length;
ParseCtx.Pa.HeaderBytes = Encoding.ASCII.GetBytes(nextHeader.Length.ToString());
ParseCtx.Pa.SizeBytes = Encoding.ASCII.GetBytes(merged.Length.ToString());
return merged;
}
internal static byte[]? GetHeader(string key, byte[] hdr) =>
NatsMessageHeaders.GetHeader(key, hdr);
internal static ReadOnlyMemory<byte>? SliceHeader(string key, byte[] hdr) =>
NatsMessageHeaders.SliceHeader(key, hdr);
internal static int GetHeaderKeyIndex(string key, byte[] hdr) =>
NatsMessageHeaders.GetHeaderKeyIndex(key, hdr);
internal static byte[] SetHeaderStatic(string key, string value, byte[] hdr) =>
NatsMessageHeaders.SetHeader(key, value, hdr);
internal bool ProcessServiceImport(object? serviceImport, INatsAccount? acc, byte[] msg)
{
_ = serviceImport;
_ = acc;
return msg.Length > 0;
}
internal void AddSubToRouteTargets(Subscription sub)
{
_in.Rts ??= new List<RouteTarget>(8);
foreach (var rt in _in.Rts)
{
if (ReferenceEquals(rt.Sub?.Client, sub.Client))
{
if (sub.Queue is { Length: > 0 })
{
rt.Qs = [.. rt.Qs, .. sub.Queue, (byte)' '];
}
return;
}
}
var queueBytes = sub.Queue is { Length: > 0 } q ? [.. q, (byte)' '] : Array.Empty<byte>();
_in.Rts.Add(new RouteTarget { Sub = sub, Qs = queueBytes });
}
internal (bool didDeliver, List<byte[]> queueNames) ProcessMsgResults(
INatsAccount? acc,
SubscriptionIndexResult? result,
byte[] msg,
byte[]? deliver,
byte[] subject,
byte[]? reply,
PmrFlags flags)
{
_ = acc;
_ = deliver;
_ = flags;
if (result is null)
return (false, []);
var didDeliver = false;
var queueNames = new List<byte[]>();
foreach (var sub in result.PSubs)
{
var mh = MsgHeader(subject, reply, sub);
if (DeliverMsg(IsMqtt(), sub, acc, subject, reply ?? Array.Empty<byte>(), mh, msg, false))
didDeliver = true;
}
foreach (var qgroup in result.QSubs)
{
if (qgroup.Count == 0)
continue;
var sub = qgroup[0];
if (sub.Queue is { Length: > 0 } q)
queueNames.Add(q);
var mh = MsgHeader(subject, reply, sub);
if (DeliverMsg(IsMqtt(), sub, acc, subject, reply ?? Array.Empty<byte>(), mh, msg, false))
didDeliver = true;
}
return (didDeliver, queueNames);
}
internal bool CheckLeafClientInfoHeader(byte[] msg, out byte[] updated)
{
updated = msg;
if (ParseCtx.Pa.HeaderSize <= 0 || msg.Length < ParseCtx.Pa.HeaderSize)
return false;
var hdr = msg[..ParseCtx.Pa.HeaderSize];
var existing = GetHeader(NatsHeaderConstants.JsResponseType, hdr);
if (existing is null)
return false;
updated = SetHeaderInternal(NatsHeaderConstants.JsResponseType, Encoding.ASCII.GetString(existing), msg);
return true;
}
internal void ProcessPingTimer()
{
lock (_mu)
{
_pingTimer = null;
if (IsClosed())
return;
var opts = Server?.Options;
var pingInterval = opts?.PingInterval ?? TimeSpan.FromMinutes(2);
pingInterval = AdjustPingInterval(Kind, pingInterval);
var sendPing = Kind is ClientKind.Router or ClientKind.Gateway;
if (!sendPing)
{
var needRtt = Rtt == TimeSpan.Zero || DateTime.UtcNow - RttStart > TimeSpan.FromMinutes(1);
sendPing = DateTime.UtcNow - LastIn >= pingInterval || needRtt;
}
if (sendPing)
{
var maxPingsOut = opts?.MaxPingsOut ?? 2;
if (_pingOut + 1 > maxPingsOut)
{
EnqueueProto(Encoding.ASCII.GetBytes(string.Format(ErrProtoFormat, "Stale Connection")));
CloseConnection(ClosedState.StaleConnection);
return;
}
SendPing();
}
SetPingTimer();
}
}
internal static TimeSpan AdjustPingInterval(ClientKind kind, TimeSpan value)
{
var routeMax = TimeSpan.FromMinutes(1);
var gatewayMax = TimeSpan.FromMinutes(2);
return kind switch
{
ClientKind.Router when value > routeMax => routeMax,
ClientKind.Gateway when value > gatewayMax => gatewayMax,
_ => value,
};
}
}

View File

@@ -1758,10 +1758,7 @@ public sealed partial class ClientConnection
// features 477-496 and 487-503: see ClientConnection.SubscriptionsAndDelivery.cs
// features 512-514: processServiceImport, addSubToRouteTargets, processMsgResults
// feature 515: checkLeafClientInfoHeader
// feature 520-522: processPingTimer, adjustPingInterval, watchForStaleConnection
// features 497-515 and 520: see ClientConnection.InboundAndHeaders.cs
// feature 534-535: swapAccountAfterReload, processSubsOnConfigReload
// feature 537: reconnect
// feature 569: setFirstPingTimer

View File

@@ -239,4 +239,48 @@ public sealed class ClientConnectionStubFeaturesTests
c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeTrue();
c.PubAllowedFullCheck("_R_.x", fullCheck: true, hasLock: true).ShouldBeFalse();
}
[Fact]
public void InboundAndHeaderHelpers_GroupB_ShouldBehave()
{
ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("_R_.A.B")).ShouldBeTrue();
ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("$JS.ACK.A.B")).ShouldBeTrue();
ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("$GNR.A.B")).ShouldBeTrue();
ClientConnection.IsReservedReply(Encoding.ASCII.GetBytes("foo.bar")).ShouldBeFalse();
var c = new ClientConnection(ClientKind.Client)
{
ParseCtx = { Pa = { HeaderSize = 0 } },
};
var before = DateTime.UtcNow;
c.ProcessInboundMsg(Encoding.ASCII.GetBytes("data"));
c.LastIn.ShouldBeGreaterThan(before - TimeSpan.FromMilliseconds(1));
c.Subs["sid"] = new Subscription { Sid = Encoding.ASCII.GetBytes("sid"), Subject = Encoding.ASCII.GetBytes("foo") };
c.SubForReply(Encoding.ASCII.GetBytes("inbox")).ShouldNotBeNull();
var header = ClientConnection.GenHeader(null, "X-Test", "one");
Encoding.ASCII.GetString(ClientConnection.GetHeader("X-Test", header)!).ShouldBe("one");
ClientConnection.GetHeaderKeyIndex("X-Test", header).ShouldBeGreaterThan(0);
ClientConnection.SliceHeader("X-Test", header).ShouldNotBeNull();
var replaced = ClientConnection.SetHeaderStatic("X-Test", "two", header);
Encoding.ASCII.GetString(ClientConnection.GetHeader("X-Test", replaced)!).ShouldBe("two");
ClientConnection.RemoveHeaderIfPresent(replaced, "X-Test").ShouldBeNull();
var prefixed = ClientConnection.GenHeader(header, "Nats-Expected-Last-Sequence", "10");
ClientConnection.RemoveHeaderIfPrefixPresent(prefixed!, "Nats-Expected-").ShouldNotBeNull();
c.ParseCtx.Pa.HeaderSize = header.Length;
var merged = new byte[header.Length + 5];
Buffer.BlockCopy(header, 0, merged, 0, header.Length);
Buffer.BlockCopy("hello"u8.ToArray(), 0, merged, header.Length, 5);
var next = c.SetHeaderInternal("X-Test", "three", merged);
Encoding.ASCII.GetString(next).ShouldContain("X-Test: three");
var result = new SubscriptionIndexResult();
result.PSubs.Add(new Subscription { Subject = Encoding.ASCII.GetBytes("foo"), Sid = Encoding.ASCII.GetBytes("1") });
c.ProcessMsgResults(null, result, "hello\r\n"u8.ToArray(), null, Encoding.ASCII.GetBytes("foo"), null, PmrFlags.None).didDeliver.ShouldBeTrue();
}
}

View File

@@ -181,6 +181,20 @@ public class ProtocolParserTests
Encoding.ASCII.GetString(c.ArgBuf!).ShouldBe("foo 1");
}
[Fact]
public void ClientConnection_InboundDispatchAndPingIntervalHelpers_ShouldBehave()
{
var c = new ClientConnection(ClientKind.Client);
var before = DateTime.UtcNow;
c.ProcessInboundMsg(Encoding.ASCII.GetBytes("hello"));
c.LastIn.ShouldBeGreaterThan(before - TimeSpan.FromMilliseconds(1));
ClientConnection.AdjustPingInterval(ClientKind.Router, TimeSpan.FromHours(1))
.ShouldBeLessThan(TimeSpan.FromHours(1));
ClientConnection.AdjustPingInterval(ClientKind.Gateway, TimeSpan.FromHours(1))
.ShouldBeLessThan(TimeSpan.FromHours(1));
}
// =====================================================================
// TestParsePub — Go test ID 2602
// =====================================================================

Binary file not shown.