diff --git a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs new file mode 100644 index 0000000..1274ff0 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs @@ -0,0 +1,99 @@ +using System.Data; +using System.Diagnostics; +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Etl.Contracts; +using JdeScoping.DataSync.Etl.Results; +using Microsoft.Data.SqlClient; + +namespace JdeScoping.DataSync.Etl.Destinations; + +/// +/// Imports data into a SQL Server table using bulk copy operations. +/// Performs a full table refresh by truncating the table before loading. +/// +public class DbBulkImportDestination : IImportDestination +{ + private const int DefaultBatchSize = 10000; + + private readonly IDbConnectionFactory _connectionFactory; + private readonly string _tableName; + private readonly int _batchSize; + + /// + public string DestinationName => $"BulkImport:{_tableName}"; + + /// + /// Creates a new bulk import destination for the specified table. + /// + /// Factory to create database connections. + /// Name of the destination table. + /// Number of rows per batch. 0 uses the default (10000). + public DbBulkImportDestination( + IDbConnectionFactory connectionFactory, + string tableName, + int batchSize = 0) + { + ArgumentNullException.ThrowIfNull(connectionFactory); + ArgumentException.ThrowIfNullOrWhiteSpace(tableName); + + _connectionFactory = connectionFactory; + _tableName = tableName; + _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize; + } + + /// + public async Task WriteAsync( + IDataReader source, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + + var stopwatch = Stopwatch.StartNew(); + long totalRows = 0; + int batchCount = 0; + + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken); + + // Truncate destination table + await using (var truncateCmd = connection.CreateCommand()) + { + truncateCmd.CommandText = $"TRUNCATE TABLE [{_tableName}]"; + await truncateCmd.ExecuteNonQueryAsync(cancellationToken); + } + + // Bulk copy data + using var bulkCopy = new SqlBulkCopy(connection) + { + DestinationTableName = $"[{_tableName}]", + BatchSize = _batchSize, + BulkCopyTimeout = 3600, + EnableStreaming = true + }; + + // Map columns by name + for (int i = 0; i < source.FieldCount; i++) + { + bulkCopy.ColumnMappings.Add(source.GetName(i), source.GetName(i)); + } + + // Track rows via event + bulkCopy.NotifyAfter = _batchSize; + bulkCopy.SqlRowsCopied += (_, e) => + { + totalRows = e.RowsCopied; + batchCount++; + }; + + await bulkCopy.WriteToServerAsync(source, cancellationToken); + + // Final count - RowsCopied property may have more rows if NotifyAfter didn't fire + if (bulkCopy.RowsCopied > totalRows) + { + totalRows = bulkCopy.RowsCopied; + batchCount++; + } + + stopwatch.Stop(); + return new DestinationResult(totalRows, batchCount, stopwatch.Elapsed); + } +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs new file mode 100644 index 0000000..ab36f7b --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs @@ -0,0 +1,47 @@ +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Etl.Destinations; +using NSubstitute; + +namespace JdeScoping.DataSync.Tests.Etl.Destinations; + +public class DbBulkImportDestinationTests +{ + [Fact] + public void Constructor_SetsDestinationName() + { + var factory = Substitute.For(); + var dest = new DbBulkImportDestination(factory, "WorkOrder"); + Assert.Equal("BulkImport:WorkOrder", dest.DestinationName); + } + + [Fact] + public void Constructor_NullFactory_ThrowsArgumentNullException() + { + Assert.Throws(() => new DbBulkImportDestination(null!, "WorkOrder")); + } + + [Fact] + public void Constructor_NullTableName_ThrowsArgumentNullException() + { + var factory = Substitute.For(); + Assert.Throws(() => new DbBulkImportDestination(factory, null!)); + } + + [Fact] + public void Constructor_EmptyTableName_ThrowsArgumentException() + { + var factory = Substitute.For(); + Assert.Throws(() => new DbBulkImportDestination(factory, "")); + } + + [Theory] + [InlineData(0)] // 0 means default + [InlineData(5000)] + [InlineData(50000)] + public void Constructor_VariousBatchSizes_Succeeds(int batchSize) + { + var factory = Substitute.For(); + var dest = new DbBulkImportDestination(factory, "WorkOrder", batchSize: batchSize); + Assert.NotNull(dest); + } +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/JdeScoping.DataSync.Tests.csproj b/NEW/tests/JdeScoping.DataSync.Tests/JdeScoping.DataSync.Tests.csproj index c085503..352d954 100644 --- a/NEW/tests/JdeScoping.DataSync.Tests/JdeScoping.DataSync.Tests.csproj +++ b/NEW/tests/JdeScoping.DataSync.Tests/JdeScoping.DataSync.Tests.csproj @@ -28,6 +28,7 @@ +