diff --git a/DOCUMENTATION/DataSync/Sources.md b/DOCUMENTATION/DataSync/Sources.md
new file mode 100644
index 0000000..d8c5605
--- /dev/null
+++ b/DOCUMENTATION/DataSync/Sources.md
@@ -0,0 +1,134 @@
+# Data Sources
+
+Sources provide data to the ETL pipeline by implementing `IImportSource`. They return an `IDataReader` that streams rows to transformers and destinations.
+
+## Interface Contract
+
+```csharp
+public interface IImportSource : IAsyncDisposable
+{
+    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
+    string SourceName { get; }
+}
+```
+
+**Key requirements:**
+- Implement `IAsyncDisposable` for connection cleanup
+- Return a live `IDataReader` (not buffered) for memory efficiency
+- `SourceName` is used in logging and `StepResult` tracking
+
+## DbQuerySource Implementation
+
+`DbQuerySource` executes a SQL query against the local cache database:
+
+```csharp
+public class DbQuerySource : IImportSource
+{
+    private readonly IDbConnectionFactory _connectionFactory;
+    private readonly string _sql;
+    private readonly object? _parameters;
+    private readonly int _commandTimeout;
+    private SqlConnection? _connection;
+    private SqlCommand? _command;
+
+    public string SourceName { get; }
+
+    public DbQuerySource(
+        IDbConnectionFactory connectionFactory,
+        string sql,
+        string? name = null,
+        object? parameters = null,
+        int commandTimeout = 3600)
+    {
+        _connectionFactory = connectionFactory;
+        _sql = sql;
+        _parameters = parameters;
+        _commandTimeout = commandTimeout;
+        SourceName = $"DbQuery:{name ?? "Query"}";
+    }
+```
+
+### Reading data
+
+The connection opens in `ReadDataAsync` and stays open until disposal:
+
+```csharp
+    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
+    {
+        _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
+        _command = _connection.CreateCommand();
+        _command.CommandText = _sql;
+        _command.CommandTimeout = _commandTimeout;
+        AddParameters(_command, _parameters);
+        return await _command.ExecuteReaderAsync(cancellationToken);
+    }
+```
+
+### Parameter handling
+
+Parameters are read from the public properties of an anonymous object using reflection; each property becomes a `@Name` parameter, and `null` values are sent as `DBNull.Value`:
+
+```csharp
+    private static void AddParameters(SqlCommand command, object? parameters)
+    {
+        if (parameters == null) return;
+
+        var properties = parameters.GetType().GetProperties();
+        foreach (var prop in properties)
+        {
+            var value = prop.GetValue(parameters) ?? DBNull.Value;
+            command.Parameters.AddWithValue($"@{prop.Name}", value);
+        }
+    }
+```
+
+### Resource cleanup
+
+Both the command and connection are disposed asynchronously:
+
+```csharp
+    public async ValueTask DisposeAsync()
+    {
+        if (_command != null)
+        {
+            await _command.DisposeAsync();
+            _command = null;
+        }
+        if (_connection != null)
+        {
+            await _connection.DisposeAsync();
+            _connection = null;
+        }
+    }
+}
+```
+
+## Key Patterns
+
+### Keep sources stateless until ReadDataAsync
+
+Don't open connections or execute queries in the constructor. Constructing and configuring a source should have no side effects until `ReadDataAsync` is called.
+
+### Streaming, not buffering
+
+Return a live `IDataReader` rather than loading all data into memory. This allows processing millions of rows without holding the full result set in memory.
+
+### Use SourceName for diagnostics
+
+Format: `"DbQuery:{table}"` or `"File:{filename}"`. This appears in logs and `StepResult.StepName`.
+
+## Future source types
+
+The interface supports additional source types not yet implemented:
+
+- **File-based sources** - CSV, Excel files
+- **API sources** - REST endpoints returning paged data
+- **Oracle/Sybase sources** - Direct queries against JDE or CMS
+
+Each would implement the same interface with different connection and reader implementations.
+
+## Related Documentation
+
+- [Overview](./Overview.md) - Pipeline architecture
+- [Transformers](./Transformers.md) - Processing source data
+- [Configuration](./Configuration.md) - Connection factory setup