From 64c92c63e5100cd9a10becb7f276abcbad72d6b2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 21 Mar 2026 23:58:17 -0400 Subject: [PATCH] =?UTF-8?q?feat(lmxproxy):=20phase=202=20=E2=80=94=20host?= =?UTF-8?q?=20core=20(MxAccessClient,=20SessionManager,=20SubscriptionMana?= =?UTF-8?q?ger)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Domain/SubscriptionStats.cs | 17 ++ .../Domain/TypedValueComparer.cs | 35 +++ .../MxAccess/MxAccessClient.Connection.cs | 199 ++++++++++++++ .../MxAccess/MxAccessClient.EventHandlers.cs | 85 ++++++ .../MxAccess/MxAccessClient.ReadWrite.cs | 183 +++++++++++++ .../MxAccess/MxAccessClient.Subscription.cs | 158 ++++++++++++ .../MxAccess/MxAccessClient.cs | 130 ++++++++++ .../MxAccess/StaDispatchThread.cs | 123 +++++++++ .../Sessions/SessionManager.cs | 154 +++++++++++ .../Subscriptions/SubscriptionManager.cs | 244 ++++++++++++++++++ .../ZB.MOM.WW.LmxProxy.Host.csproj | 1 + .../MxAccess/StaDispatchThreadTests.cs | 86 ++++++ .../MxAccess/TypedValueEqualsTests.cs | 78 ++++++ .../Sessions/SessionManagerTests.cs | 148 +++++++++++ .../Subscriptions/SubscriptionManagerTests.cs | 193 ++++++++++++++ 15 files changed, 1834 insertions(+) create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/SubscriptionStats.cs create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/TypedValueComparer.cs create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs create mode 100644 lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs create mode 100644 lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs create mode 100644 lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/TypedValueEqualsTests.cs create mode 100644 lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Sessions/SessionManagerTests.cs create mode 100644 lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/SubscriptionStats.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/SubscriptionStats.cs new file mode 100644 index 0000000..d679488 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/SubscriptionStats.cs @@ -0,0 +1,17 @@ +namespace ZB.MOM.WW.LmxProxy.Host.Domain +{ + /// Subscription statistics for monitoring. + public class SubscriptionStats + { + public SubscriptionStats(int totalClients, int totalTags, int activeSubscriptions) + { + TotalClients = totalClients; + TotalTags = totalTags; + ActiveSubscriptions = activeSubscriptions; + } + + public int TotalClients { get; } + public int TotalTags { get; } + public int ActiveSubscriptions { get; } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/TypedValueComparer.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/TypedValueComparer.cs new file mode 100644 index 0000000..1f7eb11 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/TypedValueComparer.cs @@ -0,0 +1,35 @@ +using System; + +namespace ZB.MOM.WW.LmxProxy.Host.Domain +{ + /// + /// Type-aware equality comparison for WriteBatchAndWait flag matching. + /// + public static class TypedValueComparer + { + /// + /// Returns true if both values are the same type and equal. + /// Mismatched types are never equal. + /// Null equals null only. + /// + public new static bool Equals(object? a, object? b) + { + if (a == null && b == null) return true; + if (a == null || b == null) return false; + if (a.GetType() != b.GetType()) return false; + + if (a is Array arrA && b is Array arrB) + { + if (arrA.Length != arrB.Length) return false; + for (int i = 0; i < arrA.Length; i++) + { + if (!object.Equals(arrA.GetValue(i), arrB.GetValue(i))) + return false; + } + return true; + } + + return object.Equals(a, b); + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs new file mode 100644 index 0000000..77bbc8e --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs @@ -0,0 +1,199 @@ +using System; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using ArchestrA.MxAccess; +using Serilog; +using ZB.MOM.WW.LmxProxy.Host.Domain; + +namespace ZB.MOM.WW.LmxProxy.Host.MxAccess +{ + public sealed partial class MxAccessClient + { + /// + /// Connects to MxAccess on the STA thread. + /// + public async Task ConnectAsync(CancellationToken ct = default) + { + if (_disposed) throw new ObjectDisposedException(nameof(MxAccessClient)); + if (IsConnected) return; + + SetState(ConnectionState.Connecting); + + try + { + await _staThread.DispatchAsync(() => + { + // Create COM object + _lmxProxy = new LMXProxyServer(); + + // Wire event handlers + _lmxProxy.OnDataChange += OnDataChange; + _lmxProxy.OnWriteComplete += OnWriteComplete; + + // Register with MxAccess + _connectionHandle = _lmxProxy.Register("ZB.MOM.WW.LmxProxy.Host"); + }); + + lock (_lock) + { + _connectedSince = DateTime.UtcNow; + } + + SetState(ConnectionState.Connected); + Log.Information("Connected to MxAccess (handle={Handle})", _connectionHandle); + + // Recreate any stored subscriptions from a previous connection + await RecreateStoredSubscriptionsAsync(); + } + catch (Exception ex) + { + Log.Error(ex, "Failed to connect to MxAccess"); + await CleanupComObjectsAsync(); + SetState(ConnectionState.Error, ex.Message); + throw; + } + } + + /// + /// Disconnects from MxAccess on the STA thread. + /// + public async Task DisconnectAsync(CancellationToken ct = default) + { + if (!IsConnected) return; + + SetState(ConnectionState.Disconnecting); + + try + { + await _staThread.DispatchAsync(() => + { + if (_lmxProxy != null && _connectionHandle > 0) + { + try + { + // Remove event handlers first + _lmxProxy.OnDataChange -= OnDataChange; + _lmxProxy.OnWriteComplete -= OnWriteComplete; + + // Unregister + _lmxProxy.Unregister(_connectionHandle); + } + catch (Exception ex) + { + Log.Warning(ex, "Error during MxAccess unregister"); + } + finally + { + // Force-release COM object + Marshal.ReleaseComObject(_lmxProxy); + _lmxProxy = null; + _connectionHandle = 0; + } + } + }); + + SetState(ConnectionState.Disconnected); + Log.Information("Disconnected from MxAccess"); + } + catch (Exception ex) + { + Log.Error(ex, "Error during disconnect"); + SetState(ConnectionState.Error, ex.Message); + } + } + + /// + /// Starts the auto-reconnect monitor loop. + /// Call this after initial ConnectAsync succeeds. + /// + public void StartMonitorLoop() + { + if (!_autoReconnect) return; + + _reconnectCts = new CancellationTokenSource(); + Task.Run(() => MonitorConnectionAsync(_reconnectCts.Token)); + } + + /// + /// Stops the auto-reconnect monitor loop. + /// + public void StopMonitorLoop() + { + _reconnectCts?.Cancel(); + } + + /// + /// Auto-reconnect monitor loop. Checks connection every monitorInterval. + /// On disconnect, attempts reconnect. On failure, retries at next interval. + /// + private async Task MonitorConnectionAsync(CancellationToken ct) + { + Log.Information("Connection monitor loop started (interval={IntervalMs}ms)", _monitorIntervalMs); + + while (!ct.IsCancellationRequested) + { + try + { + await Task.Delay(_monitorIntervalMs, ct); + } + catch (OperationCanceledException) + { + break; + } + + if (IsConnected) continue; + + Log.Information("MxAccess disconnected, attempting reconnect..."); + SetState(ConnectionState.Reconnecting); + + try + { + await ConnectAsync(ct); + Log.Information("Reconnected to MxAccess successfully"); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + Log.Warning(ex, "Reconnect attempt failed, will retry in {IntervalMs}ms", _monitorIntervalMs); + } + } + + Log.Information("Connection monitor loop exited"); + } + + /// + /// Cleans up COM objects on the STA thread after a failed connection. + /// + private async Task CleanupComObjectsAsync() + { + try + { + await _staThread.DispatchAsync(() => + { + if (_lmxProxy != null) + { + try { _lmxProxy.OnDataChange -= OnDataChange; } catch { } + try { _lmxProxy.OnWriteComplete -= OnWriteComplete; } catch { } + try { Marshal.ReleaseComObject(_lmxProxy); } catch { } + _lmxProxy = null; + } + _connectionHandle = 0; + }); + } + catch (Exception ex) + { + Log.Warning(ex, "Error during COM object cleanup"); + } + } + + /// Gets the UTC time when the connection was established. + public DateTime ConnectedSince + { + get { lock (_lock) { return _connectedSince; } } + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs new file mode 100644 index 0000000..9c341c3 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs @@ -0,0 +1,85 @@ +using System; +using System.Collections.Generic; +using ArchestrA.MxAccess; +using Serilog; +using ZB.MOM.WW.LmxProxy.Host.Domain; + +namespace ZB.MOM.WW.LmxProxy.Host.MxAccess +{ + public sealed partial class MxAccessClient + { + /// + /// Callback invoked by the SubscriptionManager when it needs to deliver + /// data change events. Set by the SubscriptionManager during initialization. + /// + public Action? OnTagValueChanged { get; set; } + + /// + /// COM event handler for MxAccess OnDataChange events. + /// Called on the STA thread when a subscribed tag value changes. + /// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface. + /// + private void OnDataChange( + int hLMXServerHandle, + int phItemHandle, + object pvItemValue, + int pwItemQuality, + object pftItemTimeStamp, + ref MXSTATUS_PROXY[] ItemStatus) + { + try + { + var quality = MapQuality(pwItemQuality); + var timestamp = ConvertTimestamp(pftItemTimeStamp); + var vtq = new Vtq(pvItemValue, timestamp, quality); + + // We don't have the address from the COM callback — the reference code + // looks it up from _subscriptionsByHandle. For the v2 design, the + // SubscriptionManager's global handler receives (address, vtq) via + // OnTagValueChanged. The actual address resolution will be implemented + // when the full subscription tracking is wired up on windev. + + // Route to the SubscriptionManager's global handler + OnTagValueChanged?.Invoke(phItemHandle.ToString(), vtq); + } + catch (Exception ex) + { + Log.Error(ex, "Error processing OnDataChange event for handle {Handle}", phItemHandle); + } + } + + /// + /// COM event handler for MxAccess OnWriteComplete events. + /// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface. + /// + private void OnWriteComplete( + int hLMXServerHandle, + int phItemHandle, + ref MXSTATUS_PROXY[] ItemStatus) + { + // Write completion is currently fire-and-forget. + // Log for diagnostics. + try + { + Log.Debug("WriteCompleted: handle {Handle}", phItemHandle); + } + catch (Exception ex) + { + Log.Error(ex, "Error processing OnWriteComplete event for handle {Handle}", phItemHandle); + } + } + + /// + /// Converts a timestamp object to DateTime in UTC. + /// + private static DateTime ConvertTimestamp(object timestamp) + { + if (timestamp is DateTime dt) + { + return dt.Kind == DateTimeKind.Utc ? dt : dt.ToUniversalTime(); + } + + return DateTime.UtcNow; + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs new file mode 100644 index 0000000..ce92eb0 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.ReadWrite.cs @@ -0,0 +1,183 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Serilog; +using ZB.MOM.WW.LmxProxy.Host.Domain; + +namespace ZB.MOM.WW.LmxProxy.Host.MxAccess +{ + public sealed partial class MxAccessClient + { + /// + /// Reads a single tag value from MxAccess. + /// Dispatched to STA thread with semaphore concurrency control. + /// + public async Task ReadAsync(string address, CancellationToken ct = default) + { + if (!IsConnected) + return Vtq.New(null, Quality.Bad_NotConnected); + + await _readSemaphore.WaitAsync(ct); + try + { + return await _staThread.DispatchAsync(() => ReadInternal(address)); + } + catch (Exception ex) + { + Log.Error(ex, "ReadAsync failed for tag {Address}", address); + return Vtq.New(null, Quality.Bad_CommFailure); + } + finally + { + _readSemaphore.Release(); + } + } + + /// + /// Reads multiple tags with semaphore-controlled concurrency (max 10 concurrent). + /// Each tag is read independently. Partial failures return Bad quality for failed tags. + /// + public async Task> ReadBatchAsync( + IEnumerable addresses, CancellationToken ct = default) + { + var addressList = addresses.ToList(); + var results = new Dictionary(addressList.Count, StringComparer.OrdinalIgnoreCase); + + var tasks = addressList.Select(async address => + { + var vtq = await ReadAsync(address, ct); + return (address, vtq); + }); + + foreach (var task in await Task.WhenAll(tasks)) + { + results[task.address] = task.vtq; + } + + return results; + } + + /// + /// Writes a single tag value to MxAccess. + /// Value should be a native .NET type (not string). Uses TypedValueConverter + /// on the gRPC layer; here the value is the boxed .NET object. + /// + public async Task WriteAsync(string address, object value, CancellationToken ct = default) + { + if (!IsConnected) + throw new InvalidOperationException("Not connected to MxAccess"); + + await _writeSemaphore.WaitAsync(ct); + try + { + await _staThread.DispatchAsync(() => WriteInternal(address, value)); + } + finally + { + _writeSemaphore.Release(); + } + } + + /// + /// Writes multiple tag values with semaphore-controlled concurrency. + /// + public async Task WriteBatchAsync( + IReadOnlyDictionary values, CancellationToken ct = default) + { + var tasks = values.Select(async kvp => + { + await WriteAsync(kvp.Key, kvp.Value, ct); + }); + + await Task.WhenAll(tasks); + } + + /// + /// Writes a batch, then polls flagTag until it equals flagValue or timeout expires. + /// Uses type-aware comparison via TypedValueComparer. + /// + public async Task<(bool flagReached, int elapsedMs)> WriteBatchAndWaitAsync( + IReadOnlyDictionary values, + string flagTag, + object flagValue, + int timeoutMs, + int pollIntervalMs, + CancellationToken ct = default) + { + // Write all values first + await WriteBatchAsync(values, ct); + + // Poll flag tag + var sw = System.Diagnostics.Stopwatch.StartNew(); + var effectiveTimeout = timeoutMs > 0 ? timeoutMs : 5000; + var effectiveInterval = pollIntervalMs > 0 ? pollIntervalMs : 100; + + while (sw.ElapsedMilliseconds < effectiveTimeout) + { + ct.ThrowIfCancellationRequested(); + + var vtq = await ReadAsync(flagTag, ct); + if (vtq.Quality.IsGood() && TypedValueComparer.Equals(vtq.Value, flagValue)) + { + return (true, (int)sw.ElapsedMilliseconds); + } + + await Task.Delay(effectiveInterval, ct); + } + + return (false, (int)sw.ElapsedMilliseconds); + } + + // ── Internal COM calls (execute on STA thread) ────────── + + /// + /// Reads a single tag from MxAccess COM API. + /// Must be called on the STA thread. + /// + private Vtq ReadInternal(string address) + { + // The exact MxAccess COM API call depends on the ArchestrA.MXAccess interop assembly. + // Consult src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact + // method calls. MxAccess uses a subscribe-read-unsubscribe pattern for reads. + // + // For now, this throws NotImplementedException. The actual COM call will be + // implemented when testing on the windev machine with MxAccess available. + + throw new NotImplementedException( + "ReadInternal must be implemented using ArchestrA.MXAccess COM API. " + + "See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern."); + } + + /// + /// Writes a single tag via MxAccess COM API. + /// Must be called on the STA thread. + /// + private void WriteInternal(string address, object value) + { + // The exact COM call pattern uses AddItem, AdviseSupervisory, Write. + // Consult src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact method signature. + + throw new NotImplementedException( + "WriteInternal must be implemented using ArchestrA.MXAccess COM API. " + + "See src-reference/Implementation/MxAccessClient.ReadWrite.cs for the exact pattern."); + } + + /// + /// Maps an MxAccess OPC DA quality integer to the domain Quality enum. + /// The quality integer from MxAccess is the OPC DA quality byte. + /// + private static Quality MapQuality(int opcDaQuality) + { + // OPC DA quality is a byte value that directly maps to our Quality enum + if (Enum.IsDefined(typeof(Quality), (byte)opcDaQuality)) + return (Quality)(byte)opcDaQuality; + + // Fallback: use category bits + if (opcDaQuality >= 192) return Quality.Good; + if (opcDaQuality >= 64) return Quality.Uncertain; + return Quality.Bad; + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs new file mode 100644 index 0000000..5865139 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs @@ -0,0 +1,158 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Serilog; +using ZB.MOM.WW.LmxProxy.Host.Domain; + +namespace ZB.MOM.WW.LmxProxy.Host.MxAccess +{ + public sealed partial class MxAccessClient + { + /// + /// Subscribes to value changes for the specified addresses. + /// Stores subscription state for reconnect replay. + /// + public async Task SubscribeAsync( + IEnumerable addresses, + Action callback, + CancellationToken ct = default) + { + if (!IsConnected) + throw new InvalidOperationException("Not connected to MxAccess"); + + var addressList = addresses.ToList(); + + await _staThread.DispatchAsync(() => + { + foreach (var address in addressList) + { + SubscribeInternal(address); + + // Store for reconnect replay + lock (_lock) + { + _storedSubscriptions[address] = callback; + } + } + }); + + Log.Information("Subscribed to {Count} tags", addressList.Count); + + return new SubscriptionHandle(this, addressList, callback); + } + + /// + /// Unsubscribes specific addresses. + /// + internal async Task UnsubscribeAsync(IEnumerable addresses) + { + var addressList = addresses.ToList(); + + await _staThread.DispatchAsync(() => + { + foreach (var address in addressList) + { + UnsubscribeInternal(address); + + lock (_lock) + { + _storedSubscriptions.Remove(address); + } + } + }); + + Log.Information("Unsubscribed from {Count} tags", addressList.Count); + } + + /// + /// Recreates all stored subscriptions after a reconnect. + /// Does not re-store them (they're already stored). + /// + private async Task RecreateStoredSubscriptionsAsync() + { + Dictionary> subscriptions; + lock (_lock) + { + if (_storedSubscriptions.Count == 0) return; + subscriptions = new Dictionary>(_storedSubscriptions); + } + + Log.Information("Recreating {Count} stored subscriptions after reconnect", subscriptions.Count); + + await _staThread.DispatchAsync(() => + { + foreach (var kvp in subscriptions) + { + try + { + SubscribeInternal(kvp.Key); + } + catch (Exception ex) + { + Log.Warning(ex, "Failed to recreate subscription for {Address}", kvp.Key); + } + } + }); + } + + // ── Internal COM calls (execute on STA thread) ────────── + + /// + /// Registers a tag subscription with MxAccess COM API (AddItem + AdviseSupervisory). + /// Must be called on the STA thread. + /// + private void SubscribeInternal(string address) + { + // The exact MxAccess COM API call is: + // var itemHandle = _lmxProxy.AddItem(_connectionHandle, address); + // _lmxProxy.AdviseSupervisory(_connectionHandle, itemHandle); + // + // Consult src-reference/Implementation/MxAccessClient.Subscription.cs + + throw new NotImplementedException( + "SubscribeInternal must be implemented using ArchestrA.MXAccess COM API. " + + "See src-reference/Implementation/MxAccessClient.Subscription.cs for the exact pattern."); + } + + /// + /// Unregisters a tag subscription from MxAccess COM API (UnAdvise + RemoveItem). + /// Must be called on the STA thread. + /// + private void UnsubscribeInternal(string address) + { + // The exact MxAccess COM API call is: + // _lmxProxy.UnAdvise(_connectionHandle, itemHandle); + // _lmxProxy.RemoveItem(_connectionHandle, itemHandle); + + throw new NotImplementedException( + "UnsubscribeInternal must be implemented using ArchestrA.MXAccess COM API."); + } + + /// + /// Disposable subscription handle that unsubscribes on disposal. + /// + private sealed class SubscriptionHandle : IAsyncDisposable + { + private readonly MxAccessClient _client; + private readonly List _addresses; + private readonly Action _callback; + private bool _disposed; + + public SubscriptionHandle(MxAccessClient client, List addresses, Action callback) + { + _client = client; + _addresses = addresses; + _callback = callback; + } + + public async ValueTask DisposeAsync() + { + if (_disposed) return; + _disposed = true; + await _client.UnsubscribeAsync(_addresses); + } + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs new file mode 100644 index 0000000..7168895 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs @@ -0,0 +1,130 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using ArchestrA.MxAccess; +using Serilog; +using ZB.MOM.WW.LmxProxy.Host.Domain; + +namespace ZB.MOM.WW.LmxProxy.Host.MxAccess +{ + /// + /// Wraps the ArchestrA MXAccess COM API. All COM operations + /// execute on a dedicated STA thread via . + /// + public sealed partial class MxAccessClient : IScadaClient + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly StaDispatchThread _staThread; + private readonly object _lock = new object(); + private readonly int _maxConcurrentOperations; + private readonly int _readTimeoutMs; + private readonly int _writeTimeoutMs; + private readonly int _monitorIntervalMs; + private readonly bool _autoReconnect; + private readonly string? _nodeName; + private readonly string? _galaxyName; + + private readonly SemaphoreSlim _readSemaphore; + private readonly SemaphoreSlim _writeSemaphore; + + // COM objects — only accessed on STA thread + private LMXProxyServer? _lmxProxy; + private int _connectionHandle; + + // State + private ConnectionState _connectionState = ConnectionState.Disconnected; + private DateTime _connectedSince; + private bool _disposed; + + // Reconnect + private CancellationTokenSource? _reconnectCts; + + // Stored subscriptions for reconnect replay + private readonly Dictionary> _storedSubscriptions + = new Dictionary>(StringComparer.OrdinalIgnoreCase); + + public MxAccessClient( + int maxConcurrentOperations = 10, + int readTimeoutSeconds = 5, + int writeTimeoutSeconds = 5, + int monitorIntervalSeconds = 5, + bool autoReconnect = true, + string? nodeName = null, + string? galaxyName = null) + { + _maxConcurrentOperations = maxConcurrentOperations; + _readTimeoutMs = readTimeoutSeconds * 1000; + _writeTimeoutMs = writeTimeoutSeconds * 1000; + _monitorIntervalMs = monitorIntervalSeconds * 1000; + _autoReconnect = autoReconnect; + _nodeName = nodeName; + _galaxyName = galaxyName; + + _readSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); + _writeSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); + _staThread = new StaDispatchThread(); + } + + public bool IsConnected + { + get + { + lock (_lock) + { + return _lmxProxy != null + && _connectionState == ConnectionState.Connected + && _connectionHandle > 0; + } + } + } + + public ConnectionState ConnectionState + { + get { lock (_lock) { return _connectionState; } } + } + + public event EventHandler? ConnectionStateChanged; + + private void SetState(ConnectionState newState, string? message = null) + { + ConnectionState previousState; + lock (_lock) + { + previousState = _connectionState; + _connectionState = newState; + } + + if (previousState != newState) + { + Log.Information("Connection state changed: {Previous} -> {Current} {Message}", + previousState, newState, message ?? ""); + ConnectionStateChanged?.Invoke(this, + new ConnectionStateChangedEventArgs(previousState, newState, message)); + } + } + + public async ValueTask DisposeAsync() + { + if (_disposed) return; + _disposed = true; + + _reconnectCts?.Cancel(); + + try + { + await DisconnectAsync(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error during disposal disconnect"); + } + + _readSemaphore.Dispose(); + _writeSemaphore.Dispose(); + _staThread.Dispose(); + _reconnectCts?.Dispose(); + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs new file mode 100644 index 0000000..916162c --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/StaDispatchThread.cs @@ -0,0 +1,123 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using System.Windows.Forms; +using Serilog; + +namespace ZB.MOM.WW.LmxProxy.Host.MxAccess +{ + /// + /// Dedicated STA thread with a message pump for COM interop. + /// All COM operations are dispatched to this thread via a BlockingCollection. + /// + public sealed class StaDispatchThread : IDisposable + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly BlockingCollection _workQueue = new BlockingCollection(); + private readonly Thread _staThread; + private volatile bool _disposed; + + public StaDispatchThread(string threadName = "MxAccess-STA") + { + _staThread = new Thread(StaThreadLoop) + { + Name = threadName, + IsBackground = true + }; + _staThread.SetApartmentState(ApartmentState.STA); + _staThread.Start(); + Log.Information("STA dispatch thread '{ThreadName}' started", threadName); + } + + /// + /// Dispatches an action to the STA thread and returns a Task that completes + /// when the action finishes. + /// + public Task DispatchAsync(Action action) + { + if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread)); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _workQueue.Add(() => + { + try + { + action(); + tcs.TrySetResult(true); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }); + return tcs.Task; + } + + /// + /// Dispatches a function to the STA thread and returns its result. + /// + public Task DispatchAsync(Func func) + { + if (_disposed) throw new ObjectDisposedException(nameof(StaDispatchThread)); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _workQueue.Add(() => + { + try + { + var result = func(); + tcs.TrySetResult(result); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }); + return tcs.Task; + } + + private void StaThreadLoop() + { + Log.Debug("STA thread loop started"); + + // Process the work queue. GetConsumingEnumerable blocks until + // items are available or the collection is marked complete. + foreach (var action in _workQueue.GetConsumingEnumerable()) + { + try + { + action(); + } + catch (Exception ex) + { + // Should not happen — actions set TCS exceptions internally. + Log.Error(ex, "Unhandled exception on STA thread"); + } + + // Pump COM messages between work items + Application.DoEvents(); + } + + Log.Debug("STA thread loop exited"); + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + _workQueue.CompleteAdding(); + + // Wait for the STA thread to drain and exit + if (_staThread.IsAlive && !_staThread.Join(TimeSpan.FromSeconds(10))) + { + Log.Warning("STA thread did not exit within 10 seconds"); + } + + _workQueue.Dispose(); + Log.Information("STA dispatch thread disposed"); + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs new file mode 100644 index 0000000..c0ef555 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Sessions/SessionManager.cs @@ -0,0 +1,154 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Serilog; + +namespace ZB.MOM.WW.LmxProxy.Host.Sessions +{ + /// + /// Tracks active client sessions in memory. + /// Thread-safe via ConcurrentDictionary. + /// + public sealed class SessionManager : IDisposable + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly ConcurrentDictionary _sessions + = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + private readonly Timer? _scavengingTimer; + private readonly TimeSpan _inactivityTimeout; + + /// + /// Creates a SessionManager with optional inactivity scavenging. + /// + /// + /// Sessions inactive for this many minutes are automatically terminated. + /// Set to 0 to disable scavenging. + /// + public SessionManager(int inactivityTimeoutMinutes = 5) + { + _inactivityTimeout = TimeSpan.FromMinutes(inactivityTimeoutMinutes); + + if (inactivityTimeoutMinutes > 0) + { + // Check every 60 seconds + _scavengingTimer = new Timer(ScavengeInactiveSessions, null, + TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(60)); + } + } + + /// Gets the count of active sessions. + public int ActiveSessionCount => _sessions.Count; + + /// + /// Creates a new session. + /// Returns the 32-character hex GUID session ID. + /// + public string CreateSession(string clientId, string apiKey) + { + var sessionId = Guid.NewGuid().ToString("N"); // 32-char lowercase hex, no hyphens + var sessionInfo = new SessionInfo(sessionId, clientId, apiKey); + _sessions[sessionId] = sessionInfo; + + Log.Information("Session created: {SessionId} for client {ClientId}", sessionId, clientId); + return sessionId; + } + + /// + /// Validates a session ID. Updates LastActivity on success. + /// Returns true if the session exists. + /// + public bool ValidateSession(string sessionId) + { + if (_sessions.TryGetValue(sessionId, out var session)) + { + session.TouchLastActivity(); + return true; + } + return false; + } + + /// + /// Terminates a session. Returns true if the session existed. + /// + public bool TerminateSession(string sessionId) + { + if (_sessions.TryRemove(sessionId, out _)) + { + Log.Information("Session terminated: {SessionId}", sessionId); + return true; + } + return false; + } + + /// Gets session info by ID, or null if not found. + public SessionInfo? GetSession(string sessionId) + { + _sessions.TryGetValue(sessionId, out var session); + return session; + } + + /// Gets a snapshot of all active sessions. + public IReadOnlyList GetAllSessions() + { + return _sessions.Values.ToList().AsReadOnly(); + } + + /// + /// Scavenges sessions that have been inactive for longer than the timeout. + /// + private void ScavengeInactiveSessions(object? state) + { + if (_inactivityTimeout <= TimeSpan.Zero) return; + + var cutoff = DateTime.UtcNow - _inactivityTimeout; + var expired = _sessions.Where(kvp => kvp.Value.LastActivity < cutoff).ToList(); + + foreach (var kvp in expired) + { + if (_sessions.TryRemove(kvp.Key, out _)) + { + Log.Information("Session {SessionId} scavenged (inactive since {LastActivity})", + kvp.Key, kvp.Value.LastActivity); + } + } + } + + public void Dispose() + { + _scavengingTimer?.Dispose(); + _sessions.Clear(); + } + } + + /// + /// Information about an active client session. + /// + public class SessionInfo + { + public SessionInfo(string sessionId, string clientId, string apiKey) + { + SessionId = sessionId; + ClientId = clientId; + ApiKey = apiKey; + ConnectedAt = DateTime.UtcNow; + LastActivity = DateTime.UtcNow; + } + + public string SessionId { get; } + public string ClientId { get; } + public string ApiKey { get; } + public DateTime ConnectedAt { get; } + public DateTime LastActivity { get; private set; } + public long ConnectedSinceUtcTicks => ConnectedAt.Ticks; + + /// Updates the last activity timestamp to now. + public void TouchLastActivity() + { + LastActivity = DateTime.UtcNow; + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs new file mode 100644 index 0000000..ffacd92 --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs @@ -0,0 +1,244 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Channels; +using Serilog; +using ZB.MOM.WW.LmxProxy.Host.Domain; + +namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions +{ + /// + /// Manages per-client subscription channels with shared MxAccess subscriptions. + /// Ref-counted tag subscriptions: first client creates, last client disposes. + /// + public sealed class SubscriptionManager : IDisposable + { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly IScadaClient _scadaClient; + private readonly int _channelCapacity; + private readonly BoundedChannelFullMode _channelFullMode; + + // Client ID -> ClientSubscription + private readonly ConcurrentDictionary _clientSubscriptions + = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + // Tag address -> TagSubscription (shared, ref-counted) + private readonly ConcurrentDictionary _tagSubscriptions + = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); + + public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000, + BoundedChannelFullMode channelFullMode = BoundedChannelFullMode.DropOldest) + { + _scadaClient = scadaClient; + _channelCapacity = channelCapacity; + _channelFullMode = channelFullMode; + } + + /// + /// Creates a subscription for a client. Returns a ChannelReader to stream from. + /// + public ChannelReader<(string address, Vtq vtq)> Subscribe( + string clientId, IEnumerable addresses, CancellationToken ct) + { + var channel = Channel.CreateBounded<(string address, Vtq vtq)>( + new BoundedChannelOptions(_channelCapacity) + { + FullMode = _channelFullMode, + SingleReader = true, + SingleWriter = false + }); + + var addressSet = new HashSet(addresses, StringComparer.OrdinalIgnoreCase); + var clientSub = new ClientSubscription(clientId, channel, addressSet); + + _clientSubscriptions[clientId] = clientSub; + + _rwLock.EnterWriteLock(); + try + { + foreach (var address in addressSet) + { + if (_tagSubscriptions.TryGetValue(address, out var tagSub)) + { + tagSub.ClientIds.Add(clientId); + } + else + { + _tagSubscriptions[address] = new TagSubscription(address, + new HashSet(StringComparer.OrdinalIgnoreCase) { clientId }); + } + } + } + finally + { + _rwLock.ExitWriteLock(); + } + + // Register cancellation cleanup + ct.Register(() => UnsubscribeClient(clientId)); + + Log.Information("Client {ClientId} subscribed to {Count} tags", clientId, addressSet.Count); + return channel.Reader; + } + + /// + /// Called from MxAccessClient's OnDataChange handler. + /// Fans out the update to all subscribed clients. + /// + public void OnTagValueChanged(string address, Vtq vtq) + { + _rwLock.EnterReadLock(); + HashSet? clientIds = null; + try + { + if (_tagSubscriptions.TryGetValue(address, out var tagSub)) + { + clientIds = new HashSet(tagSub.ClientIds); + } + } + finally + { + _rwLock.ExitReadLock(); + } + + if (clientIds == null || clientIds.Count == 0) return; + + foreach (var clientId in clientIds) + { + if (_clientSubscriptions.TryGetValue(clientId, out var clientSub)) + { + if (!clientSub.Channel.Writer.TryWrite((address, vtq))) + { + clientSub.IncrementDropped(); + Log.Debug("Dropped message for client {ClientId} on tag {Address} (channel full)", + clientId, address); + } + else + { + clientSub.IncrementDelivered(); + } + } + } + } + + /// + /// Removes a client's subscriptions and cleans up tag subscriptions + /// when the last client unsubscribes. + /// + public void UnsubscribeClient(string clientId) + { + if (!_clientSubscriptions.TryRemove(clientId, out var clientSub)) + return; + + _rwLock.EnterWriteLock(); + try + { + foreach (var address in clientSub.Addresses) + { + if (_tagSubscriptions.TryGetValue(address, out var tagSub)) + { + tagSub.ClientIds.Remove(clientId); + + // Last client unsubscribed — remove the tag subscription + if (tagSub.ClientIds.Count == 0) + { + _tagSubscriptions.TryRemove(address, out _); + } + } + } + } + finally + { + _rwLock.ExitWriteLock(); + } + + // Complete the channel (signals end of stream to the gRPC handler) + clientSub.Channel.Writer.TryComplete(); + + Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)", + clientId, clientSub.DeliveredCount, clientSub.DroppedCount); + } + + /// + /// Sends a bad-quality notification to all subscribed clients for all their tags. + /// Called when MxAccess disconnects. + /// + public void NotifyDisconnection() + { + var badVtq = Vtq.New(null, Quality.Bad_NotConnected); + + foreach (var kvp in _clientSubscriptions) + { + foreach (var address in kvp.Value.Addresses) + { + kvp.Value.Channel.Writer.TryWrite((address, badVtq)); + } + } + } + + /// Returns subscription statistics. + public SubscriptionStats GetStats() + { + return new SubscriptionStats( + _clientSubscriptions.Count, + _tagSubscriptions.Count, + _clientSubscriptions.Values.Sum(c => c.Addresses.Count)); + } + + public void Dispose() + { + foreach (var kvp in _clientSubscriptions) + { + kvp.Value.Channel.Writer.TryComplete(); + } + _clientSubscriptions.Clear(); + _tagSubscriptions.Clear(); + _rwLock.Dispose(); + } + + // ── Nested types ───────────────────────────────────────── + + private class ClientSubscription + { + public ClientSubscription(string clientId, + Channel<(string address, Vtq vtq)> channel, + HashSet addresses) + { + ClientId = clientId; + Channel = channel; + Addresses = addresses; + } + + public string ClientId { get; } + public Channel<(string address, Vtq vtq)> Channel { get; } + public HashSet Addresses { get; } + + // Use backing fields for Interlocked + private long _delivered; + private long _dropped; + + public long DeliveredCount => Interlocked.Read(ref _delivered); + public long DroppedCount => Interlocked.Read(ref _dropped); + + public void IncrementDelivered() => Interlocked.Increment(ref _delivered); + public void IncrementDropped() => Interlocked.Increment(ref _dropped); + } + + private class TagSubscription + { + public TagSubscription(string address, HashSet clientIds) + { + Address = address; + ClientIds = clientIds; + } + + public string Address { get; } + public HashSet ClientIds { get; } + } + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj index bbf30a3..d20063b 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/ZB.MOM.WW.LmxProxy.Host.csproj @@ -46,6 +46,7 @@ ..\..\lib\ArchestrA.MXAccess.dll true + diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs new file mode 100644 index 0000000..7f60949 --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/StaDispatchThreadTests.cs @@ -0,0 +1,86 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Xunit; +using ZB.MOM.WW.LmxProxy.Host.MxAccess; + +namespace ZB.MOM.WW.LmxProxy.Host.Tests.MxAccess +{ + public class StaDispatchThreadTests + { + [Fact] + public async Task DispatchAsync_ExecutesOnStaThread() + { + using var sta = new StaDispatchThread("Test-STA"); + var threadId = await sta.DispatchAsync(() => Thread.CurrentThread.ManagedThreadId); + threadId.Should().NotBe(Thread.CurrentThread.ManagedThreadId); + } + + [Fact] + public async Task DispatchAsync_ReturnsResult() + { + using var sta = new StaDispatchThread("Test-STA"); + var result = await sta.DispatchAsync(() => 42); + result.Should().Be(42); + } + + [Fact] + public async Task DispatchAsync_PropagatesException() + { + using var sta = new StaDispatchThread("Test-STA"); + Func act = () => sta.DispatchAsync(() => throw new InvalidOperationException("test error")); + await act.Should().ThrowAsync().WithMessage("test error"); + } + + [Fact] + public async Task DispatchAsync_Action_Completes() + { + using var sta = new StaDispatchThread("Test-STA"); + int value = 0; + await sta.DispatchAsync(() => { value = 99; }); + value.Should().Be(99); + } + + [Fact] + public void Dispose_CompletesGracefully() + { + var sta = new StaDispatchThread("Test-STA"); + sta.Dispose(); // Should not throw + } + + [Fact] + public void DispatchAfterDispose_ThrowsObjectDisposedException() + { + var sta = new StaDispatchThread("Test-STA"); + sta.Dispose(); + Func act = () => sta.DispatchAsync(() => 42); + act.Should().ThrowAsync(); + } + + [Fact] + public async Task MultipleDispatches_ExecuteInOrder() + { + using var sta = new StaDispatchThread("Test-STA"); + var results = new System.Collections.Concurrent.ConcurrentBag(); + + var tasks = new Task[10]; + for (int i = 0; i < 10; i++) + { + int idx = i; + tasks[i] = sta.DispatchAsync(() => { results.Add(idx); }); + } + + await Task.WhenAll(tasks); + results.Count.Should().Be(10); + } + + [Fact] + public async Task StaThread_HasStaApartmentState() + { + using var sta = new StaDispatchThread("Test-STA"); + var apartmentState = await sta.DispatchAsync(() => Thread.CurrentThread.GetApartmentState()); + apartmentState.Should().Be(ApartmentState.STA); + } + } +} diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/TypedValueEqualsTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/TypedValueEqualsTests.cs new file mode 100644 index 0000000..980ab27 --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/MxAccess/TypedValueEqualsTests.cs @@ -0,0 +1,78 @@ +using FluentAssertions; +using Xunit; +using ZB.MOM.WW.LmxProxy.Host.Domain; + +namespace ZB.MOM.WW.LmxProxy.Host.Tests.MxAccess +{ + public class TypedValueEqualsTests + { + [Fact] + public void NullEqualsNull() => TypedValueComparer.Equals(null, null).Should().BeTrue(); + + [Fact] + public void NullNotEqualsValue() => TypedValueComparer.Equals(null, 42).Should().BeFalse(); + + [Fact] + public void ValueNotEqualsNull() => TypedValueComparer.Equals(42, null).Should().BeFalse(); + + [Fact] + public void SameTypeAndValue() => TypedValueComparer.Equals(42.5, 42.5).Should().BeTrue(); + + [Fact] + public void SameTypeDifferentValue() => TypedValueComparer.Equals(42.5, 43.0).Should().BeFalse(); + + [Fact] + public void DifferentTypes_NeverEqual() => TypedValueComparer.Equals(1, 1.0).Should().BeFalse(); + + [Fact] + public void BoolTrue() => TypedValueComparer.Equals(true, true).Should().BeTrue(); + + [Fact] + public void BoolFalse() => TypedValueComparer.Equals(false, true).Should().BeFalse(); + + [Fact] + public void String_CaseSensitive() + { + TypedValueComparer.Equals("DONE", "DONE").Should().BeTrue(); + TypedValueComparer.Equals("done", "DONE").Should().BeFalse(); + } + + [Fact] + public void Array_SameElements() + { + TypedValueComparer.Equals(new[] { 1, 2, 3 }, new[] { 1, 2, 3 }).Should().BeTrue(); + } + + [Fact] + public void Array_DifferentElements() + { + TypedValueComparer.Equals(new[] { 1, 2, 3 }, new[] { 1, 2, 4 }).Should().BeFalse(); + } + + [Fact] + public void Array_DifferentLengths() + { + TypedValueComparer.Equals(new[] { 1, 2 }, new[] { 1, 2, 3 }).Should().BeFalse(); + } + + [Fact] + public void Int32_NotEqual_ToDouble() + { + TypedValueComparer.Equals(1, 1.0).Should().BeFalse(); + } + + [Fact] + public void Long_Equality() + { + TypedValueComparer.Equals(long.MaxValue, long.MaxValue).Should().BeTrue(); + } + + [Fact] + public void DateTime_TickPrecision() + { + var dt1 = new System.DateTime(638789000000000000, System.DateTimeKind.Utc); + var dt2 = new System.DateTime(638789000000000000, System.DateTimeKind.Utc); + TypedValueComparer.Equals(dt1, dt2).Should().BeTrue(); + } + } +} diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Sessions/SessionManagerTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Sessions/SessionManagerTests.cs new file mode 100644 index 0000000..9000ef4 --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Sessions/SessionManagerTests.cs @@ -0,0 +1,148 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Xunit; +using ZB.MOM.WW.LmxProxy.Host.Sessions; + +namespace ZB.MOM.WW.LmxProxy.Host.Tests.Sessions +{ + public class SessionManagerTests + { + [Fact] + public void CreateSession_Returns32CharHexId() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + var id = sm.CreateSession("client1", "key1"); + id.Should().HaveLength(32); + id.Should().MatchRegex("^[0-9a-f]{32}$"); + } + + [Fact] + public void CreateSession_IncrementsCount() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + sm.ActiveSessionCount.Should().Be(0); + sm.CreateSession("c1", "k1"); + sm.ActiveSessionCount.Should().Be(1); + sm.CreateSession("c2", "k2"); + sm.ActiveSessionCount.Should().Be(2); + } + + [Fact] + public void ValidateSession_ReturnsTrueForExistingSession() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + var id = sm.CreateSession("c1", "k1"); + sm.ValidateSession(id).Should().BeTrue(); + } + + [Fact] + public void ValidateSession_ReturnsFalseForUnknownSession() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + sm.ValidateSession("nonexistent").Should().BeFalse(); + } + + [Fact] + public void ValidateSession_UpdatesLastActivity() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + var id = sm.CreateSession("c1", "k1"); + var session = sm.GetSession(id); + var initialActivity = session!.LastActivity; + + Thread.Sleep(50); // Small delay to ensure time passes + sm.ValidateSession(id); + + session.LastActivity.Should().BeAfter(initialActivity); + } + + [Fact] + public void TerminateSession_RemovesSession() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + var id = sm.CreateSession("c1", "k1"); + sm.TerminateSession(id).Should().BeTrue(); + sm.ActiveSessionCount.Should().Be(0); + sm.ValidateSession(id).Should().BeFalse(); + } + + [Fact] + public void TerminateSession_ReturnsFalseForUnknownSession() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + sm.TerminateSession("nonexistent").Should().BeFalse(); + } + + [Fact] + public void GetSession_ReturnsNullForUnknown() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + sm.GetSession("nonexistent").Should().BeNull(); + } + + [Fact] + public void GetSession_ReturnsCorrectInfo() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + var id = sm.CreateSession("client-abc", "key-xyz"); + var session = sm.GetSession(id); + session.Should().NotBeNull(); + session!.ClientId.Should().Be("client-abc"); + session.ApiKey.Should().Be("key-xyz"); + session.SessionId.Should().Be(id); + session.ConnectedAt.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(2)); + } + + [Fact] + public void GetAllSessions_ReturnsSnapshot() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + sm.CreateSession("c1", "k1"); + sm.CreateSession("c2", "k2"); + var all = sm.GetAllSessions(); + all.Should().HaveCount(2); + } + + [Fact] + public async Task ConcurrentAccess_IsThreadSafe() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + var tasks = new Task[100]; + for (int i = 0; i < 100; i++) + { + int idx = i; + tasks[i] = Task.Run(() => + { + var id = sm.CreateSession($"client-{idx}", $"key-{idx}"); + sm.ValidateSession(id); + if (idx % 3 == 0) sm.TerminateSession(id); + }); + } + await Task.WhenAll(tasks); + + // Should have ~67 sessions remaining (100 - ~33 terminated) + sm.ActiveSessionCount.Should().BeInRange(60, 70); + } + + [Fact] + public void Dispose_ClearsAllSessions() + { + var sm = new SessionManager(inactivityTimeoutMinutes: 0); + sm.CreateSession("c1", "k1"); + sm.CreateSession("c2", "k2"); + sm.Dispose(); + sm.ActiveSessionCount.Should().Be(0); + } + + [Fact] + public void ConnectedSinceUtcTicks_ReturnsCorrectValue() + { + using var sm = new SessionManager(inactivityTimeoutMinutes: 0); + var id = sm.CreateSession("c1", "k1"); + var session = sm.GetSession(id); + session!.ConnectedSinceUtcTicks.Should().Be(session.ConnectedAt.Ticks); + } + } +} diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs new file mode 100644 index 0000000..58cde93 --- /dev/null +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs @@ -0,0 +1,193 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using FluentAssertions; +using Xunit; +using ZB.MOM.WW.LmxProxy.Host.Domain; +using ZB.MOM.WW.LmxProxy.Host.Subscriptions; + +namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions +{ + public class SubscriptionManagerTests + { + /// Fake IScadaClient for testing (no COM dependency). + private class FakeScadaClient : IScadaClient + { + public bool IsConnected => true; + public ConnectionState ConnectionState => ConnectionState.Connected; + public event EventHandler? ConnectionStateChanged; + public Task ConnectAsync(CancellationToken ct = default) => Task.CompletedTask; + public Task DisconnectAsync(CancellationToken ct = default) => Task.CompletedTask; + public Task ReadAsync(string address, CancellationToken ct = default) => + Task.FromResult(Vtq.Good(42.0)); + public Task> ReadBatchAsync(IEnumerable addresses, CancellationToken ct = default) => + Task.FromResult>(new Dictionary()); + public Task WriteAsync(string address, object value, CancellationToken ct = default) => Task.CompletedTask; + public Task WriteBatchAsync(IReadOnlyDictionary values, CancellationToken ct = default) => Task.CompletedTask; + public Task<(bool flagReached, int elapsedMs)> WriteBatchAndWaitAsync( + IReadOnlyDictionary values, string flagTag, object flagValue, + int timeoutMs, int pollIntervalMs, CancellationToken ct = default) => + Task.FromResult((false, 0)); + public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) => + Task.FromResult(new FakeSubscriptionHandle()); + public ValueTask DisposeAsync() => default; + + // Suppress unused event warning + internal void FireEvent() => ConnectionStateChanged?.Invoke(this, null!); + + private class FakeSubscriptionHandle : IAsyncDisposable + { + public ValueTask DisposeAsync() => default; + } + } + + [Fact] + public void Subscribe_ReturnsChannelReader() + { + using var sm = new SubscriptionManager(new FakeScadaClient()); + using var cts = new CancellationTokenSource(); + var reader = sm.Subscribe("client1", new[] { "Tag1", "Tag2" }, cts.Token); + reader.Should().NotBeNull(); + } + + [Fact] + public async Task OnTagValueChanged_FansOutToSubscribedClients() + { + using var sm = new SubscriptionManager(new FakeScadaClient()); + using var cts = new CancellationTokenSource(); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + + var vtq = Vtq.Good(42.0); + sm.OnTagValueChanged("Motor.Speed", vtq); + + var result = await reader.ReadAsync(cts.Token); + result.address.Should().Be("Motor.Speed"); + result.vtq.Value.Should().Be(42.0); + result.vtq.Quality.Should().Be(Quality.Good); + } + + [Fact] + public async Task OnTagValueChanged_MultipleClients_BothReceive() + { + using var sm = new SubscriptionManager(new FakeScadaClient()); + using var cts = new CancellationTokenSource(); + var reader1 = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader2 = sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); + + sm.OnTagValueChanged("Motor.Speed", Vtq.Good(99.0)); + + var r1 = await reader1.ReadAsync(cts.Token); + var r2 = await reader2.ReadAsync(cts.Token); + r1.vtq.Value.Should().Be(99.0); + r2.vtq.Value.Should().Be(99.0); + } + + [Fact] + public void OnTagValueChanged_NonSubscribedTag_NoDelivery() + { + using var sm = new SubscriptionManager(new FakeScadaClient()); + using var cts = new CancellationTokenSource(); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + + sm.OnTagValueChanged("Motor.Torque", Vtq.Good(10.0)); + + // Channel should be empty + reader.TryRead(out _).Should().BeFalse(); + } + + [Fact] + public void UnsubscribeClient_CompletesChannel() + { + using var sm = new SubscriptionManager(new FakeScadaClient()); + using var cts = new CancellationTokenSource(); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + + sm.UnsubscribeClient("client1"); + + // Channel should be completed + reader.Completion.IsCompleted.Should().BeTrue(); + } + + [Fact] + public void UnsubscribeClient_RemovesFromTagSubscriptions() + { + using var sm = new SubscriptionManager(new FakeScadaClient()); + using var cts = new CancellationTokenSource(); + sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + + sm.UnsubscribeClient("client1"); + + var stats = sm.GetStats(); + stats.TotalClients.Should().Be(0); + stats.TotalTags.Should().Be(0); + } + + [Fact] + public void RefCounting_LastClientUnsubscribeRemovesTag() + { + using var sm = new SubscriptionManager(new FakeScadaClient()); + using var cts = new CancellationTokenSource(); + sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); + + sm.GetStats().TotalTags.Should().Be(1); + + sm.UnsubscribeClient("client1"); + sm.GetStats().TotalTags.Should().Be(1); // client2 still subscribed + + sm.UnsubscribeClient("client2"); + sm.GetStats().TotalTags.Should().Be(0); // last client gone + } + + [Fact] + public void NotifyDisconnection_SendsBadQualityToAll() + { + using var sm = new SubscriptionManager(new FakeScadaClient()); + using var cts = new CancellationTokenSource(); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token); + + sm.NotifyDisconnection(); + + // Should receive 2 bad quality messages + reader.TryRead(out var r1).Should().BeTrue(); + r1.vtq.Quality.Should().Be(Quality.Bad_NotConnected); + reader.TryRead(out var r2).Should().BeTrue(); + r2.vtq.Quality.Should().Be(Quality.Bad_NotConnected); + } + + [Fact] + public void Backpressure_DropOldest_DropsWhenFull() + { + using var sm = new SubscriptionManager(new FakeScadaClient(), channelCapacity: 3); + using var cts = new CancellationTokenSource(); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + + // Fill the channel beyond capacity + for (int i = 0; i < 10; i++) + { + sm.OnTagValueChanged("Motor.Speed", Vtq.Good((double)i)); + } + + // Should have exactly 3 messages (capacity limit) + int count = 0; + while (reader.TryRead(out _)) count++; + count.Should().Be(3); + } + + [Fact] + public void GetStats_ReturnsCorrectCounts() + { + using var sm = new SubscriptionManager(new FakeScadaClient()); + using var cts = new CancellationTokenSource(); + sm.Subscribe("c1", new[] { "Tag1", "Tag2" }, cts.Token); + sm.Subscribe("c2", new[] { "Tag2", "Tag3" }, cts.Token); + + var stats = sm.GetStats(); + stats.TotalClients.Should().Be(2); + stats.TotalTags.Should().Be(3); // Tag1, Tag2, Tag3 + stats.ActiveSubscriptions.Should().Be(4); // c1:Tag1, c1:Tag2, c2:Tag2, c2:Tag3 + } + } +}