feat(lmxproxy): phase 5 — client core (ILmxProxyClient, connection, read/write/subscribe)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-22 00:22:29 -04:00
parent 9eb81180c0
commit 8ba75b50e8
19 changed files with 1819 additions and 0 deletions

View File

@@ -0,0 +1,13 @@
namespace ZB.MOM.WW.LmxProxy.Client;
/// <summary>
/// Configuration options for the LmxProxy client, typically set via the builder.
/// </summary>
public class ClientConfiguration
{
/// <summary>Maximum number of retry attempts for transient failures.</summary>
public int MaxRetryAttempts { get; set; } = 0;
/// <summary>Base delay between retries (exponential backoff applied).</summary>
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(1);
}

View File

@@ -0,0 +1,31 @@
namespace ZB.MOM.WW.LmxProxy.Client;
/// <summary>
/// TLS configuration for the LmxProxy gRPC client.
/// </summary>
public class ClientTlsConfiguration
{
/// <summary>Whether to use TLS for the gRPC connection.</summary>
public bool UseTls { get; set; } = false;
/// <summary>Path to the client certificate PEM file for mTLS.</summary>
public string? ClientCertificatePath { get; set; }
/// <summary>Path to the client private key PEM file for mTLS.</summary>
public string? ClientKeyPath { get; set; }
/// <summary>Path to the server CA certificate PEM file for custom trust.</summary>
public string? ServerCaCertificatePath { get; set; }
/// <summary>Override the server name used for TLS verification.</summary>
public string? ServerNameOverride { get; set; }
/// <summary>Whether to validate the server certificate.</summary>
public bool ValidateServerCertificate { get; set; } = true;
/// <summary>Whether to allow self-signed certificates.</summary>
public bool AllowSelfSignedCertificates { get; set; } = false;
/// <summary>Whether to ignore all certificate errors (dangerous).</summary>
public bool IgnoreAllCertificateErrors { get; set; } = false;
}

View File

@@ -11,4 +11,19 @@ public static class QualityExtensions
/// <summary>Returns true if quality is in the Bad family (byte < 64).</summary>
public static bool IsBad(this Quality q) => (byte)q < 64;
/// <summary>
/// Converts an OPC UA 32-bit status code to the simplified <see cref="Quality"/> enum.
/// Uses the top two bits to determine the quality family.
/// </summary>
public static Quality FromStatusCode(uint statusCode)
{
uint category = statusCode & 0xC0000000;
return category switch
{
0x00000000 => Quality.Good,
0x40000000 => Quality.Uncertain,
_ => Quality.Bad
};
}
}

View File

@@ -0,0 +1,58 @@
using ZB.MOM.WW.LmxProxy.Client.Domain;
namespace ZB.MOM.WW.LmxProxy.Client;
/// <summary>
/// Interface for LmxProxy client operations.
/// </summary>
public interface ILmxProxyClient : IDisposable, IAsyncDisposable
{
/// <summary>Gets or sets the default timeout for operations (range: 1s to 10min).</summary>
TimeSpan DefaultTimeout { get; set; }
/// <summary>Connects to the LmxProxy service and establishes a session.</summary>
Task ConnectAsync(CancellationToken cancellationToken = default);
/// <summary>Disconnects from the LmxProxy service.</summary>
Task DisconnectAsync();
/// <summary>Returns true if the client has an active session.</summary>
Task<bool> IsConnectedAsync();
/// <summary>Reads a single tag value.</summary>
Task<Vtq> ReadAsync(string address, CancellationToken cancellationToken = default);
/// <summary>Reads multiple tag values in a single batch.</summary>
Task<IDictionary<string, Vtq>> ReadBatchAsync(IEnumerable<string> addresses, CancellationToken cancellationToken = default);
/// <summary>Writes a single tag value (native TypedValue -- no string heuristics).</summary>
Task WriteAsync(string address, TypedValue value, CancellationToken cancellationToken = default);
/// <summary>Writes multiple tag values in a single batch.</summary>
Task WriteBatchAsync(IDictionary<string, TypedValue> values, CancellationToken cancellationToken = default);
/// <summary>
/// Writes a batch of values, then polls a flag tag until it matches or timeout expires.
/// Returns (writeResults, flagReached, elapsedMs).
/// </summary>
Task<WriteBatchAndWaitResponse> WriteBatchAndWaitAsync(
IDictionary<string, TypedValue> values,
string flagTag,
TypedValue flagValue,
int timeoutMs = 5000,
int pollIntervalMs = 100,
CancellationToken cancellationToken = default);
/// <summary>Subscribes to tag updates with value and error callbacks.</summary>
Task<LmxProxyClient.ISubscription> SubscribeAsync(
IEnumerable<string> addresses,
Action<string, Vtq> onUpdate,
Action<Exception>? onStreamError = null,
CancellationToken cancellationToken = default);
/// <summary>Validates an API key and returns info.</summary>
Task<LmxProxyClient.ApiKeyInfo> CheckApiKeyAsync(string apiKey, CancellationToken cancellationToken = default);
/// <summary>Returns a snapshot of client-side metrics.</summary>
Dictionary<string, object> GetMetrics();
}

View File

@@ -0,0 +1,19 @@
namespace ZB.MOM.WW.LmxProxy.Client;
public partial class LmxProxyClient
{
/// <summary>
/// Result of an API key validation check.
/// </summary>
public class ApiKeyInfo
{
/// <summary>Whether the API key is valid.</summary>
public bool IsValid { get; init; }
/// <summary>Role associated with the API key.</summary>
public string? Role { get; init; }
/// <summary>Description or message from the server.</summary>
public string? Description { get; init; }
}
}

View File

@@ -0,0 +1,82 @@
using System.Collections.Concurrent;
namespace ZB.MOM.WW.LmxProxy.Client;
public partial class LmxProxyClient
{
/// <summary>
/// Tracks per-operation counts, errors, and latency with rolling buffer and percentile support.
/// </summary>
internal class ClientMetrics
{
private readonly ConcurrentDictionary<string, long> _operationCounts = new();
private readonly ConcurrentDictionary<string, long> _errorCounts = new();
private readonly ConcurrentDictionary<string, List<long>> _latencies = new();
private readonly Lock _latencyLock = new();
public void IncrementOperationCount(string operation)
{
_operationCounts.AddOrUpdate(operation, 1, (_, count) => count + 1);
}
public void IncrementErrorCount(string operation)
{
_errorCounts.AddOrUpdate(operation, 1, (_, count) => count + 1);
}
public void RecordLatency(string operation, long milliseconds)
{
lock (_latencyLock)
{
if (!_latencies.TryGetValue(operation, out var list))
{
list = [];
_latencies[operation] = list;
}
list.Add(milliseconds);
if (list.Count > 1000)
{
list.RemoveAt(0);
}
}
}
public Dictionary<string, object> GetSnapshot()
{
var snapshot = new Dictionary<string, object>();
foreach (var kvp in _operationCounts)
{
snapshot[$"{kvp.Key}_count"] = kvp.Value;
}
foreach (var kvp in _errorCounts)
{
snapshot[$"{kvp.Key}_errors"] = kvp.Value;
}
lock (_latencyLock)
{
foreach (var kvp in _latencies)
{
var values = kvp.Value;
if (values.Count == 0) continue;
double avg = values.Average();
snapshot[$"{kvp.Key}_avg_latency_ms"] = Math.Round(avg, 2);
snapshot[$"{kvp.Key}_p95_latency_ms"] = GetPercentile(values, 95);
snapshot[$"{kvp.Key}_p99_latency_ms"] = GetPercentile(values, 99);
}
}
return snapshot;
}
private static long GetPercentile(List<long> values, int percentile)
{
var sorted = values.OrderBy(v => v).ToList();
int index = Math.Max(0, (int)Math.Ceiling(percentile / 100.0 * sorted.Count) - 1);
return sorted[index];
}
}
}

View File

@@ -0,0 +1,127 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.LmxProxy.Client.Domain;
namespace ZB.MOM.WW.LmxProxy.Client;
public partial class LmxProxyClient
{
private class CodeFirstSubscription : ISubscription
{
private readonly IScadaService _client;
private readonly string _sessionId;
private readonly List<string> _tags;
private readonly Action<string, Vtq> _onUpdate;
private readonly Action<Exception>? _onStreamError;
private readonly ILogger<LmxProxyClient> _logger;
private readonly Action<ISubscription>? _onDispose;
private readonly CancellationTokenSource _cts = new();
private Task? _processingTask;
private bool _disposed;
private bool _streamErrorFired;
public CodeFirstSubscription(
IScadaService client,
string sessionId,
List<string> tags,
Action<string, Vtq> onUpdate,
Action<Exception>? onStreamError,
ILogger<LmxProxyClient> logger,
Action<ISubscription>? onDispose)
{
_client = client;
_sessionId = sessionId;
_tags = tags;
_onUpdate = onUpdate;
_onStreamError = onStreamError;
_logger = logger;
_onDispose = onDispose;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_processingTask = ProcessUpdatesAsync(cancellationToken);
return Task.CompletedTask;
}
private async Task ProcessUpdatesAsync(CancellationToken cancellationToken)
{
try
{
var request = new SubscribeRequest
{
SessionId = _sessionId,
Tags = _tags,
SamplingMs = 1000
};
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token);
await foreach (VtqMessage vtqMsg in _client.SubscribeAsync(request, linkedCts.Token))
{
try
{
Vtq vtq = ConvertVtqMessage(vtqMsg);
_onUpdate(vtqMsg.Tag, vtq);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing subscription update for {Tag}", vtqMsg.Tag);
}
}
}
catch (OperationCanceledException) when (_cts.IsCancellationRequested || cancellationToken.IsCancellationRequested)
{
_logger.LogDebug("Subscription cancelled");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in subscription processing");
FireStreamError(ex);
}
finally
{
if (!_disposed)
{
_disposed = true;
_onDispose?.Invoke(this);
}
}
}
private void FireStreamError(Exception ex)
{
if (_streamErrorFired) return;
_streamErrorFired = true;
try { _onStreamError?.Invoke(ex); }
catch (Exception cbEx) { _logger.LogWarning(cbEx, "onStreamError callback threw"); }
}
public async Task DisposeAsync()
{
if (_disposed) return;
_disposed = true;
await _cts.CancelAsync();
if (_processingTask is not null)
{
try
{
await _processingTask.WaitAsync(TimeSpan.FromSeconds(5));
}
catch { /* swallow timeout or cancellation */ }
}
_cts.Dispose();
}
public void Dispose()
{
if (_disposed) return;
try
{
DisposeAsync().Wait(TimeSpan.FromSeconds(5));
}
catch { /* swallow */ }
}
}
}

View File

@@ -0,0 +1,219 @@
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using ProtoBuf.Grpc.Client;
using ZB.MOM.WW.LmxProxy.Client.Domain;
using ZB.MOM.WW.LmxProxy.Client.Security;
namespace ZB.MOM.WW.LmxProxy.Client;
public partial class LmxProxyClient
{
/// <inheritdoc />
public async Task ConnectAsync(CancellationToken cancellationToken = default)
{
await _connectionLock.WaitAsync(cancellationToken);
try
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (IsConnected)
return;
var endpoint = BuildEndpointUri();
_logger.LogInformation("Connecting to LmxProxy at {Endpoint}", endpoint);
GrpcChannel channel = GrpcChannelFactory.CreateChannel(endpoint, _tlsConfiguration, _logger);
IScadaService client;
try
{
client = channel.CreateGrpcService<IScadaService>();
}
catch
{
channel.Dispose();
throw;
}
ConnectResponse response;
try
{
var request = new ConnectRequest
{
ClientId = $"ScadaBridge-{Guid.NewGuid():N}",
ApiKey = _apiKey ?? string.Empty
};
response = await client.ConnectAsync(request);
}
catch
{
channel.Dispose();
throw;
}
if (!response.Success)
{
channel.Dispose();
throw new InvalidOperationException($"Connect failed: {response.Message}");
}
_channel = channel;
_client = client;
_sessionId = response.SessionId;
_isConnected = true;
StartKeepAlive();
_logger.LogInformation("Connected to LmxProxy, session={SessionId}", _sessionId);
}
catch (Exception ex)
{
_channel = null;
_client = null;
_sessionId = string.Empty;
_isConnected = false;
_logger.LogError(ex, "Failed to connect to LmxProxy");
throw;
}
finally
{
_connectionLock.Release();
}
}
/// <inheritdoc />
public async Task DisconnectAsync()
{
await _connectionLock.WaitAsync();
try
{
StopKeepAlive();
if (_client is not null && !string.IsNullOrEmpty(_sessionId))
{
try
{
await _client.DisconnectAsync(new DisconnectRequest { SessionId = _sessionId });
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error sending disconnect request");
}
}
_client = null;
_sessionId = string.Empty;
_isConnected = false;
_channel?.Dispose();
_channel = null;
}
finally
{
_connectionLock.Release();
}
}
/// <inheritdoc cref="SubscribeAsync"/>
public async Task<ISubscription> SubscribeAsync(
IEnumerable<string> addresses,
Action<string, Vtq> onUpdate,
Action<Exception>? onStreamError = null,
CancellationToken cancellationToken = default)
{
EnsureConnected();
var subscription = new CodeFirstSubscription(
_client!,
_sessionId,
addresses.ToList(),
onUpdate,
onStreamError,
_logger,
sub =>
{
lock (_subscriptionLock)
{
_activeSubscriptions.Remove(sub);
}
});
lock (_subscriptionLock)
{
_activeSubscriptions.Add(subscription);
}
await subscription.StartAsync(cancellationToken);
return subscription;
}
private void StartKeepAlive()
{
_keepAliveTimer = new Timer(
async _ => await KeepAliveCallback(),
null,
_keepAliveInterval,
_keepAliveInterval);
}
private async Task KeepAliveCallback()
{
try
{
if (_client is null || string.IsNullOrEmpty(_sessionId))
return;
await _client.GetConnectionStateAsync(new GetConnectionStateRequest { SessionId = _sessionId });
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Keep-alive failed, marking disconnected");
StopKeepAlive();
await MarkDisconnectedAsync(ex);
}
}
private void StopKeepAlive()
{
_keepAliveTimer?.Dispose();
_keepAliveTimer = null;
}
internal async Task MarkDisconnectedAsync(Exception ex)
{
if (_disposed) return;
await _connectionLock.WaitAsync();
try
{
_isConnected = false;
_client = null;
_sessionId = string.Empty;
_channel?.Dispose();
_channel = null;
}
finally
{
_connectionLock.Release();
}
List<ISubscription> subscriptions;
lock (_subscriptionLock)
{
subscriptions = [.. _activeSubscriptions];
_activeSubscriptions.Clear();
}
foreach (var sub in subscriptions)
{
try { sub.Dispose(); }
catch { /* swallow */ }
}
_logger.LogWarning(ex, "Client marked as disconnected");
}
private Uri BuildEndpointUri()
{
string scheme = _tlsConfiguration?.UseTls == true ? Uri.UriSchemeHttps : Uri.UriSchemeHttp;
return new UriBuilder { Scheme = scheme, Host = _host, Port = _port }.Uri;
}
}

View File

@@ -0,0 +1,13 @@
namespace ZB.MOM.WW.LmxProxy.Client;
public partial class LmxProxyClient
{
/// <summary>
/// Represents an active tag subscription. Dispose to unsubscribe.
/// </summary>
public interface ISubscription : IDisposable
{
/// <summary>Asynchronous disposal with cancellation support.</summary>
Task DisposeAsync();
}
}

View File

@@ -0,0 +1,314 @@
using System.Diagnostics;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Polly;
using Polly.Retry;
using ZB.MOM.WW.LmxProxy.Client.Domain;
namespace ZB.MOM.WW.LmxProxy.Client;
/// <summary>
/// gRPC client for the LmxProxy SCADA proxy service. Uses v2 protocol with native TypedValue.
/// </summary>
public partial class LmxProxyClient : ILmxProxyClient
{
private readonly ILogger<LmxProxyClient> _logger;
private readonly string _host;
private readonly int _port;
private readonly string? _apiKey;
private readonly ClientTlsConfiguration? _tlsConfiguration;
private readonly ClientMetrics _metrics = new();
private readonly SemaphoreSlim _connectionLock = new(1, 1);
private readonly List<ISubscription> _activeSubscriptions = [];
private readonly Lock _subscriptionLock = new();
private GrpcChannel? _channel;
private IScadaService? _client;
private string _sessionId = string.Empty;
private bool _disposed;
private bool _isConnected;
private TimeSpan _defaultTimeout = TimeSpan.FromSeconds(30);
private ClientConfiguration? _configuration;
private ResiliencePipeline? _resiliencePipeline;
private Timer? _keepAliveTimer;
private readonly TimeSpan _keepAliveInterval = TimeSpan.FromSeconds(30);
/// <summary>Returns true if the client has an active session and is not disposed.</summary>
public bool IsConnected => !_disposed && _isConnected && !string.IsNullOrEmpty(_sessionId);
/// <inheritdoc />
public TimeSpan DefaultTimeout
{
get => _defaultTimeout;
set
{
if (value < TimeSpan.FromSeconds(1) || value > TimeSpan.FromMinutes(10))
throw new ArgumentOutOfRangeException(nameof(value), "DefaultTimeout must be between 1 second and 10 minutes.");
_defaultTimeout = value;
}
}
/// <summary>
/// Creates a new LmxProxyClient instance.
/// </summary>
public LmxProxyClient(
string host, int port, string? apiKey,
ClientTlsConfiguration? tlsConfiguration,
ILogger<LmxProxyClient>? logger = null)
{
_host = host ?? throw new ArgumentNullException(nameof(host));
_port = port;
_apiKey = apiKey;
_tlsConfiguration = tlsConfiguration;
_logger = logger ?? NullLogger<LmxProxyClient>.Instance;
}
/// <summary>
/// Sets builder configuration including retry policies. Called internally by the builder.
/// </summary>
internal void SetBuilderConfiguration(ClientConfiguration config)
{
_configuration = config;
if (config.MaxRetryAttempts > 0)
{
_resiliencePipeline = new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = config.MaxRetryAttempts,
Delay = config.RetryDelay,
BackoffType = DelayBackoffType.Exponential,
ShouldHandle = new PredicateBuilder()
.Handle<RpcException>(ex =>
ex.StatusCode == StatusCode.Unavailable ||
ex.StatusCode == StatusCode.DeadlineExceeded ||
ex.StatusCode == StatusCode.ResourceExhausted ||
ex.StatusCode == StatusCode.Aborted),
OnRetry = args =>
{
_logger.LogWarning("Retry {Attempt} after {Delay} for {Exception}",
args.AttemptNumber, args.RetryDelay, args.Outcome.Exception?.Message);
return ValueTask.CompletedTask;
}
})
.Build();
}
}
/// <inheritdoc />
public async Task<Vtq> ReadAsync(string address, CancellationToken cancellationToken = default)
{
EnsureConnected();
_metrics.IncrementOperationCount("Read");
var sw = Stopwatch.StartNew();
try
{
var request = new ReadRequest { SessionId = _sessionId, Tag = address };
ReadResponse response = await ExecuteWithRetry(
() => _client!.ReadAsync(request).AsTask(), cancellationToken);
if (!response.Success)
throw new InvalidOperationException($"Read failed: {response.Message}");
return ConvertVtqMessage(response.Vtq);
}
catch
{
_metrics.IncrementErrorCount("Read");
throw;
}
finally
{
sw.Stop();
_metrics.RecordLatency("Read", sw.ElapsedMilliseconds);
}
}
/// <inheritdoc />
public async Task<IDictionary<string, Vtq>> ReadBatchAsync(
IEnumerable<string> addresses, CancellationToken cancellationToken = default)
{
EnsureConnected();
_metrics.IncrementOperationCount("ReadBatch");
var sw = Stopwatch.StartNew();
try
{
var request = new ReadBatchRequest { SessionId = _sessionId, Tags = addresses.ToList() };
ReadBatchResponse response = await ExecuteWithRetry(
() => _client!.ReadBatchAsync(request).AsTask(), cancellationToken);
var result = new Dictionary<string, Vtq>();
foreach (var vtqMsg in response.Vtqs)
{
result[vtqMsg.Tag] = ConvertVtqMessage(vtqMsg);
}
return result;
}
catch
{
_metrics.IncrementErrorCount("ReadBatch");
throw;
}
finally
{
sw.Stop();
_metrics.RecordLatency("ReadBatch", sw.ElapsedMilliseconds);
}
}
/// <inheritdoc />
public async Task WriteAsync(string address, TypedValue value, CancellationToken cancellationToken = default)
{
EnsureConnected();
_metrics.IncrementOperationCount("Write");
var sw = Stopwatch.StartNew();
try
{
var request = new WriteRequest { SessionId = _sessionId, Tag = address, Value = value };
WriteResponse response = await ExecuteWithRetry(
() => _client!.WriteAsync(request).AsTask(), cancellationToken);
if (!response.Success)
throw new InvalidOperationException($"Write failed: {response.Message}");
}
catch
{
_metrics.IncrementErrorCount("Write");
throw;
}
finally
{
sw.Stop();
_metrics.RecordLatency("Write", sw.ElapsedMilliseconds);
}
}
/// <inheritdoc />
public async Task WriteBatchAsync(IDictionary<string, TypedValue> values, CancellationToken cancellationToken = default)
{
EnsureConnected();
_metrics.IncrementOperationCount("WriteBatch");
var sw = Stopwatch.StartNew();
try
{
var request = new WriteBatchRequest
{
SessionId = _sessionId,
Items = values.Select(kv => new WriteItem { Tag = kv.Key, Value = kv.Value }).ToList()
};
WriteBatchResponse response = await ExecuteWithRetry(
() => _client!.WriteBatchAsync(request).AsTask(), cancellationToken);
if (!response.Success)
throw new InvalidOperationException($"WriteBatch failed: {response.Message}");
}
catch
{
_metrics.IncrementErrorCount("WriteBatch");
throw;
}
finally
{
sw.Stop();
_metrics.RecordLatency("WriteBatch", sw.ElapsedMilliseconds);
}
}
/// <inheritdoc />
public async Task<WriteBatchAndWaitResponse> WriteBatchAndWaitAsync(
IDictionary<string, TypedValue> values, string flagTag, TypedValue flagValue,
int timeoutMs = 5000, int pollIntervalMs = 100, CancellationToken cancellationToken = default)
{
EnsureConnected();
var request = new WriteBatchAndWaitRequest
{
SessionId = _sessionId,
Items = values.Select(kv => new WriteItem { Tag = kv.Key, Value = kv.Value }).ToList(),
FlagTag = flagTag,
FlagValue = flagValue,
TimeoutMs = timeoutMs,
PollIntervalMs = pollIntervalMs
};
return await ExecuteWithRetry(
() => _client!.WriteBatchAndWaitAsync(request).AsTask(), cancellationToken);
}
/// <inheritdoc />
public async Task<ApiKeyInfo> CheckApiKeyAsync(string apiKey, CancellationToken cancellationToken = default)
{
EnsureConnected();
var request = new CheckApiKeyRequest { ApiKey = apiKey };
CheckApiKeyResponse response = await _client!.CheckApiKeyAsync(request);
return new ApiKeyInfo { IsValid = response.IsValid, Description = response.Message };
}
/// <inheritdoc />
public Task<bool> IsConnectedAsync() => Task.FromResult(IsConnected);
/// <inheritdoc />
public Dictionary<string, object> GetMetrics() => _metrics.GetSnapshot();
internal static Vtq ConvertVtqMessage(VtqMessage? msg)
{
if (msg is null)
return new Vtq(null, DateTime.UtcNow, Quality.Bad);
object? value = ExtractTypedValue(msg.Value);
DateTime timestamp = msg.TimestampUtcTicks > 0
? new DateTime(msg.TimestampUtcTicks, DateTimeKind.Utc)
: DateTime.UtcNow;
Quality quality = QualityExtensions.FromStatusCode(msg.Quality?.StatusCode ?? 0x80000000u);
return new Vtq(value, timestamp, quality);
}
internal static object? ExtractTypedValue(TypedValue? tv)
{
if (tv is null) return null;
return tv.GetValueCase() switch
{
TypedValueCase.BoolValue => tv.BoolValue,
TypedValueCase.Int32Value => tv.Int32Value,
TypedValueCase.Int64Value => tv.Int64Value,
TypedValueCase.FloatValue => tv.FloatValue,
TypedValueCase.DoubleValue => tv.DoubleValue,
TypedValueCase.StringValue => tv.StringValue,
TypedValueCase.BytesValue => tv.BytesValue,
TypedValueCase.DatetimeValue => new DateTime(tv.DatetimeValue, DateTimeKind.Utc),
TypedValueCase.ArrayValue => tv.ArrayValue,
TypedValueCase.None => null,
_ => null
};
}
private async Task<T> ExecuteWithRetry<T>(Func<Task<T>> operation, CancellationToken ct)
{
if (_resiliencePipeline is not null)
{
return await _resiliencePipeline.ExecuteAsync(
async token => await operation(), ct);
}
return await operation();
}
private void EnsureConnected()
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (!IsConnected)
throw new InvalidOperationException("Client is not connected. Call ConnectAsync first.");
}
/// <inheritdoc />
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_keepAliveTimer?.Dispose();
_channel?.Dispose();
_connectionLock.Dispose();
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
if (_disposed) return;
try { await DisconnectAsync(); } catch { /* swallow */ }
Dispose();
}
}

View File

@@ -0,0 +1,103 @@
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
namespace ZB.MOM.WW.LmxProxy.Client.Security;
/// <summary>
/// Factory for creating configured gRPC channels with TLS support.
/// </summary>
internal static class GrpcChannelFactory
{
#pragma warning disable CA1810 // Initialize reference type static fields inline
static GrpcChannelFactory()
#pragma warning restore CA1810
{
// Enable HTTP/2 over plaintext for non-TLS scenarios
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
}
/// <summary>
/// Creates a <see cref="GrpcChannel"/> with the specified address and TLS configuration.
/// </summary>
public static GrpcChannel CreateChannel(Uri address, ClientTlsConfiguration? tlsConfiguration, ILogger logger)
{
var handler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = true
};
if (tlsConfiguration?.UseTls == true)
{
ConfigureTls(handler, tlsConfiguration, logger);
}
var channelOptions = new GrpcChannelOptions
{
HttpHandler = handler
};
logger.LogDebug("Creating gRPC channel to {Address}, TLS={UseTls}", address, tlsConfiguration?.UseTls ?? false);
return GrpcChannel.ForAddress(address, channelOptions);
}
private static void ConfigureTls(SocketsHttpHandler handler, ClientTlsConfiguration tls, ILogger logger)
{
handler.SslOptions = new SslClientAuthenticationOptions
{
EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13
};
if (!string.IsNullOrEmpty(tls.ServerNameOverride))
{
handler.SslOptions.TargetHost = tls.ServerNameOverride;
}
// Load client certificate for mTLS
if (!string.IsNullOrEmpty(tls.ClientCertificatePath) && !string.IsNullOrEmpty(tls.ClientKeyPath))
{
var clientCert = X509Certificate2.CreateFromPemFile(tls.ClientCertificatePath, tls.ClientKeyPath);
handler.SslOptions.ClientCertificates = [clientCert];
logger.LogDebug("Loaded client certificate for mTLS from {Path}", tls.ClientCertificatePath);
}
// Certificate validation callback
handler.SslOptions.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) =>
{
if (tls.IgnoreAllCertificateErrors)
{
logger.LogWarning("Ignoring all certificate errors (IgnoreAllCertificateErrors=true)");
return true;
}
if (!tls.ValidateServerCertificate)
{
return true;
}
if (sslPolicyErrors == SslPolicyErrors.None)
return true;
// Custom CA trust store
if (!string.IsNullOrEmpty(tls.ServerCaCertificatePath) && certificate is not null)
{
using var customChain = new X509Chain();
customChain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
customChain.ChainPolicy.CustomTrustStore.Add(X509CertificateLoader.LoadCertificateFromFile(tls.ServerCaCertificatePath));
if (customChain.Build(new X509Certificate2(certificate)))
return true;
}
if (tls.AllowSelfSignedCertificates && sslPolicyErrors == SslPolicyErrors.RemoteCertificateChainErrors)
{
logger.LogWarning("Allowing self-signed certificate");
return true;
}
logger.LogError("Certificate validation failed: {Errors}", sslPolicyErrors);
return false;
};
}
}

View File

@@ -14,6 +14,10 @@
<Platforms>AnyCPU</Platforms>
</PropertyGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.LmxProxy.Client.Tests" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Grpc.Core.Api" Version="2.71.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.71.0" />

View File

@@ -0,0 +1,122 @@
using FluentAssertions;
using Xunit;
namespace ZB.MOM.WW.LmxProxy.Client.Tests;
public class ClientMetricsTests
{
private static LmxProxyClient.ClientMetrics CreateMetrics() => new();
[Fact]
public void IncrementOperationCount_Increments()
{
var metrics = CreateMetrics();
metrics.IncrementOperationCount("Read");
metrics.IncrementOperationCount("Read");
metrics.IncrementOperationCount("Read");
var snapshot = metrics.GetSnapshot();
snapshot["Read_count"].Should().Be(3L);
}
[Fact]
public void IncrementErrorCount_Increments()
{
var metrics = CreateMetrics();
metrics.IncrementErrorCount("Write");
metrics.IncrementErrorCount("Write");
var snapshot = metrics.GetSnapshot();
snapshot["Write_errors"].Should().Be(2L);
}
[Fact]
public void RecordLatency_StoresValues()
{
var metrics = CreateMetrics();
metrics.RecordLatency("Read", 10);
metrics.RecordLatency("Read", 20);
metrics.RecordLatency("Read", 30);
var snapshot = metrics.GetSnapshot();
snapshot.Should().ContainKey("Read_avg_latency_ms");
snapshot.Should().ContainKey("Read_p95_latency_ms");
snapshot.Should().ContainKey("Read_p99_latency_ms");
var avg = (double)snapshot["Read_avg_latency_ms"];
avg.Should().BeApproximately(20.0, 0.1);
}
[Fact]
public void RollingBuffer_CapsAt1000()
{
var metrics = CreateMetrics();
for (int i = 0; i < 1100; i++)
{
metrics.RecordLatency("Read", i);
}
var snapshot = metrics.GetSnapshot();
// After 1100 entries, the buffer should have capped at 1000 (oldest removed)
// The earliest remaining value should be 100 (entries 0-99 were evicted)
var p95 = (long)snapshot["Read_p95_latency_ms"];
// p95 of values 100-1099 should be around 1050
p95.Should().BeGreaterThan(900);
}
[Fact]
public void GetSnapshot_IncludesP95AndP99()
{
var metrics = CreateMetrics();
// Add 100 values: 1, 2, 3, ..., 100
for (int i = 1; i <= 100; i++)
{
metrics.RecordLatency("Op", i);
}
var snapshot = metrics.GetSnapshot();
var p95 = (long)snapshot["Op_p95_latency_ms"];
var p99 = (long)snapshot["Op_p99_latency_ms"];
// P95 of 1..100 should be 95
p95.Should().Be(95);
// P99 of 1..100 should be 99
p99.Should().Be(99);
}
[Fact]
public void GetSnapshot_ReturnsEmptyForNoData()
{
var metrics = CreateMetrics();
var snapshot = metrics.GetSnapshot();
snapshot.Should().BeEmpty();
}
[Fact]
public void GetSnapshot_TracksMultipleOperations()
{
var metrics = CreateMetrics();
metrics.IncrementOperationCount("Read");
metrics.IncrementOperationCount("Write");
metrics.IncrementErrorCount("Read");
metrics.RecordLatency("Read", 10);
metrics.RecordLatency("Write", 20);
var snapshot = metrics.GetSnapshot();
snapshot["Read_count"].Should().Be(1L);
snapshot["Write_count"].Should().Be(1L);
snapshot["Read_errors"].Should().Be(1L);
snapshot.Should().ContainKey("Read_avg_latency_ms");
snapshot.Should().ContainKey("Write_avg_latency_ms");
}
}

View File

@@ -0,0 +1,112 @@
using System.Runtime.CompilerServices;
using ZB.MOM.WW.LmxProxy.Client.Domain;
namespace ZB.MOM.WW.LmxProxy.Client.Tests.Fakes;
/// <summary>
/// Hand-written fake implementation of IScadaService for unit testing.
/// </summary>
internal class FakeScadaService : IScadaService
{
// Configure responses
public ConnectResponse ConnectResponseToReturn { get; set; } = new() { Success = true, SessionId = "test-session-123", Message = "OK" };
public DisconnectResponse DisconnectResponseToReturn { get; set; } = new() { Success = true, Message = "OK" };
public GetConnectionStateResponse GetConnectionStateResponseToReturn { get; set; } = new() { IsConnected = true };
public ReadResponse ReadResponseToReturn { get; set; } = new() { Success = true };
public ReadBatchResponse ReadBatchResponseToReturn { get; set; } = new() { Success = true };
public WriteResponse WriteResponseToReturn { get; set; } = new() { Success = true };
public WriteBatchResponse WriteBatchResponseToReturn { get; set; } = new() { Success = true };
public WriteBatchAndWaitResponse WriteBatchAndWaitResponseToReturn { get; set; } = new() { Success = true };
public CheckApiKeyResponse CheckApiKeyResponseToReturn { get; set; } = new() { IsValid = true, Message = "Valid" };
// Track calls
public List<ConnectRequest> ConnectCalls { get; } = [];
public List<DisconnectRequest> DisconnectCalls { get; } = [];
public List<GetConnectionStateRequest> GetConnectionStateCalls { get; } = [];
public List<ReadRequest> ReadCalls { get; } = [];
public List<ReadBatchRequest> ReadBatchCalls { get; } = [];
public List<WriteRequest> WriteCalls { get; } = [];
public List<WriteBatchRequest> WriteBatchCalls { get; } = [];
public List<WriteBatchAndWaitRequest> WriteBatchAndWaitCalls { get; } = [];
public List<CheckApiKeyRequest> CheckApiKeyCalls { get; } = [];
public List<SubscribeRequest> SubscribeCalls { get; } = [];
// Error injection
public Exception? GetConnectionStateException { get; set; }
// Subscription data
public List<VtqMessage> SubscriptionMessages { get; set; } = [];
public Exception? SubscriptionException { get; set; }
public ValueTask<ConnectResponse> ConnectAsync(ConnectRequest request)
{
ConnectCalls.Add(request);
return new ValueTask<ConnectResponse>(ConnectResponseToReturn);
}
public ValueTask<DisconnectResponse> DisconnectAsync(DisconnectRequest request)
{
DisconnectCalls.Add(request);
return new ValueTask<DisconnectResponse>(DisconnectResponseToReturn);
}
public ValueTask<GetConnectionStateResponse> GetConnectionStateAsync(GetConnectionStateRequest request)
{
GetConnectionStateCalls.Add(request);
if (GetConnectionStateException is not null)
throw GetConnectionStateException;
return new ValueTask<GetConnectionStateResponse>(GetConnectionStateResponseToReturn);
}
public ValueTask<ReadResponse> ReadAsync(ReadRequest request)
{
ReadCalls.Add(request);
return new ValueTask<ReadResponse>(ReadResponseToReturn);
}
public ValueTask<ReadBatchResponse> ReadBatchAsync(ReadBatchRequest request)
{
ReadBatchCalls.Add(request);
return new ValueTask<ReadBatchResponse>(ReadBatchResponseToReturn);
}
public ValueTask<WriteResponse> WriteAsync(WriteRequest request)
{
WriteCalls.Add(request);
return new ValueTask<WriteResponse>(WriteResponseToReturn);
}
public ValueTask<WriteBatchResponse> WriteBatchAsync(WriteBatchRequest request)
{
WriteBatchCalls.Add(request);
return new ValueTask<WriteBatchResponse>(WriteBatchResponseToReturn);
}
public ValueTask<WriteBatchAndWaitResponse> WriteBatchAndWaitAsync(WriteBatchAndWaitRequest request)
{
WriteBatchAndWaitCalls.Add(request);
return new ValueTask<WriteBatchAndWaitResponse>(WriteBatchAndWaitResponseToReturn);
}
public ValueTask<CheckApiKeyResponse> CheckApiKeyAsync(CheckApiKeyRequest request)
{
CheckApiKeyCalls.Add(request);
return new ValueTask<CheckApiKeyResponse>(CheckApiKeyResponseToReturn);
}
public async IAsyncEnumerable<VtqMessage> SubscribeAsync(
SubscribeRequest request, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
SubscribeCalls.Add(request);
foreach (var msg in SubscriptionMessages)
{
cancellationToken.ThrowIfCancellationRequested();
yield return msg;
await Task.Yield();
}
if (SubscriptionException is not null)
throw SubscriptionException;
}
}

View File

@@ -0,0 +1,50 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.LmxProxy.Client.Domain;
namespace ZB.MOM.WW.LmxProxy.Client.Tests.Fakes;
/// <summary>
/// Helper to create an LmxProxyClient wired to a FakeScadaService, bypassing real gRPC.
/// Uses reflection to set private fields since the client has no test seam for IScadaService injection.
/// </summary>
internal static class TestableClient
{
/// <summary>
/// Creates an LmxProxyClient with a fake service injected into its internal state,
/// simulating a connected client.
/// </summary>
public static (LmxProxyClient Client, FakeScadaService Fake) CreateConnected(
string sessionId = "test-session-123",
ILogger<LmxProxyClient>? logger = null)
{
var fake = new FakeScadaService
{
ConnectResponseToReturn = new ConnectResponse
{
Success = true,
SessionId = sessionId,
Message = "OK"
}
};
var client = new LmxProxyClient("localhost", 50051, "test-key", null, logger);
// Use reflection to inject fake service and simulate connected state
var clientType = typeof(LmxProxyClient);
var clientField = clientType.GetField("_client",
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!;
clientField.SetValue(client, fake);
var sessionField = clientType.GetField("_sessionId",
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!;
sessionField.SetValue(client, sessionId);
var connectedField = clientType.GetField("_isConnected",
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!;
connectedField.SetValue(client, true);
return (client, fake);
}
}

View File

@@ -0,0 +1,103 @@
using FluentAssertions;
using Xunit;
using ZB.MOM.WW.LmxProxy.Client.Domain;
using ZB.MOM.WW.LmxProxy.Client.Tests.Fakes;
namespace ZB.MOM.WW.LmxProxy.Client.Tests;
public class LmxProxyClientConnectionTests
{
[Fact]
public async Task IsConnectedAsync_ReturnsFalseBeforeConnect()
{
var client = new LmxProxyClient("localhost", 50051, null, null);
var result = await client.IsConnectedAsync();
result.Should().BeFalse();
client.Dispose();
}
[Fact]
public async Task IsConnectedAsync_ReturnsTrueAfterInjection()
{
var (client, _) = TestableClient.CreateConnected();
var result = await client.IsConnectedAsync();
result.Should().BeTrue();
client.Dispose();
}
[Fact]
public async Task DisconnectAsync_SendsDisconnectAndClearsState()
{
var (client, fake) = TestableClient.CreateConnected();
await client.DisconnectAsync();
fake.DisconnectCalls.Should().HaveCount(1);
fake.DisconnectCalls[0].SessionId.Should().Be("test-session-123");
client.IsConnected.Should().BeFalse();
client.Dispose();
}
[Fact]
public async Task DisconnectAsync_SwallowsExceptions()
{
var (client, fake) = TestableClient.CreateConnected();
fake.DisconnectResponseToReturn = null!; // Force an error path
// Should not throw
var act = () => client.DisconnectAsync();
await act.Should().NotThrowAsync();
client.Dispose();
}
[Fact]
public void IsConnected_ReturnsFalseAfterDispose()
{
var (client, _) = TestableClient.CreateConnected();
client.Dispose();
client.IsConnected.Should().BeFalse();
}
[Fact]
public async Task MarkDisconnectedAsync_ClearsConnectionState()
{
var (client, _) = TestableClient.CreateConnected();
await client.MarkDisconnectedAsync(new Exception("connection lost"));
client.IsConnected.Should().BeFalse();
client.Dispose();
}
[Fact]
public void DefaultTimeout_RejectsOutOfRange()
{
var client = new LmxProxyClient("localhost", 50051, null, null);
var act = () => client.DefaultTimeout = TimeSpan.FromMilliseconds(500);
act.Should().Throw<ArgumentOutOfRangeException>();
var act2 = () => client.DefaultTimeout = TimeSpan.FromMinutes(11);
act2.Should().Throw<ArgumentOutOfRangeException>();
client.Dispose();
}
[Fact]
public void DefaultTimeout_AcceptsValidRange()
{
var client = new LmxProxyClient("localhost", 50051, null, null);
client.DefaultTimeout = TimeSpan.FromSeconds(5);
client.DefaultTimeout.Should().Be(TimeSpan.FromSeconds(5));
client.Dispose();
}
}

View File

@@ -0,0 +1,177 @@
using FluentAssertions;
using Xunit;
using ZB.MOM.WW.LmxProxy.Client.Domain;
using ZB.MOM.WW.LmxProxy.Client.Tests.Fakes;
namespace ZB.MOM.WW.LmxProxy.Client.Tests;
public class LmxProxyClientReadWriteTests
{
[Fact]
public async Task ReadAsync_ReturnsVtqFromResponse()
{
var (client, fake) = TestableClient.CreateConnected();
fake.ReadResponseToReturn = new ReadResponse
{
Success = true,
Vtq = new VtqMessage
{
Tag = "TestTag",
Value = new TypedValue { DoubleValue = 42.5 },
TimestampUtcTicks = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks,
Quality = new QualityCode { StatusCode = 0x00000000 }
}
};
var result = await client.ReadAsync("TestTag");
result.Value.Should().Be(42.5);
result.Quality.Should().Be(Quality.Good);
fake.ReadCalls.Should().HaveCount(1);
fake.ReadCalls[0].Tag.Should().Be("TestTag");
fake.ReadCalls[0].SessionId.Should().Be("test-session-123");
client.Dispose();
}
[Fact]
public async Task ReadAsync_ThrowsOnFailureResponse()
{
var (client, fake) = TestableClient.CreateConnected();
fake.ReadResponseToReturn = new ReadResponse { Success = false, Message = "Tag not found" };
var act = () => client.ReadAsync("BadTag");
await act.Should().ThrowAsync<InvalidOperationException>()
.WithMessage("*Tag not found*");
client.Dispose();
}
[Fact]
public async Task ReadAsync_ThrowsWhenNotConnected()
{
var client = new LmxProxyClient("localhost", 50051, null, null);
var act = () => client.ReadAsync("AnyTag");
await act.Should().ThrowAsync<InvalidOperationException>()
.WithMessage("*not connected*");
client.Dispose();
}
[Fact]
public async Task ReadBatchAsync_ReturnsDictionaryOfVtqs()
{
var (client, fake) = TestableClient.CreateConnected();
fake.ReadBatchResponseToReturn = new ReadBatchResponse
{
Success = true,
Vtqs =
[
new VtqMessage
{
Tag = "Tag1",
Value = new TypedValue { Int32Value = 100 },
TimestampUtcTicks = DateTime.UtcNow.Ticks,
Quality = new QualityCode { StatusCode = 0x00000000 }
},
new VtqMessage
{
Tag = "Tag2",
Value = new TypedValue { BoolValue = true },
TimestampUtcTicks = DateTime.UtcNow.Ticks,
Quality = new QualityCode { StatusCode = 0x00000000 }
}
]
};
var result = await client.ReadBatchAsync(["Tag1", "Tag2"]);
result.Should().HaveCount(2);
result["Tag1"].Value.Should().Be(100);
result["Tag2"].Value.Should().Be(true);
client.Dispose();
}
[Fact]
public async Task WriteAsync_SendsTypedValueDirectly()
{
var (client, fake) = TestableClient.CreateConnected();
var typedValue = new TypedValue { DoubleValue = 99.9 };
await client.WriteAsync("TestTag", typedValue);
fake.WriteCalls.Should().HaveCount(1);
fake.WriteCalls[0].Tag.Should().Be("TestTag");
fake.WriteCalls[0].Value.Should().NotBeNull();
fake.WriteCalls[0].Value!.DoubleValue.Should().Be(99.9);
client.Dispose();
}
[Fact]
public async Task WriteAsync_ThrowsOnFailureResponse()
{
var (client, fake) = TestableClient.CreateConnected();
fake.WriteResponseToReturn = new WriteResponse { Success = false, Message = "Write error" };
var act = () => client.WriteAsync("Tag", new TypedValue { Int32Value = 1 });
await act.Should().ThrowAsync<InvalidOperationException>()
.WithMessage("*Write error*");
client.Dispose();
}
[Fact]
public async Task WriteBatchAsync_SendsAllItems()
{
var (client, fake) = TestableClient.CreateConnected();
var values = new Dictionary<string, TypedValue>
{
["Tag1"] = new TypedValue { DoubleValue = 1.0 },
["Tag2"] = new TypedValue { Int32Value = 2 },
["Tag3"] = new TypedValue { BoolValue = true }
};
await client.WriteBatchAsync(values);
fake.WriteBatchCalls.Should().HaveCount(1);
fake.WriteBatchCalls[0].Items.Should().HaveCount(3);
client.Dispose();
}
[Fact]
public async Task WriteBatchAndWaitAsync_ReturnsResponse()
{
var (client, fake) = TestableClient.CreateConnected();
fake.WriteBatchAndWaitResponseToReturn = new WriteBatchAndWaitResponse
{
Success = true,
FlagReached = true,
ElapsedMs = 150,
WriteResults = [new WriteResult { Tag = "Tag1", Success = true }]
};
var values = new Dictionary<string, TypedValue>
{
["Tag1"] = new TypedValue { Int32Value = 1 }
};
var result = await client.WriteBatchAndWaitAsync(
values, "FlagTag", new TypedValue { BoolValue = true });
result.FlagReached.Should().BeTrue();
result.ElapsedMs.Should().Be(150);
client.Dispose();
}
[Fact]
public async Task CheckApiKeyAsync_ReturnsApiKeyInfo()
{
var (client, fake) = TestableClient.CreateConnected();
fake.CheckApiKeyResponseToReturn = new CheckApiKeyResponse { IsValid = true, Message = "Admin key" };
var result = await client.CheckApiKeyAsync("my-api-key");
result.IsValid.Should().BeTrue();
result.Description.Should().Be("Admin key");
client.Dispose();
}
}

View File

@@ -0,0 +1,100 @@
using FluentAssertions;
using Xunit;
using ZB.MOM.WW.LmxProxy.Client.Domain;
using ZB.MOM.WW.LmxProxy.Client.Tests.Fakes;
namespace ZB.MOM.WW.LmxProxy.Client.Tests;
public class LmxProxyClientSubscriptionTests
{
[Fact]
public async Task SubscribeAsync_InvokesCallbackForEachUpdate()
{
var (client, fake) = TestableClient.CreateConnected();
fake.SubscriptionMessages =
[
new VtqMessage
{
Tag = "Tag1",
Value = new TypedValue { DoubleValue = 1.0 },
TimestampUtcTicks = DateTime.UtcNow.Ticks,
Quality = new QualityCode { StatusCode = 0x00000000 }
},
new VtqMessage
{
Tag = "Tag2",
Value = new TypedValue { Int32Value = 42 },
TimestampUtcTicks = DateTime.UtcNow.Ticks,
Quality = new QualityCode { StatusCode = 0x00000000 }
}
];
var updates = new List<(string Tag, Vtq Vtq)>();
var subscription = await client.SubscribeAsync(
["Tag1", "Tag2"],
(tag, vtq) => updates.Add((tag, vtq)));
// Wait for processing to complete (fake yields all then stops)
await Task.Delay(500);
updates.Should().HaveCount(2);
updates[0].Tag.Should().Be("Tag1");
updates[0].Vtq.Value.Should().Be(1.0);
updates[1].Tag.Should().Be("Tag2");
updates[1].Vtq.Value.Should().Be(42);
subscription.Dispose();
client.Dispose();
}
[Fact]
public async Task SubscribeAsync_InvokesStreamErrorOnFailure()
{
var (client, fake) = TestableClient.CreateConnected();
fake.SubscriptionException = new InvalidOperationException("Stream broke");
Exception? capturedError = null;
var subscription = await client.SubscribeAsync(
["Tag1"],
(_, _) => { },
ex => capturedError = ex);
// Wait for error to propagate
await Task.Delay(500);
capturedError.Should().NotBeNull();
capturedError.Should().BeOfType<InvalidOperationException>();
capturedError!.Message.Should().Be("Stream broke");
subscription.Dispose();
client.Dispose();
}
[Fact]
public async Task SubscribeAsync_DisposeStopsProcessing()
{
var (client, fake) = TestableClient.CreateConnected();
// Provide many messages but we'll dispose early
fake.SubscriptionMessages =
[
new VtqMessage
{
Tag = "Tag1",
Value = new TypedValue { DoubleValue = 1.0 },
TimestampUtcTicks = DateTime.UtcNow.Ticks,
Quality = new QualityCode { StatusCode = 0x00000000 }
}
];
var updates = new List<(string Tag, Vtq Vtq)>();
var subscription = await client.SubscribeAsync(
["Tag1"],
(tag, vtq) => updates.Add((tag, vtq)));
// Dispose immediately
subscription.Dispose();
// Should not throw
client.Dispose();
}
}

View File

@@ -0,0 +1,157 @@
using FluentAssertions;
using Xunit;
using ZB.MOM.WW.LmxProxy.Client.Domain;
namespace ZB.MOM.WW.LmxProxy.Client.Tests;
public class TypedValueConversionTests
{
[Fact]
public void ConvertVtqMessage_ExtractsBoolValue()
{
var msg = CreateVtqMessage(new TypedValue { BoolValue = true });
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Value.Should().Be(true);
}
[Fact]
public void ConvertVtqMessage_ExtractsInt32Value()
{
var msg = CreateVtqMessage(new TypedValue { Int32Value = 42 });
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Value.Should().Be(42);
}
[Fact]
public void ConvertVtqMessage_ExtractsInt64Value()
{
var msg = CreateVtqMessage(new TypedValue { Int64Value = long.MaxValue });
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Value.Should().Be(long.MaxValue);
}
[Fact]
public void ConvertVtqMessage_ExtractsFloatValue()
{
var msg = CreateVtqMessage(new TypedValue { FloatValue = 3.14f });
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Value.Should().Be(3.14f);
}
[Fact]
public void ConvertVtqMessage_ExtractsDoubleValue()
{
var msg = CreateVtqMessage(new TypedValue { DoubleValue = 99.99 });
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Value.Should().Be(99.99);
}
[Fact]
public void ConvertVtqMessage_ExtractsStringValue()
{
var msg = CreateVtqMessage(new TypedValue { StringValue = "hello" });
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Value.Should().Be("hello");
}
[Fact]
public void ConvertVtqMessage_ExtractsDateTimeValue()
{
var dt = new DateTime(2026, 3, 22, 12, 0, 0, DateTimeKind.Utc);
var msg = CreateVtqMessage(new TypedValue { DatetimeValue = dt.Ticks });
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Value.Should().BeOfType<DateTime>();
((DateTime)vtq.Value!).Should().Be(dt);
}
[Fact]
public void ConvertVtqMessage_HandlesNullTypedValue()
{
var msg = new VtqMessage
{
Tag = "NullTag",
Value = null,
TimestampUtcTicks = DateTime.UtcNow.Ticks,
Quality = new QualityCode { StatusCode = 0x00000000 }
};
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Value.Should().BeNull();
}
[Fact]
public void ConvertVtqMessage_HandlesNullMessage()
{
var vtq = LmxProxyClient.ConvertVtqMessage(null);
vtq.Value.Should().BeNull();
vtq.Quality.Should().Be(Quality.Bad);
}
[Fact]
public void ConvertVtqMessage_GoodQualityCode()
{
var msg = CreateVtqMessage(new TypedValue { DoubleValue = 1.0 }, statusCode: 0x00000000);
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Quality.Should().Be(Quality.Good);
}
[Fact]
public void ConvertVtqMessage_BadQualityCode()
{
var msg = CreateVtqMessage(new TypedValue { DoubleValue = 1.0 }, statusCode: 0x80000000);
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Quality.Should().Be(Quality.Bad);
}
[Fact]
public void ConvertVtqMessage_UncertainQualityCode()
{
var msg = CreateVtqMessage(new TypedValue { DoubleValue = 1.0 }, statusCode: 0x40000000);
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Quality.Should().Be(Quality.Uncertain);
}
[Fact]
public void ConvertVtqMessage_MapsQualityCodeCorrectly()
{
// Test that a specific non-zero Good code still maps to Good
var msg = CreateVtqMessage(new TypedValue { Int32Value = 5 }, statusCode: 0x00D80000);
var vtq = LmxProxyClient.ConvertVtqMessage(msg);
vtq.Quality.Should().Be(Quality.Good);
}
private static VtqMessage CreateVtqMessage(TypedValue value, uint statusCode = 0x00000000)
{
return new VtqMessage
{
Tag = "TestTag",
Value = value,
TimestampUtcTicks = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks,
Quality = new QualityCode { StatusCode = statusCode }
};
}
}