feat(storage): add tombstone tracking and purge operations (Go parity)

Implement PurgeEx, Compact, Truncate, FilteredState, SubjectsState,
SubjectsTotals, State, FastState, GetSeqFromTime on FileStore. Add
MsgBlock.IsDeleted, DeletedSequences, EnumerateNonDeleted. Includes
wildcard subject support via SubjectMatch for all filtered operations.
This commit is contained in:
Joseph Doherty
2026-02-24 13:42:17 -05:00
parent 2816e8f048
commit b0b64292b3
3 changed files with 794 additions and 0 deletions

View File

@@ -227,6 +227,318 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
RewriteBlocks();
}
// -------------------------------------------------------------------------
// Go-parity sync interface implementations
// Reference: golang/nats-server/server/filestore.go
// -------------------------------------------------------------------------
/// <summary>
/// Drops every message held in the in-memory index, resets the last-sequence
/// pointer to zero, and then calls <c>DisposeAllBlocks</c> and <c>CleanBlockFiles</c>.
/// Reference: golang/nats-server/server/filestore.go — purge / purgeMsgs.
/// </summary>
/// <returns>The number of messages that were held before the purge.</returns>
public ulong Purge()
{
    // Capture the count first; it is gone once the index is cleared.
    var purged = (ulong)_messages.Count;

    _last = 0;
    _messages.Clear();

    DisposeAllBlocks();
    CleanBlockFiles();

    return purged;
}
/// <summary>
/// Purges messages whose subject matches <paramref name="subject"/>, restricted to
/// sequences at or below <paramref name="seq"/>, while retaining the newest
/// <paramref name="keep"/> of the matching messages.
/// A null or empty subject matches every message. Only when the subject is empty
/// AND both <paramref name="seq"/> and <paramref name="keep"/> are zero does this
/// delegate to <see cref="Purge"/>; otherwise the sequence bound and keep count
/// are honoured so a caller asking to keep messages never loses them.
/// Reference: golang/nats-server/server/filestore.go — PurgeEx.
/// </summary>
/// <param name="subject">Literal subject or wildcard filter; null/empty matches all subjects.</param>
/// <param name="seq">Inclusive upper bound on the sequences considered; 0 means unbounded.</param>
/// <param name="keep">Number of newest matching messages to retain; 0 retains none.</param>
/// <returns>The number of messages removed.</returns>
public ulong PurgeEx(string subject, ulong seq, ulong keep)
{
    var matchAll = string.IsNullOrEmpty(subject);

    // Only a completely unconstrained request may take the wipe-everything path.
    // Previously any empty subject did, silently ignoring seq and keep.
    if (matchAll && seq == 0 && keep == 0)
        return Purge();

    // Matching messages at or below the bound, oldest first.
    var candidates = _messages.Values
        .Where(m => matchAll || SubjectMatchesFilter(m.Subject, subject))
        .Where(m => seq == 0 || m.Sequence <= seq)
        .OrderBy(m => m.Sequence)
        .ToList();

    // Covers both "nothing matched" and "keep retains everything that matched".
    if ((ulong)candidates.Count <= keep)
        return 0;

    // keep < candidates.Count <= int.MaxValue here, so the narrowing cast is safe.
    var removeCount = candidates.Count - (int)keep;

    // candidates is a materialized snapshot, so mutating _messages while walking it is safe.
    foreach (var msg in candidates.Take(removeCount))
    {
        _messages.Remove(msg.Sequence);
        DeleteInBlock(msg.Sequence);
    }

    // Re-point _last if the message it referred to was purged.
    if (_messages.Count == 0)
        _last = 0;
    else if (!_messages.ContainsKey(_last))
        _last = _messages.Keys.Max();

    return (ulong)removeCount;
}
/// <summary>
/// Deletes every message whose sequence is strictly below <paramref name="seq"/>.
/// Reference: golang/nats-server/server/filestore.go — Compact.
/// </summary>
/// <param name="seq">Exclusive upper bound; messages at or above it are kept.</param>
/// <returns>The number of messages removed.</returns>
public ulong Compact(ulong seq)
{
    // No unsigned sequence is below zero, so there is nothing to do.
    if (seq == 0)
        return 0;

    // Snapshot the keys first: the dictionary is mutated inside the loop.
    var removed = 0UL;
    foreach (var key in _messages.Keys.Where(k => k < seq).ToArray())
    {
        _messages.Remove(key);
        DeleteInBlock(key);
        removed++;
    }

    if (removed == 0)
        return 0;

    // Re-point _last if the message it referred to was removed.
    if (_messages.Count == 0)
        _last = 0;
    else if (!_messages.ContainsKey(_last))
        _last = _messages.Keys.Max();

    return removed;
}
/// <summary>
/// Discards every message whose sequence is strictly above <paramref name="seq"/>
/// and re-points the last-sequence pointer at the highest surviving sequence.
/// A <paramref name="seq"/> of zero empties the store entirely.
/// Reference: golang/nats-server/server/filestore.go — Truncate.
/// </summary>
/// <param name="seq">Highest sequence allowed to remain; 0 removes everything.</param>
public void Truncate(ulong seq)
{
    if (seq != 0)
    {
        // Snapshot the keys first: the dictionary is mutated inside the loop.
        foreach (var key in _messages.Keys.Where(k => k > seq).ToArray())
        {
            _messages.Remove(key);
            DeleteInBlock(key);
        }

        // The new tail is whatever the highest remaining sequence is
        // (seq itself when present, otherwise the nearest one below it).
        _last = _messages.Count == 0 ? 0 : _messages.Keys.Max();
        return;
    }

    // Truncating to zero: drop the index and all block state.
    _messages.Clear();
    _last = 0;
    DisposeAllBlocks();
    CleanBlockFiles();
}
/// <summary>
/// Finds the lowest sequence among messages whose timestamp is at or after
/// <paramref name="t"/>. Non-UTC inputs are converted with
/// <see cref="DateTime.ToUniversalTime"/> before comparing.
/// Reference: golang/nats-server/server/filestore.go — GetSeqFromTime.
/// </summary>
/// <param name="t">The point in time to search from.</param>
/// <returns>
/// The lowest qualifying sequence, or <c>_last + 1</c> when no message qualifies.
/// </returns>
public ulong GetSeqFromTime(DateTime t)
{
    var cutoff = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime();

    // Track the smallest qualifying sequence in a single pass.
    ulong? earliest = null;
    foreach (var msg in _messages.Values)
    {
        if (msg.TimestampUtc >= cutoff && (earliest is null || msg.Sequence < earliest.Value))
            earliest = msg.Sequence;
    }

    return earliest ?? _last + 1;
}
/// <summary>
/// Summarises the stored messages at or after sequence <paramref name="seq"/>
/// whose subject matches <paramref name="subject"/>: how many there are and the
/// lowest and highest matching sequence.
/// Reference: golang/nats-server/server/filestore.go — FilteredState.
/// </summary>
/// <param name="seq">Inclusive lower bound on the sequences considered.</param>
/// <param name="subject">Subject filter; null or empty matches every subject.</param>
/// <returns>
/// A <see cref="SimpleState"/> with count, first and last set, or a
/// default-constructed one when nothing matches.
/// </returns>
public SimpleState FilteredState(ulong seq, string subject)
{
    var matchAll = string.IsNullOrEmpty(subject);

    ulong count = 0;
    ulong lowest = 0;
    ulong highest = 0;

    foreach (var msg in _messages.Values)
    {
        if (msg.Sequence < seq)
            continue;
        if (!matchAll && !SubjectMatchesFilter(msg.Subject, subject))
            continue;

        if (count == 0)
        {
            // First hit seeds both ends of the range.
            lowest = msg.Sequence;
            highest = msg.Sequence;
        }
        else
        {
            if (msg.Sequence < lowest)
                lowest = msg.Sequence;
            if (msg.Sequence > highest)
                highest = msg.Sequence;
        }

        count++;
    }

    if (count == 0)
        return new SimpleState();

    return new SimpleState
    {
        Msgs = count,
        First = lowest,
        Last = highest,
    };
}
/// <summary>
/// Builds a map from subject to <see cref="SimpleState"/> (message count, first
/// and last sequence) covering every stored message whose subject matches
/// <paramref name="filterSubject"/>.
/// Reference: golang/nats-server/server/filestore.go — SubjectsState.
/// </summary>
/// <param name="filterSubject">Subject filter; null or empty matches every subject.</param>
/// <returns>A dictionary keyed by subject using ordinal comparison; empty when nothing matches.</returns>
public Dictionary<string, SimpleState> SubjectsState(string filterSubject)
{
    var states = new Dictionary<string, SimpleState>(StringComparer.Ordinal);
    var unfiltered = string.IsNullOrEmpty(filterSubject);

    foreach (var entry in _messages.Values)
    {
        if (!(unfiltered || SubjectMatchesFilter(entry.Subject, filterSubject)))
            continue;

        var sequence = entry.Sequence;

        // First sighting of this subject: the message is both first and last.
        if (!states.TryGetValue(entry.Subject, out var seen))
        {
            states[entry.Subject] = new SimpleState
            {
                Msgs = 1,
                First = sequence,
                Last = sequence,
            };
            continue;
        }

        // A recorded First of 0 is treated as "not set yet" rather than a real sequence.
        var knownFirst = seen.First == 0 ? sequence : seen.First;
        states[entry.Subject] = new SimpleState
        {
            Msgs = seen.Msgs + 1,
            First = Math.Min(knownFirst, sequence),
            Last = Math.Max(seen.Last, sequence),
        };
    }

    return states;
}
/// <summary>
/// Counts stored messages per subject, considering only subjects that match
/// <paramref name="filterSubject"/>.
/// Reference: golang/nats-server/server/filestore.go — SubjectsTotals.
/// </summary>
/// <param name="filterSubject">Subject filter; null or empty matches every subject.</param>
/// <returns>A dictionary from subject to message count using ordinal comparison.</returns>
public Dictionary<string, ulong> SubjectsTotals(string filterSubject)
{
    var totals = new Dictionary<string, ulong>(StringComparer.Ordinal);
    var unfiltered = string.IsNullOrEmpty(filterSubject);

    var subjects = _messages.Values
        .Where(m => unfiltered || SubjectMatchesFilter(m.Subject, filterSubject))
        .Select(m => m.Subject);

    foreach (var name in subjects)
    {
        // Missing entries start from zero, so the first hit records 1.
        totals[name] = totals.TryGetValue(name, out var soFar) ? soFar + 1 : 1;
    }

    return totals;
}
/// <summary>
/// Produces the complete stream state: the fields filled in by
/// <see cref="FastState"/>, plus the interior sequences missing from the index
/// and a per-subject message count.
/// Reference: golang/nats-server/server/filestore.go — State.
/// </summary>
/// <returns>
/// A <see cref="StreamState"/> whose <c>Deleted</c>/<c>NumDeleted</c> are only set
/// when gaps exist, and whose <c>Subjects</c> is null when there are no messages.
/// </returns>
public StreamState State()
{
    var state = new StreamState();
    FastState(ref state);

    // Gaps: every sequence in [FirstSeq, LastSeq] that has no entry in the index.
    var hasRange = state.FirstSeq > 0 && state.LastSeq >= state.FirstSeq;
    if (hasRange)
    {
        var gaps = new List<ulong>();
        for (var s = state.FirstSeq; s <= state.LastSeq; s++)
        {
            if (_messages.ContainsKey(s))
                continue;
            gaps.Add(s);
        }

        if (gaps.Count > 0)
        {
            state.Deleted = [.. gaps];
            state.NumDeleted = gaps.Count;
        }
    }

    // Per-subject message totals.
    var perSubject = _messages.Values
        .GroupBy(m => m.Subject, StringComparer.Ordinal)
        .ToDictionary(g => g.Key, g => (ulong)g.LongCount(), StringComparer.Ordinal);

    state.NumSubjects = perSubject.Count;
    if (perSubject.Count == 0)
        state.Subjects = null;
    else
        state.Subjects = perSubject;

    return state;
}
/// <summary>
/// Fills the caller-supplied <see cref="StreamState"/> with message count, total
/// payload bytes, and the first/last sequence and timestamp.
/// Leaves <see cref="StreamState.Deleted"/> and <see cref="StreamState.Subjects"/>
/// untouched.
/// Reference: golang/nats-server/server/filestore.go — FastState.
/// </summary>
/// <param name="state">The state instance to populate in place.</param>
public void FastState(ref StreamState state)
{
    var hasMessages = _messages.Count != 0;

    state.Msgs = (ulong)_messages.Count;
    state.Bytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
    state.LastSeq = _last;
    state.LastTime = default;

    if (!hasMessages)
    {
        state.FirstSeq = 0;
        state.FirstTime = default;
        return;
    }

    // The lowest key is the first sequence; _last is looked up for the tail timestamp.
    var lowest = _messages.Keys.Min();
    state.FirstSeq = lowest;
    state.FirstTime = _messages[lowest].TimestampUtc;
    state.LastTime = _messages[_last].TimestampUtc;
}
// -------------------------------------------------------------------------
// Subject matching helper
// -------------------------------------------------------------------------
/// <summary>
/// Decides whether <paramref name="subject"/> is selected by <paramref name="filter"/>.
/// A null or empty filter selects everything. A filter that
/// <c>SubjectMatch.IsLiteral</c> reports as literal is compared with an ordinal
/// string equality check; any other filter is handed to <c>SubjectMatch.MatchLiteral</c>.
/// Reference: golang/nats-server/server/filestore.go — subjectMatch helper.
/// </summary>
/// <param name="subject">The message subject being tested.</param>
/// <param name="filter">The literal or wildcard filter to test against.</param>
/// <returns><c>true</c> when the subject is selected by the filter.</returns>
private static bool SubjectMatchesFilter(string subject, string filter)
{
    if (string.IsNullOrEmpty(filter))
        return true;

    return NATS.Server.Subscriptions.SubjectMatch.IsLiteral(filter)
        ? string.Equals(subject, filter, StringComparison.Ordinal)
        : NATS.Server.Subscriptions.SubjectMatch.MatchLiteral(subject, filter);
}
public ValueTask DisposeAsync()
{
DisposeAllBlocks();