Revert "fix(lmxproxy): resolve subscribe/unsubscribe race condition on client reconnect"

This reverts commit 9e9efbecab399fd7dcfb3e7e14e8b08418c3c3fc.
This commit is contained in:
Joseph Doherty
2026-03-22 16:55:06 -04:00
parent fa33e1acf1
commit c96e71c83c
5 changed files with 109 additions and 308 deletions

View File

@@ -32,15 +32,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim();
// Serializes Subscribe and UnsubscribeClient to prevent race conditions
// where old-session unsubscribe removes new-session COM subscriptions.
private readonly SemaphoreSlim _subscriptionGate = new SemaphoreSlim(1, 1);
// Tags that failed MxAccess subscription (e.g., MxAccess was down).
// Retried on reconnect via RetryPendingSubscriptions().
private readonly HashSet<string> _pendingTags
= new HashSet<string>(StringComparer.OrdinalIgnoreCase);
public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000,
BoundedChannelFullMode channelFullMode = BoundedChannelFullMode.DropOldest)
{
@@ -51,69 +42,59 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
/// <summary>
/// Creates a subscription for a client. Returns a ChannelReader to stream from.
/// Serialized with UnsubscribeClient via _subscriptionGate to prevent race
/// conditions during client reconnect (old unsubscribe vs new subscribe).
/// </summary>
public async Task<ChannelReader<(string address, Vtq vtq)>> SubscribeAsync(
public ChannelReader<(string address, Vtq vtq)> Subscribe(
string clientId, IEnumerable<string> addresses, CancellationToken ct)
{
await _subscriptionGate.WaitAsync(ct);
var channel = Channel.CreateBounded<(string address, Vtq vtq)>(
new BoundedChannelOptions(_channelCapacity)
{
FullMode = _channelFullMode,
SingleReader = true,
SingleWriter = false
});
var addressSet = new HashSet<string>(addresses, StringComparer.OrdinalIgnoreCase);
var clientSub = new ClientSubscription(clientId, channel, addressSet);
_clientSubscriptions[clientId] = clientSub;
var newTags = new List<string>();
_rwLock.EnterWriteLock();
try
{
var channel = Channel.CreateBounded<(string address, Vtq vtq)>(
new BoundedChannelOptions(_channelCapacity)
{
FullMode = _channelFullMode,
SingleReader = true,
SingleWriter = false
});
var addressSet = new HashSet<string>(addresses, StringComparer.OrdinalIgnoreCase);
var clientSub = new ClientSubscription(clientId, channel, addressSet);
_clientSubscriptions[clientId] = clientSub;
var newTags = new List<string>();
_rwLock.EnterWriteLock();
try
foreach (var address in addressSet)
{
foreach (var address in addressSet)
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
tagSub.ClientIds.Add(clientId);
}
else
{
_tagSubscriptions[address] = new TagSubscription(address,
new HashSet<string>(StringComparer.OrdinalIgnoreCase) { clientId });
newTags.Add(address);
}
tagSub.ClientIds.Add(clientId);
}
else
{
_tagSubscriptions[address] = new TagSubscription(address,
new HashSet<string>(StringComparer.OrdinalIgnoreCase) { clientId });
newTags.Add(address);
}
}
finally
{
_rwLock.ExitWriteLock();
}
// Create MxAccess COM subscriptions for newly subscribed tags (awaited, not fire-and-forget)
if (newTags.Count > 0)
{
await CreateMxAccessSubscriptionsAsync(newTags);
}
// Register cancellation cleanup
ct.Register(() => UnsubscribeClient(clientId));
Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)",
clientId, addressSet.Count, newTags.Count);
return channel.Reader;
}
finally
{
_subscriptionGate.Release();
_rwLock.ExitWriteLock();
}
// Create MxAccess COM subscriptions for newly subscribed tags
if (newTags.Count > 0)
{
_ = CreateMxAccessSubscriptionsAsync(newTags);
}
// Register cancellation cleanup
ct.Register(() => UnsubscribeClient(clientId));
Log.Information("Client {ClientId} subscribed to {Count} tags ({NewCount} new MxAccess subscriptions)",
clientId, addressSet.Count, newTags.Count);
return channel.Reader;
}
private async Task CreateMxAccessSubscriptionsAsync(List<string> addresses)
@@ -123,25 +104,10 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
await _scadaClient.SubscribeAsync(
addresses,
(address, vtq) => OnTagValueChanged(address, vtq));
// Successful — remove from pending if they were there
lock (_pendingTags)
{
foreach (var address in addresses)
_pendingTags.Remove(address);
}
}
catch (Exception ex)
{
Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags — " +
"storing as pending for retry on reconnect", addresses.Count);
// Store failed addresses for retry when MxAccess reconnects
lock (_pendingTags)
{
foreach (var address in addresses)
_pendingTags.Add(address);
}
Log.Error(ex, "Failed to create MxAccess subscriptions for {Count} tags", addresses.Count);
}
}
@@ -187,72 +153,56 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
/// <summary>
/// Removes a client's subscriptions and cleans up tag subscriptions
/// when the last client unsubscribes. Serialized with SubscribeAsync
/// via _subscriptionGate to prevent race conditions.
/// when the last client unsubscribes.
/// </summary>
public void UnsubscribeClient(string clientId)
{
_subscriptionGate.Wait();
if (!_clientSubscriptions.TryRemove(clientId, out var clientSub))
return;
var tagsToDispose = new List<string>();
_rwLock.EnterWriteLock();
try
{
if (!_clientSubscriptions.TryRemove(clientId, out var clientSub))
return;
var tagsToDispose = new List<string>();
_rwLock.EnterWriteLock();
try
foreach (var address in clientSub.Addresses)
{
foreach (var address in clientSub.Addresses)
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
if (_tagSubscriptions.TryGetValue(address, out var tagSub))
{
tagSub.ClientIds.Remove(clientId);
tagSub.ClientIds.Remove(clientId);
// Last client unsubscribed — remove the tag subscription
if (tagSub.ClientIds.Count == 0)
{
_tagSubscriptions.TryRemove(address, out _);
tagsToDispose.Add(address);
}
// Last client unsubscribed — remove the tag subscription
if (tagSub.ClientIds.Count == 0)
{
_tagSubscriptions.TryRemove(address, out _);
tagsToDispose.Add(address);
}
}
}
finally
{
_rwLock.ExitWriteLock();
}
// Unsubscribe tags with no remaining clients via address-based API
if (tagsToDispose.Count > 0)
{
// Also remove from pending if they were awaiting retry
lock (_pendingTags)
{
foreach (var address in tagsToDispose)
_pendingTags.Remove(address);
}
try
{
_scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count);
}
}
// Complete the channel (signals end of stream to the gRPC handler)
clientSub.Channel.Writer.TryComplete();
Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)",
clientId, clientSub.DeliveredCount, clientSub.DroppedCount);
}
finally
{
_subscriptionGate.Release();
_rwLock.ExitWriteLock();
}
// Unsubscribe tags with no remaining clients via address-based API
if (tagsToDispose.Count > 0)
{
try
{
_scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count);
}
}
// Complete the channel (signals end of stream to the gRPC handler)
clientSub.Channel.Writer.TryComplete();
Log.Information("Client {ClientId} unsubscribed ({Delivered} delivered, {Dropped} dropped)",
clientId, clientSub.DeliveredCount, clientSub.DroppedCount);
}
/// <summary>
@@ -273,8 +223,8 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
}
/// <summary>
/// Called when MxAccess reconnects. Retries any pending subscriptions
/// that failed during the disconnected period.
/// Logs reconnection for observability. Data flow resumes automatically
/// via MxAccessClient.RecreateStoredSubscriptionsAsync callbacks.
/// </summary>
public void NotifyReconnection()
{
@@ -282,24 +232,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
"data flow will resume via OnDataChange callbacks " +
"({ClientCount} clients, {TagCount} tags)",
_clientSubscriptions.Count, _tagSubscriptions.Count);
_ = RetryPendingSubscriptionsAsync();
}
/// <summary>
/// Retries MxAccess subscriptions for tags that failed during disconnect.
/// </summary>
private async Task RetryPendingSubscriptionsAsync()
{
List<string> tagsToRetry;
lock (_pendingTags)
{
if (_pendingTags.Count == 0) return;
tagsToRetry = new List<string>(_pendingTags);
}
Log.Information("Retrying {Count} pending MxAccess subscriptions after reconnect", tagsToRetry.Count);
await CreateMxAccessSubscriptionsAsync(tagsToRetry);
}
/// <summary>Returns subscription statistics.</summary>
@@ -320,7 +252,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions
_clientSubscriptions.Clear();
_tagSubscriptions.Clear();
_rwLock.Dispose();
_subscriptionGate.Dispose();
}
// ── Nested types ─────────────────────────────────────────