Files
lmxopcua/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs
2026-04-26 01:14:59 -04:00

1574 lines
77 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Buffers.Binary;
using System.Collections.Generic;
using S7.Net;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.S7;
/// <summary>
/// Siemens S7 native driver — speaks S7comm over ISO-on-TCP (port 102) via the S7netplus
/// library. First implementation of <see cref="IDriver"/> for an in-process .NET Standard
/// PLC protocol that is NOT Modbus, validating that the v2 driver-capability interfaces
/// generalize beyond Modbus + Galaxy.
/// </summary>
/// <remarks>
/// <para>
/// PR 62 ships the scaffold: <see cref="IDriver"/> only (Initialize / Reinitialize /
/// Shutdown / GetHealth). <see cref="ITagDiscovery"/>, <see cref="IReadable"/>,
/// <see cref="IWritable"/>, <see cref="ISubscribable"/>, <see cref="IHostConnectivityProbe"/>
/// land in PRs 63-65 once the address parser (PR 63) is in place.
/// </para>
/// <para>
/// <b>Single-connection policy</b>: S7netplus documented pattern is one
/// <c>Plc</c> instance per PLC, serialized with a <see cref="SemaphoreSlim"/>.
/// Parallelising reads against a single S7 CPU doesn't help — the CPU scans the
/// communication mailbox at most once per cycle (2-10 ms) and queues concurrent
/// requests wire-side anyway. Multiple client-side connections just waste the CPU's
/// 8-64 connection-resource budget.
/// </para>
/// </remarks>
public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
{
// ---- ISubscribable + IHostConnectivityProbe state ----
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
private long _nextSubscriptionId;
private readonly object _probeLock = new();
private HostState _hostState = HostState.Unknown;
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
private CancellationTokenSource? _probeCts;
public event EventHandler<DataChangeEventArgs>? OnDataChange;
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
/// <summary>OPC UA StatusCode used when the tag name isn't in the driver's tag map.</summary>
private const uint StatusBadNodeIdUnknown = 0x80340000u;
/// <summary>OPC UA StatusCode used when the tag's data type isn't implemented yet.</summary>
private const uint StatusBadNotSupported = 0x803D0000u;
/// <summary>OPC UA StatusCode used when the tag is declared read-only.</summary>
private const uint StatusBadNotWritable = 0x803B0000u;
/// <summary>OPC UA StatusCode used when write fails validation (e.g. out-of-range value).</summary>
private const uint StatusBadInternalError = 0x80020000u;
/// <summary>OPC UA StatusCode used for socket / timeout / protocol-layer faults.</summary>
private const uint StatusBadCommunicationError = 0x80050000u;
/// <summary>OPC UA StatusCode used when S7 returns <c>ErrorCode.WrongCPU</c> / PUT/GET disabled.</summary>
private const uint StatusBadDeviceFailure = 0x80550000u;
/// <summary>
/// Hard upper bound on <see cref="S7TagDefinition.ElementCount"/>. The S7 PDU envelope
/// for negotiated default 240-byte and extended 960-byte payloads cannot fit a single
/// byte-range read larger than ~960 bytes, so a Float64 array of more than ~120
/// elements is already lossy. 8000 is an order-of-magnitude generous ceiling that still
/// rejects obvious config typos (e.g. ElementCount = 65535) at init time.
/// </summary>
internal const int MaxArrayElements = 8000;
private readonly Dictionary<string, S7TagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, S7ParsedAddress> _parsedByName = new(StringComparer.OrdinalIgnoreCase);
private readonly S7DriverOptions _options = options;
private readonly SemaphoreSlim _gate = new(1, 1);
/// <summary>
/// Per-connection gate. Internal so PRs 63-65 (read/write/subscribe) can serialize on
/// the same semaphore without exposing it publicly. Single-connection-per-PLC is a
/// hard requirement of S7netplus — see class remarks.
/// </summary>
internal SemaphoreSlim Gate => _gate;
/// <summary>
/// Active S7.Net PLC connection. Null until <see cref="InitializeAsync"/> returns; null
/// after <see cref="ShutdownAsync"/>. Read-only outside this class; PR 64's Read/Write
/// will take the <see cref="_gate"/> before touching it.
/// </summary>
internal Plc? Plc { get; private set; }
private DriverHealth _health = new(DriverState.Unknown, null, null);
private bool _disposed;
// ---- Block-read coalescing diagnostics (PR-S7-B2) ----
//
// Counters surface through DriverHealth.Diagnostics so the driver-diagnostics
// RPC and integration tests can verify wire-level reduction without needing
// access to the underlying S7.Net PDU stream. Names match the
// "<DriverType>.<Counter>" convention adopted for the modbus and opcuaclient
// drivers — see decision #154.
private long _totalBlockReads; // Plc.ReadBytesAsync calls issued by the coalesced path
private long _totalMultiVarBatches; // Plc.ReadMultipleVarsAsync calls issued
private long _totalSingleReads; // per-tag ReadOneAsync fallbacks
/// <summary>
/// Negotiated PDU size from the most recent <see cref="Plc.OpenAsync"/>. Snapshotted
/// once into a field so the diagnostics dictionary keeps a stable reading even after
/// the underlying <c>Plc</c> instance is closed (e.g. mid-reinit). Resets to 0 on
/// <see cref="ShutdownAsync"/> so a stale post-disconnect reading never confuses an
/// operator inspecting the driver-diagnostics panel.
/// </summary>
private int _negotiatedPduSize;
/// <summary>
/// Test-only entry point for the negotiated PDU size that's surfaced via
/// <see cref="DriverHealth.Diagnostics"/> as <c>S7.NegotiatedPduSize</c>.
/// </summary>
internal int NegotiatedPduSize => _negotiatedPduSize;
/// <summary>
/// Total <c>Plc.ReadBytesAsync</c> calls the coalesced byte-range path issued.
/// Test-only entry point for the integration assertion that 50 contiguous DBWs
/// coalesce into exactly 1 byte-range read.
/// </summary>
internal long TotalBlockReads => Interlocked.Read(ref _totalBlockReads);
/// <summary>
/// Total <c>Plc.ReadMultipleVarsAsync</c> batches issued. For a fully-coalesced
/// contiguous workload this stays at 0 — every tag flows through the byte-range
/// path instead.
/// </summary>
internal long TotalMultiVarBatches => Interlocked.Read(ref _totalMultiVarBatches);
public string DriverInstanceId => driverInstanceId;
public string DriverType => "S7";
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
_health = new DriverHealth(DriverState.Initializing, null, null);
try
{
// Parse + validate every tag before opening the TCP socket so config bugs
// (bad address, oversized array, unsupported array element) surface as
// FormatException without waiting on a connect timeout. Per the v1 driver-config
// story this lets the Admin UI's "Save" round-trip stay sub-second on bad input.
_tagsByName.Clear();
_parsedByName.Clear();
foreach (var t in _options.Tags)
{
// Pass CpuType so V-memory addresses (S7-200 / S7-200 Smart / LOGO!) resolve
// against the device's family-specific DB mapping.
var parsed = S7AddressParser.Parse(t.Address, _options.CpuType); // throws FormatException
if (t.ElementCount is int n && n > 1)
{
// Array sanity: cap at S7 PDU realistic limit, reject variable-width
// element types and BOOL (packed-bit layout) up-front so a config typo
// fails at init instead of surfacing as BadInternalError on every read.
if (n > MaxArrayElements)
throw new FormatException(
$"S7 tag '{t.Name}' ElementCount {n} exceeds S7 PDU realistic limit ({MaxArrayElements})");
if (!IsArrayElementSupported(t.DataType))
throw new FormatException(
$"S7 tag '{t.Name}' DataType {t.DataType} not supported as an array element " +
$"(variable-width string types and BOOL packed-bit arrays are a follow-up)");
}
_tagsByName[t.Name] = t;
_parsedByName[t.Name] = parsed;
}
var plc = BuildPlc();
// S7netplus writes timeouts into the underlying TcpClient via Plc.WriteTimeout /
// Plc.ReadTimeout (milliseconds). Set before OpenAsync so the handshake itself
// honours the bound.
plc.WriteTimeout = (int)_options.Timeout.TotalMilliseconds;
plc.ReadTimeout = (int)_options.Timeout.TotalMilliseconds;
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(_options.Timeout);
await plc.OpenAsync(cts.Token).ConfigureAwait(false);
Plc = plc;
// S7netplus exposes the PDU size negotiated during the COTP/S7comm handshake on
// Plc.MaxPDUSize. Snapshot once so the diagnostics surface (S7.NegotiatedPduSize)
// doesn't have to dereference Plc on every BuildDiagnostics() call. Default S7-1500
// CPUs negotiate 240 bytes; CPUs running the extended PDU advertise 480 or 960.
_negotiatedPduSize = plc.MaxPDUSize;
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null, BuildDiagnostics());
// Kick off the probe loop once the connection is up. Initial HostState stays
// Unknown until the first probe tick succeeds — avoids broadcasting a premature
// Running transition before any PDU round-trip has happened.
if (_options.Probe.Enabled)
{
_probeCts = new CancellationTokenSource();
_ = Task.Run(() => ProbeLoopAsync(_probeCts.Token), _probeCts.Token);
}
}
catch (Exception ex)
{
// Clean up a partially-constructed Plc so a retry from the caller doesn't leak
// the TcpClient. S7netplus's Close() is best-effort and idempotent.
try { Plc?.Close(); } catch { }
Plc = null;
_health = new DriverHealth(DriverState.Faulted, null, ex.Message);
throw;
}
}
public async Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
await ShutdownAsync(cancellationToken).ConfigureAwait(false);
await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false);
}
public Task ShutdownAsync(CancellationToken cancellationToken)
{
try { _probeCts?.Cancel(); } catch { }
_probeCts?.Dispose();
_probeCts = null;
// PR-S7-C3 — every subscription owns N partition CTSs; tear them all down so a
// shutdown mid-poll doesn't leave background tasks running against a closed Plc.
foreach (var state in _subscriptions.Values)
{
foreach (var part in state.Partitions)
{
try { part.Cts.Cancel(); } catch { }
part.Cts.Dispose();
}
}
_subscriptions.Clear();
try { Plc?.Close(); } catch { /* best-effort — tearing down anyway */ }
Plc = null;
// Reset the snapshot so a post-shutdown diagnostics read doesn't display a stale
// PDU size from the previous connection. Reinit will repopulate after OpenAsync.
_negotiatedPduSize = 0;
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
return Task.CompletedTask;
}
public DriverHealth GetHealth() => _health;
/// <summary>
/// Approximate memory footprint. The Plc instance + one 240-960 byte PDU buffer is
/// under 4 KB; return 0 because the <see cref="IDriver"/> contract asks for a
/// driver-attributable growth number and S7.Net doesn't expose one.
/// </summary>
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
// ---- IReadable ----
public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
var plc = RequirePlc();
var now = DateTime.UtcNow;
var results = new DataValueSnapshot[fullReferences.Count];
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// Phase 1: classify each request into (a) unknown / not-found, (b) packable
// scalar (Bool/Byte/Int16/UInt16/Int32/UInt32/Float32/Float64) which can
// potentially coalesce into a byte-range read, or (c) per-tag fallback
// (arrays, strings, dates, 64-bit ints, UDT-fanout). Packable tags feed
// the block-coalescing planner first (PR-S7-B2); whatever survives as a
// singleton range falls through to the multi-var packer (PR-S7-B1).
var packableIndexes = new List<int>(fullReferences.Count);
var fallbackIndexes = new List<int>();
for (var i = 0; i < fullReferences.Count; i++)
{
var name = fullReferences[i];
if (!_tagsByName.TryGetValue(name, out var tag))
{
results[i] = new DataValueSnapshot(null, StatusBadNodeIdUnknown, null, now);
continue;
}
var addr = _parsedByName[name];
if (S7ReadPacker.IsPackable(tag, addr)) packableIndexes.Add(i);
else fallbackIndexes.Add(i);
}
// Phase 2a: block-read coalescing — group same-area / same-DB packable
// tags into contiguous byte ranges (gap-merge threshold from
// S7DriverOptions.BlockCoalescingGapBytes, default 16). Multi-tag ranges
// dispatch via Plc.ReadBytesAsync; singleton ranges fall through to the
// multi-var packer below.
var singletons = new List<int>();
if (packableIndexes.Count > 0)
{
var specs = new List<S7BlockCoalescingPlanner.TagSpec>(packableIndexes.Count);
foreach (var idx in packableIndexes)
{
var tag = _tagsByName[fullReferences[idx]];
var addr = _parsedByName[fullReferences[idx]];
specs.Add(new S7BlockCoalescingPlanner.TagSpec(
CallerIndex: idx,
Area: addr.Area,
DbNumber: addr.DbNumber,
StartByte: addr.ByteOffset,
ByteCount: S7BlockCoalescingPlanner.ScalarByteCount(addr.Size),
OpaqueSize: false));
}
var ranges = S7BlockCoalescingPlanner.Plan(specs, _options.BlockCoalescingGapBytes);
foreach (var range in ranges)
{
if (range.Tags.Count == 1)
{
// Singleton — let the multi-var packer batch it with other
// singletons in the same ReadAsync call. Cheaper than its
// own one-tag ReadBytesAsync round-trip.
singletons.Add(range.Tags[0].CallerIndex);
}
else
{
await ReadCoalescedRangeAsync(plc, range, fullReferences, results, now, cancellationToken)
.ConfigureAwait(false);
}
}
}
// Phase 2b: bin-pack residual singletons through ReadMultipleVarsAsync.
// On a per-batch S7.Net failure the whole batch falls back to ReadOneAsync
// per tag — that way one bad item doesn't poison the rest of the batch
// and each tag still gets its own per-item StatusCode (BadDeviceFailure
// for PUT/GET refusal, BadCommunicationError for transport faults).
if (singletons.Count > 0)
{
var budget = S7ReadPacker.ItemBudget(S7ReadPacker.DefaultPduSize);
var batches = S7ReadPacker.BinPack(singletons, budget);
foreach (var batch in batches)
{
await ReadBatchAsync(plc, batch, fullReferences, results, now, cancellationToken)
.ConfigureAwait(false);
}
}
// Phase 3: per-tag fallback for everything that can't pack into a single
// DataItem. Keeps the existing decode path as the source of truth for
// string/date/array/64-bit semantics.
foreach (var i in fallbackIndexes)
{
var tag = _tagsByName[fullReferences[i]];
results[i] = await ReadOneAsSnapshotAsync(plc, tag, now, cancellationToken)
.ConfigureAwait(false);
}
}
finally { _gate.Release(); }
return results;
}
/// <summary>
/// Issue one coalesced <c>Plc.ReadBytesAsync</c> covering
/// <paramref name="range"/> and slice the response per tag. On a transport
/// fault the whole range falls back to per-tag <see cref="ReadOneAsSnapshotAsync"/>
/// so a single bad slot doesn't poison N-1 good neighbours.
/// </summary>
private async Task ReadCoalescedRangeAsync(
global::S7.Net.Plc plc,
S7BlockCoalescingPlanner.BlockReadRange range,
IReadOnlyList<string> fullReferences,
DataValueSnapshot[] results,
DateTime now,
CancellationToken ct)
{
byte[]? buf;
try
{
Interlocked.Increment(ref _totalBlockReads);
buf = await plc.ReadBytesAsync(MapArea(range.Area), range.DbNumber, range.StartByte, range.ByteCount, ct)
.ConfigureAwait(false);
}
catch (Exception)
{
// Block read fault → fan out per-tag so a bad address in the block
// surfaces its own StatusCode and good neighbours can still retry
// through the per-tag fallback path.
foreach (var slice in range.Tags)
{
var tag = _tagsByName[fullReferences[slice.CallerIndex]];
results[slice.CallerIndex] = await ReadOneAsSnapshotAsync(plc, tag, now, ct).ConfigureAwait(false);
}
return;
}
if (buf is null || buf.Length != range.ByteCount)
{
// Short / truncated PDU — same fan-out semantics as a transport fault.
foreach (var slice in range.Tags)
{
results[slice.CallerIndex] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
}
return;
}
foreach (var slice in range.Tags)
{
var name = fullReferences[slice.CallerIndex];
var tag = _tagsByName[name];
var addr = _parsedByName[name];
try
{
var value = DecodeScalarFromBlock(buf, slice.OffsetInBlock, tag, addr);
results[slice.CallerIndex] = new DataValueSnapshot(value, 0u, now, now);
}
catch (Exception ex)
{
results[slice.CallerIndex] = new DataValueSnapshot(null, StatusBadInternalError, null, now);
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
}
}
_health = new DriverHealth(DriverState.Healthy, now, null, BuildDiagnostics());
}
/// <summary>
/// Decode one packable scalar from a coalesced byte buffer. Mirrors the
/// reinterpret table in <see cref="S7ReadPacker.DecodePackedValue"/> so the
/// coalesced and per-tag-batch paths produce identical .NET types for the
/// same wire bytes.
/// </summary>
private static object DecodeScalarFromBlock(byte[] buf, int offset, S7TagDefinition tag, S7ParsedAddress addr)
{
return (tag.DataType, addr.Size) switch
{
(S7DataType.Bool, S7Size.Bit) => ((buf[offset] >> addr.BitOffset) & 0x1) == 1,
(S7DataType.Byte, S7Size.Byte) => buf[offset],
(S7DataType.UInt16, S7Size.Word) => BinaryPrimitives.ReadUInt16BigEndian(buf.AsSpan(offset, 2)),
(S7DataType.Int16, S7Size.Word) => BinaryPrimitives.ReadInt16BigEndian(buf.AsSpan(offset, 2)),
(S7DataType.UInt32, S7Size.DWord) => BinaryPrimitives.ReadUInt32BigEndian(buf.AsSpan(offset, 4)),
(S7DataType.Int32, S7Size.DWord) => BinaryPrimitives.ReadInt32BigEndian(buf.AsSpan(offset, 4)),
(S7DataType.Float32, S7Size.DWord) =>
BitConverter.UInt32BitsToSingle(BinaryPrimitives.ReadUInt32BigEndian(buf.AsSpan(offset, 4))),
(S7DataType.Float64, S7Size.LWord) =>
BitConverter.UInt64BitsToDouble(BinaryPrimitives.ReadUInt64BigEndian(buf.AsSpan(offset, 8))),
_ => throw new System.IO.InvalidDataException(
$"S7 block-decode: tag '{tag.Name}' declared {tag.DataType} but address parsed Size={addr.Size}"),
};
}
/// <summary>
/// Snapshot of the wire-level coalescing counters surfaced through
/// <see cref="DriverHealth.Diagnostics"/>. Names follow the
/// <c>"&lt;DriverType&gt;.&lt;Counter&gt;"</c> convention so the driver-diagnostics
/// RPC can render them in the Admin UI alongside Modbus / OPC UA Client
/// metrics without a per-driver special-case.
/// </summary>
private IReadOnlyDictionary<string, double> BuildDiagnostics() => new Dictionary<string, double>
{
["S7.TotalBlockReads"] = Interlocked.Read(ref _totalBlockReads),
["S7.TotalMultiVarBatches"] = Interlocked.Read(ref _totalMultiVarBatches),
["S7.TotalSingleReads"] = Interlocked.Read(ref _totalSingleReads),
// Negotiated PDU size from the COTP/S7comm handshake — 240 bytes on a default
// S7-1500 CPU, 480 or 960 on CPUs running the extended PDU. 0 before connect /
// after shutdown so an operator can tell the driver isn't currently online.
["S7.NegotiatedPduSize"] = _negotiatedPduSize,
};
/// <summary>
/// Read one packed batch via <c>Plc.ReadMultipleVarsAsync</c>. On batch
/// success each <c>DataItem.Value</c> decodes into its tag's snapshot
/// slot; on batch failure each tag in the batch falls back to
/// <see cref="ReadOneAsSnapshotAsync"/> so the failure fans out per-tag instead
/// of poisoning the whole batch with one StatusCode.
/// </summary>
private async Task ReadBatchAsync(
global::S7.Net.Plc plc,
IReadOnlyList<int> batchIndexes,
IReadOnlyList<string> fullReferences,
DataValueSnapshot[] results,
DateTime now,
CancellationToken ct)
{
var items = new List<global::S7.Net.Types.DataItem>(batchIndexes.Count);
foreach (var idx in batchIndexes)
{
var name = fullReferences[idx];
items.Add(S7ReadPacker.BuildDataItem(_tagsByName[name], _parsedByName[name]));
}
try
{
Interlocked.Increment(ref _totalMultiVarBatches);
var responses = await plc.ReadMultipleVarsAsync(items, ct).ConfigureAwait(false);
// S7.Net mutates the input list in place and also returns it; iterate by
// index against the input list so we are agnostic to either contract.
for (var k = 0; k < batchIndexes.Count; k++)
{
var idx = batchIndexes[k];
var tag = _tagsByName[fullReferences[idx]];
var raw = (responses != null && k < responses.Count ? responses[k] : items[k]).Value;
if (raw is null)
{
results[idx] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
continue;
}
try
{
var decoded = S7ReadPacker.DecodePackedValue(tag, raw);
results[idx] = new DataValueSnapshot(decoded, 0u, now, now);
}
catch (Exception ex)
{
results[idx] = new DataValueSnapshot(null, StatusBadInternalError, null, now);
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
}
}
_health = new DriverHealth(DriverState.Healthy, now, null, BuildDiagnostics());
}
catch (Exception)
{
// Batch-level fault: most likely a single bad address poisoned the
// multi-var response. Fall back to ReadOneAsync per tag in the batch so
// good tags still surface a value and the offender gets its own StatusCode.
foreach (var idx in batchIndexes)
{
var tag = _tagsByName[fullReferences[idx]];
results[idx] = await ReadOneAsSnapshotAsync(plc, tag, now, ct).ConfigureAwait(false);
}
}
}
/// <summary>
/// Single-tag read wrapped as a <see cref="DataValueSnapshot"/> with the same
/// exception-to-StatusCode mapping the legacy per-tag loop applied. Shared
/// between the fallback path and the post-batch retry path so the failure
/// surface stays identical.
/// </summary>
private async Task<DataValueSnapshot> ReadOneAsSnapshotAsync(
global::S7.Net.Plc plc, S7TagDefinition tag, DateTime now, CancellationToken ct)
{
try
{
Interlocked.Increment(ref _totalSingleReads);
var value = await ReadOneAsync(plc, tag, ct).ConfigureAwait(false);
_health = new DriverHealth(DriverState.Healthy, now, null);
return new DataValueSnapshot(value, 0u, now, now);
}
catch (NotSupportedException)
{
return new DataValueSnapshot(null, StatusBadNotSupported, null, now);
}
catch (global::S7.Net.PlcException pex)
{
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, pex.Message);
return new DataValueSnapshot(null, StatusBadDeviceFailure, null, now);
}
catch (Exception ex)
{
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
return new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
}
}
private async Task<object> ReadOneAsync(global::S7.Net.Plc plc, S7TagDefinition tag, CancellationToken ct)
{
var addr = _parsedByName[tag.Name];
// 1-D array path: one byte-range read covering N×elementBytes, sliced client-side.
// Init-time validation guarantees only fixed-width element types reach here.
if (tag.ElementCount is int n && n > 1)
{
var elemBytes = ArrayElementBytes(tag.DataType);
var totalBytes = checked(n * elemBytes);
if (addr.Size == S7Size.Bit)
throw new System.IO.InvalidDataException(
$"S7 Read type-mismatch: tag '{tag.Name}' is array of {tag.DataType} but address '{tag.Address}' " +
$"parsed as bit-access; arrays require byte-addressing");
var arrBytes = await plc.ReadBytesAsync(MapArea(addr.Area), addr.DbNumber, addr.ByteOffset, totalBytes, ct)
.ConfigureAwait(false);
if (arrBytes is null || arrBytes.Length != totalBytes)
throw new System.IO.InvalidDataException(
$"S7.Net returned {arrBytes?.Length ?? 0} bytes for array '{tag.Address}' (n={n}), expected {totalBytes}");
return SliceArray(arrBytes, tag.DataType, n, elemBytes);
}
// String-shaped types (STRING/WSTRING/CHAR/WCHAR): S7.Net's string-keyed ReadAsync
// has no syntax for these, so the driver issues a raw byte read and decodes via
// S7StringCodec. Wire order is big-endian for the WSTRING/WCHAR UTF-16 payload.
if (tag.DataType is S7DataType.String or S7DataType.WString or S7DataType.Char or S7DataType.WChar)
{
if (addr.Size == S7Size.Bit)
throw new System.IO.InvalidDataException(
$"S7 Read type-mismatch: tag '{tag.Name}' declared {tag.DataType} but address '{tag.Address}' " +
$"parsed as bit-access; string-shaped types require byte-addressing (e.g. DBB / MB / IB / QB)");
var (area, dbNum, off) = (addr.Area, addr.DbNumber, addr.ByteOffset);
switch (tag.DataType)
{
case S7DataType.Char:
{
var b = await plc.ReadBytesAsync(MapArea(area), dbNum, off, 1, ct).ConfigureAwait(false);
if (b is null || b.Length != 1)
throw new System.IO.InvalidDataException($"S7.Net returned {b?.Length ?? 0} bytes for CHAR '{tag.Address}', expected 1");
return S7StringCodec.DecodeChar(b);
}
case S7DataType.WChar:
{
var b = await plc.ReadBytesAsync(MapArea(area), dbNum, off, 2, ct).ConfigureAwait(false);
if (b is null || b.Length != 2)
throw new System.IO.InvalidDataException($"S7.Net returned {b?.Length ?? 0} bytes for WCHAR '{tag.Address}', expected 2");
return S7StringCodec.DecodeWChar(b);
}
case S7DataType.String:
{
var max = tag.StringLength;
var size = S7StringCodec.StringBufferSize(max);
var b = await plc.ReadBytesAsync(MapArea(area), dbNum, off, size, ct).ConfigureAwait(false);
if (b is null || b.Length != size)
throw new System.IO.InvalidDataException($"S7.Net returned {b?.Length ?? 0} bytes for STRING '{tag.Address}', expected {size}");
return S7StringCodec.DecodeString(b, max);
}
case S7DataType.WString:
{
var max = tag.StringLength;
var size = S7StringCodec.WStringBufferSize(max);
var b = await plc.ReadBytesAsync(MapArea(area), dbNum, off, size, ct).ConfigureAwait(false);
if (b is null || b.Length != size)
throw new System.IO.InvalidDataException($"S7.Net returned {b?.Length ?? 0} bytes for WSTRING '{tag.Address}', expected {size}");
return S7StringCodec.DecodeWString(b, max);
}
}
}
// Date/time-shaped types (DTL/DT/S5TIME/TIME/TOD/DATE): S7.Net has no native size
// suffix for any of these, so the driver issues a raw byte read at the address's
// ByteOffset and decodes via S7DateTimeCodec. All require byte-addressing — bit-
// access against a date/time tag is a config bug worth surfacing as a hard error.
if (tag.DataType is S7DataType.Dtl or S7DataType.DateAndTime or S7DataType.S5Time
or S7DataType.Time or S7DataType.TimeOfDay or S7DataType.Date)
{
if (addr.Size == S7Size.Bit)
throw new System.IO.InvalidDataException(
$"S7 Read type-mismatch: tag '{tag.Name}' declared {tag.DataType} but address '{tag.Address}' " +
$"parsed as bit-access; date/time types require byte-addressing");
int size = tag.DataType switch
{
S7DataType.Dtl => S7DateTimeCodec.DtlSize,
S7DataType.DateAndTime => S7DateTimeCodec.DtSize,
S7DataType.S5Time => S7DateTimeCodec.S5TimeSize,
S7DataType.Time => S7DateTimeCodec.TimeSize,
S7DataType.TimeOfDay => S7DateTimeCodec.TodSize,
S7DataType.Date => S7DateTimeCodec.DateSize,
_ => throw new InvalidOperationException(),
};
var b = await plc.ReadBytesAsync(MapArea(addr.Area), addr.DbNumber, addr.ByteOffset, size, ct).ConfigureAwait(false);
if (b is null || b.Length != size)
throw new System.IO.InvalidDataException(
$"S7.Net returned {b?.Length ?? 0} bytes for {tag.DataType} '{tag.Address}', expected {size}");
return tag.DataType switch
{
S7DataType.Dtl => S7DateTimeCodec.DecodeDtl(b),
S7DataType.DateAndTime => S7DateTimeCodec.DecodeDt(b),
// S5TIME/TIME/TOD surface as Int32 ms — DriverDataType has no Duration type;
// OPC UA clients see a millisecond integer matching the IEC-1131 convention.
S7DataType.S5Time => (int)S7DateTimeCodec.DecodeS5Time(b).TotalMilliseconds,
S7DataType.Time => (int)S7DateTimeCodec.DecodeTime(b).TotalMilliseconds,
S7DataType.TimeOfDay => (int)S7DateTimeCodec.DecodeTod(b).TotalMilliseconds,
S7DataType.Date => S7DateTimeCodec.DecodeDate(b),
_ => throw new InvalidOperationException(),
};
}
// 64-bit types: S7.Net's string-based ReadAsync has no LWord size suffix, so issue an
// 8-byte ReadBytesAsync and convert big-endian in-process. Wire order on S7 is BE.
if (tag.DataType is S7DataType.Int64 or S7DataType.UInt64 or S7DataType.Float64)
{
if (addr.Size != S7Size.LWord)
throw new System.IO.InvalidDataException(
$"S7 Read type-mismatch: tag '{tag.Name}' declared {tag.DataType} but address '{tag.Address}' " +
$"parsed as Size={addr.Size}; 64-bit types require an LD/DBL/DBLD suffix");
var bytes = await plc.ReadBytesAsync(MapArea(addr.Area), addr.DbNumber, addr.ByteOffset, 8, ct)
.ConfigureAwait(false);
if (bytes is null || bytes.Length != 8)
throw new System.IO.InvalidDataException($"S7.Net returned {bytes?.Length ?? 0} bytes for '{tag.Address}', expected 8");
return tag.DataType switch
{
S7DataType.Int64 => BinaryPrimitives.ReadInt64BigEndian(bytes),
S7DataType.UInt64 => BinaryPrimitives.ReadUInt64BigEndian(bytes),
S7DataType.Float64 => BitConverter.UInt64BitsToDouble(BinaryPrimitives.ReadUInt64BigEndian(bytes)),
_ => throw new InvalidOperationException(),
};
}
// S7.Net's string-based ReadAsync returns object where the boxed .NET type depends on
// the size suffix: DBX=bool, DBB=byte, DBW=ushort, DBD=uint. Our S7DataType enum
// specifies the SEMANTIC type (Int16 vs UInt16 vs Float32 etc.); the reinterpret below
// converts the raw unsigned boxed value into the requested type without issuing an
// extra PLC round-trip.
var raw = await plc.ReadAsync(tag.Address, ct).ConfigureAwait(false)
?? throw new System.IO.InvalidDataException($"S7.Net returned null for '{tag.Address}'");
return (tag.DataType, addr.Size, raw) switch
{
(S7DataType.Bool, S7Size.Bit, bool b) => b,
(S7DataType.Byte, S7Size.Byte, byte by) => by,
(S7DataType.UInt16, S7Size.Word, ushort u16) => u16,
(S7DataType.Int16, S7Size.Word, ushort u16) => unchecked((short)u16),
(S7DataType.UInt32, S7Size.DWord, uint u32) => u32,
(S7DataType.Int32, S7Size.DWord, uint u32) => unchecked((int)u32),
(S7DataType.Float32, S7Size.DWord, uint u32) => BitConverter.UInt32BitsToSingle(u32),
(S7DataType.DateTime, _, _) => throw new NotSupportedException("S7 DateTime reads land in a follow-up PR"),
_ => throw new System.IO.InvalidDataException(
$"S7 Read type-mismatch: tag '{tag.Name}' declared {tag.DataType} but address '{tag.Address}' " +
$"parsed as Size={addr.Size}; S7.Net returned {raw.GetType().Name}"),
};
}
/// <summary>Map driver-internal <see cref="S7Area"/> to S7.Net's <see cref="global::S7.Net.DataType"/>.</summary>
private static global::S7.Net.DataType MapArea(S7Area area) => area switch
{
S7Area.DataBlock => global::S7.Net.DataType.DataBlock,
S7Area.Memory => global::S7.Net.DataType.Memory,
S7Area.Input => global::S7.Net.DataType.Input,
S7Area.Output => global::S7.Net.DataType.Output,
S7Area.Timer => global::S7.Net.DataType.Timer,
S7Area.Counter => global::S7.Net.DataType.Counter,
_ => throw new InvalidOperationException($"Unknown S7Area {area}"),
};
// ---- IWritable ----
public async Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
var plc = RequirePlc();
var results = new WriteResult[writes.Count];
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
for (var i = 0; i < writes.Count; i++)
{
var w = writes[i];
if (!_tagsByName.TryGetValue(w.FullReference, out var tag))
{
results[i] = new WriteResult(StatusBadNodeIdUnknown);
continue;
}
if (!tag.Writable)
{
results[i] = new WriteResult(StatusBadNotWritable);
continue;
}
try
{
await WriteOneAsync(plc, tag, w.Value, cancellationToken).ConfigureAwait(false);
results[i] = new WriteResult(0u);
}
catch (NotSupportedException)
{
results[i] = new WriteResult(StatusBadNotSupported);
}
catch (global::S7.Net.PlcException)
{
results[i] = new WriteResult(StatusBadDeviceFailure);
}
catch (Exception)
{
results[i] = new WriteResult(StatusBadInternalError);
}
}
}
finally { _gate.Release(); }
return results;
}
private async Task WriteOneAsync(global::S7.Net.Plc plc, S7TagDefinition tag, object? value, CancellationToken ct)
{
// 1-D array path: pack all N elements into a single buffer then push via WriteBytesAsync.
// Init-time validation guarantees only fixed-width element types reach here.
if (tag.ElementCount is int n && n > 1)
{
var addr = _parsedByName[tag.Name];
if (addr.Size == S7Size.Bit)
throw new InvalidOperationException(
$"S7 Write type-mismatch: tag '{tag.Name}' is array of {tag.DataType} but address '{tag.Address}' " +
$"parsed as bit-access; arrays require byte-addressing");
if (value is null)
throw new ArgumentNullException(nameof(value));
var elemBytes = ArrayElementBytes(tag.DataType);
var buf = PackArray(value, tag.DataType, n, elemBytes, tag.Name);
await plc.WriteBytesAsync(MapArea(addr.Area), addr.DbNumber, addr.ByteOffset, buf, ct).ConfigureAwait(false);
return;
}
// String-shaped types: encode via S7StringCodec then push via WriteBytesAsync. The
// codec rejects out-of-range lengths and non-ASCII for CHAR — we let the resulting
// ArgumentException bubble out so the WriteAsync caller maps it to BadInternalError.
if (tag.DataType is S7DataType.String or S7DataType.WString or S7DataType.Char or S7DataType.WChar)
{
var addr = _parsedByName[tag.Name];
if (addr.Size == S7Size.Bit)
throw new InvalidOperationException(
$"S7 Write type-mismatch: tag '{tag.Name}' declared {tag.DataType} but address '{tag.Address}' " +
$"parsed as bit-access; string-shaped types require byte-addressing (e.g. DBB / MB / IB / QB)");
byte[] payload = tag.DataType switch
{
S7DataType.Char => S7StringCodec.EncodeChar(Convert.ToChar(value ?? throw new ArgumentNullException(nameof(value)))),
S7DataType.WChar => S7StringCodec.EncodeWChar(Convert.ToChar(value ?? throw new ArgumentNullException(nameof(value)))),
S7DataType.String => S7StringCodec.EncodeString(Convert.ToString(value) ?? string.Empty, tag.StringLength),
S7DataType.WString => S7StringCodec.EncodeWString(Convert.ToString(value) ?? string.Empty, tag.StringLength),
_ => throw new InvalidOperationException(),
};
await plc.WriteBytesAsync(MapArea(addr.Area), addr.DbNumber, addr.ByteOffset, payload, ct).ConfigureAwait(false);
return;
}
// Date/time-shaped types: encode via S7DateTimeCodec and push as raw bytes. S5TIME /
// TIME / TOD accept an integer-ms input (matching the read surface); DTL / DT / DATE
// accept a DateTime. ArgumentException from the codec bubbles to BadInternalError.
if (tag.DataType is S7DataType.Dtl or S7DataType.DateAndTime or S7DataType.S5Time
or S7DataType.Time or S7DataType.TimeOfDay or S7DataType.Date)
{
var addr = _parsedByName[tag.Name];
if (addr.Size == S7Size.Bit)
throw new InvalidOperationException(
$"S7 Write type-mismatch: tag '{tag.Name}' declared {tag.DataType} but address '{tag.Address}' " +
$"parsed as bit-access; date/time types require byte-addressing");
if (value is null)
throw new ArgumentNullException(nameof(value));
byte[] payload = tag.DataType switch
{
S7DataType.Dtl => S7DateTimeCodec.EncodeDtl(Convert.ToDateTime(value)),
S7DataType.DateAndTime => S7DateTimeCodec.EncodeDt(Convert.ToDateTime(value)),
S7DataType.S5Time => S7DateTimeCodec.EncodeS5Time(value is TimeSpan ts1 ? ts1 : TimeSpan.FromMilliseconds(Convert.ToInt32(value))),
S7DataType.Time => S7DateTimeCodec.EncodeTime(value is TimeSpan ts2 ? ts2 : TimeSpan.FromMilliseconds(Convert.ToInt32(value))),
S7DataType.TimeOfDay => S7DateTimeCodec.EncodeTod(value is TimeSpan ts3 ? ts3 : TimeSpan.FromMilliseconds(Convert.ToInt64(value))),
S7DataType.Date => S7DateTimeCodec.EncodeDate(Convert.ToDateTime(value)),
_ => throw new InvalidOperationException(),
};
await plc.WriteBytesAsync(MapArea(addr.Area), addr.DbNumber, addr.ByteOffset, payload, ct).ConfigureAwait(false);
return;
}
// 64-bit types: S7.Net has no LWord-aware WriteAsync(string, object) overload, so emit
// the value as 8 big-endian bytes via WriteBytesAsync. Wire order on S7 is BE so a
// BinaryPrimitives.Write*BigEndian round-trips with the matching ReadOneAsync path.
if (tag.DataType is S7DataType.Int64 or S7DataType.UInt64 or S7DataType.Float64)
{
var addr = _parsedByName[tag.Name];
if (addr.Size != S7Size.LWord)
throw new InvalidOperationException(
$"S7 Write type-mismatch: tag '{tag.Name}' declared {tag.DataType} but address '{tag.Address}' " +
$"parsed as Size={addr.Size}; 64-bit types require an LD/DBL/DBLD suffix");
var buf = new byte[8];
switch (tag.DataType)
{
case S7DataType.Int64:
BinaryPrimitives.WriteInt64BigEndian(buf, Convert.ToInt64(value));
break;
case S7DataType.UInt64:
BinaryPrimitives.WriteUInt64BigEndian(buf, Convert.ToUInt64(value));
break;
case S7DataType.Float64:
BinaryPrimitives.WriteUInt64BigEndian(buf, BitConverter.DoubleToUInt64Bits(Convert.ToDouble(value)));
break;
}
await plc.WriteBytesAsync(MapArea(addr.Area), addr.DbNumber, addr.ByteOffset, buf, ct).ConfigureAwait(false);
return;
}
// S7.Net's Plc.WriteAsync(string address, object value) expects the boxed value to
// match the address's size-suffix type: DBX=bool, DBB=byte, DBW=ushort, DBD=uint.
// Our S7DataType lets the caller pass short/int/float; convert to the unsigned
// wire representation before handing off.
var boxed = tag.DataType switch
{
S7DataType.Bool => (object)Convert.ToBoolean(value),
S7DataType.Byte => (object)Convert.ToByte(value),
S7DataType.UInt16 => (object)Convert.ToUInt16(value),
S7DataType.Int16 => (object)unchecked((ushort)Convert.ToInt16(value)),
S7DataType.UInt32 => (object)Convert.ToUInt32(value),
S7DataType.Int32 => (object)unchecked((uint)Convert.ToInt32(value)),
S7DataType.Float32 => (object)BitConverter.SingleToUInt32Bits(Convert.ToSingle(value)),
S7DataType.DateTime => throw new NotSupportedException("S7 DateTime writes land in a follow-up PR"),
_ => throw new InvalidOperationException($"Unknown S7DataType {tag.DataType}"),
};
await plc.WriteAsync(tag.Address, boxed, ct).ConfigureAwait(false);
}
private global::S7.Net.Plc RequirePlc() =>
Plc ?? throw new InvalidOperationException("S7Driver not initialized");
/// <summary>
/// Construct the underlying S7netplus <see cref="Plc"/> honouring
/// <see cref="S7DriverOptions.TsapMode"/>, <see cref="S7DriverOptions.LocalTsap"/>,
/// and <see cref="S7DriverOptions.RemoteTsap"/>. <see cref="TsapMode.Auto"/> falls
/// back to the existing <c>(CpuType, host, port, rack, slot)</c> constructor so the
/// change is opt-in for sites that don't need a non-default class. Other modes go
/// through the raw-TSAP-pair overload, computing the pair from
/// <see cref="S7TsapDefaults"/> and the configured rack/slot, then layering the
/// caller-supplied <see cref="S7DriverOptions.LocalTsap"/> /
/// <see cref="S7DriverOptions.RemoteTsap"/> on top.
/// </summary>
private Plc BuildPlc()
{
if (_options.TsapMode == TsapMode.Auto)
{
// Existing behaviour: S7netplus picks the TSAP pair via TsapPair.GetDefaultTsapPair
// from CpuType + rack + slot. An explicit LocalTsap / RemoteTsap under Auto is
// ignored on purpose — Auto means "let the library decide". Document this in s7.md.
return new Plc(_options.CpuType, _options.Host, _options.Port, _options.Rack, _options.Slot);
}
ushort localTsap;
ushort remoteTsap;
if (_options.TsapMode == TsapMode.Other)
{
if (_options.LocalTsap is not ushort lt || _options.RemoteTsap is not ushort rt)
{
throw new InvalidOperationException(
"S7DriverOptions.TsapMode = Other requires both LocalTsap and RemoteTsap to be set " +
"(no class default exists for Other). Set both, or pick Pg / Op / S7Basic.");
}
localTsap = lt;
remoteTsap = rt;
}
else
{
var classByte = S7TsapDefaults.HighByteFor(_options.TsapMode);
// Compute defaults from the class + configured rack/slot, then let explicit
// overrides win — so e.g. "TsapMode = Pg, LocalTsap = 0x0142" produces a PG-class
// remote with a custom local for sites that need a fixed source-TSAP.
localTsap = _options.LocalTsap ?? S7TsapDefaults.BuildLocalTsap(classByte);
remoteTsap = _options.RemoteTsap ?? S7TsapDefaults.BuildRemoteTsap(
classByte, _options.Rack, _options.Slot);
}
var pair = new global::S7.Net.Protocol.TsapPair(
new global::S7.Net.Protocol.Tsap((byte)(localTsap >> 8), (byte)(localTsap & 0xFF)),
new global::S7.Net.Protocol.Tsap((byte)(remoteTsap >> 8), (byte)(remoteTsap & 0xFF)));
return new Plc(_options.Host, _options.Port, pair);
}
// ---- ITagDiscovery ----
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(builder);
var folder = builder.Folder("S7", "S7");
foreach (var t in _options.Tags)
{
var isArr = t.ElementCount is int ec && ec > 1;
folder.Variable(t.Name, t.Name, new DriverAttributeInfo(
FullName: t.Name,
DriverDataType: MapDataType(t.DataType),
IsArray: isArr,
ArrayDim: isArr ? (uint)t.ElementCount!.Value : null,
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false,
WriteIdempotent: t.WriteIdempotent));
}
return Task.CompletedTask;
}
/// <summary>
/// True when <paramref name="t"/> can be used as an array element. Variable-width string
/// types and BOOL (packed-bit layout) are rejected — both need bespoke addressing
/// beyond a flat <c>N × elementBytes</c> byte-range read and ship as a follow-up.
/// </summary>
internal static bool IsArrayElementSupported(S7DataType t) => t is
S7DataType.Byte or
S7DataType.Int16 or S7DataType.UInt16 or
S7DataType.Int32 or S7DataType.UInt32 or
S7DataType.Int64 or S7DataType.UInt64 or
S7DataType.Float32 or S7DataType.Float64 or
S7DataType.Date or S7DataType.Time or S7DataType.TimeOfDay;
/// <summary>
/// On-wire bytes per array element for the supported fixed-width element types. DATE
/// is a 16-bit days-since-1990 counter, TIME and TOD are 32-bit ms counters.
/// </summary>
internal static int ArrayElementBytes(S7DataType t) => t switch
{
S7DataType.Byte => 1,
S7DataType.Int16 or S7DataType.UInt16 or S7DataType.Date => 2,
S7DataType.Int32 or S7DataType.UInt32 or S7DataType.Float32
or S7DataType.Time or S7DataType.TimeOfDay => 4,
S7DataType.Int64 or S7DataType.UInt64 or S7DataType.Float64 => 8,
_ => throw new InvalidOperationException($"S7 array element bytes undefined for {t}"),
};
/// <summary>
/// Slice a flat S7 byte buffer into a typed array using the existing big-endian scalar
/// codec for each element. Returns the typed array boxed as <c>object</c> so the
/// <see cref="DataValueSnapshot"/> surface can carry it without further conversion.
/// </summary>
internal static object SliceArray(byte[] bytes, S7DataType t, int n, int elemBytes)
{
switch (t)
{
case S7DataType.Byte:
{
var a = new byte[n];
Buffer.BlockCopy(bytes, 0, a, 0, n);
return a;
}
case S7DataType.Int16:
{
var a = new short[n];
for (var i = 0; i < n; i++) a[i] = BinaryPrimitives.ReadInt16BigEndian(bytes.AsSpan(i * elemBytes, 2));
return a;
}
case S7DataType.UInt16:
{
var a = new ushort[n];
for (var i = 0; i < n; i++) a[i] = BinaryPrimitives.ReadUInt16BigEndian(bytes.AsSpan(i * elemBytes, 2));
return a;
}
case S7DataType.Int32:
{
var a = new int[n];
for (var i = 0; i < n; i++) a[i] = BinaryPrimitives.ReadInt32BigEndian(bytes.AsSpan(i * elemBytes, 4));
return a;
}
case S7DataType.UInt32:
{
var a = new uint[n];
for (var i = 0; i < n; i++) a[i] = BinaryPrimitives.ReadUInt32BigEndian(bytes.AsSpan(i * elemBytes, 4));
return a;
}
case S7DataType.Int64:
{
var a = new long[n];
for (var i = 0; i < n; i++) a[i] = BinaryPrimitives.ReadInt64BigEndian(bytes.AsSpan(i * elemBytes, 8));
return a;
}
case S7DataType.UInt64:
{
var a = new ulong[n];
for (var i = 0; i < n; i++) a[i] = BinaryPrimitives.ReadUInt64BigEndian(bytes.AsSpan(i * elemBytes, 8));
return a;
}
case S7DataType.Float32:
{
var a = new float[n];
for (var i = 0; i < n; i++)
a[i] = BitConverter.UInt32BitsToSingle(BinaryPrimitives.ReadUInt32BigEndian(bytes.AsSpan(i * elemBytes, 4)));
return a;
}
case S7DataType.Float64:
{
var a = new double[n];
for (var i = 0; i < n; i++)
a[i] = BitConverter.UInt64BitsToDouble(BinaryPrimitives.ReadUInt64BigEndian(bytes.AsSpan(i * elemBytes, 8)));
return a;
}
case S7DataType.Date:
{
var a = new DateTime[n];
for (var i = 0; i < n; i++)
a[i] = S7DateTimeCodec.DecodeDate(bytes.AsSpan(i * elemBytes, 2));
return a;
}
case S7DataType.Time:
{
// Surface as Int32 ms — matches the scalar Time read path (driver-specs §5).
var a = new int[n];
for (var i = 0; i < n; i++)
a[i] = (int)S7DateTimeCodec.DecodeTime(bytes.AsSpan(i * elemBytes, 4)).TotalMilliseconds;
return a;
}
case S7DataType.TimeOfDay:
{
var a = new int[n];
for (var i = 0; i < n; i++)
a[i] = (int)S7DateTimeCodec.DecodeTod(bytes.AsSpan(i * elemBytes, 4)).TotalMilliseconds;
return a;
}
default:
throw new InvalidOperationException($"S7 array slice undefined for {t}");
}
}
/// <summary>
/// Pack a caller-supplied array (object) into the on-wire S7 byte layout for
/// <paramref name="elementType"/>. Accepts both the strongly-typed array
/// (<c>short[]</c>, <c>int[]</c>, ...) and a generic <c>System.Array</c> / <c>IEnumerable</c>
/// so OPC UA Variant-boxed values flow through unchanged.
/// </summary>
internal static byte[] PackArray(object value, S7DataType elementType, int n, int elemBytes, string tagName)
{
if (value is not System.Collections.IEnumerable enumerable)
throw new ArgumentException($"S7 Write tag '{tagName}' is array but value is not enumerable (got {value.GetType().Name})", nameof(value));
var buf = new byte[n * elemBytes];
var i = 0;
foreach (var raw in enumerable)
{
if (i >= n)
throw new ArgumentException($"S7 Write tag '{tagName}': value has more than ElementCount={n} elements", nameof(value));
var span = buf.AsSpan(i * elemBytes, elemBytes);
switch (elementType)
{
case S7DataType.Byte: span[0] = Convert.ToByte(raw); break;
case S7DataType.Int16: BinaryPrimitives.WriteInt16BigEndian(span, Convert.ToInt16(raw)); break;
case S7DataType.UInt16: BinaryPrimitives.WriteUInt16BigEndian(span, Convert.ToUInt16(raw)); break;
case S7DataType.Int32: BinaryPrimitives.WriteInt32BigEndian(span, Convert.ToInt32(raw)); break;
case S7DataType.UInt32: BinaryPrimitives.WriteUInt32BigEndian(span, Convert.ToUInt32(raw)); break;
case S7DataType.Int64: BinaryPrimitives.WriteInt64BigEndian(span, Convert.ToInt64(raw)); break;
case S7DataType.UInt64: BinaryPrimitives.WriteUInt64BigEndian(span, Convert.ToUInt64(raw)); break;
case S7DataType.Float32: BinaryPrimitives.WriteUInt32BigEndian(span, BitConverter.SingleToUInt32Bits(Convert.ToSingle(raw))); break;
case S7DataType.Float64: BinaryPrimitives.WriteUInt64BigEndian(span, BitConverter.DoubleToUInt64Bits(Convert.ToDouble(raw))); break;
case S7DataType.Date:
S7DateTimeCodec.EncodeDate(Convert.ToDateTime(raw)).CopyTo(span);
break;
case S7DataType.Time:
S7DateTimeCodec.EncodeTime(raw is TimeSpan ts ? ts : TimeSpan.FromMilliseconds(Convert.ToInt32(raw))).CopyTo(span);
break;
case S7DataType.TimeOfDay:
S7DateTimeCodec.EncodeTod(raw is TimeSpan tod ? tod : TimeSpan.FromMilliseconds(Convert.ToInt64(raw))).CopyTo(span);
break;
default:
throw new InvalidOperationException($"S7 array pack undefined for {elementType}");
}
i++;
}
if (i != n)
throw new ArgumentException($"S7 Write tag '{tagName}': value had {i} elements, expected ElementCount={n}", nameof(value));
return buf;
}
private static DriverDataType MapDataType(S7DataType t) => t switch
{
S7DataType.Bool => DriverDataType.Boolean,
S7DataType.Byte => DriverDataType.Int32, // no 8-bit in DriverDataType yet
S7DataType.Int16 => DriverDataType.Int16,
S7DataType.UInt16 => DriverDataType.UInt16,
S7DataType.Int32 => DriverDataType.Int32,
S7DataType.UInt32 => DriverDataType.UInt32,
S7DataType.Int64 => DriverDataType.Int64,
S7DataType.UInt64 => DriverDataType.UInt64,
S7DataType.Float32 => DriverDataType.Float32,
S7DataType.Float64 => DriverDataType.Float64,
S7DataType.String => DriverDataType.String,
S7DataType.WString => DriverDataType.String,
S7DataType.Char => DriverDataType.String,
S7DataType.WChar => DriverDataType.String,
S7DataType.DateTime => DriverDataType.DateTime,
S7DataType.Dtl => DriverDataType.DateTime,
S7DataType.DateAndTime => DriverDataType.DateTime,
S7DataType.Date => DriverDataType.DateTime,
// S5TIME/TIME/TOD have no Duration type in DriverDataType — surface as Int32 ms
// (matching the IEC-1131 representation).
S7DataType.S5Time => DriverDataType.Int32,
S7DataType.Time => DriverDataType.Int32,
S7DataType.TimeOfDay => DriverDataType.Int32,
_ => DriverDataType.Int32,
};
// ---- ISubscribable (polling overlay) ----
/// <summary>
/// PR-S7-C3 — partitions <paramref name="fullReferences"/> by resolved publishing
/// interval (per-tag <see cref="S7TagDefinition.ScanGroup"/> looked up in
/// <see cref="S7DriverOptions.ScanGroupIntervals"/>, falling back to
/// <paramref name="publishingInterval"/>) and starts one background poll loop per
/// distinct interval. The returned <see cref="ISubscriptionHandle"/> is one logical
/// subscription that owns N partition loops; <see cref="UnsubscribeAsync"/> tears
/// them all down together.
/// </summary>
/// <remarks>
/// Each partition shares the per-driver <c>_gate</c> semaphore, so wire-level reads
/// stay strictly serial — the multi-rate split decouples tick cadence (a fast HMI tag
/// isn't blocked behind a slow batch's <c>Task.Delay</c>) but does NOT parallelise
/// mailbox traffic. The "1 connection / 1 mailbox" caveat is documented in
/// <c>docs/v2/s7.md</c>.
/// </remarks>
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
var id = Interlocked.Increment(ref _nextSubscriptionId);
var handle = new S7SubscriptionHandle(id);
// Floor at 100 ms — S7 CPUs scan 2-10 ms but the comms mailbox is processed at most
// once per scan; sub-100 ms polling just queues wire-side with worse latency. The
// floor applies to BOTH the subscribe-default interval AND any per-group override
// so a misconfigured group can't slip below the protective bound.
var defaultInterval = ApplyMinInterval(publishingInterval);
// Bucket tags by resolved interval. Tags with no ScanGroup, or with a group not in
// the rate map, fall back to the subscription-default rate. This preserves the
// legacy single-rate path: an unconfigured driver gets exactly one partition.
var partitions = new Dictionary<TimeSpan, List<string>>();
foreach (var tagRef in fullReferences)
{
var interval = ResolveInterval(tagRef, defaultInterval);
if (!partitions.TryGetValue(interval, out var list))
{
list = [];
partitions[interval] = list;
}
list.Add(tagRef);
}
var partitionStates = new List<PartitionState>(partitions.Count);
foreach (var (interval, refs) in partitions)
{
var partCts = new CancellationTokenSource();
var part = new PartitionState(refs, interval, partCts);
partitionStates.Add(part);
}
var state = new SubscriptionState(handle, [.. fullReferences], defaultInterval, partitionStates);
_subscriptions[id] = state;
// Start each partition loop AFTER the state is registered so an early UnsubscribeAsync
// (e.g. the OPC UA stack tearing the session down on session cancel) doesn't race
// ahead of the partitions' Task.Run kickoff.
foreach (var part in partitionStates)
_ = Task.Run(() => PollLoopAsync(handle, part, part.Cts.Token), part.Cts.Token);
return Task.FromResult<ISubscriptionHandle>(handle);
}
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (handle is S7SubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
{
foreach (var part in state.Partitions)
{
try { part.Cts.Cancel(); } catch { }
part.Cts.Dispose();
}
}
return Task.CompletedTask;
}
/// <summary>
/// Apply the 100 ms floor to a caller-supplied publishing interval. Internal so
/// <see cref="SubscribeAsync"/> can guard both the default + every per-group rate.
/// </summary>
private static TimeSpan ApplyMinInterval(TimeSpan requested) =>
requested < TimeSpan.FromMilliseconds(100) ? TimeSpan.FromMilliseconds(100) : requested;
/// <summary>
/// Resolve the publishing interval for one tag — <see cref="S7DriverOptions.ScanGroupIntervals"/>
/// wins when the tag's <see cref="S7TagDefinition.ScanGroup"/> is present, otherwise
/// fall back to the subscription default. Unknown tags (not in the driver's map)
/// fall back to the default — the poll loop will surface them as BadNodeIdUnknown
/// anyway via <see cref="ReadAsync"/>.
/// </summary>
internal TimeSpan ResolveInterval(string tagRef, TimeSpan defaultInterval)
{
if (_options.ScanGroupIntervals is { Count: > 0 } map &&
_tagsByName.TryGetValue(tagRef, out var def) &&
!string.IsNullOrWhiteSpace(def.ScanGroup) &&
// Case-insensitive lookup: scan group names come from human-typed config
// and the JSON DTO already lower-cases the lookup, so don't make ScanGroup
// values case-sensitive at runtime either.
TryGetCaseInsensitive(map, def.ScanGroup!, out var groupInterval))
{
return ApplyMinInterval(groupInterval);
}
return defaultInterval;
}
private static bool TryGetCaseInsensitive(IReadOnlyDictionary<string, TimeSpan> map, string key, out TimeSpan value)
{
if (map.TryGetValue(key, out value)) return true;
foreach (var kvp in map)
if (string.Equals(kvp.Key, key, StringComparison.OrdinalIgnoreCase))
{
value = kvp.Value;
return true;
}
value = default;
return false;
}
/// <summary>
/// Test-only: count of distinct partition loops a subscription handle owns. Used by
/// <c>S7ScanGroupPartitioningTests</c> to assert that 3 tags at 3 rates produce 3
/// partitions (and 3 tags at 1 rate produce 1 partition).
/// </summary>
internal int GetPartitionCount(ISubscriptionHandle handle) =>
handle is S7SubscriptionHandle h && _subscriptions.TryGetValue(h.Id, out var state)
? state.Partitions.Count
: 0;
/// <summary>
/// Test-only: snapshot of the (interval, tag-count) pairs for a subscription's
/// partitions. Surfaces the actual partitioning so tests can assert "5 tags split
/// 2 + 3" without grepping the poll-loop internals.
/// </summary>
internal IReadOnlyList<(TimeSpan Interval, int TagCount)> GetPartitionSummary(ISubscriptionHandle handle) =>
handle is S7SubscriptionHandle h && _subscriptions.TryGetValue(h.Id, out var state)
? [.. state.Partitions.Select(p => (p.Interval, p.TagReferences.Count))]
: [];
private async Task PollLoopAsync(S7SubscriptionHandle handle, PartitionState part, CancellationToken ct)
{
// Initial-data push per OPC UA Part 4 convention. Each partition does its own
// initial push: the OPC UA stack receives one DataChange per tag at subscribe time
// regardless of which partition the tag landed in.
try { await PollOnceAsync(handle, part, forceRaise: true, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* first-read error — polling continues */ }
while (!ct.IsCancellationRequested)
{
try { await Task.Delay(part.Interval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
try { await PollOnceAsync(handle, part, forceRaise: false, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* transient polling error — loop continues, health surface reflects it */ }
}
}
private async Task PollOnceAsync(S7SubscriptionHandle handle, PartitionState part, bool forceRaise, CancellationToken ct)
{
// ReadAsync takes _gate internally, which is what serialises every partition's
// wire traffic against the single S7 connection. Multiple partitions racing for
// the gate is fine — short-running ones get serviced inside the long ones' Delay
// window, which is exactly the cadence-decoupling we want from PR-S7-C3.
var snapshots = await ReadAsync(part.TagReferences, ct).ConfigureAwait(false);
for (var i = 0; i < part.TagReferences.Count; i++)
{
var tagRef = part.TagReferences[i];
var current = snapshots[i];
var hasPrev = part.LastValues.TryGetValue(tagRef, out var lastSeen);
// Status-code change always publishes (transitions Bad → Good and back are
// semantically meaningful regardless of the value channel). First sample
// (no prior cached value) and forced-raise (initial-data push) bypass the
// deadband filter outright.
var statusChanged = !hasPrev || lastSeen!.StatusCode != current.StatusCode;
var publish = forceRaise || statusChanged;
if (!publish)
{
// Tag def lookup is best-effort — if absent (defensive against test paths
// that seed only a partition state) fall back to exact equality, which is
// the legacy behaviour.
_tagsByName.TryGetValue(tagRef, out var def);
publish = ShouldPublish(def, lastSeen!.Value, current.Value);
}
if (publish)
{
part.LastValues[tagRef] = current;
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, current));
}
}
}
/// <summary>
/// PR-S7-C4 — per-tag deadband / on-change filter. Pure-function entry point so
/// unit tests can drive every edge case (NaN, ±Infinity, sign flip, near-zero
/// baseline) without spinning up a partition state. Returns <c>true</c> when the
/// sample should be emitted on <c>OnDataChange</c>, <c>false</c> when the deadband
/// suppresses it.
/// </summary>
/// <remarks>
/// <para>
/// Decision matrix:
/// <list type="bullet">
/// <item>Either value <c>NaN</c> / <c>±Infinity</c> → publish (degenerate samples
/// surface to the client).</item>
/// <item>Non-numeric types (string, bool, byte[]) → exact equality
/// (<c>!Equals</c>); deadband knobs are ignored.</item>
/// <item>Numeric, both deadbands null → exact equality.</item>
/// <item>Numeric with <c>DeadbandAbsolute</c> set → suppress when
/// <c>|delta| &lt; DeadbandAbsolute</c>.</item>
/// <item>Numeric with <c>DeadbandPercent</c> set → suppress when
/// <c>|delta| &lt; |prev| * pct / 100</c>; <c>|prev| &lt; 1e-6</c> falls
/// back to absolute (and publishes if no absolute is configured — there
/// is no meaningful percent-of-zero threshold).</item>
/// <item>Both set → publish if EITHER threshold says publish (Kepware
/// semantics; mirrors AbLegacy's <c>ShouldPublish</c> for consistency).</item>
/// </list>
/// </para>
/// </remarks>
internal static bool ShouldPublish(S7TagDefinition? tag, object? prev, object? current)
{
// No deadband knobs (or no def at all) → legacy exact-equality.
if (tag is null || (tag.DeadbandAbsolute is null && tag.DeadbandPercent is null))
return !Equals(prev, current);
// Non-numeric types ignore deadband — exact equality. Same rule applies if either
// sample isn't a numeric scalar (e.g. mid-flight type change, status flip).
if (!TryAsDouble(prev, out var prevD) || !TryAsDouble(current, out var currD))
return !Equals(prev, current);
// NaN / ±Infinity bypass: NaN never equals NaN, and ±Inf is a degenerate signal
// worth surfacing rather than silently filtering. Treat as "publish".
if (double.IsNaN(prevD) || double.IsNaN(currD) ||
double.IsInfinity(prevD) || double.IsInfinity(currD))
return true;
var delta = Math.Abs(currD - prevD);
// Absolute first — cheap and exact.
var absPass = tag.DeadbandAbsolute is double abs && delta >= abs;
bool percentPass;
if (tag.DeadbandPercent is double pct)
{
// Near-zero baseline rule: |prev| < 1e-6 → fall back to absolute. If no
// absolute is configured the sample publishes (no usable percent threshold).
if (Math.Abs(prevD) < 1e-6)
percentPass = tag.DeadbandAbsolute is null ? delta > 0 : false;
else
percentPass = delta >= Math.Abs(prevD) * pct / 100.0;
}
else percentPass = false;
// OR semantics — publish if EITHER deadband triggers. Matches AbLegacy +
// Kepware's "either threshold triggers" convention.
var pass = (tag.DeadbandAbsolute is not null && absPass)
|| (tag.DeadbandPercent is not null && percentPass);
// Edge case: deadbands configured but neither threshold "passes" yet the values
// genuinely differ. Suppress — that's the whole point of a deadband filter.
return pass;
}
/// <summary>
/// PR-S7-C4 — best-effort numeric coercion for the deadband filter. Returns
/// <c>false</c> for non-numerics (string, bool, byte[], null) so the caller falls
/// back to exact-equality.
/// </summary>
private static bool TryAsDouble(object? value, out double result)
{
switch (value)
{
case null:
case bool:
case string:
case byte[]:
case Array:
result = 0;
return false;
case double d: result = d; return true;
case float f: result = f; return true;
case int i: result = i; return true;
case uint u: result = u; return true;
case short s: result = s; return true;
case ushort us: result = us; return true;
case long l: result = l; return true;
case ulong ul: result = ul; return true;
case byte b: result = b; return true;
case sbyte sb: result = sb; return true;
case IConvertible conv:
try
{
result = conv.ToDouble(System.Globalization.CultureInfo.InvariantCulture);
return true;
}
catch { result = 0; return false; }
default: result = 0; return false;
}
}
/// <summary>
/// One subscription owns N partitions, one per distinct publishing interval.
/// <see cref="TagReferences"/> is the original (unpartitioned) request preserved for
/// diagnostics — runtime polling is driven by <see cref="Partitions"/>.
/// </summary>
private sealed record SubscriptionState(
S7SubscriptionHandle Handle,
IReadOnlyList<string> TagReferences,
TimeSpan DefaultInterval,
IReadOnlyList<PartitionState> Partitions);
/// <summary>
/// One poll loop's worth of state: the tags it owns, its tick interval, its
/// per-tag last-seen cache, and the CTS that <see cref="UnsubscribeAsync"/> /
/// <see cref="DisposeAsync"/> trip.
/// </summary>
private sealed record PartitionState(
IReadOnlyList<string> TagReferences,
TimeSpan Interval,
CancellationTokenSource Cts)
{
public System.Collections.Concurrent.ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
= new(StringComparer.OrdinalIgnoreCase);
}
private sealed record S7SubscriptionHandle(long Id) : ISubscriptionHandle
{
public string DiagnosticId => $"s7-sub-{Id}";
}
// ---- IHostConnectivityProbe ----
/// <summary>
/// Host identifier surfaced in <see cref="GetHostStatuses"/>. <c>host:port</c> format
/// matches the Modbus driver's convention so the Admin UI dashboard renders both
/// family's rows uniformly.
/// </summary>
public string HostName => $"{_options.Host}:{_options.Port}";
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
{
lock (_probeLock)
return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)];
}
private async Task ProbeLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var success = false;
try
{
// Probe via S7.Net's low-cost GetCpuStatus — returns the CPU state (Run/Stop)
// and is intentionally light on the comms mailbox. Single-word Plc.ReadAsync
// would also work but GetCpuStatus doubles as a "PLC actually up" check.
using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
probeCts.CancelAfter(_options.Probe.Timeout);
var plc = Plc;
if (plc is null) throw new InvalidOperationException("Plc dropped during probe");
await _gate.WaitAsync(probeCts.Token).ConfigureAwait(false);
try
{
_ = await plc.ReadStatusAsync(probeCts.Token).ConfigureAwait(false);
success = true;
}
finally { _gate.Release(); }
}
catch (OperationCanceledException) when (ct.IsCancellationRequested) { return; }
catch { /* transport/timeout/exception — treated as Stopped below */ }
TransitionTo(success ? HostState.Running : HostState.Stopped);
try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
}
}
private void TransitionTo(HostState newState)
{
HostState old;
lock (_probeLock)
{
old = _hostState;
if (old == newState) return;
_hostState = newState;
_hostStateChangedUtc = DateTime.UtcNow;
}
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
}
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
try { await ShutdownAsync(CancellationToken.None).ConfigureAwait(false); }
catch { /* disposal is best-effort */ }
_gate.Dispose();
}
}