worker(alarms): COM-backed LmxSubtagAlarmSource advising alarm subtags

This commit is contained in:
Joseph Doherty
2026-06-13 09:24:09 -04:00
parent c75920c620
commit 4c0e14fc5d
2 changed files with 543 additions and 0 deletions
@@ -0,0 +1,209 @@
using System;
using System.Collections.Generic;
using ZB.MOM.WW.MxGateway.Worker.MxAccess;
namespace ZB.MOM.WW.MxGateway.Worker.Tests.MxAccess;
/// <summary>
/// Unit-test coverage for <see cref="LmxSubtagAlarmSource"/>'s advise/write
/// sequencing and its <c>OnDataChange</c> normalization. The actual
/// <c>LMXProxyServerClass</c> COM event subscription cannot be exercised
/// without a live MXAccess install, so these tests drive the source through
/// its internal <see cref="IMxAccessServer"/> seam and call
/// <c>HandleDataChange</c> directly to simulate a COM callback — exactly the
/// boundary <c>MxAccessBaseEventSink.OnDataChange</c> uses for the
/// per-session pipeline. End-to-end COM delivery is covered by the
/// Skip-gated alarm live smoke tests.
/// </summary>
public sealed class LmxSubtagAlarmSourceTests
{
private const int FakeServerHandle = 7;
/// <summary>Verifies the production constructor rejects a null factory.</summary>
[Fact]
public void Constructor_NullFactory_Throws()
{
Assert.Throws<ArgumentNullException>(() => new LmxSubtagAlarmSource(factory: null!));
}
/// <summary>
/// Verifies <see cref="LmxSubtagAlarmSource.Advise"/> calls AddItem then
/// Advise once per distinct address, and is idempotent on a repeated
/// address.
/// </summary>
[Fact]
public void Advise_AddsAndAdvisesEachAddressOnce()
{
var server = new RecordingMxAccessServer();
using var source = new LmxSubtagAlarmSource(server, FakeServerHandle);
source.Advise(new[] { "Tank1.Alarm.Subtag", "Tank2.Alarm.Subtag" });
// Re-advising an already-advised address is a no-op.
source.Advise(new[] { "Tank1.Alarm.Subtag" });
Assert.Equal(
new[] { "Tank1.Alarm.Subtag", "Tank2.Alarm.Subtag" },
server.AddedItems);
Assert.Equal(2, server.AdviseCount);
// Every advise targeted the supplied server handle.
Assert.All(server.AdvisedServerHandles, h => Assert.Equal(FakeServerHandle, h));
}
/// <summary>
/// Verifies a simulated <c>OnDataChange</c> for an advised item handle
/// raises <see cref="LmxSubtagAlarmSource.ValueChanged"/> with the
/// address that was advised and the delivered value.
/// </summary>
[Fact]
public void HandleDataChange_RaisesValueChangedWithAdvisedAddress()
{
var server = new RecordingMxAccessServer();
using var source = new LmxSubtagAlarmSource(server, FakeServerHandle);
source.Advise(new[] { "Tank1.Alarm.Subtag" });
int itemHandle = server.LastItemHandleFor("Tank1.Alarm.Subtag");
SubtagValueChange? received = null;
source.ValueChanged += (_, change) => received = change;
source.HandleDataChange(itemHandle, pvItemValue: 42, pftItemTimeStamp: null);
Assert.NotNull(received);
Assert.Equal("Tank1.Alarm.Subtag", received!.ItemAddress);
Assert.Equal(42, received.Value);
Assert.Equal(DateTimeKind.Utc, received.TimestampUtc.Kind);
}
/// <summary>
/// Verifies <c>OnDataChange</c> for an unknown item handle is ignored
/// (no <see cref="LmxSubtagAlarmSource.ValueChanged"/> raised).
/// </summary>
[Fact]
public void HandleDataChange_UnknownHandle_DoesNotRaise()
{
var server = new RecordingMxAccessServer();
using var source = new LmxSubtagAlarmSource(server, FakeServerHandle);
bool raised = false;
source.ValueChanged += (_, _) => raised = true;
source.HandleDataChange(phItemHandle: 999, pvItemValue: 1, pftItemTimeStamp: null);
Assert.False(raised);
}
/// <summary>
/// Verifies <see cref="LmxSubtagAlarmSource.Write"/> adds the item when
/// it was not previously advised and writes with user id 0.
/// </summary>
[Fact]
public void Write_AddsItemWhenUnknownAndWrites()
{
var server = new RecordingMxAccessServer();
using var source = new LmxSubtagAlarmSource(server, FakeServerHandle);
source.Write("Tank1.Alarm.AckComment", "acknowledged");
Assert.Contains("Tank1.Alarm.AckComment", server.AddedItems);
Assert.Single(server.Writes);
RecordingMxAccessServer.WriteRecord write = server.Writes[0];
Assert.Equal(FakeServerHandle, write.ServerHandle);
Assert.Equal("acknowledged", write.Value);
Assert.Equal(0, write.UserId);
}
/// <summary>
/// Verifies <see cref="LmxSubtagAlarmSource.Write"/> reuses an existing
/// item handle (no duplicate AddItem) when the address was already
/// advised.
/// </summary>
[Fact]
public void Write_ReusesHandleForAdvisedAddress()
{
var server = new RecordingMxAccessServer();
using var source = new LmxSubtagAlarmSource(server, FakeServerHandle);
source.Advise(new[] { "Tank1.Alarm.Subtag" });
int adviseAddCount = server.AddedItems.Count;
source.Write("Tank1.Alarm.Subtag", true);
// No second AddItem for the already-advised address.
Assert.Equal(adviseAddCount, server.AddedItems.Count);
Assert.Single(server.Writes);
}
/// <summary>
/// Recording <see cref="IMxAccessServer"/> test double that captures the
/// AddItem/Advise/Write/UnAdvise/RemoveItem/Unregister calls
/// <see cref="LmxSubtagAlarmSource"/> makes and hands out monotonically
/// increasing item handles.
/// </summary>
private sealed class RecordingMxAccessServer : IMxAccessServer
{
private readonly Dictionary<string, int> handlesByAddress = new(StringComparer.Ordinal);
private int nextItemHandle = 100;
public List<string> AddedItems { get; } = new();
public int AdviseCount { get; private set; }
public List<int> AdvisedServerHandles { get; } = new();
public List<WriteRecord> Writes { get; } = new();
public int LastItemHandleFor(string itemAddress) => handlesByAddress[itemAddress];
public int Register(string clientName) => FakeServerHandle;
public void Unregister(int serverHandle)
{
}
public int AddItem(int serverHandle, string itemDefinition)
{
AddedItems.Add(itemDefinition);
int handle = nextItemHandle++;
handlesByAddress[itemDefinition] = handle;
return handle;
}
public int AddItem2(int serverHandle, string itemDefinition, string itemContext)
=> AddItem(serverHandle, itemDefinition);
public void RemoveItem(int serverHandle, int itemHandle)
{
}
public void Advise(int serverHandle, int itemHandle)
{
AdviseCount++;
AdvisedServerHandles.Add(serverHandle);
}
public void UnAdvise(int serverHandle, int itemHandle)
{
}
public void AdviseSupervisory(int serverHandle, int itemHandle)
{
}
public void Write(int serverHandle, int itemHandle, object? value, int userId)
=> Writes.Add(new WriteRecord(serverHandle, itemHandle, value, userId));
public void Write2(int serverHandle, int itemHandle, object? value, object? timestamp, int userId)
{
}
public void WriteSecured(int serverHandle, int itemHandle, int currentUserId, int verifierUserId, object? value)
{
}
public void WriteSecured2(int serverHandle, int itemHandle, int currentUserId, int verifierUserId, object? value, object? timestamp)
{
}
internal sealed record WriteRecord(int ServerHandle, int ItemHandle, object? Value, int UserId);
}
}
@@ -0,0 +1,334 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using ArchestrA.MxAccess;
namespace ZB.MOM.WW.MxGateway.Worker.MxAccess;
/// <summary>
/// Production <see cref="ISubtagAlarmSource"/> backed by its own MXAccess
/// <c>LMXProxyServerClass</c> COM object. It advises a set of alarm subtag
/// item addresses, normalizes each <c>OnDataChange</c> COM callback into a
/// <see cref="SubtagValueChange"/>, and supports writing a value (e.g. an
/// ack-comment) to a subtag.
/// </summary>
/// <remarks>
/// <para>
/// Like <see cref="WnWrapAlarmConsumer"/>, this type owns its
/// <em>own</em> MXAccess proxy-server COM object rather than sharing the
/// per-session item pipeline's <see cref="MxAccessSession"/>. That keeps
/// the subtag-fallback subscription isolated from the session's own
/// <c>Register</c>/<c>AddItem</c>/<c>Advise</c> bookkeeping: advising or
/// tearing down alarm subtags never perturbs the client's subscriptions.
/// </para>
/// <para>
/// <strong>Threading.</strong> The <c>LMXProxyServerClass</c> CLSID is
/// apartment-threaded and delivers <c>OnDataChange</c> via the STA
/// message pump. Every method on this type — the lazy <see cref="Advise"/>
/// creation, <see cref="Write"/>, and <see cref="Dispose"/> — must be
/// invoked on the worker's STA that created the source, exactly as
/// <see cref="MxAccessSession"/> and <see cref="WnWrapAlarmConsumer"/>
/// require. The <c>OnDataChange</c> COM event also arrives on that STA.
/// Because every call is confined to the single owning STA, no lock is
/// taken around the COM object or the handle maps: a lock here would risk
/// deadlocking the STA against a re-entrant pump callback, and the STA
/// affinity already serializes access.
/// </para>
/// </remarks>
public sealed class LmxSubtagAlarmSource : ISubtagAlarmSource
{
private const string DefaultClientName = "OtOpcUa.ZB.MOM.WW.MxGateway.Worker.SubtagAlarms";
private readonly IMxAccessComObjectFactory? factory;
private readonly string clientName;
private readonly Dictionary<string, int> itemHandlesByAddress =
new Dictionary<string, int>(StringComparer.Ordinal);
private readonly Dictionary<int, string> addressesByItemHandle =
new Dictionary<int, string>();
private object? mxAccessComObject;
private IMxAccessServer? server;
private LMXProxyServerClass? comEventSource;
private int serverHandle;
private bool registered;
private bool disposed;
/// <summary>
/// Production constructor. The MXAccess COM object is created lazily on
/// the first <see cref="Advise"/> call (which must run on the worker's
/// STA) via the supplied <paramref name="factory"/>.
/// </summary>
/// <param name="factory">Factory that creates the MXAccess proxy-server COM object.</param>
/// <param name="clientName">
/// Optional MXAccess client/registration name used for
/// <see cref="IMxAccessServer.Register"/>; a worker-specific default is
/// used when null or whitespace. Mirrors the registration name
/// <see cref="MxAccessSession.Register"/> accepts.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
public LmxSubtagAlarmSource(
IMxAccessComObjectFactory factory,
string? clientName = null)
{
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
this.clientName = string.IsNullOrWhiteSpace(clientName) ? DefaultClientName : clientName!;
}
/// <summary>
/// Test-only seam: constructs a source over a caller-supplied
/// <see cref="IMxAccessServer"/> with a pre-registered server handle,
/// bypassing the live COM factory. Exposed via
/// <c>InternalsVisibleTo("ZB.MOM.WW.MxGateway.Worker.Tests")</c> so tests can
/// exercise <see cref="Advise"/>/<see cref="Write"/> and
/// <see cref="HandleDataChange"/> without a live <c>LMXProxyServerClass</c>.
/// No COM event source is wired in this mode — tests drive
/// <see cref="HandleDataChange"/> directly to simulate an
/// <c>OnDataChange</c> callback.
/// </summary>
/// <param name="server">The server abstraction to drive.</param>
/// <param name="serverHandle">The already-registered server handle to use.</param>
internal LmxSubtagAlarmSource(
IMxAccessServer server,
int serverHandle)
{
this.factory = null;
this.clientName = DefaultClientName;
this.server = server ?? throw new ArgumentNullException(nameof(server));
this.serverHandle = serverHandle;
this.registered = true;
}
/// <inheritdoc />
public event EventHandler<SubtagValueChange>? ValueChanged;
/// <inheritdoc />
/// <remarks>
/// Idempotent per address: an address already advised is skipped. The
/// MXAccess COM object is created and the client registered on the first
/// call. Must be invoked on the worker's STA.
/// </remarks>
public void Advise(IReadOnlyCollection<string> itemAddresses)
{
if (itemAddresses is null)
{
throw new ArgumentNullException(nameof(itemAddresses));
}
ThrowIfDisposed();
EnsureRegistered();
IMxAccessServer mxServer = server!;
foreach (string? itemAddress in itemAddresses)
{
if (string.IsNullOrWhiteSpace(itemAddress))
{
continue;
}
if (itemHandlesByAddress.ContainsKey(itemAddress!))
{
continue;
}
int itemHandle = mxServer.AddItem(serverHandle, itemAddress!);
mxServer.Advise(serverHandle, itemHandle);
itemHandlesByAddress[itemAddress!] = itemHandle;
addressesByItemHandle[itemHandle] = itemAddress!;
}
}
/// <inheritdoc />
/// <remarks>
/// Adds the item if it has not been added yet — the write may target a
/// subtag (e.g. the ack-comment) that was never advised — then writes
/// with MXAccess user id 0 (unsecured). Must be invoked on the worker's
/// STA.
/// </remarks>
public void Write(string itemAddress, object? value)
{
if (itemAddress is null)
{
throw new ArgumentNullException(nameof(itemAddress));
}
ThrowIfDisposed();
EnsureRegistered();
IMxAccessServer mxServer = server!;
if (!itemHandlesByAddress.TryGetValue(itemAddress, out int itemHandle))
{
itemHandle = mxServer.AddItem(serverHandle, itemAddress);
itemHandlesByAddress[itemAddress] = itemHandle;
addressesByItemHandle[itemHandle] = itemAddress;
}
mxServer.Write(serverHandle, itemHandle, value, userId: 0);
}
/// <summary>
/// Normalizes an MXAccess <c>OnDataChange</c> callback into a
/// <see cref="SubtagValueChange"/> and raises <see cref="ValueChanged"/>
/// when the item handle maps to an advised address. Unknown handles are
/// ignored. Exposed <c>internal</c> as a unit-test seam so the
/// normalization can be exercised without a live <c>LMXProxyServerClass</c>,
/// mirroring <c>MxAccessBaseEventSink.OnDataChange</c>.
/// </summary>
/// <param name="phItemHandle">The MXAccess item handle that changed.</param>
/// <param name="pvItemValue">The new item value (left as a boxed COM variant; the state machine coerces it).</param>
/// <param name="pftItemTimeStamp">The item timestamp as delivered by MXAccess (a culture-formatted string).</param>
internal void HandleDataChange(
int phItemHandle,
object? pvItemValue,
object? pftItemTimeStamp)
{
if (!addressesByItemHandle.TryGetValue(phItemHandle, out string? itemAddress))
{
return;
}
// MXAccess delivers OnDataChange timestamps as a culture-formatted
// string (VT_BSTR), not a FILETIME or VT_DATE — see
// MxAccessEventMapper.TryParseSourceTimestamp, which is the single
// conversion helper the per-session pipeline already uses. Reuse it so
// the subtag path interprets the host-local string the same way; fall
// back to capture time when the string is absent or unparsable.
DateTime timestampUtc = ConvertTimestampUtc(pftItemTimeStamp);
EventHandler<SubtagValueChange>? handler = ValueChanged;
handler?.Invoke(this, new SubtagValueChange
{
ItemAddress = itemAddress,
Value = pvItemValue,
TimestampUtc = timestampUtc,
});
}
/// <inheritdoc />
public void Dispose()
{
if (disposed)
{
return;
}
disposed = true;
// Unsubscribe the COM event before tearing handles down so a late pump
// callback cannot re-enter HandleDataChange mid-teardown.
if (comEventSource is not null)
{
try { comEventSource.OnDataChange -= OnDataChange; } catch { /* swallow — best effort */ }
}
IMxAccessServer? mxServer = server;
if (mxServer is not null && registered)
{
foreach (KeyValuePair<int, string> entry in addressesByItemHandle)
{
try { mxServer.UnAdvise(serverHandle, entry.Key); } catch { /* swallow — best effort */ }
try { mxServer.RemoveItem(serverHandle, entry.Key); } catch { /* swallow — best effort */ }
}
try { mxServer.Unregister(serverHandle); } catch { /* swallow — best effort */ }
}
itemHandlesByAddress.Clear();
addressesByItemHandle.Clear();
object? comToRelease = mxAccessComObject;
mxAccessComObject = null;
comEventSource = null;
server = null;
registered = false;
if (comToRelease is not null && Marshal.IsComObject(comToRelease))
{
try { Marshal.FinalReleaseComObject(comToRelease); } catch { /* swallow — best effort */ }
}
}
private void EnsureRegistered()
{
if (registered)
{
return;
}
// factory is only null in the test-only constructor, which sets
// registered = true, so this path is never hit there.
object created = factory!.Create()
?? throw new InvalidOperationException("MXAccess COM factory returned null.");
try
{
mxAccessComObject = created;
// Wire OnDataChange the same way MxAccessBaseEventSink.Attach does:
// cast to the concrete LMXProxyServerClass RCW and subscribe. Test
// doubles cannot be cast to LMXProxyServerClass, so the COM event
// subscription is exercised only on the live path; unit tests drive
// HandleDataChange directly via the internal constructor seam.
if (created is LMXProxyServerClass comObject)
{
comEventSource = comObject;
comObject.OnDataChange += OnDataChange;
}
server = new MxAccessComServer(created);
serverHandle = server.Register(clientName);
registered = true;
}
catch
{
if (comEventSource is not null)
{
try { comEventSource.OnDataChange -= OnDataChange; } catch { /* swallow */ }
comEventSource = null;
}
if (Marshal.IsComObject(created))
{
try { Marshal.FinalReleaseComObject(created); } catch { /* swallow */ }
}
mxAccessComObject = null;
server = null;
throw;
}
}
/// <summary>
/// COM <c>OnDataChange</c> delegate target. Forwards to
/// <see cref="HandleDataChange"/>; the quality, status array, and server
/// handle carry no information the subtag fallback needs.
/// </summary>
private void OnDataChange(
int hLMXServerHandle,
int phItemHandle,
object pvItemValue,
int pwItemQuality,
object pftItemTimeStamp,
ref MXSTATUS_PROXY[] pVars)
{
HandleDataChange(phItemHandle, pvItemValue, pftItemTimeStamp);
}
private static DateTime ConvertTimestampUtc(object? pftItemTimeStamp)
{
if (pftItemTimeStamp is not null
&& MxAccessEventMapper.TryParseSourceTimestamp(pftItemTimeStamp.ToString(), out DateTime parsedUtc))
{
return parsedUtc;
}
return DateTime.UtcNow;
}
private void ThrowIfDisposed()
{
if (disposed)
{
throw new ObjectDisposedException(nameof(LmxSubtagAlarmSource));
}
}
}