feat: port session 19 — JetStream Core
- JetStreamTypes: JetStreamConfig, JetStreamStats, JetStreamAccountLimits, JetStreamTier, JetStreamAccountStats, JetStream engine, JsAccount, JsaUsage - JetStreamApiTypes: 50+ JSApi request/response types, API subject constants - JetStreamErrors: JsApiError + JsApiErrors with all 203 error codes - JetStreamVersioning: version constants and API level helpers - JetStreamBatching: Batching, BatchGroup, BatchStagedDiff, BatchApply - Removed JetStreamConfig/JetStreamState stubs from NatsServerTypes.cs - 374 features complete (IDs 1368-1519, 1751-1972)
This commit is contained in:
132
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs
Normal file
132
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs
Normal file
@@ -0,0 +1,132 @@
|
||||
// Copyright 2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/jetstream_batching.go in the NATS server Go source.
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Batching types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Tracks in-progress atomic publish batch groups for a stream.
|
||||
/// Mirrors the <c>batching</c> struct in server/jetstream_batching.go.
|
||||
/// </summary>
|
||||
internal sealed class Batching
|
||||
{
|
||||
private readonly Lock _mu = new();
|
||||
private readonly Dictionary<string, BatchGroup> _group = new(StringComparer.Ordinal);
|
||||
|
||||
public Lock Mu => _mu;
|
||||
public Dictionary<string, BatchGroup> Group => _group;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A single in-progress atomic batch: its temporary store and cleanup timer.
|
||||
/// Mirrors <c>batchGroup</c> in server/jetstream_batching.go.
|
||||
/// </summary>
|
||||
internal sealed class BatchGroup
|
||||
{
|
||||
/// <summary>Last proposed stream sequence for this batch.</summary>
|
||||
public ulong LastSeq { get; set; }
|
||||
|
||||
/// <summary>Temporary backing store for the batch's messages.</summary>
|
||||
public object? Store { get; set; } // IStreamStore — session 20
|
||||
|
||||
/// <summary>Timer that abandons the batch after the configured timeout.</summary>
|
||||
public Timer? BatchTimer { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Stops the cleanup timer and flushes pending writes so the batch is
|
||||
/// ready to be committed.
|
||||
/// Mirrors <c>batchGroup.readyForCommit</c>.
|
||||
/// </summary>
|
||||
public bool ReadyForCommit()
|
||||
{
|
||||
// Stub — full implementation requires IStreamStore.FlushAllPending (session 20).
|
||||
return BatchTimer?.Change(Timeout.Infinite, Timeout.Infinite) != null;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stages consistency-check data for a single atomic batch before it is committed.
|
||||
/// Mirrors <c>batchStagedDiff</c> in server/jetstream_batching.go.
|
||||
/// </summary>
|
||||
internal sealed class BatchStagedDiff
|
||||
{
|
||||
/// <summary>Message IDs seen in this batch, for duplicate detection.</summary>
|
||||
public Dictionary<string, object?>? MsgIds { get; set; }
|
||||
|
||||
/// <summary>Running counter totals, keyed by subject.</summary>
|
||||
public Dictionary<string, object?>? Counter { get; set; } // map[string]*msgCounterRunningTotal
|
||||
|
||||
/// <summary>Inflight subject byte/op totals for DiscardNew checks.</summary>
|
||||
public Dictionary<string, object?>? Inflight { get; set; } // map[string]*inflightSubjectRunningTotal
|
||||
|
||||
/// <summary>Expected-last-seq-per-subject checks staged in this batch.</summary>
|
||||
public Dictionary<string, BatchExpectedPerSubject>? ExpectedPerSubject { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cached expected-last-sequence-per-subject result for a single subject within a batch.
|
||||
/// Mirrors <c>batchExpectedPerSubject</c> in server/jetstream_batching.go.
|
||||
/// </summary>
|
||||
internal sealed class BatchExpectedPerSubject
|
||||
{
|
||||
/// <summary>Stream sequence of the last message on this subject at proposal time.</summary>
|
||||
public ulong SSeq { get; set; }
|
||||
|
||||
/// <summary>Clustered proposal sequence at which this check was computed.</summary>
|
||||
public ulong ClSeq { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tracks the in-progress application of a committed batch on the Raft apply path.
|
||||
/// Mirrors <c>batchApply</c> in server/jetstream_batching.go.
|
||||
/// </summary>
|
||||
internal sealed class BatchApply
|
||||
{
|
||||
private readonly Lock _mu = new();
|
||||
|
||||
/// <summary>ID of the current batch.</summary>
|
||||
public string Id { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Number of entries expected in the batch (for consistency checks).</summary>
|
||||
public ulong Count { get; set; }
|
||||
|
||||
/// <summary>Raft committed entries that make up this batch.</summary>
|
||||
public List<object?>? Entries { get; set; } // []*CommittedEntry — session 20+
|
||||
|
||||
/// <summary>Index within an entry indicating the first message of the batch.</summary>
|
||||
public int EntryStart { get; set; }
|
||||
|
||||
/// <summary>Applied value before the entry containing the first batch message.</summary>
|
||||
public ulong MaxApplied { get; set; }
|
||||
|
||||
public Lock Mu => _mu;
|
||||
|
||||
/// <summary>
|
||||
/// Clears in-memory apply-batch state.
|
||||
/// Mirrors <c>batchApply.clearBatchStateLocked</c>.
|
||||
/// Lock should be held.
|
||||
/// </summary>
|
||||
public void ClearBatchStateLocked()
|
||||
{
|
||||
Id = string.Empty;
|
||||
Count = 0;
|
||||
Entries = null;
|
||||
EntryStart = 0;
|
||||
MaxApplied = 0;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user