using System.Collections.Concurrent;
using JdeScoping.DataSync.Dev.Contracts;
using JdeScoping.DataSync.Etl.Results;
using Microsoft.Extensions.Logging;

namespace JdeScoping.DataSync.Dev;

// NOTE(review): the generic type arguments in this file were reconstructed. The element type of
// GetAvailableTables() is string (it is passed to RunAsync(string)); the result type name
// "EtlResult" and the List<EtlResult> return shape are assumed from the
// JdeScoping.DataSync.Etl.Results import - confirm against IDevEtlPipelineFactory's pipeline contract.

/// <summary>
/// Registry for development ETL pipelines that load from cached protobuf files.
/// Uses JSON configuration via <see cref="IDevEtlPipelineFactory"/>.
/// </summary>
public class DevEtlRegistry
{
    private readonly IDevEtlPipelineFactory _pipelineFactory;
    private readonly string _cacheDirectory;
    private readonly ILogger? _logger;

    /// <summary>
    /// Creates a registry bound to a pipeline factory and a cache directory.
    /// </summary>
    /// <param name="pipelineFactory">Factory that lists the available tables and builds their pipelines.</param>
    /// <param name="cacheDirectory">Directory handed to the factory when a pipeline is created; must already exist.</param>
    /// <param name="logger">Optional logger; when <c>null</c>, nothing is logged.</param>
    /// <exception cref="ArgumentNullException"><paramref name="pipelineFactory"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException"><paramref name="cacheDirectory"/> is <c>null</c>, empty or whitespace.</exception>
    /// <exception cref="DirectoryNotFoundException"><paramref name="cacheDirectory"/> does not exist.</exception>
    public DevEtlRegistry(
        IDevEtlPipelineFactory pipelineFactory,
        string cacheDirectory,
        ILogger? logger = null)
    {
        _pipelineFactory = pipelineFactory ?? throw new ArgumentNullException(nameof(pipelineFactory));

        if (string.IsNullOrWhiteSpace(cacheDirectory))
        {
            throw new ArgumentException("Cache directory is required.", nameof(cacheDirectory));
        }

        if (!Directory.Exists(cacheDirectory))
        {
            throw new DirectoryNotFoundException($"Cache directory not found: {cacheDirectory}");
        }

        _cacheDirectory = cacheDirectory;
        _logger = logger;
    }

    /// <summary>
    /// Returns the table names reported by the pipeline factory.
    /// </summary>
    public IEnumerable<string> GetAvailableTables() => _pipelineFactory.GetAvailableTables();

    /// <summary>
    /// Runs the dev ETL pipeline for a single table and logs the outcome.
    /// </summary>
    /// <param name="tableName">Name of the table whose pipeline should run.</param>
    /// <param name="cancellationToken">Cancellation token passed to the pipeline.</param>
    /// <returns>
    /// The result produced by the pipeline. An unsuccessful result is logged as an error and
    /// returned, not thrown; exceptions raised by the factory or the pipeline itself propagate.
    /// </returns>
    public async Task<EtlResult> RunAsync(string tableName, CancellationToken cancellationToken = default)
    {
        _logger?.LogInformation("Running dev ETL for {TableName}", tableName);

        var pipeline = _pipelineFactory.GetPipeline(tableName, _cacheDirectory);
        var result = await pipeline.ExecuteAsync(cancellationToken);

        if (result.Success)
        {
            _logger?.LogInformation(
                "Completed {TableName}: {Rows} rows in {Elapsed:g}",
                tableName,
                result.TotalRows,
                result.Elapsed);
        }
        else
        {
            _logger?.LogError(result.Error, "Failed {TableName}: {Error}", tableName, result.Error?.Message);
        }

        return result;
    }

    /// <summary>
    /// Runs every available dev ETL pipeline one after another.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token; checked before each table and passed to each run.</param>
    /// <returns>One result per table, in the order the tables were enumerated.</returns>
    /// <exception cref="OperationCanceledException">Cancellation was requested before a table was started.</exception>
    public async Task<List<EtlResult>> RunAllAsync(CancellationToken cancellationToken = default)
    {
        var results = new List<EtlResult>();

        foreach (var tableName in GetAvailableTables())
        {
            cancellationToken.ThrowIfCancellationRequested();
            results.Add(await RunAsync(tableName, cancellationToken));
        }

        return results;
    }

    /// <summary>
    /// Runs all dev ETL pipelines with parallelization.
    /// Small/medium tables run concurrently, very large tables run sequentially at the end.
    /// </summary>
    /// <param name="maxDegreeOfParallelism">Maximum concurrent table loads (default 4). Must be at least 1.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// Results for the small/medium tables in enumeration order, followed by the results for
    /// the very large tables in enumeration order.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxDegreeOfParallelism"/> is less than 1.</exception>
    /// <exception cref="OperationCanceledException">Cancellation was requested while waiting to start a table.</exception>
    public async Task<List<EtlResult>> RunAllParallelAsync(
        int maxDegreeOfParallelism = 4,
        CancellationToken cancellationToken = default)
    {
        // A semaphore created with zero permits would make every WaitAsync below block forever,
        // so reject non-positive values up front with a parameter name the caller recognizes.
        if (maxDegreeOfParallelism < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxDegreeOfParallelism),
                maxDegreeOfParallelism,
                "Maximum degree of parallelism must be at least 1.");
        }

        // Separate tables by size in a single pass, so each table lands in exactly one bucket
        // even if the factory's sequence is lazily evaluated.
        var smallMediumTables = new List<string>();
        var veryLargeTables = new List<string>();
        foreach (var tableName in GetAvailableTables())
        {
            if (_pipelineFactory.IsVeryLargeTable(tableName))
            {
                veryLargeTables.Add(tableName);
            }
            else
            {
                smallMediumTables.Add(tableName);
            }
        }

        _logger?.LogInformation(
            "Running {ParallelCount} tables in parallel (max {MaxParallel}), then {SequentialCount} large tables sequentially",
            smallMediumTables.Count,
            maxDegreeOfParallelism,
            veryLargeTables.Count);

        using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);

        // Run small/medium tables in parallel. Task.WhenAll hands the results back in the same
        // order as the tasks, which keeps the returned list deterministic.
        var throttledRuns = smallMediumTables.Select(RunThrottledAsync).ToList();

        var results = new List<EtlResult>(smallMediumTables.Count + veryLargeTables.Count);
        results.AddRange(await Task.WhenAll(throttledRuns));

        // Run very large tables sequentially (IO-bound, would contend)
        foreach (var tableName in veryLargeTables)
        {
            cancellationToken.ThrowIfCancellationRequested();
            results.Add(await RunAsync(tableName, cancellationToken));
        }

        return results;

        // Runs one table while holding a semaphore slot; the slot is released even when the run throws.
        async Task<EtlResult> RunThrottledAsync(string tableName)
        {
            await semaphore.WaitAsync(cancellationToken);
            try
            {
                return await RunAsync(tableName, cancellationToken);
            }
            finally
            {
                semaphore.Release();
            }
        }
    }
}