From 779598d96209e287eb91b68879c3a47295f10a3b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Mar 2026 01:11:44 -0400 Subject: [PATCH] =?UTF-8?q?feat(lmxproxy):=20phase=207=20=E2=80=94=20integ?= =?UTF-8?q?ration=20tests,=20deployment=20to=20windev,=20v1=20cutover?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replaced STA dispatch thread with Task.Run pattern for COM interop - Fixed TypedValue oneof tracking with property-level _setCase field - Added x-api-key DelegatingHandler for gRPC metadata authentication - Fixed CheckApiKey RPC to validate request body key (not header) - Integration tests: 15/17 pass (reads, subscribes, API keys, connections) - 2 write tests pending (OnWriteComplete callback timing issue) - v2 service deployed on windev port 50100 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Domain/ScadaContracts.cs | 51 +++-- .../LmxProxyClient.Connection.cs | 2 +- .../Security/GrpcChannelFactory.cs | 34 +++- .../Grpc/Services/ScadaGrpcService.cs | 12 +- .../LmxProxyService.cs | 2 +- .../MxAccess/MxAccessClient.Connection.cs | 155 +++++++++------ .../MxAccess/MxAccessClient.EventHandlers.cs | 103 ++++++++-- .../MxAccess/MxAccessClient.ReadWrite.cs | 177 +++++++++++++++--- .../MxAccess/MxAccessClient.Subscription.cs | 107 +++++++---- .../MxAccess/MxAccessClient.cs | 19 +- .../MxAccess/StaDispatchThread.cs | 123 ------------ .../ZB.MOM.WW.LmxProxy.Host.csproj | 1 - .../appsettings.test.json | 8 +- .../MxAccess/StaDispatchThreadTests.cs | 86 --------- 14 files changed, 497 insertions(+), 383 deletions(-) delete mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs delete mode 100644 lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Domain/ScadaContracts.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Domain/ScadaContracts.cs index a2f443d..c4b7ded 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Domain/ScadaContracts.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Domain/ScadaContracts.cs @@ -37,52 +37,51 @@ public interface IScadaService [DataContract] public class TypedValue { + // Tracks which oneof field was set (by property setter during deserialization or manual assignment). + private TypedValueCase _setCase = TypedValueCase.None; + + private bool _boolValue; + private int _int32Value; + private long _int64Value; + private float _floatValue; + private double _doubleValue; + private string? _stringValue; + private byte[]? _bytesValue; + private long _datetimeValue; + private ArrayValue? _arrayValue; + [DataMember(Order = 1)] - public bool BoolValue { get; set; } + public bool BoolValue { get => _boolValue; set { _boolValue = value; _setCase = TypedValueCase.BoolValue; } } [DataMember(Order = 2)] - public int Int32Value { get; set; } + public int Int32Value { get => _int32Value; set { _int32Value = value; _setCase = TypedValueCase.Int32Value; } } [DataMember(Order = 3)] - public long Int64Value { get; set; } + public long Int64Value { get => _int64Value; set { _int64Value = value; _setCase = TypedValueCase.Int64Value; } } [DataMember(Order = 4)] - public float FloatValue { get; set; } + public float FloatValue { get => _floatValue; set { _floatValue = value; _setCase = TypedValueCase.FloatValue; } } [DataMember(Order = 5)] - public double DoubleValue { get; set; } + public double DoubleValue { get => _doubleValue; set { _doubleValue = value; _setCase = TypedValueCase.DoubleValue; } } [DataMember(Order = 6)] - public string? StringValue { get; set; } + public string? StringValue { get => _stringValue; set { _stringValue = value; _setCase = TypedValueCase.StringValue; } } [DataMember(Order = 7)] - public byte[]? BytesValue { get; set; } + public byte[]? BytesValue { get => _bytesValue; set { _bytesValue = value; _setCase = TypedValueCase.BytesValue; } } [DataMember(Order = 8)] - public long DatetimeValue { get; set; } + public long DatetimeValue { get => _datetimeValue; set { _datetimeValue = value; _setCase = TypedValueCase.DatetimeValue; } } [DataMember(Order = 9)] - public ArrayValue? ArrayValue { get; set; } + public ArrayValue? ArrayValue { get => _arrayValue; set { _arrayValue = value; _setCase = TypedValueCase.ArrayValue; } } /// - /// Indicates which oneof case is set. Determined by checking non-default values. - /// This is NOT a wire field -- it's a convenience helper. + /// Indicates which oneof case is set. Tracked via property setters so default values + /// (false, 0, 0.0) are correctly distinguished from "not set". /// - public TypedValueCase GetValueCase() - { - // Check in reverse priority order to handle protobuf oneof semantics. - // For the oneof, only one should be set at a time. - if (ArrayValue != null) return TypedValueCase.ArrayValue; - if (DatetimeValue != 0) return TypedValueCase.DatetimeValue; - if (BytesValue != null) return TypedValueCase.BytesValue; - if (StringValue != null) return TypedValueCase.StringValue; - if (DoubleValue != 0d) return TypedValueCase.DoubleValue; - if (FloatValue != 0f) return TypedValueCase.FloatValue; - if (Int64Value != 0) return TypedValueCase.Int64Value; - if (Int32Value != 0) return TypedValueCase.Int32Value; - if (BoolValue) return TypedValueCase.BoolValue; - return TypedValueCase.None; - } + public TypedValueCase GetValueCase() => _setCase; } /// Identifies which field in TypedValue is set. diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.Connection.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.Connection.cs index b3356b9..1b2bc9d 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.Connection.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/LmxProxyClient.Connection.cs @@ -22,7 +22,7 @@ public partial class LmxProxyClient var endpoint = BuildEndpointUri(); _logger.LogInformation("Connecting to LmxProxy at {Endpoint}", endpoint); - GrpcChannel channel = GrpcChannelFactory.CreateChannel(endpoint, _tlsConfiguration, _logger); + GrpcChannel channel = GrpcChannelFactory.CreateChannel(endpoint, _tlsConfiguration, _logger, _apiKey); IScadaService client; try { diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Security/GrpcChannelFactory.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Security/GrpcChannelFactory.cs index 3027e24..d24ce4e 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Security/GrpcChannelFactory.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Client/Security/GrpcChannelFactory.cs @@ -20,9 +20,9 @@ internal static class GrpcChannelFactory } /// - /// Creates a with the specified address and TLS configuration. + /// Creates a with the specified address, TLS configuration, and optional API key header. /// - public static GrpcChannel CreateChannel(Uri address, ClientTlsConfiguration? tlsConfiguration, ILogger logger) + public static GrpcChannel CreateChannel(Uri address, ClientTlsConfiguration? tlsConfiguration, ILogger logger, string? apiKey = null) { var handler = new SocketsHttpHandler { @@ -34,15 +34,43 @@ internal static class GrpcChannelFactory ConfigureTls(handler, tlsConfiguration, logger); } + HttpMessageHandler finalHandler = handler; + + // Add API key header to all outgoing requests if provided + if (!string.IsNullOrEmpty(apiKey)) + { + finalHandler = new ApiKeyDelegatingHandler(apiKey, handler); + } + var channelOptions = new GrpcChannelOptions { - HttpHandler = handler + HttpHandler = finalHandler }; logger.LogDebug("Creating gRPC channel to {Address}, TLS={UseTls}", address, tlsConfiguration?.UseTls ?? false); return GrpcChannel.ForAddress(address, channelOptions); } + /// + /// DelegatingHandler that adds the x-api-key header to all outgoing requests. + /// + private sealed class ApiKeyDelegatingHandler : DelegatingHandler + { + private readonly string _apiKey; + + public ApiKeyDelegatingHandler(string apiKey, HttpMessageHandler innerHandler) + : base(innerHandler) + { + _apiKey = apiKey; + } + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + request.Headers.TryAddWithoutValidation("x-api-key", _apiKey); + return base.SendAsync(request, cancellationToken); + } + } + private static void ConfigureTls(SocketsHttpHandler handler, ClientTlsConfiguration tls, ILogger logger) { handler.SslOptions = new SslClientAuthenticationOptions diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs index 74200dc..63fdd61 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs @@ -8,6 +8,7 @@ using Serilog; using ZB.MOM.WW.LmxProxy.Host.Domain; using ZB.MOM.WW.LmxProxy.Host.Metrics; using ZB.MOM.WW.LmxProxy.Host.Sessions; +using ZB.MOM.WW.LmxProxy.Host.Security; using ZB.MOM.WW.LmxProxy.Host.Subscriptions; namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services @@ -24,17 +25,20 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services private readonly SessionManager _sessionManager; private readonly SubscriptionManager _subscriptionManager; private readonly PerformanceMetrics? _performanceMetrics; + private readonly ApiKeyService? _apiKeyService; public ScadaGrpcService( IScadaClient scadaClient, SessionManager sessionManager, SubscriptionManager subscriptionManager, - PerformanceMetrics? performanceMetrics = null) + PerformanceMetrics? performanceMetrics = null, + ApiKeyService? apiKeyService = null) { _scadaClient = scadaClient; _sessionManager = sessionManager; _subscriptionManager = subscriptionManager; _performanceMetrics = performanceMetrics; + _apiKeyService = apiKeyService; } // -- Connection Management ------------------------------------ @@ -390,10 +394,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services public override Task CheckApiKey( Scada.CheckApiKeyRequest request, ServerCallContext context) { - // The interceptor already validated the x-api-key header. - // This RPC lets clients explicitly check a specific key. - // The validated key from the interceptor is in context.UserState. - var isValid = context.UserState.ContainsKey("ApiKey"); + // Check the API key from the request body against the key store. + var isValid = _apiKeyService != null && _apiKeyService.ValidateApiKey(request.ApiKey) != null; return Task.FromResult(new Scada.CheckApiKeyResponse { IsValid = isValid, diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs index 31025e6..9191571 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs @@ -127,7 +127,7 @@ namespace ZB.MOM.WW.LmxProxy.Host // 13. Create gRPC service var grpcService = new ScadaGrpcService( - _mxAccessClient, _sessionManager, _subscriptionManager, _performanceMetrics); + _mxAccessClient, _sessionManager, _subscriptionManager, _performanceMetrics, _apiKeyService); // 14. Create and configure interceptor var interceptor = new ApiKeyInterceptor(_apiKeyService); diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs index 77bbc8e..479716a 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -11,7 +12,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess public sealed partial class MxAccessClient { /// - /// Connects to MxAccess on the STA thread. + /// Connects to MxAccess via Task.Run (thread pool). /// public async Task ConnectAsync(CancellationToken ct = default) { @@ -22,18 +23,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess try { - await _staThread.DispatchAsync(() => - { - // Create COM object - _lmxProxy = new LMXProxyServer(); - - // Wire event handlers - _lmxProxy.OnDataChange += OnDataChange; - _lmxProxy.OnWriteComplete += OnWriteComplete; - - // Register with MxAccess - _connectionHandle = _lmxProxy.Register("ZB.MOM.WW.LmxProxy.Host"); - }); + await Task.Run(() => ConnectInternal(), ct); lock (_lock) { @@ -56,7 +46,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } /// - /// Disconnects from MxAccess on the STA thread. + /// Disconnects from MxAccess via Task.Run (thread pool). /// public async Task DisconnectAsync(CancellationToken ct = default) { @@ -66,32 +56,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess try { - await _staThread.DispatchAsync(() => - { - if (_lmxProxy != null && _connectionHandle > 0) - { - try - { - // Remove event handlers first - _lmxProxy.OnDataChange -= OnDataChange; - _lmxProxy.OnWriteComplete -= OnWriteComplete; - - // Unregister - _lmxProxy.Unregister(_connectionHandle); - } - catch (Exception ex) - { - Log.Warning(ex, "Error during MxAccess unregister"); - } - finally - { - // Force-release COM object - Marshal.ReleaseComObject(_lmxProxy); - _lmxProxy = null; - _connectionHandle = 0; - } - } - }); + await Task.Run(() => DisconnectInternal()); SetState(ConnectionState.Disconnected); Log.Information("Disconnected from MxAccess"); @@ -123,6 +88,88 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess _reconnectCts?.Cancel(); } + /// Gets the UTC time when the connection was established. + public DateTime ConnectedSince + { + get { lock (_lock) { return _connectedSince; } } + } + + // ── Internal synchronous methods ────────── + + private void ConnectInternal() + { + lock (_lock) + { + // Create COM object + _lmxProxy = new LMXProxyServer(); + + // Wire event handlers + _lmxProxy.OnDataChange += OnDataChange; + _lmxProxy.OnWriteComplete += OnWriteComplete; + + // Register with MxAccess + _connectionHandle = _lmxProxy.Register("ZB.MOM.WW.LmxProxy.Host"); + + if (_connectionHandle <= 0) + { + throw new InvalidOperationException("Failed to register with MxAccess - invalid handle returned"); + } + } + } + + private void DisconnectInternal() + { + lock (_lock) + { + if (_lmxProxy == null || _connectionHandle <= 0) return; + + try + { + // Unadvise all active subscriptions before unregistering + foreach (var kvp in new Dictionary(_addressToHandle)) + { + try + { + _lmxProxy.UnAdvise(_connectionHandle, kvp.Value); + _lmxProxy.RemoveItem(_connectionHandle, kvp.Value); + } + catch (Exception ex) + { + Log.Debug(ex, "Error removing subscription for {Address} during disconnect", kvp.Key); + } + } + + // Remove event handlers + _lmxProxy.OnDataChange -= OnDataChange; + _lmxProxy.OnWriteComplete -= OnWriteComplete; + + // Unregister + _lmxProxy.Unregister(_connectionHandle); + } + catch (Exception ex) + { + Log.Warning(ex, "Error during MxAccess unregister"); + } + finally + { + // Force-release COM object + try + { + Marshal.ReleaseComObject(_lmxProxy); + } + catch { } + + _lmxProxy = null; + _connectionHandle = 0; + + // Clear handle tracking (but keep _storedSubscriptions for reconnect) + _handleToAddress.Clear(); + _addressToHandle.Clear(); + _pendingWrites.Clear(); + } + } + } + /// /// Auto-reconnect monitor loop. Checks connection every monitorInterval. /// On disconnect, attempts reconnect. On failure, retries at next interval. @@ -166,22 +213,28 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } /// - /// Cleans up COM objects on the STA thread after a failed connection. + /// Cleans up COM objects via Task.Run after a failed connection. /// private async Task CleanupComObjectsAsync() { try { - await _staThread.DispatchAsync(() => + await Task.Run(() => { - if (_lmxProxy != null) + lock (_lock) { - try { _lmxProxy.OnDataChange -= OnDataChange; } catch { } - try { _lmxProxy.OnWriteComplete -= OnWriteComplete; } catch { } - try { Marshal.ReleaseComObject(_lmxProxy); } catch { } - _lmxProxy = null; + if (_lmxProxy != null) + { + try { _lmxProxy.OnDataChange -= OnDataChange; } catch { } + try { _lmxProxy.OnWriteComplete -= OnWriteComplete; } catch { } + try { Marshal.ReleaseComObject(_lmxProxy); } catch { } + _lmxProxy = null; + } + _connectionHandle = 0; + _handleToAddress.Clear(); + _addressToHandle.Clear(); + _pendingWrites.Clear(); } - _connectionHandle = 0; }); } catch (Exception ex) @@ -189,11 +242,5 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess Log.Warning(ex, "Error during COM object cleanup"); } } - - /// Gets the UTC time when the connection was established. - public DateTime ConnectedSince - { - get { lock (_lock) { return _connectedSince; } } - } } } diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs index 9c341c3..cf4385b 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using ArchestrA.MxAccess; using Serilog; using ZB.MOM.WW.LmxProxy.Host.Domain; @@ -16,7 +17,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess /// /// COM event handler for MxAccess OnDataChange events. - /// Called on the STA thread when a subscribed tag value changes. /// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface. /// private void OnDataChange( @@ -33,14 +33,32 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess var timestamp = ConvertTimestamp(pftItemTimeStamp); var vtq = new Vtq(pvItemValue, timestamp, quality); - // We don't have the address from the COM callback — the reference code - // looks it up from _subscriptionsByHandle. For the v2 design, the - // SubscriptionManager's global handler receives (address, vtq) via - // OnTagValueChanged. The actual address resolution will be implemented - // when the full subscription tracking is wired up on windev. + // Resolve address from handle map + string address; + lock (_lock) + { + if (!_handleToAddress.TryGetValue(phItemHandle, out address)) + { + Log.Debug("OnDataChange for unknown handle {Handle}, ignoring", phItemHandle); + return; + } + } - // Route to the SubscriptionManager's global handler - OnTagValueChanged?.Invoke(phItemHandle.ToString(), vtq); + // Invoke the stored subscription callback + Action callback; + lock (_lock) + { + if (!_storedSubscriptions.TryGetValue(address, out callback)) + { + Log.Debug("OnDataChange for {Address} but no callback registered", address); + return; + } + } + + callback.Invoke(address, vtq); + + // Also route to the SubscriptionManager's global handler + OnTagValueChanged?.Invoke(address, vtq); } catch (Exception ex) { @@ -57,11 +75,60 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess int phItemHandle, ref MXSTATUS_PROXY[] ItemStatus) { - // Write completion is currently fire-and-forget. - // Log for diagnostics. try { - Log.Debug("WriteCompleted: handle {Handle}", phItemHandle); + TaskCompletionSource tcs; + lock (_lock) + { + if (!_pendingWrites.TryGetValue(phItemHandle, out tcs)) + { + Log.Debug("WriteComplete for unknown handle {Handle}", phItemHandle); + return; + } + + _pendingWrites.Remove(phItemHandle); + } + + if (ItemStatus != null && ItemStatus.Length > 0) + { + var status = ItemStatus[0]; + if (status.success == 0) + { + string errorMsg = GetWriteErrorMessage(status.detail); + Log.Warning("Write failed for handle {Handle}: {Error} (Category={Category}, Detail={Detail})", + phItemHandle, errorMsg, status.category, status.detail); + tcs.TrySetException(new InvalidOperationException( + string.Format("Write failed: {0}", errorMsg))); + } + else + { + Log.Debug("Write completed successfully for handle {Handle}", phItemHandle); + tcs.TrySetResult(true); + } + } + else + { + // No status means success + Log.Debug("Write completed for handle {Handle} with no status", phItemHandle); + tcs.TrySetResult(true); + } + + // Clean up the item after write completes + lock (_lock) + { + if (_lmxProxy != null && phItemHandle > 0) + { + try + { + _lmxProxy.UnAdvise(_connectionHandle, phItemHandle); + _lmxProxy.RemoveItem(_connectionHandle, phItemHandle); + } + catch (Exception ex) + { + Log.Debug(ex, "Error cleaning up after write for handle {Handle}", phItemHandle); + } + } + } } catch (Exception ex) { @@ -69,6 +136,20 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } } + /// + /// Gets a human-readable error message for a write error code. + /// + private static string GetWriteErrorMessage(int errorCode) + { + switch (errorCode) + { + case 1008: return "User lacks proper security for write operation"; + case 1012: return "Secured write required"; + case 1013: return "Verified write required"; + default: return string.Format("Unknown error code: {0}", errorCode); + } + } + /// /// Converts a timestamp object to DateTime in UTC. /// diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs index ce92eb0..46ba1f2 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs @@ -12,7 +12,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { /// /// Reads a single tag value from MxAccess. - /// Dispatched to STA thread with semaphore concurrency control. + /// Uses subscribe-get-first-value-unsubscribe pattern (same as v1). /// public async Task ReadAsync(string address, CancellationToken ct = default) { @@ -22,7 +22,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess await _readSemaphore.WaitAsync(ct); try { - return await _staThread.DispatchAsync(() => ReadInternal(address)); + return await ReadSingleValueAsync(address, ct); } catch (Exception ex) { @@ -61,8 +61,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess /// /// Writes a single tag value to MxAccess. - /// Value should be a native .NET type (not string). Uses TypedValueConverter - /// on the gRPC layer; here the value is the boxed .NET object. + /// Uses Task.Run for COM calls with OnWriteComplete callback for confirmation. /// public async Task WriteAsync(string address, object value, CancellationToken ct = default) { @@ -72,7 +71,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess await _writeSemaphore.WaitAsync(ct); try { - await _staThread.DispatchAsync(() => WriteInternal(address, value)); + await WriteInternalAsync(address, value, ct); } finally { @@ -130,47 +129,169 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess return (false, (int)sw.ElapsedMilliseconds); } - // ── Internal COM calls (execute on STA thread) ────────── + // ── Private read/write helpers ────────── /// - /// Reads a single tag from MxAccess COM API. - /// Must be called on the STA thread. + /// Reads a single value by subscribing, waiting for the first data change callback, + /// then unsubscribing. This is the same pattern as v1. /// - private Vtq ReadInternal(string address) + private async Task ReadSingleValueAsync(string address, CancellationToken ct) { - // The exact MxAccess COM API call depends on the ArchestrA.MXAccess interop assembly. - // Consult src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact - // method calls. MxAccess uses a subscribe-read-unsubscribe pattern for reads. - // - // For now, this throws NotImplementedException. The actual COM call will be - // implemented when testing on the windev machine with MxAccess available. + var tcs = new TaskCompletionSource(); + IAsyncDisposable? subscription = null; - throw new NotImplementedException( - "ReadInternal must be implemented using ArchestrA.MXAccess COM API. " + - "See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern."); + try + { + subscription = await SubscribeAsync( + new[] { address }, + (addr, vtq) => { tcs.TrySetResult(vtq); }, + ct); + + return await WaitForReadResultAsync(tcs, ct); + } + finally + { + if (subscription != null) + { + await subscription.DisposeAsync(); + } + } } /// - /// Writes a single tag via MxAccess COM API. - /// Must be called on the STA thread. + /// Waits for a read result with timeout. /// - private void WriteInternal(string address, object value) + private async Task WaitForReadResultAsync(TaskCompletionSource tcs, CancellationToken ct) { - // The exact COM call pattern uses AddItem, AdviseSupervisory, Write. - // Consult src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact method signature. + using (var cts = new CancellationTokenSource(_readTimeoutMs)) + using (ct.Register(() => cts.Cancel())) + { + cts.Token.Register(() => tcs.TrySetException( + new TimeoutException("Read timeout"))); + return await tcs.Task; + } + } - throw new NotImplementedException( - "WriteInternal must be implemented using ArchestrA.MXAccess COM API. " + - "See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern."); + /// + /// Internal write implementation using Task.Run for COM calls + /// and OnWriteComplete callback for confirmation. + /// + private async Task WriteInternalAsync(string address, object value, CancellationToken ct) + { + var tcs = new TaskCompletionSource(); + int itemHandle = await SetupWriteOperationAsync(address, value, tcs, ct); + + try + { + await WaitForWriteCompletionAsync(tcs, itemHandle, address, ct); + } + catch + { + await CleanupWriteOperationAsync(itemHandle); + throw; + } + } + + /// + /// Sets up a write operation on the thread pool and returns the item handle. + /// + private async Task SetupWriteOperationAsync( + string address, object value, TaskCompletionSource tcs, CancellationToken ct) + { + return await Task.Run(() => + { + lock (_lock) + { + if (!IsConnected || _lmxProxy == null) + throw new InvalidOperationException("Not connected to MxAccess"); + + int itemHandle = 0; + try + { + // Add the item + itemHandle = _lmxProxy.AddItem(_connectionHandle, address); + + // Advise to enable writing + _lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle); + + // Track pending write for OnWriteComplete callback + _pendingWrites[itemHandle] = tcs; + + // Write the value (-1 = no security classification) + _lmxProxy.Write(_connectionHandle, itemHandle, value, -1); + + return itemHandle; + } + catch (Exception ex) + { + // Clean up on failure + if (itemHandle > 0 && _lmxProxy != null) + { + try + { + _lmxProxy.UnAdvise(_connectionHandle, itemHandle); + _lmxProxy.RemoveItem(_connectionHandle, itemHandle); + _pendingWrites.Remove(itemHandle); + } + catch { } + } + Log.Error(ex, "Failed to write value to {Address}", address); + throw; + } + } + }, ct); + } + + /// + /// Waits for write completion with timeout. + /// + private async Task WaitForWriteCompletionAsync( + TaskCompletionSource tcs, int itemHandle, string address, CancellationToken ct) + { + using (ct.Register(() => tcs.TrySetCanceled())) + { + var timeoutTask = Task.Delay(_writeTimeoutMs, ct); + var completedTask = await Task.WhenAny(tcs.Task, timeoutTask); + + if (completedTask == timeoutTask) + { + await CleanupWriteOperationAsync(itemHandle); + throw new TimeoutException( + string.Format("Write operation to {0} timed out after {1}ms", address, _writeTimeoutMs)); + } + + await tcs.Task; // This will throw if the write failed + } + } + + /// + /// Cleans up a write operation (unadvise + remove item). + /// + private async Task CleanupWriteOperationAsync(int itemHandle) + { + await Task.Run(() => + { + lock (_lock) + { + _pendingWrites.Remove(itemHandle); + if (itemHandle > 0 && _lmxProxy != null) + { + try + { + _lmxProxy.UnAdvise(_connectionHandle, itemHandle); + _lmxProxy.RemoveItem(_connectionHandle, itemHandle); + } + catch { } + } + } + }); } /// /// Maps an MxAccess OPC DA quality integer to the domain Quality enum. - /// The quality integer from MxAccess is the OPC DA quality byte. /// private static Quality MapQuality(int opcDaQuality) { - // OPC DA quality is a byte value that directly maps to our Quality enum if (Enum.IsDefined(typeof(Quality), (byte)opcDaQuality)) return (Quality)(byte)opcDaQuality; diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs index 5865139..0bc6ead 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs @@ -13,6 +13,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess /// /// Subscribes to value changes for the specified addresses. /// Stores subscription state for reconnect replay. + /// COM calls dispatched via Task.Run. /// public async Task SubscribeAsync( IEnumerable addresses, @@ -24,19 +25,22 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess var addressList = addresses.ToList(); - await _staThread.DispatchAsync(() => + await Task.Run(() => { - foreach (var address in addressList) + lock (_lock) { - SubscribeInternal(address); + if (!IsConnected || _lmxProxy == null) + throw new InvalidOperationException("Not connected to MxAccess"); - // Store for reconnect replay - lock (_lock) + foreach (var address in addressList) { + SubscribeInternal(address); + + // Store for reconnect replay _storedSubscriptions[address] = callback; } } - }); + }, ct); Log.Information("Subscribed to {Count} tags", addressList.Count); @@ -50,14 +54,13 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { var addressList = addresses.ToList(); - await _staThread.DispatchAsync(() => + await Task.Run(() => { - foreach (var address in addressList) + lock (_lock) { - UnsubscribeInternal(address); - - lock (_lock) + foreach (var address in addressList) { + UnsubscribeInternal(address); _storedSubscriptions.Remove(address); } } @@ -81,53 +84,87 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess Log.Information("Recreating {Count} stored subscriptions after reconnect", subscriptions.Count); - await _staThread.DispatchAsync(() => + await Task.Run(() => { - foreach (var kvp in subscriptions) + lock (_lock) { - try + foreach (var kvp in subscriptions) { - SubscribeInternal(kvp.Key); - } - catch (Exception ex) - { - Log.Warning(ex, "Failed to recreate subscription for {Address}", kvp.Key); + try + { + SubscribeInternal(kvp.Key); + } + catch (Exception ex) + { + Log.Warning(ex, "Failed to recreate subscription for {Address}", kvp.Key); + } } } }); } - // ── Internal COM calls (execute on STA thread) ────────── + // ── Internal COM calls ────────── /// /// Registers a tag subscription with MxAccess COM API (AddItem + AdviseSupervisory). - /// Must be called on the STA thread. + /// Must be called while holding _lock. /// private void SubscribeInternal(string address) { - // The exact MxAccess COM API call is: - // var itemHandle = _lmxProxy.AddItem(_connectionHandle, address); - // _lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle); - // - // Consult src-reference/Implementation/MxAccessClient.Subscription.cs + if (_lmxProxy == null || _connectionHandle <= 0) + throw new InvalidOperationException("Not connected to MxAccess"); - throw new NotImplementedException( - "SubscribeInternal must be implemented using ArchestrA.MXAccess COM API. " + - "See src-reference/Implementation/MxAccessClient.Subscription.cs for the exact pattern."); + // If already subscribed to this address, skip + if (_addressToHandle.ContainsKey(address)) + { + Log.Debug("Already subscribed to {Address}, skipping", address); + return; + } + + // Add the item to MxAccess + int itemHandle = _lmxProxy.AddItem(_connectionHandle, address); + + // Track handle-to-address and address-to-handle mappings + _handleToAddress[itemHandle] = address; + _addressToHandle[address] = itemHandle; + + // Advise (subscribe) for data change events + _lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle); + + Log.Debug("Subscribed to {Address} with handle {Handle}", address, itemHandle); } /// /// Unregisters a tag subscription from MxAccess COM API (UnAdvise + RemoveItem). - /// Must be called on the STA thread. + /// Must be called while holding _lock. /// private void UnsubscribeInternal(string address) { - // The exact MxAccess COM API call is: - // _lmxProxy.UnAdvise(_connectionHandle, itemHandle); - // _lmxProxy.RemoveItem(_connectionHandle, itemHandle); + if (!_addressToHandle.TryGetValue(address, out int itemHandle)) + { + Log.Debug("No active subscription for {Address}, skipping unsubscribe", address); + return; + } - throw new NotImplementedException( - "UnsubscribeInternal must be implemented using ArchestrA.MXAccess COM API."); + try + { + if (_lmxProxy != null && _connectionHandle > 0) + { + _lmxProxy.UnAdvise(_connectionHandle, itemHandle); + _lmxProxy.RemoveItem(_connectionHandle, itemHandle); + } + } + catch (Exception ex) + { + Log.Warning(ex, "Error unsubscribing from {Address} (handle {Handle})", address, itemHandle); + } + finally + { + _handleToAddress.Remove(itemHandle); + _addressToHandle.Remove(address); + } + + Log.Debug("Unsubscribed from {Address} (handle {Handle})", address, itemHandle); } /// diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs index 7168895..5fd6120 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs @@ -10,13 +10,13 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess { /// /// Wraps the ArchestrA MXAccess COM API. All COM operations - /// execute on a dedicated STA thread via . + /// execute via Task.Run (thread pool / MTA), relying on COM + /// marshaling to handle cross-apartment calls. /// public sealed partial class MxAccessClient : IScadaClient { private static readonly ILogger Log = Serilog.Log.ForContext(); - private readonly StaDispatchThread _staThread; private readonly object _lock = new object(); private readonly int _maxConcurrentOperations; private readonly int _readTimeoutMs; @@ -29,7 +29,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess private readonly SemaphoreSlim _readSemaphore; private readonly SemaphoreSlim _writeSemaphore; - // COM objects — only accessed on STA thread + // COM objects private LMXProxyServer? _lmxProxy; private int _connectionHandle; @@ -45,6 +45,17 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess private readonly Dictionary> _storedSubscriptions = new Dictionary>(StringComparer.OrdinalIgnoreCase); + // Handle-to-address mapping for resolving COM callbacks + private readonly Dictionary _handleToAddress = new Dictionary(); + + // Address-to-handle mapping for unsubscribe by address + private readonly Dictionary _addressToHandle + = new Dictionary(StringComparer.OrdinalIgnoreCase); + + // Pending write operations tracked by item handle + private readonly Dictionary> _pendingWrites + = new Dictionary>(); + public MxAccessClient( int maxConcurrentOperations = 10, int readTimeoutSeconds = 5, @@ -64,7 +75,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess _readSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); _writeSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); - _staThread = new StaDispatchThread(); } public bool IsConnected @@ -123,7 +133,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess _readSemaphore.Dispose(); _writeSemaphore.Dispose(); - _staThread.Dispose(); _reconnectCts?.Dispose(); } } diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs deleted file mode 100644 index 916162c..0000000 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs +++ /dev/null @@ -1,123 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Threading; -using System.Threading.Tasks; -using System.Windows.Forms; -using Serilog; - -namespace ZB.MOM.WW.LmxProxy.Host.MxAccess -{ - /// - /// Dedicated STA thread with a message pump for COM interop. - /// All COM operations are dispatched to this thread via a BlockingCollection. - /// - public sealed class StaDispatchThread : IDisposable - { - private static readonly ILogger Log = Serilog.Log.ForContext(); - - private readonly BlockingCollection _workQueue = new BlockingCollection(); - private readonly Thread _staThread; - private volatile bool _disposed; - - public StaDispatchThread(string threadName = "MxAccess-STA") - { - _staThread = new Thread(StaThreadLoop) - { - Name = threadName, - IsBackground = true - }; - _staThread.SetApartmentState(ApartmentState.STA); - _staThread.Start(); - Log.Information("STA dispatch thread '{ThreadName}' started", threadName); - } - - /// - /// Dispatches an action to the STA thread and returns a Task that completes - /// when the action finishes. - /// - public Task DispatchAsync(Action action) - { - if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread)); - - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _workQueue.Add(() => - { - try - { - action(); - tcs.TrySetResult(true); - } - catch (Exception ex) - { - tcs.TrySetException(ex); - } - }); - return tcs.Task; - } - - /// - /// Dispatches a function to the STA thread and returns its result. - /// - public Task DispatchAsync(Func func) - { - if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread)); - - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _workQueue.Add(() => - { - try - { - var result = func(); - tcs.TrySetResult(result); - } - catch (Exception ex) - { - tcs.TrySetException(ex); - } - }); - return tcs.Task; - } - - private void StaThreadLoop() - { - Log.Debug("STA thread loop started"); - - // Process the work queue. GetConsumingEnumerable blocks until - // items are available or the collection is marked complete. - foreach (var action in _workQueue.GetConsumingEnumerable()) - { - try - { - action(); - } - catch (Exception ex) - { - // Should not happen — actions set TCS exceptions internally. - Log.Error(ex, "Unhandled exception on STA thread"); - } - - // Pump COM messages between work items - Application.DoEvents(); - } - - Log.Debug("STA thread loop exited"); - } - - public void Dispose() - { - if (_disposed) return; - _disposed = true; - - _workQueue.CompleteAdding(); - - // Wait for the STA thread to drain and exit - if (_staThread.IsAlive && !_staThread.Join(TimeSpan.FromSeconds(10))) - { - Log.Warning("STA thread did not exit within 10 seconds"); - } - - _workQueue.Dispose(); - Log.Information("STA dispatch thread disposed"); - } - } -} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj index d20063b..bbf30a3 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj @@ -46,7 +46,6 @@ ..\..\lib\ArchestrA.MXAccess.dll true - diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.IntegrationTests/appsettings.test.json b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.IntegrationTests/appsettings.test.json index 8782d8f..d36157e 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.IntegrationTests/appsettings.test.json +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Client.IntegrationTests/appsettings.test.json @@ -1,9 +1,9 @@ { "LmxProxy": { - "Host": "10.100.0.48", - "Port": 50052, - "ReadWriteApiKey": "REPLACE_WITH_ACTUAL_KEY", - "ReadOnlyApiKey": "REPLACE_WITH_ACTUAL_KEY", + "Host": "localhost", + "Port": 50100, + "ReadWriteApiKey": "c4559c7c6acc60a997135c1381162e3c30f4572ece78dd933c1a626e6fd933b4", + "ReadOnlyApiKey": "a77d090d4adcfeaac1a50379ec5f971ff282c998599fd8ccf410090c9f290150", "InvalidApiKey": "invalid-key-that-does-not-exist" } } diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs deleted file mode 100644 index 7f60949..0000000 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs +++ /dev/null @@ -1,86 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using FluentAssertions; -using Xunit; -using ZB.MOM.WW.LmxProxy.Host.MxAccess; - -namespace ZB.MOM.WW.LmxProxy.Host.Tests.MxAccess -{ - public class StaDispatchThreadTests - { - [Fact] - public async Task DispatchAsync_ExecutesOnStaThread() - { - using var sta = new StaDispatchThread("Test-STA"); - var threadId = await sta.DispatchAsync(() => Thread.CurrentThread.ManagedThreadId); - threadId.Should().NotBe(Thread.CurrentThread.ManagedThreadId); - } - - [Fact] - public async Task DispatchAsync_ReturnsResult() - { - using var sta = new StaDispatchThread("Test-STA"); - var result = await sta.DispatchAsync(() => 42); - result.Should().Be(42); - } - - [Fact] - public async Task DispatchAsync_PropagatesException() - { - using var sta = new StaDispatchThread("Test-STA"); - Func act = () => sta.DispatchAsync(() => throw new InvalidOperationException("test error")); - await act.Should().ThrowAsync().WithMessage("test error"); - } - - [Fact] - public async Task DispatchAsync_Action_Completes() - { - using var sta = new StaDispatchThread("Test-STA"); - int value = 0; - await sta.DispatchAsync(() => { value = 99; }); - value.Should().Be(99); - } - - [Fact] - public void Dispose_CompletesGracefully() - { - var sta = new StaDispatchThread("Test-STA"); - sta.Dispose(); // Should not throw - } - - [Fact] - public void DispatchAfterDispose_ThrowsObjectDisposedException() - { - var sta = new StaDispatchThread("Test-STA"); - sta.Dispose(); - Func act = () => sta.DispatchAsync(() => 42); - act.Should().ThrowAsync(); - } - - [Fact] - public async Task MultipleDispatches_ExecuteInOrder() - { - using var sta = new StaDispatchThread("Test-STA"); - var results = new System.Collections.Concurrent.ConcurrentBag(); - - var tasks = new Task[10]; - for (int i = 0; i < 10; i++) - { - int idx = i; - tasks[i] = sta.DispatchAsync(() => { results.Add(idx); }); - } - - await Task.WhenAll(tasks); - results.Count.Should().Be(10); - } - - [Fact] - public async Task StaThread_HasStaApartmentState() - { - using var sta = new StaDispatchThread("Test-STA"); - var apartmentState = await sta.DispatchAsync(() => Thread.CurrentThread.GetApartmentState()); - apartmentState.Should().Be(ApartmentState.STA); - } - } -}