From bb54994f2d24784c3b4811fcd60084026f3c82ec Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 7 Jan 2026 01:15:18 -0500 Subject: [PATCH] feat(datasync): add WithUpdateType to IEtlPipelineBuilder - Add WithUpdateType(UpdateTypes) method to IEtlPipelineBuilder interface - Mark existing WithMode(SyncMode) as [Obsolete("Use WithUpdateType instead")] - Update PipelineBuilder to store UpdateTypes instead of SyncMode - Add GetEffectiveScheduleConfig method to merge pipeline schedules with defaults - Add BuildWithSchedules method for new Schedules-based config - Update validation to support both old SyncModes and new Schedules formats - Pass ScheduleDefaults from PipelinesRoot to PipelineBuilder - For Mass mode: use massQuery, apply prePurge/reIndex from schedule config - For Daily/Hourly: use regular query with date parameters - Add 8 new tests for WithUpdateType functionality --- .../Contracts/IEtlPipelineFactory.cs | 25 ++ .../Services/EtlPipelineFactory.cs | 218 ++++++++++++++++-- .../Services/EtlPipelineFactoryTests.cs | 164 +++++++++++++ 3 files changed, 392 insertions(+), 15 deletions(-) diff --git a/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs b/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs index 2dff781..08e9e97 100644 --- a/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs +++ b/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs @@ -1,3 +1,4 @@ +using JdeScoping.Core.Models.Enums; using JdeScoping.DataSync.Etl.Pipeline; namespace JdeScoping.DataSync.Contracts; @@ -9,7 +10,31 @@ public interface IEtlPipelineFactory public interface IEtlPipelineBuilder { + /// + /// Sets the sync mode for this pipeline. + /// + /// The sync mode (Mass or Incremental). + /// The builder for chaining. + [Obsolete("Use WithUpdateType instead")] IEtlPipelineBuilder WithMode(SyncMode mode); + + /// + /// Sets the update type for this pipeline (Mass, Daily, or Hourly). + /// + /// The update type. + /// The builder for chaining. + IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType); + + /// + /// Sets an optional minimum date for filtering source data. + /// + /// The minimum date, or null to use config offset. + /// The builder for chaining. IEtlPipelineBuilder WithMinimumDate(DateTime? minDt); + + /// + /// Builds the pipeline with the configured settings. + /// + /// The configured pipeline. EtlPipeline Build(); } diff --git a/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs index 9ed61e6..bc83eb6 100644 --- a/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs +++ b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using JdeScoping.Core.Models.Enums; using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Configuration; using JdeScoping.DataSync.Contracts; @@ -85,6 +86,7 @@ public class EtlPipelineFactory : IEtlPipelineFactory tableName, pipelineConfig, _config.EffectiveSettings, + _config.EffectiveScheduleDefaults, _logger); } @@ -113,17 +115,34 @@ public class EtlPipelineFactory : IEtlPipelineFactory { foreach (var (name, config) in root.Pipelines) { - // Validate required sync modes - if (!config.SyncModes.ContainsKey("mass")) - { - throw new InvalidOperationException( - $"Pipeline '{name}' missing required 'mass' sync mode."); - } + // Accept either old SyncModes or new Schedules format + var hasOldConfig = config.SyncModes != null && + config.SyncModes.ContainsKey("mass") && + config.SyncModes.ContainsKey("incremental"); + var hasNewConfig = config.Schedules != null; - if (!config.SyncModes.ContainsKey("incremental")) + if (!hasOldConfig && !hasNewConfig) { - throw new InvalidOperationException( - $"Pipeline '{name}' missing required 'incremental' sync mode."); + // If neither format is present, check for the old partial config for backward-compat error messages + if (config.SyncModes != null) + { + if (!config.SyncModes.ContainsKey("mass")) + { + throw new InvalidOperationException( + $"Pipeline '{name}' missing required 'mass' sync mode."); + } + + if (!config.SyncModes.ContainsKey("incremental")) + { + throw new InvalidOperationException( + $"Pipeline '{name}' missing required 'incremental' sync mode."); + } + } + else + { + throw new InvalidOperationException( + $"Pipeline '{name}' must define either 'syncModes' (mass+incremental) or 'schedules'."); + } } // Validate no runtime parameters (not yet supported) @@ -148,8 +167,9 @@ public class EtlPipelineFactory : IEtlPipelineFactory private readonly string _tableName; private readonly PipelineConfig _config; private readonly PipelineSettings _settings; + private readonly ScheduleDefaults _scheduleDefaults; private readonly ILogger _logger; - private SyncMode _mode = SyncMode.Incremental; + private UpdateTypes _updateType = UpdateTypes.Hourly; private DateTime? _minDtOverride; public PipelineBuilder( @@ -157,18 +177,28 @@ public class EtlPipelineFactory : IEtlPipelineFactory string tableName, PipelineConfig config, PipelineSettings settings, + ScheduleDefaults scheduleDefaults, ILogger logger) { _connectionFactory = connectionFactory; _tableName = tableName; _config = config; _settings = settings; + _scheduleDefaults = scheduleDefaults; _logger = logger; } + [Obsolete("Use WithUpdateType instead")] public IEtlPipelineBuilder WithMode(SyncMode mode) { - _mode = mode; + // Map old SyncMode to new UpdateTypes for backward compatibility + _updateType = mode == SyncMode.Mass ? UpdateTypes.Mass : UpdateTypes.Hourly; + return this; + } + + public IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType) + { + _updateType = updateType; return this; } @@ -180,9 +210,74 @@ public class EtlPipelineFactory : IEtlPipelineFactory public EtlPipeline Build() { - var modeKey = _mode == SyncMode.Mass ? "mass" : "incremental"; + // Check if using new Schedules format or old SyncModes format + if (_config.Schedules != null) + { + return BuildWithSchedules(); + } + else + { + return BuildWithSyncModes(); + } + } - if (!_config.SyncModes.TryGetValue(modeKey, out var modeConfig)) + private EtlPipeline BuildWithSchedules() + { + var scheduleConfig = GetEffectiveScheduleConfig(_updateType); + + // Compute MinDt from override + var minDt = _minDtOverride; + + // Use massQuery for Mass, regular query for Daily/Hourly + var useMassQuery = _updateType == UpdateTypes.Mass && !string.IsNullOrEmpty(_config.Source.MassQuery); + + // Create source with parameter substitution + var source = CreateSourceWithUpdateType(_config.Source, minDt, useMassQuery); + + // Determine destination type (Mass with prePurge = bulkImport, others = bulkMerge unless prePurge) + var destType = scheduleConfig.PrePurge ? "bulkImport" : "bulkMerge"; + var destination = CreateDestinationWithSchedule(destType, _config.Destination, scheduleConfig); + + // Build pipeline with scripts + var builder = new EtlPipelineBuilder() + .WithName(_tableName) + .WithSource(source) + .WithDestination(destination) + .WithLogger(_logger); + + // Add pre-scripts: config scripts first, then prePurge + foreach (var script in _config.PreScripts ?? []) + { + builder.WithPreScript(new SqlScriptRunner(_connectionFactory, script, $"PreScript:{script.Substring(0, Math.Min(30, script.Length))}")); + } + + if (scheduleConfig.PrePurge) + { + var truncateSql = $"TRUNCATE TABLE [{_config.Destination.Table}]"; + builder.WithPreScript(new SqlScriptRunner(_connectionFactory, truncateSql, "PrePurge")); + } + + // Add post-scripts: reIndex first, then config scripts + if (scheduleConfig.ReIndex) + { + var reindexSql = $"ALTER INDEX ALL ON [{_config.Destination.Table}] REBUILD"; + builder.WithPostScript(new SqlScriptRunner(_connectionFactory, reindexSql, "ReIndex")); + } + + foreach (var script in _config.PostScripts ?? []) + { + builder.WithPostScript(new SqlScriptRunner(_connectionFactory, script, $"PostScript:{script.Substring(0, Math.Min(30, script.Length))}")); + } + + return builder.Build(); + } + + private EtlPipeline BuildWithSyncModes() + { + // Map UpdateTypes to old sync mode keys for backward compatibility + var modeKey = _updateType == UpdateTypes.Mass ? "mass" : "incremental"; + + if (!_config.SyncModes!.TryGetValue(modeKey, out var modeConfig)) { throw new InvalidOperationException( $"Sync mode '{modeKey}' not defined for table '{_tableName}'."); @@ -191,12 +286,15 @@ public class EtlPipelineFactory : IEtlPipelineFactory // Compute MinDt from offset or override var minDt = _minDtOverride ?? ComputeMinDt(modeConfig.MinDtOffset); + // Convert UpdateTypes to SyncMode for backward compatibility with CreateSource + var syncMode = _updateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental; + // Create source with parameter substitution - var source = CreateSource(_config.Source, minDt, _mode); + var source = CreateSource(_config.Source, minDt, syncMode); // Determine destination type (mode override > default by mode) var destType = modeConfig.Destination?.Type - ?? (_mode == SyncMode.Mass ? "bulkImport" : "bulkMerge"); + ?? (_updateType == UpdateTypes.Mass ? "bulkImport" : "bulkMerge"); var destination = CreateDestination(destType, _config.Destination, modeConfig); // Build pipeline with scripts @@ -237,6 +335,30 @@ public class EtlPipelineFactory : IEtlPipelineFactory return builder.Build(); } + private Configuration.ScheduleConfig GetEffectiveScheduleConfig(UpdateTypes updateType) + { + // Get default for this update type + var defaultConfig = updateType switch + { + UpdateTypes.Mass => _scheduleDefaults.Mass, + UpdateTypes.Daily => _scheduleDefaults.Daily, + UpdateTypes.Hourly => _scheduleDefaults.Hourly, + _ => _scheduleDefaults.Hourly + }; + + // Get pipeline-specific override if exists + var pipelineConfig = updateType switch + { + UpdateTypes.Mass => _config.Schedules?.Mass, + UpdateTypes.Daily => _config.Schedules?.Daily, + UpdateTypes.Hourly => _config.Schedules?.Hourly, + _ => null + }; + + // Merge: pipeline config overrides defaults + return pipelineConfig?.MergeWith(defaultConfig) ?? defaultConfig; + } + private DateTime? ComputeMinDt(string? minDtOffset) { if (string.IsNullOrEmpty(minDtOffset)) @@ -321,5 +443,71 @@ public class EtlPipelineFactory : IEtlPipelineFactory $"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.") }; } + + private IImportSource CreateSourceWithUpdateType(SourceConfig sourceConfig, DateTime? minDt, bool useMassQuery) + { + // Use massQuery if specified, otherwise use the default query + var query = useMassQuery ? sourceConfig.MassQuery! : sourceConfig.Query; + + var parameters = new Dictionary(); + var converter = new ParameterFormatConverter(_settings.Timezone); + + // Only add parameters when not using massQuery (mass queries typically don't need date parameters) + var needsParameters = !useMassQuery; + + if (sourceConfig.Parameters != null && minDt.HasValue && needsParameters) + { + foreach (var (_, paramConfig) in sourceConfig.Parameters) + { + var paramValue = paramConfig.Source.ToLowerInvariant() switch + { + "offset" => converter.Convert(minDt.Value, paramConfig.Format), + "static" => paramConfig.Value + ?? throw new InvalidOperationException( + $"Static parameter '{paramConfig.Name}' requires a value."), + _ => throw new NotSupportedException( + $"Parameter source '{paramConfig.Source}' is not supported.") + }; + + // Use the parameter name exactly as configured (provider-specific) + parameters[paramConfig.Name] = paramValue; + } + } + + return new DbQuerySource( + _connectionFactory, + sourceConfig.Connection, + query, + parameters); + } + + private IImportDestination CreateDestinationWithSchedule( + string destType, + DestinationConfig baseConfig, + Configuration.ScheduleConfig scheduleConfig) + { + var tableName = baseConfig.Table; + + // Use base config for match/exclude columns + var matchColumns = baseConfig.MatchColumns?.ToArray(); + var excludeFromUpdate = baseConfig.ExcludeFromUpdate?.ToArray(); + + return destType.ToLowerInvariant() switch + { + "bulkimport" => new DbBulkImportDestination(_connectionFactory, tableName), + + "bulkmerge" => new DbBulkMergeDestination( + _connectionFactory, + tableName, + matchColumns ?? throw new InvalidOperationException( + $"matchColumns required for bulkMerge destination on table '{tableName}'."), + updateColumns: null, + excludeFromUpdate: excludeFromUpdate, + updateCondition: scheduleConfig.UpdateWhen), + + _ => throw new InvalidOperationException( + $"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.") + }; + } } } diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs index b38c387..0759037 100644 --- a/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs +++ b/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs @@ -1,3 +1,4 @@ +using JdeScoping.Core.Models.Enums; using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Configuration; using JdeScoping.DataSync.Contracts; @@ -156,6 +157,141 @@ public class EtlPipelineFactoryTests #endregion + #region Builder WithUpdateType Tests + + [Fact] + public void Builder_WithUpdateTypesMass_BuildsPipeline() + { + // Arrange + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + pipeline.PipelineName.ShouldBe("TestTable"); + } + + [Fact] + public void Builder_WithUpdateTypesDaily_BuildsPipeline() + { + // Arrange + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Daily) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + pipeline.PipelineName.ShouldBe("TestTable"); + } + + [Fact] + public void Builder_WithUpdateTypesHourly_BuildsPipeline() + { + // Arrange + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Hourly) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + pipeline.PipelineName.ShouldBe("TestTable"); + } + + [Fact] + public void Builder_WithUpdateTypesMass_UsesMassQuery() + { + // Arrange - config with massQuery should use it for Mass update type + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithUpdateTypesDaily_UsesRegularQuery() + { + // Arrange - Daily should use regular query with date filtering + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Daily) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithUpdateTypesMass_AppliesPrePurgeFromScheduleConfig() + { + // Arrange - Mass schedule should have prePurge=true from defaults + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act - should not throw and should include truncate pre-script + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithUpdateTypesMass_AppliesReIndexFromScheduleConfig() + { + // Arrange - Mass schedule should have reIndex=true from defaults + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act - should not throw and should include reindex post-script + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithUpdateTypesHourly_UsesUpdateWhenFromDefaults() + { + // Arrange - Hourly should use updateWhen from defaults + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Hourly) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + #endregion + #region Builder WithMinimumDate Tests [Fact] @@ -839,6 +975,34 @@ public class EtlPipelineFactoryTests }); } + private PipelinesRoot CreateValidConfigWithSchedules() + { + return new PipelinesRoot( + new PipelineSettings("UTC"), + new ScheduleDefaults(), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test WHERE UpdateDt >= @MinDt", + new Dictionary + { + ["minDt"] = new ParameterConfig("@MinDt", null, "offset", null) + }, + "SELECT * FROM Test"), // MassQuery + null, // No old SyncModes + new PipelineSchedules + { + Mass = new ScheduleConfig { PrePurge = true, ReIndex = true }, + Daily = new ScheduleConfig(), + Hourly = new ScheduleConfig() + }, + null, // Transformers + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + } + private EtlPipelineFactory CreateFactory(PipelinesRoot config) { return new EtlPipelineFactory(_connectionFactory, config, _logger);