WorkProcessor Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Implement the unified WorkProcessor background service that coordinates data synchronization and search processing, replacing the legacy DataSyncService.
Architecture: The WorkProcessor is a BackgroundService in the DataSync layer that prioritizes data freshness over search processing. It uses clean architecture with interfaces in Core, implementations in DataSync/DataAccess, and SignalR notifications in Api.
Tech Stack: .NET 10, BackgroundService, SignalR, Dapper, EPPlus, xUnit, NSubstitute, Shouldly
Task 1: Create ISearchProcessor Interface
Files:
- Create:
src/JdeScoping.Core/Interfaces/ISearchProcessor.cs
Step 1: Create the interface
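// Note: this using assumes SearchModel is visible to the Core project. If the type lives in the
// DataAccess project, move it into Core first: DataAccess references Core (see Task 6), so a
// reference in the other direction would be circular.
using JdeScoping.DataAccess.Models;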
namespace JdeScoping.Core.Interfaces;
/// <summary>
/// Interface for search query execution.
/// </summary>
public interface ISearchProcessor
{
/// <summary>
/// Executes search and materializes all results into SearchModel.
/// </summary>
/// <param name="model">The search model containing search ID.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>The SearchModel populated with results.</returns>
Task<SearchModel> ExecuteSearchToModelAsync(SearchModel model, CancellationToken ct = default);
}
Step 2: Verify the build compiles
Run: dotnet build src/JdeScoping.Core/JdeScoping.Core.csproj
Expected: Build succeeded
Step 3: Commit
git add src/JdeScoping.Core/Interfaces/ISearchProcessor.cs
git commit -m "feat(core): add ISearchProcessor interface for search execution abstraction"
Task 2: Create ISearchNotificationService Interface
Files:
- Create:
src/JdeScoping.Core/Interfaces/ISearchNotificationService.cs
Step 1: Create the interface
using JdeScoping.Core.Models.Search;
namespace JdeScoping.Core.Interfaces;
/// <summary>
/// Interface for SignalR notifications - lives in Core to avoid dependency issues.
/// Implementation lives in Host/Api layer.
/// </summary>
public interface ISearchNotificationService
{
/// <summary>
/// Notifies clients of search status update.
/// </summary>
/// <param name="search">The search with updated status.</param>
/// <param name="ct">Cancellation token.</param>
Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default);
/// <summary>
/// Notifies clients of work processor status change.
/// </summary>
/// <param name="status">Status message.</param>
/// <param name="ct">Cancellation token.</param>
Task NotifyStatusAsync(string status, CancellationToken ct = default);
}
Step 2: Verify the build compiles
Run: dotnet build src/JdeScoping.Core/JdeScoping.Core.csproj
Expected: Build succeeded
Step 3: Commit
git add src/JdeScoping.Core/Interfaces/ISearchNotificationService.cs
git commit -m "feat(core): add ISearchNotificationService interface for SignalR abstraction"
Task 3: Create WorkProcessorOptions
Files:
- Create:
src/JdeScoping.DataSync/Options/WorkProcessorOptions.cs
Step 1: Create the options class
using System.ComponentModel.DataAnnotations;
namespace JdeScoping.DataSync.Options;
/// <summary>
/// Configuration options for the WorkProcessor background service.
/// </summary>
public class WorkProcessorOptions
{
/// <summary>
/// Configuration section name.
/// </summary>
public const string SectionName = "WorkProcessor";
/// <summary>
/// Whether the work processor is enabled. Default: true.
/// </summary>
public bool Enabled { get; set; } = true;
/// <summary>
/// Interval between work cycles. Default: 5 seconds.
/// </summary>
[Range(typeof(TimeSpan), "00:00:01", "01:00:00")]
public TimeSpan WorkInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Search execution timeout. Default: 30 minutes.
/// </summary>
[Range(typeof(TimeSpan), "00:01:00", "04:00:00")]
public TimeSpan SearchTimeout { get; set; } = TimeSpan.FromMinutes(30);
/// <summary>
/// Number of days to retain DataUpdate records. Default: 30.
/// </summary>
[Range(1, 365)]
public int PurgeRetentionDays { get; set; } = 30;
}
Step 2: Verify the build compiles
Run: dotnet build src/JdeScoping.DataSync/JdeScoping.DataSync.csproj
Expected: Build succeeded
Step 3: Commit
git add src/JdeScoping.DataSync/Options/WorkProcessorOptions.cs
git commit -m "feat(datasync): add WorkProcessorOptions configuration class"
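The [Range] attributes on WorkProcessorOptions only take effect when something runs data-annotation validation against the bound instance (in a hosted app this is typically done by calling ValidateDataAnnotations() on the options builder). The following is a minimal sketch of what that validation checks, using only the base class library. DemoWorkProcessorOptions and OptionsValidationSketch are hypothetical stand-ins for illustration and are not part of the plan.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

// Hypothetical stand-in mirroring two properties of WorkProcessorOptions.
public class DemoWorkProcessorOptions
{
    [Range(typeof(TimeSpan), "00:00:01", "01:00:00")]
    public TimeSpan WorkInterval { get; set; } = TimeSpan.FromSeconds(5);

    [Range(1, 365)]
    public int PurgeRetentionDays { get; set; } = 30;
}

public static class OptionsValidationSketch
{
    // Returns the validation failures; an empty list means the options are valid.
    public static List<ValidationResult> Validate(object options)
    {
        var results = new List<ValidationResult>();
        Validator.TryValidateObject(
            options,
            new ValidationContext(options),
            results,
            validateAllProperties: true); // false would check only [Required] members
        return results;
    }

    public static void Main()
    {
        var defaults = new DemoWorkProcessorOptions();
        var outOfRange = new DemoWorkProcessorOptions
        {
            WorkInterval = TimeSpan.Zero,   // below the 1 second minimum
            PurgeRetentionDays = 0          // below the 1 day minimum
        };

        Console.WriteLine($"defaults: {Validate(defaults).Count} error(s)");
        Console.WriteLine($"out of range: {Validate(outOfRange).Count} error(s)");
    }
}
```

Running the validator at startup rather than on first use surfaces a misconfigured WorkInterval or SearchTimeout before the first work cycle begins.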
Task 4: Create ISearchRepository Interface
Files:
- Create:
src/JdeScoping.DataSync/Contracts/ISearchRepository.cs
Step 1: Create the interface
using JdeScoping.Core.Models.Search;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Repository interface for Search table operations.
/// </summary>
public interface ISearchRepository
{
/// <summary>
/// Gets the next queued search ordered by SubmitDT (FIFO).
/// </summary>
/// <param name="ct">Cancellation token.</param>
/// <returns>The next queued search, or null if none.</returns>
Task<Search?> GetNextQueuedSearchAsync(CancellationToken ct = default);
/// <summary>
/// Resets partial searches (Running but not Ended) back to Queued status.
/// Called at service startup to handle interrupted operations.
/// </summary>
/// <param name="ct">Cancellation token.</param>
/// <returns>Count of reset searches.</returns>
Task<int> ResetPartialSearchesAsync(CancellationToken ct = default);
/// <summary>
/// Marks search as Running with StartDT = now.
/// </summary>
/// <param name="searchId">Search ID.</param>
/// <param name="ct">Cancellation token.</param>
Task StartSearchAsync(int searchId, CancellationToken ct = default);
/// <summary>
/// Marks search as Ended (success) or Error (failure) with EndDT and optional Results.
/// </summary>
/// <param name="searchId">Search ID.</param>
/// <param name="success">Whether search completed successfully.</param>
/// <param name="results">Excel file bytes (null on failure).</param>
/// <param name="ct">Cancellation token.</param>
Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default);
}
Step 2: Verify the build compiles
Run: dotnet build src/JdeScoping.DataSync/JdeScoping.DataSync.csproj
Expected: Build succeeded
Step 3: Commit
git add src/JdeScoping.DataSync/Contracts/ISearchRepository.cs
git commit -m "feat(datasync): add ISearchRepository interface for search table operations"
Task 5: Create ISearchExecutionService Interface
Files:
- Create:
src/JdeScoping.DataSync/Contracts/ISearchExecutionService.cs
Step 1: Create the interface
using JdeScoping.Core.Models.Search;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Interface for search execution pipeline orchestration.
/// </summary>
public interface ISearchExecutionService
{
/// <summary>
/// Executes the complete search pipeline: query, Excel generation, result storage.
/// Handles status transitions and notifications.
/// </summary>
/// <param name="search">The search to execute.</param>
/// <param name="ct">Cancellation token.</param>
Task ExecuteSearchAsync(Search search, CancellationToken ct = default);
}
Step 2: Verify the build compiles
Run: dotnet build src/JdeScoping.DataSync/JdeScoping.DataSync.csproj
Expected: Build succeeded
Step 3: Commit
git add src/JdeScoping.DataSync/Contracts/ISearchExecutionService.cs
git commit -m "feat(datasync): add ISearchExecutionService interface for search pipeline"
Task 6: Implement SearchProcessor with ISearchProcessor
Files:
- Modify:
src/JdeScoping.DataAccess/Services/SearchProcessor.cs
Step 1: Update SearchProcessor to implement ISearchProcessor
Add interface implementation to the class declaration:
// Change from:
public sealed class SearchProcessor
// To:
public sealed class SearchProcessor : ISearchProcessor
Also add the using statement at the top:
using JdeScoping.Core.Interfaces;
Step 2: Verify the build compiles
Run: dotnet build src/JdeScoping.DataAccess/JdeScoping.DataAccess.csproj
Expected: Build succeeded
Step 3: Commit
git add src/JdeScoping.DataAccess/Services/SearchProcessor.cs
git commit -m "feat(dataaccess): implement ISearchProcessor interface on SearchProcessor"
Task 7: Implement SearchRepository
Files:
- Create:
src/JdeScoping.DataSync/Services/SearchRepository.cs
- Create:
tests/JdeScoping.DataSync.Tests/Services/SearchRepositoryTests.cs
Step 1: Write the failing tests
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Search;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Services;
using Microsoft.Extensions.Logging;
using NSubstitute;
using Shouldly;
namespace JdeScoping.DataSync.Tests.Services;
public class SearchRepositoryTests
{
[Fact]
public async Task GetNextQueuedSearchAsync_WhenNoneQueued_ReturnsNull()
{
// Arrange
var sut = CreateRepository();
// Act
var result = await sut.GetNextQueuedSearchAsync();
// Assert
result.ShouldBeNull();
}
[Fact]
public async Task ResetPartialSearchesAsync_WhenNoPartialSearches_ReturnsZero()
{
// Arrange
var sut = CreateRepository();
// Act
var count = await sut.ResetPartialSearchesAsync();
// Assert
count.ShouldBe(0);
}
private static SearchRepository CreateRepository()
{
// Note: Actual implementation tests require database integration tests
// These are placeholder tests to verify the interface contract
throw new NotImplementedException("Integration tests required");
}
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchRepositoryTests" -v n
Expected: FAIL (NotImplementedException or missing class)
Step 3: Implement SearchRepository
using Dapper;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Search;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Repository for Search table operations.
/// </summary>
public class SearchRepository : ISearchRepository
{
private readonly IDbConnectionFactory _connectionFactory;
private readonly ILogger<SearchRepository> _logger;
public SearchRepository(
IDbConnectionFactory connectionFactory,
ILogger<SearchRepository> logger)
{
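// Guard clauses: the constructor tests in Step 4 expect ArgumentNullException.
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));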
}
/// <inheritdoc/>
public async Task<Search?> GetNextQueuedSearchAsync(CancellationToken ct = default)
{
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct);
const string sql = """
SELECT TOP 1
Id, UserName, Name, Status, SubmitDT as SubmitDt,
StartDT as StartDt, EndDT as EndDt, CriteriaJSON as CriteriaJson
FROM dbo.Search
WHERE Status = @Status
ORDER BY SubmitDT ASC
""";
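// CommandDefinition passes the cancellation token through to Dapper.
var search = await connection.QueryFirstOrDefaultAsync<Search>(
new CommandDefinition(
sql,
new { Status = (int)SearchStatus.Queued },
commandTimeout: 30,
cancellationToken: ct));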
if (search != null)
{
_logger.LogDebug("Found queued search {SearchId}", search.Id);
}
return search;
}
/// <inheritdoc/>
public async Task<int> ResetPartialSearchesAsync(CancellationToken ct = default)
{
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct);
const string sql = """
UPDATE dbo.Search
SET Status = @QueuedStatus, StartDT = NULL
WHERE Status = @RunningStatus AND EndDT IS NULL
""";
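// CommandDefinition passes the cancellation token through to Dapper.
var count = await connection.ExecuteAsync(
new CommandDefinition(
sql,
new
{
QueuedStatus = (int)SearchStatus.Queued,
RunningStatus = (int)SearchStatus.Running
},
commandTimeout: 30,
cancellationToken: ct));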
if (count > 0)
{
_logger.LogWarning("Reset {Count} partial searches to Queued status", count);
}
return count;
}
/// <inheritdoc/>
public async Task StartSearchAsync(int searchId, CancellationToken ct = default)
{
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct);
const string sql = """
UPDATE dbo.Search
SET Status = @Status, StartDT = @StartDt
WHERE Id = @SearchId
""";
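// CommandDefinition passes the cancellation token through to Dapper.
await connection.ExecuteAsync(
new CommandDefinition(
sql,
new
{
SearchId = searchId,
Status = (int)SearchStatus.Running,
StartDt = DateTime.UtcNow
},
commandTimeout: 30,
cancellationToken: ct));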
_logger.LogDebug("Started search {SearchId}", searchId);
}
/// <inheritdoc/>
public async Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default)
{
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct);
const string sql = """
UPDATE dbo.Search
SET Status = @Status, EndDT = @EndDt, Results = @Results
WHERE Id = @SearchId
""";
var status = success ? SearchStatus.Ended : SearchStatus.Error;
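// CommandDefinition passes the cancellation token through to Dapper.
await connection.ExecuteAsync(
new CommandDefinition(
sql,
new
{
SearchId = searchId,
Status = (int)status,
EndDt = DateTime.UtcNow,
Results = results
},
commandTimeout: 30,
cancellationToken: ct));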
_logger.LogDebug("Completed search {SearchId} with status {Status}", searchId, status);
}
}
Step 4: Replace the placeholder tests with constructor tests that use a mock connection factory
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Search;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Services;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using NSubstitute;
using Shouldly;
namespace JdeScoping.DataSync.Tests.Services;
public class SearchRepositoryTests
{
private readonly IDbConnectionFactory _connectionFactory;
private readonly ILogger<SearchRepository> _logger;
public SearchRepositoryTests()
{
_connectionFactory = Substitute.For<IDbConnectionFactory>();
_logger = Substitute.For<ILogger<SearchRepository>>();
}
[Fact]
public void Constructor_WithNullConnectionFactory_ThrowsArgumentNullException()
{
// Act & Assert
Should.Throw<ArgumentNullException>(() => new SearchRepository(null!, _logger));
}
[Fact]
public void Constructor_WithNullLogger_ThrowsArgumentNullException()
{
// Act & Assert
Should.Throw<ArgumentNullException>(() => new SearchRepository(_connectionFactory, null!));
}
[Fact]
public void Constructor_WithValidDependencies_CreatesInstance()
{
// Act
var repository = new SearchRepository(_connectionFactory, _logger);
// Assert
repository.ShouldNotBeNull();
}
}
Step 5: Run tests to verify they pass
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchRepositoryTests" -v n
Expected: PASS
Step 6: Commit
git add src/JdeScoping.DataSync/Services/SearchRepository.cs
git add tests/JdeScoping.DataSync.Tests/Services/SearchRepositoryTests.cs
git commit -m "feat(datasync): implement SearchRepository with tests"
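The status transitions that SearchRepository applies to dbo.Search (Queued to Running to Ended or Error, plus the startup reset of interrupted rows) can be sketched without a database. The following in-memory model is an illustration only: DemoStatus, DemoSearch, and InMemorySearchQueue are hypothetical names, and the real repository performs these steps with SQL through Dapper.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public enum DemoStatus { Queued, Running, Ended, Error }

// Hypothetical stand-in for a row of the Search table.
public sealed class DemoSearch
{
    public int Id { get; init; }
    public DemoStatus Status { get; set; } = DemoStatus.Queued;
    public DateTime SubmitDt { get; init; }
    public DateTime? StartDt { get; set; }
    public DateTime? EndDt { get; set; }
}

public sealed class InMemorySearchQueue
{
    private readonly List<DemoSearch> _rows = new();

    public void Add(DemoSearch search) => _rows.Add(search);

    // FIFO: the Queued row with the oldest SubmitDt, or null when nothing is queued.
    public DemoSearch? GetNextQueued() =>
        _rows.Where(r => r.Status == DemoStatus.Queued)
             .OrderBy(r => r.SubmitDt)
             .FirstOrDefault();

    public void Start(int id)
    {
        var row = _rows.Single(r => r.Id == id);
        row.Status = DemoStatus.Running;
        row.StartDt = DateTime.UtcNow;
    }

    public void Complete(int id, bool success)
    {
        var row = _rows.Single(r => r.Id == id);
        row.Status = success ? DemoStatus.Ended : DemoStatus.Error;
        row.EndDt = DateTime.UtcNow;
    }

    // Startup recovery: Running rows with no EndDt were interrupted, so requeue them.
    public int ResetPartial()
    {
        var partial = _rows
            .Where(r => r.Status == DemoStatus.Running && r.EndDt is null)
            .ToList();
        foreach (var row in partial)
        {
            row.Status = DemoStatus.Queued;
            row.StartDt = null;
        }
        return partial.Count;
    }
}
```

A model like this can also back fast unit tests for the FIFO and reset rules, leaving the SQL itself to database integration tests.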
Task 8: Implement SearchExecutionService
Files:
- Create:
src/JdeScoping.DataSync/Services/SearchExecutionService.cs
- Create:
tests/JdeScoping.DataSync.Tests/Services/SearchExecutionServiceTests.cs
Step 1: Write the failing tests
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Search;
using JdeScoping.DataAccess.Models;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Services;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using Shouldly;
namespace JdeScoping.DataSync.Tests.Services;
public class SearchExecutionServiceTests
{
private readonly ISearchRepository _searchRepository;
private readonly ISearchProcessor _searchProcessor;
private readonly IExcelExportService _excelExportService;
private readonly ISearchNotificationService _notificationService;
private readonly IOptions<WorkProcessorOptions> _options;
private readonly ILogger<SearchExecutionService> _logger;
public SearchExecutionServiceTests()
{
_searchRepository = Substitute.For<ISearchRepository>();
_searchProcessor = Substitute.For<ISearchProcessor>();
_excelExportService = Substitute.For<IExcelExportService>();
_notificationService = Substitute.For<ISearchNotificationService>();
_options = Options.Create(new WorkProcessorOptions { SearchTimeout = TimeSpan.FromSeconds(30) });
_logger = Substitute.For<ILogger<SearchExecutionService>>();
}
[Fact]
public async Task ExecuteSearchAsync_Success_CompletesWithEndedStatus()
{
// Arrange
var search = new Search { Id = 1, Status = SearchStatus.Queued };
var model = new SearchModel { Id = 1, Results = [] };
var excelBytes = new byte[] { 1, 2, 3 };
_searchProcessor.ExecuteSearchToModelAsync(Arg.Any<SearchModel>(), Arg.Any<CancellationToken>())
.Returns(model);
_excelExportService.GenerateAsync(Arg.Any<SearchModel>(), Arg.Any<CancellationToken>())
.Returns(excelBytes);
var sut = CreateService();
// Act
await sut.ExecuteSearchAsync(search);
// Assert
await _searchRepository.Received(1).StartSearchAsync(1, Arg.Any<CancellationToken>());
await _searchRepository.Received(1).CompleteSearchAsync(1, true, excelBytes, Arg.Any<CancellationToken>());
await _notificationService.Received(2).NotifySearchUpdateAsync(search, Arg.Any<CancellationToken>());
}
[Fact]
public async Task ExecuteSearchAsync_ProcessorThrows_MarksAsError()
{
// Arrange
var search = new Search { Id = 1, Status = SearchStatus.Queued };
_searchProcessor.ExecuteSearchToModelAsync(Arg.Any<SearchModel>(), Arg.Any<CancellationToken>())
.ThrowsAsync(new InvalidOperationException("Test error"));
var sut = CreateService();
// Act
await sut.ExecuteSearchAsync(search);
// Assert
await _searchRepository.Received(1).CompleteSearchAsync(1, false, null, Arg.Any<CancellationToken>());
}
[Fact]
public async Task ExecuteSearchAsync_HostShutdown_DoesNotMarkAsError()
{
// Arrange
var search = new Search { Id = 1, Status = SearchStatus.Queued };
using var cts = new CancellationTokenSource();
_searchProcessor.ExecuteSearchToModelAsync(Arg.Any<SearchModel>(), Arg.Any<CancellationToken>())
.Returns(async _ =>
{
cts.Cancel();
throw new OperationCanceledException(cts.Token);
});
var sut = CreateService();
// Act
await sut.ExecuteSearchAsync(search, cts.Token);
// Assert
await _searchRepository.DidNotReceive().CompleteSearchAsync(
Arg.Any<int>(), false, Arg.Any<byte[]?>(), Arg.Any<CancellationToken>());
}
private SearchExecutionService CreateService()
{
return new SearchExecutionService(
_searchRepository,
_searchProcessor,
_excelExportService,
_notificationService,
_options,
_logger);
}
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchExecutionServiceTests" -v n
Expected: FAIL (missing class)
Step 3: Implement SearchExecutionService
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Search;
using JdeScoping.DataAccess.Models;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Service that orchestrates the complete search execution pipeline.
/// </summary>
public class SearchExecutionService : ISearchExecutionService
{
private readonly ISearchRepository _searchRepository;
private readonly ISearchProcessor _searchProcessor;
private readonly IExcelExportService _excelExportService;
private readonly ISearchNotificationService _notificationService;
private readonly WorkProcessorOptions _options;
private readonly ILogger<SearchExecutionService> _logger;
public SearchExecutionService(
ISearchRepository searchRepository,
ISearchProcessor searchProcessor,
IExcelExportService excelExportService,
ISearchNotificationService notificationService,
IOptions<WorkProcessorOptions> options,
ILogger<SearchExecutionService> logger)
{
_searchRepository = searchRepository ?? throw new ArgumentNullException(nameof(searchRepository));
_searchProcessor = searchProcessor ?? throw new ArgumentNullException(nameof(searchProcessor));
_excelExportService = excelExportService ?? throw new ArgumentNullException(nameof(excelExportService));
_notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async Task ExecuteSearchAsync(Search search, CancellationToken ct = default)
{
using var timeoutCts = new CancellationTokenSource(_options.SearchTimeout);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, timeoutCts.Token);
try
{
// Mark as Running
await _searchRepository.StartSearchAsync(search.Id, linkedCts.Token);
search.Status = SearchStatus.Running;
search.StartDt = DateTime.UtcNow;
await NotifySearchUpdateSafeAsync(search, linkedCts.Token);
// Execute search query; keep the returned model, which the processor populates with results
var model = await _searchProcessor.ExecuteSearchToModelAsync(
new SearchModel { Id = search.Id }, linkedCts.Token);
// Generate Excel
var excelBytes = await _excelExportService.GenerateAsync(model, linkedCts.Token);
// Complete with success
await _searchRepository.CompleteSearchAsync(search.Id, true, excelBytes, linkedCts.Token);
search.Status = SearchStatus.Ended;
search.EndDt = DateTime.UtcNow;
await NotifySearchUpdateSafeAsync(search, linkedCts.Token);
_logger.LogInformation(
"Search {SearchId} completed successfully with {ResultCount} results",
search.Id, model.Results.Count);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Host shutdown - don't mark as error, let startup cleanup handle it
_logger.LogInformation(
"Search {SearchId} interrupted by shutdown, will be requeued on restart",
search.Id);
// Don't rethrow - let the caller handle graceful shutdown
}
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
{
// Timeout - mark as error
_logger.LogWarning(
"Search {SearchId} timed out after {Timeout}",
search.Id, _options.SearchTimeout);
await CompleteWithErrorAsync(search);
}
catch (Exception ex)
{
_logger.LogError(ex, "Search {SearchId} failed", search.Id);
await CompleteWithErrorAsync(search);
}
}
private async Task CompleteWithErrorAsync(Search search)
{
try
{
await _searchRepository.CompleteSearchAsync(search.Id, false, null, CancellationToken.None);
search.Status = SearchStatus.Error;
search.EndDt = DateTime.UtcNow;
await NotifySearchUpdateSafeAsync(search, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to mark search {SearchId} as error", search.Id);
}
}
private async Task NotifySearchUpdateSafeAsync(Search search, CancellationToken ct)
{
try
{
await _notificationService.NotifySearchUpdateAsync(search, ct);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to send search update notification for {SearchId}", search.Id);
}
}
}
Step 4: Run tests to verify they pass
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchExecutionServiceTests" -v n
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataSync/Services/SearchExecutionService.cs
git add tests/JdeScoping.DataSync.Tests/Services/SearchExecutionServiceTests.cs
git commit -m "feat(datasync): implement SearchExecutionService with proper cancellation handling"
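SearchExecutionService distinguishes a host shutdown from a search timeout by linking two cancellation sources and then checking which one fired. The sketch below isolates that technique with only the base class library. CancellationSketch and its string outcomes are hypothetical, chosen for illustration; the real service updates the Search row instead of returning a string.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Runs the work under a timeout linked to the host token and classifies the outcome.
    public static async Task<string> RunAsync(
        Func<CancellationToken, Task> work,
        TimeSpan timeout,
        CancellationToken hostToken)
    {
        using var timeoutCts = new CancellationTokenSource(timeout);
        using var linkedCts =
            CancellationTokenSource.CreateLinkedTokenSource(hostToken, timeoutCts.Token);
        try
        {
            await work(linkedCts.Token);
            return "Ended";
        }
        catch (OperationCanceledException) when (hostToken.IsCancellationRequested)
        {
            // Shutdown wins when both fired: leave the item for startup recovery.
            return "Requeue";
        }
        catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
        {
            return "Error:Timeout";
        }
        catch (Exception)
        {
            return "Error";
        }
    }

    public static async Task Main()
    {
        // Work that never finishes on its own; it ends only when its token is cancelled.
        Func<CancellationToken, Task> hang = token => Task.Delay(Timeout.Infinite, token);

        Console.WriteLine(await RunAsync(
            _ => Task.CompletedTask, TimeSpan.FromSeconds(5), CancellationToken.None));
        Console.WriteLine(await RunAsync(
            hang, TimeSpan.FromMilliseconds(50), CancellationToken.None));

        using var host = new CancellationTokenSource();
        host.Cancel();
        Console.WriteLine(await RunAsync(hang, TimeSpan.FromSeconds(5), host.Token));
    }
}
```

The order of the two filtered catch blocks matters: checking the host token first means a search interrupted by shutdown is never recorded as a timeout error.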
Task 9: Implement WorkProcessor
Files:
- Create:
src/JdeScoping.DataSync/WorkProcessor.cs
- Create:
tests/JdeScoping.DataSync.Tests/WorkProcessorTests.cs
Step 1: Write the failing tests
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Search;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NSubstitute;
using Shouldly;
namespace JdeScoping.DataSync.Tests;
public class WorkProcessorTests
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly IOptions<WorkProcessorOptions> _options;
private readonly ILogger<WorkProcessor> _logger;
private readonly DataSyncMetrics _metrics;
public WorkProcessorTests()
{
_scopeFactory = Substitute.For<IServiceScopeFactory>();
_options = Options.Create(new WorkProcessorOptions
{
Enabled = true,
WorkInterval = TimeSpan.FromMilliseconds(100)
});
_logger = Substitute.For<ILogger<WorkProcessor>>();
_metrics = new DataSyncMetrics();
}
[Fact]
public async Task ExecuteAsync_WhenDisabled_StopsImmediately()
{
// Arrange
var options = Options.Create(new WorkProcessorOptions { Enabled = false });
var sut = new WorkProcessor(_scopeFactory, options, _logger, _metrics);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
// Act
await sut.StartAsync(cts.Token);
await Task.Delay(50);
await sut.StopAsync(CancellationToken.None);
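// Assert - a disabled processor returns before creating any scope, so no work is attempted
_scopeFactory.DidNotReceive().CreateScope();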
}
[Fact]
public async Task ExecuteAsync_CallsStartupCleanup()
{
// Arrange
var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
var searchRepo = Substitute.For<ISearchRepository>();
var scheduleChecker = Substitute.For<IScheduleChecker>();
var notificationService = Substitute.For<ISearchNotificationService>();
scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
.Returns([]);
var scope = SetupScope(dataUpdateRepo, searchRepo, scheduleChecker, notificationService);
// CreateAsyncScope() is an extension method that wraps CreateScope(), so stub the interface method.
_scopeFactory.CreateScope().Returns(scope);
var sut = new WorkProcessor(_scopeFactory, _options, _logger, _metrics);
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
// Act
await sut.StartAsync(cts.Token);
await Task.Delay(150);
await sut.StopAsync(CancellationToken.None);
// Assert
await dataUpdateRepo.Received().CloseOpenUpdateEntriesAsync(Arg.Any<CancellationToken>());
await searchRepo.Received().ResetPartialSearchesAsync(Arg.Any<CancellationToken>());
}
private static AsyncServiceScope SetupScope(
IDataUpdateRepository dataUpdateRepo,
ISearchRepository searchRepo,
IScheduleChecker scheduleChecker,
ISearchNotificationService notificationService)
{
var serviceProvider = Substitute.For<IServiceProvider>();
serviceProvider.GetService(typeof(IDataUpdateRepository)).Returns(dataUpdateRepo);
serviceProvider.GetService(typeof(ISearchRepository)).Returns(searchRepo);
serviceProvider.GetService(typeof(IScheduleChecker)).Returns(scheduleChecker);
serviceProvider.GetService(typeof(ISearchNotificationService)).Returns(notificationService);
var scope = Substitute.For<IServiceScope>();
scope.ServiceProvider.Returns(serviceProvider);
return new AsyncServiceScope(scope);
}
}
Step 2: Run tests to verify they fail
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "WorkProcessorTests" -v n
Expected: FAIL (missing class)
Step 3: Implement WorkProcessor
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models.Search;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace JdeScoping.DataSync;
/// <summary>
/// Unified background service that coordinates data synchronization and search processing.
/// Data freshness takes priority over search processing.
/// </summary>
public class WorkProcessor : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly WorkProcessorOptions _options;
private readonly ILogger<WorkProcessor> _logger;
private readonly DataSyncMetrics _metrics;
private DateTime _lastPurgeCheck = DateTime.MinValue;
private static readonly TimeSpan PurgeCheckInterval = TimeSpan.FromHours(24);
public WorkProcessor(
IServiceScopeFactory scopeFactory,
IOptions<WorkProcessorOptions> options,
ILogger<WorkProcessor> logger,
DataSyncMetrics metrics)
{
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Enabled)
{
_logger.LogInformation("WorkProcessor is disabled");
return;
}
_logger.LogInformation(
"WorkProcessor starting with WorkInterval={WorkInterval}",
_options.WorkInterval);
// Startup cleanup
await StartupCleanupAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
string status = "Idle";
try
{
status = await DoWorkAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("WorkProcessor stopping gracefully");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in work cycle");
_metrics.RecordCycleError();
}
finally
{
// Always notify status (best-effort)
await NotifyStatusSafeAsync(status, stoppingToken);
}
try
{
await Task.Delay(_options.WorkInterval, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
}
await NotifyStatusSafeAsync("Stopped", CancellationToken.None);
_logger.LogInformation("WorkProcessor stopped");
}
private async Task StartupCleanupAsync(CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
// Close interrupted DataUpdate entries
try
{
var dataUpdateRepo = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
var closedCount = await dataUpdateRepo.CloseOpenUpdateEntriesAsync(ct);
if (closedCount > 0)
{
_logger.LogWarning("Closed {Count} interrupted data update entries", closedCount);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to close open data update entries");
}
// Reset partial searches
try
{
var searchRepo = scope.ServiceProvider.GetRequiredService<ISearchRepository>();
var resetCount = await searchRepo.ResetPartialSearchesAsync(ct);
if (resetCount > 0)
{
_logger.LogWarning("Reset {Count} partial searches to Queued", resetCount);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to reset partial searches");
}
}
private async Task<string> DoWorkAsync(CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
// Priority 1: Data syncs
var scheduleChecker = scope.ServiceProvider.GetRequiredService<IScheduleChecker>();
var pendingTasks = await scheduleChecker.GetPendingTasksAsync(ct);
if (pendingTasks.Count > 0)
{
await NotifyStatusSafeAsync("Updating data cache", ct);
var orchestrator = scope.ServiceProvider.GetRequiredService<ISyncOrchestrator>();
await orchestrator.ExecutePendingSyncsAsync(ct);
// Periodic purge check
await PurgeOldEntriesAsync(scope, ct);
return "Idle";
}
// Priority 2: Search processing (only when syncs are current)
var searchRepository = scope.ServiceProvider.GetRequiredService<ISearchRepository>();
var search = await searchRepository.GetNextQueuedSearchAsync(ct);
if (search != null)
{
await NotifyStatusSafeAsync($"Processing search #{search.Id}", ct);
var executionService = scope.ServiceProvider.GetRequiredService<ISearchExecutionService>();
await executionService.ExecuteSearchAsync(search, ct);
}
// Periodic purge check
await PurgeOldEntriesAsync(scope, ct);
return "Idle";
}
private async Task PurgeOldEntriesAsync(AsyncServiceScope scope, CancellationToken ct)
{
if (DateTime.UtcNow - _lastPurgeCheck < PurgeCheckInterval)
{
return;
}
_lastPurgeCheck = DateTime.UtcNow;
try
{
var repository = scope.ServiceProvider.GetRequiredService<IDataUpdateRepository>();
var purgedCount = await repository.PurgeOldEntriesAsync(
_options.PurgeRetentionDays, ct);
if (purgedCount > 0)
{
_logger.LogInformation(
"Purged {Count} DataUpdate records older than {Days} days",
purgedCount, _options.PurgeRetentionDays);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to purge old data update entries");
}
}
private async Task NotifyStatusSafeAsync(string status, CancellationToken ct)
{
try
{
await using var scope = _scopeFactory.CreateAsyncScope();
var notificationService = scope.ServiceProvider.GetRequiredService<ISearchNotificationService>();
await notificationService.NotifyStatusAsync(status, ct);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to send status notification");
// Best-effort - don't throw
}
}
}
Step 4: Run tests to verify they pass
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "WorkProcessorTests" -v n
Expected: PASS
Step 5: Commit
git add src/JdeScoping.DataSync/WorkProcessor.cs
git add tests/JdeScoping.DataSync.Tests/WorkProcessorTests.cs
git commit -m "feat(datasync): implement WorkProcessor background service"
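The priority rule inside DoWorkAsync (pending data syncs first, a queued search only when no sync is pending, otherwise idle) can be expressed as a small pure function. This is a sketch for illustration: WorkPrioritySketch and NextAction are hypothetical names, and the returned strings are the status messages the WorkProcessor sends while working.

```csharp
using System;

public static class WorkPrioritySketch
{
    // Returns the status a work cycle would announce for the given queue state.
    public static string NextAction(int pendingSyncCount, int? queuedSearchId)
    {
        // Priority 1: data freshness. Any pending sync defers search processing.
        if (pendingSyncCount > 0)
        {
            return "Updating data cache";
        }

        // Priority 2: the oldest queued search, only when syncs are current.
        if (queuedSearchId is int id)
        {
            return $"Processing search #{id}";
        }

        return "Idle";
    }

    public static void Main()
    {
        Console.WriteLine(NextAction(2, 7));    // a sync is pending, so search 7 waits
        Console.WriteLine(NextAction(0, 7));    // syncs are current, so search 7 runs
        Console.WriteLine(NextAction(0, null)); // nothing to do
    }
}
```

Because a cycle handles at most one kind of work, a queued search waits at least one WorkInterval whenever a sync is pending.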
Task 10: Implement SearchNotificationService
Files:
- Create:
src/JdeScoping.Api/Services/SearchNotificationService.cs
Step 1: Create the implementation
using JdeScoping.Api.Hubs;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models.Infrastructure;
using JdeScoping.Core.Models.Search;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
namespace JdeScoping.Api.Services;
/// <summary>
/// SignalR-based implementation of search notification service.
/// </summary>
public class SearchNotificationService : ISearchNotificationService
{
private readonly IHubContext<StatusHub> _hubContext;
private readonly ILogger<SearchNotificationService> _logger;
public SearchNotificationService(
IHubContext<StatusHub> hubContext,
ILogger<SearchNotificationService> logger)
{
_hubContext = hubContext ?? throw new ArgumentNullException(nameof(hubContext));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default)
{
try
{
var update = new SearchUpdate
{
Id = search.Id,
Status = search.Status,
StartDt = search.StartDt,
EndDt = search.EndDt
};
await _hubContext.Clients.All.SendAsync("searchUpdate", update, ct);
_logger.LogDebug("Search update sent for {SearchId}: {Status}", search.Id, search.Status);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to send search update notification for {SearchId}", search.Id);
}
}
/// <inheritdoc/>
public async Task NotifyStatusAsync(string status, CancellationToken ct = default)
{
try
{
var update = new StatusUpdate(status);
await _hubContext.Clients.All.SendAsync("statusUpdate", update, ct);
_logger.LogDebug("Status update sent: {Status}", status);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to send status notification: {Status}", status);
}
}
}
Step 2: Verify the build compiles
Run: dotnet build src/JdeScoping.Api/JdeScoping.Api.csproj
Expected: Build succeeded
Step 3: Commit
git add src/JdeScoping.Api/Services/SearchNotificationService.cs
git commit -m "feat(api): implement SearchNotificationService using SignalR IHubContext"
Task 11: Fix Hourly Lookback Bug in ScheduleChecker
Files:
- Modify:
src/JdeScoping.DataSync/Services/ScheduleChecker.cs
- Modify:
tests/JdeScoping.DataSync.Tests/ScheduleCheckerTests.cs
Step 1: Write the failing test
Add this test to ScheduleCheckerTests.cs:
[Fact]
public async Task GetPendingTasksAsync_HourlySync_UsesHourlyTimestampForLookback()
{
// Arrange
var massUpdate = new DataUpdate
{
TableName = "TestTable",
UpdateType = UpdateTypes.Mass,
EndDt = DateTime.UtcNow.AddDays(-1),
WasSuccessful = true
};
var dailyUpdate = new DataUpdate
{
TableName = "TestTable",
UpdateType = UpdateTypes.Daily,
EndDt = DateTime.UtcNow.AddHours(-2), // Daily was 2 hours ago
WasSuccessful = true
};
var hourlyUpdate = new DataUpdate
{
TableName = "TestTable",
UpdateType = UpdateTypes.Hourly,
EndDt = DateTime.UtcNow.AddMinutes(-90), // Hourly was 90 mins ago
WasSuccessful = true
};
var updates = new Dictionary<string, DataUpdate>
{
["TestTable_3"] = massUpdate,
["TestTable_2"] = dailyUpdate,
["TestTable_1"] = hourlyUpdate
};
_repository.GetLastDataUpdatesAsync(Arg.Any<CancellationToken>())
.Returns(updates);
var config = CreateDataSourceConfig(
hourlyEnabled: true,
hourlyInterval: 60, // 1 hour
dailyEnabled: true,
dailyInterval: 1440); // 24 hours
_options.Value.DataSources.Returns([config]);
var sut = CreateScheduleChecker();
// Act
var tasks = await sut.GetPendingTasksAsync();
// Assert
tasks.ShouldHaveSingleItem();
var task = tasks[0];
task.UpdateType.ShouldBe(UpdateTypes.Hourly);
// Critical: MinimumDt should be based on the HOURLY timestamp (90 mins ago - 3*60 mins = 270 mins ago),
// NOT the daily timestamp (2 hours ago - 3*1440 mins = 4440 mins ago).
// The lookback should be ~4.5 hours from now, not ~3 days.
task.MinimumDt.ShouldNotBeNull();
var lookbackMinutes = (DateTime.UtcNow - task.MinimumDt.Value).TotalMinutes;
lookbackMinutes.ShouldBeLessThan(300); // Should be < 5 hours, not days
}
Step 2: Run test to verify it fails
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "HourlySync_UsesHourlyTimestampForLookback" -v n
Expected: FAIL (lookback is days instead of hours)
Step 3: Fix the bug in ScheduleChecker
Change lines 110-114 in ScheduleChecker.cs:
// BEFORE (buggy - uses daily values):
// Check Hourly (uses Daily's last timestamp for MinimumDT calculation, per legacy behavior)
if (config.HourlyConfig.Enabled && NeedsHourlySync(config, lastHourly, lastDaily, lastMass, now))
{
// Use daily update timestamp for lookback, not hourly
var minimumDt = CalculateMinimumDt(lastDaily, config.DailyConfig.IntervalMinutes);
// AFTER (fixed - uses hourly values):
// Check Hourly
if (config.HourlyConfig.Enabled && NeedsHourlySync(config, lastHourly, lastDaily, lastMass, now))
{
// Use hourly update timestamp and interval for lookback
var minimumDt = CalculateMinimumDt(lastHourly, config.HourlyConfig.IntervalMinutes);
Also update the comment on line 110 to remove the misleading legacy note.
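The arithmetic behind the fix can be checked in isolation. The sketch below assumes the lookback is the last successful `EndDt` minus three intervals, which is what the test comments imply; `SketchMinimumDt` is a hypothetical stand-in, and the real `CalculateMinimumDt` may handle nulls or use a different multiplier.

```csharp
using System;

// Hypothetical stand-in for CalculateMinimumDt.
// Assumption: lookback = last EndDt minus (multiplier * interval), multiplier 3.
static DateTime SketchMinimumDt(DateTime lastEndDt, int intervalMinutes, int multiplier = 3)
    => lastEndDt.AddMinutes(-multiplier * intervalMinutes);

// Fixed reference time so the numbers are reproducible.
var now = new DateTime(2025, 1, 1, 12, 0, 0, DateTimeKind.Utc);
var lastHourly = now.AddMinutes(-90); // same offsets as the test above
var lastDaily = now.AddHours(-2);

// Fixed behavior: hourly timestamp with hourly interval (90 + 3*60 = 270 minutes).
var fixedLookback = (now - SketchMinimumDt(lastHourly, 60)).TotalMinutes;

// Buggy behavior: daily timestamp with daily interval (120 + 3*1440 = 4440 minutes).
var buggyLookback = (now - SketchMinimumDt(lastDaily, 1440)).TotalMinutes;

Console.WriteLine($"fixed: {fixedLookback} min, buggy: {buggyLookback} min");
```

Under these assumptions the fixed lookback stays below the 300-minute bound asserted in the test, while the buggy one is roughly three days.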
Step 4: Run test to verify it passes
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "HourlySync_UsesHourlyTimestampForLookback" -v n
Expected: PASS
Step 5: Run all ScheduleChecker tests
Run: dotnet test tests/JdeScoping.DataSync.Tests --filter "ScheduleCheckerTests" -v n
Expected: All PASS
Step 6: Commit
git add src/JdeScoping.DataSync/Services/ScheduleChecker.cs
git add tests/JdeScoping.DataSync.Tests/ScheduleCheckerTests.cs
git commit -m "fix(datasync): use hourly timestamp for hourly lookback calculation
Fixes legacy bug where hourly sync used daily timestamp and interval
for lookback calculation, resulting in 3-day lookback instead of 3 hours."
Task 12: Update DependencyInjection
Files:
- Modify:
src/JdeScoping.DataSync/DependencyInjection.cs
- Modify:
src/JdeScoping.Host/Program.cs (or wherever services are registered)
- Modify:
src/JdeScoping.DataAccess/DependencyInjection.cs
Step 1: Update DataSync DependencyInjection
Replace the DataSyncService registration with WorkProcessor and add new services:
using JdeScoping.DataSync;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.HealthChecks;
using JdeScoping.DataSync.Services;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.Configuration;
namespace Microsoft.Extensions.DependencyInjection;
/// <summary>
/// Extension methods for registering data sync services.
/// </summary>
public static class DataSyncDependencyInjection
{
/// <summary>
/// Adds data synchronization services to the service collection.
/// </summary>
public static IServiceCollection AddDataSyncServices(
this IServiceCollection services,
IConfiguration configuration)
{
// Bind configuration with validation
services.AddOptions<DataSyncOptions>()
.Bind(configuration.GetSection(DataSyncOptions.SectionName))
.ValidateDataAnnotations()
.ValidateOnStart();
services.AddOptions<WorkProcessorOptions>()
.Bind(configuration.GetSection(WorkProcessorOptions.SectionName))
.ValidateDataAnnotations()
.ValidateOnStart();
// Pipeline configuration (new ETL infrastructure)
services.AddOptions<PipelineOptions>()
.Bind(configuration.GetSection(PipelineOptions.SectionName));
// Pipeline factory (new ETL infrastructure)
services.AddSingleton<IEtlPipelineFactory, EtlPipelineFactory>();
// Register WorkProcessor (replaces DataSyncService)
services.AddHostedService<WorkProcessor>();
// Register core services as scoped (for parallel isolation)
services.AddScoped<ISyncOrchestrator, SyncOrchestrator>();
services.AddScoped<IScheduleChecker, ScheduleChecker>();
services.AddScoped<ITableSyncOperation, TableSyncOperation>();
services.AddScoped<IDataUpdateRepository, DataUpdateRepository>();
// Search services (scoped for parallel isolation)
services.AddScoped<ISearchRepository, SearchRepository>();
services.AddScoped<ISearchExecutionService, SearchExecutionService>();
// Register health check
services.AddHealthChecks()
.AddCheck<DataSyncHealthCheck>("data-sync", tags: ["datasync", "background"]);
// Register metrics as singleton
services.AddSingleton<DataSyncMetrics>();
return services;
}
}
Step 2: Update DataAccess DependencyInjection to register ISearchProcessor
Add to src/JdeScoping.DataAccess/DependencyInjection.cs:
services.AddScoped<ISearchProcessor, SearchProcessor>();
Step 3: Update Host/Program.cs to register SearchNotificationService
Add to service registration:
builder.Services.AddScoped<ISearchNotificationService, SearchNotificationService>();
Step 4: Verify the build compiles
Run: dotnet build
Expected: Build succeeded
Step 5: Commit
git add src/JdeScoping.DataSync/DependencyInjection.cs
git add src/JdeScoping.DataAccess/DependencyInjection.cs
git add src/JdeScoping.Host/Program.cs
git commit -m "feat: wire up WorkProcessor and search services in DI
- Replace DataSyncService with WorkProcessor
- Register ISearchRepository, ISearchExecutionService
- Register ISearchProcessor in DataAccess
- Register ISearchNotificationService in Host"
Task 13: Add Configuration to appsettings
Files:
- Modify:
src/JdeScoping.Host/appsettings.json
- Modify:
src/JdeScoping.Host/appsettings.Development.json
Step 1: Add WorkProcessor section to appsettings.json
{
"WorkProcessor": {
"Enabled": true,
"WorkInterval": "00:00:05",
"SearchTimeout": "00:30:00",
"PurgeRetentionDays": 30
}
}
Step 2: Add to appsettings.Development.json
{
"WorkProcessor": {
"Enabled": true,
"WorkInterval": "00:00:05",
"SearchTimeout": "00:05:00",
"PurgeRetentionDays": 7
}
}
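The interval values in both files are `hh:mm:ss` strings. Assuming `WorkProcessorOptions` declares `WorkInterval` and `SearchTimeout` as `TimeSpan` properties (the class is defined in an earlier task, not in this section), the values are expected to parse the same way `TimeSpan.Parse` reads them. A quick standalone check:

```csharp
using System;
using System.Globalization;

// The same strings used in appsettings.json above.
var workInterval = TimeSpan.Parse("00:00:05", CultureInfo.InvariantCulture);
var searchTimeout = TimeSpan.Parse("00:30:00", CultureInfo.InvariantCulture);

Console.WriteLine(workInterval.TotalSeconds);
Console.WriteLine(searchTimeout.TotalMinutes);

// Pitfall: a bare number is read as DAYS, not seconds.
var bareNumber = TimeSpan.Parse("5", CultureInfo.InvariantCulture);
Console.WriteLine(bareNumber.TotalDays);
```

Writing the full `hh:mm:ss` form, as the plan does, avoids the bare-number pitfall.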
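Step 3: Verify the build compiles
Run: dotnet build src/JdeScoping.Host/JdeScoping.Host.csproj
Expected: Build succeeded
Note: The build does not read appsettings.json. The WorkProcessor section is bound and checked by ValidateOnStart (registered in Task 12) when the host starts.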
Step 4: Commit
git add src/JdeScoping.Host/appsettings.json
git add src/JdeScoping.Host/appsettings.Development.json
git commit -m "config: add WorkProcessor configuration section"
Task 14: Remove DataSyncService
Files:
- Delete:
src/JdeScoping.DataSync/DataSyncService.cs
- Modify:
tests/JdeScoping.DataSync.Tests/DataSyncServiceTests.cs (remove or repurpose)
Step 1: Delete DataSyncService.cs
rm src/JdeScoping.DataSync/DataSyncService.cs
Step 2: Update or remove DataSyncServiceTests.cs
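WorkProcessorTests.cs already exists (created in Task 9), so the file cannot simply be renamed. If any of its tests are still useful, move them into WorkProcessorTests.cs first; then delete the file: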
rm tests/JdeScoping.DataSync.Tests/DataSyncServiceTests.cs
Step 3: Verify the build compiles
Run: dotnet build
Expected: Build succeeded
Step 4: Run all tests
Run: dotnet test
Expected: All PASS
Step 5: Commit
git add -A
git commit -m "refactor(datasync): remove DataSyncService, replaced by WorkProcessor"
Task 15: Final Integration Test
Files:
- None (verification only)
Step 1: Run all tests
Run: dotnet test --verbosity normal
Expected: All PASS
Step 2: Build the solution
Run: dotnet build
Expected: Build succeeded
Step 3: Verify no compiler warnings
Run: dotnet build --no-incremental --warnaserror
Expected: Build succeeded with no warnings
Step 4: Final commit (if any cleanup needed)
git status
# If any uncommitted changes:
git add -A
git commit -m "chore: cleanup after WorkProcessor implementation"
Summary
This implementation plan covers:
- Core Interfaces (Tasks 1-2): ISearchProcessor, ISearchNotificationService
- Options (Task 3): WorkProcessorOptions configuration
- Contracts (Tasks 4-5): ISearchRepository, ISearchExecutionService
- Implementations (Tasks 6-10): SearchProcessor (implementing ISearchProcessor), SearchRepository, SearchExecutionService, WorkProcessor, SearchNotificationService
- Bug Fix (Task 11): Hourly lookback uses correct hourly values
- Wiring (Tasks 12-13): DependencyInjection updates, appsettings configuration
- Cleanup (Task 14): Remove deprecated DataSyncService
- Verification (Task 15): Final integration test
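Tasks with testable behavior follow TDD (failing test first). Every task is split into bite-sized steps and ends with a commit; the commit in Task 15 is needed only if cleanup changes remain.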