118 lines
6.0 KiB
C#
118 lines
6.0 KiB
C#
using NATS.Server.JetStream.Api.Handlers;
|
|
|
|
namespace NATS.Server.JetStream.Api;
|
|
|
|
/// <summary>
/// Dispatches a JetStream API request to the handler that matches its subject.
/// </summary>
/// <remarks>
/// Routes are tested top to bottom and the first match wins. Some routes require the
/// subject to equal a <see cref="JetStreamApiSubjects"/> value exactly; the others only
/// require the subject to start with one. All comparisons are ordinal.
/// </remarks>
public sealed class JetStreamApiRouter
{
    private readonly StreamManager _streamManager;
    private readonly ConsumerManager _consumerManager;
    private readonly JetStream.Cluster.JetStreamMetaGroup? _metaGroup;

    /// <summary>
    /// Creates a router backed by a new <see cref="StreamManager"/> and a new
    /// <see cref="ConsumerManager"/>, with no meta group.
    /// </summary>
    public JetStreamApiRouter()
        : this(new StreamManager(), new ConsumerManager(), null)
    {
    }

    /// <summary>
    /// Creates a router over the supplied managers.
    /// </summary>
    /// <param name="streamManager">Stream manager handed to the stream, account, consumer-next and direct-get handlers.</param>
    /// <param name="consumerManager">Consumer manager handed to the consumer and account-info handlers.</param>
    /// <param name="metaGroup">Optional meta group; when <c>null</c>, the meta leader step-down route is not served.</param>
    public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager, JetStream.Cluster.JetStreamMetaGroup? metaGroup = null)
        => (_streamManager, _consumerManager, _metaGroup) = (streamManager, consumerManager, metaGroup);

    /// <summary>
    /// Finds the first route matching <paramref name="subject"/> and returns that handler's response.
    /// </summary>
    /// <param name="subject">Request subject to route.</param>
    /// <param name="payload">Request payload, passed through untouched to handlers that accept one.</param>
    /// <returns>
    /// The matching handler's response, or <c>JetStreamApiResponse.NotFound(subject)</c> when no route matches.
    /// </returns>
    public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
    {
        // ----- Account -----
        if (IsSubject(subject, JetStreamApiSubjects.Info))
        {
            return AccountApiHandlers.HandleInfo(_streamManager, _consumerManager);
        }

        if (IsSubject(subject, JetStreamApiSubjects.ServerRemove))
        {
            return AccountControlApiHandlers.HandleServerRemove();
        }

        if (HasPrefix(subject, JetStreamApiSubjects.AccountPurge))
        {
            return AccountControlApiHandlers.HandleAccountPurge(subject);
        }

        // NOTE(review): the cancel route is tested before the plain move route; presumably the
        // two prefixes can overlap, so keep this order unless JetStreamApiSubjects says otherwise.
        if (HasPrefix(subject, JetStreamApiSubjects.AccountStreamMoveCancel))
        {
            return AccountControlApiHandlers.HandleAccountStreamMoveCancel(subject);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.AccountStreamMove))
        {
            return AccountControlApiHandlers.HandleAccountStreamMove(subject);
        }

        // ----- Streams -----
        if (HasPrefix(subject, JetStreamApiSubjects.StreamCreate))
        {
            return StreamApiHandlers.HandleCreate(subject, payload, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamInfo))
        {
            return StreamApiHandlers.HandleInfo(subject, _streamManager);
        }

        if (IsSubject(subject, JetStreamApiSubjects.StreamNames))
        {
            return StreamApiHandlers.HandleNames(_streamManager);
        }

        if (IsSubject(subject, JetStreamApiSubjects.StreamList))
        {
            return StreamApiHandlers.HandleList(_streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamUpdate))
        {
            return StreamApiHandlers.HandleUpdate(subject, payload, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamDelete))
        {
            return StreamApiHandlers.HandleDelete(subject, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamPurge))
        {
            return StreamApiHandlers.HandlePurge(subject, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamMessageGet))
        {
            return StreamApiHandlers.HandleMessageGet(subject, payload, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamMessageDelete))
        {
            return StreamApiHandlers.HandleMessageDelete(subject, payload, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamSnapshot))
        {
            return StreamApiHandlers.HandleSnapshot(subject, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamRestore))
        {
            return StreamApiHandlers.HandleRestore(subject, payload, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamLeaderStepdown))
        {
            return ClusterControlApiHandlers.HandleStreamLeaderStepdown(subject, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.StreamPeerRemove))
        {
            return ClusterControlApiHandlers.HandleStreamPeerRemove(subject);
        }

        // ----- Consumers -----
        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerCreate))
        {
            return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerInfo))
        {
            return ConsumerApiHandlers.HandleInfo(subject, _consumerManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerNames))
        {
            return ConsumerApiHandlers.HandleNames(subject, _consumerManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerList))
        {
            return ConsumerApiHandlers.HandleList(subject, _consumerManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerDelete))
        {
            return ConsumerApiHandlers.HandleDelete(subject, _consumerManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerPause))
        {
            return ConsumerApiHandlers.HandlePause(subject, payload, _consumerManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerReset))
        {
            return ConsumerApiHandlers.HandleReset(subject, _consumerManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerUnpin))
        {
            return ConsumerApiHandlers.HandleUnpin(subject, _consumerManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerNext))
        {
            return ConsumerApiHandlers.HandleNext(subject, payload, _consumerManager, _streamManager);
        }

        if (HasPrefix(subject, JetStreamApiSubjects.ConsumerLeaderStepdown))
        {
            return ClusterControlApiHandlers.HandleConsumerLeaderStepdown(subject);
        }

        // ----- Direct get -----
        if (HasPrefix(subject, JetStreamApiSubjects.DirectGet))
        {
            return DirectApiHandlers.HandleGet(subject, payload, _streamManager);
        }

        // ----- Meta group -----
        // Served only when a meta group was supplied; without one the request falls
        // through to the NotFound response below.
        if (IsSubject(subject, JetStreamApiSubjects.MetaLeaderStepdown) && _metaGroup != null)
        {
            return ClusterControlApiHandlers.HandleMetaLeaderStepdown(_metaGroup);
        }

        return JetStreamApiResponse.NotFound(subject);
    }

    /// <summary>Ordinal test that <paramref name="subject"/> equals <paramref name="expected"/> exactly.</summary>
    private static bool IsSubject(string subject, string expected)
        => subject.Equals(expected, StringComparison.Ordinal);

    /// <summary>Ordinal test that <paramref name="subject"/> starts with <paramref name="prefix"/>.</summary>
    private static bool HasPrefix(string subject, string prefix)
        => subject.StartsWith(prefix, StringComparison.Ordinal);
}
|