feat(etl): add commandTimeoutSeconds to destinations

This commit is contained in:
Joseph Doherty
2026-01-03 11:01:12 -05:00
parent 0e07a76438
commit 0b317c1ffc
4 changed files with 78 additions and 4 deletions
@@ -14,10 +14,12 @@ namespace JdeScoping.DataSync.Etl.Destinations;
public class DbBulkImportDestination : IImportDestination public class DbBulkImportDestination : IImportDestination
{ {
private const int DefaultBatchSize = 10000; private const int DefaultBatchSize = 10000;
private const int DefaultCommandTimeoutSeconds = 600;
private readonly IDbConnectionFactory _connectionFactory; private readonly IDbConnectionFactory _connectionFactory;
private readonly string _tableName; private readonly string _tableName;
private readonly int _batchSize; private readonly int _batchSize;
private readonly int _commandTimeoutSeconds;
/// <inheritdoc /> /// <inheritdoc />
public string DestinationName => $"BulkImport:{_tableName}"; public string DestinationName => $"BulkImport:{_tableName}";
@@ -28,10 +30,12 @@ public class DbBulkImportDestination : IImportDestination
/// <param name="connectionFactory">Factory to create database connections.</param> /// <param name="connectionFactory">Factory to create database connections.</param>
/// <param name="tableName">Name of the destination table.</param> /// <param name="tableName">Name of the destination table.</param>
/// <param name="batchSize">Number of rows per batch. 0 uses the default (10000).</param> /// <param name="batchSize">Number of rows per batch. 0 uses the default (10000).</param>
/// <param name="commandTimeoutSeconds">Command timeout in seconds. 0 uses the default (600).</param>
public DbBulkImportDestination( public DbBulkImportDestination(
IDbConnectionFactory connectionFactory, IDbConnectionFactory connectionFactory,
string tableName, string tableName,
int batchSize = 0) int batchSize = 0,
int commandTimeoutSeconds = 0)
{ {
ArgumentNullException.ThrowIfNull(connectionFactory); ArgumentNullException.ThrowIfNull(connectionFactory);
ArgumentException.ThrowIfNullOrWhiteSpace(tableName); ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
@@ -39,6 +43,7 @@ public class DbBulkImportDestination : IImportDestination
_connectionFactory = connectionFactory; _connectionFactory = connectionFactory;
_tableName = tableName; _tableName = tableName;
_batchSize = batchSize > 0 ? batchSize : DefaultBatchSize; _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
_commandTimeoutSeconds = commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds;
} }
/// <inheritdoc /> /// <inheritdoc />
@@ -58,6 +63,7 @@ public class DbBulkImportDestination : IImportDestination
await using (var truncateCmd = connection.CreateCommand()) await using (var truncateCmd = connection.CreateCommand())
{ {
truncateCmd.CommandText = $"TRUNCATE TABLE [{_tableName}]"; truncateCmd.CommandText = $"TRUNCATE TABLE [{_tableName}]";
truncateCmd.CommandTimeout = _commandTimeoutSeconds;
await truncateCmd.ExecuteNonQueryAsync(cancellationToken); await truncateCmd.ExecuteNonQueryAsync(cancellationToken);
} }
@@ -66,7 +72,7 @@ public class DbBulkImportDestination : IImportDestination
{ {
DestinationTableName = $"[{_tableName}]", DestinationTableName = $"[{_tableName}]",
BatchSize = _batchSize, BatchSize = _batchSize,
BulkCopyTimeout = 3600, BulkCopyTimeout = _commandTimeoutSeconds,
EnableStreaming = true EnableStreaming = true
}; };
@@ -16,12 +16,14 @@ namespace JdeScoping.DataSync.Etl.Destinations;
public class DbBulkMergeDestination : IImportDestination public class DbBulkMergeDestination : IImportDestination
{ {
private const int DefaultBatchSize = 10000; private const int DefaultBatchSize = 10000;
private const int DefaultCommandTimeoutSeconds = 600;
private readonly IDbConnectionFactory _connectionFactory; private readonly IDbConnectionFactory _connectionFactory;
private readonly string _tableName; private readonly string _tableName;
private readonly string[] _matchColumns; private readonly string[] _matchColumns;
private readonly string[]? _updateColumns; private readonly string[]? _updateColumns;
private readonly int _batchSize; private readonly int _batchSize;
private readonly int _commandTimeoutSeconds;
/// <inheritdoc /> /// <inheritdoc />
public string DestinationName => $"BulkMerge:{_tableName}"; public string DestinationName => $"BulkMerge:{_tableName}";
@@ -34,12 +36,14 @@ public class DbBulkMergeDestination : IImportDestination
/// <param name="matchColumns">Columns to match on for determining existing rows (key columns).</param> /// <param name="matchColumns">Columns to match on for determining existing rows (key columns).</param>
/// <param name="updateColumns">Columns to update when a row matches. If null, all non-match columns are updated.</param> /// <param name="updateColumns">Columns to update when a row matches. If null, all non-match columns are updated.</param>
/// <param name="batchSize">Number of rows per batch. 0 uses the default (10000).</param> /// <param name="batchSize">Number of rows per batch. 0 uses the default (10000).</param>
/// <param name="commandTimeoutSeconds">Command timeout in seconds. 0 uses the default (600).</param>
public DbBulkMergeDestination( public DbBulkMergeDestination(
IDbConnectionFactory connectionFactory, IDbConnectionFactory connectionFactory,
string tableName, string tableName,
string[] matchColumns, string[] matchColumns,
string[]? updateColumns = null, string[]? updateColumns = null,
int batchSize = 0) int batchSize = 0,
int commandTimeoutSeconds = 0)
{ {
ArgumentNullException.ThrowIfNull(connectionFactory); ArgumentNullException.ThrowIfNull(connectionFactory);
ArgumentException.ThrowIfNullOrWhiteSpace(tableName); ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
@@ -52,6 +56,7 @@ public class DbBulkMergeDestination : IImportDestination
_matchColumns = matchColumns; _matchColumns = matchColumns;
_updateColumns = updateColumns; _updateColumns = updateColumns;
_batchSize = batchSize > 0 ? batchSize : DefaultBatchSize; _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
_commandTimeoutSeconds = commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds;
} }
/// <inheritdoc /> /// <inheritdoc />
@@ -128,6 +133,7 @@ public class DbBulkMergeDestination : IImportDestination
var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM [{_tableName}]"; var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM [{_tableName}]";
await using var cmd = connection.CreateCommand(); await using var cmd = connection.CreateCommand();
cmd.CommandText = sql; cmd.CommandText = sql;
cmd.CommandTimeout = _commandTimeoutSeconds;
await cmd.ExecuteNonQueryAsync(ct); await cmd.ExecuteNonQueryAsync(ct);
} }
@@ -138,6 +144,7 @@ public class DbBulkMergeDestination : IImportDestination
var sql = $"IF OBJECT_ID('tempdb..{tempTableName}') IS NOT NULL DROP TABLE {tempTableName}"; var sql = $"IF OBJECT_ID('tempdb..{tempTableName}') IS NOT NULL DROP TABLE {tempTableName}";
await using var cmd = connection.CreateCommand(); await using var cmd = connection.CreateCommand();
cmd.CommandText = sql; cmd.CommandText = sql;
cmd.CommandTimeout = _commandTimeoutSeconds;
await cmd.ExecuteNonQueryAsync(); await cmd.ExecuteNonQueryAsync();
} }
catch catch
@@ -157,17 +164,20 @@ public class DbBulkMergeDestination : IImportDestination
using var bulkCopy = new SqlBulkCopy(connection) using var bulkCopy = new SqlBulkCopy(connection)
{ {
DestinationTableName = tempTableName, DestinationTableName = tempTableName,
BatchSize = batch.Rows.Count BatchSize = batch.Rows.Count,
BulkCopyTimeout = _commandTimeoutSeconds
}; };
await bulkCopy.WriteToServerAsync(batch, ct); await bulkCopy.WriteToServerAsync(batch, ct);
// Execute MERGE // Execute MERGE
await using var cmd = connection.CreateCommand(); await using var cmd = connection.CreateCommand();
cmd.CommandText = mergeSql; cmd.CommandText = mergeSql;
cmd.CommandTimeout = _commandTimeoutSeconds;
await cmd.ExecuteNonQueryAsync(ct); await cmd.ExecuteNonQueryAsync(ct);
// Truncate temp table for next batch // Truncate temp table for next batch
cmd.CommandText = $"TRUNCATE TABLE {tempTableName}"; cmd.CommandText = $"TRUNCATE TABLE {tempTableName}";
cmd.CommandTimeout = _commandTimeoutSeconds;
await cmd.ExecuteNonQueryAsync(ct); await cmd.ExecuteNonQueryAsync(ct);
} }
@@ -44,4 +44,32 @@ public class DbBulkImportDestinationTests
var dest = new DbBulkImportDestination(factory, "WorkOrder", batchSize: batchSize); var dest = new DbBulkImportDestination(factory, "WorkOrder", batchSize: batchSize);
Assert.NotNull(dest); Assert.NotNull(dest);
} }
[Fact]
public void Constructor_CustomTimeout_SetsTimeout()
{
// Arrange & Act
var factory = Substitute.For<IDbConnectionFactory>();
var dest = new DbBulkImportDestination(
factory,
"TestTable",
commandTimeoutSeconds: 1800);
// Assert - can't directly test private field, but constructor should accept it
Assert.NotNull(dest);
}
[Fact]
public void Constructor_ZeroTimeout_UsesDefault()
{
// Arrange & Act
var factory = Substitute.For<IDbConnectionFactory>();
var dest = new DbBulkImportDestination(
factory,
"TestTable",
commandTimeoutSeconds: 0);
// Assert
Assert.NotNull(dest);
}
} }
@@ -62,4 +62,34 @@ public class DbBulkMergeDestinationTests
updateColumns: new[] { "Status", "Description" }); updateColumns: new[] { "Status", "Description" });
Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName); Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName);
} }
[Fact]
public void Constructor_CustomTimeout_SetsTimeout()
{
// Arrange & Act
var factory = Substitute.For<IDbConnectionFactory>();
var dest = new DbBulkMergeDestination(
factory,
"TestTable",
new[] { "Id" },
commandTimeoutSeconds: 1800);
// Assert - can't directly test private field, but constructor should accept it
Assert.NotNull(dest);
}
[Fact]
public void Constructor_ZeroTimeout_UsesDefault()
{
// Arrange & Act
var factory = Substitute.For<IDbConnectionFactory>();
var dest = new DbBulkMergeDestination(
factory,
"TestTable",
new[] { "Id" },
commandTimeoutSeconds: 0);
// Assert
Assert.NotNull(dest);
}
} }