Compare commits
4 Commits
9892ceae9a
...
c02f016f1d
| Author | SHA1 | Date | |
|---|---|---|---|
| c02f016f1d | |||
| a1325299ce | |||
| 14fb2b05ed | |||
| da141497f8 |
@@ -81,20 +81,20 @@
|
||||
{"id": "F4", "subject": "Follow-up: Harden AuditWriterActor.WrapDetails JSON synthesis with System.Text.Json", "status": "completed", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F3"], "blockedBy": [], "commit": "f57f61d", "deviation": "Moot — F3 deleted WrapDetails entirely (EventId/CorrelationId now live in dedicated columns).", "origin": "Self-review of Task 33 — WrapDetails uses string concat; malformed caller DetailsJson would produce invalid JSON and trip the CK_ConfigAuditLog_DetailsJson_IsJson constraint, killing the entire flush batch. Discard this task if F3 lands first (F3 removes WrapDetails entirely)."},
|
||||
{"id": "F5", "subject": "Follow-up: ConfigPublishCoordinator multi-node happy-path test", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "5cfbe8b", "deviation": "Delivered by Task 59 — DeployHappyPathTests.StartDeployment_seals_after_both_nodes_apply exercises the exact 'dispatch to N driver nodes, all ack, seals' flow via the real 2-node TwoNodeClusterHarness rather than a multi-system TestKit. Cleaner because it tests the production code path end-to-end.", "origin": "Self-review of Task 30 — single-ActorSystem TestKit can't simulate the plan's 'dispatch to N driver nodes, all ack, seals' happy path because DiscoverDriverNodes() needs real cluster membership. Add a multi-system test (two ActorSystems joined into one cluster, driver-role on the second)."},
|
||||
{"id": "F6", "subject": "Follow-up: RedundancyStateActor publisher abstraction so tests don't need DPS bootstrap", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": [], "blockedBy": [], "commit": "dfc143c", "origin": "Self-review of Task 35 — RedundancyStateActorTests are skipped because single-node DistributedPubSub bootstrap is unreliable in TestKit. Inject an Action<object> broadcast so tests can replace it with a probe; un-skip both tests."},
|
||||
{"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "pending", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands."},
|
||||
{"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers."},
|
||||
{"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted."},
|
||||
{"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "pending", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction."},
|
||||
{"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "partial", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands.", "shipped": "Spawn lifecycle in DriverHostActor: artifact parsing, DriverSpawnPlanner pure-diff (spawn/delta/stop), IDriverFactory abstraction in Core.Abstractions with NullDriverFactory + DriverFactoryRegistryAdapter, ApplyDelta forwarded to children. Subscription publishing + write path still stubbed — split into F7-sub (subscribe + write)."},
|
||||
{"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "IVirtualTagEvaluator seam in Commons.Engines + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes ScriptLogEntry Warning to script-logs DPS topic on evaluator failure. Production binding to Core.VirtualTags.VirtualTagEngine still TODO (compile + ITagUpstreamSource subscribe) — split as F8b."},
|
||||
{"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "IScriptedAlarmEvaluator seam in Commons.Engines + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), calls evaluator on DependencyValueChanged, emits AlarmTransitionEvent on alerts DPS topic + ScriptLogEntry on script-logs at every transition (Activated/Acknowledged/Cleared with user attribution). Predicate binding to Core.ScriptedAlarms + ScriptedAlarmState DB persistence still TODO — split as F9b."},
|
||||
{"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "partial", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction.", "shipped": "IOpcUaAddressSpaceSink + IServiceLevelPublisher seams in Commons.OpcUa with Null* defaults. OpcUaPublishActor routes AttributeValueUpdate/AlarmStateUpdate/RebuildAddressSpace to the sink, dedupes ServiceLevelChanged, subscribes to redundancy-state DPS topic, and maps per-local-node redundancy snapshot to a coarse ServiceLevel (Primary+leader=240, Primary=200, Secondary=100, Detached=0). Production binding to a real SDK NodeManager + Variable nodes still TODO — split as F10b. Task 60 still blocked on F10b."},
|
||||
{"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "6861381", "deviationNotes": "Reshaped HistorianAdapterActor around the existing IAlarmHistorianSink abstraction (alarm-event shape, not the original tag-history-row stub). Defaults to NullAlarmHistorianSink; production deployments wire SqliteStoreAndForwardSink + WonderwareHistorianClient via AddOtOpcUaRuntime overrides. Actor now exposes GetStatus returning HistorianSinkStatus for diagnostics. Named-pipe transport implementation lives unchanged in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs — the actor is intentionally just a fire-and-forget bridge.", "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."},
|
||||
{"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "completed", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "b06e3ae", "deviation": "TCP-connect probe rather than full OPC UA Hello/Acknowledge handshake. Enough for the redundancy calc; deeper liveness signals can layer on later without changing the actor's contract.", "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."},
|
||||
{"id": "F13", "subject": "Follow-up: Full OpcUaApplicationHost extraction (security/alarms/history/observability)", "status": "partial", "classification": "high-risk", "estMinutes": 120, "parallelizableWith": [], "blockedBy": [], "commit": "36c4751-partial", "deviationNotes": "F13a (cert auto-creation) shipped in 36c4751. Remaining: endpoint-security wiring (SecurityProfileResolver into ServerConfiguration.SecurityPolicies), LDAP user-token validator (the OPC UA UserNameToken path; HTTP-layer LDAP auth is separate and already in OtOpcUa.Security), scripted-alarm node manager creation, history backend wiring, observability hooks (OpenTelemetry metrics + traces). These are gated by F10's OpcUaPublishActor SDK integration — until F10 lands, nothing instantiates OpcUaApplicationHost so the missing wiring is dead weight.", "origin": "Self-review of Task 46 — facade only boots ApplicationInstance + StandardServer. Legacy 391-line file pulls Server.Security/Alarms/History/Observability. Pull those into thin OpcUaServer interfaces."},
|
||||
{"id": "F14", "subject": "Follow-up: Migrate side-effecting Phase7Composer (EquipmentNodeWalker, trace logs, node cache)", "status": "pending", "classification": "standard", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 47 — pure version covers the projection. Walker + alarm sink registration + cache mutation stay in legacy until split into Phase7Applier."},
|
||||
{"id": "F14", "subject": "Follow-up: Migrate side-effecting Phase7Composer (EquipmentNodeWalker, trace logs, node cache)", "status": "partial", "classification": "standard", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 47 — pure version covers the projection. Walker + alarm sink registration + cache mutation stay in legacy until split into Phase7Applier.", "shipped": "Phase7Plan + Phase7Planner.Compute (pure diff over EquipmentNodes/DriverInstancePlans/ScriptedAlarmPlans by stable id, with Added/Removed/Changed lists). Phase7Applier consumes plan + IOpcUaAddressSpaceSink: drives RebuildAddressSpace on Equipment/Alarm topology change, writes inactive AlarmState for removed nodes, catches + logs sink faults. Driver-only changes correctly skip the rebuild (DriverHostActor's spawn-plan in Runtime handles those). Walker integration with the real SDK NodeManager is the remaining piece — split as F14b (consumes the existing EquipmentNodeWalker once F10b lands an SDK builder)."},
|
||||
{"id": "F15", "subject": "Follow-up: Migrate 47 legacy Admin Blazor components into AdminUI library", "status": "completed", "classification": "high-risk", "estMinutes": 180, "commit": "Phase A-D (read views) + F15.2 batches 1-4 (live-edit CRUD) + F15.3 (live alerts/script-log/CSV import/Monaco)", "deviationNotes": "All 4 phases of read-only views shipped: Phase A (shell/auth/fleet/hosts), B (cluster CRUD + Overview/Redundancy), C (Equipment/UNS/Namespaces/Drivers/Tags/ACLs), D (Audit/VirtualTags/ScriptedAlarms/Scripts/RoleGrants/Certificates/Reservations/AlarmsHistorian). Per Q1–Q5 of docs/v2/AdminUI-rebuild-plan.md: typed driver editors deferred, top-level VirtualTags/ScriptedAlarms kept (Q2 reversed for cross-cluster discoverability), routes-not-tabs adopted, fleet-wide LDAP→role map only, generic login errors. Live-edit forms (F15.2) and ScriptLog page (depends on F16 ScriptLogHub) are explicit follow-ups.", "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 48 — only MapAdminUI scaffold + 1 new page (Deployments). 47 pages stay in legacy Admin (accepted-broken) until Task 56."},
|
||||
{"id": "F16", "subject": "Follow-up: Bridge FleetStatusBroadcaster → SignalR hubs (FleetStatusHub / AlertHub / ScriptLogHub)", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "f18c285", "deviation": "FleetStatusHub bridge landed. AlertHub + ScriptLogHub deferred — they need upstream message contracts that aren't defined yet (alerts emerge from F9 ScriptedAlarmActor, script logs from F8 VirtualTagActor).", "origin": "Self-review of Task 49 — hubs are passive Hub subclasses; the bridge from FleetStatusBroadcaster.broadcast → IHubContext is not wired."},
|
||||
{"id": "F17", "subject": "Follow-up: FleetDiagnosticsClient real Akka ActorSelection round-trip (GetDiagnosticsRequest)", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "8f32b89", "origin": "Self-review of Task 51 — client returns an empty snapshot stub. Add GetDiagnosticsRequest contract + DriverHostActor handler + real Ask/Reply."},
|
||||
{"id": "F18", "subject": "Follow-up: Thread HttpContext.User.Identity.Name into Deployments page (createdBy)", "status": "completed", "classification": "small", "estMinutes": 5, "parallelizableWith": [], "blockedBy": [], "commit": "b266f63", "origin": "Self-review of Task 52 — Deployments.razor hardcodes createdBy=\"(current user)\"; needs @inject AuthenticationStateProvider."},
|
||||
{"id": "F19", "subject": "Follow-up: RuntimeStartup extension for driver-role node-actor spawn", "status": "completed", "classification": "standard", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "09d6676", "origin": "Self-review of Task 53 — only admin-role singletons are wired via WithOtOpcUaControlPlaneSingletons. Driver-role nodes need a parallel WithOtOpcUaRuntimeActors that spawns DriverHostActor."},
|
||||
{"id": "F20", "subject": "Follow-up: Wire DriverInstanceActor.ShouldStub() into DriverHostActor child spawn", "status": "pending", "classification": "small", "estMinutes": 10, "parallelizableWith": ["F7"], "blockedBy": [], "origin": "Self-review of Task 55 — ShouldStub helper exists but nothing calls it. Folds into F7 when DriverHostActor learns to spawn DriverInstanceActor children."},
|
||||
{"id": "F20", "subject": "Follow-up: Wire DriverInstanceActor.ShouldStub() into DriverHostActor child spawn", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": ["F7"], "blockedBy": [], "origin": "Self-review of Task 55 — ShouldStub helper exists but nothing calls it. Folds into F7 when DriverHostActor learns to spawn DriverInstanceActor children.", "shipped": "DriverHostActor.SpawnChild now calls DriverInstanceActor.ShouldStub(type, _localRoles) and routes Windows-only driver types to the stub path on non-Windows / dev-role hosts. Verified by DriverHostActorReconcileTests.Galaxy_on_non_windows_is_stubbed_by_ShouldStub_check."},
|
||||
{"id": "F21", "subject": "Follow-up: docker-compose.yml for Host.IntegrationTests (real SQL Server + OpenLDAP)", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "b0a2bb0", "deviationNotes": "Stack shipped (SQL on 14331, OpenLDAP on 3894). HarnessMode reads OTOPCUA_HARNESS_USE_SQL=1 / USE_LDAP=1 from env; SQL mode uses per-harness unique DB via EnsureCreated. Compose itself not local-validated — DESKTOP-6JL3KKO has no Docker per CLAUDE.md; CI on Linux will exercise the real path. The xunit test-trait split was punted — env vars are simpler and cover the same use case (one suite, two modes, no test-class duplication).", "origin": "Deviation from Task 58 — TwoNodeClusterHarness uses EF InMemoryDatabase + StubLdapAuthService. For Mac-friendly local runs against real SQL constraints + LDAP, add tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/docker-compose.yml (SQL Server + OpenLDAP), wire EF SqlServer provider behind an env var (OTOPCUA_HARNESS_USE_SQL=1), and add a test trait so CI can run both modes."},
|
||||
{"id": "F22", "subject": "Follow-up: failover scenario integration tests (kill-mid-apply, split-brain, restart-during-deploy)", "status": "completed", "classification": "standard", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [], "commit": "cd5540c", "deviationNotes": "Shipped 3 scenarios on the existing 2-node harness: stop-shrinks, restart-rejoins-same-port, deploy-with-one-node-down. Split-brain via simulated partition deferred — Akka.Hosting + xunit don't expose transport-level interference cleanly. The graceful-shutdown + rejoin path is what production actually exercises; ungraceful kill-mid-apply non-deterministic under SBR's 15s stable-after.", "origin": "Deviation from Task 59 — happy-path + idempotency landed but design §8 cases 3-7 need controlled node-down primitives on TwoNodeClusterHarness (StopNodeAsync, RestartNodeAsync, PartitionBetweenAsync). Add those + 5 scenario tests."}
|
||||
]
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
|
||||
/// <summary>
|
||||
/// Abstraction over the scripted-alarm predicate engine. Production binds this to a
|
||||
/// wrapper around <c>ScriptedAlarmEngine</c> from <c>Core.ScriptedAlarms</c>; default
|
||||
/// binding is <see cref="NullScriptedAlarmEvaluator"/> which keeps the alarm in its
|
||||
/// current state (so an unconfigured node never spuriously alarms).
|
||||
/// </summary>
|
||||
public interface IScriptedAlarmEvaluator
|
||||
{
|
||||
ScriptedAlarmEvalResult Evaluate(string alarmId, string predicate, IReadOnlyDictionary<string, object?> dependencies);
|
||||
}
|
||||
|
||||
/// <summary>Result of one alarm-predicate evaluation. <c>Active</c> is only meaningful when
|
||||
/// <c>Success</c> is true; on failure the caller should keep the prior state and log Reason.</summary>
|
||||
public sealed record ScriptedAlarmEvalResult(bool Success, bool Active, string? Reason)
|
||||
{
|
||||
public static ScriptedAlarmEvalResult Ok(bool active) => new(true, active, null);
|
||||
public static ScriptedAlarmEvalResult Failure(string reason) => new(false, false, reason);
|
||||
}
|
||||
|
||||
/// <summary>Default that always returns <c>Active = false, Success = true</c>. Safe no-op:
|
||||
/// no alarm fires when no real engine is bound.</summary>
|
||||
public sealed class NullScriptedAlarmEvaluator : IScriptedAlarmEvaluator
|
||||
{
|
||||
public static readonly NullScriptedAlarmEvaluator Instance = new();
|
||||
private NullScriptedAlarmEvaluator() { }
|
||||
public ScriptedAlarmEvalResult Evaluate(string alarmId, string predicate, IReadOnlyDictionary<string, object?> dependencies)
|
||||
=> ScriptedAlarmEvalResult.Ok(active: false);
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
|
||||
/// <summary>
|
||||
/// Abstraction over the compiled virtual-tag expression engine. Runtime consumes this so
|
||||
/// <see cref="VirtualTagActor"/> can stay free of Roslyn / scripting machinery and the
|
||||
/// production wiring binds an adapter over <c>VirtualTagEngine</c> from
|
||||
/// <c>Core.VirtualTags</c>.
|
||||
/// </summary>
|
||||
public interface IVirtualTagEvaluator
|
||||
{
|
||||
/// <summary>
|
||||
/// Evaluate <paramref name="expression"/> against the snapshot in
|
||||
/// <paramref name="dependencies"/>. Implementations must not throw — script failures
|
||||
/// are reported via <see cref="VirtualTagEvalResult.Failure"/>.
|
||||
/// </summary>
|
||||
VirtualTagEvalResult Evaluate(string virtualTagId, string expression, IReadOnlyDictionary<string, object?> dependencies);
|
||||
}
|
||||
|
||||
/// <summary>Result of one virtual-tag expression eval. Stash a Reason on every Failure so
|
||||
/// callers can emit a useful <c>ScriptLogEntry</c> to operators.</summary>
|
||||
public sealed record VirtualTagEvalResult(bool Success, object? Value, string? Reason)
|
||||
{
|
||||
public static readonly VirtualTagEvalResult NoChange = new(true, null, "no-change");
|
||||
public static VirtualTagEvalResult Ok(object? value) => new(true, value, null);
|
||||
public static VirtualTagEvalResult Failure(string reason) => new(false, null, reason);
|
||||
}
|
||||
|
||||
/// <summary>Returns <see cref="VirtualTagEvalResult.NoChange"/> from every call. Bound by default
|
||||
/// when the production <c>VirtualTagEngine</c> adapter hasn't been registered (Mac dev, tests).</summary>
|
||||
public sealed class NullVirtualTagEvaluator : IVirtualTagEvaluator
|
||||
{
|
||||
public static readonly NullVirtualTagEvaluator Instance = new();
|
||||
private NullVirtualTagEvaluator() { }
|
||||
public VirtualTagEvalResult Evaluate(string virtualTagId, string expression, IReadOnlyDictionary<string, object?> dependencies)
|
||||
=> VirtualTagEvalResult.NoChange;
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
|
||||
/// <summary>
|
||||
/// Abstraction over the OPC UA SDK's address space. <c>OpcUaPublishActor</c> consumes this
|
||||
/// so the Runtime project doesn't reference <c>Opc.Ua.Server</c> directly — production
|
||||
/// binds a real SDK-backed sink in the fused Host's wiring, dev/Mac binds the
|
||||
/// <see cref="NullOpcUaAddressSpaceSink"/> no-op.
|
||||
/// </summary>
|
||||
public interface IOpcUaAddressSpaceSink
|
||||
{
|
||||
/// <summary>Write a Variable node's current value + quality + source timestamp.</summary>
|
||||
void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc);
|
||||
|
||||
/// <summary>Write an alarm-condition Variable's active/acknowledged state.</summary>
|
||||
void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc);
|
||||
|
||||
/// <summary>
|
||||
/// Tear down + repopulate the address space. Called by <c>OpcUaPublishActor</c> after a
|
||||
/// successful deployment apply so the node manager reflects the new config. Idempotent.
|
||||
/// </summary>
|
||||
void RebuildAddressSpace();
|
||||
}
|
||||
|
||||
/// <summary>OPC UA status code projection — Good / Uncertain / Bad. Real SDK has finer-grained
|
||||
/// codes; the engine actors only need this 3-state classification.</summary>
|
||||
public enum OpcUaQuality { Good, Uncertain, Bad }
|
||||
|
||||
/// <summary>No-op sink. Bound by default so the actors are safe to run in dev / Mac /
|
||||
/// integration tests without a real SDK behind them.</summary>
|
||||
public sealed class NullOpcUaAddressSpaceSink : IOpcUaAddressSpaceSink
|
||||
{
|
||||
public static readonly NullOpcUaAddressSpaceSink Instance = new();
|
||||
private NullOpcUaAddressSpaceSink() { }
|
||||
public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) { }
|
||||
public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc) { }
|
||||
public void RebuildAddressSpace() { }
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
|
||||
/// <summary>
|
||||
/// Writes the OPC UA Server object's <c>ServiceLevel</c> Variable (0–255). Production binds
|
||||
/// a sink that pokes the SDK's ServiceLevel node; tests + dev mode bind
|
||||
/// <see cref="NullServiceLevelPublisher"/> which just records the most recently set level
|
||||
/// for inspection.
|
||||
/// </summary>
|
||||
public interface IServiceLevelPublisher
|
||||
{
|
||||
void Publish(byte serviceLevel);
|
||||
}
|
||||
|
||||
/// <summary>No-op default that retains the last-written ServiceLevel in
|
||||
/// <see cref="LastPublished"/>. Used by dev mode + verified by tests.</summary>
|
||||
public sealed class NullServiceLevelPublisher : IServiceLevelPublisher
|
||||
{
|
||||
public static readonly NullServiceLevelPublisher Instance = new();
|
||||
private NullServiceLevelPublisher() { }
|
||||
public byte LastPublished { get; private set; }
|
||||
public void Publish(byte serviceLevel) => LastPublished = serviceLevel;
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// Abstraction over the process-wide driver registry. Runtime consumes this instead of
|
||||
/// <c>DriverFactoryRegistry</c> directly so the Runtime project doesn't pull in
|
||||
/// <c>ZB.MOM.WW.OtOpcUa.Core</c> (which would drag in Polly + driver hosting). The fused
|
||||
/// Host binds a <c>DriverFactoryRegistryAdapter</c> after every <c>Driver.*.Register()</c>
|
||||
/// extension has run.
|
||||
/// </summary>
|
||||
public interface IDriverFactory
|
||||
{
|
||||
/// <summary>
|
||||
/// Return a new <see cref="IDriver"/> for the given <paramref name="driverType"/>, or
|
||||
/// <c>null</c> when no factory is registered for that type (missing assembly, typo, etc.).
|
||||
/// The DriverHostActor logs + skips the row rather than failing the whole apply.
|
||||
/// </summary>
|
||||
IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson);
|
||||
|
||||
/// <summary>Driver-type names this factory can materialise. Mostly for diagnostics + logs.</summary>
|
||||
IReadOnlyCollection<string> SupportedTypes { get; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns <c>null</c> from every <see cref="IDriverFactory.TryCreate"/> call. Bound when the
|
||||
/// fused Host hasn't registered any concrete driver assemblies yet (Mac dev path, smoke
|
||||
/// tests). DriverHostActor sees zero supported types and treats the deployment as a no-op.
|
||||
/// </summary>
|
||||
public sealed class NullDriverFactory : IDriverFactory
|
||||
{
|
||||
public static readonly NullDriverFactory Instance = new();
|
||||
private NullDriverFactory() { }
|
||||
|
||||
public IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson) => null;
|
||||
public IReadOnlyCollection<string> SupportedTypes { get; } = Array.Empty<string>();
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Hosting;
|
||||
|
||||
/// <summary>
|
||||
/// Adapts the existing <see cref="DriverFactoryRegistry"/> (v1 surface, still the
|
||||
/// concrete singleton every driver assembly registers itself against) to the v2
|
||||
/// <see cref="IDriverFactory"/> abstraction consumed by Runtime. The fused Host binds
|
||||
/// this in DI once each <c>Driver.*.Register(registry)</c> call has completed.
|
||||
/// </summary>
|
||||
public sealed class DriverFactoryRegistryAdapter : IDriverFactory
|
||||
{
|
||||
private readonly DriverFactoryRegistry _registry;
|
||||
|
||||
public DriverFactoryRegistryAdapter(DriverFactoryRegistry registry)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(registry);
|
||||
_registry = registry;
|
||||
}
|
||||
|
||||
public IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson)
|
||||
{
|
||||
var factory = _registry.TryGet(driverType);
|
||||
return factory?.Invoke(driverInstanceId, driverConfigJson);
|
||||
}
|
||||
|
||||
public IReadOnlyCollection<string> SupportedTypes => _registry.RegisteredTypes;
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
|
||||
/// <summary>
|
||||
/// Side-effecting orchestrator over <see cref="Phase7Plan"/>. Drives an
|
||||
/// <see cref="IOpcUaAddressSpaceSink"/> to materialise the diff between two
|
||||
/// <see cref="Phase7CompositionResult"/> snapshots:
|
||||
///
|
||||
/// <list type="bullet">
|
||||
/// <item>RemovedEquipment / RemovedAlarms — write Bad-quality on every removed
|
||||
/// node id then call <c>RebuildAddressSpace</c> at the end so the sink can
|
||||
/// actually tear down the OPC UA folders + variables.</item>
|
||||
/// <item>AddedEquipment / AddedAlarms — same Rebuild trigger (real SDK NodeManager
|
||||
/// will repopulate from the persisted artifact). For now we record the work.</item>
|
||||
/// <item>ChangedEquipment / ChangedAlarms — record what changed; the SDK adapter
|
||||
/// that lands in F10b will decide between in-place property writes and
|
||||
/// tear-down + rebuild.</item>
|
||||
/// </list>
|
||||
///
|
||||
/// This is the side-effecting layer Task 47 deferred to F14. It stays pure-of-SDK so
|
||||
/// production binds a real SDK sink, dev/Mac binds <see cref="NullOpcUaAddressSpaceSink"/>,
|
||||
/// and tests can capture every call.
|
||||
/// </summary>
|
||||
public sealed class Phase7Applier
|
||||
{
|
||||
private readonly IOpcUaAddressSpaceSink _sink;
|
||||
private readonly ILogger<Phase7Applier> _logger;
|
||||
|
||||
public Phase7Applier(IOpcUaAddressSpaceSink sink, ILogger<Phase7Applier> logger)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(sink);
|
||||
ArgumentNullException.ThrowIfNull(logger);
|
||||
_sink = sink;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Apply <paramref name="plan"/> to the sink. Returns a summary of what was applied so
|
||||
/// callers (OpcUaPublishActor) can correlate the work back to the originating deployment.
|
||||
/// </summary>
|
||||
public Phase7ApplyOutcome Apply(Phase7Plan plan)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(plan);
|
||||
|
||||
if (plan.IsEmpty)
|
||||
{
|
||||
_logger.LogDebug("Phase7Applier: plan is empty; skipping sink writes");
|
||||
return new Phase7ApplyOutcome(RemovedNodes: 0, AddedNodes: 0, ChangedNodes: 0, RebuildCalled: false);
|
||||
}
|
||||
|
||||
var ts = DateTime.UtcNow;
|
||||
var removedCount = 0;
|
||||
foreach (var eq in plan.RemovedEquipment)
|
||||
{
|
||||
SafeWriteAlarmState(eq.EquipmentId, active: false, acknowledged: false, ts);
|
||||
removedCount++;
|
||||
}
|
||||
foreach (var alarm in plan.RemovedAlarms)
|
||||
{
|
||||
SafeWriteAlarmState(alarm.ScriptedAlarmId, active: false, acknowledged: false, ts);
|
||||
removedCount++;
|
||||
}
|
||||
|
||||
var changedCount =
|
||||
plan.ChangedEquipment.Count + plan.ChangedDrivers.Count + plan.ChangedAlarms.Count;
|
||||
var addedCount =
|
||||
plan.AddedEquipment.Count + plan.AddedDrivers.Count + plan.AddedAlarms.Count;
|
||||
|
||||
// Any add/remove of Equipment or ScriptedAlarm requires a real address-space rebuild.
|
||||
// Driver-instance changes don't touch the address-space topology directly — they go
|
||||
// through DriverHostActor's spawn-plan in Runtime.
|
||||
var needsRebuild =
|
||||
plan.AddedEquipment.Count > 0 || plan.RemovedEquipment.Count > 0 ||
|
||||
plan.AddedAlarms.Count > 0 || plan.RemovedAlarms.Count > 0;
|
||||
|
||||
if (needsRebuild)
|
||||
{
|
||||
try
|
||||
{
|
||||
_sink.RebuildAddressSpace();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Phase7Applier: sink.RebuildAddressSpace threw");
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogInformation(
|
||||
"Phase7Applier: applied plan (added={Added}, removed={Removed}, changed={Changed}, rebuild={Rebuild})",
|
||||
addedCount, removedCount, changedCount, needsRebuild);
|
||||
|
||||
return new Phase7ApplyOutcome(removedCount, addedCount, changedCount, needsRebuild);
|
||||
}
|
||||
|
||||
private void SafeWriteAlarmState(string nodeId, bool active, bool acknowledged, DateTime ts)
|
||||
{
|
||||
try { _sink.WriteAlarmState(nodeId, active, acknowledged, ts); }
|
||||
catch (Exception ex) { _logger.LogWarning(ex, "Phase7Applier: WriteAlarmState threw for {Node}", nodeId); }
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Summary of one apply pass. Useful for tests + audit-log entries on the deploy path.</summary>
|
||||
public sealed record Phase7ApplyOutcome(
|
||||
int RemovedNodes,
|
||||
int AddedNodes,
|
||||
int ChangedNodes,
|
||||
bool RebuildCalled);
|
||||
@@ -0,0 +1,98 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
|
||||
/// <summary>
|
||||
/// Pure diff between two <see cref="Phase7CompositionResult"/> snapshots — the
|
||||
/// <c>previous</c> currently-applied composition and the <c>next</c> from a freshly-applied
|
||||
/// deployment. Three lists per entity class (Equipment / DriverInstance / ScriptedAlarm)
|
||||
/// captured by stable identity: added items are new, removed items have to be torn down,
|
||||
/// changed items have the same identity but at least one field differs.
|
||||
///
|
||||
/// OpcUaPublishActor's <c>RebuildAddressSpace</c> consumes this against a real
|
||||
/// <see cref="Commons.OpcUa.IOpcUaAddressSpaceSink"/> binding so re-applies only mutate the
|
||||
/// nodes that actually changed — full tear-down + rebuild is reserved for first-boot or
|
||||
/// drastic schema flips.
|
||||
/// </summary>
|
||||
public sealed record Phase7Plan(
|
||||
IReadOnlyList<EquipmentNode> AddedEquipment,
|
||||
IReadOnlyList<EquipmentNode> RemovedEquipment,
|
||||
IReadOnlyList<Phase7Plan.EquipmentDelta> ChangedEquipment,
|
||||
IReadOnlyList<DriverInstancePlan> AddedDrivers,
|
||||
IReadOnlyList<DriverInstancePlan> RemovedDrivers,
|
||||
IReadOnlyList<Phase7Plan.DriverDelta> ChangedDrivers,
|
||||
IReadOnlyList<ScriptedAlarmPlan> AddedAlarms,
|
||||
IReadOnlyList<ScriptedAlarmPlan> RemovedAlarms,
|
||||
IReadOnlyList<Phase7Plan.AlarmDelta> ChangedAlarms)
|
||||
{
|
||||
public bool IsEmpty =>
|
||||
AddedEquipment.Count == 0 && RemovedEquipment.Count == 0 && ChangedEquipment.Count == 0 &&
|
||||
AddedDrivers.Count == 0 && RemovedDrivers.Count == 0 && ChangedDrivers.Count == 0 &&
|
||||
AddedAlarms.Count == 0 && RemovedAlarms.Count == 0 && ChangedAlarms.Count == 0;
|
||||
|
||||
public sealed record EquipmentDelta(EquipmentNode Previous, EquipmentNode Current);
|
||||
public sealed record DriverDelta(DriverInstancePlan Previous, DriverInstancePlan Current);
|
||||
public sealed record AlarmDelta(ScriptedAlarmPlan Previous, ScriptedAlarmPlan Current);
|
||||
}
|
||||
|
||||
public static class Phase7Planner
|
||||
{
|
||||
/// <summary>
|
||||
/// Diff two compositions, emitting Added/Removed/Changed sets per entity class.
|
||||
/// Identity is the entity's stable id (EquipmentId, DriverInstanceId, ScriptedAlarmId).
|
||||
/// Element equality on the projection records doubles as the "did this change" check,
|
||||
/// so any field difference moves an item from "stable" to ChangedX.
|
||||
/// </summary>
|
||||
public static Phase7Plan Compute(Phase7CompositionResult previous, Phase7CompositionResult next)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(previous);
|
||||
ArgumentNullException.ThrowIfNull(next);
|
||||
|
||||
var (addedEq, removedEq, changedEq) = DiffById(
|
||||
previous.EquipmentNodes, next.EquipmentNodes,
|
||||
n => n.EquipmentId,
|
||||
(a, b) => new Phase7Plan.EquipmentDelta(a, b));
|
||||
|
||||
var (addedDrv, removedDrv, changedDrv) = DiffById(
|
||||
previous.DriverInstancePlans, next.DriverInstancePlans,
|
||||
d => d.DriverInstanceId,
|
||||
(a, b) => new Phase7Plan.DriverDelta(a, b));
|
||||
|
||||
var (addedAlarm, removedAlarm, changedAlarm) = DiffById(
|
||||
previous.ScriptedAlarmPlans, next.ScriptedAlarmPlans,
|
||||
a => a.ScriptedAlarmId,
|
||||
(a, b) => new Phase7Plan.AlarmDelta(a, b));
|
||||
|
||||
return new Phase7Plan(
|
||||
addedEq, removedEq, changedEq,
|
||||
addedDrv, removedDrv, changedDrv,
|
||||
addedAlarm, removedAlarm, changedAlarm);
|
||||
}
|
||||
|
||||
private static (IReadOnlyList<T> Added, IReadOnlyList<T> Removed, IReadOnlyList<TDelta> Changed)
|
||||
DiffById<T, TDelta>(
|
||||
IReadOnlyList<T> previous,
|
||||
IReadOnlyList<T> next,
|
||||
Func<T, string> identity,
|
||||
Func<T, T, TDelta> deltaFactory) where T : class
|
||||
{
|
||||
var prevById = previous.ToDictionary(identity, StringComparer.Ordinal);
|
||||
var nextById = next.ToDictionary(identity, StringComparer.Ordinal);
|
||||
|
||||
var added = new List<T>();
|
||||
var removed = new List<T>();
|
||||
var changed = new List<TDelta>();
|
||||
|
||||
foreach (var (id, p) in prevById)
|
||||
{
|
||||
if (!nextById.TryGetValue(id, out var n)) { removed.Add(p); continue; }
|
||||
if (!EqualityComparer<T>.Default.Equals(p, n)) changed.Add(deltaFactory(p, n));
|
||||
}
|
||||
foreach (var (id, n) in nextById)
|
||||
{
|
||||
if (!prevById.ContainsKey(id)) added.Add(n);
|
||||
}
|
||||
|
||||
added.Sort((a, b) => string.CompareOrdinal(identity(a), identity(b)));
|
||||
removed.Sort((a, b) => string.CompareOrdinal(identity(a), identity(b)));
|
||||
return (added, removed, changed);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
using System.Text.Json;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
|
||||
/// <summary>
|
||||
/// Minimal driver-side view of the deployment artifact emitted by
|
||||
/// <c>ConfigComposer.SnapshotAndFlattenAsync</c>. The artifact JSON is the full snapshot —
|
||||
/// for driver spawning we only need the <c>DriverInstances</c> array. Reading just the
|
||||
/// subset keeps allocations cheap on every deploy.
|
||||
/// </summary>
|
||||
public sealed record DriverInstanceSpec(
|
||||
Guid DriverInstanceRowId,
|
||||
string DriverInstanceId,
|
||||
string Name,
|
||||
string DriverType,
|
||||
bool Enabled,
|
||||
string DriverConfig);
|
||||
|
||||
public static class DeploymentArtifact
|
||||
{
|
||||
private static readonly JsonSerializerOptions JsonOptions = new()
|
||||
{
|
||||
PropertyNameCaseInsensitive = true,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Parse a deployment artifact blob into the list of driver-instance specs to spawn.
|
||||
/// Empty / malformed blobs return an empty list — callers log + treat as "no drivers".
|
||||
/// </summary>
|
||||
public static IReadOnlyList<DriverInstanceSpec> ParseDriverInstances(ReadOnlySpan<byte> blob)
|
||||
{
|
||||
if (blob.IsEmpty) return Array.Empty<DriverInstanceSpec>();
|
||||
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(blob.ToArray());
|
||||
if (!doc.RootElement.TryGetProperty("DriverInstances", out var arr)
|
||||
|| arr.ValueKind != JsonValueKind.Array)
|
||||
{
|
||||
return Array.Empty<DriverInstanceSpec>();
|
||||
}
|
||||
|
||||
var result = new List<DriverInstanceSpec>(arr.GetArrayLength());
|
||||
foreach (var el in arr.EnumerateArray())
|
||||
{
|
||||
if (el.ValueKind != JsonValueKind.Object) continue;
|
||||
var spec = TryReadSpec(el);
|
||||
if (spec is not null) result.Add(spec);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
return Array.Empty<DriverInstanceSpec>();
|
||||
}
|
||||
}
|
||||
|
||||
private static DriverInstanceSpec? TryReadSpec(JsonElement el)
|
||||
{
|
||||
var rowId = el.TryGetProperty("DriverInstanceRowId", out var rowEl)
|
||||
&& rowEl.TryGetGuid(out var rid) ? rid : Guid.Empty;
|
||||
var id = el.TryGetProperty("DriverInstanceId", out var idEl) ? idEl.GetString() : null;
|
||||
var name = el.TryGetProperty("Name", out var nameEl) ? nameEl.GetString() : null;
|
||||
var type = el.TryGetProperty("DriverType", out var typeEl) ? typeEl.GetString() : null;
|
||||
var enabled = !el.TryGetProperty("Enabled", out var enEl) || enEl.GetBoolean();
|
||||
var config = el.TryGetProperty("DriverConfig", out var cfgEl) ? cfgEl.GetString() : null;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(id) || string.IsNullOrWhiteSpace(type)) return null;
|
||||
|
||||
return new DriverInstanceSpec(
|
||||
DriverInstanceRowId: rowId,
|
||||
DriverInstanceId: id!,
|
||||
Name: name ?? id!,
|
||||
DriverType: type!,
|
||||
Enabled: enabled,
|
||||
DriverConfig: config ?? "{}");
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
@@ -38,11 +39,17 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
||||
private readonly CommonsNodeId _localNode;
|
||||
private readonly IActorRef? _coordinatorOverride;
|
||||
private readonly IDriverFactory _driverFactory;
|
||||
private readonly IReadOnlySet<string> _localRoles;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
|
||||
private RevisionHash? _currentRevision;
|
||||
private DeploymentId? _applyingDeploymentId;
|
||||
|
||||
private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);
|
||||
|
||||
private sealed record ChildEntry(IActorRef Actor, string DriverType, string LastConfigJson, bool Stubbed);
|
||||
|
||||
public ITimerScheduler Timers { get; set; } = null!;
|
||||
|
||||
public sealed class RetryConfigDbConnection
|
||||
@@ -54,17 +61,23 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
public static Props Props(
|
||||
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
||||
CommonsNodeId localNode,
|
||||
IActorRef? coordinator = null) =>
|
||||
Akka.Actor.Props.Create(() => new DriverHostActor(dbFactory, localNode, coordinator));
|
||||
IActorRef? coordinator = null,
|
||||
IDriverFactory? driverFactory = null,
|
||||
IReadOnlySet<string>? localRoles = null) =>
|
||||
Akka.Actor.Props.Create(() => new DriverHostActor(dbFactory, localNode, coordinator, driverFactory, localRoles));
|
||||
|
||||
public DriverHostActor(
|
||||
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
||||
CommonsNodeId localNode,
|
||||
IActorRef? coordinator)
|
||||
IActorRef? coordinator,
|
||||
IDriverFactory? driverFactory = null,
|
||||
IReadOnlySet<string>? localRoles = null)
|
||||
{
|
||||
_dbFactory = dbFactory;
|
||||
_localNode = localNode;
|
||||
_coordinatorOverride = coordinator;
|
||||
_driverFactory = driverFactory ?? NullDriverFactory.Instance;
|
||||
_localRoles = localRoles ?? new HashSet<string>(StringComparer.Ordinal);
|
||||
|
||||
// Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
|
||||
Become(Steady);
|
||||
@@ -172,12 +185,19 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
|
||||
private void HandleGetDiagnostics(GetDiagnostics msg)
|
||||
{
|
||||
// Driver-instance children aren't spawned yet (F7); the snapshot reports an empty driver
|
||||
// list. CurrentRevision is real — it's what the host believes is its applied revision.
|
||||
var drivers = _children
|
||||
.Select(kv => new DriverInstanceDiagnostics(
|
||||
DriverInstanceId: Guid.Empty,
|
||||
Name: kv.Key,
|
||||
State: kv.Value.Stubbed ? "Stubbed" : "Spawned",
|
||||
ConnectedDevices: 0,
|
||||
FaultedDevices: 0,
|
||||
LastChangeUtc: DateTime.UtcNow))
|
||||
.ToArray();
|
||||
var snapshot = new NodeDiagnosticsSnapshot(
|
||||
NodeId: _localNode,
|
||||
CurrentRevision: _currentRevision,
|
||||
Drivers: Array.Empty<DriverInstanceDiagnostics>(),
|
||||
Drivers: drivers,
|
||||
AsOfUtc: DateTime.UtcNow);
|
||||
Sender.Tell(snapshot);
|
||||
}
|
||||
@@ -205,11 +225,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
|
||||
try
|
||||
{
|
||||
// Future: dispatch ApplyDelta to children, wait for acks. For Task 37/38, just no-op.
|
||||
ReconcileDrivers(deploymentId);
|
||||
_currentRevision = revision;
|
||||
UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
|
||||
SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
|
||||
_log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev})", _localNode, deploymentId, revision);
|
||||
_log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
|
||||
_localNode, deploymentId, revision, _children.Count);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -224,6 +245,126 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Read the deployment artifact + reconcile the set of running <see cref="DriverInstanceActor"/>
|
||||
/// children. Spawn missing, ApplyDelta on config change, stop removed/disabled drivers.
|
||||
/// When the artifact blob is empty (legacy ControlPlane tests, smoke fixtures) or the
|
||||
/// configured <see cref="IDriverFactory"/> can't materialise any of the requested
|
||||
/// types, this is effectively a no-op.
|
||||
/// </summary>
|
||||
private void ReconcileDrivers(DeploymentId deploymentId)
|
||||
{
|
||||
byte[] blob;
|
||||
try
|
||||
{
|
||||
using var db = _dbFactory.CreateDbContext();
|
||||
blob = db.Deployments.AsNoTracking()
|
||||
.Where(d => d.DeploymentId == deploymentId.Value)
|
||||
.Select(d => d.ArtifactBlob)
|
||||
.FirstOrDefault() ?? Array.Empty<byte>();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "DriverHost {Node}: failed to load artifact for {Id}; skipping reconcile",
|
||||
_localNode, deploymentId);
|
||||
return;
|
||||
}
|
||||
|
||||
var specs = DeploymentArtifact.ParseDriverInstances(blob);
|
||||
var snapshots = _children.ToDictionary(
|
||||
kv => kv.Key,
|
||||
kv => new DriverChildSnapshot(kv.Value.DriverType, kv.Value.LastConfigJson),
|
||||
StringComparer.Ordinal);
|
||||
var plan = DriverSpawnPlanner.Compute(snapshots, specs);
|
||||
|
||||
foreach (var id in plan.ToStop) StopChild(id);
|
||||
foreach (var spec in plan.ToApplyDelta) ApplyChildDelta(spec);
|
||||
foreach (var spec in plan.ToSpawn) SpawnChild(spec);
|
||||
}
|
||||
|
||||
private void SpawnChild(DriverInstanceSpec spec)
|
||||
{
|
||||
var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
|
||||
IDriver? driver = null;
|
||||
if (!stub)
|
||||
{
|
||||
try { driver = _driverFactory.TryCreate(spec.DriverType, spec.DriverInstanceId, spec.DriverConfig); }
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "DriverHost {Node}: factory for {Type} threw on {Id}; stubbing",
|
||||
_localNode, spec.DriverType, spec.DriverInstanceId);
|
||||
}
|
||||
if (driver is null)
|
||||
{
|
||||
_log.Warning(
|
||||
"DriverHost {Node}: no factory for driver type {Type} (instance {Id}); falling back to stub",
|
||||
_localNode, spec.DriverType, spec.DriverInstanceId);
|
||||
stub = true;
|
||||
}
|
||||
}
|
||||
|
||||
IActorRef child;
|
||||
if (stub)
|
||||
{
|
||||
child = Context.ActorOf(
|
||||
DriverInstanceActor.Props(new StubbedDriver(spec.DriverInstanceId, spec.DriverType),
|
||||
reconnectInterval: null, startStubbed: true),
|
||||
ActorNameFor(spec.DriverInstanceId));
|
||||
}
|
||||
else
|
||||
{
|
||||
child = Context.ActorOf(
|
||||
DriverInstanceActor.Props(driver!),
|
||||
ActorNameFor(spec.DriverInstanceId));
|
||||
child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
|
||||
}
|
||||
|
||||
_children[spec.DriverInstanceId] = new ChildEntry(child, spec.DriverType, spec.DriverConfig, stub);
|
||||
_log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
|
||||
_localNode, spec.DriverType, spec.DriverInstanceId, stub);
|
||||
}
|
||||
|
||||
private void ApplyChildDelta(DriverInstanceSpec spec)
|
||||
{
|
||||
if (!_children.TryGetValue(spec.DriverInstanceId, out var entry)) return;
|
||||
entry.Actor.Tell(new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId()));
|
||||
_children[spec.DriverInstanceId] = entry with { LastConfigJson = spec.DriverConfig };
|
||||
_log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, spec.DriverInstanceId);
|
||||
}
|
||||
|
||||
private void StopChild(string driverInstanceId)
|
||||
{
|
||||
if (!_children.TryGetValue(driverInstanceId, out var entry)) return;
|
||||
Context.Stop(entry.Actor);
|
||||
_children.Remove(driverInstanceId);
|
||||
_log.Info("DriverHost {Node}: stopped driver child {Id}", _localNode, driverInstanceId);
|
||||
}
|
||||
|
||||
private static string ActorNameFor(string driverInstanceId)
|
||||
{
|
||||
// Akka actor names cannot contain '/', ':', or whitespace. Mangle defensively.
|
||||
var chars = driverInstanceId.Select(c => char.IsLetterOrDigit(c) || c is '-' or '_' or '.' ? c : '_').ToArray();
|
||||
return "drv-" + new string(chars);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Minimal placeholder driver used when no factory is registered for a driver type or when
|
||||
/// <see cref="DriverInstanceActor.ShouldStub"/> returns true. <see cref="DriverInstanceActor"/>
|
||||
/// is started with <c>startStubbed:true</c> so the driver methods on this object never run.
|
||||
/// </summary>
|
||||
private sealed class StubbedDriver : IDriver
|
||||
{
|
||||
public string DriverInstanceId { get; }
|
||||
public string DriverType { get; }
|
||||
public StubbedDriver(string id, string type) { DriverInstanceId = id; DriverType = type; }
|
||||
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, LastError: null);
|
||||
public long GetMemoryFootprint() => 0;
|
||||
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
}
|
||||
|
||||
private void TryRecoverFromStale()
|
||||
{
|
||||
try
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
|
||||
/// <summary>
|
||||
/// Pure diff between the currently-running driver children (keyed by
|
||||
/// <c>DriverInstance.DriverInstanceId</c>) and the target spec list from a freshly-applied
|
||||
/// deployment artifact. The DriverHostActor consumes the three lists and calls
|
||||
/// spawn / ApplyDelta / stop on its child actors accordingly.
|
||||
/// </summary>
|
||||
/// <param name="ToSpawn">Specs with no current child — create a new actor.</param>
|
||||
/// <param name="ToApplyDelta">Specs whose child exists but config JSON or type differs.</param>
|
||||
/// <param name="ToStop">DriverInstanceIds currently running but missing from the new artifact, or now disabled.</param>
|
||||
public sealed record DriverSpawnPlan(
|
||||
IReadOnlyList<DriverInstanceSpec> ToSpawn,
|
||||
IReadOnlyList<DriverInstanceSpec> ToApplyDelta,
|
||||
IReadOnlyList<string> ToStop);
|
||||
|
||||
public static class DriverSpawnPlanner
|
||||
{
|
||||
/// <summary>
|
||||
/// Compute the spawn/delta/stop sets. Disabled entries in <paramref name="target"/> are
|
||||
/// treated as "not desired here": if a child exists for the id it goes into ToStop,
|
||||
/// otherwise the row is dropped entirely (no spawn for a disabled driver).
|
||||
/// </summary>
|
||||
public static DriverSpawnPlan Compute(
|
||||
IReadOnlyDictionary<string, DriverChildSnapshot> current,
|
||||
IReadOnlyList<DriverInstanceSpec> target)
|
||||
{
|
||||
var toSpawn = new List<DriverInstanceSpec>();
|
||||
var toDelta = new List<DriverInstanceSpec>();
|
||||
var toStop = new List<string>();
|
||||
|
||||
var targetById = new Dictionary<string, DriverInstanceSpec>(StringComparer.Ordinal);
|
||||
foreach (var spec in target) targetById[spec.DriverInstanceId] = spec;
|
||||
|
||||
foreach (var (id, snap) in current)
|
||||
{
|
||||
if (!targetById.TryGetValue(id, out var spec) || !spec.Enabled)
|
||||
{
|
||||
toStop.Add(id);
|
||||
continue;
|
||||
}
|
||||
// Driver type changes can't be reinitialized in-place (factory-bound) — stop + respawn.
|
||||
if (!string.Equals(snap.DriverType, spec.DriverType, StringComparison.Ordinal))
|
||||
{
|
||||
toStop.Add(id);
|
||||
toSpawn.Add(spec);
|
||||
continue;
|
||||
}
|
||||
if (!string.Equals(snap.LastConfigJson, spec.DriverConfig, StringComparison.Ordinal))
|
||||
{
|
||||
toDelta.Add(spec);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var (id, spec) in targetById)
|
||||
{
|
||||
if (!spec.Enabled) continue;
|
||||
if (current.ContainsKey(id)) continue;
|
||||
toSpawn.Add(spec);
|
||||
}
|
||||
|
||||
return new DriverSpawnPlan(toSpawn, toDelta, toStop);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Snapshot of one running driver child as the host sees it. Used as the diff input.</summary>
|
||||
public sealed record DriverChildSnapshot(string DriverType, string LastConfigJson);
|
||||
@@ -1,70 +1,178 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
|
||||
/// <summary>
|
||||
/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on
|
||||
/// the pinned <c>opcua-synchronized-dispatcher</c> (Task 19 HOCON) so the OPC UA SDK sees
|
||||
/// only one thread per actor instance — its session/subscription locks expect strict
|
||||
/// single-threaded access.
|
||||
/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on
|
||||
/// the pinned <c>opcua-synchronized-dispatcher</c> (Task 19 HOCON) so the OPC UA SDK sees
|
||||
/// only one thread per actor instance — its session/subscription locks expect strict
|
||||
/// single-threaded access.
|
||||
///
|
||||
/// Engine wiring (call into <c>OpcUaApplicationHost</c> address-space writes, manage
|
||||
/// <c>ServiceLevel</c> + <c>ServerUriArray</c> nodes, subscribe to the <c>redundancy-state</c>
|
||||
/// DistributedPubSub topic) is staged for follow-up F10. This skeleton compiles + exposes the
|
||||
/// message contracts so producers (DriverInstance, VirtualTag, ScriptedAlarm) can target it.
|
||||
/// Address-space writes route through <see cref="IOpcUaAddressSpaceSink"/>; ServiceLevel
|
||||
/// writes route through <see cref="IServiceLevelPublisher"/>. Production binds SDK-backed
|
||||
/// implementations; dev/Mac/tests bind the Null* defaults so the actor stays decoupled from
|
||||
/// <c>Opc.Ua.Server</c>. The remaining piece is wiring those bindings to a real
|
||||
/// <c>StandardServer</c> address space — tracked as F10b.
|
||||
/// </summary>
|
||||
public sealed class OpcUaPublishActor : ReceiveActor
|
||||
{
|
||||
public const string DispatcherId = "opcua-synchronized-dispatcher";
|
||||
public const string RedundancyStateTopic = "redundancy-state";
|
||||
|
||||
public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);
|
||||
public sealed record AlarmStateUpdate(string AlarmNodeId, bool Active, bool Acknowledged, DateTime TimestampUtc);
|
||||
public sealed record RebuildAddressSpace(CorrelationId Correlation);
|
||||
public sealed record ServiceLevelChanged(byte ServiceLevel);
|
||||
|
||||
public enum OpcUaQuality { Good, Uncertain, Bad }
|
||||
|
||||
private readonly IOpcUaAddressSpaceSink _sink;
|
||||
private readonly IServiceLevelPublisher _serviceLevel;
|
||||
private readonly bool _subscribeRedundancyTopic;
|
||||
private readonly NodeId? _localNode;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
|
||||
private int _writes;
|
||||
|
||||
/// <summary>
|
||||
/// Returns Props pre-configured to use the <c>opcua-synchronized-dispatcher</c>. Caller can
|
||||
/// still override by chaining <c>.WithDispatcher(otherId)</c> for unit tests.
|
||||
/// </summary>
|
||||
public static Props Props() =>
|
||||
Akka.Actor.Props.Create(() => new OpcUaPublishActor()).WithDispatcher(DispatcherId);
|
||||
|
||||
/// <summary>Test-only Props that omits the pinned dispatcher requirement.</summary>
|
||||
public static Props PropsForTests() =>
|
||||
Akka.Actor.Props.Create(() => new OpcUaPublishActor());
|
||||
private byte _lastServiceLevel;
|
||||
|
||||
public int WriteCount => _writes;
|
||||
public byte LastServiceLevel => _lastServiceLevel;
|
||||
|
||||
public OpcUaPublishActor()
|
||||
/// <summary>Production Props — pins the OPC UA dispatcher + subscribes to the
|
||||
/// <c>redundancy-state</c> DPS topic so cluster transitions drive the local ServiceLevel
|
||||
/// publish path.</summary>
|
||||
public static Props Props(
|
||||
IOpcUaAddressSpaceSink? sink = null,
|
||||
IServiceLevelPublisher? serviceLevel = null,
|
||||
NodeId? localNode = null) =>
|
||||
Akka.Actor.Props.Create(() => new OpcUaPublishActor(
|
||||
sink ?? NullOpcUaAddressSpaceSink.Instance,
|
||||
serviceLevel ?? NullServiceLevelPublisher.Instance,
|
||||
subscribeRedundancyTopic: true,
|
||||
localNode)).WithDispatcher(DispatcherId);
|
||||
|
||||
/// <summary>Test-only Props that omits the pinned-dispatcher requirement and skips the
|
||||
/// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster.</summary>
|
||||
public static Props PropsForTests(
|
||||
IOpcUaAddressSpaceSink? sink = null,
|
||||
IServiceLevelPublisher? serviceLevel = null,
|
||||
bool subscribeRedundancyTopic = false,
|
||||
NodeId? localNode = null) =>
|
||||
Akka.Actor.Props.Create(() => new OpcUaPublishActor(
|
||||
sink ?? NullOpcUaAddressSpaceSink.Instance,
|
||||
serviceLevel ?? NullServiceLevelPublisher.Instance,
|
||||
subscribeRedundancyTopic,
|
||||
localNode));
|
||||
|
||||
public OpcUaPublishActor(
|
||||
IOpcUaAddressSpaceSink sink,
|
||||
IServiceLevelPublisher serviceLevel,
|
||||
bool subscribeRedundancyTopic,
|
||||
NodeId? localNode)
|
||||
{
|
||||
Receive<AttributeValueUpdate>(msg =>
|
||||
_sink = sink;
|
||||
_serviceLevel = serviceLevel;
|
||||
_subscribeRedundancyTopic = subscribeRedundancyTopic;
|
||||
_localNode = localNode;
|
||||
|
||||
Receive<AttributeValueUpdate>(HandleAttributeUpdate);
|
||||
Receive<AlarmStateUpdate>(HandleAlarmUpdate);
|
||||
Receive<RebuildAddressSpace>(HandleRebuild);
|
||||
Receive<ServiceLevelChanged>(HandleServiceLevelChanged);
|
||||
Receive<RedundancyStateChanged>(HandleRedundancyStateChanged);
|
||||
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
||||
}
|
||||
|
||||
protected override void PreStart()
|
||||
{
|
||||
if (_subscribeRedundancyTopic)
|
||||
{
|
||||
// F10: call into OpcUaApplicationHost to write the address-space node.
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(RedundancyStateTopic, Self));
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleAttributeUpdate(AttributeValueUpdate msg)
|
||||
{
|
||||
try
|
||||
{
|
||||
_sink.WriteValue(msg.NodeId, msg.Value, msg.Quality, msg.TimestampUtc);
|
||||
Interlocked.Increment(ref _writes);
|
||||
_log.Debug("OpcUaPublish: queued AttributeValueUpdate for {Node} ({Quality}) (write staged for F10)",
|
||||
msg.NodeId, msg.Quality);
|
||||
});
|
||||
Receive<AlarmStateUpdate>(msg =>
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "OpcUaPublish: sink.WriteValue threw for {Node}", msg.NodeId);
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleAlarmUpdate(AlarmStateUpdate msg)
|
||||
{
|
||||
try
|
||||
{
|
||||
_sink.WriteAlarmState(msg.AlarmNodeId, msg.Active, msg.Acknowledged, msg.TimestampUtc);
|
||||
Interlocked.Increment(ref _writes);
|
||||
_log.Debug("OpcUaPublish: queued AlarmStateUpdate for {Node} (active={Active})",
|
||||
msg.AlarmNodeId, msg.Active);
|
||||
});
|
||||
Receive<RebuildAddressSpace>(msg =>
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Info("OpcUaPublish: address-space rebuild requested (correlation={Correlation}); F10 wires the SDK call",
|
||||
_log.Warning(ex, "OpcUaPublish: sink.WriteAlarmState threw for {Node}", msg.AlarmNodeId);
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleRebuild(RebuildAddressSpace msg)
|
||||
{
|
||||
try
|
||||
{
|
||||
_sink.RebuildAddressSpace();
|
||||
_log.Info("OpcUaPublish: address-space rebuilt (correlation={Correlation})", msg.Correlation);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})",
|
||||
msg.Correlation);
|
||||
});
|
||||
Receive<ServiceLevelChanged>(msg =>
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleServiceLevelChanged(ServiceLevelChanged msg)
|
||||
{
|
||||
if (msg.ServiceLevel == _lastServiceLevel) return;
|
||||
_lastServiceLevel = msg.ServiceLevel;
|
||||
try
|
||||
{
|
||||
_log.Debug("OpcUaPublish: ServiceLevel={Level} (write staged for F10)", msg.ServiceLevel);
|
||||
});
|
||||
_serviceLevel.Publish(msg.ServiceLevel);
|
||||
_log.Debug("OpcUaPublish: ServiceLevel={Level}", msg.ServiceLevel);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "OpcUaPublish: ServiceLevel publisher threw at level {Level}", msg.ServiceLevel);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Compute a coarse ServiceLevel from the cluster snapshot and forward to the
|
||||
/// <see cref="IServiceLevelPublisher"/>. This is a placeholder for F10b's full health
|
||||
/// aggregation — for now we surface "primary-leader → 240, secondary → 100, detached → 0"
|
||||
/// so the local SDK at least reflects role state. The full <see cref="ServiceLevelCalculator"/>
|
||||
/// path (with DB-reachable, OPC UA probe inputs) lives in <c>RedundancyStateActor</c> on
|
||||
/// admin nodes; this driver-side mirror exists so each node's own SDK exposes a sensible
|
||||
/// ServiceLevel without round-tripping back through the admin singleton.
|
||||
/// </summary>
|
||||
private void HandleRedundancyStateChanged(RedundancyStateChanged msg)
|
||||
{
|
||||
if (_localNode is null) return;
|
||||
|
||||
var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);
|
||||
if (local is null) return;
|
||||
|
||||
byte level = local.Role switch
|
||||
{
|
||||
RedundancyRole.Primary when local.IsRoleLeaderForDriver => 240,
|
||||
RedundancyRole.Primary => 200,
|
||||
RedundancyRole.Secondary => 100,
|
||||
RedundancyRole.Detached => 0,
|
||||
_ => 0,
|
||||
};
|
||||
Self.Tell(new ServiceLevelChanged(level));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,60 +1,162 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
|
||||
|
||||
public enum ScriptedAlarmActorState { Inactive, Active, Acknowledged }
|
||||
|
||||
/// <summary>
|
||||
/// State machine wrapping a single scripted alarm. Transitions:
|
||||
/// <c>Inactive → Active → Acknowledged → Inactive</c>.
|
||||
///
|
||||
/// Engine wiring (compile alarm expression via <c>AlarmConditionService</c>, persist state to
|
||||
/// <c>ScriptedAlarmState</c> ConfigDb table on <c>PreRestart</c>, emit history rows to
|
||||
/// <c>HistorianAdapter</c>) is staged for follow-up F9. This skeleton owns the state machine
|
||||
/// so DriverHostActor can spawn it as a child.
|
||||
/// One scripted alarm. Receives dependency value updates, runs the predicate via an
|
||||
/// injected <see cref="IScriptedAlarmEvaluator"/>, and on transitions publishes both
|
||||
/// an <see cref="AlarmTransitionEvent"/> on the cluster <c>alerts</c> DPS topic and a
|
||||
/// <see cref="ScriptLogEntry"/> on <c>script-logs</c>. Manual <see cref="AcknowledgeAlarm"/>
|
||||
/// + <see cref="ConditionCleared"/> still flow through the same state machine so the
|
||||
/// legacy callers keep working.
|
||||
/// </summary>
|
||||
public sealed class ScriptedAlarmActor : ReceiveActor
|
||||
{
|
||||
public const string AlertsTopic = "alerts";
|
||||
public const string ScriptLogsTopic = "script-logs";
|
||||
|
||||
public sealed record DependencyValueChanged(string TagId, object? Value, DateTime TimestampUtc);
|
||||
public sealed record ConditionMet(string Reason);
|
||||
public sealed record AcknowledgeAlarm(string Actor);
|
||||
public sealed record ConditionCleared;
|
||||
public sealed record StateChanged(string AlarmId, ScriptedAlarmActorState State, DateTime AtUtc);
|
||||
|
||||
private readonly string _alarmId;
|
||||
public sealed record AlarmConfig(
|
||||
string AlarmId,
|
||||
string AlarmName,
|
||||
string EquipmentPath,
|
||||
int Severity,
|
||||
string? Predicate);
|
||||
|
||||
private readonly AlarmConfig _config;
|
||||
private readonly IScriptedAlarmEvaluator _evaluator;
|
||||
private readonly Func<DPSPublisher>? _publisherFactory;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private readonly Dictionary<string, object?> _dependencies = new(StringComparer.Ordinal);
|
||||
|
||||
private ScriptedAlarmActorState _state = ScriptedAlarmActorState.Inactive;
|
||||
|
||||
public static Props Props(
|
||||
AlarmConfig config,
|
||||
IScriptedAlarmEvaluator? evaluator = null,
|
||||
Func<DPSPublisher>? publisherFactory = null) =>
|
||||
Akka.Actor.Props.Create(() => new ScriptedAlarmActor(
|
||||
config,
|
||||
evaluator ?? NullScriptedAlarmEvaluator.Instance,
|
||||
publisherFactory));
|
||||
|
||||
/// <summary>Legacy single-arg ctor kept for callers that only care about the state machine
|
||||
/// (no engine evaluation, no DPS fan-out). Equivalent to <c>Props(new AlarmConfig(...))</c>.</summary>
|
||||
public static Props Props(string alarmId) =>
|
||||
Akka.Actor.Props.Create(() => new ScriptedAlarmActor(alarmId));
|
||||
Props(new AlarmConfig(alarmId, alarmId, EquipmentPath: "", Severity: 500, Predicate: null));
|
||||
|
||||
public ScriptedAlarmActor(string alarmId)
|
||||
public ScriptedAlarmActor(AlarmConfig config, IScriptedAlarmEvaluator evaluator, Func<DPSPublisher>? publisherFactory)
|
||||
{
|
||||
_alarmId = alarmId;
|
||||
_config = config;
|
||||
_evaluator = evaluator;
|
||||
_publisherFactory = publisherFactory;
|
||||
|
||||
Receive<ConditionMet>(msg =>
|
||||
{
|
||||
if (_state != ScriptedAlarmActorState.Inactive) return;
|
||||
Transition(ScriptedAlarmActorState.Active);
|
||||
});
|
||||
Receive<AcknowledgeAlarm>(msg =>
|
||||
{
|
||||
if (_state != ScriptedAlarmActorState.Active) return;
|
||||
Transition(ScriptedAlarmActorState.Acknowledged);
|
||||
});
|
||||
Receive<ConditionCleared>(_ =>
|
||||
{
|
||||
if (_state == ScriptedAlarmActorState.Inactive) return;
|
||||
Transition(ScriptedAlarmActorState.Inactive);
|
||||
});
|
||||
Receive<DependencyValueChanged>(OnDependencyChanged);
|
||||
Receive<ConditionMet>(_ => { if (_state == ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Active, user: "system"); });
|
||||
Receive<AcknowledgeAlarm>(msg => { if (_state == ScriptedAlarmActorState.Active) Transition(ScriptedAlarmActorState.Acknowledged, user: msg.Actor); });
|
||||
Receive<ConditionCleared>(_ => { if (_state != ScriptedAlarmActorState.Inactive) Transition(ScriptedAlarmActorState.Inactive, user: "system"); });
|
||||
}
|
||||
|
||||
private void Transition(ScriptedAlarmActorState next)
|
||||
private void OnDependencyChanged(DependencyValueChanged msg)
|
||||
{
|
||||
_dependencies[msg.TagId] = msg.Value;
|
||||
|
||||
if (string.IsNullOrEmpty(_config.Predicate)) return;
|
||||
|
||||
ScriptedAlarmEvalResult result;
|
||||
try
|
||||
{
|
||||
result = _evaluator.Evaluate(_config.AlarmId, _config.Predicate, _dependencies);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "ScriptedAlarm {Id}: evaluator threw", _config.AlarmId);
|
||||
PublishLog("Error", $"evaluator threw: {ex.Message}");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!result.Success)
|
||||
{
|
||||
PublishLog("Warning", result.Reason ?? "evaluator failure");
|
||||
return;
|
||||
}
|
||||
|
||||
// Active condition wins regardless of ack state — re-firing is suppressed because
|
||||
// _state already == Active. Cleared moves Active OR Acknowledged → Inactive.
|
||||
if (result.Active && _state == ScriptedAlarmActorState.Inactive)
|
||||
{
|
||||
Transition(ScriptedAlarmActorState.Active, user: "system");
|
||||
}
|
||||
else if (!result.Active && _state != ScriptedAlarmActorState.Inactive)
|
||||
{
|
||||
Transition(ScriptedAlarmActorState.Inactive, user: "system");
|
||||
}
|
||||
}
|
||||
|
||||
private void Transition(ScriptedAlarmActorState next, string user)
|
||||
{
|
||||
var prev = _state;
|
||||
_state = next;
|
||||
_log.Info("ScriptedAlarm {Id}: {From} → {To}", _alarmId, prev, next);
|
||||
Context.Parent.Tell(new StateChanged(_alarmId, next, DateTime.UtcNow));
|
||||
// F9: emit history row via HistorianAdapter; persist state to ScriptedAlarmState DB.
|
||||
_log.Info("ScriptedAlarm {Id}: {From} → {To}", _config.AlarmId, prev, next);
|
||||
|
||||
var nowUtc = DateTime.UtcNow;
|
||||
Context.Parent.Tell(new StateChanged(_config.AlarmId, next, nowUtc));
|
||||
|
||||
var kind = next switch
|
||||
{
|
||||
ScriptedAlarmActorState.Active => "Activated",
|
||||
ScriptedAlarmActorState.Acknowledged => "Acknowledged",
|
||||
ScriptedAlarmActorState.Inactive => "Cleared",
|
||||
_ => next.ToString(),
|
||||
};
|
||||
|
||||
var evt = new AlarmTransitionEvent(
|
||||
AlarmId: _config.AlarmId,
|
||||
EquipmentPath: _config.EquipmentPath,
|
||||
AlarmName: _config.AlarmName,
|
||||
TransitionKind: kind,
|
||||
Severity: _config.Severity,
|
||||
Message: $"{_config.AlarmName} {kind}",
|
||||
User: user,
|
||||
TimestampUtc: nowUtc);
|
||||
|
||||
PublishOrFallback(AlertsTopic, evt);
|
||||
PublishLog("Information", $"{_config.AlarmName} {kind} (by {user})");
|
||||
}
|
||||
|
||||
private void PublishLog(string level, string message)
|
||||
{
|
||||
var entry = new ScriptLogEntry(
|
||||
ScriptId: _config.AlarmId,
|
||||
Level: level,
|
||||
Message: message,
|
||||
TimestampUtc: DateTime.UtcNow,
|
||||
VirtualTagId: null,
|
||||
AlarmId: _config.AlarmId,
|
||||
EquipmentId: null);
|
||||
PublishOrFallback(ScriptLogsTopic, entry);
|
||||
}
|
||||
|
||||
private void PublishOrFallback(string topic, object payload)
|
||||
{
|
||||
if (_publisherFactory is not null)
|
||||
{
|
||||
_publisherFactory().Publish(topic, payload);
|
||||
return;
|
||||
}
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(topic, payload));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
|
||||
@@ -29,6 +30,7 @@ public static class ServiceCollectionExtensions
|
||||
public static IServiceCollection AddOtOpcUaRuntime(this IServiceCollection services)
|
||||
{
|
||||
services.TryAddSingleton<IAlarmHistorianSink>(NullAlarmHistorianSink.Instance);
|
||||
services.TryAddSingleton<IDriverFactory>(NullDriverFactory.Instance);
|
||||
return services;
|
||||
}
|
||||
|
||||
@@ -54,8 +56,9 @@ public static class ServiceCollectionExtensions
|
||||
{
|
||||
var dbFactory = resolver.GetService<IDbContextFactory<OtOpcUaConfigDbContext>>();
|
||||
var roleInfo = resolver.GetService<IClusterRoleInfo>();
|
||||
// Fallback to NullAlarmHistorianSink if AddOtOpcUaRuntime wasn't called (e.g., test harnesses).
|
||||
// Fallback to Null* if AddOtOpcUaRuntime wasn't called (e.g., test harnesses).
|
||||
var historianSink = resolver.GetService<IAlarmHistorianSink>() ?? NullAlarmHistorianSink.Instance;
|
||||
var driverFactory = resolver.GetService<IDriverFactory>() ?? NullDriverFactory.Instance;
|
||||
|
||||
var dbHealth = system.ActorOf(
|
||||
DbHealthProbeActor.Props(dbFactory),
|
||||
@@ -63,7 +66,8 @@ public static class ServiceCollectionExtensions
|
||||
registry.Register<DbHealthProbeActorKey>(dbHealth);
|
||||
|
||||
var driverHost = system.ActorOf(
|
||||
DriverHostActor.Props(dbFactory, roleInfo.LocalNode),
|
||||
DriverHostActor.Props(dbFactory, roleInfo.LocalNode, coordinator: null,
|
||||
driverFactory: driverFactory, localRoles: roleInfo.LocalRoles),
|
||||
DriverHostActorName);
|
||||
registry.Register<DriverHostActorKey>(driverHost);
|
||||
|
||||
|
||||
@@ -1,41 +1,123 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Wraps a single virtual-tag expression. Receives dependency-tag updates, recomputes the
|
||||
/// expression, and publishes the result to <c>OpcUaPublishActor</c>.
|
||||
///
|
||||
/// Engine wiring (compile expression via <c>VirtualTagEngine</c>, manage subscriptions,
|
||||
/// emit <c>AttributeValueUpdate</c>) is staged for follow-up F8. This skeleton compiles + has
|
||||
/// a basic message contract so DriverHostActor can spawn it as a child.
|
||||
/// expression via an injected <see cref="IVirtualTagEvaluator"/>, and emits an
|
||||
/// <see cref="EvaluationResult"/> to the parent (the publish actor) whenever the value
|
||||
/// actually changes. Script failures publish a Warning <see cref="ScriptLogEntry"/> on the
|
||||
/// <c>script-logs</c> DPS topic so operators see the diagnostic in the live tail.
|
||||
/// </summary>
|
||||
public sealed class VirtualTagActor : ReceiveActor
|
||||
{
|
||||
public const string ScriptLogsTopic = "script-logs";
|
||||
|
||||
public sealed record DependencyValueChanged(string TagId, object? Value, DateTime TimestampUtc);
|
||||
public sealed record EvaluationResult(string VirtualTagId, object? Value, DateTime TimestampUtc, CorrelationId Correlation);
|
||||
|
||||
private readonly string _virtualTagId;
|
||||
private readonly string _scriptId;
|
||||
private readonly string _expression;
|
||||
private readonly IVirtualTagEvaluator _evaluator;
|
||||
private readonly Func<DPSPublisher>? _publisherFactory;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private readonly Dictionary<string, object?> _dependencies = new(StringComparer.Ordinal);
|
||||
|
||||
public static Props Props(string virtualTagId, string expression) =>
|
||||
Akka.Actor.Props.Create(() => new VirtualTagActor(virtualTagId, expression));
|
||||
private bool _hasLastValue;
|
||||
private object? _lastValue;
|
||||
|
||||
public VirtualTagActor(string virtualTagId, string expression)
|
||||
public static Props Props(
|
||||
string virtualTagId,
|
||||
string expression,
|
||||
IVirtualTagEvaluator? evaluator = null,
|
||||
string? scriptId = null,
|
||||
Func<DPSPublisher>? publisherFactory = null) =>
|
||||
Akka.Actor.Props.Create(() => new VirtualTagActor(
|
||||
virtualTagId, expression,
|
||||
evaluator ?? NullVirtualTagEvaluator.Instance,
|
||||
scriptId ?? virtualTagId,
|
||||
publisherFactory));
|
||||
|
||||
public VirtualTagActor(
|
||||
string virtualTagId,
|
||||
string expression,
|
||||
IVirtualTagEvaluator evaluator,
|
||||
string scriptId,
|
||||
Func<DPSPublisher>? publisherFactory)
|
||||
{
|
||||
_virtualTagId = virtualTagId;
|
||||
_scriptId = scriptId;
|
||||
_expression = expression;
|
||||
_evaluator = evaluator;
|
||||
_publisherFactory = publisherFactory;
|
||||
|
||||
Receive<DependencyValueChanged>(msg =>
|
||||
Receive<DependencyValueChanged>(OnDependencyChanged);
|
||||
}
|
||||
|
||||
private void OnDependencyChanged(DependencyValueChanged msg)
|
||||
{
|
||||
_dependencies[msg.TagId] = msg.Value;
|
||||
|
||||
VirtualTagEvalResult result;
|
||||
try
|
||||
{
|
||||
_dependencies[msg.TagId] = msg.Value;
|
||||
// Engine wiring (F8): VirtualTagEngine.Evaluate(_expression, _dependencies) → publish.
|
||||
_log.Debug("VirtualTag {Id}: dependency {Tag}={Value} buffered (eval staged for F8)",
|
||||
_virtualTagId, msg.TagId, msg.Value);
|
||||
});
|
||||
result = _evaluator.Evaluate(_virtualTagId, _expression, _dependencies);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Warning(ex, "VirtualTag {Id}: evaluator threw", _virtualTagId);
|
||||
PublishLog("Error", $"evaluator threw: {ex.Message}");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!result.Success)
|
||||
{
|
||||
PublishLog("Warning", result.Reason ?? "evaluator failure");
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip no-change results. Real evaluator returns Ok(value); Null returns NoChange — both
|
||||
// safe because Null never produces a fresh value.
|
||||
if (ReferenceEquals(result, VirtualTagEvalResult.NoChange)) return;
|
||||
|
||||
if (_hasLastValue && Equals(_lastValue, result.Value)) return;
|
||||
|
||||
_hasLastValue = true;
|
||||
_lastValue = result.Value;
|
||||
var evalResult = new EvaluationResult(_virtualTagId, result.Value, msg.TimestampUtc, CorrelationId.NewId());
|
||||
Context.Parent.Tell(evalResult);
|
||||
}
|
||||
|
||||
private void PublishLog(string level, string message)
|
||||
{
|
||||
var entry = new ScriptLogEntry(
|
||||
ScriptId: _scriptId,
|
||||
Level: level,
|
||||
Message: message,
|
||||
TimestampUtc: DateTime.UtcNow,
|
||||
VirtualTagId: _virtualTagId,
|
||||
AlarmId: null,
|
||||
EquipmentId: null);
|
||||
|
||||
if (_publisherFactory is not null)
|
||||
{
|
||||
_publisherFactory().Publish(ScriptLogsTopic, entry);
|
||||
return;
|
||||
}
|
||||
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(ScriptLogsTopic, entry));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Thin seam for tests to capture DPS publishes without standing up a real cluster.
|
||||
/// Production never instantiates this directly — the actor falls through to
|
||||
/// <see cref="DistributedPubSub"/> when the factory is null.
|
||||
/// </summary>
|
||||
public sealed record DPSPublisher(Action<string, object> Publish);
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
|
||||
|
||||
public sealed class Phase7ApplierTests
|
||||
{
|
||||
[Fact]
|
||||
public void Empty_plan_does_not_call_sink_and_does_not_trigger_rebuild()
|
||||
{
|
||||
var sink = new RecordingSink();
|
||||
var applier = new Phase7Applier(sink, NullLogger<Phase7Applier>.Instance);
|
||||
|
||||
var outcome = applier.Apply(EmptyPlan);
|
||||
|
||||
outcome.RebuildCalled.ShouldBeFalse();
|
||||
outcome.AddedNodes.ShouldBe(0);
|
||||
outcome.RemovedNodes.ShouldBe(0);
|
||||
outcome.ChangedNodes.ShouldBe(0);
|
||||
sink.RebuildCalls.ShouldBe(0);
|
||||
sink.AlarmWrites.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Removed_equipment_writes_inactive_alarm_state_per_id_and_triggers_rebuild()
|
||||
{
|
||||
var sink = new RecordingSink();
|
||||
var applier = new Phase7Applier(sink, NullLogger<Phase7Applier>.Instance);
|
||||
|
||||
var plan = WithEquipmentRemoval("eq-1", "eq-2");
|
||||
var outcome = applier.Apply(plan);
|
||||
|
||||
outcome.RemovedNodes.ShouldBe(2);
|
||||
outcome.RebuildCalled.ShouldBeTrue();
|
||||
sink.AlarmWrites.Select(a => a.NodeId).OrderBy(x => x).ShouldBe(new[] { "eq-1", "eq-2" });
|
||||
sink.AlarmWrites.All(a => a.Active == false && a.Acknowledged == false).ShouldBeTrue();
|
||||
sink.RebuildCalls.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Added_equipment_triggers_rebuild_without_alarm_writes()
|
||||
{
|
||||
var sink = new RecordingSink();
|
||||
var applier = new Phase7Applier(sink, NullLogger<Phase7Applier>.Instance);
|
||||
|
||||
var plan = new Phase7Plan(
|
||||
AddedEquipment: new[] { new EquipmentNode("new", "New", "line-1") },
|
||||
RemovedEquipment: Array.Empty<EquipmentNode>(),
|
||||
ChangedEquipment: Array.Empty<Phase7Plan.EquipmentDelta>(),
|
||||
AddedDrivers: Array.Empty<DriverInstancePlan>(),
|
||||
RemovedDrivers: Array.Empty<DriverInstancePlan>(),
|
||||
ChangedDrivers: Array.Empty<Phase7Plan.DriverDelta>(),
|
||||
AddedAlarms: Array.Empty<ScriptedAlarmPlan>(),
|
||||
RemovedAlarms: Array.Empty<ScriptedAlarmPlan>(),
|
||||
ChangedAlarms: Array.Empty<Phase7Plan.AlarmDelta>());
|
||||
|
||||
var outcome = applier.Apply(plan);
|
||||
|
||||
outcome.RebuildCalled.ShouldBeTrue();
|
||||
outcome.AddedNodes.ShouldBe(1);
|
||||
sink.AlarmWrites.ShouldBeEmpty();
|
||||
sink.RebuildCalls.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Driver_only_changes_do_not_trigger_address_space_rebuild()
|
||||
{
|
||||
var sink = new RecordingSink();
|
||||
var applier = new Phase7Applier(sink, NullLogger<Phase7Applier>.Instance);
|
||||
|
||||
var plan = new Phase7Plan(
|
||||
AddedEquipment: Array.Empty<EquipmentNode>(),
|
||||
RemovedEquipment: Array.Empty<EquipmentNode>(),
|
||||
ChangedEquipment: Array.Empty<Phase7Plan.EquipmentDelta>(),
|
||||
AddedDrivers: new[] { new DriverInstancePlan("d-new", "Modbus", "{}") },
|
||||
RemovedDrivers: Array.Empty<DriverInstancePlan>(),
|
||||
ChangedDrivers: new[]
|
||||
{
|
||||
new Phase7Plan.DriverDelta(
|
||||
new DriverInstancePlan("d-1", "Modbus", "{\"v\":1}"),
|
||||
new DriverInstancePlan("d-1", "Modbus", "{\"v\":2}")),
|
||||
},
|
||||
AddedAlarms: Array.Empty<ScriptedAlarmPlan>(),
|
||||
RemovedAlarms: Array.Empty<ScriptedAlarmPlan>(),
|
||||
ChangedAlarms: Array.Empty<Phase7Plan.AlarmDelta>());
|
||||
|
||||
var outcome = applier.Apply(plan);
|
||||
|
||||
outcome.RebuildCalled.ShouldBeFalse();
|
||||
sink.RebuildCalls.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Sink_exception_in_WriteAlarmState_does_not_propagate_and_rebuild_still_fires()
|
||||
{
|
||||
var sink = new ThrowingSink(throwOnAlarmWrite: true);
|
||||
var applier = new Phase7Applier(sink, NullLogger<Phase7Applier>.Instance);
|
||||
|
||||
var plan = WithEquipmentRemoval("eq-1");
|
||||
|
||||
var outcome = applier.Apply(plan); // should not throw
|
||||
outcome.RemovedNodes.ShouldBe(1);
|
||||
outcome.RebuildCalled.ShouldBeTrue();
|
||||
}
|
||||
|
||||
private static Phase7Plan EmptyPlan => new(
|
||||
Array.Empty<EquipmentNode>(), Array.Empty<EquipmentNode>(), Array.Empty<Phase7Plan.EquipmentDelta>(),
|
||||
Array.Empty<DriverInstancePlan>(), Array.Empty<DriverInstancePlan>(), Array.Empty<Phase7Plan.DriverDelta>(),
|
||||
Array.Empty<ScriptedAlarmPlan>(), Array.Empty<ScriptedAlarmPlan>(), Array.Empty<Phase7Plan.AlarmDelta>());
|
||||
|
||||
private static Phase7Plan WithEquipmentRemoval(params string[] ids) => new(
|
||||
AddedEquipment: Array.Empty<EquipmentNode>(),
|
||||
RemovedEquipment: ids.Select(id => new EquipmentNode(id, id, "line-1")).ToArray(),
|
||||
ChangedEquipment: Array.Empty<Phase7Plan.EquipmentDelta>(),
|
||||
AddedDrivers: Array.Empty<DriverInstancePlan>(),
|
||||
RemovedDrivers: Array.Empty<DriverInstancePlan>(),
|
||||
ChangedDrivers: Array.Empty<Phase7Plan.DriverDelta>(),
|
||||
AddedAlarms: Array.Empty<ScriptedAlarmPlan>(),
|
||||
RemovedAlarms: Array.Empty<ScriptedAlarmPlan>(),
|
||||
ChangedAlarms: Array.Empty<Phase7Plan.AlarmDelta>());
|
||||
|
||||
private sealed class RecordingSink : IOpcUaAddressSpaceSink
|
||||
{
|
||||
public ConcurrentQueue<(string NodeId, bool Active, bool Acknowledged)> AlarmQueue { get; } = new();
|
||||
public int RebuildCalls;
|
||||
|
||||
public List<(string NodeId, bool Active, bool Acknowledged)> AlarmWrites => AlarmQueue.ToList();
|
||||
|
||||
public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) { }
|
||||
public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc)
|
||||
=> AlarmQueue.Enqueue((alarmNodeId, active, acknowledged));
|
||||
public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls);
|
||||
}
|
||||
|
||||
private sealed class ThrowingSink : IOpcUaAddressSpaceSink
|
||||
{
|
||||
private readonly bool _throwOnAlarmWrite;
|
||||
public ThrowingSink(bool throwOnAlarmWrite) { _throwOnAlarmWrite = throwOnAlarmWrite; }
|
||||
|
||||
public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) { }
|
||||
public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc)
|
||||
{
|
||||
if (_throwOnAlarmWrite) throw new InvalidOperationException("simulated sink fault");
|
||||
}
|
||||
public void RebuildAddressSpace() { }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,150 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
|
||||
|
||||
public sealed class Phase7PlannerTests
|
||||
{
|
||||
[Fact]
|
||||
public void Empty_inputs_produce_empty_plan()
|
||||
{
|
||||
var prev = new Phase7CompositionResult(Array.Empty<EquipmentNode>(), Array.Empty<DriverInstancePlan>(), Array.Empty<ScriptedAlarmPlan>());
|
||||
var next = prev;
|
||||
|
||||
var plan = Phase7Planner.Compute(prev, next);
|
||||
|
||||
plan.IsEmpty.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Identical_compositions_produce_empty_plan()
|
||||
{
|
||||
var eq = new EquipmentNode("eq-1", "Eq 1", "line-1");
|
||||
var prev = new Phase7CompositionResult(new[] { eq }, Array.Empty<DriverInstancePlan>(), Array.Empty<ScriptedAlarmPlan>());
|
||||
var next = new Phase7CompositionResult(new[] { eq }, Array.Empty<DriverInstancePlan>(), Array.Empty<ScriptedAlarmPlan>());
|
||||
|
||||
var plan = Phase7Planner.Compute(prev, next);
|
||||
|
||||
plan.IsEmpty.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void New_equipment_goes_to_AddedEquipment()
|
||||
{
|
||||
var prev = new Phase7CompositionResult(Array.Empty<EquipmentNode>(), Array.Empty<DriverInstancePlan>(), Array.Empty<ScriptedAlarmPlan>());
|
||||
var next = new Phase7CompositionResult(
|
||||
new[] { new EquipmentNode("eq-1", "A", "line-1") },
|
||||
Array.Empty<DriverInstancePlan>(),
|
||||
Array.Empty<ScriptedAlarmPlan>());
|
||||
|
||||
var plan = Phase7Planner.Compute(prev, next);
|
||||
|
||||
plan.AddedEquipment.Single().EquipmentId.ShouldBe("eq-1");
|
||||
plan.RemovedEquipment.ShouldBeEmpty();
|
||||
plan.ChangedEquipment.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Disappeared_equipment_goes_to_RemovedEquipment()
|
||||
{
|
||||
var prev = new Phase7CompositionResult(
|
||||
new[] { new EquipmentNode("eq-1", "A", "line-1") },
|
||||
Array.Empty<DriverInstancePlan>(),
|
||||
Array.Empty<ScriptedAlarmPlan>());
|
||||
var next = new Phase7CompositionResult(Array.Empty<EquipmentNode>(), Array.Empty<DriverInstancePlan>(), Array.Empty<ScriptedAlarmPlan>());
|
||||
|
||||
var plan = Phase7Planner.Compute(prev, next);
|
||||
|
||||
plan.RemovedEquipment.Single().EquipmentId.ShouldBe("eq-1");
|
||||
plan.AddedEquipment.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Same_id_with_different_display_name_routes_to_ChangedEquipment()
|
||||
{
|
||||
var prev = new Phase7CompositionResult(
|
||||
new[] { new EquipmentNode("eq-1", "Old", "line-1") },
|
||||
Array.Empty<DriverInstancePlan>(),
|
||||
Array.Empty<ScriptedAlarmPlan>());
|
||||
var next = new Phase7CompositionResult(
|
||||
new[] { new EquipmentNode("eq-1", "New", "line-1") },
|
||||
Array.Empty<DriverInstancePlan>(),
|
||||
Array.Empty<ScriptedAlarmPlan>());
|
||||
|
||||
var plan = Phase7Planner.Compute(prev, next);
|
||||
|
||||
plan.ChangedEquipment.Single().Previous.DisplayName.ShouldBe("Old");
|
||||
plan.ChangedEquipment.Single().Current.DisplayName.ShouldBe("New");
|
||||
plan.AddedEquipment.ShouldBeEmpty();
|
||||
plan.RemovedEquipment.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Driver_config_change_routes_to_ChangedDrivers()
|
||||
{
|
||||
var prev = new Phase7CompositionResult(
|
||||
Array.Empty<EquipmentNode>(),
|
||||
new[] { new DriverInstancePlan("drv-1", "Modbus", "{\"host\":\"old\"}") },
|
||||
Array.Empty<ScriptedAlarmPlan>());
|
||||
var next = new Phase7CompositionResult(
|
||||
Array.Empty<EquipmentNode>(),
|
||||
new[] { new DriverInstancePlan("drv-1", "Modbus", "{\"host\":\"new\"}") },
|
||||
Array.Empty<ScriptedAlarmPlan>());
|
||||
|
||||
var plan = Phase7Planner.Compute(prev, next);
|
||||
|
||||
plan.ChangedDrivers.Single().Current.ConfigJson.ShouldContain("new");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Alarm_message_template_change_routes_to_ChangedAlarms()
|
||||
{
|
||||
var prev = new Phase7CompositionResult(
|
||||
Array.Empty<EquipmentNode>(),
|
||||
Array.Empty<DriverInstancePlan>(),
|
||||
new[] { new ScriptedAlarmPlan("a-1", "eq-1", "script-1", "old") });
|
||||
var next = new Phase7CompositionResult(
|
||||
Array.Empty<EquipmentNode>(),
|
||||
Array.Empty<DriverInstancePlan>(),
|
||||
new[] { new ScriptedAlarmPlan("a-1", "eq-1", "script-1", "new") });
|
||||
|
||||
var plan = Phase7Planner.Compute(prev, next);
|
||||
|
||||
plan.ChangedAlarms.Single().Current.MessageTemplate.ShouldBe("new");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Added_and_removed_lists_are_sorted_by_id_for_deterministic_ordering()
|
||||
{
|
||||
var prev = new Phase7CompositionResult(
|
||||
new[] { new EquipmentNode("z", "Z", "line-1"), new EquipmentNode("a", "A", "line-1") },
|
||||
Array.Empty<DriverInstancePlan>(),
|
||||
Array.Empty<ScriptedAlarmPlan>());
|
||||
var next = new Phase7CompositionResult(Array.Empty<EquipmentNode>(), Array.Empty<DriverInstancePlan>(), Array.Empty<ScriptedAlarmPlan>());
|
||||
|
||||
var plan = Phase7Planner.Compute(prev, next);
|
||||
|
||||
plan.RemovedEquipment.Select(e => e.EquipmentId).ShouldBe(new[] { "a", "z" });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Mixed_changes_across_all_three_classes_are_captured_in_one_pass()
|
||||
{
|
||||
var prev = new Phase7CompositionResult(
|
||||
new[] { new EquipmentNode("eq-keep", "Keep", "line-1"), new EquipmentNode("eq-drop", "Drop", "line-1") },
|
||||
new[] { new DriverInstancePlan("drv-keep", "Modbus", "{}"), new DriverInstancePlan("drv-change", "Modbus", "{\"v\":1}") },
|
||||
new[] { new ScriptedAlarmPlan("a-keep", "eq-keep", "s1", "t1") });
|
||||
var next = new Phase7CompositionResult(
|
||||
new[] { new EquipmentNode("eq-keep", "Keep", "line-1"), new EquipmentNode("eq-new", "New", "line-1") },
|
||||
new[] { new DriverInstancePlan("drv-keep", "Modbus", "{}"), new DriverInstancePlan("drv-change", "Modbus", "{\"v\":2}") },
|
||||
new[] { new ScriptedAlarmPlan("a-keep", "eq-keep", "s1", "t1"), new ScriptedAlarmPlan("a-new", "eq-new", "s2", "t2") });
|
||||
|
||||
var plan = Phase7Planner.Compute(prev, next);
|
||||
|
||||
plan.AddedEquipment.Single().EquipmentId.ShouldBe("eq-new");
|
||||
plan.RemovedEquipment.Single().EquipmentId.ShouldBe("eq-drop");
|
||||
plan.ChangedEquipment.ShouldBeEmpty();
|
||||
plan.ChangedDrivers.Single().Current.DriverInstanceId.ShouldBe("drv-change");
|
||||
plan.AddedAlarms.Single().ScriptedAlarmId.ShouldBe("a-new");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||
|
||||
public sealed class DeploymentArtifactTests
|
||||
{
|
||||
[Fact]
|
||||
public void Empty_blob_returns_empty_list()
|
||||
{
|
||||
DeploymentArtifact.ParseDriverInstances(ReadOnlySpan<byte>.Empty).ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Malformed_json_returns_empty_list()
|
||||
{
|
||||
DeploymentArtifact.ParseDriverInstances(Encoding.UTF8.GetBytes("not json")).ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Snapshot_without_DriverInstances_returns_empty()
|
||||
{
|
||||
var blob = Encoding.UTF8.GetBytes("{\"Clusters\":[]}");
|
||||
DeploymentArtifact.ParseDriverInstances(blob).ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Parses_driver_instances_from_composer_shaped_blob()
|
||||
{
|
||||
// Mirrors the shape ConfigComposer.SnapshotAndFlattenAsync emits — Pascal-case fields
|
||||
// serialised directly off the EF entity.
|
||||
var rowId = Guid.NewGuid();
|
||||
var blob = JsonSerializer.SerializeToUtf8Bytes(new
|
||||
{
|
||||
DriverInstances = new[]
|
||||
{
|
||||
new
|
||||
{
|
||||
DriverInstanceRowId = rowId,
|
||||
DriverInstanceId = "DI-modbus-1",
|
||||
Name = "Modbus Line A",
|
||||
DriverType = "Modbus",
|
||||
Enabled = true,
|
||||
DriverConfig = "{\"host\":\"127.0.0.1\"}",
|
||||
},
|
||||
new
|
||||
{
|
||||
DriverInstanceRowId = Guid.NewGuid(),
|
||||
DriverInstanceId = "DI-disabled",
|
||||
Name = "Decommissioned",
|
||||
DriverType = "AbCip",
|
||||
Enabled = false,
|
||||
DriverConfig = "{}",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
var specs = DeploymentArtifact.ParseDriverInstances(blob);
|
||||
|
||||
specs.Count.ShouldBe(2);
|
||||
specs[0].DriverInstanceRowId.ShouldBe(rowId);
|
||||
specs[0].DriverInstanceId.ShouldBe("DI-modbus-1");
|
||||
specs[0].DriverType.ShouldBe("Modbus");
|
||||
specs[0].Enabled.ShouldBeTrue();
|
||||
specs[0].DriverConfig.ShouldContain("127.0.0.1");
|
||||
specs[1].Enabled.ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Spec_missing_required_fields_is_dropped()
|
||||
{
|
||||
var blob = JsonSerializer.SerializeToUtf8Bytes(new
|
||||
{
|
||||
DriverInstances = new object[]
|
||||
{
|
||||
new { Name = "no-id" },
|
||||
new
|
||||
{
|
||||
DriverInstanceId = "DI-ok",
|
||||
DriverType = "Modbus",
|
||||
DriverConfig = "{}",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
var specs = DeploymentArtifact.ParseDriverInstances(blob);
|
||||
|
||||
specs.Single().DriverInstanceId.ShouldBe("DI-ok");
|
||||
}
|
||||
}
|
||||
+192
@@ -0,0 +1,192 @@
|
||||
using System.Text.Json;
|
||||
using Akka.Actor;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||
|
||||
public sealed class DriverHostActorReconcileTests : RuntimeActorTestBase
|
||||
{
|
||||
private static readonly NodeId TestNode = NodeId.Parse("driver-test");
|
||||
private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64));
|
||||
private static readonly RevisionHash RevB = RevisionHash.Parse(new string('b', 64));
|
||||
|
||||
[Fact]
|
||||
public void Apply_with_driver_instances_in_artifact_spawns_one_child_per_enabled_row()
|
||||
{
|
||||
var db = NewInMemoryDbFactory();
|
||||
var factory = new CountingDriverFactory("Modbus");
|
||||
var deploymentId = SeedDeploymentWithDrivers(db, RevA,
|
||||
("DI-1", "Modbus", "{}", true),
|
||||
("DI-2", "Modbus", "{}", true),
|
||||
("DI-3", "Modbus", "{}", false)); // disabled — not spawned
|
||||
|
||||
var coordinator = CreateTestProbe();
|
||||
var actor = Sys.ActorOf(DriverHostActor.Props(
|
||||
db, TestNode, coordinator.Ref,
|
||||
driverFactory: factory,
|
||||
localRoles: new HashSet<string> { "driver" }));
|
||||
|
||||
actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId()));
|
||||
|
||||
coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5)).Outcome.ShouldBe(ApplyAckOutcome.Applied);
|
||||
AwaitAssert(() => factory.CreateCount.ShouldBe(2), duration: TimeSpan.FromSeconds(3));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Apply_with_unsupported_driver_type_falls_back_to_stub()
|
||||
{
|
||||
var db = NewInMemoryDbFactory();
|
||||
// Factory only supports "Modbus" — the Galaxy row should boot stubbed.
|
||||
var factory = new CountingDriverFactory("Modbus");
|
||||
var deploymentId = SeedDeploymentWithDrivers(db, RevA,
|
||||
("DI-galaxy", "Galaxy", "{}", true));
|
||||
|
||||
var coordinator = CreateTestProbe();
|
||||
var actor = Sys.ActorOf(DriverHostActor.Props(
|
||||
db, TestNode, coordinator.Ref,
|
||||
driverFactory: factory,
|
||||
localRoles: new HashSet<string> { "driver" }));
|
||||
|
||||
actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId()));
|
||||
|
||||
coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5)).Outcome.ShouldBe(ApplyAckOutcome.Applied);
|
||||
|
||||
// No real driver was constructed — stubbing took over.
|
||||
factory.CreateCount.ShouldBe(0);
|
||||
|
||||
// GetDiagnostics should still report the (stubbed) child.
|
||||
actor.Tell(new GetDiagnostics(CorrelationId.NewId()), coordinator.Ref);
|
||||
var snap = coordinator.ExpectMsg<Commons.Interfaces.NodeDiagnosticsSnapshot>(TimeSpan.FromSeconds(2));
|
||||
snap.Drivers.Count.ShouldBe(1);
|
||||
snap.Drivers[0].State.ShouldBe("Stubbed");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Galaxy_on_non_windows_is_stubbed_by_ShouldStub_check()
|
||||
{
|
||||
// Even if the factory could create it, ShouldStub('Galaxy', ...) returns true on macOS/Linux —
|
||||
// the factory should never be called.
|
||||
var db = NewInMemoryDbFactory();
|
||||
var factory = new CountingDriverFactory("Galaxy");
|
||||
var deploymentId = SeedDeploymentWithDrivers(db, RevA,
|
||||
("DI-galaxy", "Galaxy", "{}", true));
|
||||
|
||||
var coordinator = CreateTestProbe();
|
||||
var actor = Sys.ActorOf(DriverHostActor.Props(
|
||||
db, TestNode, coordinator.Ref,
|
||||
driverFactory: factory,
|
||||
localRoles: new HashSet<string> { "driver" }));
|
||||
|
||||
actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId()));
|
||||
|
||||
coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5));
|
||||
|
||||
if (OperatingSystem.IsWindows())
|
||||
{
|
||||
factory.CreateCount.ShouldBe(1);
|
||||
}
|
||||
else
|
||||
{
|
||||
factory.CreateCount.ShouldBe(0); // ShouldStub forced the stub path
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Second_apply_with_removed_driver_stops_the_child()
|
||||
{
|
||||
var db = NewInMemoryDbFactory();
|
||||
var factory = new CountingDriverFactory("Modbus");
|
||||
var d1 = SeedDeploymentWithDrivers(db, RevA, ("DI-1", "Modbus", "{}", true), ("DI-2", "Modbus", "{}", true));
|
||||
var d2 = SeedDeploymentWithDrivers(db, RevB, ("DI-1", "Modbus", "{}", true));
|
||||
|
||||
var coordinator = CreateTestProbe();
|
||||
var actor = Sys.ActorOf(DriverHostActor.Props(
|
||||
db, TestNode, coordinator.Ref,
|
||||
driverFactory: factory,
|
||||
localRoles: new HashSet<string> { "driver" }));
|
||||
|
||||
actor.Tell(new DispatchDeployment(d1, RevA, CorrelationId.NewId()));
|
||||
coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5));
|
||||
AwaitAssert(() => factory.CreateCount.ShouldBe(2), duration: TimeSpan.FromSeconds(3));
|
||||
|
||||
actor.Tell(new DispatchDeployment(d2, RevB, CorrelationId.NewId()));
|
||||
coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5));
|
||||
|
||||
actor.Tell(new GetDiagnostics(CorrelationId.NewId()), coordinator.Ref);
|
||||
var snap = coordinator.ExpectMsg<Commons.Interfaces.NodeDiagnosticsSnapshot>(TimeSpan.FromSeconds(2));
|
||||
snap.Drivers.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
private static DeploymentId SeedDeploymentWithDrivers(
|
||||
IDbContextFactory<OtOpcUaConfigDbContext> db,
|
||||
RevisionHash rev,
|
||||
params (string Id, string Type, string Config, bool Enabled)[] drivers)
|
||||
{
|
||||
var artifact = JsonSerializer.SerializeToUtf8Bytes(new
|
||||
{
|
||||
DriverInstances = drivers.Select(d => new
|
||||
{
|
||||
DriverInstanceRowId = Guid.NewGuid(),
|
||||
DriverInstanceId = d.Id,
|
||||
Name = d.Id,
|
||||
DriverType = d.Type,
|
||||
Enabled = d.Enabled,
|
||||
DriverConfig = d.Config,
|
||||
}).ToArray(),
|
||||
});
|
||||
|
||||
var id = DeploymentId.NewId();
|
||||
using var ctx = db.CreateDbContext();
|
||||
ctx.Deployments.Add(new Deployment
|
||||
{
|
||||
DeploymentId = id.Value,
|
||||
RevisionHash = rev.Value,
|
||||
Status = DeploymentStatus.Sealed,
|
||||
CreatedBy = "test",
|
||||
SealedAtUtc = DateTime.UtcNow,
|
||||
ArtifactBlob = artifact,
|
||||
});
|
||||
ctx.SaveChanges();
|
||||
return id;
|
||||
}
|
||||
|
||||
private sealed class CountingDriverFactory : IDriverFactory
|
||||
{
|
||||
private readonly string _supportedType;
|
||||
public int CreateCount;
|
||||
public CountingDriverFactory(string supportedType) { _supportedType = supportedType; }
|
||||
|
||||
public IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson)
|
||||
{
|
||||
if (!string.Equals(driverType, _supportedType, StringComparison.Ordinal)) return null;
|
||||
Interlocked.Increment(ref CreateCount);
|
||||
return new TestDriver(driverInstanceId, driverType);
|
||||
}
|
||||
|
||||
public IReadOnlyCollection<string> SupportedTypes => new[] { _supportedType };
|
||||
}
|
||||
|
||||
private sealed class TestDriver : IDriver
|
||||
{
|
||||
public string DriverInstanceId { get; }
|
||||
public string DriverType { get; }
|
||||
public TestDriver(string id, string type) { DriverInstanceId = id; DriverType = type; }
|
||||
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, LastError: null);
|
||||
public long GetMemoryFootprint() => 0;
|
||||
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,118 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||
|
||||
public sealed class DriverSpawnPlannerTests
|
||||
{
|
||||
private static DriverInstanceSpec Spec(string id, string type = "Modbus", string config = "{\"host\":\"127.0.0.1\"}", bool enabled = true) =>
|
||||
new(Guid.NewGuid(), id, id, type, enabled, config);
|
||||
|
||||
[Fact]
|
||||
public void All_new_drivers_go_into_ToSpawn_when_current_is_empty()
|
||||
{
|
||||
var current = new Dictionary<string, DriverChildSnapshot>();
|
||||
var target = new[] { Spec("a"), Spec("b") };
|
||||
|
||||
var plan = DriverSpawnPlanner.Compute(current, target);
|
||||
|
||||
plan.ToSpawn.Count.ShouldBe(2);
|
||||
plan.ToApplyDelta.ShouldBeEmpty();
|
||||
plan.ToStop.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Same_config_yields_empty_plan()
|
||||
{
|
||||
var current = new Dictionary<string, DriverChildSnapshot>
|
||||
{
|
||||
["a"] = new("Modbus", "{\"host\":\"127.0.0.1\"}"),
|
||||
};
|
||||
var target = new[] { Spec("a") };
|
||||
|
||||
var plan = DriverSpawnPlanner.Compute(current, target);
|
||||
|
||||
plan.ToSpawn.ShouldBeEmpty();
|
||||
plan.ToApplyDelta.ShouldBeEmpty();
|
||||
plan.ToStop.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Different_config_routes_to_ApplyDelta()
|
||||
{
|
||||
var current = new Dictionary<string, DriverChildSnapshot>
|
||||
{
|
||||
["a"] = new("Modbus", "{\"host\":\"old\"}"),
|
||||
};
|
||||
var target = new[] { Spec("a", config: "{\"host\":\"new\"}") };
|
||||
|
||||
var plan = DriverSpawnPlanner.Compute(current, target);
|
||||
|
||||
plan.ToApplyDelta.Single().DriverInstanceId.ShouldBe("a");
|
||||
plan.ToSpawn.ShouldBeEmpty();
|
||||
plan.ToStop.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Removed_driver_routes_to_ToStop()
|
||||
{
|
||||
var current = new Dictionary<string, DriverChildSnapshot>
|
||||
{
|
||||
["a"] = new("Modbus", "{\"host\":\"127.0.0.1\"}"),
|
||||
["b"] = new("Modbus", "{}"),
|
||||
};
|
||||
var target = new[] { Spec("a") };
|
||||
|
||||
var plan = DriverSpawnPlanner.Compute(current, target);
|
||||
|
||||
plan.ToStop.ShouldBe(new[] { "b" });
|
||||
plan.ToSpawn.ShouldBeEmpty();
|
||||
plan.ToApplyDelta.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Disabled_driver_with_running_child_routes_to_ToStop()
|
||||
{
|
||||
var current = new Dictionary<string, DriverChildSnapshot>
|
||||
{
|
||||
["a"] = new("Modbus", "{}"),
|
||||
};
|
||||
var target = new[] { Spec("a", enabled: false) };
|
||||
|
||||
var plan = DriverSpawnPlanner.Compute(current, target);
|
||||
|
||||
plan.ToStop.Single().ShouldBe("a");
|
||||
plan.ToSpawn.ShouldBeEmpty();
|
||||
plan.ToApplyDelta.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Disabled_new_driver_is_not_spawned()
|
||||
{
|
||||
var current = new Dictionary<string, DriverChildSnapshot>();
|
||||
var target = new[] { Spec("a", enabled: false) };
|
||||
|
||||
var plan = DriverSpawnPlanner.Compute(current, target);
|
||||
|
||||
plan.ToSpawn.ShouldBeEmpty();
|
||||
plan.ToApplyDelta.ShouldBeEmpty();
|
||||
plan.ToStop.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Driver_type_change_triggers_stop_plus_respawn()
|
||||
{
|
||||
var current = new Dictionary<string, DriverChildSnapshot>
|
||||
{
|
||||
["a"] = new("Modbus", "{}"),
|
||||
};
|
||||
var target = new[] { Spec("a", type: "AbCip") };
|
||||
|
||||
var plan = DriverSpawnPlanner.Compute(current, target);
|
||||
|
||||
plan.ToStop.Single().ShouldBe("a");
|
||||
plan.ToSpawn.Single().DriverType.ShouldBe("AbCip");
|
||||
plan.ToApplyDelta.ShouldBeEmpty();
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,9 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Akka.Actor;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
@@ -13,12 +16,11 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
|
||||
public void Accepts_message_contracts_without_pinned_dispatcher_in_tests()
|
||||
{
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests());
|
||||
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaPublishActor.OpcUaQuality.Good, DateTime.UtcNow));
|
||||
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaQuality.Good, DateTime.UtcNow));
|
||||
actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=Alarm1", true, false, DateTime.UtcNow));
|
||||
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
|
||||
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240));
|
||||
|
||||
// Actor stays alive; no exceptions surface.
|
||||
ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||
}
|
||||
|
||||
@@ -28,4 +30,135 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
|
||||
var props = OpcUaPublishActor.Props();
|
||||
props.Dispatcher.ShouldBe(OpcUaPublishActor.DispatcherId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AttributeValueUpdate_routes_to_sink_WriteValue()
|
||||
{
|
||||
var sink = new RecordingSink();
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
|
||||
|
||||
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T1", 3.14, OpcUaQuality.Good, DateTime.UtcNow));
|
||||
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T2", "abc", OpcUaQuality.Uncertain, DateTime.UtcNow));
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
sink.Values.Count.ShouldBe(2);
|
||||
sink.Values[0].NodeId.ShouldBe("ns=2;s=T1");
|
||||
sink.Values[0].Value.ShouldBe(3.14);
|
||||
sink.Values[0].Quality.ShouldBe(OpcUaQuality.Good);
|
||||
sink.Values[1].Quality.ShouldBe(OpcUaQuality.Uncertain);
|
||||
}, duration: TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AlarmStateUpdate_routes_to_sink_WriteAlarmState()
|
||||
{
|
||||
var sink = new RecordingSink();
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
|
||||
|
||||
actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=A1", Active: true, Acknowledged: false, DateTime.UtcNow));
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
sink.Alarms.Count.ShouldBe(1);
|
||||
sink.Alarms[0].AlarmNodeId.ShouldBe("ns=2;s=A1");
|
||||
sink.Alarms[0].Active.ShouldBeTrue();
|
||||
sink.Alarms[0].Acknowledged.ShouldBeFalse();
|
||||
}, duration: TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RebuildAddressSpace_calls_sink_Rebuild()
|
||||
{
|
||||
var sink = new RecordingSink();
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
|
||||
|
||||
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
|
||||
|
||||
AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ServiceLevelChanged_publishes_to_IServiceLevelPublisher_once_per_unique_level()
|
||||
{
|
||||
var publisher = new RecordingPublisher();
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher));
|
||||
|
||||
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240));
|
||||
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240)); // dedup
|
||||
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(100));
|
||||
|
||||
AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240, 100 }),
|
||||
duration: TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RedundancyStateChanged_drives_local_ServiceLevel_publish_for_primary_leader()
|
||||
{
|
||||
var publisher = new RecordingPublisher();
|
||||
var local = NodeId.Parse("primary-node");
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
|
||||
serviceLevel: publisher, localNode: local));
|
||||
|
||||
var snapshot = new RedundancyStateChanged(
|
||||
Nodes: new[]
|
||||
{
|
||||
new NodeRedundancyState(local, RedundancyRole.Primary,
|
||||
IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow),
|
||||
new NodeRedundancyState(NodeId.Parse("other-node"), RedundancyRole.Secondary,
|
||||
IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow),
|
||||
},
|
||||
CorrelationId.NewId());
|
||||
actor.Tell(snapshot);
|
||||
|
||||
AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240 }),
|
||||
duration: TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RedundancyStateChanged_for_secondary_publishes_100()
|
||||
{
|
||||
var publisher = new RecordingPublisher();
|
||||
var local = NodeId.Parse("secondary-node");
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher, localNode: local));
|
||||
|
||||
var snapshot = new RedundancyStateChanged(
|
||||
Nodes: new[]
|
||||
{
|
||||
new NodeRedundancyState(local, RedundancyRole.Secondary,
|
||||
IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow),
|
||||
},
|
||||
CorrelationId.NewId());
|
||||
actor.Tell(snapshot);
|
||||
|
||||
AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 100 }),
|
||||
duration: TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
private sealed class RecordingSink : IOpcUaAddressSpaceSink
|
||||
{
|
||||
public ConcurrentQueue<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> ValueQueue { get; } = new();
|
||||
public ConcurrentQueue<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> AlarmQueue { get; } = new();
|
||||
public int RebuildCalls;
|
||||
|
||||
public List<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> Values =>
|
||||
ValueQueue.ToList();
|
||||
public List<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> Alarms =>
|
||||
AlarmQueue.ToList();
|
||||
|
||||
public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime ts) =>
|
||||
ValueQueue.Enqueue((nodeId, value, quality, ts));
|
||||
|
||||
public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime ts) =>
|
||||
AlarmQueue.Enqueue((alarmNodeId, active, acknowledged, ts));
|
||||
|
||||
public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls);
|
||||
}
|
||||
|
||||
private sealed class RecordingPublisher : IServiceLevelPublisher
|
||||
{
|
||||
private readonly ConcurrentQueue<byte> _q = new();
|
||||
public byte[] Levels => _q.ToArray();
|
||||
public void Publish(byte serviceLevel) => _q.Enqueue(serviceLevel);
|
||||
}
|
||||
}
|
||||
|
||||
+114
-1
@@ -1,9 +1,14 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms;
|
||||
|
||||
@@ -13,7 +18,6 @@ public sealed class ScriptedAlarmActorTests : RuntimeActorTestBase
|
||||
public void Full_state_cycle_publishes_StateChanged_to_parent_at_each_transition()
|
||||
{
|
||||
var parent = CreateTestProbe();
|
||||
// Wrap the alarm actor under our probe as parent so StateChanged lands on the probe.
|
||||
var actor = parent.ChildActorOf(ScriptedAlarmActor.Props("alarm-1"));
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.ConditionMet("threshold"));
|
||||
@@ -41,4 +45,113 @@ public sealed class ScriptedAlarmActorTests : RuntimeActorTestBase
|
||||
actor.Tell(new ScriptedAlarmActor.ConditionMet("second"));
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Engine_active_transition_publishes_AlarmTransitionEvent_to_alerts_topic()
|
||||
{
|
||||
var capture = new CapturingPublisher();
|
||||
var parent = CreateTestProbe();
|
||||
var config = new ScriptedAlarmActor.AlarmConfig(
|
||||
AlarmId: "alarm-7",
|
||||
AlarmName: "High Temp",
|
||||
EquipmentPath: "/site-1/line-A/oven",
|
||||
Severity: 800,
|
||||
Predicate: "temp > 80");
|
||||
var actor = parent.ChildActorOf(ScriptedAlarmActor.Props(
|
||||
config,
|
||||
evaluator: new ThresholdEvaluator(80),
|
||||
publisherFactory: () => new DPSPublisher(capture.Publish)));
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.DependencyValueChanged("temp", 92, DateTime.UtcNow));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>().State.ShouldBe(ScriptedAlarmActorState.Active);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var transitionEvt = capture.Payloads.OfType<AlarmTransitionEvent>().SingleOrDefault();
|
||||
transitionEvt.ShouldNotBeNull();
|
||||
transitionEvt.AlarmId.ShouldBe("alarm-7");
|
||||
transitionEvt.AlarmName.ShouldBe("High Temp");
|
||||
transitionEvt.EquipmentPath.ShouldBe("/site-1/line-A/oven");
|
||||
transitionEvt.Severity.ShouldBe(800);
|
||||
transitionEvt.TransitionKind.ShouldBe("Activated");
|
||||
transitionEvt.User.ShouldBe("system");
|
||||
|
||||
var log = capture.Payloads.OfType<ScriptLogEntry>().SingleOrDefault();
|
||||
log.ShouldNotBeNull();
|
||||
log.AlarmId.ShouldBe("alarm-7");
|
||||
}, duration: TimeSpan.FromSeconds(1));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Engine_clear_transition_publishes_Cleared_event()
|
||||
{
|
||||
var capture = new CapturingPublisher();
|
||||
var parent = CreateTestProbe();
|
||||
var config = new ScriptedAlarmActor.AlarmConfig("alarm-7", "High Temp", "/p", 500, "temp > 80");
|
||||
var evaluator = new ThresholdEvaluator(80);
|
||||
var actor = parent.ChildActorOf(ScriptedAlarmActor.Props(
|
||||
config, evaluator,
|
||||
publisherFactory: () => new DPSPublisher(capture.Publish)));
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.DependencyValueChanged("temp", 92, DateTime.UtcNow));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>();
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.DependencyValueChanged("temp", 70, DateTime.UtcNow));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>().State.ShouldBe(ScriptedAlarmActorState.Inactive);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var kinds = capture.Payloads.OfType<AlarmTransitionEvent>().Select(e => e.TransitionKind).ToList();
|
||||
kinds.ShouldContain("Activated");
|
||||
kinds.ShouldContain("Cleared");
|
||||
}, duration: TimeSpan.FromSeconds(1));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Manual_acknowledge_emits_Acknowledged_transition_with_user()
|
||||
{
|
||||
var capture = new CapturingPublisher();
|
||||
var parent = CreateTestProbe();
|
||||
var config = new ScriptedAlarmActor.AlarmConfig("a-1", "Pump Fail", "/eq", 700, Predicate: null);
|
||||
var actor = parent.ChildActorOf(ScriptedAlarmActor.Props(
|
||||
config, evaluator: null,
|
||||
publisherFactory: () => new DPSPublisher(capture.Publish)));
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.ConditionMet("driver-fault"));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>();
|
||||
|
||||
actor.Tell(new ScriptedAlarmActor.AcknowledgeAlarm("operator-jane"));
|
||||
parent.ExpectMsg<ScriptedAlarmActor.StateChanged>().State.ShouldBe(ScriptedAlarmActorState.Acknowledged);
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
var ackEvt = capture.Payloads.OfType<AlarmTransitionEvent>()
|
||||
.SingleOrDefault(e => e.TransitionKind == "Acknowledged");
|
||||
ackEvt.ShouldNotBeNull();
|
||||
ackEvt.User.ShouldBe("operator-jane");
|
||||
}, duration: TimeSpan.FromSeconds(1));
|
||||
}
|
||||
|
||||
private sealed class ThresholdEvaluator : IScriptedAlarmEvaluator
|
||||
{
|
||||
private readonly double _threshold;
|
||||
public ThresholdEvaluator(double threshold) { _threshold = threshold; }
|
||||
public ScriptedAlarmEvalResult Evaluate(string id, string predicate, IReadOnlyDictionary<string, object?> deps)
|
||||
{
|
||||
if (!deps.TryGetValue("temp", out var raw) || raw is null)
|
||||
return ScriptedAlarmEvalResult.Failure("missing temp");
|
||||
return ScriptedAlarmEvalResult.Ok(Convert.ToDouble(raw) > _threshold);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class CapturingPublisher
|
||||
{
|
||||
public ConcurrentBag<string> Topics { get; } = new();
|
||||
public ConcurrentBag<object> Payloads { get; } = new();
|
||||
public void Publish(string topic, object payload)
|
||||
{
|
||||
Topics.Add(topic);
|
||||
Payloads.Add(payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Akka.Actor;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
@@ -15,8 +18,100 @@ public sealed class VirtualTagActorTests : RuntimeActorTestBase
|
||||
Watch(actor);
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("tag-a", 10, DateTime.UtcNow));
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("tag-b", 20, DateTime.UtcNow));
|
||||
|
||||
// No crash, no termination.
|
||||
ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Evaluator_result_flows_to_parent_as_EvaluationResult()
|
||||
{
|
||||
var parent = CreateTestProbe();
|
||||
var evaluator = new SumEvaluator();
|
||||
var actor = parent.ChildActorOf(VirtualTagActor.Props("vt-1", "a + b", evaluator: evaluator));
|
||||
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 10, DateTime.UtcNow));
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("b", 32, DateTime.UtcNow));
|
||||
|
||||
// First dep: a alone -> 10. Second dep: a + b -> 42.
|
||||
var first = parent.ExpectMsg<VirtualTagActor.EvaluationResult>();
|
||||
first.Value.ShouldBe(10);
|
||||
var second = parent.ExpectMsg<VirtualTagActor.EvaluationResult>();
|
||||
second.Value.ShouldBe(42);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Repeated_same_value_does_not_emit_a_second_EvaluationResult()
|
||||
{
|
||||
var parent = CreateTestProbe();
|
||||
var evaluator = new ConstEvaluator(42);
|
||||
var actor = parent.ChildActorOf(VirtualTagActor.Props("vt-1", "expr", evaluator: evaluator));
|
||||
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 1, DateTime.UtcNow));
|
||||
var first = parent.ExpectMsg<VirtualTagActor.EvaluationResult>();
|
||||
first.Value.ShouldBe(42);
|
||||
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 2, DateTime.UtcNow));
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Evaluator_failure_publishes_ScriptLogEntry_warning()
|
||||
{
|
||||
var capture = new CapturingPublisher();
|
||||
var parent = CreateTestProbe();
|
||||
var actor = parent.ChildActorOf(VirtualTagActor.Props(
|
||||
"vt-1", "broken",
|
||||
evaluator: new FailingEvaluator("syntax error"),
|
||||
scriptId: "script-7",
|
||||
publisherFactory: () => new DPSPublisher(capture.Publish)));
|
||||
|
||||
actor.Tell(new VirtualTagActor.DependencyValueChanged("a", 1, DateTime.UtcNow));
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
capture.Topics.ShouldContain("script-logs");
|
||||
var entry = (ScriptLogEntry)capture.Payloads.Single();
|
||||
entry.Level.ShouldBe("Warning");
|
||||
entry.Message.ShouldContain("syntax error");
|
||||
entry.ScriptId.ShouldBe("script-7");
|
||||
entry.VirtualTagId.ShouldBe("vt-1");
|
||||
}, duration: TimeSpan.FromMilliseconds(500));
|
||||
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
|
||||
}
|
||||
|
||||
private sealed class SumEvaluator : IVirtualTagEvaluator
|
||||
{
|
||||
public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary<string, object?> deps)
|
||||
{
|
||||
var sum = deps.Values.OfType<int>().Sum();
|
||||
return VirtualTagEvalResult.Ok(sum);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class ConstEvaluator : IVirtualTagEvaluator
|
||||
{
|
||||
private readonly object _value;
|
||||
public ConstEvaluator(object value) { _value = value; }
|
||||
public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary<string, object?> deps)
|
||||
=> VirtualTagEvalResult.Ok(_value);
|
||||
}
|
||||
|
||||
private sealed class FailingEvaluator : IVirtualTagEvaluator
|
||||
{
|
||||
private readonly string _reason;
|
||||
public FailingEvaluator(string reason) { _reason = reason; }
|
||||
public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary<string, object?> deps)
|
||||
=> VirtualTagEvalResult.Failure(_reason);
|
||||
}
|
||||
|
||||
private sealed class CapturingPublisher
|
||||
{
|
||||
public ConcurrentBag<string> Topics { get; } = new();
|
||||
public ConcurrentBag<object> Payloads { get; } = new();
|
||||
public void Publish(string topic, object payload)
|
||||
{
|
||||
Topics.Add(topic);
|
||||
Payloads.Add(payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user