feat(datasync): add EtlPipelineFactory with JSON config support

- Implement IEtlPipelineFactory with ForTable() method returning a builder
- Load pipeline config from JSON file path (from PipelineOptions)
- Parse config using System.Text.Json with PropertyNameCaseInsensitive
- Builder supports WithMode() and WithMinimumDate() fluent methods
- Create DbQuerySource for source with ParameterFormatConverter for JDE dates
- Create DbBulkMergeDestination or DbBulkImportDestination based on sync mode
- Mass mode defaults to bulkImport, incremental defaults to bulkMerge
- Support destination override in sync mode config
- Execute pre/post scripts from config (prePurge, reIndex, custom scripts)
- Validate config: require mass and incremental modes, reject runtime params
- Add comprehensive tests for factory, builder, and config validation
This commit is contained in:
Joseph Doherty
2026-01-06 13:45:36 -05:00
parent afb6ad4f09
commit 795c15df56
2 changed files with 1126 additions and 0 deletions
@@ -0,0 +1,316 @@
using System.Text.Json;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Destinations;
using JdeScoping.DataSync.Etl.Pipeline;
using JdeScoping.DataSync.Etl.Scripts;
using JdeScoping.DataSync.Etl.Sources;
using JdeScoping.DataSync.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Factory for creating ETL pipelines from JSON configuration.
/// </summary>
public class EtlPipelineFactory : IEtlPipelineFactory
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNameCaseInsensitive = true,
ReadCommentHandling = JsonCommentHandling.Skip,
AllowTrailingCommas = true
};
private readonly IDbConnectionFactory _connectionFactory;
private readonly ILogger<EtlPipeline> _logger;
private readonly PipelinesRoot _config;
/// <summary>
/// Creates a new pipeline factory.
/// </summary>
/// <param name="connectionFactory">Factory for creating database connections.</param>
/// <param name="options">Pipeline configuration options.</param>
/// <param name="logger">Logger for pipeline execution.</param>
public EtlPipelineFactory(
IDbConnectionFactory connectionFactory,
IOptions<PipelineOptions> options,
ILogger<EtlPipeline> logger)
{
ArgumentNullException.ThrowIfNull(connectionFactory);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(logger);
_connectionFactory = connectionFactory;
_logger = logger;
_config = LoadPipelineConfigs(options.Value.ConfigPath);
}
/// <summary>
/// Creates a new pipeline factory with a pre-loaded configuration (for testing).
/// </summary>
internal EtlPipelineFactory(
IDbConnectionFactory connectionFactory,
PipelinesRoot config,
ILogger<EtlPipeline> logger)
{
ArgumentNullException.ThrowIfNull(connectionFactory);
ArgumentNullException.ThrowIfNull(config);
ArgumentNullException.ThrowIfNull(logger);
ValidateConfig(config);
_connectionFactory = connectionFactory;
_logger = logger;
_config = config;
}
/// <inheritdoc />
public IEtlPipelineBuilder ForTable(string tableName)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
if (!_config.Pipelines.TryGetValue(tableName, out var pipelineConfig))
{
throw new InvalidOperationException(
$"No pipeline configured for table: {tableName}. " +
$"Available tables: {string.Join(", ", _config.Pipelines.Keys)}");
}
return new PipelineBuilder(
_connectionFactory,
tableName,
pipelineConfig,
_config.EffectiveSettings,
_logger);
}
private PipelinesRoot LoadPipelineConfigs(string configPath)
{
// Resolve path relative to assembly location (handles both debug and publish)
var assemblyDir = Path.GetDirectoryName(typeof(EtlPipelineFactory).Assembly.Location)!;
var fullPath = Path.Combine(assemblyDir, configPath);
if (!File.Exists(fullPath))
{
throw new FileNotFoundException(
$"Pipeline config not found: {fullPath}. " +
"Ensure the config file is included in the build output.");
}
var json = File.ReadAllText(fullPath);
var root = JsonSerializer.Deserialize<PipelinesRoot>(json, JsonOptions)
?? throw new InvalidOperationException("Failed to deserialize pipeline config: result was null.");
ValidateConfig(root);
return root;
}
private static void ValidateConfig(PipelinesRoot root)
{
foreach (var (name, config) in root.Pipelines)
{
// Validate required sync modes
if (!config.SyncModes.ContainsKey("mass"))
{
throw new InvalidOperationException(
$"Pipeline '{name}' missing required 'mass' sync mode.");
}
if (!config.SyncModes.ContainsKey("incremental"))
{
throw new InvalidOperationException(
$"Pipeline '{name}' missing required 'incremental' sync mode.");
}
// Validate no runtime parameters (not yet supported)
if (config.Source.Parameters != null)
{
foreach (var (paramName, paramConfig) in config.Source.Parameters)
{
if (paramConfig.Source.Equals("runtime", StringComparison.OrdinalIgnoreCase))
{
throw new NotSupportedException(
$"Pipeline '{name}' parameter '{paramName}': " +
"runtime parameter source is not yet supported.");
}
}
}
}
}
private sealed class PipelineBuilder : IEtlPipelineBuilder
{
private readonly IDbConnectionFactory _connectionFactory;
private readonly string _tableName;
private readonly PipelineConfig _config;
private readonly PipelineSettings _settings;
private readonly ILogger<EtlPipeline> _logger;
private SyncMode _mode = SyncMode.Incremental;
private DateTime? _minDtOverride;
public PipelineBuilder(
IDbConnectionFactory connectionFactory,
string tableName,
PipelineConfig config,
PipelineSettings settings,
ILogger<EtlPipeline> logger)
{
_connectionFactory = connectionFactory;
_tableName = tableName;
_config = config;
_settings = settings;
_logger = logger;
}
public IEtlPipelineBuilder WithMode(SyncMode mode)
{
_mode = mode;
return this;
}
public IEtlPipelineBuilder WithMinimumDate(DateTime? minDt)
{
_minDtOverride = minDt;
return this;
}
public EtlPipeline Build()
{
var modeKey = _mode == SyncMode.Mass ? "mass" : "incremental";
if (!_config.SyncModes.TryGetValue(modeKey, out var modeConfig))
{
throw new InvalidOperationException(
$"Sync mode '{modeKey}' not defined for table '{_tableName}'.");
}
// Compute MinDt from offset or override
var minDt = _minDtOverride ?? ComputeMinDt(modeConfig.MinDtOffset);
// Create source with parameter substitution
var source = CreateSource(_config.Source, minDt);
// Determine destination type (mode override > default by mode)
var destType = modeConfig.Destination?.Type
?? (_mode == SyncMode.Mass ? "bulkImport" : "bulkMerge");
var destination = CreateDestination(destType, _config.Destination, modeConfig);
// Build pipeline with scripts
var builder = new EtlPipelineBuilder()
.WithName(_tableName)
.WithSource(source)
.WithDestination(destination)
.WithLogger(_logger);
// Add pre-scripts: config scripts first, then prePurge
foreach (var script in _config.PreScripts ?? [])
{
builder.WithPreScript(new SqlScriptRunner(_connectionFactory, script, $"PreScript:{script.Substring(0, Math.Min(30, script.Length))}"));
}
if (modeConfig.PrePurge)
{
var truncateSql = $"TRUNCATE TABLE [{_config.Destination.Table}]";
builder.WithPreScript(new SqlScriptRunner(_connectionFactory, truncateSql, "PrePurge"));
}
// Add post-scripts: reIndex first, then config scripts
if (modeConfig.ReIndex)
{
var reindexSql = $"ALTER INDEX ALL ON [{_config.Destination.Table}] REBUILD";
builder.WithPostScript(new SqlScriptRunner(_connectionFactory, reindexSql, "ReIndex"));
}
foreach (var script in _config.PostScripts ?? [])
{
builder.WithPostScript(new SqlScriptRunner(_connectionFactory, script, $"PostScript:{script.Substring(0, Math.Min(30, script.Length))}"));
}
// Transformers are not yet implemented - placeholder for future
// foreach (var t in _config.Transformers ?? [])
// builder.WithTransformer(CreateTransformer(t));
return builder.Build();
}
private DateTime? ComputeMinDt(string? minDtOffset)
{
if (string.IsNullOrEmpty(minDtOffset))
return null;
if (!TimeSpan.TryParse(minDtOffset, out var offset))
{
throw new InvalidOperationException(
$"Invalid minDtOffset format: '{minDtOffset}'. Expected TimeSpan format (e.g., '-7.00:00:00').");
}
return DateTime.UtcNow.Add(offset);
}
private IImportSource CreateSource(SourceConfig sourceConfig, DateTime? minDt)
{
var parameters = new Dictionary<string, object>();
var converter = new ParameterFormatConverter(_settings.Timezone);
if (sourceConfig.Parameters != null && minDt.HasValue)
{
foreach (var (_, paramConfig) in sourceConfig.Parameters)
{
var paramValue = paramConfig.Source.ToLowerInvariant() switch
{
"offset" => converter.Convert(minDt.Value, paramConfig.Format),
"static" => paramConfig.Value
?? throw new InvalidOperationException(
$"Static parameter '{paramConfig.Name}' requires a value."),
_ => throw new NotSupportedException(
$"Parameter source '{paramConfig.Source}' is not supported.")
};
// Use the parameter name exactly as configured (provider-specific)
parameters[paramConfig.Name] = paramValue;
}
}
return new DbQuerySource(
_connectionFactory,
sourceConfig.Connection,
sourceConfig.Query,
parameters);
}
private IImportDestination CreateDestination(
string destType,
DestinationConfig baseConfig,
SyncModeConfig modeConfig)
{
var tableName = baseConfig.Table;
// Merge mode-specific destination config with base
var matchColumns = modeConfig.Destination?.MatchColumns?.ToArray()
?? baseConfig.MatchColumns?.ToArray();
var excludeFromUpdate = modeConfig.Destination?.ExcludeFromUpdate?.ToArray()
?? baseConfig.ExcludeFromUpdate?.ToArray();
return destType.ToLowerInvariant() switch
{
"bulkimport" => new DbBulkImportDestination(_connectionFactory, tableName),
"bulkmerge" => new DbBulkMergeDestination(
_connectionFactory,
tableName,
matchColumns ?? throw new InvalidOperationException(
$"matchColumns required for bulkMerge destination on table '{tableName}'."),
updateColumns: null,
excludeFromUpdate: excludeFromUpdate,
updateCondition: modeConfig.UpdateWhen),
_ => throw new InvalidOperationException(
$"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.")
};
}
}
}