// Source: natsdotnet/src/NATS.Server/JetStream/StreamManager.cs (C#, 330 lines, 11 KiB)

using System.Collections.Concurrent;
using NATS.Server.Auth;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Snapshots;
using NATS.Server.JetStream.Storage;
using NATS.Server.Subscriptions;
namespace NATS.Server.JetStream;
/// <summary>
/// Tracks the JetStream streams known to this manager and routes stream-level
/// operations (create/update, delete, purge, message access, snapshots and
/// publish capture) to each stream's backing <see cref="IStreamStore"/>.
/// </summary>
/// <remarks>
/// The synchronous members block on the store's asynchronous API via
/// <c>GetAwaiter().GetResult()</c>; their signatures are kept synchronous for
/// existing callers.
/// </remarks>
public sealed class StreamManager
{
    // Rejected in stream names on every platform, regardless of the host's own separator.
    private static readonly char[] PathSeparatorChars = ['/', '\\'];

    private readonly Account? _account;
    private readonly JetStreamMetaGroup? _metaGroup;

    private readonly ConcurrentDictionary<string, StreamHandle> _streams =
        new(StringComparer.Ordinal);

    private readonly ConcurrentDictionary<string, StreamReplicaGroup> _replicaGroups =
        new(StringComparer.Ordinal);

    // Coordinators keyed by the name of the ORIGIN stream they copy from.
    private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin =
        new(StringComparer.Ordinal);

    private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin =
        new(StringComparer.Ordinal);

    private readonly StreamSnapshotService _snapshotService = new();

    /// <summary>Creates a manager with no streams.</summary>
    /// <param name="metaGroup">Optional meta group that stream creations/updates are proposed to.</param>
    /// <param name="account">Optional account used to reserve and release stream slots.</param>
    public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null)
    {
        _metaGroup = metaGroup;
        _account = account;
    }

    /// <summary>Gets a snapshot of the current stream names, in no particular order.</summary>
    public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();

    /// <summary>Returns the current stream names sorted with ordinal comparison.</summary>
    public IReadOnlyList<string> ListNames()
        => [.. _streams.Keys.OrderBy(x => x, StringComparer.Ordinal)];

    /// <summary>
    /// Creates the stream described by <paramref name="config"/>, or updates it when a
    /// stream with the same name already exists.
    /// </summary>
    /// <param name="config">Requested configuration; a normalized copy is stored.</param>
    /// <returns>
    /// The stream info response on success, or an error response when the name is
    /// missing or unsafe, or when the account's stream limit is exhausted.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
    {
        ArgumentNullException.ThrowIfNull(config);

        if (string.IsNullOrWhiteSpace(config.Name))
            return JetStreamApiResponse.ErrorResponse(400, "stream name required");

        // The name becomes a directory name for file-backed streams (see CreateStore),
        // so anything that could escape the store root is refused up front.
        if (!IsPathSafeName(config.Name))
            return JetStreamApiResponse.ErrorResponse(400, "invalid stream name");

        var normalized = NormalizeConfig(config);
        var isCreate = !_streams.ContainsKey(normalized.Name);

        var reserved = false;
        if (isCreate && _account is not null)
        {
            if (!_account.TryReserveStream())
                return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");
            reserved = true;
        }

        StreamHandle handle;
        try
        {
            handle = _streams.AddOrUpdate(
                normalized.Name,
                _ => new StreamHandle(normalized, CreateStore(normalized)),
                (_, existing) =>
                {
                    // Same storage type: keep the existing store and only swap the config.
                    if (existing.Config.Storage == normalized.Storage)
                        return existing with { Config = normalized };

                    // Storage type changed: the stream gets a brand-new store.
                    return new StreamHandle(normalized, CreateStore(normalized));
                });
        }
        catch
        {
            // Store creation failed, so the stream was never registered; hand the
            // reserved slot back instead of leaking it.
            if (reserved)
                _account?.ReleaseStream();
            throw;
        }

        _replicaGroups.AddOrUpdate(
            normalized.Name,
            _ => new StreamReplicaGroup(normalized.Name, normalized.Replicas),
            (_, existing) => existing.Nodes.Count == Math.Max(normalized.Replicas, 1)
                ? existing
                : new StreamReplicaGroup(normalized.Name, normalized.Replicas));

        RebuildReplicationCoordinators();
        _metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult();

        return BuildStreamInfoResponse(handle);
    }

    /// <summary>Returns the info response for a stream, or a not-found response.</summary>
    /// <param name="name">Stream name.</param>
    public JetStreamApiResponse GetInfo(string name)
    {
        if (_streams.TryGetValue(name, out var stream))
            return BuildStreamInfoResponse(stream);

        return JetStreamApiResponse.NotFound($"$JS.API.STREAM.INFO.{name}");
    }

    /// <summary>Looks up a stream handle by name.</summary>
    /// <param name="name">Stream name.</param>
    /// <param name="handle">The handle when found; null when the method returns false.</param>
    /// <returns>True when the stream exists.</returns>
    public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);

    /// <summary>
    /// Removes a stream and its replica group, releases the account's stream slot and
    /// rebuilds the mirror/source coordinators.
    /// </summary>
    /// <param name="name">Stream name.</param>
    /// <returns>False when no such stream exists.</returns>
    public bool Delete(string name)
    {
        if (!_streams.TryRemove(name, out _))
            return false;

        _replicaGroups.TryRemove(name, out _);
        _account?.ReleaseStream();
        RebuildReplicationCoordinators();
        return true;
    }

    /// <summary>Purges the store of the named stream.</summary>
    /// <param name="name">Stream name.</param>
    /// <returns>False when no such stream exists.</returns>
    public bool Purge(string name)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return false;

        stream.Store.PurgeAsync(default).GetAwaiter().GetResult();
        return true;
    }

    /// <summary>Loads a single message from a stream's store.</summary>
    /// <param name="name">Stream name.</param>
    /// <param name="sequence">Sequence passed to the store's load call.</param>
    /// <returns>The store's result, or null when the stream does not exist.</returns>
    public StoredMessage? GetMessage(string name, ulong sequence)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return null;

        return stream.Store.LoadAsync(sequence, default).GetAwaiter().GetResult();
    }

    /// <summary>Removes a single message from a stream's store.</summary>
    /// <param name="name">Stream name.</param>
    /// <param name="sequence">Sequence passed to the store's remove call.</param>
    /// <returns>The store's result, or false when the stream does not exist.</returns>
    public bool DeleteMessage(string name, ulong sequence)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return false;

        return stream.Store.RemoveAsync(sequence, default).GetAwaiter().GetResult();
    }

    /// <summary>Produces a snapshot of the named stream via the snapshot service.</summary>
    /// <param name="name">Stream name.</param>
    /// <returns>The snapshot bytes, or null when the stream does not exist.</returns>
    public byte[]? CreateSnapshot(string name)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return null;

        return _snapshotService.SnapshotAsync(stream, default).GetAwaiter().GetResult();
    }

    /// <summary>Restores the named stream from a snapshot via the snapshot service.</summary>
    /// <param name="name">Stream name.</param>
    /// <param name="snapshot">Snapshot bytes handed to the snapshot service.</param>
    /// <returns>False when no such stream exists.</returns>
    public bool RestoreSnapshot(string name, ReadOnlyMemory<byte> snapshot)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return false;

        _snapshotService.RestoreAsync(stream, snapshot, default).GetAwaiter().GetResult();
        return true;
    }

    /// <summary>Returns the store state of a stream, or a default state when it is unknown.</summary>
    /// <param name="name">Stream name.</param>
    /// <param name="ct">Cancellation token forwarded to the store.</param>
    public ValueTask<StreamState> GetStateAsync(string name, CancellationToken ct)
    {
        if (_streams.TryGetValue(name, out var stream))
            return stream.Store.GetStateAsync(ct);

        return ValueTask.FromResult(new StreamState());
    }

    /// <summary>
    /// Returns the first stream (in dictionary enumeration order) that has a configured
    /// subject pattern matching <paramref name="subject"/>.
    /// </summary>
    /// <param name="subject">Literal subject to match.</param>
    /// <returns>The matching stream, or null when none matches.</returns>
    public StreamHandle? FindBySubject(string subject)
    {
        foreach (var stream in _streams.Values)
        {
            if (stream.Config.Subjects.Any(p => SubjectMatch.MatchLiteral(subject, p)))
                return stream;
        }

        return null;
    }

    /// <summary>
    /// Stores a published message in the stream whose subjects match
    /// <paramref name="subject"/>, applying the byte and message limits and forwarding
    /// the stored message to any mirrors/sources of that stream.
    /// </summary>
    /// <param name="subject">Subject the message was published on.</param>
    /// <param name="payload">Message payload.</param>
    /// <returns>
    /// Null when no stream matches; an ack carrying error code 10054 when the byte
    /// limit would be exceeded under <see cref="DiscardPolicy.New"/>; otherwise an ack
    /// with the assigned sequence.
    /// </returns>
    public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
    {
        var stream = FindBySubject(subject);
        if (stream is null)
            return null;

        var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
        if (stream.Config.MaxBytes > 0 && (long)state.Bytes + payload.Length > stream.Config.MaxBytes)
        {
            if (stream.Config.Discard == DiscardPolicy.New)
            {
                return new PubAck
                {
                    Stream = stream.Config.Name,
                    ErrorCode = 10054,
                };
            }

            // Otherwise evict from the front until the new payload fits.
            while ((long)state.Bytes + payload.Length > stream.Config.MaxBytes && state.FirstSeq > 0)
            {
                var removed = stream.Store.RemoveAsync(state.FirstSeq, default).GetAwaiter().GetResult();

                // Nothing was removed, so the state cannot change: stop instead of
                // spinning forever (e.g. a payload that can never fit).
                if (!removed)
                    break;

                state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
            }
        }

        if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
            _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();

        var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
        EnforceLimits(stream);

        // Re-load after limits were enforced; replication only happens if the message
        // is still present in the store.
        var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
        if (stored != null)
            ReplicateIfConfigured(stream.Config.Name, stored);

        return new PubAck
        {
            Stream = stream.Config.Name,
            Seq = seq,
        };
    }

    /// <summary>Asks the stream's replica group to step down; a no-op for unknown streams.</summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="ct">Cancellation token forwarded to the replica group.</param>
    public Task StepDownStreamLeaderAsync(string stream, CancellationToken ct)
    {
        if (_replicaGroups.TryGetValue(stream, out var replicaGroup))
            return replicaGroup.StepDownAsync(ct);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Returns true when <paramref name="name"/> is safe to use as a single directory
    /// name: not rooted, free of path separators, and not made up solely of dots and
    /// whitespace (which covers "." and "..").
    /// </summary>
    private static bool IsPathSafeName(string name)
    {
        if (name.IndexOfAny(PathSeparatorChars) >= 0)
            return false;

        if (Path.IsPathRooted(name))
            return false;

        return !name.All(static c => c == '.' || char.IsWhiteSpace(c));
    }

    /// <summary>
    /// Builds a detached copy of <paramref name="config"/> so later changes to the
    /// caller's collections do not affect the stored configuration. Entries in
    /// <c>Sources</c> are copied by name only.
    /// </summary>
    private static StreamConfig NormalizeConfig(StreamConfig config)
    {
        return new StreamConfig
        {
            Name = config.Name,
            Subjects = [.. config.Subjects],
            MaxMsgs = config.MaxMsgs,
            MaxBytes = config.MaxBytes,
            MaxMsgsPer = config.MaxMsgsPer,
            MaxAgeMs = config.MaxAgeMs,
            MaxConsumers = config.MaxConsumers,
            Retention = config.Retention,
            Discard = config.Discard,
            Storage = config.Storage,
            Replicas = config.Replicas,
            Mirror = config.Mirror,
            Source = config.Source,
            Sources = [.. config.Sources.Select(s => new StreamSourceConfig { Name = s.Name })],
        };
    }

    /// <summary>Builds the stream info response from the handle's config and current store state.</summary>
    private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
    {
        var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
        return new JetStreamApiResponse
        {
            StreamInfo = new JetStreamStreamInfo
            {
                Config = handle.Config,
                State = state,
            },
        };
    }

    /// <summary>
    /// Trims the store to the configured <c>MaxMsgs</c> when that limit is positive.
    /// Only <see cref="MemStore"/> and <see cref="FileStore"/> are trimmed; other store
    /// types are left untouched.
    /// </summary>
    private static void EnforceLimits(StreamHandle stream)
    {
        if (stream.Config.MaxMsgs <= 0)
            return;

        var maxMessages = (ulong)stream.Config.MaxMsgs;
        switch (stream.Store)
        {
            case MemStore memStore:
                memStore.TrimToMaxMessages(maxMessages);
                break;
            case FileStore fileStore:
                fileStore.TrimToMaxMessages(maxMessages);
                break;
        }
    }

    /// <summary>
    /// Recomputes the mirror and source coordinators from scratch. A coordinator is
    /// only created when the referenced origin stream currently exists.
    /// </summary>
    private void RebuildReplicationCoordinators()
    {
        _mirrorsByOrigin.Clear();
        _sourcesByOrigin.Clear();

        foreach (var stream in _streams.Values)
        {
            if (!string.IsNullOrWhiteSpace(stream.Config.Mirror)
                && _streams.ContainsKey(stream.Config.Mirror))
            {
                _mirrorsByOrigin
                    .GetOrAdd(stream.Config.Mirror, _ => [])
                    .Add(new MirrorCoordinator(stream.Store));
            }

            if (!string.IsNullOrWhiteSpace(stream.Config.Source)
                && _streams.ContainsKey(stream.Config.Source))
            {
                _sourcesByOrigin
                    .GetOrAdd(stream.Config.Source, _ => [])
                    .Add(new SourceCoordinator(stream.Store));
            }

            foreach (var source in stream.Config.Sources)
            {
                if (string.IsNullOrWhiteSpace(source.Name) || !_streams.ContainsKey(source.Name))
                    continue;

                _sourcesByOrigin
                    .GetOrAdd(source.Name, _ => [])
                    .Add(new SourceCoordinator(stream.Store));
            }
        }
    }

    /// <summary>
    /// Forwards a message stored in <paramref name="originStream"/> to every mirror and
    /// source coordinator registered for that origin.
    /// </summary>
    private void ReplicateIfConfigured(string originStream, StoredMessage stored)
    {
        if (_mirrorsByOrigin.TryGetValue(originStream, out var mirrors))
        {
            foreach (var mirror in mirrors)
                mirror.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
        }

        if (_sourcesByOrigin.TryGetValue(originStream, out var sources))
        {
            foreach (var source in sources)
                source.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Reports the storage backend of a stream: "file" for a <see cref="FileStore"/>,
    /// "memory" for any other store, or "missing" when the stream does not exist.
    /// </summary>
    /// <param name="streamName">Stream name.</param>
    public string GetStoreBackendType(string streamName)
    {
        if (!_streams.TryGetValue(streamName, out var stream))
            return "missing";

        return stream.Store switch
        {
            FileStore => "file",
            _ => "memory",
        };
    }

    /// <summary>
    /// Creates the store for a stream. File storage uses a per-stream directory named
    /// after the stream under "natsdotnet-js-store" in the system temp path; every
    /// other storage type gets a <see cref="MemStore"/>.
    /// </summary>
    private static IStreamStore CreateStore(StreamConfig config)
    {
        return config.Storage switch
        {
            StorageType.File => new FileStore(new FileStoreOptions
            {
                Directory = Path.Combine(Path.GetTempPath(), "natsdotnet-js-store", config.Name),
            }),
            _ => new MemStore(),
        };
    }
}
/// <summary>
/// Pairs a stream's configuration with the store that holds its messages.
/// </summary>
/// <param name="Config">The stream's configuration.</param>
/// <param name="Store">The backing message store for the stream.</param>
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store);