// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0

using System.Linq;
using System.Reflection;
using System.Text;
using Shouldly;
using ZB.MOM.NatsNet.Server;
using ZB.MOM.NatsNet.Server.Internal;

namespace ZB.MOM.NatsNet.Server.Tests;

/// <summary>
/// Unit tests for <see cref="ClientConnection"/> covering CONNECT/PONG processing, timer setup,
/// outbound queueing, write-timeout policies, PUB/HPUB argument parsing and subscription creation.
/// </summary>
public sealed class ClientConnectionStubFeaturesTests
{
    /// <summary>
    /// Drives a server-backed connection through CONNECT parsing, PONG handling and timer setup,
    /// then checks that a zero expiration leaves the connection reported as closed.
    /// </summary>
    [Fact]
    public void ProcessConnect_ProcessPong_AndTimers_ShouldBehave()
    {
        var (server, err) = NatsServer.NewServer(new ServerOptions
        {
            PingInterval = TimeSpan.FromMilliseconds(20),
            AuthTimeout = 0.1,
        });
        err.ShouldBeNull();

        using var ms = new MemoryStream();
        var c = new ClientConnection(ClientKind.Client, server, ms)
        {
            Cid = 9,
            Trace = true,
        };

        // CONNECT payload: the three options asserted below must be copied onto the connection.
        var connectJson = Encoding.UTF8.GetBytes("{\"echo\":false,\"headers\":true,\"name\":\"unit\"}");
        c.ProcessConnect(connectJson);
        c.Opts.Name.ShouldBe("unit");
        c.Echo.ShouldBeFalse();
        c.Headers.ShouldBeTrue();

        // Back-date the RTT start so the value read after ProcessPong is strictly positive.
        // NOTE(review): presumably ProcessPong derives the RTT from RttStart - confirm.
        c.RttStart = DateTime.UtcNow - TimeSpan.FromMilliseconds(50);
        c.ProcessPong();
        c.GetRttValue().ShouldBeGreaterThan(TimeSpan.Zero);

        // The timers live in non-public fields, so they are inspected via reflection.
        c.SetPingTimer();
        GetTimer(c, "_pingTimer").ShouldNotBeNull();

        c.SetAuthTimer(TimeSpan.FromMilliseconds(20));
        GetTimer(c, "_atmr").ShouldNotBeNull();

        // Smoke calls: nothing is asserted here beyond the absence of an exception.
        c.TraceMsg(Encoding.UTF8.GetBytes("MSG"));
        c.FlushSignal();
        c.UpdateS2AutoCompressionLevel();

        // The test expects a zero expiration to leave the connection closed by the time
        // IsClosed is queried.
        c.SetExpirationTimer(TimeSpan.Zero);
        c.IsClosed().ShouldBeTrue();
    }

    /// <summary>
    /// Reads a non-public instance field of <see cref="ClientConnection"/> and casts it to a timer.
    /// </summary>
    /// <param name="c">Connection whose field is read.</param>
    /// <param name="field">Name of the non-public instance field.</param>
    /// <returns>The field's current value, or <c>null</c> when the field holds no timer.</returns>
    /// <exception cref="InvalidOperationException">
    /// No non-public instance field with the given name exists on <see cref="ClientConnection"/>.
    /// </exception>
    private static Timer? GetTimer(ClientConnection c, string field)
    {
        // Fail with the field name instead of a bare NullReferenceException when the
        // field has been renamed or removed.
        var fieldInfo = typeof(ClientConnection)
            .GetField(field, BindingFlags.Instance | BindingFlags.NonPublic)
            ?? throw new InvalidOperationException(
                $"ClientConnection has no non-public instance field named '{field}'.");

        return (Timer?)fieldInfo.GetValue(c);
    }

    /// <summary>
    /// Queues a 70,000-byte payload and checks that the pending-byte counter matches and that
    /// the payload was split into more than one chunk whose sizes add up to the original length.
    /// </summary>
    [Fact]
    public void QueueOutbound_ChunkingAndPendingBytes_ShouldTrackState()
    {
        var c = new ClientConnection(ClientKind.Client)
        {
            OutMp = 100_000,
        };

        c.QueueOutbound(new byte[70_000]);

        c.OutPb.ShouldBe(70_000);
        c.OutNb.Count.ShouldBeGreaterThan(1);
        c.OutNb.Sum(chunk => chunk.Count).ShouldBe(70_000);
    }

    /// <summary>
    /// With no server or stream attached, <c>FlushOutbound</c> is expected to return
    /// <c>true</c> and leave the five queued bytes pending.
    /// </summary>
    [Fact]
    public void FlushOutbound_WithoutServerOrConn_ShouldNoOpTrue()
    {
        var c = new ClientConnection(ClientKind.Client);
        c.QueueOutbound(Encoding.ASCII.GetBytes("hello"));

        c.FlushOutbound().ShouldBeTrue();

        c.OutPb.ShouldBe(5);
    }

    /// <summary>
    /// Under <see cref="WriteTimeoutPolicy.Close"/>, a write timeout is expected to return
    /// <c>true</c> and set both the marked-closed and skip-flush-on-close flags.
    /// </summary>
    [Fact]
    public void HandleWriteTimeout_ClosePolicy_ShouldMarkClosed()
    {
        var c = new ClientConnection(ClientKind.Client)
        {
            OutWtp = WriteTimeoutPolicy.Close,
        };

        c.HandleWriteTimeout(0, 100, 1).ShouldBeTrue();

        c.Flags.IsSet(ClientFlags.ConnMarkedClosed).ShouldBeTrue();
        c.Flags.IsSet(ClientFlags.SkipFlushOnClose).ShouldBeTrue();
    }

    /// <summary>
    /// Under <see cref="WriteTimeoutPolicy.Retry"/>, a write timeout is expected to return
    /// <c>false</c> and set the slow-consumer flag.
    /// </summary>
    [Fact]
    public void HandleWriteTimeout_RetryPolicy_ShouldSetSlowConsumerFlag()
    {
        var c = new ClientConnection(ClientKind.Client)
        {
            OutWtp = WriteTimeoutPolicy.Retry,
        };

        c.HandleWriteTimeout(1, 100, 2).ShouldBeFalse();

        c.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeTrue();
    }

    /// <summary>
    /// Feeds PUB and header-PUB argument lines to the connection and checks the subject,
    /// header size and total size recorded in the parse context.
    /// </summary>
    [Fact]
    public void ProcessPubAndHeaderPubWrappers_ShouldPopulateParseContext()
    {
        var c = new ClientConnection(ClientKind.Client)
        {
            Headers = true,
        };

        // Plain publish: "<subject> <size>".
        c.ProcessPub(Encoding.ASCII.GetBytes("foo 5")).ShouldBeNull();
        Encoding.ASCII.GetString(c.ParseCtx.Pa.Subject!).ShouldBe("foo");
        c.ParseCtx.Pa.Size.ShouldBe(5);

        // Header publish: "<subject> <header size> <total size>".
        c.ProcessHeaderPub(Encoding.ASCII.GetBytes("foo 3 5"), null).ShouldBeNull();
        Encoding.ASCII.GetString(c.ParseCtx.Pa.Subject!).ShouldBe("foo");
        c.ParseCtx.Pa.HeaderSize.ShouldBe(3);
        c.ParseCtx.Pa.Size.ShouldBe(5);
    }

    /// <summary>
    /// Checks that <c>SplitArg</c> tokenizes a CRLF-terminated argument line, and that
    /// <c>ParseSub</c> and <c>ProcessSubEx</c> each add one subscription to the connection.
    /// </summary>
    [Fact]
    public void SplitArgParseSubAndProcessSub_ShouldCreateSubscriptions()
    {
        // The trailing CRLF must not appear in any of the three tokens.
        var tokens = ClientConnection.SplitArg(Encoding.ASCII.GetBytes("foo queue sid\r\n"));
        tokens.Count.ShouldBe(3);
        Encoding.ASCII.GetString(tokens[0]).ShouldBe("foo");
        Encoding.ASCII.GetString(tokens[1]).ShouldBe("queue");
        Encoding.ASCII.GetString(tokens[2]).ShouldBe("sid");

        var c = new ClientConnection(ClientKind.Client);

        c.ParseSub(Encoding.ASCII.GetBytes("foo queue sid"), noForward: true).ShouldBeNull();
        c.Subs.Count.ShouldBe(1);

        var result = c.ProcessSubEx(
            Encoding.ASCII.GetBytes("bar"),
            null,
            Encoding.ASCII.GetBytes("sid2"),
            noForward: false,
            si: false,
            rsi: false);

        result.err.ShouldBeNull();
        result.sub.ShouldNotBeNull();
        c.Subs.Count.ShouldBe(2);
    }
}