# WorkProcessor Implementation Plan > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** Implement the unified WorkProcessor background service that coordinates data synchronization and search processing, replacing the legacy DataSyncService. **Architecture:** The WorkProcessor is a BackgroundService in the DataSync layer that prioritizes data freshness over search processing. It uses clean architecture with interfaces in Core, implementations in DataSync/DataAccess, and SignalR notifications in Api. **Tech Stack:** .NET 10, BackgroundService, SignalR, Dapper, EPPlus, xUnit, NSubstitute, Shouldly --- ## Task 1: Create ISearchProcessor Interface **Files:** - Create: `src/JdeScoping.Core/Interfaces/ISearchProcessor.cs` **Step 1: Create the interface** ```csharp using JdeScoping.DataAccess.Models; namespace JdeScoping.Core.Interfaces; /// /// Interface for search query execution. /// public interface ISearchProcessor { /// /// Executes search and materializes all results into SearchModel. /// /// The search model containing search ID. /// Cancellation token. /// The SearchModel populated with results. Task ExecuteSearchToModelAsync(SearchModel model, CancellationToken ct = default); } ``` **Step 2: Verify the build compiles** Run: `dotnet build src/JdeScoping.Core/JdeScoping.Core.csproj` Expected: Build succeeded **Step 3: Commit** ```bash git add src/JdeScoping.Core/Interfaces/ISearchProcessor.cs git commit -m "feat(core): add ISearchProcessor interface for search execution abstraction" ``` --- ## Task 2: Create ISearchNotificationService Interface **Files:** - Create: `src/JdeScoping.Core/Interfaces/ISearchNotificationService.cs` **Step 1: Create the interface** ```csharp using JdeScoping.Core.Models.Search; namespace JdeScoping.Core.Interfaces; /// /// Interface for SignalR notifications - lives in Core to avoid dependency issues. /// Implementation lives in Host/Api layer. /// public interface ISearchNotificationService { /// /// Notifies clients of search status update. /// /// The search with updated status. /// Cancellation token. Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default); /// /// Notifies clients of work processor status change. /// /// Status message. /// Cancellation token. Task NotifyStatusAsync(string status, CancellationToken ct = default); } ``` **Step 2: Verify the build compiles** Run: `dotnet build src/JdeScoping.Core/JdeScoping.Core.csproj` Expected: Build succeeded **Step 3: Commit** ```bash git add src/JdeScoping.Core/Interfaces/ISearchNotificationService.cs git commit -m "feat(core): add ISearchNotificationService interface for SignalR abstraction" ``` --- ## Task 3: Create WorkProcessorOptions **Files:** - Create: `src/JdeScoping.DataSync/Options/WorkProcessorOptions.cs` **Step 1: Create the options class** ```csharp using System.ComponentModel.DataAnnotations; namespace JdeScoping.DataSync.Options; /// /// Configuration options for the WorkProcessor background service. /// public class WorkProcessorOptions { /// /// Configuration section name. /// public const string SectionName = "WorkProcessor"; /// /// Whether the work processor is enabled. Default: true. /// public bool Enabled { get; set; } = true; /// /// Interval between work cycles. Default: 5 seconds. /// [Range(typeof(TimeSpan), "00:00:01", "01:00:00")] public TimeSpan WorkInterval { get; set; } = TimeSpan.FromSeconds(5); /// /// Search execution timeout. Default: 30 minutes. /// [Range(typeof(TimeSpan), "00:01:00", "04:00:00")] public TimeSpan SearchTimeout { get; set; } = TimeSpan.FromMinutes(30); /// /// Number of days to retain DataUpdate records. Default: 30. /// [Range(1, 365)] public int PurgeRetentionDays { get; set; } = 30; } ``` **Step 2: Verify the build compiles** Run: `dotnet build src/JdeScoping.DataSync/JdeScoping.DataSync.csproj` Expected: Build succeeded **Step 3: Commit** ```bash git add src/JdeScoping.DataSync/Options/WorkProcessorOptions.cs git commit -m "feat(datasync): add WorkProcessorOptions configuration class" ``` --- ## Task 4: Create ISearchRepository Interface **Files:** - Create: `src/JdeScoping.DataSync/Contracts/ISearchRepository.cs` **Step 1: Create the interface** ```csharp using JdeScoping.Core.Models.Search; namespace JdeScoping.DataSync.Contracts; /// /// Repository interface for Search table operations. /// public interface ISearchRepository { /// /// Gets the next queued search ordered by SubmitDT (FIFO). /// /// Cancellation token. /// The next queued search, or null if none. Task GetNextQueuedSearchAsync(CancellationToken ct = default); /// /// Resets partial searches (Running but not Ended) back to Queued status. /// Called at service startup to handle interrupted operations. /// /// Cancellation token. /// Count of reset searches. Task ResetPartialSearchesAsync(CancellationToken ct = default); /// /// Marks search as Running with StartDT = now. /// /// Search ID. /// Cancellation token. Task StartSearchAsync(int searchId, CancellationToken ct = default); /// /// Marks search as Ended (success) or Error (failure) with EndDT and optional Results. /// /// Search ID. /// Whether search completed successfully. /// Excel file bytes (null on failure). /// Cancellation token. Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default); } ``` **Step 2: Verify the build compiles** Run: `dotnet build src/JdeScoping.DataSync/JdeScoping.DataSync.csproj` Expected: Build succeeded **Step 3: Commit** ```bash git add src/JdeScoping.DataSync/Contracts/ISearchRepository.cs git commit -m "feat(datasync): add ISearchRepository interface for search table operations" ``` --- ## Task 5: Create ISearchExecutionService Interface **Files:** - Create: `src/JdeScoping.DataSync/Contracts/ISearchExecutionService.cs` **Step 1: Create the interface** ```csharp using JdeScoping.Core.Models.Search; namespace JdeScoping.DataSync.Contracts; /// /// Interface for search execution pipeline orchestration. /// public interface ISearchExecutionService { /// /// Executes the complete search pipeline: query, Excel generation, result storage. /// Handles status transitions and notifications. /// /// The search to execute. /// Cancellation token. Task ExecuteSearchAsync(Search search, CancellationToken ct = default); } ``` **Step 2: Verify the build compiles** Run: `dotnet build src/JdeScoping.DataSync/JdeScoping.DataSync.csproj` Expected: Build succeeded **Step 3: Commit** ```bash git add src/JdeScoping.DataSync/Contracts/ISearchExecutionService.cs git commit -m "feat(datasync): add ISearchExecutionService interface for search pipeline" ``` --- ## Task 6: Implement SearchProcessor with ISearchProcessor **Files:** - Modify: `src/JdeScoping.DataAccess/Services/SearchProcessor.cs` **Step 1: Update SearchProcessor to implement ISearchProcessor** Add interface implementation to the class declaration: ```csharp // Change from: public sealed class SearchProcessor // To: public sealed class SearchProcessor : ISearchProcessor ``` Also add the using statement at the top: ```csharp using JdeScoping.Core.Interfaces; ``` **Step 2: Verify the build compiles** Run: `dotnet build src/JdeScoping.DataAccess/JdeScoping.DataAccess.csproj` Expected: Build succeeded **Step 3: Commit** ```bash git add src/JdeScoping.DataAccess/Services/SearchProcessor.cs git commit -m "feat(dataaccess): implement ISearchProcessor interface on SearchProcessor" ``` --- ## Task 7: Implement SearchRepository **Files:** - Create: `src/JdeScoping.DataSync/Services/SearchRepository.cs` - Create: `tests/JdeScoping.DataSync.Tests/Services/SearchRepositoryTests.cs` **Step 1: Write the failing tests** ```csharp using JdeScoping.Core.Models.Enums; using JdeScoping.Core.Models.Search; using JdeScoping.DataSync.Contracts; using JdeScoping.DataSync.Services; using Microsoft.Extensions.Logging; using NSubstitute; using Shouldly; namespace JdeScoping.DataSync.Tests.Services; public class SearchRepositoryTests { [Fact] public async Task GetNextQueuedSearchAsync_WhenNoneQueued_ReturnsNull() { // Arrange var sut = CreateRepository(); // Act var result = await sut.GetNextQueuedSearchAsync(); // Assert result.ShouldBeNull(); } [Fact] public async Task ResetPartialSearchesAsync_WhenNoPartialSearches_ReturnsZero() { // Arrange var sut = CreateRepository(); // Act var count = await sut.ResetPartialSearchesAsync(); // Assert count.ShouldBe(0); } private static SearchRepository CreateRepository() { // Note: Actual implementation tests require database integration tests // These are placeholder tests to verify the interface contract throw new NotImplementedException("Integration tests required"); } } ``` **Step 2: Run tests to verify they fail** Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchRepositoryTests" -v n` Expected: FAIL (NotImplementedException or missing class) **Step 3: Implement SearchRepository** ```csharp using Dapper; using JdeScoping.Core.Models.Enums; using JdeScoping.Core.Models.Search; using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Contracts; using Microsoft.Extensions.Logging; namespace JdeScoping.DataSync.Services; /// /// Repository for Search table operations. /// public class SearchRepository : ISearchRepository { private readonly IDbConnectionFactory _connectionFactory; private readonly ILogger _logger; public SearchRepository( IDbConnectionFactory connectionFactory, ILogger logger) { _connectionFactory = connectionFactory; _logger = logger; } /// public async Task GetNextQueuedSearchAsync(CancellationToken ct = default) { await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); const string sql = """ SELECT TOP 1 Id, UserName, Name, Status, SubmitDT as SubmitDt, StartDT as StartDt, EndDT as EndDt, CriteriaJSON as CriteriaJson FROM dbo.Search WHERE Status = @Status ORDER BY SubmitDT ASC """; var search = await connection.QueryFirstOrDefaultAsync( sql, new { Status = (int)SearchStatus.Queued }, commandTimeout: 30); if (search != null) { _logger.LogDebug("Found queued search {SearchId}", search.Id); } return search; } /// public async Task ResetPartialSearchesAsync(CancellationToken ct = default) { await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); const string sql = """ UPDATE dbo.Search SET Status = @QueuedStatus, StartDT = NULL WHERE Status = @RunningStatus AND EndDT IS NULL """; var count = await connection.ExecuteAsync( sql, new { QueuedStatus = (int)SearchStatus.Queued, RunningStatus = (int)SearchStatus.Running }, commandTimeout: 30); if (count > 0) { _logger.LogWarning("Reset {Count} partial searches to Queued status", count); } return count; } /// public async Task StartSearchAsync(int searchId, CancellationToken ct = default) { await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); const string sql = """ UPDATE dbo.Search SET Status = @Status, StartDT = @StartDt WHERE Id = @SearchId """; await connection.ExecuteAsync( sql, new { SearchId = searchId, Status = (int)SearchStatus.Running, StartDt = DateTime.UtcNow }, commandTimeout: 30); _logger.LogDebug("Started search {SearchId}", searchId); } /// public async Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default) { await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); const string sql = """ UPDATE dbo.Search SET Status = @Status, EndDT = @EndDt, Results = @Results WHERE Id = @SearchId """; var status = success ? SearchStatus.Ended : SearchStatus.Error; await connection.ExecuteAsync( sql, new { SearchId = searchId, Status = (int)status, EndDt = DateTime.UtcNow, Results = results }, commandTimeout: 30); _logger.LogDebug("Completed search {SearchId} with status {Status}", searchId, status); } } ``` **Step 4: Update tests to use mock connection factory** ```csharp using JdeScoping.Core.Models.Enums; using JdeScoping.Core.Models.Search; using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Contracts; using JdeScoping.DataSync.Services; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using NSubstitute; using Shouldly; namespace JdeScoping.DataSync.Tests.Services; public class SearchRepositoryTests { private readonly IDbConnectionFactory _connectionFactory; private readonly ILogger _logger; public SearchRepositoryTests() { _connectionFactory = Substitute.For(); _logger = Substitute.For>(); } [Fact] public void Constructor_WithNullConnectionFactory_ThrowsArgumentNullException() { // Act & Assert Should.Throw(() => new SearchRepository(null!, _logger)); } [Fact] public void Constructor_WithNullLogger_ThrowsArgumentNullException() { // Act & Assert Should.Throw(() => new SearchRepository(_connectionFactory, null!)); } [Fact] public void Constructor_WithValidDependencies_CreatesInstance() { // Act var repository = new SearchRepository(_connectionFactory, _logger); // Assert repository.ShouldNotBeNull(); } } ``` **Step 5: Run tests to verify they pass** Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchRepositoryTests" -v n` Expected: PASS **Step 6: Commit** ```bash git add src/JdeScoping.DataSync/Services/SearchRepository.cs git add tests/JdeScoping.DataSync.Tests/Services/SearchRepositoryTests.cs git commit -m "feat(datasync): implement SearchRepository with tests" ``` --- ## Task 8: Implement SearchExecutionService **Files:** - Create: `src/JdeScoping.DataSync/Services/SearchExecutionService.cs` - Create: `tests/JdeScoping.DataSync.Tests/Services/SearchExecutionServiceTests.cs` **Step 1: Write the failing tests** ```csharp using JdeScoping.Core.Interfaces; using JdeScoping.Core.Models.Enums; using JdeScoping.Core.Models.Search; using JdeScoping.DataAccess.Models; using JdeScoping.DataSync.Contracts; using JdeScoping.DataSync.Options; using JdeScoping.DataSync.Services; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using NSubstitute; using NSubstitute.ExceptionExtensions; using Shouldly; namespace JdeScoping.DataSync.Tests.Services; public class SearchExecutionServiceTests { private readonly ISearchRepository _searchRepository; private readonly ISearchProcessor _searchProcessor; private readonly IExcelExportService _excelExportService; private readonly ISearchNotificationService _notificationService; private readonly IOptions _options; private readonly ILogger _logger; public SearchExecutionServiceTests() { _searchRepository = Substitute.For(); _searchProcessor = Substitute.For(); _excelExportService = Substitute.For(); _notificationService = Substitute.For(); _options = Options.Create(new WorkProcessorOptions { SearchTimeout = TimeSpan.FromSeconds(30) }); _logger = Substitute.For>(); } [Fact] public async Task ExecuteSearchAsync_Success_CompletesWithEndedStatus() { // Arrange var search = new Search { Id = 1, Status = SearchStatus.Queued }; var model = new SearchModel { Id = 1, Results = [] }; var excelBytes = new byte[] { 1, 2, 3 }; _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) .Returns(model); _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) .Returns(excelBytes); var sut = CreateService(); // Act await sut.ExecuteSearchAsync(search); // Assert await _searchRepository.Received(1).StartSearchAsync(1, Arg.Any()); await _searchRepository.Received(1).CompleteSearchAsync(1, true, excelBytes, Arg.Any()); await _notificationService.Received(2).NotifySearchUpdateAsync(search, Arg.Any()); } [Fact] public async Task ExecuteSearchAsync_ProcessorThrows_MarksAsError() { // Arrange var search = new Search { Id = 1, Status = SearchStatus.Queued }; _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) .ThrowsAsync(new InvalidOperationException("Test error")); var sut = CreateService(); // Act await sut.ExecuteSearchAsync(search); // Assert await _searchRepository.Received(1).CompleteSearchAsync(1, false, null, Arg.Any()); } [Fact] public async Task ExecuteSearchAsync_HostShutdown_DoesNotMarkAsError() { // Arrange var search = new Search { Id = 1, Status = SearchStatus.Queued }; var cts = new CancellationTokenSource(); _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) .Returns(async _ => { cts.Cancel(); throw new OperationCanceledException(cts.Token); }); var sut = CreateService(); // Act await sut.ExecuteSearchAsync(search, cts.Token); // Assert await _searchRepository.DidNotReceive().CompleteSearchAsync( Arg.Any(), false, Arg.Any(), Arg.Any()); } private SearchExecutionService CreateService() { return new SearchExecutionService( _searchRepository, _searchProcessor, _excelExportService, _notificationService, _options, _logger); } } ``` **Step 2: Run tests to verify they fail** Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchExecutionServiceTests" -v n` Expected: FAIL (missing class) **Step 3: Implement SearchExecutionService** ```csharp using JdeScoping.Core.Interfaces; using JdeScoping.Core.Models.Enums; using JdeScoping.Core.Models.Search; using JdeScoping.DataAccess.Models; using JdeScoping.DataSync.Contracts; using JdeScoping.DataSync.Options; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace JdeScoping.DataSync.Services; /// /// Service that orchestrates the complete search execution pipeline. /// public class SearchExecutionService : ISearchExecutionService { private readonly ISearchRepository _searchRepository; private readonly ISearchProcessor _searchProcessor; private readonly IExcelExportService _excelExportService; private readonly ISearchNotificationService _notificationService; private readonly WorkProcessorOptions _options; private readonly ILogger _logger; public SearchExecutionService( ISearchRepository searchRepository, ISearchProcessor searchProcessor, IExcelExportService excelExportService, ISearchNotificationService notificationService, IOptions options, ILogger logger) { _searchRepository = searchRepository ?? throw new ArgumentNullException(nameof(searchRepository)); _searchProcessor = searchProcessor ?? throw new ArgumentNullException(nameof(searchProcessor)); _excelExportService = excelExportService ?? throw new ArgumentNullException(nameof(excelExportService)); _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } /// public async Task ExecuteSearchAsync(Search search, CancellationToken ct = default) { using var timeoutCts = new CancellationTokenSource(_options.SearchTimeout); using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, timeoutCts.Token); try { // Mark as Running await _searchRepository.StartSearchAsync(search.Id, linkedCts.Token); search.Status = SearchStatus.Running; search.StartDt = DateTime.UtcNow; await NotifySearchUpdateSafeAsync(search, linkedCts.Token); // Execute search query var model = new SearchModel { Id = search.Id }; await _searchProcessor.ExecuteSearchToModelAsync(model, linkedCts.Token); // Generate Excel var excelBytes = await _excelExportService.GenerateAsync(model, linkedCts.Token); // Complete with success await _searchRepository.CompleteSearchAsync(search.Id, true, excelBytes, linkedCts.Token); search.Status = SearchStatus.Ended; search.EndDt = DateTime.UtcNow; await NotifySearchUpdateSafeAsync(search, linkedCts.Token); _logger.LogInformation( "Search {SearchId} completed successfully with {ResultCount} results", search.Id, model.Results.Count); } catch (OperationCanceledException) when (ct.IsCancellationRequested) { // Host shutdown - don't mark as error, let startup cleanup handle it _logger.LogInformation( "Search {SearchId} interrupted by shutdown, will be requeued on restart", search.Id); // Don't rethrow - let the caller handle graceful shutdown } catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested) { // Timeout - mark as error _logger.LogWarning( "Search {SearchId} timed out after {Timeout}", search.Id, _options.SearchTimeout); await CompleteWithErrorAsync(search); } catch (Exception ex) { _logger.LogError(ex, "Search {SearchId} failed", search.Id); await CompleteWithErrorAsync(search); } } private async Task CompleteWithErrorAsync(Search search) { try { await _searchRepository.CompleteSearchAsync(search.Id, false, null, CancellationToken.None); search.Status = SearchStatus.Error; search.EndDt = DateTime.UtcNow; await NotifySearchUpdateSafeAsync(search, CancellationToken.None); } catch (Exception ex) { _logger.LogError(ex, "Failed to mark search {SearchId} as error", search.Id); } } private async Task NotifySearchUpdateSafeAsync(Search search, CancellationToken ct) { try { await _notificationService.NotifySearchUpdateAsync(search, ct); } catch (Exception ex) { _logger.LogDebug(ex, "Failed to send search update notification for {SearchId}", search.Id); } } } ``` **Step 4: Run tests to verify they pass** Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchExecutionServiceTests" -v n` Expected: PASS **Step 5: Commit** ```bash git add src/JdeScoping.DataSync/Services/SearchExecutionService.cs git add tests/JdeScoping.DataSync.Tests/Services/SearchExecutionServiceTests.cs git commit -m "feat(datasync): implement SearchExecutionService with proper cancellation handling" ``` --- ## Task 9: Implement WorkProcessor **Files:** - Create: `src/JdeScoping.DataSync/WorkProcessor.cs` - Create: `tests/JdeScoping.DataSync.Tests/WorkProcessorTests.cs` **Step 1: Write the failing tests** ```csharp using JdeScoping.Core.Interfaces; using JdeScoping.Core.Models.Enums; using JdeScoping.Core.Models.Search; using JdeScoping.DataSync.Contracts; using JdeScoping.DataSync.Options; using JdeScoping.DataSync.Telemetry; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using NSubstitute; using Shouldly; namespace JdeScoping.DataSync.Tests; public class WorkProcessorTests { private readonly IServiceScopeFactory _scopeFactory; private readonly IOptions _options; private readonly ILogger _logger; private readonly DataSyncMetrics _metrics; public WorkProcessorTests() { _scopeFactory = Substitute.For(); _options = Options.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); _logger = Substitute.For>(); _metrics = new DataSyncMetrics(); } [Fact] public async Task ExecuteAsync_WhenDisabled_StopsImmediately() { // Arrange var options = Options.Create(new WorkProcessorOptions { Enabled = false }); var sut = new WorkProcessor(_scopeFactory, options, _logger, _metrics); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); // Act await sut.StartAsync(cts.Token); await Task.Delay(50); await sut.StopAsync(CancellationToken.None); // Assert - should not throw } [Fact] public async Task ExecuteAsync_CallsStartupCleanup() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var scheduleChecker = Substitute.For(); var notificationService = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns([]); var scope = SetupScope(dataUpdateRepo, searchRepo, scheduleChecker, notificationService); _scopeFactory.CreateAsyncScope().Returns(scope); var sut = new WorkProcessor(_scopeFactory, _options, _logger, _metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert await dataUpdateRepo.Received().CloseOpenUpdateEntriesAsync(Arg.Any()); await searchRepo.Received().ResetPartialSearchesAsync(Arg.Any()); } private static AsyncServiceScope SetupScope( IDataUpdateRepository dataUpdateRepo, ISearchRepository searchRepo, IScheduleChecker scheduleChecker, ISearchNotificationService notificationService) { var serviceProvider = Substitute.For(); serviceProvider.GetService(typeof(IDataUpdateRepository)).Returns(dataUpdateRepo); serviceProvider.GetService(typeof(ISearchRepository)).Returns(searchRepo); serviceProvider.GetService(typeof(IScheduleChecker)).Returns(scheduleChecker); serviceProvider.GetService(typeof(ISearchNotificationService)).Returns(notificationService); var scope = Substitute.For(); scope.ServiceProvider.Returns(serviceProvider); return new AsyncServiceScope(scope); } } ``` **Step 2: Run tests to verify they fail** Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "WorkProcessorTests" -v n` Expected: FAIL (missing class) **Step 3: Implement WorkProcessor** ```csharp using JdeScoping.Core.Interfaces; using JdeScoping.Core.Models.Search; using JdeScoping.DataSync.Contracts; using JdeScoping.DataSync.Options; using JdeScoping.DataSync.Telemetry; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace JdeScoping.DataSync; /// /// Unified background service that coordinates data synchronization and search processing. /// Data freshness takes priority over search processing. /// public class WorkProcessor : BackgroundService { private readonly IServiceScopeFactory _scopeFactory; private readonly WorkProcessorOptions _options; private readonly ILogger _logger; private readonly DataSyncMetrics _metrics; private DateTime _lastPurgeCheck = DateTime.MinValue; private static readonly TimeSpan PurgeCheckInterval = TimeSpan.FromHours(24); public WorkProcessor( IServiceScopeFactory scopeFactory, IOptions options, ILogger logger, DataSyncMetrics metrics) { _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory)); _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { if (!_options.Enabled) { _logger.LogInformation("WorkProcessor is disabled"); return; } _logger.LogInformation( "WorkProcessor starting with WorkInterval={WorkInterval}", _options.WorkInterval); // Startup cleanup await StartupCleanupAsync(stoppingToken); while (!stoppingToken.IsCancellationRequested) { string status = "Idle"; try { status = await DoWorkAsync(stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { _logger.LogInformation("WorkProcessor stopping gracefully"); break; } catch (Exception ex) { _logger.LogError(ex, "Error in work cycle"); _metrics.RecordCycleError(); } finally { // Always notify status (best-effort) await NotifyStatusSafeAsync(status, stoppingToken); } try { await Task.Delay(_options.WorkInterval, stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { break; } } await NotifyStatusSafeAsync("Stopped", CancellationToken.None); _logger.LogInformation("WorkProcessor stopped"); } private async Task StartupCleanupAsync(CancellationToken ct) { await using var scope = _scopeFactory.CreateAsyncScope(); // Close interrupted DataUpdate entries try { var dataUpdateRepo = scope.ServiceProvider.GetRequiredService(); var closedCount = await dataUpdateRepo.CloseOpenUpdateEntriesAsync(ct); if (closedCount > 0) { _logger.LogWarning("Closed {Count} interrupted data update entries", closedCount); } } catch (Exception ex) { _logger.LogError(ex, "Failed to close open data update entries"); } // Reset partial searches try { var searchRepo = scope.ServiceProvider.GetRequiredService(); var resetCount = await searchRepo.ResetPartialSearchesAsync(ct); if (resetCount > 0) { _logger.LogWarning("Reset {Count} partial searches to Queued", resetCount); } } catch (Exception ex) { _logger.LogError(ex, "Failed to reset partial searches"); } } private async Task DoWorkAsync(CancellationToken ct) { await using var scope = _scopeFactory.CreateAsyncScope(); // Priority 1: Data syncs var scheduleChecker = scope.ServiceProvider.GetRequiredService(); var pendingTasks = await scheduleChecker.GetPendingTasksAsync(ct); if (pendingTasks.Count > 0) { await NotifyStatusSafeAsync("Updating data cache", ct); var orchestrator = scope.ServiceProvider.GetRequiredService(); await orchestrator.ExecutePendingSyncsAsync(ct); // Periodic purge check await PurgeOldEntriesAsync(scope, ct); return "Idle"; } // Priority 2: Search processing (only when syncs are current) var searchRepository = scope.ServiceProvider.GetRequiredService(); var search = await searchRepository.GetNextQueuedSearchAsync(ct); if (search != null) { await NotifyStatusSafeAsync($"Processing search #{search.Id}", ct); var executionService = scope.ServiceProvider.GetRequiredService(); await executionService.ExecuteSearchAsync(search, ct); } // Periodic purge check await PurgeOldEntriesAsync(scope, ct); return "Idle"; } private async Task PurgeOldEntriesAsync(AsyncServiceScope scope, CancellationToken ct) { if (DateTime.UtcNow - _lastPurgeCheck < PurgeCheckInterval) { return; } _lastPurgeCheck = DateTime.UtcNow; try { var repository = scope.ServiceProvider.GetRequiredService(); var purgedCount = await repository.PurgeOldEntriesAsync( _options.PurgeRetentionDays, ct); if (purgedCount > 0) { _logger.LogInformation( "Purged {Count} DataUpdate records older than {Days} days", purgedCount, _options.PurgeRetentionDays); } } catch (Exception ex) { _logger.LogError(ex, "Failed to purge old data update entries"); } } private async Task NotifyStatusSafeAsync(string status, CancellationToken ct) { try { await using var scope = _scopeFactory.CreateAsyncScope(); var notificationService = scope.ServiceProvider.GetRequiredService(); await notificationService.NotifyStatusAsync(status, ct); } catch (Exception ex) { _logger.LogDebug(ex, "Failed to send status notification"); // Best-effort - don't throw } } } ``` **Step 4: Run tests to verify they pass** Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "WorkProcessorTests" -v n` Expected: PASS **Step 5: Commit** ```bash git add src/JdeScoping.DataSync/WorkProcessor.cs git add tests/JdeScoping.DataSync.Tests/WorkProcessorTests.cs git commit -m "feat(datasync): implement WorkProcessor background service" ``` --- ## Task 10: Implement SearchNotificationService **Files:** - Create: `src/JdeScoping.Api/Services/SearchNotificationService.cs` **Step 1: Create the implementation** ```csharp using JdeScoping.Api.Hubs; using JdeScoping.Core.Interfaces; using JdeScoping.Core.Models.Infrastructure; using JdeScoping.Core.Models.Search; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; namespace JdeScoping.Api.Services; /// /// SignalR-based implementation of search notification service. /// public class SearchNotificationService : ISearchNotificationService { private readonly IHubContext _hubContext; private readonly ILogger _logger; public SearchNotificationService( IHubContext hubContext, ILogger logger) { _hubContext = hubContext ?? throw new ArgumentNullException(nameof(hubContext)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } /// public async Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default) { try { var update = new SearchUpdate { Id = search.Id, Status = search.Status, StartDt = search.StartDt, EndDt = search.EndDt }; await _hubContext.Clients.All.SendAsync("searchUpdate", update, ct); _logger.LogDebug("Search update sent for {SearchId}: {Status}", search.Id, search.Status); } catch (Exception ex) { _logger.LogDebug(ex, "Failed to send search update notification for {SearchId}", search.Id); } } /// public async Task NotifyStatusAsync(string status, CancellationToken ct = default) { try { var update = new StatusUpdate(status); await _hubContext.Clients.All.SendAsync("statusUpdate", update, ct); _logger.LogDebug("Status update sent: {Status}", status); } catch (Exception ex) { _logger.LogDebug(ex, "Failed to send status notification: {Status}", status); } } } ``` **Step 2: Verify the build compiles** Run: `dotnet build src/JdeScoping.Api/JdeScoping.Api.csproj` Expected: Build succeeded **Step 3: Commit** ```bash git add src/JdeScoping.Api/Services/SearchNotificationService.cs git commit -m "feat(api): implement SearchNotificationService using SignalR IHubContext" ``` --- ## Task 11: Fix Hourly Lookback Bug in ScheduleChecker **Files:** - Modify: `src/JdeScoping.DataSync/Services/ScheduleChecker.cs` - Modify: `tests/JdeScoping.DataSync.Tests/ScheduleCheckerTests.cs` **Step 1: Write the failing test** Add this test to `ScheduleCheckerTests.cs`: ```csharp [Fact] public async Task GetPendingTasksAsync_HourlySync_UsesHourlyTimestampForLookback() { // Arrange var massUpdate = new DataUpdate { TableName = "TestTable", UpdateType = UpdateTypes.Mass, EndDt = DateTime.UtcNow.AddDays(-1), WasSuccessful = true }; var dailyUpdate = new DataUpdate { TableName = "TestTable", UpdateType = UpdateTypes.Daily, EndDt = DateTime.UtcNow.AddHours(-2), // Daily was 2 hours ago WasSuccessful = true }; var hourlyUpdate = new DataUpdate { TableName = "TestTable", UpdateType = UpdateTypes.Hourly, EndDt = DateTime.UtcNow.AddMinutes(-90), // Hourly was 90 mins ago WasSuccessful = true }; var updates = new Dictionary { ["TestTable_3"] = massUpdate, ["TestTable_2"] = dailyUpdate, ["TestTable_1"] = hourlyUpdate }; _repository.GetLastDataUpdatesAsync(Arg.Any()) .Returns(updates); var config = CreateDataSourceConfig( hourlyEnabled: true, hourlyInterval: 60, // 1 hour dailyEnabled: true, dailyInterval: 1440); // 24 hours _options.Value.DataSources.Returns([config]); var sut = CreateScheduleChecker(); // Act var tasks = await sut.GetPendingTasksAsync(); // Assert tasks.ShouldHaveSingleItem(); var task = tasks[0]; task.UpdateType.ShouldBe(UpdateTypes.Hourly); // Critical: MinimumDt should be based on HOURLY timestamp (90 mins ago - 3*60 = -270 mins) // NOT daily timestamp (2 hours ago - 3*1440 = -4322 mins) // The lookback should be ~4.5 hours from now, not ~3 days task.MinimumDt.ShouldNotBeNull(); var lookbackMinutes = (DateTime.UtcNow - task.MinimumDt.Value).TotalMinutes; lookbackMinutes.ShouldBeLessThan(300); // Should be < 5 hours, not days } ``` **Step 2: Run test to verify it fails** Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "HourlySync_UsesHourlyTimestampForLookback" -v n` Expected: FAIL (lookback is days instead of hours) **Step 3: Fix the bug in ScheduleChecker** Change lines 110-114 in `ScheduleChecker.cs`: ```csharp // BEFORE (buggy - uses daily values): // Check Hourly (uses Daily's last timestamp for MinimumDT calculation, per legacy behavior) if (config.HourlyConfig.Enabled && NeedsHourlySync(config, lastHourly, lastDaily, lastMass, now)) { // Use daily update timestamp for lookback, not hourly var minimumDt = CalculateMinimumDt(lastDaily, config.DailyConfig.IntervalMinutes); // AFTER (fixed - uses hourly values): // Check Hourly if (config.HourlyConfig.Enabled && NeedsHourlySync(config, lastHourly, lastDaily, lastMass, now)) { // Use hourly update timestamp and interval for lookback var minimumDt = CalculateMinimumDt(lastHourly, config.HourlyConfig.IntervalMinutes); ``` Also update the comment on line 110 to remove the misleading legacy note. **Step 4: Run test to verify it passes** Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "HourlySync_UsesHourlyTimestampForLookback" -v n` Expected: PASS **Step 5: Run all ScheduleChecker tests** Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "ScheduleCheckerTests" -v n` Expected: All PASS **Step 6: Commit** ```bash git add src/JdeScoping.DataSync/Services/ScheduleChecker.cs git add tests/JdeScoping.DataSync.Tests/ScheduleCheckerTests.cs git commit -m "fix(datasync): use hourly timestamp for hourly lookback calculation Fixes legacy bug where hourly sync used daily timestamp and interval for lookback calculation, resulting in 3-day lookback instead of 3 hours." ``` --- ## Task 12: Update DependencyInjection **Files:** - Modify: `src/JdeScoping.DataSync/DependencyInjection.cs` - Modify: `src/JdeScoping.Host/Program.cs` (or wherever services are registered) - Modify: `src/JdeScoping.DataAccess/DependencyInjection.cs` **Step 1: Update DataSync DependencyInjection** Replace the `DataSyncService` registration with `WorkProcessor` and add new services: ```csharp using JdeScoping.DataSync; using JdeScoping.DataSync.Options; using JdeScoping.DataSync.Contracts; using JdeScoping.DataSync.HealthChecks; using JdeScoping.DataSync.Services; using JdeScoping.DataSync.Telemetry; using Microsoft.Extensions.Configuration; namespace Microsoft.Extensions.DependencyInjection; /// /// Extension methods for registering data sync services. /// public static class DataSyncDependencyInjection { /// /// Adds data synchronization services to the service collection. /// public static IServiceCollection AddDataSyncServices( this IServiceCollection services, IConfiguration configuration) { // Bind configuration with validation services.AddOptions() .Bind(configuration.GetSection(DataSyncOptions.SectionName)) .ValidateDataAnnotations() .ValidateOnStart(); services.AddOptions() .Bind(configuration.GetSection(WorkProcessorOptions.SectionName)) .ValidateDataAnnotations() .ValidateOnStart(); // Pipeline configuration (new ETL infrastructure) services.AddOptions() .Bind(configuration.GetSection(PipelineOptions.SectionName)); // Pipeline factory (new ETL infrastructure) services.AddSingleton(); // Register WorkProcessor (replaces DataSyncService) services.AddHostedService(); // Register core services as scoped (for parallel isolation) services.AddScoped(); services.AddScoped(); services.AddScoped(); services.AddScoped(); // Search services (scoped for parallel isolation) services.AddScoped(); services.AddScoped(); // Register health check services.AddHealthChecks() .AddCheck("data-sync", tags: ["datasync", "background"]); // Register metrics as singleton services.AddSingleton(); return services; } } ``` **Step 2: Update DataAccess DependencyInjection to register ISearchProcessor** Add to `src/JdeScoping.DataAccess/DependencyInjection.cs`: ```csharp services.AddScoped(); ``` **Step 3: Update Host/Program.cs to register SearchNotificationService** Add to service registration: ```csharp builder.Services.AddScoped(); ``` **Step 4: Verify the build compiles** Run: `dotnet build` Expected: Build succeeded **Step 5: Commit** ```bash git add src/JdeScoping.DataSync/DependencyInjection.cs git add src/JdeScoping.DataAccess/DependencyInjection.cs git add src/JdeScoping.Host/Program.cs git commit -m "feat: wire up WorkProcessor and search services in DI - Replace DataSyncService with WorkProcessor - Register ISearchRepository, ISearchExecutionService - Register ISearchProcessor in DataAccess - Register ISearchNotificationService in Host" ``` --- ## Task 13: Add Configuration to appsettings **Files:** - Modify: `src/JdeScoping.Host/appsettings.json` - Modify: `src/JdeScoping.Host/appsettings.Development.json` **Step 1: Add WorkProcessor section to appsettings.json** ```json { "WorkProcessor": { "Enabled": true, "WorkInterval": "00:00:05", "SearchTimeout": "00:30:00", "PurgeRetentionDays": 30 } } ``` **Step 2: Add to appsettings.Development.json** ```json { "WorkProcessor": { "Enabled": true, "WorkInterval": "00:00:05", "SearchTimeout": "00:05:00", "PurgeRetentionDays": 7 } } ``` **Step 3: Verify the build compiles and settings load** Run: `dotnet build src/JdeScoping.Host/JdeScoping.Host.csproj` Expected: Build succeeded **Step 4: Commit** ```bash git add src/JdeScoping.Host/appsettings.json git add src/JdeScoping.Host/appsettings.Development.json git commit -m "config: add WorkProcessor configuration section" ``` --- ## Task 14: Remove DataSyncService **Files:** - Delete: `src/JdeScoping.DataSync/DataSyncService.cs` - Modify: `tests/JdeScoping.DataSync.Tests/DataSyncServiceTests.cs` (remove or repurpose) **Step 1: Delete DataSyncService.cs** ```bash rm src/JdeScoping.DataSync/DataSyncService.cs ``` **Step 2: Update or remove DataSyncServiceTests.cs** Either delete the file or rename it to WorkProcessorTests.cs if the tests are useful: ```bash rm tests/JdeScoping.DataSync.Tests/DataSyncServiceTests.cs ``` **Step 3: Verify the build compiles** Run: `dotnet build` Expected: Build succeeded **Step 4: Run all tests** Run: `dotnet test` Expected: All PASS **Step 5: Commit** ```bash git add -A git commit -m "refactor(datasync): remove DataSyncService, replaced by WorkProcessor" ``` --- ## Task 15: Final Integration Test **Files:** - None (verification only) **Step 1: Run all tests** Run: `dotnet test --verbosity normal` Expected: All PASS **Step 2: Build the solution** Run: `dotnet build` Expected: Build succeeded **Step 3: Verify no compiler warnings** Run: `dotnet build --warnaserror` Expected: Build succeeded with no warnings **Step 4: Final commit (if any cleanup needed)** ```bash git status # If any uncommitted changes: git add -A git commit -m "chore: cleanup after WorkProcessor implementation" ``` --- ## Summary This implementation plan covers: 1. **Core Interfaces** (Tasks 1-2): ISearchProcessor, ISearchNotificationService 2. **Options** (Task 3): WorkProcessorOptions configuration 3. **Contracts** (Tasks 4-5): ISearchRepository, ISearchExecutionService 4. **Implementations** (Tasks 6-10): SearchProcessor interface, SearchRepository, SearchExecutionService, WorkProcessor, SearchNotificationService 5. **Bug Fix** (Task 11): Hourly lookback uses correct hourly values 6. **Wiring** (Tasks 12-13): DependencyInjection updates, appsettings configuration 7. **Cleanup** (Task 14): Remove deprecated DataSyncService 8. **Verification** (Task 15): Final integration test All tasks follow TDD with bite-sized steps and include commits at each stage.