A.3 (gateway dispatcher): WorkerAlarmRpcDispatcher routes alarm RPCs over the worker pipe
Replaces NotWiredAlarmRpcDispatcher in DI with a production
implementation that issues the new MxCommandKind.{AcknowledgeAlarm,
QueryActiveAlarms} commands across the IPC and unwraps the resulting
MxCommandReply into the public RPC types.
QueryActiveAlarms is fully wired: builds the QueryActiveAlarmsCommand
(forwarding alarm_filter_prefix), invokes it on the resolved
GatewaySession's worker client, and yields each ActiveAlarmSnapshot
from the QueryActiveAlarmsReplyPayload as the RPC stream. Worker
failures + missing sessions yield an empty stream — matches the
ConditionRefresh contract clients already speak to.
AcknowledgeAlarm is partially wired: the public RPC takes
AlarmFullReference (Provider!Group.Tag), but the worker's wnwrap
consumer acks by GUID. Strategy:
- If AlarmFullReference parses as a canonical GUID, forward it
directly through MxCommandKind.AcknowledgeAlarm. Native status
flows back via MxCommandReply.Hresult and the dedicated
AcknowledgeAlarmReplyPayload.NativeStatus.
- Otherwise, return InvalidRequest with a clear diagnostic naming the
follow-up — reference→GUID lookup needs a worker-side AlarmAckByName
command wrapping wwAlarmConsumerClass.AlarmAckByName.
DI: SessionServiceCollectionExtensions registers WorkerAlarmRpcDispatcher
as the default IAlarmRpcDispatcher; MxAccessGatewayService picks it up
via constructor injection. NotWiredAlarmRpcDispatcher is retained for
test fixtures that want the no-side-effect fake.
Tests: 7 new unit tests cover session-not-found short-circuit, GUID-vs-
reference branching, native-status propagation, worker MxaccessFailure
diagnostic propagation, and snapshot-stream yielding. Server test
suite total: 288/0 fail. Solution builds clean.
End-to-end alarms-over-gateway pipeline status:
consumer → sink → queue (A.2 + A.3 in-process slice)
worker IPC commands (A.3 worker slice)
gateway dispatcher (this slice)
Remaining for full E2E:
- Auto-issue SubscribeAlarms on session open (or add a public
SubscribeAlarms RPC). Without this trigger the consumer never
starts and Acknowledge/Query return "not subscribed".
- AlarmAckByName worker command for ack-by-reference.
- End-to-end live test against the dev rig.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -11,6 +11,7 @@ public static class SessionServiceCollectionExtensions
|
||||
services.AddSingleton<ISessionRegistry, SessionRegistry>();
|
||||
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
|
||||
services.AddSingleton<ISessionManager, SessionManager>();
|
||||
services.AddSingleton<IAlarmRpcDispatcher, WorkerAlarmRpcDispatcher>();
|
||||
services.AddHostedService<SessionLeaseMonitorHostedService>();
|
||||
services.AddHostedService<SessionShutdownHostedService>();
|
||||
|
||||
|
||||
@@ -0,0 +1,172 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Grpc;
|
||||
|
||||
namespace MxGateway.Server.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Production <see cref="IAlarmRpcDispatcher"/> that routes the public
|
||||
/// <c>AcknowledgeAlarm</c> + <c>QueryActiveAlarms</c> RPCs through the
|
||||
/// worker pipe IPC. Replaces <see cref="NotWiredAlarmRpcDispatcher"/>
|
||||
/// once the worker AlarmCommandHandler is wired in.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <c>QueryActiveAlarms</c> is fully wired: issues a
|
||||
/// <see cref="QueryActiveAlarmsCommand"/> over the pipe and yields
|
||||
/// each <see cref="ActiveAlarmSnapshot"/> from the
|
||||
/// <see cref="QueryActiveAlarmsReplyPayload"/>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <c>AcknowledgeAlarm</c> is partially wired: the public RPC's
|
||||
/// <see cref="AcknowledgeAlarmRequest.AlarmFullReference"/> is a
|
||||
/// <c>Provider!Group.Tag</c> string, but the worker's wnwrap consumer
|
||||
/// acks by GUID. When the supplied reference parses as a GUID
|
||||
/// directly, the dispatcher forwards it as-is. Otherwise it
|
||||
/// returns an <c>Unimplemented</c> diagnostic. Resolving
|
||||
/// reference→GUID requires an additional worker IPC command
|
||||
/// (e.g. <c>AlarmAckByName</c> wrapping
|
||||
/// <c>wwAlarmConsumerClass.AlarmAckByName</c>) and is tracked as
|
||||
/// a follow-up.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class WorkerAlarmRpcDispatcher : IAlarmRpcDispatcher
|
||||
{
|
||||
private readonly ISessionRegistry sessionRegistry;
|
||||
private readonly TimeProvider timeProvider;
|
||||
|
||||
public WorkerAlarmRpcDispatcher(ISessionRegistry sessionRegistry, TimeProvider? timeProvider = null)
|
||||
{
|
||||
this.sessionRegistry = sessionRegistry ?? throw new System.ArgumentNullException(nameof(sessionRegistry));
|
||||
this.timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||
AcknowledgeAlarmRequest request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (request is null) throw new System.ArgumentNullException(nameof(request));
|
||||
|
||||
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
|
||||
{
|
||||
return new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = request.SessionId,
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = MxAccessGrpcMapper.SessionNotFound(
|
||||
$"Session '{request.SessionId}' not found."),
|
||||
DiagnosticMessage = "AcknowledgeAlarm: session not found.",
|
||||
};
|
||||
}
|
||||
|
||||
if (!System.Guid.TryParse(request.AlarmFullReference, out System.Guid guid))
|
||||
{
|
||||
// Reference→GUID lookup not yet implemented. Surface a clear
|
||||
// diagnostic so client teams can plumb the reference parser
|
||||
// when the worker AlarmAckByName command lands.
|
||||
return new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = request.SessionId,
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.InvalidRequest,
|
||||
Message = "AlarmFullReference must currently be a canonical GUID; reference→GUID lookup is pending the AlarmAckByName worker command.",
|
||||
},
|
||||
DiagnosticMessage = $"AcknowledgeAlarm received non-GUID reference '{request.AlarmFullReference}'.",
|
||||
};
|
||||
}
|
||||
|
||||
WorkerCommand workerCommand = new WorkerCommand
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
|
||||
{
|
||||
AlarmGuid = guid.ToString(),
|
||||
Comment = request.Comment ?? string.Empty,
|
||||
OperatorUser = request.OperatorUser ?? string.Empty,
|
||||
// Operator node/domain/full-name are not on the public
|
||||
// RPC surface today; pass empty strings so the worker
|
||||
// honours the existing AcknowledgeAlarmCommand schema.
|
||||
OperatorNode = string.Empty,
|
||||
OperatorDomain = string.Empty,
|
||||
OperatorFullName = string.Empty,
|
||||
},
|
||||
},
|
||||
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
|
||||
};
|
||||
|
||||
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
MxCommandReply mxReply = workerReply.Reply ?? new MxCommandReply
|
||||
{
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.ProtocolViolation,
|
||||
Message = "Worker reply did not include an MxCommandReply.",
|
||||
},
|
||||
};
|
||||
|
||||
AcknowledgeAlarmReply reply = new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = request.SessionId,
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = mxReply.ProtocolStatus ?? MxAccessGrpcMapper.Ok(),
|
||||
DiagnosticMessage = mxReply.DiagnosticMessage ?? string.Empty,
|
||||
};
|
||||
if (mxReply.HasHresult)
|
||||
{
|
||||
reply.Hresult = mxReply.Hresult;
|
||||
}
|
||||
return reply;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||
QueryActiveAlarmsRequest request,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
if (request is null) throw new System.ArgumentNullException(nameof(request));
|
||||
|
||||
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
WorkerCommand workerCommand = new WorkerCommand
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.QueryActiveAlarms,
|
||||
QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand
|
||||
{
|
||||
AlarmFilterPrefix = request.AlarmFilterPrefix ?? string.Empty,
|
||||
},
|
||||
},
|
||||
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
|
||||
};
|
||||
|
||||
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
MxCommandReply? mxReply = workerReply.Reply;
|
||||
if (mxReply?.ProtocolStatus?.Code != ProtocolStatusCode.Ok) yield break;
|
||||
|
||||
QueryActiveAlarmsReplyPayload? payload = mxReply.QueryActiveAlarms;
|
||||
if (payload is null) yield break;
|
||||
|
||||
foreach (ActiveAlarmSnapshot snapshot in payload.Snapshots)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
yield return snapshot;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,300 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Sessions;
|
||||
using MxGateway.Server.Workers;
|
||||
|
||||
namespace MxGateway.Tests.Gateway.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Pins the production <see cref="WorkerAlarmRpcDispatcher"/>'s behaviour:
|
||||
/// resolves the session by id, issues the matching MxCommand over the
|
||||
/// worker pipe, and unwraps the reply into AcknowledgeAlarmReply or the
|
||||
/// ActiveAlarmSnapshot stream.
|
||||
/// </summary>
|
||||
public sealed class WorkerAlarmRpcDispatcherTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_returns_session_not_found_when_session_missing()
|
||||
{
|
||||
SessionRegistry registry = new SessionRegistry();
|
||||
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "missing",
|
||||
ClientCorrelationId = "c1",
|
||||
AlarmFullReference = Guid.NewGuid().ToString(),
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.SessionNotFound, reply.ProtocolStatus.Code);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_returns_invalid_request_when_reference_is_not_a_guid()
|
||||
{
|
||||
SessionRegistry registry = new SessionRegistry();
|
||||
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient();
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "s1",
|
||||
ClientCorrelationId = "c1",
|
||||
AlarmFullReference = "Galaxy!Area.Tag", // not a GUID
|
||||
Comment = "x",
|
||||
OperatorUser = "u",
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
|
||||
Assert.Equal(0, worker.InvokeCount); // dispatcher short-circuits before the IPC.
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_forwards_guid_and_returns_native_status()
|
||||
{
|
||||
SessionRegistry registry = new SessionRegistry();
|
||||
Guid alarmGuid = Guid.NewGuid();
|
||||
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
|
||||
{
|
||||
ReplyFactory = command =>
|
||||
{
|
||||
Assert.Equal(MxCommandKind.AcknowledgeAlarm, command.Command.Kind);
|
||||
Assert.Equal(alarmGuid.ToString(), command.Command.AcknowledgeAlarmCommand.AlarmGuid);
|
||||
Assert.Equal("ack", command.Command.AcknowledgeAlarmCommand.Comment);
|
||||
Assert.Equal("alice", command.Command.AcknowledgeAlarmCommand.OperatorUser);
|
||||
return new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
|
||||
Hresult = 0,
|
||||
AcknowledgeAlarm = new AcknowledgeAlarmReplyPayload { NativeStatus = 0 },
|
||||
};
|
||||
},
|
||||
};
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "s1",
|
||||
ClientCorrelationId = "c1",
|
||||
AlarmFullReference = alarmGuid.ToString(),
|
||||
Comment = "ack",
|
||||
OperatorUser = "alice",
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||
Assert.Equal(0, reply.Hresult);
|
||||
Assert.Equal("s1", reply.SessionId);
|
||||
Assert.Equal("c1", reply.CorrelationId);
|
||||
Assert.Equal(1, worker.InvokeCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_propagates_worker_diagnostic_on_failure()
|
||||
{
|
||||
SessionRegistry registry = new SessionRegistry();
|
||||
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
|
||||
{
|
||||
ReplyFactory = _ => new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.MxaccessFailure,
|
||||
Message = "AVEVA Acknowledge failed.",
|
||||
},
|
||||
Hresult = -123,
|
||||
DiagnosticMessage = "AVEVA AlarmAckByGUID returned non-zero status -123.",
|
||||
},
|
||||
};
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "s1",
|
||||
AlarmFullReference = Guid.NewGuid().ToString(),
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.MxaccessFailure, reply.ProtocolStatus.Code);
|
||||
Assert.Equal(-123, reply.Hresult);
|
||||
Assert.Contains("-123", reply.DiagnosticMessage);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarmsAsync_yields_each_snapshot_from_payload()
|
||||
{
|
||||
SessionRegistry registry = new SessionRegistry();
|
||||
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
|
||||
{
|
||||
ReplyFactory = command =>
|
||||
{
|
||||
Assert.Equal(MxCommandKind.QueryActiveAlarms, command.Command.Kind);
|
||||
QueryActiveAlarmsReplyPayload payload = new QueryActiveAlarmsReplyPayload();
|
||||
payload.Snapshots.Add(new ActiveAlarmSnapshot
|
||||
{
|
||||
AlarmFullReference = "Galaxy!A.T1",
|
||||
CurrentState = AlarmConditionState.Active,
|
||||
Severity = 500,
|
||||
});
|
||||
payload.Snapshots.Add(new ActiveAlarmSnapshot
|
||||
{
|
||||
AlarmFullReference = "Galaxy!A.T2",
|
||||
CurrentState = AlarmConditionState.ActiveAcked,
|
||||
Severity = 100,
|
||||
});
|
||||
return new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.QueryActiveAlarms,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
|
||||
QueryActiveAlarms = payload,
|
||||
};
|
||||
},
|
||||
};
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||
|
||||
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
|
||||
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
|
||||
new QueryActiveAlarmsRequest { SessionId = "s1" },
|
||||
CancellationToken.None))
|
||||
{
|
||||
collected.Add(snap);
|
||||
}
|
||||
|
||||
Assert.Equal(2, collected.Count);
|
||||
Assert.Equal("Galaxy!A.T1", collected[0].AlarmFullReference);
|
||||
Assert.Equal("Galaxy!A.T2", collected[1].AlarmFullReference);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarmsAsync_yields_empty_when_session_missing()
|
||||
{
|
||||
SessionRegistry registry = new SessionRegistry();
|
||||
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||
|
||||
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
|
||||
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
|
||||
new QueryActiveAlarmsRequest { SessionId = "missing" },
|
||||
CancellationToken.None))
|
||||
{
|
||||
collected.Add(snap);
|
||||
}
|
||||
|
||||
Assert.Empty(collected);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarmsAsync_yields_empty_on_worker_failure()
|
||||
{
|
||||
SessionRegistry registry = new SessionRegistry();
|
||||
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
|
||||
{
|
||||
ReplyFactory = _ => new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.QueryActiveAlarms,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.MxaccessFailure,
|
||||
Message = "alarm consumer not subscribed",
|
||||
},
|
||||
},
|
||||
};
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||
|
||||
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
|
||||
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
|
||||
new QueryActiveAlarmsRequest { SessionId = "s1" },
|
||||
CancellationToken.None))
|
||||
{
|
||||
collected.Add(snap);
|
||||
}
|
||||
|
||||
Assert.Empty(collected);
|
||||
}
|
||||
|
||||
private static GatewaySession NewSession(string sessionId)
|
||||
{
|
||||
return new GatewaySession(
|
||||
sessionId,
|
||||
"mxaccess",
|
||||
$"mxaccess-gateway-1-{sessionId}",
|
||||
"nonce",
|
||||
"client-1",
|
||||
"test-session",
|
||||
"client-correlation-1",
|
||||
commandTimeout: TimeSpan.FromSeconds(30),
|
||||
startupTimeout: TimeSpan.FromSeconds(5),
|
||||
shutdownTimeout: TimeSpan.FromSeconds(5),
|
||||
leaseDuration: TimeSpan.FromMinutes(30),
|
||||
openedAt: DateTimeOffset.UtcNow);
|
||||
}
|
||||
|
||||
private sealed class FakeAlarmWorkerClient : IWorkerClient
|
||||
{
|
||||
public string SessionId { get; } = "session-1";
|
||||
public int? ProcessId { get; } = 1;
|
||||
public WorkerClientState State { get; } = WorkerClientState.Ready;
|
||||
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
|
||||
|
||||
public Func<WorkerCommand, MxCommandReply>? ReplyFactory { get; set; }
|
||||
public int InvokeCount { get; private set; }
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
|
||||
public Task<WorkerCommandReply> InvokeAsync(
|
||||
WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken)
|
||||
{
|
||||
InvokeCount++;
|
||||
MxCommandReply reply = ReplyFactory?.Invoke(command) ?? new MxCommandReply();
|
||||
return Task.FromResult(new WorkerCommandReply { Reply = reply });
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
|
||||
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public void Kill(string reason) { }
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user