feat(p7-09): JetStream unit tests — versioning (12), dirstore (12), batching/errors deferred (66)

Port session P7-09: add tests from jetstream_versioning_test.go (T:1791–1808),
dirstore_test.go (T:285–296), jetstream_batching_test.go (T:716–744),
jetstream_errors_test.go (T:1381–1384), and accounts_test.go (T:80–110).

- JetStreamVersioningTests: 12 active unit tests + 6 deferred (server-required)
- DirectoryStoreTests: 12 filesystem tests using fake JWTs (no NKeys dependency)
- JetStreamBatchingTests: 29 deferred stubs (all require running JetStream cluster)
- JetStreamErrorsTests: 4 deferred stubs (NewJS* factories not yet ported)
- accounts_test.go T:80–110: 31 deferred (all use RunServerWithConfig)

Fix DirJwtStore.cs expiration bugs:
  - Use DateTimeOffset.UtcNow.UtcTicks (not Unix-relative ticks) for expiry comparison
  - Replace in-place JwtItem mutation with new-object replacement so DrainStale
    can detect stale heap entries via ReferenceEquals check

Add JetStreamVersioning.cs methods: SetStaticStreamMetadata,
SetDynamicStreamMetadata, CopyStreamMetadata, SetStaticConsumerMetadata,
SetDynamicConsumerMetadata, SetDynamicConsumerInfoMetadata, CopyConsumerMetadata.

Tests: 725 pass, 53 skipped/deferred, 0 failures.
DB: +24 complete, +66 deferred.
This commit is contained in:
Joseph Doherty
2026-02-26 20:02:00 -05:00
parent 6e90eea736
commit f0faaffe69
9 changed files with 1627 additions and 18 deletions

View File

@@ -624,7 +624,7 @@ public sealed class DirJwtStore : IDisposable
/// Deletes the JWT for <paramref name="publicKey"/> according to <see cref="_deleteType"/>.
/// Mirrors Go <c>DirJWTStore.delete</c>.
/// </summary>
private void Delete(string publicKey)
public void Delete(string publicKey)
{
if (_readonly)
{
@@ -795,7 +795,7 @@ public sealed class DirJwtStore : IDisposable
// Background timer — mirrors Go goroutine + time.Ticker.
var timer = new Timer(_ =>
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * TimeSpan.TicksPerMillisecond;
var now = DateTimeOffset.UtcNow.UtcTicks;
while (true)
{
@@ -1104,14 +1104,13 @@ internal sealed class ExpirationTracker
// Remove old hash contribution from rolling XOR.
XorAssign(_hash, existing.Hash);
// Update in-place.
existing.Expiration = exp;
existing.Hash = hash;
// Re-enqueue with updated priority (PriorityQueue does not support update;
// use a version counter approach — mark old entry stale, enqueue fresh).
existing.Version++;
_heap.Enqueue(existing, exp);
// Create a new JwtItem so the old heap entry becomes a stale orphan.
// DrainStale uses ReferenceEquals(current, top) to detect orphans:
// the old heap entry points to the old JwtItem object which is no longer
// in _idx, so it will be drained on the next PeekExpired call.
var updated = new JwtItem(publicKey, exp, hash);
_idx[publicKey] = updated;
_heap.Enqueue(updated, exp);
}
else
{
@@ -1141,9 +1140,11 @@ internal sealed class ExpirationTracker
? long.MaxValue
: (DateTimeOffset.UtcNow + Ttl).UtcTicks;
item.Expiration = newExp;
item.Version++;
_heap.Enqueue(item, newExp);
// Replace with a new JwtItem so the old heap entry becomes a stale orphan
// (DrainStale detects staleness via ReferenceEquals).
var updated = new JwtItem(publicKey, newExp, item.Hash);
_idx[publicKey] = updated;
_heap.Enqueue(updated, newExp);
}
if (EvictOnLimit)

View File

@@ -103,4 +103,224 @@ public static class JetStreamVersioning
if (string.IsNullOrEmpty(reqApiLevelHeader)) return false;
return !int.TryParse(reqApiLevelHeader, out var minLevel) || JsApiLevel < minLevel;
}
// ---- Stream metadata mutations ----
/// <summary>
/// Sets the required API level in stream config metadata based on which v2.11+/v2.12+ features
/// the stream config uses. Removes any dynamic fields first.
/// Mirrors <c>setStaticStreamMetadata</c>.
/// </summary>
public static void SetStaticStreamMetadata(StreamConfig cfg)
{
cfg.Metadata ??= new Dictionary<string, string>();
DeleteDynamicMetadata(cfg.Metadata);
var requiredApiLevel = 0;
void Requires(int level) { if (level > requiredApiLevel) requiredApiLevel = level; }
if (cfg.AllowMsgTTL || cfg.SubjectDeleteMarkerTTL > TimeSpan.Zero)
Requires(ApiLevelForTTL);
if (cfg.AllowMsgCounter)
Requires(ApiLevelForCounters);
if (cfg.AllowAtomicPublish)
Requires(ApiLevelForAtomicPublish);
if (cfg.AllowMsgSchedules)
Requires(ApiLevelForMsgSchedules);
if (cfg.PersistMode == PersistModeType.AsyncPersistMode)
Requires(ApiLevelForAsyncPersist);
cfg.Metadata[JsRequiredLevelMetadataKey] = requiredApiLevel.ToString();
}
/// <summary>
/// Returns a shallow copy of the stream config with dynamic versioning fields added to a new
/// metadata dictionary. Does not mutate <paramref name="cfg"/>.
/// Mirrors <c>setDynamicStreamMetadata</c>.
/// </summary>
public static StreamConfig SetDynamicStreamMetadata(StreamConfig cfg)
{
// Shallow-copy the struct-like record: clone all fields then replace metadata.
var newCfg = cfg.Clone();
newCfg.Metadata = new Dictionary<string, string>();
if (cfg.Metadata != null)
foreach (var kv in cfg.Metadata)
newCfg.Metadata[kv.Key] = kv.Value;
newCfg.Metadata[JsServerVersionMetadataKey] = ServerConstants.Version;
newCfg.Metadata[JsServerLevelMetadataKey] = JsApiLevel.ToString();
return newCfg;
}
/// <summary>
/// Copies the required-level versioning field from <paramref name="prevCfg"/> into
/// <paramref name="cfg"/>, removing dynamic fields and deleting the key if absent in prevCfg.
/// Mirrors <c>copyStreamMetadata</c>.
/// </summary>
public static void CopyStreamMetadata(StreamConfig cfg, StreamConfig? prevCfg)
{
if (cfg.Metadata != null)
DeleteDynamicMetadata(cfg.Metadata);
SetOrDeleteInStreamMetadata(cfg, prevCfg, JsRequiredLevelMetadataKey);
}
private static void SetOrDeleteInStreamMetadata(StreamConfig cfg, StreamConfig? prevCfg, string key)
{
if (prevCfg?.Metadata != null && prevCfg.Metadata.TryGetValue(key, out var value))
{
cfg.Metadata ??= new Dictionary<string, string>();
cfg.Metadata[key] = value;
return;
}
if (cfg.Metadata != null)
{
cfg.Metadata.Remove(key);
if (cfg.Metadata.Count == 0)
cfg.Metadata = null;
}
}
// ---- Consumer metadata mutations ----
/// <summary>
/// Sets the required API level in consumer config metadata based on which v2.11+ features
/// the consumer config uses. Removes any dynamic fields first.
/// Mirrors <c>setStaticConsumerMetadata</c>.
/// </summary>
public static void SetStaticConsumerMetadata(ConsumerConfig cfg)
{
cfg.Metadata ??= new Dictionary<string, string>();
DeleteDynamicMetadata(cfg.Metadata);
var requiredApiLevel = 0;
void Requires(int level) { if (level > requiredApiLevel) requiredApiLevel = level; }
if (cfg.PauseUntil.HasValue && cfg.PauseUntil.Value != default)
Requires(ApiLevelForConsumerPause);
if (cfg.PriorityPolicy != PriorityPolicy.PriorityNone
|| cfg.PinnedTTL != TimeSpan.Zero
|| (cfg.PriorityGroups != null && cfg.PriorityGroups.Length > 0))
Requires(ApiLevelForPriorityGroups);
cfg.Metadata[JsRequiredLevelMetadataKey] = requiredApiLevel.ToString();
}
/// <summary>
/// Returns a shallow copy of the consumer config with dynamic versioning fields added to a new
/// metadata dictionary. Does not mutate <paramref name="cfg"/>.
/// Mirrors <c>setDynamicConsumerMetadata</c>.
/// </summary>
public static ConsumerConfig SetDynamicConsumerMetadata(ConsumerConfig cfg)
{
var newCfg = new ConsumerConfig();
// Copy all fields via serialisation-free approach: copy properties from cfg
CopyConsumerConfigFields(cfg, newCfg);
newCfg.Metadata = new Dictionary<string, string>();
if (cfg.Metadata != null)
foreach (var kv in cfg.Metadata)
newCfg.Metadata[kv.Key] = kv.Value;
newCfg.Metadata[JsServerVersionMetadataKey] = ServerConstants.Version;
newCfg.Metadata[JsServerLevelMetadataKey] = JsApiLevel.ToString();
return newCfg;
}
/// <summary>
/// Returns a shallow copy of the consumer info with dynamic versioning fields added to the
/// config's metadata. Does not mutate <paramref name="info"/>.
/// Mirrors <c>setDynamicConsumerInfoMetadata</c>.
/// </summary>
public static ConsumerInfo SetDynamicConsumerInfoMetadata(ConsumerInfo info)
{
var newInfo = new ConsumerInfo
{
Stream = info.Stream,
Name = info.Name,
Created = info.Created,
Delivered = info.Delivered,
AckFloor = info.AckFloor,
NumAckPending = info.NumAckPending,
NumRedelivered = info.NumRedelivered,
NumWaiting = info.NumWaiting,
NumPending = info.NumPending,
Cluster = info.Cluster,
PushBound = info.PushBound,
Paused = info.Paused,
PauseRemaining = info.PauseRemaining,
TimeStamp = info.TimeStamp,
PriorityGroups = info.PriorityGroups,
Config = info.Config != null ? SetDynamicConsumerMetadata(info.Config) : null,
};
return newInfo;
}
/// <summary>
/// Copies the required-level versioning field from <paramref name="prevCfg"/> into
/// <paramref name="cfg"/>, removing dynamic fields and deleting the key if absent in prevCfg.
/// Mirrors <c>copyConsumerMetadata</c>.
/// </summary>
public static void CopyConsumerMetadata(ConsumerConfig cfg, ConsumerConfig? prevCfg)
{
if (cfg.Metadata != null)
DeleteDynamicMetadata(cfg.Metadata);
SetOrDeleteInConsumerMetadata(cfg, prevCfg, JsRequiredLevelMetadataKey);
}
private static void SetOrDeleteInConsumerMetadata(ConsumerConfig cfg, ConsumerConfig? prevCfg, string key)
{
if (prevCfg?.Metadata != null && prevCfg.Metadata.TryGetValue(key, out var value))
{
cfg.Metadata ??= new Dictionary<string, string>();
cfg.Metadata[key] = value;
return;
}
if (cfg.Metadata != null)
{
cfg.Metadata.Remove(key);
if (cfg.Metadata.Count == 0)
cfg.Metadata = null;
}
}
// ---- Private helpers ----
/// <summary>
/// Copies all scalar/reference properties from <paramref name="src"/> to <paramref name="dst"/>,
/// excluding <c>Metadata</c> (which is set separately by the caller).
/// </summary>
private static void CopyConsumerConfigFields(ConsumerConfig src, ConsumerConfig dst)
{
dst.DeliverPolicy = src.DeliverPolicy;
dst.OptStartSeq = src.OptStartSeq;
dst.OptStartTime = src.OptStartTime;
dst.DeliverSubject = src.DeliverSubject;
dst.DeliverGroup = src.DeliverGroup;
dst.Durable = src.Durable;
dst.Name = src.Name;
dst.Description = src.Description;
dst.FilterSubject = src.FilterSubject;
dst.FilterSubjects = src.FilterSubjects;
dst.AckPolicy = src.AckPolicy;
dst.AckWait = src.AckWait;
dst.MaxDeliver = src.MaxDeliver;
dst.BackOff = src.BackOff;
dst.ReplayPolicy = src.ReplayPolicy;
dst.RateLimit = src.RateLimit;
dst.SampleFrequency = src.SampleFrequency;
dst.MaxWaiting = src.MaxWaiting;
dst.MaxAckPending = src.MaxAckPending;
dst.FlowControl = src.FlowControl;
dst.Heartbeat = src.Heartbeat;
dst.Direct = src.Direct;
dst.HeadersOnly = src.HeadersOnly;
dst.MaxRequestBatch = src.MaxRequestBatch;
dst.MaxRequestMaxBytes = src.MaxRequestMaxBytes;
dst.MaxRequestExpires = src.MaxRequestExpires;
dst.InactiveThreshold = src.InactiveThreshold;
dst.Replicas = src.Replicas;
dst.MemoryStorage = src.MemoryStorage;
dst.PauseUntil = src.PauseUntil;
dst.PinnedTTL = src.PinnedTTL;
dst.PriorityPolicy = src.PriorityPolicy;
dst.PriorityGroups = src.PriorityGroups;
// Metadata is NOT copied here — caller sets it.
}
}