Merge pull request '[s7] S7 — Per-tag scan group / publish rate' (#376) from auto/s7/PR-S7-C3 into auto/driver-gaps

This commit was merged in pull request #376.
This commit is contained in:
2026-04-26 01:05:39 -04:00
6 changed files with 736 additions and 26 deletions

View File

@@ -573,6 +573,101 @@ S7 driver health without reaching for a Wireshark capture:
The values render alongside Modbus / OPC UA Client metrics in the Admin
UI driver-diagnostics panel — same RPC, same dashboard row layout.
### Per-tag scan groups
Before PR-S7-C3, `ISubscribable.SubscribeAsync` took **one** publishing
interval and applied it to every tag in the input list. A site that wanted
mixed cadences — say a 100 ms HMI pulse, a 1 s dashboard tile, and a 10 s
slow-poll for trend data — had to issue **three separate subscribe calls**,
each with its own list of tags. That works, but it pushes the partitioning
problem up to the caller (the OPC UA address space layer) and means an
operator can't express "this tag is slow-poll" purely in driver config.
PR-S7-C3 adds **per-tag scan groups** so a single `SubscribeAsync` call
naturally splits into N independent poll loops:
- `S7TagDefinition.ScanGroup` (string, optional) — the group identifier the
tag belongs to. Tags with no group (or with a group not declared in the
rate map below) keep the legacy behaviour and inherit the
subscription-default publishing interval.
- `S7DriverOptions.ScanGroupIntervals` (`IReadOnlyDictionary<string, TimeSpan>`,
optional) — the rate map. Group names are matched case-insensitively. Any
group with a non-positive interval (≤ 0 ms) is silently dropped at config
load and tags falling back to that group land in the default partition.
At subscribe time the driver buckets the input tag list by **resolved
publishing interval** (per-tag group → map lookup → fallback to the
subscription default), then spins up one background poll loop per distinct
interval. Each loop owns its own `CancellationTokenSource` and its own
`LastValues` cache; `UnsubscribeAsync` cancels and disposes every per-group
loop together so a multi-rate subscription can't leak background tasks.
#### JSON config example
```json
{
"Host": "10.0.0.50",
"ScanGroupIntervalsMs": {
"Fast": 100,
"Medium": 1000,
"Slow": 10000
},
"Tags": [
{ "Name": "PressureSetpoint", "Address": "DB1.DBW0", "DataType": "Int16", "ScanGroup": "Fast" },
{ "Name": "BatchTotal", "Address": "DB1.DBD10", "DataType": "Int32", "ScanGroup": "Medium" },
{ "Name": "TrendBucket", "Address": "DB1.DBD20", "DataType": "Float32", "ScanGroup": "Slow" }
]
}
```
A single `SubscribeAsync(["PressureSetpoint","BatchTotal","TrendBucket"], 1s)`
call against this driver produces **three independent poll loops** —
the fast HMI tag ticks at 100 ms, the dashboard tile at 1 s, the trend
bucket at 10 s. The caller-supplied 1 s default is unused because every
tag carries an explicit group.
#### 100 ms floor applies per partition
The `100 ms` floor that protects the S7 mailbox from sub-scan polling
applies to **both** the subscription default **and** every per-group rate.
A typo'd entry like `{"TooFast": 25}` is silently floored to 100 ms at
partition-build time — the driver never schedules a sub-100 ms `Task.Delay`
even if the operator tries.
#### `_gate` contention caveat — "1 connection / 1 mailbox"
Partitioning into N poll loops does **not** parallelise wire-level reads.
S7netplus's documented pattern is one `Plc` instance per CPU, and the
driver enforces that with a per-instance `SemaphoreSlim` (`_gate`) that
every read takes before touching the socket. All N partitions share the
same gate, so the **mailbox is still strictly serial** — what the multi-rate
split actually buys you is **cadence decoupling**:
- Before PR-S7-C3: every tag ticked at the slowest configured interval (or
required three separate subscribe calls and three separate logical
subscription handles, complicating the address-space layer).
- After PR-S7-C3: a 100 ms HMI tag isn't blocked behind a 10 s slow-poll
batch's `Task.Delay`. While Slow is sleeping, the gate is free and Fast
acquires it, polls, releases. The CPU sees more frequent small requests
rather than infrequent large ones — which is what you want for a
responsive HMI surface.
The caveat to be aware of: if Fast's per-tick read takes longer than its
tick interval (e.g. 100 ms tick but 200 ms gate-held read because Medium
or Slow happens to be mid-read on the gate), Fast's effective cadence
slows to "as fast as the gate lets me." That's a property of S7netplus's
single-connection design, not of partitioning — three separate driver
instances against the same CPU would just waste the CPU's
8-64-connection-resource budget without speeding anything up.
#### Diagnostics
Partition counts aren't yet surfaced under
`DriverHealth.Diagnostics` (planned for a follow-up alongside per-partition
tick rate). Tests can call the internal helpers `S7Driver.GetPartitionCount`
and `S7Driver.GetPartitionSummary` to inspect the resolved partitioning of
a live subscription handle.
## TSAP / Connection Type
S7comm runs on top of ISO-on-TCP (RFC 1006), and the COTP connection-request

View File

@@ -215,10 +215,15 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
_probeCts?.Dispose();
_probeCts = null;
// PR-S7-C3 — every subscription owns N partition CTSs; tear them all down so a
// shutdown mid-poll doesn't leave background tasks running against a closed Plc.
foreach (var state in _subscriptions.Values)
{
try { state.Cts.Cancel(); } catch { }
state.Cts.Dispose();
foreach (var part in state.Partitions)
{
try { part.Cts.Cancel(); } catch { }
part.Cts.Dispose();
}
}
_subscriptions.Clear();
@@ -1160,20 +1165,66 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
// ---- ISubscribable (polling overlay) ----
/// <summary>
/// PR-S7-C3 — partitions <paramref name="fullReferences"/> by resolved publishing
/// interval (per-tag <see cref="S7TagDefinition.ScanGroup"/> looked up in
/// <see cref="S7DriverOptions.ScanGroupIntervals"/>, falling back to
/// <paramref name="publishingInterval"/>) and starts one background poll loop per
/// distinct interval. The returned <see cref="ISubscriptionHandle"/> is one logical
/// subscription that owns N partition loops; <see cref="UnsubscribeAsync"/> tears
/// them all down together.
/// </summary>
/// <remarks>
/// Each partition shares the per-driver <c>_gate</c> semaphore, so wire-level reads
/// stay strictly serial — the multi-rate split decouples tick cadence (a fast HMI tag
/// isn't blocked behind a slow batch's <c>Task.Delay</c>) but does NOT parallelise
/// mailbox traffic. The "1 connection / 1 mailbox" caveat is documented in
/// <c>docs/v2/s7.md</c>.
/// </remarks>
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
var id = Interlocked.Increment(ref _nextSubscriptionId);
var cts = new CancellationTokenSource();
// Floor at 100 ms — S7 CPUs scan 2-10 ms but the comms mailbox is processed at most
// once per scan; sub-100 ms polling just queues wire-side with worse latency.
var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
? TimeSpan.FromMilliseconds(100)
: publishingInterval;
var handle = new S7SubscriptionHandle(id);
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
// Floor at 100 ms — S7 CPUs scan 2-10 ms but the comms mailbox is processed at most
// once per scan; sub-100 ms polling just queues wire-side with worse latency. The
// floor applies to BOTH the subscribe-default interval AND any per-group override
// so a misconfigured group can't slip below the protective bound.
var defaultInterval = ApplyMinInterval(publishingInterval);
// Bucket tags by resolved interval. Tags with no ScanGroup, or with a group not in
// the rate map, fall back to the subscription-default rate. This preserves the
// legacy single-rate path: an unconfigured driver gets exactly one partition.
var partitions = new Dictionary<TimeSpan, List<string>>();
foreach (var tagRef in fullReferences)
{
var interval = ResolveInterval(tagRef, defaultInterval);
if (!partitions.TryGetValue(interval, out var list))
{
list = [];
partitions[interval] = list;
}
list.Add(tagRef);
}
var partitionStates = new List<PartitionState>(partitions.Count);
foreach (var (interval, refs) in partitions)
{
var partCts = new CancellationTokenSource();
var part = new PartitionState(refs, interval, partCts);
partitionStates.Add(part);
}
var state = new SubscriptionState(handle, [.. fullReferences], defaultInterval, partitionStates);
_subscriptions[id] = state;
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
// Start each partition loop AFTER the state is registered so an early UnsubscribeAsync
// (e.g. the OPC UA stack tearing the session down on session cancel) doesn't race
// ahead of the partitions' Task.Run kickoff.
foreach (var part in partitionStates)
_ = Task.Run(() => PollLoopAsync(handle, part, part.Cts.Token), part.Cts.Token);
return Task.FromResult<ISubscriptionHandle>(handle);
}
@@ -1181,49 +1232,135 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
{
if (handle is S7SubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
{
state.Cts.Cancel();
state.Cts.Dispose();
foreach (var part in state.Partitions)
{
try { part.Cts.Cancel(); } catch { }
part.Cts.Dispose();
}
}
return Task.CompletedTask;
}
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
/// <summary>
/// Apply the 100 ms floor to a caller-supplied publishing interval. Internal so
/// <see cref="SubscribeAsync"/> can guard both the default + every per-group rate.
/// </summary>
private static TimeSpan ApplyMinInterval(TimeSpan requested) =>
requested < TimeSpan.FromMilliseconds(100) ? TimeSpan.FromMilliseconds(100) : requested;
/// <summary>
/// Resolve the publishing interval for one tag — <see cref="S7DriverOptions.ScanGroupIntervals"/>
/// wins when the tag's <see cref="S7TagDefinition.ScanGroup"/> is present, otherwise
/// fall back to the subscription default. Unknown tags (not in the driver's map)
/// fall back to the default — the poll loop will surface them as BadNodeIdUnknown
/// anyway via <see cref="ReadAsync"/>.
/// </summary>
internal TimeSpan ResolveInterval(string tagRef, TimeSpan defaultInterval)
{
// Initial-data push per OPC UA Part 4 convention.
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
if (_options.ScanGroupIntervals is { Count: > 0 } map &&
_tagsByName.TryGetValue(tagRef, out var def) &&
!string.IsNullOrWhiteSpace(def.ScanGroup) &&
// Case-insensitive lookup: scan group names come from human-typed config
// and the JSON DTO already lower-cases the lookup, so don't make ScanGroup
// values case-sensitive at runtime either.
TryGetCaseInsensitive(map, def.ScanGroup!, out var groupInterval))
{
return ApplyMinInterval(groupInterval);
}
return defaultInterval;
}
private static bool TryGetCaseInsensitive(IReadOnlyDictionary<string, TimeSpan> map, string key, out TimeSpan value)
{
if (map.TryGetValue(key, out value)) return true;
foreach (var kvp in map)
if (string.Equals(kvp.Key, key, StringComparison.OrdinalIgnoreCase))
{
value = kvp.Value;
return true;
}
value = default;
return false;
}
/// <summary>
/// Test-only: count of distinct partition loops a subscription handle owns. Used by
/// <c>S7ScanGroupPartitioningTests</c> to assert that 3 tags at 3 rates produce 3
/// partitions (and 3 tags at 1 rate produce 1 partition).
/// </summary>
internal int GetPartitionCount(ISubscriptionHandle handle) =>
handle is S7SubscriptionHandle h && _subscriptions.TryGetValue(h.Id, out var state)
? state.Partitions.Count
: 0;
/// <summary>
/// Test-only: snapshot of the (interval, tag-count) pairs for a subscription's
/// partitions. Surfaces the actual partitioning so tests can assert "5 tags split
/// 2 + 3" without grepping the poll-loop internals.
/// </summary>
internal IReadOnlyList<(TimeSpan Interval, int TagCount)> GetPartitionSummary(ISubscriptionHandle handle) =>
handle is S7SubscriptionHandle h && _subscriptions.TryGetValue(h.Id, out var state)
? [.. state.Partitions.Select(p => (p.Interval, p.TagReferences.Count))]
: [];
private async Task PollLoopAsync(S7SubscriptionHandle handle, PartitionState part, CancellationToken ct)
{
// Initial-data push per OPC UA Part 4 convention. Each partition does its own
// initial push: the OPC UA stack receives one DataChange per tag at subscribe time
// regardless of which partition the tag landed in.
try { await PollOnceAsync(handle, part, forceRaise: true, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* first-read error — polling continues */ }
while (!ct.IsCancellationRequested)
{
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
try { await Task.Delay(part.Interval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
try { await PollOnceAsync(handle, part, forceRaise: false, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* transient polling error — loop continues, health surface reflects it */ }
}
}
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
private async Task PollOnceAsync(S7SubscriptionHandle handle, PartitionState part, bool forceRaise, CancellationToken ct)
{
var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
for (var i = 0; i < state.TagReferences.Count; i++)
// ReadAsync takes _gate internally, which is what serialises every partition's
// wire traffic against the single S7 connection. Multiple partitions racing for
// the gate is fine — short-running ones get serviced inside the long ones' Delay
// window, which is exactly the cadence-decoupling we want from PR-S7-C3.
var snapshots = await ReadAsync(part.TagReferences, ct).ConfigureAwait(false);
for (var i = 0; i < part.TagReferences.Count; i++)
{
var tagRef = state.TagReferences[i];
var tagRef = part.TagReferences[i];
var current = snapshots[i];
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
var lastSeen = part.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
{
state.LastValues[tagRef] = current;
OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
part.LastValues[tagRef] = current;
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, current));
}
}
}
/// <summary>
/// One subscription owns N partitions, one per distinct publishing interval.
/// <see cref="TagReferences"/> is the original (unpartitioned) request preserved for
/// diagnostics — runtime polling is driven by <see cref="Partitions"/>.
/// </summary>
private sealed record SubscriptionState(
S7SubscriptionHandle Handle,
IReadOnlyList<string> TagReferences,
TimeSpan DefaultInterval,
IReadOnlyList<PartitionState> Partitions);
/// <summary>
/// One poll loop's worth of state: the tags it owns, its tick interval, its
/// per-tag last-seen cache, and the CTS that <see cref="UnsubscribeAsync"/> /
/// <see cref="DisposeAsync"/> trip.
/// </summary>
private sealed record PartitionState(
IReadOnlyList<string> TagReferences,
TimeSpan Interval,
CancellationTokenSource Cts)

View File

@@ -34,6 +34,22 @@ public static class S7DriverFactoryExtensions
throw new InvalidOperationException(
$"S7 driver config for '{driverInstanceId}' missing required Host");
// PR-S7-C3 — translate ScanGroupIntervalsMs (string -> int ms) into the runtime
// string -> TimeSpan map. Skip any entry with a non-positive value rather than
// throwing, so a config typo (e.g. 0 ms) degrades to "fall back to default
// publishing interval" instead of breaking the whole driver init.
IReadOnlyDictionary<string, TimeSpan>? scanGroupMap = null;
if (dto.ScanGroupIntervalsMs is { Count: > 0 })
{
var built = new Dictionary<string, TimeSpan>(StringComparer.OrdinalIgnoreCase);
foreach (var kvp in dto.ScanGroupIntervalsMs)
{
if (string.IsNullOrWhiteSpace(kvp.Key) || kvp.Value <= 0) continue;
built[kvp.Key] = TimeSpan.FromMilliseconds(kvp.Value);
}
if (built.Count > 0) scanGroupMap = built;
}
var options = new S7DriverOptions
{
Host = dto.Host!,
@@ -57,6 +73,7 @@ public static class S7DriverFactoryExtensions
fallback: TsapMode.Auto),
LocalTsap = dto.LocalTsap,
RemoteTsap = dto.RemoteTsap,
ScanGroupIntervals = scanGroupMap,
};
return new S7Driver(options, driverInstanceId);
@@ -72,7 +89,8 @@ public static class S7DriverFactoryExtensions
tagName: t.Name),
Writable: t.Writable ?? true,
StringLength: t.StringLength ?? 254,
WriteIdempotent: t.WriteIdempotent ?? false);
WriteIdempotent: t.WriteIdempotent ?? false,
ScanGroup: string.IsNullOrWhiteSpace(t.ScanGroup) ? null : t.ScanGroup);
private static T ParseEnum<T>(string? raw, string driverInstanceId, string field,
string? tagName = null, T? fallback = null) where T : struct, Enum
@@ -122,6 +140,15 @@ public static class S7DriverFactoryExtensions
/// <summary>Optional 16-bit remote TSAP override. Required (with <see cref="LocalTsap"/>) when <c>TsapMode = Other</c>.</summary>
public ushort? RemoteTsap { get; init; }
/// <summary>
/// PR-S7-C3 — optional scan-group → publishing-interval (ms) map. Tags carrying
/// a matching <see cref="S7TagDto.ScanGroup"/> string poll at the configured
/// rate; tags with no group, or with a group not present here, fall back to
/// the subscription default. Group names are matched case-insensitively. See
/// <c>docs/v2/s7.md</c> "Per-tag scan groups" section.
/// </summary>
public Dictionary<string, int>? ScanGroupIntervalsMs { get; init; }
}
internal sealed class S7TagDto
@@ -132,6 +159,14 @@ public static class S7DriverFactoryExtensions
public bool? Writable { get; init; }
public int? StringLength { get; init; }
public bool? WriteIdempotent { get; init; }
/// <summary>
/// PR-S7-C3 — optional scan-group identifier. Resolved against
/// <see cref="S7DriverConfigDto.ScanGroupIntervalsMs"/> at subscribe time.
/// Null / empty = no group (legacy behaviour, falls back to subscription
/// default publishing interval).
/// </summary>
public string? ScanGroup { get; init; }
}
internal sealed class S7ProbeDto

View File

@@ -122,6 +122,23 @@ public sealed class S7DriverOptions
/// non-<see cref="TsapMode.Auto"/> mode.
/// </summary>
public ushort? RemoteTsap { get; init; }
/// <summary>
/// PR-S7-C3 — per-tag scan-group → publishing-interval map. When a tag declares a
/// <see cref="S7TagDefinition.ScanGroup"/>, <see cref="S7Driver.SubscribeAsync"/>
/// resolves its publishing rate by looking the group up in this dictionary; tags
/// without a group, or with a group not present here, fall back to the
/// subscription-default <c>publishingInterval</c> argument. Keys are matched
/// case-insensitively. See <c>docs/v2/s7.md</c> "Per-tag scan groups" section.
/// </summary>
/// <remarks>
/// The driver still owns one <c>Plc</c> connection serialized through the per-driver
/// <c>_gate</c>, so partitioning into N poll loops does NOT parallelise wire-level
/// reads — every partition queues against the same semaphore. Operator value is
/// decoupling tick cadence: a 100 ms HMI tag isn't blocked behind a 10 s slow-poll
/// batch any more, because the slow batch's <c>Task.Delay</c> isn't holding the gate.
/// </remarks>
public IReadOnlyDictionary<string, TimeSpan>? ScanGroupIntervals { get; init; }
}
/// <summary>
@@ -227,6 +244,15 @@ public sealed class S7ProbeOptions
/// they need bespoke layout handling and are tracked as a follow-up. Capped at 8000 to
/// keep the byte-range request inside a single S7 PDU envelope.
/// </param>
/// <param name="ScanGroup">
/// PR-S7-C3 — optional scan-group identifier. When set, <c>SubscribeAsync</c> looks up
/// the group's publishing interval in <see cref="S7DriverOptions.ScanGroupIntervals"/>
/// and partitions the input tag list so all tags sharing that interval poll together
/// in a dedicated background loop. Tags with no <c>ScanGroup</c>, or with a group not
/// present in the map, fall back to the subscription's default publishing interval
/// (legacy single-rate behaviour). Group names are matched case-insensitively. See
/// <c>docs/v2/s7.md</c> "Per-tag scan groups" section.
/// </param>
public sealed record S7TagDefinition(
string Name,
string Address,
@@ -234,7 +260,8 @@ public sealed record S7TagDefinition(
bool Writable = true,
int StringLength = 254,
bool WriteIdempotent = false,
int? ElementCount = null);
int? ElementCount = null,
string? ScanGroup = null);
public enum S7DataType
{

View File

@@ -0,0 +1,96 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests.S7_1500;
/// <summary>
/// PR-S7-C3 — end-to-end coverage of per-tag scan-group partitioning. Subscribes three
/// tags at three publishing intervals (100 ms / 1 s / 10 s) against the python-snap7
/// S7-1500 fixture and asserts each gets its own tick stream with counts proportional
/// to its rate. Scaffold only; runtime execution gated on the Snap7 fixture being up
/// in CI.
/// </summary>
[Collection(Snap7ServerCollection.Name)]
[Trait("Category", "Integration")]
[Trait("Device", "S7_1500")]
public sealed class S7_1500ScanGroupTests(Snap7ServerFixture sim)
{
[Fact]
public async Task Driver_three_scan_groups_publish_independently()
{
if (sim.SkipReason is not null) Assert.Skip(sim.SkipReason);
// Reuse the smoke profile but override Tags + ScanGroupIntervals so each tag
// lands in its own group. The tags themselves are already seeded by the snap7
// fixture (DB1.DBW0, DB1.DBW10, DB1.DBD20 — same offsets the smoke tests use).
var baseOpts = S7_1500Profile.BuildOptions(sim.Host, sim.Port);
var options = new S7DriverOptions
{
Host = baseOpts.Host,
Port = baseOpts.Port,
CpuType = baseOpts.CpuType,
Rack = baseOpts.Rack,
Slot = baseOpts.Slot,
Timeout = baseOpts.Timeout,
Probe = baseOpts.Probe,
// Three groups, three rates. The 10s "Slow" group exercises the assertion that
// a slow batch's Task.Delay doesn't block faster partitions from polling — if
// the original (single-rate) implementation had been kept, every tag would
// tick at the slowest configured interval.
ScanGroupIntervals = new Dictionary<string, TimeSpan>(StringComparer.OrdinalIgnoreCase)
{
["Fast"] = TimeSpan.FromMilliseconds(100),
["Medium"] = TimeSpan.FromSeconds(1),
["Slow"] = TimeSpan.FromSeconds(10),
},
Tags =
[
new S7TagDefinition("FastProbe", "DB1.DBW0", S7DataType.UInt16, ScanGroup: "Fast"),
new S7TagDefinition("MediumI16", "DB1.DBW10", S7DataType.Int16, ScanGroup: "Medium"),
new S7TagDefinition("SlowI32", "DB1.DBD20", S7DataType.Int32, ScanGroup: "Slow"),
],
};
await using var drv = new S7Driver(options, driverInstanceId: "s7-scangroups");
await drv.InitializeAsync("{}", TestContext.Current.CancellationToken);
// Per-tag tick counters. The OPC UA initial-data push fires once per tag at
// subscribe time regardless of partition (Part 4 contract), so we discount the
// first tick before evaluating the rate ratio.
var ticks = new System.Collections.Concurrent.ConcurrentDictionary<string, int>(StringComparer.OrdinalIgnoreCase);
drv.OnDataChange += (_, e) => ticks.AddOrUpdate(e.FullReference, 1, (_, n) => n + 1);
var handle = await drv.SubscribeAsync(
["FastProbe", "MediumI16", "SlowI32"],
TimeSpan.FromSeconds(1), // default; ignored because every tag carries a group
TestContext.Current.CancellationToken);
// Three groups → three partitions. This is the strongest claim of the PR: the
// driver split the input list into one poll loop per distinct interval.
drv.GetPartitionCount(handle).ShouldBe(3, "three distinct rates → three independent poll loops");
// Run for ~3 s and capture tick counts. With a 100 ms partition, ~30 ticks expected
// (minus the initial-data push, plus jitter). With a 1 s partition, ~3 ticks. With
// a 10 s partition, only the initial-data push fires inside the window.
await Task.Delay(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
// Discount the initial-data push before checking ratios. After the discount, Fast
// must have produced strictly more ticks than Medium, and Medium must have
// produced at least one tick (Slow stays at 0 inside the 3-second window).
var fastSubsequent = Math.Max(0, (ticks.GetValueOrDefault("FastProbe", 0)) - 1);
var mediumSubsequent = Math.Max(0, (ticks.GetValueOrDefault("MediumI16", 0)) - 1);
// Loose lower bound on Fast — wall-clock jitter on CI runners makes tighter bounds
// flaky. Anything above ~10 ticks in 3 s proves the 100 ms partition is actually
// running (i.e. it's not blocked behind the 10 s slow partition).
fastSubsequent.ShouldBeGreaterThan(10,
$"100 ms partition should fire many times in 3 s; observed Fast={fastSubsequent}, Medium={mediumSubsequent}");
// Strict ordering — the whole point of partitioning is that Fast > Medium even
// when Slow is sitting on a 10 s Task.Delay.
fastSubsequent.ShouldBeGreaterThan(mediumSubsequent,
"Fast partition (100 ms) must out-tick Medium partition (1 s) — partitions are independent");
}
}

View File

@@ -0,0 +1,320 @@
using System.Reflection;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.S7.Tests;
/// <summary>
/// PR-S7-C3 — unit coverage for <see cref="S7Driver.SubscribeAsync"/> partitioning by
/// resolved publishing interval. Each test wires a small tag map + scan-group rate map
/// and asserts the driver spins up exactly the expected number of internal poll loops
/// by inspecting the test-only <c>GetPartitionCount</c> / <c>GetPartitionSummary</c>
/// entry points.
/// </summary>
/// <remarks>
/// These tests don't initialise the driver (no live PLC) and don't drive ticks — they
/// just verify the partitioning math at <c>SubscribeAsync</c> time. End-to-end tick
/// cadence is covered by the integration smoke test
/// <c>Driver_three_scan_groups_publish_independently</c>.
/// </remarks>
[Trait("Category", "Unit")]
public sealed class S7ScanGroupPartitioningTests
{
/// <summary>
/// <see cref="S7Driver"/> resolves a tag's interval against its private
/// <c>_tagsByName</c> dictionary, populated in <c>InitializeAsync</c>. The unit
/// tests don't run init (no live PLC), so seed the dictionary directly via
/// reflection. Mirrors the pattern in <c>S7DiscoveryAndSubscribeTests</c> which
/// also exercises pre-init code paths.
/// </summary>
private static void SeedTagMap(S7Driver drv, params S7TagDefinition[] tags)
{
var field = typeof(S7Driver).GetField("_tagsByName", BindingFlags.NonPublic | BindingFlags.Instance);
field.ShouldNotBeNull();
var map = (Dictionary<string, S7TagDefinition>)field!.GetValue(drv)!;
foreach (var t in tags) map[t.Name] = t;
}
[Fact]
public async Task Three_distinct_intervals_produce_three_partitions()
{
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Probe = new S7ProbeOptions { Enabled = false },
ScanGroupIntervals = new Dictionary<string, TimeSpan>(StringComparer.OrdinalIgnoreCase)
{
["Fast"] = TimeSpan.FromMilliseconds(100),
["Medium"] = TimeSpan.FromSeconds(1),
["Slow"] = TimeSpan.FromSeconds(10),
},
};
using var drv = new S7Driver(opts, "s7-3rates");
SeedTagMap(drv,
new S7TagDefinition("FastTag", "DB1.DBW0", S7DataType.Int16, ScanGroup: "Fast"),
new S7TagDefinition("MediumTag", "DB1.DBW2", S7DataType.Int16, ScanGroup: "Medium"),
new S7TagDefinition("SlowTag", "DB1.DBW4", S7DataType.Int16, ScanGroup: "Slow"));
var handle = await drv.SubscribeAsync(
["FastTag", "MediumTag", "SlowTag"],
TimeSpan.FromSeconds(1),
TestContext.Current.CancellationToken);
drv.GetPartitionCount(handle).ShouldBe(3, "3 distinct intervals → 3 separate poll loops");
var summary = drv.GetPartitionSummary(handle);
// Every partition owns exactly one tag — perfect 1:1 mapping.
summary.Count.ShouldBe(3);
summary.ShouldAllBe(p => p.TagCount == 1);
summary.Select(p => p.Interval).Order().ShouldBe(new[]
{
TimeSpan.FromMilliseconds(100),
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(10),
});
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
}
[Fact]
public async Task Same_interval_for_every_tag_collapses_to_one_partition()
{
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Probe = new S7ProbeOptions { Enabled = false },
ScanGroupIntervals = new Dictionary<string, TimeSpan>(StringComparer.OrdinalIgnoreCase)
{
["Default"] = TimeSpan.FromMilliseconds(500),
},
};
using var drv = new S7Driver(opts, "s7-1rate");
SeedTagMap(drv,
new S7TagDefinition("A", "DB1.DBW0", S7DataType.Int16, ScanGroup: "Default"),
new S7TagDefinition("B", "DB1.DBW2", S7DataType.Int16, ScanGroup: "Default"),
new S7TagDefinition("C", "DB1.DBW4", S7DataType.Int16, ScanGroup: "Default"));
var handle = await drv.SubscribeAsync(
["A", "B", "C"],
TimeSpan.FromMilliseconds(500),
TestContext.Current.CancellationToken);
drv.GetPartitionCount(handle).ShouldBe(1, "all three tags share the same group → single poll loop");
drv.GetPartitionSummary(handle)[0].TagCount.ShouldBe(3);
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
}
[Fact]
public async Task Mixed_two_at_100ms_and_three_at_1s_produces_two_partitions_with_correct_counts()
{
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Probe = new S7ProbeOptions { Enabled = false },
ScanGroupIntervals = new Dictionary<string, TimeSpan>(StringComparer.OrdinalIgnoreCase)
{
["Hmi"] = TimeSpan.FromMilliseconds(100),
["Slow"] = TimeSpan.FromSeconds(1),
},
};
using var drv = new S7Driver(opts, "s7-mixed");
SeedTagMap(drv,
new S7TagDefinition("Hmi1", "DB1.DBW0", S7DataType.Int16, ScanGroup: "Hmi"),
new S7TagDefinition("Hmi2", "DB1.DBW2", S7DataType.Int16, ScanGroup: "Hmi"),
new S7TagDefinition("Slow1", "DB1.DBW10", S7DataType.Int16, ScanGroup: "Slow"),
new S7TagDefinition("Slow2", "DB1.DBW12", S7DataType.Int16, ScanGroup: "Slow"),
new S7TagDefinition("Slow3", "DB1.DBW14", S7DataType.Int16, ScanGroup: "Slow"));
var handle = await drv.SubscribeAsync(
["Hmi1", "Hmi2", "Slow1", "Slow2", "Slow3"],
TimeSpan.FromMilliseconds(500),
TestContext.Current.CancellationToken);
drv.GetPartitionCount(handle).ShouldBe(2);
var summary = drv.GetPartitionSummary(handle);
var fast = summary.Single(p => p.Interval == TimeSpan.FromMilliseconds(100));
var slow = summary.Single(p => p.Interval == TimeSpan.FromSeconds(1));
fast.TagCount.ShouldBe(2, "Hmi1 + Hmi2 share the 100 ms partition");
slow.TagCount.ShouldBe(3, "Slow1 + Slow2 + Slow3 share the 1 s partition");
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
}
[Fact]
public async Task Tags_without_scan_group_fall_back_to_subscription_default_interval()
{
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Probe = new S7ProbeOptions { Enabled = false },
// No ScanGroupIntervals map — every tag must resolve to the subscription default.
};
using var drv = new S7Driver(opts, "s7-no-groups");
SeedTagMap(drv,
new S7TagDefinition("Plain1", "DB1.DBW0", S7DataType.Int16),
new S7TagDefinition("Plain2", "DB1.DBW2", S7DataType.Int16));
var handle = await drv.SubscribeAsync(
["Plain1", "Plain2"],
TimeSpan.FromMilliseconds(750),
TestContext.Current.CancellationToken);
drv.GetPartitionCount(handle).ShouldBe(1, "no groups configured → legacy single-partition behaviour");
drv.GetPartitionSummary(handle)[0].Interval.ShouldBe(TimeSpan.FromMilliseconds(750));
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
}
[Fact]
public async Task Tag_with_unknown_scan_group_falls_back_to_subscription_default()
{
// Operator typo: tag declares "Fst" but the rate map has "Fast". The driver should
// NOT throw — instead the tag silently falls through to the subscription default,
// matching the "config typo degrades, doesn't break" stance from the factory layer.
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Probe = new S7ProbeOptions { Enabled = false },
ScanGroupIntervals = new Dictionary<string, TimeSpan>(StringComparer.OrdinalIgnoreCase)
{
["Fast"] = TimeSpan.FromMilliseconds(100),
},
};
using var drv = new S7Driver(opts, "s7-typo");
SeedTagMap(drv,
new S7TagDefinition("Typo", "DB1.DBW0", S7DataType.Int16, ScanGroup: "Fst"),
new S7TagDefinition("Real", "DB1.DBW2", S7DataType.Int16, ScanGroup: "Fast"));
var handle = await drv.SubscribeAsync(
["Typo", "Real"],
TimeSpan.FromSeconds(2),
TestContext.Current.CancellationToken);
var summary = drv.GetPartitionSummary(handle);
summary.Count.ShouldBe(2);
summary.Single(p => p.Interval == TimeSpan.FromMilliseconds(100)).TagCount.ShouldBe(1);
summary.Single(p => p.Interval == TimeSpan.FromSeconds(2)).TagCount.ShouldBe(1);
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
}
[Fact]
public async Task Scan_group_lookup_is_case_insensitive()
{
// The factory DTO already lower-cases keys — runtime lookup must follow suit so a
// tag declaring "FAST" still resolves against a rate-map key of "Fast".
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Probe = new S7ProbeOptions { Enabled = false },
ScanGroupIntervals = new Dictionary<string, TimeSpan>(StringComparer.Ordinal)
{
["Fast"] = TimeSpan.FromMilliseconds(100),
},
};
using var drv = new S7Driver(opts, "s7-caseins");
SeedTagMap(drv,
new S7TagDefinition("T", "DB1.DBW0", S7DataType.Int16, ScanGroup: "FAST"));
var handle = await drv.SubscribeAsync(
["T"], TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken);
drv.GetPartitionSummary(handle)[0].Interval.ShouldBe(TimeSpan.FromMilliseconds(100));
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
}
[Fact]
public async Task Group_interval_below_100ms_is_floored_to_100ms()
{
// The 100 ms floor protects the S7 mailbox from sub-scan polling. It applies to
// BOTH the subscription default AND any per-group override — a mis-typed 50 ms
// group rate must NOT slip below the floor.
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Probe = new S7ProbeOptions { Enabled = false },
ScanGroupIntervals = new Dictionary<string, TimeSpan>(StringComparer.OrdinalIgnoreCase)
{
["TooFast"] = TimeSpan.FromMilliseconds(25),
},
};
using var drv = new S7Driver(opts, "s7-floor-group");
SeedTagMap(drv,
new S7TagDefinition("T", "DB1.DBW0", S7DataType.Int16, ScanGroup: "TooFast"));
var handle = await drv.SubscribeAsync(
["T"], TimeSpan.FromSeconds(1), TestContext.Current.CancellationToken);
drv.GetPartitionSummary(handle)[0].Interval.ShouldBe(TimeSpan.FromMilliseconds(100));
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
}
[Fact]
public async Task Unsubscribe_disposes_every_partition_engine()
{
// Every partition CTS must be cancelled + disposed so a multi-rate subscription
// doesn't leak background poll tasks on unsubscribe. Verified indirectly by the
// partition-count dropping to zero post-unsubscribe (the dictionary is purged) and
// by Dispose being safe to call again with no exception.
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Probe = new S7ProbeOptions { Enabled = false },
ScanGroupIntervals = new Dictionary<string, TimeSpan>(StringComparer.OrdinalIgnoreCase)
{
["A"] = TimeSpan.FromMilliseconds(100),
["B"] = TimeSpan.FromMilliseconds(200),
["C"] = TimeSpan.FromMilliseconds(500),
},
};
using var drv = new S7Driver(opts, "s7-cleanup");
SeedTagMap(drv,
new S7TagDefinition("Ta", "DB1.DBW0", S7DataType.Int16, ScanGroup: "A"),
new S7TagDefinition("Tb", "DB1.DBW2", S7DataType.Int16, ScanGroup: "B"),
new S7TagDefinition("Tc", "DB1.DBW4", S7DataType.Int16, ScanGroup: "C"));
var handle = await drv.SubscribeAsync(
["Ta", "Tb", "Tc"],
TimeSpan.FromSeconds(1),
TestContext.Current.CancellationToken);
drv.GetPartitionCount(handle).ShouldBe(3);
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
drv.GetPartitionCount(handle).ShouldBe(0, "post-unsubscribe the subscription is purged from the dictionary");
// Idempotent: a second unsubscribe must be a no-op, not throw.
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
}
[Fact]
public async Task Legacy_single_rate_path_unchanged_when_no_tag_carries_a_scan_group()
{
// Sanity: existing deployments that don't set ScanGroup or ScanGroupIntervals must
// see exactly one partition with exactly the requested publishing interval — the
// PR is opt-in and zero-impact for legacy callers.
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Probe = new S7ProbeOptions { Enabled = false },
};
using var drv = new S7Driver(opts, "s7-legacy");
SeedTagMap(drv,
new S7TagDefinition("L1", "DB1.DBW0", S7DataType.Int16),
new S7TagDefinition("L2", "DB1.DBW2", S7DataType.Int16));
var handle = await drv.SubscribeAsync(
["L1", "L2"], TimeSpan.FromMilliseconds(250), TestContext.Current.CancellationToken);
drv.GetPartitionCount(handle).ShouldBe(1);
drv.GetPartitionSummary(handle)[0].Interval.ShouldBe(TimeSpan.FromMilliseconds(250));
drv.GetPartitionSummary(handle)[0].TagCount.ShouldBe(2);
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
}
}