using System.Data;
using JdeScoping.DataSync.Etl.Contracts;
using ProtoBuf.Data;
using ZstdSharp;

namespace JdeScoping.DataSync.Dev.Sources;

/// <summary>
/// Import source that reads from a zstd-compressed protobuf file.
/// Uses protobuf-net-data for IDataReader deserialization.
/// </summary>
/// <remarks>
/// The read pipeline is: file stream, then zstd decompression stream, then
/// buffered stream, then the protobuf-net-data reader. All four are owned by
/// this instance and released by <see cref="DisposeAsync"/>.
/// </remarks>
public sealed class ProtobufZstdFileSource : IImportSource
{
    private const int FileBufferSize = 256 * 1024;       // 256 KB
    private const int DecompressBufferSize = 256 * 1024; // 256 KB

    private readonly string _filePath;
    private FileStream? _fileStream;
    private DecompressionStream? _decompressionStream;
    private BufferedStream? _bufferedStream;
    private IDataReader? _reader;

    /// <summary>
    /// Display name of this source: the literal prefix <c>Protobuf:</c> followed by the file name.
    /// </summary>
    public string SourceName => $"Protobuf:{Path.GetFileName(_filePath)}";

    /// <summary>
    /// Creates a source for the given file. The file is not opened here; it only has to exist.
    /// </summary>
    /// <param name="filePath">Path of the zstd-compressed protobuf file to read.</param>
    /// <exception cref="ArgumentException"><paramref name="filePath"/> is null, empty or whitespace.</exception>
    /// <exception cref="FileNotFoundException">No file exists at <paramref name="filePath"/>.</exception>
    public ProtobufZstdFileSource(string filePath)
    {
        if (string.IsNullOrWhiteSpace(filePath))
        {
            throw new ArgumentException("File path cannot be null or empty.", nameof(filePath));
        }

        if (!File.Exists(filePath))
        {
            throw new FileNotFoundException($"Cache file not found: {filePath}", filePath);
        }

        _filePath = filePath;
    }

    /// <summary>
    /// Opens the file, builds the decompression pipeline and returns a task wrapping the data reader.
    /// </summary>
    /// <param name="cancellationToken">
    /// Checked once before any resource is opened; if it is already cancelled a cancelled task is returned.
    /// </param>
    /// <returns>A completed task wrapping the <see cref="IDataReader"/> over the file contents.</returns>
    /// <exception cref="InvalidOperationException">The file stream of a previous call is still held.</exception>
    // NOTE(review): the declared return type is the non-generic Task although the returned
    // task carries the reader - confirm the signature against IImportSource.
    public Task ReadDataAsync(CancellationToken cancellationToken = default)
    {
        if (_fileStream is not null)
        {
            throw new InvalidOperationException("ReadDataAsync has already been called. Dispose and create a new source to read again.");
        }

        // Honour a cancellation that was requested before any handle is opened.
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled<IDataReader>(cancellationToken);
        }

        try
        {
            _fileStream = new FileStream(
                _filePath,
                FileMode.Open,
                FileAccess.Read,
                FileShare.Read,
                bufferSize: FileBufferSize,
                FileOptions.SequentialScan);

            _decompressionStream = new DecompressionStream(_fileStream);
            _bufferedStream = new BufferedStream(_decompressionStream, DecompressBufferSize);

            // protobuf-net-data returns IDataReader directly
            IDataReader reader = DataSerializer.Deserialize(_bufferedStream);
            _reader = reader;

            return Task.FromResult(reader);
        }
        catch
        {
            // Release whatever part of the pipeline was built before the failure.
            Cleanup();
            throw;
        }
    }

    /// <summary>
    /// Hands over every held resource and resets the fields to null, so that a failure
    /// while disposing one resource cannot leave stale references behind.
    /// </summary>
    private (IDataReader? Reader, BufferedStream? Buffered, DecompressionStream? Decompression, FileStream? File) DetachResources()
    {
        var detached = (_reader, _bufferedStream, _decompressionStream, _fileStream);

        _reader = null;
        _bufferedStream = null;
        _decompressionStream = null;
        _fileStream = null;

        return detached;
    }

    /// <summary>
    /// Synchronously disposes reader and streams, outermost first. Each step runs in a
    /// <c>finally</c> so the file handle is released even if an earlier disposal throws.
    /// </summary>
    private void Cleanup()
    {
        var (reader, buffered, decompression, file) = DetachResources();

        try
        {
            reader?.Dispose();
        }
        finally
        {
            try
            {
                buffered?.Dispose();
            }
            finally
            {
                try
                {
                    decompression?.Dispose();
                }
                finally
                {
                    file?.Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Asynchronously disposes reader and streams, outermost first. Each step runs in a
    /// <c>finally</c> so the file handle is released even if an earlier disposal throws.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        var (reader, buffered, decompression, file) = DetachResources();

        try
        {
            reader?.Dispose();
        }
        finally
        {
            try
            {
                if (buffered is not null)
                {
                    await buffered.DisposeAsync();
                }
            }
            finally
            {
                try
                {
                    if (decompression is not null)
                    {
                        await decompression.DisposeAsync();
                    }
                }
                finally
                {
                    if (file is not null)
                    {
                        await file.DisposeAsync();
                    }
                }
            }
        }
    }
}