From 1d498b94b44cbf5f070deff97f451d64ddba62b5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 19 Mar 2026 11:20:25 -0400 Subject: [PATCH] feat(infra): add IOpcUaBridge interface and OpcUaBridge with OPC UA reconnection --- infra/lmxfakeproxy/Bridge/IOpcUaBridge.cs | 25 ++ infra/lmxfakeproxy/Bridge/OpcUaBridge.cs | 306 ++++++++++++++++++++++ 2 files changed, 331 insertions(+) create mode 100644 infra/lmxfakeproxy/Bridge/IOpcUaBridge.cs create mode 100644 infra/lmxfakeproxy/Bridge/OpcUaBridge.cs diff --git a/infra/lmxfakeproxy/Bridge/IOpcUaBridge.cs b/infra/lmxfakeproxy/Bridge/IOpcUaBridge.cs new file mode 100644 index 0000000..23d59a7 --- /dev/null +++ b/infra/lmxfakeproxy/Bridge/IOpcUaBridge.cs @@ -0,0 +1,25 @@ +namespace LmxFakeProxy.Bridge; + +public record OpcUaReadResult(object? Value, DateTime SourceTimestamp, uint StatusCode); + +public interface IOpcUaBridge : IAsyncDisposable +{ + bool IsConnected { get; } + + Task ConnectAsync(CancellationToken cancellationToken = default); + + Task ReadAsync(string nodeId, CancellationToken cancellationToken = default); + + Task WriteAsync(string nodeId, object? value, CancellationToken cancellationToken = default); + + Task AddMonitoredItemsAsync( + IEnumerable nodeIds, + int samplingIntervalMs, + Action onValueChanged, + CancellationToken cancellationToken = default); + + Task RemoveMonitoredItemsAsync(string handle, CancellationToken cancellationToken = default); + + event Action? Disconnected; + event Action? Reconnected; +} diff --git a/infra/lmxfakeproxy/Bridge/OpcUaBridge.cs b/infra/lmxfakeproxy/Bridge/OpcUaBridge.cs new file mode 100644 index 0000000..2aa3e90 --- /dev/null +++ b/infra/lmxfakeproxy/Bridge/OpcUaBridge.cs @@ -0,0 +1,306 @@ +using Opc.Ua; +using Opc.Ua.Client; +using Opc.Ua.Configuration; + +namespace LmxFakeProxy.Bridge; + +public class OpcUaBridge : IOpcUaBridge +{ + private readonly string _endpointUrl; + private readonly ILogger _logger; + private Opc.Ua.Client.ISession? _session; + private Subscription? _subscription; + private volatile bool _connected; + private volatile bool _reconnecting; + private CancellationTokenSource? _reconnectCts; + + private readonly Dictionary> _handleItems = new(); + private readonly Dictionary> _handleCallbacks = new(); + private readonly object _lock = new(); + + public OpcUaBridge(string endpointUrl, ILogger logger) + { + _endpointUrl = endpointUrl; + _logger = logger; + } + + public bool IsConnected => _connected; + public event Action? Disconnected; + public event Action? Reconnected; + + public async Task ConnectAsync(CancellationToken cancellationToken = default) + { + var appConfig = new ApplicationConfiguration + { + ApplicationName = "LmxFakeProxy", + ApplicationType = ApplicationType.Client, + SecurityConfiguration = new SecurityConfiguration + { + AutoAcceptUntrustedCertificates = true, + ApplicationCertificate = new CertificateIdentifier(), + TrustedIssuerCertificates = new CertificateTrustList { StorePath = Path.Combine(Path.GetTempPath(), "LmxFakeProxy", "pki", "issuers") }, + TrustedPeerCertificates = new CertificateTrustList { StorePath = Path.Combine(Path.GetTempPath(), "LmxFakeProxy", "pki", "trusted") }, + RejectedCertificateStore = new CertificateTrustList { StorePath = Path.Combine(Path.GetTempPath(), "LmxFakeProxy", "pki", "rejected") } + }, + ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 }, + TransportQuotas = new TransportQuotas { OperationTimeout = 15000 } + }; + + var application = new ApplicationInstance + { + ApplicationName = "LmxFakeProxy", + ApplicationType = ApplicationType.Client, + ApplicationConfiguration = appConfig + }; + await application.CheckApplicationInstanceCertificate(false, 0); + appConfig.CertificateValidator.CertificateValidation += (_, e) => e.Accept = true; + + EndpointDescription? endpoint; + try + { +#pragma warning disable CS0618 + using var discoveryClient = DiscoveryClient.Create(new Uri(_endpointUrl)); + var endpoints = discoveryClient.GetEndpoints(null); +#pragma warning restore CS0618 + endpoint = endpoints + .Where(e => e.SecurityMode == MessageSecurityMode.None) + .FirstOrDefault() ?? endpoints.FirstOrDefault(); + } + catch + { + endpoint = new EndpointDescription(_endpointUrl); + } + + var endpointConfig = EndpointConfiguration.Create(appConfig); + var configuredEndpoint = new ConfiguredEndpoint(null, endpoint, endpointConfig); + + _session = await Session.Create( + appConfig, configuredEndpoint, false, + "LmxFakeProxy-Session", 60000, null, null, cancellationToken); + + _session.KeepAlive += OnSessionKeepAlive; + + _subscription = new Subscription(_session.DefaultSubscription) + { + DisplayName = "LmxFakeProxy", + PublishingEnabled = true, + PublishingInterval = 500, + KeepAliveCount = 10, + LifetimeCount = 30, + MaxNotificationsPerPublish = 1000 + }; + + _session.AddSubscription(_subscription); + await _subscription.CreateAsync(cancellationToken); + + _connected = true; + _logger.LogInformation("OPC UA bridge connected to {Endpoint}", _endpointUrl); + } + + public async Task ReadAsync(string nodeId, CancellationToken cancellationToken = default) + { + EnsureConnected(); + var readValue = new ReadValueId { NodeId = nodeId, AttributeId = Attributes.Value }; + var response = await _session!.ReadAsync( + null, 0, TimestampsToReturn.Source, + new ReadValueIdCollection { readValue }, cancellationToken); + var result = response.Results[0]; + return new OpcUaReadResult(result.Value, result.SourceTimestamp, result.StatusCode.Code); + } + + public async Task WriteAsync(string nodeId, object? value, CancellationToken cancellationToken = default) + { + EnsureConnected(); + var writeValue = new WriteValue + { + NodeId = nodeId, + AttributeId = Attributes.Value, + Value = new DataValue(new Variant(value)) + }; + var response = await _session!.WriteAsync( + null, new WriteValueCollection { writeValue }, cancellationToken); + return response.Results[0].Code; + } + + public async Task AddMonitoredItemsAsync( + IEnumerable nodeIds, int samplingIntervalMs, + Action onValueChanged, + CancellationToken cancellationToken = default) + { + EnsureConnected(); + var handle = Guid.NewGuid().ToString("N"); + var items = new List(); + + foreach (var nodeId in nodeIds) + { + var monitoredItem = new MonitoredItem(_subscription!.DefaultItem) + { + DisplayName = nodeId, + StartNodeId = nodeId, + AttributeId = Attributes.Value, + SamplingInterval = samplingIntervalMs, + QueueSize = 10, + DiscardOldest = true + }; + + monitoredItem.Notification += (item, e) => + { + if (e.NotificationValue is MonitoredItemNotification notification) + { + var val = notification.Value?.Value; + var ts = notification.Value?.SourceTimestamp ?? DateTime.UtcNow; + var sc = notification.Value?.StatusCode.Code ?? 0; + onValueChanged(nodeId, val, ts, sc); + } + }; + + items.Add(monitoredItem); + _subscription!.AddItem(monitoredItem); + } + + await _subscription!.ApplyChangesAsync(cancellationToken); + + lock (_lock) + { + _handleItems[handle] = items; + _handleCallbacks[handle] = onValueChanged; + } + + return handle; + } + + public async Task RemoveMonitoredItemsAsync(string handle, CancellationToken cancellationToken = default) + { + List? items; + lock (_lock) + { + if (!_handleItems.Remove(handle, out items)) + return; + _handleCallbacks.Remove(handle); + } + + if (_subscription != null) + { + foreach (var item in items) + _subscription.RemoveItem(item); + try { await _subscription.ApplyChangesAsync(cancellationToken); } + catch { /* best-effort during cleanup */ } + } + } + + private void OnSessionKeepAlive(Opc.Ua.Client.ISession session, KeepAliveEventArgs e) + { + if (ServiceResult.IsBad(e.Status)) + { + if (!_connected) return; + _connected = false; + _logger.LogWarning("OPC UA backend connection lost"); + Disconnected?.Invoke(); + StartReconnectLoop(); + } + } + + private void StartReconnectLoop() + { + if (_reconnecting) return; + _reconnecting = true; + _reconnectCts = new CancellationTokenSource(); + + _ = Task.Run(async () => + { + while (!_reconnectCts.Token.IsCancellationRequested) + { + await Task.Delay(5000, _reconnectCts.Token); + try + { + _logger.LogInformation("Attempting OPC UA reconnection..."); + if (_session != null) + { + _session.KeepAlive -= OnSessionKeepAlive; + try { await _session.CloseAsync(); } catch { } + _session = null; + _subscription = null; + } + + await ConnectAsync(_reconnectCts.Token); + + // Re-add monitored items for active handles + lock (_lock) + { + foreach (var (handle, callback) in _handleCallbacks) + { + if (_handleItems.TryGetValue(handle, out var oldItems)) + { + var nodeIds = oldItems.Select(i => i.StartNodeId.ToString()).ToList(); + var newItems = new List(); + foreach (var nodeId in nodeIds) + { + var monitoredItem = new MonitoredItem(_subscription!.DefaultItem) + { + DisplayName = nodeId, + StartNodeId = nodeId, + AttributeId = Attributes.Value, + SamplingInterval = oldItems[0].SamplingInterval, + QueueSize = 10, + DiscardOldest = true + }; + var capturedNodeId = nodeId; + var capturedCallback = callback; + monitoredItem.Notification += (item, ev) => + { + if (ev.NotificationValue is MonitoredItemNotification notification) + { + var val = notification.Value?.Value; + var ts = notification.Value?.SourceTimestamp ?? DateTime.UtcNow; + var sc = notification.Value?.StatusCode.Code ?? 0; + capturedCallback(capturedNodeId, val, ts, sc); + } + }; + newItems.Add(monitoredItem); + _subscription!.AddItem(monitoredItem); + } + _handleItems[handle] = newItems; + } + } + } + + if (_subscription != null) + await _subscription.ApplyChangesAsync(); + + _reconnecting = false; + _logger.LogInformation("OPC UA reconnection successful"); + Reconnected?.Invoke(); + return; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "OPC UA reconnection attempt failed, retrying in 5s"); + } + } + }, _reconnectCts.Token); + } + + private void EnsureConnected() + { + if (!_connected || _session == null) + throw new InvalidOperationException("OPC UA backend unavailable"); + } + + public async ValueTask DisposeAsync() + { + _reconnectCts?.Cancel(); + _reconnectCts?.Dispose(); + if (_subscription != null) + { + try { await _subscription.DeleteAsync(true); } catch { } + _subscription = null; + } + if (_session != null) + { + _session.KeepAlive -= OnSessionKeepAlive; + try { await _session.CloseAsync(); } catch { } + _session = null; + } + _connected = false; + } +}