Design doc covers architecture, v2 protocol (TypedValue/QualityCode), COM threading model, session lifecycle, subscription semantics, error model, and guardrails. Implementation plans are detailed enough for autonomous Claude Code execution. Verified all dev tooling on windev (Grpc.Tools, protobuf-net.Grpc, Polly v8, xUnit).
72 KiB
Phase 2: Host Core Components — Implementation Plan
Prerequisites
- Phase 1 complete and passing: all projects build, all unit tests pass, cross-stack serialization verified.
- The following Phase 1 artifacts exist and are used throughout this phase:
src/ZB.MOM.WW.LmxProxy.Host/Domain/— Quality, Vtq, ConnectionState, IScadaClient, TypedValueConverter, QualityCodeMappersrc/ZB.MOM.WW.LmxProxy.Host/Grpc/Protos/scada.proto— v2 proto (generatesScada.*classes)
Guardrails
- COM calls only on STA thread — no
Task.Runfor COM operations. All go through the STA dispatch queue. - No v1 code — reference
src-reference/for patterns but write fresh. - status_code is canonical for quality — use
QualityCodeMapperfor all quality conversions. - No string serialization heuristics — use
TypedValueConverterfor all value conversions. - Unit tests for every component — test before moving to Phase 3.
- Each step must compile before proceeding to the next.
Step 1: MxAccessClient — STA Dispatch Thread
File: src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs
Namespace: ZB.MOM.WW.LmxProxy.Host.MxAccess
This is the foundation for all COM interop. MxAccess is an STA COM component — all COM calls must execute on a dedicated STA thread with a message pump.
Class Design
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Serilog;
namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{
/// <summary>
/// Dedicated STA thread with a message pump for COM interop.
/// All COM operations are dispatched to this thread via a BlockingCollection.
/// </summary>
public sealed class StaDispatchThread : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<StaDispatchThread>();
private readonly BlockingCollection<Action> _workQueue = new BlockingCollection<Action>();
private readonly Thread _staThread;
private volatile bool _disposed;
public StaDispatchThread(string threadName = "MxAccess-STA")
{
_staThread = new Thread(StaThreadLoop)
{
Name = threadName,
IsBackground = true
};
_staThread.SetApartmentState(ApartmentState.STA);
_staThread.Start();
Log.Information("STA dispatch thread '{ThreadName}' started", threadName);
}
/// <summary>
/// Dispatches an action to the STA thread and returns a Task that completes
/// when the action finishes.
/// </summary>
public Task DispatchAsync(Action action)
{
if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread));
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_workQueue.Add(() =>
{
try
{
action();
tcs.TrySetResult(true);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});
return tcs.Task;
}
/// <summary>
/// Dispatches a function to the STA thread and returns its result.
/// </summary>
public Task<T> DispatchAsync<T>(Func<T> func)
{
if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread));
var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
_workQueue.Add(() =>
{
try
{
var result = func();
tcs.TrySetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});
return tcs.Task;
}
private void StaThreadLoop()
{
Log.Debug("STA thread loop started");
// Process the work queue. GetConsumingEnumerable blocks until
// items are available or the collection is marked complete.
foreach (var action in _workQueue.GetConsumingEnumerable())
{
try
{
action();
}
catch (Exception ex)
{
// Should not happen — actions set TCS exceptions internally.
Log.Error(ex, "Unhandled exception on STA thread");
}
// Pump COM messages between work items
Application.DoEvents();
}
Log.Debug("STA thread loop exited");
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_workQueue.CompleteAdding();
// Wait for the STA thread to drain and exit
if (_staThread.IsAlive && !_staThread.Join(TimeSpan.FromSeconds(10)))
{
Log.Warning("STA thread did not exit within 10 seconds");
}
_workQueue.Dispose();
Log.Information("STA dispatch thread disposed");
}
}
}
Key design decisions:
BlockingCollection<Action>is the dispatch queue (thread-safe, blocking consumer).TaskCompletionSource<T>bridges the STA thread back to async callers.Application.DoEvents()pumps COM messages between work items (required for MxAccess callbacks like OnDataChange).RunContinuationsAsynchronouslyprevents continuations from running on the STA thread.- On dispose,
CompleteAdding()signals the loop to exit, thenJoin(10s)waits for drain.
Dependency: The Host project already references System.Windows.Forms implicitly through .NET Framework 4.8. If the build fails with a missing reference, add <Reference Include="System.Windows.Forms" /> to the csproj <ItemGroup>.
Step 2: MxAccessClient — Connection
File: src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs (main partial class)
File: src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs (connection partial)
2.1 Main class file
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{
/// <summary>
/// Wraps the ArchestrA MXAccess COM API. All COM operations
/// execute on a dedicated STA thread via <see cref="StaDispatchThread"/>.
/// </summary>
public sealed partial class MxAccessClient : IScadaClient
{
private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>();
private readonly StaDispatchThread _staThread;
private readonly object _lock = new object();
private readonly int _maxConcurrentOperations;
private readonly int _readTimeoutMs;
private readonly int _writeTimeoutMs;
private readonly int _monitorIntervalMs;
private readonly bool _autoReconnect;
private readonly string? _nodeName;
private readonly string? _galaxyName;
private readonly SemaphoreSlim _readSemaphore;
private readonly SemaphoreSlim _writeSemaphore;
// COM objects — only accessed on STA thread
private ArchestrA.MxAccess.LMXProxyServerClass? _lmxProxy;
private int _connectionHandle;
// State
private ConnectionState _connectionState = ConnectionState.Disconnected;
private DateTime _connectedSince;
private bool _disposed;
// Reconnect
private CancellationTokenSource? _reconnectCts;
// Stored subscriptions for reconnect replay
private readonly Dictionary<string, Action<string, Vtq>> _storedSubscriptions
= new Dictionary<string, Action<string, Vtq>>(StringComparer.OrdinalIgnoreCase);
public MxAccessClient(
int maxConcurrentOperations = 10,
int readTimeoutSeconds = 5,
int writeTimeoutSeconds = 5,
int monitorIntervalSeconds = 5,
bool autoReconnect = true,
string? nodeName = null,
string? galaxyName = null)
{
_maxConcurrentOperations = maxConcurrentOperations;
_readTimeoutMs = readTimeoutSeconds * 1000;
_writeTimeoutMs = writeTimeoutSeconds * 1000;
_monitorIntervalMs = monitorIntervalSeconds * 1000;
_autoReconnect = autoReconnect;
_nodeName = nodeName;
_galaxyName = galaxyName;
_readSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations);
_writeSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations);
_staThread = new StaDispatchThread();
}
public bool IsConnected
{
get
{
lock (_lock)
{
return _lmxProxy != null
&& _connectionState == ConnectionState.Connected
&& _connectionHandle > 0;
}
}
}
public ConnectionState ConnectionState
{
get { lock (_lock) { return _connectionState; } }
}
public event EventHandler<ConnectionStateChangedEventArgs>? ConnectionStateChanged;
private void SetState(ConnectionState newState, string? message = null)
{
ConnectionState previousState;
lock (_lock)
{
previousState = _connectionState;
_connectionState = newState;
}
if (previousState != newState)
{
Log.Information("Connection state changed: {Previous} -> {Current} {Message}",
previousState, newState, message ?? "");
ConnectionStateChanged?.Invoke(this,
new ConnectionStateChangedEventArgs(previousState, newState, message));
}
}
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
_reconnectCts?.Cancel();
try
{
await DisconnectAsync();
}
catch (Exception ex)
{
Log.Warning(ex, "Error during disposal disconnect");
}
_readSemaphore.Dispose();
_writeSemaphore.Dispose();
_staThread.Dispose();
_reconnectCts?.Dispose();
}
}
}
2.2 Connection partial class
using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{
public sealed partial class MxAccessClient
{
/// <summary>
/// Connects to MxAccess on the STA thread.
/// </summary>
public async Task ConnectAsync(CancellationToken ct = default)
{
if (_disposed) throw new ObjectDisposedException(nameof(MxAccessClient));
if (IsConnected) return;
SetState(ConnectionState.Connecting);
try
{
await _staThread.DispatchAsync(() =>
{
// Create COM object
_lmxProxy = new ArchestrA.MxAccess.LMXProxyServerClass();
// Wire event handlers
_lmxProxy.DataChanged += OnDataChange;
_lmxProxy.WriteCompleted += OnWriteComplete;
// Register with MxAccess
_connectionHandle = _lmxProxy.Register("ZB.MOM.WW.LmxProxy.Host");
});
lock (_lock)
{
_connectedSince = DateTime.UtcNow;
}
SetState(ConnectionState.Connected);
Log.Information("Connected to MxAccess (handle={Handle})", _connectionHandle);
// Recreate any stored subscriptions from a previous connection
await RecreateStoredSubscriptionsAsync();
}
catch (Exception ex)
{
Log.Error(ex, "Failed to connect to MxAccess");
await CleanupComObjectsAsync();
SetState(ConnectionState.Error, ex.Message);
throw;
}
}
/// <summary>
/// Disconnects from MxAccess on the STA thread.
/// </summary>
public async Task DisconnectAsync(CancellationToken ct = default)
{
if (!IsConnected) return;
SetState(ConnectionState.Disconnecting);
try
{
await _staThread.DispatchAsync(() =>
{
if (_lmxProxy != null && _connectionHandle > 0)
{
try
{
// Remove event handlers first
_lmxProxy.DataChanged -= OnDataChange;
_lmxProxy.WriteCompleted -= OnWriteComplete;
// Unregister
_lmxProxy.Unregister(_connectionHandle);
}
catch (Exception ex)
{
Log.Warning(ex, "Error during MxAccess unregister");
}
finally
{
// Force-release COM object
Marshal.ReleaseComObject(_lmxProxy);
_lmxProxy = null;
_connectionHandle = 0;
}
}
});
SetState(ConnectionState.Disconnected);
Log.Information("Disconnected from MxAccess");
}
catch (Exception ex)
{
Log.Error(ex, "Error during disconnect");
SetState(ConnectionState.Error, ex.Message);
}
}
/// <summary>
/// Starts the auto-reconnect monitor loop.
/// Call this after initial ConnectAsync succeeds.
/// </summary>
public void StartMonitorLoop()
{
if (!_autoReconnect) return;
_reconnectCts = new CancellationTokenSource();
Task.Run(() => MonitorConnectionAsync(_reconnectCts.Token));
}
/// <summary>
/// Stops the auto-reconnect monitor loop.
/// Waits up to 5 seconds for the loop to exit.
/// </summary>
public void StopMonitorLoop()
{
_reconnectCts?.Cancel();
}
/// <summary>
/// Auto-reconnect monitor loop. Checks connection every monitorInterval.
/// On disconnect, attempts reconnect. On failure, retries at next interval.
/// </summary>
private async Task MonitorConnectionAsync(CancellationToken ct)
{
Log.Information("Connection monitor loop started (interval={IntervalMs}ms)", _monitorIntervalMs);
while (!ct.IsCancellationRequested)
{
try
{
await Task.Delay(_monitorIntervalMs, ct);
}
catch (OperationCanceledException)
{
break;
}
if (IsConnected) continue;
Log.Information("MxAccess disconnected, attempting reconnect...");
SetState(ConnectionState.Reconnecting);
try
{
await ConnectAsync(ct);
Log.Information("Reconnected to MxAccess successfully");
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
Log.Warning(ex, "Reconnect attempt failed, will retry in {IntervalMs}ms", _monitorIntervalMs);
}
}
Log.Information("Connection monitor loop exited");
}
/// <summary>
/// Cleans up COM objects on the STA thread after a failed connection.
/// </summary>
private async Task CleanupComObjectsAsync()
{
try
{
await _staThread.DispatchAsync(() =>
{
if (_lmxProxy != null)
{
try { _lmxProxy.DataChanged -= OnDataChange; } catch { }
try { _lmxProxy.WriteCompleted -= OnWriteComplete; } catch { }
try { Marshal.ReleaseComObject(_lmxProxy); } catch { }
_lmxProxy = null;
}
_connectionHandle = 0;
});
}
catch (Exception ex)
{
Log.Warning(ex, "Error during COM object cleanup");
}
}
/// <summary>Gets the UTC time when the connection was established.</summary>
public DateTime ConnectedSince
{
get { lock (_lock) { return _connectedSince; } }
}
}
}
Note: The exact COM interop method names (Register, Unregister, DataChanged, WriteCompleted) come from the ArchestrA.MXAccess COM interop assembly. Consult src-reference/ZB.MOM.WW.LmxProxy.Host/Implementation/MxAccessClient.Connection.cs for the exact method signatures and event wiring patterns. The reference code uses _lmxProxy.DataChanged += OnDataChange style — match that exactly.
Step 3: MxAccessClient — Read/Write
File: src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{
public sealed partial class MxAccessClient
{
/// <summary>
/// Reads a single tag value from MxAccess.
/// Dispatched to STA thread with semaphore concurrency control.
/// </summary>
public async Task<Vtq> ReadAsync(string address, CancellationToken ct = default)
{
if (!IsConnected)
return Vtq.New(null, Quality.Bad_NotConnected);
await _readSemaphore.WaitAsync(ct);
try
{
return await _staThread.DispatchAsync(() => ReadInternal(address));
}
catch (Exception ex)
{
Log.Error(ex, "ReadAsync failed for tag {Address}", address);
return Vtq.New(null, Quality.Bad_CommFailure);
}
finally
{
_readSemaphore.Release();
}
}
/// <summary>
/// Reads multiple tags with semaphore-controlled concurrency (max 10 concurrent).
/// Each tag is read independently. Partial failures return Bad quality for failed tags.
/// </summary>
public async Task<IReadOnlyDictionary<string, Vtq>> ReadBatchAsync(
IEnumerable<string> addresses, CancellationToken ct = default)
{
var addressList = addresses.ToList();
var results = new Dictionary<string, Vtq>(addressList.Count, StringComparer.OrdinalIgnoreCase);
var tasks = addressList.Select(async address =>
{
var vtq = await ReadAsync(address, ct);
return (address, vtq);
});
foreach (var task in await Task.WhenAll(tasks))
{
results[task.address] = task.vtq;
}
return results;
}
/// <summary>
/// Writes a single tag value to MxAccess.
/// Value should be a native .NET type (not string). Uses TypedValueConverter
/// on the gRPC layer; here the value is the boxed .NET object.
/// </summary>
public async Task WriteAsync(string address, object value, CancellationToken ct = default)
{
if (!IsConnected)
throw new InvalidOperationException("Not connected to MxAccess");
await _writeSemaphore.WaitAsync(ct);
try
{
await _staThread.DispatchAsync(() => WriteInternal(address, value));
}
finally
{
_writeSemaphore.Release();
}
}
/// <summary>
/// Writes multiple tag values with semaphore-controlled concurrency.
/// </summary>
public async Task WriteBatchAsync(
IReadOnlyDictionary<string, object> values, CancellationToken ct = default)
{
var tasks = values.Select(async kvp =>
{
await WriteAsync(kvp.Key, kvp.Value, ct);
});
await Task.WhenAll(tasks);
}
/// <summary>
/// Writes a batch, then polls flagTag until it equals flagValue or timeout expires.
/// Uses type-aware comparison via TypedValueEquals.
/// </summary>
public async Task<(bool flagReached, int elapsedMs)> WriteBatchAndWaitAsync(
IReadOnlyDictionary<string, object> values,
string flagTag,
object flagValue,
int timeoutMs,
int pollIntervalMs,
CancellationToken ct = default)
{
// Write all values first
await WriteBatchAsync(values, ct);
// Poll flag tag
var sw = System.Diagnostics.Stopwatch.StartNew();
var effectiveTimeout = timeoutMs > 0 ? timeoutMs : 5000;
var effectiveInterval = pollIntervalMs > 0 ? pollIntervalMs : 100;
while (sw.ElapsedMilliseconds < effectiveTimeout)
{
ct.ThrowIfCancellationRequested();
var vtq = await ReadAsync(flagTag, ct);
if (vtq.Quality.IsGood() && TypedValueEquals(vtq.Value, flagValue))
{
return (true, (int)sw.ElapsedMilliseconds);
}
await Task.Delay(effectiveInterval, ct);
}
return (false, (int)sw.ElapsedMilliseconds);
}
/// <summary>
/// Type-aware equality comparison for WriteBatchAndWait flag matching.
/// Both values must be the same CLR type. Mismatched types are never equal.
/// </summary>
private static bool TypedValueEquals(object? a, object? b)
{
if (a == null && b == null) return true;
if (a == null || b == null) return false;
if (a.GetType() != b.GetType()) return false;
// Array types need element-by-element comparison
if (a is Array arrA && b is Array arrB)
{
if (arrA.Length != arrB.Length) return false;
for (int i = 0; i < arrA.Length; i++)
{
if (!Equals(arrA.GetValue(i), arrB.GetValue(i)))
return false;
}
return true;
}
return Equals(a, b);
}
// ── Internal COM calls (execute on STA thread) ──────────
/// <summary>
/// Reads a single tag from MxAccess COM API.
/// Must be called on the STA thread.
/// </summary>
private Vtq ReadInternal(string address)
{
// This is a skeleton — the exact MxAccess COM API call depends on the
// ArchestrA.MXAccess interop assembly. Consult src-reference for the exact
// method calls. The pattern is:
//
// object value = null;
// int quality = 0;
// DateTime timestamp = DateTime.MinValue;
// _lmxProxy.Read(_connectionHandle, address, ref value, ref quality, ref timestamp);
//
// Then convert the COM value to a Vtq:
// return new Vtq(value, timestamp.ToUniversalTime(), MapQuality(quality));
//
// For now, this throws NotImplementedException. The actual COM call will be
// implemented when testing on the windev machine with MxAccess available.
throw new NotImplementedException(
"ReadInternal must be implemented using ArchestrA.MXAccess COM API. " +
"See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern.");
}
/// <summary>
/// Writes a single tag via MxAccess COM API.
/// Must be called on the STA thread.
/// </summary>
private void WriteInternal(string address, object value)
{
// Similar to ReadInternal — the exact COM call is:
// _lmxProxy.Write(_connectionHandle, address, value);
//
// Consult src-reference for the exact method signature.
throw new NotImplementedException(
"WriteInternal must be implemented using ArchestrA.MXAccess COM API. " +
"See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern.");
}
/// <summary>
/// Maps an MxAccess OPC DA quality integer to the domain Quality enum.
/// The quality integer from MxAccess is the OPC DA quality byte.
/// </summary>
private static Quality MapQuality(int opcDaQuality)
{
// OPC DA quality is a byte value that directly maps to our Quality enum
if (Enum.IsDefined(typeof(Quality), (byte)opcDaQuality))
return (Quality)(byte)opcDaQuality;
// Fallback: use category bits
if (opcDaQuality >= 192) return Quality.Good;
if (opcDaQuality >= 64) return Quality.Uncertain;
return Quality.Bad;
}
}
}
Important note on ReadInternal/WriteInternal: These methods contain throw new NotImplementedException() because the exact MxAccess COM API signatures depend on the ArchestrA.MXAccess interop assembly, which is only available on the Windows development machine. The implementing session should:
- Read
src-reference/ZB.MOM.WW.LmxProxy.Host/Implementation/MxAccessClient.ReadWrite.csto find the exact COM method signatures. - Replace the
throwwith the actual COM calls. - The pattern is well-established in the reference code — it's a direct translation, not a redesign.
Step 4: MxAccessClient — Subscriptions
File: src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{
public sealed partial class MxAccessClient
{
/// <summary>
/// Subscribes to value changes for the specified addresses.
/// Stores subscription state for reconnect replay.
/// </summary>
public async Task<IAsyncDisposable> SubscribeAsync(
IEnumerable<string> addresses,
Action<string, Vtq> callback,
CancellationToken ct = default)
{
if (!IsConnected)
throw new InvalidOperationException("Not connected to MxAccess");
var addressList = addresses.ToList();
await _staThread.DispatchAsync(() =>
{
foreach (var address in addressList)
{
SubscribeInternal(address);
// Store for reconnect replay
lock (_lock)
{
_storedSubscriptions[address] = callback;
}
}
});
Log.Information("Subscribed to {Count} tags", addressList.Count);
return new SubscriptionHandle(this, addressList, callback);
}
/// <summary>
/// Unsubscribes specific addresses.
/// </summary>
internal async Task UnsubscribeAsync(IEnumerable<string> addresses)
{
var addressList = addresses.ToList();
await _staThread.DispatchAsync(() =>
{
foreach (var address in addressList)
{
UnsubscribeInternal(address);
lock (_lock)
{
_storedSubscriptions.Remove(address);
}
}
});
Log.Information("Unsubscribed from {Count} tags", addressList.Count);
}
/// <summary>
/// Recreates all stored subscriptions after a reconnect.
/// Does not re-store them (they're already stored).
/// </summary>
private async Task RecreateStoredSubscriptionsAsync()
{
Dictionary<string, Action<string, Vtq>> subscriptions;
lock (_lock)
{
if (_storedSubscriptions.Count == 0) return;
subscriptions = new Dictionary<string, Action<string, Vtq>>(_storedSubscriptions);
}
Log.Information("Recreating {Count} stored subscriptions after reconnect", subscriptions.Count);
await _staThread.DispatchAsync(() =>
{
foreach (var kvp in subscriptions)
{
try
{
SubscribeInternal(kvp.Key);
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to recreate subscription for {Address}", kvp.Key);
}
}
});
}
// ── Internal COM calls (execute on STA thread) ──────────
/// <summary>
/// Registers a tag subscription with MxAccess COM API (Advise).
/// Must be called on the STA thread.
/// </summary>
private void SubscribeInternal(string address)
{
// The exact MxAccess COM API call is something like:
// _lmxProxy.Advise(_connectionHandle, address);
//
// Consult src-reference/Implementation/MxAccessClient.Subscription.cs
throw new NotImplementedException(
"SubscribeInternal must be implemented using ArchestrA.MXAccess COM API. " +
"See src-reference/Implementation/MxAccessClient.Subscription.cs for the exact pattern.");
}
/// <summary>
/// Unregisters a tag subscription from MxAccess COM API (Unadvise).
/// Must be called on the STA thread.
/// </summary>
private void UnsubscribeInternal(string address)
{
// The exact MxAccess COM API call is something like:
// _lmxProxy.Unadvise(_connectionHandle, address);
throw new NotImplementedException(
"UnsubscribeInternal must be implemented using ArchestrA.MXAccess COM API.");
}
/// <summary>
/// Disposable subscription handle that unsubscribes on disposal.
/// </summary>
private sealed class SubscriptionHandle : IAsyncDisposable
{
private readonly MxAccessClient _client;
private readonly List<string> _addresses;
private readonly Action<string, Vtq> _callback;
private bool _disposed;
public SubscriptionHandle(MxAccessClient client, List<string> addresses, Action<string, Vtq> callback)
{
_client = client;
_addresses = addresses;
_callback = callback;
}
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
await _client.UnsubscribeAsync(_addresses);
}
}
}
}
Step 5: MxAccessClient — Event Handlers
File: src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs
using System;
using System.Collections.Generic;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
{
public sealed partial class MxAccessClient
{
/// <summary>
/// Callback invoked by the SubscriptionManager when it needs to deliver
/// data change events. Set by the SubscriptionManager during initialization.
/// </summary>
public Action<string, Vtq>? OnTagValueChanged { get; set; }
/// <summary>
/// COM event handler for MxAccess DataChanged events.
/// Called on the STA thread when a subscribed tag value changes.
/// </summary>
private void OnDataChange(
int hConnect,
int numberOfItems,
// The exact parameter types depend on the COM interop assembly.
// Consult src-reference/Implementation/MxAccessClient.EventHandlers.cs
// for the exact signature. The pattern is:
// object[] addresses, object[] values, object[] qualities, object[] timestamps
// or it may use SAFEARRAY parameters.
object addresses,
object values,
object qualities,
object timestamps)
{
// This handler fires on the STA thread.
// Parse the COM arrays and dispatch to OnTagValueChanged for each item.
//
// Skeleton implementation:
try
{
var addrArray = (object[])addresses;
var valArray = (object[])values;
var qualArray = (object[])qualities;
var tsArray = (object[])timestamps;
for (int i = 0; i < numberOfItems; i++)
{
var address = addrArray[i]?.ToString() ?? "";
var value = valArray[i];
var quality = MapQuality(Convert.ToInt32(qualArray[i]));
var timestamp = Convert.ToDateTime(tsArray[i]).ToUniversalTime();
var vtq = new Vtq(value, timestamp, quality);
// Route to stored callback
Action<string, Vtq>? callback = null;
lock (_lock)
{
_storedSubscriptions.TryGetValue(address, out callback);
}
callback?.Invoke(address, vtq);
// Also route to the SubscriptionManager's global handler
OnTagValueChanged?.Invoke(address, vtq);
}
}
catch (Exception ex)
{
Log.Error(ex, "Error processing OnDataChange event");
}
}
/// <summary>
/// COM event handler for MxAccess WriteCompleted events.
/// </summary>
private void OnWriteComplete(
int hConnect,
int numberOfItems,
object addresses,
object results)
{
// Write completion is currently fire-and-forget.
// Log for diagnostics.
try
{
Log.Debug("WriteCompleted: {Count} items", numberOfItems);
}
catch (Exception ex)
{
Log.Error(ex, "Error processing OnWriteComplete event");
}
}
}
}
Important: The exact COM event handler signatures (OnDataChange, OnWriteComplete) depend on the ArchestrA.MXAccess COM interop assembly's event definitions. The implementing session MUST consult src-reference/ZB.MOM.WW.LmxProxy.Host/Implementation/MxAccessClient.EventHandlers.cs for the exact parameter types. The skeleton above uses a common pattern but may need adjustment.
Step 6: SessionManager
File: src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Serilog;
namespace ZB.MOM.WW.LmxProxy.Host.Sessions
{
/// <summary>
/// Tracks active client sessions in memory.
/// Thread-safe via ConcurrentDictionary.
/// </summary>
public sealed class SessionManager : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<SessionManager>();
private readonly ConcurrentDictionary<string, SessionInfo> _sessions
= new ConcurrentDictionary<string, SessionInfo>(StringComparer.OrdinalIgnoreCase);
private readonly Timer? _scavengingTimer;
private readonly TimeSpan _inactivityTimeout;
/// <summary>
/// Creates a SessionManager with optional inactivity scavenging.
/// </summary>
/// <param name="inactivityTimeoutMinutes">
/// Sessions inactive for this many minutes are automatically terminated.
/// Set to 0 to disable scavenging.
/// </param>
public SessionManager(int inactivityTimeoutMinutes = 5)
{
_inactivityTimeout = TimeSpan.FromMinutes(inactivityTimeoutMinutes);
if (inactivityTimeoutMinutes > 0)
{
// Check every 60 seconds
_scavengingTimer = new Timer(ScavengeInactiveSessions, null,
TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(60));
}
}
/// <summary>Gets the count of active sessions.</summary>
public int ActiveSessionCount => _sessions.Count;
/// <summary>
/// Creates a new session.
/// Returns the 32-character hex GUID session ID.
/// </summary>
public string CreateSession(string clientId, string apiKey)
{
var sessionId = Guid.NewGuid().ToString("N"); // 32-char lowercase hex, no hyphens
var sessionInfo = new SessionInfo(sessionId, clientId, apiKey);
_sessions[sessionId] = sessionInfo;
Log.Information("Session created: {SessionId} for client {ClientId}", sessionId, clientId);
return sessionId;
}
/// <summary>
/// Validates a session ID. Updates LastActivity on success.
/// Returns true if the session exists.
/// </summary>
public bool ValidateSession(string sessionId)
{
if (_sessions.TryGetValue(sessionId, out var session))
{
session.TouchLastActivity();
return true;
}
return false;
}
/// <summary>
/// Terminates a session. Returns true if the session existed.
/// </summary>
public bool TerminateSession(string sessionId)
{
if (_sessions.TryRemove(sessionId, out _))
{
Log.Information("Session terminated: {SessionId}", sessionId);
return true;
}
return false;
}
/// <summary>Gets session info by ID, or null if not found.</summary>
public SessionInfo? GetSession(string sessionId)
{
_sessions.TryGetValue(sessionId, out var session);
return session;
}
/// <summary>Gets a snapshot of all active sessions.</summary>
public IReadOnlyList<SessionInfo> GetAllSessions()
{
return _sessions.Values.ToList().AsReadOnly();
}
/// <summary>
/// Scavenges sessions that have been inactive for longer than the timeout.
/// </summary>
private void ScavengeInactiveSessions(object? state)
{
if (_inactivityTimeout <= TimeSpan.Zero) return;
var cutoff = DateTime.UtcNow - _inactivityTimeout;
var expired = _sessions.Where(kvp => kvp.Value.LastActivity < cutoff).ToList();
foreach (var kvp in expired)
{
if (_sessions.TryRemove(kvp.Key, out _))
{
Log.Information("Session {SessionId} scavenged (inactive since {LastActivity})",
kvp.Key, kvp.Value.LastActivity);
}
}
}
public void Dispose()
{
_scavengingTimer?.Dispose();
_sessions.Clear();
}
}
/// <summary>
/// Information about an active client session.
/// </summary>
public class SessionInfo
{
public SessionInfo(string sessionId, string clientId, string apiKey)
{
SessionId = sessionId;
ClientId = clientId;
ApiKey = apiKey;
ConnectedAt = DateTime.UtcNow;
LastActivity = DateTime.UtcNow;
}
public string SessionId { get; }
public string ClientId { get; }
public string ApiKey { get; }
public DateTime ConnectedAt { get; }
public DateTime LastActivity { get; private set; }
public long ConnectedSinceUtcTicks => ConnectedAt.Ticks;
/// <summary>Updates the last activity timestamp to now.</summary>
public void TouchLastActivity()
{
LastActivity = DateTime.UtcNow;
}
}
}
Step 7: SubscriptionManager
File: src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using Serilog;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
{
/// <summary>
/// Manages per-client subscription channels with shared MxAccess subscriptions.
/// Ref-counted tag subscriptions: first client creates, last client disposes.
/// </summary>
public sealed class SubscriptionManager : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<SubscriptionManager>();
private readonly IScadaClient _scadaClient;
private readonly int _channelCapacity;
private readonly BoundedChannelFullMode _channelFullMode;
// Client ID → ClientSubscription
private readonly ConcurrentDictionary<string, ClientSubscription> _clientSubscriptions
= new ConcurrentDictionary<string, ClientSubscription>(StringComparer.OrdinalIgnoreCase);
// Tag address → TagSubscription (shared, ref-counted)
private readonly ConcurrentDictionary<string, TagSubscription> _tagSubscriptions
= new ConcurrentDictionary<string, TagSubscription>(StringComparer.OrdinalIgnoreCase);
private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000,
BoundedChannelFullMode channelFullMode = BoundedChannelFullMode.DropOldest)
{
_scadaClient = scadaClient;
_channelCapacity = channelCapacity;
_channelFullMode = channelFullMode;
}
/// <summary>
/// Creates a subscription for a client. Returns a ChannelReader to stream from.
/// </summary>
public ChannelReader<(string address, Vtq vtq)> Subscribe(
string clientId, IEnumerable<string> addresses, CancellationToken ct)
{
var channel = Channel.CreateBounded<(string address, Vtq vtq)>(
new BoundedChannelOptions(_channelCapacity)
{
FullMode = _channelFullMode,
SingleReader = true,
SingleWriter = false
});
var addressSet = new HashSet<string>(addresses, StringComparer.OrdinalIgnoreCase);
var clientSub = new ClientSubscription(clientId, channel, addressSet);
_clientSubscriptions[clientId] = clientSub;
_rwLock.EnterWriteLock();
try
{
foreach (var address in addressSet)
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
tagSub.ClientIds.Add(clientId);
}
else
{
_tagSubscriptions[address] = new TagSubscription(address,
new HashSet<string>(StringComparer.OrdinalIgnoreCase) { clientId });
}
}
}
finally
{
_rwLock.ExitWriteLock();
}
// Register cancellation cleanup
ct.Register(() => UnsubscribeClient(clientId));
Log.Information("Client {ClientId} subscribed to {Count} tags", clientId, addressSet.Count);
return channel.Reader;
}
/// <summary>
/// Called from MxAccessClient's OnDataChange handler.
/// Fans out the update to all subscribed clients.
/// </summary>
public void OnTagValueChanged(string address, Vtq vtq)
{
_rwLock.EnterReadLock();
HashSet<string>? clientIds = null;
try
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
clientIds = new HashSet<string>(tagSub.ClientIds);
}
}
finally
{
_rwLock.ExitReadLock();
}
if (clientIds == null || clientIds.Count == 0) return;
foreach (var clientId in clientIds)
{
if (_clientSubscriptions.TryGetValue(clientId, out var clientSub))
{
if (!clientSub.Channel.Writer.TryWrite((address, vtq)))
{
clientSub.IncrementDropped();
Log.Debug("Dropped message for client {ClientId} on tag {Address} (channel full)",
clientId, address);
}
else
{
clientSub.IncrementDelivered();
}
}
}
}
/// <summary>
/// Removes a client's subscriptions and cleans up tag subscriptions
/// when the last client unsubscribes.
/// </summary>
public void UnsubscribeClient(string clientId)
{
if (!_clientSubscriptions.TryRemove(clientId, out var clientSub))
return;
_rwLock.EnterWriteLock();
try
{
foreach (var address in clientSub.Addresses)
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
tagSub.ClientIds.Remove(clientId);
// Last client unsubscribed — remove the tag subscription
if (tagSub.ClientIds.Count == 0)
{
_tagSubscriptions.TryRemove(address, out _);
}
}
}
}
finally
{
_rwLock.ExitWriteLock();
}
// Complete the channel (signals end of stream to the gRPC handler)
clientSub.Channel.Writer.TryComplete();
Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)",
clientId, clientSub.DeliveredCount, clientSub.DroppedCount);
}
/// <summary>
/// Sends a bad-quality notification to all subscribed clients for all their tags.
/// Called when MxAccess disconnects.
/// </summary>
public void NotifyDisconnection()
{
var badVtq = Vtq.New(null, Quality.Bad_NotConnected);
foreach (var kvp in _clientSubscriptions)
{
foreach (var address in kvp.Value.Addresses)
{
kvp.Value.Channel.Writer.TryWrite((address, badVtq));
}
}
}
/// <summary>Returns subscription statistics.</summary>
public SubscriptionStats GetStats()
{
return new SubscriptionStats(
_clientSubscriptions.Count,
_tagSubscriptions.Count,
_clientSubscriptions.Values.Sum(c => c.Addresses.Count));
}
public void Dispose()
{
foreach (var kvp in _clientSubscriptions)
{
kvp.Value.Channel.Writer.TryComplete();
}
_clientSubscriptions.Clear();
_tagSubscriptions.Clear();
_rwLock.Dispose();
}
// ── Nested types ─────────────────────────────────────────
private class ClientSubscription
{
public ClientSubscription(string clientId,
Channel<(string address, Vtq vtq)> channel,
HashSet<string> addresses)
{
ClientId = clientId;
Channel = channel;
Addresses = addresses;
}
public string ClientId { get; }
public Channel<(string address, Vtq vtq)> Channel { get; }
public HashSet<string> Addresses { get; }
public long DeliveredCount { get; private set; }
public long DroppedCount { get; private set; }
public void IncrementDelivered() => Interlocked.Increment(ref _delivered);
public void IncrementDropped() => Interlocked.Increment(ref _dropped);
// Use backing fields for Interlocked
private long _delivered;
private long _dropped;
}
private class TagSubscription
{
public TagSubscription(string address, HashSet<string> clientIds)
{
Address = address;
ClientIds = clientIds;
}
public string Address { get; }
public HashSet<string> ClientIds { get; }
}
}
}
Note: The ClientSubscription class has a minor issue — DeliveredCount and DroppedCount properties read the old field values, not the _delivered/_dropped backing fields. Fix by changing the properties to:
public long DeliveredCount => Interlocked.Read(ref _delivered);
public long DroppedCount => Interlocked.Read(ref _dropped);
Also need to add SubscriptionStats to the Domain:
File: src/ZB.MOM.WW.LmxProxy.Host/Domain/SubscriptionStats.cs
namespace ZB.MOM.WW.LmxProxy.Host.Domain
{
/// <summary>Subscription statistics for monitoring.</summary>
public class SubscriptionStats
{
public SubscriptionStats(int totalClients, int totalTags, int activeSubscriptions)
{
TotalClients = totalClients;
TotalTags = totalTags;
ActiveSubscriptions = activeSubscriptions;
}
public int TotalClients { get; }
public int TotalTags { get; }
public int ActiveSubscriptions { get; }
}
}
Step 8: Unit tests
8.1 SessionManager tests
File: tests/ZB.MOM.WW.LmxProxy.Host.Tests/Sessions/SessionManagerTests.cs
using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using ZB.MOM.WW.LmxProxy.Host.Sessions;
namespace ZB.MOM.WW.LmxProxy.Host.Tests.Sessions
{
public class SessionManagerTests
{
[Fact]
public void CreateSession_Returns32CharHexId()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
var id = sm.CreateSession("client1", "key1");
id.Should().HaveLength(32);
id.Should().MatchRegex("^[0-9a-f]{32}$");
}
[Fact]
public void CreateSession_IncrementsCount()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
sm.ActiveSessionCount.Should().Be(0);
sm.CreateSession("c1", "k1");
sm.ActiveSessionCount.Should().Be(1);
sm.CreateSession("c2", "k2");
sm.ActiveSessionCount.Should().Be(2);
}
[Fact]
public void ValidateSession_ReturnsTrueForExistingSession()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
var id = sm.CreateSession("c1", "k1");
sm.ValidateSession(id).Should().BeTrue();
}
[Fact]
public void ValidateSession_ReturnsFalseForUnknownSession()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
sm.ValidateSession("nonexistent").Should().BeFalse();
}
[Fact]
public void ValidateSession_UpdatesLastActivity()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
var id = sm.CreateSession("c1", "k1");
var session = sm.GetSession(id);
var initialActivity = session!.LastActivity;
Thread.Sleep(50); // Small delay to ensure time passes
sm.ValidateSession(id);
session.LastActivity.Should().BeAfter(initialActivity);
}
[Fact]
public void TerminateSession_RemovesSession()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
var id = sm.CreateSession("c1", "k1");
sm.TerminateSession(id).Should().BeTrue();
sm.ActiveSessionCount.Should().Be(0);
sm.ValidateSession(id).Should().BeFalse();
}
[Fact]
public void TerminateSession_ReturnsFalseForUnknownSession()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
sm.TerminateSession("nonexistent").Should().BeFalse();
}
[Fact]
public void GetSession_ReturnsNullForUnknown()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
sm.GetSession("nonexistent").Should().BeNull();
}
[Fact]
public void GetSession_ReturnsCorrectInfo()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
var id = sm.CreateSession("client-abc", "key-xyz");
var session = sm.GetSession(id);
session.Should().NotBeNull();
session!.ClientId.Should().Be("client-abc");
session.ApiKey.Should().Be("key-xyz");
session.SessionId.Should().Be(id);
session.ConnectedAt.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(2));
}
[Fact]
public void GetAllSessions_ReturnsSnapshot()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
sm.CreateSession("c1", "k1");
sm.CreateSession("c2", "k2");
var all = sm.GetAllSessions();
all.Should().HaveCount(2);
}
[Fact]
public void ConcurrentAccess_IsThreadSafe()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
var tasks = new Task[100];
for (int i = 0; i < 100; i++)
{
int idx = i;
tasks[i] = Task.Run(() =>
{
var id = sm.CreateSession($"client-{idx}", $"key-{idx}");
sm.ValidateSession(id);
if (idx % 3 == 0) sm.TerminateSession(id);
});
}
Task.WaitAll(tasks);
// Should have ~67 sessions remaining (100 - ~33 terminated)
sm.ActiveSessionCount.Should().BeInRange(60, 70);
}
[Fact]
public void Dispose_ClearsAllSessions()
{
var sm = new SessionManager(inactivityTimeoutMinutes: 0);
sm.CreateSession("c1", "k1");
sm.CreateSession("c2", "k2");
sm.Dispose();
sm.ActiveSessionCount.Should().Be(0);
}
[Fact]
public void ConnectedSinceUtcTicks_ReturnsCorrectValue()
{
using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
var id = sm.CreateSession("c1", "k1");
var session = sm.GetSession(id);
session!.ConnectedSinceUtcTicks.Should().Be(session.ConnectedAt.Ticks);
}
}
}
8.2 SubscriptionManager tests
File: tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using ZB.MOM.WW.LmxProxy.Host.Domain;
using ZB.MOM.WW.LmxProxy.Host.Subscriptions;
namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions
{
public class SubscriptionManagerTests
{
/// <summary>Fake IScadaClient for testing (no COM dependency).</summary>
private class FakeScadaClient : IScadaClient
{
public bool IsConnected => true;
public ConnectionState ConnectionState => ConnectionState.Connected;
public event EventHandler<ConnectionStateChangedEventArgs>? ConnectionStateChanged;
public Task ConnectAsync(CancellationToken ct = default) => Task.CompletedTask;
public Task DisconnectAsync(CancellationToken ct = default) => Task.CompletedTask;
public Task<Vtq> ReadAsync(string address, CancellationToken ct = default) =>
Task.FromResult(Vtq.Good(42.0));
public Task<IReadOnlyDictionary<string, Vtq>> ReadBatchAsync(IEnumerable<string> addresses, CancellationToken ct = default) =>
Task.FromResult<IReadOnlyDictionary<string, Vtq>>(new Dictionary<string, Vtq>());
public Task WriteAsync(string address, object value, CancellationToken ct = default) => Task.CompletedTask;
public Task WriteBatchAsync(IReadOnlyDictionary<string, object> values, CancellationToken ct = default) => Task.CompletedTask;
public Task<(bool flagReached, int elapsedMs)> WriteBatchAndWaitAsync(
IReadOnlyDictionary<string, object> values, string flagTag, object flagValue,
int timeoutMs, int pollIntervalMs, CancellationToken ct = default) =>
Task.FromResult((false, 0));
public Task<IAsyncDisposable> SubscribeAsync(IEnumerable<string> addresses, Action<string, Vtq> callback, CancellationToken ct = default) =>
Task.FromResult<IAsyncDisposable>(new FakeSubscriptionHandle());
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
private class FakeSubscriptionHandle : IAsyncDisposable { public ValueTask DisposeAsync() => ValueTask.CompletedTask; }
}
[Fact]
public void Subscribe_ReturnsChannelReader()
{
using var sm = new SubscriptionManager(new FakeScadaClient());
using var cts = new CancellationTokenSource();
var reader = sm.Subscribe("client1", new[] { "Tag1", "Tag2" }, cts.Token);
reader.Should().NotBeNull();
}
[Fact]
public async Task OnTagValueChanged_FansOutToSubscribedClients()
{
using var sm = new SubscriptionManager(new FakeScadaClient());
using var cts = new CancellationTokenSource();
var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
var vtq = Vtq.Good(42.0);
sm.OnTagValueChanged("Motor.Speed", vtq);
var result = await reader.ReadAsync(cts.Token);
result.address.Should().Be("Motor.Speed");
result.vtq.Value.Should().Be(42.0);
result.vtq.Quality.Should().Be(Quality.Good);
}
[Fact]
public async Task OnTagValueChanged_MultipleClients_BothReceive()
{
using var sm = new SubscriptionManager(new FakeScadaClient());
using var cts = new CancellationTokenSource();
var reader1 = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
var reader2 = sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token);
sm.OnTagValueChanged("Motor.Speed", Vtq.Good(99.0));
var r1 = await reader1.ReadAsync(cts.Token);
var r2 = await reader2.ReadAsync(cts.Token);
r1.vtq.Value.Should().Be(99.0);
r2.vtq.Value.Should().Be(99.0);
}
[Fact]
public async Task OnTagValueChanged_NonSubscribedTag_NoDelivery()
{
using var sm = new SubscriptionManager(new FakeScadaClient());
using var cts = new CancellationTokenSource();
var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
sm.OnTagValueChanged("Motor.Torque", Vtq.Good(10.0));
// Channel should be empty
reader.TryRead(out _).Should().BeFalse();
}
[Fact]
public void UnsubscribeClient_CompletesChannel()
{
using var sm = new SubscriptionManager(new FakeScadaClient());
using var cts = new CancellationTokenSource();
var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
sm.UnsubscribeClient("client1");
// Channel should be completed
reader.Completion.IsCompleted.Should().BeTrue();
}
[Fact]
public void UnsubscribeClient_RemovesFromTagSubscriptions()
{
using var sm = new SubscriptionManager(new FakeScadaClient());
using var cts = new CancellationTokenSource();
sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
sm.UnsubscribeClient("client1");
var stats = sm.GetStats();
stats.TotalClients.Should().Be(0);
stats.TotalTags.Should().Be(0);
}
[Fact]
public void RefCounting_LastClientUnsubscribeRemovesTag()
{
using var sm = new SubscriptionManager(new FakeScadaClient());
using var cts = new CancellationTokenSource();
sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token);
sm.GetStats().TotalTags.Should().Be(1);
sm.UnsubscribeClient("client1");
sm.GetStats().TotalTags.Should().Be(1); // client2 still subscribed
sm.UnsubscribeClient("client2");
sm.GetStats().TotalTags.Should().Be(0); // last client gone
}
[Fact]
public void NotifyDisconnection_SendsBadQualityToAll()
{
using var sm = new SubscriptionManager(new FakeScadaClient());
using var cts = new CancellationTokenSource();
var reader = sm.Subscribe("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token);
sm.NotifyDisconnection();
// Should receive 2 bad quality messages
reader.TryRead(out var r1).Should().BeTrue();
r1.vtq.Quality.Should().Be(Quality.Bad_NotConnected);
reader.TryRead(out var r2).Should().BeTrue();
r2.vtq.Quality.Should().Be(Quality.Bad_NotConnected);
}
[Fact]
public void Backpressure_DropOldest_DropsWhenFull()
{
using var sm = new SubscriptionManager(new FakeScadaClient(), channelCapacity: 3);
using var cts = new CancellationTokenSource();
var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
// Fill the channel beyond capacity
for (int i = 0; i < 10; i++)
{
sm.OnTagValueChanged("Motor.Speed", Vtq.Good((double)i));
}
// Should have exactly 3 messages (capacity limit)
int count = 0;
while (reader.TryRead(out _)) count++;
count.Should().Be(3);
}
[Fact]
public void GetStats_ReturnsCorrectCounts()
{
using var sm = new SubscriptionManager(new FakeScadaClient());
using var cts = new CancellationTokenSource();
sm.Subscribe("c1", new[] { "Tag1", "Tag2" }, cts.Token);
sm.Subscribe("c2", new[] { "Tag2", "Tag3" }, cts.Token);
var stats = sm.GetStats();
stats.TotalClients.Should().Be(2);
stats.TotalTags.Should().Be(3); // Tag1, Tag2, Tag3
stats.ActiveSubscriptions.Should().Be(4); // c1:Tag1, c1:Tag2, c2:Tag2, c2:Tag3
}
}
}
8.3 StaDispatchThread tests
File: tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs
using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using ZB.MOM.WW.LmxProxy.Host.MxAccess;
namespace ZB.MOM.WW.LmxProxy.Host.Tests.MxAccess
{
public class StaDispatchThreadTests
{
[Fact]
public async Task DispatchAsync_ExecutesOnStaThread()
{
using var sta = new StaDispatchThread("Test-STA");
var threadId = await sta.DispatchAsync(() => Thread.CurrentThread.ManagedThreadId);
threadId.Should().NotBe(Thread.CurrentThread.ManagedThreadId);
}
[Fact]
public async Task DispatchAsync_ReturnsResult()
{
using var sta = new StaDispatchThread("Test-STA");
var result = await sta.DispatchAsync(() => 42);
result.Should().Be(42);
}
[Fact]
public async Task DispatchAsync_PropagatesException()
{
using var sta = new StaDispatchThread("Test-STA");
var act = () => sta.DispatchAsync(() => throw new InvalidOperationException("test error"));
await act.Should().ThrowAsync<InvalidOperationException>().WithMessage("test error");
}
[Fact]
public async Task DispatchAsync_Action_Completes()
{
using var sta = new StaDispatchThread("Test-STA");
int value = 0;
await sta.DispatchAsync(() => { value = 99; });
value.Should().Be(99);
}
[Fact]
public void Dispose_CompletesGracefully()
{
var sta = new StaDispatchThread("Test-STA");
sta.Dispose(); // Should not throw
}
[Fact]
public void DispatchAfterDispose_ThrowsObjectDisposedException()
{
var sta = new StaDispatchThread("Test-STA");
sta.Dispose();
var act = () => sta.DispatchAsync(() => 42);
act.Should().ThrowAsync<ObjectDisposedException>();
}
[Fact]
public async Task MultipleDispatches_ExecuteInOrder()
{
using var sta = new StaDispatchThread("Test-STA");
var results = new System.Collections.Concurrent.ConcurrentBag<int>();
var tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
int idx = i;
tasks[i] = sta.DispatchAsync(() => { results.Add(idx); });
}
await Task.WhenAll(tasks);
results.Count.Should().Be(10);
}
[Fact]
public async Task StaThread_HasStaApartmentState()
{
using var sta = new StaDispatchThread("Test-STA");
var apartmentState = await sta.DispatchAsync(() => Thread.CurrentThread.GetApartmentState());
apartmentState.Should().Be(ApartmentState.STA);
}
}
}
8.4 MxAccessClient TypedValueEquals tests
File: tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/TypedValueEqualsTests.cs
Since TypedValueEquals is private in MxAccessClient, test it indirectly or extract it to a helper. For testability, create a public static helper:
File: src/ZB.MOM.WW.LmxProxy.Host/Domain/TypedValueComparer.cs
using System;
namespace ZB.MOM.WW.LmxProxy.Host.Domain
{
/// <summary>
/// Type-aware equality comparison for WriteBatchAndWait flag matching.
/// </summary>
public static class TypedValueComparer
{
/// <summary>
/// Returns true if both values are the same type and equal.
/// Mismatched types are never equal.
/// Null equals null only.
/// </summary>
public static bool Equals(object? a, object? b)
{
if (a == null && b == null) return true;
if (a == null || b == null) return false;
if (a.GetType() != b.GetType()) return false;
if (a is Array arrA && b is Array arrB)
{
if (arrA.Length != arrB.Length) return false;
for (int i = 0; i < arrA.Length; i++)
{
if (!object.Equals(arrA.GetValue(i), arrB.GetValue(i)))
return false;
}
return true;
}
return object.Equals(a, b);
}
}
}
Then the test file:
using FluentAssertions;
using Xunit;
using ZB.MOM.WW.LmxProxy.Host.Domain;
namespace ZB.MOM.WW.LmxProxy.Host.Tests.MxAccess
{
public class TypedValueEqualsTests
{
[Fact]
public void NullEqualsNull() => TypedValueComparer.Equals(null, null).Should().BeTrue();
[Fact]
public void NullNotEqualsValue() => TypedValueComparer.Equals(null, 42).Should().BeFalse();
[Fact]
public void ValueNotEqualsNull() => TypedValueComparer.Equals(42, null).Should().BeFalse();
[Fact]
public void SameTypeAndValue() => TypedValueComparer.Equals(42.5, 42.5).Should().BeTrue();
[Fact]
public void SameTypeDifferentValue() => TypedValueComparer.Equals(42.5, 43.0).Should().BeFalse();
[Fact]
public void DifferentTypes_NeverEqual() => TypedValueComparer.Equals(1, 1.0).Should().BeFalse();
[Fact]
public void BoolTrue() => TypedValueComparer.Equals(true, true).Should().BeTrue();
[Fact]
public void BoolFalse() => TypedValueComparer.Equals(false, true).Should().BeFalse();
[Fact]
public void String_CaseSensitive()
{
TypedValueComparer.Equals("DONE", "DONE").Should().BeTrue();
TypedValueComparer.Equals("done", "DONE").Should().BeFalse();
}
[Fact]
public void Array_SameElements()
{
TypedValueComparer.Equals(new[] { 1, 2, 3 }, new[] { 1, 2, 3 }).Should().BeTrue();
}
[Fact]
public void Array_DifferentElements()
{
TypedValueComparer.Equals(new[] { 1, 2, 3 }, new[] { 1, 2, 4 }).Should().BeFalse();
}
[Fact]
public void Array_DifferentLengths()
{
TypedValueComparer.Equals(new[] { 1, 2 }, new[] { 1, 2, 3 }).Should().BeFalse();
}
[Fact]
public void Int32_NotEqual_ToDouble()
{
TypedValueComparer.Equals(1, 1.0).Should().BeFalse();
}
[Fact]
public void Long_Equality()
{
TypedValueComparer.Equals(long.MaxValue, long.MaxValue).Should().BeTrue();
}
[Fact]
public void DateTime_TickPrecision()
{
var dt1 = new System.DateTime(638789000000000000, System.DateTimeKind.Utc);
var dt2 = new System.DateTime(638789000000000000, System.DateTimeKind.Utc);
TypedValueComparer.Equals(dt1, dt2).Should().BeTrue();
}
}
}
Step 9: Build verification
cd /Users/dohertj2/Desktop/scadalink-design/lmxproxy
# Build Client (works on macOS)
dotnet build src/ZB.MOM.WW.LmxProxy.Client/ZB.MOM.WW.LmxProxy.Client.csproj
# Run Client tests
dotnet test tests/ZB.MOM.WW.LmxProxy.Client.Tests/ZB.MOM.WW.LmxProxy.Client.Tests.csproj
# Host builds on Windows only (net48/x86):
# dotnet build src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj
# dotnet test tests/ZB.MOM.WW.LmxProxy.Host.Tests/ZB.MOM.WW.LmxProxy.Host.Tests.csproj
Completion Criteria
StaDispatchThreadcompiles and tests pass (STA apartment, dispatch, exception propagation)MxAccessClientmain class compiles with all partial filesMxAccessClient.Connection.cscompiles (ConnectAsync, DisconnectAsync, MonitorConnectionAsync)MxAccessClient.ReadWrite.cscompiles (ReadAsync, ReadBatchAsync, WriteAsync, WriteBatchAsync, WriteBatchAndWaitAsync)MxAccessClient.Subscription.cscompiles (SubscribeAsync, UnsubscribeAsync, RecreateStoredSubscriptionsAsync)MxAccessClient.EventHandlers.cscompiles (OnDataChange, OnWriteComplete)SessionManagercompiles and all tests pass (CRUD, scavenging, concurrency)SubscriptionManagercompiles and all tests pass (subscribe, fan-out, unsubscribe, ref-counting, backpressure, disconnect notification)TypedValueComparertests pass (all comparison rules from design doc)- COM method bodies are marked with
NotImplementedExceptionand clear instructions to consult reference code - No references to old namespaces