From 3af06b6affcf7ddd971a7853516b48b9f79f450b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 7 Jan 2026 01:39:22 -0500 Subject: [PATCH] refactor(datasync): migrate pipelines to new schedule schema - Add scheduleDefaults with mass/daily/hourly defaults - Add massQuery (unfiltered) to each pipeline for Mass schedule - Add schedules section to each pipeline inheriting defaults - Keep syncModes for backward compatibility --- .../Pipelines/pipelines.json | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/NEW/src/JdeScoping.DataSync/Pipelines/pipelines.json b/NEW/src/JdeScoping.DataSync/Pipelines/pipelines.json index f0d607f..b6cb18e 100644 --- a/NEW/src/JdeScoping.DataSync/Pipelines/pipelines.json +++ b/NEW/src/JdeScoping.DataSync/Pipelines/pipelines.json @@ -2,11 +2,17 @@ "settings": { "timezone": "UTC" }, + "scheduleDefaults": { + "mass": { "enabled": true, "intervalMinutes": 10080, "prePurge": true, "reIndex": true }, + "daily": { "enabled": true, "intervalMinutes": 1440, "prePurge": false, "reIndex": false }, + "hourly": { "enabled": true, "intervalMinutes": 60, "prePurge": false, "reIndex": false } + }, "pipelines": { "WorkOrder_Curr": { "source": { "connection": "jde", "query": "SELECT wo.WADOCO AS WorkOrderNumber, TRIM(wo.WAMMCU) AS BranchCode, TRIM(wo.WALOTN) AS LotNumber, TRIM(wo.WALITM) AS ItemNumber, wo.WAITM AS ShortItemNumber, TRIM(wo.WAPARS) AS ParentWorkOrderNumber, wo.WAUORG / 100.0 AS OrderQuantity, wo.WASOBK / 100.0 AS HeldQuantity, wo.WASOQS / 100.0 AS ShippedQuantity, TRIM(wo.WASRST) AS StatusCode, CASE wo.WADCG WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WADCG+1900000,'YYYYDDD') END AS StatusCodeUpdateDT, CASE wo.WATRDJ WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WATRDJ+1900000,'YYYYDDD') END AS IssueDate, CASE wo.WASTRT WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WASTRT+1900000,'YYYYDDD') END AS StartDate, TRIM(wo.WATRT) AS RoutingType, wo.WAUPMJ AS LastUpdateDate, wo.WATDAY AS LastUpdateTime FROM {ProductionSchema}.F4801 wo WHERE (wo.WAUPMJ > :dateUpdated OR (wo.WAUPMJ = :dateUpdated AND wo.WATDAY >= :timeUpdated))", + "massQuery": "SELECT wo.WADOCO AS WorkOrderNumber, TRIM(wo.WAMMCU) AS BranchCode, TRIM(wo.WALOTN) AS LotNumber, TRIM(wo.WALITM) AS ItemNumber, wo.WAITM AS ShortItemNumber, TRIM(wo.WAPARS) AS ParentWorkOrderNumber, wo.WAUORG / 100.0 AS OrderQuantity, wo.WASOBK / 100.0 AS HeldQuantity, wo.WASOQS / 100.0 AS ShippedQuantity, TRIM(wo.WASRST) AS StatusCode, CASE wo.WADCG WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WADCG+1900000,'YYYYDDD') END AS StatusCodeUpdateDT, CASE wo.WATRDJ WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WATRDJ+1900000,'YYYYDDD') END AS IssueDate, CASE wo.WASTRT WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WASTRT+1900000,'YYYYDDD') END AS StartDate, TRIM(wo.WATRT) AS RoutingType, wo.WAUPMJ AS LastUpdateDate, wo.WATDAY AS LastUpdateTime FROM {ProductionSchema}.F4801 wo", "parameters": { "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } @@ -16,6 +22,11 @@ "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, "destination": { "table": "WorkOrder_Curr", "matchColumns": ["WorkOrderNumber", "BranchCode"], @@ -26,6 +37,7 @@ "source": { "connection": "jde", "query": "SELECT TRIM(lot.IOLOTN) AS LotNumber, TRIM(lot.IOMCU) AS BranchCode, lot.IOITM AS ShortItemNumber, TRIM(lot.IOLITM) AS ItemNumber, lot.IOVEND AS SupplierCode, lot.IOLOTS AS StatusCode, TRIM(lot.IOLOT1) AS Memo1, TRIM(lot.IOLOT2) AS Memo2, TRIM(lot.IOLOT3) AS Memo3, lot.IOUPMJ AS LastUpdateDate, lot.IOTDAY AS LastUpdateTime FROM {ProductionSchema}.F4108 lot WHERE TRIM(lot.IOLOTN) IS NOT NULL AND TRIM(lot.IOMCU) IS NOT NULL AND (lot.IOUPMJ > :dateUpdated OR (lot.IOUPMJ = :dateUpdated AND lot.IOTDAY >= :timeUpdated))", + "massQuery": "SELECT TRIM(lot.IOLOTN) AS LotNumber, TRIM(lot.IOMCU) AS BranchCode, lot.IOITM AS ShortItemNumber, TRIM(lot.IOLITM) AS ItemNumber, lot.IOVEND AS SupplierCode, lot.IOLOTS AS StatusCode, TRIM(lot.IOLOT1) AS Memo1, TRIM(lot.IOLOT2) AS Memo2, TRIM(lot.IOLOT3) AS Memo3, lot.IOUPMJ AS LastUpdateDate, lot.IOTDAY AS LastUpdateTime FROM {ProductionSchema}.F4108 lot WHERE TRIM(lot.IOLOTN) IS NOT NULL AND TRIM(lot.IOMCU) IS NOT NULL", "parameters": { "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } @@ -35,6 +47,11 @@ "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, "destination": { "table": "Lot", "matchColumns": ["LotNumber", "BranchCode"], @@ -45,6 +62,7 @@ "source": { "connection": "jde", "query": "SELECT lu.ILUKID AS UniqueId, lu.ILDOCO AS WorkOrderNumber, TRIM(lu.ILLOTN) AS LotNumber, TRIM(lu.ILMCU) AS BranchCode, lu.ILITM AS ShortItemNumber, lu.ILTRQT AS Quantity, lu.ILTRDJ AS LastUpdateDate, lu.ILTDAY AS LastUpdateTime FROM {ProductionSchema}.F4111 lu WHERE lu.ILDCT = 'IM' AND TRIM(lu.ILLOTN) IS NOT NULL AND (lu.ILTRDJ > :dateUpdated OR (lu.ILTRDJ = :dateUpdated AND lu.ILTDAY >= :timeUpdated))", + "massQuery": "SELECT lu.ILUKID AS UniqueId, lu.ILDOCO AS WorkOrderNumber, TRIM(lu.ILLOTN) AS LotNumber, TRIM(lu.ILMCU) AS BranchCode, lu.ILITM AS ShortItemNumber, lu.ILTRQT AS Quantity, lu.ILTRDJ AS LastUpdateDate, lu.ILTDAY AS LastUpdateTime FROM {ProductionSchema}.F4111 lu WHERE lu.ILDCT = 'IM' AND TRIM(lu.ILLOTN) IS NOT NULL", "parameters": { "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } @@ -54,6 +72,11 @@ "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, "destination": { "table": "LotUsage_Curr", "matchColumns": ["UniqueId"], @@ -64,6 +87,7 @@ "source": { "connection": "jde", "query": "SELECT pn.IMITM AS ShortItemNumber, TRIM(pn.IMLITM) AS ItemNumber, TRIM(pn.IMDSC1) AS Description, TRIM(pn.IMPRP4) AS PlanningFamily, TRIM(pn.IMSTKT) AS StockingType, pn.IMUPMJ AS LastUpdateDate, pn.IMTDAY AS LastUpdateTime FROM {ProductionSchema}.F4101 pn WHERE TRIM(pn.IMLITM) IS NOT NULL AND (pn.IMUPMJ > :dateUpdated OR (pn.IMUPMJ = :dateUpdated AND pn.IMTDAY >= :timeUpdated))", + "massQuery": "SELECT pn.IMITM AS ShortItemNumber, TRIM(pn.IMLITM) AS ItemNumber, TRIM(pn.IMDSC1) AS Description, TRIM(pn.IMPRP4) AS PlanningFamily, TRIM(pn.IMSTKT) AS StockingType, pn.IMUPMJ AS LastUpdateDate, pn.IMTDAY AS LastUpdateTime FROM {ProductionSchema}.F4101 pn WHERE TRIM(pn.IMLITM) IS NOT NULL", "parameters": { "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } @@ -73,6 +97,11 @@ "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, "destination": { "table": "Item", "matchColumns": ["ShortItemNumber"], @@ -83,6 +112,7 @@ "source": { "connection": "jde", "query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'WC' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))", + "massQuery": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'WC'", "parameters": { "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } @@ -92,6 +122,11 @@ "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, "destination": { "table": "WorkCenter", "matchColumns": ["Code"], @@ -102,6 +137,7 @@ "source": { "connection": "jde", "query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'I3' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))", + "massQuery": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'I3'", "parameters": { "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } @@ -111,6 +147,11 @@ "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, "destination": { "table": "ProfitCenter", "matchColumns": ["Code"], @@ -121,12 +162,18 @@ "source": { "connection": "jde", "query": "WITH USER_CTE AS (SELECT ab.ABAN8 AS AddressNumber, TRIM(pro.ULUSER) AS UserId, TRIM(ab.ABALPH) AS FullName, ab.ABUPMJ AS LastUpdateDate, ab.ABUPMT AS LastUpdateTime, ROW_NUMBER() OVER (PARTITION BY ab.ABAN8 ORDER BY ab.ABUPMJ DESC, ab.ABUPMT DESC) RN FROM {ProductionSchema}.F0101 ab LEFT OUTER JOIN {ProductionSchema}.F0092 pro ON (ab.ABAN8 = pro.ULAN8) WHERE ab.ABATE = 'Y') SELECT AddressNumber, UserId, FullName, LastUpdateDate, LastUpdateTime FROM USER_CTE WHERE RN = 1", + "massQuery": "WITH USER_CTE AS (SELECT ab.ABAN8 AS AddressNumber, TRIM(pro.ULUSER) AS UserId, TRIM(ab.ABALPH) AS FullName, ab.ABUPMJ AS LastUpdateDate, ab.ABUPMT AS LastUpdateTime, ROW_NUMBER() OVER (PARTITION BY ab.ABAN8 ORDER BY ab.ABUPMJ DESC, ab.ABUPMT DESC) RN FROM {ProductionSchema}.F0101 ab LEFT OUTER JOIN {ProductionSchema}.F0092 pro ON (ab.ABAN8 = pro.ULAN8) WHERE ab.ABATE = 'Y') SELECT AddressNumber, UserId, FullName, LastUpdateDate, LastUpdateTime FROM USER_CTE WHERE RN = 1", "parameters": {} }, "syncModes": { "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, "destination": { "table": "JdeUser", "matchColumns": ["AddressNumber"], @@ -137,6 +184,7 @@ "source": { "connection": "jde", "query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'BP' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))", + "massQuery": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'BP'", "parameters": { "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } @@ -146,6 +194,11 @@ "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, "destination": { "table": "Branch", "matchColumns": ["Code"], @@ -165,6 +218,11 @@ "mass": { "prePurge": true, "reIndex": true }, "incremental": { "minDtOffset": "-7.00:00:00" } }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, "destination": { "table": "MisData", "matchColumns": ["ItemNumber", "BranchCode", "SequenceNumber", "MisNumber", "CharNumber"]