diff --git a/PLANS/2025-01-07-pipeline-schedule-alignment-design.md b/PLANS/2025-01-07-pipeline-schedule-alignment-design.md new file mode 100644 index 0000000..9318480 --- /dev/null +++ b/PLANS/2025-01-07-pipeline-schedule-alignment-design.md @@ -0,0 +1,237 @@ +# Pipeline Schedule Alignment Design + +## Purpose + +Align the `pipelines.json` ETL configuration with the legacy system (documented in `DATA_SYNC/DataSyncReport.md`) by: +1. Supporting three explicit schedules (Mass/Daily/Hourly) instead of two sync modes (mass/incremental) +2. Adding 8 missing pipeline definitions +3. Adding GIW connection for StatusCode +4. Fixing Mass to be true full reload (unfiltered) + +## Current State + +**pipelines.json:** +- 9 pipelines defined +- Only `mass` and `incremental` sync modes +- Mass uses filtered query with `-365.00:00:00` offset +- No per-schedule enabled/disabled flags + +**Legacy system (source of truth):** +- 17 active syncs +- Three schedules: Mass (weekly/10080 min), Daily (1440 min), Hourly (60 min) +- Per-schedule configuration (enabled, prepurge, reindex) +- StatusCode uses GIW connection (not JDE) +- FunctionCode always does full reload on all schedules +- MisData has hourly disabled and mass interval of 100800 min + +--- + +## Design + +### 1. New pipelines.json Schema + +Replace `syncModes` with `schedules` structure: + +```json +{ + "settings": { + "timezone": "UTC" + }, + "scheduleDefaults": { + "mass": { "enabled": true, "intervalMinutes": 10080, "prePurge": true, "reIndex": true }, + "daily": { "enabled": true, "intervalMinutes": 1440, "prePurge": false, "reIndex": false }, + "hourly": { "enabled": true, "intervalMinutes": 60, "prePurge": false, "reIndex": false } + }, + "pipelines": { + "WorkOrder_Curr": { + "source": { + "connection": "jde", + "query": "SELECT ... WHERE (date > :dateUpdated ...)", + "massQuery": "SELECT ... FROM F4801", + "parameters": { ... } + }, + "schedules": { + "mass": { }, + "daily": { "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }, + "hourly": { "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "destination": { "table": "WorkOrder_Curr", "matchColumns": [...] } + } + } +} +``` + +**Key changes:** +- `scheduleDefaults` provides global defaults for all pipelines +- Each pipeline can override with its own `schedules` section +- `massQuery` = unfiltered full load; `query` = filtered for daily/hourly +- Empty schedule `{ }` = inherit defaults; explicit values = override + +### 2. Code Changes for Schedule Support + +**Replace SyncMode enum with UpdateTypes:** + +Current `SyncMode.cs` has `Mass` and `Incremental`. Use the existing `UpdateTypes` enum (Mass/Daily/Hourly) directly in the pipeline factory: + +```csharp +// EtlPipelineFactory changes: +// - Accept UpdateTypes instead of SyncMode +// - Load schedule config based on UpdateTypes +// - Use massQuery for Mass, query for Daily/Hourly +// - Apply prePurge/reIndex from schedule config +``` + +**TableSyncOperation updates:** + +Currently maps Daily/Hourly → `SyncMode.Incremental`. Pass actual `UpdateTypes` through: + +```csharp +// Before: var mode = updateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental; +// After: Pass updateType directly to factory, let it resolve schedule config +``` + +**DataUpdateRepository interval fix:** + +Currently uses hardcoded intervals (60/1440/10080). Read from pipeline config: + +```csharp +// Before: expectedInterval = updateType switch { Mass => 10080, Daily => 1440, Hourly => 60 } +// After: expectedInterval = pipelineConfig.Schedules[updateType].IntervalMinutes +``` + +**Files affected:** +- `Contracts/SyncMode.cs` (remove or deprecate) +- `Services/EtlPipelineFactory.cs` +- `Services/TableSyncOperation.cs` +- `Services/DataUpdateRepository.cs` +- `Services/ScheduleChecker.cs` + +### 3. GIW Connection for StatusCode + +**Configuration (appsettings.json):** + +```json +{ + "ConnectionStrings": { + "LotFinder": "...", + "JDE": "...", + "CMS": "...", + "GIW": "Data Source=...;User Id=...;Password=..." + } +} +``` + +**IDbConnectionFactory interface:** + +```csharp +public interface IDbConnectionFactory +{ + Task CreateLotFinderConnectionAsync(); + Task CreateJdeConnectionAsync(); + Task CreateCmsConnectionAsync(); + Task CreateGiwConnectionAsync(); // NEW +} +``` + +**DbConnectionFactory implementation:** + +```csharp +public async Task CreateGiwConnectionAsync() +{ + var connection = new OracleConnection(_options.GiwConnectionString); + await connection.OpenAsync(); + return connection; +} +``` + +**DbQuerySource update:** + +```csharp +var connection = connectionType.ToLower() switch +{ + "jde" => await _connectionFactory.CreateJdeConnectionAsync(), + "cms" => await _connectionFactory.CreateCmsConnectionAsync(), + "giw" => await _connectionFactory.CreateGiwConnectionAsync(), // NEW + "lotfinder" => await _connectionFactory.CreateLotFinderConnectionAsync(), + _ => throw new ArgumentException($"Unknown connection type: {connectionType}") +}; +``` + +### 4. Missing Pipelines (8 of 17) + +| Pipeline | Source | JDE Table | Dest Table | Special Notes | +|----------|--------|-----------|------------|---------------| +| WorkOrderTime_Curr | jde | F31122 | WorkOrderTime_Curr | Standard 3-schedule | +| WorkOrderComponent_Curr | jde | F3111 | WorkOrderComponent_Curr | Standard 3-schedule | +| WorkOrderStep_Curr | jde | F3112 | WorkOrderStep_Curr | Standard 3-schedule | +| WorkOrderRouting | jde | F3112Z1 | WorkOrderRouting | Filter invalid dates | +| StatusCode | **giw** | F0005 | StatusCode | Uses GIW connection | +| OrgHierarchy | jde | F30006 | OrgHierarchy | Standard 3-schedule | +| RouteMaster | jde | F3003 | RouteMaster | Standard 3-schedule | +| FunctionCode | jde | PRODDTA.F00192 | FunctionCode | **Always full reload** | + +**Special Cases:** + +FunctionCode - No filtered query exists. All schedules use full reload: +```json +"FunctionCode": { + "schedules": { + "mass": { "prePurge": true, "reIndex": true }, + "daily": { "prePurge": true, "reIndex": true }, + "hourly": { "prePurge": true, "reIndex": true } + } +} +``` + +MisData - Hourly disabled, mass interval is 100800: +```json +"MisData": { + "schedules": { + "mass": { "intervalMinutes": 100800 }, + "daily": { }, + "hourly": { "enabled": false } + } +} +``` + +SQL queries for all 8 pipelines exist in `DATA_SYNC/JDE/*.sql` files. + +--- + +## Implementation Order + +**Phase 1: Schema & Models (no behavior change)** +1. Create new C# models for schedule config (`ScheduleConfig`, `PipelineSchedules`) +2. Update `PipelineConfig` class to use new schema +3. Keep backward compatibility temporarily + +**Phase 2: Infrastructure Changes** +4. Add GIW connection string to `DataAccessOptions` +5. Add `CreateGiwConnectionAsync()` to `IDbConnectionFactory` and implementation +6. Update `DbQuerySource` to handle "giw" connection type + +**Phase 3: Core ETL Changes** +7. Update `EtlPipelineFactory` to accept `UpdateTypes` and read schedule config +8. Update `TableSyncOperation` to pass `UpdateTypes` through +9. Add `massQuery` support (unfiltered query for Mass schedule) +10. Update `DataUpdateRepository` to use per-pipeline intervals + +**Phase 4: Pipeline Configurations** +11. Migrate existing 9 pipelines to new schema format +12. Add 8 missing pipeline definitions with SQL queries +13. Update `appsettings.json` with GIW connection string + +**Phase 5: Validation & Testing** +14. Update existing unit tests for new schema +15. Add tests for schedule-specific behavior +16. Integration test with local SQL Server + +**Estimated scope:** ~15-20 files modified, ~500-800 lines of changes + +--- + +## Source of Truth References + +- **Legacy config:** `DATA_SYNC/DataSyncReport.md` +- **Legacy SQL queries:** `DATA_SYNC/JDE/*.sql`, `DATA_SYNC/CMS/*.sql` +- **Legacy archive queries:** `DATA_SYNC/JDE_ARCHIVE/*.sql`