Fix E2E test gaps and add comprehensive E2E + parity test suites

- Fix pull consumer fetch: send original stream subject in HMSG (not inbox)
  so NATS client distinguishes data messages from control messages
- Fix MaxAge expiry: add background timer in StreamManager for periodic pruning
- Fix JetStream wire format: Go-compatible anonymous objects with string enums,
  proper offset-based pagination for stream/consumer list APIs
- Add 42 E2E black-box tests (core messaging, auth, TLS, accounts, JetStream)
- Add ~1000 parity tests across all subsystems (gaps closure)
- Update gap inventory docs to reflect implementation status
This commit is contained in:
Joseph Doherty
2026-03-12 14:09:23 -04:00
parent 79c1ee8776
commit c30e67a69d
226 changed files with 17801 additions and 709 deletions

View File

@@ -53,30 +53,57 @@ public static class ConsumerApiHandlers
: JetStreamApiResponse.NotFound(subject);
}
public static JetStreamApiResponse HandleNames(string subject, ConsumerManager consumerManager)
public static JetStreamApiResponse HandleNames(string subject, ReadOnlySpan<byte> payload, ConsumerManager consumerManager)
{
var stream = ParseStreamSubject(subject, NamesPrefix);
if (stream == null)
return JetStreamApiResponse.NotFound(subject);
var offset = ParseOffset(payload);
var all = consumerManager.ListNames(stream);
var page = offset >= all.Count ? [] : all.Skip(offset).ToList();
return new JetStreamApiResponse
{
ConsumerNames = consumerManager.ListNames(stream),
ConsumerNames = page,
PaginationTotal = all.Count,
PaginationOffset = offset,
};
}
public static JetStreamApiResponse HandleList(string subject, ConsumerManager consumerManager)
public static JetStreamApiResponse HandleList(string subject, ReadOnlySpan<byte> payload, ConsumerManager consumerManager)
{
var stream = ParseStreamSubject(subject, ListPrefix);
if (stream == null)
return JetStreamApiResponse.NotFound(subject);
var offset = ParseOffset(payload);
var all = consumerManager.ListConsumerInfos(stream);
var page = offset >= all.Count ? [] : all.Skip(offset).ToList();
return new JetStreamApiResponse
{
ConsumerNames = consumerManager.ListNames(stream),
ConsumerInfoList = page,
PaginationTotal = all.Count,
PaginationOffset = offset,
};
}
private static int ParseOffset(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty) return 0;
try
{
using var doc = JsonDocument.Parse(payload.ToArray());
if (doc.RootElement.TryGetProperty("offset", out var el) && el.TryGetInt32(out var v))
return Math.Max(v, 0);
}
catch (JsonException ex)
{
System.Diagnostics.Debug.WriteLine($"Malformed offset payload: {ex.Message}");
}
return 0;
}
public static JetStreamApiResponse HandlePause(string subject, ReadOnlySpan<byte> payload, ConsumerManager consumerManager)
{
var parsed = ParseSubject(subject, PausePrefix);
@@ -254,15 +281,21 @@ public static class ConsumerApiHandlers
{
using var doc = JsonDocument.Parse(payload.ToArray());
var root = doc.RootElement;
// The client wraps config in a "config" property (CreateConsumerRequest).
// Go reference: consumer.go — CreateConsumerRequest { Config ConsumerConfig `json:"config"` }
var configEl = root.TryGetProperty("config", out var nested) ? nested : root;
var config = new ConsumerConfig();
if (root.TryGetProperty("durable_name", out var durableEl))
if (configEl.TryGetProperty("durable_name", out var durableEl))
config.DurableName = durableEl.GetString() ?? string.Empty;
else if (configEl.TryGetProperty("name", out var nameEl))
config.DurableName = nameEl.GetString() ?? string.Empty;
if (root.TryGetProperty("filter_subject", out var filterEl))
if (configEl.TryGetProperty("filter_subject", out var filterEl))
config.FilterSubject = filterEl.GetString();
if (root.TryGetProperty("filter_subjects", out var filterSubjectsEl) && filterSubjectsEl.ValueKind == JsonValueKind.Array)
if (configEl.TryGetProperty("filter_subjects", out var filterSubjectsEl) && filterSubjectsEl.ValueKind == JsonValueKind.Array)
{
foreach (var item in filterSubjectsEl.EnumerateArray())
{
@@ -272,41 +305,41 @@ public static class ConsumerApiHandlers
}
}
if (root.TryGetProperty("ephemeral", out var ephemeralEl) && ephemeralEl.ValueKind == JsonValueKind.True)
if (configEl.TryGetProperty("ephemeral", out var ephemeralEl) && ephemeralEl.ValueKind == JsonValueKind.True)
config.Ephemeral = true;
if (root.TryGetProperty("push", out var pushEl) && pushEl.ValueKind == JsonValueKind.True)
if (configEl.TryGetProperty("push", out var pushEl) && pushEl.ValueKind == JsonValueKind.True)
config.Push = true;
if (root.TryGetProperty("heartbeat_ms", out var hbEl) && hbEl.TryGetInt32(out var hbMs))
if (configEl.TryGetProperty("heartbeat_ms", out var hbEl) && hbEl.TryGetInt32(out var hbMs))
config.HeartbeatMs = hbMs;
if (root.TryGetProperty("ack_wait_ms", out var ackWaitEl) && ackWaitEl.TryGetInt32(out var ackWait))
if (configEl.TryGetProperty("ack_wait_ms", out var ackWaitEl) && ackWaitEl.TryGetInt32(out var ackWait))
config.AckWaitMs = ackWait;
if (root.TryGetProperty("max_deliver", out var maxDeliverEl) && maxDeliverEl.TryGetInt32(out var maxDeliver))
if (configEl.TryGetProperty("max_deliver", out var maxDeliverEl) && maxDeliverEl.TryGetInt32(out var maxDeliver))
config.MaxDeliver = Math.Max(maxDeliver, 0);
if (root.TryGetProperty("max_ack_pending", out var maxAckPendingEl) && maxAckPendingEl.TryGetInt32(out var maxAckPending))
if (configEl.TryGetProperty("max_ack_pending", out var maxAckPendingEl) && maxAckPendingEl.TryGetInt32(out var maxAckPending))
config.MaxAckPending = Math.Max(maxAckPending, 0);
if (root.TryGetProperty("flow_control", out var flowControlEl) && flowControlEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
if (configEl.TryGetProperty("flow_control", out var flowControlEl) && flowControlEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.FlowControl = flowControlEl.GetBoolean();
if (root.TryGetProperty("rate_limit_bps", out var rateLimitEl) && rateLimitEl.TryGetInt64(out var rateLimit))
if (configEl.TryGetProperty("rate_limit_bps", out var rateLimitEl) && rateLimitEl.TryGetInt64(out var rateLimit))
config.RateLimitBps = Math.Max(rateLimit, 0);
if (root.TryGetProperty("opt_start_seq", out var optStartSeqEl) && optStartSeqEl.TryGetUInt64(out var optStartSeq))
if (configEl.TryGetProperty("opt_start_seq", out var optStartSeqEl) && optStartSeqEl.TryGetUInt64(out var optStartSeq))
config.OptStartSeq = optStartSeq;
if (root.TryGetProperty("opt_start_time_utc", out var optStartTimeEl)
if (configEl.TryGetProperty("opt_start_time_utc", out var optStartTimeEl)
&& optStartTimeEl.ValueKind == JsonValueKind.String
&& DateTime.TryParse(optStartTimeEl.GetString(), out var optStartTime))
{
config.OptStartTimeUtc = optStartTime.ToUniversalTime();
}
if (root.TryGetProperty("backoff_ms", out var backoffEl) && backoffEl.ValueKind == JsonValueKind.Array)
if (configEl.TryGetProperty("backoff_ms", out var backoffEl) && backoffEl.ValueKind == JsonValueKind.Array)
{
foreach (var item in backoffEl.EnumerateArray())
{
@@ -315,7 +348,7 @@ public static class ConsumerApiHandlers
}
}
if (root.TryGetProperty("ack_policy", out var ackPolicyEl))
if (configEl.TryGetProperty("ack_policy", out var ackPolicyEl))
{
var ackPolicy = ackPolicyEl.GetString();
if (string.Equals(ackPolicy, "explicit", StringComparison.OrdinalIgnoreCase))
@@ -324,7 +357,7 @@ public static class ConsumerApiHandlers
config.AckPolicy = AckPolicy.All;
}
if (root.TryGetProperty("deliver_policy", out var deliverPolicyEl))
if (configEl.TryGetProperty("deliver_policy", out var deliverPolicyEl))
{
var deliver = deliverPolicyEl.GetString();
if (string.Equals(deliver, "last", StringComparison.OrdinalIgnoreCase))
@@ -339,7 +372,7 @@ public static class ConsumerApiHandlers
config.DeliverPolicy = DeliverPolicy.LastPerSubject;
}
if (root.TryGetProperty("replay_policy", out var replayPolicyEl))
if (configEl.TryGetProperty("replay_policy", out var replayPolicyEl))
{
var replay = replayPolicyEl.GetString();
if (string.Equals(replay, "original", StringComparison.OrdinalIgnoreCase))

View File

@@ -102,17 +102,47 @@ public static class StreamApiHandlers
return JetStreamApiResponse.PurgeResponse((ulong)purged);
}
public static JetStreamApiResponse HandleNames(StreamManager streamManager)
public static JetStreamApiResponse HandleNames(ReadOnlySpan<byte> payload, StreamManager streamManager)
{
var offset = ParseOffset(payload);
var all = streamManager.ListNames();
var page = offset >= all.Count ? [] : all.Skip(offset).ToList();
return new JetStreamApiResponse
{
StreamNames = streamManager.ListNames(),
StreamNames = page,
PaginationTotal = all.Count,
PaginationOffset = offset,
};
}
public static JetStreamApiResponse HandleList(StreamManager streamManager)
public static JetStreamApiResponse HandleList(ReadOnlySpan<byte> payload, StreamManager streamManager)
{
return HandleNames(streamManager);
var offset = ParseOffset(payload);
var all = streamManager.ListStreamInfos();
var page = offset >= all.Count ? [] : all.Skip(offset).ToList();
return new JetStreamApiResponse
{
StreamInfoList = page,
PaginationTotal = all.Count,
PaginationOffset = offset,
};
}
private static int ParseOffset(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty) return 0;
try
{
using var doc = System.Text.Json.JsonDocument.Parse(payload.ToArray());
if (doc.RootElement.TryGetProperty("offset", out var el) && el.TryGetInt32(out var v))
return Math.Max(v, 0);
}
catch (System.Text.Json.JsonException ex)
{
System.Diagnostics.Debug.WriteLine($"Malformed offset payload: {ex.Message}");
}
return 0;
}
public static JetStreamApiResponse HandleMessageGet(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
@@ -445,7 +475,9 @@ public static class StreamApiHandlers
if (root.TryGetProperty("max_msgs_per", out var maxMsgsPerEl) && maxMsgsPerEl.TryGetInt32(out var maxMsgsPer))
config.MaxMsgsPer = maxMsgsPer;
if (root.TryGetProperty("max_age_ms", out var maxAgeMsEl) && maxAgeMsEl.TryGetInt32(out var maxAgeMs))
if (root.TryGetProperty("max_age", out var maxAgeNsEl) && maxAgeNsEl.TryGetInt64(out var maxAgeNs))
config.MaxAge = maxAgeNs;
else if (root.TryGetProperty("max_age_ms", out var maxAgeMsEl) && maxAgeMsEl.TryGetInt32(out var maxAgeMs))
config.MaxAgeMs = maxAgeMs;
if (root.TryGetProperty("max_msg_size", out var maxMsgSizeEl) && maxMsgSizeEl.TryGetInt32(out var maxMsgSize))

View File

@@ -0,0 +1,14 @@
namespace NATS.Server.JetStream.Api;
/// <summary>
/// JetStream API size and queue limits aligned with Go server constants.
/// Go reference: server/jetstream_api.go (JSMaxDescriptionLen, JSMaxMetadataLen,
/// JSMaxNameLen, JSDefaultRequestQueueLimit).
/// </summary>
public static class JetStreamApiLimits
{
public const int JSMaxDescriptionLen = 4_096;
public const int JSMaxMetadataLen = 128 * 1024;
public const int JSMaxNameLen = 255;
public const int JSDefaultRequestQueueLimit = 10_000;
}

View File

@@ -9,7 +9,9 @@ public sealed class JetStreamApiResponse
public JetStreamConsumerInfo? ConsumerInfo { get; init; }
public JetStreamAccountInfo? AccountInfo { get; init; }
public IReadOnlyList<string>? StreamNames { get; init; }
public IReadOnlyList<JetStreamStreamInfo>? StreamInfoList { get; init; }
public IReadOnlyList<string>? ConsumerNames { get; init; }
public IReadOnlyList<JetStreamConsumerInfo>? ConsumerInfoList { get; init; }
public JetStreamStreamMessage? StreamMessage { get; init; }
public JetStreamDirectMessage? DirectMessage { get; init; }
public JetStreamSnapshot? Snapshot { get; init; }
@@ -17,6 +19,17 @@ public sealed class JetStreamApiResponse
public bool Success { get; init; }
public ulong Purged { get; init; }
/// <summary>
/// Total count of all items (before pagination). Used by list responses for offset-based pagination.
/// Go reference: jetstream_api.go — ApiPaged struct includes Total, Offset, Limit fields.
/// </summary>
public int PaginationTotal { get; init; }
/// <summary>
/// Requested offset for pagination. Echoed back to client so it can calculate the next page.
/// </summary>
public int PaginationOffset { get; init; }
/// <summary>
/// Whether the consumer is currently paused. Populated by pause/resume API responses.
/// Go reference: server/consumer.go jsConsumerPauseResponse.paused field.
@@ -29,6 +42,123 @@ public sealed class JetStreamApiResponse
/// </summary>
public DateTime? PauseUntil { get; init; }
/// <summary>
/// Returns a wire-format object for JSON serialization matching the Go server's
/// flat response structure (e.g., config/state at root level for stream responses,
/// not nested under a wrapper property).
/// </summary>
public object ToWireFormat()
{
if (StreamInfo != null)
{
if (Error != null)
return new { type = "io.nats.jetstream.api.v1.stream_create_response", error = Error };
return new
{
type = "io.nats.jetstream.api.v1.stream_create_response",
config = ToWireConfig(StreamInfo.Config),
state = ToWireState(StreamInfo.State),
};
}
if (ConsumerInfo != null)
{
if (Error != null)
return new { type = "io.nats.jetstream.api.v1.consumer_create_response", error = Error };
return new
{
type = "io.nats.jetstream.api.v1.consumer_create_response",
stream_name = ConsumerInfo.StreamName,
name = ConsumerInfo.Name,
config = ToWireConsumerConfig(ConsumerInfo.Config),
};
}
if (Error != null)
return new { error = Error };
if (StreamInfoList != null)
{
var wireStreams = StreamInfoList.Select(s => new
{
config = ToWireConfig(s.Config),
state = ToWireState(s.State),
}).ToList();
return new { total = PaginationTotal, offset = PaginationOffset, limit = wireStreams.Count, streams = wireStreams };
}
if (StreamNames != null)
return new { total = PaginationTotal, offset = PaginationOffset, limit = StreamNames.Count, streams = StreamNames };
if (ConsumerInfoList != null)
{
var wireConsumers = ConsumerInfoList.Select(c => new
{
stream_name = c.StreamName,
name = c.Name,
config = ToWireConsumerConfig(c.Config),
}).ToList();
return new { total = PaginationTotal, offset = PaginationOffset, limit = wireConsumers.Count, consumers = wireConsumers };
}
if (ConsumerNames != null)
return new { total = PaginationTotal, offset = PaginationOffset, limit = ConsumerNames.Count, consumers = ConsumerNames };
if (Purged > 0 || Success)
return new { success = Success, purged = Purged };
return new { success = Success };
}
/// <summary>
/// Creates a Go-compatible wire format for StreamConfig.
/// Only includes fields the Go server sends, with enums as lowercase strings.
/// Go reference: server/stream.go StreamConfig JSON marshaling.
/// </summary>
private static object ToWireConfig(StreamConfig c) => new
{
name = c.Name,
subjects = c.Subjects,
retention = c.Retention.ToString().ToLowerInvariant(),
max_consumers = c.MaxConsumers,
max_msgs = c.MaxMsgs,
max_bytes = c.MaxBytes,
max_age = c.MaxAge,
max_msgs_per_subject = c.MaxMsgsPer,
max_msg_size = c.MaxMsgSize,
storage = c.Storage.ToString().ToLowerInvariant(),
discard = c.Discard.ToString().ToLowerInvariant(),
num_replicas = c.Replicas,
duplicate_window = (long)c.DuplicateWindowMs * 1_000_000L,
sealed_field = c.Sealed,
deny_delete = c.DenyDelete,
deny_purge = c.DenyPurge,
allow_direct = c.AllowDirect,
first_seq = c.FirstSeq,
};
private static object ToWireState(ApiStreamState s) => new
{
messages = s.Messages,
bytes = s.Bytes,
first_seq = s.FirstSeq,
last_seq = s.LastSeq,
consumer_count = 0,
};
private static object ToWireConsumerConfig(ConsumerConfig c) => new
{
durable_name = string.IsNullOrEmpty(c.DurableName) ? null : c.DurableName,
name = string.IsNullOrEmpty(c.DurableName) ? null : c.DurableName,
deliver_policy = c.DeliverPolicy.ToString().ToLowerInvariant(),
ack_policy = c.AckPolicy.ToString().ToLowerInvariant(),
replay_policy = c.ReplayPolicy.ToString().ToLowerInvariant(),
ack_wait = (long)c.AckWaitMs * 1_000_000L,
max_deliver = c.MaxDeliver,
max_ack_pending = c.MaxAckPending,
filter_subject = c.FilterSubject,
};
public static JetStreamApiResponse NotFound(string subject) => new()
{
Error = new JetStreamApiError
@@ -99,6 +229,8 @@ public sealed class JetStreamStreamInfo
public sealed class JetStreamConsumerInfo
{
public string? Name { get; init; }
public string? StreamName { get; init; }
public required ConsumerConfig Config { get; init; }
}

View File

@@ -249,10 +249,10 @@ public sealed class JetStreamApiRouter
return StreamApiHandlers.HandleInfo(subject, _streamManager);
if (subject.Equals(JetStreamApiSubjects.StreamNames, StringComparison.Ordinal))
return StreamApiHandlers.HandleNames(_streamManager);
return StreamApiHandlers.HandleNames(payload, _streamManager);
if (subject.Equals(JetStreamApiSubjects.StreamList, StringComparison.Ordinal))
return StreamApiHandlers.HandleList(_streamManager);
return StreamApiHandlers.HandleList(payload, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamUpdate, StringComparison.Ordinal))
return StreamApiHandlers.HandleUpdate(subject, payload, _streamManager);
@@ -288,10 +288,10 @@ public sealed class JetStreamApiRouter
return ConsumerApiHandlers.HandleInfo(subject, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerNames, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleNames(subject, _consumerManager);
return ConsumerApiHandlers.HandleNames(subject, payload, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerList, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleList(subject, _consumerManager);
return ConsumerApiHandlers.HandleList(subject, payload, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerDelete, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleDelete(subject, _consumerManager);

View File

@@ -4,6 +4,7 @@ using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using NATS.Server.JetStream.Validation;
using NATS.Server.Subscriptions;
namespace NATS.Server.JetStream;
@@ -40,6 +41,12 @@ public sealed class ConsumerManager : IDisposable
return JetStreamApiResponse.ErrorResponse(400, "durable name required");
}
if (!JetStreamConfigValidator.IsValidName(config.DurableName))
return JetStreamApiResponse.ErrorResponse(400, "invalid durable name");
if (!JetStreamConfigValidator.IsMetadataWithinLimit(config.Metadata))
return JetStreamApiResponse.ErrorResponse(400, "consumer metadata exceeds maximum size");
if (config.FilterSubjects.Count == 0 && !string.IsNullOrWhiteSpace(config.FilterSubject))
config.FilterSubjects.Add(config.FilterSubject);
@@ -58,6 +65,8 @@ public sealed class ConsumerManager : IDisposable
{
ConsumerInfo = new JetStreamConsumerInfo
{
Name = handle.Config.DurableName,
StreamName = stream,
Config = handle.Config,
},
};
@@ -71,6 +80,8 @@ public sealed class ConsumerManager : IDisposable
{
ConsumerInfo = new JetStreamConsumerInfo
{
Name = handle.Config.DurableName,
StreamName = stream,
Config = handle.Config,
},
};
@@ -95,6 +106,13 @@ public sealed class ConsumerManager : IDisposable
.OrderBy(x => x, StringComparer.Ordinal)
.ToArray();
public IReadOnlyList<JetStreamConsumerInfo> ListConsumerInfos(string stream)
=> _consumers
.Where(kv => string.Equals(kv.Key.Stream, stream, StringComparison.Ordinal))
.OrderBy(kv => kv.Key.Name, StringComparer.Ordinal)
.Select(kv => new JetStreamConsumerInfo { Name = kv.Value.Config.DurableName, StreamName = stream, Config = kv.Value.Config })
.ToList();
public bool Pause(string stream, string durableName, bool paused)
{
if (!_consumers.TryGetValue((stream, durableName), out var handle))

View File

@@ -0,0 +1,58 @@
namespace NATS.Server.JetStream;
/// <summary>
/// API usage counters for JetStream.
/// Go reference: server/jetstream.go JetStreamAPIStats.
/// </summary>
public sealed class JetStreamApiStats
{
public int Level { get; set; }
public ulong Total { get; set; }
public ulong Errors { get; set; }
public int Inflight { get; set; }
}
/// <summary>
/// Per-tier JetStream resource view.
/// Go reference: server/jetstream.go JetStreamTier.
/// </summary>
public sealed class JetStreamTier
{
public string Name { get; set; } = string.Empty;
public long Memory { get; set; }
public long Store { get; set; }
public int Streams { get; set; }
public int Consumers { get; set; }
}
/// <summary>
/// Per-account JetStream limits.
/// Go reference: server/jetstream.go JetStreamAccountLimits.
/// </summary>
public sealed class JetStreamAccountLimits
{
public long MaxMemory { get; set; }
public long MaxStore { get; set; }
public int MaxStreams { get; set; }
public int MaxConsumers { get; set; }
public int MaxAckPending { get; set; }
public long MemoryMaxStreamBytes { get; set; }
public long StoreMaxStreamBytes { get; set; }
public bool MaxBytesRequired { get; set; }
public Dictionary<string, JetStreamTier> Tiers { get; set; } = new(StringComparer.Ordinal);
}
/// <summary>
/// Server-level JetStream usage stats.
/// Go reference: server/jetstream.go JetStreamStats.
/// </summary>
public sealed class JetStreamStats
{
public long Memory { get; set; }
public long Store { get; set; }
public long ReservedMemory { get; set; }
public long ReservedStore { get; set; }
public int Accounts { get; set; }
public int HaAssets { get; set; }
public JetStreamApiStats Api { get; set; } = new();
}

View File

@@ -8,7 +8,18 @@ public sealed class StreamConfig
public int MaxMsgs { get; set; }
public long MaxBytes { get; set; }
public int MaxMsgsPer { get; set; }
[System.Text.Json.Serialization.JsonIgnore]
public int MaxAgeMs { get; set; }
/// <summary>
/// MaxAge in nanoseconds for JSON wire compatibility with Go server.
/// Go reference: StreamConfig.MaxAge is a time.Duration (nanoseconds in JSON).
/// </summary>
public long MaxAge
{
get => (long)MaxAgeMs * 1_000_000L;
set => MaxAgeMs = (int)(value / 1_000_000);
}
public int MaxMsgSize { get; set; }
public int MaxConsumers { get; set; }
public int DuplicateWindowMs { get; set; }

View File

@@ -774,11 +774,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
if (string.IsNullOrEmpty(filter))
return true;
if (NATS.Server.Subscriptions.SubjectMatch.IsLiteral(filter))
return string.Equals(subject, filter, StringComparison.Ordinal);
return NATS.Server.Subscriptions.SubjectMatch.MatchLiteral(subject, filter);
return NATS.Server.Subscriptions.SubjectMatch.SubjectMatchesFilter(subject, filter);
}
/// <summary>

View File

@@ -1176,9 +1176,7 @@ public sealed class MemStore : IStreamStore
{
if (string.IsNullOrEmpty(filter) || filter == ">")
return true;
if (SubjectMatch.IsLiteral(filter))
return string.Equals(subject, filter, StringComparison.Ordinal);
return SubjectMatch.MatchLiteral(subject, filter);
return SubjectMatch.SubjectMatchesFilter(subject, filter);
}
// Fill a StoreMsg from an internal Msg

View File

@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Text;
using NATS.Server.Auth;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
@@ -7,11 +8,12 @@ using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Snapshots;
using NATS.Server.JetStream.Storage;
using NATS.Server.JetStream.Validation;
using NATS.Server.Subscriptions;
namespace NATS.Server.JetStream;
public sealed class StreamManager
public sealed class StreamManager : IDisposable
{
private readonly Account? _account;
private readonly ConsumerManager? _consumerManager;
@@ -25,12 +27,52 @@ public sealed class StreamManager
private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin =
new(StringComparer.Ordinal);
private readonly StreamSnapshotService _snapshotService = new();
private readonly CancellationTokenSource _expiryTimerCts = new();
private Task? _expiryTimerTask;
public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null, ConsumerManager? consumerManager = null)
{
_metaGroup = metaGroup;
_account = account;
_consumerManager = consumerManager;
_expiryTimerTask = RunExpiryTimerAsync(_expiryTimerCts.Token);
}
public void Dispose()
{
_expiryTimerCts.Cancel();
_expiryTimerCts.Dispose();
}
/// <summary>
/// Periodically prunes expired messages from streams with MaxAge configured.
/// Go reference: stream.go — expireMsgs runs on a timer (checkMaxAge interval).
/// </summary>
private async Task RunExpiryTimerAsync(CancellationToken ct)
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
while (!ct.IsCancellationRequested)
{
var ticked = false;
try
{
ticked = await timer.WaitForNextTickAsync(ct);
}
catch (OperationCanceledException)
{
return; // Shutdown requested via Dispose — exit the timer loop
}
if (!ticked)
return;
var nowUtc = DateTime.UtcNow;
foreach (var stream in _streams.Values)
{
if (stream.Config.MaxAgeMs > 0)
PruneExpiredMessages(stream, nowUtc);
}
}
}
public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();
@@ -39,10 +81,31 @@ public sealed class StreamManager
public IReadOnlyList<string> ListNames()
=> [.. _streams.Keys.OrderBy(x => x, StringComparer.Ordinal)];
public IReadOnlyList<JetStreamStreamInfo> ListStreamInfos()
{
return _streams.OrderBy(kv => kv.Key, StringComparer.Ordinal)
.Select(kv =>
{
var state = kv.Value.Store.GetStateAsync(default).GetAwaiter().GetResult();
return new JetStreamStreamInfo
{
Config = kv.Value.Config,
State = state,
};
})
.ToList();
}
public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
{
if (string.IsNullOrWhiteSpace(config.Name))
return JetStreamApiResponse.ErrorResponse(400, "stream name required");
if (!JetStreamConfigValidator.IsValidName(config.Name))
return JetStreamApiResponse.ErrorResponse(400, "invalid stream name");
if (Encoding.UTF8.GetByteCount(config.Description) > JetStreamApiLimits.JSMaxDescriptionLen)
return JetStreamApiResponse.ErrorResponse(400, "stream description is too long");
if (!JetStreamConfigValidator.IsMetadataWithinLimit(config.Metadata))
return JetStreamApiResponse.ErrorResponse(400, "stream metadata exceeds maximum size");
var normalized = NormalizeConfig(config);
@@ -302,6 +365,8 @@ public sealed class StreamManager
if (stream == null)
return null;
if (stream.Config.MaxMsgSize > 0 && payload.Length > stream.Config.MaxMsgSize)
{
return new PubAck

View File

@@ -1,10 +1,48 @@
using System.Text;
using NATS.Server.Configuration;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Validation;
public static class JetStreamConfigValidator
{
public static bool IsValidName(string? name)
{
if (string.IsNullOrWhiteSpace(name))
return false;
// Go len(name) checks byte length, not character length.
if (Encoding.UTF8.GetByteCount(name) > JetStreamApiLimits.JSMaxNameLen)
return false;
foreach (var ch in name)
{
if (char.IsWhiteSpace(ch) || ch is '*' or '>')
return false;
}
return true;
}
public static bool IsMetadataWithinLimit(Dictionary<string, string>? metadata)
=> MetadataByteSize(metadata) <= JetStreamApiLimits.JSMaxMetadataLen;
public static int MetadataByteSize(Dictionary<string, string>? metadata)
{
if (metadata is null || metadata.Count == 0)
return 0;
var size = 0;
foreach (var (key, value) in metadata)
{
size += Encoding.UTF8.GetByteCount(key);
size += Encoding.UTF8.GetByteCount(value);
}
return size;
}
public static ValidationResult Validate(StreamConfig config)
{
if (string.IsNullOrWhiteSpace(config.Name) || config.Subjects.Count == 0)