fix: read file size after streams are closed in converter

This commit is contained in:
Joseph Doherty
2026-01-06 15:39:47 -05:00
parent 6ebd78d487
commit cd68b2c655
+32 -30
View File
@@ -102,47 +102,49 @@ await Parallel.ForEachAsync(jsonFiles, options, async (jsonFile, cancellationTok
dataTable.Columns.Add(colName, colType); dataTable.Columns.Add(colName, colType);
} }
// Stream JSON and write to protobuf in batches
await using var inputFs = new FileStream(jsonFile, FileMode.Open, FileAccess.Read, FileShare.Read, 256 * 1024, FileOptions.SequentialScan | FileOptions.Asynchronous);
await using var decompressStream = new DecompressionStream(inputFs);
await using var outputFs = new FileStream(outputFile, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024, FileOptions.Asynchronous);
await using var compressStream = new CompressionStream(outputFs, level: 10);
int rowCount = 0; int rowCount = 0;
int batchCount = 0; int batchCount = 0;
// True streaming: DeserializeAsyncEnumerable streams each array element without loading entire JSON // Stream JSON and write to protobuf in batches
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; // Use explicit dispose to ensure file is closed before reading size
await foreach (var element in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(
decompressStream,
jsonOptions,
cancellationToken))
{ {
var row = dataTable.NewRow(); await using var inputFs = new FileStream(jsonFile, FileMode.Open, FileAccess.Read, FileShare.Read, 256 * 1024, FileOptions.SequentialScan | FileOptions.Asynchronous);
ReadJsonElement(element, row, dataTable); await using var decompressStream = new DecompressionStream(inputFs);
dataTable.Rows.Add(row); await using var outputFs = new FileStream(outputFile, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024, FileOptions.Asynchronous);
rowCount++; await using var compressStream = new CompressionStream(outputFs, level: 10);
// Write batch when we hit the batch size // True streaming: DeserializeAsyncEnumerable streams each array element without loading entire JSON
if (dataTable.Rows.Count >= BatchSize) var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
await foreach (var element in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(
decompressStream,
jsonOptions,
cancellationToken))
{
var row = dataTable.NewRow();
ReadJsonElement(element, row, dataTable);
dataTable.Rows.Add(row);
rowCount++;
// Write batch when we hit the batch size
if (dataTable.Rows.Count >= BatchSize)
{
using var reader = dataTable.CreateDataReader();
DataSerializer.Serialize(compressStream, reader);
dataTable.Clear();
batchCount++;
}
}
// Write remaining rows
if (dataTable.Rows.Count > 0)
{ {
using var reader = dataTable.CreateDataReader(); using var reader = dataTable.CreateDataReader();
DataSerializer.Serialize(compressStream, reader); DataSerializer.Serialize(compressStream, reader);
dataTable.Clear();
batchCount++; batchCount++;
} }
} } // Streams closed here
// Write remaining rows
if (dataTable.Rows.Count > 0)
{
using var reader = dataTable.CreateDataReader();
DataSerializer.Serialize(compressStream, reader);
batchCount++;
}
await compressStream.FlushAsync(cancellationToken);
// Read file size after streams are fully closed
var newSize = new FileInfo(outputFile).Length; var newSize = new FileInfo(outputFile).Length;
Interlocked.Add(ref totalNewSize, newSize); Interlocked.Add(ref totalNewSize, newSize);