From 162c82b8d9a998ce0124d8c2520aa5f3e4e87c19 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 01:03:00 -0400 Subject: [PATCH] =?UTF-8?q?Auto:=20s7-c3=20=E2=80=94=20per-tag=20scan=20gr?= =?UTF-8?q?oup=20/=20publish=20rate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #296 --- docs/v2/s7.md | 95 ++++++ src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs | 185 ++++++++-- .../S7DriverFactoryExtensions.cs | 37 +- .../S7DriverOptions.cs | 29 +- .../S7_1500/S7_1500ScanGroupTests.cs | 96 ++++++ .../S7ScanGroupPartitioningTests.cs | 320 ++++++++++++++++++ 6 files changed, 736 insertions(+), 26 deletions(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests/S7_1500/S7_1500ScanGroupTests.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7ScanGroupPartitioningTests.cs diff --git a/docs/v2/s7.md b/docs/v2/s7.md index 36d81a2..81b51e7 100644 --- a/docs/v2/s7.md +++ b/docs/v2/s7.md @@ -573,6 +573,101 @@ S7 driver health without reaching for a Wireshark capture: The values render alongside Modbus / OPC UA Client metrics in the Admin UI driver-diagnostics panel — same RPC, same dashboard row layout. +### Per-tag scan groups + +Before PR-S7-C3, `ISubscribable.SubscribeAsync` took **one** publishing +interval and applied it to every tag in the input list. A site that wanted +mixed cadences — say a 100 ms HMI pulse, a 1 s dashboard tile, and a 10 s +slow-poll for trend data — had to issue **three separate subscribe calls**, +each with its own list of tags. That works, but it pushes the partitioning +problem up to the caller (the OPC UA address space layer) and means an +operator can't express "this tag is slow-poll" purely in driver config. + +PR-S7-C3 adds **per-tag scan groups** so a single `SubscribeAsync` call +naturally splits into N independent poll loops: + +- `S7TagDefinition.ScanGroup` (string, optional) — the group identifier the + tag belongs to. Tags with no group (or with a group not declared in the + rate map below) keep the legacy behaviour and inherit the + subscription-default publishing interval. +- `S7DriverOptions.ScanGroupIntervals` (`IReadOnlyDictionary`, + optional) — the rate map. Group names are matched case-insensitively. Any + group with a non-positive interval (≤ 0 ms) is silently dropped at config + load and tags falling back to that group land in the default partition. + +At subscribe time the driver buckets the input tag list by **resolved +publishing interval** (per-tag group → map lookup → fallback to the +subscription default), then spins up one background poll loop per distinct +interval. Each loop owns its own `CancellationTokenSource` and its own +`LastValues` cache; `UnsubscribeAsync` cancels and disposes every per-group +loop together so a multi-rate subscription can't leak background tasks. + +#### JSON config example + +```json +{ + "Host": "10.0.0.50", + "ScanGroupIntervalsMs": { + "Fast": 100, + "Medium": 1000, + "Slow": 10000 + }, + "Tags": [ + { "Name": "PressureSetpoint", "Address": "DB1.DBW0", "DataType": "Int16", "ScanGroup": "Fast" }, + { "Name": "BatchTotal", "Address": "DB1.DBD10", "DataType": "Int32", "ScanGroup": "Medium" }, + { "Name": "TrendBucket", "Address": "DB1.DBD20", "DataType": "Float32", "ScanGroup": "Slow" } + ] +} +``` + +A single `SubscribeAsync(["PressureSetpoint","BatchTotal","TrendBucket"], 1s)` +call against this driver produces **three independent poll loops** — +the fast HMI tag ticks at 100 ms, the dashboard tile at 1 s, the trend +bucket at 10 s. The caller-supplied 1 s default is unused because every +tag carries an explicit group. + +#### 100 ms floor applies per partition + +The `100 ms` floor that protects the S7 mailbox from sub-scan polling +applies to **both** the subscription default **and** every per-group rate. +A typo'd entry like `{"TooFast": 25}` is silently floored to 100 ms at +partition-build time — the driver never schedules a sub-100 ms `Task.Delay` +even if the operator tries. + +#### `_gate` contention caveat — "1 connection / 1 mailbox" + +Partitioning into N poll loops does **not** parallelise wire-level reads. +S7netplus's documented pattern is one `Plc` instance per CPU, and the +driver enforces that with a per-instance `SemaphoreSlim` (`_gate`) that +every read takes before touching the socket. All N partitions share the +same gate, so the **mailbox is still strictly serial** — what the multi-rate +split actually buys you is **cadence decoupling**: + +- Before PR-S7-C3: every tag ticked at the slowest configured interval (or + required three separate subscribe calls and three separate logical + subscription handles, complicating the address-space layer). +- After PR-S7-C3: a 100 ms HMI tag isn't blocked behind a 10 s slow-poll + batch's `Task.Delay`. While Slow is sleeping, the gate is free and Fast + acquires it, polls, releases. The CPU sees more frequent small requests + rather than infrequent large ones — which is what you want for a + responsive HMI surface. + +The caveat to be aware of: if Fast's per-tick read takes longer than its +tick interval (e.g. 100 ms tick but 200 ms gate-held read because Medium +or Slow happens to be mid-read on the gate), Fast's effective cadence +slows to "as fast as the gate lets me." That's a property of S7netplus's +single-connection design, not of partitioning — three separate driver +instances against the same CPU would just waste the CPU's +8-64-connection-resource budget without speeding anything up. + +#### Diagnostics + +Partition counts aren't yet surfaced under +`DriverHealth.Diagnostics` (planned for a follow-up alongside per-partition +tick rate). Tests can call the internal helpers `S7Driver.GetPartitionCount` +and `S7Driver.GetPartitionSummary` to inspect the resolved partitioning of +a live subscription handle. + ## TSAP / Connection Type S7comm runs on top of ISO-on-TCP (RFC 1006), and the COTP connection-request diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs index 49633e8..287a67a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs @@ -215,10 +215,15 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) _probeCts?.Dispose(); _probeCts = null; + // PR-S7-C3 — every subscription owns N partition CTSs; tear them all down so a + // shutdown mid-poll doesn't leave background tasks running against a closed Plc. foreach (var state in _subscriptions.Values) { - try { state.Cts.Cancel(); } catch { } - state.Cts.Dispose(); + foreach (var part in state.Partitions) + { + try { part.Cts.Cancel(); } catch { } + part.Cts.Dispose(); + } } _subscriptions.Clear(); @@ -1160,20 +1165,66 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) // ---- ISubscribable (polling overlay) ---- + /// + /// PR-S7-C3 — partitions by resolved publishing + /// interval (per-tag looked up in + /// , falling back to + /// ) and starts one background poll loop per + /// distinct interval. The returned is one logical + /// subscription that owns N partition loops; tears + /// them all down together. + /// + /// + /// Each partition shares the per-driver _gate semaphore, so wire-level reads + /// stay strictly serial — the multi-rate split decouples tick cadence (a fast HMI tag + /// isn't blocked behind a slow batch's Task.Delay) but does NOT parallelise + /// mailbox traffic. The "1 connection / 1 mailbox" caveat is documented in + /// docs/v2/s7.md. + /// public Task SubscribeAsync( IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) { var id = Interlocked.Increment(ref _nextSubscriptionId); - var cts = new CancellationTokenSource(); - // Floor at 100 ms — S7 CPUs scan 2-10 ms but the comms mailbox is processed at most - // once per scan; sub-100 ms polling just queues wire-side with worse latency. - var interval = publishingInterval < TimeSpan.FromMilliseconds(100) - ? TimeSpan.FromMilliseconds(100) - : publishingInterval; var handle = new S7SubscriptionHandle(id); - var state = new SubscriptionState(handle, [.. fullReferences], interval, cts); + + // Floor at 100 ms — S7 CPUs scan 2-10 ms but the comms mailbox is processed at most + // once per scan; sub-100 ms polling just queues wire-side with worse latency. The + // floor applies to BOTH the subscribe-default interval AND any per-group override + // so a misconfigured group can't slip below the protective bound. + var defaultInterval = ApplyMinInterval(publishingInterval); + + // Bucket tags by resolved interval. Tags with no ScanGroup, or with a group not in + // the rate map, fall back to the subscription-default rate. This preserves the + // legacy single-rate path: an unconfigured driver gets exactly one partition. + var partitions = new Dictionary>(); + foreach (var tagRef in fullReferences) + { + var interval = ResolveInterval(tagRef, defaultInterval); + if (!partitions.TryGetValue(interval, out var list)) + { + list = []; + partitions[interval] = list; + } + list.Add(tagRef); + } + + var partitionStates = new List(partitions.Count); + foreach (var (interval, refs) in partitions) + { + var partCts = new CancellationTokenSource(); + var part = new PartitionState(refs, interval, partCts); + partitionStates.Add(part); + } + + var state = new SubscriptionState(handle, [.. fullReferences], defaultInterval, partitionStates); _subscriptions[id] = state; - _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token); + + // Start each partition loop AFTER the state is registered so an early UnsubscribeAsync + // (e.g. the OPC UA stack tearing the session down on session cancel) doesn't race + // ahead of the partitions' Task.Run kickoff. + foreach (var part in partitionStates) + _ = Task.Run(() => PollLoopAsync(handle, part, part.Cts.Token), part.Cts.Token); + return Task.FromResult(handle); } @@ -1181,49 +1232,135 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) { if (handle is S7SubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state)) { - state.Cts.Cancel(); - state.Cts.Dispose(); + foreach (var part in state.Partitions) + { + try { part.Cts.Cancel(); } catch { } + part.Cts.Dispose(); + } } return Task.CompletedTask; } - private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct) + /// + /// Apply the 100 ms floor to a caller-supplied publishing interval. Internal so + /// can guard both the default + every per-group rate. + /// + private static TimeSpan ApplyMinInterval(TimeSpan requested) => + requested < TimeSpan.FromMilliseconds(100) ? TimeSpan.FromMilliseconds(100) : requested; + + /// + /// Resolve the publishing interval for one tag — + /// wins when the tag's is present, otherwise + /// fall back to the subscription default. Unknown tags (not in the driver's map) + /// fall back to the default — the poll loop will surface them as BadNodeIdUnknown + /// anyway via . + /// + internal TimeSpan ResolveInterval(string tagRef, TimeSpan defaultInterval) { - // Initial-data push per OPC UA Part 4 convention. - try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); } + if (_options.ScanGroupIntervals is { Count: > 0 } map && + _tagsByName.TryGetValue(tagRef, out var def) && + !string.IsNullOrWhiteSpace(def.ScanGroup) && + // Case-insensitive lookup: scan group names come from human-typed config + // and the JSON DTO already lower-cases the lookup, so don't make ScanGroup + // values case-sensitive at runtime either. + TryGetCaseInsensitive(map, def.ScanGroup!, out var groupInterval)) + { + return ApplyMinInterval(groupInterval); + } + return defaultInterval; + } + + private static bool TryGetCaseInsensitive(IReadOnlyDictionary map, string key, out TimeSpan value) + { + if (map.TryGetValue(key, out value)) return true; + foreach (var kvp in map) + if (string.Equals(kvp.Key, key, StringComparison.OrdinalIgnoreCase)) + { + value = kvp.Value; + return true; + } + value = default; + return false; + } + + /// + /// Test-only: count of distinct partition loops a subscription handle owns. Used by + /// S7ScanGroupPartitioningTests to assert that 3 tags at 3 rates produce 3 + /// partitions (and 3 tags at 1 rate produce 1 partition). + /// + internal int GetPartitionCount(ISubscriptionHandle handle) => + handle is S7SubscriptionHandle h && _subscriptions.TryGetValue(h.Id, out var state) + ? state.Partitions.Count + : 0; + + /// + /// Test-only: snapshot of the (interval, tag-count) pairs for a subscription's + /// partitions. Surfaces the actual partitioning so tests can assert "5 tags split + /// 2 + 3" without grepping the poll-loop internals. + /// + internal IReadOnlyList<(TimeSpan Interval, int TagCount)> GetPartitionSummary(ISubscriptionHandle handle) => + handle is S7SubscriptionHandle h && _subscriptions.TryGetValue(h.Id, out var state) + ? [.. state.Partitions.Select(p => (p.Interval, p.TagReferences.Count))] + : []; + + private async Task PollLoopAsync(S7SubscriptionHandle handle, PartitionState part, CancellationToken ct) + { + // Initial-data push per OPC UA Part 4 convention. Each partition does its own + // initial push: the OPC UA stack receives one DataChange per tag at subscribe time + // regardless of which partition the tag landed in. + try { await PollOnceAsync(handle, part, forceRaise: true, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } catch { /* first-read error — polling continues */ } while (!ct.IsCancellationRequested) { - try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); } + try { await Task.Delay(part.Interval, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } - try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); } + try { await PollOnceAsync(handle, part, forceRaise: false, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } catch { /* transient polling error — loop continues, health surface reflects it */ } } } - private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct) + private async Task PollOnceAsync(S7SubscriptionHandle handle, PartitionState part, bool forceRaise, CancellationToken ct) { - var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false); - for (var i = 0; i < state.TagReferences.Count; i++) + // ReadAsync takes _gate internally, which is what serialises every partition's + // wire traffic against the single S7 connection. Multiple partitions racing for + // the gate is fine — short-running ones get serviced inside the long ones' Delay + // window, which is exactly the cadence-decoupling we want from PR-S7-C3. + var snapshots = await ReadAsync(part.TagReferences, ct).ConfigureAwait(false); + for (var i = 0; i < part.TagReferences.Count; i++) { - var tagRef = state.TagReferences[i]; + var tagRef = part.TagReferences[i]; var current = snapshots[i]; - var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default; + var lastSeen = part.LastValues.TryGetValue(tagRef, out var prev) ? prev : default; if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode) { - state.LastValues[tagRef] = current; - OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current)); + part.LastValues[tagRef] = current; + OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, current)); } } } + /// + /// One subscription owns N partitions, one per distinct publishing interval. + /// is the original (unpartitioned) request preserved for + /// diagnostics — runtime polling is driven by . + /// private sealed record SubscriptionState( S7SubscriptionHandle Handle, + IReadOnlyList TagReferences, + TimeSpan DefaultInterval, + IReadOnlyList Partitions); + + /// + /// One poll loop's worth of state: the tags it owns, its tick interval, its + /// per-tag last-seen cache, and the CTS that / + /// trip. + /// + private sealed record PartitionState( IReadOnlyList TagReferences, TimeSpan Interval, CancellationTokenSource Cts) diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverFactoryExtensions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverFactoryExtensions.cs index fe360b3..e63cce8 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverFactoryExtensions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverFactoryExtensions.cs @@ -34,6 +34,22 @@ public static class S7DriverFactoryExtensions throw new InvalidOperationException( $"S7 driver config for '{driverInstanceId}' missing required Host"); + // PR-S7-C3 — translate ScanGroupIntervalsMs (string -> int ms) into the runtime + // string -> TimeSpan map. Skip any entry with a non-positive value rather than + // throwing, so a config typo (e.g. 0 ms) degrades to "fall back to default + // publishing interval" instead of breaking the whole driver init. + IReadOnlyDictionary? scanGroupMap = null; + if (dto.ScanGroupIntervalsMs is { Count: > 0 }) + { + var built = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var kvp in dto.ScanGroupIntervalsMs) + { + if (string.IsNullOrWhiteSpace(kvp.Key) || kvp.Value <= 0) continue; + built[kvp.Key] = TimeSpan.FromMilliseconds(kvp.Value); + } + if (built.Count > 0) scanGroupMap = built; + } + var options = new S7DriverOptions { Host = dto.Host!, @@ -57,6 +73,7 @@ public static class S7DriverFactoryExtensions fallback: TsapMode.Auto), LocalTsap = dto.LocalTsap, RemoteTsap = dto.RemoteTsap, + ScanGroupIntervals = scanGroupMap, }; return new S7Driver(options, driverInstanceId); @@ -72,7 +89,8 @@ public static class S7DriverFactoryExtensions tagName: t.Name), Writable: t.Writable ?? true, StringLength: t.StringLength ?? 254, - WriteIdempotent: t.WriteIdempotent ?? false); + WriteIdempotent: t.WriteIdempotent ?? false, + ScanGroup: string.IsNullOrWhiteSpace(t.ScanGroup) ? null : t.ScanGroup); private static T ParseEnum(string? raw, string driverInstanceId, string field, string? tagName = null, T? fallback = null) where T : struct, Enum @@ -122,6 +140,15 @@ public static class S7DriverFactoryExtensions /// Optional 16-bit remote TSAP override. Required (with ) when TsapMode = Other. public ushort? RemoteTsap { get; init; } + + /// + /// PR-S7-C3 — optional scan-group → publishing-interval (ms) map. Tags carrying + /// a matching string poll at the configured + /// rate; tags with no group, or with a group not present here, fall back to + /// the subscription default. Group names are matched case-insensitively. See + /// docs/v2/s7.md "Per-tag scan groups" section. + /// + public Dictionary? ScanGroupIntervalsMs { get; init; } } internal sealed class S7TagDto @@ -132,6 +159,14 @@ public static class S7DriverFactoryExtensions public bool? Writable { get; init; } public int? StringLength { get; init; } public bool? WriteIdempotent { get; init; } + + /// + /// PR-S7-C3 — optional scan-group identifier. Resolved against + /// at subscribe time. + /// Null / empty = no group (legacy behaviour, falls back to subscription + /// default publishing interval). + /// + public string? ScanGroup { get; init; } } internal sealed class S7ProbeDto diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs index 5224161..5afaec1 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs @@ -122,6 +122,23 @@ public sealed class S7DriverOptions /// non- mode. /// public ushort? RemoteTsap { get; init; } + + /// + /// PR-S7-C3 — per-tag scan-group → publishing-interval map. When a tag declares a + /// , + /// resolves its publishing rate by looking the group up in this dictionary; tags + /// without a group, or with a group not present here, fall back to the + /// subscription-default publishingInterval argument. Keys are matched + /// case-insensitively. See docs/v2/s7.md "Per-tag scan groups" section. + /// + /// + /// The driver still owns one Plc connection serialized through the per-driver + /// _gate, so partitioning into N poll loops does NOT parallelise wire-level + /// reads — every partition queues against the same semaphore. Operator value is + /// decoupling tick cadence: a 100 ms HMI tag isn't blocked behind a 10 s slow-poll + /// batch any more, because the slow batch's Task.Delay isn't holding the gate. + /// + public IReadOnlyDictionary? ScanGroupIntervals { get; init; } } /// @@ -227,6 +244,15 @@ public sealed class S7ProbeOptions /// they need bespoke layout handling and are tracked as a follow-up. Capped at 8000 to /// keep the byte-range request inside a single S7 PDU envelope. /// +/// +/// PR-S7-C3 — optional scan-group identifier. When set, SubscribeAsync looks up +/// the group's publishing interval in +/// and partitions the input tag list so all tags sharing that interval poll together +/// in a dedicated background loop. Tags with no ScanGroup, or with a group not +/// present in the map, fall back to the subscription's default publishing interval +/// (legacy single-rate behaviour). Group names are matched case-insensitively. See +/// docs/v2/s7.md "Per-tag scan groups" section. +/// public sealed record S7TagDefinition( string Name, string Address, @@ -234,7 +260,8 @@ public sealed record S7TagDefinition( bool Writable = true, int StringLength = 254, bool WriteIdempotent = false, - int? ElementCount = null); + int? ElementCount = null, + string? ScanGroup = null); public enum S7DataType { diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests/S7_1500/S7_1500ScanGroupTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests/S7_1500/S7_1500ScanGroupTests.cs new file mode 100644 index 0000000..fe888df --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests/S7_1500/S7_1500ScanGroupTests.cs @@ -0,0 +1,96 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests.S7_1500; + +/// +/// PR-S7-C3 — end-to-end coverage of per-tag scan-group partitioning. Subscribes three +/// tags at three publishing intervals (100 ms / 1 s / 10 s) against the python-snap7 +/// S7-1500 fixture and asserts each gets its own tick stream with counts proportional +/// to its rate. Scaffold only; runtime execution gated on the Snap7 fixture being up +/// in CI. +/// +[Collection(Snap7ServerCollection.Name)] +[Trait("Category", "Integration")] +[Trait("Device", "S7_1500")] +public sealed class S7_1500ScanGroupTests(Snap7ServerFixture sim) +{ + [Fact] + public async Task Driver_three_scan_groups_publish_independently() + { + if (sim.SkipReason is not null) Assert.Skip(sim.SkipReason); + + // Reuse the smoke profile but override Tags + ScanGroupIntervals so each tag + // lands in its own group. The tags themselves are already seeded by the snap7 + // fixture (DB1.DBW0, DB1.DBW10, DB1.DBD20 — same offsets the smoke tests use). + var baseOpts = S7_1500Profile.BuildOptions(sim.Host, sim.Port); + var options = new S7DriverOptions + { + Host = baseOpts.Host, + Port = baseOpts.Port, + CpuType = baseOpts.CpuType, + Rack = baseOpts.Rack, + Slot = baseOpts.Slot, + Timeout = baseOpts.Timeout, + Probe = baseOpts.Probe, + // Three groups, three rates. The 10s "Slow" group exercises the assertion that + // a slow batch's Task.Delay doesn't block faster partitions from polling — if + // the original (single-rate) implementation had been kept, every tag would + // tick at the slowest configured interval. + ScanGroupIntervals = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + ["Fast"] = TimeSpan.FromMilliseconds(100), + ["Medium"] = TimeSpan.FromSeconds(1), + ["Slow"] = TimeSpan.FromSeconds(10), + }, + Tags = + [ + new S7TagDefinition("FastProbe", "DB1.DBW0", S7DataType.UInt16, ScanGroup: "Fast"), + new S7TagDefinition("MediumI16", "DB1.DBW10", S7DataType.Int16, ScanGroup: "Medium"), + new S7TagDefinition("SlowI32", "DB1.DBD20", S7DataType.Int32, ScanGroup: "Slow"), + ], + }; + + await using var drv = new S7Driver(options, driverInstanceId: "s7-scangroups"); + await drv.InitializeAsync("{}", TestContext.Current.CancellationToken); + + // Per-tag tick counters. The OPC UA initial-data push fires once per tag at + // subscribe time regardless of partition (Part 4 contract), so we discount the + // first tick before evaluating the rate ratio. + var ticks = new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + drv.OnDataChange += (_, e) => ticks.AddOrUpdate(e.FullReference, 1, (_, n) => n + 1); + + var handle = await drv.SubscribeAsync( + ["FastProbe", "MediumI16", "SlowI32"], + TimeSpan.FromSeconds(1), // default; ignored because every tag carries a group + TestContext.Current.CancellationToken); + + // Three groups → three partitions. This is the strongest claim of the PR: the + // driver split the input list into one poll loop per distinct interval. + drv.GetPartitionCount(handle).ShouldBe(3, "three distinct rates → three independent poll loops"); + + // Run for ~3 s and capture tick counts. With a 100 ms partition, ~30 ticks expected + // (minus the initial-data push, plus jitter). With a 1 s partition, ~3 ticks. With + // a 10 s partition, only the initial-data push fires inside the window. + await Task.Delay(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + + // Discount the initial-data push before checking ratios. After the discount, Fast + // must have produced strictly more ticks than Medium, and Medium must have + // produced at least one tick (Slow stays at 0 inside the 3-second window). + var fastSubsequent = Math.Max(0, (ticks.GetValueOrDefault("FastProbe", 0)) - 1); + var mediumSubsequent = Math.Max(0, (ticks.GetValueOrDefault("MediumI16", 0)) - 1); + + // Loose lower bound on Fast — wall-clock jitter on CI runners makes tighter bounds + // flaky. Anything above ~10 ticks in 3 s proves the 100 ms partition is actually + // running (i.e. it's not blocked behind the 10 s slow partition). + fastSubsequent.ShouldBeGreaterThan(10, + $"100 ms partition should fire many times in 3 s; observed Fast={fastSubsequent}, Medium={mediumSubsequent}"); + // Strict ordering — the whole point of partitioning is that Fast > Medium even + // when Slow is sitting on a 10 s Task.Delay. + fastSubsequent.ShouldBeGreaterThan(mediumSubsequent, + "Fast partition (100 ms) must out-tick Medium partition (1 s) — partitions are independent"); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7ScanGroupPartitioningTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7ScanGroupPartitioningTests.cs new file mode 100644 index 0000000..8e97cee --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7ScanGroupPartitioningTests.cs @@ -0,0 +1,320 @@ +using System.Reflection; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.S7.Tests; + +/// +/// PR-S7-C3 — unit coverage for partitioning by +/// resolved publishing interval. Each test wires a small tag map + scan-group rate map +/// and asserts the driver spins up exactly the expected number of internal poll loops +/// by inspecting the test-only GetPartitionCount / GetPartitionSummary +/// entry points. +/// +/// +/// These tests don't initialise the driver (no live PLC) and don't drive ticks — they +/// just verify the partitioning math at SubscribeAsync time. End-to-end tick +/// cadence is covered by the integration smoke test +/// Driver_three_scan_groups_publish_independently. +/// +[Trait("Category", "Unit")] +public sealed class S7ScanGroupPartitioningTests +{ + /// + /// resolves a tag's interval against its private + /// _tagsByName dictionary, populated in InitializeAsync. The unit + /// tests don't run init (no live PLC), so seed the dictionary directly via + /// reflection. Mirrors the pattern in S7DiscoveryAndSubscribeTests which + /// also exercises pre-init code paths. + /// + private static void SeedTagMap(S7Driver drv, params S7TagDefinition[] tags) + { + var field = typeof(S7Driver).GetField("_tagsByName", BindingFlags.NonPublic | BindingFlags.Instance); + field.ShouldNotBeNull(); + var map = (Dictionary)field!.GetValue(drv)!; + foreach (var t in tags) map[t.Name] = t; + } + + [Fact] + public async Task Three_distinct_intervals_produce_three_partitions() + { + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Probe = new S7ProbeOptions { Enabled = false }, + ScanGroupIntervals = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + ["Fast"] = TimeSpan.FromMilliseconds(100), + ["Medium"] = TimeSpan.FromSeconds(1), + ["Slow"] = TimeSpan.FromSeconds(10), + }, + }; + using var drv = new S7Driver(opts, "s7-3rates"); + SeedTagMap(drv, + new S7TagDefinition("FastTag", "DB1.DBW0", S7DataType.Int16, ScanGroup: "Fast"), + new S7TagDefinition("MediumTag", "DB1.DBW2", S7DataType.Int16, ScanGroup: "Medium"), + new S7TagDefinition("SlowTag", "DB1.DBW4", S7DataType.Int16, ScanGroup: "Slow")); + + var handle = await drv.SubscribeAsync( + ["FastTag", "MediumTag", "SlowTag"], + TimeSpan.FromSeconds(1), + TestContext.Current.CancellationToken); + + drv.GetPartitionCount(handle).ShouldBe(3, "3 distinct intervals → 3 separate poll loops"); + + var summary = drv.GetPartitionSummary(handle); + // Every partition owns exactly one tag — perfect 1:1 mapping. + summary.Count.ShouldBe(3); + summary.ShouldAllBe(p => p.TagCount == 1); + summary.Select(p => p.Interval).Order().ShouldBe(new[] + { + TimeSpan.FromMilliseconds(100), + TimeSpan.FromSeconds(1), + TimeSpan.FromSeconds(10), + }); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + } + + [Fact] + public async Task Same_interval_for_every_tag_collapses_to_one_partition() + { + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Probe = new S7ProbeOptions { Enabled = false }, + ScanGroupIntervals = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + ["Default"] = TimeSpan.FromMilliseconds(500), + }, + }; + using var drv = new S7Driver(opts, "s7-1rate"); + SeedTagMap(drv, + new S7TagDefinition("A", "DB1.DBW0", S7DataType.Int16, ScanGroup: "Default"), + new S7TagDefinition("B", "DB1.DBW2", S7DataType.Int16, ScanGroup: "Default"), + new S7TagDefinition("C", "DB1.DBW4", S7DataType.Int16, ScanGroup: "Default")); + + var handle = await drv.SubscribeAsync( + ["A", "B", "C"], + TimeSpan.FromMilliseconds(500), + TestContext.Current.CancellationToken); + + drv.GetPartitionCount(handle).ShouldBe(1, "all three tags share the same group → single poll loop"); + drv.GetPartitionSummary(handle)[0].TagCount.ShouldBe(3); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + } + + [Fact] + public async Task Mixed_two_at_100ms_and_three_at_1s_produces_two_partitions_with_correct_counts() + { + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Probe = new S7ProbeOptions { Enabled = false }, + ScanGroupIntervals = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + ["Hmi"] = TimeSpan.FromMilliseconds(100), + ["Slow"] = TimeSpan.FromSeconds(1), + }, + }; + using var drv = new S7Driver(opts, "s7-mixed"); + SeedTagMap(drv, + new S7TagDefinition("Hmi1", "DB1.DBW0", S7DataType.Int16, ScanGroup: "Hmi"), + new S7TagDefinition("Hmi2", "DB1.DBW2", S7DataType.Int16, ScanGroup: "Hmi"), + new S7TagDefinition("Slow1", "DB1.DBW10", S7DataType.Int16, ScanGroup: "Slow"), + new S7TagDefinition("Slow2", "DB1.DBW12", S7DataType.Int16, ScanGroup: "Slow"), + new S7TagDefinition("Slow3", "DB1.DBW14", S7DataType.Int16, ScanGroup: "Slow")); + + var handle = await drv.SubscribeAsync( + ["Hmi1", "Hmi2", "Slow1", "Slow2", "Slow3"], + TimeSpan.FromMilliseconds(500), + TestContext.Current.CancellationToken); + + drv.GetPartitionCount(handle).ShouldBe(2); + + var summary = drv.GetPartitionSummary(handle); + var fast = summary.Single(p => p.Interval == TimeSpan.FromMilliseconds(100)); + var slow = summary.Single(p => p.Interval == TimeSpan.FromSeconds(1)); + fast.TagCount.ShouldBe(2, "Hmi1 + Hmi2 share the 100 ms partition"); + slow.TagCount.ShouldBe(3, "Slow1 + Slow2 + Slow3 share the 1 s partition"); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + } + + [Fact] + public async Task Tags_without_scan_group_fall_back_to_subscription_default_interval() + { + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Probe = new S7ProbeOptions { Enabled = false }, + // No ScanGroupIntervals map — every tag must resolve to the subscription default. + }; + using var drv = new S7Driver(opts, "s7-no-groups"); + SeedTagMap(drv, + new S7TagDefinition("Plain1", "DB1.DBW0", S7DataType.Int16), + new S7TagDefinition("Plain2", "DB1.DBW2", S7DataType.Int16)); + + var handle = await drv.SubscribeAsync( + ["Plain1", "Plain2"], + TimeSpan.FromMilliseconds(750), + TestContext.Current.CancellationToken); + + drv.GetPartitionCount(handle).ShouldBe(1, "no groups configured → legacy single-partition behaviour"); + drv.GetPartitionSummary(handle)[0].Interval.ShouldBe(TimeSpan.FromMilliseconds(750)); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + } + + [Fact] + public async Task Tag_with_unknown_scan_group_falls_back_to_subscription_default() + { + // Operator typo: tag declares "Fst" but the rate map has "Fast". The driver should + // NOT throw — instead the tag silently falls through to the subscription default, + // matching the "config typo degrades, doesn't break" stance from the factory layer. + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Probe = new S7ProbeOptions { Enabled = false }, + ScanGroupIntervals = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + ["Fast"] = TimeSpan.FromMilliseconds(100), + }, + }; + using var drv = new S7Driver(opts, "s7-typo"); + SeedTagMap(drv, + new S7TagDefinition("Typo", "DB1.DBW0", S7DataType.Int16, ScanGroup: "Fst"), + new S7TagDefinition("Real", "DB1.DBW2", S7DataType.Int16, ScanGroup: "Fast")); + + var handle = await drv.SubscribeAsync( + ["Typo", "Real"], + TimeSpan.FromSeconds(2), + TestContext.Current.CancellationToken); + + var summary = drv.GetPartitionSummary(handle); + summary.Count.ShouldBe(2); + summary.Single(p => p.Interval == TimeSpan.FromMilliseconds(100)).TagCount.ShouldBe(1); + summary.Single(p => p.Interval == TimeSpan.FromSeconds(2)).TagCount.ShouldBe(1); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + } + + [Fact] + public async Task Scan_group_lookup_is_case_insensitive() + { + // The factory DTO already lower-cases keys — runtime lookup must follow suit so a + // tag declaring "FAST" still resolves against a rate-map key of "Fast". + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Probe = new S7ProbeOptions { Enabled = false }, + ScanGroupIntervals = new Dictionary(StringComparer.Ordinal) + { + ["Fast"] = TimeSpan.FromMilliseconds(100), + }, + }; + using var drv = new S7Driver(opts, "s7-caseins"); + SeedTagMap(drv, + new S7TagDefinition("T", "DB1.DBW0", S7DataType.Int16, ScanGroup: "FAST")); + + var handle = await drv.SubscribeAsync( + ["T"], TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); + + drv.GetPartitionSummary(handle)[0].Interval.ShouldBe(TimeSpan.FromMilliseconds(100)); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + } + + [Fact] + public async Task Group_interval_below_100ms_is_floored_to_100ms() + { + // The 100 ms floor protects the S7 mailbox from sub-scan polling. It applies to + // BOTH the subscription default AND any per-group override — a mis-typed 50 ms + // group rate must NOT slip below the floor. + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Probe = new S7ProbeOptions { Enabled = false }, + ScanGroupIntervals = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + ["TooFast"] = TimeSpan.FromMilliseconds(25), + }, + }; + using var drv = new S7Driver(opts, "s7-floor-group"); + SeedTagMap(drv, + new S7TagDefinition("T", "DB1.DBW0", S7DataType.Int16, ScanGroup: "TooFast")); + + var handle = await drv.SubscribeAsync( + ["T"], TimeSpan.FromSeconds(1), TestContext.Current.CancellationToken); + + drv.GetPartitionSummary(handle)[0].Interval.ShouldBe(TimeSpan.FromMilliseconds(100)); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + } + + [Fact] + public async Task Unsubscribe_disposes_every_partition_engine() + { + // Every partition CTS must be cancelled + disposed so a multi-rate subscription + // doesn't leak background poll tasks on unsubscribe. Verified indirectly by the + // partition-count dropping to zero post-unsubscribe (the dictionary is purged) and + // by Dispose being safe to call again with no exception. + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Probe = new S7ProbeOptions { Enabled = false }, + ScanGroupIntervals = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + ["A"] = TimeSpan.FromMilliseconds(100), + ["B"] = TimeSpan.FromMilliseconds(200), + ["C"] = TimeSpan.FromMilliseconds(500), + }, + }; + using var drv = new S7Driver(opts, "s7-cleanup"); + SeedTagMap(drv, + new S7TagDefinition("Ta", "DB1.DBW0", S7DataType.Int16, ScanGroup: "A"), + new S7TagDefinition("Tb", "DB1.DBW2", S7DataType.Int16, ScanGroup: "B"), + new S7TagDefinition("Tc", "DB1.DBW4", S7DataType.Int16, ScanGroup: "C")); + + var handle = await drv.SubscribeAsync( + ["Ta", "Tb", "Tc"], + TimeSpan.FromSeconds(1), + TestContext.Current.CancellationToken); + + drv.GetPartitionCount(handle).ShouldBe(3); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + + drv.GetPartitionCount(handle).ShouldBe(0, "post-unsubscribe the subscription is purged from the dictionary"); + // Idempotent: a second unsubscribe must be a no-op, not throw. + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + } + + [Fact] + public async Task Legacy_single_rate_path_unchanged_when_no_tag_carries_a_scan_group() + { + // Sanity: existing deployments that don't set ScanGroup or ScanGroupIntervals must + // see exactly one partition with exactly the requested publishing interval — the + // PR is opt-in and zero-impact for legacy callers. + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Probe = new S7ProbeOptions { Enabled = false }, + }; + using var drv = new S7Driver(opts, "s7-legacy"); + SeedTagMap(drv, + new S7TagDefinition("L1", "DB1.DBW0", S7DataType.Int16), + new S7TagDefinition("L2", "DB1.DBW2", S7DataType.Int16)); + + var handle = await drv.SubscribeAsync( + ["L1", "L2"], TimeSpan.FromMilliseconds(250), TestContext.Current.CancellationToken); + + drv.GetPartitionCount(handle).ShouldBe(1); + drv.GetPartitionSummary(handle)[0].Interval.ShouldBe(TimeSpan.FromMilliseconds(250)); + drv.GetPartitionSummary(handle)[0].TagCount.ShouldBe(2); + + await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + } +} -- 2.49.1