using Akka.Actor; using Akka.TestKit.Xunit2; using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging.Abstractions; using ScadaLink.Commons.Messages.RemoteQuery; using ScadaLink.Commons.Types.Enums; namespace ScadaLink.StoreAndForward.Tests; /// /// StoreAndForward-013: tests for the actor /// bridge — the Query/Retry/Discard request-to-response mapping and the /// ExtractMethodName payload-JSON parsing (including the malformed-JSON branch). /// public class ParkedMessageHandlerActorTests : TestKit, IAsyncLifetime, IDisposable { private readonly SqliteConnection _keepAlive; private readonly StoreAndForwardStorage _storage; private readonly StoreAndForwardService _service; public ParkedMessageHandlerActorTests() { var connStr = $"Data Source=ActorTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared"; _keepAlive = new SqliteConnection(connStr); _keepAlive.Open(); _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); var options = new StoreAndForwardOptions { DefaultRetryInterval = TimeSpan.Zero, DefaultMaxRetries = 1, RetryTimerInterval = TimeSpan.FromMinutes(10), ReplicationEnabled = false, }; _service = new StoreAndForwardService( _storage, options, NullLogger.Instance); } public async Task InitializeAsync() => await _storage.InitializeAsync(); public Task DisposeAsync() => Task.CompletedTask; protected override void Dispose(bool disposing) { if (disposing) _keepAlive.Dispose(); base.Dispose(disposing); } /// Enqueues a message and parks it via the retry sweep (MaxRetries = 1). private async Task ParkMessageAsync(StoreAndForwardCategory category, string payloadJson) { _service.RegisterDeliveryHandler(category, _ => throw new HttpRequestException("always fails")); var result = await _service.EnqueueAsync(category, "target", payloadJson, maxRetries: 1); await _service.RetryPendingMessagesAsync(); return result.MessageId; } [Fact] public async Task Query_ReturnsParkedEntries_WithExtractedMethodName() { await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{"MethodName":"StartPump","Args":{}}"""); var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); actor.Tell(new ParkedMessageQueryRequest("corr-1", "site-1", 1, 50, DateTimeOffset.UtcNow)); var response = ExpectMsg(); Assert.True(response.Success); Assert.Equal("corr-1", response.CorrelationId); Assert.Equal("site-1", response.SiteId); Assert.Single(response.Messages); Assert.Equal("StartPump", response.Messages[0].MethodName); Assert.Equal(StoreAndForwardCategory.ExternalSystem, response.Messages[0].Category); } [Fact] public async Task Query_NotificationPayload_UsesSubjectAsMethodName() { await ParkMessageAsync(StoreAndForwardCategory.Notification, """{"Subject":"Tank overflow","Body":"..."}"""); var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); actor.Tell(new ParkedMessageQueryRequest("corr-2", "site-1", 1, 50, DateTimeOffset.UtcNow)); var response = ExpectMsg(); Assert.True(response.Success); Assert.Equal("Tank overflow", response.Messages[0].MethodName); } [Fact] public async Task Query_MalformedJsonPayload_FallsBackToCategoryName() { await ParkMessageAsync(StoreAndForwardCategory.CachedDbWrite, "not-valid-json{"); var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); actor.Tell(new ParkedMessageQueryRequest("corr-3", "site-1", 1, 50, DateTimeOffset.UtcNow)); var response = ExpectMsg(); Assert.True(response.Success); // Malformed JSON must not throw — ExtractMethodName falls back to the category. Assert.Equal(StoreAndForwardCategory.CachedDbWrite.ToString(), response.Messages[0].MethodName); } [Fact] public async Task Query_PayloadWithoutMethodNameOrSubject_FallsBackToCategoryName() { await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{"Unrelated":"value"}"""); var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); actor.Tell(new ParkedMessageQueryRequest("corr-4", "site-1", 1, 50, DateTimeOffset.UtcNow)); var response = ExpectMsg(); Assert.Equal(StoreAndForwardCategory.ExternalSystem.ToString(), response.Messages[0].MethodName); } [Fact] public async Task Retry_ParkedMessage_ReturnsSuccess() { var messageId = await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{}"""); var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); actor.Tell(new ParkedMessageRetryRequest("corr-5", "site-1", messageId, DateTimeOffset.UtcNow)); var response = ExpectMsg(); Assert.True(response.Success); Assert.Equal("corr-5", response.CorrelationId); Assert.Null(response.ErrorMessage); var msg = await _storage.GetMessageByIdAsync(messageId); Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status); } [Fact] public void Retry_UnknownMessage_ReturnsFailureWithMessage() { var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); actor.Tell(new ParkedMessageRetryRequest("corr-6", "site-1", "does-not-exist", DateTimeOffset.UtcNow)); var response = ExpectMsg(); Assert.False(response.Success); Assert.Equal("corr-6", response.CorrelationId); Assert.NotNull(response.ErrorMessage); } [Fact] public async Task Discard_ParkedMessage_ReturnsSuccessAndRemovesMessage() { var messageId = await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{}"""); var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); actor.Tell(new ParkedMessageDiscardRequest("corr-7", "site-1", messageId, DateTimeOffset.UtcNow)); var response = ExpectMsg(); Assert.True(response.Success); Assert.Equal("corr-7", response.CorrelationId); var msg = await _storage.GetMessageByIdAsync(messageId); Assert.Null(msg); } [Fact] public void Discard_UnknownMessage_ReturnsFailureWithMessage() { var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); actor.Tell(new ParkedMessageDiscardRequest("corr-8", "site-1", "does-not-exist", DateTimeOffset.UtcNow)); var response = ExpectMsg(); Assert.False(response.Success); Assert.NotNull(response.ErrorMessage); } }