Files
mxaccessgw/src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs
T
Joseph Doherty 53e3973209 Resolve Worker-001, Worker-002, Worker-003 code-review findings
Worker-001: WnWrapAlarmConsumer armed a System.Threading.Timer whose OnPoll
callback ran GetXmlCurrentAlarms2 on a thread-pool thread against the
Apartment-threaded wnwrap COM object, which can deadlock on cross-apartment
marshaling. Removed the pollTimer/pollIntervalMs fields, OnPoll, the
poll-interval constructor parameter, and the timer arm/disposal. Polls are
driven externally by the STA via StaRuntime.InvokeAsync(PollOnce).

Worker-002: RunHeartbeatLoopAsync delayed a full HeartbeatInterval before
the first heartbeat. Restructured so the first beat is sent immediately on
entering the loop and the delay applies only between subsequent beats.

Worker-003: ProcessCommandAsync silently returned without a reply when
_state was not a command-serving state after dispatch. Both drop sites now
log a WorkerCommandResultDropped diagnostic with correlation_id via
IWorkerLogger; _state is now volatile.

Three pre-existing tests that asserted strict frame ordering were updated to
tolerate an interleaved first heartbeat (Worker-002 consequence).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 20:59:46 -04:00

519 lines
21 KiB
C#

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.InteropServices;
using System.Xml;
using WNWRAPCONSUMERLib;
namespace MxGateway.Worker.MxAccess;
/// <summary>
/// Production <see cref="IMxAccessAlarmConsumer"/> backed by AVEVA's
/// standalone <c>WNWRAPCONSUMERLib.wwAlarmConsumerClass</c> COM object
/// (CLSID <c>{7AB52E5F-36B2-4A30-AE46-952A746F667C}</c>, hosted by
/// <c>C:\Program Files (x86)\Common Files\ArchestrA\wnwrapConsumer.dll</c>).
/// </summary>
/// <remarks>
/// <para>
/// Replaces the earlier <c>AlarmClientConsumer</c> built on
/// <c>aaAlarmManagedClient.AlarmClient</c>, which crashed in
/// <c>GetHighPriAlarm</c> with <c>ArgumentOutOfRangeException</c>
/// (FILETIME→DateTime auto-marshaling on AVEVA's sentinel timestamps).
/// The wnwrap surface returns the alarm record as a BSTR XML string
/// via <c>GetXmlCurrentAlarms2</c>; timestamps arrive as ASCII
/// <c>DATE</c> + <c>TIME</c> + <c>GMTOFFSET</c> + <c>DSTADJUST</c>
/// fields and never touch the .NET DateTime marshaler. See
/// <c>docs/AlarmClientDiscovery.md</c> "Option A — captured" for
/// the discovery and the captured payload schema.
/// </para>
/// <para>
/// <strong>Threading.</strong> The wnwrap CLSID is registered with
/// <c>ThreadingModel=Apartment</c>. The consumer must be created
/// and operated from an STA thread; the worker's
/// <see cref="MxAccessStaSession"/> runs an STA pump that hosts it.
/// The consumer owns <em>no</em> internal timer: every COM call
/// (<c>Subscribe</c>, <c>PollOnce</c>, <c>AcknowledgeBy*</c>) must
/// be invoked on the STA that created the consumer. Polling cadence
/// is driven externally by the worker's STA via
/// <c>StaRuntime.InvokeAsync(() =&gt; consumer.PollOnce())</c>, which
/// keeps every <c>GetXmlCurrentAlarms2</c> call on the apartment that
/// owns the COM object. A thread-pool timer would call the COM API
/// off the owning STA and can deadlock on cross-apartment marshaling
/// when the STA is not pumping messages, so no such timer exists.
/// </para>
/// </remarks>
public sealed class WnWrapAlarmConsumer : IMxAccessAlarmConsumer
{
// Identity strings handed to the wnwrap consumer by Subscribe during the
// InitializeConsumer / RegisterConsumer calls.
private const string DefaultProductName = "OtOpcUa.MxGateway";
private const string DefaultApplicationName = "OtOpcUa.MxGateway.Worker";
private const string DefaultVersion = "1.0";
// Fallback for the maxAlmCnt argument of GetXmlCurrentAlarms2 when the
// constructor is given a non-positive value.
private const int DefaultMaxAlarmsPerFetch = 1024;
// Lock object used by Subscribe, SnapshotActiveAlarms, PollOnce and Dispose
// around latestSnapshot and the lifecycle fields below.
private readonly object syncRoot = new object();
// Result of the most recent non-empty poll, keyed by alarm GUID. PollOnce
// clears and refills it after diffing against the new payload.
private readonly Dictionary<Guid, MxAlarmSnapshotRecord> latestSnapshot =
new Dictionary<Guid, MxAlarmSnapshotRecord>();
// Upper bound passed to GetXmlCurrentAlarms2 on every poll.
private readonly int maxAlarmsPerFetch;
// Primary consumer: subscribed and polled. Set to null by Dispose.
private wwAlarmConsumerClass? client;
// Ack-only consumer created by Subscribe and used by AcknowledgeByName.
// Null until Subscribe has run and again after Dispose.
private wwAlarmConsumerClass? ackClient;
// Subscription expression recorded by a successful Subscribe. It is written
// but not read anywhere else in this class.
private string subscriptionExpression = string.Empty;
// True once Subscribe has completed; PollOnce is a no-op until then.
private bool subscribed;
// Set by Dispose while holding syncRoot; public members check it on entry.
private bool disposed;
/// <summary>
/// Default production entry point. Instantiates the wnwrap COM consumer
/// on the calling thread, which therefore has to be the worker's STA,
/// and uses the built-in per-poll alarm limit. No polling is started
/// here: the host is expected to call <see cref="PollOnce"/> on that same
/// STA (for example through <c>StaRuntime.InvokeAsync</c>) so every COM
/// call stays inside the apartment that owns the object.
/// </summary>
public WnWrapAlarmConsumer()
    : this(
        client: new wwAlarmConsumerClass(),
        maxAlarmsPerFetch: DefaultMaxAlarmsPerFetch)
{
}
/// <summary>
/// Explicit construction, also used as the test seam.
/// </summary>
/// <param name="client">
/// The COM consumer to drive. Must not be <see langword="null"/>.
/// </param>
/// <param name="maxAlarmsPerFetch">
/// Maximum number of alarms requested per poll. Zero or negative values
/// fall back to the built-in default.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="client"/> is <see langword="null"/>.
/// </exception>
public WnWrapAlarmConsumer(
    wwAlarmConsumerClass client,
    int maxAlarmsPerFetch)
{
    if (client is null)
    {
        throw new ArgumentNullException(nameof(client));
    }

    this.client = client;

    if (maxAlarmsPerFetch > 0)
    {
        this.maxAlarmsPerFetch = maxAlarmsPerFetch;
    }
    else
    {
        this.maxAlarmsPerFetch = DefaultMaxAlarmsPerFetch;
    }
}
// Raised synchronously from PollOnce, on the thread that called it, once for
// every alarm whose state differs from the previously stored snapshot (or
// that was not present in it). Handlers run after the internal lock has been
// released.
/// <inheritdoc />
public event EventHandler<MxAlarmTransitionEvent>? AlarmTransitionEmitted;
/// <inheritdoc />
public void Subscribe(string subscription)
{
    // Overview: runs the v1 Initialize/Register/Subscribe lifecycle on the
    // primary consumer, applies SetXmlAlarmQuery so polling works, then
    // provisions a second, ack-only consumer. Must be called on the STA
    // that owns the COM objects. Throws ArgumentNullException for a null
    // subscription, ObjectDisposedException after Dispose, and
    // InvalidOperationException for a repeated call or any non-zero COM
    // status.
    if (subscription is null) throw new ArgumentNullException(nameof(subscription));

    lock (syncRoot)
    {
        // Checked under the lock because Dispose flips the flag while
        // holding syncRoot.
        if (disposed) throw new ObjectDisposedException(nameof(WnWrapAlarmConsumer));

        if (subscribed)
        {
            throw new InvalidOperationException(
                "WnWrapAlarmConsumer.Subscribe was called more than once; " +
                "wwAlarmConsumerClass.Subscribe replaces the previous filter and is not idempotent.");
        }

        wwAlarmConsumerClass com = client
            ?? throw new ObjectDisposedException(nameof(WnWrapAlarmConsumer));

        // Use the IwwAlarmConsumer (v1) prefix-named methods for the
        // lifecycle. Empirically (live dev-rig 2026-05-01) this is the
        // only path that lets AlarmAckByName succeed afterwards. The
        // v2 Initialize/Register/Subscribe methods on the class
        // succeed (return 0) but acks against that consumer state
        // return -55. The v1 prefix path is what WIN-911-style code
        // uses against the same wnwrap library.
        int init = com.IwwAlarmConsumer_InitializeConsumer(DefaultApplicationName);
        if (init != 0)
        {
            throw new InvalidOperationException(
                $"wwAlarmConsumer.InitializeConsumer returned non-zero status {init}.");
        }

        // hWnd=0: wnwrap supports a pull-based model, so no message pump
        // is required. GetXmlCurrentAlarms2 is polled by the worker's STA
        // via StaRuntime.InvokeAsync(() => consumer.PollOnce()); this type
        // owns no internal timer.
        int reg = RegisterV1Consumer(com, DefaultApplicationName);
        if (reg != 0)
        {
            throw new InvalidOperationException(
                $"wwAlarmConsumer.RegisterConsumer returned non-zero status {reg}.");
        }

        int sub = SubscribeV1Summary(com, subscription);
        if (sub != 0)
        {
            throw new InvalidOperationException(
                $"wwAlarmConsumer.Subscribe('{subscription}') returned non-zero status {sub}.");
        }

        // Empirically required: even though the round-trip echo of
        // SetXmlAlarmQuery is mangled (see docs/AlarmClientDiscovery.md),
        // calling it is necessary for subsequent GetXmlCurrentAlarms2
        // calls to succeed. Without it, GetXmlCurrentAlarms2 returns
        // E_FAIL (HRESULT 0x80004005) on the first poll. SetXmlAlarmQuery
        // also breaks AlarmAckByName on the same consumer (rejects with
        // -55), so a separate ack-only consumer is provisioned below
        // that gets only Initialize/Register/Subscribe (no SetXmlAlarmQuery).
        string xmlQuery = ComposeXmlAlarmQuery(subscription);
        com.SetXmlAlarmQuery(xmlQuery);

        // Provision a parallel COM consumer for ack calls. It runs the
        // v1 lifecycle (Initialize/Register/Subscribe) only; without
        // SetXmlAlarmQuery, AlarmAckByName succeeds. This consumer is
        // never polled.
        //
        // The consumer is built in a local and only published to the
        // ackClient field once every step has succeeded. If setup fails
        // (non-zero status or a COM exception) it is released here, so a
        // failed Subscribe neither leaves a half-initialized consumer
        // reachable through AcknowledgeByName nor leaks it when the
        // caller retries.
        wwAlarmConsumerClass ack = new wwAlarmConsumerClass();
        bool ackReady = false;
        try
        {
            string ackApplicationName = DefaultApplicationName + ".ack";
            int ackInit = ack.IwwAlarmConsumer_InitializeConsumer(ackApplicationName);
            int ackReg = RegisterV1Consumer(ack, ackApplicationName);
            int ackSub = SubscribeV1Summary(ack, subscription);
            if (ackInit != 0 || ackReg != 0 || ackSub != 0)
            {
                throw new InvalidOperationException(
                    $"Ack consumer setup returned non-zero status: " +
                    $"Initialize={ackInit}, Register={ackReg}, Subscribe={ackSub}.");
            }

            ackReady = true;
        }
        finally
        {
            if (!ackReady)
            {
                ReleaseConsumerCom(ack);
            }
        }

        ackClient = ack;
        subscriptionExpression = subscription;
        subscribed = true;
    }
}

// Registers a consumer through the v1 interface with no window handle,
// using this worker's product name and version. Returns the COM status.
private static int RegisterV1Consumer(wwAlarmConsumerClass consumer, string applicationName)
{
    return consumer.IwwAlarmConsumer_RegisterConsumer(
        hWnd: 0,
        szProductName: DefaultProductName,
        szApplicationName: applicationName,
        szVersion: DefaultVersion);
}

// Issues the v1 summary subscription (priorities 1..999, newest first,
// active-now filter) shared by the polling and ack consumers. Returns the
// COM status.
private static int SubscribeV1Summary(wwAlarmConsumerClass consumer, string subscription)
{
    return consumer.IwwAlarmConsumer_Subscribe(
        szSubscription: subscription,
        wFromPri: 1,
        wToPri: 999,
        QueryType: eQueryType.qtSummary,
        SortFlags: eSortFlags.sfReturnNewestFirst,
        FilterMask: eAlarmFilterState.asAlarmActiveNow,
        FilterSpecification: eAlarmFilterState.asAlarmActiveNow);
}
/// <inheritdoc />
public int AcknowledgeByGuid(
    Guid alarmGuid,
    string ackComment,
    string ackOperatorName,
    string ackOperatorNode,
    string ackOperatorDomain,
    string ackOperatorFullName)
{
    if (disposed)
    {
        throw new ObjectDisposedException(nameof(WnWrapAlarmConsumer));
    }

    // NOTE(review): this acknowledges through the primary (polled) consumer,
    // whereas AcknowledgeByName goes through the ack-only consumer because
    // SetXmlAlarmQuery was observed to break AlarmAckByName. Confirm on a
    // live system that AlarmAckByGUID is not affected in the same way.
    wwAlarmConsumerClass? primary = client;
    if (primary is null)
    {
        throw new ObjectDisposedException(nameof(WnWrapAlarmConsumer));
    }

    // wnwrap takes its own GUID interop struct; ToVbGuid copies the
    // System.Guid fields into it.
    VBGUID interopGuid = ToVbGuid(alarmGuid);

    // Null strings are normalised to empty before crossing into COM.
    string comment = ackComment ?? string.Empty;
    string operatorName = ackOperatorName ?? string.Empty;
    string operatorNode = ackOperatorNode ?? string.Empty;
    string operatorDomain = ackOperatorDomain ?? string.Empty;
    string operatorFullName = ackOperatorFullName ?? string.Empty;

    return primary.AlarmAckByGUID(
        AlmGUID: interopGuid,
        szComment: comment,
        szOprName: operatorName,
        szNode: operatorNode,
        szDomainName: operatorDomain,
        szOprFullName: operatorFullName);
}
/// <inheritdoc />
public int AcknowledgeByName(
    string alarmName,
    string providerName,
    string groupName,
    string ackComment,
    string ackOperatorName,
    string ackOperatorNode,
    string ackOperatorDomain,
    string ackOperatorFullName)
{
    if (disposed)
    {
        throw new ObjectDisposedException(nameof(WnWrapAlarmConsumer));
    }

    // Acks go through the parallel ack-only consumer, which never had
    // SetXmlAlarmQuery applied; see docs/AlarmClientDiscovery.md
    // "Option A — captured" for the empirical justification.
    wwAlarmConsumerClass? acker = ackClient;
    if (acker is null)
    {
        throw new InvalidOperationException(
            "Cannot acknowledge: WnWrapAlarmConsumer was disposed or has not been subscribed yet.");
    }

    // Empirically (live dev-rig 2026-05-01) the 8-argument
    // IwwAlarmConsumer2 AlarmAckByName returns -55 on this AVEVA build,
    // while the legacy 6-argument IwwAlarmConsumer.AlarmAckByName works.
    // The 6-argument form has no slot for operator domain or operator
    // full name, so those two values are accepted for contract
    // compatibility and intentionally not forwarded to AVEVA. The
    // discards below make that explicit.
    _ = ackOperatorDomain;
    _ = ackOperatorFullName;

    string name = alarmName ?? string.Empty;
    string provider = providerName ?? string.Empty;
    string group = groupName ?? string.Empty;
    string comment = ackComment ?? string.Empty;
    string operatorName = ackOperatorName ?? string.Empty;
    string operatorNode = ackOperatorNode ?? string.Empty;

    return acker.AlarmAckByName(
        szAlarmName: name,
        szProviderName: provider,
        szGroupName: group,
        szComment: comment,
        szOprName: operatorName,
        szNode: operatorNode);
}
/// <inheritdoc />
public IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms()
{
    if (disposed)
    {
        throw new ObjectDisposedException(nameof(WnWrapAlarmConsumer));
    }

    lock (syncRoot)
    {
        // Copy out under the lock so the caller gets a list that later
        // polls cannot mutate. Only records still in alarm (acknowledged
        // or not) are returned.
        List<MxAlarmSnapshotRecord> result = new List<MxAlarmSnapshotRecord>();
        foreach (KeyValuePair<Guid, MxAlarmSnapshotRecord> entry in latestSnapshot)
        {
            MxAlarmSnapshotRecord candidate = entry.Value;
            MxAlarmStateKind state = candidate.State;
            bool inAlarm = state == MxAlarmStateKind.UnackAlm
                || state == MxAlarmStateKind.AckAlm;
            if (!inAlarm)
            {
                continue;
            }

            result.Add(candidate);
        }

        return result;
    }
}
/// <summary>
/// Performs one synchronous poll of the wnwrap consumer, diffs the result
/// against the previously stored snapshot and raises
/// <see cref="AlarmTransitionEmitted"/> for every alarm that is new or
/// whose state changed. Does nothing before <see cref="Subscribe"/> has
/// completed or after disposal, and leaves the stored snapshot untouched
/// when the COM call returns an empty payload.
/// </summary>
/// <remarks>
/// Must be called on the STA that owns the COM object. This type has no
/// timer of its own: calling an apartment-threaded object from a
/// thread-pool thread can block indefinitely on cross-apartment
/// marshaling when the STA is not pumping messages.
/// </remarks>
public void PollOnce()
{
    wwAlarmConsumerClass? source;
    lock (syncRoot)
    {
        if (disposed || !subscribed)
        {
            return;
        }

        source = client;
    }

    if (source is null)
    {
        return;
    }

    // The COM call happens outside the lock.
    source.GetXmlCurrentAlarms2(
        maxAlmCnt: maxAlarmsPerFetch,
        vartCurrentXmlAlarms: out object payload);

    string xml = payload?.ToString() ?? string.Empty;
    if (xml.Length == 0)
    {
        return;
    }

    Dictionary<Guid, MxAlarmSnapshotRecord> fresh = ParseSnapshotXml(xml);
    List<MxAlarmTransitionEvent> pending = new List<MxAlarmTransitionEvent>();

    lock (syncRoot)
    {
        foreach (KeyValuePair<Guid, MxAlarmSnapshotRecord> entry in fresh)
        {
            MxAlarmSnapshotRecord current = entry.Value;
            MxAlarmStateKind priorState;

            if (latestSnapshot.TryGetValue(entry.Key, out MxAlarmSnapshotRecord? known))
            {
                priorState = known.State;
                if (priorState == current.State)
                {
                    // Same state as last poll: nothing to report.
                    continue;
                }
            }
            else
            {
                // First sighting of this alarm.
                priorState = MxAlarmStateKind.Unspecified;
            }

            pending.Add(new MxAlarmTransitionEvent
            {
                Record = current,
                PreviousState = priorState,
            });
        }

        // Replace the stored snapshot wholesale with the new poll result.
        latestSnapshot.Clear();
        foreach (KeyValuePair<Guid, MxAlarmSnapshotRecord> entry in fresh)
        {
            latestSnapshot.Add(entry.Key, entry.Value);
        }
    }

    // Handlers are invoked after the lock has been released.
    EventHandler<MxAlarmTransitionEvent>? subscribers = AlarmTransitionEmitted;
    if (pending.Count == 0 || subscribers is null)
    {
        return;
    }

    foreach (MxAlarmTransitionEvent change in pending)
    {
        subscribers(this, change);
    }
}
/// <summary>
/// Turns the XML text produced by <c>GetXmlCurrentAlarms2</c> into a
/// dictionary of alarm records keyed by alarm GUID. Every
/// <c>/ALARM_RECORDS/ALARM</c> element becomes one record; elements whose
/// <c>GUID</c> child cannot be parsed are skipped without error, and a
/// later element with the same GUID replaces an earlier one.
/// </summary>
/// <param name="xml">The XML payload; null, empty or whitespace yields an empty result.</param>
/// <returns>The parsed records, possibly empty.</returns>
/// <exception cref="XmlException">The payload is not well-formed XML.</exception>
public static Dictionary<Guid, MxAlarmSnapshotRecord> ParseSnapshotXml(string xml)
{
    Dictionary<Guid, MxAlarmSnapshotRecord> parsed =
        new Dictionary<Guid, MxAlarmSnapshotRecord>();

    if (string.IsNullOrWhiteSpace(xml))
    {
        return parsed;
    }

    XmlDocument document = new XmlDocument();
    document.LoadXml(xml);

    XmlNodeList? alarms = document.SelectNodes("/ALARM_RECORDS/ALARM");
    if (alarms is null)
    {
        return parsed;
    }

    foreach (XmlNode alarm in alarms)
    {
        if (!TryParseHexGuid(TextOf(alarm, "GUID"), out Guid alarmId))
        {
            continue;
        }

        // Timestamps arrive as separate text fields and are combined by
        // the mapper.
        string datePart = TextOf(alarm, "DATE");
        string timePart = TextOf(alarm, "TIME");
        int gmtOffset = ParseInt(TextOf(alarm, "GMTOFFSET"));
        int dstAdjust = ParseInt(TextOf(alarm, "DSTADJUST"));
        DateTime transitionUtc = AlarmRecordTransitionMapper.ParseTransitionTimestampUtc(
            datePart, timePart, gmtOffset, dstAdjust);

        MxAlarmSnapshotRecord record = new MxAlarmSnapshotRecord
        {
            AlarmGuid = alarmId,
            TransitionTimestampUtc = transitionUtc,
            ProviderNode = TextOf(alarm, "PROVIDER_NODE"),
            ProviderName = TextOf(alarm, "PROVIDER_NAME"),
            Group = TextOf(alarm, "GROUP"),
            TagName = TextOf(alarm, "TAGNAME"),
            Type = TextOf(alarm, "TYPE"),
            Value = TextOf(alarm, "VALUE"),
            Limit = TextOf(alarm, "LIMIT"),
            Priority = ParseInt(TextOf(alarm, "PRIORITY")),
            State = AlarmRecordTransitionMapper.ParseStateKind(TextOf(alarm, "STATE")),
            OperatorNode = TextOf(alarm, "OPERATOR_NODE"),
            OperatorName = TextOf(alarm, "OPERATOR_NAME"),
            AlarmComment = TextOf(alarm, "ALARM_COMMENT"),
        };

        parsed[alarmId] = record;
    }

    return parsed;
}
// Returns the inner text of the first child of `parent` selected by
// `childName`, or an empty string when there is no such child.
private static string TextOf(XmlNode parent, string childName)
{
    XmlNode? child = parent.SelectSingleNode(childName);
    if (child is null)
    {
        return string.Empty;
    }

    return child.InnerText ?? string.Empty;
}
// Culture-invariant integer parse that yields 0 for anything that is not
// a valid integer.
private static int ParseInt(string text)
{
    if (int.TryParse(text, NumberStyles.Integer, CultureInfo.InvariantCulture, out int parsed))
    {
        return parsed;
    }

    return 0;
}
/// <summary>
/// Parses the <c>GUID</c> field of a wnwrap alarm record. The field is
/// normally 32 hex characters without dashes
/// (e.g. <c>"BCC4705395424D65BDAABCDEA6A32A73"</c>). Any form that
/// <see cref="Guid.TryParse(string, out Guid)"/> accepts is taken as-is;
/// otherwise a 32-character value is re-cut into the 8-4-4-4-12 layout
/// and parsed again. Surrounding whitespace is ignored.
/// </summary>
/// <param name="hex">The text to parse; may be <see langword="null"/>.</param>
/// <param name="guid">The parsed value, or <see cref="Guid.Empty"/> on failure.</param>
/// <returns><see langword="true"/> when a GUID was parsed.</returns>
public static bool TryParseHexGuid(string? hex, out Guid guid)
{
    guid = Guid.Empty;

    if (string.IsNullOrWhiteSpace(hex))
    {
        return false;
    }

    string candidate = hex!.Trim();
    if (Guid.TryParse(candidate, out guid))
    {
        return true;
    }

    if (candidate.Length != 32)
    {
        return false;
    }

    // Insert the dashes right-to-left so the earlier offsets stay valid.
    string dashed = candidate
        .Insert(20, "-")
        .Insert(16, "-")
        .Insert(12, "-")
        .Insert(8, "-");

    return Guid.TryParse(dashed, out guid);
}
/// <summary>
/// Compose the XML payload <c>SetXmlAlarmQuery</c> expects from a
/// canonical subscription expression
/// (<c>\\&lt;machine&gt;\Galaxy!&lt;area&gt;</c>). The wnwrap
/// consumer mangles the round-trip but evidently still needs the
/// call — without it <c>GetXmlCurrentAlarms2</c> fails with
/// E_FAIL.
/// </summary>
/// <remarks>
/// Parsing is best-effort so the worker does not fail to start on an
/// unusual expression: a missing node falls back to the local machine
/// name, a missing or empty provider falls back to <c>Galaxy</c>, and a
/// missing group omits the <c>GROUP</c> element. All three values are
/// XML-escaped before being written, so characters such as <c>&amp;</c>
/// or <c>&lt;</c> in the subscription cannot produce malformed XML.
/// </remarks>
/// <param name="subscription">The subscription expression; may be null or empty.</param>
/// <returns>The query document as a single-line XML string.</returns>
internal static string ComposeXmlAlarmQuery(string subscription)
{
    string node = Environment.MachineName;
    string provider = "Galaxy";
    string group = string.Empty;

    if (!string.IsNullOrEmpty(subscription))
    {
        // Strip leading backslashes from "\\<node>\..." form.
        string remainder = subscription.TrimStart('\\');
        int slash = remainder.IndexOf('\\');
        if (slash > 0)
        {
            node = remainder.Substring(0, slash);
            remainder = remainder.Substring(slash + 1);
        }

        // "<provider>!<group>"; everything is the provider when there is
        // no '!'. An empty provider part (e.g. "!Area", or nothing left
        // after the node) keeps the default instead of overwriting it
        // with an empty or '!'-prefixed value.
        int bang = remainder.IndexOf('!');
        string providerPart = remainder;
        if (bang >= 0)
        {
            providerPart = remainder.Substring(0, bang);
            group = remainder.Substring(bang + 1);
        }

        if (providerPart.Length > 0)
        {
            provider = providerPart;
        }
    }

    System.Text.StringBuilder sb = new System.Text.StringBuilder();
    sb.Append("<QUERIES FROM_PRIORITY=\"1\" TO_PRIORITY=\"999\" ALARM_STATE=\"ALL\" DISPLAY_MODE=\"Summary\">");
    sb.Append("<QUERY>");
    sb.Append("<NODE>").Append(System.Security.SecurityElement.Escape(node)).Append("</NODE>");
    sb.Append("<PROVIDER>").Append(System.Security.SecurityElement.Escape(provider)).Append("</PROVIDER>");
    if (!string.IsNullOrEmpty(group))
    {
        sb.Append("<GROUP>").Append(System.Security.SecurityElement.Escape(group)).Append("</GROUP>");
    }
    sb.Append("</QUERY>");
    sb.Append("</QUERIES>");
    return sb.ToString();
}
// Copies a System.Guid into wnwrap's VBGUID interop struct field by field,
// reading the values from Guid.ToByteArray(): a 32-bit Data1, 16-bit Data2
// and Data3, followed by the eight Data4 bytes.
private static VBGUID ToVbGuid(Guid g)
{
    byte[] raw = g.ToByteArray();

    byte[] trailing = new byte[8];
    Array.Copy(raw, 8, trailing, 0, trailing.Length);

    VBGUID converted = new VBGUID();
    converted.Data1 = BitConverter.ToInt32(raw, 0);
    converted.Data2 = BitConverter.ToInt16(raw, 4);
    converted.Data3 = BitConverter.ToInt16(raw, 6);
    converted.Data4 = trailing;
    return converted;
}
/// <inheritdoc />
public void Dispose()
{
    wwAlarmConsumerClass? primary = null;
    wwAlarmConsumerClass? acker = null;

    lock (syncRoot)
    {
        if (!disposed)
        {
            disposed = true;

            // Detach both consumers from the instance while holding the
            // lock; the COM teardown itself runs after the lock is
            // released.
            primary = client;
            client = null;
            acker = ackClient;
            ackClient = null;
        }
        else
        {
            // Second and later calls are no-ops.
            return;
        }
    }

    ReleaseConsumerCom(primary);
    ReleaseConsumerCom(acker);
}
// Best-effort teardown of one wnwrap consumer: deregister, uninitialize,
// then drop the runtime callable wrapper. Each step is attempted even if
// the previous one threw, and every failure is deliberately ignored so
// that disposal never throws.
private static void ReleaseConsumerCom(wwAlarmConsumerClass? consumer)
{
    if (consumer is null)
    {
        return;
    }

    try
    {
        consumer.DeregisterConsumer();
    }
    catch
    {
        // Ignored: nothing useful can be done about a failed deregister.
    }

    try
    {
        consumer.UninitializeConsumer();
    }
    catch
    {
        // Ignored for the same reason.
    }

    if (!Marshal.IsComObject(consumer))
    {
        return;
    }

    try
    {
        Marshal.FinalReleaseComObject(consumer);
    }
    catch
    {
        // Ignored: the wrapper is being abandoned either way.
    }
}
}