feat: complete WorkProcessor integration and bug fixes

- Fix hourly lookback bug: use hourly timestamp/interval (not daily)
- Update DI registrations across DataSync, DataAccess, Api layers
- Add WorkProcessor config to appsettings.json
- Remove deprecated DataSyncService (replaced by WorkProcessor)

All 340 DataSync tests pass. Legacy bug from OLD solution now fixed.
This commit is contained in:
Joseph Doherty
2026-01-07 06:26:45 -05:00
parent 91b516e197
commit 5ee920a399
8 changed files with 30 additions and 841 deletions
@@ -1,160 +0,0 @@
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync;
/// <summary>
/// Background service that orchestrates data synchronization from JDE/CMS to SQL Server cache.
/// </summary>
public class DataSyncService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly IOptions<DataSyncOptions> _options;
private readonly ILogger<DataSyncService> _logger;
private readonly DataSyncMetrics _metrics;
private DateTime _lastPurgeCheck = DateTime.MinValue;
private readonly TimeSpan _purgeCheckInterval = TimeSpan.FromHours(24);
/// <summary>
/// Initializes a new instance of the <see cref="DataSyncService"/> class.
/// </summary>
public DataSyncService(
IServiceScopeFactory scopeFactory,
IOptions<DataSyncOptions> options,
ILogger<DataSyncService> logger,
DataSyncMetrics metrics)
{
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
}
/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Value.Enabled)
{
_logger.LogInformation("DataSyncService is disabled via configuration");
return;
}
_logger.LogInformation(
"DataSyncService starting with CheckInterval={CheckInterval}, MaxDegreeOfParallelism={MaxDegreeOfParallelism}",
_options.Value.CheckInterval,
_options.Value.MaxDegreeOfParallelism);
// Startup: close any interrupted syncs from prior runs
await CloseOpenUpdateEntriesAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
try
{
// Create scope for this sync cycle
await using var scope = _scopeFactory.CreateAsyncScope();
var orchestrator = scope.ServiceProvider.GetRequiredService<ISyncOrchestrator>();
// Check schedules and execute pending syncs
await orchestrator.ExecutePendingSyncsAsync(stoppingToken);
// Periodic purge of old DataUpdate records
await PurgeUpdateEntriesAsync(scope, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Graceful shutdown
_logger.LogInformation("DataSyncService stopping gracefully");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in sync cycle");
_metrics.RecordCycleError();
}
// Wait before next check
try
{
await Task.Delay(_options.Value.CheckInterval, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
}
_logger.LogInformation("DataSyncService stopped");
}
/// <summary>
/// Closes any open update entries from interrupted prior runs.
/// </summary>
private async Task CloseOpenUpdateEntriesAsync(CancellationToken cancellationToken)
{
try
{
await using var scope = _scopeFactory.CreateAsyncScope();
var repository = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
var closedCount = await repository.CloseOpenUpdateEntriesAsync(cancellationToken);
if (closedCount > 0)
{
_logger.LogWarning(
"Closed {Count} interrupted update entries from prior runs",
closedCount);
}
else
{
_logger.LogDebug("No interrupted update entries found");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to close open update entries at startup");
// Continue starting - this is not fatal
}
}
/// <summary>
/// Purges old DataUpdate records periodically.
/// </summary>
private async Task PurgeUpdateEntriesAsync(AsyncServiceScope scope, CancellationToken cancellationToken)
{
if (DateTime.UtcNow - _lastPurgeCheck < _purgeCheckInterval)
{
return;
}
_lastPurgeCheck = DateTime.UtcNow;
try
{
var repository = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
var purgedCount = await repository.PurgeOldEntriesAsync(
_options.Value.PurgeRetentionDays,
cancellationToken);
if (purgedCount > 0)
{
_logger.LogInformation(
"Purged {Count} DataUpdate records older than {Days} days",
purgedCount,
_options.Value.PurgeRetentionDays);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to purge old update entries");
// Continue - this is not fatal
}
}
}
@@ -29,6 +29,12 @@ public static class DataSyncDependencyInjection
.ValidateDataAnnotations()
.ValidateOnStart();
// WorkProcessor configuration with validation
services.AddOptions<WorkProcessorOptions>()
.Bind(configuration.GetSection(WorkProcessorOptions.SectionName))
.ValidateDataAnnotations()
.ValidateOnStart();
// Pipeline configuration (new ETL infrastructure)
services.AddOptions<PipelineOptions>()
.Bind(configuration.GetSection(PipelineOptions.SectionName));
@@ -36,8 +42,8 @@ public static class DataSyncDependencyInjection
// Pipeline factory (new ETL infrastructure)
services.AddSingleton<IEtlPipelineFactory, EtlPipelineFactory>();
// Register hosted service
services.AddHostedService<DataSyncService>();
// Register hosted service (WorkProcessor combines data sync and search processing)
services.AddHostedService<WorkProcessor>();
// Register core services as scoped (for parallel isolation)
services.AddScoped<ISyncOrchestrator, SyncOrchestrator>();
@@ -45,6 +51,10 @@ public static class DataSyncDependencyInjection
services.AddScoped<ITableSyncOperation, TableSyncOperation>();
services.AddScoped<IDataUpdateRepository, DataUpdateRepository>();
// Register search processing services as scoped
services.AddScoped<ISearchRepository, SearchRepository>();
services.AddScoped<ISearchExecutionService, SearchExecutionService>();
// Register health check
services.AddHealthChecks()
.AddCheck<DataSyncHealthCheck>("data-sync", tags: ["datasync", "background"]);
@@ -107,11 +107,10 @@ public class ScheduleChecker : IScheduleChecker
return CreateTask(config, UpdateTypes.Daily, minimumDt);
}
// Check Hourly (uses Daily's last timestamp for MinimumDT calculation, per legacy behavior)
// Check Hourly
if (config.HourlyConfig.Enabled && NeedsHourlySync(config, lastHourly, lastDaily, lastMass, now))
{
// Use daily update timestamp for lookback, not hourly
var minimumDt = CalculateMinimumDt(lastDaily, config.DailyConfig.IntervalMinutes);
var minimumDt = CalculateMinimumDt(lastHourly, config.HourlyConfig.IntervalMinutes);
_logger.LogDebug(
"Hourly sync needed for {Table}: last={LastSync}, interval={Interval}m, minDT={MinDT}",