using System.Diagnostics.Metrics;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models.Enums;
using JdeScoping.Core.Models.Search;
using JdeScoping.DataSync.Configuration;
using EtlPipelineConfig = JdeScoping.DataSync.Configuration.EtlPipelineConfig;
using SourceElement = JdeScoping.DataSync.Configuration.SourceElement;
using DestinationElement = JdeScoping.DataSync.Configuration.DestinationElement;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using Shouldly;
using MsOptions = Microsoft.Extensions.Options.Options;
namespace JdeScoping.DataSync.Tests;
/// <summary>
/// Unit tests for WorkProcessor background service.
/// </summary>
public class WorkProcessorTests
{
#region Disabled Service
/// <summary>
/// Verifies that a processor configured with <c>Enabled = false</c> can be started and
/// stopped without ever asking the scope factory for a scope.
/// </summary>
[Fact]
public async Task ExecuteAsync_WhenDisabled_StopsImmediately()
{
    // Arrange
    var options = MsOptions.Create(new WorkProcessorOptions { Enabled = false });
    var scopeFactory = Substitute.For<IServiceScopeFactory>();
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(50); // short pause between start and stop
    await sut.StopAsync(CancellationToken.None);

    // Assert - should not throw and scope factory should not be called
    scopeFactory.DidNotReceive().CreateAsyncScope();
}
#endregion
#region Startup Cleanup
/// <summary>
/// Verifies that running an enabled processor results in a call to
/// <c>CloseOpenUpdateEntriesAsync</c> on the data-update repository.
/// </summary>
[Fact]
public async Task ExecuteAsync_CallsStartupCleanup_CloseOpenUpdateEntries()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    var scheduleChecker = Substitute.For<IScheduleChecker>();
    var notificationService = Substitute.For<ISearchNotificationService>();

    // No pending sync tasks are reported to the processor.
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(new List<DataUpdateTask>());

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(100)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(150); // let the background loop run briefly before stopping
    await sut.StopAsync(CancellationToken.None);

    // Assert
    await dataUpdateRepo.Received().CloseOpenUpdateEntriesAsync(Arg.Any<CancellationToken>());
}
/// <summary>
/// Verifies that running an enabled processor results in a call to
/// <c>ResetPartialSearchesAsync</c> on the search repository.
/// </summary>
[Fact]
public async Task ExecuteAsync_CallsStartupCleanup_ResetPartialSearches()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    var scheduleChecker = Substitute.For<IScheduleChecker>();
    var notificationService = Substitute.For<ISearchNotificationService>();

    // No pending sync tasks are reported to the processor.
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(new List<DataUpdateTask>());

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(100)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(150); // let the background loop run briefly before stopping
    await sut.StopAsync(CancellationToken.None);

    // Assert
    await searchRepo.Received().ResetPartialSearchesAsync(Arg.Any<CancellationToken>());
}
/// <summary>
/// Verifies that when <c>CloseOpenUpdateEntriesAsync</c> throws, the processor still
/// goes on to call <c>ResetPartialSearchesAsync</c> and start/stop raise no exception.
/// </summary>
[Fact]
public async Task ExecuteAsync_WhenCloseOpenEntriesThrows_ContinuesStarting()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    dataUpdateRepo.CloseOpenUpdateEntriesAsync(Arg.Any<CancellationToken>())
        .ThrowsAsync(new Exception("Database error"));
    var searchRepo = Substitute.For<ISearchRepository>();
    var scheduleChecker = Substitute.For<IScheduleChecker>();
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(new List<DataUpdateTask>());
    var notificationService = Substitute.For<ISearchNotificationService>();

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(100)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

    // Act - should not throw
    await sut.StartAsync(cts.Token);
    await Task.Delay(150); // let the background loop run briefly before stopping
    await sut.StopAsync(CancellationToken.None);

    // Assert - search repo should still be called (startup continues)
    await searchRepo.Received().ResetPartialSearchesAsync(Arg.Any<CancellationToken>());
}
/// <summary>
/// Verifies that when <c>ResetPartialSearchesAsync</c> throws, the processor still
/// polls <c>GetPendingTasksAsync</c> at least once afterwards.
/// </summary>
[Fact]
public async Task ExecuteAsync_WhenResetPartialSearchesThrows_ContinuesRunning()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    searchRepo.ResetPartialSearchesAsync(Arg.Any<CancellationToken>())
        .ThrowsAsync(new Exception("Database error"));
    var scheduleChecker = Substitute.For<IScheduleChecker>();

    // Count how often the processor asks for pending tasks; each call returns an empty list.
    var callCount = 0;
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(_ =>
        {
            callCount++;
            return new List<DataUpdateTask>();
        });
    var notificationService = Substitute.For<ISearchNotificationService>();

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(50)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

    // Act - should not throw
    await sut.StartAsync(cts.Token);
    await Task.Delay(150); // let the background loop run briefly before stopping
    await sut.StopAsync(CancellationToken.None);

    // Assert - service continues running after startup error
    callCount.ShouldBeGreaterThan(0);
}
#endregion
#region Priority Processing
/// <summary>
/// Verifies that when the schedule checker reports a pending task, the processor
/// calls <c>ExecutePendingSyncsAsync</c> on the sync orchestrator.
/// </summary>
[Fact]
public async Task DoWorkAsync_WhenPendingTasks_ExecutesSyncs()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    var orchestrator = Substitute.For<ISyncOrchestrator>();
    var scheduleChecker = Substitute.For<IScheduleChecker>();

    // One pending daily task is always reported.
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(new List<DataUpdateTask> { CreateTask("TestTable", UpdateTypes.Daily) });
    var notificationService = Substitute.For<ISearchNotificationService>();

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService,
        orchestrator: orchestrator);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(100)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(150); // let the background loop run briefly before stopping
    await sut.StopAsync(CancellationToken.None);

    // Assert
    await orchestrator.Received().ExecutePendingSyncsAsync(Arg.Any<CancellationToken>());
}
/// <summary>
/// Verifies that when no sync tasks are pending, the processor asks the search
/// repository for the next queued search.
/// </summary>
[Fact]
public async Task DoWorkAsync_WhenNoPendingTasks_ChecksForQueuedSearches()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    var scheduleChecker = Substitute.For<IScheduleChecker>();

    // No pending sync tasks are reported to the processor.
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(new List<DataUpdateTask>());
    var notificationService = Substitute.For<ISearchNotificationService>();

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(100)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(150); // let the background loop run briefly before stopping
    await sut.StopAsync(CancellationToken.None);

    // Assert
    await searchRepo.Received().GetNextQueuedSearchAsync(Arg.Any<CancellationToken>());
}
/// <summary>
/// Verifies that when no sync tasks are pending and the repository returns a queued
/// search (Id 42), the processor passes that search to <c>ExecuteSearchAsync</c>.
/// </summary>
[Fact]
public async Task DoWorkAsync_WhenQueuedSearchExists_ExecutesSearch()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    var queuedSearch = new Search { Id = 42 };
    searchRepo.GetNextQueuedSearchAsync(Arg.Any<CancellationToken>())
        .Returns(queuedSearch);
    var searchExecution = Substitute.For<ISearchExecutionService>();
    var scheduleChecker = Substitute.For<IScheduleChecker>();
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(new List<DataUpdateTask>());
    var notificationService = Substitute.For<ISearchNotificationService>();

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService,
        searchExecution: searchExecution);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(100)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(150); // let the background loop run briefly before stopping
    await sut.StopAsync(CancellationToken.None);

    // Assert
    await searchExecution.Received().ExecuteSearchAsync(
        Arg.Is<Search>(s => s.Id == 42),
        Arg.Any<CancellationToken>());
}
/// <summary>
/// Verifies that while the schedule checker keeps reporting a pending sync task,
/// the processor never asks the search repository for a queued search.
/// </summary>
[Fact]
public async Task DoWorkAsync_WhenPendingTasks_DoesNotProcessSearches()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    var orchestrator = Substitute.For<ISyncOrchestrator>();
    var scheduleChecker = Substitute.For<IScheduleChecker>();

    // One pending daily task is always reported.
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(new List<DataUpdateTask> { CreateTask("TestTable", UpdateTypes.Daily) });
    var notificationService = Substitute.For<ISearchNotificationService>();

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService,
        orchestrator: orchestrator);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(100)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(150); // let the background loop run briefly before stopping
    await sut.StopAsync(CancellationToken.None);

    // Assert - when syncs are pending, searches are not processed
    await searchRepo.DidNotReceive().GetNextQueuedSearchAsync(Arg.Any<CancellationToken>());
}
#endregion
#region Error Handling
/// <summary>
/// Verifies that an exception thrown by the first <c>GetPendingTasksAsync</c> call does
/// not end the processing loop: the schedule checker must be polled more than once.
/// </summary>
[Fact]
public async Task ExecuteAsync_WhenDoWorkThrows_ContinuesLoop()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    var callCount = 0;
    var scheduleChecker = Substitute.For<IScheduleChecker>();

    // The first poll throws; every later poll returns an empty task list.
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(_ =>
        {
            callCount++;
            if (callCount == 1)
            {
                throw new Exception("Test error");
            }
            return new List<DataUpdateTask>();
        });
    var notificationService = Substitute.For<ISearchNotificationService>();

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(50)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(250); // long enough for several 50 ms work intervals
    await sut.StopAsync(CancellationToken.None);

    // Assert - should have been called multiple times despite first error
    callCount.ShouldBeGreaterThan(1);
}
#endregion
#region Status Notifications
/// <summary>
/// Runs the processor while <c>NotifyStatusAsync</c> is stubbed to throw "SignalR error".
/// The test has no explicit assertion; it passes as long as starting, briefly running,
/// and stopping the processor raise no exception.
/// </summary>
[Fact]
public async Task NotifyStatusSafeAsync_WhenNotificationThrows_DoesNotCrash()
{
// Arrange
// NOTE(review): the Substitute.For()/Arg.Any()/new List() calls in this method carry no
// generic type arguments; they appear to have been lost - confirm the intended types
// (in particular both NotifyStatusAsync parameters) against the interface definitions.
var dataUpdateRepo = Substitute.For();
var searchRepo = Substitute.For();
var scheduleChecker = Substitute.For();
// No pending sync tasks are reported to the processor.
scheduleChecker.GetPendingTasksAsync(Arg.Any())
.Returns(new List());
var notificationService = Substitute.For();
// Every status notification fails with the exception below.
notificationService.NotifyStatusAsync(Arg.Any(), Arg.Any())
.ThrowsAsync(new Exception("SignalR error"));
var scopeFactory = SetupScopeFactory(
dataUpdateRepo,
searchRepo,
scheduleChecker,
notificationService);
var options = MsOptions.Create(new WorkProcessorOptions
{
Enabled = true,
WorkInterval = TimeSpan.FromMilliseconds(100)
});
var metrics = CreateMetrics();
var sut = new WorkProcessor(
scopeFactory,
options,
NullLogger.Instance,
metrics);
// The token passed to StartAsync is cancelled automatically after 200 ms.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
// Act - should not throw
await sut.StartAsync(cts.Token);
await Task.Delay(150);
await sut.StopAsync(CancellationToken.None);
// Assert - no exception thrown, service runs
}
#endregion
#region Graceful Shutdown
/// <summary>
/// Verifies that with a long (10 s) work interval, cancelling the start token and
/// calling <c>StopAsync</c> completes within two seconds instead of hanging.
/// </summary>
[Fact]
public async Task ExecuteAsync_WhenCancelled_StopsGracefully()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    var scheduleChecker = Substitute.For<IScheduleChecker>();
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(new List<DataUpdateTask>());
    var notificationService = Substitute.For<ISearchNotificationService>();

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromSeconds(10) // Long interval
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);
    using var cts = new CancellationTokenSource();

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(50);
    cts.Cancel();
    var stopTask = sut.StopAsync(CancellationToken.None);

    // Race the stop against a 2 s timeout so a hang fails the test instead of blocking it.
    var completed = await Task.WhenAny(stopTask, Task.Delay(2000));

    // Assert - should complete without hanging
    completed.ShouldBe(stopTask);
}
/// <summary>
/// Verifies that when the start token is cancelled from inside
/// <c>ExecutePendingSyncsAsync</c> (which then awaits a cancellable delay),
/// <c>StopAsync</c> still completes within two seconds.
/// </summary>
[Fact]
public async Task ExecuteAsync_WhenCancelledDuringWork_HandlesOperationCanceledException()
{
    // Arrange
    var dataUpdateRepo = Substitute.For<IDataUpdateRepository>();
    var searchRepo = Substitute.For<ISearchRepository>();
    var orchestrator = Substitute.For<ISyncOrchestrator>();

    // Declared with `using` so the source is disposed even if the assertion below fails.
    using var cts = new CancellationTokenSource();

    // The sync itself triggers cancellation, then waits on the token it was handed.
    orchestrator.ExecutePendingSyncsAsync(Arg.Any<CancellationToken>())
        .Returns(async call =>
        {
            cts.Cancel();
            await Task.Delay(100, call.Arg<CancellationToken>());
        });
    var scheduleChecker = Substitute.For<IScheduleChecker>();
    scheduleChecker.GetPendingTasksAsync(Arg.Any<CancellationToken>())
        .Returns(new List<DataUpdateTask> { CreateTask("TestTable", UpdateTypes.Daily) });
    var notificationService = Substitute.For<ISearchNotificationService>();

    var scopeFactory = SetupScopeFactory(
        dataUpdateRepo,
        searchRepo,
        scheduleChecker,
        notificationService,
        orchestrator: orchestrator);
    var options = MsOptions.Create(new WorkProcessorOptions
    {
        Enabled = true,
        WorkInterval = TimeSpan.FromMilliseconds(100)
    });
    var metrics = CreateMetrics();
    var sut = new WorkProcessor(
        scopeFactory,
        options,
        NullLogger<WorkProcessor>.Instance,
        metrics);

    // Act
    await sut.StartAsync(cts.Token);
    await Task.Delay(200);
    var stopTask = sut.StopAsync(CancellationToken.None);

    // Race the stop against a 2 s timeout so a hang fails the test instead of blocking it.
    var completed = await Task.WhenAny(stopTask, Task.Delay(2000));

    // Assert - should complete gracefully
    completed.ShouldBe(stopTask);
}
#endregion
#region Helper Methods
/// <summary>
/// Creates a <see cref="DataSyncMetrics"/> instance backed by a meter factory resolved
/// from a throwaway service provider that only has <c>AddMetrics()</c> registered.
/// </summary>
/// <returns>A new <see cref="DataSyncMetrics"/> for use in a single test.</returns>
private static DataSyncMetrics CreateMetrics()
{
    var services = new ServiceCollection();
    services.AddMetrics();

    // The provider is not disposed by this helper; the meter factory is resolved from it.
    var provider = services.BuildServiceProvider();
    var meterFactory = provider.GetRequiredService<IMeterFactory>();
    return new DataSyncMetrics(meterFactory);
}
/// <summary>
/// Builds a service provider whose scoped registrations hand back the supplied test
/// doubles, and returns that provider's <see cref="IServiceScopeFactory"/>.
/// </summary>
/// <param name="dataUpdateRepo">Instance registered for <see cref="IDataUpdateRepository"/>.</param>
/// <param name="searchRepo">Instance registered for <see cref="ISearchRepository"/>.</param>
/// <param name="scheduleChecker">Instance registered for <see cref="IScheduleChecker"/>.</param>
/// <param name="notificationService">Instance registered for <see cref="ISearchNotificationService"/>.</param>
/// <param name="orchestrator">Optional orchestrator; a fresh substitute is used when null.</param>
/// <param name="searchExecution">Optional search execution service; a fresh substitute is used when null.</param>
/// <returns>The scope factory resolved from the built provider.</returns>
private static IServiceScopeFactory SetupScopeFactory(
    IDataUpdateRepository dataUpdateRepo,
    ISearchRepository searchRepo,
    IScheduleChecker scheduleChecker,
    ISearchNotificationService notificationService,
    ISyncOrchestrator? orchestrator = null,
    ISearchExecutionService? searchExecution = null)
{
    // Fall back to plain substitutes for collaborators the calling test does not care about.
    orchestrator ??= Substitute.For<ISyncOrchestrator>();
    searchExecution ??= Substitute.For<ISearchExecutionService>();

    var services = new ServiceCollection();
    services.AddScoped<IDataUpdateRepository>(_ => dataUpdateRepo);
    services.AddScoped<ISearchRepository>(_ => searchRepo);
    services.AddScoped<IScheduleChecker>(_ => scheduleChecker);
    services.AddScoped<ISearchNotificationService>(_ => notificationService);
    services.AddScoped<ISyncOrchestrator>(_ => orchestrator);
    services.AddScoped<ISearchExecutionService>(_ => searchExecution);

    // The provider is not disposed by this helper; the returned factory is resolved from it.
    var serviceProvider = services.BuildServiceProvider();
    return serviceProvider.GetRequiredService<IServiceScopeFactory>();
}
/// <summary>
/// Creates a <see cref="DataUpdateTask"/> for the given table with source system "JDE"
/// and an enabled pipeline configuration named after the table.
/// </summary>
/// <param name="tableName">Used as the table name, pipeline name and destination table.</param>
/// <param name="updateType">The update type assigned to the task.</param>
/// <returns>A populated task suitable for returning from a schedule-checker substitute.</returns>
private static DataUpdateTask CreateTask(string tableName, UpdateTypes updateType)
{
    return new DataUpdateTask
    {
        TableName = tableName,
        SourceSystem = "JDE",
        // Invariant casing keeps the identifier identical regardless of the machine's culture.
        SourceData = tableName.ToUpperInvariant(),
        UpdateType = updateType,
        MinimumDt = null,
        Pipeline = new EtlPipelineConfig
        {
            Name = tableName,
            IsEnabled = true,
            MassSyncIntervalMinutes = 10080,  // 7 days
            DailySyncIntervalMinutes = 1440,  // 24 hours
            HourlySyncIntervalMinutes = 60,
            Source = new SourceElement { Connection = "JDE", Query = "SELECT 1" },
            Destination = new DestinationElement { Table = tableName, MatchColumns = ["Id"] }
        }
    };
}
#endregion
}