From 644e884b21062fe115b3b38a4b25f9afe3d19edd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 3 Jan 2026 09:26:43 -0500 Subject: [PATCH] feat(etl): implement DbBulkMergeDestination for incremental updates --- .../Destinations/DbBulkMergeDestination.cs | 204 ++++++++++++++++++ .../DbBulkMergeDestinationTests.cs | 65 ++++++ 2 files changed, 269 insertions(+) create mode 100644 NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs create mode 100644 NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs diff --git a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs new file mode 100644 index 0000000..c4a0984 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs @@ -0,0 +1,204 @@ +using System.Data; +using System.Diagnostics; +using System.Text; +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Etl.Contracts; +using JdeScoping.DataSync.Etl.Results; +using Microsoft.Data.SqlClient; + +namespace JdeScoping.DataSync.Etl.Destinations; + +/// +/// Imports data into a SQL Server table using bulk copy to a temp table followed by MERGE. +/// This approach supports incremental updates by matching on key columns and updating +/// existing rows or inserting new ones. +/// +public class DbBulkMergeDestination : IImportDestination +{ + private const int DefaultBatchSize = 10000; + + private readonly IDbConnectionFactory _connectionFactory; + private readonly string _tableName; + private readonly string[] _matchColumns; + private readonly string[]? _updateColumns; + private readonly int _batchSize; + + /// + public string DestinationName => $"BulkMerge:{_tableName}"; + + /// + /// Creates a new bulk merge destination for the specified table. + /// + /// Factory to create database connections. + /// Name of the destination table. + /// Columns to match on for determining existing rows (key columns). + /// Columns to update when a row matches. If null, all non-match columns are updated. + /// Number of rows per batch. 0 uses the default (10000). + public DbBulkMergeDestination( + IDbConnectionFactory connectionFactory, + string tableName, + string[] matchColumns, + string[]? updateColumns = null, + int batchSize = 0) + { + ArgumentNullException.ThrowIfNull(connectionFactory); + ArgumentException.ThrowIfNullOrWhiteSpace(tableName); + ArgumentNullException.ThrowIfNull(matchColumns); + if (matchColumns.Length == 0) + throw new ArgumentException("At least one match column is required.", nameof(matchColumns)); + + _connectionFactory = connectionFactory; + _tableName = tableName; + _matchColumns = matchColumns; + _updateColumns = updateColumns; + _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize; + } + + /// + public async Task WriteAsync( + IDataReader source, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + + var stopwatch = Stopwatch.StartNew(); + long totalRows = 0; + int batchCount = 0; + + var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}"; + + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken); + + try + { + // Create temp table from destination schema + await CreateTempTableAsync(connection, tempTableName, cancellationToken); + + // Get all column names from source + var allColumns = new List(); + for (int i = 0; i < source.FieldCount; i++) + allColumns.Add(source.GetName(i)); + + // Determine update columns (all non-match columns if not specified) + var matchSet = new HashSet(_matchColumns, StringComparer.OrdinalIgnoreCase); + var updateCols = _updateColumns ?? allColumns.Where(c => !matchSet.Contains(c)).ToArray(); + + // Build MERGE SQL + var mergeSql = BuildMergeSql(tempTableName, allColumns, updateCols); + + // Process in batches using DataTable buffer + var batch = new DataTable(); + SetupDataTable(batch, source); + + while (source.Read()) + { + var row = batch.NewRow(); + for (int i = 0; i < source.FieldCount; i++) + row[i] = source.GetValue(i); + batch.Rows.Add(row); + + if (batch.Rows.Count >= _batchSize) + { + batchCount++; + await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken); + totalRows += batch.Rows.Count; + batch.Clear(); + } + } + + // Process remaining rows + if (batch.Rows.Count > 0) + { + batchCount++; + await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken); + totalRows += batch.Rows.Count; + } + + stopwatch.Stop(); + return new DestinationResult(totalRows, batchCount, stopwatch.Elapsed); + } + finally + { + await DropTempTableAsync(connection, tempTableName); + } + } + + private async Task CreateTempTableAsync(SqlConnection connection, string tempTableName, CancellationToken ct) + { + var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM [{_tableName}]"; + await using var cmd = connection.CreateCommand(); + cmd.CommandText = sql; + await cmd.ExecuteNonQueryAsync(ct); + } + + private async Task DropTempTableAsync(SqlConnection connection, string tempTableName) + { + try + { + var sql = $"IF OBJECT_ID('tempdb..{tempTableName}') IS NOT NULL DROP TABLE {tempTableName}"; + await using var cmd = connection.CreateCommand(); + cmd.CommandText = sql; + await cmd.ExecuteNonQueryAsync(); + } + catch + { + // Ignore cleanup errors + } + } + + private async Task ProcessBatchAsync( + SqlConnection connection, + DataTable batch, + string tempTableName, + string mergeSql, + CancellationToken ct) + { + // Bulk copy to temp table + using var bulkCopy = new SqlBulkCopy(connection) + { + DestinationTableName = tempTableName, + BatchSize = batch.Rows.Count + }; + await bulkCopy.WriteToServerAsync(batch, ct); + + // Execute MERGE + await using var cmd = connection.CreateCommand(); + cmd.CommandText = mergeSql; + await cmd.ExecuteNonQueryAsync(ct); + + // Truncate temp table for next batch + cmd.CommandText = $"TRUNCATE TABLE {tempTableName}"; + await cmd.ExecuteNonQueryAsync(ct); + } + + private string BuildMergeSql(string tempTableName, IReadOnlyList allColumns, IReadOnlyList updateColumns) + { + var sb = new StringBuilder(); + sb.AppendLine($"MERGE INTO [{_tableName}] AS target"); + sb.AppendLine($"USING {tempTableName} AS source"); + sb.Append("ON "); + sb.AppendLine(string.Join(" AND ", _matchColumns.Select(c => $"target.[{c}] = source.[{c}]"))); + + if (updateColumns.Count > 0) + { + sb.AppendLine("WHEN MATCHED THEN UPDATE SET"); + sb.AppendLine(string.Join(", ", updateColumns.Select(c => $"target.[{c}] = source.[{c}]"))); + } + + sb.AppendLine("WHEN NOT MATCHED THEN INSERT"); + sb.AppendLine($"({string.Join(", ", allColumns.Select(c => $"[{c}]"))})"); + sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.[{c}]"))});"); + + return sb.ToString(); + } + + private static void SetupDataTable(DataTable table, IDataReader source) + { + for (int i = 0; i < source.FieldCount; i++) + { + var type = source.GetFieldType(i); + var baseType = Nullable.GetUnderlyingType(type) ?? type; + table.Columns.Add(source.GetName(i), baseType); + } + } +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs new file mode 100644 index 0000000..aea5813 --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs @@ -0,0 +1,65 @@ +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Etl.Destinations; +using NSubstitute; + +namespace JdeScoping.DataSync.Tests.Etl.Destinations; + +public class DbBulkMergeDestinationTests +{ + [Fact] + public void Constructor_SetsDestinationName() + { + var factory = Substitute.For(); + var dest = new DbBulkMergeDestination(factory, "WorkOrder", new[] { "OrderNumber" }); + Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName); + } + + [Fact] + public void Constructor_NullFactory_ThrowsArgumentNullException() + { + Assert.Throws(() => + new DbBulkMergeDestination(null!, "WorkOrder", new[] { "Id" })); + } + + [Fact] + public void Constructor_NullTableName_ThrowsArgumentNullException() + { + var factory = Substitute.For(); + Assert.Throws(() => + new DbBulkMergeDestination(factory, null!, new[] { "Id" })); + } + + [Fact] + public void Constructor_EmptyTableName_ThrowsArgumentException() + { + var factory = Substitute.For(); + Assert.Throws(() => + new DbBulkMergeDestination(factory, "", new[] { "Id" })); + } + + [Fact] + public void Constructor_EmptyMatchColumns_ThrowsArgumentException() + { + var factory = Substitute.For(); + Assert.Throws(() => + new DbBulkMergeDestination(factory, "WorkOrder", Array.Empty())); + } + + [Fact] + public void Constructor_NullMatchColumns_ThrowsArgumentNullException() + { + var factory = Substitute.For(); + Assert.Throws(() => + new DbBulkMergeDestination(factory, "WorkOrder", null!)); + } + + [Fact] + public void Constructor_WithUpdateColumns_Succeeds() + { + var factory = Substitute.For(); + var dest = new DbBulkMergeDestination(factory, "WorkOrder", + new[] { "OrderNumber" }, + updateColumns: new[] { "Status", "Description" }); + Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName); + } +}