184 lines
8.3 KiB
C#
184 lines
8.3 KiB
C#
using System;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using MessagePack;
|
|
using Serilog;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Ipc;
|
|
|
|
/// <summary>
|
|
/// Real IPC dispatcher — routes each <see cref="MessageKind"/> to the matching
|
|
/// <see cref="IGalaxyBackend"/> method. Replaces <see cref="StubFrameHandler"/>. Heartbeat
|
|
/// stays handled inline so liveness detection works regardless of backend health.
|
|
/// </summary>
|
|
public sealed class GalaxyFrameHandler(IGalaxyBackend backend, ILogger logger) : IFrameHandler
|
|
{
|
|
public async Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
switch (kind)
|
|
{
|
|
case MessageKind.Heartbeat:
|
|
{
|
|
var hb = Deserialize<Heartbeat>(body);
|
|
await writer.WriteAsync(MessageKind.HeartbeatAck,
|
|
new HeartbeatAck { SequenceNumber = hb.SequenceNumber, UtcUnixMs = hb.UtcUnixMs }, ct);
|
|
return;
|
|
}
|
|
case MessageKind.OpenSessionRequest:
|
|
{
|
|
var resp = await backend.OpenSessionAsync(Deserialize<OpenSessionRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.OpenSessionResponse, resp, ct);
|
|
return;
|
|
}
|
|
case MessageKind.CloseSessionRequest:
|
|
await backend.CloseSessionAsync(Deserialize<CloseSessionRequest>(body), ct);
|
|
return; // one-way
|
|
|
|
case MessageKind.DiscoverHierarchyRequest:
|
|
{
|
|
var resp = await backend.DiscoverAsync(Deserialize<DiscoverHierarchyRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.DiscoverHierarchyResponse, resp, ct);
|
|
return;
|
|
}
|
|
case MessageKind.ReadValuesRequest:
|
|
{
|
|
var resp = await backend.ReadValuesAsync(Deserialize<ReadValuesRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.ReadValuesResponse, resp, ct);
|
|
return;
|
|
}
|
|
case MessageKind.WriteValuesRequest:
|
|
{
|
|
var resp = await backend.WriteValuesAsync(Deserialize<WriteValuesRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.WriteValuesResponse, resp, ct);
|
|
return;
|
|
}
|
|
case MessageKind.SubscribeRequest:
|
|
{
|
|
var resp = await backend.SubscribeAsync(Deserialize<SubscribeRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.SubscribeResponse, resp, ct);
|
|
return;
|
|
}
|
|
case MessageKind.UnsubscribeRequest:
|
|
await backend.UnsubscribeAsync(Deserialize<UnsubscribeRequest>(body), ct);
|
|
return; // one-way
|
|
|
|
case MessageKind.AlarmSubscribeRequest:
|
|
await backend.SubscribeAlarmsAsync(Deserialize<AlarmSubscribeRequest>(body), ct);
|
|
return; // one-way; subsequent alarm events are server-pushed
|
|
case MessageKind.AlarmAckRequest:
|
|
await backend.AcknowledgeAlarmAsync(Deserialize<AlarmAckRequest>(body), ct);
|
|
return;
|
|
|
|
case MessageKind.HistoryReadRequest:
|
|
{
|
|
var resp = await backend.HistoryReadAsync(Deserialize<HistoryReadRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.HistoryReadResponse, resp, ct);
|
|
return;
|
|
}
|
|
case MessageKind.HistoryReadProcessedRequest:
|
|
{
|
|
var resp = await backend.HistoryReadProcessedAsync(
|
|
Deserialize<HistoryReadProcessedRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.HistoryReadProcessedResponse, resp, ct);
|
|
return;
|
|
}
|
|
case MessageKind.HistoryReadAtTimeRequest:
|
|
{
|
|
var resp = await backend.HistoryReadAtTimeAsync(
|
|
Deserialize<HistoryReadAtTimeRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.HistoryReadAtTimeResponse, resp, ct);
|
|
return;
|
|
}
|
|
case MessageKind.HistoryReadEventsRequest:
|
|
{
|
|
var resp = await backend.HistoryReadEventsAsync(
|
|
Deserialize<HistoryReadEventsRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.HistoryReadEventsResponse, resp, ct);
|
|
return;
|
|
}
|
|
case MessageKind.RecycleHostRequest:
|
|
{
|
|
var resp = await backend.RecycleAsync(Deserialize<RecycleHostRequest>(body), ct);
|
|
await writer.WriteAsync(MessageKind.RecycleStatusResponse, resp, ct);
|
|
return;
|
|
}
|
|
default:
|
|
await SendErrorAsync(writer, "unknown-kind", $"Frame kind {kind} not handled by Host", ct);
|
|
return;
|
|
}
|
|
}
|
|
catch (OperationCanceledException) { throw; }
|
|
catch (Exception ex)
|
|
{
|
|
logger.Error(ex, "GalaxyFrameHandler threw on {Kind}", kind);
|
|
await SendErrorAsync(writer, "handler-exception", ex.Message, ct);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Subscribes the backend's server-pushed events for the lifetime of the connection.
|
|
/// The returned disposable unsubscribes when the connection closes — without it the
|
|
/// backend's static event invocation list would accumulate dead writer references and
|
|
/// leak memory + raise <see cref="ObjectDisposedException"/> on every push.
|
|
/// </summary>
|
|
public IDisposable AttachConnection(FrameWriter writer)
|
|
{
|
|
var sink = new ConnectionSink(backend, writer, logger);
|
|
sink.Attach();
|
|
return sink;
|
|
}
|
|
|
|
private static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body);
|
|
|
|
private static Task SendErrorAsync(FrameWriter writer, string code, string message, CancellationToken ct)
|
|
=> writer.WriteAsync(MessageKind.ErrorResponse,
|
|
new ErrorResponse { Code = code, Message = message }, ct);
|
|
|
|
private sealed class ConnectionSink : IDisposable
|
|
{
|
|
private readonly IGalaxyBackend _backend;
|
|
private readonly FrameWriter _writer;
|
|
private readonly ILogger _logger;
|
|
private EventHandler<OnDataChangeNotification>? _onData;
|
|
private EventHandler<GalaxyAlarmEvent>? _onAlarm;
|
|
private EventHandler<HostConnectivityStatus>? _onHost;
|
|
|
|
public ConnectionSink(IGalaxyBackend backend, FrameWriter writer, ILogger logger)
|
|
{
|
|
_backend = backend; _writer = writer; _logger = logger;
|
|
}
|
|
|
|
public void Attach()
|
|
{
|
|
_onData = (_, e) => Push(MessageKind.OnDataChangeNotification, e);
|
|
_onAlarm = (_, e) => Push(MessageKind.AlarmEvent, e);
|
|
_onHost = (_, e) => Push(MessageKind.RuntimeStatusChange,
|
|
new RuntimeStatusChangeNotification { Status = e });
|
|
_backend.OnDataChange += _onData;
|
|
_backend.OnAlarmEvent += _onAlarm;
|
|
_backend.OnHostStatusChanged += _onHost;
|
|
}
|
|
|
|
private void Push<T>(MessageKind kind, T payload)
|
|
{
|
|
// Fire-and-forget — pushes can race with disposal of the writer. We swallow
|
|
// ObjectDisposedException because the dispose path will detach this sink shortly.
|
|
try { _writer.WriteAsync(kind, payload, CancellationToken.None).GetAwaiter().GetResult(); }
|
|
catch (ObjectDisposedException) { }
|
|
catch (Exception ex) { _logger.Warning(ex, "ConnectionSink push failed for {Kind}", kind); }
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
if (_onData is not null) _backend.OnDataChange -= _onData;
|
|
if (_onAlarm is not null) _backend.OnAlarmEvent -= _onAlarm;
|
|
if (_onHost is not null) _backend.OnHostStatusChanged -= _onHost;
|
|
}
|
|
}
|
|
}
|