diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs new file mode 100644 index 0000000..69e6681 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs @@ -0,0 +1,276 @@ +using System.Text; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public static class StoreParity +{ + private const byte StreamStateMagic = 42; + private const byte StreamStateVersion = 1; + private const byte RunLengthMagic = 33; + private const byte SequenceSetMagic = 22; + private const int StreamHeaderLength = 2; + private const int MaxVarIntLength = 10; + private const int ConsumerStateHeaderSize = 6 * MaxVarIntLength + StreamHeaderLength; + private const long NanosecondsPerSecond = 1_000_000_000L; + + public static readonly Exception ErrLastSeqMismatch = new InvalidOperationException("last sequence mismatch"); + public static readonly Exception ErrFirstSequenceMismatch = new InvalidOperationException("first sequence mismatch"); + public static readonly Exception ErrCatchupAbortedNoLeader = new InvalidOperationException("catchup aborted no leader"); + public static readonly Exception ErrCatchupTooManyRetries = new InvalidOperationException("catchup too many retries"); + + public static bool IsEncodedStreamState(byte[]? buf) + => buf is { Length: >= StreamHeaderLength } && + buf[0] == StreamStateMagic && + buf[1] == StreamStateVersion; + + public static (StreamReplicatedState? State, Exception? Error) DecodeStreamState(byte[]? buf) + { + if (!IsEncodedStreamState(buf)) + return (null, StoreErrors.ErrBadStreamStateEncoding); + + var data = buf!; + var state = new StreamReplicatedState(); + var index = StreamHeaderLength; + + if (!TryReadUVarInt(data, ref index, out var msgs) || + !TryReadUVarInt(data, ref index, out var bytes) || + !TryReadUVarInt(data, ref index, out var firstSeq) || + !TryReadUVarInt(data, ref index, out var lastSeq) || + !TryReadUVarInt(data, ref index, out var failed)) + return (null, StoreErrors.ErrCorruptStreamState); + + state.Msgs = msgs; + state.Bytes = bytes; + state.FirstSeq = firstSeq; + state.LastSeq = lastSeq; + state.Failed = failed; + + if (!TryReadUVarInt(data, ref index, out var numDeleted)) + return (null, StoreErrors.ErrCorruptStreamState); + + if (numDeleted > 0) + { + while (index < data.Length) + { + switch (data[index]) + { + case SequenceSetMagic: + try + { + var (set, bytesRead) = SequenceSet.Decode(data.AsSpan(index)); + if (bytesRead <= 0) + return (null, StoreErrors.ErrCorruptStreamState); + + index += bytesRead; + state.Deleted.Add(new SequenceSetDeleteBlock(set)); + } + catch (InvalidDataException) + { + return (null, StoreErrors.ErrCorruptStreamState); + } + break; + case RunLengthMagic: + index++; + if (!TryReadUVarInt(data, ref index, out var first) || + !TryReadUVarInt(data, ref index, out var num)) + return (null, StoreErrors.ErrCorruptStreamState); + + state.Deleted.Add(new DeleteRange + { + First = first, + Num = num, + }); + break; + default: + return (null, StoreErrors.ErrCorruptStreamState); + } + } + } + + return (state, null); + } + + public static byte[] EncodeConsumerState(ConsumerState state) + { + ArgumentNullException.ThrowIfNull(state); + + var pendingCount = state.Pending?.Count ?? 0; + var redeliveredCount = state.Redelivered?.Count ?? 0; + + var maxSize = ConsumerStateHeaderSize; + if (pendingCount > 0) + maxSize += pendingCount * (3 * MaxVarIntLength) + MaxVarIntLength; + if (redeliveredCount > 0) + maxSize += redeliveredCount * (2 * MaxVarIntLength) + MaxVarIntLength; + + var buffer = new byte[maxSize]; + buffer[0] = SequenceSetMagic; + buffer[1] = 2; + + var offset = StreamHeaderLength; + WriteUVarInt(buffer, ref offset, state.AckFloor.Consumer); + WriteUVarInt(buffer, ref offset, state.AckFloor.Stream); + WriteUVarInt(buffer, ref offset, state.Delivered.Consumer); + WriteUVarInt(buffer, ref offset, state.Delivered.Stream); + WriteUVarInt(buffer, ref offset, (ulong)pendingCount); + + var ackStreamFloor = state.AckFloor.Stream; + var ackConsumerFloor = state.AckFloor.Consumer; + + if (pendingCount > 0) + { + var minTs = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + WriteVarInt(buffer, ref offset, minTs); + + foreach (var entry in state.Pending!) + { + var pending = entry.Value ?? new Pending(); + WriteUVarInt(buffer, ref offset, entry.Key - ackStreamFloor); + WriteUVarInt(buffer, ref offset, pending.Sequence - ackConsumerFloor); + + var tsSeconds = pending.Timestamp / NanosecondsPerSecond; + WriteVarInt(buffer, ref offset, minTs - tsSeconds); + } + } + + WriteUVarInt(buffer, ref offset, (ulong)redeliveredCount); + if (redeliveredCount > 0) + { + foreach (var entry in state.Redelivered!) + { + WriteUVarInt(buffer, ref offset, entry.Key - ackStreamFloor); + WriteUVarInt(buffer, ref offset, entry.Value); + } + } + + return buffer[..offset]; + } + + public static bool IsOutOfSpaceErr(Exception? error) + => error != null && error.Message.Contains("no space left", StringComparison.Ordinal); + + public static bool IsClusterResetErr(Exception? error) + => ContainsException(error, ErrLastSeqMismatch) || + ContainsException(error, StoreErrors.ErrStoreEOF) || + ContainsException(error, ErrFirstSequenceMismatch) || + ContainsException(error, ErrCatchupAbortedNoLeader) || + ContainsException(error, ErrCatchupTooManyRetries); + + public static string BytesToString(byte[]? bytes) + => bytes is not { Length: > 0 } ? string.Empty : Encoding.Latin1.GetString(bytes); + + public static byte[]? StringToBytes(string text) + => string.IsNullOrEmpty(text) ? null : Encoding.Latin1.GetBytes(text); + + public static string CopyString(string text) + { + if (string.IsNullOrEmpty(text)) + return string.Empty; + + return new string(text.AsSpan()); + } + + public static bool IsPermissionError(Exception? error) + => AnyException(error, static ex => ex is UnauthorizedAccessException); + + private static bool TryReadUVarInt(ReadOnlySpan source, ref int index, out ulong value) + { + if ((uint)index >= (uint)source.Length) + { + value = 0; + index = -1; + return false; + } + + value = 0; + var shift = 0; + for (var i = 0; i < MaxVarIntLength; i++) + { + if ((uint)index >= (uint)source.Length) + { + value = 0; + index = -1; + return false; + } + + var b = source[index++]; + if (b < 0x80) + { + if (i == MaxVarIntLength - 1 && b > 1) + { + value = 0; + index = -1; + return false; + } + + value |= (ulong)b << shift; + return true; + } + + value |= (ulong)(b & 0x7F) << shift; + shift += 7; + } + + value = 0; + index = -1; + return false; + } + + private static void WriteUVarInt(byte[] buffer, ref int offset, ulong value) + { + while (value >= 0x80) + { + buffer[offset++] = (byte)(value | 0x80); + value >>= 7; + } + + buffer[offset++] = (byte)value; + } + + private static void WriteVarInt(byte[] buffer, ref int offset, long value) + { + var encoded = (ulong)value << 1; + if (value < 0) + encoded = ~encoded; + + WriteUVarInt(buffer, ref offset, encoded); + } + + private static bool ContainsException(Exception? source, Exception target) + => AnyException(source, ex => ReferenceEquals(ex, target)); + + private static bool AnyException(Exception? source, Func matcher) + { + if (source == null) + return false; + + if (matcher(source)) + return true; + + if (source is AggregateException aggregate) + { + foreach (var inner in aggregate.InnerExceptions) + { + if (AnyException(inner, matcher)) + return true; + } + } + + return source.InnerException != null && AnyException(source.InnerException, matcher); + } + + private sealed class SequenceSetDeleteBlock(SequenceSet set) : IDeleteBlock + { + private readonly SequenceSet _set = set; + + public (ulong First, ulong Last, ulong Num) GetState() + { + var (min, max, count) = _set.State(); + return (min, max, count); + } + + public void Range(Func f) + => _set.Range(f); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index 442847e..0783c52 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -97,17 +97,47 @@ public sealed class StoreMsg public ulong Seq { get; set; } public long Ts { get; set; } + /// Copy this message's fields into . + public void Copy(StoreMsg dst) + { + ArgumentNullException.ThrowIfNull(dst); + + if (Buf.Length > 0) + { + var copiedBuffer = new byte[Buf.Length]; + Buffer.BlockCopy(Buf, 0, copiedBuffer, 0, Buf.Length); + dst.Buf = copiedBuffer; + var headerLength = Math.Min(Hdr.Length, copiedBuffer.Length); + dst.Hdr = copiedBuffer[..headerLength]; + dst.Msg = copiedBuffer[headerLength..]; + } + else + { + var headerCopy = new byte[Hdr.Length]; + Buffer.BlockCopy(Hdr, 0, headerCopy, 0, Hdr.Length); + + var messageCopy = new byte[Msg.Length]; + Buffer.BlockCopy(Msg, 0, messageCopy, 0, Msg.Length); + + dst.Hdr = headerCopy; + dst.Msg = messageCopy; + + var combined = new byte[headerCopy.Length + messageCopy.Length]; + Buffer.BlockCopy(headerCopy, 0, combined, 0, headerCopy.Length); + Buffer.BlockCopy(messageCopy, 0, combined, headerCopy.Length, messageCopy.Length); + dst.Buf = combined; + } + + dst.Subject = Subject; + dst.Seq = Seq; + dst.Ts = Ts; + } + /// Copy fields from another StoreMsg into this one. public void CopyFrom(StoreMsg src) { - var newBuf = new byte[src.Buf.Length]; - src.Buf.CopyTo(newBuf, 0); - Buf = newBuf; - Hdr = newBuf[..src.Hdr.Length]; - Msg = newBuf[src.Hdr.Length..]; - Subject = src.Subject; - Seq = src.Seq; - Ts = src.Ts; + ArgumentNullException.ThrowIfNull(src); + src.Copy(this); } /// Clear all fields, resetting to an empty message. @@ -353,6 +383,9 @@ public sealed class DeleteRange : IDeleteBlock public ulong First { get; set; } public ulong Num { get; set; } + public (ulong First, ulong Last, ulong Num) State() + => GetState(); + public (ulong First, ulong Last, ulong Num) GetState() { var deletesAfterFirst = Num > 0 ? Num - 1 : 0; @@ -383,6 +416,9 @@ public sealed class DeleteSlice : IDeleteBlock _seqs = seqs; } + public (ulong First, ulong Last, ulong Num) State() + => GetState(); + public (ulong First, ulong Last, ulong Num) GetState() { if (_seqs.Length == 0) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs new file mode 100644 index 0000000..5dfbedd --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs @@ -0,0 +1,261 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public class StoreTypesTests +{ + [Fact] + public void IsEncodedStreamState_ValidHeader_ReturnsTrue() + { + var buffer = new byte[] { 42, 1, 0 }; + StoreParity.IsEncodedStreamState(buffer).ShouldBeTrue(); + } + + [Fact] + public void IsEncodedStreamState_InvalidHeader_ReturnsFalse() + { + StoreParity.IsEncodedStreamState(Array.Empty()).ShouldBeFalse(); + StoreParity.IsEncodedStreamState(new byte[] { 42 }).ShouldBeFalse(); + StoreParity.IsEncodedStreamState(new byte[] { 41, 1 }).ShouldBeFalse(); + StoreParity.IsEncodedStreamState(new byte[] { 42, 2 }).ShouldBeFalse(); + } + + [Fact] + public void DecodeStreamState_ValidRunLengthBlock_ReturnsParsedState() + { + var buffer = new List { 42, 1 }; + AppendUVarInt(buffer, 10); // msgs + AppendUVarInt(buffer, 2048); // bytes + AppendUVarInt(buffer, 3); // first + AppendUVarInt(buffer, 12); // last + AppendUVarInt(buffer, 4); // failed + AppendUVarInt(buffer, 1); // numDeleted + buffer.Add(33); // runLengthMagic + AppendUVarInt(buffer, 5); // first + AppendUVarInt(buffer, 4); // num + + var (state, error) = StoreParity.DecodeStreamState(buffer.ToArray()); + + error.ShouldBeNull(); + state.ShouldNotBeNull(); + state.Msgs.ShouldBe(10UL); + state.Bytes.ShouldBe(2048UL); + state.FirstSeq.ShouldBe(3UL); + state.LastSeq.ShouldBe(12UL); + state.Failed.ShouldBe(4UL); + state.Deleted.Count.ShouldBe(1); + var (first, last, num) = state.Deleted[0].GetState(); + first.ShouldBe(5UL); + last.ShouldBe(8UL); + num.ShouldBe(4UL); + } + + [Fact] + public void DecodeStreamState_BadHeader_ReturnsBadEncodingError() + { + var (state, error) = StoreParity.DecodeStreamState(new byte[] { 1, 1, 1 }); + state.ShouldBeNull(); + error.ShouldBe(StoreErrors.ErrBadStreamStateEncoding); + } + + [Fact] + public void DecodeStreamState_CorruptPayload_ReturnsCorruptError() + { + var badVarint = new byte[] { 42, 1, 0x80 }; + var (state, error) = StoreParity.DecodeStreamState(badVarint); + state.ShouldBeNull(); + error.ShouldBe(StoreErrors.ErrCorruptStreamState); + } + + [Fact] + public void DecodeStreamState_UnknownDeleteBlockMarker_ReturnsCorruptError() + { + var buffer = new List { 42, 1 }; + AppendUVarInt(buffer, 1); + AppendUVarInt(buffer, 1); + AppendUVarInt(buffer, 1); + AppendUVarInt(buffer, 1); + AppendUVarInt(buffer, 0); + AppendUVarInt(buffer, 1); + buffer.Add(99); + + var (state, error) = StoreParity.DecodeStreamState(buffer.ToArray()); + state.ShouldBeNull(); + error.ShouldBe(StoreErrors.ErrCorruptStreamState); + } + + [Fact] + public void DeleteRange_GetState_UsesRunLengthParity() + { + var dr = new DeleteRange { First = 7, Num = 3 }; + var (first, last, num) = dr.GetState(); + first.ShouldBe(7UL); + last.ShouldBe(9UL); + num.ShouldBe(3UL); + } + + [Fact] + public void DeleteSlice_GetState_EmptyAndNonEmptyParity() + { + var empty = new DeleteSlice(Array.Empty()); + empty.GetState().ShouldBe((0UL, 0UL, 0UL)); + + var nonEmpty = new DeleteSlice(new ulong[] { 2, 4, 9 }); + nonEmpty.GetState().ShouldBe((2UL, 9UL, 3UL)); + } + + [Fact] + public void EncodeConsumerState_WithPendingAndRedelivered_EncodesExpectedShape() + { + var pendingTs = 1_700_000_012_000_000_000L; // ns + var state = new ConsumerState + { + AckFloor = new SequencePair { Consumer = 20, Stream = 100 }, + Delivered = new SequencePair { Consumer = 25, Stream = 110 }, + Pending = new Dictionary + { + [104] = new Pending { Sequence = 23, Timestamp = pendingTs }, + }, + Redelivered = new Dictionary + { + [108] = 2, + }, + }; + + var encoded = StoreParity.EncodeConsumerState(state); + + encoded[0].ShouldBe((byte)22); + encoded[1].ShouldBe((byte)2); + + var index = 2; + ReadUVarInt(encoded, ref index).ShouldBe(20UL); + ReadUVarInt(encoded, ref index).ShouldBe(100UL); + ReadUVarInt(encoded, ref index).ShouldBe(25UL); + ReadUVarInt(encoded, ref index).ShouldBe(110UL); + ReadUVarInt(encoded, ref index).ShouldBe(1UL); + + var minTs = ReadVarInt(encoded, ref index); + var pendingStreamDelta = ReadUVarInt(encoded, ref index); + var pendingConsumerDelta = ReadUVarInt(encoded, ref index); + var pendingTsDelta = ReadVarInt(encoded, ref index); + + (100UL + pendingStreamDelta).ShouldBe(104UL); + (20UL + pendingConsumerDelta).ShouldBe(23UL); + (minTs - pendingTsDelta).ShouldBe(pendingTs / 1_000_000_000L); + + ReadUVarInt(encoded, ref index).ShouldBe(1UL); + var redeliveredStreamDelta = ReadUVarInt(encoded, ref index); + var redeliveredCount = ReadUVarInt(encoded, ref index); + (100UL + redeliveredStreamDelta).ShouldBe(108UL); + redeliveredCount.ShouldBe(2UL); + } + + [Fact] + public void IsOutOfSpaceErr_MessageContainsNoSpaceLeft_ReturnsTrue() + { + StoreParity.IsOutOfSpaceErr(new IOException("disk full: no space left on device")).ShouldBeTrue(); + StoreParity.IsOutOfSpaceErr(new IOException("permission denied")).ShouldBeFalse(); + StoreParity.IsOutOfSpaceErr(null).ShouldBeFalse(); + } + + [Fact] + public void IsClusterResetErr_KnownSentinels_ReturnsExpected() + { + StoreParity.IsClusterResetErr(StoreParity.ErrLastSeqMismatch).ShouldBeTrue(); + StoreParity.IsClusterResetErr(StoreErrors.ErrStoreEOF).ShouldBeTrue(); + StoreParity.IsClusterResetErr(StoreParity.ErrFirstSequenceMismatch).ShouldBeTrue(); + StoreParity.IsClusterResetErr(new InvalidOperationException("other")).ShouldBeFalse(); + } + + [Fact] + public void Copy_SourceToDestination_PreservesAndIsolatesBuffers() + { + var source = new StoreMsg + { + Subject = "ORDERS.created", + Hdr = Encoding.ASCII.GetBytes("HDR"), + Msg = Encoding.ASCII.GetBytes("BODY"), + Buf = Encoding.ASCII.GetBytes("HDRBODY"), + Seq = 42, + Ts = 9001, + }; + var destination = new StoreMsg(); + + source.Copy(destination); + + destination.Subject.ShouldBe(source.Subject); + destination.Seq.ShouldBe(42UL); + destination.Ts.ShouldBe(9001L); + destination.Hdr.SequenceEqual(Encoding.ASCII.GetBytes("HDR")).ShouldBeTrue(); + destination.Msg.SequenceEqual(Encoding.ASCII.GetBytes("BODY")).ShouldBeTrue(); + destination.Buf.SequenceEqual(Encoding.ASCII.GetBytes("HDRBODY")).ShouldBeTrue(); + + source.Buf[0] = (byte)'X'; + destination.Buf[0].ShouldBe((byte)'H'); + } + + [Fact] + public void BytesToString_StringToBytes_CopyString_ParityBehavior() + { + StoreParity.BytesToString(Array.Empty()).ShouldBe(string.Empty); + StoreParity.StringToBytes(string.Empty).ShouldBeNull(); + + var raw = new byte[] { 0, 255, 65 }; + var text = StoreParity.BytesToString(raw); + var roundtrip = StoreParity.StringToBytes(text); + + roundtrip.ShouldNotBeNull(); + roundtrip!.SequenceEqual(raw).ShouldBeTrue(); + + var original = new string(new[] { 'n', 'a', 't', 's' }); + var copy = StoreParity.CopyString(original); + copy.ShouldBe(original); + ReferenceEquals(copy, original).ShouldBeFalse(); + } + + [Fact] + public void IsPermissionError_UnauthorizedAccess_ReturnsTrue() + { + StoreParity.IsPermissionError(new UnauthorizedAccessException("no access")).ShouldBeTrue(); + StoreParity.IsPermissionError(new IOException("other")).ShouldBeFalse(); + StoreParity.IsPermissionError(null).ShouldBeFalse(); + } + + private static void AppendUVarInt(List buffer, ulong value) + { + while (value >= 0x80) + { + buffer.Add((byte)(value | 0x80)); + value >>= 7; + } + buffer.Add((byte)value); + } + + private static ulong ReadUVarInt(byte[] buffer, ref int index) + { + ulong value = 0; + var shift = 0; + while (index < buffer.Length) + { + var b = buffer[index++]; + if (b < 0x80) + { + value |= (ulong)b << shift; + return value; + } + + value |= (ulong)(b & 0x7F) << shift; + shift += 7; + } + + throw new InvalidDataException("Unexpected end of varint"); + } + + private static long ReadVarInt(byte[] buffer, ref int index) + { + var uv = ReadUVarInt(buffer, ref index); + return (long)((uv >> 1) ^ (ulong)-(long)(uv & 1)); + } +} diff --git a/porting.db b/porting.db index 1425209..5110e57 100644 Binary files a/porting.db and b/porting.db differ