Files
ScadaBridge/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs
T
Joseph Doherty e837eae2cc feat: wire real LmxProxy gRPC client into Data Connection Layer
Replace stub ILmxProxyClient with production proto-generated gRPC client
(RealLmxProxyClient) that connects to LmxProxy servers with x-api-key
metadata header authentication. Includes pre-generated proto stubs for
ARM64 Docker compatibility, updated adapter with proper quality mapping
(Good/Uncertain/Bad), subscription via server-streaming RPC, and 20 unit
tests covering all operations. Updated Component-DataConnectionLayer.md
to reflect the actual implementation.
2026-03-18 11:57:18 -04:00

210 lines
7.5 KiB
C#

using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.DataConnectionLayer.Adapters;
/// <summary>
/// LmxProxy adapter implementing IDataConnection.
/// Maps IDataConnection operations to the LmxProxy SDK client.
///
/// LmxProxy-specific behavior:
/// - Session-based connection with automatic 30s keep-alive (managed by SDK)
/// - gRPC streaming for subscriptions via ILmxSubscription handles
/// - API key authentication via x-api-key gRPC metadata header
/// </summary>
public class LmxProxyDataConnection : IDataConnection
{
private readonly ILmxProxyClientFactory _clientFactory;
private readonly ILogger<LmxProxyDataConnection> _logger;
private ILmxProxyClient? _client;
private string _host = "localhost";
private int _port = 50051;
private ConnectionHealth _status = ConnectionHealth.Disconnected;
private readonly Dictionary<string, ILmxSubscription> _subscriptions = new();
public LmxProxyDataConnection(ILmxProxyClientFactory clientFactory, ILogger<LmxProxyDataConnection> logger)
{
_clientFactory = clientFactory;
_logger = logger;
}
public ConnectionHealth Status => _status;
public async Task ConnectAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken = default)
{
_host = connectionDetails.TryGetValue("Host", out var host) ? host : "localhost";
if (connectionDetails.TryGetValue("Port", out var portStr) && int.TryParse(portStr, out var port))
_port = port;
connectionDetails.TryGetValue("ApiKey", out var apiKey);
_status = ConnectionHealth.Connecting;
_client = _clientFactory.Create(_host, _port, apiKey);
await _client.ConnectAsync(cancellationToken);
_status = ConnectionHealth.Connected;
_logger.LogInformation("LmxProxy connected to {Host}:{Port}", _host, _port);
}
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
if (_client != null)
{
await _client.DisconnectAsync();
_status = ConnectionHealth.Disconnected;
_logger.LogInformation("LmxProxy disconnected from {Host}:{Port}", _host, _port);
}
}
public async Task<ReadResult> ReadAsync(string tagPath, CancellationToken cancellationToken = default)
{
EnsureConnected();
var vtq = await _client!.ReadAsync(tagPath, cancellationToken);
var quality = MapQuality(vtq.Quality);
var tagValue = new TagValue(vtq.Value, quality, new DateTimeOffset(vtq.TimestampUtc, TimeSpan.Zero));
return vtq.Quality == LmxQuality.Bad
? new ReadResult(false, tagValue, "LmxProxy read returned bad quality")
: new ReadResult(true, tagValue, null);
}
public async Task<IReadOnlyDictionary<string, ReadResult>> ReadBatchAsync(IEnumerable<string> tagPaths, CancellationToken cancellationToken = default)
{
EnsureConnected();
var vtqs = await _client!.ReadBatchAsync(tagPaths, cancellationToken);
var results = new Dictionary<string, ReadResult>();
foreach (var (tag, vtq) in vtqs)
{
var quality = MapQuality(vtq.Quality);
var tagValue = new TagValue(vtq.Value, quality, new DateTimeOffset(vtq.TimestampUtc, TimeSpan.Zero));
results[tag] = vtq.Quality == LmxQuality.Bad
? new ReadResult(false, tagValue, "LmxProxy read returned bad quality")
: new ReadResult(true, tagValue, null);
}
return results;
}
public async Task<WriteResult> WriteAsync(string tagPath, object? value, CancellationToken cancellationToken = default)
{
EnsureConnected();
try
{
await _client!.WriteAsync(tagPath, value!, cancellationToken);
return new WriteResult(true, null);
}
catch (Exception ex)
{
return new WriteResult(false, ex.Message);
}
}
public async Task<IReadOnlyDictionary<string, WriteResult>> WriteBatchAsync(IDictionary<string, object?> values, CancellationToken cancellationToken = default)
{
EnsureConnected();
try
{
var nonNullValues = values.Where(kv => kv.Value != null)
.ToDictionary(kv => kv.Key, kv => kv.Value!);
await _client!.WriteBatchAsync(nonNullValues, cancellationToken);
return values.Keys.ToDictionary(k => k, _ => new WriteResult(true, null))
as IReadOnlyDictionary<string, WriteResult>;
}
catch (Exception ex)
{
return values.Keys.ToDictionary(k => k, _ => new WriteResult(false, ex.Message))
as IReadOnlyDictionary<string, WriteResult>;
}
}
public async Task<bool> WriteBatchAndWaitAsync(
IDictionary<string, object?> values, string flagPath, object? flagValue,
string responsePath, object? responseValue, TimeSpan timeout,
CancellationToken cancellationToken = default)
{
var allValues = new Dictionary<string, object?>(values) { [flagPath] = flagValue };
var writeResults = await WriteBatchAsync(allValues, cancellationToken);
if (writeResults.Values.Any(r => !r.Success))
return false;
var deadline = DateTimeOffset.UtcNow + timeout;
while (DateTimeOffset.UtcNow < deadline)
{
cancellationToken.ThrowIfCancellationRequested();
var readResult = await ReadAsync(responsePath, cancellationToken);
if (readResult.Success && readResult.Value != null && Equals(readResult.Value.Value, responseValue))
return true;
await Task.Delay(100, cancellationToken);
}
return false;
}
public async Task<string> SubscribeAsync(string tagPath, SubscriptionCallback callback, CancellationToken cancellationToken = default)
{
EnsureConnected();
var subscription = await _client!.SubscribeAsync(
[tagPath],
(path, vtq) =>
{
var quality = MapQuality(vtq.Quality);
callback(path, new TagValue(vtq.Value, quality, new DateTimeOffset(vtq.TimestampUtc, TimeSpan.Zero)));
},
cancellationToken);
var subscriptionId = Guid.NewGuid().ToString("N");
_subscriptions[subscriptionId] = subscription;
return subscriptionId;
}
public async Task UnsubscribeAsync(string subscriptionId, CancellationToken cancellationToken = default)
{
if (_subscriptions.Remove(subscriptionId, out var subscription))
{
await subscription.DisposeAsync();
}
}
public async ValueTask DisposeAsync()
{
foreach (var subscription in _subscriptions.Values)
{
try { await subscription.DisposeAsync(); }
catch { /* best-effort cleanup */ }
}
_subscriptions.Clear();
if (_client != null)
{
await _client.DisposeAsync();
_client = null;
}
_status = ConnectionHealth.Disconnected;
}
private void EnsureConnected()
{
if (_client == null || !_client.IsConnected)
throw new InvalidOperationException("LmxProxy client is not connected.");
}
private static QualityCode MapQuality(LmxQuality quality) => quality switch
{
LmxQuality.Good => QualityCode.Good,
LmxQuality.Uncertain => QualityCode.Uncertain,
LmxQuality.Bad => QualityCode.Bad,
_ => QualityCode.Bad
};
}