From 4298fb8147b9811b9527d377a708d55debd1452f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 6 Jan 2026 13:53:30 -0500 Subject: [PATCH] feat(datasync): add pipelines.json config for all sync tables Configure ETL pipelines for all 9 sync tables: - WorkOrder_Curr, Lot, LotUsage_Curr (JDE) - Item, WorkCenter, ProfitCenter (JDE) - JdeUser, Branch (JDE) - MisData (CMS) with postScript for ProcessedFlag update Each pipeline includes: - Source query with JDE Julian date/time parameters - Sync modes for mass (365 day lookback, prePurge, reIndex) and incremental (7 day lookback) operations - Destination table with match columns and updateWhen conditions --- .../JdeScoping.DataSync.csproj | 6 + .../Pipelines/pipelines.json | 176 ++++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 NEW/src/JdeScoping.DataSync/Pipelines/pipelines.json diff --git a/NEW/src/JdeScoping.DataSync/JdeScoping.DataSync.csproj b/NEW/src/JdeScoping.DataSync/JdeScoping.DataSync.csproj index f84a01c..e62322e 100644 --- a/NEW/src/JdeScoping.DataSync/JdeScoping.DataSync.csproj +++ b/NEW/src/JdeScoping.DataSync/JdeScoping.DataSync.csproj @@ -31,4 +31,10 @@ + + + PreserveNewest + + + diff --git a/NEW/src/JdeScoping.DataSync/Pipelines/pipelines.json b/NEW/src/JdeScoping.DataSync/Pipelines/pipelines.json new file mode 100644 index 0000000..5144cc3 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Pipelines/pipelines.json @@ -0,0 +1,176 @@ +{ + "settings": { + "timezone": "UTC" + }, + "pipelines": { + "WorkOrder_Curr": { + "source": { + "connection": "jde", + "query": "SELECT wo.WADOCO AS WorkOrderNumber, TRIM(wo.WAMMCU) AS BranchCode, TRIM(wo.WALOTN) AS LotNumber, TRIM(wo.WALITM) AS ItemNumber, wo.WAITM AS ShortItemNumber, TRIM(wo.WAPARS) AS ParentWorkOrderNumber, wo.WAUORG / 100.0 AS OrderQuantity, wo.WASOBK / 100.0 AS HeldQuantity, wo.WASOQS / 100.0 AS ShippedQuantity, TRIM(wo.WASRST) AS StatusCode, CASE wo.WADCG WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WADCG+1900000,'YYYYDDD') END AS StatusCodeUpdateDT, CASE wo.WATRDJ WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WATRDJ+1900000,'YYYYDDD') END AS IssueDate, CASE wo.WASTRT WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WASTRT+1900000,'YYYYDDD') END AS StartDate, TRIM(wo.WATRT) AS RoutingType, wo.WAUPMJ AS LastUpdateDate, wo.WATDAY AS LastUpdateTime FROM {ProductionSchema}.F4801 wo WHERE (wo.WAUPMJ > :dateUpdated OR (wo.WAUPMJ = :dateUpdated AND wo.WATDAY >= :timeUpdated))", + "parameters": { + "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, + "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } + } + }, + "syncModes": { + "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, + "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "destination": { + "table": "WorkOrder_Curr", + "matchColumns": ["WorkOrderNumber", "BranchCode"], + "excludeFromUpdate": ["WorkOrderNumber", "BranchCode", "LastUpdateDt"] + } + }, + "Lot": { + "source": { + "connection": "jde", + "query": "SELECT TRIM(lot.IOLOTN) AS LotNumber, TRIM(lot.IOMCU) AS BranchCode, lot.IOITM AS ShortItemNumber, TRIM(lot.IOLITM) AS ItemNumber, lot.IOVEND AS SupplierCode, lot.IOLOTS AS StatusCode, TRIM(lot.IOLOT1) AS Memo1, TRIM(lot.IOLOT2) AS Memo2, TRIM(lot.IOLOT3) AS Memo3, lot.IOUPMJ AS LastUpdateDate, lot.IOTDAY AS LastUpdateTime FROM {ProductionSchema}.F4108 lot WHERE TRIM(lot.IOLOTN) IS NOT NULL AND TRIM(lot.IOMCU) IS NOT NULL AND (lot.IOUPMJ > :dateUpdated OR (lot.IOUPMJ = :dateUpdated AND lot.IOTDAY >= :timeUpdated))", + "parameters": { + "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, + "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } + } + }, + "syncModes": { + "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, + "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "destination": { + "table": "Lot", + "matchColumns": ["LotNumber", "BranchCode"], + "excludeFromUpdate": ["LotNumber", "BranchCode", "LastUpdateDt"] + } + }, + "LotUsage_Curr": { + "source": { + "connection": "jde", + "query": "SELECT lu.ILUKID AS UniqueId, lu.ILDOCO AS WorkOrderNumber, TRIM(lu.ILLOTN) AS LotNumber, TRIM(lu.ILMCU) AS BranchCode, lu.ILITM AS ShortItemNumber, lu.ILTRQT AS Quantity, lu.ILTRDJ AS LastUpdateDate, lu.ILTDAY AS LastUpdateTime FROM {ProductionSchema}.F4111 lu WHERE lu.ILDCT = 'IM' AND TRIM(lu.ILLOTN) IS NOT NULL AND (lu.ILTRDJ > :dateUpdated OR (lu.ILTRDJ = :dateUpdated AND lu.ILTDAY >= :timeUpdated))", + "parameters": { + "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, + "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } + } + }, + "syncModes": { + "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, + "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "destination": { + "table": "LotUsage_Curr", + "matchColumns": ["UniqueId"], + "excludeFromUpdate": ["UniqueId", "LastUpdateDt"] + } + }, + "Item": { + "source": { + "connection": "jde", + "query": "SELECT pn.IMITM AS ShortItemNumber, TRIM(pn.IMLITM) AS ItemNumber, TRIM(pn.IMDSC1) AS Description, TRIM(pn.IMPRP4) AS PlanningFamily, TRIM(pn.IMSTKT) AS StockingType, pn.IMUPMJ AS LastUpdateDate, pn.IMTDAY AS LastUpdateTime FROM {ProductionSchema}.F4101 pn WHERE TRIM(pn.IMLITM) IS NOT NULL AND (pn.IMUPMJ > :dateUpdated OR (pn.IMUPMJ = :dateUpdated AND pn.IMTDAY >= :timeUpdated))", + "parameters": { + "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, + "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } + } + }, + "syncModes": { + "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, + "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "destination": { + "table": "Item", + "matchColumns": ["ShortItemNumber"], + "excludeFromUpdate": ["ShortItemNumber", "LastUpdateDt"] + } + }, + "WorkCenter": { + "source": { + "connection": "jde", + "query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'WC' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))", + "parameters": { + "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, + "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } + } + }, + "syncModes": { + "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, + "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "destination": { + "table": "WorkCenter", + "matchColumns": ["Code"], + "excludeFromUpdate": ["Code", "LastUpdateDt"] + } + }, + "ProfitCenter": { + "source": { + "connection": "jde", + "query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'I3' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))", + "parameters": { + "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, + "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } + } + }, + "syncModes": { + "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, + "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "destination": { + "table": "ProfitCenter", + "matchColumns": ["Code"], + "excludeFromUpdate": ["Code", "LastUpdateDt"] + } + }, + "JdeUser": { + "source": { + "connection": "jde", + "query": "WITH USER_CTE AS (SELECT ab.ABAN8 AS AddressNumber, TRIM(pro.ULUSER) AS UserId, TRIM(ab.ABALPH) AS FullName, ab.ABUPMJ AS LastUpdateDate, ab.ABUPMT AS LastUpdateTime, ROW_NUMBER() OVER (PARTITION BY ab.ABAN8 ORDER BY ab.ABUPMJ DESC, ab.ABUPMT DESC) RN FROM {ProductionSchema}.F0101 ab LEFT OUTER JOIN {ProductionSchema}.F0092 pro ON (ab.ABAN8 = pro.ULAN8) WHERE ab.ABATE = 'Y') SELECT AddressNumber, UserId, FullName, LastUpdateDate, LastUpdateTime FROM USER_CTE WHERE RN = 1", + "parameters": {} + }, + "syncModes": { + "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, + "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "destination": { + "table": "JdeUser", + "matchColumns": ["AddressNumber"], + "excludeFromUpdate": ["AddressNumber", "LastUpdateDt"] + } + }, + "Branch": { + "source": { + "connection": "jde", + "query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'BP' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))", + "parameters": { + "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, + "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } + } + }, + "syncModes": { + "mass": { "minDtOffset": "-365.00:00:00", "prePurge": true, "reIndex": true }, + "incremental": { "minDtOffset": "-7.00:00:00", "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "destination": { + "table": "Branch", + "matchColumns": ["Code"], + "excludeFromUpdate": ["Code", "LastUpdateDt"] + } + }, + "MisData": { + "source": { + "connection": "cms", + "query": "SELECT DISTINCT mis.P_PART_NUMBER AS ItemNumber, mis.P_OPERATION_NUMBER AS SequenceNumber, item.PITEM_ID AS MISNumber, itemrev.PITEM_REVISION_ID AS RevID, TRIM(mis.P_SITE) AS BranchCode, zim_test_details.P_SEQ_NUMBER AS CharNumber, zim_test_details.P_TEST_DESC AS TestDescription, zim_test_details.P_SAMPL_TYPE AS SamplingType, zim_test_details.P_SAMPL_VALUE AS SamplingValue, zim_test_details.P_TOOLS AS ToolsGauges, zim_test_details.P_WORK_INTR AS WorkInstructions, Status.PNAME AS Status, Status.PDATE_RELEASED AS ReleaseDate FROM INFODBA.PITEM item INNER JOIN INFODBA.PITEMREVISION itemrev ON (item.PUID = itemrev.RITEMS_TAGU) INNER JOIN INFODBA.PRELEASE_STATUS_LIST listing ON (itemrev.PUID = listing.PUID) INNER JOIN INFODBA.PRELEASESTATUS Status ON (listing.PVALU_0 = Status.PUID) INNER JOIN INFODBA.PIMANRELATION imanrel ON (itemrev.PUID = imanrel.RPRIMARY_OBJECTU) INNER JOIN INFODBA.PFORM form ON (imanrel.RSECONDARY_OBJECTU = form.PUID) INNER JOIN INFODBA.PZIMMERMISDETAILS zim_mis ON (form.RDATA_FILEU = zim_mis.PUID) INNER JOIN INFODBA.P_TEST_DETAILS test_details ON (zim_mis.PUID = test_details.PUID) INNER JOIN INFODBA.P_PART_ASSOCIATION ppa ON (ppa.PUID = test_details.PUID) INNER JOIN INFODBA.PMISDATAOBJECT mis ON (mis.PUID = ppa.PVALU_0) INNER JOIN INFODBA.PZIMTESTDETAILS zim_test_details ON (test_details.PVALU_0 = zim_test_details.PUID) WHERE Status.PNAME IN ('Current', 'BackLevel') AND Status.PDATE_RELEASED >= :lastUpdateDT", + "parameters": { + "lastUpdateDT": { "name": ":lastUpdateDT", "format": null, "source": "offset" } + } + }, + "syncModes": { + "mass": { "minDtOffset": "-3650.00:00:00", "prePurge": true, "reIndex": true }, + "incremental": { "minDtOffset": "-7.00:00:00" } + }, + "destination": { + "table": "MisData", + "matchColumns": ["ItemNumber", "BranchCode", "SequenceNumber", "MisNumber", "CharNumber"] + }, + "postScripts": [ + "UPDATE MisData SET ProcessedFlag = 1 WHERE ProcessedFlag IS NULL" + ] + } + } +}