using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
/// <summary>
/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on
/// the pinned <c>opcua-synchronized-dispatcher</c> (Task 19 HOCON) so the OPC UA SDK sees
/// only one thread per actor instance — its session/subscription locks expect strict
/// single-threaded access.
/// </summary>
/// <remarks>
/// Address-space writes route through <see cref="IOpcUaAddressSpaceSink"/>; ServiceLevel
/// writes route through <see cref="IServiceLevelPublisher"/>. Production binds SDK-backed
/// implementations; dev/Mac/tests bind the <c>Null*</c> defaults so the actor stays decoupled from
/// <c>Opc.Ua.Server</c>. The remaining piece is wiring those bindings to a real
/// <c>StandardServer</c> address space — tracked as F10b.
/// </remarks>
public sealed class OpcUaPublishActor : ReceiveActor
{
/// <summary>Dispatcher id applied by the production <c>Props</c> factory via <c>WithDispatcher</c>.</summary>
public const string DispatcherId = "opcua-synchronized-dispatcher";
/// <summary>Distributed pub-sub topic the actor subscribes to in <c>PreStart</c> when the redundancy subscription is enabled.</summary>
public const string RedundancyStateTopic = "redundancy-state";
/// <summary>Request to write one value to the address-space sink; the handler forwards all four members to <c>WriteValue</c>.</summary>
public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);
/// <summary>Request to write one alarm's state to the address-space sink; the handler forwards all four members to <c>WriteAlarmState</c>.</summary>
public sealed record AlarmStateUpdate(string AlarmNodeId, bool Active, bool Acknowledged, DateTime TimestampUtc);
/// <summary>
/// Triggers an address-space rebuild. <paramref name="DeploymentId"/> is the deployment
/// just applied by the host; the rebuild loads THAT artifact so materialisation matches the
/// applied config + the SubscribeBulk pass. It is null only for legacy/dev callers, which
/// fall back to the latest sealed deployment (lags a not-yet-sealed apply by one revision).
/// </summary>
/// <param name="Correlation">Correlation id; tagged on the rebuild telemetry span and included in log entries.</param>
/// <param name="DeploymentId">Deployment whose artifact should be loaded, or null to use the latest sealed one.</param>
public sealed record RebuildAddressSpace(CorrelationId Correlation, DeploymentId? DeploymentId = null);
/// <summary>Requests that <paramref name="ServiceLevel"/> be handed to the service-level publisher; repeats of the remembered level are dropped by the handler.</summary>
public sealed record ServiceLevelChanged(byte ServiceLevel);
// Destination for value writes, alarm-state writes and raw address-space rebuilds.
private readonly IOpcUaAddressSpaceSink _sink;
// Destination for ServiceLevel publishes.
private readonly IServiceLevelPublisher _serviceLevel;
// When true, PreStart subscribes this actor to RedundancyStateTopic.
private readonly bool _subscribeRedundancyTopic;
// Identity used to find this node's entry in a redundancy snapshot; null disables that handler.
private readonly NodeId? _localNode;
// Optional ConfigDb access; together with _applier it enables the diff-and-apply rebuild path.
// NOTE(review): the generic type argument of IDbContextFactory appears to be missing here — confirm against the original file.
private readonly IDbContextFactory? _dbFactory;
// Optional applier that executes rebuild plans and materialises hierarchy / Galaxy tags.
private readonly Phase7Applier? _applier;
private readonly ILoggingAdapter _log = Context.GetLogger();
// Incremented once per successful value or alarm sink write (rebuilds are not counted).
private int _writes;
// Level remembered by the ServiceLevelChanged handler to suppress repeats of an unchanged value; starts at 0.
private byte _lastServiceLevel;
// Composition recorded after the most recent applier pass; used as the baseline when the next
// rebuild plan is computed. Starts out empty.
// NOTE(review): the Array.Empty() calls appear to have lost their type arguments — confirm against the original file.
private Phase7CompositionResult _lastApplied = new(
Array.Empty(),
Array.Empty(),
Array.Empty(),
Array.Empty(),
Array.Empty(),
Array.Empty());
/// <summary>Gets the number of successful value and alarm writes performed through the sink.</summary>
public int WriteCount => _writes;
/// <summary>Gets the last published service level.</summary>
public byte LastServiceLevel => _lastServiceLevel;
/// <summary>
/// Builds the production <see cref="Akka.Actor.Props"/>: the actor is pinned to
/// <see cref="DispatcherId"/> and subscribes to the <c>redundancy-state</c> DPS topic so
/// cluster transitions drive the local ServiceLevel publish path. When both
/// <paramref name="dbFactory"/> and <paramref name="applier"/> are supplied, a
/// <see cref="RebuildAddressSpace"/> request loads the deployment artifact and drives the
/// applier; otherwise the rebuild falls back to the sink's own rebuild.
/// </summary>
/// <param name="sink">The OPC UA address space sink; the null-object sink is used when omitted.</param>
/// <param name="serviceLevel">The service level publisher; the null-object publisher is used when omitted.</param>
/// <param name="localNode">The local cluster node ID.</param>
/// <param name="dbFactory">The optional database context factory.</param>
/// <param name="applier">The optional Phase 7 applier.</param>
/// <returns>Props configured with the pinned OPC UA dispatcher.</returns>
public static Props Props(
    IOpcUaAddressSpaceSink? sink = null,
    IServiceLevelPublisher? serviceLevel = null,
    NodeId? localNode = null,
    IDbContextFactory? dbFactory = null,
    Phase7Applier? applier = null)
{
    // Production always listens for redundancy snapshots.
    const bool subscribeToRedundancyTopic = true;

    IOpcUaAddressSpaceSink effectiveSink = sink ?? NullOpcUaAddressSpaceSink.Instance;
    IServiceLevelPublisher effectivePublisher = serviceLevel ?? NullServiceLevelPublisher.Instance;

    return Akka.Actor.Props
        .Create(() => new OpcUaPublishActor(
            effectiveSink,
            effectivePublisher,
            subscribeToRedundancyTopic,
            localNode,
            dbFactory,
            applier))
        .WithDispatcher(DispatcherId);
}
/// <summary>
/// Test-only <see cref="Akka.Actor.Props"/> that omits the pinned-dispatcher requirement and,
/// by default, skips the DPS subscribe so unit tests can spin up the actor on a vanilla
/// TestKit cluster.
/// </summary>
/// <param name="sink">The OPC UA address space sink; the null-object sink is used when omitted.</param>
/// <param name="serviceLevel">The service level publisher; the null-object publisher is used when omitted.</param>
/// <param name="subscribeRedundancyTopic">Whether to subscribe to the redundancy topic.</param>
/// <param name="localNode">The local cluster node ID.</param>
/// <param name="dbFactory">The optional database context factory.</param>
/// <param name="applier">The optional Phase 7 applier.</param>
/// <returns>Props with no dispatcher override.</returns>
public static Props PropsForTests(
    IOpcUaAddressSpaceSink? sink = null,
    IServiceLevelPublisher? serviceLevel = null,
    bool subscribeRedundancyTopic = false,
    NodeId? localNode = null,
    IDbContextFactory? dbFactory = null,
    Phase7Applier? applier = null)
{
    IOpcUaAddressSpaceSink effectiveSink = sink ?? NullOpcUaAddressSpaceSink.Instance;
    IServiceLevelPublisher effectivePublisher = serviceLevel ?? NullServiceLevelPublisher.Instance;

    return Akka.Actor.Props.Create(() => new OpcUaPublishActor(
        effectiveSink,
        effectivePublisher,
        subscribeRedundancyTopic,
        localNode,
        dbFactory,
        applier));
}
/// <summary>
/// Initializes a new instance of the <see cref="OpcUaPublishActor"/> class and registers
/// one handler per supported message type.
/// </summary>
/// <param name="sink">The OPC UA address space sink.</param>
/// <param name="serviceLevel">The service level publisher.</param>
/// <param name="subscribeRedundancyTopic">Whether to subscribe to the redundancy topic.</param>
/// <param name="localNode">The local cluster node ID.</param>
/// <param name="dbFactory">The optional database context factory.</param>
/// <param name="applier">The optional Phase 7 applier.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="sink"/> or <paramref name="serviceLevel"/> is null.
/// </exception>
public OpcUaPublishActor(
    IOpcUaAddressSpaceSink sink,
    IServiceLevelPublisher serviceLevel,
    bool subscribeRedundancyTopic,
    NodeId? localNode,
    IDbContextFactory? dbFactory = null,
    Phase7Applier? applier = null)
{
    // Fail at construction rather than on every message: the handlers catch and log
    // exceptions, so a null dependency would otherwise only show up as repeated warnings.
    ArgumentNullException.ThrowIfNull(sink);
    ArgumentNullException.ThrowIfNull(serviceLevel);

    _sink = sink;
    _serviceLevel = serviceLevel;
    _subscribeRedundancyTopic = subscribeRedundancyTopic;
    _localNode = localNode;
    _dbFactory = dbFactory;
    _applier = applier;

    // The message type must be given explicitly; a method group alone cannot drive inference.
    Receive<AttributeValueUpdate>(HandleAttributeUpdate);
    Receive<AlarmStateUpdate>(HandleAlarmUpdate);
    Receive<RebuildAddressSpace>(HandleRebuild);
    Receive<ServiceLevelChanged>(HandleServiceLevelChanged);
    Receive<RedundancyStateChanged>(HandleRedundancyStateChanged);
    Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
/// <summary>
/// Subscribes this actor to <see cref="RedundancyStateTopic"/> on the distributed pub-sub
/// mediator when the redundancy subscription was requested at construction.
/// </summary>
protected override void PreStart()
{
    if (!_subscribeRedundancyTopic)
    {
        return;
    }

    var mediator = DistributedPubSub.Get(Context.System).Mediator;
    mediator.Tell(new Subscribe(RedundancyStateTopic, Self));
}
/// <summary>
/// Forwards a value update to the sink. On success the write counter and the
/// <c>kind=value</c> telemetry counter are bumped; a sink failure is logged as a warning
/// and otherwise ignored.
/// </summary>
/// <param name="msg">The value update to write.</param>
private void HandleAttributeUpdate(AttributeValueUpdate msg)
{
    var (nodeId, value, quality, timestampUtc) = msg;

    try
    {
        _sink.WriteValue(nodeId, value, quality, timestampUtc);
        Interlocked.Increment(ref _writes);
        OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair<string, object?>("kind", "value"));
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "OpcUaPublish: sink.WriteValue threw for {Node}", nodeId);
    }
}
/// <summary>
/// Forwards an alarm-state update to the sink. On success the write counter and the
/// <c>kind=alarm</c> telemetry counter are bumped; a sink failure is logged as a warning
/// and otherwise ignored.
/// </summary>
/// <param name="msg">The alarm state to write.</param>
private void HandleAlarmUpdate(AlarmStateUpdate msg)
{
    var (alarmNodeId, active, acknowledged, timestampUtc) = msg;

    try
    {
        _sink.WriteAlarmState(alarmNodeId, active, acknowledged, timestampUtc);
        Interlocked.Increment(ref _writes);
        OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair<string, object?>("kind", "alarm"));
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "OpcUaPublish: sink.WriteAlarmState threw for {Node}", alarmNodeId);
    }
}
/// <summary>
/// Handles a <see cref="RebuildAddressSpace"/> request inside a telemetry span tagged with
/// the correlation id.
/// </summary>
/// <remarks>
/// Two modes. Without a db factory and applier the sink's own rebuild is invoked (the
/// F10b/dev path). With both, the deployment artifact is loaded, parsed, diffed against the
/// last applied composition, and the resulting plan is applied, followed by the hierarchy
/// and Galaxy-tag materialisation passes. An empty artifact (load failure or missing row)
/// skips the pass entirely so the address space already applied is left untouched. All
/// exceptions are logged and swallowed.
/// </remarks>
/// <param name="msg">The rebuild request.</param>
private void HandleRebuild(RebuildAddressSpace msg)
{
    using var span = OtOpcUaTelemetry.StartAddressSpaceRebuildSpan();
    span?.SetTag("otopcua.correlation_id", msg.Correlation.ToString());

    // Two modes: when dbFactory + applier are wired, do a real diff-and-apply pass against
    // the deployment artifact. Without them, fall back to a raw sink rebuild — the
    // F10b/dev path before the integration completes.
    if (_dbFactory is null || _applier is null)
    {
        try
        {
            _sink.RebuildAddressSpace();
            OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair<string, object?>("kind", "rebuild"));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})",
                msg.Correlation);
        }

        return;
    }

    try
    {
        // Prefer the artifact of the deployment the host just applied — at apply time it is not
        // yet Sealed, so LoadLatestArtifact would return the PREVIOUS revision and materialise a
        // stale composition (variables that don't match the SubscribeBulk refs). Fall back to
        // latest-sealed only for legacy callers that don't carry a DeploymentId.
        var artifact = msg.DeploymentId is { } depId
            ? LoadArtifact(depId)
            : LoadLatestArtifact();

        // The loaders return an empty blob when the read fails or no row matches. Parsing it
        // would yield an empty composition, and diffing that against a non-empty
        // _lastApplied would plan the removal of every node — so a transient ConfigDb
        // failure would tear down the address space. Treat "nothing loaded" as a true no-op.
        // NOTE(review): assumes a real deployment never stores a zero-length artifact — confirm.
        if (artifact.Length == 0)
        {
            _log.Warning("OpcUaPublish: no deployment artifact available; keeping current address space (correlation={Correlation})",
                msg.Correlation);
            return;
        }

        var composition = DeploymentArtifact.ParseComposition(artifact);
        var plan = Phase7Planner.Compute(_lastApplied, composition);
        if (plan.IsEmpty)
        {
            _log.Debug("OpcUaPublish: rebuild requested but plan is empty (correlation={Correlation})",
                msg.Correlation);
            return;
        }

        var outcome = _applier.Apply(plan);
        _lastApplied = composition;

        // #85 — after the plan diff lands, rebuild the UNS folder hierarchy so OPC UA
        // clients see Area/Line/Equipment as proper folders. Idempotent; Phase7Applier
        // skips folders that already exist with the same node id.
        _applier.MaterialiseHierarchy(composition);

        // Galaxy / SystemPlatform tags get their own pass: ensures their FolderPath folder
        // + Variable node exist so clients can browse them. The Galaxy driver fills values
        // on a future SubscribeBulk pass; until then variables show BadWaitingForInitialData.
        _applier.MaterialiseGalaxyTags(composition);

        OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair<string, object?>("kind", "rebuild"));
        _log.Info("OpcUaPublish: applied rebuild (correlation={Correlation}, added={Added}, removed={Removed}, changed={Changed}, rebuild={Rebuild})",
            msg.Correlation, outcome.AddedNodes, outcome.RemovedNodes, outcome.ChangedNodes, outcome.RebuildCalled);
    }
    catch (Exception ex)
    {
        _log.Error(ex, "OpcUaPublish: rebuild pipeline threw (correlation={Correlation})", msg.Correlation);
    }
}
/// <summary>
/// Reads the artifact blob of one specific deployment from ConfigDb (the one just applied,
/// which may not be Sealed yet).
/// </summary>
/// <param name="deploymentId">Identifier of the deployment whose artifact is wanted.</param>
/// <returns>
/// The stored blob, or an empty array when no row matches, the blob is null, or the read
/// throws (the failure is logged as a warning).
/// </returns>
private byte[] LoadArtifact(DeploymentId deploymentId)
{
    try
    {
        using var db = _dbFactory!.CreateDbContext();

        var matchingBlobs =
            from deployment in db.Deployments.AsNoTracking()
            where deployment.DeploymentId == deploymentId.Value
            select deployment.ArtifactBlob;

        return matchingBlobs.FirstOrDefault() ?? Array.Empty<byte>();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "OpcUaPublish: failed to load artifact for deployment {Id}; rebuild becomes no-op", deploymentId);
        return Array.Empty<byte>();
    }
}
/// <summary>
/// Reads the artifact blob of the most recently sealed deployment from ConfigDb, ordering
/// Sealed rows by <c>SealedAtUtc</c> descending.
/// </summary>
/// <returns>
/// The stored blob, or an empty array when there is no sealed deployment, the blob is null,
/// or the read throws (the failure is logged as a warning).
/// </returns>
private byte[] LoadLatestArtifact()
{
    try
    {
        using var db = _dbFactory!.CreateDbContext();

        var sealedBlobsNewestFirst =
            from deployment in db.Deployments.AsNoTracking()
            where deployment.Status == Configuration.Enums.DeploymentStatus.Sealed
            orderby deployment.SealedAtUtc descending
            select deployment.ArtifactBlob;

        return sealedBlobsNewestFirst.FirstOrDefault() ?? Array.Empty<byte>();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "OpcUaPublish: failed to load latest deployment artifact; rebuild becomes no-op");
        return Array.Empty<byte>();
    }
}
/// <summary>True once a level has been handed to the publisher without it throwing.</summary>
private bool _serviceLevelPublished;

/// <summary>
/// Publishes a new ServiceLevel, suppressing repeats of the level that was last published
/// successfully.
/// </summary>
/// <remarks>
/// The remembered level is updated only after the publisher returns. If the publisher
/// throws, the failure is logged and the next message carrying the same level is attempted
/// again instead of being discarded as a duplicate. The very first level is always
/// published, even when it equals the field's initial value of 0.
/// </remarks>
/// <param name="msg">The requested service level.</param>
private void HandleServiceLevelChanged(ServiceLevelChanged msg)
{
    if (_serviceLevelPublished && msg.ServiceLevel == _lastServiceLevel)
    {
        return;
    }

    try
    {
        _serviceLevel.Publish(msg.ServiceLevel);

        // Record the level only now that the publisher has accepted it.
        _lastServiceLevel = msg.ServiceLevel;
        _serviceLevelPublished = true;

        OtOpcUaTelemetry.ServiceLevelChange.Add(1,
            new KeyValuePair<string, object?>("level", msg.ServiceLevel));
        _log.Debug("OpcUaPublish: ServiceLevel={Level}", msg.ServiceLevel);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "OpcUaPublish: ServiceLevel publisher threw at level {Level}", msg.ServiceLevel);
    }
}
/// <summary>
/// Derives a coarse ServiceLevel for this node from a cluster redundancy snapshot and sends
/// it to this actor as a <see cref="ServiceLevelChanged"/> message, which in turn drives
/// the <see cref="IServiceLevelPublisher"/>.
/// </summary>
/// <remarks>
/// Placeholder for F10b's full health aggregation. Mapping: primary that is role leader for
/// the driver → 240, other primary → 200, secondary → 100, detached or any other role → 0.
/// The full calculation (with DB-reachable and OPC UA probe inputs) lives in
/// RedundancyStateActor on admin nodes; this driver-side mirror exists so each node's own
/// SDK exposes a sensible ServiceLevel without round-tripping back through the admin
/// singleton. Nothing is sent when no local node id is configured or the snapshot has no
/// entry for it.
/// </remarks>
/// <param name="msg">The redundancy snapshot.</param>
private void HandleRedundancyStateChanged(RedundancyStateChanged msg)
{
    if (_localNode is null)
    {
        return;
    }

    var localEntry = msg.Nodes.FirstOrDefault(candidate => candidate.NodeId == _localNode.Value);
    if (localEntry is null)
    {
        return;
    }

    byte level;
    switch (localEntry.Role)
    {
        case RedundancyRole.Primary:
            level = localEntry.IsRoleLeaderForDriver ? (byte)240 : (byte)200;
            break;
        case RedundancyRole.Secondary:
            level = 100;
            break;
        default:
            // Detached and any unrecognised role both report 0.
            level = 0;
            break;
    }

    Self.Tell(new ServiceLevelChanged(level));
}
}