refactor(configmanager): migrate to per-file pipeline system

Align ConfigManager with DataSync's per-file pipeline format (pipeline.*.json)
by reusing EtlPipelineConfig types directly, eliminating duplicate models and
simplifying the codebase. Removes ~3200 lines of obsolete code.
This commit is contained in:
Joseph Doherty
2026-01-23 02:30:48 -05:00
parent 1b7bb26def
commit ba54a87be5
49 changed files with 1429 additions and 4396 deletions
@@ -1,34 +0,0 @@
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Configuration for an ETL pipeline.
/// </summary>
/// <param name="Source">Source connection, query text and query parameters.</param>
/// <param name="Schedules">Optional per-pipeline schedule overrides; may be null.</param>
/// <param name="Transformers">Optional list of transformer configurations; may be null.</param>
/// <param name="Destination">Destination table and its column lists.</param>
/// <param name="PreScripts">Optional list of script strings; may be null. NOTE(review): presumably executed before the load - confirm against the pipeline runner.</param>
/// <param name="PostScripts">Optional list of script strings; may be null. NOTE(review): presumably executed after the load - confirm against the pipeline runner.</param>
public record PipelineConfig(
SourceConfig Source,
PipelineSchedules? Schedules,
List<TransformerConfig>? Transformers,
DestinationConfig Destination,
List<string>? PreScripts,
List<string>? PostScripts);
/// <summary>
/// Source side of a pipeline: the connection to read from and the query text to run.
/// </summary>
/// <param name="Connection">Connection identifier. How it is resolved to a real connection is not visible here.</param>
/// <param name="Query">Query text for the source.</param>
/// <param name="Parameters">Optional query parameter definitions keyed by string; may be null.</param>
/// <param name="MassQuery">Optional alternative query text; defaults to null. NOTE(review): presumably used instead of <paramref name="Query"/> for Mass runs - confirm.</param>
public record SourceConfig(
string Connection,
string Query,
Dictionary<string, ParameterConfig>? Parameters,
string? MassQuery = null);
/// <summary>
/// Definition of a single query parameter.
/// </summary>
/// <param name="Name">Parameter name.</param>
/// <param name="Format">Optional format identifier; may be null. NOTE(review): the set of supported formats is not visible here - confirm.</param>
/// <param name="Source">Identifier of where the parameter value comes from; defaults to "offset".</param>
/// <param name="Value">Optional value; defaults to null. NOTE(review): presumably only used for sources other than "offset" - confirm.</param>
public record ParameterConfig(
string Name,
string? Format,
string Source = "offset",
string? Value = null);
/// <summary>
/// Configuration for a single transformer in a pipeline.
/// </summary>
/// <param name="Type">Transformer type identifier. The supported values are not visible here.</param>
/// <param name="Columns">Optional list of column names; may be null.</param>
/// <param name="Mappings">Optional string-to-string mappings; may be null. NOTE(review): meaning of keys and values depends on the transformer type - confirm.</param>
public record TransformerConfig(
string Type,
List<string>? Columns,
Dictionary<string, string>? Mappings);
/// <summary>
/// Destination side of a pipeline.
/// </summary>
/// <param name="Table">Destination table name.</param>
/// <param name="MatchColumns">Optional list of column names; may be null. NOTE(review): presumably the columns used to match incoming rows to existing rows - confirm.</param>
/// <param name="ExcludeFromUpdate">Optional list of column names; may be null. NOTE(review): presumably columns left untouched when an existing row is updated - confirm.</param>
public record DestinationConfig(
string Table,
List<string>? MatchColumns,
List<string>? ExcludeFromUpdate);
@@ -1,15 +0,0 @@
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Root object of the pipelines configuration.
/// </summary>
/// <param name="Settings">Optional settings; when null, <see cref="EffectiveSettings"/> supplies a default instance.</param>
/// <param name="ScheduleDefaults">Optional schedule defaults; when null, <see cref="EffectiveScheduleDefaults"/> supplies a default instance.</param>
/// <param name="Pipelines">Pipeline configurations keyed by string.</param>
public record PipelinesRoot(
    PipelineSettings? Settings,
    ScheduleDefaults? ScheduleDefaults,
    Dictionary<string, PipelineConfig> Pipelines)
{
    /// <summary>
    /// Returns <c>Settings</c> when it was supplied, otherwise a newly created
    /// <see cref="PipelineSettings"/> carrying its built-in defaults.
    /// </summary>
    public PipelineSettings EffectiveSettings
    {
        get
        {
            if (this.Settings is { } configured)
            {
                return configured;
            }

            return new PipelineSettings();
        }
    }

    /// <summary>
    /// Returns <c>ScheduleDefaults</c> when it was supplied, otherwise a newly created
    /// instance carrying its built-in defaults.
    /// </summary>
    public ScheduleDefaults EffectiveScheduleDefaults
    {
        get
        {
            if (this.ScheduleDefaults is { } configured)
            {
                return configured;
            }

            return new ScheduleDefaults();
        }
    }
}
/// <summary>
/// Global pipeline settings.
/// </summary>
/// <param name="Timezone">Time zone identifier; defaults to "UTC". NOTE(review): what it is applied to is not visible here - confirm.</param>
public record PipelineSettings(
string Timezone = "UTC");
@@ -1,110 +0,0 @@
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Configuration for a single schedule type (Mass/Daily/Hourly).
/// </summary>
public record ScheduleConfig
{
    /// <summary>
    /// Whether this schedule is enabled. Defaults to <c>true</c>.
    /// </summary>
    public bool Enabled { get; init; } = true;

    /// <summary>
    /// Interval in minutes between syncs. Zero (the default) or a negative value
    /// is treated as "not specified" by <see cref="MergeWith"/>.
    /// </summary>
    public int IntervalMinutes { get; init; }

    /// <summary>
    /// Whether to truncate the table before import (full reload).
    /// </summary>
    public bool PrePurge { get; init; }

    /// <summary>
    /// Whether to rebuild indexes after import.
    /// </summary>
    public bool ReIndex { get; init; }

    /// <summary>
    /// Condition for updating existing rows (e.g., "src.LastUpdateDt > tgt.LastUpdateDt").
    /// </summary>
    public string? UpdateWhen { get; init; }

    /// <summary>
    /// Produces a new configuration that combines this instance with <paramref name="defaults"/>.
    /// </summary>
    /// <remarks>
    /// Merge rules, per property:
    /// <list type="bullet">
    /// <item><description><see cref="Enabled"/> is always taken from this instance; the default is never consulted.</description></item>
    /// <item><description><see cref="IntervalMinutes"/> is taken from this instance when greater than zero, otherwise from the defaults.</description></item>
    /// <item><description><see cref="PrePurge"/> and <see cref="ReIndex"/> are OR-ed, so a <c>true</c> default cannot be switched off by this instance.</description></item>
    /// <item><description><see cref="UpdateWhen"/> is taken from this instance when non-null, otherwise from the defaults.</description></item>
    /// </list>
    /// </remarks>
    /// <param name="defaults">The default configuration to merge with.</param>
    /// <returns>A new <see cref="ScheduleConfig"/>; neither input is modified.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="defaults"/> is null.</exception>
    public ScheduleConfig MergeWith(ScheduleConfig defaults)
    {
        // Fail with a named argument instead of a NullReferenceException further down.
        ArgumentNullException.ThrowIfNull(defaults);

        return new ScheduleConfig
        {
            Enabled = Enabled,
            IntervalMinutes = IntervalMinutes > 0 ? IntervalMinutes : defaults.IntervalMinutes,
            PrePurge = PrePurge || defaults.PrePurge,
            ReIndex = ReIndex || defaults.ReIndex,
            UpdateWhen = UpdateWhen ?? defaults.UpdateWhen
        };
    }
}
/// <summary>
/// Fallback schedule settings, one entry per schedule type (Mass, Daily, Hourly).
/// </summary>
public record ScheduleDefaults
{
    // Update condition shared by the Daily and Hourly defaults.
    private const string SourceRowIsNewer = "src.LastUpdateDt > tgt.LastUpdateDt";

    /// <summary>
    /// Mass default: enabled, every 10080 minutes (one week), with pre-purge and re-index on.
    /// </summary>
    public ScheduleConfig Mass { get; init; } = new ScheduleConfig
    {
        IntervalMinutes = 7 * 24 * 60, // one week
        Enabled = true,
        ReIndex = true,
        PrePurge = true,
    };

    /// <summary>
    /// Daily default: enabled, every 1440 minutes (one day), no pre-purge or re-index,
    /// updating rows when the source row is newer.
    /// </summary>
    public ScheduleConfig Daily { get; init; } = new ScheduleConfig
    {
        IntervalMinutes = 24 * 60, // one day
        Enabled = true,
        ReIndex = false,
        PrePurge = false,
        UpdateWhen = SourceRowIsNewer,
    };

    /// <summary>
    /// Hourly default: enabled, every 60 minutes, no pre-purge or re-index,
    /// updating rows when the source row is newer.
    /// </summary>
    public ScheduleConfig Hourly { get; init; } = new ScheduleConfig
    {
        IntervalMinutes = 60, // one hour
        Enabled = true,
        ReIndex = false,
        PrePurge = false,
        UpdateWhen = SourceRowIsNewer,
    };
}
/// <summary>
/// Per-pipeline schedule overrides. Each entry is optional and is null when not set.
/// </summary>
public record PipelineSchedules
{
/// <summary>
/// Gets or initializes the Mass schedule configuration override; null when no override is given.
/// </summary>
public ScheduleConfig? Mass { get; init; }
/// <summary>
/// Gets or initializes the Daily schedule configuration override; null when no override is given.
/// </summary>
public ScheduleConfig? Daily { get; init; }
/// <summary>
/// Gets or initializes the Hourly schedule configuration override; null when no override is given.
/// </summary>
public ScheduleConfig? Hourly { get; init; }
}
@@ -1,57 +0,0 @@
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Etl.Pipeline;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Entry point for working with configured pipelines: hands out a builder for a table,
/// lists the available tables, and exposes pipeline configuration and schedule defaults.
/// </summary>
public interface IEtlPipelineFactory
{
/// <summary>
/// Creates a pipeline builder for the specified table.
/// </summary>
/// <param name="tableName">The table name (pipeline key).</param>
/// <returns>A builder for configuring the pipeline.</returns>
IEtlPipelineBuilder ForTable(string tableName);
/// <summary>
/// Gets the list of available table names (pipeline keys).
/// </summary>
/// <returns>List of table names with configured pipelines.</returns>
IReadOnlyList<string> GetAvailableTables();
/// <summary>
/// Gets the configuration for a specific pipeline.
/// </summary>
/// <param name="tableName">The table name (pipeline key).</param>
/// <returns>The pipeline configuration, or null if not found.</returns>
PipelineConfig? GetPipelineConfig(string tableName);
/// <summary>
/// Gets the schedule defaults from the configuration.
/// </summary>
/// <returns>The schedule defaults.</returns>
ScheduleDefaults GetScheduleDefaults();
}
/// <summary>
/// Chainable builder for an <see cref="EtlPipeline"/>: the With* methods return the builder
/// so calls can be chained, and <see cref="Build"/> produces the pipeline.
/// </summary>
public interface IEtlPipelineBuilder
{
/// <summary>
/// Sets the update type for this pipeline (Mass, Daily, or Hourly).
/// </summary>
/// <param name="updateType">The update type.</param>
/// <returns>The builder for chaining.</returns>
IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType);
/// <summary>
/// Sets an optional minimum date for filtering source data.
/// </summary>
/// <param name="minDt">The minimum date, or null to use config offset.</param>
/// <returns>The builder for chaining.</returns>
IEtlPipelineBuilder WithMinimumDate(DateTime? minDt);
/// <summary>
/// Builds the pipeline with the configured settings.
/// </summary>
/// <returns>The configured pipeline.</returns>
EtlPipeline Build();
}
@@ -1,6 +1,6 @@
using JdeScoping.DataSync;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.HealthChecks;
using JdeScoping.DataSync.Services;
using JdeScoping.DataSync.Telemetry;
@@ -35,14 +35,10 @@ public static class DataSyncDependencyInjection
.ValidateDataAnnotations()
.ValidateOnStart();
// Pipeline configuration (new ETL infrastructure)
services.AddOptions<PipelineOptions>()
.Bind(configuration.GetSection(PipelineOptions.SectionName));
// Pipeline builder service (builds ETL pipelines from config)
services.AddSingleton<IEtlPipelineBuilder, EtlPipelineBuilderService>();
// Pipeline factory (new ETL infrastructure)
services.AddSingleton<IEtlPipelineFactory, EtlPipelineFactory>();
// Pipeline registry services (new hot-reload infrastructure)
// Pipeline registry services (hot-reload infrastructure)
services.AddSingleton<IPipelineValidator, PipelineValidator>();
services.AddSingleton<IPipelineRegistry, PipelineRegistry>();
@@ -27,10 +27,4 @@
<PackageReference Include="ZstdSharp.Port" Version="0.8.1" />
</ItemGroup>
<ItemGroup>
<Content Include="Pipelines\pipelines.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
</Project>
@@ -1,6 +1,5 @@
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Options;
namespace JdeScoping.DataSync.Models;
@@ -41,16 +40,10 @@ public class DataUpdateTask
public DateTime? MinimumDt { get; init; }
/// <summary>
/// The pipeline configuration for this task (new format).
/// The pipeline configuration for this task.
/// </summary>
public EtlPipelineConfig? Pipeline { get; init; }
/// <summary>
/// The data source configuration for this task (legacy format - will be removed).
/// </summary>
[Obsolete("Use Pipeline instead. This property exists for backward compatibility during migration.")]
public DataSourceConfig? Config { get; init; }
/// <summary>
/// Gets a unique key for logging purposes.
/// </summary>
@@ -1,58 +0,0 @@
namespace JdeScoping.DataSync.Options;
/// <summary>
/// Configuration for a single data source table sync.
/// </summary>
public class DataSourceConfig
{
/// <summary>
/// Target table name in SQL Server cache. Required: must be set when the object is initialized.
/// </summary>
public required string TableName { get; set; }
/// <summary>
/// Source system: "JDE" or "CMS". Required: must be set when the object is initialized.
/// </summary>
public required string SourceSystem { get; set; }
/// <summary>
/// Source data identifier (e.g., "WORKORDER", "LOTUSAGE"). Defaults to an empty string.
/// </summary>
public string SourceData { get; set; } = string.Empty;
/// <summary>
/// Whether this data source is enabled for sync. Defaults to true.
/// </summary>
public bool IsEnabled { get; set; } = true;
/// <summary>
/// Mass sync schedule configuration. Defaults to a new <see cref="ScheduleConfig"/> instance.
/// </summary>
public ScheduleConfig MassConfig { get; set; } = new();
/// <summary>
/// Daily incremental sync configuration. Defaults to a new <see cref="ScheduleConfig"/> instance.
/// </summary>
public ScheduleConfig DailyConfig { get; set; } = new();
/// <summary>
/// Hourly incremental sync configuration. Defaults to a new <see cref="ScheduleConfig"/> instance.
/// </summary>
public ScheduleConfig HourlyConfig { get; set; } = new();
}
/// <summary>
/// Schedule configuration for a sync type (Mass/Daily/Hourly).
/// </summary>
public class ScheduleConfig
{
/// <summary>
/// Whether this schedule is enabled. Defaults to true.
/// </summary>
public bool Enabled { get; set; } = true;
/// <summary>
/// Interval in minutes between syncs. Has no initializer, so it is 0 unless set.
/// </summary>
public int IntervalMinutes { get; set; }
}
@@ -69,9 +69,4 @@ public class DataSyncOptions
/// If false, invalid enabled pipelines are skipped with warnings.
/// </summary>
public bool StrictPipelineValidation { get; set; } = true;
/// <summary>
/// Per-table data source configurations.
/// </summary>
public List<DataSourceConfig> DataSources { get; set; } = [];
}
@@ -1,11 +0,0 @@
namespace JdeScoping.DataSync.Options;
/// <summary>
/// Options describing where the pipeline configuration file lives.
/// </summary>
public class PipelineOptions
{
/// <summary>
/// Name of the configuration section associated with these options ("Pipelines").
/// </summary>
public const string SectionName = "Pipelines";
/// <summary>
/// Gets or sets the path to the pipeline configuration file.
/// Defaults to "Pipelines/pipelines.json".
/// NOTE(review): what a relative path is resolved against is not visible here - confirm.
/// </summary>
public string ConfigPath { get; set; } = "Pipelines/pipelines.json";
}
@@ -1,392 +0,0 @@
{
"settings": {
"timezone": "UTC"
},
"scheduleDefaults": {
"mass": { "enabled": true, "intervalMinutes": 10080, "prePurge": true, "reIndex": true },
"daily": { "enabled": true, "intervalMinutes": 1440, "prePurge": false, "reIndex": false },
"hourly": { "enabled": true, "intervalMinutes": 60, "prePurge": false, "reIndex": false }
},
"pipelines": {
"WorkOrder_Curr": {
"source": {
"connection": "jde",
"query": "SELECT wo.WADOCO AS WorkOrderNumber, TRIM(wo.WAMMCU) AS BranchCode, TRIM(wo.WALOTN) AS LotNumber, TRIM(wo.WALITM) AS ItemNumber, wo.WAITM AS ShortItemNumber, TRIM(wo.WAPARS) AS ParentWorkOrderNumber, wo.WAUORG / 100.0 AS OrderQuantity, wo.WASOBK / 100.0 AS HeldQuantity, wo.WASOQS / 100.0 AS ShippedQuantity, TRIM(wo.WASRST) AS StatusCode, CASE wo.WADCG WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WADCG+1900000,'YYYYDDD') END AS StatusCodeUpdateDT, CASE wo.WATRDJ WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WATRDJ+1900000,'YYYYDDD') END AS IssueDate, CASE wo.WASTRT WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WASTRT+1900000,'YYYYDDD') END AS StartDate, TRIM(wo.WATRT) AS RoutingType, wo.WAUPMJ AS LastUpdateDate, wo.WATDAY AS LastUpdateTime FROM {ProductionSchema}.F4801 wo WHERE (wo.WAUPMJ > :dateUpdated OR (wo.WAUPMJ = :dateUpdated AND wo.WATDAY >= :timeUpdated))",
"massQuery": "SELECT wo.WADOCO AS WorkOrderNumber, TRIM(wo.WAMMCU) AS BranchCode, TRIM(wo.WALOTN) AS LotNumber, TRIM(wo.WALITM) AS ItemNumber, wo.WAITM AS ShortItemNumber, TRIM(wo.WAPARS) AS ParentWorkOrderNumber, wo.WAUORG / 100.0 AS OrderQuantity, wo.WASOBK / 100.0 AS HeldQuantity, wo.WASOQS / 100.0 AS ShippedQuantity, TRIM(wo.WASRST) AS StatusCode, CASE wo.WADCG WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WADCG+1900000,'YYYYDDD') END AS StatusCodeUpdateDT, CASE wo.WATRDJ WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WATRDJ+1900000,'YYYYDDD') END AS IssueDate, CASE wo.WASTRT WHEN 0 THEN TO_DATE('1900-01-01', 'YYYY-MM-DD') ELSE TO_DATE(wo.WASTRT+1900000,'YYYYDDD') END AS StartDate, TRIM(wo.WATRT) AS RoutingType, wo.WAUPMJ AS LastUpdateDate, wo.WATDAY AS LastUpdateTime FROM {ProductionSchema}.F4801 wo",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "WorkOrder_Curr",
"matchColumns": ["WorkOrderNumber", "BranchCode"],
"excludeFromUpdate": ["WorkOrderNumber", "BranchCode", "LastUpdateDt"]
}
},
"Lot": {
"source": {
"connection": "jde",
"query": "SELECT TRIM(lot.IOLOTN) AS LotNumber, TRIM(lot.IOMCU) AS BranchCode, lot.IOITM AS ShortItemNumber, TRIM(lot.IOLITM) AS ItemNumber, lot.IOVEND AS SupplierCode, lot.IOLOTS AS StatusCode, TRIM(lot.IOLOT1) AS Memo1, TRIM(lot.IOLOT2) AS Memo2, TRIM(lot.IOLOT3) AS Memo3, lot.IOUPMJ AS LastUpdateDate, lot.IOTDAY AS LastUpdateTime FROM {ProductionSchema}.F4108 lot WHERE TRIM(lot.IOLOTN) IS NOT NULL AND TRIM(lot.IOMCU) IS NOT NULL AND (lot.IOUPMJ > :dateUpdated OR (lot.IOUPMJ = :dateUpdated AND lot.IOTDAY >= :timeUpdated))",
"massQuery": "SELECT TRIM(lot.IOLOTN) AS LotNumber, TRIM(lot.IOMCU) AS BranchCode, lot.IOITM AS ShortItemNumber, TRIM(lot.IOLITM) AS ItemNumber, lot.IOVEND AS SupplierCode, lot.IOLOTS AS StatusCode, TRIM(lot.IOLOT1) AS Memo1, TRIM(lot.IOLOT2) AS Memo2, TRIM(lot.IOLOT3) AS Memo3, lot.IOUPMJ AS LastUpdateDate, lot.IOTDAY AS LastUpdateTime FROM {ProductionSchema}.F4108 lot WHERE TRIM(lot.IOLOTN) IS NOT NULL AND TRIM(lot.IOMCU) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "Lot",
"matchColumns": ["LotNumber", "BranchCode"],
"excludeFromUpdate": ["LotNumber", "BranchCode", "LastUpdateDt"]
}
},
"LotUsage_Curr": {
"source": {
"connection": "jde",
"query": "SELECT lu.ILUKID AS UniqueId, lu.ILDOCO AS WorkOrderNumber, TRIM(lu.ILLOTN) AS LotNumber, TRIM(lu.ILMCU) AS BranchCode, lu.ILITM AS ShortItemNumber, lu.ILTRQT AS Quantity, lu.ILTRDJ AS LastUpdateDate, lu.ILTDAY AS LastUpdateTime FROM {ProductionSchema}.F4111 lu WHERE lu.ILDCT = 'IM' AND TRIM(lu.ILLOTN) IS NOT NULL AND (lu.ILTRDJ > :dateUpdated OR (lu.ILTRDJ = :dateUpdated AND lu.ILTDAY >= :timeUpdated))",
"massQuery": "SELECT lu.ILUKID AS UniqueId, lu.ILDOCO AS WorkOrderNumber, TRIM(lu.ILLOTN) AS LotNumber, TRIM(lu.ILMCU) AS BranchCode, lu.ILITM AS ShortItemNumber, lu.ILTRQT AS Quantity, lu.ILTRDJ AS LastUpdateDate, lu.ILTDAY AS LastUpdateTime FROM {ProductionSchema}.F4111 lu WHERE lu.ILDCT = 'IM' AND TRIM(lu.ILLOTN) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "LotUsage_Curr",
"matchColumns": ["UniqueId"],
"excludeFromUpdate": ["UniqueId", "LastUpdateDt"]
}
},
"Item": {
"source": {
"connection": "jde",
"query": "SELECT pn.IMITM AS ShortItemNumber, TRIM(pn.IMLITM) AS ItemNumber, TRIM(pn.IMDSC1) AS Description, TRIM(pn.IMPRP4) AS PlanningFamily, TRIM(pn.IMSTKT) AS StockingType, pn.IMUPMJ AS LastUpdateDate, pn.IMTDAY AS LastUpdateTime FROM {ProductionSchema}.F4101 pn WHERE TRIM(pn.IMLITM) IS NOT NULL AND (pn.IMUPMJ > :dateUpdated OR (pn.IMUPMJ = :dateUpdated AND pn.IMTDAY >= :timeUpdated))",
"massQuery": "SELECT pn.IMITM AS ShortItemNumber, TRIM(pn.IMLITM) AS ItemNumber, TRIM(pn.IMDSC1) AS Description, TRIM(pn.IMPRP4) AS PlanningFamily, TRIM(pn.IMSTKT) AS StockingType, pn.IMUPMJ AS LastUpdateDate, pn.IMTDAY AS LastUpdateTime FROM {ProductionSchema}.F4101 pn WHERE TRIM(pn.IMLITM) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "Item",
"matchColumns": ["ShortItemNumber"],
"excludeFromUpdate": ["ShortItemNumber", "LastUpdateDt"]
}
},
"WorkCenter": {
"source": {
"connection": "jde",
"query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'WC' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))",
"massQuery": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'WC'",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "WorkCenter",
"matchColumns": ["Code"],
"excludeFromUpdate": ["Code", "LastUpdateDt"]
}
},
"ProfitCenter": {
"source": {
"connection": "jde",
"query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'I3' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))",
"massQuery": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'I3'",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "ProfitCenter",
"matchColumns": ["Code"],
"excludeFromUpdate": ["Code", "LastUpdateDt"]
}
},
"JdeUser": {
"source": {
"connection": "jde",
"query": "WITH USER_CTE AS (SELECT ab.ABAN8 AS AddressNumber, TRIM(pro.ULUSER) AS UserId, TRIM(ab.ABALPH) AS FullName, ab.ABUPMJ AS LastUpdateDate, ab.ABUPMT AS LastUpdateTime, ROW_NUMBER() OVER (PARTITION BY ab.ABAN8 ORDER BY ab.ABUPMJ DESC, ab.ABUPMT DESC) RN FROM {ProductionSchema}.F0101 ab LEFT OUTER JOIN {ProductionSchema}.F0092 pro ON (ab.ABAN8 = pro.ULAN8) WHERE ab.ABATE = 'Y') SELECT AddressNumber, UserId, FullName, LastUpdateDate, LastUpdateTime FROM USER_CTE WHERE RN = 1",
"massQuery": "WITH USER_CTE AS (SELECT ab.ABAN8 AS AddressNumber, TRIM(pro.ULUSER) AS UserId, TRIM(ab.ABALPH) AS FullName, ab.ABUPMJ AS LastUpdateDate, ab.ABUPMT AS LastUpdateTime, ROW_NUMBER() OVER (PARTITION BY ab.ABAN8 ORDER BY ab.ABUPMJ DESC, ab.ABUPMT DESC) RN FROM {ProductionSchema}.F0101 ab LEFT OUTER JOIN {ProductionSchema}.F0092 pro ON (ab.ABAN8 = pro.ULAN8) WHERE ab.ABATE = 'Y') SELECT AddressNumber, UserId, FullName, LastUpdateDate, LastUpdateTime FROM USER_CTE WHERE RN = 1",
"parameters": {}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "JdeUser",
"matchColumns": ["AddressNumber"],
"excludeFromUpdate": ["AddressNumber", "LastUpdateDt"]
}
},
"Branch": {
"source": {
"connection": "jde",
"query": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'BP' AND (wc.MCUPMJ > :dateUpdated OR (wc.MCUPMJ = :dateUpdated AND wc.MCUPMT >= :timeUpdated))",
"massQuery": "SELECT TRIM(wc.MCMCU) AS Code, TRIM(wc.MCDL01) AS Description, wc.MCUPMJ AS LastUpdateDate, wc.MCUPMT AS LastUpdateTime FROM {ProductionSchema}.F0006 wc WHERE wc.MCSTYL = 'BP'",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "Branch",
"matchColumns": ["Code"],
"excludeFromUpdate": ["Code", "LastUpdateDt"]
}
},
"MisData_Curr": {
"source": {
"connection": "cms",
"query": "SELECT DISTINCT mis.P_PART_NUMBER AS ItemNumber, mis.P_OPERATION_NUMBER AS SequenceNumber, item.PITEM_ID AS MISNumber, itemrev.PITEM_REVISION_ID AS RevID, TRIM(mis.P_SITE) AS BranchCode, zim_test_details.P_SEQ_NUMBER AS CharNumber, zim_test_details.P_TEST_DESC AS TestDescription, zim_test_details.P_SAMPL_TYPE AS SamplingType, zim_test_details.P_SAMPL_VALUE AS SamplingValue, zim_test_details.P_TOOLS AS ToolsGauges, zim_test_details.P_WORK_INTR AS WorkInstructions, Status.PNAME AS Status, Status.PDATE_RELEASED AS ReleaseDate FROM INFODBA.PITEM item INNER JOIN INFODBA.PITEMREVISION itemrev ON (item.PUID = itemrev.RITEMS_TAGU) INNER JOIN INFODBA.PRELEASE_STATUS_LIST listing ON (itemrev.PUID = listing.PUID) INNER JOIN INFODBA.PRELEASESTATUS Status ON (listing.PVALU_0 = Status.PUID) INNER JOIN INFODBA.PIMANRELATION imanrel ON (itemrev.PUID = imanrel.RPRIMARY_OBJECTU) INNER JOIN INFODBA.PFORM form ON (imanrel.RSECONDARY_OBJECTU = form.PUID) INNER JOIN INFODBA.PZIMMERMISDETAILS zim_mis ON (form.RDATA_FILEU = zim_mis.PUID) INNER JOIN INFODBA.P_TEST_DETAILS test_details ON (zim_mis.PUID = test_details.PUID) INNER JOIN INFODBA.P_PART_ASSOCIATION ppa ON (ppa.PUID = test_details.PUID) INNER JOIN INFODBA.PMISDATAOBJECT mis ON (mis.PUID = ppa.PVALU_0) INNER JOIN INFODBA.PZIMTESTDETAILS zim_test_details ON (test_details.PVALU_0 = zim_test_details.PUID) WHERE Status.PNAME = 'Current' AND Status.PDATE_RELEASED >= :lastUpdateDT",
"massQuery": "SELECT DISTINCT mis.P_PART_NUMBER AS ItemNumber, mis.P_OPERATION_NUMBER AS SequenceNumber, item.PITEM_ID AS MISNumber, itemrev.PITEM_REVISION_ID AS RevID, TRIM(mis.P_SITE) AS BranchCode, zim_test_details.P_SEQ_NUMBER AS CharNumber, zim_test_details.P_TEST_DESC AS TestDescription, zim_test_details.P_SAMPL_TYPE AS SamplingType, zim_test_details.P_SAMPL_VALUE AS SamplingValue, zim_test_details.P_TOOLS AS ToolsGauges, zim_test_details.P_WORK_INTR AS WorkInstructions, Status.PNAME AS Status, Status.PDATE_RELEASED AS ReleaseDate FROM INFODBA.PITEM item INNER JOIN INFODBA.PITEMREVISION itemrev ON (item.PUID = itemrev.RITEMS_TAGU) INNER JOIN INFODBA.PRELEASE_STATUS_LIST listing ON (itemrev.PUID = listing.PUID) INNER JOIN INFODBA.PRELEASESTATUS Status ON (listing.PVALU_0 = Status.PUID) INNER JOIN INFODBA.PIMANRELATION imanrel ON (itemrev.PUID = imanrel.RPRIMARY_OBJECTU) INNER JOIN INFODBA.PFORM form ON (imanrel.RSECONDARY_OBJECTU = form.PUID) INNER JOIN INFODBA.PZIMMERMISDETAILS zim_mis ON (form.RDATA_FILEU = zim_mis.PUID) INNER JOIN INFODBA.P_TEST_DETAILS test_details ON (zim_mis.PUID = test_details.PUID) INNER JOIN INFODBA.P_PART_ASSOCIATION ppa ON (ppa.PUID = test_details.PUID) INNER JOIN INFODBA.PMISDATAOBJECT mis ON (mis.PUID = ppa.PVALU_0) INNER JOIN INFODBA.PZIMTESTDETAILS zim_test_details ON (test_details.PVALU_0 = zim_test_details.PUID) WHERE Status.PNAME = 'Current'",
"parameters": {
"lastUpdateDT": { "name": ":lastUpdateDT", "format": null, "source": "offset" }
}
},
"schedules": {
"mass": { "intervalMinutes": 100800 },
"daily": {},
"hourly": { "enabled": false }
},
"destination": {
"table": "MisData_Curr",
"matchColumns": ["ItemNumber", "BranchCode", "SequenceNumber", "MisNumber", "CharNumber"]
},
"postScripts": [
"SET ANSI_WARNINGS OFF; WITH cte AS (SELECT md.MisNumber, md.RevID, md.Status, MIN(md.ReleaseDate) Released FROM dbo.MisData_Curr AS md GROUP BY md.MisNumber, md.RevID, md.Status) UPDATE dbo.MisData_Curr SET ObsoleteDate = bl.Released FROM cte bl WHERE MisData_Curr.MisNumber = bl.MisNumber AND MisData_Curr.RevID = bl.RevID AND MisData_Curr.Status = 'Current' AND bl.Status = 'BackLevel';",
"WITH cte AS (SELECT md.MisNumber, md.RevID, md.Status, MIN(md.ReleaseDate) Released FROM dbo.MisData_Curr AS md GROUP BY md.MisNumber, md.RevID, md.Status) UPDATE dbo.MisData_Curr SET ObsoleteDate = (SELECT TOP 1 nl.Released FROM cte nl WHERE MisData_Curr.MisNumber = nl.MisNumber AND MisData_Curr.RevID < nl.RevID AND MisData_Curr.Status = nl.Status ORDER BY nl.RevID) WHERE ObsoleteDate IS NULL;",
"MERGE INTO dbo.MisData_Hist AS target USING (SELECT * FROM dbo.MisData_Curr WHERE Status = 'BackLevel') AS source ON target.ItemNumber = source.ItemNumber AND target.BranchCode = source.BranchCode AND target.SequenceNumber = source.SequenceNumber AND target.MisNumber = source.MisNumber AND target.CharNumber = source.CharNumber WHEN MATCHED THEN UPDATE SET target.RevID = source.RevID, target.TestDescription = source.TestDescription, target.SamplingType = source.SamplingType, target.SamplingValue = source.SamplingValue, target.ToolsGauges = source.ToolsGauges, target.WorkInstructions = source.WorkInstructions, target.Status = source.Status, target.ReleaseDate = source.ReleaseDate, target.ObsoleteDate = source.ObsoleteDate WHEN NOT MATCHED THEN INSERT (ItemNumber, BranchCode, SequenceNumber, MisNumber, RevID, CharNumber, TestDescription, SamplingType, SamplingValue, ToolsGauges, WorkInstructions, Status, ReleaseDate, ObsoleteDate) VALUES (source.ItemNumber, source.BranchCode, source.SequenceNumber, source.MisNumber, source.RevID, source.CharNumber, source.TestDescription, source.SamplingType, source.SamplingValue, source.ToolsGauges, source.WorkInstructions, source.Status, source.ReleaseDate, source.ObsoleteDate);",
"DELETE FROM dbo.MisData_Curr WHERE Status = 'BackLevel';",
"ALTER INDEX [PK_MisData_Curr] ON [dbo].[MisData_Curr] REBUILD;"
]
},
"MisData_Hist": {
"source": {
"connection": "cms",
"query": "SELECT DISTINCT mis.P_PART_NUMBER AS ItemNumber, mis.P_OPERATION_NUMBER AS SequenceNumber, item.PITEM_ID AS MISNumber, itemrev.PITEM_REVISION_ID AS RevID, TRIM(mis.P_SITE) AS BranchCode, zim_test_details.P_SEQ_NUMBER AS CharNumber, zim_test_details.P_TEST_DESC AS TestDescription, zim_test_details.P_SAMPL_TYPE AS SamplingType, zim_test_details.P_SAMPL_VALUE AS SamplingValue, zim_test_details.P_TOOLS AS ToolsGauges, zim_test_details.P_WORK_INTR AS WorkInstructions, Status.PNAME AS Status, Status.PDATE_RELEASED AS ReleaseDate FROM INFODBA.PITEM item INNER JOIN INFODBA.PITEMREVISION itemrev ON (item.PUID = itemrev.RITEMS_TAGU) INNER JOIN INFODBA.PRELEASE_STATUS_LIST listing ON (itemrev.PUID = listing.PUID) INNER JOIN INFODBA.PRELEASESTATUS Status ON (listing.PVALU_0 = Status.PUID) INNER JOIN INFODBA.PIMANRELATION imanrel ON (itemrev.PUID = imanrel.RPRIMARY_OBJECTU) INNER JOIN INFODBA.PFORM form ON (imanrel.RSECONDARY_OBJECTU = form.PUID) INNER JOIN INFODBA.PZIMMERMISDETAILS zim_mis ON (form.RDATA_FILEU = zim_mis.PUID) INNER JOIN INFODBA.P_TEST_DETAILS test_details ON (zim_mis.PUID = test_details.PUID) INNER JOIN INFODBA.P_PART_ASSOCIATION ppa ON (ppa.PUID = test_details.PUID) INNER JOIN INFODBA.PMISDATAOBJECT mis ON (mis.PUID = ppa.PVALU_0) INNER JOIN INFODBA.PZIMTESTDETAILS zim_test_details ON (test_details.PVALU_0 = zim_test_details.PUID) WHERE Status.PNAME = 'BackLevel' AND Status.PDATE_RELEASED >= :lastUpdateDT",
"massQuery": "SELECT DISTINCT mis.P_PART_NUMBER AS ItemNumber, mis.P_OPERATION_NUMBER AS SequenceNumber, item.PITEM_ID AS MISNumber, itemrev.PITEM_REVISION_ID AS RevID, TRIM(mis.P_SITE) AS BranchCode, zim_test_details.P_SEQ_NUMBER AS CharNumber, zim_test_details.P_TEST_DESC AS TestDescription, zim_test_details.P_SAMPL_TYPE AS SamplingType, zim_test_details.P_SAMPL_VALUE AS SamplingValue, zim_test_details.P_TOOLS AS ToolsGauges, zim_test_details.P_WORK_INTR AS WorkInstructions, Status.PNAME AS Status, Status.PDATE_RELEASED AS ReleaseDate FROM INFODBA.PITEM item INNER JOIN INFODBA.PITEMREVISION itemrev ON (item.PUID = itemrev.RITEMS_TAGU) INNER JOIN INFODBA.PRELEASE_STATUS_LIST listing ON (itemrev.PUID = listing.PUID) INNER JOIN INFODBA.PRELEASESTATUS Status ON (listing.PVALU_0 = Status.PUID) INNER JOIN INFODBA.PIMANRELATION imanrel ON (itemrev.PUID = imanrel.RPRIMARY_OBJECTU) INNER JOIN INFODBA.PFORM form ON (imanrel.RSECONDARY_OBJECTU = form.PUID) INNER JOIN INFODBA.PZIMMERMISDETAILS zim_mis ON (form.RDATA_FILEU = zim_mis.PUID) INNER JOIN INFODBA.P_TEST_DETAILS test_details ON (zim_mis.PUID = test_details.PUID) INNER JOIN INFODBA.P_PART_ASSOCIATION ppa ON (ppa.PUID = test_details.PUID) INNER JOIN INFODBA.PMISDATAOBJECT mis ON (mis.PUID = ppa.PVALU_0) INNER JOIN INFODBA.PZIMTESTDETAILS zim_test_details ON (test_details.PVALU_0 = zim_test_details.PUID) WHERE Status.PNAME = 'BackLevel'",
"parameters": {
"lastUpdateDT": { "name": ":lastUpdateDT", "format": null, "source": "offset" }
}
},
"schedules": {
"mass": { "intervalMinutes": 100800 },
"daily": { "enabled": false },
"hourly": { "enabled": false }
},
"destination": {
"table": "MisData_Hist",
"matchColumns": ["ItemNumber", "BranchCode", "SequenceNumber", "MisNumber", "CharNumber"]
},
"postScripts": [
"SET ANSI_WARNINGS OFF; WITH cte AS (SELECT md.MisNumber, md.RevID, md.Status, MIN(md.ReleaseDate) Released FROM dbo.MisData_Hist AS md GROUP BY md.MisNumber, md.RevID, md.Status) UPDATE dbo.MisData_Hist SET ObsoleteDate = bl.Released FROM cte bl WHERE MisData_Hist.MisNumber = bl.MisNumber AND MisData_Hist.RevID = bl.RevID AND MisData_Hist.Status = 'Current' AND bl.Status = 'BackLevel';",
"WITH cte AS (SELECT md.MisNumber, md.RevID, md.Status, MIN(md.ReleaseDate) Released FROM dbo.MisData_Hist AS md GROUP BY md.MisNumber, md.RevID, md.Status) UPDATE dbo.MisData_Hist SET ObsoleteDate = (SELECT TOP 1 nl.Released FROM cte nl WHERE MisData_Hist.MisNumber = nl.MisNumber AND MisData_Hist.RevID < nl.RevID AND MisData_Hist.Status = nl.Status ORDER BY nl.RevID) WHERE ObsoleteDate IS NULL;",
"ALTER INDEX [PK_MisData_Hist] ON [dbo].[MisData_Hist] REBUILD;"
]
},
"WorkOrderTime_Curr": {
"source": {
"connection": "jde",
"query": "SELECT wot.UNIQUEKEYIDINTERNAL_WTUKID AS UniqueID, TRIM(wot.COSTCENTERALT_WTMMCU) AS BranchCode, wot.DOCUMENTORDERINVOICEE_WTDOCO AS WorkOrderNumber, wot.SEQUENCENOOPERATIONS_WTOPSQ AS StepNumber, wot.ADDRESSNUMBER_WTAN8 AS AddressNumber, wot.DTFORGLANDVOUCH1_WTDGL AS GlDate, wot.DATEUPDATED_WTUPMJ AS DateUpdated, wot.TIMEOFDAY_WTTDAY AS TimeUpdated FROM JDESTAGE.F31122_VIEW wot WHERE (wot.DATEUPDATED_WTUPMJ > :dateUpdated OR (wot.DATEUPDATED_WTUPMJ = :dateUpdated AND wot.TIMEOFDAY_WTTDAY >= :timeUpdated))",
"massQuery": "SELECT wot.UNIQUEKEYIDINTERNAL_WTUKID AS UniqueID, TRIM(wot.COSTCENTERALT_WTMMCU) AS BranchCode, wot.DOCUMENTORDERINVOICEE_WTDOCO AS WorkOrderNumber, wot.SEQUENCENOOPERATIONS_WTOPSQ AS StepNumber, wot.ADDRESSNUMBER_WTAN8 AS AddressNumber, wot.DTFORGLANDVOUCH1_WTDGL AS GlDate, wot.DATEUPDATED_WTUPMJ AS DateUpdated, wot.TIMEOFDAY_WTTDAY AS TimeUpdated FROM JDESTAGE.F31122_VIEW wot",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "WorkOrderTime_Curr",
"matchColumns": ["UniqueID"],
"excludeFromUpdate": ["UniqueID", "LastUpdateDt"]
}
},
"WorkOrderComponent_Curr": {
"source": {
"connection": "jde",
"query": "SELECT woc.UNIQUEKEYIDINTERNAL_WMUKID AS UniqueID, woc.DOCUMENTORDERINVOICEE_WMDOCO AS WorkOrderNumber, TRIM(woc.LOT_WMLOTN) AS LotNumber, TRIM(woc.BRANCHCOMPONENT_WMCMCU) AS BranchCode, woc.COMPONENTITEMNOSHORT_WMCPIT AS ShortItemNumber, woc.QUANTITYTRANSACTION_WMTRQT AS Quantity, woc.DATEUPDATED_WMUPMJ AS DateUpdated, woc.TIMEOFDAY_WMTDAY AS TimeUpdated FROM JDESTAGE.F3111_VIEW woc WHERE TRIM(woc.LOT_WMLOTN) IS NOT NULL AND (woc.DATEUPDATED_WMUPMJ > :dateUpdated OR (woc.DATEUPDATED_WMUPMJ = :dateUpdated AND woc.TIMEOFDAY_WMTDAY >= :timeUpdated))",
"massQuery": "SELECT woc.UNIQUEKEYIDINTERNAL_WMUKID AS UniqueID, woc.DOCUMENTORDERINVOICEE_WMDOCO AS WorkOrderNumber, TRIM(woc.LOT_WMLOTN) AS LotNumber, TRIM(woc.BRANCHCOMPONENT_WMCMCU) AS BranchCode, woc.COMPONENTITEMNOSHORT_WMCPIT AS ShortItemNumber, woc.QUANTITYTRANSACTION_WMTRQT AS Quantity, woc.DATEUPDATED_WMUPMJ AS DateUpdated, woc.TIMEOFDAY_WMTDAY AS TimeUpdated FROM JDESTAGE.F3111_VIEW woc WHERE TRIM(woc.LOT_WMLOTN) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "WorkOrderComponent_Curr",
"matchColumns": ["UniqueID"],
"excludeFromUpdate": ["UniqueID", "LastUpdateDt"]
}
},
"WorkOrderStep_Curr": {
"source": {
"connection": "jde",
"query": "SELECT wos.DOCUMENTORDERINVOICEE_WLDOCO AS WorkOrderNumber, TRIM(wos.COSTCENTERALT_WLMMCU) AS BranchCode, TRIM(wos.COSTCENTER_WLMCU) AS WorkCenterCode, wos.SEQUENCENOOPERATIONS_WLOPSQ AS StepNumber, TRIM(wos.DESCRIPTIONLINE1_WLDSC1) AS StepDescription, TRIM(mes.DESCRIPT80CHARACTERS_CFDS80) AS FunctionOperationDescription, wos.TYPEOPERATIONCODE_WLOPSC AS StepTypeCode, CASE wos.DATESTART_WLSTRT WHEN TO_DATE('1900-01-01', 'yyyy-MM-dd') THEN NULL ELSE wos.DATESTART_WLSTRT END AS StartDT, CASE wos.DATECOMPLETION_WLSTRX WHEN TO_DATE('1900-01-01', 'yyyy-MM-dd') THEN NULL ELSE wos.DATECOMPLETION_WLSTRX END AS EndDT, TRIM(wos.USERRESERVEDREFERENCE_WLURRF) AS FunctionCode, wos.DATEUPDATED_WLUPMJ AS DateUpdated, wos.TIMEOFDAY_WLTDAY AS TimeUpdated FROM JDESTAGE.F3112_VIEW wos LEFT OUTER JOIN JDESTAGE.F00192_VIEW mes ON (wos.USERRESERVEDREFERENCE_WLURRF = mes.USERDEFINEDCODE_CFKY) WHERE (wos.DATEUPDATED_WLUPMJ > :dateUpdated OR (wos.DATEUPDATED_WLUPMJ = :dateUpdated AND wos.TIMEOFDAY_WLTDAY >= :timeUpdated))",
"massQuery": "SELECT wos.DOCUMENTORDERINVOICEE_WLDOCO AS WorkOrderNumber, TRIM(wos.COSTCENTERALT_WLMMCU) AS BranchCode, TRIM(wos.COSTCENTER_WLMCU) AS WorkCenterCode, wos.SEQUENCENOOPERATIONS_WLOPSQ AS StepNumber, TRIM(wos.DESCRIPTIONLINE1_WLDSC1) AS StepDescription, TRIM(mes.DESCRIPT80CHARACTERS_CFDS80) AS FunctionOperationDescription, wos.TYPEOPERATIONCODE_WLOPSC AS StepTypeCode, CASE wos.DATESTART_WLSTRT WHEN TO_DATE('1900-01-01', 'yyyy-MM-dd') THEN NULL ELSE wos.DATESTART_WLSTRT END AS StartDT, CASE wos.DATECOMPLETION_WLSTRX WHEN TO_DATE('1900-01-01', 'yyyy-MM-dd') THEN NULL ELSE wos.DATECOMPLETION_WLSTRX END AS EndDT, TRIM(wos.USERRESERVEDREFERENCE_WLURRF) AS FunctionCode, wos.DATEUPDATED_WLUPMJ AS DateUpdated, wos.TIMEOFDAY_WLTDAY AS TimeUpdated FROM JDESTAGE.F3112_VIEW wos LEFT OUTER JOIN JDESTAGE.F00192_VIEW mes ON (wos.USERRESERVEDREFERENCE_WLURRF = mes.USERDEFINEDCODE_CFKY)",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "WorkOrderStep_Curr",
"matchColumns": ["WorkOrderNumber", "BranchCode", "StepNumber"],
"excludeFromUpdate": ["WorkOrderNumber", "BranchCode", "StepNumber", "LastUpdateDt"]
}
},
"WorkOrderRouting": {
"source": {
"connection": "jde",
"query": "SELECT TRIM(woz.EDIUSERID_SZEDUS) AS UserID, TRIM(woz.EDIBATCHNUMBER_SZEDBT) AS BatchNumber, TRIM(woz.EDITRANSACTNUMBER_SZEDTN) AS TransactionNumber, woz.EDILINENUMBER_SZEDLN AS LineNumber, woz.SEQUENCENOOPERATIONS_SZOPSQ AS StepNumber, TRIM(woz.COSTCENTER_SZMCU) AS WorkCenterCode, woz.DOCUMENTORDERINVOICEE_SZDOCO AS WorkOrderNumber, TRIM(woz.TYPEROUTING_SZTRT) AS RoutingType, TRIM(woz.COSTCENTERALT_SZMMCU) AS BranchCode, TRIM(woz.DESCRIPTIONLINE1_SZDSC1) AS StepDescription, TRIM(woz.USERRESERVEDREFERENCE_SZURRF) AS FunctionCode, woz.DATETRANSACTIONJULIAN_SZTRDJ AS TransactionDate, woz.DATEUPDATED_SZUPMJ AS DateUpdated, woz.TIMEOFDAY_SZTDAY AS TimeUpdated FROM JDESTAGE.F3112Z1_VIEW woz WHERE woz.TYPETRANSACTION_SZTYTN = 'JDERTG' AND woz.DIRECTIONINDICATOR_SZDRIN = '2' AND woz.TRANSACTIONACTION_SZTNAC = '02' AND woz.PROGRAMID_SZPID = 'ER31410' AND (woz.DATEUPDATED_SZUPMJ > :dateUpdated OR (woz.DATEUPDATED_SZUPMJ = :dateUpdated AND woz.TIMEOFDAY_SZTDAY >= :timeUpdated))",
"massQuery": "SELECT TRIM(woz.EDIUSERID_SZEDUS) AS UserID, TRIM(woz.EDIBATCHNUMBER_SZEDBT) AS BatchNumber, TRIM(woz.EDITRANSACTNUMBER_SZEDTN) AS TransactionNumber, woz.EDILINENUMBER_SZEDLN AS LineNumber, woz.SEQUENCENOOPERATIONS_SZOPSQ AS StepNumber, TRIM(woz.COSTCENTER_SZMCU) AS WorkCenterCode, woz.DOCUMENTORDERINVOICEE_SZDOCO AS WorkOrderNumber, TRIM(woz.TYPEROUTING_SZTRT) AS RoutingType, TRIM(woz.COSTCENTERALT_SZMMCU) AS BranchCode, TRIM(woz.DESCRIPTIONLINE1_SZDSC1) AS StepDescription, TRIM(woz.USERRESERVEDREFERENCE_SZURRF) AS FunctionCode, woz.DATETRANSACTIONJULIAN_SZTRDJ AS TransactionDate, woz.DATEUPDATED_SZUPMJ AS DateUpdated, woz.TIMEOFDAY_SZTDAY AS TimeUpdated FROM JDESTAGE.F3112Z1_VIEW woz WHERE woz.TYPETRANSACTION_SZTYTN = 'JDERTG' AND woz.DIRECTIONINDICATOR_SZDRIN = '2' AND woz.TRANSACTIONACTION_SZTNAC = '02' AND woz.PROGRAMID_SZPID = 'ER31410'",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "WorkOrderRouting",
"matchColumns": ["UserID", "BatchNumber", "TransactionNumber", "LineNumber"],
"excludeFromUpdate": ["UserID", "BatchNumber", "TransactionNumber", "LineNumber", "LastUpdateDt"]
}
},
"StatusCode": {
"source": {
"connection": "giw",
"query": "SELECT TRIM(sc.USERDEFINEDCODE_DRKY) AS Code, TRIM(sc.DESCRIPTION001_DRDL01) AS Description, sc.DATEUPDATED_DRUPMJ AS DateUpdated, sc.TIMELASTUPDATED_DRUPMT AS TimeUpdated FROM JDESTAGE.F0005_VIEW sc WHERE TRIM(sc.PRODUCTCODE_DRSY) = '00' AND sc.USERDEFINEDCODES_DRRT = 'SS' AND TRIM(sc.USERDEFINEDCODE_DRKY) IS NOT NULL AND (sc.DATEUPDATED_DRUPMJ > :dateUpdated OR (sc.DATEUPDATED_DRUPMJ = :dateUpdated AND sc.TIMELASTUPDATED_DRUPMT >= :timeUpdated))",
"massQuery": "SELECT TRIM(sc.USERDEFINEDCODE_DRKY) AS Code, TRIM(sc.DESCRIPTION001_DRDL01) AS Description, sc.DATEUPDATED_DRUPMJ AS DateUpdated, sc.TIMELASTUPDATED_DRUPMT AS TimeUpdated FROM JDESTAGE.F0005_VIEW sc WHERE TRIM(sc.PRODUCTCODE_DRSY) = '00' AND sc.USERDEFINEDCODES_DRRT = 'SS' AND TRIM(sc.USERDEFINEDCODE_DRKY) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "StatusCode",
"matchColumns": ["Code"],
"excludeFromUpdate": ["Code", "LastUpdateDt"]
}
},
"OrgHierarchy": {
"source": {
"connection": "jde",
"query": "SELECT TRIM(oh.DISPATCHGROUP_IWMCUW) AS ProfitCenterCode, TRIM(oh.COSTCENTER_IWMCU) AS WorkCenterCode, TRIM(oh.COSTCENTERALT_IWMMCU) AS BranchCode, oh.DATEUPDATED_IWUPMJ AS DateUpdated, oh.TIMEOFDAY_IWTDAY AS TimeUpdated FROM JDESTAGE.F30006_VIEW oh WHERE TRIM(oh.COSTCENTER_IWMCU) IS NOT NULL AND TRIM(oh.COSTCENTERALT_IWMMCU) IS NOT NULL AND (oh.DATEUPDATED_IWUPMJ > :dateUpdated OR (oh.DATEUPDATED_IWUPMJ = :dateUpdated AND oh.TIMEOFDAY_IWTDAY >= :timeUpdated))",
"massQuery": "SELECT TRIM(oh.DISPATCHGROUP_IWMCUW) AS ProfitCenterCode, TRIM(oh.COSTCENTER_IWMCU) AS WorkCenterCode, TRIM(oh.COSTCENTERALT_IWMMCU) AS BranchCode, oh.DATEUPDATED_IWUPMJ AS DateUpdated, oh.TIMEOFDAY_IWTDAY AS TimeUpdated FROM JDESTAGE.F30006_VIEW oh WHERE TRIM(oh.COSTCENTER_IWMCU) IS NOT NULL AND TRIM(oh.COSTCENTERALT_IWMMCU) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "OrgHierarchy",
"matchColumns": ["WorkCenterCode", "BranchCode"],
"excludeFromUpdate": ["WorkCenterCode", "BranchCode", "LastUpdateDt"]
}
},
"RouteMaster": {
"source": {
"connection": "jde",
"query": "SELECT TRIM(route_master.COSTCENTERALT_IRMMCU) AS BranchCode, TRIM(route_master.ITEMNUMBER2NDKIT_IRKITL) AS ItemNumber, TRIM(route_master.TYPEROUTING_IRTRT) AS RoutingType, route_master.SEQUENCENOOPERATIONS_IROPSQ AS SequenceNumber, TRIM(route_master.USERRESERVEDREFERENCE_IRURRF) AS FunctionCode, TRIM(route_master.COSTCENTER_IRMCU) AS WorkCenterCode, route_master.EFFECTIVEFROMDATE_IREFFF AS StartDate, route_master.EFFECTIVETHRUDATE_IREFFT AS EndDate, route_master.DATEUPDATED_IRUPMJ AS DateUpdated, route_master.TIMEOFDAY_IRTDAY AS TimeUpdated FROM JDESTAGE.F3003_VIEW route_master WHERE TRIM(route_master.ITEMNUMBER2NDKIT_IRKITL) IS NOT NULL AND (route_master.DATEUPDATED_IRUPMJ > :dateUpdated OR (route_master.DATEUPDATED_IRUPMJ = :dateUpdated AND route_master.TIMEOFDAY_IRTDAY >= :timeUpdated))",
"massQuery": "SELECT TRIM(route_master.COSTCENTERALT_IRMMCU) AS BranchCode, TRIM(route_master.ITEMNUMBER2NDKIT_IRKITL) AS ItemNumber, TRIM(route_master.TYPEROUTING_IRTRT) AS RoutingType, route_master.SEQUENCENOOPERATIONS_IROPSQ AS SequenceNumber, TRIM(route_master.USERRESERVEDREFERENCE_IRURRF) AS FunctionCode, TRIM(route_master.COSTCENTER_IRMCU) AS WorkCenterCode, route_master.EFFECTIVEFROMDATE_IREFFF AS StartDate, route_master.EFFECTIVETHRUDATE_IREFFT AS EndDate, route_master.DATEUPDATED_IRUPMJ AS DateUpdated, route_master.TIMEOFDAY_IRTDAY AS TimeUpdated FROM JDESTAGE.F3003_VIEW route_master WHERE TRIM(route_master.ITEMNUMBER2NDKIT_IRKITL) IS NOT NULL",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "RouteMaster",
"matchColumns": ["BranchCode", "ItemNumber", "RoutingType", "SequenceNumber"],
"excludeFromUpdate": ["BranchCode", "ItemNumber", "RoutingType", "SequenceNumber", "LastUpdateDt"]
}
},
"FunctionCode": {
"source": {
"connection": "jde",
"query": "SELECT Code, TRIM(LISTAGG(Description, ' ') WITHIN GROUP(ORDER BY Description) || CASE WHEN MAX(total_lengthb) > 4000 THEN '...' ELSE '' END) Description, SYSDATE AS LastUpdateDT FROM (SELECT TRIM(fc.CFKY) AS Code, TRIM(ASCIISTR(fc.CFDS80)) AS Description, SUM(LENGTHB(TRIM(fc.CFDS80))+1) OVER(PARTITION BY TRIM(fc.CFKY) ORDER BY TRIM(fc.CFDS80)) - 1 cumul_lengthb, SUM(LENGTHB(TRIM(fc.CFDS80))+1) OVER(PARTITION BY TRIM(fc.CFKY)) - 1 total_lengthb, COUNT(*) OVER(PARTITION BY TRIM(fc.CFKY)) num_values FROM PRODDTA.F00192 fc WHERE TRIM(fc.CFKY) IS NOT NULL) WHERE total_lengthb <= 4000 OR cumul_lengthb <= 4000 - length('...') GROUP BY Code",
"massQuery": "SELECT Code, TRIM(LISTAGG(Description, ' ') WITHIN GROUP(ORDER BY Description) || CASE WHEN MAX(total_lengthb) > 4000 THEN '...' ELSE '' END) Description, SYSDATE AS LastUpdateDT FROM (SELECT TRIM(fc.CFKY) AS Code, TRIM(ASCIISTR(fc.CFDS80)) AS Description, SUM(LENGTHB(TRIM(fc.CFDS80))+1) OVER(PARTITION BY TRIM(fc.CFKY) ORDER BY TRIM(fc.CFDS80)) - 1 cumul_lengthb, SUM(LENGTHB(TRIM(fc.CFDS80))+1) OVER(PARTITION BY TRIM(fc.CFKY)) - 1 total_lengthb, COUNT(*) OVER(PARTITION BY TRIM(fc.CFKY)) num_values FROM PRODDTA.F00192 fc WHERE TRIM(fc.CFKY) IS NOT NULL) WHERE total_lengthb <= 4000 OR cumul_lengthb <= 4000 - length('...') GROUP BY Code",
"parameters": {}
},
"schedules": {
"mass": { "prePurge": true, "reIndex": true },
"daily": { "prePurge": true, "reIndex": true },
"hourly": { "prePurge": true, "reIndex": true }
},
"destination": {
"table": "FunctionCode",
"matchColumns": ["Code"],
"excludeFromUpdate": ["Code", "LastUpdateDt"]
}
}
}
}
@@ -0,0 +1,260 @@
using System.Text.Json;
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Destinations;
using JdeScoping.DataSync.Etl.Pipeline;
using JdeScoping.DataSync.Etl.Scripts;
using JdeScoping.DataSync.Etl.Sources;
using JdeScoping.DataSync.Etl.Transformers;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Builds executable ETL pipelines from pipeline configuration.
/// </summary>
/// <remarks>
/// Mass syncs read through the source's mass query (when one is configured), load through a
/// bulk-import destination, and are wrapped in a TRUNCATE pre-script and an index-rebuild
/// post-script. Every other update type runs the regular query with bound parameters and
/// loads through a bulk-merge destination.
/// </remarks>
public class EtlPipelineBuilderService : IEtlPipelineBuilder
{
    private const string DefaultTimezone = "UTC";

    // Maximum number of script characters echoed into a script runner's display name.
    private const int ScriptNamePreviewLength = 30;

    private readonly IDbConnectionFactory _connectionFactory;
    private readonly ILogger<EtlPipeline> _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="EtlPipelineBuilderService"/> class.
    /// </summary>
    /// <param name="connectionFactory">Factory for creating database connections.</param>
    /// <param name="logger">Logger for pipeline execution.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public EtlPipelineBuilderService(
        IDbConnectionFactory connectionFactory,
        ILogger<EtlPipeline> logger)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A non-mass sync was requested for a destination without match columns, or a static
    /// parameter has no value.
    /// </exception>
    /// <exception cref="NotSupportedException">A parameter declares an unsupported source.</exception>
    public EtlPipeline Build(EtlPipelineConfig config, UpdateTypes updateType, DateTime? minimumDt)
    {
        ArgumentNullException.ThrowIfNull(config);

        var isMassSync = updateType == UpdateTypes.Mass;

        // The mass query is only used for Mass updates, and only when one is configured.
        var useMassQuery = isMassSync && !string.IsNullOrEmpty(config.Source.MassQuery);

        var source = CreateSource(config.Source, minimumDt, useMassQuery);

        // Mass = bulk import (TRUNCATE + INSERT), Daily/Hourly = bulk merge (UPSERT).
        var destination = isMassSync
            ? CreateBulkImportDestination(config.Destination)
            : CreateBulkMergeDestination(config.Destination);

        var builder = new EtlPipelineBuilder()
            .WithName(config.Name)
            .WithSource(source)
            .WithDestination(destination)
            .WithLogger(_logger);

        foreach (var transform in config.Transforms)
        {
            var transformer = CreateTransformer(transform);
            if (transformer is null)
            {
                // Skipping is deliberate (best effort), but it must not be invisible:
                // a typo in the transform type would otherwise silently change the output.
                _logger.LogWarning(
                    "Pipeline '{Pipeline}' skipped transformer '{TransformType}': unknown type or incomplete configuration.",
                    config.Name,
                    transform.TransformType);
                continue;
            }

            builder.WithTransformer(transformer);
        }

        // Pre-scripts: configured scripts first, then the TRUNCATE for mass syncs.
        foreach (var script in config.PreScripts)
        {
            builder.WithPreScript(CreateScriptRunner(script));
        }

        var quotedTable = QuoteIdentifier(config.Destination.Table);

        if (isMassSync)
        {
            builder.WithPreScript(
                new SqlScriptRunner(_connectionFactory, $"TRUNCATE TABLE {quotedTable}", "PrePurge"));

            // Post-scripts: the index rebuild for mass syncs runs before configured scripts.
            builder.WithPostScript(
                new SqlScriptRunner(_connectionFactory, $"ALTER INDEX ALL ON {quotedTable} REBUILD", "ReIndex"));
        }

        foreach (var script in config.PostScripts)
        {
            builder.WithPostScript(CreateScriptRunner(script));
        }

        return builder.Build();
    }

    /// <summary>
    /// Wraps an identifier in square brackets, doubling any closing bracket so the
    /// configured table name cannot terminate the delimited identifier early.
    /// </summary>
    private static string QuoteIdentifier(string identifier)
    {
        return "[" + identifier.Replace("]", "]]", StringComparison.Ordinal) + "]";
    }

    /// <summary>
    /// Creates the query source, selecting the mass or regular query and binding parameters.
    /// </summary>
    /// <remarks>
    /// No parameters are bound when the mass query is used. For the regular query, static
    /// parameters are always bound; offset parameters are bound only when
    /// <paramref name="minDt"/> has a value.
    /// </remarks>
    private IImportSource CreateSource(SourceElement sourceConfig, DateTime? minDt, bool useMassQuery)
    {
        var query = useMassQuery ? sourceConfig.MassQuery! : sourceConfig.Query;
        var parameters = new Dictionary<string, object>();
        var converter = new ParameterFormatConverter(DefaultTimezone);

        // Mass queries typically don't need date parameters, so nothing is bound for them.
        if (!useMassQuery)
        {
            foreach (var (_, paramConfig) in sourceConfig.Parameters)
            {
                object paramValue;

                if (string.Equals(paramConfig.Source, "offset", StringComparison.OrdinalIgnoreCase))
                {
                    // Offset parameters are derived from the minimum date; without one
                    // there is nothing to convert, so the parameter is left unbound.
                    if (!minDt.HasValue)
                    {
                        continue;
                    }

                    paramValue = converter.Convert(minDt.Value, paramConfig.Format);
                }
                else if (string.Equals(paramConfig.Source, "static", StringComparison.OrdinalIgnoreCase))
                {
                    // Static values do not depend on the minimum date and are bound regardless.
                    paramValue = paramConfig.Value
                        ?? throw new InvalidOperationException(
                            $"Static parameter '{paramConfig.Name}' requires a value.");
                }
                else
                {
                    throw new NotSupportedException(
                        $"Parameter source '{paramConfig.Source}' is not supported.");
                }

                // Use the parameter name exactly as configured (provider-specific).
                parameters[paramConfig.Name] = paramValue;
            }
        }

        return new DbQuerySource(
            _connectionFactory,
            sourceConfig.Connection,
            query,
            parameters);
    }

    /// <summary>Creates the bulk-import destination used by mass syncs.</summary>
    private IImportDestination CreateBulkImportDestination(DestinationElement destConfig)
    {
        return new DbBulkImportDestination(_connectionFactory, destConfig.Table);
    }

    /// <summary>Creates the bulk-merge destination used by incremental syncs.</summary>
    /// <exception cref="InvalidOperationException">No match columns are configured.</exception>
    private IImportDestination CreateBulkMergeDestination(DestinationElement destConfig)
    {
        if (destConfig.MatchColumns.Count == 0)
        {
            throw new InvalidOperationException(
                $"matchColumns required for incremental sync on table '{destConfig.Table}'.");
        }

        // An empty exclusion list is passed as null rather than as an empty array.
        var excludeFromUpdate = destConfig.ExcludeFromUpdate.Count > 0
            ? destConfig.ExcludeFromUpdate.ToArray()
            : null;

        return new DbBulkMergeDestination(
            _connectionFactory,
            destConfig.Table,
            destConfig.MatchColumns.ToArray(),
            updateColumns: null,
            excludeFromUpdate: excludeFromUpdate,
            updateCondition: null);
    }

    /// <summary>
    /// Creates a runner for a configured script, named after the first characters of its text.
    /// </summary>
    private IScriptRunner CreateScriptRunner(ScriptElement script)
    {
        var previewLength = Math.Min(ScriptNamePreviewLength, script.Script.Length);
        var name = $"Script:{script.Script[..previewLength]}";
        return new SqlScriptRunner(_connectionFactory, script.Script, name);
    }

    /// <summary>
    /// Maps a transform element to its transformer.
    /// </summary>
    /// <returns>
    /// The transformer, or null when the type is unknown or its configuration is incomplete.
    /// </returns>
    private static IDataTransformer? CreateTransformer(TransformElement transform)
    {
        var type = transform.TransformType.ToLowerInvariant();
        return type switch
        {
            "columndrop" => CreateColumnDropTransformer(transform),
            "columnrename" => CreateColumnRenameTransformer(transform),
            "jdedate" => CreateJdeDateTransformer(transform),
            "regex" => CreateRegexTransformer(transform),
            _ => null // Unknown transformer types are skipped by the caller.
        };
    }

    /// <summary>
    /// Reads a string property from a transform's JSON configuration.
    /// </summary>
    /// <returns>The string value, or null when the property is absent or JSON null.</returns>
    private static string? ReadString(JsonElement config, string propertyName)
    {
        return config.TryGetProperty(propertyName, out var element)
            ? element.GetString()
            : null;
    }

    /// <summary>Builds a column-drop transformer from the "columns" array.</summary>
    private static IDataTransformer? CreateColumnDropTransformer(TransformElement transform)
    {
        if (transform.Config == null)
        {
            return null;
        }

        if (!transform.Config.Value.TryGetProperty("columns", out var columnsElement))
        {
            return null;
        }

        // JSON null (or empty) entries carry no column name; drop them instead of
        // handing null strings to the transformer.
        var columns = columnsElement.EnumerateArray()
            .Select(element => element.GetString())
            .Where(columnName => !string.IsNullOrEmpty(columnName))
            .Select(columnName => columnName!)
            .ToArray();

        return columns.Length == 0 ? null : new ColumnDropTransformer(columns);
    }

    /// <summary>Builds a column-rename transformer from the "mappings" object.</summary>
    private static IDataTransformer? CreateColumnRenameTransformer(TransformElement transform)
    {
        if (transform.Config == null)
        {
            return null;
        }

        var renames = new List<(string OldName, string NewName)>();
        if (transform.Config.Value.TryGetProperty("mappings", out var mappingsElement))
        {
            foreach (var mapping in mappingsElement.EnumerateObject())
            {
                // A JSON null target keeps the original column name.
                renames.Add((mapping.Name, mapping.Value.GetString() ?? mapping.Name));
            }
        }

        return renames.Count == 0 ? null : new ColumnRenameTransformer(renames.ToArray());
    }

    /// <summary>
    /// Builds a JDE date transformer; all of "dateColumn", "timeColumn" and "outputColumn"
    /// must be present and non-empty.
    /// </summary>
    private static IDataTransformer? CreateJdeDateTransformer(TransformElement transform)
    {
        if (transform.Config == null)
        {
            return null;
        }

        var settings = transform.Config.Value;
        var dateColumn = ReadString(settings, "dateColumn");
        var timeColumn = ReadString(settings, "timeColumn");
        var outputColumn = ReadString(settings, "outputColumn");

        if (string.IsNullOrEmpty(dateColumn) || string.IsNullOrEmpty(timeColumn) || string.IsNullOrEmpty(outputColumn))
        {
            return null;
        }

        return new JdeDateTransformer(dateColumn, timeColumn, outputColumn);
    }

    /// <summary>
    /// Builds a regex transformer; "column" and "pattern" are required, while
    /// "replacement" defaults to the empty string.
    /// </summary>
    private static IDataTransformer? CreateRegexTransformer(TransformElement transform)
    {
        if (transform.Config == null)
        {
            return null;
        }

        var settings = transform.Config.Value;
        var column = ReadString(settings, "column");
        var pattern = ReadString(settings, "pattern");
        var replacement = ReadString(settings, "replacement") ?? string.Empty;

        if (string.IsNullOrEmpty(column) || string.IsNullOrEmpty(pattern))
        {
            return null;
        }

        return new RegexTransformer(column, pattern, replacement);
    }
}
@@ -1,370 +0,0 @@
using System.Text.Json;
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Destinations;
using JdeScoping.DataSync.Etl.Pipeline;
using JdeScoping.DataSync.Etl.Scripts;
using JdeScoping.DataSync.Etl.Sources;
using JdeScoping.DataSync.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Factory for creating ETL pipelines from JSON configuration.
/// </summary>
/// <remarks>
/// The configuration is loaded and validated once, in the constructor. Each call to
/// <see cref="ForTable"/> returns a fresh builder bound to that table's pipeline definition.
/// </remarks>
public class EtlPipelineFactory : IEtlPipelineFactory
{
    private static readonly JsonSerializerOptions JsonOptions = new()
    {
        PropertyNameCaseInsensitive = true,
        ReadCommentHandling = JsonCommentHandling.Skip,
        AllowTrailingCommas = true
    };

    private readonly IDbConnectionFactory _connectionFactory;
    private readonly ILogger<EtlPipeline> _logger;
    private readonly PipelinesRoot _config;

    /// <summary>
    /// Creates a new pipeline factory.
    /// </summary>
    /// <param name="connectionFactory">Factory for creating database connections.</param>
    /// <param name="options">Pipeline configuration options.</param>
    /// <param name="logger">Logger for pipeline execution.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    /// <exception cref="FileNotFoundException">The configured file does not exist.</exception>
    /// <exception cref="InvalidOperationException">The configuration is empty or invalid.</exception>
    public EtlPipelineFactory(
        IDbConnectionFactory connectionFactory,
        IOptions<PipelineOptions> options,
        ILogger<EtlPipeline> logger)
    {
        ArgumentNullException.ThrowIfNull(connectionFactory);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(logger);

        _connectionFactory = connectionFactory;
        _logger = logger;
        _config = LoadPipelineConfigs(options.Value.ConfigPath);
    }

    /// <summary>
    /// Creates a new pipeline factory with a pre-loaded configuration (for testing).
    /// </summary>
    /// <param name="connectionFactory">Factory for creating database connections.</param>
    /// <param name="config">Pre-loaded pipeline configuration.</param>
    /// <param name="logger">Logger for pipeline execution.</param>
    internal EtlPipelineFactory(
        IDbConnectionFactory connectionFactory,
        PipelinesRoot config,
        ILogger<EtlPipeline> logger)
    {
        ArgumentNullException.ThrowIfNull(connectionFactory);
        ArgumentNullException.ThrowIfNull(config);
        ArgumentNullException.ThrowIfNull(logger);

        ValidateConfig(config);

        _connectionFactory = connectionFactory;
        _logger = logger;
        _config = config;
    }

    /// <inheritdoc />
    public IEtlPipelineBuilder ForTable(string tableName)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tableName);

        if (!_config.Pipelines.TryGetValue(tableName, out var pipelineConfig))
        {
            throw new InvalidOperationException(
                $"No pipeline configured for table: {tableName}. " +
                $"Available tables: {string.Join(", ", _config.Pipelines.Keys)}");
        }

        return new PipelineBuilder(
            _connectionFactory,
            tableName,
            pipelineConfig,
            _config.EffectiveSettings,
            _config.EffectiveScheduleDefaults,
            _logger);
    }

    /// <inheritdoc />
    public IReadOnlyList<string> GetAvailableTables()
    {
        return _config.Pipelines.Keys.ToList().AsReadOnly();
    }

    /// <inheritdoc />
    public PipelineConfig? GetPipelineConfig(string tableName)
    {
        return _config.Pipelines.TryGetValue(tableName, out var config) ? config : null;
    }

    /// <inheritdoc />
    public ScheduleDefaults GetScheduleDefaults()
    {
        return _config.EffectiveScheduleDefaults;
    }

    /// <summary>
    /// Reads, deserializes and validates the pipeline configuration file.
    /// </summary>
    /// <param name="configPath">Path to the file, resolved against the application directory.</param>
    private static PipelinesRoot LoadPipelineConfigs(string configPath)
    {
        // Resolve relative to the assembly location (handles both debug and publish).
        // Assembly.Location is empty for assemblies bundled into a single-file app, so
        // fall back to the application base directory in that case.
        var assemblyLocation = typeof(EtlPipelineFactory).Assembly.Location;
        var baseDirectory = string.IsNullOrEmpty(assemblyLocation)
            ? AppContext.BaseDirectory
            : Path.GetDirectoryName(assemblyLocation) ?? AppContext.BaseDirectory;

        var fullPath = Path.Combine(baseDirectory, configPath);
        if (!File.Exists(fullPath))
        {
            throw new FileNotFoundException(
                $"Pipeline config not found: {fullPath}. " +
                "Ensure the config file is included in the build output.");
        }

        var json = File.ReadAllText(fullPath);
        var root = JsonSerializer.Deserialize<PipelinesRoot>(json, JsonOptions)
            ?? throw new InvalidOperationException("Failed to deserialize pipeline config: result was null.");

        ValidateConfig(root);
        return root;
    }

    /// <summary>
    /// Rejects configurations that cannot be turned into pipelines.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The pipeline map, or a pipeline's source, destination or schedules, is missing.
    /// </exception>
    /// <exception cref="NotSupportedException">A parameter uses the runtime source.</exception>
    private static void ValidateConfig(PipelinesRoot root)
    {
        // Deserialization leaves members null when the JSON omits them, regardless of
        // the non-nullable annotations on the records.
        if (root.Pipelines is null)
        {
            throw new InvalidOperationException("Pipeline config must define 'pipelines'.");
        }

        foreach (var (name, config) in root.Pipelines)
        {
            if (config?.Source is null)
            {
                throw new InvalidOperationException(
                    $"Pipeline '{name}' must define 'source'.");
            }

            if (config.Destination is null)
            {
                throw new InvalidOperationException(
                    $"Pipeline '{name}' must define 'destination'.");
            }

            // Schedules are now required.
            if (config.Schedules == null)
            {
                throw new InvalidOperationException(
                    $"Pipeline '{name}' must define 'schedules'.");
            }

            if (config.Source.Parameters is null)
            {
                continue;
            }

            // Runtime parameters are not yet supported.
            foreach (var (paramName, paramConfig) in config.Source.Parameters)
            {
                if (string.Equals(paramConfig.Source, "runtime", StringComparison.OrdinalIgnoreCase))
                {
                    throw new NotSupportedException(
                        $"Pipeline '{name}' parameter '{paramName}': " +
                        "runtime parameter source is not yet supported.");
                }
            }
        }
    }

    /// <summary>
    /// Fluent builder that assembles the pipeline for a single configured table.
    /// </summary>
    private sealed class PipelineBuilder : IEtlPipelineBuilder
    {
        // Maximum number of script characters echoed into a script runner's display name.
        private const int ScriptNamePreviewLength = 30;

        private readonly IDbConnectionFactory _connectionFactory;
        private readonly string _tableName;
        private readonly PipelineConfig _config;
        private readonly PipelineSettings _settings;
        private readonly ScheduleDefaults _scheduleDefaults;
        private readonly ILogger<EtlPipeline> _logger;

        private UpdateTypes _updateType = UpdateTypes.Hourly;
        private DateTime? _minDtOverride;

        /// <summary>
        /// Initializes a new instance of the PipelineBuilder class.
        /// </summary>
        /// <param name="connectionFactory">Factory for creating database connections.</param>
        /// <param name="tableName">The name of the table for this pipeline.</param>
        /// <param name="config">The pipeline configuration.</param>
        /// <param name="settings">Global pipeline settings.</param>
        /// <param name="scheduleDefaults">Default schedule configuration.</param>
        /// <param name="logger">Logger for pipeline execution.</param>
        public PipelineBuilder(
            IDbConnectionFactory connectionFactory,
            string tableName,
            PipelineConfig config,
            PipelineSettings settings,
            ScheduleDefaults scheduleDefaults,
            ILogger<EtlPipeline> logger)
        {
            _connectionFactory = connectionFactory;
            _tableName = tableName;
            _config = config;
            _settings = settings;
            _scheduleDefaults = scheduleDefaults;
            _logger = logger;
        }

        /// <summary>
        /// Specifies the update type for this pipeline.
        /// </summary>
        /// <param name="updateType">The type of update (Mass, Daily, or Hourly).</param>
        /// <returns>The builder for fluent configuration.</returns>
        public IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType)
        {
            _updateType = updateType;
            return this;
        }

        /// <summary>
        /// Specifies the minimum date for incremental data extraction.
        /// </summary>
        /// <param name="minDt">The minimum date, or null for no filter.</param>
        /// <returns>The builder for fluent configuration.</returns>
        public IEtlPipelineBuilder WithMinimumDate(DateTime? minDt)
        {
            _minDtOverride = minDt;
            return this;
        }

        /// <summary>
        /// Builds and returns the configured ETL pipeline.
        /// </summary>
        /// <returns>A configured ETL pipeline ready for execution.</returns>
        public EtlPipeline Build()
        {
            var scheduleConfig = GetEffectiveScheduleConfig(_updateType);

            // The mass query is only used for Mass updates, and only when one is configured.
            var useMassQuery = _updateType == UpdateTypes.Mass
                && !string.IsNullOrEmpty(_config.Source.MassQuery);

            var source = CreateSource(_config.Source, _minDtOverride, useMassQuery);
            var destination = CreateDestination(_config.Destination, scheduleConfig);

            var builder = new EtlPipelineBuilder()
                .WithName(_tableName)
                .WithSource(source)
                .WithDestination(destination)
                .WithLogger(_logger);

            var quotedTable = QuoteIdentifier(_config.Destination.Table);

            // Pre-scripts: configured scripts first, then the pre-purge.
            foreach (var script in _config.PreScripts ?? [])
            {
                builder.WithPreScript(CreateScriptRunner("PreScript", script));
            }

            if (scheduleConfig.PrePurge)
            {
                builder.WithPreScript(
                    new SqlScriptRunner(_connectionFactory, $"TRUNCATE TABLE {quotedTable}", "PrePurge"));
            }

            // Post-scripts: the re-index first, then configured scripts.
            if (scheduleConfig.ReIndex)
            {
                builder.WithPostScript(
                    new SqlScriptRunner(_connectionFactory, $"ALTER INDEX ALL ON {quotedTable} REBUILD", "ReIndex"));
            }

            foreach (var script in _config.PostScripts ?? [])
            {
                builder.WithPostScript(CreateScriptRunner("PostScript", script));
            }

            return builder.Build();
        }

        /// <summary>
        /// Wraps an identifier in square brackets, doubling any closing bracket so the
        /// configured table name cannot terminate the delimited identifier early.
        /// </summary>
        private static string QuoteIdentifier(string identifier)
        {
            return "[" + identifier.Replace("]", "]]", StringComparison.Ordinal) + "]";
        }

        /// <summary>
        /// Creates a runner for a configured script, named "<paramref name="prefix"/>:" plus
        /// the first characters of the script text.
        /// </summary>
        private SqlScriptRunner CreateScriptRunner(string prefix, string script)
        {
            var previewLength = Math.Min(ScriptNamePreviewLength, script.Length);
            return new SqlScriptRunner(_connectionFactory, script, $"{prefix}:{script[..previewLength]}");
        }

        /// <summary>
        /// Resolves the schedule settings for an update type: the pipeline's own entry
        /// merged over the global default, or the default alone when there is no entry.
        /// </summary>
        private Configuration.ScheduleConfig GetEffectiveScheduleConfig(UpdateTypes updateType)
        {
            // Unrecognized update types fall back to the hourly default.
            var defaultConfig = updateType switch
            {
                UpdateTypes.Mass => _scheduleDefaults.Mass,
                UpdateTypes.Daily => _scheduleDefaults.Daily,
                UpdateTypes.Hourly => _scheduleDefaults.Hourly,
                _ => _scheduleDefaults.Hourly
            };

            var pipelineConfig = updateType switch
            {
                UpdateTypes.Mass => _config.Schedules?.Mass,
                UpdateTypes.Daily => _config.Schedules?.Daily,
                UpdateTypes.Hourly => _config.Schedules?.Hourly,
                _ => null
            };

            // Pipeline config overrides defaults.
            return pipelineConfig?.MergeWith(defaultConfig) ?? defaultConfig;
        }

        /// <summary>
        /// Creates the query source, selecting the mass or regular query and binding parameters.
        /// </summary>
        /// <remarks>
        /// No parameters are bound when the mass query is used. For the regular query, static
        /// parameters are always bound; offset parameters are bound only when
        /// <paramref name="minDt"/> has a value.
        /// </remarks>
        private IImportSource CreateSource(SourceConfig sourceConfig, DateTime? minDt, bool useMassQuery)
        {
            var query = useMassQuery ? sourceConfig.MassQuery! : sourceConfig.Query;
            var parameters = new Dictionary<string, object>();
            var converter = new ParameterFormatConverter(_settings.Timezone);

            // Mass queries typically don't need date parameters, so nothing is bound for them.
            if (!useMassQuery && sourceConfig.Parameters is { } configuredParameters)
            {
                foreach (var paramConfig in configuredParameters.Values)
                {
                    object paramValue;

                    if (string.Equals(paramConfig.Source, "offset", StringComparison.OrdinalIgnoreCase))
                    {
                        // Offset parameters are derived from the minimum date; without one
                        // there is nothing to convert, so the parameter is left unbound.
                        if (!minDt.HasValue)
                        {
                            continue;
                        }

                        paramValue = converter.Convert(minDt.Value, paramConfig.Format);
                    }
                    else if (string.Equals(paramConfig.Source, "static", StringComparison.OrdinalIgnoreCase))
                    {
                        // Static values do not depend on the minimum date and are bound regardless.
                        paramValue = paramConfig.Value
                            ?? throw new InvalidOperationException(
                                $"Static parameter '{paramConfig.Name}' requires a value.");
                    }
                    else
                    {
                        throw new NotSupportedException(
                            $"Parameter source '{paramConfig.Source}' is not supported.");
                    }

                    // Use the parameter name exactly as configured (provider-specific).
                    parameters[paramConfig.Name] = paramValue;
                }
            }

            return new DbQuerySource(
                _connectionFactory,
                sourceConfig.Connection,
                query,
                parameters);
        }

        /// <summary>
        /// Creates the destination: bulk import when the schedule pre-purges the table,
        /// bulk merge otherwise.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// A bulk merge is required but no match columns are configured.
        /// </exception>
        private IImportDestination CreateDestination(
            DestinationConfig baseConfig,
            Configuration.ScheduleConfig scheduleConfig)
        {
            var tableName = baseConfig.Table;

            if (scheduleConfig.PrePurge)
            {
                return new DbBulkImportDestination(_connectionFactory, tableName);
            }

            // A merge cannot match rows without at least one key column.
            if (baseConfig.MatchColumns is not { Count: > 0 } matchColumns)
            {
                throw new InvalidOperationException(
                    $"matchColumns required for bulkMerge destination on table '{tableName}'.");
            }

            return new DbBulkMergeDestination(
                _connectionFactory,
                tableName,
                matchColumns.ToArray(),
                updateColumns: null,
                excludeFromUpdate: baseConfig.ExcludeFromUpdate?.ToArray(),
                updateCondition: scheduleConfig.UpdateWhen);
        }
    }
}
@@ -0,0 +1,20 @@
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Etl.Pipeline;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Builds executable ETL pipelines from pipeline configuration.
/// </summary>
public interface IEtlPipelineBuilder
{
/// <summary>
/// Builds an executable ETL pipeline from the given configuration.
/// </summary>
/// <param name="config">The pipeline configuration describing source, destination, transforms and scripts.</param>
/// <param name="updateType">The update type (Mass, Daily, or Hourly) the pipeline is being built for.</param>
/// <param name="minimumDt">
/// The minimum date for incremental syncs, or null for mass syncs.
/// </param>
/// <returns>A configured ETL pipeline ready for execution.</returns>
EtlPipeline Build(EtlPipelineConfig config, UpdateTypes updateType, DateTime? minimumDt);
}
@@ -1,8 +1,7 @@
using System.Diagnostics;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Interfaces;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.Logging;
@@ -15,7 +14,7 @@ namespace JdeScoping.DataSync.Services;
/// </summary>
public class TableSyncOperation : ITableSyncOperation
{
private readonly IEtlPipelineFactory _pipelineFactory;
private readonly IEtlPipelineBuilder _pipelineBuilder;
private readonly IDataUpdateRepository _updateRepository;
private readonly IOptions<DataSyncOptions> _options;
private readonly ILogger<TableSyncOperation> _logger;
@@ -24,19 +23,19 @@ public class TableSyncOperation : ITableSyncOperation
/// <summary>
/// Initializes a new instance of the <see cref="TableSyncOperation"/> class.
/// </summary>
/// <param name="pipelineFactory">Factory for creating ETL pipelines.</param>
/// <param name="pipelineBuilder">Builder for creating ETL pipelines.</param>
/// <param name="updateRepository">Repository for managing data update records.</param>
/// <param name="options">Data sync configuration options.</param>
/// <param name="logger">Logger for operation events.</param>
/// <param name="metrics">Metrics collector for operation tracking.</param>
public TableSyncOperation(
IEtlPipelineFactory pipelineFactory,
IEtlPipelineBuilder pipelineBuilder,
IDataUpdateRepository updateRepository,
IOptions<DataSyncOptions> options,
ILogger<TableSyncOperation> logger,
DataSyncMetrics metrics)
{
_pipelineFactory = pipelineFactory ?? throw new ArgumentNullException(nameof(pipelineFactory));
_pipelineBuilder = pipelineBuilder ?? throw new ArgumentNullException(nameof(pipelineBuilder));
_updateRepository = updateRepository ?? throw new ArgumentNullException(nameof(updateRepository));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
@@ -162,12 +161,16 @@ public class TableSyncOperation : ITableSyncOperation
{
_logger.LogDebug("Building pipeline for {Table} with UpdateType={UpdateType}", task.TableName, task.UpdateType);
// Build and execute the pipeline using the task's UpdateType directly
var pipeline = _pipelineFactory
.ForTable(task.TableName)
.WithUpdateType(task.UpdateType)
.WithMinimumDate(task.MinimumDt)
.Build();
// Ensure the task has a pipeline configuration
if (task.Pipeline == null)
{
throw new InvalidOperationException(
$"No pipeline configuration for {task.TableName}. " +
"Ensure the task was created with a valid EtlPipelineConfig.");
}
// Build and execute the pipeline using the task's pipeline configuration
var pipeline = _pipelineBuilder.Build(task.Pipeline, task.UpdateType, task.MinimumDt);
var result = await pipeline.ExecuteAsync(cancellationToken);