# Data Sources
Sources provide data to the ETL pipeline by implementing `IImportSource`. They return an `IDataReader` that streams rows to transformers and destinations.
## Interface Contract
```csharp
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}
```
**Key requirements:**
- Implement `IAsyncDisposable` for connection cleanup
- Return a live `IDataReader` (not buffered) for memory efficiency
- `SourceName` is used in logging and `StepResult` tracking
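To make the contract concrete, here is a minimal sketch of a second implementation. `InMemorySource` and its `Memory:` name prefix are hypothetical and not part of the codebase; the class serves rows from a `DataTable`, which is buffered by nature, so it is only suitable as a test double and not as a production source.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Copied from the contract above so the sketch compiles on its own.
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical test double: serves rows from a DataTable instead of a database.
public sealed class InMemorySource : IImportSource
{
    private readonly DataTable _table;
    private IDataReader? _reader;

    public string SourceName { get; }

    public InMemorySource(DataTable table, string? name = null)
    {
        // Only store configuration here; the reader is created in ReadDataAsync.
        _table = table;
        SourceName = $"Memory:{name ?? "Table"}";
    }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        IDataReader reader = _table.CreateDataReader();
        _reader = reader;
        return Task.FromResult(reader);
    }

    public ValueTask DisposeAsync()
    {
        _reader?.Dispose();
        _reader = null;
        return default; // a default ValueTask is already completed
    }
}
```

A unit test for a transformer could pass `new InMemorySource(table, "Lots")` wherever an `IImportSource` is expected, without needing a database connection.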
## DbQuerySource Implementation
`DbQuerySource` executes a SQL query against the local cache database:
```csharp
public class DbQuerySource : IImportSource
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _commandTimeout;
    private SqlConnection? _connection;
    private SqlCommand? _command;

    public string SourceName { get; }

    public DbQuerySource(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int commandTimeout = 3600)
    {
        _connectionFactory = connectionFactory;
        _sql = sql;
        _parameters = parameters;
        _commandTimeout = commandTimeout;
        SourceName = $"DbQuery:{name ?? "Query"}";
    }
```
### Reading data
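The connection opens in `ReadDataAsync` and stays open until disposal, because the returned reader streams rows over it. Call `ReadDataAsync` once per source instance: a second call would overwrite `_connection` and `_command` without disposing the first pair.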
```csharp
    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        _command = _connection.CreateCommand();
        _command.CommandText = _sql;
        _command.CommandTimeout = _commandTimeout;
        AddParameters(_command, _parameters);
        return await _command.ExecuteReaderAsync(cancellationToken);
    }
```
### Parameter handling
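Parameters are read from an object (typically an anonymous object) using reflection. Each public property becomes a SQL parameter named `@` plus the property name, and `null` values are sent as `DBNull.Value`: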
```csharp
    private static void AddParameters(SqlCommand command, object? parameters)
    {
        if (parameters == null) return;

        var properties = parameters.GetType().GetProperties();
        foreach (var prop in properties)
        {
            var value = prop.GetValue(parameters) ?? DBNull.Value;
            command.Parameters.AddWithValue($"@{prop.Name}", value);
        }
    }
```
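The mapping from object properties to parameter names can be tried without a database. The sketch below repeats the same reflection loop but returns name/value pairs instead of writing to a `SqlCommand`. `ParameterMapping.Describe` and the property names `BranchCode` and `LotNumber` are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class ParameterMapping
{
    // Hypothetical helper: mirrors the reflection loop in AddParameters,
    // but collects the pairs instead of adding them to a SqlCommand.
    public static IReadOnlyList<KeyValuePair<string, object>> Describe(object? parameters)
    {
        var result = new List<KeyValuePair<string, object>>();
        if (parameters == null) return result;

        foreach (var prop in parameters.GetType().GetProperties())
        {
            // A null property value is replaced by DBNull.Value, i.e. SQL NULL.
            var value = prop.GetValue(parameters) ?? DBNull.Value;
            result.Add(new KeyValuePair<string, object>($"@{prop.Name}", value));
        }

        return result;
    }
}
```

Calling `ParameterMapping.Describe(new { BranchCode = "30", LotNumber = (string?)null })` yields one pair for `@BranchCode` with the value `"30"` and one for `@LotNumber` with `DBNull.Value`, so the SQL text should reference the parameters by exactly those names. Note that `AddWithValue` infers the SQL type from the .NET value (a C# `string` is sent as `nvarchar`), which is worth keeping in mind when a query compares against `varchar` columns.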
### Resource cleanup
The command is disposed first, then the connection, both asynchronously. Each field is set to `null` afterwards, so calling `DisposeAsync` again is a no-op:
```csharp
    public async ValueTask DisposeAsync()
    {
        if (_command != null)
        {
            await _command.DisposeAsync();
            _command = null;
        }

        if (_connection != null)
        {
            await _connection.DisposeAsync();
            _connection = null;
        }
    }
}
```
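On the calling side, cleanup is usually expressed with `await using`. The following is a minimal sketch of that pattern, not the pipeline's actual consumer: `TrackingSource` is a hypothetical stand-in for `DbQuerySource` that records whether it was disposed, and `SourceConsumer.CountRowsAsync` is a hypothetical helper.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Copied from the interface contract so the sketch compiles on its own.
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical stand-in for DbQuerySource that records whether it was disposed.
public sealed class TrackingSource : IImportSource
{
    private readonly DataTable _table;

    public bool Disposed { get; private set; }
    public string SourceName => "Tracking:Demo";

    public TrackingSource(DataTable table) => _table = table;

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
        => Task.FromResult<IDataReader>(_table.CreateDataReader());

    public ValueTask DisposeAsync()
    {
        Disposed = true;
        return default; // a default ValueTask is already completed
    }
}

public static class SourceConsumer
{
    // Hypothetical consumer: counts rows, then releases the reader and the source.
    public static async Task<int> CountRowsAsync(
        IImportSource source,
        CancellationToken cancellationToken = default)
    {
        await using (source)
        {
            // The reader is disposed at the end of this block, before the source.
            using var reader = await source.ReadDataAsync(cancellationToken);
            var rows = 0;
            while (reader.Read()) rows++;
            return rows;
        }
    }
}
```

Disposing the reader before the source matters for a database-backed source, because the reader depends on the connection that the source owns. The nested `using` inside `await using` gives that order even when an exception is thrown mid-read.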
## Key Patterns
### Keep sources stateless until ReadDataAsync
Don't open connections or execute queries in the constructor. The source should be configurable without side effects until `ReadDataAsync` is called.
### Streaming, not buffering
Return a live `IDataReader` rather than loading all data into memory. This lets the pipeline process millions of rows without ever holding the full result set in memory.
### Use SourceName for diagnostics
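Format: `"DbQuery:{name}"` or `"File:{filename}"`. `DbQuerySource` builds the value from its `name` argument and falls back to `"DbQuery:Query"` when none is given. The value appears in logs and `StepResult.StepName`.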
## Future source types
The interface supports additional source types not yet implemented:
- **File-based sources** - CSV, Excel files
- **API sources** - REST endpoints returning paged data
- **Oracle/Sybase sources** - Direct queries against JDE or CMS
Each would implement the same interface with different connection and reader implementations.
## Related Documentation
- [Overview](./Overview.md) - Pipeline architecture
- [Transformers](./Transformers.md) - Processing source data
- [Configuration](./Configuration.md) - Connection factory setup