From d49bc5b0d7c421c00269a2167a0ed3cc4c8da551 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 04:42:31 -0500 Subject: [PATCH] feat: add WebSocket permessage-deflate compression Implement WsCompression with Compress/Decompress methods per RFC 7692. Key .NET adaptation: Flush() without Dispose() on DeflateStream to produce the correct sync flush marker that can be stripped and re-appended. --- src/NATS.Server/WebSocket/WsCompression.cs | 89 +++++++++++++++++++ .../WebSocket/WsCompressionTests.cs | 58 ++++++++++++ 2 files changed, 147 insertions(+) create mode 100644 src/NATS.Server/WebSocket/WsCompression.cs create mode 100644 tests/NATS.Server.Tests/WebSocket/WsCompressionTests.cs diff --git a/src/NATS.Server/WebSocket/WsCompression.cs b/src/NATS.Server/WebSocket/WsCompression.cs new file mode 100644 index 0000000..d5b8a5d --- /dev/null +++ b/src/NATS.Server/WebSocket/WsCompression.cs @@ -0,0 +1,89 @@ +using System.IO.Compression; + +namespace NATS.Server.WebSocket; + +/// +/// permessage-deflate compression/decompression for WebSocket frames (RFC 7692). +/// Ported from golang/nats-server/server/websocket.go lines 403-440 and 1391-1466. +/// +public static class WsCompression +{ + /// + /// Compresses data using deflate. Removes trailing 4 bytes (sync marker) + /// per RFC 7692 Section 7.2.1. + /// + /// + /// We call Flush() but intentionally do not Dispose() the DeflateStream before + /// reading output, because Dispose writes a final deflate block (0x03 0x00) that + /// would be corrupted by the 4-byte tail strip. Flush() alone writes a sync flush + /// ending with 0x00 0x00 0xff 0xff, matching Go's flate.Writer.Flush() behavior. + /// + public static byte[] Compress(ReadOnlySpan data) + { + var output = new MemoryStream(); + var deflate = new DeflateStream(output, CompressionLevel.Fastest, leaveOpen: true); + deflate.Write(data); + deflate.Flush(); + + var compressed = output.ToArray(); + + deflate.Dispose(); + output.Dispose(); + + // Remove trailing 4-byte sync marker (0x00 0x00 0xff 0xff) per RFC 7692 + if (compressed.Length >= 4) + return compressed[..^4]; + + return compressed; + } + + /// + /// Decompresses collected compressed buffers. + /// Appends trailer bytes before decompressing per RFC 7692 Section 7.2.2. + /// Ported from golang/nats-server/server/websocket.go lines 403-440. + /// The Go code appends compressLastBlock (9 bytes) which includes the sync + /// marker plus a final empty stored block to signal end-of-stream to the + /// flate reader. + /// + public static byte[] Decompress(List compressedBuffers, int maxPayload) + { + if (maxPayload <= 0) + maxPayload = 1024 * 1024; // Default 1MB + + // Concatenate all compressed buffers + trailer. + // Per RFC 7692 Section 7.2.2, append the sync flush marker (0x00 0x00 0xff 0xff) + // that was stripped during compression. The Go reference appends compressLastBlock + // (9 bytes) for Go's flate reader; .NET's DeflateStream only needs the 4-byte trailer. + int totalLen = 0; + foreach (var buf in compressedBuffers) + totalLen += buf.Length; + totalLen += WsConstants.DecompressTrailer.Length; + + var combined = new byte[totalLen]; + int offset = 0; + foreach (var buf in compressedBuffers) + { + buf.CopyTo(combined, offset); + offset += buf.Length; + } + + WsConstants.DecompressTrailer.CopyTo(combined, offset); + + using var input = new MemoryStream(combined); + using var deflate = new DeflateStream(input, CompressionMode.Decompress); + using var output = new MemoryStream(); + + var readBuf = new byte[4096]; + int totalRead = 0; + int n; + while ((n = deflate.Read(readBuf, 0, readBuf.Length)) > 0) + { + totalRead += n; + if (totalRead > maxPayload) + throw new InvalidOperationException("decompressed data exceeds maximum payload size"); + output.Write(readBuf, 0, n); + } + + return output.ToArray(); + } +} diff --git a/tests/NATS.Server.Tests/WebSocket/WsCompressionTests.cs b/tests/NATS.Server.Tests/WebSocket/WsCompressionTests.cs new file mode 100644 index 0000000..425534c --- /dev/null +++ b/tests/NATS.Server.Tests/WebSocket/WsCompressionTests.cs @@ -0,0 +1,58 @@ +using NATS.Server.WebSocket; +using Shouldly; + +namespace NATS.Server.Tests.WebSocket; + +public class WsCompressionTests +{ + [Fact] + public void CompressDecompress_RoundTrip() + { + var original = "Hello, WebSocket compression test! This is long enough to compress."u8.ToArray(); + var compressed = WsCompression.Compress(original); + compressed.ShouldNotBeNull(); + compressed.Length.ShouldBeGreaterThan(0); + + var decompressed = WsCompression.Decompress([compressed], maxPayload: 4096); + decompressed.ShouldBe(original); + } + + [Fact] + public void Decompress_ExceedsMaxPayload_Throws() + { + var original = new byte[1000]; + Random.Shared.NextBytes(original); + var compressed = WsCompression.Compress(original); + + Should.Throw(() => + WsCompression.Decompress([compressed], maxPayload: 100)); + } + + [Fact] + public void Compress_RemovesTrailing4Bytes() + { + var data = new byte[200]; + Random.Shared.NextBytes(data); + var compressed = WsCompression.Compress(data); + + // The compressed data should be valid for decompression when we add the trailer back + var decompressed = WsCompression.Decompress([compressed], maxPayload: 4096); + decompressed.ShouldBe(data); + } + + [Fact] + public void Decompress_MultipleBuffers() + { + var original = new byte[500]; + Random.Shared.NextBytes(original); + var compressed = WsCompression.Compress(original); + + // Split compressed data into multiple chunks + int mid = compressed.Length / 2; + var chunk1 = compressed[..mid]; + var chunk2 = compressed[mid..]; + + var decompressed = WsCompression.Decompress([chunk1, chunk2], maxPayload: 4096); + decompressed.ShouldBe(original); + } +}