From 1822e3c76f98c565451c22faa8e640029997e2f1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 13 May 2026 07:12:37 -0400 Subject: [PATCH] fix(store-and-forward): wire up parked-message handler and start S&F service on sites MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Parked Messages page returned "Parked message handler not available" because no actor was ever registered for ParkedMessages, and Retry/Discard requests had no Receive at all (would have hit deadletters). On top of that, StoreAndForwardService.StartAsync() was never called anywhere, so the sf_messages SQLite table was never created and the retry timer never ran — silently breaking all of S&F. - New ParkedMessageHandlerActor bridges StoreAndForwardService.{Get,Retry,Discard} using the Sender→Task→PipeTo pattern already used in DeploymentManagerActor. - SiteCommunicationActor now routes ParkedMessageRetryRequest and ParkedMessageDiscardRequest the same way as the existing Query handler. - AkkaHostedService.RegisterSiteActors() resolves StoreAndForwardService, calls StartAsync() to create the schema and start the timer, then creates and registers the handler actor. --- .../Actors/SiteCommunicationActor.cs | 22 ++++ .../Actors/AkkaHostedService.cs | 15 +++ .../ParkedMessageHandlerActor.cs | 119 ++++++++++++++++++ .../ScadaLink.StoreAndForward.csproj | 1 + 4 files changed, 157 insertions(+) create mode 100644 src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs index 35f300c..12f13f1 100644 --- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs @@ -136,6 +136,28 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers } }); + Receive(msg => + { + if (_parkedMessageHandler != null) + _parkedMessageHandler.Forward(msg); + else + { + Sender.Tell(new ParkedMessageRetryResponse( + msg.CorrelationId, false, "Parked message handler not available")); + } + }); + + Receive(msg => + { + if (_parkedMessageHandler != null) + _parkedMessageHandler.Forward(msg); + else + { + Sender.Tell(new ParkedMessageDiscardResponse( + msg.CorrelationId, false, "Parked message handler not available")); + } + }); + // Internal: send heartbeat tick Receive(_ => SendHeartbeatToCentral()); diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 9bfd1ac..49de15a 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -316,6 +316,21 @@ akka {{ siteCommActor.Tell(new RegisterLocalHandler(LocalHandlerType.EventLog, eventLogHandler)); } + // Parked message handler — bridges Akka to StoreAndForwardService + var storeAndForwardService = _serviceProvider.GetService(); + if (storeAndForwardService != null) + { + // Initialize SQLite schema and start the retry timer. Must complete before + // any actor or HTTP handler touches the service. + storeAndForwardService.StartAsync().GetAwaiter().GetResult(); + + var parkedMessageHandler = _actorSystem.ActorOf( + Props.Create(() => new ParkedMessageHandlerActor( + storeAndForwardService, _nodeOptions.SiteId!)), + "parked-message-handler"); + siteCommActor.Tell(new RegisterLocalHandler(LocalHandlerType.ParkedMessages, parkedMessageHandler)); + } + // Register SiteCommunicationActor with ClusterClientReceptionist so central ClusterClients can reach it ClusterClientReceptionist.Get(_actorSystem).RegisterService(siteCommActor); diff --git a/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs b/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs new file mode 100644 index 0000000..aba0b89 --- /dev/null +++ b/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs @@ -0,0 +1,119 @@ +using System.Text.Json; +using Akka.Actor; +using Akka.Event; +using ScadaLink.Commons.Messages.RemoteQuery; + +namespace ScadaLink.StoreAndForward; + +/// +/// Akka actor bridge for parked-message operations. +/// Receives Query/Retry/Discard requests from the SiteCommunicationActor and replies +/// with the matching response records. +/// +public class ParkedMessageHandlerActor : ReceiveActor +{ + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly StoreAndForwardService _service; + private readonly string _siteId; + + public ParkedMessageHandlerActor(StoreAndForwardService service, string siteId) + { + _service = service; + _siteId = siteId; + + Receive(HandleQuery); + Receive(HandleRetry); + Receive(HandleDiscard); + } + + private void HandleQuery(ParkedMessageQueryRequest msg) + { + var sender = Sender; + var siteId = _siteId; + + _service.GetParkedMessagesAsync(category: null, msg.PageNumber, msg.PageSize) + .ContinueWith(t => + { + if (t.IsCompletedSuccessfully) + { + var entries = t.Result.Messages + .Select(m => new ParkedMessageEntry( + MessageId: m.Id, + TargetSystem: m.Target, + MethodName: ExtractMethodName(m.PayloadJson, m.Category), + ErrorMessage: m.LastError ?? string.Empty, + AttemptCount: m.RetryCount, + OriginalTimestamp: m.CreatedAt, + LastAttemptTimestamp: m.LastAttemptAt ?? m.CreatedAt)) + .ToList(); + + return new ParkedMessageQueryResponse( + msg.CorrelationId, siteId, entries, t.Result.TotalCount, + msg.PageNumber, msg.PageSize, true, null, DateTimeOffset.UtcNow); + } + + return new ParkedMessageQueryResponse( + msg.CorrelationId, siteId, [], 0, msg.PageNumber, msg.PageSize, + false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow); + }).PipeTo(sender); + } + + private void HandleRetry(ParkedMessageRetryRequest msg) + { + var sender = Sender; + + _service.RetryParkedMessageAsync(msg.MessageId) + .ContinueWith(t => + { + if (t.IsCompletedSuccessfully) + { + return new ParkedMessageRetryResponse( + msg.CorrelationId, t.Result, + t.Result ? null : "Message not found or no longer parked."); + } + + return new ParkedMessageRetryResponse( + msg.CorrelationId, false, t.Exception?.GetBaseException().Message); + }).PipeTo(sender); + } + + private void HandleDiscard(ParkedMessageDiscardRequest msg) + { + var sender = Sender; + + _service.DiscardParkedMessageAsync(msg.MessageId) + .ContinueWith(t => + { + if (t.IsCompletedSuccessfully) + { + return new ParkedMessageDiscardResponse( + msg.CorrelationId, t.Result, + t.Result ? null : "Message not found or no longer parked."); + } + + return new ParkedMessageDiscardResponse( + msg.CorrelationId, false, t.Exception?.GetBaseException().Message); + }).PipeTo(sender); + } + + private static string ExtractMethodName(string payloadJson, Commons.Types.Enums.StoreAndForwardCategory category) + { + if (string.IsNullOrEmpty(payloadJson)) + return category.ToString(); + + try + { + using var doc = JsonDocument.Parse(payloadJson); + var root = doc.RootElement; + if (root.TryGetProperty("MethodName", out var method) && method.ValueKind == JsonValueKind.String) + return method.GetString() ?? category.ToString(); + if (root.TryGetProperty("Subject", out var subject) && subject.ValueKind == JsonValueKind.String) + return subject.GetString() ?? category.ToString(); + } + catch (JsonException) + { + } + + return category.ToString(); + } +} diff --git a/src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj b/src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj index 3d1f13c..d37f2e3 100644 --- a/src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj +++ b/src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj @@ -8,6 +8,7 @@ +