fix(lmxproxy): resolve subscribe/unsubscribe race condition on client reconnect

Three fixes for the SubscriptionManager/MxAccessClient subscription pipeline:

1. Serialize Subscribe and UnsubscribeClient with a SemaphoreSlim gate to prevent
   race where old-session unsubscribe removes new-session COM subscriptions.
   CreateMxAccessSubscriptionsAsync is now awaited instead of fire-and-forget.

2. Fix dual VTQ delivery in MxAccessClient.OnDataChange — each update was delivered
   twice (once via stored callback, once via OnTagValueChanged property). Now uses
   stored callback as the single delivery path.

3. Store pending tag addresses when CreateMxAccessSubscriptionsAsync fails (MxAccess
   down) and retry them on reconnect via NotifyReconnection/RetryPendingSubscriptionsAsync.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-22 16:33:19 -04:00
parent bc4fc97652
commit fa33e1acf1
5 changed files with 309 additions and 110 deletions

View File

@@ -32,6 +32,15 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
// Serializes Subscribe and UnsubscribeClient to prevent race conditions
// where old-session unsubscribe removes new-session COM subscriptions.
private readonly SemaphoreSlim _subscriptionGate = new SemaphoreSlim(1, 1);
// Tags that failed MxAccess subscription (e.g., MxAccess was down).
// Retried on reconnect via RetryPendingSubscriptions().
private readonly HashSet<string> _pendingTags
= new HashSet<string>(StringComparer.OrdinalIgnoreCase);
public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000,
BoundedChannelFullMode channelFullMode = BoundedChannelFullMode.DropOldest)
{
@@ -42,59 +51,69 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
/// <summary>
/// Creates a subscription for a client. Returns a ChannelReader to stream from.
/// Serialized with UnsubscribeClient via _subscriptionGate to prevent race
/// conditions during client reconnect (old unsubscribe vs new subscribe).
/// </summary>
public ChannelReader<(string address, Vtq vtq)> Subscribe(
public async Task<ChannelReader<(string address, Vtq vtq)>> SubscribeAsync(
string clientId, IEnumerable<string> addresses, CancellationToken ct)
{
var channel = Channel.CreateBounded<(string address, Vtq vtq)>(
new BoundedChannelOptions(_channelCapacity)
{
FullMode = _channelFullMode,
SingleReader = true,
SingleWriter = false
});
var addressSet = new HashSet<string>(addresses, StringComparer.OrdinalIgnoreCase);
var clientSub = new ClientSubscription(clientId, channel, addressSet);
_clientSubscriptions[clientId] = clientSub;
var newTags = new List<string>();
_rwLock.EnterWriteLock();
await _subscriptionGate.WaitAsync(ct);
try
{
foreach (var address in addressSet)
var channel = Channel.CreateBounded<(string address, Vtq vtq)>(
new BoundedChannelOptions(_channelCapacity)
{
FullMode = _channelFullMode,
SingleReader = true,
SingleWriter = false
});
var addressSet = new HashSet<string>(addresses, StringComparer.OrdinalIgnoreCase);
var clientSub = new ClientSubscription(clientId, channel, addressSet);
_clientSubscriptions[clientId] = clientSub;
var newTags = new List<string>();
_rwLock.EnterWriteLock();
try
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
foreach (var address in addressSet)
{
tagSub.ClientIds.Add(clientId);
}
else
{
_tagSubscriptions[address] = new TagSubscription(address,
new HashSet<string>(StringComparer.OrdinalIgnoreCase) { clientId });
newTags.Add(address);
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
tagSub.ClientIds.Add(clientId);
}
else
{
_tagSubscriptions[address] = new TagSubscription(address,
new HashSet<string>(StringComparer.OrdinalIgnoreCase) { clientId });
newTags.Add(address);
}
}
}
finally
{
_rwLock.ExitWriteLock();
}
// Create MxAccess COM subscriptions for newly subscribed tags (awaited, not fire-and-forget)
if (newTags.Count > 0)
{
await CreateMxAccessSubscriptionsAsync(newTags);
}
// Register cancellation cleanup
ct.Register(() => UnsubscribeClient(clientId));
Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)",
clientId, addressSet.Count, newTags.Count);
return channel.Reader;
}
finally
{
_rwLock.ExitWriteLock();
_subscriptionGate.Release();
}
// Create MxAccess COM subscriptions for newly subscribed tags
if (newTags.Count > 0)
{
_ = CreateMxAccessSubscriptionsAsync(newTags);
}
// Register cancellation cleanup
ct.Register(() => UnsubscribeClient(clientId));
Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)",
clientId, addressSet.Count, newTags.Count);
return channel.Reader;
}
private async Task CreateMxAccessSubscriptionsAsync(List<string> addresses)
@@ -104,10 +123,25 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
await _scadaClient.SubscribeAsync(
addresses,
(address, vtq) => OnTagValueChanged(address, vtq));
// Successful — remove from pending if they were there
lock (_pendingTags)
{
foreach (var address in addresses)
_pendingTags.Remove(address);
}
}
catch (Exception ex)
{
Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags", addresses.Count);
Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags — " +
"storing as pending for retry on reconnect", addresses.Count);
// Store failed addresses for retry when MxAccess reconnects
lock (_pendingTags)
{
foreach (var address in addresses)
_pendingTags.Add(address);
}
}
}
@@ -153,56 +187,72 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
/// <summary>
/// Removes a client's subscriptions and cleans up tag subscriptions
/// when the last client unsubscribes.
/// when the last client unsubscribes. Serialized with SubscribeAsync
/// via _subscriptionGate to prevent race conditions.
/// </summary>
public void UnsubscribeClient(string clientId)
{
if (!_clientSubscriptions.TryRemove(clientId, out var clientSub))
return;
var tagsToDispose = new List<string>();
_rwLock.EnterWriteLock();
_subscriptionGate.Wait();
try
{
foreach (var address in clientSub.Addresses)
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
tagSub.ClientIds.Remove(clientId);
if (!_clientSubscriptions.TryRemove(clientId, out var clientSub))
return;
// Last client unsubscribed — remove the tag subscription
if (tagSub.ClientIds.Count == 0)
var tagsToDispose = new List<string>();
_rwLock.EnterWriteLock();
try
{
foreach (var address in clientSub.Addresses)
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
_tagSubscriptions.TryRemove(address, out _);
tagsToDispose.Add(address);
tagSub.ClientIds.Remove(clientId);
// Last client unsubscribed — remove the tag subscription
if (tagSub.ClientIds.Count == 0)
{
_tagSubscriptions.TryRemove(address, out _);
tagsToDispose.Add(address);
}
}
}
}
finally
{
_rwLock.ExitWriteLock();
}
// Unsubscribe tags with no remaining clients via address-based API
if (tagsToDispose.Count > 0)
{
// Also remove from pending if they were awaiting retry
lock (_pendingTags)
{
foreach (var address in tagsToDispose)
_pendingTags.Remove(address);
}
try
{
_scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count);
}
}
// Complete the channel (signals end of stream to the gRPC handler)
clientSub.Channel.Writer.TryComplete();
Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)",
clientId, clientSub.DeliveredCount, clientSub.DroppedCount);
}
finally
{
_rwLock.ExitWriteLock();
_subscriptionGate.Release();
}
// Unsubscribe tags with no remaining clients via address-based API
if (tagsToDispose.Count > 0)
{
try
{
_scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count);
}
}
// Complete the channel (signals end of stream to the gRPC handler)
clientSub.Channel.Writer.TryComplete();
Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)",
clientId, clientSub.DeliveredCount, clientSub.DroppedCount);
}
/// <summary>
@@ -223,8 +273,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
}
/// <summary>
/// Logs reconnection for observability. Data flow resumes automatically
/// via MxAccessClient.RecreateStoredSubscriptionsAsync callbacks.
/// Called when MxAccess reconnects. Retries any pending subscriptions
/// that failed during the disconnected period.
/// </summary>
public void NotifyReconnection()
{
@@ -232,6 +282,24 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
"data flow will resume via OnDataChange callbacks " +
"({ClientCount} clients, {TagCount} tags)",
_clientSubscriptions.Count, _tagSubscriptions.Count);
_ = RetryPendingSubscriptionsAsync();
}
/// <summary>
/// Retries MxAccess subscriptions for tags that failed during disconnect.
/// </summary>
private async Task RetryPendingSubscriptionsAsync()
{
List<string> tagsToRetry;
lock (_pendingTags)
{
if (_pendingTags.Count == 0) return;
tagsToRetry = new List<string>(_pendingTags);
}
Log.Information("Retrying {Count} pending MxAccess subscriptions after reconnect", tagsToRetry.Count);
await CreateMxAccessSubscriptionsAsync(tagsToRetry);
}
/// <summary>Returns subscription statistics.</summary>
@@ -252,6 +320,7 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
_clientSubscriptions.Clear();
_tagSubscriptions.Clear();
_rwLock.Dispose();
_subscriptionGate.Dispose();
}
// ── Nested types ─────────────────────────────────────────