feat(datasync): add DevEtlRegistry for managing development ETL pipelines
This commit is contained in:
@@ -0,0 +1,82 @@
|
|||||||
|
using JdeScoping.DataAccess.Interfaces;
|
||||||
|
using JdeScoping.DataSync.Etl.Pipeline;
|
||||||
|
using JdeScoping.DataSync.Etl.Results;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace JdeScoping.DataSync.DevEtl;
|
||||||
|
|
||||||
|
/// <summary>
/// Registry for development ETL pipelines that load from cached JSON files.
/// </summary>
/// <remarks>
/// Table names are mapped (case-insensitively) to factory delegates. Each delegate
/// receives the connection factory and the cache directory supplied to the constructor
/// and returns the <see cref="EtlPipeline"/> for that table.
/// </remarks>
public class DevEtlRegistry
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _cacheDirectory;
    private readonly ILogger<DevEtlRegistry>? _logger;

    // Keyed by table name; OrdinalIgnoreCase so lookups do not depend on caller casing.
    private readonly Dictionary<string, Func<IDbConnectionFactory, string, EtlPipeline>> _pipelineFactories =
        new(StringComparer.OrdinalIgnoreCase)
        {
            [BranchDevEtl.TableName] = (factory, cacheDir) =>
                BranchDevEtl.Create(factory, Path.Combine(cacheDir, BranchDevEtl.CacheFileName)),
        };

    /// <summary>
    /// Creates a registry bound to a connection factory and a cache directory.
    /// </summary>
    /// <param name="connectionFactory">Connection factory handed to every pipeline factory.</param>
    /// <param name="cacheDirectory">Existing directory that holds the cache files.</param>
    /// <param name="logger">Optional logger; when <c>null</c>, nothing is logged.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connectionFactory"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException"><paramref name="cacheDirectory"/> is <c>null</c>, empty, or whitespace.</exception>
    /// <exception cref="DirectoryNotFoundException"><paramref name="cacheDirectory"/> does not exist.</exception>
    public DevEtlRegistry(
        IDbConnectionFactory connectionFactory,
        string cacheDirectory,
        ILogger<DevEtlRegistry>? logger = null)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));

        if (string.IsNullOrWhiteSpace(cacheDirectory))
        {
            throw new ArgumentException("Cache directory is required.", nameof(cacheDirectory));
        }

        if (!Directory.Exists(cacheDirectory))
        {
            throw new DirectoryNotFoundException($"Cache directory not found: {cacheDirectory}");
        }

        _cacheDirectory = cacheDirectory;
        _logger = logger;
    }

    /// <summary>
    /// Gets the names of all tables that have a registered pipeline factory.
    /// </summary>
    /// <returns>The key collection of the internal factory map.</returns>
    public IEnumerable<string> GetAvailableTables() => _pipelineFactories.Keys;

    /// <summary>
    /// Builds the pipeline registered for <paramref name="tableName"/> by invoking its factory
    /// with the connection factory and cache directory given to the constructor.
    /// </summary>
    /// <param name="tableName">Table name to look up (case-insensitive).</param>
    /// <returns>The pipeline produced by the registered factory.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tableName"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">No pipeline is registered for <paramref name="tableName"/>.</exception>
    public EtlPipeline GetPipeline(string tableName)
    {
        // Guard explicitly: otherwise the dictionary lookup throws ArgumentNullException
        // naming its own internal parameter instead of tableName.
        if (tableName is null)
        {
            throw new ArgumentNullException(nameof(tableName));
        }

        if (!_pipelineFactories.TryGetValue(tableName, out var factory))
        {
            throw new ArgumentException($"No pipeline registered for table '{tableName}'.", nameof(tableName));
        }

        return factory(_connectionFactory, _cacheDirectory);
    }

    /// <summary>
    /// Executes the pipeline registered for <paramref name="tableName"/> and logs the outcome.
    /// </summary>
    /// <param name="tableName">Table name to run (case-insensitive).</param>
    /// <param name="cancellationToken">Token passed through to the pipeline execution.</param>
    /// <returns>The result reported by the pipeline, whether it succeeded or failed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tableName"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">No pipeline is registered for <paramref name="tableName"/>.</exception>
    public async Task<PipelineResult> RunAsync(string tableName, CancellationToken cancellationToken = default)
    {
        _logger?.LogInformation("Running dev ETL for {TableName}", tableName);

        var pipeline = GetPipeline(tableName);
        var result = await pipeline.ExecuteAsync(cancellationToken);

        if (result.Success)
        {
            _logger?.LogInformation("Completed {TableName}: {Rows} rows in {Elapsed:g}",
                tableName, result.TotalRows, result.Elapsed);
        }
        else
        {
            _logger?.LogError(result.Error, "Failed {TableName}: {Error}",
                tableName, result.Error?.Message);
        }

        return result;
    }

    /// <summary>
    /// Runs every registered pipeline one after another and collects the results.
    /// </summary>
    /// <param name="cancellationToken">
    /// Checked before each pipeline starts and also passed to each run. Once cancellation is
    /// requested, no further pipelines are started and the results gathered so far are returned.
    /// </param>
    /// <returns>One result per pipeline that was run, in the order they were run.</returns>
    public async Task<IReadOnlyList<PipelineResult>> RunAllAsync(CancellationToken cancellationToken = default)
    {
        var results = new List<PipelineResult>();

        foreach (var tableName in GetAvailableTables())
        {
            // Stop quietly instead of throwing so callers still get the partial results.
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            var result = await RunAsync(tableName, cancellationToken);
            results.Add(result);
        }

        return results;
    }
}
|
||||||
Reference in New Issue
Block a user