feat(datasync): add JsonZstdFileSource for reading zstd-compressed JSON files
This commit is contained in:
@@ -0,0 +1,63 @@
|
||||
using System.Data;
|
||||
using JdeScoping.DataSync.Etl.Contracts;
|
||||
using JdeScoping.DataSync.Etl.Models;
|
||||
using ZstdSharp;
|
||||
|
||||
namespace JdeScoping.DataSync.Etl.Sources;
|
||||
|
||||
/// <summary>
|
||||
/// Import source that reads from a zstd-compressed JSON array file.
|
||||
/// </summary>
|
||||
public sealed class JsonZstdFileSource : IImportSource
|
||||
{
|
||||
private readonly string _filePath;
|
||||
private readonly JsonColumnSchema[] _schema;
|
||||
private FileStream? _fileStream;
|
||||
private DecompressionStream? _decompressionStream;
|
||||
private JsonStreamingDataReader? _reader;
|
||||
|
||||
public string SourceName => $"JsonZstd:{Path.GetFileName(_filePath)}";
|
||||
|
||||
public JsonZstdFileSource(string filePath, JsonColumnSchema[] schema)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(filePath))
|
||||
throw new ArgumentException("File path cannot be null or empty.", nameof(filePath));
|
||||
|
||||
if (!File.Exists(filePath))
|
||||
throw new FileNotFoundException($"Cache file not found: {filePath}", filePath);
|
||||
|
||||
_filePath = filePath;
|
||||
_schema = schema ?? throw new ArgumentNullException(nameof(schema));
|
||||
}
|
||||
|
||||
public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
_fileStream = new FileStream(_filePath, FileMode.Open, FileAccess.Read, FileShare.Read,
|
||||
bufferSize: 65536, useAsync: true);
|
||||
_decompressionStream = new DecompressionStream(_fileStream);
|
||||
_reader = new JsonStreamingDataReader(_decompressionStream, _schema);
|
||||
|
||||
return Task.FromResult<IDataReader>(_reader);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_reader != null)
|
||||
{
|
||||
_reader.Dispose();
|
||||
_reader = null;
|
||||
}
|
||||
|
||||
if (_decompressionStream != null)
|
||||
{
|
||||
await _decompressionStream.DisposeAsync();
|
||||
_decompressionStream = null;
|
||||
}
|
||||
|
||||
if (_fileStream != null)
|
||||
{
|
||||
await _fileStream.DisposeAsync();
|
||||
_fileStream = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user