LmxProxy is no longer needed. Moved the entire lmxproxy/ workspace, DCL adapter files, and related docs to deprecated/. Removed LmxProxy registration from DataConnectionFactory, project reference from DCL, protocol option from UI, and cleaned up all requirement docs.
574 lines
21 KiB
C#
574 lines
21 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Grpc.Net.Client;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Polly;
|
|
using ZB.MOM.WW.LmxProxy.Client.Domain;
|
|
using ZB.MOM.WW.LmxProxy.Client.Security;
|
|
|
|
namespace ZB.MOM.WW.LmxProxy.Client
|
|
{
|
|
/// <summary>
|
|
/// Client for communicating with the LmxProxy gRPC service using protobuf-net.Grpc code-first
|
|
/// </summary>
|
|
public partial class LmxProxyClient : ILmxProxyClient
|
|
{
|
|
private static readonly string Http2InsecureSwitch = "System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport";
|
|
private readonly ILogger<LmxProxyClient> _logger;
|
|
private readonly string _host;
|
|
private readonly int _port;
|
|
private readonly string? _apiKey;
|
|
private GrpcChannel? _channel;
|
|
private IScadaService? _client;
|
|
private string _sessionId = string.Empty;
|
|
private readonly SemaphoreSlim _connectionLock = new(1, 1);
|
|
private readonly List<ISubscription> _activeSubscriptions = [];
|
|
private readonly Lock _subscriptionLock = new();
|
|
private bool _disposed;
|
|
private bool _isConnected;
|
|
private TimeSpan _defaultTimeout = TimeSpan.FromSeconds(30);
|
|
private ClientConfiguration? _configuration;
|
|
private IAsyncPolicy? _retryPolicy;
|
|
private readonly ClientMetrics _metrics = new();
|
|
private Timer? _keepAliveTimer;
|
|
private readonly TimeSpan _keepAliveInterval = TimeSpan.FromSeconds(30);
|
|
private readonly ClientTlsConfiguration? _tlsConfiguration;
|
|
|
|
static LmxProxyClient()
|
|
{
|
|
AppContext.SetSwitch(Http2InsecureSwitch, true);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets or sets the default timeout for operations
|
|
/// </summary>
|
|
public TimeSpan DefaultTimeout
|
|
{
|
|
get => _defaultTimeout;
|
|
set
|
|
{
|
|
if (value <= TimeSpan.Zero)
|
|
throw new ArgumentOutOfRangeException(nameof(value), "Timeout must be positive");
|
|
if (value > TimeSpan.FromMinutes(10))
|
|
throw new ArgumentOutOfRangeException(nameof(value), "Timeout cannot exceed 10 minutes");
|
|
_defaultTimeout = value;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the LmxProxyClient
|
|
/// </summary>
|
|
/// <param name="host">The host address of the LmxProxy service</param>
|
|
/// <param name="port">The port of the LmxProxy service</param>
|
|
/// <param name="apiKey">The API key for authentication</param>
|
|
/// <param name="logger">Optional logger instance</param>
|
|
public LmxProxyClient(string host, int port, string? apiKey = null, ILogger<LmxProxyClient>? logger = null)
|
|
: this(host, port, apiKey, null, logger)
|
|
{
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a new instance of the LmxProxyClient with TLS configuration
|
|
/// </summary>
|
|
/// <param name="host">The host address of the LmxProxy service</param>
|
|
/// <param name="port">The port of the LmxProxy service</param>
|
|
/// <param name="apiKey">The API key for authentication</param>
|
|
/// <param name="tlsConfiguration">TLS configuration for secure connections</param>
|
|
/// <param name="logger">Optional logger instance</param>
|
|
public LmxProxyClient(string host, int port, string? apiKey, ClientTlsConfiguration? tlsConfiguration, ILogger<LmxProxyClient>? logger = null)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(host))
|
|
throw new ArgumentException("Host cannot be null or empty", nameof(host));
|
|
if (port < 1 || port > 65535)
|
|
throw new ArgumentOutOfRangeException(nameof(port), "Port must be between 1 and 65535");
|
|
|
|
_host = host;
|
|
_port = port;
|
|
_apiKey = apiKey;
|
|
_tlsConfiguration = tlsConfiguration;
|
|
_logger = logger ?? NullLogger<LmxProxyClient>.Instance;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets whether the client is connected to the service
|
|
/// </summary>
|
|
public bool IsConnected => !_disposed && _isConnected && !string.IsNullOrEmpty(_sessionId);
|
|
|
|
/// <summary>
|
|
/// Asynchronously checks if the client is connected with proper synchronization
|
|
/// </summary>
|
|
public async Task<bool> IsConnectedAsync()
|
|
{
|
|
await _connectionLock.WaitAsync();
|
|
try
|
|
{
|
|
return !_disposed && _client != null && _isConnected && !string.IsNullOrEmpty(_sessionId);
|
|
}
|
|
finally
|
|
{
|
|
_connectionLock.Release();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Sets the builder configuration (internal use)
|
|
/// </summary>
|
|
/// <param name="configuration">The client configuration.</param>
|
|
internal void SetBuilderConfiguration(ClientConfiguration configuration)
|
|
{
|
|
_configuration = configuration;
|
|
|
|
// Setup retry policy if configured
|
|
if (configuration.MaxRetryAttempts > 0)
|
|
{
|
|
_retryPolicy = Policy
|
|
.Handle<Exception>(IsTransientError)
|
|
.WaitAndRetryAsync(
|
|
configuration.MaxRetryAttempts,
|
|
retryAttempt => configuration.RetryDelay * Math.Pow(2, retryAttempt - 1),
|
|
onRetry: (exception, timeSpan, retryCount, context) =>
|
|
{
|
|
object? correlationId = context.GetValueOrDefault("CorrelationId", "N/A");
|
|
_logger.LogWarning(exception,
|
|
"Retry {RetryCount} after {Delay}ms. CorrelationId: {CorrelationId}",
|
|
retryCount, timeSpan.TotalMilliseconds, correlationId);
|
|
});
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Reads a single tag value
|
|
/// </summary>
|
|
/// <param name="address">The tag address to read.</param>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
public async Task<Vtq> ReadAsync(string address, CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrEmpty(address))
|
|
throw new ArgumentNullException(nameof(address));
|
|
|
|
EnsureConnected();
|
|
|
|
string correlationId = GenerateCorrelationId();
|
|
var stopwatch = Stopwatch.StartNew();
|
|
|
|
try
|
|
{
|
|
_metrics.IncrementOperationCount("Read");
|
|
|
|
var request = new ReadRequest
|
|
{
|
|
SessionId = _sessionId,
|
|
Tag = address
|
|
};
|
|
|
|
ReadResponse response = await ExecuteWithRetryAsync(async () =>
|
|
await _client!.ReadAsync(request),
|
|
correlationId);
|
|
|
|
if (!response.Success)
|
|
{
|
|
_metrics.IncrementErrorCount("Read");
|
|
throw new InvalidOperationException($"Read failed for tag '{address}': {response.Message}. CorrelationId: {correlationId}");
|
|
}
|
|
|
|
_metrics.RecordLatency("Read", stopwatch.ElapsedMilliseconds);
|
|
return ConvertToVtq(address, response.Vtq);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_metrics.IncrementErrorCount("Read");
|
|
_logger.LogError(ex, "Read operation failed for tag: {Tag}, CorrelationId: {CorrelationId}",
|
|
address, correlationId);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Reads multiple tag values
|
|
/// </summary>
|
|
/// <param name="addresses">The tag addresses to read.</param>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
public async Task<IDictionary<string, Vtq>> ReadBatchAsync(IEnumerable<string> addresses, CancellationToken cancellationToken = default)
|
|
{
|
|
ArgumentNullException.ThrowIfNull(addresses);
|
|
|
|
var addressList = addresses.ToList();
|
|
if (!addressList.Any())
|
|
throw new ArgumentException("At least one address must be provided", nameof(addresses));
|
|
|
|
EnsureConnected();
|
|
|
|
var request = new ReadBatchRequest
|
|
{
|
|
SessionId = _sessionId,
|
|
Tags = addressList
|
|
};
|
|
|
|
ReadBatchResponse response = await _client!.ReadBatchAsync(request);
|
|
|
|
if (!response.Success)
|
|
throw new InvalidOperationException($"ReadBatch failed: {response.Message}");
|
|
|
|
var results = new Dictionary<string, Vtq>();
|
|
foreach (VtqMessage vtq in response.Vtqs)
|
|
{
|
|
results[vtq.Tag] = ConvertToVtq(vtq.Tag, vtq);
|
|
}
|
|
return results;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Writes a single tag value
|
|
/// </summary>
|
|
/// <param name="address">The tag address to write.</param>
|
|
/// <param name="value">The value to write.</param>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
public async Task WriteAsync(string address, object value, CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrEmpty(address))
|
|
throw new ArgumentNullException(nameof(address));
|
|
ArgumentNullException.ThrowIfNull(value);
|
|
|
|
EnsureConnected();
|
|
|
|
var request = new WriteRequest
|
|
{
|
|
SessionId = _sessionId,
|
|
Tag = address,
|
|
Value = ConvertToString(value)
|
|
};
|
|
|
|
WriteResponse response = await _client!.WriteAsync(request);
|
|
|
|
if (!response.Success)
|
|
throw new InvalidOperationException($"Write failed: {response.Message}");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Writes multiple tag values
|
|
/// </summary>
|
|
/// <param name="values">The tag addresses and values to write.</param>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
public async Task WriteBatchAsync(IDictionary<string, object> values, CancellationToken cancellationToken = default)
|
|
{
|
|
if (values == null || !values.Any())
|
|
throw new ArgumentException("At least one value must be provided", nameof(values));
|
|
|
|
EnsureConnected();
|
|
|
|
var request = new WriteBatchRequest
|
|
{
|
|
SessionId = _sessionId,
|
|
Items = values.Select(kvp => new WriteItem
|
|
{
|
|
Tag = kvp.Key,
|
|
Value = ConvertToString(kvp.Value)
|
|
}).ToList()
|
|
};
|
|
|
|
WriteBatchResponse response = await _client!.WriteBatchAsync(request);
|
|
|
|
if (!response.Success)
|
|
throw new InvalidOperationException($"WriteBatch failed: {response.Message}");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Writes values and waits for a condition to be met
|
|
/// </summary>
|
|
/// <param name="values">The tag addresses and values to write.</param>
|
|
/// <param name="flagAddress">The flag address to write.</param>
|
|
/// <param name="flagValue">The flag value to write.</param>
|
|
/// <param name="responseAddress">The response address to monitor.</param>
|
|
/// <param name="responseValue">The expected response value.</param>
|
|
/// <param name="timeoutSeconds">Timeout in seconds.</param>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
public async Task<bool> WriteBatchAndWaitAsync(
|
|
IDictionary<string, object> values,
|
|
string flagAddress,
|
|
object flagValue,
|
|
string responseAddress,
|
|
object responseValue,
|
|
int timeoutSeconds = 30,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (values == null || !values.Any())
|
|
throw new ArgumentException("At least one value must be provided", nameof(values));
|
|
|
|
EnsureConnected();
|
|
|
|
var request = new WriteBatchAndWaitRequest
|
|
{
|
|
SessionId = _sessionId,
|
|
Items = values.Select(kvp => new WriteItem
|
|
{
|
|
Tag = kvp.Key,
|
|
Value = ConvertToString(kvp.Value)
|
|
}).ToList(),
|
|
FlagTag = flagAddress,
|
|
FlagValue = ConvertToString(flagValue),
|
|
TimeoutMs = timeoutSeconds * 1000,
|
|
PollIntervalMs = 100
|
|
};
|
|
|
|
WriteBatchAndWaitResponse response = await _client!.WriteBatchAndWaitAsync(request);
|
|
|
|
if (!response.Success)
|
|
throw new InvalidOperationException($"WriteBatchAndWait failed: {response.Message}");
|
|
|
|
return response.FlagReached;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Checks the validity and permissions of the current API key
|
|
/// </summary>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
public async Task<ApiKeyInfo> CheckApiKeyAsync(CancellationToken cancellationToken = default)
|
|
{
|
|
EnsureConnected();
|
|
|
|
var request = new CheckApiKeyRequest { ApiKey = _apiKey ?? string.Empty };
|
|
CheckApiKeyResponse response = await _client!.CheckApiKeyAsync(request);
|
|
|
|
return new ApiKeyInfo(
|
|
response.IsValid,
|
|
"ReadWrite", // Code-first contract doesn't return role
|
|
response.Message);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Subscribes to tag value changes
|
|
/// </summary>
|
|
/// <param name="addresses">The tag addresses to subscribe to.</param>
|
|
/// <param name="onUpdate">Callback invoked when tag values change.</param>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
public Task<ISubscription> SubscribeAsync(
|
|
IEnumerable<string> addresses,
|
|
Action<string, Vtq> onUpdate,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
List<string> addressList = addresses?.ToList() ?? throw new ArgumentNullException(nameof(addresses));
|
|
if (!addressList.Any())
|
|
throw new ArgumentException("At least one address must be provided", nameof(addresses));
|
|
ArgumentNullException.ThrowIfNull(onUpdate);
|
|
|
|
EnsureConnected();
|
|
|
|
var subscription = new CodeFirstSubscription(_client!, _sessionId, addressList, onUpdate, _logger, RemoveSubscription);
|
|
|
|
// Track the subscription
|
|
lock (_subscriptionLock)
|
|
{
|
|
_activeSubscriptions.Add(subscription);
|
|
}
|
|
|
|
// Start processing updates
|
|
Task startTask = subscription.StartAsync(cancellationToken);
|
|
|
|
// Log any startup errors but don't throw
|
|
startTask.ContinueWith(t =>
|
|
{
|
|
if (t.IsFaulted)
|
|
{
|
|
_logger.LogError(t.Exception, "Subscription startup failed");
|
|
}
|
|
}, TaskContinuationOptions.OnlyOnFaulted);
|
|
|
|
return Task.FromResult<ISubscription>(subscription);
|
|
}
|
|
|
|
private void EnsureConnected()
|
|
{
|
|
if (_disposed)
|
|
throw new ObjectDisposedException(nameof(LmxProxyClient));
|
|
if (_client == null || !_isConnected || string.IsNullOrEmpty(_sessionId))
|
|
throw new InvalidOperationException("Client is not connected. Call ConnectAsync first.");
|
|
}
|
|
|
|
private static Vtq ConvertToVtq(string tag, VtqMessage? vtqMessage)
|
|
{
|
|
if (vtqMessage == null)
|
|
return new Vtq(null, DateTime.UtcNow, Quality.Bad);
|
|
|
|
// Parse the string value
|
|
object? value = vtqMessage.Value;
|
|
if (!string.IsNullOrEmpty(vtqMessage.Value))
|
|
{
|
|
// Try to parse as numeric types
|
|
if (double.TryParse(vtqMessage.Value, out double doubleVal))
|
|
value = doubleVal;
|
|
else if (bool.TryParse(vtqMessage.Value, out bool boolVal))
|
|
value = boolVal;
|
|
else
|
|
value = vtqMessage.Value;
|
|
}
|
|
|
|
var timestamp = new DateTime(vtqMessage.TimestampUtcTicks, DateTimeKind.Utc);
|
|
Quality quality = vtqMessage.Quality?.ToUpperInvariant() switch
|
|
{
|
|
"GOOD" => Quality.Good,
|
|
"UNCERTAIN" => Quality.Uncertain,
|
|
_ => Quality.Bad
|
|
};
|
|
|
|
return new Vtq(value, timestamp, quality);
|
|
}
|
|
|
|
private static string ConvertToString(object value)
|
|
{
|
|
if (value == null)
|
|
return string.Empty;
|
|
|
|
return value switch
|
|
{
|
|
DateTime dt => dt.ToUniversalTime().ToString("O"),
|
|
DateTimeOffset dto => dto.ToString("O"),
|
|
bool b => b.ToString().ToLowerInvariant(),
|
|
_ => value.ToString() ?? string.Empty
|
|
};
|
|
}
|
|
|
|
/// <summary>
|
|
/// Removes a subscription from the active tracking list
|
|
/// </summary>
|
|
private void RemoveSubscription(ISubscription subscription)
|
|
{
|
|
lock (_subscriptionLock)
|
|
{
|
|
_activeSubscriptions.Remove(subscription);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Disposes of the client and closes the connection
|
|
/// </summary>
|
|
public void Dispose()
|
|
{
|
|
if (_disposed)
|
|
{
|
|
return;
|
|
}
|
|
|
|
DisposeAsync().AsTask().GetAwaiter().GetResult();
|
|
GC.SuppressFinalize(this);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Asynchronously disposes of the client and closes the connection
|
|
/// </summary>
|
|
public async ValueTask DisposeAsync()
|
|
{
|
|
if (_disposed)
|
|
return;
|
|
|
|
_disposed = true;
|
|
|
|
await DisposeCoreAsync().ConfigureAwait(false);
|
|
_connectionLock.Dispose();
|
|
GC.SuppressFinalize(this);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Protected disposal implementation
|
|
/// </summary>
|
|
/// <param name="disposing">True if disposing managed resources.</param>
|
|
protected virtual void Dispose(bool disposing)
|
|
{
|
|
if (!disposing || _disposed)
|
|
return;
|
|
|
|
_disposed = true;
|
|
|
|
DisposeCoreAsync().GetAwaiter().GetResult();
|
|
_connectionLock.Dispose();
|
|
}
|
|
|
|
private async Task DisposeCoreAsync()
|
|
{
|
|
StopKeepAlive();
|
|
|
|
List<ISubscription> subscriptionsToDispose;
|
|
lock (_subscriptionLock)
|
|
{
|
|
subscriptionsToDispose = new List<ISubscription>(_activeSubscriptions);
|
|
_activeSubscriptions.Clear();
|
|
}
|
|
|
|
foreach (ISubscription subscription in subscriptionsToDispose)
|
|
{
|
|
try
|
|
{
|
|
await subscription.DisposeAsync().ConfigureAwait(false);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error disposing subscription");
|
|
}
|
|
}
|
|
|
|
// Disconnect session
|
|
if (_client != null && !string.IsNullOrEmpty(_sessionId))
|
|
{
|
|
try
|
|
{
|
|
var request = new DisconnectRequest { SessionId = _sessionId };
|
|
await _client.DisconnectAsync(request);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogDebug(ex, "Error during disconnect on dispose");
|
|
}
|
|
}
|
|
|
|
await _connectionLock.WaitAsync().ConfigureAwait(false);
|
|
try
|
|
{
|
|
_client = null;
|
|
_sessionId = string.Empty;
|
|
_isConnected = false;
|
|
|
|
_channel?.Dispose();
|
|
_channel = null;
|
|
}
|
|
finally
|
|
{
|
|
_connectionLock.Release();
|
|
}
|
|
}
|
|
|
|
private string GenerateCorrelationId()
|
|
{
|
|
return Guid.NewGuid().ToString("N");
|
|
}
|
|
|
|
private bool IsTransientError(Exception ex)
|
|
{
|
|
// Check for transient gRPC errors
|
|
return ex.Message.Contains("Unavailable") ||
|
|
ex.Message.Contains("DeadlineExceeded") ||
|
|
ex.Message.Contains("ResourceExhausted") ||
|
|
ex.Message.Contains("Aborted");
|
|
}
|
|
|
|
private async Task<T> ExecuteWithRetryAsync<T>(Func<Task<T>> operation, string correlationId)
|
|
{
|
|
if (_retryPolicy != null)
|
|
{
|
|
var context = new Context { ["CorrelationId"] = correlationId };
|
|
return await _retryPolicy.ExecuteAsync(async (_) => await operation(), context);
|
|
}
|
|
|
|
return await operation();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets the current metrics snapshot
|
|
/// </summary>
|
|
public Dictionary<string, object> GetMetrics() => _metrics.GetSnapshot();
|
|
}
|
|
}
|