using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;

// NOTE(review): the reviewed text of this file was missing every angle-bracketed
// span (generic type arguments and XML doc tags). They are restored below. The
// type names GalaxyAlarmTransition, DataChangeEventArgs and SubscribeResult are
// assumptions; confirm them against EventPump and IGalaxySubscriber.

/// <summary>
/// PR B.1 — pins the EventPump's OnAlarmTransition decode path. Synthetic MxEvents
/// with the new family go in; the pump fires OnAlarmTransition with the
/// decoded payload + mapped severity bucket; data-change subscribers stay
/// untouched.
/// </summary>
public sealed class EventPumpAlarmTests
{
    /// <summary>Upper bound on how long any test waits for the pump to dispatch an expected event.</summary>
    private static readonly TimeSpan DispatchTimeout = TimeSpan.FromSeconds(2);

    /// <summary>
    /// Emits raise, acknowledge and clear for one alarm and checks that all three
    /// arrive in emission order with the decoded fields asserted below.
    /// </summary>
    [Fact]
    public async Task Dispatches_raise_acknowledge_clear_in_sequence()
    {
        var subscriber = new ManualSubscriber();
        var registry = new SubscriptionRegistry();
        var gate = new object();
        var transitions = new List<GalaxyAlarmTransition>();
        var dispatched = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        await using var pump = new EventPump(subscriber, registry, channelCapacity: 16, clientName: "AlarmTest");
        pump.OnAlarmTransition += (_, transition) =>
        {
            lock (gate)
            {
                transitions.Add(transition);
                if (transitions.Count == 3)
                {
                    dispatched.TrySetResult(true);
                }
            }
        };
        pump.Start();

        var raise = new DateTime(2026, 5, 1, 12, 0, 0, DateTimeKind.Utc);
        var ack = raise.AddSeconds(30);
        var clear = ack.AddSeconds(60);

        await subscriber.EmitAlarmAsync(NewAlarm(
            "Tank01.Level.HiHi", AlarmTransitionKind.Raise, severity: 750, transitionTime: raise));
        await subscriber.EmitAlarmAsync(NewAlarm(
            "Tank01.Level.HiHi", AlarmTransitionKind.Acknowledge, severity: 750, transitionTime: ack,
            originalRaise: raise, operatorUser: "alice", operatorComment: "investigating"));
        await subscriber.EmitAlarmAsync(NewAlarm(
            "Tank01.Level.HiHi", AlarmTransitionKind.Clear, severity: 750, transitionTime: clear,
            originalRaise: raise));

        var completed = await Task.WhenAny(dispatched.Task, Task.Delay(DispatchTimeout));
        completed.ShouldBe(dispatched.Task, "all three alarm transitions should dispatch within 2s");

        transitions.Count.ShouldBe(3);

        transitions[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise);
        transitions[0].SeverityBucket.ShouldBe(AlarmSeverity.Critical);
        transitions[0].OpcUaSeverity.ShouldBe(MxAccessSeverityMapper.OpcUaSeverityCritical);
        transitions[0].RawMxAccessSeverity.ShouldBe(750);
        transitions[0].TransitionTimestampUtc.ShouldBe(raise);

        transitions[1].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Acknowledge);
        transitions[1].OperatorUser.ShouldBe("alice");
        transitions[1].OperatorComment.ShouldBe("investigating");
        transitions[1].OriginalRaiseTimestampUtc.ShouldBe(raise);

        transitions[2].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Clear);
        transitions[2].OriginalRaiseTimestampUtc.ShouldBe(raise);
    }

    /// <summary>
    /// An alarm event whose transition kind is <c>Unspecified</c> must not reach
    /// OnAlarmTransition subscribers.
    /// </summary>
    [Fact]
    public async Task Drops_alarm_event_with_unspecified_transition_kind()
    {
        await AssertRejectedEventsAreDroppedAsync(
            clientName: "AlarmTest",
            emitRejected: async subscriber =>
            {
                await subscriber.EmitAlarmAsync(NewAlarm(
                    "Tank01.Level.HiHi", AlarmTransitionKind.Unspecified, severity: 100,
                    transitionTime: DateTime.UtcNow));
            },
            because: "alarm transitions with Unspecified kind are decoder failures and must not fire OnAlarmTransition");
    }

    /// <summary>
    /// An event tagged with the alarm-transition family but carrying no body must
    /// not reach OnAlarmTransition subscribers.
    /// </summary>
    [Fact]
    public async Task Drops_alarm_event_with_missing_body()
    {
        await AssertRejectedEventsAreDroppedAsync(
            clientName: "AlarmTest",
            emitRejected: async subscriber =>
            {
                // Family marked as alarm-transition but body left empty (worker version skew /
                // malformed event). Production should count + drop, not throw.
                await subscriber.EmitRawAsync(new MxEvent
                {
                    Family = MxEventFamily.OnAlarmTransition,
                    WorkerSequence = 42,
                });
            },
            because: "an alarm-transition event without a body must not fire OnAlarmTransition");
    }

    /// <summary>
    /// Emits one data-change event and one alarm event and checks that each is
    /// delivered exactly once to its own subscriber.
    /// </summary>
    [Fact]
    public async Task Mixed_data_change_and_alarm_events_dispatch_independently()
    {
        var subscriber = new ManualSubscriber();
        var registry = new SubscriptionRegistry();
        registry.Register(1, [new TagBinding("Tank01.Level", ItemHandle: 7)]);

        // Both handlers read both lists, so they must share one lock. Locking each
        // list separately would leave the cross-list Count reads unsynchronized.
        var gate = new object();
        var dataChanges = new List<DataChangeEventArgs>();
        var alarms = new List<GalaxyAlarmTransition>();
        var bothSeen = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        await using var pump = new EventPump(subscriber, registry, channelCapacity: 16, clientName: "MixedTest");
        pump.OnDataChange += (_, args) =>
        {
            lock (gate)
            {
                dataChanges.Add(args);
                if (dataChanges.Count >= 1 && alarms.Count >= 1)
                {
                    bothSeen.TrySetResult(true);
                }
            }
        };
        pump.OnAlarmTransition += (_, transition) =>
        {
            lock (gate)
            {
                alarms.Add(transition);
                if (dataChanges.Count >= 1 && alarms.Count >= 1)
                {
                    bothSeen.TrySetResult(true);
                }
            }
        };
        pump.Start();

        await subscriber.EmitAsync(itemHandle: 7, value: 41.0);
        await subscriber.EmitAlarmAsync(NewAlarm(
            "Tank01.Level.HiHi", AlarmTransitionKind.Raise, severity: 600, transitionTime: DateTime.UtcNow));

        var completed = await Task.WhenAny(bothSeen.Task, Task.Delay(DispatchTimeout));
        completed.ShouldBe(bothSeen.Task);

        lock (gate)
        {
            dataChanges.Count.ShouldBe(1);
            alarms.Count.ShouldBe(1);
            alarms[0].SeverityBucket.ShouldBe(AlarmSeverity.High);
        }
    }

    /// <summary>
    /// Events from families other than data-change / alarm-transition must not
    /// reach OnAlarmTransition subscribers.
    /// </summary>
    [Fact]
    public async Task Filters_out_unsupported_event_families()
    {
        await AssertRejectedEventsAreDroppedAsync(
            clientName: "FilterTest",
            emitRejected: async subscriber =>
            {
                // OnWriteComplete and OperationComplete should be silently dropped.
                await subscriber.EmitRawAsync(new MxEvent { Family = MxEventFamily.OnWriteComplete });
                await subscriber.EmitRawAsync(new MxEvent { Family = MxEventFamily.OperationComplete });
            },
            because: "unsupported event families must not fire OnAlarmTransition");
    }

    /// <summary>
    /// Shared body of the negative tests. Starts a pump, lets the caller emit the
    /// events that are expected to be dropped, then emits one valid Raise alarm as a
    /// sentinel and waits for it. The sentinel is queued behind the rejected events,
    /// so once it has been dispatched the only recorded transition must be the
    /// sentinel itself. This replaces a fixed sleep, which passes vacuously whenever
    /// the pump has not drained yet.
    /// </summary>
    /// <param name="clientName">Client name handed to the <c>EventPump</c> constructor.</param>
    /// <param name="emitRejected">Emits the event(s) the pump is expected to drop.</param>
    /// <param name="because">Failure message used when a rejected event was dispatched.</param>
    private static async Task AssertRejectedEventsAreDroppedAsync(
        string clientName,
        Func<ManualSubscriber, Task> emitRejected,
        string because)
    {
        var subscriber = new ManualSubscriber();
        var registry = new SubscriptionRegistry();
        var gate = new object();
        var transitions = new List<GalaxyAlarmTransition>();
        var sentinelSeen = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: clientName);
        pump.OnAlarmTransition += (_, transition) =>
        {
            lock (gate)
            {
                transitions.Add(transition);
            }

            // Only the sentinel is a Raise; none of the rejected inputs use that kind.
            if (transition.TransitionKind == GalaxyAlarmTransitionKind.Raise)
            {
                sentinelSeen.TrySetResult(true);
            }
        };
        pump.Start();

        await emitRejected(subscriber);

        // Same shape as the Raise used by the sequence test, which is known to dispatch.
        await subscriber.EmitAlarmAsync(NewAlarm(
            "Tank01.Level.HiHi", AlarmTransitionKind.Raise, severity: 750, transitionTime: DateTime.UtcNow));

        var completed = await Task.WhenAny(sentinelSeen.Task, Task.Delay(DispatchTimeout));
        completed.ShouldBe(
            sentinelSeen.Task,
            "the sentinel alarm queued behind the rejected events should dispatch within 2s");

        lock (gate)
        {
            transitions.Count.ShouldBe(1, because);
            transitions[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise);
        }
    }

    /// <summary>
    /// Builds an alarm-transition <see cref="MxEvent"/> with a fixed alarm type,
    /// category and description.
    /// </summary>
    /// <param name="fullReference">Alarm reference; the text before the first '.' becomes the source object reference.</param>
    /// <param name="kind">Transition kind placed on the event body.</param>
    /// <param name="severity">Raw severity placed on the event body.</param>
    /// <param name="transitionTime">Transition timestamp, converted with <c>Timestamp.FromDateTime</c>.</param>
    /// <param name="originalRaise">Optional original-raise timestamp; left unset on the body when null.</param>
    /// <param name="operatorUser">Operator user placed on the event body.</param>
    /// <param name="operatorComment">Operator comment placed on the event body.</param>
    /// <returns>An event with the alarm-transition family and the populated body.</returns>
    private static MxEvent NewAlarm(
        string fullReference,
        AlarmTransitionKind kind,
        int severity,
        DateTime transitionTime,
        DateTime? originalRaise = null,
        string operatorUser = "",
        string operatorComment = "")
    {
        var body = new OnAlarmTransitionEvent
        {
            AlarmFullReference = fullReference,
            SourceObjectReference = fullReference.Split('.')[0],
            AlarmTypeName = "AnalogLimitAlarm.HiHi",
            TransitionKind = kind,
            Severity = severity,
            TransitionTimestamp = Timestamp.FromDateTime(transitionTime),
            OperatorUser = operatorUser,
            OperatorComment = operatorComment,
            Category = "Process",
            Description = "Tank 01 high-high level",
        };

        if (originalRaise is { } orts)
        {
            body.OriginalRaiseTimestamp = Timestamp.FromDateTime(orts);
        }

        return new MxEvent
        {
            Family = MxEventFamily.OnAlarmTransition,
            OnAlarmTransition = body,
        };
    }

    /// <summary>
    /// Test double for <see cref="IGalaxySubscriber"/>: subscribe and unsubscribe are
    /// no-ops, and the event stream yields exactly what the test writes through the
    /// Emit* methods.
    /// </summary>
    private sealed class ManualSubscriber : IGalaxySubscriber
    {
        private readonly Channel<MxEvent> _stream =
            Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });

        /// <summary>Returns an empty result list without subscribing to anything.</summary>
        public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
            IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
            => Task.FromResult<IReadOnlyList<SubscribeResult>>([]);

        /// <summary>Completes immediately without doing anything.</summary>
        public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
            => Task.CompletedTask;

        /// <summary>Streams the events written through the Emit* methods.</summary>
        public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
            => _stream.Reader.ReadAllAsync(cancellationToken);

        /// <summary>Writes a data-change event carrying a double value, quality 192 and the current UTC time.</summary>
        public ValueTask EmitAsync(int itemHandle, double value)
            => _stream.Writer.WriteAsync(new MxEvent
            {
                Family = MxEventFamily.OnDataChange,
                ItemHandle = itemHandle,
                Value = new MxValue { DoubleValue = value },
                Quality = 192,
                SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
            });

        /// <summary>Writes a prebuilt alarm event to the stream unchanged.</summary>
        public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);

        /// <summary>Writes an arbitrary event to the stream unchanged.</summary>
        public ValueTask EmitRawAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);
    }
}