From 3145fca371b5cb38191d9dc34ba98c680d287b7c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 3 Jan 2026 11:06:38 -0500 Subject: [PATCH] feat(etl): add column mapping to destinations (intersect with dest schema) --- .../Destinations/DbBulkImportDestination.cs | 26 +++++++++++++-- .../Destinations/DbBulkMergeDestination.cs | 33 +++++++++++++++++-- .../DbBulkImportDestinationTests.cs | 23 +++++++++++++ .../DbBulkMergeDestinationTests.cs | 23 +++++++++++++ 4 files changed, 101 insertions(+), 4 deletions(-) diff --git a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs index 45b7548..3efb040 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs @@ -1,8 +1,10 @@ using System.Data; using System.Diagnostics; +using Dapper; using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Contracts; using JdeScoping.DataSync.Etl.Results; +using JdeScoping.DataSync.Etl.Scripts; using Microsoft.Data.SqlClient; namespace JdeScoping.DataSync.Etl.Destinations; @@ -67,6 +69,9 @@ public class DbBulkImportDestination : IImportDestination await truncateCmd.ExecuteNonQueryAsync(cancellationToken); } + // Get destination columns for column mapping + var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken); + // Bulk copy data using var bulkCopy = new SqlBulkCopy(connection) { @@ -76,10 +81,14 @@ public class DbBulkImportDestination : IImportDestination EnableStreaming = true }; - // Map columns by name + // Map only columns that exist in destination for (int i = 0; i < source.FieldCount; i++) { - bulkCopy.ColumnMappings.Add(source.GetName(i), source.GetName(i)); + var columnName = source.GetName(i); + if (destColumns.Contains(columnName)) + { + bulkCopy.ColumnMappings.Add(columnName, columnName); + } } // Track rows via event @@ -102,4 +111,17 @@ public class DbBulkImportDestination : IImportDestination stopwatch.Stop(); return new DestinationResult(totalRows, batchCount, stopwatch.Elapsed); } + + private async Task> GetDestinationColumnsAsync( + SqlConnection connection, + CancellationToken ct) + { + var (schema, table) = CommonScripts.ParseTableName(_tableName); + var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName"; + var columns = await connection.QueryAsync( + new CommandDefinition(sql, new { tableName = table, schemaName = schema }, + commandTimeout: _commandTimeoutSeconds, cancellationToken: ct)); + return columns.ToHashSet(StringComparer.OrdinalIgnoreCase); + } } diff --git a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs index 9c79f54..7ff87f9 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs @@ -1,9 +1,11 @@ using System.Data; using System.Diagnostics; using System.Text; +using Dapper; using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Contracts; using JdeScoping.DataSync.Etl.Results; +using JdeScoping.DataSync.Etl.Scripts; using Microsoft.Data.SqlClient; namespace JdeScoping.DataSync.Etl.Destinations; @@ -79,6 +81,9 @@ public class DbBulkMergeDestination : IImportDestination // Create temp table from destination schema await CreateTempTableAsync(connection, tempTableName, cancellationToken); + // Get destination columns for column mapping + var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken); + // Get all column names from source var allColumns = new List(); for (int i = 0; i < source.FieldCount; i++) @@ -105,7 +110,7 @@ public class DbBulkMergeDestination : IImportDestination if (batch.Rows.Count >= _batchSize) { batchCount++; - await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken); + await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, cancellationToken); totalRows += batch.Rows.Count; batch.Clear(); } @@ -115,7 +120,7 @@ public class DbBulkMergeDestination : IImportDestination if (batch.Rows.Count > 0) { batchCount++; - await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken); + await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, cancellationToken); totalRows += batch.Rows.Count; } @@ -158,6 +163,7 @@ public class DbBulkMergeDestination : IImportDestination DataTable batch, string tempTableName, string mergeSql, + HashSet destColumns, CancellationToken ct) { // Bulk copy to temp table @@ -167,6 +173,16 @@ public class DbBulkMergeDestination : IImportDestination BatchSize = batch.Rows.Count, BulkCopyTimeout = _commandTimeoutSeconds }; + + // Map only columns that exist in destination + foreach (DataColumn col in batch.Columns) + { + if (destColumns.Contains(col.ColumnName)) + { + bulkCopy.ColumnMappings.Add(col.ColumnName, col.ColumnName); + } + } + await bulkCopy.WriteToServerAsync(batch, ct); // Execute MERGE @@ -211,4 +227,17 @@ public class DbBulkMergeDestination : IImportDestination table.Columns.Add(source.GetName(i), baseType); } } + + private async Task> GetDestinationColumnsAsync( + SqlConnection connection, + CancellationToken ct) + { + var (schema, table) = CommonScripts.ParseTableName(_tableName); + var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName"; + var columns = await connection.QueryAsync( + new CommandDefinition(sql, new { tableName = table, schemaName = schema }, + commandTimeout: _commandTimeoutSeconds, cancellationToken: ct)); + return columns.ToHashSet(StringComparer.OrdinalIgnoreCase); + } } diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs index b2514b0..b852a57 100644 --- a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs @@ -1,3 +1,4 @@ +using System.Data; using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; using NSubstitute; @@ -6,6 +7,28 @@ namespace JdeScoping.DataSync.Tests.Etl.Destinations; public class DbBulkImportDestinationTests { + /// + /// This test documents that column mapping is applied to ignore extra source columns. + /// The actual functionality requires a database connection and is an integration test concept. + /// The implementation fetches destination columns from INFORMATION_SCHEMA.COLUMNS + /// and only maps columns that exist in both source and destination. + /// + [Fact] + public void WriteAsync_SourceHasExtraColumns_IgnoresExtraColumns_IntegrationTestConcept() + { + // This is an integration test concept - + // The actual behavior verifies that column mappings are applied: + // 1. GetDestinationColumnsAsync fetches columns from INFORMATION_SCHEMA.COLUMNS + // 2. Column mappings only added for columns in destination (case-insensitive) + // 3. Extra source columns are silently ignored during bulk copy + // + // To test this fully, an integration test with a real database is required. + // The unit test here just verifies the component can be constructed. + var factory = Substitute.For(); + var dest = new DbBulkImportDestination(factory, "TestTable"); + Assert.NotNull(dest); + } + [Fact] public void Constructor_SetsDestinationName() { diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs index 03df554..7e7a202 100644 --- a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs @@ -1,3 +1,4 @@ +using System.Data; using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; using NSubstitute; @@ -6,6 +7,28 @@ namespace JdeScoping.DataSync.Tests.Etl.Destinations; public class DbBulkMergeDestinationTests { + /// + /// This test documents that column mapping is applied to ignore extra source columns. + /// The actual functionality requires a database connection and is an integration test concept. + /// The implementation fetches destination columns from INFORMATION_SCHEMA.COLUMNS + /// and only maps columns that exist in both source and destination. + /// + [Fact] + public void WriteAsync_SourceHasExtraColumns_IgnoresExtraColumns_IntegrationTestConcept() + { + // This is an integration test concept - + // The actual behavior verifies that column mappings are applied: + // 1. GetDestinationColumnsAsync fetches columns from INFORMATION_SCHEMA.COLUMNS + // 2. ProcessBatchAsync only adds column mappings for columns in destination + // 3. Extra source columns are silently ignored during bulk copy + // + // To test this fully, an integration test with a real database is required. + // The unit test here just verifies the component can be constructed. + var factory = Substitute.For(); + var dest = new DbBulkMergeDestination(factory, "TestTable", new[] { "Id" }); + Assert.NotNull(dest); + } + [Fact] public void Constructor_SetsDestinationName() {