# Data Sources

Sources provide data to the ETL pipeline by implementing `IImportSource`. Each source returns an `IDataReader` that streams rows to transformers and destinations.

## Interface Contract

```csharp
public interface IImportSource : IAsyncDisposable
{
    // Returns a live reader; rows are streamed, not buffered.
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);

    string SourceName { get; }
}
```

**Key requirements:**

- Implement `IAsyncDisposable` for connection cleanup
- Return a live `IDataReader` (not buffered) for memory efficiency
- `SourceName` is used in logging and `StepResult` tracking

## DbQuerySource Implementation

`DbQuerySource` executes a SQL query against the local cache database:

```csharp
public class DbQuerySource : IImportSource
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _commandTimeout;

    // Created in ReadDataAsync, released in DisposeAsync.
    private SqlConnection? _connection;
    private SqlCommand? _command;

    public string SourceName { get; }

    public DbQuerySource(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int commandTimeout = 3600)
    {
        // Only stores configuration; no connection is opened here.
        _connectionFactory = connectionFactory;
        _sql = sql;
        _parameters = parameters;
        _commandTimeout = commandTimeout;
        SourceName = $"DbQuery:{name ?? "Query"}";
    }
```

### Reading data

The connection opens in `ReadDataAsync` and stays open until disposal, because the returned reader depends on it:

```csharp
    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);

        _command = _connection.CreateCommand();
        _command.CommandText = _sql;
        _command.CommandTimeout = _commandTimeout;
        AddParameters(_command, _parameters);

        // The reader stays live; the caller streams rows from it.
        return await _command.ExecuteReaderAsync(cancellationToken);
    }
```

### Parameter handling

Parameters are read from an anonymous object using reflection. Each public property becomes a SQL parameter named after the property:

```csharp
    private static void AddParameters(SqlCommand command, object? parameters)
    {
        if (parameters == null) return;

        var properties = parameters.GetType().GetProperties();
        foreach (var prop in properties)
        {
            // Null property values are sent as SQL NULL.
            var value = prop.GetValue(parameters) ?? DBNull.Value;
            command.Parameters.AddWithValue($"@{prop.Name}", value);
        }
    }
```

### Resource cleanup

Both the command and the connection are disposed asynchronously, command first:

```csharp
    public async ValueTask DisposeAsync()
    {
        if (_command != null)
        {
            await _command.DisposeAsync();
            _command = null;
        }

        if (_connection != null)
        {
            await _connection.DisposeAsync();
            _connection = null;
        }
    }
}
```

## Key Patterns

### Keep sources stateless until ReadDataAsync

Don't open connections or execute queries in the constructor. Constructing and configuring a source should have no side effects; all work starts when `ReadDataAsync` is called.

### Streaming, not buffering

Return a live `IDataReader` rather than loading all data into memory. This lets the pipeline process millions of rows without holding them all in memory at once.

### Use SourceName for diagnostics

Format: `"DbQuery:{table}"` or `"File:{filename}"`. This value appears in logs and in `StepResult.StepName`.

## Future source types

The interface supports additional source types that are not yet implemented:

- **File-based sources** - CSV, Excel files
- **API sources** - REST endpoints returning paged data
- **Oracle/Sybase sources** - Direct queries against JDE or CMS

Each would implement the same interface with its own connection and reader implementation.

## Related Documentation

- [Overview](./Overview.md) - Pipeline architecture
- [Transformers](./Transformers.md) - Processing source data
- [Configuration](./Configuration.md) - Connection factory setup
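
## Example: an in-memory source for tests

The interface contract and the "stateless until `ReadDataAsync`" pattern described above can be tried out without a database. The sketch below is not part of the library: `InMemorySource` and `SourceDemo` are hypothetical names, and `IImportSource` is redeclared (returning `Task<IDataReader>`) only so the snippet compiles on its own. Unlike `DbQuerySource`, it reads from a `DataTable` that is already in memory, so it is meant as a test double rather than as a streaming source.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Redeclared here so the sketch is self-contained; mirrors the contract above.
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical test double: serves rows from a DataTable instead of a database.
public sealed class InMemorySource : IImportSource
{
    private readonly DataTable _table;
    private IDataReader? _reader;

    public string SourceName { get; }

    public InMemorySource(DataTable table, string? name = null)
    {
        // No side effects here: the reader is not created until ReadDataAsync.
        _table = table;
        SourceName = $"InMemory:{name ?? "Table"}";
    }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var reader = _table.CreateDataReader();
        _reader = reader;
        return Task.FromResult<IDataReader>(reader);
    }

    public ValueTask DisposeAsync()
    {
        // Release the reader if one was handed out.
        _reader?.Dispose();
        _reader = null;
        return ValueTask.CompletedTask;
    }
}

public static class SourceDemo
{
    // Drains a source the way a pipeline step might and returns the row count.
    // The source is disposed when the count is complete.
    public static async Task<int> CountRowsAsync(IImportSource source)
    {
        var count = 0;
        await using (source)
        {
            using var reader = await source.ReadDataAsync();
            while (reader.Read()) count++;
        }
        return count;
    }
}
```

A test could build a small `DataTable`, wrap it in `InMemorySource`, and pass it to `SourceDemo.CountRowsAsync` or to any code that accepts an `IImportSource`. Because the consumer only sees `IDataReader`, the same code path should work unchanged against `DbQuerySource`.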