Files
scadaproj/otopcua-uns-loader/otopcua_uns.py
T
Joseph Doherty eb26bf3248 Add Galaxy UNS artifacts + reloadable OtOpcUa loader tool
galaxy-hierarchy.json: full AVEVA Galaxy DEV hierarchy pulled live via the
MxGateway .NET client (129 objects, 14k attrs). company-uns.json/.tree.txt +
gen_uns.py: a fake-company (Northwind) ISA-95 UNS modeled on OtOpcUa's
Cluster->Namespace->Area->Line->Equipment->Tag schema, grounded in the 40
TestMachine instances. otopcua-uns-loader/: reloadable generate/populate/verify/
clean tool that recreates + verifies the galaxy mirror (396 live tags across 40
machines) in OtOpcUa's config DB after a rebuild.
2026-06-06 14:22:25 -04:00

333 lines
14 KiB
Python

#!/usr/bin/env python3
"""
otopcua_uns.py — reloadable populate + verify for the OtOpcUa galaxy UNS.
Recreates and verifies an OtOpcUa Unified-Namespace load grounded in the real
AVEVA Galaxy "DEV" hierarchy (the 40 TestMachine instances). Designed to be
re-run after the OtOpcUa docker-dev instance is rebuilt.
Pipeline (see scadaproj/memory otopcua-uns-deploy-and-value-streaming):

    populate ──SQL──▶ live config tables (Tag rows, nw-* prefix)
        you click "Deploy current configuration" at :9200
        driver applies ▶ materialises OtOpcUa/<machine>/<signal> ▶ SubscribeBulk
    verify ──OPC UA──▶ browse + read live values on :4840

What it loads: one SystemPlatform Tag per (machine, signal) bound to the
existing GalaxyMxGateway driver. Each tag's FolderPath is the Galaxy object and
its Name the attribute, so the materialised variable NodeId is exactly the
MXAccess ref the driver subscribes to — giving live values. Every row carries
the `nw-mirror-` TagId prefix so `clean` can remove them without touching other config.
Idempotent: populate upserts by the natural key (DriverInstanceId, FolderPath, Name); re-running leaves the rows unchanged.

Subcommands:
    generate   Build the load plan from galaxy-hierarchy.json (writes load-plan.json)
    populate   Upsert the SystemPlatform mirror Tag rows into the config DB
    verify     Check the expected variables are in the address space and their live OPC UA values are Good on :4840
    status     Show config-DB + address-space state
    clean      Delete all nw-mirror-* mirror Tag rows

Deploy is a human-gated AdminUI action (no SQL/REST trigger exists); populate
and clean print the reminder and `verify --wait` polls until it lands.
Deps: pymssql, asyncua (see requirements.txt; use the bundled .venv).
"""
import argparse
import json
import os
import re
import sys
import time
# ── config (overridable via env / flags) ───────────────────────────────────
# Config-DB connection defaults. Each value falls back to a built-in default
# when its OTOPCUA_SQL_* environment variable is unset.
# NOTE(review): the fallback password is a hard-coded credential — presumably
# only meaningful for the docker-dev instance; confirm before reusing elsewhere.
DEF_MSSQL = {
    "host": os.getenv("OTOPCUA_SQL_HOST", "localhost"),
    "port": int(os.getenv("OTOPCUA_SQL_PORT", "14330")),
    "user": os.getenv("OTOPCUA_SQL_USER", "sa"),
    "password": os.getenv("OTOPCUA_SQL_PASSWORD", "OtOpcUa!Dev123"),
    "database": os.getenv("OTOPCUA_SQL_DB", "OtOpcUa"),
}

# OPC UA endpoint that is browsed when checking the address space.
DEF_OPCUA = os.getenv("OTOPCUA_OPCUA_ENDPOINT", "opc.tcp://localhost:4840")

# DriverInstanceId the mirror Tag rows are bound to.
DEF_DRIVER = os.getenv("OTOPCUA_GALAXY_DRIVER", "MAIN-galaxy-mxgw")

# Galaxy hierarchy export; by default one directory above this script.
DEF_GALAXY_JSON = os.getenv(
    "OTOPCUA_GALAXY_JSON",
    os.path.join(os.path.dirname(__file__), "..", "galaxy-hierarchy.json"),
)

# All rows we own carry this TagId prefix.
ID_PREFIX = "nw-mirror-"

# Where the generated load plan is written, next to this script.
LOAD_PLAN = os.path.join(os.path.dirname(__file__), "load-plan.json")
# ── the value signals we mirror, per $TestMachine instance ──────────────────
# (galaxy attribute name, OtOpcUa DriverDataType, access '0'=Read/'1'=ReadWrite)
# Access codes used in the third column of SIGNALS.
_READ_ONLY = "0"
_READ_WRITE = "1"

# One entry per mirrored attribute:
# (galaxy attribute name, OtOpcUa DriverDataType, access code).
SIGNALS = [
    ("TestChangingInt", "Int32", _READ_ONLY),
    ("TestHistoryValue", "Int32", _READ_ONLY),
    ("TestDouble", "Float64", _READ_ONLY),
    ("TestFloat", "Float32", _READ_ONLY),
    ("TestDuration", "Float64", _READ_ONLY),
    ("TestDateTime", "DateTime", _READ_ONLY),
    ("ProtectedValue", "Boolean", _READ_WRITE),
    ("ProtectedValue1", "Boolean", _READ_WRITE),
    ("TestAlarm001", "Boolean", _READ_ONLY),
    ("TestAlarm002", "Boolean", _READ_ONLY),
    ("TestAlarm003", "Boolean", _READ_ONLY),
    ("InAlarm", "Boolean", _READ_ONLY),
]
# ── plan generation (grounded in the real galaxy) ───────────────────────────
def build_plan(galaxy_json, driver, *, template="$TestMachine", signals=None,
               id_prefix=None):
    """Build the mirror-tag load plan from a galaxy hierarchy export.

    Every object in the export whose ``templateChain`` contains ``template``
    counts as a machine. For each machine, one row is produced per entry of
    ``signals`` whose attribute name the machine actually lists.

    Args:
        galaxy_json: Path to the hierarchy JSON. It must hold an ``objects``
            list; each machine needs a ``tagName`` and may carry an
            ``attributes`` list of dicts with an ``attributeName`` key.
        driver: DriverInstanceId copied onto every row.
        template: Template name that marks an object as a machine.
        signals: Iterable of ``(attribute, data_type, access)`` tuples.
            Defaults to the module-level ``SIGNALS``.
        id_prefix: Prefix for generated tag ids. Defaults to the module-level
            ``ID_PREFIX``.

    Returns:
        A dict with ``source``, ``driver_instance_id``, ``machines`` (count),
        ``tags`` (row count) and ``rows`` (list of row dicts, ordered by
        machine ``tagName`` and then by ``signals`` order).

    Raises:
        OSError: The file cannot be opened.
        json.JSONDecodeError: The file is not valid JSON.
        KeyError: ``objects`` or a machine's ``tagName`` is missing.
    """
    # Resolve the module defaults lazily so explicit arguments never touch them.
    signals = tuple(SIGNALS if signals is None else signals)
    if id_prefix is None:
        id_prefix = ID_PREFIX

    # Decode explicitly instead of relying on the platform default encoding.
    # utf-8-sig reads plain UTF-8 and also strips a leading BOM, which
    # json.load would otherwise reject.
    with open(galaxy_json, encoding="utf-8-sig") as f:
        gal = json.load(f)

    # A missing or null templateChain simply means "not a machine".
    machines = sorted(
        (o for o in gal["objects"] if template in (o.get("templateChain") or [])),
        key=lambda o: o["tagName"],
    )

    rows = []
    for m in machines:
        tag_name = m["tagName"]
        # A machine without an attribute list contributes no rows.
        have = {a["attributeName"] for a in (m.get("attributes") or [])}
        for attr, dtype, access in signals:
            if attr not in have:
                continue  # only mirror attributes this instance really has
            rows.append({
                "tag_id": f"{id_prefix}{tag_name}-{attr}".lower(),
                "driver_instance_id": driver,
                "name": attr,
                "folder_path": tag_name,  # → folder; ref = folder.name
                "data_type": dtype,
                "access_level": access,
                "mxaccess_ref": f"{tag_name}.{attr}",
            })

    return {
        "source": galaxy_json,
        "driver_instance_id": driver,
        "machines": len(machines),
        "tags": len(rows),
        "rows": rows,
    }
# ── DB helpers ──────────────────────────────────────────────────────────────
def connect(cfg):
    """Open the config DB and return ``(connection, cursor)``.

    ``cfg`` supplies ``host``, ``port``, ``user``, ``password`` and
    ``database``. Autocommit is switched off, so the caller decides when to
    commit.
    """
    import pymssql

    settings = {
        "server": cfg["host"],
        "port": str(cfg["port"]),
        "user": cfg["user"],
        "password": cfg["password"],
        "database": cfg["database"],
        "autocommit": False,
    }
    connection = pymssql.connect(**settings)
    cursor = connection.cursor()
    # The Tag table has filtered indexes / computed columns; writes require this.
    cursor.execute("SET QUOTED_IDENTIFIER ON; SET ANSI_NULLS ON;")
    return connection, cursor
def driver_exists(cur, driver):
    """Look up the namespace kind of a driver instance.

    Runs one query joining ``dbo.DriverInstance`` to ``dbo.Namespace`` and
    returns the ``Kind`` column of the first row, or ``None`` when no row
    matches ``driver``.
    """
    query = (
        "SELECT n.Kind FROM dbo.DriverInstance d "
        "JOIN dbo.Namespace n ON n.NamespaceId = d.NamespaceId "
        "WHERE d.DriverInstanceId = %s"
    )
    cur.execute(query, (driver,))
    row = cur.fetchone()
    if not row:
        return None
    return row[0]
# ── commands ────────────────────────────────────────────────────────────────
def cmd_generate(args):
    """``generate`` subcommand: build the load plan and save it to LOAD_PLAN.

    Reads ``args.galaxy_json`` and ``args.driver``, writes the plan as
    indented JSON, prints a short summary and returns exit code 0.
    """
    load_plan = build_plan(args.galaxy_json, args.driver)

    with open(LOAD_PLAN, "w") as out_file:
        json.dump(load_plan, out_file, indent=2)

    summary = (
        f"plan: {load_plan['machines']} machines → {load_plan['tags']} mirror tags "
        f"(driver {load_plan['driver_instance_id']})"
    )
    print(summary)
    print(f"wrote {LOAD_PLAN}")
    return 0
def _upsert_mirror_tag(cur, row):
    """Insert or update one plan row; return True when a new row was inserted.

    Upserts by the SystemPlatform natural key (DriverInstanceId, FolderPath, Name)
    — the UX_Tag_FolderPath unique index. This adopts any pre-existing seed row for
    the same ref into our nw-* set (so `clean` can remove it) and stays idempotent on
    re-run. RowId/RowVersion are server-managed.
    """
    natural_key = (row["driver_instance_id"], row["folder_path"], row["name"])
    cur.execute(
        "SELECT TagId FROM dbo.Tag WHERE DriverInstanceId=%s AND FolderPath=%s "
        "AND Name=%s AND EquipmentId IS NULL",
        natural_key)
    if cur.fetchone():
        cur.execute(
            "UPDATE dbo.Tag SET TagId=%s, DataType=%s, AccessLevel=%s, "
            "WriteIdempotent=0, TagConfig='{}' "
            "WHERE DriverInstanceId=%s AND FolderPath=%s AND Name=%s AND EquipmentId IS NULL",
            (row["tag_id"], row["data_type"], row["access_level"], *natural_key))
        return False
    cur.execute(
        "INSERT INTO dbo.Tag (TagRowId, TagId, DriverInstanceId, EquipmentId, "
        "Name, FolderPath, DataType, AccessLevel, WriteIdempotent, TagConfig) "
        "VALUES (NEWID(), %s, %s, NULL, %s, %s, %s, %s, 0, '{}')",
        (row["tag_id"], row["driver_instance_id"], row["name"], row["folder_path"],
         row["data_type"], row["access_level"]))
    return True


def cmd_populate(args):
    """``populate`` subcommand: upsert the mirror Tag rows into the config DB.

    Builds the plan from ``args.galaxy_json`` / ``args.driver``, checks that
    the driver instance exists in a SystemPlatform namespace, then upserts
    every plan row and commits once at the end.

    Returns:
        0 on success; 2 when the driver instance is missing or belongs to a
        namespace of another kind (nothing is written in that case).
    """
    plan = build_plan(args.galaxy_json, args.driver)
    conn, cur = connect(args.mssql)
    # The connection is closed on every path, including the early error
    # returns and any exception. The single commit sits after the loop, so
    # nothing is committed unless every row succeeded.
    try:
        kind = driver_exists(cur, args.driver)
        if kind is None:
            print(f"ERROR: driver instance '{args.driver}' not found in config DB.", file=sys.stderr)
            return 2
        if kind != "SystemPlatform":
            print(f"ERROR: driver '{args.driver}' is in a {kind} namespace; the galaxy mirror needs a "
                  f"SystemPlatform/GalaxyMxGateway driver.", file=sys.stderr)
            return 2
        inserted = updated = 0
        for r in plan["rows"]:
            if _upsert_mirror_tag(cur, r):
                inserted += 1
            else:
                updated += 1
        conn.commit()
    finally:
        conn.close()
    print(f"populated: {inserted} inserted, {updated} updated "
          f"({plan['tags']} mirror tags across {plan['machines']} machines)")
    print()
    # Deploy is a manual AdminUI step, so remind the operator what comes next.
    print(">>> NEXT: open the AdminUI, sign in, and click "
          "'Deploy current configuration' to seal + serve the load:")
    print(f" {args.deploy_url}")
    print(">>> then run: otopcua_uns.py verify --wait")
    return 0
def cmd_clean(args):
    """``clean`` subcommand: delete every Tag row whose TagId starts with ID_PREFIX.

    Commits the delete, prints how many rows were removed together with a
    reminder to deploy again, and returns exit code 0.
    """
    conn, cur = connect(args.mssql)
    # Close the connection even when the DELETE or the commit raises.
    try:
        cur.execute("DELETE FROM dbo.Tag WHERE TagId LIKE %s", (ID_PREFIX + "%",))
        n = cur.rowcount
        conn.commit()
    finally:
        conn.close()
    print(f"removed {n} nw-* mirror tag(s). Deploy again at {args.deploy_url} to drop them from the address space.")
    return 0
def cmd_status(args):
    """``status`` subcommand: print config-DB and address-space state.

    Reports the number of mirror Tag rows, the most recently sealed
    deployment (if any) and the folder / variable / Good-value counts browsed
    from ``args.opcua_endpoint``. Always returns exit code 0.
    """
    conn, cur = connect(args.mssql)
    # Close the connection even when one of the queries raises.
    try:
        cur.execute("SELECT COUNT(*) FROM dbo.Tag WHERE TagId LIKE %s", (ID_PREFIX + "%",))
        db_tags = cur.fetchone()[0]
        # Deployment.Status: 2 = Sealed (the snapshot driver nodes apply).
        cur.execute("SELECT TOP 1 RevisionHash, SealedAtUtc FROM dbo.Deployment "
                    "WHERE Status=2 ORDER BY SealedAtUtc DESC")
        dep = cur.fetchone()
    finally:
        conn.close()
    print(f"config DB : {db_tags} mirror tags (nw-*) present")
    # NOTE(review): assumes RevisionHash is a non-null string — confirm against the schema.
    last_sealed = f"rev {dep[0][:12]}… @ {dep[1]}" if dep else "(none)"
    print(f"last sealed : {last_sealed}")
    # When the browse fails, browse_summary returns an error marker in place
    # of the folder count; it is printed as-is.
    folders, variables, good = browse_summary(args.opcua_endpoint)
    print(f"address space : {folders} machine folder(s), {variables} variable(s), {good} value(s) Good on {args.opcua_endpoint}")
    return 0
def cmd_verify(args):
    """``verify`` subcommand: check the address space against the load plan.

    Passes when the address space holds at least as many variables as the
    plan has tags and at least half of the expected tags (minimum one) read
    back with a Good status. With ``args.wait`` the check is repeated until it
    passes or ``args.wait_seconds`` have elapsed; without it the check runs
    once.

    Returns:
        0 when the check passed, 1 otherwise.
    """
    plan = build_plan(args.galaxy_json, args.driver)
    expected = plan["tags"]
    min_good = max(1, int(expected * 0.5))
    poll_interval = 5  # seconds between address-space browses

    # A monotonic clock keeps the timeout correct if the wall clock is adjusted.
    deadline = time.monotonic() + (args.wait_seconds if args.wait else 0)
    while True:
        folders, variables, good = browse_summary(args.opcua_endpoint)
        passed = variables >= expected and good >= min_good
        remaining = deadline - time.monotonic()
        if passed or remaining <= 0:
            break
        print(f" waiting for deploy… ({variables}/{expected} vars, {good} Good)")
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))

    print(f"expected mirror tags : {expected}")
    print(f"address-space vars : {variables} (in {folders} folders)")
    print(f"values Good (live) : {good}")
    for nm, val, sc in sample_values(args.opcua_endpoint, 6):
        print(f" {nm} = {val} [{sc}]")
    print("VERIFY:", "PASS — UNS loaded and live" if passed else "INCOMPLETE — did you Deploy at the AdminUI?")
    return 0 if passed else 1
# ── OPC UA helpers (asyncua) ────────────────────────────────────────────────
def browse_summary(endpoint):
    """Count folders, variables and Good values under the ``OtOpcUa`` node.

    Connects to ``endpoint``, finds the children of the Objects node whose
    browse name is ``OtOpcUa`` and walks two levels below them: each child is
    counted as a folder, each grandchild as a variable, and each grandchild
    whose value read reports a good status as Good.

    Returns:
        ``(folders, variables, good)``. If the session as a whole fails, the
        tuple is ``("<ExceptionName>", 0, 0)`` instead.
    """
    import asyncio
    from asyncua import Client

    async def _reads_good(node):
        # Best effort: a node whose value cannot be read counts as not Good.
        try:
            data_value = await node.read_data_value()
            status = data_value.StatusCode
            return bool(status and status.is_good())
        except Exception:
            return False

    async def _count():
        n_folders = n_variables = n_good = 0
        async with Client(endpoint) as client:
            for root in await client.nodes.objects.get_children():
                browse_name = await root.read_browse_name()
                if browse_name.Name != "OtOpcUa":
                    continue
                for folder in await root.get_children():
                    n_folders += 1
                    for node in await folder.get_children():
                        n_variables += 1
                        if await _reads_good(node):
                            n_good += 1
        return n_folders, n_variables, n_good

    try:
        counts = asyncio.run(_count())
    except Exception as exc:
        # Report the failure in place of the folder count rather than raising.
        counts = (f"<{type(exc).__name__}>", 0, 0)
    return counts
def sample_values(endpoint, n):
    """Read a handful of live values from under the ``OtOpcUa`` node.

    Walks the same two levels as the address-space browse and collects
    ``(browse name, value, status name)`` tuples, stopping as soon as ``n``
    have been gathered. A node whose value read fails is reported with
    ``"<ExceptionName>"`` as its value and ``"?"`` as its status.

    Returns:
        A list of tuples. If the session as a whole fails, a single
        ``("<browse error>", message, "?")`` entry is returned instead.
    """
    import asyncio
    from asyncua import Client

    async def _collect():
        samples = []
        async with Client(endpoint) as client:
            for root in await client.nodes.objects.get_children():
                browse_name = await root.read_browse_name()
                if browse_name.Name != "OtOpcUa":
                    continue
                for folder in await root.get_children():
                    for node in await folder.get_children():
                        try:
                            data_value = await node.read_data_value()
                            status = data_value.StatusCode.name if data_value.StatusCode else "?"
                            entry = ((await node.read_browse_name()).Name,
                                     data_value.Value.Value, status)
                        except Exception as exc:
                            entry = ((await node.read_browse_name()).Name,
                                     f"<{type(exc).__name__}>", "?")
                        samples.append(entry)
                        if len(samples) >= n:
                            return samples
        return samples

    try:
        result = asyncio.run(_collect())
    except Exception as exc:
        result = [("<browse error>", str(exc), "?")]
    return result
# ── arg parsing ─────────────────────────────────────────────────────────────
def main(argv):
    """Parse ``argv`` and run the chosen subcommand, returning its exit code.

    Global flags come before the subcommand name. The SQL flags are collected
    into ``mssql`` on the parsed namespace so the command handlers can pass
    them straight to the DB helper.
    """
    parser = argparse.ArgumentParser(
        description="Reloadable populate + verify for the OtOpcUa galaxy UNS.")
    parser.add_argument("--galaxy-json", default=DEF_GALAXY_JSON)
    parser.add_argument("--driver", default=DEF_DRIVER,
                        help="SystemPlatform GalaxyMxGateway driver instance id")
    parser.add_argument("--opcua-endpoint", default=DEF_OPCUA)
    parser.add_argument("--deploy-url", default="http://localhost:9200/deployments")

    # Each --sql-* flag defaults to the matching DEF_MSSQL entry.
    sql_flags = (
        ("--sql-host", "host", {}),
        ("--sql-port", "port", {"type": int}),
        ("--sql-user", "user", {}),
        ("--sql-password", "password", {}),
        ("--sql-db", "database", {}),
    )
    for flag, key, extra in sql_flags:
        parser.add_argument(flag, default=DEF_MSSQL[key], **extra)

    commands = parser.add_subparsers(dest="cmd", required=True)
    for plain in ("generate", "populate", "clean", "status"):
        commands.add_parser(plain)
    verify_parser = commands.add_parser("verify")
    verify_parser.add_argument("--wait", action="store_true",
                               help="poll until the deploy lands")
    verify_parser.add_argument("--wait-seconds", type=int, default=120)

    ns = parser.parse_args(argv)
    ns.mssql = {
        "host": ns.sql_host,
        "port": ns.sql_port,
        "user": ns.sql_user,
        "password": ns.sql_password,
        "database": ns.sql_db,
    }

    handlers = {
        "generate": cmd_generate,
        "populate": cmd_populate,
        "clean": cmd_clean,
        "status": cmd_status,
        "verify": cmd_verify,
    }
    handler = handlers[ns.cmd]
    return handler(ns)
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))