fix(driver-galaxy): resolve High code-review findings (Driver.Galaxy-002, Driver.Galaxy-008)

Driver.Galaxy-002 — DataTypeMap.Map had no Int64 arm though MxValueDecoder/
MxValueEncoder both fully support Int64. Galaxy attributes with the Int64
mx_data_type code fell through to the String default, creating a String
address-space node while runtime reads decoded a boxed long. Added
`6 => DriverDataType.Int64`, extending the contiguous 0..5 scheme so the type
map agrees with the decoder/encoder on all seven Galaxy data types.

Driver.Galaxy-008 — after a stream fault the EventPump's StreamEvents consumer
loop exited and its channel completed; EventPump.Start() is a no-op on a
completed-but-non-null loop, so a replayed subscription had no consumer and
ReplayAsync never re-registered the post-reconnect item handles. ReplayAsync
now recreates the EventPump (RestartEventPumpForReplay) and rebinds the
SubscriptionRegistry per subscription with the fresh item handles returned by
the post-reconnect SubscribeBulkAsync, via new SubscriptionRegistry.SnapshotEntries
and Rebind APIs.

Regression tests: DataTypeMapTests (every code incl. Int64), SubscriptionRegistry
Tests (Rebind/SnapshotEntries), EventPumpStreamFaultTests (faulted pump dead,
fresh pump resumes dispatch).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-22 06:59:09 -04:00
parent c9e446387a
commit 7f2e144f8d
7 changed files with 345 additions and 15 deletions

View File

@@ -8,6 +8,14 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
/// to <see cref="DriverDataType.String"/> for unknown codes — keeps wire compatibility
/// with deployed configs while we tighten this through the parity matrix.
/// </summary>
/// <remarks>
/// Code <c>6</c> (Int64) extends the contiguous 0..5 scheme so the map covers the same
/// seven Galaxy data types <c>MxValueDecoder</c> / <c>MxValueEncoder</c> already decode
/// and encode (Boolean, Int32, Int64, Float32, Float64, String, DateTime). Without it an
/// Int64 attribute fell through to the <see cref="DriverDataType.String"/> default,
/// creating a String address-space node while runtime reads decoded a boxed <c>long</c> —
/// a metadata / coercion mismatch (Driver.Galaxy-002).
/// </remarks>
internal static class DataTypeMap
{
public static DriverDataType Map(int mxDataType) => mxDataType switch
@@ -18,6 +26,7 @@ internal static class DataTypeMap
3 => DriverDataType.Float64,
4 => DriverDataType.String,
5 => DriverDataType.DateTime,
6 => DriverDataType.Int64,
_ => DriverDataType.String,
};
}

View File

@@ -257,22 +257,106 @@ public sealed class GalaxyDriver
}
/// <summary>
/// Replay callback. Walks every active subscription's bindings and re-issues
/// SubscribeBulk for the tag list. PR 6.x can swap this for the gw's batched
/// <c>ReplaySubscriptionsCommand</c> once it ships.
/// Replay callback. Walks every active subscription, re-issues SubscribeBulk for
/// its tag list, and <see cref="SubscriptionRegistry.Rebind">rebinds</see> the
/// registry with the fresh item handles the gateway returned — the pre-reconnect
/// handles are dead once the session reopened. The faulted <see cref="EventPump"/>
/// is recreated first so the replayed subscriptions have a live StreamEvents
/// consumer; without that restart the replayed tags are subscribed on the gw but
/// never reach <c>OnDataChange</c> (Driver.Galaxy-008). PR 6.x can swap this for
/// the gw's batched <c>ReplaySubscriptionsCommand</c> once it ships.
/// </summary>
private async Task ReplayAsync(CancellationToken cancellationToken)
{
if (_subscriber is null) return;
var bindings = _subscriptions.SnapshotAllBindings();
if (bindings.Count == 0) return;
var entries = _subscriptions.SnapshotEntries();
if (entries.Count == 0) return;
// The stream-fault that triggered this recovery left the old pump's RunAsync loop
// exited and its channel completed; EventPump.Start() is a no-op on a non-null but
// completed loop. Recreate the pump so the replayed subscriptions have a consumer.
RestartEventPumpForReplay();
var tagCount = 0;
foreach (var (subscriptionId, oldBindings) in entries)
{
var refs = oldBindings
.Select(b => b.FullReference)
.Distinct(StringComparer.OrdinalIgnoreCase)
.ToArray();
if (refs.Length == 0) continue;
var results = await _subscriber
.SubscribeBulkAsync(refs, _options.MxAccess.PublishingIntervalMs, cancellationToken)
.ConfigureAwait(false);
var byAddress = BuildResultIndex(results);
var newBindings = new List<TagBinding>(refs.Length);
foreach (var fullRef in refs)
{
var itemHandle = byAddress.TryGetValue(fullRef, out var match) && match.WasSuccessful
? match.ItemHandle
: 0;
newBindings.Add(new TagBinding(fullRef, itemHandle));
}
// Rebind so the EventPump fan-out reverse map points at the post-reconnect
// handles; otherwise events on the new handles miss every subscription.
_subscriptions.Rebind(subscriptionId, newBindings);
tagCount += refs.Length;
}
var refs = bindings.Select(b => b.FullReference).Distinct(StringComparer.OrdinalIgnoreCase).ToArray();
await _subscriber.SubscribeBulkAsync(
refs, _options.MxAccess.PublishingIntervalMs, cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
"GalaxyDriver {InstanceId} replay completed — {Count} tags re-subscribed",
_driverInstanceId, refs.Length);
"GalaxyDriver {InstanceId} replay completed — {SubCount} subscriptions, {TagCount} tags re-subscribed",
_driverInstanceId, entries.Count, tagCount);
}
/// <summary>
/// Index a SubscribeBulk result list by tag address (OrdinalIgnoreCase) so the
/// subscribe / replay correlation loops are O(1) per reference rather than a
/// linear scan. Last-write-wins on a duplicate address (the gw shouldn't emit one).
/// </summary>
private static Dictionary<string, SubscribeResult> BuildResultIndex(IReadOnlyList<SubscribeResult> results)
{
var index = new Dictionary<string, SubscribeResult>(results.Count, StringComparer.OrdinalIgnoreCase);
foreach (var result in results)
{
if (!string.IsNullOrEmpty(result.TagAddress)) index[result.TagAddress] = result;
}
return index;
}
/// <summary>
/// Dispose the faulted <see cref="EventPump"/> and create a fresh one bound to the
/// same subscriber / registry, started immediately. Invoked from the reconnect
/// replay path. No-op when no pump was ever started (no active subscriptions).
/// </summary>
private void RestartEventPumpForReplay()
{
EventPump? old;
lock (_pumpLock)
{
old = _eventPump;
if (old is null) return; // pump never started — nothing to restart
_eventPump = null;
}
// Detach + dispose the faulted pump outside the lock so a slow shutdown doesn't
// block a concurrent EnsureEventPumpStarted; the old loop already exited on fault.
old.OnDataChange -= OnPumpDataChange;
try { old.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
catch (Exception ex)
{
_logger.LogWarning(ex,
"GalaxyDriver {InstanceId} faulted EventPump dispose failed during replay — continuing.",
_driverInstanceId);
}
// EnsureEventPumpStarted creates + starts a fresh pump under the lock. Skip the
// recreate if the driver is being disposed — Dispose already tore the pump down
// and a fresh one here would leak past the driver's lifetime.
if (_disposed) return;
EnsureEventPumpStarted();
}
private void OnSupervisorStateChanged(object? sender, StateTransition transition)

View File

@@ -95,6 +95,47 @@ internal sealed class SubscriptionRegistry
public IReadOnlyList<TagBinding> SnapshotAllBindings() =>
[.. _bySubscriptionId.Values.SelectMany(entry => entry.Bindings)];
/// <summary>
/// Snapshot every active subscription with its bindings, grouped by subscription id.
/// Used by the reconnect replay path so it can re-issue SubscribeBulk per subscription
/// and then <see cref="Rebind"/> each one with the post-reconnect item handles.
/// </summary>
public IReadOnlyList<(long SubscriptionId, IReadOnlyList<TagBinding> Bindings)> SnapshotEntries() =>
[.. _bySubscriptionId.Values.Select(entry => (entry.SubscriptionId, entry.Bindings))];
/// <summary>
/// Replace an existing subscription's bindings with the item handles a post-reconnect
/// SubscribeBulk returned, rebuilding the reverse fan-out map so events on the new
/// handles dispatch and the now-dead pre-reconnect handles are dropped. No-op when the
/// subscription id is unknown (it was unsubscribed during the reconnect window).
/// </summary>
public void Rebind(long subscriptionId, IReadOnlyList<TagBinding> newBindings)
{
if (!_bySubscriptionId.TryGetValue(subscriptionId, out var oldEntry)) return;
// Drop this subscription from every reverse-map bag it currently appears in. The
// pre-reconnect item handles are stale once the gw re-issues fresh ones.
foreach (var binding in oldEntry.Bindings)
{
if (binding.ItemHandle <= 0) continue;
if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var bag)) continue;
var remaining = new ConcurrentBag<long>(bag.Where(id => id != subscriptionId));
if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _);
else _subscribersByItemHandle[binding.ItemHandle] = remaining;
}
_bySubscriptionId[subscriptionId] = new SubscriptionEntry(subscriptionId, newBindings);
foreach (var binding in newBindings)
{
if (binding.ItemHandle <= 0) continue; // failed gw subscribe — no events expected
_subscribersByItemHandle.AddOrUpdate(
binding.ItemHandle,
_ => [subscriptionId],
(_, bag) => { bag.Add(subscriptionId); return bag; });
}
}
private sealed record SubscriptionEntry(long SubscriptionId, IReadOnlyList<TagBinding> Bindings);
}