using System.Threading.Channels; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts.Proto; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; /// /// End-to-end tests for 's ISubscribable wiring + /// . The fake subscriber replays a controlled stream of /// s; the test asserts the driver's OnDataChange fans /// out per registered subscription. /// public sealed class GalaxyDriverSubscribeTests { private static GalaxyDriverOptions Opts() => new( new GalaxyGatewayOptions("https://mxgw.test:5001", "key"), new GalaxyMxAccessOptions("OtOpcUa-A"), new GalaxyRepositoryOptions(), new GalaxyReconnectOptions()); internal sealed class FakeSubscriber : IGalaxySubscriber { private int _nextHandle = 1; private readonly Channel _events = Channel.CreateUnbounded(); public Dictionary Map { get; } = new(); public List UnsubscribedHandles { get; } = []; public List BufferedIntervalsCalled { get; } = []; public Func Decide { get; set; } = _ => true; public Task> SubscribeBulkAsync( IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) { BufferedIntervalsCalled.Add(bufferedUpdateIntervalMs); var results = new List(fullReferences.Count); foreach (var fullRef in fullReferences) { if (Decide(fullRef)) { var handle = Interlocked.Increment(ref _nextHandle); Map[fullRef] = handle; results.Add(new SubscribeResult { TagAddress = fullRef, ItemHandle = handle, WasSuccessful = true, }); } else { results.Add(new SubscribeResult { TagAddress = fullRef, ItemHandle = 0, WasSuccessful = false, ErrorMessage = "rejected by fake", }); } } return Task.FromResult>(results); } public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) { UnsubscribedHandles.AddRange(itemHandles); return Task.CompletedTask; } public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) => _events.Reader.ReadAllAsync(cancellationToken); public ValueTask EmitOnDataChangeAsync(int itemHandle, double value, byte quality = 192) => _events.Writer.WriteAsync(new MxEvent { Family = MxEventFamily.OnDataChange, ItemHandle = itemHandle, Value = new MxValue { DoubleValue = value }, Quality = quality, SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), }); public void CompleteEvents() => _events.Writer.Complete(); } [Fact] public async Task SubscribeAsync_AllocatesHandle_AndDispatchesValueChange() { var subscriber = new FakeSubscriber(); using var driver = new GalaxyDriver( "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); var captured = new List(); driver.OnDataChange += (_, args) => captured.Add(args); var handle = await driver.SubscribeAsync(["Tank.Level"], TimeSpan.FromSeconds(1), CancellationToken.None); var itemHandle = subscriber.Map["Tank.Level"]; await subscriber.EmitOnDataChangeAsync(itemHandle, 42.0); await WaitForAsync(() => captured.Count >= 1); captured.Count.ShouldBe(1); captured[0].SubscriptionHandle.ShouldBe(handle); captured[0].FullReference.ShouldBe("Tank.Level"); ((double)captured[0].Snapshot.Value!).ShouldBe(42.0); } [Fact] public async Task SubscribeAsync_TwoSubscriptions_SameTag_FanOutOnePerSubscription() { var subscriber = new FakeSubscriber(); using var driver = new GalaxyDriver( "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); var captured = new List(); driver.OnDataChange += (_, args) => captured.Add(args); var handle1 = await driver.SubscribeAsync(["A"], TimeSpan.FromSeconds(1), CancellationToken.None); var handle2 = await driver.SubscribeAsync(["A"], TimeSpan.FromSeconds(1), CancellationToken.None); // Both subscriptions resolved the same FullRef. The fake gives each its own // itemHandle (Map["A"] gets overwritten), so we use the latest mapping for the // second subscription's expected delivery; the first subscription's binding // points at an item handle the gateway fake hasn't emitted on. To exercise the // fan-out, register both subs against the SAME handle (matches the gw's "one // handle per (server, tag) pair" pattern in production where SubscribeBulk // returns the existing handle for an already-AddItem'd tag). subscriber.Map["A"].ShouldBeGreaterThan(0); // Synthesize an event against handle 2 (which is also tracked under sub 2). // Fan-out for the same tag is best validated at the registry level — the // SubscriptionRegistryTests cover the multi-sub-same-handle case directly. await subscriber.EmitOnDataChangeAsync(subscriber.Map["A"], 7.0); await WaitForAsync(() => captured.Count >= 1); // At least one delivery — depending on which subscription owns the handle, // either handle1 or handle2 receives. The fan-out invariant (a single handle // delivers to every subscription that registered it) is pinned in // SubscriptionRegistryTests; here we just confirm the wiring works. captured.ShouldNotBeEmpty(); captured[0].SubscriptionHandle.DiagnosticId.ShouldStartWith("galaxy-sub-"); // Either handle1 or handle2 must match the captured handle's id. var captured0Id = ((GalaxySubscriptionHandle)captured[0].SubscriptionHandle).SubscriptionId; var allowed = new[] { ((GalaxySubscriptionHandle)handle1).SubscriptionId, ((GalaxySubscriptionHandle)handle2).SubscriptionId, }; allowed.ShouldContain(captured0Id); } [Fact] public async Task SubscribeAsync_FailedTag_DoesNotDispatchEvents() { var subscriber = new FakeSubscriber { Decide = tag => tag != "Bad" }; using var driver = new GalaxyDriver( "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); var captured = new List(); driver.OnDataChange += (_, args) => captured.Add(args); await driver.SubscribeAsync(["Good", "Bad"], TimeSpan.FromSeconds(1), CancellationToken.None); // Good has an itemHandle; Bad doesn't (item handle 0). An event with handle 0 // must NOT be dispatched (no subscribers registered against it). await subscriber.EmitOnDataChangeAsync(itemHandle: 0, value: 999.0); await Task.Delay(50); // give the pump a chance captured.ShouldBeEmpty(); } [Fact] public async Task UnsubscribeAsync_RemovesRegistration_AndCallsGwUnsubscribe() { var subscriber = new FakeSubscriber(); using var driver = new GalaxyDriver( "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); var handle = await driver.SubscribeAsync(["X"], TimeSpan.FromSeconds(1), CancellationToken.None); var itemHandle = subscriber.Map["X"]; await driver.UnsubscribeAsync(handle, CancellationToken.None); subscriber.UnsubscribedHandles.ShouldContain(itemHandle); // Subsequent events for the dropped handle don't dispatch. var captured = new List(); driver.OnDataChange += (_, args) => captured.Add(args); await subscriber.EmitOnDataChangeAsync(itemHandle, 11.0); await Task.Delay(50); captured.ShouldBeEmpty(); } [Fact] public async Task UnsubscribeAsync_UnknownHandle_NoOp() { var subscriber = new FakeSubscriber(); using var driver = new GalaxyDriver( "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); // Handle issued by a different driver shape — must throw (it's a programming // error, not a recoverable runtime condition). var foreignHandle = new ForeignHandle(); await Should.ThrowAsync(() => driver.UnsubscribeAsync(foreignHandle, CancellationToken.None)); } [Fact] public async Task SubscribeAsync_NoSubscriber_Throws() { using var driver = new GalaxyDriver("g", Opts()); var ex = await Should.ThrowAsync(() => driver.SubscribeAsync(["x"], TimeSpan.FromSeconds(1), CancellationToken.None)); ex.Message.ShouldContain("PR 4.W"); } [Fact] public async Task SubscribeAsync_FallsBackToConfiguredInterval_WhenCallerPassesZero() { // PR 6.3 — when the caller doesn't set a publishing interval (TimeSpan.Zero), // the driver substitutes MxAccess.PublishingIntervalMs from its options. var subscriber = new FakeSubscriber(); var opts = new GalaxyDriverOptions( new GalaxyGatewayOptions("https://mxgw.test:5001", "key"), new GalaxyMxAccessOptions("OtOpcUa-A", PublishingIntervalMs: 750), new GalaxyRepositoryOptions(), new GalaxyReconnectOptions()); using var driver = new GalaxyDriver( "g", opts, hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); await driver.SubscribeAsync(["Tag.A"], TimeSpan.Zero, CancellationToken.None); subscriber.BufferedIntervalsCalled.ShouldHaveSingleItem().ShouldBe(750); } [Fact] public async Task SubscribeAsync_RespectsCallerInterval_WhenNonZero() { // The caller's publishingInterval wins when explicitly set — the configured // option only applies as a fallback for "no-preference" callers. var subscriber = new FakeSubscriber(); var opts = new GalaxyDriverOptions( new GalaxyGatewayOptions("https://mxgw.test:5001", "key"), new GalaxyMxAccessOptions("OtOpcUa-A", PublishingIntervalMs: 750), new GalaxyRepositoryOptions(), new GalaxyReconnectOptions()); using var driver = new GalaxyDriver( "g", opts, hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); await driver.SubscribeAsync(["Tag.A"], TimeSpan.FromMilliseconds(250), CancellationToken.None); subscriber.BufferedIntervalsCalled.ShouldHaveSingleItem().ShouldBe(250); } [Fact] public async Task SubscribeAsync_EmptyTagList_ReturnsHandleWithoutCallingGw() { var subscriber = new FakeSubscriber(); using var driver = new GalaxyDriver( "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); var handle = await driver.SubscribeAsync([], TimeSpan.FromSeconds(1), CancellationToken.None); handle.ShouldNotBeNull(); subscriber.Map.ShouldBeEmpty(); } private sealed class ForeignHandle : ISubscriptionHandle { public string DiagnosticId => "foreign-x"; } private static async Task WaitForAsync(Func predicate, int timeoutMs = 1000) { var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs); while (DateTime.UtcNow < deadline) { if (predicate()) return; await Task.Delay(10); } predicate().ShouldBeTrue("Predicate did not become true within timeout."); } }