diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs index 465338d..6182716 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs @@ -13,10 +13,18 @@ // // Adapted from server/jetstream_batching.go in the NATS server Go source. +using System.Numerics; +using System.Text; +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + namespace ZB.MOM.NatsNet.Server; internal static class JetStreamBatching { + private const string JsStreamSource = "Nats-Stream-Source"; + private const string JsMessageCounterSources = "Nats-Counter-Sources"; + internal static readonly TimeSpan StreamDefaultMaxBatchTimeout = TimeSpan.FromSeconds(10); internal const string BatchTimeout = "timeout"; @@ -117,6 +125,490 @@ internal static class JetStreamBatching ArgumentNullException.ThrowIfNull(batchGroup); batchGroup.StopLocked(); } + + internal static void Commit(BatchStagedDiff diff, BatchRuntimeState state) + { + ArgumentNullException.ThrowIfNull(diff); + ArgumentNullException.ThrowIfNull(state); + + if (diff.MsgIds is { Count: > 0 }) + { + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + foreach (var msgId in diff.MsgIds.Keys) + state.MsgIds[msgId] = new NatsStream.DedupeEntry { Id = msgId, Seq = 0, TimestampNanos = now }; + } + + if (diff.Counter is { Count: > 0 }) + { + foreach (var entry in diff.Counter) + state.ClusteredCounterTotal[entry.Key] = entry.Value; + } + + if (diff.Inflight is { Count: > 0 }) + { + foreach (var pair in diff.Inflight) + { + if (state.Inflight.TryGetValue(pair.Key, out var existing)) + { + existing.Bytes += pair.Value.Bytes; + existing.Ops += pair.Value.Ops; + } + else + { + state.Inflight[pair.Key] = pair.Value; + } + } + } + + if (diff.ExpectedPerSubject is { Count: > 0 }) + { + foreach (var pair in diff.ExpectedPerSubject) + { + state.ExpectedPerSubjectSequence[pair.Value.ClSeq] = pair.Key; + state.ExpectedPerSubjectInProcess.Add(pair.Key); + } + } + } + + internal static void ClearBatchStateLocked(BatchApply batch) + { + ArgumentNullException.ThrowIfNull(batch); + batch.ClearBatchStateLocked(); + } + + internal static void RejectBatchStateLocked(BatchApply batch, BatchApplyContext context) + { + ArgumentNullException.ThrowIfNull(batch); + ArgumentNullException.ThrowIfNull(context); + batch.RejectBatchStateLocked(context); + } + + internal static void RejectBatchState(BatchApply batch, BatchApplyContext context) + { + ArgumentNullException.ThrowIfNull(batch); + ArgumentNullException.ThrowIfNull(context); + batch.RejectBatchState(context); + } + + internal static (byte[] Hdr, byte[] Msg, ulong Sequence, JsApiError? ApiError, Exception? Error) CheckMsgHeadersPreClusteredProposal( + BatchStagedDiff diff, + BatchHeaderCheckContext context, + string subject, + byte[]? hdr, + byte[]? msg, + bool sourced, + string name, + bool allowRollup, + bool denyPurge, + bool allowTtl, + bool allowMsgCounter, + bool allowMsgSchedules, + DiscardPolicy discard, + bool discardNewPer, + int maxMsgSize, + long maxMsgs, + long maxMsgsPer, + long maxBytes) + { + ArgumentNullException.ThrowIfNull(diff); + ArgumentNullException.ThrowIfNull(context); + ArgumentNullException.ThrowIfNull(context.Store); + ArgumentException.ThrowIfNullOrWhiteSpace(subject); + ArgumentException.ThrowIfNullOrWhiteSpace(name); + + hdr ??= Array.Empty(); + msg ??= Array.Empty(); + BigInteger? incr = null; + + if (hdr.Length > 0) + { + if (hdr.Length > ushort.MaxValue) + { + var err = new InvalidOperationException($"JetStream header size exceeds limits for '{context.AccountName} > {name}'"); + return (hdr, msg, 0, JsApiErrors.NewJSStreamHeaderExceedsMaximumError(), err); + } + + var (parsedIncr, okIncr) = NatsStream.GetMessageIncr(hdr); + if (!okIncr) + { + var apiErr = JsApiErrors.NewJSMessageIncrInvalidError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + + incr = parsedIncr; + if (incr != null && !sourced) + { + if (!allowMsgCounter) + { + var apiErr = JsApiErrors.NewJSMessageIncrDisabledError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + + if (msg.Length > 0) + { + var apiErr = JsApiErrors.NewJSMessageIncrPayloadError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + + var hasInvalidCounterHeader = + NatsStream.GetRollup(hdr) != string.Empty || + NatsStream.GetExpectedStream(hdr) != string.Empty || + NatsStream.GetExpectedLastMsgId(hdr) != string.Empty || + NatsStream.GetExpectedLastSeqPerSubjectForSubject(hdr) != string.Empty || + NatsStream.GetExpectedLastSeq(hdr).Exists || + NatsStream.GetExpectedLastSeqPerSubject(hdr).Exists; + + if (hasInvalidCounterHeader) + { + var apiErr = JsApiErrors.NewJSMessageIncrInvalidError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + } + + var expectedStream = NatsStream.GetExpectedStream(hdr); + if (!string.IsNullOrEmpty(expectedStream) && !string.Equals(expectedStream, name, StringComparison.Ordinal)) + return (hdr, msg, 0, JsApiErrors.NewJSStreamNotMatchError(), new InvalidOperationException("stream mismatch")); + + var ttlValue = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMessageTtl, hdr) is { Length: > 0 } ttlHdr + ? Encoding.ASCII.GetString(ttlHdr) + : string.Empty; + var (ttl, ttlErr) = string.IsNullOrEmpty(ttlValue) + ? (0L, (Exception?)null) + : NatsStream.ParseMessageTTL(ttlValue); + if (!sourced && (ttl != 0 || ttlErr != null)) + { + if (!allowTtl) + return (hdr, msg, 0, JsApiErrors.NewJSMessageTTLDisabledError(), new InvalidOperationException("message ttl disabled")); + + if (ttlErr != null) + return (hdr, msg, 0, JsApiErrors.NewJSMessageTTLInvalidError(), ttlErr); + } + + var msgId = NatsStream.GetMsgId(hdr); + if (!string.IsNullOrEmpty(msgId)) + { + diff.MsgIds ??= new Dictionary(StringComparer.Ordinal); + if (diff.MsgIds.ContainsKey(msgId)) + { + var err = new InvalidOperationException("duplicate message id in staged batch"); + return (hdr, msg, 0, JsApiErrors.NewJSAtomicPublishContainsDuplicateMessageError(), err); + } + + if (context.MsgIds.TryGetValue(msgId, out var dedupe)) + { + var err = new InvalidOperationException("duplicate message id"); + if (dedupe.Seq > 0) + return (hdr, msg, dedupe.Seq, JsApiErrors.NewJSAtomicPublishContainsDuplicateMessageError(), err); + + return (hdr, msg, 0, JsApiErrors.NewJSStreamDuplicateMessageConflictError(), err); + } + + diff.MsgIds[msgId] = null; + } + } + + if (incr is null && allowMsgCounter) + { + var apiErr = JsApiErrors.NewJSMessageIncrMissingError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + + if (incr != null && allowMsgCounter) + { + var initial = BigInteger.Zero; + Dictionary>? sources = null; + MsgCounterRunningTotal? counter = null; + + diff.Counter ??= new Dictionary(StringComparer.Ordinal); + if (diff.Counter.TryGetValue(subject, out var stagedCounter)) + { + initial = stagedCounter.Total; + sources = stagedCounter.Sources; + counter = stagedCounter; + } + else if (context.ClusteredCounterTotal.TryGetValue(subject, out var committedCounter)) + { + initial = committedCounter.Total; + sources = committedCounter.Sources; + counter = new MsgCounterRunningTotal { Ops = committedCounter.Ops }; + } + else + { + var last = context.Store.LoadLastMsg(subject, new StoreMsg()); + if (last != null) + { + try + { + var current = JsonSerializer.Deserialize(last.Msg); + if (current?.Value is null || !BigInteger.TryParse(current.Value, out initial)) + throw new InvalidOperationException("invalid counter payload"); + + var sourceHdr = NatsMessageHeaders.SliceHeader(JsMessageCounterSources, last.Hdr); + if (sourceHdr is { Length: > 0 }) + sources = JsonSerializer.Deserialize>>(sourceHdr.Value.ToArray()); + } + catch (Exception ex) + { + return (hdr, msg, 0, JsApiErrors.NewJSMessageCounterBrokenError(), ex); + } + } + } + + var sourceHeader = NatsMessageHeaders.SliceHeader(JsStreamSource, hdr); + if (sourceHeader is { Length: > 0 }) + { + try + { + var fields = Encoding.ASCII.GetString(sourceHeader.Value.Span).Split(' ', StringSplitOptions.RemoveEmptyEntries); + var originStream = fields.Length > 0 ? fields[0] : string.Empty; + var originSubject = fields.Length >= 5 ? fields[4] : subject; + + var current = JsonSerializer.Deserialize(msg); + if (current?.Value is null || !BigInteger.TryParse(current.Value, out var sourcedValue)) + throw new InvalidOperationException("invalid sourced counter payload"); + + sources ??= new Dictionary>(StringComparer.Ordinal); + if (!sources.TryGetValue(originStream, out var bySubject)) + { + bySubject = new Dictionary(StringComparer.Ordinal); + sources[originStream] = bySubject; + } + + var previousRaw = bySubject.GetValueOrDefault(originSubject); + _ = BigInteger.TryParse(previousRaw, out var previousValue); + bySubject[originSubject] = sourcedValue.ToString(); + incr = sourcedValue - previousValue; + hdr = NatsMessageHeaders.SetHeader(NatsHeaderConstants.JsMessageIncr, incr.ToString(), hdr); + } + catch (Exception ex) + { + return (hdr, msg, 0, JsApiErrors.NewJSMessageCounterBrokenError(), ex); + } + } + + initial += incr.Value; + msg = Encoding.ASCII.GetBytes($"{{\"val\":\"{initial}\"}}"); + if (sources is { Count: > 0 }) + hdr = NatsMessageHeaders.SetHeader(JsMessageCounterSources, JsonSerializer.Serialize(sources), hdr); + + var maxSize = context.MaxPayload; + if (maxMsgSize >= 0 && maxMsgSize < maxSize) + maxSize = maxMsgSize; + if (hdr.Length > maxSize || msg.Length > maxSize - hdr.Length) + return (hdr, msg, 0, JsApiErrors.NewJSStreamMessageExceedsMaximumError(), ServerErrors.ErrMaxPayload); + + counter ??= new MsgCounterRunningTotal(); + counter.Total = initial; + counter.Sources = sources; + counter.Ops++; + diff.Counter[subject] = counter; + } + + if (hdr.Length > 0) + { + var (lastSeq, hasLastSeq) = NatsStream.GetExpectedLastSeq(hdr); + var actualLastSeq = context.ClSeq - context.Clfs; + if (hasLastSeq && lastSeq != actualLastSeq) + return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceError(actualLastSeq), new InvalidOperationException($"last sequence mismatch: {lastSeq} vs {actualLastSeq}")); + if (hasLastSeq && diff.Inflight is { Count: > 0 }) + return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceConstantError(), new InvalidOperationException("last sequence mismatch")); + + var (expectedPerSubject, hasExpectedPerSubject) = NatsStream.GetExpectedLastSeqPerSubject(hdr); + if (hasExpectedPerSubject) + { + var seqSubject = subject; + var overrideSubject = NatsStream.GetExpectedLastSeqPerSubjectForSubject(hdr); + if (!string.IsNullOrEmpty(overrideSubject)) + seqSubject = overrideSubject; + + if ((diff.Inflight?.ContainsKey(seqSubject) ?? false) || + context.ExpectedPerSubjectInProcess.Contains(seqSubject) || + context.Inflight.ContainsKey(seqSubject)) + return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceConstantError(), new InvalidOperationException("last sequence by subject mismatch")); + + diff.ExpectedPerSubject ??= new Dictionary(StringComparer.Ordinal); + if (diff.ExpectedPerSubject.TryGetValue(seqSubject, out var existingExpected)) + { + if (existingExpected.SSeq != expectedPerSubject) + return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceError(existingExpected.SSeq), new InvalidOperationException($"last sequence by subject mismatch: {expectedPerSubject} vs {existingExpected.SSeq}")); + existingExpected.ClSeq = context.ClSeq; + } + else + { + var stored = context.Store.LoadLastMsg(seqSubject, new StoreMsg()); + var foundSeq = stored?.Seq ?? 0; + if (foundSeq != expectedPerSubject) + return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceError(foundSeq), new InvalidOperationException($"last sequence by subject mismatch: {expectedPerSubject} vs {foundSeq}")); + + diff.ExpectedPerSubject[seqSubject] = new BatchExpectedPerSubject + { + SSeq = foundSeq, + ClSeq = context.ClSeq, + }; + } + } + else if (!string.IsNullOrEmpty(NatsStream.GetExpectedLastSeqPerSubjectForSubject(hdr))) + { + var apiErr = JsApiErrors.NewJSStreamExpectedLastSeqPerSubjectInvalidError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + + var (schedule, hasValidSchedulePattern) = NatsStream.GetMessageSchedule(hdr); + if (!hasValidSchedulePattern) + { + var apiErr = allowMsgSchedules + ? JsApiErrors.NewJSMessageSchedulesPatternInvalidError() + : JsApiErrors.NewJSMessageSchedulesDisabledError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + if (schedule != default) + { + if (!allowMsgSchedules) + { + var apiErr = JsApiErrors.NewJSMessageSchedulesDisabledError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + + var (scheduleTtl, scheduleTtlOk) = NatsStream.GetMessageScheduleTTL(hdr); + if (!scheduleTtlOk) + { + var apiErr = JsApiErrors.NewJSMessageSchedulesTTLInvalidError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + if (!string.IsNullOrEmpty(scheduleTtl) && !allowTtl) + return (hdr, msg, 0, JsApiErrors.NewJSMessageTTLDisabledError(), new InvalidOperationException("message ttl disabled")); + + var scheduleTarget = NatsStream.GetMessageScheduleTarget(hdr); + if (string.IsNullOrEmpty(scheduleTarget) || + !SubscriptionIndex.IsValidPublishSubject(scheduleTarget) || + SubscriptionIndex.SubjectsCollide(scheduleTarget, subject)) + { + var apiErr = JsApiErrors.NewJSMessageSchedulesTargetInvalidError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + + var scheduleSource = NatsStream.GetMessageScheduleSource(hdr); + if (!string.IsNullOrEmpty(scheduleSource) && + (string.Equals(scheduleSource, scheduleTarget, StringComparison.Ordinal) || + string.Equals(scheduleSource, subject, StringComparison.Ordinal) || + !SubscriptionIndex.IsValidPublishSubject(scheduleSource))) + { + var apiErr = JsApiErrors.NewJSMessageSchedulesSourceInvalidError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + + if (context.StreamSubjects.Length > 0 && !context.StreamSubjects.Any(s => SubscriptionIndex.SubjectsCollide(s, scheduleTarget))) + { + var apiErr = JsApiErrors.NewJSMessageSchedulesTargetInvalidError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + + var rollup = NatsStream.GetRollup(hdr); + if (string.IsNullOrEmpty(rollup)) + { + hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsMsgRollup, NatsHeaderConstants.JsMsgRollupSubject); + } + else if (!string.Equals(rollup, NatsHeaderConstants.JsMsgRollupSubject, StringComparison.Ordinal)) + { + var apiErr = JsApiErrors.NewJSMessageSchedulesRollupInvalidError(); + return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description)); + } + } + + var headerRollup = NatsStream.GetRollup(hdr); + if (!string.IsNullOrEmpty(headerRollup)) + { + if (!allowRollup || denyPurge) + { + var err = new InvalidOperationException("rollup not permitted"); + return (hdr, msg, 0, JsApiErrors.NewJSStreamRollupFailedError(err), err); + } + + if (string.Equals(headerRollup, NatsHeaderConstants.JsMsgRollupSubject, StringComparison.Ordinal)) + { + if (diff.Inflight?.ContainsKey(subject) ?? false) + { + var err = new InvalidOperationException("batch rollup sub invalid"); + return (hdr, msg, 0, JsApiErrors.NewJSStreamRollupFailedError(err), err); + } + } + else if (string.Equals(headerRollup, NatsHeaderConstants.JsMsgRollupAll, StringComparison.Ordinal)) + { + if (diff.Inflight is { Count: > 0 }) + { + var err = new InvalidOperationException("batch rollup all invalid"); + return (hdr, msg, 0, JsApiErrors.NewJSStreamRollupFailedError(err), err); + } + } + else + { + var err = new InvalidOperationException($"rollup value invalid: {headerRollup}"); + return (hdr, msg, 0, JsApiErrors.NewJSStreamRollupFailedError(err), err); + } + } + } + + diff.Inflight ??= new Dictionary(StringComparer.Ordinal); + var msgSize = context.Store.Type() == StorageType.FileStorage + ? JetStreamFileStore.FileStoreMsgSizeRaw(subject.Length, hdr.Length, msg.Length) + : JetStreamMemStore.MemStoreMsgSizeRaw(subject.Length, hdr.Length, msg.Length); + + if (diff.Inflight.TryGetValue(subject, out var inflight)) + { + inflight.Bytes += msgSize; + inflight.Ops++; + } + else + { + inflight = new InflightSubjectRunningTotal { Bytes = msgSize, Ops = 1 }; + diff.Inflight[subject] = inflight; + } + + if (discard == DiscardPolicy.DiscardNew) + { + if (maxMsgs > 0 || maxBytes > 0) + { + var state = new StreamState(); + context.Store.FastState(state); + var totalMsgs = state.Msgs; + var totalBytes = state.Bytes; + foreach (var inflightState in context.Inflight.Values) + { + totalMsgs += inflightState.Ops; + totalBytes += inflightState.Bytes; + } + foreach (var inflightState in diff.Inflight.Values) + { + totalMsgs += inflightState.Ops; + totalBytes += inflightState.Bytes; + } + + Exception? thresholdErr = null; + if (maxMsgs > 0 && totalMsgs > (ulong)maxMsgs) + thresholdErr = StoreErrors.ErrMaxMsgs; + else if (maxBytes > 0 && totalBytes > (ulong)maxBytes) + thresholdErr = StoreErrors.ErrMaxBytes; + + if (thresholdErr != null) + return (hdr, msg, 0, JsApiErrors.NewJSStreamStoreFailedError(thresholdErr, JsApiErrors.Unless(thresholdErr)), thresholdErr); + } + + if (discardNewPer && maxMsgsPer > 0) + { + var bySubject = context.Store.SubjectsTotals(subject); + var totalForSubject = bySubject.GetValueOrDefault(subject) + inflight.Ops; + if (context.Inflight.TryGetValue(subject, out var streamInflight)) + totalForSubject += streamInflight.Ops; + if (totalForSubject > (ulong)maxMsgsPer) + { + var err = StoreErrors.ErrMaxMsgsPerSubject; + return (hdr, msg, 0, JsApiErrors.NewJSStreamStoreFailedError(err, JsApiErrors.Unless(err)), err); + } + } + } + + return (hdr, msg, 0, null, null); + } } internal interface IBatchTimer @@ -258,10 +750,10 @@ internal sealed class BatchStagedDiff public Dictionary? MsgIds { get; set; } /// Running counter totals, keyed by subject. - public Dictionary? Counter { get; set; } + public Dictionary? Counter { get; set; } /// Inflight subject byte/op totals for DiscardNew checks. - public Dictionary? Inflight { get; set; } + public Dictionary? Inflight { get; set; } /// Expected-last-seq-per-subject checks staged in this batch. public Dictionary? ExpectedPerSubject { get; set; } @@ -295,7 +787,7 @@ internal sealed class BatchApply public ulong Count { get; set; } /// Raft committed entries that make up this batch. - public List? Entries { get; set; } + public List? Entries { get; set; } /// Index within an entry indicating the first message of the batch. public int EntryStart { get; set; } @@ -317,4 +809,77 @@ internal sealed class BatchApply EntryStart = 0; MaxApplied = 0; } + + public void RejectBatchStateLocked(BatchApplyContext context) + { + ArgumentNullException.ThrowIfNull(context); + + context.Clfs += Count; + if (Entries is { Count: > 0 }) + { + foreach (var entry in Entries) + entry.ReturnToPool(); + } + + ClearBatchStateLocked(); + } + + public void RejectBatchState(BatchApplyContext context) + { + lock (_mu) + { + RejectBatchStateLocked(context); + } + } +} + +internal interface ICommittedEntry +{ + void ReturnToPool(); +} + +internal sealed class BatchApplyContext +{ + public ulong Clfs { get; set; } +} + +internal sealed class MsgCounterRunningTotal +{ + public ulong Ops { get; set; } + public BigInteger Total { get; set; } + public Dictionary>? Sources { get; set; } +} + +internal sealed class InflightSubjectRunningTotal +{ + public ulong Bytes { get; set; } + public ulong Ops { get; set; } +} + +internal sealed class BatchRuntimeState +{ + public Dictionary MsgIds { get; } = new(StringComparer.Ordinal); + public Dictionary ClusteredCounterTotal { get; } = new(StringComparer.Ordinal); + public Dictionary Inflight { get; } = new(StringComparer.Ordinal); + public Dictionary ExpectedPerSubjectSequence { get; } = new(); + public HashSet ExpectedPerSubjectInProcess { get; } = new(StringComparer.Ordinal); +} + +internal sealed class BatchHeaderCheckContext +{ + public string AccountName { get; set; } = string.Empty; + public required IStreamStore Store { get; set; } + public Dictionary MsgIds { get; } = new(StringComparer.Ordinal); + public Dictionary ClusteredCounterTotal { get; } = new(StringComparer.Ordinal); + public Dictionary Inflight { get; } = new(StringComparer.Ordinal); + public HashSet ExpectedPerSubjectInProcess { get; } = new(StringComparer.Ordinal); + public string[] StreamSubjects { get; set; } = Array.Empty(); + public ulong ClSeq { get; set; } + public ulong Clfs { get; set; } + public int MaxPayload { get; set; } = int.MaxValue; +} + +internal sealed class CounterValue +{ + public string Value { get; set; } = "0"; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs index 53055a2..d6822d3 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs @@ -143,4 +143,165 @@ public sealed class JetStreamBatchingCoreTests store.Received(1).Stop(); JetStreamBatching.GlobalInflightBatches.ShouldBe(0); } + + [Fact] + public void Commit_DiffContainsState_UpdatesRuntimeState() + { + var diff = new BatchStagedDiff + { + MsgIds = new Dictionary { ["id-1"] = null }, + Counter = new Dictionary + { + ["foo"] = new() { Ops = 2, Total = new System.Numerics.BigInteger(5) }, + }, + Inflight = new Dictionary + { + ["foo"] = new() { Bytes = 10, Ops = 1 }, + }, + ExpectedPerSubject = new Dictionary + { + ["foo"] = new() { SSeq = 3, ClSeq = 9 }, + }, + }; + + var state = new BatchRuntimeState(); + + JetStreamBatching.Commit(diff, state); + + state.MsgIds.ContainsKey("id-1").ShouldBeTrue(); + state.ClusteredCounterTotal["foo"].Total.ShouldBe(new System.Numerics.BigInteger(5)); + state.Inflight["foo"].Ops.ShouldBe((ulong)1); + state.ExpectedPerSubjectSequence[9].ShouldBe("foo"); + state.ExpectedPerSubjectInProcess.Contains("foo").ShouldBeTrue(); + } + + [Fact] + public void BatchApplyWrappers_ClearAndRejectPaths_UpdateState() + { + var entry = new TestCommittedEntry(); + var apply = new BatchApply + { + Id = "batch-A", + Count = 3, + EntryStart = 4, + MaxApplied = 8, + Entries = [entry], + }; + + var context = new BatchApplyContext { Clfs = 10 }; + JetStreamBatching.RejectBatchStateLocked(apply, context); + + context.Clfs.ShouldBe((ulong)13); + entry.Returned.ShouldBeTrue(); + apply.Id.ShouldBe(string.Empty); + apply.Count.ShouldBe((ulong)0); + + apply.Id = "batch-B"; + apply.Count = 2; + JetStreamBatching.ClearBatchStateLocked(apply); + apply.Id.ShouldBe(string.Empty); + apply.Count.ShouldBe((ulong)0); + } + + [Fact] + public void CheckMsgHeadersPreClusteredProposal_ExpectedStreamMismatch_ReturnsNotMatchError() + { + var store = new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }); + try + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsExpectedStream, "OTHER"); + var diff = new BatchStagedDiff(); + var context = new BatchHeaderCheckContext + { + Store = store, + ClSeq = 1, + Clfs = 0, + MaxPayload = 1024, + StreamSubjects = ["ORDERS.>"], + }; + + var (_, _, _, apiError, error) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal( + diff, + context, + "ORDERS.created", + hdr, + [], + sourced: false, + name: "ORDERS", + allowRollup: true, + denyPurge: false, + allowTtl: true, + allowMsgCounter: false, + allowMsgSchedules: false, + discard: DiscardPolicy.DiscardOld, + discardNewPer: false, + maxMsgSize: -1, + maxMsgs: -1, + maxMsgsPer: -1, + maxBytes: -1); + + apiError.ShouldNotBeNull(); + apiError!.ErrCode.ShouldBe(JsApiErrors.StreamNotMatch.ErrCode); + error.ShouldNotBeNull(); + } + finally + { + store.Stop(); + } + } + + [Fact] + public void CheckMsgHeadersPreClusteredProposal_DuplicateMsgIdInBatch_ReturnsDuplicateError() + { + var store = new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }); + try + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMsgId, "msg-1"); + var diff = new BatchStagedDiff + { + MsgIds = new Dictionary { ["msg-1"] = null }, + }; + var context = new BatchHeaderCheckContext + { + Store = store, + ClSeq = 1, + MaxPayload = 1024, + }; + + var (_, _, _, apiError, error) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal( + diff, + context, + "ORDERS.created", + hdr, + [], + sourced: false, + name: "ORDERS", + allowRollup: true, + denyPurge: false, + allowTtl: true, + allowMsgCounter: false, + allowMsgSchedules: false, + discard: DiscardPolicy.DiscardOld, + discardNewPer: false, + maxMsgSize: -1, + maxMsgs: -1, + maxMsgsPer: -1, + maxBytes: -1); + + apiError.ShouldNotBeNull(); + apiError!.ErrCode.ShouldBe(JsApiErrors.AtomicPublishContainsDuplicateMessage.ErrCode); + error.ShouldNotBeNull(); + } + finally + { + store.Stop(); + } + } +} + +internal sealed class TestCommittedEntry : ICommittedEntry +{ + public bool Returned { get; private set; } + + public void ReturnToPool() => Returned = true; } diff --git a/porting.db b/porting.db index c655db9..fb85e7d 100644 Binary files a/porting.db and b/porting.db differ