diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs index 63fdd61..0f961da 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Grpc/Services/ScadaGrpcService.cs @@ -360,7 +360,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Grpc.Services throw new RpcException(new GrpcStatus(StatusCode.Unauthenticated, "Invalid session")); } - var reader = _subscriptionManager.Subscribe( + var reader = await _subscriptionManager.SubscribeAsync( request.SessionId, request.Tags, context.CancellationToken); try diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs index 198409d..f62e16a 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.EventHandlers.cs @@ -53,21 +53,21 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } } - // Invoke the stored subscription callback + // Invoke the stored subscription callback (SubscriptionManager.OnTagValueChanged). + // This is the single delivery path — OnTagValueChanged property is NOT invoked + // separately to avoid duplicate VTQ delivery. Action callback; lock (_lock) { if (!_storedSubscriptions.TryGetValue(address, out callback)) { - Log.Debug("OnDataChange for {Address} but no callback registered", address); + // Fall back to global handler if no stored callback + OnTagValueChanged?.Invoke(address, vtq); return; } } callback.Invoke(address, vtq); - - // Also route to the SubscriptionManager's global handler - OnTagValueChanged?.Invoke(address, vtq); } catch (Exception ex) { diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs index 739538c..471bfe2 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs @@ -32,6 +32,15 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); + // Serializes Subscribe and UnsubscribeClient to prevent race conditions + // where old-session unsubscribe removes new-session COM subscriptions. + private readonly SemaphoreSlim _subscriptionGate = new SemaphoreSlim(1, 1); + + // Tags that failed MxAccess subscription (e.g., MxAccess was down). + // Retried on reconnect via RetryPendingSubscriptions(). + private readonly HashSet _pendingTags + = new HashSet(StringComparer.OrdinalIgnoreCase); + public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000, BoundedChannelFullMode channelFullMode = BoundedChannelFullMode.DropOldest) { @@ -42,59 +51,69 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions /// /// Creates a subscription for a client. Returns a ChannelReader to stream from. + /// Serialized with UnsubscribeClient via _subscriptionGate to prevent race + /// conditions during client reconnect (old unsubscribe vs new subscribe). /// - public ChannelReader<(string address, Vtq vtq)> Subscribe( + public async Task> SubscribeAsync( string clientId, IEnumerable addresses, CancellationToken ct) { - var channel = Channel.CreateBounded<(string address, Vtq vtq)>( - new BoundedChannelOptions(_channelCapacity) - { - FullMode = _channelFullMode, - SingleReader = true, - SingleWriter = false - }); - - var addressSet = new HashSet(addresses, StringComparer.OrdinalIgnoreCase); - var clientSub = new ClientSubscription(clientId, channel, addressSet); - - _clientSubscriptions[clientId] = clientSub; - - var newTags = new List(); - - _rwLock.EnterWriteLock(); + await _subscriptionGate.WaitAsync(ct); try { - foreach (var address in addressSet) + var channel = Channel.CreateBounded<(string address, Vtq vtq)>( + new BoundedChannelOptions(_channelCapacity) + { + FullMode = _channelFullMode, + SingleReader = true, + SingleWriter = false + }); + + var addressSet = new HashSet(addresses, StringComparer.OrdinalIgnoreCase); + var clientSub = new ClientSubscription(clientId, channel, addressSet); + + _clientSubscriptions[clientId] = clientSub; + + var newTags = new List(); + + _rwLock.EnterWriteLock(); + try { - if (_tagSubscriptions.TryGetValue(address, out var tagSub)) + foreach (var address in addressSet) { - tagSub.ClientIds.Add(clientId); - } - else - { - _tagSubscriptions[address] = new TagSubscription(address, - new HashSet(StringComparer.OrdinalIgnoreCase) { clientId }); - newTags.Add(address); + if (_tagSubscriptions.TryGetValue(address, out var tagSub)) + { + tagSub.ClientIds.Add(clientId); + } + else + { + _tagSubscriptions[address] = new TagSubscription(address, + new HashSet(StringComparer.OrdinalIgnoreCase) { clientId }); + newTags.Add(address); + } } } + finally + { + _rwLock.ExitWriteLock(); + } + + // Create MxAccess COM subscriptions for newly subscribed tags (awaited, not fire-and-forget) + if (newTags.Count > 0) + { + await CreateMxAccessSubscriptionsAsync(newTags); + } + + // Register cancellation cleanup + ct.Register(() => UnsubscribeClient(clientId)); + + Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)", + clientId, addressSet.Count, newTags.Count); + return channel.Reader; } finally { - _rwLock.ExitWriteLock(); + _subscriptionGate.Release(); } - - // Create MxAccess COM subscriptions for newly subscribed tags - if (newTags.Count > 0) - { - _ = CreateMxAccessSubscriptionsAsync(newTags); - } - - // Register cancellation cleanup - ct.Register(() => UnsubscribeClient(clientId)); - - Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)", - clientId, addressSet.Count, newTags.Count); - return channel.Reader; } private async Task CreateMxAccessSubscriptionsAsync(List addresses) @@ -104,10 +123,25 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions await _scadaClient.SubscribeAsync( addresses, (address, vtq) => OnTagValueChanged(address, vtq)); + + // Successful — remove from pending if they were there + lock (_pendingTags) + { + foreach (var address in addresses) + _pendingTags.Remove(address); + } } catch (Exception ex) { - Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags", addresses.Count); + Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags — " + + "storing as pending for retry on reconnect", addresses.Count); + + // Store failed addresses for retry when MxAccess reconnects + lock (_pendingTags) + { + foreach (var address in addresses) + _pendingTags.Add(address); + } } } @@ -153,56 +187,72 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions /// /// Removes a client's subscriptions and cleans up tag subscriptions - /// when the last client unsubscribes. + /// when the last client unsubscribes. Serialized with SubscribeAsync + /// via _subscriptionGate to prevent race conditions. /// public void UnsubscribeClient(string clientId) { - if (!_clientSubscriptions.TryRemove(clientId, out var clientSub)) - return; - - var tagsToDispose = new List(); - - _rwLock.EnterWriteLock(); + _subscriptionGate.Wait(); try { - foreach (var address in clientSub.Addresses) - { - if (_tagSubscriptions.TryGetValue(address, out var tagSub)) - { - tagSub.ClientIds.Remove(clientId); + if (!_clientSubscriptions.TryRemove(clientId, out var clientSub)) + return; - // Last client unsubscribed — remove the tag subscription - if (tagSub.ClientIds.Count == 0) + var tagsToDispose = new List(); + + _rwLock.EnterWriteLock(); + try + { + foreach (var address in clientSub.Addresses) + { + if (_tagSubscriptions.TryGetValue(address, out var tagSub)) { - _tagSubscriptions.TryRemove(address, out _); - tagsToDispose.Add(address); + tagSub.ClientIds.Remove(clientId); + + // Last client unsubscribed — remove the tag subscription + if (tagSub.ClientIds.Count == 0) + { + _tagSubscriptions.TryRemove(address, out _); + tagsToDispose.Add(address); + } } } } + finally + { + _rwLock.ExitWriteLock(); + } + + // Unsubscribe tags with no remaining clients via address-based API + if (tagsToDispose.Count > 0) + { + // Also remove from pending if they were awaiting retry + lock (_pendingTags) + { + foreach (var address in tagsToDispose) + _pendingTags.Remove(address); + } + + try + { + _scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count); + } + } + + // Complete the channel (signals end of stream to the gRPC handler) + clientSub.Channel.Writer.TryComplete(); + + Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)", + clientId, clientSub.DeliveredCount, clientSub.DroppedCount); } finally { - _rwLock.ExitWriteLock(); + _subscriptionGate.Release(); } - - // Unsubscribe tags with no remaining clients via address-based API - if (tagsToDispose.Count > 0) - { - try - { - _scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult(); - } - catch (Exception ex) - { - Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count); - } - } - - // Complete the channel (signals end of stream to the gRPC handler) - clientSub.Channel.Writer.TryComplete(); - - Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)", - clientId, clientSub.DeliveredCount, clientSub.DroppedCount); } /// @@ -223,8 +273,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions } /// - /// Logs reconnection for observability. Data flow resumes automatically - /// via MxAccessClient.RecreateStoredSubscriptionsAsync callbacks. + /// Called when MxAccess reconnects. Retries any pending subscriptions + /// that failed during the disconnected period. /// public void NotifyReconnection() { @@ -232,6 +282,24 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions "data flow will resume via OnDataChange callbacks " + "({ClientCount} clients, {TagCount} tags)", _clientSubscriptions.Count, _tagSubscriptions.Count); + + _ = RetryPendingSubscriptionsAsync(); + } + + /// + /// Retries MxAccess subscriptions for tags that failed during disconnect. + /// + private async Task RetryPendingSubscriptionsAsync() + { + List tagsToRetry; + lock (_pendingTags) + { + if (_pendingTags.Count == 0) return; + tagsToRetry = new List(_pendingTags); + } + + Log.Information("Retrying {Count} pending MxAccess subscriptions after reconnect", tagsToRetry.Count); + await CreateMxAccessSubscriptionsAsync(tagsToRetry); } /// Returns subscription statistics. @@ -252,6 +320,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _clientSubscriptions.Clear(); _tagSubscriptions.Clear(); _rwLock.Dispose(); + _subscriptionGate.Dispose(); } // ── Nested types ───────────────────────────────────────── diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs index a735907..64b1827 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs @@ -104,7 +104,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Health for (int i = 0; i < 101; i++) { using var cts = new CancellationTokenSource(); - sm.Subscribe("client-" + i, new[] { "tag1" }, cts.Token); + await sm.SubscribeAsync("client-" + i, new[] { "tag1" }, cts.Token); } var svc = new HealthCheckService(client, sm, pm); diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs index b12af80..cf33642 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -32,11 +33,34 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions Task.FromResult((false, 0)); public Task ProbeConnectionAsync(string testTagAddress, int timeoutMs, CancellationToken ct = default) => Task.FromResult(ProbeResult.Healthy(Quality.Good, DateTime.UtcNow)); - public Task UnsubscribeByAddressAsync(IEnumerable addresses) => Task.CompletedTask; - public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) => - Task.FromResult(new FakeSubscriptionHandle()); public ValueTask DisposeAsync() => default; + // Track subscribe/unsubscribe calls for assertions + public List> SubscribeCalls { get; } = new List>(); + public List> UnsubscribeCalls { get; } = new List>(); + public List> StoredCallbacks { get; } = new List>(); + + // When true, SubscribeAsync throws to simulate MxAccess being down + public bool FailSubscriptions { get; set; } + + public Task UnsubscribeByAddressAsync(IEnumerable addresses) + { + UnsubscribeCalls.Add(addresses.ToList()); + return Task.CompletedTask; + } + + public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) + { + var addressList = addresses.ToList(); + SubscribeCalls.Add(addressList); + StoredCallbacks.Add(callback); + + if (FailSubscriptions) + throw new InvalidOperationException("Not connected to MxAccess"); + + return Task.FromResult(new FakeSubscriptionHandle()); + } + // Suppress unused event warning internal void FireEvent() => ConnectionStateChanged?.Invoke(this, null!); @@ -47,11 +71,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void Subscribe_ReturnsChannelReader() + public async Task Subscribe_ReturnsChannelReader() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Tag1", "Tag2" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Tag1", "Tag2" }, cts.Token); reader.Should().NotBeNull(); } @@ -60,7 +84,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); var vtq = Vtq.Good(42.0); sm.OnTagValueChanged("Motor.Speed", vtq); @@ -76,8 +100,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader1 = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); - var reader2 = sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); + var reader1 = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + var reader2 = await sm.SubscribeAsync("client2", new[] { "Motor.Speed" }, cts.Token); sm.OnTagValueChanged("Motor.Speed", Vtq.Good(99.0)); @@ -88,11 +112,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void OnTagValueChanged_NonSubscribedTag_NoDelivery() + public async Task OnTagValueChanged_NonSubscribedTag_NoDelivery() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); sm.OnTagValueChanged("Motor.Torque", Vtq.Good(10.0)); @@ -101,11 +125,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void UnsubscribeClient_CompletesChannel() + public async Task UnsubscribeClient_CompletesChannel() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); sm.UnsubscribeClient("client1"); @@ -114,11 +138,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void UnsubscribeClient_RemovesFromTagSubscriptions() + public async Task UnsubscribeClient_RemovesFromTagSubscriptions() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); sm.UnsubscribeClient("client1"); @@ -128,12 +152,12 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void RefCounting_LastClientUnsubscribeRemovesTag() + public async Task RefCounting_LastClientUnsubscribeRemovesTag() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); - sm.Subscribe("client2", new[] { "Motor.Speed" }, cts.Token); + await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + await sm.SubscribeAsync("client2", new[] { "Motor.Speed" }, cts.Token); sm.GetStats().TotalTags.Should().Be(1); @@ -145,11 +169,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void NotifyDisconnection_SendsBadQualityToAll() + public async Task NotifyDisconnection_SendsBadQualityToAll() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed", "Motor.Torque" }, cts.Token); sm.NotifyDisconnection(); @@ -161,11 +185,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void Backpressure_DropOldest_DropsWhenFull() + public async Task Backpressure_DropOldest_DropsWhenFull() { using var sm = new SubscriptionManager(new FakeScadaClient(), channelCapacity: 3); using var cts = new CancellationTokenSource(); - var reader = sm.Subscribe("client1", new[] { "Motor.Speed" }, cts.Token); + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); // Fill the channel beyond capacity for (int i = 0; i < 10; i++) @@ -180,17 +204,123 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions } [Fact] - public void GetStats_ReturnsCorrectCounts() + public async Task GetStats_ReturnsCorrectCounts() { using var sm = new SubscriptionManager(new FakeScadaClient()); using var cts = new CancellationTokenSource(); - sm.Subscribe("c1", new[] { "Tag1", "Tag2" }, cts.Token); - sm.Subscribe("c2", new[] { "Tag2", "Tag3" }, cts.Token); + await sm.SubscribeAsync("c1", new[] { "Tag1", "Tag2" }, cts.Token); + await sm.SubscribeAsync("c2", new[] { "Tag2", "Tag3" }, cts.Token); var stats = sm.GetStats(); stats.TotalClients.Should().Be(2); stats.TotalTags.Should().Be(3); // Tag1, Tag2, Tag3 stats.ActiveSubscriptions.Should().Be(4); // c1:Tag1, c1:Tag2, c2:Tag2, c2:Tag3 } + + // ── New tests for race condition fix ────────────────────────── + + [Fact] + public async Task SubscribeAfterUnsubscribe_CreatesMxAccessSubscriptions() + { + // Verifies FIX 1: when a client disconnects and reconnects with the same tags, + // the new subscribe must create fresh MxAccess subscriptions (not skip them + // because old handles still exist). + var fake = new FakeScadaClient(); + using var sm = new SubscriptionManager(fake); + using var cts = new CancellationTokenSource(); + + // First client subscribes + await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + fake.SubscribeCalls.Should().HaveCount(1); + fake.SubscribeCalls[0].Should().Contain("Motor.Speed"); + + // Client disconnects — unsubscribe removes the tag (ref count → 0) + sm.UnsubscribeClient("client1"); + fake.UnsubscribeCalls.Should().HaveCount(1); + fake.UnsubscribeCalls[0].Should().Contain("Motor.Speed"); + + // Same client reconnects — must create a NEW MxAccess subscription + await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + fake.SubscribeCalls.Should().HaveCount(2, "new subscribe must create fresh MxAccess subscription"); + fake.SubscribeCalls[1].Should().Contain("Motor.Speed"); + } + + [Fact] + public async Task SubscribeAfterUnsubscribe_SerializedByGate() + { + // Verifies FIX 1: subscribe and unsubscribe are serialized so they cannot + // interleave and cause the race condition. + var fake = new FakeScadaClient(); + using var sm = new SubscriptionManager(fake); + using var cts = new CancellationTokenSource(); + + var tags = new[] { "Tag.A", "Tag.B", "Tag.C" }; + + // Subscribe, unsubscribe, re-subscribe in sequence + await sm.SubscribeAsync("session1", tags, cts.Token); + sm.UnsubscribeClient("session1"); + await sm.SubscribeAsync("session2", tags, cts.Token); + + // Both subscribes should have called SubscribeAsync on the scada client + fake.SubscribeCalls.Should().HaveCount(2); + // The unsubscribe in between should have cleaned up + fake.UnsubscribeCalls.Should().HaveCount(1); + + // Data should flow to the new session + var reader = await sm.SubscribeAsync("session3", tags, cts.Token); + sm.OnTagValueChanged("Tag.A", Vtq.Good(1.0)); + var result = await reader.ReadAsync(cts.Token); + result.vtq.Value.Should().Be(1.0); + } + + [Fact] + public async Task OnTagValueChanged_NoDuplicateDelivery() + { + // Verifies FIX 2: each OnDataChange produces exactly one VTQ per client, + // not two (which happened when both stored callback and OnTagValueChanged + // property were invoked). + var fake = new FakeScadaClient(); + using var sm = new SubscriptionManager(fake); + using var cts = new CancellationTokenSource(); + + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + + // Deliver one update + sm.OnTagValueChanged("Motor.Speed", Vtq.Good(42.0)); + + // Should receive exactly one message + reader.TryRead(out var msg).Should().BeTrue(); + msg.vtq.Value.Should().Be(42.0); + + // No duplicate + reader.TryRead(out _).Should().BeFalse("each update should be delivered exactly once"); + } + + [Fact] + public async Task FailedSubscription_StoredAsPending_RetriedOnReconnect() + { + // Verifies FIX 3: when MxAccess is down during subscribe, tags are stored + // as pending and retried when NotifyReconnection is called. + var fake = new FakeScadaClient(); + fake.FailSubscriptions = true; + using var sm = new SubscriptionManager(fake); + using var cts = new CancellationTokenSource(); + + // Subscribe while MxAccess is "down" — should not throw (errors are logged) + var reader = await sm.SubscribeAsync("client1", new[] { "Motor.Speed" }, cts.Token); + reader.Should().NotBeNull(); + fake.SubscribeCalls.Should().HaveCount(1); + + // MxAccess comes back up + fake.FailSubscriptions = false; + sm.NotifyReconnection(); + + // Give the async retry a moment to complete + await Task.Delay(100); + + // Should have retried the subscription + fake.SubscribeCalls.Should().HaveCount(2, "pending subscriptions should be retried on reconnect"); + fake.SubscribeCalls[1].Should().Contain("Motor.Speed"); + } } }