fix: resolve MemStore timestamp overflow causing immediate message expiry
DateTime.UtcNow.Ticks * 100L overflows long (Ticks ~6.38e17, x100 = 6.38e19 > long.MaxValue 9.22e18), producing corrupted timestamps that cause PruneExpiredMessages to immediately expire all messages when MaxAgeMs is set. Use Unix epoch offset to match Go's time.Now().UnixNano().
This commit is contained in:
@@ -132,7 +132,7 @@ public sealed class MemStore : IStreamStore
|
||||
lock (_gate)
|
||||
{
|
||||
var seq = _st.LastSeq + 1;
|
||||
var ts = DateTime.UtcNow.Ticks * 100L; // nanos
|
||||
var ts = ToUnixNanos(DateTime.UtcNow); // nanos
|
||||
StoreInternal(subject, null, payload.IsEmpty ? null : payload.ToArray(), seq, ts, 0);
|
||||
return ValueTask.FromResult(seq);
|
||||
}
|
||||
@@ -149,7 +149,7 @@ public sealed class MemStore : IStreamStore
|
||||
Sequence = m.Seq,
|
||||
Subject = m.Subj,
|
||||
Payload = m.Data ?? [],
|
||||
TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc),
|
||||
TimestampUtc = FromUnixNanos(m.Ts),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -167,7 +167,7 @@ public sealed class MemStore : IStreamStore
|
||||
Sequence = m.Seq,
|
||||
Subject = m.Subj,
|
||||
Payload = m.Data ?? [],
|
||||
TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc),
|
||||
TimestampUtc = FromUnixNanos(m.Ts),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -183,7 +183,7 @@ public sealed class MemStore : IStreamStore
|
||||
Sequence = m.Seq,
|
||||
Subject = m.Subj,
|
||||
Payload = m.Data ?? [],
|
||||
TimestampUtc = new DateTime(m.Ts / 100L, DateTimeKind.Utc),
|
||||
TimestampUtc = FromUnixNanos(m.Ts),
|
||||
})
|
||||
.ToArray();
|
||||
return ValueTask.FromResult<IReadOnlyList<StoredMessage>>(list);
|
||||
@@ -219,7 +219,7 @@ public sealed class MemStore : IStreamStore
|
||||
Sequence = x.Seq,
|
||||
Subject = x.Subj,
|
||||
PayloadBase64 = Convert.ToBase64String(x.Data ?? []),
|
||||
TimestampUtc = new DateTime(x.Ts / 100L, DateTimeKind.Utc),
|
||||
TimestampUtc = FromUnixNanos(x.Ts),
|
||||
})
|
||||
.ToArray();
|
||||
return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot));
|
||||
@@ -248,7 +248,7 @@ public sealed class MemStore : IStreamStore
|
||||
var data = record.PayloadBase64.Length > 0
|
||||
? Convert.FromBase64String(record.PayloadBase64)
|
||||
: null;
|
||||
var ts = new DateTimeOffset(record.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
|
||||
var ts = ToUnixNanos(record.TimestampUtc);
|
||||
// Advance LastSeq so StoreInternal accepts this sequence (handles gaps)
|
||||
_st.LastSeq = record.Sequence - 1;
|
||||
StoreInternal(record.Subject, null, data, record.Sequence, ts, 0);
|
||||
@@ -290,7 +290,7 @@ public sealed class MemStore : IStreamStore
|
||||
{
|
||||
var seq = _st.LastSeq + 1;
|
||||
// Go uses time.Now().UnixNano() — use Ticks*100 for ~100ns precision
|
||||
var ts = DateTime.UtcNow.Ticks * 100L;
|
||||
var ts = ToUnixNanos(DateTime.UtcNow);
|
||||
StoreInternal(subject, hdr, msg.Length == 0 ? null : msg, seq, ts, ttl);
|
||||
return (seq, ts);
|
||||
}
|
||||
@@ -563,7 +563,7 @@ public sealed class MemStore : IStreamStore
|
||||
}
|
||||
_st.LastSeq = seq;
|
||||
if (_msgs.TryGetValue(seq, out var last))
|
||||
_st.LastTime = new DateTime(last.Ts / 100L, DateTimeKind.Utc);
|
||||
_st.LastTime = FromUnixNanos(last.Ts);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -576,7 +576,7 @@ public sealed class MemStore : IStreamStore
|
||||
return _st.LastSeq + 1;
|
||||
|
||||
// Go uses time.Time.UnixNano(); match with Ticks*100 (100ns resolution)
|
||||
var ts = t.ToUniversalTime().Ticks * 100L;
|
||||
var ts = ToUnixNanos(t.ToUniversalTime());
|
||||
|
||||
if (!_msgs.TryGetValue(_st.FirstSeq, out var firstMsg))
|
||||
return _st.LastSeq + 1;
|
||||
@@ -878,7 +878,7 @@ public sealed class MemStore : IStreamStore
|
||||
seq = _st.LastSeq + 1;
|
||||
}
|
||||
|
||||
var now = new DateTime(ts / 100L, DateTimeKind.Utc);
|
||||
var now = FromUnixNanos(ts);
|
||||
if (_st.Msgs == 0)
|
||||
{
|
||||
_st.FirstSeq = seq;
|
||||
@@ -964,7 +964,7 @@ public sealed class MemStore : IStreamStore
|
||||
if (nsm != null)
|
||||
{
|
||||
_st.FirstSeq = nsm.Seq;
|
||||
_st.FirstTime = new DateTime(nsm.Ts / 100L, DateTimeKind.Utc);
|
||||
_st.FirstTime = FromUnixNanos(nsm.Ts);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1062,7 +1062,7 @@ public sealed class MemStore : IStreamStore
|
||||
if (_msgs.TryGetValue(newFirst, out var nm))
|
||||
{
|
||||
_st.FirstSeq = newFirst;
|
||||
_st.FirstTime = new DateTime(nm.Ts / 100L, DateTimeKind.Utc);
|
||||
_st.FirstTime = FromUnixNanos(nm.Ts);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -1165,6 +1165,12 @@ public sealed class MemStore : IStreamStore
|
||||
return size;
|
||||
}
|
||||
|
||||
// Go: time.Now().UnixNano() — convert DateTime to Unix nanoseconds
|
||||
private static long ToUnixNanos(DateTime utc) => (utc.Ticks - DateTime.UnixEpoch.Ticks) * 100L;
|
||||
|
||||
// Convert Unix nanoseconds back to DateTime
|
||||
private static DateTime FromUnixNanos(long nanos) => new(nanos / 100L + DateTime.UnixEpoch.Ticks, DateTimeKind.Utc);
|
||||
|
||||
// Subject filter matching (same as FileStore helper)
|
||||
private static bool SubjectMatchesFilter(string subject, string filter)
|
||||
{
|
||||
|
||||
@@ -426,8 +426,8 @@ public sealed class StoreInterfaceTests
|
||||
for (var seq = 4UL; seq <= 7UL; seq++)
|
||||
s.RemoveMsg(seq).ShouldBeTrue();
|
||||
|
||||
// Convert nanoseconds timestamp to DateTime.
|
||||
var t = new DateTime(startTs / 100L, DateTimeKind.Utc);
|
||||
// Convert Unix nanoseconds timestamp to DateTime.
|
||||
var t = new DateTime(startTs / 100L + DateTime.UnixEpoch.Ticks, DateTimeKind.Utc);
|
||||
var found = s.GetSeqFromTime(t);
|
||||
found.ShouldBe(2UL);
|
||||
}
|
||||
@@ -454,7 +454,7 @@ public sealed class StoreInterfaceTests
|
||||
// Delete last message — trailing delete.
|
||||
s.RemoveMsg(3).ShouldBeTrue();
|
||||
|
||||
var t = new DateTime(startTs / 100L, DateTimeKind.Utc);
|
||||
var t = new DateTime(startTs / 100L + DateTime.UnixEpoch.Ticks, DateTimeKind.Utc);
|
||||
var found = s.GetSeqFromTime(t);
|
||||
found.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user