feat(datasync): add WithUpdateType to IEtlPipelineBuilder
- Add WithUpdateType(UpdateTypes) method to IEtlPipelineBuilder interface
- Mark existing WithMode(SyncMode) as [Obsolete("Use WithUpdateType instead")]
- Update PipelineBuilder to store UpdateTypes instead of SyncMode
- Add GetEffectiveScheduleConfig method to merge pipeline schedules with defaults
- Add BuildWithSchedules method for new Schedules-based config
- Update validation to support both old SyncModes and new Schedules formats
- Pass ScheduleDefaults from PipelinesRoot to PipelineBuilder
- For Mass mode: use massQuery, apply prePurge/reIndex from schedule config
- For Daily/Hourly: use regular query with date parameters
- Add 8 new tests for WithUpdateType functionality
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
using JdeScoping.Core.Models.Enums;
|
||||||
using JdeScoping.DataSync.Etl.Pipeline;
|
using JdeScoping.DataSync.Etl.Pipeline;
|
||||||
|
|
||||||
namespace JdeScoping.DataSync.Contracts;
|
namespace JdeScoping.DataSync.Contracts;
|
||||||
@@ -9,7 +10,31 @@ public interface IEtlPipelineFactory
|
|||||||
|
|
||||||
public interface IEtlPipelineBuilder
|
public interface IEtlPipelineBuilder
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Sets the sync mode for this pipeline.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="mode">The sync mode (Mass or Incremental).</param>
|
||||||
|
/// <returns>The builder for chaining.</returns>
|
||||||
|
[Obsolete("Use WithUpdateType instead")]
|
||||||
IEtlPipelineBuilder WithMode(SyncMode mode);
|
IEtlPipelineBuilder WithMode(SyncMode mode);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sets the update type for this pipeline (Mass, Daily, or Hourly).
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="updateType">The update type.</param>
|
||||||
|
/// <returns>The builder for chaining.</returns>
|
||||||
|
IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sets an optional minimum date for filtering source data.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="minDt">The minimum date, or null to use config offset.</param>
|
||||||
|
/// <returns>The builder for chaining.</returns>
|
||||||
IEtlPipelineBuilder WithMinimumDate(DateTime? minDt);
|
IEtlPipelineBuilder WithMinimumDate(DateTime? minDt);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builds the pipeline with the configured settings.
|
||||||
|
/// </summary>
|
||||||
|
/// <returns>The configured pipeline.</returns>
|
||||||
EtlPipeline Build();
|
EtlPipeline Build();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
|
using JdeScoping.Core.Models.Enums;
|
||||||
using JdeScoping.DataAccess.Interfaces;
|
using JdeScoping.DataAccess.Interfaces;
|
||||||
using JdeScoping.DataSync.Configuration;
|
using JdeScoping.DataSync.Configuration;
|
||||||
using JdeScoping.DataSync.Contracts;
|
using JdeScoping.DataSync.Contracts;
|
||||||
@@ -85,6 +86,7 @@ public class EtlPipelineFactory : IEtlPipelineFactory
|
|||||||
tableName,
|
tableName,
|
||||||
pipelineConfig,
|
pipelineConfig,
|
||||||
_config.EffectiveSettings,
|
_config.EffectiveSettings,
|
||||||
|
_config.EffectiveScheduleDefaults,
|
||||||
_logger);
|
_logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,17 +115,34 @@ public class EtlPipelineFactory : IEtlPipelineFactory
|
|||||||
{
|
{
|
||||||
foreach (var (name, config) in root.Pipelines)
|
foreach (var (name, config) in root.Pipelines)
|
||||||
{
|
{
|
||||||
// Validate required sync modes
|
// Accept either old SyncModes or new Schedules format
|
||||||
if (!config.SyncModes.ContainsKey("mass"))
|
var hasOldConfig = config.SyncModes != null &&
|
||||||
{
|
config.SyncModes.ContainsKey("mass") &&
|
||||||
throw new InvalidOperationException(
|
config.SyncModes.ContainsKey("incremental");
|
||||||
$"Pipeline '{name}' missing required 'mass' sync mode.");
|
var hasNewConfig = config.Schedules != null;
|
||||||
}
|
|
||||||
|
|
||||||
if (!config.SyncModes.ContainsKey("incremental"))
|
if (!hasOldConfig && !hasNewConfig)
|
||||||
{
|
{
|
||||||
throw new InvalidOperationException(
|
// If neither format is present, check for the old partial config for backward-compat error messages
|
||||||
$"Pipeline '{name}' missing required 'incremental' sync mode.");
|
if (config.SyncModes != null)
|
||||||
|
{
|
||||||
|
if (!config.SyncModes.ContainsKey("mass"))
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Pipeline '{name}' missing required 'mass' sync mode.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!config.SyncModes.ContainsKey("incremental"))
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Pipeline '{name}' missing required 'incremental' sync mode.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Pipeline '{name}' must define either 'syncModes' (mass+incremental) or 'schedules'.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate no runtime parameters (not yet supported)
|
// Validate no runtime parameters (not yet supported)
|
||||||
@@ -148,8 +167,9 @@ public class EtlPipelineFactory : IEtlPipelineFactory
|
|||||||
private readonly string _tableName;
|
private readonly string _tableName;
|
||||||
private readonly PipelineConfig _config;
|
private readonly PipelineConfig _config;
|
||||||
private readonly PipelineSettings _settings;
|
private readonly PipelineSettings _settings;
|
||||||
|
private readonly ScheduleDefaults _scheduleDefaults;
|
||||||
private readonly ILogger<EtlPipeline> _logger;
|
private readonly ILogger<EtlPipeline> _logger;
|
||||||
private SyncMode _mode = SyncMode.Incremental;
|
private UpdateTypes _updateType = UpdateTypes.Hourly;
|
||||||
private DateTime? _minDtOverride;
|
private DateTime? _minDtOverride;
|
||||||
|
|
||||||
public PipelineBuilder(
|
public PipelineBuilder(
|
||||||
@@ -157,18 +177,28 @@ public class EtlPipelineFactory : IEtlPipelineFactory
|
|||||||
string tableName,
|
string tableName,
|
||||||
PipelineConfig config,
|
PipelineConfig config,
|
||||||
PipelineSettings settings,
|
PipelineSettings settings,
|
||||||
|
ScheduleDefaults scheduleDefaults,
|
||||||
ILogger<EtlPipeline> logger)
|
ILogger<EtlPipeline> logger)
|
||||||
{
|
{
|
||||||
_connectionFactory = connectionFactory;
|
_connectionFactory = connectionFactory;
|
||||||
_tableName = tableName;
|
_tableName = tableName;
|
||||||
_config = config;
|
_config = config;
|
||||||
_settings = settings;
|
_settings = settings;
|
||||||
|
_scheduleDefaults = scheduleDefaults;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Obsolete("Use WithUpdateType instead")]
|
||||||
public IEtlPipelineBuilder WithMode(SyncMode mode)
|
public IEtlPipelineBuilder WithMode(SyncMode mode)
|
||||||
{
|
{
|
||||||
_mode = mode;
|
// Map old SyncMode to new UpdateTypes for backward compatibility
|
||||||
|
_updateType = mode == SyncMode.Mass ? UpdateTypes.Mass : UpdateTypes.Hourly;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType)
|
||||||
|
{
|
||||||
|
_updateType = updateType;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,9 +210,74 @@ public class EtlPipelineFactory : IEtlPipelineFactory
|
|||||||
|
|
||||||
public EtlPipeline Build()
|
public EtlPipeline Build()
|
||||||
{
|
{
|
||||||
var modeKey = _mode == SyncMode.Mass ? "mass" : "incremental";
|
// Check if using new Schedules format or old SyncModes format
|
||||||
|
if (_config.Schedules != null)
|
||||||
|
{
|
||||||
|
return BuildWithSchedules();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return BuildWithSyncModes();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!_config.SyncModes.TryGetValue(modeKey, out var modeConfig))
|
private EtlPipeline BuildWithSchedules()
|
||||||
|
{
|
||||||
|
var scheduleConfig = GetEffectiveScheduleConfig(_updateType);
|
||||||
|
|
||||||
|
// Compute MinDt from override
|
||||||
|
var minDt = _minDtOverride;
|
||||||
|
|
||||||
|
// Use massQuery for Mass, regular query for Daily/Hourly
|
||||||
|
var useMassQuery = _updateType == UpdateTypes.Mass && !string.IsNullOrEmpty(_config.Source.MassQuery);
|
||||||
|
|
||||||
|
// Create source with parameter substitution
|
||||||
|
var source = CreateSourceWithUpdateType(_config.Source, minDt, useMassQuery);
|
||||||
|
|
||||||
|
// Determine destination type (Mass with prePurge = bulkImport, others = bulkMerge unless prePurge)
|
||||||
|
var destType = scheduleConfig.PrePurge ? "bulkImport" : "bulkMerge";
|
||||||
|
var destination = CreateDestinationWithSchedule(destType, _config.Destination, scheduleConfig);
|
||||||
|
|
||||||
|
// Build pipeline with scripts
|
||||||
|
var builder = new EtlPipelineBuilder()
|
||||||
|
.WithName(_tableName)
|
||||||
|
.WithSource(source)
|
||||||
|
.WithDestination(destination)
|
||||||
|
.WithLogger(_logger);
|
||||||
|
|
||||||
|
// Add pre-scripts: config scripts first, then prePurge
|
||||||
|
foreach (var script in _config.PreScripts ?? [])
|
||||||
|
{
|
||||||
|
builder.WithPreScript(new SqlScriptRunner(_connectionFactory, script, $"PreScript:{script.Substring(0, Math.Min(30, script.Length))}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (scheduleConfig.PrePurge)
|
||||||
|
{
|
||||||
|
var truncateSql = $"TRUNCATE TABLE [{_config.Destination.Table}]";
|
||||||
|
builder.WithPreScript(new SqlScriptRunner(_connectionFactory, truncateSql, "PrePurge"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add post-scripts: reIndex first, then config scripts
|
||||||
|
if (scheduleConfig.ReIndex)
|
||||||
|
{
|
||||||
|
var reindexSql = $"ALTER INDEX ALL ON [{_config.Destination.Table}] REBUILD";
|
||||||
|
builder.WithPostScript(new SqlScriptRunner(_connectionFactory, reindexSql, "ReIndex"));
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var script in _config.PostScripts ?? [])
|
||||||
|
{
|
||||||
|
builder.WithPostScript(new SqlScriptRunner(_connectionFactory, script, $"PostScript:{script.Substring(0, Math.Min(30, script.Length))}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
return builder.Build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private EtlPipeline BuildWithSyncModes()
|
||||||
|
{
|
||||||
|
// Map UpdateTypes to old sync mode keys for backward compatibility
|
||||||
|
var modeKey = _updateType == UpdateTypes.Mass ? "mass" : "incremental";
|
||||||
|
|
||||||
|
if (!_config.SyncModes!.TryGetValue(modeKey, out var modeConfig))
|
||||||
{
|
{
|
||||||
throw new InvalidOperationException(
|
throw new InvalidOperationException(
|
||||||
$"Sync mode '{modeKey}' not defined for table '{_tableName}'.");
|
$"Sync mode '{modeKey}' not defined for table '{_tableName}'.");
|
||||||
@@ -191,12 +286,15 @@ public class EtlPipelineFactory : IEtlPipelineFactory
|
|||||||
// Compute MinDt from offset or override
|
// Compute MinDt from offset or override
|
||||||
var minDt = _minDtOverride ?? ComputeMinDt(modeConfig.MinDtOffset);
|
var minDt = _minDtOverride ?? ComputeMinDt(modeConfig.MinDtOffset);
|
||||||
|
|
||||||
|
// Convert UpdateTypes to SyncMode for backward compatibility with CreateSource
|
||||||
|
var syncMode = _updateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental;
|
||||||
|
|
||||||
// Create source with parameter substitution
|
// Create source with parameter substitution
|
||||||
var source = CreateSource(_config.Source, minDt, _mode);
|
var source = CreateSource(_config.Source, minDt, syncMode);
|
||||||
|
|
||||||
// Determine destination type (mode override > default by mode)
|
// Determine destination type (mode override > default by mode)
|
||||||
var destType = modeConfig.Destination?.Type
|
var destType = modeConfig.Destination?.Type
|
||||||
?? (_mode == SyncMode.Mass ? "bulkImport" : "bulkMerge");
|
?? (_updateType == UpdateTypes.Mass ? "bulkImport" : "bulkMerge");
|
||||||
var destination = CreateDestination(destType, _config.Destination, modeConfig);
|
var destination = CreateDestination(destType, _config.Destination, modeConfig);
|
||||||
|
|
||||||
// Build pipeline with scripts
|
// Build pipeline with scripts
|
||||||
@@ -237,6 +335,30 @@ public class EtlPipelineFactory : IEtlPipelineFactory
|
|||||||
return builder.Build();
|
return builder.Build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Configuration.ScheduleConfig GetEffectiveScheduleConfig(UpdateTypes updateType)
|
||||||
|
{
|
||||||
|
// Get default for this update type
|
||||||
|
var defaultConfig = updateType switch
|
||||||
|
{
|
||||||
|
UpdateTypes.Mass => _scheduleDefaults.Mass,
|
||||||
|
UpdateTypes.Daily => _scheduleDefaults.Daily,
|
||||||
|
UpdateTypes.Hourly => _scheduleDefaults.Hourly,
|
||||||
|
_ => _scheduleDefaults.Hourly
|
||||||
|
};
|
||||||
|
|
||||||
|
// Get pipeline-specific override if exists
|
||||||
|
var pipelineConfig = updateType switch
|
||||||
|
{
|
||||||
|
UpdateTypes.Mass => _config.Schedules?.Mass,
|
||||||
|
UpdateTypes.Daily => _config.Schedules?.Daily,
|
||||||
|
UpdateTypes.Hourly => _config.Schedules?.Hourly,
|
||||||
|
_ => null
|
||||||
|
};
|
||||||
|
|
||||||
|
// Merge: pipeline config overrides defaults
|
||||||
|
return pipelineConfig?.MergeWith(defaultConfig) ?? defaultConfig;
|
||||||
|
}
|
||||||
|
|
||||||
private DateTime? ComputeMinDt(string? minDtOffset)
|
private DateTime? ComputeMinDt(string? minDtOffset)
|
||||||
{
|
{
|
||||||
if (string.IsNullOrEmpty(minDtOffset))
|
if (string.IsNullOrEmpty(minDtOffset))
|
||||||
@@ -321,5 +443,71 @@ public class EtlPipelineFactory : IEtlPipelineFactory
|
|||||||
$"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.")
|
$"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.")
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private IImportSource CreateSourceWithUpdateType(SourceConfig sourceConfig, DateTime? minDt, bool useMassQuery)
|
||||||
|
{
|
||||||
|
// Use massQuery if specified, otherwise use the default query
|
||||||
|
var query = useMassQuery ? sourceConfig.MassQuery! : sourceConfig.Query;
|
||||||
|
|
||||||
|
var parameters = new Dictionary<string, object>();
|
||||||
|
var converter = new ParameterFormatConverter(_settings.Timezone);
|
||||||
|
|
||||||
|
// Only add parameters when not using massQuery (mass queries typically don't need date parameters)
|
||||||
|
var needsParameters = !useMassQuery;
|
||||||
|
|
||||||
|
if (sourceConfig.Parameters != null && minDt.HasValue && needsParameters)
|
||||||
|
{
|
||||||
|
foreach (var (_, paramConfig) in sourceConfig.Parameters)
|
||||||
|
{
|
||||||
|
var paramValue = paramConfig.Source.ToLowerInvariant() switch
|
||||||
|
{
|
||||||
|
"offset" => converter.Convert(minDt.Value, paramConfig.Format),
|
||||||
|
"static" => paramConfig.Value
|
||||||
|
?? throw new InvalidOperationException(
|
||||||
|
$"Static parameter '{paramConfig.Name}' requires a value."),
|
||||||
|
_ => throw new NotSupportedException(
|
||||||
|
$"Parameter source '{paramConfig.Source}' is not supported.")
|
||||||
|
};
|
||||||
|
|
||||||
|
// Use the parameter name exactly as configured (provider-specific)
|
||||||
|
parameters[paramConfig.Name] = paramValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new DbQuerySource(
|
||||||
|
_connectionFactory,
|
||||||
|
sourceConfig.Connection,
|
||||||
|
query,
|
||||||
|
parameters);
|
||||||
|
}
|
||||||
|
|
||||||
|
private IImportDestination CreateDestinationWithSchedule(
|
||||||
|
string destType,
|
||||||
|
DestinationConfig baseConfig,
|
||||||
|
Configuration.ScheduleConfig scheduleConfig)
|
||||||
|
{
|
||||||
|
var tableName = baseConfig.Table;
|
||||||
|
|
||||||
|
// Use base config for match/exclude columns
|
||||||
|
var matchColumns = baseConfig.MatchColumns?.ToArray();
|
||||||
|
var excludeFromUpdate = baseConfig.ExcludeFromUpdate?.ToArray();
|
||||||
|
|
||||||
|
return destType.ToLowerInvariant() switch
|
||||||
|
{
|
||||||
|
"bulkimport" => new DbBulkImportDestination(_connectionFactory, tableName),
|
||||||
|
|
||||||
|
"bulkmerge" => new DbBulkMergeDestination(
|
||||||
|
_connectionFactory,
|
||||||
|
tableName,
|
||||||
|
matchColumns ?? throw new InvalidOperationException(
|
||||||
|
$"matchColumns required for bulkMerge destination on table '{tableName}'."),
|
||||||
|
updateColumns: null,
|
||||||
|
excludeFromUpdate: excludeFromUpdate,
|
||||||
|
updateCondition: scheduleConfig.UpdateWhen),
|
||||||
|
|
||||||
|
_ => throw new InvalidOperationException(
|
||||||
|
$"Unknown destination type: '{destType}'. Expected 'bulkImport' or 'bulkMerge'.")
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using JdeScoping.Core.Models.Enums;
|
||||||
using JdeScoping.DataAccess.Interfaces;
|
using JdeScoping.DataAccess.Interfaces;
|
||||||
using JdeScoping.DataSync.Configuration;
|
using JdeScoping.DataSync.Configuration;
|
||||||
using JdeScoping.DataSync.Contracts;
|
using JdeScoping.DataSync.Contracts;
|
||||||
@@ -156,6 +157,141 @@ public class EtlPipelineFactoryTests
|
|||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
#region Builder WithUpdateType Tests
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Builder_WithUpdateTypesMass_BuildsPipeline()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
var config = CreateValidConfigWithSchedules();
|
||||||
|
var factory = CreateFactory(config);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
var pipeline = factory.ForTable("TestTable")
|
||||||
|
.WithUpdateType(UpdateTypes.Mass)
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
pipeline.ShouldNotBeNull();
|
||||||
|
pipeline.PipelineName.ShouldBe("TestTable");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Builder_WithUpdateTypesDaily_BuildsPipeline()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
var config = CreateValidConfigWithSchedules();
|
||||||
|
var factory = CreateFactory(config);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
var pipeline = factory.ForTable("TestTable")
|
||||||
|
.WithUpdateType(UpdateTypes.Daily)
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
pipeline.ShouldNotBeNull();
|
||||||
|
pipeline.PipelineName.ShouldBe("TestTable");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Builder_WithUpdateTypesHourly_BuildsPipeline()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
var config = CreateValidConfigWithSchedules();
|
||||||
|
var factory = CreateFactory(config);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
var pipeline = factory.ForTable("TestTable")
|
||||||
|
.WithUpdateType(UpdateTypes.Hourly)
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
pipeline.ShouldNotBeNull();
|
||||||
|
pipeline.PipelineName.ShouldBe("TestTable");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Builder_WithUpdateTypesMass_UsesMassQuery()
|
||||||
|
{
|
||||||
|
// Arrange - config with massQuery should use it for Mass update type
|
||||||
|
var config = CreateValidConfigWithSchedules();
|
||||||
|
var factory = CreateFactory(config);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
var pipeline = factory.ForTable("TestTable")
|
||||||
|
.WithUpdateType(UpdateTypes.Mass)
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
pipeline.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Builder_WithUpdateTypesDaily_UsesRegularQuery()
|
||||||
|
{
|
||||||
|
// Arrange - Daily should use regular query with date filtering
|
||||||
|
var config = CreateValidConfigWithSchedules();
|
||||||
|
var factory = CreateFactory(config);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
var pipeline = factory.ForTable("TestTable")
|
||||||
|
.WithUpdateType(UpdateTypes.Daily)
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
pipeline.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Builder_WithUpdateTypesMass_AppliesPrePurgeFromScheduleConfig()
|
||||||
|
{
|
||||||
|
// Arrange - Mass schedule should have prePurge=true from defaults
|
||||||
|
var config = CreateValidConfigWithSchedules();
|
||||||
|
var factory = CreateFactory(config);
|
||||||
|
|
||||||
|
// Act - should not throw and should include truncate pre-script
|
||||||
|
var pipeline = factory.ForTable("TestTable")
|
||||||
|
.WithUpdateType(UpdateTypes.Mass)
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
pipeline.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Builder_WithUpdateTypesMass_AppliesReIndexFromScheduleConfig()
|
||||||
|
{
|
||||||
|
// Arrange - Mass schedule should have reIndex=true from defaults
|
||||||
|
var config = CreateValidConfigWithSchedules();
|
||||||
|
var factory = CreateFactory(config);
|
||||||
|
|
||||||
|
// Act - should not throw and should include reindex post-script
|
||||||
|
var pipeline = factory.ForTable("TestTable")
|
||||||
|
.WithUpdateType(UpdateTypes.Mass)
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
pipeline.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Builder_WithUpdateTypesHourly_UsesUpdateWhenFromDefaults()
|
||||||
|
{
|
||||||
|
// Arrange - Hourly should use updateWhen from defaults
|
||||||
|
var config = CreateValidConfigWithSchedules();
|
||||||
|
var factory = CreateFactory(config);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
var pipeline = factory.ForTable("TestTable")
|
||||||
|
.WithUpdateType(UpdateTypes.Hourly)
|
||||||
|
.Build();
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
pipeline.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
#endregion
|
||||||
|
|
||||||
#region Builder WithMinimumDate Tests
|
#region Builder WithMinimumDate Tests
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -839,6 +975,34 @@ public class EtlPipelineFactoryTests
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private PipelinesRoot CreateValidConfigWithSchedules()
|
||||||
|
{
|
||||||
|
return new PipelinesRoot(
|
||||||
|
new PipelineSettings("UTC"),
|
||||||
|
new ScheduleDefaults(),
|
||||||
|
new Dictionary<string, PipelineConfig>
|
||||||
|
{
|
||||||
|
["TestTable"] = new PipelineConfig(
|
||||||
|
new SourceConfig("lotfinder", "SELECT * FROM Test WHERE UpdateDt >= @MinDt",
|
||||||
|
new Dictionary<string, ParameterConfig>
|
||||||
|
{
|
||||||
|
["minDt"] = new ParameterConfig("@MinDt", null, "offset", null)
|
||||||
|
},
|
||||||
|
"SELECT * FROM Test"), // MassQuery
|
||||||
|
null, // No old SyncModes
|
||||||
|
new PipelineSchedules
|
||||||
|
{
|
||||||
|
Mass = new ScheduleConfig { PrePurge = true, ReIndex = true },
|
||||||
|
Daily = new ScheduleConfig(),
|
||||||
|
Hourly = new ScheduleConfig()
|
||||||
|
},
|
||||||
|
null, // Transformers
|
||||||
|
new DestinationConfig("TestTable", ["Id"], null),
|
||||||
|
null,
|
||||||
|
null)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private EtlPipelineFactory CreateFactory(PipelinesRoot config)
|
private EtlPipelineFactory CreateFactory(PipelinesRoot config)
|
||||||
{
|
{
|
||||||
return new EtlPipelineFactory(_connectionFactory, config, _logger);
|
return new EtlPipelineFactory(_connectionFactory, config, _logger);
|
||||||
|
|||||||
Reference in New Issue
Block a user