From 795c15df5611fb494a1f072264cec7cdaf41c48a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 6 Jan 2026 13:45:36 -0500 Subject: [PATCH] feat(datasync): add EtlPipelineFactory with JSON config support - Implement IEtlPipelineFactory with ForTable() method returning a builder - Load pipeline config from JSON file path (from PipelineOptions) - Parse config using System.Text.Json with PropertyNameCaseInsensitive - Builder supports WithMode() and WithMinimumDate() fluent methods - Create DbQuerySource for source with ParameterFormatConverter for JDE dates - Create DbBulkMergeDestination or DbBulkImportDestination based on sync mode - Mass mode defaults to bulkImport, incremental defaults to bulkMerge - Support destination override in sync mode config - Execute pre/post scripts from config (prePurge, reIndex, custom scripts) - Validate config: require mass and incremental modes, reject runtime params - Add comprehensive tests for factory, builder, and config validation --- .../Services/EtlPipelineFactory.cs | 316 +++++++ .../Services/EtlPipelineFactoryTests.cs | 810 ++++++++++++++++++ 2 files changed, 1126 insertions(+) create mode 100644 NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs create mode 100644 NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs diff --git a/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs new file mode 100644 index 0000000..00095c0 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs @@ -0,0 +1,316 @@ +using System.Text.Json; +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Configuration; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Etl.Contracts; +using JdeScoping.DataSync.Etl.Destinations; +using JdeScoping.DataSync.Etl.Pipeline; +using JdeScoping.DataSync.Etl.Scripts; +using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Options; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace JdeScoping.DataSync.Services; + +/// +/// Factory for creating ETL pipelines from JSON configuration. +/// +public class EtlPipelineFactory : IEtlPipelineFactory +{ + private static readonly JsonSerializerOptions JsonOptions = new() + { + PropertyNameCaseInsensitive = true, + ReadCommentHandling = JsonCommentHandling.Skip, + AllowTrailingCommas = true + }; + + private readonly IDbConnectionFactory _connectionFactory; + private readonly ILogger _logger; + private readonly PipelinesRoot _config; + + /// + /// Creates a new pipeline factory. + /// + /// Factory for creating database connections. + /// Pipeline configuration options. + /// Logger for pipeline execution. + public EtlPipelineFactory( + IDbConnectionFactory connectionFactory, + IOptions options, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(connectionFactory); + ArgumentNullException.ThrowIfNull(options); + ArgumentNullException.ThrowIfNull(logger); + + _connectionFactory = connectionFactory; + _logger = logger; + _config = LoadPipelineConfigs(options.Value.ConfigPath); + } + + /// + /// Creates a new pipeline factory with a pre-loaded configuration (for testing). + /// + internal EtlPipelineFactory( + IDbConnectionFactory connectionFactory, + PipelinesRoot config, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(connectionFactory); + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(logger); + + ValidateConfig(config); + + _connectionFactory = connectionFactory; + _logger = logger; + _config = config; + } + + /// + public IEtlPipelineBuilder ForTable(string tableName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tableName); + + if (!_config.Pipelines.TryGetValue(tableName, out var pipelineConfig)) + { + throw new InvalidOperationException( + $"No pipeline configured for table: {tableName}. " + + $"Available tables: {string.Join(", ", _config.Pipelines.Keys)}"); + } + + return new PipelineBuilder( + _connectionFactory, + tableName, + pipelineConfig, + _config.EffectiveSettings, + _logger); + } + + private PipelinesRoot LoadPipelineConfigs(string configPath) + { + // Resolve path relative to assembly location (handles both debug and publish) + var assemblyDir = Path.GetDirectoryName(typeof(EtlPipelineFactory).Assembly.Location)!; + var fullPath = Path.Combine(assemblyDir, configPath); + + if (!File.Exists(fullPath)) + { + throw new FileNotFoundException( + $"Pipeline config not found: {fullPath}. " + + "Ensure the config file is included in the build output."); + } + + var json = File.ReadAllText(fullPath); + var root = JsonSerializer.Deserialize(json, JsonOptions) + ?? throw new InvalidOperationException("Failed to deserialize pipeline config: result was null."); + + ValidateConfig(root); + return root; + } + + private static void ValidateConfig(PipelinesRoot root) + { + foreach (var (name, config) in root.Pipelines) + { + // Validate required sync modes + if (!config.SyncModes.ContainsKey("mass")) + { + throw new InvalidOperationException( + $"Pipeline '{name}' missing required 'mass' sync mode."); + } + + if (!config.SyncModes.ContainsKey("incremental")) + { + throw new InvalidOperationException( + $"Pipeline '{name}' missing required 'incremental' sync mode."); + } + + // Validate no runtime parameters (not yet supported) + if (config.Source.Parameters != null) + { + foreach (var (paramName, paramConfig) in config.Source.Parameters) + { + if (paramConfig.Source.Equals("runtime", StringComparison.OrdinalIgnoreCase)) + { + throw new NotSupportedException( + $"Pipeline '{name}' parameter '{paramName}': " + + "runtime parameter source is not yet supported."); + } + } + } + } + } + + private sealed class PipelineBuilder : IEtlPipelineBuilder + { + private readonly IDbConnectionFactory _connectionFactory; + private readonly string _tableName; + private readonly PipelineConfig _config; + private readonly PipelineSettings _settings; + private readonly ILogger _logger; + private SyncMode _mode = SyncMode.Incremental; + private DateTime? _minDtOverride; + + public PipelineBuilder( + IDbConnectionFactory connectionFactory, + string tableName, + PipelineConfig config, + PipelineSettings settings, + ILogger logger) + { + _connectionFactory = connectionFactory; + _tableName = tableName; + _config = config; + _settings = settings; + _logger = logger; + } + + public IEtlPipelineBuilder WithMode(SyncMode mode) + { + _mode = mode; + return this; + } + + public IEtlPipelineBuilder WithMinimumDate(DateTime? minDt) + { + _minDtOverride = minDt; + return this; + } + + public EtlPipeline Build() + { + var modeKey = _mode == SyncMode.Mass ? "mass" : "incremental"; + + if (!_config.SyncModes.TryGetValue(modeKey, out var modeConfig)) + { + throw new InvalidOperationException( + $"Sync mode '{modeKey}' not defined for table '{_tableName}'."); + } + + // Compute MinDt from offset or override + var minDt = _minDtOverride ?? ComputeMinDt(modeConfig.MinDtOffset); + + // Create source with parameter substitution + var source = CreateSource(_config.Source, minDt); + + // Determine destination type (mode override > default by mode) + var destType = modeConfig.Destination?.Type + ?? (_mode == SyncMode.Mass ? "bulkImport" : "bulkMerge"); + var destination = CreateDestination(destType, _config.Destination, modeConfig); + + // Build pipeline with scripts + var builder = new EtlPipelineBuilder() + .WithName(_tableName) + .WithSource(source) + .WithDestination(destination) + .WithLogger(_logger); + + // Add pre-scripts: config scripts first, then prePurge + foreach (var script in _config.PreScripts ?? []) + { + builder.WithPreScript(new SqlScriptRunner(_connectionFactory, script, $"PreScript:{script.Substring(0, Math.Min(30, script.Length))}")); + } + + if (modeConfig.PrePurge) + { + var truncateSql = $"TRUNCATE TABLE [{_config.Destination.Table}]"; + builder.WithPreScript(new SqlScriptRunner(_connectionFactory, truncateSql, "PrePurge")); + } + + // Add post-scripts: reIndex first, then config scripts + if (modeConfig.ReIndex) + { + var reindexSql = $"ALTER INDEX ALL ON [{_config.Destination.Table}] REBUILD"; + builder.WithPostScript(new SqlScriptRunner(_connectionFactory, reindexSql, "ReIndex")); + } + + foreach (var script in _config.PostScripts ?? []) + { + builder.WithPostScript(new SqlScriptRunner(_connectionFactory, script, $"PostScript:{script.Substring(0, Math.Min(30, script.Length))}")); + } + + // Transformers are not yet implemented - placeholder for future + // foreach (var t in _config.Transformers ?? []) + // builder.WithTransformer(CreateTransformer(t)); + + return builder.Build(); + } + + private DateTime? ComputeMinDt(string? minDtOffset) + { + if (string.IsNullOrEmpty(minDtOffset)) + return null; + + if (!TimeSpan.TryParse(minDtOffset, out var offset)) + { + throw new InvalidOperationException( + $"Invalid minDtOffset format: '{minDtOffset}'. Expected TimeSpan format (e.g., '-7.00:00:00')."); + } + + return DateTime.UtcNow.Add(offset); + } + + private IImportSource CreateSource(SourceConfig sourceConfig, DateTime? minDt) + { + var parameters = new Dictionary(); + var converter = new ParameterFormatConverter(_settings.Timezone); + + if (sourceConfig.Parameters != null && minDt.HasValue) + { + foreach (var (_, paramConfig) in sourceConfig.Parameters) + { + var paramValue = paramConfig.Source.ToLowerInvariant() switch + { + "offset" => converter.Convert(minDt.Value, paramConfig.Format), + "static" => paramConfig.Value + ?? throw new InvalidOperationException( + $"Static parameter '{paramConfig.Name}' requires a value."), + _ => throw new NotSupportedException( + $"Parameter source '{paramConfig.Source}' is not supported.") + }; + + // Use the parameter name exactly as configured (provider-specific) + parameters[paramConfig.Name] = paramValue; + } + } + + return new DbQuerySource( + _connectionFactory, + sourceConfig.Connection, + sourceConfig.Query, + parameters); + } + + private IImportDestination CreateDestination( + string destType, + DestinationConfig baseConfig, + SyncModeConfig modeConfig) + { + var tableName = baseConfig.Table; + + // Merge mode-specific destination config with base + var matchColumns = modeConfig.Destination?.MatchColumns?.ToArray() + ?? baseConfig.MatchColumns?.ToArray(); + var excludeFromUpdate = modeConfig.Destination?.ExcludeFromUpdate?.ToArray() + ?? baseConfig.ExcludeFromUpdate?.ToArray(); + + return destType.ToLowerInvariant() switch + { + "bulkimport" => new DbBulkImportDestination(_connectionFactory, tableName), + + "bulkmerge" => new DbBulkMergeDestination( + _connectionFactory, + tableName, + matchColumns ?? throw new InvalidOperationException( + $"matchColumns required for bulkMerge destination on table '{tableName}'."), + updateColumns: null, + excludeFromUpdate: excludeFromUpdate, + updateCondition: modeConfig.UpdateWhen), + + _ => throw new InvalidOperationException( + $"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.") + }; + } + } +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs new file mode 100644 index 0000000..2faffc9 --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs @@ -0,0 +1,810 @@ +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Configuration; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Etl.Pipeline; +using JdeScoping.DataSync.Services; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using Shouldly; + +namespace JdeScoping.DataSync.Tests.Services; + +public class EtlPipelineFactoryTests +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly ILogger _logger; + + public EtlPipelineFactoryTests() + { + _connectionFactory = Substitute.For(); + _logger = NullLogger.Instance; + } + + #region ForTable Tests + + [Fact] + public void ForTable_WithValidTable_ReturnsBuilder() + { + // Arrange + var config = CreateValidConfig(); + var factory = CreateFactory(config); + + // Act + var builder = factory.ForTable("TestTable"); + + // Assert + builder.ShouldNotBeNull(); + builder.ShouldBeAssignableTo(); + } + + [Fact] + public void ForTable_WithUnknownTable_ThrowsInvalidOperationException() + { + // Arrange + var config = CreateValidConfig(); + var factory = CreateFactory(config); + + // Act & Assert + var ex = Should.Throw(() => factory.ForTable("NonExistentTable")); + ex.Message.ShouldContain("No pipeline configured for table: NonExistentTable"); + ex.Message.ShouldContain("TestTable"); // Should list available tables + } + + [Fact] + public void ForTable_WithNullTableName_ThrowsArgumentException() + { + // Arrange + var config = CreateValidConfig(); + var factory = CreateFactory(config); + + // Act & Assert + Should.Throw(() => factory.ForTable(null!)); + } + + [Fact] + public void ForTable_WithEmptyTableName_ThrowsArgumentException() + { + // Arrange + var config = CreateValidConfig(); + var factory = CreateFactory(config); + + // Act & Assert + Should.Throw(() => factory.ForTable("")); + } + + #endregion + + #region Builder WithMode Tests + + [Fact] + public void Builder_WithMassMode_BuildsPipeline() + { + // Arrange + var config = CreateValidConfig(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + pipeline.PipelineName.ShouldBe("TestTable"); + } + + [Fact] + public void Builder_WithIncrementalMode_BuildsPipeline() + { + // Arrange + var config = CreateValidConfig(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + pipeline.PipelineName.ShouldBe("TestTable"); + } + + [Fact] + public void Builder_DefaultMode_IsIncremental() + { + // Arrange + var config = CreateValidConfig(); + var factory = CreateFactory(config); + + // Act - don't call WithMode() + var pipeline = factory.ForTable("TestTable") + .Build(); + + // Assert - should work because incremental mode is defined + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithUndefinedSyncMode_ThrowsInvalidOperationException() + { + // Arrange - config with only mass mode + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true) + // No incremental mode defined + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + + // Act & Assert - validation fails at factory creation + var ex = Should.Throw(() => CreateFactory(config)); + ex.Message.ShouldContain("missing required 'incremental' sync mode"); + } + + #endregion + + #region Builder WithMinimumDate Tests + + [Fact] + public void Builder_WithMinimumDate_OverridesConfigOffset() + { + // Arrange + var config = CreateValidConfig(); + var factory = CreateFactory(config); + var customDate = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc); + + // Act - should not throw even though we're overriding + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .WithMinimumDate(customDate) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithNullMinimumDate_UsesConfigOffset() + { + // Arrange + var config = CreateValidConfig(); + var factory = CreateFactory(config); + + // Act - null minDt means use config offset + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .WithMinimumDate(null) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + #endregion + + #region Config Validation Tests + + [Fact] + public void Validate_ConfigMissingMassMode_ThrowsInvalidOperationException() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + // Missing mass mode + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + + // Act & Assert + var ex = Should.Throw(() => CreateFactory(config)); + ex.Message.ShouldContain("missing required 'mass' sync mode"); + } + + [Fact] + public void Validate_ConfigWithRuntimeParameter_ThrowsNotSupportedException() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test WHERE Id = @Id", + new Dictionary + { + ["id"] = new ParameterConfig("@Id", null, "runtime", null) + }), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00"), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + + // Act & Assert + var ex = Should.Throw(() => CreateFactory(config)); + ex.Message.ShouldContain("runtime parameter source is not yet supported"); + } + + #endregion + + #region Destination Type Tests + + [Fact] + public void Builder_MassMode_DefaultsToBulkImport() + { + // Arrange - no destination override + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act - should use bulkImport for mass mode + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_IncrementalMode_DefaultsToBulkMerge() + { + // Arrange - no destination override + var config = CreateValidConfig(); + var factory = CreateFactory(config); + + // Act - should use bulkMerge for incremental mode + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_ModeWithDestinationOverride_UsesOverride() + { + // Arrange - mass mode with bulkMerge override + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", + Destination: new DestinationOverride("bulkMerge", null, null)), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act - mass mode should use bulkMerge due to override + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_BulkMergeWithoutMatchColumns_ThrowsInvalidOperationException() + { + // Arrange - bulkMerge needs matchColumns + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", null, null), // No matchColumns! + null, + null) + }); + var factory = CreateFactory(config); + + // Act & Assert + var ex = Should.Throw(() => + factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) // Uses bulkMerge + .Build()); + ex.Message.ShouldContain("matchColumns required for bulkMerge"); + } + + #endregion + + #region Parameter Tests + + [Fact] + public void Builder_WithOffsetParameter_CreatesSource() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test WHERE UpdateDt >= @MinDt", + new Dictionary + { + ["minDt"] = new ParameterConfig("@MinDt", null, "offset", null) + }), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithJdeJulianParameter_CreatesSource() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("jde", "SELECT * FROM Test WHERE UPMJ >= :dateUpdated", + new Dictionary + { + ["minDt"] = new ParameterConfig(":dateUpdated", "jdeJulian", "offset", null) + }), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithStaticParameter_UsesConfiguredValue() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test WHERE Status = @Status", + new Dictionary + { + ["status"] = new ParameterConfig("@Status", null, "static", "Active") + }), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithStaticParameterNoValue_ThrowsInvalidOperationException() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test WHERE Status = @Status", + new Dictionary + { + ["status"] = new ParameterConfig("@Status", null, "static", null) // No value! + }), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act & Assert + var ex = Should.Throw(() => + factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .Build()); + ex.Message.ShouldContain("Static parameter '@Status' requires a value"); + } + + #endregion + + #region Script Tests + + [Fact] + public void Builder_WithPrePurge_AddsTruncateScript() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithReIndex_AddsRebuildScript() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true, ReIndex: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithPreScripts_AddsConfiguredScripts() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + ["EXEC sp_BeforeSync"], + null) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + [Fact] + public void Builder_WithPostScripts_AddsConfiguredScripts() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + ["UPDATE TestTable SET ProcessedFlag = 1 WHERE ProcessedFlag IS NULL"]) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + #endregion + + #region Connection Type Tests + + [Theory] + [InlineData("jde")] + [InlineData("cms")] + [InlineData("lotfinder")] + public void Builder_WithValidConnectionType_BuildsPipeline(string connectionType) + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig(connectionType, "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + #endregion + + #region Settings Tests + + [Fact] + public void Factory_WithNullSettings_UsesDefaults() + { + // Arrange - null settings should use defaults + var config = new PipelinesRoot( + null, // Null settings + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + #endregion + + #region MinDtOffset Format Tests + + [Fact] + public void Builder_WithInvalidMinDtOffsetFormat_ThrowsInvalidOperationException() + { + // Arrange + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true), + ["incremental"] = new SyncModeConfig("not-a-valid-timespan") // Invalid! + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act & Assert + var ex = Should.Throw(() => + factory.ForTable("TestTable") + .WithMode(SyncMode.Incremental) + .Build()); + ex.Message.ShouldContain("Invalid minDtOffset format"); + } + + [Fact] + public void Builder_WithNullMinDtOffset_DoesNotThrow() + { + // Arrange - null offset should be valid (no date filtering) + var config = new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test", null), + new Dictionary + { + ["mass"] = new SyncModeConfig(null, PrePurge: true), // Null offset + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithMode(SyncMode.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); + } + + #endregion + + #region Helper Methods + + private PipelinesRoot CreateValidConfig() + { + return new PipelinesRoot( + new PipelineSettings("UTC"), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test WHERE UpdateDt >= @MinDt", + new Dictionary + { + ["minDt"] = new ParameterConfig("@MinDt", null, "offset", null) + }), + new Dictionary + { + ["mass"] = new SyncModeConfig("-365.00:00:00", PrePurge: true, ReIndex: true), + ["incremental"] = new SyncModeConfig("-1.00:00:00") + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); + } + + private EtlPipelineFactory CreateFactory(PipelinesRoot config) + { + return new EtlPipelineFactory(_connectionFactory, config, _logger); + } + + #endregion +}