// File: lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs (C#, 201 lines, 11 KiB)


using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Hosting;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.OpcUaServer.Security;
using ZB.MOM.WW.OtOpcUa.Runtime;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
namespace ZB.MOM.WW.OtOpcUa.Host.OpcUa;
/// <summary>
/// Owns the OPC UA SDK lifecycle on driver-role hosts. Reads
/// <see cref="OpcUaApplicationHostOptions"/> from the <c>OpcUa</c> config section, boots
/// an <see cref="OtOpcUaSdkServer"/> through <see cref="OpcUaApplicationHost"/>, then
/// swaps a real <see cref="SdkAddressSpaceSink"/> into the
/// <see cref="DeferredAddressSpaceSink"/> singleton so <c>OpcUaPublishActor</c>'s writes
/// start landing in the real address space.
///
/// Tests boot the OPC UA server directly via <see cref="OpcUaApplicationHost"/>; this
/// hosted service is the production wiring.
/// </summary>
public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposable
{
    private readonly OpcUaApplicationHostOptions _options;
    private readonly DeferredAddressSpaceSink _deferredSink;
    private readonly DeferredServiceLevelPublisher _deferredServiceLevel;
    private readonly IOpcUaUserAuthenticator _userAuthenticator;
    private readonly IHistorianDataSource _historianDataSource;
    private readonly Func<ActorSystem> _actorSystemAccessor;
    private readonly ActorRegistry _actorRegistry;
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger<OtOpcUaServerHostedService> _logger;

    // Both are created in StartAsync. _appHost is cleared again by DisposeAsync so that
    // disposal happens at most once.
    private OpcUaApplicationHost? _appHost;
    private OtOpcUaSdkServer? _server;

    /// <summary>
    /// Initializes a new instance of the OtOpcUaServerHostedService class.
    /// </summary>
    /// <param name="options">The validated OPC UA host options (bound from the <c>OpcUa</c> section and validated at startup via <c>ValidateOnStart</c>).</param>
    /// <param name="deferredSink">The deferred address space sink that receives the real sink once the server is ready.</param>
    /// <param name="deferredServiceLevel">The deferred service level publisher that receives the real publisher once the server is ready.</param>
    /// <param name="userAuthenticator">The OPC UA user authenticator.</param>
    /// <param name="historianDataSource">The server-side HistoryRead backend resolved from DI — the
    /// <c>NullHistorianDataSource</c> default seeded by <c>AddOtOpcUaRuntime</c> (which runs on this driver
    /// node, the same source the address-space sink + node-write gateway come from), or the configured
    /// Wonderware read client when <c>AddServerHistorian</c> enabled it. Wired onto the node manager in
    /// <see cref="StartAsync"/>.</param>
    /// <param name="actorSystemAccessor">Lazy accessor for the running <see cref="ActorSystem"/>, used to
    /// resolve the DistributedPubSub mediator the inbound alarm-command router publishes through. Resolved
    /// lazily (mirroring <c>DpsScriptLogPublisher</c>) so construction never races Akka startup.</param>
    /// <param name="actorRegistry">The Akka.Hosting actor registry, used to resolve the local
    /// <c>DriverHostActor</c> ref (<c>DriverHostActorKey</c>) the inbound node-write router Asks. Resolved
    /// lazily per write by the gateway wired in <see cref="StartAsync"/>.</param>
    /// <param name="loggerFactory">The logger factory for creating loggers.</param>
    public OtOpcUaServerHostedService(
        IOptions<OpcUaApplicationHostOptions> options,
        DeferredAddressSpaceSink deferredSink,
        DeferredServiceLevelPublisher deferredServiceLevel,
        IOpcUaUserAuthenticator userAuthenticator,
        IHistorianDataSource historianDataSource,
        Func<ActorSystem> actorSystemAccessor,
        ActorRegistry actorRegistry,
        ILoggerFactory loggerFactory)
    {
        _options = options.Value;
        _deferredSink = deferredSink;
        _deferredServiceLevel = deferredServiceLevel;
        _userAuthenticator = userAuthenticator;
        _historianDataSource = historianDataSource;
        _actorSystemAccessor = actorSystemAccessor;
        _actorRegistry = actorRegistry;
        _loggerFactory = loggerFactory;
        _logger = loggerFactory.CreateLogger<OtOpcUaServerHostedService>();
    }

    /// <summary>
    /// Starts the OPC UA server asynchronously and, once it is up, binds the real address-space sink,
    /// alarm-command router, node-write gateway, historian source and ServiceLevel publisher.
    /// A failure to start the SDK is logged and swallowed so the rest of the host can still boot.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _server = new OtOpcUaSdkServer();
        _appHost = new OpcUaApplicationHost(
            _options,
            _loggerFactory.CreateLogger<OpcUaApplicationHost>(),
            _userAuthenticator);
        try
        {
            await _appHost.StartAsync(_server, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "OtOpcUaServerHostedService: SDK start failed; OpcUaPublishActor writes will continue to no-op");
            // Don't rethrow — the rest of the host (admin UI, driver actors, etc.) can still boot.
            // Operators see the failure via the logs + can correct config without a process bounce
            // of the whole binary.
            return;
        }

        if (_server.NodeManager is null)
        {
            _logger.LogWarning(
                "OtOpcUaServerHostedService: SDK reported started but NodeManager is null; sink stays Null");
            return;
        }

        _deferredSink.SetSink(new SdkAddressSpaceSink(_server.NodeManager));

        // Wire the reverse-path inbound-alarm-command router: a client Acknowledge/Confirm/Shelve that
        // passes the node manager's AlarmAck gate publishes the mapped AlarmCommand onto the cluster
        // `alarm-commands` topic (same DistributedPubSub mediator the `alerts`/`script-logs` topics use).
        // The Tell is fire-and-forget so the handler — which runs under the SDK's Lock — never blocks.
        // The mediator is resolved per-publish via the lazy ActorSystem accessor so a transient cluster
        // condition is tolerated and construction never races Akka startup.
        _server.SetAlarmCommandRouter(cmd =>
        {
            try
            {
                var mediator = DistributedPubSub.Get(_actorSystemAccessor()).Mediator;
                mediator.Tell(new Publish(ScriptedAlarmHostActor.AlarmCommandsTopic, cmd));
            }
            catch (Exception ex)
            {
                // The router runs under the SDK Lock on a server thread; a cluster hiccup must not
                // escape into the SDK's Call path. Log + drop — the client still gets Good for the
                // node-state change; the missed command surfaces as a non-applied engine transition.
                _logger.LogWarning(ex,
                    "OtOpcUaServerHostedService: failed to route inbound alarm command {Operation} for {AlarmId}",
                    cmd.Operation, cmd.AlarmId);
            }
        });

        // Wire the reverse-path inbound operator-write gateway: a client write to a writable equipment-tag
        // node that passes the node manager's WriteOperate gate routes the write to the owning driver child
        // (RouteNodeWrite → NodeWriteResult) via the local DriverHostActor. The node manager calls the
        // gateway's WriteAsync FIRE-AND-FORGET: the SDK's CustomNodeManager2.Write holds the node-manager
        // Lock while invoking OnWriteValue, so a blocking Ask here would freeze ALL address-space operations
        // (reads, subscription notifications, the publish path) for up to the Ask timeout. The gateway kicks
        // off the Ask and resolves a NodeWriteOutcome; the node manager applies the client value optimistically
        // and self-corrects (reverts to the pre-write value) when the device write comes back FAILED — but only
        // while the node still holds the optimistic value, so a fresh driver poll is not clobbered. The
        // DriverHostActor ref is resolved LAZILY per write (inside the gateway) — this hosted service's
        // StartAsync runs before the Akka DriverHostActor registers, so a one-shot resolve here would always
        // miss and leave every write unavailable. By write time (long after startup) the registry has it; a
        // node that genuinely has no driver-host (admin-only, no writable driver nodes materialised) logs +
        // resolves the write to "writes unavailable".
        _server.SetNodeWriteGateway(new ActorNodeWriteGateway(
            resolveDriverHost: () => _actorRegistry.TryGet<DriverHostActorKey>(out var driverHost) ? driverHost : null,
            logger: _loggerFactory.CreateLogger<ActorNodeWriteGateway>()));

        // Wire the server-side read backend resolved from DI — the NullHistorianDataSource default (when
        // the ServerHistorian section is disabled) or the configured Wonderware read client (when enabled).
        // The node manager's HistoryRead overrides block-bridge to whatever source is set here.
        _server.SetHistorianDataSource(_historianDataSource);

        // ServiceLevel publisher needs IServerInternal — only available after Start.
        if (_server.CurrentInstance is { } serverInternal)
        {
            _deferredServiceLevel.SetInner(new SdkServiceLevelPublisher(
                serverInternal,
                _loggerFactory.CreateLogger<SdkServiceLevelPublisher>()));
        }

        _logger.LogInformation("OtOpcUaServerHostedService: SDK started, address-space + ServiceLevel sinks bound");
    }

    /// <summary>
    /// Stops the OPC UA server asynchronously. Only unbinds the adapters wired in
    /// <see cref="StartAsync"/>; the application host itself is released in <see cref="DisposeAsync"/>.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        RevertToNullAdapters();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Disposes the hosted service and its resources asynchronously. Calling it again after the
    /// application host has been released is a no-op.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Capture and clear the field first so a repeated call cannot dispose the host twice.
        var appHost = _appHost;
        if (appHost is null)
        {
            return;
        }

        _appHost = null;

        // Disposal is not guaranteed to be preceded by StopAsync, so unbind the adapters here as
        // well — otherwise the deferred sinks could keep forwarding into the NodeManager while the
        // host that owns it is being torn down.
        RevertToNullAdapters();

        await appHost.DisposeAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Reverts everything <see cref="StartAsync"/> bound back to the Null defaults. Shared by
    /// <see cref="StopAsync"/> and <see cref="DisposeAsync"/>.
    /// NOTE(review): this relies on the null-setters being harmless when invoked repeatedly (StopAsync
    /// already invokes them after a failed start, when nothing was bound) — confirm against the setters.
    /// </summary>
    private void RevertToNullAdapters()
    {
        // Revert to Null adapters so any in-flight writes from a poison-pilled actor don't hit a
        // half-disposed NodeManager.
        _deferredSink.SetSink(null);
        _deferredServiceLevel.SetInner(null);

        // Restore the Null write gateway so a late client write doesn't Ask a stopping DriverHostActor.
        _server?.SetNodeWriteGateway(null);

        // Restore the Null historian so a late HistoryRead doesn't hit a disposed read client.
        _server?.SetHistorianDataSource(null);
    }
}