using System.Data;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Models;
using ZstdSharp;

namespace JdeScoping.DataSync.Etl.Sources;

/// <summary>
/// Import source that reads from a zstd-compressed JSON array file.
/// </summary>
/// <remarks>
/// The source owns the file stream, the decompression stream and the reader it
/// creates in <see cref="ReadDataAsync"/>; all three are released by
/// <see cref="DisposeAsync"/>.
/// </remarks>
public sealed class JsonZstdFileSource : IImportSource
{
    // Buffer size, in bytes, handed to the underlying FileStream.
    private const int FileBufferSize = 65536;

    private readonly string _filePath;
    private readonly JsonColumnSchema[] _schema;
    private FileStream? _fileStream;
    private DecompressionStream? _decompressionStream;
    private JsonStreamingDataReader? _reader;

    /// <summary>
    /// Display name of this source: the literal prefix <c>JsonZstd:</c> followed by
    /// the file name (without directory) of the configured path.
    /// </summary>
    public string SourceName => $"JsonZstd:{Path.GetFileName(_filePath)}";

    /// <summary>
    /// Creates a source bound to an existing file and the column schema used to read it.
    /// </summary>
    /// <param name="filePath">Path of the compressed file; must exist at construction time.</param>
    /// <param name="schema">Column schema passed to the streaming reader.</param>
    /// <exception cref="ArgumentException"><paramref name="filePath"/> is null, empty or whitespace.</exception>
    /// <exception cref="FileNotFoundException"><paramref name="filePath"/> does not exist.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="schema"/> is null.</exception>
    public JsonZstdFileSource(string filePath, JsonColumnSchema[] schema)
    {
        if (string.IsNullOrWhiteSpace(filePath))
            throw new ArgumentException("File path cannot be null or empty.", nameof(filePath));

        if (!File.Exists(filePath))
            throw new FileNotFoundException($"Cache file not found: {filePath}", filePath);

        _filePath = filePath;
        _schema = schema ?? throw new ArgumentNullException(nameof(schema));
    }

    /// <summary>
    /// Opens the file, wraps it in a decompression stream and creates the streaming reader.
    /// </summary>
    /// <param name="cancellationToken">Checked once, before the file is opened.</param>
    /// <returns>An already-completed task wrapping the newly created reader.</returns>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> is already cancelled.</exception>
    /// <exception cref="InvalidOperationException">
    /// The source is already open; dispose it before reading again.
    /// </exception>
    // NOTE(review): the declared return type is the non-generic Task, so awaiting it does
    // not yield the reader — confirm this matches the IImportSource contract.
    public Task ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // A second call would overwrite the tracked streams and leak the first set,
        // because DisposeAsync only knows about the most recently assigned fields.
        if (_fileStream is not null)
            throw new InvalidOperationException(
                "ReadDataAsync has already been called; dispose this source before reading again.");

        FileStream? fileStream = null;
        DecompressionStream? decompressionStream = null;
        try
        {
            fileStream = new FileStream(_filePath, FileMode.Open, FileAccess.Read, FileShare.Read,
                bufferSize: FileBufferSize, useAsync: true);
            decompressionStream = new DecompressionStream(fileStream);
            var reader = new JsonStreamingDataReader(decompressionStream, _schema);

            // Publish to the fields only once every resource exists, so a failure
            // part-way through never leaves the instance half-initialised.
            _fileStream = fileStream;
            _decompressionStream = decompressionStream;
            _reader = reader;

            return Task.FromResult(reader);
        }
        catch
        {
            // Release whatever was opened before the failure, then surface the original error.
            decompressionStream?.Dispose();
            fileStream?.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Releases the reader, the decompression stream and the file stream, in that order.
    /// Safe to call when nothing has been opened and safe to call more than once.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Detach the resources first so a repeated call (or a failure below) never
        // attempts to dispose the same instances through this object again.
        var reader = _reader;
        var decompressionStream = _decompressionStream;
        var fileStream = _fileStream;
        _reader = null;
        _decompressionStream = null;
        _fileStream = null;

        // Nested try/finally: a failure while disposing an outer layer must not
        // prevent the inner streams (and the file handle) from being released.
        try
        {
            reader?.Dispose();
        }
        finally
        {
            try
            {
                if (decompressionStream is not null)
                    await decompressionStream.DisposeAsync().ConfigureAwait(false);
            }
            finally
            {
                if (fileStream is not null)
                    await fileStream.DisposeAsync().ConfigureAwait(false);
            }
        }
    }
}