diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index fd6d82f..0a20dee 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -22,10 +22,12 @@ public sealed class JetStreamMetaGroup private readonly ConcurrentDictionary _assignments = new(StringComparer.Ordinal); - // B8: Inflight proposal tracking -- entries that have been proposed but not yet committed. - // Go reference: jetstream_cluster.go inflight tracking for proposals. - private readonly ConcurrentDictionary _inflightStreams = new(StringComparer.Ordinal); - private readonly ConcurrentDictionary _inflightConsumers = new(StringComparer.Ordinal); + // Account-scoped inflight proposal tracking -- entries proposed but not yet committed. + // Go reference: jetstream_cluster.go inflight tracking for proposals (jetstream_cluster.go:1193-1278). + // Outer key: account name. Inner key: stream name → InflightInfo. + private readonly ConcurrentDictionary> _inflightStreams = new(StringComparer.Ordinal); + // Outer key: account name. Inner key: "stream/consumer" → InflightInfo. + private readonly ConcurrentDictionary> _inflightConsumers = new(StringComparer.Ordinal); // Running count of consumers across all stream assignments. private int _totalConsumerCount; @@ -74,14 +76,152 @@ public sealed class JetStreamMetaGroup public int ConsumerCount => _totalConsumerCount; /// - /// Number of inflight stream proposals. + /// Total number of inflight stream proposals across all accounts. /// - public int InflightStreamCount => _inflightStreams.Count; + public int InflightStreamCount => _inflightStreams.Values.Sum(d => d.Count); /// - /// Number of inflight consumer proposals. + /// Total number of inflight consumer proposals across all accounts. /// - public int InflightConsumerCount => _inflightConsumers.Count; + public int InflightConsumerCount => _inflightConsumers.Values.Sum(d => d.Count); + + // --------------------------------------------------------------- + // Inflight proposal tracking — public API + // Go reference: jetstream_cluster.go:1193-1278 inflight proposal management. + // --------------------------------------------------------------- + + /// + /// Tracks a stream proposal as inflight for the given account. + /// Increments OpsCount on duplicate proposals for the same stream name. + /// Go reference: jetstream_cluster.go inflight proposal tracking. + /// + public void TrackInflightStreamProposal(string account, StreamAssignment sa) + { + var accountDict = _inflightStreams.GetOrAdd(account, _ => new Dictionary(StringComparer.Ordinal)); + lock (accountDict) + { + if (accountDict.TryGetValue(sa.StreamName, out var existing)) + accountDict[sa.StreamName] = existing with { OpsCount = existing.OpsCount + 1 }; + else + accountDict[sa.StreamName] = new InflightInfo(OpsCount: 1, Deleted: false, Assignment: sa); + } + } + + /// + /// Decrements OpsCount for a stream proposal. Removes the entry when OpsCount reaches zero. + /// Removes the account entry when its dictionary becomes empty. + /// Go reference: jetstream_cluster.go inflight proposal tracking. + /// + public void RemoveInflightStreamProposal(string account, string streamName) + { + if (!_inflightStreams.TryGetValue(account, out var accountDict)) + return; + + lock (accountDict) + { + if (!accountDict.TryGetValue(streamName, out var existing)) + return; + + if (existing.OpsCount <= 1) + { + accountDict.Remove(streamName); + if (accountDict.Count == 0) + _inflightStreams.TryRemove(account, out _); + } + else + { + accountDict[streamName] = existing with { OpsCount = existing.OpsCount - 1 }; + } + } + } + + /// + /// Returns true if the given stream is currently tracked as inflight for the account. + /// Go reference: jetstream_cluster.go inflight check. + /// + public bool IsStreamInflight(string account, string streamName) + { + if (!_inflightStreams.TryGetValue(account, out var accountDict)) + return false; + + lock (accountDict) + { + return accountDict.TryGetValue(streamName, out var info) && info.OpsCount > 0; + } + } + + /// + /// Tracks a consumer proposal as inflight for the given account. + /// Increments OpsCount on duplicate proposals for the same stream/consumer key. + /// Go reference: jetstream_cluster.go inflight consumer proposal tracking. + /// + public void TrackInflightConsumerProposal(string account, string streamName, string consumerName, ConsumerAssignment? ca = null) + { + var key = $"{streamName}/{consumerName}"; + var accountDict = _inflightConsumers.GetOrAdd(account, _ => new Dictionary(StringComparer.Ordinal)); + lock (accountDict) + { + if (accountDict.TryGetValue(key, out var existing)) + accountDict[key] = existing with { OpsCount = existing.OpsCount + 1 }; + else + accountDict[key] = new InflightInfo(OpsCount: 1, Deleted: false, Assignment: null); + } + } + + /// + /// Decrements OpsCount for a consumer proposal. Removes the entry when OpsCount reaches zero. + /// Removes the account entry when its dictionary becomes empty. + /// Go reference: jetstream_cluster.go inflight consumer proposal tracking. + /// + public void RemoveInflightConsumerProposal(string account, string streamName, string consumerName) + { + var key = $"{streamName}/{consumerName}"; + if (!_inflightConsumers.TryGetValue(account, out var accountDict)) + return; + + lock (accountDict) + { + if (!accountDict.TryGetValue(key, out var existing)) + return; + + if (existing.OpsCount <= 1) + { + accountDict.Remove(key); + if (accountDict.Count == 0) + _inflightConsumers.TryRemove(account, out _); + } + else + { + accountDict[key] = existing with { OpsCount = existing.OpsCount - 1 }; + } + } + } + + /// + /// Returns true if the given consumer is currently tracked as inflight for the account. + /// Go reference: jetstream_cluster.go inflight check. + /// + public bool IsConsumerInflight(string account, string streamName, string consumerName) + { + var key = $"{streamName}/{consumerName}"; + if (!_inflightConsumers.TryGetValue(account, out var accountDict)) + return false; + + lock (accountDict) + { + return accountDict.TryGetValue(key, out var info) && info.OpsCount > 0; + } + } + + /// + /// Clears all inflight stream and consumer proposals across all accounts. + /// Go reference: jetstream_cluster.go — inflight cleared on shutdown/reset. + /// + public void ClearAllInflight() + { + _inflightStreams.Clear(); + _inflightConsumers.Clear(); + } // --------------------------------------------------------------- // Stream proposals @@ -104,14 +244,16 @@ public sealed class JetStreamMetaGroup { _ = ct; + var resolvedGroup = group ?? new RaftGroup { Name = config.Name }; + // Track as inflight - _inflightStreams[config.Name] = config.Name; + TrackInflightStreamProposal("$G", new StreamAssignment { StreamName = config.Name, Group = resolvedGroup }); // Apply the entry (idempotent via AddOrUpdate) - ApplyStreamCreate(config.Name, group ?? new RaftGroup { Name = config.Name }); + ApplyStreamCreate(config.Name, resolvedGroup); // Clear inflight - _inflightStreams.TryRemove(config.Name, out _); + RemoveInflightStreamProposal("$G", config.Name); return Task.CompletedTask; } @@ -131,14 +273,16 @@ public sealed class JetStreamMetaGroup if (_assignments.ContainsKey(config.Name)) throw new InvalidOperationException($"Stream '{config.Name}' already exists."); + var resolvedGroup = group ?? new RaftGroup { Name = config.Name }; + // Track as inflight - _inflightStreams[config.Name] = config.Name; + TrackInflightStreamProposal("$G", new StreamAssignment { StreamName = config.Name, Group = resolvedGroup }); // Apply the entry - ApplyStreamCreate(config.Name, group ?? new RaftGroup { Name = config.Name }); + ApplyStreamCreate(config.Name, resolvedGroup); // Clear inflight - _inflightStreams.TryRemove(config.Name, out _); + RemoveInflightStreamProposal("$G", config.Name); return Task.CompletedTask; } @@ -187,14 +331,13 @@ public sealed class JetStreamMetaGroup _ = ct; // Track as inflight - var inflightKey = $"{streamName}/{consumerName}"; - _inflightConsumers[inflightKey] = inflightKey; + TrackInflightConsumerProposal("$G", streamName, consumerName); // Apply the entry (silently ignored if stream does not exist) ApplyConsumerCreate(streamName, consumerName, group); // Clear inflight - _inflightConsumers.TryRemove(inflightKey, out _); + RemoveInflightConsumerProposal("$G", streamName, consumerName); return Task.CompletedTask; } @@ -219,14 +362,13 @@ public sealed class JetStreamMetaGroup throw new InvalidOperationException($"Stream '{streamName}' not found."); // Track as inflight - var inflightKey = $"{streamName}/{consumerName}"; - _inflightConsumers[inflightKey] = inflightKey; + TrackInflightConsumerProposal("$G", streamName, consumerName); // Apply the entry ApplyConsumerCreate(streamName, consumerName, group); // Clear inflight - _inflightConsumers.TryRemove(inflightKey, out _); + RemoveInflightConsumerProposal("$G", streamName, consumerName); return Task.CompletedTask; } @@ -612,3 +754,11 @@ public sealed class MetaGroupState /// public int ConsumerCount { get; init; } } + +/// +/// Tracks an inflight stream or consumer proposal with ops counting. +/// OpsCount increments on duplicate proposals so that each proposer must +/// independently call Remove before the entry is cleared. +/// Go reference: jetstream_cluster.go inflight proposal tracking. +/// +public record InflightInfo(int OpsCount, bool Deleted, StreamAssignment? Assignment); diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamInflightTrackingTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamInflightTrackingTests.cs new file mode 100644 index 0000000..96d4268 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamInflightTrackingTests.cs @@ -0,0 +1,146 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Cluster; + +public class JetStreamInflightTrackingTests +{ + [Fact] + public void TrackInflightStreamProposal_increments_ops() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "inflight-1", + Group = new RaftGroup { Name = "rg-inf", Peers = ["n1", "n2", "n3"] }, + }; + + meta.TrackInflightStreamProposal("ACC", sa); + meta.InflightStreamCount.ShouldBe(1); + meta.IsStreamInflight("ACC", "inflight-1").ShouldBeTrue(); + } + + [Fact] + public void RemoveInflightStreamProposal_clears_when_zero() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "inflight-2", + Group = new RaftGroup { Name = "rg-inf2", Peers = ["n1", "n2", "n3"] }, + }; + + meta.TrackInflightStreamProposal("ACC", sa); + meta.RemoveInflightStreamProposal("ACC", "inflight-2"); + meta.IsStreamInflight("ACC", "inflight-2").ShouldBeFalse(); + } + + [Fact] + public void Duplicate_proposal_increments_ops_count() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "dup-stream", + Group = new RaftGroup { Name = "rg-dup", Peers = ["n1", "n2", "n3"] }, + }; + + meta.TrackInflightStreamProposal("ACC", sa); + meta.TrackInflightStreamProposal("ACC", sa); + meta.InflightStreamCount.ShouldBe(1); // still one unique stream + + // Need two removes to fully clear + meta.RemoveInflightStreamProposal("ACC", "dup-stream"); + meta.IsStreamInflight("ACC", "dup-stream").ShouldBeTrue(); // ops > 0 + meta.RemoveInflightStreamProposal("ACC", "dup-stream"); + meta.IsStreamInflight("ACC", "dup-stream").ShouldBeFalse(); + } + + [Fact] + public void IsStreamInflight_returns_false_for_unknown_account() + { + var meta = new JetStreamMetaGroup(3); + meta.IsStreamInflight("UNKNOWN", "no-stream").ShouldBeFalse(); + } + + [Fact] + public void TrackInflightConsumerProposal_tracks_by_account() + { + var meta = new JetStreamMetaGroup(3); + meta.TrackInflightConsumerProposal("ACC", "stream1", "consumer1"); + + meta.InflightConsumerCount.ShouldBe(1); + meta.IsConsumerInflight("ACC", "stream1", "consumer1").ShouldBeTrue(); + } + + [Fact] + public void RemoveInflightConsumerProposal_clears_when_zero() + { + var meta = new JetStreamMetaGroup(3); + meta.TrackInflightConsumerProposal("ACC", "stream1", "consumer1"); + meta.RemoveInflightConsumerProposal("ACC", "stream1", "consumer1"); + + meta.IsConsumerInflight("ACC", "stream1", "consumer1").ShouldBeFalse(); + } + + [Fact] + public void ClearAllInflight_removes_everything() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "s1", + Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] }, + }; + + meta.TrackInflightStreamProposal("ACC1", sa); + meta.TrackInflightConsumerProposal("ACC2", "s2", "c1"); + + meta.ClearAllInflight(); + + meta.InflightStreamCount.ShouldBe(0); + meta.InflightConsumerCount.ShouldBe(0); + } + + [Fact] + public void StepDown_clears_inflight() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "s1", + Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] }, + }; + + meta.TrackInflightStreamProposal("ACC", sa); + meta.StepDown(); + + meta.InflightStreamCount.ShouldBe(0); + } + + [Fact] + public void Multiple_accounts_tracked_independently() + { + var meta = new JetStreamMetaGroup(3); + var sa1 = new StreamAssignment + { + StreamName = "s1", + Group = new RaftGroup { Name = "rg1", Peers = ["n1", "n2", "n3"] }, + }; + var sa2 = new StreamAssignment + { + StreamName = "s1", // same stream name, different account + Group = new RaftGroup { Name = "rg2", Peers = ["n1", "n2", "n3"] }, + }; + + meta.TrackInflightStreamProposal("ACC1", sa1); + meta.TrackInflightStreamProposal("ACC2", sa2); + + meta.InflightStreamCount.ShouldBe(2); // one per account + meta.IsStreamInflight("ACC1", "s1").ShouldBeTrue(); + meta.IsStreamInflight("ACC2", "s1").ShouldBeTrue(); + + meta.RemoveInflightStreamProposal("ACC1", "s1"); + meta.IsStreamInflight("ACC1", "s1").ShouldBeFalse(); + meta.IsStreamInflight("ACC2", "s1").ShouldBeTrue(); // still tracked + } +}