feat(batch12): complete group1 filestore recovery

This commit is contained in:
Joseph Doherty
2026-02-28 14:22:16 -05:00
parent f4dc4115e8
commit d55bb3ef19
2 changed files with 688 additions and 0 deletions

View File

@@ -18,6 +18,7 @@ using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
@@ -68,6 +69,8 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
// Per-subject index map
private SubjectTree<Psi>? _psim;
private HashWheel? _ttls;
private MsgScheduling? _scheduling;
// Total subject-list length (sum of subject-string lengths)
private int _tsl;
@@ -117,6 +120,12 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
private const int ConsumerHeaderLength = 2;
private const int MaxVarIntLength = 10;
private const long NanosecondsPerSecond = 1_000_000_000L;
private const byte FullStateMagic = 11;
private const byte FullStateMinVersion = 1;
private const byte FullStateVersion = 3;
private const int FullStateHeaderLength = 2;
private const int FullStateMinimumLength = 32;
private const int FullStateChecksumLength = 8;
static JetStreamFileStore()
{
@@ -1024,6 +1033,685 @@ public sealed class JetStreamFileStore : IStreamStore, IDisposable
return mb;
}
internal void Warn(string format, params object?[] args)
{
if (_fcfg.Server is not NatsServer server)
return;
server.Warnf("Filestore [{0}] " + format, BuildLogArgs(args));
}
internal void Debug(string format, params object?[] args)
{
if (_fcfg.Server is not NatsServer server)
return;
server.Debugf("Filestore [{0}] " + format, BuildLogArgs(args));
}
internal Exception? RecoverFullState()
{
_mu.EnterWriteLock();
try
{
RecoverPartialPurge();
var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile);
byte[] raw;
try
{
raw = File.ReadAllBytes(fn);
}
catch (FileNotFoundException ex)
{
return ex;
}
catch (DirectoryNotFoundException ex)
{
return ex;
}
catch (Exception ex)
{
Warn("Could not read stream state file: {0}", ex);
return ex;
}
if (raw.Length < FullStateMinimumLength)
{
TryDeleteFile(fn);
Warn("Stream state too short ({0} bytes)", raw.Length);
return new InvalidDataException("corrupt state");
}
if (!TryValidateFullStateChecksum(raw, out var buf))
{
TryDeleteFile(fn);
Warn("Stream state checksum did not match");
return new InvalidDataException("corrupt state");
}
if (_prf != null && _aek != null)
{
var ns = _aek.NonceSize;
if (buf.Length <= ns)
{
Warn("Stream state error reading encryption key: malformed ciphertext");
return new InvalidDataException("corrupt state");
}
try
{
buf = _aek.Open(buf.AsSpan(0, ns), buf.AsSpan(ns));
}
catch (Exception ex)
{
Warn("Stream state error reading encryption key: {0}", ex);
return ex;
}
}
if (buf.Length < FullStateHeaderLength)
{
TryDeleteFile(fn);
Warn("Stream state missing header");
return new InvalidDataException("corrupt state");
}
var version = buf[1];
if (buf[0] != FullStateMagic || version < FullStateMinVersion || version > FullStateVersion)
{
TryDeleteFile(fn);
Warn("Stream state magic and version mismatch");
return new InvalidDataException("corrupt state");
}
var bi = FullStateHeaderLength;
if (!TryReadUVarInt(buf, ref bi, out var msgs) ||
!TryReadUVarInt(buf, ref bi, out var bytes) ||
!TryReadUVarInt(buf, ref bi, out var firstSeq) ||
!TryReadVarInt(buf, ref bi, out var baseTime) ||
!TryReadUVarInt(buf, ref bi, out var lastSeq) ||
!TryReadVarInt(buf, ref bi, out var lastTs))
{
TryDeleteFile(fn);
Warn("Stream state could not decode stream summary");
return new InvalidDataException("corrupt state");
}
var recoveredState = new StreamState
{
Msgs = msgs,
Bytes = bytes,
FirstSeq = firstSeq,
LastSeq = lastSeq,
FirstTime = baseTime == 0 ? default : FromUnixNanosUtc(baseTime),
LastTime = lastTs == 0 ? default : FromUnixNanosUtc(lastTs),
};
_psim ??= new SubjectTree<Psi>();
_psim.Reset();
_tsl = 0;
if (!TryReadUVarInt(buf, ref bi, out var numSubjects))
return CorruptStateWithDelete(fn, "Stream state missing subject metadata");
for (var i = 0UL; i < numSubjects; i++)
{
if (!TryReadUVarInt(buf, ref bi, out var subjectLength))
return CorruptStateWithDelete(fn, "Stream state subject length decode failed");
if (subjectLength == 0)
continue;
if (bi < 0 || bi + (int)subjectLength > buf.Length)
return CorruptStateWithDelete(fn, $"Stream state bad subject len ({subjectLength})");
var subject = Encoding.Latin1.GetString(buf, bi, (int)subjectLength);
if (!SubscriptionIndex.IsValidLiteralSubject(subject))
return CorruptStateWithDelete(fn, "Stream state corrupt subject detected");
bi += (int)subjectLength;
if (!TryReadUVarInt(buf, ref bi, out var total) || !TryReadUVarInt(buf, ref bi, out var fblk))
return CorruptStateWithDelete(fn, "Stream state could not decode subject index");
ulong lblk = fblk;
if (total > 1)
{
if (!TryReadUVarInt(buf, ref bi, out lblk))
return CorruptStateWithDelete(fn, "Stream state could not decode subject last block");
}
_psim.Insert(Encoding.Latin1.GetBytes(subject), new Psi
{
Total = total,
Fblk = (uint)fblk,
Lblk = (uint)lblk,
});
_tsl += (int)subjectLength;
}
if (!TryReadUVarInt(buf, ref bi, out var numBlocks))
return CorruptStateWithDelete(fn, "Stream state could not decode block count");
var parsedBlocks = new List<MessageBlock>((int)numBlocks);
var parsedMap = new Dictionary<uint, MessageBlock>((int)numBlocks);
var mstate = new StreamState();
var lastBlockIndex = numBlocks > 0 ? numBlocks - 1 : 0;
for (var i = 0UL; i < numBlocks; i++)
{
if (!TryReadUVarInt(buf, ref bi, out var idx) ||
!TryReadUVarInt(buf, ref bi, out var nbytes) ||
!TryReadUVarInt(buf, ref bi, out var fseq) ||
!TryReadVarInt(buf, ref bi, out var fts) ||
!TryReadUVarInt(buf, ref bi, out var lseq) ||
!TryReadVarInt(buf, ref bi, out var lts) ||
!TryReadUVarInt(buf, ref bi, out var numDeleted))
{
return CorruptStateWithDelete(fn, "Stream state block decode failed");
}
ulong ttls = 0;
if (version >= 2)
{
if (!TryReadUVarInt(buf, ref bi, out ttls))
return CorruptStateWithDelete(fn, "Stream state TTL metadata decode failed");
}
ulong schedules = 0;
if (version >= 3)
{
if (!TryReadUVarInt(buf, ref bi, out schedules))
return CorruptStateWithDelete(fn, "Stream state schedule metadata decode failed");
}
var mb = InitMsgBlock((uint)idx);
mb.First = new MsgId { Seq = fseq, Ts = fts + baseTime };
mb.Last = new MsgId { Seq = lseq, Ts = lts + baseTime };
mb.Bytes = nbytes;
mb.Msgs = lseq >= fseq ? (lseq - fseq + 1) : 0;
mb.Ttls = ttls;
mb.Schedules = schedules;
mb.Closed = true;
if (numDeleted > 0)
{
try
{
var (dmap, consumed) = SequenceSet.Decode(buf.AsSpan(bi));
if (consumed <= 0)
return CorruptStateWithDelete(fn, "Stream state error decoding deleted map");
mb.Dmap = dmap;
mb.Msgs = mb.Msgs > numDeleted ? (mb.Msgs - numDeleted) : 0;
bi += consumed;
}
catch (Exception ex)
{
Warn("Stream state error decoding deleted map: {0}", ex);
return CorruptStateWithDelete(fn, "Stream state error decoding deleted map");
}
}
if (mb.Msgs > 0 || i == lastBlockIndex)
{
parsedBlocks.Add(mb);
parsedMap[mb.Index] = mb;
UpdateTrackingState(mstate, mb);
}
else
{
_dirty++;
}
}
if (!TryReadUVarInt(buf, ref bi, out var blkIndex))
return CorruptStateWithDelete(fn, "Stream state has no block index");
if (bi < 0 || bi + FullStateChecksumLength > buf.Length)
return CorruptStateWithDelete(fn, "Stream state has no checksum present");
var lchk = buf.AsSpan(bi, FullStateChecksumLength).ToArray();
_state = recoveredState;
_blks = parsedBlocks;
_bim = parsedMap;
_lmb = _blks.Count > 0 ? _blks[^1] : null;
if (_lmb == null || _lmb.Index != (uint)blkIndex)
return CorruptStateWithDelete(fn, "Stream state block does not exist or index mismatch");
if (!File.Exists(_lmb.Mfn))
{
Warn("Stream state detected prior state, could not locate msg block {0}", blkIndex);
return new InvalidOperationException("prior stream state detected");
}
if (!LastChecksumMatches(_lmb, lchk))
{
Warn("Stream state outdated, last block has additional entries, will rebuild");
return new InvalidOperationException("prior stream state detected");
}
var mdir = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir);
if (Directory.Exists(mdir))
{
foreach (var path in Directory.EnumerateFiles(mdir, "*" + FileStoreDefaults.BlkSuffix, SearchOption.TopDirectoryOnly))
{
if (!TryParseBlockIndex(Path.GetFileName(path), out var index))
continue;
if (index > blkIndex)
{
Warn("Stream state outdated, found extra blocks, will rebuild");
return new InvalidOperationException("prior stream state detected");
}
if (index <= uint.MaxValue && _bim.TryGetValue((uint)index, out var mb))
mb.Closed = false;
}
}
var rebuildRequired = false;
foreach (var mb in _blks)
{
if (!mb.Closed)
continue;
rebuildRequired = true;
Warn("Stream state detected prior state, could not locate msg block {0}", mb.Index);
}
if (rebuildRequired)
return new InvalidOperationException("prior stream state detected");
if (!TrackingStatesEqual(_state, mstate))
return CorruptStateWithDelete(fn, "Stream state encountered internal inconsistency on recover");
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
private object?[] BuildLogArgs(object?[] args)
{
if (args.Length == 0)
return [_cfg.Config.Name];
var formatted = new object?[args.Length + 1];
formatted[0] = _cfg.Config.Name;
Array.Copy(args, 0, formatted, 1, args.Length);
return formatted;
}
private Exception CorruptStateWithDelete(string fileName, string message)
{
TryDeleteFile(fileName);
Warn("{0}", message);
return new InvalidDataException("corrupt state");
}
private static void TryDeleteFile(string fileName)
{
try
{
if (File.Exists(fileName))
File.Delete(fileName);
}
catch
{
// Best effort to drop unusable snapshots.
}
}
private bool TryValidateFullStateChecksum(byte[] raw, out byte[] payload)
{
payload = Array.Empty<byte>();
if (raw.Length <= FullStateChecksumLength)
return false;
var contentLength = raw.Length - FullStateChecksumLength;
payload = raw[..contentLength];
var checksum = raw.AsSpan(contentLength, FullStateChecksumLength);
var key = SHA256.HashData(Encoding.UTF8.GetBytes(_cfg.Config.Name));
using var hmac = new HMACSHA256(key);
var digest = hmac.ComputeHash(payload);
return checksum.SequenceEqual(digest.AsSpan(0, FullStateChecksumLength));
}
private bool LastChecksumMatches(MessageBlock mb, byte[] expected)
{
try
{
using var fs = new FileStream(mb.Mfn, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
if (fs.Length < FileStoreDefaults.RecordHashSize)
return expected.AsSpan().SequenceEqual(new byte[FileStoreDefaults.RecordHashSize]);
var lchk = new byte[FileStoreDefaults.RecordHashSize];
fs.Seek(-FileStoreDefaults.RecordHashSize, SeekOrigin.End);
fs.ReadExactly(lchk, 0, lchk.Length);
return expected.AsSpan().SequenceEqual(lchk);
}
catch
{
return false;
}
}
private static bool TryParseBlockIndex(string fileName, out ulong index)
{
index = 0;
if (!fileName.EndsWith(FileStoreDefaults.BlkSuffix, StringComparison.Ordinal))
return false;
var stem = Path.GetFileNameWithoutExtension(fileName);
return ulong.TryParse(stem, out index);
}
private void RecoverPartialPurge()
{
var storeDir = _fcfg.StoreDir;
var msgDir = Path.Combine(storeDir, FileStoreDefaults.MsgDir);
var purgeDir = Path.Combine(storeDir, FileStoreDefaults.PurgeDir);
var newMsgDir = Path.Combine(storeDir, FileStoreDefaults.NewMsgDir);
if (!Directory.Exists(msgDir))
{
if (Directory.Exists(newMsgDir))
{
Directory.Move(newMsgDir, msgDir);
}
else if (Directory.Exists(purgeDir))
{
Directory.Move(purgeDir, msgDir);
}
return;
}
if (Directory.Exists(newMsgDir))
Directory.Delete(newMsgDir, recursive: true);
if (Directory.Exists(purgeDir))
Directory.Delete(purgeDir, recursive: true);
}
private void ResetAgeChk(long delta)
{
if (_ageChkRun)
return;
long next = long.MaxValue;
if (_ttls != null)
next = _ttls.GetNextExpiration(next);
if (_cfg.Config.MaxAge <= TimeSpan.Zero && next == long.MaxValue)
{
CancelAgeChk();
return;
}
var fireIn = _cfg.Config.MaxAge;
if (delta == 0 && _state.Msgs > 0)
{
var until = TimeSpan.FromSeconds(2);
if (fireIn == TimeSpan.Zero || until < fireIn)
fireIn = until;
}
if (next < long.MaxValue)
{
var nextTicks = DateTime.UnixEpoch.Ticks + next / 100L;
var nextUtc = new DateTime(Math.Max(nextTicks, DateTime.UnixEpoch.Ticks), DateTimeKind.Utc);
var until = nextUtc - DateTime.UtcNow;
if (fireIn == TimeSpan.Zero || until < fireIn)
fireIn = until;
}
if (delta > 0)
{
var deltaDur = TimeSpan.FromTicks(delta / 100L);
if (fireIn == TimeSpan.Zero || deltaDur < fireIn)
fireIn = deltaDur;
}
if (fireIn < TimeSpan.FromMilliseconds(250))
fireIn = TimeSpan.FromMilliseconds(250);
var expires = DateTime.UtcNow.Ticks + fireIn.Ticks;
if (_ageChkTime > 0 && expires > _ageChkTime)
return;
_ageChkTime = expires;
if (_ageChk != null)
_ageChk.Change(fireIn, Timeout.InfiniteTimeSpan);
else
_ageChk = new Timer(_ => { }, null, fireIn, Timeout.InfiniteTimeSpan);
}
private void CancelAgeChk()
{
_ageChk?.Dispose();
_ageChk = null;
_ageChkTime = 0;
}
private void RunMsgScheduling()
{
_mu.EnterWriteLock();
try
{
_scheduling?.ResetTimer();
}
finally
{
_mu.ExitWriteLock();
}
}
internal Exception? RecoverTTLState()
{
_mu.EnterWriteLock();
try
{
var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.TtlStreamStateFile);
byte[]? buf = null;
try
{
if (File.Exists(fn))
buf = File.ReadAllBytes(fn);
}
catch (Exception ex)
{
return ex;
}
_ttls = HashWheel.NewHashWheel();
ulong ttlSeq = 0;
if (buf != null)
{
try
{
ttlSeq = _ttls.Decode(buf);
}
catch (Exception ex)
{
Warn("Error decoding TTL state: {0}", ex);
TryDeleteFile(fn);
}
}
if (ttlSeq < _state.FirstSeq)
ttlSeq = _state.FirstSeq;
try
{
if (_state.Msgs > 0 && ttlSeq <= _state.LastSeq)
{
Warn("TTL state is outdated; attempting to recover using linear scan (seq {0} to {1})", ttlSeq, _state.LastSeq);
foreach (var mb in _blks)
{
mb.Mu.EnterReadLock();
try
{
if (mb.Ttls == 0 || mb.Last.Seq < ttlSeq)
continue;
var start = Math.Max(ttlSeq, mb.First.Seq);
var end = mb.Last.Seq;
for (var seq = start; seq <= end; seq++)
{
StoreMsg? sm = null;
try
{
sm = LoadMsg(seq, null);
}
catch (Exception ex)
{
Warn("Error loading msg seq {0} for recovering TTL: {1}", seq, ex);
}
if (sm?.Hdr is not { Length: > 0 })
{
if (seq == ulong.MaxValue)
break;
continue;
}
var (ttl, _) = JetStreamHeaderHelpers.GetMessageTtl(sm.Hdr);
if (ttl > 0)
{
var expires = sm.Ts + (ttl * NanosecondsPerSecond);
_ttls.Add(seq, expires);
}
if (seq == ulong.MaxValue)
break;
}
}
finally
{
mb.Mu.ExitReadLock();
}
}
}
}
finally
{
ResetAgeChk(0);
}
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal Exception? RecoverMsgSchedulingState()
{
_mu.EnterWriteLock();
try
{
var fn = Path.Combine(_fcfg.StoreDir, FileStoreDefaults.MsgDir, FileStoreDefaults.MsgSchedulingStreamStateFile);
byte[]? buf = null;
try
{
if (File.Exists(fn))
buf = File.ReadAllBytes(fn);
}
catch (Exception ex)
{
return ex;
}
_scheduling = new MsgScheduling(RunMsgScheduling);
ulong schedSeq = 0;
if (buf != null)
{
var (decodedSeq, err) = _scheduling.Decode(buf);
schedSeq = decodedSeq;
if (err != null)
{
Warn("Error decoding message scheduling state: {0}", err);
TryDeleteFile(fn);
schedSeq = 0;
}
}
if (schedSeq < _state.FirstSeq)
schedSeq = _state.FirstSeq;
try
{
if (_state.Msgs > 0 && schedSeq <= _state.LastSeq)
{
Warn("Message scheduling state is outdated; attempting to recover using linear scan (seq {0} to {1})", schedSeq, _state.LastSeq);
foreach (var mb in _blks)
{
mb.Mu.EnterReadLock();
try
{
if (mb.Schedules == 0 || mb.Last.Seq < schedSeq)
continue;
var start = Math.Max(schedSeq, mb.First.Seq);
var end = mb.Last.Seq;
for (var seq = start; seq <= end; seq++)
{
StoreMsg? sm = null;
try
{
sm = LoadMsg(seq, null);
}
catch (Exception ex)
{
Warn("Error loading msg seq {0} for recovering message schedules: {1}", seq, ex);
}
if (sm?.Hdr is not { Length: > 0 })
{
if (seq == ulong.MaxValue)
break;
continue;
}
var (schedule, ok) = JetStreamHeaderHelpers.NextMessageSchedule(sm.Hdr, sm.Ts);
if (ok && schedule != default)
_scheduling.Init(seq, sm.Subject, schedule.Ticks * 100L);
if (seq == ulong.MaxValue)
break;
}
}
finally
{
mb.Mu.ExitReadLock();
}
}
}
}
finally
{
_scheduling.ResetTimer();
}
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
private static void EnsureStoreDirectoryWritable(string storeDir)
{
if (string.IsNullOrWhiteSpace(storeDir))

Binary file not shown.