feat(cluster): add structured inflight proposal tracking with ops counting

Replace the simple string-keyed inflight dictionaries with account-scoped
ConcurrentDictionary<string, Dictionary<string, InflightInfo>> structures.
Adds InflightInfo record with OpsCount for duplicate proposal tracking,
TrackInflight/RemoveInflight/IsInflight methods for streams and consumers,
and ClearAllInflight(). Updates existing Propose* methods to use $G account.
Go reference: jetstream_cluster.go:1193-1278.
This commit is contained in:
Joseph Doherty
2026-02-25 02:05:53 -05:00
parent 1a9b6a9175
commit 63d4e43178
2 changed files with 316 additions and 20 deletions

View File

@@ -22,10 +22,12 @@ public sealed class JetStreamMetaGroup
private readonly ConcurrentDictionary<string, StreamAssignment> _assignments =
new(StringComparer.Ordinal);
// B8: Inflight proposal tracking -- entries that have been proposed but not yet committed.
// Go reference: jetstream_cluster.go inflight tracking for proposals.
private readonly ConcurrentDictionary<string, string> _inflightStreams = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, string> _inflightConsumers = new(StringComparer.Ordinal);
// Account-scoped inflight proposal tracking -- entries proposed but not yet committed.
// Go reference: jetstream_cluster.go inflight tracking for proposals (jetstream_cluster.go:1193-1278).
// Outer key: account name. Inner key: stream name → InflightInfo.
private readonly ConcurrentDictionary<string, Dictionary<string, InflightInfo>> _inflightStreams = new(StringComparer.Ordinal);
// Outer key: account name. Inner key: "stream/consumer" → InflightInfo.
private readonly ConcurrentDictionary<string, Dictionary<string, InflightInfo>> _inflightConsumers = new(StringComparer.Ordinal);
// Running count of consumers across all stream assignments.
private int _totalConsumerCount;
@@ -74,14 +76,152 @@ public sealed class JetStreamMetaGroup
public int ConsumerCount => _totalConsumerCount;
/// <summary>
/// Number of inflight stream proposals.
/// Total number of inflight stream proposals across all accounts.
/// </summary>
public int InflightStreamCount => _inflightStreams.Count;
public int InflightStreamCount => _inflightStreams.Values.Sum(d => d.Count);
/// <summary>
/// Number of inflight consumer proposals.
/// Total number of inflight consumer proposals across all accounts.
/// </summary>
public int InflightConsumerCount => _inflightConsumers.Count;
public int InflightConsumerCount => _inflightConsumers.Values.Sum(d => d.Count);
// ---------------------------------------------------------------
// Inflight proposal tracking — public API
// Go reference: jetstream_cluster.go:1193-1278 inflight proposal management.
// ---------------------------------------------------------------
/// <summary>
/// Tracks a stream proposal as inflight for the given account.
/// Increments OpsCount on duplicate proposals for the same stream name.
/// Go reference: jetstream_cluster.go inflight proposal tracking.
/// </summary>
public void TrackInflightStreamProposal(string account, StreamAssignment sa)
{
var accountDict = _inflightStreams.GetOrAdd(account, _ => new Dictionary<string, InflightInfo>(StringComparer.Ordinal));
lock (accountDict)
{
if (accountDict.TryGetValue(sa.StreamName, out var existing))
accountDict[sa.StreamName] = existing with { OpsCount = existing.OpsCount + 1 };
else
accountDict[sa.StreamName] = new InflightInfo(OpsCount: 1, Deleted: false, Assignment: sa);
}
}
/// <summary>
/// Decrements OpsCount for a stream proposal. Removes the entry when OpsCount reaches zero.
/// Removes the account entry when its dictionary becomes empty.
/// Go reference: jetstream_cluster.go inflight proposal tracking.
/// </summary>
public void RemoveInflightStreamProposal(string account, string streamName)
{
if (!_inflightStreams.TryGetValue(account, out var accountDict))
return;
lock (accountDict)
{
if (!accountDict.TryGetValue(streamName, out var existing))
return;
if (existing.OpsCount <= 1)
{
accountDict.Remove(streamName);
if (accountDict.Count == 0)
_inflightStreams.TryRemove(account, out _);
}
else
{
accountDict[streamName] = existing with { OpsCount = existing.OpsCount - 1 };
}
}
}
/// <summary>
/// Returns true if the given stream is currently tracked as inflight for the account.
/// Go reference: jetstream_cluster.go inflight check.
/// </summary>
public bool IsStreamInflight(string account, string streamName)
{
if (!_inflightStreams.TryGetValue(account, out var accountDict))
return false;
lock (accountDict)
{
return accountDict.TryGetValue(streamName, out var info) && info.OpsCount > 0;
}
}
/// <summary>
/// Tracks a consumer proposal as inflight for the given account.
/// Increments OpsCount on duplicate proposals for the same stream/consumer key.
/// Go reference: jetstream_cluster.go inflight consumer proposal tracking.
/// </summary>
public void TrackInflightConsumerProposal(string account, string streamName, string consumerName, ConsumerAssignment? ca = null)
{
var key = $"{streamName}/{consumerName}";
var accountDict = _inflightConsumers.GetOrAdd(account, _ => new Dictionary<string, InflightInfo>(StringComparer.Ordinal));
lock (accountDict)
{
if (accountDict.TryGetValue(key, out var existing))
accountDict[key] = existing with { OpsCount = existing.OpsCount + 1 };
else
accountDict[key] = new InflightInfo(OpsCount: 1, Deleted: false, Assignment: null);
}
}
/// <summary>
/// Decrements OpsCount for a consumer proposal. Removes the entry when OpsCount reaches zero.
/// Removes the account entry when its dictionary becomes empty.
/// Go reference: jetstream_cluster.go inflight consumer proposal tracking.
/// </summary>
public void RemoveInflightConsumerProposal(string account, string streamName, string consumerName)
{
var key = $"{streamName}/{consumerName}";
if (!_inflightConsumers.TryGetValue(account, out var accountDict))
return;
lock (accountDict)
{
if (!accountDict.TryGetValue(key, out var existing))
return;
if (existing.OpsCount <= 1)
{
accountDict.Remove(key);
if (accountDict.Count == 0)
_inflightConsumers.TryRemove(account, out _);
}
else
{
accountDict[key] = existing with { OpsCount = existing.OpsCount - 1 };
}
}
}
/// <summary>
/// Returns true if the given consumer is currently tracked as inflight for the account.
/// Go reference: jetstream_cluster.go inflight check.
/// </summary>
public bool IsConsumerInflight(string account, string streamName, string consumerName)
{
var key = $"{streamName}/{consumerName}";
if (!_inflightConsumers.TryGetValue(account, out var accountDict))
return false;
lock (accountDict)
{
return accountDict.TryGetValue(key, out var info) && info.OpsCount > 0;
}
}
/// <summary>
/// Clears all inflight stream and consumer proposals across all accounts.
/// Go reference: jetstream_cluster.go — inflight cleared on shutdown/reset.
/// </summary>
public void ClearAllInflight()
{
_inflightStreams.Clear();
_inflightConsumers.Clear();
}
// ---------------------------------------------------------------
// Stream proposals
@@ -104,14 +244,16 @@ public sealed class JetStreamMetaGroup
{
_ = ct;
var resolvedGroup = group ?? new RaftGroup { Name = config.Name };
// Track as inflight
_inflightStreams[config.Name] = config.Name;
TrackInflightStreamProposal("$G", new StreamAssignment { StreamName = config.Name, Group = resolvedGroup });
// Apply the entry (idempotent via AddOrUpdate)
ApplyStreamCreate(config.Name, group ?? new RaftGroup { Name = config.Name });
ApplyStreamCreate(config.Name, resolvedGroup);
// Clear inflight
_inflightStreams.TryRemove(config.Name, out _);
RemoveInflightStreamProposal("$G", config.Name);
return Task.CompletedTask;
}
@@ -131,14 +273,16 @@ public sealed class JetStreamMetaGroup
if (_assignments.ContainsKey(config.Name))
throw new InvalidOperationException($"Stream '{config.Name}' already exists.");
var resolvedGroup = group ?? new RaftGroup { Name = config.Name };
// Track as inflight
_inflightStreams[config.Name] = config.Name;
TrackInflightStreamProposal("$G", new StreamAssignment { StreamName = config.Name, Group = resolvedGroup });
// Apply the entry
ApplyStreamCreate(config.Name, group ?? new RaftGroup { Name = config.Name });
ApplyStreamCreate(config.Name, resolvedGroup);
// Clear inflight
_inflightStreams.TryRemove(config.Name, out _);
RemoveInflightStreamProposal("$G", config.Name);
return Task.CompletedTask;
}
@@ -187,14 +331,13 @@ public sealed class JetStreamMetaGroup
_ = ct;
// Track as inflight
var inflightKey = $"{streamName}/{consumerName}";
_inflightConsumers[inflightKey] = inflightKey;
TrackInflightConsumerProposal("$G", streamName, consumerName);
// Apply the entry (silently ignored if stream does not exist)
ApplyConsumerCreate(streamName, consumerName, group);
// Clear inflight
_inflightConsumers.TryRemove(inflightKey, out _);
RemoveInflightConsumerProposal("$G", streamName, consumerName);
return Task.CompletedTask;
}
@@ -219,14 +362,13 @@ public sealed class JetStreamMetaGroup
throw new InvalidOperationException($"Stream '{streamName}' not found.");
// Track as inflight
var inflightKey = $"{streamName}/{consumerName}";
_inflightConsumers[inflightKey] = inflightKey;
TrackInflightConsumerProposal("$G", streamName, consumerName);
// Apply the entry
ApplyConsumerCreate(streamName, consumerName, group);
// Clear inflight
_inflightConsumers.TryRemove(inflightKey, out _);
RemoveInflightConsumerProposal("$G", streamName, consumerName);
return Task.CompletedTask;
}
@@ -612,3 +754,11 @@ public sealed class MetaGroupState
/// </summary>
public int ConsumerCount { get; init; }
}
/// <summary>
/// Tracks an inflight stream or consumer proposal with ops counting.
/// OpsCount increments on duplicate proposals so that each proposer must
/// independently call Remove before the entry is cleared.
/// Go reference: jetstream_cluster.go inflight proposal tracking.
/// </summary>
public record InflightInfo(int OpsCount, bool Deleted, StreamAssignment? Assignment);

View File

@@ -0,0 +1,146 @@
using NATS.Server.JetStream.Cluster;
namespace NATS.Server.Tests.JetStream.Cluster;
public class JetStreamInflightTrackingTests
{
[Fact]
public void TrackInflightStreamProposal_increments_ops()
{
var meta = new JetStreamMetaGroup(3);
var sa = new StreamAssignment
{
StreamName = "inflight-1",
Group = new RaftGroup { Name = "rg-inf", Peers = ["n1", "n2", "n3"] },
};
meta.TrackInflightStreamProposal("ACC", sa);
meta.InflightStreamCount.ShouldBe(1);
meta.IsStreamInflight("ACC", "inflight-1").ShouldBeTrue();
}
[Fact]
public void RemoveInflightStreamProposal_clears_when_zero()
{
var meta = new JetStreamMetaGroup(3);
var sa = new StreamAssignment
{
StreamName = "inflight-2",
Group = new RaftGroup { Name = "rg-inf2", Peers = ["n1", "n2", "n3"] },
};
meta.TrackInflightStreamProposal("ACC", sa);
meta.RemoveInflightStreamProposal("ACC", "inflight-2");
meta.IsStreamInflight("ACC", "inflight-2").ShouldBeFalse();
}
[Fact]
public void Duplicate_proposal_increments_ops_count()
{
var meta = new JetStreamMetaGroup(3);
var sa = new StreamAssignment
{
StreamName = "dup-stream",
Group = new RaftGroup { Name = "rg-dup", Peers = ["n1", "n2", "n3"] },
};
meta.TrackInflightStreamProposal("ACC", sa);
meta.TrackInflightStreamProposal("ACC", sa);
meta.InflightStreamCount.ShouldBe(1); // still one unique stream
// Need two removes to fully clear
meta.RemoveInflightStreamProposal("ACC", "dup-stream");
meta.IsStreamInflight("ACC", "dup-stream").ShouldBeTrue(); // ops > 0
meta.RemoveInflightStreamProposal("ACC", "dup-stream");
meta.IsStreamInflight("ACC", "dup-stream").ShouldBeFalse();
}
[Fact]
public void IsStreamInflight_returns_false_for_unknown_account()
{
var meta = new JetStreamMetaGroup(3);
meta.IsStreamInflight("UNKNOWN", "no-stream").ShouldBeFalse();
}
[Fact]
public void TrackInflightConsumerProposal_tracks_by_account()
{
var meta = new JetStreamMetaGroup(3);
meta.TrackInflightConsumerProposal("ACC", "stream1", "consumer1");
meta.InflightConsumerCount.ShouldBe(1);
meta.IsConsumerInflight("ACC", "stream1", "consumer1").ShouldBeTrue();
}
[Fact]
public void RemoveInflightConsumerProposal_clears_when_zero()
{
var meta = new JetStreamMetaGroup(3);
meta.TrackInflightConsumerProposal("ACC", "stream1", "consumer1");
meta.RemoveInflightConsumerProposal("ACC", "stream1", "consumer1");
meta.IsConsumerInflight("ACC", "stream1", "consumer1").ShouldBeFalse();
}
[Fact]
public void ClearAllInflight_removes_everything()
{
var meta = new JetStreamMetaGroup(3);
var sa = new StreamAssignment
{
StreamName = "s1",
Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] },
};
meta.TrackInflightStreamProposal("ACC1", sa);
meta.TrackInflightConsumerProposal("ACC2", "s2", "c1");
meta.ClearAllInflight();
meta.InflightStreamCount.ShouldBe(0);
meta.InflightConsumerCount.ShouldBe(0);
}
[Fact]
public void StepDown_clears_inflight()
{
var meta = new JetStreamMetaGroup(3);
var sa = new StreamAssignment
{
StreamName = "s1",
Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] },
};
meta.TrackInflightStreamProposal("ACC", sa);
meta.StepDown();
meta.InflightStreamCount.ShouldBe(0);
}
[Fact]
public void Multiple_accounts_tracked_independently()
{
var meta = new JetStreamMetaGroup(3);
var sa1 = new StreamAssignment
{
StreamName = "s1",
Group = new RaftGroup { Name = "rg1", Peers = ["n1", "n2", "n3"] },
};
var sa2 = new StreamAssignment
{
StreamName = "s1", // same stream name, different account
Group = new RaftGroup { Name = "rg2", Peers = ["n1", "n2", "n3"] },
};
meta.TrackInflightStreamProposal("ACC1", sa1);
meta.TrackInflightStreamProposal("ACC2", sa2);
meta.InflightStreamCount.ShouldBe(2); // one per account
meta.IsStreamInflight("ACC1", "s1").ShouldBeTrue();
meta.IsStreamInflight("ACC2", "s1").ShouldBeTrue();
meta.RemoveInflightStreamProposal("ACC1", "s1");
meta.IsStreamInflight("ACC1", "s1").ShouldBeFalse();
meta.IsStreamInflight("ACC2", "s1").ShouldBeTrue(); // still tracked
}
}