diff --git a/src/NATS.Server/Events/EventCompressor.cs b/src/NATS.Server/Events/EventCompressor.cs new file mode 100644 index 0000000..8252404 --- /dev/null +++ b/src/NATS.Server/Events/EventCompressor.cs @@ -0,0 +1,138 @@ +// Go reference: server/events.go:2081-2090 — compressionType, snappyCompression, +// and events.go:578-598 — internalSendLoop compression via s2.WriterSnappyCompat(). + +using IronSnappy; + +namespace NATS.Server.Events; + +/// +/// Provides S2 (Snappy-compatible) compression for system event payloads. +/// Maps to Go's compressionType / snappyCompression handling in events.go:2082-2098 +/// and the compression branch in the internalSendLoop (events.go:578-598). +/// +public static class EventCompressor +{ + // Default threshold: only compress payloads larger than this many bytes. + // Compressing tiny payloads wastes CPU and may produce larger output. + private const int DefaultThresholdBytes = 256; + + private static long _totalCompressed; + private static long _totalUncompressed; + private static long _bytesSaved; + + /// + /// Total number of compress operations performed (payloads that met the + /// threshold and were actually compressed). Thread-safe via . + /// + public static long TotalCompressed => Interlocked.Read(ref _totalCompressed); + + /// + /// Total number of payloads that were left uncompressed (below threshold or + /// returned the original). Thread-safe. + /// + public static long TotalUncompressed => Interlocked.Read(ref _totalUncompressed); + + /// + /// Cumulative bytes saved across all compression operations. + /// Computed as (original size − compressed size) for each call to + /// that chose to compress. Thread-safe. + /// + public static long BytesSaved => Interlocked.Read(ref _bytesSaved); + + /// + /// Resets all statistics counters to zero. Useful in test isolation. + /// + public static void ResetStats() + { + Interlocked.Exchange(ref _totalCompressed, 0); + Interlocked.Exchange(ref _totalUncompressed, 0); + Interlocked.Exchange(ref _bytesSaved, 0); + } + + /// + /// Compresses using the Snappy block format via IronSnappy. + /// Go reference: events.go:591 — s2.WriterSnappyCompat() writes Snappy-compatible frames. + /// + /// Raw bytes to compress. + /// Compressed bytes. Returns an empty array for empty input. + public static byte[] Compress(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return []; + + return Snappy.Encode(payload); + } + + /// + /// Decompresses a Snappy-compressed payload. + /// Go reference: events.go:590 — s2.NewReader / Snappy-compat decode. + /// + /// Snappy-compressed bytes to decode. + /// Decompressed bytes. Returns an empty array for empty input. + /// Propagated from IronSnappy if data is corrupt. + public static byte[] Decompress(ReadOnlySpan compressed) + { + if (compressed.IsEmpty) + return []; + + return Snappy.Decode(compressed); + } + + /// + /// Returns when exceeds + /// . Mirrors the Go pattern where compression + /// is only applied once a minimum payload size is reached (events.go:578). + /// + /// Number of bytes in the raw payload. + /// Minimum size to trigger compression (default 256). + public static bool ShouldCompress(int payloadSize, int thresholdBytes = DefaultThresholdBytes) + => payloadSize > thresholdBytes; + + /// + /// Compresses if it exceeds , + /// updating the shared statistics counters via . + /// Returns the original span as a new array when the payload is too small. + /// + /// Raw bytes to (maybe) compress. + /// Minimum size to trigger compression (default 256). + /// + /// A tuple of (Data, Compressed): + /// + /// Compressed = trueData is Snappy-compressed. + /// Compressed = falseData is the original bytes unchanged. + /// + /// + public static (byte[] Data, bool Compressed) CompressIfBeneficial( + ReadOnlySpan payload, + int thresholdBytes = DefaultThresholdBytes) + { + if (!ShouldCompress(payload.Length, thresholdBytes)) + { + Interlocked.Increment(ref _totalUncompressed); + return (payload.ToArray(), false); + } + + var compressed = Compress(payload); + Interlocked.Increment(ref _totalCompressed); + var saved = payload.Length - compressed.Length; + if (saved > 0) + Interlocked.Add(ref _bytesSaved, saved); + + return (compressed, true); + } + + /// + /// Returns the compression ratio as compressedSize / originalSize. + /// A value less than 1.0 indicates the data shrank; exactly 0.0 means the + /// original was empty. Lower is better. + /// + /// Size of the uncompressed payload in bytes. + /// Size of the compressed output in bytes. + public static double GetCompressionRatio(int originalSize, int compressedSize) + { + if (originalSize == 0) + return 0.0; + + return (double)compressedSize / originalSize; + } +} diff --git a/tests/NATS.Server.Tests/Events/EventCompressionTests.cs b/tests/NATS.Server.Tests/Events/EventCompressionTests.cs new file mode 100644 index 0000000..e6256e0 --- /dev/null +++ b/tests/NATS.Server.Tests/Events/EventCompressionTests.cs @@ -0,0 +1,171 @@ +// Go reference: server/events.go:2082-2090 — compressionType / snappyCompression, +// and events.go:578-598 — internalSendLoop optional compression branch. + +using System.Text; +using NATS.Server.Events; + +namespace NATS.Server.Tests.Events; + +/// +/// Tests for — S2/Snappy compression for system event payloads. +/// Go reference: server/events.go — compressed system events via snappyCompression. +/// +public class EventCompressionTests : IDisposable +{ + public EventCompressionTests() + { + // Ensure a clean statistics baseline for every test. + EventCompressor.ResetStats(); + } + + public void Dispose() + { + EventCompressor.ResetStats(); + } + + // ── 1 ────────────────────────────────────────────────────────────────────── + [Fact] + public void Compress_ValidPayload_ReturnsCompressed() + { + // Arrange + var json = """{"server":{"name":"s1","id":"ABCDEF"},"data":{"conns":42,"bytes":1024}}"""; + var payload = Encoding.UTF8.GetBytes(json); + + // Act + var compressed = EventCompressor.Compress(payload); + + // Assert + compressed.ShouldNotBeNull(); + compressed.Length.ShouldBeGreaterThan(0); + // Snappy output begins with a varint for the original length — not the same raw bytes. + compressed.ShouldNotBe(payload); + } + + // ── 2 ────────────────────────────────────────────────────────────────────── + [Fact] + public void Decompress_RoundTrip_MatchesOriginal() + { + // Arrange + var original = Encoding.UTF8.GetBytes( + """{"server":{"name":"test","id":"XYZ"},"stats":{"cpu":0.5,"mem":1048576}}"""); + + // Act + var compressed = EventCompressor.Compress(original); + var decompressed = EventCompressor.Decompress(compressed); + + // Assert + decompressed.ShouldBe(original); + } + + // ── 3 ────────────────────────────────────────────────────────────────────── + [Fact] + public void ShouldCompress_BelowThreshold_ReturnsFalse() + { + // 100 bytes is well below the default 256-byte threshold. + EventCompressor.ShouldCompress(100).ShouldBeFalse(); + } + + // ── 4 ────────────────────────────────────────────────────────────────────── + [Fact] + public void ShouldCompress_AboveThreshold_ReturnsTrue() + { + // 500 bytes exceeds the default 256-byte threshold. + EventCompressor.ShouldCompress(500).ShouldBeTrue(); + } + + // ── 5 ────────────────────────────────────────────────────────────────────── + [Fact] + public void CompressIfBeneficial_SmallPayload_NotCompressed() + { + // Arrange — 50 bytes is below the 256-byte threshold. + var payload = Encoding.UTF8.GetBytes("small"); + + // Act + var (data, compressed) = EventCompressor.CompressIfBeneficial(payload); + + // Assert + compressed.ShouldBeFalse(); + data.ShouldBe(payload); + } + + // ── 6 ────────────────────────────────────────────────────────────────────── + [Fact] + public void CompressIfBeneficial_LargePayload_Compressed() + { + // Arrange — build a payload well above the 256-byte threshold. + var largeJson = """{"server":{"name":"s1"},"data":""" + new string('x', 500) + "}"; + var payload = Encoding.UTF8.GetBytes(largeJson); + payload.Length.ShouldBeGreaterThan(256); + + // Act + var (data, isCompressed) = EventCompressor.CompressIfBeneficial(payload); + + // Assert + isCompressed.ShouldBeTrue(); + // The returned bytes should decompress back to the original. + var restored = EventCompressor.Decompress(data); + restored.ShouldBe(payload); + } + + // ── 7 ────────────────────────────────────────────────────────────────────── + [Fact] + public void GetCompressionRatio_Calculates() + { + // 100 / 200 = 0.5 + var ratio = EventCompressor.GetCompressionRatio(originalSize: 200, compressedSize: 100); + + ratio.ShouldBe(0.5, tolerance: 0.001); + } + + // ── 8 ────────────────────────────────────────────────────────────────────── + [Fact] + public void TotalCompressed_IncrementedOnCompress() + { + // Arrange — stats were reset in constructor. + EventCompressor.TotalCompressed.ShouldBe(0L); + + var largePayload = Encoding.UTF8.GetBytes(new string('a', 512)); + + // Act — two calls that exceed threshold. + EventCompressor.CompressIfBeneficial(largePayload); + EventCompressor.CompressIfBeneficial(largePayload); + + // Assert + EventCompressor.TotalCompressed.ShouldBe(2L); + } + + // ── 9 ────────────────────────────────────────────────────────────────────── + [Fact] + public void BytesSaved_TracksCorrectly() + { + // Arrange + // Use a highly-compressible payload so savings are guaranteed. + var payload = Encoding.UTF8.GetBytes(new string('z', 1024)); + + // Act + EventCompressor.CompressIfBeneficial(payload); + + // Assert — compressed version of 1 024 repeated bytes should be much smaller. + EventCompressor.BytesSaved.ShouldBeGreaterThan(0L); + // BytesSaved = original - compressed; should be less than original size. + EventCompressor.BytesSaved.ShouldBeLessThan(payload.Length); + } + + // ── 10 ───────────────────────────────────────────────────────────────────── + [Fact] + public void ResetStats_ClearsAll() + { + // Arrange — produce some stats first. + var largePayload = Encoding.UTF8.GetBytes(new string('b', 512)); + EventCompressor.CompressIfBeneficial(largePayload); + EventCompressor.TotalCompressed.ShouldBeGreaterThan(0L); + + // Act + EventCompressor.ResetStats(); + + // Assert + EventCompressor.TotalCompressed.ShouldBe(0L); + EventCompressor.TotalUncompressed.ShouldBe(0L); + EventCompressor.BytesSaved.ShouldBe(0L); + } +}