diff --git a/PLANS/2025-01-07-pipeline-schedule-alignment-implementation.md b/PLANS/2025-01-07-pipeline-schedule-alignment-implementation.md new file mode 100644 index 0000000..413a723 --- /dev/null +++ b/PLANS/2025-01-07-pipeline-schedule-alignment-implementation.md @@ -0,0 +1,1284 @@ +# Pipeline Schedule Alignment Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Align pipelines.json with legacy DataSyncReport.md by supporting three schedules (Mass/Daily/Hourly), adding 8 missing pipelines, and adding GIW connection for StatusCode. + +**Architecture:** Replace `SyncMode` enum with `UpdateTypes` throughout the ETL system. Add `scheduleDefaults` to pipelines.json for global defaults. Each pipeline defines `schedules` (mass/daily/hourly) that inherit from or override defaults. Add GIW connection type for StatusCode pipeline. + +**Tech Stack:** .NET 10, System.Text.Json, Oracle.ManagedDataAccess, Microsoft.Data.SqlClient, xUnit, NSubstitute, Shouldly + +--- + +## Phase 1: Schema & Models + +### Task 1: Add ScheduleConfig and ScheduleDefaults Models + +**Files:** +- Create: `src/JdeScoping.DataSync/Configuration/ScheduleConfig.cs` + +**Step 1: Write the failing test** + +Create test file `tests/JdeScoping.DataSync.Tests/Configuration/ScheduleConfigTests.cs`: + +```csharp +using JdeScoping.DataSync.Configuration; +using Shouldly; + +namespace JdeScoping.DataSync.Tests.Configuration; + +public class ScheduleConfigTests +{ + [Fact] + public void ScheduleConfig_DefaultValues_AreCorrect() + { + var config = new ScheduleConfig(); + + config.Enabled.ShouldBeTrue(); + config.IntervalMinutes.ShouldBe(0); + config.PrePurge.ShouldBeFalse(); + config.ReIndex.ShouldBeFalse(); + config.UpdateWhen.ShouldBeNull(); + } + + [Fact] + public void ScheduleConfig_WithValues_StoresCorrectly() + { + var config = new ScheduleConfig + { + Enabled = false, + IntervalMinutes = 60, + PrePurge = true, + 
ReIndex = true, + UpdateWhen = "src.LastUpdateDt > tgt.LastUpdateDt" + }; + + config.Enabled.ShouldBeFalse(); + config.IntervalMinutes.ShouldBe(60); + config.PrePurge.ShouldBeTrue(); + config.ReIndex.ShouldBeTrue(); + config.UpdateWhen.ShouldBe("src.LastUpdateDt > tgt.LastUpdateDt"); + } + + [Fact] + public void ScheduleDefaults_HasCorrectDefaultValues() + { + var defaults = new ScheduleDefaults(); + + defaults.Mass.ShouldNotBeNull(); + defaults.Daily.ShouldNotBeNull(); + defaults.Hourly.ShouldNotBeNull(); + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~ScheduleConfigTests" --verbosity normal` +Expected: FAIL with "type or namespace 'ScheduleConfig' could not be found" + +**Step 3: Write minimal implementation** + +Create `src/JdeScoping.DataSync/Configuration/ScheduleConfig.cs`: + +```csharp +namespace JdeScoping.DataSync.Configuration; + +/// <summary> +/// Configuration for a single schedule type (Mass/Daily/Hourly). +/// </summary> +public record ScheduleConfig +{ + /// <summary> + /// Whether this schedule is enabled. + /// </summary> + public bool Enabled { get; init; } = true; + + /// <summary> + /// Interval in minutes between syncs. + /// </summary> + public int IntervalMinutes { get; init; } + + /// <summary> + /// Whether to truncate the table before import (full reload). + /// </summary> + public bool PrePurge { get; init; } + + /// <summary> + /// Whether to rebuild indexes after import. + /// </summary> + public bool ReIndex { get; init; } + + /// <summary> + /// Condition for updating existing rows (e.g., "src.LastUpdateDt > tgt.LastUpdateDt"). + /// </summary> + public string? UpdateWhen { get; init; } + + /// <summary> + /// Merges this config with defaults. Enabled is always taken from this config; IntervalMinutes falls back to the default when it is not greater than 0; UpdateWhen falls back to the default when null. + /// PrePurge and ReIndex are OR-ed with the defaults, so an override can switch them on but cannot switch off a default of true. + /// </summary> + public ScheduleConfig MergeWith(ScheduleConfig defaults) + { + return new ScheduleConfig + { + Enabled = Enabled, + IntervalMinutes = IntervalMinutes > 0 ? 
IntervalMinutes : defaults.IntervalMinutes, + PrePurge = PrePurge || defaults.PrePurge, + ReIndex = ReIndex || defaults.ReIndex, + UpdateWhen = UpdateWhen ?? defaults.UpdateWhen + }; + } +} + +/// +/// Default schedule configurations for all pipelines. +/// +public record ScheduleDefaults +{ + /// + /// Default Mass schedule config (weekly, full reload). + /// + public ScheduleConfig Mass { get; init; } = new() + { + Enabled = true, + IntervalMinutes = 10080, // Weekly + PrePurge = true, + ReIndex = true + }; + + /// + /// Default Daily schedule config (incremental merge). + /// + public ScheduleConfig Daily { get; init; } = new() + { + Enabled = true, + IntervalMinutes = 1440, // Daily + PrePurge = false, + ReIndex = false, + UpdateWhen = "src.LastUpdateDt > tgt.LastUpdateDt" + }; + + /// + /// Default Hourly schedule config (incremental merge). + /// + public ScheduleConfig Hourly { get; init; } = new() + { + Enabled = true, + IntervalMinutes = 60, // Hourly + PrePurge = false, + ReIndex = false, + UpdateWhen = "src.LastUpdateDt > tgt.LastUpdateDt" + }; +} + +/// +/// Per-pipeline schedule overrides. +/// +public record PipelineSchedules +{ + public ScheduleConfig? Mass { get; init; } + public ScheduleConfig? Daily { get; init; } + public ScheduleConfig? 
Hourly { get; init; } +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~ScheduleConfigTests" --verbosity normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Configuration/ScheduleConfig.cs tests/JdeScoping.DataSync.Tests/Configuration/ScheduleConfigTests.cs +git commit -m "feat(datasync): add ScheduleConfig and ScheduleDefaults models" +``` + +--- + +### Task 2: Update PipelinesRoot to Include ScheduleDefaults + +**Files:** +- Modify: `src/JdeScoping.DataSync/Configuration/PipelinesRoot.cs` +- Test: `tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs` + +**Step 1: Write the failing test** + +Create `tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs`: + +```csharp +using JdeScoping.DataSync.Configuration; +using Shouldly; + +namespace JdeScoping.DataSync.Tests.Configuration; + +public class PipelinesRootTests +{ + [Fact] + public void EffectiveScheduleDefaults_WhenNull_ReturnsDefaults() + { + var root = new PipelinesRoot(null, null, new Dictionary()); + + var defaults = root.EffectiveScheduleDefaults; + + defaults.ShouldNotBeNull(); + defaults.Mass.IntervalMinutes.ShouldBe(10080); + defaults.Daily.IntervalMinutes.ShouldBe(1440); + defaults.Hourly.IntervalMinutes.ShouldBe(60); + } + + [Fact] + public void EffectiveScheduleDefaults_WhenProvided_ReturnsProvided() + { + var customDefaults = new ScheduleDefaults + { + Mass = new ScheduleConfig { IntervalMinutes = 20000 } + }; + var root = new PipelinesRoot(null, customDefaults, new Dictionary()); + + var defaults = root.EffectiveScheduleDefaults; + + defaults.Mass.IntervalMinutes.ShouldBe(20000); + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~PipelinesRootTests" --verbosity normal` +Expected: FAIL + +**Step 3: Write minimal implementation** + +Update 
`src/JdeScoping.DataSync/Configuration/PipelinesRoot.cs`: + +```csharp +namespace JdeScoping.DataSync.Configuration; + +public record PipelinesRoot( + PipelineSettings? Settings, + ScheduleDefaults? ScheduleDefaults, + Dictionary<string, PipelineConfig> Pipelines) +{ + public PipelineSettings EffectiveSettings => Settings ?? new PipelineSettings(); + public ScheduleDefaults EffectiveScheduleDefaults => ScheduleDefaults ?? new ScheduleDefaults(); +} + +public record PipelineSettings( + string Timezone = "UTC"); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~PipelinesRootTests" --verbosity normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Configuration/PipelinesRoot.cs tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs +git commit -m "feat(datasync): add ScheduleDefaults to PipelinesRoot" +``` + +--- + +### Task 3: Update PipelineConfig to Include Schedules + +**Files:** +- Modify: `src/JdeScoping.DataSync/Configuration/PipelineConfig.cs` + +**Step 1: Write the failing test** + +Add to `tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs`: + +```csharp +[Fact] +public void PipelineConfig_WithSchedules_ParsesCorrectly() +{ + var config = new PipelineConfig( + new SourceConfig("jde", "SELECT 1", null, null), + null, // Old SyncModes - deprecated + new PipelineSchedules + { + Mass = new ScheduleConfig { PrePurge = true, ReIndex = true }, + Daily = new ScheduleConfig { Enabled = true }, + Hourly = new ScheduleConfig { Enabled = false } + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null); + + config.Schedules.ShouldNotBeNull(); + config.Schedules!.Mass!.PrePurge.ShouldBeTrue(); + config.Schedules!.Hourly!.Enabled.ShouldBeFalse(); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~PipelineConfig_WithSchedules" --verbosity normal` 
+Expected: FAIL + +**Step 3: Write minimal implementation** + +Update `src/JdeScoping.DataSync/Configuration/PipelineConfig.cs`: + +```csharp +namespace JdeScoping.DataSync.Configuration; + +public record PipelineConfig( + SourceConfig Source, + Dictionary? SyncModes, // Deprecated - kept for backward compatibility + PipelineSchedules? Schedules, // New schedule-based config + List? Transformers, + DestinationConfig Destination, + List? PreScripts, + List? PostScripts); + +public record SourceConfig( + string Connection, + string Query, + Dictionary? Parameters, + string? MassQuery = null); + +public record ParameterConfig( + string Name, + string? Format, + string Source = "offset", + string? Value = null); + +public record SyncModeConfig( + string? MinDtOffset, + bool PrePurge = false, + bool ReIndex = false, + string? UpdateWhen = null, + DestinationOverride? Destination = null); + +public record DestinationOverride( + string? Type, + List? MatchColumns, + List? ExcludeFromUpdate); + +public record TransformerConfig( + string Type, + List? Columns, + Dictionary? Mappings); + +public record DestinationConfig( + string Table, + List? MatchColumns, + List? 
ExcludeFromUpdate); +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~PipelineConfig_WithSchedules" --verbosity normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Configuration/PipelineConfig.cs tests/JdeScoping.DataSync.Tests/Configuration/PipelinesRootTests.cs +git commit -m "feat(datasync): add Schedules property to PipelineConfig" +``` + +--- + +## Phase 2: Infrastructure Changes (GIW Connection) + +### Task 4: Add GIW Connection to IDbConnectionFactory + +**Files:** +- Modify: `src/JdeScoping.DataAccess/Interfaces/IDbConnectionFactory.cs` +- Modify: `src/JdeScoping.DataAccess/DbConnectionFactory.cs` +- Test: `tests/JdeScoping.DataAccess.Tests/DbConnectionFactoryTests.cs` + +**Step 1: Write the failing test** + +Add to existing test file or create `tests/JdeScoping.DataAccess.Tests/DbConnectionFactoryGiwTests.cs`: + +```csharp +using JdeScoping.DataAccess; +using JdeScoping.DataAccess.Exceptions; +using JdeScoping.DataAccess.Interfaces; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; + +namespace JdeScoping.DataAccess.Tests; + +public class DbConnectionFactoryGiwTests +{ + [Fact] + public async Task CreateGiwConnectionAsync_WhenConnectionStringMissing_ThrowsConnectionException() + { + // Arrange + var config = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary()) + .Build(); + var factory = new DbConnectionFactory(config, NullLogger.Instance); + + // Act & Assert + var ex = await Should.ThrowAsync( + () => factory.CreateGiwConnectionAsync()); + ex.DataSource.ShouldBe("GIW"); + } +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataAccess.Tests --filter "FullyQualifiedName~CreateGiwConnectionAsync" --verbosity normal` +Expected: FAIL with "IDbConnectionFactory does not contain a definition for 'CreateGiwConnectionAsync'" + 
+**Step 3: Write minimal implementation** + +Update `src/JdeScoping.DataAccess/Interfaces/IDbConnectionFactory.cs`: + +```csharp +using Microsoft.Data.SqlClient; +using Oracle.ManagedDataAccess.Client; + +namespace JdeScoping.DataAccess.Interfaces; + +/// +/// Factory for creating database connections to all data sources. +/// +public interface IDbConnectionFactory +{ + /// + /// Creates and opens a connection to the LotFinderDB SQL Server cache database. + /// + Task CreateLotFinderConnectionAsync(CancellationToken ct = default); + + /// + /// Creates and opens a connection to the JDE Oracle database (production schema). + /// + Task CreateJdeConnectionAsync(CancellationToken ct = default); + + /// + /// Creates and opens a connection to the JDE Stage Oracle database. + /// + Task CreateJdeStageConnectionAsync(CancellationToken ct = default); + + /// + /// Creates and opens a connection to the CMS Oracle database. + /// + Task CreateCmsConnectionAsync(CancellationToken ct = default); + + /// + /// Creates and opens a connection to the GIW Oracle database (for StatusCode sync). 
+ /// + Task CreateGiwConnectionAsync(CancellationToken ct = default); +} +``` + +Update `src/JdeScoping.DataAccess/DbConnectionFactory.cs` - add method: + +```csharp +/// +public async Task CreateGiwConnectionAsync(CancellationToken ct = default) +{ + return await CreateOracleConnectionAsync("GIW", ct).ConfigureAwait(false); +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataAccess.Tests --filter "FullyQualifiedName~CreateGiwConnectionAsync" --verbosity normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataAccess/Interfaces/IDbConnectionFactory.cs src/JdeScoping.DataAccess/DbConnectionFactory.cs tests/JdeScoping.DataAccess.Tests/DbConnectionFactoryGiwTests.cs +git commit -m "feat(dataaccess): add GIW connection factory method" +``` + +--- + +### Task 5: Update DbQuerySource to Support GIW Connection + +**Files:** +- Modify: `src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs` +- Test: `tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs` + +**Step 1: Write the failing test** + +Add test to existing `DbQuerySourceTests.cs`: + +```csharp +[Fact] +public void Constructor_WithGiwConnectionType_DoesNotThrow() +{ + // Arrange + var connectionFactory = Substitute.For(); + + // Act & Assert - should not throw + var source = new DbQuerySource(connectionFactory, "giw", "SELECT 1", null); + source.SourceName.ShouldBe("DbQuery:giw"); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~Constructor_WithGiwConnectionType" --verbosity normal` +Expected: FAIL with "Unknown connection type: giw" + +**Step 3: Write minimal implementation** + +Update `src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs`: + +```csharp +private static readonly HashSet ValidConnectionTypes = new(StringComparer.OrdinalIgnoreCase) +{ + "jde", "cms", "lotfinder", "giw" +}; +``` + +And update `CreateConnectionAsync`: + +```csharp +private 
async Task CreateConnectionAsync(CancellationToken cancellationToken) +{ + return _connectionType switch + { + "jde" => await _connectionFactory.CreateJdeConnectionAsync(cancellationToken), + "cms" => await _connectionFactory.CreateCmsConnectionAsync(cancellationToken), + "giw" => await _connectionFactory.CreateGiwConnectionAsync(cancellationToken), + "lotfinder" => await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken), + _ => throw new InvalidOperationException($"Unknown connection type: {_connectionType}") + }; +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~Constructor_WithGiwConnectionType" --verbosity normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs +git commit -m "feat(datasync): add GIW connection type to DbQuerySource" +``` + +--- + +## Phase 3: Core ETL Changes + +### Task 6: Update IEtlPipelineBuilder to Accept UpdateTypes + +**Files:** +- Modify: `src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs` +- Modify: `src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs` + +**Step 1: Write the failing test** + +Add to `tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs`: + +```csharp +[Fact] +public void Builder_WithUpdateTypesMass_BuildsPipeline() +{ + // Arrange + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Mass) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); +} + +[Fact] +public void Builder_WithUpdateTypesDaily_BuildsPipeline() +{ + // Arrange + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Daily) + .Build(); + + // Assert + 
pipeline.ShouldNotBeNull(); +} + +[Fact] +public void Builder_WithUpdateTypesHourly_BuildsPipeline() +{ + // Arrange + var config = CreateValidConfigWithSchedules(); + var factory = CreateFactory(config); + + // Act + var pipeline = factory.ForTable("TestTable") + .WithUpdateType(UpdateTypes.Hourly) + .Build(); + + // Assert + pipeline.ShouldNotBeNull(); +} + +private PipelinesRoot CreateValidConfigWithSchedules() +{ + return new PipelinesRoot( + new PipelineSettings("UTC"), + new ScheduleDefaults(), + new Dictionary + { + ["TestTable"] = new PipelineConfig( + new SourceConfig("lotfinder", "SELECT * FROM Test WHERE UpdateDt >= @MinDt", + new Dictionary + { + ["minDt"] = new ParameterConfig("@MinDt", null, "offset", null) + }, + "SELECT * FROM Test"), + null, // No old SyncModes + new PipelineSchedules + { + Mass = new ScheduleConfig { PrePurge = true, ReIndex = true }, + Daily = new ScheduleConfig(), + Hourly = new ScheduleConfig() + }, + null, + new DestinationConfig("TestTable", ["Id"], null), + null, + null) + }); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~Builder_WithUpdateTypes" --verbosity normal` +Expected: FAIL with "'IEtlPipelineBuilder' does not contain a definition for 'WithUpdateType'" + +**Step 3: Write minimal implementation** + +Update `src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs`: + +```csharp +using JdeScoping.Core.Models.Enums; +using JdeScoping.DataSync.Etl.Pipeline; + +namespace JdeScoping.DataSync.Contracts; + +public interface IEtlPipelineFactory +{ + IEtlPipelineBuilder ForTable(string tableName); +} + +public interface IEtlPipelineBuilder +{ + [Obsolete("Use WithUpdateType instead")] + IEtlPipelineBuilder WithMode(SyncMode mode); + + IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType); + IEtlPipelineBuilder WithMinimumDate(DateTime? 
minDt); + EtlPipeline Build(); +} +``` + +Update `src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs` - PipelineBuilder class: + +```csharp +private sealed class PipelineBuilder : IEtlPipelineBuilder +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly string _tableName; + private readonly PipelineConfig _config; + private readonly PipelineSettings _settings; + private readonly ScheduleDefaults _scheduleDefaults; + private readonly ILogger _logger; + private UpdateTypes _updateType = UpdateTypes.Hourly; + private DateTime? _minDtOverride; + + public PipelineBuilder( + IDbConnectionFactory connectionFactory, + string tableName, + PipelineConfig config, + PipelineSettings settings, + ScheduleDefaults scheduleDefaults, + ILogger logger) + { + _connectionFactory = connectionFactory; + _tableName = tableName; + _config = config; + _settings = settings; + _scheduleDefaults = scheduleDefaults; + _logger = logger; + } + + [Obsolete("Use WithUpdateType instead")] + public IEtlPipelineBuilder WithMode(SyncMode mode) + { + _updateType = mode == SyncMode.Mass ? UpdateTypes.Mass : UpdateTypes.Hourly; + return this; + } + + public IEtlPipelineBuilder WithUpdateType(UpdateTypes updateType) + { + _updateType = updateType; + return this; + } + + public IEtlPipelineBuilder WithMinimumDate(DateTime? minDt) + { + _minDtOverride = minDt; + return this; + } + + public EtlPipeline Build() + { + var scheduleConfig = GetEffectiveScheduleConfig(_updateType); + + // Compute MinDt from schedule config + var minDt = _minDtOverride; + + // Use massQuery for Mass, regular query for Daily/Hourly + var useMassQuery = _updateType == UpdateTypes.Mass && !string.IsNullOrEmpty(_config.Source.MassQuery); + + // Create source with parameter substitution + var source = CreateSource(_config.Source, minDt, useMassQuery); + + // Determine destination type (Mass = bulkImport, Daily/Hourly = bulkMerge unless prePurge) + var destType = scheduleConfig.PrePurge ? 
"bulkImport" : "bulkMerge"; + var destination = CreateDestination(destType, _config.Destination, scheduleConfig); + + // Build pipeline with scripts + var builder = new EtlPipelineBuilder() + .WithName(_tableName) + .WithSource(source) + .WithDestination(destination) + .WithLogger(_logger); + + // Add pre-scripts: config scripts first, then prePurge + foreach (var script in _config.PreScripts ?? []) + { + builder.WithPreScript(new SqlScriptRunner(_connectionFactory, script, $"PreScript:{script.Substring(0, Math.Min(30, script.Length))}")); + } + + if (scheduleConfig.PrePurge) + { + var truncateSql = $"TRUNCATE TABLE [{_config.Destination.Table}]"; + builder.WithPreScript(new SqlScriptRunner(_connectionFactory, truncateSql, "PrePurge")); + } + + // Add post-scripts: reIndex first, then config scripts + if (scheduleConfig.ReIndex) + { + var reindexSql = $"ALTER INDEX ALL ON [{_config.Destination.Table}] REBUILD"; + builder.WithPostScript(new SqlScriptRunner(_connectionFactory, reindexSql, "ReIndex")); + } + + foreach (var script in _config.PostScripts ?? []) + { + builder.WithPostScript(new SqlScriptRunner(_connectionFactory, script, $"PostScript:{script.Substring(0, Math.Min(30, script.Length))}")); + } + + return builder.Build(); + } + + private ScheduleConfig GetEffectiveScheduleConfig(UpdateTypes updateType) + { + // Get default for this update type + var defaultConfig = updateType switch + { + UpdateTypes.Mass => _scheduleDefaults.Mass, + UpdateTypes.Daily => _scheduleDefaults.Daily, + UpdateTypes.Hourly => _scheduleDefaults.Hourly, + _ => _scheduleDefaults.Hourly + }; + + // Get pipeline-specific override if exists + var pipelineConfig = updateType switch + { + UpdateTypes.Mass => _config.Schedules?.Mass, + UpdateTypes.Daily => _config.Schedules?.Daily, + UpdateTypes.Hourly => _config.Schedules?.Hourly, + _ => null + }; + + // Merge: pipeline config overrides defaults + return pipelineConfig?.MergeWith(defaultConfig) ?? defaultConfig; + } + + // ... 
rest of methods updated similarly +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~Builder_WithUpdateTypes" --verbosity normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Contracts/IEtlPipelineFactory.cs src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs +git commit -m "feat(datasync): add WithUpdateType to IEtlPipelineBuilder" +``` + +--- + +### Task 7: Update TableSyncOperation to Use UpdateTypes + +**Files:** +- Modify: `src/JdeScoping.DataSync/Services/TableSyncOperation.cs` + +**Step 1: Write the failing test** + +The existing tests should still pass with the refactored code. Add a new test: + +```csharp +[Fact] +public async Task ExecuteAsync_WithDailyUpdateType_UsesDailyConfig() +{ + // Arrange + var task = new DataUpdateTask + { + TableName = "TestTable", + SourceSystem = "JDE", + SourceData = "TEST", + UpdateType = UpdateTypes.Daily, + MinimumDt = DateTime.UtcNow.AddDays(-1) + }; + + // ... 
setup mocks + + // Act + await _operation.ExecuteAsync(task); + + // Assert + _mockFactory.Received(1).ForTable("TestTable"); + // Verify WithUpdateType(UpdateTypes.Daily) was called +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~ExecuteAsync_WithDailyUpdateType" --verbosity normal` +Expected: FAIL + +**Step 3: Write minimal implementation** + +Update `src/JdeScoping.DataSync/Services/TableSyncOperation.cs`: + +```csharp +private async Task ExecuteSyncCoreAsync(DataUpdateTask task, CancellationToken cancellationToken) +{ + _logger.LogDebug("Building pipeline for {Table} with {UpdateType}", task.TableName, task.UpdateType); + + // Build and execute the pipeline using UpdateTypes directly + var pipeline = _pipelineFactory + .ForTable(task.TableName) + .WithUpdateType(task.UpdateType) + .WithMinimumDate(task.MinimumDt) + .Build(); + + var result = await pipeline.ExecuteAsync(cancellationToken); + + if (!result.Success) + { + throw new InvalidOperationException( + $"Pipeline failed for {task.TableName}: {result.Error?.Message ?? "Unknown error"}", + result.Error); + } + + return result.TotalRows; +} +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~TableSyncOperation" --verbosity normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Services/TableSyncOperation.cs +git commit -m "refactor(datasync): use UpdateTypes in TableSyncOperation" +``` + +--- + +### Task 8: Update DataUpdateRepository for Per-Pipeline Intervals + +**Files:** +- Modify: `src/JdeScoping.DataSync/Services/DataUpdateRepository.cs` +- Modify: `src/JdeScoping.DataSync/Contracts/IDataUpdateRepository.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task GetSyncStatusAsync_WithCustomInterval_UsesProvidedInterval() +{ + // Arrange - setup mock to return data + // ... 
+ + // Act + var customIntervals = new Dictionary + { + ["MisData_0"] = 100800 // Mass interval for MisData + }; + var status = await _repository.GetSyncStatusAsync(customIntervals); + + // Assert + var misDataStatus = status.FirstOrDefault(s => s.TableName == "MisData"); + misDataStatus.ShouldNotBeNull(); + misDataStatus.ExpectedIntervalMinutes.ShouldBe(100800); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~GetSyncStatusAsync_WithCustomInterval" --verbosity normal` +Expected: FAIL + +**Step 3: Write minimal implementation** + +Update `IDataUpdateRepository` to accept optional interval overrides, then update `DataUpdateRepository.GetSyncStatusAsync` to use them. + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "FullyQualifiedName~GetSyncStatusAsync_WithCustomInterval" --verbosity normal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Contracts/IDataUpdateRepository.cs src/JdeScoping.DataSync/Services/DataUpdateRepository.cs +git commit -m "feat(datasync): support per-pipeline intervals in DataUpdateRepository" +``` + +--- + +## Phase 4: Pipeline Configurations + +### Task 9: Migrate Existing Pipelines to New Schema + +**Files:** +- Modify: `src/JdeScoping.DataSync/Pipelines/pipelines.json` + +**Step 1: Backup current config** + +```bash +cp src/JdeScoping.DataSync/Pipelines/pipelines.json src/JdeScoping.DataSync/Pipelines/pipelines.json.bak +``` + +**Step 2: Update pipelines.json** + +Add `scheduleDefaults` and convert each pipeline from `syncModes` to `schedules`. Keep `syncModes` for backward compatibility during transition. 
+ +Example structure: + +```json +{ + "settings": { + "timezone": "UTC" + }, + "scheduleDefaults": { + "mass": { "enabled": true, "intervalMinutes": 10080, "prePurge": true, "reIndex": true }, + "daily": { "enabled": true, "intervalMinutes": 1440, "prePurge": false, "reIndex": false, "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" }, + "hourly": { "enabled": true, "intervalMinutes": 60, "prePurge": false, "reIndex": false, "updateWhen": "src.LastUpdateDt > tgt.LastUpdateDt" } + }, + "pipelines": { + "WorkOrder_Curr": { + "source": { + "connection": "jde", + "massQuery": "SELECT ... FROM {ProductionSchema}.F4801 wo", + "query": "SELECT ... FROM {ProductionSchema}.F4801 wo WHERE (...date filter...)", + "parameters": { ... } + }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, + "destination": { ... } + } + } +} +``` + +**Step 3: Run all tests** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --verbosity normal` +Expected: All PASS + +**Step 4: Commit** + +```bash +git add src/JdeScoping.DataSync/Pipelines/pipelines.json +git commit -m "refactor(datasync): migrate existing pipelines to new schedule schema" +``` + +--- + +### Task 10: Add 8 Missing Pipeline Definitions + +**Files:** +- Modify: `src/JdeScoping.DataSync/Pipelines/pipelines.json` + +Add these pipelines using SQL from `DATA_SYNC/JDE/*.sql`: + +1. **WorkOrderTime_Curr** (F31122_VIEW) +2. **WorkOrderComponent_Curr** (F3111_VIEW) +3. **WorkOrderStep_Curr** (F3112_VIEW) +4. **WorkOrderRouting** (F3112Z1_VIEW) +5. **StatusCode** (F0005_VIEW via GIW connection) +6. **OrgHierarchy** (F30006_VIEW) +7. **RouteMaster** (F3003_VIEW) +8. 
**FunctionCode** (PRODDTA.F00192 - always full reload) + +Each pipeline follows this pattern: + +```json +"WorkOrderTime_Curr": { + "source": { + "connection": "jde", + "massQuery": "SELECT wot.UNIQUEKEYIDINTERNAL_WTUKID AS UniqueID, TRIM(wot.COSTCENTERALT_WTMMCU) AS BranchCode, wot.DOCUMENTORDERINVOICEE_WTDOCO AS WorkOrderNumber, wot.SEQUENCENOOPERATIONS_WTOPSQ AS StepNumber, wot.ADDRESSNUMBER_WTAN8 AS AddressNumber, wot.DTFORGLANDVOUCH1_WTDGL AS GlDate, wot.DATEUPDATED_WTUPMJ AS DateUpdated, wot.TIMEOFDAY_WTTDAY AS TimeUpdated FROM JDESTAGE.F31122_VIEW wot", + "query": "SELECT wot.UNIQUEKEYIDINTERNAL_WTUKID AS UniqueID, TRIM(wot.COSTCENTERALT_WTMMCU) AS BranchCode, wot.DOCUMENTORDERINVOICEE_WTDOCO AS WorkOrderNumber, wot.SEQUENCENOOPERATIONS_WTOPSQ AS StepNumber, wot.ADDRESSNUMBER_WTAN8 AS AddressNumber, wot.DTFORGLANDVOUCH1_WTDGL AS GlDate, wot.DATEUPDATED_WTUPMJ AS DateUpdated, wot.TIMEOFDAY_WTTDAY AS TimeUpdated FROM JDESTAGE.F31122_VIEW wot WHERE (wot.DATEUPDATED_WTUPMJ > :dateUpdated OR (wot.DATEUPDATED_WTUPMJ = :dateUpdated AND wot.TIMEOFDAY_WTTDAY >= :timeUpdated))", + "parameters": { + "dateUpdated": { "name": ":dateUpdated", "format": "jdeJulian", "source": "offset" }, + "timeUpdated": { "name": ":timeUpdated", "format": "jdeTime", "source": "offset" } + } + }, + "schedules": { + "mass": {}, + "daily": {}, + "hourly": {} + }, + "destination": { + "table": "WorkOrderTime_Curr", + "matchColumns": ["UniqueID"], + "excludeFromUpdate": ["UniqueID", "LastUpdateDt"] + } +} +``` + +Special cases: + +**StatusCode** (uses GIW connection): +```json +"StatusCode": { + "source": { + "connection": "giw", + ... 
+ } +} +``` + +**FunctionCode** (always full reload): +```json +"FunctionCode": { + "schedules": { + "mass": { "prePurge": true, "reIndex": true }, + "daily": { "prePurge": true, "reIndex": true }, + "hourly": { "prePurge": true, "reIndex": true } + } +} +``` + +**MisData** (hourly disabled, custom mass interval): +```json +"MisData": { + "schedules": { + "mass": { "intervalMinutes": 100800 }, + "daily": {}, + "hourly": { "enabled": false } + } +} +``` + +**Step 3: Run tests** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --verbosity normal` +Expected: PASS + +**Step 4: Commit** + +```bash +git add src/JdeScoping.DataSync/Pipelines/pipelines.json +git commit -m "feat(datasync): add 8 missing pipeline definitions" +``` + +--- + +### Task 11: Add GIW Connection String to appsettings + +**Files:** +- Modify: `src/JdeScoping.Host/appsettings.json` +- Modify: `src/JdeScoping.Host/appsettings.Development.json` + +**Step 1: Update appsettings.json** + +Add placeholder for GIW connection: + +```json +{ + "ConnectionStrings": { + "LotFinderDB": "...", + "JDE": "...", + "CMS": "...", + "GIW": "" + } +} +``` + +**Step 2: Commit** + +```bash +git add src/JdeScoping.Host/appsettings.json src/JdeScoping.Host/appsettings.Development.json +git commit -m "config: add GIW connection string placeholder" +``` + +--- + +## Phase 5: Validation & Testing + +### Task 12: Update EtlPipelineFactory Validation + +**Files:** +- Modify: `src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs` + +Update validation to support both old `SyncModes` and new `Schedules` format: + +```csharp +private static void ValidateConfig(PipelinesRoot root) +{ + foreach (var (name, config) in root.Pipelines) + { + // Accept either old SyncModes or new Schedules format + var hasOldConfig = config.SyncModes != null && + config.SyncModes.ContainsKey("mass") && + config.SyncModes.ContainsKey("incremental"); + var hasNewConfig = config.Schedules != null; + + if (!hasOldConfig && !hasNewConfig) + { + throw new 
InvalidOperationException( + $"Pipeline '{name}' must define either 'syncModes' (mass+incremental) or 'schedules'."); + } + + // Validate no runtime parameters + if (config.Source.Parameters != null) + { + foreach (var (paramName, paramConfig) in config.Source.Parameters) + { + if (paramConfig.Source.Equals("runtime", StringComparison.OrdinalIgnoreCase)) + { + throw new NotSupportedException( + $"Pipeline '{name}' parameter '{paramName}': runtime parameter source is not yet supported."); + } + } + } + } +} +``` + +**Commit:** + +```bash +git add src/JdeScoping.DataSync/Services/EtlPipelineFactory.cs +git commit -m "refactor(datasync): update validation to support both config formats" +``` + +--- + +### Task 13: Update Existing Unit Tests + +**Files:** +- Modify: `tests/JdeScoping.DataSync.Tests/Services/EtlPipelineFactoryTests.cs` + +Update the `CreateValidConfig()` helper to use the new schema format. Ensure all existing tests still pass. + +**Step 1: Run all tests** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --verbosity normal` +Expected: All PASS + +**Step 2: Commit** + +```bash +git add tests/JdeScoping.DataSync.Tests/ +git commit -m "test(datasync): update tests for new schedule config format" +``` + +--- + +### Task 14: Run Full Test Suite + +**Step 1: Build entire solution** + +Run: `dotnet build` +Expected: Build succeeded with 0 errors + +**Step 2: Run all tests** + +Run: `dotnet test --verbosity normal` +Expected: All tests pass + +**Step 3: Commit any final fixes** + +```bash +git add . +git commit -m "fix: address test failures from schedule alignment" +``` + +--- + +### Task 15: Final Cleanup - Mark SyncMode as Obsolete + +**Note:** This task should be done after confirming all tests pass and the new system works correctly. It only marks the old API as obsolete (nothing is deleted, so the existing `syncModes` format keeps working). It's optional and can be deferred. 
+ +**Files:** +- Modify: `src/JdeScoping.DataSync/Contracts/SyncMode.cs` - Mark as obsolete +- Modify: `src/JdeScoping.DataSync/Configuration/PipelineConfig.cs` - Mark SyncModes as obsolete + +Add `[Obsolete]` to the `SyncMode` enum and to the `SyncModes` property, leaving each declaration's existing type unchanged (generic type arguments are omitted in the snippet below): + +```csharp +[Obsolete("Use Schedules property instead")] +public Dictionary? SyncModes { get; init; } +``` + +**Commit:** + +```bash +git add . +git commit -m "chore(datasync): mark SyncMode as obsolete" +``` + +--- + +## Summary + +**Total Tasks:** 15 +**Estimated Files Modified:** ~18 +**New Files Created:** ~3 + +**Key Changes:** +1. ScheduleConfig/ScheduleDefaults models added +2. PipelinesRoot supports scheduleDefaults +3. PipelineConfig supports schedules (mass/daily/hourly) +4. GIW connection added for StatusCode +5. DbQuerySource supports "giw" connection type +6. IEtlPipelineBuilder.WithUpdateType() added +7. TableSyncOperation uses UpdateTypes directly +8. 8 missing pipelines added +9. Backward compatible with existing syncModes format + +**Verification:** +- All existing tests continue to pass +- New tests cover schedule config behavior +- pipelines.json validates with new schema