From b218773ab094e48f1581ec48d0b91e3360180ead Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Mar 2026 23:48:01 -0400 Subject: [PATCH] fix(lmxproxy): await COM subscription creation to fix Subscribe flakiness SubscriptionManager.Subscribe was fire-and-forgetting the MxAccess COM subscription creation. The initial OnDataChange callback could fire before the subscription was established, losing the first (and possibly only) value update. Changed to async SubscribeAsync that awaits CreateMxAccessSubscriptionsAsync before returning the channel reader. Subscribe_ReceivesUpdates now passes 5/5 consecutive runs. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Grpc/Services/ScadaGrpcService.cs | 2 +- .../Subscriptions/SubscriptionManager.cs | 11 +++-- .../Health/HealthCheckServiceTests.cs | 2 +- .../Subscriptions/SubscriptionManagerTests.cs | 42 +++++++++---------- 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs index 63fdd61..0f961da 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs @@ -360,7 +360,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services throw new RpcException(new GrpcStatus(StatusCode.Unauthenticated, "Invalid session")); } - var reader = _subscriptionManager.Subscribe( + var reader = await _subscriptionManager.SubscribeAsync( request.SessionId, request.Tags, context.CancellationToken); try diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs index 739538c..05e709a 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs @@ -42,8 +42,10 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions /// /// Creates a subscription for a client. Returns a ChannelReader to stream from. + /// Awaits COM subscription creation so the initial OnDataChange callback + /// is not missed. /// - public ChannelReader<(string address, Vtq vtq)> Subscribe( + public async Task> SubscribeAsync( string clientId, IEnumerable addresses, CancellationToken ct) { var channel = Channel.CreateBounded<(string address, Vtq vtq)>( @@ -83,10 +85,13 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _rwLock.ExitWriteLock(); } - // Create MxAccess COM subscriptions for newly subscribed tags + // Create MxAccess COM subscriptions — awaited so the initial + // OnDataChange (first value delivery after AdviseSupervisory) + // is not lost. The channel and routing are already set up above, + // so any callback that fires during this call will be delivered. if (newTags.Count > 0) { - _ = CreateMxAccessSubscriptionsAsync(newTags); + await CreateMxAccessSubscriptionsAsync(newTags); } // Register cancellation cleanup diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs index a735907..64b1827 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs @@ -104,7 +104,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Health for (int i = 0; i < 101; i++) { using var cts = new CancellationTokenSource(); - sm.Subscribe("client-" + i, new[] { "tag1" }, cts.Token); + await sm.SubscribeAsync("client-" + i, new[] { "tag1" }, cts.Token); } var svc = new HealthCheckService(client, sm, pm); diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs index b12af80..cb60a1b 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs @@ -47,11 +47,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void Subscribe_ReturnsChannelReader() + public async Task Subscribe_ReturnsChannelReader() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Tag1", "Tag2" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Tag1", "Tag2" }, cts.Token); reader.Should().NotBeNull(); } @@ -60,7 +60,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); var vtq = Vtq.Good(42.0); sm.OnTagValueChanged("Motor.Speed", vtq); @@ -76,8 +76,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader1 = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); - var reader2 = sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); + var reader1 = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + var reader2 = await sm.SubscribeAsync("client2", new[] { "Motor.Speed" }, cts.Token); sm.OnTagValueChanged("Motor.Speed", Vtq.Good(99.0)); @@ -88,11 +88,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void OnTagValueChanged_NonSubscribedTag_NoDelivery() + public async Task OnTagValueChanged_NonSubscribedTag_NoDelivery() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); sm.OnTagValueChanged("Motor.Torque", Vtq.Good(10.0)); @@ -101,11 +101,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void UnsubscribeClient_CompletesChannel() + public async Task UnsubscribeClient_CompletesChannel() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); sm.UnsubscribeClient("client1"); @@ -114,11 +114,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void UnsubscribeClient_RemovesFromTagSubscriptions() + public async Task UnsubscribeClient_RemovesFromTagSubscriptions() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); sm.UnsubscribeClient("client1"); @@ -128,12 +128,12 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void RefCounting_LastClientUnsubscribeRemovesTag() + public async Task RefCounting_LastClientUnsubscribeRemovesTag() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); - sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); + await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + await sm.SubscribeAsync("client2", new[] { "Motor.Speed" }, cts.Token); sm.GetStats().TotalTags.Should().Be(1); @@ -145,11 +145,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void NotifyDisconnection_SendsBadQualityToAll() + public async Task NotifyDisconnection_SendsBadQualityToAll() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token); sm.NotifyDisconnection(); @@ -161,11 +161,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void Backpressure_DropOldest_DropsWhenFull() + public async Task Backpressure_DropOldest_DropsWhenFull() { using var sm = new SubscriptionManager(new FakeScadaClient(), channelCapacity: 3); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); // Fill the channel beyond capacity for (int i = 0; i < 10; i++) @@ -180,12 +180,12 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void GetStats_ReturnsCorrectCounts() + public async Task GetStats_ReturnsCorrectCounts() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - sm.Subscribe("c1", new[] { "Tag1", "Tag2" }, cts.Token); - sm.Subscribe("c2", new[] { "Tag2", "Tag3" }, cts.Token); + await sm.SubscribeAsync("c1", new[] { "Tag1", "Tag2" }, cts.Token); + await sm.SubscribeAsync("c2", new[] { "Tag2", "Tag3" }, cts.Token); var stats = sm.GetStats(); stats.TotalClients.Should().Be(2);