using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Client;
using MxGateway.Contracts.Proto;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
///
/// Production over .
/// For each batch entry: lazy-AddItem to obtain the MXAccess item handle, encode
/// the value via , route through Write or WriteSecured
/// based on the per-tag , and translate the
/// reply's MxStatusProxy into an OPC UA .
///
///
/// Item handle cache survives across writes — repeated writes to the same tag avoid
/// re-AddItem. Per-tag failures are isolated: one bad write doesn't fail the batch.
/// PR 4.4 will share this cache with the subscription registry; for now it lives
/// here so the writer is independently testable.
///
public sealed class GatewayGalaxyDataWriter : IGalaxyDataWriter
{
private readonly GalaxyMxSession _session;
private readonly int _writeUserId;
private readonly ILogger _logger;
private readonly ConcurrentDictionary _itemHandles =
new(StringComparer.OrdinalIgnoreCase);
public GatewayGalaxyDataWriter(GalaxyMxSession session, int writeUserId, ILogger? logger = null)
{
_session = session ?? throw new ArgumentNullException(nameof(session));
_writeUserId = writeUserId;
_logger = logger ?? NullLogger.Instance;
}
public async Task> WriteAsync(
IReadOnlyList writes,
Func securityResolver,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(writes);
ArgumentNullException.ThrowIfNull(securityResolver);
var session = _session.Session
?? throw new InvalidOperationException(
"GalaxyMxSession is not connected. Call ConnectAsync before issuing writes.");
var serverHandle = _session.ServerHandle;
var results = new WriteResult[writes.Count];
for (var i = 0; i < writes.Count; i++)
{
results[i] = await WriteOneAsync(session, serverHandle, writes[i],
securityResolver(writes[i].FullReference), cancellationToken)
.ConfigureAwait(false);
}
return results;
}
private async Task WriteOneAsync(
MxGatewaySession session, int serverHandle, WriteRequest request,
SecurityClassification classification, CancellationToken ct)
{
try
{
var itemHandle = await EnsureItemHandleAsync(session, serverHandle, request.FullReference, ct)
.ConfigureAwait(false);
var mxValue = MxValueEncoder.Encode(request.Value);
var reply = NeedsSecuredWrite(classification)
? await InvokeWriteSecuredAsync(session, serverHandle, itemHandle, mxValue, ct).ConfigureAwait(false)
: await session.WriteRawAsync(serverHandle, itemHandle, mxValue, _writeUserId, ct).ConfigureAwait(false);
return TranslateReply(reply, request.FullReference);
}
catch (ArgumentException ex)
{
// Bad value type — caller passed a CLR type the encoder can't render.
_logger.LogWarning(ex,
"GalaxyDriver write rejected — unsupported value type for {FullRef}", request.FullReference);
return new WriteResult(StatusCodeMap.BadInternalError);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested) { throw; }
catch (Exception ex)
{
_logger.LogWarning(ex, "GalaxyDriver write failed for {FullRef}", request.FullReference);
return new WriteResult(StatusCodeMap.BadCommunicationError);
}
}
private static bool NeedsSecuredWrite(SecurityClassification classification) =>
classification is SecurityClassification.SecuredWrite or SecurityClassification.VerifiedWrite;
private async Task EnsureItemHandleAsync(
MxGatewaySession session, int serverHandle, string fullRef, CancellationToken ct)
{
if (_itemHandles.TryGetValue(fullRef, out var existing)) return existing;
var handle = await session.AddItemAsync(serverHandle, fullRef, ct).ConfigureAwait(false);
_itemHandles[fullRef] = handle;
return handle;
}
///
/// Issue a WriteSecured command. The high-level session client doesn't expose
/// WriteSecuredAsync as a typed method — we build the
/// directly and route through InvokeAsync. Verifier user is left at zero
/// for SecuredWrite; VerifiedWrite uses the same path because the gw's worker
/// interprets the underlying MXAccess command kind.
///
private static Task InvokeWriteSecuredAsync(
MxGatewaySession session, int serverHandle, int itemHandle, MxValue value, CancellationToken ct)
{
var command = new MxCommand
{
Kind = MxCommandKind.WriteSecured,
WriteSecured = new WriteSecuredCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = value,
CurrentUserId = 0,
VerifierUserId = 0,
},
};
var request = new MxCommandRequest
{
SessionId = session.SessionId,
ClientCorrelationId = Guid.NewGuid().ToString("N"),
Command = command,
};
return session.InvokeAsync(request, ct);
}
///
/// Translate a gateway into an OPC UA
/// . Honours the protocol-level Status field first
/// (transport / dispatch failures), then the first MXAccess status row.
///
private WriteResult TranslateReply(MxCommandReply reply, string fullRef)
{
// Protocol status — wraps transport / worker-side failures that happen before
// MXAccess saw the command.
if (reply.ProtocolStatus is { } proto && proto.Code != ProtocolStatusCode.Ok)
{
_logger.LogWarning(
"GalaxyDriver write protocol failure {Code} for {FullRef}: {Message}",
proto.Code, fullRef, proto.Message);
return new WriteResult(StatusCodeMap.BadCommunicationError);
}
// MX-side status — the worker's WriteCompleteEvent rolls into the reply's
// statuses array. Use the first row (single-write contract).
if (reply.Statuses.Count > 0)
{
var status = reply.Statuses[0];
return new WriteResult(StatusCodeMap.FromMxStatus(status, _logger));
}
return new WriteResult(StatusCodeMap.Good);
}
}