Stand up local dev infrastructure (OPC UA, LDAP, MS SQL) with Docker Compose, Python CLI tools for service interaction, and teardown script. Fix GLAuth config mount, OPC PLC node format, and document actual DN/namespace behavior discovered during testing. Resolve Q1-Q8,Q10: .NET 10, Akka.NET 1.5.x, monorepo with slnx, appsettings JWT, Windows Server 2022 site target.
209 lines
6.6 KiB
Python
209 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
|
"""OPC UA client tool for ScadaLink test infrastructure."""
|
|
|
|
import argparse
|
|
import sys
|
|
import time
|
|
|
|
from opcua import Client, ua
|
|
|
|
|
|
# Endpoint URL used when --endpoint is not supplied on the command line.
DEFAULT_ENDPOINT = "opc.tcp://localhost:50000"
|
|
|
|
|
|
def cmd_check(args):
    """Connect to the server and print its status, namespaces and endpoints.

    Args:
        args: Parsed CLI arguments; only ``args.endpoint`` (the OPC UA
            endpoint URL) is read.

    Exits the process with status 1, after printing the error to stderr,
    if connecting or any of the queries fails.
    """
    client = Client(args.endpoint)

    # Connect in its own step: when connect() fails there is no session to
    # close, and calling disconnect() on a client that never connected can
    # raise and replace the clean exit below with a traceback.
    try:
        client.connect()
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    try:
        server_status = client.get_node(ua.ObjectIds.Server_ServerStatus).get_value()

        print(f"Connected to: {args.endpoint}")
        print(f"Server state: {server_status.State}")
        print(f"Start time: {server_status.StartTime}")
        print(f"Current time: {server_status.CurrentTime}")
        print(f"Build info: {server_status.BuildInfo.ProductName} {server_status.BuildInfo.SoftwareVersion}")

        print("\nNamespaces:")
        for index, uri in enumerate(client.get_namespace_array()):
            print(f" {index}: {uri}")

        endpoints = client.get_endpoints()
        print(f"\nEndpoints ({len(endpoints)}):")
        for ep in endpoints:
            # Only the fragment after '#' of the policy URI is shown.
            print(f" {ep.EndpointUrl} [{ep.SecurityPolicyUri.split('#')[-1]}]")
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Best-effort cleanup: a failing disconnect (e.g. the link already
        # dropped) must not hide the result or the error reported above.
        try:
            client.disconnect()
        except Exception as exc:
            print(f"Warning: disconnect failed: {exc}", file=sys.stderr)
|
|
|
|
|
|
def cmd_browse(args):
    """Print the children of a node, with values for variable nodes.

    Args:
        args: Parsed CLI arguments. ``args.endpoint`` is the server URL.
            ``args.path`` is an optional dot-separated browse path that is
            resolved relative to the Objects node
            (e.g. ``3:OpcPlc.3:Telemetry``); when it is empty or ``None``
            the Objects node itself is browsed.

    Exits the process with status 1, after printing the error to stderr,
    if connecting, resolving the path or browsing fails.
    """
    client = Client(args.endpoint)

    # Connect in its own step: when connect() fails there is no session to
    # close, and calling disconnect() on a client that never connected can
    # raise and replace the clean exit below with a traceback.
    try:
        client.connect()
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    try:
        node = client.get_objects_node()
        if args.path:
            node = node.get_child(args.path.split("."))

        browse_name = node.get_browse_name()
        print(f"Browsing: {browse_name.Name} ({node.nodeid})")
        print()

        for child in node.get_children():
            name = child.get_browse_name()
            node_class = child.get_node_class()
            class_name = getattr(node_class, "name", str(node_class))

            if node_class == ua.NodeClass.Variable:
                try:
                    value = child.get_value()
                    dtype = child.get_data_type_as_variant_type()
                    print(f" {name.Name:<30} [{class_name}] {dtype.name} = {value}")
                except Exception:
                    # Deliberate best effort: if the variable cannot be read,
                    # still list it, just without type and value.
                    print(f" {name.Name:<30} [{class_name}]")
            else:
                print(f" {name.Name:<30} [{class_name}]")
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Best-effort cleanup: a failing disconnect (e.g. the link already
        # dropped) must not hide the result or the error reported above.
        try:
            client.disconnect()
        except Exception as exc:
            print(f"Warning: disconnect failed: {exc}", file=sys.stderr)
|
|
|
|
|
|
def cmd_read(args):
    """Read one node and print its value, data type and source timestamp.

    Args:
        args: Parsed CLI arguments. ``args.endpoint`` is the server URL and
            ``args.node`` the node ID string (e.g. ``ns=3;s=Motor.Speed``).

    Exits the process with status 1, after printing the error to stderr,
    if connecting or reading fails.
    """
    client = Client(args.endpoint)

    # Connect in its own step: when connect() fails there is no session to
    # close, and calling disconnect() on a client that never connected can
    # raise and replace the clean exit below with a traceback.
    try:
        client.connect()
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    try:
        node = client.get_node(args.node)
        data_value = node.get_data_value()
        value = data_value.Value.Value
        dtype = node.get_data_type_as_variant_type()
        timestamp = data_value.SourceTimestamp

        print(f"Node: {args.node}")
        print(f"Value: {value}")
        print(f"Data type: {dtype.name}")
        print(f"Timestamp: {timestamp}")
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Best-effort cleanup: a failing disconnect (e.g. the link already
        # dropped) must not hide the result or the error reported above.
        try:
            client.disconnect()
        except Exception as exc:
            print(f"Warning: disconnect failed: {exc}", file=sys.stderr)
|
|
|
|
|
|
def _parse_boolean(text):
    """Convert a CLI string to a bool, rejecting unrecognised spellings.

    Accepts ``true``/``1``/``yes`` and ``false``/``0``/``no``
    (case-insensitive, surrounding whitespace ignored).

    Raises:
        ValueError: If ``text`` is none of the accepted spellings. Falling
            back to ``False`` here would silently write the wrong value on
            a typo.
    """
    lowered = text.strip().lower()
    if lowered in ("true", "1", "yes"):
        return True
    if lowered in ("false", "0", "no"):
        return False
    raise ValueError(f"invalid boolean '{text}' (use true/false, 1/0 or yes/no)")


def cmd_write(args):
    """Convert a CLI string to the requested type and write it to a node.

    Args:
        args: Parsed CLI arguments. ``args.endpoint`` is the server URL,
            ``args.node`` the node ID string, ``args.value`` the value as
            text and ``args.type`` one of ``Double``, ``Boolean``,
            ``UInt32``, ``Int32`` or ``String``.

    Exits the process with status 1, after printing the error to stderr,
    if the type is unsupported, the value cannot be converted, or
    connecting or writing fails.
    """
    # CLI type name -> converter. Each name is also the name of the matching
    # ua.VariantType member, which is looked up only after validation so bad
    # input is rejected before any OPC UA work is done.
    converters = {
        "Double": float,
        "Boolean": _parse_boolean,
        "UInt32": int,
        "Int32": int,
        "String": str,
    }

    if args.type not in converters:
        print(f"Error: unsupported type '{args.type}'. Use one of: {', '.join(converters)}", file=sys.stderr)
        sys.exit(1)

    try:
        converted_value = converters[args.type](args.value)
    except ValueError as exc:
        print(f"Error: cannot convert '{args.value}' to {args.type}: {exc}", file=sys.stderr)
        sys.exit(1)

    variant_type = getattr(ua.VariantType, args.type)
    client = Client(args.endpoint)

    # Connect in its own step: when connect() fails there is no session to
    # close, and calling disconnect() on a client that never connected can
    # raise and replace the clean exit below with a traceback.
    try:
        client.connect()
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    try:
        node = client.get_node(args.node)
        # Written as a DataValue wrapping an explicitly typed Variant so the
        # requested type is used instead of one guessed from the Python value.
        data_value = ua.DataValue(ua.Variant(converted_value, variant_type))
        node.set_value(data_value)
        print(f"Wrote {converted_value} ({args.type}) to {args.node}")
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Best-effort cleanup: a failing disconnect (e.g. the link already
        # dropped) must not hide the result or the error reported above.
        try:
            client.disconnect()
        except Exception as exc:
            print(f"Warning: disconnect failed: {exc}", file=sys.stderr)
|
|
|
|
|
|
def cmd_monitor(args):
    """Subscribe to data changes on some nodes and print them for a while.

    Args:
        args: Parsed CLI arguments. ``args.endpoint`` is the server URL,
            ``args.nodes`` a comma-separated list of node ID strings and
            ``args.duration`` how many seconds to keep listening.

    Exits the process with status 1, after printing the error to stderr,
    if no node ID is given or if connecting or subscribing fails.
    """
    # Drop empty entries so a stray or trailing comma ("a,,b", "a,") does not
    # turn into a lookup of the empty node ID.
    node_ids = [part.strip() for part in args.nodes.split(",") if part.strip()]
    if not node_ids:
        print("Error: no node IDs given (expected a comma-separated list)", file=sys.stderr)
        sys.exit(1)

    class Handler:
        """Subscription callback target that prints every data change."""

        def datachange_notification(self, node, val, data):
            print(f" {node} = {val}")

    client = Client(args.endpoint)

    # Connect in its own step: when connect() fails there is no session to
    # close, and calling disconnect() on a client that never connected can
    # raise and replace the clean exit below with a traceback.
    try:
        client.connect()
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    try:
        # 500 is the requested publishing period passed to the library.
        subscription = client.create_subscription(500, Handler())

        handles = []
        for node_id in node_ids:
            node = client.get_node(node_id)
            handles.append(subscription.subscribe_data_change(node))
            name = node.get_browse_name()
            print(f"Monitoring: {name.Name} ({node_id})")

        print(f"\nListening for {args.duration}s...\n")
        # Notifications are printed by the handler while this call blocks.
        time.sleep(args.duration)

        for handle in handles:
            subscription.unsubscribe(handle)
        subscription.delete()
        print("\nDone.")
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Best-effort cleanup: a failing disconnect (e.g. the link already
        # dropped) must not hide the result or the error reported above.
        try:
            client.disconnect()
        except Exception as exc:
            print(f"Warning: disconnect failed: {exc}", file=sys.stderr)
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected sub-command.

    A global ``--endpoint`` option and exactly one sub-command (``check``,
    ``browse``, ``read``, ``write`` or ``monitor``) are accepted; the parsed
    arguments are handed to the matching ``cmd_*`` function.
    """
    # Sub-command name -> function implementing it.
    handlers = {
        "check": cmd_check,
        "browse": cmd_browse,
        "read": cmd_read,
        "write": cmd_write,
        "monitor": cmd_monitor,
    }

    cli = argparse.ArgumentParser(
        description="OPC UA client tool for ScadaLink test infrastructure",
    )
    cli.add_argument(
        "--endpoint",
        default=DEFAULT_ENDPOINT,
        help=f"OPC UA endpoint (default: {DEFAULT_ENDPOINT})",
    )

    # A sub-command is mandatory; its name is stored in ``command``.
    subcommands = cli.add_subparsers(dest="command", required=True)

    subcommands.add_parser("check", help="Connect and report server status")

    browse_cmd = subcommands.add_parser("browse", help="Browse server node tree")
    browse_cmd.add_argument(
        "--path",
        help="Node path to browse (e.g. 3:OpcPlc.3:Telemetry)",
    )

    read_cmd = subcommands.add_parser("read", help="Read a node value")
    read_cmd.add_argument(
        "--node",
        required=True,
        help="Node ID (e.g. ns=3;s=Motor.Speed)",
    )

    write_cmd = subcommands.add_parser("write", help="Write a value to a node")
    write_cmd.add_argument("--node", required=True, help="Node ID")
    write_cmd.add_argument("--value", required=True, help="Value to write")
    write_cmd.add_argument(
        "--type",
        required=True,
        choices=["Double", "Boolean", "UInt32", "Int32", "String"],
        help="Data type",
    )

    monitor_cmd = subcommands.add_parser("monitor", help="Subscribe and print value changes")
    monitor_cmd.add_argument("--nodes", required=True, help="Comma-separated node IDs")
    monitor_cmd.add_argument(
        "--duration",
        type=int,
        default=10,
        help="Duration in seconds (default: 10)",
    )

    options = cli.parse_args()
    run = handlers[options.command]
    run(options)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
|