feat(etl): implement DbBulkMergeDestination for incremental updates

This commit is contained in:
Joseph Doherty
2026-01-03 09:26:43 -05:00
parent 63a0e7cf7e
commit 644e884b21
2 changed files with 269 additions and 0 deletions
@@ -0,0 +1,204 @@
using System.Data;
using System.Diagnostics;
using System.Text;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Results;
using Microsoft.Data.SqlClient;
namespace JdeScoping.DataSync.Etl.Destinations;
/// <summary>
/// Imports data into a SQL Server table using bulk copy to a temp table followed by MERGE.
/// This approach supports incremental updates by matching on key columns and updating
/// existing rows or inserting new ones.
/// </summary>
public class DbBulkMergeDestination : IImportDestination
{
private const int DefaultBatchSize = 10000;
private readonly IDbConnectionFactory _connectionFactory;
private readonly string _tableName;
private readonly string[] _matchColumns;
private readonly string[]? _updateColumns;
private readonly int _batchSize;
/// <inheritdoc />
public string DestinationName => $"BulkMerge:{_tableName}";
/// <summary>
/// Creates a new bulk merge destination for the specified table.
/// </summary>
/// <param name="connectionFactory">Factory to create database connections.</param>
/// <param name="tableName">Name of the destination table.</param>
/// <param name="matchColumns">Columns to match on for determining existing rows (key columns).</param>
/// <param name="updateColumns">Columns to update when a row matches. If null, all non-match columns are updated.</param>
/// <param name="batchSize">Number of rows per batch. 0 uses the default (10000).</param>
public DbBulkMergeDestination(
IDbConnectionFactory connectionFactory,
string tableName,
string[] matchColumns,
string[]? updateColumns = null,
int batchSize = 0)
{
ArgumentNullException.ThrowIfNull(connectionFactory);
ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
ArgumentNullException.ThrowIfNull(matchColumns);
if (matchColumns.Length == 0)
throw new ArgumentException("At least one match column is required.", nameof(matchColumns));
_connectionFactory = connectionFactory;
_tableName = tableName;
_matchColumns = matchColumns;
_updateColumns = updateColumns;
_batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
}
/// <inheritdoc />
public async Task<DestinationResult> WriteAsync(
IDataReader source,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
var stopwatch = Stopwatch.StartNew();
long totalRows = 0;
int batchCount = 0;
var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}";
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
try
{
// Create temp table from destination schema
await CreateTempTableAsync(connection, tempTableName, cancellationToken);
// Get all column names from source
var allColumns = new List<string>();
for (int i = 0; i < source.FieldCount; i++)
allColumns.Add(source.GetName(i));
// Determine update columns (all non-match columns if not specified)
var matchSet = new HashSet<string>(_matchColumns, StringComparer.OrdinalIgnoreCase);
var updateCols = _updateColumns ?? allColumns.Where(c => !matchSet.Contains(c)).ToArray();
// Build MERGE SQL
var mergeSql = BuildMergeSql(tempTableName, allColumns, updateCols);
// Process in batches using DataTable buffer
var batch = new DataTable();
SetupDataTable(batch, source);
while (source.Read())
{
var row = batch.NewRow();
for (int i = 0; i < source.FieldCount; i++)
row[i] = source.GetValue(i);
batch.Rows.Add(row);
if (batch.Rows.Count >= _batchSize)
{
batchCount++;
await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken);
totalRows += batch.Rows.Count;
batch.Clear();
}
}
// Process remaining rows
if (batch.Rows.Count > 0)
{
batchCount++;
await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken);
totalRows += batch.Rows.Count;
}
stopwatch.Stop();
return new DestinationResult(totalRows, batchCount, stopwatch.Elapsed);
}
finally
{
await DropTempTableAsync(connection, tempTableName);
}
}
private async Task CreateTempTableAsync(SqlConnection connection, string tempTableName, CancellationToken ct)
{
var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM [{_tableName}]";
await using var cmd = connection.CreateCommand();
cmd.CommandText = sql;
await cmd.ExecuteNonQueryAsync(ct);
}
private async Task DropTempTableAsync(SqlConnection connection, string tempTableName)
{
try
{
var sql = $"IF OBJECT_ID('tempdb..{tempTableName}') IS NOT NULL DROP TABLE {tempTableName}";
await using var cmd = connection.CreateCommand();
cmd.CommandText = sql;
await cmd.ExecuteNonQueryAsync();
}
catch
{
// Ignore cleanup errors
}
}
private async Task ProcessBatchAsync(
SqlConnection connection,
DataTable batch,
string tempTableName,
string mergeSql,
CancellationToken ct)
{
// Bulk copy to temp table
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = tempTableName,
BatchSize = batch.Rows.Count
};
await bulkCopy.WriteToServerAsync(batch, ct);
// Execute MERGE
await using var cmd = connection.CreateCommand();
cmd.CommandText = mergeSql;
await cmd.ExecuteNonQueryAsync(ct);
// Truncate temp table for next batch
cmd.CommandText = $"TRUNCATE TABLE {tempTableName}";
await cmd.ExecuteNonQueryAsync(ct);
}
private string BuildMergeSql(string tempTableName, IReadOnlyList<string> allColumns, IReadOnlyList<string> updateColumns)
{
var sb = new StringBuilder();
sb.AppendLine($"MERGE INTO [{_tableName}] AS target");
sb.AppendLine($"USING {tempTableName} AS source");
sb.Append("ON ");
sb.AppendLine(string.Join(" AND ", _matchColumns.Select(c => $"target.[{c}] = source.[{c}]")));
if (updateColumns.Count > 0)
{
sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
sb.AppendLine(string.Join(", ", updateColumns.Select(c => $"target.[{c}] = source.[{c}]")));
}
sb.AppendLine("WHEN NOT MATCHED THEN INSERT");
sb.AppendLine($"({string.Join(", ", allColumns.Select(c => $"[{c}]"))})");
sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.[{c}]"))});");
return sb.ToString();
}
private static void SetupDataTable(DataTable table, IDataReader source)
{
for (int i = 0; i < source.FieldCount; i++)
{
var type = source.GetFieldType(i);
var baseType = Nullable.GetUnderlyingType(type) ?? type;
table.Columns.Add(source.GetName(i), baseType);
}
}
}
@@ -0,0 +1,65 @@
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Destinations;
using NSubstitute;
namespace JdeScoping.DataSync.Tests.Etl.Destinations;
public class DbBulkMergeDestinationTests
{
[Fact]
public void Constructor_SetsDestinationName()
{
var factory = Substitute.For<IDbConnectionFactory>();
var dest = new DbBulkMergeDestination(factory, "WorkOrder", new[] { "OrderNumber" });
Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName);
}
[Fact]
public void Constructor_NullFactory_ThrowsArgumentNullException()
{
Assert.Throws<ArgumentNullException>(() =>
new DbBulkMergeDestination(null!, "WorkOrder", new[] { "Id" }));
}
[Fact]
public void Constructor_NullTableName_ThrowsArgumentNullException()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentNullException>(() =>
new DbBulkMergeDestination(factory, null!, new[] { "Id" }));
}
[Fact]
public void Constructor_EmptyTableName_ThrowsArgumentException()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentException>(() =>
new DbBulkMergeDestination(factory, "", new[] { "Id" }));
}
[Fact]
public void Constructor_EmptyMatchColumns_ThrowsArgumentException()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentException>(() =>
new DbBulkMergeDestination(factory, "WorkOrder", Array.Empty<string>()));
}
[Fact]
public void Constructor_NullMatchColumns_ThrowsArgumentNullException()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentNullException>(() =>
new DbBulkMergeDestination(factory, "WorkOrder", null!));
}
[Fact]
public void Constructor_WithUpdateColumns_Succeeds()
{
var factory = Substitute.For<IDbConnectionFactory>();
var dest = new DbBulkMergeDestination(factory, "WorkOrder",
new[] { "OrderNumber" },
updateColumns: new[] { "Status", "Description" });
Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName);
}
}