feat: add WebSocket permessage-deflate compression
Implement WsCompression with Compress/Decompress methods per RFC 7692. Key .NET adaptation: Flush() without Dispose() on DeflateStream to produce the correct sync flush marker that can be stripped and re-appended.
This commit is contained in:
89
src/NATS.Server/WebSocket/WsCompression.cs
Normal file
89
src/NATS.Server/WebSocket/WsCompression.cs
Normal file
@@ -0,0 +1,89 @@
|
||||
using System.IO.Compression;
|
||||
|
||||
namespace NATS.Server.WebSocket;
|
||||
|
||||
/// <summary>
|
||||
/// permessage-deflate compression/decompression for WebSocket frames (RFC 7692).
|
||||
/// Ported from golang/nats-server/server/websocket.go lines 403-440 and 1391-1466.
|
||||
/// </summary>
|
||||
public static class WsCompression
|
||||
{
|
||||
/// <summary>
|
||||
/// Compresses data using deflate. Removes trailing 4 bytes (sync marker)
|
||||
/// per RFC 7692 Section 7.2.1.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// We call Flush() but intentionally do not Dispose() the DeflateStream before
|
||||
/// reading output, because Dispose writes a final deflate block (0x03 0x00) that
|
||||
/// would be corrupted by the 4-byte tail strip. Flush() alone writes a sync flush
|
||||
/// ending with 0x00 0x00 0xff 0xff, matching Go's flate.Writer.Flush() behavior.
|
||||
/// </remarks>
|
||||
public static byte[] Compress(ReadOnlySpan<byte> data)
|
||||
{
|
||||
var output = new MemoryStream();
|
||||
var deflate = new DeflateStream(output, CompressionLevel.Fastest, leaveOpen: true);
|
||||
deflate.Write(data);
|
||||
deflate.Flush();
|
||||
|
||||
var compressed = output.ToArray();
|
||||
|
||||
deflate.Dispose();
|
||||
output.Dispose();
|
||||
|
||||
// Remove trailing 4-byte sync marker (0x00 0x00 0xff 0xff) per RFC 7692
|
||||
if (compressed.Length >= 4)
|
||||
return compressed[..^4];
|
||||
|
||||
return compressed;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decompresses collected compressed buffers.
|
||||
/// Appends trailer bytes before decompressing per RFC 7692 Section 7.2.2.
|
||||
/// Ported from golang/nats-server/server/websocket.go lines 403-440.
|
||||
/// The Go code appends compressLastBlock (9 bytes) which includes the sync
|
||||
/// marker plus a final empty stored block to signal end-of-stream to the
|
||||
/// flate reader.
|
||||
/// </summary>
|
||||
public static byte[] Decompress(List<byte[]> compressedBuffers, int maxPayload)
|
||||
{
|
||||
if (maxPayload <= 0)
|
||||
maxPayload = 1024 * 1024; // Default 1MB
|
||||
|
||||
// Concatenate all compressed buffers + trailer.
|
||||
// Per RFC 7692 Section 7.2.2, append the sync flush marker (0x00 0x00 0xff 0xff)
|
||||
// that was stripped during compression. The Go reference appends compressLastBlock
|
||||
// (9 bytes) for Go's flate reader; .NET's DeflateStream only needs the 4-byte trailer.
|
||||
int totalLen = 0;
|
||||
foreach (var buf in compressedBuffers)
|
||||
totalLen += buf.Length;
|
||||
totalLen += WsConstants.DecompressTrailer.Length;
|
||||
|
||||
var combined = new byte[totalLen];
|
||||
int offset = 0;
|
||||
foreach (var buf in compressedBuffers)
|
||||
{
|
||||
buf.CopyTo(combined, offset);
|
||||
offset += buf.Length;
|
||||
}
|
||||
|
||||
WsConstants.DecompressTrailer.CopyTo(combined, offset);
|
||||
|
||||
using var input = new MemoryStream(combined);
|
||||
using var deflate = new DeflateStream(input, CompressionMode.Decompress);
|
||||
using var output = new MemoryStream();
|
||||
|
||||
var readBuf = new byte[4096];
|
||||
int totalRead = 0;
|
||||
int n;
|
||||
while ((n = deflate.Read(readBuf, 0, readBuf.Length)) > 0)
|
||||
{
|
||||
totalRead += n;
|
||||
if (totalRead > maxPayload)
|
||||
throw new InvalidOperationException("decompressed data exceeds maximum payload size");
|
||||
output.Write(readBuf, 0, n);
|
||||
}
|
||||
|
||||
return output.ToArray();
|
||||
}
|
||||
}
|
||||
58
tests/NATS.Server.Tests/WebSocket/WsCompressionTests.cs
Normal file
58
tests/NATS.Server.Tests/WebSocket/WsCompressionTests.cs
Normal file
@@ -0,0 +1,58 @@
|
||||
using NATS.Server.WebSocket;
|
||||
using Shouldly;
|
||||
|
||||
namespace NATS.Server.Tests.WebSocket;
|
||||
|
||||
public class WsCompressionTests
|
||||
{
|
||||
[Fact]
|
||||
public void CompressDecompress_RoundTrip()
|
||||
{
|
||||
var original = "Hello, WebSocket compression test! This is long enough to compress."u8.ToArray();
|
||||
var compressed = WsCompression.Compress(original);
|
||||
compressed.ShouldNotBeNull();
|
||||
compressed.Length.ShouldBeGreaterThan(0);
|
||||
|
||||
var decompressed = WsCompression.Decompress([compressed], maxPayload: 4096);
|
||||
decompressed.ShouldBe(original);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Decompress_ExceedsMaxPayload_Throws()
|
||||
{
|
||||
var original = new byte[1000];
|
||||
Random.Shared.NextBytes(original);
|
||||
var compressed = WsCompression.Compress(original);
|
||||
|
||||
Should.Throw<InvalidOperationException>(() =>
|
||||
WsCompression.Decompress([compressed], maxPayload: 100));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Compress_RemovesTrailing4Bytes()
|
||||
{
|
||||
var data = new byte[200];
|
||||
Random.Shared.NextBytes(data);
|
||||
var compressed = WsCompression.Compress(data);
|
||||
|
||||
// The compressed data should be valid for decompression when we add the trailer back
|
||||
var decompressed = WsCompression.Decompress([compressed], maxPayload: 4096);
|
||||
decompressed.ShouldBe(data);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Decompress_MultipleBuffers()
|
||||
{
|
||||
var original = new byte[500];
|
||||
Random.Shared.NextBytes(original);
|
||||
var compressed = WsCompression.Compress(original);
|
||||
|
||||
// Split compressed data into multiple chunks
|
||||
int mid = compressed.Length / 2;
|
||||
var chunk1 = compressed[..mid];
|
||||
var chunk2 = compressed[mid..];
|
||||
|
||||
var decompressed = WsCompression.Decompress([chunk1, chunk2], maxPayload: 4096);
|
||||
decompressed.ShouldBe(original);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user