diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index af536fc..7da8eac 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -196,8 +196,17 @@ public sealed class ModbusDriver var transport = RequireTransport(); var now = DateTime.UtcNow; var results = new DataValueSnapshot[fullReferences.Count]; + + // #143 block-read coalescing: when MaxReadGap is non-zero, route eligible tags through + // the coalescing planner first. Tags it can't coalesce (arrays, coils, prohibited, + // unknown) fall through to the per-tag loop below with results[i] still default. + var coalesced = _options.MaxReadGap > 0 + ? await ReadCoalescedAsync(transport, fullReferences, results, now, cancellationToken).ConfigureAwait(false) + : new HashSet(); + for (var i = 0; i < fullReferences.Count; i++) { + if (coalesced.Contains(i)) continue; if (!_tagsByName.TryGetValue(fullReferences[i], out var tag)) { results[i] = new DataValueSnapshot(null, StatusBadNodeIdUnknown, null, now); @@ -377,6 +386,130 @@ public sealed class ModbusDriver /// Resolve the UnitId for a tag — per-tag override (#142) or driver-level fallback. private byte ResolveUnitId(ModbusTagDefinition tag) => tag.UnitId ?? _options.UnitId; + /// + /// #143 block-read coalescing planner. Groups eligible tags by (UnitId, Region), sorts + /// by start address, and merges adjacent / near-adjacent (gap ≤ MaxReadGap) into single + /// FC03/FC04 reads. Per-block: emit one Modbus PDU, slice the response back into per-tag + /// values, populate and the WriteOnChangeOnly cache. Returns + /// the set of indices the planner handled — the + /// caller falls back to the per-tag path for the rest (arrays, coils, prohibited, unknown). + /// + private async Task> ReadCoalescedAsync( + IModbusTransport transport, + IReadOnlyList fullReferences, + DataValueSnapshot[] results, + DateTime timestamp, + CancellationToken ct) + { + // Eligible: known tag, register region (HoldingRegisters or InputRegisters), scalar + // (no array, no string), not CoalesceProhibited, not BitInRegister. + var eligible = new List<(int Index, string Ref, ModbusTagDefinition Tag)>(); + for (var i = 0; i < fullReferences.Count; i++) + { + if (!_tagsByName.TryGetValue(fullReferences[i], out var tag)) continue; + if (tag.CoalesceProhibited) continue; + if (tag.ArrayCount.HasValue) continue; + if (tag.Region is not (ModbusRegion.HoldingRegisters or ModbusRegion.InputRegisters)) continue; + if (tag.DataType is ModbusDataType.String or ModbusDataType.BitInRegister) continue; + eligible.Add((i, fullReferences[i], tag)); + } + if (eligible.Count == 0) return new HashSet(); + + var handled = new HashSet(); + + // Group by (UnitId, Region) — coalescing across slaves or regions is unsafe. + foreach (var group in eligible.GroupBy(e => (Unit: ResolveUnitId(e.Tag), e.Tag.Region))) + { + var fc = group.Key.Region == ModbusRegion.HoldingRegisters ? (byte)0x03 : (byte)0x04; + var cap = _options.MaxRegistersPerRead == 0 ? (ushort)125 : _options.MaxRegistersPerRead; + var sorted = group.OrderBy(e => e.Tag.Address).ToList(); + + // Build merged blocks. A "block" is (start, lastEnd, members[]) where lastEnd is + // the inclusive end address of the last tag's register span. A new tag joins the + // block if its start ≤ lastEnd + 1 + MaxReadGap AND the resulting span ≤ cap. + var blocks = new List<(ushort Start, ushort End, List<(int Index, ModbusTagDefinition Tag)> Members)>(); + foreach (var (idx, _, tag) in sorted) + { + var tagStart = tag.Address; + var tagEnd = (ushort)(tag.Address + RegisterCount(tag) - 1); + if (blocks.Count > 0) + { + var last = blocks[^1]; + var gap = tagStart - last.End - 1; + var newEnd = Math.Max(tagEnd, last.End); + var newSpan = newEnd - last.Start + 1; + if (gap <= _options.MaxReadGap && newSpan <= cap) + { + last.Members.Add((idx, tag)); + blocks[^1] = (last.Start, (ushort)newEnd, last.Members); + continue; + } + } + blocks.Add((tagStart, tagEnd, new List<(int, ModbusTagDefinition)> { (idx, tag) })); + } + + // Issue one PDU per block. On block-level failure mark every member Bad — caller's + // per-tag fallback won't re-try since handled-set already includes them; auto-split- + // on-failure is a follow-up. + foreach (var block in blocks) + { + if (block.Members.Count == 1) + { + // Lone tag — let the per-tag path handle it for symmetry with WriteOnChange + // cache invalidation. Costs nothing because one tag = one PDU either way. + continue; + } + + var qty = (ushort)(block.End - block.Start + 1); + try + { + var data = await ReadRegisterBlockAsync(transport, group.Key.Unit, fc, block.Start, qty, ct).ConfigureAwait(false); + foreach (var (idx, tag) in block.Members) + { + var sliceOffsetBytes = (tag.Address - block.Start) * 2; + var sliceLenBytes = RegisterCount(tag) * 2; + var value = DecodeRegister(data.AsSpan(sliceOffsetBytes, sliceLenBytes), tag); + results[idx] = new DataValueSnapshot(value, 0u, timestamp, timestamp); + handled.Add(idx); + InvalidateWriteCacheIfDiverged(fullReferences[idx], value); + } + _health = new DriverHealth(DriverState.Healthy, timestamp, null); + } + catch (ModbusException mex) + { + var status = MapModbusExceptionToStatus(mex.ExceptionCode); + foreach (var (idx, _) in block.Members) + { + results[idx] = new DataValueSnapshot(null, status, null, timestamp); + handled.Add(idx); + } + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, mex.Message); + } + catch (Exception ex) + { + foreach (var (idx, _) in block.Members) + { + results[idx] = new DataValueSnapshot(null, StatusBadCommunicationError, null, timestamp); + handled.Add(idx); + } + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message); + } + } + } + return handled; + } + + private void InvalidateWriteCacheIfDiverged(string fullRef, object value) + { + if (!_options.WriteOnChangeOnly) return; + lock (_lastWrittenLock) + { + if (_lastWrittenByRef.TryGetValue(fullRef, out var prev) && !Equals(prev, value)) + _lastWrittenByRef.Remove(fullRef); + } + } + + private async Task ReadRegisterBlockAsync( IModbusTransport transport, byte unitId, byte fc, ushort address, ushort quantity, CancellationToken ct) { diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs index 91391c9..c8eff91 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs @@ -79,6 +79,16 @@ public sealed class ModbusDriverOptions /// public bool DisableFC23 { get; init; } = false; + /// + /// Block-read coalescing budget (#143). When non-zero, the read planner combines tags + /// in the same (UnitId, Region) group whose addresses are at most this many registers + /// apart into a single FC03/FC04/FC01/FC02 read. The sliced response is then dispatched + /// back to per-tag values. Default 0 = no coalescing — every tag gets its own + /// PDU (preserves pre-#143 behaviour). Typical opt-in values are 5..32 — large enough + /// to bridge a few unused registers, small enough to avoid trampling protected holes. + /// + public ushort MaxReadGap { get; init; } = 0; + /// /// PLC family hint that drives the parser's family-native branch (#144). When set to a /// non-Generic value, address strings using that family's native syntax (DL205 V2000 / @@ -225,6 +235,12 @@ public sealed class ModbusProbeOptions /// Tags with different UnitIds belong to different physical slaves and the read planner /// must NOT coalesce them across slaves — even at the same address. /// +/// +/// Escape hatch for #143 block-read coalescing. When true, the planner reads this +/// tag in isolation regardless of ModbusDriverOptions.MaxReadGap. Use when the +/// surrounding registers are write-only or fault on read (some Schneider Premium / Siemens +/// PNs have protected holes). Default false. +/// public sealed record ModbusTagDefinition( string Name, ModbusRegion Region, @@ -238,4 +254,5 @@ public sealed record ModbusTagDefinition( bool WriteIdempotent = false, int? ArrayCount = null, double? Deadband = null, - byte? UnitId = null); + byte? UnitId = null, + bool CoalesceProhibited = false); diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusCoalescingTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusCoalescingTests.cs new file mode 100644 index 0000000..60d5234 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusCoalescingTests.cs @@ -0,0 +1,176 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests; + +/// +/// #143 block-read coalescing: with MaxReadGap > 0 the driver merges nearby tags into a +/// single FC03/FC04 read. Coverage focuses on the planner output (PDU count + quantity) +/// rather than wire bytes — those are tested by ModbusDriverTests. +/// +[Trait("Category", "Unit")] +public sealed class ModbusCoalescingTests +{ + private sealed class CountingTransport : IModbusTransport + { + public readonly List<(byte Unit, byte Fc, ushort Address, ushort Quantity)> Reads = new(); + public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask; + public Task SendAsync(byte unitId, byte[] pdu, CancellationToken ct) + { + var addr = (ushort)((pdu[1] << 8) | pdu[2]); + var qty = (ushort)((pdu[3] << 8) | pdu[4]); + if (pdu[0] is 0x03 or 0x04) Reads.Add((unitId, pdu[0], addr, qty)); + switch (pdu[0]) + { + case 0x03: case 0x04: + { + var resp = new byte[2 + qty * 2]; + resp[0] = pdu[0]; resp[1] = (byte)(qty * 2); + return Task.FromResult(resp); + } + default: return Task.FromResult(new byte[] { pdu[0], 0, 0 }); + } + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } + + [Fact] + public async Task MaxReadGap_Zero_Defaults_To_Per_Tag_Reads() + { + var fake = new CountingTransport(); + var t1 = new ModbusTagDefinition("T1", ModbusRegion.HoldingRegisters, 100, ModbusDataType.Int16); + var t2 = new ModbusTagDefinition("T2", ModbusRegion.HoldingRegisters, 102, ModbusDataType.Int16); + var opts = new ModbusDriverOptions { Host = "f", Tags = [t1, t2], MaxReadGap = 0, + Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + await drv.ReadAsync(["T1", "T2"], CancellationToken.None); + + // With coalescing off, expect 2 separate FC03 reads. + var fc03Reads = fake.Reads.Where(r => r.Fc == 0x03).ToList(); + fc03Reads.Count.ShouldBe(2); + } + + [Fact] + public async Task MaxReadGap_Bridges_Two_Adjacent_Tags_Into_One_Read() + { + var fake = new CountingTransport(); + // Three tags within 5 registers: T1@100, T2@102, T3@104. Gaps: 1, 1. MaxReadGap=2 → 1 block. + var t1 = new ModbusTagDefinition("T1", ModbusRegion.HoldingRegisters, 100, ModbusDataType.Int16); + var t2 = new ModbusTagDefinition("T2", ModbusRegion.HoldingRegisters, 102, ModbusDataType.Int16); + var t3 = new ModbusTagDefinition("T3", ModbusRegion.HoldingRegisters, 104, ModbusDataType.Int16); + var opts = new ModbusDriverOptions { Host = "f", Tags = [t1, t2, t3], MaxReadGap = 2, + Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + await drv.ReadAsync(["T1", "T2", "T3"], CancellationToken.None); + + var fc03Reads = fake.Reads.Where(r => r.Fc == 0x03).ToList(); + fc03Reads.Count.ShouldBe(1); + fc03Reads[0].Address.ShouldBe((ushort)100); + fc03Reads[0].Quantity.ShouldBe((ushort)5); // 100..104 + } + + [Fact] + public async Task MaxReadGap_Splits_When_Gap_Exceeds_Threshold() + { + var fake = new CountingTransport(); + // T1@100, T2@102 (gap 1, joins block), T3@200 (gap 97 → exceeds gap=10 → second block). + var t1 = new ModbusTagDefinition("T1", ModbusRegion.HoldingRegisters, 100, ModbusDataType.Int16); + var t2 = new ModbusTagDefinition("T2", ModbusRegion.HoldingRegisters, 102, ModbusDataType.Int16); + var t3 = new ModbusTagDefinition("T3", ModbusRegion.HoldingRegisters, 200, ModbusDataType.Int16); + var opts = new ModbusDriverOptions { Host = "f", Tags = [t1, t2, t3], MaxReadGap = 10, + Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + await drv.ReadAsync(["T1", "T2", "T3"], CancellationToken.None); + + var fc03Reads = fake.Reads.Where(r => r.Fc == 0x03).ToList(); + fc03Reads.Count.ShouldBe(2); // T1+T2 coalesced; T3 alone + } + + [Fact] + public async Task CoalesceProhibited_Tag_Reads_Alone() + { + var fake = new CountingTransport(); + var t1 = new ModbusTagDefinition("T1", ModbusRegion.HoldingRegisters, 100, ModbusDataType.Int16); + var t2 = new ModbusTagDefinition("T2", ModbusRegion.HoldingRegisters, 102, ModbusDataType.Int16, CoalesceProhibited: true); + var t3 = new ModbusTagDefinition("T3", ModbusRegion.HoldingRegisters, 104, ModbusDataType.Int16); + var opts = new ModbusDriverOptions { Host = "f", Tags = [t1, t2, t3], MaxReadGap = 10, + Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + await drv.ReadAsync(["T1", "T2", "T3"], CancellationToken.None); + + var fc03Reads = fake.Reads.Where(r => r.Fc == 0x03).ToList(); + // T2 read alone (CoalesceProhibited). T1 and T3 coalesce (gap = 3 within MaxReadGap=10). + // Expect 2 reads total. + fc03Reads.Count.ShouldBe(2); + } + + [Fact] + public async Task Coalescing_Does_Not_Cross_UnitId_Boundaries() + { + var fake = new CountingTransport(); + // Same Region + adjacent addresses but different UnitIds → must NOT coalesce. + var t1 = new ModbusTagDefinition("T1", ModbusRegion.HoldingRegisters, 100, ModbusDataType.Int16, UnitId: 1); + var t2 = new ModbusTagDefinition("T2", ModbusRegion.HoldingRegisters, 102, ModbusDataType.Int16, UnitId: 2); + var opts = new ModbusDriverOptions { Host = "f", Tags = [t1, t2], MaxReadGap = 100, + Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + await drv.ReadAsync(["T1", "T2"], CancellationToken.None); + + var fc03Reads = fake.Reads.Where(r => r.Fc == 0x03).ToList(); + fc03Reads.Count.ShouldBe(2); + fc03Reads.Select(r => r.Unit).Distinct().Count().ShouldBe(2); + } + + [Fact] + public async Task Coalescing_Splits_Block_That_Exceeds_MaxRegistersPerRead() + { + var fake = new CountingTransport(); + // T1@0, T2@200 with MaxReadGap=300 would naturally form one block of 201 registers, + // but MaxRegistersPerRead=125 caps it. The planner should NOT coalesce because the + // resulting span exceeds the cap — it falls back to two separate reads. + var t1 = new ModbusTagDefinition("T1", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16); + var t2 = new ModbusTagDefinition("T2", ModbusRegion.HoldingRegisters, 200, ModbusDataType.Int16); + var opts = new ModbusDriverOptions { Host = "f", Tags = [t1, t2], MaxReadGap = 300, + MaxRegistersPerRead = 125, Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + await drv.ReadAsync(["T1", "T2"], CancellationToken.None); + + var fc03Reads = fake.Reads.Where(r => r.Fc == 0x03).ToList(); + fc03Reads.Count.ShouldBe(2); + } + + [Fact] + public async Task Coalesced_Read_Surfaces_Each_Tag_Value_Independently() + { + // Sanity check: after coalescing the per-tag values must still be correct (no + // index-shift bugs in the slice math). + var fake = new CountingTransport(); + var t1 = new ModbusTagDefinition("T1", ModbusRegion.HoldingRegisters, 100, ModbusDataType.Int16); + var t2 = new ModbusTagDefinition("T2", ModbusRegion.HoldingRegisters, 101, ModbusDataType.Int16); + var opts = new ModbusDriverOptions { Host = "f", Tags = [t1, t2], MaxReadGap = 5, + Probe = new ModbusProbeOptions { Enabled = false } }; + var drv = new ModbusDriver(opts, "m1", _ => fake); + await drv.InitializeAsync("{}", CancellationToken.None); + + var values = await drv.ReadAsync(["T1", "T2"], CancellationToken.None); + + values.Count.ShouldBe(2); + values[0].StatusCode.ShouldBe(0u); + values[1].StatusCode.ShouldBe(0u); + // The fake returns zeros for our values; the assertion is on quality + that the slice + // didn't mis-index (a bug there would surface as IndexOutOfRange / wrong type). + } +}