fix(datasync): correct MisData postScript and query filtering

- Replace placeholder postScript with actual MIS data post-processing:
  1. Sets ObsoleteDate based on BackLevel records
  2. Sets ObsoleteDate for remaining NULL cases
  3. Rebuilds PK_MisData index

- Add massQuery support to SourceConfig for mode-specific queries
- MisData mass sync now uses query without date filter (like legacy)
- EtlPipelineFactory selects massQuery when in mass mode if available
- Remove unnecessary minDtOffset from MisData mass mode config
This commit is contained in:
Joseph Doherty
2026-01-06 14:01:26 -05:00
parent 4298fb8147
commit e75cd70d94
3 changed files with 20 additions and 7 deletions
@@ -11,7 +11,8 @@ public record PipelineConfig(
public record SourceConfig(
string Connection,
string Query,
Dictionary<string, ParameterConfig>? Parameters);
Dictionary<string, ParameterConfig>? Parameters,
string? MassQuery = null);
public record ParameterConfig(
string Name,
@@ -156,12 +156,13 @@
"source": {
"connection": "cms",
"query": "SELECT DISTINCT mis.P_PART_NUMBER AS ItemNumber, mis.P_OPERATION_NUMBER AS SequenceNumber, item.PITEM_ID AS MISNumber, itemrev.PITEM_REVISION_ID AS RevID, TRIM(mis.P_SITE) AS BranchCode, zim_test_details.P_SEQ_NUMBER AS CharNumber, zim_test_details.P_TEST_DESC AS TestDescription, zim_test_details.P_SAMPL_TYPE AS SamplingType, zim_test_details.P_SAMPL_VALUE AS SamplingValue, zim_test_details.P_TOOLS AS ToolsGauges, zim_test_details.P_WORK_INTR AS WorkInstructions, Status.PNAME AS Status, Status.PDATE_RELEASED AS ReleaseDate FROM INFODBA.PITEM item INNER JOIN INFODBA.PITEMREVISION itemrev ON (item.PUID = itemrev.RITEMS_TAGU) INNER JOIN INFODBA.PRELEASE_STATUS_LIST listing ON (itemrev.PUID = listing.PUID) INNER JOIN INFODBA.PRELEASESTATUS Status ON (listing.PVALU_0 = Status.PUID) INNER JOIN INFODBA.PIMANRELATION imanrel ON (itemrev.PUID = imanrel.RPRIMARY_OBJECTU) INNER JOIN INFODBA.PFORM form ON (imanrel.RSECONDARY_OBJECTU = form.PUID) INNER JOIN INFODBA.PZIMMERMISDETAILS zim_mis ON (form.RDATA_FILEU = zim_mis.PUID) INNER JOIN INFODBA.P_TEST_DETAILS test_details ON (zim_mis.PUID = test_details.PUID) INNER JOIN INFODBA.P_PART_ASSOCIATION ppa ON (ppa.PUID = test_details.PUID) INNER JOIN INFODBA.PMISDATAOBJECT mis ON (mis.PUID = ppa.PVALU_0) INNER JOIN INFODBA.PZIMTESTDETAILS zim_test_details ON (test_details.PVALU_0 = zim_test_details.PUID) WHERE Status.PNAME IN ('Current', 'BackLevel') AND Status.PDATE_RELEASED >= :lastUpdateDT",
"massQuery": "SELECT DISTINCT mis.P_PART_NUMBER AS ItemNumber, mis.P_OPERATION_NUMBER AS SequenceNumber, item.PITEM_ID AS MISNumber, itemrev.PITEM_REVISION_ID AS RevID, TRIM(mis.P_SITE) AS BranchCode, zim_test_details.P_SEQ_NUMBER AS CharNumber, zim_test_details.P_TEST_DESC AS TestDescription, zim_test_details.P_SAMPL_TYPE AS SamplingType, zim_test_details.P_SAMPL_VALUE AS SamplingValue, zim_test_details.P_TOOLS AS ToolsGauges, zim_test_details.P_WORK_INTR AS WorkInstructions, Status.PNAME AS Status, Status.PDATE_RELEASED AS ReleaseDate FROM INFODBA.PITEM item INNER JOIN INFODBA.PITEMREVISION itemrev ON (item.PUID = itemrev.RITEMS_TAGU) INNER JOIN INFODBA.PRELEASE_STATUS_LIST listing ON (itemrev.PUID = listing.PUID) INNER JOIN INFODBA.PRELEASESTATUS Status ON (listing.PVALU_0 = Status.PUID) INNER JOIN INFODBA.PIMANRELATION imanrel ON (itemrev.PUID = imanrel.RPRIMARY_OBJECTU) INNER JOIN INFODBA.PFORM form ON (imanrel.RSECONDARY_OBJECTU = form.PUID) INNER JOIN INFODBA.PZIMMERMISDETAILS zim_mis ON (form.RDATA_FILEU = zim_mis.PUID) INNER JOIN INFODBA.P_TEST_DETAILS test_details ON (zim_mis.PUID = test_details.PUID) INNER JOIN INFODBA.P_PART_ASSOCIATION ppa ON (ppa.PUID = test_details.PUID) INNER JOIN INFODBA.PMISDATAOBJECT mis ON (mis.PUID = ppa.PVALU_0) INNER JOIN INFODBA.PZIMTESTDETAILS zim_test_details ON (test_details.PVALU_0 = zim_test_details.PUID) WHERE Status.PNAME IN ('Current', 'BackLevel')",
"parameters": {
"lastUpdateDT": { "name": ":lastUpdateDT", "format": null, "source": "offset" }
}
},
"syncModes": {
"mass": { "minDtOffset": "-3650.00:00:00", "prePurge": true, "reIndex": true },
"mass": { "prePurge": true, "reIndex": true },
"incremental": { "minDtOffset": "-7.00:00:00" }
},
"destination": {
@@ -169,7 +170,9 @@
"matchColumns": ["ItemNumber", "BranchCode", "SequenceNumber", "MisNumber", "CharNumber"]
},
"postScripts": [
"UPDATE MisData SET ProcessedFlag = 1 WHERE ProcessedFlag IS NULL"
"SET ANSI_WARNINGS OFF; WITH cte AS (SELECT md.MisNumber, md.RevID, md.Status, MIN(md.ReleaseDate) Released FROM dbo.MisData AS md GROUP BY md.MisNumber, md.RevID, md.Status) UPDATE dbo.MisData SET ObsoleteDate = bl.Released FROM cte bl WHERE MisData.MisNumber = bl.MisNumber AND MisData.RevID = bl.RevID AND MisData.Status = 'Current' AND bl.Status = 'BackLevel';",
"WITH cte AS (SELECT md.MisNumber, md.RevID, md.Status, MIN(md.ReleaseDate) Released FROM dbo.MisData AS md GROUP BY md.MisNumber, md.RevID, md.Status) UPDATE dbo.MisData SET ObsoleteDate = (SELECT TOP 1 nl.Released FROM cte nl WHERE MisData.MisNumber = nl.MisNumber AND MisData.RevID < nl.RevID AND MisData.Status = nl.Status ORDER BY nl.RevID) WHERE ObsoleteDate IS NULL;",
"ALTER INDEX [PK_MisData] ON [dbo].[MisData] REBUILD;"
]
}
}
@@ -192,7 +192,7 @@ public class EtlPipelineFactory : IEtlPipelineFactory
var minDt = _minDtOverride ?? ComputeMinDt(modeConfig.MinDtOffset);
// Create source with parameter substitution
var source = CreateSource(_config.Source, minDt);
var source = CreateSource(_config.Source, minDt, _mode);
// Determine destination type (mode override > default by mode)
var destType = modeConfig.Destination?.Type
@@ -251,12 +251,21 @@ public class EtlPipelineFactory : IEtlPipelineFactory
return DateTime.UtcNow.Add(offset);
}
private IImportSource CreateSource(SourceConfig sourceConfig, DateTime? minDt)
private IImportSource CreateSource(SourceConfig sourceConfig, DateTime? minDt, SyncMode mode)
{
// Use massQuery if available and in mass mode, otherwise use the default query
var query = (mode == SyncMode.Mass && !string.IsNullOrEmpty(sourceConfig.MassQuery))
? sourceConfig.MassQuery
: sourceConfig.Query;
var parameters = new Dictionary<string, object>();
var converter = new ParameterFormatConverter(_settings.Timezone);
if (sourceConfig.Parameters != null && minDt.HasValue)
// Only add parameters for incremental mode or when using the default query
// Mass mode with massQuery typically doesn't need date parameters
var needsParameters = mode != SyncMode.Mass || string.IsNullOrEmpty(sourceConfig.MassQuery);
if (sourceConfig.Parameters != null && minDt.HasValue && needsParameters)
{
foreach (var (_, paramConfig) in sourceConfig.Parameters)
{
@@ -278,7 +287,7 @@ public class EtlPipelineFactory : IEtlPipelineFactory
return new DbQuerySource(
_connectionFactory,
sourceConfig.Connection,
sourceConfig.Query,
query,
parameters);
}