feat: add ProtobufZstdFileSource for reading protobuf cache files

This commit is contained in:
Joseph Doherty
2026-01-06 16:39:35 -05:00
parent d503fec7cc
commit 055406431d
@@ -0,0 +1,104 @@
using System.Data;
using JdeScoping.DataSync.Etl.Contracts;
using ProtoBuf.Data;
using ZstdSharp;
namespace JdeScoping.DataSync.Dev.Sources;
/// <summary>
/// Import source that reads from a zstd-compressed protobuf file.
/// Uses protobuf-net-data for IDataReader deserialization.
/// </summary>
public sealed class ProtobufZstdFileSource : IImportSource
{
private const int FileBufferSize = 256 * 1024; // 256 KB
private const int DecompressBufferSize = 256 * 1024; // 256 KB
private readonly string _filePath;
private FileStream? _fileStream;
private DecompressionStream? _decompressionStream;
private BufferedStream? _bufferedStream;
private IDataReader? _reader;
public string SourceName => $"Protobuf:{Path.GetFileName(_filePath)}";
public ProtobufZstdFileSource(string filePath)
{
if (string.IsNullOrWhiteSpace(filePath))
throw new ArgumentException("File path cannot be null or empty.", nameof(filePath));
if (!File.Exists(filePath))
throw new FileNotFoundException($"Cache file not found: {filePath}", filePath);
_filePath = filePath;
}
public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
{
if (_fileStream != null)
throw new InvalidOperationException("ReadDataAsync has already been called. Dispose and create a new source to read again.");
try
{
_fileStream = new FileStream(
_filePath,
FileMode.Open,
FileAccess.Read,
FileShare.Read,
bufferSize: FileBufferSize,
FileOptions.SequentialScan);
_decompressionStream = new DecompressionStream(_fileStream);
_bufferedStream = new BufferedStream(_decompressionStream, DecompressBufferSize);
// protobuf-net-data returns IDataReader directly
_reader = DataSerializer.Deserialize(_bufferedStream);
return Task.FromResult(_reader);
}
catch
{
Cleanup();
throw;
}
}
private void Cleanup()
{
_reader?.Dispose();
_bufferedStream?.Dispose();
_decompressionStream?.Dispose();
_fileStream?.Dispose();
_reader = null;
_bufferedStream = null;
_decompressionStream = null;
_fileStream = null;
}
public async ValueTask DisposeAsync()
{
if (_reader != null)
{
_reader.Dispose();
_reader = null;
}
if (_bufferedStream != null)
{
await _bufferedStream.DisposeAsync();
_bufferedStream = null;
}
if (_decompressionStream != null)
{
await _decompressionStream.DisposeAsync();
_decompressionStream = null;
}
if (_fileStream != null)
{
await _fileStream.DisposeAsync();
_fileStream = null;
}
}
}