2026-01-03 15:34:45 -05:00

Data Sources

Sources provide data to the ETL pipeline by implementing IImportSource. They return an IDataReader that streams rows to transformers and destinations.

Interface Contract

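using System;                  // IAsyncDisposable
using System.Data;             // IDataReader
using System.Threading;        // CancellationToken
using System.Threading.Tasks;  // Task

public interface IImportSource : IAsyncDisposable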
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

Key requirements:

  • Release connections and other resources in DisposeAsync (IImportSource extends IAsyncDisposable)
  • Return a live IDataReader rather than a buffered copy, so memory use stays flat as row counts grow
  • SourceName is used in logging and StepResult tracking
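A minimal sketch of the contract follows. It assumes a hypothetical test double named InMemorySource that serves rows from a System.Data DataTable. The interface is restated so the block compiles on its own. The "Memory:" prefix and the InMemorySourceDemo helper are illustrative and not part of the codebase. Because the table is already in memory, this pattern only suits tests. It still shows the shape every source follows: capture configuration in the constructor, create the reader in ReadDataAsync, and release it in DisposeAsync.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// IImportSource restated from above so this sketch compiles on its own.
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical test double: serves rows from an in-memory DataTable.
public sealed class InMemorySource : IImportSource
{
    private readonly DataTable _table;
    private DataTableReader? _reader;

    public string SourceName { get; }

    public InMemorySource(DataTable table, string name)
    {
        // Capture configuration only; no reader exists until ReadDataAsync.
        _table = table;
        SourceName = $"Memory:{name}";
    }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        // DataTableReader is a forward-only IDataReader over the table's rows.
        _reader = _table.CreateDataReader();
        return Task.FromResult<IDataReader>(_reader);
    }

    public ValueTask DisposeAsync()
    {
        // Dispose whatever ReadDataAsync created; a second call does nothing.
        _reader?.Dispose();
        _reader = null;
        return ValueTask.CompletedTask;
    }
}

public static class InMemorySourceDemo
{
    // Streams a two-row table through the source and counts the rows read.
    public static async Task<int> CountRowsAsync()
    {
        var table = new DataTable();
        table.Columns.Add("LotId", typeof(int));
        table.Rows.Add(1);
        table.Rows.Add(2);

        await using var source = new InMemorySource(table, "Lots");
        using var reader = await source.ReadDataAsync();
        var count = 0;
        while (reader.Read()) count++;
        return count;
    }
}
```

In the demo, the reader is disposed before the source. C# using declarations are disposed in reverse order of declaration, so the reader goes first and DisposeAsync then releases anything left over.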

DbQuerySource Implementation

DbQuerySource executes a SQL query against the local cache database:

public class DbQuerySource : IImportSource
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _commandTimeout;
    private SqlConnection? _connection;
    private SqlCommand? _command;

    public string SourceName { get; }

    public DbQuerySource(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
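        int commandTimeout = 3600)   // seconds, per SqlCommand.CommandTimeout (one hour)
    {
        // Store configuration only; nothing is opened until ReadDataAsync.
        _connectionFactory = connectionFactory;
        _sql = sql;
        _parameters = parameters;
        _commandTimeout = commandTimeout;
        // e.g. "DbQuery:OpenLots", or "DbQuery:Query" when no name is passed.
        SourceName = $"DbQuery:{name ?? "Query"}";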
    }

Reading data

The connection is acquired in ReadDataAsync rather than in the constructor. It stays open until the source is disposed, because the returned reader depends on it:

    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
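        // Acquire the connection here rather than in the constructor. It stays open
        // until DisposeAsync, so call this once per DbQuerySource instance; a second
        // call would overwrite (and leak) the first connection and command.
        _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        _command = _connection.CreateCommand();
        _command.CommandText = _sql;
        _command.CommandTimeout = _commandTimeout;
        AddParameters(_command, _parameters);
        // Hand back the live reader; rows are fetched as the caller calls Read().
        return await _command.ExecuteReaderAsync(cancellationToken);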
    }

Parameter handling

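Parameters are supplied as an anonymous object and read via reflection. Each public property becomes a parameter named @PropertyName, and null values are sent as DBNull.Value: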

    private static void AddParameters(SqlCommand command, object? parameters)
    {
        if (parameters == null) return;

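        // Each public property maps to a parameter of the same name,
        // e.g. new { Plant = "P01" } becomes @Plant.
        var properties = parameters.GetType().GetProperties();
        foreach (var prop in properties)
        {
            // A null parameter value counts as "not supplied"; DBNull.Value sends SQL NULL.
            var value = prop.GetValue(parameters) ?? DBNull.Value;
            command.Parameters.AddWithValue($"@{prop.Name}", value);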
        }
    }

Resource cleanup

DisposeAsync disposes the command and then the connection. It nulls each field afterwards, so calling it a second time is a no-op:

    public async ValueTask DisposeAsync()
    {
        if (_command != null)
        {
            await _command.DisposeAsync();
            _command = null;
        }
        if (_connection != null)
        {
            await _connection.DisposeAsync();
            _connection = null;
        }
    }
}
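The reflection step in AddParameters can be checked without a database. This sketch copies its mapping logic into a hypothetical ParameterMapper helper that returns name/value pairs instead of adding them to a SqlCommand. It makes the naming rule visible: the SQL text has to use the anonymous object's property names. For example, a filter such as WHERE Plant = @Plant pairs with parameters: new { Plant = "P01" }, where Plant and P01 are made-up examples.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical helper mirroring AddParameters, minus the SqlCommand:
// each public property becomes an "@Name" entry, and null becomes DBNull.Value.
public static class ParameterMapper
{
    public static List<KeyValuePair<string, object>> ToParameters(object? parameters)
    {
        var result = new List<KeyValuePair<string, object>>();
        if (parameters == null) return result;

        foreach (PropertyInfo prop in parameters.GetType().GetProperties())
        {
            object value = prop.GetValue(parameters) ?? DBNull.Value;
            result.Add(new KeyValuePair<string, object>($"@{prop.Name}", value));
        }
        return result;
    }
}
```

Property order from GetProperties is not guaranteed, so code that consumes these pairs should look parameters up by name rather than by position.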

Key Patterns

Keep sources stateless until ReadDataAsync

Don't open connections or execute queries in the constructor. The constructor should only capture configuration (SQL text, parameters, timeout), so a source can be created and configured without side effects. All I/O starts in ReadDataAsync.
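One way to keep construction free of side effects is for the constructor to store only a delegate that does the opening. The sketch below assumes a hypothetical DeferredSource that takes such a delegate. The interface is restated so the block compiles alone, and the DataTable-backed opener in the demo stands in for a real connection or file. The demo confirms that the opener has not run after construction and runs exactly once when ReadDataAsync is called.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// IImportSource restated from above so this sketch compiles on its own.
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical source whose constructor stores only a delegate. The delegate
// (which would open a connection or file) runs inside ReadDataAsync.
public sealed class DeferredSource : IImportSource
{
    private readonly Func<CancellationToken, Task<IDataReader>> _open;
    private IDataReader? _reader;

    public string SourceName { get; }

    public DeferredSource(string sourceName, Func<CancellationToken, Task<IDataReader>> open)
    {
        SourceName = sourceName;
        _open = open;
    }

    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        _reader = await _open(cancellationToken);
        return _reader;
    }

    public ValueTask DisposeAsync()
    {
        _reader?.Dispose();
        _reader = null;
        return ValueTask.CompletedTask;
    }
}

public static class DeferredSourceDemo
{
    // Reports how many times the opener had run before and after ReadDataAsync.
    public static async Task<(int BeforeRead, int AfterRead)> CountOpensAsync()
    {
        var opens = 0;
        await using var source = new DeferredSource("Memory:Demo", _ =>
        {
            opens++;
            var table = new DataTable();
            table.Columns.Add("Id", typeof(int));
            return Task.FromResult<IDataReader>(table.CreateDataReader());
        });

        var before = opens;
        using var reader = await source.ReadDataAsync();
        return (before, opens);
    }
}
```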

Streaming, not buffering

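Return a live IDataReader rather than loading all data into memory. Consumers pull one row at a time, so memory use stays roughly constant no matter how many rows the query returns. This is what allows millions of rows to be processed without memory pressure.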
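On the consuming side, streaming means processing each row as it is read and keeping only what is needed. This sketch assumes a hypothetical SumColumn helper that aggregates one numeric column with a running total instead of collecting rows into a list. It accepts any IDataReader. The test feeds it a DataTableReader, whereas in the pipeline it would receive the reader returned by a source such as DbQuerySource.

```csharp
using System;
using System.Data;

public static class StreamingConsumer
{
    // Reads one row at a time and keeps only running totals, so memory use
    // does not grow with the number of rows (unlike filling a List or DataTable).
    public static (long Rows, decimal Total) SumColumn(IDataReader reader, string column)
    {
        int ordinal = reader.GetOrdinal(column);
        long rows = 0;
        decimal total = 0m;

        while (reader.Read())
        {
            rows++;
            // Skip SQL NULLs rather than treating them as zero.
            if (!reader.IsDBNull(ordinal))
                total += Convert.ToDecimal(reader.GetValue(ordinal));
        }
        return (rows, total);
    }
}
```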

Use SourceName for diagnostics

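Use a "Type:Identifier" format, such as "DbQuery:{name}" or "File:{filename}". For database sources the name is usually the table or query being read; DbQuerySource falls back to "DbQuery:Query" when no name is given. The value appears in logs and in StepResult.StepName.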
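If several source types need consistent names, a small helper can centralize the format. The SourceNames class below is hypothetical. Its DbQuery method reproduces the fallback in the DbQuerySource constructor. Its File method keeps only the file name from a path, on the assumption that the full directory is not wanted in StepResult.StepName.

```csharp
using System.IO;

// Hypothetical helpers that build SourceName values in the "Type:Identifier" format.
public static class SourceNames
{
    // Same fallback the DbQuerySource constructor applies when no name is passed.
    public static string DbQuery(string? name) => $"DbQuery:{name ?? "Query"}";

    // Keeps only the file name portion of the path.
    public static string File(string path) => $"File:{Path.GetFileName(path)}";
}
```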

Future source types

The same interface is intended to cover source types that are not yet implemented:

  • File-based sources: CSV and Excel files
  • API sources: REST endpoints returning paged data
  • Oracle/Sybase sources: direct queries against JDE or CMS

Each would implement the same interface with different connection and reader implementations.