using System.Diagnostics; using System.Diagnostics.Metrics; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.ParityTests; /// /// PR 6.4 — long-running soak scenario for the in-process Galaxy driver against a /// live mxaccessgw. Subscribes a configurable tag count, holds the subscription /// for a configurable duration, polls the EventPump's three counters /// (galaxy.events.received / galaxy.events.dispatched / /// galaxy.events.dropped) every minute, and asserts: /// /// events.received continues to grow (the gw stream isn't stuck) /// events.dropped stays under a configurable ceiling /// process working-set size doesn't grow unboundedly (leak guard) /// /// Always skipped unless the operator opts in via OTOPCUA_SOAK_RUN=1 and /// the mxgw backend is reachable. The default scenario size is 50k tags / 24h /// per the PR plan; both are env-overridable so a smoke run can shorten them /// to a few minutes for CI. /// [Trait("Category", "Soak")] [Collection(nameof(ParityCollection))] public sealed class SoakScenarioTests { private const string MeterName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy"; private readonly ParityHarness _h; public SoakScenarioTests(ParityHarness h) => _h = h; [Fact] public async Task Soak_HoldsSubscription_AndKeepsEventStreamFlowing() { var run = Environment.GetEnvironmentVariable("OTOPCUA_SOAK_RUN"); if (!string.Equals(run, "1", StringComparison.Ordinal)) { Assert.Skip("set OTOPCUA_SOAK_RUN=1 to run the 50k-tag soak (default 24h, override OTOPCUA_SOAK_MINUTES + OTOPCUA_SOAK_TAGS for CI)"); } if (_h.MxGatewayDriver is null) { Assert.Skip($"mxgateway backend unavailable: {_h.MxGatewaySkipReason}"); } var tagCount = ParseInt("OTOPCUA_SOAK_TAGS", 50_000); var soakMinutes = ParseInt("OTOPCUA_SOAK_MINUTES", 24 * 60); var dropCeilingPercent = ParseDouble("OTOPCUA_SOAK_DROP_PCT", 0.5); // 0.5% drop ceiling // Discover and pick a sample. If the live Galaxy doesn't have tagCount tags, // fall back to whatever's available — soak diagnostics still apply. var driver = _h.MxGatewayDriver!; var b = new RecordingAddressSpaceBuilder(); await ((ITagDiscovery)driver).DiscoverAsync(b, CancellationToken.None); var sample = b.Variables.Take(tagCount) .Select(v => v.AttributeInfo.FullName) .Distinct(StringComparer.OrdinalIgnoreCase) .ToArray(); if (sample.Length == 0) Assert.Skip("dev Galaxy reported zero discoverable variables — nothing to soak"); // Capture the three EventPump counters via MeterListener so we can poll // their cumulative totals once per minute. var snapshot = new CounterSnapshot(); using var listener = new MeterListener(); listener.InstrumentPublished = (instr, l) => { if (instr.Meter.Name == MeterName) l.EnableMeasurementEvents(instr); }; listener.SetMeasurementEventCallback((instr, value, _, _) => { switch (instr.Name) { case "galaxy.events.received": Interlocked.Add(ref snapshot._received, value); break; case "galaxy.events.dispatched": Interlocked.Add(ref snapshot._dispatched, value); break; case "galaxy.events.dropped": Interlocked.Add(ref snapshot._dropped, value); break; } }); listener.Start(); var initialWorkingSet = Process.GetCurrentProcess().WorkingSet64; var startedUtc = DateTime.UtcNow; var deadline = startedUtc + TimeSpan.FromMinutes(soakMinutes); var handle = await ((ISubscribable)driver) .SubscribeAsync(sample, TimeSpan.FromSeconds(1), CancellationToken.None); try { // Per-minute poll loop — pin the invariants and produce a CSV-style // log row so an operator can grep the test runner's stdout. var lastReceived = 0L; while (DateTime.UtcNow < deadline) { await Task.Delay(TimeSpan.FromMinutes(1)); var elapsed = DateTime.UtcNow - startedUtc; var ws = Process.GetCurrentProcess().WorkingSet64; Console.WriteLine( $"soak,{elapsed.TotalMinutes:F1},received={snapshot.Received},dispatched={snapshot.Dispatched},dropped={snapshot.Dropped},ws_mb={ws / 1024 / 1024}"); snapshot.Received.ShouldBeGreaterThan(lastReceived, $"events.received did not grow over the last minute (elapsed={elapsed:hh\\:mm\\:ss}) — gw stream may be stuck"); lastReceived = snapshot.Received; var droppedPct = snapshot.Received == 0 ? 0.0 : 100.0 * snapshot.Dropped / snapshot.Received; droppedPct.ShouldBeLessThan(dropCeilingPercent, $"events.dropped ratio {droppedPct:F2}% exceeded {dropCeilingPercent:F2}% ceiling at {elapsed:hh\\:mm\\:ss}"); // Working-set guard: if the process grew >1 GB above the initial // baseline, surface it. This is generous — a hot subscription stream // legitimately uses memory; we're catching unbounded leaks, not // steady-state allocation. ((ws - initialWorkingSet) / (1024L * 1024L * 1024L)) .ShouldBeLessThan(1L, $"working set grew >1 GB above baseline at {elapsed:hh\\:mm\\:ss} — possible leak"); } } finally { await ((ISubscribable)driver).UnsubscribeAsync(handle, CancellationToken.None); } } private static int ParseInt(string name, int defaultValue) => int.TryParse(Environment.GetEnvironmentVariable(name), out var v) ? v : defaultValue; private static double ParseDouble(string name, double defaultValue) => double.TryParse(Environment.GetEnvironmentVariable(name), out var v) ? v : defaultValue; private sealed class CounterSnapshot { internal long _received, _dispatched, _dropped; public long Received => Interlocked.Read(ref _received); public long Dispatched => Interlocked.Read(ref _dispatched); public long Dropped => Interlocked.Read(ref _dropped); } }