PR 6.4 — Soak scenario test

Long-running soak harness exercising the in-process GalaxyDriver
against a live mxaccessgw. Subscribes a configurable tag count
(default 50_000), holds the subscription for a configurable duration
(default 24h), polls the EventPump's three counters every minute, and
asserts:

- events.received continues to grow (gw stream isn't stuck)
- events.dropped stays under a configurable percent ceiling
  (default 0.5%)
- process working-set doesn't grow >1 GB above baseline (leak guard)

Always skipped unless the operator opts in via OTOPCUA_SOAK_RUN=1.
Tag count, duration, and drop ceiling are env-overridable
(OTOPCUA_SOAK_TAGS / OTOPCUA_SOAK_MINUTES / OTOPCUA_SOAK_DROP_PCT) so
a smoke run can compress the scenario for CI gating.

Per-minute progress is logged as a CSV-style line to stdout so an
operator can grep the test runner output mid-run. PR 6.5 consumes the
data this scenario emits to tune MxGatewayClientOptions defaults.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-29 17:00:52 -04:00
parent 2fdad81af3
commit 698bdef572

View File

@@ -0,0 +1,138 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.ParityTests;
/// <summary>
/// PR 6.4 — long-running soak scenario for the in-process Galaxy driver against a
/// live mxaccessgw. Subscribes a configurable tag count, holds the subscription
/// for a configurable duration, polls the EventPump's three counters
/// (<c>galaxy.events.received</c> / <c>galaxy.events.dispatched</c> /
/// <c>galaxy.events.dropped</c>) every minute, and asserts:
/// <list type="bullet">
/// <item>events.received continues to grow (the gw stream isn't stuck)</item>
/// <item>events.dropped stays under a configurable ceiling</item>
/// <item>process working-set size doesn't grow unboundedly (leak guard)</item>
/// </list>
/// Always skipped unless the operator opts in via <c>OTOPCUA_SOAK_RUN=1</c> and
/// the mxgw backend is reachable. The default scenario size is 50k tags / 24h
/// per the PR plan; both are env-overridable so a smoke run can shorten them
/// to a few minutes for CI.
/// </summary>
[Trait("Category", "Soak")]
[Collection(nameof(ParityCollection))]
public sealed class SoakScenarioTests
{
private const string MeterName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy";
private readonly ParityHarness _h;
public SoakScenarioTests(ParityHarness h) => _h = h;
[Fact]
public async Task Soak_HoldsSubscription_AndKeepsEventStreamFlowing()
{
var run = Environment.GetEnvironmentVariable("OTOPCUA_SOAK_RUN");
if (!string.Equals(run, "1", StringComparison.Ordinal))
{
Assert.Skip("set OTOPCUA_SOAK_RUN=1 to run the 50k-tag soak (default 24h, override OTOPCUA_SOAK_MINUTES + OTOPCUA_SOAK_TAGS for CI)");
}
if (_h.MxGatewayDriver is null)
{
Assert.Skip($"mxgateway backend unavailable: {_h.MxGatewaySkipReason}");
}
var tagCount = ParseInt("OTOPCUA_SOAK_TAGS", 50_000);
var soakMinutes = ParseInt("OTOPCUA_SOAK_MINUTES", 24 * 60);
var dropCeilingPercent = ParseDouble("OTOPCUA_SOAK_DROP_PCT", 0.5); // 0.5% drop ceiling
// Discover and pick a sample. If the live Galaxy doesn't have tagCount tags,
// fall back to whatever's available — soak diagnostics still apply.
var driver = _h.MxGatewayDriver!;
var b = new RecordingAddressSpaceBuilder();
await ((ITagDiscovery)driver).DiscoverAsync(b, CancellationToken.None);
var sample = b.Variables.Take(tagCount)
.Select(v => v.AttributeInfo.FullName)
.Distinct(StringComparer.OrdinalIgnoreCase)
.ToArray();
if (sample.Length == 0) Assert.Skip("dev Galaxy reported zero discoverable variables — nothing to soak");
// Capture the three EventPump counters via MeterListener so we can poll
// their cumulative totals once per minute.
var snapshot = new CounterSnapshot();
using var listener = new MeterListener();
listener.InstrumentPublished = (instr, l) =>
{
if (instr.Meter.Name == MeterName) l.EnableMeasurementEvents(instr);
};
listener.SetMeasurementEventCallback<long>((instr, value, _, _) =>
{
switch (instr.Name)
{
case "galaxy.events.received": Interlocked.Add(ref snapshot._received, value); break;
case "galaxy.events.dispatched": Interlocked.Add(ref snapshot._dispatched, value); break;
case "galaxy.events.dropped": Interlocked.Add(ref snapshot._dropped, value); break;
}
});
listener.Start();
var initialWorkingSet = Process.GetCurrentProcess().WorkingSet64;
var startedUtc = DateTime.UtcNow;
var deadline = startedUtc + TimeSpan.FromMinutes(soakMinutes);
var handle = await ((ISubscribable)driver)
.SubscribeAsync(sample, TimeSpan.FromSeconds(1), CancellationToken.None);
try
{
// Per-minute poll loop — pin the invariants and produce a CSV-style
// log row so an operator can grep the test runner's stdout.
var lastReceived = 0L;
while (DateTime.UtcNow < deadline)
{
await Task.Delay(TimeSpan.FromMinutes(1));
var elapsed = DateTime.UtcNow - startedUtc;
var ws = Process.GetCurrentProcess().WorkingSet64;
Console.WriteLine(
$"soak,{elapsed.TotalMinutes:F1},received={snapshot.Received},dispatched={snapshot.Dispatched},dropped={snapshot.Dropped},ws_mb={ws / 1024 / 1024}");
snapshot.Received.ShouldBeGreaterThan(lastReceived,
$"events.received did not grow over the last minute (elapsed={elapsed:hh\\:mm\\:ss}) — gw stream may be stuck");
lastReceived = snapshot.Received;
var droppedPct = snapshot.Received == 0
? 0.0
: 100.0 * snapshot.Dropped / snapshot.Received;
droppedPct.ShouldBeLessThan(dropCeilingPercent,
$"events.dropped ratio {droppedPct:F2}% exceeded {dropCeilingPercent:F2}% ceiling at {elapsed:hh\\:mm\\:ss}");
// Working-set guard: if the process grew >1 GB above the initial
// baseline, surface it. This is generous — a hot subscription stream
// legitimately uses memory; we're catching unbounded leaks, not
// steady-state allocation.
((ws - initialWorkingSet) / (1024L * 1024L * 1024L))
.ShouldBeLessThan(1L,
$"working set grew >1 GB above baseline at {elapsed:hh\\:mm\\:ss} — possible leak");
}
}
finally
{
await ((ISubscribable)driver).UnsubscribeAsync(handle, CancellationToken.None);
}
}
private static int ParseInt(string name, int defaultValue) =>
int.TryParse(Environment.GetEnvironmentVariable(name), out var v) ? v : defaultValue;
private static double ParseDouble(string name, double defaultValue) =>
double.TryParse(Environment.GetEnvironmentVariable(name), out var v) ? v : defaultValue;
private sealed class CounterSnapshot
{
internal long _received, _dispatched, _dropped;
public long Received => Interlocked.Read(ref _received);
public long Dispatched => Interlocked.Read(ref _dispatched);
public long Dropped => Interlocked.Read(ref _dropped);
}
}