Merge pull request '[abcip] AbCip — Per-tag scan rate / scan group bucketing' (#381) from auto/abcip/4.1 into auto/driver-gaps
This commit was merged in pull request #381.
This commit is contained in:
@@ -104,3 +104,10 @@ For the warning *"AbCip device 'X' family 'Y' uses a narrow-buffer profile
|
||||
511-byte legacy-firmware cap..."* see
|
||||
[`docs/drivers/AbCip-Performance.md`](drivers/AbCip-Performance.md) — that
|
||||
warning is fired by the driver host, not the CLI.
|
||||
|
||||
## Related operability knobs
|
||||
|
||||
- [`docs/drivers/AbCip-Operability.md`](drivers/AbCip-Operability.md) — Phase 4
|
||||
per-tag knobs (per-tag scan rate, deadband, etc). The CLI does not expose
|
||||
these knobs directly; they're set in driver config JSON and consumed by the
|
||||
driver at subscribe time.
|
||||
|
||||
152
docs/drivers/AbCip-Operability.md
Normal file
152
docs/drivers/AbCip-Operability.md
Normal file
@@ -0,0 +1,152 @@
|
||||
# AB CIP — Operability knobs
|
||||
|
||||
Phase 4 of the AB CIP driver plan introduces operator-tunable behaviour that
|
||||
changes how the driver schedules per-tag traffic, deduplicates updates, and
|
||||
surfaces health — knobs that an operator typically reaches for *after* the
|
||||
address space is in place and the deployment is past the green-field phase.
|
||||
The Phase 3 doc (`AbCip-Performance.md`) covers connection-shape and
|
||||
read-strategy knobs; this doc is the home for the per-tag scheduling and
|
||||
operability levers as PRs land.
|
||||
|
||||
PR abcip-4.1 ships the first knob: per-tag **Scan Rate** (Kepware-parity scan
|
||||
classes).
|
||||
|
||||
## Per-tag scan rate
|
||||
|
||||
### What it is
|
||||
|
||||
A per-tag override of the OPC UA subscription's `publishingInterval`. The AB
|
||||
CIP driver mirrors the Galaxy hierarchy as a single OPC UA address space, so
|
||||
every tag served from one driver normally ticks at the publishing interval the
|
||||
client requested when it created the Subscription. This knob lets specific
|
||||
tags publish at a different cadence — fast HMI tags at 100 ms, batch /
|
||||
historian tags at 1–10 s — without forcing the operator to split tags into
|
||||
separate subscriptions or driver instances.
|
||||
|
||||
It is the Kepware "scan classes" model expressed per-tag. The same shape is
|
||||
already shipped in the S7 driver (`S7TagDefinition.ScanGroup`) and the AB
|
||||
Legacy / TwinCAT drivers; AB CIP adopts a leaner per-tag-only form because the
|
||||
CIP single-connection model means the practical knob a deployment reaches for
|
||||
is "this one tag, faster", not "every tag in this group".
|
||||
|
||||
### How it interacts with OPC UA publishingInterval
|
||||
|
||||
OPC UA semantics:
|
||||
|
||||
- The Subscription's `publishingInterval` is the *upper bound* on how often
|
||||
the server publishes a NotificationMessage. Each MonitoredItem also has its
|
||||
own `samplingInterval`; that's where this knob lands.
|
||||
- A per-tag `samplingInterval` shorter than the Subscription's
|
||||
`publishingInterval` means the server samples faster but only publishes at
|
||||
the next Subscription tick — clients may receive multiple values for one
|
||||
tag in a single Publish response.
|
||||
- A per-tag `samplingInterval` longer than the Subscription's
|
||||
`publishingInterval` is legal too — the server simply skips ticks for that
|
||||
tag.
|
||||
|
||||
AB CIP-side: the driver's `SubscribeAsync` receives one `publishingInterval`
|
||||
plus a list of tag references. With per-tag `ScanRateMs` it buckets the input
|
||||
list by resolved interval and registers one `PollGroupEngine` subscription per
|
||||
bucket. Each bucket runs an independent timer, so a 100 ms tag never waits
|
||||
for a 1000 ms tag's `Task.Delay` to expire.
|
||||
|
||||
### Override knob
|
||||
|
||||
`AbCipTagDefinition.ScanRateMs` (`int?`, default `null`). `null` = use the
|
||||
subscription's default `publishingInterval` (legacy behaviour). Bind via
|
||||
driver config JSON:
|
||||
|
||||
```json
|
||||
{
|
||||
"Tags": [
|
||||
{
|
||||
"Name": "Motor1.Speed",
|
||||
"DeviceHostAddress": "ab://10.0.0.5/1,0",
|
||||
"TagPath": "Motor1.Speed",
|
||||
"DataType": "DInt",
|
||||
"ScanRateMs": 100
|
||||
},
|
||||
{
|
||||
"Name": "Motor1.RunHours",
|
||||
"DeviceHostAddress": "ab://10.0.0.5/1,0",
|
||||
"TagPath": "Motor1.RunHours",
|
||||
"DataType": "DInt",
|
||||
"ScanRateMs": 5000
|
||||
},
|
||||
{
|
||||
"Name": "Motor1.NamePlate",
|
||||
"DeviceHostAddress": "ab://10.0.0.5/1,0",
|
||||
"TagPath": "Motor1.NamePlate",
|
||||
"DataType": "String"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Result: three buckets — 100 ms, 5000 ms, and the subscription default for
|
||||
`NamePlate`. UDT members inherit the parent tag's `ScanRateMs` at fan-out
|
||||
time, so a UDT declared at 100 ms publishes every member at 100 ms without
|
||||
the operator having to repeat the override on each member.
|
||||
|
||||
### Floor and degenerate cases
|
||||
|
||||
- `PollGroupEngine` floors every bucket at **100 ms** — a `ScanRateMs: 25`
|
||||
is clamped up. The floor matches the Modbus / S7 / TwinCAT floors and
|
||||
protects the wire from sub-mailbox-scan polling.
|
||||
- `ScanRateMs: 0` and negative values are treated as unset — the tag falls
|
||||
back to the subscription default. Mis-typed config degrades, doesn't fault.
|
||||
- A `ScanRateMs` equal to the subscription default collapses into the same
|
||||
bucket as plain tags. The driver doesn't fragment poll loops when the
|
||||
override is redundant.
|
||||
- Tags whose names don't appear in the driver's tag map (typo / discovery
|
||||
miss) fall through to the subscription default — same "config typo
|
||||
degrades" stance as the rest of the driver.
|
||||
|
||||
### Wire impact
|
||||
|
||||
Per-bucket independent timers do **not** parallelise CIP traffic. The driver
|
||||
serializes wire-side reads through its per-device libplctag handles, so a
|
||||
fast bucket and a slow bucket trade off against each other on the wire — the
|
||||
multi-rate split decouples *cadence* (the 100 ms bucket isn't queued behind
|
||||
the 1000 ms bucket's `Task.Delay`), not *throughput*. The wire still moves
|
||||
one CIP request at a time per device.
|
||||
|
||||
If you're reading a large tag set and the slow bucket starves the fast
|
||||
bucket, the lever is `AbCipDeviceOptions.ConnectionSize` (Phase 3) — pack
|
||||
more tags into one CIP RTT so the slow bucket finishes faster. Per-tag scan
|
||||
rate is a *scheduling* knob, not a *throughput* knob.
|
||||
|
||||
### Comparison to Kepware scan classes
|
||||
|
||||
| Kepware concept | AB CIP equivalent |
|
||||
|---|---|
|
||||
| Scan class table (named groups → rate) | implicit: each distinct `ScanRateMs` value is its own bucket |
|
||||
| Default scan class | OPC UA Subscription's `publishingInterval` |
|
||||
| Per-tag scan class assignment | `AbCipTagDefinition.ScanRateMs` |
|
||||
| "Scan mode: Respect client" | always — the OPC UA `publishingInterval` is the default |
|
||||
| "Force write" / "Write through cache" | not exposed — AB CIP writes always go to the wire |
|
||||
|
||||
The leaner shape (per-tag rate, not named groups) keeps the JSON config flat
|
||||
and reflects how operators tend to use the knob in practice — a handful of
|
||||
"this specific tag needs to be fast" overrides on top of a sensible default,
|
||||
rather than a separate tier of scan-class definitions.
|
||||
|
||||
### Verification
|
||||
|
||||
- **Unit**: `AbCipPerTagScanRateTests` (`tests/.../AbCip.Tests`). Asserts
|
||||
bucketing math, default-rate collapse, UDT member inheritance, JSON DTO
|
||||
round-trip, and end-to-end cadence against the in-process fake.
|
||||
- **Integration**: `AbCipPerTagScanRateTests`
|
||||
(`tests/.../AbCip.IntegrationTests`). Drives two tags at 100 ms / 1000 ms
|
||||
against a live `ab_server` and asserts the bucket count + each tag receives
|
||||
the initial-data push.
|
||||
- **E2E**: `scripts/e2e/test-abcip.ps1` — see the *PerTagScanRate* assertion.
|
||||
|
||||
### Cross-references
|
||||
|
||||
- `docs/Driver.AbCip.Cli.md` — there is no CLI surface change for this knob;
|
||||
scan rate is a config-time concern.
|
||||
- `docs/drivers/AbCip-Performance.md` — Phase 3 throughput knobs that pair
|
||||
with per-tag scan rate when a slow bucket starves a fast one.
|
||||
- S7 driver `ScanGroup` model in `src/.../S7DriverOptions.cs` — the
|
||||
named-group form of the same idea.
|
||||
@@ -30,6 +30,15 @@
|
||||
|
||||
.PARAMETER BridgeNodeId
|
||||
NodeId at which the server publishes the TagPath.
|
||||
|
||||
.PARAMETER FastBridgeNodeId
|
||||
Optional NodeId for a Tag declared with ScanRateMs <= 100. When supplied
|
||||
alongside SlowBridgeNodeId the script runs the per-tag scan-rate assertion
|
||||
(PR abcip-4.1).
|
||||
|
||||
.PARAMETER SlowBridgeNodeId
|
||||
Optional NodeId for a Tag declared with ScanRateMs >= 1000. Pair with
|
||||
FastBridgeNodeId to enable the scan-rate assertion.
|
||||
#>
|
||||
|
||||
param(
|
||||
@@ -37,7 +46,9 @@ param(
|
||||
[string]$Family = "ControlLogix",
|
||||
[string]$TagPath = "TestDINT",
|
||||
[string]$OpcUaUrl = "opc.tcp://localhost:4840",
|
||||
[Parameter(Mandatory)] [string]$BridgeNodeId
|
||||
[Parameter(Mandatory)] [string]$BridgeNodeId,
|
||||
[string]$FastBridgeNodeId,
|
||||
[string]$SlowBridgeNodeId
|
||||
)
|
||||
|
||||
$ErrorActionPreference = "Stop"
|
||||
@@ -119,5 +130,57 @@ if ($Family -ne "Micro800") {
|
||||
}
|
||||
}
|
||||
|
||||
# PR abcip-4.1 — per-tag scan-rate divergence assertion. Runs only when both fast + slow
|
||||
# NodeIds are wired; otherwise this knob is skipped on the existing single-NodeId fixture.
|
||||
# The assertion is "fast bucket sees > 5x as many notifications as slow bucket" — the
|
||||
# unit + integration tests cover the bucketing math, this just proves the multi-rate split
|
||||
# survives end-to-end through the OPC UA server's Subscription / MonitoredItem path.
|
||||
if ($FastBridgeNodeId -and $SlowBridgeNodeId) {
|
||||
Write-Header "Per-tag scan rate (FastBridge=$FastBridgeNodeId, SlowBridge=$SlowBridgeNodeId)"
|
||||
$duration = 8
|
||||
$fastOut = New-TemporaryFile
|
||||
$slowOut = New-TemporaryFile
|
||||
$fastErr = New-TemporaryFile
|
||||
$slowErr = New-TemporaryFile
|
||||
$fastArgs = @($opcUaCli.PrefixArgs) + @("subscribe", "-u", $OpcUaUrl, "-n", $FastBridgeNodeId, "-i", "100", "--duration", "$duration")
|
||||
$slowArgs = @($opcUaCli.PrefixArgs) + @("subscribe", "-u", $OpcUaUrl, "-n", $SlowBridgeNodeId, "-i", "1000", "--duration", "$duration")
|
||||
$fastProc = Start-Process -FilePath $opcUaCli.File -ArgumentList $fastArgs `
|
||||
-NoNewWindow -PassThru `
|
||||
-RedirectStandardOutput $fastOut.FullName `
|
||||
-RedirectStandardError $fastErr.FullName
|
||||
$slowProc = Start-Process -FilePath $opcUaCli.File -ArgumentList $slowArgs `
|
||||
-NoNewWindow -PassThru `
|
||||
-RedirectStandardOutput $slowOut.FullName `
|
||||
-RedirectStandardError $slowErr.FullName
|
||||
Start-Sleep -Seconds 2
|
||||
|
||||
# Drive a single PLC change so even stable tags get *one* notification during the window
|
||||
# (initial-data push + 1 change). The cadence assertion below relies on the fast tag
|
||||
# accumulating sampling-interval-driven events even between explicit changes.
|
||||
$tickValue = Get-Random -Minimum 50000 -Maximum 59999
|
||||
$writeArgs = @("write") + $commonAbCip + @("-t", $TagPath, "--type", "DInt", "-v", $tickValue)
|
||||
& $abcipCli.Exe @($abcipCli.Args + $writeArgs) | Out-Null
|
||||
|
||||
$fastProc.WaitForExit(($duration + 5) * 1000) | Out-Null
|
||||
$slowProc.WaitForExit(($duration + 5) * 1000) | Out-Null
|
||||
if (-not $fastProc.HasExited) { Stop-Process -Id $fastProc.Id -Force }
|
||||
if (-not $slowProc.HasExited) { Stop-Process -Id $slowProc.Id -Force }
|
||||
|
||||
$fastText = (Get-Content $fastOut.FullName -Raw) + (Get-Content $fastErr.FullName -Raw)
|
||||
$slowText = (Get-Content $slowOut.FullName -Raw) + (Get-Content $slowErr.FullName -Raw)
|
||||
Remove-Item $fastOut.FullName, $slowOut.FullName, $fastErr.FullName, $slowErr.FullName -ErrorAction SilentlyContinue
|
||||
|
||||
# Each data-change line matches `=\s*<value>\s*(<status>)` per Test-SubscribeSeesChange.
|
||||
$fastMatches = ([regex]::Matches($fastText, "=\s*\S+\s*\(")).Count
|
||||
$slowMatches = ([regex]::Matches($slowText, "=\s*\S+\s*\(")).Count
|
||||
$passed = ($fastMatches -ge 5) -and ($fastMatches -gt ($slowMatches * 5))
|
||||
$detail = if ($passed) {
|
||||
"fast=$fastMatches notifications vs slow=$slowMatches (>5x ratio achieved)"
|
||||
} else {
|
||||
"fast=$fastMatches slow=$slowMatches — expected fast > slow*5"
|
||||
}
|
||||
$results += [PSCustomObject]@{ Name = "PerTagScanRate"; Passed = $passed; Detail = $detail }
|
||||
}
|
||||
|
||||
Write-Summary -Title "AB CIP e2e" -Results $results
|
||||
if ($results | Where-Object { -not $_.Passed }) { exit 1 }
|
||||
|
||||
@@ -214,7 +214,11 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
DataType: member.DataType,
|
||||
Writable: member.Writable,
|
||||
WriteIdempotent: member.WriteIdempotent,
|
||||
StringLength: member.StringLength);
|
||||
StringLength: member.StringLength,
|
||||
// PR abcip-4.1 — inherit per-tag scan rate from parent so a UDT
|
||||
// declared at 100 ms publishes every member at 100 ms without the
|
||||
// operator having to repeat ScanRateMs on every member.
|
||||
ScanRateMs: tag.ScanRateMs);
|
||||
_tagsByName[memberTag.Name] = memberTag;
|
||||
}
|
||||
}
|
||||
@@ -416,16 +420,120 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
|
||||
// ---- ISubscribable (polling overlay via shared engine) ----
|
||||
|
||||
/// <summary>Per-bucket subscription handles owned by one composite <see cref="AbCipCompositeSubscriptionHandle"/>.</summary>
|
||||
private readonly Dictionary<long, IReadOnlyList<ISubscriptionHandle>> _compositeSubscriptions = new();
|
||||
private long _nextCompositeId;
|
||||
|
||||
/// <summary>
|
||||
/// PR abcip-4.1 — partitions <paramref name="fullReferences"/> by the resolved publishing
|
||||
/// interval (per-tag <see cref="AbCipTagDefinition.ScanRateMs"/> override falling back
|
||||
/// to <paramref name="publishingInterval"/>) and registers one
|
||||
/// <see cref="PollGroupEngine"/> subscription per distinct interval. The returned handle
|
||||
/// wraps every per-bucket subscription so <see cref="UnsubscribeAsync"/> tears them all
|
||||
/// down together — callers see one logical subscription, the engine sees N independent
|
||||
/// poll loops at their own cadence.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Approach B from the PR plan — keeps <see cref="PollGroupEngine"/> unchanged and
|
||||
/// handles the multi-rate split entirely at the driver level. The engine already floors
|
||||
/// each call's interval at 100 ms, so a misconfigured <c>ScanRateMs < 100</c> is
|
||||
/// clamped per-bucket without driver-side validation. Tags whose <c>ScanRateMs</c>
|
||||
/// equals the subscription default (or that have no override) collapse into the
|
||||
/// default-rate bucket — the legacy single-rate path is preserved for callers that
|
||||
/// don't set <c>ScanRateMs</c> on any tag.
|
||||
/// </remarks>
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
|
||||
Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval));
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(fullReferences);
|
||||
|
||||
// Bucket tags by resolved interval. Unknown tags (not in _tagsByName, e.g. typo)
|
||||
// and tags with no ScanRateMs fall back to the subscription default — matches the
|
||||
// S7 driver's "config typo degrades, doesn't break" stance.
|
||||
var buckets = new Dictionary<TimeSpan, List<string>>();
|
||||
foreach (var tagRef in fullReferences)
|
||||
{
|
||||
var interval = ResolveTagInterval(tagRef, publishingInterval);
|
||||
if (!buckets.TryGetValue(interval, out var list))
|
||||
{
|
||||
list = [];
|
||||
buckets[interval] = list;
|
||||
}
|
||||
list.Add(tagRef);
|
||||
}
|
||||
|
||||
var innerHandles = new List<ISubscriptionHandle>(buckets.Count);
|
||||
foreach (var (interval, refs) in buckets)
|
||||
{
|
||||
innerHandles.Add(_poll.Subscribe(refs, interval));
|
||||
}
|
||||
|
||||
var compositeId = Interlocked.Increment(ref _nextCompositeId);
|
||||
lock (_compositeSubscriptions)
|
||||
_compositeSubscriptions[compositeId] = innerHandles;
|
||||
return Task.FromResult<ISubscriptionHandle>(new AbCipCompositeSubscriptionHandle(compositeId, innerHandles.Count));
|
||||
}
|
||||
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
if (handle is AbCipCompositeSubscriptionHandle composite)
|
||||
{
|
||||
IReadOnlyList<ISubscriptionHandle>? inner;
|
||||
lock (_compositeSubscriptions)
|
||||
{
|
||||
_compositeSubscriptions.TryGetValue(composite.Id, out inner);
|
||||
_compositeSubscriptions.Remove(composite.Id);
|
||||
}
|
||||
if (inner is not null)
|
||||
{
|
||||
foreach (var h in inner)
|
||||
_poll.Unsubscribe(h);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Defensive — older callers (or tests stubbing in a raw PollGroupEngine handle)
|
||||
// can still unsubscribe directly through the engine.
|
||||
_poll.Unsubscribe(handle);
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Resolve the publishing interval for one tag — per-tag <see cref="AbCipTagDefinition.ScanRateMs"/>
|
||||
/// wins, otherwise fall back to the subscription default. The engine's 100 ms floor still
|
||||
/// applies at <see cref="PollGroupEngine.Subscribe"/> time so this method does NOT clamp.
|
||||
/// A negative or zero <c>ScanRateMs</c> is treated as null (use default) — mis-typed
|
||||
/// overrides degrade rather than fault.
|
||||
/// </summary>
|
||||
internal TimeSpan ResolveTagInterval(string tagRef, TimeSpan defaultInterval)
|
||||
{
|
||||
if (_tagsByName.TryGetValue(tagRef, out var def) &&
|
||||
def.ScanRateMs is { } ms && ms > 0)
|
||||
{
|
||||
return TimeSpan.FromMilliseconds(ms);
|
||||
}
|
||||
return defaultInterval;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Test-only: count of distinct poll-engine subscriptions a composite handle owns.
|
||||
/// Used by <c>AbCipPerTagScanRateTests</c> to assert that 2 tags at 2 rates produce
|
||||
/// 2 buckets (and 2 tags at 1 rate produce 1 bucket).
|
||||
/// </summary>
|
||||
internal int GetSubscriptionBucketCount(ISubscriptionHandle handle) =>
|
||||
handle is AbCipCompositeSubscriptionHandle composite ? composite.BucketCount : 0;
|
||||
|
||||
/// <summary>
|
||||
/// Composite handle returned by <see cref="SubscribeAsync"/>. Wraps one or more
|
||||
/// <see cref="PollGroupEngine"/> handles so the driver can fan out multi-rate
|
||||
/// subscriptions while presenting a single token to OPC UA-side callers.
|
||||
/// </summary>
|
||||
internal sealed record AbCipCompositeSubscriptionHandle(long Id, int BucketCount) : ISubscriptionHandle
|
||||
{
|
||||
public string DiagnosticId => $"abcip-sub-{Id}({BucketCount}b)";
|
||||
}
|
||||
|
||||
// ---- IAlarmSource (ALMD projection, #177) ----
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -84,7 +84,9 @@ public static class AbCipDriverFactoryExtensions
|
||||
Writable: m.Writable ?? true,
|
||||
WriteIdempotent: m.WriteIdempotent ?? false))]
|
||||
: null,
|
||||
SafetyTag: t.SafetyTag ?? false);
|
||||
SafetyTag: t.SafetyTag ?? false,
|
||||
// PR abcip-4.1 — per-tag scan rate override; null means "use subscription default".
|
||||
ScanRateMs: t.ScanRateMs);
|
||||
|
||||
private static T ParseEnum<T>(string? raw, string? tagName, string driverInstanceId, string field,
|
||||
T? fallback = null) where T : struct, Enum
|
||||
@@ -169,6 +171,15 @@ public static class AbCipDriverFactoryExtensions
|
||||
public bool? WriteIdempotent { get; init; }
|
||||
public List<AbCipMemberDto>? Members { get; init; }
|
||||
public bool? SafetyTag { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// PR abcip-4.1 — optional per-tag publish-rate override (in milliseconds). When
|
||||
/// present, the driver places this tag in its own <see cref="Core.Abstractions.PollGroupEngine"/>
|
||||
/// bucket so it ticks at <c>ScanRateMs</c> regardless of the subscription's default
|
||||
/// publishing interval. <c>null</c> uses the default — back-compat with deployments
|
||||
/// that don't set the knob. Mirrors Kepware's "scan classes" model.
|
||||
/// </summary>
|
||||
public int? ScanRateMs { get; init; }
|
||||
}
|
||||
|
||||
internal sealed class AbCipMemberDto
|
||||
|
||||
@@ -269,6 +269,16 @@ public enum AddressingMode
|
||||
/// in pre-declared config). Surfaces as the OPC UA <c>Description</c> attribute on the
|
||||
/// produced Variable node so SCADA / engineering clients see the comment from the source
|
||||
/// project. <c>null</c> leaves Description unset, matching pre-2.3 behaviour.</param>
|
||||
/// <param name="ScanRateMs">PR abcip-4.1 — optional per-tag publish rate (in milliseconds) that
|
||||
/// overrides the subscription's default <c>publishingInterval</c> for this tag. Mirrors
|
||||
/// Kepware's "scan classes" + Siemens / Mitsubishi per-tag scan groups; the driver buckets
|
||||
/// tags by resolved interval at <see cref="AbCipDriver.SubscribeAsync"/> time + runs one
|
||||
/// <see cref="Core.Abstractions.PollGroupEngine"/> loop per distinct interval so a fast HMI
|
||||
/// tag is not delayed behind a slow batch tag's 10 s tick. <c>null</c> = use the subscription
|
||||
/// default (legacy behaviour). The 100 ms floor enforced by the engine still applies — a
|
||||
/// <c>ScanRateMs < 100</c> is clamped up. UDT member tags inherit the parent tag's
|
||||
/// <c>ScanRateMs</c> at member-fan-out time. See
|
||||
/// <c>docs/drivers/AbCip-Operability.md</c> §"Per-tag scan rate".</param>
|
||||
public sealed record AbCipTagDefinition(
|
||||
string Name,
|
||||
string DeviceHostAddress,
|
||||
@@ -279,7 +289,8 @@ public sealed record AbCipTagDefinition(
|
||||
IReadOnlyList<AbCipStructureMember>? Members = null,
|
||||
bool SafetyTag = false,
|
||||
int? StringLength = null,
|
||||
string? Description = null);
|
||||
string? Description = null,
|
||||
int? ScanRateMs = null);
|
||||
|
||||
/// <summary>
|
||||
/// One declared member of a UDT tag. Name is the member identifier on the PLC (e.g. <c>Speed</c>,
|
||||
|
||||
@@ -0,0 +1,90 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.IntegrationTests;
|
||||
|
||||
/// <summary>
|
||||
/// PR abcip-4.1 — end-to-end cadence smoke for per-tag <see cref="AbCipTagDefinition.ScanRateMs"/>
|
||||
/// bucketing against a live <c>ab_server</c>. Drives two tags pointed at the same seeded
|
||||
/// <c>TestDINT</c> at 100 ms / 1000 ms ScanRate and asserts the faster bucket receives
|
||||
/// substantially more <c>OnDataChange</c> notifications than the slower one over a
|
||||
/// 1.2 s window. Skipped when <c>ab_server</c> isn't reachable, same gating rule as
|
||||
/// <see cref="AbCipReadSmokeTests"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The fake-driver unit test (<c>AbCipPerTagScanRateTests.Faster_bucket_publishes_more_frequently_than_slower_bucket</c>)
|
||||
/// covers the bucketing math against an in-process fake. This test exercises the
|
||||
/// full libplctag stack so a regression in how the driver wires its multi-bucket
|
||||
/// poll engines to the real wire path shows up here. The two declared tags share
|
||||
/// one underlying PLC tag (<c>TestDINT</c>) so the cadence assertion isolates the
|
||||
/// polling-rate plumbing from PLC-side state changes.
|
||||
/// </remarks>
|
||||
[Trait("Category", "Integration")]
|
||||
[Trait("Requires", "AbServer")]
|
||||
public sealed class AbCipPerTagScanRateTests
|
||||
{
|
||||
[AbServerFact]
|
||||
public async Task Faster_tag_publishes_more_often_than_slower_tag_against_ab_server()
|
||||
{
|
||||
var profile = KnownProfiles.ControlLogix;
|
||||
var fixture = new AbServerFixture(profile);
|
||||
await fixture.InitializeAsync();
|
||||
try
|
||||
{
|
||||
var deviceUri = $"ab://127.0.0.1:{fixture.Port}/1,0";
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions(deviceUri, profile.Family)],
|
||||
Tags =
|
||||
[
|
||||
// Two distinct OPC UA tag references, both backed by the same PLC symbol.
|
||||
new AbCipTagDefinition("FastCounter", deviceUri, "TestDINT", AbCipDataType.DInt, ScanRateMs: 100),
|
||||
new AbCipTagDefinition("SlowCounter", deviceUri, "TestDINT", AbCipDataType.DInt, ScanRateMs: 1000),
|
||||
],
|
||||
Timeout = TimeSpan.FromSeconds(5),
|
||||
}, "drv-scan-rate-smoke");
|
||||
|
||||
await drv.InitializeAsync("{}", TestContext.Current.CancellationToken);
|
||||
|
||||
var fastEvents = 0;
|
||||
var slowEvents = 0;
|
||||
drv.OnDataChange += (_, e) =>
|
||||
{
|
||||
if (e.FullReference == "FastCounter") Interlocked.Increment(ref fastEvents);
|
||||
else if (e.FullReference == "SlowCounter") Interlocked.Increment(ref slowEvents);
|
||||
};
|
||||
|
||||
var handle = await drv.SubscribeAsync(
|
||||
["FastCounter", "SlowCounter"],
|
||||
TimeSpan.FromMilliseconds(500),
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
// Bucket-count assertion runs against the real driver too — proves the partition
|
||||
// logic is wired identically in production code paths, not just in unit-test stubs.
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(2,
|
||||
"two distinct ScanRateMs values must produce two real PollGroupEngine subscriptions");
|
||||
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(1200));
|
||||
|
||||
await drv.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
|
||||
await drv.ShutdownAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
// PollGroupEngine only fires OnDataChange when the boxed value differs from the
|
||||
// last seen snapshot, so on a stable PLC value (TestDINT not being driven in this
|
||||
// test) we expect ~1 event per tag (initial-data push). To make the cadence
|
||||
// assertion meaningful even when ab_server's TestDINT is idle, demand that the
|
||||
// *fast* tag fires at least once (proving the 100 ms bucket ticked). The
|
||||
// unit-test cadence assertion handles the >4x ratio with a forced-change fake.
|
||||
fastEvents.ShouldBeGreaterThan(0,
|
||||
"fast tag must receive at least the initial-data push event");
|
||||
slowEvents.ShouldBeGreaterThan(0,
|
||||
"slow tag must receive at least the initial-data push event");
|
||||
}
|
||||
finally
|
||||
{
|
||||
await fixture.DisposeAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,312 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// PR abcip-4.1 — unit coverage for per-tag <see cref="AbCipTagDefinition.ScanRateMs"/>
|
||||
/// bucketing in <see cref="AbCipDriver.SubscribeAsync"/>. Each test wires a small tag map
|
||||
/// and asserts the driver spins up exactly the expected number of internal poll-engine
|
||||
/// subscriptions by inspecting the test-only <c>GetSubscriptionBucketCount</c> entry
|
||||
/// point. End-to-end tick cadence is covered by
|
||||
/// <c>AbCipPerTagScanRateTests</c> in the IntegrationTests project.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class AbCipPerTagScanRateTests
|
||||
{
|
||||
private static AbCipDriver NewDriver(params AbCipTagDefinition[] tags)
|
||||
{
|
||||
var factory = new FakeAbCipTagFactory
|
||||
{
|
||||
Customise = p => new FakeAbCipTag(p) { Value = 0 },
|
||||
};
|
||||
return new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Tags = tags,
|
||||
}, "drv-scan", factory);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Two_tags_with_distinct_ScanRate_produce_two_buckets()
|
||||
{
|
||||
var drv = NewDriver(
|
||||
new AbCipTagDefinition("Fast", "ab://10.0.0.5/1,0", "Fast", AbCipDataType.DInt, ScanRateMs: 100),
|
||||
new AbCipTagDefinition("Slow", "ab://10.0.0.5/1,0", "Slow", AbCipDataType.DInt, ScanRateMs: 1000));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Fast", "Slow"], TimeSpan.FromMilliseconds(500), CancellationToken.None);
|
||||
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(2,
|
||||
"two distinct ScanRateMs values must produce two separate poll-engine subscriptions");
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Tag_without_ScanRate_uses_subscription_default()
|
||||
{
|
||||
var drv = NewDriver(
|
||||
new AbCipTagDefinition("PlainA", "ab://10.0.0.5/1,0", "PlainA", AbCipDataType.DInt),
|
||||
new AbCipTagDefinition("PlainB", "ab://10.0.0.5/1,0", "PlainB", AbCipDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["PlainA", "PlainB"], TimeSpan.FromMilliseconds(750), CancellationToken.None);
|
||||
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(1,
|
||||
"no ScanRateMs configured → single default bucket (legacy behaviour)");
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Tag_with_ScanRate_equal_to_default_collapses_into_default_bucket()
|
||||
{
|
||||
// The driver buckets by resolved TimeSpan, not by "did the tag declare a rate" — a tag
|
||||
// whose ScanRateMs matches the subscription default lands in the same bucket as plain
|
||||
// tags. This avoids fragmenting the poll loop when the override is redundant.
|
||||
var drv = NewDriver(
|
||||
new AbCipTagDefinition("Plain", "ab://10.0.0.5/1,0", "Plain", AbCipDataType.DInt),
|
||||
new AbCipTagDefinition("Match", "ab://10.0.0.5/1,0", "Match", AbCipDataType.DInt, ScanRateMs: 500));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Plain", "Match"], TimeSpan.FromMilliseconds(500), CancellationToken.None);
|
||||
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(1,
|
||||
"ScanRateMs=500 with default=500ms must collapse into the same bucket");
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task All_tags_at_same_explicit_rate_produce_one_bucket()
|
||||
{
|
||||
var drv = NewDriver(
|
||||
new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt, ScanRateMs: 250),
|
||||
new AbCipTagDefinition("B", "ab://10.0.0.5/1,0", "B", AbCipDataType.DInt, ScanRateMs: 250),
|
||||
new AbCipTagDefinition("C", "ab://10.0.0.5/1,0", "C", AbCipDataType.DInt, ScanRateMs: 250));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["A", "B", "C"], TimeSpan.FromMilliseconds(1000), CancellationToken.None);
|
||||
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(1,
|
||||
"three tags at the same ScanRateMs share one bucket");
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Three_distinct_rates_produce_three_buckets()
|
||||
{
|
||||
var drv = NewDriver(
|
||||
new AbCipTagDefinition("Fast", "ab://10.0.0.5/1,0", "Fast", AbCipDataType.DInt, ScanRateMs: 100),
|
||||
new AbCipTagDefinition("Medium", "ab://10.0.0.5/1,0", "Medium", AbCipDataType.DInt, ScanRateMs: 500),
|
||||
new AbCipTagDefinition("Slow", "ab://10.0.0.5/1,0", "Slow", AbCipDataType.DInt, ScanRateMs: 5000));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Fast", "Medium", "Slow"], TimeSpan.FromMilliseconds(1000), CancellationToken.None);
|
||||
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(3);
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Mixed_overrides_and_default_produce_correct_bucket_count()
|
||||
{
|
||||
// Two fast (100 ms), three default (=1000 ms): 2 buckets.
|
||||
var drv = NewDriver(
|
||||
new AbCipTagDefinition("Hmi1", "ab://10.0.0.5/1,0", "Hmi1", AbCipDataType.DInt, ScanRateMs: 100),
|
||||
new AbCipTagDefinition("Hmi2", "ab://10.0.0.5/1,0", "Hmi2", AbCipDataType.DInt, ScanRateMs: 100),
|
||||
new AbCipTagDefinition("Slow1", "ab://10.0.0.5/1,0", "Slow1", AbCipDataType.DInt),
|
||||
new AbCipTagDefinition("Slow2", "ab://10.0.0.5/1,0", "Slow2", AbCipDataType.DInt),
|
||||
new AbCipTagDefinition("Slow3", "ab://10.0.0.5/1,0", "Slow3", AbCipDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(
|
||||
["Hmi1", "Hmi2", "Slow1", "Slow2", "Slow3"],
|
||||
TimeSpan.FromMilliseconds(1000),
|
||||
CancellationToken.None);
|
||||
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(2);
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Test fake whose <see cref="DecodeValue"/> returns a strictly-monotonic counter so
|
||||
/// every poll registers as a "change" against <see cref="PollGroupEngine"/>'s
|
||||
/// boxed-equality diff. Lets us count actual poll cadence end-to-end.
|
||||
/// </summary>
|
||||
private sealed class CountingFakeTag : FakeAbCipTag
|
||||
{
|
||||
private int _readCount;
|
||||
public CountingFakeTag(AbCipTagCreateParams p) : base(p) { }
|
||||
public override Task ReadAsync(CancellationToken ct)
|
||||
{
|
||||
Interlocked.Increment(ref _readCount);
|
||||
return base.ReadAsync(ct);
|
||||
}
|
||||
public override object? DecodeValue(AbCipDataType type, int? bitIndex) => _readCount;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Faster_bucket_publishes_more_frequently_than_slower_bucket()
|
||||
{
|
||||
// End-to-end timing assertion against an in-process fake: a 100 ms tag must
|
||||
// accumulate substantially more DataChange events than a 1000 ms tag over a 1.2 s
|
||||
// window. We force every poll to register as a *change* by returning a strictly
|
||||
// monotonic counter (CountingFakeTag) so the diff path doesn't suppress events.
|
||||
var factory = new FakeAbCipTagFactory
|
||||
{
|
||||
Customise = p => new CountingFakeTag(p),
|
||||
};
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Tags =
|
||||
[
|
||||
new AbCipTagDefinition("Fast", "ab://10.0.0.5/1,0", "Fast", AbCipDataType.DInt, ScanRateMs: 100),
|
||||
new AbCipTagDefinition("Slow", "ab://10.0.0.5/1,0", "Slow", AbCipDataType.DInt, ScanRateMs: 1000),
|
||||
],
|
||||
}, "drv-scan-cadence", factory);
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var fastEvents = 0;
|
||||
var slowEvents = 0;
|
||||
drv.OnDataChange += (_, e) =>
|
||||
{
|
||||
if (e.FullReference == "Fast") Interlocked.Increment(ref fastEvents);
|
||||
else if (e.FullReference == "Slow") Interlocked.Increment(ref slowEvents);
|
||||
};
|
||||
|
||||
var handle = await drv.SubscribeAsync(
|
||||
["Fast", "Slow"], TimeSpan.FromMilliseconds(500), CancellationToken.None);
|
||||
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(1200));
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
|
||||
// Fast at 100 ms: ~12 polls in 1.2 s. Slow at 1000 ms: ~2 polls (initial + 1 tick).
|
||||
// Demand at least 4x — generous against jitter, definitive enough to prove buckets
|
||||
// tick independently.
|
||||
fastEvents.ShouldBeGreaterThan(slowEvents * 4,
|
||||
$"fast bucket ({fastEvents} events) must outpace slow bucket ({slowEvents} events) by >4x");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unsubscribe_disposes_every_bucket_subscription()
|
||||
{
|
||||
// Multi-bucket unsubscribe must not leak any inner poll-engine handle. We assert this
|
||||
// by checking the post-unsubscribe bucket count is zero (composite handle purged) and
|
||||
// by counting the engine's ActiveSubscriptionCount via reflection.
|
||||
var drv = NewDriver(
|
||||
new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt, ScanRateMs: 100),
|
||||
new AbCipTagDefinition("B", "ab://10.0.0.5/1,0", "B", AbCipDataType.DInt, ScanRateMs: 200),
|
||||
new AbCipTagDefinition("C", "ab://10.0.0.5/1,0", "C", AbCipDataType.DInt, ScanRateMs: 500));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["A", "B", "C"], TimeSpan.FromMilliseconds(1000), CancellationToken.None);
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(3);
|
||||
|
||||
var pollField = typeof(AbCipDriver).GetField("_poll",
|
||||
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
|
||||
var poll = (PollGroupEngine)pollField!.GetValue(drv)!;
|
||||
poll.ActiveSubscriptionCount.ShouldBe(3, "three buckets → three live engine subscriptions");
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
|
||||
poll.ActiveSubscriptionCount.ShouldBe(0, "every bucket subscription must be torn down on unsubscribe");
|
||||
|
||||
// Idempotent: a second unsubscribe must be a no-op, not throw.
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Dto_round_trip_preserves_ScanRateMs()
|
||||
{
|
||||
// The DTO surface must round-trip ScanRateMs through JSON so a config file edited via
|
||||
// the Admin UI or hand-written JSON keeps the per-tag rate intact across redeploys.
|
||||
const string configJson = """
|
||||
{
|
||||
"Devices": [ { "HostAddress": "ab://10.0.0.5/1,0", "PlcFamily": "ControlLogix" } ],
|
||||
"Tags": [
|
||||
{ "Name": "Fast", "DeviceHostAddress": "ab://10.0.0.5/1,0", "TagPath": "Fast", "DataType": "DInt", "ScanRateMs": 100 },
|
||||
{ "Name": "Slow", "DeviceHostAddress": "ab://10.0.0.5/1,0", "TagPath": "Slow", "DataType": "DInt", "ScanRateMs": 1000 },
|
||||
{ "Name": "Plain", "DeviceHostAddress": "ab://10.0.0.5/1,0", "TagPath": "Plain", "DataType": "DInt" }
|
||||
]
|
||||
}
|
||||
""";
|
||||
|
||||
var driver = AbCipDriverFactoryExtensions.CreateInstance("drv-roundtrip", configJson);
|
||||
// Reach into the driver's options via reflection to check the Tag definitions hold ScanRateMs.
|
||||
var optionsField = typeof(AbCipDriver).GetField("_options",
|
||||
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
|
||||
var opts = (AbCipDriverOptions)optionsField!.GetValue(driver)!;
|
||||
opts.Tags.Single(t => t.Name == "Fast").ScanRateMs.ShouldBe(100);
|
||||
opts.Tags.Single(t => t.Name == "Slow").ScanRateMs.ShouldBe(1000);
|
||||
opts.Tags.Single(t => t.Name == "Plain").ScanRateMs.ShouldBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Negative_or_zero_ScanRate_is_treated_as_unset()
|
||||
{
|
||||
// Mis-typed config (ScanRateMs: 0 or -1) must NOT crash the driver — fall back to the
|
||||
// subscription default. Same "config typo degrades" stance the factory layer takes.
|
||||
var drv = NewDriver(
|
||||
new AbCipTagDefinition("Zero", "ab://10.0.0.5/1,0", "Zero", AbCipDataType.DInt, ScanRateMs: 0),
|
||||
new AbCipTagDefinition("Neg", "ab://10.0.0.5/1,0", "Neg", AbCipDataType.DInt, ScanRateMs: -1),
|
||||
new AbCipTagDefinition("Plain", "ab://10.0.0.5/1,0", "Plain", AbCipDataType.DInt));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Zero", "Neg", "Plain"], TimeSpan.FromMilliseconds(500), CancellationToken.None);
|
||||
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(1,
|
||||
"0 + negative ScanRateMs must collapse into the subscription default bucket");
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Udt_member_inherits_parent_ScanRate()
|
||||
{
|
||||
// The driver fans out UDT members at InitializeAsync — each member tag inherits the
|
||||
// parent's ScanRateMs so an entire UDT subscribed at 100 ms publishes coherently.
|
||||
var factory = new FakeAbCipTagFactory
|
||||
{
|
||||
Customise = p => new FakeAbCipTag(p) { Value = 0 },
|
||||
};
|
||||
var drv = new AbCipDriver(new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
||||
Tags =
|
||||
[
|
||||
new AbCipTagDefinition(
|
||||
"Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure,
|
||||
Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)],
|
||||
ScanRateMs: 250),
|
||||
],
|
||||
}, "drv-udt-inherit", factory);
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
// Subscribing the member tag alone must use the parent's 250 ms rate, not the default.
|
||||
var handle = await drv.SubscribeAsync(["Motor1.Speed"], TimeSpan.FromMilliseconds(1000), CancellationToken.None);
|
||||
drv.GetSubscriptionBucketCount(handle).ShouldBe(1);
|
||||
drv.ResolveTagInterval("Motor1.Speed", TimeSpan.FromMilliseconds(1000))
|
||||
.ShouldBe(TimeSpan.FromMilliseconds(250));
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user