diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/SubscriptionStats.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/SubscriptionStats.cs
new file mode 100644
index 0000000..d679488
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/SubscriptionStats.cs
@@ -0,0 +1,17 @@
+namespace ZB.MOM.WW.LmxProxy.Host.Domain
+{
+ /// Subscription statistics for monitoring.
+ public class SubscriptionStats
+ {
+ public SubscriptionStats(int totalClients, int totalTags, int activeSubscriptions)
+ {
+ TotalClients = totalClients;
+ TotalTags = totalTags;
+ ActiveSubscriptions = activeSubscriptions;
+ }
+
+ public int TotalClients { get; }
+ public int TotalTags { get; }
+ public int ActiveSubscriptions { get; }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/TypedValueComparer.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/TypedValueComparer.cs
new file mode 100644
index 0000000..1f7eb11
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/TypedValueComparer.cs
@@ -0,0 +1,35 @@
+using System;
+
+namespace ZB.MOM.WW.LmxProxy.Host.Domain
+{
+ ///
+ /// Type-aware equality comparison for WriteBatchAndWait flag matching.
+ ///
+ public static class TypedValueComparer
+ {
+ ///
+ /// Returns true if both values are the same type and equal.
+ /// Mismatched types are never equal.
+ /// Null equals null only.
+ ///
+ public new static bool Equals(object? a, object? b)
+ {
+ if (a == null && b == null) return true;
+ if (a == null || b == null) return false;
+ if (a.GetType() != b.GetType()) return false;
+
+ if (a is Array arrA && b is Array arrB)
+ {
+ if (arrA.Length != arrB.Length) return false;
+ for (int i = 0; i < arrA.Length; i++)
+ {
+ if (!object.Equals(arrA.GetValue(i), arrB.GetValue(i)))
+ return false;
+ }
+ return true;
+ }
+
+ return object.Equals(a, b);
+ }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs
new file mode 100644
index 0000000..77bbc8e
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs
@@ -0,0 +1,199 @@
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using ArchestrA.MxAccess;
+using Serilog;
+using ZB.MOM.WW.LmxProxy.Host.Domain;
+
+namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
+{
+ public sealed partial class MxAccessClient
+ {
+ ///
+ /// Connects to MxAccess on the STA thread.
+ ///
+ public async Task ConnectAsync(CancellationToken ct = default)
+ {
+ if (_disposed) throw new ObjectDisposedException(nameof(MxAccessClient));
+ if (IsConnected) return;
+
+ SetState(ConnectionState.Connecting);
+
+ try
+ {
+ await _staThread.DispatchAsync(() =>
+ {
+ // Create COM object
+ _lmxProxy = new LMXProxyServer();
+
+ // Wire event handlers
+ _lmxProxy.OnDataChange += OnDataChange;
+ _lmxProxy.OnWriteComplete += OnWriteComplete;
+
+ // Register with MxAccess
+ _connectionHandle = _lmxProxy.Register("ZB.MOM.WW.LmxProxy.Host");
+ });
+
+ lock (_lock)
+ {
+ _connectedSince = DateTime.UtcNow;
+ }
+
+ SetState(ConnectionState.Connected);
+ Log.Information("Connected to MxAccess (handle={Handle})", _connectionHandle);
+
+ // Recreate any stored subscriptions from a previous connection
+ await RecreateStoredSubscriptionsAsync();
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Failed to connect to MxAccess");
+ await CleanupComObjectsAsync();
+ SetState(ConnectionState.Error, ex.Message);
+ throw;
+ }
+ }
+
+ ///
+ /// Disconnects from MxAccess on the STA thread.
+ ///
+ public async Task DisconnectAsync(CancellationToken ct = default)
+ {
+ if (!IsConnected) return;
+
+ SetState(ConnectionState.Disconnecting);
+
+ try
+ {
+ await _staThread.DispatchAsync(() =>
+ {
+ if (_lmxProxy != null && _connectionHandle > 0)
+ {
+ try
+ {
+ // Remove event handlers first
+ _lmxProxy.OnDataChange -= OnDataChange;
+ _lmxProxy.OnWriteComplete -= OnWriteComplete;
+
+ // Unregister
+ _lmxProxy.Unregister(_connectionHandle);
+ }
+ catch (Exception ex)
+ {
+ Log.Warning(ex, "Error during MxAccess unregister");
+ }
+ finally
+ {
+ // Force-release COM object
+ Marshal.ReleaseComObject(_lmxProxy);
+ _lmxProxy = null;
+ _connectionHandle = 0;
+ }
+ }
+ });
+
+ SetState(ConnectionState.Disconnected);
+ Log.Information("Disconnected from MxAccess");
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Error during disconnect");
+ SetState(ConnectionState.Error, ex.Message);
+ }
+ }
+
+ ///
+ /// Starts the auto-reconnect monitor loop.
+ /// Call this after initial ConnectAsync succeeds.
+ ///
+ public void StartMonitorLoop()
+ {
+ if (!_autoReconnect) return;
+
+ _reconnectCts = new CancellationTokenSource();
+ Task.Run(() => MonitorConnectionAsync(_reconnectCts.Token));
+ }
+
+ ///
+ /// Stops the auto-reconnect monitor loop.
+ ///
+ public void StopMonitorLoop()
+ {
+ _reconnectCts?.Cancel();
+ }
+
+ ///
+ /// Auto-reconnect monitor loop. Checks connection every monitorInterval.
+ /// On disconnect, attempts reconnect. On failure, retries at next interval.
+ ///
+ private async Task MonitorConnectionAsync(CancellationToken ct)
+ {
+ Log.Information("Connection monitor loop started (interval={IntervalMs}ms)", _monitorIntervalMs);
+
+ while (!ct.IsCancellationRequested)
+ {
+ try
+ {
+ await Task.Delay(_monitorIntervalMs, ct);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+
+ if (IsConnected) continue;
+
+ Log.Information("MxAccess disconnected, attempting reconnect...");
+ SetState(ConnectionState.Reconnecting);
+
+ try
+ {
+ await ConnectAsync(ct);
+ Log.Information("Reconnected to MxAccess successfully");
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ Log.Warning(ex, "Reconnect attempt failed, will retry in {IntervalMs}ms", _monitorIntervalMs);
+ }
+ }
+
+ Log.Information("Connection monitor loop exited");
+ }
+
+ ///
+ /// Cleans up COM objects on the STA thread after a failed connection.
+ ///
+ private async Task CleanupComObjectsAsync()
+ {
+ try
+ {
+ await _staThread.DispatchAsync(() =>
+ {
+ if (_lmxProxy != null)
+ {
+ try { _lmxProxy.OnDataChange -= OnDataChange; } catch { }
+ try { _lmxProxy.OnWriteComplete -= OnWriteComplete; } catch { }
+ try { Marshal.ReleaseComObject(_lmxProxy); } catch { }
+ _lmxProxy = null;
+ }
+ _connectionHandle = 0;
+ });
+ }
+ catch (Exception ex)
+ {
+ Log.Warning(ex, "Error during COM object cleanup");
+ }
+ }
+
+ /// Gets the UTC time when the connection was established.
+ public DateTime ConnectedSince
+ {
+ get { lock (_lock) { return _connectedSince; } }
+ }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs
new file mode 100644
index 0000000..9c341c3
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs
@@ -0,0 +1,85 @@
+using System;
+using System.Collections.Generic;
+using ArchestrA.MxAccess;
+using Serilog;
+using ZB.MOM.WW.LmxProxy.Host.Domain;
+
+namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
+{
+ public sealed partial class MxAccessClient
+ {
+ ///
+ /// Callback invoked by the SubscriptionManager when it needs to deliver
+ /// data change events. Set by the SubscriptionManager during initialization.
+ ///
+ public Action? OnTagValueChanged { get; set; }
+
+ ///
+ /// COM event handler for MxAccess OnDataChange events.
+ /// Called on the STA thread when a subscribed tag value changes.
+ /// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface.
+ ///
+ private void OnDataChange(
+ int hLMXServerHandle,
+ int phItemHandle,
+ object pvItemValue,
+ int pwItemQuality,
+ object pftItemTimeStamp,
+ ref MXSTATUS_PROXY[] ItemStatus)
+ {
+ try
+ {
+ var quality = MapQuality(pwItemQuality);
+ var timestamp = ConvertTimestamp(pftItemTimeStamp);
+ var vtq = new Vtq(pvItemValue, timestamp, quality);
+
+ // We don't have the address from the COM callback — the reference code
+ // looks it up from _subscriptionsByHandle. For the v2 design, the
+ // SubscriptionManager's global handler receives (address, vtq) via
+ // OnTagValueChanged. The actual address resolution will be implemented
+ // when the full subscription tracking is wired up on windev.
+
+ // Route to the SubscriptionManager's global handler
+ OnTagValueChanged?.Invoke(phItemHandle.ToString(), vtq);
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Error processing OnDataChange event for handle {Handle}", phItemHandle);
+ }
+ }
+
+ ///
+ /// COM event handler for MxAccess OnWriteComplete events.
+ /// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface.
+ ///
+ private void OnWriteComplete(
+ int hLMXServerHandle,
+ int phItemHandle,
+ ref MXSTATUS_PROXY[] ItemStatus)
+ {
+ // Write completion is currently fire-and-forget.
+ // Log for diagnostics.
+ try
+ {
+ Log.Debug("WriteCompleted: handle {Handle}", phItemHandle);
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Error processing OnWriteComplete event for handle {Handle}", phItemHandle);
+ }
+ }
+
+ ///
+ /// Converts a timestamp object to DateTime in UTC.
+ ///
+ private static DateTime ConvertTimestamp(object timestamp)
+ {
+ if (timestamp is DateTime dt)
+ {
+ return dt.Kind == DateTimeKind.Utc ? dt : dt.ToUniversalTime();
+ }
+
+ return DateTime.UtcNow;
+ }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs
new file mode 100644
index 0000000..ce92eb0
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs
@@ -0,0 +1,183 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Serilog;
+using ZB.MOM.WW.LmxProxy.Host.Domain;
+
+namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
+{
+ public sealed partial class MxAccessClient
+ {
+ ///
+ /// Reads a single tag value from MxAccess.
+ /// Dispatched to STA thread with semaphore concurrency control.
+ ///
+ public async Task ReadAsync(string address, CancellationToken ct = default)
+ {
+ if (!IsConnected)
+ return Vtq.New(null, Quality.Bad_NotConnected);
+
+ await _readSemaphore.WaitAsync(ct);
+ try
+ {
+ return await _staThread.DispatchAsync(() => ReadInternal(address));
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "ReadAsync failed for tag {Address}", address);
+ return Vtq.New(null, Quality.Bad_CommFailure);
+ }
+ finally
+ {
+ _readSemaphore.Release();
+ }
+ }
+
+ ///
+ /// Reads multiple tags with semaphore-controlled concurrency (max 10 concurrent).
+ /// Each tag is read independently. Partial failures return Bad quality for failed tags.
+ ///
+ public async Task> ReadBatchAsync(
+ IEnumerable addresses, CancellationToken ct = default)
+ {
+ var addressList = addresses.ToList();
+ var results = new Dictionary(addressList.Count, StringComparer.OrdinalIgnoreCase);
+
+ var tasks = addressList.Select(async address =>
+ {
+ var vtq = await ReadAsync(address, ct);
+ return (address, vtq);
+ });
+
+ foreach (var task in await Task.WhenAll(tasks))
+ {
+ results[task.address] = task.vtq;
+ }
+
+ return results;
+ }
+
+ ///
+ /// Writes a single tag value to MxAccess.
+ /// Value should be a native .NET type (not string). Uses TypedValueConverter
+ /// on the gRPC layer; here the value is the boxed .NET object.
+ ///
+ public async Task WriteAsync(string address, object value, CancellationToken ct = default)
+ {
+ if (!IsConnected)
+ throw new InvalidOperationException("Not connected to MxAccess");
+
+ await _writeSemaphore.WaitAsync(ct);
+ try
+ {
+ await _staThread.DispatchAsync(() => WriteInternal(address, value));
+ }
+ finally
+ {
+ _writeSemaphore.Release();
+ }
+ }
+
+ ///
+ /// Writes multiple tag values with semaphore-controlled concurrency.
+ ///
+ public async Task WriteBatchAsync(
+ IReadOnlyDictionary values, CancellationToken ct = default)
+ {
+ var tasks = values.Select(async kvp =>
+ {
+ await WriteAsync(kvp.Key, kvp.Value, ct);
+ });
+
+ await Task.WhenAll(tasks);
+ }
+
+ ///
+ /// Writes a batch, then polls flagTag until it equals flagValue or timeout expires.
+ /// Uses type-aware comparison via TypedValueComparer.
+ ///
+ public async Task<(bool flagReached, int elapsedMs)> WriteBatchAndWaitAsync(
+ IReadOnlyDictionary values,
+ string flagTag,
+ object flagValue,
+ int timeoutMs,
+ int pollIntervalMs,
+ CancellationToken ct = default)
+ {
+ // Write all values first
+ await WriteBatchAsync(values, ct);
+
+ // Poll flag tag
+ var sw = System.Diagnostics.Stopwatch.StartNew();
+ var effectiveTimeout = timeoutMs > 0 ? timeoutMs : 5000;
+ var effectiveInterval = pollIntervalMs > 0 ? pollIntervalMs : 100;
+
+ while (sw.ElapsedMilliseconds < effectiveTimeout)
+ {
+ ct.ThrowIfCancellationRequested();
+
+ var vtq = await ReadAsync(flagTag, ct);
+ if (vtq.Quality.IsGood() && TypedValueComparer.Equals(vtq.Value, flagValue))
+ {
+ return (true, (int)sw.ElapsedMilliseconds);
+ }
+
+ await Task.Delay(effectiveInterval, ct);
+ }
+
+ return (false, (int)sw.ElapsedMilliseconds);
+ }
+
+ // ── Internal COM calls (execute on STA thread) ──────────
+
+ ///
+ /// Reads a single tag from MxAccess COM API.
+ /// Must be called on the STA thread.
+ ///
+ private Vtq ReadInternal(string address)
+ {
+ // The exact MxAccess COM API call depends on the ArchestrA.MXAccess interop assembly.
+ // Consult src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact
+ // method calls. MxAccess uses a subscribe-read-unsubscribe pattern for reads.
+ //
+ // For now, this throws NotImplementedException. The actual COM call will be
+ // implemented when testing on the windev machine with MxAccess available.
+
+ throw new NotImplementedException(
+ "ReadInternal must be implemented using ArchestrA.MXAccess COM API. " +
+ "See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern.");
+ }
+
+ ///
+ /// Writes a single tag via MxAccess COM API.
+ /// Must be called on the STA thread.
+ ///
+ private void WriteInternal(string address, object value)
+ {
+ // The exact COM call pattern uses AddItem, AdviseSupervisory, Write.
+ // Consult src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact method signature.
+
+ throw new NotImplementedException(
+ "WriteInternal must be implemented using ArchestrA.MXAccess COM API. " +
+ "See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern.");
+ }
+
+ ///
+ /// Maps an MxAccess OPC DA quality integer to the domain Quality enum.
+ /// The quality integer from MxAccess is the OPC DA quality byte.
+ ///
+ private static Quality MapQuality(int opcDaQuality)
+ {
+ // OPC DA quality is a byte value that directly maps to our Quality enum
+ if (Enum.IsDefined(typeof(Quality), (byte)opcDaQuality))
+ return (Quality)(byte)opcDaQuality;
+
+ // Fallback: use category bits
+ if (opcDaQuality >= 192) return Quality.Good;
+ if (opcDaQuality >= 64) return Quality.Uncertain;
+ return Quality.Bad;
+ }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs
new file mode 100644
index 0000000..5865139
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs
@@ -0,0 +1,158 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Serilog;
+using ZB.MOM.WW.LmxProxy.Host.Domain;
+
+namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
+{
+ public sealed partial class MxAccessClient
+ {
+ ///
+ /// Subscribes to value changes for the specified addresses.
+ /// Stores subscription state for reconnect replay.
+ ///
+ public async Task SubscribeAsync(
+ IEnumerable addresses,
+ Action callback,
+ CancellationToken ct = default)
+ {
+ if (!IsConnected)
+ throw new InvalidOperationException("Not connected to MxAccess");
+
+ var addressList = addresses.ToList();
+
+ await _staThread.DispatchAsync(() =>
+ {
+ foreach (var address in addressList)
+ {
+ SubscribeInternal(address);
+
+ // Store for reconnect replay
+ lock (_lock)
+ {
+ _storedSubscriptions[address] = callback;
+ }
+ }
+ });
+
+ Log.Information("Subscribed to {Count} tags", addressList.Count);
+
+ return new SubscriptionHandle(this, addressList, callback);
+ }
+
+ ///
+ /// Unsubscribes specific addresses.
+ ///
+ internal async Task UnsubscribeAsync(IEnumerable addresses)
+ {
+ var addressList = addresses.ToList();
+
+ await _staThread.DispatchAsync(() =>
+ {
+ foreach (var address in addressList)
+ {
+ UnsubscribeInternal(address);
+
+ lock (_lock)
+ {
+ _storedSubscriptions.Remove(address);
+ }
+ }
+ });
+
+ Log.Information("Unsubscribed from {Count} tags", addressList.Count);
+ }
+
+ ///
+ /// Recreates all stored subscriptions after a reconnect.
+ /// Does not re-store them (they're already stored).
+ ///
+ private async Task RecreateStoredSubscriptionsAsync()
+ {
+ Dictionary> subscriptions;
+ lock (_lock)
+ {
+ if (_storedSubscriptions.Count == 0) return;
+ subscriptions = new Dictionary>(_storedSubscriptions);
+ }
+
+ Log.Information("Recreating {Count} stored subscriptions after reconnect", subscriptions.Count);
+
+ await _staThread.DispatchAsync(() =>
+ {
+ foreach (var kvp in subscriptions)
+ {
+ try
+ {
+ SubscribeInternal(kvp.Key);
+ }
+ catch (Exception ex)
+ {
+ Log.Warning(ex, "Failed to recreate subscription for {Address}", kvp.Key);
+ }
+ }
+ });
+ }
+
+ // ── Internal COM calls (execute on STA thread) ──────────
+
+ ///
+ /// Registers a tag subscription with MxAccess COM API (AddItem + AdviseSupervisory).
+ /// Must be called on the STA thread.
+ ///
+ private void SubscribeInternal(string address)
+ {
+ // The exact MxAccess COM API call is:
+ // var itemHandle = _lmxProxy.AddItem(_connectionHandle, address);
+ // _lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle);
+ //
+ // Consult src-reference/Implementation/MxAccessClient.Subscription.cs
+
+ throw new NotImplementedException(
+ "SubscribeInternal must be implemented using ArchestrA.MXAccess COM API. " +
+ "See src-reference/Implementation/MxAccessClient.Subscription.cs for the exact pattern.");
+ }
+
+ ///
+ /// Unregisters a tag subscription from MxAccess COM API (UnAdvise + RemoveItem).
+ /// Must be called on the STA thread.
+ ///
+ private void UnsubscribeInternal(string address)
+ {
+ // The exact MxAccess COM API call is:
+ // _lmxProxy.UnAdvise(_connectionHandle, itemHandle);
+ // _lmxProxy.RemoveItem(_connectionHandle, itemHandle);
+
+ throw new NotImplementedException(
+ "UnsubscribeInternal must be implemented using ArchestrA.MXAccess COM API.");
+ }
+
+ ///
+ /// Disposable subscription handle that unsubscribes on disposal.
+ ///
+ private sealed class SubscriptionHandle : IAsyncDisposable
+ {
+ private readonly MxAccessClient _client;
+ private readonly List _addresses;
+ private readonly Action _callback;
+ private bool _disposed;
+
+ public SubscriptionHandle(MxAccessClient client, List addresses, Action callback)
+ {
+ _client = client;
+ _addresses = addresses;
+ _callback = callback;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (_disposed) return;
+ _disposed = true;
+ await _client.UnsubscribeAsync(_addresses);
+ }
+ }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs
new file mode 100644
index 0000000..7168895
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs
@@ -0,0 +1,130 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using ArchestrA.MxAccess;
+using Serilog;
+using ZB.MOM.WW.LmxProxy.Host.Domain;
+
+namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
+{
+ ///
+ /// Wraps the ArchestrA MXAccess COM API. All COM operations
+ /// execute on a dedicated STA thread via .
+ ///
+ public sealed partial class MxAccessClient : IScadaClient
+ {
+ private static readonly ILogger Log = Serilog.Log.ForContext();
+
+ private readonly StaDispatchThread _staThread;
+ private readonly object _lock = new object();
+ private readonly int _maxConcurrentOperations;
+ private readonly int _readTimeoutMs;
+ private readonly int _writeTimeoutMs;
+ private readonly int _monitorIntervalMs;
+ private readonly bool _autoReconnect;
+ private readonly string? _nodeName;
+ private readonly string? _galaxyName;
+
+ private readonly SemaphoreSlim _readSemaphore;
+ private readonly SemaphoreSlim _writeSemaphore;
+
+ // COM objects — only accessed on STA thread
+ private LMXProxyServer? _lmxProxy;
+ private int _connectionHandle;
+
+ // State
+ private ConnectionState _connectionState = ConnectionState.Disconnected;
+ private DateTime _connectedSince;
+ private bool _disposed;
+
+ // Reconnect
+ private CancellationTokenSource? _reconnectCts;
+
+ // Stored subscriptions for reconnect replay
+ private readonly Dictionary> _storedSubscriptions
+ = new Dictionary>(StringComparer.OrdinalIgnoreCase);
+
+ public MxAccessClient(
+ int maxConcurrentOperations = 10,
+ int readTimeoutSeconds = 5,
+ int writeTimeoutSeconds = 5,
+ int monitorIntervalSeconds = 5,
+ bool autoReconnect = true,
+ string? nodeName = null,
+ string? galaxyName = null)
+ {
+ _maxConcurrentOperations = maxConcurrentOperations;
+ _readTimeoutMs = readTimeoutSeconds * 1000;
+ _writeTimeoutMs = writeTimeoutSeconds * 1000;
+ _monitorIntervalMs = monitorIntervalSeconds * 1000;
+ _autoReconnect = autoReconnect;
+ _nodeName = nodeName;
+ _galaxyName = galaxyName;
+
+ _readSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations);
+ _writeSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations);
+ _staThread = new StaDispatchThread();
+ }
+
+ public bool IsConnected
+ {
+ get
+ {
+ lock (_lock)
+ {
+ return _lmxProxy != null
+ && _connectionState == ConnectionState.Connected
+ && _connectionHandle > 0;
+ }
+ }
+ }
+
+ public ConnectionState ConnectionState
+ {
+ get { lock (_lock) { return _connectionState; } }
+ }
+
+ public event EventHandler? ConnectionStateChanged;
+
+ private void SetState(ConnectionState newState, string? message = null)
+ {
+ ConnectionState previousState;
+ lock (_lock)
+ {
+ previousState = _connectionState;
+ _connectionState = newState;
+ }
+
+ if (previousState != newState)
+ {
+ Log.Information("Connection state changed: {Previous} -> {Current} {Message}",
+ previousState, newState, message ?? "");
+ ConnectionStateChanged?.Invoke(this,
+ new ConnectionStateChangedEventArgs(previousState, newState, message));
+ }
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (_disposed) return;
+ _disposed = true;
+
+ _reconnectCts?.Cancel();
+
+ try
+ {
+ await DisconnectAsync();
+ }
+ catch (Exception ex)
+ {
+ Log.Warning(ex, "Error during disposal disconnect");
+ }
+
+ _readSemaphore.Dispose();
+ _writeSemaphore.Dispose();
+ _staThread.Dispose();
+ _reconnectCts?.Dispose();
+ }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs
new file mode 100644
index 0000000..916162c
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs
@@ -0,0 +1,123 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Windows.Forms;
+using Serilog;
+
+namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
+{
+ ///
+ /// Dedicated STA thread with a message pump for COM interop.
+ /// All COM operations are dispatched to this thread via a BlockingCollection.
+ ///
+ public sealed class StaDispatchThread : IDisposable
+ {
+ private static readonly ILogger Log = Serilog.Log.ForContext();
+
+ private readonly BlockingCollection _workQueue = new BlockingCollection();
+ private readonly Thread _staThread;
+ private volatile bool _disposed;
+
+ public StaDispatchThread(string threadName = "MxAccess-STA")
+ {
+ _staThread = new Thread(StaThreadLoop)
+ {
+ Name = threadName,
+ IsBackground = true
+ };
+ _staThread.SetApartmentState(ApartmentState.STA);
+ _staThread.Start();
+ Log.Information("STA dispatch thread '{ThreadName}' started", threadName);
+ }
+
+ ///
+ /// Dispatches an action to the STA thread and returns a Task that completes
+ /// when the action finishes.
+ ///
+ public Task DispatchAsync(Action action)
+ {
+ if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread));
+
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ _workQueue.Add(() =>
+ {
+ try
+ {
+ action();
+ tcs.TrySetResult(true);
+ }
+ catch (Exception ex)
+ {
+ tcs.TrySetException(ex);
+ }
+ });
+ return tcs.Task;
+ }
+
+ ///
+ /// Dispatches a function to the STA thread and returns its result.
+ ///
+ public Task DispatchAsync(Func func)
+ {
+ if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread));
+
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ _workQueue.Add(() =>
+ {
+ try
+ {
+ var result = func();
+ tcs.TrySetResult(result);
+ }
+ catch (Exception ex)
+ {
+ tcs.TrySetException(ex);
+ }
+ });
+ return tcs.Task;
+ }
+
+ private void StaThreadLoop()
+ {
+ Log.Debug("STA thread loop started");
+
+ // Process the work queue. GetConsumingEnumerable blocks until
+ // items are available or the collection is marked complete.
+ foreach (var action in _workQueue.GetConsumingEnumerable())
+ {
+ try
+ {
+ action();
+ }
+ catch (Exception ex)
+ {
+ // Should not happen — actions set TCS exceptions internally.
+ Log.Error(ex, "Unhandled exception on STA thread");
+ }
+
+ // Pump COM messages between work items
+ Application.DoEvents();
+ }
+
+ Log.Debug("STA thread loop exited");
+ }
+
+ public void Dispose()
+ {
+ if (_disposed) return;
+ _disposed = true;
+
+ _workQueue.CompleteAdding();
+
+ // Wait for the STA thread to drain and exit
+ if (_staThread.IsAlive && !_staThread.Join(TimeSpan.FromSeconds(10)))
+ {
+ Log.Warning("STA thread did not exit within 10 seconds");
+ }
+
+ _workQueue.Dispose();
+ Log.Information("STA dispatch thread disposed");
+ }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs
new file mode 100644
index 0000000..c0ef555
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs
@@ -0,0 +1,154 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using Serilog;
+
+namespace ZB.MOM.WW.LmxProxy.Host.Sessions
+{
+ ///
+ /// Tracks active client sessions in memory.
+ /// Thread-safe via ConcurrentDictionary.
+ ///
+ public sealed class SessionManager : IDisposable
+ {
+ private static readonly ILogger Log = Serilog.Log.ForContext();
+
+ private readonly ConcurrentDictionary _sessions
+ = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase);
+
+ private readonly Timer? _scavengingTimer;
+ private readonly TimeSpan _inactivityTimeout;
+
+ ///
+ /// Creates a SessionManager with optional inactivity scavenging.
+ ///
+ ///
+ /// Sessions inactive for this many minutes are automatically terminated.
+ /// Set to 0 to disable scavenging.
+ ///
+ public SessionManager(int inactivityTimeoutMinutes = 5)
+ {
+ _inactivityTimeout = TimeSpan.FromMinutes(inactivityTimeoutMinutes);
+
+ if (inactivityTimeoutMinutes > 0)
+ {
+ // Check every 60 seconds
+ _scavengingTimer = new Timer(ScavengeInactiveSessions, null,
+ TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(60));
+ }
+ }
+
+ /// Gets the count of active sessions.
+ public int ActiveSessionCount => _sessions.Count;
+
+ ///
+ /// Creates a new session.
+ /// Returns the 32-character hex GUID session ID.
+ ///
+ public string CreateSession(string clientId, string apiKey)
+ {
+ var sessionId = Guid.NewGuid().ToString("N"); // 32-char lowercase hex, no hyphens
+ var sessionInfo = new SessionInfo(sessionId, clientId, apiKey);
+ _sessions[sessionId] = sessionInfo;
+
+ Log.Information("Session created: {SessionId} for client {ClientId}", sessionId, clientId);
+ return sessionId;
+ }
+
+ ///
+ /// Validates a session ID. Updates LastActivity on success.
+ /// Returns true if the session exists.
+ ///
+ public bool ValidateSession(string sessionId)
+ {
+ if (_sessions.TryGetValue(sessionId, out var session))
+ {
+ session.TouchLastActivity();
+ return true;
+ }
+ return false;
+ }
+
+ ///
+ /// Terminates a session. Returns true if the session existed.
+ ///
+ public bool TerminateSession(string sessionId)
+ {
+ if (_sessions.TryRemove(sessionId, out _))
+ {
+ Log.Information("Session terminated: {SessionId}", sessionId);
+ return true;
+ }
+ return false;
+ }
+
+ /// Gets session info by ID, or null if not found.
+ public SessionInfo? GetSession(string sessionId)
+ {
+ _sessions.TryGetValue(sessionId, out var session);
+ return session;
+ }
+
+ /// Gets a snapshot of all active sessions.
+ public IReadOnlyList GetAllSessions()
+ {
+ return _sessions.Values.ToList().AsReadOnly();
+ }
+
+ ///
+ /// Scavenges sessions that have been inactive for longer than the timeout.
+ ///
+ private void ScavengeInactiveSessions(object? state)
+ {
+ if (_inactivityTimeout <= TimeSpan.Zero) return;
+
+ var cutoff = DateTime.UtcNow - _inactivityTimeout;
+ var expired = _sessions.Where(kvp => kvp.Value.LastActivity < cutoff).ToList();
+
+ foreach (var kvp in expired)
+ {
+ if (_sessions.TryRemove(kvp.Key, out _))
+ {
+ Log.Information("Session {SessionId} scavenged (inactive since {LastActivity})",
+ kvp.Key, kvp.Value.LastActivity);
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ _scavengingTimer?.Dispose();
+ _sessions.Clear();
+ }
+ }
+
+ ///
+ /// Information about an active client session.
+ ///
+ public class SessionInfo
+ {
+ public SessionInfo(string sessionId, string clientId, string apiKey)
+ {
+ SessionId = sessionId;
+ ClientId = clientId;
+ ApiKey = apiKey;
+ ConnectedAt = DateTime.UtcNow;
+ LastActivity = DateTime.UtcNow;
+ }
+
+ public string SessionId { get; }
+ public string ClientId { get; }
+ public string ApiKey { get; }
+ public DateTime ConnectedAt { get; }
+ public DateTime LastActivity { get; private set; }
+ public long ConnectedSinceUtcTicks => ConnectedAt.Ticks;
+
+ /// Updates the last activity timestamp to now.
+ public void TouchLastActivity()
+ {
+ LastActivity = DateTime.UtcNow;
+ }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs
new file mode 100644
index 0000000..ffacd92
--- /dev/null
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs
@@ -0,0 +1,244 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Channels;
+using Serilog;
+using ZB.MOM.WW.LmxProxy.Host.Domain;
+
+namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
+{
+ ///
+ /// Manages per-client subscription channels with shared MxAccess subscriptions.
+ /// Ref-counted tag subscriptions: first client creates, last client disposes.
+ ///
+ public sealed class SubscriptionManager : IDisposable
+ {
+ private static readonly ILogger Log = Serilog.Log.ForContext();
+
+ private readonly IScadaClient _scadaClient;
+ private readonly int _channelCapacity;
+ private readonly BoundedChannelFullMode _channelFullMode;
+
+ // Client ID -> ClientSubscription
+ private readonly ConcurrentDictionary _clientSubscriptions
+ = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase);
+
+ // Tag address -> TagSubscription (shared, ref-counted)
+ private readonly ConcurrentDictionary _tagSubscriptions
+ = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase);
+
+ private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
+
+ public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000,
+ BoundedChannelFullMode channelFullMode = BoundedChannelFullMode.DropOldest)
+ {
+ _scadaClient = scadaClient;
+ _channelCapacity = channelCapacity;
+ _channelFullMode = channelFullMode;
+ }
+
+ ///
+ /// Creates a subscription for a client. Returns a ChannelReader to stream from.
+ ///
+ public ChannelReader<(string address, Vtq vtq)> Subscribe(
+ string clientId, IEnumerable addresses, CancellationToken ct)
+ {
+ var channel = Channel.CreateBounded<(string address, Vtq vtq)>(
+ new BoundedChannelOptions(_channelCapacity)
+ {
+ FullMode = _channelFullMode,
+ SingleReader = true,
+ SingleWriter = false
+ });
+
+ var addressSet = new HashSet(addresses, StringComparer.OrdinalIgnoreCase);
+ var clientSub = new ClientSubscription(clientId, channel, addressSet);
+
+ _clientSubscriptions[clientId] = clientSub;
+
+ _rwLock.EnterWriteLock();
+ try
+ {
+ foreach (var address in addressSet)
+ {
+ if (_tagSubscriptions.TryGetValue(address, out var tagSub))
+ {
+ tagSub.ClientIds.Add(clientId);
+ }
+ else
+ {
+ _tagSubscriptions[address] = new TagSubscription(address,
+ new HashSet(StringComparer.OrdinalIgnoreCase) { clientId });
+ }
+ }
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+
+ // Register cancellation cleanup
+ ct.Register(() => UnsubscribeClient(clientId));
+
+ Log.Information("Client {ClientId} subscribed to {Count} tags", clientId, addressSet.Count);
+ return channel.Reader;
+ }
+
+ ///
+ /// Called from MxAccessClient's OnDataChange handler.
+ /// Fans out the update to all subscribed clients.
+ ///
+ public void OnTagValueChanged(string address, Vtq vtq)
+ {
+ _rwLock.EnterReadLock();
+ HashSet? clientIds = null;
+ try
+ {
+ if (_tagSubscriptions.TryGetValue(address, out var tagSub))
+ {
+ clientIds = new HashSet(tagSub.ClientIds);
+ }
+ }
+ finally
+ {
+ _rwLock.ExitReadLock();
+ }
+
+ if (clientIds == null || clientIds.Count == 0) return;
+
+ foreach (var clientId in clientIds)
+ {
+ if (_clientSubscriptions.TryGetValue(clientId, out var clientSub))
+ {
+ if (!clientSub.Channel.Writer.TryWrite((address, vtq)))
+ {
+ clientSub.IncrementDropped();
+ Log.Debug("Dropped message for client {ClientId} on tag {Address} (channel full)",
+ clientId, address);
+ }
+ else
+ {
+ clientSub.IncrementDelivered();
+ }
+ }
+ }
+ }
+
+ ///
+ /// Removes a client's subscriptions and cleans up tag subscriptions
+ /// when the last client unsubscribes.
+ ///
+ public void UnsubscribeClient(string clientId)
+ {
+ if (!_clientSubscriptions.TryRemove(clientId, out var clientSub))
+ return;
+
+ _rwLock.EnterWriteLock();
+ try
+ {
+ foreach (var address in clientSub.Addresses)
+ {
+ if (_tagSubscriptions.TryGetValue(address, out var tagSub))
+ {
+ tagSub.ClientIds.Remove(clientId);
+
+ // Last client unsubscribed — remove the tag subscription
+ if (tagSub.ClientIds.Count == 0)
+ {
+ _tagSubscriptions.TryRemove(address, out _);
+ }
+ }
+ }
+ }
+ finally
+ {
+ _rwLock.ExitWriteLock();
+ }
+
+ // Complete the channel (signals end of stream to the gRPC handler)
+ clientSub.Channel.Writer.TryComplete();
+
+ Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)",
+ clientId, clientSub.DeliveredCount, clientSub.DroppedCount);
+ }
+
+ ///
+ /// Sends a bad-quality notification to all subscribed clients for all their tags.
+ /// Called when MxAccess disconnects.
+ ///
+ public void NotifyDisconnection()
+ {
+ var badVtq = Vtq.New(null, Quality.Bad_NotConnected);
+
+ foreach (var kvp in _clientSubscriptions)
+ {
+ foreach (var address in kvp.Value.Addresses)
+ {
+ kvp.Value.Channel.Writer.TryWrite((address, badVtq));
+ }
+ }
+ }
+
+ /// Returns subscription statistics.
+ public SubscriptionStats GetStats()
+ {
+ return new SubscriptionStats(
+ _clientSubscriptions.Count,
+ _tagSubscriptions.Count,
+ _clientSubscriptions.Values.Sum(c => c.Addresses.Count));
+ }
+
+ public void Dispose()
+ {
+ foreach (var kvp in _clientSubscriptions)
+ {
+ kvp.Value.Channel.Writer.TryComplete();
+ }
+ _clientSubscriptions.Clear();
+ _tagSubscriptions.Clear();
+ _rwLock.Dispose();
+ }
+
+ // ── Nested types ─────────────────────────────────────────
+
+ private class ClientSubscription
+ {
+ public ClientSubscription(string clientId,
+ Channel<(string address, Vtq vtq)> channel,
+ HashSet addresses)
+ {
+ ClientId = clientId;
+ Channel = channel;
+ Addresses = addresses;
+ }
+
+ public string ClientId { get; }
+ public Channel<(string address, Vtq vtq)> Channel { get; }
+ public HashSet Addresses { get; }
+
+ // Use backing fields for Interlocked
+ private long _delivered;
+ private long _dropped;
+
+ public long DeliveredCount => Interlocked.Read(ref _delivered);
+ public long DroppedCount => Interlocked.Read(ref _dropped);
+
+ public void IncrementDelivered() => Interlocked.Increment(ref _delivered);
+ public void IncrementDropped() => Interlocked.Increment(ref _dropped);
+ }
+
+ private class TagSubscription
+ {
+ public TagSubscription(string address, HashSet clientIds)
+ {
+ Address = address;
+ ClientIds = clientIds;
+ }
+
+ public string Address { get; }
+ public HashSet ClientIds { get; }
+ }
+ }
+}
diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj
index bbf30a3..d20063b 100644
--- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj
+++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj
@@ -46,6 +46,7 @@
..\..\lib\ArchestrA.MXAccess.dll
true
+
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs
new file mode 100644
index 0000000..7f60949
--- /dev/null
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs
@@ -0,0 +1,86 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Xunit;
+using ZB.MOM.WW.LmxProxy.Host.MxAccess;
+
+namespace ZB.MOM.WW.LmxProxy.Host.Tests.MxAccess
+{
+ public class StaDispatchThreadTests
+ {
+ [Fact]
+ public async Task DispatchAsync_ExecutesOnStaThread()
+ {
+ using var sta = new StaDispatchThread("Test-STA");
+ var threadId = await sta.DispatchAsync(() => Thread.CurrentThread.ManagedThreadId);
+ threadId.Should().NotBe(Thread.CurrentThread.ManagedThreadId);
+ }
+
+ [Fact]
+ public async Task DispatchAsync_ReturnsResult()
+ {
+ using var sta = new StaDispatchThread("Test-STA");
+ var result = await sta.DispatchAsync(() => 42);
+ result.Should().Be(42);
+ }
+
+ [Fact]
+ public async Task DispatchAsync_PropagatesException()
+ {
+ using var sta = new StaDispatchThread("Test-STA");
+ Func act = () => sta.DispatchAsync(() => throw new InvalidOperationException("test error"));
+ await act.Should().ThrowAsync().WithMessage("test error");
+ }
+
+ [Fact]
+ public async Task DispatchAsync_Action_Completes()
+ {
+ using var sta = new StaDispatchThread("Test-STA");
+ int value = 0;
+ await sta.DispatchAsync(() => { value = 99; });
+ value.Should().Be(99);
+ }
+
+ [Fact]
+ public void Dispose_CompletesGracefully()
+ {
+ var sta = new StaDispatchThread("Test-STA");
+ sta.Dispose(); // Should not throw
+ }
+
+ [Fact]
+ public void DispatchAfterDispose_ThrowsObjectDisposedException()
+ {
+ var sta = new StaDispatchThread("Test-STA");
+ sta.Dispose();
+ Func act = () => sta.DispatchAsync(() => 42);
+ act.Should().ThrowAsync();
+ }
+
+ [Fact]
+ public async Task MultipleDispatches_ExecuteInOrder()
+ {
+ using var sta = new StaDispatchThread("Test-STA");
+ var results = new System.Collections.Concurrent.ConcurrentBag();
+
+ var tasks = new Task[10];
+ for (int i = 0; i < 10; i++)
+ {
+ int idx = i;
+ tasks[i] = sta.DispatchAsync(() => { results.Add(idx); });
+ }
+
+ await Task.WhenAll(tasks);
+ results.Count.Should().Be(10);
+ }
+
+ [Fact]
+ public async Task StaThread_HasStaApartmentState()
+ {
+ using var sta = new StaDispatchThread("Test-STA");
+ var apartmentState = await sta.DispatchAsync(() => Thread.CurrentThread.GetApartmentState());
+ apartmentState.Should().Be(ApartmentState.STA);
+ }
+ }
+}
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/TypedValueEqualsTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/TypedValueEqualsTests.cs
new file mode 100644
index 0000000..980ab27
--- /dev/null
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/TypedValueEqualsTests.cs
@@ -0,0 +1,78 @@
+using FluentAssertions;
+using Xunit;
+using ZB.MOM.WW.LmxProxy.Host.Domain;
+
+namespace ZB.MOM.WW.LmxProxy.Host.Tests.MxAccess
+{
+ public class TypedValueEqualsTests
+ {
+ [Fact]
+ public void NullEqualsNull() => TypedValueComparer.Equals(null, null).Should().BeTrue();
+
+ [Fact]
+ public void NullNotEqualsValue() => TypedValueComparer.Equals(null, 42).Should().BeFalse();
+
+ [Fact]
+ public void ValueNotEqualsNull() => TypedValueComparer.Equals(42, null).Should().BeFalse();
+
+ [Fact]
+ public void SameTypeAndValue() => TypedValueComparer.Equals(42.5, 42.5).Should().BeTrue();
+
+ [Fact]
+ public void SameTypeDifferentValue() => TypedValueComparer.Equals(42.5, 43.0).Should().BeFalse();
+
+ [Fact]
+ public void DifferentTypes_NeverEqual() => TypedValueComparer.Equals(1, 1.0).Should().BeFalse();
+
+ [Fact]
+ public void BoolTrue() => TypedValueComparer.Equals(true, true).Should().BeTrue();
+
+ [Fact]
+ public void BoolFalse() => TypedValueComparer.Equals(false, true).Should().BeFalse();
+
+ [Fact]
+ public void String_CaseSensitive()
+ {
+ TypedValueComparer.Equals("DONE", "DONE").Should().BeTrue();
+ TypedValueComparer.Equals("done", "DONE").Should().BeFalse();
+ }
+
+ [Fact]
+ public void Array_SameElements()
+ {
+ TypedValueComparer.Equals(new[] { 1, 2, 3 }, new[] { 1, 2, 3 }).Should().BeTrue();
+ }
+
+ [Fact]
+ public void Array_DifferentElements()
+ {
+ TypedValueComparer.Equals(new[] { 1, 2, 3 }, new[] { 1, 2, 4 }).Should().BeFalse();
+ }
+
+ [Fact]
+ public void Array_DifferentLengths()
+ {
+ TypedValueComparer.Equals(new[] { 1, 2 }, new[] { 1, 2, 3 }).Should().BeFalse();
+ }
+
+ [Fact]
+ public void Int32_NotEqual_ToDouble()
+ {
+ TypedValueComparer.Equals(1, 1.0).Should().BeFalse();
+ }
+
+ [Fact]
+ public void Long_Equality()
+ {
+ TypedValueComparer.Equals(long.MaxValue, long.MaxValue).Should().BeTrue();
+ }
+
+ [Fact]
+ public void DateTime_TickPrecision()
+ {
+ var dt1 = new System.DateTime(638789000000000000, System.DateTimeKind.Utc);
+ var dt2 = new System.DateTime(638789000000000000, System.DateTimeKind.Utc);
+ TypedValueComparer.Equals(dt1, dt2).Should().BeTrue();
+ }
+ }
+}
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Sessions/SessionManagerTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Sessions/SessionManagerTests.cs
new file mode 100644
index 0000000..9000ef4
--- /dev/null
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Sessions/SessionManagerTests.cs
@@ -0,0 +1,148 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Xunit;
+using ZB.MOM.WW.LmxProxy.Host.Sessions;
+
+namespace ZB.MOM.WW.LmxProxy.Host.Tests.Sessions
+{
+ public class SessionManagerTests
+ {
+ [Fact]
+ public void CreateSession_Returns32CharHexId()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ var id = sm.CreateSession("client1", "key1");
+ id.Should().HaveLength(32);
+ id.Should().MatchRegex("^[0-9a-f]{32}$");
+ }
+
+ [Fact]
+ public void CreateSession_IncrementsCount()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ sm.ActiveSessionCount.Should().Be(0);
+ sm.CreateSession("c1", "k1");
+ sm.ActiveSessionCount.Should().Be(1);
+ sm.CreateSession("c2", "k2");
+ sm.ActiveSessionCount.Should().Be(2);
+ }
+
+ [Fact]
+ public void ValidateSession_ReturnsTrueForExistingSession()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ var id = sm.CreateSession("c1", "k1");
+ sm.ValidateSession(id).Should().BeTrue();
+ }
+
+ [Fact]
+ public void ValidateSession_ReturnsFalseForUnknownSession()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ sm.ValidateSession("nonexistent").Should().BeFalse();
+ }
+
+ [Fact]
+ public void ValidateSession_UpdatesLastActivity()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ var id = sm.CreateSession("c1", "k1");
+ var session = sm.GetSession(id);
+ var initialActivity = session!.LastActivity;
+
+ Thread.Sleep(50); // Small delay to ensure time passes
+ sm.ValidateSession(id);
+
+ session.LastActivity.Should().BeAfter(initialActivity);
+ }
+
+ [Fact]
+ public void TerminateSession_RemovesSession()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ var id = sm.CreateSession("c1", "k1");
+ sm.TerminateSession(id).Should().BeTrue();
+ sm.ActiveSessionCount.Should().Be(0);
+ sm.ValidateSession(id).Should().BeFalse();
+ }
+
+ [Fact]
+ public void TerminateSession_ReturnsFalseForUnknownSession()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ sm.TerminateSession("nonexistent").Should().BeFalse();
+ }
+
+ [Fact]
+ public void GetSession_ReturnsNullForUnknown()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ sm.GetSession("nonexistent").Should().BeNull();
+ }
+
+ [Fact]
+ public void GetSession_ReturnsCorrectInfo()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ var id = sm.CreateSession("client-abc", "key-xyz");
+ var session = sm.GetSession(id);
+ session.Should().NotBeNull();
+ session!.ClientId.Should().Be("client-abc");
+ session.ApiKey.Should().Be("key-xyz");
+ session.SessionId.Should().Be(id);
+ session.ConnectedAt.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(2));
+ }
+
+ [Fact]
+ public void GetAllSessions_ReturnsSnapshot()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ sm.CreateSession("c1", "k1");
+ sm.CreateSession("c2", "k2");
+ var all = sm.GetAllSessions();
+ all.Should().HaveCount(2);
+ }
+
+ [Fact]
+ public async Task ConcurrentAccess_IsThreadSafe()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ var tasks = new Task[100];
+ for (int i = 0; i < 100; i++)
+ {
+ int idx = i;
+ tasks[i] = Task.Run(() =>
+ {
+ var id = sm.CreateSession($"client-{idx}", $"key-{idx}");
+ sm.ValidateSession(id);
+ if (idx % 3 == 0) sm.TerminateSession(id);
+ });
+ }
+ await Task.WhenAll(tasks);
+
+ // Should have ~67 sessions remaining (100 - ~33 terminated)
+ sm.ActiveSessionCount.Should().BeInRange(60, 70);
+ }
+
+ [Fact]
+ public void Dispose_ClearsAllSessions()
+ {
+ var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ sm.CreateSession("c1", "k1");
+ sm.CreateSession("c2", "k2");
+ sm.Dispose();
+ sm.ActiveSessionCount.Should().Be(0);
+ }
+
+ [Fact]
+ public void ConnectedSinceUtcTicks_ReturnsCorrectValue()
+ {
+ using var sm = new SessionManager(inactivityTimeoutMinutes: 0);
+ var id = sm.CreateSession("c1", "k1");
+ var session = sm.GetSession(id);
+ session!.ConnectedSinceUtcTicks.Should().Be(session.ConnectedAt.Ticks);
+ }
+ }
+}
diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs
new file mode 100644
index 0000000..58cde93
--- /dev/null
+++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs
@@ -0,0 +1,193 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Xunit;
+using ZB.MOM.WW.LmxProxy.Host.Domain;
+using ZB.MOM.WW.LmxProxy.Host.Subscriptions;
+
+namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions
+{
+ public class SubscriptionManagerTests
+ {
+ /// Fake IScadaClient for testing (no COM dependency).
+ private class FakeScadaClient : IScadaClient
+ {
+ public bool IsConnected => true;
+ public ConnectionState ConnectionState => ConnectionState.Connected;
+ public event EventHandler? ConnectionStateChanged;
+ public Task ConnectAsync(CancellationToken ct = default) => Task.CompletedTask;
+ public Task DisconnectAsync(CancellationToken ct = default) => Task.CompletedTask;
+ public Task ReadAsync(string address, CancellationToken ct = default) =>
+ Task.FromResult(Vtq.Good(42.0));
+ public Task> ReadBatchAsync(IEnumerable addresses, CancellationToken ct = default) =>
+ Task.FromResult>(new Dictionary());
+ public Task WriteAsync(string address, object value, CancellationToken ct = default) => Task.CompletedTask;
+ public Task WriteBatchAsync(IReadOnlyDictionary values, CancellationToken ct = default) => Task.CompletedTask;
+ public Task<(bool flagReached, int elapsedMs)> WriteBatchAndWaitAsync(
+ IReadOnlyDictionary values, string flagTag, object flagValue,
+ int timeoutMs, int pollIntervalMs, CancellationToken ct = default) =>
+ Task.FromResult((false, 0));
+ public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) =>
+ Task.FromResult(new FakeSubscriptionHandle());
+ public ValueTask DisposeAsync() => default;
+
+ // Suppress unused event warning
+ internal void FireEvent() => ConnectionStateChanged?.Invoke(this, null!);
+
+ private class FakeSubscriptionHandle : IAsyncDisposable
+ {
+ public ValueTask DisposeAsync() => default;
+ }
+ }
+
+ [Fact]
+ public void Subscribe_ReturnsChannelReader()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient());
+ using var cts = new CancellationTokenSource();
+ var reader = sm.Subscribe("client1", new[] { "Tag1", "Tag2" }, cts.Token);
+ reader.Should().NotBeNull();
+ }
+
+ [Fact]
+ public async Task OnTagValueChanged_FansOutToSubscribedClients()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient());
+ using var cts = new CancellationTokenSource();
+ var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
+
+ var vtq = Vtq.Good(42.0);
+ sm.OnTagValueChanged("Motor.Speed", vtq);
+
+ var result = await reader.ReadAsync(cts.Token);
+ result.address.Should().Be("Motor.Speed");
+ result.vtq.Value.Should().Be(42.0);
+ result.vtq.Quality.Should().Be(Quality.Good);
+ }
+
+ [Fact]
+ public async Task OnTagValueChanged_MultipleClients_BothReceive()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient());
+ using var cts = new CancellationTokenSource();
+ var reader1 = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
+ var reader2 = sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token);
+
+ sm.OnTagValueChanged("Motor.Speed", Vtq.Good(99.0));
+
+ var r1 = await reader1.ReadAsync(cts.Token);
+ var r2 = await reader2.ReadAsync(cts.Token);
+ r1.vtq.Value.Should().Be(99.0);
+ r2.vtq.Value.Should().Be(99.0);
+ }
+
+ [Fact]
+ public void OnTagValueChanged_NonSubscribedTag_NoDelivery()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient());
+ using var cts = new CancellationTokenSource();
+ var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
+
+ sm.OnTagValueChanged("Motor.Torque", Vtq.Good(10.0));
+
+ // Channel should be empty
+ reader.TryRead(out _).Should().BeFalse();
+ }
+
+ [Fact]
+ public void UnsubscribeClient_CompletesChannel()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient());
+ using var cts = new CancellationTokenSource();
+ var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
+
+ sm.UnsubscribeClient("client1");
+
+ // Channel should be completed
+ reader.Completion.IsCompleted.Should().BeTrue();
+ }
+
+ [Fact]
+ public void UnsubscribeClient_RemovesFromTagSubscriptions()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient());
+ using var cts = new CancellationTokenSource();
+ sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
+
+ sm.UnsubscribeClient("client1");
+
+ var stats = sm.GetStats();
+ stats.TotalClients.Should().Be(0);
+ stats.TotalTags.Should().Be(0);
+ }
+
+ [Fact]
+ public void RefCounting_LastClientUnsubscribeRemovesTag()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient());
+ using var cts = new CancellationTokenSource();
+ sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
+ sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token);
+
+ sm.GetStats().TotalTags.Should().Be(1);
+
+ sm.UnsubscribeClient("client1");
+ sm.GetStats().TotalTags.Should().Be(1); // client2 still subscribed
+
+ sm.UnsubscribeClient("client2");
+ sm.GetStats().TotalTags.Should().Be(0); // last client gone
+ }
+
+ [Fact]
+ public void NotifyDisconnection_SendsBadQualityToAll()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient());
+ using var cts = new CancellationTokenSource();
+ var reader = sm.Subscribe("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token);
+
+ sm.NotifyDisconnection();
+
+ // Should receive 2 bad quality messages
+ reader.TryRead(out var r1).Should().BeTrue();
+ r1.vtq.Quality.Should().Be(Quality.Bad_NotConnected);
+ reader.TryRead(out var r2).Should().BeTrue();
+ r2.vtq.Quality.Should().Be(Quality.Bad_NotConnected);
+ }
+
+ [Fact]
+ public void Backpressure_DropOldest_DropsWhenFull()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient(), channelCapacity: 3);
+ using var cts = new CancellationTokenSource();
+ var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token);
+
+ // Fill the channel beyond capacity
+ for (int i = 0; i < 10; i++)
+ {
+ sm.OnTagValueChanged("Motor.Speed", Vtq.Good((double)i));
+ }
+
+ // Should have exactly 3 messages (capacity limit)
+ int count = 0;
+ while (reader.TryRead(out _)) count++;
+ count.Should().Be(3);
+ }
+
+ [Fact]
+ public void GetStats_ReturnsCorrectCounts()
+ {
+ using var sm = new SubscriptionManager(new FakeScadaClient());
+ using var cts = new CancellationTokenSource();
+ sm.Subscribe("c1", new[] { "Tag1", "Tag2" }, cts.Token);
+ sm.Subscribe("c2", new[] { "Tag2", "Tag3" }, cts.Token);
+
+ var stats = sm.GetStats();
+ stats.TotalClients.Should().Be(2);
+ stats.TotalTags.Should().Be(3); // Tag1, Tag2, Tag3
+ stats.ActiveSubscriptions.Should().Be(4); // c1:Tag1, c1:Tag2, c2:Tag2, c2:Tag3
+ }
+ }
+}