From 7dcbacd5ca2ff5e8cb8f932a8018cc517acc7ae7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 3 Jan 2026 11:27:07 -0500 Subject: [PATCH] fix(etl): address Codex MCP review findings for Phase 2 - Filter MERGE SQL columns to only include columns that exist in destination (allColumns and updateColumns were using unfiltered source columns) - Fix schema-qualified table names to use proper [schema].[table] format instead of wrapping entire name in single brackets - Add empty column mapping validation to throw early if no columns intersect - Add JdeDateTransformer output column collision detection in OnInitialize - Add TODO comment for WithCommandTimeout (stored but not yet passed to destinations) - Add tests for FormatQualifiedTableName and output column collision --- .../Destinations/DbBulkImportDestination.cs | 11 ++++- .../Destinations/DbBulkMergeDestination.cs | 26 +++++++++--- .../Etl/Pipeline/EtlPipelineBuilder.cs | 3 ++ .../Etl/Scripts/CommonScripts.cs | 9 +++++ .../Etl/Transformers/JdeDateTransformer.cs | 14 +++++++ .../Etl/Scripts/CommonScriptsTests.cs | 16 ++++++++ .../Transformers/JdeDateTransformerTests.cs | 40 +++++++++++++++++++ 7 files changed, 111 insertions(+), 8 deletions(-) diff --git a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs index 3efb040..311b7ef 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs @@ -62,9 +62,10 @@ public class DbBulkImportDestination : IImportDestination await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken); // Truncate destination table + var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName); await using (var truncateCmd = connection.CreateCommand()) { - truncateCmd.CommandText = $"TRUNCATE TABLE [{_tableName}]"; + truncateCmd.CommandText = 
$"TRUNCATE TABLE {qualifiedName}"; truncateCmd.CommandTimeout = _commandTimeoutSeconds; await truncateCmd.ExecuteNonQueryAsync(cancellationToken); } @@ -75,7 +76,7 @@ public class DbBulkImportDestination : IImportDestination // Bulk copy data using var bulkCopy = new SqlBulkCopy(connection) { - DestinationTableName = $"[{_tableName}]", + DestinationTableName = qualifiedName, BatchSize = _batchSize, BulkCopyTimeout = _commandTimeoutSeconds, EnableStreaming = true @@ -91,6 +92,12 @@ public class DbBulkImportDestination : IImportDestination } } + // Validate that we have columns to work with + if (bulkCopy.ColumnMappings.Count == 0) + throw new InvalidOperationException( + $"No columns from source exist in destination table '{_tableName}'. " + + "Check column names match between source query and destination table."); + // Track rows via event bulkCopy.NotifyAfter = _batchSize; bulkCopy.SqlRowsCopied += (_, e) => diff --git a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs index 7ff87f9..80e0ba3 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs @@ -84,14 +84,26 @@ public class DbBulkMergeDestination : IImportDestination // Get destination columns for column mapping var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken); - // Get all column names from source + // Get all column names from source, filtered to only include columns that exist in destination var allColumns = new List(); for (int i = 0; i < source.FieldCount; i++) - allColumns.Add(source.GetName(i)); + { + var colName = source.GetName(i); + if (destColumns.Contains(colName)) + allColumns.Add(colName); + } - // Determine update columns (all non-match columns if not specified) + // Validate that we have columns to work with + if (allColumns.Count == 0) + throw new 
InvalidOperationException( + $"No columns from source exist in destination table '{_tableName}'. " + + "Check column names match between source query and destination table."); + + // Determine update columns (all non-match columns if not specified), filtered to destColumns var matchSet = new HashSet(_matchColumns, StringComparer.OrdinalIgnoreCase); - var updateCols = _updateColumns ?? allColumns.Where(c => !matchSet.Contains(c)).ToArray(); + var updateCols = (_updateColumns ?? allColumns.Where(c => !matchSet.Contains(c)).ToArray()) + .Where(c => destColumns.Contains(c)) + .ToArray(); // Build MERGE SQL var mergeSql = BuildMergeSql(tempTableName, allColumns, updateCols); @@ -135,7 +147,8 @@ public class DbBulkMergeDestination : IImportDestination private async Task CreateTempTableAsync(SqlConnection connection, string tempTableName, CancellationToken ct) { - var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM [{_tableName}]"; + var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName); + var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM {qualifiedName}"; await using var cmd = connection.CreateCommand(); cmd.CommandText = sql; cmd.CommandTimeout = _commandTimeoutSeconds; @@ -199,8 +212,9 @@ public class DbBulkMergeDestination : IImportDestination private string BuildMergeSql(string tempTableName, IReadOnlyList allColumns, IReadOnlyList updateColumns) { + var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName); var sb = new StringBuilder(); - sb.AppendLine($"MERGE INTO [{_tableName}] AS target"); + sb.AppendLine($"MERGE INTO {qualifiedName} AS target"); sb.AppendLine($"USING {tempTableName} AS source"); sb.Append("ON "); sb.AppendLine(string.Join(" AND ", _matchColumns.Select(c => $"target.[{c}] = source.[{c}]"))); diff --git a/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipelineBuilder.cs b/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipelineBuilder.cs index 13f5a1a..0a17c9b 100644 --- 
a/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipelineBuilder.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipelineBuilder.cs @@ -60,6 +60,9 @@ public class EtlPipelineBuilder return this; } + // TODO: Currently this timeout value is stored but not passed to destinations. + // In the future, the pipeline should pass this timeout to destinations that support it. + // For now, destinations use their own default timeout (600 seconds). public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout) { if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24)) diff --git a/NEW/src/JdeScoping.DataSync/Etl/Scripts/CommonScripts.cs b/NEW/src/JdeScoping.DataSync/Etl/Scripts/CommonScripts.cs index 95f909d..baade44 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Scripts/CommonScripts.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Scripts/CommonScripts.cs @@ -18,6 +18,15 @@ public static class CommonScripts : ("dbo", parts[0]); } + /// + /// Formats a table name as a properly quoted [schema].[table] identifier. 
+ /// + public static string FormatQualifiedTableName(string tableName) + { + var (schema, table) = ParseTableName(tableName); + return $"[{schema}].[{table}]"; + } + public static IScriptRunner DisableIndexes( IDbConnectionFactory factory, string tableName, diff --git a/NEW/src/JdeScoping.DataSync/Etl/Transformers/JdeDateTransformer.cs b/NEW/src/JdeScoping.DataSync/Etl/Transformers/JdeDateTransformer.cs index 2f63cd7..cfe1e52 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Transformers/JdeDateTransformer.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Transformers/JdeDateTransformer.cs @@ -56,6 +56,20 @@ public class JdeDateTransformer : DataTransformerBase _dateOrdinal = source.GetOrdinal(_dateColumn); _timeOrdinal = source.GetOrdinal(_timeColumn); + // Validate output column name doesn't conflict with existing columns + // (excluding the date and time columns which will be replaced/removed) + for (int i = 0; i < source.FieldCount; i++) + { + if (i == _dateOrdinal || i == _timeOrdinal) continue; + var existingName = source.GetName(i); + if (string.Equals(existingName, _outputColumn, StringComparison.OrdinalIgnoreCase)) + { + throw new InvalidOperationException( + $"Output column name '{_outputColumn}' conflicts with existing column '{existingName}'. 
" + + "Choose a different output column name."); + } + } + var ordinalList = new List(); var nameList = new List(); _nameToOrdinal = new Dictionary(StringComparer.OrdinalIgnoreCase); diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Scripts/CommonScriptsTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Scripts/CommonScriptsTests.cs index 0315fca..7ad49e5 100644 --- a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Scripts/CommonScriptsTests.cs +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Scripts/CommonScriptsTests.cs @@ -98,4 +98,20 @@ public class CommonScriptsTests Assert.Equal("TimeoutScript", runner.ScriptName); Assert.NotNull(runner); } + + [Theory] + [InlineData("WorkOrder", "[dbo].[WorkOrder]")] + [InlineData("dbo.WorkOrder", "[dbo].[WorkOrder]")] + [InlineData("[dbo].[WorkOrder]", "[dbo].[WorkOrder]")] + [InlineData("Config.Settings", "[Config].[Settings]")] + [InlineData("[Config].[Settings]", "[Config].[Settings]")] + [InlineData("myschema.MyTable", "[myschema].[MyTable]")] + public void FormatQualifiedTableName_VariousFormats_FormatsCorrectly(string input, string expected) + { + // Act + var result = CommonScripts.FormatQualifiedTableName(input); + + // Assert + Assert.Equal(expected, result); + } } diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/JdeDateTransformerTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/JdeDateTransformerTests.cs index 82178da..516966a 100644 --- a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/JdeDateTransformerTests.cs +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/JdeDateTransformerTests.cs @@ -240,6 +240,46 @@ public class JdeDateTransformerTests Assert.Equal(sentinel, result); } + [Fact] + public void OutputColumnConflictsWithExistingColumn_ThrowsInvalidOperationException() + { + // Arrange - "Name" column exists and we try to use it as output + var source = CreateMockReader(new[] { "Id", "UPMJ", "TDAY", "Name" }, new object[] { 1, 124001m, 120000m, "Test" }); + var 
transformer = new JdeDateTransformer("UPMJ", "TDAY", "Name"); // "Name" conflicts + + // Act & Assert + var ex = Assert.Throws<InvalidOperationException>(() => transformer.Transform(source)); + Assert.Contains("conflicts with existing column", ex.Message); + } + + [Fact] + public void OutputColumnMatchesDateColumn_Succeeds() + { + // Arrange - output column can match the date column name (it replaces it) + var source = CreateMockReader(new[] { "Id", "UPMJ", "TDAY", "Name" }, new object[] { 1, 124001m, 120000m, "Test" }); + var transformer = new JdeDateTransformer("UPMJ", "TDAY", "UPMJ"); // Same as date column + + // Act - should not throw + var reader = transformer.Transform(source); + + // Assert + Assert.Equal("UPMJ", reader.GetName(1)); // Still named UPMJ + } + + [Fact] + public void OutputColumnMatchesTimeColumn_Succeeds() + { + // Arrange - output column can match the time column name (it's removed anyway) + var source = CreateMockReader(new[] { "Id", "UPMJ", "TDAY", "Name" }, new object[] { 1, 124001m, 120000m, "Test" }); + var transformer = new JdeDateTransformer("UPMJ", "TDAY", "TDAY"); // Same as time column + + // Act - should not throw + var reader = transformer.Transform(source); + + // Assert + Assert.Equal(3, reader.FieldCount); // One column removed + } + private static IDataReader CreateMockReader(string[] columns, object[] values) { var reader = Substitute.For<IDataReader>();