Files
lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs
T
Joseph Doherty 4203b84d51 feat(runtime): publish DriverHealthChanged via DriverInstanceActor
- IDriverHealthPublisher in Core.Abstractions + NullDriverHealthPublisher
  no-op for tests/dev-stub paths.
- AkkaDriverHealthPublisher in Runtime forwards to the cluster-wide
  `driver-health` DPS topic.
- DriverInstanceActor instrumented to publish snapshots on every
  observable state change + a periodic 30s heartbeat so the AdminUI
  snapshot store warms up for newly-joined SignalR clients.
- Sliding 5-minute Faulted-count tracked per actor via Queue<DateTime>.
- DriverHostActor.SpawnChild threads clusterId (_localNode.Value) and
  the health publisher down to every DriverInstanceActor child.
- ServiceCollectionExtensions.AddOtOpcUaRuntime registers
  AkkaDriverHealthPublisher as IDriverHealthPublisher singleton.
2026-05-28 10:22:44 -04:00

142 lines
7.2 KiB
C#

using Akka.Actor;
using Akka.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Runtime;
public static class ServiceCollectionExtensions
{
public const string DriverRole = "driver";
public const string DriverHostActorName = "driver-host";
public const string DbHealthProbeActorName = "db-health";
public const string HistorianAdapterActorName = "historian-adapter";
public const string DependencyMuxActorName = "dependency-mux";
public const string OpcUaPublishActorName = "opcua-publish";
/// <summary>
/// Registers shared runtime services. Currently binds <see cref="IAlarmHistorianSink"/>
/// to <see cref="NullAlarmHistorianSink"/> as the default; production deployments
/// override this with <c>SqliteStoreAndForwardSink</c> wrapping <c>WonderwareHistorianClient</c>.
/// Call this BEFORE <c>AddAkka</c>.
/// </summary>
/// <param name="services">The service collection to register with.</param>
public static IServiceCollection AddOtOpcUaRuntime(this IServiceCollection services)
{
services.TryAddSingleton<IAlarmHistorianSink>(NullAlarmHistorianSink.Instance);
services.TryAddSingleton<IDriverFactory>(NullDriverFactory.Instance);
services.TryAddSingleton<IOpcUaAddressSpaceSink>(NullOpcUaAddressSpaceSink.Instance);
services.TryAddSingleton<IServiceLevelPublisher>(NullServiceLevelPublisher.Instance);
services.TryAddSingleton<IDriverHealthPublisher, AkkaDriverHealthPublisher>();
return services;
}
/// <summary>
/// Spawns the per-node driver-role actors on the host's <see cref="ActorSystem"/>:
/// <see cref="DriverHostActor"/> (one per node), <see cref="DbHealthProbeActor"/>
/// (consumed by the health endpoint + redundancy calc), and
/// <see cref="HistorianAdapterActor"/> wrapping the registered <see cref="IAlarmHistorianSink"/>.
///
/// Mirror of <c>WithOtOpcUaControlPlaneSingletons</c> for the driver role. Both must
/// be registered on the same <see cref="AkkaConfigurationBuilder"/> as the cluster
/// bootstrap so the actors share the host's ActorSystem.
///
/// Wire from the fused Host's Program.cs when the node carries the <c>driver</c> role:
/// <code>
/// services.AddOtOpcUaRuntime();
/// services.AddAkka("otopcua", (ab, sp) => { ab.WithOtOpcUaClusterBootstrap(sp); if (hasDriver) ab.WithOtOpcUaRuntimeActors(); });
/// </code>
/// </summary>
/// <param name="builder">The Akka configuration builder.</param>
public static AkkaConfigurationBuilder WithOtOpcUaRuntimeActors(this AkkaConfigurationBuilder builder)
{
// Production cluster HOCON (akka.conf) carries this dispatcher block, but consumers that
// bootstrap their own HOCON (e.g. ServiceCollectionExtensionsTests) wouldn't pick it up
// — OpcUaPublishActor.Props pins itself to opcua-synchronized-dispatcher and Akka throws
// ConfigurationException if it doesn't exist. Prepend a fallback so the runtime extension
// is self-contained.
builder.AddHocon(@"
opcua-synchronized-dispatcher {
type = ""PinnedDispatcher""
executor = ""thread-pool-executor""
throughput = 1
}
", HoconAddMode.Prepend);
builder.WithActors((system, registry, resolver) =>
{
var dbFactory = resolver.GetService<IDbContextFactory<OtOpcUaConfigDbContext>>();
var roleInfo = resolver.GetService<IClusterRoleInfo>();
// Fallback to Null* if AddOtOpcUaRuntime wasn't called (e.g., test harnesses).
var historianSink = resolver.GetService<IAlarmHistorianSink>() ?? NullAlarmHistorianSink.Instance;
var driverFactory = resolver.GetService<IDriverFactory>() ?? NullDriverFactory.Instance;
var addressSpaceSink = resolver.GetService<IOpcUaAddressSpaceSink>() ?? NullOpcUaAddressSpaceSink.Instance;
var serviceLevel = resolver.GetService<IServiceLevelPublisher>() ?? NullServiceLevelPublisher.Instance;
var loggerFactory = resolver.GetService<ILoggerFactory>() ?? NullLoggerFactory.Instance;
var healthPublisher = resolver.GetService<IDriverHealthPublisher>() ?? NullDriverHealthPublisher.Instance;
var dbHealth = system.ActorOf(
DbHealthProbeActor.Props(dbFactory),
DbHealthProbeActorName);
registry.Register<DbHealthProbeActorKey>(dbHealth);
// Dependency mux must be spawned before DriverHostActor so the host can forward
// AttributeValuePublished into it from the very first driver spawn.
var mux = system.ActorOf(DependencyMuxActor.Props(), DependencyMuxActorName);
registry.Register<DependencyMuxActorKey>(mux);
// OPC UA publish actor — pinned dispatcher, owns the address-space side of the
// pipeline. Phase7Applier is constructed here so the actor + applier share the
// same sink reference (when DeferredAddressSpaceSink swaps later, both see it).
var applier = new Phase7Applier(addressSpaceSink, loggerFactory.CreateLogger<Phase7Applier>());
var publishActor = system.ActorOf(
OpcUaPublishActor.Props(
sink: addressSpaceSink,
serviceLevel: serviceLevel,
localNode: roleInfo.LocalNode,
dbFactory: dbFactory,
applier: applier),
OpcUaPublishActorName);
registry.Register<OpcUaPublishActorKey>(publishActor);
var driverHost = system.ActorOf(
DriverHostActor.Props(dbFactory, roleInfo.LocalNode, coordinator: null,
driverFactory: driverFactory, localRoles: roleInfo.LocalRoles,
dependencyMux: mux,
opcUaPublishActor: publishActor,
healthPublisher: healthPublisher),
DriverHostActorName);
registry.Register<DriverHostActorKey>(driverHost);
var historian = system.ActorOf(
HistorianAdapterActor.Props(historianSink),
HistorianAdapterActorName);
registry.Register<HistorianAdapterActorKey>(historian);
});
return builder;
}
}
/// <summary>Marker key types used by <c>Akka.Hosting</c> to resolve runtime actors from the registry.</summary>
public sealed class DriverHostActorKey { }
public sealed class DbHealthProbeActorKey { }
public sealed class HistorianAdapterActorKey { }
public sealed class DependencyMuxActorKey { }
public sealed class OpcUaPublishActorKey { }