Compare commits
3 Commits
phase-3-pr
...
phase-3-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d8ef35d5bd | ||
| 5e318a1ab6 | |||
| 0eab1271be |
@@ -26,8 +26,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.S7;
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
|
||||
: IDriver, IReadable, IWritable, IDisposable, IAsyncDisposable
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
||||
{
|
||||
// ---- ISubscribable + IHostConnectivityProbe state ----
|
||||
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
|
||||
private long _nextSubscriptionId;
|
||||
private readonly object _probeLock = new();
|
||||
private HostState _hostState = HostState.Unknown;
|
||||
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
|
||||
private CancellationTokenSource? _probeCts;
|
||||
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||||
|
||||
/// <summary>OPC UA StatusCode used when the tag name isn't in the driver's tag map.</summary>
|
||||
private const uint StatusBadNodeIdUnknown = 0x80340000u;
|
||||
/// <summary>OPC UA StatusCode used when the tag's data type isn't implemented yet.</summary>
|
||||
@@ -98,6 +110,15 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
|
||||
}
|
||||
|
||||
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
||||
|
||||
// Kick off the probe loop once the connection is up. Initial HostState stays
|
||||
// Unknown until the first probe tick succeeds — avoids broadcasting a premature
|
||||
// Running transition before any PDU round-trip has happened.
|
||||
if (_options.Probe.Enabled)
|
||||
{
|
||||
_probeCts = new CancellationTokenSource();
|
||||
_ = Task.Run(() => ProbeLoopAsync(_probeCts.Token), _probeCts.Token);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -118,6 +139,17 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
|
||||
|
||||
public Task ShutdownAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
try { _probeCts?.Cancel(); } catch { }
|
||||
_probeCts?.Dispose();
|
||||
_probeCts = null;
|
||||
|
||||
foreach (var state in _subscriptions.Values)
|
||||
{
|
||||
try { state.Cts.Cancel(); } catch { }
|
||||
state.Cts.Dispose();
|
||||
}
|
||||
_subscriptions.Clear();
|
||||
|
||||
try { Plc?.Close(); } catch { /* best-effort — tearing down anyway */ }
|
||||
Plc = null;
|
||||
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
||||
@@ -294,6 +326,180 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
|
||||
private global::S7.Net.Plc RequirePlc() =>
|
||||
Plc ?? throw new InvalidOperationException("S7Driver not initialized");
|
||||
|
||||
// ---- ITagDiscovery ----
|
||||
|
||||
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(builder);
|
||||
var folder = builder.Folder("S7", "S7");
|
||||
foreach (var t in _options.Tags)
|
||||
{
|
||||
folder.Variable(t.Name, t.Name, new DriverAttributeInfo(
|
||||
FullName: t.Name,
|
||||
DriverDataType: MapDataType(t.DataType),
|
||||
IsArray: false,
|
||||
ArrayDim: null,
|
||||
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
|
||||
IsHistorized: false,
|
||||
IsAlarm: false));
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private static DriverDataType MapDataType(S7DataType t) => t switch
|
||||
{
|
||||
S7DataType.Bool => DriverDataType.Boolean,
|
||||
S7DataType.Byte => DriverDataType.Int32, // no 8-bit in DriverDataType yet
|
||||
S7DataType.Int16 or S7DataType.UInt16 or S7DataType.Int32 or S7DataType.UInt32 => DriverDataType.Int32,
|
||||
S7DataType.Int64 or S7DataType.UInt64 => DriverDataType.Int32, // widens; lossy for >2^31-1
|
||||
S7DataType.Float32 => DriverDataType.Float32,
|
||||
S7DataType.Float64 => DriverDataType.Float64,
|
||||
S7DataType.String => DriverDataType.String,
|
||||
S7DataType.DateTime => DriverDataType.DateTime,
|
||||
_ => DriverDataType.Int32,
|
||||
};
|
||||
|
||||
// ---- ISubscribable (polling overlay) ----
|
||||
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||
{
|
||||
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
||||
var cts = new CancellationTokenSource();
|
||||
// Floor at 100 ms — S7 CPUs scan 2-10 ms but the comms mailbox is processed at most
|
||||
// once per scan; sub-100 ms polling just queues wire-side with worse latency.
|
||||
var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
|
||||
? TimeSpan.FromMilliseconds(100)
|
||||
: publishingInterval;
|
||||
var handle = new S7SubscriptionHandle(id);
|
||||
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
|
||||
_subscriptions[id] = state;
|
||||
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
|
||||
return Task.FromResult<ISubscriptionHandle>(handle);
|
||||
}
|
||||
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
if (handle is S7SubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
|
||||
{
|
||||
state.Cts.Cancel();
|
||||
state.Cts.Dispose();
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
|
||||
{
|
||||
// Initial-data push per OPC UA Part 4 convention.
|
||||
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* first-read error — polling continues */ }
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
|
||||
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* transient polling error — loop continues, health surface reflects it */ }
|
||||
}
|
||||
}
|
||||
|
||||
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
|
||||
{
|
||||
var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
|
||||
for (var i = 0; i < state.TagReferences.Count; i++)
|
||||
{
|
||||
var tagRef = state.TagReferences[i];
|
||||
var current = snapshots[i];
|
||||
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
|
||||
|
||||
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
|
||||
{
|
||||
state.LastValues[tagRef] = current;
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private sealed record SubscriptionState(
|
||||
S7SubscriptionHandle Handle,
|
||||
IReadOnlyList<string> TagReferences,
|
||||
TimeSpan Interval,
|
||||
CancellationTokenSource Cts)
|
||||
{
|
||||
public System.Collections.Concurrent.ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
|
||||
= new(StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
private sealed record S7SubscriptionHandle(long Id) : ISubscriptionHandle
|
||||
{
|
||||
public string DiagnosticId => $"s7-sub-{Id}";
|
||||
}
|
||||
|
||||
// ---- IHostConnectivityProbe ----
|
||||
|
||||
/// <summary>
|
||||
/// Host identifier surfaced in <see cref="GetHostStatuses"/>. <c>host:port</c> format
|
||||
/// matches the Modbus driver's convention so the Admin UI dashboard renders both
|
||||
/// family's rows uniformly.
|
||||
/// </summary>
|
||||
public string HostName => $"{_options.Host}:{_options.Port}";
|
||||
|
||||
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
|
||||
{
|
||||
lock (_probeLock)
|
||||
return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)];
|
||||
}
|
||||
|
||||
private async Task ProbeLoopAsync(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var success = false;
|
||||
try
|
||||
{
|
||||
// Probe via S7.Net's low-cost GetCpuStatus — returns the CPU state (Run/Stop)
|
||||
// and is intentionally light on the comms mailbox. Single-word Plc.ReadAsync
|
||||
// would also work but GetCpuStatus doubles as a "PLC actually up" check.
|
||||
using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
probeCts.CancelAfter(_options.Probe.Timeout);
|
||||
|
||||
var plc = Plc;
|
||||
if (plc is null) throw new InvalidOperationException("Plc dropped during probe");
|
||||
|
||||
await _gate.WaitAsync(probeCts.Token).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
_ = await plc.ReadStatusAsync(probeCts.Token).ConfigureAwait(false);
|
||||
success = true;
|
||||
}
|
||||
finally { _gate.Release(); }
|
||||
}
|
||||
catch (OperationCanceledException) when (ct.IsCancellationRequested) { return; }
|
||||
catch { /* transport/timeout/exception — treated as Stopped below */ }
|
||||
|
||||
TransitionTo(success ? HostState.Running : HostState.Stopped);
|
||||
|
||||
try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
}
|
||||
}
|
||||
|
||||
private void TransitionTo(HostState newState)
|
||||
{
|
||||
HostState old;
|
||||
lock (_probeLock)
|
||||
{
|
||||
old = _hostState;
|
||||
if (old == newState) return;
|
||||
_hostState = newState;
|
||||
_hostStateChangedUtc = DateTime.UtcNow;
|
||||
}
|
||||
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
|
||||
}
|
||||
|
||||
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.S7.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Shape tests for <see cref="S7Driver"/>'s <see cref="ITagDiscovery"/>,
|
||||
/// <see cref="ISubscribable"/>, and <see cref="IHostConnectivityProbe"/> surfaces that
|
||||
/// don't need a live PLC. Wire-level polling round-trips and probe transitions land in a
|
||||
/// follow-up PR once we have a mock S7 server.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class S7DiscoveryAndSubscribeTests
|
||||
{
|
||||
private sealed class RecordingAddressSpaceBuilder : IAddressSpaceBuilder
|
||||
{
|
||||
public readonly List<string> Folders = new();
|
||||
public readonly List<(string Name, DriverAttributeInfo Attr)> Variables = new();
|
||||
|
||||
public IAddressSpaceBuilder Folder(string browseName, string displayName)
|
||||
{
|
||||
Folders.Add(browseName);
|
||||
return this;
|
||||
}
|
||||
public IVariableHandle Variable(string browseName, string displayName, DriverAttributeInfo attributeInfo)
|
||||
{
|
||||
Variables.Add((browseName, attributeInfo));
|
||||
return new StubHandle();
|
||||
}
|
||||
public void AddProperty(string browseName, DriverDataType dataType, object? value) { }
|
||||
public void AttachAlarmCondition(IVariableHandle sourceVariable, string alarmName, DriverAttributeInfo alarmInfo) { }
|
||||
|
||||
private sealed class StubHandle : IVariableHandle
|
||||
{
|
||||
public string FullReference => "stub";
|
||||
public IAlarmConditionSink MarkAsAlarmCondition(AlarmConditionInfo info)
|
||||
=> throw new NotImplementedException("S7 driver never calls this — no alarm surfacing");
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DiscoverAsync_projects_every_tag_into_the_address_space()
|
||||
{
|
||||
var opts = new S7DriverOptions
|
||||
{
|
||||
Host = "192.0.2.1",
|
||||
Tags =
|
||||
[
|
||||
new("TempSetpoint", "DB1.DBW0", S7DataType.Int16, Writable: true),
|
||||
new("FaultBit", "M0.0", S7DataType.Bool, Writable: false),
|
||||
new("PIDOutput", "DB5.DBD12", S7DataType.Float32, Writable: true),
|
||||
],
|
||||
};
|
||||
using var drv = new S7Driver(opts, "s7-disco");
|
||||
|
||||
var builder = new RecordingAddressSpaceBuilder();
|
||||
await drv.DiscoverAsync(builder, TestContext.Current.CancellationToken);
|
||||
|
||||
builder.Folders.ShouldContain("S7");
|
||||
builder.Variables.Count.ShouldBe(3);
|
||||
builder.Variables[0].Name.ShouldBe("TempSetpoint");
|
||||
builder.Variables[0].Attr.SecurityClass.ShouldBe(SecurityClassification.Operate, "writable tags get Operate security class");
|
||||
builder.Variables[1].Attr.SecurityClass.ShouldBe(SecurityClassification.ViewOnly, "read-only tags get ViewOnly");
|
||||
builder.Variables[2].Attr.DriverDataType.ShouldBe(DriverDataType.Float32);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetHostStatuses_returns_one_row_with_host_port_identity_pre_init()
|
||||
{
|
||||
var opts = new S7DriverOptions { Host = "plc1.internal", Port = 102 };
|
||||
using var drv = new S7Driver(opts, "s7-host");
|
||||
|
||||
var rows = drv.GetHostStatuses();
|
||||
rows.Count.ShouldBe(1);
|
||||
rows[0].HostName.ShouldBe("plc1.internal:102");
|
||||
rows[0].State.ShouldBe(HostState.Unknown, "pre-init / pre-probe state is Unknown");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SubscribeAsync_returns_unique_handles_and_UnsubscribeAsync_accepts_them()
|
||||
{
|
||||
var opts = new S7DriverOptions { Host = "192.0.2.1" };
|
||||
using var drv = new S7Driver(opts, "s7-sub");
|
||||
|
||||
// SubscribeAsync does not itself call ReadAsync (the poll task does), so this works
|
||||
// even though the driver isn't initialized. The poll task catches the resulting
|
||||
// InvalidOperationException and the loop quietly continues — same pattern as the
|
||||
// Modbus driver's poll loop tolerating transient transport failures.
|
||||
var h1 = await drv.SubscribeAsync(["T1"], TimeSpan.FromMilliseconds(200), TestContext.Current.CancellationToken);
|
||||
var h2 = await drv.SubscribeAsync(["T2"], TimeSpan.FromMilliseconds(200), TestContext.Current.CancellationToken);
|
||||
|
||||
h1.DiagnosticId.ShouldStartWith("s7-sub-");
|
||||
h2.DiagnosticId.ShouldStartWith("s7-sub-");
|
||||
h1.DiagnosticId.ShouldNotBe(h2.DiagnosticId);
|
||||
|
||||
await drv.UnsubscribeAsync(h1, TestContext.Current.CancellationToken);
|
||||
await drv.UnsubscribeAsync(h2, TestContext.Current.CancellationToken);
|
||||
// UnsubscribeAsync with an unknown handle must be a no-op, not throw.
|
||||
await drv.UnsubscribeAsync(h1, TestContext.Current.CancellationToken);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribe_publishing_interval_is_floored_at_100ms()
|
||||
{
|
||||
var opts = new S7DriverOptions { Host = "192.0.2.1", Probe = new S7ProbeOptions { Enabled = false } };
|
||||
using var drv = new S7Driver(opts, "s7-floor");
|
||||
|
||||
// 50 ms requested — the floor protects the S7 CPU from sub-scan polling that would
|
||||
// just queue wire-side. Test that the subscription is accepted (the floor is applied
|
||||
// internally; the floor value isn't exposed, so we're really just asserting that the
|
||||
// driver doesn't reject small intervals).
|
||||
var h = await drv.SubscribeAsync(["T"], TimeSpan.FromMilliseconds(50), TestContext.Current.CancellationToken);
|
||||
h.ShouldNotBeNull();
|
||||
await drv.UnsubscribeAsync(h, TestContext.Current.CancellationToken);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user