using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Polly;
using ZB.MOM.WW.LmxProxy.Client.Domain;
using ZB.MOM.WW.LmxProxy.Client.Security;
namespace ZB.MOM.WW.LmxProxy.Client
{
///
/// Client for communicating with the LmxProxy gRPC service using protobuf-net.Grpc code-first
///
public partial class LmxProxyClient : ILmxProxyClient
{
private static readonly string Http2InsecureSwitch = "System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport";
private readonly ILogger _logger;
private readonly string _host;
private readonly int _port;
private readonly string? _apiKey;
private GrpcChannel? _channel;
private IScadaService? _client;
private string _sessionId = string.Empty;
private readonly SemaphoreSlim _connectionLock = new(1, 1);
private readonly List _activeSubscriptions = [];
private readonly Lock _subscriptionLock = new();
private bool _disposed;
private bool _isConnected;
private TimeSpan _defaultTimeout = TimeSpan.FromSeconds(30);
private ClientConfiguration? _configuration;
private IAsyncPolicy? _retryPolicy;
private readonly ClientMetrics _metrics = new();
private Timer? _keepAliveTimer;
private readonly TimeSpan _keepAliveInterval = TimeSpan.FromSeconds(30);
private readonly ClientTlsConfiguration? _tlsConfiguration;
static LmxProxyClient()
{
AppContext.SetSwitch(Http2InsecureSwitch, true);
}
///
/// Gets or sets the default timeout for operations
///
public TimeSpan DefaultTimeout
{
get => _defaultTimeout;
set
{
if (value <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(value), "Timeout must be positive");
if (value > TimeSpan.FromMinutes(10))
throw new ArgumentOutOfRangeException(nameof(value), "Timeout cannot exceed 10 minutes");
_defaultTimeout = value;
}
}
///
/// Initializes a new instance of the LmxProxyClient
///
/// The host address of the LmxProxy service
/// The port of the LmxProxy service
/// The API key for authentication
/// Optional logger instance
public LmxProxyClient(string host, int port, string? apiKey = null, ILogger? logger = null)
: this(host, port, apiKey, null, logger)
{
}
///
/// Creates a new instance of the LmxProxyClient with TLS configuration
///
/// The host address of the LmxProxy service
/// The port of the LmxProxy service
/// The API key for authentication
/// TLS configuration for secure connections
/// Optional logger instance
public LmxProxyClient(string host, int port, string? apiKey, ClientTlsConfiguration? tlsConfiguration, ILogger? logger = null)
{
if (string.IsNullOrWhiteSpace(host))
throw new ArgumentException("Host cannot be null or empty", nameof(host));
if (port < 1 || port > 65535)
throw new ArgumentOutOfRangeException(nameof(port), "Port must be between 1 and 65535");
_host = host;
_port = port;
_apiKey = apiKey;
_tlsConfiguration = tlsConfiguration;
_logger = logger ?? NullLogger.Instance;
}
///
/// Gets whether the client is connected to the service
///
public bool IsConnected => !_disposed && _isConnected && !string.IsNullOrEmpty(_sessionId);
///
/// Asynchronously checks if the client is connected with proper synchronization
///
public async Task IsConnectedAsync()
{
await _connectionLock.WaitAsync();
try
{
return !_disposed && _client != null && _isConnected && !string.IsNullOrEmpty(_sessionId);
}
finally
{
_connectionLock.Release();
}
}
///
/// Sets the builder configuration (internal use)
///
/// The client configuration.
internal void SetBuilderConfiguration(ClientConfiguration configuration)
{
_configuration = configuration;
// Setup retry policy if configured
if (configuration.MaxRetryAttempts > 0)
{
_retryPolicy = Policy
.Handle(IsTransientError)
.WaitAndRetryAsync(
configuration.MaxRetryAttempts,
retryAttempt => configuration.RetryDelay * Math.Pow(2, retryAttempt - 1),
onRetry: (exception, timeSpan, retryCount, context) =>
{
object? correlationId = context.GetValueOrDefault("CorrelationId", "N/A");
_logger.LogWarning(exception,
"Retry {RetryCount} after {Delay}ms. CorrelationId: {CorrelationId}",
retryCount, timeSpan.TotalMilliseconds, correlationId);
});
}
}
///
/// Reads a single tag value
///
/// The tag address to read.
/// Cancellation token.
public async Task ReadAsync(string address, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(address))
throw new ArgumentNullException(nameof(address));
EnsureConnected();
string correlationId = GenerateCorrelationId();
var stopwatch = Stopwatch.StartNew();
try
{
_metrics.IncrementOperationCount("Read");
var request = new ReadRequest
{
SessionId = _sessionId,
Tag = address
};
ReadResponse response = await ExecuteWithRetryAsync(async () =>
await _client!.ReadAsync(request),
correlationId);
if (!response.Success)
{
_metrics.IncrementErrorCount("Read");
throw new InvalidOperationException($"Read failed for tag '{address}': {response.Message}. CorrelationId: {correlationId}");
}
_metrics.RecordLatency("Read", stopwatch.ElapsedMilliseconds);
return ConvertToVtq(address, response.Vtq);
}
catch (Exception ex)
{
_metrics.IncrementErrorCount("Read");
_logger.LogError(ex, "Read operation failed for tag: {Tag}, CorrelationId: {CorrelationId}",
address, correlationId);
throw;
}
}
///
/// Reads multiple tag values
///
/// The tag addresses to read.
/// Cancellation token.
public async Task> ReadBatchAsync(IEnumerable addresses, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(addresses);
var addressList = addresses.ToList();
if (!addressList.Any())
throw new ArgumentException("At least one address must be provided", nameof(addresses));
EnsureConnected();
var request = new ReadBatchRequest
{
SessionId = _sessionId,
Tags = addressList
};
ReadBatchResponse response = await _client!.ReadBatchAsync(request);
if (!response.Success)
throw new InvalidOperationException($"ReadBatch failed: {response.Message}");
var results = new Dictionary();
foreach (VtqMessage vtq in response.Vtqs)
{
results[vtq.Tag] = ConvertToVtq(vtq.Tag, vtq);
}
return results;
}
///
/// Writes a single tag value
///
/// The tag address to write.
/// The value to write.
/// Cancellation token.
public async Task WriteAsync(string address, object value, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(address))
throw new ArgumentNullException(nameof(address));
ArgumentNullException.ThrowIfNull(value);
EnsureConnected();
var request = new WriteRequest
{
SessionId = _sessionId,
Tag = address,
Value = ConvertToString(value)
};
WriteResponse response = await _client!.WriteAsync(request);
if (!response.Success)
throw new InvalidOperationException($"Write failed: {response.Message}");
}
///
/// Writes multiple tag values
///
/// The tag addresses and values to write.
/// Cancellation token.
public async Task WriteBatchAsync(IDictionary values, CancellationToken cancellationToken = default)
{
if (values == null || !values.Any())
throw new ArgumentException("At least one value must be provided", nameof(values));
EnsureConnected();
var request = new WriteBatchRequest
{
SessionId = _sessionId,
Items = values.Select(kvp => new WriteItem
{
Tag = kvp.Key,
Value = ConvertToString(kvp.Value)
}).ToList()
};
WriteBatchResponse response = await _client!.WriteBatchAsync(request);
if (!response.Success)
throw new InvalidOperationException($"WriteBatch failed: {response.Message}");
}
///
/// Writes values and waits for a condition to be met
///
/// The tag addresses and values to write.
/// The flag address to write.
/// The flag value to write.
/// The response address to monitor.
/// The expected response value.
/// Timeout in seconds.
/// Cancellation token.
public async Task WriteBatchAndWaitAsync(
IDictionary values,
string flagAddress,
object flagValue,
string responseAddress,
object responseValue,
int timeoutSeconds = 30,
CancellationToken cancellationToken = default)
{
if (values == null || !values.Any())
throw new ArgumentException("At least one value must be provided", nameof(values));
EnsureConnected();
var request = new WriteBatchAndWaitRequest
{
SessionId = _sessionId,
Items = values.Select(kvp => new WriteItem
{
Tag = kvp.Key,
Value = ConvertToString(kvp.Value)
}).ToList(),
FlagTag = flagAddress,
FlagValue = ConvertToString(flagValue),
TimeoutMs = timeoutSeconds * 1000,
PollIntervalMs = 100
};
WriteBatchAndWaitResponse response = await _client!.WriteBatchAndWaitAsync(request);
if (!response.Success)
throw new InvalidOperationException($"WriteBatchAndWait failed: {response.Message}");
return response.FlagReached;
}
///
/// Checks the validity and permissions of the current API key
///
/// Cancellation token.
public async Task CheckApiKeyAsync(CancellationToken cancellationToken = default)
{
EnsureConnected();
var request = new CheckApiKeyRequest { ApiKey = _apiKey ?? string.Empty };
CheckApiKeyResponse response = await _client!.CheckApiKeyAsync(request);
return new ApiKeyInfo(
response.IsValid,
"ReadWrite", // Code-first contract doesn't return role
response.Message);
}
///
/// Subscribes to tag value changes
///
/// The tag addresses to subscribe to.
/// Callback invoked when tag values change.
/// Cancellation token.
public Task SubscribeAsync(
IEnumerable addresses,
Action onUpdate,
CancellationToken cancellationToken = default)
{
List addressList = addresses?.ToList() ?? throw new ArgumentNullException(nameof(addresses));
if (!addressList.Any())
throw new ArgumentException("At least one address must be provided", nameof(addresses));
ArgumentNullException.ThrowIfNull(onUpdate);
EnsureConnected();
var subscription = new CodeFirstSubscription(_client!, _sessionId, addressList, onUpdate, _logger, RemoveSubscription);
// Track the subscription
lock (_subscriptionLock)
{
_activeSubscriptions.Add(subscription);
}
// Start processing updates
Task startTask = subscription.StartAsync(cancellationToken);
// Log any startup errors but don't throw
startTask.ContinueWith(t =>
{
if (t.IsFaulted)
{
_logger.LogError(t.Exception, "Subscription startup failed");
}
}, TaskContinuationOptions.OnlyOnFaulted);
return Task.FromResult(subscription);
}
private void EnsureConnected()
{
if (_disposed)
throw new ObjectDisposedException(nameof(LmxProxyClient));
if (_client == null || !_isConnected || string.IsNullOrEmpty(_sessionId))
throw new InvalidOperationException("Client is not connected. Call ConnectAsync first.");
}
private static Vtq ConvertToVtq(string tag, VtqMessage? vtqMessage)
{
if (vtqMessage == null)
return new Vtq(null, DateTime.UtcNow, Quality.Bad);
// Parse the string value
object? value = vtqMessage.Value;
if (!string.IsNullOrEmpty(vtqMessage.Value))
{
// Try to parse as numeric types
if (double.TryParse(vtqMessage.Value, out double doubleVal))
value = doubleVal;
else if (bool.TryParse(vtqMessage.Value, out bool boolVal))
value = boolVal;
else
value = vtqMessage.Value;
}
var timestamp = new DateTime(vtqMessage.TimestampUtcTicks, DateTimeKind.Utc);
Quality quality = vtqMessage.Quality?.ToUpperInvariant() switch
{
"GOOD" => Quality.Good,
"UNCERTAIN" => Quality.Uncertain,
_ => Quality.Bad
};
return new Vtq(value, timestamp, quality);
}
private static string ConvertToString(object value)
{
if (value == null)
return string.Empty;
return value switch
{
DateTime dt => dt.ToUniversalTime().ToString("O"),
DateTimeOffset dto => dto.ToString("O"),
bool b => b.ToString().ToLowerInvariant(),
_ => value.ToString() ?? string.Empty
};
}
///
/// Removes a subscription from the active tracking list
///
private void RemoveSubscription(ISubscription subscription)
{
lock (_subscriptionLock)
{
_activeSubscriptions.Remove(subscription);
}
}
///
/// Disposes of the client and closes the connection
///
public void Dispose()
{
if (_disposed)
{
return;
}
DisposeAsync().AsTask().GetAwaiter().GetResult();
GC.SuppressFinalize(this);
}
///
/// Asynchronously disposes of the client and closes the connection
///
public async ValueTask DisposeAsync()
{
if (_disposed)
return;
_disposed = true;
await DisposeCoreAsync().ConfigureAwait(false);
_connectionLock.Dispose();
GC.SuppressFinalize(this);
}
///
/// Protected disposal implementation
///
/// True if disposing managed resources.
protected virtual void Dispose(bool disposing)
{
if (!disposing || _disposed)
return;
_disposed = true;
DisposeCoreAsync().GetAwaiter().GetResult();
_connectionLock.Dispose();
}
private async Task DisposeCoreAsync()
{
StopKeepAlive();
List subscriptionsToDispose;
lock (_subscriptionLock)
{
subscriptionsToDispose = new List(_activeSubscriptions);
_activeSubscriptions.Clear();
}
foreach (ISubscription subscription in subscriptionsToDispose)
{
try
{
await subscription.DisposeAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error disposing subscription");
}
}
// Disconnect session
if (_client != null && !string.IsNullOrEmpty(_sessionId))
{
try
{
var request = new DisconnectRequest { SessionId = _sessionId };
await _client.DisconnectAsync(request);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Error during disconnect on dispose");
}
}
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
_client = null;
_sessionId = string.Empty;
_isConnected = false;
_channel?.Dispose();
_channel = null;
}
finally
{
_connectionLock.Release();
}
}
private string GenerateCorrelationId()
{
return Guid.NewGuid().ToString("N");
}
private bool IsTransientError(Exception ex)
{
// Check for transient gRPC errors
return ex.Message.Contains("Unavailable") ||
ex.Message.Contains("DeadlineExceeded") ||
ex.Message.Contains("ResourceExhausted") ||
ex.Message.Contains("Aborted");
}
private async Task ExecuteWithRetryAsync(Func> operation, string correlationId)
{
if (_retryPolicy != null)
{
var context = new Context { ["CorrelationId"] = correlationId };
return await _retryPolicy.ExecuteAsync(async (_) => await operation(), context);
}
return await operation();
}
///
/// Gets the current metrics snapshot
///
public Dictionary GetMetrics() => _metrics.GetSnapshot();
}
}