diff --git a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs index 1274ff0..45b7548 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkImportDestination.cs @@ -14,10 +14,12 @@ namespace JdeScoping.DataSync.Etl.Destinations; public class DbBulkImportDestination : IImportDestination { private const int DefaultBatchSize = 10000; + private const int DefaultCommandTimeoutSeconds = 600; private readonly IDbConnectionFactory _connectionFactory; private readonly string _tableName; private readonly int _batchSize; + private readonly int _commandTimeoutSeconds; /// public string DestinationName => $"BulkImport:{_tableName}"; @@ -28,10 +30,12 @@ public class DbBulkImportDestination : IImportDestination /// Factory to create database connections. /// Name of the destination table. /// Number of rows per batch. 0 uses the default (10000). + /// Command timeout in seconds. 0 uses the default (600). public DbBulkImportDestination( IDbConnectionFactory connectionFactory, string tableName, - int batchSize = 0) + int batchSize = 0, + int commandTimeoutSeconds = 0) { ArgumentNullException.ThrowIfNull(connectionFactory); ArgumentException.ThrowIfNullOrWhiteSpace(tableName); @@ -39,6 +43,7 @@ public class DbBulkImportDestination : IImportDestination _connectionFactory = connectionFactory; _tableName = tableName; _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize; + _commandTimeoutSeconds = commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds; } /// @@ -58,6 +63,7 @@ public class DbBulkImportDestination : IImportDestination await using (var truncateCmd = connection.CreateCommand()) { truncateCmd.CommandText = $"TRUNCATE TABLE [{_tableName}]"; + truncateCmd.CommandTimeout = _commandTimeoutSeconds; await truncateCmd.ExecuteNonQueryAsync(cancellationToken); } @@ -66,7 +72,7 @@ public class DbBulkImportDestination : IImportDestination { DestinationTableName = $"[{_tableName}]", BatchSize = _batchSize, - BulkCopyTimeout = 3600, + BulkCopyTimeout = _commandTimeoutSeconds, EnableStreaming = true }; diff --git a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs index c4a0984..9c79f54 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs +++ b/NEW/src/JdeScoping.DataSync/Etl/Destinations/DbBulkMergeDestination.cs @@ -16,12 +16,14 @@ namespace JdeScoping.DataSync.Etl.Destinations; public class DbBulkMergeDestination : IImportDestination { private const int DefaultBatchSize = 10000; + private const int DefaultCommandTimeoutSeconds = 600; private readonly IDbConnectionFactory _connectionFactory; private readonly string _tableName; private readonly string[] _matchColumns; private readonly string[]? _updateColumns; private readonly int _batchSize; + private readonly int _commandTimeoutSeconds; /// public string DestinationName => $"BulkMerge:{_tableName}"; @@ -34,12 +36,14 @@ public class DbBulkMergeDestination : IImportDestination /// Columns to match on for determining existing rows (key columns). /// Columns to update when a row matches. If null, all non-match columns are updated. /// Number of rows per batch. 0 uses the default (10000). + /// Command timeout in seconds. 0 uses the default (600). public DbBulkMergeDestination( IDbConnectionFactory connectionFactory, string tableName, string[] matchColumns, string[]? updateColumns = null, - int batchSize = 0) + int batchSize = 0, + int commandTimeoutSeconds = 0) { ArgumentNullException.ThrowIfNull(connectionFactory); ArgumentException.ThrowIfNullOrWhiteSpace(tableName); @@ -52,6 +56,7 @@ public class DbBulkMergeDestination : IImportDestination _matchColumns = matchColumns; _updateColumns = updateColumns; _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize; + _commandTimeoutSeconds = commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds; } /// @@ -128,6 +133,7 @@ public class DbBulkMergeDestination : IImportDestination var sql = $"SELECT TOP 0 * INTO {tempTableName} FROM [{_tableName}]"; await using var cmd = connection.CreateCommand(); cmd.CommandText = sql; + cmd.CommandTimeout = _commandTimeoutSeconds; await cmd.ExecuteNonQueryAsync(ct); } @@ -138,6 +144,7 @@ public class DbBulkMergeDestination : IImportDestination var sql = $"IF OBJECT_ID('tempdb..{tempTableName}') IS NOT NULL DROP TABLE {tempTableName}"; await using var cmd = connection.CreateCommand(); cmd.CommandText = sql; + cmd.CommandTimeout = _commandTimeoutSeconds; await cmd.ExecuteNonQueryAsync(); } catch @@ -157,17 +164,20 @@ public class DbBulkMergeDestination : IImportDestination using var bulkCopy = new SqlBulkCopy(connection) { DestinationTableName = tempTableName, - BatchSize = batch.Rows.Count + BatchSize = batch.Rows.Count, + BulkCopyTimeout = _commandTimeoutSeconds }; await bulkCopy.WriteToServerAsync(batch, ct); // Execute MERGE await using var cmd = connection.CreateCommand(); cmd.CommandText = mergeSql; + cmd.CommandTimeout = _commandTimeoutSeconds; await cmd.ExecuteNonQueryAsync(ct); // Truncate temp table for next batch cmd.CommandText = $"TRUNCATE TABLE {tempTableName}"; + cmd.CommandTimeout = _commandTimeoutSeconds; await cmd.ExecuteNonQueryAsync(ct); } diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs index ab36f7b..b2514b0 100644 --- a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkImportDestinationTests.cs @@ -44,4 +44,32 @@ public class DbBulkImportDestinationTests var dest = new DbBulkImportDestination(factory, "WorkOrder", batchSize: batchSize); Assert.NotNull(dest); } + + [Fact] + public void Constructor_CustomTimeout_SetsTimeout() + { + // Arrange & Act + var factory = Substitute.For(); + var dest = new DbBulkImportDestination( + factory, + "TestTable", + commandTimeoutSeconds: 1800); + + // Assert - can't directly test private field, but constructor should accept it + Assert.NotNull(dest); + } + + [Fact] + public void Constructor_ZeroTimeout_UsesDefault() + { + // Arrange & Act + var factory = Substitute.For(); + var dest = new DbBulkImportDestination( + factory, + "TestTable", + commandTimeoutSeconds: 0); + + // Assert + Assert.NotNull(dest); + } } diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs index aea5813..03df554 100644 --- a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Destinations/DbBulkMergeDestinationTests.cs @@ -62,4 +62,34 @@ public class DbBulkMergeDestinationTests updateColumns: new[] { "Status", "Description" }); Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName); } + + [Fact] + public void Constructor_CustomTimeout_SetsTimeout() + { + // Arrange & Act + var factory = Substitute.For(); + var dest = new DbBulkMergeDestination( + factory, + "TestTable", + new[] { "Id" }, + commandTimeoutSeconds: 1800); + + // Assert - can't directly test private field, but constructor should accept it + Assert.NotNull(dest); + } + + [Fact] + public void Constructor_ZeroTimeout_UsesDefault() + { + // Arrange & Act + var factory = Substitute.For(); + var dest = new DbBulkMergeDestination( + factory, + "TestTable", + new[] { "Id" }, + commandTimeoutSeconds: 0); + + // Assert + Assert.NotNull(dest); + } }