diff --git a/NEW/src/JdeScoping.DataSync/DevEtl/DevEtlRegistry.cs b/NEW/src/JdeScoping.DataSync/DevEtl/DevEtlRegistry.cs new file mode 100644 index 0000000..c8bff33 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/DevEtl/DevEtlRegistry.cs @@ -0,0 +1,82 @@ +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Etl.Pipeline; +using JdeScoping.DataSync.Etl.Results; +using Microsoft.Extensions.Logging; + +namespace JdeScoping.DataSync.DevEtl; + +/// +/// Registry for development ETL pipelines that load from cached JSON files. +/// +public class DevEtlRegistry +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly string _cacheDirectory; + private readonly ILogger? _logger; + + private readonly Dictionary> _pipelineFactories = new(StringComparer.OrdinalIgnoreCase) + { + [BranchDevEtl.TableName] = (factory, cacheDir) => + BranchDevEtl.Create(factory, Path.Combine(cacheDir, BranchDevEtl.CacheFileName)), + }; + + public DevEtlRegistry( + IDbConnectionFactory connectionFactory, + string cacheDirectory, + ILogger? logger = null) + { + _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); + + if (string.IsNullOrWhiteSpace(cacheDirectory)) + throw new ArgumentException("Cache directory is required.", nameof(cacheDirectory)); + + if (!Directory.Exists(cacheDirectory)) + throw new DirectoryNotFoundException($"Cache directory not found: {cacheDirectory}"); + + _cacheDirectory = cacheDirectory; + _logger = logger; + } + + public IEnumerable GetAvailableTables() => _pipelineFactories.Keys; + + public EtlPipeline GetPipeline(string tableName) + { + if (!_pipelineFactories.TryGetValue(tableName, out var factory)) + throw new ArgumentException($"No pipeline registered for table '{tableName}'.", nameof(tableName)); + + return factory(_connectionFactory, _cacheDirectory); + } + + public async Task RunAsync(string tableName, CancellationToken cancellationToken = default) + { + _logger?.LogInformation("Running dev ETL for {TableName}", tableName); + + var pipeline = GetPipeline(tableName); + var result = await pipeline.ExecuteAsync(cancellationToken); + + if (result.Success) + _logger?.LogInformation("Completed {TableName}: {Rows} rows in {Elapsed:g}", + tableName, result.TotalRows, result.Elapsed); + else + _logger?.LogError(result.Error, "Failed {TableName}: {Error}", + tableName, result.Error?.Message); + + return result; + } + + public async Task> RunAllAsync(CancellationToken cancellationToken = default) + { + var results = new List(); + + foreach (var tableName in GetAvailableTables()) + { + if (cancellationToken.IsCancellationRequested) + break; + + var result = await RunAsync(tableName, cancellationToken); + results.Add(result); + } + + return results; + } +}