batch36 task2 implement group-a lifecycle primitives

This commit is contained in:
Joseph Doherty
2026-02-28 22:52:06 -05:00
parent 0860afc886
commit 82c2f4ed1c
8 changed files with 424 additions and 1 deletions

View File

@@ -0,0 +1,71 @@
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class Account
{
internal (NatsStream? Stream, Exception? Error) AddStream(StreamConfig config) =>
AddStreamWithAssignment(config, null, null, pedantic: false);
internal (NatsStream? Stream, Exception? Error) AddStreamWithStore(StreamConfig config, FileStoreConfig? fileStoreConfig) =>
AddStreamWithAssignment(config, fileStoreConfig, null, pedantic: false);
internal (NatsStream? Stream, Exception? Error) AddStreamPedantic(StreamConfig config, bool pedantic) =>
AddStreamWithAssignment(config, null, null, pedantic);
internal (NatsStream? Stream, Exception? Error) AddStreamWithAssignment(
StreamConfig config,
FileStoreConfig? fileStoreConfig,
StreamAssignment? assignment,
bool pedantic)
{
if (config == null)
return (null, new ArgumentNullException(nameof(config)));
var (server, jsa, error) = CheckForJetStream();
if (error != null || server == null || jsa == null)
return (null, error ?? new InvalidOperationException("jetstream not enabled for account"));
if (string.IsNullOrWhiteSpace(config.Name))
return (null, new InvalidOperationException("stream name is required"));
var normalized = config.Clone();
if (normalized.Subjects == null || normalized.Subjects.Length == 0)
normalized.Subjects = [$"{normalized.Name}.>"];
_ = pedantic;
_ = fileStoreConfig;
jsa.Lock.EnterWriteLock();
try
{
if (jsa.Streams.TryGetValue(normalized.Name, out var existingObject) && existingObject is NatsStream existing)
{
if (StreamConfigsEqual(existing.Config, normalized))
{
if (assignment != null)
existing.SetStreamAssignment(assignment);
return (existing, null);
}
return (null, new InvalidOperationException("stream name already in use"));
}
IStreamStore store = new JetStreamMemStore(normalized.Clone());
var stream = NatsStream.Create(this, normalized, jsa, store, assignment, server);
if (stream == null)
return (null, new InvalidOperationException("stream creation failed"));
jsa.Streams[normalized.Name] = stream;
return (stream, null);
}
finally
{
jsa.Lock.ExitWriteLock();
}
}
private static bool StreamConfigsEqual(StreamConfig left, StreamConfig right)
=> string.Equals(JsonSerializer.Serialize(left), JsonSerializer.Serialize(right), StringComparison.Ordinal);
}

View File

@@ -0,0 +1,160 @@
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal StreamAssignment? StreamAssignment()
{
_mu.EnterReadLock();
try
{
return _assignment;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetStreamAssignment(StreamAssignment? assignment)
{
_mu.EnterWriteLock();
try
{
_assignment = assignment;
if (assignment?.Group?.Node != null)
{
_node = assignment.Group.Node;
assignment.Group.Node.UpdateKnownPeers(assignment.Group.Peers);
}
_updateChannel.Writer.TryWrite(true);
}
finally
{
_mu.ExitWriteLock();
}
}
internal ChannelReader<bool>? MonitorQuitC()
{
_mu.EnterWriteLock();
try
{
_monitorQuitChannel ??= Channel.CreateBounded<bool>(1);
return _monitorQuitChannel.Reader;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SignalMonitorQuit()
{
_mu.EnterWriteLock();
try
{
if (_monitorQuitChannel != null)
{
_monitorQuitChannel.Writer.TryComplete();
_monitorQuitChannel = null;
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal ChannelReader<bool> UpdateC() => _updateChannel.Reader;
internal bool IsLeaderNodeState()
{
_mu.EnterReadLock();
try
{
if (_node is IRaftNode raftNode)
return raftNode.State() == RaftState.Leader;
return true;
}
finally
{
_mu.ExitReadLock();
}
}
internal bool IsLeaderInternal()
{
_mu.EnterReadLock();
try
{
if (_assignment?.Group?.Node is IRaftNode node)
return node.Leader();
return true;
}
finally
{
_mu.ExitReadLock();
}
}
internal void StartClusterSubs()
{
_mu.EnterWriteLock();
try
{
_clusterSubsActive = true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void StopClusterSubs()
{
_mu.EnterWriteLock();
try
{
_clusterSubsActive = false;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool ClusterSubsActive()
{
_mu.EnterReadLock();
try
{
return _clusterSubsActive;
}
finally
{
_mu.ExitReadLock();
}
}
internal Account? AccountLocked(bool needLock)
{
if (needLock)
_mu.EnterReadLock();
try
{
return Account;
}
finally
{
if (needLock)
_mu.ExitReadLock();
}
}
}

View File

@@ -13,13 +13,15 @@
//
// Adapted from server/stream.go in the NATS server Go source.
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server;
/// <summary>
/// Represents a JetStream stream, managing message storage, replication, and lifecycle.
/// Mirrors the <c>stream</c> struct in server/stream.go.
/// </summary>
internal sealed class NatsStream : IDisposable
internal sealed partial class NatsStream : IDisposable
{
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
@@ -42,6 +44,10 @@ internal sealed class NatsStream : IDisposable
private ulong _leaderTerm;
private bool _sealed;
private CancellationTokenSource? _quitCts;
private Channel<bool>? _monitorQuitChannel = Channel.CreateBounded<bool>(1);
private readonly Channel<bool> _updateChannel = Channel.CreateBounded<bool>(4);
private StreamAssignment? _assignment;
private bool _clusterSubsActive;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;

View File

@@ -303,6 +303,14 @@ public static class StoreEnumParityExtensions
StorageType.FileStorage => "File",
_ => "Unknown Storage Type",
};
public static string String(this PersistModeType value)
=> value switch
{
PersistModeType.DefaultPersistMode => "Default",
PersistModeType.AsyncPersistMode => "Async",
_ => "Unknown Persist Mode Type",
};
}
public sealed class RetentionPolicyJsonConverter : JsonConverter<RetentionPolicy>
@@ -403,6 +411,38 @@ public sealed class StorageTypeJsonConverter : JsonConverter<StorageType>
}
}
public sealed class PersistModeTypeJsonConverter : JsonConverter<PersistModeType>
{
public override PersistModeType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"default" => PersistModeType.DefaultPersistMode,
"" => PersistModeType.DefaultPersistMode,
"async" => PersistModeType.AsyncPersistMode,
var value => throw new JsonException($"can not unmarshal \"{value}\""),
};
}
public override void Write(Utf8JsonWriter writer, PersistModeType value, JsonSerializerOptions options)
{
switch (value)
{
case PersistModeType.DefaultPersistMode:
writer.WriteStringValue("default");
break;
case PersistModeType.AsyncPersistMode:
writer.WriteStringValue("async");
break;
default:
throw new JsonException($"can not marshal {value}");
}
}
}
public sealed class AckPolicyJsonConverter : JsonConverter<AckPolicy>
{
public override AckPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)

View File

@@ -641,6 +641,7 @@ public enum StoreCompression : byte
// ---------------------------------------------------------------------------
/// <summary>Determines what persistence mode the stream uses.</summary>
[JsonConverter(typeof(PersistModeTypeJsonConverter))]
public enum PersistModeType
{
DefaultPersistMode = 0,
@@ -711,6 +712,14 @@ public sealed class ExternalStream
[JsonPropertyName("deliver")]
public string DeliverPrefix { get; set; } = string.Empty;
public string Domain()
{
if (string.IsNullOrEmpty(ApiPrefix))
return string.Empty;
return SubscriptionIndex.TokenAt(ApiPrefix, 2);
}
}
// ---------------------------------------------------------------------------
@@ -737,6 +746,47 @@ public sealed class StreamSource
[JsonPropertyName("external")]
public ExternalStream? External { get; set; }
[JsonIgnore]
public string IndexName { get; private set; } = string.Empty;
public string ComposeIName()
{
var name = Name;
if (External != null)
name = $"{name}:{NatsServer.GetHash(External.ApiPrefix)}";
var source = FilterSubject ?? string.Empty;
var destination = ">";
if (SubjectTransforms is null || SubjectTransforms.Length == 0)
{
if (string.IsNullOrEmpty(source))
source = ">";
if (string.IsNullOrEmpty(destination))
destination = ">";
}
else
{
var sources = new List<string>(SubjectTransforms.Length);
var destinations = new List<string>(SubjectTransforms.Length);
foreach (var transform in SubjectTransforms)
{
sources.Add(string.IsNullOrEmpty(transform.Source) ? ">" : transform.Source);
destinations.Add(string.IsNullOrEmpty(transform.Destination) ? ">" : transform.Destination);
}
source = string.Join('\f', sources);
destination = string.Join('\f', destinations);
}
return string.Join(' ', [name, source, destination]);
}
public void SetIndexName()
{
IndexName = ComposeIName();
}
}
// ---------------------------------------------------------------------------