using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.ParityTests; /// /// PR 5.3 — Subscribe + event-rate parity. Both backends must accept the same /// full-reference list, return a usable subscription handle, and dispatch a /// similar number of OnDataChange events for the same observation window. /// [Trait("Category", "ParityE2E")] [Collection(nameof(ParityCollection))] public sealed class SubscribeAndEventRateParityTests { private readonly ParityHarness _h; public SubscribeAndEventRateParityTests(ParityHarness h) => _h = h; [Fact] public async Task Subscribe_returns_a_handle_for_each_backend() { _h.RequireBoth(); var sample = await PickSampleAsync(5); if (sample.Length == 0) Assert.Skip("dev Galaxy has no discoverable variables"); var handles = await _h.RunOnAvailableAsync( (driver, ct) => ((ISubscribable)driver).SubscribeAsync(sample, TimeSpan.FromMilliseconds(500), ct), CancellationToken.None); handles[ParityHarness.Backend.LegacyHost].ShouldNotBeNull(); handles[ParityHarness.Backend.MxGateway].ShouldNotBeNull(); // Clean up so we don't leave dangling advises in either backend. foreach (var (backend, handle) in handles) { await ((ISubscribable)_h.GetDriver(backend)) .UnsubscribeAsync(handle, CancellationToken.None); } } [Fact] public async Task Subscribe_event_rate_within_tolerance_for_a_3s_window() { _h.RequireBoth(); var sample = await PickSampleAsync(5); if (sample.Length == 0) Assert.Skip("dev Galaxy has no discoverable variables"); var counts = new Dictionary(); var subs = new Dictionary(); try { foreach (var backend in new[] { ParityHarness.Backend.LegacyHost, ParityHarness.Backend.MxGateway }) { var driver = _h.GetDriver(backend); var local = 0; EventHandler handler = (_, _) => Interlocked.Increment(ref local); ((ISubscribable)driver).OnDataChange += handler; var handle = await ((ISubscribable)driver) .SubscribeAsync(sample, TimeSpan.FromMilliseconds(500), CancellationToken.None); subs[backend] = handle; await Task.Delay(3_000, TestContext.Current.CancellationToken); ((ISubscribable)driver).OnDataChange -= handler; counts[backend] = Volatile.Read(ref local); } // Tolerance is generous because both backends are looking at the same // physical Galaxy; the gateway's StreamEvents pump and the legacy // OnDataChange COM advises are fed by the same MXAccess subscriptions // upstream. ±50% absorbs scheduler jitter without hiding a wholesale // event-rate regression. var legacyCount = counts[ParityHarness.Backend.LegacyHost]; var mxgwCount = counts[ParityHarness.Backend.MxGateway]; if (legacyCount + mxgwCount == 0) { Assert.Skip("no value changes observed in 3s window — sample may be all static configuration tags"); } var ratio = (double)mxgwCount / Math.Max(legacyCount, 1); ratio.ShouldBeInRange(0.5, 1.5, $"event-rate parity within ±50%: legacy={legacyCount}, mxgw={mxgwCount}"); } finally { foreach (var (backend, handle) in subs) { try { await ((ISubscribable)_h.GetDriver(backend)) .UnsubscribeAsync(handle, CancellationToken.None); } catch { /* best-effort cleanup */ } } } } private async Task PickSampleAsync(int count) { var b = new RecordingAddressSpaceBuilder(); await ((ITagDiscovery)_h.LegacyDriver!).DiscoverAsync(b, CancellationToken.None); return b.Variables.Take(count).Select(v => v.AttributeInfo.FullName).ToArray(); } }