diff --git a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs
new file mode 100644
index 0000000..c4a0984
--- /dev/null
+++ b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs
@@ -0,0 +1,204 @@
+using System.Data;
+using System.Diagnostics;
+using System.Text;
+using JdeScoping.DataAccess.Interfaces;
+using JdeScoping.DataSync.Etl.Contracts;
+using JdeScoping.DataSync.Etl.Results;
+using Microsoft.Data.SqlClient;
+
+namespace JdeScoping.DataSync.Etl.Destinations;
+
+///
+/// Imports data into a SQL Server table using bulk copy to a temp table followed by MERGE.
+/// This approach supports incremental updates by matching on key columns and updating
+/// existing rows or inserting new ones.
+///
+public class DbBulkMergeDestination : IImportDestination
+{
+ private const int DefaultBatchSize = 10000;
+
+ private readonly IDbConnectionFactory _connectionFactory;
+ private readonly string _tableName;
+ private readonly string[] _matchColumns;
+ private readonly string[]? _updateColumns;
+ private readonly int _batchSize;
+
+ ///
+ public string DestinationName => $"BulkMerge:{_tableName}";
+
+ ///
+ /// Creates a new bulk merge destination for the specified table.
+ ///
+ /// Factory to create database connections.
+ /// Name of the destination table.
+ /// Columns to match on for determining existing rows (key columns).
+ /// Columns to update when a row matches. If null, all non-match columns are updated.
+ /// Number of rows per batch. 0 uses the default (10000).
+ public DbBulkMergeDestination(
+ IDbConnectionFactory connectionFactory,
+ string tableName,
+ string[] matchColumns,
+ string[]? updateColumns = null,
+ int batchSize = 0)
+ {
+ ArgumentNullException.ThrowIfNull(connectionFactory);
+ ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
+ ArgumentNullException.ThrowIfNull(matchColumns);
+ if (matchColumns.Length == 0)
+ throw new ArgumentException("At least one match column is required.", nameof(matchColumns));
+
+ _connectionFactory = connectionFactory;
+ _tableName = tableName;
+ _matchColumns = matchColumns;
+ _updateColumns = updateColumns;
+ _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
+ }
+
+ ///
+ public async Task WriteAsync(
+ IDataReader source,
+ CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+
+ var stopwatch = Stopwatch.StartNew();
+ long totalRows = 0;
+ int batchCount = 0;
+
+ var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}";
+
+ await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
+
+ try
+ {
+ // Create temp table from destination schema
+ await CreateTempTableAsync(connection, tempTableName, cancellationToken);
+
+ // Get all column names from source
+ var allColumns = new List();
+ for (int i = 0; i < source.FieldCount; i++)
+ allColumns.Add(source.GetName(i));
+
+ // Determine update columns (all non-match columns if not specified)
+ var matchSet = new HashSet(_matchColumns, StringComparer.OrdinalIgnoreCase);
+ var updateCols = _updateColumns ?? allColumns.Where(c => !matchSet.Contains(c)).ToArray();
+
+ // Build MERGE SQL
+ var mergeSql = BuildMergeSql(tempTableName, allColumns, updateCols);
+
+ // Process in batches using DataTable buffer
+ var batch = new DataTable();
+ SetupDataTable(batch, source);
+
+ while (source.Read())
+ {
+ var row = batch.NewRow();
+ for (int i = 0; i < source.FieldCount; i++)
+ row[i] = source.GetValue(i);
+ batch.Rows.Add(row);
+
+ if (batch.Rows.Count >= _batchSize)
+ {
+ batchCount++;
+ await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken);
+ totalRows += batch.Rows.Count;
+ batch.Clear();
+ }
+ }
+
+ // Process remaining rows
+ if (batch.Rows.Count > 0)
+ {
+ batchCount++;
+ await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, cancellationToken);
+ totalRows += batch.Rows.Count;
+ }
+
+ stopwatch.Stop();
+ return new DestinationResult(totalRows, batchCount, stopwatch.Elapsed);
+ }
+ finally
+ {
+ await DropTempTableAsync(connection, tempTableName);
+ }
+ }
+
+ private async Task CreateTempTableAsync(SqlConnection connection, string tempTableName, CancellationToken ct)
+ {
+ var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM [{_tableName}]";
+ await using var cmd = connection.CreateCommand();
+ cmd.CommandText = sql;
+ await cmd.ExecuteNonQueryAsync(ct);
+ }
+
+ private async Task DropTempTableAsync(SqlConnection connection, string tempTableName)
+ {
+ try
+ {
+ var sql = $"IF OBJECT_ID('tempdb..{tempTableName}') IS NOT NULL DROP TABLE {tempTableName}";
+ await using var cmd = connection.CreateCommand();
+ cmd.CommandText = sql;
+ await cmd.ExecuteNonQueryAsync();
+ }
+ catch
+ {
+ // Ignore cleanup errors
+ }
+ }
+
+ private async Task ProcessBatchAsync(
+ SqlConnection connection,
+ DataTable batch,
+ string tempTableName,
+ string mergeSql,
+ CancellationToken ct)
+ {
+ // Bulk copy to temp table
+ using var bulkCopy = new SqlBulkCopy(connection)
+ {
+ DestinationTableName = tempTableName,
+ BatchSize = batch.Rows.Count
+ };
+ await bulkCopy.WriteToServerAsync(batch, ct);
+
+ // Execute MERGE
+ await using var cmd = connection.CreateCommand();
+ cmd.CommandText = mergeSql;
+ await cmd.ExecuteNonQueryAsync(ct);
+
+ // Truncate temp table for next batch
+ cmd.CommandText = $"TRUNCATE TABLE {tempTableName}";
+ await cmd.ExecuteNonQueryAsync(ct);
+ }
+
+ private string BuildMergeSql(string tempTableName, IReadOnlyList allColumns, IReadOnlyList updateColumns)
+ {
+ var sb = new StringBuilder();
+ sb.AppendLine($"MERGE INTO [{_tableName}] AS target");
+ sb.AppendLine($"USING {tempTableName} AS source");
+ sb.Append("ON ");
+ sb.AppendLine(string.Join(" AND ", _matchColumns.Select(c => $"target.[{c}] = source.[{c}]")));
+
+ if (updateColumns.Count > 0)
+ {
+ sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
+ sb.AppendLine(string.Join(", ", updateColumns.Select(c => $"target.[{c}] = source.[{c}]")));
+ }
+
+ sb.AppendLine("WHEN NOT MATCHED THEN INSERT");
+ sb.AppendLine($"({string.Join(", ", allColumns.Select(c => $"[{c}]"))})");
+ sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.[{c}]"))});");
+
+ return sb.ToString();
+ }
+
+ private static void SetupDataTable(DataTable table, IDataReader source)
+ {
+ for (int i = 0; i < source.FieldCount; i++)
+ {
+ var type = source.GetFieldType(i);
+ var baseType = Nullable.GetUnderlyingType(type) ?? type;
+ table.Columns.Add(source.GetName(i), baseType);
+ }
+ }
+}
diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs
new file mode 100644
index 0000000..aea5813
--- /dev/null
+++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs
@@ -0,0 +1,65 @@
+using JdeScoping.DataAccess.Interfaces;
+using JdeScoping.DataSync.Etl.Destinations;
+using NSubstitute;
+
+namespace JdeScoping.DataSync.Tests.Etl.Destinations;
+
+public class DbBulkMergeDestinationTests
+{
+ [Fact]
+ public void Constructor_SetsDestinationName()
+ {
+ var factory = Substitute.For();
+ var dest = new DbBulkMergeDestination(factory, "WorkOrder", new[] { "OrderNumber" });
+ Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName);
+ }
+
+ [Fact]
+ public void Constructor_NullFactory_ThrowsArgumentNullException()
+ {
+ Assert.Throws(() =>
+ new DbBulkMergeDestination(null!, "WorkOrder", new[] { "Id" }));
+ }
+
+ [Fact]
+ public void Constructor_NullTableName_ThrowsArgumentNullException()
+ {
+ var factory = Substitute.For();
+ Assert.Throws(() =>
+ new DbBulkMergeDestination(factory, null!, new[] { "Id" }));
+ }
+
+ [Fact]
+ public void Constructor_EmptyTableName_ThrowsArgumentException()
+ {
+ var factory = Substitute.For();
+ Assert.Throws(() =>
+ new DbBulkMergeDestination(factory, "", new[] { "Id" }));
+ }
+
+ [Fact]
+ public void Constructor_EmptyMatchColumns_ThrowsArgumentException()
+ {
+ var factory = Substitute.For();
+ Assert.Throws(() =>
+ new DbBulkMergeDestination(factory, "WorkOrder", Array.Empty()));
+ }
+
+ [Fact]
+ public void Constructor_NullMatchColumns_ThrowsArgumentNullException()
+ {
+ var factory = Substitute.For();
+ Assert.Throws(() =>
+ new DbBulkMergeDestination(factory, "WorkOrder", null!));
+ }
+
+ [Fact]
+ public void Constructor_WithUpdateColumns_Succeeds()
+ {
+ var factory = Substitute.For();
+ var dest = new DbBulkMergeDestination(factory, "WorkOrder",
+ new[] { "OrderNumber" },
+ updateColumns: new[] { "Status", "Description" });
+ Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName);
+ }
+}