Files
scadaproj/otopcua-uns-loader/otopcua_uns.py
T

602 lines
29 KiB
Python

#!/usr/bin/env python3
"""
otopcua_uns.py — reloadable populate + verify for the OtOpcUa galaxy UNS.
Recreates and verifies an OtOpcUa Unified-Namespace load grounded in the real
AVEVA Galaxy "DEV" hierarchy (the 40 TestMachine instances). Designed to be
re-run after the OtOpcUa docker-dev instance is rebuilt.
Pipeline (see scadaproj/memory otopcua-uns-deploy-and-value-streaming):
populate ──SQL──▶ live config tables (Tag rows, nw-* prefix)
you click "Deploy current configuration" at :9200
driver applies ▶ materialises OtOpcUa/<machine>/<signal> ▶ SubscribeBulk
verify ──OPC UA──▶ browse + read live values on :4840
What it loads: one SystemPlatform Tag per (machine, signal) bound to the
existing GalaxyMxGateway driver. Each tag's FolderPath is the Galaxy object and
its Name the attribute, so the materialised variable NodeId is exactly the
MXAccess ref the driver subscribes to — giving live values. Every row carries
the `nw-` id prefix so `clean` can remove them without touching other config.
Idempotent: populate upserts by TagId; re-running is a no-op when unchanged.
There are TWO overlays:
• the galaxy-native mirror (`populate`) — SystemPlatform driver Tags, 396 tags;
• the Northwind company shape (`populate-equipment`) — an Equipment-kind namespace
whose 1036 signals are VirtualTags. Each VirtualTag's Script simply mirrors the
live galaxy-mirror tag (`return ctx.GetTag("<fullTagReference>").Value;`), so the
company shape carries live VALUES driven off the same Galaxy source.
Subcommands:
generate Build the load plan from galaxy-hierarchy.json (writes load-plan.json)
populate Upsert the SystemPlatform mirror Tag rows into the config DB
populate-equipment Load the company shape as VirtualTag+Script rows (mirror the galaxy tags)
verify Check DB rows present + live OPC UA values are Good on :4840
verify-equipment Browse the company tree; --require-good asserts live values
status Show config-DB + address-space state
clean Delete all nw-* mirror Tags + the company VirtualTag/Script overlay
Deploy is a human-gated AdminUI action (no SQL/REST trigger exists); populate
and clean print the reminder and `verify --wait` polls until it lands.
Deps: pymssql, asyncua (see requirements.txt; use the bundled .venv).
"""
import argparse
import hashlib
import json
import os
import re
import sys
import time
import uuid
# ── config (overridable via env / flags) ───────────────────────────────────
DEF_MSSQL = dict(
host=os.environ.get("OTOPCUA_SQL_HOST", "localhost"),
port=int(os.environ.get("OTOPCUA_SQL_PORT", "14330")),
user=os.environ.get("OTOPCUA_SQL_USER", "sa"),
password=os.environ.get("OTOPCUA_SQL_PASSWORD", "OtOpcUa!Dev123"),
database=os.environ.get("OTOPCUA_SQL_DB", "OtOpcUa"),
)
DEF_OPCUA = os.environ.get("OTOPCUA_OPCUA_ENDPOINT", "opc.tcp://localhost:4840")
DEF_DRIVER = os.environ.get("OTOPCUA_GALAXY_DRIVER", "MAIN-galaxy-mxgw")
DEF_GALAXY_JSON = os.environ.get(
"OTOPCUA_GALAXY_JSON",
os.path.join(os.path.dirname(__file__), "..", "galaxy-hierarchy.json"),
)
DEF_COMPANY_JSON = os.environ.get(
"OTOPCUA_COMPANY_JSON",
os.path.join(os.path.dirname(__file__), "..", "company-uns.json"),
)
ID_PREFIX = "nw-mirror-" # SystemPlatform galaxy-mirror TagId prefix
LOAD_PLAN = os.path.join(os.path.dirname(__file__), "load-plan.json")
# Equipment-overlay (company-shape) object ids — all carry the nw- prefix so
# `clean` can remove them. The Equipment namespace is a SECOND namespace loaded
# alongside the galaxy mirror. Each company signal is a VirtualTag (+ Script) whose
# script mirrors the live SystemPlatform galaxy-mirror tag for that signal — so the
# overlay carries live VALUES (scope doc WS-3), not just structure.
EQ_CLUSTER = os.environ.get("OTOPCUA_EQ_CLUSTER", "MAIN")
EQ_NS = "nw-uns"
EQ_ID_PREFIX = "nweq-" # VirtualTag/Script logical-id prefix (cleanup by prefix scan)
# galaxy dataTypeName / gen_uns dtype → valid OtOpcUa DriverDataType
_DTYPE_FIX = {"Double": "Float64", "Float": "Float32"}
_ACCESS = {"ReadOnly": "0", "Read": "0", "ReadWrite": "1"}
# ── the value signals we mirror, per $TestMachine instance ──────────────────
# (galaxy attribute name, OtOpcUa DriverDataType, access '0'=Read/'1'=ReadWrite)
SIGNALS = [
("TestChangingInt", "Int32", "0"),
("TestHistoryValue", "Int32", "0"),
("TestDouble", "Float64", "0"),
("TestFloat", "Float32", "0"),
("TestDuration", "Float64", "0"),
("TestDateTime", "DateTime", "0"),
("ProtectedValue", "Boolean", "1"),
("ProtectedValue1", "Boolean", "1"),
("TestAlarm001", "Boolean", "0"),
("TestAlarm002", "Boolean", "0"),
("TestAlarm003", "Boolean", "0"),
("InAlarm", "Boolean", "0"),
]
# ── plan generation (grounded in the real galaxy) ───────────────────────────
def build_plan(galaxy_json, driver):
with open(galaxy_json) as f:
gal = json.load(f)
machines = [
o for o in gal["objects"]
if "$TestMachine" in o.get("templateChain", [])
]
machines.sort(key=lambda o: o["tagName"])
rows = []
for m in machines:
have = {a["attributeName"] for a in m["attributes"]}
for attr, dtype, access in SIGNALS:
if attr not in have:
continue # only mirror attributes this instance really has
rows.append({
"tag_id": f"{ID_PREFIX}{m['tagName']}-{attr}".lower(),
"driver_instance_id": driver,
"name": attr,
"folder_path": m["tagName"], # → folder; ref = folder.name
"data_type": dtype,
"access_level": access,
"mxaccess_ref": f"{m['tagName']}.{attr}",
})
return {
"source": galaxy_json,
"driver_instance_id": driver,
"machines": len(machines),
"tags": len(rows),
"rows": rows,
}
# ── DB helpers ──────────────────────────────────────────────────────────────
def connect(cfg):
import pymssql
conn = pymssql.connect(
server=cfg["host"], port=str(cfg["port"]), user=cfg["user"],
password=cfg["password"], database=cfg["database"], autocommit=False,
)
cur = conn.cursor()
# The Tag table has filtered indexes / computed columns; writes require this.
cur.execute("SET QUOTED_IDENTIFIER ON; SET ANSI_NULLS ON;")
return conn, cur
def driver_exists(cur, driver):
cur.execute(
"SELECT n.Kind FROM dbo.DriverInstance d "
"JOIN dbo.Namespace n ON n.NamespaceId = d.NamespaceId "
"WHERE d.DriverInstanceId = %s", (driver,))
r = cur.fetchone()
return r[0] if r else None
# ── commands ────────────────────────────────────────────────────────────────
def cmd_generate(args):
plan = build_plan(args.galaxy_json, args.driver)
with open(LOAD_PLAN, "w") as f:
json.dump(plan, f, indent=2)
print(f"plan: {plan['machines']} machines → {plan['tags']} mirror tags (driver {plan['driver_instance_id']})")
print(f"wrote {LOAD_PLAN}")
return 0
def cmd_populate(args):
plan = build_plan(args.galaxy_json, args.driver)
conn, cur = connect(args.mssql)
kind = driver_exists(cur, args.driver)
if kind is None:
print(f"ERROR: driver instance '{args.driver}' not found in config DB.", file=sys.stderr)
return 2
if kind != "SystemPlatform":
print(f"ERROR: driver '{args.driver}' is in a {kind} namespace; the galaxy mirror needs a "
f"SystemPlatform/GalaxyMxGateway driver.", file=sys.stderr)
return 2
inserted = updated = 0
for r in plan["rows"]:
# Upsert by the SystemPlatform natural key (DriverInstanceId, FolderPath, Name)
# — the UX_Tag_FolderPath unique index. This adopts any pre-existing seed row for
# the same ref into our nw-* set (so `clean` can remove it) and stays idempotent on
# re-run. RowId/RowVersion are server-managed.
cur.execute(
"SELECT TagId FROM dbo.Tag WHERE DriverInstanceId=%s AND FolderPath=%s "
"AND Name=%s AND EquipmentId IS NULL",
(r["driver_instance_id"], r["folder_path"], r["name"]))
if cur.fetchone():
cur.execute(
"UPDATE dbo.Tag SET TagId=%s, DataType=%s, AccessLevel=%s, "
"WriteIdempotent=0, TagConfig='{}' "
"WHERE DriverInstanceId=%s AND FolderPath=%s AND Name=%s AND EquipmentId IS NULL",
(r["tag_id"], r["data_type"], r["access_level"],
r["driver_instance_id"], r["folder_path"], r["name"]))
updated += 1
else:
cur.execute(
"INSERT INTO dbo.Tag (TagRowId, TagId, DriverInstanceId, EquipmentId, "
"Name, FolderPath, DataType, AccessLevel, WriteIdempotent, TagConfig) "
"VALUES (NEWID(), %s, %s, NULL, %s, %s, %s, %s, 0, '{}')",
(r["tag_id"], r["driver_instance_id"], r["name"], r["folder_path"],
r["data_type"], r["access_level"]))
inserted += 1
conn.commit()
conn.close()
print(f"populated: {inserted} inserted, {updated} updated "
f"({plan['tags']} mirror tags across {plan['machines']} machines)")
print()
print(">>> NEXT: open the AdminUI, sign in, and click "
"'Deploy current configuration' to seal + serve the load:")
print(f" {args.deploy_url}")
print(">>> then run: otopcua_uns.py verify --wait")
return 0
def _eq_signal_ids(equipment_id, folder, name):
"""Deterministic (VirtualTagId, ScriptId) for a company signal. Both carry the
EQ_ID_PREFIX so `clean` removes exactly what was created. The two ids share the
same per-signal hash but differ by a kind token so they never collide across the
global UX_VirtualTag_LogicalId / UX_Script_LogicalId unique indexes. Capped at the
64-char id column width."""
base = hashlib.sha1(f"{equipment_id}|{folder}|{name}".encode()).hexdigest()[:20]
return EQ_ID_PREFIX + "vt-" + base, EQ_ID_PREFIX + "sc-" + base
def cmd_populate_equipment(args):
"""Load the company-shape Equipment namespace from company-uns.json: a second
(Equipment-kind) namespace alongside the galaxy mirror, with the Northwind
Area/Line/Equipment/Signal tree. Each signal is a VirtualTag whose Script mirrors
the live galaxy-mirror tag for that signal — `return ctx.GetTag("<ref>").Value;` —
so the company shape streams live VALUES off the same Galaxy source (no driver,
no BadWaitingForInitialData once the galaxy mirror is up). Idempotent:
drop-and-recreate of the nw- overlay rows."""
with open(args.company_json) as f:
doc = json.load(f)
u = doc["uns"]
conn, cur = connect(args.mssql)
# Drop any prior overlay (child rows first), then recreate. VirtualTag/Script go
# before Equipment (VirtualTag.EquipmentId logical-FKs Equipment). Equipment is
# scoped by its overlay UnsLine ('nw-line-%') — NOT by the EquipmentId, which is now the
# canonical 'EQ-'+uuid form (see DraftValidator) and no longer carries an 'nw-' prefix.
cur.execute("DELETE FROM dbo.VirtualTag WHERE VirtualTagId LIKE %s", (EQ_ID_PREFIX + "%",))
cur.execute("DELETE FROM dbo.Script WHERE ScriptId LIKE %s", (EQ_ID_PREFIX + "%",))
cur.execute("DELETE FROM dbo.Equipment WHERE UnsLineId LIKE 'nw-line-%'")
cur.execute("DELETE FROM dbo.UnsLine WHERE UnsLineId LIKE 'nw-line-%'")
cur.execute("DELETE FROM dbo.UnsArea WHERE UnsAreaId LIKE 'nw-area-%'")
# Equipment is now driver-less, but purge any driver still bound to the overlay namespace —
# self-heals environments that ran an older loader which created the 'nw-uns-modbus' placeholder.
cur.execute("DELETE FROM dbo.DriverInstance WHERE NamespaceId=%s", (EQ_NS,))
cur.execute("DELETE FROM dbo.Namespace WHERE NamespaceId=%s", (EQ_NS,))
cur.execute(
"INSERT INTO dbo.Namespace (NamespaceRowId, NamespaceId, ClusterId, Kind, NamespaceUri, Enabled) "
"VALUES (NEWID(), %s, %s, 'Equipment', %s, 1)",
(EQ_NS, EQ_CLUSTER, doc.get("namespace", {}).get("namespaceUri", "urn:northwind:birmingham:uns")))
for a in u["unsAreas"]:
cur.execute("INSERT INTO dbo.UnsArea (UnsAreaRowId, UnsAreaId, ClusterId, Name) VALUES (NEWID(), %s, %s, %s)",
("nw-" + a["unsAreaId"], EQ_CLUSTER, a["name"]))
for l in u["unsLines"]:
cur.execute("INSERT INTO dbo.UnsLine (UnsLineRowId, UnsLineId, UnsAreaId, Name) VALUES (NEWID(), %s, %s, %s)",
("nw-" + l["unsLineId"], "nw-" + l["unsAreaId"], l["name"]))
eq_n = vt_n = 0
for e in u["equipment"]:
eq_uuid = uuid.uuid5(uuid.NAMESPACE_URL, "otopcua-nw-eq/" + e["equipmentId"])
# Canonical EquipmentId: matches OtOpcUa DraftValidator.DeriveEquipmentId
# ("EQ-" + EquipmentUuid.ToString("N")[..12].ToLowerInvariant()). uuid.UUID.hex is
# already lowercase, 32 hex chars, no dashes — .hex[:12] is the first 12.
eq_id = "EQ-" + eq_uuid.hex[:12].lower()
eq_uuid = str(eq_uuid)
cur.execute(
"INSERT INTO dbo.Equipment (EquipmentRowId, EquipmentId, EquipmentUuid, UnsLineId, "
"Name, MachineCode, Manufacturer, Model, Enabled) VALUES (NEWID(), %s, %s, %s, %s, %s, %s, %s, 1)",
(eq_id, eq_uuid, "nw-" + e["unsLineId"], e["name"], e["machineCode"],
e.get("manufacturer"), e.get("model")))
eq_n += 1
for t in e["tags"]:
dtype = _DTYPE_FIX.get(t["dataType"], t["dataType"])
folder = t.get("folderPath")
# The galaxy-mirror MXAccess ref (e.g. TestMachine_001.TestDouble) is the upstream
# the VirtualTag mirrors. DependencyExtractor harvests the literal in ctx.GetTag(),
# so the engine subscribes to exactly this path on the galaxy-mirror driver.
full = t["source"]["fullTagReference"]
vt_id, sc_id = _eq_signal_ids(e["equipmentId"], folder, t["name"])
source_code = f'return ctx.GetTag("{full}").Value;'
source_hash = hashlib.sha256(source_code.encode()).hexdigest()
cur.execute(
"INSERT INTO dbo.Script (ScriptRowId, ScriptId, Name, SourceCode, SourceHash, Language) "
"VALUES (NEWID(), %s, %s, %s, %s, 'CSharp')",
(sc_id, t["name"], source_code, source_hash))
cur.execute(
"INSERT INTO dbo.VirtualTag (VirtualTagRowId, VirtualTagId, EquipmentId, Name, DataType, "
"ScriptId, ChangeTriggered, TimerIntervalMs, Historize, Enabled) "
"VALUES (NEWID(), %s, %s, %s, %s, %s, 1, NULL, %s, 1)",
(vt_id, eq_id, t["name"], dtype, sc_id, 1 if t.get("historize") else 0))
vt_n += 1
conn.commit()
conn.close()
print(f"populated equipment overlay: namespace {EQ_NS} ({EQ_CLUSTER}), "
f"{len(u['unsAreas'])} areas, {len(u['unsLines'])} lines, {eq_n} equipment, "
f"{vt_n} VirtualTags (+ {vt_n} mirror Scripts)")
print()
print(f">>> NEXT: deploy (headless) — curl -s -X POST {args.deploy_url.replace('/deployments','')}/api/deployments "
f"-H 'X-Api-Key: {args.deploy_key}'")
print(">>> then run: otopcua_uns.py verify-equipment --require-good 1036 --wait")
return 0
def cmd_clean(args):
conn, cur = connect(args.mssql)
cur.execute("DELETE FROM dbo.Tag WHERE TagId LIKE %s", (ID_PREFIX + "%",))
n = cur.rowcount
# Also drop the company-shape Equipment overlay (child rows first): VirtualTag and
# Script (both nweq-*) before Equipment, then the rest.
# Equipment is scoped by its overlay UnsLine ('nw-line-%') — NOT by the EquipmentId,
# which is now the canonical 'EQ-'+uuid form (see DraftValidator) with no 'nw-' prefix.
cur.execute("DELETE FROM dbo.VirtualTag WHERE VirtualTagId LIKE %s", (EQ_ID_PREFIX + "%",))
cur.execute("DELETE FROM dbo.Script WHERE ScriptId LIKE %s", (EQ_ID_PREFIX + "%",))
cur.execute("DELETE FROM dbo.Equipment WHERE UnsLineId LIKE 'nw-line-%'")
cur.execute("DELETE FROM dbo.UnsLine WHERE UnsLineId LIKE 'nw-line-%'")
cur.execute("DELETE FROM dbo.UnsArea WHERE UnsAreaId LIKE 'nw-area-%'")
# Purge any driver still bound to the overlay namespace (e.g. the legacy 'nw-uns-modbus'
# placeholder created by an older loader) so 'clean' fully removes the overlay.
cur.execute("DELETE FROM dbo.DriverInstance WHERE NamespaceId=%s", (EQ_NS,))
cur.execute("DELETE FROM dbo.Namespace WHERE NamespaceId=%s", (EQ_NS,))
conn.commit()
conn.close()
print(f"removed {n} nw-* mirror tag(s) + the {EQ_NS} equipment overlay. "
f"Deploy again at {args.deploy_url} to drop them from the address space.")
return 0
def cmd_status(args):
conn, cur = connect(args.mssql)
cur.execute("SELECT COUNT(*) FROM dbo.Tag WHERE TagId LIKE %s", (ID_PREFIX + "%",))
db_tags = cur.fetchone()[0]
# Deployment.Status: 2 = Sealed (the snapshot driver nodes apply).
cur.execute("SELECT TOP 1 RevisionHash, SealedAtUtc FROM dbo.Deployment "
"WHERE Status=2 ORDER BY SealedAtUtc DESC")
dep = cur.fetchone()
conn.close()
print(f"config DB : {db_tags} mirror tags (nw-*) present")
print(f"last sealed : {('rev '+dep[0][:12]+'… @ '+str(dep[1])) if dep else '(none)'}")
folders, variables, good = browse_summary(args.opcua_endpoint)
print(f"address space : {folders} machine folder(s), {variables} variable(s), {good} value(s) Good on {args.opcua_endpoint}")
return 0
def cmd_verify(args):
plan = build_plan(args.galaxy_json, args.driver)
expected = plan["tags"]
deadline = time.time() + (args.wait_seconds if args.wait else 0)
while True:
folders, variables, good = browse_summary(args.opcua_endpoint)
ok = variables >= expected and good >= max(1, int(expected * 0.5))
if ok or time.time() >= deadline:
break
print(f" waiting for deploy… ({variables}/{expected} vars, {good} Good)")
time.sleep(5)
print(f"expected mirror tags : {expected}")
print(f"address-space vars : {variables} (in {folders} folders)")
print(f"values Good (live) : {good}")
sample = sample_values(args.opcua_endpoint, 6)
for nm, val, sc in sample:
print(f" {nm} = {val} [{sc}]")
passed = variables >= expected and good >= max(1, int(expected * 0.5))
print("VERIFY:", "PASS — UNS loaded and live" if passed else "INCOMPLETE — did you Deploy at the AdminUI?")
return 0 if passed else 1
# ── OPC UA helpers (asyncua) ────────────────────────────────────────────────
def browse_summary(endpoint):
import asyncio
from asyncua import Client
async def run():
folders = variables = good = 0
async with Client(endpoint) as c:
for k in await c.nodes.objects.get_children():
if (await k.read_browse_name()).Name != "OtOpcUa":
continue
for f in await k.get_children():
folders += 1
for v in await f.get_children():
variables += 1
try:
dv = await v.read_data_value()
if dv.StatusCode and dv.StatusCode.is_good():
good += 1
except Exception:
pass
return folders, variables, good
try:
return asyncio.run(run())
except Exception as e:
return (f"<{type(e).__name__}>", 0, 0)
def sample_values(endpoint, n):
import asyncio
from asyncua import Client
async def run():
out = []
async with Client(endpoint) as c:
for k in await c.nodes.objects.get_children():
if (await k.read_browse_name()).Name != "OtOpcUa":
continue
for f in await k.get_children():
for v in await f.get_children():
try:
dv = await v.read_data_value()
sc = dv.StatusCode.name if dv.StatusCode else "?"
out.append(((await v.read_browse_name()).Name, dv.Value.Value, sc))
except Exception as e:
out.append(((await v.read_browse_name()).Name, f"<{type(e).__name__}>", "?"))
if len(out) >= n:
return out
return out
try:
return asyncio.run(run())
except Exception as e:
return [("<browse error>", str(e), "?")]
def browse_tree(endpoint, max_depth=8, top_prefix=None, read_values=False):
"""Recursively descend the OtOpcUa address space and count leaf variables, returning
(folder_count, leaf_count, leaf_paths, good_count). A node with no children is a leaf
signal — this correctly handles the DEEP Equipment UNS tree
(Area/Line/Equipment/[FolderPath]/Signal), unlike browse_summary which assumes the flat
2-level Galaxy hierarchy. When top_prefix is set, only top-level OtOpcUa folders whose
browse name starts with it are counted (e.g. 'nw-area-' scopes to the company Equipment
overlay, excluding the Galaxy mirror folders). When read_values is True, each leaf's value
is read and good_count tallies the Good-quality ones (else good_count is 0)."""
import asyncio
from asyncua import Client
async def maybe_good(node, acc):
if not read_values:
return
try:
dv = await node.read_data_value()
if dv.StatusCode and dv.StatusCode.is_good():
acc["good"] += 1
except Exception:
pass
async def walk(node, path, depth, acc):
if depth >= max_depth:
return
for ch in await node.get_children():
try:
name = (await ch.read_browse_name()).Name
except Exception:
continue
child_path = path + "/" + name
grandkids = await ch.get_children()
if grandkids:
acc["folders"] += 1
await walk(ch, child_path, depth + 1, acc)
else:
acc["leaves"] += 1
acc["paths"].append(child_path)
await maybe_good(ch, acc)
async def run():
acc = {"folders": 0, "leaves": 0, "paths": [], "good": 0}
async with Client(endpoint) as c:
for k in await c.nodes.objects.get_children():
if (await k.read_browse_name()).Name != "OtOpcUa":
continue
for top in await k.get_children():
tn = (await top.read_browse_name()).Name
if top_prefix and not tn.startswith(top_prefix):
continue
if await top.get_children():
acc["folders"] += 1
await walk(top, "OtOpcUa/" + tn, 1, acc)
else:
acc["leaves"] += 1
acc["paths"].append("OtOpcUa/" + tn)
await maybe_good(top, acc)
return acc["folders"], acc["leaves"], acc["paths"], acc["good"]
try:
return asyncio.run(run())
except Exception as e:
return (f"<{type(e).__name__}: {e}>", 0, [], 0)
def cmd_verify_equipment(args):
"""Browse the full UNS tree by friendly Area/Line/Equipment/Signal names and report the leaf
signal count. With --expect N, exit non-zero unless exactly N leaf signals are present (the
equipment-namespace structure-materialisation check). With --require-good N (>0), also read
each leaf's value and require at least N Good ones (the live-VALUE check for the VirtualTag
overlay) — back-compat default 0 = structure-only. --wait polls so it can wait for the deploy
+ change-triggered VirtualTag evaluations to land."""
top_prefix = None if args.all else "nw-area-"
scope = "whole address space" if args.all else "company overlay (nw-area-*)"
read_values = args.require_good > 0
deadline = time.time() + (args.wait_seconds if args.wait else 0)
while True:
folders, leaves, paths, good = browse_tree(
args.opcua_endpoint, top_prefix=top_prefix, read_values=read_values)
struct_ok = args.expect is None or leaves == args.expect
good_ok = good >= args.require_good
if (struct_ok and good_ok) or time.time() >= deadline:
break
print(f" waiting for deploy/values… ({leaves} leaves"
+ (f", {good} Good" if read_values else "") + ")")
time.sleep(5)
suffix = f", {good} Good value(s)" if read_values else ""
print(f"equipment tree : {folders} folder(s), {leaves} leaf signal(s){suffix} "
f"on {args.opcua_endpoint} [{scope}]")
for p in sorted(paths)[:args.show]:
print(f" {p}")
if len(paths) > args.show:
print(f" … and {len(paths) - args.show} more")
passed = True
if args.expect is not None:
struct_ok = leaves == args.expect
passed = passed and struct_ok
print(" structure :",
f"PASS ({leaves} == {args.expect})" if struct_ok
else f"FAIL (expected {args.expect}, found {leaves})")
if args.require_good > 0:
good_ok = good >= args.require_good
passed = passed and good_ok
print(" live good :",
f"PASS ({good} >= {args.require_good})" if good_ok
else f"FAIL (expected >= {args.require_good} Good, found {good})")
if args.expect is None and args.require_good == 0:
return 0
print("VERIFY-EQUIPMENT:", "PASS" if passed else "FAIL")
return 0 if passed else 1
# ── arg parsing ─────────────────────────────────────────────────────────────
def main(argv):
p = argparse.ArgumentParser(description="Reloadable populate + verify for the OtOpcUa galaxy UNS.")
p.add_argument("--galaxy-json", default=DEF_GALAXY_JSON)
p.add_argument("--driver", default=DEF_DRIVER, help="SystemPlatform GalaxyMxGateway driver instance id")
p.add_argument("--opcua-endpoint", default=DEF_OPCUA)
p.add_argument("--deploy-url", default="http://localhost:9200/deployments")
p.add_argument("--deploy-key", default=os.environ.get("OTOPCUA_DEPLOY_KEY", "docker-dev-deploy-key"),
help="X-Api-Key for the headless POST /api/deployments endpoint")
p.add_argument("--company-json", default=DEF_COMPANY_JSON)
p.add_argument("--sql-host", default=DEF_MSSQL["host"])
p.add_argument("--sql-port", type=int, default=DEF_MSSQL["port"])
p.add_argument("--sql-user", default=DEF_MSSQL["user"])
p.add_argument("--sql-password", default=DEF_MSSQL["password"])
p.add_argument("--sql-db", default=DEF_MSSQL["database"])
sub = p.add_subparsers(dest="cmd", required=True)
sub.add_parser("generate")
sub.add_parser("populate")
sub.add_parser("populate-equipment",
help="load the company-shape Equipment namespace from company-uns.json (structure-only)")
sub.add_parser("clean")
sub.add_parser("status")
vp = sub.add_parser("verify")
vp.add_argument("--wait", action="store_true", help="poll until the deploy lands")
vp.add_argument("--wait-seconds", type=int, default=120)
ep = sub.add_parser("verify-equipment",
help="recursively browse the Equipment UNS tree + count leaf signals "
"(+ optionally assert live Good values)")
ep.add_argument("--expect", type=int, default=None, help="assert exactly N leaf signals")
ep.add_argument("--require-good", type=int, default=0,
help="read each leaf's value and require >= N Good ones (0 = structure-only, default)")
ep.add_argument("--show", type=int, default=20, help="how many leaf paths to print")
ep.add_argument("--all", action="store_true",
help="count the whole address space (default: only the nw-area-* company overlay)")
ep.add_argument("--wait", action="store_true",
help="poll until the deploy lands + (with --require-good) values go Good")
ep.add_argument("--wait-seconds", type=int, default=120)
a = p.parse_args(argv)
a.mssql = dict(host=a.sql_host, port=a.sql_port, user=a.sql_user,
password=a.sql_password, database=a.sql_db)
return {
"generate": cmd_generate, "populate": cmd_populate,
"populate-equipment": cmd_populate_equipment, "clean": cmd_clean,
"status": cmd_status, "verify": cmd_verify, "verify-equipment": cmd_verify_equipment,
}[a.cmd](a)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))