#!/usr/bin/env python3 """ otopcua_uns.py — reloadable populate + verify for the OtOpcUa galaxy UNS. Recreates and verifies an OtOpcUa Unified-Namespace load grounded in the real AVEVA Galaxy "DEV" hierarchy (the 40 TestMachine instances). Designed to be re-run after the OtOpcUa docker-dev instance is rebuilt. Pipeline (see scadaproj/memory otopcua-uns-deploy-and-value-streaming): populate ──SQL──▶ live config tables (Tag rows, nw-* prefix) │ you click "Deploy current configuration" at :9200 │ ▼ driver applies ▶ materialises OtOpcUa// ▶ SubscribeBulk │ verify ──OPC UA──▶ browse + read live values on :4840 What it loads: one SystemPlatform Tag per (machine, signal) bound to the existing GalaxyMxGateway driver. Each tag's FolderPath is the Galaxy object and its Name the attribute, so the materialised variable NodeId is exactly the MXAccess ref the driver subscribes to — giving live values. Every row carries the `nw-` id prefix so `clean` can remove them without touching other config. Idempotent: populate upserts by TagId; re-running is a no-op when unchanged. Subcommands: generate Build the load plan from galaxy-hierarchy.json (writes load-plan.json) populate Upsert the SystemPlatform mirror Tag rows into the config DB verify Check DB rows present + live OPC UA values are Good on :4840 status Show config-DB + address-space state clean Delete all nw-* mirror Tag rows Deploy is a human-gated AdminUI action (no SQL/REST trigger exists); populate and clean print the reminder and `verify --wait` polls until it lands. Deps: pymssql, asyncua (see requirements.txt; use the bundled .venv). """ import argparse import hashlib import json import os import re import sys import time import uuid # ── config (overridable via env / flags) ─────────────────────────────────── DEF_MSSQL = dict( host=os.environ.get("OTOPCUA_SQL_HOST", "localhost"), port=int(os.environ.get("OTOPCUA_SQL_PORT", "14330")), user=os.environ.get("OTOPCUA_SQL_USER", "sa"), password=os.environ.get("OTOPCUA_SQL_PASSWORD", "OtOpcUa!Dev123"), database=os.environ.get("OTOPCUA_SQL_DB", "OtOpcUa"), ) DEF_OPCUA = os.environ.get("OTOPCUA_OPCUA_ENDPOINT", "opc.tcp://localhost:4840") DEF_DRIVER = os.environ.get("OTOPCUA_GALAXY_DRIVER", "MAIN-galaxy-mxgw") DEF_GALAXY_JSON = os.environ.get( "OTOPCUA_GALAXY_JSON", os.path.join(os.path.dirname(__file__), "..", "galaxy-hierarchy.json"), ) DEF_COMPANY_JSON = os.environ.get( "OTOPCUA_COMPANY_JSON", os.path.join(os.path.dirname(__file__), "..", "company-uns.json"), ) ID_PREFIX = "nw-mirror-" # SystemPlatform galaxy-mirror TagId prefix LOAD_PLAN = os.path.join(os.path.dirname(__file__), "load-plan.json") # Equipment-overlay (company-shape) object ids — all carry the nw- prefix so # `clean` can remove them. The Equipment namespace is a SECOND namespace loaded # alongside the galaxy mirror; its leaves stay BadWaitingForInitialData until the # value milestone (scope doc WS-3) wires a driver/VirtualTag source. EQ_CLUSTER = os.environ.get("OTOPCUA_EQ_CLUSTER", "MAIN") EQ_NS = "nw-uns" EQ_DRIVER = "nw-uns-modbus" # non-Galaxy FK driver (structure-only; doesn't stream) # galaxy dataTypeName / gen_uns dtype → valid OtOpcUa DriverDataType _DTYPE_FIX = {"Double": "Float64", "Float": "Float32"} _ACCESS = {"ReadOnly": "0", "Read": "0", "ReadWrite": "1"} # ── the value signals we mirror, per $TestMachine instance ────────────────── # (galaxy attribute name, OtOpcUa DriverDataType, access '0'=Read/'1'=ReadWrite) SIGNALS = [ ("TestChangingInt", "Int32", "0"), ("TestHistoryValue", "Int32", "0"), ("TestDouble", "Float64", "0"), ("TestFloat", "Float32", "0"), ("TestDuration", "Float64", "0"), ("TestDateTime", "DateTime", "0"), ("ProtectedValue", "Boolean", "1"), ("ProtectedValue1", "Boolean", "1"), ("TestAlarm001", "Boolean", "0"), ("TestAlarm002", "Boolean", "0"), ("TestAlarm003", "Boolean", "0"), ("InAlarm", "Boolean", "0"), ] # ── plan generation (grounded in the real galaxy) ─────────────────────────── def build_plan(galaxy_json, driver): with open(galaxy_json) as f: gal = json.load(f) machines = [ o for o in gal["objects"] if "$TestMachine" in o.get("templateChain", []) ] machines.sort(key=lambda o: o["tagName"]) rows = [] for m in machines: have = {a["attributeName"] for a in m["attributes"]} for attr, dtype, access in SIGNALS: if attr not in have: continue # only mirror attributes this instance really has rows.append({ "tag_id": f"{ID_PREFIX}{m['tagName']}-{attr}".lower(), "driver_instance_id": driver, "name": attr, "folder_path": m["tagName"], # → folder; ref = folder.name "data_type": dtype, "access_level": access, "mxaccess_ref": f"{m['tagName']}.{attr}", }) return { "source": galaxy_json, "driver_instance_id": driver, "machines": len(machines), "tags": len(rows), "rows": rows, } # ── DB helpers ────────────────────────────────────────────────────────────── def connect(cfg): import pymssql conn = pymssql.connect( server=cfg["host"], port=str(cfg["port"]), user=cfg["user"], password=cfg["password"], database=cfg["database"], autocommit=False, ) cur = conn.cursor() # The Tag table has filtered indexes / computed columns; writes require this. cur.execute("SET QUOTED_IDENTIFIER ON; SET ANSI_NULLS ON;") return conn, cur def driver_exists(cur, driver): cur.execute( "SELECT n.Kind FROM dbo.DriverInstance d " "JOIN dbo.Namespace n ON n.NamespaceId = d.NamespaceId " "WHERE d.DriverInstanceId = %s", (driver,)) r = cur.fetchone() return r[0] if r else None # ── commands ──────────────────────────────────────────────────────────────── def cmd_generate(args): plan = build_plan(args.galaxy_json, args.driver) with open(LOAD_PLAN, "w") as f: json.dump(plan, f, indent=2) print(f"plan: {plan['machines']} machines → {plan['tags']} mirror tags (driver {plan['driver_instance_id']})") print(f"wrote {LOAD_PLAN}") return 0 def cmd_populate(args): plan = build_plan(args.galaxy_json, args.driver) conn, cur = connect(args.mssql) kind = driver_exists(cur, args.driver) if kind is None: print(f"ERROR: driver instance '{args.driver}' not found in config DB.", file=sys.stderr) return 2 if kind != "SystemPlatform": print(f"ERROR: driver '{args.driver}' is in a {kind} namespace; the galaxy mirror needs a " f"SystemPlatform/GalaxyMxGateway driver.", file=sys.stderr) return 2 inserted = updated = 0 for r in plan["rows"]: # Upsert by the SystemPlatform natural key (DriverInstanceId, FolderPath, Name) # — the UX_Tag_FolderPath unique index. This adopts any pre-existing seed row for # the same ref into our nw-* set (so `clean` can remove it) and stays idempotent on # re-run. RowId/RowVersion are server-managed. cur.execute( "SELECT TagId FROM dbo.Tag WHERE DriverInstanceId=%s AND FolderPath=%s " "AND Name=%s AND EquipmentId IS NULL", (r["driver_instance_id"], r["folder_path"], r["name"])) if cur.fetchone(): cur.execute( "UPDATE dbo.Tag SET TagId=%s, DataType=%s, AccessLevel=%s, " "WriteIdempotent=0, TagConfig='{}' " "WHERE DriverInstanceId=%s AND FolderPath=%s AND Name=%s AND EquipmentId IS NULL", (r["tag_id"], r["data_type"], r["access_level"], r["driver_instance_id"], r["folder_path"], r["name"])) updated += 1 else: cur.execute( "INSERT INTO dbo.Tag (TagRowId, TagId, DriverInstanceId, EquipmentId, " "Name, FolderPath, DataType, AccessLevel, WriteIdempotent, TagConfig) " "VALUES (NEWID(), %s, %s, NULL, %s, %s, %s, %s, 0, '{}')", (r["tag_id"], r["driver_instance_id"], r["name"], r["folder_path"], r["data_type"], r["access_level"])) inserted += 1 conn.commit() conn.close() print(f"populated: {inserted} inserted, {updated} updated " f"({plan['tags']} mirror tags across {plan['machines']} machines)") print() print(">>> NEXT: open the AdminUI, sign in, and click " "'Deploy current configuration' to seal + serve the load:") print(f" {args.deploy_url}") print(">>> then run: otopcua_uns.py verify --wait") return 0 def cmd_populate_equipment(args): """Load the company-shape Equipment namespace from company-uns.json: a second (Equipment-kind) namespace alongside the galaxy mirror, with the Northwind Area/Line/Equipment/Signal tree. Structure-only — leaves materialise as BadWaitingForInitialData (the value milestone is separate). Idempotent: drop-and-recreate of the nw- overlay rows.""" with open(args.company_json) as f: doc = json.load(f) u = doc["uns"] conn, cur = connect(args.mssql) # Drop any prior overlay (child rows first), then recreate. cur.execute("DELETE FROM dbo.Tag WHERE DriverInstanceId=%s", (EQ_DRIVER,)) cur.execute("DELETE FROM dbo.Equipment WHERE DriverInstanceId=%s", (EQ_DRIVER,)) cur.execute("DELETE FROM dbo.UnsLine WHERE UnsLineId LIKE 'nw-line-%'") cur.execute("DELETE FROM dbo.UnsArea WHERE UnsAreaId LIKE 'nw-area-%'") cur.execute("DELETE FROM dbo.DriverInstance WHERE DriverInstanceId=%s", (EQ_DRIVER,)) cur.execute("DELETE FROM dbo.Namespace WHERE NamespaceId=%s", (EQ_NS,)) cur.execute( "INSERT INTO dbo.Namespace (NamespaceRowId, NamespaceId, ClusterId, Kind, NamespaceUri, Enabled) " "VALUES (NEWID(), %s, %s, 'Equipment', %s, 1)", (EQ_NS, EQ_CLUSTER, doc.get("namespace", {}).get("namespaceUri", "urn:northwind:birmingham:uns"))) cur.execute( "INSERT INTO dbo.DriverInstance (DriverInstanceRowId, DriverInstanceId, ClusterId, NamespaceId, " "Name, DriverType, Enabled, DriverConfig) VALUES (NEWID(), %s, %s, %s, 'Northwind UNS placeholder', 'Modbus', 1, '{}')", (EQ_DRIVER, EQ_CLUSTER, EQ_NS)) for a in u["unsAreas"]: cur.execute("INSERT INTO dbo.UnsArea (UnsAreaRowId, UnsAreaId, ClusterId, Name) VALUES (NEWID(), %s, %s, %s)", ("nw-" + a["unsAreaId"], EQ_CLUSTER, a["name"])) for l in u["unsLines"]: cur.execute("INSERT INTO dbo.UnsLine (UnsLineRowId, UnsLineId, UnsAreaId, Name) VALUES (NEWID(), %s, %s, %s)", ("nw-" + l["unsLineId"], "nw-" + l["unsAreaId"], l["name"])) eq_n = tag_n = 0 for e in u["equipment"]: eq_id = "nw-" + e["equipmentId"] eq_uuid = str(uuid.uuid5(uuid.NAMESPACE_URL, "otopcua-nw-eq/" + e["equipmentId"])) cur.execute( "INSERT INTO dbo.Equipment (EquipmentRowId, EquipmentId, EquipmentUuid, DriverInstanceId, UnsLineId, " "Name, MachineCode, Manufacturer, Model, Enabled) VALUES (NEWID(), %s, %s, %s, %s, %s, %s, %s, %s, 1)", (eq_id, eq_uuid, EQ_DRIVER, "nw-" + e["unsLineId"], e["name"], e["machineCode"], e.get("manufacturer"), e.get("model"))) eq_n += 1 for t in e["tags"]: dtype = _DTYPE_FIX.get(t["dataType"], t["dataType"]) access = _ACCESS.get(t["accessLevel"], "0") folder = t.get("folderPath") or None # Local NodeId == TagConfig.FullName; prefix with nw: so it never collides with the # galaxy-mirror SystemPlatform NodeIds (which use the bare MXAccess ref). full = "nw:" + t["source"]["fullTagReference"] # TagId is capped at 64 chars; a short stable hash keeps it unique. Cleanup is by # DriverInstanceId (not TagId), so no prefix scan is needed. tag_id = "nweq-" + hashlib.sha1( f"{e['equipmentId']}|{folder}|{t['name']}".encode()).hexdigest()[:20] cfg = json.dumps({"FullName": full, "DataType": dtype}) cur.execute( "INSERT INTO dbo.Tag (TagRowId, TagId, DriverInstanceId, EquipmentId, Name, FolderPath, " "DataType, AccessLevel, WriteIdempotent, TagConfig) VALUES (NEWID(), %s, %s, %s, %s, %s, %s, %s, 0, %s)", (tag_id, EQ_DRIVER, eq_id, t["name"], folder, dtype, access, cfg)) tag_n += 1 conn.commit() conn.close() print(f"populated equipment overlay: namespace {EQ_NS} ({EQ_CLUSTER}), " f"{len(u['unsAreas'])} areas, {len(u['unsLines'])} lines, {eq_n} equipment, {tag_n} signals") print() print(f">>> NEXT: deploy (headless) — curl -s -X POST {args.deploy_url.replace('/deployments','')}/api/deployments " f"-H 'X-Api-Key: {args.deploy_key}'") print(">>> then run: otopcua_uns.py verify-equipment") return 0 def cmd_clean(args): conn, cur = connect(args.mssql) cur.execute("DELETE FROM dbo.Tag WHERE TagId LIKE %s", (ID_PREFIX + "%",)) n = cur.rowcount # Also drop the company-shape Equipment overlay (child rows first). cur.execute("DELETE FROM dbo.Tag WHERE DriverInstanceId=%s", (EQ_DRIVER,)) cur.execute("DELETE FROM dbo.Equipment WHERE DriverInstanceId=%s", (EQ_DRIVER,)) cur.execute("DELETE FROM dbo.UnsLine WHERE UnsLineId LIKE 'nw-line-%'") cur.execute("DELETE FROM dbo.UnsArea WHERE UnsAreaId LIKE 'nw-area-%'") cur.execute("DELETE FROM dbo.DriverInstance WHERE DriverInstanceId=%s", (EQ_DRIVER,)) cur.execute("DELETE FROM dbo.Namespace WHERE NamespaceId=%s", (EQ_NS,)) conn.commit() conn.close() print(f"removed {n} nw-* mirror tag(s) + the {EQ_NS} equipment overlay. " f"Deploy again at {args.deploy_url} to drop them from the address space.") return 0 def cmd_status(args): conn, cur = connect(args.mssql) cur.execute("SELECT COUNT(*) FROM dbo.Tag WHERE TagId LIKE %s", (ID_PREFIX + "%",)) db_tags = cur.fetchone()[0] # Deployment.Status: 2 = Sealed (the snapshot driver nodes apply). cur.execute("SELECT TOP 1 RevisionHash, SealedAtUtc FROM dbo.Deployment " "WHERE Status=2 ORDER BY SealedAtUtc DESC") dep = cur.fetchone() conn.close() print(f"config DB : {db_tags} mirror tags (nw-*) present") print(f"last sealed : {('rev '+dep[0][:12]+'… @ '+str(dep[1])) if dep else '(none)'}") folders, variables, good = browse_summary(args.opcua_endpoint) print(f"address space : {folders} machine folder(s), {variables} variable(s), {good} value(s) Good on {args.opcua_endpoint}") return 0 def cmd_verify(args): plan = build_plan(args.galaxy_json, args.driver) expected = plan["tags"] deadline = time.time() + (args.wait_seconds if args.wait else 0) while True: folders, variables, good = browse_summary(args.opcua_endpoint) ok = variables >= expected and good >= max(1, int(expected * 0.5)) if ok or time.time() >= deadline: break print(f" waiting for deploy… ({variables}/{expected} vars, {good} Good)") time.sleep(5) print(f"expected mirror tags : {expected}") print(f"address-space vars : {variables} (in {folders} folders)") print(f"values Good (live) : {good}") sample = sample_values(args.opcua_endpoint, 6) for nm, val, sc in sample: print(f" {nm} = {val} [{sc}]") passed = variables >= expected and good >= max(1, int(expected * 0.5)) print("VERIFY:", "PASS — UNS loaded and live" if passed else "INCOMPLETE — did you Deploy at the AdminUI?") return 0 if passed else 1 # ── OPC UA helpers (asyncua) ──────────────────────────────────────────────── def browse_summary(endpoint): import asyncio from asyncua import Client async def run(): folders = variables = good = 0 async with Client(endpoint) as c: for k in await c.nodes.objects.get_children(): if (await k.read_browse_name()).Name != "OtOpcUa": continue for f in await k.get_children(): folders += 1 for v in await f.get_children(): variables += 1 try: dv = await v.read_data_value() if dv.StatusCode and dv.StatusCode.is_good(): good += 1 except Exception: pass return folders, variables, good try: return asyncio.run(run()) except Exception as e: return (f"<{type(e).__name__}>", 0, 0) def sample_values(endpoint, n): import asyncio from asyncua import Client async def run(): out = [] async with Client(endpoint) as c: for k in await c.nodes.objects.get_children(): if (await k.read_browse_name()).Name != "OtOpcUa": continue for f in await k.get_children(): for v in await f.get_children(): try: dv = await v.read_data_value() sc = dv.StatusCode.name if dv.StatusCode else "?" out.append(((await v.read_browse_name()).Name, dv.Value.Value, sc)) except Exception as e: out.append(((await v.read_browse_name()).Name, f"<{type(e).__name__}>", "?")) if len(out) >= n: return out return out try: return asyncio.run(run()) except Exception as e: return [("", str(e), "?")] def browse_tree(endpoint, max_depth=8, top_prefix=None): """Recursively descend the OtOpcUa address space and count leaf variables, returning (folder_count, leaf_count, leaf_paths). A node with no children is a leaf signal — this correctly handles the DEEP Equipment UNS tree (Area/Line/Equipment/[FolderPath]/Signal), unlike browse_summary which assumes the flat 2-level Galaxy hierarchy. When top_prefix is set, only top-level OtOpcUa folders whose browse name starts with it are counted (e.g. 'nw-area-' scopes to the company Equipment overlay, excluding the Galaxy mirror folders).""" import asyncio from asyncua import Client async def walk(node, path, depth, acc): if depth >= max_depth: return for ch in await node.get_children(): try: name = (await ch.read_browse_name()).Name except Exception: continue child_path = path + "/" + name grandkids = await ch.get_children() if grandkids: acc["folders"] += 1 await walk(ch, child_path, depth + 1, acc) else: acc["leaves"] += 1 acc["paths"].append(child_path) async def run(): acc = {"folders": 0, "leaves": 0, "paths": []} async with Client(endpoint) as c: for k in await c.nodes.objects.get_children(): if (await k.read_browse_name()).Name != "OtOpcUa": continue for top in await k.get_children(): tn = (await top.read_browse_name()).Name if top_prefix and not tn.startswith(top_prefix): continue if await top.get_children(): acc["folders"] += 1 await walk(top, "OtOpcUa/" + tn, 1, acc) else: acc["leaves"] += 1 acc["paths"].append("OtOpcUa/" + tn) return acc["folders"], acc["leaves"], acc["paths"] try: return asyncio.run(run()) except Exception as e: return (f"<{type(e).__name__}: {e}>", 0, []) def cmd_verify_equipment(args): """Browse the full UNS tree by friendly Area/Line/Equipment/Signal names and report the leaf signal count. With --expect N, exit non-zero unless exactly N leaf signals are present (the equipment-namespace structure-materialisation check).""" top_prefix = None if args.all else "nw-area-" folders, leaves, paths = browse_tree(args.opcua_endpoint, top_prefix=top_prefix) scope = "whole address space" if args.all else "company overlay (nw-area-*)" print(f"equipment tree : {folders} folder(s), {leaves} leaf signal(s) on {args.opcua_endpoint} [{scope}]") for p in sorted(paths)[:args.show]: print(f" {p}") if len(paths) > args.show: print(f" … and {len(paths) - args.show} more") if args.expect is not None: passed = leaves == args.expect print("VERIFY-EQUIPMENT:", f"PASS ({leaves} == {args.expect})" if passed else f"FAIL (expected {args.expect}, found {leaves})") return 0 if passed else 1 return 0 # ── arg parsing ───────────────────────────────────────────────────────────── def main(argv): p = argparse.ArgumentParser(description="Reloadable populate + verify for the OtOpcUa galaxy UNS.") p.add_argument("--galaxy-json", default=DEF_GALAXY_JSON) p.add_argument("--driver", default=DEF_DRIVER, help="SystemPlatform GalaxyMxGateway driver instance id") p.add_argument("--opcua-endpoint", default=DEF_OPCUA) p.add_argument("--deploy-url", default="http://localhost:9200/deployments") p.add_argument("--deploy-key", default=os.environ.get("OTOPCUA_DEPLOY_KEY", "docker-dev-deploy-key"), help="X-Api-Key for the headless POST /api/deployments endpoint") p.add_argument("--company-json", default=DEF_COMPANY_JSON) p.add_argument("--sql-host", default=DEF_MSSQL["host"]) p.add_argument("--sql-port", type=int, default=DEF_MSSQL["port"]) p.add_argument("--sql-user", default=DEF_MSSQL["user"]) p.add_argument("--sql-password", default=DEF_MSSQL["password"]) p.add_argument("--sql-db", default=DEF_MSSQL["database"]) sub = p.add_subparsers(dest="cmd", required=True) sub.add_parser("generate") sub.add_parser("populate") sub.add_parser("populate-equipment", help="load the company-shape Equipment namespace from company-uns.json (structure-only)") sub.add_parser("clean") sub.add_parser("status") vp = sub.add_parser("verify") vp.add_argument("--wait", action="store_true", help="poll until the deploy lands") vp.add_argument("--wait-seconds", type=int, default=120) ep = sub.add_parser("verify-equipment", help="recursively browse the Equipment UNS tree + count leaf signals") ep.add_argument("--expect", type=int, default=None, help="assert exactly N leaf signals") ep.add_argument("--show", type=int, default=20, help="how many leaf paths to print") ep.add_argument("--all", action="store_true", help="count the whole address space (default: only the nw-area-* company overlay)") a = p.parse_args(argv) a.mssql = dict(host=a.sql_host, port=a.sql_port, user=a.sql_user, password=a.sql_password, database=a.sql_db) return { "generate": cmd_generate, "populate": cmd_populate, "populate-equipment": cmd_populate_equipment, "clean": cmd_clean, "status": cmd_status, "verify": cmd_verify, "verify-equipment": cmd_verify_equipment, }[a.cmd](a) if __name__ == "__main__": sys.exit(main(sys.argv[1:]))