using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
using ZB.MOM.WW.ScadaBridge.Commons.Serialization;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;

namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;

// NOTE(review): the generic type arguments in this file were reconstructed from how each member
// is used (the reviewed copy had lost its angle-bracket content). Confirm them against
// IDataConnection, IBrowsableDataConnection and IAlarmSubscribableConnection before merging.

/// <summary>
/// MxGateway adapter implementing <see cref="IDataConnection"/> + <see cref="IBrowsableDataConnection"/>.
/// Maps IDataConnection concepts onto the MxAccess Gateway session model via the
/// <see cref="IMxGatewayClient"/> seam:
/// <list type="bullet">
/// <item><description>Connect → OpenSession + Register, then a background event loop.</description></item>
/// <item><description>Subscribe → AddItem + Advise; value changes arrive on the event stream.</description></item>
/// <item><description>Read/Write → ReadBulk / WriteBulk.</description></item>
/// <item><description>Browse → Galaxy repository BrowseChildren.</description></item>
/// </list>
/// Reconnection is driven by the DataConnectionActor: a stream fault raises
/// <see cref="Disconnected"/>, the actor disposes this adapter, creates a fresh one,
/// reconnects and re-subscribes all tags.
/// </summary>
public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection, IAlarmSubscribableConnection
{
    /// <summary>Delay between response-tag polls in <see cref="WriteBatchAndWaitAsync"/>.</summary>
    private static readonly TimeSpan ResponsePollInterval = TimeSpan.FromMilliseconds(200);

    private readonly IMxGatewayClientFactory _clientFactory;
    private readonly ILogger _logger;
    private IMxGatewayClient? _client;
    private ConnectionHealth _status = ConnectionHealth.Disconnected;
    private CancellationTokenSource? _eventLoopCts;

    // Native alarm feed: the gateway StreamAlarms RPC is session-less and
    // gateway-wide, so one shared feed serves the whole connection. The
    // DataConnectionActor routes transitions to instances by source reference,
    // so a single shared callback (the first registered) suffices; subscriptions
    // are ref-counted so the feed stops when the last one is removed.
    private CancellationTokenSource? _alarmCts;
    private int _alarmSubCount;
    private readonly object _alarmLock = new();

    // subscriptionId → (tagPath, callback) so the event loop can route updates by tag,
    // plus tagPath → subscriptionId for reverse lookup. Concurrent because the event
    // loop reads from a background thread while Subscribe/Unsubscribe mutate.
    private readonly ConcurrentDictionary<string, (string TagPath, SubscriptionCallback Callback)> _subs = new();
    private readonly ConcurrentDictionary<string, string> _tagToSub = new();

    // DataConnectionLayer mirror of OpcUaDataConnection's once-only guard: an int toggled
    // with Interlocked.Exchange so only the first caller raises Disconnected.
    // 0 = not fired, 1 = fired. Reset on (re)connect.
    private int _disconnectFired;

    // 0 = live, 1 = DisposeAsync has run. Makes disposal idempotent.
    private int _disposed;

    /// <summary>Initializes a new instance of <see cref="MxGatewayDataConnection"/>.</summary>
    /// <param name="clientFactory">Factory used to create gateway client instances.</param>
    /// <param name="logger">Logger instance.</param>
    public MxGatewayDataConnection(IMxGatewayClientFactory clientFactory, ILogger logger)
    {
        _clientFactory = clientFactory;
        _logger = logger;
    }

    /// <inheritdoc />
    public ConnectionHealth Status => _status;

    /// <summary>Raised once when the gateway event stream faults (connection lost).</summary>
    public event Action? Disconnected;

    /// <inheritdoc />
    public async Task ConnectAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken = default)
    {
        var cfg = MxGatewayEndpointConfigSerializer.FromFlatDict(connectionDetails);

        // A reconnect on the same instance must not leave the previous event loop running
        // (it would keep routing into the same subscription tables) or leak its CTS.
        StopEventLoop();
        Interlocked.Exchange(ref _disconnectFired, 0); // reset guard on (re)connect, like OPC UA

        var client = _clientFactory.Create();
        _client = client;
        await client.ConnectAsync(
            new MxGatewayConnectionOptions(
                cfg.Endpoint,
                cfg.ApiKey,
                string.IsNullOrWhiteSpace(cfg.ClientName) ? "scadabridge" : cfg.ClientName,
                cfg.WriteUserId,
                cfg.UseTls,
                string.IsNullOrWhiteSpace(cfg.CaFile) ? null : cfg.CaFile,
                string.IsNullOrWhiteSpace(cfg.ServerName) ? null : cfg.ServerName,
                cfg.ReadTimeoutMs),
            cancellationToken);
        _status = ConnectionHealth.Connected;

        // Background event loop: route each value change to the matching subscription callback.
        // The client and token are captured in locals so the loop never re-reads fields that a
        // concurrent disconnect/reconnect may have replaced or disposed.
        var loopCts = new CancellationTokenSource();
        _eventLoopCts = loopCts;
        var loopToken = loopCts.Token;
        _ = Task.Run(() => RunEventLoopAsync(client, loopToken));
    }

    /// <summary>
    /// Pumps the gateway event stream, dispatching each update to the callback registered for its tag.
    /// Cancellation ends the loop silently; any other exception is logged and raises
    /// <see cref="Disconnected"/>.
    /// </summary>
    /// <param name="client">Client whose event stream is pumped.</param>
    /// <param name="ct">Token cancelled by disconnect, reconnect or disposal.</param>
    private async Task RunEventLoopAsync(IMxGatewayClient client, CancellationToken ct)
    {
        try
        {
            await client.RunEventLoopAsync(update =>
            {
                if (_tagToSub.TryGetValue(update.TagPath, out var subId) && _subs.TryGetValue(subId, out var s))
                {
                    s.Callback(update.TagPath, new TagValue(update.Value, update.Quality, update.Timestamp));
                }
            }, ct);
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown (DisconnectAsync / DisposeAsync cancelled the loop).
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "MxGateway event stream faulted; signalling disconnect");
            RaiseDisconnected();
        }
    }

    /// <summary>Marks the connection as disconnected and raises <see cref="Disconnected"/> at most once per connect.</summary>
    private void RaiseDisconnected()
    {
        if (Interlocked.Exchange(ref _disconnectFired, 1) == 0)
        {
            _status = ConnectionHealth.Disconnected;
            Disconnected?.Invoke();
        }
    }

    /// <summary>Cancels and disposes the event-loop token source, if one is active.</summary>
    private void StopEventLoop()
    {
        var cts = Interlocked.Exchange(ref _eventLoopCts, null);
        if (cts is null)
        {
            return;
        }

        cts.Cancel();
        cts.Dispose();
    }

    /// <summary>
    /// Cancels and disposes the shared alarm feed and clears the subscription count.
    /// Takes <see cref="_alarmLock"/> itself.
    /// </summary>
    private void StopAlarmFeed()
    {
        lock (_alarmLock)
        {
            _alarmCts?.Cancel();
            _alarmCts?.Dispose();
            _alarmCts = null;
            _alarmSubCount = 0;
        }
    }

    /// <inheritdoc />
    public async Task DisconnectAsync(CancellationToken cancellationToken = default)
    {
        StopEventLoop();
        StopAlarmFeed();

        if (_client is not null)
        {
            await _client.DisconnectAsync(cancellationToken);
        }

        _status = ConnectionHealth.Disconnected;
    }

    /// <inheritdoc />
    public async Task<string> SubscribeAsync(string tagPath, SubscriptionCallback callback, CancellationToken cancellationToken = default)
    {
        var subId = await _client!.SubscribeAsync(tagPath, cancellationToken);
        _subs[subId] = (tagPath, callback);
        _tagToSub[tagPath] = subId;
        return subId;
    }

    /// <inheritdoc />
    public async Task UnsubscribeAsync(string subscriptionId, CancellationToken cancellationToken = default)
    {
        if (_subs.TryRemove(subscriptionId, out var removed))
        {
            // Drop the reverse mapping only if it still points at this subscription. If the tag
            // was re-subscribed under a newer id, an unconditional remove would orphan that
            // live subscription and its updates would stop being routed.
            _tagToSub.TryRemove(new KeyValuePair<string, string>(removed.TagPath, subscriptionId));
        }

        await _client!.UnsubscribeAsync(subscriptionId, cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>
    /// The first subscription starts one gateway-wide alarm stream using that subscriber's
    /// callback; later subscriptions only increment the reference count. Returns a faulted task
    /// when no client has been created yet, instead of starting a feed that can never run.
    /// </remarks>
    public Task<string> SubscribeAlarmsAsync(
        string sourceReference,
        string? conditionFilter,
        AlarmTransitionCallback callback,
        CancellationToken cancellationToken = default)
    {
        lock (_alarmLock)
        {
            var client = _client;
            if (client is null)
            {
                // Fail before touching the ref-count so a failed subscribe leaves no state behind.
                return Task.FromException<string>(new ConnectionNotConnectedException(
                    "MxGateway connection is not connected; cannot start the alarm feed."));
            }

            _alarmSubCount++;
            if (_alarmCts is null)
            {
                var feedCts = new CancellationTokenSource();
                _alarmCts = feedCts;
                var token = feedCts.Token;

                // Gateway-wide feed (null prefix); the actor filters per source reference.
                _ = Task.Run(() => RunAlarmFeedAsync(client, callback, token), token);
            }
        }

        return Task.FromResult(Guid.NewGuid().ToString());
    }

    /// <summary>
    /// Runs the gateway-wide alarm stream until cancelled, logging (rather than losing) any fault.
    /// </summary>
    /// <param name="client">Client whose alarm stream is consumed.</param>
    /// <param name="callback">Callback invoked for every alarm transition.</param>
    /// <param name="ct">Token cancelled when the last alarm subscription is removed or the connection closes.</param>
    private async Task RunAlarmFeedAsync(IMxGatewayClient client, AlarmTransitionCallback callback, CancellationToken ct)
    {
        try
        {
            await client.RunAlarmStreamAsync(null, t => callback(t), ct);
        }
        catch (Exception ex) when (ex is OperationCanceledException || ct.IsCancellationRequested)
        {
            // Normal shutdown: the feed was cancelled by unsubscribe / disconnect / dispose.
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "MxGateway alarm stream faulted");
        }
    }

    /// <inheritdoc />
    public Task UnsubscribeAlarmsAsync(string subscriptionId, CancellationToken cancellationToken = default)
    {
        lock (_alarmLock)
        {
            if (_alarmSubCount > 0)
            {
                _alarmSubCount--;
            }

            // Last subscriber gone: stop the shared feed.
            if (_alarmSubCount == 0)
            {
                _alarmCts?.Cancel();
                _alarmCts?.Dispose();
                _alarmCts = null;
            }
        }

        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public async Task<ReadResult> ReadAsync(string tagPath, CancellationToken cancellationToken = default)
    {
        var r = (await _client!.ReadAsync(new[] { tagPath }, cancellationToken)).Single();
        return ToReadResult(r);
    }

    /// <inheritdoc />
    public async Task<IReadOnlyDictionary<string, ReadResult>> ReadBatchAsync(IEnumerable<string> tagPaths, CancellationToken cancellationToken = default)
    {
        var list = tagPaths.ToList();
        var results = await _client!.ReadAsync(list, cancellationToken);
        return results.ToDictionary(r => r.TagPath, ToReadResult);
    }

    /// <summary>Converts a gateway read outcome into a <see cref="ReadResult"/>.</summary>
    private static ReadResult ToReadResult(MxReadOutcome r) =>
        r.Success
            ? new ReadResult(true, new TagValue(r.Value, r.Quality, r.Timestamp), null)
            : new ReadResult(false, null, r.Error);

    /// <inheritdoc />
    public async Task<WriteResult> WriteAsync(string tagPath, object? value, CancellationToken cancellationToken = default)
    {
        var w = (await _client!.WriteAsync(new[] { (tagPath, value) }, cancellationToken)).Single();
        return new WriteResult(w.Success, w.Error);
    }

    /// <inheritdoc />
    public async Task<IReadOnlyDictionary<string, WriteResult>> WriteBatchAsync(IDictionary<string, object?> values, CancellationToken cancellationToken = default)
    {
        var results = await _client!.WriteAsync(values.Select(kv => (kv.Key, kv.Value)).ToList(), cancellationToken);
        return results.ToDictionary(w => w.TagPath, w => new WriteResult(w.Success, w.Error));
    }

    /// <inheritdoc />
    /// <remarks>
    /// Writes <paramref name="values"/>, then the flag tag, then polls
    /// <paramref name="responsePath"/> until it matches <paramref name="responseValue"/> or
    /// <paramref name="timeout"/> elapses. Returns <c>false</c> on timeout; cancellation of the
    /// caller's token propagates as <see cref="OperationCanceledException"/>.
    /// </remarks>
    public async Task<bool> WriteBatchAndWaitAsync(
        IDictionary<string, object?> values,
        string flagPath,
        object? flagValue,
        string responsePath,
        object? responseValue,
        TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        await WriteBatchAsync(values, cancellationToken);
        await WriteAsync(flagPath, flagValue, cancellationToken);

        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeoutCts.CancelAfter(timeout);
        try
        {
            while (!timeoutCts.IsCancellationRequested)
            {
                var r = await ReadAsync(responsePath, timeoutCts.Token);

                // r.Value is a TagValue wrapper; compare its underlying scalar. String
                // projection tolerates numeric type differences across the gRPC boundary.
                if (r.Success && string.Equals(r.Value?.Value?.ToString(), responseValue?.ToString(), StringComparison.Ordinal))
                {
                    return true;
                }

                await Task.Delay(ResponsePollInterval, timeoutCts.Token);
            }
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // Timeout elapsed (the linked CTS, not the caller's token) — fall through to false.
        }

        return false;
    }

    /// <inheritdoc />
    public async Task<BrowseChildrenResult> BrowseChildrenAsync(string? parentNodeId, CancellationToken cancellationToken = default)
    {
        if (_status != ConnectionHealth.Connected || _client is null)
        {
            throw new ConnectionNotConnectedException($"MxGateway connection is not connected (status: {_status}).");
        }

        var (children, truncated) = await _client.BrowseChildrenAsync(parentNodeId, cancellationToken);
        var nodes = children
            .Select(c => new BrowseNode(c.NodeId, c.DisplayName, c.NodeClass, c.HasChildren))
            .ToList();
        return new BrowseChildrenResult(nodes, truncated);
    }

    /// <summary>
    /// Stops the event loop and the alarm feed, then disposes the gateway client.
    /// Safe to call more than once; only the first call does any work.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        StopEventLoop();

        // The alarm stream runs on its own token; without this it would keep running
        // against a disposed client.
        StopAlarmFeed();

        if (_client is not null)
        {
            await _client.DisposeAsync();
        }

        GC.SuppressFinalize(this);
    }
}