docs: add pipeline schedule alignment design

Aligns pipelines.json with legacy DataSyncReport.md:
- Three schedules (Mass/Daily/Hourly) instead of two sync modes
- 8 missing pipelines to add
- GIW connection for StatusCode
- Per-pipeline schedule configuration
This commit is contained in:
Joseph Doherty
2026-01-07 00:07:46 -05:00
parent 621dd41a97
commit 2a9dbe2f62
@@ -0,0 +1,237 @@
# Pipeline Schedule Alignment Design
## Purpose
Align the `pipelines.json` ETL configuration with the legacy system (documented in `DATA_SYNC/DataSyncReport.md`) by:
1. Supporting three explicit schedules (Mass/Daily/Hourly) instead of two sync modes (mass/incremental)
2. Adding 8 missing pipeline definitions
3. Adding GIW connection for StatusCode
4. Fixing Mass to be true full reload (unfiltered)
## Current State
**pipelines.json:**
- 9 pipelines defined
- Only `mass` and `incremental` sync modes
- Mass uses filtered query with `-365.00:00:00` offset
- No per-schedule enabled/disabled flags
**Legacy system (source of truth):**
- 17 active syncs
- Three schedules: Mass (weekly/10080 min), Daily (1440 min), Hourly (60 min)
- Per-schedule configuration (enabled, prepurge, reindex)
- StatusCode uses GIW connection (not JDE)
- FunctionCode always does full reload on all schedules
- MisData has hourly disabled and mass interval of 100800 min
---
## Design
### 1. New pipelines.json Schema
Replace `syncModes` with `schedules` structure:
```json
{
"settings": {
"timezone": "UTC"
},
"scheduleDefaults": {
"mass": { "enabled": true, "intervalMinutes": 10080, "prePurge": true, "reIndex": true },
"daily": { "enabled": true, "intervalMinutes": 1440, "prePurge": false, "reIndex": false },
"hourly": { "enabled": true, "intervalMinutes": 60, "prePurge": false, "reIndex": false }
},
"pipelines": {
"WorkOrder_Curr": {
"source": {
"connection": "jde",
"query": "SELECT ... WHERE (date > :dateUpdated ...)",
"massQuery": "SELECT ... FROM F4801",
"parameters": { ... }
},
"schedules": {
"mass": { },
"daily": { "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" },
"hourly": { "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"destination": { "table": "WorkOrder_Curr", "matchColumns": [...] }
}
}
}
```
**Key changes:**
- `scheduleDefaults` provides global defaults for all pipelines
- Each pipeline can override with its own `schedules` section
- `massQuery` = unfiltered full load; `query` = filtered for daily/hourly
- Empty schedule `{ }` = inherit defaults; explicit values = override
### 2. Code Changes for Schedule Support
**Replace SyncMode enum with UpdateTypes:**
Current `SyncMode.cs` has `Mass` and `Incremental`. Use the existing `UpdateTypes` enum (Mass/Daily/Hourly) directly in the pipeline factory:
```csharp
// EtlPipelineFactory changes:
// - Accept UpdateTypes instead of SyncMode
// - Load schedule config based on UpdateTypes
// - Use massQuery for Mass, query for Daily/Hourly
// - Apply prePurge/reIndex from schedule config
```
**TableSyncOperation updates:**
Currently maps Daily/Hourly → `SyncMode.Incremental`. Pass actual `UpdateTypes` through:
```csharp
// Before: var mode = updateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental;
// After: Pass updateType directly to factory, let it resolve schedule config
```
**DataUpdateRepository interval fix:**
Currently uses hardcoded intervals (60/1440/10080). Read from pipeline config:
```csharp
// Before: expectedInterval = updateType switch { Mass => 10080, Daily => 1440, Hourly => 60 }
// After: expectedInterval = pipelineConfig.Schedules[updateType].IntervalMinutes
```
**Files affected:**
- `Contracts/SyncMode.cs` (remove or deprecate)
- `Services/EtlPipelineFactory.cs`
- `Services/TableSyncOperation.cs`
- `Services/DataUpdateRepository.cs`
- `Services/ScheduleChecker.cs`
### 3. GIW Connection for StatusCode
**Configuration (appsettings.json):**
```json
{
"ConnectionStrings": {
"LotFinder": "...",
"JDE": "...",
"CMS": "...",
"GIW": "Data Source=...;User Id=...;Password=..."
}
}
```
**IDbConnectionFactory interface:**
```csharp
public interface IDbConnectionFactory
{
Task<DbConnection> CreateLotFinderConnectionAsync();
Task<DbConnection> CreateJdeConnectionAsync();
Task<DbConnection> CreateCmsConnectionAsync();
Task<DbConnection> CreateGiwConnectionAsync(); // NEW
}
```
**DbConnectionFactory implementation:**
```csharp
public async Task<DbConnection> CreateGiwConnectionAsync()
{
var connection = new OracleConnection(_options.GiwConnectionString);
await connection.OpenAsync();
return connection;
}
```
**DbQuerySource update:**
```csharp
var connection = connectionType.ToLower() switch
{
"jde" => await _connectionFactory.CreateJdeConnectionAsync(),
"cms" => await _connectionFactory.CreateCmsConnectionAsync(),
"giw" => await _connectionFactory.CreateGiwConnectionAsync(), // NEW
"lotfinder" => await _connectionFactory.CreateLotFinderConnectionAsync(),
_ => throw new ArgumentException($"Unknown connection type: {connectionType}")
};
```
### 4. Missing Pipelines (8 of 17)
| Pipeline | Source | JDE Table | Dest Table | Special Notes |
|----------|--------|-----------|------------|---------------|
| WorkOrderTime_Curr | jde | F31122 | WorkOrderTime_Curr | Standard 3-schedule |
| WorkOrderComponent_Curr | jde | F3111 | WorkOrderComponent_Curr | Standard 3-schedule |
| WorkOrderStep_Curr | jde | F3112 | WorkOrderStep_Curr | Standard 3-schedule |
| WorkOrderRouting | jde | F3112Z1 | WorkOrderRouting | Filter invalid dates |
| StatusCode | **giw** | F0005 | StatusCode | Uses GIW connection |
| OrgHierarchy | jde | F30006 | OrgHierarchy | Standard 3-schedule |
| RouteMaster | jde | F3003 | RouteMaster | Standard 3-schedule |
| FunctionCode | jde | PRODDTA.F00192 | FunctionCode | **Always full reload** |
**Special Cases:**
FunctionCode - No filtered query exists. All schedules use full reload:
```json
"FunctionCode": {
"schedules": {
"mass": { "prePurge": true, "reIndex": true },
"daily": { "prePurge": true, "reIndex": true },
"hourly": { "prePurge": true, "reIndex": true }
}
}
```
MisData - Hourly disabled, mass interval is 100800:
```json
"MisData": {
"schedules": {
"mass": { "intervalMinutes": 100800 },
"daily": { },
"hourly": { "enabled": false }
}
}
```
SQL queries for all 8 pipelines exist in `DATA_SYNC/JDE/*.sql` files.
---
## Implementation Order
**Phase 1: Schema & Models (no behavior change)**
1. Create new C# models for schedule config (`ScheduleConfig`, `PipelineSchedules`)
2. Update `PipelineConfig` class to use new schema
3. Keep backward compatibility temporarily
**Phase 2: Infrastructure Changes**
4. Add GIW connection string to `DataAccessOptions`
5. Add `CreateGiwConnectionAsync()` to `IDbConnectionFactory` and implementation
6. Update `DbQuerySource` to handle "giw" connection type
**Phase 3: Core ETL Changes**
7. Update `EtlPipelineFactory` to accept `UpdateTypes` and read schedule config
8. Update `TableSyncOperation` to pass `UpdateTypes` through
9. Add `massQuery` support (unfiltered query for Mass schedule)
10. Update `DataUpdateRepository` to use per-pipeline intervals
**Phase 4: Pipeline Configurations**
11. Migrate existing 9 pipelines to new schema format
12. Add 8 missing pipeline definitions with SQL queries
13. Update `appsettings.json` with GIW connection string
**Phase 5: Validation & Testing**
14. Update existing unit tests for new schema
15. Add tests for schedule-specific behavior
16. Integration test with local SQL Server
**Estimated scope:** ~15-20 files modified, ~500-800 lines of changes
---
## Source of Truth References
- **Legacy config:** `DATA_SYNC/DataSyncReport.md`
- **Legacy SQL queries:** `DATA_SYNC/JDE/*.sql`, `DATA_SYNC/CMS/*.sql`
- **Legacy archive queries:** `DATA_SYNC/JDE_ARCHIVE/*.sql`