fix(etl): address Codex MCP review findings for Phase 2

- Filter MERGE SQL columns to only include columns that exist in destination
  (allColumns and updateColumns were using unfiltered source columns)
- Fix schema-qualified table names to use proper [schema].[table] format
  instead of wrapping entire name in single brackets
- Add empty column mapping validation to throw early if no columns intersect
- Add JdeDateTransformer output column collision detection in OnInitialize
- Add TODO comment for WithCommandTimeout (stored but not yet passed to
  destinations)
- Add tests for FormatQualifiedTableName and output column collision
This commit is contained in:
Joseph Doherty
2026-01-03 11:27:07 -05:00
parent fcd8b660fa
commit 7dcbacd5ca
7 changed files with 111 additions and 8 deletions
@@ -62,9 +62,10 @@ public class DbBulkImportDestination : IImportDestination
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
// Truncate destination table
var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName);
await using (var truncateCmd = connection.CreateCommand())
{
truncateCmd.CommandText = $"TRUNCATE TABLE [{_tableName}]";
truncateCmd.CommandText = $"TRUNCATE TABLE {qualifiedName}";
truncateCmd.CommandTimeout = _commandTimeoutSeconds;
await truncateCmd.ExecuteNonQueryAsync(cancellationToken);
}
@@ -75,7 +76,7 @@ public class DbBulkImportDestination : IImportDestination
// Bulk copy data
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = $"[{_tableName}]",
DestinationTableName = qualifiedName,
BatchSize = _batchSize,
BulkCopyTimeout = _commandTimeoutSeconds,
EnableStreaming = true
@@ -91,6 +92,12 @@ public class DbBulkImportDestination : IImportDestination
}
}
// Validate that we have columns to work with
if (bulkCopy.ColumnMappings.Count == 0)
throw new InvalidOperationException(
$"No columns from source exist in destination table '{_tableName}'. " +
"Check column names match between source query and destination table.");
// Track rows via event
bulkCopy.NotifyAfter = _batchSize;
bulkCopy.SqlRowsCopied += (_, e) =>
@@ -84,14 +84,26 @@ public class DbBulkMergeDestination : IImportDestination
// Get destination columns for column mapping
var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken);
// Get all column names from source
// Get all column names from source, filtered to only include columns that exist in destination
var allColumns = new List<string>();
for (int i = 0; i < source.FieldCount; i++)
allColumns.Add(source.GetName(i));
{
var colName = source.GetName(i);
if (destColumns.Contains(colName))
allColumns.Add(colName);
}
// Determine update columns (all non-match columns if not specified)
// Validate that we have columns to work with
if (allColumns.Count == 0)
throw new InvalidOperationException(
$"No columns from source exist in destination table '{_tableName}'. " +
"Check column names match between source query and destination table.");
// Determine update columns (all non-match columns if not specified), filtered to destColumns
var matchSet = new HashSet<string>(_matchColumns, StringComparer.OrdinalIgnoreCase);
var updateCols = _updateColumns ?? allColumns.Where(c => !matchSet.Contains(c)).ToArray();
var updateCols = (_updateColumns ?? allColumns.Where(c => !matchSet.Contains(c)).ToArray())
.Where(c => destColumns.Contains(c))
.ToArray();
// Build MERGE SQL
var mergeSql = BuildMergeSql(tempTableName, allColumns, updateCols);
@@ -135,7 +147,8 @@ public class DbBulkMergeDestination : IImportDestination
private async Task CreateTempTableAsync(SqlConnection connection, string tempTableName, CancellationToken ct)
{
var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM [{_tableName}]";
var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName);
var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM {qualifiedName}";
await using var cmd = connection.CreateCommand();
cmd.CommandText = sql;
cmd.CommandTimeout = _commandTimeoutSeconds;
@@ -199,8 +212,9 @@ public class DbBulkMergeDestination : IImportDestination
private string BuildMergeSql(string tempTableName, IReadOnlyList<string> allColumns, IReadOnlyList<string> updateColumns)
{
var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName);
var sb = new StringBuilder();
sb.AppendLine($"MERGE INTO [{_tableName}] AS target");
sb.AppendLine($"MERGE INTO {qualifiedName} AS target");
sb.AppendLine($"USING {tempTableName} AS source");
sb.Append("ON ");
sb.AppendLine(string.Join(" AND ", _matchColumns.Select(c => $"target.[{c}] = source.[{c}]")));
@@ -60,6 +60,9 @@ public class EtlPipelineBuilder
return this;
}
// TODO: Currently this timeout value is stored but not passed to destinations.
// In the future, the pipeline should pass this timeout to destinations that support it.
// For now, destinations use their own default timeout (600 seconds).
public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout)
{
if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24))
@@ -18,6 +18,15 @@ public static class CommonScripts
: ("dbo", parts[0]);
}
/// <summary>
/// Formats a table name as a properly quoted [schema].[table] identifier.
/// </summary>
/// <remarks>
/// The name is split into its schema and table parts by <see cref="ParseTableName"/>.
/// Any closing bracket (<c>]</c>) inside a part is doubled (<c>]]</c>) before the part is
/// wrapped in brackets, so the part cannot terminate its own delimiter early when the
/// result is interpolated into a SQL statement.
/// </remarks>
/// <param name="tableName">The table name to format; passed unchanged to <see cref="ParseTableName"/>.</param>
/// <returns>The bracket-delimited identifier in the form <c>[schema].[table]</c>.</returns>
public static string FormatQualifiedTableName(string tableName)
{
    var (schema, table) = ParseTableName(tableName);

    // Inside a bracket-delimited identifier, a literal ']' must be written as ']]'.
    var escapedSchema = schema.Replace("]", "]]");
    var escapedTable = table.Replace("]", "]]");

    return $"[{escapedSchema}].[{escapedTable}]";
}
public static IScriptRunner DisableIndexes(
IDbConnectionFactory factory,
string tableName,
@@ -56,6 +56,20 @@ public class JdeDateTransformer : DataTransformerBase
_dateOrdinal = source.GetOrdinal(_dateColumn);
_timeOrdinal = source.GetOrdinal(_timeColumn);
// Validate output column name doesn't conflict with existing columns
// (excluding the date and time columns which will be replaced/removed)
for (int i = 0; i < source.FieldCount; i++)
{
if (i == _dateOrdinal || i == _timeOrdinal) continue;
var existingName = source.GetName(i);
if (string.Equals(existingName, _outputColumn, StringComparison.OrdinalIgnoreCase))
{
throw new InvalidOperationException(
$"Output column name '{_outputColumn}' conflicts with existing column '{existingName}'. " +
"Choose a different output column name.");
}
}
var ordinalList = new List<int>();
var nameList = new List<string>();
_nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);