# Old ETL Removal Design

## Goal

Remove the legacy ETL implementation (Fetchers, MergeConfigurations, BulkMerge services, SourceGenerator) and wire the existing orchestration layer to use the new `EtlPipeline` system.

## Background

The codebase has two parallel ETL implementations:

- **OLD:** `IDataFetcher` → `BulkMergeHelper` → `IMergeConfiguration` with source-generated `IDataReader` implementations
- **NEW:** `EtlPipeline` with `IImportSource` → `IDataTransformer` → `IImportDestination`

The new pipeline is working well, so the old implementation can be removed.

## Design Decisions

1. **Keep orchestration layer** - `DataSyncService`, `SyncOrchestrator`, `ScheduleChecker` remain; only internals change
2. **Keep tracking** - `DataUpdateRepository` preserved; new pipeline writes sync timestamps
3. **JSON config-driven pipelines** - Pipeline definitions loaded from JSON files at runtime (not compiled code)
4. **Builder pattern for factory** - `IEtlPipelineFactory` uses fluent builder: `.ForTable().WithMode().Build()`
5. **Generic DbQuerySource** - Single source class with connection type specified in config (not separate Oracle/Sybase classes)
6. **Conditional merge support** - Extend `DbBulkMergeDestination` with `UpdateWhen` condition
7. **Relative time offsets** - MinDt parameter uses TimeSpan format (e.g., `"-7.00:00:00"`) computed at runtime
8. **Config table names** - Factory uses table names exactly as defined in config (e.g., `WorkOrder_Curr`)
9. **MisData post-processing** - Convert `MisDataPostProcessor` to SQL post-script in pipeline
10. **Sync mode mapping** - Daily and Hourly both map to `incremental` mode; ScheduleChecker can override offset at runtime
11. **Parameter mapping** - Config defines parameter mappings for provider-specific syntax (`:dateUpdated` vs `@MinDt`) and format conversions (JDE Julian)
12. **Destination override** - Base destination config; sync modes can override destination type/settings
13. **Exclude list for updates** - Default: update all non-match columns; config can specify `excludeFromUpdate` for exceptions
14. **Implement new first** - Build new factory/sources/config before deleting old code to keep build working
15. **Config as content file** - `pipelines.json` copied to output directory, loaded from disk at runtime
16. **PrePurge/ReIndex as scripts** - PrePurge becomes pre-script (TRUNCATE), ReIndex becomes post-script
17. **Partial merge for overrides** - Mode-specific destination config merges with base (only specified fields override)
18. **Generic parameters** - Support arbitrary parameters with source types: `offset`, `static`, `runtime`
19. **Configurable timezone** - JDE Julian conversion uses configurable timezone (UTC or local)
20. **Fail fast on missing config** - Factory throws if requested sync mode not defined in config
21. **Runtime parameters deferred** - Only `offset` and `static` parameter sources supported; throw if `runtime` used
22. **Pipeline config owns PrePurge/ReIndex** - Remove schedule flags; pipeline config is single source of truth
23. **JSON camelCase** - Use `JsonSerializerOptions` with `PropertyNameCaseInsensitive = true` so camelCase JSON keys bind to PascalCase record properties

## Files to Delete

### Source Generator Project (entire project)

```
src/JdeScoping.DataSync.SourceGenerators/
├── DataReaderGenerator.cs
├── IsExternalInit.cs
└── JdeScoping.DataSync.SourceGenerators.csproj
```

### DataSync Source Files (35 files)

```
src/JdeScoping.DataSync/
├── BulkCopyTypeRegistry.cs
├── Contracts/
│   ├── IBulkMergeHelper.cs
│   ├── IDataFetcher.cs
│   ├── IDataReaderFactory.cs
│   ├── IMergeConfiguration.cs
│   ├── IMergeConfigurationRegistry.cs
│   ├── IPostProcessor.cs
│   └── ISchemaValidator.cs
├── Configuration/MergeConfigurations/
│   ├── BranchMergeConfiguration.cs
│   ├── ItemMergeConfiguration.cs
│   ├── JdeUserMergeConfiguration.cs
│   ├── LotMergeConfiguration.cs
│   ├── LotUsageMergeConfiguration.cs
│   ├── MisDataMergeConfiguration.cs
│   ├── ProfitCenterMergeConfiguration.cs
│   ├── WorkCenterMergeConfiguration.cs
│   └── WorkOrderMergeConfiguration.cs
├── Exceptions/BulkMergeException.cs
├── Fetchers/
│   ├── Cms/CmsMisDataFetcher.cs
│   └── Jde/
│       ├── JdeBranchFetcher.cs
│       ├── JdeItemFetcher.cs
│       ├── JdeLotFetcher.cs
│       ├── JdeLotUsageFetcher.cs
│       ├── JdeProfitCenterFetcher.cs
│       ├── JdeUserFetcher.cs
│       ├── JdeWorkCenterFetcher.cs
│       └── JdeWorkOrderFetcher.cs
├── Models/
│   ├── ColumnSchema.cs
│   └── MergeResult.cs
└── Services/
    ├── BulkMergeHelper.cs
    ├── ExpressionParser.cs
    ├── MergeConfigurationRegistry.cs
    ├── MergeSqlBuilder.cs
    ├── MisDataPostProcessor.cs
    └── SchemaValidator.cs
```

### Test Files (8 files)

```
tests/JdeScoping.DataSync.Tests/
├── Services/
│   ├── BulkMergeHelperTests.cs
│   ├── ExpressionParserTests.cs
│   ├── MergeConfigurationRegistryTests.cs
│   ├── MergeSqlBuilderTests.cs
│   └── SchemaValidatorTests.cs
└── TableSyncOperationTests.cs

tests/JdeScoping.DataSync.IntegrationTests/
├── BulkMergeHelperTests.cs
└── TableSyncOperationTests.cs
```

### Integration Test Infrastructure (entire folder)

```
tests/JdeScoping.DataSync.IntegrationTests/Infrastructure/
├── TestDataReaderFactory.cs
├── BulkMergeTestEntityDataReader.cs
├── BulkMergeTestEntity.cs
├── TestDatabaseInitializer.cs
├── TestDbConnectionFactory.cs
├── TestDataGenerator.cs
└── SqlServerFixture.cs
```

Consider removing the entire `tests/JdeScoping.DataSync.IntegrationTests/` project if all tests are obsolete.

## Files to Modify

### 1. TableSyncOperation.cs - Major Rewrite

**Current:** Uses `IDataFetcher`, `IBulkMergeHelper`, `IMergeConfiguration`

**New:** Uses `IEtlPipelineFactory` to get and execute pipelines

### 2. DependencyInjection.cs - Remove Old Registrations

**Remove:**

- `using JdeScoping.DataSync.Generated;` statement
- All `IDataFetcher` registrations
- All `IMergeConfiguration` registrations
- `IBulkMergeHelper`, `IDataReaderFactory`, `ISchemaValidator`
- `IMergeConfigurationRegistry`
- `IPostProcessor`, `MisDataPostProcessor`
- Named fetcher registrations
- `DataReaderFactory` registration

**Keep:**

- `DataSyncService`, `ISyncOrchestrator`, `IScheduleChecker`
- `IDataUpdateRepository`
- Health check and metrics

**Add:**

- `IEtlPipelineFactory` registration
- `PipelineOptions` configuration binding

### 3. DataSourceConfig.cs - Remove Unused Properties

Remove these properties:

- `FetcherTypeName`
- `PostProcessorTypeName`
- `PrepurgeData` (now in pipeline config)
- `ReIndexData` (now in pipeline config)

### 4. appsettings.json / appsettings.Development.json

Remove `FetcherTypeName` and `PostProcessorTypeName` from data source configurations.

### 5. JdeScoping.slnx

Remove the `JdeScoping.DataSync.SourceGenerators` project reference.

### 6. JdeScoping.DataSync.csproj

- Remove reference to SourceGenerators project
- Remove `InternalsVisibleTo` for integration tests if project removed

### 7. Tests to Update (not delete)

- `ScheduleCheckerTests.cs` - Update test fixtures to remove FetcherTypeName
- `SyncOrchestratorTests.cs` - Update test fixtures to remove FetcherTypeName

## Files to Create

### 1. Pipeline Configuration Files

Pipeline definitions are stored in JSON and copied to the output directory at build time.

**Location:** `src/JdeScoping.DataSync/Pipelines/pipelines.json`

**Project file entry:**

```xml
<ItemGroup>
  <Content Include="Pipelines\pipelines.json">
    <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
  </Content>
</ItemGroup>
```

**Complete Schema Example** (the `//` comment is accepted because the loader skips comments):

```json
{
  "settings": {
    "timezone": "UTC" // or "Local" - used for JDE Julian conversion
  },
  "pipelines": {
    "WorkOrder_Curr": {
      "source": {
        "connection": "jde",
        "query": "SELECT WADOCO, WADC0J, ... FROM PRODDTA.F4801 WHERE UPMJ >= :dateUpdated",
        "parameters": {
          "minDt": {
            "name": ":dateUpdated",
            "format": "jdeJulian",
            "source": "offset"
          }
        }
      },
      "syncModes": {
        "mass": {
          "minDtOffset": "-365.00:00:00",
          "prePurge": true,
          "reIndex": true
        },
        "incremental": {
          "minDtOffset": "-1.00:00:00"
        }
      },
      "transformers": [
        { "type": "jdeDate", "columns": ["OrderDate", "CompletionDate", "StartDate"] },
        { "type": "columnRename", "mappings": { "WADOCO": "OrderNumber", "WADC0J": "Branch" } }
      ],
      "destination": {
        "table": "WorkOrder_Curr",
        "matchColumns": ["OrderNumber"],
        "excludeFromUpdate": ["CreatedDate"]
      }
    },
    "MisData": {
      "source": {
        "connection": "cms",
        "query": "SELECT ... FROM MIS_DATA WHERE LastUpdate >= @MinDt",
        "parameters": {
          "minDt": {
            "name": "@MinDt",
            "source": "offset"
          }
        }
      },
      "syncModes": {
        "mass": {
          "minDtOffset": "-365.00:00:00",
          "prePurge": true,
          "destination": { "type": "bulkImport" }
        },
        "incremental": {
          "minDtOffset": "-7.00:00:00",
          "destination": { "type": "bulkMerge" }
        }
      },
      "destination": {
        "table": "MisData",
        "matchColumns": ["MisDataId"]
      },
      "postScripts": [
        "UPDATE MisData SET ProcessedFlag = 1 WHERE ProcessedFlag IS NULL"
      ]
    }
  }
}
```

**PrePurge/ReIndex behavior:**

- `prePurge: true` → Factory adds pre-script: `TRUNCATE TABLE [TableName]`
- `reIndex: true` → Factory adds post-script: `ALTER INDEX ALL ON [TableName] REBUILD`

### 2. Pipeline Configuration Models

**PipelinesRoot.cs** - Root config structure

```csharp
namespace JdeScoping.DataSync.Configuration;

public record PipelinesRoot(
    PipelineSettings Settings,
    Dictionary<string, PipelineConfig> Pipelines);

public record PipelineSettings(
    string Timezone = "UTC");  // "UTC" or "Local"
```

**PipelineConfig.cs**

```csharp
namespace JdeScoping.DataSync.Configuration;

public record PipelineConfig(
    SourceConfig Source,
    Dictionary<string, SyncModeConfig> SyncModes,
    List<TransformerConfig>? Transformers,
    DestinationConfig Destination,
    List<string>? PreScripts,
    List<string>? PostScripts);

public record SourceConfig(
    string Connection,  // "jde", "cms", "lotfinder"
    string Query,
    Dictionary<string, ParameterConfig>? Parameters);

public record ParameterConfig(
    string Name,                 // Provider-specific: ":dateUpdated" or "@MinDt"
    string? Format,              // "jdeJulian", "jdeTime", null for DateTime
    string Source = "offset",    // "offset", "static", "runtime"
    string? Value = null);       // For static source (optional parameters must come last)

public record SyncModeConfig(
    string? MinDtOffset,                        // TimeSpan format: "-7.00:00:00"
    bool PrePurge = false,
    bool ReIndex = false,
    string? UpdateWhen = null,                  // Conditional update expression
    DestinationOverride? Destination = null);   // Override base destination (partial merge)

public record DestinationOverride(
    string? Type,                      // "bulkImport" or "bulkMerge"
    List<string>? MatchColumns,        // Override match columns
    List<string>? ExcludeFromUpdate);  // Override exclude list

public record TransformerConfig(
    string Type,
    List<string>? Columns,
    Dictionary<string, string>? Mappings);

public record DestinationConfig(
    string Table,
    List<string>? MatchColumns,        // For merge operations
    List<string>? ExcludeFromUpdate);  // Columns to skip on update
```

**PipelineOptions.cs**

```csharp
namespace JdeScoping.DataSync.Options;

public class PipelineOptions
{
    public const string SectionName = "Pipelines";

    public string ConfigPath { get; set; } = "Pipelines/pipelines.json";
}
```

**Config loading with validation** (a member of `EtlPipelineFactory`):

```csharp
private static readonly JsonSerializerOptions JsonOptions = new()
{
    PropertyNameCaseInsensitive = true,
    ReadCommentHandling = JsonCommentHandling.Skip,
    AllowTrailingCommas = true
};

private static readonly HashSet<string> KnownConnections = new() { "jde", "cms", "lotfinder" };

private static PipelinesRoot LoadPipelineConfigs(string configPath)
{
    // Resolve relative to the app base directory, where content files are copied.
    // (Assembly.Location is empty in single-file publishes; AppContext.BaseDirectory is not.)
    var fullPath = Path.Combine(AppContext.BaseDirectory, configPath);

    if (!File.Exists(fullPath))
        throw new FileNotFoundException($"Pipeline config not found: {fullPath}", fullPath);

    var json = File.ReadAllText(fullPath);
    var root = JsonSerializer.Deserialize<PipelinesRoot>(json, JsonOptions)
        ?? throw new InvalidOperationException("Failed to deserialize pipeline config");

    foreach (var (name, config) in root.Pipelines)
    {
        if (config.Source is null || config.Destination is null || config.SyncModes is null)
            throw new InvalidOperationException($"Pipeline '{name}' is missing source, destination, or syncModes");

        // Required fields (see Validation & Precedence Rules)
        if (!KnownConnections.Contains(config.Source.Connection))
            throw new InvalidOperationException($"Pipeline '{name}' has unknown connection '{config.Source.Connection}'");
        if (string.IsNullOrWhiteSpace(config.Source.Query))
            throw new InvalidOperationException($"Pipeline '{name}' has an empty query");
        if (string.IsNullOrWhiteSpace(config.Destination.Table))
            throw new InvalidOperationException($"Pipeline '{name}' has an empty destination table");

        // Validate all pipelines have both required sync modes
        foreach (var mode in new[] { "mass", "incremental" })
        {
            if (!config.SyncModes.ContainsKey(mode))
                throw new InvalidOperationException($"Pipeline '{name}' missing '{mode}' sync mode");
        }

        // Runtime parameters are not yet supported; static parameters need a value
        foreach (var (paramName, paramConfig) in config.Source.Parameters ?? new())
        {
            if (paramConfig.Source == "runtime")
                throw new NotSupportedException(
                    $"Pipeline '{name}' parameter '{paramName}': runtime source not yet supported");
            if (paramConfig.Source == "static" && paramConfig.Value is null)
                throw new InvalidOperationException(
                    $"Pipeline '{name}' parameter '{paramName}': static source requires a value");
        }
    }

    return root;
}
```

### 3. IEtlPipelineFactory.cs (Builder Pattern)

```csharp
namespace JdeScoping.DataSync.Contracts;

public interface IEtlPipelineFactory
{
    IEtlPipelineBuilder ForTable(string tableName);
}

public interface IEtlPipelineBuilder
{
    IEtlPipelineBuilder WithMode(SyncMode mode);
    IEtlPipelineBuilder WithMinimumDate(DateTime? minDt);  // Override config offset
    EtlPipeline Build();
}

// Note: No WithPrePurge/WithReIndex - pipeline config is source of truth

public enum SyncMode
{
    Mass,        // Full refresh - uses bulkImport by default
    Incremental  // Delta sync - uses bulkMerge by default
}
```

### 4. EtlPipelineFactory.cs

```csharp
namespace JdeScoping.DataSync.Services;

public class EtlPipelineFactory : IEtlPipelineFactory
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly PipelineSettings _settings;
    private readonly Dictionary<string, PipelineConfig> _configs;

    public EtlPipelineFactory(
        IDbConnectionFactory connectionFactory,
        IOptions<PipelineOptions> options)
    {
        _connectionFactory = connectionFactory;

        // LoadPipelineConfigs returns the whole root; keep settings and pipelines separately
        var root = LoadPipelineConfigs(options.Value.ConfigPath);
        _settings = root.Settings ?? new PipelineSettings();
        _configs = root.Pipelines;
    }

    public IEtlPipelineBuilder ForTable(string tableName)
    {
        if (!_configs.TryGetValue(tableName, out var config))
            throw new ArgumentException($"No pipeline configured for table: {tableName}", nameof(tableName));

        return new PipelineBuilder(_connectionFactory, tableName, config, _settings);
    }

    private class PipelineBuilder : IEtlPipelineBuilder
    {
        private readonly IDbConnectionFactory _connectionFactory;
        private readonly string _tableName;
        private readonly PipelineConfig _config;
        private readonly PipelineSettings _settings;  // Timezone for ParameterFormatConverter
        private SyncMode _mode = SyncMode.Incremental;
        private DateTime? _minDtOverride;

        public PipelineBuilder(
            IDbConnectionFactory connectionFactory,
            string tableName,
            PipelineConfig config,
            PipelineSettings settings)
        {
            _connectionFactory = connectionFactory;
            _tableName = tableName;
            _config = config;
            _settings = settings;
        }

        public IEtlPipelineBuilder WithMode(SyncMode mode) { _mode = mode; return this; }

        public IEtlPipelineBuilder WithMinimumDate(DateTime? minDt) { _minDtOverride = minDt; return this; }

        public EtlPipeline Build()
        {
            var modeKey = _mode == SyncMode.Mass ? "mass" : "incremental";
            var modeConfig = _config.SyncModes[modeKey];  // Already validated at load

            // Compute MinDt from offset or override
            var minDt = _minDtOverride ?? ComputeMinDt(modeConfig.MinDtOffset);

            // Create source with parameter substitution
            var source = CreateSource(_config.Source, minDt);

            // Determine destination type (mode override > default by mode)
            var destType = modeConfig.Destination?.Type
                ?? (_mode == SyncMode.Mass ? "bulkImport" : "bulkMerge");
            var destination = CreateDestination(destType, _config.Destination, modeConfig);

            // Build pipeline with scripts
            var builder = new EtlPipelineBuilder()
                .WithName(_tableName)
                .WithSource(source)
                .WithDestination(destination);

            // Add pre-scripts: config scripts first, then prePurge
            foreach (var script in _config.PreScripts ?? [])
                builder.WithPreScript(new SqlScriptRunner(_connectionFactory, script));
            if (modeConfig.PrePurge)
                builder.WithPreScript(new SqlScriptRunner(_connectionFactory,
                    $"TRUNCATE TABLE [{_config.Destination.Table}]"));

            // Add post-scripts: reIndex first, then config scripts
            if (modeConfig.ReIndex)
                builder.WithPostScript(new SqlScriptRunner(_connectionFactory,
                    $"ALTER INDEX ALL ON [{_config.Destination.Table}] REBUILD"));
            foreach (var script in _config.PostScripts ?? [])
                builder.WithPostScript(new SqlScriptRunner(_connectionFactory, script));

            // Add transformers
            foreach (var t in _config.Transformers ?? [])
                builder.WithTransformer(CreateTransformer(t));

            return builder.Build();
        }

        // ComputeMinDt, CreateSource, CreateDestination, and CreateTransformer are not shown.
    }
}
```

### 5. DbQuerySource.cs (Generic)

```csharp
using System.Data;

namespace JdeScoping.DataSync.Etl.Sources;

/// <summary>
/// Generic database query source that works with any connection type.
/// </summary>
public class DbQuerySource : IImportSource
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _connectionType;  // "jde", "cms", "lotfinder"
    private readonly string _query;
    private readonly Dictionary<string, object> _parameters;

    public string SourceName => $"DbQuery:{_connectionType}";

    public DbQuerySource(
        IDbConnectionFactory connectionFactory,
        string connectionType,
        string query,
        Dictionary<string, object>? parameters = null)
    {
        _connectionFactory = connectionFactory;
        _connectionType = connectionType;
        _query = query;
        _parameters = parameters ?? new();
    }

    public async Task<IDataReader> ReadDataAsync(CancellationToken ct)
    {
        // Assumes the factory methods return DbConnection (needed for ExecuteReaderAsync)
        var connection = _connectionType switch
        {
            "jde" => await _connectionFactory.CreateJdeConnectionAsync(),
            "cms" => await _connectionFactory.CreateCmsConnectionAsync(),
            "lotfinder" => await _connectionFactory.CreateLotFinderConnectionAsync(),
            _ => throw new ArgumentException($"Unknown connection type: {_connectionType}")
        };

        try
        {
            var command = connection.CreateCommand();
            command.CommandText = _query;

            foreach (var (name, value) in _parameters)
            {
                var param = command.CreateParameter();
                param.ParameterName = name;
                param.Value = value ?? DBNull.Value;
                command.Parameters.Add(param);
            }

            // CloseConnection: disposing the reader also closes the connection
            return await command.ExecuteReaderAsync(CommandBehavior.CloseConnection, ct);
        }
        catch
        {
            // No reader was handed out, so nothing else will close the connection
            connection.Dispose();
            throw;
        }
    }
}
```

### 6. Parameter Format Converters

```csharp
namespace JdeScoping.DataSync.Services;

public class ParameterFormatConverter
{
    private readonly TimeZoneInfo _timezone;

    public ParameterFormatConverter(string timezone)
    {
        _timezone = timezone.ToUpperInvariant() switch
        {
            "UTC" => TimeZoneInfo.Utc,
            "LOCAL" => TimeZoneInfo.Local,
            _ => TimeZoneInfo.FindSystemTimeZoneById(timezone)
        };
    }

    public object Convert(DateTime minDt, string? format)
    {
        // Convert to configured timezone (a value with Kind=Unspecified is treated as local time)
        var adjusted = TimeZoneInfo.ConvertTime(minDt, _timezone);

        return format switch
        {
            "jdeJulian" => ToJdeJulianDate(adjusted),
            "jdeTime" => ToJdeTime(adjusted),
            null => adjusted,
            _ => throw new ArgumentException($"Unknown format: {format}")
        };
    }

    private static int ToJdeJulianDate(DateTime date)
    {
        // JDE Julian: CYYDDD, where CYY = years since 1900 and DDD = day of year.
        // For 1900-2099 this matches C = century (0 = 19xx, 1 = 20xx), YY = year;
        // e.g., 2024-03-01 (day 61) -> 124061.
        return (date.Year - 1900) * 1000 + date.DayOfYear;
    }

    private static int ToJdeTime(DateTime time)
    {
        // JDE Time: HHMMSS
        return time.Hour * 10000 + time.Minute * 100 + time.Second;
    }
}
```

### 7. DbBulkMergeDestination Extension

Extend to support conditional updates and excluded columns:

```csharp
// Updated constructor (signature only)
public DbBulkMergeDestination(
    IDbConnectionFactory connectionFactory,
    string tableName,
    string[] matchColumns,
    string[]? excludeFromUpdate = null,
    string? updateCondition = null)  // From the sync mode's updateWhen, e.g., "source.LastUpdateDt > target.LastUpdateDt"
```

Modify MERGE SQL generation:

```sql
MERGE INTO [Target] AS target
USING #TempTable AS source
    ON target.OrderNumber = source.OrderNumber
WHEN MATCHED AND source.LastUpdateDt > target.LastUpdateDt THEN  -- updateCondition
    UPDATE SET
        target.Col1 = source.Col1,
        ...                                                      -- excludeFromUpdate columns omitted
WHEN NOT MATCHED THEN
    INSERT (...) VALUES (...);
```

## TableSyncOperation.cs - Error Handling

`EtlPipeline.ExecuteAsync` returns a `PipelineResult` with `Success=false` on failure (it doesn't throw). `TableSyncOperation` must check `Success` and throw so that `DataUpdate` records stay correct:

```csharp
var pipeline = _pipelineFactory
    .ForTable(config.TableName)
    .WithMode(updateTask.IsMassUpdate ? SyncMode.Mass : SyncMode.Incremental)
    .WithMinimumDate(updateTask.MinimumDt)  // ScheduleChecker can override
    .Build();

var result = await pipeline.ExecuteAsync(ct);

if (!result.Success)
    throw new InvalidOperationException($"Pipeline failed for {config.TableName}: {result.ErrorMessage}");
```

## Files to Keep (No Changes)

- `DataSyncService.cs` - Background service host
- `SyncOrchestrator.cs` - Orchestration logic
- `ScheduleChecker.cs` - Schedule checking logic (provides mode and can override offset)
- `DataUpdateRepository.cs` - Sync timestamp tracking
- `DataSyncHealthCheck.cs` - Health monitoring
- `DataSyncMetrics.cs` - Telemetry
- `DataSyncActivitySource.cs` - Tracing

## Test Files to Keep (with updates)

- `SyncOrchestratorTests.cs` - Update fixtures to remove FetcherTypeName
- `ScheduleCheckerTests.cs` - Update fixtures to remove FetcherTypeName
- `DataSyncServiceTests.cs`
- `DataSyncHealthCheckTests.cs`

## New Tests to Create

- `EtlPipelineFactoryTests.cs` - Test config loading and pipeline building
- `DbQuerySourceTests.cs` - Test connection type switching and parameter handling
- `ParameterFormatConverterTests.cs` - Test JDE Julian/time conversions
- `DbBulkMergeDestinationTests.cs` - Test UpdateWhen and excludeFromUpdate

## Validation & Precedence Rules

### Required Fields (fail at config load)

- `source.connection` - must be "jde", "cms", or "lotfinder"
- `source.query` - must be non-empty
- `destination.table` - must be non-empty
- `syncModes.mass` and `syncModes.incremental` - both required

### Precedence Rules

1. **MinDt**: `WithMinimumDate()` override > config `minDtOffset` computation
2. **PrePurge/ReIndex**: Removed from builder; pipeline config is only source
3. **Scripts order**: Config `preScripts` run first, then generated prePurge script; generated reIndex script runs first, then config `postScripts`
4. **Destination merge**: Mode-specific fields override base; missing fields inherit from base

### Parameter Resolution

- `offset`: Computed from `minDtOffset` + current time; format conversion applied
- `static`: Value taken from config `value` field; must be present; no format conversion
- `runtime`: Throws `NotSupportedException` (deferred)

## Risk Assessment

**Low risk:**

- New ETL pipeline already working and tested
- Orchestration layer unchanged (just different internals)
- Clear separation between old and new code

**Medium risk:**

- Generic DbQuerySource with multiple connection types - needs testing
- Conditional merge (UpdateWhen) is new - needs testing
- JSON config loading is new - needs validation
- Parameter format conversion (JDE Julian) - needs testing

## Migration Path

**Phase 1: Build New (keep build working)**

1. Create pipeline config models and options
2. Create `DbQuerySource` (generic)
3. Extend `DbBulkMergeDestination` with UpdateWhen and excludeFromUpdate
4. Create `ParameterFormatConverter`
5. Create `IEtlPipelineFactory` and `EtlPipelineFactory`
6. Create `pipelines.json` config file
7. Register new services in DI (alongside old)

**Phase 2: Wire Up**

8. Update `TableSyncOperation` to use `IEtlPipelineFactory`
9. Update `DependencyInjection.cs` to wire new factory
10. Test end-to-end with new pipeline

**Phase 3: Clean Up**

11. Delete old source files (Fetchers, MergeConfigurations, BulkMerge services)
12. Delete old contracts
13. Delete SourceGenerator project
14. Update solution file
15. Update tests (remove FetcherTypeName references)
16. Delete obsolete test files and infrastructure
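`EtlPipelineFactory.Build()` calls `ComputeMinDt(modeConfig.MinDtOffset)`, but the design does not show its body. The following is a minimal sketch that parses the offset with `TimeSpan.Parse` under the invariant culture and applies it to a caller-supplied `now`, which keeps the result testable. The `MinDtCalculator` class name, and the choice to reject missing or positive offsets, are assumptions rather than part of the design.

```csharp
using System;
using System.Globalization;

/// <summary>
/// Hypothetical helper mirroring the ComputeMinDt call in PipelineBuilder.Build().
/// </summary>
public static class MinDtCalculator
{
    /// <summary>
    /// Applies a TimeSpan offset such as "-7.00:00:00" (days.hours:minutes:seconds) to <paramref name="now"/>.
    /// Assumption: a missing or positive offset is treated as a configuration error.
    /// </summary>
    public static DateTime ComputeMinDt(string? minDtOffset, DateTime now)
    {
        if (string.IsNullOrWhiteSpace(minDtOffset))
            throw new InvalidOperationException(
                "minDtOffset is required when no minimum date override is supplied");

        // Invariant culture keeps parsing independent of the host's regional settings.
        var offset = TimeSpan.Parse(minDtOffset, CultureInfo.InvariantCulture);

        // A positive offset would put MinDt in the future and silently skip all rows.
        if (offset > TimeSpan.Zero)
            throw new InvalidOperationException($"minDtOffset must not be positive: {minDtOffset}");

        // DateTime + TimeSpan preserves Kind, so a UTC 'now' yields a UTC MinDt.
        return now + offset;
    }
}
```

In the builder this would be called as `ComputeMinDt(modeConfig.MinDtOffset, DateTime.UtcNow)`. Passing a `DateTimeKind.Utc` value also gives `ParameterFormatConverter` an unambiguous `Kind` to convert from.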
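The Parameter Resolution rules (`offset`, `static`, `runtime`) could be applied where `CreateSource` builds the name/value dictionary that `DbQuerySource` binds. This sketch is hypothetical. `ParameterResolver` is an illustrative name, the `convert` delegate stands in for `ParameterFormatConverter.Convert`, and `ParameterConfig` is restated with `Value` defaulted to `null` so the record compiles.

```csharp
using System;
using System.Collections.Generic;

// Restated from the configuration models; Value must default because it follows an optional parameter.
public record ParameterConfig(string Name, string? Format, string Source = "offset", string? Value = null);

public static class ParameterResolver
{
    // Hypothetical helper: maps config parameters to provider-specific names and values.
    // 'convert' stands in for ParameterFormatConverter.Convert(minDt, format).
    public static Dictionary<string, object> Resolve(
        IReadOnlyDictionary<string, ParameterConfig>? parameters,
        DateTime minDt,
        Func<DateTime, string?, object> convert)
    {
        var resolved = new Dictionary<string, object>();
        if (parameters is null)
            return resolved;

        foreach (var (key, p) in parameters)
        {
            // Keyed by the provider-specific name (":dateUpdated", "@MinDt"), not the config key.
            resolved[p.Name] = p.Source switch
            {
                // offset: MinDt already computed (or overridden); apply the format conversion
                "offset" => convert(minDt, p.Format),
                // static: literal value from config, no format conversion
                "static" => p.Value ?? throw new InvalidOperationException(
                    $"Parameter '{key}' uses source 'static' but has no value"),
                "runtime" => throw new NotSupportedException(
                    $"Parameter '{key}': runtime source not yet supported"),
                _ => throw new InvalidOperationException(
                    $"Parameter '{key}': unknown source '{p.Source}'")
            };
        }

        return resolved;
    }
}
```

Keying the result by `ParameterConfig.Name` is what lets the same logical `minDt` entry bind as `:dateUpdated` for JDE and `@MinDt` for CMS.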
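The `jdeDate` transformer in `pipelines.json` presumably has to reverse the CYYDDD encoding that `ToJdeJulianDate` produces for query parameters. The following is a sketch of both directions. It assumes the transformer decodes integer JDE dates into `DateTime` and that `0` (a blank JDE date) should become `null`; the `JdeJulian` class is hypothetical. Writing the encoding as `(year - 1900) * 1000 + dayOfYear` gives the same values as the century-flag formula for 1900 through 2099, and it stays correct beyond 2099.

```csharp
using System;

/// <summary>
/// Hypothetical JDE Julian (CYYDDD) helpers: CYY = years since 1900, DDD = day of year.
/// </summary>
public static class JdeJulian
{
    // Encode: 2024-03-01 is day 61 of 2024, and 2024 - 1900 = 124, so the result is 124061.
    public static int ToJdeJulianDate(DateTime date) =>
        (date.Year - 1900) * 1000 + date.DayOfYear;

    // Decode. Assumption: 0 is JDE's blank date and maps to null.
    public static DateTime? FromJdeJulianDate(int value)
    {
        if (value == 0)
            return null;
        if (value < 0)
            throw new ArgumentOutOfRangeException(nameof(value), value, "JDE Julian dates cannot be negative");

        int year = 1900 + value / 1000;
        int dayOfYear = value % 1000;
        int daysInYear = DateTime.IsLeapYear(year) ? 366 : 365;
        if (dayOfYear < 1 || dayOfYear > daysInYear)
            throw new ArgumentOutOfRangeException(nameof(value), value, "Day-of-year component out of range");

        // Day 1 is January 1, so add (dayOfYear - 1) days.
        return new DateTime(year, 1, 1).AddDays(dayOfYear - 1);
    }
}
```

For example, `99365` decodes to 31 December 1999, and `124061` round-trips to 1 March 2024.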
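Precedence rule 4 (mode-specific fields override base; missing fields inherit) can be combined with the default destination type per mode into a single resolution step. This sketch assumes hypothetical `DestinationResolver` and `EffectiveDestination` types and restates `DestinationConfig` and `DestinationOverride` from the configuration models. Rejecting a `bulkMerge` that has no match columns is an added assumption, in the spirit of fail-fast loading.

```csharp
using System;
using System.Collections.Generic;

// Restated from the configuration models.
public record DestinationConfig(string Table, List<string>? MatchColumns, List<string>? ExcludeFromUpdate);
public record DestinationOverride(string? Type, List<string>? MatchColumns, List<string>? ExcludeFromUpdate);

// Hypothetical fully-resolved destination handed to CreateDestination.
public record EffectiveDestination(
    string Type, string Table, IReadOnlyList<string> MatchColumns, IReadOnlyList<string> ExcludeFromUpdate);

public static class DestinationResolver
{
    public static EffectiveDestination Resolve(
        DestinationConfig baseConfig, DestinationOverride? modeOverride, bool isMassMode)
    {
        // Mode override wins; otherwise Mass defaults to bulkImport and Incremental to bulkMerge.
        var type = modeOverride?.Type ?? (isMassMode ? "bulkImport" : "bulkMerge");

        // Partial merge: each field falls back to the base config independently.
        var match = modeOverride?.MatchColumns ?? baseConfig.MatchColumns ?? new List<string>();
        var exclude = modeOverride?.ExcludeFromUpdate ?? baseConfig.ExcludeFromUpdate ?? new List<string>();

        // Assumption: a merge without match columns cannot build an ON clause, so fail fast.
        if (type == "bulkMerge" && match.Count == 0)
            throw new InvalidOperationException($"Table '{baseConfig.Table}': bulkMerge requires matchColumns");

        // The table name is never overridden per mode.
        return new EffectiveDestination(type, baseConfig.Table, match, exclude);
    }
}
```

With the `MisData` example, the `mass` override changes only `type`, so the resolved destination keeps `table: MisData` and `matchColumns: [MisDataId]` from the base.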
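The script-ordering rule (config `preScripts`, then the generated TRUNCATE; the generated index rebuild, then config `postScripts`) can be isolated in a small helper that is easy to unit test. This sketch uses a hypothetical `ScriptPlan` class. It also escapes `]` in table names, which the plain `[{table}]` interpolation in `Build()` does not.

```csharp
using System.Collections.Generic;

public static class ScriptPlan
{
    // Bracket-quote an identifier, doubling any embedded ']' as T-SQL requires.
    static string Q(string name) => "[" + name.Replace("]", "]]") + "]";

    // Pre: config preScripts, then TRUNCATE. Post: ALTER INDEX ... REBUILD, then config postScripts.
    public static (List<string> Pre, List<string> Post) Build(
        string table,
        IEnumerable<string>? preScripts,
        IEnumerable<string>? postScripts,
        bool prePurge,
        bool reIndex)
    {
        var pre = new List<string>(preScripts ?? new List<string>());
        if (prePurge)
            pre.Add($"TRUNCATE TABLE {Q(table)}");

        var post = new List<string>();
        if (reIndex)
            post.Add($"ALTER INDEX ALL ON {Q(table)} REBUILD");
        post.AddRange(postScripts ?? new List<string>());

        return (pre, post);
    }
}
```

SQL Server refuses `TRUNCATE TABLE` on a table that is referenced by a foreign key constraint, so `prePurge` only suits tables without inbound foreign keys.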
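The `DbBulkMergeDestination` extension could generate its MERGE statement from the match columns, the `excludeFromUpdate` list, and the optional `updateCondition`. This sketch rests on several assumptions. `MergeSqlSketch.Build` is a hypothetical helper. Column names are bracket-quoted with `]` escaped, while the staging table name is passed through unquoted. The condition string is inserted verbatim, so it must refer to the `source` and `target` aliases. When every non-match column is excluded, the `WHEN MATCHED` clause is omitted rather than emitting an empty `UPDATE SET`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class MergeSqlSketch
{
    static string Q(string name) => "[" + name.Replace("]", "]]") + "]";

    public static string Build(
        string table,
        string stagingTable,
        IReadOnlyList<string> allColumns,
        IReadOnlyList<string> matchColumns,
        IReadOnlyCollection<string>? excludeFromUpdate = null,
        string? updateCondition = null)
    {
        if (matchColumns.Count == 0)
            throw new ArgumentException("At least one match column is required", nameof(matchColumns));

        var exclude = new HashSet<string>(excludeFromUpdate ?? Array.Empty<string>(), StringComparer.OrdinalIgnoreCase);

        // Default rule: update every non-match column that is not explicitly excluded.
        var updateColumns = allColumns
            .Where(c => !matchColumns.Contains(c, StringComparer.OrdinalIgnoreCase) && !exclude.Contains(c))
            .ToList();

        var sql = new StringBuilder();
        sql.AppendLine($"MERGE INTO {Q(table)} AS target");
        sql.AppendLine($"USING {stagingTable} AS source");
        sql.AppendLine("    ON " + string.Join(" AND ", matchColumns.Select(c => $"target.{Q(c)} = source.{Q(c)}")));

        if (updateColumns.Count > 0)
        {
            // updateCondition is inserted verbatim after WHEN MATCHED AND.
            var when = updateCondition is null ? "WHEN MATCHED" : $"WHEN MATCHED AND {updateCondition}";
            sql.AppendLine($"{when} THEN");
            sql.AppendLine("    UPDATE SET " + string.Join(", ", updateColumns.Select(c => $"target.{Q(c)} = source.{Q(c)}")));
        }

        // Inserts always include every column, including excluded ones.
        sql.AppendLine("WHEN NOT MATCHED THEN");
        sql.Append($"    INSERT ({string.Join(", ", allColumns.Select(Q))}) ");
        sql.Append($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.{Q(c)}"))});");
        return sql.ToString();
    }
}
```

Excluded columns such as `CreatedDate` are still written on insert, so they keep their original value only on subsequent updates.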