A.3 (worker IPC slice): proto SubscribeAlarms/Acknowledge/QueryActive commands + executor routing

Adds the worker-side IPC surface for the alarm subsystem so the gateway
can drive the AlarmDispatcher across the named-pipe boundary. Adds four
proto MxCommandKind values + matching command messages and two
MxCommandReply payload variants:

- SubscribeAlarmsCommand(subscription_expression)
- UnsubscribeAlarmsCommand
- AcknowledgeAlarmCommand(alarm_guid, comment, operator_user/node/domain/full_name)
- QueryActiveAlarmsCommand(alarm_filter_prefix)
- AcknowledgeAlarmReplyPayload(native_status)
- QueryActiveAlarmsReplyPayload(repeated ActiveAlarmSnapshot snapshots)

Worker plumbing:

- New IAlarmCommandHandler interface + AlarmCommandHandler production
  impl. Lazy-creates an AlarmDispatcher (with a wnwrap-backed consumer
  by default) on the first SubscribeAlarms; routes Acknowledge / QueryActive /
  Unsubscribe through it. Idempotent under repeated Unsubscribe; rejects
  a second Subscribe without an intervening Unsubscribe; cleans up the
  consumer if the underlying Subscribe call throws.
- MxAccessCommandExecutor: 4 new switch arms map MxCommandKind values to
  IAlarmCommandHandler calls. Acknowledge surfaces the AVEVA native
  status into both MxCommandReply.Hresult and the dedicated
  AcknowledgeAlarmReplyPayload.NativeStatus so gateway-side consumers
  can echo it without unpacking the outer envelope. Invalid GUIDs and
  missing payloads return InvalidRequest; handler exceptions return
  MxaccessFailure with the exception message in DiagnosticMessage.
- MxAccessStaSession: new constructor overload accepts an
  alarmCommandHandlerFactory; it's invoked on the STA thread during
  StartAsync and the resulting handler is passed into the executor.
  ShutdownGracefullyAsync + Dispose tear it down on the STA before the
  data-side cleanup runs.

Tests: 20 new unit tests covering AlarmCommandHandler lazy lifecycle
(Subscribe/Unsubscribe/Acknowledge/Query/Dispose, error paths) and the
executor's 4 alarm switch arms (OK/InvalidRequest/MxaccessFailure paths,
hresult propagation, prefix filtering). Worker test suite total: 192
passed / 3 skipped (live probes) / 1 pre-existing structure-test fail
(untouched).

Deferred to next slice: gateway-side WorkerAlarmRpcDispatcher that
replaces NotWiredAlarmRpcDispatcher, builds + sends these commands across
the IPC, and unwraps the resulting MxCommandReply into AcknowledgeAlarmReply
/ ActiveAlarmSnapshot stream.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-01 10:52:04 -04:00
parent 82eb0ad569
commit 01f5e6ad91
7 changed files with 3170 additions and 352 deletions
@@ -0,0 +1,192 @@
using System;
using System.Collections.Generic;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
/// <summary>
/// Per-session owner of the worker's alarm-side state. Lazy-creates an
/// <see cref="AlarmDispatcher"/> (with a wnwrap-backed
/// <see cref="WnWrapAlarmConsumer"/> by default) on the first
/// <see cref="Subscribe"/> call, then routes
/// <see cref="Acknowledge"/> / <see cref="QueryActive"/> /
/// <see cref="Unsubscribe"/> through the same instance for the
/// session's lifetime.
/// </summary>
/// <remarks>
/// <para>
/// Construction is dependency-injectable: the consumer factory
/// (default <c>() =&gt; new WnWrapAlarmConsumer()</c>) lets tests
/// substitute a fake without touching AVEVA COM. The event queue
/// is supplied by the owning <see cref="MxAccessStaSession"/> so
/// the alarm-side proto events land on the same queue the worker
/// already drains for IPC dispatch.
/// </para>
/// <para>
/// Threading: invoked from <see cref="MxAccessCommandExecutor"/>
/// which runs on the STA. The wnwrap consumer's polling timer
/// fires on a thread-pool thread; the only cross-thread surface
/// is the <see cref="AlarmDispatcher"/>'s event handler, which
/// hand-offs into the thread-safe <see cref="MxAccessEventQueue"/>.
/// </para>
/// </remarks>
public sealed class AlarmCommandHandler : IAlarmCommandHandler
{
private readonly MxAccessEventQueue eventQueue;
private readonly Func<IMxAccessAlarmConsumer> consumerFactory;
private readonly object syncRoot = new object();
private AlarmDispatcher? dispatcher;
private bool disposed;
public AlarmCommandHandler(MxAccessEventQueue eventQueue)
: this(eventQueue, () => new WnWrapAlarmConsumer())
{
}
/// <summary>Test seam — inject a custom consumer factory.</summary>
public AlarmCommandHandler(
MxAccessEventQueue eventQueue,
Func<IMxAccessAlarmConsumer> consumerFactory)
{
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
this.consumerFactory = consumerFactory ?? throw new ArgumentNullException(nameof(consumerFactory));
}
public bool IsSubscribed
{
get { lock (syncRoot) return dispatcher is not null; }
}
/// <inheritdoc />
public void Subscribe(string subscription, string sessionId)
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler));
if (subscription is null) throw new ArgumentNullException(nameof(subscription));
lock (syncRoot)
{
if (dispatcher is not null)
{
throw new InvalidOperationException(
"AlarmCommandHandler already has an active subscription; " +
"call Unsubscribe before issuing another SubscribeAlarms command.");
}
IMxAccessAlarmConsumer consumer = consumerFactory()
?? throw new InvalidOperationException("Alarm consumer factory returned null.");
MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(
eventQueue, new MxAccessEventMapper());
dispatcher = new AlarmDispatcher(consumer, sink, sessionId ?? string.Empty);
try
{
dispatcher.Subscribe(subscription);
}
catch
{
try { dispatcher.Dispose(); } catch { /* swallow */ }
dispatcher = null;
throw;
}
}
}
/// <inheritdoc />
public void Unsubscribe()
{
AlarmDispatcher? toDispose;
lock (syncRoot)
{
toDispose = dispatcher;
dispatcher = null;
}
toDispose?.Dispose();
}
/// <inheritdoc />
public int Acknowledge(
Guid alarmGuid,
string comment,
string operatorUser,
string operatorNode,
string operatorDomain,
string operatorFullName)
{
AlarmDispatcher? d = GetDispatcherOrThrow();
return d.Acknowledge(
alarmGuid,
comment ?? string.Empty,
operatorUser ?? string.Empty,
operatorNode ?? string.Empty,
operatorDomain ?? string.Empty,
operatorFullName ?? string.Empty);
}
/// <inheritdoc />
public IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix)
{
AlarmDispatcher? d = GetDispatcherOrThrow();
IReadOnlyList<ActiveAlarmSnapshot> all = d.SnapshotActiveAlarms();
if (string.IsNullOrEmpty(alarmFilterPrefix)) return all;
List<ActiveAlarmSnapshot> filtered = new List<ActiveAlarmSnapshot>(all.Count);
foreach (ActiveAlarmSnapshot snap in all)
{
if (snap.AlarmFullReference.StartsWith(alarmFilterPrefix!, StringComparison.Ordinal))
{
filtered.Add(snap);
}
}
return filtered;
}
private AlarmDispatcher GetDispatcherOrThrow()
{
if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler));
AlarmDispatcher? d;
lock (syncRoot) d = dispatcher;
if (d is null)
{
throw new InvalidOperationException(
"AlarmCommandHandler has no active subscription; " +
"call SubscribeAlarms before issuing alarm-related commands.");
}
return d;
}
/// <inheritdoc />
public void Dispose()
{
if (disposed) return;
disposed = true;
Unsubscribe();
}
}
/// <summary>
/// Per-session interface routing the worker's alarm IPC commands —
/// <c>SubscribeAlarmsCommand</c>, <c>AcknowledgeAlarmCommand</c>,
/// <c>QueryActiveAlarmsCommand</c>, <c>UnsubscribeAlarmsCommand</c> —
/// to the underlying <see cref="AlarmDispatcher"/>. Production binding
/// is <see cref="AlarmCommandHandler"/>; tests substitute a fake.
/// </summary>
public interface IAlarmCommandHandler : IDisposable
{
/// <summary>Begin a subscription against the supplied AVEVA alarm-provider expression.</summary>
void Subscribe(string subscription, string sessionId);
/// <summary>Tear down the active subscription. No-op if not subscribed.</summary>
void Unsubscribe();
/// <summary>Acknowledge a single alarm by GUID. Returns AVEVA's native status (0 = success).</summary>
int Acknowledge(
Guid alarmGuid,
string comment,
string operatorUser,
string operatorNode,
string operatorDomain,
string operatorFullName);
/// <summary>
/// Snapshot the currently-active alarm set, optionally scoped to a
/// prefix matched against <c>AlarmFullReference</c>.
/// </summary>
IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix);
}