Files
jdescopingtool/DOCUMENTATION/DataSync/Destinations.md
T
2026-01-03 15:38:01 -05:00

9.1 KiB

Destinations and Scripts

Destinations consume data from the pipeline and write it to storage. Scripts run SQL operations before or after data transfer, commonly for index management.

IImportDestination Contract

public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}

Key requirements:

  • Consume the entire IDataReader in WriteAsync
  • Return DestinationResult with row count, batch count, and elapsed time
  • DestinationName is used in logging and StepResult tracking

DbBulkImportDestination

Full table refresh using TRUNCATE + bulk copy:

public class DbBulkImportDestination : IImportDestination
{
    private const int DefaultBatchSize = 10000;
    private const int DefaultCommandTimeoutSeconds = 600;

    public DbBulkImportDestination(
        IDbConnectionFactory connectionFactory,
        string tableName,
        int batchSize = 0,
        int commandTimeoutSeconds = 0)
    {
        _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
        _commandTimeoutSeconds = commandTimeoutSeconds > 0
            ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds;
    }

Column mapping

Queries destination schema and maps only matching columns. Extra source columns are ignored:

    var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken);

    using var bulkCopy = new SqlBulkCopy(connection)
    {
        DestinationTableName = qualifiedName,
        BatchSize = _batchSize,
        BulkCopyTimeout = _commandTimeoutSeconds,
        EnableStreaming = true
    };

    for (int i = 0; i < source.FieldCount; i++)
    {
        var columnName = source.GetName(i);
        if (destColumns.Contains(columnName))
        {
            bulkCopy.ColumnMappings.Add(columnName, columnName);
        }
    }

    if (bulkCopy.ColumnMappings.Count == 0)
        throw new InvalidOperationException(
            $"No columns from source exist in destination table '{_tableName}'.");

Destination column discovery

Uses INFORMATION_SCHEMA.COLUMNS with schema support:

    private async Task<HashSet<string>> GetDestinationColumnsAsync(
        SqlConnection connection, CancellationToken ct)
    {
        var (schema, table) = CommonScripts.ParseTableName(_tableName);
        var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
                    WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";
        var columns = await connection.QueryAsync<string>(
            new CommandDefinition(sql, new { tableName = table, schemaName = schema },
                commandTimeout: _commandTimeoutSeconds, cancellationToken: ct));
        return columns.ToHashSet(StringComparer.OrdinalIgnoreCase);
    }

DbBulkMergeDestination

Incremental updates using bulk copy to temp table + MERGE:

public class DbBulkMergeDestination : IImportDestination
{
    public DbBulkMergeDestination(
        IDbConnectionFactory connectionFactory,
        string tableName,
        string[] matchColumns,
        string[]? updateColumns = null,
        int batchSize = 0,
        int commandTimeoutSeconds = 0)
    {
        if (matchColumns.Length == 0)
            throw new ArgumentException("At least one match column is required.");
    }

Batch processing

Creates a temp table, bulk copies in batches, then merges each batch:

    var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}";
    await CreateTempTableAsync(connection, tempTableName, cancellationToken);

    while (source.Read())
    {
        // Buffer rows into DataTable
        if (batch.Rows.Count >= _batchSize)
        {
            await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, ct);
            totalRows += batch.Rows.Count;
            batch.Clear();
        }
    }

MERGE SQL generation

Generates MERGE statement with configurable match and update columns:

    private string BuildMergeSql(string tempTableName,
        IReadOnlyList<string> allColumns, IReadOnlyList<string> updateColumns)
    {
        var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName);
        var sb = new StringBuilder();
        sb.AppendLine($"MERGE INTO {qualifiedName} AS target");
        sb.AppendLine($"USING {tempTableName} AS source");
        sb.Append("ON ");
        sb.AppendLine(string.Join(" AND ",
            _matchColumns.Select(c => $"target.[{c}] = source.[{c}]")));

        if (updateColumns.Count > 0)
        {
            sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
            sb.AppendLine(string.Join(", ",
                updateColumns.Select(c => $"target.[{c}] = source.[{c}]")));
        }

        sb.AppendLine("WHEN NOT MATCHED THEN INSERT");
        sb.AppendLine($"({string.Join(", ", allColumns.Select(c => $"[{c}]"))})");
        sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.[{c}]"))});");

        return sb.ToString();
    }

Schema-Qualified Table Names

Both destinations support schema-qualified names via CommonScripts:

public static (string Schema, string Table) ParseTableName(string tableName)
{
    var cleaned = tableName.Replace("[", "").Replace("]", "");
    var parts = cleaned.Split('.', 2);
    return parts.Length == 2 ? (parts[0], parts[1]) : ("dbo", parts[0]);
}

public static string FormatQualifiedTableName(string tableName)
{
    var (schema, table) = ParseTableName(tableName);
    return $"[{schema}].[{table}]";
}

Supported formats: "Table", "dbo.Table", "[dbo].[Table]"

Script Patterns

IScriptRunner Contract

public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}

SqlScriptRunner Implementation

public class SqlScriptRunner : IScriptRunner
{
    public SqlScriptRunner(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int timeoutSeconds = 3600)
    {
        ScriptName = name ?? "SqlScript";
    }

    public async Task ExecuteAsync(CancellationToken cancellationToken = default)
    {
        await using var connection = await _connectionFactory
            .CreateLotFinderConnectionAsync(cancellationToken);
        await connection.ExecuteAsync(
            new CommandDefinition(_sql, _parameters,
                commandTimeout: _timeoutSeconds, cancellationToken: cancellationToken));
    }
}

Common Scripts

CommonScripts provides factory methods for index management:

public static IScriptRunner DisableIndexes(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 300)
{
    var (schema, table) = ParseTableName(tableName);
    var sql = @"
DECLARE @sql NVARCHAR(MAX) = '';
DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);

SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;'
FROM sys.indexes i
INNER JOIN sys.tables t ON i.object_id = t.object_id
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
WHERE t.name = @tableName AND s.name = @schemaName
  AND i.type = 2 AND i.is_disabled = 0;

IF LEN(@sql) > 0 EXEC sp_executesql @sql;";

    return new SqlScriptRunner(factory, sql, $"DisableIndexes:{schema}.{table}",
        parameters: new { tableName = table, schemaName = schema },
        timeoutSeconds: timeoutSeconds);
}

public static IScriptRunner RebuildIndexes(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 3600)
{
    // Similar pattern with ALTER INDEX ALL ... REBUILD
}

public static IScriptRunner UpdateStatistics(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 600)
{
    // UPDATE STATISTICS with QUOTENAME protection
}

SQL injection protection

All dynamic SQL uses QUOTENAME() for identifier escaping:

DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);

When to use scripts

Use pre/post scripts for large bulk loads where index overhead matters:

var pipeline = new EtlPipelineBuilder()
    .WithName("LargeTableSync")
    .WithSource(source)
    .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder"))
    .WithDestination(new DbBulkImportDestination(factory, "WorkOrder"))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder"))
    .WithPostScript(CommonScripts.UpdateStatistics(factory, "WorkOrder"))
    .Build();