using System.Text.Json;
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Destinations;
using JdeScoping.DataSync.Etl.Pipeline;
using JdeScoping.DataSync.Etl.Scripts;
using JdeScoping.DataSync.Etl.Sources;
using JdeScoping.DataSync.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace JdeScoping.DataSync.Services;

/// <summary>
/// Factory for creating ETL pipelines from JSON configuration.
/// </summary>
/// <remarks>
/// The configuration is loaded and validated once, at construction time.
/// <see cref="ForTable"/> then hands out a builder bound to one table's pipeline definition.
/// </remarks>
public class EtlPipelineFactory : IEtlPipelineFactory
{
    private static readonly JsonSerializerOptions JsonOptions = new()
    {
        PropertyNameCaseInsensitive = true,
        ReadCommentHandling = JsonCommentHandling.Skip,
        AllowTrailingCommas = true
    };

    private readonly IDbConnectionFactory _connectionFactory;
    private readonly ILogger _logger;
    private readonly PipelinesRoot _config;

    /// <summary>
    /// Creates a new pipeline factory, loading the pipeline configuration from the file
    /// named by <c>options.Value.ConfigPath</c>.
    /// </summary>
    /// <param name="connectionFactory">Factory for creating database connections.</param>
    /// <param name="options">Pipeline configuration options.</param>
    /// <param name="logger">Logger for pipeline execution.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    /// <exception cref="FileNotFoundException">The configuration file does not exist.</exception>
    /// <exception cref="InvalidOperationException">The configuration is empty or fails validation.</exception>
    /// <exception cref="NotSupportedException">A pipeline declares a "runtime" parameter source.</exception>
    public EtlPipelineFactory(
        IDbConnectionFactory connectionFactory,
        IOptions options,
        ILogger logger)
    {
        ArgumentNullException.ThrowIfNull(connectionFactory);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(logger);

        _connectionFactory = connectionFactory;
        _logger = logger;
        _config = LoadPipelineConfigs(options.Value.ConfigPath);
    }

    /// <summary>
    /// Creates a new pipeline factory with a pre-loaded configuration (for testing).
    /// </summary>
    /// <param name="connectionFactory">Factory for creating database connections.</param>
    /// <param name="config">Already-deserialized pipeline configuration; validated before use.</param>
    /// <param name="logger">Logger for pipeline execution.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    /// <exception cref="InvalidOperationException">The configuration fails validation.</exception>
    /// <exception cref="NotSupportedException">A pipeline declares a "runtime" parameter source.</exception>
    internal EtlPipelineFactory(
        IDbConnectionFactory connectionFactory,
        PipelinesRoot config,
        ILogger logger)
    {
        ArgumentNullException.ThrowIfNull(connectionFactory);
        ArgumentNullException.ThrowIfNull(config);
        ArgumentNullException.ThrowIfNull(logger);

        // Validate before assigning any field so a bad config never yields a usable instance.
        ValidateConfig(config);

        _connectionFactory = connectionFactory;
        _logger = logger;
        _config = config;
    }

    /// <summary>
    /// Returns a builder for the pipeline configured under <paramref name="tableName"/>.
    /// </summary>
    /// <param name="tableName">Key of the pipeline in the loaded configuration.</param>
    /// <returns>A builder bound to that pipeline's configuration.</returns>
    /// <exception cref="ArgumentException"><paramref name="tableName"/> is null, empty or whitespace.</exception>
    /// <exception cref="InvalidOperationException">No pipeline is configured for the table.</exception>
    public IEtlPipelineBuilder ForTable(string tableName)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tableName);

        if (!_config.Pipelines.TryGetValue(tableName, out var pipelineConfig))
        {
            throw new InvalidOperationException(
                $"No pipeline configured for table: {tableName}. " +
                $"Available tables: {string.Join(", ", _config.Pipelines.Keys)}");
        }

        return new PipelineBuilder(
            _connectionFactory,
            tableName,
            pipelineConfig,
            _config.EffectiveSettings,
            _config.EffectiveScheduleDefaults,
            _logger);
    }

    /// <summary>
    /// Reads, deserializes and validates the pipeline configuration file.
    /// </summary>
    /// <param name="configPath">Path of the config file, combined with the application's binary directory.</param>
    /// <returns>The validated configuration root.</returns>
    /// <exception cref="FileNotFoundException">The resolved file does not exist.</exception>
    /// <exception cref="InvalidOperationException">Deserialization produced null, or validation failed.</exception>
    private static PipelinesRoot LoadPipelineConfigs(string configPath)
    {
        // Resolve the path relative to the assembly location (handles both debug and publish).
        // Assembly.Location is an empty string when the assembly is bundled into a single-file
        // application, so fall back to the application base directory in that case.
        var assemblyLocation = typeof(EtlPipelineFactory).Assembly.Location;
        var baseDirectory = string.IsNullOrEmpty(assemblyLocation)
            ? AppContext.BaseDirectory
            : Path.GetDirectoryName(assemblyLocation) ?? AppContext.BaseDirectory;

        var fullPath = Path.Combine(baseDirectory, configPath);
        if (!File.Exists(fullPath))
        {
            throw new FileNotFoundException(
                $"Pipeline config not found: {fullPath}. " +
                "Ensure the config file is included in the build output.",
                fullPath);
        }

        var json = File.ReadAllText(fullPath);
        var root = JsonSerializer.Deserialize(json, JsonOptions)
            ?? throw new InvalidOperationException("Failed to deserialize pipeline config: result was null.");

        ValidateConfig(root);
        return root;
    }

    /// <summary>
    /// Checks every configured pipeline for the constraints this factory relies on.
    /// </summary>
    /// <param name="root">Configuration to validate.</param>
    /// <exception cref="InvalidOperationException">
    /// The pipelines map is missing, or a pipeline is null or lacks 'schedules' or 'source'.
    /// </exception>
    /// <exception cref="NotSupportedException">A parameter uses the "runtime" source.</exception>
    private static void ValidateConfig(PipelinesRoot root)
    {
        if (root.Pipelines is null)
        {
            throw new InvalidOperationException("Pipeline config does not define any pipelines.");
        }

        foreach (var (name, config) in root.Pipelines)
        {
            if (config is null)
            {
                throw new InvalidOperationException(
                    $"Pipeline '{name}' has no configuration.");
            }

            // Schedules are now required
            if (config.Schedules == null)
            {
                throw new InvalidOperationException(
                    $"Pipeline '{name}' must define 'schedules'.");
            }

            // A pipeline without a source cannot be built; fail here with a clear message
            // instead of a NullReferenceException further down.
            if (config.Source is null)
            {
                throw new InvalidOperationException(
                    $"Pipeline '{name}' must define 'source'.");
            }

            // Validate no runtime parameters (not yet supported)
            if (config.Source.Parameters is null)
            {
                continue;
            }

            foreach (var (paramName, paramConfig) in config.Source.Parameters)
            {
                if (paramConfig.Source.Equals("runtime", StringComparison.OrdinalIgnoreCase))
                {
                    throw new NotSupportedException(
                        $"Pipeline '{name}' parameter '{paramName}': " +
                        "runtime parameter source is not yet supported.");
                }
            }
        }
    }

    /// <summary>
    /// Builder that assembles an <see cref="EtlPipeline"/> for one table from its configuration,
    /// the selected update type and an optional minimum date.
    /// </summary>
    private sealed class PipelineBuilder : IEtlPipelineBuilder
    {
        // Maximum number of script characters echoed into a script runner's display name.
        private const int ScriptLabelLength = 30;

        private readonly IDbConnectionFactory _connectionFactory;
        private readonly string _tableName;
        private readonly PipelineConfig _config;
        private readonly PipelineSettings _settings;
        private readonly ScheduleDefaults _scheduleDefaults;
        private readonly ILogger _logger;

        private UpdateTypes _updateType = UpdateTypes.Hourly;
        private DateTime? _minDtOverride;

        /// <summary>
        /// Creates a builder for a single table's pipeline.
        /// </summary>
        public PipelineBuilder(
            IDbConnectionFactory connectionFactory,
            string tableName,
            PipelineConfig config,
            PipelineSettings settings,
            ScheduleDefaults scheduleDefaults,
            ILogger logger)
        {
            _connectionFactory = connectionFactory;
            _tableName = tableName;
            _config = config;
            _settings = settings;
            _scheduleDefaults = scheduleDefaults;
            _logger = logger;
        }

        /// <summary>
        /// Selects the update type used to pick the schedule and query (defaults to Hourly).
        /// </summary>
        public IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType)
        {
            _updateType = updateType;
            return this;
        }

        /// <summary>
        /// Sets the minimum date used to compute "offset" query parameters; null clears it.
        /// </summary>
        public IEtlPipelineBuilder WithMinimumDate(DateTime? minDt)
        {
            _minDtOverride = minDt;
            return this;
        }

        /// <summary>
        /// Builds the pipeline for the currently selected update type.
        /// </summary>
        public EtlPipeline Build() => BuildWithSchedules();

        /// <summary>
        /// Assembles source, destination and pre/post scripts according to the effective schedule.
        /// </summary>
        private EtlPipeline BuildWithSchedules()
        {
            var scheduleConfig = GetEffectiveScheduleConfig(_updateType);

            // Use massQuery for Mass, regular query for Daily/Hourly
            var useMassQuery = _updateType == UpdateTypes.Mass
                && !string.IsNullOrEmpty(_config.Source.MassQuery);

            // Create source with parameter substitution
            var source = CreateSource(_config.Source, _minDtOverride, useMassQuery);

            // A purged table is loaded with a plain bulk import; otherwise rows are merged.
            var destType = scheduleConfig.PrePurge ? "bulkImport" : "bulkMerge";
            var destination = CreateDestination(destType, _config.Destination, scheduleConfig);

            var builder = new EtlPipelineBuilder()
                .WithName(_tableName)
                .WithSource(source)
                .WithDestination(destination)
                .WithLogger(_logger);

            // Add pre-scripts: config scripts first, then prePurge
            foreach (var script in _config.PreScripts ?? [])
            {
                builder.WithPreScript(
                    new SqlScriptRunner(_connectionFactory, script, DescribeScript("PreScript", script)));
            }

            if (scheduleConfig.PrePurge)
            {
                var truncateSql = $"TRUNCATE TABLE {QuoteIdentifier(_config.Destination.Table)}";
                builder.WithPreScript(new SqlScriptRunner(_connectionFactory, truncateSql, "PrePurge"));
            }

            // Add post-scripts: reIndex first, then config scripts
            if (scheduleConfig.ReIndex)
            {
                var reindexSql = $"ALTER INDEX ALL ON {QuoteIdentifier(_config.Destination.Table)} REBUILD";
                builder.WithPostScript(new SqlScriptRunner(_connectionFactory, reindexSql, "ReIndex"));
            }

            foreach (var script in _config.PostScripts ?? [])
            {
                builder.WithPostScript(
                    new SqlScriptRunner(_connectionFactory, script, DescribeScript("PostScript", script)));
            }

            return builder.Build();
        }

        /// <summary>
        /// Resolves the schedule settings for an update type: the pipeline's own entry merged
        /// over the global default, or the default alone when the pipeline has no entry.
        /// </summary>
        private Configuration.ScheduleConfig GetEffectiveScheduleConfig(UpdateTypes updateType)
        {
            // Default for this update type; unknown values fall back to the Hourly default.
            var defaultConfig = updateType switch
            {
                UpdateTypes.Mass => _scheduleDefaults.Mass,
                UpdateTypes.Daily => _scheduleDefaults.Daily,
                UpdateTypes.Hourly => _scheduleDefaults.Hourly,
                _ => _scheduleDefaults.Hourly
            };

            // Pipeline-specific override, if one exists.
            var pipelineConfig = updateType switch
            {
                UpdateTypes.Mass => _config.Schedules?.Mass,
                UpdateTypes.Daily => _config.Schedules?.Daily,
                UpdateTypes.Hourly => _config.Schedules?.Hourly,
                _ => null
            };

            // Merge: pipeline config overrides defaults
            return pipelineConfig?.MergeWith(defaultConfig) ?? defaultConfig;
        }

        /// <summary>
        /// Creates the query source, binding configured parameters when applicable.
        /// </summary>
        /// <param name="sourceConfig">Source section of the pipeline configuration.</param>
        /// <param name="minDt">Minimum date used for "offset" parameters, if any.</param>
        /// <param name="useMassQuery">True to run the mass query instead of the regular query.</param>
        /// <exception cref="InvalidOperationException">A "static" parameter has no value.</exception>
        /// <exception cref="NotSupportedException">A parameter source is neither "offset" nor "static".</exception>
        private IImportSource CreateSource(SourceConfig sourceConfig, DateTime? minDt, bool useMassQuery)
        {
            // Use massQuery if specified, otherwise use the default query
            var query = useMassQuery ? sourceConfig.MassQuery! : sourceConfig.Query;

            var parameters = new Dictionary();
            var converter = new ParameterFormatConverter(_settings.Timezone);

            // Parameters are bound only for the regular query and only when a minimum date is
            // available. Note that this condition also skips "static" parameters when no
            // minimum date was supplied.
            var bindParameters = !useMassQuery && minDt.HasValue && sourceConfig.Parameters is not null;
            if (bindParameters)
            {
                foreach (var (_, paramConfig) in sourceConfig.Parameters!)
                {
                    var paramValue = paramConfig.Source.ToLowerInvariant() switch
                    {
                        "offset" => converter.Convert(minDt!.Value, paramConfig.Format),
                        "static" => paramConfig.Value ?? throw new InvalidOperationException(
                            $"Static parameter '{paramConfig.Name}' requires a value."),
                        _ => throw new NotSupportedException(
                            $"Parameter source '{paramConfig.Source}' is not supported.")
                    };

                    // Use the parameter name exactly as configured (provider-specific)
                    parameters[paramConfig.Name] = paramValue;
                }
            }

            return new DbQuerySource(
                _connectionFactory,
                sourceConfig.Connection,
                query,
                parameters);
        }

        /// <summary>
        /// Creates the destination for the given destination type.
        /// </summary>
        /// <param name="destType">"bulkImport" or "bulkMerge" (case-insensitive).</param>
        /// <param name="baseConfig">Destination section of the pipeline configuration.</param>
        /// <param name="scheduleConfig">Effective schedule; supplies the merge update condition.</param>
        /// <exception cref="InvalidOperationException">
        /// The type is unknown, or a merge destination has no match columns.
        /// </exception>
        private IImportDestination CreateDestination(
            string destType,
            DestinationConfig baseConfig,
            Configuration.ScheduleConfig scheduleConfig)
        {
            var tableName = baseConfig.Table;

            // Use base config for match/exclude columns
            var matchColumns = baseConfig.MatchColumns?.ToArray();
            var excludeFromUpdate = baseConfig.ExcludeFromUpdate?.ToArray();

            return destType.ToLowerInvariant() switch
            {
                "bulkimport" => new DbBulkImportDestination(_connectionFactory, tableName),
                "bulkmerge" => new DbBulkMergeDestination(
                    _connectionFactory,
                    tableName,
                    matchColumns ?? throw new InvalidOperationException(
                        $"matchColumns required for bulkMerge destination on table '{tableName}'."),
                    updateColumns: null,
                    excludeFromUpdate: excludeFromUpdate,
                    updateCondition: scheduleConfig.UpdateWhen),
                _ => throw new InvalidOperationException(
                    $"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.")
            };
        }

        /// <summary>
        /// Wraps an identifier in square brackets, doubling any closing bracket so the name
        /// cannot terminate the delimiter early.
        /// </summary>
        private static string QuoteIdentifier(string identifier) =>
            $"[{identifier.Replace("]", "]]", StringComparison.Ordinal)}]";

        /// <summary>
        /// Builds a script runner display name from a prefix and the first characters of the script.
        /// </summary>
        private static string DescribeScript(string prefix, string script) =>
            $"{prefix}:{script[..Math.Min(ScriptLabelLength, script.Length)]}";
    }
}