AB CIP PR 7 — ISubscribable via shared PollGroupEngine. AbCipDriver now implements ISubscribable — Subscribe delegates into the PollGroupEngine extracted in PR 1, Unsubscribe releases the subscription, ShutdownAsync disposes the engine cancelling every active subscription. OnDataChange event wired through the engine's on-change callback so external subscribers see the driver as sender. The engine's reader delegate points at the driver's ReadAsync (already handles lazy runtime init + caching via EnsureTagRuntimeAsync) — each poll tick batch-reads every subscribed tag in one IReadable call. 100ms interval floor inherited from PollGroupEngine.DefaultMinInterval matches Modbus convention. Initial-data push on first poll preserved via forceRaise=true. Exception-tolerant loop preserved — individual read failures show up as DataValueSnapshot with non-Good StatusCode via the status-code mapping PR 3 established. 7 new unit tests in AbCipSubscriptionTests covering initial-poll raising per tag, unchanged value raising only once, value change between polls triggering a new event, Unsubscribe halting the loop, 100ms floor keeping a 5ms request from generating extra events against a stable value, ShutdownAsync cancelling active subscriptions, UDT member subscription routing through the synthesised Motor1.Speed full-reference (proving PR 6's fan-out composes correctly with PR 7's subscription path). Total AbCip unit tests now 137/137 passing (+7 from PR 6's 130). Validates that the shared PollGroupEngine from PR 1 works correctly for a second driver, closing the original motivation for the extraction. Full solution builds 0 errors; Modbus + other drivers untouched.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-19 17:11:51 -04:00
parent 521bcb2f68
commit 33780eb64c
2 changed files with 205 additions and 2 deletions

View File

@@ -20,17 +20,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip;
/// from native-heap growth that the CLR allocator can't see; it tears down every
/// <see cref="PlcTagHandle"/> and reconnects each device.</para>
/// </remarks>
public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, IDisposable, IAsyncDisposable
public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable, IDisposable, IAsyncDisposable
{
private readonly AbCipDriverOptions _options;
private readonly string _driverInstanceId;
private readonly IAbCipTagFactory _tagFactory;
private readonly IAbCipTagEnumeratorFactory _enumeratorFactory;
private readonly AbCipTemplateCache _templateCache = new();
private readonly PollGroupEngine _poll;
private readonly Dictionary<string, DeviceState> _devices = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, AbCipTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
private DriverHealth _health = new(DriverState.Unknown, null, null);
public event EventHandler<DataChangeEventArgs>? OnDataChange;
public AbCipDriver(AbCipDriverOptions options, string driverInstanceId,
IAbCipTagFactory? tagFactory = null,
IAbCipTagEnumeratorFactory? enumeratorFactory = null)
@@ -40,6 +43,10 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
_driverInstanceId = driverInstanceId;
_tagFactory = tagFactory ?? new LibplctagTagFactory();
_enumeratorFactory = enumeratorFactory ?? new EmptyAbCipTagEnumeratorFactory();
_poll = new PollGroupEngine(
reader: ReadAsync,
onChange: (handle, tagRef, snapshot) =>
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)));
}
/// <summary>Shared UDT template cache. Exposed for PR 6 (UDT reader) + diagnostics.</summary>
@@ -98,13 +105,25 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false);
}
public Task ShutdownAsync(CancellationToken cancellationToken)
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
await _poll.DisposeAsync().ConfigureAwait(false);
foreach (var state in _devices.Values)
state.DisposeHandles();
_devices.Clear();
_tagsByName.Clear();
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
}
// ---- ISubscribable (polling overlay via shared engine) ----
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval));
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
_poll.Unsubscribe(handle);
return Task.CompletedTask;
}