From 4c0e14fc5d7bca374d556aad7e6aa513f89b3a4b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 13 Jun 2026 09:24:09 -0400 Subject: [PATCH] worker(alarms): COM-backed LmxSubtagAlarmSource advising alarm subtags --- .../MxAccess/LmxSubtagAlarmSourceTests.cs | 209 +++++++++++ .../MxAccess/LmxSubtagAlarmSource.cs | 334 ++++++++++++++++++ 2 files changed, 543 insertions(+) create mode 100644 src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/LmxSubtagAlarmSourceTests.cs create mode 100644 src/ZB.MOM.WW.MxGateway.Worker/MxAccess/LmxSubtagAlarmSource.cs diff --git a/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/LmxSubtagAlarmSourceTests.cs b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/LmxSubtagAlarmSourceTests.cs new file mode 100644 index 0000000..6178e82 --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/LmxSubtagAlarmSourceTests.cs @@ -0,0 +1,209 @@ +using System; +using System.Collections.Generic; +using ZB.MOM.WW.MxGateway.Worker.MxAccess; + +namespace ZB.MOM.WW.MxGateway.Worker.Tests.MxAccess; + +/// +/// Unit-test coverage for 's advise/write +/// sequencing and its OnDataChange normalization. The actual +/// LMXProxyServerClass COM event subscription cannot be exercised +/// without a live MXAccess install, so these tests drive the source through +/// its internal seam and call +/// HandleDataChange directly to simulate a COM callback — exactly the +/// boundary MxAccessBaseEventSink.OnDataChange uses for the +/// per-session pipeline. End-to-end COM delivery is covered by the +/// Skip-gated alarm live smoke tests. +/// +public sealed class LmxSubtagAlarmSourceTests +{ + private const int FakeServerHandle = 7; + + /// Verifies the production constructor rejects a null factory. + [Fact] + public void Constructor_NullFactory_Throws() + { + Assert.Throws(() => new LmxSubtagAlarmSource(factory: null!)); + } + + /// + /// Verifies calls AddItem then + /// Advise once per distinct address, and is idempotent on a repeated + /// address. + /// + [Fact] + public void Advise_AddsAndAdvisesEachAddressOnce() + { + var server = new RecordingMxAccessServer(); + using var source = new LmxSubtagAlarmSource(server, FakeServerHandle); + + source.Advise(new[] { "Tank1.Alarm.Subtag", "Tank2.Alarm.Subtag" }); + // Re-advising an already-advised address is a no-op. + source.Advise(new[] { "Tank1.Alarm.Subtag" }); + + Assert.Equal( + new[] { "Tank1.Alarm.Subtag", "Tank2.Alarm.Subtag" }, + server.AddedItems); + Assert.Equal(2, server.AdviseCount); + // Every advise targeted the supplied server handle. + Assert.All(server.AdvisedServerHandles, h => Assert.Equal(FakeServerHandle, h)); + } + + /// + /// Verifies a simulated OnDataChange for an advised item handle + /// raises with the + /// address that was advised and the delivered value. + /// + [Fact] + public void HandleDataChange_RaisesValueChangedWithAdvisedAddress() + { + var server = new RecordingMxAccessServer(); + using var source = new LmxSubtagAlarmSource(server, FakeServerHandle); + + source.Advise(new[] { "Tank1.Alarm.Subtag" }); + int itemHandle = server.LastItemHandleFor("Tank1.Alarm.Subtag"); + + SubtagValueChange? received = null; + source.ValueChanged += (_, change) => received = change; + + source.HandleDataChange(itemHandle, pvItemValue: 42, pftItemTimeStamp: null); + + Assert.NotNull(received); + Assert.Equal("Tank1.Alarm.Subtag", received!.ItemAddress); + Assert.Equal(42, received.Value); + Assert.Equal(DateTimeKind.Utc, received.TimestampUtc.Kind); + } + + /// + /// Verifies OnDataChange for an unknown item handle is ignored + /// (no raised). + /// + [Fact] + public void HandleDataChange_UnknownHandle_DoesNotRaise() + { + var server = new RecordingMxAccessServer(); + using var source = new LmxSubtagAlarmSource(server, FakeServerHandle); + + bool raised = false; + source.ValueChanged += (_, _) => raised = true; + + source.HandleDataChange(phItemHandle: 999, pvItemValue: 1, pftItemTimeStamp: null); + + Assert.False(raised); + } + + /// + /// Verifies adds the item when + /// it was not previously advised and writes with user id 0. + /// + [Fact] + public void Write_AddsItemWhenUnknownAndWrites() + { + var server = new RecordingMxAccessServer(); + using var source = new LmxSubtagAlarmSource(server, FakeServerHandle); + + source.Write("Tank1.Alarm.AckComment", "acknowledged"); + + Assert.Contains("Tank1.Alarm.AckComment", server.AddedItems); + Assert.Single(server.Writes); + RecordingMxAccessServer.WriteRecord write = server.Writes[0]; + Assert.Equal(FakeServerHandle, write.ServerHandle); + Assert.Equal("acknowledged", write.Value); + Assert.Equal(0, write.UserId); + } + + /// + /// Verifies reuses an existing + /// item handle (no duplicate AddItem) when the address was already + /// advised. + /// + [Fact] + public void Write_ReusesHandleForAdvisedAddress() + { + var server = new RecordingMxAccessServer(); + using var source = new LmxSubtagAlarmSource(server, FakeServerHandle); + + source.Advise(new[] { "Tank1.Alarm.Subtag" }); + int adviseAddCount = server.AddedItems.Count; + + source.Write("Tank1.Alarm.Subtag", true); + + // No second AddItem for the already-advised address. + Assert.Equal(adviseAddCount, server.AddedItems.Count); + Assert.Single(server.Writes); + } + + /// + /// Recording test double that captures the + /// AddItem/Advise/Write/UnAdvise/RemoveItem/Unregister calls + /// makes and hands out monotonically + /// increasing item handles. + /// + private sealed class RecordingMxAccessServer : IMxAccessServer + { + private readonly Dictionary handlesByAddress = new(StringComparer.Ordinal); + private int nextItemHandle = 100; + + public List AddedItems { get; } = new(); + + public int AdviseCount { get; private set; } + + public List AdvisedServerHandles { get; } = new(); + + public List Writes { get; } = new(); + + public int LastItemHandleFor(string itemAddress) => handlesByAddress[itemAddress]; + + public int Register(string clientName) => FakeServerHandle; + + public void Unregister(int serverHandle) + { + } + + public int AddItem(int serverHandle, string itemDefinition) + { + AddedItems.Add(itemDefinition); + int handle = nextItemHandle++; + handlesByAddress[itemDefinition] = handle; + return handle; + } + + public int AddItem2(int serverHandle, string itemDefinition, string itemContext) + => AddItem(serverHandle, itemDefinition); + + public void RemoveItem(int serverHandle, int itemHandle) + { + } + + public void Advise(int serverHandle, int itemHandle) + { + AdviseCount++; + AdvisedServerHandles.Add(serverHandle); + } + + public void UnAdvise(int serverHandle, int itemHandle) + { + } + + public void AdviseSupervisory(int serverHandle, int itemHandle) + { + } + + public void Write(int serverHandle, int itemHandle, object? value, int userId) + => Writes.Add(new WriteRecord(serverHandle, itemHandle, value, userId)); + + public void Write2(int serverHandle, int itemHandle, object? value, object? timestamp, int userId) + { + } + + public void WriteSecured(int serverHandle, int itemHandle, int currentUserId, int verifierUserId, object? value) + { + } + + public void WriteSecured2(int serverHandle, int itemHandle, int currentUserId, int verifierUserId, object? value, object? timestamp) + { + } + + internal sealed record WriteRecord(int ServerHandle, int ItemHandle, object? Value, int UserId); + } +} diff --git a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/LmxSubtagAlarmSource.cs b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/LmxSubtagAlarmSource.cs new file mode 100644 index 0000000..c16e2e6 --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/LmxSubtagAlarmSource.cs @@ -0,0 +1,334 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using ArchestrA.MxAccess; + +namespace ZB.MOM.WW.MxGateway.Worker.MxAccess; + +/// +/// Production backed by its own MXAccess +/// LMXProxyServerClass COM object. It advises a set of alarm subtag +/// item addresses, normalizes each OnDataChange COM callback into a +/// , and supports writing a value (e.g. an +/// ack-comment) to a subtag. +/// +/// +/// +/// Like , this type owns its +/// own MXAccess proxy-server COM object rather than sharing the +/// per-session item pipeline's . That keeps +/// the subtag-fallback subscription isolated from the session's own +/// Register/AddItem/Advise bookkeeping: advising or +/// tearing down alarm subtags never perturbs the client's subscriptions. +/// +/// +/// Threading. The LMXProxyServerClass CLSID is +/// apartment-threaded and delivers OnDataChange via the STA +/// message pump. Every method on this type — the lazy +/// creation, , and — must be +/// invoked on the worker's STA that created the source, exactly as +/// and +/// require. The OnDataChange COM event also arrives on that STA. +/// Because every call is confined to the single owning STA, no lock is +/// taken around the COM object or the handle maps: a lock here would risk +/// deadlocking the STA against a re-entrant pump callback, and the STA +/// affinity already serializes access. +/// +/// +public sealed class LmxSubtagAlarmSource : ISubtagAlarmSource +{ + private const string DefaultClientName = "OtOpcUa.ZB.MOM.WW.MxGateway.Worker.SubtagAlarms"; + + private readonly IMxAccessComObjectFactory? factory; + private readonly string clientName; + private readonly Dictionary itemHandlesByAddress = + new Dictionary(StringComparer.Ordinal); + private readonly Dictionary addressesByItemHandle = + new Dictionary(); + + private object? mxAccessComObject; + private IMxAccessServer? server; + private LMXProxyServerClass? comEventSource; + private int serverHandle; + private bool registered; + private bool disposed; + + /// + /// Production constructor. The MXAccess COM object is created lazily on + /// the first call (which must run on the worker's + /// STA) via the supplied . + /// + /// Factory that creates the MXAccess proxy-server COM object. + /// + /// Optional MXAccess client/registration name used for + /// ; a worker-specific default is + /// used when null or whitespace. Mirrors the registration name + /// accepts. + /// + /// is null. + public LmxSubtagAlarmSource( + IMxAccessComObjectFactory factory, + string? clientName = null) + { + this.factory = factory ?? throw new ArgumentNullException(nameof(factory)); + this.clientName = string.IsNullOrWhiteSpace(clientName) ? DefaultClientName : clientName!; + } + + /// + /// Test-only seam: constructs a source over a caller-supplied + /// with a pre-registered server handle, + /// bypassing the live COM factory. Exposed via + /// InternalsVisibleTo("ZB.MOM.WW.MxGateway.Worker.Tests") so tests can + /// exercise / and + /// without a live LMXProxyServerClass. + /// No COM event source is wired in this mode — tests drive + /// directly to simulate an + /// OnDataChange callback. + /// + /// The server abstraction to drive. + /// The already-registered server handle to use. + internal LmxSubtagAlarmSource( + IMxAccessServer server, + int serverHandle) + { + this.factory = null; + this.clientName = DefaultClientName; + this.server = server ?? throw new ArgumentNullException(nameof(server)); + this.serverHandle = serverHandle; + this.registered = true; + } + + /// + public event EventHandler? ValueChanged; + + /// + /// + /// Idempotent per address: an address already advised is skipped. The + /// MXAccess COM object is created and the client registered on the first + /// call. Must be invoked on the worker's STA. + /// + public void Advise(IReadOnlyCollection itemAddresses) + { + if (itemAddresses is null) + { + throw new ArgumentNullException(nameof(itemAddresses)); + } + + ThrowIfDisposed(); + EnsureRegistered(); + + IMxAccessServer mxServer = server!; + foreach (string? itemAddress in itemAddresses) + { + if (string.IsNullOrWhiteSpace(itemAddress)) + { + continue; + } + + if (itemHandlesByAddress.ContainsKey(itemAddress!)) + { + continue; + } + + int itemHandle = mxServer.AddItem(serverHandle, itemAddress!); + mxServer.Advise(serverHandle, itemHandle); + itemHandlesByAddress[itemAddress!] = itemHandle; + addressesByItemHandle[itemHandle] = itemAddress!; + } + } + + /// + /// + /// Adds the item if it has not been added yet — the write may target a + /// subtag (e.g. the ack-comment) that was never advised — then writes + /// with MXAccess user id 0 (unsecured). Must be invoked on the worker's + /// STA. + /// + public void Write(string itemAddress, object? value) + { + if (itemAddress is null) + { + throw new ArgumentNullException(nameof(itemAddress)); + } + + ThrowIfDisposed(); + EnsureRegistered(); + + IMxAccessServer mxServer = server!; + if (!itemHandlesByAddress.TryGetValue(itemAddress, out int itemHandle)) + { + itemHandle = mxServer.AddItem(serverHandle, itemAddress); + itemHandlesByAddress[itemAddress] = itemHandle; + addressesByItemHandle[itemHandle] = itemAddress; + } + + mxServer.Write(serverHandle, itemHandle, value, userId: 0); + } + + /// + /// Normalizes an MXAccess OnDataChange callback into a + /// and raises + /// when the item handle maps to an advised address. Unknown handles are + /// ignored. Exposed internal as a unit-test seam so the + /// normalization can be exercised without a live LMXProxyServerClass, + /// mirroring MxAccessBaseEventSink.OnDataChange. + /// + /// The MXAccess item handle that changed. + /// The new item value (left as a boxed COM variant; the state machine coerces it). + /// The item timestamp as delivered by MXAccess (a culture-formatted string). + internal void HandleDataChange( + int phItemHandle, + object? pvItemValue, + object? pftItemTimeStamp) + { + if (!addressesByItemHandle.TryGetValue(phItemHandle, out string? itemAddress)) + { + return; + } + + // MXAccess delivers OnDataChange timestamps as a culture-formatted + // string (VT_BSTR), not a FILETIME or VT_DATE — see + // MxAccessEventMapper.TryParseSourceTimestamp, which is the single + // conversion helper the per-session pipeline already uses. Reuse it so + // the subtag path interprets the host-local string the same way; fall + // back to capture time when the string is absent or unparsable. + DateTime timestampUtc = ConvertTimestampUtc(pftItemTimeStamp); + + EventHandler? handler = ValueChanged; + handler?.Invoke(this, new SubtagValueChange + { + ItemAddress = itemAddress, + Value = pvItemValue, + TimestampUtc = timestampUtc, + }); + } + + /// + public void Dispose() + { + if (disposed) + { + return; + } + + disposed = true; + + // Unsubscribe the COM event before tearing handles down so a late pump + // callback cannot re-enter HandleDataChange mid-teardown. + if (comEventSource is not null) + { + try { comEventSource.OnDataChange -= OnDataChange; } catch { /* swallow — best effort */ } + } + + IMxAccessServer? mxServer = server; + if (mxServer is not null && registered) + { + foreach (KeyValuePair entry in addressesByItemHandle) + { + try { mxServer.UnAdvise(serverHandle, entry.Key); } catch { /* swallow — best effort */ } + try { mxServer.RemoveItem(serverHandle, entry.Key); } catch { /* swallow — best effort */ } + } + + try { mxServer.Unregister(serverHandle); } catch { /* swallow — best effort */ } + } + + itemHandlesByAddress.Clear(); + addressesByItemHandle.Clear(); + + object? comToRelease = mxAccessComObject; + mxAccessComObject = null; + comEventSource = null; + server = null; + registered = false; + + if (comToRelease is not null && Marshal.IsComObject(comToRelease)) + { + try { Marshal.FinalReleaseComObject(comToRelease); } catch { /* swallow — best effort */ } + } + } + + private void EnsureRegistered() + { + if (registered) + { + return; + } + + // factory is only null in the test-only constructor, which sets + // registered = true, so this path is never hit there. + object created = factory!.Create() + ?? throw new InvalidOperationException("MXAccess COM factory returned null."); + + try + { + mxAccessComObject = created; + + // Wire OnDataChange the same way MxAccessBaseEventSink.Attach does: + // cast to the concrete LMXProxyServerClass RCW and subscribe. Test + // doubles cannot be cast to LMXProxyServerClass, so the COM event + // subscription is exercised only on the live path; unit tests drive + // HandleDataChange directly via the internal constructor seam. + if (created is LMXProxyServerClass comObject) + { + comEventSource = comObject; + comObject.OnDataChange += OnDataChange; + } + + server = new MxAccessComServer(created); + serverHandle = server.Register(clientName); + registered = true; + } + catch + { + if (comEventSource is not null) + { + try { comEventSource.OnDataChange -= OnDataChange; } catch { /* swallow */ } + comEventSource = null; + } + + if (Marshal.IsComObject(created)) + { + try { Marshal.FinalReleaseComObject(created); } catch { /* swallow */ } + } + + mxAccessComObject = null; + server = null; + throw; + } + } + + /// + /// COM OnDataChange delegate target. Forwards to + /// ; the quality, status array, and server + /// handle carry no information the subtag fallback needs. + /// + private void OnDataChange( + int hLMXServerHandle, + int phItemHandle, + object pvItemValue, + int pwItemQuality, + object pftItemTimeStamp, + ref MXSTATUS_PROXY[] pVars) + { + HandleDataChange(phItemHandle, pvItemValue, pftItemTimeStamp); + } + + private static DateTime ConvertTimestampUtc(object? pftItemTimeStamp) + { + if (pftItemTimeStamp is not null + && MxAccessEventMapper.TryParseSourceTimestamp(pftItemTimeStamp.ToString(), out DateTime parsedUtc)) + { + return parsedUtc; + } + + return DateTime.UtcNow; + } + + private void ThrowIfDisposed() + { + if (disposed) + { + throw new ObjectDisposedException(nameof(LmxSubtagAlarmSource)); + } + } +}