feat(batch15): complete group 3 msgblock/consumerfilestore
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
// Adapted from server/filestore.go (msgBlock struct and consumerFileStore struct)
|
||||
|
||||
using System.Text.Json;
|
||||
using System.Text;
|
||||
using System.Threading.Channels;
|
||||
using System.Buffers.Binary;
|
||||
using System.Security.Cryptography;
|
||||
@@ -261,7 +262,33 @@ internal sealed class MessageBlock
|
||||
/// <summary>When true, simulates a write failure. Used by unit tests only.</summary>
|
||||
public bool MockWriteErr { get; set; }
|
||||
|
||||
internal void CloseFDs()
|
||||
internal Exception? CloseFDs()
|
||||
{
|
||||
if (Mu.IsWriteLockHeld)
|
||||
return CloseFDsLocked();
|
||||
|
||||
Mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
return CloseFDsLocked();
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal Exception? CloseFDsLocked()
|
||||
{
|
||||
var (pending, pendingErr) = BytesPending();
|
||||
if (pendingErr == null && pending != null && pending.Length > 0)
|
||||
return new InvalidOperationException("pending data");
|
||||
|
||||
CloseFDsLockedNoCheck();
|
||||
return null;
|
||||
}
|
||||
|
||||
internal void CloseFDsLockedNoCheck()
|
||||
{
|
||||
Mfd?.Dispose();
|
||||
Mfd = null;
|
||||
@@ -795,6 +822,253 @@ internal sealed class MessageBlock
|
||||
return null;
|
||||
}
|
||||
|
||||
internal void ClearCache()
|
||||
{
|
||||
if (Ctmr != null)
|
||||
{
|
||||
var sinceLast = SinceLastActivity();
|
||||
if (Fss == null || sinceLast >= Fexp)
|
||||
{
|
||||
Fss = null;
|
||||
Ctmr.Dispose();
|
||||
Ctmr = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
ResetCacheExpireTimerLocked(Fexp - sinceLast);
|
||||
}
|
||||
}
|
||||
|
||||
var cache = CacheData;
|
||||
if (cache == null)
|
||||
return;
|
||||
|
||||
var buf = cache.Buf;
|
||||
CacheData = null;
|
||||
if (!cache.Nra && buf.Length > 0)
|
||||
JetStreamFileStore.RecycleMsgBlockBuf(buf);
|
||||
}
|
||||
|
||||
internal void ExpireCache()
|
||||
{
|
||||
Mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
ExpireCacheLocked();
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void TryForceExpireCache()
|
||||
{
|
||||
Mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
TryForceExpireCacheLocked();
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal byte[]? TryExpireWriteCache()
|
||||
{
|
||||
if (CacheData == null)
|
||||
return null;
|
||||
|
||||
var lwts = Lwts;
|
||||
var llts = Llts;
|
||||
var prior = CacheData.Buf;
|
||||
var priorNra = CacheData.Nra;
|
||||
|
||||
Lwts = 0;
|
||||
CacheData.Nra = true;
|
||||
ExpireCacheLocked();
|
||||
|
||||
Lwts = lwts;
|
||||
if (CacheData != null)
|
||||
CacheData.Nra = priorNra;
|
||||
|
||||
if (llts == 0 && CacheData?.Buf is null or { Length: 0 })
|
||||
{
|
||||
Lwts = 0;
|
||||
return Array.Empty<byte>();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
internal void ExpireCacheLocked()
|
||||
{
|
||||
var cache = CacheData;
|
||||
if (cache == null && Fss == null)
|
||||
{
|
||||
if (Ctmr != null)
|
||||
{
|
||||
Ctmr.Dispose();
|
||||
Ctmr = null;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (cache != null && PendingWriteSizeLocked() > 0)
|
||||
{
|
||||
ResetCacheExpireTimerLocked(Cexp);
|
||||
return;
|
||||
}
|
||||
|
||||
var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
|
||||
var bufts = Math.Max(Llts, Lwts);
|
||||
var cacheNs = TimeSpanToNanoseconds(Cexp);
|
||||
if (cacheNs > 0 && now - bufts <= cacheNs)
|
||||
{
|
||||
ResetCacheExpireTimerLocked(NanosecondsToTimeSpan(cacheNs - (now - bufts)));
|
||||
return;
|
||||
}
|
||||
|
||||
if (cache != null)
|
||||
{
|
||||
if (!cache.Nra && cache.Buf.Length > 0)
|
||||
JetStreamFileStore.RecycleMsgBlockBuf(cache.Buf);
|
||||
|
||||
cache.Buf = Array.Empty<byte>();
|
||||
cache.Idx = Array.Empty<uint>();
|
||||
cache.Wp = 0;
|
||||
}
|
||||
|
||||
ClearCache();
|
||||
}
|
||||
|
||||
internal Exception? EnableForWriting(bool fip)
|
||||
{
|
||||
if (Mfd != null)
|
||||
return null;
|
||||
|
||||
try
|
||||
{
|
||||
Mfd = new FileStream(Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
|
||||
if (!fip)
|
||||
SpinUpFlushLoopLocked();
|
||||
return null;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return ex;
|
||||
}
|
||||
}
|
||||
|
||||
internal Exception? WriteTombstone(ulong seq, long ts)
|
||||
=> WriteMsgRecord(0, seq | FileStoreDefaults.Tbit, string.Empty, null, null, ts, true);
|
||||
|
||||
internal Exception? WriteTombstoneNoFlush(ulong seq, long ts)
|
||||
{
|
||||
Mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
return WriteMsgRecordLocked(0, seq | FileStoreDefaults.Tbit, string.Empty, null, null, ts, flush: false, kick: false);
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal Exception? WriteMsgRecord(ulong rl, ulong seq, string subj, byte[]? mhdr, byte[]? msg, long ts, bool flush)
|
||||
{
|
||||
Mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
return WriteMsgRecordLocked(rl, seq, subj, mhdr, msg, ts, flush, kick: true);
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal Exception? WriteMsgRecordLocked(ulong rl, ulong seq, string subj, byte[]? mhdr, byte[]? msg, long ts, bool flush, bool kick)
|
||||
{
|
||||
var enableErr = EnableForWriting(flush && kick);
|
||||
if (enableErr != null)
|
||||
return enableErr;
|
||||
|
||||
var cacheErr = SetupWriteCache();
|
||||
if (cacheErr != null)
|
||||
return cacheErr;
|
||||
|
||||
var cache = CacheData;
|
||||
if (cache == null)
|
||||
return new InvalidOperationException("cache was not initialized");
|
||||
|
||||
mhdr ??= Array.Empty<byte>();
|
||||
msg ??= Array.Empty<byte>();
|
||||
rl = rl == 0 ? (ulong)CalculateRecordLength(subj, mhdr, msg) : rl;
|
||||
|
||||
var index = cache.Buf.Length;
|
||||
var record = BuildMessageRecord(subj, mhdr, msg, seq, ts);
|
||||
cache.Buf = AppendBuffer(cache.Buf, record);
|
||||
Lwts = ts;
|
||||
NeedSync = true;
|
||||
Lchk = record.AsSpan(record.Length - FileStoreDefaults.RecordHashSize).ToArray();
|
||||
|
||||
var isTombstone = (seq & FileStoreDefaults.Tbit) != 0;
|
||||
if (!isTombstone)
|
||||
{
|
||||
var last = Last.Seq;
|
||||
UpdateAccounting(seq, ts, rl);
|
||||
seq &= ~FileStoreDefaults.Ebit;
|
||||
|
||||
if (cache.Idx.Length > 0 && last + 1 < seq)
|
||||
{
|
||||
var fill = (int)Math.Min(int.MaxValue, seq - (last + 1));
|
||||
if (fill > 0)
|
||||
{
|
||||
var next = new uint[cache.Idx.Length + fill];
|
||||
cache.Idx.AsSpan().CopyTo(next);
|
||||
for (var i = cache.Idx.Length; i < next.Length; i++)
|
||||
next[i] = FileStoreDefaults.Dbit;
|
||||
cache.Idx = next;
|
||||
}
|
||||
}
|
||||
|
||||
var slot = (uint)index | FileStoreDefaults.Cbit;
|
||||
var expanded = new uint[cache.Idx.Length + 1];
|
||||
cache.Idx.AsSpan().CopyTo(expanded);
|
||||
expanded[^1] = slot;
|
||||
cache.Idx = expanded;
|
||||
if (cache.Idx.Length == 1)
|
||||
cache.Fseq = seq;
|
||||
}
|
||||
else
|
||||
{
|
||||
var tombstoneSeq = seq & ~FileStoreDefaults.Tbit;
|
||||
if (Msgs == 0 && tombstoneSeq > Last.Seq)
|
||||
{
|
||||
Last = new MsgId { Seq = tombstoneSeq, Ts = ts };
|
||||
First = new MsgId { Seq = tombstoneSeq + 1, Ts = 0 };
|
||||
}
|
||||
RBytes += rl;
|
||||
Dmap.Insert(tombstoneSeq);
|
||||
}
|
||||
|
||||
if (flush || Werr != null)
|
||||
{
|
||||
var flushErr = FlushPendingToDiskLocked();
|
||||
if (flushErr != null)
|
||||
return flushErr;
|
||||
}
|
||||
else if (kick)
|
||||
{
|
||||
JetStreamFileStore.KickFlusher(Fch);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
internal void FinishedWithCache()
|
||||
{
|
||||
if (CacheData != null && PendingWriteSizeLocked() == 0)
|
||||
@@ -1003,10 +1277,7 @@ internal sealed class MessageBlock
|
||||
Mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (Ctmr == null)
|
||||
StartCacheExpireTimer();
|
||||
else
|
||||
_ = Ctmr.Change(Cexp, Timeout.InfiniteTimeSpan);
|
||||
ResetCacheExpireTimerLocked(Cexp);
|
||||
}
|
||||
finally
|
||||
{
|
||||
@@ -1019,20 +1290,7 @@ internal sealed class MessageBlock
|
||||
if (Cexp <= TimeSpan.Zero)
|
||||
return;
|
||||
|
||||
Ctmr?.Dispose();
|
||||
Ctmr = new Timer(_ =>
|
||||
{
|
||||
Mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (!Closed)
|
||||
TryForceExpireCacheLocked();
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mu.ExitWriteLock();
|
||||
}
|
||||
}, null, Cexp, Timeout.InfiniteTimeSpan);
|
||||
ResetCacheExpireTimerLocked(Cexp);
|
||||
}
|
||||
|
||||
internal void ClearCacheAndOffset()
|
||||
@@ -1080,14 +1338,247 @@ internal sealed class MessageBlock
|
||||
|
||||
internal ulong PendingWriteSizeLocked()
|
||||
{
|
||||
if (Closed || Mfd == null)
|
||||
return 0;
|
||||
|
||||
var cache = CacheData;
|
||||
if (cache == null)
|
||||
if (cache == null || cache.Buf.Length <= cache.Wp)
|
||||
return 0;
|
||||
|
||||
var wp = Math.Clamp(cache.Wp, 0, cache.Buf.Length);
|
||||
return (ulong)(cache.Buf.Length - wp);
|
||||
}
|
||||
|
||||
internal (byte[]? Buffer, Exception? Error) BytesPending()
|
||||
{
|
||||
if (Mfd == null)
|
||||
return (null, new InvalidOperationException("no pending data"));
|
||||
|
||||
var cache = CacheData;
|
||||
if (cache == null)
|
||||
return (null, new InvalidOperationException("no cache"));
|
||||
|
||||
if (cache.Buf.Length <= cache.Wp)
|
||||
return (null, new InvalidOperationException("no pending data"));
|
||||
|
||||
var wp = Math.Clamp(cache.Wp, 0, cache.Buf.Length);
|
||||
var pending = cache.Buf[wp..];
|
||||
return pending.Length == 0
|
||||
? (null, new InvalidOperationException("no pending data"))
|
||||
: (pending, null);
|
||||
}
|
||||
|
||||
internal ulong BlkSize()
|
||||
{
|
||||
Mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
return RBytes;
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void UpdateAccounting(ulong seq, long ts, ulong rl)
|
||||
{
|
||||
var isDeleted = (seq & FileStoreDefaults.Ebit) != 0;
|
||||
if (isDeleted)
|
||||
seq &= ~FileStoreDefaults.Ebit;
|
||||
|
||||
var fseq = First.Seq;
|
||||
if ((fseq == 0 || First.Ts == 0) && seq >= fseq)
|
||||
First = new MsgId { Seq = seq, Ts = ts };
|
||||
|
||||
Last = new MsgId { Seq = seq, Ts = ts };
|
||||
RBytes += rl;
|
||||
if (!isDeleted)
|
||||
{
|
||||
Bytes += rl;
|
||||
Msgs++;
|
||||
}
|
||||
}
|
||||
|
||||
internal Exception? RecompressOnDiskIfNeeded()
|
||||
{
|
||||
byte[] original;
|
||||
try
|
||||
{
|
||||
original = File.Exists(Mfn) ? File.ReadAllBytes(Mfn) : Array.Empty<byte>();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return new IOException("failed to read original block from disk", ex);
|
||||
}
|
||||
|
||||
if (original.Length == 0)
|
||||
return null;
|
||||
|
||||
var checkErr = CheckAndLoadEncryption();
|
||||
if (checkErr != null)
|
||||
return checkErr;
|
||||
|
||||
if (Bek != null)
|
||||
Bek.XorKeyStream(original.AsSpan());
|
||||
|
||||
var desired = Cmp;
|
||||
var meta = new CompressionInfo();
|
||||
var consumed = meta.UnmarshalMetadata(original);
|
||||
if (consumed > 0 && meta.Type == desired)
|
||||
return null;
|
||||
|
||||
byte[] rewritten;
|
||||
if (desired == StoreCompression.NoCompression && consumed > 0)
|
||||
{
|
||||
if (meta.Type != StoreCompression.NoCompression)
|
||||
return new NotSupportedException("compressed block decompression is not available");
|
||||
|
||||
rewritten = original[consumed..];
|
||||
}
|
||||
else
|
||||
{
|
||||
// Compression transforms are introduced separately with StoreCompression helpers.
|
||||
rewritten = original;
|
||||
}
|
||||
|
||||
if (Bek != null)
|
||||
Bek.XorKeyStream(rewritten.AsSpan());
|
||||
|
||||
try
|
||||
{
|
||||
var tmp = $"{Mfn}.tmp";
|
||||
File.WriteAllBytes(tmp, rewritten);
|
||||
File.Move(tmp, Mfn, overwrite: true);
|
||||
return null;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return ex;
|
||||
}
|
||||
}
|
||||
|
||||
private Exception? FlushPendingToDiskLocked()
|
||||
{
|
||||
var (pending, pendingErr) = BytesPending();
|
||||
if (pendingErr != null || pending == null || pending.Length == 0)
|
||||
return pendingErr;
|
||||
|
||||
try
|
||||
{
|
||||
Mfd ??= new FileStream(Mfn, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
|
||||
Mfd.Seek(0, SeekOrigin.End);
|
||||
Mfd.Write(pending, 0, pending.Length);
|
||||
Mfd.Flush(SyncAlways);
|
||||
NeedSync = false;
|
||||
|
||||
if (CacheData != null)
|
||||
CacheData.Wp = CacheData.Buf.Length;
|
||||
|
||||
return null;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Werr = ex;
|
||||
return ex;
|
||||
}
|
||||
}
|
||||
|
||||
private void ResetCacheExpireTimerLocked(TimeSpan due)
|
||||
{
|
||||
if (Cexp <= TimeSpan.Zero)
|
||||
return;
|
||||
|
||||
if (due <= TimeSpan.Zero)
|
||||
due = TimeSpan.FromMilliseconds(1);
|
||||
|
||||
if (Ctmr == null)
|
||||
{
|
||||
Ctmr = new Timer(_ =>
|
||||
{
|
||||
Mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (!Closed)
|
||||
ExpireCacheLocked();
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mu.ExitWriteLock();
|
||||
}
|
||||
}, null, due, Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
else
|
||||
{
|
||||
_ = Ctmr.Change(due, Timeout.InfiniteTimeSpan);
|
||||
}
|
||||
}
|
||||
|
||||
private TimeSpan SinceLastActivity()
|
||||
{
|
||||
var now = JetStreamFileStore.TimestampNormalized(DateTime.UtcNow);
|
||||
var last = Math.Max(Math.Max(Lrts, Llts), Math.Max(Lwts, Lsts));
|
||||
var delta = now - last;
|
||||
return NanosecondsToTimeSpan(delta);
|
||||
}
|
||||
|
||||
private static long TimeSpanToNanoseconds(TimeSpan value)
|
||||
=> value <= TimeSpan.Zero ? 0 : checked(value.Ticks * 100L);
|
||||
|
||||
private static TimeSpan NanosecondsToTimeSpan(long value)
|
||||
{
|
||||
if (value <= 0)
|
||||
return TimeSpan.Zero;
|
||||
|
||||
return TimeSpan.FromTicks(Math.Max(1L, value / 100L));
|
||||
}
|
||||
|
||||
private static int CalculateRecordLength(string subject, byte[] mhdr, byte[] msg)
|
||||
=> (8 + 8 + 4 + 4 + 4) + Encoding.UTF8.GetByteCount(subject) + mhdr.Length + msg.Length + FileStoreDefaults.RecordHashSize;
|
||||
|
||||
private static byte[] BuildMessageRecord(string subject, byte[] mhdr, byte[] msg, ulong seq, long ts)
|
||||
{
|
||||
var subjectBytes = Encoding.UTF8.GetBytes(subject);
|
||||
var bodyLength = (8 + 8 + 4 + 4 + 4) + subjectBytes.Length + mhdr.Length + msg.Length;
|
||||
var record = new byte[bodyLength + FileStoreDefaults.RecordHashSize];
|
||||
var span = record.AsSpan();
|
||||
|
||||
var index = 0;
|
||||
BinaryPrimitives.WriteUInt64LittleEndian(span[index..], seq);
|
||||
index += 8;
|
||||
BinaryPrimitives.WriteInt64LittleEndian(span[index..], ts);
|
||||
index += 8;
|
||||
BinaryPrimitives.WriteInt32LittleEndian(span[index..], subjectBytes.Length);
|
||||
index += 4;
|
||||
BinaryPrimitives.WriteInt32LittleEndian(span[index..], mhdr.Length);
|
||||
index += 4;
|
||||
BinaryPrimitives.WriteInt32LittleEndian(span[index..], msg.Length);
|
||||
index += 4;
|
||||
|
||||
subjectBytes.CopyTo(span[index..]);
|
||||
index += subjectBytes.Length;
|
||||
mhdr.CopyTo(span[index..]);
|
||||
index += mhdr.Length;
|
||||
msg.CopyTo(span[index..]);
|
||||
index += msg.Length;
|
||||
|
||||
var checksum = SHA256.HashData(span[..index]);
|
||||
checksum.AsSpan(0, FileStoreDefaults.RecordHashSize).CopyTo(span[index..]);
|
||||
return record;
|
||||
}
|
||||
|
||||
private static byte[] AppendBuffer(byte[] source, byte[] append)
|
||||
{
|
||||
if (append.Length == 0)
|
||||
return source;
|
||||
|
||||
var merged = new byte[source.Length + append.Length];
|
||||
if (source.Length > 0)
|
||||
Buffer.BlockCopy(source, 0, merged, 0, source.Length);
|
||||
Buffer.BlockCopy(append, 0, merged, source.Length, append.Length);
|
||||
return merged;
|
||||
}
|
||||
|
||||
private static bool HasChecksum(byte[]? checksum)
|
||||
{
|
||||
if (checksum == null || checksum.Length != FileStoreDefaults.RecordHashSize)
|
||||
@@ -1212,11 +1703,14 @@ internal sealed class MessageBlock
|
||||
|
||||
internal void TryForceExpireCacheLocked()
|
||||
{
|
||||
if (CacheData?.Buf is { Length: > 0 } buf)
|
||||
JetStreamFileStore.RecycleMsgBlockBuf(buf);
|
||||
|
||||
CacheData = null;
|
||||
var llts = Llts;
|
||||
var lwts = Lwts;
|
||||
Llts = 0;
|
||||
Lwts = 0;
|
||||
ExpireCacheLocked();
|
||||
Fss = null;
|
||||
Llts = llts;
|
||||
Lwts = lwts;
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
|
||||
Reference in New Issue
Block a user