diff --git a/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs b/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs
index 2dff781..08e9e97 100644
--- a/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs
+++ b/NEW/src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs
@@ -1,3 +1,4 @@
+using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Etl.Pipeline;
namespace JdeScoping.DataSync.Contracts;
@@ -9,7 +10,31 @@ public interface IEtlPipelineFactory
public interface IEtlPipelineBuilder
{
+ ///
+ /// Sets the sync mode for this pipeline.
+ ///
+ /// The sync mode (Mass or Incremental).
+ /// The builder for chaining.
+ [Obsolete("Use WithUpdateType instead")]
IEtlPipelineBuilder WithMode(SyncMode mode);
+
+ ///
+ /// Sets the update type for this pipeline (Mass, Daily, or Hourly).
+ ///
+ /// The update type.
+ /// The builder for chaining.
+ IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType);
+
+ ///
+ /// Sets an optional minimum date for filtering source data.
+ ///
+ /// The minimum date, or null to use config offset.
+ /// The builder for chaining.
IEtlPipelineBuilder WithMinimumDate(DateTime? minDt);
+
+ ///
+ /// Builds the pipeline with the configured settings.
+ ///
+ /// The configured pipeline.
EtlPipeline Build();
}
diff --git a/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs
index 9ed61e6..bc83eb6 100644
--- a/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs
+++ b/NEW/src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs
@@ -1,4 +1,5 @@
using System.Text.Json;
+using JdeScoping.Core.Models.Enums;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
@@ -85,6 +86,7 @@ public class EtlPipelineFactory : IEtlPipelineFactory
tableName,
pipelineConfig,
_config.EffectiveSettings,
+ _config.EffectiveScheduleDefaults,
_logger);
}
@@ -113,17 +115,34 @@ public class EtlPipelineFactory : IEtlPipelineFactory
{
foreach (var (name, config) in root.Pipelines)
{
- // Validate required sync modes
- if (!config.SyncModes.ContainsKey("mass"))
- {
- throw new InvalidOperationException(
- $"Pipeline '{name}' missing required 'mass' sync mode.");
- }
+ // Accept either old SyncModes or new Schedules format
+ var hasOldConfig = config.SyncModes != null &&
+ config.SyncModes.ContainsKey("mass") &&
+ config.SyncModes.ContainsKey("incremental");
+ var hasNewConfig = config.Schedules != null;
- if (!config.SyncModes.ContainsKey("incremental"))
+ if (!hasOldConfig && !hasNewConfig)
{
- throw new InvalidOperationException(
- $"Pipeline '{name}' missing required 'incremental' sync mode.");
+ // If neither format is present, check for the old partial config for backward-compat error messages
+ if (config.SyncModes != null)
+ {
+ if (!config.SyncModes.ContainsKey("mass"))
+ {
+ throw new InvalidOperationException(
+ $"Pipeline '{name}' missing required 'mass' sync mode.");
+ }
+
+ if (!config.SyncModes.ContainsKey("incremental"))
+ {
+ throw new InvalidOperationException(
+ $"Pipeline '{name}' missing required 'incremental' sync mode.");
+ }
+ }
+ else
+ {
+ throw new InvalidOperationException(
+ $"Pipeline '{name}' must define either 'syncModes' (mass+incremental) or 'schedules'.");
+ }
}
// Validate no runtime parameters (not yet supported)
@@ -148,8 +167,9 @@ public class EtlPipelineFactory : IEtlPipelineFactory
private readonly string _tableName;
private readonly PipelineConfig _config;
private readonly PipelineSettings _settings;
+ private readonly ScheduleDefaults _scheduleDefaults;
private readonly ILogger _logger;
- private SyncMode _mode = SyncMode.Incremental;
+ private UpdateTypes _updateType = UpdateTypes.Hourly;
private DateTime? _minDtOverride;
public PipelineBuilder(
@@ -157,18 +177,28 @@ public class EtlPipelineFactory : IEtlPipelineFactory
string tableName,
PipelineConfig config,
PipelineSettings settings,
+ ScheduleDefaults scheduleDefaults,
ILogger logger)
{
_connectionFactory = connectionFactory;
_tableName = tableName;
_config = config;
_settings = settings;
+ _scheduleDefaults = scheduleDefaults;
_logger = logger;
}
+ [Obsolete("Use WithUpdateType instead")]
public IEtlPipelineBuilder WithMode(SyncMode mode)
{
- _mode = mode;
+ // Map old SyncMode to new UpdateTypes for backward compatibility
+ _updateType = mode == SyncMode.Mass ? UpdateTypes.Mass : UpdateTypes.Hourly;
+ return this;
+ }
+
+ public IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType)
+ {
+ _updateType = updateType;
return this;
}
@@ -180,9 +210,74 @@ public class EtlPipelineFactory : IEtlPipelineFactory
public EtlPipeline Build()
{
- var modeKey = _mode == SyncMode.Mass ? "mass" : "incremental";
+ // Check if using new Schedules format or old SyncModes format
+ if (_config.Schedules != null)
+ {
+ return BuildWithSchedules();
+ }
+ else
+ {
+ return BuildWithSyncModes();
+ }
+ }
- if (!_config.SyncModes.TryGetValue(modeKey, out var modeConfig))
+ private EtlPipeline BuildWithSchedules()
+ {
+ var scheduleConfig = GetEffectiveScheduleConfig(_updateType);
+
+ // Compute MinDt from override
+ var minDt = _minDtOverride;
+
+ // Use massQuery for Mass, regular query for Daily/Hourly
+ var useMassQuery = _updateType == UpdateTypes.Mass && !string.IsNullOrEmpty(_config.Source.MassQuery);
+
+ // Create source with parameter substitution
+ var source = CreateSourceWithUpdateType(_config.Source, minDt, useMassQuery);
+
+ // Determine destination type (Mass with prePurge = bulkImport, others = bulkMerge unless prePurge)
+ var destType = scheduleConfig.PrePurge ? "bulkImport" : "bulkMerge";
+ var destination = CreateDestinationWithSchedule(destType, _config.Destination, scheduleConfig);
+
+ // Build pipeline with scripts
+ var builder = new EtlPipelineBuilder()
+ .WithName(_tableName)
+ .WithSource(source)
+ .WithDestination(destination)
+ .WithLogger(_logger);
+
+ // Add pre-scripts: config scripts first, then prePurge
+ foreach (var script in _config.PreScripts ?? [])
+ {
+ builder.WithPreScript(new SqlScriptRunner(_connectionFactory, script, $"PreScript:{script.Substring(0, Math.Min(30, script.Length))}"));
+ }
+
+ if (scheduleConfig.PrePurge)
+ {
+ var truncateSql = $"TRUNCATE TABLE [{_config.Destination.Table}]";
+ builder.WithPreScript(new SqlScriptRunner(_connectionFactory, truncateSql, "PrePurge"));
+ }
+
+ // Add post-scripts: reIndex first, then config scripts
+ if (scheduleConfig.ReIndex)
+ {
+ var reindexSql = $"ALTER INDEX ALL ON [{_config.Destination.Table}] REBUILD";
+ builder.WithPostScript(new SqlScriptRunner(_connectionFactory, reindexSql, "ReIndex"));
+ }
+
+ foreach (var script in _config.PostScripts ?? [])
+ {
+ builder.WithPostScript(new SqlScriptRunner(_connectionFactory, script, $"PostScript:{script.Substring(0, Math.Min(30, script.Length))}"));
+ }
+
+ return builder.Build();
+ }
+
+ private EtlPipeline BuildWithSyncModes()
+ {
+ // Map UpdateTypes to old sync mode keys for backward compatibility
+ var modeKey = _updateType == UpdateTypes.Mass ? "mass" : "incremental";
+
+ if (!_config.SyncModes!.TryGetValue(modeKey, out var modeConfig))
{
throw new InvalidOperationException(
$"Sync mode '{modeKey}' not defined for table '{_tableName}'.");
@@ -191,12 +286,15 @@ public class EtlPipelineFactory : IEtlPipelineFactory
// Compute MinDt from offset or override
var minDt = _minDtOverride ?? ComputeMinDt(modeConfig.MinDtOffset);
+ // Convert UpdateTypes to SyncMode for backward compatibility with CreateSource
+ var syncMode = _updateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental;
+
// Create source with parameter substitution
- var source = CreateSource(_config.Source, minDt, _mode);
+ var source = CreateSource(_config.Source, minDt, syncMode);
// Determine destination type (mode override > default by mode)
var destType = modeConfig.Destination?.Type
- ?? (_mode == SyncMode.Mass ? "bulkImport" : "bulkMerge");
+ ?? (_updateType == UpdateTypes.Mass ? "bulkImport" : "bulkMerge");
var destination = CreateDestination(destType, _config.Destination, modeConfig);
// Build pipeline with scripts
@@ -237,6 +335,30 @@ public class EtlPipelineFactory : IEtlPipelineFactory
return builder.Build();
}
+ private Configuration.ScheduleConfig GetEffectiveScheduleConfig(UpdateTypes updateType)
+ {
+ // Get default for this update type
+ var defaultConfig = updateType switch
+ {
+ UpdateTypes.Mass => _scheduleDefaults.Mass,
+ UpdateTypes.Daily => _scheduleDefaults.Daily,
+ UpdateTypes.Hourly => _scheduleDefaults.Hourly,
+ _ => _scheduleDefaults.Hourly
+ };
+
+ // Get pipeline-specific override if exists
+ var pipelineConfig = updateType switch
+ {
+ UpdateTypes.Mass => _config.Schedules?.Mass,
+ UpdateTypes.Daily => _config.Schedules?.Daily,
+ UpdateTypes.Hourly => _config.Schedules?.Hourly,
+ _ => null
+ };
+
+ // Merge: pipeline config overrides defaults
+ return pipelineConfig?.MergeWith(defaultConfig) ?? defaultConfig;
+ }
+
private DateTime? ComputeMinDt(string? minDtOffset)
{
if (string.IsNullOrEmpty(minDtOffset))
@@ -321,5 +443,71 @@ public class EtlPipelineFactory : IEtlPipelineFactory
$"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.")
};
}
+
+ private IImportSource CreateSourceWithUpdateType(SourceConfig sourceConfig, DateTime? minDt, bool useMassQuery)
+ {
+ // Use massQuery if specified, otherwise use the default query
+ var query = useMassQuery ? sourceConfig.MassQuery! : sourceConfig.Query;
+
+ var parameters = new Dictionary();
+ var converter = new ParameterFormatConverter(_settings.Timezone);
+
+ // Only add parameters when not using massQuery (mass queries typically don't need date parameters)
+ var needsParameters = !useMassQuery;
+
+ if (sourceConfig.Parameters != null && minDt.HasValue && needsParameters)
+ {
+ foreach (var (_, paramConfig) in sourceConfig.Parameters)
+ {
+ var paramValue = paramConfig.Source.ToLowerInvariant() switch
+ {
+ "offset" => converter.Convert(minDt.Value, paramConfig.Format),
+ "static" => paramConfig.Value
+ ?? throw new InvalidOperationException(
+ $"Static parameter '{paramConfig.Name}' requires a value."),
+ _ => throw new NotSupportedException(
+ $"Parameter source '{paramConfig.Source}' is not supported.")
+ };
+
+ // Use the parameter name exactly as configured (provider-specific)
+ parameters[paramConfig.Name] = paramValue;
+ }
+ }
+
+ return new DbQuerySource(
+ _connectionFactory,
+ sourceConfig.Connection,
+ query,
+ parameters);
+ }
+
+ private IImportDestination CreateDestinationWithSchedule(
+ string destType,
+ DestinationConfig baseConfig,
+ Configuration.ScheduleConfig scheduleConfig)
+ {
+ var tableName = baseConfig.Table;
+
+ // Use base config for match/exclude columns
+ var matchColumns = baseConfig.MatchColumns?.ToArray();
+ var excludeFromUpdate = baseConfig.ExcludeFromUpdate?.ToArray();
+
+ return destType.ToLowerInvariant() switch
+ {
+ "bulkimport" => new DbBulkImportDestination(_connectionFactory, tableName),
+
+ "bulkmerge" => new DbBulkMergeDestination(
+ _connectionFactory,
+ tableName,
+ matchColumns ?? throw new InvalidOperationException(
+ $"matchColumns required for bulkMerge destination on table '{tableName}'."),
+ updateColumns: null,
+ excludeFromUpdate: excludeFromUpdate,
+ updateCondition: scheduleConfig.UpdateWhen),
+
+ _ => throw new InvalidOperationException(
+ $"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.")
+ };
+ }
}
}
diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs
index b38c387..0759037 100644
--- a/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs
+++ b/NEW/tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs
@@ -1,3 +1,4 @@
+using JdeScoping.Core.Models.Enums;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
@@ -156,6 +157,141 @@ public class EtlPipelineFactoryTests
#endregion
+ #region Builder WithUpdateType Tests
+
+ [Fact]
+ public void Builder_WithUpdateTypesMass_BuildsPipeline()
+ {
+ // Arrange
+ var config = CreateValidConfigWithSchedules();
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithUpdateType(UpdateTypes.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ pipeline.PipelineName.ShouldBe("TestTable");
+ }
+
+ [Fact]
+ public void Builder_WithUpdateTypesDaily_BuildsPipeline()
+ {
+ // Arrange
+ var config = CreateValidConfigWithSchedules();
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithUpdateType(UpdateTypes.Daily)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ pipeline.PipelineName.ShouldBe("TestTable");
+ }
+
+ [Fact]
+ public void Builder_WithUpdateTypesHourly_BuildsPipeline()
+ {
+ // Arrange
+ var config = CreateValidConfigWithSchedules();
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithUpdateType(UpdateTypes.Hourly)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ pipeline.PipelineName.ShouldBe("TestTable");
+ }
+
+ [Fact]
+ public void Builder_WithUpdateTypesMass_UsesMassQuery()
+ {
+ // Arrange - config with massQuery should use it for Mass update type
+ var config = CreateValidConfigWithSchedules();
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithUpdateType(UpdateTypes.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithUpdateTypesDaily_UsesRegularQuery()
+ {
+ // Arrange - Daily should use regular query with date filtering
+ var config = CreateValidConfigWithSchedules();
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithUpdateType(UpdateTypes.Daily)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithUpdateTypesMass_AppliesPrePurgeFromScheduleConfig()
+ {
+ // Arrange - Mass schedule should have prePurge=true from defaults
+ var config = CreateValidConfigWithSchedules();
+ var factory = CreateFactory(config);
+
+ // Act - should not throw and should include truncate pre-script
+ var pipeline = factory.ForTable("TestTable")
+ .WithUpdateType(UpdateTypes.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithUpdateTypesMass_AppliesReIndexFromScheduleConfig()
+ {
+ // Arrange - Mass schedule should have reIndex=true from defaults
+ var config = CreateValidConfigWithSchedules();
+ var factory = CreateFactory(config);
+
+ // Act - should not throw and should include reindex post-script
+ var pipeline = factory.ForTable("TestTable")
+ .WithUpdateType(UpdateTypes.Mass)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void Builder_WithUpdateTypesHourly_UsesUpdateWhenFromDefaults()
+ {
+ // Arrange - Hourly should use updateWhen from defaults
+ var config = CreateValidConfigWithSchedules();
+ var factory = CreateFactory(config);
+
+ // Act
+ var pipeline = factory.ForTable("TestTable")
+ .WithUpdateType(UpdateTypes.Hourly)
+ .Build();
+
+ // Assert
+ pipeline.ShouldNotBeNull();
+ }
+
+ #endregion
+
#region Builder WithMinimumDate Tests
[Fact]
@@ -839,6 +975,34 @@ public class EtlPipelineFactoryTests
});
}
+ private PipelinesRoot CreateValidConfigWithSchedules()
+ {
+ return new PipelinesRoot(
+ new PipelineSettings("UTC"),
+ new ScheduleDefaults(),
+ new Dictionary
+ {
+ ["TestTable"] = new PipelineConfig(
+ new SourceConfig("lotfinder", "SELECT * FROM Test WHERE UpdateDt >= @MinDt",
+ new Dictionary
+ {
+ ["minDt"] = new ParameterConfig("@MinDt", null, "offset", null)
+ },
+ "SELECT * FROM Test"), // MassQuery
+ null, // No old SyncModes
+ new PipelineSchedules
+ {
+ Mass = new ScheduleConfig { PrePurge = true, ReIndex = true },
+ Daily = new ScheduleConfig(),
+ Hourly = new ScheduleConfig()
+ },
+ null, // Transformers
+ new DestinationConfig("TestTable", ["Id"], null),
+ null,
+ null)
+ });
+ }
+
private EtlPipelineFactory CreateFactory(PipelinesRoot config)
{
return new EtlPipelineFactory(_connectionFactory, config, _logger);