fix: resolve MemStore timestamp overflow causing immediate message expiry

DateTime.UtcNow.Ticks * 100L overflows long (Ticks ~6.38e17, x100 =
6.38e19 > long.MaxValue 9.22e18), producing corrupted timestamps that
cause PruneExpiredMessages to immediately expire all messages when
MaxAgeMs is set. Use Unix epoch offset to match Go's time.Now().UnixNano().
This commit is contained in:
Joseph Doherty
2026-02-24 23:08:39 -05:00
parent 8fcf27af5b
commit 13a3f81d7e
2 changed files with 21 additions and 15 deletions

View File

@@ -132,7 +132,7 @@ public sealed class MemStore : IStreamStore
lock (_gate) lock (_gate)
{ {
var seq = _st.LastSeq + 1; var seq = _st.LastSeq + 1;
var ts = DateTime.UtcNow.Ticks * 100L; // nanos var ts = ToUnixNanos(DateTime.UtcNow); // nanos
StoreInternal(subject, null, payload.IsEmpty ? null : payload.ToArray(), seq, ts, 0); StoreInternal(subject, null, payload.IsEmpty ? null : payload.ToArray(), seq, ts, 0);
return ValueTask.FromResult(seq); return ValueTask.FromResult(seq);
} }
@@ -149,7 +149,7 @@ public sealed class MemStore : IStreamStore
Sequence = m.Seq, Sequence = m.Seq,
Subject = m.Subj, Subject = m.Subj,
Payload = m.Data ?? [], Payload = m.Data ?? [],
TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc), TimestampUtc = FromUnixNanos(m.Ts),
}); });
} }
} }
@@ -167,7 +167,7 @@ public sealed class MemStore : IStreamStore
Sequence = m.Seq, Sequence = m.Seq,
Subject = m.Subj, Subject = m.Subj,
Payload = m.Data ?? [], Payload = m.Data ?? [],
TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc), TimestampUtc = FromUnixNanos(m.Ts),
}); });
} }
} }
@@ -183,7 +183,7 @@ public sealed class MemStore : IStreamStore
Sequence = m.Seq, Sequence = m.Seq,
Subject = m.Subj, Subject = m.Subj,
Payload = m.Data ?? [], Payload = m.Data ?? [],
TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc), TimestampUtc = FromUnixNanos(m.Ts),
}) })
.ToArray(); .ToArray();
return ValueTask.FromResult<IReadOnlyList<StoredMessage>>(list); return ValueTask.FromResult<IReadOnlyList<StoredMessage>>(list);
@@ -219,7 +219,7 @@ public sealed class MemStore : IStreamStore
Sequence = x.Seq, Sequence = x.Seq,
Subject = x.Subj, Subject = x.Subj,
PayloadBase64 = Convert.ToBase64String(x.Data ?? []), PayloadBase64 = Convert.ToBase64String(x.Data ?? []),
TimestampUtc = new DateTime(x.Ts / 100L, DateTimeKind.Utc), TimestampUtc = FromUnixNanos(x.Ts),
}) })
.ToArray(); .ToArray();
return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot)); return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot));
@@ -248,7 +248,7 @@ public sealed class MemStore : IStreamStore
var data = record.PayloadBase64.Length > 0 var data = record.PayloadBase64.Length > 0
? Convert.FromBase64String(record.PayloadBase64) ? Convert.FromBase64String(record.PayloadBase64)
: null; : null;
var ts = new DateTimeOffset(record.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; var ts = ToUnixNanos(record.TimestampUtc);
// Advance LastSeq so StoreInternal accepts this sequence (handles gaps) // Advance LastSeq so StoreInternal accepts this sequence (handles gaps)
_st.LastSeq = record.Sequence - 1; _st.LastSeq = record.Sequence - 1;
StoreInternal(record.Subject, null, data, record.Sequence, ts, 0); StoreInternal(record.Subject, null, data, record.Sequence, ts, 0);
@@ -290,7 +290,7 @@ public sealed class MemStore : IStreamStore
{ {
var seq = _st.LastSeq + 1; var seq = _st.LastSeq + 1;
// Go uses time.Now().UnixNano() — use Ticks*100 for ~100ns precision // Go uses time.Now().UnixNano() — use Ticks*100 for ~100ns precision
var ts = DateTime.UtcNow.Ticks * 100L; var ts = ToUnixNanos(DateTime.UtcNow);
StoreInternal(subject, hdr, msg.Length == 0 ? null : msg, seq, ts, ttl); StoreInternal(subject, hdr, msg.Length == 0 ? null : msg, seq, ts, ttl);
return (seq, ts); return (seq, ts);
} }
@@ -563,7 +563,7 @@ public sealed class MemStore : IStreamStore
} }
_st.LastSeq = seq; _st.LastSeq = seq;
if (_msgs.TryGetValue(seq, out var last)) if (_msgs.TryGetValue(seq, out var last))
_st.LastTime = new DateTime(last.Ts / 100L, DateTimeKind.Utc); _st.LastTime = FromUnixNanos(last.Ts);
} }
} }
@@ -576,7 +576,7 @@ public sealed class MemStore : IStreamStore
return _st.LastSeq + 1; return _st.LastSeq + 1;
// Go uses time.Time.UnixNano(); match with Ticks*100 (100ns resolution) // Go uses time.Time.UnixNano(); match with Ticks*100 (100ns resolution)
var ts = t.ToUniversalTime().Ticks * 100L; var ts = ToUnixNanos(t.ToUniversalTime());
if (!_msgs.TryGetValue(_st.FirstSeq, out var firstMsg)) if (!_msgs.TryGetValue(_st.FirstSeq, out var firstMsg))
return _st.LastSeq + 1; return _st.LastSeq + 1;
@@ -878,7 +878,7 @@ public sealed class MemStore : IStreamStore
seq = _st.LastSeq + 1; seq = _st.LastSeq + 1;
} }
var now = new DateTime(ts / 100L, DateTimeKind.Utc); var now = FromUnixNanos(ts);
if (_st.Msgs == 0) if (_st.Msgs == 0)
{ {
_st.FirstSeq = seq; _st.FirstSeq = seq;
@@ -964,7 +964,7 @@ public sealed class MemStore : IStreamStore
if (nsm != null) if (nsm != null)
{ {
_st.FirstSeq = nsm.Seq; _st.FirstSeq = nsm.Seq;
_st.FirstTime = new DateTime(nsm.Ts / 100L, DateTimeKind.Utc); _st.FirstTime = FromUnixNanos(nsm.Ts);
} }
else else
{ {
@@ -1062,7 +1062,7 @@ public sealed class MemStore : IStreamStore
if (_msgs.TryGetValue(newFirst, out var nm)) if (_msgs.TryGetValue(newFirst, out var nm))
{ {
_st.FirstSeq = newFirst; _st.FirstSeq = newFirst;
_st.FirstTime = new DateTime(nm.Ts / 100L, DateTimeKind.Utc); _st.FirstTime = FromUnixNanos(nm.Ts);
break; break;
} }
} }
@@ -1165,6 +1165,12 @@ public sealed class MemStore : IStreamStore
return size; return size;
} }
// Go: time.Now().UnixNano() — convert DateTime to Unix nanoseconds
private static long ToUnixNanos(DateTime utc) => (utc.Ticks - DateTime.UnixEpoch.Ticks) * 100L;
// Convert Unix nanoseconds back to DateTime
private static DateTime FromUnixNanos(long nanos) => new(nanos / 100L + DateTime.UnixEpoch.Ticks, DateTimeKind.Utc);
// Subject filter matching (same as FileStore helper) // Subject filter matching (same as FileStore helper)
private static bool SubjectMatchesFilter(string subject, string filter) private static bool SubjectMatchesFilter(string subject, string filter)
{ {

View File

@@ -426,8 +426,8 @@ public sealed class StoreInterfaceTests
for (var seq = 4UL; seq <= 7UL; seq++) for (var seq = 4UL; seq <= 7UL; seq++)
s.RemoveMsg(seq).ShouldBeTrue(); s.RemoveMsg(seq).ShouldBeTrue();
// Convert nanoseconds timestamp to DateTime. // Convert Unix nanoseconds timestamp to DateTime.
var t = new DateTime(startTs / 100L, DateTimeKind.Utc); var t = new DateTime(startTs / 100L + DateTime.UnixEpoch.Ticks, DateTimeKind.Utc);
var found = s.GetSeqFromTime(t); var found = s.GetSeqFromTime(t);
found.ShouldBe(2UL); found.ShouldBe(2UL);
} }
@@ -454,7 +454,7 @@ public sealed class StoreInterfaceTests
// Delete last message — trailing delete. // Delete last message — trailing delete.
s.RemoveMsg(3).ShouldBeTrue(); s.RemoveMsg(3).ShouldBeTrue();
var t = new DateTime(startTs / 100L, DateTimeKind.Utc); var t = new DateTime(startTs / 100L + DateTime.UnixEpoch.Ticks, DateTimeKind.Utc);
var found = s.GetSeqFromTime(t); var found = s.GetSeqFromTime(t);
found.ShouldBe(2UL); found.ShouldBe(2UL);
} }