using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;

/// <summary>
/// Regression coverage for Driver.Galaxy-001 (Critical): when the gw StreamEvents
/// stream faults, the <see cref="EventPump"/> must notify the reconnect supervisor
/// rather than silently logging and exiting. Without the onStreamFault
/// hand-off a transient gateway transport drop permanently kills the event stream.
/// </summary>
public sealed class EventPumpStreamFaultTests
{
    /// <summary>Upper bound, in milliseconds, that any test waits for an asynchronous signal.</summary>
    private const int WaitMs = 2_000;

    /// <summary>
    /// Faulting the subscriber's stream must invoke the pump's onStreamFault callback
    /// and hand it the exception the stream was completed with.
    /// </summary>
    [Fact]
    public async Task StreamFault_InvokesOnStreamFaultCallback_WithTheCause()
    {
        var subscriber = new FaultingSubscriber();
        var registry = new SubscriptionRegistry();
        var faultObserved = new TaskCompletionSource<Exception>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        await using var pump = new EventPump(
            subscriber,
            registry,
            channelCapacity: 4,
            clientName: "FaultTest",
            onStreamFault: ex => faultObserved.TrySetResult(ex));
        pump.Start();

        // Drop the gw stream — RunAsync's await foreach throws.
        subscriber.FaultStream(new IOException("simulated gateway transport drop"));

        var completed = await Task.WhenAny(faultObserved.Task, Task.Delay(WaitMs));
        completed.ShouldBe(
            faultObserved.Task,
            "EventPump must invoke onStreamFault when the gw StreamEvents stream faults");
        (await faultObserved.Task).ShouldBeOfType<IOException>();
    }

    /// <summary>
    /// A stream fault reported into a real <see cref="ReconnectSupervisor"/> must run both
    /// its reopen and replay delegates and leave the supervisor healthy afterwards.
    /// </summary>
    [Fact]
    public async Task StreamFault_DrivesReconnectSupervisorReopenReplay()
    {
        // End-to-end: a faulting EventPump wired to a real ReconnectSupervisor must
        // drive the supervisor through its reopen → replay recovery loop.
        var subscriber = new FaultingSubscriber();
        var registry = new SubscriptionRegistry();
        var reopenRan = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var replayRan = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        using var supervisor = new ReconnectSupervisor(
            reopen: _ =>
            {
                reopenRan.TrySetResult();
                return Task.CompletedTask;
            },
            replay: _ =>
            {
                replayRan.TrySetResult();
                return Task.CompletedTask;
            },
            options: new ReconnectOptions(
                InitialBackoffOverride: TimeSpan.FromMilliseconds(5),
                MaxBackoffOverride: TimeSpan.FromMilliseconds(20)));

        await using var pump = new EventPump(
            subscriber,
            registry,
            channelCapacity: 4,
            clientName: "FaultTest",
            onStreamFault: supervisor.ReportTransportFailure);
        pump.Start();

        supervisor.CurrentState.ShouldBe(ReconnectSupervisor.State.Healthy);

        subscriber.FaultStream(new IOException("simulated gateway transport drop"));

        (await Task.WhenAny(reopenRan.Task, Task.Delay(WaitMs))).ShouldBe(
            reopenRan.Task,
            "stream fault must trigger the supervisor's reopen path");
        (await Task.WhenAny(replayRan.Task, Task.Delay(WaitMs))).ShouldBe(
            replayRan.Task,
            "stream fault must trigger the supervisor's replay path");

        // The timeout source owns a timer, so dispose it rather than abandoning it
        // after taking its token.
        using var healthyTimeout = new CancellationTokenSource(WaitMs);
        await supervisor.WaitForHealthyAsync(healthyTimeout.Token);
        supervisor.IsDegraded.ShouldBeFalse();
    }

    /// <summary>
    /// After a fault the original pump raises no further OnDataChange events, while a
    /// newly constructed pump over a live stream does.
    /// </summary>
    [Fact]
    public async Task FaultedPump_IsNotRestartableInPlace_ButAFreshPumpResumesDispatch()
    {
        // Regression coverage for Driver.Galaxy-008 (High): after a stream fault the old
        // pump's RunAsync loop has exited and its channel is completed — EventPump.Start()
        // is a no-op on a non-null-but-completed loop, so the recovery path must DISPOSE
        // the faulted pump and create a FRESH one. This test pins both halves of that:
        // (a) the faulted pump is dead, (b) a new pump on a live stream resumes OnDataChange.
        var registry = new SubscriptionRegistry();
        registry.Register(1, [new TagBinding("Tank.Level", ItemHandle: 7)]);

        // --- first pump: faults, then is "restarted" (no-op) and confirmed dead ---
        var faulted = new FaultingSubscriber();
        var staleObserved = false;
        var oldPump = new EventPump(faulted, registry, channelCapacity: 8, clientName: "Restart");
        oldPump.OnDataChange += (_, _) => staleObserved = true;
        oldPump.Start();

        faulted.FaultStream(new IOException("simulated gateway transport drop"));
        await Task.Delay(100);

        // In-place Start() after a fault is a no-op — the loop task is non-null but done.
        oldPump.Start();
        await oldPump.DisposeAsync();

        // --- fresh pump on a live re-subscribed stream: OnDataChange must resume ---
        var resubscribed = new ReplaySubscriber();

        // Only the dispatched full reference is captured; that is all the assertion below needs.
        var resumed = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        await using var newPump = new EventPump(
            resubscribed,
            registry,
            channelCapacity: 8,
            clientName: "Restart");
        newPump.OnDataChange += (_, args) => resumed.TrySetResult(args.FullReference);
        newPump.Start();

        await resubscribed.EmitAsync(itemHandle: 7, value: 123.0);

        var completed = await Task.WhenAny(resumed.Task, Task.Delay(WaitMs));
        completed.ShouldBe(
            resumed.Task,
            "a fresh EventPump created after a fault must resume dispatching OnDataChange");
        (await resumed.Task).ShouldBe("Tank.Level");
        staleObserved.ShouldBeFalse("the faulted pump must not dispatch after its stream dropped");
    }

    /// <summary>
    /// Disposing a started pump whose stream never faulted must not invoke onStreamFault.
    /// </summary>
    [Fact]
    public async Task CleanShutdown_DoesNotInvokeOnStreamFault()
    {
        var subscriber = new FaultingSubscriber();
        var registry = new SubscriptionRegistry();
        var faulted = false;

        var pump = new EventPump(
            subscriber,
            registry,
            channelCapacity: 4,
            clientName: "FaultTest",
            onStreamFault: _ => faulted = true);
        pump.Start();

        // Graceful disposal cancels the loop — that is OperationCanceledException,
        // not a transport fault, and must NOT trip the supervisor.
        await pump.DisposeAsync();

        faulted.ShouldBeFalse("clean shutdown must not be reported as a transport fault");
    }

    /// <summary>
    /// <see cref="IGalaxySubscriber"/> fake whose StreamEvents stream can be faulted
    /// on demand so the EventPump's RunAsync catch path is exercised.
    /// </summary>
    private sealed class FaultingSubscriber : IGalaxySubscriber
    {
        private readonly Channel<MxEvent> _stream =
            Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });

        // NOTE(review): the generic arguments on this signature were reconstructed —
        // confirm the result element type (SubscribeResult) against IGalaxySubscriber.
        /// <summary>Returns an already-completed task carrying an empty result list.</summary>
        public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
            IReadOnlyList<string> fullReferences,
            int bufferedUpdateIntervalMs,
            CancellationToken cancellationToken)
            => Task.FromResult<IReadOnlyList<SubscribeResult>>([]);

        // NOTE(review): handle element type assumed to be int (handles are int elsewhere in this file) — confirm.
        /// <summary>Does nothing and returns a completed task.</summary>
        public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
            => Task.CompletedTask;

        /// <summary>Exposes the backing channel's reader as the event stream.</summary>
        public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
            => _stream.Reader.ReadAllAsync(cancellationToken);

        /// <summary>Fault the stream so the pump's await foreach throws.</summary>
        public void FaultStream(Exception cause) => _stream.Writer.TryComplete(cause);
    }

    /// <summary>
    /// <see cref="IGalaxySubscriber"/> fake modelling the post-reconnect stream — a
    /// fresh, healthy StreamEvents the recovery path's new EventPump consumes.
    /// </summary>
    private sealed class ReplaySubscriber : IGalaxySubscriber
    {
        private readonly Channel<MxEvent> _stream =
            Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });

        // NOTE(review): the generic arguments on this signature were reconstructed —
        // confirm the result element type (SubscribeResult) against IGalaxySubscriber.
        /// <summary>Returns an already-completed task carrying an empty result list.</summary>
        public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
            IReadOnlyList<string> fullReferences,
            int bufferedUpdateIntervalMs,
            CancellationToken cancellationToken)
            => Task.FromResult<IReadOnlyList<SubscribeResult>>([]);

        // NOTE(review): handle element type assumed to be int (handles are int elsewhere in this file) — confirm.
        /// <summary>Does nothing and returns a completed task.</summary>
        public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
            => Task.CompletedTask;

        /// <summary>Exposes the backing channel's reader as the event stream.</summary>
        public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
            => _stream.Reader.ReadAllAsync(cancellationToken);

        /// <summary>
        /// Writes one OnDataChange event for <paramref name="itemHandle"/> carrying
        /// <paramref name="value"/> as a double, stamped with the current UTC time.
        /// </summary>
        public ValueTask EmitAsync(int itemHandle, double value)
            => _stream.Writer.WriteAsync(new MxEvent
            {
                Family = MxEventFamily.OnDataChange,
                ItemHandle = itemHandle,
                Value = new MxValue { DoubleValue = value },
                Quality = 192,
                SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
            });
    }
}