refactor(datasync): migrate pipelines to new schedule schema

- Add scheduleDefaults with mass/daily/hourly defaults
- Add massQuery (unfiltered) to each pipeline for Mass schedule
- Add schedules section to each pipeline inheriting defaults
- Keep syncModes for backward compatibility
This commit is contained in:
Joseph Doherty
2026-01-07 01:39:22 -05:00
parent da02784feb
commit 3af06b6aff
@@ -2,11 +2,17 @@
"settings": {
"timezone": "UTC"
},
"scheduleDefaults": {
"mass": { "enabled": true, "intervalMinutes": 10080, "prePurge": true, "reIndex": true },
"daily": { "enabled": true, "intervalMinutes": 1440, "prePurge": false, "reIndex": false },
"hourly": { "enabled": true, "intervalMinutes": 60, "prePurge": false, "reIndex": false }
},
"pipelines": {
"WorkOrder_Curr": {
"source": {
"connection": "jde",
"query": "SELECT wo.WADOCO AS WorkOrderNumber, TRIM(wo.WAMMCU) AS BranchCode, TRIM(wo.WALOTN) AS LotNumber, TRIM(wo.WALITM) AS ItemNumber, wo.WAITM AS ShortItemNumber, TRIM(wo.WAPARS) AS ParentWorkOrderNumber, wo.WAUORG / 100.0 AS OrderQuantity, wo.WASOBK / 100.0 AS HeldQuantity, wo.WASOQS / 100.0 AS ShippedQuantity, TRIM(wo.WASRST) AS StatusCode, CASE wo.WADCG WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WADCG+1900000,'YYYYDDD') END AS StatusCodeUpdateDT, CASE wo.WATRDJ WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WATRDJ+1900000,'YYYYDDD') END AS IssueDate, CASE wo.WASTRT WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WASTRT+1900000,'YYYYDDD') END AS StartDate, TRIM(wo.WATRT) AS RoutingType, wo.WAUPMJ AS LastUpdateDate, wo.WATDAY AS LastUpdateTime FROM {ProductionSchema}.F4801 wo WHERE (wo.WAUPMJ > :dateUpdated OR (wo.WAUPMJ = :dateUpdated AND wo.WATDAY >= :timeUpdated))",
"massQuery": "SELECT wo.WADOCO AS WorkOrderNumber, TRIM(wo.WAMMCU) AS BranchCode, TRIM(wo.WALOTN) AS LotNumber, TRIM(wo.WALITM) AS ItemNumber, wo.WAITM AS ShortItemNumber, TRIM(wo.WAPARS) AS ParentWorkOrderNumber, wo.WAUORG / 100.0 AS OrderQuantity, wo.WASOBK / 100.0 AS HeldQuantity, wo.WASOQS / 100.0 AS ShippedQuantity, TRIM(wo.WASRST) AS StatusCode, CASE wo.WADCG WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WADCG+1900000,'YYYYDDD') END AS StatusCodeUpdateDT, CASE wo.WATRDJ WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WATRDJ+1900000,'YYYYDDD') END AS IssueDate, CASE wo.WASTRT WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WASTRT+1900000,'YYYYDDD') END AS StartDate, TRIM(wo.WATRT) AS RoutingType, wo.WAUPMJ AS LastUpdateDate, wo.WATDAY AS LastUpdateTime FROM {ProductionSchema}.F4801 wo",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
@@ -16,6 +22,11 @@
"mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "WorkOrder_Curr",
"matchColumns": ["WorkOrderNumber", "BranchCode"],
@@ -26,6 +37,7 @@
"source": {
"connection": "jde",
"query": "SELECT TRIM(lot.IOLOTN) AS LotNumber, TRIM(lot.IOMCU) AS BranchCode, lot.IOITM AS ShortItemNumber, TRIM(lot.IOLITM) AS ItemNumber, lot.IOVEND AS SupplierCode, lot.IOLOTS AS StatusCode, TRIM(lot.IOLOT1) AS Memo1, TRIM(lot.IOLOT2) AS Memo2, TRIM(lot.IOLOT3) AS Memo3, lot.IOUPMJ AS LastUpdateDate, lot.IOTDAY AS LastUpdateTime FROM {ProductionSchema}.F4108 lot WHERE TRIM(lot.IOLOTN) IS NOT NULL AND TRIM(lot.IOMCU) IS NOT NULL AND (lot.IOUPMJ > :dateUpdated OR (lot.IOUPMJ = :dateUpdated AND lot.IOTDAY >= :timeUpdated))",
"massQuery": "SELECT TRIM(lot.IOLOTN) AS LotNumber, TRIM(lot.IOMCU) AS BranchCode, lot.IOITM AS ShortItemNumber, TRIM(lot.IOLITM) AS ItemNumber, lot.IOVEND AS SupplierCode, lot.IOLOTS AS StatusCode, TRIM(lot.IOLOT1) AS Memo1, TRIM(lot.IOLOT2) AS Memo2, TRIM(lot.IOLOT3) AS Memo3, lot.IOUPMJ AS LastUpdateDate, lot.IOTDAY AS LastUpdateTime FROM {ProductionSchema}.F4108 lot WHERE TRIM(lot.IOLOTN) IS NOT NULL AND TRIM(lot.IOMCU) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
@@ -35,6 +47,11 @@
"mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "Lot",
"matchColumns": ["LotNumber", "BranchCode"],
@@ -45,6 +62,7 @@
"source": {
"connection": "jde",
"query": "SELECT lu.ILUKID AS UniqueId, lu.ILDOCO AS WorkOrderNumber, TRIM(lu.ILLOTN) AS LotNumber, TRIM(lu.ILMCU) AS BranchCode, lu.ILITM AS ShortItemNumber, lu.ILTRQT AS Quantity, lu.ILTRDJ AS LastUpdateDate, lu.ILTDAY AS LastUpdateTime FROM {ProductionSchema}.F4111 lu WHERE lu.ILDCT = 'IM' AND TRIM(lu.ILLOTN) IS NOT NULL AND (lu.ILTRDJ > :dateUpdated OR (lu.ILTRDJ = :dateUpdated AND lu.ILTDAY >= :timeUpdated))",
"massQuery": "SELECT lu.ILUKID AS UniqueId, lu.ILDOCO AS WorkOrderNumber, TRIM(lu.ILLOTN) AS LotNumber, TRIM(lu.ILMCU) AS BranchCode, lu.ILITM AS ShortItemNumber, lu.ILTRQT AS Quantity, lu.ILTRDJ AS LastUpdateDate, lu.ILTDAY AS LastUpdateTime FROM {ProductionSchema}.F4111 lu WHERE lu.ILDCT = 'IM' AND TRIM(lu.ILLOTN) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
@@ -54,6 +72,11 @@
"mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "LotUsage_Curr",
"matchColumns": ["UniqueId"],
@@ -64,6 +87,7 @@
"source": {
"connection": "jde",
"query": "SELECT pn.IMITM AS ShortItemNumber, TRIM(pn.IMLITM) AS ItemNumber, TRIM(pn.IMDSC1) AS Description, TRIM(pn.IMPRP4) AS PlanningFamily, TRIM(pn.IMSTKT) AS StockingType, pn.IMUPMJ AS LastUpdateDate, pn.IMTDAY AS LastUpdateTime FROM {ProductionSchema}.F4101 pn WHERE TRIM(pn.IMLITM) IS NOT NULL AND (pn.IMUPMJ > :dateUpdated OR (pn.IMUPMJ = :dateUpdated AND pn.IMTDAY >= :timeUpdated))",
"massQuery": "SELECT pn.IMITM AS ShortItemNumber, TRIM(pn.IMLITM) AS ItemNumber, TRIM(pn.IMDSC1) AS Description, TRIM(pn.IMPRP4) AS PlanningFamily, TRIM(pn.IMSTKT) AS StockingType, pn.IMUPMJ AS LastUpdateDate, pn.IMTDAY AS LastUpdateTime FROM {ProductionSchema}.F4101 pn WHERE TRIM(pn.IMLITM) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
@@ -73,6 +97,11 @@
"mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "Item",
"matchColumns": ["ShortItemNumber"],
@@ -83,6 +112,7 @@
"source": {
"connection": "jde",
"query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'WC' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))",
"massQuery": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'WC'",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
@@ -92,6 +122,11 @@
"mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "WorkCenter",
"matchColumns": ["Code"],
@@ -102,6 +137,7 @@
"source": {
"connection": "jde",
"query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'I3' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))",
"massQuery": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'I3'",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
@@ -111,6 +147,11 @@
"mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "ProfitCenter",
"matchColumns": ["Code"],
@@ -121,12 +162,18 @@
"source": {
"connection": "jde",
"query": "WITH USER_CTE AS (SELECT ab.ABAN8 AS AddressNumber, TRIM(pro.ULUSER) AS UserId, TRIM(ab.ABALPH) AS FullName, ab.ABUPMJ AS LastUpdateDate, ab.ABUPMT AS LastUpdateTime, ROW_NUMBER() OVER (PARTITION BY ab.ABAN8 ORDER BY ab.ABUPMJ DESC, ab.ABUPMT DESC) RN FROM {ProductionSchema}.F0101 ab LEFT OUTER JOIN {ProductionSchema}.F0092 pro ON (ab.ABAN8 = pro.ULAN8) WHERE ab.ABATE = 'Y') SELECT AddressNumber, UserId, FullName, LastUpdateDate, LastUpdateTime FROM USER_CTE WHERE RN = 1",
"massQuery": "WITH USER_CTE AS (SELECT ab.ABAN8 AS AddressNumber, TRIM(pro.ULUSER) AS UserId, TRIM(ab.ABALPH) AS FullName, ab.ABUPMJ AS LastUpdateDate, ab.ABUPMT AS LastUpdateTime, ROW_NUMBER() OVER (PARTITION BY ab.ABAN8 ORDER BY ab.ABUPMJ DESC, ab.ABUPMT DESC) RN FROM {ProductionSchema}.F0101 ab LEFT OUTER JOIN {ProductionSchema}.F0092 pro ON (ab.ABAN8 = pro.ULAN8) WHERE ab.ABATE = 'Y') SELECT AddressNumber, UserId, FullName, LastUpdateDate, LastUpdateTime FROM USER_CTE WHERE RN = 1",
"parameters": {}
},
"syncModes": {
"mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "JdeUser",
"matchColumns": ["AddressNumber"],
@@ -137,6 +184,7 @@
"source": {
"connection": "jde",
"query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'BP' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))",
"massQuery": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'BP'",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
@@ -146,6 +194,11 @@
"mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "Branch",
"matchColumns": ["Code"],
@@ -165,6 +218,11 @@
"mass": { "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00" }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "MisData",
"matchColumns": ["ItemNumber", "BranchCode", "SequenceNumber", "MisNumber", "CharNumber"]