feat: add parallel file conversion support to cache converter
- Add optional third parameter for parallelism (default: 8) - Use Parallel.ForEachAsync for concurrent file processing - Thread-safe console output with lock - Thread-safe size counters with Interlocked Usage: dotnet run -- <cache-dir> <scripts-dir> [parallelism]
This commit is contained in:
@@ -6,13 +6,14 @@ using ZstdSharp;
|
|||||||
|
|
||||||
if (args.Length < 2)
|
if (args.Length < 2)
|
||||||
{
|
{
|
||||||
Console.WriteLine("Usage: CacheConverter <cache-directory> <scripts-directory>");
|
Console.WriteLine("Usage: CacheConverter <cache-directory> <scripts-directory> [parallelism]");
|
||||||
Console.WriteLine("Example: dotnet run -- ../../CACHED_DB_FILES ../../NEW/src/JdeScoping.Database/Scripts");
|
Console.WriteLine("Example: dotnet run -- ../../CACHED_DB_FILES ../../NEW/src/JdeScoping.Database/Scripts 8");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
var cacheDir = args[0];
|
var cacheDir = args[0];
|
||||||
var scriptsDir = args[1];
|
var scriptsDir = args[1];
|
||||||
|
var parallelism = args.Length > 2 && int.TryParse(args[2], out var p) ? p : 8;
|
||||||
|
|
||||||
if (!Directory.Exists(cacheDir))
|
if (!Directory.Exists(cacheDir))
|
||||||
{
|
{
|
||||||
@@ -53,45 +54,46 @@ var fileMapping = new Dictionary<string, (string ScriptFile, string TableName)>(
|
|||||||
};
|
};
|
||||||
|
|
||||||
var jsonFiles = Directory.GetFiles(cacheDir, "*.json.zstd");
|
var jsonFiles = Directory.GetFiles(cacheDir, "*.json.zstd");
|
||||||
Console.WriteLine($"Found {jsonFiles.Length} JSON files to convert");
|
Console.WriteLine($"Found {jsonFiles.Length} JSON files to convert (parallelism: {parallelism})");
|
||||||
|
|
||||||
long totalOriginalSize = 0;
|
long totalOriginalSize = 0;
|
||||||
long totalNewSize = 0;
|
long totalNewSize = 0;
|
||||||
const int BatchSize = 10000;
|
const int BatchSize = 10000;
|
||||||
|
var consoleLock = new object();
|
||||||
|
|
||||||
foreach (var jsonFile in jsonFiles)
|
var options = new ParallelOptions { MaxDegreeOfParallelism = parallelism };
|
||||||
|
|
||||||
|
await Parallel.ForEachAsync(jsonFiles, options, async (jsonFile, cancellationToken) =>
|
||||||
{
|
{
|
||||||
var baseName = Path.GetFileName(jsonFile).Replace(".json.zstd", "");
|
var baseName = Path.GetFileName(jsonFile).Replace(".json.zstd", "");
|
||||||
var outputFile = Path.Combine(cacheDir, $"{baseName}.pb.zstd");
|
var outputFile = Path.Combine(cacheDir, $"{baseName}.pb.zstd");
|
||||||
|
|
||||||
Console.Write($"Converting {baseName}... ");
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Look up the SQL script for this file
|
// Look up the SQL script for this file
|
||||||
if (!fileMapping.TryGetValue(baseName, out var mapping))
|
if (!fileMapping.TryGetValue(baseName, out var mapping))
|
||||||
{
|
{
|
||||||
Console.WriteLine($"SKIP (no SQL mapping for '{baseName}')");
|
lock (consoleLock) Console.WriteLine($"{baseName}: SKIP (no SQL mapping)");
|
||||||
continue;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
var scriptPath = Path.Combine(scriptsDir, mapping.ScriptFile);
|
var scriptPath = Path.Combine(scriptsDir, mapping.ScriptFile);
|
||||||
if (!File.Exists(scriptPath))
|
if (!File.Exists(scriptPath))
|
||||||
{
|
{
|
||||||
Console.WriteLine($"SKIP (script not found: {mapping.ScriptFile})");
|
lock (consoleLock) Console.WriteLine($"{baseName}: SKIP (script not found: {mapping.ScriptFile})");
|
||||||
continue;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse schema from SQL script
|
// Parse schema from SQL script
|
||||||
var schema = ParseSqlSchema(scriptPath, mapping.TableName);
|
var schema = ParseSqlSchema(scriptPath, mapping.TableName);
|
||||||
if (schema.Count == 0)
|
if (schema.Count == 0)
|
||||||
{
|
{
|
||||||
Console.WriteLine("SKIP (could not parse schema)");
|
lock (consoleLock) Console.WriteLine($"{baseName}: SKIP (could not parse schema)");
|
||||||
continue;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
var originalSize = new FileInfo(jsonFile).Length;
|
var originalSize = new FileInfo(jsonFile).Length;
|
||||||
totalOriginalSize += originalSize;
|
Interlocked.Add(ref totalOriginalSize, originalSize);
|
||||||
|
|
||||||
// Create DataTable with schema from SQL
|
// Create DataTable with schema from SQL
|
||||||
var dataTable = new DataTable(mapping.TableName);
|
var dataTable = new DataTable(mapping.TableName);
|
||||||
@@ -104,7 +106,7 @@ foreach (var jsonFile in jsonFiles)
|
|||||||
await using var inputFs = new FileStream(jsonFile, FileMode.Open, FileAccess.Read, FileShare.Read, 256 * 1024, FileOptions.SequentialScan | FileOptions.Asynchronous);
|
await using var inputFs = new FileStream(jsonFile, FileMode.Open, FileAccess.Read, FileShare.Read, 256 * 1024, FileOptions.SequentialScan | FileOptions.Asynchronous);
|
||||||
await using var decompressStream = new DecompressionStream(inputFs);
|
await using var decompressStream = new DecompressionStream(inputFs);
|
||||||
await using var outputFs = new FileStream(outputFile, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024, FileOptions.Asynchronous);
|
await using var outputFs = new FileStream(outputFile, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024, FileOptions.Asynchronous);
|
||||||
await using var compressStream = new CompressionStream(outputFs, level: 3);
|
await using var compressStream = new CompressionStream(outputFs, level: 10);
|
||||||
|
|
||||||
int rowCount = 0;
|
int rowCount = 0;
|
||||||
int batchCount = 0;
|
int batchCount = 0;
|
||||||
@@ -113,7 +115,8 @@ foreach (var jsonFile in jsonFiles)
|
|||||||
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
||||||
await foreach (var element in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(
|
await foreach (var element in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(
|
||||||
decompressStream,
|
decompressStream,
|
||||||
jsonOptions))
|
jsonOptions,
|
||||||
|
cancellationToken))
|
||||||
{
|
{
|
||||||
var row = dataTable.NewRow();
|
var row = dataTable.NewRow();
|
||||||
ReadJsonElement(element, row, dataTable);
|
ReadJsonElement(element, row, dataTable);
|
||||||
@@ -138,19 +141,19 @@ foreach (var jsonFile in jsonFiles)
|
|||||||
batchCount++;
|
batchCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
await compressStream.FlushAsync();
|
await compressStream.FlushAsync(cancellationToken);
|
||||||
|
|
||||||
var newSize = new FileInfo(outputFile).Length;
|
var newSize = new FileInfo(outputFile).Length;
|
||||||
totalNewSize += newSize;
|
Interlocked.Add(ref totalNewSize, newSize);
|
||||||
|
|
||||||
var ratio = (double)newSize / originalSize * 100;
|
var ratio = (double)newSize / originalSize * 100;
|
||||||
Console.WriteLine($"OK ({rowCount:N0} rows, {batchCount} batches, {originalSize:N0} -> {newSize:N0} bytes, {ratio:F1}%)");
|
lock (consoleLock) Console.WriteLine($"{baseName}: OK ({rowCount:N0} rows, {batchCount} batches, {originalSize:N0} -> {newSize:N0} bytes, {ratio:F1}%)");
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
Console.WriteLine($"ERROR: {ex.Message}");
|
lock (consoleLock) Console.WriteLine($"{baseName}: ERROR: {ex.Message}");
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
Console.WriteLine();
|
Console.WriteLine();
|
||||||
if (totalOriginalSize > 0)
|
if (totalOriginalSize > 0)
|
||||||
|
|||||||
Reference in New Issue
Block a user