# Destinations and Scripts
|
|
|
|
Destinations consume data from the pipeline and write it to storage. Scripts run SQL operations before or after data transfer, commonly for index management.
|
|
|
|
## IImportDestination Contract
|
|
|
|
```csharp
|
|
/// <summary>
/// A pipeline sink that drains an <see cref="IDataReader"/> and writes its rows to storage.
/// </summary>
public interface IImportDestination
{
    /// <summary>
    /// Name of this destination, used in logging and <c>StepResult</c> tracking.
    /// </summary>
    string DestinationName { get; }

    /// <summary>
    /// Writes every row of <paramref name="source"/> to the destination.
    /// </summary>
    /// <param name="source">Reader to consume; implementations must read it to the end.</param>
    /// <param name="cancellationToken">Token used to cancel the write.</param>
    /// <returns>
    /// A <see cref="DestinationResult"/> carrying the row count, batch count and elapsed time.
    /// </returns>
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
}
|
|
```
|
|
|
|
**Key requirements:**
|
|
- Consume the entire `IDataReader` in `WriteAsync`
|
|
- Return `DestinationResult` with row count, batch count, and elapsed time
|
|
- `DestinationName` is used in logging and `StepResult` tracking
|
|
|
|
## DbBulkImportDestination
|
|
|
|
Full table refresh using TRUNCATE + bulk copy:
|
|
|
|
```csharp
|
|
public class DbBulkImportDestination : IImportDestination
|
|
{
|
|
private const int DefaultBatchSize = 10000;
private const int DefaultCommandTimeoutSeconds = 600;

private readonly IDbConnectionFactory _connectionFactory;
private readonly string _tableName;
private readonly int _batchSize;
private readonly int _commandTimeoutSeconds;

/// <summary>
/// Creates a destination that performs a full refresh of <paramref name="tableName"/>
/// (TRUNCATE followed by a bulk copy of the source rows).
/// </summary>
/// <param name="connectionFactory">Factory used to open the destination connection.</param>
/// <param name="tableName">
/// Target table, e.g. <c>Table</c>, <c>dbo.Table</c> or <c>[dbo].[Table]</c>.
/// </param>
/// <param name="batchSize">
/// Rows per bulk-copy batch; values &lt;= 0 fall back to <see cref="DefaultBatchSize"/>.
/// </param>
/// <param name="commandTimeoutSeconds">
/// Timeout for the bulk copy and the destination-schema query; values &lt;= 0 fall back to
/// <see cref="DefaultCommandTimeoutSeconds"/>.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="connectionFactory"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="tableName"/> is null, empty or whitespace.</exception>
public DbBulkImportDestination(
    IDbConnectionFactory connectionFactory,
    string tableName,
    int batchSize = 0,
    int commandTimeoutSeconds = 0)
{
    _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));

    // Validate up front: a blank name would otherwise only fail later, inside the bulk copy.
    if (string.IsNullOrWhiteSpace(tableName))
    {
        throw new ArgumentException("A destination table name is required.", nameof(tableName));
    }

    _tableName = tableName;
    _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
    _commandTimeoutSeconds = commandTimeoutSeconds > 0
        ? commandTimeoutSeconds
        : DefaultCommandTimeoutSeconds;
}
|
|
```
|
|
|
|
### Column mapping
|
|
|
|
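Queries the destination table's schema and maps only the source columns that also exist in the destination (names are matched case-insensitively). Extra source columns are ignored, and an `InvalidOperationException` is thrown if no source column matches: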
|
|
|
|
```csharp
|
|
var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken);

using var bulkCopy = new SqlBulkCopy(connection)
{
    DestinationTableName = qualifiedName,
    BatchSize = _batchSize,
    BulkCopyTimeout = _commandTimeoutSeconds,
    EnableStreaming = true
};

// destColumns compares names case-insensitively, but SqlBulkCopy resolves destination
// column names case-sensitively. HashSet.TryGetValue returns the name exactly as stored
// in the destination schema, so map to that spelling rather than the source's.
for (var ordinal = 0; ordinal < source.FieldCount; ordinal++)
{
    var sourceName = source.GetName(ordinal);
    if (destColumns.TryGetValue(sourceName, out var destinationName))
    {
        bulkCopy.ColumnMappings.Add(sourceName, destinationName);
    }
}

// With an empty ColumnMappings collection SqlBulkCopy falls back to mapping columns by
// ordinal position, so refuse to continue when no column matched by name.
if (bulkCopy.ColumnMappings.Count == 0)
{
    throw new InvalidOperationException(
        $"No columns from source exist in destination table '{_tableName}'.");
}
|
|
```
|
|
|
|
### Destination column discovery
|
|
|
|
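Reads column names from `INFORMATION_SCHEMA.COLUMNS`, filtering on both schema and table name (an unqualified table name resolves to the `dbo` schema via `CommonScripts.ParseTableName`):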
|
|
|
|
```csharp
|
|
/// <summary>
/// Loads the column names of the destination table.
/// </summary>
/// <param name="connection">Connection to the destination database.</param>
/// <param name="ct">Token forwarded to the metadata query.</param>
/// <returns>The destination column names, in a set that compares case-insensitively.</returns>
private async Task<HashSet<string>> GetDestinationColumnsAsync(
    SqlConnection connection, CancellationToken ct)
{
    const string columnQuery = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";

    // Filter on the schema too, so same-named tables in other schemas are not mixed in.
    var (schemaPart, tablePart) = CommonScripts.ParseTableName(_tableName);
    var queryArgs = new { tableName = tablePart, schemaName = schemaPart };

    var command = new CommandDefinition(
        columnQuery,
        queryArgs,
        commandTimeout: _commandTimeoutSeconds,
        cancellationToken: ct);

    var names = await connection.QueryAsync<string>(command);
    return names.ToHashSet(StringComparer.OrdinalIgnoreCase);
}
|
|
```
|
|
|
|
## DbBulkMergeDestination
|
|
|
|
Incremental upserts: rows are bulk-copied into a temp table and then applied to the target table with a `MERGE` statement:
|
|
|
|
```csharp
|
|
public class DbBulkMergeDestination : IImportDestination
|
|
{
|
|
// Defaults mirror DbBulkImportDestination so both destinations behave alike when unconfigured.
private const int DefaultBatchSize = 10000;
private const int DefaultCommandTimeoutSeconds = 600;

private readonly IDbConnectionFactory _connectionFactory;
private readonly string _tableName;
private readonly string[] _matchColumns;
private readonly string[]? _updateColumns;
private readonly int _batchSize;
private readonly int _commandTimeoutSeconds;

/// <summary>
/// Creates a destination that upserts source rows into <paramref name="tableName"/>
/// by bulk-copying them into a temp table and running a MERGE.
/// </summary>
/// <param name="connectionFactory">Factory used to open the destination connection.</param>
/// <param name="tableName">Target table, optionally schema-qualified.</param>
/// <param name="matchColumns">Columns compared in the MERGE <c>ON</c> clause; at least one is required.</param>
/// <param name="updateColumns">
/// Columns overwritten on matched rows, or <c>null</c>.
/// NOTE(review): how <c>null</c> is resolved is not shown in this document — confirm.
/// </param>
/// <param name="batchSize">Rows per batch; values &lt;= 0 fall back to <see cref="DefaultBatchSize"/>.</param>
/// <param name="commandTimeoutSeconds">
/// Command timeout; values &lt;= 0 fall back to <see cref="DefaultCommandTimeoutSeconds"/>.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="connectionFactory"/> or <paramref name="matchColumns"/> is null.
/// </exception>
/// <exception cref="ArgumentException">
/// The table name is blank, <paramref name="matchColumns"/> is empty, or contains a blank name.
/// </exception>
public DbBulkMergeDestination(
    IDbConnectionFactory connectionFactory,
    string tableName,
    string[] matchColumns,
    string[]? updateColumns = null,
    int batchSize = 0,
    int commandTimeoutSeconds = 0)
{
    _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));

    if (string.IsNullOrWhiteSpace(tableName))
    {
        throw new ArgumentException("A destination table name is required.", nameof(tableName));
    }

    if (matchColumns is null)
    {
        throw new ArgumentNullException(nameof(matchColumns));
    }

    if (matchColumns.Length == 0)
    {
        throw new ArgumentException("At least one match column is required.", nameof(matchColumns));
    }

    // A blank key would generate "target.[] = source.[]" in the MERGE ON clause.
    if (Array.Exists(matchColumns, string.IsNullOrWhiteSpace))
    {
        throw new ArgumentException("Match column names must not be null or blank.", nameof(matchColumns));
    }

    _tableName = tableName;

    // Copy the arrays so later mutation by the caller cannot change the generated MERGE.
    _matchColumns = (string[])matchColumns.Clone();
    _updateColumns = updateColumns is null ? null : (string[])updateColumns.Clone();

    _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
    _commandTimeoutSeconds = commandTimeoutSeconds > 0
        ? commandTimeoutSeconds
        : DefaultCommandTimeoutSeconds;
}
|
|
```
|
|
|
|
### Batch processing
|
|
|
|
Creates a temp table, bulk copies in batches, then merges each batch:
|
|
|
|
```csharp
|
|
var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}";
await CreateTempTableAsync(connection, tempTableName, cancellationToken);

// NOTE(review): ProcessBatchAsync is assumed to empty the temp table after merging;
// otherwise rows from earlier batches would be merged again on every batch — confirm.
while (source.Read())
{
    // IDataReader.Read is synchronous, so observe cancellation explicitly per row.
    cancellationToken.ThrowIfCancellationRequested();

    // Buffer rows into DataTable
    if (batch.Rows.Count >= _batchSize)
    {
        await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, cancellationToken);
        totalRows += batch.Rows.Count;
        batch.Clear();
    }
}

// Flush the final partial batch. Without this, up to _batchSize - 1 trailing rows
// would be read from the source but never merged into the destination.
if (batch.Rows.Count > 0)
{
    await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, cancellationToken);
    totalRows += batch.Rows.Count;
    batch.Clear();
}
|
|
```
|
|
|
|
### MERGE SQL generation
|
|
|
|
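Generates the `MERGE` statement. Rows are matched on the configured match columns, and matched rows have their update columns overwritten; the `WHEN MATCHED` clause is omitted when there are no update columns. Unmatched rows are inserted: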
|
|
|
|
```csharp
|
|
/// <summary>
/// Builds the MERGE statement that applies the staged rows in <paramref name="tempTableName"/>
/// to the destination table.
/// </summary>
/// <param name="tempTableName">Name of the temp table holding the current batch.</param>
/// <param name="allColumns">Columns written by the <c>WHEN NOT MATCHED ... INSERT</c> branch.</param>
/// <param name="updateColumns">
/// Columns overwritten by <c>WHEN MATCHED ... UPDATE</c>; if empty, that clause is omitted.
/// </param>
/// <returns>A single semicolon-terminated MERGE statement.</returns>
/// <remarks>
/// The <c>ON</c> clause uses plain equality, so rows whose match-column values are NULL never
/// match and are always inserted.
/// </remarks>
private string BuildMergeSql(string tempTableName,
    IReadOnlyList<string> allColumns, IReadOnlyList<string> updateColumns)
{
    // Bracket-quote an identifier the way T-SQL QUOTENAME does: a literal ']' is doubled,
    // so a column name containing ']' cannot end the identifier early and inject SQL.
    static string Quote(string identifier) => "[" + identifier.Replace("]", "]]") + "]";

    var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName);
    var sb = new StringBuilder();

    sb.AppendLine($"MERGE INTO {qualifiedName} AS target");
    sb.AppendLine($"USING {tempTableName} AS source");
    sb.Append("ON ");
    sb.AppendLine(string.Join(" AND ",
        _matchColumns.Select(c => $"target.{Quote(c)} = source.{Quote(c)}")));

    // An empty SET list is invalid SQL, so skip WHEN MATCHED when there is nothing to update.
    if (updateColumns.Count > 0)
    {
        sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
        sb.AppendLine(string.Join(", ",
            updateColumns.Select(c => $"target.{Quote(c)} = source.{Quote(c)}")));
    }

    sb.AppendLine("WHEN NOT MATCHED THEN INSERT");
    sb.AppendLine($"({string.Join(", ", allColumns.Select(Quote))})");
    // MERGE must be terminated with a semicolon.
    sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.{Quote(c)}"))});");

    return sb.ToString();
}
|
|
```
|
|
|
|
## Schema-Qualified Table Names
|
|
|
|
Both destinations support schema-qualified names via `CommonScripts`:
|
|
|
|
```csharp
|
|
/// <summary>
/// Splits a one- or two-part table name into its schema and table components.
/// </summary>
/// <remarks>
/// Accepts <c>Table</c>, <c>schema.Table</c> and bracket-quoted forms such as <c>[schema].[Table]</c>.
/// Inside brackets, a '.' is part of the name and <c>]]</c> is an escaped ']', following SQL
/// Server's quoted-identifier rules. An unqualified name defaults to the <c>dbo</c> schema.
/// </remarks>
/// <param name="tableName">The table name to parse.</param>
/// <returns>The unquoted schema and table names.</returns>
/// <exception cref="ArgumentNullException"><paramref name="tableName"/> is null.</exception>
/// <exception cref="ArgumentException">
/// The name is blank, contains an empty part, has an unterminated '[', or has more than two parts.
/// </exception>
public static (string Schema, string Table) ParseTableName(string tableName)
{
    if (tableName is null)
    {
        throw new ArgumentNullException(nameof(tableName));
    }

    var parts = new List<string>();
    var current = new StringBuilder();
    var inBrackets = false;

    // Split on '.' only outside brackets; stripping brackets first (as a naive parser would)
    // mis-splits names like "[my.schema].[Table]".
    for (var i = 0; i < tableName.Length; i++)
    {
        var ch = tableName[i];

        if (inBrackets)
        {
            if (ch != ']')
            {
                current.Append(ch);
            }
            else if (i + 1 < tableName.Length && tableName[i + 1] == ']')
            {
                current.Append(']'); // "]]" inside brackets is an escaped ']'
                i++;
            }
            else
            {
                inBrackets = false;
            }
        }
        else if (ch == '[')
        {
            inBrackets = true;
        }
        else if (ch == '.')
        {
            parts.Add(current.ToString());
            current.Clear();
        }
        else
        {
            current.Append(ch);
        }
    }

    if (inBrackets)
    {
        throw new ArgumentException($"Unterminated '[' in table name '{tableName}'.", nameof(tableName));
    }

    parts.Add(current.ToString());

    if (parts.Count > 2)
    {
        throw new ArgumentException(
            $"Table name '{tableName}' has more than two parts; use 'Table' or 'schema.Table'.",
            nameof(tableName));
    }

    if (parts.Exists(string.IsNullOrWhiteSpace))
    {
        throw new ArgumentException($"Table name '{tableName}' contains an empty part.", nameof(tableName));
    }

    return parts.Count == 2 ? (parts[0], parts[1]) : ("dbo", parts[0]);
}

/// <summary>
/// Returns <paramref name="tableName"/> as a bracket-quoted <c>[schema].[table]</c> identifier.
/// </summary>
/// <param name="tableName">Any name accepted by <see cref="ParseTableName"/>.</param>
/// <returns>The quoted two-part name, with any ']' in a part escaped as <c>]]</c>.</returns>
public static string FormatQualifiedTableName(string tableName)
{
    // Re-escape ']' (QUOTENAME semantics); parsed parts may legitimately contain it.
    static string Quote(string identifier) => "[" + identifier.Replace("]", "]]") + "]";

    var (schema, table) = ParseTableName(tableName);
    return $"{Quote(schema)}.{Quote(table)}";
}
|
|
```
|
|
|
|
Supported formats: `"Table"`, `"dbo.Table"`, `"[dbo].[Table]"`. An unqualified name defaults to the `dbo` schema.
|
|
|
|
## Script Patterns
|
|
|
|
### IScriptRunner Contract
|
|
|
|
```csharp
|
|
/// <summary>
/// A SQL operation that a pipeline runs before or after its data transfer
/// (for example, index maintenance).
/// </summary>
public interface IScriptRunner
{
    /// <summary>
    /// Name identifying this script.
    /// </summary>
    string ScriptName { get; }

    /// <summary>
    /// Runs the script.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel execution.</param>
    Task ExecuteAsync(CancellationToken cancellationToken = default);
}
|
|
```
|
|
|
|
### SqlScriptRunner Implementation
|
|
|
|
```csharp
|
|
/// <summary>
/// Runs a single, optionally parameterized, SQL command as a pipeline pre- or post-script.
/// </summary>
public class SqlScriptRunner : IScriptRunner
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _timeoutSeconds;

    /// <summary>
    /// Creates a runner for <paramref name="sql"/>.
    /// </summary>
    /// <param name="connectionFactory">Factory used to open a connection for each execution.</param>
    /// <param name="sql">The SQL text to execute.</param>
    /// <param name="name">Script name; defaults to <c>"SqlScript"</c> when null.</param>
    /// <param name="parameters">Optional parameter object passed with the command.</param>
    /// <param name="timeoutSeconds">
    /// Command timeout in seconds (default 3600). Zero is passed through unchanged; for
    /// SqlClient commands, 0 means no limit.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="connectionFactory"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="sql"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeoutSeconds"/> is negative.</exception>
    public SqlScriptRunner(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int timeoutSeconds = 3600)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));

        if (string.IsNullOrWhiteSpace(sql))
        {
            throw new ArgumentException("SQL text is required.", nameof(sql));
        }

        // ADO.NET rejects negative command timeouts; fail at construction rather than at run time.
        if (timeoutSeconds < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(timeoutSeconds), timeoutSeconds,
                "Timeout must be zero or a positive number of seconds.");
        }

        _sql = sql;
        _parameters = parameters;
        _timeoutSeconds = timeoutSeconds;
        ScriptName = name ?? "SqlScript";
    }

    /// <inheritdoc />
    public string ScriptName { get; }

    /// <summary>
    /// Opens a new connection via <c>CreateLotFinderConnectionAsync</c>, executes the SQL once with
    /// the configured parameters and timeout, then disposes the connection.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to connection creation and the command.</param>
    public async Task ExecuteAsync(CancellationToken cancellationToken = default)
    {
        await using var connection = await _connectionFactory
            .CreateLotFinderConnectionAsync(cancellationToken);

        await connection.ExecuteAsync(
            new CommandDefinition(_sql, _parameters,
                commandTimeout: _timeoutSeconds, cancellationToken: cancellationToken));
    }
}
|
|
```
|
|
|
|
### Common Scripts
|
|
|
|
`CommonScripts` provides factory methods for index management:
|
|
|
|
```csharp
|
|
/// <summary>
/// Creates a script that disables every enabled nonclustered index on <paramref name="tableName"/>,
/// except indexes that back a PRIMARY KEY or UNIQUE constraint.
/// </summary>
/// <param name="factory">Connection factory used to run the script.</param>
/// <param name="tableName">Target table; parsed with <see cref="ParseTableName"/>.</param>
/// <param name="timeoutSeconds">Command timeout in seconds (default 300).</param>
/// <returns>A runner named <c>DisableIndexes:{schema}.{table}</c>.</returns>
/// <remarks>
/// Disabled indexes stay unusable until rebuilt, so pair this with <c>RebuildIndexes</c>.
/// </remarks>
public static IScriptRunner DisableIndexes(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 300)
{
    var (schema, table) = ParseTableName(tableName);

    // Notes on the generated T-SQL:
    // - Schema and table arrive as parameters and reach dynamic SQL only through QUOTENAME.
    // - @fullTableName is NVARCHAR(MAX): each QUOTENAME result can be up to 258 characters,
    //   so a fixed NVARCHAR(256) could truncate "[schema].[table]".
    // - Constraint-backing indexes are skipped: disabling them also disables every foreign key
    //   that references them, and ALTER INDEX ... REBUILD does not re-enable those keys.
    var sql = @"
DECLARE @sql NVARCHAR(MAX) = '';
DECLARE @fullTableName NVARCHAR(MAX) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);

SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;'
FROM sys.indexes i
INNER JOIN sys.tables t ON i.object_id = t.object_id
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
WHERE t.name = @tableName AND s.name = @schemaName
AND i.type = 2 AND i.is_disabled = 0
AND i.is_primary_key = 0 AND i.is_unique_constraint = 0;

IF LEN(@sql) > 0 EXEC sp_executesql @sql;";

    return new SqlScriptRunner(factory, sql, $"DisableIndexes:{schema}.{table}",
        parameters: new { tableName = table, schemaName = schema },
        timeoutSeconds: timeoutSeconds);
}
|
|
|
|
/// <summary>
/// Creates a script runner that rebuilds the indexes on <paramref name="tableName"/>; used as a
/// post-script after a bulk load.
/// </summary>
/// <param name="factory">Connection factory used to run the script.</param>
/// <param name="tableName">Target table name.</param>
/// <param name="timeoutSeconds">Command timeout in seconds (default 3600).</param>
/// <remarks>
/// In SQL Server, rebuilding a disabled index is what re-enables it, so this is the required
/// counterpart to <c>DisableIndexes</c>.
/// </remarks>
public static IScriptRunner RebuildIndexes(
|
|
IDbConnectionFactory factory, string tableName, int timeoutSeconds = 3600)
|
|
{
|
|
// Body elided in this document: mirrors DisableIndexes but issues ALTER INDEX ALL ... REBUILD
|
|
}
|
|
|
|
/// <summary>
/// Creates a script runner that updates the statistics of <paramref name="tableName"/>;
/// used as a post-script after a bulk load.
/// </summary>
/// <param name="factory">Connection factory used to run the script.</param>
/// <param name="tableName">Target table name.</param>
/// <param name="timeoutSeconds">Command timeout in seconds (default 600).</param>
public static IScriptRunner UpdateStatistics(
|
|
IDbConnectionFactory factory, string tableName, int timeoutSeconds = 600)
|
|
{
|
|
// Body elided in this document: issues UPDATE STATISTICS with the table name escaped via QUOTENAME
|
|
}
|
|
```
|
|
|
|
### SQL injection protection
|
|
|
|
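The index-management scripts escape identifiers in their dynamic SQL with `QUOTENAME()`, and receive the schema and table names as query parameters rather than concatenating them into the command text: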
|
|
|
|
```csharp
|
|
-- NVARCHAR(MAX): each QUOTENAME result can be up to 258 characters, so NVARCHAR(256) could truncate.
DECLARE @fullTableName NVARCHAR(MAX) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);
|
|
```
|
|
|
|
### When to use scripts
|
|
|
|
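Use pre/post scripts for large bulk loads where index maintenance overhead matters. Disabling nonclustered indexes before the load avoids maintaining them during the insert. A disabled index stays unusable until it is rebuilt, so always pair `DisableIndexes` with a `RebuildIndexes` post-script: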
|
|
|
|
```csharp
|
|
const string tableName = "WorkOrder";

// Disable nonclustered indexes before the load, then rebuild them and refresh statistics afterwards.
var pipeline = new EtlPipelineBuilder()
    .WithName("LargeTableSync")
    .WithSource(source)
    .WithPreScript(CommonScripts.DisableIndexes(factory, tableName))
    .WithDestination(new DbBulkImportDestination(factory, tableName))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, tableName))
    .WithPostScript(CommonScripts.UpdateStatistics(factory, tableName))
    .Build();
|
|
```
|
|
|
|
## Related Documentation
|
|
|
|
- [Overview](./Overview.md) - Pipeline architecture
|
|
- [Transformers](./Transformers.md) - Data transformation
|
|
- [Configuration](./Configuration.md) - Timeout and batch size options
|
|
- [Troubleshooting](./Troubleshooting.md) - Performance tuning
|