feat(dcl): MxGatewayDataConnection adapter (connect/subscribe/read/write/wait/browse)
Implements IDataConnection + IBrowsableDataConnection over the IMxGatewayClient seam: connect/disconnect with once-only Disconnected guard + background event loop, subscribe/unsubscribe with tag routing, read/write batch with per-tag error classification, WriteBatchAndWait, and Galaxy browse mapping. Covers plan Tasks 6-10. Full unit coverage via FakeMxGatewayClient (12 tests).
This commit is contained in:
@@ -0,0 +1,220 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Serialization;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;
|
||||
|
||||
/// <summary>
|
||||
/// MxGateway adapter implementing <see cref="IDataConnection"/> + <see cref="IBrowsableDataConnection"/>.
|
||||
/// Maps IDataConnection concepts onto the MxAccess Gateway session model via the
|
||||
/// <see cref="IMxGatewayClient"/> seam:
|
||||
/// <list type="bullet">
|
||||
/// <item>Connect → OpenSession + Register, then a background event loop.</item>
|
||||
/// <item>Subscribe → AddItem + Advise; value changes arrive on the event stream.</item>
|
||||
/// <item>Read/Write → ReadBulk / WriteBulk.</item>
|
||||
/// <item>Browse → Galaxy repository BrowseChildren.</item>
|
||||
/// </list>
|
||||
/// Reconnection is driven by the <c>DataConnectionActor</c>: a stream fault raises
|
||||
/// <see cref="Disconnected"/>, the actor disposes this adapter, creates a fresh one,
|
||||
/// reconnects and re-subscribes all tags.
|
||||
/// </summary>
|
||||
public class MxGatewayDataConnection : IDataConnection, IBrowsableDataConnection
|
||||
{
|
||||
private readonly IMxGatewayClientFactory _clientFactory;
|
||||
private readonly ILogger<MxGatewayDataConnection> _logger;
|
||||
private IMxGatewayClient? _client;
|
||||
private ConnectionHealth _status = ConnectionHealth.Disconnected;
|
||||
private CancellationTokenSource? _eventLoopCts;
|
||||
|
||||
// subscriptionId → (tagPath, callback) so the event loop can route updates by tag,
|
||||
// plus tagPath → subscriptionId for reverse lookup. Concurrent because the event
|
||||
// loop reads from a background thread while Subscribe/Unsubscribe mutate.
|
||||
private readonly ConcurrentDictionary<string, (string TagPath, SubscriptionCallback Callback)> _subs = new();
|
||||
private readonly ConcurrentDictionary<string, string> _tagToSub = new();
|
||||
|
||||
// DataConnectionLayer mirror of OpcUaDataConnection's once-only guard: an int toggled
|
||||
// with Interlocked.Exchange so only the first caller raises Disconnected.
|
||||
// 0 = not fired, 1 = fired. Reset on (re)connect.
|
||||
private int _disconnectFired;
|
||||
|
||||
/// <summary>Initializes a new instance of <see cref="MxGatewayDataConnection"/>.</summary>
|
||||
/// <param name="clientFactory">Factory used to create gateway client instances.</param>
|
||||
/// <param name="logger">Logger instance.</param>
|
||||
public MxGatewayDataConnection(IMxGatewayClientFactory clientFactory, ILogger<MxGatewayDataConnection> logger)
|
||||
{
|
||||
_clientFactory = clientFactory;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public ConnectionHealth Status => _status;
|
||||
|
||||
/// <summary>Raised once when the gateway event stream faults (connection lost).</summary>
|
||||
public event Action? Disconnected;
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task ConnectAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var cfg = MxGatewayEndpointConfigSerializer.FromFlatDict(connectionDetails);
|
||||
Interlocked.Exchange(ref _disconnectFired, 0); // reset guard on (re)connect, like OPC UA
|
||||
_client = _clientFactory.Create();
|
||||
|
||||
await _client.ConnectAsync(new MxGatewayConnectionOptions(
|
||||
cfg.Endpoint,
|
||||
cfg.ApiKey,
|
||||
string.IsNullOrWhiteSpace(cfg.ClientName) ? "scadabridge" : cfg.ClientName,
|
||||
cfg.WriteUserId,
|
||||
cfg.UseTls,
|
||||
string.IsNullOrWhiteSpace(cfg.CaFile) ? null : cfg.CaFile,
|
||||
string.IsNullOrWhiteSpace(cfg.ServerName) ? null : cfg.ServerName,
|
||||
cfg.ReadTimeoutMs), cancellationToken);
|
||||
|
||||
_status = ConnectionHealth.Connected;
|
||||
|
||||
// Background event loop: route each value change to the matching subscription callback.
|
||||
_eventLoopCts = new CancellationTokenSource();
|
||||
_ = Task.Run(() => RunEventLoopAsync(_eventLoopCts.Token));
|
||||
}
|
||||
|
||||
private async Task RunEventLoopAsync(CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _client!.RunEventLoopAsync(update =>
|
||||
{
|
||||
if (_tagToSub.TryGetValue(update.TagPath, out var subId) && _subs.TryGetValue(subId, out var s))
|
||||
s.Callback(update.TagPath, new TagValue(update.Value, update.Quality, update.Timestamp));
|
||||
}, ct);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Normal shutdown (DisconnectAsync / DisposeAsync cancelled the loop).
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "MxGateway event stream faulted; signalling disconnect");
|
||||
RaiseDisconnected();
|
||||
}
|
||||
}
|
||||
|
||||
private void RaiseDisconnected()
|
||||
{
|
||||
if (Interlocked.Exchange(ref _disconnectFired, 1) == 0)
|
||||
{
|
||||
_status = ConnectionHealth.Disconnected;
|
||||
Disconnected?.Invoke();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
_eventLoopCts?.Cancel();
|
||||
if (_client is not null)
|
||||
await _client.DisconnectAsync(cancellationToken);
|
||||
_status = ConnectionHealth.Disconnected;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<string> SubscribeAsync(string tagPath, SubscriptionCallback callback, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var subId = await _client!.SubscribeAsync(tagPath, cancellationToken);
|
||||
_subs[subId] = (tagPath, callback);
|
||||
_tagToSub[tagPath] = subId;
|
||||
return subId;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task UnsubscribeAsync(string subscriptionId, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_subs.TryRemove(subscriptionId, out var s))
|
||||
_tagToSub.TryRemove(s.TagPath, out _);
|
||||
await _client!.UnsubscribeAsync(subscriptionId, cancellationToken);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<ReadResult> ReadAsync(string tagPath, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var r = (await _client!.ReadAsync(new[] { tagPath }, cancellationToken)).Single();
|
||||
return ToReadResult(r);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<IReadOnlyDictionary<string, ReadResult>> ReadBatchAsync(IEnumerable<string> tagPaths, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var list = tagPaths.ToList();
|
||||
var results = await _client!.ReadAsync(list, cancellationToken);
|
||||
return results.ToDictionary(r => r.TagPath, ToReadResult);
|
||||
}
|
||||
|
||||
private static ReadResult ToReadResult(MxReadOutcome r) => r.Success
|
||||
? new ReadResult(true, new TagValue(r.Value, r.Quality, r.Timestamp), null)
|
||||
: new ReadResult(false, null, r.Error);
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<WriteResult> WriteAsync(string tagPath, object? value, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var w = (await _client!.WriteAsync(new[] { (tagPath, value) }, cancellationToken)).Single();
|
||||
return new WriteResult(w.Success, w.Error);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<IReadOnlyDictionary<string, WriteResult>> WriteBatchAsync(IDictionary<string, object?> values, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var results = await _client!.WriteAsync(values.Select(kv => (kv.Key, kv.Value)).ToList(), cancellationToken);
|
||||
return results.ToDictionary(w => w.TagPath, w => new WriteResult(w.Success, w.Error));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<bool> WriteBatchAndWaitAsync(
|
||||
IDictionary<string, object?> values, string flagPath, object? flagValue,
|
||||
string responsePath, object? responseValue, TimeSpan timeout, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await WriteBatchAsync(values, cancellationToken);
|
||||
await WriteAsync(flagPath, flagValue, cancellationToken);
|
||||
|
||||
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
timeoutCts.CancelAfter(timeout);
|
||||
try
|
||||
{
|
||||
while (!timeoutCts.IsCancellationRequested)
|
||||
{
|
||||
var r = await ReadAsync(responsePath, timeoutCts.Token);
|
||||
// r.Value is a TagValue wrapper; compare its underlying scalar. String
|
||||
// projection tolerates numeric type differences across the gRPC boundary.
|
||||
if (r.Success && string.Equals(r.Value?.Value?.ToString(), responseValue?.ToString(), StringComparison.Ordinal))
|
||||
return true;
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(200), timeoutCts.Token);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// Timeout elapsed (the linked CTS, not the caller's token) — fall through to false.
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<BrowseChildrenResult> BrowseChildrenAsync(string? parentNodeId, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_status != ConnectionHealth.Connected || _client is null)
|
||||
throw new ConnectionNotConnectedException($"MxGateway connection is not connected (status: {_status}).");
|
||||
|
||||
var (children, truncated) = await _client.BrowseChildrenAsync(parentNodeId, cancellationToken);
|
||||
var nodes = children
|
||||
.Select(c => new BrowseNode(c.NodeId, c.DisplayName, c.NodeClass, c.HasChildren))
|
||||
.ToList();
|
||||
return new BrowseChildrenResult(nodes, truncated);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
_eventLoopCts?.Cancel();
|
||||
if (_client is not null)
|
||||
await _client.DisposeAsync();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests.Adapters;
|
||||
|
||||
/// <summary>
|
||||
/// In-memory fake <see cref="IMxGatewayClient"/> for adapter unit tests. Lets tests
|
||||
/// drive the event loop (push updates / fault the stream) and stub read/write/browse.
|
||||
/// </summary>
|
||||
public sealed class FakeMxGatewayClient : IMxGatewayClient, IMxGatewayClientFactory
|
||||
{
|
||||
public MxGatewayConnectionOptions? ConnectedWith;
|
||||
public readonly List<string> Subscribed = new();
|
||||
public readonly List<string> Unsubscribed = new();
|
||||
public readonly TaskCompletionSource EventLoopGate = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
public Action<MxValueUpdate>? OnUpdate;
|
||||
public Func<IReadOnlyList<string>, IReadOnlyList<MxReadOutcome>>? ReadHandler;
|
||||
public Func<IReadOnlyList<(string TagPath, object? Value)>, IReadOnlyList<MxWriteOutcome>>? WriteHandler;
|
||||
public Func<string?, (IReadOnlyList<MxBrowseChild>, bool)>? BrowseHandler;
|
||||
private int _nextHandle;
|
||||
|
||||
public IMxGatewayClient Create() => this;
|
||||
|
||||
public Task ConnectAsync(MxGatewayConnectionOptions o, CancellationToken ct = default)
|
||||
{
|
||||
ConnectedWith = o;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task DisconnectAsync(CancellationToken ct = default) => Task.CompletedTask;
|
||||
|
||||
public Task<string> SubscribeAsync(string tag, CancellationToken ct = default)
|
||||
{
|
||||
var id = (++_nextHandle).ToString();
|
||||
Subscribed.Add(tag);
|
||||
return Task.FromResult(id);
|
||||
}
|
||||
|
||||
public Task UnsubscribeAsync(string id, CancellationToken ct = default)
|
||||
{
|
||||
Unsubscribed.Add(id);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<MxReadOutcome>> ReadAsync(IReadOnlyList<string> tags, CancellationToken ct = default)
|
||||
=> Task.FromResult(ReadHandler!(tags));
|
||||
|
||||
public Task<IReadOnlyList<MxWriteOutcome>> WriteAsync(IReadOnlyList<(string TagPath, object? Value)> w, CancellationToken ct = default)
|
||||
=> Task.FromResult(WriteHandler!(w));
|
||||
|
||||
public Task<(IReadOnlyList<MxBrowseChild> Children, bool Truncated)> BrowseChildrenAsync(string? p, CancellationToken ct = default)
|
||||
=> Task.FromResult(BrowseHandler!(p));
|
||||
|
||||
public async Task RunEventLoopAsync(Action<MxValueUpdate> onUpdate, CancellationToken ct = default)
|
||||
{
|
||||
OnUpdate = onUpdate;
|
||||
using var reg = ct.Register(() => EventLoopGate.TrySetResult());
|
||||
await EventLoopGate.Task; // test completes this to end the loop…
|
||||
ct.ThrowIfCancellationRequested(); // …or FaultEventLoop() faults it to simulate a stream break
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
|
||||
/// <summary>Simulate a stream break so the adapter raises Disconnected.</summary>
|
||||
public void FaultEventLoop() => EventLoopGate.TrySetException(new Exception("stream broke"));
|
||||
}
|
||||
+259
@@ -0,0 +1,259 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests.Adapters;
|
||||
|
||||
public class MxGatewayDataConnectionTests
|
||||
{
|
||||
private static MxGatewayDataConnection NewAdapter(FakeMxGatewayClient fake) =>
|
||||
new(fake, NullLogger<MxGatewayDataConnection>.Instance);
|
||||
|
||||
private static Dictionary<string, string> Details(int writeUserId = 0) => new()
|
||||
{
|
||||
["Endpoint"] = "http://gw:5000",
|
||||
["ApiKey"] = "key",
|
||||
["ClientName"] = "client-a",
|
||||
["WriteUserId"] = writeUserId.ToString(),
|
||||
["ReadTimeoutMs"] = "2000",
|
||||
};
|
||||
|
||||
// ── Task 6: connect / status / Disconnected ──
|
||||
|
||||
[Fact]
|
||||
public async Task ConnectAsync_resolves_options_and_sets_status_connected()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient();
|
||||
var adapter = NewAdapter(fake);
|
||||
|
||||
await adapter.ConnectAsync(Details(writeUserId: 7));
|
||||
|
||||
Assert.Equal(ConnectionHealth.Connected, adapter.Status);
|
||||
Assert.NotNull(fake.ConnectedWith);
|
||||
Assert.Equal("http://gw:5000", fake.ConnectedWith!.Endpoint);
|
||||
Assert.Equal("client-a", fake.ConnectedWith.ClientName);
|
||||
Assert.Equal(7, fake.ConnectedWith.WriteUserId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ConnectAsync_blank_client_name_defaults_to_scadabridge()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient();
|
||||
var adapter = NewAdapter(fake);
|
||||
var details = Details();
|
||||
details["ClientName"] = "";
|
||||
|
||||
await adapter.ConnectAsync(details);
|
||||
|
||||
Assert.Equal("scadabridge", fake.ConnectedWith!.ClientName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Disconnected_fires_exactly_once_when_event_loop_faults()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient();
|
||||
var adapter = NewAdapter(fake);
|
||||
int raised = 0;
|
||||
adapter.Disconnected += () => Interlocked.Increment(ref raised);
|
||||
|
||||
await adapter.ConnectAsync(Details());
|
||||
// Wait for the event loop to attach.
|
||||
await WaitUntil(() => fake.OnUpdate is not null);
|
||||
|
||||
fake.FaultEventLoop();
|
||||
await WaitUntil(() => raised >= 1);
|
||||
|
||||
Assert.Equal(1, raised);
|
||||
Assert.Equal(ConnectionHealth.Disconnected, adapter.Status);
|
||||
}
|
||||
|
||||
// ── Task 7: subscribe / unsubscribe + event routing ──
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribed_tag_update_invokes_callback_with_mapped_TagValue()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient();
|
||||
var adapter = NewAdapter(fake);
|
||||
await adapter.ConnectAsync(Details());
|
||||
await WaitUntil(() => fake.OnUpdate is not null);
|
||||
|
||||
TagValue? received = null;
|
||||
await adapter.SubscribeAsync("Area.Pump.Speed", (_, v) => received = v);
|
||||
Assert.Contains("Area.Pump.Speed", fake.Subscribed);
|
||||
|
||||
var ts = DateTimeOffset.UtcNow;
|
||||
fake.OnUpdate!(new MxValueUpdate("Area.Pump.Speed", 42.0, QualityCode.Good, ts));
|
||||
|
||||
Assert.NotNull(received);
|
||||
Assert.Equal(42.0, received!.Value);
|
||||
Assert.Equal(QualityCode.Good, received.Quality);
|
||||
Assert.Equal(ts, received.Timestamp);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unsubscribe_stops_routing_updates()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient();
|
||||
var adapter = NewAdapter(fake);
|
||||
await adapter.ConnectAsync(Details());
|
||||
await WaitUntil(() => fake.OnUpdate is not null);
|
||||
|
||||
int hits = 0;
|
||||
var subId = await adapter.SubscribeAsync("T", (_, _) => hits++);
|
||||
await adapter.UnsubscribeAsync(subId);
|
||||
|
||||
fake.OnUpdate!(new MxValueUpdate("T", 1, QualityCode.Good, DateTimeOffset.UtcNow));
|
||||
|
||||
Assert.Equal(0, hits);
|
||||
Assert.Contains(subId, fake.Unsubscribed);
|
||||
}
|
||||
|
||||
// ── Task 8: read / write + error classification ──
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_maps_success_and_failure()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient
|
||||
{
|
||||
ReadHandler = tags => tags.Select(t => t == "ok"
|
||||
? new MxReadOutcome(t, true, 5, QualityCode.Good, DateTimeOffset.UtcNow, null)
|
||||
: new MxReadOutcome(t, false, null, QualityCode.Bad, default, "bad tag")).ToList()
|
||||
};
|
||||
var adapter = NewAdapter(fake);
|
||||
await adapter.ConnectAsync(Details());
|
||||
|
||||
var ok = await adapter.ReadAsync("ok");
|
||||
Assert.True(ok.Success);
|
||||
Assert.Equal(5, ok.Value!.Value);
|
||||
|
||||
var bad = await adapter.ReadAsync("nope");
|
||||
Assert.False(bad.Success);
|
||||
Assert.Equal("bad tag", bad.ErrorMessage);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadBatchAsync_returns_dictionary_keyed_by_tag()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient
|
||||
{
|
||||
ReadHandler = tags => tags.Select(t =>
|
||||
new MxReadOutcome(t, true, t.Length, QualityCode.Good, DateTimeOffset.UtcNow, null)).ToList()
|
||||
};
|
||||
var adapter = NewAdapter(fake);
|
||||
await adapter.ConnectAsync(Details());
|
||||
|
||||
var results = await adapter.ReadBatchAsync(new[] { "aa", "bbb" });
|
||||
Assert.Equal(2, results["aa"].Value!.Value);
|
||||
Assert.Equal(3, results["bbb"].Value!.Value);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAsync_maps_failure_to_unsuccessful_WriteResult()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient
|
||||
{
|
||||
WriteHandler = writes => writes.Select(w =>
|
||||
new MxWriteOutcome(w.TagPath, false, "rejected")).ToList()
|
||||
};
|
||||
var adapter = NewAdapter(fake);
|
||||
await adapter.ConnectAsync(Details());
|
||||
|
||||
var r = await adapter.WriteAsync("T", 1);
|
||||
Assert.False(r.Success);
|
||||
Assert.Equal("rejected", r.ErrorMessage);
|
||||
}
|
||||
|
||||
// ── Task 9: WriteBatchAndWait ──
|
||||
|
||||
[Fact]
|
||||
public async Task WriteBatchAndWait_returns_true_when_response_matches()
|
||||
{
|
||||
var writeCalls = new List<string>();
|
||||
var fake = new FakeMxGatewayClient
|
||||
{
|
||||
WriteHandler = writes =>
|
||||
{
|
||||
writeCalls.AddRange(writes.Select(w => w.TagPath));
|
||||
return writes.Select(w => new MxWriteOutcome(w.TagPath, true, null)).ToList();
|
||||
},
|
||||
// Response path already reads the expected value.
|
||||
ReadHandler = tags => tags.Select(t =>
|
||||
new MxReadOutcome(t, true, "DONE", QualityCode.Good, DateTimeOffset.UtcNow, null)).ToList()
|
||||
};
|
||||
var adapter = NewAdapter(fake);
|
||||
await adapter.ConnectAsync(Details());
|
||||
|
||||
var ok = await adapter.WriteBatchAndWaitAsync(
|
||||
new Dictionary<string, object?> { ["V"] = 1 }, "Flag", 1, "Resp", "DONE",
|
||||
TimeSpan.FromSeconds(5));
|
||||
|
||||
Assert.True(ok);
|
||||
// Values written before the flag, and the flag itself.
|
||||
Assert.Contains("V", writeCalls);
|
||||
Assert.Contains("Flag", writeCalls);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteBatchAndWait_returns_false_on_timeout()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient
|
||||
{
|
||||
WriteHandler = writes => writes.Select(w => new MxWriteOutcome(w.TagPath, true, null)).ToList(),
|
||||
ReadHandler = tags => tags.Select(t =>
|
||||
new MxReadOutcome(t, true, "NEVER", QualityCode.Good, DateTimeOffset.UtcNow, null)).ToList()
|
||||
};
|
||||
var adapter = NewAdapter(fake);
|
||||
await adapter.ConnectAsync(Details());
|
||||
|
||||
var ok = await adapter.WriteBatchAndWaitAsync(
|
||||
new Dictionary<string, object?> { ["V"] = 1 }, "Flag", 1, "Resp", "DONE",
|
||||
TimeSpan.FromMilliseconds(300));
|
||||
|
||||
Assert.False(ok);
|
||||
}
|
||||
|
||||
// ── Task 10: browse ──
|
||||
|
||||
[Fact]
|
||||
public async Task BrowseChildrenAsync_maps_children_and_truncated()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient
|
||||
{
|
||||
BrowseHandler = _ => (new List<MxBrowseChild>
|
||||
{
|
||||
new("Area1", "Area1", BrowseNodeClass.Object, true),
|
||||
new("Area1.Pump.Speed", "Speed", BrowseNodeClass.Variable, false),
|
||||
}, true)
|
||||
};
|
||||
var adapter = NewAdapter(fake);
|
||||
await adapter.ConnectAsync(Details());
|
||||
|
||||
var result = await adapter.BrowseChildrenAsync(null);
|
||||
|
||||
Assert.True(result.Truncated);
|
||||
Assert.Equal(2, result.Children.Count);
|
||||
Assert.Equal(BrowseNodeClass.Object, result.Children[0].NodeClass);
|
||||
Assert.True(result.Children[0].HasChildren);
|
||||
Assert.Equal("Area1.Pump.Speed", result.Children[1].NodeId);
|
||||
Assert.Equal(BrowseNodeClass.Variable, result.Children[1].NodeClass);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task BrowseChildrenAsync_throws_when_not_connected()
|
||||
{
|
||||
var fake = new FakeMxGatewayClient();
|
||||
var adapter = NewAdapter(fake);
|
||||
// Not connected.
|
||||
await Assert.ThrowsAsync<ConnectionNotConnectedException>(
|
||||
() => adapter.BrowseChildrenAsync(null));
|
||||
}
|
||||
|
||||
private static async Task WaitUntil(Func<bool> condition, int timeoutMs = 2000)
|
||||
{
|
||||
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||
while (!condition() && sw.ElapsedMilliseconds < timeoutMs)
|
||||
await Task.Delay(10);
|
||||
Assert.True(condition(), "Condition not met within timeout.");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user