diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientConfiguration.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientConfiguration.cs new file mode 100644 index 0000000..d8f67b5 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientConfiguration.cs @@ -0,0 +1,13 @@ +namespace ZB.MOM.WW.LmxProxy.Client; + +/// +/// Configuration options for the LmxProxy client, typically set via the builder. +/// +public class ClientConfiguration +{ + /// Maximum number of retry attempts for transient failures. + public int MaxRetryAttempts { get; set; } = 0; + + /// Base delay between retries (exponential backoff applied). + public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(1); +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientTlsConfiguration.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientTlsConfiguration.cs new file mode 100644 index 0000000..e7fe617 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ClientTlsConfiguration.cs @@ -0,0 +1,31 @@ +namespace ZB.MOM.WW.LmxProxy.Client; + +/// +/// TLS configuration for the LmxProxy gRPC client. +/// +public class ClientTlsConfiguration +{ + /// Whether to use TLS for the gRPC connection. + public bool UseTls { get; set; } = false; + + /// Path to the client certificate PEM file for mTLS. + public string? ClientCertificatePath { get; set; } + + /// Path to the client private key PEM file for mTLS. + public string? ClientKeyPath { get; set; } + + /// Path to the server CA certificate PEM file for custom trust. + public string? ServerCaCertificatePath { get; set; } + + /// Override the server name used for TLS verification. + public string? ServerNameOverride { get; set; } + + /// Whether to validate the server certificate. + public bool ValidateServerCertificate { get; set; } = true; + + /// Whether to allow self-signed certificates. + public bool AllowSelfSignedCertificates { get; set; } = false; + + /// Whether to ignore all certificate errors (dangerous). + public bool IgnoreAllCertificateErrors { get; set; } = false; +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Domain/QualityExtensions.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Domain/QualityExtensions.cs index bc28210..c2bd5c0 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Domain/QualityExtensions.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Domain/QualityExtensions.cs @@ -11,4 +11,19 @@ public static class QualityExtensions /// Returns true if quality is in the Bad family (byte < 64). public static bool IsBad(this Quality q) => (byte)q < 64; + + /// + /// Converts an OPC UA 32-bit status code to the simplified enum. + /// Uses the top two bits to determine the quality family. + /// + public static Quality FromStatusCode(uint statusCode) + { + uint category = statusCode & 0xC0000000; + return category switch + { + 0x00000000 => Quality.Good, + 0x40000000 => Quality.Uncertain, + _ => Quality.Bad + }; + } } diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ILmxProxyClient.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ILmxProxyClient.cs new file mode 100644 index 0000000..f07a724 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ILmxProxyClient.cs @@ -0,0 +1,58 @@ +using ZB.MOM.WW.LmxProxy.Client.Domain; + +namespace ZB.MOM.WW.LmxProxy.Client; + +/// +/// Interface for LmxProxy client operations. +/// +public interface ILmxProxyClient : IDisposable, IAsyncDisposable +{ + /// Gets or sets the default timeout for operations (range: 1s to 10min). + TimeSpan DefaultTimeout { get; set; } + + /// Connects to the LmxProxy service and establishes a session. + Task ConnectAsync(CancellationToken cancellationToken = default); + + /// Disconnects from the LmxProxy service. + Task DisconnectAsync(); + + /// Returns true if the client has an active session. + Task IsConnectedAsync(); + + /// Reads a single tag value. + Task ReadAsync(string address, CancellationToken cancellationToken = default); + + /// Reads multiple tag values in a single batch. + Task> ReadBatchAsync(IEnumerable addresses, CancellationToken cancellationToken = default); + + /// Writes a single tag value (native TypedValue -- no string heuristics). + Task WriteAsync(string address, TypedValue value, CancellationToken cancellationToken = default); + + /// Writes multiple tag values in a single batch. + Task WriteBatchAsync(IDictionary values, CancellationToken cancellationToken = default); + + /// + /// Writes a batch of values, then polls a flag tag until it matches or timeout expires. + /// Returns (writeResults, flagReached, elapsedMs). + /// + Task WriteBatchAndWaitAsync( + IDictionary values, + string flagTag, + TypedValue flagValue, + int timeoutMs = 5000, + int pollIntervalMs = 100, + CancellationToken cancellationToken = default); + + /// Subscribes to tag updates with value and error callbacks. + Task SubscribeAsync( + IEnumerable addresses, + Action onUpdate, + Action? onStreamError = null, + CancellationToken cancellationToken = default); + + /// Validates an API key and returns info. + Task CheckApiKeyAsync(string apiKey, CancellationToken cancellationToken = default); + + /// Returns a snapshot of client-side metrics. + Dictionary GetMetrics(); +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ApiKeyInfo.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ApiKeyInfo.cs new file mode 100644 index 0000000..841b79a --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ApiKeyInfo.cs @@ -0,0 +1,19 @@ +namespace ZB.MOM.WW.LmxProxy.Client; + +public partial class LmxProxyClient +{ + /// + /// Result of an API key validation check. + /// + public class ApiKeyInfo + { + /// Whether the API key is valid. + public bool IsValid { get; init; } + + /// Role associated with the API key. + public string? Role { get; init; } + + /// Description or message from the server. + public string? Description { get; init; } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ClientMetrics.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ClientMetrics.cs new file mode 100644 index 0000000..c3ad9b7 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ClientMetrics.cs @@ -0,0 +1,82 @@ +using System.Collections.Concurrent; + +namespace ZB.MOM.WW.LmxProxy.Client; + +public partial class LmxProxyClient +{ + /// + /// Tracks per-operation counts, errors, and latency with rolling buffer and percentile support. + /// + internal class ClientMetrics + { + private readonly ConcurrentDictionary _operationCounts = new(); + private readonly ConcurrentDictionary _errorCounts = new(); + private readonly ConcurrentDictionary> _latencies = new(); + private readonly Lock _latencyLock = new(); + + public void IncrementOperationCount(string operation) + { + _operationCounts.AddOrUpdate(operation, 1, (_, count) => count + 1); + } + + public void IncrementErrorCount(string operation) + { + _errorCounts.AddOrUpdate(operation, 1, (_, count) => count + 1); + } + + public void RecordLatency(string operation, long milliseconds) + { + lock (_latencyLock) + { + if (!_latencies.TryGetValue(operation, out var list)) + { + list = []; + _latencies[operation] = list; + } + list.Add(milliseconds); + if (list.Count > 1000) + { + list.RemoveAt(0); + } + } + } + + public Dictionary GetSnapshot() + { + var snapshot = new Dictionary(); + + foreach (var kvp in _operationCounts) + { + snapshot[$"{kvp.Key}_count"] = kvp.Value; + } + + foreach (var kvp in _errorCounts) + { + snapshot[$"{kvp.Key}_errors"] = kvp.Value; + } + + lock (_latencyLock) + { + foreach (var kvp in _latencies) + { + var values = kvp.Value; + if (values.Count == 0) continue; + + double avg = values.Average(); + snapshot[$"{kvp.Key}_avg_latency_ms"] = Math.Round(avg, 2); + snapshot[$"{kvp.Key}_p95_latency_ms"] = GetPercentile(values, 95); + snapshot[$"{kvp.Key}_p99_latency_ms"] = GetPercentile(values, 99); + } + } + + return snapshot; + } + + private static long GetPercentile(List values, int percentile) + { + var sorted = values.OrderBy(v => v).ToList(); + int index = Math.Max(0, (int)Math.Ceiling(percentile / 100.0 * sorted.Count) - 1); + return sorted[index]; + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.CodeFirstSubscription.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.CodeFirstSubscription.cs new file mode 100644 index 0000000..deaf7f8 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.CodeFirstSubscription.cs @@ -0,0 +1,127 @@ +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.LmxProxy.Client.Domain; + +namespace ZB.MOM.WW.LmxProxy.Client; + +public partial class LmxProxyClient +{ + private class CodeFirstSubscription : ISubscription + { + private readonly IScadaService _client; + private readonly string _sessionId; + private readonly List _tags; + private readonly Action _onUpdate; + private readonly Action? _onStreamError; + private readonly ILogger _logger; + private readonly Action? _onDispose; + private readonly CancellationTokenSource _cts = new(); + private Task? _processingTask; + private bool _disposed; + private bool _streamErrorFired; + + public CodeFirstSubscription( + IScadaService client, + string sessionId, + List tags, + Action onUpdate, + Action? onStreamError, + ILogger logger, + Action? onDispose) + { + _client = client; + _sessionId = sessionId; + _tags = tags; + _onUpdate = onUpdate; + _onStreamError = onStreamError; + _logger = logger; + _onDispose = onDispose; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _processingTask = ProcessUpdatesAsync(cancellationToken); + return Task.CompletedTask; + } + + private async Task ProcessUpdatesAsync(CancellationToken cancellationToken) + { + try + { + var request = new SubscribeRequest + { + SessionId = _sessionId, + Tags = _tags, + SamplingMs = 1000 + }; + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token); + + await foreach (VtqMessage vtqMsg in _client.SubscribeAsync(request, linkedCts.Token)) + { + try + { + Vtq vtq = ConvertVtqMessage(vtqMsg); + _onUpdate(vtqMsg.Tag, vtq); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing subscription update for {Tag}", vtqMsg.Tag); + } + } + } + catch (OperationCanceledException) when (_cts.IsCancellationRequested || cancellationToken.IsCancellationRequested) + { + _logger.LogDebug("Subscription cancelled"); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in subscription processing"); + FireStreamError(ex); + } + finally + { + if (!_disposed) + { + _disposed = true; + _onDispose?.Invoke(this); + } + } + } + + private void FireStreamError(Exception ex) + { + if (_streamErrorFired) return; + _streamErrorFired = true; + try { _onStreamError?.Invoke(ex); } + catch (Exception cbEx) { _logger.LogWarning(cbEx, "onStreamError callback threw"); } + } + + public async Task DisposeAsync() + { + if (_disposed) return; + _disposed = true; + + await _cts.CancelAsync(); + + if (_processingTask is not null) + { + try + { + await _processingTask.WaitAsync(TimeSpan.FromSeconds(5)); + } + catch { /* swallow timeout or cancellation */ } + } + + _cts.Dispose(); + } + + public void Dispose() + { + if (_disposed) return; + try + { + DisposeAsync().Wait(TimeSpan.FromSeconds(5)); + } + catch { /* swallow */ } + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.Connection.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.Connection.cs new file mode 100644 index 0000000..b3356b9 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.Connection.cs @@ -0,0 +1,219 @@ +using Grpc.Net.Client; +using Microsoft.Extensions.Logging; +using ProtoBuf.Grpc.Client; +using ZB.MOM.WW.LmxProxy.Client.Domain; +using ZB.MOM.WW.LmxProxy.Client.Security; + +namespace ZB.MOM.WW.LmxProxy.Client; + +public partial class LmxProxyClient +{ + /// + public async Task ConnectAsync(CancellationToken cancellationToken = default) + { + await _connectionLock.WaitAsync(cancellationToken); + try + { + ObjectDisposedException.ThrowIf(_disposed, this); + + if (IsConnected) + return; + + var endpoint = BuildEndpointUri(); + _logger.LogInformation("Connecting to LmxProxy at {Endpoint}", endpoint); + + GrpcChannel channel = GrpcChannelFactory.CreateChannel(endpoint, _tlsConfiguration, _logger); + IScadaService client; + try + { + client = channel.CreateGrpcService(); + } + catch + { + channel.Dispose(); + throw; + } + + ConnectResponse response; + try + { + var request = new ConnectRequest + { + ClientId = $"ScadaBridge-{Guid.NewGuid():N}", + ApiKey = _apiKey ?? string.Empty + }; + response = await client.ConnectAsync(request); + } + catch + { + channel.Dispose(); + throw; + } + + if (!response.Success) + { + channel.Dispose(); + throw new InvalidOperationException($"Connect failed: {response.Message}"); + } + + _channel = channel; + _client = client; + _sessionId = response.SessionId; + _isConnected = true; + + StartKeepAlive(); + + _logger.LogInformation("Connected to LmxProxy, session={SessionId}", _sessionId); + } + catch (Exception ex) + { + _channel = null; + _client = null; + _sessionId = string.Empty; + _isConnected = false; + _logger.LogError(ex, "Failed to connect to LmxProxy"); + throw; + } + finally + { + _connectionLock.Release(); + } + } + + /// + public async Task DisconnectAsync() + { + await _connectionLock.WaitAsync(); + try + { + StopKeepAlive(); + + if (_client is not null && !string.IsNullOrEmpty(_sessionId)) + { + try + { + await _client.DisconnectAsync(new DisconnectRequest { SessionId = _sessionId }); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error sending disconnect request"); + } + } + + _client = null; + _sessionId = string.Empty; + _isConnected = false; + _channel?.Dispose(); + _channel = null; + } + finally + { + _connectionLock.Release(); + } + } + + /// + public async Task SubscribeAsync( + IEnumerable addresses, + Action onUpdate, + Action? onStreamError = null, + CancellationToken cancellationToken = default) + { + EnsureConnected(); + + var subscription = new CodeFirstSubscription( + _client!, + _sessionId, + addresses.ToList(), + onUpdate, + onStreamError, + _logger, + sub => + { + lock (_subscriptionLock) + { + _activeSubscriptions.Remove(sub); + } + }); + + lock (_subscriptionLock) + { + _activeSubscriptions.Add(subscription); + } + + await subscription.StartAsync(cancellationToken); + return subscription; + } + + private void StartKeepAlive() + { + _keepAliveTimer = new Timer( + async _ => await KeepAliveCallback(), + null, + _keepAliveInterval, + _keepAliveInterval); + } + + private async Task KeepAliveCallback() + { + try + { + if (_client is null || string.IsNullOrEmpty(_sessionId)) + return; + + await _client.GetConnectionStateAsync(new GetConnectionStateRequest { SessionId = _sessionId }); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Keep-alive failed, marking disconnected"); + StopKeepAlive(); + await MarkDisconnectedAsync(ex); + } + } + + private void StopKeepAlive() + { + _keepAliveTimer?.Dispose(); + _keepAliveTimer = null; + } + + internal async Task MarkDisconnectedAsync(Exception ex) + { + if (_disposed) return; + + await _connectionLock.WaitAsync(); + try + { + _isConnected = false; + _client = null; + _sessionId = string.Empty; + _channel?.Dispose(); + _channel = null; + } + finally + { + _connectionLock.Release(); + } + + List subscriptions; + lock (_subscriptionLock) + { + subscriptions = [.. _activeSubscriptions]; + _activeSubscriptions.Clear(); + } + + foreach (var sub in subscriptions) + { + try { sub.Dispose(); } + catch { /* swallow */ } + } + + _logger.LogWarning(ex, "Client marked as disconnected"); + } + + private Uri BuildEndpointUri() + { + string scheme = _tlsConfiguration?.UseTls == true ? Uri.UriSchemeHttps : Uri.UriSchemeHttp; + return new UriBuilder { Scheme = scheme, Host = _host, Port = _port }.Uri; + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ISubscription.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ISubscription.cs new file mode 100644 index 0000000..ae371b1 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.ISubscription.cs @@ -0,0 +1,13 @@ +namespace ZB.MOM.WW.LmxProxy.Client; + +public partial class LmxProxyClient +{ + /// + /// Represents an active tag subscription. Dispose to unsubscribe. + /// + public interface ISubscription : IDisposable + { + /// Asynchronous disposal with cancellation support. + Task DisposeAsync(); + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.cs new file mode 100644 index 0000000..fab607a --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.cs @@ -0,0 +1,314 @@ +using System.Diagnostics; +using Grpc.Core; +using Grpc.Net.Client; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Polly; +using Polly.Retry; +using ZB.MOM.WW.LmxProxy.Client.Domain; + +namespace ZB.MOM.WW.LmxProxy.Client; + +/// +/// gRPC client for the LmxProxy SCADA proxy service. Uses v2 protocol with native TypedValue. +/// +public partial class LmxProxyClient : ILmxProxyClient +{ + private readonly ILogger _logger; + private readonly string _host; + private readonly int _port; + private readonly string? _apiKey; + private readonly ClientTlsConfiguration? _tlsConfiguration; + private readonly ClientMetrics _metrics = new(); + private readonly SemaphoreSlim _connectionLock = new(1, 1); + private readonly List _activeSubscriptions = []; + private readonly Lock _subscriptionLock = new(); + + private GrpcChannel? _channel; + private IScadaService? _client; + private string _sessionId = string.Empty; + private bool _disposed; + private bool _isConnected; + private TimeSpan _defaultTimeout = TimeSpan.FromSeconds(30); + private ClientConfiguration? _configuration; + private ResiliencePipeline? _resiliencePipeline; + private Timer? _keepAliveTimer; + private readonly TimeSpan _keepAliveInterval = TimeSpan.FromSeconds(30); + + /// Returns true if the client has an active session and is not disposed. + public bool IsConnected => !_disposed && _isConnected && !string.IsNullOrEmpty(_sessionId); + + /// + public TimeSpan DefaultTimeout + { + get => _defaultTimeout; + set + { + if (value < TimeSpan.FromSeconds(1) || value > TimeSpan.FromMinutes(10)) + throw new ArgumentOutOfRangeException(nameof(value), "DefaultTimeout must be between 1 second and 10 minutes."); + _defaultTimeout = value; + } + } + + /// + /// Creates a new LmxProxyClient instance. + /// + public LmxProxyClient( + string host, int port, string? apiKey, + ClientTlsConfiguration? tlsConfiguration, + ILogger? logger = null) + { + _host = host ?? throw new ArgumentNullException(nameof(host)); + _port = port; + _apiKey = apiKey; + _tlsConfiguration = tlsConfiguration; + _logger = logger ?? NullLogger.Instance; + } + + /// + /// Sets builder configuration including retry policies. Called internally by the builder. + /// + internal void SetBuilderConfiguration(ClientConfiguration config) + { + _configuration = config; + if (config.MaxRetryAttempts > 0) + { + _resiliencePipeline = new ResiliencePipelineBuilder() + .AddRetry(new RetryStrategyOptions + { + MaxRetryAttempts = config.MaxRetryAttempts, + Delay = config.RetryDelay, + BackoffType = DelayBackoffType.Exponential, + ShouldHandle = new PredicateBuilder() + .Handle(ex => + ex.StatusCode == StatusCode.Unavailable || + ex.StatusCode == StatusCode.DeadlineExceeded || + ex.StatusCode == StatusCode.ResourceExhausted || + ex.StatusCode == StatusCode.Aborted), + OnRetry = args => + { + _logger.LogWarning("Retry {Attempt} after {Delay} for {Exception}", + args.AttemptNumber, args.RetryDelay, args.Outcome.Exception?.Message); + return ValueTask.CompletedTask; + } + }) + .Build(); + } + } + + /// + public async Task ReadAsync(string address, CancellationToken cancellationToken = default) + { + EnsureConnected(); + _metrics.IncrementOperationCount("Read"); + var sw = Stopwatch.StartNew(); + try + { + var request = new ReadRequest { SessionId = _sessionId, Tag = address }; + ReadResponse response = await ExecuteWithRetry( + () => _client!.ReadAsync(request).AsTask(), cancellationToken); + if (!response.Success) + throw new InvalidOperationException($"Read failed: {response.Message}"); + return ConvertVtqMessage(response.Vtq); + } + catch + { + _metrics.IncrementErrorCount("Read"); + throw; + } + finally + { + sw.Stop(); + _metrics.RecordLatency("Read", sw.ElapsedMilliseconds); + } + } + + /// + public async Task> ReadBatchAsync( + IEnumerable addresses, CancellationToken cancellationToken = default) + { + EnsureConnected(); + _metrics.IncrementOperationCount("ReadBatch"); + var sw = Stopwatch.StartNew(); + try + { + var request = new ReadBatchRequest { SessionId = _sessionId, Tags = addresses.ToList() }; + ReadBatchResponse response = await ExecuteWithRetry( + () => _client!.ReadBatchAsync(request).AsTask(), cancellationToken); + var result = new Dictionary(); + foreach (var vtqMsg in response.Vtqs) + { + result[vtqMsg.Tag] = ConvertVtqMessage(vtqMsg); + } + return result; + } + catch + { + _metrics.IncrementErrorCount("ReadBatch"); + throw; + } + finally + { + sw.Stop(); + _metrics.RecordLatency("ReadBatch", sw.ElapsedMilliseconds); + } + } + + /// + public async Task WriteAsync(string address, TypedValue value, CancellationToken cancellationToken = default) + { + EnsureConnected(); + _metrics.IncrementOperationCount("Write"); + var sw = Stopwatch.StartNew(); + try + { + var request = new WriteRequest { SessionId = _sessionId, Tag = address, Value = value }; + WriteResponse response = await ExecuteWithRetry( + () => _client!.WriteAsync(request).AsTask(), cancellationToken); + if (!response.Success) + throw new InvalidOperationException($"Write failed: {response.Message}"); + } + catch + { + _metrics.IncrementErrorCount("Write"); + throw; + } + finally + { + sw.Stop(); + _metrics.RecordLatency("Write", sw.ElapsedMilliseconds); + } + } + + /// + public async Task WriteBatchAsync(IDictionary values, CancellationToken cancellationToken = default) + { + EnsureConnected(); + _metrics.IncrementOperationCount("WriteBatch"); + var sw = Stopwatch.StartNew(); + try + { + var request = new WriteBatchRequest + { + SessionId = _sessionId, + Items = values.Select(kv => new WriteItem { Tag = kv.Key, Value = kv.Value }).ToList() + }; + WriteBatchResponse response = await ExecuteWithRetry( + () => _client!.WriteBatchAsync(request).AsTask(), cancellationToken); + if (!response.Success) + throw new InvalidOperationException($"WriteBatch failed: {response.Message}"); + } + catch + { + _metrics.IncrementErrorCount("WriteBatch"); + throw; + } + finally + { + sw.Stop(); + _metrics.RecordLatency("WriteBatch", sw.ElapsedMilliseconds); + } + } + + /// + public async Task WriteBatchAndWaitAsync( + IDictionary values, string flagTag, TypedValue flagValue, + int timeoutMs = 5000, int pollIntervalMs = 100, CancellationToken cancellationToken = default) + { + EnsureConnected(); + var request = new WriteBatchAndWaitRequest + { + SessionId = _sessionId, + Items = values.Select(kv => new WriteItem { Tag = kv.Key, Value = kv.Value }).ToList(), + FlagTag = flagTag, + FlagValue = flagValue, + TimeoutMs = timeoutMs, + PollIntervalMs = pollIntervalMs + }; + return await ExecuteWithRetry( + () => _client!.WriteBatchAndWaitAsync(request).AsTask(), cancellationToken); + } + + /// + public async Task CheckApiKeyAsync(string apiKey, CancellationToken cancellationToken = default) + { + EnsureConnected(); + var request = new CheckApiKeyRequest { ApiKey = apiKey }; + CheckApiKeyResponse response = await _client!.CheckApiKeyAsync(request); + return new ApiKeyInfo { IsValid = response.IsValid, Description = response.Message }; + } + + /// + public Task IsConnectedAsync() => Task.FromResult(IsConnected); + + /// + public Dictionary GetMetrics() => _metrics.GetSnapshot(); + + internal static Vtq ConvertVtqMessage(VtqMessage? msg) + { + if (msg is null) + return new Vtq(null, DateTime.UtcNow, Quality.Bad); + + object? value = ExtractTypedValue(msg.Value); + DateTime timestamp = msg.TimestampUtcTicks > 0 + ? new DateTime(msg.TimestampUtcTicks, DateTimeKind.Utc) + : DateTime.UtcNow; + Quality quality = QualityExtensions.FromStatusCode(msg.Quality?.StatusCode ?? 0x80000000u); + return new Vtq(value, timestamp, quality); + } + + internal static object? ExtractTypedValue(TypedValue? tv) + { + if (tv is null) return null; + + return tv.GetValueCase() switch + { + TypedValueCase.BoolValue => tv.BoolValue, + TypedValueCase.Int32Value => tv.Int32Value, + TypedValueCase.Int64Value => tv.Int64Value, + TypedValueCase.FloatValue => tv.FloatValue, + TypedValueCase.DoubleValue => tv.DoubleValue, + TypedValueCase.StringValue => tv.StringValue, + TypedValueCase.BytesValue => tv.BytesValue, + TypedValueCase.DatetimeValue => new DateTime(tv.DatetimeValue, DateTimeKind.Utc), + TypedValueCase.ArrayValue => tv.ArrayValue, + TypedValueCase.None => null, + _ => null + }; + } + + private async Task ExecuteWithRetry(Func> operation, CancellationToken ct) + { + if (_resiliencePipeline is not null) + { + return await _resiliencePipeline.ExecuteAsync( + async token => await operation(), ct); + } + return await operation(); + } + + private void EnsureConnected() + { + ObjectDisposedException.ThrowIf(_disposed, this); + if (!IsConnected) + throw new InvalidOperationException("Client is not connected. Call ConnectAsync first."); + } + + /// + public void Dispose() + { + if (_disposed) return; + _disposed = true; + _keepAliveTimer?.Dispose(); + _channel?.Dispose(); + _connectionLock.Dispose(); + } + + /// + public async ValueTask DisposeAsync() + { + if (_disposed) return; + try { await DisconnectAsync(); } catch { /* swallow */ } + Dispose(); + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Security/GrpcChannelFactory.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Security/GrpcChannelFactory.cs new file mode 100644 index 0000000..3027e24 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Security/GrpcChannelFactory.cs @@ -0,0 +1,103 @@ +using System.Net.Security; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using Grpc.Net.Client; +using Microsoft.Extensions.Logging; + +namespace ZB.MOM.WW.LmxProxy.Client.Security; + +/// +/// Factory for creating configured gRPC channels with TLS support. +/// +internal static class GrpcChannelFactory +{ +#pragma warning disable CA1810 // Initialize reference type static fields inline + static GrpcChannelFactory() +#pragma warning restore CA1810 + { + // Enable HTTP/2 over plaintext for non-TLS scenarios + AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + } + + /// + /// Creates a with the specified address and TLS configuration. + /// + public static GrpcChannel CreateChannel(Uri address, ClientTlsConfiguration? tlsConfiguration, ILogger logger) + { + var handler = new SocketsHttpHandler + { + EnableMultipleHttp2Connections = true + }; + + if (tlsConfiguration?.UseTls == true) + { + ConfigureTls(handler, tlsConfiguration, logger); + } + + var channelOptions = new GrpcChannelOptions + { + HttpHandler = handler + }; + + logger.LogDebug("Creating gRPC channel to {Address}, TLS={UseTls}", address, tlsConfiguration?.UseTls ?? false); + return GrpcChannel.ForAddress(address, channelOptions); + } + + private static void ConfigureTls(SocketsHttpHandler handler, ClientTlsConfiguration tls, ILogger logger) + { + handler.SslOptions = new SslClientAuthenticationOptions + { + EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13 + }; + + if (!string.IsNullOrEmpty(tls.ServerNameOverride)) + { + handler.SslOptions.TargetHost = tls.ServerNameOverride; + } + + // Load client certificate for mTLS + if (!string.IsNullOrEmpty(tls.ClientCertificatePath) && !string.IsNullOrEmpty(tls.ClientKeyPath)) + { + var clientCert = X509Certificate2.CreateFromPemFile(tls.ClientCertificatePath, tls.ClientKeyPath); + handler.SslOptions.ClientCertificates = [clientCert]; + logger.LogDebug("Loaded client certificate for mTLS from {Path}", tls.ClientCertificatePath); + } + + // Certificate validation callback + handler.SslOptions.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) => + { + if (tls.IgnoreAllCertificateErrors) + { + logger.LogWarning("Ignoring all certificate errors (IgnoreAllCertificateErrors=true)"); + return true; + } + + if (!tls.ValidateServerCertificate) + { + return true; + } + + if (sslPolicyErrors == SslPolicyErrors.None) + return true; + + // Custom CA trust store + if (!string.IsNullOrEmpty(tls.ServerCaCertificatePath) && certificate is not null) + { + using var customChain = new X509Chain(); + customChain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust; + customChain.ChainPolicy.CustomTrustStore.Add(X509CertificateLoader.LoadCertificateFromFile(tls.ServerCaCertificatePath)); + if (customChain.Build(new X509Certificate2(certificate))) + return true; + } + + if (tls.AllowSelfSignedCertificates && sslPolicyErrors == SslPolicyErrors.RemoteCertificateChainErrors) + { + logger.LogWarning("Allowing self-signed certificate"); + return true; + } + + logger.LogError("Certificate validation failed: {Errors}", sslPolicyErrors); + return false; + }; + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ZB.MOM.WW.LmxProxy.Client.csproj b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ZB.MOM.WW.LmxProxy.Client.csproj index cebb7a2..0b81981 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ZB.MOM.WW.LmxProxy.Client.csproj +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/ZB.MOM.WW.LmxProxy.Client.csproj @@ -14,6 +14,10 @@ AnyCPU + + + + diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ClientMetricsTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ClientMetricsTests.cs new file mode 100644 index 0000000..1f038f6 --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/ClientMetricsTests.cs @@ -0,0 +1,122 @@ +using FluentAssertions; +using Xunit; + +namespace ZB.MOM.WW.LmxProxy.Client.Tests; + +public class ClientMetricsTests +{ + private static LmxProxyClient.ClientMetrics CreateMetrics() => new(); + + [Fact] + public void IncrementOperationCount_Increments() + { + var metrics = CreateMetrics(); + + metrics.IncrementOperationCount("Read"); + metrics.IncrementOperationCount("Read"); + metrics.IncrementOperationCount("Read"); + + var snapshot = metrics.GetSnapshot(); + snapshot["Read_count"].Should().Be(3L); + } + + [Fact] + public void IncrementErrorCount_Increments() + { + var metrics = CreateMetrics(); + + metrics.IncrementErrorCount("Write"); + metrics.IncrementErrorCount("Write"); + + var snapshot = metrics.GetSnapshot(); + snapshot["Write_errors"].Should().Be(2L); + } + + [Fact] + public void RecordLatency_StoresValues() + { + var metrics = CreateMetrics(); + + metrics.RecordLatency("Read", 10); + metrics.RecordLatency("Read", 20); + metrics.RecordLatency("Read", 30); + + var snapshot = metrics.GetSnapshot(); + snapshot.Should().ContainKey("Read_avg_latency_ms"); + snapshot.Should().ContainKey("Read_p95_latency_ms"); + snapshot.Should().ContainKey("Read_p99_latency_ms"); + + var avg = (double)snapshot["Read_avg_latency_ms"]; + avg.Should().BeApproximately(20.0, 0.1); + } + + [Fact] + public void RollingBuffer_CapsAt1000() + { + var metrics = CreateMetrics(); + + for (int i = 0; i < 1100; i++) + { + metrics.RecordLatency("Read", i); + } + + var snapshot = metrics.GetSnapshot(); + // After 1100 entries, the buffer should have capped at 1000 (oldest removed) + // The earliest remaining value should be 100 (entries 0-99 were evicted) + var p95 = (long)snapshot["Read_p95_latency_ms"]; + // p95 of values 100-1099 should be around 1050 + p95.Should().BeGreaterThan(900); + } + + [Fact] + public void GetSnapshot_IncludesP95AndP99() + { + var metrics = CreateMetrics(); + + // Add 100 values: 1, 2, 3, ..., 100 + for (int i = 1; i <= 100; i++) + { + metrics.RecordLatency("Op", i); + } + + var snapshot = metrics.GetSnapshot(); + + var p95 = (long)snapshot["Op_p95_latency_ms"]; + var p99 = (long)snapshot["Op_p99_latency_ms"]; + + // P95 of 1..100 should be 95 + p95.Should().Be(95); + // P99 of 1..100 should be 99 + p99.Should().Be(99); + } + + [Fact] + public void GetSnapshot_ReturnsEmptyForNoData() + { + var metrics = CreateMetrics(); + + var snapshot = metrics.GetSnapshot(); + + snapshot.Should().BeEmpty(); + } + + [Fact] + public void GetSnapshot_TracksMultipleOperations() + { + var metrics = CreateMetrics(); + + metrics.IncrementOperationCount("Read"); + metrics.IncrementOperationCount("Write"); + metrics.IncrementErrorCount("Read"); + metrics.RecordLatency("Read", 10); + metrics.RecordLatency("Write", 20); + + var snapshot = metrics.GetSnapshot(); + + snapshot["Read_count"].Should().Be(1L); + snapshot["Write_count"].Should().Be(1L); + snapshot["Read_errors"].Should().Be(1L); + snapshot.Should().ContainKey("Read_avg_latency_ms"); + snapshot.Should().ContainKey("Write_avg_latency_ms"); + } +} diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/Fakes/FakeScadaService.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/Fakes/FakeScadaService.cs new file mode 100644 index 0000000..d1b2dae --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/Fakes/FakeScadaService.cs @@ -0,0 +1,112 @@ +using System.Runtime.CompilerServices; +using ZB.MOM.WW.LmxProxy.Client.Domain; + +namespace ZB.MOM.WW.LmxProxy.Client.Tests.Fakes; + +/// +/// Hand-written fake implementation of IScadaService for unit testing. +/// +internal class FakeScadaService : IScadaService +{ + // Configure responses + public ConnectResponse ConnectResponseToReturn { get; set; } = new() { Success = true, SessionId = "test-session-123", Message = "OK" }; + public DisconnectResponse DisconnectResponseToReturn { get; set; } = new() { Success = true, Message = "OK" }; + public GetConnectionStateResponse GetConnectionStateResponseToReturn { get; set; } = new() { IsConnected = true }; + public ReadResponse ReadResponseToReturn { get; set; } = new() { Success = true }; + public ReadBatchResponse ReadBatchResponseToReturn { get; set; } = new() { Success = true }; + public WriteResponse WriteResponseToReturn { get; set; } = new() { Success = true }; + public WriteBatchResponse WriteBatchResponseToReturn { get; set; } = new() { Success = true }; + public WriteBatchAndWaitResponse WriteBatchAndWaitResponseToReturn { get; set; } = new() { Success = true }; + public CheckApiKeyResponse CheckApiKeyResponseToReturn { get; set; } = new() { IsValid = true, Message = "Valid" }; + + // Track calls + public List ConnectCalls { get; } = []; + public List DisconnectCalls { get; } = []; + public List GetConnectionStateCalls { get; } = []; + public List ReadCalls { get; } = []; + public List ReadBatchCalls { get; } = []; + public List WriteCalls { get; } = []; + public List WriteBatchCalls { get; } = []; + public List WriteBatchAndWaitCalls { get; } = []; + public List CheckApiKeyCalls { get; } = []; + public List SubscribeCalls { get; } = []; + + // Error injection + public Exception? GetConnectionStateException { get; set; } + + // Subscription data + public List SubscriptionMessages { get; set; } = []; + public Exception? SubscriptionException { get; set; } + + public ValueTask ConnectAsync(ConnectRequest request) + { + ConnectCalls.Add(request); + return new ValueTask(ConnectResponseToReturn); + } + + public ValueTask DisconnectAsync(DisconnectRequest request) + { + DisconnectCalls.Add(request); + return new ValueTask(DisconnectResponseToReturn); + } + + public ValueTask GetConnectionStateAsync(GetConnectionStateRequest request) + { + GetConnectionStateCalls.Add(request); + if (GetConnectionStateException is not null) + throw GetConnectionStateException; + return new ValueTask(GetConnectionStateResponseToReturn); + } + + public ValueTask ReadAsync(ReadRequest request) + { + ReadCalls.Add(request); + return new ValueTask(ReadResponseToReturn); + } + + public ValueTask ReadBatchAsync(ReadBatchRequest request) + { + ReadBatchCalls.Add(request); + return new ValueTask(ReadBatchResponseToReturn); + } + + public ValueTask WriteAsync(WriteRequest request) + { + WriteCalls.Add(request); + return new ValueTask(WriteResponseToReturn); + } + + public ValueTask WriteBatchAsync(WriteBatchRequest request) + { + WriteBatchCalls.Add(request); + return new ValueTask(WriteBatchResponseToReturn); + } + + public ValueTask WriteBatchAndWaitAsync(WriteBatchAndWaitRequest request) + { + WriteBatchAndWaitCalls.Add(request); + return new ValueTask(WriteBatchAndWaitResponseToReturn); + } + + public ValueTask CheckApiKeyAsync(CheckApiKeyRequest request) + { + CheckApiKeyCalls.Add(request); + return new ValueTask(CheckApiKeyResponseToReturn); + } + + public async IAsyncEnumerable SubscribeAsync( + SubscribeRequest request, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + SubscribeCalls.Add(request); + + foreach (var msg in SubscriptionMessages) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return msg; + await Task.Yield(); + } + + if (SubscriptionException is not null) + throw SubscriptionException; + } +} diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/Fakes/TestableClient.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/Fakes/TestableClient.cs new file mode 100644 index 0000000..d46cda2 --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/Fakes/TestableClient.cs @@ -0,0 +1,50 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.LmxProxy.Client.Domain; + +namespace ZB.MOM.WW.LmxProxy.Client.Tests.Fakes; + +/// +/// Helper to create an LmxProxyClient wired to a FakeScadaService, bypassing real gRPC. +/// Uses reflection to set private fields since the client has no test seam for IScadaService injection. +/// +internal static class TestableClient +{ + /// + /// Creates an LmxProxyClient with a fake service injected into its internal state, + /// simulating a connected client. + /// + public static (LmxProxyClient Client, FakeScadaService Fake) CreateConnected( + string sessionId = "test-session-123", + ILogger? logger = null) + { + var fake = new FakeScadaService + { + ConnectResponseToReturn = new ConnectResponse + { + Success = true, + SessionId = sessionId, + Message = "OK" + } + }; + + var client = new LmxProxyClient("localhost", 50051, "test-key", null, logger); + + // Use reflection to inject fake service and simulate connected state + var clientType = typeof(LmxProxyClient); + + var clientField = clientType.GetField("_client", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!; + clientField.SetValue(client, fake); + + var sessionField = clientType.GetField("_sessionId", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!; + sessionField.SetValue(client, sessionId); + + var connectedField = clientType.GetField("_isConnected", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!; + connectedField.SetValue(client, true); + + return (client, fake); + } +} diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientConnectionTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientConnectionTests.cs new file mode 100644 index 0000000..710da79 --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientConnectionTests.cs @@ -0,0 +1,103 @@ +using FluentAssertions; +using Xunit; +using ZB.MOM.WW.LmxProxy.Client.Domain; +using ZB.MOM.WW.LmxProxy.Client.Tests.Fakes; + +namespace ZB.MOM.WW.LmxProxy.Client.Tests; + +public class LmxProxyClientConnectionTests +{ + [Fact] + public async Task IsConnectedAsync_ReturnsFalseBeforeConnect() + { + var client = new LmxProxyClient("localhost", 50051, null, null); + + var result = await client.IsConnectedAsync(); + + result.Should().BeFalse(); + client.Dispose(); + } + + [Fact] + public async Task IsConnectedAsync_ReturnsTrueAfterInjection() + { + var (client, _) = TestableClient.CreateConnected(); + + var result = await client.IsConnectedAsync(); + + result.Should().BeTrue(); + client.Dispose(); + } + + [Fact] + public async Task DisconnectAsync_SendsDisconnectAndClearsState() + { + var (client, fake) = TestableClient.CreateConnected(); + + await client.DisconnectAsync(); + + fake.DisconnectCalls.Should().HaveCount(1); + fake.DisconnectCalls[0].SessionId.Should().Be("test-session-123"); + client.IsConnected.Should().BeFalse(); + client.Dispose(); + } + + [Fact] + public async Task DisconnectAsync_SwallowsExceptions() + { + var (client, fake) = TestableClient.CreateConnected(); + fake.DisconnectResponseToReturn = null!; // Force an error path + + // Should not throw + var act = () => client.DisconnectAsync(); + await act.Should().NotThrowAsync(); + + client.Dispose(); + } + + [Fact] + public void IsConnected_ReturnsFalseAfterDispose() + { + var (client, _) = TestableClient.CreateConnected(); + + client.Dispose(); + + client.IsConnected.Should().BeFalse(); + } + + [Fact] + public async Task MarkDisconnectedAsync_ClearsConnectionState() + { + var (client, _) = TestableClient.CreateConnected(); + + await client.MarkDisconnectedAsync(new Exception("connection lost")); + + client.IsConnected.Should().BeFalse(); + client.Dispose(); + } + + [Fact] + public void DefaultTimeout_RejectsOutOfRange() + { + var client = new LmxProxyClient("localhost", 50051, null, null); + + var act = () => client.DefaultTimeout = TimeSpan.FromMilliseconds(500); + act.Should().Throw(); + + var act2 = () => client.DefaultTimeout = TimeSpan.FromMinutes(11); + act2.Should().Throw(); + + client.Dispose(); + } + + [Fact] + public void DefaultTimeout_AcceptsValidRange() + { + var client = new LmxProxyClient("localhost", 50051, null, null); + + client.DefaultTimeout = TimeSpan.FromSeconds(5); + client.DefaultTimeout.Should().Be(TimeSpan.FromSeconds(5)); + + client.Dispose(); + } +} diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientReadWriteTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientReadWriteTests.cs new file mode 100644 index 0000000..4569d51 --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientReadWriteTests.cs @@ -0,0 +1,177 @@ +using FluentAssertions; +using Xunit; +using ZB.MOM.WW.LmxProxy.Client.Domain; +using ZB.MOM.WW.LmxProxy.Client.Tests.Fakes; + +namespace ZB.MOM.WW.LmxProxy.Client.Tests; + +public class LmxProxyClientReadWriteTests +{ + [Fact] + public async Task ReadAsync_ReturnsVtqFromResponse() + { + var (client, fake) = TestableClient.CreateConnected(); + fake.ReadResponseToReturn = new ReadResponse + { + Success = true, + Vtq = new VtqMessage + { + Tag = "TestTag", + Value = new TypedValue { DoubleValue = 42.5 }, + TimestampUtcTicks = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks, + Quality = new QualityCode { StatusCode = 0x00000000 } + } + }; + + var result = await client.ReadAsync("TestTag"); + + result.Value.Should().Be(42.5); + result.Quality.Should().Be(Quality.Good); + fake.ReadCalls.Should().HaveCount(1); + fake.ReadCalls[0].Tag.Should().Be("TestTag"); + fake.ReadCalls[0].SessionId.Should().Be("test-session-123"); + client.Dispose(); + } + + [Fact] + public async Task ReadAsync_ThrowsOnFailureResponse() + { + var (client, fake) = TestableClient.CreateConnected(); + fake.ReadResponseToReturn = new ReadResponse { Success = false, Message = "Tag not found" }; + + var act = () => client.ReadAsync("BadTag"); + + await act.Should().ThrowAsync() + .WithMessage("*Tag not found*"); + client.Dispose(); + } + + [Fact] + public async Task ReadAsync_ThrowsWhenNotConnected() + { + var client = new LmxProxyClient("localhost", 50051, null, null); + + var act = () => client.ReadAsync("AnyTag"); + + await act.Should().ThrowAsync() + .WithMessage("*not connected*"); + client.Dispose(); + } + + [Fact] + public async Task ReadBatchAsync_ReturnsDictionaryOfVtqs() + { + var (client, fake) = TestableClient.CreateConnected(); + fake.ReadBatchResponseToReturn = new ReadBatchResponse + { + Success = true, + Vtqs = + [ + new VtqMessage + { + Tag = "Tag1", + Value = new TypedValue { Int32Value = 100 }, + TimestampUtcTicks = DateTime.UtcNow.Ticks, + Quality = new QualityCode { StatusCode = 0x00000000 } + }, + new VtqMessage + { + Tag = "Tag2", + Value = new TypedValue { BoolValue = true }, + TimestampUtcTicks = DateTime.UtcNow.Ticks, + Quality = new QualityCode { StatusCode = 0x00000000 } + } + ] + }; + + var result = await client.ReadBatchAsync(["Tag1", "Tag2"]); + + result.Should().HaveCount(2); + result["Tag1"].Value.Should().Be(100); + result["Tag2"].Value.Should().Be(true); + client.Dispose(); + } + + [Fact] + public async Task WriteAsync_SendsTypedValueDirectly() + { + var (client, fake) = TestableClient.CreateConnected(); + var typedValue = new TypedValue { DoubleValue = 99.9 }; + + await client.WriteAsync("TestTag", typedValue); + + fake.WriteCalls.Should().HaveCount(1); + fake.WriteCalls[0].Tag.Should().Be("TestTag"); + fake.WriteCalls[0].Value.Should().NotBeNull(); + fake.WriteCalls[0].Value!.DoubleValue.Should().Be(99.9); + client.Dispose(); + } + + [Fact] + public async Task WriteAsync_ThrowsOnFailureResponse() + { + var (client, fake) = TestableClient.CreateConnected(); + fake.WriteResponseToReturn = new WriteResponse { Success = false, Message = "Write error" }; + + var act = () => client.WriteAsync("Tag", new TypedValue { Int32Value = 1 }); + + await act.Should().ThrowAsync() + .WithMessage("*Write error*"); + client.Dispose(); + } + + [Fact] + public async Task WriteBatchAsync_SendsAllItems() + { + var (client, fake) = TestableClient.CreateConnected(); + var values = new Dictionary + { + ["Tag1"] = new TypedValue { DoubleValue = 1.0 }, + ["Tag2"] = new TypedValue { Int32Value = 2 }, + ["Tag3"] = new TypedValue { BoolValue = true } + }; + + await client.WriteBatchAsync(values); + + fake.WriteBatchCalls.Should().HaveCount(1); + fake.WriteBatchCalls[0].Items.Should().HaveCount(3); + client.Dispose(); + } + + [Fact] + public async Task WriteBatchAndWaitAsync_ReturnsResponse() + { + var (client, fake) = TestableClient.CreateConnected(); + fake.WriteBatchAndWaitResponseToReturn = new WriteBatchAndWaitResponse + { + Success = true, + FlagReached = true, + ElapsedMs = 150, + WriteResults = [new WriteResult { Tag = "Tag1", Success = true }] + }; + var values = new Dictionary + { + ["Tag1"] = new TypedValue { Int32Value = 1 } + }; + + var result = await client.WriteBatchAndWaitAsync( + values, "FlagTag", new TypedValue { BoolValue = true }); + + result.FlagReached.Should().BeTrue(); + result.ElapsedMs.Should().Be(150); + client.Dispose(); + } + + [Fact] + public async Task CheckApiKeyAsync_ReturnsApiKeyInfo() + { + var (client, fake) = TestableClient.CreateConnected(); + fake.CheckApiKeyResponseToReturn = new CheckApiKeyResponse { IsValid = true, Message = "Admin key" }; + + var result = await client.CheckApiKeyAsync("my-api-key"); + + result.IsValid.Should().BeTrue(); + result.Description.Should().Be("Admin key"); + client.Dispose(); + } +} diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientSubscriptionTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientSubscriptionTests.cs new file mode 100644 index 0000000..23d632f --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/LmxProxyClientSubscriptionTests.cs @@ -0,0 +1,100 @@ +using FluentAssertions; +using Xunit; +using ZB.MOM.WW.LmxProxy.Client.Domain; +using ZB.MOM.WW.LmxProxy.Client.Tests.Fakes; + +namespace ZB.MOM.WW.LmxProxy.Client.Tests; + +public class LmxProxyClientSubscriptionTests +{ + [Fact] + public async Task SubscribeAsync_InvokesCallbackForEachUpdate() + { + var (client, fake) = TestableClient.CreateConnected(); + fake.SubscriptionMessages = + [ + new VtqMessage + { + Tag = "Tag1", + Value = new TypedValue { DoubleValue = 1.0 }, + TimestampUtcTicks = DateTime.UtcNow.Ticks, + Quality = new QualityCode { StatusCode = 0x00000000 } + }, + new VtqMessage + { + Tag = "Tag2", + Value = new TypedValue { Int32Value = 42 }, + TimestampUtcTicks = DateTime.UtcNow.Ticks, + Quality = new QualityCode { StatusCode = 0x00000000 } + } + ]; + + var updates = new List<(string Tag, Vtq Vtq)>(); + var subscription = await client.SubscribeAsync( + ["Tag1", "Tag2"], + (tag, vtq) => updates.Add((tag, vtq))); + + // Wait for processing to complete (fake yields all then stops) + await Task.Delay(500); + + updates.Should().HaveCount(2); + updates[0].Tag.Should().Be("Tag1"); + updates[0].Vtq.Value.Should().Be(1.0); + updates[1].Tag.Should().Be("Tag2"); + updates[1].Vtq.Value.Should().Be(42); + + subscription.Dispose(); + client.Dispose(); + } + + [Fact] + public async Task SubscribeAsync_InvokesStreamErrorOnFailure() + { + var (client, fake) = TestableClient.CreateConnected(); + fake.SubscriptionException = new InvalidOperationException("Stream broke"); + + Exception? capturedError = null; + var subscription = await client.SubscribeAsync( + ["Tag1"], + (_, _) => { }, + ex => capturedError = ex); + + // Wait for error to propagate + await Task.Delay(500); + + capturedError.Should().NotBeNull(); + capturedError.Should().BeOfType(); + capturedError!.Message.Should().Be("Stream broke"); + + subscription.Dispose(); + client.Dispose(); + } + + [Fact] + public async Task SubscribeAsync_DisposeStopsProcessing() + { + var (client, fake) = TestableClient.CreateConnected(); + // Provide many messages but we'll dispose early + fake.SubscriptionMessages = + [ + new VtqMessage + { + Tag = "Tag1", + Value = new TypedValue { DoubleValue = 1.0 }, + TimestampUtcTicks = DateTime.UtcNow.Ticks, + Quality = new QualityCode { StatusCode = 0x00000000 } + } + ]; + + var updates = new List<(string Tag, Vtq Vtq)>(); + var subscription = await client.SubscribeAsync( + ["Tag1"], + (tag, vtq) => updates.Add((tag, vtq))); + + // Dispose immediately + subscription.Dispose(); + + // Should not throw + client.Dispose(); + } +} diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/TypedValueConversionTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/TypedValueConversionTests.cs new file mode 100644 index 0000000..0bdf7ab --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.Tests/TypedValueConversionTests.cs @@ -0,0 +1,157 @@ +using FluentAssertions; +using Xunit; +using ZB.MOM.WW.LmxProxy.Client.Domain; + +namespace ZB.MOM.WW.LmxProxy.Client.Tests; + +public class TypedValueConversionTests +{ + [Fact] + public void ConvertVtqMessage_ExtractsBoolValue() + { + var msg = CreateVtqMessage(new TypedValue { BoolValue = true }); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Value.Should().Be(true); + } + + [Fact] + public void ConvertVtqMessage_ExtractsInt32Value() + { + var msg = CreateVtqMessage(new TypedValue { Int32Value = 42 }); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Value.Should().Be(42); + } + + [Fact] + public void ConvertVtqMessage_ExtractsInt64Value() + { + var msg = CreateVtqMessage(new TypedValue { Int64Value = long.MaxValue }); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Value.Should().Be(long.MaxValue); + } + + [Fact] + public void ConvertVtqMessage_ExtractsFloatValue() + { + var msg = CreateVtqMessage(new TypedValue { FloatValue = 3.14f }); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Value.Should().Be(3.14f); + } + + [Fact] + public void ConvertVtqMessage_ExtractsDoubleValue() + { + var msg = CreateVtqMessage(new TypedValue { DoubleValue = 99.99 }); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Value.Should().Be(99.99); + } + + [Fact] + public void ConvertVtqMessage_ExtractsStringValue() + { + var msg = CreateVtqMessage(new TypedValue { StringValue = "hello" }); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Value.Should().Be("hello"); + } + + [Fact] + public void ConvertVtqMessage_ExtractsDateTimeValue() + { + var dt = new DateTime(2026, 3, 22, 12, 0, 0, DateTimeKind.Utc); + var msg = CreateVtqMessage(new TypedValue { DatetimeValue = dt.Ticks }); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Value.Should().BeOfType(); + ((DateTime)vtq.Value!).Should().Be(dt); + } + + [Fact] + public void ConvertVtqMessage_HandlesNullTypedValue() + { + var msg = new VtqMessage + { + Tag = "NullTag", + Value = null, + TimestampUtcTicks = DateTime.UtcNow.Ticks, + Quality = new QualityCode { StatusCode = 0x00000000 } + }; + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Value.Should().BeNull(); + } + + [Fact] + public void ConvertVtqMessage_HandlesNullMessage() + { + var vtq = LmxProxyClient.ConvertVtqMessage(null); + + vtq.Value.Should().BeNull(); + vtq.Quality.Should().Be(Quality.Bad); + } + + [Fact] + public void ConvertVtqMessage_GoodQualityCode() + { + var msg = CreateVtqMessage(new TypedValue { DoubleValue = 1.0 }, statusCode: 0x00000000); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Quality.Should().Be(Quality.Good); + } + + [Fact] + public void ConvertVtqMessage_BadQualityCode() + { + var msg = CreateVtqMessage(new TypedValue { DoubleValue = 1.0 }, statusCode: 0x80000000); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Quality.Should().Be(Quality.Bad); + } + + [Fact] + public void ConvertVtqMessage_UncertainQualityCode() + { + var msg = CreateVtqMessage(new TypedValue { DoubleValue = 1.0 }, statusCode: 0x40000000); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Quality.Should().Be(Quality.Uncertain); + } + + [Fact] + public void ConvertVtqMessage_MapsQualityCodeCorrectly() + { + // Test that a specific non-zero Good code still maps to Good + var msg = CreateVtqMessage(new TypedValue { Int32Value = 5 }, statusCode: 0x00D80000); + + var vtq = LmxProxyClient.ConvertVtqMessage(msg); + + vtq.Quality.Should().Be(Quality.Good); + } + + private static VtqMessage CreateVtqMessage(TypedValue value, uint statusCode = 0x00000000) + { + return new VtqMessage + { + Tag = "TestTag", + Value = value, + TimestampUtcTicks = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks, + Quality = new QualityCode { StatusCode = statusCode } + }; + } +}