feat: extract DevEtl to JdeScoping.DataSync.Dev project
- Create JdeScoping.DataSync.Dev for sandbox testing ETL code - Create JdeScoping.DataSync.Dev.Tests for associated tests - Move 22 source files and 8 test files - Update namespaces from DevEtl to Dev - Add both projects to solution
This commit is contained in:
@@ -0,0 +1,188 @@
|
||||
using System.Collections.Concurrent;
|
||||
using JdeScoping.DataAccess.Interfaces;
|
||||
using JdeScoping.DataSync.Etl.Pipeline;
|
||||
using JdeScoping.DataSync.Etl.Results;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace JdeScoping.DataSync.Dev;
|
||||
|
||||
/// <summary>
|
||||
/// Registry for development ETL pipelines that load from cached JSON files.
|
||||
/// </summary>
|
||||
public class DevEtlRegistry
|
||||
{
|
||||
private readonly IDbConnectionFactory _connectionFactory;
|
||||
private readonly string _cacheDirectory;
|
||||
private readonly ILogger<DevEtlRegistry>? _logger;
|
||||
|
||||
private readonly Dictionary<string, Func<IDbConnectionFactory, string, EtlPipeline>> _pipelineFactories = new(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
// Small tables (< 1 MB)
|
||||
[BranchDevEtl.TableName] = (factory, cacheDir) =>
|
||||
BranchDevEtl.Create(factory, Path.Combine(cacheDir, BranchDevEtl.CacheFileName)),
|
||||
[OrgHierarchyDevEtl.TableName] = (factory, cacheDir) =>
|
||||
OrgHierarchyDevEtl.Create(factory, Path.Combine(cacheDir, OrgHierarchyDevEtl.CacheFileName)),
|
||||
[WorkCenterDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkCenterDevEtl.Create(factory, Path.Combine(cacheDir, WorkCenterDevEtl.CacheFileName)),
|
||||
[ProfitCenterDevEtl.TableName] = (factory, cacheDir) =>
|
||||
ProfitCenterDevEtl.Create(factory, Path.Combine(cacheDir, ProfitCenterDevEtl.CacheFileName)),
|
||||
// Medium tables (1-20 MB)
|
||||
[JdeUserDevEtl.TableName] = (factory, cacheDir) =>
|
||||
JdeUserDevEtl.Create(factory, Path.Combine(cacheDir, JdeUserDevEtl.CacheFileName)),
|
||||
[FunctionCodeDevEtl.TableName] = (factory, cacheDir) =>
|
||||
FunctionCodeDevEtl.Create(factory, Path.Combine(cacheDir, FunctionCodeDevEtl.CacheFileName)),
|
||||
[ItemDevEtl.TableName] = (factory, cacheDir) =>
|
||||
ItemDevEtl.Create(factory, Path.Combine(cacheDir, ItemDevEtl.CacheFileName)),
|
||||
[RouteMasterDevEtl.TableName] = (factory, cacheDir) =>
|
||||
RouteMasterDevEtl.Create(factory, Path.Combine(cacheDir, RouteMasterDevEtl.CacheFileName)),
|
||||
// Large tables (20-200 MB)
|
||||
[LotDevEtl.TableName] = (factory, cacheDir) =>
|
||||
LotDevEtl.Create(factory, Path.Combine(cacheDir, LotDevEtl.CacheFileName)),
|
||||
[MisDataDevEtl.TableName] = (factory, cacheDir) =>
|
||||
MisDataDevEtl.Create(factory, Path.Combine(cacheDir, MisDataDevEtl.CacheFileName)),
|
||||
[WorkOrderCurrDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkOrderCurrDevEtl.Create(factory, Path.Combine(cacheDir, WorkOrderCurrDevEtl.CacheFileName)),
|
||||
[WorkOrderHistDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkOrderHistDevEtl.Create(factory, Path.Combine(cacheDir, WorkOrderHistDevEtl.CacheFileName)),
|
||||
[LotUsageHistDevEtl.TableName] = (factory, cacheDir) =>
|
||||
LotUsageHistDevEtl.Create(factory, Path.Combine(cacheDir, LotUsageHistDevEtl.CacheFileName)),
|
||||
[WorkOrderComponentHistDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkOrderComponentHistDevEtl.Create(factory, Path.Combine(cacheDir, WorkOrderComponentHistDevEtl.CacheFileName)),
|
||||
// Very large tables (200+ MB)
|
||||
[WorkOrderStepHistDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkOrderStepHistDevEtl.Create(factory, Path.Combine(cacheDir, WorkOrderStepHistDevEtl.CacheFileName)),
|
||||
[WorkOrderComponentCurrDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkOrderComponentCurrDevEtl.Create(factory, Path.Combine(cacheDir, WorkOrderComponentCurrDevEtl.CacheFileName)),
|
||||
[WorkOrderRoutingDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkOrderRoutingDevEtl.Create(factory, Path.Combine(cacheDir, WorkOrderRoutingDevEtl.CacheFileName)),
|
||||
[LotUsageCurrDevEtl.TableName] = (factory, cacheDir) =>
|
||||
LotUsageCurrDevEtl.Create(factory, Path.Combine(cacheDir, LotUsageCurrDevEtl.CacheFileName)),
|
||||
[WorkOrderStepCurrDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkOrderStepCurrDevEtl.Create(factory, Path.Combine(cacheDir, WorkOrderStepCurrDevEtl.CacheFileName)),
|
||||
[WorkOrderTimeHistDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkOrderTimeHistDevEtl.Create(factory, Path.Combine(cacheDir, WorkOrderTimeHistDevEtl.CacheFileName)),
|
||||
[WorkOrderTimeCurrDevEtl.TableName] = (factory, cacheDir) =>
|
||||
WorkOrderTimeCurrDevEtl.Create(factory, Path.Combine(cacheDir, WorkOrderTimeCurrDevEtl.CacheFileName)),
|
||||
};
|
||||
|
||||
public DevEtlRegistry(
|
||||
IDbConnectionFactory connectionFactory,
|
||||
string cacheDirectory,
|
||||
ILogger<DevEtlRegistry>? logger = null)
|
||||
{
|
||||
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
|
||||
|
||||
if (string.IsNullOrWhiteSpace(cacheDirectory))
|
||||
throw new ArgumentException("Cache directory is required.", nameof(cacheDirectory));
|
||||
|
||||
if (!Directory.Exists(cacheDirectory))
|
||||
throw new DirectoryNotFoundException($"Cache directory not found: {cacheDirectory}");
|
||||
|
||||
_cacheDirectory = cacheDirectory;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public IEnumerable<string> GetAvailableTables() => _pipelineFactories.Keys;
|
||||
|
||||
public EtlPipeline GetPipeline(string tableName)
|
||||
{
|
||||
if (!_pipelineFactories.TryGetValue(tableName, out var factory))
|
||||
throw new ArgumentException($"No pipeline registered for table '{tableName}'.", nameof(tableName));
|
||||
|
||||
return factory(_connectionFactory, _cacheDirectory);
|
||||
}
|
||||
|
||||
public async Task<PipelineResult> RunAsync(string tableName, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_logger?.LogInformation("Running dev ETL for {TableName}", tableName);
|
||||
|
||||
var pipeline = GetPipeline(tableName);
|
||||
var result = await pipeline.ExecuteAsync(cancellationToken);
|
||||
|
||||
if (result.Success)
|
||||
_logger?.LogInformation("Completed {TableName}: {Rows} rows in {Elapsed:g}",
|
||||
tableName, result.TotalRows, result.Elapsed);
|
||||
else
|
||||
_logger?.LogError(result.Error, "Failed {TableName}: {Error}",
|
||||
tableName, result.Error?.Message);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<PipelineResult>> RunAllAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
var results = new List<PipelineResult>();
|
||||
|
||||
foreach (var tableName in GetAvailableTables())
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
var result = await RunAsync(tableName, cancellationToken);
|
||||
results.Add(result);
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Runs all dev ETL pipelines with parallelization.
|
||||
/// Small/medium tables run concurrently, very large tables run sequentially at the end.
|
||||
/// </summary>
|
||||
/// <param name="maxDegreeOfParallelism">Maximum concurrent table loads (default 4).</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
public async Task<IReadOnlyList<PipelineResult>> RunAllParallelAsync(
|
||||
int maxDegreeOfParallelism = 4,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var results = new ConcurrentBag<PipelineResult>();
|
||||
using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
|
||||
|
||||
// Separate tables by size - run very large ones sequentially at the end
|
||||
var smallMediumTables = GetAvailableTables()
|
||||
.Where(t => !IsVeryLargeTable(t))
|
||||
.ToList();
|
||||
var veryLargeTables = GetAvailableTables()
|
||||
.Where(IsVeryLargeTable)
|
||||
.ToList();
|
||||
|
||||
_logger?.LogInformation(
|
||||
"Running {ParallelCount} tables in parallel (max {MaxParallel}), then {SequentialCount} large tables sequentially",
|
||||
smallMediumTables.Count, maxDegreeOfParallelism, veryLargeTables.Count);
|
||||
|
||||
// Run small/medium tables in parallel
|
||||
var tasks = smallMediumTables.Select(async tableName =>
|
||||
{
|
||||
await semaphore.WaitAsync(cancellationToken);
|
||||
try
|
||||
{
|
||||
var result = await RunAsync(tableName, cancellationToken);
|
||||
results.Add(result);
|
||||
}
|
||||
finally
|
||||
{
|
||||
semaphore.Release();
|
||||
}
|
||||
});
|
||||
|
||||
await Task.WhenAll(tasks);
|
||||
|
||||
// Run very large tables sequentially (IO-bound, would contend)
|
||||
foreach (var tableName in veryLargeTables)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
var result = await RunAsync(tableName, cancellationToken);
|
||||
results.Add(result);
|
||||
}
|
||||
|
||||
return results.ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Identifies very large tables that should be loaded sequentially to avoid IO contention.
|
||||
/// </summary>
|
||||
private static bool IsVeryLargeTable(string tableName) =>
|
||||
tableName.Contains("WorkOrderTime", StringComparison.OrdinalIgnoreCase) ||
|
||||
tableName.Contains("WorkOrderStep", StringComparison.OrdinalIgnoreCase) ||
|
||||
tableName.Contains("WorkOrderRouting", StringComparison.OrdinalIgnoreCase) ||
|
||||
tableName.Contains("WorkOrderComponent", StringComparison.OrdinalIgnoreCase) ||
|
||||
tableName.Contains("LotUsage", StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
Reference in New Issue
Block a user