diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Configuration/LmxProxyConfiguration.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Configuration/LmxProxyConfiguration.cs index 89ea71a..b7b7906 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Configuration/LmxProxyConfiguration.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Configuration/LmxProxyConfiguration.cs @@ -23,5 +23,24 @@ namespace ZB.MOM.WW.LmxProxy.Host.Configuration /// Windows SCM service recovery settings. public ServiceRecoveryConfiguration ServiceRecovery { get; set; } = new ServiceRecoveryConfiguration(); + + /// Health check / active probe settings. + public HealthCheckConfiguration HealthCheck { get; set; } = new HealthCheckConfiguration(); + } + + /// Health check / probe configuration. + public class HealthCheckConfiguration + { + /// Tag address to probe for connection liveness. Default: TestChildObject.TestBool. + public string TestTagAddress { get; set; } = "TestChildObject.TestBool"; + + /// Probe timeout in milliseconds. Default: 5000. + public int ProbeTimeoutMs { get; set; } = 5000; + + /// Consecutive transport failures before forced reconnect. Default: 3. + public int MaxConsecutiveTransportFailures { get; set; } = 3; + + /// Probe interval while in degraded state (ms). Default: 30000 (30s). + public int DegradedProbeIntervalMs { get; set; } = 30000; } } diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/IScadaClient.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/IScadaClient.cs index 56e21b5..bbaedbd 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/IScadaClient.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/IScadaClient.cs @@ -57,6 +57,18 @@ namespace ZB.MOM.WW.LmxProxy.Host.Domain int pollIntervalMs, CancellationToken ct = default); + /// + /// Probes connection health by reading a test tag. + /// Returns a classified result: Healthy, TransportFailure, or DataDegraded. + /// + Task ProbeConnectionAsync(string testTagAddress, int timeoutMs, CancellationToken ct = default); + + /// + /// Unsubscribes specific tag addresses. Removes from stored subscriptions + /// and COM state. Safe to call after reconnect -- uses current handle mappings. + /// + Task UnsubscribeByAddressAsync(IEnumerable addresses); + /// Subscribes to value changes for specified addresses. /// Subscription handle for unsubscribing. Task SubscribeAsync( diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/ProbeResult.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/ProbeResult.cs new file mode 100644 index 0000000..3d5da4f --- /dev/null +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Domain/ProbeResult.cs @@ -0,0 +1,39 @@ +using System; + +namespace ZB.MOM.WW.LmxProxy.Host.Domain +{ + public enum ProbeStatus + { + Healthy, + TransportFailure, + DataDegraded + } + + public sealed class ProbeResult + { + public ProbeStatus Status { get; } + public Quality? Quality { get; } + public DateTime? Timestamp { get; } + public string? Message { get; } + public Exception? Exception { get; } + + private ProbeResult(ProbeStatus status, Quality? quality, DateTime? timestamp, + string? message, Exception? exception) + { + Status = status; + Quality = quality; + Timestamp = timestamp; + Message = message; + Exception = exception; + } + + public static ProbeResult Healthy(Quality quality, DateTime timestamp) + => new ProbeResult(ProbeStatus.Healthy, quality, timestamp, null, null); + + public static ProbeResult Degraded(Quality quality, DateTime timestamp, string message) + => new ProbeResult(ProbeStatus.DataDegraded, quality, timestamp, message, null); + + public static ProbeResult TransportFailed(string message, Exception? ex = null) + => new ProbeResult(ProbeStatus.TransportFailure, null, null, message, ex); + } +} diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs index 9191571..96d09b2 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/LmxProxyService.cs @@ -66,7 +66,11 @@ namespace ZB.MOM.WW.LmxProxy.Host monitorIntervalSeconds: _config.Connection.MonitorIntervalSeconds, autoReconnect: _config.Connection.AutoReconnect, nodeName: _config.Connection.NodeName, - galaxyName: _config.Connection.GalaxyName); + galaxyName: _config.Connection.GalaxyName, + probeTestTagAddress: _config.HealthCheck.TestTagAddress, + probeTimeoutMs: _config.HealthCheck.ProbeTimeoutMs, + maxConsecutiveTransportFailures: _config.HealthCheck.MaxConsecutiveTransportFailures, + degradedProbeIntervalMs: _config.HealthCheck.DegradedProbeIntervalMs); // 5. Connect to MxAccess synchronously (with timeout) Log.Information("Connecting to MxAccess (timeout: {Timeout}s)...", @@ -101,6 +105,11 @@ namespace ZB.MOM.WW.LmxProxy.Host { _subscriptionManager.NotifyDisconnection(); } + else if (e.CurrentState == Domain.ConnectionState.Connected && + e.PreviousState == Domain.ConnectionState.Reconnecting) + { + _subscriptionManager.NotifyReconnection(); + } }; // 8. Create SessionManager @@ -111,7 +120,8 @@ namespace ZB.MOM.WW.LmxProxy.Host // 10. Create health check services _healthCheckService = new HealthCheckService(_mxAccessClient, _subscriptionManager, _performanceMetrics); - _detailedHealthCheckService = new DetailedHealthCheckService(_mxAccessClient); + _detailedHealthCheckService = new DetailedHealthCheckService( + _mxAccessClient, _config.HealthCheck.TestTagAddress); // 11. Create status report service _statusReportService = new StatusReportService( diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs index 479716a..6d5af02 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Connection.cs @@ -171,47 +171,180 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess } /// - /// Auto-reconnect monitor loop. Checks connection every monitorInterval. - /// On disconnect, attempts reconnect. On failure, retries at next interval. + /// Probes the connection by reading a test tag with a timeout. + /// Classifies the result as transport failure vs data degraded. + /// + public async Task ProbeConnectionAsync(string testTagAddress, int timeoutMs, + CancellationToken ct = default) + { + if (!IsConnected) + return ProbeResult.TransportFailed("Not connected"); + + try + { + using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct)) + { + cts.CancelAfter(timeoutMs); + + Vtq vtq; + try + { + vtq = await ReadAsync(testTagAddress, cts.Token); + } + catch (OperationCanceledException) when (!ct.IsCancellationRequested) + { + // Our timeout fired, not the caller's -- treat as transport failure + return ProbeResult.TransportFailed("Probe read timed out after " + timeoutMs + "ms"); + } + + if (vtq.Quality == Domain.Quality.Bad_NotConnected || + vtq.Quality == Domain.Quality.Bad_CommFailure) + { + return ProbeResult.TransportFailed("Probe returned " + vtq.Quality); + } + + if (!vtq.Quality.IsGood()) + { + return ProbeResult.Degraded(vtq.Quality, vtq.Timestamp, + "Probe quality: " + vtq.Quality); + } + + if (DateTime.UtcNow - vtq.Timestamp > TimeSpan.FromMinutes(5)) + { + return ProbeResult.Degraded(vtq.Quality, vtq.Timestamp, + "Probe data stale (>" + 5 + "min)"); + } + + return ProbeResult.Healthy(vtq.Quality, vtq.Timestamp); + } + } + catch (System.Runtime.InteropServices.COMException ex) + { + return ProbeResult.TransportFailed("COM exception: " + ex.Message, ex); + } + catch (InvalidOperationException ex) when (ex.Message.Contains("Not connected")) + { + return ProbeResult.TransportFailed(ex.Message, ex); + } + catch (Exception ex) + { + return ProbeResult.TransportFailed("Probe failed: " + ex.Message, ex); + } + } + + /// + /// Auto-reconnect monitor loop with active health probing. + /// - If IsConnected is false: immediate reconnect (existing behavior). + /// - If IsConnected is true and probe configured: read test tag each interval. + /// - TransportFailure for N consecutive probes -> forced disconnect + reconnect. + /// - DataDegraded -> stay connected, back off probe interval, report degraded. + /// - Healthy -> reset counters and resume normal interval. /// private async Task MonitorConnectionAsync(CancellationToken ct) { - Log.Information("Connection monitor loop started (interval={IntervalMs}ms)", _monitorIntervalMs); + Log.Information("Connection monitor loop started (interval={IntervalMs}ms, probe={ProbeEnabled})", + _monitorIntervalMs, _probeTestTagAddress != null); while (!ct.IsCancellationRequested) { + var interval = _isDegraded ? _degradedProbeIntervalMs : _monitorIntervalMs; + try { - await Task.Delay(_monitorIntervalMs, ct); + await Task.Delay(interval, ct); } catch (OperationCanceledException) { break; } - if (IsConnected) continue; - - Log.Information("MxAccess disconnected, attempting reconnect..."); - SetState(ConnectionState.Reconnecting); - - try + // -- Case 1: Already disconnected -- + if (!IsConnected) { - await ConnectAsync(ct); - Log.Information("Reconnected to MxAccess successfully"); + _isDegraded = false; + _consecutiveTransportFailures = 0; + await AttemptReconnectAsync(ct); + continue; } - catch (OperationCanceledException) + + // -- Case 2: Connected, no probe configured -- legacy behavior -- + if (_probeTestTagAddress == null) + continue; + + // -- Case 3: Connected, probe configured -- active health check -- + var probe = await ProbeConnectionAsync(_probeTestTagAddress, _probeTimeoutMs, ct); + + switch (probe.Status) { - break; - } - catch (Exception ex) - { - Log.Warning(ex, "Reconnect attempt failed, will retry in {IntervalMs}ms", _monitorIntervalMs); + case ProbeStatus.Healthy: + if (_isDegraded) + { + Log.Information("Probe healthy -- exiting degraded mode"); + _isDegraded = false; + } + _consecutiveTransportFailures = 0; + break; + + case ProbeStatus.DataDegraded: + _consecutiveTransportFailures = 0; + if (!_isDegraded) + { + Log.Warning("Probe degraded: {Message} -- entering degraded mode (probe interval {IntervalMs}ms)", + probe.Message, _degradedProbeIntervalMs); + _isDegraded = true; + } + break; + + case ProbeStatus.TransportFailure: + _isDegraded = false; + _consecutiveTransportFailures++; + Log.Warning("Probe transport failure ({Count}/{Max}): {Message}", + _consecutiveTransportFailures, _maxConsecutiveTransportFailures, probe.Message); + + if (_consecutiveTransportFailures >= _maxConsecutiveTransportFailures) + { + Log.Warning("Max consecutive transport failures reached -- forcing reconnect"); + _consecutiveTransportFailures = 0; + + try + { + await DisconnectAsync(ct); + } + catch (Exception ex) + { + Log.Warning(ex, "Error during forced disconnect before reconnect"); + // DisconnectAsync already calls CleanupComObjectsAsync on error path + } + + await AttemptReconnectAsync(ct); + } + break; } } Log.Information("Connection monitor loop exited"); } + private async Task AttemptReconnectAsync(CancellationToken ct) + { + Log.Information("Attempting reconnect..."); + SetState(ConnectionState.Reconnecting); + + try + { + await ConnectAsync(ct); + Log.Information("Reconnected to MxAccess successfully"); + } + catch (OperationCanceledException) + { + // Let the outer loop handle cancellation + } + catch (Exception ex) + { + Log.Warning(ex, "Reconnect attempt failed, will retry at next interval"); + } + } + /// /// Cleans up COM objects via Task.Run after a failed connection. /// diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs index 0bc6ead..0b9935e 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.Subscription.cs @@ -47,6 +47,15 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess return new SubscriptionHandle(this, addressList, callback); } + /// + /// Unsubscribes specific addresses by address name. + /// Removes from both COM state and stored subscriptions (no reconnect replay). + /// + public async Task UnsubscribeByAddressAsync(IEnumerable addresses) + { + await UnsubscribeAsync(addresses); + } + /// /// Unsubscribes specific addresses. /// diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs index 5fd6120..d424c1a 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/MxAccess/MxAccessClient.cs @@ -41,6 +41,16 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess // Reconnect private CancellationTokenSource? _reconnectCts; + // Probe configuration + private readonly string? _probeTestTagAddress; + private readonly int _probeTimeoutMs; + private readonly int _maxConsecutiveTransportFailures; + private readonly int _degradedProbeIntervalMs; + + // Probe state + private int _consecutiveTransportFailures; + private bool _isDegraded; + // Stored subscriptions for reconnect replay private readonly Dictionary> _storedSubscriptions = new Dictionary>(StringComparer.OrdinalIgnoreCase); @@ -63,7 +73,11 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess int monitorIntervalSeconds = 5, bool autoReconnect = true, string? nodeName = null, - string? galaxyName = null) + string? galaxyName = null, + string? probeTestTagAddress = null, + int probeTimeoutMs = 5000, + int maxConsecutiveTransportFailures = 3, + int degradedProbeIntervalMs = 30000) { _maxConcurrentOperations = maxConcurrentOperations; _readTimeoutMs = readTimeoutSeconds * 1000; @@ -72,6 +86,10 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess _autoReconnect = autoReconnect; _nodeName = nodeName; _galaxyName = galaxyName; + _probeTestTagAddress = probeTestTagAddress; + _probeTimeoutMs = probeTimeoutMs; + _maxConsecutiveTransportFailures = maxConsecutiveTransportFailures; + _degradedProbeIntervalMs = degradedProbeIntervalMs; _readSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); _writeSemaphore = new SemaphoreSlim(maxConcurrentOperations, maxConcurrentOperations); diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs index fab5f85..739538c 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/Subscriptions/SubscriptionManager.cs @@ -30,10 +30,6 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions private readonly ConcurrentDictionary _tagSubscriptions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - // Tag address -> MxAccess subscription handle (for cleanup when last client unsubscribes) - private readonly ConcurrentDictionary _mxAccessHandles - = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); public SubscriptionManager(IScadaClient scadaClient, int channelCapacity = 1000, @@ -105,15 +101,9 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions { try { - var handle = await _scadaClient.SubscribeAsync( + await _scadaClient.SubscribeAsync( addresses, (address, vtq) => OnTagValueChanged(address, vtq)); - - // Store handle for each address so we can dispose per-tag - foreach (var address in addresses) - { - _mxAccessHandles[address] = handle; - } } catch (Exception ex) { @@ -195,19 +185,16 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions _rwLock.ExitWriteLock(); } - // Dispose MxAccess handles for tags with no remaining clients - foreach (var address in tagsToDispose) + // Unsubscribe tags with no remaining clients via address-based API + if (tagsToDispose.Count > 0) { - if (_mxAccessHandles.TryRemove(address, out var handle)) + try { - try - { - handle.DisposeAsync().AsTask().GetAwaiter().GetResult(); - } - catch (Exception ex) - { - Log.Warning(ex, "Error disposing MxAccess subscription for {Address}", address); - } + _scadaClient.UnsubscribeByAddressAsync(tagsToDispose).GetAwaiter().GetResult(); + } + catch (Exception ex) + { + Log.Warning(ex, "Error unsubscribing {Count} tags from MxAccess", tagsToDispose.Count); } } @@ -235,6 +222,18 @@ namespace ZB.MOM.WW.LmxProxy.Host.Subscriptions } } + /// + /// Logs reconnection for observability. Data flow resumes automatically + /// via MxAccessClient.RecreateStoredSubscriptionsAsync callbacks. + /// + public void NotifyReconnection() + { + Log.Information("MxAccess reconnected -- subscriptions recreated, " + + "data flow will resume via OnDataChange callbacks " + + "({ClientCount} clients, {TagCount} tags)", + _clientSubscriptions.Count, _tagSubscriptions.Count); + } + /// Returns subscription statistics. public SubscriptionStats GetStats() { diff --git a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/appsettings.json b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/appsettings.json index 94fe869..1202291 100644 --- a/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/appsettings.json +++ b/lmxproxy/src/ZB.MOM.WW.LmxProxy.Host/appsettings.json @@ -32,6 +32,13 @@ "Port": 8080 }, + "HealthCheck": { + "TestTagAddress": "TestChildObject.TestBool", + "ProbeTimeoutMs": 5000, + "MaxConsecutiveTransportFailures": 3, + "DegradedProbeIntervalMs": 30000 + }, + "ServiceRecovery": { "FirstFailureDelayMinutes": 1, "SecondFailureDelayMinutes": 5, diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs index 8482422..a735907 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Health/HealthCheckServiceTests.cs @@ -32,6 +32,9 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Health IReadOnlyDictionary values, string flagTag, object flagValue, int timeoutMs, int pollIntervalMs, CancellationToken ct = default) => Task.FromResult((false, 0)); + public Task ProbeConnectionAsync(string testTagAddress, int timeoutMs, CancellationToken ct = default) => + Task.FromResult(ProbeResult.Healthy(Quality.Good, DateTime.UtcNow)); + public Task UnsubscribeByAddressAsync(IEnumerable addresses) => Task.CompletedTask; public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) => Task.FromResult(new FakeHandle()); public ValueTask DisposeAsync() => default; @@ -155,6 +158,9 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Health IReadOnlyDictionary values, string flagTag, object flagValue, int timeoutMs, int pollIntervalMs, CancellationToken ct = default) => Task.FromResult((false, 0)); + public Task ProbeConnectionAsync(string testTagAddress, int timeoutMs, CancellationToken ct = default) => + Task.FromResult(ProbeResult.Healthy(Quality.Good, DateTime.UtcNow)); + public Task UnsubscribeByAddressAsync(IEnumerable addresses) => Task.CompletedTask; public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) => Task.FromResult(new FakeHandle()); public ValueTask DisposeAsync() => default; diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Status/StatusReportServiceTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Status/StatusReportServiceTests.cs index 9df6708..08e883b 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Status/StatusReportServiceTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Status/StatusReportServiceTests.cs @@ -33,6 +33,9 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Status IReadOnlyDictionary values, string flagTag, object flagValue, int timeoutMs, int pollIntervalMs, CancellationToken ct = default) => Task.FromResult((false, 0)); + public Task ProbeConnectionAsync(string testTagAddress, int timeoutMs, CancellationToken ct = default) => + Task.FromResult(ProbeResult.Healthy(Quality.Good, DateTime.UtcNow)); + public Task UnsubscribeByAddressAsync(IEnumerable addresses) => Task.CompletedTask; public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) => Task.FromResult(new FakeHandle()); public ValueTask DisposeAsync() => default; diff --git a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs index 58cde93..b12af80 100644 --- a/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs +++ b/lmxproxy/tests/ZB.MOM.WW.LmxProxy.Host.Tests/Subscriptions/SubscriptionManagerTests.cs @@ -30,6 +30,9 @@ namespace ZB.MOM.WW.LmxProxy.Host.Tests.Subscriptions IReadOnlyDictionary values, string flagTag, object flagValue, int timeoutMs, int pollIntervalMs, CancellationToken ct = default) => Task.FromResult((false, 0)); + public Task ProbeConnectionAsync(string testTagAddress, int timeoutMs, CancellationToken ct = default) => + Task.FromResult(ProbeResult.Healthy(Quality.Good, DateTime.UtcNow)); + public Task UnsubscribeByAddressAsync(IEnumerable addresses) => Task.CompletedTask; public Task SubscribeAsync(IEnumerable addresses, Action callback, CancellationToken ct = default) => Task.FromResult(new FakeSubscriptionHandle()); public ValueTask DisposeAsync() => default;