Resolve 6 of 7 stability review findings and close test coverage gaps

Fixes P1 StaComThread hang (crash-path faulting via WorkItem queue), P1 subscription
fire-and-forget (block+log or ContinueWith on 5 call sites), P2 continuation point
leak (PurgeExpired on Retrieve/Release), P2 dashboard bind failure (localhost prefix,
bool Start), P3 background loop double-start (task handles + join on stop in 3 files),
and P3 config logging exposure (SqlConnectionStringBuilder password masking). Adds
FakeMxAccessClient fault injection and 12 new tests. Documents required runtime
assemblies in ServiceHosting.md.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-07 15:37:27 -04:00
parent a28600ab1b
commit 95ad9c6866
16 changed files with 692 additions and 52 deletions

View File

@@ -1,4 +1,5 @@
using System;
using System.Data.SqlClient;
using System.Linq;
using Opc.Ua;
using Serilog;
@@ -70,7 +71,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration
// Galaxy Repository
Log.Information(
"GalaxyRepository.ConnectionString={ConnectionString}, ChangeDetectionInterval={ChangeInterval}s, CommandTimeout={CmdTimeout}s, ExtendedAttributes={ExtendedAttributes}",
config.GalaxyRepository.ConnectionString, config.GalaxyRepository.ChangeDetectionIntervalSeconds,
SanitizeConnectionString(config.GalaxyRepository.ConnectionString), config.GalaxyRepository.ChangeDetectionIntervalSeconds,
config.GalaxyRepository.CommandTimeoutSeconds, config.GalaxyRepository.ExtendedAttributes);
if (string.IsNullOrWhiteSpace(config.GalaxyRepository.ConnectionString))
@@ -210,5 +211,22 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Configuration
Log.Information("=== Configuration {Status} ===", valid ? "Valid" : "INVALID");
return valid;
}
/// <summary>
/// Produces a rendering of a SQL connection string that is safe to write to the log.
/// </summary>
/// <param name="connectionString">Raw connection string; may be null, empty, or whitespace.</param>
/// <returns>
/// <c>(empty)</c> for null/blank input, <c>(unparseable)</c> when the text cannot be processed by
/// <see cref="SqlConnectionStringBuilder"/>, otherwise the builder's connection string with any
/// non-empty password replaced by asterisks.
/// </returns>
private static string SanitizeConnectionString(string connectionString)
{
    if (string.IsNullOrWhiteSpace(connectionString))
    {
        return "(empty)";
    }

    string sanitized;
    try
    {
        var parsed = new SqlConnectionStringBuilder(connectionString);
        var hasPassword = !string.IsNullOrEmpty(parsed.Password);
        if (hasPassword)
        {
            // Only the password value is replaced; every other keyword is emitted as the builder renders it.
            parsed.Password = "********";
        }

        sanitized = parsed.ConnectionString;
    }
    catch
    {
        // Best effort: a malformed string must never break configuration logging.
        sanitized = "(unparseable)";
    }

    return sanitized;
}
}
}

View File

@@ -16,6 +16,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.GalaxyRepository
private readonly IGalaxyRepository _repository;
private CancellationTokenSource? _cts;
private Task? _pollTask;
/// <summary>
/// Initializes a new change detector for Galaxy deploy timestamps.
@@ -55,8 +56,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.GalaxyRepository
/// </summary>
public void Start()
{
if (_cts != null)
Stop();
_cts = new CancellationTokenSource();
Task.Run(() => PollLoopAsync(_cts.Token));
_pollTask = Task.Run(() => PollLoopAsync(_cts.Token));
Log.Information("Change detection started (interval={Interval}s)", _intervalSeconds);
}
@@ -66,6 +70,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.GalaxyRepository
public void Stop()
{
    _cts?.Cancel();

    var pollTask = _pollTask;
    if (pollTask != null)
    {
        var timeout = TimeSpan.FromSeconds(5);
        try
        {
            // Task.Wait(TimeSpan) reports a timeout by returning false, not by throwing,
            // so the result has to be checked or a stuck loop goes unnoticed.
            if (!pollTask.Wait(timeout))
                Log.Warning("Change detection poll loop did not stop within {Timeout}s", timeout.TotalSeconds);
        }
        catch (AggregateException ex)
        {
            // Cancellation is the expected outcome of the Cancel() above; anything else is a real fault.
            ex.Handle(inner =>
            {
                if (!(inner is OperationCanceledException))
                    Log.Warning(inner, "Change detection poll loop ended with an error");
                return true;
            });
        }
        catch (Exception ex)
        {
            // Stop must never throw; record the unexpected failure and carry on.
            Log.Warning(ex, "Error while waiting for change detection poll loop to stop");
        }
    }

    _pollTask = null;
    Log.Information("Change detection stopped");
}

View File

@@ -15,7 +15,14 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Historian
private static readonly ILogger Log = Serilog.Log.ForContext<HistoryContinuationPointManager>();
private readonly ConcurrentDictionary<Guid, StoredContinuation> _store = new();
private readonly TimeSpan _timeout = TimeSpan.FromMinutes(5);
private readonly TimeSpan _timeout;
/// <summary>
/// Creates a manager that uses a five-minute timeout.
/// </summary>
public HistoryContinuationPointManager()
    : this(TimeSpan.FromMinutes(5))
{
}

/// <summary>
/// Creates a manager with an explicit timeout.
/// </summary>
/// <param name="timeout">
/// Value stored as the manager's timeout. NOTE(review): presumably the age after which stored
/// continuation points are considered expired - confirm against PurgeExpired.
/// </param>
internal HistoryContinuationPointManager(TimeSpan timeout) => _timeout = timeout;
/// <summary>
/// Stores remaining data values and returns a continuation point identifier.
@@ -35,6 +42,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Historian
/// </summary>
public List<DataValue>? Retrieve(byte[] continuationPoint)
{
PurgeExpired();
if (continuationPoint == null || continuationPoint.Length != 16)
return null;
@@ -56,6 +64,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Historian
/// </summary>
public void Release(byte[] continuationPoint)
{
PurgeExpired();
if (continuationPoint == null || continuationPoint.Length != 16)
return;

View File

@@ -7,13 +7,18 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
{
public sealed partial class MxAccessClient
{
private Task? _monitorTask;
/// <summary>
/// Starts the background monitor that reconnects dropped sessions and watches the probe tag for staleness.
/// </summary>
public void StartMonitor()
{
if (_monitorCts != null)
StopMonitor();
_monitorCts = new CancellationTokenSource();
Task.Run(() => MonitorLoopAsync(_monitorCts.Token));
_monitorTask = Task.Run(() => MonitorLoopAsync(_monitorCts.Token));
Log.Information("MxAccess monitor started (interval={Interval}s)", _config.MonitorIntervalSeconds);
}
@@ -23,6 +28,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
public void StopMonitor()
{
    _monitorCts?.Cancel();

    var monitorTask = _monitorTask;
    if (monitorTask != null)
    {
        var timeout = TimeSpan.FromSeconds(5);
        try
        {
            // Task.Wait(TimeSpan) reports a timeout by returning false, not by throwing,
            // so the result has to be checked or a stuck loop goes unnoticed.
            if (!monitorTask.Wait(timeout))
                Log.Warning("MxAccess monitor loop did not stop within {Timeout}s", timeout.TotalSeconds);
        }
        catch (AggregateException ex)
        {
            // Cancellation is the expected outcome of the Cancel() above; anything else is a real fault.
            ex.Handle(inner =>
            {
                if (!(inner is OperationCanceledException))
                    Log.Warning(inner, "MxAccess monitor loop ended with an error");
                return true;
            });
        }
        catch (Exception ex)
        {
            // StopMonitor must never throw; record the unexpected failure and carry on.
            Log.Warning(ex, "Error while waiting for MxAccess monitor loop to stop");
        }
    }

    _monitorTask = null;
}
private async Task MonitorLoopAsync(CancellationToken ct)

View File

@@ -21,12 +21,13 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
private readonly TaskCompletionSource<bool> _ready = new();
private readonly Thread _thread;
private readonly ConcurrentQueue<Action> _workItems = new();
private readonly ConcurrentQueue<WorkItem> _workItems = new();
private long _appMessages;
private long _dispatchedMessages;
private bool _disposed;
private DateTime _lastLogTime;
private volatile uint _nativeThreadId;
private volatile bool _pumpExited;
private long _totalMessages;
private long _workItemsExecuted;
@@ -47,7 +48,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
/// <summary>
/// Gets a value indicating whether the STA thread is running and able to accept work.
/// </summary>
public bool IsRunning => _nativeThreadId != 0 && !_disposed;
public bool IsRunning => _nativeThreadId != 0 && !_disposed && !_pumpExited;
/// <summary>
/// Stops the STA thread and releases the message-pump resources used for COM interop.
@@ -59,7 +60,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
try
{
if (_nativeThreadId != 0)
if (_nativeThreadId != 0 && !_pumpExited)
PostThreadMessage(_nativeThreadId, WM_APP + 1, IntPtr.Zero, IntPtr.Zero);
_thread.Join(TimeSpan.FromSeconds(5));
}
@@ -68,6 +69,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
Log.Warning(ex, "Error shutting down STA COM thread");
}
DrainAndFaultQueue();
Log.Information("STA COM thread stopped");
}
@@ -89,21 +91,32 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
public Task RunAsync(Action action)
{
if (_disposed) throw new ObjectDisposedException(nameof(StaComThread));
if (_pumpExited) throw new InvalidOperationException("STA COM thread pump has exited");
var tcs = new TaskCompletionSource<bool>();
_workItems.Enqueue(() =>
_workItems.Enqueue(new WorkItem
{
try
Execute = () =>
{
action();
tcs.TrySetResult(true);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
try
{
action();
tcs.TrySetResult(true);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
},
Fault = ex => tcs.TrySetException(ex)
});
PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero);
if (!PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero))
{
_pumpExited = true;
DrainAndFaultQueue();
}
return tcs.Task;
}
@@ -116,20 +129,31 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
public Task<T> RunAsync<T>(Func<T> func)
{
if (_disposed) throw new ObjectDisposedException(nameof(StaComThread));
if (_pumpExited) throw new InvalidOperationException("STA COM thread pump has exited");
var tcs = new TaskCompletionSource<T>();
_workItems.Enqueue(() =>
_workItems.Enqueue(new WorkItem
{
try
Execute = () =>
{
tcs.TrySetResult(func());
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
try
{
tcs.TrySetResult(func());
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
},
Fault = ex => tcs.TrySetException(ex)
});
PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero);
if (!PostThreadMessage(_nativeThreadId, WM_APP, IntPtr.Zero, IntPtr.Zero))
{
_pumpExited = true;
DrainAndFaultQueue();
}
return tcs.Task;
}
@@ -180,6 +204,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
Log.Error(ex, "STA COM thread crashed");
_ready.TrySetException(ex);
}
finally
{
_pumpExited = true;
DrainAndFaultQueue();
}
}
private void DrainQueue()
@@ -189,7 +218,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
_workItemsExecuted++;
try
{
workItem();
workItem.Execute();
}
catch (Exception ex)
{
@@ -198,6 +227,22 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
}
}
/// <summary>
/// Empties the pending work-item queue, invoking each item's <c>Fault</c> callback with an
/// <see cref="InvalidOperationException"/> rather than executing the item.
/// </summary>
private void DrainAndFaultQueue()
{
    var pumpGone = new InvalidOperationException("STA COM thread pump has exited");

    for (;;)
    {
        if (!_workItems.TryDequeue(out var pending))
        {
            return;
        }

        try
        {
            pending.Fault(pumpGone);
        }
        catch
        {
            // A fault callback is not expected to throw; swallow so the remaining items are still faulted.
        }
    }
}
private void LogPumpStatsIfDue()
{
var now = DateTime.UtcNow;
@@ -208,6 +253,12 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
_lastLogTime = now;
}
/// <summary>
/// A queued unit of work paired with the callback used to fail it when it will not be run.
/// </summary>
private sealed class WorkItem
{
    /// <summary>
    /// Delegate invoked to run the work. Defaults to a no-op so an item created without it
    /// cannot cause a <see cref="NullReferenceException"/> when the queue is drained.
    /// </summary>
    public Action Execute { get; set; } = () => { };

    /// <summary>
    /// Delegate invoked with the failure when the item is faulted instead of executed.
    /// Defaults to a no-op for the same reason as <see cref="Execute"/>.
    /// </summary>
    public Action<Exception> Fault { get; set; } = _ => { };
}
#region Win32 PInvoke
[StructLayout(LayoutKind.Sequential)]
@@ -255,4 +306,4 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
#endregion
}
}
}

View File

@@ -3,6 +3,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Server;
using Serilog;
@@ -391,14 +392,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
{
if (string.IsNullOrEmpty(tag) || !_tagToVariableNode.ContainsKey(tag))
continue;
try
{
_mxAccessClient.SubscribeAsync(tag, (_, _) => { });
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to auto-subscribe to alarm tag {Tag}", tag);
}
var alarmTag = tag;
_mxAccessClient.SubscribeAsync(alarmTag, (_, _) => { })
.ContinueWith(t => Log.Warning(t.Exception?.InnerException,
"Failed to auto-subscribe to alarm tag {Tag}", alarmTag),
TaskContinuationOptions.OnlyOnFaulted);
}
}
}
@@ -895,14 +893,11 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
{
if (string.IsNullOrEmpty(tag) || !_tagToVariableNode.ContainsKey(tag))
continue;
try
{
_mxAccessClient.SubscribeAsync(tag, (_, _) => { });
}
catch
{
/* ignore */
}
var subtreeAlarmTag = tag;
_mxAccessClient.SubscribeAsync(subtreeAlarmTag, (_, _) => { })
.ContinueWith(t => Log.Warning(t.Exception?.InnerException,
"Failed to subscribe alarm tag in subtree {Tag}", subtreeAlarmTag),
TaskContinuationOptions.OnlyOnFaulted);
}
}
}
@@ -1903,7 +1898,16 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
}
if (shouldSubscribe)
_ = _mxAccessClient.SubscribeAsync(fullTagReference, (_, _) => { });
{
try
{
_mxAccessClient.SubscribeAsync(fullTagReference, (_, _) => { }).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to subscribe tag {Tag}", fullTagReference);
}
}
}
/// <summary>
@@ -1931,7 +1935,16 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
}
if (shouldUnsubscribe)
_ = _mxAccessClient.UnsubscribeAsync(fullTagReference);
{
try
{
_mxAccessClient.UnsubscribeAsync(fullTagReference).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to unsubscribe tag {Tag}", fullTagReference);
}
}
}
/// <summary>
@@ -1957,7 +1970,13 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
}
foreach (var tagRef in tagsToSubscribe)
_ = _mxAccessClient.SubscribeAsync(tagRef, (_, _) => { });
{
var transferTag = tagRef;
_mxAccessClient.SubscribeAsync(transferTag, (_, _) => { })
.ContinueWith(t => Log.Warning(t.Exception?.InnerException,
"Failed to restore subscription for transferred tag {Tag}", transferTag),
TaskContinuationOptions.OnlyOnFaulted);
}
}
private void OnMxAccessDataChange(string address, Vtq vtq)

View File

@@ -18,6 +18,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
private readonly StatusReportService _reportService;
private CancellationTokenSource? _cts;
private HttpListener? _listener;
private Task? _listenTask;
/// <summary>
/// Initializes a new dashboard web server bound to the supplied report service and HTTP port.
@@ -46,23 +47,25 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
/// <summary>
/// Starts the HTTP listener and background request loop for the status dashboard.
/// </summary>
public void Start()
public bool Start()
{
try
{
_listener = new HttpListener();
_listener.Prefixes.Add($"http://+:{_port}/");
_listener.Prefixes.Add($"http://localhost:{_port}/");
_listener.Start();
_cts = new CancellationTokenSource();
Task.Run(() => ListenLoopAsync(_cts.Token));
_listenTask = Task.Run(() => ListenLoopAsync(_cts.Token));
Log.Information("Status dashboard started on http://localhost:{Port}/", _port);
return true;
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to start status dashboard on port {Port}", _port);
Log.Error(ex, "Failed to start status dashboard on port {Port}", _port);
_listener = null;
return false;
}
}
@@ -83,6 +86,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.Status
}
_listener = null;
try { _listenTask?.Wait(TimeSpan.FromSeconds(5)); } catch { /* timeout or faulted */ }
_listenTask = null;
Log.Information("Status dashboard stopped");
}