feat: add S2 compression and AEAD encryption for FileStore (Go parity)
Replace Deflate+XOR with IronSnappy S2 block compression and ChaCha20-Poly1305 / AES-256-GCM AEAD encryption, matching golang/nats-server/server/filestore.go. Introduces FSV2 envelope format alongside existing FSV1 for backward compatibility. Adds 55 new tests across S2CodecTests, AeadEncryptorTests, and FileStoreV2Tests covering all 6 cipher×compression permutations, tamper detection, and legacy format round-trips.
This commit is contained in:
111
src/NATS.Server/JetStream/Storage/S2Codec.cs
Normal file
111
src/NATS.Server/JetStream/Storage/S2Codec.cs
Normal file
@@ -0,0 +1,111 @@
|
||||
// Reference: golang/nats-server/server/filestore.go
|
||||
// Go uses S2 (Snappy variant) compression throughout FileStore:
|
||||
// - msgCompress / msgDecompress (filestore.go ~line 840)
|
||||
// - compressBlock / decompressBlock for block-level data
|
||||
// S2 is faster than Deflate and produces comparable ratios for binary payloads.
|
||||
// IronSnappy provides Snappy-format encode/decode, which is compatible with
|
||||
// the Go snappy package used by the S2 library for block compression.
|
||||
|
||||
using IronSnappy;
|
||||
|
||||
namespace NATS.Server.JetStream.Storage;
|
||||
|
||||
/// <summary>
|
||||
/// S2/Snappy codec for FileStore payload compression, mirroring the Go
|
||||
/// implementation which uses <c>github.com/klauspost/compress/s2</c>.
|
||||
/// </summary>
|
||||
internal static class S2Codec
|
||||
{
|
||||
/// <summary>
|
||||
/// Compresses <paramref name="data"/> using Snappy block format.
|
||||
/// Returns the compressed bytes, which may be longer than the input for
|
||||
/// very small payloads (Snappy does not guarantee compression for tiny inputs).
|
||||
/// </summary>
|
||||
public static byte[] Compress(ReadOnlySpan<byte> data)
|
||||
{
|
||||
if (data.IsEmpty)
|
||||
return [];
|
||||
|
||||
return Snappy.Encode(data);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decompresses Snappy-compressed <paramref name="data"/>.
|
||||
/// </summary>
|
||||
/// <exception cref="InvalidDataException">If the data is not valid Snappy.</exception>
|
||||
public static byte[] Decompress(ReadOnlySpan<byte> data)
|
||||
{
|
||||
if (data.IsEmpty)
|
||||
return [];
|
||||
|
||||
return Snappy.Decode(data);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Compresses only the body portion of <paramref name="data"/>, leaving the
|
||||
/// last <paramref name="checksumSize"/> bytes uncompressed (appended verbatim).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// In the Go FileStore the trailing bytes of a stored record can be a raw
|
||||
/// checksum that is not part of the compressed payload. This helper mirrors
|
||||
/// that separation (filestore.go msgCompress, where the CRC lives outside
|
||||
/// the S2 frame).
|
||||
/// </remarks>
|
||||
public static byte[] CompressWithTrailingChecksum(ReadOnlySpan<byte> data, int checksumSize)
|
||||
{
|
||||
if (checksumSize < 0)
|
||||
throw new ArgumentOutOfRangeException(nameof(checksumSize));
|
||||
|
||||
if (data.IsEmpty)
|
||||
return [];
|
||||
|
||||
if (checksumSize == 0)
|
||||
return Compress(data);
|
||||
|
||||
if (checksumSize >= data.Length)
|
||||
{
|
||||
// Nothing to compress — return a copy as-is (checksum covers everything).
|
||||
return data.ToArray();
|
||||
}
|
||||
|
||||
var body = data[..^checksumSize];
|
||||
var checksum = data[^checksumSize..];
|
||||
|
||||
var compressedBody = Compress(body);
|
||||
var result = new byte[compressedBody.Length + checksumSize];
|
||||
compressedBody.CopyTo(result.AsSpan());
|
||||
checksum.CopyTo(result.AsSpan(compressedBody.Length));
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decompresses only the body portion of <paramref name="data"/>, treating
|
||||
/// the last <paramref name="checksumSize"/> bytes as a raw (uncompressed) checksum.
|
||||
/// </summary>
|
||||
public static byte[] DecompressWithTrailingChecksum(ReadOnlySpan<byte> data, int checksumSize)
|
||||
{
|
||||
if (checksumSize < 0)
|
||||
throw new ArgumentOutOfRangeException(nameof(checksumSize));
|
||||
|
||||
if (data.IsEmpty)
|
||||
return [];
|
||||
|
||||
if (checksumSize == 0)
|
||||
return Decompress(data);
|
||||
|
||||
if (checksumSize >= data.Length)
|
||||
{
|
||||
// Nothing was compressed — return a copy as-is.
|
||||
return data.ToArray();
|
||||
}
|
||||
|
||||
var compressedBody = data[..^checksumSize];
|
||||
var checksum = data[^checksumSize..];
|
||||
|
||||
var decompressedBody = Decompress(compressedBody);
|
||||
var result = new byte[decompressedBody.Length + checksumSize];
|
||||
decompressedBody.CopyTo(result.AsSpan());
|
||||
checksum.CopyTo(result.AsSpan(decompressedBody.Length));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user