fix(store-and-forward): wire up parked-message handler and start S&F service on sites
The Parked Messages page returned "Parked message handler not available"
because no actor was ever registered for ParkedMessages, and Retry/Discard
requests had no Receive at all (would have hit deadletters). On top of
that, StoreAndForwardService.StartAsync() was never called anywhere, so
the sf_messages SQLite table was never created and the retry timer never
ran — silently breaking all of S&F.
- New ParkedMessageHandlerActor bridges StoreAndForwardService.{Get,Retry,Discard}
using the Sender→Task→PipeTo pattern already used in DeploymentManagerActor.
- SiteCommunicationActor now routes ParkedMessageRetryRequest and
ParkedMessageDiscardRequest the same way as the existing Query handler.
- AkkaHostedService.RegisterSiteActors() resolves StoreAndForwardService,
calls StartAsync() to create the schema and start the timer, then
creates and registers the handler actor.
This commit is contained in:
@@ -136,6 +136,28 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
|
||||
}
|
||||
});
|
||||
|
||||
Receive<ParkedMessageRetryRequest>(msg =>
|
||||
{
|
||||
if (_parkedMessageHandler != null)
|
||||
_parkedMessageHandler.Forward(msg);
|
||||
else
|
||||
{
|
||||
Sender.Tell(new ParkedMessageRetryResponse(
|
||||
msg.CorrelationId, false, "Parked message handler not available"));
|
||||
}
|
||||
});
|
||||
|
||||
Receive<ParkedMessageDiscardRequest>(msg =>
|
||||
{
|
||||
if (_parkedMessageHandler != null)
|
||||
_parkedMessageHandler.Forward(msg);
|
||||
else
|
||||
{
|
||||
Sender.Tell(new ParkedMessageDiscardResponse(
|
||||
msg.CorrelationId, false, "Parked message handler not available"));
|
||||
}
|
||||
});
|
||||
|
||||
// Internal: send heartbeat tick
|
||||
Receive<SendHeartbeat>(_ => SendHeartbeatToCentral());
|
||||
|
||||
|
||||
@@ -316,6 +316,21 @@ akka {{
|
||||
siteCommActor.Tell(new RegisterLocalHandler(LocalHandlerType.EventLog, eventLogHandler));
|
||||
}
|
||||
|
||||
// Parked message handler — bridges Akka to StoreAndForwardService
|
||||
var storeAndForwardService = _serviceProvider.GetService<StoreAndForwardService>();
|
||||
if (storeAndForwardService != null)
|
||||
{
|
||||
// Initialize SQLite schema and start the retry timer. Must complete before
|
||||
// any actor or HTTP handler touches the service.
|
||||
storeAndForwardService.StartAsync().GetAwaiter().GetResult();
|
||||
|
||||
var parkedMessageHandler = _actorSystem.ActorOf(
|
||||
Props.Create(() => new ParkedMessageHandlerActor(
|
||||
storeAndForwardService, _nodeOptions.SiteId!)),
|
||||
"parked-message-handler");
|
||||
siteCommActor.Tell(new RegisterLocalHandler(LocalHandlerType.ParkedMessages, parkedMessageHandler));
|
||||
}
|
||||
|
||||
// Register SiteCommunicationActor with ClusterClientReceptionist so central ClusterClients can reach it
|
||||
ClusterClientReceptionist.Get(_actorSystem).RegisterService(siteCommActor);
|
||||
|
||||
|
||||
119
src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs
Normal file
119
src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs
Normal file
@@ -0,0 +1,119 @@
|
||||
using System.Text.Json;
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using ScadaLink.Commons.Messages.RemoteQuery;
|
||||
|
||||
namespace ScadaLink.StoreAndForward;
|
||||
|
||||
/// <summary>
|
||||
/// Akka actor bridge for <see cref="StoreAndForwardService"/> parked-message operations.
|
||||
/// Receives Query/Retry/Discard requests from the SiteCommunicationActor and replies
|
||||
/// with the matching response records.
|
||||
/// </summary>
|
||||
public class ParkedMessageHandlerActor : ReceiveActor
|
||||
{
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private readonly StoreAndForwardService _service;
|
||||
private readonly string _siteId;
|
||||
|
||||
public ParkedMessageHandlerActor(StoreAndForwardService service, string siteId)
|
||||
{
|
||||
_service = service;
|
||||
_siteId = siteId;
|
||||
|
||||
Receive<ParkedMessageQueryRequest>(HandleQuery);
|
||||
Receive<ParkedMessageRetryRequest>(HandleRetry);
|
||||
Receive<ParkedMessageDiscardRequest>(HandleDiscard);
|
||||
}
|
||||
|
||||
private void HandleQuery(ParkedMessageQueryRequest msg)
|
||||
{
|
||||
var sender = Sender;
|
||||
var siteId = _siteId;
|
||||
|
||||
_service.GetParkedMessagesAsync(category: null, msg.PageNumber, msg.PageSize)
|
||||
.ContinueWith(t =>
|
||||
{
|
||||
if (t.IsCompletedSuccessfully)
|
||||
{
|
||||
var entries = t.Result.Messages
|
||||
.Select(m => new ParkedMessageEntry(
|
||||
MessageId: m.Id,
|
||||
TargetSystem: m.Target,
|
||||
MethodName: ExtractMethodName(m.PayloadJson, m.Category),
|
||||
ErrorMessage: m.LastError ?? string.Empty,
|
||||
AttemptCount: m.RetryCount,
|
||||
OriginalTimestamp: m.CreatedAt,
|
||||
LastAttemptTimestamp: m.LastAttemptAt ?? m.CreatedAt))
|
||||
.ToList();
|
||||
|
||||
return new ParkedMessageQueryResponse(
|
||||
msg.CorrelationId, siteId, entries, t.Result.TotalCount,
|
||||
msg.PageNumber, msg.PageSize, true, null, DateTimeOffset.UtcNow);
|
||||
}
|
||||
|
||||
return new ParkedMessageQueryResponse(
|
||||
msg.CorrelationId, siteId, [], 0, msg.PageNumber, msg.PageSize,
|
||||
false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
|
||||
}).PipeTo(sender);
|
||||
}
|
||||
|
||||
private void HandleRetry(ParkedMessageRetryRequest msg)
|
||||
{
|
||||
var sender = Sender;
|
||||
|
||||
_service.RetryParkedMessageAsync(msg.MessageId)
|
||||
.ContinueWith(t =>
|
||||
{
|
||||
if (t.IsCompletedSuccessfully)
|
||||
{
|
||||
return new ParkedMessageRetryResponse(
|
||||
msg.CorrelationId, t.Result,
|
||||
t.Result ? null : "Message not found or no longer parked.");
|
||||
}
|
||||
|
||||
return new ParkedMessageRetryResponse(
|
||||
msg.CorrelationId, false, t.Exception?.GetBaseException().Message);
|
||||
}).PipeTo(sender);
|
||||
}
|
||||
|
||||
private void HandleDiscard(ParkedMessageDiscardRequest msg)
|
||||
{
|
||||
var sender = Sender;
|
||||
|
||||
_service.DiscardParkedMessageAsync(msg.MessageId)
|
||||
.ContinueWith(t =>
|
||||
{
|
||||
if (t.IsCompletedSuccessfully)
|
||||
{
|
||||
return new ParkedMessageDiscardResponse(
|
||||
msg.CorrelationId, t.Result,
|
||||
t.Result ? null : "Message not found or no longer parked.");
|
||||
}
|
||||
|
||||
return new ParkedMessageDiscardResponse(
|
||||
msg.CorrelationId, false, t.Exception?.GetBaseException().Message);
|
||||
}).PipeTo(sender);
|
||||
}
|
||||
|
||||
private static string ExtractMethodName(string payloadJson, Commons.Types.Enums.StoreAndForwardCategory category)
|
||||
{
|
||||
if (string.IsNullOrEmpty(payloadJson))
|
||||
return category.ToString();
|
||||
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(payloadJson);
|
||||
var root = doc.RootElement;
|
||||
if (root.TryGetProperty("MethodName", out var method) && method.ValueKind == JsonValueKind.String)
|
||||
return method.GetString() ?? category.ToString();
|
||||
if (root.TryGetProperty("Subject", out var subject) && subject.ValueKind == JsonValueKind.String)
|
||||
return subject.GetString() ?? category.ToString();
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
}
|
||||
|
||||
return category.ToString();
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Akka" Version="1.5.62" />
|
||||
<PackageReference Include="Microsoft.Data.Sqlite" Version="10.0.7" />
|
||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.7" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.7" />
|
||||
|
||||
Reference in New Issue
Block a user