diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index a77c3ac..847efa1 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -55,7 +55,47 @@ public sealed class ModbusDriver _poll = new PollGroupEngine( reader: ReadAsync, onChange: (handle, tagRef, snapshot) => - OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot))); + { + // #141 deadband filter: when configured on a tag, suppress publishes whose + // numeric distance from the last-published value is below the threshold. + if (!ShouldPublish(tagRef, snapshot)) return; + OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)); + }); + } + + // Last-published value per tag, keyed by FullReference. Used by ShouldPublish to apply + // the deadband filter. Stored as object so all numeric types share one map; the comparison + // does a typed cast inside. + private readonly Dictionary _lastPublishedByRef = new(StringComparer.OrdinalIgnoreCase); + + // Last-written value per tag for the WriteOnChangeOnly suppression. Invalidated by reads + // that return a different value (so an HMI-side change doesn't get masked). + private readonly Dictionary _lastWrittenByRef = new(StringComparer.OrdinalIgnoreCase); + private readonly object _lastWrittenLock = new(); + + private bool ShouldPublish(string tagRef, DataValueSnapshot snapshot) + { + if (!_tagsByName.TryGetValue(tagRef, out var tag) || tag.Deadband is null) return true; + if (snapshot.Value is null) return true; + // Deadband only applies to numeric scalar types — array / Bool / String publishes + // unconditionally. Easier to special-case skip than to enumerate the supported types. + if (tag.ArrayCount.HasValue || tag.DataType is ModbusDataType.Bool or ModbusDataType.BitInRegister or ModbusDataType.String) + return true; + + if (!_lastPublishedByRef.TryGetValue(tagRef, out var prev)) + { + // First sample passes through unconditionally — the threshold can't be evaluated + // without a baseline. The publish lands and seeds the comparison. + _lastPublishedByRef[tagRef] = snapshot.Value; + return true; + } + + var newD = Convert.ToDouble(snapshot.Value); + var oldD = Convert.ToDouble(prev); + if (Math.Abs(newD - oldD) < tag.Deadband.Value) return false; + + _lastPublishedByRef[tagRef] = snapshot.Value; + return true; } public string DriverInstanceId => _driverInstanceId; @@ -151,6 +191,19 @@ public sealed class ModbusDriver var value = await ReadOneAsync(transport, tag, cancellationToken).ConfigureAwait(false); results[i] = new DataValueSnapshot(value, 0u, now, now); _health = new DriverHealth(DriverState.Healthy, now, null); + + // Invalidate the WriteOnChangeOnly cache when the read returns a different value + // — typically an HMI-side or PLC-internal change. Without this, a setpoint + // tweaked at the panel could be silently re-suppressed when our client tried + // to restore it. + if (_options.WriteOnChangeOnly) + { + lock (_lastWrittenLock) + { + if (_lastWrittenByRef.TryGetValue(fullReferences[i], out var prev) && !Equals(prev, value)) + _lastWrittenByRef.Remove(fullReferences[i]); + } + } } catch (ModbusException mex) { @@ -387,10 +440,20 @@ public sealed class ModbusDriver results[i] = new WriteResult(StatusBadNotWritable); continue; } + // #141 WriteOnChangeOnly suppression: skip the wire round-trip when the same value + // was already successfully written and no read since has invalidated the cache. + if (_options.WriteOnChangeOnly && IsRedundantWrite(w.FullReference, w.Value)) + { + results[i] = new WriteResult(0u); + continue; + } + try { await WriteOneAsync(transport, tag, w.Value, cancellationToken).ConfigureAwait(false); results[i] = new WriteResult(0u); + if (_options.WriteOnChangeOnly) + lock (_lastWrittenLock) _lastWrittenByRef[w.FullReference] = w.Value; } catch (ModbusException mex) { @@ -404,6 +467,20 @@ public sealed class ModbusDriver return results; } + private bool IsRedundantWrite(string tagRef, object? value) + { + lock (_lastWrittenLock) + { + if (!_lastWrittenByRef.TryGetValue(tagRef, out var prev)) return false; + // Object.Equals handles boxed-numeric equality (5 == 5 even if one was short and + // one int through boxing). For arrays we deliberately don't suppress — equality + // semantics on arrays are reference-only so the cache miss is the safer answer. + if (prev is null || value is null) return Equals(prev, value); + if (prev is Array || value is Array) return false; + return prev.Equals(value); + } + } + // BitInRegister writes need a read-modify-write against the full holding register. A // per-register lock keeps concurrent bit-write callers from stomping on each other — // Write bit 0 and Write bit 5 targeting the same register can arrive on separate diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverFactoryExtensions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverFactoryExtensions.cs index 5c1f3c1..3e2f048 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverFactoryExtensions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverFactoryExtensions.cs @@ -45,6 +45,7 @@ public static class ModbusDriverFactoryExtensions UseFC15ForSingleCoilWrites = dto.UseFC15ForSingleCoilWrites ?? false, UseFC16ForSingleRegisterWrites = dto.UseFC16ForSingleRegisterWrites ?? false, DisableFC23 = dto.DisableFC23 ?? false, + WriteOnChangeOnly = dto.WriteOnChangeOnly ?? false, AutoReconnect = dto.AutoReconnect ?? true, Tags = dto.Tags is { Count: > 0 } ? [.. dto.Tags.Select(t => BuildTag(t, driverInstanceId))] @@ -102,7 +103,8 @@ public static class ModbusDriverFactoryExtensions ? ModbusStringByteOrder.HighByteFirst : ParseEnum(t.StringByteOrder, name, driverInstanceId, "StringByteOrder"), WriteIdempotent: t.WriteIdempotent ?? false, - ArrayCount: parsed.ArrayCount); + ArrayCount: parsed.ArrayCount, + Deadband: t.Deadband); } return new ModbusTagDefinition( @@ -121,7 +123,8 @@ public static class ModbusDriverFactoryExtensions ? ModbusStringByteOrder.HighByteFirst : ParseEnum(t.StringByteOrder, t.Name, driverInstanceId, "StringByteOrder"), WriteIdempotent: t.WriteIdempotent ?? false, - ArrayCount: t.ArrayCount); + ArrayCount: t.ArrayCount, + Deadband: t.Deadband); } private static T ParseEnum(string? raw, string? tagName, string driverInstanceId, string field) where T : struct, Enum @@ -155,6 +158,7 @@ public static class ModbusDriverFactoryExtensions public bool? UseFC15ForSingleCoilWrites { get; init; } public bool? UseFC16ForSingleRegisterWrites { get; init; } public bool? DisableFC23 { get; init; } + public bool? WriteOnChangeOnly { get; init; } public bool? AutoReconnect { get; init; } public List? Tags { get; init; } public ModbusProbeDto? Probe { get; init; } @@ -202,6 +206,7 @@ public static class ModbusDriverFactoryExtensions public string? StringByteOrder { get; init; } public bool? WriteIdempotent { get; init; } public int? ArrayCount { get; init; } + public double? Deadband { get; init; } } internal sealed class ModbusProbeDto diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs index 5052a79..687f0ae 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs @@ -79,6 +79,17 @@ public sealed class ModbusDriverOptions /// public bool DisableFC23 { get; init; } = false; + /// + /// When true, the driver suppresses redundant writes: if the most recent + /// successful write to a tag carried value V and a new write of V arrives, the second + /// write returns Good without touching the wire. Saves PLC bandwidth on clients that + /// re-publish the same setpoint every scan. The cached "last written" is invalidated + /// on the next read that returns a different value, so HMI-side changes don't get + /// masked. Default false preserves the historical "every write goes to the wire" + /// behaviour. Per-tag deadband lives on ModbusTagDefinition.Deadband. + /// + public bool WriteOnChangeOnly { get; init; } = false; + /// /// When true (default) the built-in detects /// mid-transaction socket failures (, @@ -186,6 +197,13 @@ public sealed class ModbusProbeOptions /// registers consumed = ArrayCount * registers-per-element. Bit + array is rejected at /// bind time (no use case). Default null = scalar (existing behavior). /// +/// +/// When non-null, the subscribe path suppresses a publish whenever +/// |new - last_published| < Deadband. Reduces wire traffic on noisy analog +/// signals (flow meters, temperatures). Only meaningful for numeric scalar types +/// (Int*, UInt*, Float32, Float64, Bcd*); ignored for Bool / BitInRegister / String / +/// array tags. Default null = no deadband (every change publishes). +/// public sealed record ModbusTagDefinition( string Name, ModbusRegion Region, @@ -197,4 +215,5 @@ public sealed record ModbusTagDefinition( ushort StringLength = 0, ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst, bool WriteIdempotent = false, - int? ArrayCount = null); + int? ArrayCount = null, + double? Deadband = null); diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProtocolOptionsTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProtocolOptionsTests.cs index b1aac03..04e36b4 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProtocolOptionsTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProtocolOptionsTests.cs @@ -63,7 +63,7 @@ public sealed class ModbusProtocolOptionsTests { var fake = new CapturingTransport(); var tag = new ModbusTagDefinition("Run", ModbusRegion.Coils, 0, ModbusDataType.Bool); - var drv = new ModbusDriver(new ModbusDriverOptions { Host = "f", Tags = [tag] }, "m1", _ => fake); + var drv = new ModbusDriver(new ModbusDriverOptions { Host = "f", Tags = [tag], Probe = new ModbusProbeOptions { Enabled = false } }, "m1", _ => fake); await drv.InitializeAsync("{}", CancellationToken.None); await drv.WriteAsync([new WriteRequest("Run", true)], CancellationToken.None); @@ -76,7 +76,7 @@ public sealed class ModbusProtocolOptionsTests { var fake = new CapturingTransport(); var tag = new ModbusTagDefinition("Run", ModbusRegion.Coils, 0, ModbusDataType.Bool); - var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], UseFC15ForSingleCoilWrites = true }; + var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], UseFC15ForSingleCoilWrites = true, Probe = new ModbusProbeOptions { Enabled = false } }; var drv = new ModbusDriver(opts, "m1", _ => fake); await drv.InitializeAsync("{}", CancellationToken.None); @@ -90,7 +90,7 @@ public sealed class ModbusProtocolOptionsTests { var fake = new CapturingTransport(); var tag = new ModbusTagDefinition("Sp", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16); - var drv = new ModbusDriver(new ModbusDriverOptions { Host = "f", Tags = [tag] }, "m1", _ => fake); + var drv = new ModbusDriver(new ModbusDriverOptions { Host = "f", Tags = [tag], Probe = new ModbusProbeOptions { Enabled = false } }, "m1", _ => fake); await drv.InitializeAsync("{}", CancellationToken.None); await drv.WriteAsync([new WriteRequest("Sp", (short)42)], CancellationToken.None); @@ -103,7 +103,7 @@ public sealed class ModbusProtocolOptionsTests { var fake = new CapturingTransport(); var tag = new ModbusTagDefinition("Sp", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16); - var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], UseFC16ForSingleRegisterWrites = true }; + var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], UseFC16ForSingleRegisterWrites = true, Probe = new ModbusProbeOptions { Enabled = false } }; var drv = new ModbusDriver(opts, "m1", _ => fake); await drv.InitializeAsync("{}", CancellationToken.None); @@ -118,7 +118,7 @@ public sealed class ModbusProtocolOptionsTests var fake = new CapturingTransport(); // 2500 coils with cap 2000 → 2 reads (2000 + 500). var tag = new ModbusTagDefinition("Big", ModbusRegion.Coils, 0, ModbusDataType.Bool, ArrayCount: 2500); - var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], MaxCoilsPerRead = 2000 }; + var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], MaxCoilsPerRead = 2000, Probe = new ModbusProbeOptions { Enabled = false } }; var drv = new ModbusDriver(opts, "m1", _ => fake); await drv.InitializeAsync("{}", CancellationToken.None); diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscribeOptionsTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscribeOptionsTests.cs new file mode 100644 index 0000000..d39a1b3 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscribeOptionsTests.cs @@ -0,0 +1,172 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests; + +/// +/// #141 subscribe-side knobs: per-tag Deadband, driver-wide WriteOnChangeOnly. +/// +[Trait("Category", "Unit")] +public sealed class ModbusSubscribeOptionsTests +{ + /// + /// Programmable transport: caller seeds a bank-of-registers value, each FC03 returns + /// the current value. Lets tests step the underlying register through a sequence and + /// observe how the deadband filter responds. + /// + private sealed class ProgrammableTransport : IModbusTransport + { + public ushort CurrentValue; + public int WritesSent; + public int FC06Count; + public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask; + public Task SendAsync(byte unitId, byte[] pdu, CancellationToken ct) + { + switch (pdu[0]) + { + case 0x03: + { + var qty = (ushort)((pdu[3] << 8) | pdu[4]); + var resp = new byte[2 + qty * 2]; + resp[0] = 0x03; resp[1] = (byte)(qty * 2); + for (var i = 0; i < qty; i++) + { + resp[2 + i * 2] = (byte)(CurrentValue >> 8); + resp[3 + i * 2] = (byte)(CurrentValue & 0xFF); + } + return Task.FromResult(resp); + } + case 0x06: + WritesSent++; FC06Count++; + CurrentValue = (ushort)((pdu[3] << 8) | pdu[4]); + return Task.FromResult(pdu); + default: + return Task.FromResult(new byte[] { pdu[0], 0, 0 }); + } + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } + + [Fact] + public async Task Deadband_Suppresses_SubThreshold_Changes() + { + var fake = new ProgrammableTransport(); + var tag = new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16, Deadband: 5.0); + var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + var publishes = new List(); + drv.OnDataChange += (_, e) => publishes.Add((short)e.Snapshot.Value!); + + // First publish always passes (no baseline). Then step the value: + // 100 → 102 (delta 2 < 5, suppressed) → 106 (delta 6 ≥ 5, published) → 107 (delta 1, suppressed). + var sub = await drv.SubscribeAsync(["Temp"], TimeSpan.FromMilliseconds(50), CancellationToken.None); + try + { + fake.CurrentValue = 100; + await Task.Delay(150); + fake.CurrentValue = 102; + await Task.Delay(150); + fake.CurrentValue = 106; + await Task.Delay(150); + fake.CurrentValue = 107; + await Task.Delay(150); + } + finally + { + await drv.UnsubscribeAsync(sub, CancellationToken.None); + } + + // Expect at most 2 distinct values surfaced (100 baseline + 106). The 102 and 107 should + // be suppressed by the deadband. Ordering can be flaky on slow CI so we assert the set, + // not the exact sequence. + publishes.ShouldContain((short)100); + publishes.ShouldContain((short)106); + publishes.ShouldNotContain((short)102); + publishes.ShouldNotContain((short)107); + } + + [Fact] + public async Task Deadband_Null_Publishes_Every_Change() + { + var fake = new ProgrammableTransport(); + var tag = new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16); // no deadband + var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + var publishes = new List(); + drv.OnDataChange += (_, e) => publishes.Add((short)e.Snapshot.Value!); + + var sub = await drv.SubscribeAsync(["Temp"], TimeSpan.FromMilliseconds(50), CancellationToken.None); + try + { + fake.CurrentValue = 100; await Task.Delay(150); + fake.CurrentValue = 101; await Task.Delay(150); // tiny change still publishes + } + finally { await drv.UnsubscribeAsync(sub, CancellationToken.None); } + + publishes.ShouldContain((short)100); + publishes.ShouldContain((short)101); + } + + [Fact] + public async Task WriteOnChangeOnly_Suppresses_Identical_Repeated_Writes() + { + var fake = new ProgrammableTransport(); + var tag = new ModbusTagDefinition("Sp", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16); + var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], WriteOnChangeOnly = true, + Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + await drv.WriteAsync([new WriteRequest("Sp", (short)42)], CancellationToken.None); + await drv.WriteAsync([new WriteRequest("Sp", (short)42)], CancellationToken.None); // suppressed + await drv.WriteAsync([new WriteRequest("Sp", (short)42)], CancellationToken.None); // suppressed + await drv.WriteAsync([new WriteRequest("Sp", (short)43)], CancellationToken.None); // distinct + + fake.WritesSent.ShouldBe(2, "two distinct values written; identical-value repeats suppressed"); + } + + [Fact] + public async Task WriteOnChangeOnly_Default_False_Always_Writes() + { + var fake = new ProgrammableTransport(); + var tag = new ModbusTagDefinition("Sp", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16); + var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], + Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + await drv.WriteAsync([new WriteRequest("Sp", (short)42)], CancellationToken.None); + await drv.WriteAsync([new WriteRequest("Sp", (short)42)], CancellationToken.None); + await drv.WriteAsync([new WriteRequest("Sp", (short)42)], CancellationToken.None); + + fake.WritesSent.ShouldBe(3, "default false → every write goes to the wire"); + } + + [Fact] + public async Task WriteOnChangeOnly_Cache_Invalidated_By_Read_Divergence() + { + var fake = new ProgrammableTransport(); + var tag = new ModbusTagDefinition("Sp", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16); + var opts = new ModbusDriverOptions { Host = "f", Tags = [tag], WriteOnChangeOnly = true, + Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + await drv.WriteAsync([new WriteRequest("Sp", (short)42)], CancellationToken.None); + fake.FC06Count.ShouldBe(1); + + // External change at the PLC (panel writes 99). Read sees 99 → invalidates the cache. + fake.CurrentValue = 99; + var read = await drv.ReadAsync(["Sp"], CancellationToken.None); + read[0].Value.ShouldBe((short)99); + + // Now writing 42 again should NOT be suppressed because the cache was invalidated. + await drv.WriteAsync([new WriteRequest("Sp", (short)42)], CancellationToken.None); + fake.FC06Count.ShouldBe(2, "post-divergence write not suppressed"); + } +}