From 2629cb26e07f3c8083231874b4023e6f733b68b0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 3 Jan 2026 16:21:59 -0500 Subject: [PATCH] fix(datasync): add guards and exception safety to JsonZstdFileSource --- .../Etl/Sources/JsonZstdFileSource.cs | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/NEW/src/JdeScoping.DataSync/Etl/Sources/JsonZstdFileSource.cs b/NEW/src/JdeScoping.DataSync/Etl/Sources/JsonZstdFileSource.cs index e348456..d7f7204 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Sources/JsonZstdFileSource.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Sources/JsonZstdFileSource.cs @@ -32,12 +32,29 @@ public sealed class JsonZstdFileSource : IImportSource public Task ReadDataAsync(CancellationToken cancellationToken = default) { - _fileStream = new FileStream(_filePath, FileMode.Open, FileAccess.Read, FileShare.Read, - bufferSize: 65536, useAsync: true); - _decompressionStream = new DecompressionStream(_fileStream); - _reader = new JsonStreamingDataReader(_decompressionStream, _schema); + if (_fileStream != null) + throw new InvalidOperationException("ReadDataAsync has already been called. Dispose and create a new source to read again."); - return Task.FromResult(_reader); + try + { + _fileStream = new FileStream(_filePath, FileMode.Open, FileAccess.Read, FileShare.Read, + bufferSize: 65536, useAsync: true); + _decompressionStream = new DecompressionStream(_fileStream); + _reader = new JsonStreamingDataReader(_decompressionStream, _schema); + + return Task.FromResult(_reader); + } + catch + { + // Clean up on failure + _reader?.Dispose(); + _decompressionStream?.Dispose(); + _fileStream?.Dispose(); + _reader = null; + _decompressionStream = null; + _fileStream = null; + throw; + } } public async ValueTask DisposeAsync()