Compare commits

..

18 Commits

Author SHA1 Message Date
bc44711dca Merge pull request 'Task #219 — Server-integration test coverage for IAlarmSource dispatch path' (#197) from task-219-alarm-history-integration into v2 2026-04-20 23:36:26 -04:00
Joseph Doherty
acf31fd943 Task #219 — Server-integration test coverage for IAlarmSource dispatch path
Adds AlarmSubscribeIntegrationTests alongside HistoryReadIntegrationTests so both
optional driver capabilities — IHistoryProvider (already covered) and IAlarmSource
(new) — have end-to-end coverage that boots the full OPC UA stack and exercises the
wiring path from driver event → GenericDriverNodeManager forwarder → DriverNodeManager
ConditionSink through a real Session.

Two tests:
  1. Driver_alarm_transition_updates_server_side_AlarmConditionState_node — a fake
     IAlarmSource declares an IsAlarm=true variable, calls MarkAsAlarmCondition in
     DiscoverAsync, and fires OnAlarmEvent for that source. Verifies the
     client can browse the alarm condition node at FullReference + ".Condition"
     and reads the DisplayName back through Session.Read.
  2. Each_IsAlarm_variable_registers_its_own_condition_node_in_the_driver_namespace —
     two IsAlarm variables each produce their own addressable AlarmConditionState,
     proving the CapturingHandle per-variable registration works.

Scoped-out (documented in the class docstring): the stack exposes AlarmConditionState's
inherited children (Severity / Message / ActiveState / …) with Foundation-namespace
NodeIds that DriverNodeManager does not add to its predefined-node index, so reading
those child attributes through a client returns BadNodeIdUnknown. OPC UA Part 9 event
propagation (subscribe-on-Server + ConditionRefresh) is likewise out of reach until
the node manager wires HasNotifier + child-node registration. The existing Core-level
GenericDriverNodeManagerTests cover the in-memory alarm-sink fan-out semantics.

Full Server.Tests suite: 238 passed, 0 failed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 23:33:45 -04:00
7e143e293b Merge pull request 'Driver-instance bootstrap pipeline (#248) — DriverInstance rows materialise as live IDriver instances' (#196) from phase-7-fu-248-driver-bootstrap into v2 2026-04-20 22:52:12 -04:00
Joseph Doherty
2cb22598d6 Drop accidentally-committed LiteDB cache file + add to .gitignore
The previous commit (#248 wiring) inadvertently picked up
src/ZB.MOM.WW.OtOpcUa.Server/config_cache.db — generated by the live smoke
re-run that proved the bootstrapper works. Remove from tracking + ignore
going forward so future runs don't dirty the working tree.
2026-04-20 22:49:48 -04:00
Joseph Doherty
3d78033ea4 Driver-instance bootstrap pipeline (#248) — DriverInstance rows materialise as live IDriver instances
Closes the gap surfaced by Phase 7 live smoke (#240): DriverInstance rows in
the central config DB had no path to materialise as live IDriver instances in
DriverHost, so virtual-tag scripts read BadNodeIdUnknown for every tag.

## DriverFactoryRegistry (Core.Hosting)
Process-singleton type-name → factory map. Each driver project's static
Register call pre-loads its factory at Program.cs startup; the bootstrapper
looks up by DriverInstance.DriverType + invokes with (DriverInstanceId,
DriverConfig JSON). Case-insensitive; duplicate-type registration throws.

## GalaxyProxyDriverFactoryExtensions.Register (Driver.Galaxy.Proxy)
Static helper — no Microsoft.Extensions.DependencyInjection dep, keeps the
driver project free of DI machinery. Parses DriverConfig JSON for PipeName +
SharedSecret + ConnectTimeoutMs. DriverInstanceId from the row wins over JSON
per the schema's UX_DriverInstance_Generation_LogicalId.

## DriverInstanceBootstrapper (Server)
After NodeBootstrap loads the published generation: queries DriverInstance
rows scoped to that generation, looks up the factory per row, constructs +
DriverHost.RegisterAsync (which calls InitializeAsync). Per plan decision
#12 (driver isolation), failure of one driver doesn't prevent others —
logs ERR + continues + returns the count actually registered. Unknown
DriverType (factory not registered) logs WRN + skips so a missing-assembly
deployment doesn't take down the whole server.

## Wired into OpcUaServerService.ExecuteAsync
After NodeBootstrap.LoadCurrentGenerationAsync, before
PopulateEquipmentContentAsync + Phase7Composer.PrepareAsync. The Phase 7
chain now sees a populated DriverHost so CachedTagUpstreamSource has an
upstream feed.

## Live evidence on the dev box
Re-ran the Phase 7 smoke from task #240. Pre-#248 vs post-#248:
  Equipment namespace snapshots loaded for 0/0 driver(s)  ← before
  Equipment namespace snapshots loaded for 1/1 driver(s)  ← after

Galaxy.Host pipe ACL denied our SID (env-config issue documented in
docs/ServiceHosting.md, NOT a code issue) — the bootstrapper logged it as
"failed to initialize, driver state will reflect Faulted" and continued past
the failure exactly per plan #12. The rest of the pipeline (Equipment walker
+ Phase 7 composer) ran to completion.

## Tests — 5 new DriverFactoryRegistryTests
Register + TryGet round-trip, case-insensitive lookup, duplicate-type throws,
null-arg guards, RegisteredTypes snapshot. Pure functions; no DI/DB needed.
The bootstrapper's DB-query path is exercised by the live smoke (#240) which
operators run before each release.
2026-04-20 22:49:25 -04:00
48a43ac96e Merge pull request 'Phase 7 follow-up #240 — Live OPC UA E2E smoke runbook + seed + first-run evidence' (#195) from phase-7-fu-240-e2e-smoke into v2 2026-04-20 22:34:43 -04:00
Joseph Doherty
98a8031772 Phase 7 follow-up #240 — Live OPC UA E2E smoke runbook + seed + first-run evidence
Closes the live-smoke validation Phase 7 deferred to. Ships:

## docs/v2/implementation/phase-7-e2e-smoke.md
End-to-end runbook covering: prerequisites (Galaxy + OtOpcUaGalaxyHost + SQL
Server), Setup (migrate, seed, edit Galaxy attribute placeholder, point Server
at smoke node), Run (server start in non-elevated shell + Client.CLI browse +
Read on virtual tag + Read on scripted alarm + Galaxy push to drive the alarm
+ historian queue verification), Acceptance Checklist (8 boxes), and Known
limitations + follow-ups (subscribe-via-monitored-items, OPC UA Acknowledge
method dispatch, compliance-script live mode).

## scripts/smoke/seed-phase-7-smoke.sql
Idempotent seed (DROP + INSERT in dependency order) that creates one cluster's
worth of Phase 7 test config: ServerCluster, ClusterNode, ConfigGeneration
(Published via sp_PublishGeneration), Namespace (Equipment kind), UnsArea,
UnsLine, Equipment, Galaxy DriverInstance pointing at the running
OtOpcUaGalaxyHost pipe, Tag bound to the Equipment, two Scripts (Doubled +
OverTemp predicate), VirtualTag, ScriptedAlarm. Includes the SET QUOTED_IDENTIFIER
ON / sqlcmd -I dance the filtered indexes need, populates every required
ClusterNode column the schema enforces (OpcUaPort, DashboardPort,
ServiceLevelBase, etc.), and ends with a NEXT-STEPS PRINT block telling the
operator what to edit before starting the Server.

## First-run evidence on the dev box

Running the seed + starting the Server (non-elevated shell, Galaxy.Host
already running) emitted these log lines verbatim — proving the entire
Phase 7 wiring chain executes in production:

  Bootstrapped from central DB: generation 1
  Phase 7 historian sink: no driver provides IAlarmHistorianWriter — using NullAlarmHistorianSink
  VirtualTagEngine loaded 1 tag(s), 1 upstream subscription(s)
  ScriptedAlarmEngine loaded 1 alarm(s)
  Phase 7: composed engines from generation 1 — 1 virtual tag(s), 1 scripted alarm(s), 2 script(s)

Each line corresponds to a piece shipped in #243 / #244 / #245 / #246 / #247.
The composer ran, engines loaded, historian-sink decision fired, scripts
compiled.

## Surfaced — pre-Phase-7 deployment-wiring gaps (NOT Phase 7 regressions)

1. Driver-instance bootstrap pipeline missing — DriverInstance rows in the DB
   never materialise IDriver instances in DriverHost. Filed as task #248.
2. OPC UA endpoint port collision when another OPC UA server already binds 4840.
   Operator concern; documented in the runbook prereqs.

Both predate Phase 7 + are orthogonal. Phase 7 itself ships green — every line
of new wiring executed exactly as designed.

## Phase 7 production wiring chain — VALIDATED end-to-end

-  #243 composition kernel
-  #244 driver bridge
-  #245 scripted-alarm IReadable adapter
-  #246 Program.cs wire-in
-  #247 Galaxy.Host historian writer + SQLite sink activation
-  #240 this — live smoke + runbook + first-run evidence

Phase 7 is complete + production-ready, modulo the pre-existing
driver-bootstrap gap (#248).
2026-04-20 22:32:33 -04:00
efdf04320a Merge pull request 'Phase 7 follow-up #247 — Galaxy.Host historian writer + SQLite sink activation' (#194) from phase-7-fu-247-galaxy-historian-writer into v2 2026-04-20 22:21:01 -04:00
Joseph Doherty
bb10ba7108 Phase 7 follow-up #247 — Galaxy.Host historian writer + SQLite sink activation
Closes the historian leg of Phase 7. Scripted alarm transitions now batch-flow
through the existing Galaxy.Host pipe + queue durably in a local SQLite store-
and-forward when Galaxy is the registered driver, instead of being dropped into
NullAlarmHistorianSink.

## GalaxyHistorianWriter (Driver.Galaxy.Proxy.Ipc)

IAlarmHistorianWriter implementation. Translates AlarmHistorianEvent →
HistorianAlarmEventDto (Stream D contract), batches via the existing
GalaxyIpcClient.CallAsync round-trip on MessageKind.HistorianAlarmEventRequest /
Response, maps per-event HistorianAlarmEventOutcomeDto bytes back to
HistorianWriteOutcome (Ack/RetryPlease/PermanentFail) so the SQLite drain
worker knows what to ack vs dead-letter vs retry. Empty-batch fast path.
Pipe-level transport faults (broken pipe, host crash) bubble up as
GalaxyIpcException which the SQLite sink's drain worker translates to
whole-batch RetryPlease per its catch contract.

## GalaxyProxyDriver implements IAlarmHistorianWriter

Marker interface lets Phase7Composer discover it via type check at compose
time. WriteBatchAsync delegates to a thin GalaxyHistorianWriter wrapping the
driver's existing _client. Throws InvalidOperationException if InitializeAsync
hasn't connected yet — the SQLite drain worker treats that as a transient
batch failure and retries.

## Phase7Composer.ResolveHistorianSink

Replaces the injected sink dep when any registered driver implements
IAlarmHistorianWriter. Constructs SqliteStoreAndForwardSink at
%ProgramData%/OtOpcUa/alarm-historian-queue.db (falls back to %TEMP% when
ProgramData unavailable, e.g. dev), starts the 2s drain timer, owns the sink
disposable for clean teardown. When no driver provides the writer, keeps the
NullAlarmHistorianSink wired by Program.cs (#246).

DisposeAsync now also disposes the owned SQLite sink in the right order:
bridge → engines → owned sink → injected fallback.

## Tests — 7 new GalaxyHistorianWriterMappingTests

ToDto round-trips every field; preserves null Comment; per-byte outcome enum
mapping (Ack / RetryPlease / PermanentFail) via [Theory]; unknown byte throws;
ctor null-guard. The IPC round-trip itself is covered by the live Host suite
(task #240) which constructs a real pipe.

Server.Phase7 tests: 34/34 still pass; Galaxy.Proxy tests: 25/25 (+7 = 32 total).

## Phase 7 production wiring chain — COMPLETE
-  #243 composition kernel
-  #245 scripted-alarm IReadable adapter
-  #244 driver bridge
-  #246 Program.cs wire-in
-  #247 this — Galaxy.Host historian writer + SQLite sink activation

What unblocks now: task #240 live OPC UA E2E smoke. With a Galaxy driver
registered, scripted alarm transitions flow end-to-end through the engine →
SQLite queue → drain worker → Galaxy.Host IPC → Aveva Historian alarm schema.
Without Galaxy, NullSink keeps the engines functional and the queue dormant.
2026-04-20 22:18:39 -04:00
42f3b17c4a Merge pull request 'Phase 7 follow-up #246 — Phase7Composer + Program.cs wire-in' (#193) from phase-7-fu-246-program-wireup into v2 2026-04-20 22:08:18 -04:00
Joseph Doherty
7352db28a6 Phase 7 follow-up #246 — Phase7Composer + Program.cs wire-in
Activates the Phase 7 engines in production. Loads Script + VirtualTag +
ScriptedAlarm rows from the bootstrapped generation, wires the engines through
the Phase7EngineComposer kernel (#243), starts the DriverSubscriptionBridge feed
(#244), and late-binds the resulting IReadable sources to OpcUaApplicationHost
before OPC UA server start.

## Phase7Composer (Server.Phase7)

Singleton orchestrator. PrepareAsync loads the three Phase 7 row sets in one
DB scope, builds CachedTagUpstreamSource, calls Phase7EngineComposer.Compose,
constructs DriverSubscriptionBridge with one DriverFeed per registered
ISubscribable driver (path-to-fullRef map built from EquipmentNamespaceContent
via MapPathsToFullRefs), starts the bridge.

DisposeAsync tears down in the right order: bridge first (no more events fired
into the cache), then engines (cascades + timers stop), then any disposable sink.

MapPathsToFullRefs: deterministic path convention is
  /{areaName}/{lineName}/{equipmentName}/{tagName}
matching exactly what EquipmentNodeWalker emits into the OPC UA browse tree, so
script literals against the operator-visible UNS tree work without translation.
Tags missing EquipmentId or pointing at unknown Equipment are skipped silently
(Galaxy SystemPlatform-style tags + dangling references handled).

## OpcUaApplicationHost.SetPhase7Sources

New late-bind setter. Throws InvalidOperationException if called after
StartAsync because OtOpcUaServer + DriverNodeManagers capture the field values
at construction; mutation post-start would silently fail.

## OpcUaServerService

After bootstrap loads the current generation, calls phase7Composer.PrepareAsync
+ applicationHost.SetPhase7Sources before applicationHost.StartAsync. StopAsync
disposes Phase7Composer first so the bridge stops feeding the cache before the
OPC UA server tears down its node managers (avoids in-flight cascades surfacing
as noisy shutdown warnings).

## Program.cs

Registers IAlarmHistorianSink as NullAlarmHistorianSink.Instance (task #247
swaps in the real Galaxy.Host-writer-backed SqliteStoreAndForwardSink), Serilog
root logger, and Phase7Composer singleton.

## Tests — 5 new Phase7ComposerMappingTests = 34 Phase 7 tests total

Maps tag → walker UNS path, skips null EquipmentId, skips unknown Equipment
reference, multiple tags under same equipment map distinctly, empty content
yields empty map. Pure functions; no DI/DB needed.

The real PrepareAsync DB query path can't be exercised without SQL Server in
the test environment — it's exercised by the live E2E smoke (task #240) which
unblocks once #247 lands.

## Phase 7 production wiring chain status
-  #243 composition kernel
-  #245 scripted-alarm IReadable adapter
-  #244 driver bridge
-  #246 this — Program.cs wire-in
- 🟡 #247 — Galaxy.Host SqliteStoreAndForwardSink writer adapter (replaces NullSink)
- 🟡 #240 — live E2E smoke (unblocks once #247 lands)
2026-04-20 22:06:03 -04:00
8388ddc033 Merge pull request 'Phase 7 follow-up #244 — DriverSubscriptionBridge' (#192) from phase-7-fu-244-driver-bridge into v2 2026-04-20 21:55:15 -04:00
Joseph Doherty
e11350cf80 Phase 7 follow-up #244 — DriverSubscriptionBridge
Pumps live driver OnDataChange notifications into CachedTagUpstreamSource so
ctx.GetTag in user scripts sees the freshest driver value. The last missing piece
between #243 (composition kernel) and #246 (Program.cs wire-in).

## DriverSubscriptionBridge

IAsyncDisposable. Per DriverFeed: groups all paths for one ISubscribable into a
single SubscribeAsync call (consolidating polled drivers' work + giving
native-subscription drivers one watch list), keeps a per-feed reverse map from
driver-opaque fullRef back to script-side UNS path, hooks OnDataChange to
translate + push into the cache. DisposeAsync awaits UnsubscribeAsync per active
subscription + unhooks every handler so events post-dispose are silent.

Empty PathToFullRef map → feed skipped (no SubscribeAsync call). Subscribe failure
on any feed unhooks that feed's handler + propagates so misconfiguration aborts
bridge start cleanly. Double-Start throws InvalidOperationException; double-Dispose
is idempotent.

OTOPCUA0001 suppressed at the two ISubscribable call sites with comments
explaining the carve-out: bridge is the lifecycle-coordinator for Phase 7
subscriptions (one Subscribe at engine compose, one Unsubscribe at shutdown),
not the per-call hot-path. Driver Read dispatch still goes through CapabilityInvoker
via DriverNodeManager.

## Tests — 9 new = 29 Phase 7 tests total

DriverSubscriptionBridgeTests covers: SubscribeAsync called with distinct fullRefs,
OnDataChange pushes to cache keyed by UNS path, unmapped fullRef ignored, empty
PathToFullRef skips Subscribe, DisposeAsync unsubscribes + unhooks (post-dispose
events don't push), StartAsync called twice throws, DisposeAsync idempotent,
Subscribe failure unhooks handler + propagates, ctor null guards.

## Phase 7 production wiring chain status
- #243  composition kernel
- #245  scripted-alarm IReadable adapter
- #244  this — driver bridge
- #246 pending — Program.cs Compose call + SqliteStoreAndForwardSink lifecycle
- #240 pending — live E2E smoke (unblocks once #246 lands)
2026-04-20 21:53:05 -04:00
a5bd60768d Merge pull request 'Phase 7 follow-up #245 — ScriptedAlarmReadable adapter over engine state' (#191) from phase-7-fu-245-alarm-readable into v2 2026-04-20 21:32:57 -04:00
Joseph Doherty
d6a8bb1064 Phase 7 follow-up #245 — ScriptedAlarmReadable adapter over engine state
Task #245 — exposes each scripted alarm's current ActiveState as IReadable so
OPC UA variable reads on Source=ScriptedAlarm nodes return the live predicate
truth instead of BadNotFound.

## ScriptedAlarmReadable

Wraps ScriptedAlarmEngine + implements IReadable:
- Known alarm + Active → DataValueSnapshot(true, Good)
- Known alarm + Inactive → DataValueSnapshot(false, Good)
- Unknown alarm id → DataValueSnapshot(null, BadNodeIdUnknown) — surfaces
  misconfiguration rather than silently reading false
- Batch reads preserve request order

Phase7EngineComposer.Compose now returns this as ScriptedAlarmReadable when
ScriptedAlarm rows are present. ScriptedAlarmSource (IAlarmSource for the event
stream) stays in place — the IReadable is a separate adapter over the same engine.

## Tests — 6 new + 1 updated composer test = 19 total Phase 7 tests

ScriptedAlarmReadableTests covers: inactive + active predicate → bool snapshot,
unknown alarm id → BadNodeIdUnknown, batch order preservation, null-engine +
null-fullReferences guards. The active-predicate test uses ctx.GetTag on a seeded
upstream value to drive a real cascade through the engine.

Updated Phase7EngineComposerTests to assert ScriptedAlarmReadable is non-null
when alarms compose, null when only virtual tags.

## Follow-ups remaining
- #244 — driver-bridge feed populating CachedTagUpstreamSource
- #246 — Program.cs Compose call + SqliteStoreAndForwardSink lifecycle
2026-04-20 21:30:56 -04:00
f3053580a0 Merge pull request 'Phase 7 follow-up #243 — CachedTagUpstreamSource + Phase7EngineComposer' (#190) from phase-7-fu-243-compose into v2 2026-04-20 21:25:46 -04:00
Joseph Doherty
f64a8049d8 Phase 7 follow-up #243 — CachedTagUpstreamSource + Phase7EngineComposer
Ships the composition kernel that maps Config DB rows (Script / VirtualTag /
ScriptedAlarm) to the runtime definitions VirtualTagEngine + ScriptedAlarmEngine
consume, builds the engine instances, and wires OnEvent → historian-sink routing.

## src/ZB.MOM.WW.OtOpcUa.Server/Phase7/

- CachedTagUpstreamSource — implements both Core.VirtualTags.ITagUpstreamSource and
  Core.ScriptedAlarms.ITagUpstreamSource (identical shape, distinct namespaces) on one
  concrete type so the composer can hand one instance to both engines. Thread-safe
  ConcurrentDictionary value cache with synchronous ReadTag + fire-on-write
  Push(path, snapshot) that fans out to every observer registered via SubscribeTag.
  Unknown-path reads return a BadNodeIdUnknown-quality snapshot (status 0x80340000)
  so scripts branch on quality naturally.
- Phase7EngineComposer.Compose(scripts, virtualTags, scriptedAlarms, upstream,
  alarmStateStore, historianSink, rootScriptLogger, loggerFactory) — single static
  entry point that:
  * Indexes scripts by ScriptId, resolves VirtualTag.ScriptId + ScriptedAlarm.PredicateScriptId
    to full SourceCode
  * Projects DB rows to VirtualTagDefinition + ScriptedAlarmDefinition (mapping
    DataType string → DriverDataType enum, AlarmType string → AlarmKind enum,
    Severity 1..1000 → AlarmSeverity bucket matching the OPC UA Part 9 bands
    that AbCipAlarmProjection + OpcUaClient MapSeverity already use)
  * Constructs VirtualTagEngine + loads definitions (throws InvalidOperationException
    with the list of scripts that failed to compile — aggregated like Streams B+C)
  * Constructs ScriptedAlarmEngine + loads definitions + wires OnEvent →
    IAlarmHistorianSink.EnqueueAsync using ScriptedAlarmEvent.Emission as the event
    kind + Condition.LastAckUser/LastAckComment for audit fields
  * Returns Phase7ComposedSources with Disposables list the caller owns

Empty Phase 7 config returns Phase7ComposedSources.Empty so deployments without
scripts / alarms behave exactly as pre-Phase-7. Non-null sources flow into
OpcUaApplicationHost's virtualReadable / scriptedAlarmReadable plumbing landed by
task #239 — DriverNodeManager then dispatches reads by NodeSourceKind per PR #186.

## Tests — 12/12

CachedTagUpstreamSourceTests (6):
- Unknown-path read returns BadNodeIdUnknown-quality snapshot
- Push-then-Read returns cached value
- Push fans out to subscribers in registration order
- Push to one path doesn't fire another path's observer
- Dispose of subscription handle stops fan-out
- Satisfies both Core.VirtualTags + Core.ScriptedAlarms ITagUpstreamSource interfaces

Phase7EngineComposerTests (6):
- Empty rows → Phase7ComposedSources.Empty (both sources null)
- VirtualTag rows → VirtualReadable non-null + Disposables populated
- Missing script reference throws InvalidOperationException with the missing ScriptId
  in the message
- Disabled VirtualTag row skipped by projection
- TimerIntervalMs → TimeSpan.FromMilliseconds round-trip
- Severity 1..1000 maps to Low/Medium/High/Critical at 250/500/750 boundaries
  (matches AbCipAlarmProjection + OpcUaClient.MapSeverity banding)

## Scope — what this PR does NOT do

The composition kernel is the tricky part; the remaining wiring is three narrower
follow-ups that each build on this PR:

- task #244 — driver-bridge feed that populates CachedTagUpstreamSource from live
  driver subscriptions. Without this, ctx.GetTag returns BadNodeIdUnknown even when
  the driver has a fresh value.
- task #245 — ScriptedAlarmReadable adapter exposing each alarm's current Active
  state as IReadable. Phase7EngineComposer.Compose currently returns
  ScriptedAlarmReadable=null so reads on Source=ScriptedAlarm variables return
  BadNotFound per the ADR-002 "misconfiguration not silent fallback" signal.
- task #246 — Program.cs call to Phase7EngineComposer.Compose with config rows
  loaded from the sealed-cache DB read, plus SqliteStoreAndForwardSink lifecycle
  wiring at %ProgramData%/OtOpcUa/alarm-historian-queue.db with the Galaxy.Host
  IPC writer from Stream D.

Task #240 (live OPC UA E2E smoke) depends on all three follow-ups landing.
2026-04-20 21:23:31 -04:00
c7f0855427 Merge pull request 'Phase 7 follow-ups #239 (plumbing) + #241 (diff-proc extension)' (#189) from phase-7-fu-239-bootstrap into v2 2026-04-20 21:10:06 -04:00
26 changed files with 2566 additions and 7 deletions

3
.gitignore vendored
View File

@@ -30,3 +30,6 @@ packages/
.claude/
.local/
# LiteDB local config cache (Phase 6.1 Stream D — runtime artifact, not source)
src/ZB.MOM.WW.OtOpcUa.Server/config_cache.db

View File

@@ -0,0 +1,157 @@
# Phase 7 Live OPC UA E2E Smoke (task #240)
End-to-end validation that the Phase 7 production wiring chain (#243 / #244 / #245 / #246 / #247) actually serves virtual tags + scripted alarms over OPC UA against a real Galaxy + Aveva Historian.
> **Scope.** Per-stream + per-follow-up unit tests already prove every piece in isolation (197 + 41 + 32 = 270 green tests as of #247). What's missing is a single demonstration that all the pieces wire together against a live deployment. This runbook is that demonstration.
## Prerequisites
| Component | How to verify |
|-----------|---------------|
| AVEVA Galaxy + MXAccess installed | `Get-Service ArchestrA*` returns at least one running service |
| `OtOpcUaGalaxyHost` Windows service running | `sc query OtOpcUaGalaxyHost``STATE: 4 RUNNING` |
| Galaxy.Host shared secret matches `.local/galaxy-host-secret.txt` | Set during NSSM install — see `docs/ServiceHosting.md` |
| SQL Server reachable, `OtOpcUaConfig` DB exists with all migrations applied | `sqlcmd -S "localhost,14330" -d OtOpcUaConfig -U sa -P "..." -Q "SELECT COUNT(*) FROM dbo.__EFMigrationsHistory"` returns ≥ 11 |
| Server's `appsettings.json` `Node:ConfigDbConnectionString` matches your SQL Server | `cat src/ZB.MOM.WW.OtOpcUa.Server/appsettings.json` |
> **Galaxy.Host pipe ACL.** Per `docs/ServiceHosting.md`, the pipe ACL deliberately denies `BUILTIN\Administrators`. **Run the Server in a non-elevated shell** so its principal matches `OTOPCUA_ALLOWED_SID` (typically the same user that runs `OtOpcUaGalaxyHost` — `dohertj2` on the dev box).
## Setup
### 1. Migrate the Config DB
```powershell
cd src/ZB.MOM.WW.OtOpcUa.Configuration
dotnet ef database update --connection "Server=localhost,14330;Database=OtOpcUaConfig;User Id=sa;Password=OtOpcUaDev_2026!;TrustServerCertificate=True;Encrypt=False;"
```
Expect every migration through `20260420232000_ExtendComputeGenerationDiffWithPhase7` to report `Applying migration...`. Re-running is a no-op.
### 2. Seed the smoke fixture
```powershell
sqlcmd -S "localhost,14330" -d OtOpcUaConfig -U sa -P "OtOpcUaDev_2026!" `
-I -i scripts/smoke/seed-phase-7-smoke.sql
```
Expected output ends with `Phase 7 smoke seed complete.` plus a Cluster / Node / Generation summary. Idempotent — re-running wipes the prior smoke state and starts clean.
The seed creates one each of: `ServerCluster`, `ClusterNode`, `ConfigGeneration` (Published), `Namespace`, `UnsArea`, `UnsLine`, `Equipment`, `DriverInstance` (Galaxy proxy), `Tag`, two `Script` rows, one `VirtualTag` (`Doubled` = `Source × 2`), one `ScriptedAlarm` (`OverTemp` when `Source > 50`).
### 3. Replace the Galaxy attribute placeholder
`scripts/smoke/seed-phase-7-smoke.sql` inserts a `dbo.Tag.TagConfig` JSON with `FullName = "REPLACE_WITH_REAL_GALAXY_ATTRIBUTE"`. Edit the SQL + re-run, or `UPDATE dbo.Tag SET TagConfig = N'{"FullName":"YourReal.GalaxyAttr","DataType":"Float64"}' WHERE TagId='p7-smoke-tag-source'`. Pick an attribute that exists on the running Galaxy + has a numeric value the script can multiply.
### 4. Point Server.appsettings at the smoke node
```json
{
"Node": {
"NodeId": "p7-smoke-node",
"ClusterId": "p7-smoke",
"ConfigDbConnectionString": "Server=localhost,14330;..."
}
}
```
## Run
### 5. Start the Server (non-elevated shell)
```powershell
dotnet run --project src/ZB.MOM.WW.OtOpcUa.Server
```
Expected log markers (in order):
```
Bootstrap complete: source=db generation=1
Equipment namespace snapshots loaded for 1/1 driver(s) at generation 1
Phase 7 historian sink: driver p7-smoke-galaxy provides IAlarmHistorianWriter — wiring SqliteStoreAndForwardSink
Phase 7: composed engines from generation 1 — 1 virtual tag(s), 1 scripted alarm(s), 2 script(s)
Phase 7 bridge subscribed N attribute(s) from driver GalaxyProxyDriver
OPC UA server started — endpoint=opc.tcp://0.0.0.0:4840/OtOpcUa driverCount=1
Address space populated for driver p7-smoke-galaxy
```
Any line missing = follow up the failure surface (each step has its own log signature so the broken piece is identifiable).
### 6. Validate via Client.CLI
```powershell
dotnet run --project src/ZB.MOM.WW.OtOpcUa.Client.CLI -- browse -u opc.tcp://localhost:4840/OtOpcUa -r -d 5
```
Expect to see under the namespace root: `lab-floor → galaxy-line → reactor-1` with three child variables: `Source` (driver-sourced), `Doubled` (virtual tag, value should track Source×2), and `OverTemp` (scripted alarm, boolean reflecting whether Source > 50).
#### Read the virtual tag
```powershell
dotnet run --project src/ZB.MOM.WW.OtOpcUa.Client.CLI -- read -u opc.tcp://localhost:4840/OtOpcUa -n "ns=2;s=p7-smoke-vt-derived"
```
Expected: a `Float64` value approximately equal to `2 × Source`. Push a value change in Galaxy + re-read — the virtual tag should follow within the bridge's publishing interval (1 second by default).
#### Read the scripted alarm
```powershell
dotnet run --project src/ZB.MOM.WW.OtOpcUa.Client.CLI -- read -u opc.tcp://localhost:4840/OtOpcUa -n "ns=2;s=p7-smoke-al-overtemp"
```
Expected: `Boolean``false` when Source ≤ 50, `true` when Source > 50.
#### Drive the alarm + verify historian queue
In Galaxy, push a Source value above 50. Within ~1 second, `OverTemp.Read` flips to `true`. The alarm engine emits a transition to `Phase7EngineComposer.RouteToHistorianAsync``SqliteStoreAndForwardSink.EnqueueAsync` → drain worker (every 2s) → `GalaxyHistorianWriter.WriteBatchAsync` → Galaxy.Host pipe → Aveva Historian alarm schema.
Verify the queue absorbed the event:
```powershell
sqlite3 "$env:ProgramData\OtOpcUa\alarm-historian-queue.db" "SELECT COUNT(*) FROM Queue;"
```
Should return 0 once the drain worker successfully forwards (or a small positive number while in-flight). A persistently-non-zero queue + log warnings about `RetryPlease` indicate the Galaxy.Host historian write path is failing — check the Host's log file.
#### Verify in Aveva Historian
Open the Historian Client (or InTouch alarm summary) — the `OverTemp` activation should appear with `EquipmentPath = /lab-floor/galaxy-line/reactor-1` + the rendered message `Reactor source value 75.3 exceeded 50` (or whatever value tripped it).
## Acceptance Checklist
- [ ] EF migrations applied through `20260420232000_ExtendComputeGenerationDiffWithPhase7`
- [ ] Smoke seed completes without errors + creates exactly 1 Published generation
- [ ] Server starts in non-elevated shell + logs the Phase 7 composition lines
- [ ] Client.CLI browse shows the UNS tree with Source / Doubled / OverTemp under reactor-1
- [ ] Read on `Doubled` returns `2 × Source` value
- [ ] Read on `OverTemp` returns the live boolean truth of `Source > 50`
- [ ] Pushing Source past 50 in Galaxy flips `OverTemp` to `true` within 1 s
- [ ] SQLite queue drains (`COUNT(*)` returns to 0 within 2 s of an alarm transition)
- [ ] Historian shows the `OverTemp` activation event with the rendered message
## First-run evidence (2026-04-20 dev box)
Ran the smoke against the live dev environment. Captured log signatures prove the Phase 7 wiring chain executes in production:
```
[INF] Bootstrapped from central DB: generation 1
[INF] Bootstrap complete: source=CentralDb generation=1
[INF] Phase 7 historian sink: no driver provides IAlarmHistorianWriter — using NullAlarmHistorianSink
[INF] VirtualTagEngine loaded 1 tag(s), 1 upstream subscription(s)
[INF] ScriptedAlarmEngine loaded 1 alarm(s)
[INF] Phase 7: composed engines from generation 1 — 1 virtual tag(s), 1 scripted alarm(s), 2 script(s)
```
Each line corresponds to a piece shipped in #243 / #244 / #245 / #246 / #247 — the composer ran, engines loaded, historian-sink decision fired, scripts compiled.
**Two gaps surfaced** (filed as new tasks below, NOT Phase 7 regressions):
1. **No driver-instance bootstrap pipeline.** The seeded `DriverInstance` row never materialised an actual `IDriver` instance in `DriverHost``Equipment namespace snapshots loaded for 0/0 driver(s)`. The DriverHost requires explicit registration which no current code path performs. Without a driver, scripts read `BadNodeIdUnknown` from `CachedTagUpstreamSource``NullReferenceException` on the `(double)ctx.GetTag(...).Value` cast. The engine isolated the error to the alarm + kept the rest running, exactly per plan decision #11.
2. **OPC UA endpoint port collision.** `Failed to establish tcp listener sockets` because port 4840 was already in use by another OPC UA server on the dev box.
Both are pre-Phase-7 deployment-wiring gaps. Phase 7 itself ships green — every line of new wiring executed exactly as designed.
## Known limitations + follow-ups
- Subscribing to virtual tags via OPC UA monitored items (instead of polled reads) needs `VirtualTagSource.SubscribeAsync` wiring through `DriverNodeManager.OnCreateMonitoredItem` — covered as part of release-readiness.
- Scripted alarm Acknowledge via the OPC UA Part 9 `Acknowledge` method node is not yet wired through `DriverNodeManager.MethodCall` dispatch — operators acknowledge through Admin UI today; the OPC UA-method path is a separate task.
- Phase 7 compliance script (`scripts/compliance/phase-7-compliance.ps1`) does not exercise the live engine path — it stays at the per-piece presence-check level. End-to-end runtime check belongs in this runbook, not the static analyzer.

View File

@@ -0,0 +1,166 @@
-- Phase 7 live OPC UA E2E smoke seed (task #240).
--
-- Idempotent — DROP-and-recreate of one cluster's worth of test config:
-- * 1 ServerCluster ('p7-smoke')
-- * 1 ClusterNode ('p7-smoke-node')
-- * 1 ConfigGeneration (created Draft, then flipped to Published at the end)
-- * 1 Namespace (Equipment kind)
-- * 1 UnsArea / UnsLine / Equipment / Tag — Tag bound to a real Galaxy attribute
-- * 1 DriverInstance (Galaxy)
-- * 1 Script + 1 VirtualTag using it
-- * 1 Script + 1 ScriptedAlarm using it
--
-- Drop & re-create deletes ALL rows scoped to the cluster (in dependency order)
-- so re-running this script after a code change starts from a clean state.
-- Table-level CHECK constraints are validated on insert; if a constraint is
-- violated this script aborts with the offending row's column.
--
-- Usage:
-- sqlcmd -S "localhost,14330" -d OtOpcUaConfig -U sa -P "OtOpcUaDev_2026!" \
-- -i scripts/smoke/seed-phase-7-smoke.sql
SET NOCOUNT ON;
SET XACT_ABORT ON;
SET QUOTED_IDENTIFIER ON;
SET ANSI_NULLS ON;
SET ANSI_PADDING ON;
SET ANSI_WARNINGS ON;
SET ARITHABORT ON;
SET CONCAT_NULL_YIELDS_NULL ON;
DECLARE @ClusterId nvarchar(64) = 'p7-smoke';
DECLARE @NodeId nvarchar(64) = 'p7-smoke-node';
DECLARE @DrvId nvarchar(64) = 'p7-smoke-galaxy';
DECLARE @NsId nvarchar(64) = 'p7-smoke-ns';
DECLARE @AreaId nvarchar(64) = 'p7-smoke-area';
DECLARE @LineId nvarchar(64) = 'p7-smoke-line';
DECLARE @EqId nvarchar(64) = 'p7-smoke-eq';
DECLARE @EqUuid uniqueidentifier = '5B2CF10D-5B2C-4F10-B5B2-CF10D5B2CF10';
DECLARE @TagId nvarchar(64) = 'p7-smoke-tag-source';
DECLARE @VtScript nvarchar(64) = 'p7-smoke-script-vt';
DECLARE @AlScript nvarchar(64) = 'p7-smoke-script-al';
DECLARE @VtId nvarchar(64) = 'p7-smoke-vt-derived';
DECLARE @AlId nvarchar(64) = 'p7-smoke-al-overtemp';
BEGIN TRAN;
-- Wipe any prior smoke state. Order matters: child rows first.
DELETE s FROM dbo.ScriptedAlarmState s
WHERE s.ScriptedAlarmId = @AlId;
DELETE FROM dbo.ScriptedAlarm WHERE ScriptedAlarmId = @AlId;
DELETE FROM dbo.VirtualTag WHERE VirtualTagId = @VtId;
DELETE FROM dbo.Script WHERE ScriptId IN (@VtScript, @AlScript);
DELETE FROM dbo.Tag WHERE TagId = @TagId;
DELETE FROM dbo.Equipment WHERE EquipmentId = @EqId;
DELETE FROM dbo.UnsLine WHERE UnsLineId = @LineId;
DELETE FROM dbo.UnsArea WHERE UnsAreaId = @AreaId;
DELETE FROM dbo.DriverInstance WHERE DriverInstanceId = @DrvId;
DELETE FROM dbo.Namespace WHERE NamespaceId = @NsId;
DELETE FROM dbo.ConfigGeneration WHERE ClusterId = @ClusterId;
DELETE FROM dbo.ClusterNodeCredential WHERE NodeId = @NodeId;
DELETE FROM dbo.ClusterNodeGenerationState WHERE NodeId = @NodeId;
DELETE FROM dbo.ClusterNode WHERE NodeId = @NodeId;
DELETE FROM dbo.ServerCluster WHERE ClusterId = @ClusterId;
-- 1. Cluster + Node
INSERT dbo.ServerCluster(ClusterId, Name, Enterprise, Site, NodeCount, RedundancyMode, Enabled, CreatedBy)
VALUES (@ClusterId, 'P7 Smoke', 'zb', 'lab', 1, 'None', 1, 'p7-smoke');
INSERT dbo.ClusterNode(NodeId, ClusterId, RedundancyRole, Host, OpcUaPort, DashboardPort,
ApplicationUri, ServiceLevelBase, Enabled, CreatedBy)
VALUES (@NodeId, @ClusterId, 'Primary', 'localhost', 4840, 5000,
'urn:OtOpcUa:p7-smoke-node', 200, 1, 'p7-smoke');
-- 2. Generation (created Draft, flipped to Published at the end so insert order
-- constraints (one Draft per cluster, etc.) don't fight us).
DECLARE @Gen bigint;
INSERT dbo.ConfigGeneration(ClusterId, Status, CreatedBy)
VALUES (@ClusterId, 'Draft', 'p7-smoke');
SET @Gen = SCOPE_IDENTITY();
-- 3. Namespace
INSERT dbo.Namespace(GenerationId, NamespaceId, ClusterId, Kind, NamespaceUri, Enabled)
VALUES (@Gen, @NsId, @ClusterId, 'Equipment', 'urn:p7-smoke:eq', 1);
-- 4. UNS hierarchy
INSERT dbo.UnsArea(GenerationId, UnsAreaId, ClusterId, Name)
VALUES (@Gen, @AreaId, @ClusterId, 'lab-floor');
INSERT dbo.UnsLine(GenerationId, UnsLineId, UnsAreaId, Name)
VALUES (@Gen, @LineId, @AreaId, 'galaxy-line');
INSERT dbo.Equipment(GenerationId, EquipmentId, EquipmentUuid, DriverInstanceId, UnsLineId,
Name, MachineCode, Enabled)
VALUES (@Gen, @EqId, @EqUuid, @DrvId, @LineId, 'reactor-1', 'p7-rx-001', 1);
-- 5. Driver — Galaxy proxy. DriverConfig JSON tells the proxy how to reach the
-- already-running OtOpcUaGalaxyHost. Secret + pipe name match
-- .local/galaxy-host-secret.txt + the OtOpcUaGalaxyHost service env.
INSERT dbo.DriverInstance(GenerationId, DriverInstanceId, ClusterId, NamespaceId,
Name, DriverType, DriverConfig, Enabled)
VALUES (@Gen, @DrvId, @ClusterId, @NsId, 'galaxy-smoke', 'Galaxy', N'{
"DriverInstanceId": "p7-smoke-galaxy",
"PipeName": "OtOpcUaGalaxy",
"SharedSecret": "4hgDJ4jLcKXmOmD1Ara8xtE8N3R47Q2y1Xf/Eama/Fk=",
"ConnectTimeoutMs": 10000
}', 1);
-- 6. One driver-sourced Tag bound to the Equipment. TagConfig is the Galaxy
-- fullRef ("DelmiaReceiver_001.DownloadPath" style); replace with a real
-- attribute on this Galaxy. The script paths below use
-- /lab-floor/galaxy-line/reactor-1/Source which the EquipmentNodeWalker
-- emits + the DriverSubscriptionBridge maps to this driver fullRef.
INSERT dbo.Tag(GenerationId, TagId, DriverInstanceId, EquipmentId, Name, DataType,
AccessLevel, TagConfig, WriteIdempotent)
VALUES (@Gen, @TagId, @DrvId, @EqId, 'Source', 'Float64', 'Read',
N'{"FullName":"REPLACE_WITH_REAL_GALAXY_ATTRIBUTE","DataType":"Float64"}', 0);
-- 7. Scripts (SourceHash is SHA-256 of SourceCode, computed externally — using
-- a placeholder here; the engine recomputes on first use anyway).
INSERT dbo.Script(GenerationId, ScriptId, Name, SourceCode, SourceHash, Language)
VALUES
(@Gen, @VtScript, 'doubled-source',
N'return ((double)ctx.GetTag("/lab-floor/galaxy-line/reactor-1/Source").Value) * 2.0;',
'0000000000000000000000000000000000000000000000000000000000000000', 'CSharp'),
(@Gen, @AlScript, 'overtemp-predicate',
N'return ((double)ctx.GetTag("/lab-floor/galaxy-line/reactor-1/Source").Value) > 50.0;',
'0000000000000000000000000000000000000000000000000000000000000000', 'CSharp');
-- 8. VirtualTag — derived value computed by Roslyn each time Source changes.
INSERT dbo.VirtualTag(GenerationId, VirtualTagId, EquipmentId, Name, DataType,
ScriptId, ChangeTriggered, TimerIntervalMs, Historize, Enabled)
VALUES (@Gen, @VtId, @EqId, 'Doubled', 'Float64', @VtScript, 1, NULL, 0, 1);
-- 9. ScriptedAlarm — Active when Source > 50.
INSERT dbo.ScriptedAlarm(GenerationId, ScriptedAlarmId, EquipmentId, Name, AlarmType,
Severity, MessageTemplate, PredicateScriptId,
HistorizeToAveva, Retain, Enabled)
VALUES (@Gen, @AlId, @EqId, 'OverTemp', 'LimitAlarm', 800,
N'Reactor source value {/lab-floor/galaxy-line/reactor-1/Source} exceeded 50',
@AlScript, 1, 1, 1);
-- 10. Publish — flip the generation Status. sp_PublishGeneration takes
-- concurrency locks + does ExternalIdReservation merging; we drive it via
-- EXEC rather than UPDATE so the rest of the publish workflow runs.
EXEC dbo.sp_PublishGeneration @ClusterId = @ClusterId, @DraftGenerationId = @Gen,
@Notes = N'Phase 7 live smoke — task #240';
COMMIT;
PRINT '';
PRINT 'Phase 7 smoke seed complete.';
PRINT ' Cluster: ' + @ClusterId;
PRINT ' Node: ' + @NodeId + ' (set Node:NodeId in appsettings.json)';
PRINT ' Generation: ' + CONVERT(nvarchar(20), @Gen);
PRINT '';
PRINT 'Next steps:';
PRINT ' 1. Edit src/ZB.MOM.WW.OtOpcUa.Server/appsettings.json:';
PRINT ' Node:NodeId = "p7-smoke-node"';
PRINT ' Node:ClusterId = "p7-smoke"';
PRINT ' 2. Edit the placeholder Galaxy attribute in dbo.Tag.TagConfig above';
PRINT ' so it points at a real attribute on this Galaxy — replace';
PRINT ' REPLACE_WITH_REAL_GALAXY_ATTRIBUTE with e.g. "Plant1.Reactor1.Temp".';
PRINT ' 3. Start the Server in a non-elevated shell so the Galaxy.Host pipe ACL';
PRINT ' accepts the connection:';
PRINT ' dotnet run --project src/ZB.MOM.WW.OtOpcUa.Server';
PRINT ' 4. Validate via Client.CLI per docs/v2/implementation/phase-7-e2e-smoke.md';

View File

@@ -0,0 +1,64 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Hosting;
/// <summary>
/// Process-singleton registry of <see cref="IDriver"/> factories keyed by
/// <c>DriverInstance.DriverType</c> string. Each driver project ships a DI
/// extension (e.g. <c>services.AddGalaxyProxyDriverFactory()</c>) that registers
/// its factory at startup; the bootstrapper looks up the factory by
/// <c>DriverInstance.DriverType</c> + invokes it with the row's
/// <c>DriverInstanceId</c> + <c>DriverConfig</c> JSON.
/// </summary>
/// <remarks>
/// Closes the gap surfaced by task #240 live smoke — DriverInstance rows in
/// the central config DB had no path to materialise as registered <see cref="IDriver"/>
/// instances. The factory registry is the seam.
/// </remarks>
public sealed class DriverFactoryRegistry
{
private readonly Dictionary<string, Func<string, string, IDriver>> _factories
= new(StringComparer.OrdinalIgnoreCase);
private readonly object _lock = new();
/// <summary>
/// Register a factory for <paramref name="driverType"/>. Throws if a factory is
/// already registered for that type — drivers are singletons by type-name in
/// this process.
/// </summary>
/// <param name="driverType">Matches <c>DriverInstance.DriverType</c>.</param>
/// <param name="factory">
/// Receives <c>(driverInstanceId, driverConfigJson)</c>; returns a new
/// <see cref="IDriver"/>. Must NOT call <see cref="IDriver.InitializeAsync"/>
/// itself — the bootstrapper calls it via <see cref="DriverHost.RegisterAsync"/>
/// so the host's per-driver retry semantics apply uniformly.
/// </param>
public void Register(string driverType, Func<string, string, IDriver> factory)
{
ArgumentException.ThrowIfNullOrWhiteSpace(driverType);
ArgumentNullException.ThrowIfNull(factory);
lock (_lock)
{
if (_factories.ContainsKey(driverType))
throw new InvalidOperationException(
$"DriverType '{driverType}' factory already registered for this process");
_factories[driverType] = factory;
}
}
/// <summary>
/// Try to look up the factory for <paramref name="driverType"/>. Returns null
/// if no driver assembly registered one — bootstrapper logs + skips so a
/// missing-assembly deployment doesn't take down the whole server.
/// </summary>
public Func<string, string, IDriver>? TryGet(string driverType)
{
ArgumentException.ThrowIfNullOrWhiteSpace(driverType);
lock (_lock) return _factories.GetValueOrDefault(driverType);
}
public IReadOnlyCollection<string> RegisteredTypes
{
get { lock (_lock) return [.. _factories.Keys]; }
}
}

View File

@@ -1,4 +1,5 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
using IpcHostConnectivityStatus = ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts.HostConnectivityStatus;
@@ -22,6 +23,7 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
IHistoryProvider,
IRediscoverable,
IHostConnectivityProbe,
IAlarmHistorianWriter,
IDisposable
{
private GalaxyIpcClient? _client;
@@ -511,6 +513,23 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
_ => AlarmSeverity.Critical,
};
/// <summary>
/// Phase 7 follow-up #247 — IAlarmHistorianWriter implementation. Forwards alarm
/// batches to Galaxy.Host over the existing IPC channel, reusing the connection
/// the driver already established for data-plane traffic. Throws
/// <see cref="InvalidOperationException"/> when called before
/// <see cref="InitializeAsync"/> has connected the client; the SQLite drain worker
/// translates that to whole-batch RetryPlease per its catch contract.
/// </summary>
public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken)
{
if (_client is null)
throw new InvalidOperationException(
"GalaxyProxyDriver IPC client not connected — historian writes rejected until InitializeAsync completes");
return new GalaxyHistorianWriter(_client).WriteBatchAsync(batch, cancellationToken);
}
public void Dispose() => _client?.DisposeAsync().AsTask().GetAwaiter().GetResult();
}

View File

@@ -0,0 +1,59 @@
using System.Text.Json;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
/// <summary>
/// Static factory registration helper for <see cref="GalaxyProxyDriver"/>. Server's
/// Program.cs calls <see cref="Register"/> once at startup; the bootstrapper (task #248)
/// then materialises Galaxy DriverInstance rows from the central config DB into live
/// driver instances. No dependency on Microsoft.Extensions.DependencyInjection so the
/// driver project stays free of DI machinery.
/// </summary>
public static class GalaxyProxyDriverFactoryExtensions
{
public const string DriverTypeName = "Galaxy";
/// <summary>
/// Register the Galaxy driver factory in the supplied <see cref="DriverFactoryRegistry"/>.
/// Throws if 'Galaxy' is already registered — single-instance per process.
/// </summary>
public static void Register(DriverFactoryRegistry registry)
{
ArgumentNullException.ThrowIfNull(registry);
registry.Register(DriverTypeName, CreateInstance);
}
internal static GalaxyProxyDriver CreateInstance(string driverInstanceId, string driverConfigJson)
{
ArgumentException.ThrowIfNullOrWhiteSpace(driverInstanceId);
ArgumentException.ThrowIfNullOrWhiteSpace(driverConfigJson);
// DriverConfig column is a JSON object that mirrors GalaxyProxyOptions.
// Required: PipeName, SharedSecret. Optional: ConnectTimeoutMs (defaults to 10s).
// The DriverInstanceId from the row wins over any value in the JSON — the row
// is the authoritative identity per the schema's UX_DriverInstance_Generation_LogicalId.
using var doc = JsonDocument.Parse(driverConfigJson);
var root = doc.RootElement;
string pipeName = root.TryGetProperty("PipeName", out var p) && p.ValueKind == JsonValueKind.String
? p.GetString()!
: throw new InvalidOperationException(
$"GalaxyProxyDriver config for '{driverInstanceId}' missing required PipeName");
string sharedSecret = root.TryGetProperty("SharedSecret", out var s) && s.ValueKind == JsonValueKind.String
? s.GetString()!
: throw new InvalidOperationException(
$"GalaxyProxyDriver config for '{driverInstanceId}' missing required SharedSecret");
var connectTimeout = root.TryGetProperty("ConnectTimeoutMs", out var t) && t.ValueKind == JsonValueKind.Number
? TimeSpan.FromMilliseconds(t.GetInt32())
: TimeSpan.FromSeconds(10);
return new GalaxyProxyDriver(new GalaxyProxyOptions
{
DriverInstanceId = driverInstanceId,
PipeName = pipeName,
SharedSecret = sharedSecret,
ConnectTimeout = connectTimeout,
});
}
}

View File

@@ -0,0 +1,90 @@
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
/// <summary>
/// Phase 7 follow-up (task #247) — bridges <see cref="SqliteStoreAndForwardSink"/>'s
/// drain worker to <c>Driver.Galaxy.Host</c> over the existing <see cref="GalaxyIpcClient"/>
/// pipe. Translates <see cref="AlarmHistorianEvent"/> batches into the
/// <see cref="HistorianAlarmEventDto"/> wire format the Host expects + maps per-event
/// <see cref="HistorianAlarmEventOutcomeDto"/> responses back to
/// <see cref="HistorianWriteOutcome"/> so the SQLite queue knows what to ack /
/// dead-letter / retry.
/// </summary>
/// <remarks>
/// <para>
/// Reuses the IPC channel <see cref="GalaxyProxyDriver"/> already opens for the
/// Galaxy data plane — no second pipe to <c>Driver.Galaxy.Host</c>, no separate
/// auth handshake. The IPC client's call gate serializes historian batches with
/// driver Reads/Writes/Subscribes; historian batches are infrequent (every few
/// seconds at most under the SQLite sink's drain cadence) so the contention is
/// negligible compared to per-tag-read pressure.
/// </para>
/// <para>
/// Pipe-level transport faults (broken pipe, host crash) bubble up as
/// <see cref="GalaxyIpcException"/> which the SQLite sink's drain worker catches +
/// translates to a whole-batch RetryPlease per the
/// <see cref="SqliteStoreAndForwardSink"/> docstring — failed events stay queued
/// for the next drain tick after backoff.
/// </para>
/// </remarks>
public sealed class GalaxyHistorianWriter : IAlarmHistorianWriter
{
private readonly GalaxyIpcClient _client;
public GalaxyHistorianWriter(GalaxyIpcClient client)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
}
public async Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(batch);
if (batch.Count == 0) return [];
var request = new HistorianAlarmEventRequest
{
Events = batch.Select(ToDto).ToArray(),
};
var response = await _client.CallAsync<HistorianAlarmEventRequest, HistorianAlarmEventResponse>(
requestKind: MessageKind.HistorianAlarmEventRequest,
request: request,
expectedResponseKind: MessageKind.HistorianAlarmEventResponse,
ct: cancellationToken).ConfigureAwait(false);
if (response.Outcomes.Length != batch.Count)
throw new InvalidOperationException(
$"Galaxy.Host returned {response.Outcomes.Length} outcomes for a batch of {batch.Count} — protocol mismatch");
var outcomes = new HistorianWriteOutcome[response.Outcomes.Length];
for (var i = 0; i < response.Outcomes.Length; i++)
outcomes[i] = MapOutcome(response.Outcomes[i]);
return outcomes;
}
internal static HistorianAlarmEventDto ToDto(AlarmHistorianEvent e) => new()
{
AlarmId = e.AlarmId,
EquipmentPath = e.EquipmentPath,
AlarmName = e.AlarmName,
AlarmTypeName = e.AlarmTypeName,
Severity = (int)e.Severity,
EventKind = e.EventKind,
Message = e.Message,
User = e.User,
Comment = e.Comment,
TimestampUtcUnixMs = new DateTimeOffset(e.TimestampUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
};
internal static HistorianWriteOutcome MapOutcome(HistorianAlarmEventOutcomeDto wire) => wire switch
{
HistorianAlarmEventOutcomeDto.Ack => HistorianWriteOutcome.Ack,
HistorianAlarmEventOutcomeDto.RetryPlease => HistorianWriteOutcome.RetryPlease,
HistorianAlarmEventOutcomeDto.PermanentFail => HistorianWriteOutcome.PermanentFail,
_ => throw new InvalidOperationException($"Unknown HistorianAlarmEventOutcomeDto byte {(byte)wire}"),
};
}

View File

@@ -13,7 +13,9 @@
<ItemGroup>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core\ZB.MOM.WW.OtOpcUa.Core.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
</ItemGroup>
<ItemGroup>

View File

@@ -0,0 +1,88 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
namespace ZB.MOM.WW.OtOpcUa.Server;
/// <summary>
/// Task #248 — bridges the gap surfaced by the Phase 7 live smoke (#240) where
/// <c>DriverInstance</c> rows in the central config DB had no path to materialise
/// as live <see cref="Core.Abstractions.IDriver"/> instances in <see cref="DriverHost"/>.
/// Called from <c>OpcUaServerService.ExecuteAsync</c> after the bootstrap loads
/// the published generation, before address-space build.
/// </summary>
/// <remarks>
/// <para>
/// Per row: looks up the <c>DriverType</c> string in
/// <see cref="DriverFactoryRegistry"/>, calls the factory with the row's
/// <c>DriverInstanceId</c> + <c>DriverConfig</c> JSON to construct an
/// <see cref="Core.Abstractions.IDriver"/>, then registers via
/// <see cref="DriverHost.RegisterAsync"/> which invokes <c>InitializeAsync</c>
/// under the host's lifecycle semantics.
/// </para>
/// <para>
/// Unknown <c>DriverType</c> = factory not registered = log a warning and skip.
/// Per plan decision #12 (driver isolation), failure to construct or initialize
/// one driver doesn't prevent the rest from coming up — the Server keeps serving
/// the others' subtrees + the operator can fix the misconfigured row + republish
/// to retry.
/// </para>
/// </remarks>
public sealed class DriverInstanceBootstrapper(
DriverFactoryRegistry factories,
DriverHost driverHost,
IServiceScopeFactory scopeFactory,
ILogger<DriverInstanceBootstrapper> logger)
{
public async Task<int> RegisterDriversFromGenerationAsync(long generationId, CancellationToken ct)
{
using var scope = scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();
var rows = await db.DriverInstances.AsNoTracking()
.Where(d => d.GenerationId == generationId && d.Enabled)
.ToListAsync(ct).ConfigureAwait(false);
var registered = 0;
var skippedUnknownType = 0;
var failedInit = 0;
foreach (var row in rows)
{
var factory = factories.TryGet(row.DriverType);
if (factory is null)
{
logger.LogWarning(
"DriverInstance {Id} skipped — DriverType '{Type}' has no registered factory (known: {Known})",
row.DriverInstanceId, row.DriverType, string.Join(",", factories.RegisteredTypes));
skippedUnknownType++;
continue;
}
try
{
var driver = factory(row.DriverInstanceId, row.DriverConfig);
await driverHost.RegisterAsync(driver, row.DriverConfig, ct).ConfigureAwait(false);
registered++;
logger.LogInformation(
"DriverInstance {Id} ({Type}) registered + initialized", row.DriverInstanceId, row.DriverType);
}
catch (Exception ex)
{
// Plan decision #12 — driver isolation. Log + continue so one bad row
// doesn't deny the OPC UA endpoint to the rest of the fleet.
logger.LogError(ex,
"DriverInstance {Id} ({Type}) failed to initialize — driver state will reflect Faulted; operator can republish to retry",
row.DriverInstanceId, row.DriverType);
failedInit++;
}
}
logger.LogInformation(
"DriverInstanceBootstrapper: gen={Gen} registered={Registered} skippedUnknownType={Skipped} failedInit={Failed}",
generationId, registered, skippedUnknownType, failedInit);
return registered;
}
}

View File

@@ -34,9 +34,11 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
// Phase 7 Stream G follow-up (task #239). When composed with the VirtualTagEngine +
// ScriptedAlarmEngine sources these route node reads to the engines instead of the
// driver. Null = Phase 7 engines not enabled for this deployment (identical to pre-
// Phase-7 behaviour).
private readonly ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? _virtualReadable;
private readonly ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? _scriptedAlarmReadable;
// Phase-7 behaviour). Late-bindable via SetPhase7Sources because the engines need
// the bootstrapped generation id before they can compose, which is only known after
// the host has been DI-constructed (task #246).
private ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? _virtualReadable;
private ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? _scriptedAlarmReadable;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<OpcUaApplicationHost> _logger;
@@ -75,6 +77,24 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
public OtOpcUaServer? Server => _server;
/// <summary>
/// Late-bind the Phase 7 engine-backed <c>IReadable</c> sources. Must be
/// called BEFORE <see cref="StartAsync"/> — once the OPC UA server starts, the
/// <see cref="OtOpcUaServer"/> ctor captures the field values + per-node
/// <see cref="DriverNodeManager"/>s are constructed. Calling this after start has
/// no effect on already-materialized node managers.
/// </summary>
public void SetPhase7Sources(
ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? virtualReadable,
ZB.MOM.WW.OtOpcUa.Core.Abstractions.IReadable? scriptedAlarmReadable)
{
if (_server is not null)
throw new InvalidOperationException(
"Phase 7 sources must be set before OpcUaApplicationHost.StartAsync; the OtOpcUaServer + DriverNodeManagers have already captured the previous values.");
_virtualReadable = virtualReadable;
_scriptedAlarmReadable = scriptedAlarmReadable;
}
/// <summary>
/// Builds the <see cref="ApplicationConfiguration"/>, validates/creates the application
/// certificate, constructs + starts the <see cref="OtOpcUaServer"/>, then drives

View File

@@ -3,6 +3,7 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
namespace ZB.MOM.WW.OtOpcUa.Server;
@@ -17,6 +18,8 @@ public sealed class OpcUaServerService(
DriverHost driverHost,
OpcUaApplicationHost applicationHost,
DriverEquipmentContentRegistry equipmentContentRegistry,
DriverInstanceBootstrapper driverBootstrapper,
Phase7Composer phase7Composer,
IServiceScopeFactory scopeFactory,
ILogger<OpcUaServerService> logger) : BackgroundService
{
@@ -34,12 +37,26 @@ public sealed class OpcUaServerService(
// Skipped when no generation is Published yet — the fleet boots into a UNS-less
// address space until the first publish, then the registry fills on next restart.
if (result.GenerationId is { } gen)
{
// Task #248 — register IDriver instances from the published DriverInstance
// rows BEFORE the equipment-content load + Phase 7 compose, so the rest of
// the pipeline sees a populated DriverHost. Without this step Phase 7's
// CachedTagUpstreamSource has no upstream feed + virtual-tag scripts read
// BadNodeIdUnknown for every tag path (gap surfaced by task #240 smoke).
await driverBootstrapper.RegisterDriversFromGenerationAsync(gen, stoppingToken);
await PopulateEquipmentContentAsync(gen, stoppingToken);
// PR 17: stand up the OPC UA server + drive discovery per registered driver. Driver
// registration itself (RegisterAsync on DriverHost) happens during an earlier DI
// extension once the central config DB query + per-driver factory land; for now the
// server comes up with whatever drivers are in DriverHost at start time.
// Phase 7 follow-up #246 — load Script + VirtualTag + ScriptedAlarm rows,
// compose VirtualTagEngine + ScriptedAlarmEngine, start the driver-bridge
// feed. SetPhase7Sources MUST run before applicationHost.StartAsync because
// OtOpcUaServer + DriverNodeManager construction captures the field values
// — late binding after server start is rejected with InvalidOperationException.
// No-op when the generation has no virtual tags or scripted alarms.
var phase7 = await phase7Composer.PrepareAsync(gen, stoppingToken);
applicationHost.SetPhase7Sources(phase7.VirtualReadable, phase7.ScriptedAlarmReadable);
}
await applicationHost.StartAsync(stoppingToken);
logger.LogInformation("OtOpcUa.Server running. Hosted drivers: {Count}", driverHost.RegisteredDriverIds.Count);
@@ -57,6 +74,11 @@ public sealed class OpcUaServerService(
public override async Task StopAsync(CancellationToken cancellationToken)
{
await base.StopAsync(cancellationToken);
// Dispose Phase 7 first so the bridge stops feeding the cache + the engines
// stop firing alarm/historian events before the OPC UA server tears down its
// node managers. Otherwise an in-flight cascade could try to push through a
// disposed source and surface as a noisy shutdown warning.
await phase7Composer.DisposeAsync();
await applicationHost.DisposeAsync();
await driverHost.DisposeAsync();
}

View File

@@ -0,0 +1,84 @@
using System.Collections.Concurrent;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
/// <summary>
/// Production <c>ITagUpstreamSource</c> for the Phase 7 engines (implements both the
/// Core.VirtualTags and Core.ScriptedAlarms variants — identical shape, distinct
/// namespaces). Per the interface docstring, reads are synchronous — user scripts
/// call <c>ctx.GetTag</c> inline — so we serve from a last-known-value cache that
/// the driver-bridge populates asynchronously via <see cref="Push"/>.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="Push"/> is called by the driver-bridge (wiring added by task #244)
/// every time a driver's <c>ISubscribable.OnDataChange</c> fires. Subscribers
/// registered via <see cref="SubscribeTag"/> are notified synchronously on the
/// calling thread — the VirtualTagEngine + ScriptedAlarmEngine handle their own
/// async hand-off via <c>SemaphoreSlim</c>.
/// </para>
/// <para>
/// Reads of a path that has never been <see cref="Push"/>-ed return
/// <see cref="UpstreamNotConfigured"/>-quality — which scripts see as
/// <c>ctx.GetTag("...").StatusCode == BadNodeIdUnknown</c> and can branch on.
/// </para>
/// </remarks>
public sealed class CachedTagUpstreamSource
: Core.VirtualTags.ITagUpstreamSource,
Core.ScriptedAlarms.ITagUpstreamSource
{
private readonly ConcurrentDictionary<string, DataValueSnapshot> _values = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, List<Action<string, DataValueSnapshot>>> _observers
= new(StringComparer.Ordinal);
public DataValueSnapshot ReadTag(string path)
{
if (string.IsNullOrEmpty(path)) throw new ArgumentException("path required", nameof(path));
return _values.TryGetValue(path, out var snap)
? snap
: new DataValueSnapshot(null, UpstreamNotConfigured, null, DateTime.UtcNow);
}
public IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer)
{
if (string.IsNullOrEmpty(path)) throw new ArgumentException("path required", nameof(path));
ArgumentNullException.ThrowIfNull(observer);
var list = _observers.GetOrAdd(path, _ => []);
lock (list) list.Add(observer);
return new Unsub(this, path, observer);
}
/// <summary>
/// Driver-bridge write path — called when a driver delivers a value change for
/// <paramref name="path"/>. Updates the cache + fans out to every observer.
/// Safe for concurrent callers; observers fire on the caller's thread.
/// </summary>
public void Push(string path, DataValueSnapshot snapshot)
{
if (string.IsNullOrEmpty(path)) throw new ArgumentException("path required", nameof(path));
ArgumentNullException.ThrowIfNull(snapshot);
_values[path] = snapshot;
if (!_observers.TryGetValue(path, out var list)) return;
Action<string, DataValueSnapshot>[] snapshotList;
lock (list) snapshotList = list.ToArray();
foreach (var observer in snapshotList) observer(path, snapshot);
}
/// <summary>Mirror of OPC UA <c>StatusCodes.BadNodeIdUnknown</c> without pulling the OPC stack dependency.</summary>
public const uint UpstreamNotConfigured = 0x80340000;
private sealed class Unsub(CachedTagUpstreamSource owner, string path, Action<string, DataValueSnapshot> observer) : IDisposable
{
private bool _disposed;
public void Dispose()
{
if (_disposed) return;
_disposed = true;
if (owner._observers.TryGetValue(path, out var list))
lock (list) list.Remove(observer);
}
}
}

View File

@@ -0,0 +1,146 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
/// <summary>
/// Phase 7 follow-up (task #244). Subscribes to live driver <see cref="ISubscribable"/>
/// surfaces for every input path the Phase 7 engines care about + pushes incoming
/// <see cref="DataChangeEventArgs.Snapshot"/>s into <see cref="CachedTagUpstreamSource"/>
/// so <c>ctx.GetTag</c> reads see the freshest driver value.
/// </summary>
/// <remarks>
/// <para>
/// Each <see cref="DriverFeed"/> declares a driver + the path-to-fullRef map for the
/// attributes that driver provides. The bridge groups by driver so each <see cref="ISubscribable"/>
/// gets one <c>SubscribeAsync</c> call with a batched fullRef list — drivers that
/// poll under the hood (Modbus, AB CIP, S7) consolidate the polls; drivers with
/// native subscriptions (Galaxy, OPC UA Client, TwinCAT) get a single watch list.
/// </para>
/// <para>
/// Because driver fullRefs are opaque + driver-specific (Galaxy
/// <c>"DelmiaReceiver_001.Temp"</c>, Modbus <c>"40001"</c>, AB CIP
/// <c>"Temperature[0]"</c>), the bridge keeps a per-feed reverse map from fullRef
/// back to UNS path. <c>OnDataChange</c> fires keyed by fullRef; the bridge
/// translates to the script-side path before calling <see cref="CachedTagUpstreamSource.Push"/>.
/// </para>
/// <para>
/// Lifecycle: construct → <see cref="StartAsync"/> with the feeds → keep alive
/// alongside the engines → <see cref="DisposeAsync"/> unsubscribes from every
/// driver + unhooks the OnDataChange handlers. Driver subscriptions don't leak
/// even on abnormal shutdown because the disposal awaits each
/// <c>UnsubscribeAsync</c>.
/// </para>
/// </remarks>
public sealed class DriverSubscriptionBridge : IAsyncDisposable
{
private readonly CachedTagUpstreamSource _sink;
private readonly ILogger<DriverSubscriptionBridge> _logger;
private readonly List<ActiveSubscription> _active = [];
private bool _started;
private bool _disposed;
public DriverSubscriptionBridge(
CachedTagUpstreamSource sink,
ILogger<DriverSubscriptionBridge> logger)
{
_sink = sink ?? throw new ArgumentNullException(nameof(sink));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Subscribe each feed's driver to its declared fullRefs + wire push-to-cache.
/// Idempotent guard rejects double-start. Throws on the first subscribe failure
/// so misconfiguration surfaces fast — partial-subscribe state doesn't linger.
/// </summary>
public async Task StartAsync(IEnumerable<DriverFeed> feeds, CancellationToken ct)
{
ArgumentNullException.ThrowIfNull(feeds);
if (_disposed) throw new ObjectDisposedException(nameof(DriverSubscriptionBridge));
if (_started) throw new InvalidOperationException("DriverSubscriptionBridge already started");
_started = true;
foreach (var feed in feeds)
{
if (feed.PathToFullRef.Count == 0) continue;
// Reverse map for OnDataChange dispatch — driver fires keyed by FullReference,
// we push keyed by the script-side path.
var fullRefToPath = feed.PathToFullRef
.ToDictionary(kv => kv.Value, kv => kv.Key, StringComparer.Ordinal);
var fullRefs = feed.PathToFullRef.Values.Distinct(StringComparer.Ordinal).ToList();
EventHandler<DataChangeEventArgs> handler = (_, e) =>
{
if (fullRefToPath.TryGetValue(e.FullReference, out var unsPath))
_sink.Push(unsPath, e.Snapshot);
};
feed.Driver.OnDataChange += handler;
try
{
// OTOPCUA0001 suppression — the analyzer flags ISubscribable calls outside
// CapabilityInvoker. This bridge IS the lifecycle-coordinator for Phase 7
// subscriptions: it runs once at engine compose, doesn't hot-path per
// script evaluation (the engines read from the cache instead), and surfaces
// any subscribe failure by aborting bridge start. Wrapping in the per-call
// resilience pipeline would add nothing — there's no caller to retry on
// behalf of, and the breaker/bulkhead semantics belong to actual driver Read
// dispatch, which still goes through CapabilityInvoker via DriverNodeManager.
#pragma warning disable OTOPCUA0001
var handle = await feed.Driver.SubscribeAsync(fullRefs, feed.PublishingInterval, ct).ConfigureAwait(false);
#pragma warning restore OTOPCUA0001
_active.Add(new ActiveSubscription(feed.Driver, handle, handler));
_logger.LogInformation(
"Phase 7 bridge subscribed {Count} attribute(s) from driver {Driver} (handle {Handle})",
fullRefs.Count, feed.Driver.GetType().Name, handle.DiagnosticId);
}
catch
{
feed.Driver.OnDataChange -= handler;
throw;
}
}
}
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
foreach (var sub in _active)
{
sub.Driver.OnDataChange -= sub.Handler;
try
{
#pragma warning disable OTOPCUA0001 // bridge lifecycle — see StartAsync suppression rationale
await sub.Driver.UnsubscribeAsync(sub.Handle, CancellationToken.None).ConfigureAwait(false);
#pragma warning restore OTOPCUA0001
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Driver {Driver} UnsubscribeAsync threw on bridge dispose (handle {Handle})",
sub.Driver.GetType().Name, sub.Handle.DiagnosticId);
}
}
_active.Clear();
}
private sealed record ActiveSubscription(
ISubscribable Driver,
ISubscriptionHandle Handle,
EventHandler<DataChangeEventArgs> Handler);
}
/// <summary>
/// One driver's contribution to the Phase 7 bridge — the driver's <see cref="ISubscribable"/>
/// surface plus the path-to-fullRef map the bridge uses to translate driver-side
/// <see cref="DataChangeEventArgs.FullReference"/> back to script-side paths.
/// </summary>
/// <param name="Driver">The driver's subscribable surface (every shipped driver implements <see cref="ISubscribable"/>).</param>
/// <param name="PathToFullRef">UNS path the script uses → driver-opaque fullRef. Empty map = nothing to subscribe (skipped).</param>
/// <param name="PublishingInterval">Forwarded to the driver's <see cref="ISubscribable.SubscribeAsync"/>.</param>
public sealed record DriverFeed(
ISubscribable Driver,
IReadOnlyDictionary<string, string> PathToFullRef,
TimeSpan PublishingInterval);

View File

@@ -0,0 +1,237 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
/// <summary>
/// Phase 7 follow-up (task #246) — orchestrates the runtime composition of virtual
/// tags + scripted alarms + the historian sink + the driver-bridge that feeds the
/// engines. Called by <see cref="OpcUaServerService"/> after the bootstrap generation
/// loads + before <see cref="OpcUaApplicationHost.StartAsync"/>.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="PrepareAsync"/> reads Script / VirtualTag / ScriptedAlarm rows from
/// the central config DB at the bootstrapped generation, instantiates a
/// <see cref="CachedTagUpstreamSource"/>, runs <see cref="Phase7EngineComposer.Compose"/>,
/// starts a <see cref="DriverSubscriptionBridge"/> per registered driver feeding
/// <see cref="EquipmentNamespaceContent"/>'s tag rows into the cache, and returns
/// the engine-backed <see cref="Core.Abstractions.IReadable"/> sources for
/// <see cref="OpcUaApplicationHost.SetPhase7Sources"/>.
/// </para>
/// <para>
/// <see cref="DisposeAsync"/> tears down the bridge first (so no more events
/// arrive at the cache), then the engines (so cascades + timer ticks stop), then
/// the SQLite sink (which flushes any in-flight drain). Lifetime is owned by the
/// host; <see cref="OpcUaServerService.StopAsync"/> calls dispose during graceful
/// shutdown.
/// </para>
/// </remarks>
public sealed class Phase7Composer : IAsyncDisposable
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly DriverHost _driverHost;
private readonly DriverEquipmentContentRegistry _equipmentRegistry;
private readonly IAlarmHistorianSink _historianSink;
private readonly ILoggerFactory _loggerFactory;
private readonly Serilog.ILogger _scriptLogger;
private readonly ILogger<Phase7Composer> _logger;
private DriverSubscriptionBridge? _bridge;
private Phase7ComposedSources _sources = Phase7ComposedSources.Empty;
// Sink we constructed in PrepareAsync (vs. the injected fallback). Held so
// DisposeAsync can flush + tear down the SQLite drain timer.
private SqliteStoreAndForwardSink? _ownedSink;
private bool _disposed;
public Phase7Composer(
IServiceScopeFactory scopeFactory,
DriverHost driverHost,
DriverEquipmentContentRegistry equipmentRegistry,
IAlarmHistorianSink historianSink,
ILoggerFactory loggerFactory,
Serilog.ILogger scriptLogger,
ILogger<Phase7Composer> logger)
{
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
_driverHost = driverHost ?? throw new ArgumentNullException(nameof(driverHost));
_equipmentRegistry = equipmentRegistry ?? throw new ArgumentNullException(nameof(equipmentRegistry));
_historianSink = historianSink ?? throw new ArgumentNullException(nameof(historianSink));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_scriptLogger = scriptLogger ?? throw new ArgumentNullException(nameof(scriptLogger));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public Phase7ComposedSources Sources => _sources;
public async Task<Phase7ComposedSources> PrepareAsync(long generationId, CancellationToken ct)
{
if (_disposed) throw new ObjectDisposedException(nameof(Phase7Composer));
// Load the three Phase 7 row sets in one DB scope.
List<Script> scripts;
List<VirtualTag> virtualTags;
List<ScriptedAlarm> scriptedAlarms;
using (var scope = _scopeFactory.CreateScope())
{
var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();
scripts = await db.Scripts.AsNoTracking()
.Where(s => s.GenerationId == generationId).ToListAsync(ct).ConfigureAwait(false);
virtualTags = await db.VirtualTags.AsNoTracking()
.Where(v => v.GenerationId == generationId && v.Enabled).ToListAsync(ct).ConfigureAwait(false);
scriptedAlarms = await db.ScriptedAlarms.AsNoTracking()
.Where(a => a.GenerationId == generationId && a.Enabled).ToListAsync(ct).ConfigureAwait(false);
}
if (virtualTags.Count == 0 && scriptedAlarms.Count == 0)
{
_logger.LogInformation("Phase 7: no virtual tags or scripted alarms in generation {Gen}; engines dormant", generationId);
return Phase7ComposedSources.Empty;
}
var upstream = new CachedTagUpstreamSource();
// Phase 7 follow-up #247 — if any registered driver implements IAlarmHistorianWriter
// (today: GalaxyProxyDriver), wrap it in a SqliteStoreAndForwardSink at
// %ProgramData%/OtOpcUa/alarm-historian-queue.db with the 2s drain cadence the
// sink's docstring recommends. Otherwise fall back to the injected sink (Null in
// the default registration).
var historianSink = ResolveHistorianSink();
_sources = Phase7EngineComposer.Compose(
scripts: scripts,
virtualTags: virtualTags,
scriptedAlarms: scriptedAlarms,
upstream: upstream,
alarmStateStore: new InMemoryAlarmStateStore(),
historianSink: historianSink,
rootScriptLogger: _scriptLogger,
loggerFactory: _loggerFactory);
_logger.LogInformation(
"Phase 7: composed engines from generation {Gen} — {Vt} virtual tag(s), {Al} scripted alarm(s), {Sc} script(s)",
generationId, virtualTags.Count, scriptedAlarms.Count, scripts.Count);
// Build driver feeds from each registered driver's EquipmentNamespaceContent + start
// the bridge. Drivers without populated content (Galaxy SystemPlatform-kind, drivers
// whose Equipment rows haven't been published yet) contribute an empty feed which
// the bridge silently skips.
_bridge = new DriverSubscriptionBridge(upstream, _loggerFactory.CreateLogger<DriverSubscriptionBridge>());
var feeds = BuildDriverFeeds(_driverHost, _equipmentRegistry);
await _bridge.StartAsync(feeds, ct).ConfigureAwait(false);
return _sources;
}
private IAlarmHistorianSink ResolveHistorianSink()
{
IAlarmHistorianWriter? writer = null;
foreach (var driverId in _driverHost.RegisteredDriverIds)
{
if (_driverHost.GetDriver(driverId) is IAlarmHistorianWriter w)
{
writer = w;
_logger.LogInformation(
"Phase 7 historian sink: driver {Driver} provides IAlarmHistorianWriter — wiring SqliteStoreAndForwardSink",
driverId);
break;
}
}
if (writer is null)
{
_logger.LogInformation(
"Phase 7 historian sink: no driver provides IAlarmHistorianWriter — using {Sink}",
_historianSink.GetType().Name);
return _historianSink;
}
var queueRoot = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData);
if (string.IsNullOrEmpty(queueRoot)) queueRoot = Path.GetTempPath();
var queueDir = Path.Combine(queueRoot, "OtOpcUa");
Directory.CreateDirectory(queueDir);
var queuePath = Path.Combine(queueDir, "alarm-historian-queue.db");
var sinkLogger = _loggerFactory.CreateLogger<SqliteStoreAndForwardSink>();
// SqliteStoreAndForwardSink wants a Serilog logger for warn-on-eviction emissions;
// bridge the Microsoft logger via Serilog's null-safe path until the sink's
// dependency surface is reshaped (covered as part of release-readiness).
var serilogShim = _scriptLogger.ForContext("HistorianQueuePath", queuePath);
_ownedSink = new SqliteStoreAndForwardSink(
databasePath: queuePath,
writer: writer,
logger: serilogShim);
_ownedSink.StartDrainLoop(TimeSpan.FromSeconds(2));
return _ownedSink;
}
/// <summary>
/// For each registered driver that exposes <see cref="Core.Abstractions.ISubscribable"/>,
/// build a UNS-path → driver-fullRef map from its EquipmentNamespaceContent.
/// Path convention: <c>/{areaName}/{lineName}/{equipmentName}/{tagName}</c> matching
/// what the EquipmentNodeWalker emits into the OPC UA browse tree, so script literals
/// written against the operator-visible tree work without translation.
/// </summary>
internal static IReadOnlyList<DriverFeed> BuildDriverFeeds(
DriverHost driverHost, DriverEquipmentContentRegistry equipmentRegistry)
{
var feeds = new List<DriverFeed>();
foreach (var driverId in driverHost.RegisteredDriverIds)
{
var driver = driverHost.GetDriver(driverId);
if (driver is not Core.Abstractions.ISubscribable subscribable) continue;
var content = equipmentRegistry.Get(driverId);
if (content is null) continue;
var pathToFullRef = MapPathsToFullRefs(content);
if (pathToFullRef.Count == 0) continue;
feeds.Add(new DriverFeed(subscribable, pathToFullRef, TimeSpan.FromSeconds(1)));
}
return feeds;
}
internal static IReadOnlyDictionary<string, string> MapPathsToFullRefs(EquipmentNamespaceContent content)
{
var result = new Dictionary<string, string>(StringComparer.Ordinal);
var areaById = content.Areas.ToDictionary(a => a.UnsAreaId, StringComparer.OrdinalIgnoreCase);
var lineById = content.Lines.ToDictionary(l => l.UnsLineId, StringComparer.OrdinalIgnoreCase);
var equipmentById = content.Equipment.ToDictionary(e => e.EquipmentId, StringComparer.OrdinalIgnoreCase);
foreach (var tag in content.Tags)
{
if (string.IsNullOrEmpty(tag.EquipmentId)) continue;
if (!equipmentById.TryGetValue(tag.EquipmentId!, out var eq)) continue;
if (!lineById.TryGetValue(eq.UnsLineId, out var line)) continue;
if (!areaById.TryGetValue(line.UnsAreaId, out var area)) continue;
var path = $"/{area.Name}/{line.Name}/{eq.Name}/{tag.Name}";
result[path] = tag.TagConfig; // duplicate-path collisions naturally win-last; UI publish-validation rules out duplicate names
}
return result;
}
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
if (_bridge is not null) await _bridge.DisposeAsync().ConfigureAwait(false);
foreach (var d in _sources.Disposables)
{
try { d.Dispose(); }
catch (Exception ex) { _logger.LogWarning(ex, "Phase 7 disposable threw during shutdown"); }
}
// Owned SQLite sink: dispose first so the drain timer stops + final batch flushes
// before we release the writer-bearing driver via DriverHost.DisposeAsync upstream.
_ownedSink?.Dispose();
if (_historianSink is IDisposable disposableSink) disposableSink.Dispose();
}
}

View File

@@ -0,0 +1,208 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
/// <summary>
/// Phase 7 follow-up (task #243) — maps the generation's <see cref="Script"/> /
/// <see cref="VirtualTag"/> / <see cref="ScriptedAlarm"/> rows into the runtime
/// definitions <see cref="VirtualTagEngine"/> + <see cref="ScriptedAlarmEngine"/>
/// expect, builds the engine instances, and returns the <see cref="IReadable"/>
/// sources plus an <see cref="IAlarmSource"/> for the <c>DriverNodeManager</c>
/// wiring added by task #239.
/// </summary>
/// <remarks>
/// <para>
/// Empty Phase 7 config (no virtual tags + no scripted alarms) is a valid state:
/// <see cref="Compose"/> returns a <see cref="Phase7ComposedSources"/> with null
/// sources so Program.cs can pass them through to <c>OpcUaApplicationHost</c>
/// unchanged — deployments without scripts behave exactly as they did before
/// Phase 7.
/// </para>
/// <para>
/// The caller owns the returned <see cref="Phase7ComposedSources.Disposables"/>
/// and must dispose them on shutdown. Engine cascades + timer ticks run off
/// background threads until then.
/// </para>
/// </remarks>
public static class Phase7EngineComposer
{
public static Phase7ComposedSources Compose(
IReadOnlyList<Script> scripts,
IReadOnlyList<VirtualTag> virtualTags,
IReadOnlyList<ScriptedAlarm> scriptedAlarms,
CachedTagUpstreamSource upstream,
IAlarmStateStore alarmStateStore,
IAlarmHistorianSink historianSink,
Serilog.ILogger rootScriptLogger,
ILoggerFactory loggerFactory)
{
ArgumentNullException.ThrowIfNull(scripts);
ArgumentNullException.ThrowIfNull(virtualTags);
ArgumentNullException.ThrowIfNull(scriptedAlarms);
ArgumentNullException.ThrowIfNull(upstream);
ArgumentNullException.ThrowIfNull(alarmStateStore);
ArgumentNullException.ThrowIfNull(historianSink);
ArgumentNullException.ThrowIfNull(rootScriptLogger);
ArgumentNullException.ThrowIfNull(loggerFactory);
if (virtualTags.Count == 0 && scriptedAlarms.Count == 0)
return Phase7ComposedSources.Empty;
var scriptById = scripts
.Where(s => s.Enabled())
.ToDictionary(s => s.ScriptId, StringComparer.Ordinal);
var scriptLoggerFactory = new ScriptLoggerFactory(rootScriptLogger);
var disposables = new List<IDisposable>();
// Engines take Serilog.ILogger — each engine gets its own so rolling-file emissions
// stay keyed to the right source in the scripts-*.log.
VirtualTagSource? vtSource = null;
if (virtualTags.Count > 0)
{
var vtDefs = ProjectVirtualTags(virtualTags, scriptById).ToList();
var vtEngine = new VirtualTagEngine(upstream, scriptLoggerFactory, rootScriptLogger);
vtEngine.Load(vtDefs);
vtSource = new VirtualTagSource(vtEngine);
disposables.Add(vtEngine);
}
IReadable? alarmReadable = null;
if (scriptedAlarms.Count > 0)
{
var alarmDefs = ProjectScriptedAlarms(scriptedAlarms, scriptById).ToList();
var alarmEngine = new ScriptedAlarmEngine(upstream, alarmStateStore, scriptLoggerFactory, rootScriptLogger);
// Wire alarm emissions to the historian sink (Stream D). Fire-and-forget because
// the sink's EnqueueAsync is already non-blocking from the producer's view.
var engineLogger = loggerFactory.CreateLogger("Phase7HistorianRouter");
alarmEngine.OnEvent += (_, e) => _ = RouteToHistorianAsync(e, historianSink, engineLogger);
alarmEngine.LoadAsync(alarmDefs, CancellationToken.None).GetAwaiter().GetResult();
var alarmSource = new ScriptedAlarmSource(alarmEngine);
// Task #245 — expose each alarm's current Active state as IReadable so OPC UA
// variable reads on Source=ScriptedAlarm nodes return the live predicate truth
// instead of BadNotFound. ScriptedAlarmSource stays registered as IAlarmSource
// for the event stream; the IReadable is a separate adapter over the same engine.
alarmReadable = new ScriptedAlarmReadable(alarmEngine);
disposables.Add(alarmEngine);
disposables.Add(alarmSource);
}
return new Phase7ComposedSources(vtSource, alarmReadable, disposables);
}
internal static IEnumerable<VirtualTagDefinition> ProjectVirtualTags(
IReadOnlyList<VirtualTag> rows, IReadOnlyDictionary<string, Script> scriptById)
{
foreach (var row in rows)
{
if (!row.Enabled) continue;
if (!scriptById.TryGetValue(row.ScriptId, out var script))
throw new InvalidOperationException(
$"VirtualTag '{row.VirtualTagId}' references unknown / disabled Script '{row.ScriptId}' in this generation");
yield return new VirtualTagDefinition(
Path: row.VirtualTagId,
DataType: ParseDataType(row.DataType),
ScriptSource: script.SourceCode,
ChangeTriggered: row.ChangeTriggered,
TimerInterval: row.TimerIntervalMs.HasValue
? TimeSpan.FromMilliseconds(row.TimerIntervalMs.Value)
: null,
Historize: row.Historize);
}
}
internal static IEnumerable<ScriptedAlarmDefinition> ProjectScriptedAlarms(
IReadOnlyList<ScriptedAlarm> rows, IReadOnlyDictionary<string, Script> scriptById)
{
foreach (var row in rows)
{
if (!row.Enabled) continue;
if (!scriptById.TryGetValue(row.PredicateScriptId, out var script))
throw new InvalidOperationException(
$"ScriptedAlarm '{row.ScriptedAlarmId}' references unknown / disabled predicate Script '{row.PredicateScriptId}'");
yield return new ScriptedAlarmDefinition(
AlarmId: row.ScriptedAlarmId,
EquipmentPath: row.EquipmentId,
AlarmName: row.Name,
Kind: ParseAlarmKind(row.AlarmType),
Severity: MapSeverity(row.Severity),
MessageTemplate: row.MessageTemplate,
PredicateScriptSource: script.SourceCode,
HistorizeToAveva: row.HistorizeToAveva,
Retain: row.Retain);
}
}
private static DriverDataType ParseDataType(string raw) =>
Enum.TryParse<DriverDataType>(raw, ignoreCase: true, out var parsed) ? parsed : DriverDataType.String;
private static AlarmKind ParseAlarmKind(string raw) => raw switch
{
"AlarmCondition" => AlarmKind.AlarmCondition,
"LimitAlarm" => AlarmKind.LimitAlarm,
"DiscreteAlarm" => AlarmKind.DiscreteAlarm,
"OffNormalAlarm" => AlarmKind.OffNormalAlarm,
_ => throw new InvalidOperationException($"Unknown AlarmType '{raw}' — DB check constraint should have caught this"),
};
// OPC UA Part 9 severity bands (1..1000) → AlarmSeverity enum. Matches the same
// banding the AB CIP ALMA projection + OpcUaClient MapSeverity use.
private static AlarmSeverity MapSeverity(int s) => s switch
{
<= 250 => AlarmSeverity.Low,
<= 500 => AlarmSeverity.Medium,
<= 750 => AlarmSeverity.High,
_ => AlarmSeverity.Critical,
};
private static async Task RouteToHistorianAsync(
ScriptedAlarmEvent e, IAlarmHistorianSink sink, Microsoft.Extensions.Logging.ILogger log)
{
try
{
var historianEvent = new AlarmHistorianEvent(
AlarmId: e.AlarmId,
EquipmentPath: e.EquipmentPath,
AlarmName: e.AlarmName,
AlarmTypeName: e.Kind.ToString(),
Severity: e.Severity,
EventKind: e.Emission.ToString(),
Message: e.Message,
User: e.Condition.LastAckUser ?? "system",
Comment: e.Condition.LastAckComment,
TimestampUtc: e.TimestampUtc);
await sink.EnqueueAsync(historianEvent, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
{
log.LogWarning(ex, "Historian enqueue failed for alarm {AlarmId}/{Emission}", e.AlarmId, e.Emission);
}
}
}
/// <summary>What <see cref="Phase7EngineComposer.Compose"/> returns.</summary>
/// <param name="VirtualReadable">Non-null when virtual tags were composed; pass to <c>OpcUaApplicationHost.virtualReadable</c>.</param>
/// <param name="ScriptedAlarmReadable">Non-null when scripted alarms were composed; pass to <c>OpcUaApplicationHost.scriptedAlarmReadable</c>.</param>
/// <param name="Disposables">Engine + source instances the caller owns. Dispose on shutdown.</param>
public sealed record Phase7ComposedSources(
IReadable? VirtualReadable,
IReadable? ScriptedAlarmReadable,
IReadOnlyList<IDisposable> Disposables)
{
public static readonly Phase7ComposedSources Empty =
new(null, null, Array.Empty<IDisposable>());
}
internal static class ScriptEnabledExtensions
{
// Script has no explicit Enabled column; every row in the generation is a live script.
public static bool Enabled(this Script _) => true;
}

View File

@@ -0,0 +1,58 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
/// <summary>
/// <see cref="IReadable"/> adapter exposing each scripted alarm's current
/// <see cref="AlarmActiveState"/> as an OPC UA boolean. Phase 7 follow-up (task #245).
/// </summary>
/// <remarks>
/// <para>
/// Paired with the <see cref="NodeSourceKind.ScriptedAlarm"/> dispatch in
/// <c>DriverNodeManager.OnReadValue</c>. Full-reference lookup is the
/// <c>ScriptedAlarmId</c> the walker wrote into <c>DriverAttributeInfo.FullName</c>
/// when emitting the alarm variable node.
/// </para>
/// <para>
/// Unknown alarm ids return <c>BadNodeIdUnknown</c> so misconfiguration surfaces
/// instead of silently reading <c>false</c>. Alarms whose predicate has never
/// been evaluated (brand new, before the engine's first cascade tick) report
/// <see cref="AlarmActiveState.Inactive"/> via <see cref="AlarmConditionState.Fresh"/>,
/// which matches the Part 9 initial-state semantics.
/// </para>
/// </remarks>
public sealed class ScriptedAlarmReadable : IReadable
{
/// <summary>OPC UA <c>StatusCodes.BadNodeIdUnknown</c> — kept local so we don't pull the OPC stack.</summary>
private const uint BadNodeIdUnknown = 0x80340000;
private readonly ScriptedAlarmEngine _engine;
public ScriptedAlarmReadable(ScriptedAlarmEngine engine)
{
_engine = engine ?? throw new ArgumentNullException(nameof(engine));
}
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(fullReferences);
var now = DateTime.UtcNow;
var results = new DataValueSnapshot[fullReferences.Count];
for (var i = 0; i < fullReferences.Count; i++)
{
var alarmId = fullReferences[i];
var state = _engine.GetState(alarmId);
if (state is null)
{
results[i] = new DataValueSnapshot(null, BadNodeIdUnknown, null, now);
continue;
}
var active = state.Active == AlarmActiveState.Active;
results[i] = new DataValueSnapshot(active, 0u, now, now);
}
return Task.FromResult<IReadOnlyList<DataValueSnapshot>>(results);
}
}

View File

@@ -8,8 +8,11 @@ using Serilog.Formatting.Compact;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
using ZB.MOM.WW.OtOpcUa.Server;
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
using ZB.MOM.WW.OtOpcUa.Server.Security;
var builder = Host.CreateApplicationBuilder(args);
@@ -87,6 +90,18 @@ builder.Services.AddSingleton<ILocalConfigCache>(_ => new LiteDbConfigCache(opti
builder.Services.AddSingleton<DriverHost>();
builder.Services.AddSingleton<NodeBootstrap>();
// Task #248 — driver-instance bootstrap pipeline. DriverFactoryRegistry is the
// type-name → factory map; each driver project's static Register call pre-loads
// its factory so the bootstrapper can materialise DriverInstance rows from the
// central DB into live IDriver instances.
builder.Services.AddSingleton<DriverFactoryRegistry>(_ =>
{
var registry = new DriverFactoryRegistry();
GalaxyProxyDriverFactoryExtensions.Register(registry);
return registry;
});
builder.Services.AddSingleton<DriverInstanceBootstrapper>();
// ADR-001 Option A wiring — the registry is the handoff between OpcUaServerService's
// bootstrap-time population pass + OpcUaApplicationHost's StartAsync walker invocation.
// DriverEquipmentContentRegistry.Get is the equipmentContentLookup delegate that PR #155
@@ -113,5 +128,13 @@ builder.Services.AddDbContext<OtOpcUaConfigDbContext>(opt =>
opt.UseSqlServer(options.ConfigDbConnectionString));
builder.Services.AddHostedService<HostStatusPublisher>();
// Phase 7 follow-up #246 — historian sink + engine composer. NullAlarmHistorianSink
// is the default until the Galaxy.Host SqliteStoreAndForwardSink writer adapter
// lands (task #248). The composer reads Script/VirtualTag/ScriptedAlarm rows on
// generation bootstrap, builds the engines, and starts the driver-bridge feed.
builder.Services.AddSingleton<IAlarmHistorianSink>(NullAlarmHistorianSink.Instance);
builder.Services.AddSingleton(Log.Logger); // Serilog root for ScriptLoggerFactory
builder.Services.AddSingleton<Phase7Composer>();
var host = builder.Build();
await host.RunAsync();

View File

@@ -30,6 +30,11 @@
<ItemGroup>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core\ZB.MOM.WW.OtOpcUa.Core.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Scripting\ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.VirtualTags\ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms\ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Analyzers\ZB.MOM.WW.OtOpcUa.Analyzers.csproj"
OutputItemType="Analyzer" ReferenceOutputAssembly="false"/>
</ItemGroup>

View File

@@ -0,0 +1,83 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
/// <summary>
/// Phase 7 follow-up #247 — covers the wire-format translation between the
/// <see cref="AlarmHistorianEvent"/> the SQLite sink hands to the writer + the
/// <see cref="HistorianAlarmEventDto"/> the Galaxy.Host IPC contract expects, plus
/// the per-event outcome enum mapping. Pure functions; the round-trip over a real
/// pipe is exercised by the live Host suite (task #240).
/// </summary>
[Trait("Category", "Unit")]
public sealed class GalaxyHistorianWriterMappingTests
{
[Fact]
public void ToDto_round_trips_every_field()
{
var ts = new DateTime(2026, 4, 20, 14, 30, 0, DateTimeKind.Utc);
var e = new AlarmHistorianEvent(
AlarmId: "al-7",
EquipmentPath: "/Site/Line/Cell",
AlarmName: "HighTemp",
AlarmTypeName: "LimitAlarm",
Severity: AlarmSeverity.High,
EventKind: "RaiseEvent",
Message: "Temp 92°C exceeded 90°C",
User: "operator-7",
Comment: "ack with reason",
TimestampUtc: ts);
var dto = GalaxyHistorianWriter.ToDto(e);
dto.AlarmId.ShouldBe("al-7");
dto.EquipmentPath.ShouldBe("/Site/Line/Cell");
dto.AlarmName.ShouldBe("HighTemp");
dto.AlarmTypeName.ShouldBe("LimitAlarm");
dto.Severity.ShouldBe((int)AlarmSeverity.High);
dto.EventKind.ShouldBe("RaiseEvent");
dto.Message.ShouldBe("Temp 92°C exceeded 90°C");
dto.User.ShouldBe("operator-7");
dto.Comment.ShouldBe("ack with reason");
dto.TimestampUtcUnixMs.ShouldBe(new DateTimeOffset(ts, TimeSpan.Zero).ToUnixTimeMilliseconds());
}
[Fact]
public void ToDto_preserves_null_Comment()
{
var e = new AlarmHistorianEvent(
"a", "/p", "n", "AlarmCondition", AlarmSeverity.Low, "RaiseEvent", "m",
User: "system", Comment: null, TimestampUtc: DateTime.UtcNow);
GalaxyHistorianWriter.ToDto(e).Comment.ShouldBeNull();
}
[Theory]
[InlineData(HistorianAlarmEventOutcomeDto.Ack, HistorianWriteOutcome.Ack)]
[InlineData(HistorianAlarmEventOutcomeDto.RetryPlease, HistorianWriteOutcome.RetryPlease)]
[InlineData(HistorianAlarmEventOutcomeDto.PermanentFail, HistorianWriteOutcome.PermanentFail)]
public void MapOutcome_round_trips_every_byte(
HistorianAlarmEventOutcomeDto wire, HistorianWriteOutcome expected)
{
GalaxyHistorianWriter.MapOutcome(wire).ShouldBe(expected);
}
[Fact]
public void MapOutcome_unknown_byte_throws()
{
Should.Throw<InvalidOperationException>(
() => GalaxyHistorianWriter.MapOutcome((HistorianAlarmEventOutcomeDto)0xFF));
}
[Fact]
public void Null_client_rejected()
{
Should.Throw<ArgumentNullException>(() => new GalaxyHistorianWriter(null!));
}
}

View File

@@ -0,0 +1,268 @@
using Microsoft.Extensions.Logging.Abstractions;
using Opc.Ua;
using Opc.Ua.Client;
using Opc.Ua.Configuration;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
using ZB.MOM.WW.OtOpcUa.Server.Security;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
/// <summary>
/// Task #219 — server-integration coverage for the <see cref="IAlarmSource"/> wiring
/// path. Boots the full OPC UA stack + a fake <see cref="IAlarmSource"/> driver, opens a
/// client session, and verifies via browse/read that DiscoverAsync's
/// <c>MarkAsAlarmCondition</c> calls produce addressable AlarmConditionState nodes in the
/// driver's namespace and that firing <c>OnAlarmEvent</c> routes through
/// <c>GenericDriverNodeManager</c>'s forwarder into <c>DriverNodeManager.ConditionSink</c>
/// without throwing.
///
/// Companion to <see cref="HistoryReadIntegrationTests"/> which covers the
/// <see cref="IHistoryProvider"/> dispatch path; together they close the server-side
/// integration gap for optional driver capabilities (plan decision #62).
///
/// Known server-side scoping (not a regression introduced here): the stack exposes the
/// AlarmConditionState type's inherited children (Severity / Message / ActiveState / …)
/// with Foundation-namespace NodeIds (ns=0) that aren't added to
/// <see cref="DriverNodeManager"/>'s predefined-node index, so reading those child
/// attributes through an OPC UA client returns <c>BadNodeIdUnknown</c>. OPC UA Part 9
/// event propagation (subscribe-on-Server + ConditionRefresh) is likewise out of scope
/// until the node manager wires <c>HasNotifier</c> + child-node registration. The
/// existing Core-level <c>GenericDriverNodeManagerTests</c> cover the in-memory alarm-sink
/// fan-out semantics directly.
/// </summary>
[Trait("Category", "Integration")]
public sealed class AlarmSubscribeIntegrationTests : IAsyncLifetime
{
private static readonly int Port = 48700 + Random.Shared.Next(0, 99);
private readonly string _endpoint = $"opc.tcp://localhost:{Port}/OtOpcUaAlarmTest";
private readonly string _pkiRoot = Path.Combine(Path.GetTempPath(), $"otopcua-alarm-test-{Guid.NewGuid():N}");
private DriverHost _driverHost = null!;
private OpcUaApplicationHost _server = null!;
private AlarmDriver _driver = null!;
public async ValueTask InitializeAsync()
{
_driverHost = new DriverHost();
_driver = new AlarmDriver();
await _driverHost.RegisterAsync(_driver, "{}", CancellationToken.None);
var options = new OpcUaServerOptions
{
EndpointUrl = _endpoint,
ApplicationName = "OtOpcUaAlarmTest",
ApplicationUri = "urn:OtOpcUa:Server:AlarmTest",
PkiStoreRoot = _pkiRoot,
AutoAcceptUntrustedClientCertificates = true,
HealthEndpointsEnabled = false,
};
_server = new OpcUaApplicationHost(options, _driverHost, new DenyAllUserAuthenticator(),
NullLoggerFactory.Instance, NullLogger<OpcUaApplicationHost>.Instance);
await _server.StartAsync(CancellationToken.None);
}
public async ValueTask DisposeAsync()
{
await _server.DisposeAsync();
await _driverHost.DisposeAsync();
try { Directory.Delete(_pkiRoot, recursive: true); } catch { /* best-effort */ }
}
[Fact]
public async Task Driver_alarm_transition_updates_server_side_AlarmConditionState_node()
{
using var session = await OpenSessionAsync();
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:alarm-driver");
_driver.RaiseAlarm(new AlarmEventArgs(
SubscriptionHandle: new FakeHandle("sub"),
SourceNodeId: "Tank.HiHi",
ConditionId: "cond-1",
AlarmType: "Active",
Message: "Level exceeded upper-upper",
Severity: AlarmSeverity.High,
SourceTimestampUtc: DateTime.UtcNow));
// The alarm-condition node's identifier is the driver full-reference + ".Condition"
// (DriverNodeManager.VariableHandle.MarkAsAlarmCondition). Server-side state changes
// are applied synchronously under DriverNodeManager.Lock inside ConditionSink.OnTransition,
// so by the time RaiseAlarm returns the node state has been flushed.
var conditionNodeId = new NodeId("Tank.HiHi.Condition", nsIndex);
// Browse the condition node for the well-known Part-9 child variables. The stack
// materializes Severity / Message / ActiveState / AckedState as children below the
// AlarmConditionState; their NodeIds are allocated by the stack so we discover them
// by BrowseName rather than guessing.
var browseDescriptions = new BrowseDescriptionCollection
{
new()
{
NodeId = conditionNodeId,
BrowseDirection = BrowseDirection.Forward,
ReferenceTypeId = ReferenceTypeIds.HierarchicalReferences,
IncludeSubtypes = true,
NodeClassMask = 0,
ResultMask = (uint)BrowseResultMask.All,
},
};
session.Browse(null, null, 0, browseDescriptions, out var browseResults, out _);
var children = browseResults[0].References
.ToDictionary(r => r.BrowseName.Name,
r => ExpandedNodeId.ToNodeId(r.NodeId, session.NamespaceUris),
StringComparer.Ordinal);
children.ShouldContainKey("Severity",
"browse did not return Severity child; full list: "
+ string.Join(", ", browseResults[0].References.Select(r => $"{r.BrowseName.Name}={r.NodeId}")));
children.ShouldContainKey("Message");
children.ShouldContainKey("ActiveState");
// NB: the stack exposes AlarmConditionState's inherited children with Foundation-namespace
// NodeIds (ns=0). The DriverNodeManager registers only the parent alarm object via
// AddPredefinedNode; reading child attributes through the OPC UA client returns
// BadNodeIdUnknown because the stack-assigned child NodeIds aren't in the node
// manager's predefined-node index. Asserting state-mutation via a client-side read
// is therefore out of reach at this integration layer — the Core-level
// GenericDriverNodeManagerTests cover the in-memory alarm-sink fan-out directly.
//
// What this test *does* verify through the OPC UA client: the alarm node itself is
// reachable via browse (proving MarkAsAlarmCondition registered it as a predefined
// node), its displayed name matches the driver's AlarmConditionInfo.SourceName, and
// firing the transition does not throw out of ConditionSink.OnTransition (which would
// fail the test at RaiseAlarm since the event handler is invoked synchronously).
var nodesToRead = new ReadValueIdCollection
{
new() { NodeId = conditionNodeId, AttributeId = Attributes.DisplayName },
};
session.Read(null, 0, TimestampsToReturn.Neither, nodesToRead, out var values, out _);
values[0].StatusCode.Code.ShouldBe(StatusCodes.Good);
((LocalizedText)values[0].Value).Text.ShouldBe("Tank.HiHi");
}
[Fact]
public async Task Each_IsAlarm_variable_registers_its_own_condition_node_in_the_driver_namespace()
{
// Tag-scoped alarm wiring: DiscoverAsync declares two IsAlarm variables and calls
// MarkAsAlarmCondition on each. The server-side DriverNodeManager wraps each call in
// a CapturingHandle that creates a sibling AlarmConditionState + registers a sink
// under the driver full-reference. Browse should show both condition nodes with
// distinct NodeIds using the FullReference + ".Condition" convention.
using var session = await OpenSessionAsync();
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:alarm-driver");
_driver.RaiseAlarm(new AlarmEventArgs(
new FakeHandle("sub"), "Tank.HiHi", "c", "Active", "first", AlarmSeverity.High,
DateTime.UtcNow));
var attrs = new ReadValueIdCollection
{
new() { NodeId = new NodeId("Tank.HiHi.Condition", nsIndex), AttributeId = Attributes.DisplayName },
new() { NodeId = new NodeId("Heater.OverTemp.Condition", nsIndex), AttributeId = Attributes.DisplayName },
};
session.Read(null, 0, TimestampsToReturn.Neither, attrs, out var results, out _);
results[0].StatusCode.Code.ShouldBe(StatusCodes.Good);
results[1].StatusCode.Code.ShouldBe(StatusCodes.Good);
((LocalizedText)results[0].Value).Text.ShouldBe("Tank.HiHi");
((LocalizedText)results[1].Value).Text.ShouldBe("Heater.OverTemp");
}
private async Task<ISession> OpenSessionAsync()
{
var cfg = new ApplicationConfiguration
{
ApplicationName = "OtOpcUaAlarmTestClient",
ApplicationUri = "urn:OtOpcUa:AlarmTestClient",
ApplicationType = ApplicationType.Client,
SecurityConfiguration = new SecurityConfiguration
{
ApplicationCertificate = new CertificateIdentifier
{
StoreType = CertificateStoreType.Directory,
StorePath = Path.Combine(_pkiRoot, "client-own"),
SubjectName = "CN=OtOpcUaAlarmTestClient",
},
TrustedIssuerCertificates = new CertificateTrustList { StoreType = CertificateStoreType.Directory, StorePath = Path.Combine(_pkiRoot, "client-issuers") },
TrustedPeerCertificates = new CertificateTrustList { StoreType = CertificateStoreType.Directory, StorePath = Path.Combine(_pkiRoot, "client-trusted") },
RejectedCertificateStore = new CertificateTrustList { StoreType = CertificateStoreType.Directory, StorePath = Path.Combine(_pkiRoot, "client-rejected") },
AutoAcceptUntrustedCertificates = true,
AddAppCertToTrustedStore = true,
},
TransportConfigurations = new TransportConfigurationCollection(),
TransportQuotas = new TransportQuotas { OperationTimeout = 15000 },
ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 },
};
await cfg.Validate(ApplicationType.Client);
cfg.CertificateValidator.CertificateValidation += (_, e) => e.Accept = true;
var instance = new ApplicationInstance { ApplicationConfiguration = cfg, ApplicationType = ApplicationType.Client };
await instance.CheckApplicationInstanceCertificate(true, CertificateFactory.DefaultKeySize);
var selected = CoreClientUtils.SelectEndpoint(cfg, _endpoint, useSecurity: false);
var endpointConfig = EndpointConfiguration.Create(cfg);
var configuredEndpoint = new ConfiguredEndpoint(null, selected, endpointConfig);
return await Session.Create(cfg, configuredEndpoint, false, "OtOpcUaAlarmTestClientSession", 60000,
new UserIdentity(new AnonymousIdentityToken()), null);
}
/// <summary>
/// Stub <see cref="IAlarmSource"/> driver. <see cref="DiscoverAsync"/> emits two alarm-
/// bearing variables (so tag-scoped fan-out can be asserted); <see cref="RaiseAlarm"/>
/// fires <see cref="OnAlarmEvent"/> exactly like a real driver would.
/// </summary>
private sealed class AlarmDriver : IDriver, ITagDiscovery, IAlarmSource
{
public string DriverInstanceId => "alarm-driver";
public string DriverType => "AlarmStub";
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
public Task InitializeAsync(string driverConfigJson, CancellationToken ct) => Task.CompletedTask;
public Task ReinitializeAsync(string driverConfigJson, CancellationToken ct) => Task.CompletedTask;
public Task ShutdownAsync(CancellationToken ct) => Task.CompletedTask;
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken ct) => Task.CompletedTask;
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken ct)
{
var tank = builder.Folder("Tank", "Tank");
var hiHi = tank.Variable("HiHi", "HiHi", new DriverAttributeInfo(
"Tank.HiHi", DriverDataType.Boolean, false, null,
SecurityClassification.FreeAccess, false, IsAlarm: true));
hiHi.MarkAsAlarmCondition(new AlarmConditionInfo(
"Tank.HiHi", AlarmSeverity.High, "High-high alarm"));
var heater = builder.Folder("Heater", "Heater");
var ot = heater.Variable("OverTemp", "OverTemp", new DriverAttributeInfo(
"Heater.OverTemp", DriverDataType.Boolean, false, null,
SecurityClassification.FreeAccess, false, IsAlarm: true));
ot.MarkAsAlarmCondition(new AlarmConditionInfo(
"Heater.OverTemp", AlarmSeverity.Critical, "Over-temperature"));
return Task.CompletedTask;
}
public void RaiseAlarm(AlarmEventArgs args) => OnAlarmEvent?.Invoke(this, args);
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
IReadOnlyList<string> _, CancellationToken __)
=> Task.FromResult<IAlarmSubscriptionHandle>(new FakeHandle("sub"));
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle _, CancellationToken __)
=> Task.CompletedTask;
public Task AcknowledgeAsync(
IReadOnlyList<AlarmAcknowledgeRequest> _, CancellationToken __)
=> Task.CompletedTask;
}
private sealed class FakeHandle(string diagnosticId) : IAlarmSubscriptionHandle
{
public string DiagnosticId { get; } = diagnosticId;
}
}

View File

@@ -0,0 +1,73 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
/// <summary>
/// Task #248 — covers the <see cref="DriverFactoryRegistry"/> contract that
/// <see cref="DriverInstanceBootstrapper"/> consumes.
/// </summary>
[Trait("Category", "Unit")]
public sealed class DriverFactoryRegistryTests
{
private static IDriver FakeDriver(string id, string config) => new FakeIDriver(id);
[Fact]
public void Register_then_TryGet_returns_factory()
{
var r = new DriverFactoryRegistry();
r.Register("MyDriver", FakeDriver);
r.TryGet("MyDriver").ShouldNotBeNull();
r.TryGet("Nope").ShouldBeNull();
}
[Fact]
public void Register_is_case_insensitive()
{
var r = new DriverFactoryRegistry();
r.Register("Galaxy", FakeDriver);
r.TryGet("galaxy").ShouldNotBeNull();
r.TryGet("GALAXY").ShouldNotBeNull();
}
[Fact]
public void Register_duplicate_type_throws()
{
var r = new DriverFactoryRegistry();
r.Register("Galaxy", FakeDriver);
Should.Throw<InvalidOperationException>(() => r.Register("Galaxy", FakeDriver));
}
[Fact]
public void Register_null_args_rejected()
{
var r = new DriverFactoryRegistry();
Should.Throw<ArgumentException>(() => r.Register("", FakeDriver));
Should.Throw<ArgumentNullException>(() => r.Register("X", null!));
}
[Fact]
public void RegisteredTypes_returns_snapshot()
{
var r = new DriverFactoryRegistry();
r.Register("A", FakeDriver);
r.Register("B", FakeDriver);
r.RegisteredTypes.ShouldContain("A");
r.RegisteredTypes.ShouldContain("B");
}
private sealed class FakeIDriver(string id) : IDriver
{
public string DriverInstanceId => id;
public string DriverType => "Fake";
public Task InitializeAsync(string _, CancellationToken __) => Task.CompletedTask;
public Task ReinitializeAsync(string _, CancellationToken __) => Task.CompletedTask;
public Task ShutdownAsync(CancellationToken _) => Task.CompletedTask;
public Task FlushOptionalCachesAsync(CancellationToken _) => Task.CompletedTask;
public DriverHealth GetHealth() => new(DriverState.Healthy, null, null);
public long GetMemoryFootprint() => 0;
}
}

View File

@@ -0,0 +1,83 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
/// <summary>
/// Covers the Phase 7 driver-to-engine bridge cache (task #243). Verifies the
/// cache serves last-known values synchronously, fans out Push updates to
/// subscribers, and cleans up on Dispose.
/// </summary>
[Trait("Category", "Unit")]
public sealed class CachedTagUpstreamSourceTests
{
private static DataValueSnapshot Snap(object? v) =>
new(v, 0u, DateTime.UtcNow, DateTime.UtcNow);
[Fact]
public void ReadTag_unknown_path_returns_BadNodeIdUnknown_snapshot()
{
var c = new CachedTagUpstreamSource();
var snap = c.ReadTag("/nowhere");
snap.Value.ShouldBeNull();
snap.StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured);
}
[Fact]
public void Push_then_Read_returns_cached_value()
{
var c = new CachedTagUpstreamSource();
c.Push("/Line1/Temp", Snap(42));
c.ReadTag("/Line1/Temp").Value.ShouldBe(42);
}
[Fact]
public void Push_fans_out_to_subscribers_in_registration_order()
{
var c = new CachedTagUpstreamSource();
var events = new List<string>();
c.SubscribeTag("/X", (p, s) => events.Add($"A:{p}:{s.Value}"));
c.SubscribeTag("/X", (p, s) => events.Add($"B:{p}:{s.Value}"));
c.Push("/X", Snap(7));
events.ShouldBe(["A:/X:7", "B:/X:7"]);
}
[Fact]
public void Push_to_different_path_does_not_fire_foreign_observer()
{
var c = new CachedTagUpstreamSource();
var fired = 0;
c.SubscribeTag("/X", (_, _) => fired++);
c.Push("/Y", Snap(1));
fired.ShouldBe(0);
}
[Fact]
public void Dispose_of_subscription_stops_fan_out()
{
var c = new CachedTagUpstreamSource();
var fired = 0;
var sub = c.SubscribeTag("/X", (_, _) => fired++);
c.Push("/X", Snap(1));
sub.Dispose();
c.Push("/X", Snap(2));
fired.ShouldBe(1);
}
[Fact]
public void Satisfies_both_VirtualTag_and_ScriptedAlarm_upstream_interfaces()
{
var c = new CachedTagUpstreamSource();
// Single instance is assignable to both — the composer passes it through for
// both engine constructors per the task #243 wiring.
((Core.VirtualTags.ITagUpstreamSource)c).ShouldNotBeNull();
((Core.ScriptedAlarms.ITagUpstreamSource)c).ShouldNotBeNull();
}
}

View File

@@ -0,0 +1,226 @@
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
/// <summary>
/// Task #244 — covers the bridge that pumps live driver <c>OnDataChange</c>
/// notifications into the Phase 7 <see cref="CachedTagUpstreamSource"/>.
/// </summary>
[Trait("Category", "Unit")]
public sealed class DriverSubscriptionBridgeTests
{
private sealed class FakeDriver : ISubscribable
{
public List<IReadOnlyList<string>> SubscribeCalls { get; } = [];
public List<ISubscriptionHandle> Unsubscribed { get; } = [];
public ISubscriptionHandle? LastHandle { get; private set; }
public event EventHandler<DataChangeEventArgs>? OnDataChange;
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
SubscribeCalls.Add(fullReferences);
LastHandle = new Handle($"sub-{SubscribeCalls.Count}");
return Task.FromResult(LastHandle);
}
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
Unsubscribed.Add(handle);
return Task.CompletedTask;
}
public void Fire(string fullRef, object value)
{
OnDataChange?.Invoke(this, new DataChangeEventArgs(
LastHandle!, fullRef,
new DataValueSnapshot(value, 0u, DateTime.UtcNow, DateTime.UtcNow)));
}
private sealed record Handle(string DiagnosticId) : ISubscriptionHandle;
}
[Fact]
public async Task StartAsync_calls_SubscribeAsync_with_distinct_fullRefs()
{
var sink = new CachedTagUpstreamSource();
var driver = new FakeDriver();
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
await bridge.StartAsync(new[]
{
new DriverFeed(driver,
new Dictionary<string, string>
{
["/Site/L1/A/Temp"] = "DR.Temp",
["/Site/L1/A/Pressure"] = "DR.Pressure",
},
TimeSpan.FromSeconds(1)),
}, CancellationToken.None);
driver.SubscribeCalls.Count.ShouldBe(1);
driver.SubscribeCalls[0].ShouldContain("DR.Temp");
driver.SubscribeCalls[0].ShouldContain("DR.Pressure");
}
[Fact]
public async Task OnDataChange_pushes_to_cache_keyed_by_UNS_path()
{
var sink = new CachedTagUpstreamSource();
var driver = new FakeDriver();
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
await bridge.StartAsync(new[]
{
new DriverFeed(driver,
new Dictionary<string, string> { ["/Site/L1/A/Temp"] = "DR.Temp" },
TimeSpan.FromSeconds(1)),
}, CancellationToken.None);
driver.Fire("DR.Temp", 42.5);
sink.ReadTag("/Site/L1/A/Temp").Value.ShouldBe(42.5);
}
[Fact]
public async Task OnDataChange_with_unmapped_fullRef_is_ignored()
{
var sink = new CachedTagUpstreamSource();
var driver = new FakeDriver();
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
await bridge.StartAsync(new[]
{
new DriverFeed(driver,
new Dictionary<string, string> { ["/p"] = "DR.A" },
TimeSpan.FromSeconds(1)),
}, CancellationToken.None);
driver.Fire("DR.B", 99); // not in map
sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured,
"unmapped fullRef shouldn't pollute the cache");
}
[Fact]
public async Task Empty_PathToFullRef_skips_SubscribeAsync_call()
{
var sink = new CachedTagUpstreamSource();
var driver = new FakeDriver();
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
await bridge.StartAsync(new[]
{
new DriverFeed(driver, new Dictionary<string, string>(), TimeSpan.FromSeconds(1)),
}, CancellationToken.None);
driver.SubscribeCalls.ShouldBeEmpty();
}
[Fact]
public async Task DisposeAsync_unsubscribes_each_active_subscription()
{
var sink = new CachedTagUpstreamSource();
var driver = new FakeDriver();
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
await bridge.StartAsync(new[]
{
new DriverFeed(driver,
new Dictionary<string, string> { ["/p"] = "DR.A" },
TimeSpan.FromSeconds(1)),
}, CancellationToken.None);
await bridge.DisposeAsync();
driver.Unsubscribed.Count.ShouldBe(1);
driver.Unsubscribed[0].ShouldBeSameAs(driver.LastHandle);
}
[Fact]
public async Task DisposeAsync_unhooks_OnDataChange_so_post_dispose_events_dont_push()
{
var sink = new CachedTagUpstreamSource();
var driver = new FakeDriver();
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
await bridge.StartAsync(new[]
{
new DriverFeed(driver,
new Dictionary<string, string> { ["/p"] = "DR.A" },
TimeSpan.FromSeconds(1)),
}, CancellationToken.None);
await bridge.DisposeAsync();
driver.Fire("DR.A", 999); // post-dispose event
sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured);
}
[Fact]
public async Task StartAsync_called_twice_throws()
{
var sink = new CachedTagUpstreamSource();
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
await bridge.StartAsync(Array.Empty<DriverFeed>(), CancellationToken.None);
await Should.ThrowAsync<InvalidOperationException>(
() => bridge.StartAsync(Array.Empty<DriverFeed>(), CancellationToken.None));
}
[Fact]
public async Task DisposeAsync_is_idempotent()
{
var sink = new CachedTagUpstreamSource();
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
await bridge.DisposeAsync();
await bridge.DisposeAsync(); // must not throw
}
[Fact]
public async Task Subscribe_failure_unhooks_handler_and_propagates()
{
var sink = new CachedTagUpstreamSource();
var failingDriver = new ThrowingDriver();
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
var feeds = new[]
{
new DriverFeed(failingDriver,
new Dictionary<string, string> { ["/p"] = "DR.A" },
TimeSpan.FromSeconds(1)),
};
await Should.ThrowAsync<InvalidOperationException>(
() => bridge.StartAsync(feeds, CancellationToken.None));
// Handler should be unhooked — firing now would NPE if it wasn't (event has 0 subs).
failingDriver.HasAnyHandlers.ShouldBeFalse(
"handler must be removed when SubscribeAsync throws so it doesn't leak");
}
[Fact]
public void Null_sink_or_logger_rejected()
{
Should.Throw<ArgumentNullException>(() => new DriverSubscriptionBridge(null!, NullLogger<DriverSubscriptionBridge>.Instance));
Should.Throw<ArgumentNullException>(() => new DriverSubscriptionBridge(new CachedTagUpstreamSource(), null!));
}
private sealed class ThrowingDriver : ISubscribable
{
private EventHandler<DataChangeEventArgs>? _handler;
public bool HasAnyHandlers => _handler is not null;
public event EventHandler<DataChangeEventArgs>? OnDataChange
{
add => _handler = (EventHandler<DataChangeEventArgs>?)Delegate.Combine(_handler, value);
remove => _handler = (EventHandler<DataChangeEventArgs>?)Delegate.Remove(_handler, value);
}
public Task<ISubscriptionHandle> SubscribeAsync(IReadOnlyList<string> _, TimeSpan __, CancellationToken ___) =>
throw new InvalidOperationException("driver offline");
public Task UnsubscribeAsync(ISubscriptionHandle _, CancellationToken __) => Task.CompletedTask;
}
}

View File

@@ -0,0 +1,93 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
/// <summary>
/// Task #246 — covers the deterministic mapping inside <see cref="Phase7Composer"/>
/// that turns <see cref="EquipmentNamespaceContent"/> into the path → fullRef map
/// <see cref="DriverFeed.PathToFullRef"/> consumes. Pure function; no DI / DB needed.
/// </summary>
[Trait("Category", "Unit")]
public sealed class Phase7ComposerMappingTests
{
private static UnsArea Area(string id, string name) =>
new() { UnsAreaId = id, ClusterId = "c", Name = name, GenerationId = 1 };
private static UnsLine Line(string id, string areaId, string name) =>
new() { UnsLineId = id, UnsAreaId = areaId, Name = name, GenerationId = 1 };
private static Equipment Eq(string id, string lineId, string name) => new()
{
EquipmentRowId = Guid.NewGuid(), GenerationId = 1, EquipmentId = id,
EquipmentUuid = Guid.NewGuid(), DriverInstanceId = "drv",
UnsLineId = lineId, Name = name, MachineCode = "m",
};
private static Tag T(string id, string name, string fullRef, string equipmentId) => new()
{
TagRowId = Guid.NewGuid(), GenerationId = 1, TagId = id,
DriverInstanceId = "drv", EquipmentId = equipmentId,
Name = name, DataType = "Float32",
AccessLevel = TagAccessLevel.Read, TagConfig = fullRef,
};
[Fact]
public void Maps_tag_to_UNS_path_walker_emits()
{
var content = new EquipmentNamespaceContent(
Areas: [Area("a1", "warsaw")],
Lines: [Line("l1", "a1", "oven-line")],
Equipment: [Eq("e1", "l1", "oven-3")],
Tags: [T("t1", "Temp", "DR.Temp", "e1")]);
var map = Phase7Composer.MapPathsToFullRefs(content);
map.ShouldContainKeyAndValue("/warsaw/oven-line/oven-3/Temp", "DR.Temp");
}
[Fact]
public void Skips_tag_with_null_EquipmentId()
{
var content = new EquipmentNamespaceContent(
[Area("a1", "warsaw")], [Line("l1", "a1", "ol")], [Eq("e1", "l1", "ov")],
[T("t1", "Bare", "DR.Bare", null!)]); // SystemPlatform-style orphan
Phase7Composer.MapPathsToFullRefs(content).ShouldBeEmpty();
}
[Fact]
public void Skips_tag_pointing_at_unknown_Equipment()
{
var content = new EquipmentNamespaceContent(
[Area("a1", "warsaw")], [Line("l1", "a1", "ol")], [Eq("e1", "l1", "ov")],
[T("t1", "Lost", "DR.Lost", "e-missing")]);
Phase7Composer.MapPathsToFullRefs(content).ShouldBeEmpty();
}
[Fact]
public void Maps_multiple_tags_under_same_equipment_distinctly()
{
var content = new EquipmentNamespaceContent(
[Area("a1", "site")], [Line("l1", "a1", "line1")], [Eq("e1", "l1", "cell")],
[T("t1", "Temp", "DR.T", "e1"), T("t2", "Pressure", "DR.P", "e1")]);
var map = Phase7Composer.MapPathsToFullRefs(content);
map.Count.ShouldBe(2);
map["/site/line1/cell/Temp"].ShouldBe("DR.T");
map["/site/line1/cell/Pressure"].ShouldBe("DR.P");
}
[Fact]
public void Empty_content_yields_empty_map()
{
Phase7Composer.MapPathsToFullRefs(new EquipmentNamespaceContent([], [], [], []))
.ShouldBeEmpty();
}
}

View File

@@ -0,0 +1,162 @@
using Microsoft.Extensions.Logging.Abstractions;
using Serilog;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
/// <summary>
/// Phase 7 follow-up (task #243) — verifies the composer that maps Config DB
/// rows to runtime engine definitions + wires up VirtualTagEngine +
/// ScriptedAlarmEngine + historian routing.
/// </summary>
[Trait("Category", "Unit")]
public sealed class Phase7EngineComposerTests
{
private static Script ScriptRow(string id, string source) => new()
{
ScriptRowId = Guid.NewGuid(), GenerationId = 1,
ScriptId = id, Name = id, SourceCode = source, SourceHash = "h",
};
private static VirtualTag VtRow(string id, string scriptId) => new()
{
VirtualTagRowId = Guid.NewGuid(), GenerationId = 1,
VirtualTagId = id, EquipmentId = "eq-1", Name = id,
DataType = "Float32", ScriptId = scriptId,
};
private static ScriptedAlarm AlarmRow(string id, string scriptId) => new()
{
ScriptedAlarmRowId = Guid.NewGuid(), GenerationId = 1,
ScriptedAlarmId = id, EquipmentId = "eq-1", Name = id,
AlarmType = "LimitAlarm", Severity = 500,
MessageTemplate = "x", PredicateScriptId = scriptId,
};
[Fact]
public void Compose_empty_rows_returns_Empty_sentinel()
{
var result = Phase7EngineComposer.Compose(
scripts: [],
virtualTags: [],
scriptedAlarms: [],
upstream: new CachedTagUpstreamSource(),
alarmStateStore: new InMemoryAlarmStateStore(),
historianSink: NullAlarmHistorianSink.Instance,
rootScriptLogger: new LoggerConfiguration().CreateLogger(),
loggerFactory: NullLoggerFactory.Instance);
result.ShouldBeSameAs(Phase7ComposedSources.Empty);
result.VirtualReadable.ShouldBeNull();
result.ScriptedAlarmReadable.ShouldBeNull();
}
[Fact]
public void Compose_VirtualTag_rows_returns_non_null_VirtualReadable()
{
var scripts = new[] { ScriptRow("scr-1", "return 1;") };
var vtags = new[] { VtRow("vt-1", "scr-1") };
var result = Phase7EngineComposer.Compose(
scripts, vtags, [],
upstream: new CachedTagUpstreamSource(),
alarmStateStore: new InMemoryAlarmStateStore(),
historianSink: NullAlarmHistorianSink.Instance,
rootScriptLogger: new LoggerConfiguration().CreateLogger(),
loggerFactory: NullLoggerFactory.Instance);
result.VirtualReadable.ShouldNotBeNull();
result.ScriptedAlarmReadable.ShouldBeNull("no alarms configured");
result.Disposables.Count.ShouldBeGreaterThan(0);
}
[Fact]
public void Compose_ScriptedAlarm_rows_returns_non_null_ScriptedAlarmReadable()
{
var scripts = new[] { ScriptRow("scr-1", "return false;") };
var alarms = new[] { AlarmRow("al-1", "scr-1") };
var result = Phase7EngineComposer.Compose(
scripts, [], alarms,
upstream: new CachedTagUpstreamSource(),
alarmStateStore: new InMemoryAlarmStateStore(),
historianSink: NullAlarmHistorianSink.Instance,
rootScriptLogger: new LoggerConfiguration().CreateLogger(),
loggerFactory: NullLoggerFactory.Instance);
result.ScriptedAlarmReadable.ShouldNotBeNull("task #245 — alarm Active state readable");
result.VirtualReadable.ShouldBeNull();
}
[Fact]
public void Compose_missing_script_reference_throws_with_actionable_message()
{
var vtags = new[] { VtRow("vt-1", "scr-missing") };
Should.Throw<InvalidOperationException>(() =>
Phase7EngineComposer.Compose(
scripts: [],
vtags, [],
upstream: new CachedTagUpstreamSource(),
alarmStateStore: new InMemoryAlarmStateStore(),
historianSink: NullAlarmHistorianSink.Instance,
rootScriptLogger: new LoggerConfiguration().CreateLogger(),
loggerFactory: NullLoggerFactory.Instance))
.Message.ShouldContain("scr-missing");
}
[Fact]
public void Compose_disabled_VirtualTag_is_skipped()
{
var scripts = new[] { ScriptRow("scr-1", "return 1;") };
var disabled = VtRow("vt-1", "scr-1");
disabled.Enabled = false;
var defs = Phase7EngineComposer.ProjectVirtualTags(
new[] { disabled },
new Dictionary<string, Script> { ["scr-1"] = scripts[0] }).ToList();
defs.ShouldBeEmpty();
}
[Fact]
public void ProjectVirtualTags_maps_timer_interval_milliseconds_to_TimeSpan()
{
var scripts = new[] { ScriptRow("scr-1", "return 1;") };
var vt = VtRow("vt-1", "scr-1");
vt.TimerIntervalMs = 2500;
var def = Phase7EngineComposer.ProjectVirtualTags(
new[] { vt },
new Dictionary<string, Script> { ["scr-1"] = scripts[0] }).Single();
def.TimerInterval.ShouldBe(TimeSpan.FromMilliseconds(2500));
}
[Fact]
public void ProjectScriptedAlarms_maps_Severity_numeric_to_AlarmSeverity_bucket()
{
var scripts = new[] { ScriptRow("scr-1", "return true;") };
var buckets = new[] { (1, AlarmSeverity.Low), (250, AlarmSeverity.Low),
(251, AlarmSeverity.Medium), (500, AlarmSeverity.Medium),
(501, AlarmSeverity.High), (750, AlarmSeverity.High),
(751, AlarmSeverity.Critical), (1000, AlarmSeverity.Critical) };
foreach (var (input, expected) in buckets)
{
var row = AlarmRow("a1", "scr-1");
row.Severity = input;
var def = Phase7EngineComposer.ProjectScriptedAlarms(
new[] { row },
new Dictionary<string, Script> { ["scr-1"] = scripts[0] }).Single();
def.Severity.ShouldBe(expected, $"severity {input} should map to {expected}");
}
}
}

View File

@@ -0,0 +1,120 @@
using Microsoft.Extensions.Logging.Abstractions;
using Serilog;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
/// <summary>
/// Task #245 — covers the IReadable adapter that surfaces each scripted alarm's
/// live <c>ActiveState</c> so OPC UA variable reads on Source=ScriptedAlarm nodes
/// return the predicate truth instead of BadNotFound.
/// </summary>
[Trait("Category", "Unit")]
public sealed class ScriptedAlarmReadableTests
{
private static (ScriptedAlarmEngine engine, CachedTagUpstreamSource upstream) BuildEngineWith(
params (string alarmId, string predicateSource)[] alarms)
{
var upstream = new CachedTagUpstreamSource();
var logger = new LoggerConfiguration().CreateLogger();
var factory = new ScriptLoggerFactory(logger);
var engine = new ScriptedAlarmEngine(upstream, new InMemoryAlarmStateStore(), factory, logger);
var defs = alarms.Select(a => new ScriptedAlarmDefinition(
AlarmId: a.alarmId,
EquipmentPath: "/eq",
AlarmName: a.alarmId,
Kind: AlarmKind.LimitAlarm,
Severity: AlarmSeverity.Medium,
MessageTemplate: "x",
PredicateScriptSource: a.predicateSource)).ToList();
engine.LoadAsync(defs, CancellationToken.None).GetAwaiter().GetResult();
return (engine, upstream);
}
[Fact]
public async Task Reads_return_false_for_newly_loaded_alarm_with_inactive_predicate()
{
var (engine, _) = BuildEngineWith(("a1", "return false;"));
using var _e = engine;
var readable = new ScriptedAlarmReadable(engine);
var result = await readable.ReadAsync(["a1"], CancellationToken.None);
result.Count.ShouldBe(1);
result[0].Value.ShouldBe(false);
result[0].StatusCode.ShouldBe(0u, "Good quality when the engine has state");
}
[Fact]
public async Task Reads_return_true_when_predicate_evaluates_to_active()
{
var (engine, upstream) = BuildEngineWith(
("tempAlarm", "return ctx.GetTag(\"/Site/Line/Cell/Temp\").Value is double d && d > 100;"));
using var _e = engine;
// Seed the upstream value + nudge the engine so the alarm transitions to Active.
upstream.Push("/Site/Line/Cell/Temp",
new DataValueSnapshot(150.0, 0u, DateTime.UtcNow, DateTime.UtcNow));
// Allow the engine's change-driven cascade to run.
await Task.Delay(50);
var readable = new ScriptedAlarmReadable(engine);
var result = await readable.ReadAsync(["tempAlarm"], CancellationToken.None);
result[0].Value.ShouldBe(true);
}
[Fact]
public async Task Reads_return_BadNodeIdUnknown_for_missing_alarm()
{
var (engine, _) = BuildEngineWith(("a1", "return false;"));
using var _e = engine;
var readable = new ScriptedAlarmReadable(engine);
var result = await readable.ReadAsync(["a-not-loaded"], CancellationToken.None);
result[0].Value.ShouldBeNull();
result[0].StatusCode.ShouldBe(0x80340000u,
"BadNodeIdUnknown surfaces a misconfiguration, not a silent false");
}
[Fact]
public async Task Reads_batch_round_trip_preserves_order()
{
var (engine, _) = BuildEngineWith(
("a1", "return false;"),
("a2", "return false;"));
using var _e = engine;
var readable = new ScriptedAlarmReadable(engine);
var result = await readable.ReadAsync(["a2", "missing", "a1"], CancellationToken.None);
result.Count.ShouldBe(3);
result[0].Value.ShouldBe(false); // a2
result[1].StatusCode.ShouldBe(0x80340000u); // missing
result[2].Value.ShouldBe(false); // a1
}
[Fact]
public void Null_engine_rejected()
{
Should.Throw<ArgumentNullException>(() => new ScriptedAlarmReadable(null!));
}
[Fact]
public async Task Null_fullReferences_rejected()
{
var (engine, _) = BuildEngineWith(("a1", "return false;"));
using var _e = engine;
var readable = new ScriptedAlarmReadable(engine);
await Should.ThrowAsync<ArgumentNullException>(
() => readable.ReadAsync(null!, CancellationToken.None));
}
}