From 698bdef572e7895b2de4d31423d95ddd60be63f0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 29 Apr 2026 17:00:52 -0400 Subject: [PATCH] =?UTF-8?q?PR=206.4=20=E2=80=94=20Soak=20scenario=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Long-running soak harness exercising the in-process GalaxyDriver against a live mxaccessgw. Subscribes a configurable tag count (default 50_000), holds the subscription for a configurable duration (default 24h), polls the EventPump's three counters every minute, and asserts: - events.received continues to grow (gw stream isn't stuck) - events.dropped stays under a configurable percent ceiling (default 0.5%) - process working-set doesn't grow >1 GB above baseline (leak guard) Always skipped unless the operator opts in via OTOPCUA_SOAK_RUN=1. Tag count, duration, and drop ceiling are env-overridable (OTOPCUA_SOAK_TAGS / OTOPCUA_SOAK_MINUTES / OTOPCUA_SOAK_DROP_PCT) so a smoke run can compress the scenario for CI gating. Per-minute progress is logged as a CSV-style line to stdout so an operator can grep the test runner output mid-run. PR 6.5 consumes the data this scenario emits to tune MxGatewayClientOptions defaults. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../SoakScenarioTests.cs | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.ParityTests/SoakScenarioTests.cs diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.ParityTests/SoakScenarioTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.ParityTests/SoakScenarioTests.cs new file mode 100644 index 0000000..fdb9867 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.ParityTests/SoakScenarioTests.cs @@ -0,0 +1,138 @@ +using System.Diagnostics; +using System.Diagnostics.Metrics; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.ParityTests; + +/// +/// PR 6.4 — long-running soak scenario for the in-process Galaxy driver against a +/// live mxaccessgw. Subscribes a configurable tag count, holds the subscription +/// for a configurable duration, polls the EventPump's three counters +/// (galaxy.events.received / galaxy.events.dispatched / +/// galaxy.events.dropped) every minute, and asserts: +/// +/// events.received continues to grow (the gw stream isn't stuck) +/// events.dropped stays under a configurable ceiling +/// process working-set size doesn't grow unboundedly (leak guard) +/// +/// Always skipped unless the operator opts in via OTOPCUA_SOAK_RUN=1 and +/// the mxgw backend is reachable. The default scenario size is 50k tags / 24h +/// per the PR plan; both are env-overridable so a smoke run can shorten them +/// to a few minutes for CI. +/// +[Trait("Category", "Soak")] +[Collection(nameof(ParityCollection))] +public sealed class SoakScenarioTests +{ + private const string MeterName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy"; + + private readonly ParityHarness _h; + public SoakScenarioTests(ParityHarness h) => _h = h; + + [Fact] + public async Task Soak_HoldsSubscription_AndKeepsEventStreamFlowing() + { + var run = Environment.GetEnvironmentVariable("OTOPCUA_SOAK_RUN"); + if (!string.Equals(run, "1", StringComparison.Ordinal)) + { + Assert.Skip("set OTOPCUA_SOAK_RUN=1 to run the 50k-tag soak (default 24h, override OTOPCUA_SOAK_MINUTES + OTOPCUA_SOAK_TAGS for CI)"); + } + if (_h.MxGatewayDriver is null) + { + Assert.Skip($"mxgateway backend unavailable: {_h.MxGatewaySkipReason}"); + } + + var tagCount = ParseInt("OTOPCUA_SOAK_TAGS", 50_000); + var soakMinutes = ParseInt("OTOPCUA_SOAK_MINUTES", 24 * 60); + var dropCeilingPercent = ParseDouble("OTOPCUA_SOAK_DROP_PCT", 0.5); // 0.5% drop ceiling + + // Discover and pick a sample. If the live Galaxy doesn't have tagCount tags, + // fall back to whatever's available — soak diagnostics still apply. + var driver = _h.MxGatewayDriver!; + var b = new RecordingAddressSpaceBuilder(); + await ((ITagDiscovery)driver).DiscoverAsync(b, CancellationToken.None); + + var sample = b.Variables.Take(tagCount) + .Select(v => v.AttributeInfo.FullName) + .Distinct(StringComparer.OrdinalIgnoreCase) + .ToArray(); + if (sample.Length == 0) Assert.Skip("dev Galaxy reported zero discoverable variables — nothing to soak"); + + // Capture the three EventPump counters via MeterListener so we can poll + // their cumulative totals once per minute. + var snapshot = new CounterSnapshot(); + using var listener = new MeterListener(); + listener.InstrumentPublished = (instr, l) => + { + if (instr.Meter.Name == MeterName) l.EnableMeasurementEvents(instr); + }; + listener.SetMeasurementEventCallback((instr, value, _, _) => + { + switch (instr.Name) + { + case "galaxy.events.received": Interlocked.Add(ref snapshot._received, value); break; + case "galaxy.events.dispatched": Interlocked.Add(ref snapshot._dispatched, value); break; + case "galaxy.events.dropped": Interlocked.Add(ref snapshot._dropped, value); break; + } + }); + listener.Start(); + + var initialWorkingSet = Process.GetCurrentProcess().WorkingSet64; + var startedUtc = DateTime.UtcNow; + var deadline = startedUtc + TimeSpan.FromMinutes(soakMinutes); + + var handle = await ((ISubscribable)driver) + .SubscribeAsync(sample, TimeSpan.FromSeconds(1), CancellationToken.None); + try + { + // Per-minute poll loop — pin the invariants and produce a CSV-style + // log row so an operator can grep the test runner's stdout. + var lastReceived = 0L; + while (DateTime.UtcNow < deadline) + { + await Task.Delay(TimeSpan.FromMinutes(1)); + var elapsed = DateTime.UtcNow - startedUtc; + var ws = Process.GetCurrentProcess().WorkingSet64; + Console.WriteLine( + $"soak,{elapsed.TotalMinutes:F1},received={snapshot.Received},dispatched={snapshot.Dispatched},dropped={snapshot.Dropped},ws_mb={ws / 1024 / 1024}"); + + snapshot.Received.ShouldBeGreaterThan(lastReceived, + $"events.received did not grow over the last minute (elapsed={elapsed:hh\\:mm\\:ss}) — gw stream may be stuck"); + lastReceived = snapshot.Received; + + var droppedPct = snapshot.Received == 0 + ? 0.0 + : 100.0 * snapshot.Dropped / snapshot.Received; + droppedPct.ShouldBeLessThan(dropCeilingPercent, + $"events.dropped ratio {droppedPct:F2}% exceeded {dropCeilingPercent:F2}% ceiling at {elapsed:hh\\:mm\\:ss}"); + + // Working-set guard: if the process grew >1 GB above the initial + // baseline, surface it. This is generous — a hot subscription stream + // legitimately uses memory; we're catching unbounded leaks, not + // steady-state allocation. + ((ws - initialWorkingSet) / (1024L * 1024L * 1024L)) + .ShouldBeLessThan(1L, + $"working set grew >1 GB above baseline at {elapsed:hh\\:mm\\:ss} — possible leak"); + } + } + finally + { + await ((ISubscribable)driver).UnsubscribeAsync(handle, CancellationToken.None); + } + } + + private static int ParseInt(string name, int defaultValue) => + int.TryParse(Environment.GetEnvironmentVariable(name), out var v) ? v : defaultValue; + private static double ParseDouble(string name, double defaultValue) => + double.TryParse(Environment.GetEnvironmentVariable(name), out var v) ? v : defaultValue; + + private sealed class CounterSnapshot + { + internal long _received, _dispatched, _dropped; + public long Received => Interlocked.Read(ref _received); + public long Dispatched => Interlocked.Read(ref _dispatched); + public long Dropped => Interlocked.Read(ref _dropped); + } +}