9.1 KiB
Destinations and Scripts
Destinations consume data from the pipeline and write it to storage. Scripts run SQL operations before or after data transfer, commonly for index management.
IImportDestination Contract
/// <summary>
/// A pipeline sink: consumes rows produced by the pipeline and writes them to storage.
/// </summary>
public interface IImportDestination
{
    /// <summary>
    /// Name of this destination, used in logging and StepResult tracking.
    /// </summary>
    string DestinationName { get; }

    /// <summary>
    /// Writes the rows supplied by <paramref name="source"/> to the destination.
    /// Implementations are expected to consume the reader in full.
    /// </summary>
    /// <param name="source">Reader supplying the rows to write.</param>
    /// <param name="cancellationToken">Token used to cancel the write.</param>
    /// <returns>A DestinationResult carrying row count, batch count, and elapsed time.</returns>
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
}
Key requirements:
- Consume the entire IDataReader in WriteAsync
- Return DestinationResult with row count, batch count, and elapsed time
- DestinationName is used in logging and StepResult tracking
DbBulkImportDestination
Full table refresh using TRUNCATE + bulk copy:
public class DbBulkImportDestination : IImportDestination
{
private const int DefaultBatchSize = 10000;
private const int DefaultCommandTimeoutSeconds = 600;
/// <summary>
/// Creates a bulk-import destination for <paramref name="tableName"/>.
/// </summary>
/// <param name="connectionFactory">Factory used to open the destination connection.</param>
/// <param name="tableName">Destination table name.</param>
/// <param name="batchSize">Rows per bulk-copy batch; zero or negative selects DefaultBatchSize.</param>
/// <param name="commandTimeoutSeconds">
/// Command timeout in seconds; zero or negative selects DefaultCommandTimeoutSeconds.
/// </param>
public DbBulkImportDestination(
    IDbConnectionFactory connectionFactory,
    string tableName,
    int batchSize = 0,
    int commandTimeoutSeconds = 0)
{
    // NOTE(review): this excerpt does not show connectionFactory / tableName being
    // stored; presumably elided from the documentation snippet.

    // A non-positive value means "not specified", so the class default applies.
    _batchSize = batchSize <= 0 ? DefaultBatchSize : batchSize;
    _commandTimeoutSeconds = commandTimeoutSeconds <= 0
        ? DefaultCommandTimeoutSeconds
        : commandTimeoutSeconds;
}
Column mapping
Queries the destination schema and maps only source columns whose names match a destination column (the comparison is case-insensitive). Extra source columns are ignored:
// Column names that actually exist on the destination table.
var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken);

using var bulkCopy = new SqlBulkCopy(connection)
{
    DestinationTableName = qualifiedName,
    BatchSize = _batchSize,
    BulkCopyTimeout = _commandTimeoutSeconds,
    EnableStreaming = true
};

// Map each source column onto the same-named destination column; skip the rest.
for (var ordinal = 0; ordinal < source.FieldCount; ordinal++)
{
    var sourceColumn = source.GetName(ordinal);
    if (!destColumns.Contains(sourceColumn))
    {
        continue;
    }

    bulkCopy.ColumnMappings.Add(sourceColumn, sourceColumn);
}

// With no overlap at all there is nothing to copy, so fail loudly.
var nothingMapped = bulkCopy.ColumnMappings.Count == 0;
if (nothingMapped)
{
    throw new InvalidOperationException(
        $"No columns from source exist in destination table '{_tableName}'.");
}
Destination column discovery
Uses INFORMATION_SCHEMA.COLUMNS with schema support:
/// <summary>
/// Reads the column names of the destination table from INFORMATION_SCHEMA.COLUMNS,
/// filtered by both table name and schema.
/// </summary>
/// <param name="connection">Open connection to the destination database.</param>
/// <param name="ct">Token used to cancel the query.</param>
/// <returns>The destination column names in a case-insensitive set.</returns>
private async Task<HashSet<string>> GetDestinationColumnsAsync(
    SqlConnection connection, CancellationToken ct)
{
    const string columnQuery = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";

    var target = CommonScripts.ParseTableName(_tableName);
    var command = new CommandDefinition(
        columnQuery,
        new { tableName = target.Table, schemaName = target.Schema },
        commandTimeout: _commandTimeoutSeconds,
        cancellationToken: ct);

    var names = await connection.QueryAsync<string>(command);
    return new HashSet<string>(names, StringComparer.OrdinalIgnoreCase);
}
DbBulkMergeDestination
Incremental updates using bulk copy to temp table + MERGE:
public class DbBulkMergeDestination : IImportDestination
{
/// <summary>
/// Creates a merge destination for <paramref name="tableName"/>.
/// </summary>
/// <param name="connectionFactory">Factory used to open the destination connection.</param>
/// <param name="tableName">Destination table name.</param>
/// <param name="matchColumns">Columns used to match rows; at least one is required.</param>
/// <param name="updateColumns">Optional list of columns to update on a match.</param>
/// <param name="batchSize">Rows per batch.</param>
/// <param name="commandTimeoutSeconds">Command timeout in seconds.</param>
/// <exception cref="ArgumentNullException"><paramref name="matchColumns"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="matchColumns"/> is empty.</exception>
public DbBulkMergeDestination(
    IDbConnectionFactory connectionFactory,
    string tableName,
    string[] matchColumns,
    string[]? updateColumns = null,
    int batchSize = 0,
    int commandTimeoutSeconds = 0)
{
    // Reject null explicitly so callers get an argument error rather than a
    // NullReferenceException from the Length check below.
    if (matchColumns is null)
    {
        throw new ArgumentNullException(nameof(matchColumns));
    }

    if (matchColumns.Length == 0)
    {
        throw new ArgumentException("At least one match column is required.", nameof(matchColumns));
    }
}
Batch processing
Creates a temp table, bulk copies in batches, then merges each batch:
// Temp table name derived from the destination name: dots become underscores
// and square brackets are dropped.
var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}";
await CreateTempTableAsync(connection, tempTableName, cancellationToken);
// Read the source row by row, flushing each time a full batch has accumulated.
while (source.Read())
{
// Buffer rows into DataTable
if (batch.Rows.Count >= _batchSize)
{
// NOTE(review): this call passes `ct` while CreateTempTableAsync above passes
// `cancellationToken` — confirm which name is in scope in the full method.
await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, ct);
totalRows += batch.Rows.Count;
batch.Clear();
}
}
// NOTE(review): rows still held in `batch` when the loop ends are not processed
// in this excerpt — confirm the full method flushes the final partial batch.
MERGE SQL generation
Generates MERGE statement with configurable match and update columns:
/// <summary>
/// Builds the MERGE statement that upserts rows from the temp table into the
/// destination table.
/// </summary>
/// <param name="tempTableName">Name of the temp table holding the current batch.</param>
/// <param name="allColumns">Columns inserted when a source row has no match.</param>
/// <param name="updateColumns">
/// Columns updated when a row matches; when empty, no WHEN MATCHED clause is emitted.
/// </param>
/// <returns>The complete MERGE statement text.</returns>
private string BuildMergeSql(string tempTableName,
    IReadOnlyList<string> allColumns, IReadOnlyList<string> updateColumns)
{
    // Bracket-delimit an identifier, doubling any ']' so a column name cannot
    // close the delimiter early and alter the statement.
    string Quote(string identifier) => "[" + identifier.Replace("]", "]]") + "]";

    var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName);
    var sb = new StringBuilder();

    sb.AppendLine($"MERGE INTO {qualifiedName} AS target");
    sb.AppendLine($"USING {tempTableName} AS source");

    // Rows are paired on every match column.
    sb.Append("ON ");
    sb.AppendLine(string.Join(" AND ",
        _matchColumns.Select(c => $"target.{Quote(c)} = source.{Quote(c)}")));

    if (updateColumns.Count > 0)
    {
        sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
        sb.AppendLine(string.Join(", ",
            updateColumns.Select(c => $"target.{Quote(c)} = source.{Quote(c)}")));
    }

    sb.AppendLine("WHEN NOT MATCHED THEN INSERT");
    sb.AppendLine($"({string.Join(", ", allColumns.Select(Quote))})");
    sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.{Quote(c)}"))});");

    return sb.ToString();
}
Schema-Qualified Table Names
Both destinations support schema-qualified names via CommonScripts:
/// <summary>
/// Splits a table name into schema and table parts. Square brackets are removed
/// first; the name is then split at the first dot. A name without a dot is
/// placed in the "dbo" schema.
/// </summary>
/// <param name="tableName">Name such as "Table", "dbo.Table" or "[dbo].[Table]".</param>
/// <returns>The schema and table parts, without brackets.</returns>
public static (string Schema, string Table) ParseTableName(string tableName)
{
    var bare = tableName.Replace("[", "").Replace("]", "");
    var firstDot = bare.IndexOf('.');

    if (firstDot < 0)
    {
        return ("dbo", bare);
    }

    // Everything after the first dot belongs to the table part.
    return (bare.Substring(0, firstDot), bare.Substring(firstDot + 1));
}
/// <summary>
/// Returns <paramref name="tableName"/> as a bracketed two-part name of the
/// form [schema].[table], using ParseTableName to obtain the parts.
/// </summary>
/// <param name="tableName">Table name in any format ParseTableName accepts.</param>
/// <returns>The bracket-delimited, schema-qualified name.</returns>
public static string FormatQualifiedTableName(string tableName)
{
    var parsed = ParseTableName(tableName);
    return "[" + parsed.Schema + "].[" + parsed.Table + "]";
}
Supported formats: "Table", "dbo.Table", "[dbo].[Table]"
Script Patterns
IScriptRunner Contract
/// <summary>
/// A script that the pipeline can run before or after data transfer.
/// </summary>
public interface IScriptRunner
{
    /// <summary>Name identifying this script.</summary>
    string ScriptName { get; }

    /// <summary>Runs the script.</summary>
    /// <param name="cancellationToken">Token used to cancel execution.</param>
    Task ExecuteAsync(CancellationToken cancellationToken = default);
}
SqlScriptRunner Implementation
/// <summary>
/// Script runner that executes one SQL command, with optional parameters,
/// on a connection obtained from the connection factory.
/// </summary>
public class SqlScriptRunner : IScriptRunner
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _timeoutSeconds;

    /// <summary>Creates a runner for <paramref name="sql"/>.</summary>
    /// <param name="connectionFactory">Factory used to open the connection.</param>
    /// <param name="sql">SQL text to execute.</param>
    /// <param name="name">Script name; defaults to "SqlScript" when null.</param>
    /// <param name="parameters">Optional parameter object passed with the command.</param>
    /// <param name="timeoutSeconds">Command timeout in seconds.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connectionFactory"/> or <paramref name="sql"/> is null.
    /// </exception>
    public SqlScriptRunner(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int timeoutSeconds = 3600)
    {
        // Store every argument ExecuteAsync reads; failing here is clearer than
        // a null dereference when the script is eventually run.
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _sql = sql ?? throw new ArgumentNullException(nameof(sql));
        _parameters = parameters;
        _timeoutSeconds = timeoutSeconds;
        ScriptName = name ?? "SqlScript";
    }

    /// <summary>Name identifying this script.</summary>
    public string ScriptName { get; }

    /// <summary>
    /// Opens a connection and executes the SQL with the configured parameters and timeout.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel connection creation and execution.</param>
    public async Task ExecuteAsync(CancellationToken cancellationToken = default)
    {
        await using var connection = await _connectionFactory
            .CreateLotFinderConnectionAsync(cancellationToken);

        await connection.ExecuteAsync(
            new CommandDefinition(_sql, _parameters,
                commandTimeout: _timeoutSeconds, cancellationToken: cancellationToken));
    }
}
Common Scripts
CommonScripts provides factory methods for index management:
/// <summary>
/// Creates a script that disables indexes on the given table. The script selects
/// rows from sys.indexes with type = 2 that are not already disabled, builds one
/// ALTER INDEX ... DISABLE statement per index, and executes the batch if any were found.
/// </summary>
/// <param name="factory">Connection factory handed to the script runner.</param>
/// <param name="tableName">Table name in any format ParseTableName accepts.</param>
/// <param name="timeoutSeconds">Script timeout in seconds.</param>
/// <returns>A runner named "DisableIndexes:{schema}.{table}".</returns>
public static IScriptRunner DisableIndexes(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 300)
{
    // Schema and table arrive as parameters; identifiers are escaped with QUOTENAME
    // inside the script before being concatenated into the dynamic statement.
    // NOTE(review): QUOTENAME can return more characters than NVARCHAR(256) holds
    // for two long names combined — confirm whether @fullTableName should be wider.
    const string script = @"
DECLARE @sql NVARCHAR(MAX) = '';
DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);
SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;'
FROM sys.indexes i
INNER JOIN sys.tables t ON i.object_id = t.object_id
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
WHERE t.name = @tableName AND s.name = @schemaName
AND i.type = 2 AND i.is_disabled = 0;
IF LEN(@sql) > 0 EXEC sp_executesql @sql;";

    var target = ParseTableName(tableName);
    var scriptName = $"DisableIndexes:{target.Schema}.{target.Table}";
    var scriptParameters = new { tableName = target.Table, schemaName = target.Schema };

    return new SqlScriptRunner(factory, script, scriptName,
        parameters: scriptParameters,
        timeoutSeconds: timeoutSeconds);
}
/// <summary>
/// Creates a script that rebuilds the indexes on the given table.
/// The body is elided in this excerpt; see the inline note for the statement it uses.
/// </summary>
/// <param name="factory">Connection factory for the script.</param>
/// <param name="tableName">Name of the table whose indexes are rebuilt.</param>
/// <param name="timeoutSeconds">Script timeout in seconds; defaults to 3600.</param>
public static IScriptRunner RebuildIndexes(
IDbConnectionFactory factory, string tableName, int timeoutSeconds = 3600)
{
// Similar pattern with ALTER INDEX ALL ... REBUILD
}
/// <summary>
/// Creates a script that updates statistics on the given table.
/// The body is elided in this excerpt; see the inline note for the statement it uses.
/// </summary>
/// <param name="factory">Connection factory for the script.</param>
/// <param name="tableName">Name of the table whose statistics are updated.</param>
/// <param name="timeoutSeconds">Script timeout in seconds; defaults to 600.</param>
public static IScriptRunner UpdateStatistics(
IDbConnectionFactory factory, string tableName, int timeoutSeconds = 600)
{
// UPDATE STATISTICS with QUOTENAME protection
}
SQL injection protection
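Dynamic SQL assembled inside the scripts escapes identifiers with QUOTENAME(); schema and table names are passed as parameters rather than concatenated into the script text: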
DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);
When to use scripts
Use pre/post scripts for large bulk loads where index overhead matters:
// Every step targets the same table, so name it once.
const string targetTable = "WorkOrder";

// Disable indexes before the load; rebuild them and update statistics afterwards.
var pipeline = new EtlPipelineBuilder()
    .WithName("LargeTableSync")
    .WithSource(source)
    .WithPreScript(CommonScripts.DisableIndexes(factory, targetTable))
    .WithDestination(new DbBulkImportDestination(factory, targetTable))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, targetTable))
    .WithPostScript(CommonScripts.UpdateStatistics(factory, targetTable))
    .Build();
Related Documentation
- Overview - Pipeline architecture
- Transformers - Data transformation
- Configuration - Timeout and batch size options
- Troubleshooting - Performance tuning