diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs index 63fdd61..0f961da 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs @@ -360,7 +360,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services throw new RpcException(new GrpcStatus(StatusCode.Unauthenticated, "Invalid session")); } - var reader = _subscriptionManager.Subscribe( + var reader = await _subscriptionManager.SubscribeAsync( request.SessionId, request.Tags, context.CancellationToken); try diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs index 739538c..05e709a 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs @@ -42,8 +42,10 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions /// /// Creates a subscription for a client. Returns a ChannelReader to stream from. + /// Awaits COM subscription creation so the initial OnDataChange callback + /// is not missed. /// - public ChannelReader<(string address, Vtq vtq)> Subscribe( + public async Task> SubscribeAsync( string clientId, IEnumerable addresses, CancellationToken ct) { var channel = Channel.CreateBounded<(string address, Vtq vtq)>( @@ -83,10 +85,13 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _rwLock.ExitWriteLock(); } - // Create MxAccess COM subscriptions for newly subscribed tags + // Create MxAccess COM subscriptions — awaited so the initial + // OnDataChange (first value delivery after AdviseSupervisory) + // is not lost. The channel and routing are already set up above, + // so any callback that fires during this call will be delivered. if (newTags.Count > 0) { - _ = CreateMxAccessSubscriptionsAsync(newTags); + await CreateMxAccessSubscriptionsAsync(newTags); } // Register cancellation cleanup diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs index a735907..64b1827 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs @@ -104,7 +104,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Health for (int i = 0; i < 101; i++) { using var cts = new CancellationTokenSource(); - sm.Subscribe("client-" + i, new[] { "tag1" }, cts.Token); + await sm.SubscribeAsync("client-" + i, new[] { "tag1" }, cts.Token); } var svc = new HealthCheckService(client, sm, pm); diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs index b12af80..cb60a1b 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs @@ -47,11 +47,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void Subscribe_ReturnsChannelReader() + public async Task Subscribe_ReturnsChannelReader() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Tag1", "Tag2" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Tag1", "Tag2" }, cts.Token); reader.Should().NotBeNull(); } @@ -60,7 +60,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); var vtq = Vtq.Good(42.0); sm.OnTagValueChanged("Motor.Speed", vtq); @@ -76,8 +76,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader1 = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); - var reader2 = sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); + var reader1 = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + var reader2 = await sm.SubscribeAsync("client2", new[] { "Motor.Speed" }, cts.Token); sm.OnTagValueChanged("Motor.Speed", Vtq.Good(99.0)); @@ -88,11 +88,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void OnTagValueChanged_NonSubscribedTag_NoDelivery() + public async Task OnTagValueChanged_NonSubscribedTag_NoDelivery() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); sm.OnTagValueChanged("Motor.Torque", Vtq.Good(10.0)); @@ -101,11 +101,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void UnsubscribeClient_CompletesChannel() + public async Task UnsubscribeClient_CompletesChannel() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); sm.UnsubscribeClient("client1"); @@ -114,11 +114,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void UnsubscribeClient_RemovesFromTagSubscriptions() + public async Task UnsubscribeClient_RemovesFromTagSubscriptions() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); sm.UnsubscribeClient("client1"); @@ -128,12 +128,12 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void RefCounting_LastClientUnsubscribeRemovesTag() + public async Task RefCounting_LastClientUnsubscribeRemovesTag() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); - sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); + await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + await sm.SubscribeAsync("client2", new[] { "Motor.Speed" }, cts.Token); sm.GetStats().TotalTags.Should().Be(1); @@ -145,11 +145,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void NotifyDisconnection_SendsBadQualityToAll() + public async Task NotifyDisconnection_SendsBadQualityToAll() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token); sm.NotifyDisconnection(); @@ -161,11 +161,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void Backpressure_DropOldest_DropsWhenFull() + public async Task Backpressure_DropOldest_DropsWhenFull() { using var sm = new SubscriptionManager(new FakeScadaClient(), channelCapacity: 3); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); // Fill the channel beyond capacity for (int i = 0; i < 10; i++) @@ -180,12 +180,12 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void GetStats_ReturnsCorrectCounts() + public async Task GetStats_ReturnsCorrectCounts() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - sm.Subscribe("c1", new[] { "Tag1", "Tag2" }, cts.Token); - sm.Subscribe("c2", new[] { "Tag2", "Tag3" }, cts.Token); + await sm.SubscribeAsync("c1", new[] { "Tag1", "Tag2" }, cts.Token); + await sm.SubscribeAsync("c2", new[] { "Tag2", "Tag3" }, cts.Token); var stats = sm.GetStats(); stats.TotalClients.Should().Be(2);