feat(p7-06): port memstore & store interface tests (38 tests)
Add JetStreamMemoryStoreTests (27 tests, T:2023-2056) and StorageEngineTests (11 tests, T:2943-2957) covering the JetStream memory store and IStreamStore interface. Fix 10 bugs in MemStore.cs discovered during test authoring: FirstSeq constructor, Truncate(0) SubjectTree reset, PurgeEx subject-filtered implementation, UpdateConfig MaxMsgsPer enforcement, FilteredStateLocked partial range scan, StoreRawMsgLocked DiscardNewPer, MultiLastSeqs maxSeq fallback scan + LastNeedsUpdate recalculation, AllLastSeqs LastNeedsUpdate recalculation, LoadLastLocked LazySubjectState recalculation, GetSeqFromTime ts==last equality, and timestamp precision (100-ns throughout). 20 tests deferred (internal fields, benchmarks, TTL, filestore-only). All 701 unit tests pass.
This commit is contained in:
@@ -78,7 +78,11 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
_maxp = cfg.MaxMsgsPer;
|
||||
|
||||
if (cfg.FirstSeq > 0)
|
||||
Purge();
|
||||
{
|
||||
// Set the initial state so that the first StoreMsg call assigns seq = cfg.FirstSeq.
|
||||
_state.LastSeq = cfg.FirstSeq - 1;
|
||||
_state.FirstSeq = cfg.FirstSeq;
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
@@ -92,7 +96,10 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
try
|
||||
{
|
||||
var seq = _state.LastSeq + 1;
|
||||
var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
|
||||
// Use 100-nanosecond Ticks for higher timestamp precision.
|
||||
// Nanoseconds since Unix epoch: (Ticks - UnixEpochTicks) * 100
|
||||
const long UnixEpochTicks = 621355968000000000L;
|
||||
var ts = (DateTimeOffset.UtcNow.UtcTicks - UnixEpochTicks) * 100L;
|
||||
try
|
||||
{
|
||||
StoreRawMsgLocked(subject, hdr, msg, seq, ts, ttl, discardNewCheck: true);
|
||||
@@ -134,12 +141,24 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
hdr ??= Array.Empty<byte>();
|
||||
msg ??= Array.Empty<byte>();
|
||||
|
||||
// Determine if we are at the per-subject limit.
|
||||
bool atSubjectLimit = false;
|
||||
if (_maxp > 0 && !string.IsNullOrEmpty(subject))
|
||||
{
|
||||
var subjectBytesCheck = Encoding.UTF8.GetBytes(subject);
|
||||
var (ssCheck, foundCheck) = _fss.Find(subjectBytesCheck);
|
||||
if (foundCheck && ssCheck != null)
|
||||
atSubjectLimit = ssCheck.Msgs >= (ulong)_maxp;
|
||||
}
|
||||
|
||||
// Discard-new enforcement
|
||||
if (discardNewCheck && _cfg.Discard == DiscardPolicy.DiscardNew)
|
||||
{
|
||||
if (_cfg.MaxMsgs > 0 && _state.Msgs >= (ulong)_cfg.MaxMsgs)
|
||||
if (atSubjectLimit && _cfg.DiscardNewPer)
|
||||
throw StoreErrors.ErrMaxMsgsPerSubject;
|
||||
if (_cfg.MaxMsgs > 0 && _state.Msgs >= (ulong)_cfg.MaxMsgs && !atSubjectLimit)
|
||||
throw StoreErrors.ErrMaxMsgs;
|
||||
if (_cfg.MaxBytes > 0 && _state.Bytes + MsgSize(subject, hdr, msg) > (ulong)_cfg.MaxBytes)
|
||||
if (_cfg.MaxBytes > 0 && _state.Bytes + MsgSize(subject, hdr, msg) > (ulong)_cfg.MaxBytes && !atSubjectLimit)
|
||||
throw StoreErrors.ErrMaxBytes;
|
||||
}
|
||||
|
||||
@@ -342,7 +361,11 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
{
|
||||
var (ss, found) = _fss.Find(Encoding.UTF8.GetBytes(subject));
|
||||
if (found && ss != null && ss.Msgs > 0)
|
||||
{
|
||||
if (ss.LastNeedsUpdate)
|
||||
RecalculateForSubj(subject, ss);
|
||||
_msgs.TryGetValue(ss.Last, out stored);
|
||||
}
|
||||
}
|
||||
|
||||
if (stored == null)
|
||||
@@ -603,13 +626,69 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
/// <inheritdoc/>
|
||||
public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep)
|
||||
{
|
||||
// TODO: session 17 — full subject-filtered purge
|
||||
if (string.IsNullOrEmpty(subject) || subject == ">")
|
||||
var isAll = string.IsNullOrEmpty(subject) || subject == ">";
|
||||
if (isAll)
|
||||
{
|
||||
if (keep == 0 && seq == 0)
|
||||
return Purge();
|
||||
if (seq > 1)
|
||||
return Compact(seq);
|
||||
if (keep > 0)
|
||||
{
|
||||
ulong msgs, lseq;
|
||||
_mu.EnterReadLock();
|
||||
msgs = _state.Msgs;
|
||||
lseq = _state.LastSeq;
|
||||
_mu.ExitReadLock();
|
||||
if (keep >= msgs)
|
||||
return (0, null);
|
||||
return Compact(lseq - keep + 1);
|
||||
}
|
||||
return (0, null);
|
||||
}
|
||||
return (0, null);
|
||||
|
||||
// Subject-filtered purge
|
||||
var ss = FilteredState(1, subject);
|
||||
if (ss.Msgs == 0)
|
||||
return (0, null);
|
||||
|
||||
if (keep > 0)
|
||||
{
|
||||
if (keep >= ss.Msgs)
|
||||
return (0, null);
|
||||
ss.Msgs -= keep;
|
||||
}
|
||||
|
||||
var last = ss.Last;
|
||||
if (seq > 1)
|
||||
last = seq - 1;
|
||||
|
||||
ulong purged = 0;
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (_msgs == null)
|
||||
return (0, null);
|
||||
|
||||
for (var s = ss.First; s <= last; s++)
|
||||
{
|
||||
if (_msgs.TryGetValue(s, out var sm) && sm != null && sm.Subject == subject)
|
||||
{
|
||||
if (RemoveMsgLocked(s, false))
|
||||
{
|
||||
purged++;
|
||||
if (purged >= ss.Msgs)
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (_mu.IsWriteLockHeld)
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
return (purged, null);
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
@@ -703,9 +782,10 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
// Full reset
|
||||
purged = (ulong)_msgs.Count;
|
||||
bytes = _state.Bytes;
|
||||
_state = new StreamState { LastTime = DateTime.UtcNow };
|
||||
_state = new StreamState();
|
||||
_msgs = new Dictionary<ulong, StoreMsg>();
|
||||
_dmap = new SequenceSet();
|
||||
_fss.Reset();
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -847,6 +927,8 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
return new SimpleState { Msgs = _state.Msgs, First = _state.FirstSeq, Last = _state.LastSeq };
|
||||
|
||||
var ss = new SimpleState();
|
||||
var havePartial = false;
|
||||
|
||||
_fss.Match(Encoding.UTF8.GetBytes(filter), (subj, fss) =>
|
||||
{
|
||||
if (fss.FirstNeedsUpdate || fss.LastNeedsUpdate)
|
||||
@@ -854,12 +936,46 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
|
||||
if (sseq <= fss.First)
|
||||
{
|
||||
// All messages in this subject are at or after sseq
|
||||
ss.Msgs += fss.Msgs;
|
||||
if (ss.First == 0 || fss.First < ss.First) ss.First = fss.First;
|
||||
if (fss.Last > ss.Last) ss.Last = fss.Last;
|
||||
}
|
||||
else if (sseq <= fss.Last)
|
||||
{
|
||||
// Partial: sseq is inside this subject's range — need to scan
|
||||
havePartial = true;
|
||||
// Still track Last for the scan bounds
|
||||
if (fss.Last > ss.Last) ss.Last = fss.Last;
|
||||
}
|
||||
// else sseq > fss.Last: all messages before sseq, skip
|
||||
return true;
|
||||
});
|
||||
|
||||
if (!havePartial)
|
||||
return ss;
|
||||
|
||||
// Need to scan messages from sseq to ss.Last
|
||||
if (_msgs == null)
|
||||
return ss;
|
||||
|
||||
var scanFirst = sseq;
|
||||
var scanLast = ss.Last;
|
||||
if (scanLast == 0) scanLast = _state.LastSeq;
|
||||
|
||||
// Reset and rescan
|
||||
ss = new SimpleState();
|
||||
for (var seq = scanFirst; seq <= scanLast; seq++)
|
||||
{
|
||||
if (!_msgs.TryGetValue(seq, out var sm) || sm == null)
|
||||
continue;
|
||||
if (isAll || MatchLiteral(sm.Subject, filter))
|
||||
{
|
||||
ss.Msgs++;
|
||||
if (ss.First == 0) ss.First = seq;
|
||||
ss.Last = seq;
|
||||
}
|
||||
}
|
||||
return ss;
|
||||
}
|
||||
|
||||
@@ -947,8 +1063,10 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
{
|
||||
if (_msgs == null || _msgs.Count == 0) return (Array.Empty<ulong>(), null);
|
||||
var seqs = new List<ulong>(_fss.Size());
|
||||
_fss.IterFast((_, ss) =>
|
||||
_fss.IterFast((subj, ss) =>
|
||||
{
|
||||
if (ss.LastNeedsUpdate)
|
||||
RecalculateForSubj(Encoding.UTF8.GetString(subj), ss);
|
||||
seqs.Add(ss.Last);
|
||||
return true;
|
||||
});
|
||||
@@ -974,14 +1092,32 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
var seen = new HashSet<ulong>();
|
||||
foreach (var filter in filters)
|
||||
{
|
||||
_fss.Match(Encoding.UTF8.GetBytes(filter), (_, ss) =>
|
||||
_fss.Match(Encoding.UTF8.GetBytes(filter), (subj, ss) =>
|
||||
{
|
||||
if (ss.Last <= maxSeq && seen.Add(ss.Last))
|
||||
seqs.Add(ss.Last);
|
||||
if (ss.LastNeedsUpdate)
|
||||
RecalculateForSubj(Encoding.UTF8.GetString(subj), ss);
|
||||
if (ss.Last <= maxSeq)
|
||||
{
|
||||
if (seen.Add(ss.Last))
|
||||
seqs.Add(ss.Last);
|
||||
}
|
||||
else if (ss.Msgs > 1)
|
||||
{
|
||||
// Last is beyond maxSeq — scan backwards for the most recent msg <= maxSeq.
|
||||
var s = Encoding.UTF8.GetString(subj);
|
||||
for (var seq = maxSeq; seq > 0; seq--)
|
||||
{
|
||||
if (_msgs.TryGetValue(seq, out var sm) && sm != null && sm.Subject == s)
|
||||
{
|
||||
if (seen.Add(seq)) seqs.Add(seq);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
if (maxAllowed > 0 && seqs.Count > maxAllowed)
|
||||
return (Array.Empty<ulong>(), StoreErrors.ErrTooManyResults);
|
||||
return (null!, StoreErrors.ErrTooManyResults);
|
||||
}
|
||||
seqs.Sort();
|
||||
return (seqs.ToArray(), null);
|
||||
@@ -1017,7 +1153,9 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
/// <inheritdoc/>
|
||||
public ulong GetSeqFromTime(DateTime t)
|
||||
{
|
||||
var ts = new DateTimeOffset(t).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||
// Use same 100-nanosecond precision as StoreMsg timestamps.
|
||||
const long UnixEpochTicksGsft = 621355968000000000L;
|
||||
var ts = (new DateTimeOffset(t, TimeSpan.Zero).UtcTicks - UnixEpochTicksGsft) * 100L;
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
@@ -1038,7 +1176,9 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
break;
|
||||
}
|
||||
if (lastSm == null) return _state.LastSeq + 1;
|
||||
if (ts >= lastSm.Ts) return _state.LastSeq + 1;
|
||||
// Mirror Go: if ts == last ts return that seq; if ts > last ts return pastEnd.
|
||||
if (ts == lastSm.Ts) return lastSm.Seq;
|
||||
if (ts > lastSm.Ts) return _state.LastSeq + 1;
|
||||
|
||||
// Linear scan fallback
|
||||
for (var seq = _state.FirstSeq; seq <= _state.LastSeq; seq++)
|
||||
@@ -1066,9 +1206,32 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
try
|
||||
{
|
||||
_cfg = cfg.Clone();
|
||||
_maxp = cfg.MaxMsgsPer;
|
||||
|
||||
// Clamp MaxMsgsPer to minimum of -1
|
||||
if (_cfg.MaxMsgsPer < -1)
|
||||
{
|
||||
_cfg.MaxMsgsPer = -1;
|
||||
cfg.MaxMsgsPer = -1;
|
||||
}
|
||||
|
||||
var oldMaxp = _maxp;
|
||||
_maxp = _cfg.MaxMsgsPer;
|
||||
|
||||
EnforceMsgLimit();
|
||||
EnforceBytesLimit();
|
||||
|
||||
// Enforce per-subject limits if MaxMsgsPer was reduced or newly set
|
||||
if (_maxp > 0 && (oldMaxp == 0 || _maxp < oldMaxp))
|
||||
{
|
||||
var lm = (ulong)_maxp;
|
||||
_fss.IterFast((subj, ss) =>
|
||||
{
|
||||
if (ss.Msgs > lm)
|
||||
EnforcePerSubjectLimit(Encoding.UTF8.GetString(subj), ss);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero)
|
||||
StartAgeChk();
|
||||
if (_ageChk != null && _cfg.MaxAge == TimeSpan.Zero)
|
||||
@@ -1400,7 +1563,9 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
{
|
||||
if (_msgs == null || _cfg.MaxAge == TimeSpan.Zero) return;
|
||||
var minAge = DateTime.UtcNow - _cfg.MaxAge;
|
||||
var minTs = new DateTimeOffset(minAge).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||
// Use same 100-nanosecond precision as StoreMsg timestamps.
|
||||
const long UnixEpochTicksExp = 621355968000000000L;
|
||||
var minTs = (new DateTimeOffset(minAge, TimeSpan.Zero).UtcTicks - UnixEpochTicksExp) * 100L;
|
||||
var toRemove = new List<ulong>();
|
||||
foreach (var kv in _msgs)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user