feat(batch11): complete filestore init feature and test port
This commit is contained in:
@@ -111,6 +111,17 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
|
||||
// to the memory store implementation while file-specific APIs are added on top.
|
||||
private readonly JetStreamMemStore _memStore;
|
||||
private static readonly ArrayPool<byte> MsgBlockBufferPool = ArrayPool<byte>.Shared;
|
||||
private static readonly object InitLock = new();
|
||||
private static SemaphoreSlim? _diskIoSlots;
|
||||
private static int _diskIoCount;
|
||||
private const int ConsumerHeaderLength = 2;
|
||||
private const int MaxVarIntLength = 10;
|
||||
private const long NanosecondsPerSecond = 1_000_000_000L;
|
||||
|
||||
static JetStreamFileStore()
|
||||
{
|
||||
Init();
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Constructor
|
||||
@@ -468,6 +479,224 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
|
||||
return SubjectsEqual;
|
||||
}
|
||||
|
||||
internal static long TimestampNormalized(DateTime t)
|
||||
{
|
||||
if (t == default)
|
||||
return 0;
|
||||
|
||||
var utc = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime();
|
||||
return (utc - DateTime.UnixEpoch).Ticks * 100L;
|
||||
}
|
||||
|
||||
internal static void Init()
|
||||
{
|
||||
var mp = Environment.ProcessorCount;
|
||||
var nIo = Math.Min(16, Math.Max(4, mp));
|
||||
if (mp > 32)
|
||||
nIo = Math.Max(16, Math.Min(mp, mp / 2));
|
||||
|
||||
lock (InitLock)
|
||||
{
|
||||
if (_diskIoSlots != null && _diskIoCount == nIo)
|
||||
return;
|
||||
|
||||
_diskIoSlots?.Dispose();
|
||||
_diskIoSlots = new SemaphoreSlim(nIo, nIo);
|
||||
_diskIoCount = nIo;
|
||||
}
|
||||
}
|
||||
|
||||
internal static (byte Version, Exception? Error) CheckConsumerHeader(byte[]? hdr)
|
||||
{
|
||||
if (hdr is not { Length: >= ConsumerHeaderLength } || hdr[0] != FileStoreDefaults.FileStoreMagic)
|
||||
return (0, new InvalidDataException("corrupt state"));
|
||||
|
||||
var version = hdr[1];
|
||||
return version switch
|
||||
{
|
||||
FileStoreDefaults.FileStoreVersion or FileStoreDefaults.NewVersion => (version, null),
|
||||
_ => (0, new InvalidDataException($"unsupported version: {version}")),
|
||||
};
|
||||
}
|
||||
|
||||
internal static (ConsumerState? State, Exception? Error) DecodeConsumerState(byte[]? buf)
|
||||
{
|
||||
if (buf == null)
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
|
||||
var (version, headerErr) = CheckConsumerHeader(buf);
|
||||
if (headerErr != null)
|
||||
return (null, headerErr);
|
||||
|
||||
var index = ConsumerHeaderLength;
|
||||
if (!TryReadUVarInt(buf, ref index, out var ackConsumer) ||
|
||||
!TryReadUVarInt(buf, ref index, out var ackStream) ||
|
||||
!TryReadUVarInt(buf, ref index, out var deliveredConsumer) ||
|
||||
!TryReadUVarInt(buf, ref index, out var deliveredStream))
|
||||
{
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
}
|
||||
|
||||
var state = new ConsumerState
|
||||
{
|
||||
AckFloor = new SequencePair { Consumer = ackConsumer, Stream = ackStream },
|
||||
Delivered = new SequencePair { Consumer = deliveredConsumer, Stream = deliveredStream },
|
||||
};
|
||||
|
||||
if (version == FileStoreDefaults.FileStoreVersion)
|
||||
{
|
||||
if (state.AckFloor.Consumer > 1)
|
||||
state.Delivered.Consumer += state.AckFloor.Consumer - 1;
|
||||
if (state.AckFloor.Stream > 1)
|
||||
state.Delivered.Stream += state.AckFloor.Stream - 1;
|
||||
}
|
||||
|
||||
const ulong highBit = 1UL << 63;
|
||||
if ((state.AckFloor.Stream & highBit) != 0 || (state.Delivered.Stream & highBit) != 0)
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
|
||||
if (!TryReadUVarInt(buf, ref index, out var pendingCount))
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
|
||||
if (pendingCount > 0)
|
||||
{
|
||||
if (!TryReadVarInt(buf, ref index, out var minTs))
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
|
||||
state.Pending = new Dictionary<ulong, Pending>((int)pendingCount);
|
||||
for (var i = 0; i < (int)pendingCount; i++)
|
||||
{
|
||||
if (!TryReadUVarInt(buf, ref index, out var sseq))
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
|
||||
var dseq = 0UL;
|
||||
if (version == FileStoreDefaults.NewVersion && !TryReadUVarInt(buf, ref index, out dseq))
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
|
||||
if (!TryReadVarInt(buf, ref index, out var ts))
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
|
||||
sseq += state.AckFloor.Stream;
|
||||
if (sseq == 0)
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
|
||||
if (version == FileStoreDefaults.NewVersion)
|
||||
dseq += state.AckFloor.Consumer;
|
||||
|
||||
var adjustedTs = version == FileStoreDefaults.FileStoreVersion
|
||||
? (ts + minTs) * NanosecondsPerSecond
|
||||
: (minTs - ts) * NanosecondsPerSecond;
|
||||
|
||||
state.Pending[sseq] = new Pending
|
||||
{
|
||||
Sequence = dseq,
|
||||
Timestamp = adjustedTs,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (!TryReadUVarInt(buf, ref index, out var redeliveredCount))
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
|
||||
if (redeliveredCount > 0)
|
||||
{
|
||||
state.Redelivered = new Dictionary<ulong, ulong>((int)redeliveredCount);
|
||||
for (var i = 0; i < (int)redeliveredCount; i++)
|
||||
{
|
||||
if (!TryReadUVarInt(buf, ref index, out var seq) ||
|
||||
!TryReadUVarInt(buf, ref index, out var count))
|
||||
{
|
||||
return (null, new InvalidDataException("corrupt state"));
|
||||
}
|
||||
|
||||
if (seq > 0 && count > 0)
|
||||
{
|
||||
seq += state.AckFloor.Stream;
|
||||
state.Redelivered[seq] = count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (state, null);
|
||||
}
|
||||
|
||||
internal static Exception? WriteFileWithSync(string name, byte[] data, UnixFileMode perm)
|
||||
=> WriteAtomically(name, data, perm, sync: true);
|
||||
|
||||
internal static Exception? WriteAtomically(string name, byte[] data, UnixFileMode perm, bool sync)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(name);
|
||||
ArgumentNullException.ThrowIfNull(data);
|
||||
|
||||
Init();
|
||||
var slots = _diskIoSlots!;
|
||||
var tmp = name + ".tmp";
|
||||
|
||||
slots.Wait();
|
||||
try
|
||||
{
|
||||
var options = sync ? FileOptions.WriteThrough : FileOptions.None;
|
||||
using (var stream = new FileStream(
|
||||
tmp,
|
||||
FileMode.Create,
|
||||
FileAccess.Write,
|
||||
FileShare.None,
|
||||
bufferSize: 4096,
|
||||
options))
|
||||
{
|
||||
stream.Write(data, 0, data.Length);
|
||||
stream.Flush(sync);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
File.SetUnixFileMode(tmp, perm);
|
||||
}
|
||||
catch (PlatformNotSupportedException)
|
||||
{
|
||||
}
|
||||
|
||||
File.Move(tmp, name, overwrite: true);
|
||||
|
||||
if (sync)
|
||||
{
|
||||
var dir = Path.GetDirectoryName(Path.GetFullPath(name));
|
||||
if (!string.IsNullOrEmpty(dir))
|
||||
{
|
||||
try
|
||||
{
|
||||
using var handle = File.OpenHandle(dir, FileMode.Open, FileAccess.Read, FileShare.ReadWrite | FileShare.Delete);
|
||||
RandomAccess.FlushToDisk(handle);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Best-effort directory metadata sync.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (File.Exists(tmp))
|
||||
File.Delete(tmp);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Best-effort cleanup.
|
||||
}
|
||||
|
||||
return ex;
|
||||
}
|
||||
finally
|
||||
{
|
||||
slots.Release();
|
||||
}
|
||||
}
|
||||
|
||||
internal static AeadCipher GenEncryptionKey(StoreCipher sc, byte[] seed)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(seed);
|
||||
@@ -557,6 +786,57 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
|
||||
return DateTimeOffset.FromUnixTimeSeconds(seconds).AddTicks(remainderNanos / 100L).UtcDateTime;
|
||||
}
|
||||
|
||||
private static bool TryReadUVarInt(ReadOnlySpan<byte> source, ref int index, out ulong value)
|
||||
{
|
||||
value = 0;
|
||||
var shift = 0;
|
||||
for (var i = 0; i < MaxVarIntLength; i++)
|
||||
{
|
||||
if ((uint)index >= (uint)source.Length)
|
||||
{
|
||||
index = -1;
|
||||
value = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
var b = source[index++];
|
||||
if (b < 0x80)
|
||||
{
|
||||
if (i == MaxVarIntLength - 1 && b > 1)
|
||||
{
|
||||
index = -1;
|
||||
value = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
value |= (ulong)b << shift;
|
||||
return true;
|
||||
}
|
||||
|
||||
value |= (ulong)(b & 0x7F) << shift;
|
||||
shift += 7;
|
||||
}
|
||||
|
||||
index = -1;
|
||||
value = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static bool TryReadVarInt(ReadOnlySpan<byte> source, ref int index, out long value)
|
||||
{
|
||||
if (!TryReadUVarInt(source, ref index, out var unsigned))
|
||||
{
|
||||
value = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
value = (long)(unsigned >> 1);
|
||||
if ((unsigned & 1) != 0)
|
||||
value = ~value;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
internal void RecoverAEK()
|
||||
{
|
||||
if (_prf == null || _aek != null)
|
||||
|
||||
Reference in New Issue
Block a user