15-task plan to align pipelines.json with legacy DataSyncReport.md:
- Phase 1: Schema & Models (Tasks 1-3)
- Phase 2: GIW Connection (Tasks 4-5)
- Phase 3: Core ETL Changes (Tasks 6-8)
- Phase 4: Pipeline Configurations (Tasks 9-11)
- Phase 5: Validation & Testing (Tasks 12-15)
Pipeline Schedule Alignment Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Align pipelines.json with legacy DataSyncReport.md by supporting three schedules (Mass/Daily/Hourly), adding 8 missing pipelines, and adding GIW connection for StatusCode.
Architecture: Replace SyncMode enum with UpdateTypes throughout the ETL system. Add scheduleDefaults to pipelines.json for global defaults. Each pipeline defines schedules (mass/daily/hourly) that inherit from or override defaults. Add GIW connection type for StatusCode pipeline.
Tech Stack: .NET 10, System.Text.Json, Oracle.ManagedDataAccess, Microsoft.Data.SqlClient, xUnit, NSubstitute, Shouldly
Phase 1: Schema & Models
Task 1: Add ScheduleConfig and ScheduleDefaults Models
Files:
- Create: src/JdeScoping.DataSync/Configuration/ScheduleConfig.cs
- Test: tests/JdeScoping.DataSync.Tests/Configuration/ScheduleConfigTests.cs
Step 1: Write the failing test
Create test file tests/JdeScoping.DataSync.Tests/Configuration/ScheduleConfigTests.cs:
using JdeScoping.DataSync.Configuration;
using Shouldly;
using Xunit;
namespace JdeScoping.DataSync.Tests.Configuration;
public class ScheduleConfigTests
{
[Fact]
public void ScheduleConfig_DefaultValues_AreCorrect()
{
var config = new ScheduleConfig();
config.Enabled.ShouldBeTrue();
config.IntervalMinutes.ShouldBe(0);
config.PrePurge.ShouldBeFalse();
config.ReIndex.ShouldBeFalse();
config.UpdateWhen.ShouldBeNull();
}
[Fact]
public void ScheduleConfig_WithValues_StoresCorrectly()
{
var config = new ScheduleConfig
{
Enabled = false,
IntervalMinutes = 60,
PrePurge = true,
ReIndex = true,
UpdateWhen = "src.LastUpdateDt > tgt.LastUpdateDt"
};
config.Enabled.ShouldBeFalse();
config.IntervalMinutes.ShouldBe(60);
config.PrePurge.ShouldBeTrue();
config.ReIndex.ShouldBeTrue();
config.UpdateWhen.ShouldBe("src.LastUpdateDt > tgt.LastUpdateDt");
}
[Fact]
public void ScheduleDefaults_HasCorrectDefaultValues()
{
var defaults = new ScheduleDefaults();
defaults.Mass.ShouldNotBeNull();
defaults.Daily.ShouldNotBeNull();
defaults.Hourly.ShouldNotBeNull();
}
}
Step 2: Run test to verify it fails
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~ScheduleConfigTests" --verbosity normal
Expected: FAIL with "type or namespace 'ScheduleConfig' could not be found"
Step 3: Write minimal implementation
Create src/JdeScoping.DataSync/Configuration/ScheduleConfig.cs:
namespace JdeScoping.DataSync.Configuration;
/// <summary>
/// Configuration for a single schedule type (Mass/Daily/Hourly).
/// </summary>
public record ScheduleConfig
{
/// <summary>
/// Whether this schedule is enabled.
/// </summary>
public bool Enabled { get; init; } = true;
/// <summary>
/// Interval in minutes between syncs.
/// </summary>
public int IntervalMinutes { get; init; }
/// <summary>
/// Whether to truncate the table before import (full reload).
/// </summary>
public bool PrePurge { get; init; }
/// <summary>
/// Whether to rebuild indexes after import.
/// </summary>
public bool ReIndex { get; init; }
/// <summary>
/// Condition for updating existing rows (e.g., "src.LastUpdateDt > tgt.LastUpdateDt").
/// </summary>
public string? UpdateWhen { get; init; }
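/// <summary>
/// Merges this config with defaults. Enabled is always taken from this config.
/// IntervalMinutes and UpdateWhen fall back to the defaults when unset (0 or null).
/// PrePurge and ReIndex are OR-ed, so an override can turn them on but cannot turn them off.
/// </summary>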
public ScheduleConfig MergeWith(ScheduleConfig defaults)
{
return new ScheduleConfig
{
Enabled = Enabled,
IntervalMinutes = IntervalMinutes > 0 ? IntervalMinutes : defaults.IntervalMinutes,
PrePurge = PrePurge || defaults.PrePurge,
ReIndex = ReIndex || defaults.ReIndex,
UpdateWhen = UpdateWhen ?? defaults.UpdateWhen
};
}
}
/// <summary>
/// Default schedule configurations for all pipelines.
/// </summary>
public record ScheduleDefaults
{
/// <summary>
/// Default Mass schedule config (weekly, full reload).
/// </summary>
public ScheduleConfig Mass { get; init; } = new()
{
Enabled = true,
IntervalMinutes = 10080, // Weekly
PrePurge = true,
ReIndex = true
};
/// <summary>
/// Default Daily schedule config (incremental merge).
/// </summary>
public ScheduleConfig Daily { get; init; } = new()
{
Enabled = true,
IntervalMinutes = 1440, // Daily
PrePurge = false,
ReIndex = false,
UpdateWhen = "src.LastUpdateDt > tgt.LastUpdateDt"
};
/// <summary>
/// Default Hourly schedule config (incremental merge).
/// </summary>
public ScheduleConfig Hourly { get; init; } = new()
{
Enabled = true,
IntervalMinutes = 60, // Hourly
PrePurge = false,
ReIndex = false,
UpdateWhen = "src.LastUpdateDt > tgt.LastUpdateDt"
};
}
/// <summary>
/// Per-pipeline schedule overrides.
/// </summary>
public record PipelineSchedules
{
public ScheduleConfig? Mass { get; init; }
public ScheduleConfig? Daily { get; init; }
public ScheduleConfig? Hourly { get; init; }
}
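The merge rules in MergeWith are easiest to check against concrete values. The following is a minimal standalone sketch, not one of the plan's files: it repeats a trimmed copy of ScheduleConfig from Step 3 so that it runs on its own, and the sample values are illustrative assumptions.

```csharp
using System;

// Defaults shaped like the Mass schedule: weekly, full reload.
var defaults = new ScheduleConfig { IntervalMinutes = 10080, PrePurge = true, ReIndex = true };

// An empty override (the "mass": {} case) inherits everything from the defaults.
var inherited = new ScheduleConfig().MergeWith(defaults);
Console.WriteLine($"{inherited.IntervalMinutes} {inherited.PrePurge} {inherited.ReIndex}"); // 10080 True True

// An override with its own interval keeps that interval.
var custom = new ScheduleConfig { IntervalMinutes = 100800 }.MergeWith(defaults);
Console.WriteLine(custom.IntervalMinutes); // 100800

// Caveat: PrePurge = false cannot be told apart from "not set", so the default (true) wins.
var cannotDisable = new ScheduleConfig { PrePurge = false }.MergeWith(defaults);
Console.WriteLine(cannotDisable.PrePurge); // True

// Enabled is never inherited: a disabled override stays disabled.
var disabled = new ScheduleConfig { Enabled = false }.MergeWith(defaults);
Console.WriteLine(disabled.Enabled); // False

// Trimmed copy of the record from Step 3 (XML docs omitted).
public record ScheduleConfig
{
    public bool Enabled { get; init; } = true;
    public int IntervalMinutes { get; init; }
    public bool PrePurge { get; init; }
    public bool ReIndex { get; init; }
    public string? UpdateWhen { get; init; }

    public ScheduleConfig MergeWith(ScheduleConfig defaults) => new()
    {
        Enabled = Enabled,
        IntervalMinutes = IntervalMinutes > 0 ? IntervalMinutes : defaults.IntervalMinutes,
        PrePurge = PrePurge || defaults.PrePurge,
        ReIndex = ReIndex || defaults.ReIndex,
        UpdateWhen = UpdateWhen ?? defaults.UpdateWhen
    };
}
```

If a pipeline ever needs to switch off a flag that the defaults turn on, the bool properties would probably have to become nullable (bool?) so that "unset" and "false" can be distinguished. That is a possible follow-up, not something this plan specifies.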
Step 4: Run test to verify it passes
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~ScheduleConfigTests" --verbosity normal
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataSync/Configuration/ScheduleConfig.cs tests/JdeScoping.DataSync.Tests/Configuration/ScheduleConfigTests.cs
git commit -m "feat(datasync): add ScheduleConfig and ScheduleDefaults models"
Task 2: Update PipelinesRoot to Include ScheduleDefaults
Files:
- Modify: src/JdeScoping.DataSync/Configuration/PipelinesRoot.cs
- Test: tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs
Step 1: Write the failing test
Create tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs:
using JdeScoping.DataSync.Configuration;
using Shouldly;
using Xunit;
namespace JdeScoping.DataSync.Tests.Configuration;
public class PipelinesRootTests
{
[Fact]
public void EffectiveScheduleDefaults_WhenNull_ReturnsDefaults()
{
var root = new PipelinesRoot(null, null, new Dictionary<string, PipelineConfig>());
var defaults = root.EffectiveScheduleDefaults;
defaults.ShouldNotBeNull();
defaults.Mass.IntervalMinutes.ShouldBe(10080);
defaults.Daily.IntervalMinutes.ShouldBe(1440);
defaults.Hourly.IntervalMinutes.ShouldBe(60);
}
[Fact]
public void EffectiveScheduleDefaults_WhenProvided_ReturnsProvided()
{
var customDefaults = new ScheduleDefaults
{
Mass = new ScheduleConfig { IntervalMinutes = 20000 }
};
var root = new PipelinesRoot(null, customDefaults, new Dictionary<string, PipelineConfig>());
var defaults = root.EffectiveScheduleDefaults;
defaults.Mass.IntervalMinutes.ShouldBe(20000);
}
}
Step 2: Run test to verify it fails
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~PipelinesRootTests" --verbosity normal
Expected: FAIL
Step 3: Write minimal implementation
Update src/JdeScoping.DataSync/Configuration/PipelinesRoot.cs:
namespace JdeScoping.DataSync.Configuration;
public record PipelinesRoot(
PipelineSettings? Settings,
ScheduleDefaults? ScheduleDefaults,
Dictionary<string, PipelineConfig> Pipelines)
{
public PipelineSettings EffectiveSettings => Settings ?? new PipelineSettings();
public ScheduleDefaults EffectiveScheduleDefaults => ScheduleDefaults ?? new ScheduleDefaults();
}
public record PipelineSettings(
string Timezone = "UTC");
Step 4: Run test to verify it passes
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~PipelinesRootTests" --verbosity normal
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataSync/Configuration/PipelinesRoot.cs tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs
git commit -m "feat(datasync): add ScheduleDefaults to PipelinesRoot"
Task 3: Update PipelineConfig to Include Schedules
Files:
- Modify: src/JdeScoping.DataSync/Configuration/PipelineConfig.cs
Step 1: Write the failing test
Add to tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs:
[Fact]
public void PipelineConfig_WithSchedules_ParsesCorrectly()
{
var config = new PipelineConfig(
new SourceConfig("jde", "SELECT 1", null, null),
null, // Old SyncModes - deprecated
new PipelineSchedules
{
Mass = new ScheduleConfig { PrePurge = true, ReIndex = true },
Daily = new ScheduleConfig { Enabled = true },
Hourly = new ScheduleConfig { Enabled = false }
},
null,
new DestinationConfig("TestTable", ["Id"], null),
null,
null);
config.Schedules.ShouldNotBeNull();
config.Schedules!.Mass!.PrePurge.ShouldBeTrue();
config.Schedules!.Hourly!.Enabled.ShouldBeFalse();
}
Step 2: Run test to verify it fails
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~PipelineConfig_WithSchedules" --verbosity normal
Expected: FAIL
Step 3: Write minimal implementation
Update src/JdeScoping.DataSync/Configuration/PipelineConfig.cs:
namespace JdeScoping.DataSync.Configuration;
public record PipelineConfig(
SourceConfig Source,
Dictionary<string, SyncModeConfig>? SyncModes, // Deprecated - kept for backward compatibility
PipelineSchedules? Schedules, // New schedule-based config
List<TransformerConfig>? Transformers,
DestinationConfig Destination,
List<string>? PreScripts,
List<string>? PostScripts);
public record SourceConfig(
string Connection,
string Query,
Dictionary<string, ParameterConfig>? Parameters,
string? MassQuery = null);
public record ParameterConfig(
string Name,
string? Format,
string Source = "offset",
string? Value = null);
public record SyncModeConfig(
string? MinDtOffset,
bool PrePurge = false,
bool ReIndex = false,
string? UpdateWhen = null,
DestinationOverride? Destination = null);
public record DestinationOverride(
string? Type,
List<string>? MatchColumns,
List<string>? ExcludeFromUpdate);
public record TransformerConfig(
string Type,
List<string>? Columns,
Dictionary<string, string>? Mappings);
public record DestinationConfig(
string Table,
List<string>? MatchColumns,
List<string>? ExcludeFromUpdate);
Step 4: Run test to verify it passes
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~PipelineConfig_WithSchedules" --verbosity normal
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataSync/Configuration/PipelineConfig.cs tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs
git commit -m "feat(datasync): add Schedules property to PipelineConfig"
Phase 2: Infrastructure Changes (GIW Connection)
Task 4: Add GIW Connection to IDbConnectionFactory
Files:
- Modify: src/JdeScoping.DataAccess/Interfaces/IDbConnectionFactory.cs
- Modify: src/JdeScoping.DataAccess/DbConnectionFactory.cs
- Test: tests/JdeScoping.DataAccess.Tests/DbConnectionFactoryTests.cs
Step 1: Write the failing test
Add to existing test file or create tests/JdeScoping.DataAccess.Tests/DbConnectionFactoryGiwTests.cs:
using JdeScoping.DataAccess;
using JdeScoping.DataAccess.Exceptions;
using JdeScoping.DataAccess.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace JdeScoping.DataAccess.Tests;
public class DbConnectionFactoryGiwTests
{
[Fact]
public async Task CreateGiwConnectionAsync_WhenConnectionStringMissing_ThrowsConnectionException()
{
// Arrange
var config = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>())
.Build();
var factory = new DbConnectionFactory(config, NullLogger<DbConnectionFactory>.Instance);
// Act & Assert
var ex = await Should.ThrowAsync<ConnectionException>(
() => factory.CreateGiwConnectionAsync());
ex.DataSource.ShouldBe("GIW");
}
}
Step 2: Run test to verify it fails
Run: dotnet test tests/JdeScoping.DataAccess.Tests --filter "FullyQualifiedName~CreateGiwConnectionAsync" --verbosity normal
Expected: FAIL with "'DbConnectionFactory' does not contain a definition for 'CreateGiwConnectionAsync'" (the test calls the method on the concrete factory, not on the interface)
Step 3: Write minimal implementation
Update src/JdeScoping.DataAccess/Interfaces/IDbConnectionFactory.cs:
using Microsoft.Data.SqlClient;
using Oracle.ManagedDataAccess.Client;
namespace JdeScoping.DataAccess.Interfaces;
/// <summary>
/// Factory for creating database connections to all data sources.
/// </summary>
public interface IDbConnectionFactory
{
/// <summary>
/// Creates and opens a connection to the LotFinderDB SQL Server cache database.
/// </summary>
Task<SqlConnection> CreateLotFinderConnectionAsync(CancellationToken ct = default);
/// <summary>
/// Creates and opens a connection to the JDE Oracle database (production schema).
/// </summary>
Task<OracleConnection> CreateJdeConnectionAsync(CancellationToken ct = default);
/// <summary>
/// Creates and opens a connection to the JDE Stage Oracle database.
/// </summary>
Task<OracleConnection> CreateJdeStageConnectionAsync(CancellationToken ct = default);
/// <summary>
/// Creates and opens a connection to the CMS Oracle database.
/// </summary>
Task<OracleConnection> CreateCmsConnectionAsync(CancellationToken ct = default);
/// <summary>
/// Creates and opens a connection to the GIW Oracle database (for StatusCode sync).
/// </summary>
Task<OracleConnection> CreateGiwConnectionAsync(CancellationToken ct = default);
}
Update src/JdeScoping.DataAccess/DbConnectionFactory.cs - add method:
/// <inheritdoc/>
public async Task<OracleConnection> CreateGiwConnectionAsync(CancellationToken ct = default)
{
return await CreateOracleConnectionAsync("GIW", ct).ConfigureAwait(false);
}
Step 4: Run test to verify it passes
Run: dotnet test tests/JdeScoping.DataAccess.Tests --filter "FullyQualifiedName~CreateGiwConnectionAsync" --verbosity normal
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataAccess/Interfaces/IDbConnectionFactory.cs src/JdeScoping.DataAccess/DbConnectionFactory.cs tests/JdeScoping.DataAccess.Tests/DbConnectionFactoryGiwTests.cs
git commit -m "feat(dataaccess): add GIW connection factory method"
Task 5: Update DbQuerySource to Support GIW Connection
Files:
- Modify: src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs
- Test: tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs
Step 1: Write the failing test
Add test to existing DbQuerySourceTests.cs:
[Fact]
public void Constructor_WithGiwConnectionType_DoesNotThrow()
{
// Arrange
var connectionFactory = Substitute.For<IDbConnectionFactory>();
// Act & Assert - should not throw
var source = new DbQuerySource(connectionFactory, "giw", "SELECT 1", null);
source.SourceName.ShouldBe("DbQuery:giw");
}
Step 2: Run test to verify it fails
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~Constructor_WithGiwConnectionType" --verbosity normal
Expected: FAIL with "Unknown connection type: giw"
Step 3: Write minimal implementation
Update src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs:
private static readonly HashSet<string> ValidConnectionTypes = new(StringComparer.OrdinalIgnoreCase)
{
"jde", "cms", "lotfinder", "giw"
};
And update CreateConnectionAsync:
private async Task<DbConnection> CreateConnectionAsync(CancellationToken cancellationToken)
{
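// Normalize case so the switch agrees with the case-insensitive ValidConnectionTypes check
return _connectionType.ToLowerInvariant() switch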
{
"jde" => await _connectionFactory.CreateJdeConnectionAsync(cancellationToken),
"cms" => await _connectionFactory.CreateCmsConnectionAsync(cancellationToken),
"giw" => await _connectionFactory.CreateGiwConnectionAsync(cancellationToken),
"lotfinder" => await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken),
_ => throw new InvalidOperationException($"Unknown connection type: {_connectionType}")
};
}
Step 4: Run test to verify it passes
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~Constructor_WithGiwConnectionType" --verbosity normal
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs
git commit -m "feat(datasync): add GIW connection type to DbQuerySource"
Phase 3: Core ETL Changes
Task 6: Update IEtlPipelineBuilder to Accept UpdateTypes
Files:
- Modify: src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs
- Modify: src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs
Step 1: Write the failing test
Add to tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs:
[Fact]
public void Builder_WithUpdateTypesMass_BuildsPipeline()
{
// Arrange
var config = CreateValidConfigWithSchedules();
var factory = CreateFactory(config);
// Act
var pipeline = factory.ForTable("TestTable")
.WithUpdateType(UpdateTypes.Mass)
.Build();
// Assert
pipeline.ShouldNotBeNull();
}
[Fact]
public void Builder_WithUpdateTypesDaily_BuildsPipeline()
{
// Arrange
var config = CreateValidConfigWithSchedules();
var factory = CreateFactory(config);
// Act
var pipeline = factory.ForTable("TestTable")
.WithUpdateType(UpdateTypes.Daily)
.Build();
// Assert
pipeline.ShouldNotBeNull();
}
[Fact]
public void Builder_WithUpdateTypesHourly_BuildsPipeline()
{
// Arrange
var config = CreateValidConfigWithSchedules();
var factory = CreateFactory(config);
// Act
var pipeline = factory.ForTable("TestTable")
.WithUpdateType(UpdateTypes.Hourly)
.Build();
// Assert
pipeline.ShouldNotBeNull();
}
private PipelinesRoot CreateValidConfigWithSchedules()
{
return new PipelinesRoot(
new PipelineSettings("UTC"),
new ScheduleDefaults(),
new Dictionary<string, PipelineConfig>
{
["TestTable"] = new PipelineConfig(
new SourceConfig("lotfinder", "SELECT * FROM Test WHERE UpdateDt >= @MinDt",
new Dictionary<string, ParameterConfig>
{
["minDt"] = new ParameterConfig("@MinDt", null, "offset", null)
},
"SELECT * FROM Test"),
null, // No old SyncModes
new PipelineSchedules
{
Mass = new ScheduleConfig { PrePurge = true, ReIndex = true },
Daily = new ScheduleConfig(),
Hourly = new ScheduleConfig()
},
null,
new DestinationConfig("TestTable", ["Id"], null),
null,
null)
});
}
Step 2: Run test to verify it fails
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~Builder_WithUpdateTypes" --verbosity normal
Expected: FAIL with "'IEtlPipelineBuilder' does not contain a definition for 'WithUpdateType'"
Step 3: Write minimal implementation
Update src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs:
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Etl.Pipeline;
namespace JdeScoping.DataSync.Contracts;
public interface IEtlPipelineFactory
{
IEtlPipelineBuilder ForTable(string tableName);
}
public interface IEtlPipelineBuilder
{
[Obsolete("Use WithUpdateType instead")]
IEtlPipelineBuilder WithMode(SyncMode mode);
IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType);
IEtlPipelineBuilder WithMinimumDate(DateTime? minDt);
EtlPipeline Build();
}
Update src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs - PipelineBuilder class:
private sealed class PipelineBuilder : IEtlPipelineBuilder
{
private readonly IDbConnectionFactory _connectionFactory;
private readonly string _tableName;
private readonly PipelineConfig _config;
private readonly PipelineSettings _settings;
private readonly ScheduleDefaults _scheduleDefaults;
private readonly ILogger<EtlPipeline> _logger;
private UpdateTypes _updateType = UpdateTypes.Hourly;
private DateTime? _minDtOverride;
public PipelineBuilder(
IDbConnectionFactory connectionFactory,
string tableName,
PipelineConfig config,
PipelineSettings settings,
ScheduleDefaults scheduleDefaults,
ILogger<EtlPipeline> logger)
{
_connectionFactory = connectionFactory;
_tableName = tableName;
_config = config;
_settings = settings;
_scheduleDefaults = scheduleDefaults;
_logger = logger;
}
[Obsolete("Use WithUpdateType instead")]
public IEtlPipelineBuilder WithMode(SyncMode mode)
{
_updateType = mode == SyncMode.Mass ? UpdateTypes.Mass : UpdateTypes.Hourly;
return this;
}
public IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType)
{
_updateType = updateType;
return this;
}
public IEtlPipelineBuilder WithMinimumDate(DateTime? minDt)
{
_minDtOverride = minDt;
return this;
}
public EtlPipeline Build()
{
var scheduleConfig = GetEffectiveScheduleConfig(_updateType);
// Use the caller-supplied minimum date (set via WithMinimumDate), if any
var minDt = _minDtOverride;
// Use massQuery for Mass, regular query for Daily/Hourly
var useMassQuery = _updateType == UpdateTypes.Mass && !string.IsNullOrEmpty(_config.Source.MassQuery);
// Create source with parameter substitution
var source = CreateSource(_config.Source, minDt, useMassQuery);
// Determine destination type (Mass = bulkImport, Daily/Hourly = bulkMerge unless prePurge)
var destType = scheduleConfig.PrePurge ? "bulkImport" : "bulkMerge";
var destination = CreateDestination(destType, _config.Destination, scheduleConfig);
// Build pipeline with scripts
var builder = new EtlPipelineBuilder()
.WithName(_tableName)
.WithSource(source)
.WithDestination(destination)
.WithLogger(_logger);
// Add pre-scripts: config scripts first, then prePurge
foreach (var script in _config.PreScripts ?? [])
{
builder.WithPreScript(new SqlScriptRunner(_connectionFactory, script, $"PreScript:{script.Substring(0, Math.Min(30, script.Length))}"));
}
if (scheduleConfig.PrePurge)
{
var truncateSql = $"TRUNCATE TABLE [{_config.Destination.Table}]";
builder.WithPreScript(new SqlScriptRunner(_connectionFactory, truncateSql, "PrePurge"));
}
// Add post-scripts: reIndex first, then config scripts
if (scheduleConfig.ReIndex)
{
var reindexSql = $"ALTER INDEX ALL ON [{_config.Destination.Table}] REBUILD";
builder.WithPostScript(new SqlScriptRunner(_connectionFactory, reindexSql, "ReIndex"));
}
foreach (var script in _config.PostScripts ?? [])
{
builder.WithPostScript(new SqlScriptRunner(_connectionFactory, script, $"PostScript:{script.Substring(0, Math.Min(30, script.Length))}"));
}
return builder.Build();
}
private ScheduleConfig GetEffectiveScheduleConfig(UpdateTypes updateType)
{
// Get default for this update type
var defaultConfig = updateType switch
{
UpdateTypes.Mass => _scheduleDefaults.Mass,
UpdateTypes.Daily => _scheduleDefaults.Daily,
UpdateTypes.Hourly => _scheduleDefaults.Hourly,
_ => _scheduleDefaults.Hourly
};
// Get pipeline-specific override if exists
var pipelineConfig = updateType switch
{
UpdateTypes.Mass => _config.Schedules?.Mass,
UpdateTypes.Daily => _config.Schedules?.Daily,
UpdateTypes.Hourly => _config.Schedules?.Hourly,
_ => null
};
// Merge: pipeline config overrides defaults
return pipelineConfig?.MergeWith(defaultConfig) ?? defaultConfig;
}
// ... rest of methods updated similarly
}
Step 4: Run test to verify it passes
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~Builder_WithUpdateTypes" --verbosity normal
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs
git commit -m "feat(datasync): add WithUpdateType to IEtlPipelineBuilder"
Task 7: Update TableSyncOperation to Use UpdateTypes
Files:
- Modify: src/JdeScoping.DataSync/Services/TableSyncOperation.cs
Step 1: Write the failing test
The existing tests should still pass with the refactored code. Add a new test:
[Fact]
public async Task ExecuteAsync_WithDailyUpdateType_UsesDailyConfig()
{
// Arrange
var task = new DataUpdateTask
{
TableName = "TestTable",
SourceSystem = "JDE",
SourceData = "TEST",
UpdateType = UpdateTypes.Daily,
MinimumDt = DateTime.UtcNow.AddDays(-1)
};
// ... setup mocks
// Act
await _operation.ExecuteAsync(task);
// Assert
_mockFactory.Received(1).ForTable("TestTable");
// Verify WithUpdateType(UpdateTypes.Daily) was called
}
Step 2: Run test to verify it fails
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~ExecuteAsync_WithDailyUpdateType" --verbosity normal
Expected: FAIL
Step 3: Write minimal implementation
Update src/JdeScoping.DataSync/Services/TableSyncOperation.cs:
private async Task<long> ExecuteSyncCoreAsync(DataUpdateTask task, CancellationToken cancellationToken)
{
_logger.LogDebug("Building pipeline for {Table} with {UpdateType}", task.TableName, task.UpdateType);
// Build and execute the pipeline using UpdateTypes directly
var pipeline = _pipelineFactory
.ForTable(task.TableName)
.WithUpdateType(task.UpdateType)
.WithMinimumDate(task.MinimumDt)
.Build();
var result = await pipeline.ExecuteAsync(cancellationToken);
if (!result.Success)
{
throw new InvalidOperationException(
$"Pipeline failed for {task.TableName}: {result.Error?.Message ?? "Unknown error"}",
result.Error);
}
return result.TotalRows;
}
Step 4: Run test to verify it passes
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~TableSyncOperation" --verbosity normal
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataSync/Services/TableSyncOperation.cs
git commit -m "refactor(datasync): use UpdateTypes in TableSyncOperation"
Task 8: Update DataUpdateRepository for Per-Pipeline Intervals
Files:
- Modify: src/JdeScoping.DataSync/Services/DataUpdateRepository.cs
- Modify: src/JdeScoping.DataSync/Contracts/IDataUpdateRepository.cs
Step 1: Write the failing test
[Fact]
public async Task GetSyncStatusAsync_WithCustomInterval_UsesProvidedInterval()
{
// Arrange - setup mock to return data
// ...
// Act
var customIntervals = new Dictionary<string, int>
{
["MisData_0"] = 100800 // Mass interval for MisData
};
var status = await _repository.GetSyncStatusAsync(customIntervals);
// Assert
var misDataStatus = status.FirstOrDefault(s => s.TableName == "MisData");
misDataStatus.ShouldNotBeNull();
misDataStatus.ExpectedIntervalMinutes.ShouldBe(100800);
}
Step 2: Run test to verify it fails
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~GetSyncStatusAsync_WithCustomInterval" --verbosity normal
Expected: FAIL
Step 3: Write minimal implementation
Update IDataUpdateRepository to accept optional interval overrides, then update DataUpdateRepository.GetSyncStatusAsync to use them.
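One possible shape for the override lookup is sketched below. Everything in it is an assumption made for illustration: the key format "{TableName}_{numeric update type}" is inferred from the "MisData_0" key in the test, the numeric values of UpdateTypes are guessed so that Mass is 0, the helper name IntervalResolver is hypothetical, and the fallback values simply mirror the ScheduleDefaults intervals from Task 1. The real repository may resolve intervals differently.

```csharp
using System;
using System.Collections.Generic;

var overrides = new Dictionary<string, int> { ["MisData_0"] = 100800 };

// Override present for MisData/Mass, so it is used.
var misDataMass = IntervalResolver.Resolve("MisData", UpdateTypes.Mass, overrides);
// No override for MisData/Daily, so the default applies.
var misDataDaily = IntervalResolver.Resolve("MisData", UpdateTypes.Daily, overrides);
// No override dictionary at all, so the default applies.
var workOrderMass = IntervalResolver.Resolve("WorkOrder_Curr", UpdateTypes.Mass, null);

Console.WriteLine(misDataMass);   // 100800
Console.WriteLine(misDataDaily);  // 1440
Console.WriteLine(workOrderMass); // 10080

// Stand-in for the project's enum. Assumption: Mass = 0 so that "MisData_0" means Mass.
public enum UpdateTypes { Mass = 0, Daily = 1, Hourly = 2 }

// Hypothetical helper, not an existing type in the codebase.
public static class IntervalResolver
{
    public static int Resolve(
        string tableName,
        UpdateTypes updateType,
        IReadOnlyDictionary<string, int>? overrides)
    {
        var key = $"{tableName}_{(int)updateType}";
        if (overrides is not null && overrides.TryGetValue(key, out var minutes))
        {
            return minutes;
        }

        // Fall back to the global defaults (values copied from ScheduleDefaults).
        return updateType switch
        {
            UpdateTypes.Mass => 10080,
            UpdateTypes.Daily => 1440,
            _ => 60
        };
    }
}
```

Making the overrides parameter optional keeps existing callers of GetSyncStatusAsync compiling unchanged, which fits the plan's backward-compatibility goal.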
Step 4: Run test to verify it passes
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~GetSyncStatusAsync_WithCustomInterval" --verbosity normal
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataSync/Contracts/IDataUpdateRepository.cs src/JdeScoping.DataSync/Services/DataUpdateRepository.cs
git commit -m "feat(datasync): support per-pipeline intervals in DataUpdateRepository"
Phase 4: Pipeline Configurations
Task 9: Migrate Existing Pipelines to New Schema
Files:
- Modify: src/JdeScoping.DataSync/Pipelines/pipelines.json
Step 1: Backup current config
cp src/JdeScoping.DataSync/Pipelines/pipelines.json src/JdeScoping.DataSync/Pipelines/pipelines.json.bak
Step 2: Update pipelines.json
Add scheduleDefaults and convert each pipeline from syncModes to schedules. Keep syncModes for backward compatibility during transition.
Example structure:
{
"settings": {
"timezone": "UTC"
},
"scheduleDefaults": {
"mass": { "enabled": true, "intervalMinutes": 10080, "prePurge": true, "reIndex": true },
"daily": { "enabled": true, "intervalMinutes": 1440, "prePurge": false, "reIndex": false, "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" },
"hourly": { "enabled": true, "intervalMinutes": 60, "prePurge": false, "reIndex": false, "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }
},
"pipelines": {
"WorkOrder_Curr": {
"source": {
"connection": "jde",
"massQuery": "SELECT ... FROM {ProductionSchema}.F4801 wo",
"query": "SELECT ... FROM {ProductionSchema}.F4801 wo WHERE (...date filter...)",
"parameters": { ... }
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": { ... }
}
}
}
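How this JSON maps onto the models from Tasks 1 to 3 can be checked with a small standalone sketch. The plan does not show how pipelines.json is loaded, so the use of JsonSerializerDefaults.Web is an assumption, and RootSketch and PipelineSketch are hypothetical, cut-down stand-ins for PipelinesRoot and PipelineConfig. The JSON values are made up for the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

const string json = """
{
  "scheduleDefaults": {
    "mass": { "intervalMinutes": 20160 }
  },
  "pipelines": {
    "MisData": {
      "schedules": { "mass": {}, "hourly": { "enabled": false } }
    }
  }
}
""";

// Assumption: camelCase JSON is read with the Web defaults (camelCase, case-insensitive).
var options = new JsonSerializerOptions(JsonSerializerDefaults.Web);
var root = JsonSerializer.Deserialize<RootSketch>(json, options)!;

var defaults = root.ScheduleDefaults ?? new ScheduleDefaults();
var misData = root.Pipelines["MisData"].Schedules!;

// A block that is present replaces the built-in default object as a whole...
Console.WriteLine(defaults.Mass.IntervalMinutes);  // 20160
Console.WriteLine(defaults.Mass.PrePurge);         // False (built-in PrePurge = true is not kept)
// ...while a block that is absent keeps its property initializer.
Console.WriteLine(defaults.Daily.IntervalMinutes); // 1440

Console.WriteLine(misData.Mass!.Enabled);   // True ("{}" still gets Enabled = true)
Console.WriteLine(misData.Hourly!.Enabled); // False
Console.WriteLine(misData.Daily is null);   // True (schedule not listed)

// Trimmed copies of the Task 1 models.
public record ScheduleConfig
{
    public bool Enabled { get; init; } = true;
    public int IntervalMinutes { get; init; }
    public bool PrePurge { get; init; }
    public bool ReIndex { get; init; }
}

public record ScheduleDefaults
{
    public ScheduleConfig Mass { get; init; } = new() { IntervalMinutes = 10080, PrePurge = true, ReIndex = true };
    public ScheduleConfig Daily { get; init; } = new() { IntervalMinutes = 1440 };
    public ScheduleConfig Hourly { get; init; } = new() { IntervalMinutes = 60 };
}

public record PipelineSchedules
{
    public ScheduleConfig? Mass { get; init; }
    public ScheduleConfig? Daily { get; init; }
    public ScheduleConfig? Hourly { get; init; }
}

// Hypothetical stand-ins for PipelineConfig and PipelinesRoot.
public record PipelineSketch(PipelineSchedules? Schedules);
public record RootSketch(ScheduleDefaults? ScheduleDefaults, Dictionary<string, PipelineSketch> Pipelines);
```

Under these assumptions, a partial entry in scheduleDefaults does not inherit the built-in prePurge and reIndex values, which is one reason the example structure above spells out every field of each default.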
Step 3: Run all tests
Run: dotnet test tests/JdeScoping.DataSync.Tests --verbosity normal
Expected: All PASS
Step 4: Commit
git add src/JdeScoping.DataSync/Pipelines/pipelines.json
git commit -m "refactor(datasync): migrate existing pipelines to new schedule schema"
Task 10: Add 8 Missing Pipeline Definitions
Files:
- Modify: src/JdeScoping.DataSync/Pipelines/pipelines.json
Step 1: Add the pipeline definitions
Add these pipelines using SQL from DATA_SYNC/JDE/*.sql:
- WorkOrderTime_Curr (F31122_VIEW)
- WorkOrderComponent_Curr (F3111_VIEW)
- WorkOrderStep_Curr (F3112_VIEW)
- WorkOrderRouting (F3112Z1_VIEW)
- StatusCode (F0005_VIEW via GIW connection)
- OrgHierarchy (F30006_VIEW)
- RouteMaster (F3003_VIEW)
- FunctionCode (PRODDTA.F00192 - always full reload)
Each pipeline follows this pattern:
"WorkOrderTime_Curr": {
"source": {
"connection": "jde",
"massQuery": "SELECT wot.UNIQUEKEYIDINTERNAL_WTUKID AS UniqueID, TRIM(wot.COSTCENTERALT_WTMMCU) AS BranchCode, wot.DOCUMENTORDERINVOICEE_WTDOCO AS WorkOrderNumber, wot.SEQUENCENOOPERATIONS_WTOPSQ AS StepNumber, wot.ADDRESSNUMBER_WTAN8 AS AddressNumber, wot.DTFORGLANDVOUCH1_WTDGL AS GlDate, wot.DATEUPDATED_WTUPMJ AS DateUpdated, wot.TIMEOFDAY_WTTDAY AS TimeUpdated FROM JDESTAGE.F31122_VIEW wot",
"query": "SELECT wot.UNIQUEKEYIDINTERNAL_WTUKID AS UniqueID, TRIM(wot.COSTCENTERALT_WTMMCU) AS BranchCode, wot.DOCUMENTORDERINVOICEE_WTDOCO AS WorkOrderNumber, wot.SEQUENCENOOPERATIONS_WTOPSQ AS StepNumber, wot.ADDRESSNUMBER_WTAN8 AS AddressNumber, wot.DTFORGLANDVOUCH1_WTDGL AS GlDate, wot.DATEUPDATED_WTUPMJ AS DateUpdated, wot.TIMEOFDAY_WTTDAY AS TimeUpdated FROM JDESTAGE.F31122_VIEW wot WHERE (wot.DATEUPDATED_WTUPMJ > :dateUpdated OR (wot.DATEUPDATED_WTUPMJ = :dateUpdated AND wot.TIMEOFDAY_WTTDAY >= :timeUpdated))",
"parameters": {
"dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" },
"timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" }
}
},
"schedules": {
"mass": {},
"daily": {},
"hourly": {}
},
"destination": {
"table": "WorkOrderTime_Curr",
"matchColumns": ["UniqueID"],
"excludeFromUpdate": ["UniqueID", "LastUpdateDt"]
}
}
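The "jdeJulian" and "jdeTime" formats determine how the offset date is encoded before it is bound to :dateUpdated and :timeUpdated. The sketch below assumes the conventional JD Edwards encodings (CYYDDD for dates, HHMMSS for times); the plan does not show the project's actual converter, so the JdeFormats class and its method names are hypothetical.

```csharp
using System;

var minDt = new DateTime(2024, 3, 1, 14, 30, 5);

var dateUpdated = JdeFormats.ToJdeJulian(minDt);
var timeUpdated = JdeFormats.ToJdeTime(minDt);

Console.WriteLine(dateUpdated); // 124061
Console.WriteLine(timeUpdated); // 143005

// Hypothetical helper illustrating the assumed encodings.
public static class JdeFormats
{
    // CYYDDD: (year - 1900) * 1000 + day of year, e.g. 2024-03-01 -> 124061.
    public static int ToJdeJulian(DateTime value) =>
        (value.Year - 1900) * 1000 + value.DayOfYear;

    // HHMMSS as an integer, e.g. 14:30:05 -> 143005.
    public static int ToJdeTime(DateTime value) =>
        value.Hour * 10000 + value.Minute * 100 + value.Second;
}
```

With these values, the WHERE clause above would select rows whose DATEUPDATED_WTUPMJ is greater than 124061, or equal to 124061 with TIMEOFDAY_WTTDAY at or after 143005.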
Step 2: Add the special cases
Special cases:
StatusCode (uses GIW connection):
"StatusCode": {
"source": {
"connection": "giw",
...
}
}
FunctionCode (always full reload):
"FunctionCode": {
"schedules": {
"mass": { "prePurge": true, "reIndex": true },
"daily": { "prePurge": true, "reIndex": true },
"hourly": { "prePurge": true, "reIndex": true }
}
}
MisData (hourly disabled, custom mass interval):
"MisData": {
"schedules": {
"mass": { "intervalMinutes": 100800 },
"daily": {},
"hourly": { "enabled": false }
}
}
Step 3: Run tests
Run: dotnet test tests/JdeScoping.DataSync.Tests --verbosity normal
Expected: PASS
Step 4: Commit
git add src/JdeScoping.DataSync/Pipelines/pipelines.json
git commit -m "feat(datasync): add 8 missing pipeline definitions"
Task 11: Add GIW Connection String to appsettings
Files:
- Modify: src/JdeScoping.Host/appsettings.json
- Modify: src/JdeScoping.Host/appsettings.Development.json
Step 1: Update appsettings.json
Add placeholder for GIW connection:
{
"ConnectionStrings": {
"LotFinderDB": "...",
"JDE": "...",
"CMS": "...",
"GIW": ""
}
}
Step 2: Commit
git add src/JdeScoping.Host/appsettings.json src/JdeScoping.Host/appsettings.Development.json
git commit -m "config: add GIW connection string placeholder"
Phase 5: Validation & Testing
Task 12: Update EtlPipelineFactory Validation
Files:
- Modify: src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs
Update validation to support both old SyncModes and new Schedules format:
private static void ValidateConfig(PipelinesRoot root)
{
foreach (var (name, config) in root.Pipelines)
{
// Accept either old SyncModes or new Schedules format
var hasOldConfig = config.SyncModes != null &&
config.SyncModes.ContainsKey("mass") &&
config.SyncModes.ContainsKey("incremental");
var hasNewConfig = config.Schedules != null;
if (!hasOldConfig && !hasNewConfig)
{
throw new InvalidOperationException(
$"Pipeline '{name}' must define either 'syncModes' (mass+incremental) or 'schedules'.");
}
// Validate no runtime parameters
if (config.Source.Parameters != null)
{
foreach (var (paramName, paramConfig) in config.Source.Parameters)
{
if (paramConfig.Source.Equals("runtime", StringComparison.OrdinalIgnoreCase))
{
throw new NotSupportedException(
$"Pipeline '{name}' parameter '{paramName}': runtime parameter source is not yet supported.");
}
}
}
}
}
Commit:
git add src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs
git commit -m "refactor(datasync): update validation to support both config formats"
Task 13: Update Existing Unit Tests
Files:
- Modify: tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs
Update CreateValidConfig() helper to use new schema format. Ensure all existing tests still pass.
Step 1: Run all tests
Run: dotnet test tests/JdeScoping.DataSync.Tests --verbosity normal
Expected: All PASS
Step 2: Commit
git add tests/JdeScoping.DataSync.Tests/
git commit -m "test(datasync): update tests for new schedule config format"
Task 14: Run Full Test Suite
Step 1: Build entire solution
Run: dotnet build
Expected: Build succeeded with 0 errors
Step 2: Run all tests
Run: dotnet test --verbosity normal
Expected: All tests pass
Step 3: Commit any final fixes
git add .
git commit -m "fix: address test failures from schedule alignment"
Task 15: Final Cleanup - Mark Deprecated SyncMode as Obsolete
Note: This task should be done after confirming all tests pass and the new system works correctly. It's optional and can be deferred.
Files:
- Modify: src/JdeScoping.DataSync/Contracts/SyncMode.cs - Mark as obsolete
- Modify: src/JdeScoping.DataSync/Configuration/PipelineConfig.cs - Mark SyncModes as obsolete
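// PipelineConfig is a positional record, so annotate the SyncModes parameter with a property target:
[property: Obsolete("Use Schedules property instead")]
Dictionary<string, SyncModeConfig>? SyncModes, // Deprecated - kept for backward compatibility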
Commit:
git add .
git commit -m "chore(datasync): mark SyncMode as obsolete"
Summary
Total Tasks: 15
Estimated Files Modified: ~18
New Files Created: ~3
Key Changes:
- ScheduleConfig/ScheduleDefaults models added
- PipelinesRoot supports scheduleDefaults
- PipelineConfig supports schedules (mass/daily/hourly)
- GIW connection added for StatusCode
- DbQuerySource supports "giw" connection type
- IEtlPipelineBuilder.WithUpdateType() added
- TableSyncOperation uses UpdateTypes directly
- 8 missing pipelines added
- Backward compatible with existing syncModes format
Verification:
- All existing tests continue to pass
- New tests cover schedule config behavior
- pipelines.json validates with new schema