feat: add jetstream mirror and source orchestration
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
using System.Collections.Concurrent;
|
||||
using NATS.Server.JetStream.Api;
|
||||
using NATS.Server.JetStream.MirrorSource;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Publish;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
@@ -11,6 +12,10 @@ public sealed class StreamManager
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, StreamHandle> _streams =
|
||||
new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin =
|
||||
new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin =
|
||||
new(StringComparer.Ordinal);
|
||||
|
||||
public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();
|
||||
|
||||
@@ -24,6 +29,7 @@ public sealed class StreamManager
|
||||
normalized.Name,
|
||||
_ => new StreamHandle(normalized, new MemStore()),
|
||||
(_, existing) => existing with { Config = normalized });
|
||||
RebuildReplicationCoordinators();
|
||||
|
||||
return BuildStreamInfoResponse(handle);
|
||||
}
|
||||
@@ -65,6 +71,9 @@ public sealed class StreamManager
|
||||
|
||||
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
|
||||
EnforceLimits(stream);
|
||||
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
|
||||
if (stored != null)
|
||||
ReplicateIfConfigured(stream.Config.Name, stored);
|
||||
|
||||
return new PubAck
|
||||
{
|
||||
@@ -81,6 +90,8 @@ public sealed class StreamManager
|
||||
Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects],
|
||||
MaxMsgs = config.MaxMsgs,
|
||||
Replicas = config.Replicas,
|
||||
Mirror = config.Mirror,
|
||||
Source = config.Source,
|
||||
};
|
||||
|
||||
return copy;
|
||||
@@ -114,6 +125,44 @@ public sealed class StreamManager
|
||||
if (stream.Store is FileStore fileStore)
|
||||
fileStore.TrimToMaxMessages(maxMessages);
|
||||
}
|
||||
|
||||
private void RebuildReplicationCoordinators()
|
||||
{
|
||||
_mirrorsByOrigin.Clear();
|
||||
_sourcesByOrigin.Clear();
|
||||
|
||||
foreach (var stream in _streams.Values)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(stream.Config.Mirror)
|
||||
&& _streams.TryGetValue(stream.Config.Mirror, out _))
|
||||
{
|
||||
var list = _mirrorsByOrigin.GetOrAdd(stream.Config.Mirror, _ => []);
|
||||
list.Add(new MirrorCoordinator(stream.Store));
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(stream.Config.Source)
|
||||
&& _streams.TryGetValue(stream.Config.Source, out _))
|
||||
{
|
||||
var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []);
|
||||
list.Add(new SourceCoordinator(stream.Store));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ReplicateIfConfigured(string originStream, StoredMessage stored)
|
||||
{
|
||||
if (_mirrorsByOrigin.TryGetValue(originStream, out var mirrors))
|
||||
{
|
||||
foreach (var mirror in mirrors)
|
||||
mirror.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
if (_sourcesByOrigin.TryGetValue(originStream, out var sources))
|
||||
{
|
||||
foreach (var source in sources)
|
||||
source.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store);
|
||||
|
||||
Reference in New Issue
Block a user