// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");

using System.Text;
using System.Linq;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;

namespace ZB.MOM.NatsNet.Server;

/// <summary>
/// Inbound-message, header-manipulation and ping-timer members of <see cref="ClientConnection"/>.
/// Several members are placeholders that accept their final signature but do not yet
/// implement the full pipeline; those are called out individually below.
/// </summary>
public sealed partial class ClientConnection
{
    private const string JsAckPrefix = "$JS.ACK.";
    private const string GwReplyPrefix = "$GNR.";
    private const string ErrProtoFormat = "-ERR '{0}'\r\n";

    /// <summary>
    /// Reports whether <paramref name="reply"/> is a reserved reply subject: a service reply
    /// (per <c>IsServiceReply</c>), or a subject that starts with the JetStream ack prefix or
    /// the gateway reply prefix and is strictly longer than that prefix.
    /// </summary>
    /// <param name="reply">Raw reply subject bytes.</param>
    /// <returns><c>true</c> when the reply subject is reserved.</returns>
    internal static bool IsReservedReply(byte[] reply)
    {
        if (IsServiceReply(reply))
            return true;

        return HasAsciiPrefix(reply, JsAckPrefix) || HasAsciiPrefix(reply, GwReplyPrefix);

        // Compares bytes against the (pure ASCII) prefix without allocating a string.
        // A subject equal to the bare prefix does not count, hence the strict length check.
        static bool HasAsciiPrefix(byte[] data, string prefix)
        {
            if (data.Length <= prefix.Length)
                return false;

            for (var i = 0; i < prefix.Length; i++)
            {
                if (data[i] != prefix[i])
                    return false;
            }

            return true;
        }
    }

    /// <summary>
    /// Dispatches an inbound message according to the connection kind.
    /// </summary>
    /// <param name="msg">Raw message bytes.</param>
    internal void ProcessInboundMsg(byte[] msg)
    {
        switch (Kind)
        {
            case ClientKind.Router:
            case ClientKind.Gateway:
            case ClientKind.Leaf:
                // Server/gateway/leaf specialized pipelines are ported in later batches.
                LastIn = DateTime.UtcNow;
                break;

            default:
                // Regular clients and any other kind use the client pipeline.
                ProcessInboundClientMsg(msg);
                break;
        }
    }

    /// <summary>
    /// Placeholder for subject mapping selection. Currently always returns <c>false</c>.
    /// </summary>
    /// <returns><c>false</c> in every case.</returns>
    internal bool SelectMappedSubject()
    {
        // Nothing to map without a subject, or when a mapped subject is already present.
        if (ParseCtx.Pa.Subject is null || ParseCtx.Pa.Mapped is { Length: > 0 })
            return false;

        // NOTE(review): no mapping lookup is performed here yet, so this path also returns false.
        return false;
    }

    /// <summary>
    /// Placeholder lookup of the subscription for a reply subject.
    /// </summary>
    /// <param name="reply">Reply subject bytes (currently ignored).</param>
    /// <returns>The first subscription in <c>Subs</c>, or <c>null</c> when there is none.</returns>
    internal Subscription? SubForReply(byte[] reply)
    {
        _ = reply;
        return Subs.Values.FirstOrDefault();
    }

    /// <summary>
    /// Placeholder for gateway reply-map handling.
    /// </summary>
    /// <param name="msg">Raw message bytes (currently ignored).</param>
    /// <returns><c>true</c> when this connection has a server attached; otherwise <c>false</c>.</returns>
    internal bool HandleGWReplyMap(byte[] msg)
    {
        _ = msg;
        return Server is not null;
    }

    /// <summary>
    /// Placeholder for response service-import setup; hands back <paramref name="serviceImport"/> unchanged.
    /// </summary>
    /// <param name="acc">Account (currently ignored).</param>
    /// <param name="serviceImport">Service import to return.</param>
    /// <param name="tracking">Tracking flag (currently ignored).</param>
    /// <returns>The <paramref name="serviceImport"/> argument.</returns>
    internal object? SetupResponseServiceImport(INatsAccount acc, object? serviceImport, bool tracking)
    {
        _ = acc;
        _ = tracking;
        return serviceImport;
    }

    /// <summary>Forwards to <c>NatsMessageHeaders.RemoveHeaderIfPresent</c>.</summary>
    internal static byte[]? RemoveHeaderIfPresent(byte[] hdr, string key) =>
        NatsMessageHeaders.RemoveHeaderIfPresent(hdr, key);

    /// <summary>Forwards to <c>NatsMessageHeaders.RemoveHeaderIfPrefixPresent</c>.</summary>
    internal static byte[]? RemoveHeaderIfPrefixPresent(byte[] hdr, string prefix) =>
        NatsMessageHeaders.RemoveHeaderIfPrefixPresent(hdr, prefix);

    /// <summary>Forwards to <c>NatsMessageHeaders.GenHeader</c>.</summary>
    internal static byte[] GenHeader(byte[]? hdr, string key, string value) =>
        NatsMessageHeaders.GenHeader(hdr, key, value);

    /// <summary>
    /// Sets header <paramref name="key"/> to <paramref name="value"/> on <paramref name="msg"/>,
    /// using the header size recorded in the parse context, and updates the parse context
    /// (header size, total size and their ASCII decimal forms) to describe the returned message.
    /// </summary>
    /// <param name="key">Header name.</param>
    /// <param name="value">Header value.</param>
    /// <param name="msg">Message bytes: header block (if any) followed by the body.</param>
    /// <returns>A new array containing the rewritten header block followed by the body.</returns>
    internal byte[] SetHeaderInternal(string key, string value, byte[] msg)
    {
        var hdrLen = ParseCtx.Pa.HeaderSize;

        // The message only carries a header block when the recorded size is positive and
        // actually fits inside the message.
        var hasHeader = hdrLen > 0 && msg.Length >= hdrLen;
        var existingHeader = hasHeader ? msg[..hdrLen] : Array.Empty<byte>();

        // Everything after the header block is the body. This must use the same condition as
        // the header slice: for a header-only message (msg.Length == hdrLen) the body is empty,
        // not the header bytes a second time.
        var body = msg.AsSpan(hasHeader ? hdrLen : 0);

        var nextHeader = NatsMessageHeaders.SetHeader(key, value, existingHeader);

        var merged = new byte[nextHeader.Length + body.Length];
        Buffer.BlockCopy(nextHeader, 0, merged, 0, nextHeader.Length);
        body.CopyTo(merged.AsSpan(nextHeader.Length));

        // Sizes are emitted as ASCII decimal digits, so format them culture-invariantly.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        ParseCtx.Pa.HeaderSize = nextHeader.Length;
        ParseCtx.Pa.Size = merged.Length;
        ParseCtx.Pa.HeaderBytes = Encoding.ASCII.GetBytes(nextHeader.Length.ToString(invariant));
        ParseCtx.Pa.SizeBytes = Encoding.ASCII.GetBytes(merged.Length.ToString(invariant));

        return merged;
    }

    /// <summary>Forwards to <c>NatsMessageHeaders.GetHeader</c>.</summary>
    internal static byte[]? GetHeader(string key, byte[] hdr) =>
        NatsMessageHeaders.GetHeader(key, hdr);

    /// <summary>Forwards to <c>NatsMessageHeaders.SliceHeader</c>.</summary>
    internal static ReadOnlyMemory<byte>? SliceHeader(string key, byte[] hdr) =>
        NatsMessageHeaders.SliceHeader(key, hdr);

    /// <summary>Forwards to <c>NatsMessageHeaders.GetHeaderKeyIndex</c>.</summary>
    internal static int GetHeaderKeyIndex(string key, byte[] hdr) =>
        NatsMessageHeaders.GetHeaderKeyIndex(key, hdr);

    /// <summary>Forwards to <c>NatsMessageHeaders.SetHeader</c>.</summary>
    internal static byte[] SetHeaderStatic(string key, string value, byte[] hdr) =>
        NatsMessageHeaders.SetHeader(key, value, hdr);

    /// <summary>
    /// Placeholder for service-import processing.
    /// </summary>
    /// <param name="serviceImport">Service import (currently ignored).</param>
    /// <param name="acc">Account (currently ignored).</param>
    /// <param name="msg">Raw message bytes.</param>
    /// <returns><c>true</c> when <paramref name="msg"/> is non-empty.</returns>
    internal bool ProcessServiceImport(object? serviceImport, INatsAccount? acc, byte[] msg)
    {
        _ = serviceImport;
        _ = acc;
        return msg.Length > 0;
    }

    /// <summary>
    /// Records <paramref name="sub"/> in the per-read route-target list. If a target for the
    /// same client already exists, only the queue name (followed by a space) is appended to
    /// that target; otherwise a new target is added.
    /// </summary>
    /// <param name="sub">Subscription to add.</param>
    internal void AddSubToRouteTargets(Subscription sub)
    {
        _in.Rts ??= new List<RouteTarget>(8);

        foreach (var rt in _in.Rts)
        {
            if (!ReferenceEquals(rt.Sub?.Client, sub.Client))
                continue;

            // Same destination client: just accumulate the queue name, space-terminated.
            if (sub.Queue is { Length: > 0 })
            {
                rt.Qs = [.. rt.Qs, .. sub.Queue, (byte)' '];
            }

            return;
        }

        // First subscription seen for this client.
        byte[] queueBytes = sub.Queue is { Length: > 0 } q
            ? [.. q, (byte)' ']
            : Array.Empty<byte>();

        _in.Rts.Add(new RouteTarget { Sub = sub, Qs = queueBytes });
    }

    /// <summary>
    /// Delivers <paramref name="msg"/> to every plain subscription in <paramref name="result"/>
    /// and to the first member of every non-empty queue group.
    /// </summary>
    /// <param name="acc">Account passed through to delivery.</param>
    /// <param name="result">Subscription match result; <c>null</c> means nothing to deliver.</param>
    /// <param name="msg">Message bytes.</param>
    /// <param name="deliver">Delivery subject (currently ignored).</param>
    /// <param name="subject">Message subject.</param>
    /// <param name="reply">Optional reply subject.</param>
    /// <param name="flags">Processing flags (currently ignored).</param>
    /// <returns>
    /// Whether at least one delivery succeeded, and the non-empty queue names of the queue
    /// groups that were attempted.
    /// </returns>
    internal (bool didDeliver, List<byte[]> queueNames) ProcessMsgResults(
        INatsAccount? acc,
        SubscriptionIndexResult? result,
        byte[] msg,
        byte[]? deliver,
        byte[] subject,
        byte[]? reply,
        PmrFlags flags)
    {
        // Accepted for signature compatibility; not consulted by this implementation.
        _ = deliver;
        _ = flags;

        if (result is null)
            return (false, []);

        // DeliverMsg is always given a non-null reply; MsgHeader still receives the original.
        var replyOrEmpty = reply ?? Array.Empty<byte>();
        var didDeliver = false;
        var queueNames = new List<byte[]>();

        foreach (var sub in result.PSubs)
        {
            var mh = MsgHeader(subject, reply, sub);
            if (DeliverMsg(IsMqtt(), sub, acc, subject, replyOrEmpty, mh, msg, false))
                didDeliver = true;
        }

        foreach (var qgroup in result.QSubs)
        {
            if (qgroup.Count == 0)
                continue;

            // NOTE(review): always picks the first member of the group; no balancing across
            // members or retry on failed delivery happens here.
            var sub = qgroup[0];
            if (sub.Queue is { Length: > 0 } q)
                queueNames.Add(q);

            var mh = MsgHeader(subject, reply, sub);
            if (DeliverMsg(IsMqtt(), sub, acc, subject, replyOrEmpty, mh, msg, false))
                didDeliver = true;
        }

        return (didDeliver, queueNames);
    }

    /// <summary>
    /// Looks for the <c>NatsHeaderConstants.JsResponseType</c> header in the header block of
    /// <paramref name="msg"/> and, when found, re-sets it to its current value via
    /// <see cref="SetHeaderInternal"/>.
    /// </summary>
    /// <param name="msg">Message bytes.</param>
    /// <param name="updated">
    /// The rewritten message when the header was found; otherwise <paramref name="msg"/> itself.
    /// </param>
    /// <returns><c>true</c> when the header was present.</returns>
    internal bool CheckLeafClientInfoHeader(byte[] msg, out byte[] updated)
    {
        updated = msg;

        var hdrLen = ParseCtx.Pa.HeaderSize;
        if (hdrLen <= 0 || msg.Length < hdrLen)
            return false;

        var existing = GetHeader(NatsHeaderConstants.JsResponseType, msg[..hdrLen]);
        if (existing is null)
            return false;

        updated = SetHeaderInternal(
            NatsHeaderConstants.JsResponseType,
            Encoding.ASCII.GetString(existing),
            msg);
        return true;
    }

    /// <summary>
    /// Ping-timer callback. Clears the timer, decides whether a PING is due, closes the
    /// connection as stale when too many PINGs are outstanding, and otherwise re-arms the timer.
    /// </summary>
    internal void ProcessPingTimer()
    {
        lock (_mu)
        {
            _pingTimer = null;

            if (IsClosed())
                return;

            var opts = Server?.Options;
            var pingInterval = AdjustPingInterval(Kind, opts?.PingInterval ?? TimeSpan.FromMinutes(2));

            // Routers and gateways are always pinged. Other kinds are pinged only when nothing
            // was received within the interval, or when the RTT is unknown or older than a minute.
            var sendPing = Kind is ClientKind.Router or ClientKind.Gateway;
            if (!sendPing)
            {
                var now = DateTime.UtcNow;
                var needRtt = Rtt == TimeSpan.Zero || now - RttStart > TimeSpan.FromMinutes(1);
                sendPing = now - LastIn >= pingInterval || needRtt;
            }

            if (sendPing)
            {
                var maxPingsOut = opts?.MaxPingsOut ?? 2;
                if (_pingOut + 1 > maxPingsOut)
                {
                    EnqueueProto(Encoding.ASCII.GetBytes(string.Format(ErrProtoFormat, "Stale Connection")));

                    // NOTE(review): CloseConnection runs while _mu is still held; confirm this
                    // cannot deadlock against locks taken inside CloseConnection.
                    CloseConnection(ClosedState.StaleConnection);
                    return;
                }

                SendPing();
            }

            // Re-arm so the timer fires again.
            SetPingTimer();
        }
    }

    /// <summary>
    /// Caps the ping interval for router (1 minute) and gateway (2 minutes) connections;
    /// other kinds keep <paramref name="value"/> unchanged.
    /// </summary>
    /// <param name="kind">Connection kind.</param>
    /// <param name="value">Configured ping interval.</param>
    /// <returns>The interval to use for <paramref name="kind"/>.</returns>
    internal static TimeSpan AdjustPingInterval(ClientKind kind, TimeSpan value)
    {
        var routeMax = TimeSpan.FromMinutes(1);

        // NOTE(review): the upstream server is believed to use a shorter gateway cap — confirm
        // that 2 minutes is intended here.
        var gatewayMax = TimeSpan.FromMinutes(2);

        return kind switch
        {
            ClientKind.Router when value > routeMax => routeMax,
            ClientKind.Gateway when value > gatewayMax => gatewayMax,
            _ => value,
        };
    }
}