diff --git a/DOCUMENTATION/DataSync/Destinations.md b/DOCUMENTATION/DataSync/Destinations.md new file mode 100644 index 0000000..4ae71b7 --- /dev/null +++ b/DOCUMENTATION/DataSync/Destinations.md @@ -0,0 +1,287 @@ +# Destinations and Scripts + +Destinations consume data from the pipeline and write it to storage. Scripts run SQL operations before or after data transfer, commonly for index management. + +## IImportDestination Contract + +```csharp +public interface IImportDestination +{ + Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default); + string DestinationName { get; } +} +``` + +**Key requirements:** +- Consume the entire `IDataReader` in `WriteAsync` +- Return `DestinationResult` with row count, batch count, and elapsed time +- `DestinationName` is used in logging and `StepResult` tracking + +## DbBulkImportDestination + +Full table refresh using TRUNCATE + bulk copy: + +```csharp +public class DbBulkImportDestination : IImportDestination +{ + private const int DefaultBatchSize = 10000; + private const int DefaultCommandTimeoutSeconds = 600; + + public DbBulkImportDestination( + IDbConnectionFactory connectionFactory, + string tableName, + int batchSize = 0, + int commandTimeoutSeconds = 0) + { + _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize; + _commandTimeoutSeconds = commandTimeoutSeconds > 0 + ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds; + } +``` + +### Column mapping + +Queries destination schema and maps only matching columns. 
Extra source columns are ignored: + +```csharp + var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken); + + using var bulkCopy = new SqlBulkCopy(connection) + { + DestinationTableName = qualifiedName, + BatchSize = _batchSize, + BulkCopyTimeout = _commandTimeoutSeconds, + EnableStreaming = true + }; + + for (int i = 0; i < source.FieldCount; i++) + { + var columnName = source.GetName(i); + if (destColumns.Contains(columnName)) + { + bulkCopy.ColumnMappings.Add(columnName, columnName); + } + } + + if (bulkCopy.ColumnMappings.Count == 0) + throw new InvalidOperationException( + $"No columns from source exist in destination table '{_tableName}'."); +``` + +### Destination column discovery + +Uses `INFORMATION_SCHEMA.COLUMNS` with schema support: + +```csharp + private async Task<HashSet<string>> GetDestinationColumnsAsync( + SqlConnection connection, CancellationToken ct) + { + var (schema, table) = CommonScripts.ParseTableName(_tableName); + var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName"; + var columns = await connection.QueryAsync<string>( + new CommandDefinition(sql, new { tableName = table, schemaName = schema }, + commandTimeout: _commandTimeoutSeconds, cancellationToken: ct)); + return columns.ToHashSet(StringComparer.OrdinalIgnoreCase); + } +``` + +## DbBulkMergeDestination + +Incremental updates using bulk copy to temp table + MERGE: + +```csharp +public class DbBulkMergeDestination : IImportDestination +{ + public DbBulkMergeDestination( + IDbConnectionFactory connectionFactory, + string tableName, + string[] matchColumns, + string[]? 
updateColumns = null, + int batchSize = 0, + int commandTimeoutSeconds = 0) + { + if (matchColumns.Length == 0) + throw new ArgumentException("At least one match column is required."); + } +``` + +### Batch processing + +Creates a temp table, bulk copies in batches, then merges each batch: + +```csharp + var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}"; + await CreateTempTableAsync(connection, tempTableName, cancellationToken); + + while (source.Read()) + { + // Buffer rows into DataTable + if (batch.Rows.Count >= _batchSize) + { + await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, ct); + totalRows += batch.Rows.Count; + batch.Clear(); + } + } +``` + +### MERGE SQL generation + +Generates a MERGE statement with configurable match and update columns: + +```csharp + private string BuildMergeSql(string tempTableName, + IReadOnlyList<string> allColumns, IReadOnlyList<string> updateColumns) + { + var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName); + var sb = new StringBuilder(); + sb.AppendLine($"MERGE INTO {qualifiedName} AS target"); + sb.AppendLine($"USING {tempTableName} AS source"); + sb.Append("ON "); + sb.AppendLine(string.Join(" AND ", + _matchColumns.Select(c => $"target.[{c}] = source.[{c}]"))); + + if (updateColumns.Count > 0) + { + sb.AppendLine("WHEN MATCHED THEN UPDATE SET"); + sb.AppendLine(string.Join(", ", + updateColumns.Select(c => $"target.[{c}] = source.[{c}]"))); + } + + sb.AppendLine("WHEN NOT MATCHED THEN INSERT"); + sb.AppendLine($"({string.Join(", ", allColumns.Select(c => $"[{c}]"))})"); + sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.[{c}]"))});"); + + return sb.ToString(); + } +``` + +## Schema-Qualified Table Names + +Both destinations support schema-qualified names via `CommonScripts`: + +```csharp +public static (string Schema, string Table) ParseTableName(string tableName) +{ + var cleaned = tableName.Replace("[", "").Replace("]", 
""); + var parts = cleaned.Split('.', 2); + return parts.Length == 2 ? (parts[0], parts[1]) : ("dbo", parts[0]); +} + +public static string FormatQualifiedTableName(string tableName) +{ + var (schema, table) = ParseTableName(tableName); + return $"[{schema}].[{table}]"; +} +``` + +Supported formats: `"Table"`, `"dbo.Table"`, `"[dbo].[Table]"` + +## Script Patterns + +### IScriptRunner Contract + +```csharp +public interface IScriptRunner +{ + Task ExecuteAsync(CancellationToken cancellationToken = default); + string ScriptName { get; } +} +``` + +### SqlScriptRunner Implementation + +```csharp +public class SqlScriptRunner : IScriptRunner +{ + public SqlScriptRunner( + IDbConnectionFactory connectionFactory, + string sql, + string? name = null, + object? parameters = null, + int timeoutSeconds = 3600) + { + ScriptName = name ?? "SqlScript"; + } + + public async Task ExecuteAsync(CancellationToken cancellationToken = default) + { + await using var connection = await _connectionFactory + .CreateLotFinderConnectionAsync(cancellationToken); + await connection.ExecuteAsync( + new CommandDefinition(_sql, _parameters, + commandTimeout: _timeoutSeconds, cancellationToken: cancellationToken)); + } +} +``` + +### Common Scripts + +`CommonScripts` provides factory methods for index management: + +```csharp +public static IScriptRunner DisableIndexes( + IDbConnectionFactory factory, string tableName, int timeoutSeconds = 300) +{ + var (schema, table) = ParseTableName(tableName); + var sql = @" +DECLARE @sql NVARCHAR(MAX) = ''; +DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' 
+ QUOTENAME(@tableName); + +SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;' +FROM sys.indexes i +INNER JOIN sys.tables t ON i.object_id = t.object_id +INNER JOIN sys.schemas s ON t.schema_id = s.schema_id +WHERE t.name = @tableName AND s.name = @schemaName + AND i.type = 2 AND i.is_disabled = 0; + +IF LEN(@sql) > 0 EXEC sp_executesql @sql;"; + + return new SqlScriptRunner(factory, sql, $"DisableIndexes:{schema}.{table}", + parameters: new { tableName = table, schemaName = schema }, + timeoutSeconds: timeoutSeconds); +} + +public static IScriptRunner RebuildIndexes( + IDbConnectionFactory factory, string tableName, int timeoutSeconds = 3600) +{ + // Similar pattern with ALTER INDEX ALL ... REBUILD +} + +public static IScriptRunner UpdateStatistics( + IDbConnectionFactory factory, string tableName, int timeoutSeconds = 600) +{ + // UPDATE STATISTICS with QUOTENAME protection +} +``` + +### SQL injection protection + +All dynamic SQL uses `QUOTENAME()` for identifier escaping: + +```sql +DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName); +``` + +### When to use scripts + +Use pre/post scripts for large bulk loads where index overhead matters: + +```csharp +var pipeline = new EtlPipelineBuilder() + .WithName("LargeTableSync") + .WithSource(source) + .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder")) + .WithDestination(new DbBulkImportDestination(factory, "WorkOrder")) + .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder")) + .WithPostScript(CommonScripts.UpdateStatistics(factory, "WorkOrder")) + .Build(); +``` + +## Related Documentation + +- [Overview](./Overview.md) - Pipeline architecture +- [Transformers](./Transformers.md) - Data transformation +- [Configuration](./Configuration.md) - Timeout and batch size options +- [Troubleshooting](./Troubleshooting.md) - Performance tuning