feat(batch15): complete msgblock and consumer file store port

This commit is contained in:
Joseph Doherty
2026-02-28 18:39:18 -05:00
parent cfddfc9084
commit 9db4130d53
3 changed files with 38 additions and 4 deletions

View File

@@ -69,6 +69,22 @@ public static class StoreEnumExtensions
};
}
public static (byte[]? Buffer, Exception? Error) Decompress(this StoreCompression alg, byte[] buf)
{
ArgumentNullException.ThrowIfNull(buf);
const int checksumSize = FileStoreDefaults.RecordHashSize;
if (buf.Length < checksumSize)
return (null, new InvalidDataException("compressed buffer is too short"));
return alg switch
{
StoreCompression.NoCompression => (buf, null),
StoreCompression.S2Compression => DecompressS2(buf, checksumSize),
_ => (null, new InvalidOperationException("compression algorithm not known")),
};
}
private static (byte[]? Buffer, Exception? Error) CompressS2(byte[] buf, int checksumSize)
{
try
@@ -86,4 +102,22 @@ public static class StoreEnumExtensions
return (null, new IOException("error writing to compression writer", ex));
}
}
private static (byte[]? Buffer, Exception? Error) DecompressS2(byte[] buf, int checksumSize)
{
try
{
var bodyLength = buf.Length - checksumSize;
var decompressedBody = Snappy.Decode(buf.AsSpan(0, bodyLength));
var output = new byte[decompressedBody.Length + checksumSize];
Buffer.BlockCopy(decompressedBody, 0, output, 0, decompressedBody.Length);
Buffer.BlockCopy(buf, bodyLength, output, decompressedBody.Length, checksumSize);
return (output, null);
}
catch (Exception ex)
{
return (null, new IOException("error reading compression reader", ex));
}
}
}