feat(etl): implement DbBulkImportDestination for full table refresh

Add bulk import destination that truncates and loads data using
SqlBulkCopy with configurable batch sizes and streaming support.
This commit is contained in:
Joseph Doherty
2026-01-03 09:22:57 -05:00
parent 8594baf11d
commit 63a0e7cf7e
3 changed files with 147 additions and 0 deletions
@@ -0,0 +1,99 @@
using System.Data;
using System.Diagnostics;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Results;
using Microsoft.Data.SqlClient;
namespace JdeScoping.DataSync.Etl.Destinations;
/// <summary>
/// Imports data into a SQL Server table using bulk copy operations.
/// Performs a full table refresh by truncating the table before loading.
/// </summary>
public class DbBulkImportDestination : IImportDestination
{
private const int DefaultBatchSize = 10000;
private readonly IDbConnectionFactory _connectionFactory;
private readonly string _tableName;
private readonly int _batchSize;
/// <inheritdoc />
public string DestinationName => $"BulkImport:{_tableName}";
/// <summary>
/// Creates a new bulk import destination for the specified table.
/// </summary>
/// <param name="connectionFactory">Factory to create database connections.</param>
/// <param name="tableName">Name of the destination table.</param>
/// <param name="batchSize">Number of rows per batch. 0 uses the default (10000).</param>
public DbBulkImportDestination(
IDbConnectionFactory connectionFactory,
string tableName,
int batchSize = 0)
{
ArgumentNullException.ThrowIfNull(connectionFactory);
ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
_connectionFactory = connectionFactory;
_tableName = tableName;
_batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
}
/// <inheritdoc />
public async Task<DestinationResult> WriteAsync(
IDataReader source,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
var stopwatch = Stopwatch.StartNew();
long totalRows = 0;
int batchCount = 0;
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
// Truncate destination table
await using (var truncateCmd = connection.CreateCommand())
{
truncateCmd.CommandText = $"TRUNCATE TABLE [{_tableName}]";
await truncateCmd.ExecuteNonQueryAsync(cancellationToken);
}
// Bulk copy data
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = $"[{_tableName}]",
BatchSize = _batchSize,
BulkCopyTimeout = 3600,
EnableStreaming = true
};
// Map columns by name
for (int i = 0; i < source.FieldCount; i++)
{
bulkCopy.ColumnMappings.Add(source.GetName(i), source.GetName(i));
}
// Track rows via event
bulkCopy.NotifyAfter = _batchSize;
bulkCopy.SqlRowsCopied += (_, e) =>
{
totalRows = e.RowsCopied;
batchCount++;
};
await bulkCopy.WriteToServerAsync(source, cancellationToken);
// Final count - RowsCopied property may have more rows if NotifyAfter didn't fire
if (bulkCopy.RowsCopied > totalRows)
{
totalRows = bulkCopy.RowsCopied;
batchCount++;
}
stopwatch.Stop();
return new DestinationResult(totalRows, batchCount, stopwatch.Elapsed);
}
}
@@ -0,0 +1,47 @@
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Destinations;
using NSubstitute;
namespace JdeScoping.DataSync.Tests.Etl.Destinations;
public class DbBulkImportDestinationTests
{
[Fact]
public void Constructor_SetsDestinationName()
{
var factory = Substitute.For<IDbConnectionFactory>();
var dest = new DbBulkImportDestination(factory, "WorkOrder");
Assert.Equal("BulkImport:WorkOrder", dest.DestinationName);
}
[Fact]
public void Constructor_NullFactory_ThrowsArgumentNullException()
{
Assert.Throws<ArgumentNullException>(() => new DbBulkImportDestination(null!, "WorkOrder"));
}
[Fact]
public void Constructor_NullTableName_ThrowsArgumentNullException()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentNullException>(() => new DbBulkImportDestination(factory, null!));
}
[Fact]
public void Constructor_EmptyTableName_ThrowsArgumentException()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentException>(() => new DbBulkImportDestination(factory, ""));
}
[Theory]
[InlineData(0)] // 0 means default
[InlineData(5000)]
[InlineData(50000)]
public void Constructor_VariousBatchSizes_Succeeds(int batchSize)
{
var factory = Substitute.For<IDbConnectionFactory>();
var dest = new DbBulkImportDestination(factory, "WorkOrder", batchSize: batchSize);
Assert.NotNull(dest);
}
}
@@ -28,6 +28,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\JdeScoping.DataAccess\JdeScoping.DataAccess.csproj" />
<ProjectReference Include="..\..\src\JdeScoping.DataSync\JdeScoping.DataSync.csproj" />
</ItemGroup>