Files
natsdotnet/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs
2026-02-23 12:11:19 -05:00

352 lines
13 KiB
C#

using System.Text.Json;
using System.Text;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Api.Handlers;
public static class StreamApiHandlers
{
// Subject prefixes for the stream API operations, aliased from JetStreamApiSubjects.
// The subject-based handlers in this class strip the matching prefix from the request
// subject (via ExtractTrailingToken) and treat the remainder as the stream name.
private const string CreatePrefix = JetStreamApiSubjects.StreamCreate;
private const string InfoPrefix = JetStreamApiSubjects.StreamInfo;
private const string UpdatePrefix = JetStreamApiSubjects.StreamUpdate;
private const string DeletePrefix = JetStreamApiSubjects.StreamDelete;
private const string PurgePrefix = JetStreamApiSubjects.StreamPurge;
private const string MessageGetPrefix = JetStreamApiSubjects.StreamMessageGet;
private const string MessageDeletePrefix = JetStreamApiSubjects.StreamMessageDelete;
private const string SnapshotPrefix = JetStreamApiSubjects.StreamSnapshot;
private const string RestorePrefix = JetStreamApiSubjects.StreamRestore;
/// <summary>
/// Handles a stream-create request.
/// </summary>
/// <param name="subject">Request subject; the text after the create prefix is the stream name.</param>
/// <param name="payload">JSON stream configuration; may be empty.</param>
/// <param name="streamManager">Manager that receives the resulting configuration.</param>
/// <returns>
/// A not-found response when no stream name can be extracted from <paramref name="subject"/>;
/// otherwise whatever <c>StreamManager.CreateOrUpdate</c> returns.
/// </returns>
public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
    if (ExtractTrailingToken(subject, CreatePrefix) is not { } nameFromSubject)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    var streamConfig = ParseConfig(payload);

    // The subject supplies the name when the payload does not.
    if (string.IsNullOrWhiteSpace(streamConfig.Name))
    {
        streamConfig.Name = nameFromSubject;
    }

    // With no subjects configured, default to "<lower-cased name>.>".
    if (streamConfig.Subjects.Count == 0)
    {
        var defaultSubject = nameFromSubject.ToLowerInvariant() + ".>";
        streamConfig.Subjects.Add(defaultSubject);
    }

    return streamManager.CreateOrUpdate(streamConfig);
}
/// <summary>
/// Handles a stream-info request.
/// </summary>
/// <param name="subject">Request subject; the text after the info prefix is the stream name.</param>
/// <param name="streamManager">Manager queried for the stream's info.</param>
/// <returns>
/// A not-found response when no stream name can be extracted from <paramref name="subject"/>;
/// otherwise whatever <c>StreamManager.GetInfo</c> returns.
/// </returns>
public static JetStreamApiResponse HandleInfo(string subject, StreamManager streamManager)
{
    if (ExtractTrailingToken(subject, InfoPrefix) is not { } requestedStream)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    return streamManager.GetInfo(requestedStream);
}
/// <summary>
/// Handles a stream-update request.
/// </summary>
/// <param name="subject">Request subject; the text after the update prefix is the stream name.</param>
/// <param name="payload">JSON stream configuration; may be empty.</param>
/// <param name="streamManager">Manager that receives the resulting configuration.</param>
/// <returns>
/// A not-found response when no stream name can be extracted from <paramref name="subject"/>;
/// otherwise whatever <c>StreamManager.CreateOrUpdate</c> returns.
/// </returns>
public static JetStreamApiResponse HandleUpdate(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
    if (ExtractTrailingToken(subject, UpdatePrefix) is not { } nameFromSubject)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    var streamConfig = ParseConfig(payload);

    // The subject supplies the name when the payload does not.
    if (string.IsNullOrWhiteSpace(streamConfig.Name))
    {
        streamConfig.Name = nameFromSubject;
    }

    // With no subjects configured, default to "<lower-cased name>.>".
    if (streamConfig.Subjects.Count == 0)
    {
        var defaultSubject = nameFromSubject.ToLowerInvariant() + ".>";
        streamConfig.Subjects.Add(defaultSubject);
    }

    return streamManager.CreateOrUpdate(streamConfig);
}
/// <summary>
/// Handles a stream-delete request.
/// </summary>
/// <param name="subject">Request subject; the text after the delete prefix is the stream name.</param>
/// <param name="streamManager">Manager asked to delete the stream.</param>
/// <returns>
/// A success response when <c>StreamManager.Delete</c> returns <c>true</c>; a not-found
/// response when it returns <c>false</c> or no stream name can be extracted.
/// </returns>
public static JetStreamApiResponse HandleDelete(string subject, StreamManager streamManager)
{
    if (ExtractTrailingToken(subject, DeletePrefix) is not { } targetStream)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    if (streamManager.Delete(targetStream))
    {
        return JetStreamApiResponse.SuccessResponse();
    }

    return JetStreamApiResponse.NotFound(subject);
}
/// <summary>
/// Handles a stream-purge request.
/// </summary>
/// <param name="subject">Request subject; the text after the purge prefix is the stream name.</param>
/// <param name="streamManager">Manager asked to purge the stream.</param>
/// <returns>
/// A success response when <c>StreamManager.Purge</c> returns <c>true</c>; a not-found
/// response when it returns <c>false</c> or no stream name can be extracted.
/// </returns>
public static JetStreamApiResponse HandlePurge(string subject, StreamManager streamManager)
{
    if (ExtractTrailingToken(subject, PurgePrefix) is not { } targetStream)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    if (streamManager.Purge(targetStream))
    {
        return JetStreamApiResponse.SuccessResponse();
    }

    return JetStreamApiResponse.NotFound(subject);
}
/// <summary>
/// Handles a stream-names request.
/// </summary>
/// <param name="streamManager">Manager whose <c>ListNames</c> result is returned.</param>
/// <returns>A response whose <c>StreamNames</c> is set to <c>streamManager.ListNames()</c>.</returns>
public static JetStreamApiResponse HandleNames(StreamManager streamManager)
    => new JetStreamApiResponse { StreamNames = streamManager.ListNames() };
/// <summary>
/// Handles a stream-list request by delegating to <see cref="HandleNames"/>,
/// so the response is identical to the names response.
/// </summary>
/// <param name="streamManager">Manager passed through to <see cref="HandleNames"/>.</param>
/// <returns>The response produced by <see cref="HandleNames"/>.</returns>
public static JetStreamApiResponse HandleList(StreamManager streamManager)
    => HandleNames(streamManager);
/// <summary>
/// Handles a request to fetch a single stored message by sequence number.
/// </summary>
/// <param name="subject">Request subject; the text after the message-get prefix is the stream name.</param>
/// <param name="payload">JSON body expected to carry a numeric <c>seq</c> property.</param>
/// <param name="streamManager">Manager used to look the message up.</param>
/// <returns>
/// Not-found when no stream name can be extracted or the manager returns no message;
/// a 400 "sequence required" error when the parsed sequence is 0; otherwise a response
/// carrying the message's sequence, subject and UTF-8 decoded payload.
/// </returns>
public static JetStreamApiResponse HandleMessageGet(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
    if (ExtractTrailingToken(subject, MessageGetPrefix) is not { } targetStream)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    // ParseSequence yields 0 for a missing or unreadable "seq".
    var requestedSeq = ParseSequence(payload);
    if (requestedSeq == 0)
    {
        return JetStreamApiResponse.ErrorResponse(400, "sequence required");
    }

    var stored = streamManager.GetMessage(targetStream, requestedSeq);
    if (stored is null)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    var body = new JetStreamStreamMessage
    {
        Sequence = stored.Sequence,
        Subject = stored.Subject,
        // The payload bytes are surfaced as text by decoding them as UTF-8.
        Payload = Encoding.UTF8.GetString(stored.Payload.Span),
    };

    return new JetStreamApiResponse { StreamMessage = body };
}
/// <summary>
/// Handles a request to delete a single stored message by sequence number.
/// </summary>
/// <param name="subject">Request subject; the text after the message-delete prefix is the stream name.</param>
/// <param name="payload">JSON body expected to carry a numeric <c>seq</c> property.</param>
/// <param name="streamManager">Manager asked to delete the message.</param>
/// <returns>
/// Not-found when no stream name can be extracted or <c>StreamManager.DeleteMessage</c>
/// returns <c>false</c>; a 400 "sequence required" error when the parsed sequence is 0;
/// otherwise a success response.
/// </returns>
public static JetStreamApiResponse HandleMessageDelete(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
    if (ExtractTrailingToken(subject, MessageDeletePrefix) is not { } targetStream)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    // ParseSequence yields 0 for a missing or unreadable "seq".
    var requestedSeq = ParseSequence(payload);
    if (requestedSeq == 0)
    {
        return JetStreamApiResponse.ErrorResponse(400, "sequence required");
    }

    if (streamManager.DeleteMessage(targetStream, requestedSeq))
    {
        return JetStreamApiResponse.SuccessResponse();
    }

    return JetStreamApiResponse.NotFound(subject);
}
/// <summary>
/// Handles a stream-snapshot request.
/// </summary>
/// <param name="subject">Request subject; the text after the snapshot prefix is the stream name.</param>
/// <param name="streamManager">Manager asked to create the snapshot.</param>
/// <returns>
/// Not-found when no stream name can be extracted or <c>StreamManager.CreateSnapshot</c>
/// returns <c>null</c>; otherwise a response carrying the snapshot encoded as base64 text.
/// </returns>
public static JetStreamApiResponse HandleSnapshot(string subject, StreamManager streamManager)
{
    if (ExtractTrailingToken(subject, SnapshotPrefix) is not { } targetStream)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    var snapshotData = streamManager.CreateSnapshot(targetStream);
    if (snapshotData is null)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    var encoded = new JetStreamSnapshot
    {
        Payload = Convert.ToBase64String(snapshotData),
    };

    return new JetStreamApiResponse { Snapshot = encoded };
}
/// <summary>
/// Handles a stream-restore request.
/// </summary>
/// <param name="subject">Request subject; the text after the restore prefix is the stream name.</param>
/// <param name="payload">Snapshot data in one of the forms accepted by <c>ParseRestorePayload</c>.</param>
/// <param name="streamManager">Manager asked to restore the snapshot.</param>
/// <returns>
/// Not-found when no stream name can be extracted or <c>StreamManager.RestoreSnapshot</c>
/// returns <c>false</c>; a 400 "snapshot payload required" error when no snapshot bytes
/// could be decoded; otherwise a success response.
/// </returns>
public static JetStreamApiResponse HandleRestore(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
    if (ExtractTrailingToken(subject, RestorePrefix) is not { } targetStream)
    {
        return JetStreamApiResponse.NotFound(subject);
    }

    if (ParseRestorePayload(payload) is not { } decodedSnapshot)
    {
        return JetStreamApiResponse.ErrorResponse(400, "snapshot payload required");
    }

    if (streamManager.RestoreSnapshot(targetStream, decodedSnapshot))
    {
        return JetStreamApiResponse.SuccessResponse();
    }

    return JetStreamApiResponse.NotFound(subject);
}
/// <summary>
/// Returns the part of <paramref name="subject"/> that follows <paramref name="prefix"/>.
/// </summary>
/// <param name="subject">Full request subject.</param>
/// <param name="prefix">Prefix the subject must start with (ordinal, case-sensitive comparison).</param>
/// <returns>
/// The remainder after the prefix with surrounding whitespace trimmed, or <c>null</c> when the
/// subject does not start with the prefix or the trimmed remainder is empty.
/// </returns>
private static string? ExtractTrailingToken(string subject, string prefix)
{
    var hasPrefix = subject.StartsWith(prefix, StringComparison.Ordinal);
    if (!hasPrefix)
    {
        return null;
    }

    var remainder = subject.Substring(prefix.Length).Trim();
    if (remainder.Length > 0)
    {
        return remainder;
    }

    return null;
}
/// <summary>
/// Builds a <see cref="StreamConfig"/> from a JSON request payload.
/// </summary>
/// <remarks>
/// Recognised properties: <c>name</c>, <c>subjects</c> (array of strings or a single string),
/// <c>max_msgs</c>, <c>max_bytes</c>, <c>max_msgs_per</c>, <c>max_age_ms</c>, <c>max_msg_size</c>,
/// <c>duplicate_window_ms</c>, <c>sealed</c>, <c>deny_delete</c>, <c>deny_purge</c>,
/// <c>allow_direct</c>, <c>discard</c> ("new"/"old"), <c>storage</c> ("file", anything else is
/// memory), <c>source</c>, <c>sources</c> (strings or objects with <c>name</c>,
/// <c>subject_transform_prefix</c>, <c>source_account</c>) and <c>replicas</c>.
/// <para>
/// The payload is untrusted input, so every read is guarded by a <c>ValueKind</c> check:
/// <c>JsonElement</c> accessors throw <see cref="InvalidOperationException"/> (which is not a
/// <see cref="JsonException"/>) when used on the wrong kind of value. Wrongly-typed string
/// values are treated like JSON <c>null</c>; wrongly-typed numbers and booleans are ignored.
/// </para>
/// </remarks>
/// <param name="payload">Raw UTF-8 JSON; may be empty.</param>
/// <returns>
/// The parsed configuration, or a default <see cref="StreamConfig"/> when the payload is empty,
/// is not valid JSON, or its root is not a JSON object.
/// </returns>
private static StreamConfig ParseConfig(ReadOnlySpan<byte> payload)
{
    if (payload.IsEmpty)
        return new StreamConfig();

    try
    {
        using var doc = JsonDocument.Parse(payload.ToArray());
        var root = doc.RootElement;

        // TryGetProperty throws on anything but an object (e.g. a bare array or number),
        // so treat such a payload like malformed JSON.
        if (root.ValueKind != JsonValueKind.Object)
            return new StreamConfig();

        var config = new StreamConfig();

        if (root.TryGetProperty("name", out var nameEl))
            config.Name = StringOrNull(nameEl) ?? string.Empty;

        if (root.TryGetProperty("subjects", out var subjectsEl))
        {
            if (subjectsEl.ValueKind == JsonValueKind.Array)
            {
                foreach (var item in subjectsEl.EnumerateArray())
                {
                    // Non-string entries are skipped rather than aborting the request.
                    var value = StringOrNull(item);
                    if (!string.IsNullOrWhiteSpace(value))
                        config.Subjects.Add(value);
                }
            }
            else if (subjectsEl.ValueKind == JsonValueKind.String)
            {
                var value = subjectsEl.GetString();
                if (!string.IsNullOrWhiteSpace(value))
                    config.Subjects.Add(value);
            }
        }

        if (TryReadInt32(root, "max_msgs", out var maxMsgs))
            config.MaxMsgs = maxMsgs;

        if (root.TryGetProperty("max_bytes", out var maxBytesEl)
            && maxBytesEl.ValueKind == JsonValueKind.Number
            && maxBytesEl.TryGetInt64(out var maxBytes))
        {
            config.MaxBytes = maxBytes;
        }

        if (TryReadInt32(root, "max_msgs_per", out var maxMsgsPer))
            config.MaxMsgsPer = maxMsgsPer;
        if (TryReadInt32(root, "max_age_ms", out var maxAgeMs))
            config.MaxAgeMs = maxAgeMs;
        if (TryReadInt32(root, "max_msg_size", out var maxMsgSize))
            config.MaxMsgSize = maxMsgSize;
        if (TryReadInt32(root, "duplicate_window_ms", out var dupWindow))
            config.DuplicateWindowMs = dupWindow;

        if (TryReadBool(root, "sealed", out var isSealed))
            config.Sealed = isSealed;
        if (TryReadBool(root, "deny_delete", out var denyDelete))
            config.DenyDelete = denyDelete;
        if (TryReadBool(root, "deny_purge", out var denyPurge))
            config.DenyPurge = denyPurge;
        if (TryReadBool(root, "allow_direct", out var allowDirect))
            config.AllowDirect = allowDirect;

        if (root.TryGetProperty("discard", out var discardEl))
        {
            // Unrecognised values leave the config's existing discard policy untouched.
            var discard = StringOrNull(discardEl);
            if (string.Equals(discard, "new", StringComparison.OrdinalIgnoreCase))
                config.Discard = DiscardPolicy.New;
            else if (string.Equals(discard, "old", StringComparison.OrdinalIgnoreCase))
                config.Discard = DiscardPolicy.Old;
        }

        if (root.TryGetProperty("storage", out var storageEl))
        {
            // Whenever the property is present, anything other than "file" selects memory storage.
            var storage = StringOrNull(storageEl);
            config.Storage = string.Equals(storage, "file", StringComparison.OrdinalIgnoreCase)
                ? StorageType.File
                : StorageType.Memory;
        }

        if (root.TryGetProperty("source", out var sourceEl))
            config.Source = StringOrNull(sourceEl);

        if (root.TryGetProperty("sources", out var sourcesEl) && sourcesEl.ValueKind == JsonValueKind.Array)
        {
            foreach (var source in sourcesEl.EnumerateArray())
            {
                if (source.ValueKind == JsonValueKind.String)
                {
                    // Short form: the entry is just the source stream name.
                    var name = source.GetString();
                    if (!string.IsNullOrWhiteSpace(name))
                        config.Sources.Add(new StreamSourceConfig { Name = name });
                }
                else if (source.ValueKind == JsonValueKind.Object &&
                         source.TryGetProperty("name", out var sourceNameEl))
                {
                    // Long form: an object with a name and optional extras.
                    var name = StringOrNull(sourceNameEl);
                    if (!string.IsNullOrWhiteSpace(name))
                    {
                        var sourceConfig = new StreamSourceConfig { Name = name };
                        if (source.TryGetProperty("subject_transform_prefix", out var prefixEl))
                            sourceConfig.SubjectTransformPrefix = StringOrNull(prefixEl);
                        if (source.TryGetProperty("source_account", out var accountEl))
                            sourceConfig.SourceAccount = StringOrNull(accountEl);
                        config.Sources.Add(sourceConfig);
                    }
                }
            }
        }

        if (TryReadInt32(root, "replicas", out var replicas))
            config.Replicas = replicas;

        return config;
    }
    catch (JsonException)
    {
        // Malformed JSON is handled the same way as an empty payload.
        return new StreamConfig();
    }

    // Returns the string value of a JSON string element, or null for any other kind
    // (GetString would throw on numbers, booleans, objects and arrays).
    static string? StringOrNull(JsonElement el)
        => el.ValueKind == JsonValueKind.String ? el.GetString() : null;

    // Reads an Int32 property; false when it is absent, not a JSON number, or out of range
    // (TryGetInt32 would throw on a non-number).
    static bool TryReadInt32(JsonElement obj, string key, out int number)
    {
        number = 0;
        return obj.TryGetProperty(key, out var prop)
            && prop.ValueKind == JsonValueKind.Number
            && prop.TryGetInt32(out number);
    }

    // Reads a boolean property; false when it is absent or not a JSON true/false literal.
    static bool TryReadBool(JsonElement obj, string key, out bool flag)
    {
        flag = false;
        if (obj.TryGetProperty(key, out var prop)
            && prop.ValueKind is JsonValueKind.True or JsonValueKind.False)
        {
            flag = prop.GetBoolean();
            return true;
        }

        return false;
    }
}
/// <summary>
/// Reads the numeric <c>seq</c> property from a JSON request payload.
/// </summary>
/// <remarks>
/// The payload is untrusted input. <c>TryGetProperty</c> throws
/// <see cref="InvalidOperationException"/> on a non-object root and <c>TryGetUInt64</c> throws it
/// on a non-number value, so both are guarded by a <c>ValueKind</c> check to keep such payloads
/// on the "no sequence" path instead of escaping as an unhandled exception.
/// </remarks>
/// <param name="payload">Raw UTF-8 JSON; may be empty.</param>
/// <returns>
/// The sequence number, or 0 when the payload is empty, is not valid JSON, is not a JSON object,
/// or has no <c>seq</c> property that is a number representable as <see cref="ulong"/>.
/// </returns>
private static ulong ParseSequence(ReadOnlySpan<byte> payload)
{
    if (payload.IsEmpty)
        return 0;

    try
    {
        using var doc = JsonDocument.Parse(payload.ToArray());
        var root = doc.RootElement;

        if (root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("seq", out var seqEl)
            && seqEl.ValueKind == JsonValueKind.Number
            && seqEl.TryGetUInt64(out var sequence))
        {
            return sequence;
        }
    }
    catch (JsonException)
    {
        // Malformed JSON is reported the same way as a missing sequence.
    }

    return 0;
}
/// <summary>
/// Decodes the snapshot bytes carried by a restore request.
/// </summary>
/// <remarks>
/// Two payload forms are accepted, tried in this order:
/// <list type="number">
/// <item><description>the whole payload is base64 text;</description></item>
/// <item><description>a JSON object whose <c>payload</c> property is a base64 string.</description></item>
/// </list>
/// The payload is untrusted input, so a non-object root, a non-string <c>payload</c> property
/// and invalid base64 inside the JSON form all yield <c>null</c> rather than letting
/// <see cref="InvalidOperationException"/> or <see cref="FormatException"/> escape.
/// </remarks>
/// <param name="payload">Raw request bytes, interpreted as UTF-8 text.</param>
/// <returns>The decoded snapshot bytes, or <c>null</c> when none could be decoded.</returns>
private static byte[]? ParseRestorePayload(ReadOnlySpan<byte> payload)
{
    if (payload.IsEmpty)
        return null;

    var raw = Encoding.UTF8.GetString(payload).Trim();
    if (raw.Length == 0)
        return null;

    try
    {
        return Convert.FromBase64String(raw);
    }
    catch (FormatException)
    {
        // Not bare base64; fall through and try the JSON envelope form.
    }

    try
    {
        using var doc = JsonDocument.Parse(payload.ToArray());
        var root = doc.RootElement;

        // TryGetProperty throws on a non-object root and GetString throws on a
        // non-string/non-null value, so check the kinds first.
        if (root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("payload", out var payloadEl)
            && payloadEl.ValueKind == JsonValueKind.String)
        {
            var base64 = payloadEl.GetString();
            if (!string.IsNullOrWhiteSpace(base64))
                return Convert.FromBase64String(base64);
        }
    }
    catch (JsonException)
    {
        // Neither base64 nor valid JSON.
    }
    catch (FormatException)
    {
        // Valid JSON envelope, but its "payload" value is not valid base64.
    }

    return null;
}
}