@@ -215,10 +215,15 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
|
||||
_probeCts?.Dispose();
|
||||
_probeCts = null;
|
||||
|
||||
// PR-S7-C3 — every subscription owns N partition CTSs; tear them all down so a
|
||||
// shutdown mid-poll doesn't leave background tasks running against a closed Plc.
|
||||
foreach (var state in _subscriptions.Values)
|
||||
{
|
||||
try { state.Cts.Cancel(); } catch { }
|
||||
state.Cts.Dispose();
|
||||
foreach (var part in state.Partitions)
|
||||
{
|
||||
try { part.Cts.Cancel(); } catch { }
|
||||
part.Cts.Dispose();
|
||||
}
|
||||
}
|
||||
_subscriptions.Clear();
|
||||
|
||||
@@ -1160,20 +1165,66 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
|
||||
|
||||
// ---- ISubscribable (polling overlay) ----
|
||||
|
||||
/// <summary>
|
||||
/// PR-S7-C3 — partitions <paramref name="fullReferences"/> by resolved publishing
|
||||
/// interval (per-tag <see cref="S7TagDefinition.ScanGroup"/> looked up in
|
||||
/// <see cref="S7DriverOptions.ScanGroupIntervals"/>, falling back to
|
||||
/// <paramref name="publishingInterval"/>) and starts one background poll loop per
|
||||
/// distinct interval. The returned <see cref="ISubscriptionHandle"/> is one logical
|
||||
/// subscription that owns N partition loops; <see cref="UnsubscribeAsync"/> tears
|
||||
/// them all down together.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Each partition shares the per-driver <c>_gate</c> semaphore, so wire-level reads
|
||||
/// stay strictly serial — the multi-rate split decouples tick cadence (a fast HMI tag
|
||||
/// isn't blocked behind a slow batch's <c>Task.Delay</c>) but does NOT parallelise
|
||||
/// mailbox traffic. The "1 connection / 1 mailbox" caveat is documented in
|
||||
/// <c>docs/v2/s7.md</c>.
|
||||
/// </remarks>
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||
{
|
||||
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
||||
var cts = new CancellationTokenSource();
|
||||
// Floor at 100 ms — S7 CPUs scan 2-10 ms but the comms mailbox is processed at most
|
||||
// once per scan; sub-100 ms polling just queues wire-side with worse latency.
|
||||
var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
|
||||
? TimeSpan.FromMilliseconds(100)
|
||||
: publishingInterval;
|
||||
var handle = new S7SubscriptionHandle(id);
|
||||
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
|
||||
|
||||
// Floor at 100 ms — S7 CPUs scan 2-10 ms but the comms mailbox is processed at most
|
||||
// once per scan; sub-100 ms polling just queues wire-side with worse latency. The
|
||||
// floor applies to BOTH the subscribe-default interval AND any per-group override
|
||||
// so a misconfigured group can't slip below the protective bound.
|
||||
var defaultInterval = ApplyMinInterval(publishingInterval);
|
||||
|
||||
// Bucket tags by resolved interval. Tags with no ScanGroup, or with a group not in
|
||||
// the rate map, fall back to the subscription-default rate. This preserves the
|
||||
// legacy single-rate path: an unconfigured driver gets exactly one partition.
|
||||
var partitions = new Dictionary<TimeSpan, List<string>>();
|
||||
foreach (var tagRef in fullReferences)
|
||||
{
|
||||
var interval = ResolveInterval(tagRef, defaultInterval);
|
||||
if (!partitions.TryGetValue(interval, out var list))
|
||||
{
|
||||
list = [];
|
||||
partitions[interval] = list;
|
||||
}
|
||||
list.Add(tagRef);
|
||||
}
|
||||
|
||||
var partitionStates = new List<PartitionState>(partitions.Count);
|
||||
foreach (var (interval, refs) in partitions)
|
||||
{
|
||||
var partCts = new CancellationTokenSource();
|
||||
var part = new PartitionState(refs, interval, partCts);
|
||||
partitionStates.Add(part);
|
||||
}
|
||||
|
||||
var state = new SubscriptionState(handle, [.. fullReferences], defaultInterval, partitionStates);
|
||||
_subscriptions[id] = state;
|
||||
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
|
||||
|
||||
// Start each partition loop AFTER the state is registered so an early UnsubscribeAsync
|
||||
// (e.g. the OPC UA stack tearing the session down on session cancel) doesn't race
|
||||
// ahead of the partitions' Task.Run kickoff.
|
||||
foreach (var part in partitionStates)
|
||||
_ = Task.Run(() => PollLoopAsync(handle, part, part.Cts.Token), part.Cts.Token);
|
||||
|
||||
return Task.FromResult<ISubscriptionHandle>(handle);
|
||||
}
|
||||
|
||||
@@ -1181,49 +1232,135 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
|
||||
{
|
||||
if (handle is S7SubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
|
||||
{
|
||||
state.Cts.Cancel();
|
||||
state.Cts.Dispose();
|
||||
foreach (var part in state.Partitions)
|
||||
{
|
||||
try { part.Cts.Cancel(); } catch { }
|
||||
part.Cts.Dispose();
|
||||
}
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
|
||||
/// <summary>
|
||||
/// Apply the 100 ms floor to a caller-supplied publishing interval. Internal so
|
||||
/// <see cref="SubscribeAsync"/> can guard both the default + every per-group rate.
|
||||
/// </summary>
|
||||
private static TimeSpan ApplyMinInterval(TimeSpan requested) =>
|
||||
requested < TimeSpan.FromMilliseconds(100) ? TimeSpan.FromMilliseconds(100) : requested;
|
||||
|
||||
/// <summary>
|
||||
/// Resolve the publishing interval for one tag — <see cref="S7DriverOptions.ScanGroupIntervals"/>
|
||||
/// wins when the tag's <see cref="S7TagDefinition.ScanGroup"/> is present, otherwise
|
||||
/// fall back to the subscription default. Unknown tags (not in the driver's map)
|
||||
/// fall back to the default — the poll loop will surface them as BadNodeIdUnknown
|
||||
/// anyway via <see cref="ReadAsync"/>.
|
||||
/// </summary>
|
||||
internal TimeSpan ResolveInterval(string tagRef, TimeSpan defaultInterval)
|
||||
{
|
||||
// Initial-data push per OPC UA Part 4 convention.
|
||||
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
|
||||
if (_options.ScanGroupIntervals is { Count: > 0 } map &&
|
||||
_tagsByName.TryGetValue(tagRef, out var def) &&
|
||||
!string.IsNullOrWhiteSpace(def.ScanGroup) &&
|
||||
// Case-insensitive lookup: scan group names come from human-typed config
|
||||
// and the JSON DTO already lower-cases the lookup, so don't make ScanGroup
|
||||
// values case-sensitive at runtime either.
|
||||
TryGetCaseInsensitive(map, def.ScanGroup!, out var groupInterval))
|
||||
{
|
||||
return ApplyMinInterval(groupInterval);
|
||||
}
|
||||
return defaultInterval;
|
||||
}
|
||||
|
||||
private static bool TryGetCaseInsensitive(IReadOnlyDictionary<string, TimeSpan> map, string key, out TimeSpan value)
|
||||
{
|
||||
if (map.TryGetValue(key, out value)) return true;
|
||||
foreach (var kvp in map)
|
||||
if (string.Equals(kvp.Key, key, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
value = kvp.Value;
|
||||
return true;
|
||||
}
|
||||
value = default;
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Test-only: count of distinct partition loops a subscription handle owns. Used by
|
||||
/// <c>S7ScanGroupPartitioningTests</c> to assert that 3 tags at 3 rates produce 3
|
||||
/// partitions (and 3 tags at 1 rate produce 1 partition).
|
||||
/// </summary>
|
||||
internal int GetPartitionCount(ISubscriptionHandle handle) =>
|
||||
handle is S7SubscriptionHandle h && _subscriptions.TryGetValue(h.Id, out var state)
|
||||
? state.Partitions.Count
|
||||
: 0;
|
||||
|
||||
/// <summary>
|
||||
/// Test-only: snapshot of the (interval, tag-count) pairs for a subscription's
|
||||
/// partitions. Surfaces the actual partitioning so tests can assert "5 tags split
|
||||
/// 2 + 3" without grepping the poll-loop internals.
|
||||
/// </summary>
|
||||
internal IReadOnlyList<(TimeSpan Interval, int TagCount)> GetPartitionSummary(ISubscriptionHandle handle) =>
|
||||
handle is S7SubscriptionHandle h && _subscriptions.TryGetValue(h.Id, out var state)
|
||||
? [.. state.Partitions.Select(p => (p.Interval, p.TagReferences.Count))]
|
||||
: [];
|
||||
|
||||
private async Task PollLoopAsync(S7SubscriptionHandle handle, PartitionState part, CancellationToken ct)
|
||||
{
|
||||
// Initial-data push per OPC UA Part 4 convention. Each partition does its own
|
||||
// initial push: the OPC UA stack receives one DataChange per tag at subscribe time
|
||||
// regardless of which partition the tag landed in.
|
||||
try { await PollOnceAsync(handle, part, forceRaise: true, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* first-read error — polling continues */ }
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
|
||||
try { await Task.Delay(part.Interval, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
|
||||
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
|
||||
try { await PollOnceAsync(handle, part, forceRaise: false, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* transient polling error — loop continues, health surface reflects it */ }
|
||||
}
|
||||
}
|
||||
|
||||
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
|
||||
private async Task PollOnceAsync(S7SubscriptionHandle handle, PartitionState part, bool forceRaise, CancellationToken ct)
|
||||
{
|
||||
var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
|
||||
for (var i = 0; i < state.TagReferences.Count; i++)
|
||||
// ReadAsync takes _gate internally, which is what serialises every partition's
|
||||
// wire traffic against the single S7 connection. Multiple partitions racing for
|
||||
// the gate is fine — short-running ones get serviced inside the long ones' Delay
|
||||
// window, which is exactly the cadence-decoupling we want from PR-S7-C3.
|
||||
var snapshots = await ReadAsync(part.TagReferences, ct).ConfigureAwait(false);
|
||||
for (var i = 0; i < part.TagReferences.Count; i++)
|
||||
{
|
||||
var tagRef = state.TagReferences[i];
|
||||
var tagRef = part.TagReferences[i];
|
||||
var current = snapshots[i];
|
||||
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
|
||||
var lastSeen = part.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
|
||||
|
||||
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
|
||||
{
|
||||
state.LastValues[tagRef] = current;
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
|
||||
part.LastValues[tagRef] = current;
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, current));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// One subscription owns N partitions, one per distinct publishing interval.
|
||||
/// <see cref="TagReferences"/> is the original (unpartitioned) request preserved for
|
||||
/// diagnostics — runtime polling is driven by <see cref="Partitions"/>.
|
||||
/// </summary>
|
||||
private sealed record SubscriptionState(
|
||||
S7SubscriptionHandle Handle,
|
||||
IReadOnlyList<string> TagReferences,
|
||||
TimeSpan DefaultInterval,
|
||||
IReadOnlyList<PartitionState> Partitions);
|
||||
|
||||
/// <summary>
|
||||
/// One poll loop's worth of state: the tags it owns, its tick interval, its
|
||||
/// per-tag last-seen cache, and the CTS that <see cref="UnsubscribeAsync"/> /
|
||||
/// <see cref="DisposeAsync"/> trip.
|
||||
/// </summary>
|
||||
private sealed record PartitionState(
|
||||
IReadOnlyList<string> TagReferences,
|
||||
TimeSpan Interval,
|
||||
CancellationTokenSource Cts)
|
||||
|
||||
Reference in New Issue
Block a user