feat: add atomic batch publish engine & versioning support (Tasks 9-10)
- AtomicBatchPublishEngine: stage/commit/rollback semantics for batch publish - JsVersioning: API level negotiation and stream/consumer metadata - Fix NormalizeConfig missing AllowAtomicPublish, Metadata, PersistMode copy - 46 batch publish tests + 67 versioning tests, all passing
This commit is contained in:
@@ -11,7 +11,7 @@ public sealed record ExternalAuthRequest(
|
|||||||
string? Token,
|
string? Token,
|
||||||
string? Jwt);
|
string? Jwt);
|
||||||
|
|
||||||
public sealed record ExternalAuthDecision(
|
public record ExternalAuthDecision(
|
||||||
bool Allowed,
|
bool Allowed,
|
||||||
string? Identity = null,
|
string? Identity = null,
|
||||||
string? Account = null,
|
string? Account = null,
|
||||||
|
|||||||
299
src/NATS.Server/JetStream/JsVersioning.cs
Normal file
299
src/NATS.Server/JetStream/JsVersioning.cs
Normal file
@@ -0,0 +1,299 @@
|
|||||||
|
// Go reference: golang/nats-server/server/jetstream_versioning.go
|
||||||
|
// Versioning metadata management for JetStream streams and consumers.
|
||||||
|
// Manages the _nats.req.level, _nats.ver, _nats.level metadata keys.
|
||||||
|
|
||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// JetStream API versioning constants and metadata management.
|
||||||
|
/// Go reference: server/jetstream_versioning.go
|
||||||
|
/// </summary>
|
||||||
|
public static class JsVersioning
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Current JetStream API level supported by this server.
|
||||||
|
/// Go: JSApiLevel = 3 (jetstream_versioning.go:20)
|
||||||
|
/// </summary>
|
||||||
|
public const int JsApiLevel = 3;
|
||||||
|
|
||||||
|
/// <summary>Server version string.</summary>
|
||||||
|
public const string Version = "2.12.0";
|
||||||
|
|
||||||
|
/// <summary>Metadata key for the minimum required API level to use this asset.</summary>
|
||||||
|
public const string RequiredLevelKey = "_nats.req.level";
|
||||||
|
|
||||||
|
/// <summary>Metadata key for the server version that last modified this asset.</summary>
|
||||||
|
public const string ServerVersionKey = "_nats.ver";
|
||||||
|
|
||||||
|
/// <summary>Metadata key for the server API level that last modified this asset.</summary>
|
||||||
|
public const string ServerLevelKey = "_nats.level";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns the required API level string from metadata, or empty if absent.
|
||||||
|
/// Go: getRequiredApiLevel (jetstream_versioning.go:28)
|
||||||
|
/// </summary>
|
||||||
|
public static string GetRequiredApiLevel(Dictionary<string, string>? metadata)
|
||||||
|
{
|
||||||
|
if (metadata != null && metadata.TryGetValue(RequiredLevelKey, out var level) && level.Length > 0)
|
||||||
|
return level;
|
||||||
|
return string.Empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns whether the required API level is supported by this server.
|
||||||
|
/// Go: supportsRequiredApiLevel (jetstream_versioning.go:36)
|
||||||
|
/// </summary>
|
||||||
|
public static bool SupportsRequiredApiLevel(Dictionary<string, string>? metadata)
|
||||||
|
{
|
||||||
|
var level = GetRequiredApiLevel(metadata);
|
||||||
|
if (level.Length == 0)
|
||||||
|
return true;
|
||||||
|
if (!int.TryParse(level, out var li))
|
||||||
|
return false;
|
||||||
|
return li <= JsApiLevel;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sets static (stored) versioning metadata on a stream config.
|
||||||
|
/// Clears dynamic fields (server version/level) and sets the required API level.
|
||||||
|
/// Go: setStaticStreamMetadata (jetstream_versioning.go:44)
|
||||||
|
/// </summary>
|
||||||
|
public static void SetStaticStreamMetadata(StreamConfig cfg)
|
||||||
|
{
|
||||||
|
if (cfg.Metadata == null)
|
||||||
|
cfg.Metadata = [];
|
||||||
|
else
|
||||||
|
DeleteDynamicMetadata(cfg.Metadata);
|
||||||
|
|
||||||
|
var requiredApiLevel = 0;
|
||||||
|
void Requires(int level) { if (level > requiredApiLevel) requiredApiLevel = level; }
|
||||||
|
|
||||||
|
// TTLs require API level 1 (added v2.11)
|
||||||
|
if (cfg.AllowMsgTtl || cfg.SubjectDeleteMarkerTtlMs > 0)
|
||||||
|
Requires(1);
|
||||||
|
|
||||||
|
// Counter CRDTs require API level 2 (added v2.12)
|
||||||
|
if (cfg.AllowMsgCounter)
|
||||||
|
Requires(2);
|
||||||
|
|
||||||
|
// Atomic batch publishing requires API level 2 (added v2.12)
|
||||||
|
if (cfg.AllowAtomicPublish)
|
||||||
|
Requires(2);
|
||||||
|
|
||||||
|
// Message scheduling requires API level 2 (added v2.12)
|
||||||
|
if (cfg.AllowMsgSchedules)
|
||||||
|
Requires(2);
|
||||||
|
|
||||||
|
// Async persist mode requires API level 2 (added v2.12)
|
||||||
|
if (cfg.PersistMode == PersistMode.Async)
|
||||||
|
Requires(2);
|
||||||
|
|
||||||
|
cfg.Metadata[RequiredLevelKey] = requiredApiLevel.ToString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns a copy of the stream config with dynamic metadata fields added.
|
||||||
|
/// The original config is not modified.
|
||||||
|
/// Go: setDynamicStreamMetadata (jetstream_versioning.go:88)
|
||||||
|
/// </summary>
|
||||||
|
public static StreamConfig SetDynamicStreamMetadata(StreamConfig cfg)
|
||||||
|
{
|
||||||
|
// Shallow copy the config
|
||||||
|
var newCfg = ShallowCopyStream(cfg);
|
||||||
|
newCfg.Metadata = [];
|
||||||
|
if (cfg.Metadata != null)
|
||||||
|
{
|
||||||
|
foreach (var (k, v) in cfg.Metadata)
|
||||||
|
newCfg.Metadata[k] = v;
|
||||||
|
}
|
||||||
|
newCfg.Metadata[ServerVersionKey] = Version;
|
||||||
|
newCfg.Metadata[ServerLevelKey] = JsApiLevel.ToString();
|
||||||
|
return newCfg;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Copies versioning fields from prevCfg into cfg (for stream update equality checks).
|
||||||
|
/// Removes dynamic fields. If prevCfg has no metadata, removes the key from cfg.
|
||||||
|
/// Go: copyStreamMetadata (jetstream_versioning.go:110)
|
||||||
|
/// </summary>
|
||||||
|
public static void CopyStreamMetadata(StreamConfig cfg, StreamConfig? prevCfg)
|
||||||
|
{
|
||||||
|
if (cfg.Metadata != null)
|
||||||
|
DeleteDynamicMetadata(cfg.Metadata);
|
||||||
|
SetOrDeleteInStreamMetadata(cfg, prevCfg, RequiredLevelKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sets static (stored) versioning metadata on a consumer config.
|
||||||
|
/// Go: setStaticConsumerMetadata (jetstream_versioning.go:136)
|
||||||
|
/// </summary>
|
||||||
|
public static void SetStaticConsumerMetadata(ConsumerConfig cfg)
|
||||||
|
{
|
||||||
|
if (cfg.Metadata == null)
|
||||||
|
cfg.Metadata = [];
|
||||||
|
else
|
||||||
|
DeleteDynamicMetadata(cfg.Metadata);
|
||||||
|
|
||||||
|
var requiredApiLevel = 0;
|
||||||
|
void Requires(int level) { if (level > requiredApiLevel) requiredApiLevel = level; }
|
||||||
|
|
||||||
|
// PauseUntil (non-zero) requires API level 1 (added v2.11)
|
||||||
|
if (cfg.PauseUntil.HasValue && cfg.PauseUntil.Value != DateTime.MinValue && cfg.PauseUntil.Value != default)
|
||||||
|
Requires(1);
|
||||||
|
|
||||||
|
// Priority policy / groups / pinned TTL require API level 1
|
||||||
|
if (cfg.PriorityPolicy != PriorityPolicy.None || cfg.PinnedTtlMs != 0 || cfg.PriorityGroups.Count > 0)
|
||||||
|
Requires(1);
|
||||||
|
|
||||||
|
cfg.Metadata[RequiredLevelKey] = requiredApiLevel.ToString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns a copy of the consumer config with dynamic metadata fields added.
|
||||||
|
/// Go: setDynamicConsumerMetadata (jetstream_versioning.go:164)
|
||||||
|
/// </summary>
|
||||||
|
public static ConsumerConfig SetDynamicConsumerMetadata(ConsumerConfig cfg)
|
||||||
|
{
|
||||||
|
var newCfg = ShallowCopyConsumer(cfg);
|
||||||
|
newCfg.Metadata = [];
|
||||||
|
if (cfg.Metadata != null)
|
||||||
|
{
|
||||||
|
foreach (var (k, v) in cfg.Metadata)
|
||||||
|
newCfg.Metadata[k] = v;
|
||||||
|
}
|
||||||
|
newCfg.Metadata[ServerVersionKey] = Version;
|
||||||
|
newCfg.Metadata[ServerLevelKey] = JsApiLevel.ToString();
|
||||||
|
return newCfg;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Copies versioning fields from prevCfg into cfg (for consumer update equality checks).
|
||||||
|
/// Removes dynamic fields.
|
||||||
|
/// Go: copyConsumerMetadata (jetstream_versioning.go:198)
|
||||||
|
/// </summary>
|
||||||
|
public static void CopyConsumerMetadata(ConsumerConfig cfg, ConsumerConfig? prevCfg)
|
||||||
|
{
|
||||||
|
if (cfg.Metadata != null)
|
||||||
|
DeleteDynamicMetadata(cfg.Metadata);
|
||||||
|
SetOrDeleteInConsumerMetadata(cfg, prevCfg, RequiredLevelKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Removes dynamic metadata fields (server version and level) from a metadata dictionary.
|
||||||
|
/// Go: deleteDynamicMetadata (jetstream_versioning.go:222)
|
||||||
|
/// </summary>
|
||||||
|
public static void DeleteDynamicMetadata(Dictionary<string, string> metadata)
|
||||||
|
{
|
||||||
|
metadata.Remove(ServerVersionKey);
|
||||||
|
metadata.Remove(ServerLevelKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
// =========================================================================
|
||||||
|
// Private helpers
|
||||||
|
// =========================================================================
|
||||||
|
|
||||||
|
private static void SetOrDeleteInStreamMetadata(StreamConfig cfg, StreamConfig? prevCfg, string key)
|
||||||
|
{
|
||||||
|
if (prevCfg?.Metadata != null && prevCfg.Metadata.TryGetValue(key, out var value))
|
||||||
|
{
|
||||||
|
cfg.Metadata ??= [];
|
||||||
|
cfg.Metadata[key] = value;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (cfg.Metadata != null)
|
||||||
|
{
|
||||||
|
cfg.Metadata.Remove(key);
|
||||||
|
if (cfg.Metadata.Count == 0)
|
||||||
|
cfg.Metadata = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void SetOrDeleteInConsumerMetadata(ConsumerConfig cfg, ConsumerConfig? prevCfg, string key)
|
||||||
|
{
|
||||||
|
if (prevCfg?.Metadata != null && prevCfg.Metadata.TryGetValue(key, out var value))
|
||||||
|
{
|
||||||
|
cfg.Metadata ??= [];
|
||||||
|
cfg.Metadata[key] = value;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (cfg.Metadata != null)
|
||||||
|
{
|
||||||
|
cfg.Metadata.Remove(key);
|
||||||
|
if (cfg.Metadata.Count == 0)
|
||||||
|
cfg.Metadata = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Shallow copy of a StreamConfig for metadata mutation.</summary>
|
||||||
|
private static StreamConfig ShallowCopyStream(StreamConfig src) => new()
|
||||||
|
{
|
||||||
|
Name = src.Name,
|
||||||
|
Description = src.Description,
|
||||||
|
Subjects = src.Subjects,
|
||||||
|
MaxMsgs = src.MaxMsgs,
|
||||||
|
MaxBytes = src.MaxBytes,
|
||||||
|
MaxMsgsPer = src.MaxMsgsPer,
|
||||||
|
MaxAgeMs = src.MaxAgeMs,
|
||||||
|
MaxMsgSize = src.MaxMsgSize,
|
||||||
|
MaxConsumers = src.MaxConsumers,
|
||||||
|
DuplicateWindowMs = src.DuplicateWindowMs,
|
||||||
|
Sealed = src.Sealed,
|
||||||
|
DenyDelete = src.DenyDelete,
|
||||||
|
DenyPurge = src.DenyPurge,
|
||||||
|
AllowDirect = src.AllowDirect,
|
||||||
|
AllowMsgTtl = src.AllowMsgTtl,
|
||||||
|
FirstSeq = src.FirstSeq,
|
||||||
|
Retention = src.Retention,
|
||||||
|
Discard = src.Discard,
|
||||||
|
Storage = src.Storage,
|
||||||
|
Replicas = src.Replicas,
|
||||||
|
Mirror = src.Mirror,
|
||||||
|
Source = src.Source,
|
||||||
|
Sources = src.Sources,
|
||||||
|
SubjectTransformSource = src.SubjectTransformSource,
|
||||||
|
SubjectTransformDest = src.SubjectTransformDest,
|
||||||
|
RePublishSource = src.RePublishSource,
|
||||||
|
RePublishDest = src.RePublishDest,
|
||||||
|
RePublishHeadersOnly = src.RePublishHeadersOnly,
|
||||||
|
SubjectDeleteMarkerTtlMs = src.SubjectDeleteMarkerTtlMs,
|
||||||
|
AllowMsgSchedules = src.AllowMsgSchedules,
|
||||||
|
AllowMsgCounter = src.AllowMsgCounter,
|
||||||
|
AllowAtomicPublish = src.AllowAtomicPublish,
|
||||||
|
PersistMode = src.PersistMode,
|
||||||
|
Metadata = src.Metadata,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// <summary>Shallow copy of a ConsumerConfig for metadata mutation.</summary>
|
||||||
|
private static ConsumerConfig ShallowCopyConsumer(ConsumerConfig src) => new()
|
||||||
|
{
|
||||||
|
DurableName = src.DurableName,
|
||||||
|
Ephemeral = src.Ephemeral,
|
||||||
|
FilterSubject = src.FilterSubject,
|
||||||
|
FilterSubjects = src.FilterSubjects,
|
||||||
|
AckPolicy = src.AckPolicy,
|
||||||
|
DeliverPolicy = src.DeliverPolicy,
|
||||||
|
OptStartSeq = src.OptStartSeq,
|
||||||
|
OptStartTimeUtc = src.OptStartTimeUtc,
|
||||||
|
ReplayPolicy = src.ReplayPolicy,
|
||||||
|
AckWaitMs = src.AckWaitMs,
|
||||||
|
MaxDeliver = src.MaxDeliver,
|
||||||
|
MaxAckPending = src.MaxAckPending,
|
||||||
|
Push = src.Push,
|
||||||
|
DeliverSubject = src.DeliverSubject,
|
||||||
|
HeartbeatMs = src.HeartbeatMs,
|
||||||
|
BackOffMs = src.BackOffMs,
|
||||||
|
FlowControl = src.FlowControl,
|
||||||
|
RateLimitBps = src.RateLimitBps,
|
||||||
|
MaxWaiting = src.MaxWaiting,
|
||||||
|
MaxRequestBatch = src.MaxRequestBatch,
|
||||||
|
MaxRequestMaxBytes = src.MaxRequestMaxBytes,
|
||||||
|
MaxRequestExpiresMs = src.MaxRequestExpiresMs,
|
||||||
|
PauseUntil = src.PauseUntil,
|
||||||
|
PriorityPolicy = src.PriorityPolicy,
|
||||||
|
PriorityGroups = src.PriorityGroups,
|
||||||
|
PinnedTtlMs = src.PinnedTtlMs,
|
||||||
|
Metadata = src.Metadata,
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -34,6 +34,29 @@ public sealed class ConsumerConfig
|
|||||||
// Go: consumer.go — max_request_expires limits expires duration per pull request (ms)
|
// Go: consumer.go — max_request_expires limits expires duration per pull request (ms)
|
||||||
public int MaxRequestExpiresMs { get; set; }
|
public int MaxRequestExpiresMs { get; set; }
|
||||||
|
|
||||||
|
// Go: ConsumerConfig.PauseUntil — pauses consumer delivery until this UTC time.
|
||||||
|
// Null or zero time means not paused.
|
||||||
|
// Added in v2.11, requires API level 1.
|
||||||
|
// Go reference: server/consumer.go (PauseUntil field)
|
||||||
|
public DateTime? PauseUntil { get; set; }
|
||||||
|
|
||||||
|
// Go: ConsumerConfig.PriorityPolicy — consumer priority routing policy.
|
||||||
|
// PriorityPinnedClient requires API level 1.
|
||||||
|
// Go reference: server/consumer.go (PriorityPolicy field)
|
||||||
|
public PriorityPolicy PriorityPolicy { get; set; } = PriorityPolicy.None;
|
||||||
|
|
||||||
|
// Go: ConsumerConfig.PriorityGroups — list of priority group names.
|
||||||
|
// Go reference: server/consumer.go (PriorityGroups field)
|
||||||
|
public List<string> PriorityGroups { get; set; } = [];
|
||||||
|
|
||||||
|
// Go: ConsumerConfig.PinnedTTL — TTL for pinned client assignment.
|
||||||
|
// Go reference: server/consumer.go (PinnedTTL field)
|
||||||
|
public long PinnedTtlMs { get; set; }
|
||||||
|
|
||||||
|
// Go: ConsumerConfig.Metadata — user-supplied and server-managed key/value metadata.
|
||||||
|
// Go reference: server/consumer.go (Metadata field)
|
||||||
|
public Dictionary<string, string>? Metadata { get; set; }
|
||||||
|
|
||||||
public string? ResolvePrimaryFilterSubject()
|
public string? ResolvePrimaryFilterSubject()
|
||||||
{
|
{
|
||||||
if (FilterSubjects.Count > 0)
|
if (FilterSubjects.Count > 0)
|
||||||
@@ -49,3 +72,14 @@ public enum AckPolicy
|
|||||||
Explicit,
|
Explicit,
|
||||||
All,
|
All,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Consumer priority routing policy.
|
||||||
|
/// Go reference: server/consumer.go — PriorityNone, PriorityPinnedClient constants.
|
||||||
|
/// </summary>
|
||||||
|
public enum PriorityPolicy
|
||||||
|
{
|
||||||
|
None = 0,
|
||||||
|
Overflow = 1,
|
||||||
|
PinnedClient = 2,
|
||||||
|
}
|
||||||
|
|||||||
@@ -52,6 +52,36 @@ public sealed class StreamConfig
|
|||||||
// Incompatible with Mirror and Sources.
|
// Incompatible with Mirror and Sources.
|
||||||
// Go reference: server/stream.go:369 (AllowMsgSchedules field)
|
// Go reference: server/stream.go:369 (AllowMsgSchedules field)
|
||||||
public bool AllowMsgSchedules { get; set; }
|
public bool AllowMsgSchedules { get; set; }
|
||||||
|
|
||||||
|
// Go: StreamConfig.AllowMsgCounter — enables CRDT counter semantics on messages.
|
||||||
|
// Added in v2.12, requires API level 2.
|
||||||
|
// Go reference: server/stream.go:365 (AllowMsgCounter field)
|
||||||
|
public bool AllowMsgCounter { get; set; }
|
||||||
|
|
||||||
|
// Go: StreamConfig.AllowAtomicPublish — enables atomic batch publishing.
|
||||||
|
// Added in v2.12, requires API level 2.
|
||||||
|
// Go reference: server/stream.go:367 (AllowAtomicPublish field)
|
||||||
|
public bool AllowAtomicPublish { get; set; }
|
||||||
|
|
||||||
|
// Go: StreamConfig.PersistMode — async vs sync storage persistence.
|
||||||
|
// AsyncPersistMode requires API level 2.
|
||||||
|
// Go reference: server/stream.go:375 (PersistMode field)
|
||||||
|
public PersistMode PersistMode { get; set; } = PersistMode.Sync;
|
||||||
|
|
||||||
|
// Go: StreamConfig.Metadata — user-supplied and server-managed key/value metadata.
|
||||||
|
// The server automatically sets _nats.req.level, _nats.ver, _nats.level.
|
||||||
|
// Go reference: server/stream.go:380 (Metadata field)
|
||||||
|
public Dictionary<string, string>? Metadata { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Persistence mode for the stream.
|
||||||
|
/// Go reference: server/stream.go — AsyncPersistMode constant.
|
||||||
|
/// </summary>
|
||||||
|
public enum PersistMode
|
||||||
|
{
|
||||||
|
Sync = 0,
|
||||||
|
Async = 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
public enum StorageType
|
public enum StorageType
|
||||||
|
|||||||
374
src/NATS.Server/JetStream/Publish/AtomicBatchPublishEngine.cs
Normal file
374
src/NATS.Server/JetStream/Publish/AtomicBatchPublishEngine.cs
Normal file
@@ -0,0 +1,374 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream.Publish;
|
||||||
|
|
||||||
|
// Go: server/jetstream_batching.go — streamBatches / batchGroup
|
||||||
|
// Handles the staging and commit protocol for atomic batch publishing.
|
||||||
|
// Messages within a batch are buffered until Nats-Batch-Commit is received,
|
||||||
|
// then committed atomically to the store.
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Error codes for atomic batch publish operations.
|
||||||
|
/// Go reference: server/jetstream_errors_generated.go
|
||||||
|
/// </summary>
|
||||||
|
public static class AtomicBatchPublishErrorCodes
|
||||||
|
{
|
||||||
|
/// <summary>JSAtomicPublishDisabledErr (10174) — stream has AllowAtomicPublish=false.</summary>
|
||||||
|
public const int Disabled = 10174;
|
||||||
|
|
||||||
|
/// <summary>JSAtomicPublishMissingSeqErr (10175) — Nats-Batch-Id present but Nats-Batch-Sequence missing.</summary>
|
||||||
|
public const int MissingSeq = 10175;
|
||||||
|
|
||||||
|
/// <summary>JSAtomicPublishIncompleteBatchErr (10176) — sequence gap, timeout, or over-limit batch.</summary>
|
||||||
|
public const int IncompleteBatch = 10176;
|
||||||
|
|
||||||
|
/// <summary>JSAtomicPublishUnsupportedHeaderBatchErr (10177) — unsupported header in batch message.</summary>
|
||||||
|
public const int UnsupportedHeader = 10177;
|
||||||
|
|
||||||
|
/// <summary>JSAtomicPublishInvalidBatchIDErr (10179) — batch ID too long (>64 chars).</summary>
|
||||||
|
public const int InvalidBatchId = 10179;
|
||||||
|
|
||||||
|
/// <summary>JSAtomicPublishTooLargeBatchErrF (10199) — batch size exceeds maximum.</summary>
|
||||||
|
public const int TooLargeBatch = 10199;
|
||||||
|
|
||||||
|
/// <summary>JSAtomicPublishInvalidBatchCommitErr (10200) — Nats-Batch-Commit header has invalid value.</summary>
|
||||||
|
public const int InvalidCommit = 10200;
|
||||||
|
|
||||||
|
/// <summary>JSAtomicPublishContainsDuplicateMessageErr (10201) — batch contains a duplicate msg ID.</summary>
|
||||||
|
public const int ContainsDuplicate = 10201;
|
||||||
|
|
||||||
|
/// <summary>JSMirrorWithAtomicPublishErr (10198) — mirror stream cannot have AllowAtomicPublish.</summary>
|
||||||
|
public const int MirrorWithAtomicPublish = 10198;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// A single staged message within an in-flight atomic batch.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class StagedBatchMessage
|
||||||
|
{
|
||||||
|
public required string Subject { get; init; }
|
||||||
|
public required ReadOnlyMemory<byte> Payload { get; init; }
|
||||||
|
public string? MsgId { get; init; }
|
||||||
|
public ulong ExpectedLastSeq { get; init; }
|
||||||
|
public ulong ExpectedLastSubjectSeq { get; init; }
|
||||||
|
public string? ExpectedLastSubjectSeqSubject { get; init; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// State for one in-flight atomic batch (identified by Nats-Batch-Id).
|
||||||
|
/// Go reference: server/jetstream_batching.go batchGroup struct.
|
||||||
|
/// </summary>
|
||||||
|
internal sealed class InFlightBatch
|
||||||
|
{
|
||||||
|
private readonly List<StagedBatchMessage> _messages = [];
|
||||||
|
private readonly HashSet<string> _stagedMsgIds = new(StringComparer.Ordinal);
|
||||||
|
|
||||||
|
public DateTimeOffset CreatedAt { get; } = DateTimeOffset.UtcNow;
|
||||||
|
public int Count => _messages.Count;
|
||||||
|
public IReadOnlyList<StagedBatchMessage> Messages => _messages;
|
||||||
|
|
||||||
|
public void Add(StagedBatchMessage msg)
|
||||||
|
{
|
||||||
|
_messages.Add(msg);
|
||||||
|
if (msg.MsgId != null)
|
||||||
|
_stagedMsgIds.Add(msg.MsgId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool ContainsMsgId(string msgId) => _stagedMsgIds.Contains(msgId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Engine that manages all in-flight atomic batches for a single stream.
|
||||||
|
/// Go reference: server/jetstream_batching.go streamBatches struct.
|
||||||
|
///
|
||||||
|
/// The protocol:
|
||||||
|
/// 1. Publisher sends messages with Nats-Batch-Id + Nats-Batch-Sequence headers.
|
||||||
|
/// Messages with seq > 1 are staged; they are not committed to the store yet.
|
||||||
|
/// 2. The final message has Nats-Batch-Commit: 1 (or "eob"). This triggers atomic
|
||||||
|
/// commit of all staged messages.
|
||||||
|
/// 3. On error (gap, timeout, disable, delete, leader-change), the batch is rolled back.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class AtomicBatchPublishEngine
|
||||||
|
{
|
||||||
|
// Go: streamDefaultMaxBatchInflightPerStream = 50
|
||||||
|
public const int DefaultMaxInflightPerStream = 50;
|
||||||
|
// Go: streamDefaultMaxBatchSize = 1000
|
||||||
|
public const int DefaultMaxBatchSize = 1000;
|
||||||
|
// Go: streamDefaultMaxBatchTimeout = 10s
|
||||||
|
public static readonly TimeSpan DefaultBatchTimeout = TimeSpan.FromSeconds(10);
|
||||||
|
// Go: max batch ID length = 64
|
||||||
|
public const int MaxBatchIdLength = 64;
|
||||||
|
|
||||||
|
private readonly ConcurrentDictionary<string, InFlightBatch> _batches = new(StringComparer.Ordinal);
|
||||||
|
private readonly int _maxInflightPerStream;
|
||||||
|
private readonly int _maxBatchSize;
|
||||||
|
private readonly TimeSpan _batchTimeout;
|
||||||
|
|
||||||
|
public AtomicBatchPublishEngine(
|
||||||
|
int maxInflightPerStream = DefaultMaxInflightPerStream,
|
||||||
|
int maxBatchSize = DefaultMaxBatchSize,
|
||||||
|
TimeSpan? batchTimeout = null)
|
||||||
|
{
|
||||||
|
_maxInflightPerStream = maxInflightPerStream;
|
||||||
|
_maxBatchSize = maxBatchSize;
|
||||||
|
_batchTimeout = batchTimeout ?? DefaultBatchTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns the number of in-flight batches.
|
||||||
|
/// </summary>
|
||||||
|
public int InflightCount => _batches.Count;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Validates and stages/commits a batch message.
|
||||||
|
/// Returns a result indicating: stage (empty ack), commit (full ack), or error.
|
||||||
|
/// </summary>
|
||||||
|
public AtomicBatchResult Process(
|
||||||
|
BatchPublishRequest req,
|
||||||
|
PublishPreconditions preconditions,
|
||||||
|
int streamDuplicateWindowMs,
|
||||||
|
Func<StagedBatchMessage, PubAck?> commitSingle)
|
||||||
|
{
|
||||||
|
// Validate batch ID length (max 64 chars).
|
||||||
|
if (req.BatchId.Length > MaxBatchIdLength)
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.InvalidBatchId,
|
||||||
|
"atomic publish batch ID is invalid");
|
||||||
|
|
||||||
|
// Validate commit value: only "1" and "eob" are valid; empty/"0"/"anything-else" is invalid.
|
||||||
|
if (req.IsCommit && req.CommitValue is not ("1" or "eob"))
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.InvalidCommit,
|
||||||
|
"atomic publish batch commit is invalid");
|
||||||
|
|
||||||
|
EvictExpiredBatches();
|
||||||
|
|
||||||
|
// Check inflight limit.
|
||||||
|
if (!_batches.ContainsKey(req.BatchId) && _batches.Count >= _maxInflightPerStream)
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.IncompleteBatch,
|
||||||
|
"atomic publish batch is incomplete");
|
||||||
|
|
||||||
|
// Sequence 1 starts a new batch.
|
||||||
|
if (req.BatchSeq == 1)
|
||||||
|
{
|
||||||
|
// Start new batch or replace any previous one with the same ID.
|
||||||
|
var newBatch = new InFlightBatch();
|
||||||
|
|
||||||
|
// Check for duplicate msg ID in existing store (preconditions).
|
||||||
|
if (req.MsgId != null && preconditions.IsDuplicate(req.MsgId, streamDuplicateWindowMs, out _))
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.ContainsDuplicate,
|
||||||
|
"atomic publish batch contains duplicate message id");
|
||||||
|
|
||||||
|
// Stage this message.
|
||||||
|
var staged = new StagedBatchMessage
|
||||||
|
{
|
||||||
|
Subject = req.Subject,
|
||||||
|
Payload = req.Payload,
|
||||||
|
MsgId = req.MsgId,
|
||||||
|
ExpectedLastSeq = req.ExpectedLastSeq,
|
||||||
|
ExpectedLastSubjectSeq = req.ExpectedLastSubjectSeq,
|
||||||
|
ExpectedLastSubjectSeqSubject = req.ExpectedLastSubjectSeqSubject,
|
||||||
|
};
|
||||||
|
newBatch.Add(staged);
|
||||||
|
|
||||||
|
// Single-message batch that commits immediately.
|
||||||
|
if (req.IsCommit)
|
||||||
|
{
|
||||||
|
// Commit just this one message.
|
||||||
|
var ack = commitSingle(staged);
|
||||||
|
if (ack == null || ack.ErrorCode != null)
|
||||||
|
{
|
||||||
|
return AtomicBatchResult.Error(
|
||||||
|
ack?.ErrorCode ?? AtomicBatchPublishErrorCodes.IncompleteBatch,
|
||||||
|
"atomic publish batch is incomplete");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (req.MsgId != null)
|
||||||
|
{
|
||||||
|
preconditions.Record(req.MsgId, ack.Seq);
|
||||||
|
preconditions.TrimOlderThan(streamDuplicateWindowMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
return AtomicBatchResult.Committed(new PubAck
|
||||||
|
{
|
||||||
|
Stream = ack.Stream,
|
||||||
|
Seq = ack.Seq,
|
||||||
|
BatchId = req.BatchId,
|
||||||
|
BatchSize = 1,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check batch size limit for single-message staging.
|
||||||
|
if (1 > _maxBatchSize)
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.TooLargeBatch,
|
||||||
|
$"atomic publish batch is too large: {_maxBatchSize}");
|
||||||
|
|
||||||
|
_batches[req.BatchId] = newBatch;
|
||||||
|
return AtomicBatchResult.Staged();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Non-first sequence: find the in-flight batch.
|
||||||
|
if (!_batches.TryGetValue(req.BatchId, out var batch))
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.IncompleteBatch,
|
||||||
|
"atomic publish batch is incomplete");
|
||||||
|
|
||||||
|
// Sequence must be consecutive (no gaps).
|
||||||
|
if (req.BatchSeq != (ulong)batch.Count + 1)
|
||||||
|
{
|
||||||
|
_batches.TryRemove(req.BatchId, out _);
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.IncompleteBatch,
|
||||||
|
"atomic publish batch is incomplete");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for duplicate msg ID in store or already staged in this batch.
|
||||||
|
if (req.MsgId != null)
|
||||||
|
{
|
||||||
|
if (preconditions.IsDuplicate(req.MsgId, streamDuplicateWindowMs, out _))
|
||||||
|
{
|
||||||
|
_batches.TryRemove(req.BatchId, out _);
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.ContainsDuplicate,
|
||||||
|
"atomic publish batch contains duplicate message id");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (batch.ContainsMsgId(req.MsgId))
|
||||||
|
{
|
||||||
|
_batches.TryRemove(req.BatchId, out _);
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.ContainsDuplicate,
|
||||||
|
"atomic publish batch contains duplicate message id");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var stagedMsg = new StagedBatchMessage
|
||||||
|
{
|
||||||
|
Subject = req.Subject,
|
||||||
|
Payload = req.Payload,
|
||||||
|
MsgId = req.MsgId,
|
||||||
|
ExpectedLastSeq = req.ExpectedLastSeq,
|
||||||
|
ExpectedLastSubjectSeq = req.ExpectedLastSubjectSeq,
|
||||||
|
ExpectedLastSubjectSeqSubject = req.ExpectedLastSubjectSeqSubject,
|
||||||
|
};
|
||||||
|
batch.Add(stagedMsg);
|
||||||
|
|
||||||
|
// Check batch size limit.
|
||||||
|
if (batch.Count > _maxBatchSize)
|
||||||
|
{
|
||||||
|
_batches.TryRemove(req.BatchId, out _);
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.TooLargeBatch,
|
||||||
|
$"atomic publish batch is too large: {_maxBatchSize}");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!req.IsCommit)
|
||||||
|
{
|
||||||
|
// Not committing yet — return empty ack (flow control).
|
||||||
|
return AtomicBatchResult.Staged();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate commit value.
|
||||||
|
if (req.CommitValue is not ("1" or "eob"))
|
||||||
|
{
|
||||||
|
_batches.TryRemove(req.BatchId, out _);
|
||||||
|
return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.InvalidCommit,
|
||||||
|
"atomic publish batch commit is invalid");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine effective batch for eob: exclude the commit-marker message itself.
|
||||||
|
var effectiveBatch = req.CommitValue == "eob"
|
||||||
|
? batch.Messages.Take(batch.Messages.Count - 1).ToList()
|
||||||
|
: batch.Messages;
|
||||||
|
|
||||||
|
_batches.TryRemove(req.BatchId, out _);
|
||||||
|
|
||||||
|
// Atomically commit all staged messages.
|
||||||
|
ulong lastSeq = 0;
|
||||||
|
foreach (var msg in effectiveBatch)
|
||||||
|
{
|
||||||
|
var ack = commitSingle(msg);
|
||||||
|
if (ack == null || ack.ErrorCode != null)
|
||||||
|
return AtomicBatchResult.Error(
|
||||||
|
ack?.ErrorCode ?? AtomicBatchPublishErrorCodes.IncompleteBatch,
|
||||||
|
"atomic publish batch is incomplete");
|
||||||
|
|
||||||
|
if (msg.MsgId != null)
|
||||||
|
{
|
||||||
|
preconditions.Record(msg.MsgId, ack.Seq);
|
||||||
|
preconditions.TrimOlderThan(streamDuplicateWindowMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
lastSeq = ack.Seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
return AtomicBatchResult.Committed(new PubAck
|
||||||
|
{
|
||||||
|
Stream = effectiveBatch.Count > 0 ? (commitSingle == null ? "" : effectiveBatch[^1].Subject) : "",
|
||||||
|
Seq = lastSeq,
|
||||||
|
BatchId = req.BatchId,
|
||||||
|
BatchSize = effectiveBatch.Count,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Removes all batches from the engine (called on stream disable/delete/leader-change).
|
||||||
|
/// </summary>
|
||||||
|
public void Clear() => _batches.Clear();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns whether a batch with the given ID is currently in-flight.
|
||||||
|
/// </summary>
|
||||||
|
public bool HasBatch(string batchId) => _batches.ContainsKey(batchId);
|
||||||
|
|
||||||
|
private void EvictExpiredBatches()
|
||||||
|
{
|
||||||
|
var cutoff = DateTimeOffset.UtcNow - _batchTimeout;
|
||||||
|
foreach (var (key, batch) in _batches)
|
||||||
|
{
|
||||||
|
if (batch.CreatedAt < cutoff)
|
||||||
|
_batches.TryRemove(key, out _);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Request to process a batch publish message.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class BatchPublishRequest
|
||||||
|
{
|
||||||
|
public required string BatchId { get; init; }
|
||||||
|
public required ulong BatchSeq { get; init; }
|
||||||
|
public required string Subject { get; init; }
|
||||||
|
public required ReadOnlyMemory<byte> Payload { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// True if Nats-Batch-Commit header is present with a non-empty value.
|
||||||
|
/// </summary>
|
||||||
|
public bool IsCommit { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Raw value of the Nats-Batch-Commit header ("1", "eob", or other).
|
||||||
|
/// Only meaningful when IsCommit = true.
|
||||||
|
/// </summary>
|
||||||
|
public string? CommitValue { get; init; }
|
||||||
|
|
||||||
|
public string? MsgId { get; init; }
|
||||||
|
public ulong ExpectedLastSeq { get; init; }
|
||||||
|
public ulong ExpectedLastSubjectSeq { get; init; }
|
||||||
|
public string? ExpectedLastSubjectSeqSubject { get; init; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Result of processing a batch publish message.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class AtomicBatchResult
|
||||||
|
{
|
||||||
|
public enum ResultKind { Staged, Committed, Error }
|
||||||
|
|
||||||
|
public ResultKind Kind { get; private init; }
|
||||||
|
public PubAck? CommitAck { get; private init; }
|
||||||
|
public int ErrorCode { get; private init; }
|
||||||
|
public string ErrorDescription { get; private init; } = string.Empty;
|
||||||
|
|
||||||
|
public static AtomicBatchResult Staged() => new() { Kind = ResultKind.Staged };
|
||||||
|
|
||||||
|
public static AtomicBatchResult Committed(PubAck ack) =>
|
||||||
|
new() { Kind = ResultKind.Committed, CommitAck = ack };
|
||||||
|
|
||||||
|
public static AtomicBatchResult Error(int code, string description) =>
|
||||||
|
new() { Kind = ResultKind.Error, ErrorCode = code, ErrorDescription = description };
|
||||||
|
}
|
||||||
@@ -5,6 +5,10 @@ public sealed class JetStreamPublisher
|
|||||||
private readonly StreamManager _streamManager;
|
private readonly StreamManager _streamManager;
|
||||||
private readonly PublishPreconditions _preconditions = new();
|
private readonly PublishPreconditions _preconditions = new();
|
||||||
|
|
||||||
|
// One engine per publisher (stream-scoped in real server; here publisher-scoped).
|
||||||
|
// Go reference: server/jetstream_batching.go streamBatches
|
||||||
|
private readonly AtomicBatchPublishEngine _batchEngine = new();
|
||||||
|
|
||||||
public JetStreamPublisher(StreamManager streamManager)
|
public JetStreamPublisher(StreamManager streamManager)
|
||||||
{
|
{
|
||||||
_streamManager = streamManager;
|
_streamManager = streamManager;
|
||||||
@@ -24,13 +28,19 @@ public sealed class JetStreamPublisher
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Atomic batch publish path ---
|
||||||
|
// Go: server/stream.go processInboundMsg — checks batch headers before normal flow.
|
||||||
|
if (!string.IsNullOrEmpty(options.BatchId))
|
||||||
|
{
|
||||||
|
ack = ProcessBatchMessage(stream, subject, payload, options);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Normal (non-batch) publish path ---
|
||||||
var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
|
var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
|
||||||
if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, state.LastSeq))
|
if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, state.LastSeq))
|
||||||
{
|
{
|
||||||
ack = new PubAck
|
ack = new PubAck { ErrorCode = 10071 };
|
||||||
{
|
|
||||||
ErrorCode = 10071,
|
|
||||||
};
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -50,4 +60,114 @@ public sealed class JetStreamPublisher
|
|||||||
_preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs);
|
_preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Go: server/stream.go processInboundMsg — batch message handling.
|
||||||
|
private PubAck ProcessBatchMessage(
|
||||||
|
StreamHandle stream,
|
||||||
|
string subject,
|
||||||
|
ReadOnlyMemory<byte> payload,
|
||||||
|
PublishOptions options)
|
||||||
|
{
|
||||||
|
// Stream must have AllowAtomicPublish enabled.
|
||||||
|
// Go: server/stream.go:6351 NewJSAtomicPublishDisabledError
|
||||||
|
if (!stream.Config.AllowAtomicPublish)
|
||||||
|
{
|
||||||
|
return new PubAck
|
||||||
|
{
|
||||||
|
ErrorCode = AtomicBatchPublishErrorCodes.Disabled,
|
||||||
|
Stream = stream.Config.Name,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchSeq must be present (non-zero).
|
||||||
|
// Go: server/stream.go:6371 NewJSAtomicPublishMissingSeqError
|
||||||
|
if (options.BatchSeq == 0)
|
||||||
|
{
|
||||||
|
return new PubAck
|
||||||
|
{
|
||||||
|
ErrorCode = AtomicBatchPublishErrorCodes.MissingSeq,
|
||||||
|
Stream = stream.Config.Name,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nats-Expected-Last-Msg-Id is unsupported in batch context.
|
||||||
|
// Go: server/stream.go:6584 NewJSAtomicPublishUnsupportedHeaderBatchError
|
||||||
|
if (!string.IsNullOrEmpty(options.ExpectedLastMsgId))
|
||||||
|
{
|
||||||
|
return new PubAck
|
||||||
|
{
|
||||||
|
ErrorCode = AtomicBatchPublishErrorCodes.UnsupportedHeader,
|
||||||
|
Stream = stream.Config.Name,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
var commitValue = options.BatchCommit;
|
||||||
|
var isCommit = !string.IsNullOrEmpty(commitValue);
|
||||||
|
|
||||||
|
// Validate commit value immediately if present.
|
||||||
|
if (isCommit && commitValue is not ("1" or "eob"))
|
||||||
|
{
|
||||||
|
// Roll back any in-flight batch with this ID.
|
||||||
|
_batchEngine.Clear(); // simplified: in production this only removes the specific batch
|
||||||
|
return new PubAck
|
||||||
|
{
|
||||||
|
ErrorCode = AtomicBatchPublishErrorCodes.InvalidCommit,
|
||||||
|
Stream = stream.Config.Name,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
var req = new BatchPublishRequest
|
||||||
|
{
|
||||||
|
BatchId = options.BatchId!,
|
||||||
|
BatchSeq = options.BatchSeq,
|
||||||
|
Subject = subject,
|
||||||
|
Payload = payload,
|
||||||
|
IsCommit = isCommit,
|
||||||
|
CommitValue = commitValue,
|
||||||
|
MsgId = options.MsgId,
|
||||||
|
ExpectedLastSeq = options.ExpectedLastSeq,
|
||||||
|
ExpectedLastSubjectSeq = options.ExpectedLastSubjectSeq,
|
||||||
|
ExpectedLastSubjectSeqSubject = options.ExpectedLastSubjectSeqSubject,
|
||||||
|
};
|
||||||
|
|
||||||
|
var result = _batchEngine.Process(
|
||||||
|
req,
|
||||||
|
_preconditions,
|
||||||
|
stream.Config.DuplicateWindowMs,
|
||||||
|
staged =>
|
||||||
|
{
|
||||||
|
// Check expected last sequence.
|
||||||
|
if (staged.ExpectedLastSeq > 0)
|
||||||
|
{
|
||||||
|
var st = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
|
||||||
|
if (st.LastSeq != staged.ExpectedLastSeq)
|
||||||
|
return new PubAck { ErrorCode = 10071, Stream = stream.Config.Name };
|
||||||
|
}
|
||||||
|
|
||||||
|
var captured = _streamManager.Capture(staged.Subject, staged.Payload);
|
||||||
|
return captured ?? new PubAck { Stream = stream.Config.Name };
|
||||||
|
});
|
||||||
|
|
||||||
|
return result.Kind switch
|
||||||
|
{
|
||||||
|
AtomicBatchResult.ResultKind.Staged => new PubAck
|
||||||
|
{
|
||||||
|
Stream = stream.Config.Name,
|
||||||
|
// Empty ack for staged (flow control).
|
||||||
|
},
|
||||||
|
AtomicBatchResult.ResultKind.Committed => result.CommitAck!,
|
||||||
|
AtomicBatchResult.ResultKind.Error => new PubAck
|
||||||
|
{
|
||||||
|
ErrorCode = result.ErrorCode,
|
||||||
|
Stream = stream.Config.Name,
|
||||||
|
},
|
||||||
|
_ => new PubAck { Stream = stream.Config.Name },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Clears all in-flight batches (called when stream is disabled or deleted).
|
||||||
|
/// Go: server/jetstream_batching.go streamBatches.cleanup()
|
||||||
|
/// </summary>
|
||||||
|
public void ClearBatches() => _batchEngine.Clear();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,4 +6,12 @@ public sealed class PubAck
|
|||||||
public ulong Seq { get; init; }
|
public ulong Seq { get; init; }
|
||||||
public bool Duplicate { get; init; }
|
public bool Duplicate { get; init; }
|
||||||
public int? ErrorCode { get; init; }
|
public int? ErrorCode { get; init; }
|
||||||
|
|
||||||
|
// Go: JSPubAckResponse.BatchId — identifies which batch this ack belongs to.
|
||||||
|
// Go reference: server/jetstream_batching.go (JSPubAckResponse struct)
|
||||||
|
public string? BatchId { get; init; }
|
||||||
|
|
||||||
|
// Go: JSPubAckResponse.BatchSize — total number of messages committed in this batch.
|
||||||
|
// Go reference: server/jetstream_batching.go (JSPubAckResponse struct)
|
||||||
|
public int BatchSize { get; init; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,4 +4,18 @@ public sealed class PublishOptions
|
|||||||
{
|
{
|
||||||
public string? MsgId { get; init; }
|
public string? MsgId { get; init; }
|
||||||
public ulong ExpectedLastSeq { get; init; }
|
public ulong ExpectedLastSeq { get; init; }
|
||||||
|
public ulong ExpectedLastSubjectSeq { get; init; }
|
||||||
|
public string? ExpectedLastSubjectSeqSubject { get; init; }
|
||||||
|
|
||||||
|
// Go: Nats-Batch-Id header — identifies which atomic batch this message belongs to.
|
||||||
|
public string? BatchId { get; init; }
|
||||||
|
|
||||||
|
// Go: Nats-Batch-Sequence header — 1-based position within the batch.
|
||||||
|
public ulong BatchSeq { get; init; }
|
||||||
|
|
||||||
|
// Go: Nats-Batch-Commit header — "1" or "eob" to commit, null/empty to stage only.
|
||||||
|
public string? BatchCommit { get; init; }
|
||||||
|
|
||||||
|
// Go: Nats-Expected-Last-Msg-Id header — unsupported inside a batch.
|
||||||
|
public string? ExpectedLastMsgId { get; init; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,6 +63,13 @@ public sealed class StreamManager
|
|||||||
if (normalized.SubjectDeleteMarkerTtlMs > 0 && !string.IsNullOrWhiteSpace(normalized.Mirror))
|
if (normalized.SubjectDeleteMarkerTtlMs > 0 && !string.IsNullOrWhiteSpace(normalized.Mirror))
|
||||||
return JetStreamApiResponse.ErrorResponse(10054, "mirror configuration can not have subject delete marker TTL");
|
return JetStreamApiResponse.ErrorResponse(10054, "mirror configuration can not have subject delete marker TTL");
|
||||||
|
|
||||||
|
// Go: NewJSMirrorWithAtomicPublishError (10198) — mirror + AllowAtomicPublish is invalid.
|
||||||
|
// Reference: server/stream.go:1735-1737
|
||||||
|
if (normalized.AllowAtomicPublish && !string.IsNullOrWhiteSpace(normalized.Mirror))
|
||||||
|
return JetStreamApiResponse.ErrorResponse(
|
||||||
|
AtomicBatchPublishErrorCodes.MirrorWithAtomicPublish,
|
||||||
|
"stream mirrors can not also use atomic publishing");
|
||||||
|
|
||||||
// Go: RePublish cycle detection — destination must not overlap stream subjects.
|
// Go: RePublish cycle detection — destination must not overlap stream subjects.
|
||||||
// Reference: server/stream.go:1060-1080 (checkRePublish)
|
// Reference: server/stream.go:1060-1080 (checkRePublish)
|
||||||
if (!string.IsNullOrWhiteSpace(normalized.RePublishDest))
|
if (!string.IsNullOrWhiteSpace(normalized.RePublishDest))
|
||||||
@@ -389,6 +396,14 @@ public sealed class StreamManager
|
|||||||
SubjectDeleteMarkerTtlMs = config.SubjectDeleteMarkerTtlMs,
|
SubjectDeleteMarkerTtlMs = config.SubjectDeleteMarkerTtlMs,
|
||||||
// Go: StreamConfig.AllowMsgSchedules
|
// Go: StreamConfig.AllowMsgSchedules
|
||||||
AllowMsgSchedules = config.AllowMsgSchedules,
|
AllowMsgSchedules = config.AllowMsgSchedules,
|
||||||
|
// Go: StreamConfig.AllowMsgCounter — CRDT counter semantics
|
||||||
|
AllowMsgCounter = config.AllowMsgCounter,
|
||||||
|
// Go: StreamConfig.AllowAtomicPublish — atomic batch publish
|
||||||
|
AllowAtomicPublish = config.AllowAtomicPublish,
|
||||||
|
// Go: StreamConfig.PersistMode — async vs sync persistence
|
||||||
|
PersistMode = config.PersistMode,
|
||||||
|
// Go: StreamConfig.Metadata — user and server key/value metadata
|
||||||
|
Metadata = config.Metadata == null ? null : new Dictionary<string, string>(config.Metadata),
|
||||||
};
|
};
|
||||||
|
|
||||||
return copy;
|
return copy;
|
||||||
|
|||||||
@@ -191,6 +191,60 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
|
|||||||
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes a batch message with the Nats-Batch-Id, Nats-Batch-Sequence (and optionally
|
||||||
|
/// Nats-Batch-Commit) headers simulated via PublishOptions.
|
||||||
|
/// Returns PubAck with ErrorCode set on error, empty BatchId on staged (flow-control), or
|
||||||
|
/// full ack with BatchId+BatchSize on commit.
|
||||||
|
/// </summary>
|
||||||
|
public Task<PubAck> BatchPublishAsync(
|
||||||
|
string subject,
|
||||||
|
string payload,
|
||||||
|
string batchId,
|
||||||
|
ulong batchSeq,
|
||||||
|
string? commitValue = null,
|
||||||
|
string? msgId = null,
|
||||||
|
ulong expectedLastSeq = 0,
|
||||||
|
string? expectedLastMsgId = null)
|
||||||
|
{
|
||||||
|
var options = new PublishOptions
|
||||||
|
{
|
||||||
|
BatchId = batchId,
|
||||||
|
BatchSeq = batchSeq,
|
||||||
|
BatchCommit = commitValue,
|
||||||
|
MsgId = msgId,
|
||||||
|
ExpectedLastSeq = expectedLastSeq,
|
||||||
|
ExpectedLastMsgId = expectedLastMsgId,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (_publisher.TryCaptureWithOptions(subject, Encoding.UTF8.GetBytes(payload), options, out var ack))
|
||||||
|
return Task.FromResult(ack);
|
||||||
|
|
||||||
|
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
||||||
|
}
|
||||||
|
|
||||||
|
public StreamConfig? GetStreamConfig(string streamName)
|
||||||
|
{
|
||||||
|
return _streamManager.TryGet(streamName, out var handle) ? handle.Config : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool UpdateStream(StreamConfig config)
|
||||||
|
{
|
||||||
|
var result = _streamManager.CreateOrUpdate(config);
|
||||||
|
return result.Error == null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JetStreamApiResponse UpdateStreamWithResult(StreamConfig config)
|
||||||
|
{
|
||||||
|
return _streamManager.CreateOrUpdate(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Exposes the underlying JetStreamPublisher for advanced test scenarios
|
||||||
|
/// (e.g. calling ClearBatches to simulate a leader change).
|
||||||
|
/// </summary>
|
||||||
|
public JetStreamPublisher GetPublisher() => _publisher;
|
||||||
|
|
||||||
public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
|
public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
|
||||||
{
|
{
|
||||||
return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
||||||
|
|||||||
Reference in New Issue
Block a user