diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs index b7e6be2..708ad33 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs @@ -26,8 +26,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.S7; /// /// public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) - : IDriver, IReadable, IWritable, IDisposable, IAsyncDisposable + : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable { + // ---- ISubscribable + IHostConnectivityProbe state ---- + + private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptions = new(); + private long _nextSubscriptionId; + private readonly object _probeLock = new(); + private HostState _hostState = HostState.Unknown; + private DateTime _hostStateChangedUtc = DateTime.UtcNow; + private CancellationTokenSource? _probeCts; + + public event EventHandler? OnDataChange; + public event EventHandler? OnHostStatusChanged; + /// OPC UA StatusCode used when the tag name isn't in the driver's tag map. private const uint StatusBadNodeIdUnknown = 0x80340000u; /// OPC UA StatusCode used when the tag's data type isn't implemented yet. @@ -98,6 +110,15 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) } _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null); + + // Kick off the probe loop once the connection is up. Initial HostState stays + // Unknown until the first probe tick succeeds — avoids broadcasting a premature + // Running transition before any PDU round-trip has happened. + if (_options.Probe.Enabled) + { + _probeCts = new CancellationTokenSource(); + _ = Task.Run(() => ProbeLoopAsync(_probeCts.Token), _probeCts.Token); + } } catch (Exception ex) { @@ -118,6 +139,17 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) public Task ShutdownAsync(CancellationToken cancellationToken) { + try { _probeCts?.Cancel(); } catch { } + _probeCts?.Dispose(); + _probeCts = null; + + foreach (var state in _subscriptions.Values) + { + try { state.Cts.Cancel(); } catch { } + state.Cts.Dispose(); + } + _subscriptions.Clear(); + try { Plc?.Close(); } catch { /* best-effort — tearing down anyway */ } Plc = null; _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null); @@ -294,6 +326,180 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) private global::S7.Net.Plc RequirePlc() => Plc ?? throw new InvalidOperationException("S7Driver not initialized"); + // ---- ITagDiscovery ---- + + public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(builder); + var folder = builder.Folder("S7", "S7"); + foreach (var t in _options.Tags) + { + folder.Variable(t.Name, t.Name, new DriverAttributeInfo( + FullName: t.Name, + DriverDataType: MapDataType(t.DataType), + IsArray: false, + ArrayDim: null, + SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly, + IsHistorized: false, + IsAlarm: false)); + } + return Task.CompletedTask; + } + + private static DriverDataType MapDataType(S7DataType t) => t switch + { + S7DataType.Bool => DriverDataType.Boolean, + S7DataType.Byte => DriverDataType.Int32, // no 8-bit in DriverDataType yet + S7DataType.Int16 or S7DataType.UInt16 or S7DataType.Int32 or S7DataType.UInt32 => DriverDataType.Int32, + S7DataType.Int64 or S7DataType.UInt64 => DriverDataType.Int32, // widens; lossy for >2^31-1 + S7DataType.Float32 => DriverDataType.Float32, + S7DataType.Float64 => DriverDataType.Float64, + S7DataType.String => DriverDataType.String, + S7DataType.DateTime => DriverDataType.DateTime, + _ => DriverDataType.Int32, + }; + + // ---- ISubscribable (polling overlay) ---- + + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) + { + var id = Interlocked.Increment(ref _nextSubscriptionId); + var cts = new CancellationTokenSource(); + // Floor at 100 ms — S7 CPUs scan 2-10 ms but the comms mailbox is processed at most + // once per scan; sub-100 ms polling just queues wire-side with worse latency. + var interval = publishingInterval < TimeSpan.FromMilliseconds(100) + ? TimeSpan.FromMilliseconds(100) + : publishingInterval; + var handle = new S7SubscriptionHandle(id); + var state = new SubscriptionState(handle, [.. fullReferences], interval, cts); + _subscriptions[id] = state; + _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token); + return Task.FromResult(handle); + } + + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + if (handle is S7SubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state)) + { + state.Cts.Cancel(); + state.Cts.Dispose(); + } + return Task.CompletedTask; + } + + private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct) + { + // Initial-data push per OPC UA Part 4 convention. + try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + catch { /* first-read error — polling continues */ } + + while (!ct.IsCancellationRequested) + { + try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + + try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + catch { /* transient polling error — loop continues, health surface reflects it */ } + } + } + + private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct) + { + var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false); + for (var i = 0; i < state.TagReferences.Count; i++) + { + var tagRef = state.TagReferences[i]; + var current = snapshots[i]; + var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default; + + if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode) + { + state.LastValues[tagRef] = current; + OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current)); + } + } + } + + private sealed record SubscriptionState( + S7SubscriptionHandle Handle, + IReadOnlyList TagReferences, + TimeSpan Interval, + CancellationTokenSource Cts) + { + public System.Collections.Concurrent.ConcurrentDictionary LastValues { get; } + = new(StringComparer.OrdinalIgnoreCase); + } + + private sealed record S7SubscriptionHandle(long Id) : ISubscriptionHandle + { + public string DiagnosticId => $"s7-sub-{Id}"; + } + + // ---- IHostConnectivityProbe ---- + + /// + /// Host identifier surfaced in . host:port format + /// matches the Modbus driver's convention so the Admin UI dashboard renders both + /// family's rows uniformly. + /// + public string HostName => $"{_options.Host}:{_options.Port}"; + + public IReadOnlyList GetHostStatuses() + { + lock (_probeLock) + return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)]; + } + + private async Task ProbeLoopAsync(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + var success = false; + try + { + // Probe via S7.Net's low-cost GetCpuStatus — returns the CPU state (Run/Stop) + // and is intentionally light on the comms mailbox. Single-word Plc.ReadAsync + // would also work but GetCpuStatus doubles as a "PLC actually up" check. + using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + probeCts.CancelAfter(_options.Probe.Timeout); + + var plc = Plc; + if (plc is null) throw new InvalidOperationException("Plc dropped during probe"); + + await _gate.WaitAsync(probeCts.Token).ConfigureAwait(false); + try + { + _ = await plc.ReadStatusAsync(probeCts.Token).ConfigureAwait(false); + success = true; + } + finally { _gate.Release(); } + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) { return; } + catch { /* transport/timeout/exception — treated as Stopped below */ } + + TransitionTo(success ? HostState.Running : HostState.Stopped); + + try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + } + } + + private void TransitionTo(HostState newState) + { + HostState old; + lock (_probeLock) + { + old = _hostState; + if (old == newState) return; + _hostState = newState; + _hostStateChangedUtc = DateTime.UtcNow; + } + OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState)); + } + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs new file mode 100644 index 0000000..339369b --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs @@ -0,0 +1,117 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.S7.Tests; + +/// +/// Shape tests for 's , +/// , and surfaces that +/// don't need a live PLC. Wire-level polling round-trips and probe transitions land in a +/// follow-up PR once we have a mock S7 server. +/// +[Trait("Category", "Unit")] +public sealed class S7DiscoveryAndSubscribeTests +{ + private sealed class RecordingAddressSpaceBuilder : IAddressSpaceBuilder + { + public readonly List Folders = new(); + public readonly List<(string Name, DriverAttributeInfo Attr)> Variables = new(); + + public IAddressSpaceBuilder Folder(string browseName, string displayName) + { + Folders.Add(browseName); + return this; + } + public IVariableHandle Variable(string browseName, string displayName, DriverAttributeInfo attributeInfo) + { + Variables.Add((browseName, attributeInfo)); + return new StubHandle(); + } + public void AddProperty(string browseName, DriverDataType dataType, object? value) { } + public void AttachAlarmCondition(IVariableHandle sourceVariable, string alarmName, DriverAttributeInfo alarmInfo) { } + + private sealed class StubHandle : IVariableHandle + { + public string FullReference => "stub"; + public IAlarmConditionSink MarkAsAlarmCondition(AlarmConditionInfo info) + => throw new NotImplementedException("S7 driver never calls this — no alarm surfacing"); + } + } + + [Fact] + public async Task DiscoverAsync_projects_every_tag_into_the_address_space() + { + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Tags = + [ + new("TempSetpoint", "DB1.DBW0", S7DataType.Int16, Writable: true), + new("FaultBit", "M0.0", S7DataType.Bool, Writable: false), + new("PIDOutput", "DB5.DBD12", S7DataType.Float32, Writable: true), + ], + }; + using var drv = new S7Driver(opts, "s7-disco"); + + var builder = new RecordingAddressSpaceBuilder(); + await drv.DiscoverAsync(builder, TestContext.Current.CancellationToken); + + builder.Folders.ShouldContain("S7"); + builder.Variables.Count.ShouldBe(3); + builder.Variables[0].Name.ShouldBe("TempSetpoint"); + builder.Variables[0].Attr.SecurityClass.ShouldBe(SecurityClassification.Operate, "writable tags get Operate security class"); + builder.Variables[1].Attr.SecurityClass.ShouldBe(SecurityClassification.ViewOnly, "read-only tags get ViewOnly"); + builder.Variables[2].Attr.DriverDataType.ShouldBe(DriverDataType.Float32); + } + + [Fact] + public void GetHostStatuses_returns_one_row_with_host_port_identity_pre_init() + { + var opts = new S7DriverOptions { Host = "plc1.internal", Port = 102 }; + using var drv = new S7Driver(opts, "s7-host"); + + var rows = drv.GetHostStatuses(); + rows.Count.ShouldBe(1); + rows[0].HostName.ShouldBe("plc1.internal:102"); + rows[0].State.ShouldBe(HostState.Unknown, "pre-init / pre-probe state is Unknown"); + } + + [Fact] + public async Task SubscribeAsync_returns_unique_handles_and_UnsubscribeAsync_accepts_them() + { + var opts = new S7DriverOptions { Host = "192.0.2.1" }; + using var drv = new S7Driver(opts, "s7-sub"); + + // SubscribeAsync does not itself call ReadAsync (the poll task does), so this works + // even though the driver isn't initialized. The poll task catches the resulting + // InvalidOperationException and the loop quietly continues — same pattern as the + // Modbus driver's poll loop tolerating transient transport failures. + var h1 = await drv.SubscribeAsync(["T1"], TimeSpan.FromMilliseconds(200), TestContext.Current.CancellationToken); + var h2 = await drv.SubscribeAsync(["T2"], TimeSpan.FromMilliseconds(200), TestContext.Current.CancellationToken); + + h1.DiagnosticId.ShouldStartWith("s7-sub-"); + h2.DiagnosticId.ShouldStartWith("s7-sub-"); + h1.DiagnosticId.ShouldNotBe(h2.DiagnosticId); + + await drv.UnsubscribeAsync(h1, TestContext.Current.CancellationToken); + await drv.UnsubscribeAsync(h2, TestContext.Current.CancellationToken); + // UnsubscribeAsync with an unknown handle must be a no-op, not throw. + await drv.UnsubscribeAsync(h1, TestContext.Current.CancellationToken); + } + + [Fact] + public async Task Subscribe_publishing_interval_is_floored_at_100ms() + { + var opts = new S7DriverOptions { Host = "192.0.2.1", Probe = new S7ProbeOptions { Enabled = false } }; + using var drv = new S7Driver(opts, "s7-floor"); + + // 50 ms requested — the floor protects the S7 CPU from sub-scan polling that would + // just queue wire-side. Test that the subscription is accepted (the floor is applied + // internally; the floor value isn't exposed, so we're really just asserting that the + // driver doesn't reject small intervals). + var h = await drv.SubscribeAsync(["T"], TimeSpan.FromMilliseconds(50), TestContext.Current.CancellationToken); + h.ShouldNotBeNull(); + await drv.UnsubscribeAsync(h, TestContext.Current.CancellationToken); + } +}