Add alarm acknowledge plan and incorporate code review fixes
Adds alarm_ack.md documenting the two-way acknowledge flow (OPC UA client writes AckMsg, Galaxy confirms via Acked data change). Includes external code review fixes for subscriptions and node manager, and removes stale plan files now superseded by component documentation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -34,7 +34,6 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
private readonly Dictionary<string, BaseDataVariableState> _tagToVariableNode = new Dictionary<string, BaseDataVariableState>(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly Dictionary<string, TagMetadata> _tagMetadata = new Dictionary<string, TagMetadata>(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
private readonly object _lock = new object();
|
||||
private IDictionary<NodeId, IList<IReference>>? _externalReferences;
|
||||
|
||||
// Data change dispatch queue: decouples MXAccess STA callbacks from OPC UA framework Lock
|
||||
@@ -42,6 +41,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
private readonly AutoResetEvent _dataChangeSignal = new AutoResetEvent(false);
|
||||
private Thread? _dispatchThread;
|
||||
private volatile bool _dispatchRunning;
|
||||
private volatile bool _dispatchDisposed;
|
||||
|
||||
// Dispatch queue metrics
|
||||
private long _totalMxChangeEvents;
|
||||
@@ -1397,7 +1397,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to subscribe.</param>
|
||||
internal void SubscribeTag(string fullTagReference)
|
||||
{
|
||||
lock (_lock)
|
||||
var shouldSubscribe = false;
|
||||
lock (Lock)
|
||||
{
|
||||
if (_subscriptionRefCounts.TryGetValue(fullTagReference, out var count))
|
||||
{
|
||||
@@ -1406,9 +1407,12 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
else
|
||||
{
|
||||
_subscriptionRefCounts[fullTagReference] = 1;
|
||||
_ = _mxAccessClient.SubscribeAsync(fullTagReference, (_, _) => { });
|
||||
shouldSubscribe = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (shouldSubscribe)
|
||||
_ = _mxAccessClient.SubscribeAsync(fullTagReference, (_, _) => { });
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -1417,14 +1421,15 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to unsubscribe.</param>
|
||||
internal void UnsubscribeTag(string fullTagReference)
|
||||
{
|
||||
lock (_lock)
|
||||
var shouldUnsubscribe = false;
|
||||
lock (Lock)
|
||||
{
|
||||
if (_subscriptionRefCounts.TryGetValue(fullTagReference, out var count))
|
||||
{
|
||||
if (count <= 1)
|
||||
{
|
||||
_subscriptionRefCounts.Remove(fullTagReference);
|
||||
_ = _mxAccessClient.UnsubscribeAsync(fullTagReference);
|
||||
shouldUnsubscribe = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1432,6 +1437,9 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (shouldUnsubscribe)
|
||||
_ = _mxAccessClient.UnsubscribeAsync(fullTagReference);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -1445,25 +1453,38 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
.GroupBy(tagRef => tagRef, StringComparer.OrdinalIgnoreCase)
|
||||
.ToDictionary(g => g.Key, g => g.Count(), StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
var tagsToSubscribe = new List<string>();
|
||||
foreach (var kvp in transferredCounts)
|
||||
{
|
||||
lock (_lock)
|
||||
lock (Lock)
|
||||
{
|
||||
if (_subscriptionRefCounts.ContainsKey(kvp.Key))
|
||||
continue;
|
||||
|
||||
_subscriptionRefCounts[kvp.Key] = kvp.Value;
|
||||
tagsToSubscribe.Add(kvp.Key);
|
||||
}
|
||||
|
||||
_ = _mxAccessClient.SubscribeAsync(kvp.Key, (_, _) => { });
|
||||
}
|
||||
|
||||
foreach (var tagRef in tagsToSubscribe)
|
||||
_ = _mxAccessClient.SubscribeAsync(tagRef, (_, _) => { });
|
||||
}
|
||||
|
||||
private void OnMxAccessDataChange(string address, Vtq vtq)
|
||||
{
|
||||
if (_dispatchDisposed)
|
||||
return;
|
||||
|
||||
Interlocked.Increment(ref _totalMxChangeEvents);
|
||||
_pendingDataChanges[address] = vtq;
|
||||
_dataChangeSignal.Set();
|
||||
try
|
||||
{
|
||||
_dataChangeSignal.Set();
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
// Shutdown may race with one final callback from the runtime.
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
@@ -1494,103 +1515,141 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
|
||||
while (_dispatchRunning)
|
||||
{
|
||||
_dataChangeSignal.WaitOne(TimeSpan.FromMilliseconds(100));
|
||||
|
||||
if (!_dispatchRunning) break;
|
||||
|
||||
var keys = _pendingDataChanges.Keys.ToList();
|
||||
if (keys.Count == 0)
|
||||
try
|
||||
{
|
||||
ReportDispatchMetricsIfDue();
|
||||
continue;
|
||||
}
|
||||
_dataChangeSignal.WaitOne(TimeSpan.FromMilliseconds(100));
|
||||
|
||||
// Prepare updates outside the Lock — no IO, just value conversion
|
||||
var updates = new List<(BaseDataVariableState variable, DataValue dataValue)>(keys.Count);
|
||||
var pendingAlarmEvents = new List<(AlarmInfo info, bool active)>();
|
||||
if (!_dispatchRunning)
|
||||
break;
|
||||
|
||||
foreach (var address in keys)
|
||||
{
|
||||
if (_pendingDataChanges.TryRemove(address, out var vtq))
|
||||
var keys = _pendingDataChanges.Keys.ToList();
|
||||
if (keys.Count == 0)
|
||||
{
|
||||
if (_tagToVariableNode.TryGetValue(address, out var variable))
|
||||
{
|
||||
try
|
||||
{
|
||||
var dataValue = CreatePublishedDataValue(address, vtq);
|
||||
updates.Add((variable, dataValue));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Error preparing data change for {Address}", address);
|
||||
}
|
||||
}
|
||||
ReportDispatchMetricsIfDue();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check for alarm InAlarm transitions
|
||||
if (_alarmInAlarmTags.TryGetValue(address, out var alarmInfo))
|
||||
{
|
||||
var newInAlarm = vtq.Value is true || vtq.Value is 1 || (vtq.Value is int intVal && intVal != 0);
|
||||
if (newInAlarm != alarmInfo.LastInAlarm)
|
||||
{
|
||||
alarmInfo.LastInAlarm = newInAlarm;
|
||||
// Prepare updates outside the Lock. Shared-state lookups stay inside the Lock.
|
||||
var updates = new List<(string address, BaseDataVariableState variable, DataValue dataValue)>(keys.Count);
|
||||
var pendingAlarmEvents = new List<(string address, AlarmInfo info, bool active, ushort? severity, string? message)>();
|
||||
|
||||
// Read Priority and DescAttrName via MXAccess (outside Lock, safe here)
|
||||
if (newInAlarm)
|
||||
foreach (var address in keys)
|
||||
{
|
||||
if (!_pendingDataChanges.TryRemove(address, out var vtq))
|
||||
continue;
|
||||
|
||||
AlarmInfo? alarmInfo = null;
|
||||
bool newInAlarm = false;
|
||||
|
||||
lock (Lock)
|
||||
{
|
||||
if (_tagToVariableNode.TryGetValue(address, out var variable))
|
||||
{
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
var pVtq = _mxAccessClient.ReadAsync(alarmInfo.PriorityTagReference).GetAwaiter().GetResult();
|
||||
if (pVtq.Value is int ip) alarmInfo.CachedSeverity = (ushort)System.Math.Min(System.Math.Max(ip, 1), 1000);
|
||||
else if (pVtq.Value is short sp) alarmInfo.CachedSeverity = (ushort)System.Math.Min(System.Math.Max((int)sp, 1), 1000);
|
||||
}
|
||||
catch { /* keep previous */ }
|
||||
|
||||
try
|
||||
{
|
||||
var dVtq = _mxAccessClient.ReadAsync(alarmInfo.DescAttrNameTagReference).GetAwaiter().GetResult();
|
||||
if (dVtq.Value is string desc && !string.IsNullOrEmpty(desc))
|
||||
alarmInfo.CachedMessage = desc;
|
||||
}
|
||||
catch { /* keep previous */ }
|
||||
var dataValue = CreatePublishedDataValue(address, vtq);
|
||||
updates.Add((address, variable, dataValue));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Error preparing data change for {Address}", address);
|
||||
}
|
||||
}
|
||||
|
||||
pendingAlarmEvents.Add((alarmInfo, newInAlarm));
|
||||
if (_alarmInAlarmTags.TryGetValue(address, out alarmInfo))
|
||||
{
|
||||
newInAlarm = vtq.Value is true || vtq.Value is 1 || (vtq.Value is int intVal && intVal != 0);
|
||||
if (newInAlarm == alarmInfo.LastInAlarm)
|
||||
alarmInfo = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Apply under Lock so ClearChangeMasks propagates to monitored items
|
||||
if (updates.Count > 0 || pendingAlarmEvents.Count > 0)
|
||||
{
|
||||
lock (Lock)
|
||||
{
|
||||
foreach (var (variable, dataValue) in updates)
|
||||
{
|
||||
variable.Value = dataValue.Value;
|
||||
variable.StatusCode = dataValue.StatusCode;
|
||||
variable.Timestamp = dataValue.SourceTimestamp;
|
||||
variable.ClearChangeMasks(SystemContext, false);
|
||||
}
|
||||
if (alarmInfo == null)
|
||||
continue;
|
||||
|
||||
// Report alarm events
|
||||
foreach (var (info, active) in pendingAlarmEvents)
|
||||
ushort? severity = null;
|
||||
string? message = null;
|
||||
|
||||
if (newInAlarm)
|
||||
{
|
||||
try
|
||||
{
|
||||
ReportAlarmEvent(info, active);
|
||||
var pVtq = _mxAccessClient.ReadAsync(alarmInfo.PriorityTagReference).GetAwaiter().GetResult();
|
||||
if (pVtq.Value is int ip)
|
||||
severity = (ushort)System.Math.Min(System.Math.Max(ip, 1), 1000);
|
||||
else if (pVtq.Value is short sp)
|
||||
severity = (ushort)System.Math.Min(System.Math.Max((int)sp, 1), 1000);
|
||||
}
|
||||
catch (Exception ex)
|
||||
catch
|
||||
{
|
||||
Log.Warning(ex, "Error reporting alarm event for {Source}", info.SourceName);
|
||||
// Keep the previously cached severity when refresh reads fail.
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var dVtq = _mxAccessClient.ReadAsync(alarmInfo.DescAttrNameTagReference).GetAwaiter().GetResult();
|
||||
if (dVtq.Value is string desc && !string.IsNullOrEmpty(desc))
|
||||
message = desc;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Keep the previously cached message when refresh reads fail.
|
||||
}
|
||||
}
|
||||
|
||||
pendingAlarmEvents.Add((address, alarmInfo, newInAlarm, severity, message));
|
||||
}
|
||||
|
||||
// Apply under Lock so ClearChangeMasks propagates to monitored items.
|
||||
if (updates.Count > 0 || pendingAlarmEvents.Count > 0)
|
||||
{
|
||||
lock (Lock)
|
||||
{
|
||||
foreach (var (address, variable, dataValue) in updates)
|
||||
{
|
||||
if (!_tagToVariableNode.TryGetValue(address, out var currentVariable) || !ReferenceEquals(currentVariable, variable))
|
||||
continue;
|
||||
|
||||
variable.Value = dataValue.Value;
|
||||
variable.StatusCode = dataValue.StatusCode;
|
||||
variable.Timestamp = dataValue.SourceTimestamp;
|
||||
variable.ClearChangeMasks(SystemContext, false);
|
||||
}
|
||||
|
||||
foreach (var (address, info, active, severity, message) in pendingAlarmEvents)
|
||||
{
|
||||
if (!_alarmInAlarmTags.TryGetValue(address, out var currentInfo) || !ReferenceEquals(currentInfo, info))
|
||||
continue;
|
||||
|
||||
if (currentInfo.LastInAlarm == active)
|
||||
continue;
|
||||
|
||||
currentInfo.LastInAlarm = active;
|
||||
if (severity.HasValue)
|
||||
currentInfo.CachedSeverity = severity.Value;
|
||||
if (!string.IsNullOrEmpty(message))
|
||||
currentInfo.CachedMessage = message!;
|
||||
|
||||
try
|
||||
{
|
||||
ReportAlarmEvent(currentInfo, active);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Warning(ex, "Error reporting alarm event for {Source}", currentInfo.SourceName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Interlocked.Add(ref _totalDispatchBatchSize, updates.Count);
|
||||
Interlocked.Increment(ref _dispatchCycleCount);
|
||||
ReportDispatchMetricsIfDue();
|
||||
Interlocked.Add(ref _totalDispatchBatchSize, updates.Count);
|
||||
Interlocked.Increment(ref _dispatchCycleCount);
|
||||
ReportDispatchMetricsIfDue();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Log.Error(ex, "Unhandled error in data change dispatch loop");
|
||||
}
|
||||
}
|
||||
|
||||
Log.Information("Data change dispatch thread stopped");
|
||||
@@ -1629,6 +1688,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
_dispatchDisposed = true;
|
||||
_mxAccessClient.OnTagValueChanged -= OnMxAccessDataChange;
|
||||
StopDispatchThread();
|
||||
_dataChangeSignal.Dispose();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user