feat: complete remaining jetstream parity implementation plan

This commit is contained in:
Joseph Doherty
2026-02-23 10:16:16 -05:00
parent c7bbf45c8f
commit f46b331921
59 changed files with 1734 additions and 54 deletions

View File

@@ -0,0 +1,16 @@
namespace NATS.Server.JetStream.Api.Handlers;
public static class AccountApiHandlers
{
public static JetStreamApiResponse HandleInfo(StreamManager streams, ConsumerManager consumers)
{
return new JetStreamApiResponse
{
AccountInfo = new JetStreamAccountInfo
{
Streams = streams.StreamNames.Count,
Consumers = consumers.ConsumerCount,
},
};
}
}

View File

@@ -0,0 +1,23 @@
namespace NATS.Server.JetStream.Api.Handlers;
public static class ClusterControlApiHandlers
{
public static JetStreamApiResponse HandleMetaLeaderStepdown(JetStream.Cluster.JetStreamMetaGroup meta)
{
meta.StepDown();
return JetStreamApiResponse.SuccessResponse();
}
public static JetStreamApiResponse HandleStreamLeaderStepdown(string subject, StreamManager streams)
{
if (!subject.StartsWith(JetStreamApiSubjects.StreamLeaderStepdown, StringComparison.Ordinal))
return JetStreamApiResponse.NotFound(subject);
var stream = subject[JetStreamApiSubjects.StreamLeaderStepdown.Length..].Trim();
if (stream.Length == 0)
return JetStreamApiResponse.NotFound(subject);
streams.StepDownStreamLeaderAsync(stream, default).GetAwaiter().GetResult();
return JetStreamApiResponse.SuccessResponse();
}
}

View File

@@ -1,3 +1,4 @@
using System.Text;
using System.Text.Json;
using NATS.Server.JetStream.Models;
@@ -5,8 +6,15 @@ namespace NATS.Server.JetStream.Api.Handlers;
public static class ConsumerApiHandlers
{
private const string CreatePrefix = "$JS.API.CONSUMER.CREATE.";
private const string InfoPrefix = "$JS.API.CONSUMER.INFO.";
private const string CreatePrefix = JetStreamApiSubjects.ConsumerCreate;
private const string InfoPrefix = JetStreamApiSubjects.ConsumerInfo;
private const string NamesPrefix = JetStreamApiSubjects.ConsumerNames;
private const string ListPrefix = JetStreamApiSubjects.ConsumerList;
private const string DeletePrefix = JetStreamApiSubjects.ConsumerDelete;
private const string PausePrefix = JetStreamApiSubjects.ConsumerPause;
private const string ResetPrefix = JetStreamApiSubjects.ConsumerReset;
private const string UnpinPrefix = JetStreamApiSubjects.ConsumerUnpin;
private const string NextPrefix = JetStreamApiSubjects.ConsumerNext;
public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan<byte> payload, ConsumerManager consumerManager)
{
@@ -32,6 +40,104 @@ public static class ConsumerApiHandlers
return consumerManager.GetInfo(stream, durableName);
}
public static JetStreamApiResponse HandleDelete(string subject, ConsumerManager consumerManager)
{
var parsed = ParseSubject(subject, DeletePrefix);
if (parsed == null)
return JetStreamApiResponse.NotFound(subject);
var (stream, durableName) = parsed.Value;
return consumerManager.Delete(stream, durableName)
? JetStreamApiResponse.SuccessResponse()
: JetStreamApiResponse.NotFound(subject);
}
public static JetStreamApiResponse HandleNames(string subject, ConsumerManager consumerManager)
{
var stream = ParseStreamSubject(subject, NamesPrefix);
if (stream == null)
return JetStreamApiResponse.NotFound(subject);
return new JetStreamApiResponse
{
ConsumerNames = consumerManager.ListNames(stream),
};
}
public static JetStreamApiResponse HandleList(string subject, ConsumerManager consumerManager)
{
var stream = ParseStreamSubject(subject, ListPrefix);
if (stream == null)
return JetStreamApiResponse.NotFound(subject);
return new JetStreamApiResponse
{
ConsumerNames = consumerManager.ListNames(stream),
};
}
public static JetStreamApiResponse HandlePause(string subject, ReadOnlySpan<byte> payload, ConsumerManager consumerManager)
{
var parsed = ParseSubject(subject, PausePrefix);
if (parsed == null)
return JetStreamApiResponse.NotFound(subject);
var (stream, durableName) = parsed.Value;
var paused = ParsePause(payload);
return consumerManager.Pause(stream, durableName, paused)
? JetStreamApiResponse.SuccessResponse()
: JetStreamApiResponse.NotFound(subject);
}
public static JetStreamApiResponse HandleReset(string subject, ConsumerManager consumerManager)
{
var parsed = ParseSubject(subject, ResetPrefix);
if (parsed == null)
return JetStreamApiResponse.NotFound(subject);
var (stream, durableName) = parsed.Value;
return consumerManager.Reset(stream, durableName)
? JetStreamApiResponse.SuccessResponse()
: JetStreamApiResponse.NotFound(subject);
}
public static JetStreamApiResponse HandleUnpin(string subject, ConsumerManager consumerManager)
{
var parsed = ParseSubject(subject, UnpinPrefix);
if (parsed == null)
return JetStreamApiResponse.NotFound(subject);
var (stream, durableName) = parsed.Value;
return consumerManager.Unpin(stream, durableName)
? JetStreamApiResponse.SuccessResponse()
: JetStreamApiResponse.NotFound(subject);
}
public static JetStreamApiResponse HandleNext(string subject, ReadOnlySpan<byte> payload, ConsumerManager consumerManager, StreamManager streamManager)
{
var parsed = ParseSubject(subject, NextPrefix);
if (parsed == null)
return JetStreamApiResponse.NotFound(subject);
var (stream, durableName) = parsed.Value;
var batch = ParseBatch(payload);
var pullBatch = consumerManager.FetchAsync(stream, durableName, batch, streamManager, default).GetAwaiter().GetResult();
return new JetStreamApiResponse
{
PullBatch = new JetStreamPullBatch
{
Messages = pullBatch.Messages
.Select(m => new JetStreamDirectMessage
{
Sequence = m.Sequence,
Subject = m.Subject,
Payload = Encoding.UTF8.GetString(m.Payload.Span),
})
.ToArray(),
},
};
}
private static (string Stream, string Durable)? ParseSubject(string subject, string prefix)
{
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
@@ -76,6 +182,8 @@ public static class ConsumerApiHandlers
var ackPolicy = ackPolicyEl.GetString();
if (string.Equals(ackPolicy, "explicit", StringComparison.OrdinalIgnoreCase))
config.AckPolicy = AckPolicy.Explicit;
else if (string.Equals(ackPolicy, "all", StringComparison.OrdinalIgnoreCase))
config.AckPolicy = AckPolicy.All;
}
return config;
@@ -85,4 +193,49 @@ public static class ConsumerApiHandlers
return new ConsumerConfig();
}
}
private static int ParseBatch(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return 1;
try
{
using var doc = JsonDocument.Parse(payload.ToArray());
if (doc.RootElement.TryGetProperty("batch", out var batchEl) && batchEl.TryGetInt32(out var batch))
return Math.Max(batch, 1);
}
catch (JsonException)
{
}
return 1;
}
private static bool ParsePause(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return false;
try
{
using var doc = JsonDocument.Parse(payload.ToArray());
if (doc.RootElement.TryGetProperty("pause", out var pauseEl))
return pauseEl.ValueKind == JsonValueKind.True;
}
catch (JsonException)
{
}
return false;
}
private static string? ParseStreamSubject(string subject, string prefix)
{
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
return null;
var stream = subject[prefix.Length..].Trim();
return stream.Length == 0 ? null : stream;
}
}

View File

@@ -0,0 +1,61 @@
using System.Text;
using System.Text.Json;
namespace NATS.Server.JetStream.Api.Handlers;
public static class DirectApiHandlers
{
private const string Prefix = JetStreamApiSubjects.DirectGet;
public static JetStreamApiResponse HandleGet(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, Prefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
var sequence = ParseSequence(payload);
if (sequence == 0)
return JetStreamApiResponse.ErrorResponse(400, "sequence required");
var message = streamManager.GetMessage(streamName, sequence);
if (message == null)
return JetStreamApiResponse.NotFound(subject);
return new JetStreamApiResponse
{
DirectMessage = new JetStreamDirectMessage
{
Sequence = message.Sequence,
Subject = message.Subject,
Payload = Encoding.UTF8.GetString(message.Payload.Span),
},
};
}
private static string? ExtractTrailingToken(string subject, string prefix)
{
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
return null;
var token = subject[prefix.Length..].Trim();
return token.Length == 0 ? null : token;
}
private static ulong ParseSequence(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return 0;
try
{
using var doc = JsonDocument.Parse(payload.ToArray());
if (doc.RootElement.TryGetProperty("seq", out var seqEl) && seqEl.TryGetUInt64(out var sequence))
return sequence;
}
catch (JsonException)
{
}
return 0;
}
}

View File

@@ -1,12 +1,20 @@
using System.Text.Json;
using System.Text;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Api.Handlers;
public static class StreamApiHandlers
{
private const string CreatePrefix = "$JS.API.STREAM.CREATE.";
private const string InfoPrefix = "$JS.API.STREAM.INFO.";
private const string CreatePrefix = JetStreamApiSubjects.StreamCreate;
private const string InfoPrefix = JetStreamApiSubjects.StreamInfo;
private const string UpdatePrefix = JetStreamApiSubjects.StreamUpdate;
private const string DeletePrefix = JetStreamApiSubjects.StreamDelete;
private const string PurgePrefix = JetStreamApiSubjects.StreamPurge;
private const string MessageGetPrefix = JetStreamApiSubjects.StreamMessageGet;
private const string MessageDeletePrefix = JetStreamApiSubjects.StreamMessageDelete;
private const string SnapshotPrefix = JetStreamApiSubjects.StreamSnapshot;
private const string RestorePrefix = JetStreamApiSubjects.StreamRestore;
public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
@@ -33,6 +41,131 @@ public static class StreamApiHandlers
return streamManager.GetInfo(streamName);
}
public static JetStreamApiResponse HandleUpdate(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, UpdatePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
var config = ParseConfig(payload);
if (string.IsNullOrWhiteSpace(config.Name))
config.Name = streamName;
if (config.Subjects.Count == 0)
config.Subjects.Add(streamName.ToLowerInvariant() + ".>");
return streamManager.CreateOrUpdate(config);
}
public static JetStreamApiResponse HandleDelete(string subject, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, DeletePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
return streamManager.Delete(streamName)
? JetStreamApiResponse.SuccessResponse()
: JetStreamApiResponse.NotFound(subject);
}
public static JetStreamApiResponse HandlePurge(string subject, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, PurgePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
return streamManager.Purge(streamName)
? JetStreamApiResponse.SuccessResponse()
: JetStreamApiResponse.NotFound(subject);
}
public static JetStreamApiResponse HandleNames(StreamManager streamManager)
{
return new JetStreamApiResponse
{
StreamNames = streamManager.ListNames(),
};
}
public static JetStreamApiResponse HandleList(StreamManager streamManager)
{
return HandleNames(streamManager);
}
public static JetStreamApiResponse HandleMessageGet(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, MessageGetPrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
var sequence = ParseSequence(payload);
if (sequence == 0)
return JetStreamApiResponse.ErrorResponse(400, "sequence required");
var message = streamManager.GetMessage(streamName, sequence);
if (message == null)
return JetStreamApiResponse.NotFound(subject);
return new JetStreamApiResponse
{
StreamMessage = new JetStreamStreamMessage
{
Sequence = message.Sequence,
Subject = message.Subject,
Payload = Encoding.UTF8.GetString(message.Payload.Span),
},
};
}
public static JetStreamApiResponse HandleMessageDelete(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, MessageDeletePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
var sequence = ParseSequence(payload);
if (sequence == 0)
return JetStreamApiResponse.ErrorResponse(400, "sequence required");
return streamManager.DeleteMessage(streamName, sequence)
? JetStreamApiResponse.SuccessResponse()
: JetStreamApiResponse.NotFound(subject);
}
public static JetStreamApiResponse HandleSnapshot(string subject, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, SnapshotPrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
var snapshot = streamManager.CreateSnapshot(streamName);
if (snapshot == null)
return JetStreamApiResponse.NotFound(subject);
return new JetStreamApiResponse
{
Snapshot = new JetStreamSnapshot
{
Payload = Convert.ToBase64String(snapshot),
},
};
}
public static JetStreamApiResponse HandleRestore(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, RestorePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
var snapshotBytes = ParseRestorePayload(payload);
if (snapshotBytes == null)
return JetStreamApiResponse.ErrorResponse(400, "snapshot payload required");
return streamManager.RestoreSnapshot(streamName, snapshotBytes)
? JetStreamApiResponse.SuccessResponse()
: JetStreamApiResponse.NotFound(subject);
}
private static string? ExtractTrailingToken(string subject, string prefix)
{
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
@@ -88,4 +221,56 @@ public static class StreamApiHandlers
return new StreamConfig();
}
}
private static ulong ParseSequence(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return 0;
try
{
using var doc = JsonDocument.Parse(payload.ToArray());
if (doc.RootElement.TryGetProperty("seq", out var seqEl) && seqEl.TryGetUInt64(out var sequence))
return sequence;
}
catch (JsonException)
{
}
return 0;
}
private static byte[]? ParseRestorePayload(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return null;
var raw = Encoding.UTF8.GetString(payload).Trim();
if (raw.Length == 0)
return null;
try
{
return Convert.FromBase64String(raw);
}
catch (FormatException)
{
}
try
{
using var doc = JsonDocument.Parse(payload.ToArray());
if (doc.RootElement.TryGetProperty("payload", out var payloadEl))
{
var base64 = payloadEl.GetString();
if (!string.IsNullOrWhiteSpace(base64))
return Convert.FromBase64String(base64);
}
}
catch (JsonException)
{
}
return null;
}
}

View File

@@ -7,6 +7,14 @@ public sealed class JetStreamApiResponse
public JetStreamApiError? Error { get; init; }
public JetStreamStreamInfo? StreamInfo { get; init; }
public JetStreamConsumerInfo? ConsumerInfo { get; init; }
public JetStreamAccountInfo? AccountInfo { get; init; }
public IReadOnlyList<string>? StreamNames { get; init; }
public IReadOnlyList<string>? ConsumerNames { get; init; }
public JetStreamStreamMessage? StreamMessage { get; init; }
public JetStreamDirectMessage? DirectMessage { get; init; }
public JetStreamSnapshot? Snapshot { get; init; }
public JetStreamPullBatch? PullBatch { get; init; }
public bool Success { get; init; }
public static JetStreamApiResponse NotFound(string subject) => new()
{
@@ -19,6 +27,11 @@ public sealed class JetStreamApiResponse
public static JetStreamApiResponse Ok() => new();
public static JetStreamApiResponse SuccessResponse() => new()
{
Success = true,
};
public static JetStreamApiResponse ErrorResponse(int code, string description) => new()
{
Error = new JetStreamApiError
@@ -39,3 +52,33 @@ public sealed class JetStreamConsumerInfo
{
public required ConsumerConfig Config { get; init; }
}
public sealed class JetStreamAccountInfo
{
public int Streams { get; init; }
public int Consumers { get; init; }
}
public sealed class JetStreamStreamMessage
{
public ulong Sequence { get; init; }
public string Subject { get; init; } = string.Empty;
public string Payload { get; init; } = string.Empty;
}
public sealed class JetStreamDirectMessage
{
public ulong Sequence { get; init; }
public string Subject { get; init; } = string.Empty;
public string Payload { get; init; } = string.Empty;
}
public sealed class JetStreamSnapshot
{
public string Payload { get; init; } = string.Empty;
}
public sealed class JetStreamPullBatch
{
public IReadOnlyList<JetStreamDirectMessage> Messages { get; init; } = [];
}

View File

@@ -6,32 +6,94 @@ public sealed class JetStreamApiRouter
{
private readonly StreamManager _streamManager;
private readonly ConsumerManager _consumerManager;
private readonly JetStream.Cluster.JetStreamMetaGroup? _metaGroup;
public JetStreamApiRouter()
: this(new StreamManager(), new ConsumerManager())
: this(new StreamManager(), new ConsumerManager(), null)
{
}
public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager)
public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager, JetStream.Cluster.JetStreamMetaGroup? metaGroup = null)
{
_streamManager = streamManager;
_consumerManager = consumerManager;
_metaGroup = metaGroup;
}
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
{
if (subject.StartsWith("$JS.API.STREAM.CREATE.", StringComparison.Ordinal))
if (subject.Equals(JetStreamApiSubjects.Info, StringComparison.Ordinal))
return AccountApiHandlers.HandleInfo(_streamManager, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamCreate, StringComparison.Ordinal))
return StreamApiHandlers.HandleCreate(subject, payload, _streamManager);
if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal))
if (subject.StartsWith(JetStreamApiSubjects.StreamInfo, StringComparison.Ordinal))
return StreamApiHandlers.HandleInfo(subject, _streamManager);
if (subject.StartsWith("$JS.API.CONSUMER.CREATE.", StringComparison.Ordinal))
if (subject.Equals(JetStreamApiSubjects.StreamNames, StringComparison.Ordinal))
return StreamApiHandlers.HandleNames(_streamManager);
if (subject.Equals(JetStreamApiSubjects.StreamList, StringComparison.Ordinal))
return StreamApiHandlers.HandleList(_streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamUpdate, StringComparison.Ordinal))
return StreamApiHandlers.HandleUpdate(subject, payload, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamDelete, StringComparison.Ordinal))
return StreamApiHandlers.HandleDelete(subject, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamPurge, StringComparison.Ordinal))
return StreamApiHandlers.HandlePurge(subject, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamMessageGet, StringComparison.Ordinal))
return StreamApiHandlers.HandleMessageGet(subject, payload, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamMessageDelete, StringComparison.Ordinal))
return StreamApiHandlers.HandleMessageDelete(subject, payload, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamSnapshot, StringComparison.Ordinal))
return StreamApiHandlers.HandleSnapshot(subject, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamRestore, StringComparison.Ordinal))
return StreamApiHandlers.HandleRestore(subject, payload, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamLeaderStepdown, StringComparison.Ordinal))
return ClusterControlApiHandlers.HandleStreamLeaderStepdown(subject, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerCreate, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager);
if (subject.StartsWith("$JS.API.CONSUMER.INFO.", StringComparison.Ordinal))
if (subject.StartsWith(JetStreamApiSubjects.ConsumerInfo, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleInfo(subject, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerNames, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleNames(subject, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerList, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleList(subject, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerDelete, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleDelete(subject, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerPause, StringComparison.Ordinal))
return ConsumerApiHandlers.HandlePause(subject, payload, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerReset, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleReset(subject, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerUnpin, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleUnpin(subject, _consumerManager);
if (subject.StartsWith(JetStreamApiSubjects.ConsumerNext, StringComparison.Ordinal))
return ConsumerApiHandlers.HandleNext(subject, payload, _consumerManager, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.DirectGet, StringComparison.Ordinal))
return DirectApiHandlers.HandleGet(subject, payload, _streamManager);
if (subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal) && _metaGroup != null)
return ClusterControlApiHandlers.HandleMetaLeaderStepdown(_metaGroup);
return JetStreamApiResponse.NotFound(subject);
}
}

View File

@@ -0,0 +1,29 @@
namespace NATS.Server.JetStream.Api;
public static class JetStreamApiSubjects
{
public const string Info = "$JS.API.INFO";
public const string StreamCreate = "$JS.API.STREAM.CREATE.";
public const string StreamInfo = "$JS.API.STREAM.INFO.";
public const string StreamNames = "$JS.API.STREAM.NAMES";
public const string StreamList = "$JS.API.STREAM.LIST";
public const string StreamUpdate = "$JS.API.STREAM.UPDATE.";
public const string StreamDelete = "$JS.API.STREAM.DELETE.";
public const string StreamPurge = "$JS.API.STREAM.PURGE.";
public const string StreamMessageGet = "$JS.API.STREAM.MSG.GET.";
public const string StreamMessageDelete = "$JS.API.STREAM.MSG.DELETE.";
public const string StreamSnapshot = "$JS.API.STREAM.SNAPSHOT.";
public const string StreamRestore = "$JS.API.STREAM.RESTORE.";
public const string StreamLeaderStepdown = "$JS.API.STREAM.LEADER.STEPDOWN.";
public const string ConsumerCreate = "$JS.API.CONSUMER.CREATE.";
public const string ConsumerInfo = "$JS.API.CONSUMER.INFO.";
public const string ConsumerNames = "$JS.API.CONSUMER.NAMES.";
public const string ConsumerList = "$JS.API.CONSUMER.LIST.";
public const string ConsumerDelete = "$JS.API.CONSUMER.DELETE.";
public const string ConsumerPause = "$JS.API.CONSUMER.PAUSE.";
public const string ConsumerReset = "$JS.API.CONSUMER.RESET.";
public const string ConsumerUnpin = "$JS.API.CONSUMER.UNPIN.";
public const string ConsumerNext = "$JS.API.CONSUMER.MSG.NEXT.";
public const string DirectGet = "$JS.API.DIRECT.GET.";
public const string MetaLeaderStepdown = "$JS.API.META.LEADER.STEPDOWN";
}

View File

@@ -27,6 +27,12 @@ public sealed class JetStreamMetaGroup
ClusterSize = _nodes,
};
}
public void StepDown()
{
// Placeholder for parity API behavior; current in-memory meta group
// does not track explicit leader state.
}
}
public sealed class MetaGroupState

View File

@@ -59,7 +59,46 @@ public sealed class ConsumerManager
public bool TryGet(string stream, string durableName, out ConsumerHandle handle)
=> _consumers.TryGetValue((stream, durableName), out handle!);
public bool Delete(string stream, string durableName)
{
return _consumers.TryRemove((stream, durableName), out _);
}
public IReadOnlyList<string> ListNames(string stream)
=> _consumers.Keys
.Where(k => string.Equals(k.Stream, stream, StringComparison.Ordinal))
.Select(k => k.Name)
.OrderBy(x => x, StringComparer.Ordinal)
.ToArray();
public bool Pause(string stream, string durableName, bool paused)
{
if (!_consumers.TryGetValue((stream, durableName), out var handle))
return false;
handle.Paused = paused;
return true;
}
public bool Reset(string stream, string durableName)
{
if (!_consumers.TryGetValue((stream, durableName), out var handle))
return false;
handle.NextSequence = 1;
handle.Pending.Clear();
return true;
}
public bool Unpin(string stream, string durableName)
{
return _consumers.ContainsKey((stream, durableName));
}
public async ValueTask<PullFetchBatch> FetchAsync(string stream, string durableName, int batch, StreamManager streamManager, CancellationToken ct)
=> await FetchAsync(stream, durableName, new PullFetchRequest { Batch = batch }, streamManager, ct);
public async ValueTask<PullFetchBatch> FetchAsync(string stream, string durableName, PullFetchRequest request, StreamManager streamManager, CancellationToken ct)
{
if (!_consumers.TryGetValue((stream, durableName), out var consumer))
return new PullFetchBatch([]);
@@ -67,7 +106,24 @@ public sealed class ConsumerManager
if (!streamManager.TryGet(stream, out var streamHandle))
return new PullFetchBatch([]);
return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, batch, ct);
return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, request, ct);
}
public bool AckAll(string stream, string durableName, ulong sequence)
{
if (!_consumers.TryGetValue((stream, durableName), out var handle))
return false;
handle.AckProcessor.AckAll(sequence);
return true;
}
public int GetPendingCount(string stream, string durableName)
{
if (!_consumers.TryGetValue((stream, durableName), out var handle))
return 0;
return handle.AckProcessor.PendingCount;
}
public void OnPublished(string stream, StoredMessage message)
@@ -91,6 +147,7 @@ public sealed class ConsumerManager
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
{
public ulong NextSequence { get; set; } = 1;
public bool Paused { get; set; }
public Queue<StoredMessage> Pending { get; } = new();
public Queue<PushFrame> PushFrames { get; } = new();
public AckProcessor AckProcessor { get; } = new();

View File

@@ -21,4 +21,11 @@ public sealed class AckProcessor
}
public bool HasPending => _pending.Count > 0;
public int PendingCount => _pending.Count;
public void AckAll(ulong sequence)
{
foreach (var key in _pending.Keys.Where(k => k <= sequence).ToArray())
_pending.Remove(key);
}
}

View File

@@ -6,9 +6,20 @@ namespace NATS.Server.JetStream.Consumers;
public sealed class PullConsumerEngine
{
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct)
=> await FetchAsync(stream, consumer, new PullFetchRequest { Batch = batch }, ct);
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, PullFetchRequest request, CancellationToken ct)
{
var batch = Math.Max(request.Batch, 1);
var messages = new List<StoredMessage>(batch);
if (request.NoWait)
{
var available = await stream.Store.LoadAsync(consumer.NextSequence, ct);
if (available == null)
return new PullFetchBatch([], timedOut: false);
}
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
{
var expired = consumer.AckProcessor.NextExpired();
@@ -42,7 +53,7 @@ public sealed class PullConsumerEngine
break;
messages.Add(message);
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All)
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
sequence++;
}
@@ -55,9 +66,18 @@ public sealed class PullConsumerEngine
public sealed class PullFetchBatch
{
public IReadOnlyList<StoredMessage> Messages { get; }
public bool TimedOut { get; }
public PullFetchBatch(IReadOnlyList<StoredMessage> messages)
public PullFetchBatch(IReadOnlyList<StoredMessage> messages, bool timedOut = false)
{
Messages = messages;
TimedOut = timedOut;
}
}
public sealed class PullFetchRequest
{
public int Batch { get; init; } = 1;
public bool NoWait { get; init; }
public int ExpiresMs { get; init; }
}

View File

@@ -13,7 +13,7 @@ public sealed class PushConsumerEngine
Message = message,
});
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All)
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
if (consumer.Config.HeartbeatMs > 0)

View File

@@ -5,6 +5,8 @@ public sealed class ConsumerConfig
public string DurableName { get; set; } = string.Empty;
public string? FilterSubject { get; set; }
public AckPolicy AckPolicy { get; set; } = AckPolicy.None;
public DeliverPolicy DeliverPolicy { get; set; } = DeliverPolicy.All;
public ReplayPolicy ReplayPolicy { get; set; } = ReplayPolicy.Instant;
public int AckWaitMs { get; set; } = 30_000;
public int MaxDeliver { get; set; } = 1;
public bool Push { get; set; }
@@ -15,4 +17,5 @@ public enum AckPolicy
{
None,
Explicit,
All,
}

View File

@@ -0,0 +1,30 @@
namespace NATS.Server.JetStream.Models;
public enum RetentionPolicy
{
Limits,
Interest,
WorkQueue,
}
public enum DiscardPolicy
{
Old,
New,
}
public enum DeliverPolicy
{
All,
Last,
New,
ByStartSequence,
ByStartTime,
LastPerSubject,
}
public enum ReplayPolicy
{
Instant,
Original,
}

View File

@@ -5,6 +5,9 @@ public sealed class StreamConfig
public string Name { get; set; } = string.Empty;
public List<string> Subjects { get; set; } = [];
public int MaxMsgs { get; set; }
public int MaxConsumers { get; set; }
public RetentionPolicy Retention { get; set; } = RetentionPolicy.Limits;
public DiscardPolicy Discard { get; set; } = DiscardPolicy.Old;
public int Replicas { get; set; } = 1;
public string? Mirror { get; set; }
public string? Source { get; set; }

View File

@@ -11,11 +11,30 @@ public sealed class JetStreamPublisher
}
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
=> TryCapture(subject, payload, null, out ack);
=> TryCaptureWithOptions(subject, payload, new PublishOptions(), out ack);
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, string? msgId, out PubAck ack)
=> TryCaptureWithOptions(subject, payload, new PublishOptions { MsgId = msgId }, out ack);
public bool TryCaptureWithOptions(string subject, ReadOnlyMemory<byte> payload, PublishOptions options, out PubAck ack)
{
if (_preconditions.IsDuplicate(msgId, out var existingSequence))
if (_streamManager.FindBySubject(subject) is not { } stream)
{
ack = new PubAck();
return false;
}
var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, state.LastSeq))
{
ack = new PubAck
{
ErrorCode = 10071,
};
return true;
}
if (_preconditions.IsDuplicate(options.MsgId, out var existingSequence))
{
ack = new PubAck
{
@@ -26,14 +45,8 @@ public sealed class JetStreamPublisher
}
var captured = _streamManager.Capture(subject, payload);
if (captured == null)
{
ack = new PubAck();
return false;
}
ack = captured;
_preconditions.Record(msgId, ack.Seq);
ack = captured ?? new PubAck();
_preconditions.Record(options.MsgId, ack.Seq);
return true;
}
}

View File

@@ -0,0 +1,7 @@
namespace NATS.Server.JetStream.Publish;
public sealed class PublishOptions
{
public string? MsgId { get; init; }
public ulong ExpectedLastSeq { get; init; }
}

View File

@@ -22,4 +22,7 @@ public sealed class PublishPreconditions
_dedupe[msgId] = sequence;
}
public bool CheckExpectedLastSeq(ulong expectedLastSeq, ulong actualLastSeq)
=> expectedLastSeq == 0 || expectedLastSeq == actualLastSeq;
}

View File

@@ -0,0 +1,10 @@
namespace NATS.Server.JetStream.Snapshots;
public sealed class StreamSnapshotService
{
public ValueTask<byte[]> SnapshotAsync(StreamHandle stream, CancellationToken ct)
=> stream.Store.CreateSnapshotAsync(ct);
public ValueTask RestoreAsync(StreamHandle stream, ReadOnlyMemory<byte> snapshot, CancellationToken ct)
=> stream.Store.RestoreSnapshotAsync(snapshot, ct);
}

View File

@@ -43,6 +43,23 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return ValueTask.FromResult(msg);
}
public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
{
var match = _messages.Values
.Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal))
.OrderByDescending(m => m.Sequence)
.FirstOrDefault();
return ValueTask.FromResult(match);
}
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
{
var removed = _messages.Remove(sequence);
if (removed)
RewriteDataFile();
return ValueTask.FromResult(removed);
}
public ValueTask PurgeAsync(CancellationToken ct)
{
_messages.Clear();
@@ -52,6 +69,49 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return ValueTask.CompletedTask;
}
public ValueTask<byte[]> CreateSnapshotAsync(CancellationToken ct)
{
var snapshot = _messages
.Values
.OrderBy(x => x.Sequence)
.Select(x => new FileRecord
{
Sequence = x.Sequence,
Subject = x.Subject,
PayloadBase64 = Convert.ToBase64String(x.Payload.ToArray()),
})
.ToArray();
return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot));
}
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct)
{
_messages.Clear();
_last = 0;
if (!snapshot.IsEmpty)
{
var records = JsonSerializer.Deserialize<FileRecord[]>(snapshot.Span);
if (records != null)
{
foreach (var record in records)
{
var message = new StoredMessage
{
Sequence = record.Sequence,
Subject = record.Subject ?? string.Empty,
Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty),
};
_messages[record.Sequence] = message;
_last = Math.Max(_last, record.Sequence);
}
}
}
RewriteDataFile();
return ValueTask.CompletedTask;
}
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
{
return ValueTask.FromResult(new StreamState

View File

@@ -6,6 +6,10 @@ public interface IStreamStore
{
ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct);
ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct);
ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct);
ValueTask PurgeAsync(CancellationToken ct);
ValueTask<byte[]> CreateSnapshotAsync(CancellationToken ct);
ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct);
ValueTask<StreamState> GetStateAsync(CancellationToken ct);
}

View File

@@ -1,9 +1,17 @@
using System.Text.Json;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Storage;
public sealed class MemStore : IStreamStore
{
private sealed class SnapshotRecord
{
public ulong Sequence { get; init; }
public string Subject { get; init; } = string.Empty;
public string PayloadBase64 { get; init; } = string.Empty;
}
private readonly object _gate = new();
private ulong _last;
private readonly Dictionary<ulong, StoredMessage> _messages = new();
@@ -32,6 +40,26 @@ public sealed class MemStore : IStreamStore
}
}
public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
{
lock (_gate)
{
var match = _messages.Values
.Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal))
.OrderByDescending(m => m.Sequence)
.FirstOrDefault();
return ValueTask.FromResult(match);
}
}
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
{
lock (_gate)
{
return ValueTask.FromResult(_messages.Remove(sequence));
}
}
public ValueTask PurgeAsync(CancellationToken ct)
{
lock (_gate)
@@ -42,6 +70,53 @@ public sealed class MemStore : IStreamStore
}
}
public ValueTask<byte[]> CreateSnapshotAsync(CancellationToken ct)
{
lock (_gate)
{
var snapshot = _messages
.Values
.OrderBy(x => x.Sequence)
.Select(x => new SnapshotRecord
{
Sequence = x.Sequence,
Subject = x.Subject,
PayloadBase64 = Convert.ToBase64String(x.Payload.ToArray()),
})
.ToArray();
return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot));
}
}
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct)
{
lock (_gate)
{
_messages.Clear();
_last = 0;
if (!snapshot.IsEmpty)
{
var records = JsonSerializer.Deserialize<SnapshotRecord[]>(snapshot.Span);
if (records != null)
{
foreach (var record in records)
{
_messages[record.Sequence] = new StoredMessage
{
Sequence = record.Sequence,
Subject = record.Subject,
Payload = Convert.FromBase64String(record.PayloadBase64),
};
_last = Math.Max(_last, record.Sequence);
}
}
}
return ValueTask.CompletedTask;
}
}
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
{
lock (_gate)

View File

@@ -5,6 +5,7 @@ using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Snapshots;
using NATS.Server.JetStream.Storage;
using NATS.Server.Subscriptions;
@@ -22,6 +23,7 @@ public sealed class StreamManager
new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin =
new(StringComparer.Ordinal);
private readonly StreamSnapshotService _snapshotService = new();
public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null)
{
@@ -31,6 +33,9 @@ public sealed class StreamManager
public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();
public IReadOnlyList<string> ListNames()
=> [.. _streams.Keys.OrderBy(x => x, StringComparer.Ordinal)];
public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
{
if (string.IsNullOrWhiteSpace(config.Name))
@@ -67,6 +72,59 @@ public sealed class StreamManager
public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);
public bool Delete(string name)
{
if (!_streams.TryRemove(name, out _))
return false;
_replicaGroups.TryRemove(name, out _);
_account?.ReleaseStream();
RebuildReplicationCoordinators();
return true;
}
public bool Purge(string name)
{
if (!_streams.TryGetValue(name, out var stream))
return false;
stream.Store.PurgeAsync(default).GetAwaiter().GetResult();
return true;
}
public StoredMessage? GetMessage(string name, ulong sequence)
{
if (!_streams.TryGetValue(name, out var stream))
return null;
return stream.Store.LoadAsync(sequence, default).GetAwaiter().GetResult();
}
public bool DeleteMessage(string name, ulong sequence)
{
if (!_streams.TryGetValue(name, out var stream))
return false;
return stream.Store.RemoveAsync(sequence, default).GetAwaiter().GetResult();
}
public byte[]? CreateSnapshot(string name)
{
if (!_streams.TryGetValue(name, out var stream))
return null;
return _snapshotService.SnapshotAsync(stream, default).GetAwaiter().GetResult();
}
public bool RestoreSnapshot(string name, ReadOnlyMemory<byte> snapshot)
{
if (!_streams.TryGetValue(name, out var stream))
return false;
_snapshotService.RestoreAsync(stream, snapshot, default).GetAwaiter().GetResult();
return true;
}
public ValueTask<StreamState> GetStateAsync(string name, CancellationToken ct)
{
if (_streams.TryGetValue(name, out var stream))
@@ -123,6 +181,9 @@ public sealed class StreamManager
Name = config.Name,
Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects],
MaxMsgs = config.MaxMsgs,
MaxConsumers = config.MaxConsumers,
Retention = config.Retention,
Discard = config.Discard,
Replicas = config.Replicas,
Mirror = config.Mirror,
Source = config.Source,

View File

@@ -5,9 +5,15 @@ namespace NATS.Server.JetStream.Validation;
public static class JetStreamConfigValidator
{
public static ValidationResult Validate(StreamConfig config)
=> string.IsNullOrWhiteSpace(config.Name) || config.Subjects.Count == 0
? ValidationResult.Invalid("name/subjects required")
: ValidationResult.Valid();
{
if (string.IsNullOrWhiteSpace(config.Name) || config.Subjects.Count == 0)
return ValidationResult.Invalid("name/subjects required");
if (config.Retention == RetentionPolicy.WorkQueue && config.MaxConsumers == 0)
return ValidationResult.Invalid("workqueue retention requires max consumers > 0");
return ValidationResult.Valid();
}
}
public sealed class ValidationResult

View File

@@ -24,6 +24,8 @@ public sealed class JszHandler
Storage = 0,
Streams = _server.JetStreamStreams,
Consumers = _server.JetStreamConsumers,
ApiTotal = (ulong)Math.Max(Interlocked.Read(ref _server.Stats.JetStreamApiTotal), 0),
ApiErrors = (ulong)Math.Max(Interlocked.Read(ref _server.Stats.JetStreamApiErrors), 0),
Config = new JetStreamConfig
{
MaxMemory = _options.JetStream?.MaxMemoryStore ?? 0,
@@ -57,6 +59,12 @@ public sealed class JszResponse
[JsonPropertyName("consumers")]
public int Consumers { get; set; }
[JsonPropertyName("api_total")]
public ulong ApiTotal { get; set; }
[JsonPropertyName("api_errors")]
public ulong ApiErrors { get; set; }
[JsonPropertyName("config")]
public JetStreamConfig Config { get; set; } = new();
}

View File

@@ -136,6 +136,11 @@ public sealed class VarzHandler : IDisposable
HaAssets = _server.JetStreamStreams,
Streams = _server.JetStreamStreams,
Consumers = _server.JetStreamConsumers,
Api = new JetStreamApiStats
{
Total = (ulong)Math.Max(Interlocked.Read(ref stats.JetStreamApiTotal), 0),
Errors = (ulong)Math.Max(Interlocked.Read(ref stats.JetStreamApiErrors), 0),
},
},
},
};

View File

@@ -5,6 +5,7 @@ using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using NATS.NKeys;
using NATS.Server.Auth;
@@ -805,6 +806,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
ReadOnlyMemory<byte> payload, NatsClient sender)
{
if (replyTo != null
&& subject.StartsWith("$JS.API", StringComparison.Ordinal)
&& _jetStreamApiRouter != null)
{
var response = _jetStreamApiRouter.Route(subject, payload.Span);
Interlocked.Increment(ref _stats.JetStreamApiTotal);
if (response.Error != null)
Interlocked.Increment(ref _stats.JetStreamApiErrors);
var data = JsonSerializer.SerializeToUtf8Bytes(response);
ProcessMessage(replyTo, null, default, data, sender);
return;
}
if (TryCaptureJetStreamPublish(subject, payload, out var pubAck))
sender.RecordJetStreamPubAck(pubAck);

View File

@@ -79,6 +79,16 @@ public sealed class RaftNode
Log.AppendReplicated(entry);
}
public Task TryAppendFromLeaderAsync(RaftLogEntry entry, CancellationToken ct)
{
_ = ct;
if (entry.Term < TermState.CurrentTerm)
throw new InvalidOperationException("stale term append rejected");
ReceiveReplicatedEntry(entry);
return Task.CompletedTask;
}
public async Task<RaftSnapshot> CreateSnapshotAsync(CancellationToken ct)
{
var snapshot = new RaftSnapshot

View File

@@ -24,5 +24,7 @@ public sealed class ServerStats
public long StaleConnectionLeafs;
public long StaleConnectionGateways;
public bool JetStreamEnabled;
public long JetStreamApiTotal;
public long JetStreamApiErrors;
public readonly ConcurrentDictionary<string, long> HttpReqStats = new();
}