Files
natsnet/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs
Joseph Doherty 84d450b4a0 feat: port session 19 — JetStream Core
- JetStreamTypes: JetStreamConfig, JetStreamStats, JetStreamAccountLimits,
  JetStreamTier, JetStreamAccountStats, JetStream engine, JsAccount, JsaUsage
- JetStreamApiTypes: 50+ JSApi request/response types, API subject constants
- JetStreamErrors: JsApiError + JsApiErrors with all 203 error codes
- JetStreamVersioning: version constants and API level helpers
- JetStreamBatching: Batching, BatchGroup, BatchStagedDiff, BatchApply
- Removed JetStreamConfig/JetStreamState stubs from NatsServerTypes.cs
- 374 features complete (IDs 1368-1519, 1751-1972)
2026-02-26 16:14:40 -05:00

133 lines
5.0 KiB
C#

// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/jetstream_batching.go in the NATS server Go source.
namespace ZB.MOM.NatsNet.Server;
// ---------------------------------------------------------------------------
// Batching types
// ---------------------------------------------------------------------------
/// <summary>
/// Per-stream registry of atomic publish batch groups that are still in progress.
/// Counterpart of the <c>batching</c> struct in server/jetstream_batching.go.
/// </summary>
internal sealed class Batching
{
    /// <summary>
    /// Lock object created once per instance and handed out to callers.
    /// NOTE(review): presumably guards <see cref="Group"/>, as in the Go original — confirm at call sites.
    /// </summary>
    public Lock Mu { get; } = new();

    /// <summary>
    /// In-progress batch groups, looked up by string key using ordinal comparison.
    /// NOTE(review): the key is presumably the batch ID — confirm against callers.
    /// </summary>
    public Dictionary<string, BatchGroup> Group { get; } = new(StringComparer.Ordinal);
}
/// <summary>
/// A single in-progress atomic batch: its last proposed sequence, its temporary
/// store, and its cleanup timer.
/// Mirrors <c>batchGroup</c> in server/jetstream_batching.go.
/// </summary>
internal sealed class BatchGroup
{
    /// <summary>Last proposed stream sequence for this batch.</summary>
    public ulong LastSeq { get; set; }

    /// <summary>Temporary backing store for the batch's messages.</summary>
    public object? Store { get; set; } // IStreamStore — session 20

    /// <summary>Timer that abandons the batch after the configured timeout.</summary>
    public Timer? BatchTimer { get; set; }

    /// <summary>
    /// Disarms the cleanup timer so the batch can be committed.
    /// Mirrors <c>batchGroup.readyForCommit</c>.
    /// </summary>
    /// <remarks>
    /// This is still a stub: pending writes in <see cref="Store"/> are NOT flushed here yet.
    /// NOTE(review): <see cref="Timer.Change(int, int)"/> reports whether the timer was updated,
    /// not whether its callback has already run — confirm this is an adequate stand-in for the
    /// Go timer-stop check once the full implementation lands.
    /// </remarks>
    /// <returns>
    /// <c>true</c> only when <see cref="BatchTimer"/> is set and the timer reported that it was
    /// successfully disarmed; <c>false</c> when there is no timer or the update was refused.
    /// </returns>
    public bool ReadyForCommit()
    {
        // Stub — full implementation requires IStreamStore.FlushAllPending (session 20).
        // Compare against true rather than "not null": the null-conditional call yields a
        // bool?, and a refused update (false) must not be reported as ready.
        return BatchTimer?.Change(Timeout.Infinite, Timeout.Infinite) is true;
    }
}
/// <summary>
/// Holds the consistency-check data staged for one atomic batch ahead of its commit.
/// Counterpart of <c>batchStagedDiff</c> in server/jetstream_batching.go.
/// Every map is optional and starts out <c>null</c>.
/// </summary>
internal sealed class BatchStagedDiff
{
    /// <summary>Message IDs observed in this batch; used for duplicate detection.</summary>
    public Dictionary<string, object?>? MsgIds { get; set; }

    /// <summary>Per-subject running counter totals.</summary>
    public Dictionary<string, object?>? Counter { get; set; } // map[string]*msgCounterRunningTotal

    /// <summary>Per-subject inflight byte/op totals consulted by DiscardNew checks.</summary>
    public Dictionary<string, object?>? Inflight { get; set; } // map[string]*inflightSubjectRunningTotal

    /// <summary>Expected-last-sequence-per-subject checks staged by this batch, keyed by subject.</summary>
    public Dictionary<string, BatchExpectedPerSubject>? ExpectedPerSubject { get; set; }
}
/// <summary>
/// Cached outcome of an expected-last-sequence-per-subject check for one subject in a batch.
/// Counterpart of <c>batchExpectedPerSubject</c> in server/jetstream_batching.go.
/// </summary>
internal sealed class BatchExpectedPerSubject
{
    /// <summary>Stream sequence of the subject's last message when the proposal was made.</summary>
    public ulong SSeq { get; set; }

    /// <summary>Clustered proposal sequence at which the check was evaluated.</summary>
    public ulong ClSeq { get; set; }
}
/// <summary>
/// Bookkeeping for a committed batch while it is being applied on the Raft apply path.
/// Counterpart of <c>batchApply</c> in server/jetstream_batching.go.
/// </summary>
internal sealed class BatchApply
{
    /// <summary>Lock object created once per instance; callers take it around state changes.</summary>
    public Lock Mu { get; } = new();

    /// <summary>ID of the current batch; empty when no batch is tracked.</summary>
    public string Id { get; set; } = string.Empty;

    /// <summary>Number of entries the batch is expected to contain (for consistency checks).</summary>
    public ulong Count { get; set; }

    /// <summary>Raft committed entries belonging to this batch.</summary>
    public List<object?>? Entries { get; set; } // []*CommittedEntry — session 20+

    /// <summary>Index inside an entry that marks the first message of the batch.</summary>
    public int EntryStart { get; set; }

    /// <summary>Applied value prior to the entry holding the first batch message.</summary>
    public ulong MaxApplied { get; set; }

    /// <summary>
    /// Resets every piece of in-memory apply-batch state to its initial value.
    /// Counterpart of <c>batchApply.clearBatchStateLocked</c>.
    /// The caller is expected to hold <see cref="Mu"/>; this method does not take it.
    /// </summary>
    public void ClearBatchStateLocked()
    {
        // Drop the collected entries first, then zero the identifying and positional fields.
        Entries = null;
        (Id, Count) = (string.Empty, 0UL);
        (EntryStart, MaxApplied) = (0, 0UL);
    }
}