Close all four stability-review 2026-04-13 findings: (1) a failed runtime probe subscription can no longer leave a phantom entry that Tick() flips to Stopped, fanning out false BadOutOfService quality across a host's subtree; (2) a silently failed dashboard bind no longer lets the service advertise a successful start while an operator-visible endpoint is dead; (3) the seven sync-over-async sites in LmxNodeManager (rebuild probe sync, Read, Write, and the four HistoryRead overrides) can no longer park the OPC UA stack thread indefinitely on a hung backend; and (4) alarm auto-subscribe and transferred-subscription restore no longer race shutdown as untracked fire-and-forget tasks.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -11,6 +11,7 @@ using ZB.MOM.WW.LmxOpcUa.Host.Domain;
|
||||
using ZB.MOM.WW.LmxOpcUa.Host.Historian;
|
||||
using ZB.MOM.WW.LmxOpcUa.Host.Metrics;
|
||||
using ZB.MOM.WW.LmxOpcUa.Host.MxAccess;
|
||||
using ZB.MOM.WW.LmxOpcUa.Host.Utilities;
|
||||
|
||||
namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
{
|
||||
@@ -107,6 +108,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
private readonly NodeId? _writeConfigureRoleId;
|
||||
private readonly NodeId? _writeOperateRoleId;
|
||||
private readonly NodeId? _writeTuneRoleId;
|
||||
private readonly TimeSpan _mxAccessRequestTimeout;
|
||||
private readonly TimeSpan _historianRequestTimeout;
|
||||
private long _dispatchCycleCount;
|
||||
private long _suppressedUpdatesCount;
|
||||
private volatile bool _dispatchDisposed;
|
||||
@@ -128,6 +131,13 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
private long _alarmAckEventCount;
|
||||
private long _alarmAckWriteFailures;
|
||||
|
||||
// Background subscribe tracking: every fire-and-forget SubscribeAsync for alarm auto-subscribe
|
||||
// and transferred-subscription restore is registered here so shutdown can drain pending work
|
||||
// with a bounded timeout, and so tests can observe pending count without races.
|
||||
private readonly ConcurrentDictionary<long, Task> _pendingBackgroundSubscribes =
|
||||
new ConcurrentDictionary<long, Task>();
|
||||
private long _backgroundSubscribeCounter;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new node manager for the Galaxy-backed OPC UA namespace.
|
||||
/// </summary>
|
||||
@@ -156,7 +166,9 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
NodeId? alarmAckRoleId = null,
|
||||
AlarmObjectFilter? alarmObjectFilter = null,
|
||||
bool runtimeStatusProbesEnabled = false,
|
||||
int runtimeStatusUnknownTimeoutSeconds = 15)
|
||||
int runtimeStatusUnknownTimeoutSeconds = 15,
|
||||
int mxAccessRequestTimeoutSeconds = 30,
|
||||
int historianRequestTimeoutSeconds = 60)
|
||||
: base(server, configuration, namespaceUri)
|
||||
{
|
||||
_namespaceUri = namespaceUri;
|
||||
@@ -170,6 +182,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
_writeTuneRoleId = writeTuneRoleId;
|
||||
_writeConfigureRoleId = writeConfigureRoleId;
|
||||
_alarmAckRoleId = alarmAckRoleId;
|
||||
_mxAccessRequestTimeout = TimeSpan.FromSeconds(Math.Max(1, mxAccessRequestTimeoutSeconds));
|
||||
_historianRequestTimeout = TimeSpan.FromSeconds(Math.Max(1, historianRequestTimeoutSeconds));
|
||||
|
||||
if (runtimeStatusProbesEnabled)
|
||||
{
|
||||
@@ -569,7 +583,24 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
// Sync the galaxy runtime probe set against the rebuilt hierarchy. This runs
|
||||
// synchronously on the calling thread and issues AdviseSupervisory per host —
|
||||
// expected 500ms-1s additional startup latency for a large multi-host galaxy.
|
||||
_galaxyRuntimeProbeManager?.SyncAsync(hierarchy).GetAwaiter().GetResult();
|
||||
// Bounded by _mxAccessRequestTimeout so a hung probe sync cannot park the address
|
||||
// space rebuild indefinitely; on timeout we log a warning and continue with the
|
||||
// partial probe set (probe sync is advisory, not required for address space correctness).
|
||||
if (_galaxyRuntimeProbeManager != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
SyncOverAsync.WaitSync(
|
||||
_galaxyRuntimeProbeManager.SyncAsync(hierarchy),
|
||||
_mxAccessRequestTimeout,
|
||||
"GalaxyRuntimeProbeManager.SyncAsync");
|
||||
}
|
||||
catch (TimeoutException ex)
|
||||
{
|
||||
Log.Warning(ex, "Runtime probe sync exceeded {Timeout}s; continuing with partial probe set",
|
||||
_mxAccessRequestTimeout.TotalSeconds);
|
||||
}
|
||||
}
|
||||
|
||||
_lastHierarchy = new List<GalaxyObjectInfo>(hierarchy);
|
||||
_lastAttributes = new List<GalaxyAttributeInfo>(attributes);
|
||||
@@ -854,15 +885,40 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
{
|
||||
if (string.IsNullOrEmpty(tag) || !_tagToVariableNode.ContainsKey(tag))
|
||||
continue;
|
||||
var alarmTag = tag;
|
||||
_mxAccessClient.SubscribeAsync(alarmTag, (_, _) => { })
|
||||
.ContinueWith(t => Log.Warning(t.Exception?.InnerException,
|
||||
"Failed to auto-subscribe to alarm tag {Tag}", alarmTag),
|
||||
TaskContinuationOptions.OnlyOnFaulted);
|
||||
TrackBackgroundSubscribe(tag, "alarm auto-subscribe");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Issues a fire-and-forget <c>SubscribeAsync</c> for <paramref name="tag"/> and registers
/// the resulting task so shutdown can drain pending work with a bounded timeout. The
/// continuation both removes the completed entry and logs faults with the supplied
/// <paramref name="context"/>. Never throws: a synchronous failure from the client is
/// logged and swallowed so one bad tag cannot abort the caller's subscribe loop.
/// </summary>
/// <param name="tag">The MxAccess tag reference to subscribe to.</param>
/// <param name="context">Caller context embedded in failure log messages.</param>
private void TrackBackgroundSubscribe(string tag, string context)
{
    // After dispose no new background work may be started; callers treat this as best-effort.
    if (_dispatchDisposed)
        return;

    var id = Interlocked.Increment(ref _backgroundSubscribeCounter);

    Task task;
    try
    {
        task = _mxAccessClient.SubscribeAsync(tag, (_, _) => { });
    }
    catch (Exception ex)
    {
        // A synchronous throw from SubscribeAsync must not escape to the caller (typically
        // an address-space rebuild or restore loop) — that would abort subscription of
        // every remaining tag, which is exactly the failure mode this tracking fixes.
        Log.Warning(ex, "Background subscribe failed synchronously ({Context}) for {Tag}",
            context, tag);
        return;
    }

    // Register before attaching the continuation: if the task is already complete the
    // continuation runs immediately and must find (and remove) its entry.
    _pendingBackgroundSubscribes[id] = task;
    task.ContinueWith(t =>
    {
        _pendingBackgroundSubscribes.TryRemove(id, out _);
        // Touching t.Exception here also marks the fault observed, preventing an
        // UnobservedTaskException escalation later.
        if (t.IsFaulted)
            Log.Warning(t.Exception?.InnerException, "Background subscribe failed ({Context}) for {Tag}",
                context, tag);
    }, TaskContinuationOptions.ExecuteSynchronously);
}
|
||||
|
||||
/// <summary>
/// Number of fire-and-forget subscribe tasks still in flight. Consumed by tests and by
/// the status dashboard's subscription panel.
/// </summary>
internal int PendingBackgroundSubscribeCount
{
    get { return _pendingBackgroundSubscribes.Count; }
}
|
||||
|
||||
private ServiceResult OnAlarmAcknowledge(
|
||||
ISystemContext context, ConditionState condition, byte[] eventId, LocalizedText comment)
|
||||
{
|
||||
@@ -1358,11 +1414,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
{
|
||||
if (string.IsNullOrEmpty(tag) || !_tagToVariableNode.ContainsKey(tag))
|
||||
continue;
|
||||
var subtreeAlarmTag = tag;
|
||||
_mxAccessClient.SubscribeAsync(subtreeAlarmTag, (_, _) => { })
|
||||
.ContinueWith(t => Log.Warning(t.Exception?.InnerException,
|
||||
"Failed to subscribe alarm tag in subtree {Tag}", subtreeAlarmTag),
|
||||
TaskContinuationOptions.OnlyOnFaulted);
|
||||
TrackBackgroundSubscribe(tag, "subtree alarm auto-subscribe");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1705,10 +1757,18 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
|
||||
try
|
||||
{
|
||||
var vtq = _mxAccessClient.ReadAsync(tagRef).GetAwaiter().GetResult();
|
||||
var vtq = SyncOverAsync.WaitSync(
|
||||
_mxAccessClient.ReadAsync(tagRef),
|
||||
_mxAccessRequestTimeout,
|
||||
"MxAccessClient.ReadAsync");
|
||||
results[i] = CreatePublishedDataValue(tagRef, vtq);
|
||||
errors[i] = ServiceResult.Good;
|
||||
}
|
||||
catch (TimeoutException ex)
|
||||
{
|
||||
Log.Warning(ex, "Read timed out for {TagRef}", tagRef);
|
||||
errors[i] = new ServiceResult(StatusCodes.BadTimeout);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Read failed for {TagRef}", tagRef);
|
||||
@@ -1779,7 +1839,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
value = updatedArray;
|
||||
}
|
||||
|
||||
var success = _mxAccessClient.WriteAsync(tagRef, value).GetAwaiter().GetResult();
|
||||
var success = SyncOverAsync.WaitSync(
|
||||
_mxAccessClient.WriteAsync(tagRef, value),
|
||||
_mxAccessRequestTimeout,
|
||||
"MxAccessClient.WriteAsync");
|
||||
if (success)
|
||||
{
|
||||
PublishLocalWrite(tagRef, value);
|
||||
@@ -1790,6 +1853,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
errors[i] = new ServiceResult(StatusCodes.BadInternalError);
|
||||
}
|
||||
}
|
||||
catch (TimeoutException ex)
|
||||
{
|
||||
Log.Warning(ex, "Write timed out for {TagRef}", tagRef);
|
||||
errors[i] = new ServiceResult(StatusCodes.BadTimeout);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Write failed for {TagRef}", tagRef);
|
||||
@@ -2017,15 +2085,23 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
try
|
||||
{
|
||||
var maxValues = details.NumValuesPerNode > 0 ? (int)details.NumValuesPerNode : 0;
|
||||
var dataValues = _historianDataSource.ReadRawAsync(
|
||||
tagRef, details.StartTime, details.EndTime, maxValues)
|
||||
.GetAwaiter().GetResult();
|
||||
var dataValues = SyncOverAsync.WaitSync(
|
||||
_historianDataSource.ReadRawAsync(
|
||||
tagRef, details.StartTime, details.EndTime, maxValues),
|
||||
_historianRequestTimeout,
|
||||
"HistorianDataSource.ReadRawAsync");
|
||||
|
||||
if (details.ReturnBounds)
|
||||
AddBoundingValues(dataValues, details.StartTime, details.EndTime);
|
||||
|
||||
ReturnHistoryPage(dataValues, details.NumValuesPerNode, results, errors, idx);
|
||||
}
|
||||
catch (TimeoutException ex)
|
||||
{
|
||||
historyScope.SetSuccess(false);
|
||||
Log.Warning(ex, "HistoryRead raw timed out for {TagRef}", tagRef);
|
||||
errors[idx] = new ServiceResult(StatusCodes.BadTimeout);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
historyScope.SetSuccess(false);
|
||||
@@ -2094,13 +2170,21 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
using var historyScope = _metrics.BeginOperation("HistoryReadProcessed");
|
||||
try
|
||||
{
|
||||
var dataValues = _historianDataSource.ReadAggregateAsync(
|
||||
var dataValues = SyncOverAsync.WaitSync(
|
||||
_historianDataSource.ReadAggregateAsync(
|
||||
tagRef, details.StartTime, details.EndTime,
|
||||
details.ProcessingInterval, column)
|
||||
.GetAwaiter().GetResult();
|
||||
details.ProcessingInterval, column),
|
||||
_historianRequestTimeout,
|
||||
"HistorianDataSource.ReadAggregateAsync");
|
||||
|
||||
ReturnHistoryPage(dataValues, 0, results, errors, idx);
|
||||
}
|
||||
catch (TimeoutException ex)
|
||||
{
|
||||
historyScope.SetSuccess(false);
|
||||
Log.Warning(ex, "HistoryRead processed timed out for {TagRef}", tagRef);
|
||||
errors[idx] = new ServiceResult(StatusCodes.BadTimeout);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
historyScope.SetSuccess(false);
|
||||
@@ -2150,8 +2234,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
for (var i = 0; i < details.ReqTimes.Count; i++)
|
||||
timestamps[i] = details.ReqTimes[i];
|
||||
|
||||
var dataValues = _historianDataSource.ReadAtTimeAsync(tagRef, timestamps)
|
||||
.GetAwaiter().GetResult();
|
||||
var dataValues = SyncOverAsync.WaitSync(
|
||||
_historianDataSource.ReadAtTimeAsync(tagRef, timestamps),
|
||||
_historianRequestTimeout,
|
||||
"HistorianDataSource.ReadAtTimeAsync");
|
||||
|
||||
var historyData = new HistoryData();
|
||||
historyData.DataValues.AddRange(dataValues);
|
||||
@@ -2163,6 +2249,12 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
};
|
||||
errors[idx] = ServiceResult.Good;
|
||||
}
|
||||
catch (TimeoutException ex)
|
||||
{
|
||||
historyScope.SetSuccess(false);
|
||||
Log.Warning(ex, "HistoryRead at-time timed out for {TagRef}", tagRef);
|
||||
errors[idx] = new ServiceResult(StatusCodes.BadTimeout);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
historyScope.SetSuccess(false);
|
||||
@@ -2215,9 +2307,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
try
|
||||
{
|
||||
var maxEvents = details.NumValuesPerNode > 0 ? (int)details.NumValuesPerNode : 0;
|
||||
var events = _historianDataSource.ReadEventsAsync(
|
||||
sourceName, details.StartTime, details.EndTime, maxEvents)
|
||||
.GetAwaiter().GetResult();
|
||||
var events = SyncOverAsync.WaitSync(
|
||||
_historianDataSource.ReadEventsAsync(
|
||||
sourceName, details.StartTime, details.EndTime, maxEvents),
|
||||
_historianRequestTimeout,
|
||||
"HistorianDataSource.ReadEventsAsync");
|
||||
|
||||
var historyEvent = new HistoryEvent();
|
||||
foreach (var evt in events)
|
||||
@@ -2247,6 +2341,12 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
};
|
||||
errors[idx] = ServiceResult.Good;
|
||||
}
|
||||
catch (TimeoutException ex)
|
||||
{
|
||||
historyScope.SetSuccess(false);
|
||||
Log.Warning(ex, "HistoryRead events timed out for {NodeId}", nodeIdStr);
|
||||
errors[idx] = new ServiceResult(StatusCodes.BadTimeout);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
historyScope.SetSuccess(false);
|
||||
@@ -2476,13 +2576,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
}
|
||||
|
||||
foreach (var tagRef in tagsToSubscribe)
|
||||
{
|
||||
var transferTag = tagRef;
|
||||
_mxAccessClient.SubscribeAsync(transferTag, (_, _) => { })
|
||||
.ContinueWith(t => Log.Warning(t.Exception?.InnerException,
|
||||
"Failed to restore subscription for transferred tag {Tag}", transferTag),
|
||||
TaskContinuationOptions.OnlyOnFaulted);
|
||||
}
|
||||
TrackBackgroundSubscribe(tagRef, "transferred subscription restore");
|
||||
}
|
||||
|
||||
private void OnMxAccessDataChange(string address, Vtq vtq)
|
||||
@@ -2798,12 +2892,33 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
// client, so the probes close cleanly.
|
||||
_galaxyRuntimeProbeManager?.Dispose();
|
||||
StopDispatchThread();
|
||||
DrainPendingBackgroundSubscribes();
|
||||
_dataChangeSignal.Dispose();
|
||||
}
|
||||
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
/// <summary>
/// Blocks shutdown for a bounded interval while outstanding fire-and-forget subscribe
/// tasks complete. Individual faults are logged by each task's tracked continuation;
/// this method reports only the aggregate outcome (drained, timed out, or faulted).
/// </summary>
private void DrainPendingBackgroundSubscribes()
{
    var snapshot = _pendingBackgroundSubscribes.Values.ToArray();
    if (snapshot.Length == 0)
        return;

    var drainTimeout = TimeSpan.FromSeconds(5);
    try
    {
        // Task.WaitAll(Task[], TimeSpan) returns false when the timeout elapses first;
        // report that distinctly instead of claiming a clean drain.
        if (Task.WaitAll(snapshot, drainTimeout))
        {
            Log.Information("Drained {Count} pending background subscribe(s) on shutdown",
                snapshot.Length);
        }
        else
        {
            Log.Warning("Background subscribe drain timed out after {Timeout}s with {Count} task(s) still pending",
                drainTimeout.TotalSeconds, snapshot.Length);
        }
    }
    catch (AggregateException ex)
    {
        // Individual faults were already logged by the tracked continuation; record the
        // aggregate at debug level to aid diagnosis without double-logging each failure.
        Log.Debug(ex, "Background subscribe drain completed with {FaultCount} fault(s)",
            ex.InnerExceptions.Count);
    }
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user