From 6ebd78d487e5d32d370c0d114a5b36bb749df7d8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 6 Jan 2026 15:35:00 -0500 Subject: [PATCH] feat: add parallel file conversion support to cache converter - Add optional third parameter for parallelism (default: 8) - Use Parallel.ForEachAsync for concurrent file processing - Thread-safe console output with lock - Thread-safe size counters with Interlocked Usage: dotnet run -- [parallelism] --- Tools/CacheConverter/Program.cs | 43 ++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/Tools/CacheConverter/Program.cs b/Tools/CacheConverter/Program.cs index bcc9367..8667ee7 100644 --- a/Tools/CacheConverter/Program.cs +++ b/Tools/CacheConverter/Program.cs @@ -6,13 +6,14 @@ using ZstdSharp; if (args.Length < 2) { - Console.WriteLine("Usage: CacheConverter "); - Console.WriteLine("Example: dotnet run -- ../../CACHED_DB_FILES ../../NEW/src/JdeScoping.Database/Scripts"); + Console.WriteLine("Usage: CacheConverter [parallelism]"); + Console.WriteLine("Example: dotnet run -- ../../CACHED_DB_FILES ../../NEW/src/JdeScoping.Database/Scripts 8"); return 1; } var cacheDir = args[0]; var scriptsDir = args[1]; +var parallelism = args.Length > 2 && int.TryParse(args[2], out var p) ? p : 8; if (!Directory.Exists(cacheDir)) { @@ -53,45 +54,46 @@ var fileMapping = new Dictionary( }; var jsonFiles = Directory.GetFiles(cacheDir, "*.json.zstd"); -Console.WriteLine($"Found {jsonFiles.Length} JSON files to convert"); +Console.WriteLine($"Found {jsonFiles.Length} JSON files to convert (parallelism: {parallelism})"); long totalOriginalSize = 0; long totalNewSize = 0; const int BatchSize = 10000; +var consoleLock = new object(); -foreach (var jsonFile in jsonFiles) +var options = new ParallelOptions { MaxDegreeOfParallelism = parallelism }; + +await Parallel.ForEachAsync(jsonFiles, options, async (jsonFile, cancellationToken) => { var baseName = Path.GetFileName(jsonFile).Replace(".json.zstd", ""); var outputFile = Path.Combine(cacheDir, $"{baseName}.pb.zstd"); - Console.Write($"Converting {baseName}... "); - try { // Look up the SQL script for this file if (!fileMapping.TryGetValue(baseName, out var mapping)) { - Console.WriteLine($"SKIP (no SQL mapping for '{baseName}')"); - continue; + lock (consoleLock) Console.WriteLine($"{baseName}: SKIP (no SQL mapping)"); + return; } var scriptPath = Path.Combine(scriptsDir, mapping.ScriptFile); if (!File.Exists(scriptPath)) { - Console.WriteLine($"SKIP (script not found: {mapping.ScriptFile})"); - continue; + lock (consoleLock) Console.WriteLine($"{baseName}: SKIP (script not found: {mapping.ScriptFile})"); + return; } // Parse schema from SQL script var schema = ParseSqlSchema(scriptPath, mapping.TableName); if (schema.Count == 0) { - Console.WriteLine("SKIP (could not parse schema)"); - continue; + lock (consoleLock) Console.WriteLine($"{baseName}: SKIP (could not parse schema)"); + return; } var originalSize = new FileInfo(jsonFile).Length; - totalOriginalSize += originalSize; + Interlocked.Add(ref totalOriginalSize, originalSize); // Create DataTable with schema from SQL var dataTable = new DataTable(mapping.TableName); @@ -104,7 +106,7 @@ foreach (var jsonFile in jsonFiles) await using var inputFs = new FileStream(jsonFile, FileMode.Open, FileAccess.Read, FileShare.Read, 256 * 1024, FileOptions.SequentialScan | FileOptions.Asynchronous); await using var decompressStream = new DecompressionStream(inputFs); await using var outputFs = new FileStream(outputFile, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024, FileOptions.Asynchronous); - await using var compressStream = new CompressionStream(outputFs, level: 3); + await using var compressStream = new CompressionStream(outputFs, level: 10); int rowCount = 0; int batchCount = 0; @@ -113,7 +115,8 @@ foreach (var jsonFile in jsonFiles) var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; await foreach (var element in JsonSerializer.DeserializeAsyncEnumerable( decompressStream, - jsonOptions)) + jsonOptions, + cancellationToken)) { var row = dataTable.NewRow(); ReadJsonElement(element, row, dataTable); @@ -138,19 +141,19 @@ foreach (var jsonFile in jsonFiles) batchCount++; } - await compressStream.FlushAsync(); + await compressStream.FlushAsync(cancellationToken); var newSize = new FileInfo(outputFile).Length; - totalNewSize += newSize; + Interlocked.Add(ref totalNewSize, newSize); var ratio = (double)newSize / originalSize * 100; - Console.WriteLine($"OK ({rowCount:N0} rows, {batchCount} batches, {originalSize:N0} -> {newSize:N0} bytes, {ratio:F1}%)"); + lock (consoleLock) Console.WriteLine($"{baseName}: OK ({rowCount:N0} rows, {batchCount} batches, {originalSize:N0} -> {newSize:N0} bytes, {ratio:F1}%)"); } catch (Exception ex) { - Console.WriteLine($"ERROR: {ex.Message}"); + lock (consoleLock) Console.WriteLine($"{baseName}: ERROR: {ex.Message}"); } -} +}); Console.WriteLine(); if (totalOriginalSize > 0)