feat(batch8): implement store codec/helpers parity group A

This commit is contained in:
Joseph Doherty
2026-02-28 11:49:13 -05:00
parent 365712b912
commit cfb49ef477
4 changed files with 581 additions and 8 deletions

View File

@@ -0,0 +1,261 @@
using System.Text;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public class StoreTypesTests
{
[Fact]
public void IsEncodedStreamState_ValidHeader_ReturnsTrue()
{
var buffer = new byte[] { 42, 1, 0 };
StoreParity.IsEncodedStreamState(buffer).ShouldBeTrue();
}
[Fact]
public void IsEncodedStreamState_InvalidHeader_ReturnsFalse()
{
StoreParity.IsEncodedStreamState(Array.Empty<byte>()).ShouldBeFalse();
StoreParity.IsEncodedStreamState(new byte[] { 42 }).ShouldBeFalse();
StoreParity.IsEncodedStreamState(new byte[] { 41, 1 }).ShouldBeFalse();
StoreParity.IsEncodedStreamState(new byte[] { 42, 2 }).ShouldBeFalse();
}
[Fact]
public void DecodeStreamState_ValidRunLengthBlock_ReturnsParsedState()
{
var buffer = new List<byte> { 42, 1 };
AppendUVarInt(buffer, 10); // msgs
AppendUVarInt(buffer, 2048); // bytes
AppendUVarInt(buffer, 3); // first
AppendUVarInt(buffer, 12); // last
AppendUVarInt(buffer, 4); // failed
AppendUVarInt(buffer, 1); // numDeleted
buffer.Add(33); // runLengthMagic
AppendUVarInt(buffer, 5); // first
AppendUVarInt(buffer, 4); // num
var (state, error) = StoreParity.DecodeStreamState(buffer.ToArray());
error.ShouldBeNull();
state.ShouldNotBeNull();
state.Msgs.ShouldBe(10UL);
state.Bytes.ShouldBe(2048UL);
state.FirstSeq.ShouldBe(3UL);
state.LastSeq.ShouldBe(12UL);
state.Failed.ShouldBe(4UL);
state.Deleted.Count.ShouldBe(1);
var (first, last, num) = state.Deleted[0].GetState();
first.ShouldBe(5UL);
last.ShouldBe(8UL);
num.ShouldBe(4UL);
}
[Fact]
public void DecodeStreamState_BadHeader_ReturnsBadEncodingError()
{
var (state, error) = StoreParity.DecodeStreamState(new byte[] { 1, 1, 1 });
state.ShouldBeNull();
error.ShouldBe(StoreErrors.ErrBadStreamStateEncoding);
}
[Fact]
public void DecodeStreamState_CorruptPayload_ReturnsCorruptError()
{
var badVarint = new byte[] { 42, 1, 0x80 };
var (state, error) = StoreParity.DecodeStreamState(badVarint);
state.ShouldBeNull();
error.ShouldBe(StoreErrors.ErrCorruptStreamState);
}
[Fact]
public void DecodeStreamState_UnknownDeleteBlockMarker_ReturnsCorruptError()
{
var buffer = new List<byte> { 42, 1 };
AppendUVarInt(buffer, 1);
AppendUVarInt(buffer, 1);
AppendUVarInt(buffer, 1);
AppendUVarInt(buffer, 1);
AppendUVarInt(buffer, 0);
AppendUVarInt(buffer, 1);
buffer.Add(99);
var (state, error) = StoreParity.DecodeStreamState(buffer.ToArray());
state.ShouldBeNull();
error.ShouldBe(StoreErrors.ErrCorruptStreamState);
}
[Fact]
public void DeleteRange_GetState_UsesRunLengthParity()
{
var dr = new DeleteRange { First = 7, Num = 3 };
var (first, last, num) = dr.GetState();
first.ShouldBe(7UL);
last.ShouldBe(9UL);
num.ShouldBe(3UL);
}
[Fact]
public void DeleteSlice_GetState_EmptyAndNonEmptyParity()
{
var empty = new DeleteSlice(Array.Empty<ulong>());
empty.GetState().ShouldBe((0UL, 0UL, 0UL));
var nonEmpty = new DeleteSlice(new ulong[] { 2, 4, 9 });
nonEmpty.GetState().ShouldBe((2UL, 9UL, 3UL));
}
[Fact]
public void EncodeConsumerState_WithPendingAndRedelivered_EncodesExpectedShape()
{
var pendingTs = 1_700_000_012_000_000_000L; // ns
var state = new ConsumerState
{
AckFloor = new SequencePair { Consumer = 20, Stream = 100 },
Delivered = new SequencePair { Consumer = 25, Stream = 110 },
Pending = new Dictionary<ulong, Pending>
{
[104] = new Pending { Sequence = 23, Timestamp = pendingTs },
},
Redelivered = new Dictionary<ulong, ulong>
{
[108] = 2,
},
};
var encoded = StoreParity.EncodeConsumerState(state);
encoded[0].ShouldBe((byte)22);
encoded[1].ShouldBe((byte)2);
var index = 2;
ReadUVarInt(encoded, ref index).ShouldBe(20UL);
ReadUVarInt(encoded, ref index).ShouldBe(100UL);
ReadUVarInt(encoded, ref index).ShouldBe(25UL);
ReadUVarInt(encoded, ref index).ShouldBe(110UL);
ReadUVarInt(encoded, ref index).ShouldBe(1UL);
var minTs = ReadVarInt(encoded, ref index);
var pendingStreamDelta = ReadUVarInt(encoded, ref index);
var pendingConsumerDelta = ReadUVarInt(encoded, ref index);
var pendingTsDelta = ReadVarInt(encoded, ref index);
(100UL + pendingStreamDelta).ShouldBe(104UL);
(20UL + pendingConsumerDelta).ShouldBe(23UL);
(minTs - pendingTsDelta).ShouldBe(pendingTs / 1_000_000_000L);
ReadUVarInt(encoded, ref index).ShouldBe(1UL);
var redeliveredStreamDelta = ReadUVarInt(encoded, ref index);
var redeliveredCount = ReadUVarInt(encoded, ref index);
(100UL + redeliveredStreamDelta).ShouldBe(108UL);
redeliveredCount.ShouldBe(2UL);
}
[Fact]
public void IsOutOfSpaceErr_MessageContainsNoSpaceLeft_ReturnsTrue()
{
StoreParity.IsOutOfSpaceErr(new IOException("disk full: no space left on device")).ShouldBeTrue();
StoreParity.IsOutOfSpaceErr(new IOException("permission denied")).ShouldBeFalse();
StoreParity.IsOutOfSpaceErr(null).ShouldBeFalse();
}
[Fact]
public void IsClusterResetErr_KnownSentinels_ReturnsExpected()
{
StoreParity.IsClusterResetErr(StoreParity.ErrLastSeqMismatch).ShouldBeTrue();
StoreParity.IsClusterResetErr(StoreErrors.ErrStoreEOF).ShouldBeTrue();
StoreParity.IsClusterResetErr(StoreParity.ErrFirstSequenceMismatch).ShouldBeTrue();
StoreParity.IsClusterResetErr(new InvalidOperationException("other")).ShouldBeFalse();
}
[Fact]
public void Copy_SourceToDestination_PreservesAndIsolatesBuffers()
{
var source = new StoreMsg
{
Subject = "ORDERS.created",
Hdr = Encoding.ASCII.GetBytes("HDR"),
Msg = Encoding.ASCII.GetBytes("BODY"),
Buf = Encoding.ASCII.GetBytes("HDRBODY"),
Seq = 42,
Ts = 9001,
};
var destination = new StoreMsg();
source.Copy(destination);
destination.Subject.ShouldBe(source.Subject);
destination.Seq.ShouldBe(42UL);
destination.Ts.ShouldBe(9001L);
destination.Hdr.SequenceEqual(Encoding.ASCII.GetBytes("HDR")).ShouldBeTrue();
destination.Msg.SequenceEqual(Encoding.ASCII.GetBytes("BODY")).ShouldBeTrue();
destination.Buf.SequenceEqual(Encoding.ASCII.GetBytes("HDRBODY")).ShouldBeTrue();
source.Buf[0] = (byte)'X';
destination.Buf[0].ShouldBe((byte)'H');
}
[Fact]
public void BytesToString_StringToBytes_CopyString_ParityBehavior()
{
StoreParity.BytesToString(Array.Empty<byte>()).ShouldBe(string.Empty);
StoreParity.StringToBytes(string.Empty).ShouldBeNull();
var raw = new byte[] { 0, 255, 65 };
var text = StoreParity.BytesToString(raw);
var roundtrip = StoreParity.StringToBytes(text);
roundtrip.ShouldNotBeNull();
roundtrip!.SequenceEqual(raw).ShouldBeTrue();
var original = new string(new[] { 'n', 'a', 't', 's' });
var copy = StoreParity.CopyString(original);
copy.ShouldBe(original);
ReferenceEquals(copy, original).ShouldBeFalse();
}
[Fact]
public void IsPermissionError_UnauthorizedAccess_ReturnsTrue()
{
StoreParity.IsPermissionError(new UnauthorizedAccessException("no access")).ShouldBeTrue();
StoreParity.IsPermissionError(new IOException("other")).ShouldBeFalse();
StoreParity.IsPermissionError(null).ShouldBeFalse();
}
private static void AppendUVarInt(List<byte> buffer, ulong value)
{
while (value >= 0x80)
{
buffer.Add((byte)(value | 0x80));
value >>= 7;
}
buffer.Add((byte)value);
}
private static ulong ReadUVarInt(byte[] buffer, ref int index)
{
ulong value = 0;
var shift = 0;
while (index < buffer.Length)
{
var b = buffer[index++];
if (b < 0x80)
{
value |= (ulong)b << shift;
return value;
}
value |= (ulong)(b & 0x7F) << shift;
shift += 7;
}
throw new InvalidDataException("Unexpected end of varint");
}
private static long ReadVarInt(byte[] buffer, ref int index)
{
var uv = ReadUVarInt(buffer, ref index);
return (long)((uv >> 1) ^ (ulong)-(long)(uv & 1));
}
}