From 2fdad81af379d64b953cb2cc343977f63cec5164 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 29 Apr 2026 16:56:33 -0400 Subject: [PATCH] =?UTF-8?q?PR=206.3=20=E2=80=94=20Buffered=20update=20inte?= =?UTF-8?q?rval=20landing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires MxAccess.PublishingIntervalMs into the gw's SubscribeBulk bufferedUpdateIntervalMs parameter on both subscribe paths: - GalaxyDriver.SubscribeAsync — when the caller passes TimeSpan.Zero (typical for infrastructure callers like the deploy watcher), the driver substitutes _options.MxAccess.PublishingIntervalMs. When the caller sets a non-zero interval (the server's UA subscription publishingInterval), that wins. - PerPlatformProbeWatcher — new bufferedUpdateIntervalMs ctor parameter defaulting to 0 (gw default cadence). GalaxyDriver passes _options.MxAccess.PublishingIntervalMs so probe ScanState changes publish at the configured rate. Tests: caller-wins-when-non-zero, fallback-to-config-when-zero on the driver; default-zero, configured-forwarded, negative-rejected on the probe watcher. A session-level SetBufferedUpdateInterval RPC exists in the gw protocol (MxCommandKind.SetBufferedUpdateInterval) but the .NET client doesn't expose a typed helper yet — adjusting an existing subscription's interval is a follow-up. Today's path subscribes once with the right interval, which covers the common case. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../GalaxyDriver.cs | 11 ++++- .../Health/PerPlatformProbeWatcher.cs | 20 ++++++++-- .../Health/PerPlatformProbeWatcherTests.cs | 36 +++++++++++++++++ .../Runtime/GalaxyDriverSubscribeTests.cs | 40 +++++++++++++++++++ 4 files changed, 101 insertions(+), 6 deletions(-) diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs index 7bf8035..987b2e8 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs @@ -209,7 +209,9 @@ public sealed class GalaxyDriver _supervisor.StateChanged += OnSupervisorStateChanged; - _probeWatcher = new PerPlatformProbeWatcher(_subscriber, _hostStatuses, _logger); + _probeWatcher = new PerPlatformProbeWatcher( + _subscriber, _hostStatuses, _logger, + bufferedUpdateIntervalMs: _options.MxAccess.PublishingIntervalMs); } /// @@ -441,7 +443,12 @@ public sealed class GalaxyDriver return new GalaxySubscriptionHandle(subscriptionId); } - var bufferedIntervalMs = (int)Math.Max(0, publishingInterval.TotalMilliseconds); + // PR 6.3 — when the caller doesn't set a publishing interval (TimeSpan.Zero or + // negative), fall back to the configured MxAccess.PublishingIntervalMs. The + // server's UA subscription publishingInterval drives this in production; tests + // and infrastructure callers (probe watcher, deploy watcher) hit the fallback. + var requested = (int)Math.Max(0, publishingInterval.TotalMilliseconds); + var bufferedIntervalMs = requested > 0 ? requested : _options.MxAccess.PublishingIntervalMs; var results = await _subscriber .SubscribeBulkAsync(fullReferences, bufferedIntervalMs, cancellationToken) .ConfigureAwait(false); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/PerPlatformProbeWatcher.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/PerPlatformProbeWatcher.cs index b86a9cd..faba039 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/PerPlatformProbeWatcher.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/PerPlatformProbeWatcher.cs @@ -36,6 +36,7 @@ public sealed class PerPlatformProbeWatcher : IDisposable private readonly IGalaxySubscriber _subscriber; private readonly HostStatusAggregator _aggregator; private readonly ILogger _logger; + private readonly int _bufferedUpdateIntervalMs; // Tracked platform → gw item handle. Item handle 0 means the gw rejected the subscribe; // we keep the entry so SyncPlatformsAsync doesn't try to subscribe it again on every call. @@ -45,11 +46,20 @@ public sealed class PerPlatformProbeWatcher : IDisposable private bool _disposed; public PerPlatformProbeWatcher( - IGalaxySubscriber subscriber, HostStatusAggregator aggregator, ILogger? logger = null) + IGalaxySubscriber subscriber, + HostStatusAggregator aggregator, + ILogger? logger = null, + int bufferedUpdateIntervalMs = 0) { _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber)); _aggregator = aggregator ?? throw new ArgumentNullException(nameof(aggregator)); _logger = logger ?? NullLogger.Instance; + if (bufferedUpdateIntervalMs < 0) + { + throw new ArgumentOutOfRangeException(nameof(bufferedUpdateIntervalMs), + "bufferedUpdateIntervalMs must be >= 0; 0 means use the gw's default cadence."); + } + _bufferedUpdateIntervalMs = bufferedUpdateIntervalMs; } /// Snapshot of platform tag names currently watched. @@ -107,10 +117,12 @@ public sealed class PerPlatformProbeWatcher : IDisposable if (toAdd.Count == 0) return; var probeAddresses = toAdd.Select(p => p + ProbeSuffix).ToArray(); - // bufferedUpdateInterval=0 — probe ScanState changes are rare enough that the gw's - // default cadence is fine; explicit polling rate goes through PR 6.3. + // PR 6.3 — use the configured bufferedUpdateIntervalMs (defaults to 0 = gw cadence + // when the driver hasn't overridden MxAccess.PublishingIntervalMs). Probe ScanState + // changes are rare so a coarser interval is usually fine; deployments that need + // tighter health visibility can dial it down through GalaxyDriverOptions. var results = await _subscriber.SubscribeBulkAsync( - probeAddresses, bufferedUpdateIntervalMs: 0, cancellationToken).ConfigureAwait(false); + probeAddresses, _bufferedUpdateIntervalMs, cancellationToken).ConfigureAwait(false); for (var i = 0; i < toAdd.Count; i++) { diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/PerPlatformProbeWatcherTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/PerPlatformProbeWatcherTests.cs index cd7dd9e..5c299d9 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/PerPlatformProbeWatcherTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/PerPlatformProbeWatcherTests.cs @@ -18,6 +18,7 @@ public sealed class PerPlatformProbeWatcherTests private sealed class FakeSubscriber : IGalaxySubscriber { public List> Subscribes { get; } = []; + public List SubscribeIntervalsMs { get; } = []; public List> Unsubscribes { get; } = []; private int _nextHandle = 1; public Dictionary HandleByAddress { get; } = new(StringComparer.OrdinalIgnoreCase); @@ -26,6 +27,7 @@ public sealed class PerPlatformProbeWatcherTests IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) { Subscribes.Add([.. fullReferences]); + SubscribeIntervalsMs.Add(bufferedUpdateIntervalMs); var results = new List(fullReferences.Count); foreach (var addr in fullReferences) { @@ -71,6 +73,40 @@ public sealed class PerPlatformProbeWatcherTests watcher.WatchedPlatforms.OrderBy(x => x).ShouldBe(new[] { "PlatformA", "PlatformB" }); } + [Fact] + public async Task SyncPlatformsAsync_DefaultBufferedIntervalIsZero_GwCadence() + { + // PR 6.3 — without an override, the watcher passes 0 (gw default cadence) so + // existing deployments don't see a behavior change. + var subscriber = new FakeSubscriber(); + using var watcher = new PerPlatformProbeWatcher(subscriber, new HostStatusAggregator()); + await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); + subscriber.SubscribeIntervalsMs.ShouldHaveSingleItem().ShouldBe(0); + } + + [Fact] + public async Task SyncPlatformsAsync_ConfiguredBufferedInterval_IsForwardedToGw() + { + // PR 6.3 — when a deployment dials down MxAccess.PublishingIntervalMs for + // tighter health visibility, the probe watcher must forward that interval + // through SubscribeBulk so the gw publishes ScanState changes at the + // configured cadence. + var subscriber = new FakeSubscriber(); + using var watcher = new PerPlatformProbeWatcher( + subscriber, new HostStatusAggregator(), + bufferedUpdateIntervalMs: 250); + await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); + subscriber.SubscribeIntervalsMs.ShouldHaveSingleItem().ShouldBe(250); + } + + [Fact] + public void Constructor_RejectsNegativeBufferedInterval() + { + var subscriber = new FakeSubscriber(); + Should.Throw(() => + new PerPlatformProbeWatcher(subscriber, new HostStatusAggregator(), bufferedUpdateIntervalMs: -1)); + } + [Fact] public async Task SyncPlatformsAsync_SameSetTwice_DoesNotResubscribe() { diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs index 73457fb..a5fb50f 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs @@ -29,11 +29,13 @@ public sealed class GalaxyDriverSubscribeTests private readonly Channel _events = Channel.CreateUnbounded(); public Dictionary Map { get; } = new(); public List UnsubscribedHandles { get; } = []; + public List BufferedIntervalsCalled { get; } = []; public Func Decide { get; set; } = _ => true; public Task> SubscribeBulkAsync( IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) { + BufferedIntervalsCalled.Add(bufferedUpdateIntervalMs); var results = new List(fullReferences.Count); foreach (var fullRef in fullReferences) { @@ -214,6 +216,44 @@ public sealed class GalaxyDriverSubscribeTests ex.Message.ShouldContain("PR 4.W"); } + [Fact] + public async Task SubscribeAsync_FallsBackToConfiguredInterval_WhenCallerPassesZero() + { + // PR 6.3 — when the caller doesn't set a publishing interval (TimeSpan.Zero), + // the driver substitutes MxAccess.PublishingIntervalMs from its options. + var subscriber = new FakeSubscriber(); + var opts = new GalaxyDriverOptions( + new GalaxyGatewayOptions("https://mxgw.test:5001", "key"), + new GalaxyMxAccessOptions("OtOpcUa-A", PublishingIntervalMs: 750), + new GalaxyRepositoryOptions(), + new GalaxyReconnectOptions()); + using var driver = new GalaxyDriver( + "g", opts, hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); + + await driver.SubscribeAsync(["Tag.A"], TimeSpan.Zero, CancellationToken.None); + + subscriber.BufferedIntervalsCalled.ShouldHaveSingleItem().ShouldBe(750); + } + + [Fact] + public async Task SubscribeAsync_RespectsCallerInterval_WhenNonZero() + { + // The caller's publishingInterval wins when explicitly set — the configured + // option only applies as a fallback for "no-preference" callers. + var subscriber = new FakeSubscriber(); + var opts = new GalaxyDriverOptions( + new GalaxyGatewayOptions("https://mxgw.test:5001", "key"), + new GalaxyMxAccessOptions("OtOpcUa-A", PublishingIntervalMs: 750), + new GalaxyRepositoryOptions(), + new GalaxyReconnectOptions()); + using var driver = new GalaxyDriver( + "g", opts, hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber); + + await driver.SubscribeAsync(["Tag.A"], TimeSpan.FromMilliseconds(250), CancellationToken.None); + + subscriber.BufferedIntervalsCalled.ShouldHaveSingleItem().ShouldBe(250); + } + [Fact] public async Task SubscribeAsync_EmptyTagList_ReturnsHandleWithoutCallingGw() {