114 lines
4.5 KiB
C#
114 lines
4.5 KiB
C#
using System.Text.Json;
|
|
using Akka.Actor;
|
|
using Akka.Event;
|
|
using ScadaLink.Commons.Messages.RemoteQuery;
|
|
|
|
namespace ScadaLink.StoreAndForward;
|
|
|
|
/// <summary>
/// Akka actor bridge for <see cref="StoreAndForwardService"/> parked-message operations.
/// Receives Query/Retry/Discard requests from the SiteCommunicationActor and replies
/// with the matching response records.
/// </summary>
/// <remarks>
/// Every handler captures <c>Sender</c> into a local before starting the asynchronous
/// service call and pipes a projected response back to it. A faulted service call is
/// reported to the requester as an unsuccessful response carrying the base exception's
/// message.
/// </remarks>
public class ParkedMessageHandlerActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly StoreAndForwardService _service;
    private readonly string _siteId;

    /// <summary>
    /// Creates the actor and registers the Query/Retry/Discard message handlers.
    /// </summary>
    /// <param name="service">Service that performs the parked-message operations.</param>
    /// <param name="siteId">Site identifier included in every query response.</param>
    public ParkedMessageHandlerActor(StoreAndForwardService service, string siteId)
    {
        _service = service;
        _siteId = siteId;

        Receive<ParkedMessageQueryRequest>(HandleQuery);
        Receive<ParkedMessageRetryRequest>(HandleRetry);
        Receive<ParkedMessageDiscardRequest>(HandleDiscard);
    }

    /// <summary>
    /// Requests one page of parked messages from the service (with a null category
    /// argument) and replies to the requester with a <see cref="ParkedMessageQueryResponse"/>.
    /// </summary>
    /// <param name="msg">Query request carrying the correlation id and paging values.</param>
    private void HandleQuery(ParkedMessageQueryRequest msg)
    {
        // Sender and _siteId are copied to locals up front so the projections below
        // never touch actor state.
        var sender = Sender;
        var siteId = _siteId;

        // StoreAndForward-007: idiomatic PipeTo with explicit success/failure
        // projections instead of ContinueWith. Both projections touch only locals
        // (captured before the await), so they are safe to run off the actor thread.
        _service.GetParkedMessagesAsync(category: null, msg.PageNumber, msg.PageSize)
            .PipeTo(
                sender,
                success: result =>
                {
                    var entries = result.Messages
                        .Select(m => new ParkedMessageEntry(
                            MessageId: m.Id,
                            TargetSystem: m.Target,
                            MethodName: ExtractMethodName(m.PayloadJson, m.Category),
                            ErrorMessage: m.LastError ?? string.Empty,
                            AttemptCount: m.RetryCount,
                            OriginalTimestamp: m.CreatedAt,
                            // No recorded attempt yet: fall back to the creation time.
                            LastAttemptTimestamp: m.LastAttemptAt ?? m.CreatedAt,
                            MaxAttempts: m.MaxRetries,
                            Category: m.Category,
                            OriginInstance: m.OriginInstanceName))
                        .ToList();

                    return new ParkedMessageQueryResponse(
                        msg.CorrelationId, siteId, entries, result.TotalCount,
                        msg.PageNumber, msg.PageSize, true, null, DateTimeOffset.UtcNow);
                },
                failure: ex => new ParkedMessageQueryResponse(
                    msg.CorrelationId, siteId, [], 0, msg.PageNumber, msg.PageSize,
                    false, ex.GetBaseException().Message, DateTimeOffset.UtcNow));
    }

    /// <summary>
    /// Asks the service to retry the parked message identified by the request and
    /// replies to the requester with a <see cref="ParkedMessageRetryResponse"/>.
    /// </summary>
    /// <param name="msg">Retry request carrying the correlation id and message id.</param>
    private void HandleRetry(ParkedMessageRetryRequest msg)
    {
        var sender = Sender;

        _service.RetryParkedMessageAsync(msg.MessageId)
            .PipeTo(
                sender,
                // A false result from the service is reported with a fixed explanation.
                success: retried => new ParkedMessageRetryResponse(
                    msg.CorrelationId, retried,
                    retried ? null : "Message not found or no longer parked."),
                failure: ex => new ParkedMessageRetryResponse(
                    msg.CorrelationId, false, ex.GetBaseException().Message));
    }

    /// <summary>
    /// Asks the service to discard the parked message identified by the request and
    /// replies to the requester with a <see cref="ParkedMessageDiscardResponse"/>.
    /// </summary>
    /// <param name="msg">Discard request carrying the correlation id and message id.</param>
    private void HandleDiscard(ParkedMessageDiscardRequest msg)
    {
        var sender = Sender;

        _service.DiscardParkedMessageAsync(msg.MessageId)
            .PipeTo(
                sender,
                // A false result from the service is reported with a fixed explanation.
                success: discarded => new ParkedMessageDiscardResponse(
                    msg.CorrelationId, discarded,
                    discarded ? null : "Message not found or no longer parked."),
                failure: ex => new ParkedMessageDiscardResponse(
                    msg.CorrelationId, false, ex.GetBaseException().Message));
    }

    /// <summary>
    /// Derives a display name for a parked message from its JSON payload.
    /// </summary>
    /// <param name="payloadJson">Serialized payload; may be null, empty, or not valid JSON.</param>
    /// <param name="category">Category whose name is used when the payload yields no name.</param>
    /// <returns>
    /// The string value of the root object's <c>MethodName</c> property, otherwise of its
    /// <c>Subject</c> property, otherwise <paramref name="category"/> as text.
    /// </returns>
    private static string ExtractMethodName(string payloadJson, Commons.Types.Enums.StoreAndForwardCategory category)
    {
        if (string.IsNullOrEmpty(payloadJson))
            return category.ToString();

        try
        {
            using var doc = JsonDocument.Parse(payloadJson);
            var root = doc.RootElement;

            // JsonElement.TryGetProperty throws InvalidOperationException when the element
            // is not a JSON object (array, string, number, null, ...). That exception would
            // escape the JsonException handler below and fail the whole page query, so
            // non-object payloads take the category fallback instead.
            if (root.ValueKind != JsonValueKind.Object)
                return category.ToString();

            if (root.TryGetProperty("MethodName", out var method) && method.ValueKind == JsonValueKind.String)
                return method.GetString() ?? category.ToString();

            if (root.TryGetProperty("Subject", out var subject) && subject.ValueKind == JsonValueKind.String)
                return subject.GetString() ?? category.ToString();
        }
        catch (JsonException)
        {
            // Best effort: a payload that is not valid JSON simply has no extractable
            // name, so fall through to the category fallback.
        }

        return category.ToString();
    }
}
|