using System.Diagnostics.Metrics; using JdeScoping.Core.Interfaces; using JdeScoping.Core.Models.Enums; using JdeScoping.Core.Models.Search; using JdeScoping.DataSync.Configuration; using EtlPipelineConfig = JdeScoping.DataSync.Configuration.EtlPipelineConfig; using SourceElement = JdeScoping.DataSync.Configuration.SourceElement; using DestinationElement = JdeScoping.DataSync.Configuration.DestinationElement; using JdeScoping.DataSync.Contracts; using JdeScoping.DataSync.Models; using JdeScoping.DataSync.Options; using JdeScoping.DataSync.Telemetry; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using NSubstitute; using NSubstitute.ExceptionExtensions; using Shouldly; using MsOptions = Microsoft.Extensions.Options.Options; namespace JdeScoping.DataSync.Tests; /// /// Unit tests for WorkProcessor background service. /// public class WorkProcessorTests { #region Disabled Service [Fact] public async Task ExecuteAsync_WhenDisabled_StopsImmediately() { // Arrange var options = MsOptions.Create(new WorkProcessorOptions { Enabled = false }); var scopeFactory = Substitute.For(); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); // Act await sut.StartAsync(cts.Token); await Task.Delay(50); await sut.StopAsync(CancellationToken.None); // Assert - should not throw and scope factory should not be called scopeFactory.DidNotReceive().CreateAsyncScope(); } #endregion #region Startup Cleanup [Fact] public async Task ExecuteAsync_CallsStartupCleanup_CloseOpenUpdateEntries() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var scheduleChecker = Substitute.For(); var notificationService = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List()); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert await dataUpdateRepo.Received().CloseOpenUpdateEntriesAsync(Arg.Any()); } [Fact] public async Task ExecuteAsync_CallsStartupCleanup_ResetPartialSearches() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var scheduleChecker = Substitute.For(); var notificationService = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List()); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert await searchRepo.Received().ResetPartialSearchesAsync(Arg.Any()); } [Fact] public async Task ExecuteAsync_WhenCloseOpenEntriesThrows_ContinuesStarting() { // Arrange var dataUpdateRepo = Substitute.For(); dataUpdateRepo.CloseOpenUpdateEntriesAsync(Arg.Any()) .ThrowsAsync(new Exception("Database error")); var searchRepo = Substitute.For(); var scheduleChecker = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List()); var notificationService = Substitute.For(); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act - should not throw await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert - search repo should still be called (startup continues) await searchRepo.Received().ResetPartialSearchesAsync(Arg.Any()); } [Fact] public async Task ExecuteAsync_WhenResetPartialSearchesThrows_ContinuesRunning() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); searchRepo.ResetPartialSearchesAsync(Arg.Any()) .ThrowsAsync(new Exception("Database error")); var scheduleChecker = Substitute.For(); var callCount = 0; scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(x => { callCount++; return new List(); }); var notificationService = Substitute.For(); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(50) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act - should not throw await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert - service continues running after startup error callCount.ShouldBeGreaterThan(0); } #endregion #region Priority Processing [Fact] public async Task DoWorkAsync_WhenPendingTasks_ExecutesSyncs() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var orchestrator = Substitute.For(); var scheduleChecker = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List { CreateTask("TestTable", UpdateTypes.Daily) }); var notificationService = Substitute.For(); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService, orchestrator: orchestrator); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert await orchestrator.Received().ExecutePendingSyncsAsync(Arg.Any()); } [Fact] public async Task DoWorkAsync_WhenNoPendingTasks_ChecksForQueuedSearches() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var scheduleChecker = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List()); var notificationService = Substitute.For(); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert await searchRepo.Received().GetNextQueuedSearchAsync(Arg.Any()); } [Fact] public async Task DoWorkAsync_WhenQueuedSearchExists_ExecutesSearch() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var queuedSearch = new Search { Id = 42 }; searchRepo.GetNextQueuedSearchAsync(Arg.Any()) .Returns(queuedSearch); var searchExecution = Substitute.For(); var scheduleChecker = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List()); var notificationService = Substitute.For(); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService, searchExecution: searchExecution); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert await searchExecution.Received().ExecuteSearchAsync( Arg.Is(s => s.Id == 42), Arg.Any()); } [Fact] public async Task DoWorkAsync_WhenPendingTasks_DoesNotProcessSearches() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var orchestrator = Substitute.For(); var scheduleChecker = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List { CreateTask("TestTable", UpdateTypes.Daily) }); var notificationService = Substitute.For(); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService, orchestrator: orchestrator); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert - when syncs are pending, searches are not processed await searchRepo.DidNotReceive().GetNextQueuedSearchAsync(Arg.Any()); } #endregion #region Error Handling [Fact] public async Task ExecuteAsync_WhenDoWorkThrows_ContinuesLoop() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var callCount = 0; var scheduleChecker = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(x => { callCount++; if (callCount == 1) { throw new Exception("Test error"); } return new List(); }); var notificationService = Substitute.For(); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(50) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300)); // Act await sut.StartAsync(cts.Token); await Task.Delay(250); await sut.StopAsync(CancellationToken.None); // Assert - should have been called multiple times despite first error callCount.ShouldBeGreaterThan(1); } #endregion #region Status Notifications [Fact] public async Task NotifyStatusSafeAsync_WhenNotificationThrows_DoesNotCrash() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var scheduleChecker = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List()); var notificationService = Substitute.For(); notificationService.NotifyStatusAsync(Arg.Any(), Arg.Any()) .ThrowsAsync(new Exception("SignalR error")); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); // Act - should not throw await sut.StartAsync(cts.Token); await Task.Delay(150); await sut.StopAsync(CancellationToken.None); // Assert - no exception thrown, service runs } #endregion #region Graceful Shutdown [Fact] public async Task ExecuteAsync_WhenCancelled_StopsGracefully() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var scheduleChecker = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List()); var notificationService = Substitute.For(); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromSeconds(10) // Long interval }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); using var cts = new CancellationTokenSource(); // Act await sut.StartAsync(cts.Token); await Task.Delay(50); cts.Cancel(); var stopTask = sut.StopAsync(CancellationToken.None); var completed = await Task.WhenAny(stopTask, Task.Delay(2000)); // Assert - should complete without hanging completed.ShouldBe(stopTask); } [Fact] public async Task ExecuteAsync_WhenCancelledDuringWork_HandlesOperationCanceledException() { // Arrange var dataUpdateRepo = Substitute.For(); var searchRepo = Substitute.For(); var orchestrator = Substitute.For(); var cts = new CancellationTokenSource(); orchestrator.ExecutePendingSyncsAsync(Arg.Any()) .Returns(async x => { cts.Cancel(); await Task.Delay(100, x.Arg()); }); var scheduleChecker = Substitute.For(); scheduleChecker.GetPendingTasksAsync(Arg.Any()) .Returns(new List { CreateTask("TestTable", UpdateTypes.Daily) }); var notificationService = Substitute.For(); var scopeFactory = SetupScopeFactory( dataUpdateRepo, searchRepo, scheduleChecker, notificationService, orchestrator: orchestrator); var options = MsOptions.Create(new WorkProcessorOptions { Enabled = true, WorkInterval = TimeSpan.FromMilliseconds(100) }); var metrics = CreateMetrics(); var sut = new WorkProcessor( scopeFactory, options, NullLogger.Instance, metrics); // Act await sut.StartAsync(cts.Token); await Task.Delay(200); var stopTask = sut.StopAsync(CancellationToken.None); var completed = await Task.WhenAny(stopTask, Task.Delay(2000)); // Assert - should complete gracefully completed.ShouldBe(stopTask); cts.Dispose(); } #endregion #region Helper Methods private static DataSyncMetrics CreateMetrics() { var services = new ServiceCollection(); services.AddMetrics(); var provider = services.BuildServiceProvider(); var meterFactory = provider.GetRequiredService(); return new DataSyncMetrics(meterFactory); } private static IServiceScopeFactory SetupScopeFactory( IDataUpdateRepository dataUpdateRepo, ISearchRepository searchRepo, IScheduleChecker scheduleChecker, ISearchNotificationService notificationService, ISyncOrchestrator? orchestrator = null, ISearchExecutionService? searchExecution = null) { orchestrator ??= Substitute.For(); searchExecution ??= Substitute.For(); var services = new ServiceCollection(); services.AddScoped(_ => dataUpdateRepo); services.AddScoped(_ => searchRepo); services.AddScoped(_ => scheduleChecker); services.AddScoped(_ => notificationService); services.AddScoped(_ => orchestrator); services.AddScoped(_ => searchExecution); var serviceProvider = services.BuildServiceProvider(); return serviceProvider.GetRequiredService(); } private static DataUpdateTask CreateTask(string tableName, UpdateTypes updateType) { return new DataUpdateTask { TableName = tableName, SourceSystem = "JDE", SourceData = tableName.ToUpper(), UpdateType = updateType, MinimumDt = null, Pipeline = new EtlPipelineConfig { Name = tableName, IsEnabled = true, MassSyncIntervalMinutes = 10080, DailySyncIntervalMinutes = 1440, HourlySyncIntervalMinutes = 60, Source = new SourceElement { Connection = "JDE", Query = "SELECT 1" }, Destination = new DestinationElement { Table = tableName, MatchColumns = ["Id"] } } }; } #endregion }