using System.Text.Json;
using Akka.Actor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Notification;

namespace ZB.MOM.WW.ScadaBridge.StoreAndForward;

/// <summary>
/// Notification Outbox: the site Store-and-Forward delivery handler for the
/// <c>Notification</c> category.
/// </summary>
/// <remarks>
/// <para>
/// In the outbox design the site no longer sends notification email itself.
/// "Delivering" a buffered notification means forwarding it to the central cluster
/// and treating central's <see cref="NotificationSubmitAck"/> as the outcome:
/// </para>
/// <list type="bullet">
///   <item>
///     <description>
///     ack <see cref="NotificationSubmitAck.Accepted"/> → <see cref="DeliverAsync"/> returns
///     <c>true</c>; the S&amp;F engine removes the message from the buffer.
///     </description>
///   </item>
///   <item>
///     <description>
///     ack not accepted, or the Ask times out / fails → <see cref="DeliverAsync"/>
///     throws; the S&amp;F engine treats any thrown exception as transient and retries
///     the forward at the fixed interval.
///     </description>
///   </item>
/// </list>
/// <para>
/// The forward travels over the ClusterClient command/control transport: the handler
/// Asks the site communication actor, which wraps the message in a
/// <c>ClusterClient.Send("/user/central-communication", …)</c> and routes central's
/// reply straight back to this Ask.
/// </para>
/// </remarks>
public sealed class NotificationForwarder
{
    /// <summary>Maximum number of characters of a corrupt payload echoed into the discard log.</summary>
    private const int CorruptPayloadPreviewMaxLength = 200;

    private readonly IActorRef _siteCommunicationActor;
    private readonly string _sourceSiteId;
    private readonly TimeSpan _forwardTimeout;
    private readonly ILogger _logger;

    /// <summary>
    /// Initializes a new <see cref="NotificationForwarder"/> that forwards buffered
    /// notifications to the central cluster via the site communication actor.
    /// </summary>
    /// <param name="siteCommunicationActor">
    /// The site communication actor. It forwards a <see cref="NotificationSubmit"/> to
    /// central via the registered <c>ClusterClient</c> and replies with the
    /// <see cref="NotificationSubmitAck"/>.
    /// </param>
    /// <param name="sourceSiteId">This site's identifier, stamped on every submit.</param>
    /// <param name="forwardTimeout">
    /// How long to wait for central's ack before treating the forward as a transient
    /// failure. Sourced from host configuration.
    /// </param>
    /// <param name="logger">
    /// Optional logger. StoreAndForward-018: a corrupt buffered payload is logged at
    /// Warning before being discarded so an operator has a forensic trail of the row
    /// that vanished from the buffer. When <c>null</c>, logging is a no-op.
    /// </param>
    public NotificationForwarder(
        IActorRef siteCommunicationActor,
        string sourceSiteId,
        TimeSpan forwardTimeout,
        ILogger? logger = null)
    {
        _siteCommunicationActor = siteCommunicationActor;
        _sourceSiteId = sourceSiteId;
        _forwardTimeout = forwardTimeout;
        _logger = logger ?? NullLogger.Instance;
    }

    /// <summary>
    /// Store-and-Forward delivery handler entry point — matches the
    /// <c>Func&lt;StoreAndForwardMessage, Task&lt;bool&gt;&gt;</c> handler contract.
    /// Returns <c>true</c> when central accepts the notification; throws on a
    /// non-accepted ack or an Ask timeout/failure so the engine retries.
    /// </summary>
    /// <param name="message">The buffered store-and-forward message to deliver to central.</param>
    /// <returns>
    /// A task that resolves to <c>true</c> when central accepts the notification, or when
    /// the payload is corrupt and has been discarded.
    /// </returns>
    /// <exception cref="NotificationForwardException">Central replied with a non-accepted ack.</exception>
    /// <remarks>
    /// Any other exception raised by the Ask (timeout, unexpected reply type, transport
    /// failure) also propagates; the engine treats every thrown exception as transient.
    /// </remarks>
    public async Task<bool> DeliverAsync(StoreAndForwardMessage message)
    {
        // StoreAndForward-018: an unreadable payload cannot be fixed by retrying.
        // The design doc explicitly forbids parking notifications ("notifications do
        // not park — they are retried at the fixed forward interval until central
        // acks"; Component-StoreAndForward.md). The earlier behaviour returned false
        // here, which the S&F engine interprets as a permanent failure and parks
        // the row — contradicting the invariant and surfacing the row in the
        // central UI's parked-message list. The correct outcome for a corrupt-payload
        // notification is to DISCARD: log a Warning with the buffered row id +
        // payload preview for forensics, then return true so the engine clears the
        // buffer via its standard success-path cleanup. The buffered row is
        // unrecoverable; retrying or parking would both make the queue worse, not
        // better.
        if (!TryBuildSubmit(message, out var submit))
        {
            _logger.LogWarning(
                "Discarding corrupt buffered notification {NotificationId} (payload is not deserialisable as NotificationSubmit). " +
                "Payload preview: {PayloadPreview}",
                message.Id,
                PreviewPayload(message.PayloadJson));
            return true;
        }

        // The reply may legitimately be a non-accepted ack, so it is not requested as
        // a status-failing Ask: ask for the bare NotificationSubmitAck and classify it
        // here. An Ask timeout surfaces as a thrown exception (AskTimeoutException in
        // recent Akka.NET releases — confirm against the pinned version), which — like
        // any other thrown exception — the S&F engine treats as transient.
        var ack = await _siteCommunicationActor
            .Ask<NotificationSubmitAck>(submit, _forwardTimeout)
            .ConfigureAwait(false);

        if (ack.Accepted)
        {
            return true;
        }

        // A non-accepted ack is a transient failure: central could not persist the
        // notification right now. Throw so the engine keeps buffering and retries.
        throw new NotificationForwardException(
            $"Central rejected notification {submit.NotificationId}: {ack.Error ?? "no detail"}");
    }

    /// <summary>
    /// Maps a buffered S&amp;F notification message onto the <see cref="NotificationSubmit"/>
    /// forwarded to central, returning <c>false</c> if the payload is unreadable.
    /// </summary>
    /// <remarks>
    /// <para>
    /// The buffered payload IS a serialized <see cref="NotificationSubmit"/> written by
    /// the site <c>Notify.Send</c> enqueue path (Task 19). Its
    /// <see cref="NotificationSubmit.NotificationId"/> is the central idempotency key —
    /// it was generated by the script, equals the buffered row's
    /// <see cref="StoreAndForwardMessage.Id"/>, and is stable across every retry.
    /// </para>
    /// <para>
    /// The forwarder forwards the payload as-is except that it re-stamps the fields it
    /// authoritatively owns: <see cref="NotificationSubmit.SourceSiteId"/> (this site's
    /// id) and <see cref="NotificationSubmit.SourceInstanceId"/> (the buffered row's
    /// origin instance). It also falls the list name back to the S&amp;F
    /// <see cref="StoreAndForwardMessage.Target"/> when the payload list name is null,
    /// empty, or whitespace-only.
    /// </para>
    /// <para>
    /// A null/empty payload string is classified as corrupt. Previously a null payload
    /// let <see cref="ArgumentNullException"/> escape the deserializer, which the
    /// engine treats as transient — so an unrecoverable row was retried forever.
    /// </para>
    /// </remarks>
    private bool TryBuildSubmit(StoreAndForwardMessage message, out NotificationSubmit submit)
    {
        submit = null!;

        if (string.IsNullOrEmpty(message.PayloadJson))
        {
            return false;
        }

        NotificationSubmit? payload;
        try
        {
            payload = JsonSerializer.Deserialize<NotificationSubmit>(message.PayloadJson);
        }
        catch (JsonException)
        {
            return false;
        }

        // The JSON literal "null" deserializes successfully to a null reference.
        if (payload is null)
        {
            return false;
        }

        submit = payload with
        {
            // The NotificationId is the script-generated idempotency key carried in the
            // payload. Defend against a payload missing it (or carrying only whitespace)
            // by falling back to the buffered row id, which the enqueue path pins to the
            // same value.
            NotificationId = string.IsNullOrWhiteSpace(payload.NotificationId)
                ? message.Id
                : payload.NotificationId,

            // A null, empty, or whitespace-only ListName falls back to the S&F Target —
            // so a blank list name is never forwarded to central.
            ListName = string.IsNullOrWhiteSpace(payload.ListName)
                ? message.Target
                : payload.ListName,

            // SourceSiteId/SourceInstanceId are authoritatively owned by the site: the
            // forwarder knows the real site id, and the buffered row records the origin
            // instance even after the instance is deleted.
            SourceSiteId = _sourceSiteId,
            SourceInstanceId = message.OriginInstanceName,
        };
        return true;
    }

    /// <summary>
    /// Returns a length-capped preview of a corrupt buffered payload for the Warning
    /// log line emitted on discard.
    /// </summary>
    /// <remarks>
    /// The full payload may be very large and is not suitable for the structured log.
    /// The preview keeps the leading characters, which is what an operator typically
    /// uses to identify the producing script. The cut point is moved back by one
    /// character when it would otherwise split a UTF-16 surrogate pair, so the log
    /// never receives a lone surrogate.
    /// </remarks>
    /// <param name="payloadJson">The raw buffered payload; may be <c>null</c>.</param>
    /// <returns>
    /// An empty string for a null/empty payload, the payload itself when it fits, or a
    /// truncated prefix followed by "…".
    /// </returns>
    private static string PreviewPayload(string? payloadJson)
    {
        if (string.IsNullOrEmpty(payloadJson))
        {
            return "";
        }

        if (payloadJson.Length <= CorruptPayloadPreviewMaxLength)
        {
            return payloadJson;
        }

        var cut = CorruptPayloadPreviewMaxLength;
        if (char.IsHighSurrogate(payloadJson[cut - 1]))
        {
            cut--;
        }

        return payloadJson[..cut] + "…";
    }
}

/// <summary>
/// Raised by <see cref="NotificationForwarder.DeliverAsync"/> on a transient forward
/// failure — a non-accepted central ack. The Store-and-Forward engine treats any
/// thrown exception as transient and retries the forward at the fixed interval.
/// </summary>
public sealed class NotificationForwardException : Exception
{
    /// <summary>
    /// Initializes a new <see cref="NotificationForwardException"/> with the specified message.
    /// </summary>
    /// <param name="message">Message describing the forward failure.</param>
    public NotificationForwardException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Initializes a new <see cref="NotificationForwardException"/> with the specified
    /// message and the exception that caused the forward failure.
    /// </summary>
    /// <param name="message">Message describing the forward failure.</param>
    /// <param name="innerException">The underlying cause, if any.</param>
    public NotificationForwardException(string message, Exception? innerException)
        : base(message, innerException)
    {
    }
}