using Akka.Actor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Artifacts;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Health;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.InboundApi;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Notification;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.RemoteQuery;
using ZB.MOM.WW.ScadaBridge.Communication.Actors;

namespace ZB.MOM.WW.ScadaBridge.Communication;

/// <summary>
/// Central-side service that wraps the Akka Ask pattern with per-pattern timeouts.
/// Provides a typed API for sending messages to sites and awaiting responses.
/// On connection drop, the ask times out (no central buffering per design).
/// </summary>
/// <remarks>
/// Site-bound methods wrap their payload in a <see cref="SiteEnvelope"/> and send it to the
/// central communication actor. Notification-outbox and Site Call Audit methods send the
/// request unwrapped to their own proxy reference. All three actor references are injected
/// after construction through the <c>Set*</c> methods; using a method before its reference
/// has been set results in an <see cref="InvalidOperationException"/>.
/// </remarks>
public class CommunicationService
{
    private readonly CommunicationOptions _options;
    private readonly ILogger _logger;

    // Actor references are null until the corresponding Set* method is called.
    private IActorRef? _centralCommunicationActor;
    private IActorRef? _notificationOutboxProxy;
    private IActorRef? _siteCallAuditProxy;

    /// <summary>
    /// Initializes a new instance of the CommunicationService.
    /// </summary>
    /// <param name="options">Communication service configuration options.</param>
    /// <param name="logger">Logger instance.</param>
    public CommunicationService(
        IOptions options,
        ILogger logger)
    {
        _options = options.Value;
        _logger = logger;
    }

    /// <summary>
    /// Sets the central communication actor reference. Called during actor system startup.
    /// </summary>
    /// <param name="centralCommunicationActor">The central communication actor reference.</param>
    public void SetCommunicationActor(IActorRef centralCommunicationActor)
    {
        _centralCommunicationActor = centralCommunicationActor;
    }

    /// <summary>
    /// Sets the notification-outbox singleton proxy reference. Called during actor
    /// system startup. The outbox actor is central-local, so outbox calls Ask this
    /// proxy directly (no SiteEnvelope routing).
    /// </summary>
    /// <param name="notificationOutboxProxy">The notification outbox proxy reference.</param>
    public void SetNotificationOutbox(IActorRef notificationOutboxProxy)
    {
        _notificationOutboxProxy = notificationOutboxProxy;
    }

    /// <summary>
    /// Sets the Site Call Audit (#22) singleton proxy reference. Called during
    /// actor system startup. The Site Call Audit actor is central-local, so Site
    /// Calls read calls Ask this proxy directly (no SiteEnvelope routing), the
    /// same pattern as <see cref="SetNotificationOutbox"/>.
    /// </summary>
    /// <param name="siteCallAuditProxy">The Site Call Audit proxy reference.</param>
    public void SetSiteCallAudit(IActorRef siteCallAuditProxy)
    {
        _siteCallAuditProxy = siteCallAuditProxy;
    }

    /// <summary>
    /// Triggers an immediate refresh of the site address cache from the database.
    /// </summary>
    /// <remarks>
    /// Fire-and-forget: the message is sent with Tell, so no response is awaited.
    /// </remarks>
    public void RefreshSiteAddresses()
    {
        // "RefreshSiteAddresses" in the new-expression is the message type, not this method.
        GetActor().Tell(new RefreshSiteAddresses());
    }

    /// <summary>
    /// Gets the central communication actor reference. Throws if not yet initialized.
    /// </summary>
    /// <returns>The <see cref="IActorRef"/> for the central communication actor.</returns>
    /// <exception cref="InvalidOperationException">
    /// The central communication actor has not been set.
    /// </exception>
    public IActorRef GetCommunicationActor()
    {
        return _centralCommunicationActor
            ?? throw new InvalidOperationException("CommunicationService not initialized. CentralCommunicationActor not set.");
    }

    // Short internal alias for GetCommunicationActor(), used by every site-bound method below.
    private IActorRef GetActor() => GetCommunicationActor();

    /// <summary>
    /// Gets the notification-outbox proxy reference. Throws if not yet initialized.
    /// </summary>
    /// <returns>The notification-outbox proxy reference.</returns>
    /// <exception cref="InvalidOperationException">
    /// The notification-outbox proxy has not been set.
    /// </exception>
    private IActorRef GetNotificationOutbox()
    {
        return _notificationOutboxProxy
            ?? throw new InvalidOperationException("CommunicationService not initialized. NotificationOutbox proxy not set.");
    }

    /// <summary>
    /// Gets the Site Call Audit proxy reference. Throws if not yet initialized.
    /// </summary>
    /// <returns>The Site Call Audit proxy reference.</returns>
    /// <exception cref="InvalidOperationException">
    /// The Site Call Audit proxy has not been set.
    /// </exception>
    private IActorRef GetSiteCallAudit()
    {
        return _siteCallAuditProxy
            ?? throw new InvalidOperationException("CommunicationService not initialized. SiteCallAudit proxy not set.");
    }

    // ── Pattern 1: Instance Deployment ──

    /// <summary>
    /// Sends a deployment command for an instance to a site.
    /// </summary>
    /// <remarks>
    /// The Ask is bounded by <see cref="CommunicationOptions.DeploymentTimeout"/>.
    /// </remarks>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="command">The deployment command.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The deployment status response.</returns>
    public async Task DeployInstanceAsync(
        string siteId,
        DeployInstanceCommand command,
        CancellationToken cancellationToken = default)
    {
        // The command's DeploymentId is logged under the "correlationId" label.
        _logger.LogDebug(
            "Sending DeployInstanceCommand to site {SiteId}, instance={Instance}, correlationId={DeploymentId}",
            siteId, command.InstanceUniqueName, command.DeploymentId);

        var envelope = new SiteEnvelope(siteId, command);
        return await GetActor().Ask(
            envelope, _options.DeploymentTimeout, cancellationToken);
    }

    /// <summary>
    /// DeploymentManager-006: queries a site for the currently-applied deployment
    /// identity of a single instance. Used by the Deployment Manager before a
    /// re-deploy to reconcile against the site's actual state. Sent over the
    /// existing ClusterClient command/control transport; the Ask times out (no
    /// central buffering) if the site is unreachable, and the caller falls
    /// through to a normal deploy.
    /// </summary>
    /// <remarks>
    /// The Ask is bounded by <see cref="CommunicationOptions.QueryTimeout"/>, not the
    /// deployment timeout.
    /// </remarks>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The deployment state query request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The deployment state query response.</returns>
    public async Task QueryDeploymentStateAsync(
        string siteId,
        DeploymentStateQueryRequest request,
        CancellationToken cancellationToken = default)
    {
        _logger.LogDebug(
            "Sending DeploymentStateQueryRequest to site {SiteId}, instance={Instance}, correlationId={CorrelationId}",
            siteId, request.InstanceUniqueName, request.CorrelationId);

        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.QueryTimeout, cancellationToken);
    }

    // ── Pattern 2: Lifecycle ──
    // All lifecycle Asks are bounded by CommunicationOptions.LifecycleTimeout.

    /// <summary>
    /// Sends a disable command for an instance to a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="command">The disable command.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The instance lifecycle response.</returns>
    public async Task DisableInstanceAsync(
        string siteId,
        DisableInstanceCommand command,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, command);
        return await GetActor().Ask(
            envelope, _options.LifecycleTimeout, cancellationToken);
    }

    /// <summary>
    /// Sends an enable command for an instance to a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="command">The enable command.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The instance lifecycle response.</returns>
    public async Task EnableInstanceAsync(
        string siteId,
        EnableInstanceCommand command,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, command);
        return await GetActor().Ask(
            envelope, _options.LifecycleTimeout, cancellationToken);
    }

    /// <summary>
    /// Sends a delete command for an instance to a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="command">The delete command.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The instance lifecycle response.</returns>
    public async Task DeleteInstanceAsync(
        string siteId,
        DeleteInstanceCommand command,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, command);
        return await GetActor().Ask(
            envelope, _options.LifecycleTimeout, cancellationToken);
    }

    // ── Pattern 3: Artifact Deployment ──

    /// <summary>
    /// Sends a system-wide artifact deployment command to a site.
    /// </summary>
    /// <remarks>
    /// The Ask is bounded by <see cref="CommunicationOptions.ArtifactDeploymentTimeout"/>.
    /// </remarks>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="command">The artifact deployment command.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The artifact deployment response.</returns>
    public async Task DeployArtifactsAsync(
        string siteId,
        DeployArtifactsCommand command,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, command);
        return await GetActor().Ask(
            envelope, _options.ArtifactDeploymentTimeout, cancellationToken);
    }

    // ── Pattern 4: Integration Routing ──

    /// <summary>
    /// Routes an integration call to a site.
    /// </summary>
    /// <remarks>
    /// The Ask is bounded by <see cref="CommunicationOptions.IntegrationTimeout"/>.
    /// </remarks>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The integration call request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The integration call response.</returns>
    public async Task RouteIntegrationCallAsync(
        string siteId,
        IntegrationCallRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.IntegrationTimeout, cancellationToken);
    }

    // ── Pattern 5: Debug View ──

    /// <summary>
    /// Subscribes to debug view events from a site.
    /// </summary>
    /// <remarks>
    /// The Ask is bounded by <see cref="CommunicationOptions.DebugViewTimeout"/>.
    /// </remarks>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The debug view subscription request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A snapshot of the debug view.</returns>
    public async Task SubscribeDebugViewAsync(
        string siteId,
        SubscribeDebugViewRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.DebugViewTimeout, cancellationToken);
    }

    /// <summary>
    /// Unsubscribes from debug view events for a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The debug view unsubscription request.</param>
    public void UnsubscribeDebugView(string siteId, UnsubscribeDebugViewRequest request)
    {
        // Tell (fire-and-forget) — no response expected
        GetActor().Tell(new SiteEnvelope(siteId, request));
    }

    // ── Pattern 6a: Debug Snapshot (one-shot, request/response) ──

    /// <summary>
    /// Requests a snapshot of the debug view from a site.
    /// </summary>
    /// <remarks>
    /// The Ask is bounded by <see cref="CommunicationOptions.QueryTimeout"/> rather than
    /// the debug-view subscription timeout.
    /// </remarks>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The debug snapshot request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A snapshot of the debug view.</returns>
    public async Task RequestDebugSnapshotAsync(
        string siteId,
        DebugSnapshotRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.QueryTimeout, cancellationToken);
    }

    // ── Pattern 6b: Health Reporting (site→central, Tell) ──
    // Health reports are received by central, not sent. No method needed here.

    // ── Pattern 7: Remote Queries ──
    // All remote-query Asks are bounded by CommunicationOptions.QueryTimeout.

    /// <summary>
    /// Queries event logs from a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The event log query request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The event log query response.</returns>
    public async Task QueryEventLogsAsync(
        string siteId,
        EventLogQueryRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Queries parked messages from a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The parked message query request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The parked message query response.</returns>
    public async Task QueryParkedMessagesAsync(
        string siteId,
        ParkedMessageQueryRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Retries a parked message at a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The parked message retry request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The parked message retry response.</returns>
    public async Task RetryParkedMessageAsync(
        string siteId,
        ParkedMessageRetryRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Discards a parked message at a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The parked message discard request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The parked message discard response.</returns>
    public async Task DiscardParkedMessageAsync(
        string siteId,
        ParkedMessageDiscardRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.QueryTimeout, cancellationToken);
    }

    // ── OPC UA Tag Browser (interactive design-time query) ──

    /// <summary>
    /// Asks a site to enumerate the immediate children of an OPC UA node on the
    /// live server backing the given data connection. Used by the CentralUI OPC UA
    /// Tag Browser dialog. The Ask is bounded by <see cref="CommunicationOptions.QueryTimeout"/>
    /// — interactive browse expansions are short, one-shot queries that share the
    /// same latency budget as other remote queries (event logs, parked messages).
    /// </summary>
    /// <remarks>
    /// Unlike most methods in this class this one is not <c>async</c>: the Ask task is
    /// returned as-is, so a missing actor reference throws synchronously from the call
    /// instead of faulting the returned task.
    /// </remarks>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="command">The OPC UA browse command.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The browse result (children + truncation flag + structured failure).</returns>
    public Task BrowseNodeAsync(
        string siteId,
        BrowseNodeCommand command,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, command);
        return GetActor().Ask(
            envelope, _options.QueryTimeout, cancellationToken);
    }

    // ── Test Bindings (one-shot live read of bound tags) ──

    /// <summary>
    /// Asks a site to read the current value of one or more tags on the live
    /// server backing the given data connection. Used by the CentralUI "Test
    /// Bindings" dialog on the Configure Instance page. The Ask is bounded by
    /// <see cref="CommunicationOptions.QueryTimeout"/> — same latency budget
    /// as <see cref="BrowseNodeAsync"/> (both are interactive one-shot
    /// design-time queries).
    /// </summary>
    /// <remarks>
    /// Not <c>async</c>: the Ask task is returned as-is, so a missing actor reference
    /// throws synchronously from the call instead of faulting the returned task.
    /// </remarks>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="command">The read-tag-values command (connection name + tag paths).</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The read result — per-tag outcomes plus an optional connection-level failure.</returns>
    public Task ReadTagValuesAsync(
        string siteId,
        ReadTagValuesCommand command,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, command);
        return GetActor().Ask(
            envelope, _options.QueryTimeout, cancellationToken);
    }

    // ── Pattern 8: Heartbeat (site→central, Tell) ──
    // Heartbeats are received by central, not sent. No method needed here.

    // ── Inbound API Cross-Site Routing (WP-4) ──
    // All inbound-API routing Asks are bounded by CommunicationOptions.IntegrationTimeout.

    /// <summary>
    /// Routes an inbound API call to a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The call route request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The call route response.</returns>
    public async Task RouteToCallAsync(
        string siteId,
        RouteToCallRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.IntegrationTimeout, cancellationToken);
    }

    /// <summary>
    /// Routes an inbound API get-attributes request to a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The get-attributes route request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The get-attributes route response.</returns>
    public async Task RouteToGetAttributesAsync(
        string siteId,
        RouteToGetAttributesRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.IntegrationTimeout, cancellationToken);
    }

    /// <summary>
    /// Routes an inbound API set-attributes request to a site.
    /// </summary>
    /// <param name="siteId">The target site identifier.</param>
    /// <param name="request">The set-attributes route request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The set-attributes route response.</returns>
    public async Task RouteToSetAttributesAsync(
        string siteId,
        RouteToSetAttributesRequest request,
        CancellationToken cancellationToken = default)
    {
        var envelope = new SiteEnvelope(siteId, request);
        return await GetActor().Ask(
            envelope, _options.IntegrationTimeout, cancellationToken);
    }

    // ── Notification Outbox (central-local actor — Asked directly, no SiteEnvelope) ──
    // All outbox Asks are bounded by CommunicationOptions.QueryTimeout.

    /// <summary>
    /// Queries the notification outbox.
    /// </summary>
    /// <param name="request">The notification outbox query request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The notification outbox query response.</returns>
    public async Task QueryNotificationOutboxAsync(
        NotificationOutboxQueryRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetNotificationOutbox().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Retries a notification from the outbox.
    /// </summary>
    /// <param name="request">The retry notification request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The retry notification response.</returns>
    public async Task RetryNotificationAsync(
        RetryNotificationRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetNotificationOutbox().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Discards a notification from the outbox.
    /// </summary>
    /// <param name="request">The discard notification request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The discard notification response.</returns>
    public async Task DiscardNotificationAsync(
        DiscardNotificationRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetNotificationOutbox().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Gets details about a specific notification.
    /// </summary>
    /// <param name="request">The notification detail request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The notification detail response.</returns>
    public async Task GetNotificationDetailAsync(
        NotificationDetailRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetNotificationOutbox().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Gets KPI metrics for the notification outbox.
    /// </summary>
    /// <param name="request">The notification KPI request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The notification KPI response.</returns>
    public async Task GetNotificationKpisAsync(
        NotificationKpiRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetNotificationOutbox().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Gets per-site KPI metrics for the notification outbox.
    /// </summary>
    /// <param name="request">The per-site notification KPI request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The per-site notification KPI response.</returns>
    public async Task GetPerSiteNotificationKpisAsync(
        PerSiteNotificationKpiRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetNotificationOutbox().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    // ── Site Call Audit (central-local actor — Asked directly, no SiteEnvelope) ──
    // All Site Call Audit Asks are bounded by CommunicationOptions.QueryTimeout.

    /// <summary>
    /// Queries site call audit records.
    /// </summary>
    /// <param name="request">The site call query request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The site call query response.</returns>
    public async Task QuerySiteCallsAsync(
        SiteCallQueryRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetSiteCallAudit().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Gets details about a specific site call.
    /// </summary>
    /// <param name="request">The site call detail request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The site call detail response.</returns>
    public async Task GetSiteCallDetailAsync(
        SiteCallDetailRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetSiteCallAudit().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Gets KPI metrics for site calls.
    /// </summary>
    /// <param name="request">The site call KPI request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The site call KPI response.</returns>
    public async Task GetSiteCallKpisAsync(
        SiteCallKpiRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetSiteCallAudit().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Gets per-site KPI metrics for site calls.
    /// </summary>
    /// <param name="request">The per-site site call KPI request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The per-site site call KPI response.</returns>
    public async Task GetPerSiteSiteCallKpisAsync(
        PerSiteSiteCallKpiRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetSiteCallAudit().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Task 5 (#22): relays an operator Retry of a parked cached call to its
    /// owning site. The <c>SiteCallAuditActor</c> is Asked directly (it is
    /// central-local); it in turn relays a <c>RetryParkedOperation</c> to the
    /// owning site and replies with a response carrying a
    /// distinct site-unreachable outcome. Central never mutates the central
    /// <c>SiteCalls</c> mirror row.
    /// <para>
    /// This outer Ask uses <see cref="CommunicationOptions.QueryTimeout"/>
    /// (default 30s), which must outlive the inner site relay Ask the
    /// <c>SiteCallAuditActor</c> issues with <c>SiteCallAuditOptions.RelayTimeout</c>
    /// (default 10s). The inner relay must time out first so its distinct
    /// <c>SiteUnreachable</c> outcome reaches us; were this outer Ask to expire
    /// first, that outcome would be lost to a generic Ask-timeout exception.
    /// </para>
    /// </summary>
    /// <param name="request">The retry site call request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The retry site call response.</returns>
    public async Task RetrySiteCallAsync(
        RetrySiteCallRequest request,
        CancellationToken cancellationToken = default)
    {
        // NOTE(review): nothing here enforces QueryTimeout > RelayTimeout; the ordering
        // described above depends on how the two options are configured.
        return await GetSiteCallAudit().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }

    /// <summary>
    /// Task 5 (#22): relays an operator Discard of a parked cached call to its
    /// owning site. See <see cref="RetrySiteCallAsync"/> for the routing and
    /// source-of-truth rationale.
    /// </summary>
    /// <param name="request">The discard site call request.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The discard site call response.</returns>
    public async Task DiscardSiteCallAsync(
        DiscardSiteCallRequest request,
        CancellationToken cancellationToken = default)
    {
        return await GetSiteCallAudit().Ask(
            request, _options.QueryTimeout, cancellationToken);
    }
}

/// <summary>
/// Envelope that wraps any message with a target site ID for routing.
/// Used by CentralCommunicationActor to resolve the site actor path.
/// </summary>
/// <param name="SiteId">Identifier of the site the wrapped message is addressed to.</param>
/// <param name="Message">The payload to deliver; untyped, so any message can be wrapped.</param>
public record SiteEnvelope(string SiteId, object Message);