fix(datasync): add guards and exception safety to JsonZstdFileSource
This commit is contained in:
@@ -32,12 +32,29 @@ public sealed class JsonZstdFileSource : IImportSource
|
|||||||
|
|
||||||
public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
|
public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
_fileStream = new FileStream(_filePath, FileMode.Open, FileAccess.Read, FileShare.Read,
|
if (_fileStream != null)
|
||||||
bufferSize: 65536, useAsync: true);
|
throw new InvalidOperationException("ReadDataAsync has already been called. Dispose and create a new source to read again.");
|
||||||
_decompressionStream = new DecompressionStream(_fileStream);
|
|
||||||
_reader = new JsonStreamingDataReader(_decompressionStream, _schema);
|
|
||||||
|
|
||||||
return Task.FromResult<IDataReader>(_reader);
|
try
|
||||||
|
{
|
||||||
|
_fileStream = new FileStream(_filePath, FileMode.Open, FileAccess.Read, FileShare.Read,
|
||||||
|
bufferSize: 65536, useAsync: true);
|
||||||
|
_decompressionStream = new DecompressionStream(_fileStream);
|
||||||
|
_reader = new JsonStreamingDataReader(_decompressionStream, _schema);
|
||||||
|
|
||||||
|
return Task.FromResult<IDataReader>(_reader);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// Clean up on failure
|
||||||
|
_reader?.Dispose();
|
||||||
|
_decompressionStream?.Dispose();
|
||||||
|
_fileStream?.Dispose();
|
||||||
|
_reader = null;
|
||||||
|
_decompressionStream = null;
|
||||||
|
_fileStream = null;
|
||||||
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async ValueTask DisposeAsync()
|
public async ValueTask DisposeAsync()
|
||||||
|
|||||||
Reference in New Issue
Block a user