diff --git a/src/ScadaLink.Commons/Messages/Audit/SiteCallRelayMessages.cs b/src/ScadaLink.Commons/Messages/Audit/SiteCallRelayMessages.cs new file mode 100644 index 0000000..62039e6 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Audit/SiteCallRelayMessages.cs @@ -0,0 +1,113 @@ +namespace ScadaLink.Commons.Messages.Audit; + +/// +/// Outcome of a Site Call Audit (#22) Retry/Discard relay — distinguishes the +/// three cases the Central UI Site Calls page must surface differently. +/// +/// +/// The "site unreachable" case is deliberately separate from +/// : central is an eventually-consistent mirror, +/// not the source of truth, so a relay that never reaches the owning site is a +/// transient transport condition the operator can retry — not a failed +/// operation. The UI shows "site unreachable" rather than a generic error. +/// +public enum SiteCallRelayOutcome +{ + /// + /// The owning site received the relay command and applied the action to its + /// Store-and-Forward buffer (the parked cached call was reset to retry, or + /// discarded). The corrected state reaches central later via telemetry. + /// + Applied, + + /// + /// The owning site received the relay command but found nothing to do — no + /// parked row matched the tracked id (already delivered/discarded, or no + /// longer Parked). A definitive answer from the site, not a failure. + /// + NotParked, + + /// + /// The owning site could not be reached (offline / no ClusterClient route / + /// relay timed out). The action was NOT applied; the operator may retry once + /// the site is back online. + /// + SiteUnreachable, + + /// + /// The owning site was reached but reported it could not apply the action + /// (its parked-message handler was unavailable or its store faulted). + /// + OperationFailed, +} + +/// +/// Central UI → Site Call Audit: relay a Retry of a parked cached call to its +/// owning site. 
The owning site performs the actual retry on its +/// Store-and-Forward buffer — central never mutates the central SiteCalls +/// mirror row. Mirrors +/// +/// but carries (the relay target) and answers with a +/// distinct site-unreachable outcome. +/// +/// Request correlation id, echoed on the response. +/// +/// The cached operation to retry — the PK of the central SiteCalls row +/// and the S&F buffer message id at the owning site. +/// +/// +/// The owning site (SiteCall.SourceSite) the relay is routed to. +/// +public sealed record RetrySiteCallRequest( + string CorrelationId, + Guid TrackedOperationId, + string SourceSite); + +/// +/// Site Call Audit → Central UI: result of a . +/// +/// Echoed request correlation id. +/// +/// The relay outcome — , +/// , +/// or +/// . +/// +/// +/// Convenience flag — true only for . +/// +/// +/// false only for ; lets +/// the UI distinguish "site offline" from "operation failed" without switching +/// on the enum. +/// +/// +/// Human-readable detail for a non-applied outcome; null on success. +/// +public sealed record RetrySiteCallResponse( + string CorrelationId, + SiteCallRelayOutcome Outcome, + bool Success, + bool SiteReachable, + string? ErrorMessage); + +/// +/// Central UI → Site Call Audit: relay a Discard of a parked cached call to its +/// owning site. See for the source-of-truth +/// and routing rationale. +/// +public sealed record DiscardSiteCallRequest( + string CorrelationId, + Guid TrackedOperationId, + string SourceSite); + +/// +/// Site Call Audit → Central UI: result of a . +/// Same shape as . +/// +public sealed record DiscardSiteCallResponse( + string CorrelationId, + SiteCallRelayOutcome Outcome, + bool Success, + bool SiteReachable, + string? 
ErrorMessage); diff --git a/src/ScadaLink.Commons/Messages/RemoteQuery/ParkedOperationRelayMessages.cs b/src/ScadaLink.Commons/Messages/RemoteQuery/ParkedOperationRelayMessages.cs new file mode 100644 index 0000000..0d60789 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/RemoteQuery/ParkedOperationRelayMessages.cs @@ -0,0 +1,75 @@ +using ScadaLink.Commons.Types; + +namespace ScadaLink.Commons.Messages.RemoteQuery; + +/// +/// Central → site relay command: retry a parked cached operation +/// (ExternalSystem.CachedCall / Database.CachedWrite) on the +/// owning site's Store-and-Forward buffer. Sent over the command/control +/// channel by SiteCallAuditActor when an operator clicks Retry on a +/// Parked Site Call row in the Central UI. +/// +/// +/// +/// The site is the source of truth for cached-call status — central never +/// mutates the central SiteCalls mirror row directly. This command asks +/// the site to reset its own parked row back to Pending so the S&F +/// retry sweep attempts delivery again; the corrected state then flows back to +/// central via the normal cached-call telemetry path. +/// +/// +/// The cached call's S&F buffer message id is the +/// itself (the tracked id is supplied as the +/// buffered row's id at enqueue time), so the site can resolve the parked row +/// directly from . A retry on a row that is not +/// actually Parked is a safe no-op at the site — the ack reports +/// Applied=false rather than corrupting a non-parked row. +/// +/// +/// This is a plain record carrying only ids, so it lives in Commons (no +/// IActorRef field). It mirrors +/// but keys on rather than the opaque S&F +/// message-id string. +/// +/// +public sealed record RetryParkedOperation( + string CorrelationId, + TrackedOperationId TrackedOperationId); + +/// +/// Central → site relay command: discard a parked cached operation on the +/// owning site's Store-and-Forward buffer. 
Sent over the command/control +/// channel by SiteCallAuditActor when an operator clicks Discard on a +/// Parked Site Call row in the Central UI. See +/// for the source-of-truth and message-id +/// rationale; Discard marks the operation terminally Discarded at the +/// site by removing the parked S&F buffer row. +/// +public sealed record DiscardParkedOperation( + string CorrelationId, + TrackedOperationId TrackedOperationId); + +/// +/// Site → central ack for a / +/// relay command. The site replies this +/// after applying (or safely no-op-ing) the action against its own +/// Store-and-Forward buffer. +/// +/// Correlation id of the originating relay command. +/// +/// true when the parked operation was found and the action was applied; +/// false when no parked row matched the +/// (already delivered, discarded, never cached, or not in a Parked +/// state). A false ack is a definitive "nothing to do" answer from the +/// site — it is NOT a transport failure, so the relay must distinguish it from +/// a site-unreachable timeout. +/// +/// +/// Populated only when the site could not apply the action (e.g. the parked +/// message handler is not available, or the S&F store faulted); null +/// on a clean Applied=true/Applied=false outcome. +/// +public sealed record ParkedOperationActionAck( + string CorrelationId, + bool Applied, + string? ErrorMessage = null); diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs index 1934ac8..42d7635 100644 --- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs @@ -167,6 +167,33 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers } }); + // Task 5 (#22): central→site Retry/Discard relay for parked cached + // operations. 
SiteCallAuditActor relays these over the command/control + // channel; the parked-message handler executes them against the local + // S&F buffer and replies a ParkedOperationActionAck that routes back to + // the relaying SiteCallAuditActor's Ask. + Receive(msg => + { + if (_parkedMessageHandler != null) + _parkedMessageHandler.Forward(msg); + else + { + Sender.Tell(new ParkedOperationActionAck( + msg.CorrelationId, Applied: false, "Parked message handler not available")); + } + }); + + Receive(msg => + { + if (_parkedMessageHandler != null) + _parkedMessageHandler.Forward(msg); + else + { + Sender.Tell(new ParkedOperationActionAck( + msg.CorrelationId, Applied: false, "Parked message handler not available")); + } + }); + // Notification Outbox: forward a buffered notification submitted by the site // Store-and-Forward Engine to the central cluster. The original Sender (the // S&F forwarder's Ask) is forwarded as the ClusterClient.Send sender so the diff --git a/src/ScadaLink.Communication/CommunicationService.cs b/src/ScadaLink.Communication/CommunicationService.cs index c83901b..e7cadf9 100644 --- a/src/ScadaLink.Communication/CommunicationService.cs +++ b/src/ScadaLink.Communication/CommunicationService.cs @@ -347,6 +347,33 @@ public class CommunicationService return await GetSiteCallAudit().Ask( request, _options.QueryTimeout, cancellationToken); } + + /// + /// Task 5 (#22): relays an operator Retry of a parked cached call to its + /// owning site. The SiteCallAuditActor is Asked directly (it is + /// central-local); it in turn relays a RetryParkedOperation to the + /// owning site and replies a carrying a + /// distinct site-unreachable outcome. Central never mutates the central + /// SiteCalls mirror row. 
+ /// + public async Task RetrySiteCallAsync( + RetrySiteCallRequest request, CancellationToken cancellationToken = default) + { + return await GetSiteCallAudit().Ask( + request, _options.QueryTimeout, cancellationToken); + } + + /// + /// Task 5 (#22): relays an operator Discard of a parked cached call to its + /// owning site. See for the routing and + /// source-of-truth rationale. + /// + public async Task DiscardSiteCallAsync( + DiscardSiteCallRequest request, CancellationToken cancellationToken = default) + { + return await GetSiteCallAudit().Ask( + request, _options.QueryTimeout, cancellationToken); + } } /// diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 4708744..90dda17 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -446,7 +446,17 @@ akka {{ // the Site Call Audit actor directly (query, KPIs, detail) — mirrors the // SetNotificationOutbox wiring above. commService?.SetSiteCallAudit(siteCallAuditProxy); - _logger.LogInformation("SiteCallAuditActor singleton created"); + + // Task 5 (#22): hand the CentralCommunicationActor to the SiteCallAudit + // actor so it can relay operator Retry/Discard on parked cached calls to + // the owning site (over the per-site ClusterClient via SiteEnvelope). + // Mirrors the RegisterAuditIngest / RegisterNotificationOutbox wiring; + // the message is sent to the singleton proxy so it reaches whichever + // central node currently hosts the singleton. + siteCallAuditProxy.Tell( + new ScadaLink.SiteCallAudit.RegisterCentralCommunication(centralCommActor)); + _logger.LogInformation( + "SiteCallAuditActor singleton created and registered with CentralCommunicationActor"); _logger.LogInformation("Central actors registered. 
CentralCommunicationActor created."); } diff --git a/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj b/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj index 7603dd6..0a46f34 100644 --- a/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj +++ b/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj @@ -24,6 +24,11 @@ project reference is documented here so the actor's scope-per-message GetRequiredService() compiles. --> + + diff --git a/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs b/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs index 11af5ca..e338b05 100644 --- a/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs +++ b/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs @@ -4,8 +4,10 @@ using Microsoft.Extensions.Logging; using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Messages.RemoteQuery; using ScadaLink.Commons.Types; using ScadaLink.Commons.Types.Audit; +using ScadaLink.Communication; namespace ScadaLink.SiteCallAudit; @@ -52,6 +54,19 @@ public class SiteCallAuditActor : ReceiveActor private readonly SiteCallAuditOptions _options; private readonly ILogger _logger; + /// + /// Task 5 (#22): the central→site command transport — the + /// CentralCommunicationActor, which owns the per-site + /// ClusterClient map and routes a to the + /// owning site. Set via by the + /// Host after both actors exist (this actor is a cluster singleton; the + /// transport actor is created separately). Null until registration + /// completes — a relay arriving before then is answered with a + /// outcome, because there + /// is genuinely no route to any site yet. + /// + private IActorRef? 
_centralCommunication; + /// /// Test-mode constructor — injects a concrete repository instance whose /// lifetime exceeds the test, so the actor reuses the same instance @@ -110,6 +125,15 @@ public class SiteCallAuditActor : ReceiveActor Receive(HandleDetail); Receive(HandleKpi); Receive(HandlePerSiteKpi); + + // Task 5 (#22): central→site Retry/Discard relay for parked cached calls. + Receive(msg => + { + _centralCommunication = msg.CentralCommunication; + _logger.LogInformation("SiteCallAudit registered central→site communication transport"); + }); + Receive(HandleRetrySiteCall); + Receive(HandleDiscardSiteCall); } /// @@ -385,6 +409,175 @@ public class SiteCallAuditActor : ReceiveActor } } + // ── Task 5: central→site Retry/Discard relay ── + + /// + /// Relays an operator Retry of a parked cached call to its owning site. The + /// site is the source of truth — this handler NEVER writes the central + /// SiteCalls mirror row. It wraps a + /// in a addressed to SourceSite, Asks the + /// CentralCommunicationActor (which routes it over the per-site + /// ClusterClient), and maps the site's + /// — or an Ask timeout — onto a + /// . A timeout / no-route is reported as + /// the distinct outcome, + /// not a generic failure, so the Central UI can tell "site offline" from + /// "operation failed". + /// + private void HandleRetrySiteCall(RetrySiteCallRequest request) + { + var sender = Sender; + + if (_centralCommunication is null) + { + // No transport registered yet — there is genuinely no route to any + // site, so the only honest answer is unreachable. 
+ _logger.LogWarning( + "RetrySiteCall {TrackedOperationId} for site {SourceSite} arrived before the " + + "central→site transport was registered; reporting site unreachable", + request.TrackedOperationId, request.SourceSite); + sender.Tell(UnreachableRetry(request.CorrelationId)); + return; + } + + var relay = new RetryParkedOperation( + request.CorrelationId, new TrackedOperationId(request.TrackedOperationId)); + var envelope = new SiteEnvelope(request.SourceSite, relay); + + _centralCommunication.Ask(envelope, _options.RelayTimeout) + .PipeTo( + sender, + success: ack => MapRetryResponse(request.CorrelationId, ack), + failure: ex => MapRetryFailure(request.CorrelationId, request.SourceSite, ex)); + } + + /// + /// Relays an operator Discard of a parked cached call to its owning site. + /// Mirrors — see that method for the + /// source-of-truth and site-unreachable rationale. + /// + private void HandleDiscardSiteCall(DiscardSiteCallRequest request) + { + var sender = Sender; + + if (_centralCommunication is null) + { + _logger.LogWarning( + "DiscardSiteCall {TrackedOperationId} for site {SourceSite} arrived before the " + + "central→site transport was registered; reporting site unreachable", + request.TrackedOperationId, request.SourceSite); + sender.Tell(UnreachableDiscard(request.CorrelationId)); + return; + } + + var relay = new DiscardParkedOperation( + request.CorrelationId, new TrackedOperationId(request.TrackedOperationId)); + var envelope = new SiteEnvelope(request.SourceSite, relay); + + _centralCommunication.Ask(envelope, _options.RelayTimeout) + .PipeTo( + sender, + success: ack => MapDiscardResponse(request.CorrelationId, ack), + failure: ex => MapDiscardFailure(request.CorrelationId, request.SourceSite, ex)); + } + + /// + /// Maps the site's for a Retry onto a + /// : an applied action is + /// ; a clean no-op + /// (Applied=false, no error) is ; + /// an ack carrying an error is + /// — in every case the site WAS reached. 
+ /// + private static RetrySiteCallResponse MapRetryResponse(string correlationId, ParkedOperationActionAck ack) + { + var outcome = ClassifyAck(ack); + return new RetrySiteCallResponse( + correlationId, + outcome, + Success: outcome == SiteCallRelayOutcome.Applied, + SiteReachable: true, + ErrorMessage: AckErrorMessage(outcome, ack)); + } + + private static DiscardSiteCallResponse MapDiscardResponse(string correlationId, ParkedOperationActionAck ack) + { + var outcome = ClassifyAck(ack); + return new DiscardSiteCallResponse( + correlationId, + outcome, + Success: outcome == SiteCallRelayOutcome.Applied, + SiteReachable: true, + ErrorMessage: AckErrorMessage(outcome, ack)); + } + + private RetrySiteCallResponse MapRetryFailure(string correlationId, string sourceSite, Exception ex) + { + _logger.LogWarning(ex, + "Retry relay to site {SourceSite} did not complete; reporting site unreachable", sourceSite); + return UnreachableRetry(correlationId); + } + + private DiscardSiteCallResponse MapDiscardFailure(string correlationId, string sourceSite, Exception ex) + { + _logger.LogWarning(ex, + "Discard relay to site {SourceSite} did not complete; reporting site unreachable", sourceSite); + return UnreachableDiscard(correlationId); + } + + /// + /// Classifies a site ack: Applied=true → applied; Applied=false + /// with no error → the site definitively had nothing parked; Applied=false + /// with an error → the site could not apply the action. + /// + private static SiteCallRelayOutcome ClassifyAck(ParkedOperationActionAck ack) + { + if (ack.Applied) + { + return SiteCallRelayOutcome.Applied; + } + + return ack.ErrorMessage is null + ? SiteCallRelayOutcome.NotParked + : SiteCallRelayOutcome.OperationFailed; + } + + private static string? 
AckErrorMessage(SiteCallRelayOutcome outcome, ParkedOperationActionAck ack) + { + return outcome switch + { + SiteCallRelayOutcome.Applied => null, + SiteCallRelayOutcome.NotParked => + "The operation is no longer parked at the site (already delivered, discarded, or retrying).", + SiteCallRelayOutcome.OperationFailed => ack.ErrorMessage, + _ => ack.ErrorMessage, + }; + } + + /// Shared "site unreachable" detail text for both relay directions. + private const string SiteUnreachableMessage = + "The owning site is unreachable; the action was not applied. Retry when the site is back online."; + + private static RetrySiteCallResponse UnreachableRetry(string correlationId) + { + return new RetrySiteCallResponse( + correlationId, + SiteCallRelayOutcome.SiteUnreachable, + Success: false, + SiteReachable: false, + ErrorMessage: SiteUnreachableMessage); + } + + private static DiscardSiteCallResponse UnreachableDiscard(string correlationId) + { + return new DiscardSiteCallResponse( + correlationId, + SiteCallRelayOutcome.SiteUnreachable, + Success: false, + SiteReachable: false, + ErrorMessage: SiteUnreachableMessage); + } + /// /// Resolves an for one read message. /// In test mode the injected instance is returned with a null scope; in @@ -464,3 +657,13 @@ public class SiteCallAuditActor : ReceiveActor return string.IsNullOrWhiteSpace(value) ? null : value; } } + +/// +/// Registers the central→site command transport (the CentralCommunicationActor) +/// with the so it can relay Retry/Discard +/// actions on parked cached calls to their owning sites. Sent by the Host after +/// both actors exist. Lives here (not in Commons) because it carries an +/// and ScadaLink.Commons has no Akka reference — +/// the same rationale as RegisterAuditIngest. 
+/// +public sealed record RegisterCentralCommunication(IActorRef CentralCommunication); diff --git a/src/ScadaLink.SiteCallAudit/SiteCallAuditOptions.cs b/src/ScadaLink.SiteCallAudit/SiteCallAuditOptions.cs index 572fec6..53fe6b9 100644 --- a/src/ScadaLink.SiteCallAudit/SiteCallAuditOptions.cs +++ b/src/ScadaLink.SiteCallAudit/SiteCallAuditOptions.cs @@ -23,4 +23,15 @@ public class SiteCallAuditOptions /// NotificationOutboxOptions.DeliveredKpiWindow. /// public TimeSpan KpiInterval { get; set; } = TimeSpan.FromMinutes(1); + + /// + /// Task 5 (#22): Ask timeout for the central→site Retry/Discard relay. When + /// the owning site does not ack a RetryParkedOperation / + /// DiscardParkedOperation within this window — site offline, no + /// ClusterClient route, or central buffering deliberately absent — the relay + /// reports a SiteUnreachable outcome. Default 10 seconds: long enough + /// to absorb a healthy cross-cluster round-trip, short enough that an + /// operator clicking Retry on an offline site gets a fast, honest answer. + /// + public TimeSpan RelayTimeout { get; set; } = TimeSpan.FromSeconds(10); } diff --git a/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs b/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs index 0f922cd..8d2b8ed 100644 --- a/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs +++ b/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs @@ -24,6 +24,13 @@ public class ParkedMessageHandlerActor : ReceiveActor Receive(HandleQuery); Receive(HandleRetry); Receive(HandleDiscard); + + // Task 5 (#22): central→site Retry/Discard relay for parked cached + // operations. The cached call's S&F buffer message id is the + // TrackedOperationId, so these reuse the same parked-message primitive + // as HandleRetry/HandleDiscard, keyed off the tracked id. 
+ Receive(HandleRetryParkedOperation); + Receive(HandleDiscardParkedOperation); } private void HandleQuery(ParkedMessageQueryRequest msg) @@ -90,6 +97,46 @@ public class ParkedMessageHandlerActor : ReceiveActor msg.CorrelationId, false, ex.GetBaseException().Message)); } + /// + /// Task 5 (#22): executes a central-relayed Retry of a parked cached call. + /// The tracked id is the S&F buffer message id, so this reuses + /// — which only + /// touches rows that are actually Parked (a non-parked or unknown + /// operation yields false, a safe no-op). Central never mutates the + /// central SiteCalls mirror; the reset row's corrected state flows + /// back via the normal cached-call telemetry path. + /// + private void HandleRetryParkedOperation(RetryParkedOperation msg) + { + var sender = Sender; + + _service.RetryParkedMessageAsync(msg.TrackedOperationId.ToString()) + .PipeTo( + sender, + success: applied => new ParkedOperationActionAck( + msg.CorrelationId, applied, ErrorMessage: null), + failure: ex => new ParkedOperationActionAck( + msg.CorrelationId, Applied: false, ex.GetBaseException().Message)); + } + + /// + /// Task 5 (#22): executes a central-relayed Discard of a parked cached call. + /// Mirrors ; Discard removes the + /// parked S&F buffer row (only when it is actually Parked). 
+ /// + private void HandleDiscardParkedOperation(DiscardParkedOperation msg) + { + var sender = Sender; + + _service.DiscardParkedMessageAsync(msg.TrackedOperationId.ToString()) + .PipeTo( + sender, + success: applied => new ParkedOperationActionAck( + msg.CorrelationId, applied, ErrorMessage: null), + failure: ex => new ParkedOperationActionAck( + msg.CorrelationId, Applied: false, ex.GetBaseException().Message)); + } + private static string ExtractMethodName(string payloadJson, Commons.Types.Enums.StoreAndForwardCategory category) { if (string.IsNullOrEmpty(payloadJson)) diff --git a/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs b/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs index b1a1de8..df09395 100644 --- a/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs +++ b/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs @@ -382,6 +382,64 @@ public class CommunicationServiceTests : TestKit Assert.Equal("plant-a", result.Sites[0].SourceSite); } + [Fact] + public async Task RetrySiteCallAsync_BeforeSiteCallAuditSet_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.RetrySiteCallAsync(new RetrySiteCallRequest("corr-1", Guid.NewGuid(), "plant-a"))); + } + + [Fact] + public async Task RetrySiteCallAsync_AsksSiteCallAuditProxyDirectly() + { + // The relay is initiated by Asking the central-local Site Call Audit + // proxy directly (no SiteEnvelope wrapping at this layer — the actor + // does the site routing itself). 
+ var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetSiteCallAudit(probe.Ref); + + var request = new RetrySiteCallRequest("corr-r", Guid.NewGuid(), "plant-a"); + var task = service.RetrySiteCallAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new RetrySiteCallResponse( + "corr-r", SiteCallRelayOutcome.Applied, true, true, null); + probe.Reply(reply); + + Assert.Same(reply, await task); + } + + [Fact] + public async Task DiscardSiteCallAsync_AsksSiteCallAuditProxyDirectly() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetSiteCallAudit(probe.Ref); + + var request = new DiscardSiteCallRequest("corr-d", Guid.NewGuid(), "plant-a"); + var task = service.DiscardSiteCallAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new DiscardSiteCallResponse( + "corr-d", SiteCallRelayOutcome.SiteUnreachable, false, false, "unreachable"); + probe.Reply(reply); + + var result = await task; + Assert.Same(reply, result); + Assert.False(result.SiteReachable); + } + /// /// Stand-in for CentralCommunicationActor: verifies the message is wrapped /// in a SiteEnvelope targeting the requested site and replies with a typed diff --git a/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs b/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs index 5a39f2f..4bc880b 100644 --- a/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs +++ b/tests/ScadaLink.Communication.Tests/SiteCommunicationActorTests.cs @@ -214,4 +214,72 @@ public class SiteCommunicationActorTests : TestKit ExpectMsg(msg => !msg.Success); } + + // ── Task 5 (#22): central→site Retry/Discard relay for parked cached calls ── + + [Fact] + public void 
RetryParkedOperation_WithHandler_ForwardedToParkedMessageHandler() + { + var dmProbe = CreateTestProbe(); + var handlerProbe = CreateTestProbe(); + var siteActor = Sys.ActorOf(Props.Create(() => + new SiteCommunicationActor("site1", _options, dmProbe.Ref))); + + siteActor.Tell(new RegisterLocalHandler(LocalHandlerType.ParkedMessages, handlerProbe.Ref)); + + var id = Commons.Types.TrackedOperationId.New(); + siteActor.Tell(new RetryParkedOperation("corr-rp", id)); + + handlerProbe.ExpectMsg(msg => + msg.CorrelationId == "corr-rp" && msg.TrackedOperationId.Equals(id)); + } + + [Fact] + public void DiscardParkedOperation_WithHandler_ForwardedToParkedMessageHandler() + { + var dmProbe = CreateTestProbe(); + var handlerProbe = CreateTestProbe(); + var siteActor = Sys.ActorOf(Props.Create(() => + new SiteCommunicationActor("site1", _options, dmProbe.Ref))); + + siteActor.Tell(new RegisterLocalHandler(LocalHandlerType.ParkedMessages, handlerProbe.Ref)); + + var id = Commons.Types.TrackedOperationId.New(); + siteActor.Tell(new DiscardParkedOperation("corr-dp", id)); + + handlerProbe.ExpectMsg(msg => + msg.CorrelationId == "corr-dp" && msg.TrackedOperationId.Equals(id)); + } + + [Fact] + public void RetryParkedOperation_WithoutHandler_RepliesNotAppliedAck() + { + // No parked-message handler registered — the relay must get a definitive + // non-applied ack, not silence (the SiteCallAuditActor's Ask must not + // hang and then mis-report site-unreachable when the site IS reachable). 
+ var dmProbe = CreateTestProbe(); + var siteActor = Sys.ActorOf(Props.Create(() => + new SiteCommunicationActor("site1", _options, dmProbe.Ref))); + + siteActor.Tell(new RetryParkedOperation("corr-no-handler", Commons.Types.TrackedOperationId.New())); + + var ack = ExpectMsg(); + Assert.Equal("corr-no-handler", ack.CorrelationId); + Assert.False(ack.Applied); + Assert.NotNull(ack.ErrorMessage); + } + + [Fact] + public void DiscardParkedOperation_WithoutHandler_RepliesNotAppliedAck() + { + var dmProbe = CreateTestProbe(); + var siteActor = Sys.ActorOf(Props.Create(() => + new SiteCommunicationActor("site1", _options, dmProbe.Ref))); + + siteActor.Tell(new DiscardParkedOperation("corr-no-handler", Commons.Types.TrackedOperationId.New())); + + var ack = ExpectMsg(); + Assert.False(ack.Applied); + Assert.NotNull(ack.ErrorMessage); + } } diff --git a/tests/ScadaLink.SiteCallAudit.Tests/SiteCallRelayTests.cs b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallRelayTests.cs new file mode 100644 index 0000000..6cff69d --- /dev/null +++ b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallRelayTests.cs @@ -0,0 +1,212 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Messages.RemoteQuery; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Communication; + +namespace ScadaLink.SiteCallAudit.Tests; + +/// +/// Task 5 (#22 Retry/Discard relay): tests for +/// relaying operator Retry/Discard on a parked Site Call down to the owning +/// site. The relay routes a / +/// command via a +/// to the +/// (stood in by a TestProbe here) and awaits the site's +/// . These tests never touch the +/// SiteCalls repository — central never mutates the mirror row. 
+/// +public class SiteCallRelayTests : TestKit +{ + /// <summary> + /// A repository that fails every call — the relay path must NEVER touch the + /// SiteCalls table (central is not the source of truth), so any + /// invocation here is a test failure surfaced as an exception. + /// </summary> + private sealed class ThrowingRepository : ISiteCallAuditRepository + { + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => + throw new InvalidOperationException("relay must not write the SiteCalls row"); + + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => + throw new InvalidOperationException("relay must not read the SiteCalls row"); + + public Task> QueryAsync( + SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => + throw new InvalidOperationException("relay must not query the SiteCalls table"); + + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => + throw new InvalidOperationException("relay must not purge"); + + public Task ComputeKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + throw new InvalidOperationException("relay must not compute KPIs"); + + public Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + throw new InvalidOperationException("relay must not compute per-site KPIs"); + } + + /// <summary> + /// Builds a <see cref="SiteCallAuditActor"/> with a throwing repository and a + /// short relay timeout, and registers <paramref name="centralComm"/> as the + /// central→site transport. 
+ /// </summary> + private IActorRef CreateActor(IActorRef centralComm) + { + var options = new SiteCallAuditOptions { RelayTimeout = TimeSpan.FromMilliseconds(500) }; + var actor = Sys.ActorOf(Props.Create(() => new SiteCallAuditActor( + new ThrowingRepository(), + NullLogger.Instance, + options))); + actor.Tell(new RegisterCentralCommunication(centralComm)); + return actor; + } + + [Fact] + public void RetrySiteCall_RoutesRetryParkedOperation_ToOwningSite() + { + var central = CreateTestProbe(); + var actor = CreateActor(central.Ref); + + var id = Guid.NewGuid(); + actor.Tell(new RetrySiteCallRequest("corr-1", id, "site-north")); + + // The relay must wrap a RetryParkedOperation in a SiteEnvelope addressed + // to the owning site. + var envelope = central.ExpectMsg(); + Assert.Equal("site-north", envelope.SiteId); + var relay = Assert.IsType(envelope.Message); + Assert.Equal(id, relay.TrackedOperationId.Value); + + // The site applies it and acks; the relay reports Applied. + central.Reply(new ParkedOperationActionAck(relay.CorrelationId, Applied: true)); + + var response = ExpectMsg(); + Assert.Equal("corr-1", response.CorrelationId); + Assert.Equal(SiteCallRelayOutcome.Applied, response.Outcome); + Assert.True(response.Success); + Assert.True(response.SiteReachable); + Assert.Null(response.ErrorMessage); + } + + [Fact] + public void DiscardSiteCall_RoutesDiscardParkedOperation_ToOwningSite() + { + var central = CreateTestProbe(); + var actor = CreateActor(central.Ref); + + var id = Guid.NewGuid(); + actor.Tell(new DiscardSiteCallRequest("corr-2", id, "site-south")); + + var envelope = central.ExpectMsg(); + Assert.Equal("site-south", envelope.SiteId); + var relay = Assert.IsType(envelope.Message); + Assert.Equal(id, relay.TrackedOperationId.Value); + + central.Reply(new ParkedOperationActionAck(relay.CorrelationId, Applied: true)); + + var response = ExpectMsg(); + Assert.Equal(SiteCallRelayOutcome.Applied, response.Outcome); + Assert.True(response.Success); + } + + 
[Fact] + public void RetrySiteCall_SiteRepliesNotApplied_ReportsNotParked() + { + var central = CreateTestProbe(); + var actor = CreateActor(central.Ref); + + actor.Tell(new RetrySiteCallRequest("corr-3", Guid.NewGuid(), "site-north")); + + var envelope = central.ExpectMsg(); + var relay = (RetryParkedOperation)envelope.Message; + // The site found nothing parked — a definitive answer, not a failure. + central.Reply(new ParkedOperationActionAck(relay.CorrelationId, Applied: false)); + + var response = ExpectMsg(); + Assert.Equal(SiteCallRelayOutcome.NotParked, response.Outcome); + Assert.False(response.Success); + Assert.True(response.SiteReachable); + } + + [Fact] + public void RetrySiteCall_SiteRepliesError_ReportsOperationFailed() + { + var central = CreateTestProbe(); + var actor = CreateActor(central.Ref); + + actor.Tell(new RetrySiteCallRequest("corr-4", Guid.NewGuid(), "site-north")); + + var envelope = central.ExpectMsg(); + var relay = (RetryParkedOperation)envelope.Message; + central.Reply(new ParkedOperationActionAck( + relay.CorrelationId, Applied: false, "Parked message handler not available")); + + var response = ExpectMsg(); + Assert.Equal(SiteCallRelayOutcome.OperationFailed, response.Outcome); + Assert.False(response.Success); + // The site WAS reached — this is an operation failure, not unreachable. + Assert.True(response.SiteReachable); + Assert.NotNull(response.ErrorMessage); + } + + [Fact] + public void RetrySiteCall_SiteNeverReplies_ReportsSiteUnreachable() + { + // A central comm probe that silently drops the relay — models an offline + // site / no ClusterClient route: the Ask times out. + var central = CreateTestProbe(); + var actor = CreateActor(central.Ref); + + actor.Tell(new RetrySiteCallRequest("corr-5", Guid.NewGuid(), "site-offline")); + + central.ExpectMsg(); + // Probe does not reply — the relay Ask times out (RelayTimeout = 500ms). 
+ + var response = ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.Equal(SiteCallRelayOutcome.SiteUnreachable, response.Outcome); + Assert.False(response.Success); + // The distinct unreachable signal the UI relies on. + Assert.False(response.SiteReachable); + Assert.NotNull(response.ErrorMessage); + } + + [Fact] + public void DiscardSiteCall_SiteNeverReplies_ReportsSiteUnreachable() + { + var central = CreateTestProbe(); + var actor = CreateActor(central.Ref); + + actor.Tell(new DiscardSiteCallRequest("corr-6", Guid.NewGuid(), "site-offline")); + + central.ExpectMsg(); + + var response = ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.Equal(SiteCallRelayOutcome.SiteUnreachable, response.Outcome); + Assert.False(response.SiteReachable); + } + + [Fact] + public void RetrySiteCall_BeforeCentralCommunicationRegistered_ReportsSiteUnreachable() + { + // No RegisterCentralCommunication — the actor has no transport to reach + // any site, so the only honest answer is "unreachable". + var options = new SiteCallAuditOptions { RelayTimeout = TimeSpan.FromMilliseconds(500) }; + var actor = Sys.ActorOf(Props.Create(() => new SiteCallAuditActor( + new ThrowingRepository(), + NullLogger.Instance, + options))); + + actor.Tell(new RetrySiteCallRequest("corr-7", Guid.NewGuid(), "site-north")); + + var response = ExpectMsg(); + Assert.Equal(SiteCallRelayOutcome.SiteUnreachable, response.Outcome); + Assert.False(response.SiteReachable); + } +} diff --git a/tests/ScadaLink.StoreAndForward.Tests/ParkedOperationRelayTests.cs b/tests/ScadaLink.StoreAndForward.Tests/ParkedOperationRelayTests.cs new file mode 100644 index 0000000..7d4ef6e --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/ParkedOperationRelayTests.cs @@ -0,0 +1,168 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Messages.RemoteQuery; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; + +namespace 
ScadaLink.StoreAndForward.Tests; + +/// <summary> +/// Task 5 (#22 Retry/Discard relay): tests the site-side execution of a +/// central→site <see cref="RetryParkedOperation"/> / +/// <see cref="DiscardParkedOperation"/> relay command on the <see cref="ParkedMessageHandlerActor"/>. The cached +/// call's S&amp;F buffer message id is the <see cref="TrackedOperationId"/>, so +/// the handler resolves the parked row directly from the tracked id and reuses +/// the existing parked-message Retry/Discard primitive. A non-parked operation +/// must be a safe no-op (Applied=false), never a corruption. +/// </summary> +public class ParkedOperationRelayTests : TestKit, IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly StoreAndForwardService _service; + + public ParkedOperationRelayTests() + { + var connStr = $"Data Source=RelayTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared"; + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + + var options = new StoreAndForwardOptions + { + DefaultRetryInterval = TimeSpan.Zero, + DefaultMaxRetries = 1, + RetryTimerInterval = TimeSpan.FromMinutes(10), + ReplicationEnabled = false, + }; + + _service = new StoreAndForwardService( + _storage, options, NullLogger.Instance); + } + + public async Task InitializeAsync() => await _storage.InitializeAsync(); + + public Task DisposeAsync() => Task.CompletedTask; + + protected override void Dispose(bool disposing) + { + if (disposing) _keepAlive.Dispose(); + base.Dispose(disposing); + } + + /// <summary> + /// Enqueues a cached-call message whose S&amp;F id is the supplied + /// <paramref name="id"/> and parks it via the retry sweep. 
+ /// </summary> + private async Task ParkCachedCallAsync(TrackedOperationId id) + { + _service.RegisterDeliveryHandler( + StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("always fails")); + await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "ERP.GetOrder", """{}""", + maxRetries: 1, messageId: id.ToString()); + await _service.RetryPendingMessagesAsync(); + } + + [Fact] + public async Task RetryParkedOperation_ParkedCachedCall_ResetsToPendingAndApplied() + { + var id = TrackedOperationId.New(); + await ParkCachedCallAsync(id); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new RetryParkedOperation("corr-1", id)); + + var ack = ExpectMsg(); + Assert.True(ack.Applied); + Assert.Equal("corr-1", ack.CorrelationId); + Assert.Null(ack.ErrorMessage); + + // The parked row was reset back to Pending so the retry sweep picks it up. + var msg = await _storage.GetMessageByIdAsync(id.ToString()); + Assert.NotNull(msg); + Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status); + } + + [Fact] + public async Task DiscardParkedOperation_ParkedCachedCall_RemovesRowAndApplied() + { + var id = TrackedOperationId.New(); + await ParkCachedCallAsync(id); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new DiscardParkedOperation("corr-2", id)); + + var ack = ExpectMsg(); + Assert.True(ack.Applied); + Assert.Equal("corr-2", ack.CorrelationId); + + var msg = await _storage.GetMessageByIdAsync(id.ToString()); + Assert.Null(msg); + } + + [Fact] + public void RetryParkedOperation_UnknownOperation_IsSafeNoOp() + { + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new RetryParkedOperation("corr-3", TrackedOperationId.New())); + + var ack = ExpectMsg(); + // No parked row matched — definitive "nothing to do", not an error. 
+ Assert.False(ack.Applied); + Assert.Equal("corr-3", ack.CorrelationId); + Assert.Null(ack.ErrorMessage); + } + + [Fact] + public async Task RetryParkedOperation_NonParkedOperation_IsSafeNoOpAndDoesNotCorrupt() + { + // Enqueue a cached call but DO NOT park it — it stays Pending. + var id = TrackedOperationId.New(); + _service.RegisterDeliveryHandler( + StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("fails")); + await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "ERP.GetOrder", """{}""", + maxRetries: 5, messageId: id.ToString()); + + var before = await _storage.GetMessageByIdAsync(id.ToString()); + Assert.Equal(StoreAndForwardMessageStatus.Pending, before!.Status); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new RetryParkedOperation("corr-4", id)); + + var ack = ExpectMsg(); + // The row is Pending, not Parked — Retry must be a no-op, not a mutation. + Assert.False(ack.Applied); + + var after = await _storage.GetMessageByIdAsync(id.ToString()); + Assert.NotNull(after); + Assert.Equal(StoreAndForwardMessageStatus.Pending, after!.Status); + // retry_count untouched — a Parked-only Retry must not reset a live row. + Assert.Equal(before.RetryCount, after.RetryCount); + } + + [Fact] + public async Task DiscardParkedOperation_NonParkedOperation_IsSafeNoOp() + { + var id = TrackedOperationId.New(); + _service.RegisterDeliveryHandler( + StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("fails")); + await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "ERP.GetOrder", """{}""", + maxRetries: 5, messageId: id.ToString()); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new DiscardParkedOperation("corr-5", id)); + + var ack = ExpectMsg(); + Assert.False(ack.Applied); + + // The Pending row must NOT have been deleted by a Parked-only Discard. 
+ var after = await _storage.GetMessageByIdAsync(id.ToString()); + Assert.NotNull(after); + } +}