Files
Joseph Doherty 26ff8d9b4f Initial commit: JDE Scoping Tool migration project
Set up repository with legacy .NET Framework 4.8 source (OLD/),
new .NET 10 Blazor solution (NEW/), OpenSpec specifications,
documentation, and project configuration.
2026-01-02 07:43:29 -05:00

18 KiB

Data Sync Service Design

Overview

This document describes the architecture and implementation patterns for the data synchronization background service.

Component Architecture

JdeScoping.DataSync/
├── DataSyncService.cs              # BackgroundService implementation
├── IDataFetcher.cs                 # Generic fetcher interface
├── IPostProcessor.cs               # Post-processing interface
├── DataSyncOptions.cs              # Configuration options
├── DataSourceConfig.cs             # Per-table configuration
├── ScheduleChecker.cs              # Schedule evaluation logic
├── SyncOrchestrator.cs             # Coordinates parallel sync operations
├── TableSyncOperation.cs           # Single table sync execution
├── StagingTableManager.cs          # Temp table creation and MERGE
├── DataSyncHealthCheck.cs          # IHealthCheck implementation
├── DataSyncMetrics.cs              # Metrics and telemetry
├── ServiceCollectionExtensions.cs  # AddDataSync registration
└── Fetchers/                       # IDataFetcher<T> implementations
    ├── JdeWorkOrderFetcher.cs
    ├── JdeLotUsageFetcher.cs
    ├── JdeItemFetcher.cs
    └── ...

BackgroundService Pattern

ExecuteAsync Implementation

/// <summary>
/// Hosted background service that repeatedly runs sync cycles until the host shuts down.
/// </summary>
public class DataSyncService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IOptions<DataSyncOptions> _options;
    private readonly ILogger<DataSyncService> _logger;
    private readonly DataSyncMetrics _metrics;

    /// <summary>
    /// Service main loop: one recovery pass at startup, then a sync cycle followed by a
    /// <c>CheckInterval</c> pause, repeated until <paramref name="stoppingToken"/> is signalled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Anything left open by a previous run was interrupted; close it before scheduling new work.
        await CloseOpenUpdateEntriesAsync(stoppingToken);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await RunSyncCycleAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Cancellation caused by host shutdown: leave the loop quietly.
                break;
            }
            catch (Exception ex)
            {
                // A failed cycle must not end the service; log it and retry after the pause.
                _logger.LogError(ex, "Error in sync cycle");
            }

            // Pause until the next schedule check.
            var pause = _options.Value.CheckInterval;
            await Task.Delay(pause, stoppingToken);
        }
    }

    /// <summary>
    /// Runs one cycle inside its own DI scope: executes the pending syncs, then purges old
    /// DataUpdate records.
    /// </summary>
    private async Task RunSyncCycleAsync(CancellationToken stoppingToken)
    {
        // Scoped services live only for the duration of this cycle.
        await using var scope = _scopeFactory.CreateAsyncScope();

        var orchestrator = scope.ServiceProvider.GetRequiredService<ISyncOrchestrator>();
        await orchestrator.ExecutePendingSyncsAsync(stoppingToken);

        await PurgeUpdateEntriesAsync(scope, stoppingToken);
    }
}

Graceful Shutdown

  • CancellationToken propagates to all child operations
  • Parallel.ForEachAsync respects cancellation token
  • In-progress operations complete current batch or cancel gracefully
  • Incomplete syncs marked as failed with WasSuccessful = false

IDataFetcher Interface

Interface Definition

/// <summary>
/// Contract for a component that streams <typeparamref name="TEntity"/> records from a source system.
/// </summary>
/// <typeparam name="TEntity">Entity type produced by the fetcher; must be a reference type.</typeparam>
public interface IDataFetcher<TEntity>
    where TEntity : class
{
    /// <summary>
    /// Streams entities from the source system as an asynchronous sequence.
    /// </summary>
    /// <param name="minimumDT">
    /// Lower bound for incremental fetches: only records modified after this time are returned.
    /// Pass <c>null</c> to fetch everything.
    /// </param>
    /// <param name="cancellationToken">Token used to stop the fetch during graceful shutdown.</param>
    /// <returns>An async enumerable that yields entities as they are read from the source.</returns>
    IAsyncEnumerable<TEntity> FetchAsync(DateTime? minimumDT, CancellationToken cancellationToken = default);
}

Fetcher Resolution

Fetchers are registered in DI by convention:

// One scoped registration per entity type, mapping IDataFetcher<T> to its implementation.
services
    .AddScoped<IDataFetcher<WorkOrder>, JdeWorkOrderFetcher>()
    .AddScoped<IDataFetcher<LotUsage>, JdeLotUsageFetcher>()
    .AddScoped<IDataFetcher<Item>, JdeItemFetcher>();
// ... etc

Configuration references fetcher type name:

{
  "DataSync": {
    "DataSources": [
      {
        "TableName": "WorkOrder_Curr",
        "SourceSystem": "JDE",
        "FetcherTypeName": "JdeWorkOrderFetcher",
        "IsEnabled": true,
        "MassConfig": { "Enabled": true, "IntervalMinutes": 10080, "PrepurgeData": true },
        "DailyConfig": { "Enabled": true, "IntervalMinutes": 1440 },
        "HourlyConfig": { "Enabled": true, "IntervalMinutes": 60 }
      }
    ]
  }
}

At startup, FetcherTypeName is validated and resolved to a registered IDataFetcher<T>.

Configuration Classes

DataSyncOptions

/// <summary>
/// Settings for the data sync background service, read from the <see cref="SectionName"/>
/// configuration section.
/// </summary>
/// <remarks>
/// Implements <see cref="System.ComponentModel.DataAnnotations.IValidatableObject"/> so that
/// data-annotation validation of these options (<c>ValidateDataAnnotations()</c>) has rules to
/// enforce. Without them, values that cannot work would only surface as runtime failures in the
/// middle of a sync.
/// </remarks>
public class DataSyncOptions : System.ComponentModel.DataAnnotations.IValidatableObject
{
    public const string SectionName = "DataSync";

    /// <summary>Time between schedule checks (default: 1 minute). Must be greater than zero.</summary>
    public TimeSpan CheckInterval { get; set; } = TimeSpan.FromMinutes(1);

    /// <summary>
    /// Maximum parallel sync operations (default: 8). Must be a positive number, or -1 for
    /// "no limit" as accepted by <c>ParallelOptions.MaxDegreeOfParallelism</c>.
    /// </summary>
    public int MaxDegreeOfParallelism { get; set; } = 8;

    /// <summary>Records per batch for streaming (default: 1,000,000). Must be at least 1.</summary>
    public int BatchSize { get; set; } = 1_000_000;

    /// <summary>Rows per bulk copy batch (default: 10,000). Must not be negative.</summary>
    public int BulkCopyBatchSize { get; set; } = 10_000;

    /// <summary>Multiplier for lookback window (default: 3). Must not be negative.</summary>
    public int LookbackMultiplier { get; set; } = 3;

    /// <summary>Days to retain DataUpdate history (default: 30). Must not be negative.</summary>
    public int PurgeRetentionDays { get; set; } = 30;

    /// <summary>Per-table data source configurations</summary>
    public List<DataSourceConfig> DataSources { get; set; } = new();

    /// <summary>
    /// Reports every setting whose value cannot work, one result per offending property.
    /// </summary>
    /// <param name="validationContext">Validation context supplied by the validator; not used.</param>
    /// <returns>One <c>ValidationResult</c> per invalid setting; empty when the options are usable.</returns>
    public IEnumerable<System.ComponentModel.DataAnnotations.ValidationResult> Validate(
        System.ComponentModel.DataAnnotations.ValidationContext validationContext)
    {
        if (CheckInterval <= TimeSpan.Zero)
        {
            yield return new System.ComponentModel.DataAnnotations.ValidationResult(
                $"{nameof(CheckInterval)} must be greater than zero.",
                new[] { nameof(CheckInterval) });
        }

        // ParallelOptions rejects 0 and anything below -1; -1 means unlimited.
        if (MaxDegreeOfParallelism == 0 || MaxDegreeOfParallelism < -1)
        {
            yield return new System.ComponentModel.DataAnnotations.ValidationResult(
                $"{nameof(MaxDegreeOfParallelism)} must be positive, or -1 for unlimited.",
                new[] { nameof(MaxDegreeOfParallelism) });
        }

        if (BatchSize < 1)
        {
            yield return new System.ComponentModel.DataAnnotations.ValidationResult(
                $"{nameof(BatchSize)} must be at least 1.",
                new[] { nameof(BatchSize) });
        }

        if (BulkCopyBatchSize < 0)
        {
            yield return new System.ComponentModel.DataAnnotations.ValidationResult(
                $"{nameof(BulkCopyBatchSize)} must not be negative.",
                new[] { nameof(BulkCopyBatchSize) });
        }

        if (LookbackMultiplier < 0)
        {
            yield return new System.ComponentModel.DataAnnotations.ValidationResult(
                $"{nameof(LookbackMultiplier)} must not be negative.",
                new[] { nameof(LookbackMultiplier) });
        }

        if (PurgeRetentionDays < 0)
        {
            yield return new System.ComponentModel.DataAnnotations.ValidationResult(
                $"{nameof(PurgeRetentionDays)} must not be negative.",
                new[] { nameof(PurgeRetentionDays) });
        }

        // An explicit null in configuration code would otherwise surface as a NullReferenceException later.
        if (DataSources is null)
        {
            yield return new System.ComponentModel.DataAnnotations.ValidationResult(
                $"{nameof(DataSources)} must not be null.",
                new[] { nameof(DataSources) });
        }
    }
}

DataSourceConfig

/// <summary>
/// Sync settings for a single table: where the data comes from, which fetcher loads it,
/// and the three schedules (mass, daily, hourly) that apply to it.
/// </summary>
public class DataSourceConfig
{
    /// <summary>Name of the destination table in the SQL Server cache.</summary>
    public required string TableName { get; set; }

    /// <summary>Originating system, either "JDE" or "CMS".</summary>
    public required string SourceSystem { get; set; }

    /// <summary>Type name of the <c>IDataFetcher{T}</c> implementation to use.</summary>
    public required string FetcherTypeName { get; set; }

    /// <summary>Type name of an <c>IPostProcessor</c> implementation; optional.</summary>
    public string? PostProcessorTypeName { get; set; }

    /// <summary>Whether this data source takes part in syncing. On unless configured otherwise.</summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>Schedule settings for mass syncs.</summary>
    public ScheduleConfig MassConfig { get; set; } = new ScheduleConfig();

    /// <summary>Schedule settings for daily incremental syncs.</summary>
    public ScheduleConfig DailyConfig { get; set; } = new ScheduleConfig();

    /// <summary>Schedule settings for hourly incremental syncs.</summary>
    public ScheduleConfig HourlyConfig { get; set; } = new ScheduleConfig();
}

/// <summary>
/// Settings for one sync schedule (mass, daily or hourly) of a data source.
/// </summary>
public class ScheduleConfig
{
    /// <summary>Whether this schedule is active. On unless configured otherwise.</summary>
    public bool Enabled { get; set; } = true;

    /// <summary>Schedule interval in minutes. Left at 0 when not configured.</summary>
    public int IntervalMinutes { get; set; }

    /// <summary>When set on a mass update, the destination table is truncated before loading. Off by default.</summary>
    public bool PrepurgeData { get; set; }

    /// <summary>When set, indexes are rebuilt after a truncating mass update. Off by default.</summary>
    public bool ReIndexData { get; set; }
}

Parallel Sync Execution

Parallel.ForEachAsync Pattern

/// <summary>
/// Runs all currently pending table syncs in parallel, each in its own DI scope.
/// </summary>
public class SyncOrchestrator : ISyncOrchestrator
{
    /// <summary>
    /// Asks the schedule checker for pending tasks and executes them with bounded parallelism.
    /// </summary>
    /// <param name="cancellationToken">Cancels scheduling and is passed on to every running operation.</param>
    /// <exception cref="AggregateException">
    /// One or more table syncs failed. Thrown only after every pending task has been given the
    /// chance to run, so one failing table does not abort the others.
    /// </exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public async Task ExecutePendingSyncsAsync(CancellationToken cancellationToken)
    {
        var pendingTasks = await _scheduleChecker.GetPendingTasksAsync(cancellationToken);

        if (pendingTasks.Count == 0)
        {
            return;
        }

        var parallelOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = _options.Value.MaxDegreeOfParallelism,
            CancellationToken = cancellationToken
        };

        // Failures are collected instead of thrown from the loop body. An exception escaping the
        // body makes Parallel.ForEachAsync cancel the token passed to every other body, which
        // would abort the healthy syncs that are still in flight.
        var failures = new System.Collections.Concurrent.ConcurrentQueue<Exception>();

        await Parallel.ForEachAsync(pendingTasks, parallelOptions, async (task, ct) =>
        {
            try
            {
                // Each task gets its own scope
                await using var scope = _scopeFactory.CreateAsyncScope();

                var operation = scope.ServiceProvider
                    .GetRequiredService<ITableSyncOperation>();

                await operation.ExecuteAsync(task, ct);
            }
            catch (Exception ex) when (!ct.IsCancellationRequested)
            {
                // Genuine failure of this one table; cancellation is left to propagate normally.
                failures.Enqueue(ex);
            }
        });

        if (!failures.IsEmpty)
        {
            throw new AggregateException("One or more table syncs failed", failures);
        }
    }
}

Isolation Requirements

  • Each parallel sync operation creates its own IServiceScope
  • Each operation uses its own SQL connection from the scoped DbContext or connection factory
  • Staging tables use unique suffixes: #Staging{TableName}_{OperationId}
  • No shared mutable state between parallel operations

Staging Table Management

Naming Convention

#Staging{TableName}_{OperationId}    - Bulk copy destination
#{TableName}_{OperationId}           - Deduplicated temp table for MERGE

Where OperationId is a GUID or sequential ID unique to each sync operation.

MERGE Operation Flow

  1. Create staging table matching destination schema with unique suffix
  2. Bulk copy source data to staging table (in batches of BulkCopyBatchSize rows; default 10,000)
  3. Deduplicate into temp table using ROW_NUMBER() OVER (PARTITION BY PK ORDER BY LastUpdateDT DESC)
  4. MERGE from temp table to destination:
    • INSERT new records (not matched by primary key)
    • UPDATE existing records WHERE source.LastUpdateDT > target.LastUpdateDT
  5. Cleanup staging and temp tables

Mass Update with Truncation

For mass updates with PrepurgeData = true:

  1. Disable non-PK indexes on destination table
  2. TRUNCATE destination table
  3. Bulk copy directly to destination (no staging needed)
  4. Rebuild indexes if ReIndexData = true
  5. Update statistics

Batching Large Datasets

When a fetch streams more records than BatchSize (default 1,000,000), the records are processed in batches of that size:

// Accumulate streamed entities and hand them off each time a full batch has been collected.
var pending = new List<T>(_options.BatchSize);
var batchIndex = 0;

await foreach (var record in fetcher.FetchAsync(minimumDT, ct))
{
    pending.Add(record);

    // Keep collecting until the batch is full.
    if (pending.Count < _options.BatchSize)
    {
        continue;
    }

    await ProcessBatchAsync(pending, operationId, batchIndex, ct);
    batchIndex++;
    pending.Clear();
}

// The stream has ended: flush the final, partially filled batch if there is one.
if (pending.Count != 0)
{
    await ProcessBatchAsync(pending, operationId, batchIndex, ct);
}

Update Logging

DataUpdate Record Lifecycle

Start: NumberRecords = -2 (in-progress marker)
  |
  v
Success: NumberRecords = actual count, WasSuccessful = true, EndDT = now
  OR
Failure: NumberRecords = -1, WasSuccessful = false, EndDT = now

Logging with Scope

/// <summary>
/// Runs one table sync and records its outcome in the update log.
/// </summary>
/// <param name="task">The sync to run; its table name, update type and operation id are attached to every log entry.</param>
/// <param name="ct">Cancellation token for the sync itself.</param>
/// <exception cref="Exception">
/// Any exception raised by the sync is rethrown after the update record has been closed as failed.
/// </exception>
public async Task ExecuteAsync(DataUpdateTask task, CancellationToken ct)
{
    using var logScope = _logger.BeginScope(new Dictionary<string, object>
    {
        ["TableName"] = task.TableName,
        ["UpdateType"] = task.UpdateType,
        ["OperationId"] = task.OperationId
    });

    var updateId = await _repository.StartUpdateAsync(task, ct);

    try
    {
        var recordCount = await ExecuteSyncAsync(task, ct);
        await _repository.CompleteUpdateAsync(updateId, recordCount, success: true, ct);
        _logger.LogInformation("Sync completed: {RecordCount} records", recordCount);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Sync failed");

        try
        {
            // The sync's token may already be cancelled (e.g. during shutdown). Writing the
            // failure with it would throw immediately and leave the record marked in-progress,
            // so the bookkeeping write deliberately ignores cancellation.
            await _repository.CompleteUpdateAsync(updateId, -1, success: false, CancellationToken.None);
        }
        catch (Exception closeEx)
        {
            // Do not let a bookkeeping failure replace the original exception.
            _logger.LogError(closeEx, "Failed to record sync failure for update {UpdateId}", updateId);
        }

        throw;
    }
}

Startup Recovery

At startup, CloseOpenUpdateEntriesAsync() marks any records still flagged as in-progress (NumberRecords = -2) as failed:

-- Startup recovery: close every DataUpdate row still carrying the in-progress marker (-2)
-- by stamping an end time and flagging it as failed (-1, WasSuccessful = 0).
-- NOTE(review): GETDATE() returns server-local time, while the schedule checker compares
-- against DateTime.UtcNow - confirm which time zone DataUpdate timestamps are stored in.
UPDATE DataUpdate
   SET NumberRecords = -1,
       WasSuccessful = 0,
       EndDT         = GETDATE()
 WHERE NumberRecords = -2;

Health Checks

IHealthCheck Implementation

/// <summary>
/// Health check that reports the sync state of every table.
/// </summary>
public class DataSyncHealthCheck : IHealthCheck
{
    /// <summary>
    /// Summarises table sync status: Unhealthy when any table has recent failures, Degraded when
    /// any table is overdue, Healthy otherwise.
    /// </summary>
    /// <param name="context">Health check context supplied by the framework; not used.</param>
    /// <param name="cancellationToken">Cancels the status lookup.</param>
    /// <returns>
    /// A result whose data dictionary holds, per table, <c>{TableName}_LastSync</c> (round-trip
    /// "O" timestamp or "Never") and <c>{TableName}_Status</c> ("Overdue" or "Current").
    /// </returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        var statuses = await GetTableStatusesAsync(cancellationToken);
        var data = new Dictionary<string, object>();
        var overdueCount = 0;
        var failedCount = 0;

        // Single pass: build the per-table data and count problems without re-enumerating statuses.
        foreach (var status in statuses)
        {
            data[$"{status.TableName}_LastSync"] = status.LastSyncTime?.ToString("O") ?? "Never";
            data[$"{status.TableName}_Status"] = status.IsOverdue ? "Overdue" : "Current";

            if (status.IsOverdue)
            {
                overdueCount++;
            }

            if (status.RecentFailures > 0)
            {
                failedCount++;
            }
        }

        // Failures take precedence over overdue tables. The message carries the real count,
        // because this branch is also taken when only a single table has failed.
        if (failedCount > 0)
        {
            return HealthCheckResult.Unhealthy($"{failedCount} tables with recent sync failures", data: data);
        }

        if (overdueCount > 0)
        {
            return HealthCheckResult.Degraded($"{overdueCount} tables overdue for sync", data: data);
        }

        return HealthCheckResult.Healthy("All syncs current", data: data);
    }
}

Telemetry

Metrics

/// <summary>
/// Owns the "DataSync" meter and the instruments used to report sync activity.
/// </summary>
public class DataSyncMetrics
{
    private readonly Meter _meter;
    private readonly Counter<long> _operationsStarted;
    private readonly Counter<long> _operationsCompleted;
    private readonly Counter<long> _operationsFailed;
    private readonly Histogram<double> _operationDuration;
    private readonly Histogram<long> _recordsProcessed;

    /// <summary>
    /// Creates the meter through the supplied factory and registers all instruments on it.
    /// </summary>
    /// <param name="meterFactory">Factory used to create the "DataSync" meter.</param>
    public DataSyncMetrics(IMeterFactory meterFactory)
    {
        _meter = meterFactory.Create("DataSync");

        // Counters
        _operationsStarted = _meter.CreateCounter<long>("sync.operations.started");
        _operationsCompleted = _meter.CreateCounter<long>("sync.operations.completed");
        _operationsFailed = _meter.CreateCounter<long>("sync.operations.failed");

        // Histograms
        _operationDuration = _meter.CreateHistogram<double>("sync.duration.seconds");
        _recordsProcessed = _meter.CreateHistogram<long>("sync.records.processed");
    }

    /// <summary>
    /// Counts one started operation, tagged with its table and update type.
    /// </summary>
    /// <param name="tableName">Value recorded under the "table" tag.</param>
    /// <param name="updateType">Value recorded under the "type" tag.</param>
    public void RecordOperationStarted(string tableName, string updateType)
    {
        var tableTag = new KeyValuePair<string, object?>("table", tableName);
        var typeTag = new KeyValuePair<string, object?>("type", updateType);

        _operationsStarted.Add(1, tableTag, typeTag);
    }

    // ... similar for completed, failed, duration, records
}

Activity Tracing

/// <summary>
/// Holds the shared "DataSync" <see cref="ActivitySource"/> and a helper for starting sync activities.
/// </summary>
public static class DataSyncActivitySource
{
    public static readonly ActivitySource Source = new("DataSync");

    /// <summary>
    /// Starts an internal "SyncTable" activity tagged with the table name and update type.
    /// </summary>
    /// <param name="tableName">Value stored under the "table.name" tag.</param>
    /// <param name="updateType">Value stored under the "update.type" tag.</param>
    /// <returns>The started activity, or <c>null</c> when the source did not create one.</returns>
    public static Activity? StartSyncOperation(string tableName, string updateType)
    {
        var activity = Source.StartActivity("SyncTable", ActivityKind.Internal);
        if (activity is null)
        {
            return null;
        }

        activity.SetTag("table.name", tableName);
        activity.SetTag("update.type", updateType);
        return activity;
    }
}

DI Registration

AddDataSync Extension

/// <summary>
/// Dependency-injection wiring for the data sync feature.
/// </summary>
public static class ServiceCollectionExtensions
{
    /// <summary>
    /// Registers the options, hosted service, core services, health check, metrics and fetchers
    /// used by data sync.
    /// </summary>
    /// <param name="services">Service collection to add the registrations to.</param>
    /// <param name="configuration">Configuration containing the <c>DataSync</c> section.</param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    public static IServiceCollection AddDataSync(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        // Options are bound from the "DataSync" section.
        var dataSyncSection = configuration.GetSection(DataSyncOptions.SectionName);
        services.Configure<DataSyncOptions>(dataSyncSection);

        // Background service plus the scoped services it resolves for each cycle / operation.
        services
            .AddHostedService<DataSyncService>()
            .AddScoped<ISyncOrchestrator, SyncOrchestrator>()
            .AddScoped<IScheduleChecker, ScheduleChecker>()
            .AddScoped<ITableSyncOperation, TableSyncOperation>()
            .AddScoped<IStagingTableManager, StagingTableManager>();

        // Health check, published under the name "data-sync".
        var healthChecks = services.AddHealthChecks();
        healthChecks.AddCheck<DataSyncHealthCheck>("data-sync");

        // Metrics are shared process-wide.
        services.AddSingleton<DataSyncMetrics>();

        // Fetchers, one per entity type.
        services
            .AddScoped<IDataFetcher<WorkOrder>, JdeWorkOrderFetcher>()
            .AddScoped<IDataFetcher<LotUsage>, JdeLotUsageFetcher>();
        // ... etc

        // Options validation runs when the host starts rather than on first use.
        services.AddOptions<DataSyncOptions>()
            .ValidateDataAnnotations()
            .ValidateOnStart();

        return services;
    }
}

Schedule Checking Logic

Priority: Mass > Daily > Hourly

/// <summary>
/// Builds the list of sync tasks that are due now, at most one per enabled data source.
/// </summary>
/// <remarks>
/// Schedules are evaluated in priority order Mass, Daily, Hourly; the first one that is enabled
/// and due wins and the lower-priority schedules are not checked for that source.
/// </remarks>
/// <param name="ct">Cancels the lookup of the last recorded updates.</param>
/// <returns>The tasks to run; empty when nothing is due.</returns>
public async Task<List<DataUpdateTask>> GetPendingTasksAsync(CancellationToken ct)
{
    var lastUpdates = await _repository.GetLastDataUpdatesAsync(ct);
    var pending = new List<DataUpdateTask>();

    foreach (var source in _options.Value.DataSources)
    {
        if (!source.IsEnabled)
        {
            continue;
        }

        var lastSync = lastUpdates.GetValueOrDefault(source.TableName);
        var now = DateTime.UtcNow;

        if (source.MassConfig.Enabled && NeedsMassSync(source, lastSync, now))
        {
            // Highest priority: a due mass sync replaces any daily or hourly run.
            pending.Add(CreateMassTask(source));
        }
        else if (source.DailyConfig.Enabled && NeedsDailySync(source, lastSync, now))
        {
            pending.Add(CreateDailyTask(source, lastSync));
        }
        else if (source.HourlyConfig.Enabled && NeedsHourlySync(source, lastSync, now))
        {
            pending.Add(CreateHourlyTask(source, lastSync));
        }
    }

    return pending;
}

MinimumDT Calculation

For Daily updates:

MinimumDT = LastDailyUpdateDT - (LookbackMultiplier * DailyInterval)

For Hourly updates (uses Daily timestamp, not Hourly):

MinimumDT = LastDailyUpdateDT - (LookbackMultiplier * DailyInterval)

File Structure

NEW/src/
├── JdeScoping.DataSync/
│   ├── JdeScoping.DataSync.csproj
│   ├── DataSyncService.cs
│   ├── Configuration/
│   │   ├── DataSyncOptions.cs
│   │   └── DataSourceConfig.cs
│   ├── Contracts/
│   │   ├── IDataFetcher.cs
│   │   ├── IPostProcessor.cs
│   │   ├── ISyncOrchestrator.cs
│   │   ├── IScheduleChecker.cs
│   │   ├── ITableSyncOperation.cs
│   │   └── IStagingTableManager.cs
│   ├── Services/
│   │   ├── SyncOrchestrator.cs
│   │   ├── ScheduleChecker.cs
│   │   ├── TableSyncOperation.cs
│   │   └── StagingTableManager.cs
│   ├── Fetchers/
│   │   ├── Jde/
│   │   │   ├── JdeWorkOrderFetcher.cs
│   │   │   ├── JdeLotUsageFetcher.cs
│   │   │   └── ...
│   │   └── Cms/
│   │       └── CmsMisDataFetcher.cs
│   ├── HealthChecks/
│   │   └── DataSyncHealthCheck.cs
│   ├── Telemetry/
│   │   ├── DataSyncMetrics.cs
│   │   └── DataSyncActivitySource.cs
│   └── DependencyInjection/
│       └── ServiceCollectionExtensions.cs
└── JdeScoping.Host/
    └── Program.cs  (add: builder.Services.AddDataSync(configuration))