# Data Sources

Sources provide data to the ETL pipeline by implementing `IImportSource`. They return an `IDataReader` that streams rows to transformers and destinations.

## Interface Contract

```csharp
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}
```

**Key requirements:**
- Implement `IAsyncDisposable` for connection cleanup
- Return a live `IDataReader` (not buffered) for memory efficiency
- `SourceName` is used in logging and `StepResult` tracking

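To make the contract concrete, here is a minimal sketch of a source and a consumer. `InMemorySource` and `SourceConsumer` are hypothetical names that do not exist in the pipeline, and the in-memory `DataTable` is only a stand-in for a real connection (a production source would stream from its backing store). The interface is repeated so the block compiles on its own.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Copied from the contract above so this sketch is self-contained.
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical test double: serves rows from a DataTable the caller owns.
public sealed class InMemorySource : IImportSource
{
    private readonly DataTable _table;

    public string SourceName { get; }

    public InMemorySource(DataTable table, string name)
    {
        _table = table;
        SourceName = $"Memory:{name}";
    }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        // DataTableReader implements IDataReader.
        return Task.FromResult<IDataReader>(_table.CreateDataReader());
    }

    // Nothing to release here; a real source would close its connection.
    public ValueTask DisposeAsync() => default;
}

// Hypothetical consumer: reads rows one at a time and disposes the reader.
public static class SourceConsumer
{
    public static async Task<int> CountRowsAsync(
        IImportSource source, CancellationToken cancellationToken = default)
    {
        using IDataReader reader = await source.ReadDataAsync(cancellationToken);
        var rows = 0;
        while (reader.Read())
        {
            rows++;
        }
        return rows;
    }
}
```

A caller would typically wrap the source in `await using` so that `DisposeAsync` runs even if reading throws.
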
## DbQuerySource Implementation

`DbQuerySource` executes a SQL query against the local cache database:

```csharp
public class DbQuerySource : IImportSource
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _commandTimeout;
    private SqlConnection? _connection;
    private SqlCommand? _command;

    public string SourceName { get; }

    public DbQuerySource(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int commandTimeout = 3600)
    {
        _connectionFactory = connectionFactory;
        _sql = sql;
        _parameters = parameters;
        _commandTimeout = commandTimeout;
        SourceName = $"DbQuery:{name ?? "Query"}";
    }
```

### Reading data

The connection opens in `ReadDataAsync` and stays open until disposal, because the returned reader streams rows over it:

```csharp
    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        // Connection and command are kept in fields so DisposeAsync can release them later.
        _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        _command = _connection.CreateCommand();
        _command.CommandText = _sql;
        _command.CommandTimeout = _commandTimeout; // seconds
        AddParameters(_command, _parameters);
        return await _command.ExecuteReaderAsync(cancellationToken);
    }
```

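### Parameter handling

Parameters are read from an anonymous object using reflection. Each public property becomes a SQL parameter named `@` plus the property name, and `null` values are sent as `DBNull.Value`:
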
```csharp
    private static void AddParameters(SqlCommand command, object? parameters)
    {
        if (parameters == null) return;

        var properties = parameters.GetType().GetProperties();
        foreach (var prop in properties)
        {
            // ADO.NET expects DBNull.Value rather than null for SQL NULL.
            var value = prop.GetValue(parameters) ?? DBNull.Value;
            command.Parameters.AddWithValue($"@{prop.Name}", value);
        }
    }
```

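The mapping can be tried without a database. The sketch below mirrors the loop in `AddParameters` but collects the results in a dictionary instead of a `SqlCommand`; `ParameterMapper` and `ToParameterMap` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: same reflection loop as AddParameters,
// but returns name/value pairs so the mapping can be inspected.
public static class ParameterMapper
{
    public static Dictionary<string, object> ToParameterMap(object? parameters)
    {
        var map = new Dictionary<string, object>();
        if (parameters == null) return map;

        foreach (var prop in parameters.GetType().GetProperties())
        {
            // Null property values are replaced with DBNull.Value.
            map[$"@{prop.Name}"] = prop.GetValue(parameters) ?? DBNull.Value;
        }
        return map;
    }
}
```

For example, `ParameterMapper.ToParameterMap(new { Status = "Active", ClosedOn = (DateTime?)null })` maps `@Status` to `"Active"` and `@ClosedOn` to `DBNull.Value`.
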
### Resource cleanup

Both the command and the connection are disposed asynchronously, and each field is reset to `null` so that a second call to `DisposeAsync` does nothing:

```csharp
    public async ValueTask DisposeAsync()
    {
        if (_command != null)
        {
            await _command.DisposeAsync();
            _command = null;
        }
        if (_connection != null)
        {
            await _connection.DisposeAsync();
            _connection = null;
        }
    }
}
```

## Key Patterns

### Keep sources stateless until ReadDataAsync

Don't open connections or execute queries in the constructor. The source should be configurable without side effects until `ReadDataAsync` is called.

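One way to picture this pattern is a source that stores a factory delegate and only invokes it when reading starts. The sketch below is an illustration under assumptions: `DeferredSource` is a hypothetical class, and it mirrors the members of `IImportSource` but declares only `IAsyncDisposable` so that the block compiles on its own.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical source: construction only records configuration.
public sealed class DeferredSource : IAsyncDisposable
{
    private readonly Func<IDataReader> _openReader;
    private IDataReader? _reader;

    public string SourceName { get; }

    public DeferredSource(Func<IDataReader> openReader, string name)
    {
        _openReader = openReader; // stored, not invoked
        SourceName = $"Deferred:{name}";
    }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        // The first side effect happens here, not in the constructor.
        var reader = _openReader();
        _reader = reader;
        return Task.FromResult(reader);
    }

    public ValueTask DisposeAsync()
    {
        _reader?.Dispose();
        _reader = null;
        return default;
    }
}
```

Because the constructor has no side effects, sources like this can be created while a pipeline is being assembled, even if some of them are never run.
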
### Streaming, not buffering

Return a live `IDataReader` rather than loading all data into memory. The pipeline can then process millions of rows without holding the full result set in memory.

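The difference shows up on the consuming side. In the sketch below, `SumColumn` keeps only a running total while it reads, whereas `Buffer` materializes every row through `DataTable.Load`. `StreamingExample` and both method names are hypothetical, and the summed column is assumed to hold `int` values.

```csharp
using System.Data;

public static class StreamingExample
{
    // Streaming: one row is current at a time; only the total is retained.
    public static long SumColumn(IDataReader reader, string columnName)
    {
        int ordinal = reader.GetOrdinal(columnName);
        long total = 0;
        while (reader.Read())
        {
            if (!reader.IsDBNull(ordinal))
            {
                total += reader.GetInt32(ordinal);
            }
        }
        return total;
    }

    // Buffering, for contrast: every row is copied into memory first.
    public static DataTable Buffer(IDataReader reader)
    {
        var table = new DataTable();
        table.Load(reader);
        return table;
    }
}
```

With the buffered version, memory use grows with the row count, which is the cost the pattern is meant to avoid.
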
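### Use SourceName for diagnostics

Use a `"{Kind}:{name}"` format such as `"DbQuery:{table}"` or `"File:{filename}"`. `DbQuerySource` builds its name as `"DbQuery:{name}"` and falls back to `"DbQuery:Query"` when no name is passed. The value appears in logs and `StepResult.StepName`.
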
## Future source types

The interface supports additional source types not yet implemented:

- **File-based sources** - CSV, Excel files
- **API sources** - REST endpoints returning paged data
- **Oracle/Sybase sources** - Direct queries against JDE or CMS

Each would implement the same interface with different connection and reader implementations.

## Related Documentation

- [Overview](./Overview.md) - Pipeline architecture
- [Transformers](./Transformers.md) - Processing source data
- [Configuration](./Configuration.md) - Connection factory setup