diff --git a/PLANS/2026-01-03-etl-pipeline-phase2-implementation.md b/PLANS/2026-01-03-etl-pipeline-phase2-implementation.md new file mode 100644 index 0000000..ab54778 --- /dev/null +++ b/PLANS/2026-01-03-etl-pipeline-phase2-implementation.md @@ -0,0 +1,1040 @@ +# ETL Pipeline Phase 2 Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Extend ETL pipeline with column mapping, schema support, timeouts, date validation, ordinal mapping, and collision detection. + +**Architecture:** Enhance existing transformer/destination infrastructure with MapOrdinal interface method, QUOTENAME-based SQL generation, and intersection-based column mapping. + +**Tech Stack:** C#/.NET 10, SqlBulkCopy, Dapper, xUnit + +--- + +## Task 1: Add MapOrdinal to IDataTransformer Interface + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Contracts/IDataTransformer.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/DataTransformerBaseTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void MapOrdinal_DefaultImplementation_ReturnsOrdinalUnchanged() +{ + // Arrange + var transformer = new PassThroughTransformer(); + var reader = CreateMockReader(new[] { "A", "B", "C" }); + + // Act + var result = transformer.MapOrdinal(1, reader); + + // Assert + Assert.Equal(1, result); +} + +private class PassThroughTransformer : DataTransformerBase +{ + public override string TransformerName => "PassThrough"; +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "MapOrdinal_DefaultImplementation" --no-build` +Expected: FAIL - MapOrdinal method doesn't exist + +**Step 3: Add MapOrdinal to IDataTransformer interface** + +```csharp +public interface IDataTransformer +{ + IDataReader Transform(IDataReader source); + string TransformerName { get; } + + /// + /// Maps a transformed ordinal to the source ordinal. + /// Returns -1 for computed columns that have no source ordinal. + /// + int MapOrdinal(int transformedOrdinal, IDataReader source); +} +``` + +**Step 4: Add default implementation to DataTransformerBase** + +```csharp +public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) + => transformedOrdinal; +``` + +**Step 5: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "MapOrdinal_DefaultImplementation" --no-build` +Expected: PASS + +**Step 6: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Contracts/IDataTransformer.cs src/JdeScoping.DataSync/Etl/Transformers/DataTransformerBase.cs tests/JdeScoping.DataSync.Tests/Etl/Transformers/DataTransformerBaseTests.cs +git commit -m "feat(etl): add MapOrdinal to IDataTransformer interface" +``` + +--- + +## Task 2: Add Binary Method Overrides to DataTransformerBase + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Transformers/DataTransformerBase.cs` +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Transformers/TransformingDataReader.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/TransformingDataReaderTests.cs` + +**Step 1: Write the failing test for GetBytes with computed column** + +```csharp +[Fact] +public void GetBytes_ComputedColumn_ThrowsNotSupportedException() +{ + // Arrange - transformer that returns -1 for ordinal 0 (computed) + var transformer = new ComputedColumnTransformer(); + var source = CreateMockReader(new[] { "A", "B" }); + var reader = new TransformingDataReader(source, transformer); + transformer.Initialize(source); + + // Act & Assert + Assert.Throws(() => + reader.GetBytes(0, 0, null, 0, 0)); +} + +private class ComputedColumnTransformer : DataTransformerBase +{ + public override string TransformerName => "Computed"; + public override int MapOrdinal(int ordinal, IDataReader source) => ordinal == 0 ? -1 : ordinal; +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "GetBytes_ComputedColumn" --no-build` +Expected: FAIL - doesn't throw + +**Step 3: Add virtual methods to DataTransformerBase** + +```csharp +public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer, + int bufferOffset, int length, IDataReader source) +{ + var sourceOrdinal = MapOrdinal(ordinal, source); + if (sourceOrdinal < 0) + throw new NotSupportedException( + $"GetBytes not supported for computed column at ordinal {ordinal}."); + return source.GetBytes(sourceOrdinal, fieldOffset, buffer, bufferOffset, length); +} + +public virtual long GetChars(int ordinal, long fieldOffset, char[]? buffer, + int bufferOffset, int length, IDataReader source) +{ + var sourceOrdinal = MapOrdinal(ordinal, source); + if (sourceOrdinal < 0) + throw new NotSupportedException( + $"GetChars not supported for computed column at ordinal {ordinal}."); + return source.GetChars(sourceOrdinal, fieldOffset, buffer, bufferOffset, length); +} + +public virtual IDataReader GetData(int ordinal, IDataReader source) +{ + var sourceOrdinal = MapOrdinal(ordinal, source); + if (sourceOrdinal < 0) + throw new NotSupportedException( + $"GetData not supported for computed column at ordinal {ordinal}."); + return source.GetData(sourceOrdinal); +} + +public virtual string GetDataTypeName(int ordinal, IDataReader source) +{ + var sourceOrdinal = MapOrdinal(ordinal, source); + if (sourceOrdinal < 0) + throw new NotSupportedException( + $"GetDataTypeName not supported for computed column at ordinal {ordinal}."); + return source.GetDataTypeName(sourceOrdinal); +} +``` + +**Step 4: Update TransformingDataReader to delegate** + +```csharp +public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length) + => _transformer.GetBytes(i, fieldOffset, buffer, bufferoffset, length, _source); + +public long GetChars(int i, long fieldoffset, char[]? buffer, int bufferoffset, int length) + => _transformer.GetChars(i, fieldoffset, buffer, bufferoffset, length, _source); + +public IDataReader GetData(int i) + => _transformer.GetData(i, _source); + +public string GetDataTypeName(int i) + => _transformer.GetDataTypeName(i, _source); +``` + +**Step 5: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "GetBytes_ComputedColumn" --no-build` +Expected: PASS + +**Step 6: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Transformers/DataTransformerBase.cs src/JdeScoping.DataSync/Etl/Transformers/TransformingDataReader.cs tests/JdeScoping.DataSync.Tests/Etl/Transformers/TransformingDataReaderTests.cs +git commit -m "feat(etl): add binary method overrides to DataTransformerBase" +``` + +--- + +## Task 3: Add MapOrdinal Override to ColumnDropTransformer + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Transformers/ColumnDropTransformer.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/ColumnDropTransformerTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void MapOrdinal_DroppedColumn_MapsCorrectly() +{ + // Arrange - drop column B (ordinal 1), so C becomes ordinal 1 + var transformer = new ColumnDropTransformer("B"); + var source = CreateMockReader(new[] { "A", "B", "C" }); + transformer.Transform(source); + + // Act - transformed ordinal 1 should map to source ordinal 2 (C) + var result = transformer.MapOrdinal(1, source); + + // Assert + Assert.Equal(2, result); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "MapOrdinal_DroppedColumn" --no-build` +Expected: FAIL - returns 1 instead of 2 + +**Step 3: Add MapOrdinal override to ColumnDropTransformer** + +```csharp +public override int MapOrdinal(int transformedOrdinal, IDataReader source) +{ + EnsureInitialized(source); + return _ordinalMap![transformedOrdinal]; +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "MapOrdinal_DroppedColumn" --no-build` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Transformers/ColumnDropTransformer.cs tests/JdeScoping.DataSync.Tests/Etl/Transformers/ColumnDropTransformerTests.cs +git commit -m "feat(etl): add MapOrdinal override to ColumnDropTransformer" +``` + +--- + +## Task 4: Add MapOrdinal and Sentinel to JdeDateTransformer + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Transformers/JdeDateTransformer.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/JdeDateTransformerTests.cs` + +**Step 1: Write the failing test for MapOrdinal** + +```csharp +[Fact] +public void MapOrdinal_DateOutputColumn_ReturnsNegativeOne() +{ + // Arrange + var transformer = new JdeDateTransformer("UPMJ", "TDAY", "UpdatedAt"); + var source = CreateMockReader(new[] { "UPMJ", "TDAY", "Other" }); + transformer.Transform(source); + + // Act - ordinal 0 is the computed DateTime column + var result = transformer.MapOrdinal(0, source); + + // Assert - computed columns return -1 + Assert.Equal(-1, result); +} + +[Fact] +public void MapOrdinal_NonComputedColumn_ReturnsSourceOrdinal() +{ + // Arrange + var transformer = new JdeDateTransformer("UPMJ", "TDAY", "UpdatedAt"); + var source = CreateMockReader(new[] { "UPMJ", "TDAY", "Other" }); + transformer.Transform(source); + + // Act - ordinal 1 is "Other" which maps to source ordinal 2 + var result = transformer.MapOrdinal(1, source); + + // Assert + Assert.Equal(2, result); +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "MapOrdinal_DateOutputColumn|MapOrdinal_NonComputedColumn" --no-build` +Expected: FAIL + +**Step 3: Write the failing test for sentinel** + +```csharp +[Fact] +public void ParseJdeDateTime_InvalidDate_ReturnsSentinel() +{ + // Arrange + var sentinel = new DateTime(1900, 1, 1); + + // Act - 999999 is invalid (century 9 doesn't exist) + var result = JdeDateTransformer.ParseJdeDateTime(999999m, 0m, sentinel); + + // Assert + Assert.Equal(sentinel, result); +} + +[Fact] +public void ParseJdeDateTime_ZeroDate_ReturnsSentinel() +{ + // Arrange + var sentinel = new DateTime(1900, 1, 1); + + // Act + var result = JdeDateTransformer.ParseJdeDateTime(0m, 0m, sentinel); + + // Assert + Assert.Equal(sentinel, result); +} + +[Fact] +public void Constructor_DefaultSentinel_Is1900() +{ + // Arrange & Act + var transformer = new JdeDateTransformer("D", "T", "Out"); + + // Assert + Assert.Equal(new DateTime(1900, 1, 1), JdeDateTransformer.DefaultInvalidDateSentinel); +} +``` + +**Step 4: Run tests to verify they fail** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "ParseJdeDateTime_InvalidDate|ParseJdeDateTime_ZeroDate|Constructor_DefaultSentinel" --no-build` +Expected: FAIL + +**Step 5: Update JdeDateTransformer** + +Add to class: +```csharp +public static readonly DateTime DefaultInvalidDateSentinel = new(1900, 1, 1); +private readonly DateTime _invalidDateSentinel; +``` + +Update constructor: +```csharp +public JdeDateTransformer( + string dateColumn, + string timeColumn, + string outputColumn, + DateTime? invalidDateSentinel = null) +{ + ArgumentException.ThrowIfNullOrWhiteSpace(dateColumn); + ArgumentException.ThrowIfNullOrWhiteSpace(timeColumn); + ArgumentException.ThrowIfNullOrWhiteSpace(outputColumn); + _dateColumn = dateColumn; + _timeColumn = timeColumn; + _outputColumn = outputColumn; + _invalidDateSentinel = invalidDateSentinel ?? DefaultInvalidDateSentinel; +} +``` + +Update ParseJdeDateTime to accept sentinel and validate: +```csharp +public static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel) +{ + var dateInt = (int)julianDate; + if (dateInt <= 0) return sentinel; + + var century = dateInt / 100000; + var year = (dateInt / 1000) % 100; + var dayOfYear = dateInt % 1000; + + if (century < 0 || century > 1) return sentinel; + if (year < 0 || year > 99) return sentinel; + if (dayOfYear < 1 || dayOfYear > 366) return sentinel; + + var fullYear = (century == 0 ? 1900 : 2000) + year; + var daysInYear = DateTime.IsLeapYear(fullYear) ? 366 : 365; + if (dayOfYear > daysInYear) return sentinel; + + var date = new DateTime(fullYear, 1, 1).AddDays(dayOfYear - 1); + + var timeInt = (int)time; + var hours = timeInt / 10000; + var minutes = (timeInt / 100) % 100; + var seconds = timeInt % 100; + + if (hours < 0 || hours > 23) return sentinel; + if (minutes < 0 || minutes > 59) return sentinel; + if (seconds < 0 || seconds > 59) return sentinel; + + return date.AddHours(hours).AddMinutes(minutes).AddSeconds(seconds); +} +``` + +Add MapOrdinal override: +```csharp +public override int MapOrdinal(int transformedOrdinal, IDataReader source) +{ + EnsureInitialized(source); + var sourceOrdinal = _ordinalMap![transformedOrdinal]; + return sourceOrdinal == _dateOrdinal ? -1 : sourceOrdinal; +} +``` + +Add GetDataTypeName override: +```csharp +public override string GetDataTypeName(int ordinal, IDataReader source) +{ + EnsureInitialized(source); + var sourceOrdinal = _ordinalMap![ordinal]; + return sourceOrdinal == _dateOrdinal ? "datetime" : source.GetDataTypeName(sourceOrdinal); +} +``` + +**Step 6: Run all tests to verify they pass** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "JdeDateTransformer" --no-build` +Expected: PASS + +**Step 7: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Transformers/JdeDateTransformer.cs tests/JdeScoping.DataSync.Tests/Etl/Transformers/JdeDateTransformerTests.cs +git commit -m "feat(etl): add MapOrdinal and date validation with sentinel to JdeDateTransformer" +``` + +--- + +## Task 5: Add Collision Detection to ColumnRenameTransformer + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Transformers/ColumnRenameTransformer.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/ColumnRenameTransformerTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void OnInitialize_RenameCollision_ThrowsInvalidOperationException() +{ + // Arrange - renaming A to B when B already exists + var transformer = new ColumnRenameTransformer(("A", "B")); + var source = CreateMockReader(new[] { "A", "B", "C" }); + + // Act & Assert + var ex = Assert.Throws(() => + transformer.Transform(source)); + Assert.Contains("A", ex.Message); + Assert.Contains("B", ex.Message); + Assert.Contains("collision", ex.Message.ToLower()); +} + +[Fact] +public void OnInitialize_PreExistingDuplicates_ThrowsInvalidOperationException() +{ + // Arrange - source has duplicate column names (case-insensitive) + var transformer = new ColumnRenameTransformer(); + var source = CreateMockReaderWithDuplicates(new[] { "Name", "name" }); + + // Act & Assert + Assert.Throws(() => + transformer.Transform(source)); +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "RenameCollision|PreExistingDuplicates" --no-build` +Expected: FAIL - doesn't throw + +**Step 3: Update OnInitialize with collision detection** + +```csharp +protected override void OnInitialize(IDataReader source) +{ + _outputNames = new string[source.FieldCount]; + _nameToOrdinal = new Dictionary(StringComparer.OrdinalIgnoreCase); + + for (int i = 0; i < source.FieldCount; i++) + { + var originalName = source.GetName(i); + var outputName = _renames.TryGetValue(originalName, out var newName) + ? newName + : originalName; + + if (_nameToOrdinal.ContainsKey(outputName)) + { + var existingOrdinal = _nameToOrdinal[outputName]; + var existingOriginal = source.GetName(existingOrdinal); + throw new InvalidOperationException( + $"Column name collision: '{originalName}' → '{outputName}' conflicts with " + + $"'{existingOriginal}' (already at ordinal {existingOrdinal}). " + + $"Each output column name must be unique."); + } + + _outputNames[i] = outputName; + _nameToOrdinal[outputName] = i; + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "RenameCollision|PreExistingDuplicates" --no-build` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Transformers/ColumnRenameTransformer.cs tests/JdeScoping.DataSync.Tests/Etl/Transformers/ColumnRenameTransformerTests.cs +git commit -m "feat(etl): add collision detection to ColumnRenameTransformer" +``` + +--- + +## Task 6: Add Parameters Support to SqlScriptRunner + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Scripts/SqlScriptRunner.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Scripts/SqlScriptRunnerTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task ExecuteAsync_WithParameters_PassesParametersToCommand() +{ + // Arrange + var mockFactory = new Mock(); + var mockConnection = new Mock(); + mockFactory.Setup(f => f.CreateLotFinderConnectionAsync(It.IsAny())) + .ReturnsAsync(mockConnection.Object); + + var parameters = new { tableName = "WorkOrder", schemaName = "dbo" }; + var runner = new SqlScriptRunner( + mockFactory.Object, + "SELECT @tableName, @schemaName", + "Test", + parameters: parameters); + + // This test verifies the constructor accepts parameters + Assert.NotNull(runner); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "WithParameters_PassesParametersToCommand" --no-build` +Expected: FAIL - constructor doesn't accept parameters + +**Step 3: Update SqlScriptRunner constructor** + +```csharp +public class SqlScriptRunner : IScriptRunner +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly string _sql; + private readonly string _scriptName; + private readonly object? _parameters; + private readonly int _timeoutSeconds; + + public SqlScriptRunner( + IDbConnectionFactory connectionFactory, + string sql, + string scriptName, + object? parameters = null, + int timeoutSeconds = 30) + { + _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); + ArgumentException.ThrowIfNullOrWhiteSpace(sql); + ArgumentException.ThrowIfNullOrWhiteSpace(scriptName); + _sql = sql; + _scriptName = scriptName; + _parameters = parameters; + _timeoutSeconds = timeoutSeconds; + } + + public string ScriptName => _scriptName; + + public async Task ExecuteAsync(CancellationToken cancellationToken = default) + { + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken); + await connection.ExecuteAsync( + new CommandDefinition(_sql, _parameters, commandTimeout: _timeoutSeconds, cancellationToken: cancellationToken)); + } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "WithParameters_PassesParametersToCommand" --no-build` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Scripts/SqlScriptRunner.cs tests/JdeScoping.DataSync.Tests/Etl/Scripts/SqlScriptRunnerTests.cs +git commit -m "feat(etl): add parameters support to SqlScriptRunner" +``` + +--- + +## Task 7: Update CommonScripts with ParseTableName and QUOTENAME + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Scripts/CommonScripts.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Scripts/CommonScriptsTests.cs` + +**Step 1: Write the failing tests** + +```csharp +[Theory] +[InlineData("WorkOrder", "dbo", "WorkOrder")] +[InlineData("dbo.WorkOrder", "dbo", "WorkOrder")] +[InlineData("[dbo].[WorkOrder]", "dbo", "WorkOrder")] +[InlineData("Config.Settings", "Config", "Settings")] +public void ParseTableName_VariousFormats_ParsesCorrectly(string input, string expectedSchema, string expectedTable) +{ + // Act + var (schema, table) = CommonScripts.ParseTableName(input); + + // Assert + Assert.Equal(expectedSchema, schema); + Assert.Equal(expectedTable, table); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "ParseTableName_VariousFormats" --no-build` +Expected: FAIL - method doesn't exist + +**Step 3: Add ParseTableName and update all methods** + +```csharp +public static class CommonScripts +{ + /// + /// Parses a table name, extracting schema if present. + /// Supports: "Table", "dbo.Table", "[dbo].[Table]" + /// + public static (string Schema, string Table) ParseTableName(string tableName) + { + var cleaned = tableName.Replace("[", "").Replace("]", ""); + var parts = cleaned.Split('.', 2); + return parts.Length == 2 + ? (parts[0], parts[1]) + : ("dbo", parts[0]); + } + + public static IScriptRunner DisableIndexes( + IDbConnectionFactory factory, + string tableName, + int timeoutSeconds = 300) + { + var (schema, table) = ParseTableName(tableName); + + var sql = @" +DECLARE @sql NVARCHAR(MAX) = ''; +DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName); + +SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;' + CHAR(13) +FROM sys.indexes i +INNER JOIN sys.tables t ON i.object_id = t.object_id +INNER JOIN sys.schemas s ON t.schema_id = s.schema_id +WHERE t.name = @tableName + AND s.name = @schemaName + AND i.type = 2 + AND i.is_disabled = 0; + +IF LEN(@sql) > 0 EXEC sp_executesql @sql;"; + + return new SqlScriptRunner(factory, sql, $"DisableIndexes:{schema}.{table}", + parameters: new { tableName = table, schemaName = schema }, + timeoutSeconds: timeoutSeconds); + } + + public static IScriptRunner RebuildIndexes( + IDbConnectionFactory factory, + string tableName, + int timeoutSeconds = 3600) + { + var (schema, table) = ParseTableName(tableName); + + var sql = @" +DECLARE @sql NVARCHAR(256) = 'ALTER INDEX ALL ON ' + QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName) + ' REBUILD WITH (FILLFACTOR = 95)'; +EXEC sp_executesql @sql;"; + + return new SqlScriptRunner(factory, sql, $"RebuildIndexes:{schema}.{table}", + parameters: new { tableName = table, schemaName = schema }, + timeoutSeconds: timeoutSeconds); + } + + public static IScriptRunner UpdateStatistics( + IDbConnectionFactory factory, + string tableName, + int timeoutSeconds = 600) + { + var (schema, table) = ParseTableName(tableName); + + var sql = @" +DECLARE @sql NVARCHAR(256) = 'UPDATE STATISTICS ' + QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName); +EXEC sp_executesql @sql;"; + + return new SqlScriptRunner(factory, sql, $"UpdateStats:{schema}.{table}", + parameters: new { tableName = table, schemaName = schema }, + timeoutSeconds: timeoutSeconds); + } + + public static IScriptRunner CustomSql( + IDbConnectionFactory factory, + string sql, + string name, + object? parameters = null, + int timeoutSeconds = 30) + { + return new SqlScriptRunner(factory, sql, name, parameters, timeoutSeconds); + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "CommonScripts" --no-build` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Scripts/CommonScripts.cs tests/JdeScoping.DataSync.Tests/Etl/Scripts/CommonScriptsTests.cs +git commit -m "feat(etl): add ParseTableName and QUOTENAME to CommonScripts" +``` + +--- + +## Task 8: Add Command Timeout to Destinations + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs` +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void Constructor_CustomTimeout_SetsTimeout() +{ + // Arrange & Act + var dest = new DbBulkMergeDestination( + Mock.Of(), + "TestTable", + new[] { "Id" }, + commandTimeoutSeconds: 1800); + + // Assert - can't directly test private field, but constructor should accept it + Assert.NotNull(dest); +} + +[Fact] +public void Constructor_ZeroTimeout_UsesDefault() +{ + // Arrange & Act + var dest = new DbBulkMergeDestination( + Mock.Of(), + "TestTable", + new[] { "Id" }, + commandTimeoutSeconds: 0); + + // Assert + Assert.NotNull(dest); +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "Constructor_CustomTimeout|Constructor_ZeroTimeout" --no-build` +Expected: FAIL - parameter doesn't exist + +**Step 3: Update DbBulkMergeDestination** + +Add fields and update constructor: +```csharp +private const int DefaultCommandTimeoutSeconds = 600; +private readonly int _commandTimeoutSeconds; + +public DbBulkMergeDestination( + IDbConnectionFactory connectionFactory, + string tableName, + string[] matchColumns, + string[]? updateColumns = null, + int batchSize = 0, + int commandTimeoutSeconds = 0) +{ + // ... existing validation ... + _commandTimeoutSeconds = commandTimeoutSeconds > 0 + ? commandTimeoutSeconds + : DefaultCommandTimeoutSeconds; +} +``` + +Update bulk copy: +```csharp +using var bulkCopy = new SqlBulkCopy(connection) +{ + DestinationTableName = tempTableName, + BatchSize = batch.Rows.Count, + BulkCopyTimeout = _commandTimeoutSeconds +}; +``` + +Update command execution: +```csharp +await using var cmd = connection.CreateCommand(); +cmd.CommandText = sql; +cmd.CommandTimeout = _commandTimeoutSeconds; +await cmd.ExecuteNonQueryAsync(ct); +``` + +**Step 4: Apply same changes to DbBulkImportDestination** + +**Step 5: Run tests to verify they pass** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "DbBulkMergeDestination|DbBulkImportDestination" --no-build` +Expected: PASS + +**Step 6: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs tests/JdeScoping.DataSync.Tests/Etl/Destinations/ +git commit -m "feat(etl): add commandTimeoutSeconds to destinations" +``` + +--- + +## Task 9: Add Column Mapping to Destinations + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs` +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task WriteAsync_SourceHasExtraColumns_IgnoresExtraColumns() +{ + // This is an integration test concept - + // we need to verify that column mappings are applied + // The actual test would use a real DB or comprehensive mock +} +``` + +**Step 2: Add GetDestinationColumnsAsync method** + +```csharp +private async Task> GetDestinationColumnsAsync( + SqlConnection connection, + CancellationToken ct) +{ + var (schema, table) = CommonScripts.ParseTableName(_tableName); + var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName"; + var columns = await connection.QueryAsync( + new CommandDefinition(sql, new { tableName = table, schemaName = schema }, + commandTimeout: _commandTimeoutSeconds, cancellationToken: ct)); + return columns.ToHashSet(StringComparer.OrdinalIgnoreCase); +} +``` + +**Step 3: Update ProcessBatchAsync to use column mappings** + +```csharp +private async Task ProcessBatchAsync( + SqlConnection connection, + DataTable batch, + string tempTableName, + string mergeSql, + HashSet destColumns, + CancellationToken ct) +{ + using var bulkCopy = new SqlBulkCopy(connection) + { + DestinationTableName = tempTableName, + BatchSize = batch.Rows.Count, + BulkCopyTimeout = _commandTimeoutSeconds + }; + + // Map only columns that exist in destination + foreach (DataColumn col in batch.Columns) + { + if (destColumns.Contains(col.ColumnName)) + { + bulkCopy.ColumnMappings.Add(col.ColumnName, col.ColumnName); + } + } + + await bulkCopy.WriteToServerAsync(batch, ct); + // ... rest of method +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "DbBulkMergeDestination" --no-build` +Expected: PASS + +**Step 5: Apply same changes to DbBulkImportDestination** + +**Step 6: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs tests/JdeScoping.DataSync.Tests/Etl/Destinations/ +git commit -m "feat(etl): add column mapping to destinations (intersect with dest schema)" +``` + +--- + +## Task 10: Add WithCommandTimeout to EtlPipelineBuilder + +**Files:** +- Modify: `NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipelineBuilder.cs` +- Test: `NEW/tests/JdeScoping.DataSync.Tests/Etl/Pipeline/EtlPipelineBuilderTests.cs` + +**Step 1: Write the failing tests** + +```csharp +[Fact] +public void WithCommandTimeout_ValidTimeout_SetsTimeout() +{ + // Arrange + var builder = new EtlPipelineBuilder(); + + // Act + var result = builder.WithCommandTimeout(TimeSpan.FromMinutes(30)); + + // Assert + Assert.Same(builder, result); +} + +[Fact] +public void WithCommandTimeout_NegativeTimeout_ThrowsArgumentOutOfRange() +{ + // Arrange + var builder = new EtlPipelineBuilder(); + + // Act & Assert + Assert.Throws(() => + builder.WithCommandTimeout(TimeSpan.FromSeconds(-1))); +} + +[Fact] +public void WithCommandTimeout_Over24Hours_ThrowsArgumentOutOfRange() +{ + // Arrange + var builder = new EtlPipelineBuilder(); + + // Act & Assert + Assert.Throws(() => + builder.WithCommandTimeout(TimeSpan.FromHours(25))); +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "WithCommandTimeout" --no-build` +Expected: FAIL - method doesn't exist + +**Step 3: Add WithCommandTimeout method** + +```csharp +private int _defaultCommandTimeoutSeconds = 600; + +public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout) +{ + if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24)) + throw new ArgumentOutOfRangeException(nameof(timeout), + "Timeout must be between 0 and 24 hours."); + _defaultCommandTimeoutSeconds = (int)timeout.TotalSeconds; + return this; +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "WithCommandTimeout" --no-build` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Pipeline/EtlPipelineBuilder.cs tests/JdeScoping.DataSync.Tests/Etl/Pipeline/EtlPipelineBuilderTests.cs +git commit -m "feat(etl): add WithCommandTimeout to EtlPipelineBuilder with validation" +``` + +--- + +## Task 11: Run Full Test Suite and Verify + +**Step 1: Build the solution** + +Run: `dotnet build NEW/JdeScoping.sln` +Expected: Build succeeded + +**Step 2: Run all DataSync tests** + +Run: `dotnet test NEW/tests/JdeScoping.DataSync.Tests --verbosity normal` +Expected: All tests pass + +**Step 3: Run all solution tests** + +Run: `dotnet test NEW/JdeScoping.sln --verbosity normal` +Expected: All tests pass + +**Step 4: Commit final verification** + +```bash +git add -A +git commit -m "test: verify all Phase 2 ETL tests pass" +``` + +--- + +## Task 12: Final Review with Codex MCP + +**Step 1: Run Codex MCP review** + +Consult Codex MCP for final code review of all Phase 2 changes. + +**Step 2: Address any findings** + +Fix any issues identified by the review. + +**Step 3: Final commit** + +```bash +git commit -m "refactor: address Codex MCP review findings for Phase 2" +```