Wires MxAccess.PublishingIntervalMs into the gateway's SubscribeBulk bufferedUpdateIntervalMs parameter on both subscribe paths:

- GalaxyDriver.SubscribeAsync — when the caller passes TimeSpan.Zero (typical for infrastructure callers like the deploy watcher), the driver substitutes _options.MxAccess.PublishingIntervalMs. When the caller sets a non-zero interval (the server's UA subscription publishingInterval), the caller's value wins.
- PerPlatformProbeWatcher — new bufferedUpdateIntervalMs constructor parameter defaulting to 0 (the gateway's default cadence). GalaxyDriver passes _options.MxAccess.PublishingIntervalMs so probe ScanState changes publish at the configured rate.

Tests: caller-wins-when-non-zero and fallback-to-config-when-zero on the driver; default-zero, configured-forwarded, and negative-rejected on the probe watcher.

A session-level SetBufferedUpdateInterval RPC exists in the gateway protocol (MxCommandKind.SetBufferedUpdateInterval), but the .NET client doesn't expose a typed helper yet — adjusting an existing subscription's interval is a follow-up. Today's path subscribes once with the right interval, which covers the common case.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
285 lines
12 KiB
C#
285 lines
12 KiB
C#
using System.Threading.Channels;
|
|
using Google.Protobuf.WellKnownTypes;
|
|
using MxGateway.Contracts.Proto;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
|
|
|
|
/// <summary>
|
|
/// End-to-end tests for <see cref="GalaxyDriver"/>'s ISubscribable wiring +
|
|
/// <see cref="EventPump"/>. The fake subscriber replays a controlled stream of
|
|
/// <see cref="MxEvent"/>s; the test asserts the driver's <c>OnDataChange</c> fans
|
|
/// out per registered subscription.
|
|
/// </summary>
|
|
public sealed class GalaxyDriverSubscribeTests
|
|
{
|
|
/// <summary>
/// Builds the driver options used by the tests that do not need a custom
/// configuration. Only the first <see cref="GalaxyMxAccessOptions"/> argument is
/// supplied; the repository and reconnect options are default-constructed.
/// </summary>
private static GalaxyDriverOptions Opts()
{
    var gateway = new GalaxyGatewayOptions("https://mxgw.test:5001", "key");
    var mxAccess = new GalaxyMxAccessOptions("OtOpcUa-A");
    var repository = new GalaxyRepositoryOptions();
    var reconnect = new GalaxyReconnectOptions();

    return new GalaxyDriverOptions(gateway, mxAccess, repository, reconnect);
}
|
|
|
|
/// <summary>
/// In-memory <see cref="IGalaxySubscriber"/> test double. Subscribe requests are
/// answered according to <see cref="Decide"/>, the arguments of each call are
/// recorded for later assertions, and events are replayed from an unbounded
/// channel that tests feed through <see cref="EmitOnDataChangeAsync"/>.
/// </summary>
private sealed class FakeSubscriber : IGalaxySubscriber
{
    private readonly Channel<MxEvent> _events = Channel.CreateUnbounded<MxEvent>();
    private int _nextHandle = 1;

    /// <summary>Most recently issued item handle for each accepted reference.</summary>
    public Dictionary<string, int> Map { get; } = new();

    /// <summary>Item handles passed to <see cref="UnsubscribeBulkAsync"/>, in call order.</summary>
    public List<int> UnsubscribedHandles { get; } = [];

    /// <summary>
    /// The <c>bufferedUpdateIntervalMs</c> argument of every
    /// <see cref="SubscribeBulkAsync"/> call, in call order.
    /// </summary>
    public List<int> BufferedIntervalsCalled { get; } = [];

    /// <summary>Per-reference accept/reject policy; accepts every reference by default.</summary>
    public Func<string, bool> Decide { get; set; } = _ => true;

    /// <summary>
    /// Records the requested interval, then produces one result per reference in
    /// input order: accepted references get a fresh item handle, rejected ones
    /// get handle 0 and an error message.
    /// </summary>
    public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
        IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
    {
        BufferedIntervalsCalled.Add(bufferedUpdateIntervalMs);

        var outcomes = new List<SubscribeResult>(fullReferences.Count);
        foreach (var reference in fullReferences)
        {
            outcomes.Add(Decide(reference) ? Accept(reference) : Reject(reference));
        }

        IReadOnlyList<SubscribeResult> readOnlyOutcomes = outcomes;
        return Task.FromResult(readOnlyOutcomes);
    }

    /// <summary>Records the handles and completes synchronously.</summary>
    public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
    {
        UnsubscribedHandles.AddRange(itemHandles);
        return Task.CompletedTask;
    }

    /// <summary>Streams whatever the test has written to the internal channel.</summary>
    public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
    {
        return _events.Reader.ReadAllAsync(cancellationToken);
    }

    /// <summary>
    /// Queues an <c>OnDataChange</c> event carrying a double value for the given
    /// item handle, timestamped with the current UTC time.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the default quality of 192 is presumably the OPC "good"
    /// quality code — confirm against the gateway contract.
    /// </remarks>
    public ValueTask EmitOnDataChangeAsync(int itemHandle, double value, byte quality = 192)
    {
        var dataChange = new MxEvent
        {
            Family = MxEventFamily.OnDataChange,
            ItemHandle = itemHandle,
            Value = new MxValue { DoubleValue = value },
            Quality = quality,
            SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
        };

        return _events.Writer.WriteAsync(dataChange);
    }

    /// <summary>Marks the event channel as complete so the stream ends once drained.</summary>
    public void CompleteEvents()
    {
        _events.Writer.Complete();
    }

    // Issues the next handle and remembers it as the latest one for the reference.
    // The counter is seeded at 1 and incremented before use, so the first handle is 2.
    private SubscribeResult Accept(string reference)
    {
        var issued = Interlocked.Increment(ref _nextHandle);
        Map[reference] = issued;

        return new SubscribeResult
        {
            TagAddress = reference,
            ItemHandle = issued,
            WasSuccessful = true,
        };
    }

    // Builds the failure result for a reference the Decide policy turned down.
    private static SubscribeResult Reject(string reference)
    {
        return new SubscribeResult
        {
            TagAddress = reference,
            ItemHandle = 0,
            WasSuccessful = false,
            ErrorMessage = "rejected by fake",
        };
    }
}
|
|
|
|
[Fact]
public async Task SubscribeAsync_AllocatesHandle_AndDispatchesValueChange()
{
    // Subscribes to one tag, emits a single data change on the item handle the fake
    // issued for it, and expects exactly one OnDataChange carrying the subscription
    // handle, the tag reference and the emitted value.
    var gateway = new FakeSubscriber();
    using var driver = new GalaxyDriver(
        "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: gateway);

    var received = new List<DataChangeEventArgs>();
    driver.OnDataChange += (_, change) => received.Add(change);

    var subscription = await driver.SubscribeAsync(
        ["Tank.Level"], TimeSpan.FromSeconds(1), CancellationToken.None);

    var issuedItemHandle = gateway.Map["Tank.Level"];
    await gateway.EmitOnDataChangeAsync(issuedItemHandle, 42.0);

    // Poll until the event arrives; WaitForAsync fails the test on timeout.
    await WaitForAsync(() => received.Count >= 1);

    received.Count.ShouldBe(1);
    var only = received[0];
    only.SubscriptionHandle.ShouldBe(subscription);
    only.FullReference.ShouldBe("Tank.Level");
    ((double)only.Snapshot.Value!).ShouldBe(42.0);
}
|
|
|
|
[Fact]
public async Task SubscribeAsync_TwoSubscriptions_SameTag_FanOutOnePerSubscription()
{
    // Wiring smoke test for two subscriptions on the same tag. Despite the name, it
    // only confirms that an emitted event reaches one of the two subscriptions; the
    // fan-out invariant (a single item handle delivers to every subscription that
    // registered it) is pinned in SubscriptionRegistryTests.
    var gateway = new FakeSubscriber();
    using var driver = new GalaxyDriver(
        "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: gateway);

    var received = new List<DataChangeEventArgs>();
    driver.OnDataChange += (_, change) => received.Add(change);

    var first = await driver.SubscribeAsync(["A"], TimeSpan.FromSeconds(1), CancellationToken.None);
    var second = await driver.SubscribeAsync(["A"], TimeSpan.FromSeconds(1), CancellationToken.None);

    // The fake issues a fresh item handle for every accepted reference and overwrites
    // Map["A"], so the map holds only the most recently issued handle. Which
    // subscription that handle is bound to depends on driver internals not visible
    // here, hence the either-or assertion at the end.
    var latestItemHandle = gateway.Map["A"];
    latestItemHandle.ShouldBeGreaterThan(0);

    await gateway.EmitOnDataChangeAsync(latestItemHandle, 7.0);
    await WaitForAsync(() => received.Count >= 1);

    received.ShouldNotBeEmpty();
    var delivered = received[0].SubscriptionHandle;
    delivered.DiagnosticId.ShouldStartWith("galaxy-sub-");

    // The delivered handle must belong to one of the two subscriptions created above.
    var deliveredId = ((GalaxySubscriptionHandle)delivered).SubscriptionId;
    var candidateIds = new[]
    {
        ((GalaxySubscriptionHandle)first).SubscriptionId,
        ((GalaxySubscriptionHandle)second).SubscriptionId,
    };
    candidateIds.ShouldContain(deliveredId);
}
|
|
|
|
[Fact]
public async Task SubscribeAsync_FailedTag_DoesNotDispatchEvents()
{
    // The fake rejects "Bad" and reports it with item handle 0, while "Good" gets a
    // real handle. An event arriving on handle 0 must not reach OnDataChange.
    var gateway = new FakeSubscriber { Decide = reference => reference is not "Bad" };
    using var driver = new GalaxyDriver(
        "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: gateway);

    var received = new List<DataChangeEventArgs>();
    driver.OnDataChange += (_, change) => received.Add(change);

    await driver.SubscribeAsync(["Good", "Bad"], TimeSpan.FromSeconds(1), CancellationToken.None);

    await gateway.EmitOnDataChangeAsync(itemHandle: 0, value: 999.0);

    // Negative assertion: there is nothing to wait for, so give the pump a short
    // window in which it could (wrongly) dispatch.
    await Task.Delay(50);

    received.ShouldBeEmpty();
}
|
|
|
|
[Fact]
public async Task UnsubscribeAsync_RemovesRegistration_AndCallsGwUnsubscribe()
{
    var gateway = new FakeSubscriber();
    using var driver = new GalaxyDriver(
        "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: gateway);

    var subscription = await driver.SubscribeAsync(["X"], TimeSpan.FromSeconds(1), CancellationToken.None);
    var issuedItemHandle = gateway.Map["X"];

    await driver.UnsubscribeAsync(subscription, CancellationToken.None);

    // The gateway-side unsubscribe must have been issued for the handle the fake allocated.
    gateway.UnsubscribedHandles.ShouldContain(issuedItemHandle);

    // The listener is attached only now, so it can only see events raised after the
    // unsubscribe; an event on the dropped handle must not be dispatched.
    var lateEvents = new List<DataChangeEventArgs>();
    driver.OnDataChange += (_, change) => lateEvents.Add(change);
    await gateway.EmitOnDataChangeAsync(issuedItemHandle, 11.0);

    // Negative assertion: allow a short window in which a stray dispatch could occur.
    await Task.Delay(50);

    lateEvents.ShouldBeEmpty();
}
|
|
|
|
[Fact]
public async Task UnsubscribeAsync_UnknownHandle_NoOp()
{
    // NOTE(review): the method name says "NoOp", but the contract asserted here is
    // that a handle of a foreign ISubscriptionHandle type is rejected with an
    // ArgumentException (a programming error, not a recoverable runtime condition).
    // The "no-op" half is pinned below: the rejected call must not reach the gateway.
    var subscriber = new FakeSubscriber();
    using var driver = new GalaxyDriver(
        "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber);

    var foreignHandle = new ForeignHandle();

    await Should.ThrowAsync<ArgumentException>(() =>
        driver.UnsubscribeAsync(foreignHandle, CancellationToken.None));

    // Nothing was subscribed, so a rejected unsubscribe must leave the gateway untouched.
    subscriber.UnsubscribedHandles.ShouldBeEmpty();
}
|
|
|
|
[Fact]
public async Task SubscribeAsync_NoSubscriber_Throws()
{
    // A driver built without a subscriber must refuse to subscribe; the exception
    // message is expected to mention "PR 4.W".
    using var driver = new GalaxyDriver("g", Opts());

    Task Subscribe() =>
        driver.SubscribeAsync(["x"], TimeSpan.FromSeconds(1), CancellationToken.None);

    var failure = await Should.ThrowAsync<NotSupportedException>(Subscribe);

    failure.Message.ShouldContain("PR 4.W");
}
|
|
|
|
[Fact]
public async Task SubscribeAsync_FallsBackToConfiguredInterval_WhenCallerPassesZero()
{
    // PR 6.3 — a caller that passes TimeSpan.Zero expresses no preference, so the
    // driver is expected to forward MxAccess.PublishingIntervalMs from its options.
    var gateway = new FakeSubscriber();
    var mxAccess = new GalaxyMxAccessOptions("OtOpcUa-A", PublishingIntervalMs: 750);
    var configured = new GalaxyDriverOptions(
        new GalaxyGatewayOptions("https://mxgw.test:5001", "key"),
        mxAccess,
        new GalaxyRepositoryOptions(),
        new GalaxyReconnectOptions());
    using var driver = new GalaxyDriver(
        "g", configured, hierarchySource: null, dataReader: null, dataWriter: null, subscriber: gateway);

    await driver.SubscribeAsync(["Tag.A"], TimeSpan.Zero, CancellationToken.None);

    // Exactly one gateway call, carrying the configured interval.
    var forwardedInterval = gateway.BufferedIntervalsCalled.ShouldHaveSingleItem();
    forwardedInterval.ShouldBe(750);
}
|
|
|
|
[Fact]
public async Task SubscribeAsync_RespectsCallerInterval_WhenNonZero()
{
    // An explicitly requested publishing interval takes precedence; the configured
    // option (750 ms here) is only a fallback for callers with no preference.
    var gateway = new FakeSubscriber();
    var mxAccess = new GalaxyMxAccessOptions("OtOpcUa-A", PublishingIntervalMs: 750);
    var configured = new GalaxyDriverOptions(
        new GalaxyGatewayOptions("https://mxgw.test:5001", "key"),
        mxAccess,
        new GalaxyRepositoryOptions(),
        new GalaxyReconnectOptions());
    using var driver = new GalaxyDriver(
        "g", configured, hierarchySource: null, dataReader: null, dataWriter: null, subscriber: gateway);

    await driver.SubscribeAsync(["Tag.A"], TimeSpan.FromMilliseconds(250), CancellationToken.None);

    // Exactly one gateway call, carrying the caller's 250 ms rather than the configured 750 ms.
    var forwardedInterval = gateway.BufferedIntervalsCalled.ShouldHaveSingleItem();
    forwardedInterval.ShouldBe(250);
}
|
|
|
|
[Fact]
public async Task SubscribeAsync_EmptyTagList_ReturnsHandleWithoutCallingGw()
{
    // Subscribing to an empty tag list must still hand back a handle, and must do so
    // without a gateway round-trip.
    var subscriber = new FakeSubscriber();
    using var driver = new GalaxyDriver(
        "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber);

    var handle = await driver.SubscribeAsync([], TimeSpan.FromSeconds(1), CancellationToken.None);

    handle.ShouldNotBeNull();
    subscriber.Map.ShouldBeEmpty();

    // Map stays empty even when SubscribeBulkAsync is invoked with an empty list, so
    // it cannot prove the gateway was skipped. BufferedIntervalsCalled gains an entry
    // on every SubscribeBulkAsync call and therefore pins the "without calling gw" claim.
    subscriber.BufferedIntervalsCalled.ShouldBeEmpty();
}
|
|
|
|
/// <summary>
/// An <see cref="ISubscriptionHandle"/> implementation that is not a Galaxy handle,
/// used to check how the driver treats handles it did not issue.
/// </summary>
private sealed class ForeignHandle : ISubscriptionHandle
{
    /// <summary>Fixed identifier; always <c>"foreign-x"</c>.</summary>
    public string DiagnosticId { get; } = "foreign-x";
}
|
|
|
|
/// <summary>
/// Polls <paramref name="predicate"/> roughly every 10 ms until it returns
/// <c>true</c> or <paramref name="timeoutMs"/> elapses, then fails the test if it
/// is still <c>false</c>.
/// </summary>
/// <param name="predicate">Condition to wait for; evaluated on the calling context.</param>
/// <param name="timeoutMs">Maximum time to keep polling, in milliseconds.</param>
private static async Task WaitForAsync(Func<bool> predicate, int timeoutMs = 1000)
{
    // Stopwatch is monotonic, so a wall-clock adjustment mid-test can neither cut
    // the wait short nor stretch it, unlike a DateTime.UtcNow deadline.
    var elapsed = System.Diagnostics.Stopwatch.StartNew();
    while (elapsed.ElapsedMilliseconds < timeoutMs)
    {
        if (predicate())
        {
            return;
        }

        await Task.Delay(10);
    }

    // One final evaluation after the deadline; this is what produces the failure.
    predicate().ShouldBeTrue("Predicate did not become true within timeout.");
}
|
|
}
|