diff --git a/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs
new file mode 100644
index 0000000..00095c0
--- /dev/null
+++ b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs
@@ -0,0 +1,316 @@
+using System.Text.Json;
+using JdeScoping.DataAccess.Interfaces;
+using JdeScoping.DataSync.Configuration;
+using JdeScoping.DataSync.Contracts;
+using JdeScoping.DataSync.Etl.Contracts;
+using JdeScoping.DataSync.Etl.Destinations;
+using JdeScoping.DataSync.Etl.Pipeline;
+using JdeScoping.DataSync.Etl.Scripts;
+using JdeScoping.DataSync.Etl.Sources;
+using JdeScoping.DataSync.Options;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+
+namespace JdeScoping.DataSync.Services;
+
+///
+/// Factory for creating ETL pipelines from JSON configuration.
+///
+public class EtlPipelineFactory : IEtlPipelineFactory
+{
+ private static readonly JsonSerializerOptions JsonOptions = new()
+ {
+ PropertyNameCaseInsensitive = true,
+ ReadCommentHandling = JsonCommentHandling.Skip,
+ AllowTrailingCommas = true
+ };
+
+ private readonly IDbConnectionFactory _connectionFactory;
+ private readonly ILogger _logger;
+ private readonly PipelinesRoot _config;
+
+ ///
+ /// Creates a new pipeline factory.
+ ///
+ /// Factory for creating database connections.
+ /// Pipeline configuration options.
+ /// Logger for pipeline execution.
+ public EtlPipelineFactory(
+ IDbConnectionFactory connectionFactory,
+ IOptions options,
+ ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(connectionFactory);
+ ArgumentNullException.ThrowIfNull(options);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ _connectionFactory = connectionFactory;
+ _logger = logger;
+ _config = LoadPipelineConfigs(options.Value.ConfigPath);
+ }
+
+ ///
+ /// Creates a new pipeline factory with a pre-loaded configuration (for testing).
+ ///
+ internal EtlPipelineFactory(
+ IDbConnectionFactory connectionFactory,
+ PipelinesRoot config,
+ ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(connectionFactory);
+ ArgumentNullException.ThrowIfNull(config);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ ValidateConfig(config);
+
+ _connectionFactory = connectionFactory;
+ _logger = logger;
+ _config = config;
+ }
+
+ ///
+ public IEtlPipelineBuilder ForTable(string tableName)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
+
+ if (!_config.Pipelines.TryGetValue(tableName, out var pipelineConfig))
+ {
+ throw new InvalidOperationException(
+ $"No pipeline configured for table: {tableName}. " +
+ $"Available tables: {string.Join(", ", _config.Pipelines.Keys)}");
+ }
+
+ return new PipelineBuilder(
+ _connectionFactory,
+ tableName,
+ pipelineConfig,
+ _config.EffectiveSettings,
+ _logger);
+ }
+
+ private PipelinesRoot LoadPipelineConfigs(string configPath)
+ {
+ // Resolve path relative to assembly location (handles both debug and publish)
+ var assemblyDir = Path.GetDirectoryName(typeof(EtlPipelineFactory).Assembly.Location)!;
+ var fullPath = Path.Combine(assemblyDir, configPath);
+
+ if (!File.Exists(fullPath))
+ {
+ throw new FileNotFoundException(
+ $"Pipeline config not found: {fullPath}. " +
+ "Ensure the config file is included in the build output.");
+ }
+
+ var json = File.ReadAllText(fullPath);
+ var root = JsonSerializer.Deserialize(json, JsonOptions)
+ ?? throw new InvalidOperationException("Failed to deserialize pipeline config: result was null.");
+
+ ValidateConfig(root);
+ return root;
+ }
+
+ private static void ValidateConfig(PipelinesRoot root)
+ {
+ foreach (var (name, config) in root.Pipelines)
+ {
+ // Validate required sync modes
+ if (!config.SyncModes.ContainsKey("mass"))
+ {
+ throw new InvalidOperationException(
+ $"Pipeline '{name}' missing required 'mass' sync mode.");
+ }
+
+ if (!config.SyncModes.ContainsKey("incremental"))
+ {
+ throw new InvalidOperationException(
+ $"Pipeline '{name}' missing required 'incremental' sync mode.");
+ }
+
+ // Validate no runtime parameters (not yet supported)
+ if (config.Source.Parameters != null)
+ {
+ foreach (var (paramName, paramConfig) in config.Source.Parameters)
+ {
+ if (paramConfig.Source.Equals("runtime", StringComparison.OrdinalIgnoreCase))
+ {
+ throw new NotSupportedException(
+ $"Pipeline '{name}' parameter '{paramName}': " +
+ "runtime parameter source is not yet supported.");
+ }
+ }
+ }
+ }
+ }
+
+ private sealed class PipelineBuilder : IEtlPipelineBuilder
+ {
+ private readonly IDbConnectionFactory _connectionFactory;
+ private readonly string _tableName;
+ private readonly PipelineConfig _config;
+ private readonly PipelineSettings _settings;
+ private readonly ILogger _logger;
+ private SyncMode _mode = SyncMode.Incremental;
+ private DateTime? _minDtOverride;
+
+ public PipelineBuilder(
+ IDbConnectionFactory connectionFactory,
+ string tableName,
+ PipelineConfig config,
+ PipelineSettings settings,
+ ILogger logger)
+ {
+ _connectionFactory = connectionFactory;
+ _tableName = tableName;
+ _config = config;
+ _settings = settings;
+ _logger = logger;
+ }
+
+ public IEtlPipelineBuilder WithMode(SyncMode mode)
+ {
+ _mode = mode;
+ return this;
+ }
+
+ public IEtlPipelineBuilder WithMinimumDate(DateTime? minDt)
+ {
+ _minDtOverride = minDt;
+ return this;
+ }
+
+ public EtlPipeline Build()
+ {
+ var modeKey = _mode == SyncMode.Mass ? "mass" : "incremental";
+
+ if (!_config.SyncModes.TryGetValue(modeKey, out var modeConfig))
+ {
+ throw new InvalidOperationException(
+ $"Sync mode '{modeKey}' not defined for table '{_tableName}'.");
+ }
+
+ // Compute MinDt from offset or override
+ var minDt = _minDtOverride ?? ComputeMinDt(modeConfig.MinDtOffset);
+
+ // Create source with parameter substitution
+ var source = CreateSource(_config.Source, minDt);
+
+ // Determine destination type (mode override > default by mode)
+ var destType = modeConfig.Destination?.Type
+ ?? (_mode == SyncMode.Mass ? "bulkImport" : "bulkMerge");
+ var destination = CreateDestination(destType, _config.Destination, modeConfig);
+
+ // Build pipeline with scripts
+ var builder = new EtlPipelineBuilder()
+ .WithName(_tableName)
+ .WithSource(source)
+ .WithDestination(destination)
+ .WithLogger(_logger);
+
+ // Add pre-scripts: config scripts first, then prePurge
+ foreach (var script in _config.PreScripts ?? [])
+ {
+ builder.WithPreScript(new SqlScriptRunner(_connectionFactory, script, $"PreScript:{script.Substring(0, Math.Min(30, script.Length))}"));
+ }
+
+ if (modeConfig.PrePurge)
+ {
+ var truncateSql = $"TRUNCATE TABLE [{_config.Destination.Table}]";
+ builder.WithPreScript(new SqlScriptRunner(_connectionFactory, truncateSql, "PrePurge"));
+ }
+
+ // Add post-scripts: reIndex first, then config scripts
+ if (modeConfig.ReIndex)
+ {
+ var reindexSql = $"ALTER INDEX ALL ON [{_config.Destination.Table}] REBUILD";
+ builder.WithPostScript(new SqlScriptRunner(_connectionFactory, reindexSql, "ReIndex"));
+ }
+
+ foreach (var script in _config.PostScripts ?? [])
+ {
+ builder.WithPostScript(new SqlScriptRunner(_connectionFactory, script, $"PostScript:{script.Substring(0, Math.Min(30, script.Length))}"));
+ }
+
+ // Transformers are not yet implemented - placeholder for future
+ // foreach (var t in _config.Transformers ?? [])
+ // builder.WithTransformer(CreateTransformer(t));
+
+ return builder.Build();
+ }
+
+ private DateTime? ComputeMinDt(string? minDtOffset)
+ {
+ if (string.IsNullOrEmpty(minDtOffset))
+ return null;
+
+ if (!TimeSpan.TryParse(minDtOffset, out var offset))
+ {
+ throw new InvalidOperationException(
+ $"Invalid minDtOffset format: '{minDtOffset}'. Expected TimeSpan format (e.g., '-7.00:00:00').");
+ }
+
+ return DateTime.UtcNow.Add(offset);
+ }
+
+ private IImportSource CreateSource(SourceConfig sourceConfig, DateTime? minDt)
+ {
+ var parameters = new Dictionary();
+ var converter = new ParameterFormatConverter(_settings.Timezone);
+
+ if (sourceConfig.Parameters != null && minDt.HasValue)
+ {
+ foreach (var (_, paramConfig) in sourceConfig.Parameters)
+ {
+ var paramValue = paramConfig.Source.ToLowerInvariant() switch
+ {
+ "offset" => converter.Convert(minDt.Value, paramConfig.Format),
+ "static" => paramConfig.Value
+ ?? throw new InvalidOperationException(
+ $"Static parameter '{paramConfig.Name}' requires a value."),
+ _ => throw new NotSupportedException(
+ $"Parameter source '{paramConfig.Source}' is not supported.")
+ };
+
+ // Use the parameter name exactly as configured (provider-specific)
+ parameters[paramConfig.Name] = paramValue;
+ }
+ }
+
+ return new DbQuerySource(
+ _connectionFactory,
+ sourceConfig.Connection,
+ sourceConfig.Query,
+ parameters);
+ }
+
+ private IImportDestination CreateDestination(
+ string destType,
+ DestinationConfig baseConfig,
+ SyncModeConfig modeConfig)
+ {
+ var tableName = baseConfig.Table;
+
+ // Merge mode-specific destination config with base
+ var matchColumns = modeConfig.Destination?.MatchColumns?.ToArray()
+ ?? baseConfig.MatchColumns?.ToArray();
+ var excludeFromUpdate = modeConfig.Destination?.ExcludeFromUpdate?.ToArray()
+ ?? baseConfig.ExcludeFromUpdate?.ToArray();
+
+ return destType.ToLowerInvariant() switch
+ {
+ "bulkimport" => new DbBulkImportDestination(_connectionFactory, tableName),
+
+ "bulkmerge" => new DbBulkMergeDestination(
+ _connectionFactory,
+ tableName,
+ matchColumns ?? throw new InvalidOperationException(
+ $"matchColumns required for bulkMerge destination on table '{tableName}'."),
+ updateColumns: null,
+ excludeFromUpdate: excludeFromUpdate,
+ updateCondition: modeConfig.UpdateWhen),
+
+ _ => throw new InvalidOperationException(
+ $"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.")
+ };
+ }
+ }
+}
diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs
new file mode 100644
index 0000000..2faffc9
--- /dev/null
+++ b/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs
@@ -0,0 +1,810 @@
+using JdeScoping.DataAccess.Interfaces;
+using JdeScoping.DataSync.Configuration;
+using JdeScoping.DataSync.Contracts;
+using JdeScoping.DataSync.Etl.Pipeline;
+using JdeScoping.DataSync.Services;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using NSubstitute;
+using Shouldly;
+
+namespace JdeScoping.DataSync.Tests.Services;
+
+public class EtlPipelineFactoryTests
+{
+ private readonly IDbConnectionFactory _connectionFactory;
+ private readonly ILogger _logger;
+
+ public EtlPipelineFactoryTests()
+ {
+ _connectionFactory = Substitute.For();
+ _logger = NullLogger.Instance;
+ }
+
+ #region ForTable Tests
+
+ [Fact]
+ public void ForTable_WithValidTable_ReturnsBuilder()
+ {
+ // Arrange
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+
+ // Act
+ var builder = factory.ForTable("TestTable");
+
+ // Assert
+ builder.ShouldNotBeNull();
+ builder.ShouldBeAssignableTo();
+ }
+
+ [Fact]
+ public void ForTable_WithUnknownTable_ThrowsInvalidOperationException()
+ {
+ // Arrange
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+
+ // Act & Assert
+ var ex = Should.Throw(() => factory.ForTable("NonExistentTable"));
+ ex.Message.ShouldContain("No pipeline configured for table: NonExistentTable");
+ ex.Message.ShouldContain("TestTable"); // Should list available tables
+ }
+
+ [Fact]
+ public void ForTable_WithNullTableName_ThrowsArgumentException()
+ {
+ // Arrange
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+
+ // Act & Assert
+ Should.Throw(() => factory.ForTable(null!));
+ }
+
+ [Fact]
+ public void ForTable_WithEmptyTableName_ThrowsArgumentException()
+ {
+ // Arrange
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+
+ // Act & Assert
+ Should.Throw(() => factory.ForTable(""));
+ }
+
+ #endregion
+
+ #region Builder WithMode Tests
+
+ [Fact]
+ public void Builder_WithMassMode_BuildsPipeline()
+ {
+ // Arrange
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ pipeline.PipelineName.ShouldBe("TestTable");
+ }
+
+ [Fact]
+ public void Builder_WithIncrementalMode_BuildsPipeline()
+ {
+ // Arrange
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ pipeline.PipelineName.ShouldBe("TestTable");
+ }
+
+ [Fact]
+ public void Builder_DefaultMode_IsIncremental()
+ {
+ // Arrange
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+
+ // Act - don't call WithMode()
+ var pipeline = factory.ForTable("TestTable")
+ .Build();
+
+ // Assert - should work because incremental mode is defined
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithUndefinedSyncMode_ThrowsInvalidOperationException()
+ {
+ // Arrange - config with only mass mode
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true)
+ // No incremental mode defined
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+
+ // Act & Assert - validation fails at factory creation
+ var ex = Should.Throw(() => CreateFactory(config));
+ ex.Message.ShouldContain("missing required 'incremental' sync mode");
+ }
+
+ #endregion
+
+ #region Builder WithMinimumDate Tests
+
+ [Fact]
+ public void Builder_WithMinimumDate_OverridesConfigOffset()
+ {
+ // Arrange
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+ var customDate = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+
+ // Act - should not throw even though we're overriding
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .WithMinimumDate(customDate)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithNullMinimumDate_UsesConfigOffset()
+ {
+ // Arrange
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+
+ // Act - null minDt means use config offset
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .WithMinimumDate(null)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ #endregion
+
+ #region Config Validation Tests
+
+ [Fact]
+ public void Validate_ConfigMissingMassMode_ThrowsInvalidOperationException()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ // Missing mass mode
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+
+ // Act & Assert
+ var ex = Should.Throw(() => CreateFactory(config));
+ ex.Message.ShouldContain("missing required 'mass' sync mode");
+ }
+
+ [Fact]
+ public void Validate_ConfigWithRuntimeParameter_ThrowsNotSupportedException()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test WHERE Id = @Id",
+ new Dictionary
+ {
+ ["id"] = new ParameterConfig("@Id", null, "runtime", null)
+ }),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00"),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+
+ // Act & Assert
+ var ex = Should.Throw(() => CreateFactory(config));
+ ex.Message.ShouldContain("runtime parameter source is not yet supported");
+ }
+
+ #endregion
+
+ #region Destination Type Tests
+
+ [Fact]
+ public void Builder_MassMode_DefaultsToBulkImport()
+ {
+ // Arrange - no destination override
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act - should use bulkImport for mass mode
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_IncrementalMode_DefaultsToBulkMerge()
+ {
+ // Arrange - no destination override
+ var config = CreateValidConfig();
+ var factory = CreateFactory(config);
+
+ // Act - should use bulkMerge for incremental mode
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_ModeWithDestinationOverride_UsesOverride()
+ {
+ // Arrange - mass mode with bulkMerge override
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00",
+ Destination: new DestinationOverride("bulkMerge", null, null)),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act - mass mode should use bulkMerge due to override
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_BulkMergeWithoutMatchColumns_ThrowsInvalidOperationException()
+ {
+ // Arrange - bulkMerge needs matchColumns
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", null, null), // No matchColumns!
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act & Assert
+ var ex = Should.Throw(() =>
+ factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental) // Uses bulkMerge
+ .Build());
+ ex.Message.ShouldContain("matchColumns required for bulkMerge");
+ }
+
+ #endregion
+
+ #region Parameter Tests
+
+ [Fact]
+ public void Builder_WithOffsetParameter_CreatesSource()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test WHERE UpdateDt >= @MinDt",
+ new Dictionary
+ {
+ ["minDt"] = new ParameterConfig("@MinDt", null, "offset", null)
+ }),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithJdeJulianParameter_CreatesSource()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("jde", "SELECT * FROM Test WHERE UPMJ >= :dateUpdated",
+ new Dictionary
+ {
+ ["minDt"] = new ParameterConfig(":dateUpdated", "jdeJulian", "offset", null)
+ }),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithStaticParameter_UsesConfiguredValue()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test WHERE Status = @Status",
+ new Dictionary
+ {
+ ["status"] = new ParameterConfig("@Status", null, "static", "Active")
+ }),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithStaticParameterNoValue_ThrowsInvalidOperationException()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test WHERE Status = @Status",
+ new Dictionary
+ {
+ ["status"] = new ParameterConfig("@Status", null, "static", null) // No value!
+ }),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act & Assert
+ var ex = Should.Throw(() =>
+ factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .Build());
+ ex.Message.ShouldContain("Static parameter '@Status' requires a value");
+ }
+
+ #endregion
+
+ #region Script Tests
+
+ [Fact]
+ public void Builder_WithPrePurge_AddsTruncateScript()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithReIndex_AddsRebuildScript()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true, ReIndex: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithPreScripts_AddsConfiguredScripts()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ ["EXEC sp_BeforeSync"],
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithPostScripts_AddsConfiguredScripts()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ ["UPDATE TestTable SET ProcessedFlag = 1 WHERE ProcessedFlag IS NULL"])
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ #endregion
+
+ #region Connection Type Tests
+
+ [Theory]
+ [InlineData("jde")]
+ [InlineData("cms")]
+ [InlineData("lotfinder")]
+ public void Builder_WithValidConnectionType_BuildsPipeline(string connectionType)
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig(connectionType, "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ #endregion
+
+ #region Settings Tests
+
+ [Fact]
+ public void Factory_WithNullSettings_UsesDefaults()
+ {
+ // Arrange - null settings should use defaults
+ var config = new PipelinesRoot(
+ null, // Null settings
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ #endregion
+
+ #region MinDtOffset Format Tests
+
+ [Fact]
+ public void Builder_WithInvalidMinDtOffsetFormat_ThrowsInvalidOperationException()
+ {
+ // Arrange
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true),
+ ["incremental"] = new SyncModeConfig("not-a-valid-timespan") // Invalid!
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act & Assert
+ var ex = Should.Throw(() =>
+ factory.ForTable("TestTable")
+ .WithMode(SyncMode.Incremental)
+ .Build());
+ ex.Message.ShouldContain("Invalid minDtOffset format");
+ }
+
+ [Fact]
+ public void Builder_WithNullMinDtOffset_DoesNotThrow()
+ {
+ // Arrange - null offset should be valid (no date filtering)
+ var config = new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test", null),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig(null, PrePurge: true), // Null offset
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithMode(SyncMode.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ #endregion
+
+ #region Helper Methods
+
+ private PipelinesRoot CreateValidConfig()
+ {
+ return new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test WHERE UpdateDt >= @MinDt",
+ new Dictionary
+ {
+ ["minDt"] = new ParameterConfig("@MinDt", null, "offset", null)
+ }),
+ new Dictionary
+ {
+ ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true, ReIndex: true),
+ ["incremental"] = new SyncModeConfig("-1.00:00:00")
+ },
+ null,
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ }
+
+ private EtlPipelineFactory CreateFactory(PipelinesRoot config)
+ {
+ return new EtlPipelineFactory(_connectionFactory, config, _logger);
+ }
+
+ #endregion
+}