@@ -35,6 +35,7 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
private readonly Dictionary<string, AbCipTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly AbCipAlarmProjection _alarmProjection;
|
||||
private readonly SemaphoreSlim _discoverySemaphore = new(1, 1);
|
||||
private readonly AbCipWriteCoalescer _writeCoalescer = new();
|
||||
private DriverHealth _health = new(DriverState.Unknown, null, null);
|
||||
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
@@ -415,6 +416,10 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
}
|
||||
_devices.Clear();
|
||||
_tagsByName.Clear();
|
||||
// PR abcip-4.2 — wipe the write-coalescer cache on shutdown. Reinitializing the driver
|
||||
// (Tier-B remediation) starts from a clean slate so the first write after restart pays
|
||||
// the full round-trip rather than reusing stale cached state.
|
||||
_writeCoalescer.ResetAll();
|
||||
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
||||
}
|
||||
|
||||
@@ -637,6 +642,13 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
state.HostState = newState;
|
||||
state.HostStateChangedUtc = DateTime.UtcNow;
|
||||
}
|
||||
// PR abcip-4.2 — drop the per-device write-coalescer cache when we lose the wire. The
|
||||
// PLC may have been restarted while we were offline + our cached "we already wrote 42"
|
||||
// is no longer valid PLC state. Reset on the Stopped transition (and again on the
|
||||
// recovery edge for safety) so the first post-reconnect write of any value pays the
|
||||
// full round-trip + the coalescer rebuilds its cache from the new baseline.
|
||||
if (newState == HostState.Stopped || newState == HostState.Running)
|
||||
_writeCoalescer.Reset(state.Options.HostAddress);
|
||||
OnHostStatusChanged?.Invoke(this,
|
||||
new HostStatusChangedEventArgs(state.Options.HostAddress, old, newState));
|
||||
}
|
||||
@@ -1255,6 +1267,16 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
var def = entry.Definition;
|
||||
var w = entry.Request;
|
||||
var now = DateTime.UtcNow;
|
||||
|
||||
// PR abcip-4.2 — write deadband / write-on-change. Consult the coalescer first; a
|
||||
// suppression decision returns Good without hitting libplctag so the OPC UA client sees
|
||||
// the same write semantics it always has, the wire just doesn't move. Driver health is
|
||||
// intentionally left alone on suppression — a coalesced write is neither a success nor
|
||||
// a failure of the underlying connection. Bit-RMW writes go through their own path
|
||||
// (ExecuteBitRmwWriteAsync) which has its own coalescer call site.
|
||||
if (_writeCoalescer.ShouldSuppress(def.DeviceHostAddress, def, w.Value))
|
||||
return (entry.OriginalIndex, AbCipStatusMapper.Good);
|
||||
|
||||
try
|
||||
{
|
||||
var runtime = await EnsureTagRuntimeAsync(device, def, ct).ConfigureAwait(false);
|
||||
@@ -1265,6 +1287,7 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
if (status == 0)
|
||||
{
|
||||
_health = new DriverHealth(DriverState.Healthy, now, null);
|
||||
_writeCoalescer.Record(def.DeviceHostAddress, def, w.Value);
|
||||
return (entry.OriginalIndex, AbCipStatusMapper.Good);
|
||||
}
|
||||
return (entry.OriginalIndex, AbCipStatusMapper.MapLibplctagStatus(status));
|
||||
@@ -1309,13 +1332,24 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
private async Task<uint> ExecuteBitRmwWriteAsync(
|
||||
DeviceState device, AbCipMultiWritePlanner.ClassifiedWrite entry, CancellationToken ct)
|
||||
{
|
||||
// PR abcip-4.2 — bit-RMW writes go through the coalescer too. The deadband path is
|
||||
// never useful on a single-bit BOOL (deadband < 1 collapses to equality) but
|
||||
// WriteOnChange is — a UI that toggles a SetPoint.Reset bit at every cycle benefits
|
||||
// from suppressing the redundant pulses.
|
||||
var def = entry.Definition;
|
||||
if (_writeCoalescer.ShouldSuppress(def.DeviceHostAddress, def, entry.Request.Value))
|
||||
return AbCipStatusMapper.Good;
|
||||
|
||||
try
|
||||
{
|
||||
var bit = entry.ParsedPath!.BitIndex!.Value;
|
||||
var code = await WriteBitInDIntAsync(device, entry.ParsedPath, bit, entry.Request.Value, ct)
|
||||
.ConfigureAwait(false);
|
||||
if (code == AbCipStatusMapper.Good)
|
||||
{
|
||||
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
||||
_writeCoalescer.Record(def.DeviceHostAddress, def, entry.Request.Value);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
@@ -1478,7 +1512,30 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
|
||||
return runtime;
|
||||
}
|
||||
|
||||
public DriverHealth GetHealth() => _health;
|
||||
public DriverHealth GetHealth() => _health with { Diagnostics = BuildDiagnostics() };
|
||||
|
||||
/// <summary>
|
||||
/// PR abcip-4.2 — driver-attributable counter snapshot exposed via
|
||||
/// <see cref="DriverHealth.Diagnostics"/> + the <c>driver-diagnostics</c> RPC. Names use
|
||||
/// the <c>"<DriverType>.<Counter>"</c> convention so the Admin UI can render
|
||||
/// them alongside Modbus / S7 / OPC UA Client metrics without per-driver special-casing.
|
||||
/// Counters today: <c>AbCip.WritesSuppressed</c> (writes the coalescer skipped because
|
||||
/// deadband / write-on-change suppressed them) and <c>AbCip.WritesPassedThrough</c>
|
||||
/// (writes that hit the wire after consulting the coalescer). Future PRs add CIP-level
|
||||
/// counters (Forward Open count, multi-service-packet ratio, etc.) by extending this
|
||||
/// dictionary.
|
||||
/// </summary>
|
||||
private IReadOnlyDictionary<string, double> BuildDiagnostics() => new Dictionary<string, double>
|
||||
{
|
||||
["AbCip.WritesSuppressed"] = _writeCoalescer.TotalWritesSuppressed,
|
||||
["AbCip.WritesPassedThrough"] = _writeCoalescer.TotalWritesPassedThrough,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Test seam — exposes the live coalescer for unit tests that want to inspect counters
|
||||
/// without rebuilding the diagnostics dictionary on every assertion.
|
||||
/// </summary>
|
||||
internal AbCipWriteCoalescer WriteCoalescer => _writeCoalescer;
|
||||
|
||||
/// <summary>
|
||||
/// CLR-visible allocation footprint only — libplctag's native heap is invisible to the
|
||||
|
||||
@@ -86,7 +86,11 @@ public static class AbCipDriverFactoryExtensions
|
||||
: null,
|
||||
SafetyTag: t.SafetyTag ?? false,
|
||||
// PR abcip-4.1 — per-tag scan rate override; null means "use subscription default".
|
||||
ScanRateMs: t.ScanRateMs);
|
||||
ScanRateMs: t.ScanRateMs,
|
||||
// PR abcip-4.2 — per-tag write-deadband + write-on-change. Both default to "off"
|
||||
// when absent so back-compat deployments behave exactly as before.
|
||||
WriteDeadband: t.WriteDeadband,
|
||||
WriteOnChange: t.WriteOnChange ?? false);
|
||||
|
||||
private static T ParseEnum<T>(string? raw, string? tagName, string driverInstanceId, string field,
|
||||
T? fallback = null) where T : struct, Enum
|
||||
@@ -180,6 +184,22 @@ public static class AbCipDriverFactoryExtensions
|
||||
/// that don't set the knob. Mirrors Kepware's "scan classes" model.
|
||||
/// </summary>
|
||||
public int? ScanRateMs { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// PR abcip-4.2 — optional numeric write deadband. When set, the driver skips a
|
||||
/// wire write whose absolute difference from the previous successfully-written
|
||||
/// value falls below this threshold. Suppressed writes still return <c>Good</c>.
|
||||
/// <c>null</c> = no numeric suppression (back-compat default).
|
||||
/// </summary>
|
||||
public double? WriteDeadband { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// PR abcip-4.2 — optional write-on-change gate. When <c>true</c>, the driver
|
||||
/// skips a wire write whose value equals the previous successfully-written value.
|
||||
/// Combines with <see cref="WriteDeadband"/> on numeric tags (deadband path takes
|
||||
/// priority for numerics). Default <c>false</c> — every write reaches the wire.
|
||||
/// </summary>
|
||||
public bool? WriteOnChange { get; init; }
|
||||
}
|
||||
|
||||
internal sealed class AbCipMemberDto
|
||||
|
||||
@@ -279,6 +279,20 @@ public enum AddressingMode
|
||||
/// <c>ScanRateMs < 100</c> is clamped up. UDT member tags inherit the parent tag's
|
||||
/// <c>ScanRateMs</c> at member-fan-out time. See
|
||||
/// <c>docs/drivers/AbCip-Operability.md</c> §"Per-tag scan rate".</param>
|
||||
/// <param name="WriteDeadband">PR abcip-4.2 — optional numeric write deadband. When set and both
|
||||
/// the previous successfully-written value and the new write are numeric, the driver suppresses
|
||||
/// the next write if <c>|new - last| < WriteDeadband</c>. Suppressed writes still return
|
||||
/// <c>Good</c> so the OPC UA write semantics observed by clients are unchanged — the driver
|
||||
/// simply skips the wire round-trip. Mirrors Kepware's "Deadband (write)" knob and is the
|
||||
/// write-side companion to the read-side deadband already shipped at the OPC UA monitored-item
|
||||
/// layer. NaN / Infinity values bypass suppression (let the wire decide). See
|
||||
/// <c>docs/drivers/AbCip-Operability.md</c> §"Write deadband / write-on-change".</param>
|
||||
/// <param name="WriteOnChange">PR abcip-4.2 — optional write-on-change gate. When <c>true</c> and
|
||||
/// the new write equals the previous successfully-written value, the driver suppresses the
|
||||
/// write (returns <c>Good</c> without hitting the wire). Combines with <see cref="WriteDeadband"/>
|
||||
/// for numeric tags — the deadband test takes priority for numerics, equality is the fallback
|
||||
/// for non-numeric types (BOOL setpoints, STRING constants, etc.). Default <c>false</c> —
|
||||
/// legacy behaviour where every write goes to the wire.</param>
|
||||
public sealed record AbCipTagDefinition(
|
||||
string Name,
|
||||
string DeviceHostAddress,
|
||||
@@ -290,7 +304,9 @@ public sealed record AbCipTagDefinition(
|
||||
bool SafetyTag = false,
|
||||
int? StringLength = null,
|
||||
string? Description = null,
|
||||
int? ScanRateMs = null);
|
||||
int? ScanRateMs = null,
|
||||
double? WriteDeadband = null,
|
||||
bool WriteOnChange = false);
|
||||
|
||||
/// <summary>
|
||||
/// One declared member of a UDT tag. Name is the member identifier on the PLC (e.g. <c>Speed</c>,
|
||||
|
||||
211
src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipWriteCoalescer.cs
Normal file
211
src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipWriteCoalescer.cs
Normal file
@@ -0,0 +1,211 @@
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip;
|
||||
|
||||
/// <summary>
|
||||
/// PR abcip-4.2 — per-tag last-successfully-written-value cache supporting
|
||||
/// <see cref="AbCipTagDefinition.WriteDeadband"/> + <see cref="AbCipTagDefinition.WriteOnChange"/>
|
||||
/// suppression in <see cref="AbCipDriver.WriteAsync"/>. Keys are
|
||||
/// <c>(deviceHostAddress, tagAddress)</c>: the same Logix tag served from two devices
|
||||
/// keeps independent caches because the underlying PLC state is independent. Counters
|
||||
/// (<see cref="TotalWritesSuppressed"/>, <see cref="TotalWritesPassedThrough"/>) feed
|
||||
/// <c>AbCip.WritesSuppressed</c> / <c>AbCip.WritesPassedThrough</c> in the driver
|
||||
/// diagnostics surface.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>The coalescer is consulted *before* the wire write; only successful writes call
|
||||
/// <see cref="Record"/> so a failed write does not poison the cache (next attempt with the
|
||||
/// same value still hits the wire because no last-value was ever recorded for it).
|
||||
/// <see cref="Reset"/> wipes the per-device entries on reconnect / shutdown — the PLC may
|
||||
/// have been restarted and our cached "we already wrote 42" is no longer valid PLC state.</para>
|
||||
///
|
||||
/// <para>Suppression rules:</para>
|
||||
/// <list type="bullet">
|
||||
/// <item>No prior recorded value → not suppressed (first write always passes through).</item>
|
||||
/// <item><see cref="AbCipTagDefinition.WriteDeadband"/> + both values numeric →
|
||||
/// <c>|new - last| < deadband</c> suppresses. NaN / Infinity in either side bypass
|
||||
/// suppression; the wire decides.</item>
|
||||
/// <item><see cref="AbCipTagDefinition.WriteOnChange"/> set →
|
||||
/// <see cref="object.Equals(object?, object?)"/> equality suppresses. For numeric tags
|
||||
/// with a deadband configured, this still applies as the equality fallback when the
|
||||
/// deadband path doesn't trigger (e.g. exact equality with a 0 deadband).</item>
|
||||
/// <item>Neither knob set → never suppress (back-compat default).</item>
|
||||
/// </list>
|
||||
/// </remarks>
|
||||
internal sealed class AbCipWriteCoalescer
|
||||
{
|
||||
private readonly ConcurrentDictionary<(string Device, string Tag), object?> _lastValues =
|
||||
new(LastKeyComparer.Instance);
|
||||
|
||||
private long _totalWritesSuppressed;
|
||||
private long _totalWritesPassedThrough;
|
||||
|
||||
/// <summary>Diagnostics counter — number of writes the coalescer told the driver to skip.</summary>
|
||||
public long TotalWritesSuppressed => Interlocked.Read(ref _totalWritesSuppressed);
|
||||
|
||||
/// <summary>Diagnostics counter — number of writes that hit the wire after consulting the coalescer.</summary>
|
||||
public long TotalWritesPassedThrough => Interlocked.Read(ref _totalWritesPassedThrough);
|
||||
|
||||
/// <summary>
|
||||
/// Decide whether <paramref name="newValue"/> should suppress the wire write for
|
||||
/// <paramref name="tag"/> on <paramref name="deviceHostAddress"/>. Increments the
|
||||
/// internal <see cref="TotalWritesSuppressed"/> / <see cref="TotalWritesPassedThrough"/>
|
||||
/// counter as a side effect so callers don't have to maintain a parallel tally.
|
||||
/// </summary>
|
||||
/// <returns>
|
||||
/// <c>true</c> when the write can be skipped (last value recorded + suppression rule
|
||||
/// fired). <c>false</c> when the write must hit the wire (no prior value, no rule
|
||||
/// active, or values differ enough).
|
||||
/// </returns>
|
||||
public bool ShouldSuppress(string deviceHostAddress, AbCipTagDefinition tag, object? newValue)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(deviceHostAddress);
|
||||
ArgumentNullException.ThrowIfNull(tag);
|
||||
|
||||
// Fast path — neither knob active. Skip the dictionary lookup entirely; this is the
|
||||
// overwhelming common case in deployments that don't opt in.
|
||||
if (!tag.WriteOnChange && !tag.WriteDeadband.HasValue)
|
||||
{
|
||||
Interlocked.Increment(ref _totalWritesPassedThrough);
|
||||
return false;
|
||||
}
|
||||
|
||||
var key = (deviceHostAddress, tag.TagPath);
|
||||
if (!_lastValues.TryGetValue(key, out var lastValue))
|
||||
{
|
||||
// No prior recorded write — first write must always pass through so the PLC sees a
|
||||
// baseline. The Record call after a successful write seeds the cache from this point.
|
||||
Interlocked.Increment(ref _totalWritesPassedThrough);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (TrySuppress(tag, lastValue, newValue))
|
||||
{
|
||||
Interlocked.Increment(ref _totalWritesSuppressed);
|
||||
return true;
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref _totalWritesPassedThrough);
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Record the value just successfully written so the next call to
|
||||
/// <see cref="ShouldSuppress"/> can compare against it. Called only from the
|
||||
/// <see cref="AbCipDriver"/> success branch — failed writes do not seed the cache.
|
||||
/// </summary>
|
||||
public void Record(string deviceHostAddress, AbCipTagDefinition tag, object? writtenValue)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(deviceHostAddress);
|
||||
ArgumentNullException.ThrowIfNull(tag);
|
||||
|
||||
// Only care about tags that opted in to either knob — pure-passthrough tags don't need
|
||||
// a cache entry at all and the dictionary stays small for the common case.
|
||||
if (!tag.WriteOnChange && !tag.WriteDeadband.HasValue) return;
|
||||
|
||||
_lastValues[(deviceHostAddress, tag.TagPath)] = writtenValue;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Drop every cached last-value for one device. Called on reconnect or driver shutdown
|
||||
/// so the next write after a wire-state change pays the full round-trip — the PLC may
|
||||
/// have been restarted and our cached "we already wrote 42" is stale.
|
||||
/// </summary>
|
||||
public void Reset(string deviceHostAddress)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(deviceHostAddress);
|
||||
|
||||
// ConcurrentDictionary doesn't have a "remove where" overload, so iterate keys + remove.
|
||||
// Suppression races are tolerated — losing one suppression decision after a reconnect
|
||||
// costs at most one extra wire write, never correctness.
|
||||
foreach (var key in _lastValues.Keys)
|
||||
{
|
||||
if (string.Equals(key.Device, deviceHostAddress, StringComparison.OrdinalIgnoreCase))
|
||||
_lastValues.TryRemove(key, out _);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Drop every cached last-value across all devices — invoked on full driver shutdown.</summary>
|
||||
public void ResetAll() => _lastValues.Clear();
|
||||
|
||||
private static bool TrySuppress(AbCipTagDefinition tag, object? lastValue, object? newValue)
|
||||
{
|
||||
// Numeric deadband — only fires when both sides convert cleanly to double. NaN / Infinity
|
||||
// bypass: the wire decides because IEEE-754 comparisons against NaN are undefined and
|
||||
// we don't want a stale +Inf in the cache to silently swallow a real reset.
|
||||
if (tag.WriteDeadband.HasValue
|
||||
&& TryToDouble(lastValue, out var lastNum)
|
||||
&& TryToDouble(newValue, out var newNum))
|
||||
{
|
||||
if (double.IsNaN(lastNum) || double.IsNaN(newNum)
|
||||
|| double.IsInfinity(lastNum) || double.IsInfinity(newNum))
|
||||
{
|
||||
// Fall through to the WriteOnChange equality check below — NaN / Infinity skip
|
||||
// the deadband path but a legacy WriteOnChange tag should still benefit from
|
||||
// exact-equality suppression on the same packet.
|
||||
}
|
||||
else if (Math.Abs(newNum - lastNum) < tag.WriteDeadband.Value)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// WriteOnChange — equality fallback. Always evaluated when the flag is set so a
|
||||
// non-numeric tag (BOOL, STRING) still benefits even when WriteDeadband is set on the
|
||||
// same tag (the deadband path simply doesn't apply to it).
|
||||
if (tag.WriteOnChange && Equals(lastValue, newValue))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private static bool TryToDouble(object? value, out double result)
|
||||
{
|
||||
// IConvertible covers every Logix atomic type the AB CIP driver decodes (sbyte, short,
|
||||
// int, long + their unsigned siblings + float / double). DateTime and string are
|
||||
// excluded — neither has a meaningful "deadband" interpretation.
|
||||
switch (value)
|
||||
{
|
||||
case null:
|
||||
result = 0;
|
||||
return false;
|
||||
case bool:
|
||||
result = 0;
|
||||
return false;
|
||||
case string:
|
||||
result = 0;
|
||||
return false;
|
||||
case DateTime:
|
||||
result = 0;
|
||||
return false;
|
||||
case IConvertible conv:
|
||||
try
|
||||
{
|
||||
result = conv.ToDouble(System.Globalization.CultureInfo.InvariantCulture);
|
||||
return true;
|
||||
}
|
||||
catch
|
||||
{
|
||||
result = 0;
|
||||
return false;
|
||||
}
|
||||
default:
|
||||
result = 0;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class LastKeyComparer : IEqualityComparer<(string Device, string Tag)>
|
||||
{
|
||||
public static readonly LastKeyComparer Instance = new();
|
||||
|
||||
public bool Equals((string Device, string Tag) x, (string Device, string Tag) y) =>
|
||||
string.Equals(x.Device, y.Device, StringComparison.OrdinalIgnoreCase)
|
||||
&& string.Equals(x.Tag, y.Tag, StringComparison.Ordinal);
|
||||
|
||||
public int GetHashCode((string Device, string Tag) obj) =>
|
||||
HashCode.Combine(
|
||||
StringComparer.OrdinalIgnoreCase.GetHashCode(obj.Device),
|
||||
StringComparer.Ordinal.GetHashCode(obj.Tag));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user