A.3 (worker IPC slice): proto SubscribeAlarms/Acknowledge/QueryActive commands + executor routing
Adds the worker-side IPC surface for the alarm subsystem so the gateway can drive the AlarmDispatcher across the named-pipe boundary. Adds four proto MxCommandKind values + matching command messages and two MxCommandReply payload variants: - SubscribeAlarmsCommand(subscription_expression) - UnsubscribeAlarmsCommand - AcknowledgeAlarmCommand(alarm_guid, comment, operator_user/node/domain/full_name) - QueryActiveAlarmsCommand(alarm_filter_prefix) - AcknowledgeAlarmReplyPayload(native_status) - QueryActiveAlarmsReplyPayload(repeated ActiveAlarmSnapshot snapshots) Worker plumbing: - New IAlarmCommandHandler interface + AlarmCommandHandler production impl. Lazy-creates an AlarmDispatcher (with a wnwrap-backed consumer by default) on the first SubscribeAlarms; routes Acknowledge / QueryActive / Unsubscribe through it. Idempotent under repeated Unsubscribe; rejects a second Subscribe without an intervening Unsubscribe; cleans up the consumer if the underlying Subscribe call throws. - MxAccessCommandExecutor: 4 new switch arms map MxCommandKind values to IAlarmCommandHandler calls. Acknowledge surfaces the AVEVA native status into both MxCommandReply.Hresult and the dedicated AcknowledgeAlarmReplyPayload.NativeStatus so gateway-side consumers can echo it without unpacking the outer envelope. Invalid GUIDs and missing payloads return InvalidRequest; handler exceptions return MxaccessFailure with the exception message in DiagnosticMessage. - MxAccessStaSession: new constructor overload accepts an alarmCommandHandlerFactory; it's invoked on the STA thread during StartAsync and the resulting handler is passed into the executor. ShutdownGracefullyAsync + Dispose tear it down on the STA before the data-side cleanup runs. Tests: 20 new unit tests covering AlarmCommandHandler lazy lifecycle (Subscribe/Unsubscribe/Acknowledge/Query/Dispose, error paths) and the executor's 4 alarm switch arms (OK/InvalidRequest/MxaccessFailure paths, hresult propagation, prefix filtering). Worker test suite total: 192 passed / 3 skipped (live probes) / 1 pre-existing structure-test fail (untouched). Deferred to next slice: gateway-side WorkerAlarmRpcDispatcher that replaces NotWiredAlarmRpcDispatcher, builds + sends these commands across the IPC, and unwraps the resulting MxCommandReply into AcknowledgeAlarmReply / ActiveAlarmSnapshot stream. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -88,6 +88,10 @@ message MxCommand {
|
|||||||
UnAdviseItemBulkCommand un_advise_item_bulk = 31;
|
UnAdviseItemBulkCommand un_advise_item_bulk = 31;
|
||||||
SubscribeBulkCommand subscribe_bulk = 32;
|
SubscribeBulkCommand subscribe_bulk = 32;
|
||||||
UnsubscribeBulkCommand unsubscribe_bulk = 33;
|
UnsubscribeBulkCommand unsubscribe_bulk = 33;
|
||||||
|
SubscribeAlarmsCommand subscribe_alarms = 34;
|
||||||
|
UnsubscribeAlarmsCommand unsubscribe_alarms = 35;
|
||||||
|
AcknowledgeAlarmCommand acknowledge_alarm_command = 36;
|
||||||
|
QueryActiveAlarmsCommand query_active_alarms_command = 37;
|
||||||
PingCommand ping = 100;
|
PingCommand ping = 100;
|
||||||
GetSessionStateCommand get_session_state = 101;
|
GetSessionStateCommand get_session_state = 101;
|
||||||
GetWorkerInfoCommand get_worker_info = 102;
|
GetWorkerInfoCommand get_worker_info = 102;
|
||||||
@@ -122,6 +126,10 @@ enum MxCommandKind {
|
|||||||
MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK = 22;
|
MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK = 22;
|
||||||
MX_COMMAND_KIND_SUBSCRIBE_BULK = 23;
|
MX_COMMAND_KIND_SUBSCRIBE_BULK = 23;
|
||||||
MX_COMMAND_KIND_UNSUBSCRIBE_BULK = 24;
|
MX_COMMAND_KIND_UNSUBSCRIBE_BULK = 24;
|
||||||
|
MX_COMMAND_KIND_SUBSCRIBE_ALARMS = 25;
|
||||||
|
MX_COMMAND_KIND_UNSUBSCRIBE_ALARMS = 26;
|
||||||
|
MX_COMMAND_KIND_ACKNOWLEDGE_ALARM = 27;
|
||||||
|
MX_COMMAND_KIND_QUERY_ACTIVE_ALARMS = 28;
|
||||||
MX_COMMAND_KIND_PING = 100;
|
MX_COMMAND_KIND_PING = 100;
|
||||||
MX_COMMAND_KIND_GET_SESSION_STATE = 101;
|
MX_COMMAND_KIND_GET_SESSION_STATE = 101;
|
||||||
MX_COMMAND_KIND_GET_WORKER_INFO = 102;
|
MX_COMMAND_KIND_GET_WORKER_INFO = 102;
|
||||||
@@ -263,6 +271,42 @@ message SubscribeBulkCommand {
|
|||||||
repeated string tag_addresses = 2;
|
repeated string tag_addresses = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subscribe the worker's alarm consumer to an AVEVA alarm provider.
|
||||||
|
// Subscription expression follows the canonical
|
||||||
|
// `\\<machine>\Galaxy!<area>` format (literal "Galaxy" provider). The
|
||||||
|
// worker spins up a wnwrapConsumer-backed subscription on its STA on
|
||||||
|
// first call; subsequent calls are an error (use UnsubscribeAlarms then
|
||||||
|
// SubscribeAlarms to reconfigure).
|
||||||
|
message SubscribeAlarmsCommand {
|
||||||
|
string subscription_expression = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tear down the worker's alarm consumer. No-op if no subscription is
|
||||||
|
// currently active.
|
||||||
|
message UnsubscribeAlarmsCommand {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Acknowledge a single alarm by its GUID. Operator identity fields are
|
||||||
|
// recorded atomically with the ack transition in the alarm-history log.
|
||||||
|
// The reply's hresult / native_status surfaces AVEVA's
|
||||||
|
// AlarmAckByGUID return code.
|
||||||
|
message AcknowledgeAlarmCommand {
|
||||||
|
// Canonical 8-4-4-4-12 GUID string (e.g. "BCC47053-9542-4D65-BDAA-BCDEA6A32A73").
|
||||||
|
string alarm_guid = 1;
|
||||||
|
string comment = 2;
|
||||||
|
string operator_user = 3;
|
||||||
|
string operator_node = 4;
|
||||||
|
string operator_domain = 5;
|
||||||
|
string operator_full_name = 6;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot the currently-active alarm set. Optional filter prefix scopes
|
||||||
|
// the snapshot to alarms whose alarm_full_reference starts with the
|
||||||
|
// supplied string (matches QueryActiveAlarmsRequest.alarm_filter_prefix).
|
||||||
|
message QueryActiveAlarmsCommand {
|
||||||
|
string alarm_filter_prefix = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message UnsubscribeBulkCommand {
|
message UnsubscribeBulkCommand {
|
||||||
int32 server_handle = 1;
|
int32 server_handle = 1;
|
||||||
repeated int32 item_handles = 2;
|
repeated int32 item_handles = 2;
|
||||||
@@ -314,6 +358,8 @@ message MxCommandReply {
|
|||||||
BulkSubscribeReply un_advise_item_bulk = 31;
|
BulkSubscribeReply un_advise_item_bulk = 31;
|
||||||
BulkSubscribeReply subscribe_bulk = 32;
|
BulkSubscribeReply subscribe_bulk = 32;
|
||||||
BulkSubscribeReply unsubscribe_bulk = 33;
|
BulkSubscribeReply unsubscribe_bulk = 33;
|
||||||
|
AcknowledgeAlarmReplyPayload acknowledge_alarm = 34;
|
||||||
|
QueryActiveAlarmsReplyPayload query_active_alarms = 35;
|
||||||
SessionStateReply session_state = 100;
|
SessionStateReply session_state = 100;
|
||||||
WorkerInfoReply worker_info = 101;
|
WorkerInfoReply worker_info = 101;
|
||||||
DrainEventsReply drain_events = 102;
|
DrainEventsReply drain_events = 102;
|
||||||
@@ -379,6 +425,24 @@ message DrainEventsReply {
|
|||||||
repeated MxEvent events = 1;
|
repeated MxEvent events = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reply payload for AcknowledgeAlarmCommand. Surfaces AVEVA's native
|
||||||
|
// AlarmAckByGUID return code; 0 means success. The MxCommandReply's
|
||||||
|
// hresult field carries the same value and is preferred for protocol
|
||||||
|
// consumers — this payload exists so the gateway-side
|
||||||
|
// WorkerAlarmRpcDispatcher can echo native_status into
|
||||||
|
// AcknowledgeAlarmReply.hresult without unpacking the outer envelope.
|
||||||
|
message AcknowledgeAlarmReplyPayload {
|
||||||
|
int32 native_status = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reply payload for QueryActiveAlarmsCommand. The worker walks
|
||||||
|
// IMxAccessAlarmConsumer.SnapshotActiveAlarms and packs each record as
|
||||||
|
// an ActiveAlarmSnapshot proto for the gateway-side ConditionRefresh
|
||||||
|
// stream.
|
||||||
|
message QueryActiveAlarmsReplyPayload {
|
||||||
|
repeated ActiveAlarmSnapshot snapshots = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message MxEvent {
|
message MxEvent {
|
||||||
MxEventFamily family = 1;
|
MxEventFamily family = 1;
|
||||||
string session_id = 2;
|
string session_id = 2;
|
||||||
|
|||||||
@@ -0,0 +1,384 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.MxAccess;
|
||||||
|
using MxGateway.Worker.Sta;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Tests.MxAccess;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Verifies that the four new alarm <see cref="MxCommandKind"/> values
|
||||||
|
/// route through <see cref="MxAccessCommandExecutor"/> to a fake
|
||||||
|
/// <see cref="IAlarmCommandHandler"/> and that the resulting
|
||||||
|
/// <see cref="MxCommandReply"/> carries the expected payload.
|
||||||
|
///
|
||||||
|
/// The data-side <see cref="MxAccessSession"/> is constructed via a
|
||||||
|
/// no-op factory because the executor only touches it for non-alarm
|
||||||
|
/// command kinds — alarm dispatch never reaches the data session.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class AlarmCommandExecutorTests
|
||||||
|
{
|
||||||
|
private const string SessionId = "S";
|
||||||
|
private const string CorrelationId = "C";
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void SubscribeAlarms_routes_to_handler_and_returns_ok()
|
||||||
|
{
|
||||||
|
FakeAlarmHandler handler = new FakeAlarmHandler();
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(handler);
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.SubscribeAlarms,
|
||||||
|
SubscribeAlarms = new SubscribeAlarmsCommand
|
||||||
|
{
|
||||||
|
SubscriptionExpression = @"\\HOST\Galaxy!Area",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||||
|
Assert.Equal(@"\\HOST\Galaxy!Area", handler.LastSubscription);
|
||||||
|
Assert.Equal(SessionId, handler.LastSessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void SubscribeAlarms_without_handler_returns_invalid_request()
|
||||||
|
{
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(alarmHandler: null);
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.SubscribeAlarms,
|
||||||
|
SubscribeAlarms = new SubscribeAlarmsCommand
|
||||||
|
{
|
||||||
|
SubscriptionExpression = @"\\HOST\Galaxy!Area",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void SubscribeAlarms_with_empty_expression_returns_invalid_request()
|
||||||
|
{
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(new FakeAlarmHandler());
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.SubscribeAlarms,
|
||||||
|
SubscribeAlarms = new SubscribeAlarmsCommand
|
||||||
|
{
|
||||||
|
SubscriptionExpression = " ",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void AcknowledgeAlarm_routes_native_status_into_hresult_and_payload()
|
||||||
|
{
|
||||||
|
FakeAlarmHandler handler = new FakeAlarmHandler { AcknowledgeReturn = 0 };
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(handler);
|
||||||
|
Guid g = Guid.NewGuid();
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||||
|
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
|
||||||
|
{
|
||||||
|
AlarmGuid = g.ToString(),
|
||||||
|
Comment = "ack",
|
||||||
|
OperatorUser = "alice",
|
||||||
|
OperatorNode = "WS",
|
||||||
|
OperatorDomain = "CORP",
|
||||||
|
OperatorFullName = "Alice S",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||||
|
Assert.Equal(0, reply.Hresult);
|
||||||
|
Assert.NotNull(reply.AcknowledgeAlarm);
|
||||||
|
Assert.Equal(0, reply.AcknowledgeAlarm.NativeStatus);
|
||||||
|
Assert.Equal(g, handler.LastAckGuid);
|
||||||
|
Assert.Equal("alice", handler.LastAckOperatorName);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void AcknowledgeAlarm_with_invalid_guid_returns_invalid_request()
|
||||||
|
{
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(new FakeAlarmHandler());
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||||
|
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
|
||||||
|
{
|
||||||
|
AlarmGuid = "not-a-guid",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void AcknowledgeAlarm_with_nonzero_native_status_carries_diagnostic()
|
||||||
|
{
|
||||||
|
FakeAlarmHandler handler = new FakeAlarmHandler { AcknowledgeReturn = -123 };
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(handler);
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||||
|
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
|
||||||
|
{
|
||||||
|
AlarmGuid = Guid.NewGuid().ToString(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(-123, reply.Hresult);
|
||||||
|
Assert.Contains("-123", reply.DiagnosticMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void QueryActiveAlarms_returns_payload_with_snapshots()
|
||||||
|
{
|
||||||
|
FakeAlarmHandler handler = new FakeAlarmHandler
|
||||||
|
{
|
||||||
|
QueryResult = new[]
|
||||||
|
{
|
||||||
|
new ActiveAlarmSnapshot { AlarmFullReference = "Galaxy!A.T1" },
|
||||||
|
new ActiveAlarmSnapshot { AlarmFullReference = "Galaxy!A.T2" },
|
||||||
|
},
|
||||||
|
};
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(handler);
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.QueryActiveAlarms,
|
||||||
|
QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand
|
||||||
|
{
|
||||||
|
AlarmFilterPrefix = "Galaxy!A",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||||
|
Assert.NotNull(reply.QueryActiveAlarms);
|
||||||
|
Assert.Equal(2, reply.QueryActiveAlarms.Snapshots.Count);
|
||||||
|
Assert.Equal("Galaxy!A", handler.LastFilterPrefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void UnsubscribeAlarms_routes_to_handler()
|
||||||
|
{
|
||||||
|
FakeAlarmHandler handler = new FakeAlarmHandler();
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(handler);
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.UnsubscribeAlarms,
|
||||||
|
UnsubscribeAlarms = new UnsubscribeAlarmsCommand(),
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||||
|
Assert.True(handler.UnsubscribeCalled);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void UnsubscribeAlarms_without_handler_is_ok_noop()
|
||||||
|
{
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(alarmHandler: null);
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.UnsubscribeAlarms,
|
||||||
|
UnsubscribeAlarms = new UnsubscribeAlarmsCommand(),
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Acknowledge_handler_throw_returns_mxaccess_failure()
|
||||||
|
{
|
||||||
|
FakeAlarmHandler handler = new FakeAlarmHandler { AcknowledgeThrow = true };
|
||||||
|
MxAccessCommandExecutor executor = NewExecutor(handler);
|
||||||
|
|
||||||
|
StaCommand command = new StaCommand(
|
||||||
|
SessionId, CorrelationId,
|
||||||
|
new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||||
|
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
|
||||||
|
{
|
||||||
|
AlarmGuid = Guid.NewGuid().ToString(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
MxCommandReply reply = executor.Execute(command);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.MxaccessFailure, reply.ProtocolStatus.Code);
|
||||||
|
Assert.Contains("simulated", reply.DiagnosticMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MxAccessCommandExecutor NewExecutor(IAlarmCommandHandler? alarmHandler)
|
||||||
|
{
|
||||||
|
// Construct an executor with a no-op data session — we only exercise
|
||||||
|
// the alarm switch arms, which never touch the data session.
|
||||||
|
return new MxAccessCommandExecutor(
|
||||||
|
session: NoopMxAccessSession.Create(),
|
||||||
|
variantConverter: new MxGateway.Worker.Conversion.VariantConverter(),
|
||||||
|
alarmCommandHandler: alarmHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Reflection-based helper to construct an MxAccessSession without
|
||||||
|
/// a real COM object. Only the alarm-side code paths are exercised
|
||||||
|
/// in this test class, so the session reference is never
|
||||||
|
/// dereferenced.
|
||||||
|
/// </summary>
|
||||||
|
private static class NoopMxAccessSession
|
||||||
|
{
|
||||||
|
public static MxAccessSession Create()
|
||||||
|
{
|
||||||
|
// Walk to the private constructor via reflection — the public
|
||||||
|
// factory MxAccessSession.Create(...) requires a real COM object.
|
||||||
|
System.Reflection.ConstructorInfo? ctor = typeof(MxAccessSession)
|
||||||
|
.GetConstructor(
|
||||||
|
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance,
|
||||||
|
binder: null,
|
||||||
|
types: new[]
|
||||||
|
{
|
||||||
|
typeof(object),
|
||||||
|
typeof(IMxAccessServer),
|
||||||
|
typeof(IMxAccessEventSink),
|
||||||
|
typeof(MxAccessHandleRegistry),
|
||||||
|
typeof(int),
|
||||||
|
},
|
||||||
|
modifiers: null);
|
||||||
|
if (ctor is null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
"MxAccessSession private ctor signature changed; update the test seam.");
|
||||||
|
}
|
||||||
|
return (MxAccessSession)ctor.Invoke(new object[]
|
||||||
|
{
|
||||||
|
new object(),
|
||||||
|
new NullMxAccessServer(),
|
||||||
|
new NullEventSink(),
|
||||||
|
new MxAccessHandleRegistry(),
|
||||||
|
System.Environment.CurrentManagedThreadId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class NullMxAccessServer : IMxAccessServer
|
||||||
|
{
|
||||||
|
public int Register(string clientName) => 0;
|
||||||
|
public void Unregister(int serverHandle) { }
|
||||||
|
public int AddItem(int serverHandle, string itemDefinition) => 0;
|
||||||
|
public int AddItem2(int serverHandle, string itemDefinition, string itemContext) => 0;
|
||||||
|
public void RemoveItem(int serverHandle, int itemHandle) { }
|
||||||
|
public void Advise(int serverHandle, int itemHandle) { }
|
||||||
|
public void UnAdvise(int serverHandle, int itemHandle) { }
|
||||||
|
public void AdviseSupervisory(int serverHandle, int itemHandle) { }
|
||||||
|
public int AddBufferedItem(int serverHandle, string itemDefinition, string itemContext) => 0;
|
||||||
|
public void SetBufferedUpdateInterval(int serverHandle, int updateIntervalMilliseconds) { }
|
||||||
|
public void Suspend(int serverHandle, int itemHandle) { }
|
||||||
|
public void Activate(int serverHandle, int itemHandle) { }
|
||||||
|
public void Write(int serverHandle, int itemHandle, object value, int userId) { }
|
||||||
|
public void Write2(int serverHandle, int itemHandle, object value, object timestampValue, int userId) { }
|
||||||
|
public void WriteSecured(int serverHandle, int itemHandle, int currentUserId, int verifierUserId, object value) { }
|
||||||
|
public void WriteSecured2(int serverHandle, int itemHandle, int currentUserId, int verifierUserId, object value, object timestampValue) { }
|
||||||
|
public int AuthenticateUser(string userName, string password) => 0;
|
||||||
|
public int ArchestrAUserToId(string userName) => 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class NullEventSink : IMxAccessEventSink
|
||||||
|
{
|
||||||
|
public void Attach(object mxAccessComObject, string sessionId) { }
|
||||||
|
public void Detach() { }
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeAlarmHandler : IAlarmCommandHandler
|
||||||
|
{
|
||||||
|
public string? LastSubscription { get; private set; }
|
||||||
|
public string? LastSessionId { get; private set; }
|
||||||
|
public bool UnsubscribeCalled { get; private set; }
|
||||||
|
public Guid LastAckGuid { get; private set; }
|
||||||
|
public string? LastAckOperatorName { get; private set; }
|
||||||
|
public int AcknowledgeReturn { get; set; }
|
||||||
|
public bool AcknowledgeThrow { get; set; }
|
||||||
|
public IReadOnlyList<ActiveAlarmSnapshot> QueryResult { get; set; } =
|
||||||
|
Array.Empty<ActiveAlarmSnapshot>();
|
||||||
|
public string? LastFilterPrefix { get; private set; }
|
||||||
|
|
||||||
|
public void Subscribe(string subscription, string sessionId)
|
||||||
|
{
|
||||||
|
LastSubscription = subscription;
|
||||||
|
LastSessionId = sessionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Unsubscribe()
|
||||||
|
{
|
||||||
|
UnsubscribeCalled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int Acknowledge(
|
||||||
|
Guid alarmGuid, string comment, string operatorUser,
|
||||||
|
string operatorNode, string operatorDomain, string operatorFullName)
|
||||||
|
{
|
||||||
|
LastAckGuid = alarmGuid;
|
||||||
|
LastAckOperatorName = operatorUser;
|
||||||
|
if (AcknowledgeThrow)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException("simulated alarm-handler failure");
|
||||||
|
}
|
||||||
|
return AcknowledgeReturn;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix)
|
||||||
|
{
|
||||||
|
LastFilterPrefix = alarmFilterPrefix;
|
||||||
|
return QueryResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose() { }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,232 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.MxAccess;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Tests.MxAccess;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Unit tests for the per-session alarm command router. Uses a fake
|
||||||
|
/// consumer factory so the lazy-construction lifecycle on
|
||||||
|
/// <c>SubscribeAlarms</c> is exercised without touching wnwrap COM.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class AlarmCommandHandlerTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void Subscribe_creates_consumer_and_calls_subscribe()
|
||||||
|
{
|
||||||
|
FakeConsumer consumer = new FakeConsumer();
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => consumer);
|
||||||
|
|
||||||
|
handler.Subscribe(@"\\HOST\Galaxy!Area", "session-1");
|
||||||
|
|
||||||
|
Assert.True(handler.IsSubscribed);
|
||||||
|
Assert.Equal(@"\\HOST\Galaxy!Area", consumer.LastSubscription);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Second_subscribe_without_unsubscribe_throws()
|
||||||
|
{
|
||||||
|
FakeConsumer consumer = new FakeConsumer();
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => consumer);
|
||||||
|
|
||||||
|
handler.Subscribe(@"\\HOST\Galaxy!A", "s1");
|
||||||
|
Assert.Throws<InvalidOperationException>(
|
||||||
|
() => handler.Subscribe(@"\\HOST\Galaxy!B", "s1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Subscribe_disposes_consumer_when_underlying_subscribe_throws()
|
||||||
|
{
|
||||||
|
FakeConsumer consumer = new FakeConsumer { ThrowOnSubscribe = true };
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => consumer);
|
||||||
|
|
||||||
|
Assert.Throws<InvalidOperationException>(
|
||||||
|
() => handler.Subscribe(@"\\HOST\Galaxy!A", "s1"));
|
||||||
|
Assert.False(handler.IsSubscribed);
|
||||||
|
Assert.True(consumer.Disposed);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Unsubscribe_disposes_consumer_and_clears_state()
|
||||||
|
{
|
||||||
|
FakeConsumer consumer = new FakeConsumer();
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => consumer);
|
||||||
|
handler.Subscribe(@"\\HOST\Galaxy!A", "s1");
|
||||||
|
|
||||||
|
handler.Unsubscribe();
|
||||||
|
|
||||||
|
Assert.False(handler.IsSubscribed);
|
||||||
|
Assert.True(consumer.Disposed);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Unsubscribe_without_prior_subscribe_is_noop()
|
||||||
|
{
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => new FakeConsumer());
|
||||||
|
handler.Unsubscribe(); // Should not throw.
|
||||||
|
Assert.False(handler.IsSubscribed);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Acknowledge_forwards_to_consumer_with_full_operator_identity()
|
||||||
|
{
|
||||||
|
FakeConsumer consumer = new FakeConsumer { AcknowledgeReturn = 0 };
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => consumer);
|
||||||
|
handler.Subscribe(@"\\HOST\Galaxy!A", "s1");
|
||||||
|
|
||||||
|
Guid g = Guid.NewGuid();
|
||||||
|
int rc = handler.Acknowledge(g, "c", "u", "n", "d", "F");
|
||||||
|
|
||||||
|
Assert.Equal(0, rc);
|
||||||
|
Assert.Equal(g, consumer.LastAckGuid);
|
||||||
|
Assert.Equal("u", consumer.LastAckOperatorName);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Acknowledge_before_subscribe_throws_invalid_op()
|
||||||
|
{
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => new FakeConsumer());
|
||||||
|
|
||||||
|
Assert.Throws<InvalidOperationException>(
|
||||||
|
() => handler.Acknowledge(Guid.Empty, "", "", "", "", ""));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void QueryActive_returns_mapped_proto_snapshots()
|
||||||
|
{
|
||||||
|
FakeConsumer consumer = new FakeConsumer
|
||||||
|
{
|
||||||
|
SnapshotResult = new[]
|
||||||
|
{
|
||||||
|
new MxAlarmSnapshotRecord
|
||||||
|
{
|
||||||
|
AlarmGuid = Guid.NewGuid(),
|
||||||
|
ProviderName = "Galaxy",
|
||||||
|
Group = "TestArea",
|
||||||
|
TagName = "Tag1",
|
||||||
|
Type = "DSC",
|
||||||
|
Priority = 500,
|
||||||
|
State = MxAlarmStateKind.UnackAlm,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => consumer);
|
||||||
|
handler.Subscribe(@"\\HOST\Galaxy!A", "s1");
|
||||||
|
|
||||||
|
IReadOnlyList<ActiveAlarmSnapshot> snapshots = handler.QueryActive(null);
|
||||||
|
|
||||||
|
Assert.Single(snapshots);
|
||||||
|
Assert.Equal("Galaxy!TestArea.Tag1", snapshots[0].AlarmFullReference);
|
||||||
|
Assert.Equal(AlarmConditionState.Active, snapshots[0].CurrentState);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void QueryActive_filters_by_prefix()
|
||||||
|
{
|
||||||
|
FakeConsumer consumer = new FakeConsumer
|
||||||
|
{
|
||||||
|
SnapshotResult = new[]
|
||||||
|
{
|
||||||
|
NewRecord("Galaxy", "AreaA", "Tag1"),
|
||||||
|
NewRecord("Galaxy", "AreaB", "Tag2"),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => consumer);
|
||||||
|
handler.Subscribe(@"\\HOST\Galaxy!A", "s1");
|
||||||
|
|
||||||
|
IReadOnlyList<ActiveAlarmSnapshot> filtered = handler.QueryActive("Galaxy!AreaA");
|
||||||
|
|
||||||
|
Assert.Single(filtered);
|
||||||
|
Assert.Equal("Galaxy!AreaA.Tag1", filtered[0].AlarmFullReference);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Dispose_unsubscribes_and_disposes_consumer()
|
||||||
|
{
|
||||||
|
FakeConsumer consumer = new FakeConsumer();
|
||||||
|
AlarmCommandHandler handler = new AlarmCommandHandler(
|
||||||
|
new MxAccessEventQueue(),
|
||||||
|
() => consumer);
|
||||||
|
handler.Subscribe(@"\\HOST\Galaxy!A", "s1");
|
||||||
|
|
||||||
|
handler.Dispose();
|
||||||
|
|
||||||
|
Assert.True(consumer.Disposed);
|
||||||
|
Assert.Throws<ObjectDisposedException>(
|
||||||
|
() => handler.Subscribe("x", "y"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MxAlarmSnapshotRecord NewRecord(string provider, string group, string tag)
|
||||||
|
{
|
||||||
|
return new MxAlarmSnapshotRecord
|
||||||
|
{
|
||||||
|
AlarmGuid = Guid.NewGuid(),
|
||||||
|
ProviderName = provider,
|
||||||
|
Group = group,
|
||||||
|
TagName = tag,
|
||||||
|
Type = "DSC",
|
||||||
|
Priority = 500,
|
||||||
|
State = MxAlarmStateKind.UnackAlm,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeConsumer : IMxAccessAlarmConsumer
|
||||||
|
{
|
||||||
|
#pragma warning disable CS0067 // Event never invoked — fake; AlarmCommandHandler tests don't drive transitions.
|
||||||
|
public event EventHandler<MxAlarmTransitionEvent>? AlarmTransitionEmitted;
|
||||||
|
#pragma warning restore CS0067
|
||||||
|
|
||||||
|
public string? LastSubscription { get; private set; }
|
||||||
|
public Guid LastAckGuid { get; private set; }
|
||||||
|
public string? LastAckOperatorName { get; private set; }
|
||||||
|
public int AcknowledgeReturn { get; set; }
|
||||||
|
public IReadOnlyList<MxAlarmSnapshotRecord> SnapshotResult { get; set; } =
|
||||||
|
Array.Empty<MxAlarmSnapshotRecord>();
|
||||||
|
public bool ThrowOnSubscribe { get; set; }
|
||||||
|
public bool Disposed { get; private set; }
|
||||||
|
|
||||||
|
public void Subscribe(string subscription)
|
||||||
|
{
|
||||||
|
LastSubscription = subscription;
|
||||||
|
if (ThrowOnSubscribe)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException("simulated wnwrap subscribe failure");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int AcknowledgeByGuid(
|
||||||
|
Guid alarmGuid, string ackComment, string ackOperatorName,
|
||||||
|
string ackOperatorNode, string ackOperatorDomain, string ackOperatorFullName)
|
||||||
|
{
|
||||||
|
LastAckGuid = alarmGuid;
|
||||||
|
LastAckOperatorName = ackOperatorName;
|
||||||
|
return AcknowledgeReturn;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IReadOnlyList<MxAlarmSnapshotRecord> SnapshotActiveAlarms() => SnapshotResult;
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
Disposed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,192 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.MxAccess;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Per-session owner of the worker's alarm-side state. Lazy-creates an
|
||||||
|
/// <see cref="AlarmDispatcher"/> (with a wnwrap-backed
|
||||||
|
/// <see cref="WnWrapAlarmConsumer"/> by default) on the first
|
||||||
|
/// <see cref="Subscribe"/> call, then routes
|
||||||
|
/// <see cref="Acknowledge"/> / <see cref="QueryActive"/> /
|
||||||
|
/// <see cref="Unsubscribe"/> through the same instance for the
|
||||||
|
/// session's lifetime.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// Construction is dependency-injectable: the consumer factory
|
||||||
|
/// (default <c>() => new WnWrapAlarmConsumer()</c>) lets tests
|
||||||
|
/// substitute a fake without touching AVEVA COM. The event queue
|
||||||
|
/// is supplied by the owning <see cref="MxAccessStaSession"/> so
|
||||||
|
/// the alarm-side proto events land on the same queue the worker
|
||||||
|
/// already drains for IPC dispatch.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// Threading: invoked from <see cref="MxAccessCommandExecutor"/>
|
||||||
|
/// which runs on the STA. The wnwrap consumer's polling timer
|
||||||
|
/// fires on a thread-pool thread; the only cross-thread surface
|
||||||
|
/// is the <see cref="AlarmDispatcher"/>'s event handler, which
|
||||||
|
/// hand-offs into the thread-safe <see cref="MxAccessEventQueue"/>.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class AlarmCommandHandler : IAlarmCommandHandler
|
||||||
|
{
|
||||||
|
private readonly MxAccessEventQueue eventQueue;
|
||||||
|
private readonly Func<IMxAccessAlarmConsumer> consumerFactory;
|
||||||
|
private readonly object syncRoot = new object();
|
||||||
|
private AlarmDispatcher? dispatcher;
|
||||||
|
private bool disposed;
|
||||||
|
|
||||||
|
public AlarmCommandHandler(MxAccessEventQueue eventQueue)
|
||||||
|
: this(eventQueue, () => new WnWrapAlarmConsumer())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Test seam — inject a custom consumer factory.</summary>
|
||||||
|
public AlarmCommandHandler(
|
||||||
|
MxAccessEventQueue eventQueue,
|
||||||
|
Func<IMxAccessAlarmConsumer> consumerFactory)
|
||||||
|
{
|
||||||
|
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
|
||||||
|
this.consumerFactory = consumerFactory ?? throw new ArgumentNullException(nameof(consumerFactory));
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsSubscribed
|
||||||
|
{
|
||||||
|
get { lock (syncRoot) return dispatcher is not null; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public void Subscribe(string subscription, string sessionId)
|
||||||
|
{
|
||||||
|
if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler));
|
||||||
|
if (subscription is null) throw new ArgumentNullException(nameof(subscription));
|
||||||
|
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
if (dispatcher is not null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
"AlarmCommandHandler already has an active subscription; " +
|
||||||
|
"call Unsubscribe before issuing another SubscribeAlarms command.");
|
||||||
|
}
|
||||||
|
IMxAccessAlarmConsumer consumer = consumerFactory()
|
||||||
|
?? throw new InvalidOperationException("Alarm consumer factory returned null.");
|
||||||
|
MxAccessAlarmEventSink sink = new MxAccessAlarmEventSink(
|
||||||
|
eventQueue, new MxAccessEventMapper());
|
||||||
|
dispatcher = new AlarmDispatcher(consumer, sink, sessionId ?? string.Empty);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
dispatcher.Subscribe(subscription);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
try { dispatcher.Dispose(); } catch { /* swallow */ }
|
||||||
|
dispatcher = null;
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public void Unsubscribe()
|
||||||
|
{
|
||||||
|
AlarmDispatcher? toDispose;
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
toDispose = dispatcher;
|
||||||
|
dispatcher = null;
|
||||||
|
}
|
||||||
|
toDispose?.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public int Acknowledge(
|
||||||
|
Guid alarmGuid,
|
||||||
|
string comment,
|
||||||
|
string operatorUser,
|
||||||
|
string operatorNode,
|
||||||
|
string operatorDomain,
|
||||||
|
string operatorFullName)
|
||||||
|
{
|
||||||
|
AlarmDispatcher? d = GetDispatcherOrThrow();
|
||||||
|
return d.Acknowledge(
|
||||||
|
alarmGuid,
|
||||||
|
comment ?? string.Empty,
|
||||||
|
operatorUser ?? string.Empty,
|
||||||
|
operatorNode ?? string.Empty,
|
||||||
|
operatorDomain ?? string.Empty,
|
||||||
|
operatorFullName ?? string.Empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix)
|
||||||
|
{
|
||||||
|
AlarmDispatcher? d = GetDispatcherOrThrow();
|
||||||
|
IReadOnlyList<ActiveAlarmSnapshot> all = d.SnapshotActiveAlarms();
|
||||||
|
if (string.IsNullOrEmpty(alarmFilterPrefix)) return all;
|
||||||
|
List<ActiveAlarmSnapshot> filtered = new List<ActiveAlarmSnapshot>(all.Count);
|
||||||
|
foreach (ActiveAlarmSnapshot snap in all)
|
||||||
|
{
|
||||||
|
if (snap.AlarmFullReference.StartsWith(alarmFilterPrefix!, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
filtered.Add(snap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return filtered;
|
||||||
|
}
|
||||||
|
|
||||||
|
private AlarmDispatcher GetDispatcherOrThrow()
|
||||||
|
{
|
||||||
|
if (disposed) throw new ObjectDisposedException(nameof(AlarmCommandHandler));
|
||||||
|
AlarmDispatcher? d;
|
||||||
|
lock (syncRoot) d = dispatcher;
|
||||||
|
if (d is null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
"AlarmCommandHandler has no active subscription; " +
|
||||||
|
"call SubscribeAlarms before issuing alarm-related commands.");
|
||||||
|
}
|
||||||
|
return d;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (disposed) return;
|
||||||
|
disposed = true;
|
||||||
|
Unsubscribe();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Per-session interface routing the worker's alarm IPC commands —
|
||||||
|
/// <c>SubscribeAlarmsCommand</c>, <c>AcknowledgeAlarmCommand</c>,
|
||||||
|
/// <c>QueryActiveAlarmsCommand</c>, <c>UnsubscribeAlarmsCommand</c> —
|
||||||
|
/// to the underlying <see cref="AlarmDispatcher"/>. Production binding
|
||||||
|
/// is <see cref="AlarmCommandHandler"/>; tests substitute a fake.
|
||||||
|
/// </summary>
|
||||||
|
public interface IAlarmCommandHandler : IDisposable
|
||||||
|
{
|
||||||
|
/// <summary>Begin a subscription against the supplied AVEVA alarm-provider expression.</summary>
|
||||||
|
void Subscribe(string subscription, string sessionId);
|
||||||
|
|
||||||
|
/// <summary>Tear down the active subscription. No-op if not subscribed.</summary>
|
||||||
|
void Unsubscribe();
|
||||||
|
|
||||||
|
/// <summary>Acknowledge a single alarm by GUID. Returns AVEVA's native status (0 = success).</summary>
|
||||||
|
int Acknowledge(
|
||||||
|
Guid alarmGuid,
|
||||||
|
string comment,
|
||||||
|
string operatorUser,
|
||||||
|
string operatorNode,
|
||||||
|
string operatorDomain,
|
||||||
|
string operatorFullName);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Snapshot the currently-active alarm set, optionally scoped to a
|
||||||
|
/// prefix matched against <c>AlarmFullReference</c>.
|
||||||
|
/// </summary>
|
||||||
|
IReadOnlyList<ActiveAlarmSnapshot> QueryActive(string? alarmFilterPrefix);
|
||||||
|
}
|
||||||
@@ -13,13 +13,14 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
|
|||||||
{
|
{
|
||||||
private readonly MxAccessSession session;
|
private readonly MxAccessSession session;
|
||||||
private readonly VariantConverter variantConverter;
|
private readonly VariantConverter variantConverter;
|
||||||
|
private readonly IAlarmCommandHandler? alarmCommandHandler;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a command executor with an MXAccess session.
|
/// Initializes a command executor with an MXAccess session.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="session">MXAccess session on the STA thread.</param>
|
/// <param name="session">MXAccess session on the STA thread.</param>
|
||||||
public MxAccessCommandExecutor(MxAccessSession session)
|
public MxAccessCommandExecutor(MxAccessSession session)
|
||||||
: this(session, new VariantConverter())
|
: this(session, new VariantConverter(), alarmCommandHandler: null)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -31,9 +32,24 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
|
|||||||
public MxAccessCommandExecutor(
|
public MxAccessCommandExecutor(
|
||||||
MxAccessSession session,
|
MxAccessSession session,
|
||||||
VariantConverter variantConverter)
|
VariantConverter variantConverter)
|
||||||
|
: this(session, variantConverter, alarmCommandHandler: null)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a command executor with an MXAccess session, variant
|
||||||
|
/// converter, and an alarm command handler. The alarm handler is
|
||||||
|
/// optional — when null, alarm-side commands return an
|
||||||
|
/// "alarm consumer not configured" diagnostic.
|
||||||
|
/// </summary>
|
||||||
|
public MxAccessCommandExecutor(
|
||||||
|
MxAccessSession session,
|
||||||
|
VariantConverter variantConverter,
|
||||||
|
IAlarmCommandHandler? alarmCommandHandler)
|
||||||
{
|
{
|
||||||
this.session = session ?? throw new ArgumentNullException(nameof(session));
|
this.session = session ?? throw new ArgumentNullException(nameof(session));
|
||||||
this.variantConverter = variantConverter ?? throw new ArgumentNullException(nameof(variantConverter));
|
this.variantConverter = variantConverter ?? throw new ArgumentNullException(nameof(variantConverter));
|
||||||
|
this.alarmCommandHandler = alarmCommandHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -64,6 +80,10 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
|
|||||||
MxCommandKind.UnAdviseItemBulk => ExecuteUnAdviseItemBulk(command),
|
MxCommandKind.UnAdviseItemBulk => ExecuteUnAdviseItemBulk(command),
|
||||||
MxCommandKind.SubscribeBulk => ExecuteSubscribeBulk(command),
|
MxCommandKind.SubscribeBulk => ExecuteSubscribeBulk(command),
|
||||||
MxCommandKind.UnsubscribeBulk => ExecuteUnsubscribeBulk(command),
|
MxCommandKind.UnsubscribeBulk => ExecuteUnsubscribeBulk(command),
|
||||||
|
MxCommandKind.SubscribeAlarms => ExecuteSubscribeAlarms(command),
|
||||||
|
MxCommandKind.UnsubscribeAlarms => ExecuteUnsubscribeAlarms(command),
|
||||||
|
MxCommandKind.AcknowledgeAlarm => ExecuteAcknowledgeAlarm(command),
|
||||||
|
MxCommandKind.QueryActiveAlarms => ExecuteQueryActiveAlarms(command),
|
||||||
_ => CreateInvalidRequestReply(command, $"Unsupported MXAccess command kind {command.Kind}."),
|
_ => CreateInvalidRequestReply(command, $"Unsupported MXAccess command kind {command.Kind}."),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -280,6 +300,153 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
|
|||||||
session.UnsubscribeBulk(unsubscribeBulkCommand.ServerHandle, unsubscribeBulkCommand.ItemHandles));
|
session.UnsubscribeBulk(unsubscribeBulkCommand.ServerHandle, unsubscribeBulkCommand.ItemHandles));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private MxCommandReply ExecuteSubscribeAlarms(StaCommand command)
|
||||||
|
{
|
||||||
|
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.SubscribeAlarms)
|
||||||
|
{
|
||||||
|
return CreateInvalidRequestReply(command, "SubscribeAlarms command payload is required.");
|
||||||
|
}
|
||||||
|
if (alarmCommandHandler is null)
|
||||||
|
{
|
||||||
|
return CreateInvalidRequestReply(
|
||||||
|
command,
|
||||||
|
"SubscribeAlarms requires an alarm command handler; the worker was constructed without one.");
|
||||||
|
}
|
||||||
|
|
||||||
|
string subscription = command.Command.SubscribeAlarms.SubscriptionExpression ?? string.Empty;
|
||||||
|
if (string.IsNullOrWhiteSpace(subscription))
|
||||||
|
{
|
||||||
|
return CreateInvalidRequestReply(command, "SubscribeAlarms.subscription_expression is required.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
alarmCommandHandler.Subscribe(subscription, command.SessionId);
|
||||||
|
return CreateOkReply(command);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return CreateAlarmFailureReply(command, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MxCommandReply ExecuteUnsubscribeAlarms(StaCommand command)
|
||||||
|
{
|
||||||
|
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.UnsubscribeAlarms)
|
||||||
|
{
|
||||||
|
return CreateInvalidRequestReply(command, "UnsubscribeAlarms command payload is required.");
|
||||||
|
}
|
||||||
|
if (alarmCommandHandler is null)
|
||||||
|
{
|
||||||
|
// No handler configured — Unsubscribe is a no-op in that case;
|
||||||
|
// it can't be in a subscribed state to begin with.
|
||||||
|
return CreateOkReply(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
alarmCommandHandler.Unsubscribe();
|
||||||
|
return CreateOkReply(command);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return CreateAlarmFailureReply(command, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MxCommandReply ExecuteAcknowledgeAlarm(StaCommand command)
|
||||||
|
{
|
||||||
|
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.AcknowledgeAlarmCommand)
|
||||||
|
{
|
||||||
|
return CreateInvalidRequestReply(command, "AcknowledgeAlarm command payload is required.");
|
||||||
|
}
|
||||||
|
if (alarmCommandHandler is null)
|
||||||
|
{
|
||||||
|
return CreateInvalidRequestReply(
|
||||||
|
command,
|
||||||
|
"AcknowledgeAlarm requires an alarm command handler; the worker was constructed without one.");
|
||||||
|
}
|
||||||
|
|
||||||
|
AcknowledgeAlarmCommand payload = command.Command.AcknowledgeAlarmCommand;
|
||||||
|
if (!Guid.TryParse(payload.AlarmGuid, out Guid alarmGuid))
|
||||||
|
{
|
||||||
|
return CreateInvalidRequestReply(
|
||||||
|
command,
|
||||||
|
$"AcknowledgeAlarm.alarm_guid is not a valid canonical GUID: '{payload.AlarmGuid}'.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
int rc = alarmCommandHandler.Acknowledge(
|
||||||
|
alarmGuid,
|
||||||
|
payload.Comment,
|
||||||
|
payload.OperatorUser,
|
||||||
|
payload.OperatorNode,
|
||||||
|
payload.OperatorDomain,
|
||||||
|
payload.OperatorFullName);
|
||||||
|
MxCommandReply reply = CreateOkReply(command);
|
||||||
|
reply.Hresult = rc;
|
||||||
|
reply.AcknowledgeAlarm = new AcknowledgeAlarmReplyPayload
|
||||||
|
{
|
||||||
|
NativeStatus = rc,
|
||||||
|
};
|
||||||
|
if (rc != 0)
|
||||||
|
{
|
||||||
|
reply.DiagnosticMessage = $"AVEVA AlarmAckByGUID returned non-zero status {rc}.";
|
||||||
|
}
|
||||||
|
return reply;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return CreateAlarmFailureReply(command, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MxCommandReply ExecuteQueryActiveAlarms(StaCommand command)
|
||||||
|
{
|
||||||
|
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.QueryActiveAlarmsCommand)
|
||||||
|
{
|
||||||
|
return CreateInvalidRequestReply(command, "QueryActiveAlarms command payload is required.");
|
||||||
|
}
|
||||||
|
if (alarmCommandHandler is null)
|
||||||
|
{
|
||||||
|
return CreateInvalidRequestReply(
|
||||||
|
command,
|
||||||
|
"QueryActiveAlarms requires an alarm command handler; the worker was constructed without one.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
IReadOnlyList<ActiveAlarmSnapshot> snapshots = alarmCommandHandler.QueryActive(
|
||||||
|
command.Command.QueryActiveAlarmsCommand.AlarmFilterPrefix);
|
||||||
|
QueryActiveAlarmsReplyPayload payload = new QueryActiveAlarmsReplyPayload();
|
||||||
|
payload.Snapshots.AddRange(snapshots);
|
||||||
|
MxCommandReply reply = CreateOkReply(command);
|
||||||
|
reply.QueryActiveAlarms = payload;
|
||||||
|
return reply;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return CreateAlarmFailureReply(command, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MxCommandReply CreateAlarmFailureReply(StaCommand command, Exception exception)
|
||||||
|
{
|
||||||
|
return new MxCommandReply
|
||||||
|
{
|
||||||
|
SessionId = command.SessionId,
|
||||||
|
CorrelationId = command.CorrelationId,
|
||||||
|
Kind = command.Kind,
|
||||||
|
ProtocolStatus = new ProtocolStatus
|
||||||
|
{
|
||||||
|
Code = ProtocolStatusCode.MxaccessFailure,
|
||||||
|
Message = exception.Message,
|
||||||
|
},
|
||||||
|
DiagnosticMessage = $"{exception.GetType().FullName}: {exception.Message}",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
private static MxCommandReply CreateOkReply(StaCommand command)
|
private static MxCommandReply CreateOkReply(StaCommand command)
|
||||||
{
|
{
|
||||||
return new MxCommandReply
|
return new MxCommandReply
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ using System.Diagnostics;
|
|||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using MxGateway.Contracts.Proto;
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.Conversion;
|
||||||
using MxGateway.Worker.Sta;
|
using MxGateway.Worker.Sta;
|
||||||
|
|
||||||
namespace MxGateway.Worker.MxAccess;
|
namespace MxGateway.Worker.MxAccess;
|
||||||
@@ -14,8 +15,10 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
private readonly IMxAccessEventSink eventSink;
|
private readonly IMxAccessEventSink eventSink;
|
||||||
private readonly MxAccessEventQueue eventQueue;
|
private readonly MxAccessEventQueue eventQueue;
|
||||||
private readonly StaRuntime staRuntime;
|
private readonly StaRuntime staRuntime;
|
||||||
|
private readonly Func<MxAccessEventQueue, IAlarmCommandHandler>? alarmCommandHandlerFactory;
|
||||||
private StaCommandDispatcher? commandDispatcher;
|
private StaCommandDispatcher? commandDispatcher;
|
||||||
private MxAccessSession? session;
|
private MxAccessSession? session;
|
||||||
|
private IAlarmCommandHandler? alarmCommandHandler;
|
||||||
private bool disposed;
|
private bool disposed;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -69,11 +72,29 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
IMxAccessComObjectFactory factory,
|
IMxAccessComObjectFactory factory,
|
||||||
IMxAccessEventSink eventSink,
|
IMxAccessEventSink eventSink,
|
||||||
MxAccessEventQueue eventQueue)
|
MxAccessEventQueue eventQueue)
|
||||||
|
: this(staRuntime, factory, eventSink, eventQueue, alarmCommandHandlerFactory: null)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a new instance of <see cref="MxAccessStaSession"/> with all
|
||||||
|
/// dependencies including an alarm-command handler factory. The factory is
|
||||||
|
/// invoked on the STA thread during <see cref="StartAsync(string, int, CancellationToken)"/>;
|
||||||
|
/// pass <c>null</c> to opt out of alarm-side commands (the worker rejects
|
||||||
|
/// them with an "alarm consumer not configured" diagnostic).
|
||||||
|
/// </summary>
|
||||||
|
public MxAccessStaSession(
|
||||||
|
StaRuntime staRuntime,
|
||||||
|
IMxAccessComObjectFactory factory,
|
||||||
|
IMxAccessEventSink eventSink,
|
||||||
|
MxAccessEventQueue eventQueue,
|
||||||
|
Func<MxAccessEventQueue, IAlarmCommandHandler>? alarmCommandHandlerFactory)
|
||||||
{
|
{
|
||||||
this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
|
this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
|
||||||
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
|
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
|
||||||
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
|
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
|
||||||
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
|
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
|
||||||
|
this.alarmCommandHandlerFactory = alarmCommandHandlerFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -117,9 +138,16 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
}
|
}
|
||||||
|
|
||||||
session = MxAccessSession.Create(factory, eventSink, sessionId);
|
session = MxAccessSession.Create(factory, eventSink, sessionId);
|
||||||
|
if (alarmCommandHandlerFactory is not null)
|
||||||
|
{
|
||||||
|
alarmCommandHandler = alarmCommandHandlerFactory(eventQueue);
|
||||||
|
}
|
||||||
commandDispatcher = new StaCommandDispatcher(
|
commandDispatcher = new StaCommandDispatcher(
|
||||||
staRuntime,
|
staRuntime,
|
||||||
new MxAccessCommandExecutor(session));
|
new MxAccessCommandExecutor(
|
||||||
|
session,
|
||||||
|
new VariantConverter(),
|
||||||
|
alarmCommandHandler));
|
||||||
|
|
||||||
return session.CreateWorkerReady(workerProcessId);
|
return session.CreateWorkerReady(workerProcessId);
|
||||||
},
|
},
|
||||||
@@ -279,6 +307,27 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
|
|
||||||
commandDispatcher?.RequestShutdown();
|
commandDispatcher?.RequestShutdown();
|
||||||
|
|
||||||
|
// Stop the alarm consumer's polling timer and tear down the
|
||||||
|
// dispatcher BEFORE the data-side cleanup begins. The alarm
|
||||||
|
// consumer holds a wnwrap COM RCW that needs the STA pump to
|
||||||
|
// unwind cleanly; doing it here gives it the opportunity while
|
||||||
|
// the STA is still alive.
|
||||||
|
IAlarmCommandHandler? alarmHandlerToDispose = alarmCommandHandler;
|
||||||
|
alarmCommandHandler = null;
|
||||||
|
if (alarmHandlerToDispose is not null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await staRuntime.InvokeAsync(
|
||||||
|
() => alarmHandlerToDispose.Dispose(),
|
||||||
|
cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// Swallow — alarm cleanup must not block data shutdown.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Stopwatch stopwatch = Stopwatch.StartNew();
|
Stopwatch stopwatch = Stopwatch.StartNew();
|
||||||
MxAccessShutdownResult result;
|
MxAccessShutdownResult result;
|
||||||
if (session is null)
|
if (session is null)
|
||||||
@@ -333,6 +382,19 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
|||||||
|
|
||||||
RequestShutdown();
|
RequestShutdown();
|
||||||
|
|
||||||
|
IAlarmCommandHandler? alarmHandlerToDispose = alarmCommandHandler;
|
||||||
|
alarmCommandHandler = null;
|
||||||
|
if (alarmHandlerToDispose is not null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
staRuntime.InvokeAsync(() => alarmHandlerToDispose.Dispose())
|
||||||
|
.Wait(TimeSpan.FromSeconds(2));
|
||||||
|
}
|
||||||
|
catch (AggregateException) { }
|
||||||
|
catch (ObjectDisposedException) { }
|
||||||
|
}
|
||||||
|
|
||||||
if (session is not null)
|
if (session is not null)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
|||||||
Reference in New Issue
Block a user