2026-01-03 15:39:41 -05:00

Configuration

This document covers pipeline builder configuration, connection factory setup, and dependency injection registration.

Pipeline Builder API

EtlPipelineBuilder uses a fluent API to construct pipelines:

var pipeline = new EtlPipelineBuilder()
    .WithName("WorkOrderSync")
    .WithSource(new DbQuerySource(factory, "SELECT * FROM Source.WorkOrders", "WorkOrders"))
    .WithTransformer(new JdeDateTransformer("STRDJ", "TRDJ", "StartDate"))
    .WithTransformer(new ColumnDropTransformer("STRDJ", "TRDJ"))
    .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder"))
    .WithDestination(new DbBulkMergeDestination(factory, "WorkOrder", new[] { "OrderNumber" }))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder"))
    .WithLogger(logger)
    .Build();

Builder Methods

| Method | Required | Description |
| --- | --- | --- |
| WithName(string) | No | Pipeline name for logging. Default: "Unnamed" |
| WithSource(IImportSource) | Yes | Data source. Build() throws if it is not set |
| WithTransformer(IDataTransformer) | No | Adds a transformer. Can be called multiple times (chained) |
| WithDestination(IImportDestination) | Yes | Data destination. Build() throws if it is not set |
| WithPreScript(IScriptRunner) | No | Script to run before data transfer. Can be called multiple times |
| WithPostScript(IScriptRunner) | No | Script to run after data transfer. Can be called multiple times |
| WithCommandTimeout(TimeSpan) | No | Default command timeout. Range: 0-24 hours. Default: 600s |
| WithLogger(ILogger<EtlPipeline>) | No | Logger for pipeline events. Default: NullLogger |

WithCommandTimeout Validation

public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout)
{
    if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24))
        throw new ArgumentOutOfRangeException(nameof(timeout),
            "Timeout must be between 0 and 24 hours.");
    _defaultCommandTimeoutSeconds = (int)timeout.TotalSeconds;
    return this;
}
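
The boundaries of this check are easy to probe in isolation. The sketch below copies the validation into a hypothetical stand-in class, TimeoutBuilderSketch (not part of the library), to show that both endpoints are accepted, that fractional seconds are truncated by the int cast, and that out-of-range values throw.

```csharp
using System;

var builder = new TimeoutBuilderSketch();

// Both endpoints of the range are accepted.
builder.WithCommandTimeout(TimeSpan.Zero);
builder.WithCommandTimeout(TimeSpan.FromHours(24));
Console.WriteLine(builder.Seconds);   // 86400

// The (int) cast truncates fractional seconds rather than rounding.
builder.WithCommandTimeout(TimeSpan.FromSeconds(90.9));
Console.WriteLine(builder.Seconds);   // 90

// Values outside 0-24 hours are rejected.
try
{
    builder.WithCommandTimeout(TimeSpan.FromHours(25));
}
catch (ArgumentOutOfRangeException ex)
{
    Console.WriteLine(ex.ParamName);  // timeout
}

// Hypothetical stand-in that reproduces only the validation shown above.
public sealed class TimeoutBuilderSketch
{
    public int Seconds { get; private set; } = 600; // documented default

    public TimeoutBuilderSketch WithCommandTimeout(TimeSpan timeout)
    {
        if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24))
            throw new ArgumentOutOfRangeException(nameof(timeout),
                "Timeout must be between 0 and 24 hours.");
        Seconds = (int)timeout.TotalSeconds;
        return this;
    }
}
```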

Build Validation

public EtlPipeline Build()
{
    if (_source == null)
        throw new InvalidOperationException(
            "Source is required. Call WithSource() before Build().");
    if (_destination == null)
        throw new InvalidOperationException(
            "Destination is required. Call WithDestination() before Build().");

    return new EtlPipeline(_name, _source, _transformers, _destination,
        _preScripts, _postScripts, _logger ?? NullLogger<EtlPipeline>.Instance);
}
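
To illustrate how the fail-fast check behaves from the caller's side, here is a minimal sketch using a hypothetical PipelineBuilderSketch. Plain strings stand in for IImportSource, IDataTransformer, and IImportDestination so the example has no external dependencies; only the two exception messages are taken from the code above.

```csharp
using System;
using System.Collections.Generic;

// Build() without a source fails immediately with a descriptive message.
try
{
    new PipelineBuilderSketch().WithDestination("WorkOrder").Build();
}
catch (InvalidOperationException ex)
{
    Console.WriteLine(ex.Message);
    // Source is required. Call WithSource() before Build().
}

// With both required parts set, Build() succeeds.
var description = new PipelineBuilderSketch()
    .WithName("WorkOrderSync")
    .WithSource("WorkOrders")
    .WithTransformer("JdeDate")
    .WithDestination("WorkOrder")
    .Build();
Console.WriteLine(description);
// WorkOrderSync: WorkOrders -> [JdeDate] -> WorkOrder

// Hypothetical stand-in for EtlPipelineBuilder; Build() returns a summary string
// instead of a real pipeline.
public sealed class PipelineBuilderSketch
{
    private string _name = "Unnamed";
    private string? _source;
    private string? _destination;
    private readonly List<string> _transformers = new();

    public PipelineBuilderSketch WithName(string name) { _name = name; return this; }
    public PipelineBuilderSketch WithSource(string source) { _source = source; return this; }
    public PipelineBuilderSketch WithDestination(string destination) { _destination = destination; return this; }
    public PipelineBuilderSketch WithTransformer(string transformer) { _transformers.Add(transformer); return this; }

    public string Build()
    {
        if (_source == null)
            throw new InvalidOperationException(
                "Source is required. Call WithSource() before Build().");
        if (_destination == null)
            throw new InvalidOperationException(
                "Destination is required. Call WithDestination() before Build().");

        return $"{_name}: {_source} -> [{string.Join(", ", _transformers)}] -> {_destination}";
    }
}
```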

Component Configuration

DbQuerySource Options

| Parameter | Default | Description |
| --- | --- | --- |
| connectionFactory | Required | Factory for database connections |
| sql | Required | SQL query to execute |
| name | "Query" | Name for logging (appears as DbQuery:{name}) |
| parameters | null | Anonymous object for query parameters |
| commandTimeout | 3600 | Query timeout in seconds |

DbBulkImportDestination Options

| Parameter | Default | Description |
| --- | --- | --- |
| connectionFactory | Required | Factory for database connections |
| tableName | Required | Destination table (supports schema: dbo.Table) |
| batchSize | 10000 | Rows per batch for progress tracking |
| commandTimeoutSeconds | 600 | Timeout for TRUNCATE and bulk copy |

DbBulkMergeDestination Options

| Parameter | Default | Description |
| --- | --- | --- |
| connectionFactory | Required | Factory for database connections |
| tableName | Required | Destination table (supports schema: dbo.Table) |
| matchColumns | Required | Key columns for MERGE matching |
| updateColumns | All non-match columns | Columns to update on match |
| batchSize | 10000 | Rows per batch |
| commandTimeoutSeconds | 600 | Timeout for bulk copy and MERGE |
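
The updateColumns default ("all non-match columns") can be pictured as a set difference. The sketch below is an assumption about how such a default could be resolved, not the library's actual code: MergeColumnSketch and ResolveUpdateColumns are hypothetical names, and the case-insensitive comparison is a guess based on typical SQL Server collations.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

string[] tableColumns = { "OrderNumber", "Status", "StartDate" };
string[] matchColumns = { "OrderNumber" };

// No explicit list: every column that is not a match key gets updated.
var defaults = MergeColumnSketch.ResolveUpdateColumns(tableColumns, matchColumns, null);
Console.WriteLine(string.Join(", ", defaults));   // Status, StartDate

// An explicit list is used as given.
var explicitList = MergeColumnSketch.ResolveUpdateColumns(
    tableColumns, matchColumns, new[] { "Status" });
Console.WriteLine(string.Join(", ", explicitList)); // Status

// Hypothetical helper; not part of the documented API.
public static class MergeColumnSketch
{
    public static IReadOnlyList<string> ResolveUpdateColumns(
        IEnumerable<string> tableColumns,
        IEnumerable<string> matchColumns,
        IEnumerable<string>? updateColumns)
    {
        if (updateColumns != null)
            return updateColumns.ToList();

        // Assumption: column names compare case-insensitively.
        var keys = new HashSet<string>(matchColumns, StringComparer.OrdinalIgnoreCase);
        return tableColumns.Where(c => !keys.Contains(c)).ToList();
    }
}
```

With a composite key, passing several names in matchColumns removes all of them from the default update set.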

Script Timeout Defaults

| Script | Default Timeout |
| --- | --- |
| DisableIndexes | 300s (5 min) |
| RebuildIndexes | 3600s (1 hour) |
| UpdateStatistics | 600s (10 min) |
| SqlScriptRunner | 3600s (1 hour) |

Connection Factory Setup

The pipeline uses IDbConnectionFactory for database connections. Register it with your connection strings:

services.AddSingleton<IDbConnectionFactory>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();
    return new DbConnectionFactory(
        configuration.GetConnectionString("LotFinder"),
        configuration.GetConnectionString("JDE"),
        configuration.GetConnectionString("CMS"));
});

Connection string examples

{
  "ConnectionStrings": {
    "LotFinder": "Server=localhost,1434;Database=LotFinder;User Id=scopingapp;Password=...;TrustServerCertificate=true",
    "JDE": "Data Source=jde-oracle;User Id=...;Password=...",
    "CMS": "Data Source=cms-sybase;User Id=...;Password=..."
  }
}
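
IConfiguration.GetConnectionString returns null when a name is missing, so a typo in the settings file would only surface when the factory first opens a connection. As a hedged sketch, a small guard can fail at startup instead. ConnectionStringGuard is a hypothetical helper, the in-memory values are shortened placeholders, and the example assumes the Microsoft.Extensions.Configuration package is referenced.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// In-memory settings stand in for appsettings.json; "CMS" is deliberately absent.
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string?>
    {
        ["ConnectionStrings:LotFinder"] = "Server=localhost,1434;Database=LotFinder",
        ["ConnectionStrings:JDE"] = "Data Source=jde-oracle",
    })
    .Build();

Console.WriteLine(ConnectionStringGuard.Require(configuration, "LotFinder"));

try
{
    ConnectionStringGuard.Require(configuration, "CMS");
}
catch (InvalidOperationException ex)
{
    Console.WriteLine(ex.Message);
    // Connection string 'CMS' is missing or empty.
}

// Hypothetical helper; not part of the documented API.
public static class ConnectionStringGuard
{
    public static string Require(IConfiguration configuration, string name)
    {
        var value = configuration.GetConnectionString(name);
        if (string.IsNullOrWhiteSpace(value))
            throw new InvalidOperationException(
                $"Connection string '{name}' is missing or empty.");
        return value;
    }
}
```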

Dependency Injection Registration

Basic registration

services.AddEtlPipeline();

This registers EtlPipelineBuilder as transient, so every resolution from the container returns a fresh builder.

Extension method implementation

public static class EtlServiceCollectionExtensions
{
    public static IServiceCollection AddEtlPipeline(this IServiceCollection services)
    {
        services.AddTransient<EtlPipelineBuilder>();
        return services;
    }
}
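
The effect of the transient lifetime can be checked directly. This is a minimal sketch, assuming the Microsoft.Extensions.DependencyInjection package; BuilderStandIn is a hypothetical placeholder for EtlPipelineBuilder.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
services.AddTransient<BuilderStandIn>(); // same lifetime AddEtlPipeline() uses

using var provider = services.BuildServiceProvider();

var first = provider.GetRequiredService<BuilderStandIn>();
var second = provider.GetRequiredService<BuilderStandIn>();

// Transient services are created anew on every resolution.
Console.WriteLine(ReferenceEquals(first, second)); // False

// Hypothetical stand-in for EtlPipelineBuilder.
public sealed class BuilderStandIn { }
```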

Full registration example

public static IServiceCollection AddDataSync(this IServiceCollection services)
{
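    // Connection factory (singleton - manages connection pooling).
    // A factory delegate is used because the constructor takes connection
    // strings, which the container cannot resolve on its own.
    services.AddSingleton<IDbConnectionFactory>(sp =>
    {
        var configuration = sp.GetRequiredService<IConfiguration>();
        return new DbConnectionFactory(
            configuration.GetConnectionString("LotFinder"),
            configuration.GetConnectionString("JDE"),
            configuration.GetConnectionString("CMS"));
    });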

    // ETL pipeline builder (transient - fresh instance per use)
    services.AddEtlPipeline();

    // Background service for scheduled syncs
    services.AddHostedService<DataSyncService>();

    return services;
}

Using the builder in a service

public class DataSyncService : BackgroundService
{
    private readonly EtlPipelineBuilder _pipelineBuilder;
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly ILogger<EtlPipeline> _pipelineLogger;

    public DataSyncService(
        EtlPipelineBuilder pipelineBuilder,
        IDbConnectionFactory connectionFactory,
        ILogger<EtlPipeline> pipelineLogger)
    {
        _pipelineBuilder = pipelineBuilder;
        _connectionFactory = connectionFactory;
        _pipelineLogger = pipelineLogger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var pipeline = _pipelineBuilder
            .WithName("WorkOrderSync")
            .WithSource(new DbQuerySource(_connectionFactory, "SELECT * FROM JDE.WorkOrders"))
            .WithDestination(new DbBulkImportDestination(_connectionFactory, "WorkOrder"))
            .WithLogger(_pipelineLogger)
            .Build();

        var result = await pipeline.ExecuteAsync(stoppingToken);
    }
}
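
A hosted service is constructed once, so a builder injected through its constructor is also resolved once. The service above builds a single pipeline, which is fine. If ExecuteAsync were changed to loop on a schedule, and if the builder keeps its state after Build() (an assumption; this document does not say), repeated configuration calls would accumulate on that one instance. The sketch below illustrates the difference with a hypothetical StatefulBuilderStandIn and resolves a fresh instance per run from the service provider as one possible remedy.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
services.AddTransient<StatefulBuilderStandIn>();
using var provider = services.BuildServiceProvider();

// One instance reused across runs: state from run 1 leaks into run 2.
var shared = provider.GetRequiredService<StatefulBuilderStandIn>();
for (var run = 1; run <= 2; run++)
    Console.WriteLine(shared.WithTransformer("JdeDate").TransformerCount); // 1, then 2

// Resolving per run: every run starts from a clean builder.
for (var run = 1; run <= 2; run++)
{
    var fresh = provider.GetRequiredService<StatefulBuilderStandIn>();
    Console.WriteLine(fresh.WithTransformer("JdeDate").TransformerCount);  // 1, then 1
}

// Hypothetical stand-in that only tracks how many transformers were added.
public sealed class StatefulBuilderStandIn
{
    private readonly List<string> _transformers = new();

    public int TransformerCount => _transformers.Count;

    public StatefulBuilderStandIn WithTransformer(string name)
    {
        _transformers.Add(name);
        return this;
    }
}
```

In a real service this would mean injecting IServiceProvider (or a factory delegate) instead of the builder itself and resolving inside the loop.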

Configuration Summary

| Component | Option | Default | Valid Range |
| --- | --- | --- | --- |
| EtlPipelineBuilder | WithCommandTimeout | 600s | 0-24 hours |
| DbQuerySource | commandTimeout | 3600s | > 0 |
| DbBulkImportDestination | batchSize | 10000 | > 0 |
| DbBulkImportDestination | commandTimeoutSeconds | 600s | > 0 |
| DbBulkMergeDestination | batchSize | 10000 | > 0 |
| DbBulkMergeDestination | commandTimeoutSeconds | 600s | > 0 |