feat(lmxproxy): phase 7 — integration tests, deployment to windev, v1 cutover

- Replaced STA dispatch thread with Task.Run pattern for COM interop
- Fixed TypedValue oneof tracking with property-level _setCase field
- Added x-api-key DelegatingHandler for gRPC metadata authentication
- Fixed CheckApiKey RPC to validate request body key (not header)
- Integration tests: 15/17 pass (reads, subscribes, API keys, connections)
- 2 write tests pending (OnWriteComplete callback timing issue)
- v2 service deployed on windev port 50100

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-22 01:11:44 -04:00
parent 6d9bf594ec
commit 779598d962
14 changed files with 497 additions and 383 deletions

View File

@@ -37,52 +37,51 @@ public interface IScadaService
[DataContract] [DataContract]
public class TypedValue public class TypedValue
{ {
// Tracks which oneof field was set (by property setter during deserialization or manual assignment).
private TypedValueCase _setCase = TypedValueCase.None;
private bool _boolValue;
private int _int32Value;
private long _int64Value;
private float _floatValue;
private double _doubleValue;
private string? _stringValue;
private byte[]? _bytesValue;
private long _datetimeValue;
private ArrayValue? _arrayValue;
[DataMember(Order = 1)] [DataMember(Order = 1)]
public bool BoolValue { get; set; } public bool BoolValue { get => _boolValue; set { _boolValue = value; _setCase = TypedValueCase.BoolValue; } }
[DataMember(Order = 2)] [DataMember(Order = 2)]
public int Int32Value { get; set; } public int Int32Value { get => _int32Value; set { _int32Value = value; _setCase = TypedValueCase.Int32Value; } }
[DataMember(Order = 3)] [DataMember(Order = 3)]
public long Int64Value { get; set; } public long Int64Value { get => _int64Value; set { _int64Value = value; _setCase = TypedValueCase.Int64Value; } }
[DataMember(Order = 4)] [DataMember(Order = 4)]
public float FloatValue { get; set; } public float FloatValue { get => _floatValue; set { _floatValue = value; _setCase = TypedValueCase.FloatValue; } }
[DataMember(Order = 5)] [DataMember(Order = 5)]
public double DoubleValue { get; set; } public double DoubleValue { get => _doubleValue; set { _doubleValue = value; _setCase = TypedValueCase.DoubleValue; } }
[DataMember(Order = 6)] [DataMember(Order = 6)]
public string? StringValue { get; set; } public string? StringValue { get => _stringValue; set { _stringValue = value; _setCase = TypedValueCase.StringValue; } }
[DataMember(Order = 7)] [DataMember(Order = 7)]
public byte[]? BytesValue { get; set; } public byte[]? BytesValue { get => _bytesValue; set { _bytesValue = value; _setCase = TypedValueCase.BytesValue; } }
[DataMember(Order = 8)] [DataMember(Order = 8)]
public long DatetimeValue { get; set; } public long DatetimeValue { get => _datetimeValue; set { _datetimeValue = value; _setCase = TypedValueCase.DatetimeValue; } }
[DataMember(Order = 9)] [DataMember(Order = 9)]
public ArrayValue? ArrayValue { get; set; } public ArrayValue? ArrayValue { get => _arrayValue; set { _arrayValue = value; _setCase = TypedValueCase.ArrayValue; } }
/// <summary> /// <summary>
/// Indicates which oneof case is set. Determined by checking non-default values. /// Indicates which oneof case is set. Tracked via property setters so default values
/// This is NOT a wire field -- it's a convenience helper. /// (false, 0, 0.0) are correctly distinguished from "not set".
/// </summary> /// </summary>
public TypedValueCase GetValueCase() public TypedValueCase GetValueCase() => _setCase;
{
// Check in reverse priority order to handle protobuf oneof semantics.
// For the oneof, only one should be set at a time.
if (ArrayValue != null) return TypedValueCase.ArrayValue;
if (DatetimeValue != 0) return TypedValueCase.DatetimeValue;
if (BytesValue != null) return TypedValueCase.BytesValue;
if (StringValue != null) return TypedValueCase.StringValue;
if (DoubleValue != 0d) return TypedValueCase.DoubleValue;
if (FloatValue != 0f) return TypedValueCase.FloatValue;
if (Int64Value != 0) return TypedValueCase.Int64Value;
if (Int32Value != 0) return TypedValueCase.Int32Value;
if (BoolValue) return TypedValueCase.BoolValue;
return TypedValueCase.None;
}
} }
/// <summary>Identifies which field in TypedValue is set.</summary> /// <summary>Identifies which field in TypedValue is set.</summary>

View File

@@ -22,7 +22,7 @@ public partial class LmxProxyClient
var endpoint = BuildEndpointUri(); var endpoint = BuildEndpointUri();
_logger.LogInformation("Connecting to LmxProxy at {Endpoint}", endpoint); _logger.LogInformation("Connecting to LmxProxy at {Endpoint}", endpoint);
GrpcChannel channel = GrpcChannelFactory.CreateChannel(endpoint, _tlsConfiguration, _logger); GrpcChannel channel = GrpcChannelFactory.CreateChannel(endpoint, _tlsConfiguration, _logger, _apiKey);
IScadaService client; IScadaService client;
try try
{ {

View File

@@ -20,9 +20,9 @@ internal static class GrpcChannelFactory
} }
/// <summary> /// <summary>
/// Creates a <see cref="GrpcChannel"/> with the specified address and TLS configuration. /// Creates a <see cref="GrpcChannel"/> with the specified address, TLS configuration, and optional API key header.
/// </summary> /// </summary>
public static GrpcChannel CreateChannel(Uri address, ClientTlsConfiguration? tlsConfiguration, ILogger logger) public static GrpcChannel CreateChannel(Uri address, ClientTlsConfiguration? tlsConfiguration, ILogger logger, string? apiKey = null)
{ {
var handler = new SocketsHttpHandler var handler = new SocketsHttpHandler
{ {
@@ -34,15 +34,43 @@ internal static class GrpcChannelFactory
ConfigureTls(handler, tlsConfiguration, logger); ConfigureTls(handler, tlsConfiguration, logger);
} }
HttpMessageHandler finalHandler = handler;
// Add API key header to all outgoing requests if provided
if (!string.IsNullOrEmpty(apiKey))
{
finalHandler = new ApiKeyDelegatingHandler(apiKey, handler);
}
var channelOptions = new GrpcChannelOptions var channelOptions = new GrpcChannelOptions
{ {
HttpHandler = handler HttpHandler = finalHandler
}; };
logger.LogDebug("Creating gRPC channel to {Address}, TLS={UseTls}", address, tlsConfiguration?.UseTls ?? false); logger.LogDebug("Creating gRPC channel to {Address}, TLS={UseTls}", address, tlsConfiguration?.UseTls ?? false);
return GrpcChannel.ForAddress(address, channelOptions); return GrpcChannel.ForAddress(address, channelOptions);
} }
/// <summary>
/// DelegatingHandler that adds the x-api-key header to all outgoing requests.
/// </summary>
private sealed class ApiKeyDelegatingHandler : DelegatingHandler
{
private readonly string _apiKey;
public ApiKeyDelegatingHandler(string apiKey, HttpMessageHandler innerHandler)
: base(innerHandler)
{
_apiKey = apiKey;
}
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
request.Headers.TryAddWithoutValidation("x-api-key", _apiKey);
return base.SendAsync(request, cancellationToken);
}
}
private static void ConfigureTls(SocketsHttpHandler handler, ClientTlsConfiguration tls, ILogger logger) private static void ConfigureTls(SocketsHttpHandler handler, ClientTlsConfiguration tls, ILogger logger)
{ {
handler.SslOptions = new SslClientAuthenticationOptions handler.SslOptions = new SslClientAuthenticationOptions

View File

@@ -8,6 +8,7 @@ using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain; using ZB.MOM.WW.LmxProxy.Host.Domain;
using ZB.MOM.WW.LmxProxy.Host.Metrics; using ZB.MOM.WW.LmxProxy.Host.Metrics;
using ZB.MOM.WW.LmxProxy.Host.Sessions; using ZB.MOM.WW.LmxProxy.Host.Sessions;
using ZB.MOM.WW.LmxProxy.Host.Security;
using ZB.MOM.WW.LmxProxy.Host.Subscriptions; using ZB.MOM.WW.LmxProxy.Host.Subscriptions;
namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services
@@ -24,17 +25,20 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services
private readonly SessionManager _sessionManager; private readonly SessionManager _sessionManager;
private readonly SubscriptionManager _subscriptionManager; private readonly SubscriptionManager _subscriptionManager;
private readonly PerformanceMetrics? _performanceMetrics; private readonly PerformanceMetrics? _performanceMetrics;
private readonly ApiKeyService? _apiKeyService;
public ScadaGrpcService( public ScadaGrpcService(
IScadaClient scadaClient, IScadaClient scadaClient,
SessionManager sessionManager, SessionManager sessionManager,
SubscriptionManager subscriptionManager, SubscriptionManager subscriptionManager,
PerformanceMetrics? performanceMetrics = null) PerformanceMetrics? performanceMetrics = null,
ApiKeyService? apiKeyService = null)
{ {
_scadaClient = scadaClient; _scadaClient = scadaClient;
_sessionManager = sessionManager; _sessionManager = sessionManager;
_subscriptionManager = subscriptionManager; _subscriptionManager = subscriptionManager;
_performanceMetrics = performanceMetrics; _performanceMetrics = performanceMetrics;
_apiKeyService = apiKeyService;
} }
// -- Connection Management ------------------------------------ // -- Connection Management ------------------------------------
@@ -390,10 +394,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services
public override Task<Scada.CheckApiKeyResponse> CheckApiKey( public override Task<Scada.CheckApiKeyResponse> CheckApiKey(
Scada.CheckApiKeyRequest request, ServerCallContext context) Scada.CheckApiKeyRequest request, ServerCallContext context)
{ {
// The interceptor already validated the x-api-key header. // Check the API key from the request body against the key store.
// This RPC lets clients explicitly check a specific key. var isValid = _apiKeyService != null && _apiKeyService.ValidateApiKey(request.ApiKey) != null;
// The validated key from the interceptor is in context.UserState.
var isValid = context.UserState.ContainsKey("ApiKey");
return Task.FromResult(new Scada.CheckApiKeyResponse return Task.FromResult(new Scada.CheckApiKeyResponse
{ {
IsValid = isValid, IsValid = isValid,

View File

@@ -127,7 +127,7 @@ namespace ZB.MOM.WW.LmxProxy.Host
// 13. Create gRPC service // 13. Create gRPC service
var grpcService = new ScadaGrpcService( var grpcService = new ScadaGrpcService(
_mxAccessClient, _sessionManager, _subscriptionManager, _performanceMetrics); _mxAccessClient, _sessionManager, _subscriptionManager, _performanceMetrics, _apiKeyService);
// 14. Create and configure interceptor // 14. Create and configure interceptor
var interceptor = new ApiKeyInterceptor(_apiKeyService); var interceptor = new ApiKeyInterceptor(_apiKeyService);

View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -11,7 +12,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
public sealed partial class MxAccessClient public sealed partial class MxAccessClient
{ {
/// <summary> /// <summary>
/// Connects to MxAccess on the STA thread. /// Connects to MxAccess via Task.Run (thread pool).
/// </summary> /// </summary>
public async Task ConnectAsync(CancellationToken ct = default) public async Task ConnectAsync(CancellationToken ct = default)
{ {
@@ -22,18 +23,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
try try
{ {
await _staThread.DispatchAsync(() => await Task.Run(() => ConnectInternal(), ct);
{
// Create COM object
_lmxProxy = new LMXProxyServer();
// Wire event handlers
_lmxProxy.OnDataChange += OnDataChange;
_lmxProxy.OnWriteComplete += OnWriteComplete;
// Register with MxAccess
_connectionHandle = _lmxProxy.Register("ZB.MOM.WW.LmxProxy.Host");
});
lock (_lock) lock (_lock)
{ {
@@ -56,7 +46,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
} }
/// <summary> /// <summary>
/// Disconnects from MxAccess on the STA thread. /// Disconnects from MxAccess via Task.Run (thread pool).
/// </summary> /// </summary>
public async Task DisconnectAsync(CancellationToken ct = default) public async Task DisconnectAsync(CancellationToken ct = default)
{ {
@@ -66,32 +56,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
try try
{ {
await _staThread.DispatchAsync(() => await Task.Run(() => DisconnectInternal());
{
if (_lmxProxy != null && _connectionHandle > 0)
{
try
{
// Remove event handlers first
_lmxProxy.OnDataChange -= OnDataChange;
_lmxProxy.OnWriteComplete -= OnWriteComplete;
// Unregister
_lmxProxy.Unregister(_connectionHandle);
}
catch (Exception ex)
{
Log.Warning(ex, "Error during MxAccess unregister");
}
finally
{
// Force-release COM object
Marshal.ReleaseComObject(_lmxProxy);
_lmxProxy = null;
_connectionHandle = 0;
}
}
});
SetState(ConnectionState.Disconnected); SetState(ConnectionState.Disconnected);
Log.Information("Disconnected from MxAccess"); Log.Information("Disconnected from MxAccess");
@@ -123,6 +88,88 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
_reconnectCts?.Cancel(); _reconnectCts?.Cancel();
} }
/// <summary>Gets the UTC time when the connection was established.</summary>
public DateTime ConnectedSince
{
get { lock (_lock) { return _connectedSince; } }
}
// ── Internal synchronous methods ──────────
private void ConnectInternal()
{
lock (_lock)
{
// Create COM object
_lmxProxy = new LMXProxyServer();
// Wire event handlers
_lmxProxy.OnDataChange += OnDataChange;
_lmxProxy.OnWriteComplete += OnWriteComplete;
// Register with MxAccess
_connectionHandle = _lmxProxy.Register("ZB.MOM.WW.LmxProxy.Host");
if (_connectionHandle <= 0)
{
throw new InvalidOperationException("Failed to register with MxAccess - invalid handle returned");
}
}
}
private void DisconnectInternal()
{
lock (_lock)
{
if (_lmxProxy == null || _connectionHandle <= 0) return;
try
{
// Unadvise all active subscriptions before unregistering
foreach (var kvp in new Dictionary<string, int>(_addressToHandle))
{
try
{
_lmxProxy.UnAdvise(_connectionHandle, kvp.Value);
_lmxProxy.RemoveItem(_connectionHandle, kvp.Value);
}
catch (Exception ex)
{
Log.Debug(ex, "Error removing subscription for {Address} during disconnect", kvp.Key);
}
}
// Remove event handlers
_lmxProxy.OnDataChange -= OnDataChange;
_lmxProxy.OnWriteComplete -= OnWriteComplete;
// Unregister
_lmxProxy.Unregister(_connectionHandle);
}
catch (Exception ex)
{
Log.Warning(ex, "Error during MxAccess unregister");
}
finally
{
// Force-release COM object
try
{
Marshal.ReleaseComObject(_lmxProxy);
}
catch { }
_lmxProxy = null;
_connectionHandle = 0;
// Clear handle tracking (but keep _storedSubscriptions for reconnect)
_handleToAddress.Clear();
_addressToHandle.Clear();
_pendingWrites.Clear();
}
}
}
/// <summary> /// <summary>
/// Auto-reconnect monitor loop. Checks connection every monitorInterval. /// Auto-reconnect monitor loop. Checks connection every monitorInterval.
/// On disconnect, attempts reconnect. On failure, retries at next interval. /// On disconnect, attempts reconnect. On failure, retries at next interval.
@@ -166,22 +213,28 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
} }
/// <summary> /// <summary>
/// Cleans up COM objects on the STA thread after a failed connection. /// Cleans up COM objects via Task.Run after a failed connection.
/// </summary> /// </summary>
private async Task CleanupComObjectsAsync() private async Task CleanupComObjectsAsync()
{ {
try try
{ {
await _staThread.DispatchAsync(() => await Task.Run(() =>
{ {
if (_lmxProxy != null) lock (_lock)
{ {
try { _lmxProxy.OnDataChange -= OnDataChange; } catch { } if (_lmxProxy != null)
try { _lmxProxy.OnWriteComplete -= OnWriteComplete; } catch { } {
try { Marshal.ReleaseComObject(_lmxProxy); } catch { } try { _lmxProxy.OnDataChange -= OnDataChange; } catch { }
_lmxProxy = null; try { _lmxProxy.OnWriteComplete -= OnWriteComplete; } catch { }
try { Marshal.ReleaseComObject(_lmxProxy); } catch { }
_lmxProxy = null;
}
_connectionHandle = 0;
_handleToAddress.Clear();
_addressToHandle.Clear();
_pendingWrites.Clear();
} }
_connectionHandle = 0;
}); });
} }
catch (Exception ex) catch (Exception ex)
@@ -189,11 +242,5 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
Log.Warning(ex, "Error during COM object cleanup"); Log.Warning(ex, "Error during COM object cleanup");
} }
} }
/// <summary>Gets the UTC time when the connection was established.</summary>
public DateTime ConnectedSince
{
get { lock (_lock) { return _connectedSince; } }
}
} }
} }

View File

@@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks;
using ArchestrA.MxAccess; using ArchestrA.MxAccess;
using Serilog; using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain; using ZB.MOM.WW.LmxProxy.Host.Domain;
@@ -16,7 +17,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
/// <summary> /// <summary>
/// COM event handler for MxAccess OnDataChange events. /// COM event handler for MxAccess OnDataChange events.
/// Called on the STA thread when a subscribed tag value changes.
/// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface. /// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface.
/// </summary> /// </summary>
private void OnDataChange( private void OnDataChange(
@@ -33,14 +33,32 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
var timestamp = ConvertTimestamp(pftItemTimeStamp); var timestamp = ConvertTimestamp(pftItemTimeStamp);
var vtq = new Vtq(pvItemValue, timestamp, quality); var vtq = new Vtq(pvItemValue, timestamp, quality);
// We don't have the address from the COM callback — the reference code // Resolve address from handle map
// looks it up from _subscriptionsByHandle. For the v2 design, the string address;
// SubscriptionManager's global handler receives (address, vtq) via lock (_lock)
// OnTagValueChanged. The actual address resolution will be implemented {
// when the full subscription tracking is wired up on windev. if (!_handleToAddress.TryGetValue(phItemHandle, out address))
{
Log.Debug("OnDataChange for unknown handle {Handle}, ignoring", phItemHandle);
return;
}
}
// Route to the SubscriptionManager's global handler // Invoke the stored subscription callback
OnTagValueChanged?.Invoke(phItemHandle.ToString(), vtq); Action<string, Vtq> callback;
lock (_lock)
{
if (!_storedSubscriptions.TryGetValue(address, out callback))
{
Log.Debug("OnDataChange for {Address} but no callback registered", address);
return;
}
}
callback.Invoke(address, vtq);
// Also route to the SubscriptionManager's global handler
OnTagValueChanged?.Invoke(address, vtq);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -57,11 +75,60 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
int phItemHandle, int phItemHandle,
ref MXSTATUS_PROXY[] ItemStatus) ref MXSTATUS_PROXY[] ItemStatus)
{ {
// Write completion is currently fire-and-forget.
// Log for diagnostics.
try try
{ {
Log.Debug("WriteCompleted: handle {Handle}", phItemHandle); TaskCompletionSource<bool> tcs;
lock (_lock)
{
if (!_pendingWrites.TryGetValue(phItemHandle, out tcs))
{
Log.Debug("WriteComplete for unknown handle {Handle}", phItemHandle);
return;
}
_pendingWrites.Remove(phItemHandle);
}
if (ItemStatus != null && ItemStatus.Length > 0)
{
var status = ItemStatus[0];
if (status.success == 0)
{
string errorMsg = GetWriteErrorMessage(status.detail);
Log.Warning("Write failed for handle {Handle}: {Error} (Category={Category}, Detail={Detail})",
phItemHandle, errorMsg, status.category, status.detail);
tcs.TrySetException(new InvalidOperationException(
string.Format("Write failed: {0}", errorMsg)));
}
else
{
Log.Debug("Write completed successfully for handle {Handle}", phItemHandle);
tcs.TrySetResult(true);
}
}
else
{
// No status means success
Log.Debug("Write completed for handle {Handle} with no status", phItemHandle);
tcs.TrySetResult(true);
}
// Clean up the item after write completes
lock (_lock)
{
if (_lmxProxy != null && phItemHandle > 0)
{
try
{
_lmxProxy.UnAdvise(_connectionHandle, phItemHandle);
_lmxProxy.RemoveItem(_connectionHandle, phItemHandle);
}
catch (Exception ex)
{
Log.Debug(ex, "Error cleaning up after write for handle {Handle}", phItemHandle);
}
}
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -69,6 +136,20 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
} }
} }
/// <summary>
/// Gets a human-readable error message for a write error code.
/// </summary>
private static string GetWriteErrorMessage(int errorCode)
{
switch (errorCode)
{
case 1008: return "User lacks proper security for write operation";
case 1012: return "Secured write required";
case 1013: return "Verified write required";
default: return string.Format("Unknown error code: {0}", errorCode);
}
}
/// <summary> /// <summary>
/// Converts a timestamp object to DateTime in UTC. /// Converts a timestamp object to DateTime in UTC.
/// </summary> /// </summary>

View File

@@ -12,7 +12,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{ {
/// <summary> /// <summary>
/// Reads a single tag value from MxAccess. /// Reads a single tag value from MxAccess.
/// Dispatched to STA thread with semaphore concurrency control. /// Uses subscribe-get-first-value-unsubscribe pattern (same as v1).
/// </summary> /// </summary>
public async Task<Vtq> ReadAsync(string address, CancellationToken ct = default) public async Task<Vtq> ReadAsync(string address, CancellationToken ct = default)
{ {
@@ -22,7 +22,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
await _readSemaphore.WaitAsync(ct); await _readSemaphore.WaitAsync(ct);
try try
{ {
return await _staThread.DispatchAsync(() => ReadInternal(address)); return await ReadSingleValueAsync(address, ct);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -61,8 +61,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
/// <summary> /// <summary>
/// Writes a single tag value to MxAccess. /// Writes a single tag value to MxAccess.
/// Value should be a native .NET type (not string). Uses TypedValueConverter /// Uses Task.Run for COM calls with OnWriteComplete callback for confirmation.
/// on the gRPC layer; here the value is the boxed .NET object.
/// </summary> /// </summary>
public async Task WriteAsync(string address, object value, CancellationToken ct = default) public async Task WriteAsync(string address, object value, CancellationToken ct = default)
{ {
@@ -72,7 +71,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
await _writeSemaphore.WaitAsync(ct); await _writeSemaphore.WaitAsync(ct);
try try
{ {
await _staThread.DispatchAsync(() => WriteInternal(address, value)); await WriteInternalAsync(address, value, ct);
} }
finally finally
{ {
@@ -130,47 +129,169 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
return (false, (int)sw.ElapsedMilliseconds); return (false, (int)sw.ElapsedMilliseconds);
} }
// ── Internal COM calls (execute on STA thread) ────────── // ── Private read/write helpers ──────────
/// <summary> /// <summary>
/// Reads a single tag from MxAccess COM API. /// Reads a single value by subscribing, waiting for the first data change callback,
/// Must be called on the STA thread. /// then unsubscribing. This is the same pattern as v1.
/// </summary> /// </summary>
private Vtq ReadInternal(string address) private async Task<Vtq> ReadSingleValueAsync(string address, CancellationToken ct)
{ {
// The exact MxAccess COM API call depends on the ArchestrA.MXAccess interop assembly. var tcs = new TaskCompletionSource<Vtq>();
// Consult src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact IAsyncDisposable? subscription = null;
// method calls. MxAccess uses a subscribe-read-unsubscribe pattern for reads.
//
// For now, this throws NotImplementedException. The actual COM call will be
// implemented when testing on the windev machine with MxAccess available.
throw new NotImplementedException( try
"ReadInternal must be implemented using ArchestrA.MXAccess COM API. " + {
"See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern."); subscription = await SubscribeAsync(
new[] { address },
(addr, vtq) => { tcs.TrySetResult(vtq); },
ct);
return await WaitForReadResultAsync(tcs, ct);
}
finally
{
if (subscription != null)
{
await subscription.DisposeAsync();
}
}
} }
/// <summary> /// <summary>
/// Writes a single tag via MxAccess COM API. /// Waits for a read result with timeout.
/// Must be called on the STA thread.
/// </summary> /// </summary>
private void WriteInternal(string address, object value) private async Task<Vtq> WaitForReadResultAsync(TaskCompletionSource<Vtq> tcs, CancellationToken ct)
{ {
// The exact COM call pattern uses AddItem, AdviseSupervisory, Write. using (var cts = new CancellationTokenSource(_readTimeoutMs))
// Consult src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact method signature. using (ct.Register(() => cts.Cancel()))
{
cts.Token.Register(() => tcs.TrySetException(
new TimeoutException("Read timeout")));
return await tcs.Task;
}
}
throw new NotImplementedException( /// <summary>
"WriteInternal must be implemented using ArchestrA.MXAccess COM API. " + /// Internal write implementation using Task.Run for COM calls
"See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern."); /// and OnWriteComplete callback for confirmation.
/// </summary>
private async Task WriteInternalAsync(string address, object value, CancellationToken ct)
{
var tcs = new TaskCompletionSource<bool>();
int itemHandle = await SetupWriteOperationAsync(address, value, tcs, ct);
try
{
await WaitForWriteCompletionAsync(tcs, itemHandle, address, ct);
}
catch
{
await CleanupWriteOperationAsync(itemHandle);
throw;
}
}
/// <summary>
/// Sets up a write operation on the thread pool and returns the item handle.
/// </summary>
private async Task<int> SetupWriteOperationAsync(
string address, object value, TaskCompletionSource<bool> tcs, CancellationToken ct)
{
return await Task.Run(() =>
{
lock (_lock)
{
if (!IsConnected || _lmxProxy == null)
throw new InvalidOperationException("Not connected to MxAccess");
int itemHandle = 0;
try
{
// Add the item
itemHandle = _lmxProxy.AddItem(_connectionHandle, address);
// Advise to enable writing
_lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle);
// Track pending write for OnWriteComplete callback
_pendingWrites[itemHandle] = tcs;
// Write the value (-1 = no security classification)
_lmxProxy.Write(_connectionHandle, itemHandle, value, -1);
return itemHandle;
}
catch (Exception ex)
{
// Clean up on failure
if (itemHandle > 0 && _lmxProxy != null)
{
try
{
_lmxProxy.UnAdvise(_connectionHandle, itemHandle);
_lmxProxy.RemoveItem(_connectionHandle, itemHandle);
_pendingWrites.Remove(itemHandle);
}
catch { }
}
Log.Error(ex, "Failed to write value to {Address}", address);
throw;
}
}
}, ct);
}
/// <summary>
/// Waits for write completion with timeout.
/// </summary>
private async Task WaitForWriteCompletionAsync(
TaskCompletionSource<bool> tcs, int itemHandle, string address, CancellationToken ct)
{
using (ct.Register(() => tcs.TrySetCanceled()))
{
var timeoutTask = Task.Delay(_writeTimeoutMs, ct);
var completedTask = await Task.WhenAny(tcs.Task, timeoutTask);
if (completedTask == timeoutTask)
{
await CleanupWriteOperationAsync(itemHandle);
throw new TimeoutException(
string.Format("Write operation to {0} timed out after {1}ms", address, _writeTimeoutMs));
}
await tcs.Task; // This will throw if the write failed
}
}
/// <summary>
/// Cleans up a write operation (unadvise + remove item).
/// </summary>
private async Task CleanupWriteOperationAsync(int itemHandle)
{
await Task.Run(() =>
{
lock (_lock)
{
_pendingWrites.Remove(itemHandle);
if (itemHandle > 0 && _lmxProxy != null)
{
try
{
_lmxProxy.UnAdvise(_connectionHandle, itemHandle);
_lmxProxy.RemoveItem(_connectionHandle, itemHandle);
}
catch { }
}
}
});
} }
/// <summary> /// <summary>
/// Maps an MxAccess OPC DA quality integer to the domain Quality enum. /// Maps an MxAccess OPC DA quality integer to the domain Quality enum.
/// The quality integer from MxAccess is the OPC DA quality byte.
/// </summary> /// </summary>
private static Quality MapQuality(int opcDaQuality) private static Quality MapQuality(int opcDaQuality)
{ {
// OPC DA quality is a byte value that directly maps to our Quality enum
if (Enum.IsDefined(typeof(Quality), (byte)opcDaQuality)) if (Enum.IsDefined(typeof(Quality), (byte)opcDaQuality))
return (Quality)(byte)opcDaQuality; return (Quality)(byte)opcDaQuality;

View File

@@ -13,6 +13,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
/// <summary> /// <summary>
/// Subscribes to value changes for the specified addresses. /// Subscribes to value changes for the specified addresses.
/// Stores subscription state for reconnect replay. /// Stores subscription state for reconnect replay.
/// COM calls dispatched via Task.Run.
/// </summary> /// </summary>
public async Task<IAsyncDisposable> SubscribeAsync( public async Task<IAsyncDisposable> SubscribeAsync(
IEnumerable<string> addresses, IEnumerable<string> addresses,
@@ -24,19 +25,22 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
var addressList = addresses.ToList(); var addressList = addresses.ToList();
await _staThread.DispatchAsync(() => await Task.Run(() =>
{ {
foreach (var address in addressList) lock (_lock)
{ {
SubscribeInternal(address); if (!IsConnected || _lmxProxy == null)
throw new InvalidOperationException("Not connected to MxAccess");
// Store for reconnect replay foreach (var address in addressList)
lock (_lock)
{ {
SubscribeInternal(address);
// Store for reconnect replay
_storedSubscriptions[address] = callback; _storedSubscriptions[address] = callback;
} }
} }
}); }, ct);
Log.Information("Subscribed to {Count} tags", addressList.Count); Log.Information("Subscribed to {Count} tags", addressList.Count);
@@ -50,14 +54,13 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{ {
var addressList = addresses.ToList(); var addressList = addresses.ToList();
await _staThread.DispatchAsync(() => await Task.Run(() =>
{ {
foreach (var address in addressList) lock (_lock)
{ {
UnsubscribeInternal(address); foreach (var address in addressList)
lock (_lock)
{ {
UnsubscribeInternal(address);
_storedSubscriptions.Remove(address); _storedSubscriptions.Remove(address);
} }
} }
@@ -81,53 +84,87 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
Log.Information("Recreating {Count} stored subscriptions after reconnect", subscriptions.Count); Log.Information("Recreating {Count} stored subscriptions after reconnect", subscriptions.Count);
await _staThread.DispatchAsync(() => await Task.Run(() =>
{ {
foreach (var kvp in subscriptions) lock (_lock)
{ {
try foreach (var kvp in subscriptions)
{ {
SubscribeInternal(kvp.Key); try
} {
catch (Exception ex) SubscribeInternal(kvp.Key);
{ }
Log.Warning(ex, "Failed to recreate subscription for {Address}", kvp.Key); catch (Exception ex)
{
Log.Warning(ex, "Failed to recreate subscription for {Address}", kvp.Key);
}
} }
} }
}); });
} }
// ── Internal COM calls (execute on STA thread) ────────── // ── Internal COM calls ──────────
/// <summary> /// <summary>
/// Registers a tag subscription with MxAccess COM API (AddItem + AdviseSupervisory). /// Registers a tag subscription with MxAccess COM API (AddItem + AdviseSupervisory).
/// Must be called on the STA thread. /// Must be called while holding _lock.
/// </summary> /// </summary>
private void SubscribeInternal(string address) private void SubscribeInternal(string address)
{ {
// The exact MxAccess COM API call is: if (_lmxProxy == null || _connectionHandle <= 0)
// var itemHandle = _lmxProxy.AddItem(_connectionHandle, address); throw new InvalidOperationException("Not connected to MxAccess");
// _lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle);
//
// Consult src-reference/Implementation/MxAccessClient.Subscription.cs
throw new NotImplementedException( // If already subscribed to this address, skip
"SubscribeInternal must be implemented using ArchestrA.MXAccess COM API. " + if (_addressToHandle.ContainsKey(address))
"See src-reference/Implementation/MxAccessClient.Subscription.cs for the exact pattern."); {
Log.Debug("Already subscribed to {Address}, skipping", address);
return;
}
// Add the item to MxAccess
int itemHandle = _lmxProxy.AddItem(_connectionHandle, address);
// Track handle-to-address and address-to-handle mappings
_handleToAddress[itemHandle] = address;
_addressToHandle[address] = itemHandle;
// Advise (subscribe) for data change events
_lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle);
Log.Debug("Subscribed to {Address} with handle {Handle}", address, itemHandle);
} }
/// <summary> /// <summary>
/// Unregisters a tag subscription from MxAccess COM API (UnAdvise + RemoveItem). /// Unregisters a tag subscription from MxAccess COM API (UnAdvise + RemoveItem).
/// Must be called on the STA thread. /// Must be called while holding _lock.
/// </summary> /// </summary>
private void UnsubscribeInternal(string address) private void UnsubscribeInternal(string address)
{ {
// The exact MxAccess COM API call is: if (!_addressToHandle.TryGetValue(address, out int itemHandle))
// _lmxProxy.UnAdvise(_connectionHandle, itemHandle); {
// _lmxProxy.RemoveItem(_connectionHandle, itemHandle); Log.Debug("No active subscription for {Address}, skipping unsubscribe", address);
return;
}
throw new NotImplementedException( try
"UnsubscribeInternal must be implemented using ArchestrA.MXAccess COM API."); {
if (_lmxProxy != null && _connectionHandle > 0)
{
_lmxProxy.UnAdvise(_connectionHandle, itemHandle);
_lmxProxy.RemoveItem(_connectionHandle, itemHandle);
}
}
catch (Exception ex)
{
Log.Warning(ex, "Error unsubscribing from {Address} (handle {Handle})", address, itemHandle);
}
finally
{
_handleToAddress.Remove(itemHandle);
_addressToHandle.Remove(address);
}
Log.Debug("Unsubscribed from {Address} (handle {Handle})", address, itemHandle);
} }
/// <summary> /// <summary>

View File

@@ -10,13 +10,13 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{ {
/// <summary> /// <summary>
/// Wraps the ArchestrA MXAccess COM API. All COM operations /// Wraps the ArchestrA MXAccess COM API. All COM operations
/// execute on a dedicated STA thread via <see cref="StaDispatchThread"/>. /// execute via Task.Run (thread pool / MTA), relying on COM
/// marshaling to handle cross-apartment calls.
/// </summary> /// </summary>
public sealed partial class MxAccessClient : IScadaClient public sealed partial class MxAccessClient : IScadaClient
{ {
private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>(); private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>();
private readonly StaDispatchThread _staThread;
private readonly object _lock = new object(); private readonly object _lock = new object();
private readonly int _maxConcurrentOperations; private readonly int _maxConcurrentOperations;
private readonly int _readTimeoutMs; private readonly int _readTimeoutMs;
@@ -29,7 +29,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
private readonly SemaphoreSlim _readSemaphore; private readonly SemaphoreSlim _readSemaphore;
private readonly SemaphoreSlim _writeSemaphore; private readonly SemaphoreSlim _writeSemaphore;
// COM objects — only accessed on STA thread // COM objects
private LMXProxyServer? _lmxProxy; private LMXProxyServer? _lmxProxy;
private int _connectionHandle; private int _connectionHandle;
@@ -45,6 +45,17 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
private readonly Dictionary<string, Action<string, Vtq>> _storedSubscriptions private readonly Dictionary<string, Action<string, Vtq>> _storedSubscriptions
= new Dictionary<string, Action<string, Vtq>>(StringComparer.OrdinalIgnoreCase); = new Dictionary<string, Action<string, Vtq>>(StringComparer.OrdinalIgnoreCase);
// Handle-to-address mapping for resolving COM callbacks
private readonly Dictionary<int, string> _handleToAddress = new Dictionary<int, string>();
// Address-to-handle mapping for unsubscribe by address
private readonly Dictionary<string, int> _addressToHandle
= new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
// Pending write operations tracked by item handle
private readonly Dictionary<int, TaskCompletionSource<bool>> _pendingWrites
= new Dictionary<int, TaskCompletionSource<bool>>();
public MxAccessClient( public MxAccessClient(
int maxConcurrentOperations = 10, int maxConcurrentOperations = 10,
int readTimeoutSeconds = 5, int readTimeoutSeconds = 5,
@@ -64,7 +75,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
_readSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); _readSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations);
_writeSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); _writeSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations);
_staThread = new StaDispatchThread();
} }
public bool IsConnected public bool IsConnected
@@ -123,7 +133,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
_readSemaphore.Dispose(); _readSemaphore.Dispose();
_writeSemaphore.Dispose(); _writeSemaphore.Dispose();
_staThread.Dispose();
_reconnectCts?.Dispose(); _reconnectCts?.Dispose();
} }
} }

View File

@@ -1,123 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Serilog;
namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{
/// <summary>
/// Dedicated STA thread with a message pump for COM interop.
/// All COM operations are dispatched to this thread via a BlockingCollection.
/// </summary>
public sealed class StaDispatchThread : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<StaDispatchThread>();
private readonly BlockingCollection<Action> _workQueue = new BlockingCollection<Action>();
private readonly Thread _staThread;
private volatile bool _disposed;
public StaDispatchThread(string threadName = "MxAccess-STA")
{
_staThread = new Thread(StaThreadLoop)
{
Name = threadName,
IsBackground = true
};
_staThread.SetApartmentState(ApartmentState.STA);
_staThread.Start();
Log.Information("STA dispatch thread '{ThreadName}' started", threadName);
}
/// <summary>
/// Dispatches an action to the STA thread and returns a Task that completes
/// when the action finishes.
/// </summary>
public Task DispatchAsync(Action action)
{
if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread));
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_workQueue.Add(() =>
{
try
{
action();
tcs.TrySetResult(true);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});
return tcs.Task;
}
/// <summary>
/// Dispatches a function to the STA thread and returns its result.
/// </summary>
public Task<T> DispatchAsync<T>(Func<T> func)
{
if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread));
var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
_workQueue.Add(() =>
{
try
{
var result = func();
tcs.TrySetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});
return tcs.Task;
}
private void StaThreadLoop()
{
Log.Debug("STA thread loop started");
// Process the work queue. GetConsumingEnumerable blocks until
// items are available or the collection is marked complete.
foreach (var action in _workQueue.GetConsumingEnumerable())
{
try
{
action();
}
catch (Exception ex)
{
// Should not happen — actions set TCS exceptions internally.
Log.Error(ex, "Unhandled exception on STA thread");
}
// Pump COM messages between work items
Application.DoEvents();
}
Log.Debug("STA thread loop exited");
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_workQueue.CompleteAdding();
// Wait for the STA thread to drain and exit
if (_staThread.IsAlive && !_staThread.Join(TimeSpan.FromSeconds(10)))
{
Log.Warning("STA thread did not exit within 10 seconds");
}
_workQueue.Dispose();
Log.Information("STA dispatch thread disposed");
}
}
}

View File

@@ -46,7 +46,6 @@
<HintPath>..\..\lib\ArchestrA.MXAccess.dll</HintPath> <HintPath>..\..\lib\ArchestrA.MXAccess.dll</HintPath>
<Private>true</Private> <Private>true</Private>
</Reference> </Reference>
<Reference Include="System.Windows.Forms" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@@ -1,9 +1,9 @@
{ {
"LmxProxy": { "LmxProxy": {
"Host": "10.100.0.48", "Host": "localhost",
"Port": 50052, "Port": 50100,
"ReadWriteApiKey": "REPLACE_WITH_ACTUAL_KEY", "ReadWriteApiKey": "c4559c7c6acc60a997135c1381162e3c30f4572ece78dd933c1a626e6fd933b4",
"ReadOnlyApiKey": "REPLACE_WITH_ACTUAL_KEY", "ReadOnlyApiKey": "a77d090d4adcfeaac1a50379ec5f971ff282c998599fd8ccf410090c9f290150",
"InvalidApiKey": "invalid-key-that-does-not-exist" "InvalidApiKey": "invalid-key-that-does-not-exist"
} }
} }

View File

@@ -1,86 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using ZB.MOM.WW.LmxProxy.Host.MxAccess;
namespace ZB.MOM.WW.LmxProxy.Host.Tests.MxAccess
{
public class StaDispatchThreadTests
{
[Fact]
public async Task DispatchAsync_ExecutesOnStaThread()
{
using var sta = new StaDispatchThread("Test-STA");
var threadId = await sta.DispatchAsync(() => Thread.CurrentThread.ManagedThreadId);
threadId.Should().NotBe(Thread.CurrentThread.ManagedThreadId);
}
[Fact]
public async Task DispatchAsync_ReturnsResult()
{
using var sta = new StaDispatchThread("Test-STA");
var result = await sta.DispatchAsync(() => 42);
result.Should().Be(42);
}
[Fact]
public async Task DispatchAsync_PropagatesException()
{
using var sta = new StaDispatchThread("Test-STA");
Func<Task> act = () => sta.DispatchAsync<int>(() => throw new InvalidOperationException("test error"));
await act.Should().ThrowAsync<InvalidOperationException>().WithMessage("test error");
}
[Fact]
public async Task DispatchAsync_Action_Completes()
{
using var sta = new StaDispatchThread("Test-STA");
int value = 0;
await sta.DispatchAsync(() => { value = 99; });
value.Should().Be(99);
}
[Fact]
public void Dispose_CompletesGracefully()
{
var sta = new StaDispatchThread("Test-STA");
sta.Dispose(); // Should not throw
}
[Fact]
public void DispatchAfterDispose_ThrowsObjectDisposedException()
{
var sta = new StaDispatchThread("Test-STA");
sta.Dispose();
Func<Task> act = () => sta.DispatchAsync(() => 42);
act.Should().ThrowAsync<ObjectDisposedException>();
}
[Fact]
public async Task MultipleDispatches_ExecuteInOrder()
{
using var sta = new StaDispatchThread("Test-STA");
var results = new System.Collections.Concurrent.ConcurrentBag<int>();
var tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
int idx = i;
tasks[i] = sta.DispatchAsync(() => { results.Add(idx); });
}
await Task.WhenAll(tasks);
results.Count.Should().Be(10);
}
[Fact]
public async Task StaThread_HasStaApartmentState()
{
using var sta = new StaDispatchThread("Test-STA");
var apartmentState = await sta.DispatchAsync(() => Thread.CurrentThread.GetApartmentState());
apartmentState.Should().Be(ApartmentState.STA);
}
}
}