diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs new file mode 100644 index 00000000..79d9b8e2 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Adapters/MxGatewayDataConnection.cs @@ -0,0 +1,220 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol; +using ZB.MOM.WW.ScadaBridge.Commons.Serialization; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; + +namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters; + +/// +/// MxGateway adapter implementing + . +/// Maps IDataConnection concepts onto the MxAccess Gateway session model via the +/// seam: +/// +/// Connect → OpenSession + Register, then a background event loop. +/// Subscribe → AddItem + Advise; value changes arrive on the event stream. +/// Read/Write → ReadBulk / WriteBulk. +/// Browse → Galaxy repository BrowseChildren. +/// +/// Reconnection is driven by the DataConnectionActor: a stream fault raises +/// , the actor disposes this adapter, creates a fresh one, +/// reconnects and re-subscribes all tags. +/// +public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection +{ + private readonly IMxGatewayClientFactory _clientFactory; + private readonly ILogger _logger; + private IMxGatewayClient? _client; + private ConnectionHealth _status = ConnectionHealth.Disconnected; + private CancellationTokenSource? _eventLoopCts; + + // subscriptionId → (tagPath, callback) so the event loop can route updates by tag, + // plus tagPath → subscriptionId for reverse lookup. Concurrent because the event + // loop reads from a background thread while Subscribe/Unsubscribe mutate. + private readonly ConcurrentDictionary _subs = new(); + private readonly ConcurrentDictionary _tagToSub = new(); + + // DataConnectionLayer mirror of OpcUaDataConnection's once-only guard: an int toggled + // with Interlocked.Exchange so only the first caller raises Disconnected. + // 0 = not fired, 1 = fired. Reset on (re)connect. + private int _disconnectFired; + + /// Initializes a new instance of . + /// Factory used to create gateway client instances. + /// Logger instance. + public MxGatewayDataConnection(IMxGatewayClientFactory clientFactory, ILogger logger) + { + _clientFactory = clientFactory; + _logger = logger; + } + + /// + public ConnectionHealth Status => _status; + + /// Raised once when the gateway event stream faults (connection lost). + public event Action? Disconnected; + + /// + public async Task ConnectAsync(IDictionary connectionDetails, CancellationToken cancellationToken = default) + { + var cfg = MxGatewayEndpointConfigSerializer.FromFlatDict(connectionDetails); + Interlocked.Exchange(ref _disconnectFired, 0); // reset guard on (re)connect, like OPC UA + _client = _clientFactory.Create(); + + await _client.ConnectAsync(new MxGatewayConnectionOptions( + cfg.Endpoint, + cfg.ApiKey, + string.IsNullOrWhiteSpace(cfg.ClientName) ? "scadabridge" : cfg.ClientName, + cfg.WriteUserId, + cfg.UseTls, + string.IsNullOrWhiteSpace(cfg.CaFile) ? null : cfg.CaFile, + string.IsNullOrWhiteSpace(cfg.ServerName) ? null : cfg.ServerName, + cfg.ReadTimeoutMs), cancellationToken); + + _status = ConnectionHealth.Connected; + + // Background event loop: route each value change to the matching subscription callback. + _eventLoopCts = new CancellationTokenSource(); + _ = Task.Run(() => RunEventLoopAsync(_eventLoopCts.Token)); + } + + private async Task RunEventLoopAsync(CancellationToken ct) + { + try + { + await _client!.RunEventLoopAsync(update => + { + if (_tagToSub.TryGetValue(update.TagPath, out var subId) && _subs.TryGetValue(subId, out var s)) + s.Callback(update.TagPath, new TagValue(update.Value, update.Quality, update.Timestamp)); + }, ct); + } + catch (OperationCanceledException) + { + // Normal shutdown (DisconnectAsync / DisposeAsync cancelled the loop). + } + catch (Exception ex) + { + _logger.LogWarning(ex, "MxGateway event stream faulted; signalling disconnect"); + RaiseDisconnected(); + } + } + + private void RaiseDisconnected() + { + if (Interlocked.Exchange(ref _disconnectFired, 1) == 0) + { + _status = ConnectionHealth.Disconnected; + Disconnected?.Invoke(); + } + } + + /// + public async Task DisconnectAsync(CancellationToken cancellationToken = default) + { + _eventLoopCts?.Cancel(); + if (_client is not null) + await _client.DisconnectAsync(cancellationToken); + _status = ConnectionHealth.Disconnected; + } + + /// + public async Task SubscribeAsync(string tagPath, SubscriptionCallback callback, CancellationToken cancellationToken = default) + { + var subId = await _client!.SubscribeAsync(tagPath, cancellationToken); + _subs[subId] = (tagPath, callback); + _tagToSub[tagPath] = subId; + return subId; + } + + /// + public async Task UnsubscribeAsync(string subscriptionId, CancellationToken cancellationToken = default) + { + if (_subs.TryRemove(subscriptionId, out var s)) + _tagToSub.TryRemove(s.TagPath, out _); + await _client!.UnsubscribeAsync(subscriptionId, cancellationToken); + } + + /// + public async Task ReadAsync(string tagPath, CancellationToken cancellationToken = default) + { + var r = (await _client!.ReadAsync(new[] { tagPath }, cancellationToken)).Single(); + return ToReadResult(r); + } + + /// + public async Task> ReadBatchAsync(IEnumerable tagPaths, CancellationToken cancellationToken = default) + { + var list = tagPaths.ToList(); + var results = await _client!.ReadAsync(list, cancellationToken); + return results.ToDictionary(r => r.TagPath, ToReadResult); + } + + private static ReadResult ToReadResult(MxReadOutcome r) => r.Success + ? new ReadResult(true, new TagValue(r.Value, r.Quality, r.Timestamp), null) + : new ReadResult(false, null, r.Error); + + /// + public async Task WriteAsync(string tagPath, object? value, CancellationToken cancellationToken = default) + { + var w = (await _client!.WriteAsync(new[] { (tagPath, value) }, cancellationToken)).Single(); + return new WriteResult(w.Success, w.Error); + } + + /// + public async Task> WriteBatchAsync(IDictionary values, CancellationToken cancellationToken = default) + { + var results = await _client!.WriteAsync(values.Select(kv => (kv.Key, kv.Value)).ToList(), cancellationToken); + return results.ToDictionary(w => w.TagPath, w => new WriteResult(w.Success, w.Error)); + } + + /// + public async Task WriteBatchAndWaitAsync( + IDictionary values, string flagPath, object? flagValue, + string responsePath, object? responseValue, TimeSpan timeout, CancellationToken cancellationToken = default) + { + await WriteBatchAsync(values, cancellationToken); + await WriteAsync(flagPath, flagValue, cancellationToken); + + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeoutCts.CancelAfter(timeout); + try + { + while (!timeoutCts.IsCancellationRequested) + { + var r = await ReadAsync(responsePath, timeoutCts.Token); + // r.Value is a TagValue wrapper; compare its underlying scalar. String + // projection tolerates numeric type differences across the gRPC boundary. + if (r.Success && string.Equals(r.Value?.Value?.ToString(), responseValue?.ToString(), StringComparison.Ordinal)) + return true; + await Task.Delay(TimeSpan.FromMilliseconds(200), timeoutCts.Token); + } + } + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) + { + // Timeout elapsed (the linked CTS, not the caller's token) — fall through to false. + } + return false; + } + + /// + public async Task BrowseChildrenAsync(string? parentNodeId, CancellationToken cancellationToken = default) + { + if (_status != ConnectionHealth.Connected || _client is null) + throw new ConnectionNotConnectedException($"MxGateway connection is not connected (status: {_status})."); + + var (children, truncated) = await _client.BrowseChildrenAsync(parentNodeId, cancellationToken); + var nodes = children + .Select(c => new BrowseNode(c.NodeId, c.DisplayName, c.NodeClass, c.HasChildren)) + .ToList(); + return new BrowseChildrenResult(nodes, truncated); + } + + /// + public async ValueTask DisposeAsync() + { + _eventLoopCts?.Cancel(); + if (_client is not null) + await _client.DisposeAsync(); + GC.SuppressFinalize(this); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs new file mode 100644 index 00000000..1118dedc --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/FakeMxGatewayClient.cs @@ -0,0 +1,65 @@ +using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters; + +namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests.Adapters; + +/// +/// In-memory fake for adapter unit tests. Lets tests +/// drive the event loop (push updates / fault the stream) and stub read/write/browse. +/// +public sealed class FakeMxGatewayClient : IMxGatewayClient, IMxGatewayClientFactory +{ + public MxGatewayConnectionOptions? ConnectedWith; + public readonly List Subscribed = new(); + public readonly List Unsubscribed = new(); + public readonly TaskCompletionSource EventLoopGate = new(TaskCreationOptions.RunContinuationsAsynchronously); + public Action? OnUpdate; + public Func, IReadOnlyList>? ReadHandler; + public Func, IReadOnlyList>? WriteHandler; + public Func, bool)>? BrowseHandler; + private int _nextHandle; + + public IMxGatewayClient Create() => this; + + public Task ConnectAsync(MxGatewayConnectionOptions o, CancellationToken ct = default) + { + ConnectedWith = o; + return Task.CompletedTask; + } + + public Task DisconnectAsync(CancellationToken ct = default) => Task.CompletedTask; + + public Task SubscribeAsync(string tag, CancellationToken ct = default) + { + var id = (++_nextHandle).ToString(); + Subscribed.Add(tag); + return Task.FromResult(id); + } + + public Task UnsubscribeAsync(string id, CancellationToken ct = default) + { + Unsubscribed.Add(id); + return Task.CompletedTask; + } + + public Task> ReadAsync(IReadOnlyList tags, CancellationToken ct = default) + => Task.FromResult(ReadHandler!(tags)); + + public Task> WriteAsync(IReadOnlyList<(string TagPath, object? Value)> w, CancellationToken ct = default) + => Task.FromResult(WriteHandler!(w)); + + public Task<(IReadOnlyList Children, bool Truncated)> BrowseChildrenAsync(string? p, CancellationToken ct = default) + => Task.FromResult(BrowseHandler!(p)); + + public async Task RunEventLoopAsync(Action onUpdate, CancellationToken ct = default) + { + OnUpdate = onUpdate; + using var reg = ct.Register(() => EventLoopGate.TrySetResult()); + await EventLoopGate.Task; // test completes this to end the loop… + ct.ThrowIfCancellationRequested(); // …or FaultEventLoop() faults it to simulate a stream break + } + + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + + /// Simulate a stream break so the adapter raises Disconnected. + public void FaultEventLoop() => EventLoopGate.TrySetException(new Exception("stream broke")); +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/MxGatewayDataConnectionTests.cs b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/MxGatewayDataConnectionTests.cs new file mode 100644 index 00000000..a6548105 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/Adapters/MxGatewayDataConnectionTests.cs @@ -0,0 +1,259 @@ +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; +using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters; + +namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests.Adapters; + +public class MxGatewayDataConnectionTests +{ + private static MxGatewayDataConnection NewAdapter(FakeMxGatewayClient fake) => + new(fake, NullLogger.Instance); + + private static Dictionary Details(int writeUserId = 0) => new() + { + ["Endpoint"] = "http://gw:5000", + ["ApiKey"] = "key", + ["ClientName"] = "client-a", + ["WriteUserId"] = writeUserId.ToString(), + ["ReadTimeoutMs"] = "2000", + }; + + // ── Task 6: connect / status / Disconnected ── + + [Fact] + public async Task ConnectAsync_resolves_options_and_sets_status_connected() + { + var fake = new FakeMxGatewayClient(); + var adapter = NewAdapter(fake); + + await adapter.ConnectAsync(Details(writeUserId: 7)); + + Assert.Equal(ConnectionHealth.Connected, adapter.Status); + Assert.NotNull(fake.ConnectedWith); + Assert.Equal("http://gw:5000", fake.ConnectedWith!.Endpoint); + Assert.Equal("client-a", fake.ConnectedWith.ClientName); + Assert.Equal(7, fake.ConnectedWith.WriteUserId); + } + + [Fact] + public async Task ConnectAsync_blank_client_name_defaults_to_scadabridge() + { + var fake = new FakeMxGatewayClient(); + var adapter = NewAdapter(fake); + var details = Details(); + details["ClientName"] = ""; + + await adapter.ConnectAsync(details); + + Assert.Equal("scadabridge", fake.ConnectedWith!.ClientName); + } + + [Fact] + public async Task Disconnected_fires_exactly_once_when_event_loop_faults() + { + var fake = new FakeMxGatewayClient(); + var adapter = NewAdapter(fake); + int raised = 0; + adapter.Disconnected += () => Interlocked.Increment(ref raised); + + await adapter.ConnectAsync(Details()); + // Wait for the event loop to attach. + await WaitUntil(() => fake.OnUpdate is not null); + + fake.FaultEventLoop(); + await WaitUntil(() => raised >= 1); + + Assert.Equal(1, raised); + Assert.Equal(ConnectionHealth.Disconnected, adapter.Status); + } + + // ── Task 7: subscribe / unsubscribe + event routing ── + + [Fact] + public async Task Subscribed_tag_update_invokes_callback_with_mapped_TagValue() + { + var fake = new FakeMxGatewayClient(); + var adapter = NewAdapter(fake); + await adapter.ConnectAsync(Details()); + await WaitUntil(() => fake.OnUpdate is not null); + + TagValue? received = null; + await adapter.SubscribeAsync("Area.Pump.Speed", (_, v) => received = v); + Assert.Contains("Area.Pump.Speed", fake.Subscribed); + + var ts = DateTimeOffset.UtcNow; + fake.OnUpdate!(new MxValueUpdate("Area.Pump.Speed", 42.0, QualityCode.Good, ts)); + + Assert.NotNull(received); + Assert.Equal(42.0, received!.Value); + Assert.Equal(QualityCode.Good, received.Quality); + Assert.Equal(ts, received.Timestamp); + } + + [Fact] + public async Task Unsubscribe_stops_routing_updates() + { + var fake = new FakeMxGatewayClient(); + var adapter = NewAdapter(fake); + await adapter.ConnectAsync(Details()); + await WaitUntil(() => fake.OnUpdate is not null); + + int hits = 0; + var subId = await adapter.SubscribeAsync("T", (_, _) => hits++); + await adapter.UnsubscribeAsync(subId); + + fake.OnUpdate!(new MxValueUpdate("T", 1, QualityCode.Good, DateTimeOffset.UtcNow)); + + Assert.Equal(0, hits); + Assert.Contains(subId, fake.Unsubscribed); + } + + // ── Task 8: read / write + error classification ── + + [Fact] + public async Task ReadAsync_maps_success_and_failure() + { + var fake = new FakeMxGatewayClient + { + ReadHandler = tags => tags.Select(t => t == "ok" + ? new MxReadOutcome(t, true, 5, QualityCode.Good, DateTimeOffset.UtcNow, null) + : new MxReadOutcome(t, false, null, QualityCode.Bad, default, "bad tag")).ToList() + }; + var adapter = NewAdapter(fake); + await adapter.ConnectAsync(Details()); + + var ok = await adapter.ReadAsync("ok"); + Assert.True(ok.Success); + Assert.Equal(5, ok.Value!.Value); + + var bad = await adapter.ReadAsync("nope"); + Assert.False(bad.Success); + Assert.Equal("bad tag", bad.ErrorMessage); + } + + [Fact] + public async Task ReadBatchAsync_returns_dictionary_keyed_by_tag() + { + var fake = new FakeMxGatewayClient + { + ReadHandler = tags => tags.Select(t => + new MxReadOutcome(t, true, t.Length, QualityCode.Good, DateTimeOffset.UtcNow, null)).ToList() + }; + var adapter = NewAdapter(fake); + await adapter.ConnectAsync(Details()); + + var results = await adapter.ReadBatchAsync(new[] { "aa", "bbb" }); + Assert.Equal(2, results["aa"].Value!.Value); + Assert.Equal(3, results["bbb"].Value!.Value); + } + + [Fact] + public async Task WriteAsync_maps_failure_to_unsuccessful_WriteResult() + { + var fake = new FakeMxGatewayClient + { + WriteHandler = writes => writes.Select(w => + new MxWriteOutcome(w.TagPath, false, "rejected")).ToList() + }; + var adapter = NewAdapter(fake); + await adapter.ConnectAsync(Details()); + + var r = await adapter.WriteAsync("T", 1); + Assert.False(r.Success); + Assert.Equal("rejected", r.ErrorMessage); + } + + // ── Task 9: WriteBatchAndWait ── + + [Fact] + public async Task WriteBatchAndWait_returns_true_when_response_matches() + { + var writeCalls = new List(); + var fake = new FakeMxGatewayClient + { + WriteHandler = writes => + { + writeCalls.AddRange(writes.Select(w => w.TagPath)); + return writes.Select(w => new MxWriteOutcome(w.TagPath, true, null)).ToList(); + }, + // Response path already reads the expected value. + ReadHandler = tags => tags.Select(t => + new MxReadOutcome(t, true, "DONE", QualityCode.Good, DateTimeOffset.UtcNow, null)).ToList() + }; + var adapter = NewAdapter(fake); + await adapter.ConnectAsync(Details()); + + var ok = await adapter.WriteBatchAndWaitAsync( + new Dictionary { ["V"] = 1 }, "Flag", 1, "Resp", "DONE", + TimeSpan.FromSeconds(5)); + + Assert.True(ok); + // Values written before the flag, and the flag itself. + Assert.Contains("V", writeCalls); + Assert.Contains("Flag", writeCalls); + } + + [Fact] + public async Task WriteBatchAndWait_returns_false_on_timeout() + { + var fake = new FakeMxGatewayClient + { + WriteHandler = writes => writes.Select(w => new MxWriteOutcome(w.TagPath, true, null)).ToList(), + ReadHandler = tags => tags.Select(t => + new MxReadOutcome(t, true, "NEVER", QualityCode.Good, DateTimeOffset.UtcNow, null)).ToList() + }; + var adapter = NewAdapter(fake); + await adapter.ConnectAsync(Details()); + + var ok = await adapter.WriteBatchAndWaitAsync( + new Dictionary { ["V"] = 1 }, "Flag", 1, "Resp", "DONE", + TimeSpan.FromMilliseconds(300)); + + Assert.False(ok); + } + + // ── Task 10: browse ── + + [Fact] + public async Task BrowseChildrenAsync_maps_children_and_truncated() + { + var fake = new FakeMxGatewayClient + { + BrowseHandler = _ => (new List + { + new("Area1", "Area1", BrowseNodeClass.Object, true), + new("Area1.Pump.Speed", "Speed", BrowseNodeClass.Variable, false), + }, true) + }; + var adapter = NewAdapter(fake); + await adapter.ConnectAsync(Details()); + + var result = await adapter.BrowseChildrenAsync(null); + + Assert.True(result.Truncated); + Assert.Equal(2, result.Children.Count); + Assert.Equal(BrowseNodeClass.Object, result.Children[0].NodeClass); + Assert.True(result.Children[0].HasChildren); + Assert.Equal("Area1.Pump.Speed", result.Children[1].NodeId); + Assert.Equal(BrowseNodeClass.Variable, result.Children[1].NodeClass); + } + + [Fact] + public async Task BrowseChildrenAsync_throws_when_not_connected() + { + var fake = new FakeMxGatewayClient(); + var adapter = NewAdapter(fake); + // Not connected. + await Assert.ThrowsAsync( + () => adapter.BrowseChildrenAsync(null)); + } + + private static async Task WaitUntil(Func condition, int timeoutMs = 2000) + { + var sw = System.Diagnostics.Stopwatch.StartNew(); + while (!condition() && sw.ElapsedMilliseconds < timeoutMs) + await Task.Delay(10); + Assert.True(condition(), "Condition not met within timeout."); + } +}