Phase 0 — mechanical rename ZB.MOM.WW.LmxOpcUa.* → ZB.MOM.WW.OtOpcUa.*

Renames all 11 projects (5 src + 6 tests), the .slnx solution file, all source-file namespaces, all axaml namespace references, and all v1 documentation references in CLAUDE.md and docs/*.md (excluding docs/v2/ which is already in OtOpcUa form). Also updates the TopShelf service registration name from "LmxOpcUa" to "OtOpcUa" per Phase 0 Task 0.6.

Preserves runtime identifiers per Phase 0 Out-of-Scope rules to avoid breaking v1/v2 client trust during coexistence: OPC UA `ApplicationUri` defaults (`urn:{GalaxyName}:LmxOpcUa`), server `EndpointPath` (`/LmxOpcUa`), `ServerName` default (feeds cert subject CN), `MxAccessConfiguration.ClientName` default (defensive — stays "LmxOpcUa" for MxAccess audit-trail consistency), client OPC UA identifiers (`ApplicationName = "LmxOpcUaClient"`, `ApplicationUri = "urn:localhost:LmxOpcUaClient"`, cert directory `%LocalAppData%\LmxOpcUaClient\pki\`), and the `LmxOpcUaServer` class name (class rename out of Phase 0 scope per Task 0.5 sed pattern; happens in Phase 1 alongside `LmxNodeManager → GenericDriverNodeManager` Core extraction). 23 LmxOpcUa references retained, all enumerated and justified in `docs/v2/implementation/exit-gate-phase-0.md`.

Build clean: 0 errors, 30 warnings (lower than baseline 167). Tests at strict improvement over baseline: 821 passing / 1 failing vs baseline 820 / 2 (one flaky pre-existing failure passed this run; the other still fails — both pre-existing and unrelated to the rename). `Client.UI.Tests`, `Historian.Aveva.Tests`, `Client.Shared.Tests`, `IntegrationTests` all match baseline exactly. Exit gate compliance results recorded in `docs/v2/implementation/exit-gate-phase-0.md` with all 7 checks PASS or DEFERRED-to-PR-review (#7 service install verification needs Windows service permissions on the reviewer's box).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-17 13:57:47 -04:00
parent 5b8d708c58
commit 3b2defd94f
293 changed files with 841 additions and 722 deletions

View File

@@ -0,0 +1,472 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
/// <summary>
/// Advises <c>&lt;ObjectName&gt;.ScanState</c> on every deployed <c>$WinPlatform</c> and
/// <c>$AppEngine</c>, tracks their runtime state (Unknown / Running / Stopped), and notifies
/// the owning node manager on Running↔Stopped transitions so it can proactively flip every
/// OPC UA variable hosted by that object to <c>BadOutOfService</c> (and clear on recovery).
/// </summary>
/// <remarks>
/// State machine semantics are documented in <c>runtimestatus.md</c>. Key facts:
/// <list type="bullet">
/// <item><c>ScanState</c> is delivered on-change only — no periodic heartbeat. A stably
/// Running host may go hours without a callback.</item>
/// <item>Running → Stopped is driven by explicit error callbacks or <c>ScanState = false</c>,
/// NEVER by starvation. The only starvation check applies to the initial Unknown state.</item>
/// <item>When the MxAccess transport is disconnected, <see cref="GetSnapshot"/> returns every
/// entry with <see cref="GalaxyRuntimeState.Unknown"/> regardless of the underlying state,
/// because we can't observe anything through a dead transport.</item>
/// <item>The stop/start callbacks fire synchronously from whichever thread delivered the
/// probe update. The manager releases its own lock before invoking them to avoid
/// lock-inversion deadlocks with the node manager's <c>Lock</c>.</item>
/// </list>
/// </remarks>
public sealed class GalaxyRuntimeProbeManager : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<GalaxyRuntimeProbeManager>();
private const int CategoryWinPlatform = 1;
private const int CategoryAppEngine = 3;
private const string KindWinPlatform = "$WinPlatform";
private const string KindAppEngine = "$AppEngine";
private const string ProbeAttribute = ".ScanState";
private readonly IMxAccessClient _client;
private readonly TimeSpan _unknownTimeout;
private readonly Action<int>? _onHostStopped;
private readonly Action<int>? _onHostRunning;
private readonly Func<DateTime> _clock;
// Key: probe tag reference (e.g. "DevAppEngine.ScanState").
// Value: the current runtime status for that host, kept in sync on every probe callback
// and queried via GetSnapshot for dashboard rendering.
private readonly Dictionary<string, GalaxyRuntimeStatus> _byProbe =
new Dictionary<string, GalaxyRuntimeStatus>(StringComparer.OrdinalIgnoreCase);
// Reverse index: gobject_id -> probe tag, so Sync() can diff new/removed hosts efficiently.
private readonly Dictionary<int, string> _probeByGobjectId = new Dictionary<int, string>();
private readonly object _lock = new object();
private bool _disposed;
/// <summary>
/// Creates a probe manager that takes its timestamps from <see cref="DateTime.UtcNow"/>.
/// <paramref name="onHostStopped"/> and <paramref name="onHostRunning"/> are optional and, when
/// supplied, are invoked synchronously with the gobject id of the host whose state changed so the
/// owning node manager can invalidate / restore the hosted subtree.
/// </summary>
/// <param name="client">MxAccess client used to advise and unadvise the probe tags.</param>
/// <param name="unknownTimeoutSeconds">Seconds a probe may remain Unknown before <see cref="Tick"/> flips it to Stopped; values below 1 are raised to 1.</param>
/// <param name="onHostStopped">Optional callback raised when a host transitions to Stopped.</param>
/// <param name="onHostRunning">Optional callback raised when a host recovers from Stopped to Running.</param>
public GalaxyRuntimeProbeManager(
    IMxAccessClient client,
    int unknownTimeoutSeconds,
    Action<int>? onHostStopped = null,
    Action<int>? onHostRunning = null)
    : this(client, unknownTimeoutSeconds, onHostStopped, onHostRunning, () => DateTime.UtcNow)
{
}

/// <summary>
/// Full constructor with an injectable <paramref name="clock"/> so the time source can be replaced.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="client"/> or <paramref name="clock"/> is null.</exception>
internal GalaxyRuntimeProbeManager(
    IMxAccessClient client,
    int unknownTimeoutSeconds,
    Action<int>? onHostStopped,
    Action<int>? onHostRunning,
    Func<DateTime> clock)
{
    if (client == null)
        throw new ArgumentNullException(nameof(client));
    if (clock == null)
        throw new ArgumentNullException(nameof(clock));

    // The timeout is never shorter than one second.
    var effectiveSeconds = unknownTimeoutSeconds < 1 ? 1 : unknownTimeoutSeconds;

    _client = client;
    _clock = clock;
    _unknownTimeout = TimeSpan.FromSeconds(effectiveSeconds);
    _onHostStopped = onHostStopped;
    _onHostRunning = onHostRunning;
}
/// <summary>
/// Number of probe entries currently tracked by this manager, read under the manager's lock.
/// Surfaced on the dashboard Subscriptions panel so operators can see the bridge-owned probe
/// count separately from the total.
/// </summary>
public int ActiveProbeCount
{
    get
    {
        int tracked;
        lock (_lock)
        {
            tracked = _byProbe.Count;
        }

        return tracked;
    }
}
/// <summary>
/// Reports whether the runtime host identified by <paramref name="gobjectId"/> is currently
/// recorded as <see cref="GalaxyRuntimeState.Stopped"/>. The node manager's Read path uses this
/// to short-circuit on-demand reads of tags hosted by a known-stopped runtime object, preventing
/// MxAccess from serving stale cached values as Good. The stored state is consulted directly:
/// unlike <see cref="GetSnapshot"/>, no transport-disconnected rewrite is applied, so a host
/// keeps reporting its last-known state while the connection is down (connection loss is handled
/// by the normal MxAccess error paths).
/// </summary>
/// <param name="gobjectId">Galaxy object id of the host to check.</param>
/// <returns><see langword="true"/> only when a probe is registered for the id and its state is Stopped.</returns>
public bool IsHostStopped(int gobjectId)
{
    lock (_lock)
    {
        // Unknown ids are treated as "not stopped".
        if (!_probeByGobjectId.TryGetValue(gobjectId, out var probeTag))
            return false;

        return _byProbe.TryGetValue(probeTag, out var entry)
               && entry.State == GalaxyRuntimeState.Stopped;
    }
}
/// <summary>
/// Produces a point-in-time copy of the runtime status for the host identified by
/// <paramref name="gobjectId"/>. Used by the node manager to populate the synthetic
/// <c>$RuntimeState</c> child variables on each host object. Like <see cref="IsHostStopped"/>,
/// the stored state is returned as-is (no transport-gated rewrite to Unknown).
/// </summary>
/// <param name="gobjectId">Galaxy object id of the host to look up.</param>
/// <returns>A clone of the tracked status, or <see langword="null"/> when no probe is registered for the id.</returns>
public GalaxyRuntimeStatus? GetHostStatus(int gobjectId)
{
    lock (_lock)
    {
        if (!_probeByGobjectId.TryGetValue(gobjectId, out var probeTag))
            return null;

        if (!_byProbe.TryGetValue(probeTag, out var entry))
            return null;

        // Hand out a copy so callers never hold a reference to the live entry.
        return Clone(entry, forceUnknown: false);
    }
}
/// <summary>
/// Diffs the supplied hierarchy against the active probe set, advising new hosts and
/// unadvising removed ones. The hierarchy is filtered to runtime host categories
/// ($WinPlatform, $AppEngine) — non-host rows are ignored. Idempotent: a second call
/// with the same hierarchy performs no Advise / Unadvise work.
/// </summary>
/// <remarks>
/// Sync is synchronous on MxAccess: <see cref="IMxAccessClient.SubscribeAsync"/> is
/// awaited for each new host, so for a galaxy with N runtime hosts the call blocks for
/// ~N round-trips. This is acceptable because it only runs during address-space build
/// and rebuild, not on the hot path.
/// </remarks>
/// <param name="hierarchy">Galaxy object rows to reconcile against; a <see langword="null"/> value makes the call a no-op.</param>
/// <returns>A task that completes once every pending subscribe and unsubscribe call has been attempted.</returns>
public async Task SyncAsync(IReadOnlyList<GalaxyObjectInfo> hierarchy)
{
// Nothing to do after disposal or when no hierarchy was supplied.
if (_disposed || hierarchy == null)
return;
// Filter to runtime hosts and project to the expected probe tag name.
var desired = new Dictionary<int, (string Probe, string Kind, GalaxyObjectInfo Obj)>();
foreach (var obj in hierarchy)
{
// Rows outside the two host categories, or without a tag name, cannot be probed.
if (obj.CategoryId != CategoryWinPlatform && obj.CategoryId != CategoryAppEngine)
continue;
if (string.IsNullOrWhiteSpace(obj.TagName))
continue;
var probe = obj.TagName + ProbeAttribute;
var kind = obj.CategoryId == CategoryWinPlatform ? KindWinPlatform : KindAppEngine;
// Keyed by gobject id: a later row with the same id overwrites an earlier one.
desired[obj.GobjectId] = (probe, kind, obj);
}
// Compute diffs under lock, release lock before issuing SDK calls (which can block).
// toSubscribe carries the gobject id alongside the probe name so the rollback path on
// subscribe failure can unwind both dictionaries without a reverse lookup.
List<(int GobjectId, string Probe)> toSubscribe;
List<string> toUnsubscribe;
lock (_lock)
{
toSubscribe = new List<(int, string)>();
toUnsubscribe = new List<string>();
foreach (var kvp in desired)
{
if (_probeByGobjectId.TryGetValue(kvp.Key, out var existingProbe))
{
// Already tracked: ensure the status entry is aligned (tag rename path is
// intentionally not supported — if the probe changed, treat it as remove+add).
if (!string.Equals(existingProbe, kvp.Value.Probe, StringComparison.OrdinalIgnoreCase))
{
toUnsubscribe.Add(existingProbe);
_byProbe.Remove(existingProbe);
_probeByGobjectId.Remove(kvp.Key);
toSubscribe.Add((kvp.Key, kvp.Value.Probe));
_byProbe[kvp.Value.Probe] = MakeInitialStatus(kvp.Value.Obj, kvp.Value.Kind);
_probeByGobjectId[kvp.Key] = kvp.Value.Probe;
}
}
else
{
// New host: register it in the Unknown state before the advise call is issued.
toSubscribe.Add((kvp.Key, kvp.Value.Probe));
_byProbe[kvp.Value.Probe] = MakeInitialStatus(kvp.Value.Obj, kvp.Value.Kind);
_probeByGobjectId[kvp.Key] = kvp.Value.Probe;
}
}
// Remove hosts that are no longer in the desired set.
// The id list is materialized first so the dictionary is not modified while it is enumerated.
var toRemove = _probeByGobjectId.Keys.Where(id => !desired.ContainsKey(id)).ToList();
foreach (var id in toRemove)
{
var probe = _probeByGobjectId[id];
toUnsubscribe.Add(probe);
_byProbe.Remove(probe);
_probeByGobjectId.Remove(id);
}
}
// Apply the diff outside the lock.
// Subscriptions are issued one at a time; a failure on one probe does not stop the rest.
foreach (var (gobjectId, probe) in toSubscribe)
{
try
{
await _client.SubscribeAsync(probe, OnProbeValueChanged);
Log.Information("Galaxy runtime probe advised: {Probe}", probe);
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to advise galaxy runtime probe {Probe}", probe);
// Roll back the pending entry so Tick() can't later transition a never-advised
// probe from Unknown to Stopped and fan out a false-negative host-down signal.
// A concurrent SyncAsync may have re-added the same gobject under a new probe
// name, so compare against the captured probe string before removing.
lock (_lock)
{
if (_probeByGobjectId.TryGetValue(gobjectId, out var current)
&& string.Equals(current, probe, StringComparison.OrdinalIgnoreCase))
{
_probeByGobjectId.Remove(gobjectId);
}
_byProbe.Remove(probe);
}
}
}
// Unadvise failures are logged at Debug level and otherwise ignored.
foreach (var probe in toUnsubscribe)
{
try
{
await _client.UnsubscribeAsync(probe);
}
catch (Exception ex)
{
Log.Debug(ex, "Failed to unadvise galaxy runtime probe {Probe} during sync", probe);
}
}
}
/// <summary>
/// Routes an <c>OnTagValueChanged</c> callback to the probe state machine. Returns
/// <see langword="true"/> when <paramref name="tagRef"/> matches a bridge-owned probe
/// (in which case the owning node manager should skip its normal variable-update path).
/// </summary>
/// <remarks>
/// A probe counts as Running only when the quality is Good and the value is the boolean
/// <see langword="true"/>; every other update counts as a failure and drives the host to Stopped.
/// The stop/start callbacks are invoked after the manager's lock has been released.
/// </remarks>
/// <param name="tagRef">Tag reference the update was delivered for.</param>
/// <param name="vtq">Value, timestamp and quality delivered for the tag.</param>
/// <returns><see langword="false"/> when the manager is disposed, <paramref name="tagRef"/> is empty, or the tag is not a tracked probe.</returns>
public bool HandleProbeUpdate(string tagRef, Vtq vtq)
{
    if (_disposed || string.IsNullOrEmpty(tagRef))
        return false;

    var fromToGobjectId = 0;
    GalaxyRuntimeState? transitionTo = null;
    // Captured under the lock so the log line below cannot observe a value written by a
    // concurrent probe update after the lock has been released.
    string? stoppedDetail = null;

    lock (_lock)
    {
        if (!_byProbe.TryGetValue(tagRef, out var status))
            return false; // not a probe — let the caller handle it normally

        var now = _clock();
        var isRunning = vtq.Quality.IsGood() && vtq.Value is bool b && b;
        status.LastStateCallbackTime = now;
        status.LastScanState = vtq.Value as bool?;

        if (isRunning)
        {
            status.GoodUpdateCount++;
            status.LastError = null;
            if (status.State != GalaxyRuntimeState.Running)
            {
                // Only fire the host-running callback on a true Stopped → Running
                // recovery. Unknown → Running happens once at startup for every host
                // and is not a recovery — firing ClearHostVariablesBadQuality there
                // would wipe Bad status set by the concurrently-stopping other host
                // on variables that span both lists.
                var wasStopped = status.State == GalaxyRuntimeState.Stopped;
                status.State = GalaxyRuntimeState.Running;
                status.LastStateChangeTime = now;
                if (wasStopped)
                {
                    transitionTo = GalaxyRuntimeState.Running;
                    fromToGobjectId = status.GobjectId;
                }
            }
        }
        else
        {
            status.FailureCount++;
            status.LastError = BuildErrorDetail(vtq);
            if (status.State != GalaxyRuntimeState.Stopped)
            {
                status.State = GalaxyRuntimeState.Stopped;
                status.LastStateChangeTime = now;
                transitionTo = GalaxyRuntimeState.Stopped;
                fromToGobjectId = status.GobjectId;
                stoppedDetail = status.LastError;
            }
        }
    }

    // Invoke transition callbacks outside the lock to avoid inverting the node manager's
    // lock order when it subsequently takes its own Lock to flip hosted variables.
    if (transitionTo == GalaxyRuntimeState.Stopped)
    {
        Log.Information("Galaxy runtime {Probe} transitioned Running → Stopped ({Err})",
            tagRef, stoppedDetail ?? "(no detail)");
        try { _onHostStopped?.Invoke(fromToGobjectId); }
        catch (Exception ex) { Log.Warning(ex, "onHostStopped callback threw for {Probe}", tagRef); }
    }
    else if (transitionTo == GalaxyRuntimeState.Running)
    {
        Log.Information("Galaxy runtime {Probe} transitioned → Running", tagRef);
        try { _onHostRunning?.Invoke(fromToGobjectId); }
        catch (Exception ex) { Log.Warning(ex, "onHostRunning callback threw for {Probe}", tagRef); }
    }

    return true;
}
/// <summary>
/// Periodic sweep: any entry still in the Unknown state whose last state change is older than
/// the configured timeout is moved to Stopped, and the host-stopped callback is raised for it.
/// Entries that are already Running or Stopped are left untouched.
/// </summary>
public void Tick()
{
    if (_disposed)
        return;

    var timedOutHosts = new List<int>();
    lock (_lock)
    {
        var now = _clock();
        foreach (var host in _byProbe.Values)
        {
            if (host.State != GalaxyRuntimeState.Unknown)
                continue;

            // LastStateChangeTime is stamped when the entry is created, so the age measured
            // here is the time since the probe was registered.
            if (!host.LastStateChangeTime.HasValue)
                continue;

            var age = now - host.LastStateChangeTime.Value;
            if (age > _unknownTimeout)
            {
                host.State = GalaxyRuntimeState.Stopped;
                host.LastStateChangeTime = now;
                host.FailureCount++;
                host.LastError = "Probe never received an initial callback within the unknown-resolution timeout";
                timedOutHosts.Add(host.GobjectId);
            }
        }
    }

    // Callbacks run after the lock has been released.
    foreach (var hostId in timedOutHosts)
    {
        Log.Warning("Galaxy runtime gobject {GobjectId} timed out in Unknown state → Stopped", hostId);
        try
        {
            _onHostStopped?.Invoke(hostId);
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "onHostStopped callback threw during tick for {GobjectId}", hostId);
        }
    }
}
/// <summary>
/// Builds a read-only, name-ordered copy of every tracked host. When the MxAccess client is not
/// in the Connected state, each copy is reported as Unknown (with its error text cleared) so
/// operators aren't misled by cached per-host state — the Connection panel is the primary signal
/// in that case. The underlying <c>_byProbe</c> map is not modified.
/// </summary>
/// <returns>Cloned status entries sorted ordinally by object name.</returns>
public IReadOnlyList<GalaxyRuntimeStatus> GetSnapshot()
{
    var maskAsUnknown = _client.State != ConnectionState.Connected;

    List<GalaxyRuntimeStatus> snapshot;
    lock (_lock)
    {
        snapshot = _byProbe.Values
            .Select(entry => Clone(entry, forceUnknown: maskAsUnknown))
            .ToList();
    }

    // Stable ordering by name so dashboard rows don't jitter between refreshes.
    snapshot.Sort((a, b) => string.CompareOrdinal(a.ObjectName, b.ObjectName));
    return snapshot;
}
/// <inheritdoc />
/// <remarks>
/// Marks the manager disposed, clears both lookup tables under the lock, and then unadvises
/// every previously tracked probe. Calling it a second time is a no-op. Unadvise failures are
/// logged at Debug level and do not propagate.
/// </remarks>
public void Dispose()
{
    string[] trackedProbes;
    lock (_lock)
    {
        if (_disposed)
            return;

        _disposed = true;
        trackedProbes = _byProbe.Keys.ToArray();
        _byProbe.Clear();
        _probeByGobjectId.Clear();
    }

    // The unadvise calls are made after the lock has been released; each one is waited on in turn.
    foreach (var probeTag in trackedProbes)
    {
        try
        {
            _client.UnsubscribeAsync(probeTag).GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            Log.Debug(ex, "Failed to unadvise galaxy runtime probe {Probe} during Dispose", probeTag);
        }
    }
}
/// <summary>
/// Subscription callback registered for every probe tag; forwards the update to
/// <see cref="HandleProbeUpdate"/> and discards its result.
/// </summary>
private void OnProbeValueChanged(string tagRef, Vtq vtq) => HandleProbeUpdate(tagRef, vtq);
/// <summary>
/// Creates the status entry for a newly tracked host: state Unknown, with the last-state-change
/// time stamped from the injected clock so the Unknown timeout is measured from creation.
/// </summary>
/// <param name="obj">Galaxy object the entry describes; supplies the name and gobject id.</param>
/// <param name="kind">Host kind label stored on the entry.</param>
private GalaxyRuntimeStatus MakeInitialStatus(GalaxyObjectInfo obj, string kind) =>
    new GalaxyRuntimeStatus
    {
        ObjectName = obj.TagName,
        GobjectId = obj.GobjectId,
        Kind = kind,
        State = GalaxyRuntimeState.Unknown,
        LastStateChangeTime = _clock()
    };
/// <summary>
/// Copies <paramref name="src"/> into a new status object. When <paramref name="forceUnknown"/>
/// is set, the copy reports the Unknown state and a null error; all other fields are copied as-is.
/// </summary>
/// <param name="src">Entry to copy; it is not modified.</param>
/// <param name="forceUnknown">Whether to mask the state and error on the copy.</param>
private static GalaxyRuntimeStatus Clone(GalaxyRuntimeStatus src, bool forceUnknown)
{
    var reportedState = forceUnknown ? GalaxyRuntimeState.Unknown : src.State;
    var reportedError = forceUnknown ? null : src.LastError;

    return new GalaxyRuntimeStatus
    {
        ObjectName = src.ObjectName,
        GobjectId = src.GobjectId,
        Kind = src.Kind,
        State = reportedState,
        LastStateCallbackTime = src.LastStateCallbackTime,
        LastStateChangeTime = src.LastStateChangeTime,
        LastScanState = src.LastScanState,
        LastError = reportedError,
        GoodUpdateCount = src.GoodUpdateCount,
        FailureCount = src.FailureCount
    };
}
/// <summary>
/// Produces the human-readable reason a probe update was not treated as Running. Checked in
/// order: bad quality, uncertain quality, a boolean <see langword="false"/> value, then any
/// other value.
/// </summary>
/// <param name="vtq">The probe update being described.</param>
private static string BuildErrorDetail(Vtq vtq)
{
    if (vtq.Quality.IsBad())
    {
        return $"bad quality ({vtq.Quality})";
    }

    if (vtq.Quality.IsUncertain())
    {
        return $"uncertain quality ({vtq.Quality})";
    }

    return vtq.Value is false
        ? "ScanState = false (OffScan)"
        : $"unexpected value: {vtq.Value ?? "(null)"}";
}
}
}

View File

@@ -0,0 +1,149 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
public sealed partial class MxAccessClient
{
/// <summary>
/// Opens the MXAccess runtime connection, replays stored subscriptions, and starts the optional probe subscription.
/// </summary>
/// <remarks>
/// Returns immediately when the client is already connected. On failure the proxy events are
/// detached again, the state is set to <see cref="ConnectionState.Error"/>, and the original
/// exception is rethrown.
/// </remarks>
/// <param name="ct">A token that cancels the connection attempt. It is checked once, before the attempt starts.</param>
/// <exception cref="OperationCanceledException"><paramref name="ct"/> was already cancelled when the attempt was about to start.</exception>
public async Task ConnectAsync(CancellationToken ct = default)
{
    if (_state == ConnectionState.Connected) return;

    // Honor cancellation before touching the connection state or the COM proxy, so a
    // cancelled caller does not leave the client in Connecting/Error.
    ct.ThrowIfCancellationRequested();

    SetState(ConnectionState.Connecting);
    try
    {
        _connectionHandle = await _staThread.RunAsync(() =>
        {
            AttachProxyEvents();
            return _proxy.Register(_config.ClientName);
        });
        Log.Information("MxAccess registered with handle {Handle}", _connectionHandle);
        SetState(ConnectionState.Connected);

        // Replay stored subscriptions
        await ReplayStoredSubscriptionsAsync();

        // Start probe if configured
        if (!string.IsNullOrWhiteSpace(_config.ProbeTag))
        {
            _probeTag = _config.ProbeTag;
            // Reset the staleness baseline so the monitor measures from this connection.
            _lastProbeValueTime = DateTime.UtcNow;
            await SubscribeInternalAsync(_probeTag!);
            Log.Information("Probe tag subscribed: {ProbeTag}", _probeTag);
        }
    }
    catch (Exception ex)
    {
        // Best-effort cleanup: a failure to detach must not mask the original error.
        try
        {
            await _staThread.RunAsync(DetachProxyEvents);
        }
        catch (Exception cleanupEx)
        {
            Log.Warning(cleanupEx, "Failed to detach proxy events after connection failure");
        }

        Log.Error(ex, "MxAccess connection failed");
        SetState(ConnectionState.Error, ex.Message);
        throw;
    }
}
/// <summary>
/// Tears down the runtime connection: unadvises and removes every active item, detaches the
/// proxy event handlers, unregisters the connection handle, and clears the handle and
/// pending-operation tables. The state always ends as <see cref="ConnectionState.Disconnected"/>,
/// even when cleanup fails. Does nothing when the client is already disconnected.
/// </summary>
public async Task DisconnectAsync()
{
    if (_state == ConnectionState.Disconnected)
        return;

    SetState(ConnectionState.Disconnecting);
    try
    {
        await _staThread.RunAsync(() =>
        {
            // UnAdvise + RemoveItem for all active subscriptions; a failure on one item is
            // logged and the remaining items are still processed.
            foreach (var subscription in _addressToHandle)
            {
                var address = subscription.Key;
                var handle = subscription.Value;
                try
                {
                    _proxy.UnAdviseSupervisory(_connectionHandle, handle);
                    _proxy.RemoveItem(_connectionHandle, handle);
                }
                catch (Exception ex)
                {
                    Log.Warning(ex, "Error cleaning up subscription for {Address}", address);
                }
            }

            // Unwire events before unregister
            DetachProxyEvents();

            try
            {
                _proxy.Unregister(_connectionHandle);
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Error during Unregister");
            }
        });

        _handleToAddress.Clear();
        _addressToHandle.Clear();
        _pendingReadsByAddress.Clear();
        _pendingWrites.Clear();
    }
    catch (Exception ex)
    {
        Log.Warning(ex, "Error during disconnect");
    }
    finally
    {
        SetState(ConnectionState.Disconnected);
    }
}
/// <summary>
/// Attempts to recover from a runtime fault by disconnecting and reconnecting the client.
/// </summary>
/// <remarks>
/// Failures are logged and reflected in the connection state
/// (<see cref="ConnectionState.Error"/>) rather than thrown to the caller.
/// </remarks>
public async Task ReconnectAsync()
{
    SetState(ConnectionState.Reconnecting);

    // Log the value returned by the atomic increment instead of re-reading the field, which
    // could already have been bumped again by a concurrent reconnect.
    var attempt = Interlocked.Increment(ref _reconnectCount);
    Log.Information("MxAccess reconnect attempt #{Count}", attempt);

    try
    {
        await DisconnectAsync();
        await ConnectAsync();
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Reconnect failed");
        SetState(ConnectionState.Error, ex.Message);
    }
}
/// <summary>
/// Hooks the data-change and write-complete handlers onto the proxy, unless they are already
/// attached according to <c>_proxyEventsAttached</c>.
/// </summary>
private void AttachProxyEvents()
{
    if (!_proxyEventsAttached)
    {
        _proxy.OnDataChange += HandleOnDataChange;
        _proxy.OnWriteComplete += HandleOnWriteComplete;
        _proxyEventsAttached = true;
    }
}
/// <summary>
/// Unhooks the data-change and write-complete handlers from the proxy; a no-op when
/// <c>_proxyEventsAttached</c> says they are not attached.
/// </summary>
private void DetachProxyEvents()
{
    if (_proxyEventsAttached)
    {
        _proxy.OnDataChange -= HandleOnDataChange;
        _proxy.OnWriteComplete -= HandleOnWriteComplete;
        _proxyEventsAttached = false;
    }
}
}
}

View File

@@ -0,0 +1,97 @@
using System;
using ArchestrA.MxAccess;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
public sealed partial class MxAccessClient
{
/// <summary>
/// COM event handler for MxAccess OnDataChange events.
/// Signature matches the ArchestrA.MxAccess ILMXProxyServerEvents interface.
/// </summary>
/// <remarks>
/// Resolves the item handle to its tag address, builds a <see cref="Vtq"/>, and fans it out in
/// this order: the stored subscription callback, any pending one-shot reads for the address,
/// then the global <c>OnTagValueChanged</c> event. Exceptions are logged, never rethrown.
/// </remarks>
private void HandleOnDataChange(
    int hLMXServerHandle,
    int phItemHandle,
    object pvItemValue,
    int pwItemQuality,
    object pftItemTimeStamp,
    ref MXSTATUS_PROXY[] ItemStatus)
{
    try
    {
        if (!_handleToAddress.TryGetValue(phItemHandle, out var address))
        {
            Log.Debug("OnDataChange for unknown handle {Handle}", phItemHandle);
            return;
        }

        var quality = QualityMapper.MapFromMxAccessQuality(pwItemQuality);

        // A failed first status entry carries a more specific quality than the raw quality word.
        if (ItemStatus != null && ItemStatus.Length > 0 && ItemStatus[0].success == 0)
        {
            quality = MxErrorCodes.MapToQuality(ItemStatus[0].detail);
        }

        var vtq = new Vtq(pvItemValue, ConvertTimestamp(pftItemTimeStamp), quality);

        // Any update on the probe tag refreshes the staleness timestamp.
        var isProbeTag = string.Equals(address, _probeTag, StringComparison.OrdinalIgnoreCase);
        if (isProbeTag)
        {
            _lastProbeValueTime = DateTime.UtcNow;
        }

        // Persistent subscription callback, if one is stored for this address.
        if (_storedSubscriptions.TryGetValue(address, out var subscriber))
        {
            subscriber(address, vtq);
        }

        // Complete every one-shot read currently waiting on this address.
        if (_pendingReadsByAddress.TryGetValue(address, out var waitingReads))
        {
            foreach (var waiter in waitingReads.Values)
            {
                waiter.TrySetResult(vtq);
            }
        }

        // Global handler
        OnTagValueChanged?.Invoke(address, vtq);
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error processing OnDataChange for handle {Handle}", phItemHandle);
    }
}
/// <summary>
/// COM event handler for MxAccess OnWriteComplete events. Completes the pending write registered
/// for the item handle: <see langword="true"/> unless the first status entry reports failure.
/// A missing or empty status array is treated as success. Exceptions are logged, never rethrown.
/// </summary>
private void HandleOnWriteComplete(
    int hLMXServerHandle,
    int phItemHandle,
    ref MXSTATUS_PROXY[] ItemStatus)
{
    try
    {
        // Nothing is waiting on this handle.
        if (!_pendingWrites.TryRemove(phItemHandle, out var pendingWrite))
            return;

        var failed = ItemStatus != null && ItemStatus.Length > 0 && ItemStatus[0].success == 0;
        if (!failed)
        {
            pendingWrite.TrySetResult(true);
            return;
        }

        var message = MxErrorCodes.GetMessage(ItemStatus![0].detail);
        Log.Warning("Write failed for handle {Handle}: {Message}", phItemHandle, message);
        pendingWrite.TrySetResult(false);
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Error processing OnWriteComplete for handle {Handle}", phItemHandle);
    }
}
/// <summary>
/// Normalizes a COM-supplied timestamp to UTC. A <see cref="DateTime"/> already marked UTC is
/// returned unchanged, any other <see cref="DateTime"/> goes through
/// <see cref="DateTime.ToUniversalTime"/>, and a value that is not a <see cref="DateTime"/>
/// falls back to the current UTC time.
/// </summary>
private static DateTime ConvertTimestamp(object pftItemTimeStamp) =>
    pftItemTimeStamp switch
    {
        DateTime { Kind: DateTimeKind.Utc } alreadyUtc => alreadyUtc,
        DateTime other => other.ToUniversalTime(),
        _ => DateTime.UtcNow
    };
}
}

View File

@@ -0,0 +1,78 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
public sealed partial class MxAccessClient
{
private Task? _monitorTask;
/// <summary>
/// Starts the background monitor that reconnects dropped sessions and watches the probe tag for staleness.
/// </summary>
/// <remarks>
/// If a cancellation source from an earlier start is still present, the previous monitor is
/// stopped first.
/// </remarks>
public void StartMonitor()
{
    if (_monitorCts != null)
        StopMonitor();

    var cts = new CancellationTokenSource();
    // Capture the token now: the lambda runs later on a pool thread, and reading the
    // _monitorCts field at that point could observe a source installed by a subsequent restart.
    var token = cts.Token;
    _monitorCts = cts;
    _monitorTask = Task.Run(() => MonitorLoopAsync(token));
    Log.Information("MxAccess monitor started (interval={Interval}s)", _config.MonitorIntervalSeconds);
}
/// <summary>
/// Signals the background monitor loop to stop and waits up to five seconds for it to finish.
/// Any exception raised while waiting is swallowed, and the task reference is cleared either way.
/// </summary>
public void StopMonitor()
{
    _monitorCts?.Cancel();

    try
    {
        _monitorTask?.Wait(TimeSpan.FromSeconds(5));
    }
    catch
    {
        /* timeout or faulted */
    }

    _monitorTask = null;
}
/// <summary>
/// Body of the background monitor. Each iteration waits for the configured interval and then
/// either reconnects a lost connection (when auto-reconnect is enabled) or, while connected with
/// a probe tag, forces a reconnect when no probe value has arrived within the stale threshold.
/// Errors inside an iteration are logged and the loop continues until <paramref name="ct"/> is cancelled.
/// </summary>
/// <param name="ct">Token that ends the loop.</param>
private async Task MonitorLoopAsync(CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        try
        {
            var interval = TimeSpan.FromSeconds(_config.MonitorIntervalSeconds);
            await Task.Delay(interval, ct);
        }
        catch (OperationCanceledException)
        {
            break;
        }

        try
        {
            var connectionLost = _state == ConnectionState.Disconnected || _state == ConnectionState.Error;
            if (connectionLost && _config.AutoReconnect)
            {
                Log.Information("Monitor: connection lost (state={State}), attempting reconnect", _state);
                await ReconnectAsync();
                continue;
            }

            // Staleness is only evaluated while connected with a probe tag in use.
            if (_state != ConnectionState.Connected || _probeTag == null)
                continue;

            var sinceLastProbe = DateTime.UtcNow - _lastProbeValueTime;
            if (sinceLastProbe.TotalSeconds > _config.ProbeStaleThresholdSeconds)
            {
                Log.Warning("Monitor: probe stale ({Elapsed:F0}s > {Threshold}s), forcing reconnect",
                    sinceLastProbe.TotalSeconds, _config.ProbeStaleThresholdSeconds);
                await ReconnectAsync();
            }
        }
        catch (Exception ex)
        {
            Log.Error(ex, "Monitor loop error");
        }
    }

    Log.Information("MxAccess monitor stopped");
}
}
}

View File

@@ -0,0 +1,166 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
public sealed partial class MxAccessClient
{
/// <summary>
/// Performs a one-shot read of a Galaxy tag by waiting for the next runtime data-change callback.
/// </summary>
/// <remarks>
/// A temporary item is added and advised for the duration of the read and is always unadvised
/// and removed afterwards. Reads are serialized through the operation semaphore.
/// </remarks>
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to read.</param>
/// <param name="ct">A token that cancels the read.</param>
/// <returns>The resulting VTQ value or a bad-quality fallback on timeout or failure.</returns>
public async Task<Vtq> ReadAsync(string fullTagReference, CancellationToken ct = default)
{
    if (_state != ConnectionState.Connected)
        return Vtq.Bad(Quality.BadNotConnected);

    await _operationSemaphore.WaitAsync(ct);
    try
    {
        using var scope = _metrics.BeginOperation("Read");

        // Run continuations asynchronously: the result is set from the data-change event
        // handler (or a timer callback), and the awaiting code — including this method's
        // cleanup — must not execute inline on that completing thread.
        var tcs = new TaskCompletionSource<Vtq>(TaskCreationOptions.RunContinuationsAsynchronously);

        var itemHandle = await _staThread.RunAsync(() =>
        {
            var h = _proxy.AddItem(_connectionHandle, fullTagReference);
            _proxy.AdviseSupervisory(_connectionHandle, h);
            return h;
        });

        // Register the waiter so the data-change handler can resolve it by address/handle.
        var pendingReads = _pendingReadsByAddress.GetOrAdd(fullTagReference,
            _ => new ConcurrentDictionary<int, TaskCompletionSource<Vtq>>());
        pendingReads[itemHandle] = tcs;
        _handleToAddress[itemHandle] = fullTagReference;

        try
        {
            // Caller cancellation and the read timeout both resolve the read as BadCommFailure.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(TimeSpan.FromSeconds(_config.ReadTimeoutSeconds));
            cts.Token.Register(() => tcs.TrySetResult(Vtq.Bad(Quality.BadCommFailure)));

            var result = await tcs.Task;
            if (result.Quality != Quality.Good)
                scope.SetSuccess(false);

            return result;
        }
        catch
        {
            scope.SetSuccess(false);
            return Vtq.Bad(Quality.BadCommFailure);
        }
        finally
        {
            // Drop the waiter, and the per-address table once it is empty.
            if (_pendingReadsByAddress.TryGetValue(fullTagReference, out var reads))
            {
                reads.TryRemove(itemHandle, out _);
                if (reads.IsEmpty)
                    _pendingReadsByAddress.TryRemove(fullTagReference, out _);
            }

            _handleToAddress.TryRemove(itemHandle, out _);

            try
            {
                await _staThread.RunAsync(() =>
                {
                    _proxy.UnAdviseSupervisory(_connectionHandle, itemHandle);
                    _proxy.RemoveItem(_connectionHandle, itemHandle);
                });
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Error cleaning up read subscription for {Address}", fullTagReference);
            }
        }
    }
    finally
    {
        _operationSemaphore.Release();
    }
}
/// <summary>
/// Writes a value to a Galaxy tag and waits for the runtime write-complete callback.
/// </summary>
/// <remarks>
/// A temporary item is added and advised for the duration of the write and is always unadvised
/// and removed afterwards. Writes are serialized through the operation semaphore. A timeout,
/// cancellation, or exception yields <see langword="false"/> rather than throwing.
/// </remarks>
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to write.</param>
/// <param name="value">The value to send to the runtime.</param>
/// <param name="ct">A token that cancels the write.</param>
/// <returns><see langword="true" /> when the runtime acknowledges success; otherwise, <see langword="false" />.</returns>
public async Task<bool> WriteAsync(string fullTagReference, object value, CancellationToken ct = default)
{
    if (_state != ConnectionState.Connected) return false;

    await _operationSemaphore.WaitAsync(ct);
    try
    {
        using var scope = _metrics.BeginOperation("Write");

        var itemHandle = await _staThread.RunAsync(() =>
        {
            var h = _proxy.AddItem(_connectionHandle, fullTagReference);
            _proxy.AdviseSupervisory(_connectionHandle, h);
            return h;
        });

        _handleToAddress[itemHandle] = fullTagReference;

        // Run continuations asynchronously: the result is set from the write-complete event
        // handler (or a timer callback), and the awaiting code — including this method's
        // cleanup — must not execute inline on that completing thread.
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pendingWrites[itemHandle] = tcs;

        try
        {
            await _staThread.RunAsync(() => _proxy.Write(_connectionHandle, itemHandle, value, -1));

            // Caller cancellation and the write timeout both resolve the write as failed.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(TimeSpan.FromSeconds(_config.WriteTimeoutSeconds));
            cts.Token.Register(() =>
            {
                Log.Warning("Write timed out for {Address} after {Timeout}s", fullTagReference,
                    _config.WriteTimeoutSeconds);
                tcs.TrySetResult(false);
            });

            var success = await tcs.Task;
            if (!success)
                scope.SetSuccess(false);

            return success;
        }
        catch (Exception ex)
        {
            scope.SetSuccess(false);
            Log.Error(ex, "Write failed for {Address}", fullTagReference);
            return false;
        }
        finally
        {
            _pendingWrites.TryRemove(itemHandle, out _);
            _handleToAddress.TryRemove(itemHandle, out _);

            try
            {
                await _staThread.RunAsync(() =>
                {
                    _proxy.UnAdviseSupervisory(_connectionHandle, itemHandle);
                    _proxy.RemoveItem(_connectionHandle, itemHandle);
                });
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Error cleaning up write subscription for {Address}", fullTagReference);
            }
        }
    }
    finally
    {
        _operationSemaphore.Release();
    }
}
}
}

View File

@@ -0,0 +1,107 @@
using System;
using System.Threading.Tasks;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
public sealed partial class MxAccessClient
{
/// <summary>
/// Stores <paramref name="callback"/> as the persistent subscription for a Galaxy tag and, when
/// the client is connected and the tag has no runtime item yet, activates it immediately.
/// A callback already stored for the same tag is replaced.
/// </summary>
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to monitor.</param>
/// <param name="callback">The callback that should receive runtime value changes.</param>
public async Task SubscribeAsync(string fullTagReference, Action<string, Vtq> callback)
{
    // Always record the callback, even when the runtime item cannot be created right now.
    _storedSubscriptions[fullTagReference] = callback;

    var shouldActivateNow = _state == ConnectionState.Connected
                            && !_addressToHandle.ContainsKey(fullTagReference);
    if (shouldActivateNow)
    {
        await SubscribeInternalAsync(fullTagReference);
    }
}
/// <summary>
/// Drops the stored subscription callback for a tag and, unless the tag is the probe tag,
/// removes its handle mappings and — while connected — unadvises and removes the runtime item.
/// Errors from the runtime teardown are logged and not propagated.
/// </summary>
/// <param name="fullTagReference">The fully qualified Galaxy tag reference to stop monitoring.</param>
public async Task UnsubscribeAsync(string fullTagReference)
{
    _storedSubscriptions.TryRemove(fullTagReference, out _);

    // Don't unsubscribe the probe tag
    var isProbeTag = string.Equals(fullTagReference, _probeTag, StringComparison.OrdinalIgnoreCase);
    if (isProbeTag)
        return;

    if (!_addressToHandle.TryRemove(fullTagReference, out var itemHandle))
        return;

    _handleToAddress.TryRemove(itemHandle, out _);

    // Without a live connection there is no runtime item to tear down.
    if (_state != ConnectionState.Connected)
        return;

    await _staThread.RunAsync(() =>
    {
        try
        {
            _proxy.UnAdviseSupervisory(_connectionHandle, itemHandle);
            _proxy.RemoveItem(_connectionHandle, itemHandle);
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Error unsubscribing {Address}", fullTagReference);
        }
    });
}
/// <summary>
/// Adds and advises a runtime item for <paramref name="address"/> and records its handle. If
/// another caller registered a handle for the same address in the meantime, the item created
/// here is unadvised and removed again and the existing registration is kept.
/// </summary>
/// <param name="address">Tag reference to subscribe.</param>
/// <exception cref="Exception">Any failure is logged, marked on the metrics scope, and rethrown.</exception>
private async Task SubscribeInternalAsync(string address)
{
    if (_addressToHandle.ContainsKey(address))
        return;

    using var scope = _metrics.BeginOperation("Subscribe");
    try
    {
        var newHandle = await _staThread.RunAsync(() =>
        {
            var added = _proxy.AddItem(_connectionHandle, address);
            _proxy.AdviseSupervisory(_connectionHandle, added);
            return added;
        });

        var winningHandle = _addressToHandle.GetOrAdd(address, newHandle);
        if (winningHandle == newHandle)
        {
            _handleToAddress[newHandle] = address;
            Log.Debug("Subscribed to {Address} (handle={Handle})", address, newHandle);
        }
        else
        {
            // Lost the race: release the duplicate item created above.
            await _staThread.RunAsync(() =>
            {
                _proxy.UnAdviseSupervisory(_connectionHandle, newHandle);
                _proxy.RemoveItem(_connectionHandle, newHandle);
            });
        }
    }
    catch (Exception ex)
    {
        scope.SetSuccess(false);
        Log.Error(ex, "Failed to subscribe to {Address}", address);
        throw;
    }
}
private async Task ReplayStoredSubscriptionsAsync()
{
foreach (var kvp in _storedSubscriptions)
try
{
await SubscribeInternalAsync(kvp.Key);
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to replay subscription for {Address}", kvp.Key);
}
Log.Information("Replayed {Count} stored subscriptions", _storedSubscriptions.Count);
}
}
}

View File

@@ -0,0 +1,125 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Host.Configuration;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
using ZB.MOM.WW.OtOpcUa.Host.Metrics;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
/// <summary>
///     Core MXAccess client implementing IMxAccessClient via IMxProxy abstraction.
///     Split across partial classes: Connection, Subscription, ReadWrite, EventHandlers, Monitor.
///     (MXA-001 through MXA-009)
/// </summary>
public sealed partial class MxAccessClient : IMxAccessClient
{
    private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>();

    // Handle mappings (address -> item handle is case-insensitive on the address)
    private readonly ConcurrentDictionary<string, int> _addressToHandle = new(StringComparer.OrdinalIgnoreCase);
    private readonly ConcurrentDictionary<int, string> _handleToAddress = new();

    private readonly MxAccessConfiguration _config;
    private readonly PerformanceMetrics _metrics;

    // Sized from MxAccessConfiguration.MaxConcurrentOperations in the constructor.
    private readonly SemaphoreSlim _operationSemaphore;

    // Pending reads, grouped by address (case-insensitive).
    // NOTE(review): the inner int key is presumably a per-read identifier — confirm in the ReadWrite partial.
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<int, TaskCompletionSource<Vtq>>>
        _pendingReadsByAddress
            = new(StringComparer.OrdinalIgnoreCase);

    // Pending writes
    private readonly ConcurrentDictionary<int, TaskCompletionSource<bool>> _pendingWrites = new();

    private readonly IMxProxy _proxy;
    private readonly StaComThread _staThread;

    // Subscription storage (address -> callback, case-insensitive)
    private readonly ConcurrentDictionary<string, Action<string, Vtq>> _storedSubscriptions
        = new(StringComparer.OrdinalIgnoreCase);

    private int _connectionHandle;

    // Set to 1 by the first Dispose call so later calls become no-ops.
    private int _disposeSignaled;

    private DateTime _lastProbeValueTime = DateTime.UtcNow;
    private CancellationTokenSource? _monitorCts;

    // Probe
    private string? _probeTag;
    private bool _proxyEventsAttached;
    private int _reconnectCount;
    private volatile ConnectionState _state = ConnectionState.Disconnected;

    /// <summary>
    ///     Initializes a new MXAccess client around the STA thread, COM proxy abstraction, and runtime throttling settings.
    /// </summary>
    /// <param name="staThread">The STA thread used to marshal COM interactions.</param>
    /// <param name="proxy">The COM proxy abstraction used to talk to the runtime.</param>
    /// <param name="config">The runtime timeout, throttling, and reconnect settings.</param>
    /// <param name="metrics">The metrics collector used to time MXAccess operations.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config" /> is <c>null</c>.</exception>
    public MxAccessClient(StaComThread staThread, IMxProxy proxy, MxAccessConfiguration config,
        PerformanceMetrics metrics)
    {
        // config is dereferenced below, so fail with a descriptive exception instead of a NullReferenceException.
        _config = config ?? throw new ArgumentNullException(nameof(config));
        _staThread = staThread;
        _proxy = proxy;
        _metrics = metrics;
        _operationSemaphore =
            new SemaphoreSlim(_config.MaxConcurrentOperations, _config.MaxConcurrentOperations);
    }

    /// <summary>
    ///     Gets the current runtime connection state for the MXAccess client.
    /// </summary>
    public ConnectionState State => _state;

    /// <summary>
    ///     Gets the number of stored tag subscriptions. Stored subscriptions are counted whether or not
    ///     they are currently advised against the runtime.
    /// </summary>
    public int ActiveSubscriptionCount => _storedSubscriptions.Count;

    /// <summary>
    ///     Gets the number of reconnect attempts performed since the client was created.
    /// </summary>
    public int ReconnectCount => Volatile.Read(ref _reconnectCount);

    /// <summary>
    ///     Occurs when the MXAccess connection state changes.
    /// </summary>
    public event EventHandler<ConnectionStateChangedEventArgs>? ConnectionStateChanged;

    /// <summary>
    ///     Occurs when a subscribed runtime tag publishes a new value.
    /// </summary>
    public event Action<string, Vtq>? OnTagValueChanged;

    /// <summary>
    ///     Cancels monitoring and disconnects the runtime session before releasing local resources.
    ///     Calls after the first are ignored.
    /// </summary>
    public void Dispose()
    {
        // A second call would otherwise cancel an already-disposed CancellationTokenSource
        // and attempt another disconnect.
        if (Interlocked.Exchange(ref _disposeSignaled, 1) != 0)
        {
            return;
        }

        try
        {
            _monitorCts?.Cancel();

            // IDisposable is synchronous, so the asynchronous disconnect is awaited by blocking here.
            DisconnectAsync().GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            // Dispose must not throw; report the failure and continue releasing resources.
            Log.Warning(ex, "Error during MxAccessClient dispose");
        }
        finally
        {
            _operationSemaphore.Dispose();
            _monitorCts?.Dispose();
        }
    }

    /// <summary>
    ///     Records a connection state transition, logs it, and raises <see cref="ConnectionStateChanged" />.
    ///     Does nothing when <paramref name="newState" /> equals the current state.
    /// </summary>
    /// <param name="newState">The state to transition to.</param>
    /// <param name="message">Optional detail included in the log entry and the event arguments.</param>
    private void SetState(ConnectionState newState, string message = "")
    {
        // The compare-and-assign below is not synchronized beyond the volatile field access.
        var previous = _state;
        if (previous == newState)
        {
            return;
        }

        _state = newState;
        Log.Information("MxAccess state: {Previous} → {Current} {Message}", previous, newState, message);
        ConnectionStateChanged?.Invoke(this, new ConnectionStateChangedEventArgs(previous, newState, message));
    }
}
}

View File

@@ -0,0 +1,130 @@
using System;
using System.Runtime.InteropServices;
using ArchestrA.MxAccess;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
/// <summary>
///     Wraps the real ArchestrA.MxAccess.LMXProxyServer COM object, forwarding calls to IMxProxy.
///     Uses strongly-typed interop — same pattern as the reference LmxProxy implementation. (MXA-001)
/// </summary>
public sealed class MxProxyAdapter : IMxProxy
{
    private LMXProxyServer? _lmxProxy;

    /// <summary>
    ///     Gets the registered COM proxy, or throws a descriptive error when no session is registered.
    /// </summary>
    /// <exception cref="InvalidOperationException">No COM proxy session is currently registered.</exception>
    private LMXProxyServer Proxy =>
        _lmxProxy ?? throw new InvalidOperationException(
            "LMXProxyServer is not registered; call Register before using the proxy");

    /// <summary>
    ///     Occurs when the COM proxy publishes a live data-change callback for a subscribed Galaxy attribute.
    /// </summary>
    public event MxDataChangeHandler? OnDataChange;

    /// <summary>
    ///     Occurs when the COM proxy confirms completion of a write request.
    /// </summary>
    public event MxWriteCompleteHandler? OnWriteComplete;

    /// <summary>
    ///     Creates and registers the COM proxy session that backs live MXAccess operations.
    /// </summary>
    /// <param name="clientName">The client name reported to the Wonderware runtime.</param>
    /// <returns>The runtime connection handle assigned by the COM server.</returns>
    /// <exception cref="InvalidOperationException">The COM server returned a handle that is not positive.</exception>
    public int Register(string clientName)
    {
        var proxy = new LMXProxyServer();
        _lmxProxy = proxy;
        try
        {
            proxy.OnDataChange += ProxyOnDataChange;
            proxy.OnWriteComplete += ProxyOnWriteComplete;

            var handle = proxy.Register(clientName);
            if (handle <= 0)
                throw new InvalidOperationException($"LMXProxyServer.Register returned invalid handle: {handle}");

            return handle;
        }
        catch
        {
            // Registration failed, so Unregister will never be called for this instance:
            // detach the handlers and release the COM object here instead of leaking it.
            _lmxProxy = null;
            try
            {
                proxy.OnDataChange -= ProxyOnDataChange;
                proxy.OnWriteComplete -= ProxyOnWriteComplete;
            }
            catch
            {
                // Ignore detach failures so the original registration error is the one that propagates.
            }
            finally
            {
                Marshal.ReleaseComObject(proxy);
            }

            throw;
        }
    }

    /// <summary>
    ///     Unregisters the COM proxy session and releases the underlying COM object.
    ///     Does nothing when no session is registered.
    /// </summary>
    /// <param name="handle">The runtime connection handle returned by <see cref="Register(string)" />.</param>
    public void Unregister(int handle)
    {
        var proxy = _lmxProxy;
        if (proxy == null)
        {
            return;
        }

        try
        {
            proxy.OnDataChange -= ProxyOnDataChange;
            proxy.OnWriteComplete -= ProxyOnWriteComplete;
            proxy.Unregister(handle);
        }
        finally
        {
            // Release the COM object even when detaching or unregistering throws.
            Marshal.ReleaseComObject(proxy);
            _lmxProxy = null;
        }
    }

    /// <summary>
    ///     Resolves a Galaxy attribute reference into a runtime item handle through the COM proxy.
    /// </summary>
    /// <param name="handle">The runtime connection handle.</param>
    /// <param name="address">The fully qualified Galaxy attribute reference.</param>
    /// <returns>The item handle assigned by the COM proxy.</returns>
    public int AddItem(int handle, string address)
    {
        return Proxy.AddItem(handle, address);
    }

    /// <summary>
    ///     Removes an item handle from the active COM proxy session.
    /// </summary>
    /// <param name="handle">The runtime connection handle.</param>
    /// <param name="itemHandle">The item handle to remove.</param>
    public void RemoveItem(int handle, int itemHandle)
    {
        Proxy.RemoveItem(handle, itemHandle);
    }

    /// <summary>
    ///     Enables supervisory callbacks for the specified runtime item.
    /// </summary>
    /// <param name="handle">The runtime connection handle.</param>
    /// <param name="itemHandle">The item handle to monitor.</param>
    public void AdviseSupervisory(int handle, int itemHandle)
    {
        Proxy.AdviseSupervisory(handle, itemHandle);
    }

    /// <summary>
    ///     Disables supervisory callbacks for the specified runtime item.
    /// </summary>
    /// <param name="handle">The runtime connection handle.</param>
    /// <param name="itemHandle">The item handle to stop monitoring.</param>
    public void UnAdviseSupervisory(int handle, int itemHandle)
    {
        // Forwards to the COM proxy's UnAdvise call.
        Proxy.UnAdvise(handle, itemHandle);
    }

    /// <summary>
    ///     Writes a value to the specified runtime item through the COM proxy.
    /// </summary>
    /// <param name="handle">The runtime connection handle.</param>
    /// <param name="itemHandle">The item handle to write.</param>
    /// <param name="value">The value to send to the runtime.</param>
    /// <param name="securityClassification">The Wonderware security classification applied to the write.</param>
    public void Write(int handle, int itemHandle, object value, int securityClassification)
    {
        Proxy.Write(handle, itemHandle, value, securityClassification);
    }

    /// <summary>
    ///     Relays a data-change callback from the COM proxy to <see cref="OnDataChange" /> subscribers.
    /// </summary>
    private void ProxyOnDataChange(int hLMXServerHandle, int phItemHandle, object pvItemValue,
        int pwItemQuality, object pftItemTimeStamp, ref MXSTATUS_PROXY[] ItemStatus)
    {
        OnDataChange?.Invoke(hLMXServerHandle, phItemHandle, pvItemValue, pwItemQuality, pftItemTimeStamp,
            ref ItemStatus);
    }

    /// <summary>
    ///     Relays a write-complete callback from the COM proxy to <see cref="OnWriteComplete" /> subscribers.
    /// </summary>
    private void ProxyOnWriteComplete(int hLMXServerHandle, int phItemHandle, ref MXSTATUS_PROXY[] ItemStatus)
    {
        OnWriteComplete?.Invoke(hLMXServerHandle, phItemHandle, ref ItemStatus);
    }
}
}

View File

@@ -0,0 +1,309 @@
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
/// <summary>
///     Dedicated STA thread with a raw Win32 message pump for COM interop.
///     All MxAccess COM objects must be created and called on this thread. (MXA-001)
/// </summary>
public sealed class StaComThread : IDisposable
{
    private const uint WM_APP = 0x8000;
    private const uint PM_NOREMOVE = 0x0000;

    // Thread messages understood by the pump loop in ThreadEntry.
    private const uint WorkAvailableMessage = WM_APP; // drain the work queue
    private const uint ShutdownMessage = WM_APP + 1; // drain the work queue, then quit the pump

    private static readonly ILogger Log = Serilog.Log.ForContext<StaComThread>();
    private static readonly TimeSpan PumpLogInterval = TimeSpan.FromMinutes(5);
    private readonly TaskCompletionSource<bool> _ready = new();
    private readonly Thread _thread;
    private readonly ConcurrentQueue<WorkItem> _workItems = new();

    // Pump statistics; only touched from the pump thread.
    private long _appMessages;
    private long _dispatchedMessages;
    private DateTime _lastLogTime;
    private long _totalMessages;
    private long _workItemsExecuted;

    // Read by callers on arbitrary threads, hence volatile.
    private volatile bool _disposed;
    private volatile uint _nativeThreadId;
    private volatile bool _pumpExited;

    /// <summary>
    ///     Initializes a dedicated STA thread wrapper for Wonderware COM interop.
    ///     The thread is created here but does not run until <see cref="Start" /> is called.
    /// </summary>
    public StaComThread()
    {
        _thread = new Thread(ThreadEntry)
        {
            Name = "MxAccess-STA",
            IsBackground = true
        };
        _thread.SetApartmentState(ApartmentState.STA);
    }

    /// <summary>
    ///     Gets a value indicating whether the STA thread is running and able to accept work.
    /// </summary>
    public bool IsRunning => _nativeThreadId != 0 && !_disposed && !_pumpExited;

    /// <summary>
    ///     Stops the STA thread and releases the message-pump resources used for COM interop.
    ///     Work items still queued afterwards are faulted. Calls after the first are ignored.
    /// </summary>
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;

        try
        {
            if (_nativeThreadId != 0 && !_pumpExited)
                PostThreadMessage(_nativeThreadId, ShutdownMessage, IntPtr.Zero, IntPtr.Zero);

            // Joining a thread that was never started throws, and joining from the pump thread
            // itself can only time out, so skip the wait in both cases.
            var started = (_thread.ThreadState & ThreadState.Unstarted) == 0;
            if (started && !ReferenceEquals(Thread.CurrentThread, _thread))
                _thread.Join(TimeSpan.FromSeconds(5));
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Error shutting down STA COM thread");
        }

        DrainAndFaultQueue();
        Log.Information("STA COM thread stopped");
    }

    /// <summary>
    ///     Starts the STA thread and waits until its message pump is ready for COM work.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The instance has already been disposed.</exception>
    public void Start()
    {
        if (_disposed) throw new ObjectDisposedException(nameof(StaComThread));

        _thread.Start();

        // Blocks until ThreadEntry signals readiness; rethrows if the thread failed during startup.
        _ready.Task.GetAwaiter().GetResult();
        Log.Information("STA COM thread started (ThreadId={ThreadId})", _thread.ManagedThreadId);
    }

    /// <summary>
    ///     Queues an action to execute on the STA thread.
    /// </summary>
    /// <param name="action">The work item to execute on the STA thread.</param>
    /// <returns>A task that completes when the action has finished executing.</returns>
    /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The pump has exited or the thread has not been started.</exception>
    public Task RunAsync(Action action)
    {
        // Share the queueing logic with the generic overload; the boolean result is a placeholder.
        return RunAsync(() =>
        {
            action();
            return true;
        });
    }

    /// <summary>
    ///     Queues a function to execute on the STA thread and returns its result.
    /// </summary>
    /// <typeparam name="T">The result type produced by the function.</typeparam>
    /// <param name="func">The work item to execute on the STA thread.</param>
    /// <returns>A task that completes with the function result.</returns>
    /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The pump has exited or the thread has not been started.</exception>
    public Task<T> RunAsync<T>(Func<T> func)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(StaComThread));
        if (_pumpExited) throw new InvalidOperationException("STA COM thread pump has exited");

        // Posting to thread id 0 always fails and would mark the pump as exited for good,
        // so reject work that arrives before Start has run.
        var threadId = _nativeThreadId;
        if (threadId == 0) throw new InvalidOperationException("STA COM thread has not been started");

        // NOTE(review): the completion source uses default options, so awaiting callers may resume
        // synchronously on the pump thread. Confirm no caller depends on that before changing it.
        var tcs = new TaskCompletionSource<T>();
        _workItems.Enqueue(new WorkItem(
            () =>
            {
                try
                {
                    tcs.TrySetResult(func());
                }
                catch (Exception ex)
                {
                    tcs.TrySetException(ex);
                }
            },
            ex => tcs.TrySetException(ex)));

        // Re-check _pumpExited after enqueueing: the pump may have finished its final drain
        // between the check above and the enqueue, in which case nobody else will ever
        // complete this item even if the post itself succeeded.
        if (!PostThreadMessage(threadId, WorkAvailableMessage, IntPtr.Zero, IntPtr.Zero) || _pumpExited)
        {
            _pumpExited = true;
            DrainAndFaultQueue();
        }

        return tcs.Task;
    }

    /// <summary>
    ///     Thread body: publishes the native thread id, signals readiness, and runs the message pump
    ///     until it receives a quit message or GetMessage reports an error.
    /// </summary>
    private void ThreadEntry()
    {
        try
        {
            _nativeThreadId = GetCurrentThreadId();

            // Touch the message queue before signalling readiness so that thread messages
            // posted by RunAsync have a queue to land in.
            MSG msg;
            PeekMessage(out msg, IntPtr.Zero, 0, 0, PM_NOREMOVE);

            _ready.TrySetResult(true);
            _lastLogTime = DateTime.UtcNow;
            Log.Debug("STA message pump entering loop");

            // GetMessage returns 0 on WM_QUIT and -1 on error; both end the loop.
            while (GetMessage(out msg, IntPtr.Zero, 0, 0) > 0)
            {
                _totalMessages++;

                if (msg.message == WorkAvailableMessage)
                {
                    _appMessages++;
                    DrainQueue();
                }
                else if (msg.message == ShutdownMessage)
                {
                    // Run whatever is already queued, then ask the pump to stop.
                    DrainQueue();
                    PostQuitMessage(0);
                }
                else
                {
                    _dispatchedMessages++;
                    TranslateMessage(ref msg);
                    DispatchMessage(ref msg);
                }

                LogPumpStatsIfDue();
            }

            Log.Information(
                "STA message pump exited (Total={Total}, App={App}, Dispatched={Dispatched}, WorkItems={WorkItems})",
                _totalMessages, _appMessages, _dispatchedMessages, _workItemsExecuted);
        }
        catch (Exception ex)
        {
            Log.Error(ex, "STA COM thread crashed");

            // Unblocks Start if the failure happened before readiness was signalled.
            _ready.TrySetException(ex);
        }
        finally
        {
            // Set the flag before draining so RunAsync callers that enqueue afterwards notice it.
            _pumpExited = true;
            DrainAndFaultQueue();
        }
    }

    /// <summary>
    ///     Executes every queued work item on the calling (pump) thread.
    /// </summary>
    private void DrainQueue()
    {
        while (_workItems.TryDequeue(out var workItem))
        {
            _workItemsExecuted++;
            try
            {
                workItem.Execute();
            }
            catch (Exception ex)
            {
                Log.Error(ex, "Unhandled exception in STA work item");
            }
        }
    }

    /// <summary>
    ///     Completes every queued work item with an <see cref="InvalidOperationException" /> instead of running it.
    /// </summary>
    private void DrainAndFaultQueue()
    {
        var faultException = new InvalidOperationException("STA COM thread pump has exited");
        while (_workItems.TryDequeue(out var workItem))
        {
            try
            {
                workItem.Fault(faultException);
            }
            catch
            {
                // Faulting a TCS should not throw, but guard against it
            }
        }
    }

    /// <summary>
    ///     Writes pump statistics to the debug log once per <see cref="PumpLogInterval" />.
    /// </summary>
    private void LogPumpStatsIfDue()
    {
        var now = DateTime.UtcNow;
        if (now - _lastLogTime < PumpLogInterval) return;

        Log.Debug(
            "STA pump alive: Total={Total}, App={App}, Dispatched={Dispatched}, WorkItems={WorkItems}, Pending={Pending}",
            _totalMessages, _appMessages, _dispatchedMessages, _workItemsExecuted, _workItems.Count);
        _lastLogTime = now;
    }

    /// <summary>
    ///     A queued unit of work: one delegate to run it, one to fault its task when it cannot be run.
    /// </summary>
    private sealed class WorkItem
    {
        public WorkItem(Action execute, Action<Exception> fault)
        {
            Execute = execute;
            Fault = fault;
        }

        /// <summary>Runs the work and completes the associated task.</summary>
        public Action Execute { get; }

        /// <summary>Completes the associated task with the supplied exception without running the work.</summary>
        public Action<Exception> Fault { get; }
    }

    #region Win32 PInvoke

    // Managed layout of the Win32 MSG structure filled in by GetMessage/PeekMessage.
    [StructLayout(LayoutKind.Sequential)]
    private struct MSG
    {
        public IntPtr hwnd;
        public uint message;
        public IntPtr wParam;
        public IntPtr lParam;
        public uint time;
        public POINT pt;
    }

    // Managed layout of the Win32 POINT structure embedded in MSG.
    [StructLayout(LayoutKind.Sequential)]
    private struct POINT
    {
        public int x;
        public int y;
    }

    [DllImport("user32.dll")]
    private static extern int GetMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool TranslateMessage(ref MSG lpMsg);

    [DllImport("user32.dll")]
    private static extern IntPtr DispatchMessage(ref MSG lpMsg);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool PostThreadMessage(uint idThread, uint Msg, IntPtr wParam, IntPtr lParam);

    [DllImport("user32.dll")]
    private static extern void PostQuitMessage(int nExitCode);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax,
        uint wRemoveMsg);

    [DllImport("kernel32.dll")]
    private static extern uint GetCurrentThreadId();

    #endregion
}
}