using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
using ZB.MOM.WW.ScadaBridge.Communication;
using ZB.MOM.WW.ScadaBridge.Communication.Actors;
using ZB.MOM.WW.ScadaBridge.Host.Actors;
using ZB.MOM.WW.ScadaBridge.SiteRuntime;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Streaming;
using ZB.MOM.WW.ScadaBridge.StoreAndForward;

namespace ZB.MOM.WW.ScadaBridge.Host.Actors;

/// <summary>
/// Hosted service that manages the Akka.NET actor system lifecycle.
/// Creates the actor system on start, registers the dead-letter monitor plus the
/// role-specific (Central or Site) actors, and runs CoordinatedShutdown on stop.
/// </summary>
/// <remarks>
/// WP-3: the transport heartbeat is explicitly configured in HOCON from
/// <see cref="CommunicationOptions"/> rather than left at framework defaults.
/// </remarks>
public class AkkaHostedService : IHostedService
{
private readonly IServiceProvider _serviceProvider;
private readonly NodeOptions _nodeOptions;
private readonly ClusterOptions _clusterOptions;
private readonly CommunicationOptions _communicationOptions;
private readonly ILogger _logger;

// Assigned inside StartAsync once the HOCON has been parsed; null before that.
private ActorSystem? _actorSystem;

/// <summary>
/// Auxiliary <see cref="IDisposable"/>s (e.g. the SiteAuditTelemetryStalledTracker)
/// that this hosted service constructs at start time and must tear down on shutdown.
/// They do not fit the <see cref="Akka.Actor.ActorSystem"/> lifecycle but share its
/// process scope. Every access locks on the list itself; <see cref="StopAsync"/>
/// snapshots and clears it under that lock before disposing the entries.
/// </summary>
private readonly List _trackedDisposables = new();

/// <summary>
/// NotificationService-020 guard: flips to <c>true</c> the first time a
/// Notification-category store-and-forward delivery handler is registered on
/// this hosted service instance.
/// </summary>
/// <remarks>
/// <c>RegisterDeliveryHandler</c> is last-write-wins per category, so a future code
/// change that introduces a second registration path (e.g. a role branch plus a
/// helper that both call the registration) would silently overwrite the canonical
/// NotificationForwarder handler with whichever registration runs last. The prior
/// NS-001 fix did exactly this, and was silently superseded when the central-only
/// redesign moved delivery to the NotificationOutbox. Site registration checks this
/// flag and throws <see cref="InvalidOperationException"/> on a duplicate, so a
/// maintainer re-introducing the second path sees it immediately at startup.
/// </remarks>
private bool _notificationDeliveryHandlerRegistered;

/// <summary>
/// Initializes a new instance of the <see cref="AkkaHostedService"/> class.
/// </summary>
/// <param name="serviceProvider">The service provider used to resolve actor dependencies at start time.</param>
/// <param name="nodeOptions">The node configuration options (role, hostname, remoting port, site id).</param>
/// <param name="clusterOptions">The cluster configuration options (seed nodes, split-brain resolver, failure detector).</param>
/// <param name="communicationOptions">The communication configuration options (transport heartbeat, central contact points).</param>
/// <param name="logger">The logger instance.</param>
public AkkaHostedService(
    IServiceProvider serviceProvider,
    IOptions nodeOptions,
    IOptions clusterOptions,
    IOptions communicationOptions,
    ILogger logger)
{
    // Option values are read once, here, via .Value and cached in fields.
    _serviceProvider = serviceProvider;
    _nodeOptions = nodeOptions.Value;
    _clusterOptions = clusterOptions.Value;
    _communicationOptions = communicationOptions.Value;
    _logger = logger;
}

/// <summary>
/// Gets the actor system. It is <c>null</c> until <see cref="StartAsync"/> has created
/// it (which happens before the role-specific actors are registered). It is not
/// cleared by <see cref="StopAsync"/>.
/// </summary>
public ActorSystem? ActorSystem => _actorSystem;

/// <summary>
/// Starts the Akka.NET actor system and registers actors: the dead-letter monitor on
/// every node, then the Central or Site actor set depending on the configured role.
/// </summary>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation;
/// it is forwarded to site actor registration only.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
    // For site nodes, include a site-specific role (e.g., "site-SiteA") alongside the base role.
    var roles = BuildRoles();

    // WP-3: transport heartbeat explicitly configured from CommunicationOptions (not framework
    // defaults). These second-valued copies are only used for the startup log line below.
    var transportHeartbeatSec = _communicationOptions.TransportHeartbeatInterval.TotalSeconds;
    var transportFailureSec = _communicationOptions.TransportFailureThreshold.TotalSeconds;

    // Host-006: HOCON is assembled in a dedicated builder that quotes/escapes every
    // interpolated value, so a hostname, seed node or strategy containing a quote,
    // backslash or whitespace cannot corrupt the configuration document.
    var hocon = BuildHocon(
        _nodeOptions,
        _clusterOptions,
        roles,
        _communicationOptions.TransportHeartbeatInterval,
        _communicationOptions.TransportFailureThreshold);
    var config = ConfigurationFactory.ParseString(hocon);
    _actorSystem = ActorSystem.Create("scadabridge", config);

    _logger.LogInformation(
        "Akka.NET actor system 'scadabridge' started. Role={Role}, Roles={Roles}, Hostname={Hostname}, Port={Port}, " +
        "TransportHeartbeat={TransportHeartbeat}s, TransportFailure={TransportFailure}s",
        _nodeOptions.Role,
        string.Join(", ", roles),
        _nodeOptions.NodeHostname,
        _nodeOptions.RemotingPort,
        transportHeartbeatSec,
        transportFailureSec);

    // Register the dead letter monitor actor (runs on every node regardless of role).
    var loggerFactory = _serviceProvider.GetRequiredService();
    var dlmLogger = loggerFactory.CreateLogger();
    var dlmHealthCollector = _serviceProvider.GetService();
    _actorSystem.ActorOf(
        Props.Create(() => new DeadLetterMonitorActor(dlmLogger, dlmHealthCollector)),
        "dead-letter-monitor");

    // Register role-specific actors.
    if (_nodeOptions.Role.Equals("Central", StringComparison.OrdinalIgnoreCase))
    {
        RegisterCentralActors();
    }
    else if (_nodeOptions.Role.Equals("Site", StringComparison.OrdinalIgnoreCase))
    {
        await RegisterSiteActorsAsync(cancellationToken);
    }
}

/// <summary>
/// Builds the Akka HOCON configuration document.
/// </summary>
/// <remarks>
/// Host-006: every interpolated string value is routed through <see cref="QuoteHocon"/>,
/// so a hostname, seed-node URI, role or split-brain strategy containing a quote,
/// backslash, whitespace or other control character cannot corrupt the document or be
/// silently misparsed.
/// <para>
/// Host-012: the keep-oldest <c>down-if-alone</c> flag is emitted from
/// <see cref="ClusterOptions.DownIfAlone"/> rather than hard-coded, so the bound
/// configuration value is actually consumed.
/// </para>
/// <para>
/// Host-013: every duration is rendered via <see cref="DurationHocon"/> in
/// milliseconds, so sub-second cluster timing values (e.g. a 750ms heartbeat) are
/// preserved exactly instead of being rounded to whole seconds.
/// </para>
/// <para>
/// Numeric values are formatted with the invariant culture, so the document does not
/// depend on the host's regional settings.
/// </para>
/// </remarks>
/// <param name="nodeOptions">The node configuration options.</param>
/// <param name="clusterOptions">The cluster configuration options.</param>
/// <param name="roles">The list of node roles to configure.</param>
/// <param name="transportHeartbeat">The transport heartbeat interval.</param>
/// <param name="transportFailure">The transport failure threshold.</param>
/// <returns>The Akka HOCON configuration string.</returns>
public static string BuildHocon(
    NodeOptions nodeOptions,
    ClusterOptions clusterOptions,
    IEnumerable roles,
    TimeSpan transportHeartbeat,
    TimeSpan transportFailure)
{
    var seedNodesStr = string.Join(",", clusterOptions.SeedNodes.Select(QuoteHocon));
    var rolesStr = string.Join(",", roles.Select(QuoteHocon));

    // FormattableString.Invariant: the numeric holes (port, min-nr-of-members) are
    // machine-read configuration and must not pick up culture-specific formatting.
    return FormattableString.Invariant($@"
audit-telemetry-dispatcher {{
    type = ForkJoinDispatcher
    throughput = 100
    dedicated-thread-pool {{
        thread-count = 2
    }}
}}
akka {{
    extensions = [
        ""Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider, Akka.Cluster.Tools""
    ]
    actor {{
        provider = cluster
    }}
    remote {{
        dot-netty.tcp {{
            hostname = {QuoteHocon(nodeOptions.NodeHostname)}
            port = {nodeOptions.RemotingPort}
        }}
        transport-failure-detector {{
            heartbeat-interval = {DurationHocon(transportHeartbeat)}
            acceptable-heartbeat-pause = {DurationHocon(transportFailure)}
        }}
    }}
    cluster {{
        seed-nodes = [{seedNodesStr}]
        roles = [{rolesStr}]
        min-nr-of-members = {clusterOptions.MinNrOfMembers}
        split-brain-resolver {{
            active-strategy = {QuoteHocon(clusterOptions.SplitBrainResolverStrategy)}
            stable-after = {DurationHocon(clusterOptions.StableAfter)}
            keep-oldest {{
                down-if-alone = {(clusterOptions.DownIfAlone ? "on" : "off")}
            }}
        }}
        failure-detector {{
            heartbeat-interval = {DurationHocon(clusterOptions.HeartbeatInterval)}
            acceptable-heartbeat-pause = {DurationHocon(clusterOptions.FailureDetectionThreshold)}
        }}
        run-coordinated-shutdown-when-down = on
    }}
    coordinated-shutdown {{
        run-by-clr-shutdown-hook = on
    }}
}}");
}

/// <summary>
/// Renders a <see cref="TimeSpan"/> as a HOCON duration in whole milliseconds (Host-013).
/// </summary>
/// <remarks>
/// Akka's HOCON parser accepts an <c>ms</c> suffix, so emitting whole milliseconds
/// preserves sub-second configuration exactly. A 750ms heartbeat stays 750ms rather than
/// being rounded to 1s, or, for sub-half-second values, silently collapsing to a
/// degenerate 0s. Fractional milliseconds are rounded with <see cref="Math.Round(double)"/>
/// (round-half-to-even). The number is formatted with the invariant culture.
/// </remarks>
private static string DurationHocon(TimeSpan duration)
{
    var wholeMilliseconds = (long)Math.Round(duration.TotalMilliseconds);
    return FormattableString.Invariant($"{wholeMilliseconds}ms");
}

/// <summary>
/// Renders a value as a HOCON double-quoted string.
/// </summary>
/// <remarks>
/// HOCON quoted strings follow JSON string rules: a double quote, a backslash, and every
/// control character U+0000..U+001F must be escaped. Backslash and double quote are
/// escaped as before. Newline, carriage return, tab, backspace and form feed use their
/// short escapes. Any other C0 control character is emitted as <c>\uXXXX</c>. The
/// resulting token therefore cannot break out of its string literal or span a line.
/// A <c>null</c> value is rendered as the empty string <c>""</c>.
/// </remarks>
private static string QuoteHocon(string? value)
{
    var text = value ?? string.Empty;
    var builder = new System.Text.StringBuilder(text.Length + 2);
    builder.Append('"');

    foreach (var ch in text)
    {
        switch (ch)
        {
            case '\\':
                builder.Append("\\\\");
                break;
            case '"':
                builder.Append("\\\"");
                break;
            case '\n':
                builder.Append("\\n");
                break;
            case '\r':
                builder.Append("\\r");
                break;
            case '\t':
                builder.Append("\\t");
                break;
            case '\b':
                builder.Append("\\b");
                break;
            case '\f':
                builder.Append("\\f");
                break;
            default:
                if (ch < ' ')
                {
                    // Remaining C0 control characters have no short escape form.
                    builder.Append("\\u")
                        .Append(((int)ch).ToString("x4", System.Globalization.CultureInfo.InvariantCulture));
                }
                else
                {
                    builder.Append(ch);
                }
                break;
        }
    }

    builder.Append('"');
    return builder.ToString();
}

/// <summary>
/// Stops the Akka.NET actor system and cleans up resources.
/// </summary>
/// <remarks>
/// Auxiliary subscribers are disposed first, then CoordinatedShutdown is run. The
/// NotificationService-020 sentinel is reset so a later <see cref="StartAsync"/> on the
/// same instance can register the Notification delivery handler against its new actor
/// graph instead of tripping the duplicate-registration guard.
/// </remarks>
/// <param name="cancellationToken">Not observed; CoordinatedShutdown is awaited to completion.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
    // Dispose auxiliary subscribers (e.g. SiteAuditTelemetryStalledTracker)
    // BEFORE Akka shuts down so their EventStream unsubscribe calls run
    // while the system is still alive. Per-tracker Dispose is wrapped in
    // its own try so a misbehaving subscriber can't sink the shutdown.
    // Snapshot the list inside a lock so a concurrent StartAsync (the
    // test harness sometimes triggers a second start/stop interleaving)
    // can't race the enumeration. Clearing the original list under the
    // same lock leaves the next StartAsync with a clean slate.
    IDisposable[] disposables;
    lock (_trackedDisposables)
    {
        disposables = _trackedDisposables.ToArray();
        _trackedDisposables.Clear();
    }

    foreach (var disposable in disposables)
    {
        try
        {
            disposable.Dispose();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Auxiliary subscriber {Type} threw during shutdown", disposable.GetType().Name);
        }
    }

    // NotificationService-020: the sentinel guards against two registrations within ONE
    // start. Part of the same clean slate as the tracked-disposables list above: without
    // this reset a restart of a site node would throw instead of re-registering the
    // NotificationForwarder bound to the new SiteCommunicationActor.
    _notificationDeliveryHandlerRegistered = false;

    if (_actorSystem != null)
    {
        _logger.LogInformation("Shutting down Akka.NET actor system via CoordinatedShutdown...");
        var shutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem);
        await shutdown.Run(Akka.Actor.CoordinatedShutdown.ClrExitReason.Instance);
        _logger.LogInformation("Akka.NET actor system shutdown complete.");
    }
}

/// <summary>
/// Builds the list of cluster roles for this node. Site nodes get both "Site"
/// and a site-specific role (e.g., "site-SiteA") to scope singleton placement.
/// The site-specific role is only added when a non-empty SiteId is configured.
/// </summary>
private List BuildRoles()
{
    var roles = new List { _nodeOptions.Role };
    if (_nodeOptions.Role.Equals("Site", StringComparison.OrdinalIgnoreCase)
        && !string.IsNullOrEmpty(_nodeOptions.SiteId))
    {
        roles.Add($"site-{_nodeOptions.SiteId}");
    }

    return roles;
}

/// <summary>
/// Registers central-side actors including the CentralCommunicationActor.
/// WP-4: Central communication actor routes all 8 message patterns to sites.
/// </summary>
private void RegisterCentralActors()
{
    // Feed this central node's hostname into the local health collector so
    // the CentralHealthReportLoop's report identifies the active node.
var centralHealthCollector = _serviceProvider.GetService();
centralHealthCollector?.SetNodeHostname(_nodeOptions.NodeHostname);

// Factory handed to CentralCommunicationActor; presumably used to build the
// per-site ClusterClients it routes through — NOTE(review): confirm against
// DefaultSiteClientFactory.
var siteClientFactory = new DefaultSiteClientFactory();
var centralCommActor = _actorSystem!.ActorOf(
    Props.Create(() => new CentralCommunicationActor(_serviceProvider, siteClientFactory)),
    "central-communication");

// Register CentralCommunicationActor with ClusterClientReceptionist so site ClusterClients can reach it.
ClusterClientReceptionist.Get(_actorSystem).RegisterService(centralCommActor);
_logger.LogInformation("CentralCommunicationActor registered with ClusterClientReceptionist");

// Wire up the CommunicationService with the actor reference. Resolved with
// GetService, so the wiring is skipped when the service is not registered.
var commService = _serviceProvider.GetService();
commService?.SetCommunicationActor(centralCommActor);

// Wire up the DebugStreamService with the ActorSystem (optional, as above).
var debugStreamService = _serviceProvider.GetService();
debugStreamService?.SetActorSystem(_actorSystem!);

// Management Service — accessible via ClusterClient.
var mgmtLogger = _serviceProvider.GetRequiredService()
    .CreateLogger();
var mgmtActor = _actorSystem!.ActorOf(
    Props.Create(() => new ZB.MOM.WW.ScadaBridge.ManagementService.ManagementActor(_serviceProvider, mgmtLogger)),
    "management");
ClusterClientReceptionist.Get(_actorSystem).RegisterService(mgmtActor);
// Publish the actor ref through the DI-registered holder (presumably so
// non-actor callers can reach the management actor — verify usages).
var mgmtHolder = _serviceProvider.GetRequiredService();
mgmtHolder.ActorRef = mgmtActor;
_logger.LogInformation("ManagementActor registered with ClusterClientReceptionist");

// Notification Outbox — cluster singleton so exactly one node owns ingest,
// the dispatch sweep and the purge loop. Central actors run on the base
// "Central" role, so the singleton settings are NOT role-scoped (unlike the
// site singletons, which are scoped to a per-site role).
var outboxOptions = _serviceProvider
    .GetRequiredService>().Value;
var outboxLogger = _serviceProvider.GetRequiredService()
    .CreateLogger();

// M4 Bundle B: central direct-write audit writer for dispatcher attempt
// + terminal events. Resolved once from the root provider — the writer
// is a singleton and stateless, opening per-call DI scopes internally
// to resolve the scoped IAuditLogRepository.
var outboxAuditWriter = _serviceProvider
    .GetRequiredService();

var outboxSingletonProps = ClusterSingletonManager.Props(
    singletonProps: Props.Create(() => new ZB.MOM.WW.ScadaBridge.NotificationOutbox.NotificationOutboxActor(
        _serviceProvider, outboxOptions, outboxAuditWriter, outboxLogger)),
    terminationMessage: PoisonPill.Instance,
    settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
        .WithSingletonName("notification-outbox"));
_actorSystem!.ActorOf(outboxSingletonProps, "notification-outbox-singleton");

// The proxy's singletonManagerPath must match the manager actor name used just above.
var outboxProxyProps = ClusterSingletonProxy.Props(
    singletonManagerPath: "/user/notification-outbox-singleton",
    settings: ClusterSingletonProxySettings.Create(_actorSystem)
        .WithSingletonName("notification-outbox"));
var outboxProxy = _actorSystem.ActorOf(outboxProxyProps, "notification-outbox-proxy");

// Hand the outbox proxy to the CentralCommunicationActor so forwarded
// NotificationSubmit messages from sites are routed to the outbox singleton.
centralCommActor.Tell(new RegisterNotificationOutbox(outboxProxy));

// Hand the same proxy to the CommunicationService so the Central UI can
// Ask the outbox actor directly (query, retry, discard, KPIs).
commService?.SetNotificationOutbox(outboxProxy);
_logger.LogInformation("NotificationOutbox singleton created and registered with CentralCommunicationActor");

// Audit Log (#23) — central singleton mirrors the Notification Outbox
// pattern. The IngestAuditEvents gRPC handler lives on SiteStreamGrpcServer
// (Communication.Grpc); a central node hosting that server (M6 reconciliation
// path) hands the proxy in via SetAuditIngestActor below. When the gRPC
// server is not registered (current central topology), the host still
// brings the singleton up so a Bundle H in-process test (or a future
// direct caller) can Ask the proxy without further wiring.
// IAuditLogRepository is a SCOPED EF Core service, so the singleton
// actor takes the root IServiceProvider and creates a fresh scope per
// message (mirroring NotificationOutboxActor). Pre-resolving the
// repository here would attempt to take a scoped service from the
// root and fail under DI scope validation.
var auditIngestLogger = _serviceProvider.GetRequiredService()
    .CreateLogger();
var auditIngestSingletonProps = ClusterSingletonManager.Props(
    singletonProps: Props.Create(() => new ZB.MOM.WW.ScadaBridge.AuditLog.Central.AuditLogIngestActor(
        _serviceProvider, auditIngestLogger)),
    terminationMessage: PoisonPill.Instance,
    settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
        .WithSingletonName("audit-log-ingest"));
_actorSystem!.ActorOf(auditIngestSingletonProps, "audit-log-ingest-singleton");

var auditIngestProxyProps = ClusterSingletonProxy.Props(
    singletonManagerPath: "/user/audit-log-ingest-singleton",
    settings: ClusterSingletonProxySettings.Create(_actorSystem)
        .WithSingletonName("audit-log-ingest"));
var auditIngestProxy = _actorSystem.ActorOf(auditIngestProxyProps, "audit-log-ingest-proxy");

// Hand the audit-ingest proxy to the CentralCommunicationActor so audit
// ingest commands forwarded by sites over ClusterClient are routed to the
// singleton. Mirrors the RegisterNotificationOutbox wiring above.
centralCommActor.Tell(new RegisterAuditIngest(auditIngestProxy));

// Hand the proxy to the SiteStreamGrpcServer (if registered on this node)
// so the IngestAuditEvents RPC routes incoming site batches to the singleton.
// The gRPC server is currently only registered on Site nodes; on a central
// node this resolves to null and the wiring is a no-op until M6 (which
// brings central-hosted gRPC + a real site→central client).
var grpcServer = _serviceProvider.GetService();
grpcServer?.SetAuditIngestActor(auditIngestProxy);
_logger.LogInformation(
    "AuditLogIngestActor singleton created (gRPC server bound: {GrpcBound})",
    grpcServer is not null);

// Audit Log (#23) M6 Bundle E (T7): subscribe the per-site stalled
// telemetry tracker to the actor system EventStream NOW that the
// system exists. The tracker mirrors every
// SiteAuditTelemetryStalledChanged publication (from
// SiteAuditReconciliationActor — wired in a later bundle) into the
// AuditCentralHealthSnapshot singleton so the central health surface
// sees per-site stalled state. The tracker is constructed here rather
// than in AddAuditLogCentralMaintenance because its ctor needs an
// ActorSystem, which is not a DI-resolvable singleton — it's owned
// by this hosted service. The snapshot singleton is resolvable;
// passing it in seeds the tracker's Apply() so both internal state
// and the snapshot stay in lock-step.
var auditCentralSnapshot = _serviceProvider
    .GetService();
if (auditCentralSnapshot is not null)
{
    var stalledTracker = new ZB.MOM.WW.ScadaBridge.AuditLog.Central.SiteAuditTelemetryStalledTracker(
        _actorSystem!, auditCentralSnapshot);
    // Same lock StopAsync takes when it snapshots and disposes the tracked list.
    lock (_trackedDisposables)
    {
        _trackedDisposables.Add(stalledTracker);
    }
    _logger.LogInformation("SiteAuditTelemetryStalledTracker subscribed to EventStream");
}

// Site Call Audit (#22) — central singleton mirrors the AuditLogIngest
// and NotificationOutbox patterns. M3's dual-write transaction routes
// SiteCalls upserts through AuditLogIngestActor's own scope-per-message
// ISiteCallAuditRepository resolution, so this singleton is not on the
// M3 happy-path hot path; it exists so direct-write callers Ask through
// a stable cluster proxy without further wiring. The central→site
// Retry/Discard relay now lives in this actor (see the
// RegisterCentralCommunication wiring below); the reconciliation puller
// is the remaining deferred direct-write caller.
// Like AuditLogIngestActor, the actor takes the root IServiceProvider
// and creates a fresh scope per message because ISiteCallAuditRepository
// is a scoped EF Core service.
var siteCallAuditLogger = _serviceProvider.GetRequiredService()
    .CreateLogger();
var siteCallAuditOptions = _serviceProvider
    .GetRequiredService>().Value;
var siteCallAuditSingletonProps = ClusterSingletonManager.Props(
    singletonProps: Props.Create(() => new ZB.MOM.WW.ScadaBridge.SiteCallAudit.SiteCallAuditActor(
        _serviceProvider, siteCallAuditOptions, siteCallAuditLogger)),
    terminationMessage: PoisonPill.Instance,
    settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
        .WithSingletonName("site-call-audit"));
// Keep the manager ref: the coordinated-shutdown drain task below stops it.
var siteCallAuditSingletonManager = _actorSystem!.ActorOf(siteCallAuditSingletonProps, "site-call-audit-singleton");

// SiteCallAudit-002 graceful-handover hook. The default singleton handover
// path waits for the actor's `ReceiveAsync` task to complete before
// signalling `HandOverDone` to the new oldest node — so an in-flight
// EF `UpsertAsync` IS waited for during a *clean* coordinated shutdown
// (the cluster-leave phase below fires before the singleton terminates).
// The risk the finding tracks is the seam between in-flight async work
// and the cluster-leave + singleton-stop sequence: we bound it by
// issuing an explicit `GracefulStop` to the singleton manager early
// in `cluster-leave`, with a timeout that lets the running upsert + SQL
// round-trip drain before the handover-to-other-node race window
// opens. The timeout is bounded so a misbehaving upsert cannot stall
// coordinated shutdown indefinitely — exceeding it falls through to
// the existing PoisonPill termination path. Same pattern is suitable
// for the NotificationOutbox singleton; not added here to keep this
// change minimal (out of NS-020's scope).
// NOTE(review): GracefulStop targets the ClusterSingletonManager actor
// itself (not the singleton it hosts); confirm that stopping the manager
// here does not bypass the manager's own leaving/hand-over handling.
// NOTE(review): BuildHocon does not override any coordinated-shutdown
// phase timeout; if the cluster-leave phase timeout is shorter than the
// 10s bound below, CoordinatedShutdown will time the phase out first —
// confirm the effective phase timeout.
var siteCallAuditShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem);
siteCallAuditShutdown.AddTask(
    Akka.Actor.CoordinatedShutdown.PhaseClusterLeave,
    "drain-site-call-audit-singleton",
    async () =>
    {
        try
        {
            await siteCallAuditSingletonManager.GracefulStop(TimeSpan.FromSeconds(10));
        }
        catch (Exception ex)
        {
            // Best-effort: a timeout or failure is logged and the shutdown continues.
            _logger.LogWarning(ex,
                "SiteCallAudit singleton did not drain within the graceful-stop " +
                "timeout; falling through to PoisonPill handover");
        }
        return Akka.Done.Instance;
    });

var siteCallAuditProxyProps = ClusterSingletonProxy.Props(
    singletonManagerPath: "/user/site-call-audit-singleton",
    settings: ClusterSingletonProxySettings.Create(_actorSystem)
        .WithSingletonName("site-call-audit"));
var siteCallAuditProxy = _actorSystem.ActorOf(siteCallAuditProxyProps, "site-call-audit-proxy");

// Hand the proxy to the CommunicationService so the Central UI can Ask
// the Site Call Audit actor directly (query, KPIs, detail) — mirrors the
// SetNotificationOutbox wiring above.
commService?.SetSiteCallAudit(siteCallAuditProxy);

// Task 5 (#22): hand the CentralCommunicationActor to the SiteCallAudit
// actor so it can relay operator Retry/Discard on parked cached calls to
// the owning site (over the per-site ClusterClient via SiteEnvelope).
// Mirrors the RegisterAuditIngest / RegisterNotificationOutbox wiring;
// the message is sent to the singleton proxy so it reaches whichever
// central node currently hosts the singleton.
siteCallAuditProxy.Tell(
    new ZB.MOM.WW.ScadaBridge.SiteCallAudit.RegisterCentralCommunication(centralCommActor));
_logger.LogInformation(
    "SiteCallAuditActor singleton created and registered with CentralCommunicationActor");

_logger.LogInformation("Central actors registered. CentralCommunicationActor created.");
}

/// <summary>
/// Registers site-specific actors including the Deployment Manager cluster singleton
/// and the SiteCommunicationActor.
/// The singleton is scoped to the site-specific cluster role so it runs on exactly
/// one node within this site's cluster.
/// </summary>
private async Task RegisterSiteActorsAsync(CancellationToken cancellationToken)
{
    var siteRole = $"site-{_nodeOptions.SiteId}";

    var storage = _serviceProvider.GetRequiredService();
    var compilationService = _serviceProvider.GetRequiredService();
    var sharedScriptLibrary = _serviceProvider.GetRequiredService();
    var streamManager = _serviceProvider.GetRequiredService();
    streamManager.Initialize(_actorSystem!);
    var siteRuntimeOptionsValue = _serviceProvider.GetService>()?.Value ?? new SiteRuntimeOptions();
    var dmLogger = _serviceProvider.GetRequiredService()
        .CreateLogger();

    // WP-34: Create DCL Manager Actor for tag subscriptions
    var dclFactory = _serviceProvider.GetService();
    var dclOptions = _serviceProvider.GetService>()?.Value
        ?? new ZB.MOM.WW.ScadaBridge.DataConnectionLayer.DataConnectionOptions();
    IActorRef?
dclManager = null; if (dclFactory != null) { var healthCollector = _serviceProvider.GetRequiredService(); var siteEventLogger = _serviceProvider.GetService(); dclManager = _actorSystem!.ActorOf( Props.Create(() => new ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors.DataConnectionManagerActor( dclFactory, dclOptions, healthCollector, siteEventLogger)), "dcl-manager"); _logger.LogInformation("Data Connection Layer manager actor created"); } // Resolve the health collector for the Deployment Manager var siteHealthCollector = _serviceProvider.GetService(); siteHealthCollector?.SetNodeHostname(_nodeOptions.NodeHostname); // Create SiteReplicationActor on every node (not a singleton) var sfStorage = _serviceProvider.GetRequiredService(); var replicationService = _serviceProvider.GetRequiredService(); var replicationLogger = _serviceProvider.GetRequiredService() .CreateLogger(); var replicationActor = _actorSystem!.ActorOf( Props.Create(() => new SiteReplicationActor( storage, sfStorage, replicationService, siteRole, replicationLogger)), "site-replication"); // Wire S&F replication handler to forward operations via the replication actor replicationService.SetReplicationHandler(op => { replicationActor.Tell(new ReplicateStoreAndForward(op)); return Task.CompletedTask; }); _logger.LogInformation("SiteReplicationActor created and S&F replication handler wired"); // Create the Deployment Manager as a cluster singleton var singletonProps = ClusterSingletonManager.Props( singletonProps: Props.Create(() => new DeploymentManagerActor( storage, compilationService, sharedScriptLibrary, streamManager, siteRuntimeOptionsValue, dmLogger, dclManager, replicationActor, siteHealthCollector, _serviceProvider)), terminationMessage: PoisonPill.Instance, settings: ClusterSingletonManagerSettings.Create(_actorSystem!) 
.WithRole(siteRole) .WithSingletonName("deployment-manager")); _actorSystem!.ActorOf(singletonProps, "deployment-manager-singleton"); // Create a proxy for other actors to communicate with the singleton var proxyProps = ClusterSingletonProxy.Props( singletonManagerPath: "/user/deployment-manager-singleton", settings: ClusterSingletonProxySettings.Create(_actorSystem) .WithRole(siteRole) .WithSingletonName("deployment-manager")); var dmProxy = _actorSystem.ActorOf(proxyProps, "deployment-manager-proxy"); // WP-4: Create SiteCommunicationActor for receiving messages from central var siteCommActor = _actorSystem.ActorOf( Props.Create(() => new SiteCommunicationActor( _nodeOptions.SiteId!, _communicationOptions, dmProxy)), "site-communication"); // Register local handlers with SiteCommunicationActor siteCommActor.Tell(new RegisterLocalHandler(LocalHandlerType.Artifacts, dmProxy)); // Event log handler — cluster singleton so queries always reach the // active node. The event log is node-local SQLite and is not // replicated; only the active node records events. A per-node handler // would let a ClusterClient query land on the standby and find nothing. 
var eventLogQueryService = _serviceProvider.GetService(); if (eventLogQueryService != null) { var eventLogSingletonProps = ClusterSingletonManager.Props( singletonProps: Props.Create(() => new SiteEventLogging.EventLogHandlerActor(eventLogQueryService)), terminationMessage: PoisonPill.Instance, settings: ClusterSingletonManagerSettings.Create(_actorSystem) .WithRole(siteRole) .WithSingletonName("event-log-handler")); _actorSystem.ActorOf(eventLogSingletonProps, "event-log-handler-singleton"); var eventLogProxyProps = ClusterSingletonProxy.Props( singletonManagerPath: "/user/event-log-handler-singleton", settings: ClusterSingletonProxySettings.Create(_actorSystem) .WithRole(siteRole) .WithSingletonName("event-log-handler")); var eventLogProxy = _actorSystem.ActorOf(eventLogProxyProps, "event-log-handler-proxy"); siteCommActor.Tell(new RegisterLocalHandler(LocalHandlerType.EventLog, eventLogProxy)); } // Parked message handler — bridges Akka to StoreAndForwardService var storeAndForwardService = _serviceProvider.GetService(); if (storeAndForwardService != null) { // Initialize SQLite schema and start the retry timer. Must complete before // any actor or HTTP handler touches the service. Host-005: awaited rather // than blocked via GetAwaiter().GetResult() — no thread-pool starvation / // sync-context deadlock risk, and exceptions surface as their original type. cancellationToken.ThrowIfCancellationRequested(); await storeAndForwardService.StartAsync(); // Register the store-and-forward delivery handlers so buffered // ExternalSystem calls, cached DB writes and notifications are actually // delivered by the retry sweep. Without this, every buffered message is // persisted but never delivered. Each handler resolves its scoped consumer // service in a fresh DI scope — the sweep runs on a timer thread, outside // any request scope. 
storeAndForwardService.RegisterDeliveryHandler( ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem, async msg => { using var scope = _serviceProvider.CreateScope(); return await scope.ServiceProvider .GetRequiredService() .DeliverBufferedAsync(msg); }); storeAndForwardService.RegisterDeliveryHandler( ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite, async msg => { using var scope = _serviceProvider.CreateScope(); return await scope.ServiceProvider .GetRequiredService() .DeliverBufferedAsync(msg); }); // Notification Outbox: a buffered notification is no longer delivered by // the site over SMTP. "Delivering" it means forwarding it to the central // cluster via the SiteCommunicationActor and treating central's // NotificationSubmitAck as the outcome (accepted → delivered; not accepted // or timeout → throw → transient → keep buffering). Central owns SMTP. // // NotificationService-020: register exactly once. The sentinel guard // catches a second registration path that re-introduces the dead // NS-001 site-SMTP handler — see the sentinel's XML doc above for the // historical context. Throwing here is intentional: a silent overwrite // by a future maintainer would invert the design back to site-side // delivery (NotificationForwarder vs. NotificationDeliveryService). if (_notificationDeliveryHandlerRegistered) { throw new InvalidOperationException( "NotificationService-020: A Notification-category store-and-forward " + "delivery handler was already registered. The canonical handler is " + "NotificationForwarder (central-only delivery, post-redesign). 
" + "If you are re-introducing a second registration path, remove the " + "first one — RegisterDeliveryHandler is last-write-wins per category " + "and a duplicate inverts the design."); } var notificationForwarder = new ZB.MOM.WW.ScadaBridge.StoreAndForward.NotificationForwarder( siteCommActor, _nodeOptions.SiteId!, _communicationOptions.NotificationForwardTimeout); storeAndForwardService.RegisterDeliveryHandler( ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.StoreAndForwardCategory.Notification, notificationForwarder.DeliverAsync); _notificationDeliveryHandlerRegistered = true; _logger.LogInformation( "Store-and-forward delivery handlers registered (ExternalSystem, CachedDbWrite, Notification)"); var parkedMessageHandler = _actorSystem.ActorOf( Props.Create(() => new ParkedMessageHandlerActor( storeAndForwardService, _nodeOptions.SiteId!)), "parked-message-handler"); siteCommActor.Tell(new RegisterLocalHandler(LocalHandlerType.ParkedMessages, parkedMessageHandler)); } // Register SiteCommunicationActor with ClusterClientReceptionist so central ClusterClients can reach it ClusterClientReceptionist.Get(_actorSystem).RegisterService(siteCommActor); _logger.LogInformation( "Site actors registered. 
DeploymentManager singleton scoped to role={SiteRole}, SiteCommunicationActor created.", siteRole); // Create ClusterClient to central if contact points are configured if (_communicationOptions.CentralContactPoints.Count > 0) { var contacts = _communicationOptions.CentralContactPoints .Select(cp => ActorPath.Parse($"{cp}/system/receptionist")) .ToImmutableHashSet(); var clientSettings = ClusterClientSettings.Create(_actorSystem) .WithInitialContacts(contacts); var centralClient = _actorSystem.ActorOf( ClusterClient.Props(clientSettings), "central-cluster-client"); var siteCommSelection = _actorSystem.ActorSelection("/user/site-communication"); siteCommSelection.Tell(new RegisterCentralClient(centralClient)); _logger.LogInformation( "Created ClusterClient to central with {Count} contact point(s) for site {SiteId}", contacts.Count, _nodeOptions.SiteId); } // Audit Log (#23) — site-side telemetry actor that drains the SQLite // Pending queue and pushes to central via IngestAuditEvents. Not a // cluster singleton: each site is its own cluster, and the actor reads // node-local SQLite (no replication). The Props are bound to the // dedicated audit-telemetry-dispatcher (defined in BuildHocon) so a // batch SQLite read + gRPC push never contend with the default // dispatcher used by hot-path actors. // // Per Bundle E's brief: the SiteAuditTelemetryActor takes its // collaborators through its constructor, so we resolve them from DI // and pass them in via Props.Create rather than relying on a future // FactoryProvider. The real site→central client is constructed and // wired immediately below: a ClusterClientSiteAuditClient (ClusterClient // transport, not gRPC) replaces the DI-default NoOpSiteStreamAuditClient // for site roles, without disturbing the rest of this wiring. 
var siteAuditOptions = _serviceProvider .GetRequiredService>(); var siteAuditQueue = _serviceProvider .GetRequiredService(); // Audit Log (#23) Task 2 follow-up: the production site→central audit // push uses the ClusterClient transport via the SiteCommunicationActor, // not the DI-resolved NoOpSiteStreamAuditClient. The NoOp default stays // correct for central/test composition roots (no SiteCommunicationActor); // a site role wires the real ClusterClient-based client here so the // SQLite Pending backlog actually drains to central. The forward Ask // reuses NotificationForwardTimeout — the same site→central command // forward bound notifications already use over this transport. ZB.MOM.WW.ScadaBridge.AuditLog.Site.Telemetry.ISiteStreamAuditClient siteAuditClient = new ZB.MOM.WW.ScadaBridge.AuditLog.Site.Telemetry.ClusterClientSiteAuditClient( siteCommActor, _communicationOptions.NotificationForwardTimeout); var siteAuditLogger = _serviceProvider.GetRequiredService() .CreateLogger(); // AuditLog-001: resolve the site-local operation tracking store so the // actor can run the combined-telemetry cached-drain in parallel with // the audit-only drain. The store is registered by AddSiteRuntime on // site composition roots; on central it is intentionally absent and // the cached-drain scheduler is never armed (the central side has no // outbound cached calls to track). GetService — null when not // registered — matches the optional-param contract on the actor ctor. 
var siteTrackingStore = _serviceProvider .GetService(); var siteAuditTelemetryProps = Props.Create(() => new ZB.MOM.WW.ScadaBridge.AuditLog.Site.Telemetry.SiteAuditTelemetryActor( siteAuditQueue, siteAuditClient, siteAuditOptions, siteAuditLogger, siteTrackingStore)) .WithDispatcher("audit-telemetry-dispatcher"); _actorSystem.ActorOf(siteAuditTelemetryProps, "site-audit-telemetry"); _logger.LogInformation( "SiteAuditTelemetryActor created (dispatcher=audit-telemetry-dispatcher, client={ClientType}, cachedDrain={CachedDrainEnabled})", siteAuditClient.GetType().Name, siteTrackingStore is not null); // Gate gRPC subscriptions until the actor system and SiteStreamManager are // initialized (REQ-HOST-7). // // Host-009: SetReady asserts a deliberately narrow contract. By this point the // actor system exists, SiteStreamManager.Initialize has run, and every // role actor (SiteCommunicationActor, deployment-manager singleton, // SiteReplicationActor, the ClusterClient) has been created with ActorOf — // creation and the registration Tells are synchronous and strictly ordered. // What is NOT guaranteed is completion of each actor's PreStart or the // ClusterClient's initial-contact handshake with central: those are // intentionally asynchronous. Gating readiness on the central handshake would // be wrong — a site must come up and stream locally even while central is // briefly unreachable. gRPC readiness therefore guarantees "the site actor // graph exists and can accept subscription streams", not "the cluster // handshake has completed". Streams opened before SetReady are already // rejected by SiteStreamGrpcServer with StatusCode.Unavailable. var grpcServer = _serviceProvider.GetService(); // Audit Log (#23 M6): hand the site-local SqliteAuditWriter (which // implements ISiteAuditQueue) to the gRPC server so the PullAuditEvents // reconciliation RPC can serve central's pulls. 
Both the writer and the // gRPC server are singletons — wiring this here keeps the dependency // direction one-way (Host knows both; Communication doesn't reach back // into AuditLog). grpcServer?.SetSiteAuditQueue(siteAuditQueue); grpcServer?.SetReady(_actorSystem!); } }