feat: add jetstream stream lifecycle api
This commit is contained in:
91
src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs
Normal file
91
src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
using System.Text.Json;
|
||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream.Api.Handlers;
|
||||||
|
|
||||||
|
public static class StreamApiHandlers
|
||||||
|
{
|
||||||
|
private const string CreatePrefix = "$JS.API.STREAM.CREATE.";
|
||||||
|
private const string InfoPrefix = "$JS.API.STREAM.INFO.";
|
||||||
|
|
||||||
|
public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
|
||||||
|
{
|
||||||
|
var streamName = ExtractTrailingToken(subject, CreatePrefix);
|
||||||
|
if (streamName == null)
|
||||||
|
return JetStreamApiResponse.NotFound(subject);
|
||||||
|
|
||||||
|
var config = ParseConfig(payload);
|
||||||
|
if (string.IsNullOrWhiteSpace(config.Name))
|
||||||
|
config.Name = streamName;
|
||||||
|
|
||||||
|
if (config.Subjects.Count == 0)
|
||||||
|
config.Subjects.Add(streamName.ToLowerInvariant() + ".>");
|
||||||
|
|
||||||
|
return streamManager.CreateOrUpdate(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JetStreamApiResponse HandleInfo(string subject, StreamManager streamManager)
|
||||||
|
{
|
||||||
|
var streamName = ExtractTrailingToken(subject, InfoPrefix);
|
||||||
|
if (streamName == null)
|
||||||
|
return JetStreamApiResponse.NotFound(subject);
|
||||||
|
|
||||||
|
return streamManager.GetInfo(streamName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string? ExtractTrailingToken(string subject, string prefix)
|
||||||
|
{
|
||||||
|
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
var token = subject[prefix.Length..].Trim();
|
||||||
|
return token.Length == 0 ? null : token;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StreamConfig ParseConfig(ReadOnlySpan<byte> payload)
|
||||||
|
{
|
||||||
|
if (payload.IsEmpty)
|
||||||
|
return new StreamConfig();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var doc = JsonDocument.Parse(payload.ToArray());
|
||||||
|
var root = doc.RootElement;
|
||||||
|
var config = new StreamConfig();
|
||||||
|
|
||||||
|
if (root.TryGetProperty("name", out var nameEl))
|
||||||
|
config.Name = nameEl.GetString() ?? string.Empty;
|
||||||
|
|
||||||
|
if (root.TryGetProperty("subjects", out var subjectsEl))
|
||||||
|
{
|
||||||
|
if (subjectsEl.ValueKind == JsonValueKind.Array)
|
||||||
|
{
|
||||||
|
foreach (var item in subjectsEl.EnumerateArray())
|
||||||
|
{
|
||||||
|
var value = item.GetString();
|
||||||
|
if (!string.IsNullOrWhiteSpace(value))
|
||||||
|
config.Subjects.Add(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (subjectsEl.ValueKind == JsonValueKind.String)
|
||||||
|
{
|
||||||
|
var value = subjectsEl.GetString();
|
||||||
|
if (!string.IsNullOrWhiteSpace(value))
|
||||||
|
config.Subjects.Add(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (root.TryGetProperty("max_msgs", out var maxMsgsEl) && maxMsgsEl.TryGetInt32(out var maxMsgs))
|
||||||
|
config.MaxMsgs = maxMsgs;
|
||||||
|
|
||||||
|
if (root.TryGetProperty("replicas", out var replicasEl) && replicasEl.TryGetInt32(out var replicas))
|
||||||
|
config.Replicas = replicas;
|
||||||
|
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
catch (JsonException)
|
||||||
|
{
|
||||||
|
return new StreamConfig();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,3 +1,5 @@
|
|||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
|
||||||
namespace NATS.Server.JetStream.Api;
|
namespace NATS.Server.JetStream.Api;
|
||||||
|
|
||||||
public sealed class JetStreamApiResponse
|
public sealed class JetStreamApiResponse
|
||||||
@@ -16,31 +18,24 @@ public sealed class JetStreamApiResponse
|
|||||||
};
|
};
|
||||||
|
|
||||||
public static JetStreamApiResponse Ok() => new();
|
public static JetStreamApiResponse Ok() => new();
|
||||||
|
|
||||||
|
public static JetStreamApiResponse ErrorResponse(int code, string description) => new()
|
||||||
|
{
|
||||||
|
Error = new JetStreamApiError
|
||||||
|
{
|
||||||
|
Code = code,
|
||||||
|
Description = description,
|
||||||
|
},
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed class JetStreamStreamInfo
|
public sealed class JetStreamStreamInfo
|
||||||
{
|
{
|
||||||
public required JetStreamStreamConfig Config { get; init; }
|
public required StreamConfig Config { get; init; }
|
||||||
public required JetStreamStreamState State { get; init; }
|
public required StreamState State { get; init; }
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed class JetStreamConsumerInfo
|
public sealed class JetStreamConsumerInfo
|
||||||
{
|
{
|
||||||
public required JetStreamConsumerConfig Config { get; init; }
|
public required ConsumerConfig Config { get; init; }
|
||||||
}
|
|
||||||
|
|
||||||
public sealed class JetStreamStreamConfig
|
|
||||||
{
|
|
||||||
public string Name { get; init; } = string.Empty;
|
|
||||||
}
|
|
||||||
|
|
||||||
public sealed class JetStreamStreamState
|
|
||||||
{
|
|
||||||
public ulong Messages { get; init; }
|
|
||||||
public ulong FirstSeq { get; init; }
|
|
||||||
}
|
|
||||||
|
|
||||||
public sealed class JetStreamConsumerConfig
|
|
||||||
{
|
|
||||||
public string DurableName { get; init; } = string.Empty;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,29 @@
|
|||||||
|
using NATS.Server.JetStream.Api.Handlers;
|
||||||
|
|
||||||
namespace NATS.Server.JetStream.Api;
|
namespace NATS.Server.JetStream.Api;
|
||||||
|
|
||||||
public sealed class JetStreamApiRouter
|
public sealed class JetStreamApiRouter
|
||||||
{
|
{
|
||||||
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
|
private readonly StreamManager _streamManager;
|
||||||
=> JetStreamApiResponse.NotFound(subject);
|
|
||||||
|
public JetStreamApiRouter()
|
||||||
|
: this(new StreamManager())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public JetStreamApiRouter(StreamManager streamManager)
|
||||||
|
{
|
||||||
|
_streamManager = streamManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
|
||||||
|
{
|
||||||
|
if (subject.StartsWith("$JS.API.STREAM.CREATE.", StringComparison.Ordinal))
|
||||||
|
return StreamApiHandlers.HandleCreate(subject, payload, _streamManager);
|
||||||
|
|
||||||
|
if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal))
|
||||||
|
return StreamApiHandlers.HandleInfo(subject, _streamManager);
|
||||||
|
|
||||||
|
return JetStreamApiResponse.NotFound(subject);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
78
src/NATS.Server/JetStream/StreamManager.cs
Normal file
78
src/NATS.Server/JetStream/StreamManager.cs
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using NATS.Server.JetStream.Api;
|
||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
using NATS.Server.Subscriptions;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream;
|
||||||
|
|
||||||
|
public sealed class StreamManager
|
||||||
|
{
|
||||||
|
private readonly ConcurrentDictionary<string, StreamHandle> _streams =
|
||||||
|
new(StringComparer.Ordinal);
|
||||||
|
|
||||||
|
public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();
|
||||||
|
|
||||||
|
public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(config.Name))
|
||||||
|
return JetStreamApiResponse.ErrorResponse(400, "stream name required");
|
||||||
|
|
||||||
|
var normalized = NormalizeConfig(config);
|
||||||
|
var handle = _streams.AddOrUpdate(
|
||||||
|
normalized.Name,
|
||||||
|
_ => new StreamHandle(normalized, new MemStore()),
|
||||||
|
(_, existing) => existing with { Config = normalized });
|
||||||
|
|
||||||
|
return BuildStreamInfoResponse(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
public JetStreamApiResponse GetInfo(string name)
|
||||||
|
{
|
||||||
|
if (_streams.TryGetValue(name, out var stream))
|
||||||
|
return BuildStreamInfoResponse(stream);
|
||||||
|
|
||||||
|
return JetStreamApiResponse.NotFound($"$JS.API.STREAM.INFO.{name}");
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);
|
||||||
|
|
||||||
|
public StreamHandle? FindBySubject(string subject)
|
||||||
|
{
|
||||||
|
foreach (var stream in _streams.Values)
|
||||||
|
{
|
||||||
|
if (stream.Config.Subjects.Any(p => SubjectMatch.MatchLiteral(subject, p)))
|
||||||
|
return stream;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StreamConfig NormalizeConfig(StreamConfig config)
|
||||||
|
{
|
||||||
|
var copy = new StreamConfig
|
||||||
|
{
|
||||||
|
Name = config.Name,
|
||||||
|
Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects],
|
||||||
|
MaxMsgs = config.MaxMsgs,
|
||||||
|
Replicas = config.Replicas,
|
||||||
|
};
|
||||||
|
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
|
||||||
|
{
|
||||||
|
var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
|
||||||
|
return new JetStreamApiResponse
|
||||||
|
{
|
||||||
|
StreamInfo = new JetStreamStreamInfo
|
||||||
|
{
|
||||||
|
Config = handle.Config,
|
||||||
|
State = state,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store);
|
||||||
15
tests/NATS.Server.Tests/JetStreamStreamApiTests.cs
Normal file
15
tests/NATS.Server.Tests/JetStreamStreamApiTests.cs
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class JetStreamStreamApiTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Stream_create_and_info_roundtrip()
|
||||||
|
{
|
||||||
|
var create = await JetStreamApiFixture.RequestAsync("$JS.API.STREAM.CREATE.ORDERS", "{\"name\":\"ORDERS\",\"subjects\":[\"orders.*\"]}");
|
||||||
|
create.Error.ShouldBeNull();
|
||||||
|
|
||||||
|
var info = await JetStreamApiFixture.RequestAsync("$JS.API.STREAM.INFO.ORDERS", "{}");
|
||||||
|
info.Error.ShouldBeNull();
|
||||||
|
info.StreamInfo!.Config.Name.ShouldBe("ORDERS");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user