Joseph Doherty 8630a5d32b docs: add WorkProcessor design and implementation documentation
- WorkProcessorReport.md: Analysis of legacy work processor from OLD solution
- Design document with clean architecture and component specifications
- Implementation plan with 15 TDD tasks
2026-01-07 06:30:54 -05:00

# Work Processor Report
This document describes the background work processor from the legacy JDE Scoping Tool (LotFinder) WorkerService. The work processor is responsible for coordinating data synchronization and search processing.
## Executive Summary
The `WorkProcessor` class is the main orchestration component that runs as a Windows service (via Topshelf). It implements a continuous polling loop that:
1. Checks if any data syncs are overdue
2. If overdue syncs exist → performs data updates in parallel
3. If all syncs are current → processes the next queued search
4. Sleeps for 5 seconds and repeats

**Key Design Principle:** Data freshness takes priority over search processing. Searches are only processed when all data caches are up to date.

---
## Architecture Overview
```
┌─────────────────────────────────────────────────────────────────┐
│                          WorkProcessor                          │
│                       (Topshelf Service)                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐     ┌───────────────┐     ┌──────────────┐    │
│  │   Start()    │────▶│   DoWork()    │────▶│    Stop()    │    │
│  └──────────────┘     └───────┬───────┘     └──────────────┘    │
│                               │                                 │
│             ┌─────────────────┼─────────────────┐               │
│             ▼                 ▼                 ▼               │
│  ┌──────────────────┐   ┌───────────┐   ┌─────────────────┐     │
│  │ UpdateProcessor  │   │  Search   │   │  SignalR Hub    │     │
│  │   (Data Sync)    │   │ Processor │   │ (Status Update) │     │
│  └──────────────────┘   └───────────┘   └─────────────────┘     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
---
## Source Files
| File | Purpose |
|------|---------|
| `OLD/WorkerService/Process/WorkProcessor.cs` | Main orchestration loop |
| `OLD/WorkerService/Process/UpdateProcessor.cs` | Data sync coordination |
| `OLD/WorkerService/Process/UpdateProcessor.DataUpdateEntry.cs` | DataUpdate table logging |
| `OLD/WorkerService/Process/UpdateProcessor.TableManagement.cs` | Staging/merge SQL generation |
| `OLD/WorkerService/Process/LotFinderDBExt.cs` | Search query execution |
| `OLD/WorkerService/Process/ExcelWriter.cs` | Excel result generation |
| `OLD/WorkerService/Program.cs` | Topshelf service configuration |
---
## Main Work Loop
### Constants
| Constant | Value | Description |
|----------|-------|-------------|
| `WAIT_INTERVAL` | 5000 ms (5 seconds) | Sleep duration between work cycles |
### Loop Structure
```
Start()
└── Thread spawned
    └── while(true)
        ├── DoWork()
        └── cancel.WaitOne(WAIT_INTERVAL)
            └── if signaled → break
```
The loop runs indefinitely until `Stop()` is called, which sets the `cancel` ManualResetEvent.
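
The loop structure above could be sketched roughly as follows. This is a minimal illustration, not the legacy `WorkProcessor` source: the `PollingLoop` class and its member names are hypothetical, and the catch-and-continue around the work delegate is an assumption that mirrors the "logged, swallowed, loop continues" behavior listed under Error Handling. The point it shows is that `WaitOne(timeout)` serves as both the sleep and the cancellation check, so `Stop()` does not have to wait out a full interval.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of the Start/Stop polling loop (not the legacy code).
public sealed class PollingLoop
{
    private readonly ManualResetEvent _cancel = new ManualResetEvent(false);
    private readonly Action _doWork;
    private readonly int _waitIntervalMs;
    private readonly Thread _thread;

    public PollingLoop(Action doWork, int waitIntervalMs = 5000)
    {
        _doWork = doWork;
        _waitIntervalMs = waitIntervalMs;
        _thread = new Thread(Run) { IsBackground = true };
    }

    public void Start() => _thread.Start();

    public void Stop()
    {
        _cancel.Set();                       // wakes the loop if it is waiting
        if (_thread.IsAlive) _thread.Join(); // wait for the current cycle to finish
    }

    private void Run()
    {
        while (true)
        {
            try { _doWork(); }
            catch (Exception ex) { Console.Error.WriteLine(ex); } // log, keep looping

            // WaitOne returns true when Stop() signaled the event and
            // false when the interval elapsed without a signal.
            if (_cancel.WaitOne(_waitIntervalMs)) break;
        }
    }
}
```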
---
## DoWork() Algorithm
The `DoWork()` method implements the following decision flow:
```
DoWork()
├─▶ GetPendingUpdateTasks()
│   └── Returns list of overdue data syncs
├─▶ IF pending updates exist:
│   ├── Status = "Updating data cache"
│   ├── Parallel.ForEach(pending, MaxDegreeOfParallelism=8)
│   │   └── UpdateProcessor.DoUpdate(task)
│   └── RETURN (skip search processing this cycle)
├─▶ ELSE (all data syncs current):
│   │
│   ├── ResetPartialSearches()
│   │   └── Reset any searches that started but never completed
│   │
│   ├── GetNextSearch()
│   │   └── Get oldest queued search (Status=1, ORDER BY SubmitDT)
│   │
│   └── IF search found:
│       ├── Status = "Processing search #{ID}"
│       ├── StartSearch(search)
│       │   └── Status → 2 (Started), set StartDT
│       ├── PublishSearchUpdate(search) → SignalR
│       │
│       ├── Execute search query
│       ├── Generate Excel results
│       ├── Save debug file: search_{ID}.xlsx
│       │
│       ├── CompleteSearch(search, success=true)
│       │   └── Status → 3 (Ended), store Results
│       └── PublishSearchUpdate(search) → SignalR
│   ON EXCEPTION:
│       ├── Log error
│       ├── CompleteSearch(search, success=false)
│       │   └── Status → 4 (Error)
│       └── PublishSearchUpdate(search) → SignalR
└── Status = "Idle"
```
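
The core of this decision flow (updates first, at most one search otherwise) can be reduced to a small function. The sketch below is only an illustration under assumptions: `WorkCycle`, `CycleOutcome`, and the delegate parameters are hypothetical stand-ins for the real `UpdateProcessor` and search calls, and status publishing and exception handling are left out.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical result of one pass through the decision flow.
public enum CycleOutcome { UpdatedData, ProcessedSearch, Idle }

public static class WorkCycle
{
    // One work cycle: overdue data syncs always win; a search is only
    // picked up when nothing is pending.
    public static CycleOutcome Run<TUpdate, TSearch>(
        Func<IReadOnlyList<TUpdate>> getPendingUpdates,
        Action<IReadOnlyList<TUpdate>> runUpdates,
        Action resetPartialSearches,
        Func<TSearch?> getNextSearch,
        Action<TSearch> processSearch) where TSearch : class
    {
        var pending = getPendingUpdates();
        if (pending.Count > 0)
        {
            runUpdates(pending);
            return CycleOutcome.UpdatedData; // searches wait for a later cycle
        }

        resetPartialSearches();              // requeue orphaned searches first
        var search = getNextSearch();
        if (search == null) return CycleOutcome.Idle;

        processSearch(search);
        return CycleOutcome.ProcessedSearch;
    }
}
```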
---
## Data Sync Scheduling
### How "Overdue" Is Determined
The `GetPendingUpdateTasks()` method checks each configured data source against the `LastDataUpdates` view:
```sql
-- LastDataUpdates view returns the most recent successful update per table/type
SELECT TableName,
       COALESCE([3], '1970-01-01') AS MassUpdateDT,   -- UpdateType 3 = Mass
       COALESCE([2], '1970-01-01') AS DailyUpdateDT,  -- UpdateType 2 = Daily
       COALESCE([1], '1970-01-01') AS HourlyUpdateDT  -- UpdateType 1 = Hourly
FROM (pivot query on DataUpdate table)
```
### Priority Order (Exclusive)
Only ONE update type is selected per data source per cycle:
| Priority | Condition | Update Type |
|----------|-----------|-------------|
| 1 | No record exists OR `Now > MassUpdateDT + MassInterval` | **Mass** (full reload) |
| 2 | `Now > DailyUpdateDT + DailyInterval` | **Daily** (incremental) |
| 3 | `Now > HourlyUpdateDT + HourlyInterval` | **Hourly** (incremental) |
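
The exclusive priority order in the table can be expressed as a chain of early returns. The following is a sketch under assumptions rather than the legacy code: `UpdateSelector`, its parameter names, and the `None` enum member are hypothetical, intervals are passed as `TimeSpan` values, and a missing record is represented by the same 1970-01-01 default that the view's `COALESCE` produces.

```csharp
using System;

// Values 1-3 follow the UpdateType column; None is a hypothetical
// addition meaning "nothing is overdue for this data source".
public enum UpdateType { None = 0, Hourly = 1, Daily = 2, Mass = 3 }

public static class UpdateSelector
{
    // Stand-in for "no record exists", matching the view's COALESCE default.
    public static readonly DateTime Epoch = new DateTime(1970, 1, 1);

    // Returns at most one update type per data source per cycle,
    // checking Mass first, then Daily, then Hourly.
    public static UpdateType Select(
        DateTime now,
        DateTime massUpdateDT, DateTime dailyUpdateDT, DateTime hourlyUpdateDT,
        TimeSpan massInterval, TimeSpan dailyInterval, TimeSpan hourlyInterval)
    {
        if (now > massUpdateDT + massInterval) return UpdateType.Mass;
        if (now > dailyUpdateDT + dailyInterval) return UpdateType.Daily;
        if (now > hourlyUpdateDT + hourlyInterval) return UpdateType.Hourly;
        return UpdateType.None;
    }
}
```

Because the checks are ordered, a source that is overdue for both a Mass and an Hourly update gets only the Mass update in that cycle.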
### Lookback Multiplier
For incremental updates (Daily/Hourly), the query uses a **lookback of 3× the interval**:
```csharp
// Daily update example
MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * DailyInterval);
// If DailyInterval = 1440 (24 hours), lookback = 3 days
// Hourly update example (note: uses Daily values - see bug below)
MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * DailyInterval);
```
**Purpose:** The 3× multiplier provides overlap to catch any records that may have been missed due to timing edge cases or delays.
> **⚠️ Legacy Bug:** In `GetPendingUpdateTasks()`, the Hourly update incorrectly uses `DailyUpdateDT` and `DailyUpdateConfig.Interval` instead of the hourly equivalents. This means hourly updates use a 3-day lookback instead of 3 hours. The separate `DoUpdate(tableName, updateType)` overload correctly uses hourly values, but this path is not called from the main work loop.
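
One way to avoid this class of bug is to compute the lookback in a single helper that takes the timestamp and interval of the update type actually being run. This is a hypothetical sketch (the `Lookback` class and its members are not part of the legacy code), assuming intervals are expressed in minutes as in the examples above.

```csharp
using System;

public static class Lookback
{
    // The 3x overlap factor described above.
    public const int Multiplier = 3;

    // Earliest timestamp to query for an incremental update: the last
    // successful update of this type minus Multiplier times its interval.
    public static DateTime MinimumDT(DateTime lastUpdateDT, int intervalMinutes)
        => lastUpdateDT.AddMinutes(-Multiplier * intervalMinutes);
}
```

With a last hourly update at 2026-01-07 12:00 and a 60-minute interval, `Lookback.MinimumDT` returns 2026-01-07 09:00 (3 hours back); with a 1440-minute daily interval it returns 2026-01-04 12:00 (3 days back).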
### Parallel Execution
Data updates run in parallel with controlled concurrency:
```csharp
Parallel.ForEach(pending,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    task => UpdateProcessor.DoUpdate(task));
```
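
The Error Handling section states that a failed data update is logged and the remaining updates continue. A sketch of how that isolation could be combined with bounded parallelism is shown below. It is an assumption about structure, not the legacy implementation: `ParallelUpdates.RunAll` and its parameters are hypothetical, and failures are returned to the caller for logging rather than logged in place.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelUpdates
{
    // Runs doUpdate for every pending task with at most maxParallel in
    // flight. Exceptions are caught per task so one failed update does
    // not abort the others; the failures are returned for logging.
    public static IReadOnlyCollection<KeyValuePair<T, Exception>> RunAll<T>(
        IEnumerable<T> pending, Action<T> doUpdate, int maxParallel = 8)
    {
        var failures = new ConcurrentQueue<KeyValuePair<T, Exception>>();

        Parallel.ForEach(
            pending,
            new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
            task =>
            {
                try { doUpdate(task); }
                catch (Exception ex)
                {
                    failures.Enqueue(new KeyValuePair<T, Exception>(task, ex));
                }
            });

        return failures.ToArray();
    }
}
```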
---
## Data Update Process
Each `DoUpdate(task)` execution:
1. **Log Start**: Insert `DataUpdate` record with `NumberRecords = -2` (in-progress marker)
2. **Prepurge** (if configured): `TRUNCATE TABLE {destination}`
3. **Fetch Data**: Call the configured data fetch function (JDE/CMS query)
4. **Batch Processing** (1M records per batch):
   - Create staging table (`#Staging{table}`) - local temp table
   - Create index on staging table (PK columns + LastUpdateDT/ReleaseDate)
   - Disable non-PK indexes on staging table
   - Bulk copy data to staging (batch size: 10,000)
   - Rebuild indexes on staging table
   - Create temp table (`#{table}`) with deduplication via `ROW_NUMBER() OVER (PARTITION BY {PK} ORDER BY LastUpdateDT)`
   - MERGE to destination table (only updates when `TARGET.LastUpdateDT < SOURCE.LastUpdateDT`)
5. **Post-Processing**: Run configured action (e.g., `PostProcessMisData`)
6. **Reindex** (if configured): Rebuild all non-PK indexes on destination
7. **Log End**: Update `DataUpdate` record with `WasSuccessful`, `NumberRecords`
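
The deduplicate-then-merge semantics of step 4 can be illustrated in memory. The sketch below is not the generated SQL: `Row`, `StagingMerge`, and the dictionary standing in for the destination table are hypothetical. It also makes two assumptions that the steps above do not state explicitly: that deduplication keeps the most recently updated row per key, and that rows with no match in the destination are inserted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical staged record: a primary key, a timestamp, and a payload.
public sealed class Row
{
    public Row(string key, DateTime lastUpdateDT, string payload)
    {
        Key = key; LastUpdateDT = lastUpdateDT; Payload = payload;
    }
    public string Key { get; }
    public DateTime LastUpdateDT { get; }
    public string Payload { get; }
}

public static class StagingMerge
{
    // One row per key; assumed here to be the most recently updated one.
    public static List<Row> Deduplicate(IEnumerable<Row> staged) =>
        staged.GroupBy(r => r.Key)
              .Select(g => g.OrderByDescending(r => r.LastUpdateDT).First())
              .ToList();

    // Inserts unseen keys (assumption) and overwrites an existing row only
    // when the source row is strictly newer, as in the MERGE condition.
    public static void Merge(IDictionary<string, Row> target, IEnumerable<Row> source)
    {
        foreach (var row in Deduplicate(source))
        {
            if (!target.TryGetValue(row.Key, out var existing) ||
                existing.LastUpdateDT < row.LastUpdateDT)
            {
                target[row.Key] = row;
            }
        }
    }
}
```

The strict `<` comparison is what makes the 3× lookback safe: rows fetched again with an unchanged `LastUpdateDT` leave the destination untouched.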
### In-Progress Detection
Records with `NumberRecords = -2` indicate updates that started but never completed (crash/restart).
> **Note:** The codebase contains a `CloseOpenUpdateEntries()` method to clean up these records, but it is **never called** in the current implementation. This means crashed updates remain with `NumberRecords = -2` indefinitely. The cleanup SQL exists but is unused:
```sql
-- Defined but never invoked
UPDATE dbo.DataUpdate
SET EndDT = GETDATE(), WasSuccessful = 0, NumberRecords = -1
WHERE NumberRecords = -2
```
---
## Search Processing
### Search Status Values
| Value | Enum | Description |
|-------|------|-------------|
| 0 | `New` | Created but not submitted |
| 1 | `Submitted` | Queued for processing |
| 2 | `Started` | Currently being processed |
| 3 | `Ended` | Completed successfully |
| 4 | `Error` | Failed with error |
### Partial Search Recovery
Before processing new searches, the system resets any "orphaned" searches:
```sql
-- ResetPartialSearches stored procedure
UPDATE dbo.Search
SET Status = 1, StartDT = NULL
WHERE StartDT IS NOT NULL AND EndDT IS NULL
```
This handles cases where the service crashed mid-search.
### Search Execution Flow
1. **GetNextSearch**: Query for oldest `Status = 1` search
2. **StartSearch**: Update to `Status = 2`, set `StartDT`
3. **Execute Query**: Run the generated SQL against local cache
4. **Generate Excel**: Create Excel file using EPPlus
5. **CompleteSearch**: Update `Status = 3` (or 4 on error), store `Results` blob
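
The status transitions and the orphan reset can be modeled in memory as follows. This is a sketch under assumptions, not the legacy data-access code: the `Search` class is a hypothetical subset of the `Search` table, `SearchQueue` and its method signatures are invented for illustration, and setting `EndDT` in `CompleteSearch` is inferred from the `EndDT IS NULL` condition in `ResetPartialSearches` rather than stated in the flow above.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public enum SearchStatus : short { New = 0, Submitted = 1, Started = 2, Ended = 3, Error = 4 }

// Hypothetical in-memory subset of the Search table.
public sealed class Search
{
    public int ID { get; set; }
    public SearchStatus Status { get; set; }
    public DateTime SubmitDT { get; set; }
    public DateTime? StartDT { get; set; }
    public DateTime? EndDT { get; set; }
    public byte[]? Results { get; set; }
}

public static class SearchQueue
{
    // Requeue searches that were started but never finished.
    public static void ResetPartialSearches(IEnumerable<Search> searches)
    {
        foreach (var s in searches.Where(s => s.StartDT != null && s.EndDT == null))
        {
            s.Status = SearchStatus.Submitted;
            s.StartDT = null;
        }
    }

    // Oldest submitted search, or null when the queue is empty.
    public static Search? GetNextSearch(IEnumerable<Search> searches) =>
        searches.Where(s => s.Status == SearchStatus.Submitted)
                .OrderBy(s => s.SubmitDT)
                .FirstOrDefault();

    public static void StartSearch(Search s, DateTime now)
    {
        s.Status = SearchStatus.Started;
        s.StartDT = now;
    }

    public static void CompleteSearch(Search s, bool success, byte[]? results, DateTime now)
    {
        s.Status = success ? SearchStatus.Ended : SearchStatus.Error;
        s.EndDT = now; // assumption: completion stamps EndDT
        s.Results = results;
    }
}
```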
---
## SignalR Integration
### Status Updates
The `Status` property publishes changes to the SignalR hub:
```csharp
public string Status
{
    set
    {
        if (!string.Equals(_status, value))
        {
            _status = value;
            logger.Info("Status: {0}", _status);
            PublishStatus(_status); // → SignalR
        }
    }
}
```
### Hub Methods
| Method | Payload | When Called |
|--------|---------|-------------|
| `SetStatus` | `{ Message, Timestamp }` | On status property change |
| `PublishSearchUpdate` | `SearchUpdate(search)` | After StartSearch, CompleteSearch |
### Connection Pattern
Each publish creates a new `HubConnection`:
```csharp
string hubHost = ConfigurationManager.AppSettings["HubHost"];
using (HubConnection hubConnection = new HubConnection(hubHost))
{
    IHubProxy hubProxy = hubConnection.CreateHubProxy("StatusHub");
    await hubConnection.Start();
    await hubProxy.Invoke("SetStatus", update);
}
```
**Note:** This is an `async void` fire-and-forget pattern. Failures are logged but don't interrupt processing.
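
An alternative to `async void` is a method that returns a `Task`, so the caller can choose to await it or let it run while still observing failures. The sketch below illustrates that shape only and is not the legacy or the new implementation: `StatusPublisher` is hypothetical, and the actual hub call is abstracted behind a `Func<string, Task>` so the example carries no SignalR dependency.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical wrapper; 'send' stands in for the hub invocation,
// for example a call that wraps hubProxy.Invoke("SetStatus", ...).
public sealed class StatusPublisher
{
    private readonly Func<string, Task> _send;
    private readonly Action<Exception> _logError;

    public StatusPublisher(Func<string, Task> send, Action<Exception> logError)
    {
        _send = send;
        _logError = logError;
    }

    // Returns true when the send succeeded and false when it failed and
    // was logged. Failures never propagate, so the work loop keeps
    // running, but the returned Task lets callers observe the outcome.
    public async Task<bool> PublishStatusAsync(string status)
    {
        try
        {
            await _send(status).ConfigureAwait(false);
            return true;
        }
        catch (Exception ex)
        {
            _logError(ex);
            return false;
        }
    }
}
```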
---
## Timing Summary
| Event | Timing |
|-------|--------|
| Work loop interval | 5 seconds |
| Mass sync interval | ~7 days (10080 minutes) |
| Daily sync interval | ~24 hours (1440 minutes) |
| Hourly sync interval | ~1 hour (60 minutes) |
| Query lookback (incremental) | 3× the interval |
| Data update parallelism | Up to 8 concurrent |
| Bulk copy batch size | 10,000 rows |
| Data batch grouping | 1,000,000 rows |
---
## Database Tables
### DataUpdate
Tracks all data sync operations:
```sql
CREATE TABLE dbo.DataUpdate (
    ID INT IDENTITY PRIMARY KEY,
    SourceSystem VARCHAR(50),   -- 'JDE', 'CMS'
    SourceData VARCHAR(50),     -- 'WORKORDER', 'LOTUSAGE', etc.
    TableName VARCHAR(50),      -- Destination table name
    StartDT DATETIME,
    EndDT DATETIME,
    UpdateType SMALLINT,        -- 1=Hourly, 2=Daily, 3=Mass
    WasSuccessful BIT,
    NumberRecords BIGINT        -- -2=in-progress, -1=closed by CloseOpenUpdateEntries() (never called), N=count
)
```
### Search
Tracks search requests and results:
```sql
CREATE TABLE dbo.Search (
    ID INT IDENTITY PRIMARY KEY,
    UserName VARCHAR(128),
    Name VARCHAR(128),
    Status SMALLINT,            -- 0-4 (see SearchStatus enum)
    SubmitDT DATETIME,
    StartDT DATETIME,
    EndDT DATETIME,
    Criteria VARCHAR(MAX),      -- JSON serialized search criteria
    Results VARBINARY(MAX)      -- Excel file bytes
)
```
---
## Error Handling
| Scenario | Behavior |
|----------|----------|
| Data update failure | Logged, continues with next update |
| Search execution failure | Status set to Error (4), logged |
| SignalR publish failure | Logged, processing continues |
| Top-level DoWork exception | Logged, swallowed, loop continues |
| Service crash during update | `DataUpdate` record stays at `NumberRecords = -2`; `CloseOpenUpdateEntries()` exists but is never called, so nothing cleans it up on restart |
| Service crash during search | `ResetPartialSearches()` requeues on restart |
---
## Configuration
### App.config Settings
| Key | Purpose |
|-----|---------|
| `HubHost` | SignalR hub URL for status updates |
| `querytimeout` | SQL query timeout (default: 600 seconds) |
### dsconfig/*.json
Each data source has a JSON configuration file defining:
- Source system and data identifiers
- Destination table name
- Data fetch function reference
- Schedule configurations (Mass/Daily/Hourly)

See `DataSyncReport.md` for complete configuration details.

---
## Known Issues in Legacy Implementation
| Issue | Severity | Description | Location |
|-------|----------|-------------|----------|
| Hourly lookback bug | Medium | `GetPendingUpdateTasks()` uses `DailyUpdateDT` and `DailyInterval` for hourly updates instead of hourly equivalents. Results in 3-day lookback instead of 3 hours. | `UpdateProcessor.cs:101-103` |
| Unused cleanup code | Low | `CloseOpenUpdateEntries()` is defined but never called. Crashed updates (`NumberRecords = -2`) are never cleaned up. | `UpdateProcessor.DataUpdateEntry.cs:28` |
| Fire-and-forget SignalR | Low | `PublishStatus` and `PublishSearchUpdate` are `async void` methods. Failures don't propagate and new connections are created per-call. | `WorkProcessor.cs:61-107` |
---
## Key Differences from NEW Implementation
| Aspect | OLD (Legacy) | NEW (.NET 10) |
|--------|--------------|---------------|
| Hosting | Topshelf Windows Service | .NET BackgroundService |
| Loop pattern | Manual thread + sleep | IHostedService + Timer |
| Parallelism | `Parallel.ForEach` | Configurable via options |
| SignalR | Legacy ASP.NET SignalR | ASP.NET Core SignalR |
| Configuration | JSON files + App.config | `appsettings.json` + `pipelines.json` |
| Scheduling | Interval-based polling | Same pattern, configurable defaults |
| Hourly lookback | Uses daily values (bug) | Should use correct hourly values |
| Cleanup on start | Not implemented | Should clean up orphaned updates |