feat(infra): add IOpcUaBridge interface and OpcUaBridge with OPC UA reconnection
This commit is contained in:
25
infra/lmxfakeproxy/Bridge/IOpcUaBridge.cs
Normal file
25
infra/lmxfakeproxy/Bridge/IOpcUaBridge.cs
Normal file
@@ -0,0 +1,25 @@
|
||||
namespace LmxFakeProxy.Bridge;
|
||||
|
||||
public record OpcUaReadResult(object? Value, DateTime SourceTimestamp, uint StatusCode);
|
||||
|
||||
public interface IOpcUaBridge : IAsyncDisposable
|
||||
{
|
||||
bool IsConnected { get; }
|
||||
|
||||
Task ConnectAsync(CancellationToken cancellationToken = default);
|
||||
|
||||
Task<OpcUaReadResult> ReadAsync(string nodeId, CancellationToken cancellationToken = default);
|
||||
|
||||
Task<uint> WriteAsync(string nodeId, object? value, CancellationToken cancellationToken = default);
|
||||
|
||||
Task<string> AddMonitoredItemsAsync(
|
||||
IEnumerable<string> nodeIds,
|
||||
int samplingIntervalMs,
|
||||
Action<string, object?, DateTime, uint> onValueChanged,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
Task RemoveMonitoredItemsAsync(string handle, CancellationToken cancellationToken = default);
|
||||
|
||||
event Action? Disconnected;
|
||||
event Action? Reconnected;
|
||||
}
|
||||
306
infra/lmxfakeproxy/Bridge/OpcUaBridge.cs
Normal file
306
infra/lmxfakeproxy/Bridge/OpcUaBridge.cs
Normal file
@@ -0,0 +1,306 @@
|
||||
using Opc.Ua;
|
||||
using Opc.Ua.Client;
|
||||
using Opc.Ua.Configuration;
|
||||
|
||||
namespace LmxFakeProxy.Bridge;
|
||||
|
||||
public class OpcUaBridge : IOpcUaBridge
|
||||
{
|
||||
private readonly string _endpointUrl;
|
||||
private readonly ILogger<OpcUaBridge> _logger;
|
||||
private Opc.Ua.Client.ISession? _session;
|
||||
private Subscription? _subscription;
|
||||
private volatile bool _connected;
|
||||
private volatile bool _reconnecting;
|
||||
private CancellationTokenSource? _reconnectCts;
|
||||
|
||||
private readonly Dictionary<string, List<MonitoredItem>> _handleItems = new();
|
||||
private readonly Dictionary<string, Action<string, object?, DateTime, uint>> _handleCallbacks = new();
|
||||
private readonly object _lock = new();
|
||||
|
||||
public OpcUaBridge(string endpointUrl, ILogger<OpcUaBridge> logger)
|
||||
{
|
||||
_endpointUrl = endpointUrl;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public bool IsConnected => _connected;
|
||||
public event Action? Disconnected;
|
||||
public event Action? Reconnected;
|
||||
|
||||
public async Task ConnectAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
var appConfig = new ApplicationConfiguration
|
||||
{
|
||||
ApplicationName = "LmxFakeProxy",
|
||||
ApplicationType = ApplicationType.Client,
|
||||
SecurityConfiguration = new SecurityConfiguration
|
||||
{
|
||||
AutoAcceptUntrustedCertificates = true,
|
||||
ApplicationCertificate = new CertificateIdentifier(),
|
||||
TrustedIssuerCertificates = new CertificateTrustList { StorePath = Path.Combine(Path.GetTempPath(), "LmxFakeProxy", "pki", "issuers") },
|
||||
TrustedPeerCertificates = new CertificateTrustList { StorePath = Path.Combine(Path.GetTempPath(), "LmxFakeProxy", "pki", "trusted") },
|
||||
RejectedCertificateStore = new CertificateTrustList { StorePath = Path.Combine(Path.GetTempPath(), "LmxFakeProxy", "pki", "rejected") }
|
||||
},
|
||||
ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 },
|
||||
TransportQuotas = new TransportQuotas { OperationTimeout = 15000 }
|
||||
};
|
||||
|
||||
var application = new ApplicationInstance
|
||||
{
|
||||
ApplicationName = "LmxFakeProxy",
|
||||
ApplicationType = ApplicationType.Client,
|
||||
ApplicationConfiguration = appConfig
|
||||
};
|
||||
await application.CheckApplicationInstanceCertificate(false, 0);
|
||||
appConfig.CertificateValidator.CertificateValidation += (_, e) => e.Accept = true;
|
||||
|
||||
EndpointDescription? endpoint;
|
||||
try
|
||||
{
|
||||
#pragma warning disable CS0618
|
||||
using var discoveryClient = DiscoveryClient.Create(new Uri(_endpointUrl));
|
||||
var endpoints = discoveryClient.GetEndpoints(null);
|
||||
#pragma warning restore CS0618
|
||||
endpoint = endpoints
|
||||
.Where(e => e.SecurityMode == MessageSecurityMode.None)
|
||||
.FirstOrDefault() ?? endpoints.FirstOrDefault();
|
||||
}
|
||||
catch
|
||||
{
|
||||
endpoint = new EndpointDescription(_endpointUrl);
|
||||
}
|
||||
|
||||
var endpointConfig = EndpointConfiguration.Create(appConfig);
|
||||
var configuredEndpoint = new ConfiguredEndpoint(null, endpoint, endpointConfig);
|
||||
|
||||
_session = await Session.Create(
|
||||
appConfig, configuredEndpoint, false,
|
||||
"LmxFakeProxy-Session", 60000, null, null, cancellationToken);
|
||||
|
||||
_session.KeepAlive += OnSessionKeepAlive;
|
||||
|
||||
_subscription = new Subscription(_session.DefaultSubscription)
|
||||
{
|
||||
DisplayName = "LmxFakeProxy",
|
||||
PublishingEnabled = true,
|
||||
PublishingInterval = 500,
|
||||
KeepAliveCount = 10,
|
||||
LifetimeCount = 30,
|
||||
MaxNotificationsPerPublish = 1000
|
||||
};
|
||||
|
||||
_session.AddSubscription(_subscription);
|
||||
await _subscription.CreateAsync(cancellationToken);
|
||||
|
||||
_connected = true;
|
||||
_logger.LogInformation("OPC UA bridge connected to {Endpoint}", _endpointUrl);
|
||||
}
|
||||
|
||||
public async Task<OpcUaReadResult> ReadAsync(string nodeId, CancellationToken cancellationToken = default)
|
||||
{
|
||||
EnsureConnected();
|
||||
var readValue = new ReadValueId { NodeId = nodeId, AttributeId = Attributes.Value };
|
||||
var response = await _session!.ReadAsync(
|
||||
null, 0, TimestampsToReturn.Source,
|
||||
new ReadValueIdCollection { readValue }, cancellationToken);
|
||||
var result = response.Results[0];
|
||||
return new OpcUaReadResult(result.Value, result.SourceTimestamp, result.StatusCode.Code);
|
||||
}
|
||||
|
||||
public async Task<uint> WriteAsync(string nodeId, object? value, CancellationToken cancellationToken = default)
|
||||
{
|
||||
EnsureConnected();
|
||||
var writeValue = new WriteValue
|
||||
{
|
||||
NodeId = nodeId,
|
||||
AttributeId = Attributes.Value,
|
||||
Value = new DataValue(new Variant(value))
|
||||
};
|
||||
var response = await _session!.WriteAsync(
|
||||
null, new WriteValueCollection { writeValue }, cancellationToken);
|
||||
return response.Results[0].Code;
|
||||
}
|
||||
|
||||
public async Task<string> AddMonitoredItemsAsync(
|
||||
IEnumerable<string> nodeIds, int samplingIntervalMs,
|
||||
Action<string, object?, DateTime, uint> onValueChanged,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
EnsureConnected();
|
||||
var handle = Guid.NewGuid().ToString("N");
|
||||
var items = new List<MonitoredItem>();
|
||||
|
||||
foreach (var nodeId in nodeIds)
|
||||
{
|
||||
var monitoredItem = new MonitoredItem(_subscription!.DefaultItem)
|
||||
{
|
||||
DisplayName = nodeId,
|
||||
StartNodeId = nodeId,
|
||||
AttributeId = Attributes.Value,
|
||||
SamplingInterval = samplingIntervalMs,
|
||||
QueueSize = 10,
|
||||
DiscardOldest = true
|
||||
};
|
||||
|
||||
monitoredItem.Notification += (item, e) =>
|
||||
{
|
||||
if (e.NotificationValue is MonitoredItemNotification notification)
|
||||
{
|
||||
var val = notification.Value?.Value;
|
||||
var ts = notification.Value?.SourceTimestamp ?? DateTime.UtcNow;
|
||||
var sc = notification.Value?.StatusCode.Code ?? 0;
|
||||
onValueChanged(nodeId, val, ts, sc);
|
||||
}
|
||||
};
|
||||
|
||||
items.Add(monitoredItem);
|
||||
_subscription!.AddItem(monitoredItem);
|
||||
}
|
||||
|
||||
await _subscription!.ApplyChangesAsync(cancellationToken);
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
_handleItems[handle] = items;
|
||||
_handleCallbacks[handle] = onValueChanged;
|
||||
}
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
public async Task RemoveMonitoredItemsAsync(string handle, CancellationToken cancellationToken = default)
|
||||
{
|
||||
List<MonitoredItem>? items;
|
||||
lock (_lock)
|
||||
{
|
||||
if (!_handleItems.Remove(handle, out items))
|
||||
return;
|
||||
_handleCallbacks.Remove(handle);
|
||||
}
|
||||
|
||||
if (_subscription != null)
|
||||
{
|
||||
foreach (var item in items)
|
||||
_subscription.RemoveItem(item);
|
||||
try { await _subscription.ApplyChangesAsync(cancellationToken); }
|
||||
catch { /* best-effort during cleanup */ }
|
||||
}
|
||||
}
|
||||
|
||||
private void OnSessionKeepAlive(Opc.Ua.Client.ISession session, KeepAliveEventArgs e)
|
||||
{
|
||||
if (ServiceResult.IsBad(e.Status))
|
||||
{
|
||||
if (!_connected) return;
|
||||
_connected = false;
|
||||
_logger.LogWarning("OPC UA backend connection lost");
|
||||
Disconnected?.Invoke();
|
||||
StartReconnectLoop();
|
||||
}
|
||||
}
|
||||
|
||||
private void StartReconnectLoop()
|
||||
{
|
||||
if (_reconnecting) return;
|
||||
_reconnecting = true;
|
||||
_reconnectCts = new CancellationTokenSource();
|
||||
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
while (!_reconnectCts.Token.IsCancellationRequested)
|
||||
{
|
||||
await Task.Delay(5000, _reconnectCts.Token);
|
||||
try
|
||||
{
|
||||
_logger.LogInformation("Attempting OPC UA reconnection...");
|
||||
if (_session != null)
|
||||
{
|
||||
_session.KeepAlive -= OnSessionKeepAlive;
|
||||
try { await _session.CloseAsync(); } catch { }
|
||||
_session = null;
|
||||
_subscription = null;
|
||||
}
|
||||
|
||||
await ConnectAsync(_reconnectCts.Token);
|
||||
|
||||
// Re-add monitored items for active handles
|
||||
lock (_lock)
|
||||
{
|
||||
foreach (var (handle, callback) in _handleCallbacks)
|
||||
{
|
||||
if (_handleItems.TryGetValue(handle, out var oldItems))
|
||||
{
|
||||
var nodeIds = oldItems.Select(i => i.StartNodeId.ToString()).ToList();
|
||||
var newItems = new List<MonitoredItem>();
|
||||
foreach (var nodeId in nodeIds)
|
||||
{
|
||||
var monitoredItem = new MonitoredItem(_subscription!.DefaultItem)
|
||||
{
|
||||
DisplayName = nodeId,
|
||||
StartNodeId = nodeId,
|
||||
AttributeId = Attributes.Value,
|
||||
SamplingInterval = oldItems[0].SamplingInterval,
|
||||
QueueSize = 10,
|
||||
DiscardOldest = true
|
||||
};
|
||||
var capturedNodeId = nodeId;
|
||||
var capturedCallback = callback;
|
||||
monitoredItem.Notification += (item, ev) =>
|
||||
{
|
||||
if (ev.NotificationValue is MonitoredItemNotification notification)
|
||||
{
|
||||
var val = notification.Value?.Value;
|
||||
var ts = notification.Value?.SourceTimestamp ?? DateTime.UtcNow;
|
||||
var sc = notification.Value?.StatusCode.Code ?? 0;
|
||||
capturedCallback(capturedNodeId, val, ts, sc);
|
||||
}
|
||||
};
|
||||
newItems.Add(monitoredItem);
|
||||
_subscription!.AddItem(monitoredItem);
|
||||
}
|
||||
_handleItems[handle] = newItems;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (_subscription != null)
|
||||
await _subscription.ApplyChangesAsync();
|
||||
|
||||
_reconnecting = false;
|
||||
_logger.LogInformation("OPC UA reconnection successful");
|
||||
Reconnected?.Invoke();
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "OPC UA reconnection attempt failed, retrying in 5s");
|
||||
}
|
||||
}
|
||||
}, _reconnectCts.Token);
|
||||
}
|
||||
|
||||
private void EnsureConnected()
|
||||
{
|
||||
if (!_connected || _session == null)
|
||||
throw new InvalidOperationException("OPC UA backend unavailable");
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
_reconnectCts?.Cancel();
|
||||
_reconnectCts?.Dispose();
|
||||
if (_subscription != null)
|
||||
{
|
||||
try { await _subscription.DeleteAsync(true); } catch { }
|
||||
_subscription = null;
|
||||
}
|
||||
if (_session != null)
|
||||
{
|
||||
_session.KeepAlive -= OnSessionKeepAlive;
|
||||
try { await _session.CloseAsync(); } catch { }
|
||||
_session = null;
|
||||
}
|
||||
_connected = false;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user