feat(datasync): extend DbBulkMergeDestination with excludeFromUpdate and updateCondition
This commit is contained in:
@@ -24,6 +24,8 @@ public class DbBulkMergeDestination : IImportDestination
|
|||||||
private readonly string _tableName;
|
private readonly string _tableName;
|
||||||
private readonly string[] _matchColumns;
|
private readonly string[] _matchColumns;
|
||||||
private readonly string[]? _updateColumns;
|
private readonly string[]? _updateColumns;
|
||||||
|
private readonly string[]? _excludeFromUpdate;
|
||||||
|
private readonly string? _updateCondition;
|
||||||
private readonly int _batchSize;
|
private readonly int _batchSize;
|
||||||
private readonly int _commandTimeoutSeconds;
|
private readonly int _commandTimeoutSeconds;
|
||||||
|
|
||||||
@@ -37,6 +39,8 @@ public class DbBulkMergeDestination : IImportDestination
|
|||||||
/// <param name="tableName">Name of the destination table.</param>
|
/// <param name="tableName">Name of the destination table.</param>
|
||||||
/// <param name="matchColumns">Columns to match on for determining existing rows (key columns).</param>
|
/// <param name="matchColumns">Columns to match on for determining existing rows (key columns).</param>
|
||||||
/// <param name="updateColumns">Columns to update when a row matches. If null, all non-match columns are updated.</param>
|
/// <param name="updateColumns">Columns to update when a row matches. If null, all non-match columns are updated.</param>
|
||||||
|
/// <param name="excludeFromUpdate">Columns to exclude from the UPDATE clause. Takes precedence over updateColumns.</param>
|
||||||
|
/// <param name="updateCondition">Optional SQL condition to add to the WHEN MATCHED clause (e.g., "source.LastUpdate > target.LastUpdate").</param>
|
||||||
/// <param name="batchSize">Number of rows per batch. 0 uses the default (10000).</param>
|
/// <param name="batchSize">Number of rows per batch. 0 uses the default (10000).</param>
|
||||||
/// <param name="commandTimeoutSeconds">Command timeout in seconds. 0 uses the default (600).</param>
|
/// <param name="commandTimeoutSeconds">Command timeout in seconds. 0 uses the default (600).</param>
|
||||||
public DbBulkMergeDestination(
|
public DbBulkMergeDestination(
|
||||||
@@ -44,6 +48,8 @@ public class DbBulkMergeDestination : IImportDestination
|
|||||||
string tableName,
|
string tableName,
|
||||||
string[] matchColumns,
|
string[] matchColumns,
|
||||||
string[]? updateColumns = null,
|
string[]? updateColumns = null,
|
||||||
|
string[]? excludeFromUpdate = null,
|
||||||
|
string? updateCondition = null,
|
||||||
int batchSize = 0,
|
int batchSize = 0,
|
||||||
int commandTimeoutSeconds = 0)
|
int commandTimeoutSeconds = 0)
|
||||||
{
|
{
|
||||||
@@ -57,6 +63,8 @@ public class DbBulkMergeDestination : IImportDestination
|
|||||||
_tableName = tableName;
|
_tableName = tableName;
|
||||||
_matchColumns = matchColumns;
|
_matchColumns = matchColumns;
|
||||||
_updateColumns = updateColumns;
|
_updateColumns = updateColumns;
|
||||||
|
_excludeFromUpdate = excludeFromUpdate;
|
||||||
|
_updateCondition = updateCondition;
|
||||||
_batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
|
_batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
|
||||||
_commandTimeoutSeconds = commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds;
|
_commandTimeoutSeconds = commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds;
|
||||||
}
|
}
|
||||||
@@ -101,8 +109,12 @@ public class DbBulkMergeDestination : IImportDestination
|
|||||||
|
|
||||||
// Determine update columns (all non-match columns if not specified), filtered to destColumns
|
// Determine update columns (all non-match columns if not specified), filtered to destColumns
|
||||||
var matchSet = new HashSet<string>(_matchColumns, StringComparer.OrdinalIgnoreCase);
|
var matchSet = new HashSet<string>(_matchColumns, StringComparer.OrdinalIgnoreCase);
|
||||||
|
var excludeSet = _excludeFromUpdate != null
|
||||||
|
? new HashSet<string>(_excludeFromUpdate, StringComparer.OrdinalIgnoreCase)
|
||||||
|
: new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
var updateCols = (_updateColumns ?? allColumns.Where(c => !matchSet.Contains(c)).ToArray())
|
var updateCols = (_updateColumns ?? allColumns.Where(c => !matchSet.Contains(c)).ToArray())
|
||||||
.Where(c => destColumns.Contains(c))
|
.Where(c => destColumns.Contains(c) && !excludeSet.Contains(c))
|
||||||
.ToArray();
|
.ToArray();
|
||||||
|
|
||||||
// Build MERGE SQL
|
// Build MERGE SQL
|
||||||
@@ -221,7 +233,14 @@ public class DbBulkMergeDestination : IImportDestination
|
|||||||
|
|
||||||
if (updateColumns.Count > 0)
|
if (updateColumns.Count > 0)
|
||||||
{
|
{
|
||||||
sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
|
if (!string.IsNullOrWhiteSpace(_updateCondition))
|
||||||
|
{
|
||||||
|
sb.AppendLine($"WHEN MATCHED AND {_updateCondition} THEN UPDATE SET");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
|
||||||
|
}
|
||||||
sb.AppendLine(string.Join(", ", updateColumns.Select(c => $"target.[{c}] = source.[{c}]")));
|
sb.AppendLine(string.Join(", ", updateColumns.Select(c => $"target.[{c}] = source.[{c}]")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -115,4 +115,189 @@ public class DbBulkMergeDestinationTests
|
|||||||
// Assert
|
// Assert
|
||||||
Assert.NotNull(dest);
|
Assert.NotNull(dest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Constructor_WithExcludeFromUpdate_Succeeds()
|
||||||
|
{
|
||||||
|
// Arrange & Act
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
excludeFromUpdate: new[] { "CreatedDate", "CreatedBy" });
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Constructor_WithUpdateCondition_Succeeds()
|
||||||
|
{
|
||||||
|
// Arrange & Act
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
updateCondition: "source.LastUpdate > target.LastUpdate");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Constructor_WithAllNewParameters_Succeeds()
|
||||||
|
{
|
||||||
|
// Arrange & Act
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
updateColumns: new[] { "Status", "Description" },
|
||||||
|
excludeFromUpdate: new[] { "CreatedDate" },
|
||||||
|
updateCondition: "source.LastUpdate > target.LastUpdate",
|
||||||
|
batchSize: 5000,
|
||||||
|
commandTimeoutSeconds: 300);
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
Assert.Equal("BulkMerge:WorkOrder", dest.DestinationName);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Constructor_WithNullExcludeFromUpdate_Succeeds()
|
||||||
|
{
|
||||||
|
// Arrange & Act
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
updateColumns: null,
|
||||||
|
excludeFromUpdate: null,
|
||||||
|
updateCondition: null);
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Constructor_WithEmptyExcludeFromUpdate_Succeeds()
|
||||||
|
{
|
||||||
|
// Arrange & Act
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
excludeFromUpdate: Array.Empty<string>());
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Constructor_WithEmptyUpdateCondition_Succeeds()
|
||||||
|
{
|
||||||
|
// Arrange & Act
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
updateCondition: "");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Constructor_WithWhitespaceUpdateCondition_Succeeds()
|
||||||
|
{
|
||||||
|
// Arrange & Act
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
updateCondition: " ");
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Documents that excludeFromUpdate takes precedence over updateColumns.
|
||||||
|
/// If a column is in both updateColumns and excludeFromUpdate, it should be excluded.
|
||||||
|
/// This is verified during actual MERGE SQL generation (integration test concept).
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public void Constructor_WithConflictingUpdateAndExclude_Succeeds()
|
||||||
|
{
|
||||||
|
// Arrange & Act - excludeFromUpdate takes precedence over updateColumns
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
updateColumns: new[] { "Status", "CreatedDate" },
|
||||||
|
excludeFromUpdate: new[] { "CreatedDate" });
|
||||||
|
|
||||||
|
// Assert - construction succeeds; actual exclusion behavior is in MERGE SQL generation
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Documents that updateCondition is applied to the WHEN MATCHED clause.
|
||||||
|
/// This adds conditional update logic like "source.LastUpdate > target.LastUpdate"
|
||||||
|
/// to prevent stale data from overwriting newer data.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public void UpdateCondition_DocumentsUsage_IntegrationTestConcept()
|
||||||
|
{
|
||||||
|
// The updateCondition parameter adds a condition to the MERGE statement's WHEN MATCHED clause.
|
||||||
|
// Example: WHEN MATCHED AND source.LastUpdate > target.LastUpdate THEN UPDATE SET ...
|
||||||
|
//
|
||||||
|
// This is useful for:
|
||||||
|
// - Preventing stale data from overwriting newer data
|
||||||
|
// - Only updating rows that have actually changed
|
||||||
|
// - Implementing optimistic concurrency patterns
|
||||||
|
//
|
||||||
|
// Full verification requires an integration test with a real database.
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
updateCondition: "source.LastUpdate > target.LastUpdate");
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Documents that excludeFromUpdate removes columns from the UPDATE SET clause.
|
||||||
|
/// This is useful for columns that should only be set on INSERT (e.g., CreatedDate).
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public void ExcludeFromUpdate_DocumentsUsage_IntegrationTestConcept()
|
||||||
|
{
|
||||||
|
// The excludeFromUpdate parameter removes specified columns from the UPDATE SET clause.
|
||||||
|
// These columns are still included in the INSERT clause for new rows.
|
||||||
|
//
|
||||||
|
// Common use cases:
|
||||||
|
// - CreatedDate, CreatedBy - set only on insert, never updated
|
||||||
|
// - Audit columns that should preserve original values
|
||||||
|
//
|
||||||
|
// Full verification requires an integration test with a real database.
|
||||||
|
var factory = Substitute.For<IDbConnectionFactory>();
|
||||||
|
var dest = new DbBulkMergeDestination(
|
||||||
|
factory,
|
||||||
|
"WorkOrder",
|
||||||
|
new[] { "OrderNumber" },
|
||||||
|
excludeFromUpdate: new[] { "CreatedDate", "CreatedBy" });
|
||||||
|
Assert.NotNull(dest);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user