From c96e71c83c74f5fc763fab7540c85079a90391b5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Mar 2026 16:55:06 -0400 Subject: [PATCH] Revert "fix(lmxproxy): resolve subscribe/unsubscribe race condition on client reconnect" This reverts commit 9e9efbecab399fd7dcfb3e7e14e8b08418c3c3fc. --- .../Grpc/Services/ScadaGrpcService.cs | 2 +- .../MxAccess/MxAccessClient.EventHandlers.cs | 10 +- .../Subscriptions/SubscriptionManager.cs | 225 ++++++------------ .../Health/HealthCheckServiceTests.cs | 2 +- .../Subscriptions/SubscriptionManagerTests.cs | 178 ++------------ 5 files changed, 109 insertions(+), 308 deletions(-) diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs index 0f961da..63fdd61 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs @@ -360,7 +360,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services throw new RpcException(new GrpcStatus(StatusCode.Unauthenticated, "Invalid session")); } - var reader = await _subscriptionManager.SubscribeAsync( + var reader = _subscriptionManager.Subscribe( request.SessionId, request.Tags, context.CancellationToken); try diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs index f62e16a..198409d 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs @@ -53,21 +53,21 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } } - // Invoke the stored subscription callback (SubscriptionManager.OnTagValueChanged). - // This is the single delivery path — OnTagValueChanged property is NOT invoked - // separately to avoid duplicate VTQ delivery. + // Invoke the stored subscription callback Action callback; lock (_lock) { if (!_storedSubscriptions.TryGetValue(address, out callback)) { - // Fall back to global handler if no stored callback - OnTagValueChanged?.Invoke(address, vtq); + Log.Debug("OnDataChange for {Address} but no callback registered", address); return; } } callback.Invoke(address, vtq); + + // Also route to the SubscriptionManager's global handler + OnTagValueChanged?.Invoke(address, vtq); } catch (Exception ex) { diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs index 471bfe2..739538c 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs @@ -32,15 +32,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); - // Serializes Subscribe and UnsubscribeClient to prevent race conditions - // where old-session unsubscribe removes new-session COM subscriptions. - private readonly SemaphoreSlim _subscriptionGate = new SemaphoreSlim(1, 1); - - // Tags that failed MxAccess subscription (e.g., MxAccess was down). - // Retried on reconnect via RetryPendingSubscriptions(). - private readonly HashSet _pendingTags - = new HashSet(StringComparer.OrdinalIgnoreCase); - public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000, BoundedChannelFullMode channelFullMode = BoundedChannelFullMode.DropOldest) { @@ -51,69 +42,59 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions /// /// Creates a subscription for a client. Returns a ChannelReader to stream from. - /// Serialized with UnsubscribeClient via _subscriptionGate to prevent race - /// conditions during client reconnect (old unsubscribe vs new subscribe). /// - public async Task> SubscribeAsync( + public ChannelReader<(string address, Vtq vtq)> Subscribe( string clientId, IEnumerable addresses, CancellationToken ct) { - await _subscriptionGate.WaitAsync(ct); + var channel = Channel.CreateBounded<(string address, Vtq vtq)>( + new BoundedChannelOptions(_channelCapacity) + { + FullMode = _channelFullMode, + SingleReader = true, + SingleWriter = false + }); + + var addressSet = new HashSet(addresses, StringComparer.OrdinalIgnoreCase); + var clientSub = new ClientSubscription(clientId, channel, addressSet); + + _clientSubscriptions[clientId] = clientSub; + + var newTags = new List(); + + _rwLock.EnterWriteLock(); try { - var channel = Channel.CreateBounded<(string address, Vtq vtq)>( - new BoundedChannelOptions(_channelCapacity) - { - FullMode = _channelFullMode, - SingleReader = true, - SingleWriter = false - }); - - var addressSet = new HashSet(addresses, StringComparer.OrdinalIgnoreCase); - var clientSub = new ClientSubscription(clientId, channel, addressSet); - - _clientSubscriptions[clientId] = clientSub; - - var newTags = new List(); - - _rwLock.EnterWriteLock(); - try + foreach (var address in addressSet) { - foreach (var address in addressSet) + if (_tagSubscriptions.TryGetValue(address, out var tagSub)) { - if (_tagSubscriptions.TryGetValue(address, out var tagSub)) - { - tagSub.ClientIds.Add(clientId); - } - else - { - _tagSubscriptions[address] = new TagSubscription(address, - new HashSet(StringComparer.OrdinalIgnoreCase) { clientId }); - newTags.Add(address); - } + tagSub.ClientIds.Add(clientId); + } + else + { + _tagSubscriptions[address] = new TagSubscription(address, + new HashSet(StringComparer.OrdinalIgnoreCase) { clientId }); + newTags.Add(address); } } - finally - { - _rwLock.ExitWriteLock(); - } - - // Create MxAccess COM subscriptions for newly subscribed tags (awaited, not fire-and-forget) - if (newTags.Count > 0) - { - await CreateMxAccessSubscriptionsAsync(newTags); - } - - // Register cancellation cleanup - ct.Register(() => UnsubscribeClient(clientId)); - - Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)", - clientId, addressSet.Count, newTags.Count); - return channel.Reader; } finally { - _subscriptionGate.Release(); + _rwLock.ExitWriteLock(); } + + // Create MxAccess COM subscriptions for newly subscribed tags + if (newTags.Count > 0) + { + _ = CreateMxAccessSubscriptionsAsync(newTags); + } + + // Register cancellation cleanup + ct.Register(() => UnsubscribeClient(clientId)); + + Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)", + clientId, addressSet.Count, newTags.Count); + return channel.Reader; } private async Task CreateMxAccessSubscriptionsAsync(List addresses) @@ -123,25 +104,10 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions await _scadaClient.SubscribeAsync( addresses, (address, vtq) => OnTagValueChanged(address, vtq)); - - // Successful — remove from pending if they were there - lock (_pendingTags) - { - foreach (var address in addresses) - _pendingTags.Remove(address); - } } catch (Exception ex) { - Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags — " + - "storing as pending for retry on reconnect", addresses.Count); - - // Store failed addresses for retry when MxAccess reconnects - lock (_pendingTags) - { - foreach (var address in addresses) - _pendingTags.Add(address); - } + Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags", addresses.Count); } } @@ -187,72 +153,56 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions /// /// Removes a client's subscriptions and cleans up tag subscriptions - /// when the last client unsubscribes. Serialized with SubscribeAsync - /// via _subscriptionGate to prevent race conditions. + /// when the last client unsubscribes. /// public void UnsubscribeClient(string clientId) { - _subscriptionGate.Wait(); + if (!_clientSubscriptions.TryRemove(clientId, out var clientSub)) + return; + + var tagsToDispose = new List(); + + _rwLock.EnterWriteLock(); try { - if (!_clientSubscriptions.TryRemove(clientId, out var clientSub)) - return; - - var tagsToDispose = new List(); - - _rwLock.EnterWriteLock(); - try + foreach (var address in clientSub.Addresses) { - foreach (var address in clientSub.Addresses) + if (_tagSubscriptions.TryGetValue(address, out var tagSub)) { - if (_tagSubscriptions.TryGetValue(address, out var tagSub)) - { - tagSub.ClientIds.Remove(clientId); + tagSub.ClientIds.Remove(clientId); - // Last client unsubscribed — remove the tag subscription - if (tagSub.ClientIds.Count == 0) - { - _tagSubscriptions.TryRemove(address, out _); - tagsToDispose.Add(address); - } + // Last client unsubscribed — remove the tag subscription + if (tagSub.ClientIds.Count == 0) + { + _tagSubscriptions.TryRemove(address, out _); + tagsToDispose.Add(address); } } } - finally - { - _rwLock.ExitWriteLock(); - } - - // Unsubscribe tags with no remaining clients via address-based API - if (tagsToDispose.Count > 0) - { - // Also remove from pending if they were awaiting retry - lock (_pendingTags) - { - foreach (var address in tagsToDispose) - _pendingTags.Remove(address); - } - - try - { - _scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult(); - } - catch (Exception ex) - { - Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count); - } - } - - // Complete the channel (signals end of stream to the gRPC handler) - clientSub.Channel.Writer.TryComplete(); - - Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)", - clientId, clientSub.DeliveredCount, clientSub.DroppedCount); } finally { - _subscriptionGate.Release(); + _rwLock.ExitWriteLock(); } + + // Unsubscribe tags with no remaining clients via address-based API + if (tagsToDispose.Count > 0) + { + try + { + _scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count); + } + } + + // Complete the channel (signals end of stream to the gRPC handler) + clientSub.Channel.Writer.TryComplete(); + + Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)", + clientId, clientSub.DeliveredCount, clientSub.DroppedCount); } /// @@ -273,8 +223,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions } /// - /// Called when MxAccess reconnects. Retries any pending subscriptions - /// that failed during the disconnected period. + /// Logs reconnection for observability. Data flow resumes automatically + /// via MxAccessClient.RecreateStoredSubscriptionsAsync callbacks. /// public void NotifyReconnection() { @@ -282,24 +232,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions "data flow will resume via OnDataChange callbacks " + "({ClientCount} clients, {TagCount} tags)", _clientSubscriptions.Count, _tagSubscriptions.Count); - - _ = RetryPendingSubscriptionsAsync(); - } - - /// - /// Retries MxAccess subscriptions for tags that failed during disconnect. - /// - private async Task RetryPendingSubscriptionsAsync() - { - List tagsToRetry; - lock (_pendingTags) - { - if (_pendingTags.Count == 0) return; - tagsToRetry = new List(_pendingTags); - } - - Log.Information("Retrying {Count} pending MxAccess subscriptions after reconnect", tagsToRetry.Count); - await CreateMxAccessSubscriptionsAsync(tagsToRetry); } /// Returns subscription statistics. @@ -320,7 +252,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _clientSubscriptions.Clear(); _tagSubscriptions.Clear(); _rwLock.Dispose(); - _subscriptionGate.Dispose(); } // ── Nested types ───────────────────────────────────────── diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs index 64b1827..a735907 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs @@ -104,7 +104,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Health for (int i = 0; i < 101; i++) { using var cts = new CancellationTokenSource(); - await sm.SubscribeAsync("client-" + i, new[] { "tag1" }, cts.Token); + sm.Subscribe("client-" + i, new[] { "tag1" }, cts.Token); } var svc = new HealthCheckService(client, sm, pm); diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs index cf33642..b12af80 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -33,34 +32,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions Task.FromResult((false, 0)); public Task ProbeConnectionAsync(string testTagAddress, int timeoutMs, CancellationToken ct = default) => Task.FromResult(ProbeResult.Healthy(Quality.Good, DateTime.UtcNow)); + public Task UnsubscribeByAddressAsync(IEnumerable addresses) => Task.CompletedTask; + public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) => + Task.FromResult(new FakeSubscriptionHandle()); public ValueTask DisposeAsync() => default; - // Track subscribe/unsubscribe calls for assertions - public List> SubscribeCalls { get; } = new List>(); - public List> UnsubscribeCalls { get; } = new List>(); - public List> StoredCallbacks { get; } = new List>(); - - // When true, SubscribeAsync throws to simulate MxAccess being down - public bool FailSubscriptions { get; set; } - - public Task UnsubscribeByAddressAsync(IEnumerable addresses) - { - UnsubscribeCalls.Add(addresses.ToList()); - return Task.CompletedTask; - } - - public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) - { - var addressList = addresses.ToList(); - SubscribeCalls.Add(addressList); - StoredCallbacks.Add(callback); - - if (FailSubscriptions) - throw new InvalidOperationException("Not connected to MxAccess"); - - return Task.FromResult(new FakeSubscriptionHandle()); - } - // Suppress unused event warning internal void FireEvent() => ConnectionStateChanged?.Invoke(this, null!); @@ -71,11 +47,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public async Task Subscribe_ReturnsChannelReader() + public void Subscribe_ReturnsChannelReader() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = await sm.SubscribeAsync("client1", new[] { "Tag1", "Tag2" }, cts.Token); + var reader = sm.Subscribe("client1", new[] { "Tag1", "Tag2" }, cts.Token); reader.Should().NotBeNull(); } @@ -84,7 +60,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); var vtq = Vtq.Good(42.0); sm.OnTagValueChanged("Motor.Speed", vtq); @@ -100,8 +76,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader1 = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); - var reader2 = await sm.SubscribeAsync("client2", new[] { "Motor.Speed" }, cts.Token); + var reader1 = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader2 = sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); sm.OnTagValueChanged("Motor.Speed", Vtq.Good(99.0)); @@ -112,11 +88,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public async Task OnTagValueChanged_NonSubscribedTag_NoDelivery() + public void OnTagValueChanged_NonSubscribedTag_NoDelivery() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); sm.OnTagValueChanged("Motor.Torque", Vtq.Good(10.0)); @@ -125,11 +101,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public async Task UnsubscribeClient_CompletesChannel() + public void UnsubscribeClient_CompletesChannel() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); sm.UnsubscribeClient("client1"); @@ -138,11 +114,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public async Task UnsubscribeClient_RemovesFromTagSubscriptions() + public void UnsubscribeClient_RemovesFromTagSubscriptions() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); sm.UnsubscribeClient("client1"); @@ -152,12 +128,12 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public async Task RefCounting_LastClientUnsubscribeRemovesTag() + public void RefCounting_LastClientUnsubscribeRemovesTag() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); - await sm.SubscribeAsync("client2", new[] { "Motor.Speed" }, cts.Token); + sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); sm.GetStats().TotalTags.Should().Be(1); @@ -169,11 +145,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public async Task NotifyDisconnection_SendsBadQualityToAll() + public void NotifyDisconnection_SendsBadQualityToAll() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token); sm.NotifyDisconnection(); @@ -185,11 +161,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public async Task Backpressure_DropOldest_DropsWhenFull() + public void Backpressure_DropOldest_DropsWhenFull() { using var sm = new SubscriptionManager(new FakeScadaClient(), channelCapacity: 3); using var cts = new CancellationTokenSource(); - var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); // Fill the channel beyond capacity for (int i = 0; i < 10; i++) @@ -204,123 +180,17 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public async Task GetStats_ReturnsCorrectCounts() + public void GetStats_ReturnsCorrectCounts() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - await sm.SubscribeAsync("c1", new[] { "Tag1", "Tag2" }, cts.Token); - await sm.SubscribeAsync("c2", new[] { "Tag2", "Tag3" }, cts.Token); + sm.Subscribe("c1", new[] { "Tag1", "Tag2" }, cts.Token); + sm.Subscribe("c2", new[] { "Tag2", "Tag3" }, cts.Token); var stats = sm.GetStats(); stats.TotalClients.Should().Be(2); stats.TotalTags.Should().Be(3); // Tag1, Tag2, Tag3 stats.ActiveSubscriptions.Should().Be(4); // c1:Tag1, c1:Tag2, c2:Tag2, c2:Tag3 } - - // ── New tests for race condition fix ────────────────────────── - - [Fact] - public async Task SubscribeAfterUnsubscribe_CreatesMxAccessSubscriptions() - { - // Verifies FIX 1: when a client disconnects and reconnects with the same tags, - // the new subscribe must create fresh MxAccess subscriptions (not skip them - // because old handles still exist). - var fake = new FakeScadaClient(); - using var sm = new SubscriptionManager(fake); - using var cts = new CancellationTokenSource(); - - // First client subscribes - await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); - fake.SubscribeCalls.Should().HaveCount(1); - fake.SubscribeCalls[0].Should().Contain("Motor.Speed"); - - // Client disconnects — unsubscribe removes the tag (ref count → 0) - sm.UnsubscribeClient("client1"); - fake.UnsubscribeCalls.Should().HaveCount(1); - fake.UnsubscribeCalls[0].Should().Contain("Motor.Speed"); - - // Same client reconnects — must create a NEW MxAccess subscription - await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); - fake.SubscribeCalls.Should().HaveCount(2, "new subscribe must create fresh MxAccess subscription"); - fake.SubscribeCalls[1].Should().Contain("Motor.Speed"); - } - - [Fact] - public async Task SubscribeAfterUnsubscribe_SerializedByGate() - { - // Verifies FIX 1: subscribe and unsubscribe are serialized so they cannot - // interleave and cause the race condition. - var fake = new FakeScadaClient(); - using var sm = new SubscriptionManager(fake); - using var cts = new CancellationTokenSource(); - - var tags = new[] { "Tag.A", "Tag.B", "Tag.C" }; - - // Subscribe, unsubscribe, re-subscribe in sequence - await sm.SubscribeAsync("session1", tags, cts.Token); - sm.UnsubscribeClient("session1"); - await sm.SubscribeAsync("session2", tags, cts.Token); - - // Both subscribes should have called SubscribeAsync on the scada client - fake.SubscribeCalls.Should().HaveCount(2); - // The unsubscribe in between should have cleaned up - fake.UnsubscribeCalls.Should().HaveCount(1); - - // Data should flow to the new session - var reader = await sm.SubscribeAsync("session3", tags, cts.Token); - sm.OnTagValueChanged("Tag.A", Vtq.Good(1.0)); - var result = await reader.ReadAsync(cts.Token); - result.vtq.Value.Should().Be(1.0); - } - - [Fact] - public async Task OnTagValueChanged_NoDuplicateDelivery() - { - // Verifies FIX 2: each OnDataChange produces exactly one VTQ per client, - // not two (which happened when both stored callback and OnTagValueChanged - // property were invoked). - var fake = new FakeScadaClient(); - using var sm = new SubscriptionManager(fake); - using var cts = new CancellationTokenSource(); - - var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); - - // Deliver one update - sm.OnTagValueChanged("Motor.Speed", Vtq.Good(42.0)); - - // Should receive exactly one message - reader.TryRead(out var msg).Should().BeTrue(); - msg.vtq.Value.Should().Be(42.0); - - // No duplicate - reader.TryRead(out _).Should().BeFalse("each update should be delivered exactly once"); - } - - [Fact] - public async Task FailedSubscription_StoredAsPending_RetriedOnReconnect() - { - // Verifies FIX 3: when MxAccess is down during subscribe, tags are stored - // as pending and retried when NotifyReconnection is called. - var fake = new FakeScadaClient(); - fake.FailSubscriptions = true; - using var sm = new SubscriptionManager(fake); - using var cts = new CancellationTokenSource(); - - // Subscribe while MxAccess is "down" — should not throw (errors are logged) - var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); - reader.Should().NotBeNull(); - fake.SubscribeCalls.Should().HaveCount(1); - - // MxAccess comes back up - fake.FailSubscriptions = false; - sm.NotifyReconnection(); - - // Give the async retry a moment to complete - await Task.Delay(100); - - // Should have retried the subscription - fake.SubscribeCalls.Should().HaveCount(2, "pending subscriptions should be retried on reconnect"); - fake.SubscribeCalls[1].Should().Contain("Motor.Speed"); - } } }