Configuration
This document covers pipeline builder configuration, connection factory setup, and dependency injection registration.
Pipeline Builder API
EtlPipelineBuilder uses a fluent API to construct pipelines:
var pipeline = new EtlPipelineBuilder()
    .WithName("WorkOrderSync")
    .WithSource(new DbQuerySource(factory, "SELECT * FROM Source.WorkOrders", "WorkOrders"))
    .WithTransformer(new JdeDateTransformer("STRDJ", "TRDJ", "StartDate"))
    .WithTransformer(new ColumnDropTransformer("STRDJ", "TRDJ"))
    .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder"))
    .WithDestination(new DbBulkMergeDestination(factory, "WorkOrder", new[] { "OrderNumber" }))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder"))
    .WithLogger(logger)
    .Build();
Builder Methods
| Method | Required | Description |
|---|---|---|
| WithName(string) | No | Pipeline name for logging. Default: "Unnamed" |
| WithSource(IImportSource) | Yes | Data source. Throws if not set before Build() |
| WithTransformer(IDataTransformer) | No | Add transformer. Can be called multiple times (chained) |
| WithDestination(IImportDestination) | Yes | Data destination. Throws if not set before Build() |
| WithPreScript(IScriptRunner) | No | Script to run before data transfer. Can be called multiple times |
| WithPostScript(IScriptRunner) | No | Script to run after data transfer. Can be called multiple times |
| WithCommandTimeout(TimeSpan) | No | Default timeout. Range: 0-24 hours. Default: 600s |
| WithLogger(ILogger<EtlPipeline>) | No | Logger for pipeline events. Default: NullLogger |
WithCommandTimeout Validation
public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout)
{
    if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24))
        throw new ArgumentOutOfRangeException(nameof(timeout),
            "Timeout must be between 0 and 24 hours.");
    _defaultCommandTimeoutSeconds = (int)timeout.TotalSeconds;
    return this;
}
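The range check and the cast can be tried in isolation. The following is a minimal sketch, not the library's builder: `CommandTimeoutSketch` and `ToSeconds` are hypothetical names that copy only the validation and the `(int)` conversion shown above. It illustrates that both bounds are inclusive and that fractions of a second are truncated.

```csharp
using System;

// Hypothetical stand-in: reproduces only the range check and the cast from
// WithCommandTimeout. It is not part of the library.
public static class CommandTimeoutSketch
{
    public static int ToSeconds(TimeSpan timeout)
    {
        if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24))
            throw new ArgumentOutOfRangeException(nameof(timeout),
                "Timeout must be between 0 and 24 hours.");

        // The cast truncates toward zero, so fractions of a second are dropped.
        return (int)timeout.TotalSeconds;
    }

    public static void Main()
    {
        Console.WriteLine(ToSeconds(TimeSpan.FromMinutes(10)));        // 600
        Console.WriteLine(ToSeconds(TimeSpan.FromHours(24)));          // 86400 (upper bound is inclusive)
        Console.WriteLine(ToSeconds(TimeSpan.FromMilliseconds(1500))); // 1
        Console.WriteLine(ToSeconds(TimeSpan.FromMilliseconds(500)));  // 0

        try
        {
            ToSeconds(TimeSpan.FromHours(25));
        }
        catch (ArgumentOutOfRangeException ex)
        {
            Console.WriteLine(ex.ParamName); // timeout
        }
    }
}
```

A sub-second timeout therefore becomes 0. If the stored value is eventually assigned to an ADO.NET command (an assumption; this document does not show where it is consumed), note that `SqlCommand.CommandTimeout` treats 0 as "no limit" rather than "fail immediately".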
Build Validation
public EtlPipeline Build()
{
    if (_source == null)
        throw new InvalidOperationException(
            "Source is required. Call WithSource() before Build().");
    if (_destination == null)
        throw new InvalidOperationException(
            "Destination is required. Call WithDestination() before Build().");
    return new EtlPipeline(_name, _source, _transformers, _destination,
        _preScripts, _postScripts, _logger ?? NullLogger<EtlPipeline>.Instance);
}
Component Configuration
DbQuerySource Options
| Parameter | Default | Description |
|---|---|---|
| connectionFactory | Required | Factory for database connections |
| sql | Required | SQL query to execute |
| name | "Query" | Name for logging (appears as DbQuery:{name}) |
| parameters | null | Anonymous object for query parameters |
| commandTimeout | 3600 | Query timeout in seconds |
DbBulkImportDestination Options
| Parameter | Default | Description |
|---|---|---|
| connectionFactory | Required | Factory for database connections |
| tableName | Required | Destination table (supports schema: dbo.Table) |
| batchSize | 10000 | Rows per batch for progress tracking |
| commandTimeoutSeconds | 600 | Timeout for TRUNCATE and bulk copy |
DbBulkMergeDestination Options
| Parameter | Default | Description |
|---|---|---|
| connectionFactory | Required | Factory for database connections |
| tableName | Required | Destination table (supports schema: dbo.Table) |
| matchColumns | Required | Key columns for MERGE matching |
| updateColumns | All non-match | Columns to update on match |
| batchSize | 10000 | Rows per batch |
| commandTimeoutSeconds | 600 | Timeout for bulk copy and MERGE |
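The updateColumns default of "all non-match" columns can be pictured as a set difference between the table's columns and matchColumns. The sketch below only illustrates that idea. `MergeColumnsSketch` and `DefaultUpdateColumns` are hypothetical names, and both the way the library discovers the column list and the case-insensitive comparison are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration of "all non-match columns"; not the library's code.
public static class MergeColumnsSketch
{
    public static IReadOnlyList<string> DefaultUpdateColumns(
        IEnumerable<string> allColumns, IEnumerable<string> matchColumns)
    {
        // Assumption: column names are compared case-insensitively.
        var keys = new HashSet<string>(matchColumns, StringComparer.OrdinalIgnoreCase);

        // Keep every column that is not a match key, preserving the original order.
        return allColumns.Where(column => !keys.Contains(column)).ToList();
    }

    public static void Main()
    {
        var updateColumns = DefaultUpdateColumns(
            new[] { "OrderNumber", "Status", "StartDate" },
            new[] { "OrderNumber" });

        Console.WriteLine(string.Join(", ", updateColumns)); // Status, StartDate
    }
}
```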
Script Timeout Defaults
| Script | Default Timeout |
|---|---|
| DisableIndexes | 300s (5 min) |
| RebuildIndexes | 3600s (1 hour) |
| UpdateStatistics | 600s (10 min) |
| SqlScriptRunner | 3600s (1 hour) |
Connection Factory Setup
The pipeline uses IDbConnectionFactory for database connections. Register it with your connection strings:
services.AddSingleton<IDbConnectionFactory>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();
    return new DbConnectionFactory(
        configuration.GetConnectionString("LotFinder"),
        configuration.GetConnectionString("JDE"),
        configuration.GetConnectionString("CMS"));
});
Connection string examples
{
  "ConnectionStrings": {
    "LotFinder": "Server=localhost,1434;Database=LotFinder;User Id=scopingapp;Password=...;TrustServerCertificate=true",
    "JDE": "Data Source=jde-oracle;User Id=...;Password=...",
    "CMS": "Data Source=cms-sybase;User Id=...;Password=..."
  }
}
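`GetConnectionString` returns null when a name is absent from the ConnectionStrings section, so a misspelled or missing entry would otherwise surface only when a connection is first opened. One way to fail fast at startup is sketched below. `ConnectionStringSketch` and `GetRequiredConnectionString` are hypothetical names, not part of the library, and the code assumes the Microsoft.Extensions.Configuration package is referenced.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical helper; not part of the library.
public static class ConnectionStringSketch
{
    public static string GetRequiredConnectionString(IConfiguration configuration, string name)
    {
        // Reads ConnectionStrings:{name}; the result is null when the key is absent.
        var value = configuration.GetConnectionString(name);
        if (string.IsNullOrWhiteSpace(value))
            throw new InvalidOperationException(
                $"Connection string '{name}' is missing or empty.");
        return value;
    }

    public static void Main()
    {
        // In-memory configuration stands in for appsettings.json in this sketch.
        var configuration = new ConfigurationBuilder()
            .AddInMemoryCollection(new Dictionary<string, string?>
            {
                ["ConnectionStrings:LotFinder"] = "Server=localhost,1434;Database=LotFinder"
            })
            .Build();

        Console.WriteLine(GetRequiredConnectionString(configuration, "LotFinder"));

        try
        {
            GetRequiredConnectionString(configuration, "JDE"); // not configured above
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

Inside the factory delegate shown earlier, each `configuration.GetConnectionString(...)` call could be swapped for this helper so that a missing entry is reported by name.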
Dependency Injection Registration
Basic registration
services.AddEtlPipeline();
This registers EtlPipelineBuilder as transient, so every resolution from the container returns a fresh builder.
Extension method implementation
public static class EtlServiceCollectionExtensions
{
    public static IServiceCollection AddEtlPipeline(this IServiceCollection services)
    {
        services.AddTransient<EtlPipelineBuilder>();
        return services;
    }
}
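The effect of the transient lifetime can be checked with a small container. This is a sketch under stated assumptions: `SketchBuilder` is a hypothetical placeholder for EtlPipelineBuilder, and the code assumes the Microsoft.Extensions.DependencyInjection package is referenced.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical placeholder standing in for EtlPipelineBuilder.
public sealed class SketchBuilder { }

public static class TransientSketch
{
    public static bool ResolvesDistinctInstances()
    {
        var services = new ServiceCollection();
        services.AddTransient<SketchBuilder>();

        using var provider = services.BuildServiceProvider();

        // Each resolution of a transient service constructs a new object.
        var first = provider.GetRequiredService<SketchBuilder>();
        var second = provider.GetRequiredService<SketchBuilder>();
        return !ReferenceEquals(first, second);
    }

    public static void Main()
    {
        Console.WriteLine(ResolvesDistinctInstances()); // True
    }
}
```

"Fresh" applies per resolution, not per use. A long-lived consumer that receives the builder through its constructor, such as a hosted service, keeps that one instance for its whole lifetime. If a builder accumulates state across calls, resolving a new one for each pipeline may be the safer choice.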
Full registration example
public static IServiceCollection AddDataSync(this IServiceCollection services)
{
    // Connection factory (singleton - manages connection pooling).
    // This form needs a DbConnectionFactory constructor the container can resolve;
    // otherwise use the factory-delegate registration from "Connection Factory Setup".
    services.AddSingleton<IDbConnectionFactory, DbConnectionFactory>();
    // ETL pipeline builder (transient - fresh instance per use)
    services.AddEtlPipeline();
    // Background service for scheduled syncs
    services.AddHostedService<DataSyncService>();
    return services;
}
Using the builder in a service
public class DataSyncService : BackgroundService
{
    private readonly EtlPipelineBuilder _pipelineBuilder;
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly ILogger<EtlPipeline> _pipelineLogger;

    public DataSyncService(
        EtlPipelineBuilder pipelineBuilder,
        IDbConnectionFactory connectionFactory,
        ILogger<EtlPipeline> pipelineLogger)
    {
        _pipelineBuilder = pipelineBuilder;
        _connectionFactory = connectionFactory;
        _pipelineLogger = pipelineLogger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var pipeline = _pipelineBuilder
            .WithName("WorkOrderSync")
            .WithSource(new DbQuerySource(_connectionFactory, "SELECT * FROM JDE.WorkOrders"))
            .WithDestination(new DbBulkImportDestination(_connectionFactory, "WorkOrder"))
            .WithLogger(_pipelineLogger)
            .Build();
        var result = await pipeline.ExecuteAsync(stoppingToken);
    }
}
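The service above runs the pipeline once and then returns. For a recurring sync, the run can be wrapped in a timer loop. The sketch below is one possible shape, assuming .NET 6 or later for `PeriodicTimer`. `ScheduledRunSketch` and `RunPeriodicallyAsync` are hypothetical names, and the pipeline call is abstracted behind a delegate so the block stays self-contained.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; not part of the library.
public static class ScheduledRunSketch
{
    // Runs once immediately, then once per interval, until the token is cancelled.
    public static async Task RunPeriodicallyAsync(
        Func<CancellationToken, Task> runOnce,
        TimeSpan interval,
        CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(interval);
        try
        {
            do
            {
                await runOnce(stoppingToken);
            }
            while (await timer.WaitForNextTickAsync(stoppingToken));
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown: the host cancelled the token.
        }
    }
}
```

Inside `ExecuteAsync`, the delegate would build and execute the pipeline, for example `ct => pipeline.ExecuteAsync(ct)`, with an interval chosen for the workload. Exceptions other than cancellation propagate out of the loop, so a real service would likely log them and decide whether to continue.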
Configuration Summary
| Component | Option | Default | Valid Range |
|---|---|---|---|
| EtlPipelineBuilder | WithCommandTimeout | 600s | 0-24 hours |
| DbQuerySource | commandTimeout | 3600s | > 0 |
| DbBulkImportDestination | batchSize | 10000 | > 0 |
| DbBulkImportDestination | commandTimeoutSeconds | 600s | > 0 |
| DbBulkMergeDestination | batchSize | 10000 | > 0 |
| DbBulkMergeDestination | commandTimeoutSeconds | 600s | > 0 |
Related Documentation
- Overview - Pipeline architecture
- Destinations - Destination-specific options
- Troubleshooting - Timeout and batch size tuning