feat(etl): add column mapping to destinations (intersect with dest schema)

This commit is contained in:
Joseph Doherty
2026-01-03 11:06:38 -05:00
parent 0b317c1ffc
commit 3145fca371
4 changed files with 101 additions and 4 deletions
@@ -1,8 +1,10 @@
using System.Data; using System.Data;
using System.Diagnostics; using System.Diagnostics;
using Dapper;
using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Contracts; using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Results; using JdeScoping.DataSync.Etl.Results;
using JdeScoping.DataSync.Etl.Scripts;
using Microsoft.Data.SqlClient; using Microsoft.Data.SqlClient;
namespace JdeScoping.DataSync.Etl.Destinations; namespace JdeScoping.DataSync.Etl.Destinations;
@@ -67,6 +69,9 @@ public class DbBulkImportDestination : IImportDestination
await truncateCmd.ExecuteNonQueryAsync(cancellationToken); await truncateCmd.ExecuteNonQueryAsync(cancellationToken);
} }
// Get destination columns for column mapping
var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken);
// Bulk copy data // Bulk copy data
using var bulkCopy = new SqlBulkCopy(connection) using var bulkCopy = new SqlBulkCopy(connection)
{ {
@@ -76,10 +81,14 @@ public class DbBulkImportDestination : IImportDestination
EnableStreaming = true EnableStreaming = true
}; };
// Map columns by name // Map only columns that exist in destination
for (int i = 0; i < source.FieldCount; i++) for (int i = 0; i < source.FieldCount; i++)
{ {
bulkCopy.ColumnMappings.Add(source.GetName(i), source.GetName(i)); var columnName = source.GetName(i);
if (destColumns.Contains(columnName))
{
bulkCopy.ColumnMappings.Add(columnName, columnName);
}
} }
// Track rows via event // Track rows via event
@@ -102,4 +111,17 @@ public class DbBulkImportDestination : IImportDestination
stopwatch.Stop(); stopwatch.Stop();
return new DestinationResult(totalRows, batchCount, stopwatch.Elapsed); return new DestinationResult(totalRows, batchCount, stopwatch.Elapsed);
} }
/// <summary>
/// Reads the column names of the destination table from INFORMATION_SCHEMA.COLUMNS.
/// </summary>
/// <param name="connection">Connection on which the metadata query is executed.</param>
/// <param name="ct">Token that cancels the metadata query.</param>
/// <returns>
/// The returned column names, collected into a set that compares names with
/// <see cref="StringComparer.OrdinalIgnoreCase"/>.
/// </returns>
private async Task<HashSet<string>> GetDestinationColumnsAsync(
    SqlConnection connection,
    CancellationToken ct)
{
    const string columnQuery = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";

    // The configured table name is split into its schema and table parts,
    // which are passed to the query as parameters.
    var (schemaPart, tablePart) = CommonScripts.ParseTableName(_tableName);
    var command = new CommandDefinition(
        columnQuery,
        new { tableName = tablePart, schemaName = schemaPart },
        commandTimeout: _commandTimeoutSeconds,
        cancellationToken: ct);

    var columnNames = await connection.QueryAsync<string>(command);

    // Case-insensitive set, so lookups by column name ignore casing.
    return new HashSet<string>(columnNames, StringComparer.OrdinalIgnoreCase);
}
} }
@@ -1,9 +1,11 @@
using System.Data; using System.Data;
using System.Diagnostics; using System.Diagnostics;
using System.Text; using System.Text;
using Dapper;
using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Contracts; using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Results; using JdeScoping.DataSync.Etl.Results;
using JdeScoping.DataSync.Etl.Scripts;
using Microsoft.Data.SqlClient; using Microsoft.Data.SqlClient;
namespace JdeScoping.DataSync.Etl.Destinations; namespace JdeScoping.DataSync.Etl.Destinations;
@@ -79,6 +81,9 @@ public class DbBulkMergeDestination : IImportDestination
// Create temp table from destination schema // Create temp table from destination schema
await CreateTempTableAsync(connection, tempTableName, cancellationToken); await CreateTempTableAsync(connection, tempTableName, cancellationToken);
// Get destination columns for column mapping
var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken);
// Get all column names from source // Get all column names from source
var allColumns = new List<string>(); var allColumns = new List<string>();
for (int i = 0; i < source.FieldCount; i++) for (int i = 0; i < source.FieldCount; i++)
@@ -105,7 +110,7 @@ public class DbBulkMergeDestination : IImportDestination
if (batch.Rows.Count >= _batchSize) if (batch.Rows.Count >= _batchSize)
{ {
batchCount++; batchCount++;
await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken); await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, cancellationToken);
totalRows += batch.Rows.Count; totalRows += batch.Rows.Count;
batch.Clear(); batch.Clear();
} }
@@ -115,7 +120,7 @@ public class DbBulkMergeDestination : IImportDestination
if (batch.Rows.Count > 0) if (batch.Rows.Count > 0)
{ {
batchCount++; batchCount++;
await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken); await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, cancellationToken);
totalRows += batch.Rows.Count; totalRows += batch.Rows.Count;
} }
@@ -158,6 +163,7 @@ public class DbBulkMergeDestination : IImportDestination
DataTable batch, DataTable batch,
string tempTableName, string tempTableName,
string mergeSql, string mergeSql,
HashSet<string> destColumns,
CancellationToken ct) CancellationToken ct)
{ {
// Bulk copy to temp table // Bulk copy to temp table
@@ -167,6 +173,16 @@ public class DbBulkMergeDestination : IImportDestination
BatchSize = batch.Rows.Count, BatchSize = batch.Rows.Count,
BulkCopyTimeout = _commandTimeoutSeconds BulkCopyTimeout = _commandTimeoutSeconds
}; };
// Map only columns that exist in destination
foreach (DataColumn col in batch.Columns)
{
if (destColumns.Contains(col.ColumnName))
{
bulkCopy.ColumnMappings.Add(col.ColumnName, col.ColumnName);
}
}
await bulkCopy.WriteToServerAsync(batch, ct); await bulkCopy.WriteToServerAsync(batch, ct);
// Execute MERGE // Execute MERGE
@@ -211,4 +227,17 @@ public class DbBulkMergeDestination : IImportDestination
table.Columns.Add(source.GetName(i), baseType); table.Columns.Add(source.GetName(i), baseType);
} }
} }
/// <summary>
/// Queries INFORMATION_SCHEMA.COLUMNS for the column names of the destination table.
/// </summary>
/// <param name="connection">Connection used to run the metadata query.</param>
/// <param name="ct">Cancellation token forwarded to the query.</param>
/// <returns>
/// A set of the returned column names that uses
/// <see cref="StringComparer.OrdinalIgnoreCase"/> for comparisons.
/// </returns>
private async Task<HashSet<string>> GetDestinationColumnsAsync(
    SqlConnection connection,
    CancellationToken ct)
{
    // Split the configured name so schema and table can be bound as separate parameters.
    var (schemaName, tableName) = CommonScripts.ParseTableName(_tableName);

    const string metadataSql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";

    var queryDefinition = new CommandDefinition(
        metadataSql,
        new { tableName, schemaName },
        commandTimeout: _commandTimeoutSeconds,
        cancellationToken: ct);

    var destinationColumns = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    foreach (var name in await connection.QueryAsync<string>(queryDefinition))
    {
        destinationColumns.Add(name);
    }

    return destinationColumns;
}
} }
@@ -1,3 +1,4 @@
using System.Data;
using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Destinations; using JdeScoping.DataSync.Etl.Destinations;
using NSubstitute; using NSubstitute;
@@ -6,6 +7,28 @@ namespace JdeScoping.DataSync.Tests.Etl.Destinations;
public class DbBulkImportDestinationTests public class DbBulkImportDestinationTests
{ {
/// <summary>
/// Placeholder documenting the column-mapping behavior of the bulk import destination.
/// The destination reads its column list from INFORMATION_SCHEMA.COLUMNS and maps only
/// source columns whose names are found in that list, so exercising the behavior itself
/// needs a real database connection (an integration test). This unit test only checks
/// that the destination can be constructed.
/// </summary>
[Fact]
public void WriteAsync_SourceHasExtraColumns_IgnoresExtraColumns_IntegrationTestConcept()
{
    // Behavior that a database-backed integration test would need to cover:
    //   1. GetDestinationColumnsAsync reads the column names from INFORMATION_SCHEMA.COLUMNS.
    //   2. A column mapping is added only when the source column name is found in
    //      that set (case-insensitive lookup).
    //   3. Source columns without a match get no mapping and are left out of the bulk copy.

    // Arrange
    var connectionFactory = Substitute.For<IDbConnectionFactory>();

    // Act
    var destination = new DbBulkImportDestination(connectionFactory, "TestTable");

    // Assert: construction succeeded; nothing else is verifiable without a database.
    Assert.NotNull(destination);
}
[Fact] [Fact]
public void Constructor_SetsDestinationName() public void Constructor_SetsDestinationName()
{ {
@@ -1,3 +1,4 @@
using System.Data;
using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Destinations; using JdeScoping.DataSync.Etl.Destinations;
using NSubstitute; using NSubstitute;
@@ -6,6 +7,28 @@ namespace JdeScoping.DataSync.Tests.Etl.Destinations;
public class DbBulkMergeDestinationTests public class DbBulkMergeDestinationTests
{ {
/// <summary>
/// Placeholder documenting the column-mapping behavior of the bulk merge destination.
/// The destination reads its column list from INFORMATION_SCHEMA.COLUMNS and maps only
/// batch columns whose names are found in that list, so exercising the behavior itself
/// needs a real database connection (an integration test). This unit test only checks
/// that the destination can be constructed.
/// </summary>
[Fact]
public void WriteAsync_SourceHasExtraColumns_IgnoresExtraColumns_IntegrationTestConcept()
{
    // Behavior that a database-backed integration test would need to cover:
    //   1. GetDestinationColumnsAsync reads the column names from INFORMATION_SCHEMA.COLUMNS.
    //   2. ProcessBatchAsync adds a column mapping only for batch columns found in that set.
    //   3. Batch columns without a match get no mapping and are left out of the bulk copy.

    // Arrange
    var connectionFactory = Substitute.For<IDbConnectionFactory>();
    var keyColumns = new[] { "Id" };

    // Act
    var destination = new DbBulkMergeDestination(connectionFactory, "TestTable", keyColumns);

    // Assert: construction succeeded; nothing else is verifiable without a database.
    Assert.NotNull(destination);
}
[Fact] [Fact]
public void Constructor_SetsDestinationName() public void Constructor_SetsDestinationName()
{ {