Auto: s7-b2 — block-read coalescing for contiguous DBs

Closes #293
This commit is contained in:
Joseph Doherty
2026-04-25 21:23:06 -04:00
parent 5432c49364
commit 17faf76ea7
7 changed files with 976 additions and 11 deletions

View File

@@ -0,0 +1,241 @@
namespace ZB.MOM.WW.OtOpcUa.Driver.S7;
/// <summary>
/// Block-read coalescing planner for the S7 driver (PR-S7-B2). Where the
/// <see cref="S7ReadPacker"/> coalesces N scalar tags into ⌈N/19⌉
/// <c>Plc.ReadMultipleVarsAsync</c> PDUs, this planner takes one further pass:
/// it groups same-area, same-DB tags by contiguous byte range and folds them
/// into a single <c>Plc.ReadBytesAsync</c> covering the merged span. The
/// response is sliced client-side per tag so the per-tag decode path is
/// unchanged.
/// </summary>
/// <remarks>
/// <para>
/// <b>Why coalesce</b>: Reading <c>DB1.DBW0</c> + <c>DB1.DBW2</c> +
/// <c>DB1.DBW4</c> as three multi-var items still uses three slots in a
/// single PDU; coalescing into one 6-byte byte-range read drops the per-item
/// framing entirely and makes the request fit in fewer (sometimes zero
/// additional) PDUs. On a typical contiguous DB the wire-level reduction is
/// 50:1 for 50 contiguous DBWs.
/// </para>
/// <para>
/// <b>Gap-merge threshold</b>: The planner merges adjacent tag ranges when
/// the gap between them is at most the <c>gapMergeBytes</c> argument to
/// <see cref="Plan"/>. The default <see cref="DefaultGapMergeBytes"/> is
/// 16 bytes — over-fetching 16 bytes is cheaper than one extra PDU
/// (240-byte default PDU envelope, ~18 bytes per request frame). Operators
/// can tune the threshold per driver instance via
/// <see cref="S7DriverOptions.BlockCoalescingGapBytes"/>.
/// </para>
/// <para>
/// <b>Opaque-size opt-out</b>: STRING / WSTRING / CHAR / WCHAR and DTL /
/// DT / S5TIME / TIME / TOD / DATE-as-DateTime tags carry a header (or
/// have a per-tag width that varies with <c>StringLength</c>) and are
/// flagged <c>OpaqueSize=true</c>. The planner emits these as standalone
/// single-tag ranges and never merges them into a sibling block — the
/// per-tag decode path needs an exact byte slice and a wrong slice from
/// a coalesced read would silently corrupt every neighbour.
/// </para>
/// <para>
/// <b>Order-preserving</b>: Each <see cref="BlockReadRange"/> carries a list
/// of <see cref="TagSlice"/> values pointing back at the original
/// caller-index. The driver's <c>ReadAsync</c> uses the index to write the
/// decoded value into the correct slot of the result array, so caller
/// ordering of the input <c>fullReferences</c> is preserved across the
/// coalescing step.
/// </para>
/// </remarks>
internal static class S7BlockCoalescingPlanner
{
/// <summary>Default gap-merge threshold in bytes.</summary>
internal const int DefaultGapMergeBytes = 16;
/// <summary>
/// One coalesced byte-range request. The driver issues a single
/// <c>Plc.ReadBytesAsync</c> covering <see cref="StartByte"/>..
/// <see cref="StartByte"/>+<see cref="ByteCount"/>; each entry in
/// <see cref="Tags"/> carries the offset within the response buffer to
/// slice for that tag.
/// </summary>
internal sealed record BlockReadRange(
S7Area Area,
int DbNumber,
int StartByte,
int ByteCount,
IReadOnlyList<TagSlice> Tags);
/// <summary>
/// One tag's slot inside a <see cref="BlockReadRange"/>. <see cref="OffsetInBlock"/>
/// is the byte offset within the coalesced buffer; <see cref="ByteCount"/> is the
/// per-tag width that the slice covers.
/// </summary>
/// <param name="CallerIndex">Original index in the caller's <c>fullReferences</c> list.</param>
/// <param name="OffsetInBlock">Byte offset into <see cref="BlockReadRange"/>'s buffer.</param>
/// <param name="ByteCount">Bytes the tag claims from the buffer.</param>
internal sealed record TagSlice(int CallerIndex, int OffsetInBlock, int ByteCount);
/// <summary>
/// Input row. Captures everything the planner needs to make a coalescing
/// decision without needing the full <see cref="S7TagDefinition"/> graph.
/// </summary>
/// <param name="CallerIndex">Caller-supplied stable index used to thread the decoded value back.</param>
/// <param name="Area">Memory area; M and DB never merge into the same range.</param>
/// <param name="DbNumber">DB number when <see cref="Area"/> is DataBlock; 0 otherwise.</param>
/// <param name="StartByte">Byte offset in the area where the tag's storage begins.</param>
/// <param name="ByteCount">On-wire byte width of the tag.</param>
/// <param name="OpaqueSize">
/// True for tags whose effective decode width is variable / header-prefixed
/// (STRING/WSTRING/CHAR/WCHAR and structured timestamps DTL/DT/etc.) so the
/// planner skips them — they emit standalone reads and never merge with
/// neighbours.
/// </param>
internal sealed record TagSpec(
int CallerIndex,
S7Area Area,
int DbNumber,
int StartByte,
int ByteCount,
bool OpaqueSize);
/// <summary>
/// Plan a list of byte-range reads from <paramref name="tags"/>. Same-area /
/// same-DB rows are sorted by <see cref="TagSpec.StartByte"/> then merged
/// greedily when the gap between their byte ranges is &lt;=
/// <paramref name="gapMergeBytes"/>. Opaque-size rows always emit as their
/// own single-tag range and never extend a sibling block.
/// </summary>
/// <remarks>
/// Order of returned ranges is not significant — the driver issues them
/// sequentially against the same connection gate so wire-level ordering is
/// determined by the loop, not by this list. The planner DOES preserve
/// the caller-index inside each range so the per-tag decode result lands
/// in the correct slot of the response array.
/// </remarks>
internal static List<BlockReadRange> Plan(IReadOnlyList<TagSpec> tags, int gapMergeBytes = DefaultGapMergeBytes)
{
if (gapMergeBytes < 0)
throw new ArgumentOutOfRangeException(nameof(gapMergeBytes), "Gap-merge threshold must be non-negative.");
var ranges = new List<BlockReadRange>(tags.Count);
if (tags.Count == 0) return ranges;
// Phase 1: opaque rows emit as standalone single-tag ranges. Strip them
// out of the merge candidate set so neighbour ranges don't accidentally
// straddle a STRING header / DTL block.
var mergeable = new List<TagSpec>(tags.Count);
foreach (var t in tags)
{
if (t.OpaqueSize)
{
ranges.Add(new BlockReadRange(
t.Area, t.DbNumber, t.StartByte, t.ByteCount,
[new TagSlice(t.CallerIndex, OffsetInBlock: 0, t.ByteCount)]));
}
else
{
mergeable.Add(t);
}
}
// Phase 2: bucket by (Area, DbNumber). Memory M and DataBlock DB1 (etc.)
// share neither the wire request type nor an addressable space, so they
// can never coalesce.
var groups = mergeable.GroupBy(t => (t.Area, t.DbNumber));
foreach (var group in groups)
{
// Sort ascending by start byte so the greedy merge below is O(n).
// Stable secondary sort on caller index keeps tag-slice ordering
// deterministic for tags with identical byte offsets.
var sorted = group
.OrderBy(t => t.StartByte)
.ThenBy(t => t.CallerIndex)
.ToList();
var blockStart = sorted[0].StartByte;
var blockEnd = sorted[0].StartByte + sorted[0].ByteCount;
var blockSlices = new List<TagSlice>
{
new(sorted[0].CallerIndex, 0, sorted[0].ByteCount),
};
for (var i = 1; i < sorted.Count; i++)
{
var t = sorted[i];
var gap = t.StartByte - blockEnd;
// gap < 0 means the next tag overlaps with the current block — treat
// as zero-gap merge (overlap is fine, the slice just reuses earlier
// bytes). gap <= threshold = merge; otherwise close the current
// block and start a new one.
if (gap <= gapMergeBytes)
{
var newEnd = Math.Max(blockEnd, t.StartByte + t.ByteCount);
blockSlices.Add(new TagSlice(t.CallerIndex, t.StartByte - blockStart, t.ByteCount));
blockEnd = newEnd;
}
else
{
ranges.Add(new BlockReadRange(
group.Key.Area, group.Key.DbNumber, blockStart, blockEnd - blockStart, blockSlices));
blockStart = t.StartByte;
blockEnd = t.StartByte + t.ByteCount;
blockSlices = [new TagSlice(t.CallerIndex, 0, t.ByteCount)];
}
}
ranges.Add(new BlockReadRange(
group.Key.Area, group.Key.DbNumber, blockStart, blockEnd - blockStart, blockSlices));
}
return ranges;
}
/// <summary>
/// True when <paramref name="tag"/>'s on-wire width is variable / header-prefixed.
/// Such tags MUST NOT participate in block coalescing because the slice into a
/// coalesced byte buffer would land at a wrong offset for any neighbour.
/// </summary>
internal static bool IsOpaqueSize(S7TagDefinition tag)
{
// Variable-width string types — STRING/WSTRING carry a 2-byte (or 4-byte)
// header and the actual length depends on the runtime value, not the
// declared StringLength. CHAR/WCHAR are fixed-width (1 / 2 bytes) but
// routed via the per-tag string codec path, so coalescing them would
// bypass the codec; treat them as opaque to keep the decode surface
// unchanged.
if (tag.DataType is S7DataType.String or S7DataType.WString
or S7DataType.Char or S7DataType.WChar)
return true;
// Structured timestamps — DTL is 12 bytes, DT is 8 bytes BCD-encoded;
// both decode through S7DateTimeCodec and would silently mis-decode if
// the slice landed mid-block. S5TIME/TIME/TOD/DATE are fixed-width 2/4
// bytes but currently flow through the per-tag codec path; treat them
// all as opaque so the planner emits a single-tag range and the existing
// codec dispatch stays the source of truth for date/time decode.
if (tag.DataType is S7DataType.Dtl or S7DataType.DateAndTime
or S7DataType.S5Time or S7DataType.Time or S7DataType.TimeOfDay or S7DataType.Date)
return true;
// Arrays opt out: per-tag width is N × elementBytes, the slice must be
// exact. Routing them as opaque keeps the array-aware byte-range read
// path in S7Driver.ReadOneAsync.
if (tag.ElementCount is int n && n > 1)
return true;
return false;
}
/// <summary>
/// Byte width of a packable scalar tag for byte-range coalescing. Mirrors the
/// size suffix the address grammar carried (<see cref="S7Size.Bit"/>=1 byte
/// because reading a single bit still requires reading the containing byte;
/// bit-extraction happens in the slice step).
/// </summary>
internal static int ScalarByteCount(S7Size size) => size switch
{
S7Size.Bit => 1,
S7Size.Byte => 1,
S7Size.Word => 2,
S7Size.DWord => 4,
S7Size.LWord => 8,
_ => throw new InvalidOperationException($"Unknown S7Size {size}"),
};
}

View File

@@ -1,4 +1,5 @@
using System.Buffers.Binary;
using System.Collections.Generic;
using S7.Net;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
@@ -86,6 +87,31 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
private DriverHealth _health = new(DriverState.Unknown, null, null);
private bool _disposed;
// ---- Block-read coalescing diagnostics (PR-S7-B2) ----
//
// Counters surface through DriverHealth.Diagnostics so the driver-diagnostics
// RPC and integration tests can verify wire-level reduction without needing
// access to the underlying S7.Net PDU stream. Names match the
// "<DriverType>.<Counter>" convention adopted for the modbus and opcuaclient
// drivers — see decision #154.
private long _totalBlockReads; // Plc.ReadBytesAsync calls issued by the coalesced path
private long _totalMultiVarBatches; // Plc.ReadMultipleVarsAsync calls issued
private long _totalSingleReads; // per-tag ReadOneAsync fallbacks
/// <summary>
/// Total <c>Plc.ReadBytesAsync</c> calls the coalesced byte-range path issued.
/// Test-only entry point for the integration assertion that 50 contiguous DBWs
/// coalesce into exactly 1 byte-range read.
/// </summary>
internal long TotalBlockReads => Interlocked.Read(ref _totalBlockReads);
/// <summary>
/// Total <c>Plc.ReadMultipleVarsAsync</c> batches issued. For a fully-coalesced
/// contiguous workload this stays at 0 — every tag flows through the byte-range
/// path instead.
/// </summary>
internal long TotalMultiVarBatches => Interlocked.Read(ref _totalMultiVarBatches);
public string DriverInstanceId => driverInstanceId;
public string DriverType => "S7";
@@ -206,10 +232,11 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
try
{
// Phase 1: classify each request into (a) unknown / not-found, (b) packable
// scalar (Bool/Byte/Int16/UInt16/Int32/UInt32/Float32/Float64), or (c) needs
// per-tag fallback (arrays, strings, dates, 64-bit ints, UDT-fanout). Packable
// tags collect into 19-item batches sent via Plc.ReadMultipleVarsAsync; the
// rest stay on the legacy ReadOneAsync path.
// scalar (Bool/Byte/Int16/UInt16/Int32/UInt32/Float32/Float64) which can
// potentially coalesce into a byte-range read, or (c) per-tag fallback
// (arrays, strings, dates, 64-bit ints, UDT-fanout). Packable tags feed
// the block-coalescing planner first (PR-S7-B2); whatever survives as a
// singleton range falls through to the multi-var packer (PR-S7-B1).
var packableIndexes = new List<int>(fullReferences.Count);
var fallbackIndexes = new List<int>();
for (var i = 0; i < fullReferences.Count; i++)
@@ -225,15 +252,55 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
else fallbackIndexes.Add(i);
}
// Phase 2: bin-pack and dispatch the packable group via ReadMultipleVarsAsync.
// On a per-batch S7.Net failure the whole batch falls back to ReadOneAsync per
// tag — that way one bad item doesn't poison the rest of the batch and each
// tag still gets its own per-item StatusCode (BadDeviceFailure for PUT/GET
// refusal, BadCommunicationError for transport faults).
// Phase 2a: block-read coalescing — group same-area / same-DB packable
// tags into contiguous byte ranges (gap-merge threshold from
// S7DriverOptions.BlockCoalescingGapBytes, default 16). Multi-tag ranges
// dispatch via Plc.ReadBytesAsync; singleton ranges fall through to the
// multi-var packer below.
var singletons = new List<int>();
if (packableIndexes.Count > 0)
{
var specs = new List<S7BlockCoalescingPlanner.TagSpec>(packableIndexes.Count);
foreach (var idx in packableIndexes)
{
var tag = _tagsByName[fullReferences[idx]];
var addr = _parsedByName[fullReferences[idx]];
specs.Add(new S7BlockCoalescingPlanner.TagSpec(
CallerIndex: idx,
Area: addr.Area,
DbNumber: addr.DbNumber,
StartByte: addr.ByteOffset,
ByteCount: S7BlockCoalescingPlanner.ScalarByteCount(addr.Size),
OpaqueSize: false));
}
var ranges = S7BlockCoalescingPlanner.Plan(specs, _options.BlockCoalescingGapBytes);
foreach (var range in ranges)
{
if (range.Tags.Count == 1)
{
// Singleton — let the multi-var packer batch it with other
// singletons in the same ReadAsync call. Cheaper than its
// own one-tag ReadBytesAsync round-trip.
singletons.Add(range.Tags[0].CallerIndex);
}
else
{
await ReadCoalescedRangeAsync(plc, range, fullReferences, results, now, cancellationToken)
.ConfigureAwait(false);
}
}
}
// Phase 2b: bin-pack residual singletons through ReadMultipleVarsAsync.
// On a per-batch S7.Net failure the whole batch falls back to ReadOneAsync
// per tag — that way one bad item doesn't poison the rest of the batch
// and each tag still gets its own per-item StatusCode (BadDeviceFailure
// for PUT/GET refusal, BadCommunicationError for transport faults).
if (singletons.Count > 0)
{
var budget = S7ReadPacker.ItemBudget(S7ReadPacker.DefaultPduSize);
var batches = S7ReadPacker.BinPack(packableIndexes, budget);
var batches = S7ReadPacker.BinPack(singletons, budget);
foreach (var batch in batches)
{
await ReadBatchAsync(plc, batch, fullReferences, results, now, cancellationToken)
@@ -255,6 +322,108 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
return results;
}
/// <summary>
/// Issue one coalesced <c>Plc.ReadBytesAsync</c> covering
/// <paramref name="range"/> and slice the response per tag. On a transport
/// fault the whole range falls back to per-tag <see cref="ReadOneAsSnapshotAsync"/>
/// so a single bad slot doesn't poison N-1 good neighbours.
/// </summary>
private async Task ReadCoalescedRangeAsync(
global::S7.Net.Plc plc,
S7BlockCoalescingPlanner.BlockReadRange range,
IReadOnlyList<string> fullReferences,
DataValueSnapshot[] results,
DateTime now,
CancellationToken ct)
{
byte[]? buf;
try
{
Interlocked.Increment(ref _totalBlockReads);
buf = await plc.ReadBytesAsync(MapArea(range.Area), range.DbNumber, range.StartByte, range.ByteCount, ct)
.ConfigureAwait(false);
}
catch (Exception)
{
// Block read fault → fan out per-tag so a bad address in the block
// surfaces its own StatusCode and good neighbours can still retry
// through the per-tag fallback path.
foreach (var slice in range.Tags)
{
var tag = _tagsByName[fullReferences[slice.CallerIndex]];
results[slice.CallerIndex] = await ReadOneAsSnapshotAsync(plc, tag, now, ct).ConfigureAwait(false);
}
return;
}
if (buf is null || buf.Length != range.ByteCount)
{
// Short / truncated PDU — same fan-out semantics as a transport fault.
foreach (var slice in range.Tags)
{
results[slice.CallerIndex] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
}
return;
}
foreach (var slice in range.Tags)
{
var name = fullReferences[slice.CallerIndex];
var tag = _tagsByName[name];
var addr = _parsedByName[name];
try
{
var value = DecodeScalarFromBlock(buf, slice.OffsetInBlock, tag, addr);
results[slice.CallerIndex] = new DataValueSnapshot(value, 0u, now, now);
}
catch (Exception ex)
{
results[slice.CallerIndex] = new DataValueSnapshot(null, StatusBadInternalError, null, now);
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
}
}
_health = new DriverHealth(DriverState.Healthy, now, null, BuildDiagnostics());
}
/// <summary>
/// Decode one packable scalar from a coalesced byte buffer. Mirrors the
/// reinterpret table in <see cref="S7ReadPacker.DecodePackedValue"/> so the
/// coalesced and per-tag-batch paths produce identical .NET types for the
/// same wire bytes.
/// </summary>
private static object DecodeScalarFromBlock(byte[] buf, int offset, S7TagDefinition tag, S7ParsedAddress addr)
{
return (tag.DataType, addr.Size) switch
{
(S7DataType.Bool, S7Size.Bit) => ((buf[offset] >> addr.BitOffset) & 0x1) == 1,
(S7DataType.Byte, S7Size.Byte) => buf[offset],
(S7DataType.UInt16, S7Size.Word) => BinaryPrimitives.ReadUInt16BigEndian(buf.AsSpan(offset, 2)),
(S7DataType.Int16, S7Size.Word) => BinaryPrimitives.ReadInt16BigEndian(buf.AsSpan(offset, 2)),
(S7DataType.UInt32, S7Size.DWord) => BinaryPrimitives.ReadUInt32BigEndian(buf.AsSpan(offset, 4)),
(S7DataType.Int32, S7Size.DWord) => BinaryPrimitives.ReadInt32BigEndian(buf.AsSpan(offset, 4)),
(S7DataType.Float32, S7Size.DWord) =>
BitConverter.UInt32BitsToSingle(BinaryPrimitives.ReadUInt32BigEndian(buf.AsSpan(offset, 4))),
(S7DataType.Float64, S7Size.LWord) =>
BitConverter.UInt64BitsToDouble(BinaryPrimitives.ReadUInt64BigEndian(buf.AsSpan(offset, 8))),
_ => throw new System.IO.InvalidDataException(
$"S7 block-decode: tag '{tag.Name}' declared {tag.DataType} but address parsed Size={addr.Size}"),
};
}
/// <summary>
/// Snapshot of the wire-level coalescing counters surfaced through
/// <see cref="DriverHealth.Diagnostics"/>. Names follow the
/// <c>"&lt;DriverType&gt;.&lt;Counter&gt;"</c> convention so the driver-diagnostics
/// RPC can render them in the Admin UI alongside Modbus / OPC UA Client
/// metrics without a per-driver special-case.
/// </summary>
private IReadOnlyDictionary<string, double> BuildDiagnostics() => new Dictionary<string, double>
{
["S7.TotalBlockReads"] = Interlocked.Read(ref _totalBlockReads),
["S7.TotalMultiVarBatches"] = Interlocked.Read(ref _totalMultiVarBatches),
["S7.TotalSingleReads"] = Interlocked.Read(ref _totalSingleReads),
};
/// <summary>
/// Read one packed batch via <c>Plc.ReadMultipleVarsAsync</c>. On batch
/// success each <c>DataItem.Value</c> decodes into its tag's snapshot
@@ -279,6 +448,7 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
try
{
Interlocked.Increment(ref _totalMultiVarBatches);
var responses = await plc.ReadMultipleVarsAsync(items, ct).ConfigureAwait(false);
// S7.Net mutates the input list in place and also returns it; iterate by
// index against the input list so we are agnostic to either contract.
@@ -303,7 +473,7 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
}
}
_health = new DriverHealth(DriverState.Healthy, now, null);
_health = new DriverHealth(DriverState.Healthy, now, null, BuildDiagnostics());
}
catch (Exception)
{
@@ -329,6 +499,7 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
{
try
{
Interlocked.Increment(ref _totalSingleReads);
var value = await ReadOneAsync(plc, tag, ct).ConfigureAwait(false);
_health = new DriverHealth(DriverState.Healthy, now, null);
return new DataValueSnapshot(value, 0u, now, now);

View File

@@ -63,6 +63,24 @@ public sealed class S7DriverOptions
/// Running ↔ Stopped transitions.
/// </summary>
public S7ProbeOptions Probe { get; init; } = new();
/// <summary>
/// Block-read coalescing gap-merge threshold (bytes). When two same-DB tags are
/// within this many bytes of each other the planner folds them into a single
/// <c>Plc.ReadBytesAsync</c> request and slices the response client-side. The
/// default <see cref="S7BlockCoalescingPlanner.DefaultGapMergeBytes"/> = 16 bytes
/// trades a minor over-fetch for one fewer PDU round-trip — over-fetching 16
/// bytes is cheaper than the ~30-byte S7 request frame.
/// </summary>
/// <remarks>
/// Raise the threshold for chatty PLCs where PDU round-trips dominate latency
/// (S7-1200 with default 240-byte PDU); lower it when DBs are sparsely populated
/// so the over-fetch cost outweighs the saved PDU. Setting to 0 disables gap
/// merging entirely — only literally adjacent ranges (gap == 0) coalesce.
/// STRING / WSTRING / CHAR / WCHAR / structured-timestamp / array tags always
/// opt out of merging regardless of this knob.
/// </remarks>
public int BlockCoalescingGapBytes { get; init; } = S7BlockCoalescingPlanner.DefaultGapMergeBytes;
}
public sealed class S7ProbeOptions

View File

@@ -23,6 +23,7 @@
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.S7.Tests"/>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Driver.S7.IntegrationTests"/>
</ItemGroup>
</Project>