Files
natsnet/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs
2026-02-28 11:53:44 -05:00

519 lines
18 KiB
C#

using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
public static class StoreParity
{
private const byte StreamStateMagic = 42;
private const byte StreamStateVersion = 1;
private const byte RunLengthMagic = 33;
private const byte SequenceSetMagic = 22;
private const int StreamHeaderLength = 2;
private const int MaxVarIntLength = 10;
private const int ConsumerStateHeaderSize = 6 * MaxVarIntLength + StreamHeaderLength;
private const long NanosecondsPerSecond = 1_000_000_000L;
public static readonly Exception ErrLastSeqMismatch = new InvalidOperationException("last sequence mismatch");
public static readonly Exception ErrFirstSequenceMismatch = new InvalidOperationException("first sequence mismatch");
public static readonly Exception ErrCatchupAbortedNoLeader = new InvalidOperationException("catchup aborted no leader");
public static readonly Exception ErrCatchupTooManyRetries = new InvalidOperationException("catchup too many retries");
public static bool IsEncodedStreamState(byte[]? buf)
=> buf is { Length: >= StreamHeaderLength } &&
buf[0] == StreamStateMagic &&
buf[1] == StreamStateVersion;
public static (StreamReplicatedState? State, Exception? Error) DecodeStreamState(byte[]? buf)
{
if (!IsEncodedStreamState(buf))
return (null, StoreErrors.ErrBadStreamStateEncoding);
var data = buf!;
var state = new StreamReplicatedState();
var index = StreamHeaderLength;
if (!TryReadUVarInt(data, ref index, out var msgs) ||
!TryReadUVarInt(data, ref index, out var bytes) ||
!TryReadUVarInt(data, ref index, out var firstSeq) ||
!TryReadUVarInt(data, ref index, out var lastSeq) ||
!TryReadUVarInt(data, ref index, out var failed))
return (null, StoreErrors.ErrCorruptStreamState);
state.Msgs = msgs;
state.Bytes = bytes;
state.FirstSeq = firstSeq;
state.LastSeq = lastSeq;
state.Failed = failed;
if (!TryReadUVarInt(data, ref index, out var numDeleted))
return (null, StoreErrors.ErrCorruptStreamState);
if (numDeleted > 0)
{
while (index < data.Length)
{
switch (data[index])
{
case SequenceSetMagic:
try
{
var (set, bytesRead) = SequenceSet.Decode(data.AsSpan(index));
if (bytesRead <= 0)
return (null, StoreErrors.ErrCorruptStreamState);
index += bytesRead;
state.Deleted.Add(new SequenceSetDeleteBlock(set));
}
catch (InvalidDataException)
{
return (null, StoreErrors.ErrCorruptStreamState);
}
break;
case RunLengthMagic:
index++;
if (!TryReadUVarInt(data, ref index, out var first) ||
!TryReadUVarInt(data, ref index, out var num))
return (null, StoreErrors.ErrCorruptStreamState);
state.Deleted.Add(new DeleteRange
{
First = first,
Num = num,
});
break;
default:
return (null, StoreErrors.ErrCorruptStreamState);
}
}
}
return (state, null);
}
public static byte[] EncodeConsumerState(ConsumerState state)
{
ArgumentNullException.ThrowIfNull(state);
var pendingCount = state.Pending?.Count ?? 0;
var redeliveredCount = state.Redelivered?.Count ?? 0;
var maxSize = ConsumerStateHeaderSize;
if (pendingCount > 0)
maxSize += pendingCount * (3 * MaxVarIntLength) + MaxVarIntLength;
if (redeliveredCount > 0)
maxSize += redeliveredCount * (2 * MaxVarIntLength) + MaxVarIntLength;
var buffer = new byte[maxSize];
buffer[0] = SequenceSetMagic;
buffer[1] = 2;
var offset = StreamHeaderLength;
WriteUVarInt(buffer, ref offset, state.AckFloor.Consumer);
WriteUVarInt(buffer, ref offset, state.AckFloor.Stream);
WriteUVarInt(buffer, ref offset, state.Delivered.Consumer);
WriteUVarInt(buffer, ref offset, state.Delivered.Stream);
WriteUVarInt(buffer, ref offset, (ulong)pendingCount);
var ackStreamFloor = state.AckFloor.Stream;
var ackConsumerFloor = state.AckFloor.Consumer;
if (pendingCount > 0)
{
var minTs = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
WriteVarInt(buffer, ref offset, minTs);
foreach (var entry in state.Pending!)
{
var pending = entry.Value ?? new Pending();
WriteUVarInt(buffer, ref offset, entry.Key - ackStreamFloor);
WriteUVarInt(buffer, ref offset, pending.Sequence - ackConsumerFloor);
var tsSeconds = pending.Timestamp / NanosecondsPerSecond;
WriteVarInt(buffer, ref offset, minTs - tsSeconds);
}
}
WriteUVarInt(buffer, ref offset, (ulong)redeliveredCount);
if (redeliveredCount > 0)
{
foreach (var entry in state.Redelivered!)
{
WriteUVarInt(buffer, ref offset, entry.Key - ackStreamFloor);
WriteUVarInt(buffer, ref offset, entry.Value);
}
}
return buffer[..offset];
}
public static bool IsOutOfSpaceErr(Exception? error)
=> error != null && error.Message.Contains("no space left", StringComparison.Ordinal);
public static bool IsClusterResetErr(Exception? error)
=> ContainsException(error, ErrLastSeqMismatch) ||
ContainsException(error, StoreErrors.ErrStoreEOF) ||
ContainsException(error, ErrFirstSequenceMismatch) ||
ContainsException(error, ErrCatchupAbortedNoLeader) ||
ContainsException(error, ErrCatchupTooManyRetries);
public static string BytesToString(byte[]? bytes)
=> bytes is not { Length: > 0 } ? string.Empty : Encoding.Latin1.GetString(bytes);
public static byte[]? StringToBytes(string text)
=> string.IsNullOrEmpty(text) ? null : Encoding.Latin1.GetBytes(text);
public static string CopyString(string text)
{
if (string.IsNullOrEmpty(text))
return string.Empty;
return new string(text.AsSpan());
}
public static bool IsPermissionError(Exception? error)
=> AnyException(error, static ex => ex is UnauthorizedAccessException);
private static bool TryReadUVarInt(ReadOnlySpan<byte> source, ref int index, out ulong value)
{
if ((uint)index >= (uint)source.Length)
{
value = 0;
index = -1;
return false;
}
value = 0;
var shift = 0;
for (var i = 0; i < MaxVarIntLength; i++)
{
if ((uint)index >= (uint)source.Length)
{
value = 0;
index = -1;
return false;
}
var b = source[index++];
if (b < 0x80)
{
if (i == MaxVarIntLength - 1 && b > 1)
{
value = 0;
index = -1;
return false;
}
value |= (ulong)b << shift;
return true;
}
value |= (ulong)(b & 0x7F) << shift;
shift += 7;
}
value = 0;
index = -1;
return false;
}
private static void WriteUVarInt(byte[] buffer, ref int offset, ulong value)
{
while (value >= 0x80)
{
buffer[offset++] = (byte)(value | 0x80);
value >>= 7;
}
buffer[offset++] = (byte)value;
}
private static void WriteVarInt(byte[] buffer, ref int offset, long value)
{
var encoded = (ulong)value << 1;
if (value < 0)
encoded = ~encoded;
WriteUVarInt(buffer, ref offset, encoded);
}
private static bool ContainsException(Exception? source, Exception target)
=> AnyException(source, ex => ReferenceEquals(ex, target));
private static bool AnyException(Exception? source, Func<Exception, bool> matcher)
{
if (source == null)
return false;
if (matcher(source))
return true;
if (source is AggregateException aggregate)
{
foreach (var inner in aggregate.InnerExceptions)
{
if (AnyException(inner, matcher))
return true;
}
}
return source.InnerException != null && AnyException(source.InnerException, matcher);
}
private sealed class SequenceSetDeleteBlock(SequenceSet set) : IDeleteBlock
{
private readonly SequenceSet _set = set;
public (ulong First, ulong Last, ulong Num) GetState()
{
var (min, max, count) = _set.State();
return (min, max, count);
}
public void Range(Func<ulong, bool> f)
=> _set.Range(f);
}
}
public static class StoreEnumParityExtensions
{
public static string String(this RetentionPolicy value)
=> value switch
{
RetentionPolicy.LimitsPolicy => "Limits",
RetentionPolicy.InterestPolicy => "Interest",
RetentionPolicy.WorkQueuePolicy => "WorkQueue",
_ => "Unknown Retention Policy",
};
public static string String(this DiscardPolicy value)
=> value switch
{
DiscardPolicy.DiscardOld => "DiscardOld",
DiscardPolicy.DiscardNew => "DiscardNew",
_ => "Unknown Discard Policy",
};
public static string String(this StorageType value)
=> value switch
{
StorageType.MemoryStorage => "Memory",
StorageType.FileStorage => "File",
_ => "Unknown Storage Type",
};
}
public sealed class RetentionPolicyJsonConverter : JsonConverter<RetentionPolicy>
{
public override RetentionPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"limits" => RetentionPolicy.LimitsPolicy,
"interest" => RetentionPolicy.InterestPolicy,
"workqueue" => RetentionPolicy.WorkQueuePolicy,
var value => throw new JsonException($"can not unmarshal \"{value}\""),
};
}
public override void Write(Utf8JsonWriter writer, RetentionPolicy value, JsonSerializerOptions options)
{
switch (value)
{
case RetentionPolicy.LimitsPolicy:
writer.WriteStringValue("limits");
break;
case RetentionPolicy.InterestPolicy:
writer.WriteStringValue("interest");
break;
case RetentionPolicy.WorkQueuePolicy:
writer.WriteStringValue("workqueue");
break;
default:
throw new JsonException($"can not marshal {value}");
}
}
}
public sealed class DiscardPolicyJsonConverter : JsonConverter<DiscardPolicy>
{
public override DiscardPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
var token = reader.GetString() ?? string.Empty;
return token.ToLowerInvariant() switch
{
"old" => DiscardPolicy.DiscardOld,
"new" => DiscardPolicy.DiscardNew,
_ => throw new JsonException($"can not unmarshal \"{token}\""),
};
}
public override void Write(Utf8JsonWriter writer, DiscardPolicy value, JsonSerializerOptions options)
{
switch (value)
{
case DiscardPolicy.DiscardOld:
writer.WriteStringValue("old");
break;
case DiscardPolicy.DiscardNew:
writer.WriteStringValue("new");
break;
default:
throw new JsonException($"can not marshal {value}");
}
}
}
public sealed class StorageTypeJsonConverter : JsonConverter<StorageType>
{
public override StorageType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"memory" => StorageType.MemoryStorage,
"file" => StorageType.FileStorage,
var value => throw new JsonException($"can not unmarshal \"{value}\""),
};
}
public override void Write(Utf8JsonWriter writer, StorageType value, JsonSerializerOptions options)
{
switch (value)
{
case StorageType.MemoryStorage:
writer.WriteStringValue("memory");
break;
case StorageType.FileStorage:
writer.WriteStringValue("file");
break;
default:
throw new JsonException($"can not marshal {value}");
}
}
}
public sealed class AckPolicyJsonConverter : JsonConverter<AckPolicy>
{
public override AckPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"none" => AckPolicy.AckNone,
"all" => AckPolicy.AckAll,
"explicit" => AckPolicy.AckExplicit,
var value => throw new JsonException($"can not unmarshal \"{value}\""),
};
}
public override void Write(Utf8JsonWriter writer, AckPolicy value, JsonSerializerOptions options)
{
switch (value)
{
case AckPolicy.AckNone:
writer.WriteStringValue("none");
break;
case AckPolicy.AckAll:
writer.WriteStringValue("all");
break;
case AckPolicy.AckExplicit:
writer.WriteStringValue("explicit");
break;
default:
throw new JsonException($"can not marshal {value}");
}
}
}
public sealed class ReplayPolicyJsonConverter : JsonConverter<ReplayPolicy>
{
public override ReplayPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"instant" => ReplayPolicy.ReplayInstant,
"original" => ReplayPolicy.ReplayOriginal,
var value => throw new JsonException($"can not unmarshal \"{value}\""),
};
}
public override void Write(Utf8JsonWriter writer, ReplayPolicy value, JsonSerializerOptions options)
{
switch (value)
{
case ReplayPolicy.ReplayInstant:
writer.WriteStringValue("instant");
break;
case ReplayPolicy.ReplayOriginal:
writer.WriteStringValue("original");
break;
default:
throw new JsonException($"can not marshal {value}");
}
}
}
public sealed class DeliverPolicyJsonConverter : JsonConverter<DeliverPolicy>
{
public override DeliverPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"all" or "undefined" => DeliverPolicy.DeliverAll,
"last" => DeliverPolicy.DeliverLast,
"last_per_subject" => DeliverPolicy.DeliverLastPerSubject,
"new" => DeliverPolicy.DeliverNew,
"by_start_sequence" => DeliverPolicy.DeliverByStartSequence,
"by_start_time" => DeliverPolicy.DeliverByStartTime,
var value => throw new JsonException($"can not unmarshal \"{value}\""),
};
}
public override void Write(Utf8JsonWriter writer, DeliverPolicy value, JsonSerializerOptions options)
{
switch (value)
{
case DeliverPolicy.DeliverAll:
writer.WriteStringValue("all");
break;
case DeliverPolicy.DeliverLast:
writer.WriteStringValue("last");
break;
case DeliverPolicy.DeliverLastPerSubject:
writer.WriteStringValue("last_per_subject");
break;
case DeliverPolicy.DeliverNew:
writer.WriteStringValue("new");
break;
case DeliverPolicy.DeliverByStartSequence:
writer.WriteStringValue("by_start_sequence");
break;
case DeliverPolicy.DeliverByStartTime:
writer.WriteStringValue("by_start_time");
break;
default:
writer.WriteStringValue("undefined");
break;
}
}
}