using System.Buffers.Binary;
using System.Text;

namespace MxNativeCodec;

/// <summary>
/// Result of decoding a single value payload from an NMX subscription callback record.
/// </summary>
/// <param name="WireKind">The raw wire-kind byte that preceded the value.</param>
/// <param name="ValueKind">The value kind mapped from <paramref name="WireKind"/>, or <c>null</c> when the wire kind is not recognized.</param>
/// <param name="Value">The decoded value, or <c>null</c> when the payload could not be decoded.</param>
/// <param name="EncodedLength">Number of payload bytes consumed by the value; 0 when nothing was consumed.</param>
public sealed record NmxCallbackValue(
    byte WireKind,
    MxValueKind? ValueKind,
    object? Value,
    int EncodedLength);

/// <summary>
/// One record parsed from an NMX subscription callback body.
/// </summary>
/// <param name="Status">Status code read from the start of the record.</param>
/// <param name="DetailStatus">Detail status code; only present for SubscriptionStatus records, otherwise <c>null</c>.</param>
/// <param name="Quality">Quality word read from the record.</param>
/// <param name="TimestampUtc">Record timestamp converted from a Windows file time.</param>
/// <param name="WireKind">The raw wire-kind byte of the value.</param>
/// <param name="Value">The decoded value, or <c>null</c> when it could not be decoded.</param>
/// <param name="Offset">Offset of the record's first byte within the inner callback body.</param>
/// <param name="Length">Total number of bytes the record occupies, including the value payload.</param>
public sealed record NmxSubscriptionRecord(
    int Status,
    int? DetailStatus,
    ushort Quality,
    DateTime TimestampUtc,
    byte WireKind,
    object? Value,
    int Offset,
    int Length)
{
    /// <summary>
    /// Returns the data-change status for this record.
    /// </summary>
    /// <returns>Always <c>MxStatus.DataChangeOk</c>.</returns>
    public MxStatus ToDataChangeStatus()
    {
        // NOTE(review): the result does not depend on Status/DetailStatus/Quality - confirm this is intentional.
        return MxStatus.DataChangeOk;
    }
}

/// <summary>
/// A parsed NMX subscription callback message (SubscriptionStatus or DataUpdate).
/// </summary>
/// <param name="Command">Command byte of the message.</param>
/// <param name="Version">Version word read from the header.</param>
/// <param name="RecordCount">Record count as stated in the header.</param>
/// <param name="OperationId">Operation identifier read from the header.</param>
/// <param name="ItemCorrelationId">Item correlation identifier; only present for SubscriptionStatus messages.</param>
/// <param name="Records">The parsed records.</param>
public sealed record NmxSubscriptionMessage(
    byte Command,
    ushort Version,
    int RecordCount,
    Guid OperationId,
    Guid? ItemCorrelationId,
    IReadOnlyList<NmxSubscriptionRecord> Records)
{
    public const byte SubscriptionStatusCommand = 0x32;
    public const byte DataUpdateCommand = 0x33;

    // Common header: command (1) + version (2) + record count (4) + operation id (16).
    private const int HeaderLength = 23;

    // Length of a GUID on the wire.
    private const int GuidLength = 16;

    // Array payload header; the element count is at offset 4 and the element width at offset 6.
    private const int ArrayHeaderLength = 10;

    // String-array element prefix: record length (4) + element kind (1) + text record length (4) + text byte length (4).
    private const int StringArrayElementHeaderLength = 13;

    /// <summary>
    /// Unwraps a ProcessDataReceived body via <c>NmxObservedEnvelope</c> and parses its inner body.
    /// </summary>
    /// <param name="body">The full ProcessDataReceived body.</param>
    /// <returns>The parsed subscription message.</returns>
    /// <exception cref="ArgumentException">The inner body is too short, malformed, or carries an unsupported command.</exception>
    public static NmxSubscriptionMessage ParseProcessDataReceivedBody(ReadOnlyMemory<byte> body)
    {
        var envelope = NmxObservedEnvelope.ParseProcessDataReceivedBodyFlexible(body);
        return ParseInner(envelope.InnerBody.Span);
    }

    /// <summary>
    /// Parses the inner body of a subscription callback.
    /// </summary>
    /// <remarks>
    /// Header layout (all integers little-endian): byte 0 command, bytes 1-2 version,
    /// bytes 3-6 record count, bytes 7-22 operation id.
    /// </remarks>
    /// <param name="inner">The inner callback body.</param>
    /// <returns>The parsed subscription message.</returns>
    /// <exception cref="ArgumentException">
    /// The body is shorter than the header, a record is truncated, or the command byte is neither
    /// <see cref="SubscriptionStatusCommand"/> nor <see cref="DataUpdateCommand"/>.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">A record timestamp is not a valid file time.</exception>
    public static NmxSubscriptionMessage ParseInner(ReadOnlySpan<byte> inner)
    {
        if (inner.Length < HeaderLength)
        {
            throw new ArgumentException("NMX subscription callback body is too short.", nameof(inner));
        }

        byte command = inner[0];
        ushort version = BinaryPrimitives.ReadUInt16LittleEndian(inner.Slice(1, sizeof(ushort)));
        int recordCount = BinaryPrimitives.ReadInt32LittleEndian(inner.Slice(3, sizeof(int)));
        var operationId = new Guid(inner.Slice(7, GuidLength));

        return command switch
        {
            SubscriptionStatusCommand => ParseSubscriptionStatus(inner, version, recordCount, operationId),
            DataUpdateCommand => ParseDataUpdate(inner, version, recordCount, operationId),
            _ => throw new ArgumentException($"Unsupported NMX subscription callback command 0x{command:X2}.", nameof(inner)),
        };
    }

    /// <summary>
    /// Parses a DataUpdate body: exactly one record (without detail status) directly after the header.
    /// </summary>
    private static NmxSubscriptionMessage ParseDataUpdate(
        ReadOnlySpan<byte> inner,
        ushort version,
        int recordCount,
        Guid operationId)
    {
        if (recordCount != 1)
        {
            throw new ArgumentException("Observed NMX DataUpdate callback parser currently supports one record per body.", nameof(inner));
        }

        var record = ParseRecord(inner, HeaderLength, hasDetailStatus: false);
        return new NmxSubscriptionMessage(
            DataUpdateCommand,
            version,
            recordCount,
            operationId,
            null,
            [record]);
    }

    /// <summary>
    /// Parses a SubscriptionStatus body: an item correlation id after the header, followed by
    /// <paramref name="recordCount"/> consecutive records that each carry a detail status.
    /// </summary>
    private static NmxSubscriptionMessage ParseSubscriptionStatus(
        ReadOnlySpan<byte> inner,
        ushort version,
        int recordCount,
        Guid operationId)
    {
        const int recordsOffset = HeaderLength + GuidLength;
        if (inner.Length < recordsOffset)
        {
            throw new ArgumentException("NMX SubscriptionStatus callback body is too short.", nameof(inner));
        }

        var itemCorrelationId = new Guid(inner.Slice(HeaderLength, GuidLength));
        int offset = recordsOffset;
        List<NmxSubscriptionRecord> records = [];
        for (int i = 0; i < recordCount; i++)
        {
            // Each record reports how many bytes it consumed; the next one starts right after it.
            var record = ParseRecord(inner, offset, hasDetailStatus: true);
            records.Add(record);
            offset += record.Length;
        }

        return new NmxSubscriptionMessage(
            SubscriptionStatusCommand,
            version,
            recordCount,
            operationId,
            itemCorrelationId,
            records);
    }

    /// <summary>
    /// Parses one record starting at <paramref name="offset"/>.
    /// </summary>
    /// <remarks>
    /// Layout (little-endian): status (int32), optional detail status (int32), quality (uint16),
    /// timestamp (int64 file time), wire kind (byte), then the value payload.
    /// </remarks>
    /// <exception cref="ArgumentException">The fixed part of the record does not fit in <paramref name="body"/>.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The record timestamp is not a valid file time.</exception>
    private static NmxSubscriptionRecord ParseRecord(ReadOnlySpan<byte> body, int offset, bool hasDetailStatus)
    {
        // Fixed part: 4 (status) [+ 4 (detail status)] + 2 (quality) + 8 (timestamp) + 1 (wire kind).
        int minimumLength = hasDetailStatus ? 19 : 15;
        if (offset < 0 || offset + minimumLength > body.Length)
        {
            throw new ArgumentException("NMX subscription record is too short.", nameof(body));
        }

        int start = offset;
        int status = BinaryPrimitives.ReadInt32LittleEndian(body.Slice(offset, sizeof(int)));
        offset += sizeof(int);

        int? detailStatus = null;
        if (hasDetailStatus)
        {
            detailStatus = BinaryPrimitives.ReadInt32LittleEndian(body.Slice(offset, sizeof(int)));
            offset += sizeof(int);
        }

        ushort quality = BinaryPrimitives.ReadUInt16LittleEndian(body.Slice(offset, sizeof(ushort)));
        offset += sizeof(ushort);
        long fileTime = BinaryPrimitives.ReadInt64LittleEndian(body.Slice(offset, sizeof(long)));
        offset += sizeof(long);
        byte wireKind = body[offset++];

        var value = DecodeValue(wireKind, body[offset..]);
        offset += value.EncodedLength;

        return new NmxSubscriptionRecord(
            status,
            detailStatus,
            quality,
            DateTime.FromFileTimeUtc(fileTime),
            wireKind,
            value.Value,
            start,
            offset - start);
    }

    /// <summary>
    /// Decodes the value payload that follows a wire-kind byte.
    /// </summary>
    /// <returns>
    /// The decoded value and the number of bytes consumed. Unknown wire kinds, empty payloads and
    /// payloads too short for their kind yield a <c>null</c> value with an encoded length of 0.
    /// </returns>
    private static NmxCallbackValue DecodeValue(byte wireKind, ReadOnlySpan<byte> body)
    {
        if (body.Length == 0)
        {
            return Undecoded(wireKind, ToValueKindOrNull(wireKind));
        }

        return wireKind switch
        {
            0x01 when body.Length >= 1 => new NmxCallbackValue(wireKind, MxValueKind.Boolean, body[0] != 0, 1),
            0x02 when body.Length >= sizeof(int) => new NmxCallbackValue(wireKind, MxValueKind.Int32, BinaryPrimitives.ReadInt32LittleEndian(body[..sizeof(int)]), sizeof(int)),
            0x03 when body.Length >= sizeof(float) => new NmxCallbackValue(wireKind, MxValueKind.Float32, BitConverter.Int32BitsToSingle(BinaryPrimitives.ReadInt32LittleEndian(body[..sizeof(int)])), sizeof(float)),
            0x04 when body.Length >= sizeof(double) => new NmxCallbackValue(wireKind, MxValueKind.Float64, BitConverter.Int64BitsToDouble(BinaryPrimitives.ReadInt64LittleEndian(body[..sizeof(long)])), sizeof(double)),
            0x05 => DecodeStringValue(wireKind, body),
            0x06 => DecodeDateTimeValue(wireKind, body),
            0x07 => DecodeElapsedTimeValue(wireKind, body),
            0x41 or 0x42 or 0x43 or 0x44 or 0x45 or 0x46 => DecodeArrayValue(wireKind, body),
            _ => Undecoded(wireKind, ToValueKindOrNull(wireKind)),
        };
    }

    /// <summary>
    /// Decodes a string payload: record length (int32), text byte length (int32), then UTF-16 text.
    /// A record length of exactly 4 denotes an empty string with no further fields.
    /// </summary>
    private static NmxCallbackValue DecodeStringValue(byte wireKind, ReadOnlySpan<byte> body)
    {
        const int textOffset = 2 * sizeof(int);

        if (body.Length < sizeof(int))
        {
            return Undecoded(wireKind, MxValueKind.String);
        }

        int recordLength = BinaryPrimitives.ReadInt32LittleEndian(body[..sizeof(int)]);
        if (recordLength == sizeof(int))
        {
            return new NmxCallbackValue(wireKind, MxValueKind.String, string.Empty, sizeof(int));
        }

        if (body.Length < textOffset)
        {
            return Undecoded(wireKind, MxValueKind.String);
        }

        int textByteLength = BinaryPrimitives.ReadInt32LittleEndian(body.Slice(sizeof(int), sizeof(int)));

        // The bound is checked by subtraction, and before the addition below, so that a length
        // close to int.MaxValue cannot wrap around and slip past validation.
        if (recordLength < textOffset
            || textByteLength < 0
            || textByteLength > body.Length - textOffset
            || recordLength != textByteLength + sizeof(int))
        {
            return Undecoded(wireKind, MxValueKind.String);
        }

        string value = DecodeUtf16Text(body.Slice(textOffset, textByteLength));
        return new NmxCallbackValue(wireKind, MxValueKind.String, value, textOffset + textByteLength);
    }

    /// <summary>
    /// Decodes a date/time payload. Two shapes are accepted: a length-prefixed record
    /// (int32 record length of at least 10, followed by an int64 file time), or a bare int64 file time.
    /// </summary>
    private static NmxCallbackValue DecodeDateTimeValue(byte wireKind, ReadOnlySpan<byte> body)
    {
        if (body.Length >= 14)
        {
            int recordLength = BinaryPrimitives.ReadInt32LittleEndian(body[..sizeof(int)]);

            // Compare by subtraction: "sizeof(int) + recordLength" would wrap for a record length
            // near int.MaxValue and produce a negative encoded length.
            if (recordLength >= 10 && recordLength <= body.Length - sizeof(int))
            {
                int encodedLength = sizeof(int) + recordLength;
                long prefixedFileTime = BinaryPrimitives.ReadInt64LittleEndian(body.Slice(sizeof(int), sizeof(long)));
                return TryFromFileTimeUtc(prefixedFileTime, out DateTime prefixedTimestamp)
                    ? new NmxCallbackValue(wireKind, MxValueKind.DateTime, prefixedTimestamp, encodedLength)
                    : new NmxCallbackValue(wireKind, MxValueKind.DateTime, null, encodedLength);
            }
        }

        // Fall back to interpreting the first eight bytes as a bare file time.
        if (body.Length >= sizeof(long))
        {
            long fileTime = BinaryPrimitives.ReadInt64LittleEndian(body[..sizeof(long)]);
            if (TryFromFileTimeUtc(fileTime, out DateTime timestamp))
            {
                return new NmxCallbackValue(wireKind, MxValueKind.DateTime, timestamp, sizeof(long));
            }
        }

        return Undecoded(wireKind, MxValueKind.DateTime);
    }

    /// <summary>
    /// Decodes an elapsed-time payload: an int32 number of milliseconds.
    /// </summary>
    private static NmxCallbackValue DecodeElapsedTimeValue(byte wireKind, ReadOnlySpan<byte> body)
    {
        if (body.Length < sizeof(int))
        {
            return Undecoded(wireKind, MxValueKind.ElapsedTime);
        }

        int milliseconds = BinaryPrimitives.ReadInt32LittleEndian(body[..sizeof(int)]);
        return new NmxCallbackValue(wireKind, MxValueKind.ElapsedTime, TimeSpan.FromMilliseconds(milliseconds), sizeof(int));
    }

    /// <summary>
    /// Decodes an array payload. The 10-byte header carries the element count (uint16 at offset 4)
    /// and the element width (int32 at offset 6); header bytes 0-3 are not interpreted here.
    /// </summary>
    private static NmxCallbackValue DecodeArrayValue(byte wireKind, ReadOnlySpan<byte> body)
    {
        if (body.Length < ArrayHeaderLength)
        {
            return Undecoded(wireKind, ToValueKindOrNull(wireKind));
        }

        ushort count = BinaryPrimitives.ReadUInt16LittleEndian(body.Slice(4, sizeof(ushort)));
        int elementWidth = BinaryPrimitives.ReadInt32LittleEndian(body.Slice(6, sizeof(int)));
        ReadOnlySpan<byte> values = body[ArrayHeaderLength..];

        return wireKind switch
        {
            0x41 => DecodeBooleanArray(wireKind, count, elementWidth, values),
            0x42 => DecodeInt32Array(wireKind, count, elementWidth, values),
            0x43 => DecodeFloat32Array(wireKind, count, elementWidth, values),
            0x44 => DecodeFloat64Array(wireKind, count, elementWidth, values),
            0x45 => DecodeStringArray(wireKind, count, values),
            0x46 => DecodeDateTimeArray(wireKind, count, elementWidth, values),
            _ => Undecoded(wireKind, ToValueKindOrNull(wireKind)),
        };
    }

    /// <summary>
    /// Decodes an array of booleans stored as 2-byte integers (non-zero means <c>true</c>).
    /// </summary>
    private static NmxCallbackValue DecodeBooleanArray(byte wireKind, ushort count, int elementWidth, ReadOnlySpan<byte> values)
    {
        // elementWidth is pinned to a small constant first, so count * elementWidth cannot overflow.
        if (elementWidth != sizeof(short) || values.Length < count * elementWidth)
        {
            return Undecoded(wireKind, MxValueKind.BooleanArray);
        }

        bool[] decoded = new bool[count];
        for (int i = 0; i < count; i++)
        {
            decoded[i] = BinaryPrimitives.ReadInt16LittleEndian(values.Slice(i * elementWidth, sizeof(short))) != 0;
        }

        return new NmxCallbackValue(wireKind, MxValueKind.BooleanArray, decoded, ArrayHeaderLength + count * elementWidth);
    }

    /// <summary>
    /// Decodes an array of little-endian 32-bit integers.
    /// </summary>
    private static NmxCallbackValue DecodeInt32Array(byte wireKind, ushort count, int elementWidth, ReadOnlySpan<byte> values)
    {
        if (elementWidth != sizeof(int) || values.Length < count * elementWidth)
        {
            return Undecoded(wireKind, MxValueKind.Int32Array);
        }

        int[] decoded = new int[count];
        for (int i = 0; i < count; i++)
        {
            decoded[i] = BinaryPrimitives.ReadInt32LittleEndian(values.Slice(i * elementWidth, sizeof(int)));
        }

        return new NmxCallbackValue(wireKind, MxValueKind.Int32Array, decoded, ArrayHeaderLength + count * elementWidth);
    }

    /// <summary>
    /// Decodes an array of little-endian 32-bit floating-point values.
    /// </summary>
    private static NmxCallbackValue DecodeFloat32Array(byte wireKind, ushort count, int elementWidth, ReadOnlySpan<byte> values)
    {
        if (elementWidth != sizeof(float) || values.Length < count * elementWidth)
        {
            return Undecoded(wireKind, MxValueKind.Float32Array);
        }

        float[] decoded = new float[count];
        for (int i = 0; i < count; i++)
        {
            decoded[i] = BitConverter.Int32BitsToSingle(BinaryPrimitives.ReadInt32LittleEndian(values.Slice(i * elementWidth, sizeof(int))));
        }

        return new NmxCallbackValue(wireKind, MxValueKind.Float32Array, decoded, ArrayHeaderLength + count * elementWidth);
    }

    /// <summary>
    /// Decodes an array of little-endian 64-bit floating-point values.
    /// </summary>
    private static NmxCallbackValue DecodeFloat64Array(byte wireKind, ushort count, int elementWidth, ReadOnlySpan<byte> values)
    {
        if (elementWidth != sizeof(double) || values.Length < count * elementWidth)
        {
            return Undecoded(wireKind, MxValueKind.Float64Array);
        }

        double[] decoded = new double[count];
        for (int i = 0; i < count; i++)
        {
            decoded[i] = BitConverter.Int64BitsToDouble(BinaryPrimitives.ReadInt64LittleEndian(values.Slice(i * elementWidth, sizeof(long))));
        }

        return new NmxCallbackValue(wireKind, MxValueKind.Float64Array, decoded, ArrayHeaderLength + count * elementWidth);
    }

    /// <summary>
    /// Decodes an array of date/time values. Each element is 12 bytes wide and starts with an
    /// int64 file time; the remaining 4 bytes of each element are not interpreted here.
    /// </summary>
    /// <returns>
    /// The decoded array, or a <c>null</c> value (with the full consumed length) when any element
    /// holds a file time that cannot be represented as a <see cref="DateTime"/>.
    /// </returns>
    private static NmxCallbackValue DecodeDateTimeArray(byte wireKind, ushort count, int elementWidth, ReadOnlySpan<byte> values)
    {
        if (elementWidth != 12 || values.Length < count * elementWidth)
        {
            return Undecoded(wireKind, MxValueKind.DateTimeArray);
        }

        int encodedLength = ArrayHeaderLength + count * elementWidth;
        DateTime[] decoded = new DateTime[count];
        for (int i = 0; i < count; i++)
        {
            long fileTime = BinaryPrimitives.ReadInt64LittleEndian(values.Slice(i * elementWidth, sizeof(long)));

            // Mirror the scalar decoder: an invalid file time yields a null value instead of an exception.
            if (!TryFromFileTimeUtc(fileTime, out decoded[i]))
            {
                return new NmxCallbackValue(wireKind, MxValueKind.DateTimeArray, null, encodedLength);
            }
        }

        return new NmxCallbackValue(wireKind, MxValueKind.DateTimeArray, decoded, encodedLength);
    }

    /// <summary>
    /// Decodes an array of strings. Each element is: record length (int32), element kind byte
    /// (must be 0x05), text record length (int32), text byte length (int32), then UTF-16 text.
    /// </summary>
    private static NmxCallbackValue DecodeStringArray(byte wireKind, ushort count, ReadOnlySpan<byte> values)
    {
        string[] decoded = new string[count];
        int offset = 0;
        for (int i = 0; i < count; i++)
        {
            if (offset + StringArrayElementHeaderLength > values.Length)
            {
                return Undecoded(wireKind, MxValueKind.StringArray);
            }

            int recordLength = BinaryPrimitives.ReadInt32LittleEndian(values.Slice(offset, sizeof(int)));
            byte elementKind = values[offset + 4];
            int textRecordLength = BinaryPrimitives.ReadInt32LittleEndian(values.Slice(offset + 5, sizeof(int)));
            int textByteLength = BinaryPrimitives.ReadInt32LittleEndian(values.Slice(offset + 9, sizeof(int)));
            int textOffset = offset + StringArrayElementHeaderLength;

            // The remaining-bytes bound is checked by subtraction and before the additions below,
            // so a text length close to int.MaxValue cannot wrap around and pass validation.
            if (recordLength < 9
                || elementKind != 0x05
                || textByteLength < 0
                || textByteLength > values.Length - textOffset
                || textRecordLength != textByteLength + sizeof(int)
                || recordLength != 1 + sizeof(int) + sizeof(int) + textByteLength)
            {
                return Undecoded(wireKind, MxValueKind.StringArray);
            }

            decoded[i] = DecodeUtf16Text(values.Slice(textOffset, textByteLength));
            offset = textOffset + textByteLength;
        }

        return new NmxCallbackValue(wireKind, MxValueKind.StringArray, decoded, ArrayHeaderLength + offset);
    }

    /// <summary>
    /// Decodes UTF-16 (little-endian) text, dropping a single trailing NUL code unit if present.
    /// </summary>
    private static string DecodeUtf16Text(ReadOnlySpan<byte> textBytes)
    {
        if (textBytes.Length >= 2 && textBytes[^2] == 0 && textBytes[^1] == 0)
        {
            textBytes = textBytes[..^2];
        }

        return Encoding.Unicode.GetString(textBytes);
    }

    /// <summary>
    /// Builds the result used when a payload cannot be decoded: no value and no bytes consumed.
    /// </summary>
    private static NmxCallbackValue Undecoded(byte wireKind, MxValueKind? valueKind)
    {
        return new NmxCallbackValue(wireKind, valueKind, null, 0);
    }

    /// <summary>
    /// Maps a wire-kind byte to its value kind, or <c>null</c> when the byte is not recognized.
    /// </summary>
    private static MxValueKind? ToValueKindOrNull(byte wireKind)
    {
        return wireKind switch
        {
            0x01 => MxValueKind.Boolean,
            0x02 => MxValueKind.Int32,
            0x03 => MxValueKind.Float32,
            0x04 => MxValueKind.Float64,
            0x05 => MxValueKind.String,
            0x06 => MxValueKind.DateTime,
            0x07 => MxValueKind.ElapsedTime,
            0x41 => MxValueKind.BooleanArray,
            0x42 => MxValueKind.Int32Array,
            0x43 => MxValueKind.Float32Array,
            0x44 => MxValueKind.Float64Array,
            0x45 => MxValueKind.StringArray,
            0x46 => MxValueKind.DateTimeArray,
            _ => null,
        };
    }

    /// <summary>
    /// Converts a Windows file time to a UTC <see cref="DateTime"/> without throwing.
    /// </summary>
    /// <returns><c>true</c> when the conversion succeeded; otherwise <c>false</c> and <paramref name="timestamp"/> is <c>default</c>.</returns>
    private static bool TryFromFileTimeUtc(long fileTime, out DateTime timestamp)
    {
        try
        {
            timestamp = DateTime.FromFileTimeUtc(fileTime);
            return true;
        }
        catch (ArgumentOutOfRangeException)
        {
            timestamp = default;
            return false;
        }
    }
}