docs: add ETL sources documentation
@@ -0,0 +1,134 @@
# Data Sources

Sources provide data to the ETL pipeline by implementing `IImportSource`. They return an `IDataReader` that streams rows to transformers and destinations.

## Interface Contract

```csharp
public interface IImportSource : IAsyncDisposable
{
    // Opens the underlying resource and returns a reader that streams rows.
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);

    // Identifies the source in logs and step results.
    string SourceName { get; }
}
```

**Key requirements:**
- Implement `IAsyncDisposable` so that connections and other resources are released when the source is disposed
- Return a live (unbuffered) `IDataReader` so rows stream through without all being held in memory at once
- Provide a `SourceName`, which is used in logging and `StepResult` tracking
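
The requirements above can be sketched from the caller's side. This is a minimal illustration, not the pipeline's actual consumer: `InMemorySource` and `SourceDemo` are hypothetical names invented here, and a `DataTable` stands in for a real database so the example runs on its own.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Interface as documented above.
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical stand-in source backed by a DataTable (assumption for this sketch only).
public sealed class InMemorySource : IImportSource
{
    private readonly DataTable _table;

    public string SourceName { get; }
    public bool Disposed { get; private set; }

    public InMemorySource(string name, DataTable table)
    {
        // Only configuration is stored; nothing is read until ReadDataAsync.
        _table = table;
        SourceName = $"Memory:{name}";
    }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
        => Task.FromResult<IDataReader>(_table.CreateDataReader());

    public ValueTask DisposeAsync()
    {
        Disposed = true;
        return ValueTask.CompletedTask;
    }
}

public static class SourceDemo
{
    // Reads every row, then disposes the reader first and the source second.
    public static async Task<int> CountRowsAsync(IImportSource source, CancellationToken ct = default)
    {
        await using (source)
        {
            using IDataReader reader = await source.ReadDataAsync(ct);
            int rows = 0;
            while (reader.Read())
            {
                rows++;
            }
            return rows;
        }
    }

    // Builds a two-row table and counts it through the interface.
    public static Task<int> RunAsync()
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Rows.Add(1);
        table.Rows.Add(2);
        return CountRowsAsync(new InMemorySource("Demo", table));
    }
}
```

The reader is declared inside the `await using` block so that it is disposed before the source that owns its underlying resource.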

## DbQuerySource Implementation

`DbQuerySource` executes a SQL query against the local cache database. The class is shown in parts across the following sections:

```csharp
public class DbQuerySource : IImportSource
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _commandTimeout;
    private SqlConnection? _connection;
    private SqlCommand? _command;

    public string SourceName { get; }

    public DbQuerySource(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int commandTimeout = 3600) // seconds
    {
        // Only configuration is stored here; no connection is opened yet.
        _connectionFactory = connectionFactory;
        _sql = sql;
        _parameters = parameters;
        _commandTimeout = commandTimeout;
        SourceName = $"DbQuery:{name ?? "Query"}";
    }
```

### Reading data

The connection opens in `ReadDataAsync` and stays open until disposal, so the returned reader is only usable while the source has not been disposed:

```csharp
    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        // Opened lazily here; released in DisposeAsync.
        _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        _command = _connection.CreateCommand();
        _command.CommandText = _sql;
        _command.CommandTimeout = _commandTimeout;
        AddParameters(_command, _parameters);
        // The reader is live: rows are fetched as the caller reads them.
        return await _command.ExecuteReaderAsync(cancellationToken);
    }
```

### Parameter handling

Parameters are supplied as an anonymous object. Each public property is read via reflection and added as a SQL parameter named `@PropertyName`; `null` values are sent as `DBNull.Value`:

```csharp
    private static void AddParameters(SqlCommand command, object? parameters)
    {
        if (parameters == null) return;

        var properties = parameters.GetType().GetProperties();
        foreach (var prop in properties)
        {
            // ADO.NET expects DBNull.Value, not null, for SQL NULL.
            var value = prop.GetValue(parameters) ?? DBNull.Value;
            command.Parameters.AddWithValue($"@{prop.Name}", value);
        }
    }
```
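
To make the mapping concrete, the sketch below applies the same reflection loop but collects the results into a dictionary instead of a `SqlCommand`, so it runs without a database. `ParameterSketch`, `ToParameterMap`, and the `Region`/`ClosedOn` property names are hypothetical and exist only for this illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public static class ParameterSketch
{
    // Same property walk as AddParameters, with a dictionary as the target.
    public static Dictionary<string, object> ToParameterMap(object? parameters)
    {
        var map = new Dictionary<string, object>();
        if (parameters == null) return map;

        foreach (var prop in parameters.GetType().GetProperties())
        {
            // Null property values become DBNull.Value, as in AddParameters.
            map[$"@{prop.Name}"] = prop.GetValue(parameters) ?? DBNull.Value;
        }
        return map;
    }

    // Example call with an anonymous object, one value set and one null.
    public static Dictionary<string, object> Demo()
        => ToParameterMap(new { Region = "West", ClosedOn = (DateTime?)null });
}
```

In `Demo`, the resulting map holds `@Region` with the value `"West"` and `@ClosedOn` with `DBNull.Value`. The SQL text passed to the source would then reference those names, for example `WHERE Region = @Region`.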

### Resource cleanup

Both the command and the connection are disposed asynchronously, and the fields are cleared so that a repeated `DisposeAsync` call is a no-op:

```csharp
    public async ValueTask DisposeAsync()
    {
        // Dispose the command before the connection it belongs to.
        if (_command != null)
        {
            await _command.DisposeAsync();
            _command = null;
        }
        if (_connection != null)
        {
            await _connection.DisposeAsync();
            _connection = null;
        }
    }
}
```

## Key Patterns

### Keep sources stateless until ReadDataAsync

Don't open connections or execute queries in the constructor. Constructing a source should only store its configuration, with no side effects until `ReadDataAsync` is called.

### Streaming, not buffering

Return a live `IDataReader` rather than loading all data into memory. Rows are pulled one at a time as downstream steps consume them, so memory use stays largely independent of the result-set size and millions of rows can be processed.
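
A consumer that respects this pattern handles one row per iteration and keeps only its own running state. The sketch below is illustrative: `StreamingSketch` and the `Amount` column are hypothetical, and the `DataTable` used in `Demo` is itself an in-memory buffer that merely stands in for a database reader.

```csharp
using System;
using System.Data;

public static class StreamingSketch
{
    // Streams through the reader; only the current row and the total are held.
    public static long SumColumn(IDataReader reader, string column)
    {
        int ordinal = reader.GetOrdinal(column);
        long total = 0;
        while (reader.Read())
        {
            if (!reader.IsDBNull(ordinal))
            {
                total += Convert.ToInt64(reader.GetValue(ordinal));
            }
        }
        return total;
    }

    // Runs SumColumn over three rows, one of which is NULL.
    public static long Demo()
    {
        var table = new DataTable();
        table.Columns.Add("Amount", typeof(int));
        table.Rows.Add(10);
        table.Rows.Add(20);
        table.Rows.Add(DBNull.Value);

        using IDataReader reader = table.CreateDataReader();
        return SumColumn(reader, "Amount");
    }
}
```

By contrast, calling `DataTable.Load(reader)` would materialize every row before any processing starts, which is the buffering this pattern avoids.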

### Use SourceName for diagnostics

Format: `"DbQuery:{name}"` (as `DbQuerySource` does) or `"File:{filename}"`. This appears in logs and `StepResult.StepName`.

## Future source types

The interface supports additional source types not yet implemented:

- **File-based sources** - CSV, Excel files
- **API sources** - REST endpoints returning paged data
- **Oracle/Sybase sources** - Direct queries against JDE or CMS

Each would implement the same interface with different connection and reader implementations.

## Related Documentation

- [Overview](./Overview.md) - Pipeline architecture
- [Transformers](./Transformers.md) - Processing source data
- [Configuration](./Configuration.md) - Connection factory setup