Data Sources
Sources provide data to the ETL pipeline by implementing IImportSource. They return an IDataReader that streams rows to transformers and destinations.
Interface Contract
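```csharp
using System;                  // IAsyncDisposable
using System.Data;             // IDataReader
using System.Threading;        // CancellationToken
using System.Threading.Tasks;  // Task<T>

public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}
```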
Key requirements:
- Implement IAsyncDisposable for connection cleanup
- Return a live IDataReader (not buffered) for memory efficiency
- SourceName is used in logging and StepResult tracking
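As a sketch of the contract, the hypothetical InMemorySource below serves rows from a DataTable, which can be handy as a test double for transformers and destinations. A DataTable keeps every row in memory, so unlike DbQuerySource this is only suitable for small test data; the "Memory:" prefix for SourceName is likewise an assumption, not an existing convention.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical test double: serves rows from a DataTable.
// The whole table lives in memory, so use it for unit tests, not real imports.
public sealed class InMemorySource : IImportSource
{
    private readonly DataTable _table;
    private DataTableReader? _reader;

    public string SourceName { get; }

    public InMemorySource(DataTable table, string? name = null)
    {
        _table = table; // constructor only stores configuration
        SourceName = $"Memory:{name ?? table.TableName}";
    }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        _reader = _table.CreateDataReader(); // DataTableReader implements IDataReader
        return Task.FromResult<IDataReader>(_reader);
    }

    public ValueTask DisposeAsync()
    {
        _reader?.Dispose();
        _reader = null;
        return ValueTask.CompletedTask;
    }
}
```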
DbQuerySource Implementation
DbQuerySource executes a SQL query against the local cache database:
```csharp
public class DbQuerySource : IImportSource
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _commandTimeout;   // seconds, the unit SqlCommand.CommandTimeout uses
    private SqlConnection? _connection;     // created in ReadDataAsync, released in DisposeAsync
    private SqlCommand? _command;

    public string SourceName { get; }

    public DbQuerySource(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int commandTimeout = 3600)          // 3600 s = 1 hour
    {
        // Only store configuration; nothing touches the database until ReadDataAsync.
        _connectionFactory = connectionFactory;
        _sql = sql;
        _parameters = parameters;
        _commandTimeout = commandTimeout;
        SourceName = $"DbQuery:{name ?? "Query"}";
    }
```
Reading data
The connection is opened in ReadDataAsync and stays open until the source is disposed, because the returned reader streams rows over it:
```csharp
    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        // The factory supplies the connection; ExecuteReaderAsync requires it to be open.
        _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);

        _command = _connection.CreateCommand();
        _command.CommandText = _sql;
        _command.CommandTimeout = _commandTimeout;
        AddParameters(_command, _parameters);

        // Return the live reader; rows are fetched as the caller advances it with Read().
        return await _command.ExecuteReaderAsync(cancellationToken);
    }
```
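Because the reader streams over the source's open connection, a caller should finish reading and dispose the reader before disposing the source. A minimal sketch of such a caller follows; ImportRunner, RunAsync, and the StubSource used to exercise it are hypothetical names, not part of the pipeline. This runner takes ownership of the source and disposes it, which is a design assumption.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

public static class ImportRunner
{
    // Hypothetical driver: streams every row to handleRow and returns the row count.
    public static async Task<long> RunAsync(
        IImportSource source,
        Action<IDataRecord> handleRow,
        CancellationToken cancellationToken = default)
    {
        await using (source)
        {
            using IDataReader reader = await source.ReadDataAsync(cancellationToken);
            long count = 0;
            while (reader.Read())
            {
                cancellationToken.ThrowIfCancellationRequested();
                handleRow(reader); // the reader is also the IDataRecord for the current row
                count++;
            }
            return count;
        } // the reader is disposed when this block exits, then source.DisposeAsync() runs
    }
}

// Demo-only stub (hypothetical): wraps a DataTable and records whether it was disposed.
public sealed class StubSource : IImportSource
{
    private readonly DataTable _table;
    public bool Disposed { get; private set; }
    public string SourceName => "Stub";

    public StubSource(DataTable table) => _table = table;

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default) =>
        Task.FromResult<IDataReader>(_table.CreateDataReader());

    public ValueTask DisposeAsync()
    {
        Disposed = true;
        return ValueTask.CompletedTask;
    }
}
```

IDataReader only exposes a synchronous Read(), so the loop is synchronous even though opening the reader is asynchronous.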
Parameter handling
Parameters are read from an anonymous object using reflection: each public property becomes an @PropertyName parameter, and null values are sent as DBNull.Value:
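```csharp
    private static void AddParameters(SqlCommand command, object? parameters)
    {
        if (parameters == null) return;

        // Each property maps to a parameter named after it, e.g. Plant -> @Plant.
        var properties = parameters.GetType().GetProperties();
        foreach (var prop in properties)
        {
            // ADO.NET sends SQL NULL via DBNull.Value; a plain null would leave the parameter unset.
            var value = prop.GetValue(parameters) ?? DBNull.Value;
            command.Parameters.AddWithValue($"@{prop.Name}", value);
        }
    }
```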
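The reflection step can be checked in isolation. The sketch below (ParameterMapper and ToParameterMap are hypothetical names) applies the same naming and null rules but builds a name-to-value map instead of SqlParameter objects. Unlike the original it restricts itself to public instance properties and skips indexers; that is a defensive assumption, not documented behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

public static class ParameterMapper
{
    // Hypothetical helper mirroring AddParameters without a SqlCommand:
    // each public instance property becomes "@PropertyName"; null becomes DBNull.Value.
    public static Dictionary<string, object> ToParameterMap(object? parameters)
    {
        var map = new Dictionary<string, object>();
        if (parameters == null) return map;

        foreach (PropertyInfo prop in parameters.GetType()
                     .GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            // Indexers need arguments to read, so they cannot become parameters.
            if (prop.GetIndexParameters().Length > 0) continue;
            map[$"@{prop.Name}"] = prop.GetValue(parameters) ?? DBNull.Value;
        }
        return map;
    }
}
```

Each map entry corresponds to one AddWithValue call. Because AddWithValue infers the SQL type from the CLR value, strings are sent as nvarchar; if the target column is varchar, an explicitly typed SqlParameter may avoid implicit conversions.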
Resource cleanup
DisposeAsync releases the command first, then the connection, and clears both fields so that a repeated call does nothing:
```csharp
    public async ValueTask DisposeAsync()
    {
        if (_command != null)
        {
            await _command.DisposeAsync();
            _command = null;
        }

        if (_connection != null)
        {
            await _connection.DisposeAsync();
            _connection = null;
        }
    }
}
```
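In the DisposeAsync above, an exception while disposing the command would skip disposal of the connection. One way to guarantee both are released is a try/finally, sketched below with generic IAsyncDisposable fields. ResourcePair and TrackingResource are hypothetical names: "inner" stands in for the command and "outer" for the connection.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for DbQuerySource's cleanup.
public sealed class ResourcePair : IAsyncDisposable
{
    private IAsyncDisposable? _inner; // e.g. the command
    private IAsyncDisposable? _outer; // e.g. the connection the command depends on

    public ResourcePair(IAsyncDisposable inner, IAsyncDisposable outer)
    {
        _inner = inner;
        _outer = outer;
    }

    public async ValueTask DisposeAsync()
    {
        // Clear the fields first so a second DisposeAsync call is a no-op.
        IAsyncDisposable? inner = _inner, outer = _outer;
        _inner = null;
        _outer = null;
        try
        {
            if (inner != null) await inner.DisposeAsync();
        }
        finally
        {
            // Runs even if disposing the inner resource throws.
            if (outer != null) await outer.DisposeAsync();
        }
    }
}

// Hypothetical test helper that counts disposals and can be told to throw.
public sealed class TrackingResource : IAsyncDisposable
{
    private readonly bool _throwOnDispose;
    public int DisposeCount { get; private set; }

    public TrackingResource(bool throwOnDispose = false) => _throwOnDispose = throwOnDispose;

    public ValueTask DisposeAsync()
    {
        DisposeCount++;
        if (_throwOnDispose) throw new InvalidOperationException("dispose failed");
        return ValueTask.CompletedTask;
    }
}
```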
Key Patterns
Keep sources stateless until ReadDataAsync
Don't open connections or execute queries in the constructor. The source should be configurable without side effects until ReadDataAsync is called.
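This property is easy to check in a test. In the sketch below, the hypothetical DeferredSource receives a factory delegate (standing in for IDbConnectionFactory plus the query) and only stores it; the delegate is first invoked inside ReadDataAsync.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical source: the constructor only captures configuration.
public sealed class DeferredSource : IImportSource
{
    private readonly Func<CancellationToken, Task<IDataReader>> _openReader;
    private IDataReader? _reader;

    public string SourceName { get; }

    public DeferredSource(string name, Func<CancellationToken, Task<IDataReader>> openReader)
    {
        SourceName = name;
        _openReader = openReader; // stored, not invoked: no I/O happens here
    }

    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        // All side effects (connecting, querying) start here.
        _reader = await _openReader(cancellationToken);
        return _reader;
    }

    public ValueTask DisposeAsync()
    {
        _reader?.Dispose();
        _reader = null;
        return ValueTask.CompletedTask;
    }
}
```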
Streaming, not buffering
Return a live IDataReader rather than loading all data into memory. This allows processing millions of rows without memory pressure.
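The consumer side matters too: a live reader only helps if rows are handled as the reader advances instead of being collected into a list. The hypothetical helper below reuses a single object[] buffer via IDataRecord.GetValues, so the memory it holds stays the same regardless of row count; a caller that wants to keep values must copy them.

```csharp
using System;
using System.Data;

public static class RowStreamer
{
    // Hypothetical helper: visits rows one at a time, reusing one buffer.
    // Only the current row is held; copy values out if they must outlive the callback.
    public static long ForEachRow(IDataReader reader, Action<object[]> onRow)
    {
        var buffer = new object[reader.FieldCount];
        long rows = 0;
        while (reader.Read())
        {
            reader.GetValues(buffer); // overwrites the previous row's values
            onRow(buffer);
            rows++;
        }
        return rows;
    }
}
```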
Use SourceName for diagnostics
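Format: "DbQuery:{table}" or "File:{filename}". DbQuerySource prefixes its name argument and falls back to "DbQuery:Query" when no name is given. The value appears in logs and as StepResult.StepName.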
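If several sources need these names, small helpers keep the format consistent. SourceNames, ForQuery, and ForFile are hypothetical; ForQuery reproduces DbQuerySource's fallback, and ForFile keeps only the file name, assuming directory paths are not wanted in logs.

```csharp
using System.IO;

public static class SourceNames
{
    // Hypothetical helpers for the "DbQuery:{table}" and "File:{filename}" formats.
    public static string ForQuery(string? name) => $"DbQuery:{name ?? "Query"}";

    // Only the file name is used so names don't depend on machine-specific directories.
    public static string ForFile(string path) => $"File:{Path.GetFileName(path)}";
}
```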
Future source types
The interface supports additional source types not yet implemented:
- File-based sources - CSV, Excel files
- API sources - REST endpoints returning paged data
- Oracle/Sybase sources - Direct queries against JDE or CMS
Each would implement the same interface with different connection and reader implementations.
Related Documentation
- Overview - Pipeline architecture
- Transformers - Processing source data
- Configuration - Connection factory setup