diff --git a/DATA_SYNC/WorkProcessorReport.md b/DATA_SYNC/WorkProcessorReport.md new file mode 100644 index 0000000..fc519b9 --- /dev/null +++ b/DATA_SYNC/WorkProcessorReport.md @@ -0,0 +1,407 @@ +# Work Processor Report + +This document describes the background work processor from the legacy JDE Scoping Tool (LotFinder) WorkerService. The work processor is responsible for coordinating data synchronization and search processing. + +## Executive Summary + +The `WorkProcessor` class is the main orchestration component that runs as a Windows service (via Topshelf). It implements a continuous polling loop that: + +1. Checks if any data syncs are overdue +2. If overdue syncs exist → performs data updates in parallel +3. If all syncs are current → processes the next queued search +4. Sleeps for 5 seconds and repeats + +**Key Design Principle:** Data freshness takes priority over search processing. Searches are only processed when all data caches are up to date. + +--- + +## Architecture Overview + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ WorkProcessor │ +│ (Topshelf Service) │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ ┌───────────────┐ ┌──────────────┐ │ +│ │ Start() │────▶│ DoWork() │────▶│ Stop() │ │ +│ └──────────────┘ └───────┬───────┘ └──────────────┘ │ +│ │ │ +│ ┌─────────────────┼─────────────────┐ │ +│ ▼ ▼ ▼ │ +│ ┌──────────────────┐ ┌───────────┐ ┌─────────────────┐ │ +│ │ UpdateProcessor │ │ Search │ │ SignalR Hub │ │ +│ │ (Data Sync) │ │ Processor │ │ (Status Update) │ │ +│ └──────────────────┘ └───────────┘ └─────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Source Files + +| File | Purpose | +|------|---------| +| `OLD/WorkerService/Process/WorkProcessor.cs` | Main orchestration loop | +| `OLD/WorkerService/Process/UpdateProcessor.cs` | Data sync coordination | +| `OLD/WorkerService/Process/UpdateProcessor.DataUpdateEntry.cs` | DataUpdate table logging | +| `OLD/WorkerService/Process/UpdateProcessor.TableManagement.cs` | Staging/merge SQL generation | +| `OLD/WorkerService/Process/LotFinderDBExt.cs` | Search query execution | +| `OLD/WorkerService/Process/ExcelWriter.cs` | Excel result generation | +| `OLD/WorkerService/Program.cs` | Topshelf service configuration | + +--- + +## Main Work Loop + +### Constants + +| Constant | Value | Description | +|----------|-------|-------------| +| `WAIT_INTERVAL` | 5000 ms (5 seconds) | Sleep duration between work cycles | + +### Loop Structure + +``` +Start() + └── Thread spawned + └── while(true) + ├── DoWork() + └── cancel.WaitOne(WAIT_INTERVAL) + └── if signaled → break +``` + +The loop runs indefinitely until `Stop()` is called, which sets the `cancel` ManualResetEvent. + +--- + +## DoWork() Algorithm + +The `DoWork()` method implements the following decision flow: + +``` +DoWork() +│ +├─▶ GetPendingUpdateTasks() +│ └── Returns list of overdue data syncs +│ +├─▶ IF pending updates exist: +│ ├── Status = "Updating data cache" +│ ├── Parallel.ForEach(pending, MaxDegreeOfParallelism=8) +│ │ └── UpdateProcessor.DoUpdate(task) +│ └── RETURN (skip search processing this cycle) +│ +├─▶ ELSE (all data syncs current): +│ │ +│ ├── ResetPartialSearches() +│ │ └── Reset any searches that started but never completed +│ │ +│ ├── GetNextSearch() +│ │ └── Get oldest queued search (Status=1, ORDER BY SubmitDT) +│ │ +│ └── IF search found: +│ ├── Status = "Processing search #{ID}" +│ ├── StartSearch(search) +│ │ └── Status → 2 (Started), set StartDT +│ ├── PublishSearchUpdate(search) → SignalR +│ │ +│ ├── Execute search query +│ ├── Generate Excel results +│ ├── Save debug file: search_{ID}.xlsx +│ │ +│ ├── CompleteSearch(search, success=true) +│ │ └── Status → 3 (Ended), store Results +│ └── PublishSearchUpdate(search) → SignalR +│ +│ ON EXCEPTION: +│ ├── Log error +│ ├── CompleteSearch(search, success=false) +│ │ └── Status → 4 (Error) +│ └── PublishSearchUpdate(search) → SignalR +│ +└── Status = "Idle" +``` + +--- + +## Data Sync Scheduling + +### How "Overdue" Is Determined + +The `GetPendingUpdateTasks()` method checks each configured data source against the `LastDataUpdates` view: + +```sql +-- LastDataUpdates view returns the most recent successful update per table/type +SELECT TableName, + COALESCE([3], '1970-01-01') AS MassUpdateDT, -- UpdateType 3 = Mass + COALESCE([2], '1970-01-01') AS DailyUpdateDT, -- UpdateType 2 = Daily + COALESCE([1], '1970-01-01') AS HourlyUpdateDT -- UpdateType 1 = Hourly +FROM (pivot query on DataUpdate table) +``` + +### Priority Order (Exclusive) + +Only ONE update type is selected per data source per cycle: + +| Priority | Condition | Update Type | +|----------|-----------|-------------| +| 1 | No record exists OR `Now > MassUpdateDT + MassInterval` | **Mass** (full reload) | +| 2 | `Now > DailyUpdateDT + DailyInterval` | **Daily** (incremental) | +| 3 | `Now > HourlyUpdateDT + HourlyInterval` | **Hourly** (incremental) | + +### Lookback Multiplier + +For incremental updates (Daily/Hourly), the query uses a **lookback of 3× the interval**: + +```csharp +// Daily update example +MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * DailyInterval); +// If DailyInterval = 1440 (24 hours), lookback = 3 days + +// Hourly update example (note: uses Daily values - see bug below) +MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * DailyInterval); +``` + +**Purpose:** The 3× multiplier provides overlap to catch any records that may have been missed due to timing edge cases or delays. + +> **⚠️ Legacy Bug:** In `GetPendingUpdateTasks()`, the Hourly update incorrectly uses `DailyUpdateDT` and `DailyUpdateConfig.Interval` instead of the hourly equivalents. This means hourly updates use a 3-day lookback instead of 3 hours. The separate `DoUpdate(tableName, updateType)` overload correctly uses hourly values, but this path is not called from the main work loop. + +### Parallel Execution + +Data updates run in parallel with controlled concurrency: + +```csharp +Parallel.ForEach(pending, + new ParallelOptions { MaxDegreeOfParallelism = 8 }, + task => UpdateProcessor.DoUpdate(task)); +``` + +--- + +## Data Update Process + +Each `DoUpdate(task)` execution: + +1. **Log Start**: Insert `DataUpdate` record with `NumberRecords = -2` (in-progress marker) +2. **Prepurge** (if configured): `TRUNCATE TABLE {destination}` +3. **Fetch Data**: Call the configured data fetch function (JDE/CMS query) +4. **Batch Processing** (1M records per batch): + - Create staging table (`#Staging{table}`) - local temp table + - Create index on staging table (PK columns + LastUpdateDT/ReleaseDate) + - Disable non-PK indexes on staging table + - Bulk copy data to staging (batch size: 10,000) + - Rebuild indexes on staging table + - Create temp table (`#{table}`) with deduplication via `ROW_NUMBER() OVER (PARTITION BY {PK} ORDER BY LastUpdateDT)` + - MERGE to destination table (only updates when `TARGET.LastUpdateDT < SOURCE.LastUpdateDT`) +5. **Post-Processing**: Run configured action (e.g., `PostProcessMisData`) +6. **Reindex** (if configured): Rebuild all non-PK indexes on destination +7. **Log End**: Update `DataUpdate` record with `WasSuccessful`, `NumberRecords` + +### In-Progress Detection + +Records with `NumberRecords = -2` indicate updates that started but never completed (crash/restart). + +> **Note:** The codebase contains a `CloseOpenUpdateEntries()` method to clean up these records, but it is **never called** in the current implementation. This means crashed updates remain with `NumberRecords = -2` indefinitely. The cleanup SQL exists but is unused: + +```sql +-- Defined but never invoked +UPDATE dbo.DataUpdate +SET EndDT = GETDATE(), WasSuccessful = 0, NumberRecords = -1 +WHERE NumberRecords = -2 +``` + +--- + +## Search Processing + +### Search Status Values + +| Value | Enum | Description | +|-------|------|-------------| +| 0 | `New` | Created but not submitted | +| 1 | `Submitted` | Queued for processing | +| 2 | `Started` | Currently being processed | +| 3 | `Ended` | Completed successfully | +| 4 | `Error` | Failed with error | + +### Partial Search Recovery + +Before processing new searches, the system resets any "orphaned" searches: + +```sql +-- ResetPartialSearches stored procedure +UPDATE dbo.Search +SET Status = 1, StartDT = NULL +WHERE StartDT IS NOT NULL AND EndDT IS NULL +``` + +This handles cases where the service crashed mid-search. + +### Search Execution Flow + +1. **GetNextSearch**: Query for oldest `Status = 1` search +2. **StartSearch**: Update to `Status = 2`, set `StartDT` +3. **Execute Query**: Run the generated SQL against local cache +4. **Generate Excel**: Create Excel file using EPPlus +5. **CompleteSearch**: Update `Status = 3` (or 4 on error), store `Results` blob + +--- + +## SignalR Integration + +### Status Updates + +The `Status` property publishes changes to the SignalR hub: + +```csharp +public string Status +{ + set + { + if (!string.Equals(_status, value)) + { + _status = value; + logger.Info("Status: {0}", _status); + PublishStatus(_status); // → SignalR + } + } +} +``` + +### Hub Methods + +| Method | Payload | When Called | +|--------|---------|-------------| +| `SetStatus` | `{ Message, Timestamp }` | On status property change | +| `PublishSearchUpdate` | `SearchUpdate(search)` | After StartSearch, CompleteSearch | + +### Connection Pattern + +Each publish creates a new `HubConnection`: + +```csharp +string hubHost = ConfigurationManager.AppSettings["HubHost"]; +using (HubConnection hubConnection = new HubConnection(hubHost)) +{ + IHubProxy hubProxy = hubConnection.CreateHubProxy("StatusHub"); + await hubConnection.Start(); + await hubProxy.Invoke("SetStatus", update); +} +``` + +**Note:** This is an `async void` fire-and-forget pattern. Failures are logged but don't interrupt processing. + +--- + +## Timing Summary + +| Event | Timing | +|-------|--------| +| Work loop interval | 5 seconds | +| Mass sync check | Every ~7 days (10080 minutes) | +| Daily sync check | Every ~24 hours (1440 minutes) | +| Hourly sync check | Every ~1 hour (60 minutes) | +| Query lookback (incremental) | 3× the interval | +| Data update parallelism | Up to 8 concurrent | +| Bulk copy batch size | 10,000 rows | +| Data batch grouping | 1,000,000 rows | + +--- + +## Database Tables + +### DataUpdate + +Tracks all data sync operations: + +```sql +CREATE TABLE dbo.DataUpdate ( + ID INT IDENTITY PRIMARY KEY, + SourceSystem VARCHAR(50), -- 'JDE', 'CMS' + SourceData VARCHAR(50), -- 'WORKORDER', 'LOTUSAGE', etc. + TableName VARCHAR(50), -- Destination table name + StartDT DATETIME, + EndDT DATETIME, + UpdateType SMALLINT, -- 1=Hourly, 2=Daily, 3=Mass + WasSuccessful BIT, + NumberRecords BIGINT -- -2=in-progress, -1=crashed, N=count +) +``` + +### Search + +Tracks search requests and results: + +```sql +CREATE TABLE dbo.Search ( + ID INT IDENTITY PRIMARY KEY, + UserName VARCHAR(128), + Name VARCHAR(128), + Status SMALLINT, -- 0-4 (see SearchStatus enum) + SubmitDT DATETIME, + StartDT DATETIME, + EndDT DATETIME, + Criteria VARCHAR(MAX), -- JSON serialized search criteria + Results VARBINARY(MAX) -- Excel file bytes +) +``` + +--- + +## Error Handling + +| Scenario | Behavior | +|----------|----------| +| Data update failure | Logged, continues with next update | +| Search execution failure | Status set to Error (4), logged | +| SignalR publish failure | Logged, processing continues | +| Top-level DoWork exception | Logged, swallowed, loop continues | +| Service crash during update | `NumberRecords = -2` cleaned up on restart | +| Service crash during search | `ResetPartialSearches()` requeues on restart | + +--- + +## Configuration + +### App.config Settings + +| Key | Purpose | +|-----|---------| +| `HubHost` | SignalR hub URL for status updates | +| `querytimeout` | SQL query timeout (default: 600 seconds) | + +### dsconfig/*.json + +Each data source has a JSON configuration file defining: +- Source system and data identifiers +- Destination table name +- Data fetch function reference +- Schedule configurations (Mass/Daily/Hourly) + +See `DataSyncReport.md` for complete configuration details. + +--- + +## Known Issues in Legacy Implementation + +| Issue | Severity | Description | Location | +|-------|----------|-------------|----------| +| Hourly lookback bug | Medium | `GetPendingUpdateTasks()` uses `DailyUpdateDT` and `DailyInterval` for hourly updates instead of hourly equivalents. Results in 3-day lookback instead of 3 hours. | `UpdateProcessor.cs:101-103` | +| Unused cleanup code | Low | `CloseOpenUpdateEntries()` is defined but never called. Crashed updates (`NumberRecords = -2`) are never cleaned up. | `UpdateProcessor.DataUpdateEntry.cs:28` | +| Fire-and-forget SignalR | Low | `PublishStatus` and `PublishSearchUpdate` are `async void` methods. Failures don't propagate and new connections are created per-call. | `WorkProcessor.cs:61-107` | + +--- + +## Key Differences from NEW Implementation + +| Aspect | OLD (Legacy) | NEW (.NET 10) | +|--------|--------------|---------------| +| Hosting | Topshelf Windows Service | .NET BackgroundService | +| Loop pattern | Manual thread + sleep | IHostedService + Timer | +| Parallelism | `Parallel.ForEach` | Configurable via options | +| SignalR | Legacy ASP.NET SignalR | ASP.NET Core SignalR | +| Configuration | JSON files + App.config | `appsettings.json` + `pipelines.json` | +| Scheduling | Interval-based polling | Same pattern, configurable defaults | +| Hourly lookback | Uses daily values (bug) | Should use correct hourly values | +| Cleanup on start | Not implemented | Should clean up orphaned updates | diff --git a/PLANS/2026-01-07-work-processor-design.md b/PLANS/2026-01-07-work-processor-design.md new file mode 100644 index 0000000..f294c39 --- /dev/null +++ b/PLANS/2026-01-07-work-processor-design.md @@ -0,0 +1,717 @@ +# WorkProcessor Design + +## Overview + +The WorkProcessor is a unified background service that coordinates both data synchronization and search processing for the JDE Scoping Tool. It implements the legacy work loop pattern where data freshness takes priority over search processing. + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ WorkProcessor │ +│ (BackgroundService) │ +├─────────────────────────────────────────────────────────────────────┤ +│ │ +│ ExecuteAsync (main loop) │ +│ ├── Startup: CloseOpenUpdateEntries + ResetPartialSearches │ +│ │ │ +│ └── while (!cancelled): │ +│ ├── DoWorkAsync() │ +│ │ ├── Check pending data syncs (IScheduleChecker) │ +│ │ │ └── If any → ISyncOrchestrator.ExecutePendingSyncsAsync│ +│ │ │ │ +│ │ └── If no pending syncs: │ +│ │ ├── ISearchRepository.GetNextQueuedSearchAsync() │ +│ │ └── ISearchExecutionService.ExecuteSearchAsync() │ +│ │ │ +│ ├── Periodic: PurgeOldDataUpdateEntries │ +│ └── Task.Delay(WorkInterval) │ +│ │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +## Design Principles + +1. **Clean Architecture**: Dependencies flow inward; infrastructure depends on abstractions +2. **Single Responsibility**: WorkProcessor only coordinates; actual work delegated to specialized services +3. **Priority-Based Processing**: Data syncs always take precedence over search processing +4. **Graceful Recovery**: Handles interrupted operations from prior runs at startup +5. **Observable**: Status notifications via injected abstraction + +## Layer Boundaries + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ JdeScoping.Host / JdeScoping.Api (Presentation Layer) │ +│ ├── StatusHub (SignalR) │ +│ └── SearchNotificationService : ISearchNotificationService │ +├─────────────────────────────────────────────────────────────────────┤ +│ JdeScoping.DataSync (Application/Infrastructure Layer) │ +│ ├── WorkProcessor : BackgroundService │ +│ ├── SearchExecutionService : ISearchExecutionService │ +│ ├── SearchRepository : ISearchRepository │ +│ └── Uses: ISyncOrchestrator, IScheduleChecker, IDataUpdateRepository│ +├─────────────────────────────────────────────────────────────────────┤ +│ JdeScoping.Core (Domain/Contracts Layer) │ +│ ├── ISearchNotificationService (interface only) │ +│ ├── ISearchProcessor (interface only) │ +│ ├── IExcelExportService (interface only - existing) │ +│ └── Models: Search, SearchStatus, SearchUpdate, StatusUpdate │ +├─────────────────────────────────────────────────────────────────────┤ +│ JdeScoping.DataAccess (Infrastructure Layer) │ +│ └── SearchProcessor : ISearchProcessor │ +├─────────────────────────────────────────────────────────────────────┤ +│ JdeScoping.ExcelIO (Infrastructure Layer) │ +│ └── ExcelExportService : IExcelExportService │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +**Key Boundary Rule**: DataSync does NOT reference Api. SignalR notification implementation lives in Host/Api; DataSync only depends on `ISearchNotificationService` interface from Core. + +## Components + +### WorkProcessor (BackgroundService) + +Main orchestration service implementing the work loop. + +```csharp +public class WorkProcessor : BackgroundService +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly IOptions _options; + private readonly ILogger _logger; + private readonly DataSyncMetrics _metrics; + + private DateTime _lastPurgeCheck = DateTime.MinValue; + private static readonly TimeSpan PurgeCheckInterval = TimeSpan.FromHours(24); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + if (!_options.Value.Enabled) + { + _logger.LogInformation("WorkProcessor is disabled"); + return; + } + + _logger.LogInformation( + "WorkProcessor starting with WorkInterval={WorkInterval}", + _options.Value.WorkInterval); + + // Startup cleanup + await StartupCleanupAsync(stoppingToken); + + while (!stoppingToken.IsCancellationRequested) + { + string status = "Idle"; + try + { + status = await DoWorkAsync(stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + _logger.LogInformation("WorkProcessor stopping gracefully"); + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in work cycle"); + _metrics.RecordCycleError(); + } + finally + { + // Always notify status (best-effort) + await NotifyStatusSafeAsync(status, stoppingToken); + } + + try + { + await Task.Delay(_options.Value.WorkInterval, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + } + + await NotifyStatusSafeAsync("Stopped", CancellationToken.None); + _logger.LogInformation("WorkProcessor stopped"); + } + + private async Task StartupCleanupAsync(CancellationToken ct) + { + await using var scope = _scopeFactory.CreateAsyncScope(); + + // Close interrupted DataUpdate entries + try + { + var dataUpdateRepo = scope.ServiceProvider.GetRequiredService(); + var closedCount = await dataUpdateRepo.CloseOpenUpdateEntriesAsync(ct); + if (closedCount > 0) + { + _logger.LogWarning("Closed {Count} interrupted data update entries", closedCount); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to close open data update entries"); + } + + // Reset partial searches + try + { + var searchRepo = scope.ServiceProvider.GetRequiredService(); + var resetCount = await searchRepo.ResetPartialSearchesAsync(ct); + if (resetCount > 0) + { + _logger.LogWarning("Reset {Count} partial searches to Queued", resetCount); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to reset partial searches"); + } + } + + private async Task DoWorkAsync(CancellationToken ct) + { + await using var scope = _scopeFactory.CreateAsyncScope(); + + // Priority 1: Data syncs + var scheduleChecker = scope.ServiceProvider.GetRequiredService(); + var pendingTasks = await scheduleChecker.GetPendingTasksAsync(ct); + + if (pendingTasks.Count > 0) + { + await NotifyStatusSafeAsync("Updating data cache", ct); + + var orchestrator = scope.ServiceProvider.GetRequiredService(); + await orchestrator.ExecutePendingSyncsAsync(ct); + + // Periodic purge check + await PurgeOldEntriesAsync(scope, ct); + + return "Idle"; + } + + // Priority 2: Search processing (only when syncs are current) + var searchRepository = scope.ServiceProvider.GetRequiredService(); + var search = await searchRepository.GetNextQueuedSearchAsync(ct); + + if (search != null) + { + await NotifyStatusSafeAsync($"Processing search #{search.Id}", ct); + + var executionService = scope.ServiceProvider.GetRequiredService(); + await executionService.ExecuteSearchAsync(search, ct); + } + + // Periodic purge check + await PurgeOldEntriesAsync(scope, ct); + + return "Idle"; + } + + private async Task PurgeOldEntriesAsync(AsyncServiceScope scope, CancellationToken ct) + { + if (DateTime.UtcNow - _lastPurgeCheck < PurgeCheckInterval) + { + return; + } + + _lastPurgeCheck = DateTime.UtcNow; + + try + { + var repository = scope.ServiceProvider.GetRequiredService(); + var purgedCount = await repository.PurgeOldEntriesAsync( + _options.Value.PurgeRetentionDays, ct); + + if (purgedCount > 0) + { + _logger.LogInformation( + "Purged {Count} DataUpdate records older than {Days} days", + purgedCount, _options.Value.PurgeRetentionDays); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to purge old data update entries"); + } + } + + private async Task NotifyStatusSafeAsync(string status, CancellationToken ct) + { + try + { + await using var scope = _scopeFactory.CreateAsyncScope(); + var notificationService = scope.ServiceProvider.GetRequiredService(); + await notificationService.NotifyStatusAsync(status, ct); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send status notification"); + // Best-effort - don't throw + } + } +} +``` + +### WorkProcessorOptions + +Configuration for the work processor. + +```csharp +public class WorkProcessorOptions +{ + public const string SectionName = "WorkProcessor"; + + /// + /// Whether the work processor is enabled. Default: true. + /// + public bool Enabled { get; set; } = true; + + /// + /// Interval between work cycles. Default: 5 seconds. + /// + public TimeSpan WorkInterval { get; set; } = TimeSpan.FromSeconds(5); + + /// + /// Search execution timeout. Default: 30 minutes. + /// + public TimeSpan SearchTimeout { get; set; } = TimeSpan.FromMinutes(30); + + /// + /// Number of days to retain DataUpdate records. Default: 30. + /// + public int PurgeRetentionDays { get; set; } = 30; +} +``` + +### ISearchRepository + +Interface for search table operations (in JdeScoping.DataSync.Contracts). + +```csharp +public interface ISearchRepository +{ + /// + /// Gets the next queued search ordered by SubmitDT (FIFO). + /// + Task GetNextQueuedSearchAsync(CancellationToken ct = default); + + /// + /// Resets partial searches (Running but not Ended) back to Queued status. + /// Returns count of reset searches. + /// + Task ResetPartialSearchesAsync(CancellationToken ct = default); + + /// + /// Marks search as Running with StartDT = now. + /// + Task StartSearchAsync(int searchId, CancellationToken ct = default); + + /// + /// Marks search as Ended (success) or Error (failure) with EndDT and optional Results. + /// + Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default); +} +``` + +### ISearchProcessor (NEW - in JdeScoping.Core.Interfaces) + +Interface for search query execution. + +```csharp +public interface ISearchProcessor +{ + /// + /// Executes search and materializes all results into SearchModel. + /// + Task ExecuteSearchToModelAsync(SearchModel model, CancellationToken ct = default); +} +``` + +### ISearchExecutionService + +Interface for search execution pipeline (in JdeScoping.DataSync.Contracts). + +```csharp +public interface ISearchExecutionService +{ + /// + /// Executes the complete search pipeline: query, Excel generation, result storage. + /// Handles status transitions and notifications. + /// + Task ExecuteSearchAsync(Search search, CancellationToken ct = default); +} +``` + +### SearchExecutionService + +Implementation with proper cancellation handling. + +```csharp +public class SearchExecutionService : ISearchExecutionService +{ + private readonly ISearchRepository _searchRepository; + private readonly ISearchProcessor _searchProcessor; + private readonly IExcelExportService _excelExportService; + private readonly ISearchNotificationService _notificationService; + private readonly IOptions _options; + private readonly ILogger _logger; + + public async Task ExecuteSearchAsync(Search search, CancellationToken ct = default) + { + using var timeoutCts = new CancellationTokenSource(_options.Value.SearchTimeout); + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, timeoutCts.Token); + + try + { + // Mark as Running + await _searchRepository.StartSearchAsync(search.Id, linkedCts.Token); + search.Status = SearchStatus.Running; + search.StartDT = DateTime.UtcNow; + await _notificationService.NotifySearchUpdateAsync(search, linkedCts.Token); + + // Execute search query + var model = new SearchModel { Id = search.Id }; + await _searchProcessor.ExecuteSearchToModelAsync(model, linkedCts.Token); + + // Generate Excel + var excelBytes = await _excelExportService.GenerateAsync(model, linkedCts.Token); + + // Complete with success + await _searchRepository.CompleteSearchAsync(search.Id, true, excelBytes, linkedCts.Token); + search.Status = SearchStatus.Ended; + search.EndDT = DateTime.UtcNow; + await _notificationService.NotifySearchUpdateAsync(search, linkedCts.Token); + + _logger.LogInformation( + "Search {SearchId} completed successfully with {ResultCount} results", + search.Id, model.Results.Count); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // Host shutdown - don't mark as error, let startup cleanup handle it + _logger.LogInformation( + "Search {SearchId} interrupted by shutdown, will be requeued on restart", + search.Id); + // Don't rethrow - let the caller handle graceful shutdown + } + catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested) + { + // Timeout - mark as error + _logger.LogWarning( + "Search {SearchId} timed out after {Timeout}", + search.Id, _options.Value.SearchTimeout); + await CompleteWithErrorAsync(search, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogError(ex, "Search {SearchId} failed", search.Id); + await CompleteWithErrorAsync(search, CancellationToken.None); + } + } + + private async Task CompleteWithErrorAsync(Search search, CancellationToken ct) + { + try + { + await _searchRepository.CompleteSearchAsync(search.Id, false, null, ct); + search.Status = SearchStatus.Error; + search.EndDT = DateTime.UtcNow; + await _notificationService.NotifySearchUpdateAsync(search, ct); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to mark search {SearchId} as error", search.Id); + } + } +} +``` + +### ISearchNotificationService (in JdeScoping.Core.Interfaces) + +Interface for SignalR notifications - lives in Core to avoid dependency issues. + +```csharp +public interface ISearchNotificationService +{ + /// + /// Notifies clients of search status update. + /// + Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default); + + /// + /// Notifies clients of work processor status change. + /// + Task NotifyStatusAsync(string status, CancellationToken ct = default); +} +``` + +### SearchNotificationService (in JdeScoping.Host or JdeScoping.Api) + +Implementation using ASP.NET Core SignalR - lives in presentation layer. + +```csharp +public class SearchNotificationService : ISearchNotificationService +{ + private readonly IHubContext _hubContext; + private readonly ILogger _logger; + + public SearchNotificationService( + IHubContext hubContext, + ILogger logger) + { + _hubContext = hubContext; + _logger = logger; + } + + public async Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default) + { + try + { + var update = new SearchUpdate + { + Id = search.Id, + Status = search.Status, + StartDT = search.StartDT, + EndDT = search.EndDT + }; + + await _hubContext.Clients.All.SendAsync("SearchUpdated", update, ct); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send search update notification"); + } + } + + public async Task NotifyStatusAsync(string status, CancellationToken ct = default) + { + try + { + var update = new StatusUpdate + { + Message = status, + Timestamp = DateTime.UtcNow + }; + + await _hubContext.Clients.All.SendAsync("StatusUpdated", update, ct); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send status notification"); + } + } +} +``` + +## Configuration + +### appsettings.json + +```json +{ + "WorkProcessor": { + "Enabled": true, + "WorkInterval": "00:00:05", + "SearchTimeout": "00:30:00", + "PurgeRetentionDays": 30 + }, + "DataSync": { + "MaxDegreeOfParallelism": 8, + "LookbackMultiplier": 3, + "SyncTimeoutSeconds": 3600 + } +} +``` + +## Dependency Injection + +### In JdeScoping.DataSync/DependencyInjection.cs + +```csharp +public static IServiceCollection AddDataSyncServices( + this IServiceCollection services, + IConfiguration configuration) +{ + // Options + services.Configure( + configuration.GetSection(WorkProcessorOptions.SectionName)); + services.Configure( + configuration.GetSection(DataSyncOptions.SectionName)); + + // Existing DataSync services... + services.AddSingleton(); + services.AddSingleton(); + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + + // Search services (scoped for parallel isolation) + services.AddScoped(); + services.AddScoped(); + + // Background service + services.AddHostedService(); + + return services; +} +``` + +### In JdeScoping.Host/Program.cs (or similar) + +```csharp +// Register SignalR notification service (presentation layer implementation) +builder.Services.AddScoped(); +``` + +## Legacy Bug Fixes + +### Bug 1: Hourly Lookback Uses Daily Values + +**Legacy Issue**: In `ScheduleChecker.CheckConfigSchedule()` lines 110-114, hourly sync uses `lastDaily` and `config.DailyConfig.IntervalMinutes` for lookback calculation. + +**Fix Required**: Update ScheduleChecker to use hourly values: + +```csharp +// BEFORE (buggy): +var minimumDt = CalculateMinimumDt(lastDaily, config.DailyConfig.IntervalMinutes); + +// AFTER (fixed): +var minimumDt = CalculateMinimumDt(lastHourly, config.HourlyConfig.IntervalMinutes); +``` + +### Bug 2: Unused Cleanup Code + +**Legacy Issue**: `CloseOpenUpdateEntries()` was defined but never called. + +**Fix**: WorkProcessor calls cleanup at startup via `StartupCleanupAsync()`: +- `IDataUpdateRepository.CloseOpenUpdateEntriesAsync()` for DataUpdate table +- `ISearchRepository.ResetPartialSearchesAsync()` for Search table + +### Bug 3: Fire-and-Forget SignalR + +**Legacy Issue**: `async void` methods with per-call `HubConnection` instances. + +**Fix**: +- Use proper async/await with injected `IHubContext` +- Interface in Core, implementation in Host/Api +- Error handling logs but doesn't throw (best-effort notifications) +- No Status property setter with side effects + +## File Structure + +### New Files + +``` +src/JdeScoping.Core/Interfaces/ +├── ISearchProcessor.cs # Interface for search execution +└── ISearchNotificationService.cs # Interface for SignalR notifications + +src/JdeScoping.Core/Models/ +└── StatusUpdate.cs # DTO for status notifications (if not existing) + +src/JdeScoping.DataSync/ +├── WorkProcessor.cs # Main BackgroundService +├── Options/ +│ └── WorkProcessorOptions.cs # Configuration options +├── Contracts/ +│ ├── ISearchRepository.cs # Search table operations +│ └── ISearchExecutionService.cs # Search pipeline orchestration +└── Services/ + ├── SearchRepository.cs # ISearchRepository impl + └── SearchExecutionService.cs # ISearchExecutionService impl + +src/JdeScoping.Host/ (or JdeScoping.Api/) +└── Services/ + └── SearchNotificationService.cs # ISearchNotificationService impl + +tests/JdeScoping.DataSync.Tests/ +├── WorkProcessorTests.cs # Main service tests +└── Services/ + ├── SearchRepositoryTests.cs + └── SearchExecutionServiceTests.cs +``` + +### Modified Files + +``` +src/JdeScoping.Core/Interfaces/ +└── IExcelExportService.cs # Existing - update signature if needed + +src/JdeScoping.DataSync/ +├── DependencyInjection.cs # Add WorkProcessor, remove DataSyncService +├── Options/DataSyncOptions.cs # Remove Enabled (moved to WorkProcessor) +└── Services/ScheduleChecker.cs # Fix hourly lookback bug + +src/JdeScoping.DataAccess/ +└── Services/SearchProcessor.cs # Implement ISearchProcessor interface + +src/JdeScoping.Host/ +├── Program.cs # Register SearchNotificationService +├── appsettings.json # Add WorkProcessor section +└── appsettings.Development.json # Add WorkProcessor section +``` + +### Removed Files + +``` +src/JdeScoping.DataSync/ +└── DataSyncService.cs # Replaced by WorkProcessor +``` + +## Test Coverage + +### WorkProcessorTests + +- Service starts and stops gracefully +- Disabled via configuration skips processing +- Data sync priority: syncs block search processing +- Search processing: executes when no pending syncs +- Startup cleanup: calls CloseOpenUpdateEntries and ResetPartialSearches +- Purge: runs periodically every 24 hours +- Error handling: continues loop on exceptions +- Cancellation: responds to stopping token + +### SearchRepositoryTests + +- GetNextQueuedSearchAsync returns oldest queued search +- GetNextQueuedSearchAsync returns null when none queued +- ResetPartialSearchesAsync resets Running searches to Queued +- StartSearchAsync updates status and StartDT +- CompleteSearchAsync success path (Ended status, results stored) +- CompleteSearchAsync failure path (Error status, no results) + +### SearchExecutionServiceTests + +- ExecuteSearchAsync success: full pipeline completion +- ExecuteSearchAsync failure: error status set +- ExecuteSearchAsync timeout: handled gracefully, marked as error +- ExecuteSearchAsync shutdown: not marked as error, left for requeue +- Notifications sent on status changes +- Excel generation called with search results + +### ScheduleCheckerTests (additional) + +- Hourly lookback uses hourly timestamp and interval (bug fix verification) + +## Atomic Search Claim (Concurrency Note) + +If the system may run multiple instances, `GetNextQueuedSearchAsync` should atomically claim the search: + +```sql +WITH NextSearch AS ( + SELECT TOP 1 * + FROM dbo.Search + WHERE Status = 1 -- Queued + ORDER BY SubmitDT +) +UPDATE NextSearch +SET Status = 2, StartDT = GETUTCDATE() +OUTPUT INSERTED.*; +``` + +This ensures no two instances process the same search. The current design assumes single-instance; update if multi-instance is required. diff --git a/PLANS/2026-01-07-work-processor-implementation.md b/PLANS/2026-01-07-work-processor-implementation.md new file mode 100644 index 0000000..d892925 --- /dev/null +++ b/PLANS/2026-01-07-work-processor-implementation.md @@ -0,0 +1,1628 @@ +# WorkProcessor Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Implement the unified WorkProcessor background service that coordinates data synchronization and search processing, replacing the legacy DataSyncService. + +**Architecture:** The WorkProcessor is a BackgroundService in the DataSync layer that prioritizes data freshness over search processing. It uses clean architecture with interfaces in Core, implementations in DataSync/DataAccess, and SignalR notifications in Api. + +**Tech Stack:** .NET 10, BackgroundService, SignalR, Dapper, EPPlus, xUnit, NSubstitute, Shouldly + +--- + +## Task 1: Create ISearchProcessor Interface + +**Files:** +- Create: `src/JdeScoping.Core/Interfaces/ISearchProcessor.cs` + +**Step 1: Create the interface** + +```csharp +using JdeScoping.DataAccess.Models; + +namespace JdeScoping.Core.Interfaces; + +/// +/// Interface for search query execution. +/// +public interface ISearchProcessor +{ + /// + /// Executes search and materializes all results into SearchModel. + /// + /// The search model containing search ID. + /// Cancellation token. + /// The SearchModel populated with results. + Task ExecuteSearchToModelAsync(SearchModel model, CancellationToken ct = default); +} +``` + +**Step 2: Verify the build compiles** + +Run: `dotnet build src/JdeScoping.Core/JdeScoping.Core.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +```bash +git add src/JdeScoping.Core/Interfaces/ISearchProcessor.cs +git commit -m "feat(core): add ISearchProcessor interface for search execution abstraction" +``` + +--- + +## Task 2: Create ISearchNotificationService Interface + +**Files:** +- Create: `src/JdeScoping.Core/Interfaces/ISearchNotificationService.cs` + +**Step 1: Create the interface** + +```csharp +using JdeScoping.Core.Models.Search; + +namespace JdeScoping.Core.Interfaces; + +/// +/// Interface for SignalR notifications - lives in Core to avoid dependency issues. +/// Implementation lives in Host/Api layer. +/// +public interface ISearchNotificationService +{ + /// + /// Notifies clients of search status update. + /// + /// The search with updated status. + /// Cancellation token. + Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default); + + /// + /// Notifies clients of work processor status change. + /// + /// Status message. + /// Cancellation token. + Task NotifyStatusAsync(string status, CancellationToken ct = default); +} +``` + +**Step 2: Verify the build compiles** + +Run: `dotnet build src/JdeScoping.Core/JdeScoping.Core.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +```bash +git add src/JdeScoping.Core/Interfaces/ISearchNotificationService.cs +git commit -m "feat(core): add ISearchNotificationService interface for SignalR abstraction" +``` + +--- + +## Task 3: Create WorkProcessorOptions + +**Files:** +- Create: `src/JdeScoping.DataSync/Options/WorkProcessorOptions.cs` + +**Step 1: Create the options class** + +```csharp +using System.ComponentModel.DataAnnotations; + +namespace JdeScoping.DataSync.Options; + +/// +/// Configuration options for the WorkProcessor background service. +/// +public class WorkProcessorOptions +{ + /// + /// Configuration section name. + /// + public const string SectionName = "WorkProcessor"; + + /// + /// Whether the work processor is enabled. Default: true. + /// + public bool Enabled { get; set; } = true; + + /// + /// Interval between work cycles. Default: 5 seconds. + /// + [Range(typeof(TimeSpan), "00:00:01", "01:00:00")] + public TimeSpan WorkInterval { get; set; } = TimeSpan.FromSeconds(5); + + /// + /// Search execution timeout. Default: 30 minutes. + /// + [Range(typeof(TimeSpan), "00:01:00", "04:00:00")] + public TimeSpan SearchTimeout { get; set; } = TimeSpan.FromMinutes(30); + + /// + /// Number of days to retain DataUpdate records. Default: 30. + /// + [Range(1, 365)] + public int PurgeRetentionDays { get; set; } = 30; +} +``` + +**Step 2: Verify the build compiles** + +Run: `dotnet build src/JdeScoping.DataSync/JdeScoping.DataSync.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +```bash +git add src/JdeScoping.DataSync/Options/WorkProcessorOptions.cs +git commit -m "feat(datasync): add WorkProcessorOptions configuration class" +``` + +--- + +## Task 4: Create ISearchRepository Interface + +**Files:** +- Create: `src/JdeScoping.DataSync/Contracts/ISearchRepository.cs` + +**Step 1: Create the interface** + +```csharp +using JdeScoping.Core.Models.Search; + +namespace JdeScoping.DataSync.Contracts; + +/// +/// Repository interface for Search table operations. +/// +public interface ISearchRepository +{ + /// + /// Gets the next queued search ordered by SubmitDT (FIFO). + /// + /// Cancellation token. + /// The next queued search, or null if none. + Task GetNextQueuedSearchAsync(CancellationToken ct = default); + + /// + /// Resets partial searches (Running but not Ended) back to Queued status. + /// Called at service startup to handle interrupted operations. + /// + /// Cancellation token. + /// Count of reset searches. + Task ResetPartialSearchesAsync(CancellationToken ct = default); + + /// + /// Marks search as Running with StartDT = now. + /// + /// Search ID. + /// Cancellation token. + Task StartSearchAsync(int searchId, CancellationToken ct = default); + + /// + /// Marks search as Ended (success) or Error (failure) with EndDT and optional Results. + /// + /// Search ID. + /// Whether search completed successfully. + /// Excel file bytes (null on failure). + /// Cancellation token. + Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default); +} +``` + +**Step 2: Verify the build compiles** + +Run: `dotnet build src/JdeScoping.DataSync/JdeScoping.DataSync.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +```bash +git add src/JdeScoping.DataSync/Contracts/ISearchRepository.cs +git commit -m "feat(datasync): add ISearchRepository interface for search table operations" +``` + +--- + +## Task 5: Create ISearchExecutionService Interface + +**Files:** +- Create: `src/JdeScoping.DataSync/Contracts/ISearchExecutionService.cs` + +**Step 1: Create the interface** + +```csharp +using JdeScoping.Core.Models.Search; + +namespace JdeScoping.DataSync.Contracts; + +/// +/// Interface for search execution pipeline orchestration. +/// +public interface ISearchExecutionService +{ + /// + /// Executes the complete search pipeline: query, Excel generation, result storage. + /// Handles status transitions and notifications. + /// + /// The search to execute. + /// Cancellation token. + Task ExecuteSearchAsync(Search search, CancellationToken ct = default); +} +``` + +**Step 2: Verify the build compiles** + +Run: `dotnet build src/JdeScoping.DataSync/JdeScoping.DataSync.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +```bash +git add src/JdeScoping.DataSync/Contracts/ISearchExecutionService.cs +git commit -m "feat(datasync): add ISearchExecutionService interface for search pipeline" +``` + +--- + +## Task 6: Implement SearchProcessor with ISearchProcessor + +**Files:** +- Modify: `src/JdeScoping.DataAccess/Services/SearchProcessor.cs` + +**Step 1: Update SearchProcessor to implement ISearchProcessor** + +Add interface implementation to the class declaration: + +```csharp +// Change from: +public sealed class SearchProcessor + +// To: +public sealed class SearchProcessor : ISearchProcessor +``` + +Also add the using statement at the top: + +```csharp +using JdeScoping.Core.Interfaces; +``` + +**Step 2: Verify the build compiles** + +Run: `dotnet build src/JdeScoping.DataAccess/JdeScoping.DataAccess.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +```bash +git add src/JdeScoping.DataAccess/Services/SearchProcessor.cs +git commit -m "feat(dataaccess): implement ISearchProcessor interface on SearchProcessor" +``` + +--- + +## Task 7: Implement SearchRepository + +**Files:** +- Create: `src/JdeScoping.DataSync/Services/SearchRepository.cs` +- Create: `tests/JdeScoping.DataSync.Tests/Services/SearchRepositoryTests.cs` + +**Step 1: Write the failing tests** + +```csharp +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Services; +using Microsoft.Extensions.Logging; +using NSubstitute; +using Shouldly; + +namespace JdeScoping.DataSync.Tests.Services; + +public class SearchRepositoryTests +{ + [Fact] + public async Task GetNextQueuedSearchAsync_WhenNoneQueued_ReturnsNull() + { + // Arrange + var sut = CreateRepository(); + + // Act + var result = await sut.GetNextQueuedSearchAsync(); + + // Assert + result.ShouldBeNull(); + } + + [Fact] + public async Task ResetPartialSearchesAsync_WhenNoPartialSearches_ReturnsZero() + { + // Arrange + var sut = CreateRepository(); + + // Act + var count = await sut.ResetPartialSearchesAsync(); + + // Assert + count.ShouldBe(0); + } + + private static SearchRepository CreateRepository() + { + // Note: Actual implementation tests require database integration tests + // These are placeholder tests to verify the interface contract + throw new NotImplementedException("Integration tests required"); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchRepositoryTests" -v n` +Expected: FAIL (NotImplementedException or missing class) + +**Step 3: Implement SearchRepository** + +```csharp +using Dapper; +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Contracts; +using Microsoft.Extensions.Logging; + +namespace JdeScoping.DataSync.Services; + +/// +/// Repository for Search table operations. +/// +public class SearchRepository : ISearchRepository +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly ILogger _logger; + + public SearchRepository( + IDbConnectionFactory connectionFactory, + ILogger logger) + { + _connectionFactory = connectionFactory; + _logger = logger; + } + + /// + public async Task GetNextQueuedSearchAsync(CancellationToken ct = default) + { + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); + + const string sql = """ + SELECT TOP 1 + Id, UserName, Name, Status, SubmitDT as SubmitDt, + StartDT as StartDt, EndDT as EndDt, CriteriaJSON as CriteriaJson + FROM dbo.Search + WHERE Status = @Status + ORDER BY SubmitDT ASC + """; + + var search = await connection.QueryFirstOrDefaultAsync( + sql, + new { Status = (int)SearchStatus.Queued }, + commandTimeout: 30); + + if (search != null) + { + _logger.LogDebug("Found queued search {SearchId}", search.Id); + } + + return search; + } + + /// + public async Task ResetPartialSearchesAsync(CancellationToken ct = default) + { + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); + + const string sql = """ + UPDATE dbo.Search + SET Status = @QueuedStatus, StartDT = NULL + WHERE Status = @RunningStatus AND EndDT IS NULL + """; + + var count = await connection.ExecuteAsync( + sql, + new + { + QueuedStatus = (int)SearchStatus.Queued, + RunningStatus = (int)SearchStatus.Running + }, + commandTimeout: 30); + + if (count > 0) + { + _logger.LogWarning("Reset {Count} partial searches to Queued status", count); + } + + return count; + } + + /// + public async Task StartSearchAsync(int searchId, CancellationToken ct = default) + { + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); + + const string sql = """ + UPDATE dbo.Search + SET Status = @Status, StartDT = @StartDt + WHERE Id = @SearchId + """; + + await connection.ExecuteAsync( + sql, + new + { + SearchId = searchId, + Status = (int)SearchStatus.Running, + StartDt = DateTime.UtcNow + }, + commandTimeout: 30); + + _logger.LogDebug("Started search {SearchId}", searchId); + } + + /// + public async Task CompleteSearchAsync(int searchId, bool success, byte[]? results, CancellationToken ct = default) + { + await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(ct); + + const string sql = """ + UPDATE dbo.Search + SET Status = @Status, EndDT = @EndDt, Results = @Results + WHERE Id = @SearchId + """; + + var status = success ? SearchStatus.Ended : SearchStatus.Error; + + await connection.ExecuteAsync( + sql, + new + { + SearchId = searchId, + Status = (int)status, + EndDt = DateTime.UtcNow, + Results = results + }, + commandTimeout: 30); + + _logger.LogDebug("Completed search {SearchId} with status {Status}", searchId, status); + } +} +``` + +**Step 4: Update tests to use mock connection factory** + +```csharp +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Services; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using NSubstitute; +using Shouldly; + +namespace JdeScoping.DataSync.Tests.Services; + +public class SearchRepositoryTests +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly ILogger _logger; + + public SearchRepositoryTests() + { + _connectionFactory = Substitute.For(); + _logger = Substitute.For>(); + } + + [Fact] + public void Constructor_WithNullConnectionFactory_ThrowsArgumentNullException() + { + // Act & Assert + Should.Throw(() => new SearchRepository(null!, _logger)); + } + + [Fact] + public void Constructor_WithNullLogger_ThrowsArgumentNullException() + { + // Act & Assert + Should.Throw(() => new SearchRepository(_connectionFactory, null!)); + } + + [Fact] + public void Constructor_WithValidDependencies_CreatesInstance() + { + // Act + var repository = new SearchRepository(_connectionFactory, _logger); + + // Assert + repository.ShouldNotBeNull(); + } +} +``` + +**Step 5: Run tests to verify they pass** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchRepositoryTests" -v n` +Expected: PASS + +**Step 6: Commit** + +```bash +git add src/JdeScoping.DataSync/Services/SearchRepository.cs +git add tests/JdeScoping.DataSync.Tests/Services/SearchRepositoryTests.cs +git commit -m "feat(datasync): implement SearchRepository with tests" +``` + +--- + +## Task 8: Implement SearchExecutionService + +**Files:** +- Create: `src/JdeScoping.DataSync/Services/SearchExecutionService.cs` +- Create: `tests/JdeScoping.DataSync.Tests/Services/SearchExecutionServiceTests.cs` + +**Step 1: Write the failing tests** + +```csharp +using JdeScoping.Core.Interfaces; +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.DataAccess.Models; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Options; +using JdeScoping.DataSync.Services; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using Shouldly; + +namespace JdeScoping.DataSync.Tests.Services; + +public class SearchExecutionServiceTests +{ + private readonly ISearchRepository _searchRepository; + private readonly ISearchProcessor _searchProcessor; + private readonly IExcelExportService _excelExportService; + private readonly ISearchNotificationService _notificationService; + private readonly IOptions _options; + private readonly ILogger _logger; + + public SearchExecutionServiceTests() + { + _searchRepository = Substitute.For(); + _searchProcessor = Substitute.For(); + _excelExportService = Substitute.For(); + _notificationService = Substitute.For(); + _options = Options.Create(new WorkProcessorOptions { SearchTimeout = TimeSpan.FromSeconds(30) }); + _logger = Substitute.For>(); + } + + [Fact] + public async Task ExecuteSearchAsync_Success_CompletesWithEndedStatus() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var model = new SearchModel { Id = 1, Results = [] }; + var excelBytes = new byte[] { 1, 2, 3 }; + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(model); + _excelExportService.GenerateAsync(Arg.Any(), Arg.Any()) + .Returns(excelBytes); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + await _searchRepository.Received(1).StartSearchAsync(1, Arg.Any()); + await _searchRepository.Received(1).CompleteSearchAsync(1, true, excelBytes, Arg.Any()); + await _notificationService.Received(2).NotifySearchUpdateAsync(search, Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_ProcessorThrows_MarksAsError() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("Test error")); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search); + + // Assert + await _searchRepository.Received(1).CompleteSearchAsync(1, false, null, Arg.Any()); + } + + [Fact] + public async Task ExecuteSearchAsync_HostShutdown_DoesNotMarkAsError() + { + // Arrange + var search = new Search { Id = 1, Status = SearchStatus.Queued }; + var cts = new CancellationTokenSource(); + + _searchProcessor.ExecuteSearchToModelAsync(Arg.Any(), Arg.Any()) + .Returns(async _ => + { + cts.Cancel(); + throw new OperationCanceledException(cts.Token); + }); + + var sut = CreateService(); + + // Act + await sut.ExecuteSearchAsync(search, cts.Token); + + // Assert + await _searchRepository.DidNotReceive().CompleteSearchAsync( + Arg.Any(), false, Arg.Any(), Arg.Any()); + } + + private SearchExecutionService CreateService() + { + return new SearchExecutionService( + _searchRepository, + _searchProcessor, + _excelExportService, + _notificationService, + _options, + _logger); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchExecutionServiceTests" -v n` +Expected: FAIL (missing class) + +**Step 3: Implement SearchExecutionService** + +```csharp +using JdeScoping.Core.Interfaces; +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.DataAccess.Models; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Options; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace JdeScoping.DataSync.Services; + +/// +/// Service that orchestrates the complete search execution pipeline. +/// +public class SearchExecutionService : ISearchExecutionService +{ + private readonly ISearchRepository _searchRepository; + private readonly ISearchProcessor _searchProcessor; + private readonly IExcelExportService _excelExportService; + private readonly ISearchNotificationService _notificationService; + private readonly WorkProcessorOptions _options; + private readonly ILogger _logger; + + public SearchExecutionService( + ISearchRepository searchRepository, + ISearchProcessor searchProcessor, + IExcelExportService excelExportService, + ISearchNotificationService notificationService, + IOptions options, + ILogger logger) + { + _searchRepository = searchRepository ?? throw new ArgumentNullException(nameof(searchRepository)); + _searchProcessor = searchProcessor ?? throw new ArgumentNullException(nameof(searchProcessor)); + _excelExportService = excelExportService ?? throw new ArgumentNullException(nameof(excelExportService)); + _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService)); + _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + public async Task ExecuteSearchAsync(Search search, CancellationToken ct = default) + { + using var timeoutCts = new CancellationTokenSource(_options.SearchTimeout); + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, timeoutCts.Token); + + try + { + // Mark as Running + await _searchRepository.StartSearchAsync(search.Id, linkedCts.Token); + search.Status = SearchStatus.Running; + search.StartDt = DateTime.UtcNow; + await NotifySearchUpdateSafeAsync(search, linkedCts.Token); + + // Execute search query + var model = new SearchModel { Id = search.Id }; + await _searchProcessor.ExecuteSearchToModelAsync(model, linkedCts.Token); + + // Generate Excel + var excelBytes = await _excelExportService.GenerateAsync(model, linkedCts.Token); + + // Complete with success + await _searchRepository.CompleteSearchAsync(search.Id, true, excelBytes, linkedCts.Token); + search.Status = SearchStatus.Ended; + search.EndDt = DateTime.UtcNow; + await NotifySearchUpdateSafeAsync(search, linkedCts.Token); + + _logger.LogInformation( + "Search {SearchId} completed successfully with {ResultCount} results", + search.Id, model.Results.Count); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // Host shutdown - don't mark as error, let startup cleanup handle it + _logger.LogInformation( + "Search {SearchId} interrupted by shutdown, will be requeued on restart", + search.Id); + // Don't rethrow - let the caller handle graceful shutdown + } + catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested) + { + // Timeout - mark as error + _logger.LogWarning( + "Search {SearchId} timed out after {Timeout}", + search.Id, _options.SearchTimeout); + await CompleteWithErrorAsync(search); + } + catch (Exception ex) + { + _logger.LogError(ex, "Search {SearchId} failed", search.Id); + await CompleteWithErrorAsync(search); + } + } + + private async Task CompleteWithErrorAsync(Search search) + { + try + { + await _searchRepository.CompleteSearchAsync(search.Id, false, null, CancellationToken.None); + search.Status = SearchStatus.Error; + search.EndDt = DateTime.UtcNow; + await NotifySearchUpdateSafeAsync(search, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to mark search {SearchId} as error", search.Id); + } + } + + private async Task NotifySearchUpdateSafeAsync(Search search, CancellationToken ct) + { + try + { + await _notificationService.NotifySearchUpdateAsync(search, ct); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send search update notification for {SearchId}", search.Id); + } + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "SearchExecutionServiceTests" -v n` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/Services/SearchExecutionService.cs +git add tests/JdeScoping.DataSync.Tests/Services/SearchExecutionServiceTests.cs +git commit -m "feat(datasync): implement SearchExecutionService with proper cancellation handling" +``` + +--- + +## Task 9: Implement WorkProcessor + +**Files:** +- Create: `src/JdeScoping.DataSync/WorkProcessor.cs` +- Create: `tests/JdeScoping.DataSync.Tests/WorkProcessorTests.cs` + +**Step 1: Write the failing tests** + +```csharp +using JdeScoping.Core.Interfaces; +using JdeScoping.Core.Models.Enums; +using JdeScoping.Core.Models.Search; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Options; +using JdeScoping.DataSync.Telemetry; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NSubstitute; +using Shouldly; + +namespace JdeScoping.DataSync.Tests; + +public class WorkProcessorTests +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly IOptions _options; + private readonly ILogger _logger; + private readonly DataSyncMetrics _metrics; + + public WorkProcessorTests() + { + _scopeFactory = Substitute.For(); + _options = Options.Create(new WorkProcessorOptions + { + Enabled = true, + WorkInterval = TimeSpan.FromMilliseconds(100) + }); + _logger = Substitute.For>(); + _metrics = new DataSyncMetrics(); + } + + [Fact] + public async Task ExecuteAsync_WhenDisabled_StopsImmediately() + { + // Arrange + var options = Options.Create(new WorkProcessorOptions { Enabled = false }); + var sut = new WorkProcessor(_scopeFactory, options, _logger, _metrics); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(50); + await sut.StopAsync(CancellationToken.None); + + // Assert - should not throw + } + + [Fact] + public async Task ExecuteAsync_CallsStartupCleanup() + { + // Arrange + var dataUpdateRepo = Substitute.For(); + var searchRepo = Substitute.For(); + var scheduleChecker = Substitute.For(); + var notificationService = Substitute.For(); + + scheduleChecker.GetPendingTasksAsync(Arg.Any()) + .Returns([]); + + var scope = SetupScope(dataUpdateRepo, searchRepo, scheduleChecker, notificationService); + _scopeFactory.CreateAsyncScope().Returns(scope); + + var sut = new WorkProcessor(_scopeFactory, _options, _logger, _metrics); + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + // Act + await sut.StartAsync(cts.Token); + await Task.Delay(150); + await sut.StopAsync(CancellationToken.None); + + // Assert + await dataUpdateRepo.Received().CloseOpenUpdateEntriesAsync(Arg.Any()); + await searchRepo.Received().ResetPartialSearchesAsync(Arg.Any()); + } + + private static AsyncServiceScope SetupScope( + IDataUpdateRepository dataUpdateRepo, + ISearchRepository searchRepo, + IScheduleChecker scheduleChecker, + ISearchNotificationService notificationService) + { + var serviceProvider = Substitute.For(); + serviceProvider.GetService(typeof(IDataUpdateRepository)).Returns(dataUpdateRepo); + serviceProvider.GetService(typeof(ISearchRepository)).Returns(searchRepo); + serviceProvider.GetService(typeof(IScheduleChecker)).Returns(scheduleChecker); + serviceProvider.GetService(typeof(ISearchNotificationService)).Returns(notificationService); + + var scope = Substitute.For(); + scope.ServiceProvider.Returns(serviceProvider); + + return new AsyncServiceScope(scope); + } +} +``` + +**Step 2: Run tests to verify they fail** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "WorkProcessorTests" -v n` +Expected: FAIL (missing class) + +**Step 3: Implement WorkProcessor** + +```csharp +using JdeScoping.Core.Interfaces; +using JdeScoping.Core.Models.Search; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Options; +using JdeScoping.DataSync.Telemetry; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace JdeScoping.DataSync; + +/// +/// Unified background service that coordinates data synchronization and search processing. +/// Data freshness takes priority over search processing. +/// +public class WorkProcessor : BackgroundService +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly WorkProcessorOptions _options; + private readonly ILogger _logger; + private readonly DataSyncMetrics _metrics; + + private DateTime _lastPurgeCheck = DateTime.MinValue; + private static readonly TimeSpan PurgeCheckInterval = TimeSpan.FromHours(24); + + public WorkProcessor( + IServiceScopeFactory scopeFactory, + IOptions options, + ILogger logger, + DataSyncMetrics metrics) + { + _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory)); + _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + if (!_options.Enabled) + { + _logger.LogInformation("WorkProcessor is disabled"); + return; + } + + _logger.LogInformation( + "WorkProcessor starting with WorkInterval={WorkInterval}", + _options.WorkInterval); + + // Startup cleanup + await StartupCleanupAsync(stoppingToken); + + while (!stoppingToken.IsCancellationRequested) + { + string status = "Idle"; + try + { + status = await DoWorkAsync(stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + _logger.LogInformation("WorkProcessor stopping gracefully"); + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in work cycle"); + _metrics.RecordCycleError(); + } + finally + { + // Always notify status (best-effort) + await NotifyStatusSafeAsync(status, stoppingToken); + } + + try + { + await Task.Delay(_options.WorkInterval, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + } + + await NotifyStatusSafeAsync("Stopped", CancellationToken.None); + _logger.LogInformation("WorkProcessor stopped"); + } + + private async Task StartupCleanupAsync(CancellationToken ct) + { + await using var scope = _scopeFactory.CreateAsyncScope(); + + // Close interrupted DataUpdate entries + try + { + var dataUpdateRepo = scope.ServiceProvider.GetRequiredService(); + var closedCount = await dataUpdateRepo.CloseOpenUpdateEntriesAsync(ct); + if (closedCount > 0) + { + _logger.LogWarning("Closed {Count} interrupted data update entries", closedCount); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to close open data update entries"); + } + + // Reset partial searches + try + { + var searchRepo = scope.ServiceProvider.GetRequiredService(); + var resetCount = await searchRepo.ResetPartialSearchesAsync(ct); + if (resetCount > 0) + { + _logger.LogWarning("Reset {Count} partial searches to Queued", resetCount); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to reset partial searches"); + } + } + + private async Task DoWorkAsync(CancellationToken ct) + { + await using var scope = _scopeFactory.CreateAsyncScope(); + + // Priority 1: Data syncs + var scheduleChecker = scope.ServiceProvider.GetRequiredService(); + var pendingTasks = await scheduleChecker.GetPendingTasksAsync(ct); + + if (pendingTasks.Count > 0) + { + await NotifyStatusSafeAsync("Updating data cache", ct); + + var orchestrator = scope.ServiceProvider.GetRequiredService(); + await orchestrator.ExecutePendingSyncsAsync(ct); + + // Periodic purge check + await PurgeOldEntriesAsync(scope, ct); + + return "Idle"; + } + + // Priority 2: Search processing (only when syncs are current) + var searchRepository = scope.ServiceProvider.GetRequiredService(); + var search = await searchRepository.GetNextQueuedSearchAsync(ct); + + if (search != null) + { + await NotifyStatusSafeAsync($"Processing search #{search.Id}", ct); + + var executionService = scope.ServiceProvider.GetRequiredService(); + await executionService.ExecuteSearchAsync(search, ct); + } + + // Periodic purge check + await PurgeOldEntriesAsync(scope, ct); + + return "Idle"; + } + + private async Task PurgeOldEntriesAsync(AsyncServiceScope scope, CancellationToken ct) + { + if (DateTime.UtcNow - _lastPurgeCheck < PurgeCheckInterval) + { + return; + } + + _lastPurgeCheck = DateTime.UtcNow; + + try + { + var repository = scope.ServiceProvider.GetRequiredService(); + var purgedCount = await repository.PurgeOldEntriesAsync( + _options.PurgeRetentionDays, ct); + + if (purgedCount > 0) + { + _logger.LogInformation( + "Purged {Count} DataUpdate records older than {Days} days", + purgedCount, _options.PurgeRetentionDays); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to purge old data update entries"); + } + } + + private async Task NotifyStatusSafeAsync(string status, CancellationToken ct) + { + try + { + await using var scope = _scopeFactory.CreateAsyncScope(); + var notificationService = scope.ServiceProvider.GetRequiredService(); + await notificationService.NotifyStatusAsync(status, ct); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send status notification"); + // Best-effort - don't throw + } + } +} +``` + +**Step 4: Run tests to verify they pass** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "WorkProcessorTests" -v n` +Expected: PASS + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/WorkProcessor.cs +git add tests/JdeScoping.DataSync.Tests/WorkProcessorTests.cs +git commit -m "feat(datasync): implement WorkProcessor background service" +``` + +--- + +## Task 10: Implement SearchNotificationService + +**Files:** +- Create: `src/JdeScoping.Api/Services/SearchNotificationService.cs` + +**Step 1: Create the implementation** + +```csharp +using JdeScoping.Api.Hubs; +using JdeScoping.Core.Interfaces; +using JdeScoping.Core.Models.Infrastructure; +using JdeScoping.Core.Models.Search; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; + +namespace JdeScoping.Api.Services; + +/// +/// SignalR-based implementation of search notification service. +/// +public class SearchNotificationService : ISearchNotificationService +{ + private readonly IHubContext _hubContext; + private readonly ILogger _logger; + + public SearchNotificationService( + IHubContext hubContext, + ILogger logger) + { + _hubContext = hubContext ?? throw new ArgumentNullException(nameof(hubContext)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + public async Task NotifySearchUpdateAsync(Search search, CancellationToken ct = default) + { + try + { + var update = new SearchUpdate + { + Id = search.Id, + Status = search.Status, + StartDt = search.StartDt, + EndDt = search.EndDt + }; + + await _hubContext.Clients.All.SendAsync("searchUpdate", update, ct); + _logger.LogDebug("Search update sent for {SearchId}: {Status}", search.Id, search.Status); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send search update notification for {SearchId}", search.Id); + } + } + + /// + public async Task NotifyStatusAsync(string status, CancellationToken ct = default) + { + try + { + var update = new StatusUpdate(status); + + await _hubContext.Clients.All.SendAsync("statusUpdate", update, ct); + _logger.LogDebug("Status update sent: {Status}", status); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to send status notification: {Status}", status); + } + } +} +``` + +**Step 2: Verify the build compiles** + +Run: `dotnet build src/JdeScoping.Api/JdeScoping.Api.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +```bash +git add src/JdeScoping.Api/Services/SearchNotificationService.cs +git commit -m "feat(api): implement SearchNotificationService using SignalR IHubContext" +``` + +--- + +## Task 11: Fix Hourly Lookback Bug in ScheduleChecker + +**Files:** +- Modify: `src/JdeScoping.DataSync/Services/ScheduleChecker.cs` +- Modify: `tests/JdeScoping.DataSync.Tests/ScheduleCheckerTests.cs` + +**Step 1: Write the failing test** + +Add this test to `ScheduleCheckerTests.cs`: + +```csharp +[Fact] +public async Task GetPendingTasksAsync_HourlySync_UsesHourlyTimestampForLookback() +{ + // Arrange + var massUpdate = new DataUpdate + { + TableName = "TestTable", + UpdateType = UpdateTypes.Mass, + EndDt = DateTime.UtcNow.AddDays(-1), + WasSuccessful = true + }; + var dailyUpdate = new DataUpdate + { + TableName = "TestTable", + UpdateType = UpdateTypes.Daily, + EndDt = DateTime.UtcNow.AddHours(-2), // Daily was 2 hours ago + WasSuccessful = true + }; + var hourlyUpdate = new DataUpdate + { + TableName = "TestTable", + UpdateType = UpdateTypes.Hourly, + EndDt = DateTime.UtcNow.AddMinutes(-90), // Hourly was 90 mins ago + WasSuccessful = true + }; + + var updates = new Dictionary + { + ["TestTable_3"] = massUpdate, + ["TestTable_2"] = dailyUpdate, + ["TestTable_1"] = hourlyUpdate + }; + + _repository.GetLastDataUpdatesAsync(Arg.Any()) + .Returns(updates); + + var config = CreateDataSourceConfig( + hourlyEnabled: true, + hourlyInterval: 60, // 1 hour + dailyEnabled: true, + dailyInterval: 1440); // 24 hours + + _options.Value.DataSources.Returns([config]); + + var sut = CreateScheduleChecker(); + + // Act + var tasks = await sut.GetPendingTasksAsync(); + + // Assert + tasks.ShouldHaveSingleItem(); + var task = tasks[0]; + task.UpdateType.ShouldBe(UpdateTypes.Hourly); + + // Critical: MinimumDt should be based on HOURLY timestamp (90 mins ago - 3*60 = -270 mins) + // NOT daily timestamp (2 hours ago - 3*1440 = -4322 mins) + // The lookback should be ~4.5 hours from now, not ~3 days + task.MinimumDt.ShouldNotBeNull(); + var lookbackMinutes = (DateTime.UtcNow - task.MinimumDt.Value).TotalMinutes; + lookbackMinutes.ShouldBeLessThan(300); // Should be < 5 hours, not days +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "HourlySync_UsesHourlyTimestampForLookback" -v n` +Expected: FAIL (lookback is days instead of hours) + +**Step 3: Fix the bug in ScheduleChecker** + +Change lines 110-114 in `ScheduleChecker.cs`: + +```csharp +// BEFORE (buggy - uses daily values): +// Check Hourly (uses Daily's last timestamp for MinimumDT calculation, per legacy behavior) +if (config.HourlyConfig.Enabled && NeedsHourlySync(config, lastHourly, lastDaily, lastMass, now)) +{ + // Use daily update timestamp for lookback, not hourly + var minimumDt = CalculateMinimumDt(lastDaily, config.DailyConfig.IntervalMinutes); + +// AFTER (fixed - uses hourly values): +// Check Hourly +if (config.HourlyConfig.Enabled && NeedsHourlySync(config, lastHourly, lastDaily, lastMass, now)) +{ + // Use hourly update timestamp and interval for lookback + var minimumDt = CalculateMinimumDt(lastHourly, config.HourlyConfig.IntervalMinutes); +``` + +Also update the comment on line 110 to remove the misleading legacy note. + +**Step 4: Run test to verify it passes** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "HourlySync_UsesHourlyTimestampForLookback" -v n` +Expected: PASS + +**Step 5: Run all ScheduleChecker tests** + +Run: `dotnet test tests/JdeScoping.DataSync.Tests --filter "ScheduleCheckerTests" -v n` +Expected: All PASS + +**Step 6: Commit** + +```bash +git add src/JdeScoping.DataSync/Services/ScheduleChecker.cs +git add tests/JdeScoping.DataSync.Tests/ScheduleCheckerTests.cs +git commit -m "fix(datasync): use hourly timestamp for hourly lookback calculation + +Fixes legacy bug where hourly sync used daily timestamp and interval +for lookback calculation, resulting in 3-day lookback instead of 3 hours." +``` + +--- + +## Task 12: Update DependencyInjection + +**Files:** +- Modify: `src/JdeScoping.DataSync/DependencyInjection.cs` +- Modify: `src/JdeScoping.Host/Program.cs` (or wherever services are registered) +- Modify: `src/JdeScoping.DataAccess/DependencyInjection.cs` + +**Step 1: Update DataSync DependencyInjection** + +Replace the `DataSyncService` registration with `WorkProcessor` and add new services: + +```csharp +using JdeScoping.DataSync; +using JdeScoping.DataSync.Options; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.HealthChecks; +using JdeScoping.DataSync.Services; +using JdeScoping.DataSync.Telemetry; +using Microsoft.Extensions.Configuration; + +namespace Microsoft.Extensions.DependencyInjection; + +/// +/// Extension methods for registering data sync services. +/// +public static class DataSyncDependencyInjection +{ + /// + /// Adds data synchronization services to the service collection. + /// + public static IServiceCollection AddDataSyncServices( + this IServiceCollection services, + IConfiguration configuration) + { + // Bind configuration with validation + services.AddOptions() + .Bind(configuration.GetSection(DataSyncOptions.SectionName)) + .ValidateDataAnnotations() + .ValidateOnStart(); + + services.AddOptions() + .Bind(configuration.GetSection(WorkProcessorOptions.SectionName)) + .ValidateDataAnnotations() + .ValidateOnStart(); + + // Pipeline configuration (new ETL infrastructure) + services.AddOptions() + .Bind(configuration.GetSection(PipelineOptions.SectionName)); + + // Pipeline factory (new ETL infrastructure) + services.AddSingleton(); + + // Register WorkProcessor (replaces DataSyncService) + services.AddHostedService(); + + // Register core services as scoped (for parallel isolation) + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + + // Search services (scoped for parallel isolation) + services.AddScoped(); + services.AddScoped(); + + // Register health check + services.AddHealthChecks() + .AddCheck("data-sync", tags: ["datasync", "background"]); + + // Register metrics as singleton + services.AddSingleton(); + + return services; + } +} +``` + +**Step 2: Update DataAccess DependencyInjection to register ISearchProcessor** + +Add to `src/JdeScoping.DataAccess/DependencyInjection.cs`: + +```csharp +services.AddScoped(); +``` + +**Step 3: Update Host/Program.cs to register SearchNotificationService** + +Add to service registration: + +```csharp +builder.Services.AddScoped(); +``` + +**Step 4: Verify the build compiles** + +Run: `dotnet build` +Expected: Build succeeded + +**Step 5: Commit** + +```bash +git add src/JdeScoping.DataSync/DependencyInjection.cs +git add src/JdeScoping.DataAccess/DependencyInjection.cs +git add src/JdeScoping.Host/Program.cs +git commit -m "feat: wire up WorkProcessor and search services in DI + +- Replace DataSyncService with WorkProcessor +- Register ISearchRepository, ISearchExecutionService +- Register ISearchProcessor in DataAccess +- Register ISearchNotificationService in Host" +``` + +--- + +## Task 13: Add Configuration to appsettings + +**Files:** +- Modify: `src/JdeScoping.Host/appsettings.json` +- Modify: `src/JdeScoping.Host/appsettings.Development.json` + +**Step 1: Add WorkProcessor section to appsettings.json** + +```json +{ + "WorkProcessor": { + "Enabled": true, + "WorkInterval": "00:00:05", + "SearchTimeout": "00:30:00", + "PurgeRetentionDays": 30 + } +} +``` + +**Step 2: Add to appsettings.Development.json** + +```json +{ + "WorkProcessor": { + "Enabled": true, + "WorkInterval": "00:00:05", + "SearchTimeout": "00:05:00", + "PurgeRetentionDays": 7 + } +} +``` + +**Step 3: Verify the build compiles and settings load** + +Run: `dotnet build src/JdeScoping.Host/JdeScoping.Host.csproj` +Expected: Build succeeded + +**Step 4: Commit** + +```bash +git add src/JdeScoping.Host/appsettings.json +git add src/JdeScoping.Host/appsettings.Development.json +git commit -m "config: add WorkProcessor configuration section" +``` + +--- + +## Task 14: Remove DataSyncService + +**Files:** +- Delete: `src/JdeScoping.DataSync/DataSyncService.cs` +- Modify: `tests/JdeScoping.DataSync.Tests/DataSyncServiceTests.cs` (remove or repurpose) + +**Step 1: Delete DataSyncService.cs** + +```bash +rm src/JdeScoping.DataSync/DataSyncService.cs +``` + +**Step 2: Update or remove DataSyncServiceTests.cs** + +Either delete the file or rename it to WorkProcessorTests.cs if the tests are useful: + +```bash +rm tests/JdeScoping.DataSync.Tests/DataSyncServiceTests.cs +``` + +**Step 3: Verify the build compiles** + +Run: `dotnet build` +Expected: Build succeeded + +**Step 4: Run all tests** + +Run: `dotnet test` +Expected: All PASS + +**Step 5: Commit** + +```bash +git add -A +git commit -m "refactor(datasync): remove DataSyncService, replaced by WorkProcessor" +``` + +--- + +## Task 15: Final Integration Test + +**Files:** +- None (verification only) + +**Step 1: Run all tests** + +Run: `dotnet test --verbosity normal` +Expected: All PASS + +**Step 2: Build the solution** + +Run: `dotnet build` +Expected: Build succeeded + +**Step 3: Verify no compiler warnings** + +Run: `dotnet build --warnaserror` +Expected: Build succeeded with no warnings + +**Step 4: Final commit (if any cleanup needed)** + +```bash +git status +# If any uncommitted changes: +git add -A +git commit -m "chore: cleanup after WorkProcessor implementation" +``` + +--- + +## Summary + +This implementation plan covers: + +1. **Core Interfaces** (Tasks 1-2): ISearchProcessor, ISearchNotificationService +2. **Options** (Task 3): WorkProcessorOptions configuration +3. **Contracts** (Tasks 4-5): ISearchRepository, ISearchExecutionService +4. **Implementations** (Tasks 6-10): SearchProcessor interface, SearchRepository, SearchExecutionService, WorkProcessor, SearchNotificationService +5. **Bug Fix** (Task 11): Hourly lookback uses correct hourly values +6. **Wiring** (Tasks 12-13): DependencyInjection updates, appsettings configuration +7. **Cleanup** (Task 14): Remove deprecated DataSyncService +8. **Verification** (Task 15): Final integration test + +All tasks follow TDD with bite-sized steps and include commits at each stage.