feat: add S2 compression for system events (Gap 10.9)
Add EventCompressor static class with Snappy/S2 compress/decompress, threshold-based ShouldCompress, CompressIfBeneficial with stats tracking, and GetCompressionRatio helpers. Port 10 tests covering round-trip, threshold logic, stats (TotalCompressed, BytesSaved, ResetStats). Go reference: server/events.go:2082-2098 compressionType / snappyCompression.
This commit is contained in:
138
src/NATS.Server/Events/EventCompressor.cs
Normal file
138
src/NATS.Server/Events/EventCompressor.cs
Normal file
@@ -0,0 +1,138 @@
|
||||
// Go reference: server/events.go:2081-2090 — compressionType, snappyCompression,
|
||||
// and events.go:578-598 — internalSendLoop compression via s2.WriterSnappyCompat().
|
||||
|
||||
using IronSnappy;
|
||||
|
||||
namespace NATS.Server.Events;
|
||||
|
||||
/// <summary>
|
||||
/// Provides S2 (Snappy-compatible) compression for system event payloads.
|
||||
/// Maps to Go's compressionType / snappyCompression handling in events.go:2082-2098
|
||||
/// and the compression branch in the internalSendLoop (events.go:578-598).
|
||||
/// </summary>
|
||||
public static class EventCompressor
|
||||
{
|
||||
// Default threshold: only compress payloads larger than this many bytes.
|
||||
// Compressing tiny payloads wastes CPU and may produce larger output.
|
||||
private const int DefaultThresholdBytes = 256;
|
||||
|
||||
private static long _totalCompressed;
|
||||
private static long _totalUncompressed;
|
||||
private static long _bytesSaved;
|
||||
|
||||
/// <summary>
|
||||
/// Total number of compress operations performed (payloads that met the
|
||||
/// threshold and were actually compressed). Thread-safe via <see cref="Interlocked"/>.
|
||||
/// </summary>
|
||||
public static long TotalCompressed => Interlocked.Read(ref _totalCompressed);
|
||||
|
||||
/// <summary>
|
||||
/// Total number of payloads that were left uncompressed (below threshold or
|
||||
/// <see cref="CompressIfBeneficial"/> returned the original). Thread-safe.
|
||||
/// </summary>
|
||||
public static long TotalUncompressed => Interlocked.Read(ref _totalUncompressed);
|
||||
|
||||
/// <summary>
|
||||
/// Cumulative bytes saved across all compression operations.
|
||||
/// Computed as (original size − compressed size) for each call to
|
||||
/// <see cref="CompressIfBeneficial"/> that chose to compress. Thread-safe.
|
||||
/// </summary>
|
||||
public static long BytesSaved => Interlocked.Read(ref _bytesSaved);
|
||||
|
||||
/// <summary>
|
||||
/// Resets all statistics counters to zero. Useful in test isolation.
|
||||
/// </summary>
|
||||
public static void ResetStats()
|
||||
{
|
||||
Interlocked.Exchange(ref _totalCompressed, 0);
|
||||
Interlocked.Exchange(ref _totalUncompressed, 0);
|
||||
Interlocked.Exchange(ref _bytesSaved, 0);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Compresses <paramref name="payload"/> using the Snappy block format via IronSnappy.
|
||||
/// Go reference: events.go:591 — <c>s2.WriterSnappyCompat()</c> writes Snappy-compatible frames.
|
||||
/// </summary>
|
||||
/// <param name="payload">Raw bytes to compress.</param>
|
||||
/// <returns>Compressed bytes. Returns an empty array for empty input.</returns>
|
||||
public static byte[] Compress(ReadOnlySpan<byte> payload)
|
||||
{
|
||||
if (payload.IsEmpty)
|
||||
return [];
|
||||
|
||||
return Snappy.Encode(payload);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decompresses a Snappy-compressed <paramref name="compressed"/> payload.
|
||||
/// Go reference: events.go:590 — <c>s2.NewReader</c> / Snappy-compat decode.
|
||||
/// </summary>
|
||||
/// <param name="compressed">Snappy-compressed bytes to decode.</param>
|
||||
/// <returns>Decompressed bytes. Returns an empty array for empty input.</returns>
|
||||
/// <exception cref="Exception">Propagated from IronSnappy if data is corrupt.</exception>
|
||||
public static byte[] Decompress(ReadOnlySpan<byte> compressed)
|
||||
{
|
||||
if (compressed.IsEmpty)
|
||||
return [];
|
||||
|
||||
return Snappy.Decode(compressed);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns <see langword="true"/> when <paramref name="payloadSize"/> exceeds
|
||||
/// <paramref name="thresholdBytes"/>. Mirrors the Go pattern where compression
|
||||
/// is only applied once a minimum payload size is reached (events.go:578).
|
||||
/// </summary>
|
||||
/// <param name="payloadSize">Number of bytes in the raw payload.</param>
|
||||
/// <param name="thresholdBytes">Minimum size to trigger compression (default 256).</param>
|
||||
public static bool ShouldCompress(int payloadSize, int thresholdBytes = DefaultThresholdBytes)
|
||||
=> payloadSize > thresholdBytes;
|
||||
|
||||
/// <summary>
|
||||
/// Compresses <paramref name="payload"/> if it exceeds <paramref name="thresholdBytes"/>,
|
||||
/// updating the shared statistics counters via <see cref="Interlocked"/>.
|
||||
/// Returns the original span as a new array when the payload is too small.
|
||||
/// </summary>
|
||||
/// <param name="payload">Raw bytes to (maybe) compress.</param>
|
||||
/// <param name="thresholdBytes">Minimum size to trigger compression (default 256).</param>
|
||||
/// <returns>
|
||||
/// A tuple of (<c>Data</c>, <c>Compressed</c>):
|
||||
/// <list type="bullet">
|
||||
/// <item><c>Compressed = true</c> — <c>Data</c> is Snappy-compressed.</item>
|
||||
/// <item><c>Compressed = false</c> — <c>Data</c> is the original bytes unchanged.</item>
|
||||
/// </list>
|
||||
/// </returns>
|
||||
public static (byte[] Data, bool Compressed) CompressIfBeneficial(
|
||||
ReadOnlySpan<byte> payload,
|
||||
int thresholdBytes = DefaultThresholdBytes)
|
||||
{
|
||||
if (!ShouldCompress(payload.Length, thresholdBytes))
|
||||
{
|
||||
Interlocked.Increment(ref _totalUncompressed);
|
||||
return (payload.ToArray(), false);
|
||||
}
|
||||
|
||||
var compressed = Compress(payload);
|
||||
Interlocked.Increment(ref _totalCompressed);
|
||||
var saved = payload.Length - compressed.Length;
|
||||
if (saved > 0)
|
||||
Interlocked.Add(ref _bytesSaved, saved);
|
||||
|
||||
return (compressed, true);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the compression ratio as <c>compressedSize / originalSize</c>.
|
||||
/// A value less than 1.0 indicates the data shrank; exactly 0.0 means the
|
||||
/// original was empty. Lower is better.
|
||||
/// </summary>
|
||||
/// <param name="originalSize">Size of the uncompressed payload in bytes.</param>
|
||||
/// <param name="compressedSize">Size of the compressed output in bytes.</param>
|
||||
public static double GetCompressionRatio(int originalSize, int compressedSize)
|
||||
{
|
||||
if (originalSize == 0)
|
||||
return 0.0;
|
||||
|
||||
return (double)compressedSize / originalSize;
|
||||
}
|
||||
}
|
||||
171
tests/NATS.Server.Tests/Events/EventCompressionTests.cs
Normal file
171
tests/NATS.Server.Tests/Events/EventCompressionTests.cs
Normal file
@@ -0,0 +1,171 @@
|
||||
// Go reference: server/events.go:2082-2090 — compressionType / snappyCompression,
|
||||
// and events.go:578-598 — internalSendLoop optional compression branch.
|
||||
|
||||
using System.Text;
|
||||
using NATS.Server.Events;
|
||||
|
||||
namespace NATS.Server.Tests.Events;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for <see cref="EventCompressor"/> — S2/Snappy compression for system event payloads.
|
||||
/// Go reference: server/events.go — compressed system events via snappyCompression.
|
||||
/// </summary>
|
||||
public class EventCompressionTests : IDisposable
|
||||
{
|
||||
public EventCompressionTests()
|
||||
{
|
||||
// Ensure a clean statistics baseline for every test.
|
||||
EventCompressor.ResetStats();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
EventCompressor.ResetStats();
|
||||
}
|
||||
|
||||
// ── 1 ──────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void Compress_ValidPayload_ReturnsCompressed()
|
||||
{
|
||||
// Arrange
|
||||
var json = """{"server":{"name":"s1","id":"ABCDEF"},"data":{"conns":42,"bytes":1024}}""";
|
||||
var payload = Encoding.UTF8.GetBytes(json);
|
||||
|
||||
// Act
|
||||
var compressed = EventCompressor.Compress(payload);
|
||||
|
||||
// Assert
|
||||
compressed.ShouldNotBeNull();
|
||||
compressed.Length.ShouldBeGreaterThan(0);
|
||||
// Snappy output begins with a varint for the original length — not the same raw bytes.
|
||||
compressed.ShouldNotBe(payload);
|
||||
}
|
||||
|
||||
// ── 2 ──────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void Decompress_RoundTrip_MatchesOriginal()
|
||||
{
|
||||
// Arrange
|
||||
var original = Encoding.UTF8.GetBytes(
|
||||
"""{"server":{"name":"test","id":"XYZ"},"stats":{"cpu":0.5,"mem":1048576}}""");
|
||||
|
||||
// Act
|
||||
var compressed = EventCompressor.Compress(original);
|
||||
var decompressed = EventCompressor.Decompress(compressed);
|
||||
|
||||
// Assert
|
||||
decompressed.ShouldBe(original);
|
||||
}
|
||||
|
||||
// ── 3 ──────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void ShouldCompress_BelowThreshold_ReturnsFalse()
|
||||
{
|
||||
// 100 bytes is well below the default 256-byte threshold.
|
||||
EventCompressor.ShouldCompress(100).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// ── 4 ──────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void ShouldCompress_AboveThreshold_ReturnsTrue()
|
||||
{
|
||||
// 500 bytes exceeds the default 256-byte threshold.
|
||||
EventCompressor.ShouldCompress(500).ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ── 5 ──────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void CompressIfBeneficial_SmallPayload_NotCompressed()
|
||||
{
|
||||
// Arrange — 50 bytes is below the 256-byte threshold.
|
||||
var payload = Encoding.UTF8.GetBytes("small");
|
||||
|
||||
// Act
|
||||
var (data, compressed) = EventCompressor.CompressIfBeneficial(payload);
|
||||
|
||||
// Assert
|
||||
compressed.ShouldBeFalse();
|
||||
data.ShouldBe(payload);
|
||||
}
|
||||
|
||||
// ── 6 ──────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void CompressIfBeneficial_LargePayload_Compressed()
|
||||
{
|
||||
// Arrange — build a payload well above the 256-byte threshold.
|
||||
var largeJson = """{"server":{"name":"s1"},"data":""" + new string('x', 500) + "}";
|
||||
var payload = Encoding.UTF8.GetBytes(largeJson);
|
||||
payload.Length.ShouldBeGreaterThan(256);
|
||||
|
||||
// Act
|
||||
var (data, isCompressed) = EventCompressor.CompressIfBeneficial(payload);
|
||||
|
||||
// Assert
|
||||
isCompressed.ShouldBeTrue();
|
||||
// The returned bytes should decompress back to the original.
|
||||
var restored = EventCompressor.Decompress(data);
|
||||
restored.ShouldBe(payload);
|
||||
}
|
||||
|
||||
// ── 7 ──────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void GetCompressionRatio_Calculates()
|
||||
{
|
||||
// 100 / 200 = 0.5
|
||||
var ratio = EventCompressor.GetCompressionRatio(originalSize: 200, compressedSize: 100);
|
||||
|
||||
ratio.ShouldBe(0.5, tolerance: 0.001);
|
||||
}
|
||||
|
||||
// ── 8 ──────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void TotalCompressed_IncrementedOnCompress()
|
||||
{
|
||||
// Arrange — stats were reset in constructor.
|
||||
EventCompressor.TotalCompressed.ShouldBe(0L);
|
||||
|
||||
var largePayload = Encoding.UTF8.GetBytes(new string('a', 512));
|
||||
|
||||
// Act — two calls that exceed threshold.
|
||||
EventCompressor.CompressIfBeneficial(largePayload);
|
||||
EventCompressor.CompressIfBeneficial(largePayload);
|
||||
|
||||
// Assert
|
||||
EventCompressor.TotalCompressed.ShouldBe(2L);
|
||||
}
|
||||
|
||||
// ── 9 ──────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void BytesSaved_TracksCorrectly()
|
||||
{
|
||||
// Arrange
|
||||
// Use a highly-compressible payload so savings are guaranteed.
|
||||
var payload = Encoding.UTF8.GetBytes(new string('z', 1024));
|
||||
|
||||
// Act
|
||||
EventCompressor.CompressIfBeneficial(payload);
|
||||
|
||||
// Assert — compressed version of 1 024 repeated bytes should be much smaller.
|
||||
EventCompressor.BytesSaved.ShouldBeGreaterThan(0L);
|
||||
// BytesSaved = original - compressed; should be less than original size.
|
||||
EventCompressor.BytesSaved.ShouldBeLessThan(payload.Length);
|
||||
}
|
||||
|
||||
// ── 10 ─────────────────────────────────────────────────────────────────────
|
||||
[Fact]
|
||||
public void ResetStats_ClearsAll()
|
||||
{
|
||||
// Arrange — produce some stats first.
|
||||
var largePayload = Encoding.UTF8.GetBytes(new string('b', 512));
|
||||
EventCompressor.CompressIfBeneficial(largePayload);
|
||||
EventCompressor.TotalCompressed.ShouldBeGreaterThan(0L);
|
||||
|
||||
// Act
|
||||
EventCompressor.ResetStats();
|
||||
|
||||
// Assert
|
||||
EventCompressor.TotalCompressed.ShouldBe(0L);
|
||||
EventCompressor.TotalUncompressed.ShouldBe(0L);
|
||||
EventCompressor.BytesSaved.ShouldBe(0L);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user