feat(loader): company overlay as VirtualTags mirroring the galaxy mirror + verify --require-good

This commit is contained in:
Joseph Doherty
2026-06-07 04:59:51 -04:00
parent dce6f83488
commit 5655b75fe6
2 changed files with 162 additions and 58 deletions
+26 -12
View File
@@ -113,22 +113,36 @@ Defaults target docker-dev; override via flags or env:
Besides the galaxy-native mirror, the tool can load the **Northwind company
shape** (`filling / line-1 / rinser-01 / speed-rpm`) as a second, **Equipment**-kind
namespace (`nw-uns`, in cluster `MAIN`) from `../company-uns.json`. This needs
OtOpcUa `master` ≥ the Equipment-namespace structure milestone
(`febe462…9a67ebc`), which materialises Equipment `Tag`/`VirtualTag` rows on
deploy and added a **headless deploy** endpoint.
namespace (`nw-uns`, in cluster `MAIN`) from `../company-uns.json`. Each company
signal is a **VirtualTag** (+ a `Script`) whose script simply mirrors the live
galaxy-mirror tag for that signal:
```csharp
return ctx.GetTag("TestMachine_001.TestDouble").Value;
```
so the company shape carries live **VALUES** driven off the same Galaxy source — no
driver, no `BadWaitingForInitialData` once the galaxy mirror is up. The `ctx.GetTag`
literal is the signal's `source.fullTagReference`; the engine's `DependencyExtractor`
harvests it and subscribes the VirtualTag to that galaxy-mirror tag. This needs
OtOpcUa `master` ≥ the Equipment-namespace VirtualTag materialisation milestone (WS-3),
which materialises `VirtualTag`/`Script` rows on deploy and added the **headless
deploy** endpoint.
```bash
./.venv/bin/python otopcua_uns.py populate-equipment # 3 areas / 8 lines / 40 equipment / 1036 signals
./.venv/bin/python otopcua_uns.py populate-equipment # 3 areas / 8 lines / 40 equipment / 1036 VirtualTags
curl -s -X POST http://localhost:9200/api/deployments -H 'X-Api-Key: docker-dev-deploy-key' # headless deploy
./.venv/bin/python otopcua_uns.py verify-equipment --expect 1036 # browse the company tree (nw-area-* scope)
./.venv/bin/python otopcua_uns.py verify-equipment --expect 1036 --require-good 1036 --wait # structure + live values
```
UNS folders carry the friendly **DisplayName** (`filling`); the BrowseName/NodeId
stay the stable logical Id (`nw-area-filling`) — standard OPC UA. **Structure-only:**
the company leaves materialise as `BadWaitingForInitialData` — live **values** in
the company shape are the next OtOpcUa milestone (driver/VirtualTag source), tracked
in `OtOpcUa/docs/plans/2026-06-06-equipment-namespace-materialization-scope.md` (WS-3).
The galaxy-native mirror (`populate`) still carries live values.
stay the stable logical Id (`nw-area-filling`) — standard OPC UA. **No driver:** the
company signals are VirtualTags (which link to Equipment + a Script, not a driver); a
placeholder `nw-uns-modbus` driver is kept only because an Equipment namespace is
expected to have one, but no `Tag` binds to it. `verify-equipment --require-good N`
reads each leaf's value and asserts at least N are Good (default `0` = structure-only,
back-compat); `--wait` polls until the deploy + change-triggered evaluations land.
Tracked in `OtOpcUa/docs/plans/2026-06-06-equipment-namespace-materialization-scope.md` (WS-3).
`clean` removes both the mirror tags and the company overlay.
`clean` removes both the mirror tags and the company overlay (the `VirtualTag` +
`Script` rows, in FK-safe order, plus the namespace/driver/equipment/areas/lines).
+136 -46
View File
@@ -25,12 +25,21 @@ the `nw-` id prefix so `clean` can remove them without touching other config.
Idempotent: populate upserts by TagId; re-running is a no-op when unchanged.
There are TWO overlays:
• the galaxy-native mirror (`populate`) — SystemPlatform driver Tags, 396 tags;
• the Northwind company shape (`populate-equipment`) — an Equipment-kind namespace
whose 1036 signals are VirtualTags. Each VirtualTag's Script simply mirrors the
live galaxy-mirror tag (`return ctx.GetTag("<fullTagReference>").Value;`), so the
company shape carries live VALUES driven off the same Galaxy source.
Subcommands:
generate Build the load plan from galaxy-hierarchy.json (writes load-plan.json)
populate Upsert the SystemPlatform mirror Tag rows into the config DB
verify Check DB rows present + live OPC UA values are Good on :4840
status Show config-DB + address-space state
clean Delete all nw-* mirror Tag rows
generate Build the load plan from galaxy-hierarchy.json (writes load-plan.json)
populate Upsert the SystemPlatform mirror Tag rows into the config DB
populate-equipment Load the company shape as VirtualTag+Script rows (mirror the galaxy tags)
verify Check DB rows present + live OPC UA values are Good on :4840
verify-equipment Browse the company tree; --require-good asserts live values
status Show config-DB + address-space state
clean Delete all nw-* mirror Tags + the company VirtualTag/Script overlay
Deploy is a human-gated AdminUI action (no SQL/REST trigger exists); populate
and clean print the reminder and `verify --wait` polls until it lands.
@@ -69,11 +78,18 @@ LOAD_PLAN = os.path.join(os.path.dirname(__file__), "load-plan.json")
# Equipment-overlay (company-shape) object ids — all carry the nw- prefix so
# `clean` can remove them. The Equipment namespace is a SECOND namespace loaded
# alongside the galaxy mirror; its leaves stay BadWaitingForInitialData until the
# value milestone (scope doc WS-3) wires a driver/VirtualTag source.
# alongside the galaxy mirror. Each company signal is a VirtualTag (+ Script) whose
# script mirrors the live SystemPlatform galaxy-mirror tag for that signal — so the
# overlay carries live VALUES (scope doc WS-3), not just structure.
EQ_CLUSTER = os.environ.get("OTOPCUA_EQ_CLUSTER", "MAIN")
EQ_NS = "nw-uns"
EQ_DRIVER = "nw-uns-modbus" # non-Galaxy FK driver (structure-only; doesn't stream)
# A placeholder non-Galaxy driver kept ONLY to satisfy "an Equipment namespace has a
# driver" expectations; it streams nothing and no Tag binds to it (the company signals
# are VirtualTags, which need no driver — they link to Equipment + a Script). Its
# DriverType is non-GalaxyMxGateway so DraftValidator.ValidateDriverNamespaceCompatibility
# accepts it in the Equipment-kind namespace.
EQ_DRIVER = "nw-uns-modbus"
EQ_ID_PREFIX = "nweq-" # VirtualTag/Script logical-id prefix (cleanup by prefix scan)
# galaxy dataTypeName / gen_uns dtype → valid OtOpcUa DriverDataType
_DTYPE_FIX = {"Double": "Float64", "Float": "Float32"}
@@ -212,18 +228,34 @@ def cmd_populate(args):
return 0
def _eq_signal_ids(equipment_id, folder, name):
"""Deterministic (VirtualTagId, ScriptId) for a company signal. Both carry the
EQ_ID_PREFIX so `clean` removes exactly what was created. The two ids share the
same per-signal hash but differ by a kind token so they never collide across the
global UX_VirtualTag_LogicalId / UX_Script_LogicalId unique indexes. Capped at the
64-char id column width."""
base = hashlib.sha1(f"{equipment_id}|{folder}|{name}".encode()).hexdigest()[:20]
return EQ_ID_PREFIX + "vt-" + base, EQ_ID_PREFIX + "sc-" + base
def cmd_populate_equipment(args):
"""Load the company-shape Equipment namespace from company-uns.json: a second
(Equipment-kind) namespace alongside the galaxy mirror, with the Northwind
Area/Line/Equipment/Signal tree. Structure-only — leaves materialise as
BadWaitingForInitialData (the value milestone is separate). Idempotent:
Area/Line/Equipment/Signal tree. Each signal is a VirtualTag whose Script mirrors
the live galaxy-mirror tag for that signal — `return ctx.GetTag("<ref>").Value;` —
so the company shape streams live VALUES off the same Galaxy source (no driver,
no BadWaitingForInitialData once the galaxy mirror is up). Idempotent:
drop-and-recreate of the nw- overlay rows."""
with open(args.company_json) as f:
doc = json.load(f)
u = doc["uns"]
conn, cur = connect(args.mssql)
# Drop any prior overlay (child rows first), then recreate.
# Drop any prior overlay (child rows first), then recreate. VirtualTag/Script go
# before Equipment (VirtualTag.EquipmentId logical-FKs Equipment); the stub driver
# keeps no Tags so the Tag delete is just defensive for older overlays.
cur.execute("DELETE FROM dbo.VirtualTag WHERE VirtualTagId LIKE %s", (EQ_ID_PREFIX + "%",))
cur.execute("DELETE FROM dbo.Script WHERE ScriptId LIKE %s", (EQ_ID_PREFIX + "%",))
cur.execute("DELETE FROM dbo.Tag WHERE DriverInstanceId=%s", (EQ_DRIVER,))
cur.execute("DELETE FROM dbo.Equipment WHERE DriverInstanceId=%s", (EQ_DRIVER,))
cur.execute("DELETE FROM dbo.UnsLine WHERE UnsLineId LIKE 'nw-line-%'")
@@ -235,6 +267,9 @@ def cmd_populate_equipment(args):
"INSERT INTO dbo.Namespace (NamespaceRowId, NamespaceId, ClusterId, Kind, NamespaceUri, Enabled) "
"VALUES (NEWID(), %s, %s, 'Equipment', %s, 1)",
(EQ_NS, EQ_CLUSTER, doc.get("namespace", {}).get("namespaceUri", "urn:northwind:birmingham:uns")))
# Placeholder driver kept only to satisfy "Equipment namespace has a driver"; NO Tag
# binds to it — the company signals are VirtualTags (driverless). DriverType must be
# non-GalaxyMxGateway for DraftValidator to accept it in an Equipment-kind namespace.
cur.execute(
"INSERT INTO dbo.DriverInstance (DriverInstanceRowId, DriverInstanceId, ClusterId, NamespaceId, "
"Name, DriverType, Enabled, DriverConfig) VALUES (NEWID(), %s, %s, %s, 'Northwind UNS placeholder', 'Modbus', 1, '{}')",
@@ -247,7 +282,7 @@ def cmd_populate_equipment(args):
cur.execute("INSERT INTO dbo.UnsLine (UnsLineRowId, UnsLineId, UnsAreaId, Name) VALUES (NEWID(), %s, %s, %s)",
("nw-" + l["unsLineId"], "nw-" + l["unsAreaId"], l["name"]))
eq_n = tag_n = 0
eq_n = vt_n = 0
for e in u["equipment"]:
eq_id = "nw-" + e["equipmentId"]
eq_uuid = str(uuid.uuid5(uuid.NAMESPACE_URL, "otopcua-nw-eq/" + e["equipmentId"]))
@@ -259,30 +294,34 @@ def cmd_populate_equipment(args):
eq_n += 1
for t in e["tags"]:
dtype = _DTYPE_FIX.get(t["dataType"], t["dataType"])
access = _ACCESS.get(t["accessLevel"], "0")
folder = t.get("folderPath") or None
# Local NodeId == TagConfig.FullName; prefix with nw: so it never collides with the
# galaxy-mirror SystemPlatform NodeIds (which use the bare MXAccess ref).
full = "nw:" + t["source"]["fullTagReference"]
# TagId is capped at 64 chars; a short stable hash keeps it unique. Cleanup is by
# DriverInstanceId (not TagId), so no prefix scan is needed.
tag_id = "nweq-" + hashlib.sha1(
f"{e['equipmentId']}|{folder}|{t['name']}".encode()).hexdigest()[:20]
cfg = json.dumps({"FullName": full, "DataType": dtype})
# The galaxy-mirror MXAccess ref (e.g. TestMachine_001.TestDouble) is the upstream
# the VirtualTag mirrors. DependencyExtractor harvests the literal in ctx.GetTag(),
# so the engine subscribes to exactly this path on the galaxy-mirror driver.
full = t["source"]["fullTagReference"]
vt_id, sc_id = _eq_signal_ids(e["equipmentId"], folder, t["name"])
source_code = f'return ctx.GetTag("{full}").Value;'
source_hash = hashlib.sha256(source_code.encode()).hexdigest()
cur.execute(
"INSERT INTO dbo.Tag (TagRowId, TagId, DriverInstanceId, EquipmentId, Name, FolderPath, "
"DataType, AccessLevel, WriteIdempotent, TagConfig) VALUES (NEWID(), %s, %s, %s, %s, %s, %s, %s, 0, %s)",
(tag_id, EQ_DRIVER, eq_id, t["name"], folder, dtype, access, cfg))
tag_n += 1
"INSERT INTO dbo.Script (ScriptRowId, ScriptId, Name, SourceCode, SourceHash, Language) "
"VALUES (NEWID(), %s, %s, %s, %s, 'CSharp')",
(sc_id, t["name"], source_code, source_hash))
cur.execute(
"INSERT INTO dbo.VirtualTag (VirtualTagRowId, VirtualTagId, EquipmentId, Name, DataType, "
"ScriptId, ChangeTriggered, TimerIntervalMs, Historize, Enabled) "
"VALUES (NEWID(), %s, %s, %s, %s, %s, 1, NULL, %s, 1)",
(vt_id, eq_id, t["name"], dtype, sc_id, 1 if t.get("historize") else 0))
vt_n += 1
conn.commit()
conn.close()
print(f"populated equipment overlay: namespace {EQ_NS} ({EQ_CLUSTER}), "
f"{len(u['unsAreas'])} areas, {len(u['unsLines'])} lines, {eq_n} equipment, {tag_n} signals")
f"{len(u['unsAreas'])} areas, {len(u['unsLines'])} lines, {eq_n} equipment, "
f"{vt_n} VirtualTags (+ {vt_n} mirror Scripts)")
print()
print(f">>> NEXT: deploy (headless) — curl -s -X POST {args.deploy_url.replace('/deployments','')}/api/deployments "
f"-H 'X-Api-Key: {args.deploy_key}'")
print(">>> then run: otopcua_uns.py verify-equipment")
print(">>> then run: otopcua_uns.py verify-equipment --require-good 1036 --wait")
return 0
@@ -290,7 +329,10 @@ def cmd_clean(args):
conn, cur = connect(args.mssql)
cur.execute("DELETE FROM dbo.Tag WHERE TagId LIKE %s", (ID_PREFIX + "%",))
n = cur.rowcount
# Also drop the company-shape Equipment overlay (child rows first).
# Also drop the company-shape Equipment overlay (child rows first): VirtualTag and
# Script (both nweq-*) before Equipment, then any stub-driver Tag, then the rest.
cur.execute("DELETE FROM dbo.VirtualTag WHERE VirtualTagId LIKE %s", (EQ_ID_PREFIX + "%",))
cur.execute("DELETE FROM dbo.Script WHERE ScriptId LIKE %s", (EQ_ID_PREFIX + "%",))
cur.execute("DELETE FROM dbo.Tag WHERE DriverInstanceId=%s", (EQ_DRIVER,))
cur.execute("DELETE FROM dbo.Equipment WHERE DriverInstanceId=%s", (EQ_DRIVER,))
cur.execute("DELETE FROM dbo.UnsLine WHERE UnsLineId LIKE 'nw-line-%'")
@@ -398,16 +440,28 @@ def sample_values(endpoint, n):
return [("<browse error>", str(e), "?")]
def browse_tree(endpoint, max_depth=8, top_prefix=None):
def browse_tree(endpoint, max_depth=8, top_prefix=None, read_values=False):
"""Recursively descend the OtOpcUa address space and count leaf variables, returning
(folder_count, leaf_count, leaf_paths). A node with no children is a leaf signal — this
correctly handles the DEEP Equipment UNS tree (Area/Line/Equipment/[FolderPath]/Signal),
unlike browse_summary which assumes the flat 2-level Galaxy hierarchy. When top_prefix is
set, only top-level OtOpcUa folders whose browse name starts with it are counted (e.g.
'nw-area-' scopes to the company Equipment overlay, excluding the Galaxy mirror folders)."""
(folder_count, leaf_count, leaf_paths, good_count). A node with no children is a leaf
signal — this correctly handles the DEEP Equipment UNS tree
(Area/Line/Equipment/[FolderPath]/Signal), unlike browse_summary which assumes the flat
2-level Galaxy hierarchy. When top_prefix is set, only top-level OtOpcUa folders whose
browse name starts with it are counted (e.g. 'nw-area-' scopes to the company Equipment
overlay, excluding the Galaxy mirror folders). When read_values is True, each leaf's value
is read and good_count tallies the Good-quality ones (else good_count is 0)."""
import asyncio
from asyncua import Client
async def maybe_good(node, acc):
if not read_values:
return
try:
dv = await node.read_data_value()
if dv.StatusCode and dv.StatusCode.is_good():
acc["good"] += 1
except Exception:
pass
async def walk(node, path, depth, acc):
if depth >= max_depth:
return
@@ -424,9 +478,10 @@ def browse_tree(endpoint, max_depth=8, top_prefix=None):
else:
acc["leaves"] += 1
acc["paths"].append(child_path)
await maybe_good(ch, acc)
async def run():
acc = {"folders": 0, "leaves": 0, "paths": []}
acc = {"folders": 0, "leaves": 0, "paths": [], "good": 0}
async with Client(endpoint) as c:
for k in await c.nodes.objects.get_children():
if (await k.read_browse_name()).Name != "OtOpcUa":
@@ -441,32 +496,61 @@ def browse_tree(endpoint, max_depth=8, top_prefix=None):
else:
acc["leaves"] += 1
acc["paths"].append("OtOpcUa/" + tn)
return acc["folders"], acc["leaves"], acc["paths"]
await maybe_good(top, acc)
return acc["folders"], acc["leaves"], acc["paths"], acc["good"]
try:
return asyncio.run(run())
except Exception as e:
return (f"<{type(e).__name__}: {e}>", 0, [])
return (f"<{type(e).__name__}: {e}>", 0, [], 0)
def cmd_verify_equipment(args):
"""Browse the full UNS tree by friendly Area/Line/Equipment/Signal names and report the leaf
signal count. With --expect N, exit non-zero unless exactly N leaf signals are present (the
equipment-namespace structure-materialisation check)."""
equipment-namespace structure-materialisation check). With --require-good N (>0), also read
each leaf's value and require at least N Good ones (the live-VALUE check for the VirtualTag
overlay) — back-compat default 0 = structure-only. --wait polls so it can wait for the deploy
+ change-triggered VirtualTag evaluations to land."""
top_prefix = None if args.all else "nw-area-"
folders, leaves, paths = browse_tree(args.opcua_endpoint, top_prefix=top_prefix)
scope = "whole address space" if args.all else "company overlay (nw-area-*)"
print(f"equipment tree : {folders} folder(s), {leaves} leaf signal(s) on {args.opcua_endpoint} [{scope}]")
read_values = args.require_good > 0
deadline = time.time() + (args.wait_seconds if args.wait else 0)
while True:
folders, leaves, paths, good = browse_tree(
args.opcua_endpoint, top_prefix=top_prefix, read_values=read_values)
struct_ok = args.expect is None or leaves == args.expect
good_ok = good >= args.require_good
if (struct_ok and good_ok) or time.time() >= deadline:
break
print(f" waiting for deploy/values… ({leaves} leaves"
+ (f", {good} Good" if read_values else "") + ")")
time.sleep(5)
suffix = f", {good} Good value(s)" if read_values else ""
print(f"equipment tree : {folders} folder(s), {leaves} leaf signal(s){suffix} "
f"on {args.opcua_endpoint} [{scope}]")
for p in sorted(paths)[:args.show]:
print(f" {p}")
if len(paths) > args.show:
print(f" … and {len(paths) - args.show} more")
passed = True
if args.expect is not None:
passed = leaves == args.expect
print("VERIFY-EQUIPMENT:",
f"PASS ({leaves} == {args.expect})" if passed
struct_ok = leaves == args.expect
passed = passed and struct_ok
print(" structure :",
f"PASS ({leaves} == {args.expect})" if struct_ok
else f"FAIL (expected {args.expect}, found {leaves})")
return 0 if passed else 1
return 0
if args.require_good > 0:
good_ok = good >= args.require_good
passed = passed and good_ok
print(" live good :",
f"PASS ({good} >= {args.require_good})" if good_ok
else f"FAIL (expected >= {args.require_good} Good, found {good})")
if args.expect is None and args.require_good == 0:
return 0
print("VERIFY-EQUIPMENT:", "PASS" if passed else "FAIL")
return 0 if passed else 1
# ── arg parsing ─────────────────────────────────────────────────────────────
@@ -495,11 +579,17 @@ def main(argv):
vp.add_argument("--wait", action="store_true", help="poll until the deploy lands")
vp.add_argument("--wait-seconds", type=int, default=120)
ep = sub.add_parser("verify-equipment",
help="recursively browse the Equipment UNS tree + count leaf signals")
help="recursively browse the Equipment UNS tree + count leaf signals "
"(+ optionally assert live Good values)")
ep.add_argument("--expect", type=int, default=None, help="assert exactly N leaf signals")
ep.add_argument("--require-good", type=int, default=0,
help="read each leaf's value and require >= N Good ones (0 = structure-only, default)")
ep.add_argument("--show", type=int, default=20, help="how many leaf paths to print")
ep.add_argument("--all", action="store_true",
help="count the whole address space (default: only the nw-area-* company overlay)")
ep.add_argument("--wait", action="store_true",
help="poll until the deploy lands + (with --require-good) values go Good")
ep.add_argument("--wait-seconds", type=int, default=120)
a = p.parse_args(argv)
a.mssql = dict(host=a.sql_host, port=a.sql_port, user=a.sql_user,