#!/usr/bin/env python3 """OPC UA client tool for ScadaLink test infrastructure.""" import argparse import sys import time from opcua import Client, ua DEFAULT_ENDPOINT = "opc.tcp://localhost:50000" def cmd_check(args): """Connect and report server status.""" client = Client(args.endpoint) try: client.connect() server_status = client.get_node(ua.ObjectIds.Server_ServerStatus).get_value() print(f"Connected to: {args.endpoint}") print(f"Server state: {server_status.State}") print(f"Start time: {server_status.StartTime}") print(f"Current time: {server_status.CurrentTime}") print(f"Build info: {server_status.BuildInfo.ProductName} {server_status.BuildInfo.SoftwareVersion}") ns = client.get_namespace_array() print(f"\nNamespaces:") for i, n in enumerate(ns): print(f" {i}: {n}") endpoints = client.get_endpoints() print(f"\nEndpoints ({len(endpoints)}):") for ep in endpoints: print(f" {ep.EndpointUrl} [{ep.SecurityPolicyUri.split('#')[-1]}]") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) finally: client.disconnect() def cmd_browse(args): """Browse server node tree.""" client = Client(args.endpoint) try: client.connect() if args.path: root = client.get_objects_node() node = root.get_child(args.path.split(".")) else: node = client.get_objects_node() browse_name = node.get_browse_name() print(f"Browsing: {browse_name.Name} ({node.nodeid})") print() children = node.get_children() for child in children: name = child.get_browse_name() node_class = child.get_node_class() class_name = node_class.name if hasattr(node_class, "name") else str(node_class) if node_class == ua.NodeClass.Variable: try: value = child.get_value() dtype = child.get_data_type_as_variant_type() print(f" {name.Name:<30} [{class_name}] {dtype.name} = {value}") except Exception: print(f" {name.Name:<30} [{class_name}]") else: print(f" {name.Name:<30} [{class_name}]") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) finally: client.disconnect() def cmd_read(args): """Read a node value.""" client = Client(args.endpoint) try: client.connect() node = client.get_node(args.node) data_value = node.get_data_value() value = data_value.Value.Value dtype = node.get_data_type_as_variant_type() ts = data_value.SourceTimestamp print(f"Node: {args.node}") print(f"Value: {value}") print(f"Data type: {dtype.name}") print(f"Timestamp: {ts}") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) finally: client.disconnect() def cmd_write(args): """Write a value to a node.""" type_map = { "Double": (ua.VariantType.Double, float), "Boolean": (ua.VariantType.Boolean, lambda v: v.lower() in ("true", "1", "yes")), "UInt32": (ua.VariantType.UInt32, int), "Int32": (ua.VariantType.Int32, int), "String": (ua.VariantType.String, str), } if args.type not in type_map: print(f"Error: unsupported type '{args.type}'. Use one of: {', '.join(type_map)}", file=sys.stderr) sys.exit(1) variant_type, converter = type_map[args.type] converted_value = converter(args.value) client = Client(args.endpoint) try: client.connect() node = client.get_node(args.node) dv = ua.DataValue(ua.Variant(converted_value, variant_type)) node.set_value(dv) print(f"Wrote {converted_value} ({args.type}) to {args.node}") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) finally: client.disconnect() def cmd_monitor(args): """Subscribe and print value changes.""" nodes = [n.strip() for n in args.nodes.split(",")] client = Client(args.endpoint) try: client.connect() class Handler: def datachange_notification(self, node, val, data): print(f" {node} = {val}") handler = Handler() sub = client.create_subscription(500, handler) handles = [] for node_id in nodes: node = client.get_node(node_id) handle = sub.subscribe_data_change(node) handles.append(handle) name = node.get_browse_name() print(f"Monitoring: {name.Name} ({node_id})") print(f"\nListening for {args.duration}s...\n") time.sleep(args.duration) for handle in handles: sub.unsubscribe(handle) sub.delete() print("\nDone.") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) finally: client.disconnect() def main(): parser = argparse.ArgumentParser(description="OPC UA client tool for ScadaLink test infrastructure") parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT, help=f"OPC UA endpoint (default: {DEFAULT_ENDPOINT})") sub = parser.add_subparsers(dest="command", required=True) sub.add_parser("check", help="Connect and report server status") browse_p = sub.add_parser("browse", help="Browse server node tree") browse_p.add_argument("--path", help="Node path to browse (e.g. 3:OpcPlc.3:Telemetry)") read_p = sub.add_parser("read", help="Read a node value") read_p.add_argument("--node", required=True, help="Node ID (e.g. ns=3;s=Motor.Speed)") write_p = sub.add_parser("write", help="Write a value to a node") write_p.add_argument("--node", required=True, help="Node ID") write_p.add_argument("--value", required=True, help="Value to write") write_p.add_argument("--type", required=True, choices=["Double", "Boolean", "UInt32", "Int32", "String"], help="Data type") monitor_p = sub.add_parser("monitor", help="Subscribe and print value changes") monitor_p.add_argument("--nodes", required=True, help="Comma-separated node IDs") monitor_p.add_argument("--duration", type=int, default=10, help="Duration in seconds (default: 10)") args = parser.parse_args() commands = { "check": cmd_check, "browse": cmd_browse, "read": cmd_read, "write": cmd_write, "monitor": cmd_monitor, } commands[args.command](args) if __name__ == "__main__": main()