feat: port internal data structures from Go (Wave 2)

- AVL SequenceSet: sparse sequence set with AVL tree, 16 tests
- Subject Tree: Adaptive Radix Tree (ART) with 5 node tiers, 59 tests
- Generic Subject List: trie-based subject matcher, 21 tests
- Time Hash Wheel: O(1) TTL expiration wheel, 8 tests

Total: 106 new tests (1,081 → 1,187 passing)
This commit is contained in:
Joseph Doherty
2026-02-23 20:56:20 -05:00
parent 636906f545
commit 256daad8e5
10 changed files with 6402 additions and 8 deletions

View File

@@ -1,7 +1,414 @@
// Go reference: server/thw/thw.go
// Time hash wheel for efficient TTL expiration tracking.
// Fixed-size array of slots (the wheel), each containing a dictionary of (seq, expires) entries.
// Slot index = (expires / tickResolution) % wheelSize.
using System.Buffers.Binary;
using System.Diagnostics;
namespace NATS.Server.Internal.TimeHashWheel;
// Go reference: server/thw/thw.go
// TODO: Port time hash wheel for TTL expiration
/// <summary>
/// A timing hash wheel for efficient TTL expiration management.
/// Uses a fixed-size circular buffer of slots, where each slot holds entries
/// that expire within the same time tick. Supports O(1) add/remove and
/// efficient batch expiration scanning.
/// </summary>
public class HashWheel
{
// Go: tickDuration = int64(time.Second) — tick duration in nanoseconds.
private const long TickDuration = 1_000_000_000;
// Go: wheelBits = 12, wheelSize = 1 << 12 = 4096, wheelMask = 4095.
private const int WheelBits = 12;
internal const int WheelSize = 1 << WheelBits;
private const int WheelMask = WheelSize - 1;
// Go: headerLen = 17 — 1 byte magic + 2 x uint64.
private const int HeaderLen = 17;
private Slot?[] _wheel;
private long _lowest;
private ulong _count;
public HashWheel()
{
_wheel = new Slot?[WheelSize];
_lowest = long.MaxValue;
}
/// <summary>
/// Gets the number of entries in the wheel.
/// </summary>
// Go: Count() server/thw/thw.go:190
public ulong Count => _count;
/// <summary>
/// Calculates the slot position for a given expiration time.
/// </summary>
// Go: getPosition server/thw/thw.go:66
private static int GetPosition(long expires)
{
return (int)((expires / TickDuration) & WheelMask);
}
/// <summary>
/// Schedules a new timer task. If the sequence already exists in the target slot,
/// its expiration is updated without incrementing the count.
/// </summary>
// Go: Add server/thw/thw.go:79
public void Add(ulong seq, long expires)
{
var pos = GetPosition(expires);
// Initialize the slot lazily.
_wheel[pos] ??= new Slot();
var slot = _wheel[pos]!;
if (!slot.Entries.ContainsKey(seq))
{
_count++;
}
slot.Entries[seq] = expires;
// Update slot's lowest expiration if this is earlier.
if (expires < slot.Lowest)
{
slot.Lowest = expires;
// Update global lowest if this is now the earliest.
if (expires < _lowest)
{
_lowest = expires;
}
}
}
/// <summary>
/// Removes a timer task. Returns true if the task was found and removed,
/// false if the task was not found.
/// </summary>
// Go: Remove server/thw/thw.go:103
public bool Remove(ulong seq, long expires)
{
var pos = GetPosition(expires);
var slot = _wheel[pos];
if (slot is null)
{
return false;
}
if (!slot.Entries.Remove(seq))
{
return false;
}
_count--;
// If the slot is empty, set it to null to free memory.
if (slot.Entries.Count == 0)
{
_wheel[pos] = null;
}
return true;
}
/// <summary>
/// Updates the expiration time of an existing timer task by removing it from
/// the old slot and adding it to the new one.
/// </summary>
// Go: Update server/thw/thw.go:123
public void Update(ulong seq, long oldExpires, long newExpires)
{
Remove(seq, oldExpires);
Add(seq, newExpires);
}
/// <summary>
/// Processes all expired tasks using the current time. The callback receives each
/// expired entry's sequence and expiration time. If the callback returns true,
/// the entry is removed; if false, it remains for future expiration checks.
/// </summary>
// Go: ExpireTasks server/thw/thw.go:133
public void ExpireTasks(Func<ulong, long, bool> callback)
{
var now = Stopwatch.GetTimestamp();
// Convert to nanoseconds for consistency with the Go implementation.
var nowNanos = (long)((double)now / Stopwatch.Frequency * 1_000_000_000);
ExpireTasksInternal(nowNanos, callback);
}
/// <summary>
/// Internal expiration method that accepts an explicit timestamp.
/// Used by tests that need deterministic time control.
/// </summary>
// Go: expireTasks server/thw/thw.go:138
internal void ExpireTasksInternal(long ts, Func<ulong, long, bool> callback)
{
// Quick return if nothing is expired.
if (_lowest > ts)
{
return;
}
var globalLowest = long.MaxValue;
for (var pos = 0; pos < _wheel.Length; pos++)
{
var slot = _wheel[pos];
// Skip slot if nothing to expire.
if (slot is null || slot.Lowest > ts)
{
if (slot is not null && slot.Lowest < globalLowest)
{
globalLowest = slot.Lowest;
}
continue;
}
// Track new lowest while processing expirations.
var slotLowest = long.MaxValue;
var toRemove = new List<ulong>();
foreach (var (seq, expires) in slot.Entries)
{
if (expires <= ts && callback(seq, expires))
{
toRemove.Add(seq);
continue;
}
if (expires < slotLowest)
{
slotLowest = expires;
}
}
foreach (var seq in toRemove)
{
slot.Entries.Remove(seq);
_count--;
}
// Nil out if we are empty.
if (slot.Entries.Count == 0)
{
_wheel[pos] = null;
}
else
{
slot.Lowest = slotLowest;
if (slotLowest < globalLowest)
{
globalLowest = slotLowest;
}
}
}
_lowest = globalLowest;
}
/// <summary>
/// Returns the earliest expiration time if it is before the given time.
/// Returns <see cref="long.MaxValue"/> if no expirations exist before the specified time.
/// </summary>
// Go: GetNextExpiration server/thw/thw.go:182
public long GetNextExpiration(long before)
{
if (_lowest < before)
{
return _lowest;
}
return long.MaxValue;
}
/// <summary>
/// Encodes the wheel state into a binary snapshot for persistence.
/// The high sequence number is included and will be returned on decode.
/// Format: [1 byte magic version][8 bytes entry count][8 bytes highSeq][varint expires, uvarint seq pairs...]
/// </summary>
// Go: Encode server/thw/thw.go:197
public byte[] Encode(ulong highSeq)
{
// Estimate capacity: header + entries * (max varint size * 2).
var estimatedSize = HeaderLen + (int)(_count * 2 * 10);
var buffer = new byte[estimatedSize];
var offset = 0;
// Magic version byte.
buffer[offset++] = 1;
// Entry count (little-endian uint64).
BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(offset), _count);
offset += 8;
// High sequence stamp (little-endian uint64).
BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(offset), highSeq);
offset += 8;
// Write all entries as varint(expires) + uvarint(seq) pairs.
foreach (var slot in _wheel)
{
if (slot?.Entries is null)
{
continue;
}
foreach (var (seq, expires) in slot.Entries)
{
// Ensure buffer has enough space.
if (offset + 20 > buffer.Length)
{
Array.Resize(ref buffer, buffer.Length * 2);
}
offset += WriteVarint(buffer.AsSpan(offset), expires);
offset += WriteUvarint(buffer.AsSpan(offset), seq);
}
}
return buffer.AsSpan(0, offset).ToArray();
}
/// <summary>
/// Decodes a binary-encoded snapshot and replaces the contents of this wheel.
/// Returns the high sequence number from the snapshot and the number of bytes consumed.
/// </summary>
// Go: Decode server/thw/thw.go:216
public (ulong HighSeq, int BytesRead) Decode(ReadOnlySpan<byte> buf)
{
if (buf.Length < HeaderLen)
{
throw new InvalidOperationException("Buffer too short for hash wheel header.");
}
if (buf[0] != 1)
{
throw new InvalidOperationException("Unknown hash wheel encoding version.");
}
// Reset the wheel.
_wheel = new Slot?[WheelSize];
_lowest = long.MaxValue;
_count = 0;
var count = BinaryPrimitives.ReadUInt64LittleEndian(buf[1..]);
var highSeq = BinaryPrimitives.ReadUInt64LittleEndian(buf[9..]);
var offset = HeaderLen;
for (ulong i = 0; i < count; i++)
{
var (ts, tn) = ReadVarint(buf[offset..]);
if (tn <= 0)
{
throw new InvalidOperationException("Unexpected end of buffer reading varint.");
}
var (seq, vn) = ReadUvarint(buf[(offset + tn)..]);
if (vn <= 0)
{
throw new InvalidOperationException("Unexpected end of buffer reading uvarint.");
}
Add(seq, ts);
offset += tn + vn;
}
return (highSeq, offset);
}
// Varint encoding/decoding compatible with Go's encoding/binary.
/// <summary>
/// Writes a signed varint (zigzag-encoded) to the buffer.
/// Compatible with Go's binary.AppendVarint / binary.Varint.
/// </summary>
private static int WriteVarint(Span<byte> buffer, long value)
{
// Zigzag encode: (value << 1) ^ (value >> 63)
var zigzag = (ulong)((value << 1) ^ (value >> 63));
return WriteUvarint(buffer, zigzag);
}
/// <summary>
/// Writes an unsigned varint to the buffer.
/// Compatible with Go's binary.AppendUvarint / binary.Uvarint.
/// </summary>
private static int WriteUvarint(Span<byte> buffer, ulong value)
{
var i = 0;
while (value >= 0x80)
{
buffer[i++] = (byte)(value | 0x80);
value >>= 7;
}
buffer[i++] = (byte)value;
return i;
}
/// <summary>
/// Reads a signed varint (zigzag-encoded) from the buffer.
/// Returns the value and the number of bytes consumed.
/// </summary>
private static (long Value, int BytesRead) ReadVarint(ReadOnlySpan<byte> buffer)
{
var (zigzag, n) = ReadUvarint(buffer);
if (n <= 0)
{
return (0, n);
}
// Zigzag decode: (zigzag >> 1) ^ -(zigzag & 1)
var value = (long)(zigzag >> 1) ^ -(long)(zigzag & 1);
return (value, n);
}
/// <summary>
/// Reads an unsigned varint from the buffer.
/// Returns the value and the number of bytes consumed.
/// </summary>
private static (ulong Value, int BytesRead) ReadUvarint(ReadOnlySpan<byte> buffer)
{
ulong result = 0;
var shift = 0;
for (var i = 0; i < buffer.Length; i++)
{
var b = buffer[i];
result |= (ulong)(b & 0x7F) << shift;
if ((b & 0x80) == 0)
{
return (result, i + 1);
}
shift += 7;
if (shift >= 64)
{
return (0, -1); // Overflow.
}
}
return (0, -1); // Buffer too short.
}
/// <summary>
/// Internal access to the wheel slots for testing encode/decode round-trip verification.
/// </summary>
internal Slot?[] Wheel => _wheel;
/// <summary>
/// Represents a single slot in the wheel containing entries that hash to the same position.
/// </summary>
internal sealed class Slot
{
// Go: slot.entries — map of sequence to expires.
public Dictionary<ulong, long> Entries { get; } = new();
// Go: slot.lowest — lowest expiration time in this slot.
public long Lowest { get; set; } = long.MaxValue;
}
}